diff options
Diffstat (limited to 'bot/exts/utilities')
-rw-r--r-- | bot/exts/utilities/bookmark.py | 123 | ||||
-rw-r--r-- | bot/exts/utilities/epoch.py | 7 | ||||
-rw-r--r-- | bot/exts/utilities/githubinfo.py | 218 | ||||
-rw-r--r-- | bot/exts/utilities/issues.py | 277 | ||||
-rw-r--r-- | bot/exts/utilities/realpython.py | 16 | ||||
-rw-r--r-- | bot/exts/utilities/twemoji.py | 150 |
6 files changed, 444 insertions, 347 deletions
diff --git a/bot/exts/utilities/bookmark.py b/bot/exts/utilities/bookmark.py index b50205a0..2e3458d8 100644 --- a/bot/exts/utilities/bookmark.py +++ b/bot/exts/utilities/bookmark.py @@ -16,6 +16,13 @@ log = logging.getLogger(__name__) # Number of seconds to wait for other users to bookmark the same message TIMEOUT = 120 BOOKMARK_EMOJI = "π" +MESSAGE_NOT_FOUND_ERROR = ( + "You must either provide a reference to a valid message, or reply to one." + "\n\nThe lookup strategy for a message is as follows (in order):" + "\n1. Lookup by '{channel ID}-{message ID}' (retrieved by shift-clicking on 'Copy ID')" + "\n2. Lookup by message ID (the message **must** be in the current channel)" + "\n3. Lookup by message URL" +) class Bookmark(commands.Cog): @@ -42,51 +49,37 @@ class Bookmark(commands.Cog): return embed @staticmethod - def build_error_embed(user: discord.Member) -> discord.Embed: - """Builds an error embed for when a bookmark requester has DMs disabled.""" + def build_error_embed(message: str) -> discord.Embed: + """Builds an error embed for a given message.""" return discord.Embed( title=random.choice(ERROR_REPLIES), - description=f"{user.mention}, please enable your DMs to receive the bookmark.", + description=message, colour=Colours.soft_red ) async def action_bookmark( self, channel: discord.TextChannel, - user: discord.Member, + member: discord.Member, target_message: discord.Message, title: str ) -> None: - """Sends the bookmark DM, or sends an error embed when a user bookmarks a message.""" + """ + Sends the given target_message as a bookmark to the member in DMs to the user. + + Send an error embed instead if the member has DMs disabled. + """ + embed = self.build_bookmark_dm(target_message, title) try: - embed = self.build_bookmark_dm(target_message, title) - await user.send(embed=embed) + await member.send(embed=embed) except discord.Forbidden: - error_embed = self.build_error_embed(user) + error_embed = self.build_error_embed(f"{member.mention}, please enable your DMs to receive the bookmark.") await channel.send(embed=error_embed) else: - log.info(f"{user} bookmarked {target_message.jump_url} with title '{title}'") - - @staticmethod - async def send_reaction_embed( - channel: discord.TextChannel, - target_message: discord.Message - ) -> discord.Message: - """Sends an embed, with a reaction, so users can react to bookmark the message too.""" - message = await channel.send( - embed=discord.Embed( - description=( - f"React with {BOOKMARK_EMOJI} to be sent your very own bookmark to " - f"[this message]({target_message.jump_url})." - ), - colour=Colours.soft_green - ) - ) - - await message.add_reaction(BOOKMARK_EMOJI) - return message + log.info(f"{member} bookmarked {target_message.jump_url} with title '{title}'") - @commands.command(name="bookmark", aliases=("bm", "pin")) + @commands.group(name="bookmark", aliases=("bm", "pin"), invoke_without_command=True) + @commands.guild_only() @whitelist_override(roles=(Roles.everyone,)) async def bookmark( self, @@ -95,30 +88,40 @@ class Bookmark(commands.Cog): *, title: str = "Bookmark" ) -> None: - """Send the author a link to `target_message` via DMs.""" - if not target_message: - if not ctx.message.reference: - raise commands.UserInputError( - "You must either provide a valid message to bookmark, or reply to one." - "\n\nThe lookup strategy for a message is as follows (in order):" - "\n1. Lookup by '{channel ID}-{message ID}' (retrieved by shift-clicking on 'Copy ID')" - "\n2. Lookup by message ID (the message **must** be in the context channel)" - "\n3. Lookup by message URL" - ) - target_message = ctx.message.reference.resolved + """ + Send the author a link to the specified message via DMs. + + Members can either give a message as an argument, or reply to a message. + + Bookmarks can subsequently be deleted by using the `bookmark delete` command in DMs. + """ + target_message: Optional[discord.Message] = target_message or getattr(ctx.message.reference, "resolved", None) + if target_message is None: + raise commands.UserInputError(MESSAGE_NOT_FOUND_ERROR) # Prevent users from bookmarking a message in a channel they don't have access to permissions = target_message.channel.permissions_for(ctx.author) if not permissions.read_messages: log.info(f"{ctx.author} tried to bookmark a message in #{target_message.channel} but has no permissions.") - embed = discord.Embed( - title=random.choice(ERROR_REPLIES), - color=Colours.soft_red, - description="You don't have permission to view this channel." - ) + embed = self.build_error_embed(f"{ctx.author.mention} You don't have permission to view this channel.") await ctx.send(embed=embed) return + await self.action_bookmark(ctx.channel, ctx.author, target_message, title) + + # Keep track of who has already bookmarked, so users can't spam reactions and cause loads of DMs + bookmarked_users = [ctx.author.id] + + reaction_embed = discord.Embed( + description=( + f"React with {BOOKMARK_EMOJI} to be sent your very own bookmark to " + f"[this message]({ctx.message.jump_url})." + ), + colour=Colours.soft_green + ) + reaction_message = await ctx.send(embed=reaction_embed) + await reaction_message.add_reaction(BOOKMARK_EMOJI) + def event_check(reaction: discord.Reaction, user: discord.Member) -> bool: """Make sure that this reaction is what we want to operate on.""" return ( @@ -134,11 +137,6 @@ class Bookmark(commands.Cog): user.id != self.bot.user.id )) ) - await self.action_bookmark(ctx.channel, ctx.author, target_message, title) - - # Keep track of who has already bookmarked, so users can't spam reactions and cause loads of DMs - bookmarked_users = [ctx.author.id] - reaction_message = await self.send_reaction_embed(ctx.channel, target_message) while True: try: @@ -152,6 +150,31 @@ class Bookmark(commands.Cog): await reaction_message.delete() + @bookmark.command(name="delete", aliases=("del", "rm"), root_aliases=("unbm", "unbookmark", "dmdelete", "dmdel")) + @whitelist_override(bypass_defaults=True, allow_dm=True) + async def delete_bookmark( + self, + ctx: commands.Context, + ) -> None: + """ + Delete the Sir-Lancebot message that the command invocation is replying to. + + This command allows deleting any message sent by Sir-Lancebot in the user's DM channel with the bot. + The command invocation must be a reply to the message that is to be deleted. + """ + target_message: Optional[discord.Message] = getattr(ctx.message.reference, "resolved", None) + if target_message is None: + raise commands.UserInputError("You must reply to the message from Sir-Lancebot you wish to delete.") + + if not isinstance(ctx.channel, discord.DMChannel): + raise commands.UserInputError("You can only run this command your own DMs!") + elif target_message.channel != ctx.channel: + raise commands.UserInputError("You can only delete messages in your own DMs!") + elif target_message.author != self.bot.user: + raise commands.UserInputError("You can only delete messages sent by Sir Lancebot!") + + await target_message.delete() + def setup(bot: Bot) -> None: """Load the Bookmark cog.""" diff --git a/bot/exts/utilities/epoch.py b/bot/exts/utilities/epoch.py index b9feed18..42312dd1 100644 --- a/bot/exts/utilities/epoch.py +++ b/bot/exts/utilities/epoch.py @@ -35,7 +35,7 @@ class DateString(commands.Converter): """ try: return arrow.utcnow().dehumanize(argument) - except ValueError: + except (ValueError, OverflowError): try: dt, ignored_tokens = parser.parse(argument, fuzzy_with_tokens=True) except parser.ParserError: @@ -86,7 +86,10 @@ class Epoch(commands.Cog): view = TimestampMenuView(ctx, self._format_dates(date_time), epoch) original = await ctx.send(f"`{epoch}`", view=view) await view.wait() # wait until expiration before removing the dropdown - await original.edit(view=None) + try: + await original.edit(view=None) + except discord.NotFound: # disregard the error message if the message is deleled + pass @staticmethod def _format_dates(date: arrow.Arrow) -> list[str]: diff --git a/bot/exts/utilities/githubinfo.py b/bot/exts/utilities/githubinfo.py index 539e388b..046f67df 100644 --- a/bot/exts/utilities/githubinfo.py +++ b/bot/exts/utilities/githubinfo.py @@ -1,30 +1,167 @@ import logging import random +import re +import typing as t +from dataclasses import dataclass from datetime import datetime -from urllib.parse import quote, quote_plus +from urllib.parse import quote import discord +from aiohttp import ClientResponse from discord.ext import commands from bot.bot import Bot -from bot.constants import Colours, NEGATIVE_REPLIES +from bot.constants import Colours, ERROR_REPLIES, Emojis, NEGATIVE_REPLIES, Tokens from bot.exts.core.extensions import invoke_help_command log = logging.getLogger(__name__) GITHUB_API_URL = "https://api.github.com" +REQUEST_HEADERS = { + "Accept": "application/vnd.github.v3+json" +} + +REPOSITORY_ENDPOINT = "https://api.github.com/orgs/{org}/repos?per_page=100&type=public" +ISSUE_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/issues/{number}" +PR_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/pulls/{number}" + +if Tokens.github: + REQUEST_HEADERS["Authorization"] = f"token {Tokens.github}" + +CODE_BLOCK_RE = re.compile( + r"^`([^`\n]+)`" # Inline codeblock + r"|```(.+?)```", # Multiline codeblock + re.DOTALL | re.MULTILINE +) + +# Maximum number of issues in one message +MAXIMUM_ISSUES = 5 + +# Regex used when looking for automatic linking in messages +# regex101 of current regex https://regex101.com/r/V2ji8M/6 +AUTOMATIC_REGEX = re.compile( + r"((?P<org>[a-zA-Z0-9][a-zA-Z0-9\-]{1,39})\/)?(?P<repo>[\w\-\.]{1,100})#(?P<number>[0-9]+)" +) + + +@dataclass(eq=True, frozen=True) +class FoundIssue: + """Dataclass representing an issue found by the regex.""" + + organisation: t.Optional[str] + repository: str + number: str + + +@dataclass(eq=True, frozen=True) +class FetchError: + """Dataclass representing an error while fetching an issue.""" + + return_code: int + message: str + + +@dataclass(eq=True, frozen=True) +class IssueState: + """Dataclass representing the state of an issue.""" + + repository: str + number: int + url: str + title: str + emoji: str + class GithubInfo(commands.Cog): - """Fetches info from GitHub.""" + """A Cog that fetches info from GitHub.""" def __init__(self, bot: Bot): self.bot = bot + self.repos = [] + + @staticmethod + def remove_codeblocks(message: str) -> str: + """Remove any codeblock in a message.""" + return CODE_BLOCK_RE.sub("", message) + + async def fetch_issue( + self, + number: int, + repository: str, + user: str + ) -> t.Union[IssueState, FetchError]: + """ + Retrieve an issue from a GitHub repository. + + Returns IssueState on success, FetchError on failure. + """ + url = ISSUE_ENDPOINT.format(user=user, repository=repository, number=number) + pulls_url = PR_ENDPOINT.format(user=user, repository=repository, number=number) + + json_data, r = await self.fetch_data(url) + + if r.status == 403: + if r.headers.get("X-RateLimit-Remaining") == "0": + log.info(f"Ratelimit reached while fetching {url}") + return FetchError(403, "Ratelimit reached, please retry in a few minutes.") + return FetchError(403, "Cannot access issue.") + elif r.status in (404, 410): + return FetchError(r.status, "Issue not found.") + elif r.status != 200: + return FetchError(r.status, "Error while fetching issue.") + + # The initial API request is made to the issues API endpoint, which will return information + # if the issue or PR is present. However, the scope of information returned for PRs differs + # from issues: if the 'issues' key is present in the response then we can pull the data we + # need from the initial API call. + if "issues" in json_data["html_url"]: + if json_data.get("state") == "open": + emoji = Emojis.issue_open + else: + emoji = Emojis.issue_closed + + # If the 'issues' key is not contained in the API response and there is no error code, then + # we know that a PR has been requested and a call to the pulls API endpoint is necessary + # to get the desired information for the PR. + else: + pull_data, _ = await self.fetch_data(pulls_url) + if pull_data["draft"]: + emoji = Emojis.pull_request_draft + elif pull_data["state"] == "open": + emoji = Emojis.pull_request_open + # When 'merged_at' is not None, this means that the state of the PR is merged + elif pull_data["merged_at"] is not None: + emoji = Emojis.pull_request_merged + else: + emoji = Emojis.pull_request_closed + + issue_url = json_data.get("html_url") + + return IssueState(repository, number, issue_url, json_data.get("title", ""), emoji) + + @staticmethod + def format_embed( + results: t.List[t.Union[IssueState, FetchError]] + ) -> discord.Embed: + """Take a list of IssueState or FetchError and format a Discord embed for them.""" + description_list = [] + + for result in results: + if isinstance(result, IssueState): + description_list.append( + f"{result.emoji} [[{result.repository}] #{result.number} {result.title}]({result.url})" + ) + elif isinstance(result, FetchError): + description_list.append(f":x: [{result.return_code}] {result.message}") + + resp = discord.Embed( + colour=Colours.bright_green, + description="\n".join(description_list) + ) - async def fetch_data(self, url: str) -> dict: - """Retrieve data as a dictionary.""" - async with self.bot.http_session.get(url) as r: - return await r.json() + resp.set_author(name="GitHub") + return resp @commands.group(name="github", aliases=("gh", "git")) @commands.cooldown(1, 10, commands.BucketType.user) @@ -33,11 +170,67 @@ class GithubInfo(commands.Cog): if ctx.invoked_subcommand is None: await invoke_help_command(ctx) + @commands.Cog.listener() + async def on_message(self, message: discord.Message) -> None: + """ + Automatic issue linking. + + Listener to retrieve issue(s) from a GitHub repository using automatic linking if matching <org>/<repo>#<issue>. + """ + # Ignore bots + if message.author.bot: + return + + issues = [ + FoundIssue(*match.group("org", "repo", "number")) + for match in AUTOMATIC_REGEX.finditer(self.remove_codeblocks(message.content)) + ] + links = [] + + if issues: + # Block this from working in DMs + if not message.guild: + return + + log.trace(f"Found {issues = }") + # Remove duplicates + issues = list(dict.fromkeys(issues)) + + if len(issues) > MAXIMUM_ISSUES: + embed = discord.Embed( + title=random.choice(ERROR_REPLIES), + color=Colours.soft_red, + description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})" + ) + await message.channel.send(embed=embed, delete_after=5) + return + + for repo_issue in issues: + result = await self.fetch_issue( + int(repo_issue.number), + repo_issue.repository, + repo_issue.organisation or "python-discord" + ) + if isinstance(result, IssueState): + links.append(result) + + if not links: + return + + resp = self.format_embed(links) + await message.channel.send(embed=resp) + + async def fetch_data(self, url: str) -> tuple[dict[str], ClientResponse]: + """Retrieve data as a dictionary and the response in a tuple.""" + log.trace(f"Querying GH issues API: {url}") + async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r: + return await r.json(), r + @github_group.command(name="user", aliases=("userinfo",)) async def github_user_info(self, ctx: commands.Context, username: str) -> None: """Fetches a user's GitHub information.""" async with ctx.typing(): - user_data = await self.fetch_data(f"{GITHUB_API_URL}/users/{quote_plus(username)}") + user_data, _ = await self.fetch_data(f"{GITHUB_API_URL}/users/{username}") # User_data will not have a message key if the user exists if "message" in user_data: @@ -50,7 +243,7 @@ class GithubInfo(commands.Cog): await ctx.send(embed=embed) return - org_data = await self.fetch_data(user_data["organizations_url"]) + org_data, _ = await self.fetch_data(user_data["organizations_url"]) orgs = [f"[{org['login']}](https://github.com/{org['login']})" for org in org_data] orgs_to_add = " | ".join(orgs) @@ -91,10 +284,7 @@ class GithubInfo(commands.Cog): ) if user_data["type"] == "User": - embed.add_field( - name="Gists", - value=f"[{gists}](https://gist.github.com/{quote_plus(username, safe='')})" - ) + embed.add_field(name="Gists", value=f"[{gists}](https://gist.github.com/{quote(username, safe='')})") embed.add_field( name=f"Organization{'s' if len(orgs)!=1 else ''}", @@ -123,7 +313,7 @@ class GithubInfo(commands.Cog): return async with ctx.typing(): - repo_data = await self.fetch_data(f"{GITHUB_API_URL}/repos/{quote(repo)}") + repo_data, _ = await self.fetch_data(f"{GITHUB_API_URL}/repos/{quote(repo)}") # There won't be a message key if this repo exists if "message" in repo_data: diff --git a/bot/exts/utilities/issues.py b/bot/exts/utilities/issues.py deleted file mode 100644 index b6d5a43e..00000000 --- a/bot/exts/utilities/issues.py +++ /dev/null @@ -1,277 +0,0 @@ -import logging -import random -import re -from dataclasses import dataclass -from typing import Optional, Union - -import discord -from discord.ext import commands - -from bot.bot import Bot -from bot.constants import ( - Categories, Channels, Colours, ERROR_REPLIES, Emojis, NEGATIVE_REPLIES, Tokens, WHITELISTED_CHANNELS -) -from bot.utils.decorators import whitelist_override -from bot.utils.extensions import invoke_help_command - -log = logging.getLogger(__name__) - -BAD_RESPONSE = { - 404: "Issue/pull request not located! Please enter a valid number!", - 403: "Rate limit has been hit! Please try again later!" -} -REQUEST_HEADERS = { - "Accept": "application/vnd.github.v3+json" -} - -REPOSITORY_ENDPOINT = "https://api.github.com/orgs/{org}/repos?per_page=100&type=public" -ISSUE_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/issues/{number}" -PR_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/pulls/{number}" - -if GITHUB_TOKEN := Tokens.github: - REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}" - -WHITELISTED_CATEGORIES = ( - Categories.development, Categories.devprojects, Categories.media, Categories.staff -) - -CODE_BLOCK_RE = re.compile( - r"^`([^`\n]+)`" # Inline codeblock - r"|```(.+?)```", # Multiline codeblock - re.DOTALL | re.MULTILINE -) - -# Maximum number of issues in one message -MAXIMUM_ISSUES = 5 - -# Regex used when looking for automatic linking in messages -# regex101 of current regex https://regex101.com/r/V2ji8M/6 -AUTOMATIC_REGEX = re.compile( - r"((?P<org>[a-zA-Z0-9][a-zA-Z0-9\-]{1,39})\/)?(?P<repo>[\w\-\.]{1,100})#(?P<number>[0-9]+)" -) - - -@dataclass -class FoundIssue: - """Dataclass representing an issue found by the regex.""" - - organisation: Optional[str] - repository: str - number: str - - def __hash__(self) -> int: - return hash((self.organisation, self.repository, self.number)) - - -@dataclass -class FetchError: - """Dataclass representing an error while fetching an issue.""" - - return_code: int - message: str - - -@dataclass -class IssueState: - """Dataclass representing the state of an issue.""" - - repository: str - number: int - url: str - title: str - emoji: str - - -class Issues(commands.Cog): - """Cog that allows users to retrieve issues from GitHub.""" - - def __init__(self, bot: Bot): - self.bot = bot - self.repos = [] - - @staticmethod - def remove_codeblocks(message: str) -> str: - """Remove any codeblock in a message.""" - return re.sub(CODE_BLOCK_RE, "", message) - - async def fetch_issues( - self, - number: int, - repository: str, - user: str - ) -> Union[IssueState, FetchError]: - """ - Retrieve an issue from a GitHub repository. - - Returns IssueState on success, FetchError on failure. - """ - url = ISSUE_ENDPOINT.format(user=user, repository=repository, number=number) - pulls_url = PR_ENDPOINT.format(user=user, repository=repository, number=number) - log.trace(f"Querying GH issues API: {url}") - - async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r: - json_data = await r.json() - - if r.status == 403: - if r.headers.get("X-RateLimit-Remaining") == "0": - log.info(f"Ratelimit reached while fetching {url}") - return FetchError(403, "Ratelimit reached, please retry in a few minutes.") - return FetchError(403, "Cannot access issue.") - elif r.status in (404, 410): - return FetchError(r.status, "Issue not found.") - elif r.status != 200: - return FetchError(r.status, "Error while fetching issue.") - - # The initial API request is made to the issues API endpoint, which will return information - # if the issue or PR is present. However, the scope of information returned for PRs differs - # from issues: if the 'issues' key is present in the response then we can pull the data we - # need from the initial API call. - if "issues" in json_data["html_url"]: - if json_data.get("state") == "open": - emoji = Emojis.issue_open - else: - emoji = Emojis.issue_closed - - # If the 'issues' key is not contained in the API response and there is no error code, then - # we know that a PR has been requested and a call to the pulls API endpoint is necessary - # to get the desired information for the PR. - else: - log.trace(f"PR provided, querying GH pulls API for additional information: {pulls_url}") - async with self.bot.http_session.get(pulls_url) as p: - pull_data = await p.json() - if pull_data["draft"]: - emoji = Emojis.pull_request_draft - elif pull_data["state"] == "open": - emoji = Emojis.pull_request_open - # When 'merged_at' is not None, this means that the state of the PR is merged - elif pull_data["merged_at"] is not None: - emoji = Emojis.pull_request_merged - else: - emoji = Emojis.pull_request_closed - - issue_url = json_data.get("html_url") - - return IssueState(repository, number, issue_url, json_data.get("title", ""), emoji) - - @staticmethod - def format_embed( - results: list[Union[IssueState, FetchError]], - user: str, - repository: Optional[str] = None - ) -> discord.Embed: - """Take a list of IssueState or FetchError and format a Discord embed for them.""" - description_list = [] - - for result in results: - if isinstance(result, IssueState): - description_list.append(f"{result.emoji} [{result.title}]({result.url})") - elif isinstance(result, FetchError): - description_list.append(f":x: [{result.return_code}] {result.message}") - - resp = discord.Embed( - colour=Colours.bright_green, - description="\n".join(description_list) - ) - - embed_url = f"https://github.com/{user}/{repository}" if repository else f"https://github.com/{user}" - resp.set_author(name="GitHub", url=embed_url) - return resp - - @whitelist_override(channels=WHITELISTED_CHANNELS, categories=WHITELISTED_CATEGORIES) - @commands.command(aliases=("issues", "pr", "prs")) - async def issue( - self, - ctx: commands.Context, - numbers: commands.Greedy[int], - repository: str = "sir-lancebot", - user: str = "python-discord" - ) -> None: - """Command to retrieve issue(s) from a GitHub repository.""" - # Remove duplicates - numbers = set(numbers) - - err_message = None - if not numbers: - err_message = "You must have at least one issue/PR!" - - elif len(numbers) > MAXIMUM_ISSUES: - err_message = f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})" - - # If there's an error with command invocation then send an error embed - if err_message is not None: - err_embed = discord.Embed( - title=random.choice(ERROR_REPLIES), - color=Colours.soft_red, - description=err_message - ) - await ctx.send(embed=err_embed) - await invoke_help_command(ctx) - return - - results = [await self.fetch_issues(number, repository, user) for number in numbers] - await ctx.send(embed=self.format_embed(results, user, repository)) - - @commands.Cog.listener() - async def on_message(self, message: discord.Message) -> None: - """ - Automatic issue linking. - - Listener to retrieve issue(s) from a GitHub repository using automatic linking if matching <org>/<repo>#<issue>. - """ - # Ignore bots - if message.author.bot: - return - - issues = [ - FoundIssue(*match.group("org", "repo", "number")) - for match in AUTOMATIC_REGEX.finditer(self.remove_codeblocks(message.content)) - ] - links = [] - - if issues: - # Block this from working in DMs - if not message.guild: - await message.channel.send( - embed=discord.Embed( - title=random.choice(NEGATIVE_REPLIES), - description=( - "You can't retrieve issues from DMs. " - f"Try again in <#{Channels.community_bot_commands}>" - ), - colour=Colours.soft_red - ) - ) - return - - log.trace(f"Found {issues = }") - # Remove duplicates - issues = set(issues) - - if len(issues) > MAXIMUM_ISSUES: - embed = discord.Embed( - title=random.choice(ERROR_REPLIES), - color=Colours.soft_red, - description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})" - ) - await message.channel.send(embed=embed, delete_after=5) - return - - for repo_issue in issues: - result = await self.fetch_issues( - int(repo_issue.number), - repo_issue.repository, - repo_issue.organisation or "python-discord" - ) - if isinstance(result, IssueState): - links.append(result) - - if not links: - return - - resp = self.format_embed(links, "python-discord") - await message.channel.send(embed=resp) - - -def setup(bot: Bot) -> None: - """Load the Issues cog.""" - bot.add_cog(Issues(bot)) diff --git a/bot/exts/utilities/realpython.py b/bot/exts/utilities/realpython.py index bf8f1341..5e9757d0 100644 --- a/bot/exts/utilities/realpython.py +++ b/bot/exts/utilities/realpython.py @@ -11,11 +11,10 @@ from bot.constants import Colours logger = logging.getLogger(__name__) - API_ROOT = "https://realpython.com/search/api/v1/" ARTICLE_URL = "https://realpython.com{article_url}" SEARCH_URL = "https://realpython.com/search?q={user_search}" - +HOME_URL = "https://realpython.com/" ERROR_EMBED = Embed( title="Error while searching Real Python", @@ -32,13 +31,22 @@ class RealPython(commands.Cog): @commands.command(aliases=["rp"]) @commands.cooldown(1, 10, commands.cooldowns.BucketType.user) - async def realpython(self, ctx: commands.Context, amount: Optional[int] = 5, *, user_search: str) -> None: + async def realpython(self, ctx: commands.Context, amount: Optional[int] = 5, *, + user_search: Optional[str] = None) -> None: """ Send some articles from RealPython that match the search terms. - By default the top 5 matches are sent, this can be overwritten to + By default, the top 5 matches are sent. This can be overwritten to a number between 1 and 5 by specifying an amount before the search query. + If no search query is specified by the user, the home page is sent. """ + if user_search is None: + home_page_embed = Embed(title="Real Python Home Page", url=HOME_URL, colour=Colours.orange) + + await ctx.send(embed=home_page_embed) + + return + if not 1 <= amount <= 5: await ctx.send("`amount` must be between 1 and 5 (inclusive).") return diff --git a/bot/exts/utilities/twemoji.py b/bot/exts/utilities/twemoji.py new file mode 100644 index 00000000..c915f05b --- /dev/null +++ b/bot/exts/utilities/twemoji.py @@ -0,0 +1,150 @@ +import logging +import re +from typing import Literal, Optional + +import discord +from discord.ext import commands +from emoji import UNICODE_EMOJI_ENGLISH, is_emoji + +from bot.bot import Bot +from bot.constants import Colours, Roles +from bot.utils.decorators import whitelist_override +from bot.utils.extensions import invoke_help_command + +log = logging.getLogger(__name__) +BASE_URLS = { + "png": "https://raw.githubusercontent.com/twitter/twemoji/master/assets/72x72/", + "svg": "https://raw.githubusercontent.com/twitter/twemoji/master/assets/svg/", +} +CODEPOINT_REGEX = re.compile(r"[a-f1-9][a-f0-9]{3,5}$") + + +class Twemoji(commands.Cog): + """Utilities for working with Twemojis.""" + + def __init__(self, bot: Bot): + self.bot = bot + + @staticmethod + def get_url(codepoint: str, format: Literal["png", "svg"]) -> str: + """Returns a source file URL for the specified Twemoji, in the corresponding format.""" + return f"{BASE_URLS[format]}{codepoint}.{format}" + + @staticmethod + def alias_to_name(alias: str) -> str: + """ + Transform a unicode alias to an emoji name. + + Example usages: + >>> alias_to_name(":falling_leaf:") + "Falling leaf" + >>> alias_to_name(":family_man_girl_boy:") + "Family man girl boy" + """ + name = alias.strip(":").replace("_", " ") + return name.capitalize() + + @staticmethod + def build_embed(codepoint: str) -> discord.Embed: + """Returns the main embed for the `twemoji` commmand.""" + emoji = "".join(Twemoji.emoji(e) or "" for e in codepoint.split("-")) + + embed = discord.Embed( + title=Twemoji.alias_to_name(UNICODE_EMOJI_ENGLISH[emoji]), + description=f"{codepoint.replace('-', ' ')}\n[Download svg]({Twemoji.get_url(codepoint, 'svg')})", + colour=Colours.twitter_blue, + ) + embed.set_thumbnail(url=Twemoji.get_url(codepoint, "png")) + return embed + + @staticmethod + def emoji(codepoint: Optional[str]) -> Optional[str]: + """ + Returns the emoji corresponding to a given `codepoint`, or `None` if no emoji was found. + + The return value is an emoji character, such as "π". The `codepoint` + argument can be of any format, since it will be trimmed automatically. + """ + if code := Twemoji.trim_code(codepoint): + return chr(int(code, 16)) + + @staticmethod + def codepoint(emoji: Optional[str]) -> Optional[str]: + """ + Returns the codepoint, in a trimmed format, of a single emoji. + + `emoji` should be an emoji character, such as "π" and "π₯°", and + not a codepoint like "1f1f8". When working with combined emojis, + such as "πΈπͺ" and "π¨βπ©βπ¦", send the component emojis through the method + one at a time. + """ + if emoji is None: + return None + return hex(ord(emoji)).removeprefix("0x") + + @staticmethod + def trim_code(codepoint: Optional[str]) -> Optional[str]: + """ + Returns the meaningful information from the given `codepoint`. + + If no codepoint is found, `None` is returned. + + Example usages: + >>> trim_code("U+1f1f8") + "1f1f8" + >>> trim_code("\u0001f1f8") + "1f1f8" + >>> trim_code("1f466") + "1f466" + """ + if code := CODEPOINT_REGEX.search(codepoint or ""): + return code.group() + + @staticmethod + def codepoint_from_input(raw_emoji: tuple[str, ...]) -> str: + """ + Returns the codepoint corresponding to the passed tuple, separated by "-". + + The return format matches the format used in URLs for Twemoji source files. + + Example usages: + >>> codepoint_from_input(("π",)) + "1f40d" + >>> codepoint_from_input(("1f1f8", "1f1ea")) + "1f1f8-1f1ea" + >>> codepoint_from_input(("π¨βπ§βπ¦",)) + "1f468-200d-1f467-200d-1f466" + """ + raw_emoji = [emoji.lower() for emoji in raw_emoji] + if is_emoji(raw_emoji[0]): + emojis = (Twemoji.codepoint(emoji) or "" for emoji in raw_emoji[0]) + return "-".join(emojis) + + emoji = "".join( + Twemoji.emoji(Twemoji.trim_code(code)) or "" for code in raw_emoji + ) + if is_emoji(emoji): + return "-".join(Twemoji.codepoint(e) or "" for e in emoji) + + raise ValueError("No codepoint could be obtained from the given input") + + @commands.command(aliases=("tw",)) + @whitelist_override(roles=(Roles.everyone,)) + async def twemoji(self, ctx: commands.Context, *raw_emoji: str) -> None: + """Sends a preview of a given Twemoji, specified by codepoint or emoji.""" + if len(raw_emoji) == 0: + await invoke_help_command(ctx) + return + try: + codepoint = self.codepoint_from_input(raw_emoji) + except ValueError: + raise commands.BadArgument( + "please include a valid emoji or emoji codepoint." + ) + + await ctx.send(embed=self.build_embed(codepoint)) + + +def setup(bot: Bot) -> None: + """Load the Twemoji cog.""" + bot.add_cog(Twemoji(bot)) |