aboutsummaryrefslogtreecommitdiffstats
path: root/bot/exts/utilities
diff options
context:
space:
mode:
authorGravatar Senjan21 <[email protected]>2022-05-09 18:21:21 +0200
committerGravatar GitHub <[email protected]>2022-05-09 18:21:21 +0200
commit019983c3785191a0c7182c62394cec2bac123d51 (patch)
tree54c296bd04a13a0097cbd7249b78a6716556d735 /bot/exts/utilities
parentDoublefixed indentation and removed unused import. (diff)
parentBump pillow from 9.0.0 to 9.0.1 (#1045) (diff)
Merge branch 'main' into uwu
Diffstat (limited to 'bot/exts/utilities')
-rw-r--r--bot/exts/utilities/bookmark.py13
-rw-r--r--bot/exts/utilities/challenges.py341
-rw-r--r--bot/exts/utilities/colour.py266
-rw-r--r--bot/exts/utilities/conversationstarters.py91
-rw-r--r--bot/exts/utilities/emoji.py6
-rw-r--r--bot/exts/utilities/epoch.py138
-rw-r--r--bot/exts/utilities/githubinfo.py220
-rw-r--r--bot/exts/utilities/issues.py275
-rw-r--r--bot/exts/utilities/latex.py101
-rw-r--r--bot/exts/utilities/realpython.py16
-rw-r--r--bot/exts/utilities/reddit.py10
-rw-r--r--bot/exts/utilities/twemoji.py150
-rw-r--r--bot/exts/utilities/wikipedia.py6
-rw-r--r--bot/exts/utilities/wtf_python.py138
14 files changed, 1343 insertions, 428 deletions
diff --git a/bot/exts/utilities/bookmark.py b/bot/exts/utilities/bookmark.py
index a91ef1c0..b50205a0 100644
--- a/bot/exts/utilities/bookmark.py
+++ b/bot/exts/utilities/bookmark.py
@@ -7,7 +7,7 @@ import discord
from discord.ext import commands
from bot.bot import Bot
-from bot.constants import Categories, Colours, ERROR_REPLIES, Icons, WHITELISTED_CHANNELS
+from bot.constants import Colours, ERROR_REPLIES, Icons, Roles
from bot.utils.converters import WrappedMessageConverter
from bot.utils.decorators import whitelist_override
@@ -16,7 +16,6 @@ log = logging.getLogger(__name__)
# Number of seconds to wait for other users to bookmark the same message
TIMEOUT = 120
BOOKMARK_EMOJI = "πŸ“Œ"
-WHITELISTED_CATEGORIES = (Categories.help_in_use,)
class Bookmark(commands.Cog):
@@ -87,8 +86,8 @@ class Bookmark(commands.Cog):
await message.add_reaction(BOOKMARK_EMOJI)
return message
- @whitelist_override(channels=WHITELISTED_CHANNELS, categories=WHITELISTED_CATEGORIES)
@commands.command(name="bookmark", aliases=("bm", "pin"))
+ @whitelist_override(roles=(Roles.everyone,))
async def bookmark(
self,
ctx: commands.Context,
@@ -99,7 +98,13 @@ class Bookmark(commands.Cog):
"""Send the author a link to `target_message` via DMs."""
if not target_message:
if not ctx.message.reference:
- raise commands.UserInputError("You must either provide a valid message to bookmark, or reply to one.")
+ raise commands.UserInputError(
+ "You must either provide a valid message to bookmark, or reply to one."
+ "\n\nThe lookup strategy for a message is as follows (in order):"
+ "\n1. Lookup by '{channel ID}-{message ID}' (retrieved by shift-clicking on 'Copy ID')"
+ "\n2. Lookup by message ID (the message **must** be in the context channel)"
+ "\n3. Lookup by message URL"
+ )
target_message = ctx.message.reference.resolved
# Prevent users from bookmarking a message in a channel they don't have access to
diff --git a/bot/exts/utilities/challenges.py b/bot/exts/utilities/challenges.py
new file mode 100644
index 00000000..ab7ae442
--- /dev/null
+++ b/bot/exts/utilities/challenges.py
@@ -0,0 +1,341 @@
+import logging
+from asyncio import to_thread
+from random import choice
+from typing import Union
+
+from bs4 import BeautifulSoup
+from discord import Embed, Interaction, SelectOption, ui
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.constants import Colours, Emojis, NEGATIVE_REPLIES
+
+log = logging.getLogger(__name__)
+API_ROOT = "https://www.codewars.com/api/v1/code-challenges/{kata_id}"
+
+# Map difficulty for the kata to color we want to display in the embed.
+# These colors are representative of the colors that each kyu's level represents on codewars.com
+MAPPING_OF_KYU = {
+ 8: 0xdddbda, 7: 0xdddbda, 6: 0xecb613, 5: 0xecb613,
+ 4: 0x3c7ebb, 3: 0x3c7ebb, 2: 0x866cc7, 1: 0x866cc7
+}
+
+# Supported languages for a kata on codewars.com
+SUPPORTED_LANGUAGES = {
+ "stable": [
+ "c", "c#", "c++", "clojure", "coffeescript", "coq", "crystal", "dart", "elixir",
+ "f#", "go", "groovy", "haskell", "java", "javascript", "kotlin", "lean", "lua", "nasm",
+ "php", "python", "racket", "ruby", "rust", "scala", "shell", "sql", "swift", "typescript"
+ ],
+ "beta": [
+ "agda", "bf", "cfml", "cobol", "commonlisp", "elm", "erlang", "factor",
+ "forth", "fortran", "haxe", "idris", "julia", "nim", "objective-c", "ocaml",
+ "pascal", "perl", "powershell", "prolog", "purescript", "r", "raku", "reason", "solidity", "vb.net"
+ ]
+}
+
+
+class InformationDropdown(ui.Select):
+ """A dropdown inheriting from ui.Select that allows finding out other information about the kata."""
+
+ def __init__(self, language_embed: Embed, tags_embed: Embed, other_info_embed: Embed, main_embed: Embed):
+ options = [
+ SelectOption(
+ label="Main Information",
+ description="See the kata's difficulty, description, etc.",
+ emoji="🌎"
+ ),
+ SelectOption(
+ label="Languages",
+ description="See what languages this kata supports!",
+ emoji=Emojis.reddit_post_text
+ ),
+ SelectOption(
+ label="Tags",
+ description="See what categories this kata falls under!",
+ emoji=Emojis.stackoverflow_tag
+ ),
+ SelectOption(
+ label="Other Information",
+ description="See how other people performed on this kata and more!",
+ emoji="β„Ή"
+ )
+ ]
+
+ # We map the option label to the embed instance so that it can be easily looked up later in O(1)
+ self.mapping_of_embeds = {
+ "Main Information": main_embed,
+ "Languages": language_embed,
+ "Tags": tags_embed,
+ "Other Information": other_info_embed,
+ }
+
+ super().__init__(
+ placeholder="See more information regarding this kata",
+ min_values=1,
+ max_values=1,
+ options=options
+ )
+
+ async def callback(self, interaction: Interaction) -> None:
+ """Callback for when someone clicks on a dropdown."""
+ # Edit the message to the embed selected in the option
+ # The `original_message` attribute is set just after the message is sent with the view.
+ # The attribute is not set during initialization.
+ result_embed = self.mapping_of_embeds[self.values[0]]
+ await self.original_message.edit(embed=result_embed)
+
+
+class Challenges(commands.Cog):
+ """
+ Cog for the challenge command.
+
+ The challenge command pulls a random kata from codewars.com.
+ A kata is the name for a challenge, specific to codewars.com.
+
+ The challenge command also has filters to customize the kata that is given.
+ You can specify the language the kata should be from, difficulty and topic of the kata.
+ """
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ async def kata_id(self, search_link: str, params: dict) -> Union[str, Embed]:
+ """
+ Uses bs4 to get the HTML code for the page of katas, where the page is the link of the formatted `search_link`.
+
+ This will webscrape the search page with `search_link` and then get the ID of a kata for the
+ codewars.com API to use.
+ """
+ async with self.bot.http_session.get(search_link, params=params) as response:
+ if response.status != 200:
+ error_embed = Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="We ran into an error when getting the kata from codewars.com, try again later.",
+ color=Colours.soft_red
+ )
+ log.error(f"Unexpected response from codewars.com, status code: {response.status}")
+ return error_embed
+
+ soup = BeautifulSoup(await response.text(), features="lxml")
+ first_kata_div = await to_thread(soup.find_all, "div", class_="item-title px-0")
+
+ if not first_kata_div:
+ raise commands.BadArgument("No katas could be found with the filters provided.")
+ elif len(first_kata_div) >= 3:
+ first_kata_div = choice(first_kata_div[:3])
+ elif "q=" not in search_link:
+ first_kata_div = choice(first_kata_div)
+ else:
+ first_kata_div = first_kata_div[0]
+
+ # There are numerous divs before arriving at the id of the kata, which can be used for the link.
+ first_kata_id = first_kata_div.a["href"].split("/")[-1]
+ return first_kata_id
+
+ async def kata_information(self, kata_id: str) -> Union[dict, Embed]:
+ """
+ Returns the information about the Kata.
+
+ Uses the codewars.com API to get information about the kata using `kata_id`.
+ """
+ async with self.bot.http_session.get(API_ROOT.format(kata_id=kata_id)) as response:
+ if response.status != 200:
+ error_embed = Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="We ran into an error when getting the kata information, try again later.",
+ color=Colours.soft_red
+ )
+ log.error(f"Unexpected response from codewars.com/api/v1, status code: {response.status}")
+ return error_embed
+
+ return await response.json()
+
+ @staticmethod
+ def main_embed(kata_information: dict) -> Embed:
+ """Creates the main embed which displays the name, difficulty and description of the kata."""
+ kata_description = kata_information["description"]
+ kata_url = f"https://codewars.com/kata/{kata_information['id']}"
+
+ # Ensuring it isn't over the length 1024
+ if len(kata_description) > 1024:
+ kata_description = "\n".join(kata_description[:1000].split("\n")[:-1]) + "..."
+ kata_description += f" [continue reading]({kata_url})"
+
+ if kata_information["rank"]["name"] is None:
+ embed_color = 8
+ kata_difficulty = "Unable to retrieve difficulty for beta languages."
+ else:
+ embed_color = int(kata_information["rank"]["name"].replace(" kyu", ""))
+ kata_difficulty = kata_information["rank"]["name"]
+
+ kata_embed = Embed(
+ title=kata_information["name"],
+ description=kata_description,
+ color=MAPPING_OF_KYU[embed_color],
+ url=kata_url
+ )
+ kata_embed.add_field(name="Difficulty", value=kata_difficulty, inline=False)
+ return kata_embed
+
+ @staticmethod
+ def language_embed(kata_information: dict) -> Embed:
+ """Creates the 'language embed' which displays all languages the kata supports."""
+ kata_url = f"https://codewars.com/kata/{kata_information['id']}"
+
+ languages = "\n".join(map(str.title, kata_information["languages"]))
+ language_embed = Embed(
+ title=kata_information["name"],
+ description=f"```yaml\nSupported Languages:\n{languages}\n```",
+ color=Colours.python_blue,
+ url=kata_url
+ )
+ return language_embed
+
+ @staticmethod
+ def tags_embed(kata_information: dict) -> Embed:
+ """
+ Creates the 'tags embed' which displays all the tags of the Kata.
+
+ Tags explain what the kata is about, this is what codewars.com calls categories.
+ """
+ kata_url = f"https://codewars.com/kata/{kata_information['id']}"
+
+ tags = "\n".join(kata_information["tags"])
+ tags_embed = Embed(
+ title=kata_information["name"],
+ description=f"```yaml\nTags:\n{tags}\n```",
+ color=Colours.grass_green,
+ url=kata_url
+ )
+ return tags_embed
+
+ @staticmethod
+ def miscellaneous_embed(kata_information: dict) -> Embed:
+ """
+ Creates the 'other information embed' which displays miscellaneous information about the kata.
+
+ This embed shows statistics such as the total number of people who completed the kata,
+ the total number of stars of the kata, etc.
+ """
+ kata_url = f"https://codewars.com/kata/{kata_information['id']}"
+
+ embed = Embed(
+ title=kata_information["name"],
+ description="```nim\nOther Information\n```",
+ color=Colours.grass_green,
+ url=kata_url
+ )
+ embed.add_field(
+ name="`Total Score`",
+ value=f"```css\n{kata_information['voteScore']}\n```",
+ inline=False
+ )
+ embed.add_field(
+ name="`Total Stars`",
+ value=f"```css\n{kata_information['totalStars']}\n```",
+ inline=False
+ )
+ embed.add_field(
+ name="`Total Completed`",
+ value=f"```css\n{kata_information['totalCompleted']}\n```",
+ inline=False
+ )
+ embed.add_field(
+ name="`Total Attempts`",
+ value=f"```css\n{kata_information['totalAttempts']}\n```",
+ inline=False
+ )
+ return embed
+
+ @staticmethod
+ def create_view(dropdown: InformationDropdown, link: str) -> ui.View:
+ """
+ Creates the discord.py View for the Discord message components (dropdowns and buttons).
+
+ The discord UI is implemented onto the embed, where the user can choose what information about the kata they
+ want, along with a link button to the kata itself.
+ """
+ view = ui.View()
+ view.add_item(dropdown)
+ view.add_item(ui.Button(label="View the Kata", url=link))
+ return view
+
+ @commands.command(aliases=["kata"])
+ @commands.cooldown(1, 5, commands.BucketType.user)
+ async def challenge(self, ctx: commands.Context, language: str = "python", *, query: str = None) -> None:
+ """
+ The challenge command pulls a random kata (challenge) from codewars.com.
+
+ The different ways to use this command are:
+ `.challenge <language>` - Pulls a random challenge within that language's scope.
+ `.challenge <language> <difficulty>` - The difficulty can be from 1-8,
+ 1 being the hardest, 8 being the easiest. This pulls a random challenge within that difficulty & language.
+ `.challenge <language> <query>` - Pulls a random challenge with the query provided under the language
+ `.challenge <language> <query>, <difficulty>` - Pulls a random challenge with the query provided,
+ under that difficulty within the language's scope.
+ """
+ language = language.lower()
+ if language not in SUPPORTED_LANGUAGES["stable"] + SUPPORTED_LANGUAGES["beta"]:
+ raise commands.BadArgument("This is not a recognized language on codewars.com!")
+
+ get_kata_link = f"https://codewars.com/kata/search/{language}"
+ params = {}
+
+ if query is not None:
+ if "," in query:
+ query_splitted = query.split("," if ", " not in query else ", ")
+
+ if len(query_splitted) > 2:
+ raise commands.BadArgument(
+ "There can only be one comma within the query, separating the difficulty and the query itself."
+ )
+
+ query, level = query_splitted
+ params["q"] = query
+ params["r[]"] = f"-{level}"
+ elif query.isnumeric():
+ params["r[]"] = f"-{query}"
+ else:
+ params["q"] = query
+
+ params["beta"] = str(language in SUPPORTED_LANGUAGES["beta"]).lower()
+
+ first_kata_id = await self.kata_id(get_kata_link, params)
+ if isinstance(first_kata_id, Embed):
+ # We ran into an error when retrieving the website link
+ await ctx.send(embed=first_kata_id)
+ return
+
+ kata_information = await self.kata_information(first_kata_id)
+ if isinstance(kata_information, Embed):
+ # Something went wrong when trying to fetch the kata information
+ await ctx.d(embed=kata_information)
+ return
+
+ kata_embed = self.main_embed(kata_information)
+ language_embed = self.language_embed(kata_information)
+ tags_embed = self.tags_embed(kata_information)
+ miscellaneous_embed = self.miscellaneous_embed(kata_information)
+
+ dropdown = InformationDropdown(
+ main_embed=kata_embed,
+ language_embed=language_embed,
+ tags_embed=tags_embed,
+ other_info_embed=miscellaneous_embed
+ )
+ kata_view = self.create_view(dropdown, f"https://codewars.com/kata/{first_kata_id}")
+ original_message = await ctx.send(
+ embed=kata_embed,
+ view=kata_view
+ )
+ dropdown.original_message = original_message
+
+ wait_for_kata = await kata_view.wait()
+ if wait_for_kata:
+ await original_message.edit(embed=kata_embed, view=None)
+
+
+def setup(bot: Bot) -> None:
+ """Load the Challenges cog."""
+ bot.add_cog(Challenges(bot))
diff --git a/bot/exts/utilities/colour.py b/bot/exts/utilities/colour.py
new file mode 100644
index 00000000..ee6bad93
--- /dev/null
+++ b/bot/exts/utilities/colour.py
@@ -0,0 +1,266 @@
+import colorsys
+import json
+import pathlib
+import random
+import string
+from io import BytesIO
+from typing import Optional
+
+import discord
+import rapidfuzz
+from PIL import Image, ImageColor
+from discord.ext import commands
+
+from bot import constants
+from bot.bot import Bot
+from bot.exts.core.extensions import invoke_help_command
+from bot.utils.decorators import whitelist_override
+
+THUMBNAIL_SIZE = (80, 80)
+
+
+class Colour(commands.Cog):
+ """Cog for the Colour command."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+ with open(pathlib.Path("bot/resources/utilities/ryanzec_colours.json")) as f:
+ self.colour_mapping = json.load(f)
+ del self.colour_mapping['_'] # Delete source credit entry
+
+ async def send_colour_response(self, ctx: commands.Context, rgb: tuple[int, int, int]) -> None:
+ """Create and send embed from user given colour information."""
+ name = self._rgb_to_name(rgb)
+ try:
+ colour_or_color = ctx.invoked_parents[0]
+ except IndexError:
+ colour_or_color = "colour"
+
+ colour_mode = ctx.invoked_with
+ if colour_mode == "random":
+ colour_mode = colour_or_color
+ input_colour = name
+ elif colour_mode in ("colour", "color"):
+ input_colour = ctx.kwargs["colour_input"]
+ elif colour_mode == "name":
+ input_colour = ctx.kwargs["user_colour_name"]
+ elif colour_mode == "hex":
+ input_colour = ctx.args[2:][0]
+ if len(input_colour) > 7:
+ input_colour = input_colour[0:-2]
+ else:
+ input_colour = tuple(ctx.args[2:])
+
+ if colour_mode not in ("name", "hex", "random", "color", "colour"):
+ colour_mode = colour_mode.upper()
+ else:
+ colour_mode = colour_mode.title()
+
+ colour_embed = discord.Embed(
+ title=f"{name or input_colour}",
+ description=f"{colour_or_color.title()} information for {colour_mode} `{input_colour or name}`.",
+ colour=discord.Color.from_rgb(*rgb)
+ )
+ colour_conversions = self.get_colour_conversions(rgb)
+ for colour_space, value in colour_conversions.items():
+ colour_embed.add_field(
+ name=colour_space,
+ value=f"`{value}`",
+ inline=True
+ )
+
+ thumbnail = Image.new("RGB", THUMBNAIL_SIZE, color=rgb)
+ buffer = BytesIO()
+ thumbnail.save(buffer, "PNG")
+ buffer.seek(0)
+ thumbnail_file = discord.File(buffer, filename="colour.png")
+
+ colour_embed.set_thumbnail(url="attachment://colour.png")
+
+ await ctx.send(file=thumbnail_file, embed=colour_embed)
+
+ @commands.group(aliases=("color",), invoke_without_command=True)
+ @whitelist_override(
+ channels=constants.WHITELISTED_CHANNELS,
+ roles=constants.STAFF_ROLES,
+ categories=[constants.Categories.development, constants.Categories.media]
+ )
+ async def colour(self, ctx: commands.Context, *, colour_input: Optional[str] = None) -> None:
+ """
+ Create an embed that displays colour information.
+
+ If no subcommand is called, a randomly selected colour will be shown.
+ """
+ if colour_input is None:
+ await self.random(ctx)
+ return
+
+ try:
+ extra_colour = ImageColor.getrgb(colour_input)
+ await self.send_colour_response(ctx, extra_colour)
+ except ValueError:
+ await invoke_help_command(ctx)
+
+ @colour.command()
+ async def rgb(self, ctx: commands.Context, red: int, green: int, blue: int) -> None:
+ """Create an embed from an RGB input."""
+ if any(c not in range(256) for c in (red, green, blue)):
+ raise commands.BadArgument(
+ message=f"RGB values can only be from 0 to 255. User input was: `{red, green, blue}`."
+ )
+ rgb_tuple = (red, green, blue)
+ await self.send_colour_response(ctx, rgb_tuple)
+
+ @colour.command()
+ async def hsv(self, ctx: commands.Context, hue: int, saturation: int, value: int) -> None:
+ """Create an embed from an HSV input."""
+ if (hue not in range(361)) or any(c not in range(101) for c in (saturation, value)):
+ raise commands.BadArgument(
+ message="Hue can only be from 0 to 360. Saturation and Value can only be from 0 to 100. "
+ f"User input was: `{hue, saturation, value}`."
+ )
+ hsv_tuple = ImageColor.getrgb(f"hsv({hue}, {saturation}%, {value}%)")
+ await self.send_colour_response(ctx, hsv_tuple)
+
+ @colour.command()
+ async def hsl(self, ctx: commands.Context, hue: int, saturation: int, lightness: int) -> None:
+ """Create an embed from an HSL input."""
+ if (hue not in range(361)) or any(c not in range(101) for c in (saturation, lightness)):
+ raise commands.BadArgument(
+ message="Hue can only be from 0 to 360. Saturation and Lightness can only be from 0 to 100. "
+ f"User input was: `{hue, saturation, lightness}`."
+ )
+ hsl_tuple = ImageColor.getrgb(f"hsl({hue}, {saturation}%, {lightness}%)")
+ await self.send_colour_response(ctx, hsl_tuple)
+
+ @colour.command()
+ async def cmyk(self, ctx: commands.Context, cyan: int, magenta: int, yellow: int, key: int) -> None:
+ """Create an embed from a CMYK input."""
+ if any(c not in range(101) for c in (cyan, magenta, yellow, key)):
+ raise commands.BadArgument(
+ message=f"CMYK values can only be from 0 to 100. User input was: `{cyan, magenta, yellow, key}`."
+ )
+ r = round(255 * (1 - (cyan / 100)) * (1 - (key / 100)))
+ g = round(255 * (1 - (magenta / 100)) * (1 - (key / 100)))
+ b = round(255 * (1 - (yellow / 100)) * (1 - (key / 100)))
+ await self.send_colour_response(ctx, (r, g, b))
+
+ @colour.command()
+ async def hex(self, ctx: commands.Context, hex_code: str) -> None:
+ """Create an embed from a HEX input."""
+ if hex_code[0] != "#":
+ hex_code = f"#{hex_code}"
+
+ if len(hex_code) not in (4, 5, 7, 9) or any(digit not in string.hexdigits for digit in hex_code[1:]):
+ raise commands.BadArgument(
+ message=f"Cannot convert `{hex_code}` to a recognizable Hex format. "
+ "Hex values must be hexadecimal and take the form *#RRGGBB* or *#RGB*."
+ )
+
+ hex_tuple = ImageColor.getrgb(hex_code)
+ if len(hex_tuple) == 4:
+ hex_tuple = hex_tuple[:-1] # Colour must be RGB. If RGBA, we remove the alpha value
+ await self.send_colour_response(ctx, hex_tuple)
+
+ @colour.command()
+ async def name(self, ctx: commands.Context, *, user_colour_name: str) -> None:
+ """Create an embed from a name input."""
+ hex_colour = self.match_colour_name(ctx, user_colour_name)
+ if hex_colour is None:
+ name_error_embed = discord.Embed(
+ title="No colour match found.",
+ description=f"No colour found for: `{user_colour_name}`",
+ colour=discord.Color.dark_red()
+ )
+ await ctx.send(embed=name_error_embed)
+ return
+ hex_tuple = ImageColor.getrgb(hex_colour)
+ await self.send_colour_response(ctx, hex_tuple)
+
+ @colour.command()
+ async def random(self, ctx: commands.Context) -> None:
+ """Create an embed from a randomly chosen colour."""
+ hex_colour = random.choice(list(self.colour_mapping.values()))
+ hex_tuple = ImageColor.getrgb(f"#{hex_colour}")
+ await self.send_colour_response(ctx, hex_tuple)
+
+ def get_colour_conversions(self, rgb: tuple[int, int, int]) -> dict[str, str]:
+ """Create a dictionary mapping of colour types and their values."""
+ colour_name = self._rgb_to_name(rgb)
+ if colour_name is None:
+ colour_name = "No match found"
+ return {
+ "RGB": rgb,
+ "HSV": self._rgb_to_hsv(rgb),
+ "HSL": self._rgb_to_hsl(rgb),
+ "CMYK": self._rgb_to_cmyk(rgb),
+ "Hex": self._rgb_to_hex(rgb),
+ "Name": colour_name
+ }
+
+ @staticmethod
+ def _rgb_to_hsv(rgb: tuple[int, int, int]) -> tuple[int, int, int]:
+ """Convert RGB values to HSV values."""
+ rgb_list = [val / 255 for val in rgb]
+ h, s, v = colorsys.rgb_to_hsv(*rgb_list)
+ hsv = (round(h * 360), round(s * 100), round(v * 100))
+ return hsv
+
+ @staticmethod
+ def _rgb_to_hsl(rgb: tuple[int, int, int]) -> tuple[int, int, int]:
+ """Convert RGB values to HSL values."""
+ rgb_list = [val / 255.0 for val in rgb]
+ h, l, s = colorsys.rgb_to_hls(*rgb_list)
+ hsl = (round(h * 360), round(s * 100), round(l * 100))
+ return hsl
+
+ @staticmethod
+ def _rgb_to_cmyk(rgb: tuple[int, int, int]) -> tuple[int, int, int, int]:
+ """Convert RGB values to CMYK values."""
+ rgb_list = [val / 255.0 for val in rgb]
+ if not any(rgb_list):
+ return 0, 0, 0, 100
+ k = 1 - max(rgb_list)
+ c = round((1 - rgb_list[0] - k) * 100 / (1 - k))
+ m = round((1 - rgb_list[1] - k) * 100 / (1 - k))
+ y = round((1 - rgb_list[2] - k) * 100 / (1 - k))
+ cmyk = (c, m, y, round(k * 100))
+ return cmyk
+
+ @staticmethod
+ def _rgb_to_hex(rgb: tuple[int, int, int]) -> str:
+ """Convert RGB values to HEX code."""
+ hex_ = "".join([hex(val)[2:].zfill(2) for val in rgb])
+ hex_code = f"#{hex_}".upper()
+ return hex_code
+
+ def _rgb_to_name(self, rgb: tuple[int, int, int]) -> Optional[str]:
+ """Convert RGB values to a fuzzy matched name."""
+ input_hex_colour = self._rgb_to_hex(rgb)
+ try:
+ match, certainty, _ = rapidfuzz.process.extractOne(
+ query=input_hex_colour,
+ choices=self.colour_mapping.values(),
+ score_cutoff=80
+ )
+ colour_name = [name for name, hex_code in self.colour_mapping.items() if hex_code == match][0]
+ except TypeError:
+ colour_name = None
+ return colour_name
+
+ def match_colour_name(self, ctx: commands.Context, input_colour_name: str) -> Optional[str]:
+ """Convert a colour name to HEX code."""
+ try:
+ match, certainty, _ = rapidfuzz.process.extractOne(
+ query=input_colour_name,
+ choices=self.colour_mapping.keys(),
+ score_cutoff=80
+ )
+ except (ValueError, TypeError):
+ return
+ return f"#{self.colour_mapping[match]}"
+
+
+def setup(bot: Bot) -> None:
+ """Load the Colour cog."""
+ bot.add_cog(Colour(bot))
diff --git a/bot/exts/utilities/conversationstarters.py b/bot/exts/utilities/conversationstarters.py
index dd537022..8bf2abfd 100644
--- a/bot/exts/utilities/conversationstarters.py
+++ b/bot/exts/utilities/conversationstarters.py
@@ -1,11 +1,15 @@
+import asyncio
+from contextlib import suppress
+from functools import partial
from pathlib import Path
+from typing import Union
+import discord
import yaml
-from discord import Color, Embed
from discord.ext import commands
from bot.bot import Bot
-from bot.constants import WHITELISTED_CHANNELS
+from bot.constants import MODERATION_ROLES, WHITELISTED_CHANNELS
from bot.utils.decorators import whitelist_override
from bot.utils.randomization import RandomCycle
@@ -35,35 +39,88 @@ TOPICS = {
class ConvoStarters(commands.Cog):
"""General conversation topics."""
- @commands.command()
- @whitelist_override(channels=ALL_ALLOWED_CHANNELS)
- async def topic(self, ctx: commands.Context) -> None:
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ @staticmethod
+ def _build_topic_embed(channel_id: int) -> discord.Embed:
"""
- Responds with a random topic to start a conversation.
+ Build an embed containing a conversation topic.
If in a Python channel, a python-related topic will be given.
-
Otherwise, a random conversation topic will be received by the user.
"""
# No matter what, the form will be shown.
- embed = Embed(description=f"Suggest more topics [here]({SUGGESTION_FORM})!", color=Color.blurple())
+ embed = discord.Embed(
+ description=f"Suggest more topics [here]({SUGGESTION_FORM})!",
+ color=discord.Colour.og_blurple()
+ )
try:
- # Fetching topics.
- channel_topics = TOPICS[ctx.channel.id]
-
- # If the channel isn't Python-related.
+ channel_topics = TOPICS[channel_id]
except KeyError:
+ # Channel doesn't have any topics.
embed.title = f"**{next(TOPICS['default'])}**"
-
- # If the channel ID doesn't have any topics.
else:
embed.title = f"**{next(channel_topics)}**"
+ return embed
+
+ @staticmethod
+ def _predicate(
+ command_invoker: Union[discord.User, discord.Member],
+ message: discord.Message,
+ reaction: discord.Reaction,
+ user: discord.User
+ ) -> bool:
+ user_is_moderator = any(role.id in MODERATION_ROLES for role in getattr(user, "roles", []))
+ user_is_invoker = user.id == command_invoker.id
+
+ is_right_reaction = all((
+ reaction.message.id == message.id,
+ str(reaction.emoji) == "πŸ”„",
+ user_is_moderator or user_is_invoker
+ ))
+ return is_right_reaction
+
+ async def _listen_for_refresh(
+ self,
+ command_invoker: Union[discord.User, discord.Member],
+ message: discord.Message
+ ) -> None:
+ await message.add_reaction("πŸ”„")
+ while True:
+ try:
+ reaction, user = await self.bot.wait_for(
+ "reaction_add",
+ check=partial(self._predicate, command_invoker, message),
+ timeout=60.0
+ )
+ except asyncio.TimeoutError:
+ with suppress(discord.NotFound):
+ await message.clear_reaction("πŸ”„")
+ break
+
+ try:
+ await message.edit(embed=self._build_topic_embed(message.channel.id))
+ except discord.NotFound:
+ break
+
+ with suppress(discord.NotFound):
+ await message.remove_reaction(reaction, user)
- finally:
- await ctx.send(embed=embed)
+ @commands.command()
+ @commands.cooldown(1, 60*2, commands.BucketType.channel)
+ @whitelist_override(channels=ALL_ALLOWED_CHANNELS)
+ async def topic(self, ctx: commands.Context) -> None:
+ """
+ Responds with a random topic to start a conversation.
+
+ Allows the refresh of a topic by pressing an emoji.
+ """
+ message = await ctx.send(embed=self._build_topic_embed(ctx.channel.id))
+ self.bot.loop.create_task(self._listen_for_refresh(ctx.author, message))
def setup(bot: Bot) -> None:
"""Load the ConvoStarters cog."""
- bot.add_cog(ConvoStarters())
+ bot.add_cog(ConvoStarters(bot))
diff --git a/bot/exts/utilities/emoji.py b/bot/exts/utilities/emoji.py
index 55d6b8e9..fa438d7f 100644
--- a/bot/exts/utilities/emoji.py
+++ b/bot/exts/utilities/emoji.py
@@ -107,11 +107,11 @@ class Emojis(commands.Cog):
title=f"Emoji Information: {emoji.name}",
description=textwrap.dedent(f"""
**Name:** {emoji.name}
- **Created:** {time_since(emoji.created_at, precision="hours")}
- **Date:** {datetime.strftime(emoji.created_at, "%d/%m/%Y")}
+ **Created:** {time_since(emoji.created_at.replace(tzinfo=None), precision="hours")}
+ **Date:** {datetime.strftime(emoji.created_at.replace(tzinfo=None), "%d/%m/%Y")}
**ID:** {emoji.id}
"""),
- color=Color.blurple(),
+ color=Color.og_blurple(),
url=str(emoji.url),
).set_thumbnail(url=emoji.url)
diff --git a/bot/exts/utilities/epoch.py b/bot/exts/utilities/epoch.py
new file mode 100644
index 00000000..42312dd1
--- /dev/null
+++ b/bot/exts/utilities/epoch.py
@@ -0,0 +1,138 @@
+from typing import Optional, Union
+
+import arrow
+import discord
+from dateutil import parser
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.utils.extensions import invoke_help_command
+
+# https://discord.com/developers/docs/reference#message-formatting-timestamp-styles
+STYLES = {
+ "Epoch": ("",),
+ "Short Time": ("t", "h:mm A",),
+ "Long Time": ("T", "h:mm:ss A"),
+ "Short Date": ("d", "MM/DD/YYYY"),
+ "Long Date": ("D", "MMMM D, YYYY"),
+ "Short Date/Time": ("f", "MMMM D, YYYY h:mm A"),
+ "Long Date/Time": ("F", "dddd, MMMM D, YYYY h:mm A"),
+ "Relative Time": ("R",)
+}
+DROPDOWN_TIMEOUT = 60
+
+
+class DateString(commands.Converter):
+ """Convert a relative or absolute date/time string to an arrow.Arrow object."""
+
+ async def convert(self, ctx: commands.Context, argument: str) -> Union[arrow.Arrow, Optional[tuple]]:
+ """
+ Convert a relative or absolute date/time string to an arrow.Arrow object.
+
+ Try to interpret the date string as a relative time. If conversion fails, try to interpret it as an absolute
+ time. Tokens that are not recognised are returned along with the part of the string that was successfully
+ converted to an arrow object. If the date string cannot be parsed, BadArgument is raised.
+ """
+ try:
+ return arrow.utcnow().dehumanize(argument)
+ except (ValueError, OverflowError):
+ try:
+ dt, ignored_tokens = parser.parse(argument, fuzzy_with_tokens=True)
+ except parser.ParserError:
+ raise commands.BadArgument(f"`{argument}` Could not be parsed to a relative or absolute date.")
+ except OverflowError:
+ raise commands.BadArgument(f"`{argument}` Results in a date outside of the supported range.")
+ return arrow.get(dt), ignored_tokens
+
+
+class Epoch(commands.Cog):
+ """Convert an entered time and date to a unix timestamp."""
+
+ @commands.command(name="epoch")
+ async def epoch(self, ctx: commands.Context, *, date_time: DateString = None) -> None:
+ """
+ Convert an entered date/time string to the equivalent epoch.
+
+ **Relative time**
+ Must begin with `in...` or end with `...ago`.
+ Accepted units: "seconds", "minutes", "hours", "days", "weeks", "months", "years".
+ eg `.epoch in a month 4 days and 2 hours`
+
+ **Absolute time**
+ eg `.epoch 2022/6/15 16:43 -04:00`
+ Absolute times must be entered in descending orders of magnitude.
+ If AM or PM is left unspecified, the 24-hour clock is assumed.
+ Timezones are optional, and will default to UTC. The following timezone formats are accepted:
+ Z (UTC)
+ Β±HH:MM
+ Β±HHMM
+ Β±HH
+
+ Times in the dropdown are shown in UTC
+ """
+ if not date_time:
+ await invoke_help_command(ctx)
+ return
+
+ if isinstance(date_time, tuple):
+ # Remove empty strings. Strip extra whitespace from the remaining items
+ ignored_tokens = list(map(str.strip, filter(str.strip, date_time[1])))
+ date_time = date_time[0]
+ if ignored_tokens:
+ await ctx.send(f"Could not parse the following token(s): `{', '.join(ignored_tokens)}`")
+ await ctx.send(f"Date and time parsed as: `{date_time.format(arrow.FORMAT_RSS)}`")
+
+ epoch = int(date_time.timestamp())
+ view = TimestampMenuView(ctx, self._format_dates(date_time), epoch)
+ original = await ctx.send(f"`{epoch}`", view=view)
+ await view.wait() # wait until expiration before removing the dropdown
+ try:
+ await original.edit(view=None)
+ except discord.NotFound: # disregard the error message if the message is deleled
+ pass
+
+ @staticmethod
+ def _format_dates(date: arrow.Arrow) -> list[str]:
+ """
+ Return a list of date strings formatted according to the discord timestamp styles.
+
+ These are used in the description of each style in the dropdown
+ """
+ date = date.to('utc')
+ formatted = [str(int(date.timestamp()))]
+ formatted += [date.format(format[1]) for format in list(STYLES.values())[1:7]]
+ formatted.append(date.humanize())
+ return formatted
+
+
+class TimestampMenuView(discord.ui.View):
+ """View for the epoch command which contains a single `discord.ui.Select` dropdown component."""
+
+ def __init__(self, ctx: commands.Context, formatted_times: list[str], epoch: int):
+ super().__init__(timeout=DROPDOWN_TIMEOUT)
+ self.ctx = ctx
+ self.epoch = epoch
+ self.dropdown: discord.ui.Select = self.children[0]
+ for label, date_time in zip(STYLES.keys(), formatted_times):
+ self.dropdown.add_option(label=label, description=date_time)
+
+ @discord.ui.select(placeholder="Select the format of your timestamp")
+ async def select_format(self, _: discord.ui.Select, interaction: discord.Interaction) -> discord.Message:
+ """Drop down menu which contains a list of formats which discord timestamps can take."""
+ selected = interaction.data["values"][0]
+ if selected == "Epoch":
+ return await interaction.response.edit_message(content=f"`{self.epoch}`")
+ return await interaction.response.edit_message(content=f"`<t:{self.epoch}:{STYLES[selected][0]}>`")
+
+ async def interaction_check(self, interaction: discord.Interaction) -> bool:
+ """Check to ensure that the interacting user is the user who invoked the command."""
+ if interaction.user != self.ctx.author:
+ embed = discord.Embed(description="Sorry, but this dropdown menu can only be used by the original author.")
+ await interaction.response.send_message(embed=embed, ephemeral=True)
+ return False
+ return True
+
+
+def setup(bot: Bot) -> None:
+ """Load the Epoch cog."""
+ bot.add_cog(Epoch())
diff --git a/bot/exts/utilities/githubinfo.py b/bot/exts/utilities/githubinfo.py
index d00b408d..963f54e5 100644
--- a/bot/exts/utilities/githubinfo.py
+++ b/bot/exts/utilities/githubinfo.py
@@ -1,30 +1,165 @@
import logging
import random
+import re
+import typing as t
+from dataclasses import dataclass
from datetime import datetime
-from urllib.parse import quote, quote_plus
+from urllib.parse import quote
import discord
+from aiohttp import ClientResponse
from discord.ext import commands
from bot.bot import Bot
-from bot.constants import Colours, NEGATIVE_REPLIES
+from bot.constants import Colours, ERROR_REPLIES, Emojis, NEGATIVE_REPLIES, Tokens
from bot.exts.core.extensions import invoke_help_command
log = logging.getLogger(__name__)
GITHUB_API_URL = "https://api.github.com"
+REQUEST_HEADERS = {
+ "Accept": "application/vnd.github.v3+json"
+}
+
+REPOSITORY_ENDPOINT = "https://api.github.com/orgs/{org}/repos?per_page=100&type=public"
+ISSUE_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/issues/{number}"
+PR_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/pulls/{number}"
+
+if Tokens.github:
+ REQUEST_HEADERS["Authorization"] = f"token {Tokens.github}"
+
+CODE_BLOCK_RE = re.compile(
+ r"^`([^`\n]+)`" # Inline codeblock
+ r"|```(.+?)```", # Multiline codeblock
+ re.DOTALL | re.MULTILINE
+)
+
+# Maximum number of issues in one message
+MAXIMUM_ISSUES = 5
+
+# Regex used when looking for automatic linking in messages
+# regex101 of current regex https://regex101.com/r/V2ji8M/6
+AUTOMATIC_REGEX = re.compile(
+ r"((?P<org>[a-zA-Z0-9][a-zA-Z0-9\-]{1,39})\/)?(?P<repo>[\w\-\.]{1,100})#(?P<number>[0-9]+)"
+)
+
+
+@dataclass(eq=True, frozen=True)
+class FoundIssue:
+ """Dataclass representing an issue found by the regex."""
+
+ organisation: t.Optional[str]
+ repository: str
+ number: str
+
+
+@dataclass(eq=True, frozen=True)
+class FetchError:
+ """Dataclass representing an error while fetching an issue."""
+
+ return_code: int
+ message: str
+
+
+@dataclass(eq=True, frozen=True)
+class IssueState:
+ """Dataclass representing the state of an issue."""
+
+ repository: str
+ number: int
+ url: str
+ title: str
+ emoji: str
+
class GithubInfo(commands.Cog):
- """Fetches info from GitHub."""
+ """A Cog that fetches info from GitHub."""
def __init__(self, bot: Bot):
self.bot = bot
+ self.repos = []
+
+ @staticmethod
+ def remove_codeblocks(message: str) -> str:
+ """Remove any codeblock in a message."""
+ return CODE_BLOCK_RE.sub("", message)
+
+ async def fetch_issue(
+ self,
+ number: int,
+ repository: str,
+ user: str
+ ) -> t.Union[IssueState, FetchError]:
+ """
+ Retrieve an issue from a GitHub repository.
+
+ Returns IssueState on success, FetchError on failure.
+ """
+ url = ISSUE_ENDPOINT.format(user=user, repository=repository, number=number)
+ pulls_url = PR_ENDPOINT.format(user=user, repository=repository, number=number)
+
+ json_data, r = await self.fetch_data(url)
+
+ if r.status == 403:
+ if r.headers.get("X-RateLimit-Remaining") == "0":
+ log.info(f"Ratelimit reached while fetching {url}")
+ return FetchError(403, "Ratelimit reached, please retry in a few minutes.")
+ return FetchError(403, "Cannot access issue.")
+ elif r.status in (404, 410):
+ return FetchError(r.status, "Issue not found.")
+ elif r.status != 200:
+ return FetchError(r.status, "Error while fetching issue.")
+
+ # The initial API request is made to the issues API endpoint, which will return information
+ # if the issue or PR is present. However, the scope of information returned for PRs differs
+ # from issues: if the 'issues' key is present in the response then we can pull the data we
+ # need from the initial API call.
+ if "issues" in json_data["html_url"]:
+ if json_data.get("state") == "open":
+ emoji = Emojis.issue_open
+ else:
+ emoji = Emojis.issue_closed
+
+ # If the 'issues' key is not contained in the API response and there is no error code, then
+ # we know that a PR has been requested and a call to the pulls API endpoint is necessary
+ # to get the desired information for the PR.
+ else:
+ pull_data, _ = await self.fetch_data(pulls_url)
+ if pull_data["draft"]:
+ emoji = Emojis.pull_request_draft
+ elif pull_data["state"] == "open":
+ emoji = Emojis.pull_request_open
+ # When 'merged_at' is not None, this means that the state of the PR is merged
+ elif pull_data["merged_at"] is not None:
+ emoji = Emojis.pull_request_merged
+ else:
+ emoji = Emojis.pull_request_closed
+
+ issue_url = json_data.get("html_url")
- async def fetch_data(self, url: str) -> dict:
- """Retrieve data as a dictionary."""
- async with self.bot.http_session.get(url) as r:
- return await r.json()
+ return IssueState(repository, number, issue_url, json_data.get("title", ""), emoji)
+
+ @staticmethod
+ def format_embed(
+ results: t.List[t.Union[IssueState, FetchError]]
+ ) -> discord.Embed:
+ """Take a list of IssueState or FetchError and format a Discord embed for them."""
+ description_list = []
+
+ for result in results:
+ if isinstance(result, IssueState):
+ description_list.append(f"{result.emoji} [{result.title}]({result.url})")
+ elif isinstance(result, FetchError):
+ description_list.append(f":x: [{result.return_code}] {result.message}")
+
+ resp = discord.Embed(
+ colour=Colours.bright_green,
+ description="\n".join(description_list)
+ )
+
+ resp.set_author(name="GitHub")
+ return resp
@commands.group(name="github", aliases=("gh", "git"))
@commands.cooldown(1, 10, commands.BucketType.user)
@@ -33,11 +168,67 @@ class GithubInfo(commands.Cog):
if ctx.invoked_subcommand is None:
await invoke_help_command(ctx)
+ @commands.Cog.listener()
+ async def on_message(self, message: discord.Message) -> None:
+ """
+ Automatic issue linking.
+
+ Listener to retrieve issue(s) from a GitHub repository using automatic linking if matching <org>/<repo>#<issue>.
+ """
+ # Ignore bots
+ if message.author.bot:
+ return
+
+ issues = [
+ FoundIssue(*match.group("org", "repo", "number"))
+ for match in AUTOMATIC_REGEX.finditer(self.remove_codeblocks(message.content))
+ ]
+ links = []
+
+ if issues:
+ # Block this from working in DMs
+ if not message.guild:
+ return
+
+ log.trace(f"Found {issues = }")
+ # Remove duplicates
+ issues = set(issues)
+
+ if len(issues) > MAXIMUM_ISSUES:
+ embed = discord.Embed(
+ title=random.choice(ERROR_REPLIES),
+ color=Colours.soft_red,
+ description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})"
+ )
+ await message.channel.send(embed=embed, delete_after=5)
+ return
+
+ for repo_issue in issues:
+ result = await self.fetch_issue(
+ int(repo_issue.number),
+ repo_issue.repository,
+ repo_issue.organisation or "python-discord"
+ )
+ if isinstance(result, IssueState):
+ links.append(result)
+
+ if not links:
+ return
+
+ resp = self.format_embed(links)
+ await message.channel.send(embed=resp)
+
+ async def fetch_data(self, url: str) -> tuple[dict[str], ClientResponse]:
+ """Retrieve data as a dictionary and the response in a tuple."""
+ log.trace(f"Querying GH issues API: {url}")
+ async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r:
+ return await r.json(), r
+
@github_group.command(name="user", aliases=("userinfo",))
async def github_user_info(self, ctx: commands.Context, username: str) -> None:
"""Fetches a user's GitHub information."""
async with ctx.typing():
- user_data = await self.fetch_data(f"{GITHUB_API_URL}/users/{quote_plus(username)}")
+ user_data, _ = await self.fetch_data(f"{GITHUB_API_URL}/users/{username}")
# User_data will not have a message key if the user exists
if "message" in user_data:
@@ -50,7 +241,7 @@ class GithubInfo(commands.Cog):
await ctx.send(embed=embed)
return
- org_data = await self.fetch_data(user_data["organizations_url"])
+ org_data, _ = await self.fetch_data(user_data["organizations_url"])
orgs = [f"[{org['login']}](https://github.com/{org['login']})" for org in org_data]
orgs_to_add = " | ".join(orgs)
@@ -67,7 +258,7 @@ class GithubInfo(commands.Cog):
embed = discord.Embed(
title=f"`{user_data['login']}`'s GitHub profile info",
description=f"```\n{user_data['bio']}\n```\n" if user_data["bio"] else "",
- colour=discord.Colour.blurple(),
+ colour=discord.Colour.og_blurple(),
url=user_data["html_url"],
timestamp=datetime.strptime(user_data["created_at"], "%Y-%m-%dT%H:%M:%SZ")
)
@@ -91,10 +282,7 @@ class GithubInfo(commands.Cog):
)
if user_data["type"] == "User":
- embed.add_field(
- name="Gists",
- value=f"[{gists}](https://gist.github.com/{quote_plus(username, safe='')})"
- )
+ embed.add_field(name="Gists", value=f"[{gists}](https://gist.github.com/{quote(username, safe='')})")
embed.add_field(
name=f"Organization{'s' if len(orgs)!=1 else ''}",
@@ -123,7 +311,7 @@ class GithubInfo(commands.Cog):
return
async with ctx.typing():
- repo_data = await self.fetch_data(f"{GITHUB_API_URL}/repos/{quote(repo)}")
+ repo_data, _ = await self.fetch_data(f"{GITHUB_API_URL}/repos/{quote(repo)}")
# There won't be a message key if this repo exists
if "message" in repo_data:
@@ -139,7 +327,7 @@ class GithubInfo(commands.Cog):
embed = discord.Embed(
title=repo_data["name"],
description=repo_data["description"],
- colour=discord.Colour.blurple(),
+ colour=discord.Colour.og_blurple(),
url=repo_data["html_url"]
)
diff --git a/bot/exts/utilities/issues.py b/bot/exts/utilities/issues.py
deleted file mode 100644
index 8a7ebed0..00000000
--- a/bot/exts/utilities/issues.py
+++ /dev/null
@@ -1,275 +0,0 @@
-import logging
-import random
-import re
-from dataclasses import dataclass
-from typing import Optional, Union
-
-import discord
-from discord.ext import commands
-
-from bot.bot import Bot
-from bot.constants import (
- Categories,
- Channels,
- Colours,
- ERROR_REPLIES,
- Emojis,
- NEGATIVE_REPLIES,
- Tokens,
- WHITELISTED_CHANNELS
-)
-from bot.utils.decorators import whitelist_override
-from bot.utils.extensions import invoke_help_command
-
-log = logging.getLogger(__name__)
-
-BAD_RESPONSE = {
- 404: "Issue/pull request not located! Please enter a valid number!",
- 403: "Rate limit has been hit! Please try again later!"
-}
-REQUEST_HEADERS = {
- "Accept": "application/vnd.github.v3+json"
-}
-
-REPOSITORY_ENDPOINT = "https://api.github.com/orgs/{org}/repos?per_page=100&type=public"
-ISSUE_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/issues/{number}"
-PR_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/pulls/{number}"
-
-if GITHUB_TOKEN := Tokens.github:
- REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}"
-
-WHITELISTED_CATEGORIES = (
- Categories.development, Categories.devprojects, Categories.media, Categories.staff
-)
-
-CODE_BLOCK_RE = re.compile(
- r"^`([^`\n]+)`" # Inline codeblock
- r"|```(.+?)```", # Multiline codeblock
- re.DOTALL | re.MULTILINE
-)
-
-# Maximum number of issues in one message
-MAXIMUM_ISSUES = 5
-
-# Regex used when looking for automatic linking in messages
-# regex101 of current regex https://regex101.com/r/V2ji8M/6
-AUTOMATIC_REGEX = re.compile(
- r"((?P<org>[a-zA-Z0-9][a-zA-Z0-9\-]{1,39})\/)?(?P<repo>[\w\-\.]{1,100})#(?P<number>[0-9]+)"
-)
-
-
-@dataclass
-class FoundIssue:
- """Dataclass representing an issue found by the regex."""
-
- organisation: Optional[str]
- repository: str
- number: str
-
- def __hash__(self) -> int:
- return hash((self.organisation, self.repository, self.number))
-
-
-@dataclass
-class FetchError:
- """Dataclass representing an error while fetching an issue."""
-
- return_code: int
- message: str
-
-
-@dataclass
-class IssueState:
- """Dataclass representing the state of an issue."""
-
- repository: str
- number: int
- url: str
- title: str
- emoji: str
-
-
-class Issues(commands.Cog):
- """Cog that allows users to retrieve issues from GitHub."""
-
- def __init__(self, bot: Bot):
- self.bot = bot
- self.repos = []
-
- @staticmethod
- def remove_codeblocks(message: str) -> str:
- """Remove any codeblock in a message."""
- return re.sub(CODE_BLOCK_RE, "", message)
-
- async def fetch_issues(
- self,
- number: int,
- repository: str,
- user: str
- ) -> Union[IssueState, FetchError]:
- """
- Retrieve an issue from a GitHub repository.
-
- Returns IssueState on success, FetchError on failure.
- """
- url = ISSUE_ENDPOINT.format(user=user, repository=repository, number=number)
- pulls_url = PR_ENDPOINT.format(user=user, repository=repository, number=number)
- log.trace(f"Querying GH issues API: {url}")
-
- async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r:
- json_data = await r.json()
-
- if r.status == 403:
- if r.headers.get("X-RateLimit-Remaining") == "0":
- log.info(f"Ratelimit reached while fetching {url}")
- return FetchError(403, "Ratelimit reached, please retry in a few minutes.")
- return FetchError(403, "Cannot access issue.")
- elif r.status in (404, 410):
- return FetchError(r.status, "Issue not found.")
- elif r.status != 200:
- return FetchError(r.status, "Error while fetching issue.")
-
- # The initial API request is made to the issues API endpoint, which will return information
- # if the issue or PR is present. However, the scope of information returned for PRs differs
- # from issues: if the 'issues' key is present in the response then we can pull the data we
- # need from the initial API call.
- if "issues" in json_data["html_url"]:
- if json_data.get("state") == "open":
- emoji = Emojis.issue_open
- else:
- emoji = Emojis.issue_closed
-
- # If the 'issues' key is not contained in the API response and there is no error code, then
- # we know that a PR has been requested and a call to the pulls API endpoint is necessary
- # to get the desired information for the PR.
- else:
- log.trace(f"PR provided, querying GH pulls API for additional information: {pulls_url}")
- async with self.bot.http_session.get(pulls_url) as p:
- pull_data = await p.json()
- if pull_data["draft"]:
- emoji = Emojis.pull_request_draft
- elif pull_data["state"] == "open":
- emoji = Emojis.pull_request_open
- # When 'merged_at' is not None, this means that the state of the PR is merged
- elif pull_data["merged_at"] is not None:
- emoji = Emojis.pull_request_merged
- else:
- emoji = Emojis.pull_request_closed
-
- issue_url = json_data.get("html_url")
-
- return IssueState(repository, number, issue_url, json_data.get("title", ""), emoji)
-
- @staticmethod
- def format_embed(
- results: list[Union[IssueState, FetchError]],
- user: str,
- repository: Optional[str] = None
- ) -> discord.Embed:
- """Take a list of IssueState or FetchError and format a Discord embed for them."""
- description_list = []
-
- for result in results:
- if isinstance(result, IssueState):
- description_list.append(f"{result.emoji} [{result.title}]({result.url})")
- elif isinstance(result, FetchError):
- description_list.append(f":x: [{result.return_code}] {result.message}")
-
- resp = discord.Embed(
- colour=Colours.bright_green,
- description="\n".join(description_list)
- )
-
- embed_url = f"https://github.com/{user}/{repository}" if repository else f"https://github.com/{user}"
- resp.set_author(name="GitHub", url=embed_url)
- return resp
-
- @whitelist_override(channels=WHITELISTED_CHANNELS, categories=WHITELISTED_CATEGORIES)
- @commands.command(aliases=("pr",))
- async def issue(
- self,
- ctx: commands.Context,
- numbers: commands.Greedy[int],
- repository: str = "sir-lancebot",
- user: str = "python-discord"
- ) -> None:
- """Command to retrieve issue(s) from a GitHub repository."""
- # Remove duplicates
- numbers = set(numbers)
-
- if len(numbers) > MAXIMUM_ISSUES:
- embed = discord.Embed(
- title=random.choice(ERROR_REPLIES),
- color=Colours.soft_red,
- description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})"
- )
- await ctx.send(embed=embed)
- await invoke_help_command(ctx)
-
- results = [await self.fetch_issues(number, repository, user) for number in numbers]
- await ctx.send(embed=self.format_embed(results, user, repository))
-
- @commands.Cog.listener()
- async def on_message(self, message: discord.Message) -> None:
- """
- Automatic issue linking.
-
- Listener to retrieve issue(s) from a GitHub repository using automatic linking if matching <org>/<repo>#<issue>.
- """
- # Ignore bots
- if message.author.bot:
- return
-
- issues = [
- FoundIssue(*match.group("org", "repo", "number"))
- for match in AUTOMATIC_REGEX.finditer(self.remove_codeblocks(message.content))
- ]
- links = []
-
- if issues:
- # Block this from working in DMs
- if not message.guild:
- await message.channel.send(
- embed=discord.Embed(
- title=random.choice(NEGATIVE_REPLIES),
- description=(
- "You can't retrieve issues from DMs. "
- f"Try again in <#{Channels.community_bot_commands}>"
- ),
- colour=Colours.soft_red
- )
- )
- return
-
- log.trace(f"Found {issues = }")
- # Remove duplicates
- issues = set(issues)
-
- if len(issues) > MAXIMUM_ISSUES:
- embed = discord.Embed(
- title=random.choice(ERROR_REPLIES),
- color=Colours.soft_red,
- description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})"
- )
- await message.channel.send(embed=embed, delete_after=5)
- return
-
- for repo_issue in issues:
- result = await self.fetch_issues(
- int(repo_issue.number),
- repo_issue.repository,
- repo_issue.organisation or "python-discord"
- )
- if isinstance(result, IssueState):
- links.append(result)
-
- if not links:
- return
-
- resp = self.format_embed(links, "python-discord")
- await message.channel.send(embed=resp)
-
-
-def setup(bot: Bot) -> None:
- """Load the Issues cog."""
- bot.add_cog(Issues(bot))
diff --git a/bot/exts/utilities/latex.py b/bot/exts/utilities/latex.py
deleted file mode 100644
index 36c7e0ab..00000000
--- a/bot/exts/utilities/latex.py
+++ /dev/null
@@ -1,101 +0,0 @@
-import asyncio
-import hashlib
-import pathlib
-import re
-from concurrent.futures import ThreadPoolExecutor
-from io import BytesIO
-
-import discord
-import matplotlib.pyplot as plt
-from discord.ext import commands
-
-from bot.bot import Bot
-
-# configure fonts and colors for matplotlib
-plt.rcParams.update(
- {
- "font.size": 16,
- "mathtext.fontset": "cm", # Computer Modern font set
- "mathtext.rm": "serif",
- "figure.facecolor": "36393F", # matches Discord's dark mode background color
- "text.color": "white",
- }
-)
-
-FORMATTED_CODE_REGEX = re.compile(
- r"(?P<delim>(?P<block>```)|``?)" # code delimiter: 1-3 backticks; (?P=block) only matches if it's a block
- r"(?(block)(?:(?P<lang>[a-z]+)\n)?)" # if we're in a block, match optional language (only letters plus newline)
- r"(?:[ \t]*\n)*" # any blank (empty or tabs/spaces only) lines before the code
- r"(?P<code>.*?)" # extract all code inside the markup
- r"\s*" # any more whitespace before the end of the code markup
- r"(?P=delim)", # match the exact same delimiter from the start again
- re.DOTALL | re.IGNORECASE, # "." also matches newlines, case insensitive
-)
-
-CACHE_DIRECTORY = pathlib.Path("_latex_cache")
-CACHE_DIRECTORY.mkdir(exist_ok=True)
-
-
-class Latex(commands.Cog):
- """Renders latex."""
-
- @staticmethod
- def _render(text: str, filepath: pathlib.Path) -> BytesIO:
- """
- Return the rendered image if latex compiles without errors, otherwise raise a BadArgument Exception.
-
- Saves rendered image to cache.
- """
- fig = plt.figure()
- rendered_image = BytesIO()
- fig.text(0, 1, text, horizontalalignment="left", verticalalignment="top")
-
- try:
- plt.savefig(rendered_image, bbox_inches="tight", dpi=600)
- except ValueError as e:
- raise commands.BadArgument(str(e))
-
- rendered_image.seek(0)
-
- with open(filepath, "wb") as f:
- f.write(rendered_image.getbuffer())
-
- return rendered_image
-
- @staticmethod
- def _prepare_input(text: str) -> str:
- text = text.replace(r"\\", "$\n$") # matplotlib uses \n for newlines, not \\
-
- if match := FORMATTED_CODE_REGEX.match(text):
- return match.group("code")
- else:
- return text
-
- @commands.command()
- @commands.max_concurrency(1, commands.BucketType.guild, wait=True)
- async def latex(self, ctx: commands.Context, *, text: str) -> None:
- """Renders the text in latex and sends the image."""
- text = self._prepare_input(text)
- query_hash = hashlib.md5(text.encode()).hexdigest()
- image_path = CACHE_DIRECTORY.joinpath(f"{query_hash}.png")
- async with ctx.typing():
- if image_path.exists():
- await ctx.send(file=discord.File(image_path))
- return
-
- with ThreadPoolExecutor() as pool:
- image = await asyncio.get_running_loop().run_in_executor(
- pool, self._render, text, image_path
- )
-
- await ctx.send(file=discord.File(image, "latex.png"))
-
-
-def setup(bot: Bot) -> None:
- """Load the Latex Cog."""
- # As we have resource issues on this cog,
- # we have it currently disabled while we fix it.
- import logging
- logging.info("Latex cog is currently disabled. It won't be loaded.")
- return
- bot.add_cog(Latex())
diff --git a/bot/exts/utilities/realpython.py b/bot/exts/utilities/realpython.py
index ef8b2638..bf8f1341 100644
--- a/bot/exts/utilities/realpython.py
+++ b/bot/exts/utilities/realpython.py
@@ -1,5 +1,6 @@
import logging
from html import unescape
+from typing import Optional
from urllib.parse import quote_plus
from discord import Embed
@@ -31,9 +32,18 @@ class RealPython(commands.Cog):
@commands.command(aliases=["rp"])
@commands.cooldown(1, 10, commands.cooldowns.BucketType.user)
- async def realpython(self, ctx: commands.Context, *, user_search: str) -> None:
- """Send 5 articles that match the user's search terms."""
- params = {"q": user_search, "limit": 5, "kind": "article"}
+ async def realpython(self, ctx: commands.Context, amount: Optional[int] = 5, *, user_search: str) -> None:
+ """
+ Send some articles from RealPython that match the search terms.
+
+ By default the top 5 matches are sent, this can be overwritten to
+ a number between 1 and 5 by specifying an amount before the search query.
+ """
+ if not 1 <= amount <= 5:
+ await ctx.send("`amount` must be between 1 and 5 (inclusive).")
+ return
+
+ params = {"q": user_search, "limit": amount, "kind": "article"}
async with self.bot.http_session.get(url=API_ROOT, params=params) as response:
if response.status != 200:
logger.error(
diff --git a/bot/exts/utilities/reddit.py b/bot/exts/utilities/reddit.py
index e6cb5337..782583d2 100644
--- a/bot/exts/utilities/reddit.py
+++ b/bot/exts/utilities/reddit.py
@@ -244,7 +244,7 @@ class Reddit(Cog):
# Use only starting summary page for #reddit channel posts.
embed.description = self.build_pagination_pages(posts, paginate=False)
- embed.colour = Colour.blurple()
+ embed.colour = Colour.og_blurple()
return embed
@loop()
@@ -312,7 +312,7 @@ class Reddit(Cog):
await ctx.send(f"Here are the top {subreddit} posts of all time!")
embed = Embed(
- color=Colour.blurple()
+ color=Colour.og_blurple()
)
await ImagePaginator.paginate(pages, ctx, embed)
@@ -325,7 +325,7 @@ class Reddit(Cog):
await ctx.send(f"Here are today's top {subreddit} posts!")
embed = Embed(
- color=Colour.blurple()
+ color=Colour.og_blurple()
)
await ImagePaginator.paginate(pages, ctx, embed)
@@ -338,7 +338,7 @@ class Reddit(Cog):
await ctx.send(f"Here are this week's top {subreddit} posts!")
embed = Embed(
- color=Colour.blurple()
+ color=Colour.og_blurple()
)
await ImagePaginator.paginate(pages, ctx, embed)
@@ -349,7 +349,7 @@ class Reddit(Cog):
"""Send a paginated embed of all the subreddits we're relaying."""
embed = Embed()
embed.title = "Relayed subreddits."
- embed.colour = Colour.blurple()
+ embed.colour = Colour.og_blurple()
await LinePaginator.paginate(
RedditConfig.subreddits,
diff --git a/bot/exts/utilities/twemoji.py b/bot/exts/utilities/twemoji.py
new file mode 100644
index 00000000..c915f05b
--- /dev/null
+++ b/bot/exts/utilities/twemoji.py
@@ -0,0 +1,150 @@
+import logging
+import re
+from typing import Literal, Optional
+
+import discord
+from discord.ext import commands
+from emoji import UNICODE_EMOJI_ENGLISH, is_emoji
+
+from bot.bot import Bot
+from bot.constants import Colours, Roles
+from bot.utils.decorators import whitelist_override
+from bot.utils.extensions import invoke_help_command
+
+log = logging.getLogger(__name__)
+BASE_URLS = {
+ "png": "https://raw.githubusercontent.com/twitter/twemoji/master/assets/72x72/",
+ "svg": "https://raw.githubusercontent.com/twitter/twemoji/master/assets/svg/",
+}
+CODEPOINT_REGEX = re.compile(r"[a-f1-9][a-f0-9]{3,5}$")
+
+
+class Twemoji(commands.Cog):
+ """Utilities for working with Twemojis."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ @staticmethod
+ def get_url(codepoint: str, format: Literal["png", "svg"]) -> str:
+ """Returns a source file URL for the specified Twemoji, in the corresponding format."""
+ return f"{BASE_URLS[format]}{codepoint}.{format}"
+
+ @staticmethod
+ def alias_to_name(alias: str) -> str:
+ """
+ Transform a unicode alias to an emoji name.
+
+ Example usages:
+ >>> alias_to_name(":falling_leaf:")
+ "Falling leaf"
+ >>> alias_to_name(":family_man_girl_boy:")
+ "Family man girl boy"
+ """
+ name = alias.strip(":").replace("_", " ")
+ return name.capitalize()
+
+ @staticmethod
+ def build_embed(codepoint: str) -> discord.Embed:
+ """Returns the main embed for the `twemoji` commmand."""
+ emoji = "".join(Twemoji.emoji(e) or "" for e in codepoint.split("-"))
+
+ embed = discord.Embed(
+ title=Twemoji.alias_to_name(UNICODE_EMOJI_ENGLISH[emoji]),
+ description=f"{codepoint.replace('-', ' ')}\n[Download svg]({Twemoji.get_url(codepoint, 'svg')})",
+ colour=Colours.twitter_blue,
+ )
+ embed.set_thumbnail(url=Twemoji.get_url(codepoint, "png"))
+ return embed
+
+ @staticmethod
+ def emoji(codepoint: Optional[str]) -> Optional[str]:
+ """
+ Returns the emoji corresponding to a given `codepoint`, or `None` if no emoji was found.
+
+ The return value is an emoji character, such as "πŸ‚". The `codepoint`
+ argument can be of any format, since it will be trimmed automatically.
+ """
+ if code := Twemoji.trim_code(codepoint):
+ return chr(int(code, 16))
+
+ @staticmethod
+ def codepoint(emoji: Optional[str]) -> Optional[str]:
+ """
+ Returns the codepoint, in a trimmed format, of a single emoji.
+
+ `emoji` should be an emoji character, such as "🐍" and "πŸ₯°", and
+ not a codepoint like "1f1f8". When working with combined emojis,
+ such as "πŸ‡ΈπŸ‡ͺ" and "πŸ‘¨β€πŸ‘©β€πŸ‘¦", send the component emojis through the method
+ one at a time.
+ """
+ if emoji is None:
+ return None
+ return hex(ord(emoji)).removeprefix("0x")
+
+ @staticmethod
+ def trim_code(codepoint: Optional[str]) -> Optional[str]:
+ """
+ Returns the meaningful information from the given `codepoint`.
+
+ If no codepoint is found, `None` is returned.
+
+ Example usages:
+ >>> trim_code("U+1f1f8")
+ "1f1f8"
+ >>> trim_code("\u0001f1f8")
+ "1f1f8"
+ >>> trim_code("1f466")
+ "1f466"
+ """
+ if code := CODEPOINT_REGEX.search(codepoint or ""):
+ return code.group()
+
+ @staticmethod
+ def codepoint_from_input(raw_emoji: tuple[str, ...]) -> str:
+ """
+ Returns the codepoint corresponding to the passed tuple, separated by "-".
+
+ The return format matches the format used in URLs for Twemoji source files.
+
+ Example usages:
+ >>> codepoint_from_input(("🐍",))
+ "1f40d"
+ >>> codepoint_from_input(("1f1f8", "1f1ea"))
+ "1f1f8-1f1ea"
+ >>> codepoint_from_input(("πŸ‘¨β€πŸ‘§β€πŸ‘¦",))
+ "1f468-200d-1f467-200d-1f466"
+ """
+ raw_emoji = [emoji.lower() for emoji in raw_emoji]
+ if is_emoji(raw_emoji[0]):
+ emojis = (Twemoji.codepoint(emoji) or "" for emoji in raw_emoji[0])
+ return "-".join(emojis)
+
+ emoji = "".join(
+ Twemoji.emoji(Twemoji.trim_code(code)) or "" for code in raw_emoji
+ )
+ if is_emoji(emoji):
+ return "-".join(Twemoji.codepoint(e) or "" for e in emoji)
+
+ raise ValueError("No codepoint could be obtained from the given input")
+
+ @commands.command(aliases=("tw",))
+ @whitelist_override(roles=(Roles.everyone,))
+ async def twemoji(self, ctx: commands.Context, *raw_emoji: str) -> None:
+ """Sends a preview of a given Twemoji, specified by codepoint or emoji."""
+ if len(raw_emoji) == 0:
+ await invoke_help_command(ctx)
+ return
+ try:
+ codepoint = self.codepoint_from_input(raw_emoji)
+ except ValueError:
+ raise commands.BadArgument(
+ "please include a valid emoji or emoji codepoint."
+ )
+
+ await ctx.send(embed=self.build_embed(codepoint))
+
+
+def setup(bot: Bot) -> None:
+ """Load the Twemoji cog."""
+ bot.add_cog(Twemoji(bot))
diff --git a/bot/exts/utilities/wikipedia.py b/bot/exts/utilities/wikipedia.py
index eccc1f8c..e5e8e289 100644
--- a/bot/exts/utilities/wikipedia.py
+++ b/bot/exts/utilities/wikipedia.py
@@ -82,13 +82,11 @@ class WikipediaSearch(commands.Cog):
if contents:
embed = Embed(
title="Wikipedia Search Results",
- colour=Color.blurple()
+ colour=Color.og_blurple()
)
embed.set_thumbnail(url=WIKI_THUMBNAIL)
embed.timestamp = datetime.utcnow()
- await LinePaginator.paginate(
- contents, ctx, embed
- )
+ await LinePaginator.paginate(contents, ctx, embed, restrict_to_user=ctx.author)
else:
await ctx.send(
"Sorry, we could not find a wikipedia article using that search term."
diff --git a/bot/exts/utilities/wtf_python.py b/bot/exts/utilities/wtf_python.py
new file mode 100644
index 00000000..980b3dba
--- /dev/null
+++ b/bot/exts/utilities/wtf_python.py
@@ -0,0 +1,138 @@
+import logging
+import random
+import re
+from typing import Optional
+
+import rapidfuzz
+from discord import Embed, File
+from discord.ext import commands, tasks
+
+from bot import constants
+from bot.bot import Bot
+
+log = logging.getLogger(__name__)
+
+WTF_PYTHON_RAW_URL = "http://raw.githubusercontent.com/satwikkansal/wtfpython/master/"
+BASE_URL = "https://github.com/satwikkansal/wtfpython"
+LOGO_PATH = "./bot/resources/utilities/wtf_python_logo.jpg"
+
+ERROR_MESSAGE = f"""
+Unknown WTF Python Query. Please try to reformulate your query.
+
+**Examples**:
+```md
+{constants.Client.prefix}wtf wild imports
+{constants.Client.prefix}wtf subclass
+{constants.Client.prefix}wtf del
+```
+If the problem persists send a message in <#{constants.Channels.dev_contrib}>
+"""
+
+MINIMUM_CERTAINTY = 55
+
+
+class WTFPython(commands.Cog):
+ """Cog that allows getting WTF Python entries from the WTF Python repository."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+ self.headers: dict[str, str] = {}
+ self.fetch_readme.start()
+
+ @tasks.loop(minutes=60)
+ async def fetch_readme(self) -> None:
+ """Gets the content of README.md from the WTF Python Repository."""
+ async with self.bot.http_session.get(f"{WTF_PYTHON_RAW_URL}README.md") as resp:
+ log.trace("Fetching the latest WTF Python README.md")
+ if resp.status == 200:
+ raw = await resp.text()
+ self.parse_readme(raw)
+
+ def parse_readme(self, data: str) -> None:
+ """
+ Parses the README.md into a dict.
+
+ It parses the readme into the `self.headers` dict,
+ where the key is the heading and the value is the
+ link to the heading.
+ """
+ # Match the start of examples, until the end of the table of contents (toc)
+ table_of_contents = re.search(
+ r"\[πŸ‘€ Examples\]\(#-examples\)\n([\w\W]*)<!-- tocstop -->", data
+ )[0].split("\n")
+
+ for header in list(map(str.strip, table_of_contents)):
+ match = re.search(r"\[β–Ά (.*)\]\((.*)\)", header)
+ if match:
+ hyper_link = match[0].split("(")[1].replace(")", "")
+ self.headers[match[0]] = f"{BASE_URL}/{hyper_link}"
+
+ def fuzzy_match_header(self, query: str) -> Optional[str]:
+ """
+ Returns the fuzzy match of a query if its ratio is above "MINIMUM_CERTAINTY" else returns None.
+
+ "MINIMUM_CERTAINTY" is the lowest score at which the fuzzy match will return a result.
+ The certainty returned by rapidfuzz.process.extractOne is a score between 0 and 100,
+ with 100 being a perfect match.
+ """
+ match, certainty, _ = rapidfuzz.process.extractOne(query, self.headers.keys())
+ return match if certainty > MINIMUM_CERTAINTY else None
+
+ @commands.command(aliases=("wtf", "WTF"))
+ async def wtf_python(self, ctx: commands.Context, *, query: Optional[str] = None) -> None:
+ """
+ Search WTF Python repository.
+
+ Gets the link of the fuzzy matched query from https://github.com/satwikkansal/wtfpython.
+ Usage:
+ --> .wtf wild imports
+ """
+ if query is None:
+ no_query_embed = Embed(
+ title="WTF Python?!",
+ colour=constants.Colours.dark_green,
+ description="A repository filled with suprising snippets that can make you say WTF?!\n\n"
+ f"[Go to the Repository]({BASE_URL})"
+ )
+ logo = File(LOGO_PATH, filename="wtf_logo.jpg")
+ no_query_embed.set_thumbnail(url="attachment://wtf_logo.jpg")
+ await ctx.send(embed=no_query_embed, file=logo)
+ return
+
+ if len(query) > 50:
+ embed = Embed(
+ title=random.choice(constants.ERROR_REPLIES),
+ description=ERROR_MESSAGE,
+ colour=constants.Colours.soft_red,
+ )
+ match = None
+ else:
+ match = self.fuzzy_match_header(query)
+
+ if not match:
+ embed = Embed(
+ title=random.choice(constants.ERROR_REPLIES),
+ description=ERROR_MESSAGE,
+ colour=constants.Colours.soft_red,
+ )
+ await ctx.send(embed=embed)
+ return
+
+ embed = Embed(
+ title="WTF Python?!",
+ colour=constants.Colours.dark_green,
+ description=f"""Search result for '{query}': {match.split("]")[0].replace("[", "")}
+ [Go to Repository Section]({self.headers[match]})""",
+ )
+ logo = File(LOGO_PATH, filename="wtf_logo.jpg")
+ embed.set_thumbnail(url="attachment://wtf_logo.jpg")
+ await ctx.send(embed=embed, file=logo)
+
+ def cog_unload(self) -> None:
+ """Unload the cog and cancel the task."""
+ self.fetch_readme.cancel()
+
+
+def setup(bot: Bot) -> None:
+ """Load the WTFPython Cog."""
+ bot.add_cog(WTFPython(bot))