diff options
Diffstat (limited to 'bot/exts/utilities')
-rw-r--r-- | bot/exts/utilities/bookmark.py | 13 | ||||
-rw-r--r-- | bot/exts/utilities/challenges.py | 341 | ||||
-rw-r--r-- | bot/exts/utilities/colour.py | 266 | ||||
-rw-r--r-- | bot/exts/utilities/conversationstarters.py | 91 | ||||
-rw-r--r-- | bot/exts/utilities/emoji.py | 6 | ||||
-rw-r--r-- | bot/exts/utilities/epoch.py | 138 | ||||
-rw-r--r-- | bot/exts/utilities/githubinfo.py | 220 | ||||
-rw-r--r-- | bot/exts/utilities/issues.py | 275 | ||||
-rw-r--r-- | bot/exts/utilities/latex.py | 101 | ||||
-rw-r--r-- | bot/exts/utilities/realpython.py | 16 | ||||
-rw-r--r-- | bot/exts/utilities/reddit.py | 10 | ||||
-rw-r--r-- | bot/exts/utilities/twemoji.py | 150 | ||||
-rw-r--r-- | bot/exts/utilities/wikipedia.py | 6 | ||||
-rw-r--r-- | bot/exts/utilities/wtf_python.py | 138 |
14 files changed, 1343 insertions, 428 deletions
diff --git a/bot/exts/utilities/bookmark.py b/bot/exts/utilities/bookmark.py index a91ef1c0..b50205a0 100644 --- a/bot/exts/utilities/bookmark.py +++ b/bot/exts/utilities/bookmark.py @@ -7,7 +7,7 @@ import discord from discord.ext import commands from bot.bot import Bot -from bot.constants import Categories, Colours, ERROR_REPLIES, Icons, WHITELISTED_CHANNELS +from bot.constants import Colours, ERROR_REPLIES, Icons, Roles from bot.utils.converters import WrappedMessageConverter from bot.utils.decorators import whitelist_override @@ -16,7 +16,6 @@ log = logging.getLogger(__name__) # Number of seconds to wait for other users to bookmark the same message TIMEOUT = 120 BOOKMARK_EMOJI = "π" -WHITELISTED_CATEGORIES = (Categories.help_in_use,) class Bookmark(commands.Cog): @@ -87,8 +86,8 @@ class Bookmark(commands.Cog): await message.add_reaction(BOOKMARK_EMOJI) return message - @whitelist_override(channels=WHITELISTED_CHANNELS, categories=WHITELISTED_CATEGORIES) @commands.command(name="bookmark", aliases=("bm", "pin")) + @whitelist_override(roles=(Roles.everyone,)) async def bookmark( self, ctx: commands.Context, @@ -99,7 +98,13 @@ class Bookmark(commands.Cog): """Send the author a link to `target_message` via DMs.""" if not target_message: if not ctx.message.reference: - raise commands.UserInputError("You must either provide a valid message to bookmark, or reply to one.") + raise commands.UserInputError( + "You must either provide a valid message to bookmark, or reply to one." + "\n\nThe lookup strategy for a message is as follows (in order):" + "\n1. Lookup by '{channel ID}-{message ID}' (retrieved by shift-clicking on 'Copy ID')" + "\n2. Lookup by message ID (the message **must** be in the context channel)" + "\n3. Lookup by message URL" + ) target_message = ctx.message.reference.resolved # Prevent users from bookmarking a message in a channel they don't have access to diff --git a/bot/exts/utilities/challenges.py b/bot/exts/utilities/challenges.py new file mode 100644 index 00000000..ab7ae442 --- /dev/null +++ b/bot/exts/utilities/challenges.py @@ -0,0 +1,341 @@ +import logging +from asyncio import to_thread +from random import choice +from typing import Union + +from bs4 import BeautifulSoup +from discord import Embed, Interaction, SelectOption, ui +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Colours, Emojis, NEGATIVE_REPLIES + +log = logging.getLogger(__name__) +API_ROOT = "https://www.codewars.com/api/v1/code-challenges/{kata_id}" + +# Map difficulty for the kata to color we want to display in the embed. +# These colors are representative of the colors that each kyu's level represents on codewars.com +MAPPING_OF_KYU = { + 8: 0xdddbda, 7: 0xdddbda, 6: 0xecb613, 5: 0xecb613, + 4: 0x3c7ebb, 3: 0x3c7ebb, 2: 0x866cc7, 1: 0x866cc7 +} + +# Supported languages for a kata on codewars.com +SUPPORTED_LANGUAGES = { + "stable": [ + "c", "c#", "c++", "clojure", "coffeescript", "coq", "crystal", "dart", "elixir", + "f#", "go", "groovy", "haskell", "java", "javascript", "kotlin", "lean", "lua", "nasm", + "php", "python", "racket", "ruby", "rust", "scala", "shell", "sql", "swift", "typescript" + ], + "beta": [ + "agda", "bf", "cfml", "cobol", "commonlisp", "elm", "erlang", "factor", + "forth", "fortran", "haxe", "idris", "julia", "nim", "objective-c", "ocaml", + "pascal", "perl", "powershell", "prolog", "purescript", "r", "raku", "reason", "solidity", "vb.net" + ] +} + + +class InformationDropdown(ui.Select): + """A dropdown inheriting from ui.Select that allows finding out other information about the kata.""" + + def __init__(self, language_embed: Embed, tags_embed: Embed, other_info_embed: Embed, main_embed: Embed): + options = [ + SelectOption( + label="Main Information", + description="See the kata's difficulty, description, etc.", + emoji="π" + ), + SelectOption( + label="Languages", + description="See what languages this kata supports!", + emoji=Emojis.reddit_post_text + ), + SelectOption( + label="Tags", + description="See what categories this kata falls under!", + emoji=Emojis.stackoverflow_tag + ), + SelectOption( + label="Other Information", + description="See how other people performed on this kata and more!", + emoji="βΉ" + ) + ] + + # We map the option label to the embed instance so that it can be easily looked up later in O(1) + self.mapping_of_embeds = { + "Main Information": main_embed, + "Languages": language_embed, + "Tags": tags_embed, + "Other Information": other_info_embed, + } + + super().__init__( + placeholder="See more information regarding this kata", + min_values=1, + max_values=1, + options=options + ) + + async def callback(self, interaction: Interaction) -> None: + """Callback for when someone clicks on a dropdown.""" + # Edit the message to the embed selected in the option + # The `original_message` attribute is set just after the message is sent with the view. + # The attribute is not set during initialization. + result_embed = self.mapping_of_embeds[self.values[0]] + await self.original_message.edit(embed=result_embed) + + +class Challenges(commands.Cog): + """ + Cog for the challenge command. + + The challenge command pulls a random kata from codewars.com. + A kata is the name for a challenge, specific to codewars.com. + + The challenge command also has filters to customize the kata that is given. + You can specify the language the kata should be from, difficulty and topic of the kata. + """ + + def __init__(self, bot: Bot): + self.bot = bot + + async def kata_id(self, search_link: str, params: dict) -> Union[str, Embed]: + """ + Uses bs4 to get the HTML code for the page of katas, where the page is the link of the formatted `search_link`. + + This will webscrape the search page with `search_link` and then get the ID of a kata for the + codewars.com API to use. + """ + async with self.bot.http_session.get(search_link, params=params) as response: + if response.status != 200: + error_embed = Embed( + title=choice(NEGATIVE_REPLIES), + description="We ran into an error when getting the kata from codewars.com, try again later.", + color=Colours.soft_red + ) + log.error(f"Unexpected response from codewars.com, status code: {response.status}") + return error_embed + + soup = BeautifulSoup(await response.text(), features="lxml") + first_kata_div = await to_thread(soup.find_all, "div", class_="item-title px-0") + + if not first_kata_div: + raise commands.BadArgument("No katas could be found with the filters provided.") + elif len(first_kata_div) >= 3: + first_kata_div = choice(first_kata_div[:3]) + elif "q=" not in search_link: + first_kata_div = choice(first_kata_div) + else: + first_kata_div = first_kata_div[0] + + # There are numerous divs before arriving at the id of the kata, which can be used for the link. + first_kata_id = first_kata_div.a["href"].split("/")[-1] + return first_kata_id + + async def kata_information(self, kata_id: str) -> Union[dict, Embed]: + """ + Returns the information about the Kata. + + Uses the codewars.com API to get information about the kata using `kata_id`. + """ + async with self.bot.http_session.get(API_ROOT.format(kata_id=kata_id)) as response: + if response.status != 200: + error_embed = Embed( + title=choice(NEGATIVE_REPLIES), + description="We ran into an error when getting the kata information, try again later.", + color=Colours.soft_red + ) + log.error(f"Unexpected response from codewars.com/api/v1, status code: {response.status}") + return error_embed + + return await response.json() + + @staticmethod + def main_embed(kata_information: dict) -> Embed: + """Creates the main embed which displays the name, difficulty and description of the kata.""" + kata_description = kata_information["description"] + kata_url = f"https://codewars.com/kata/{kata_information['id']}" + + # Ensuring it isn't over the length 1024 + if len(kata_description) > 1024: + kata_description = "\n".join(kata_description[:1000].split("\n")[:-1]) + "..." + kata_description += f" [continue reading]({kata_url})" + + if kata_information["rank"]["name"] is None: + embed_color = 8 + kata_difficulty = "Unable to retrieve difficulty for beta languages." + else: + embed_color = int(kata_information["rank"]["name"].replace(" kyu", "")) + kata_difficulty = kata_information["rank"]["name"] + + kata_embed = Embed( + title=kata_information["name"], + description=kata_description, + color=MAPPING_OF_KYU[embed_color], + url=kata_url + ) + kata_embed.add_field(name="Difficulty", value=kata_difficulty, inline=False) + return kata_embed + + @staticmethod + def language_embed(kata_information: dict) -> Embed: + """Creates the 'language embed' which displays all languages the kata supports.""" + kata_url = f"https://codewars.com/kata/{kata_information['id']}" + + languages = "\n".join(map(str.title, kata_information["languages"])) + language_embed = Embed( + title=kata_information["name"], + description=f"```yaml\nSupported Languages:\n{languages}\n```", + color=Colours.python_blue, + url=kata_url + ) + return language_embed + + @staticmethod + def tags_embed(kata_information: dict) -> Embed: + """ + Creates the 'tags embed' which displays all the tags of the Kata. + + Tags explain what the kata is about, this is what codewars.com calls categories. + """ + kata_url = f"https://codewars.com/kata/{kata_information['id']}" + + tags = "\n".join(kata_information["tags"]) + tags_embed = Embed( + title=kata_information["name"], + description=f"```yaml\nTags:\n{tags}\n```", + color=Colours.grass_green, + url=kata_url + ) + return tags_embed + + @staticmethod + def miscellaneous_embed(kata_information: dict) -> Embed: + """ + Creates the 'other information embed' which displays miscellaneous information about the kata. + + This embed shows statistics such as the total number of people who completed the kata, + the total number of stars of the kata, etc. + """ + kata_url = f"https://codewars.com/kata/{kata_information['id']}" + + embed = Embed( + title=kata_information["name"], + description="```nim\nOther Information\n```", + color=Colours.grass_green, + url=kata_url + ) + embed.add_field( + name="`Total Score`", + value=f"```css\n{kata_information['voteScore']}\n```", + inline=False + ) + embed.add_field( + name="`Total Stars`", + value=f"```css\n{kata_information['totalStars']}\n```", + inline=False + ) + embed.add_field( + name="`Total Completed`", + value=f"```css\n{kata_information['totalCompleted']}\n```", + inline=False + ) + embed.add_field( + name="`Total Attempts`", + value=f"```css\n{kata_information['totalAttempts']}\n```", + inline=False + ) + return embed + + @staticmethod + def create_view(dropdown: InformationDropdown, link: str) -> ui.View: + """ + Creates the discord.py View for the Discord message components (dropdowns and buttons). + + The discord UI is implemented onto the embed, where the user can choose what information about the kata they + want, along with a link button to the kata itself. + """ + view = ui.View() + view.add_item(dropdown) + view.add_item(ui.Button(label="View the Kata", url=link)) + return view + + @commands.command(aliases=["kata"]) + @commands.cooldown(1, 5, commands.BucketType.user) + async def challenge(self, ctx: commands.Context, language: str = "python", *, query: str = None) -> None: + """ + The challenge command pulls a random kata (challenge) from codewars.com. + + The different ways to use this command are: + `.challenge <language>` - Pulls a random challenge within that language's scope. + `.challenge <language> <difficulty>` - The difficulty can be from 1-8, + 1 being the hardest, 8 being the easiest. This pulls a random challenge within that difficulty & language. + `.challenge <language> <query>` - Pulls a random challenge with the query provided under the language + `.challenge <language> <query>, <difficulty>` - Pulls a random challenge with the query provided, + under that difficulty within the language's scope. + """ + language = language.lower() + if language not in SUPPORTED_LANGUAGES["stable"] + SUPPORTED_LANGUAGES["beta"]: + raise commands.BadArgument("This is not a recognized language on codewars.com!") + + get_kata_link = f"https://codewars.com/kata/search/{language}" + params = {} + + if query is not None: + if "," in query: + query_splitted = query.split("," if ", " not in query else ", ") + + if len(query_splitted) > 2: + raise commands.BadArgument( + "There can only be one comma within the query, separating the difficulty and the query itself." + ) + + query, level = query_splitted + params["q"] = query + params["r[]"] = f"-{level}" + elif query.isnumeric(): + params["r[]"] = f"-{query}" + else: + params["q"] = query + + params["beta"] = str(language in SUPPORTED_LANGUAGES["beta"]).lower() + + first_kata_id = await self.kata_id(get_kata_link, params) + if isinstance(first_kata_id, Embed): + # We ran into an error when retrieving the website link + await ctx.send(embed=first_kata_id) + return + + kata_information = await self.kata_information(first_kata_id) + if isinstance(kata_information, Embed): + # Something went wrong when trying to fetch the kata information + await ctx.d(embed=kata_information) + return + + kata_embed = self.main_embed(kata_information) + language_embed = self.language_embed(kata_information) + tags_embed = self.tags_embed(kata_information) + miscellaneous_embed = self.miscellaneous_embed(kata_information) + + dropdown = InformationDropdown( + main_embed=kata_embed, + language_embed=language_embed, + tags_embed=tags_embed, + other_info_embed=miscellaneous_embed + ) + kata_view = self.create_view(dropdown, f"https://codewars.com/kata/{first_kata_id}") + original_message = await ctx.send( + embed=kata_embed, + view=kata_view + ) + dropdown.original_message = original_message + + wait_for_kata = await kata_view.wait() + if wait_for_kata: + await original_message.edit(embed=kata_embed, view=None) + + +def setup(bot: Bot) -> None: + """Load the Challenges cog.""" + bot.add_cog(Challenges(bot)) diff --git a/bot/exts/utilities/colour.py b/bot/exts/utilities/colour.py new file mode 100644 index 00000000..ee6bad93 --- /dev/null +++ b/bot/exts/utilities/colour.py @@ -0,0 +1,266 @@ +import colorsys +import json +import pathlib +import random +import string +from io import BytesIO +from typing import Optional + +import discord +import rapidfuzz +from PIL import Image, ImageColor +from discord.ext import commands + +from bot import constants +from bot.bot import Bot +from bot.exts.core.extensions import invoke_help_command +from bot.utils.decorators import whitelist_override + +THUMBNAIL_SIZE = (80, 80) + + +class Colour(commands.Cog): + """Cog for the Colour command.""" + + def __init__(self, bot: Bot): + self.bot = bot + with open(pathlib.Path("bot/resources/utilities/ryanzec_colours.json")) as f: + self.colour_mapping = json.load(f) + del self.colour_mapping['_'] # Delete source credit entry + + async def send_colour_response(self, ctx: commands.Context, rgb: tuple[int, int, int]) -> None: + """Create and send embed from user given colour information.""" + name = self._rgb_to_name(rgb) + try: + colour_or_color = ctx.invoked_parents[0] + except IndexError: + colour_or_color = "colour" + + colour_mode = ctx.invoked_with + if colour_mode == "random": + colour_mode = colour_or_color + input_colour = name + elif colour_mode in ("colour", "color"): + input_colour = ctx.kwargs["colour_input"] + elif colour_mode == "name": + input_colour = ctx.kwargs["user_colour_name"] + elif colour_mode == "hex": + input_colour = ctx.args[2:][0] + if len(input_colour) > 7: + input_colour = input_colour[0:-2] + else: + input_colour = tuple(ctx.args[2:]) + + if colour_mode not in ("name", "hex", "random", "color", "colour"): + colour_mode = colour_mode.upper() + else: + colour_mode = colour_mode.title() + + colour_embed = discord.Embed( + title=f"{name or input_colour}", + description=f"{colour_or_color.title()} information for {colour_mode} `{input_colour or name}`.", + colour=discord.Color.from_rgb(*rgb) + ) + colour_conversions = self.get_colour_conversions(rgb) + for colour_space, value in colour_conversions.items(): + colour_embed.add_field( + name=colour_space, + value=f"`{value}`", + inline=True + ) + + thumbnail = Image.new("RGB", THUMBNAIL_SIZE, color=rgb) + buffer = BytesIO() + thumbnail.save(buffer, "PNG") + buffer.seek(0) + thumbnail_file = discord.File(buffer, filename="colour.png") + + colour_embed.set_thumbnail(url="attachment://colour.png") + + await ctx.send(file=thumbnail_file, embed=colour_embed) + + @commands.group(aliases=("color",), invoke_without_command=True) + @whitelist_override( + channels=constants.WHITELISTED_CHANNELS, + roles=constants.STAFF_ROLES, + categories=[constants.Categories.development, constants.Categories.media] + ) + async def colour(self, ctx: commands.Context, *, colour_input: Optional[str] = None) -> None: + """ + Create an embed that displays colour information. + + If no subcommand is called, a randomly selected colour will be shown. + """ + if colour_input is None: + await self.random(ctx) + return + + try: + extra_colour = ImageColor.getrgb(colour_input) + await self.send_colour_response(ctx, extra_colour) + except ValueError: + await invoke_help_command(ctx) + + @colour.command() + async def rgb(self, ctx: commands.Context, red: int, green: int, blue: int) -> None: + """Create an embed from an RGB input.""" + if any(c not in range(256) for c in (red, green, blue)): + raise commands.BadArgument( + message=f"RGB values can only be from 0 to 255. User input was: `{red, green, blue}`." + ) + rgb_tuple = (red, green, blue) + await self.send_colour_response(ctx, rgb_tuple) + + @colour.command() + async def hsv(self, ctx: commands.Context, hue: int, saturation: int, value: int) -> None: + """Create an embed from an HSV input.""" + if (hue not in range(361)) or any(c not in range(101) for c in (saturation, value)): + raise commands.BadArgument( + message="Hue can only be from 0 to 360. Saturation and Value can only be from 0 to 100. " + f"User input was: `{hue, saturation, value}`." + ) + hsv_tuple = ImageColor.getrgb(f"hsv({hue}, {saturation}%, {value}%)") + await self.send_colour_response(ctx, hsv_tuple) + + @colour.command() + async def hsl(self, ctx: commands.Context, hue: int, saturation: int, lightness: int) -> None: + """Create an embed from an HSL input.""" + if (hue not in range(361)) or any(c not in range(101) for c in (saturation, lightness)): + raise commands.BadArgument( + message="Hue can only be from 0 to 360. Saturation and Lightness can only be from 0 to 100. " + f"User input was: `{hue, saturation, lightness}`." + ) + hsl_tuple = ImageColor.getrgb(f"hsl({hue}, {saturation}%, {lightness}%)") + await self.send_colour_response(ctx, hsl_tuple) + + @colour.command() + async def cmyk(self, ctx: commands.Context, cyan: int, magenta: int, yellow: int, key: int) -> None: + """Create an embed from a CMYK input.""" + if any(c not in range(101) for c in (cyan, magenta, yellow, key)): + raise commands.BadArgument( + message=f"CMYK values can only be from 0 to 100. User input was: `{cyan, magenta, yellow, key}`." + ) + r = round(255 * (1 - (cyan / 100)) * (1 - (key / 100))) + g = round(255 * (1 - (magenta / 100)) * (1 - (key / 100))) + b = round(255 * (1 - (yellow / 100)) * (1 - (key / 100))) + await self.send_colour_response(ctx, (r, g, b)) + + @colour.command() + async def hex(self, ctx: commands.Context, hex_code: str) -> None: + """Create an embed from a HEX input.""" + if hex_code[0] != "#": + hex_code = f"#{hex_code}" + + if len(hex_code) not in (4, 5, 7, 9) or any(digit not in string.hexdigits for digit in hex_code[1:]): + raise commands.BadArgument( + message=f"Cannot convert `{hex_code}` to a recognizable Hex format. " + "Hex values must be hexadecimal and take the form *#RRGGBB* or *#RGB*." + ) + + hex_tuple = ImageColor.getrgb(hex_code) + if len(hex_tuple) == 4: + hex_tuple = hex_tuple[:-1] # Colour must be RGB. If RGBA, we remove the alpha value + await self.send_colour_response(ctx, hex_tuple) + + @colour.command() + async def name(self, ctx: commands.Context, *, user_colour_name: str) -> None: + """Create an embed from a name input.""" + hex_colour = self.match_colour_name(ctx, user_colour_name) + if hex_colour is None: + name_error_embed = discord.Embed( + title="No colour match found.", + description=f"No colour found for: `{user_colour_name}`", + colour=discord.Color.dark_red() + ) + await ctx.send(embed=name_error_embed) + return + hex_tuple = ImageColor.getrgb(hex_colour) + await self.send_colour_response(ctx, hex_tuple) + + @colour.command() + async def random(self, ctx: commands.Context) -> None: + """Create an embed from a randomly chosen colour.""" + hex_colour = random.choice(list(self.colour_mapping.values())) + hex_tuple = ImageColor.getrgb(f"#{hex_colour}") + await self.send_colour_response(ctx, hex_tuple) + + def get_colour_conversions(self, rgb: tuple[int, int, int]) -> dict[str, str]: + """Create a dictionary mapping of colour types and their values.""" + colour_name = self._rgb_to_name(rgb) + if colour_name is None: + colour_name = "No match found" + return { + "RGB": rgb, + "HSV": self._rgb_to_hsv(rgb), + "HSL": self._rgb_to_hsl(rgb), + "CMYK": self._rgb_to_cmyk(rgb), + "Hex": self._rgb_to_hex(rgb), + "Name": colour_name + } + + @staticmethod + def _rgb_to_hsv(rgb: tuple[int, int, int]) -> tuple[int, int, int]: + """Convert RGB values to HSV values.""" + rgb_list = [val / 255 for val in rgb] + h, s, v = colorsys.rgb_to_hsv(*rgb_list) + hsv = (round(h * 360), round(s * 100), round(v * 100)) + return hsv + + @staticmethod + def _rgb_to_hsl(rgb: tuple[int, int, int]) -> tuple[int, int, int]: + """Convert RGB values to HSL values.""" + rgb_list = [val / 255.0 for val in rgb] + h, l, s = colorsys.rgb_to_hls(*rgb_list) + hsl = (round(h * 360), round(s * 100), round(l * 100)) + return hsl + + @staticmethod + def _rgb_to_cmyk(rgb: tuple[int, int, int]) -> tuple[int, int, int, int]: + """Convert RGB values to CMYK values.""" + rgb_list = [val / 255.0 for val in rgb] + if not any(rgb_list): + return 0, 0, 0, 100 + k = 1 - max(rgb_list) + c = round((1 - rgb_list[0] - k) * 100 / (1 - k)) + m = round((1 - rgb_list[1] - k) * 100 / (1 - k)) + y = round((1 - rgb_list[2] - k) * 100 / (1 - k)) + cmyk = (c, m, y, round(k * 100)) + return cmyk + + @staticmethod + def _rgb_to_hex(rgb: tuple[int, int, int]) -> str: + """Convert RGB values to HEX code.""" + hex_ = "".join([hex(val)[2:].zfill(2) for val in rgb]) + hex_code = f"#{hex_}".upper() + return hex_code + + def _rgb_to_name(self, rgb: tuple[int, int, int]) -> Optional[str]: + """Convert RGB values to a fuzzy matched name.""" + input_hex_colour = self._rgb_to_hex(rgb) + try: + match, certainty, _ = rapidfuzz.process.extractOne( + query=input_hex_colour, + choices=self.colour_mapping.values(), + score_cutoff=80 + ) + colour_name = [name for name, hex_code in self.colour_mapping.items() if hex_code == match][0] + except TypeError: + colour_name = None + return colour_name + + def match_colour_name(self, ctx: commands.Context, input_colour_name: str) -> Optional[str]: + """Convert a colour name to HEX code.""" + try: + match, certainty, _ = rapidfuzz.process.extractOne( + query=input_colour_name, + choices=self.colour_mapping.keys(), + score_cutoff=80 + ) + except (ValueError, TypeError): + return + return f"#{self.colour_mapping[match]}" + + +def setup(bot: Bot) -> None: + """Load the Colour cog.""" + bot.add_cog(Colour(bot)) diff --git a/bot/exts/utilities/conversationstarters.py b/bot/exts/utilities/conversationstarters.py index dd537022..8bf2abfd 100644 --- a/bot/exts/utilities/conversationstarters.py +++ b/bot/exts/utilities/conversationstarters.py @@ -1,11 +1,15 @@ +import asyncio +from contextlib import suppress +from functools import partial from pathlib import Path +from typing import Union +import discord import yaml -from discord import Color, Embed from discord.ext import commands from bot.bot import Bot -from bot.constants import WHITELISTED_CHANNELS +from bot.constants import MODERATION_ROLES, WHITELISTED_CHANNELS from bot.utils.decorators import whitelist_override from bot.utils.randomization import RandomCycle @@ -35,35 +39,88 @@ TOPICS = { class ConvoStarters(commands.Cog): """General conversation topics.""" - @commands.command() - @whitelist_override(channels=ALL_ALLOWED_CHANNELS) - async def topic(self, ctx: commands.Context) -> None: + def __init__(self, bot: Bot): + self.bot = bot + + @staticmethod + def _build_topic_embed(channel_id: int) -> discord.Embed: """ - Responds with a random topic to start a conversation. + Build an embed containing a conversation topic. If in a Python channel, a python-related topic will be given. - Otherwise, a random conversation topic will be received by the user. """ # No matter what, the form will be shown. - embed = Embed(description=f"Suggest more topics [here]({SUGGESTION_FORM})!", color=Color.blurple()) + embed = discord.Embed( + description=f"Suggest more topics [here]({SUGGESTION_FORM})!", + color=discord.Colour.og_blurple() + ) try: - # Fetching topics. - channel_topics = TOPICS[ctx.channel.id] - - # If the channel isn't Python-related. + channel_topics = TOPICS[channel_id] except KeyError: + # Channel doesn't have any topics. embed.title = f"**{next(TOPICS['default'])}**" - - # If the channel ID doesn't have any topics. else: embed.title = f"**{next(channel_topics)}**" + return embed + + @staticmethod + def _predicate( + command_invoker: Union[discord.User, discord.Member], + message: discord.Message, + reaction: discord.Reaction, + user: discord.User + ) -> bool: + user_is_moderator = any(role.id in MODERATION_ROLES for role in getattr(user, "roles", [])) + user_is_invoker = user.id == command_invoker.id + + is_right_reaction = all(( + reaction.message.id == message.id, + str(reaction.emoji) == "π", + user_is_moderator or user_is_invoker + )) + return is_right_reaction + + async def _listen_for_refresh( + self, + command_invoker: Union[discord.User, discord.Member], + message: discord.Message + ) -> None: + await message.add_reaction("π") + while True: + try: + reaction, user = await self.bot.wait_for( + "reaction_add", + check=partial(self._predicate, command_invoker, message), + timeout=60.0 + ) + except asyncio.TimeoutError: + with suppress(discord.NotFound): + await message.clear_reaction("π") + break + + try: + await message.edit(embed=self._build_topic_embed(message.channel.id)) + except discord.NotFound: + break + + with suppress(discord.NotFound): + await message.remove_reaction(reaction, user) - finally: - await ctx.send(embed=embed) + @commands.command() + @commands.cooldown(1, 60*2, commands.BucketType.channel) + @whitelist_override(channels=ALL_ALLOWED_CHANNELS) + async def topic(self, ctx: commands.Context) -> None: + """ + Responds with a random topic to start a conversation. + + Allows the refresh of a topic by pressing an emoji. + """ + message = await ctx.send(embed=self._build_topic_embed(ctx.channel.id)) + self.bot.loop.create_task(self._listen_for_refresh(ctx.author, message)) def setup(bot: Bot) -> None: """Load the ConvoStarters cog.""" - bot.add_cog(ConvoStarters()) + bot.add_cog(ConvoStarters(bot)) diff --git a/bot/exts/utilities/emoji.py b/bot/exts/utilities/emoji.py index 55d6b8e9..fa438d7f 100644 --- a/bot/exts/utilities/emoji.py +++ b/bot/exts/utilities/emoji.py @@ -107,11 +107,11 @@ class Emojis(commands.Cog): title=f"Emoji Information: {emoji.name}", description=textwrap.dedent(f""" **Name:** {emoji.name} - **Created:** {time_since(emoji.created_at, precision="hours")} - **Date:** {datetime.strftime(emoji.created_at, "%d/%m/%Y")} + **Created:** {time_since(emoji.created_at.replace(tzinfo=None), precision="hours")} + **Date:** {datetime.strftime(emoji.created_at.replace(tzinfo=None), "%d/%m/%Y")} **ID:** {emoji.id} """), - color=Color.blurple(), + color=Color.og_blurple(), url=str(emoji.url), ).set_thumbnail(url=emoji.url) diff --git a/bot/exts/utilities/epoch.py b/bot/exts/utilities/epoch.py new file mode 100644 index 00000000..42312dd1 --- /dev/null +++ b/bot/exts/utilities/epoch.py @@ -0,0 +1,138 @@ +from typing import Optional, Union + +import arrow +import discord +from dateutil import parser +from discord.ext import commands + +from bot.bot import Bot +from bot.utils.extensions import invoke_help_command + +# https://discord.com/developers/docs/reference#message-formatting-timestamp-styles +STYLES = { + "Epoch": ("",), + "Short Time": ("t", "h:mm A",), + "Long Time": ("T", "h:mm:ss A"), + "Short Date": ("d", "MM/DD/YYYY"), + "Long Date": ("D", "MMMM D, YYYY"), + "Short Date/Time": ("f", "MMMM D, YYYY h:mm A"), + "Long Date/Time": ("F", "dddd, MMMM D, YYYY h:mm A"), + "Relative Time": ("R",) +} +DROPDOWN_TIMEOUT = 60 + + +class DateString(commands.Converter): + """Convert a relative or absolute date/time string to an arrow.Arrow object.""" + + async def convert(self, ctx: commands.Context, argument: str) -> Union[arrow.Arrow, Optional[tuple]]: + """ + Convert a relative or absolute date/time string to an arrow.Arrow object. + + Try to interpret the date string as a relative time. If conversion fails, try to interpret it as an absolute + time. Tokens that are not recognised are returned along with the part of the string that was successfully + converted to an arrow object. If the date string cannot be parsed, BadArgument is raised. + """ + try: + return arrow.utcnow().dehumanize(argument) + except (ValueError, OverflowError): + try: + dt, ignored_tokens = parser.parse(argument, fuzzy_with_tokens=True) + except parser.ParserError: + raise commands.BadArgument(f"`{argument}` Could not be parsed to a relative or absolute date.") + except OverflowError: + raise commands.BadArgument(f"`{argument}` Results in a date outside of the supported range.") + return arrow.get(dt), ignored_tokens + + +class Epoch(commands.Cog): + """Convert an entered time and date to a unix timestamp.""" + + @commands.command(name="epoch") + async def epoch(self, ctx: commands.Context, *, date_time: DateString = None) -> None: + """ + Convert an entered date/time string to the equivalent epoch. + + **Relative time** + Must begin with `in...` or end with `...ago`. + Accepted units: "seconds", "minutes", "hours", "days", "weeks", "months", "years". + eg `.epoch in a month 4 days and 2 hours` + + **Absolute time** + eg `.epoch 2022/6/15 16:43 -04:00` + Absolute times must be entered in descending orders of magnitude. + If AM or PM is left unspecified, the 24-hour clock is assumed. + Timezones are optional, and will default to UTC. The following timezone formats are accepted: + Z (UTC) + Β±HH:MM + Β±HHMM + Β±HH + + Times in the dropdown are shown in UTC + """ + if not date_time: + await invoke_help_command(ctx) + return + + if isinstance(date_time, tuple): + # Remove empty strings. Strip extra whitespace from the remaining items + ignored_tokens = list(map(str.strip, filter(str.strip, date_time[1]))) + date_time = date_time[0] + if ignored_tokens: + await ctx.send(f"Could not parse the following token(s): `{', '.join(ignored_tokens)}`") + await ctx.send(f"Date and time parsed as: `{date_time.format(arrow.FORMAT_RSS)}`") + + epoch = int(date_time.timestamp()) + view = TimestampMenuView(ctx, self._format_dates(date_time), epoch) + original = await ctx.send(f"`{epoch}`", view=view) + await view.wait() # wait until expiration before removing the dropdown + try: + await original.edit(view=None) + except discord.NotFound: # disregard the error message if the message is deleled + pass + + @staticmethod + def _format_dates(date: arrow.Arrow) -> list[str]: + """ + Return a list of date strings formatted according to the discord timestamp styles. + + These are used in the description of each style in the dropdown + """ + date = date.to('utc') + formatted = [str(int(date.timestamp()))] + formatted += [date.format(format[1]) for format in list(STYLES.values())[1:7]] + formatted.append(date.humanize()) + return formatted + + +class TimestampMenuView(discord.ui.View): + """View for the epoch command which contains a single `discord.ui.Select` dropdown component.""" + + def __init__(self, ctx: commands.Context, formatted_times: list[str], epoch: int): + super().__init__(timeout=DROPDOWN_TIMEOUT) + self.ctx = ctx + self.epoch = epoch + self.dropdown: discord.ui.Select = self.children[0] + for label, date_time in zip(STYLES.keys(), formatted_times): + self.dropdown.add_option(label=label, description=date_time) + + @discord.ui.select(placeholder="Select the format of your timestamp") + async def select_format(self, _: discord.ui.Select, interaction: discord.Interaction) -> discord.Message: + """Drop down menu which contains a list of formats which discord timestamps can take.""" + selected = interaction.data["values"][0] + if selected == "Epoch": + return await interaction.response.edit_message(content=f"`{self.epoch}`") + return await interaction.response.edit_message(content=f"`<t:{self.epoch}:{STYLES[selected][0]}>`") + + async def interaction_check(self, interaction: discord.Interaction) -> bool: + """Check to ensure that the interacting user is the user who invoked the command.""" + if interaction.user != self.ctx.author: + embed = discord.Embed(description="Sorry, but this dropdown menu can only be used by the original author.") + await interaction.response.send_message(embed=embed, ephemeral=True) + return False + return True + + +def setup(bot: Bot) -> None: + """Load the Epoch cog.""" + bot.add_cog(Epoch()) diff --git a/bot/exts/utilities/githubinfo.py b/bot/exts/utilities/githubinfo.py index d00b408d..963f54e5 100644 --- a/bot/exts/utilities/githubinfo.py +++ b/bot/exts/utilities/githubinfo.py @@ -1,30 +1,165 @@ import logging import random +import re +import typing as t +from dataclasses import dataclass from datetime import datetime -from urllib.parse import quote, quote_plus +from urllib.parse import quote import discord +from aiohttp import ClientResponse from discord.ext import commands from bot.bot import Bot -from bot.constants import Colours, NEGATIVE_REPLIES +from bot.constants import Colours, ERROR_REPLIES, Emojis, NEGATIVE_REPLIES, Tokens from bot.exts.core.extensions import invoke_help_command log = logging.getLogger(__name__) GITHUB_API_URL = "https://api.github.com" +REQUEST_HEADERS = { + "Accept": "application/vnd.github.v3+json" +} + +REPOSITORY_ENDPOINT = "https://api.github.com/orgs/{org}/repos?per_page=100&type=public" +ISSUE_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/issues/{number}" +PR_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/pulls/{number}" + +if Tokens.github: + REQUEST_HEADERS["Authorization"] = f"token {Tokens.github}" + +CODE_BLOCK_RE = re.compile( + r"^`([^`\n]+)`" # Inline codeblock + r"|```(.+?)```", # Multiline codeblock + re.DOTALL | re.MULTILINE +) + +# Maximum number of issues in one message +MAXIMUM_ISSUES = 5 + +# Regex used when looking for automatic linking in messages +# regex101 of current regex https://regex101.com/r/V2ji8M/6 +AUTOMATIC_REGEX = re.compile( + r"((?P<org>[a-zA-Z0-9][a-zA-Z0-9\-]{1,39})\/)?(?P<repo>[\w\-\.]{1,100})#(?P<number>[0-9]+)" +) + + +@dataclass(eq=True, frozen=True) +class FoundIssue: + """Dataclass representing an issue found by the regex.""" + + organisation: t.Optional[str] + repository: str + number: str + + +@dataclass(eq=True, frozen=True) +class FetchError: + """Dataclass representing an error while fetching an issue.""" + + return_code: int + message: str + + +@dataclass(eq=True, frozen=True) +class IssueState: + """Dataclass representing the state of an issue.""" + + repository: str + number: int + url: str + title: str + emoji: str + class GithubInfo(commands.Cog): - """Fetches info from GitHub.""" + """A Cog that fetches info from GitHub.""" def __init__(self, bot: Bot): self.bot = bot + self.repos = [] + + @staticmethod + def remove_codeblocks(message: str) -> str: + """Remove any codeblock in a message.""" + return CODE_BLOCK_RE.sub("", message) + + async def fetch_issue( + self, + number: int, + repository: str, + user: str + ) -> t.Union[IssueState, FetchError]: + """ + Retrieve an issue from a GitHub repository. + + Returns IssueState on success, FetchError on failure. + """ + url = ISSUE_ENDPOINT.format(user=user, repository=repository, number=number) + pulls_url = PR_ENDPOINT.format(user=user, repository=repository, number=number) + + json_data, r = await self.fetch_data(url) + + if r.status == 403: + if r.headers.get("X-RateLimit-Remaining") == "0": + log.info(f"Ratelimit reached while fetching {url}") + return FetchError(403, "Ratelimit reached, please retry in a few minutes.") + return FetchError(403, "Cannot access issue.") + elif r.status in (404, 410): + return FetchError(r.status, "Issue not found.") + elif r.status != 200: + return FetchError(r.status, "Error while fetching issue.") + + # The initial API request is made to the issues API endpoint, which will return information + # if the issue or PR is present. However, the scope of information returned for PRs differs + # from issues: if the 'issues' key is present in the response then we can pull the data we + # need from the initial API call. + if "issues" in json_data["html_url"]: + if json_data.get("state") == "open": + emoji = Emojis.issue_open + else: + emoji = Emojis.issue_closed + + # If the 'issues' key is not contained in the API response and there is no error code, then + # we know that a PR has been requested and a call to the pulls API endpoint is necessary + # to get the desired information for the PR. + else: + pull_data, _ = await self.fetch_data(pulls_url) + if pull_data["draft"]: + emoji = Emojis.pull_request_draft + elif pull_data["state"] == "open": + emoji = Emojis.pull_request_open + # When 'merged_at' is not None, this means that the state of the PR is merged + elif pull_data["merged_at"] is not None: + emoji = Emojis.pull_request_merged + else: + emoji = Emojis.pull_request_closed + + issue_url = json_data.get("html_url") - async def fetch_data(self, url: str) -> dict: - """Retrieve data as a dictionary.""" - async with self.bot.http_session.get(url) as r: - return await r.json() + return IssueState(repository, number, issue_url, json_data.get("title", ""), emoji) + + @staticmethod + def format_embed( + results: t.List[t.Union[IssueState, FetchError]] + ) -> discord.Embed: + """Take a list of IssueState or FetchError and format a Discord embed for them.""" + description_list = [] + + for result in results: + if isinstance(result, IssueState): + description_list.append(f"{result.emoji} [{result.title}]({result.url})") + elif isinstance(result, FetchError): + description_list.append(f":x: [{result.return_code}] {result.message}") + + resp = discord.Embed( + colour=Colours.bright_green, + description="\n".join(description_list) + ) + + resp.set_author(name="GitHub") + return resp @commands.group(name="github", aliases=("gh", "git")) @commands.cooldown(1, 10, commands.BucketType.user) @@ -33,11 +168,67 @@ class GithubInfo(commands.Cog): if ctx.invoked_subcommand is None: await invoke_help_command(ctx) + @commands.Cog.listener() + async def on_message(self, message: discord.Message) -> None: + """ + Automatic issue linking. + + Listener to retrieve issue(s) from a GitHub repository using automatic linking if matching <org>/<repo>#<issue>. + """ + # Ignore bots + if message.author.bot: + return + + issues = [ + FoundIssue(*match.group("org", "repo", "number")) + for match in AUTOMATIC_REGEX.finditer(self.remove_codeblocks(message.content)) + ] + links = [] + + if issues: + # Block this from working in DMs + if not message.guild: + return + + log.trace(f"Found {issues = }") + # Remove duplicates + issues = set(issues) + + if len(issues) > MAXIMUM_ISSUES: + embed = discord.Embed( + title=random.choice(ERROR_REPLIES), + color=Colours.soft_red, + description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})" + ) + await message.channel.send(embed=embed, delete_after=5) + return + + for repo_issue in issues: + result = await self.fetch_issue( + int(repo_issue.number), + repo_issue.repository, + repo_issue.organisation or "python-discord" + ) + if isinstance(result, IssueState): + links.append(result) + + if not links: + return + + resp = self.format_embed(links) + await message.channel.send(embed=resp) + + async def fetch_data(self, url: str) -> tuple[dict[str], ClientResponse]: + """Retrieve data as a dictionary and the response in a tuple.""" + log.trace(f"Querying GH issues API: {url}") + async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r: + return await r.json(), r + @github_group.command(name="user", aliases=("userinfo",)) async def github_user_info(self, ctx: commands.Context, username: str) -> None: """Fetches a user's GitHub information.""" async with ctx.typing(): - user_data = await self.fetch_data(f"{GITHUB_API_URL}/users/{quote_plus(username)}") + user_data, _ = await self.fetch_data(f"{GITHUB_API_URL}/users/{username}") # User_data will not have a message key if the user exists if "message" in user_data: @@ -50,7 +241,7 @@ class GithubInfo(commands.Cog): await ctx.send(embed=embed) return - org_data = await self.fetch_data(user_data["organizations_url"]) + org_data, _ = await self.fetch_data(user_data["organizations_url"]) orgs = [f"[{org['login']}](https://github.com/{org['login']})" for org in org_data] orgs_to_add = " | ".join(orgs) @@ -67,7 +258,7 @@ class GithubInfo(commands.Cog): embed = discord.Embed( title=f"`{user_data['login']}`'s GitHub profile info", description=f"```\n{user_data['bio']}\n```\n" if user_data["bio"] else "", - colour=discord.Colour.blurple(), + colour=discord.Colour.og_blurple(), url=user_data["html_url"], timestamp=datetime.strptime(user_data["created_at"], "%Y-%m-%dT%H:%M:%SZ") ) @@ -91,10 +282,7 @@ class GithubInfo(commands.Cog): ) if user_data["type"] == "User": - embed.add_field( - name="Gists", - value=f"[{gists}](https://gist.github.com/{quote_plus(username, safe='')})" - ) + embed.add_field(name="Gists", value=f"[{gists}](https://gist.github.com/{quote(username, safe='')})") embed.add_field( name=f"Organization{'s' if len(orgs)!=1 else ''}", @@ -123,7 +311,7 @@ class GithubInfo(commands.Cog): return async with ctx.typing(): - repo_data = await self.fetch_data(f"{GITHUB_API_URL}/repos/{quote(repo)}") + repo_data, _ = await self.fetch_data(f"{GITHUB_API_URL}/repos/{quote(repo)}") # There won't be a message key if this repo exists if "message" in repo_data: @@ -139,7 +327,7 @@ class GithubInfo(commands.Cog): embed = discord.Embed( title=repo_data["name"], description=repo_data["description"], - colour=discord.Colour.blurple(), + colour=discord.Colour.og_blurple(), url=repo_data["html_url"] ) diff --git a/bot/exts/utilities/issues.py b/bot/exts/utilities/issues.py deleted file mode 100644 index 8a7ebed0..00000000 --- a/bot/exts/utilities/issues.py +++ /dev/null @@ -1,275 +0,0 @@ -import logging -import random -import re -from dataclasses import dataclass -from typing import Optional, Union - -import discord -from discord.ext import commands - -from bot.bot import Bot -from bot.constants import ( - Categories, - Channels, - Colours, - ERROR_REPLIES, - Emojis, - NEGATIVE_REPLIES, - Tokens, - WHITELISTED_CHANNELS -) -from bot.utils.decorators import whitelist_override -from bot.utils.extensions import invoke_help_command - -log = logging.getLogger(__name__) - -BAD_RESPONSE = { - 404: "Issue/pull request not located! Please enter a valid number!", - 403: "Rate limit has been hit! Please try again later!" -} -REQUEST_HEADERS = { - "Accept": "application/vnd.github.v3+json" -} - -REPOSITORY_ENDPOINT = "https://api.github.com/orgs/{org}/repos?per_page=100&type=public" -ISSUE_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/issues/{number}" -PR_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/pulls/{number}" - -if GITHUB_TOKEN := Tokens.github: - REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}" - -WHITELISTED_CATEGORIES = ( - Categories.development, Categories.devprojects, Categories.media, Categories.staff -) - -CODE_BLOCK_RE = re.compile( - r"^`([^`\n]+)`" # Inline codeblock - r"|```(.+?)```", # Multiline codeblock - re.DOTALL | re.MULTILINE -) - -# Maximum number of issues in one message -MAXIMUM_ISSUES = 5 - -# Regex used when looking for automatic linking in messages -# regex101 of current regex https://regex101.com/r/V2ji8M/6 -AUTOMATIC_REGEX = re.compile( - r"((?P<org>[a-zA-Z0-9][a-zA-Z0-9\-]{1,39})\/)?(?P<repo>[\w\-\.]{1,100})#(?P<number>[0-9]+)" -) - - -@dataclass -class FoundIssue: - """Dataclass representing an issue found by the regex.""" - - organisation: Optional[str] - repository: str - number: str - - def __hash__(self) -> int: - return hash((self.organisation, self.repository, self.number)) - - -@dataclass -class FetchError: - """Dataclass representing an error while fetching an issue.""" - - return_code: int - message: str - - -@dataclass -class IssueState: - """Dataclass representing the state of an issue.""" - - repository: str - number: int - url: str - title: str - emoji: str - - -class Issues(commands.Cog): - """Cog that allows users to retrieve issues from GitHub.""" - - def __init__(self, bot: Bot): - self.bot = bot - self.repos = [] - - @staticmethod - def remove_codeblocks(message: str) -> str: - """Remove any codeblock in a message.""" - return re.sub(CODE_BLOCK_RE, "", message) - - async def fetch_issues( - self, - number: int, - repository: str, - user: str - ) -> Union[IssueState, FetchError]: - """ - Retrieve an issue from a GitHub repository. - - Returns IssueState on success, FetchError on failure. - """ - url = ISSUE_ENDPOINT.format(user=user, repository=repository, number=number) - pulls_url = PR_ENDPOINT.format(user=user, repository=repository, number=number) - log.trace(f"Querying GH issues API: {url}") - - async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r: - json_data = await r.json() - - if r.status == 403: - if r.headers.get("X-RateLimit-Remaining") == "0": - log.info(f"Ratelimit reached while fetching {url}") - return FetchError(403, "Ratelimit reached, please retry in a few minutes.") - return FetchError(403, "Cannot access issue.") - elif r.status in (404, 410): - return FetchError(r.status, "Issue not found.") - elif r.status != 200: - return FetchError(r.status, "Error while fetching issue.") - - # The initial API request is made to the issues API endpoint, which will return information - # if the issue or PR is present. However, the scope of information returned for PRs differs - # from issues: if the 'issues' key is present in the response then we can pull the data we - # need from the initial API call. - if "issues" in json_data["html_url"]: - if json_data.get("state") == "open": - emoji = Emojis.issue_open - else: - emoji = Emojis.issue_closed - - # If the 'issues' key is not contained in the API response and there is no error code, then - # we know that a PR has been requested and a call to the pulls API endpoint is necessary - # to get the desired information for the PR. - else: - log.trace(f"PR provided, querying GH pulls API for additional information: {pulls_url}") - async with self.bot.http_session.get(pulls_url) as p: - pull_data = await p.json() - if pull_data["draft"]: - emoji = Emojis.pull_request_draft - elif pull_data["state"] == "open": - emoji = Emojis.pull_request_open - # When 'merged_at' is not None, this means that the state of the PR is merged - elif pull_data["merged_at"] is not None: - emoji = Emojis.pull_request_merged - else: - emoji = Emojis.pull_request_closed - - issue_url = json_data.get("html_url") - - return IssueState(repository, number, issue_url, json_data.get("title", ""), emoji) - - @staticmethod - def format_embed( - results: list[Union[IssueState, FetchError]], - user: str, - repository: Optional[str] = None - ) -> discord.Embed: - """Take a list of IssueState or FetchError and format a Discord embed for them.""" - description_list = [] - - for result in results: - if isinstance(result, IssueState): - description_list.append(f"{result.emoji} [{result.title}]({result.url})") - elif isinstance(result, FetchError): - description_list.append(f":x: [{result.return_code}] {result.message}") - - resp = discord.Embed( - colour=Colours.bright_green, - description="\n".join(description_list) - ) - - embed_url = f"https://github.com/{user}/{repository}" if repository else f"https://github.com/{user}" - resp.set_author(name="GitHub", url=embed_url) - return resp - - @whitelist_override(channels=WHITELISTED_CHANNELS, categories=WHITELISTED_CATEGORIES) - @commands.command(aliases=("pr",)) - async def issue( - self, - ctx: commands.Context, - numbers: commands.Greedy[int], - repository: str = "sir-lancebot", - user: str = "python-discord" - ) -> None: - """Command to retrieve issue(s) from a GitHub repository.""" - # Remove duplicates - numbers = set(numbers) - - if len(numbers) > MAXIMUM_ISSUES: - embed = discord.Embed( - title=random.choice(ERROR_REPLIES), - color=Colours.soft_red, - description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})" - ) - await ctx.send(embed=embed) - await invoke_help_command(ctx) - - results = [await self.fetch_issues(number, repository, user) for number in numbers] - await ctx.send(embed=self.format_embed(results, user, repository)) - - @commands.Cog.listener() - async def on_message(self, message: discord.Message) -> None: - """ - Automatic issue linking. - - Listener to retrieve issue(s) from a GitHub repository using automatic linking if matching <org>/<repo>#<issue>. - """ - # Ignore bots - if message.author.bot: - return - - issues = [ - FoundIssue(*match.group("org", "repo", "number")) - for match in AUTOMATIC_REGEX.finditer(self.remove_codeblocks(message.content)) - ] - links = [] - - if issues: - # Block this from working in DMs - if not message.guild: - await message.channel.send( - embed=discord.Embed( - title=random.choice(NEGATIVE_REPLIES), - description=( - "You can't retrieve issues from DMs. " - f"Try again in <#{Channels.community_bot_commands}>" - ), - colour=Colours.soft_red - ) - ) - return - - log.trace(f"Found {issues = }") - # Remove duplicates - issues = set(issues) - - if len(issues) > MAXIMUM_ISSUES: - embed = discord.Embed( - title=random.choice(ERROR_REPLIES), - color=Colours.soft_red, - description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})" - ) - await message.channel.send(embed=embed, delete_after=5) - return - - for repo_issue in issues: - result = await self.fetch_issues( - int(repo_issue.number), - repo_issue.repository, - repo_issue.organisation or "python-discord" - ) - if isinstance(result, IssueState): - links.append(result) - - if not links: - return - - resp = self.format_embed(links, "python-discord") - await message.channel.send(embed=resp) - - -def setup(bot: Bot) -> None: - """Load the Issues cog.""" - bot.add_cog(Issues(bot)) diff --git a/bot/exts/utilities/latex.py b/bot/exts/utilities/latex.py deleted file mode 100644 index 36c7e0ab..00000000 --- a/bot/exts/utilities/latex.py +++ /dev/null @@ -1,101 +0,0 @@ -import asyncio -import hashlib -import pathlib -import re -from concurrent.futures import ThreadPoolExecutor -from io import BytesIO - -import discord -import matplotlib.pyplot as plt -from discord.ext import commands - -from bot.bot import Bot - -# configure fonts and colors for matplotlib -plt.rcParams.update( - { - "font.size": 16, - "mathtext.fontset": "cm", # Computer Modern font set - "mathtext.rm": "serif", - "figure.facecolor": "36393F", # matches Discord's dark mode background color - "text.color": "white", - } -) - -FORMATTED_CODE_REGEX = re.compile( - r"(?P<delim>(?P<block>```)|``?)" # code delimiter: 1-3 backticks; (?P=block) only matches if it's a block - r"(?(block)(?:(?P<lang>[a-z]+)\n)?)" # if we're in a block, match optional language (only letters plus newline) - r"(?:[ \t]*\n)*" # any blank (empty or tabs/spaces only) lines before the code - r"(?P<code>.*?)" # extract all code inside the markup - r"\s*" # any more whitespace before the end of the code markup - r"(?P=delim)", # match the exact same delimiter from the start again - re.DOTALL | re.IGNORECASE, # "." also matches newlines, case insensitive -) - -CACHE_DIRECTORY = pathlib.Path("_latex_cache") -CACHE_DIRECTORY.mkdir(exist_ok=True) - - -class Latex(commands.Cog): - """Renders latex.""" - - @staticmethod - def _render(text: str, filepath: pathlib.Path) -> BytesIO: - """ - Return the rendered image if latex compiles without errors, otherwise raise a BadArgument Exception. - - Saves rendered image to cache. - """ - fig = plt.figure() - rendered_image = BytesIO() - fig.text(0, 1, text, horizontalalignment="left", verticalalignment="top") - - try: - plt.savefig(rendered_image, bbox_inches="tight", dpi=600) - except ValueError as e: - raise commands.BadArgument(str(e)) - - rendered_image.seek(0) - - with open(filepath, "wb") as f: - f.write(rendered_image.getbuffer()) - - return rendered_image - - @staticmethod - def _prepare_input(text: str) -> str: - text = text.replace(r"\\", "$\n$") # matplotlib uses \n for newlines, not \\ - - if match := FORMATTED_CODE_REGEX.match(text): - return match.group("code") - else: - return text - - @commands.command() - @commands.max_concurrency(1, commands.BucketType.guild, wait=True) - async def latex(self, ctx: commands.Context, *, text: str) -> None: - """Renders the text in latex and sends the image.""" - text = self._prepare_input(text) - query_hash = hashlib.md5(text.encode()).hexdigest() - image_path = CACHE_DIRECTORY.joinpath(f"{query_hash}.png") - async with ctx.typing(): - if image_path.exists(): - await ctx.send(file=discord.File(image_path)) - return - - with ThreadPoolExecutor() as pool: - image = await asyncio.get_running_loop().run_in_executor( - pool, self._render, text, image_path - ) - - await ctx.send(file=discord.File(image, "latex.png")) - - -def setup(bot: Bot) -> None: - """Load the Latex Cog.""" - # As we have resource issues on this cog, - # we have it currently disabled while we fix it. - import logging - logging.info("Latex cog is currently disabled. It won't be loaded.") - return - bot.add_cog(Latex()) diff --git a/bot/exts/utilities/realpython.py b/bot/exts/utilities/realpython.py index ef8b2638..bf8f1341 100644 --- a/bot/exts/utilities/realpython.py +++ b/bot/exts/utilities/realpython.py @@ -1,5 +1,6 @@ import logging from html import unescape +from typing import Optional from urllib.parse import quote_plus from discord import Embed @@ -31,9 +32,18 @@ class RealPython(commands.Cog): @commands.command(aliases=["rp"]) @commands.cooldown(1, 10, commands.cooldowns.BucketType.user) - async def realpython(self, ctx: commands.Context, *, user_search: str) -> None: - """Send 5 articles that match the user's search terms.""" - params = {"q": user_search, "limit": 5, "kind": "article"} + async def realpython(self, ctx: commands.Context, amount: Optional[int] = 5, *, user_search: str) -> None: + """ + Send some articles from RealPython that match the search terms. + + By default the top 5 matches are sent, this can be overwritten to + a number between 1 and 5 by specifying an amount before the search query. + """ + if not 1 <= amount <= 5: + await ctx.send("`amount` must be between 1 and 5 (inclusive).") + return + + params = {"q": user_search, "limit": amount, "kind": "article"} async with self.bot.http_session.get(url=API_ROOT, params=params) as response: if response.status != 200: logger.error( diff --git a/bot/exts/utilities/reddit.py b/bot/exts/utilities/reddit.py index e6cb5337..782583d2 100644 --- a/bot/exts/utilities/reddit.py +++ b/bot/exts/utilities/reddit.py @@ -244,7 +244,7 @@ class Reddit(Cog): # Use only starting summary page for #reddit channel posts. embed.description = self.build_pagination_pages(posts, paginate=False) - embed.colour = Colour.blurple() + embed.colour = Colour.og_blurple() return embed @loop() @@ -312,7 +312,7 @@ class Reddit(Cog): await ctx.send(f"Here are the top {subreddit} posts of all time!") embed = Embed( - color=Colour.blurple() + color=Colour.og_blurple() ) await ImagePaginator.paginate(pages, ctx, embed) @@ -325,7 +325,7 @@ class Reddit(Cog): await ctx.send(f"Here are today's top {subreddit} posts!") embed = Embed( - color=Colour.blurple() + color=Colour.og_blurple() ) await ImagePaginator.paginate(pages, ctx, embed) @@ -338,7 +338,7 @@ class Reddit(Cog): await ctx.send(f"Here are this week's top {subreddit} posts!") embed = Embed( - color=Colour.blurple() + color=Colour.og_blurple() ) await ImagePaginator.paginate(pages, ctx, embed) @@ -349,7 +349,7 @@ class Reddit(Cog): """Send a paginated embed of all the subreddits we're relaying.""" embed = Embed() embed.title = "Relayed subreddits." - embed.colour = Colour.blurple() + embed.colour = Colour.og_blurple() await LinePaginator.paginate( RedditConfig.subreddits, diff --git a/bot/exts/utilities/twemoji.py b/bot/exts/utilities/twemoji.py new file mode 100644 index 00000000..c915f05b --- /dev/null +++ b/bot/exts/utilities/twemoji.py @@ -0,0 +1,150 @@ +import logging +import re +from typing import Literal, Optional + +import discord +from discord.ext import commands +from emoji import UNICODE_EMOJI_ENGLISH, is_emoji + +from bot.bot import Bot +from bot.constants import Colours, Roles +from bot.utils.decorators import whitelist_override +from bot.utils.extensions import invoke_help_command + +log = logging.getLogger(__name__) +BASE_URLS = { + "png": "https://raw.githubusercontent.com/twitter/twemoji/master/assets/72x72/", + "svg": "https://raw.githubusercontent.com/twitter/twemoji/master/assets/svg/", +} +CODEPOINT_REGEX = re.compile(r"[a-f1-9][a-f0-9]{3,5}$") + + +class Twemoji(commands.Cog): + """Utilities for working with Twemojis.""" + + def __init__(self, bot: Bot): + self.bot = bot + + @staticmethod + def get_url(codepoint: str, format: Literal["png", "svg"]) -> str: + """Returns a source file URL for the specified Twemoji, in the corresponding format.""" + return f"{BASE_URLS[format]}{codepoint}.{format}" + + @staticmethod + def alias_to_name(alias: str) -> str: + """ + Transform a unicode alias to an emoji name. + + Example usages: + >>> alias_to_name(":falling_leaf:") + "Falling leaf" + >>> alias_to_name(":family_man_girl_boy:") + "Family man girl boy" + """ + name = alias.strip(":").replace("_", " ") + return name.capitalize() + + @staticmethod + def build_embed(codepoint: str) -> discord.Embed: + """Returns the main embed for the `twemoji` commmand.""" + emoji = "".join(Twemoji.emoji(e) or "" for e in codepoint.split("-")) + + embed = discord.Embed( + title=Twemoji.alias_to_name(UNICODE_EMOJI_ENGLISH[emoji]), + description=f"{codepoint.replace('-', ' ')}\n[Download svg]({Twemoji.get_url(codepoint, 'svg')})", + colour=Colours.twitter_blue, + ) + embed.set_thumbnail(url=Twemoji.get_url(codepoint, "png")) + return embed + + @staticmethod + def emoji(codepoint: Optional[str]) -> Optional[str]: + """ + Returns the emoji corresponding to a given `codepoint`, or `None` if no emoji was found. + + The return value is an emoji character, such as "π". The `codepoint` + argument can be of any format, since it will be trimmed automatically. + """ + if code := Twemoji.trim_code(codepoint): + return chr(int(code, 16)) + + @staticmethod + def codepoint(emoji: Optional[str]) -> Optional[str]: + """ + Returns the codepoint, in a trimmed format, of a single emoji. + + `emoji` should be an emoji character, such as "π" and "π₯°", and + not a codepoint like "1f1f8". When working with combined emojis, + such as "πΈπͺ" and "π¨βπ©βπ¦", send the component emojis through the method + one at a time. + """ + if emoji is None: + return None + return hex(ord(emoji)).removeprefix("0x") + + @staticmethod + def trim_code(codepoint: Optional[str]) -> Optional[str]: + """ + Returns the meaningful information from the given `codepoint`. + + If no codepoint is found, `None` is returned. + + Example usages: + >>> trim_code("U+1f1f8") + "1f1f8" + >>> trim_code("\u0001f1f8") + "1f1f8" + >>> trim_code("1f466") + "1f466" + """ + if code := CODEPOINT_REGEX.search(codepoint or ""): + return code.group() + + @staticmethod + def codepoint_from_input(raw_emoji: tuple[str, ...]) -> str: + """ + Returns the codepoint corresponding to the passed tuple, separated by "-". + + The return format matches the format used in URLs for Twemoji source files. + + Example usages: + >>> codepoint_from_input(("π",)) + "1f40d" + >>> codepoint_from_input(("1f1f8", "1f1ea")) + "1f1f8-1f1ea" + >>> codepoint_from_input(("π¨βπ§βπ¦",)) + "1f468-200d-1f467-200d-1f466" + """ + raw_emoji = [emoji.lower() for emoji in raw_emoji] + if is_emoji(raw_emoji[0]): + emojis = (Twemoji.codepoint(emoji) or "" for emoji in raw_emoji[0]) + return "-".join(emojis) + + emoji = "".join( + Twemoji.emoji(Twemoji.trim_code(code)) or "" for code in raw_emoji + ) + if is_emoji(emoji): + return "-".join(Twemoji.codepoint(e) or "" for e in emoji) + + raise ValueError("No codepoint could be obtained from the given input") + + @commands.command(aliases=("tw",)) + @whitelist_override(roles=(Roles.everyone,)) + async def twemoji(self, ctx: commands.Context, *raw_emoji: str) -> None: + """Sends a preview of a given Twemoji, specified by codepoint or emoji.""" + if len(raw_emoji) == 0: + await invoke_help_command(ctx) + return + try: + codepoint = self.codepoint_from_input(raw_emoji) + except ValueError: + raise commands.BadArgument( + "please include a valid emoji or emoji codepoint." + ) + + await ctx.send(embed=self.build_embed(codepoint)) + + +def setup(bot: Bot) -> None: + """Load the Twemoji cog.""" + bot.add_cog(Twemoji(bot)) diff --git a/bot/exts/utilities/wikipedia.py b/bot/exts/utilities/wikipedia.py index eccc1f8c..e5e8e289 100644 --- a/bot/exts/utilities/wikipedia.py +++ b/bot/exts/utilities/wikipedia.py @@ -82,13 +82,11 @@ class WikipediaSearch(commands.Cog): if contents: embed = Embed( title="Wikipedia Search Results", - colour=Color.blurple() + colour=Color.og_blurple() ) embed.set_thumbnail(url=WIKI_THUMBNAIL) embed.timestamp = datetime.utcnow() - await LinePaginator.paginate( - contents, ctx, embed - ) + await LinePaginator.paginate(contents, ctx, embed, restrict_to_user=ctx.author) else: await ctx.send( "Sorry, we could not find a wikipedia article using that search term." diff --git a/bot/exts/utilities/wtf_python.py b/bot/exts/utilities/wtf_python.py new file mode 100644 index 00000000..980b3dba --- /dev/null +++ b/bot/exts/utilities/wtf_python.py @@ -0,0 +1,138 @@ +import logging +import random +import re +from typing import Optional + +import rapidfuzz +from discord import Embed, File +from discord.ext import commands, tasks + +from bot import constants +from bot.bot import Bot + +log = logging.getLogger(__name__) + +WTF_PYTHON_RAW_URL = "http://raw.githubusercontent.com/satwikkansal/wtfpython/master/" +BASE_URL = "https://github.com/satwikkansal/wtfpython" +LOGO_PATH = "./bot/resources/utilities/wtf_python_logo.jpg" + +ERROR_MESSAGE = f""" +Unknown WTF Python Query. Please try to reformulate your query. + +**Examples**: +```md +{constants.Client.prefix}wtf wild imports +{constants.Client.prefix}wtf subclass +{constants.Client.prefix}wtf del +``` +If the problem persists send a message in <#{constants.Channels.dev_contrib}> +""" + +MINIMUM_CERTAINTY = 55 + + +class WTFPython(commands.Cog): + """Cog that allows getting WTF Python entries from the WTF Python repository.""" + + def __init__(self, bot: Bot): + self.bot = bot + self.headers: dict[str, str] = {} + self.fetch_readme.start() + + @tasks.loop(minutes=60) + async def fetch_readme(self) -> None: + """Gets the content of README.md from the WTF Python Repository.""" + async with self.bot.http_session.get(f"{WTF_PYTHON_RAW_URL}README.md") as resp: + log.trace("Fetching the latest WTF Python README.md") + if resp.status == 200: + raw = await resp.text() + self.parse_readme(raw) + + def parse_readme(self, data: str) -> None: + """ + Parses the README.md into a dict. + + It parses the readme into the `self.headers` dict, + where the key is the heading and the value is the + link to the heading. + """ + # Match the start of examples, until the end of the table of contents (toc) + table_of_contents = re.search( + r"\[π Examples\]\(#-examples\)\n([\w\W]*)<!-- tocstop -->", data + )[0].split("\n") + + for header in list(map(str.strip, table_of_contents)): + match = re.search(r"\[βΆ (.*)\]\((.*)\)", header) + if match: + hyper_link = match[0].split("(")[1].replace(")", "") + self.headers[match[0]] = f"{BASE_URL}/{hyper_link}" + + def fuzzy_match_header(self, query: str) -> Optional[str]: + """ + Returns the fuzzy match of a query if its ratio is above "MINIMUM_CERTAINTY" else returns None. + + "MINIMUM_CERTAINTY" is the lowest score at which the fuzzy match will return a result. + The certainty returned by rapidfuzz.process.extractOne is a score between 0 and 100, + with 100 being a perfect match. + """ + match, certainty, _ = rapidfuzz.process.extractOne(query, self.headers.keys()) + return match if certainty > MINIMUM_CERTAINTY else None + + @commands.command(aliases=("wtf", "WTF")) + async def wtf_python(self, ctx: commands.Context, *, query: Optional[str] = None) -> None: + """ + Search WTF Python repository. + + Gets the link of the fuzzy matched query from https://github.com/satwikkansal/wtfpython. + Usage: + --> .wtf wild imports + """ + if query is None: + no_query_embed = Embed( + title="WTF Python?!", + colour=constants.Colours.dark_green, + description="A repository filled with suprising snippets that can make you say WTF?!\n\n" + f"[Go to the Repository]({BASE_URL})" + ) + logo = File(LOGO_PATH, filename="wtf_logo.jpg") + no_query_embed.set_thumbnail(url="attachment://wtf_logo.jpg") + await ctx.send(embed=no_query_embed, file=logo) + return + + if len(query) > 50: + embed = Embed( + title=random.choice(constants.ERROR_REPLIES), + description=ERROR_MESSAGE, + colour=constants.Colours.soft_red, + ) + match = None + else: + match = self.fuzzy_match_header(query) + + if not match: + embed = Embed( + title=random.choice(constants.ERROR_REPLIES), + description=ERROR_MESSAGE, + colour=constants.Colours.soft_red, + ) + await ctx.send(embed=embed) + return + + embed = Embed( + title="WTF Python?!", + colour=constants.Colours.dark_green, + description=f"""Search result for '{query}': {match.split("]")[0].replace("[", "")} + [Go to Repository Section]({self.headers[match]})""", + ) + logo = File(LOGO_PATH, filename="wtf_logo.jpg") + embed.set_thumbnail(url="attachment://wtf_logo.jpg") + await ctx.send(embed=embed, file=logo) + + def cog_unload(self) -> None: + """Unload the cog and cancel the task.""" + self.fetch_readme.cancel() + + +def setup(bot: Bot) -> None: + """Load the WTFPython Cog.""" + bot.add_cog(WTFPython(bot)) |