aboutsummaryrefslogtreecommitdiffstats
path: root/bot/exts/utilities
diff options
context:
space:
mode:
Diffstat (limited to 'bot/exts/utilities')
-rw-r--r--bot/exts/utilities/bookmark.py123
-rw-r--r--bot/exts/utilities/epoch.py7
-rw-r--r--bot/exts/utilities/githubinfo.py218
-rw-r--r--bot/exts/utilities/issues.py277
-rw-r--r--bot/exts/utilities/realpython.py16
-rw-r--r--bot/exts/utilities/twemoji.py150
6 files changed, 444 insertions, 347 deletions
diff --git a/bot/exts/utilities/bookmark.py b/bot/exts/utilities/bookmark.py
index b50205a0..2e3458d8 100644
--- a/bot/exts/utilities/bookmark.py
+++ b/bot/exts/utilities/bookmark.py
@@ -16,6 +16,13 @@ log = logging.getLogger(__name__)
# Number of seconds to wait for other users to bookmark the same message
TIMEOUT = 120
BOOKMARK_EMOJI = "πŸ“Œ"
+MESSAGE_NOT_FOUND_ERROR = (
+ "You must either provide a reference to a valid message, or reply to one."
+ "\n\nThe lookup strategy for a message is as follows (in order):"
+ "\n1. Lookup by '{channel ID}-{message ID}' (retrieved by shift-clicking on 'Copy ID')"
+ "\n2. Lookup by message ID (the message **must** be in the current channel)"
+ "\n3. Lookup by message URL"
+)
class Bookmark(commands.Cog):
@@ -42,51 +49,37 @@ class Bookmark(commands.Cog):
return embed
@staticmethod
- def build_error_embed(user: discord.Member) -> discord.Embed:
- """Builds an error embed for when a bookmark requester has DMs disabled."""
+ def build_error_embed(message: str) -> discord.Embed:
+ """Builds an error embed for a given message."""
return discord.Embed(
title=random.choice(ERROR_REPLIES),
- description=f"{user.mention}, please enable your DMs to receive the bookmark.",
+ description=message,
colour=Colours.soft_red
)
async def action_bookmark(
self,
channel: discord.TextChannel,
- user: discord.Member,
+ member: discord.Member,
target_message: discord.Message,
title: str
) -> None:
- """Sends the bookmark DM, or sends an error embed when a user bookmarks a message."""
+ """
+ Sends the given target_message as a bookmark to the member in DMs to the user.
+
+ Send an error embed instead if the member has DMs disabled.
+ """
+ embed = self.build_bookmark_dm(target_message, title)
try:
- embed = self.build_bookmark_dm(target_message, title)
- await user.send(embed=embed)
+ await member.send(embed=embed)
except discord.Forbidden:
- error_embed = self.build_error_embed(user)
+ error_embed = self.build_error_embed(f"{member.mention}, please enable your DMs to receive the bookmark.")
await channel.send(embed=error_embed)
else:
- log.info(f"{user} bookmarked {target_message.jump_url} with title '{title}'")
-
- @staticmethod
- async def send_reaction_embed(
- channel: discord.TextChannel,
- target_message: discord.Message
- ) -> discord.Message:
- """Sends an embed, with a reaction, so users can react to bookmark the message too."""
- message = await channel.send(
- embed=discord.Embed(
- description=(
- f"React with {BOOKMARK_EMOJI} to be sent your very own bookmark to "
- f"[this message]({target_message.jump_url})."
- ),
- colour=Colours.soft_green
- )
- )
-
- await message.add_reaction(BOOKMARK_EMOJI)
- return message
+ log.info(f"{member} bookmarked {target_message.jump_url} with title '{title}'")
- @commands.command(name="bookmark", aliases=("bm", "pin"))
+ @commands.group(name="bookmark", aliases=("bm", "pin"), invoke_without_command=True)
+ @commands.guild_only()
@whitelist_override(roles=(Roles.everyone,))
async def bookmark(
self,
@@ -95,30 +88,40 @@ class Bookmark(commands.Cog):
*,
title: str = "Bookmark"
) -> None:
- """Send the author a link to `target_message` via DMs."""
- if not target_message:
- if not ctx.message.reference:
- raise commands.UserInputError(
- "You must either provide a valid message to bookmark, or reply to one."
- "\n\nThe lookup strategy for a message is as follows (in order):"
- "\n1. Lookup by '{channel ID}-{message ID}' (retrieved by shift-clicking on 'Copy ID')"
- "\n2. Lookup by message ID (the message **must** be in the context channel)"
- "\n3. Lookup by message URL"
- )
- target_message = ctx.message.reference.resolved
+ """
+ Send the author a link to the specified message via DMs.
+
+ Members can either give a message as an argument, or reply to a message.
+
+ Bookmarks can subsequently be deleted by using the `bookmark delete` command in DMs.
+ """
+ target_message: Optional[discord.Message] = target_message or getattr(ctx.message.reference, "resolved", None)
+ if target_message is None:
+ raise commands.UserInputError(MESSAGE_NOT_FOUND_ERROR)
# Prevent users from bookmarking a message in a channel they don't have access to
permissions = target_message.channel.permissions_for(ctx.author)
if not permissions.read_messages:
log.info(f"{ctx.author} tried to bookmark a message in #{target_message.channel} but has no permissions.")
- embed = discord.Embed(
- title=random.choice(ERROR_REPLIES),
- color=Colours.soft_red,
- description="You don't have permission to view this channel."
- )
+ embed = self.build_error_embed(f"{ctx.author.mention} You don't have permission to view this channel.")
await ctx.send(embed=embed)
return
+ await self.action_bookmark(ctx.channel, ctx.author, target_message, title)
+
+ # Keep track of who has already bookmarked, so users can't spam reactions and cause loads of DMs
+ bookmarked_users = [ctx.author.id]
+
+ reaction_embed = discord.Embed(
+ description=(
+ f"React with {BOOKMARK_EMOJI} to be sent your very own bookmark to "
+ f"[this message]({ctx.message.jump_url})."
+ ),
+ colour=Colours.soft_green
+ )
+ reaction_message = await ctx.send(embed=reaction_embed)
+ await reaction_message.add_reaction(BOOKMARK_EMOJI)
+
def event_check(reaction: discord.Reaction, user: discord.Member) -> bool:
"""Make sure that this reaction is what we want to operate on."""
return (
@@ -134,11 +137,6 @@ class Bookmark(commands.Cog):
user.id != self.bot.user.id
))
)
- await self.action_bookmark(ctx.channel, ctx.author, target_message, title)
-
- # Keep track of who has already bookmarked, so users can't spam reactions and cause loads of DMs
- bookmarked_users = [ctx.author.id]
- reaction_message = await self.send_reaction_embed(ctx.channel, target_message)
while True:
try:
@@ -152,6 +150,31 @@ class Bookmark(commands.Cog):
await reaction_message.delete()
+ @bookmark.command(name="delete", aliases=("del", "rm"), root_aliases=("unbm", "unbookmark", "dmdelete", "dmdel"))
+ @whitelist_override(bypass_defaults=True, allow_dm=True)
+ async def delete_bookmark(
+ self,
+ ctx: commands.Context,
+ ) -> None:
+ """
+ Delete the Sir-Lancebot message that the command invocation is replying to.
+
+ This command allows deleting any message sent by Sir-Lancebot in the user's DM channel with the bot.
+ The command invocation must be a reply to the message that is to be deleted.
+ """
+ target_message: Optional[discord.Message] = getattr(ctx.message.reference, "resolved", None)
+ if target_message is None:
+ raise commands.UserInputError("You must reply to the message from Sir-Lancebot you wish to delete.")
+
+ if not isinstance(ctx.channel, discord.DMChannel):
+ raise commands.UserInputError("You can only run this command your own DMs!")
+ elif target_message.channel != ctx.channel:
+ raise commands.UserInputError("You can only delete messages in your own DMs!")
+ elif target_message.author != self.bot.user:
+ raise commands.UserInputError("You can only delete messages sent by Sir Lancebot!")
+
+ await target_message.delete()
+
def setup(bot: Bot) -> None:
"""Load the Bookmark cog."""
diff --git a/bot/exts/utilities/epoch.py b/bot/exts/utilities/epoch.py
index b9feed18..42312dd1 100644
--- a/bot/exts/utilities/epoch.py
+++ b/bot/exts/utilities/epoch.py
@@ -35,7 +35,7 @@ class DateString(commands.Converter):
"""
try:
return arrow.utcnow().dehumanize(argument)
- except ValueError:
+ except (ValueError, OverflowError):
try:
dt, ignored_tokens = parser.parse(argument, fuzzy_with_tokens=True)
except parser.ParserError:
@@ -86,7 +86,10 @@ class Epoch(commands.Cog):
view = TimestampMenuView(ctx, self._format_dates(date_time), epoch)
original = await ctx.send(f"`{epoch}`", view=view)
await view.wait() # wait until expiration before removing the dropdown
- await original.edit(view=None)
+ try:
+ await original.edit(view=None)
+ except discord.NotFound: # disregard the error message if the message is deleled
+ pass
@staticmethod
def _format_dates(date: arrow.Arrow) -> list[str]:
diff --git a/bot/exts/utilities/githubinfo.py b/bot/exts/utilities/githubinfo.py
index 539e388b..046f67df 100644
--- a/bot/exts/utilities/githubinfo.py
+++ b/bot/exts/utilities/githubinfo.py
@@ -1,30 +1,167 @@
import logging
import random
+import re
+import typing as t
+from dataclasses import dataclass
from datetime import datetime
-from urllib.parse import quote, quote_plus
+from urllib.parse import quote
import discord
+from aiohttp import ClientResponse
from discord.ext import commands
from bot.bot import Bot
-from bot.constants import Colours, NEGATIVE_REPLIES
+from bot.constants import Colours, ERROR_REPLIES, Emojis, NEGATIVE_REPLIES, Tokens
from bot.exts.core.extensions import invoke_help_command
log = logging.getLogger(__name__)
GITHUB_API_URL = "https://api.github.com"
+REQUEST_HEADERS = {
+ "Accept": "application/vnd.github.v3+json"
+}
+
+REPOSITORY_ENDPOINT = "https://api.github.com/orgs/{org}/repos?per_page=100&type=public"
+ISSUE_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/issues/{number}"
+PR_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/pulls/{number}"
+
+if Tokens.github:
+ REQUEST_HEADERS["Authorization"] = f"token {Tokens.github}"
+
+CODE_BLOCK_RE = re.compile(
+ r"^`([^`\n]+)`" # Inline codeblock
+ r"|```(.+?)```", # Multiline codeblock
+ re.DOTALL | re.MULTILINE
+)
+
+# Maximum number of issues in one message
+MAXIMUM_ISSUES = 5
+
+# Regex used when looking for automatic linking in messages
+# regex101 of current regex https://regex101.com/r/V2ji8M/6
+AUTOMATIC_REGEX = re.compile(
+ r"((?P<org>[a-zA-Z0-9][a-zA-Z0-9\-]{1,39})\/)?(?P<repo>[\w\-\.]{1,100})#(?P<number>[0-9]+)"
+)
+
+
+@dataclass(eq=True, frozen=True)
+class FoundIssue:
+ """Dataclass representing an issue found by the regex."""
+
+ organisation: t.Optional[str]
+ repository: str
+ number: str
+
+
+@dataclass(eq=True, frozen=True)
+class FetchError:
+ """Dataclass representing an error while fetching an issue."""
+
+ return_code: int
+ message: str
+
+
+@dataclass(eq=True, frozen=True)
+class IssueState:
+ """Dataclass representing the state of an issue."""
+
+ repository: str
+ number: int
+ url: str
+ title: str
+ emoji: str
+
class GithubInfo(commands.Cog):
- """Fetches info from GitHub."""
+ """A Cog that fetches info from GitHub."""
def __init__(self, bot: Bot):
self.bot = bot
+ self.repos = []
+
+ @staticmethod
+ def remove_codeblocks(message: str) -> str:
+ """Remove any codeblock in a message."""
+ return CODE_BLOCK_RE.sub("", message)
+
+ async def fetch_issue(
+ self,
+ number: int,
+ repository: str,
+ user: str
+ ) -> t.Union[IssueState, FetchError]:
+ """
+ Retrieve an issue from a GitHub repository.
+
+ Returns IssueState on success, FetchError on failure.
+ """
+ url = ISSUE_ENDPOINT.format(user=user, repository=repository, number=number)
+ pulls_url = PR_ENDPOINT.format(user=user, repository=repository, number=number)
+
+ json_data, r = await self.fetch_data(url)
+
+ if r.status == 403:
+ if r.headers.get("X-RateLimit-Remaining") == "0":
+ log.info(f"Ratelimit reached while fetching {url}")
+ return FetchError(403, "Ratelimit reached, please retry in a few minutes.")
+ return FetchError(403, "Cannot access issue.")
+ elif r.status in (404, 410):
+ return FetchError(r.status, "Issue not found.")
+ elif r.status != 200:
+ return FetchError(r.status, "Error while fetching issue.")
+
+ # The initial API request is made to the issues API endpoint, which will return information
+ # if the issue or PR is present. However, the scope of information returned for PRs differs
+ # from issues: if the 'issues' key is present in the response then we can pull the data we
+ # need from the initial API call.
+ if "issues" in json_data["html_url"]:
+ if json_data.get("state") == "open":
+ emoji = Emojis.issue_open
+ else:
+ emoji = Emojis.issue_closed
+
+ # If the 'issues' key is not contained in the API response and there is no error code, then
+ # we know that a PR has been requested and a call to the pulls API endpoint is necessary
+ # to get the desired information for the PR.
+ else:
+ pull_data, _ = await self.fetch_data(pulls_url)
+ if pull_data["draft"]:
+ emoji = Emojis.pull_request_draft
+ elif pull_data["state"] == "open":
+ emoji = Emojis.pull_request_open
+ # When 'merged_at' is not None, this means that the state of the PR is merged
+ elif pull_data["merged_at"] is not None:
+ emoji = Emojis.pull_request_merged
+ else:
+ emoji = Emojis.pull_request_closed
+
+ issue_url = json_data.get("html_url")
+
+ return IssueState(repository, number, issue_url, json_data.get("title", ""), emoji)
+
+ @staticmethod
+ def format_embed(
+ results: t.List[t.Union[IssueState, FetchError]]
+ ) -> discord.Embed:
+ """Take a list of IssueState or FetchError and format a Discord embed for them."""
+ description_list = []
+
+ for result in results:
+ if isinstance(result, IssueState):
+ description_list.append(
+ f"{result.emoji} [[{result.repository}] #{result.number} {result.title}]({result.url})"
+ )
+ elif isinstance(result, FetchError):
+ description_list.append(f":x: [{result.return_code}] {result.message}")
+
+ resp = discord.Embed(
+ colour=Colours.bright_green,
+ description="\n".join(description_list)
+ )
- async def fetch_data(self, url: str) -> dict:
- """Retrieve data as a dictionary."""
- async with self.bot.http_session.get(url) as r:
- return await r.json()
+ resp.set_author(name="GitHub")
+ return resp
@commands.group(name="github", aliases=("gh", "git"))
@commands.cooldown(1, 10, commands.BucketType.user)
@@ -33,11 +170,67 @@ class GithubInfo(commands.Cog):
if ctx.invoked_subcommand is None:
await invoke_help_command(ctx)
+ @commands.Cog.listener()
+ async def on_message(self, message: discord.Message) -> None:
+ """
+ Automatic issue linking.
+
+ Listener to retrieve issue(s) from a GitHub repository using automatic linking if matching <org>/<repo>#<issue>.
+ """
+ # Ignore bots
+ if message.author.bot:
+ return
+
+ issues = [
+ FoundIssue(*match.group("org", "repo", "number"))
+ for match in AUTOMATIC_REGEX.finditer(self.remove_codeblocks(message.content))
+ ]
+ links = []
+
+ if issues:
+ # Block this from working in DMs
+ if not message.guild:
+ return
+
+ log.trace(f"Found {issues = }")
+ # Remove duplicates
+ issues = list(dict.fromkeys(issues))
+
+ if len(issues) > MAXIMUM_ISSUES:
+ embed = discord.Embed(
+ title=random.choice(ERROR_REPLIES),
+ color=Colours.soft_red,
+ description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})"
+ )
+ await message.channel.send(embed=embed, delete_after=5)
+ return
+
+ for repo_issue in issues:
+ result = await self.fetch_issue(
+ int(repo_issue.number),
+ repo_issue.repository,
+ repo_issue.organisation or "python-discord"
+ )
+ if isinstance(result, IssueState):
+ links.append(result)
+
+ if not links:
+ return
+
+ resp = self.format_embed(links)
+ await message.channel.send(embed=resp)
+
+ async def fetch_data(self, url: str) -> tuple[dict[str], ClientResponse]:
+ """Retrieve data as a dictionary and the response in a tuple."""
+ log.trace(f"Querying GH issues API: {url}")
+ async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r:
+ return await r.json(), r
+
@github_group.command(name="user", aliases=("userinfo",))
async def github_user_info(self, ctx: commands.Context, username: str) -> None:
"""Fetches a user's GitHub information."""
async with ctx.typing():
- user_data = await self.fetch_data(f"{GITHUB_API_URL}/users/{quote_plus(username)}")
+ user_data, _ = await self.fetch_data(f"{GITHUB_API_URL}/users/{username}")
# User_data will not have a message key if the user exists
if "message" in user_data:
@@ -50,7 +243,7 @@ class GithubInfo(commands.Cog):
await ctx.send(embed=embed)
return
- org_data = await self.fetch_data(user_data["organizations_url"])
+ org_data, _ = await self.fetch_data(user_data["organizations_url"])
orgs = [f"[{org['login']}](https://github.com/{org['login']})" for org in org_data]
orgs_to_add = " | ".join(orgs)
@@ -91,10 +284,7 @@ class GithubInfo(commands.Cog):
)
if user_data["type"] == "User":
- embed.add_field(
- name="Gists",
- value=f"[{gists}](https://gist.github.com/{quote_plus(username, safe='')})"
- )
+ embed.add_field(name="Gists", value=f"[{gists}](https://gist.github.com/{quote(username, safe='')})")
embed.add_field(
name=f"Organization{'s' if len(orgs)!=1 else ''}",
@@ -123,7 +313,7 @@ class GithubInfo(commands.Cog):
return
async with ctx.typing():
- repo_data = await self.fetch_data(f"{GITHUB_API_URL}/repos/{quote(repo)}")
+ repo_data, _ = await self.fetch_data(f"{GITHUB_API_URL}/repos/{quote(repo)}")
# There won't be a message key if this repo exists
if "message" in repo_data:
diff --git a/bot/exts/utilities/issues.py b/bot/exts/utilities/issues.py
deleted file mode 100644
index b6d5a43e..00000000
--- a/bot/exts/utilities/issues.py
+++ /dev/null
@@ -1,277 +0,0 @@
-import logging
-import random
-import re
-from dataclasses import dataclass
-from typing import Optional, Union
-
-import discord
-from discord.ext import commands
-
-from bot.bot import Bot
-from bot.constants import (
- Categories, Channels, Colours, ERROR_REPLIES, Emojis, NEGATIVE_REPLIES, Tokens, WHITELISTED_CHANNELS
-)
-from bot.utils.decorators import whitelist_override
-from bot.utils.extensions import invoke_help_command
-
-log = logging.getLogger(__name__)
-
-BAD_RESPONSE = {
- 404: "Issue/pull request not located! Please enter a valid number!",
- 403: "Rate limit has been hit! Please try again later!"
-}
-REQUEST_HEADERS = {
- "Accept": "application/vnd.github.v3+json"
-}
-
-REPOSITORY_ENDPOINT = "https://api.github.com/orgs/{org}/repos?per_page=100&type=public"
-ISSUE_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/issues/{number}"
-PR_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/pulls/{number}"
-
-if GITHUB_TOKEN := Tokens.github:
- REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}"
-
-WHITELISTED_CATEGORIES = (
- Categories.development, Categories.devprojects, Categories.media, Categories.staff
-)
-
-CODE_BLOCK_RE = re.compile(
- r"^`([^`\n]+)`" # Inline codeblock
- r"|```(.+?)```", # Multiline codeblock
- re.DOTALL | re.MULTILINE
-)
-
-# Maximum number of issues in one message
-MAXIMUM_ISSUES = 5
-
-# Regex used when looking for automatic linking in messages
-# regex101 of current regex https://regex101.com/r/V2ji8M/6
-AUTOMATIC_REGEX = re.compile(
- r"((?P<org>[a-zA-Z0-9][a-zA-Z0-9\-]{1,39})\/)?(?P<repo>[\w\-\.]{1,100})#(?P<number>[0-9]+)"
-)
-
-
-@dataclass
-class FoundIssue:
- """Dataclass representing an issue found by the regex."""
-
- organisation: Optional[str]
- repository: str
- number: str
-
- def __hash__(self) -> int:
- return hash((self.organisation, self.repository, self.number))
-
-
-@dataclass
-class FetchError:
- """Dataclass representing an error while fetching an issue."""
-
- return_code: int
- message: str
-
-
-@dataclass
-class IssueState:
- """Dataclass representing the state of an issue."""
-
- repository: str
- number: int
- url: str
- title: str
- emoji: str
-
-
-class Issues(commands.Cog):
- """Cog that allows users to retrieve issues from GitHub."""
-
- def __init__(self, bot: Bot):
- self.bot = bot
- self.repos = []
-
- @staticmethod
- def remove_codeblocks(message: str) -> str:
- """Remove any codeblock in a message."""
- return re.sub(CODE_BLOCK_RE, "", message)
-
- async def fetch_issues(
- self,
- number: int,
- repository: str,
- user: str
- ) -> Union[IssueState, FetchError]:
- """
- Retrieve an issue from a GitHub repository.
-
- Returns IssueState on success, FetchError on failure.
- """
- url = ISSUE_ENDPOINT.format(user=user, repository=repository, number=number)
- pulls_url = PR_ENDPOINT.format(user=user, repository=repository, number=number)
- log.trace(f"Querying GH issues API: {url}")
-
- async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r:
- json_data = await r.json()
-
- if r.status == 403:
- if r.headers.get("X-RateLimit-Remaining") == "0":
- log.info(f"Ratelimit reached while fetching {url}")
- return FetchError(403, "Ratelimit reached, please retry in a few minutes.")
- return FetchError(403, "Cannot access issue.")
- elif r.status in (404, 410):
- return FetchError(r.status, "Issue not found.")
- elif r.status != 200:
- return FetchError(r.status, "Error while fetching issue.")
-
- # The initial API request is made to the issues API endpoint, which will return information
- # if the issue or PR is present. However, the scope of information returned for PRs differs
- # from issues: if the 'issues' key is present in the response then we can pull the data we
- # need from the initial API call.
- if "issues" in json_data["html_url"]:
- if json_data.get("state") == "open":
- emoji = Emojis.issue_open
- else:
- emoji = Emojis.issue_closed
-
- # If the 'issues' key is not contained in the API response and there is no error code, then
- # we know that a PR has been requested and a call to the pulls API endpoint is necessary
- # to get the desired information for the PR.
- else:
- log.trace(f"PR provided, querying GH pulls API for additional information: {pulls_url}")
- async with self.bot.http_session.get(pulls_url) as p:
- pull_data = await p.json()
- if pull_data["draft"]:
- emoji = Emojis.pull_request_draft
- elif pull_data["state"] == "open":
- emoji = Emojis.pull_request_open
- # When 'merged_at' is not None, this means that the state of the PR is merged
- elif pull_data["merged_at"] is not None:
- emoji = Emojis.pull_request_merged
- else:
- emoji = Emojis.pull_request_closed
-
- issue_url = json_data.get("html_url")
-
- return IssueState(repository, number, issue_url, json_data.get("title", ""), emoji)
-
- @staticmethod
- def format_embed(
- results: list[Union[IssueState, FetchError]],
- user: str,
- repository: Optional[str] = None
- ) -> discord.Embed:
- """Take a list of IssueState or FetchError and format a Discord embed for them."""
- description_list = []
-
- for result in results:
- if isinstance(result, IssueState):
- description_list.append(f"{result.emoji} [{result.title}]({result.url})")
- elif isinstance(result, FetchError):
- description_list.append(f":x: [{result.return_code}] {result.message}")
-
- resp = discord.Embed(
- colour=Colours.bright_green,
- description="\n".join(description_list)
- )
-
- embed_url = f"https://github.com/{user}/{repository}" if repository else f"https://github.com/{user}"
- resp.set_author(name="GitHub", url=embed_url)
- return resp
-
- @whitelist_override(channels=WHITELISTED_CHANNELS, categories=WHITELISTED_CATEGORIES)
- @commands.command(aliases=("issues", "pr", "prs"))
- async def issue(
- self,
- ctx: commands.Context,
- numbers: commands.Greedy[int],
- repository: str = "sir-lancebot",
- user: str = "python-discord"
- ) -> None:
- """Command to retrieve issue(s) from a GitHub repository."""
- # Remove duplicates
- numbers = set(numbers)
-
- err_message = None
- if not numbers:
- err_message = "You must have at least one issue/PR!"
-
- elif len(numbers) > MAXIMUM_ISSUES:
- err_message = f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})"
-
- # If there's an error with command invocation then send an error embed
- if err_message is not None:
- err_embed = discord.Embed(
- title=random.choice(ERROR_REPLIES),
- color=Colours.soft_red,
- description=err_message
- )
- await ctx.send(embed=err_embed)
- await invoke_help_command(ctx)
- return
-
- results = [await self.fetch_issues(number, repository, user) for number in numbers]
- await ctx.send(embed=self.format_embed(results, user, repository))
-
- @commands.Cog.listener()
- async def on_message(self, message: discord.Message) -> None:
- """
- Automatic issue linking.
-
- Listener to retrieve issue(s) from a GitHub repository using automatic linking if matching <org>/<repo>#<issue>.
- """
- # Ignore bots
- if message.author.bot:
- return
-
- issues = [
- FoundIssue(*match.group("org", "repo", "number"))
- for match in AUTOMATIC_REGEX.finditer(self.remove_codeblocks(message.content))
- ]
- links = []
-
- if issues:
- # Block this from working in DMs
- if not message.guild:
- await message.channel.send(
- embed=discord.Embed(
- title=random.choice(NEGATIVE_REPLIES),
- description=(
- "You can't retrieve issues from DMs. "
- f"Try again in <#{Channels.community_bot_commands}>"
- ),
- colour=Colours.soft_red
- )
- )
- return
-
- log.trace(f"Found {issues = }")
- # Remove duplicates
- issues = set(issues)
-
- if len(issues) > MAXIMUM_ISSUES:
- embed = discord.Embed(
- title=random.choice(ERROR_REPLIES),
- color=Colours.soft_red,
- description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})"
- )
- await message.channel.send(embed=embed, delete_after=5)
- return
-
- for repo_issue in issues:
- result = await self.fetch_issues(
- int(repo_issue.number),
- repo_issue.repository,
- repo_issue.organisation or "python-discord"
- )
- if isinstance(result, IssueState):
- links.append(result)
-
- if not links:
- return
-
- resp = self.format_embed(links, "python-discord")
- await message.channel.send(embed=resp)
-
-
-def setup(bot: Bot) -> None:
- """Load the Issues cog."""
- bot.add_cog(Issues(bot))
diff --git a/bot/exts/utilities/realpython.py b/bot/exts/utilities/realpython.py
index bf8f1341..5e9757d0 100644
--- a/bot/exts/utilities/realpython.py
+++ b/bot/exts/utilities/realpython.py
@@ -11,11 +11,10 @@ from bot.constants import Colours
logger = logging.getLogger(__name__)
-
API_ROOT = "https://realpython.com/search/api/v1/"
ARTICLE_URL = "https://realpython.com{article_url}"
SEARCH_URL = "https://realpython.com/search?q={user_search}"
-
+HOME_URL = "https://realpython.com/"
ERROR_EMBED = Embed(
title="Error while searching Real Python",
@@ -32,13 +31,22 @@ class RealPython(commands.Cog):
@commands.command(aliases=["rp"])
@commands.cooldown(1, 10, commands.cooldowns.BucketType.user)
- async def realpython(self, ctx: commands.Context, amount: Optional[int] = 5, *, user_search: str) -> None:
+ async def realpython(self, ctx: commands.Context, amount: Optional[int] = 5, *,
+ user_search: Optional[str] = None) -> None:
"""
Send some articles from RealPython that match the search terms.
- By default the top 5 matches are sent, this can be overwritten to
+ By default, the top 5 matches are sent. This can be overwritten to
a number between 1 and 5 by specifying an amount before the search query.
+ If no search query is specified by the user, the home page is sent.
"""
+ if user_search is None:
+ home_page_embed = Embed(title="Real Python Home Page", url=HOME_URL, colour=Colours.orange)
+
+ await ctx.send(embed=home_page_embed)
+
+ return
+
if not 1 <= amount <= 5:
await ctx.send("`amount` must be between 1 and 5 (inclusive).")
return
diff --git a/bot/exts/utilities/twemoji.py b/bot/exts/utilities/twemoji.py
new file mode 100644
index 00000000..c915f05b
--- /dev/null
+++ b/bot/exts/utilities/twemoji.py
@@ -0,0 +1,150 @@
+import logging
+import re
+from typing import Literal, Optional
+
+import discord
+from discord.ext import commands
+from emoji import UNICODE_EMOJI_ENGLISH, is_emoji
+
+from bot.bot import Bot
+from bot.constants import Colours, Roles
+from bot.utils.decorators import whitelist_override
+from bot.utils.extensions import invoke_help_command
+
+log = logging.getLogger(__name__)
+BASE_URLS = {
+ "png": "https://raw.githubusercontent.com/twitter/twemoji/master/assets/72x72/",
+ "svg": "https://raw.githubusercontent.com/twitter/twemoji/master/assets/svg/",
+}
+CODEPOINT_REGEX = re.compile(r"[a-f1-9][a-f0-9]{3,5}$")
+
+
+class Twemoji(commands.Cog):
+ """Utilities for working with Twemojis."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ @staticmethod
+ def get_url(codepoint: str, format: Literal["png", "svg"]) -> str:
+ """Returns a source file URL for the specified Twemoji, in the corresponding format."""
+ return f"{BASE_URLS[format]}{codepoint}.{format}"
+
+ @staticmethod
+ def alias_to_name(alias: str) -> str:
+ """
+ Transform a unicode alias to an emoji name.
+
+ Example usages:
+ >>> alias_to_name(":falling_leaf:")
+ "Falling leaf"
+ >>> alias_to_name(":family_man_girl_boy:")
+ "Family man girl boy"
+ """
+ name = alias.strip(":").replace("_", " ")
+ return name.capitalize()
+
+ @staticmethod
+ def build_embed(codepoint: str) -> discord.Embed:
+ """Returns the main embed for the `twemoji` commmand."""
+ emoji = "".join(Twemoji.emoji(e) or "" for e in codepoint.split("-"))
+
+ embed = discord.Embed(
+ title=Twemoji.alias_to_name(UNICODE_EMOJI_ENGLISH[emoji]),
+ description=f"{codepoint.replace('-', ' ')}\n[Download svg]({Twemoji.get_url(codepoint, 'svg')})",
+ colour=Colours.twitter_blue,
+ )
+ embed.set_thumbnail(url=Twemoji.get_url(codepoint, "png"))
+ return embed
+
+ @staticmethod
+ def emoji(codepoint: Optional[str]) -> Optional[str]:
+ """
+ Returns the emoji corresponding to a given `codepoint`, or `None` if no emoji was found.
+
+ The return value is an emoji character, such as "πŸ‚". The `codepoint`
+ argument can be of any format, since it will be trimmed automatically.
+ """
+ if code := Twemoji.trim_code(codepoint):
+ return chr(int(code, 16))
+
+ @staticmethod
+ def codepoint(emoji: Optional[str]) -> Optional[str]:
+ """
+ Returns the codepoint, in a trimmed format, of a single emoji.
+
+ `emoji` should be an emoji character, such as "🐍" and "πŸ₯°", and
+ not a codepoint like "1f1f8". When working with combined emojis,
+ such as "πŸ‡ΈπŸ‡ͺ" and "πŸ‘¨β€πŸ‘©β€πŸ‘¦", send the component emojis through the method
+ one at a time.
+ """
+ if emoji is None:
+ return None
+ return hex(ord(emoji)).removeprefix("0x")
+
+ @staticmethod
+ def trim_code(codepoint: Optional[str]) -> Optional[str]:
+ """
+ Returns the meaningful information from the given `codepoint`.
+
+ If no codepoint is found, `None` is returned.
+
+ Example usages:
+ >>> trim_code("U+1f1f8")
+ "1f1f8"
+ >>> trim_code("\u0001f1f8")
+ "1f1f8"
+ >>> trim_code("1f466")
+ "1f466"
+ """
+ if code := CODEPOINT_REGEX.search(codepoint or ""):
+ return code.group()
+
+ @staticmethod
+ def codepoint_from_input(raw_emoji: tuple[str, ...]) -> str:
+ """
+ Returns the codepoint corresponding to the passed tuple, separated by "-".
+
+ The return format matches the format used in URLs for Twemoji source files.
+
+ Example usages:
+ >>> codepoint_from_input(("🐍",))
+ "1f40d"
+ >>> codepoint_from_input(("1f1f8", "1f1ea"))
+ "1f1f8-1f1ea"
+ >>> codepoint_from_input(("πŸ‘¨β€πŸ‘§β€πŸ‘¦",))
+ "1f468-200d-1f467-200d-1f466"
+ """
+ raw_emoji = [emoji.lower() for emoji in raw_emoji]
+ if is_emoji(raw_emoji[0]):
+ emojis = (Twemoji.codepoint(emoji) or "" for emoji in raw_emoji[0])
+ return "-".join(emojis)
+
+ emoji = "".join(
+ Twemoji.emoji(Twemoji.trim_code(code)) or "" for code in raw_emoji
+ )
+ if is_emoji(emoji):
+ return "-".join(Twemoji.codepoint(e) or "" for e in emoji)
+
+ raise ValueError("No codepoint could be obtained from the given input")
+
+ @commands.command(aliases=("tw",))
+ @whitelist_override(roles=(Roles.everyone,))
+ async def twemoji(self, ctx: commands.Context, *raw_emoji: str) -> None:
+ """Sends a preview of a given Twemoji, specified by codepoint or emoji."""
+ if len(raw_emoji) == 0:
+ await invoke_help_command(ctx)
+ return
+ try:
+ codepoint = self.codepoint_from_input(raw_emoji)
+ except ValueError:
+ raise commands.BadArgument(
+ "please include a valid emoji or emoji codepoint."
+ )
+
+ await ctx.send(embed=self.build_embed(codepoint))
+
+
+def setup(bot: Bot) -> None:
+ """Load the Twemoji cog."""
+ bot.add_cog(Twemoji(bot))