diff options
Diffstat (limited to 'bot/exts/utilities/githubinfo.py')
| -rw-r--r-- | bot/exts/utilities/githubinfo.py | 218 | 
1 files changed, 204 insertions, 14 deletions
| diff --git a/bot/exts/utilities/githubinfo.py b/bot/exts/utilities/githubinfo.py index 539e388b..046f67df 100644 --- a/bot/exts/utilities/githubinfo.py +++ b/bot/exts/utilities/githubinfo.py @@ -1,30 +1,167 @@  import logging  import random +import re +import typing as t +from dataclasses import dataclass  from datetime import datetime -from urllib.parse import quote, quote_plus +from urllib.parse import quote  import discord +from aiohttp import ClientResponse  from discord.ext import commands  from bot.bot import Bot -from bot.constants import Colours, NEGATIVE_REPLIES +from bot.constants import Colours, ERROR_REPLIES, Emojis, NEGATIVE_REPLIES, Tokens  from bot.exts.core.extensions import invoke_help_command  log = logging.getLogger(__name__)  GITHUB_API_URL = "https://api.github.com" +REQUEST_HEADERS = { +    "Accept": "application/vnd.github.v3+json" +} + +REPOSITORY_ENDPOINT = "https://api.github.com/orgs/{org}/repos?per_page=100&type=public" +ISSUE_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/issues/{number}" +PR_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/pulls/{number}" + +if Tokens.github: +    REQUEST_HEADERS["Authorization"] = f"token {Tokens.github}" + +CODE_BLOCK_RE = re.compile( +    r"^`([^`\n]+)`"   # Inline codeblock +    r"|```(.+?)```",  # Multiline codeblock +    re.DOTALL | re.MULTILINE +) + +# Maximum number of issues in one message +MAXIMUM_ISSUES = 5 + +# Regex used when looking for automatic linking in messages +# regex101 of current regex https://regex101.com/r/V2ji8M/6 +AUTOMATIC_REGEX = re.compile( +    r"((?P<org>[a-zA-Z0-9][a-zA-Z0-9\-]{1,39})\/)?(?P<repo>[\w\-\.]{1,100})#(?P<number>[0-9]+)" +) + + +@dataclass(eq=True, frozen=True) +class FoundIssue: +    """Dataclass representing an issue found by the regex.""" + +    organisation: t.Optional[str] +    repository: str +    number: str + + +@dataclass(eq=True, frozen=True) +class FetchError: +    """Dataclass representing an error while fetching an issue.""" + +    return_code: int +    message: str + + +@dataclass(eq=True, frozen=True) +class IssueState: +    """Dataclass representing the state of an issue.""" + +    repository: str +    number: int +    url: str +    title: str +    emoji: str +  class GithubInfo(commands.Cog): -    """Fetches info from GitHub.""" +    """A Cog that fetches info from GitHub."""      def __init__(self, bot: Bot):          self.bot = bot +        self.repos = [] + +    @staticmethod +    def remove_codeblocks(message: str) -> str: +        """Remove any codeblock in a message.""" +        return CODE_BLOCK_RE.sub("", message) + +    async def fetch_issue( +        self, +        number: int, +        repository: str, +        user: str +    ) -> t.Union[IssueState, FetchError]: +        """ +        Retrieve an issue from a GitHub repository. + +        Returns IssueState on success, FetchError on failure. +        """ +        url = ISSUE_ENDPOINT.format(user=user, repository=repository, number=number) +        pulls_url = PR_ENDPOINT.format(user=user, repository=repository, number=number) + +        json_data, r = await self.fetch_data(url) + +        if r.status == 403: +            if r.headers.get("X-RateLimit-Remaining") == "0": +                log.info(f"Ratelimit reached while fetching {url}") +                return FetchError(403, "Ratelimit reached, please retry in a few minutes.") +            return FetchError(403, "Cannot access issue.") +        elif r.status in (404, 410): +            return FetchError(r.status, "Issue not found.") +        elif r.status != 200: +            return FetchError(r.status, "Error while fetching issue.") + +        # The initial API request is made to the issues API endpoint, which will return information +        # if the issue or PR is present. However, the scope of information returned for PRs differs +        # from issues: if the 'issues' key is present in the response then we can pull the data we +        # need from the initial API call. +        if "issues" in json_data["html_url"]: +            if json_data.get("state") == "open": +                emoji = Emojis.issue_open +            else: +                emoji = Emojis.issue_closed + +        # If the 'issues' key is not contained in the API response and there is no error code, then +        # we know that a PR has been requested and a call to the pulls API endpoint is necessary +        # to get the desired information for the PR. +        else: +            pull_data, _ = await self.fetch_data(pulls_url) +            if pull_data["draft"]: +                emoji = Emojis.pull_request_draft +            elif pull_data["state"] == "open": +                emoji = Emojis.pull_request_open +            # When 'merged_at' is not None, this means that the state of the PR is merged +            elif pull_data["merged_at"] is not None: +                emoji = Emojis.pull_request_merged +            else: +                emoji = Emojis.pull_request_closed + +        issue_url = json_data.get("html_url") + +        return IssueState(repository, number, issue_url, json_data.get("title", ""), emoji) + +    @staticmethod +    def format_embed( +        results: t.List[t.Union[IssueState, FetchError]] +    ) -> discord.Embed: +        """Take a list of IssueState or FetchError and format a Discord embed for them.""" +        description_list = [] + +        for result in results: +            if isinstance(result, IssueState): +                description_list.append( +                    f"{result.emoji} [[{result.repository}] #{result.number} {result.title}]({result.url})" +                ) +            elif isinstance(result, FetchError): +                description_list.append(f":x: [{result.return_code}] {result.message}") + +        resp = discord.Embed( +            colour=Colours.bright_green, +            description="\n".join(description_list) +        ) -    async def fetch_data(self, url: str) -> dict: -        """Retrieve data as a dictionary.""" -        async with self.bot.http_session.get(url) as r: -            return await r.json() +        resp.set_author(name="GitHub") +        return resp      @commands.group(name="github", aliases=("gh", "git"))      @commands.cooldown(1, 10, commands.BucketType.user) @@ -33,11 +170,67 @@ class GithubInfo(commands.Cog):          if ctx.invoked_subcommand is None:              await invoke_help_command(ctx) +    @commands.Cog.listener() +    async def on_message(self, message: discord.Message) -> None: +        """ +        Automatic issue linking. + +        Listener to retrieve issue(s) from a GitHub repository using automatic linking if matching <org>/<repo>#<issue>. +        """ +        # Ignore bots +        if message.author.bot: +            return + +        issues = [ +            FoundIssue(*match.group("org", "repo", "number")) +            for match in AUTOMATIC_REGEX.finditer(self.remove_codeblocks(message.content)) +        ] +        links = [] + +        if issues: +            # Block this from working in DMs +            if not message.guild: +                return + +            log.trace(f"Found {issues = }") +            # Remove duplicates +            issues = list(dict.fromkeys(issues)) + +            if len(issues) > MAXIMUM_ISSUES: +                embed = discord.Embed( +                    title=random.choice(ERROR_REPLIES), +                    color=Colours.soft_red, +                    description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})" +                ) +                await message.channel.send(embed=embed, delete_after=5) +                return + +            for repo_issue in issues: +                result = await self.fetch_issue( +                    int(repo_issue.number), +                    repo_issue.repository, +                    repo_issue.organisation or "python-discord" +                ) +                if isinstance(result, IssueState): +                    links.append(result) + +        if not links: +            return + +        resp = self.format_embed(links) +        await message.channel.send(embed=resp) + +    async def fetch_data(self, url: str) -> tuple[dict[str], ClientResponse]: +        """Retrieve data as a dictionary and the response in a tuple.""" +        log.trace(f"Querying GH issues API: {url}") +        async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r: +            return await r.json(), r +      @github_group.command(name="user", aliases=("userinfo",))      async def github_user_info(self, ctx: commands.Context, username: str) -> None:          """Fetches a user's GitHub information."""          async with ctx.typing(): -            user_data = await self.fetch_data(f"{GITHUB_API_URL}/users/{quote_plus(username)}") +            user_data, _ = await self.fetch_data(f"{GITHUB_API_URL}/users/{username}")              # User_data will not have a message key if the user exists              if "message" in user_data: @@ -50,7 +243,7 @@ class GithubInfo(commands.Cog):                  await ctx.send(embed=embed)                  return -            org_data = await self.fetch_data(user_data["organizations_url"]) +            org_data, _ = await self.fetch_data(user_data["organizations_url"])              orgs = [f"[{org['login']}](https://github.com/{org['login']})" for org in org_data]              orgs_to_add = " | ".join(orgs) @@ -91,10 +284,7 @@ class GithubInfo(commands.Cog):              )              if user_data["type"] == "User": -                embed.add_field( -                    name="Gists", -                    value=f"[{gists}](https://gist.github.com/{quote_plus(username, safe='')})" -                ) +                embed.add_field(name="Gists", value=f"[{gists}](https://gist.github.com/{quote(username, safe='')})")                  embed.add_field(                      name=f"Organization{'s' if len(orgs)!=1 else ''}", @@ -123,7 +313,7 @@ class GithubInfo(commands.Cog):              return          async with ctx.typing(): -            repo_data = await self.fetch_data(f"{GITHUB_API_URL}/repos/{quote(repo)}") +            repo_data, _ = await self.fetch_data(f"{GITHUB_API_URL}/repos/{quote(repo)}")              # There won't be a message key if this repo exists              if "message" in repo_data: | 
