diff options
Diffstat (limited to 'bot/exts/evergreen')
47 files changed, 3657 insertions, 1269 deletions
| diff --git a/bot/exts/evergreen/8bitify.py b/bot/exts/evergreen/8bitify.py deleted file mode 100644 index 54e68f80..00000000 --- a/bot/exts/evergreen/8bitify.py +++ /dev/null @@ -1,54 +0,0 @@ -from io import BytesIO - -import discord -from PIL import Image -from discord.ext import commands - - -class EightBitify(commands.Cog): -    """Make your avatar 8bit!""" - -    def __init__(self, bot: commands.Bot) -> None: -        self.bot = bot - -    @staticmethod -    def pixelate(image: Image) -> Image: -        """Takes an image and pixelates it.""" -        return image.resize((32, 32), resample=Image.NEAREST).resize((1024, 1024), resample=Image.NEAREST) - -    @staticmethod -    def quantize(image: Image) -> Image: -        """Reduces colour palette to 256 colours.""" -        return image.quantize() - -    @commands.command(name="8bitify") -    async def eightbit_command(self, ctx: commands.Context) -> None: -        """Pixelates your avatar and changes the palette to an 8bit one.""" -        async with ctx.typing(): -            image_bytes = await ctx.author.avatar_url.read() -            avatar = Image.open(BytesIO(image_bytes)) -            avatar = avatar.convert("RGBA").resize((1024, 1024)) - -            eightbit = self.pixelate(avatar) -            eightbit = self.quantize(eightbit) - -            bufferedio = BytesIO() -            eightbit.save(bufferedio, format="PNG") -            bufferedio.seek(0) - -            file = discord.File(bufferedio, filename="8bitavatar.png") - -            embed = discord.Embed( -                title="Your 8-bit avatar", -                description='Here is your avatar. I think it looks all cool and "retro"' -            ) - -            embed.set_image(url="attachment://8bitavatar.png") -            embed.set_footer(text=f"Made by {ctx.author.display_name}", icon_url=ctx.author.avatar_url) - -        await ctx.send(file=file, embed=embed) - - -def setup(bot: commands.Bot) -> None: -    """Cog load.""" -    bot.add_cog(EightBitify(bot)) diff --git a/bot/exts/evergreen/avatar_modification/__init__.py b/bot/exts/evergreen/avatar_modification/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/bot/exts/evergreen/avatar_modification/__init__.py diff --git a/bot/exts/evergreen/avatar_modification/_effects.py b/bot/exts/evergreen/avatar_modification/_effects.py new file mode 100644 index 00000000..92244207 --- /dev/null +++ b/bot/exts/evergreen/avatar_modification/_effects.py @@ -0,0 +1,296 @@ +import math +import random +import typing as t +from io import BytesIO +from pathlib import Path + +import discord +from PIL import Image, ImageDraw, ImageOps + +from bot.constants import Colours + + +class PfpEffects: +    """ +    Implements various image modifying effects, for the PfpModify cog. + +    All of these functions are slow, and blocking, so they should be ran in executors. +    """ + +    @staticmethod +    def apply_effect(image_bytes: bytes, effect: t.Callable, filename: str, *args) -> discord.File: +        """Applies the given effect to the image passed to it.""" +        im = Image.open(BytesIO(image_bytes)) +        im = im.convert("RGBA") +        im = im.resize((1024, 1024)) +        im = effect(im, *args) + +        bufferedio = BytesIO() +        im.save(bufferedio, format="PNG") +        bufferedio.seek(0) + +        return discord.File(bufferedio, filename=filename) + +    @staticmethod +    def closest(x: t.Tuple[int, int, int]) -> t.Tuple[int, int, int]: +        """ +        Finds the closest "easter" colour to a given pixel. + +        Returns a merge between the original colour and the closest colour. +        """ +        r1, g1, b1 = x + +        def distance(point: t.Tuple[int, int, int]) -> t.Tuple[int, int, int]: +            """Finds the difference between a pastel colour and the original pixel colour.""" +            r2, g2, b2 = point +            return (r1 - r2) ** 2 + (g1 - g2) ** 2 + (b1 - b2) ** 2 + +        closest_colours = sorted(Colours.easter_like_colours, key=distance) +        r2, g2, b2 = closest_colours[0] +        r = (r1 + r2) // 2 +        g = (g1 + g2) // 2 +        b = (b1 + b2) // 2 + +        return r, g, b + +    @staticmethod +    def crop_avatar_circle(avatar: Image.Image) -> Image.Image: +        """This crops the avatar given into a circle.""" +        mask = Image.new("L", avatar.size, 0) +        draw = ImageDraw.Draw(mask) +        draw.ellipse((0, 0) + avatar.size, fill=255) +        avatar.putalpha(mask) +        return avatar + +    @staticmethod +    def crop_ring(ring: Image.Image, px: int) -> Image.Image: +        """This crops the given ring into a circle.""" +        mask = Image.new("L", ring.size, 0) +        draw = ImageDraw.Draw(mask) +        draw.ellipse((0, 0) + ring.size, fill=255) +        draw.ellipse((px, px, 1024-px, 1024-px), fill=0) +        ring.putalpha(mask) +        return ring + +    @staticmethod +    def pridify_effect(image: Image.Image, pixels: int, flag: str) -> Image.Image: +        """Applies the given pride effect to the given image.""" +        image = PfpEffects.crop_avatar_circle(image) + +        ring = Image.open(Path(f"bot/resources/pride/flags/{flag}.png")).resize((1024, 1024)) +        ring = ring.convert("RGBA") +        ring = PfpEffects.crop_ring(ring, pixels) + +        image.alpha_composite(ring, (0, 0)) +        return image + +    @staticmethod +    def eight_bitify_effect(image: Image.Image) -> Image.Image: +        """ +        Applies the 8bit effect to the given image. + +        This is done by reducing the image to 32x32 and then back up to 1024x1024. +        We then quantize the image before returning too. +        """ +        image = image.resize((32, 32), resample=Image.NEAREST) +        image = image.resize((1024, 1024), resample=Image.NEAREST) +        return image.quantize() + +    @staticmethod +    def flip_effect(image: Image.Image) -> Image.Image: +        """ +        Flips the image horizontally. + +        This is done by just using ImageOps.mirror(). +        """ +        image = ImageOps.mirror(image) + +        return image + +    @staticmethod +    def easterify_effect(image: Image.Image, overlay_image: t.Optional[Image.Image] = None) -> Image.Image: +        """ +        Applies the easter effect to the given image. + +        This is done by getting the closest "easter" colour to each pixel and changing the colour +        to the half-way RGB value. + +        We also then add an overlay image on top in middle right, a chocolate bunny by default. +        """ +        if overlay_image: +            ratio = 64 / overlay_image.height +            overlay_image = overlay_image.resize(( +                round(overlay_image.width * ratio), +                round(overlay_image.height * ratio) +            )) +            overlay_image = overlay_image.convert("RGBA") +        else: +            overlay_image = Image.open(Path("bot/resources/easter/chocolate_bunny.png")) + +        alpha = image.getchannel("A").getdata() +        image = image.convert("RGB") +        image = ImageOps.posterize(image, 6) + +        data = image.getdata() +        data_set = set(data) +        easterified_data_set = {} + +        for x in data_set: +            easterified_data_set[x] = PfpEffects.closest(x) +        new_pixel_data = [ +            (*easterified_data_set[x], alpha[i]) +            if x in easterified_data_set else x +            for i, x in enumerate(data) +        ] + +        im = Image.new("RGBA", image.size) +        im.putdata(new_pixel_data) +        im.alpha_composite( +            overlay_image, +            (im.width - overlay_image.width, (im.height - overlay_image.height) // 2) +        ) +        return im + +    @staticmethod +    def split_image(img: Image.Image, squares: int) -> list: +        """ +        Split an image into a selection of squares, specified by the squares argument. + +        Explanation: + +        1. It gets the width and the height of the Image passed to the function. + +        2. It gets the root of a number of squares (number of squares) passed, which is called xy. Reason: if let's say +        25 squares (number of squares) were passed, that is the total squares (split pieces) that the image is supposed +        to be split into. As it is known, a 2D shape has a height and a width, and in this case the program thinks of it +        as rows and columns. Rows multiplied by columns is equal to the passed squares (number of squares). To get rows +        and columns, since in this case, each square (split piece) is identical, rows are equal to columns and the +        program treats the image as a square-shaped, it gets the root out of the squares (number of squares) passed. + +        3. Now width and height are both of the original Image, Discord PFP, so when it comes to forming the squares, +        the program divides the original image's height and width by the xy. In a case of 25 squares (number of squares) +        passed, xy would be 5, so if an image was 250x300, x_frac would be 50 and y_frac - 60. Note: +        x_frac stands for a fracture of width. The reason it's called that is because it is shorter to use x for width +        in mind and then it's just half of the word fracture, same applies to y_frac, just height instead of width. +        x_frac and y_frac are width and height of a single square (split piece). + +        4. With left, top, right, bottom, = 0, 0, x_frac, y_frac, the program sets these variables to create the initial +        square (split piece). Explanation: all of these 4 variables start at the top left corner of the Image, by adding +        value to right and bottom, it's creating the initial square (split piece). + +        5. In the for loop, it keeps adding those squares (split pieces) in a row and once (index + 1) % xy == 0 is +        True, it adds to top and bottom to lower them and reset right and left to recreate the initial space between +        them, forming a square (split piece), it also adds the newly created square (split piece) into the new_imgs list +        where it stores them. The program keeps repeating this process till all 25 squares get added to the list. + +        6. It returns new_imgs, a list of squares (split pieces). +        """ +        width, heigth = img.size + +        xy = math.sqrt(squares) + +        x_frac = width // xy +        y_frac = heigth // xy + +        left, top, right, bottom, = 0, 0, x_frac, y_frac + +        new_imgs = [] + +        for index in range(squares): +            new_img = img.crop((left, top, right, bottom)) +            new_imgs.append(new_img) + +            if (index + 1) % xy == 0: +                top += y_frac +                bottom += y_frac +                left = 0 +                right = x_frac +            else: +                left += x_frac +                right += x_frac + +        return new_imgs + +    @staticmethod +    def join_images(images: t.List[Image.Image]) -> Image.Image: +        """ +        Stitches all the image squares into a new image. + +        Explanation: + +        1. Shuffles the passed images to randomize the pieces. + +        2. The program gets a single square (split piece) out of the list and defines single_width as the square's width +        and single_height as the square's height. + +        3. It gets the root of type integer of the number of images (split pieces) in the list and calls it multiplier. +        Program then proceeds to calculate total height and width of the new image that it's creating using the same +        multiplier. + +        4. The program then defines new_image as the image that it's creating, using the previously obtained total_width +        and total_height. + +        5. Now it defines width_multiplier as well as height with values of 0. These will be used to correctly position +        squares (split pieces) onto the new_image canvas. + +        6. Similar to how in the split_image function, the program gets the root of number of images in the list. +        In split_image function, it was the passed squares (number of squares) instead of a number of imgs in the +        list that it got the square of here. + +        7. In the for loop, as it iterates, the program multiplies single_width by width_multiplier to correctly +        position a square (split piece) width wise. It then proceeds to paste the newly positioned square (split piece) +        onto the new_image. The program increases the width_multiplier by 1 every iteration so the image wouldn't get +        pasted in the same spot and the positioning would move accordingly. It makes sure to increase the +        width_multiplier before the check, which checks if the end of a row has been reached, - +        (index + 1) % pieces == 0, so after it, if it was True, width_multiplier would have been reset to 0 (start of +        the row). If the check returns True, the height gets increased by a single square's (split piece) height to +        lower the positioning height wise and, as mentioned, the width_multiplier gets reset to 0 and width will +        then be calculated from the start of the new row. The for loop finishes once all the squares (split pieces) were +        positioned accordingly. + +        8. Finally, it returns the new_image, the randomized squares (split pieces) stitched back into the format of the +        original image - user's PFP. +        """ +        random.shuffle(images) +        single_img = images[0] + +        single_wdith = single_img.size[0] +        single_height = single_img.size[1] + +        multiplier = int(math.sqrt(len(images))) + +        total_width = multiplier * single_wdith +        total_height = multiplier * single_height + +        new_image = Image.new("RGBA", (total_width, total_height), (250, 250, 250)) + +        width_multiplier = 0 +        height = 0 + +        squares = math.sqrt(len(images)) + +        for index, image in enumerate(images): +            width = single_wdith * width_multiplier + +            new_image.paste(image, (width, height)) + +            width_multiplier += 1 + +            if (index + 1) % squares == 0: +                width_multiplier = 0 +                height += single_height + +        return new_image + +    @staticmethod +    def mosaic_effect(image: Image.Image, squares: int) -> Image.Image: +        """ +        Applies a mosaic effect to the given image. + +        The "squares" argument specifies the number of squares to split +        the image into. This should be a square number. +        """ +        img_squares = PfpEffects.split_image(image, squares) +        new_img = PfpEffects.join_images(img_squares) + +        return new_img diff --git a/bot/exts/evergreen/avatar_modification/avatar_modify.py b/bot/exts/evergreen/avatar_modification/avatar_modify.py new file mode 100644 index 00000000..7b4ae9c7 --- /dev/null +++ b/bot/exts/evergreen/avatar_modification/avatar_modify.py @@ -0,0 +1,372 @@ +import asyncio +import json +import logging +import math +import string +import typing as t +import unicodedata +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path + +import discord +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Colours, Emojis +from bot.exts.evergreen.avatar_modification._effects import PfpEffects +from bot.utils.extensions import invoke_help_command +from bot.utils.halloween import spookifications + +log = logging.getLogger(__name__) + +_EXECUTOR = ThreadPoolExecutor(10) + +FILENAME_STRING = "{effect}_{author}.png" + +MAX_SQUARES = 10_000 + +T = t.TypeVar("T") + +GENDER_OPTIONS = json.loads(Path("bot/resources/pride/gender_options.json").read_text("utf8")) + + +async def in_executor(func: t.Callable[..., T], *args) -> T: +    """ +    Runs the given synchronous function `func` in an executor. + +    This is useful for running slow, blocking code within async +    functions, so that they don't block the bot. +    """ +    log.trace(f"Running {func.__name__} in an executor.") +    loop = asyncio.get_event_loop() +    return await loop.run_in_executor(_EXECUTOR, func, *args) + + +def file_safe_name(effect: str, display_name: str) -> str: +    """Returns a file safe filename based on the given effect and display name.""" +    valid_filename_chars = f"-_. {string.ascii_letters}{string.digits}" + +    file_name = FILENAME_STRING.format(effect=effect, author=display_name) + +    # Replace spaces +    file_name = file_name.replace(" ", "_") + +    # Normalize unicode characters +    cleaned_filename = unicodedata.normalize("NFKD", file_name).encode("ASCII", "ignore").decode() + +    # Remove invalid filename characters +    cleaned_filename = "".join(c for c in cleaned_filename if c in valid_filename_chars) +    return cleaned_filename + + +class AvatarModify(commands.Cog): +    """Various commands for users to apply affects to their own avatars.""" + +    def __init__(self, bot: Bot) -> None: +        self.bot = bot + +    async def _fetch_user(self, user_id: int) -> t.Optional[discord.User]: +        """ +        Fetches a user and handles errors. + +        This helper function is required as the member cache doesn't always have the most up to date +        profile picture. This can lead to errors if the image is deleted from the Discord CDN. +        fetch_member can't be used due to the avatar url being part of the user object, and +        some weird caching that D.py does +        """ +        try: +            user = await self.bot.fetch_user(user_id) +        except discord.errors.NotFound: +            log.debug(f"User {user_id} could not be found.") +            return None +        except discord.HTTPException: +            log.exception(f"Exception while trying to retrieve user {user_id} from Discord.") +            return None + +        return user + +    @commands.group(aliases=("avatar_mod", "pfp_mod", "avatarmod", "pfpmod")) +    async def avatar_modify(self, ctx: commands.Context) -> None: +        """Groups all of the pfp modifying commands to allow a single concurrency limit.""" +        if not ctx.invoked_subcommand: +            await invoke_help_command(ctx) + +    @avatar_modify.command(name="8bitify", root_aliases=("8bitify",)) +    async def eightbit_command(self, ctx: commands.Context) -> None: +        """Pixelates your avatar and changes the palette to an 8bit one.""" +        async with ctx.typing(): +            user = await self._fetch_user(ctx.author.id) +            if not user: +                await ctx.send(f"{Emojis.cross_mark} Could not get user info.") +                return + +            image_bytes = await user.avatar_url_as(size=1024).read() +            file_name = file_safe_name("eightbit_avatar", ctx.author.display_name) + +            file = await in_executor( +                PfpEffects.apply_effect, +                image_bytes, +                PfpEffects.eight_bitify_effect, +                file_name +            ) + +            embed = discord.Embed( +                title="Your 8-bit avatar", +                description="Here is your avatar. I think it looks all cool and 'retro'." +            ) + +            embed.set_image(url=f"attachment://{file_name}") +            embed.set_footer(text=f"Made by {ctx.author.display_name}.", icon_url=user.avatar_url) + +        await ctx.send(embed=embed, file=file) + +    @avatar_modify.command(name="reverse", root_aliases=("reverse",)) +    async def reverse(self, ctx: commands.Context, *, text: t.Optional[str]) -> None: +        """ +        Reverses the sent text. + +        If no text is provided, the user's profile picture will be reversed. +        """ +        if text: +            await ctx.send(f"> {text[::-1]}", allowed_mentions=discord.AllowedMentions.none()) +            return + +        async with ctx.typing(): +            user = await self._fetch_user(ctx.author.id) +            if not user: +                await ctx.send(f"{Emojis.cross_mark} Could not get user info.") +                return + +            image_bytes = await user.avatar_url_as(size=1024).read() +            filename = file_safe_name("reverse_avatar", ctx.author.display_name) + +            file = await in_executor( +                PfpEffects.apply_effect, +                image_bytes, +                PfpEffects.flip_effect, +                filename +            ) + +            embed = discord.Embed( +                title="Your reversed avatar.", +                description="Here is your reversed avatar. I think it is a spitting image of you." +            ) + +            embed.set_image(url=f"attachment://{filename}") +            embed.set_footer(text=f"Made by {ctx.author.display_name}.", icon_url=user.avatar_url) + +            await ctx.send(embed=embed, file=file) + +    @avatar_modify.command(aliases=("easterify",), root_aliases=("easterify", "avatareasterify")) +    async def avatareasterify(self, ctx: commands.Context, *colours: t.Union[discord.Colour, str]) -> None: +        """ +        This "Easterifies" the user's avatar. + +        Given colours will produce a personalised egg in the corner, similar to the egg_decorate command. +        If colours are not given, a nice little chocolate bunny will sit in the corner. +        Colours are split by spaces, unless you wrap the colour name in double quotes. +        Discord colour names, HTML colour names, XKCD colour names and hex values are accepted. +        """ +        async def send(*args, **kwargs) -> str: +            """ +            This replaces the original ctx.send. + +            When invoking the egg decorating command, the egg itself doesn't print to to the channel. +            Returns the message content so that if any errors occur, the error message can be output. +            """ +            if args: +                return args[0] + +        async with ctx.typing(): +            user = await self._fetch_user(ctx.author.id) +            if not user: +                await ctx.send(f"{Emojis.cross_mark} Could not get user info.") +                return + +            egg = None +            if colours: +                send_message = ctx.send +                ctx.send = send  # Assigns ctx.send to a fake send +                egg = await ctx.invoke(self.bot.get_command("eggdecorate"), *colours) +                if isinstance(egg, str):  # When an error message occurs in eggdecorate. +                    await send_message(egg) +                    return +                ctx.send = send_message  # Reassigns ctx.send + +            image_bytes = await user.avatar_url_as(size=256).read() +            file_name = file_safe_name("easterified_avatar", ctx.author.display_name) + +            file = await in_executor( +                PfpEffects.apply_effect, +                image_bytes, +                PfpEffects.easterify_effect, +                file_name, +                egg +            ) + +            embed = discord.Embed( +                name="Your Lovely Easterified Avatar!", +                description="Here is your lovely avatar, all bright and colourful\nwith Easter pastel colours. Enjoy :D" +            ) +            embed.set_image(url=f"attachment://{file_name}") +            embed.set_footer(text=f"Made by {ctx.author.display_name}.", icon_url=user.avatar_url) + +        await ctx.send(file=file, embed=embed) + +    @staticmethod +    async def send_pride_image( +        ctx: commands.Context, +        image_bytes: bytes, +        pixels: int, +        flag: str, +        option: str +    ) -> None: +        """Gets and sends the image in an embed. Used by the pride commands.""" +        async with ctx.typing(): +            file_name = file_safe_name("pride_avatar", ctx.author.display_name) + +            file = await in_executor( +                PfpEffects.apply_effect, +                image_bytes, +                PfpEffects.pridify_effect, +                file_name, +                pixels, +                flag +            ) + +            embed = discord.Embed( +                name="Your Lovely Pride Avatar!", +                description=f"Here is your lovely avatar, surrounded by\n a beautiful {option} flag. Enjoy :D" +            ) +            embed.set_image(url=f"attachment://{file_name}") +            embed.set_footer(text=f"Made by {ctx.author.display_name}.", icon_url=ctx.author.avatar_url) +            await ctx.send(file=file, embed=embed) + +    @avatar_modify.group( +        aliases=("avatarpride", "pridepfp", "prideprofile"), +        root_aliases=("prideavatar", "avatarpride", "pridepfp", "prideprofile"), +        invoke_without_command=True +    ) +    async def prideavatar(self, ctx: commands.Context, option: str = "lgbt", pixels: int = 64) -> None: +        """ +        This surrounds an avatar with a border of a specified LGBT flag. + +        This defaults to the LGBT rainbow flag if none is given. +        The amount of pixels can be given which determines the thickness of the flag border. +        This has a maximum of 512px and defaults to a 64px border. +        The full image is 1024x1024. +        """ +        option = option.lower() +        pixels = max(0, min(512, pixels)) +        flag = GENDER_OPTIONS.get(option) +        if flag is None: +            await ctx.send("I don't have that flag!") +            return + +        async with ctx.typing(): +            user = await self._fetch_user(ctx.author.id) +            if not user: +                await ctx.send(f"{Emojis.cross_mark} Could not get user info.") +                return +            image_bytes = await user.avatar_url_as(size=1024).read() +            await self.send_pride_image(ctx, image_bytes, pixels, flag, option) + +    @prideavatar.command() +    async def flags(self, ctx: commands.Context) -> None: +        """This lists the flags that can be used with the prideavatar command.""" +        choices = sorted(set(GENDER_OPTIONS.values())) +        options = "• " + "\n• ".join(choices) +        embed = discord.Embed( +            title="I have the following flags:", +            description=options, +            colour=Colours.soft_red +        ) +        await ctx.send(embed=embed) + +    @avatar_modify.command( +        aliases=("savatar", "spookify"), +        root_aliases=("spookyavatar", "spookify", "savatar"), +        brief="Spookify an user's avatar." +    ) +    async def spookyavatar(self, ctx: commands.Context) -> None: +        """This "spookifies" the user's avatar, with a random *spooky* effect.""" +        user = await self._fetch_user(ctx.author.id) +        if not user: +            await ctx.send(f"{Emojis.cross_mark} Could not get user info.") +            return + +        async with ctx.typing(): +            image_bytes = await user.avatar_url_as(size=1024).read() + +            file_name = file_safe_name("spooky_avatar", ctx.author.display_name) + +            file = await in_executor( +                PfpEffects.apply_effect, +                image_bytes, +                spookifications.get_random_effect, +                file_name +            ) + +            embed = discord.Embed( +                title="Is this you or am I just really paranoid?", +                colour=Colours.soft_red +            ) +            embed.set_image(url=f"attachment://{file_name}") +            embed.set_footer(text=f"Made by {ctx.author.display_name}.", icon_url=ctx.author.avatar_url) + +            await ctx.send(file=file, embed=embed) + +    @avatar_modify.command(name="mosaic", root_aliases=("mosaic",)) +    async def mosaic_command(self, ctx: commands.Context, squares: int = 16) -> None: +        """Splits your avatar into x squares, randomizes them and stitches them back into a new image!""" +        async with ctx.typing(): +            user = await self._fetch_user(ctx.author.id) +            if not user: +                await ctx.send(f"{Emojis.cross_mark} Could not get user info.") +                return + +            if not 1 <= squares <= MAX_SQUARES: +                raise commands.BadArgument(f"Squares must be a positive number less than or equal to {MAX_SQUARES:,}.") + +            sqrt = math.sqrt(squares) + +            if not sqrt.is_integer(): +                squares = math.ceil(sqrt) ** 2  # Get the next perfect square + +            file_name = file_safe_name("mosaic_avatar", ctx.author.display_name) + +            img_bytes = await user.avatar_url_as(size=1024).read() + +            file = await in_executor( +                PfpEffects.apply_effect, +                img_bytes, +                PfpEffects.mosaic_effect, +                file_name, +                squares, +            ) + +            if squares == 1: +                title = "Hooh... that was a lot of work" +                description = "I present to you... Yourself!" +            elif squares == MAX_SQUARES: +                title = "Testing the limits I see..." +                description = "What a masterpiece. :star:" +            else: +                title = "Your mosaic avatar" +                description = f"Here is your avatar. I think it looks a bit *puzzling*\nMade with {squares} squares." + +            embed = discord.Embed( +                title=title, +                description=description, +                colour=Colours.blue +            ) + +            embed.set_image(url=f"attachment://{file_name}") +            embed.set_footer(text=f"Made by {ctx.author.display_name}", icon_url=user.avatar_url) + +            await ctx.send(file=file, embed=embed) + + +def setup(bot: Bot) -> None: +    """Load the AvatarModify cog.""" +    bot.add_cog(AvatarModify(bot)) diff --git a/bot/exts/evergreen/battleship.py b/bot/exts/evergreen/battleship.py index fa3fb35c..c2f2079c 100644 --- a/bot/exts/evergreen/battleship.py +++ b/bot/exts/evergreen/battleship.py @@ -9,6 +9,7 @@ from functools import partial  import discord  from discord.ext import commands +from bot.bot import Bot  from bot.constants import Colours  log = logging.getLogger(__name__) @@ -30,8 +31,8 @@ EmojiSet = typing.Dict[typing.Tuple[bool, bool], str]  class Player:      """Each player in the game - their messages for the boards and their current grid.""" -    user: discord.Member -    board: discord.Message +    user: typing.Optional[discord.Member] +    board: typing.Optional[discord.Message]      opponent_board: discord.Message      grid: Grid @@ -95,7 +96,7 @@ class Game:      def __init__(          self, -        bot: commands.Bot, +        bot: Bot,          channel: discord.TextChannel,          player1: discord.Member,          player2: discord.Member @@ -227,7 +228,7 @@ class Game:              if message.content.lower() == "surrender":                  self.surrender = True                  return True -            self.match = re.match("([A-J]|[a-j]) ?((10)|[1-9])", message.content.strip()) +            self.match = re.fullmatch("([A-J]|[a-j]) ?((10)|[1-9])", message.content.strip())              if not self.match:                  self.bot.loop.create_task(message.add_reaction(CROSS_EMOJI))              return bool(self.match) @@ -237,7 +238,7 @@ class Game:          square = None          turn_message = await self.turn.user.send(              "It's your turn! Type the square you want to fire at. Format it like this: A1\n" -            "Type `surrender` to give up" +            "Type `surrender` to give up."          )          await self.next.user.send("Their turn", delete_after=3.0)          while True: @@ -321,7 +322,7 @@ class Game:  class Battleship(commands.Cog):      """Play the classic game Battleship!""" -    def __init__(self, bot: commands.Bot) -> None: +    def __init__(self, bot: Bot) -> None:          self.bot = bot          self.games: typing.List[Game] = []          self.waiting: typing.List[discord.Member] = [] @@ -378,10 +379,12 @@ class Battleship(commands.Cog):          Make sure you have your DMs open so that the bot can message you.          """          if self.already_playing(ctx.author): -            return await ctx.send("You're already playing a game!") +            await ctx.send("You're already playing a game!") +            return          if ctx.author in self.waiting: -            return await ctx.send("You've already sent out a request for a player 2") +            await ctx.send("You've already sent out a request for a player 2.") +            return          announcement = await ctx.send(              "**Battleship**: A new game is about to start!\n" @@ -401,20 +404,22 @@ class Battleship(commands.Cog):          except asyncio.TimeoutError:              self.waiting.remove(ctx.author)              await announcement.delete() -            return await ctx.send(f"{ctx.author.mention} Seems like there's no one here to play...") +            await ctx.send(f"{ctx.author.mention} Seems like there's no one here to play...") +            return          if str(reaction.emoji) == CROSS_EMOJI:              self.waiting.remove(ctx.author)              await announcement.delete() -            return await ctx.send(f"{ctx.author.mention} Game cancelled.") +            await ctx.send(f"{ctx.author.mention} Game cancelled.") +            return          await announcement.delete()          self.waiting.remove(ctx.author)          if self.already_playing(ctx.author):              return +        game = Game(self.bot, ctx.channel, ctx.author, user) +        self.games.append(game)          try: -            game = Game(self.bot, ctx.channel, ctx.author, user) -            self.games.append(game)              await game.start_game()              self.games.remove(game)          except discord.Forbidden: @@ -425,11 +430,11 @@ class Battleship(commands.Cog):              self.games.remove(game)          except Exception:              # End the game in the event of an unforseen error so the players aren't stuck in a game -            await ctx.send(f"{ctx.author.mention} {user.mention} An error occurred. Game failed") +            await ctx.send(f"{ctx.author.mention} {user.mention} An error occurred. Game failed.")              self.games.remove(game)              raise -    @battleship.command(name="ships", aliases=["boats"]) +    @battleship.command(name="ships", aliases=("boats",))      async def battleship_ships(self, ctx: commands.Context) -> None:          """Lists the ships that are found on the battleship grid."""          embed = discord.Embed(colour=Colours.blue) @@ -438,6 +443,6 @@ class Battleship(commands.Cog):          await ctx.send(embed=embed) -def setup(bot: commands.Bot) -> None: -    """Cog load.""" +def setup(bot: Bot) -> None: +    """Load the Battleship Cog."""      bot.add_cog(Battleship(bot)) diff --git a/bot/exts/evergreen/bookmark.py b/bot/exts/evergreen/bookmark.py index 5fa05d2e..f93371a6 100644 --- a/bot/exts/evergreen/bookmark.py +++ b/bot/exts/evergreen/bookmark.py @@ -1,34 +1,111 @@ +import asyncio  import logging  import random +import typing as t  import discord  from discord.ext import commands -from bot.constants import Colours, ERROR_REPLIES, Emojis, Icons +from bot.bot import Bot +from bot.constants import Categories, Colours, ERROR_REPLIES, Icons, WHITELISTED_CHANNELS  from bot.utils.converters import WrappedMessageConverter +from bot.utils.decorators import whitelist_override  log = logging.getLogger(__name__) +# Number of seconds to wait for other users to bookmark the same message +TIMEOUT = 120 +BOOKMARK_EMOJI = "📌" +WHITELISTED_CATEGORIES = (Categories.help_in_use,) +  class Bookmark(commands.Cog):      """Creates personal bookmarks by relaying a message link to the user's DMs.""" -    def __init__(self, bot: commands.Bot): +    def __init__(self, bot: Bot):          self.bot = bot +    @staticmethod +    def build_bookmark_dm(target_message: discord.Message, title: str) -> discord.Embed: +        """Build the embed to DM the bookmark requester.""" +        embed = discord.Embed( +            title=title, +            description=target_message.content, +            colour=Colours.soft_green +        ) +        embed.add_field( +            name="Wanna give it a visit?", +            value=f"[Visit original message]({target_message.jump_url})" +        ) +        embed.set_author(name=target_message.author, icon_url=target_message.author.avatar_url) +        embed.set_thumbnail(url=Icons.bookmark) + +        return embed + +    @staticmethod +    def build_error_embed(user: discord.Member) -> discord.Embed: +        """Builds an error embed for when a bookmark requester has DMs disabled.""" +        return discord.Embed( +            title=random.choice(ERROR_REPLIES), +            description=f"{user.mention}, please enable your DMs to receive the bookmark.", +            colour=Colours.soft_red +        ) + +    async def action_bookmark( +        self, +        channel: discord.TextChannel, +        user: discord.Member, +        target_message: discord.Message, +        title: str +    ) -> None: +        """Sends the bookmark DM, or sends an error embed when a user bookmarks a message.""" +        try: +            embed = self.build_bookmark_dm(target_message, title) +            await user.send(embed=embed) +        except discord.Forbidden: +            error_embed = self.build_error_embed(user) +            await channel.send(embed=error_embed) +        else: +            log.info(f"{user} bookmarked {target_message.jump_url} with title '{title}'") + +    @staticmethod +    async def send_reaction_embed( +        channel: discord.TextChannel, +        target_message: discord.Message +    ) -> discord.Message: +        """Sends an embed, with a reaction, so users can react to bookmark the message too.""" +        message = await channel.send( +            embed=discord.Embed( +                description=( +                    f"React with {BOOKMARK_EMOJI} to be sent your very own bookmark to " +                    f"[this message]({target_message.jump_url})." +                ), +                colour=Colours.soft_green +            ) +        ) + +        await message.add_reaction(BOOKMARK_EMOJI) +        return message + +    @whitelist_override(channels=WHITELISTED_CHANNELS, categories=WHITELISTED_CATEGORIES)      @commands.command(name="bookmark", aliases=("bm", "pin"))      async def bookmark(          self,          ctx: commands.Context, -        target_message: WrappedMessageConverter, +        target_message: t.Optional[WrappedMessageConverter],          *,          title: str = "Bookmark"      ) -> None:          """Send the author a link to `target_message` via DMs.""" +        if not target_message: +            if not ctx.message.reference: +                raise commands.UserInputError("You must either provide a valid message to bookmark, or reply to one.") +            target_message = ctx.message.reference.resolved +          # Prevent users from bookmarking a message in a channel they don't have access to          permissions = ctx.author.permissions_in(target_message.channel)          if not permissions.read_messages: -            log.info(f"{ctx.author} tried to bookmark a message in #{target_message.channel} but has no permissions") +            log.info(f"{ctx.author} tried to bookmark a message in #{target_message.channel} but has no permissions.")              embed = discord.Embed(                  title=random.choice(ERROR_REPLIES),                  color=Colours.soft_red, @@ -37,29 +114,40 @@ class Bookmark(commands.Cog):              await ctx.send(embed=embed)              return -        embed = discord.Embed( -            title=title, -            colour=Colours.soft_green, -            description=target_message.content -        ) -        embed.add_field(name="Wanna give it a visit?", value=f"[Visit original message]({target_message.jump_url})") -        embed.set_author(name=target_message.author, icon_url=target_message.author.avatar_url) -        embed.set_thumbnail(url=Icons.bookmark) - -        try: -            await ctx.author.send(embed=embed) -        except discord.Forbidden: -            error_embed = discord.Embed( -                title=random.choice(ERROR_REPLIES), -                description=f"{ctx.author.mention}, please enable your DMs to receive the bookmark", -                colour=Colours.soft_red +        def event_check(reaction: discord.Reaction, user: discord.Member) -> bool: +            """Make sure that this reaction is what we want to operate on.""" +            return ( +                # Conditions for a successful pagination: +                all(( +                    # Reaction is on this message +                    reaction.message.id == reaction_message.id, +                    # User has not already bookmarked this message +                    user.id not in bookmarked_users, +                    # Reaction is the `BOOKMARK_EMOJI` emoji +                    str(reaction.emoji) == BOOKMARK_EMOJI, +                    # Reaction was not made by the Bot +                    user.id != self.bot.user.id +                ))              ) -            await ctx.send(embed=error_embed) -        else: -            log.info(f"{ctx.author} bookmarked {target_message.jump_url} with title '{title}'") -            await ctx.message.add_reaction(Emojis.envelope) +        await self.action_bookmark(ctx.channel, ctx.author, target_message, title) + +        # Keep track of who has already bookmarked, so users can't spam reactions and cause loads of DMs +        bookmarked_users = [ctx.author.id] +        reaction_message = await self.send_reaction_embed(ctx.channel, target_message) + +        while True: +            try: +                _, user = await self.bot.wait_for("reaction_add", timeout=TIMEOUT, check=event_check) +            except asyncio.TimeoutError: +                log.debug("Timed out waiting for a reaction") +                break +            log.trace(f"{user} has successfully bookmarked from a reaction, attempting to DM them.") +            await self.action_bookmark(ctx.channel, user, target_message, title) +            bookmarked_users.append(user.id) + +        await reaction_message.delete() -def setup(bot: commands.Bot) -> None: +def setup(bot: Bot) -> None:      """Load the Bookmark cog."""      bot.add_cog(Bookmark(bot)) diff --git a/bot/exts/evergreen/catify.py b/bot/exts/evergreen/catify.py new file mode 100644 index 00000000..32dfae09 --- /dev/null +++ b/bot/exts/evergreen/catify.py @@ -0,0 +1,86 @@ +import random +from contextlib import suppress +from typing import Optional + +from discord import AllowedMentions, Embed, Forbidden +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Cats, Colours, NEGATIVE_REPLIES +from bot.utils import helpers + + +class Catify(commands.Cog): +    """Cog for the catify command.""" + +    @commands.command(aliases=("ᓚᘏᗢify", "ᓚᘏᗢ")) +    @commands.cooldown(1, 5, commands.BucketType.user) +    async def catify(self, ctx: commands.Context, *, text: Optional[str]) -> None: +        """ +        Convert the provided text into a cat themed sentence by interspercing cats throughout text. + +        If no text is given then the users nickname is edited. +        """ +        if not text: +            display_name = ctx.author.display_name + +            if len(display_name) > 26: +                embed = Embed( +                    title=random.choice(NEGATIVE_REPLIES), +                    description=( +                        "Your display name is too long to be catified! " +                        "Please change it to be under 26 characters." +                    ), +                    color=Colours.soft_red +                ) +                await ctx.send(embed=embed) +                return + +            else: +                display_name += f" | {random.choice(Cats.cats)}" + +                await ctx.send(f"Your catified nickname is: `{display_name}`", allowed_mentions=AllowedMentions.none()) + +                with suppress(Forbidden): +                    await ctx.author.edit(nick=display_name) +        else: +            if len(text) >= 1500: +                embed = Embed( +                    title=random.choice(NEGATIVE_REPLIES), +                    description="Submitted text was too large! Please submit something under 1500 characters.", +                    color=Colours.soft_red +                ) +                await ctx.send(embed=embed) +                return + +            string_list = text.split() +            for index, name in enumerate(string_list): +                name = name.lower() +                if "cat" in name: +                    if random.randint(0, 5) == 5: +                        string_list[index] = name.replace("cat", f"**{random.choice(Cats.cats)}**") +                    else: +                        string_list[index] = name.replace("cat", random.choice(Cats.cats)) +                for element in Cats.cats: +                    if element in name: +                        string_list[index] = name.replace(element, "cat") + +            string_len = len(string_list) // 3 or len(string_list) + +            for _ in range(random.randint(1, string_len)): +                # insert cat at random index +                if random.randint(0, 5) == 5: +                    string_list.insert(random.randint(0, len(string_list)), f"**{random.choice(Cats.cats)}**") +                else: +                    string_list.insert(random.randint(0, len(string_list)), random.choice(Cats.cats)) + +            text = helpers.suppress_links(" ".join(string_list)) +            await ctx.send( +                f">>> {text}", +                allowed_mentions=AllowedMentions.none() +            ) + + +def setup(bot: Bot) -> None: +    """Loads the catify cog.""" +    bot.add_cog(Catify()) diff --git a/bot/exts/evergreen/cheatsheet.py b/bot/exts/evergreen/cheatsheet.py index 3fe709d5..ae7793c9 100644 --- a/bot/exts/evergreen/cheatsheet.py +++ b/bot/exts/evergreen/cheatsheet.py @@ -8,6 +8,7 @@ from discord.ext import commands  from discord.ext.commands import BucketType, Context  from bot import constants +from bot.bot import Bot  from bot.constants import Categories, Channels, Colours, ERROR_REPLIES  from bot.utils.decorators import whitelist_override @@ -23,17 +24,17 @@ Unknown cheat sheet. Please try to reformulate your query.  If the problem persists send a message in <#{Channels.dev_contrib}>  """ -URL = 'https://cheat.sh/python/{search}' +URL = "https://cheat.sh/python/{search}"  ESCAPE_TT = str.maketrans({"`": "\\`"})  ANSI_RE = re.compile(r"\x1b\[.*?m")  # We need to pass headers as curl otherwise it would default to aiohttp which would return raw html. -HEADERS = {'User-Agent': 'curl/7.68.0'} +HEADERS = {"User-Agent": "curl/7.68.0"}  class CheatSheet(commands.Cog):      """Commands that sends a result of a cht.sh search in code blocks.""" -    def __init__(self, bot: commands.Bot): +    def __init__(self, bot: Bot):          self.bot = bot      @staticmethod @@ -60,14 +61,18 @@ class CheatSheet(commands.Cog):          body_space = min(1986 - len(url), 1000)          if len(body_text) > body_space: -            description = (f"**Result Of cht.sh**\n" -                           f"```python\n{body_text[:body_space]}\n" -                           f"... (truncated - too many lines)```\n" -                           f"Full results: {url} ") +            description = ( +                f"**Result Of cht.sh**\n" +                f"```python\n{body_text[:body_space]}\n" +                f"... (truncated - too many lines)```\n" +                f"Full results: {url} " +            )          else: -            description = (f"**Result Of cht.sh**\n" -                           f"```python\n{body_text}```\n" -                           f"{url}") +            description = ( +                f"**Result Of cht.sh**\n" +                f"```python\n{body_text}```\n" +                f"{url}" +            )          return False, description      @commands.command( @@ -102,6 +107,6 @@ class CheatSheet(commands.Cog):                  await ctx.send(content=description) -def setup(bot: commands.Bot) -> None: +def setup(bot: Bot) -> None:      """Load the CheatSheet cog."""      bot.add_cog(CheatSheet(bot)) diff --git a/bot/exts/evergreen/coinflip.py b/bot/exts/evergreen/coinflip.py new file mode 100644 index 00000000..d1762463 --- /dev/null +++ b/bot/exts/evergreen/coinflip.py @@ -0,0 +1,54 @@ +import random +from typing import Tuple + +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Emojis + + +class CoinSide(commands.Converter): +    """Class used to convert the `side` parameter of coinflip command.""" + +    HEADS: Tuple[str] = ("h", "head", "heads") +    TAILS: Tuple[str] = ("t", "tail", "tails") + +    async def convert(self, ctx: commands.Context, side: str) -> str: +        """Converts the provided `side` into the corresponding string.""" +        side = side.lower() +        if side in self.HEADS: +            return "heads" + +        if side in self.TAILS: +            return "tails" + +        raise commands.BadArgument(f"{side!r} is not a valid coin side.") + + +class CoinFlip(commands.Cog): +    """Cog for the CoinFlip command.""" + +    @commands.command(name="coinflip", aliases=("flip", "coin", "cf")) +    async def coinflip_command(self, ctx: commands.Context, side: CoinSide = None) -> None: +        """ +        Flips a coin. + +        If `side` is provided will state whether you guessed the side correctly. +        """ +        flipped_side = random.choice(["heads", "tails"]) + +        message = f"{ctx.author.mention} flipped **{flipped_side}**. " +        if not side: +            await ctx.send(message) +            return + +        if side == flipped_side: +            message += f"You guessed correctly! {Emojis.lemon_hyperpleased}" +        else: +            message += f"You guessed incorrectly. {Emojis.lemon_pensive}" +        await ctx.send(message) + + +def setup(bot: Bot) -> None: +    """Loads the coinflip cog.""" +    bot.add_cog(CoinFlip()) diff --git a/bot/exts/evergreen/connect_four.py b/bot/exts/evergreen/connect_four.py index 7e3ec42b..5c82ffee 100644 --- a/bot/exts/evergreen/connect_four.py +++ b/bot/exts/evergreen/connect_four.py @@ -8,6 +8,7 @@ import emojis  from discord.ext import commands  from discord.ext.commands import guild_only +from bot.bot import Bot  from bot.constants import Emojis  NUMBERS = list(Emojis.number_emojis.values()) @@ -21,13 +22,13 @@ class Game:      """A Connect 4 Game."""      def __init__( -            self, -            bot: commands.Bot, -            channel: discord.TextChannel, -            player1: discord.Member, -            player2: typing.Optional[discord.Member], -            tokens: typing.List[str], -            size: int = 7 +        self, +        bot: Bot, +        channel: discord.TextChannel, +        player1: discord.Member, +        player2: typing.Optional[discord.Member], +        tokens: typing.List[str], +        size: int = 7      ) -> None:          self.bot = bot @@ -54,8 +55,8 @@ class Game:      async def print_grid(self) -> None:          """Formats and outputs the Connect Four grid to the channel."""          title = ( -            f'Connect 4: {self.player1.display_name}' -            f' VS {self.bot.user.display_name if isinstance(self.player2, AI) else self.player2.display_name}' +            f"Connect 4: {self.player1.display_name}" +            f" VS {self.bot.user.display_name if isinstance(self.player2, AI) else self.player2.display_name}"          )          rows = [" ".join(self.tokens[s] for s in row) for row in self.grid] @@ -66,7 +67,7 @@ class Game:          if self.message:              await self.message.edit(embed=embed)          else: -            self.message = await self.channel.send(content='Loading...') +            self.message = await self.channel.send(content="Loading...")              for emoji in self.unicode_numbers:                  await self.message.add_reaction(emoji)              await self.message.add_reaction(CROSS_EMOJI) @@ -180,7 +181,7 @@ class Game:  class AI:      """The Computer Player for Single-Player games.""" -    def __init__(self, bot: commands.Bot, game: Game) -> None: +    def __init__(self, bot: Bot, game: Game) -> None:          self.game = game          self.mention = bot.user.mention @@ -255,7 +256,7 @@ class AI:  class ConnectFour(commands.Cog):      """Connect Four. The Classic Vertical Four-in-a-row Game!""" -    def __init__(self, bot: commands.Bot) -> None: +    def __init__(self, bot: Bot) -> None:          self.bot = bot          self.games: typing.List[Game] = []          self.waiting: typing.List[discord.Member] = [] @@ -276,27 +277,29 @@ class ConnectFour(commands.Cog):              return False          if not self.min_board_size <= board_size <= self.max_board_size: -            await ctx.send(f"{board_size} is not a valid board size. A valid board size is " -                           f"between `{self.min_board_size}` and `{self.max_board_size}`.") +            await ctx.send( +                f"{board_size} is not a valid board size. A valid board size is " +                f"between `{self.min_board_size}` and `{self.max_board_size}`." +            )              return False          return True      def get_player( -            self, -            ctx: commands.Context, -            announcement: discord.Message, -            reaction: discord.Reaction, -            user: discord.Member +        self, +        ctx: commands.Context, +        announcement: discord.Message, +        reaction: discord.Reaction, +        user: discord.Member      ) -> bool:          """Predicate checking the criteria for the announcement message."""          if self.already_playing(ctx.author):  # If they've joined a game since requesting a player 2              return True  # Is dealt with later on          if ( -                user.id not in (ctx.me.id, ctx.author.id) -                and str(reaction.emoji) == Emojis.hand_raised -                and reaction.message.id == announcement.id +            user.id not in (ctx.me.id, ctx.author.id) +            and str(reaction.emoji) == Emojis.hand_raised +            and reaction.message.id == announcement.id          ):              if self.already_playing(user):                  self.bot.loop.create_task(ctx.send(f"{user.mention} You're already playing a game!")) @@ -313,9 +316,9 @@ class ConnectFour(commands.Cog):              return True          if ( -                user.id == ctx.author.id -                and str(reaction.emoji) == CROSS_EMOJI -                and reaction.message.id == announcement.id +            user.id == ctx.author.id +            and str(reaction.emoji) == CROSS_EMOJI +            and reaction.message.id == announcement.id          ):              return True          return False @@ -326,7 +329,7 @@ class ConnectFour(commands.Cog):      @staticmethod      def check_emojis( -            e1: EMOJI_CHECK, e2: EMOJI_CHECK +        e1: EMOJI_CHECK, e2: EMOJI_CHECK      ) -> typing.Tuple[bool, typing.Optional[str]]:          """Validate the emojis, the user put."""          if isinstance(e1, str) and emojis.count(e1) != 1: @@ -336,12 +339,12 @@ class ConnectFour(commands.Cog):          return True, None      async def _play_game( -            self, -            ctx: commands.Context, -            user: typing.Optional[discord.Member], -            board_size: int, -            emoji1: str, -            emoji2: str +        self, +        ctx: commands.Context, +        user: typing.Optional[discord.Member], +        board_size: int, +        emoji1: str, +        emoji2: str      ) -> None:          """Helper for playing a game of connect four."""          self.tokens = [":white_circle:", str(emoji1), str(emoji2)] @@ -354,7 +357,7 @@ class ConnectFour(commands.Cog):              self.games.remove(game)          except Exception:              # End the game in the event of an unforeseen error so the players aren't stuck in a game -            await ctx.send(f"{ctx.author.mention} {user.mention if user else ''} An error occurred. Game failed") +            await ctx.send(f"{ctx.author.mention} {user.mention if user else ''} An error occurred. Game failed.")              if game in self.games:                  self.games.remove(game)              raise @@ -362,15 +365,15 @@ class ConnectFour(commands.Cog):      @guild_only()      @commands.group(          invoke_without_command=True, -        aliases=["4inarow", "connect4", "connectfour", "c4"], +        aliases=("4inarow", "connect4", "connectfour", "c4"),          case_insensitive=True      )      async def connect_four( -            self, -            ctx: commands.Context, -            board_size: int = 7, -            emoji1: EMOJI_CHECK = "\U0001f535", -            emoji2: EMOJI_CHECK = "\U0001f534" +        self, +        ctx: commands.Context, +        board_size: int = 7, +        emoji1: EMOJI_CHECK = "\U0001f535", +        emoji2: EMOJI_CHECK = "\U0001f534"      ) -> None:          """          Play the classic game of Connect Four with someone! @@ -425,13 +428,13 @@ class ConnectFour(commands.Cog):          await self._play_game(ctx, user, board_size, str(emoji1), str(emoji2))      @guild_only() -    @connect_four.command(aliases=["bot", "computer", "cpu"]) +    @connect_four.command(aliases=("bot", "computer", "cpu"))      async def ai( -            self, -            ctx: commands.Context, -            board_size: int = 7, -            emoji1: EMOJI_CHECK = "\U0001f535", -            emoji2: EMOJI_CHECK = "\U0001f534" +        self, +        ctx: commands.Context, +        board_size: int = 7, +        emoji1: EMOJI_CHECK = "\U0001f535", +        emoji2: EMOJI_CHECK = "\U0001f534"      ) -> None:          """Play Connect Four against a computer player."""          check, emoji = self.check_emojis(emoji1, emoji2) @@ -445,6 +448,6 @@ class ConnectFour(commands.Cog):          await self._play_game(ctx, None, board_size, str(emoji1), str(emoji2)) -def setup(bot: commands.Bot) -> None: +def setup(bot: Bot) -> None:      """Load ConnectFour Cog."""      bot.add_cog(ConnectFour(bot)) diff --git a/bot/exts/evergreen/conversationstarters.py b/bot/exts/evergreen/conversationstarters.py index e7058961..fdc4467a 100644 --- a/bot/exts/evergreen/conversationstarters.py +++ b/bot/exts/evergreen/conversationstarters.py @@ -4,11 +4,12 @@ import yaml  from discord import Color, Embed  from discord.ext import commands +from bot.bot import Bot  from bot.constants import WHITELISTED_CHANNELS  from bot.utils.decorators import whitelist_override  from bot.utils.randomization import RandomCycle -SUGGESTION_FORM = 'https://forms.gle/zw6kkJqv8U43Nfjg9' +SUGGESTION_FORM = "https://forms.gle/zw6kkJqv8U43Nfjg9"  with Path("bot/resources/evergreen/starter.yaml").open("r", encoding="utf8") as f:      STARTERS = yaml.load(f, Loader=yaml.FullLoader) @@ -24,9 +25,9 @@ with Path("bot/resources/evergreen/py_topics.yaml").open("r", encoding="utf8") a      ALL_ALLOWED_CHANNELS = list(PY_TOPICS.keys()) + list(WHITELISTED_CHANNELS)  # Putting all topics into one dictionary and shuffling lists to reduce same-topic repetitions. -ALL_TOPICS = {'default': STARTERS, **PY_TOPICS} +ALL_TOPICS = {"default": STARTERS, **PY_TOPICS}  TOPICS = { -    channel: RandomCycle(topics or ['No topics found for this channel.']) +    channel: RandomCycle(topics or ["No topics found for this channel."])      for channel, topics in ALL_TOPICS.items()  } @@ -34,9 +35,6 @@ TOPICS = {  class ConvoStarters(commands.Cog):      """Evergreen conversation topics.""" -    def __init__(self, bot: commands.Bot): -        self.bot = bot -      @commands.command()      @whitelist_override(channels=ALL_ALLOWED_CHANNELS)      async def topic(self, ctx: commands.Context) -> None: @@ -48,7 +46,7 @@ class ConvoStarters(commands.Cog):          Otherwise, a random conversation topic will be received by the user.          """          # No matter what, the form will be shown. -        embed = Embed(description=f'Suggest more topics [here]({SUGGESTION_FORM})!', color=Color.blurple()) +        embed = Embed(description=f"Suggest more topics [here]({SUGGESTION_FORM})!", color=Color.blurple())          try:              # Fetching topics. @@ -56,16 +54,16 @@ class ConvoStarters(commands.Cog):          # If the channel isn't Python-related.          except KeyError: -            embed.title = f'**{next(TOPICS["default"])}**' +            embed.title = f"**{next(TOPICS['default'])}**"          # If the channel ID doesn't have any topics.          else: -            embed.title = f'**{next(channel_topics)}**' +            embed.title = f"**{next(channel_topics)}**"          finally:              await ctx.send(embed=embed) -def setup(bot: commands.Bot) -> None: -    """Conversation starters Cog load.""" -    bot.add_cog(ConvoStarters(bot)) +def setup(bot: Bot) -> None: +    """Load the ConvoStarters cog.""" +    bot.add_cog(ConvoStarters()) diff --git a/bot/exts/evergreen/duck_game.py b/bot/exts/evergreen/duck_game.py new file mode 100644 index 00000000..51e7a98a --- /dev/null +++ b/bot/exts/evergreen/duck_game.py @@ -0,0 +1,356 @@ +import asyncio +import random +import re +from collections import defaultdict +from io import BytesIO +from itertools import product +from pathlib import Path +from urllib.parse import urlparse + +import discord +from PIL import Image, ImageDraw, ImageFont +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Colours, MODERATION_ROLES +from bot.utils.decorators import with_role + + +DECK = list(product(*[(0, 1, 2)]*4)) + +GAME_DURATION = 180 + +# Scoring +CORRECT_SOLN = 1 +INCORRECT_SOLN = -1 +CORRECT_GOOSE = 2 +INCORRECT_GOOSE = -1 + +# Distribution of minimum acceptable solutions at board generation. +# This is for gameplay reasons, to shift the number of solutions per board up, +# while still making the end of the game unpredictable. +# Note: this is *not* the same as the distribution of number of solutions. + +SOLN_DISTR = 0, 0.05, 0.05, 0.1, 0.15, 0.25, 0.2, 0.15, .05 + +IMAGE_PATH = Path("bot", "resources", "evergreen", "all_cards.png") +FONT_PATH = Path("bot", "resources", "evergreen", "LuckiestGuy-Regular.ttf") +HELP_IMAGE_PATH = Path("bot", "resources", "evergreen", "ducks_help_ex.png") + +ALL_CARDS = Image.open(IMAGE_PATH) +LABEL_FONT = ImageFont.truetype(str(FONT_PATH), size=16) +CARD_WIDTH = 155 +CARD_HEIGHT = 97 + +EMOJI_WRONG = "\u274C" + +ANSWER_REGEX = re.compile(r'^\D*(\d+)\D+(\d+)\D+(\d+)\D*$') + +HELP_TEXT = """ +**Each card has 4 features** +Color, Number, Hat, and Accessory + +**A valid flight** +3 cards where each feature is either all the same or all different + +**Call "GOOSE"** +if you think there are no more flights + +**+1** for each valid flight +**+2** for a correct "GOOSE" call +**-1** for any wrong answer + +The first flight below is invalid: the first card has swords while the other two have no accessory.\ + It would be valid if the first card was empty-handed, or one of the other two had paintbrushes. + +The second flight is valid because there are no 2:1 splits; each feature is either all the same or all different. +""" + + +def assemble_board_image(board: list[tuple[int]], rows: int, columns: int) -> Image: +    """Cut and paste images representing the given cards into an image representing the board.""" +    new_im = Image.new("RGBA", (CARD_WIDTH*columns, CARD_HEIGHT*rows)) +    draw = ImageDraw.Draw(new_im) +    for idx, card in enumerate(board): +        card_image = get_card_image(card) +        row, col = divmod(idx, columns) +        top, left = row * CARD_HEIGHT, col * CARD_WIDTH +        new_im.paste(card_image, (left, top)) +        draw.text( +            xy=(left+5, top+5),  # magic numbers are buffers for the card labels +            text=str(idx), +            fill=(0, 0, 0), +            font=LABEL_FONT, +        ) +    return new_im + + +def get_card_image(card: tuple[int]) -> Image: +    """Slice the image containing all the cards to get just this card.""" +    # The master card image file should have 9x9 cards, +    # arranged such that their features can be interpreted as ordered trinary. +    row, col = divmod(as_trinary(card), 9) +    x1 = col * CARD_WIDTH +    x2 = x1 + CARD_WIDTH +    y1 = row * CARD_HEIGHT +    y2 = y1 + CARD_HEIGHT +    return ALL_CARDS.crop((x1, y1, x2, y2)) + + +def as_trinary(card: tuple[int]) -> int: +    """Find the card's unique index by interpreting its features as trinary.""" +    return int(''.join(str(x) for x in card), base=3) + + +class DuckGame: +    """A class for a single game.""" + +    def __init__( +        self, +        rows: int = 4, +        columns: int = 3, +        minimum_solutions: int = 1, +    ) -> None: +        """ +        Take samples from the deck to generate a board. + +        Args: +            rows (int, optional): Rows in the game board. Defaults to 4. +            columns (int, optional): Columns in the game board. Defaults to 3. +            minimum_solutions (int, optional): Minimum acceptable number of solutions in the board. Defaults to 1. +        """ +        self.rows = rows +        self.columns = columns +        size = rows * columns + +        self._solutions = None +        self.claimed_answers = {} +        self.scores = defaultdict(int) +        self.editing_embed = asyncio.Lock() + +        self.board = random.sample(DECK, size) +        while len(self.solutions) < minimum_solutions: +            self.board = random.sample(DECK, size) + +    @property +    def board(self) -> list[tuple[int]]: +        """Accesses board property.""" +        return self._board + +    @board.setter +    def board(self, val: list[tuple[int]]) -> None: +        """Erases calculated solutions if the board changes.""" +        self._solutions = None +        self._board = val + +    @property +    def solutions(self) -> None: +        """Calculate valid solutions and cache to avoid redoing work.""" +        if self._solutions is None: +            self._solutions = set() +            for idx_a, card_a in enumerate(self.board): +                for idx_b, card_b in enumerate(self.board[idx_a+1:], start=idx_a+1): +                    # Two points determine a line, and there are exactly 3 points per line in {0,1,2}^4. +                    # The completion of a line will only be a duplicate point if the other two points are the same, +                    # which is prevented by the triangle iteration. +                    completion = tuple( +                        feat_a if feat_a == feat_b else 3-feat_a-feat_b +                        for feat_a, feat_b in zip(card_a, card_b) +                    ) +                    try: +                        idx_c = self.board.index(completion) +                    except ValueError: +                        continue + +                    # Indices within the solution are sorted to detect duplicate solutions modulo order. +                    solution = tuple(sorted((idx_a, idx_b, idx_c))) +                    self._solutions.add(solution) + +        return self._solutions + + +class DuckGamesDirector(commands.Cog): +    """A cog for running Duck Duck Duck Goose games.""" + +    def __init__(self, bot: Bot) -> None: +        self.bot = bot +        self.current_games = {} + +    @commands.group( +        name='duckduckduckgoose', +        aliases=['dddg', 'ddg', 'duckduckgoose', 'duckgoose'], +        invoke_without_command=True +    ) +    @commands.cooldown(rate=1, per=2, type=commands.BucketType.channel) +    async def start_game(self, ctx: commands.Context) -> None: +        """Generate a board, send the game embed, and end the game after a time limit.""" +        if ctx.channel.id in self.current_games: +            await ctx.send("There's already a game running!") +            return + +        minimum_solutions, = random.choices(range(len(SOLN_DISTR)), weights=SOLN_DISTR) +        game = DuckGame(minimum_solutions=minimum_solutions) +        game.running = True +        self.current_games[ctx.channel.id] = game + +        game.embed_msg = await self.send_board_embed(ctx, game) +        await asyncio.sleep(GAME_DURATION) + +        # Checking for the channel ID in the currently running games is not sufficient. +        # The game could have been ended by a player, and a new game already started in the same channel. +        if game.running: +            try: +                del self.current_games[ctx.channel.id] +                await self.end_game(ctx.channel, game, end_message="Time's up!") +            except KeyError: +                pass + +    @commands.Cog.listener() +    async def on_message(self, msg: discord.Message) -> None: +        """Listen for messages and process them as answers if appropriate.""" +        if msg.author.bot: +            return + +        channel = msg.channel +        if channel.id not in self.current_games: +            return + +        game = self.current_games[channel.id] +        if msg.content.strip().lower() == 'goose': +            # If all of the solutions have been claimed, i.e. the "goose" call is correct. +            if len(game.solutions) == len(game.claimed_answers): +                try: +                    del self.current_games[channel.id] +                    game.scores[msg.author] += CORRECT_GOOSE +                    await self.end_game(channel, game, end_message=f"{msg.author.display_name} GOOSED!") +                except KeyError: +                    pass +            else: +                await msg.add_reaction(EMOJI_WRONG) +                game.scores[msg.author] += INCORRECT_GOOSE +            return + +        # Valid answers contain 3 numbers. +        if not (match := re.match(ANSWER_REGEX, msg.content)): +            return +        answer = tuple(sorted(int(m) for m in match.groups())) + +        # Be forgiving for answers that use indices not on the board. +        if not all(0 <= n < len(game.board) for n in answer): +            return + +        # Also be forgiving for answers that have already been claimed (and avoid penalizing for racing conditions). +        if answer in game.claimed_answers: +            return + +        if answer in game.solutions: +            game.claimed_answers[answer] = msg.author +            game.scores[msg.author] += CORRECT_SOLN +            await self.display_claimed_answer(game, msg.author, answer) +        else: +            await msg.add_reaction(EMOJI_WRONG) +            game.scores[msg.author] += INCORRECT_SOLN + +    async def send_board_embed(self, ctx: commands.Context, game: DuckGame) -> discord.Message: +        """Create and send the initial game embed. This will be edited as the game goes on.""" +        image = assemble_board_image(game.board, game.rows, game.columns) +        with BytesIO() as image_stream: +            image.save(image_stream, format="png") +            image_stream.seek(0) +            file = discord.File(fp=image_stream, filename="board.png") +        embed = discord.Embed( +            title="Duck Duck Duck Goose!", +            color=Colours.bright_green, +            footer="" +        ) +        embed.set_image(url="attachment://board.png") +        return await ctx.send(embed=embed, file=file) + +    async def display_claimed_answer(self, game: DuckGame, author: discord.Member, answer: tuple[int]) -> None: +        """Add a claimed answer to the game embed.""" +        async with game.editing_embed: +            game_embed, = game.embed_msg.embeds +            old_footer = game_embed.footer.text +            if old_footer == discord.Embed.Empty: +                old_footer = "" +            game_embed.set_footer(text=f"{old_footer}\n{str(answer):12s}  -  {author.display_name}") +            await self.edit_embed_with_image(game.embed_msg, game_embed) + +    async def end_game(self, channel: discord.TextChannel, game: DuckGame, end_message: str) -> None: +        """Edit the game embed to reflect the end of the game and mark the game as not running.""" +        game.running = False + +        scoreboard_embed = discord.Embed( +            title=end_message, +            color=discord.Color.dark_purple(), +        ) +        scores = sorted( +            game.scores.items(), +            key=lambda item: item[1], +            reverse=True, +        ) +        scoreboard = "Final scores:\n\n" +        scoreboard += "\n".join(f"{member.display_name}: {score}" for member, score in scores) +        scoreboard_embed.description = scoreboard +        await channel.send(embed=scoreboard_embed) + +        missed = [ans for ans in game.solutions if ans not in game.claimed_answers] +        if missed: +            missed_text = "Flights everyone missed:\n" + "\n".join(f"{ans}" for ans in missed) +        else: +            missed_text = "All the flights were found!" + +        game_embed, = game.embed_msg.embeds +        old_footer = game_embed.footer.text +        if old_footer == discord.Embed.Empty: +            old_footer = "" +        embed_as_dict = game_embed.to_dict()  # Cannot set embed color after initialization +        embed_as_dict["color"] = discord.Color.red().value +        game_embed = discord.Embed.from_dict(embed_as_dict) +        game_embed.set_footer( +            text=f"{old_footer.rstrip()}\n\n{missed_text}" +        ) +        await self.edit_embed_with_image(game.embed_msg, game_embed) + +    @start_game.command(name="help") +    async def show_rules(self, ctx: commands.Context) -> None: +        """Explain the rules of the game.""" +        await self.send_help_embed(ctx) + +    @start_game.command(name="stop") +    @with_role(*MODERATION_ROLES) +    async def stop_game(self, ctx: commands.Context) -> None: +        """Stop a currently running game. Only available to mods.""" +        try: +            game = self.current_games.pop(ctx.channel.id) +        except KeyError: +            await ctx.send("No game currently running in this channel") +            return +        await self.end_game(ctx.channel, game, end_message="Game canceled.") + +    @staticmethod +    async def send_help_embed(ctx: commands.Context) -> discord.Message: +        """Send rules embed.""" +        embed = discord.Embed( +            title="Compete against other players to find valid flights!", +            color=discord.Color.dark_purple(), +        ) +        embed.description = HELP_TEXT +        file = discord.File(HELP_IMAGE_PATH, filename="help.png") +        embed.set_image(url="attachment://help.png") +        embed.set_footer( +            text="Tip: using Discord's compact message display mode can help keep the board on the screen" +        ) +        return await ctx.send(file=file, embed=embed) + +    @staticmethod +    async def edit_embed_with_image(msg: discord.Message, embed: discord.Embed) -> None: +        """Edit an embed without the attached image going wonky.""" +        attach_name = urlparse(embed.image.url).path.split("/")[-1] +        embed.set_image(url=f"attachment://{attach_name}") +        await msg.edit(embed=embed) + + +def setup(bot: Bot) -> None: +    """Load the DuckGamesDirector cog.""" +    bot.add_cog(DuckGamesDirector(bot)) diff --git a/bot/exts/evergreen/emoji.py b/bot/exts/evergreen/emoji.py new file mode 100644 index 00000000..11615214 --- /dev/null +++ b/bot/exts/evergreen/emoji.py @@ -0,0 +1,123 @@ +import logging +import random +import textwrap +from collections import defaultdict +from datetime import datetime +from typing import List, Optional, Tuple + +from discord import Color, Embed, Emoji +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Colours, ERROR_REPLIES +from bot.utils.extensions import invoke_help_command +from bot.utils.pagination import LinePaginator +from bot.utils.time import time_since + +log = logging.getLogger(__name__) + + +class Emojis(commands.Cog): +    """A collection of commands related to emojis in the server.""" + +    @staticmethod +    def embed_builder(emoji: dict) -> Tuple[Embed, List[str]]: +        """Generates an embed with the emoji names and count.""" +        embed = Embed( +            color=Colours.orange, +            title="Emoji Count", +            timestamp=datetime.utcnow() +        ) +        msg = [] + +        if len(emoji) == 1: +            for category_name, category_emojis in emoji.items(): +                if len(category_emojis) == 1: +                    msg.append(f"There is **{len(category_emojis)}** emoji in the **{category_name}** category.") +                else: +                    msg.append(f"There are **{len(category_emojis)}** emojis in the **{category_name}** category.") +                embed.set_thumbnail(url=random.choice(category_emojis).url) + +        else: +            for category_name, category_emojis in emoji.items(): +                emoji_choice = random.choice(category_emojis) +                if len(category_emojis) > 1: +                    emoji_info = f"There are **{len(category_emojis)}** emojis in the **{category_name}** category." +                else: +                    emoji_info = f"There is **{len(category_emojis)}** emoji in the **{category_name}** category." +                if emoji_choice.animated: +                    msg.append(f"<a:{emoji_choice.name}:{emoji_choice.id}> {emoji_info}") +                else: +                    msg.append(f"<:{emoji_choice.name}:{emoji_choice.id}> {emoji_info}") +        return embed, msg + +    @staticmethod +    def generate_invalid_embed(emojis: list) -> Tuple[Embed, List[str]]: +        """Generates error embed for invalid emoji categories.""" +        embed = Embed( +            color=Colours.soft_red, +            title=random.choice(ERROR_REPLIES) +        ) +        msg = [] + +        emoji_dict = defaultdict(list) +        for emoji in emojis: +            emoji_dict[emoji.name.split("_")[0]].append(emoji) + +        error_comp = ", ".join(emoji_dict) +        msg.append(f"These are the valid emoji categories:\n```{error_comp}```") +        return embed, msg + +    @commands.group(name="emoji", invoke_without_command=True) +    async def emoji_group(self, ctx: commands.Context, emoji: Optional[Emoji]) -> None: +        """A group of commands related to emojis.""" +        if emoji is not None: +            await ctx.invoke(self.info_command, emoji) +        else: +            await invoke_help_command(ctx) + +    @emoji_group.command(name="count", aliases=("c",)) +    async def count_command(self, ctx: commands.Context, *, category_query: str = None) -> None: +        """Returns embed with emoji category and info given by the user.""" +        emoji_dict = defaultdict(list) + +        if not ctx.guild.emojis: +            await ctx.send("No emojis found.") +            return +        log.trace(f"Emoji Category {'' if category_query else 'not '}provided by the user.") +        for emoji in ctx.guild.emojis: +            emoji_category = emoji.name.split("_")[0] + +            if category_query is not None and emoji_category not in category_query: +                continue + +            emoji_dict[emoji_category].append(emoji) + +        if not emoji_dict: +            log.trace("Invalid name provided by the user") +            embed, msg = self.generate_invalid_embed(ctx.guild.emojis) +        else: +            embed, msg = self.embed_builder(emoji_dict) +        await LinePaginator.paginate(lines=msg, ctx=ctx, embed=embed) + +    @emoji_group.command(name="info", aliases=("i",)) +    async def info_command(self, ctx: commands.Context, emoji: Emoji) -> None: +        """Returns relevant information about a Discord Emoji.""" +        emoji_information = Embed( +            title=f"Emoji Information: {emoji.name}", +            description=textwrap.dedent(f""" +                **Name:** {emoji.name} +                **Created:** {time_since(emoji.created_at, precision="hours")} +                **Date:** {datetime.strftime(emoji.created_at, "%d/%m/%Y")} +                **ID:** {emoji.id} +            """), +            color=Color.blurple(), +            url=str(emoji.url), +        ).set_thumbnail(url=emoji.url) + +        await ctx.send(embed=emoji_information) + + +def setup(bot: Bot) -> None: +    """Load the Emojis cog.""" +    bot.add_cog(Emojis()) diff --git a/bot/exts/evergreen/emoji_count.py b/bot/exts/evergreen/emoji_count.py deleted file mode 100644 index cc43e9ab..00000000 --- a/bot/exts/evergreen/emoji_count.py +++ /dev/null @@ -1,97 +0,0 @@ -import datetime -import logging -import random -from collections import defaultdict -from typing import List, Tuple - -import discord -from discord.ext import commands - -from bot.constants import Colours, ERROR_REPLIES -from bot.utils.pagination import LinePaginator - -log = logging.getLogger(__name__) - - -class EmojiCount(commands.Cog): -    """Command that give random emoji based on category.""" - -    def __init__(self, bot: commands.Bot): -        self.bot = bot - -    @staticmethod -    def embed_builder(emoji: dict) -> Tuple[discord.Embed, List[str]]: -        """Generates an embed with the emoji names and count.""" -        embed = discord.Embed( -            color=Colours.orange, -            title="Emoji Count", -            timestamp=datetime.datetime.utcnow() -        ) -        msg = [] - -        if len(emoji) == 1: -            for category_name, category_emojis in emoji.items(): -                if len(category_emojis) == 1: -                    msg.append(f"There is **{len(category_emojis)}** emoji in **{category_name}** category") -                else: -                    msg.append(f"There are **{len(category_emojis)}** emojis in **{category_name}** category") -                embed.set_thumbnail(url=random.choice(category_emojis).url) - -        else: -            for category_name, category_emojis in emoji.items(): -                emoji_choice = random.choice(category_emojis) -                if len(category_emojis) > 1: -                    emoji_info = f"There are **{len(category_emojis)}** emojis in **{category_name}** category" -                else: -                    emoji_info = f"There is **{len(category_emojis)}** emoji in **{category_name}** category" -                if emoji_choice.animated: -                    msg.append(f'<a:{emoji_choice.name}:{emoji_choice.id}> {emoji_info}') -                else: -                    msg.append(f'<:{emoji_choice.name}:{emoji_choice.id}> {emoji_info}') -        return embed, msg - -    @staticmethod -    def generate_invalid_embed(emojis: list) -> Tuple[discord.Embed, List[str]]: -        """Generates error embed.""" -        embed = discord.Embed( -            color=Colours.soft_red, -            title=random.choice(ERROR_REPLIES) -        ) -        msg = [] - -        emoji_dict = defaultdict(list) -        for emoji in emojis: -            emoji_dict[emoji.name.split("_")[0]].append(emoji) - -        error_comp = ', '.join(emoji_dict) -        msg.append(f"These are the valid categories\n```{error_comp}```") -        return embed, msg - -    @commands.command(name="emojicount", aliases=["ec", "emojis"]) -    async def emoji_count(self, ctx: commands.Context, *, category_query: str = None) -> None: -        """Returns embed with emoji category and info given by the user.""" -        emoji_dict = defaultdict(list) - -        if not ctx.guild.emojis: -            await ctx.send("No emojis found.") -            return -        log.trace(f"Emoji Category {'' if category_query else 'not '}provided by the user") -        for emoji in ctx.guild.emojis: -            emoji_category = emoji.name.split("_")[0] - -            if category_query is not None and emoji_category not in category_query: -                continue - -            emoji_dict[emoji_category].append(emoji) - -        if not emoji_dict: -            log.trace("Invalid name provided by the user") -            embed, msg = self.generate_invalid_embed(ctx.guild.emojis) -        else: -            embed, msg = self.embed_builder(emoji_dict) -        await LinePaginator.paginate(lines=msg, ctx=ctx, embed=embed) - - -def setup(bot: commands.Bot) -> None: -    """Emoji Count Cog load.""" -    bot.add_cog(EmojiCount(bot)) diff --git a/bot/exts/evergreen/error_handler.py b/bot/exts/evergreen/error_handler.py index 28902503..a280c725 100644 --- a/bot/exts/evergreen/error_handler.py +++ b/bot/exts/evergreen/error_handler.py @@ -1,3 +1,4 @@ +import difflib  import logging  import math  import random @@ -7,17 +8,21 @@ from discord import Embed, Message  from discord.ext import commands  from sentry_sdk import push_scope -from bot.constants import Channels, Colours, ERROR_REPLIES, NEGATIVE_REPLIES +from bot.bot import Bot +from bot.constants import Channels, Colours, ERROR_REPLIES, NEGATIVE_REPLIES, RedirectOutput  from bot.utils.decorators import InChannelCheckFailure, InMonthCheckFailure -from bot.utils.exceptions import UserNotPlayingError +from bot.utils.exceptions import APIError, UserNotPlayingError  log = logging.getLogger(__name__) +QUESTION_MARK_ICON = "https://cdn.discordapp.com/emojis/512367613339369475.png" + +  class CommandErrorHandler(commands.Cog):      """A error handler for the PythonDiscord server.""" -    def __init__(self, bot: commands.Bot): +    def __init__(self, bot: Bot) -> None:          self.bot = bot      @staticmethod @@ -41,12 +46,17 @@ class CommandErrorHandler(commands.Cog):      @commands.Cog.listener()      async def on_command_error(self, ctx: commands.Context, error: commands.CommandError) -> None: -        """Activates when a command opens an error.""" -        if getattr(error, 'handled', False): +        """Activates when a command raises an error.""" +        if getattr(error, "handled", False):              logging.debug(f"Command {ctx.command} had its error already handled locally; ignoring.")              return -        error = getattr(error, 'original', error) +        parent_command = "" +        if subctx := getattr(ctx, "subcontext", None): +            parent_command = f"{ctx.command} " +            ctx = subctx + +        error = getattr(error, "original", error)          logging.debug(              f"Error Encountered: {type(error).__name__} - {str(error)}, "              f"Command: {ctx.command}, " @@ -55,6 +65,7 @@ class CommandErrorHandler(commands.Cog):          )          if isinstance(error, commands.CommandNotFound): +            await self.send_command_suggestion(ctx, ctx.invoked_with)              return          if isinstance(error, (InChannelCheckFailure, InMonthCheckFailure)): @@ -63,8 +74,9 @@ class CommandErrorHandler(commands.Cog):          if isinstance(error, commands.UserInputError):              self.revert_cooldown_counter(ctx.command, ctx.message) +            usage = f"```{ctx.prefix}{parent_command}{ctx.command} {ctx.command.signature}```"              embed = self.error_embed( -                f"Your input was invalid: {error}\n\nUsage:\n```{ctx.prefix}{ctx.command} {ctx.command.signature}```" +                f"Your input was invalid: {error}\n\nUsage:{usage}"              )              await ctx.send(embed=embed)              return @@ -95,7 +107,7 @@ class CommandErrorHandler(commands.Cog):              self.revert_cooldown_counter(ctx.command, ctx.message)              embed = self.error_embed(                  "The argument you provided was invalid: " -                f"{error}\n\nUsage:\n```{ctx.prefix}{ctx.command} {ctx.command.signature}```" +                f"{error}\n\nUsage:\n```{ctx.prefix}{parent_command}{ctx.command} {ctx.command.signature}```"              )              await ctx.send(embed=embed)              return @@ -108,6 +120,15 @@ class CommandErrorHandler(commands.Cog):              await ctx.send("Game not found.")              return +        if isinstance(error, APIError): +            await ctx.send( +                embed=self.error_embed( +                    f"There was an error when communicating with the {error.api}", +                    NEGATIVE_REPLIES +                ) +            ) +            return +          with push_scope() as scope:              scope.user = {                  "id": ctx.author.id, @@ -121,14 +142,40 @@ class CommandErrorHandler(commands.Cog):              scope.set_extra("full_message", ctx.message.content)              if ctx.guild is not None: -                scope.set_extra( -                    "jump_to", -                    f"https://discordapp.com/channels/{ctx.guild.id}/{ctx.channel.id}/{ctx.message.id}" -                ) +                scope.set_extra("jump_to", ctx.message.jump_url)              log.exception(f"Unhandled command error: {str(error)}", exc_info=error) - -def setup(bot: commands.Bot) -> None: -    """Error handler Cog load.""" +    async def send_command_suggestion(self, ctx: commands.Context, command_name: str) -> None: +        """Sends user similar commands if any can be found.""" +        raw_commands = [] +        for cmd in self.bot.walk_commands(): +            if not cmd.hidden: +                raw_commands += (cmd.name, *cmd.aliases) +        if similar_command_data := difflib.get_close_matches(command_name, raw_commands, 1): +            similar_command_name = similar_command_data[0] +            similar_command = self.bot.get_command(similar_command_name) + +            if not similar_command: +                return + +            log_msg = "Cancelling attempt to suggest a command due to failed checks." +            try: +                if not await similar_command.can_run(ctx): +                    log.debug(log_msg) +                    return +            except commands.errors.CommandError as cmd_error: +                log.debug(log_msg) +                await self.on_command_error(ctx, cmd_error) +                return + +            misspelled_content = ctx.message.content +            e = Embed() +            e.set_author(name="Did you mean:", icon_url=QUESTION_MARK_ICON) +            e.description = misspelled_content.replace(command_name, similar_command_name, 1) +            await ctx.send(embed=e, delete_after=RedirectOutput.delete_delay) + + +def setup(bot: Bot) -> None: +    """Load the ErrorHandler cog."""      bot.add_cog(CommandErrorHandler(bot)) diff --git a/bot/exts/evergreen/fun.py b/bot/exts/evergreen/fun.py index 101725da..3b266e1b 100644 --- a/bot/exts/evergreen/fun.py +++ b/bot/exts/evergreen/fun.py @@ -7,10 +7,12 @@ from typing import Callable, Iterable, Tuple, Union  from discord import Embed, Message  from discord.ext import commands -from discord.ext.commands import BadArgument, Bot, Cog, Context, MessageConverter, clean_content +from discord.ext.commands import BadArgument, Cog, Context, MessageConverter, clean_content  from bot import utils +from bot.bot import Bot  from bot.constants import Client, Colours, Emojis +from bot.utils import helpers  log = logging.getLogger(__name__) @@ -54,8 +56,7 @@ class Fun(Cog):      def __init__(self, bot: Bot) -> None:          self.bot = bot -        with Path("bot/resources/evergreen/caesar_info.json").open("r", encoding="UTF-8") as f: -            self._caesar_cipher_embed = json.load(f) +        self._caesar_cipher_embed = json.loads(Path("bot/resources/evergreen/caesar_info.json").read_text("UTF-8"))      @staticmethod      def _get_random_die() -> str: @@ -83,6 +84,7 @@ class Fun(Cog):          if embed is not None:              embed = Fun._convert_embed(conversion_func, embed)          converted_text = conversion_func(text) +        converted_text = helpers.suppress_links(converted_text)          # Don't put >>> if only embed present          if converted_text:              converted_text = f">>> {converted_text.lstrip('> ')}" @@ -101,6 +103,7 @@ class Fun(Cog):          if embed is not None:              embed = Fun._convert_embed(conversion_func, embed)          converted_text = conversion_func(text) +        converted_text = helpers.suppress_links(converted_text)          # Don't put >>> if only embed present          if converted_text:              converted_text = f">>> {converted_text.lstrip('> ')}" @@ -239,6 +242,6 @@ class Fun(Cog):          return Embed.from_dict(embed_dict) -def setup(bot: commands.Bot) -> None: -    """Fun Cog load.""" +def setup(bot: Bot) -> None: +    """Load the Fun cog."""      bot.add_cog(Fun(bot)) diff --git a/bot/exts/evergreen/game.py b/bot/exts/evergreen/game.py index d37be0e2..32fe9263 100644 --- a/bot/exts/evergreen/game.py +++ b/bot/exts/evergreen/game.py @@ -15,6 +15,7 @@ from discord.ext.commands import Cog, Context, group  from bot.bot import Bot  from bot.constants import STAFF_ROLES, Tokens  from bot.utils.decorators import with_role +from bot.utils.extensions import invoke_help_command  from bot.utils.pagination import ImagePaginator, LinePaginator  # Base URL of IGDB API @@ -175,7 +176,7 @@ class Games(Cog):                              "Invalid OAuth credentials. Unloading Games cog. "                              f"OAuth response message: {result['message']}"                          ) -                        self.bot.remove_cog('Games') +                        self.bot.remove_cog("Games")                      return @@ -223,8 +224,8 @@ class Games(Cog):              else:                  self.genres[genre_name] = genre -    @group(name="games", aliases=["game"], invoke_without_command=True) -    async def games(self, ctx: Context, amount: Optional[int] = 5, *, genre: Optional[str] = None) -> None: +    @group(name="games", aliases=("game",), invoke_without_command=True) +    async def games(self, ctx: Context, amount: Optional[int] = 5, *, genre: Optional[str]) -> None:          """          Get random game(s) by genre from IGDB. Use .games genres command to get all available genres. @@ -234,7 +235,7 @@ class Games(Cog):          """          # When user didn't specified genre, send help message          if genre is None: -            await ctx.send_help("games") +            await invoke_help_command(ctx)              return          # Capitalize genre for check @@ -276,7 +277,7 @@ class Games(Cog):          await ImagePaginator.paginate(pages, ctx, Embed(title=f"Random {genre.title()} Games")) -    @games.command(name="top", aliases=["t"]) +    @games.command(name="top", aliases=("t",))      async def top(self, ctx: Context, amount: int = 10) -> None:          """          Get current Top games in IGDB. @@ -293,19 +294,19 @@ class Games(Cog):          pages = [await self.create_page(game) for game in games]          await ImagePaginator.paginate(pages, ctx, Embed(title=f"Top {amount} Games")) -    @games.command(name="genres", aliases=["genre", "g"]) +    @games.command(name="genres", aliases=("genre", "g"))      async def genres(self, ctx: Context) -> None:          """Get all available genres."""          await ctx.send(f"Currently available genres: {', '.join(f'`{genre}`' for genre in self.genres)}") -    @games.command(name="search", aliases=["s"]) +    @games.command(name="search", aliases=("s",))      async def search(self, ctx: Context, *, search_term: str) -> None:          """Find games by name."""          lines = await self.search_games(search_term)          await LinePaginator.paginate(lines, ctx, Embed(title=f"Game Search Results: {search_term}"), empty=False) -    @games.command(name="company", aliases=["companies"]) +    @games.command(name="company", aliases=("companies",))      async def company(self, ctx: Context, amount: int = 5) -> None:          """          Get random Game Companies companies from IGDB API. @@ -324,7 +325,7 @@ class Games(Cog):          await ImagePaginator.paginate(pages, ctx, Embed(title="Random Game Companies"))      @with_role(*STAFF_ROLES) -    @games.command(name="refresh", aliases=["r"]) +    @games.command(name="refresh", aliases=("r",))      async def refresh_genres_command(self, ctx: Context) -> None:          """Refresh .games command genres."""          try: @@ -334,13 +335,14 @@ class Games(Cog):              return          await ctx.send("Successfully refreshed genres.") -    async def get_games_list(self, -                             amount: int, -                             genre: Optional[str] = None, -                             sort: Optional[str] = None, -                             additional_body: str = "", -                             offset: int = 0 -                             ) -> List[Dict[str, Any]]: +    async def get_games_list( +        self, +        amount: int, +        genre: Optional[str] = None, +        sort: Optional[str] = None, +        additional_body: str = "", +        offset: int = 0 +    ) -> List[Dict[str, Any]]:          """          Get list of games from IGDB API by parameters that is provided. @@ -372,8 +374,10 @@ class Games(Cog):          release_date = dt.utcfromtimestamp(data["first_release_date"]).date() if "first_release_date" in data else "?"          # Create Age Ratings value -        rating = ", ".join(f"{AgeRatingCategories(age['category']).name} {AgeRatings(age['rating']).name}" -                           for age in data["age_ratings"]) if "age_ratings" in data else "?" +        rating = ", ".join( +            f"{AgeRatingCategories(age['category']).name} {AgeRatings(age['rating']).name}" +            for age in data["age_ratings"] +        ) if "age_ratings" in data else "?"          companies = [c["company"]["name"] for c in data["involved_companies"]] if "involved_companies" in data else "?" @@ -470,7 +474,7 @@ class Games(Cog):  def setup(bot: Bot) -> None: -    """Add/Load Games cog.""" +    """Load the Games cog."""      # Check does IGDB API key exist, if not, log warning and don't load cog      if not Tokens.igdb_client_id:          logger.warning("No IGDB client ID. Not loading Games cog.") diff --git a/bot/exts/evergreen/githubinfo.py b/bot/exts/evergreen/githubinfo.py index 2e38e3ab..d29f3aa9 100644 --- a/bot/exts/evergreen/githubinfo.py +++ b/bot/exts/evergreen/githubinfo.py @@ -1,21 +1,24 @@  import logging  import random  from datetime import datetime -from typing import Optional +from urllib.parse import quote, quote_plus  import discord  from discord.ext import commands -from discord.ext.commands.cooldowns import BucketType -from bot.constants import NEGATIVE_REPLIES +from bot.bot import Bot +from bot.constants import Colours, NEGATIVE_REPLIES +from bot.exts.utils.extensions import invoke_help_command  log = logging.getLogger(__name__) +GITHUB_API_URL = "https://api.github.com" +  class GithubInfo(commands.Cog):      """Fetches info from GitHub.""" -    def __init__(self, bot: commands.Bot): +    def __init__(self, bot: Bot):          self.bot = bot      async def fetch_data(self, url: str) -> dict: @@ -23,76 +26,153 @@ class GithubInfo(commands.Cog):          async with self.bot.http_session.get(url) as r:              return await r.json() -    @commands.command(name='github', aliases=['gh']) -    @commands.cooldown(1, 60, BucketType.user) -    async def get_github_info(self, ctx: commands.Context, username: Optional[str]) -> None: -        """ -        Fetches a user's GitHub information. - -        Username is optional and sends the help command if not specified. -        """ -        if username is None: -            await ctx.invoke(self.bot.get_command('help'), 'github') -            ctx.command.reset_cooldown(ctx) -            return +    @commands.group(name="github", aliases=("gh", "git")) +    @commands.cooldown(1, 10, commands.BucketType.user) +    async def github_group(self, ctx: commands.Context) -> None: +        """Commands for finding information related to GitHub.""" +        if ctx.invoked_subcommand is None: +            await invoke_help_command(ctx) +    @github_group.command(name="user", aliases=("userinfo",)) +    async def github_user_info(self, ctx: commands.Context, username: str) -> None: +        """Fetches a user's GitHub information."""          async with ctx.typing(): -            user_data = await self.fetch_data(f"https://api.github.com/users/{username}") +            user_data = await self.fetch_data(f"{GITHUB_API_URL}/users/{quote_plus(username)}")              # User_data will not have a message key if the user exists -            if user_data.get('message') is not None: -                await ctx.send(embed=discord.Embed(title=random.choice(NEGATIVE_REPLIES), -                                                   description=f"The profile for `{username}` was not found.", -                                                   colour=discord.Colour.red())) +            if "message" in user_data: +                embed = discord.Embed( +                    title=random.choice(NEGATIVE_REPLIES), +                    description=f"The profile for `{username}` was not found.", +                    colour=Colours.soft_red +                ) + +                await ctx.send(embed=embed)                  return -            org_data = await self.fetch_data(user_data['organizations_url']) +            org_data = await self.fetch_data(user_data["organizations_url"])              orgs = [f"[{org['login']}](https://github.com/{org['login']})" for org in org_data] -            orgs_to_add = ' | '.join(orgs) +            orgs_to_add = " | ".join(orgs) -            gists = user_data['public_gists'] +            gists = user_data["public_gists"]              # Forming blog link -            if user_data['blog'].startswith("http"):  # Blog link is complete -                blog = user_data['blog'] -            elif user_data['blog']:  # Blog exists but the link is not complete +            if user_data["blog"].startswith("http"):  # Blog link is complete +                blog = user_data["blog"] +            elif user_data["blog"]:  # Blog exists but the link is not complete                  blog = f"https://{user_data['blog']}"              else:                  blog = "No website link available"              embed = discord.Embed(                  title=f"`{user_data['login']}`'s GitHub profile info", -                description=f"```{user_data['bio']}```\n" if user_data['bio'] is not None else "", -                colour=0x7289da, -                url=user_data['html_url'], -                timestamp=datetime.strptime(user_data['created_at'], "%Y-%m-%dT%H:%M:%SZ") +                description=f"```{user_data['bio']}```\n" if user_data["bio"] else "", +                colour=discord.Colour.blurple(), +                url=user_data["html_url"], +                timestamp=datetime.strptime(user_data["created_at"], "%Y-%m-%dT%H:%M:%SZ")              ) -            embed.set_thumbnail(url=user_data['avatar_url']) +            embed.set_thumbnail(url=user_data["avatar_url"])              embed.set_footer(text="Account created at") -            if user_data['type'] == "User": +            if user_data["type"] == "User": -                embed.add_field(name="Followers", -                                value=f"[{user_data['followers']}]({user_data['html_url']}?tab=followers)") -                embed.add_field(name="\u200b", value="\u200b") -                embed.add_field(name="Following", -                                value=f"[{user_data['following']}]({user_data['html_url']}?tab=following)") +                embed.add_field( +                    name="Followers", +                    value=f"[{user_data['followers']}]({user_data['html_url']}?tab=followers)" +                ) +                embed.add_field( +                    name="Following", +                    value=f"[{user_data['following']}]({user_data['html_url']}?tab=following)" +                ) -            embed.add_field(name="Public repos", -                            value=f"[{user_data['public_repos']}]({user_data['html_url']}?tab=repositories)") -            embed.add_field(name="\u200b", value="\u200b") +            embed.add_field( +                name="Public repos", +                value=f"[{user_data['public_repos']}]({user_data['html_url']}?tab=repositories)" +            ) -            if user_data['type'] == "User": -                embed.add_field(name="Gists", value=f"[{gists}](https://gist.github.com/{username})") +            if user_data["type"] == "User": +                embed.add_field( +                    name="Gists", +                    value=f"[{gists}](https://gist.github.com/{quote_plus(username, safe='')})" +                ) -                embed.add_field(name=f"Organization{'s' if len(orgs)!=1 else ''}", -                                value=orgs_to_add if orgs else "No organizations") -                embed.add_field(name="\u200b", value="\u200b") +                embed.add_field( +                    name=f"Organization{'s' if len(orgs)!=1 else ''}", +                    value=orgs_to_add if orgs else "No organizations." +                )              embed.add_field(name="Website", value=blog)          await ctx.send(embed=embed) +    @github_group.command(name='repository', aliases=('repo',)) +    async def github_repo_info(self, ctx: commands.Context, *repo: str) -> None: +        """ +        Fetches a repositories' GitHub information. + +        The repository should look like `user/reponame` or `user reponame`. +        """ +        repo = "/".join(repo) +        if repo.count("/") != 1: +            embed = discord.Embed( +                title=random.choice(NEGATIVE_REPLIES), +                description="The repository should look like `user/reponame` or `user reponame`.", +                colour=Colours.soft_red +            ) + +            await ctx.send(embed=embed) +            return + +        async with ctx.typing(): +            repo_data = await self.fetch_data(f"{GITHUB_API_URL}/repos/{quote(repo)}") + +            # There won't be a message key if this repo exists +            if "message" in repo_data: +                embed = discord.Embed( +                    title=random.choice(NEGATIVE_REPLIES), +                    description="The requested repository was not found.", +                    colour=Colours.soft_red +                ) + +                await ctx.send(embed=embed) +                return + +        embed = discord.Embed( +            title=repo_data["name"], +            description=repo_data["description"], +            colour=discord.Colour.blurple(), +            url=repo_data["html_url"] +        ) + +        # If it's a fork, then it will have a parent key +        try: +            parent = repo_data["parent"] +            embed.description += f"\n\nForked from [{parent['full_name']}]({parent['html_url']})" +        except KeyError: +            log.debug("Repository is not a fork.") + +        repo_owner = repo_data["owner"] + +        embed.set_author( +            name=repo_owner["login"], +            url=repo_owner["html_url"], +            icon_url=repo_owner["avatar_url"] +        ) + +        repo_created_at = datetime.strptime(repo_data["created_at"], "%Y-%m-%dT%H:%M:%SZ").strftime("%d/%m/%Y") +        last_pushed = datetime.strptime(repo_data["pushed_at"], "%Y-%m-%dT%H:%M:%SZ").strftime("%d/%m/%Y at %H:%M") + +        embed.set_footer( +            text=( +                f"{repo_data['forks_count']} ⑂ " +                f"• {repo_data['stargazers_count']} ⭐ " +                f"• Created At {repo_created_at} " +                f"• Last Commit {last_pushed}" +            ) +        ) + +        await ctx.send(embed=embed) + -def setup(bot: commands.Bot) -> None: -    """Adding the cog to the bot.""" +def setup(bot: Bot) -> None: +    """Load the GithubInfo cog."""      bot.add_cog(GithubInfo(bot)) diff --git a/bot/exts/evergreen/help.py b/bot/exts/evergreen/help.py index 91147243..bfb5db17 100644 --- a/bot/exts/evergreen/help.py +++ b/bot/exts/evergreen/help.py @@ -2,14 +2,13 @@  import asyncio  import itertools  import logging -from collections import namedtuple  from contextlib import suppress -from typing import Union +from typing import List, NamedTuple, Union  from discord import Colour, Embed, HTTPException, Message, Reaction, User  from discord.ext import commands  from discord.ext.commands import CheckFailure, Cog as DiscordCog, Command, Context -from fuzzywuzzy import fuzz, process +from rapidfuzz import process  from bot import constants  from bot.bot import Bot @@ -22,14 +21,21 @@ from bot.utils.pagination import (  DELETE_EMOJI = Emojis.trashcan  REACTIONS = { -    FIRST_EMOJI: 'first', -    LEFT_EMOJI: 'back', -    RIGHT_EMOJI: 'next', -    LAST_EMOJI: 'end', -    DELETE_EMOJI: 'stop', +    FIRST_EMOJI: "first", +    LEFT_EMOJI: "back", +    RIGHT_EMOJI: "next", +    LAST_EMOJI: "end", +    DELETE_EMOJI: "stop",  } -Cog = namedtuple('Cog', ['name', 'description', 'commands']) + +class Cog(NamedTuple): +    """Show information about a Cog's name, description and commands.""" + +    name: str +    description: str +    commands: List[Command] +  log = logging.getLogger(__name__) @@ -87,7 +93,7 @@ class HelpSession:          # set the query details for the session          if command: -            query_str = ' '.join(command) +            query_str = " ".join(command)              self.query = self._get_query(query_str)              self.description = self.query.description or self.query.help          else: @@ -153,7 +159,7 @@ class HelpSession:          # Combine command and cog names          choices = list(self._bot.all_commands) + list(self._bot.cogs) -        result = process.extractBests(query, choices, scorer=fuzz.ratio, score_cutoff=90) +        result = process.extract(query, choices, score_cutoff=90)          raise HelpQueryNotFound(f'Query "{query}" not found.', dict(result)) @@ -191,7 +197,7 @@ class HelpSession:          self.reset_timeout()          # Run relevant action method -        action = getattr(self, f'do_{REACTIONS[emoji]}', None) +        action = getattr(self, f"do_{REACTIONS[emoji]}", None)          if action:              await action() @@ -234,11 +240,11 @@ class HelpSession:          if cmd.cog:              try:                  if cmd.cog.category: -                    return f'**{cmd.cog.category}**' +                    return f"**{cmd.cog.category}**"              except AttributeError:                  pass -            return f'**{cmd.cog_name}**' +            return f"**{cmd.cog_name}**"          else:              return "**\u200bNo Category:**" @@ -262,139 +268,143 @@ class HelpSession:                  # if default is not an empty string or None                  if show_default: -                    results.append(f'[{name}={param.default}]') +                    results.append(f"[{name}={param.default}]")                  else: -                    results.append(f'[{name}]') +                    results.append(f"[{name}]")              # if variable length argument              elif param.kind == param.VAR_POSITIONAL: -                results.append(f'[{name}...]') +                results.append(f"[{name}...]")              # if required              else: -                results.append(f'<{name}>') +                results.append(f"<{name}>")          return f"{cmd.name} {' '.join(results)}"      async def build_pages(self) -> None:          """Builds the list of content pages to be paginated through in the help message, as a list of str."""          # Use LinePaginator to restrict embed line height -        paginator = LinePaginator(prefix='', suffix='', max_lines=self._max_lines) - -        prefix = constants.Client.prefix +        paginator = LinePaginator(prefix="", suffix="", max_lines=self._max_lines)          # show signature if query is a command          if isinstance(self.query, commands.Command): -            signature = self._get_command_params(self.query) -            parent = self.query.full_parent_name + ' ' if self.query.parent else '' -            paginator.add_line(f'**```{prefix}{parent}{signature}```**') - -            aliases = ', '.join(f'`{a}`' for a in self.query.aliases) -            if aliases: -                paginator.add_line(f'**Can also use:** {aliases}\n') - -            if not await self.query.can_run(self._ctx): -                paginator.add_line('***You cannot run this command.***\n') +            await self._add_command_signature(paginator)          if isinstance(self.query, Cog): -            paginator.add_line(f'**{self.query.name}**') +            paginator.add_line(f"**{self.query.name}**")          if self.description: -            paginator.add_line(f'*{self.description}*') +            paginator.add_line(f"*{self.description}*")          # list all children commands of the queried object          if isinstance(self.query, (commands.GroupMixin, Cog)): +            await self._list_child_commands(paginator) -            # remove hidden commands if session is not wanting hiddens -            if not self._show_hidden: -                filtered = [c for c in self.query.commands if not c.hidden] -            else: -                filtered = self.query.commands - -            # if after filter there are no commands, finish up -            if not filtered: -                self._pages = paginator.pages -                return - -            if isinstance(self.query, Cog): -                grouped = (('**Commands:**', self.query.commands),) - -            elif isinstance(self.query, commands.Command): -                grouped = (('**Subcommands:**', self.query.commands),) - -                # don't show prefix for subcommands -                prefix = '' +        self._pages = paginator.pages -            # otherwise sort and organise all commands into categories -            else: -                cat_sort = sorted(filtered, key=self._category_key) -                grouped = itertools.groupby(cat_sort, key=self._category_key) +    async def _add_command_signature(self, paginator: LinePaginator) -> None: +        prefix = constants.Client.prefix -            for category, cmds in grouped: -                cmds = sorted(cmds, key=lambda c: c.name) +        signature = self._get_command_params(self.query) +        parent = self.query.full_parent_name + " " if self.query.parent else "" +        paginator.add_line(f"**```{prefix}{parent}{signature}```**") +        aliases = [f"`{alias}`" if not parent else f"`{parent} {alias}`" for alias in self.query.aliases] +        aliases += [f"`{alias}`" for alias in getattr(self.query, "root_aliases", ())] +        aliases = ", ".join(sorted(aliases)) +        if aliases: +            paginator.add_line(f"**Can also use:** {aliases}\n") +        if not await self.query.can_run(self._ctx): +            paginator.add_line("***You cannot run this command.***\n") + +    async def _list_child_commands(self, paginator: LinePaginator) -> None: +        # remove hidden commands if session is not wanting hiddens +        if not self._show_hidden: +            filtered = [c for c in self.query.commands if not c.hidden] +        else: +            filtered = self.query.commands -                if len(cmds) == 0: -                    continue +        # if after filter there are no commands, finish up +        if not filtered: +            self._pages = paginator.pages +            return -                cat_cmds = [] +        if isinstance(self.query, Cog): +            grouped = (("**Commands:**", self.query.commands),) -                for command in cmds: +        elif isinstance(self.query, commands.Command): +            grouped = (("**Subcommands:**", self.query.commands),) -                    # skip if hidden and hide if session is set to -                    if command.hidden and not self._show_hidden: -                        continue +        # otherwise sort and organise all commands into categories +        else: +            cat_sort = sorted(filtered, key=self._category_key) +            grouped = itertools.groupby(cat_sort, key=self._category_key) -                    # see if the user can run the command -                    strikeout = '' +        for category, cmds in grouped: +            await self._format_command_category(paginator, category, list(cmds)) -                    # Patch to make the !help command work outside of #bot-commands again -                    # This probably needs a proper rewrite, but this will make it work in -                    # the mean time. -                    try: -                        can_run = await command.can_run(self._ctx) -                    except CheckFailure: -                        can_run = False +    async def _format_command_category(self, paginator: LinePaginator, category: str, cmds: List[Command]) -> None: +        cmds = sorted(cmds, key=lambda c: c.name) +        cat_cmds = [] +        for command in cmds: +            cat_cmds += await self._format_command(command) -                    if not can_run: -                        # skip if we don't show commands they can't run -                        if self._only_can_run: -                            continue -                        strikeout = '~~' +        # state var for if the category should be added next +        print_cat = 1 +        new_page = True -                    signature = self._get_command_params(command) -                    info = f"{strikeout}**`{prefix}{signature}`**{strikeout}" +        for details in cat_cmds: -                    # handle if the command has no docstring -                    if command.short_doc: -                        cat_cmds.append(f'{info}\n*{command.short_doc}*') -                    else: -                        cat_cmds.append(f'{info}\n*No details provided.*') +            # keep details together, paginating early if it won"t fit +            lines_adding = len(details.split("\n")) + print_cat +            if paginator._linecount + lines_adding > self._max_lines: +                paginator._linecount = 0 +                new_page = True +                paginator.close_page() -                # state var for if the category should be added next +                # new page so print category title again                  print_cat = 1 -                new_page = True -                for details in cat_cmds: +            if print_cat: +                if new_page: +                    paginator.add_line("") +                paginator.add_line(category) +                print_cat = 0 + +            paginator.add_line(details) -                    # keep details together, paginating early if it won't fit -                    lines_adding = len(details.split('\n')) + print_cat -                    if paginator._linecount + lines_adding > self._max_lines: -                        paginator._linecount = 0 -                        new_page = True -                        paginator.close_page() +    async def _format_command(self, command: Command) -> List[str]: +        # skip if hidden and hide if session is set to +        if command.hidden and not self._show_hidden: +            return [] -                        # new page so print category title again -                        print_cat = 1 +        # Patch to make the !help command work outside of #bot-commands again +        # This probably needs a proper rewrite, but this will make it work in +        # the mean time. +        try: +            can_run = await command.can_run(self._ctx) +        except CheckFailure: +            can_run = False + +        # see if the user can run the command +        strikeout = "" +        if not can_run: +            # skip if we don't show commands they can't run +            if self._only_can_run: +                return [] +            strikeout = "~~" -                    if print_cat: -                        if new_page: -                            paginator.add_line('') -                        paginator.add_line(category) -                        print_cat = 0 +        if isinstance(self.query, commands.Command): +            prefix = "" +        else: +            prefix = constants.Client.prefix -                    paginator.add_line(details) +        signature = self._get_command_params(command) +        info = f"{strikeout}**`{prefix}{signature}`**{strikeout}" -        self._pages = paginator.pages +        # handle if the command has no docstring +        short_doc = command.short_doc or "No details provided" +        return [f"{info}\n*{short_doc}*"]      def embed_page(self, page_number: int = 0) -> Embed:          """Returns an Embed with the requested page formatted within.""" @@ -410,7 +420,7 @@ class HelpSession:          page_count = len(self._pages)          if page_count > 1: -            embed.set_footer(text=f'Page {self._current_page+1} / {page_count}') +            embed.set_footer(text=f"Page {self._current_page+1} / {page_count}")          return embed @@ -494,7 +504,7 @@ class HelpSession:  class Help(DiscordCog):      """Custom Embed Pagination Help feature.""" -    @commands.command('help') +    @commands.command("help")      async def new_help(self, ctx: Context, *commands) -> None:          """Shows Command Help."""          try: @@ -505,8 +515,8 @@ class Help(DiscordCog):              embed.title = str(error)              if error.possible_matches: -                matches = '\n'.join(error.possible_matches.keys()) -                embed.description = f'**Did you mean:**\n`{matches}`' +                matches = "\n".join(error.possible_matches.keys()) +                embed.description = f"**Did you mean:**\n`{matches}`"              await ctx.send(embed=embed) @@ -517,7 +527,7 @@ def unload(bot: Bot) -> None:      This is run if the cog raises an exception on load, or if the extension is unloaded.      """ -    bot.remove_command('help') +    bot.remove_command("help")      bot.add_command(bot._old_help) @@ -532,8 +542,8 @@ def setup(bot: Bot) -> None:      If an exception is raised during the loading of the cog, `unload` will be called in order to      reinstate the original help command.      """ -    bot._old_help = bot.get_command('help') -    bot.remove_command('help') +    bot._old_help = bot.get_command("help") +    bot.remove_command("help")      try:          bot.add_cog(Help()) diff --git a/bot/exts/evergreen/issues.py b/bot/exts/evergreen/issues.py index bbcbf611..00810de8 100644 --- a/bot/exts/evergreen/issues.py +++ b/bot/exts/evergreen/issues.py @@ -2,12 +2,24 @@ import logging  import random  import re  import typing as t -from enum import Enum +from dataclasses import dataclass  import discord -from discord.ext import commands, tasks - -from bot.constants import Categories, Channels, Colours, ERROR_REPLIES, Emojis, Tokens, WHITELISTED_CHANNELS +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import ( +    Categories, +    Channels, +    Colours, +    ERROR_REPLIES, +    Emojis, +    NEGATIVE_REPLIES, +    Tokens, +    WHITELISTED_CHANNELS +) +from bot.utils.decorators import whitelist_override +from bot.utils.extensions import invoke_help_command  log = logging.getLogger(__name__) @@ -15,20 +27,20 @@ BAD_RESPONSE = {      404: "Issue/pull request not located! Please enter a valid number!",      403: "Rate limit has been hit! Please try again later!"  } +REQUEST_HEADERS = { +    "Accept": "application/vnd.github.v3+json" +} -MAX_REQUESTS = 10 -REQUEST_HEADERS = dict() +REPOSITORY_ENDPOINT = "https://api.github.com/orgs/{org}/repos?per_page=100&type=public" +ISSUE_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/issues/{number}" +PR_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/pulls/{number}" -REPOS_API = "https://api.github.com/orgs/{org}/repos"  if GITHUB_TOKEN := Tokens.github:      REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}"  WHITELISTED_CATEGORIES = (      Categories.development, Categories.devprojects, Categories.media, Categories.staff  ) -WHITELISTED_CHANNELS_ON_MESSAGE = ( -    Channels.organisation, Channels.mod_meta, Channels.mod_tools, Channels.staff_voice -)  CODE_BLOCK_RE = re.compile(      r"^`([^`\n]+)`"  # Inline codeblock @@ -36,173 +48,228 @@ CODE_BLOCK_RE = re.compile(      re.DOTALL | re.MULTILINE  ) +# Maximum number of issues in one message +MAXIMUM_ISSUES = 5 + +# Regex used when looking for automatic linking in messages +# regex101 of current regex https://regex101.com/r/V2ji8M/6 +AUTOMATIC_REGEX = re.compile( +    r"((?P<org>[a-zA-Z0-9][a-zA-Z0-9\-]{1,39})\/)?(?P<repo>[\w\-\.]{1,100})#(?P<number>[0-9]+)" +) + + +@dataclass +class FoundIssue: +    """Dataclass representing an issue found by the regex.""" + +    organisation: t.Optional[str] +    repository: str +    number: str + +    def __hash__(self) -> int: +        return hash((self.organisation, self.repository, self.number)) + + +@dataclass +class FetchError: +    """Dataclass representing an error while fetching an issue.""" + +    return_code: int +    message: str + -class FetchIssueErrors(Enum): -    """Errors returned in fetch issues.""" +@dataclass +class IssueState: +    """Dataclass representing the state of an issue.""" -    value_error = "Numbers not found." -    max_requests = "Max requests hit." +    repository: str +    number: int +    url: str +    title: str +    emoji: str  class Issues(commands.Cog):      """Cog that allows users to retrieve issues from GitHub.""" -    def __init__(self, bot: commands.Bot): +    def __init__(self, bot: Bot):          self.bot = bot          self.repos = [] -        self.get_pydis_repos.start() - -    @tasks.loop(minutes=30) -    async def get_pydis_repos(self) -> None: -        """Get all python-discord repositories on github.""" -        async with self.bot.http_session.get(REPOS_API.format(org="python-discord")) as resp: -            if resp.status == 200: -                data = await resp.json() -                for repo in data: -                    self.repos.append(repo["full_name"].split("/")[1]) -                self.repo_regex = "|".join(self.repos) -            else: -                log.debug(f"Failed to get latest Pydis repositories. Status code {resp.status}")      @staticmethod -    def check_in_block(message: discord.Message, repo_issue: str) -> bool: -        """Check whether the <repo>#<issue> is in codeblocks.""" -        block = re.findall(CODE_BLOCK_RE, message.content) - -        if not block: -            return False -        elif "#".join(repo_issue.split(" ")) in "".join([*block[0]]): -            return True -        return False +    def remove_codeblocks(message: str) -> str: +        """Remove any codeblock in a message.""" +        return re.sub(CODE_BLOCK_RE, "", message)      async def fetch_issues(              self, -            numbers: set, +            number: int,              repository: str,              user: str -    ) -> t.Union[FetchIssueErrors, str, list]: -        """Retrieve issue(s) from a GitHub repository.""" -        links = [] -        if not numbers: -            return FetchIssueErrors.value_error - -        if len(numbers) > MAX_REQUESTS: -            return FetchIssueErrors.max_requests - -        for number in numbers: -            url = f"https://api.github.com/repos/{user}/{repository}/issues/{number}" -            merge_url = f"https://api.github.com/repos/{user}/{repository}/pulls/{number}/merge" -            log.trace(f"Querying GH issues API: {url}") -            async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r: -                json_data = await r.json() - -            if r.status in BAD_RESPONSE: -                log.warning(f"Received response {r.status} from: {url}") -                return f"[{str(r.status)}] #{number} {BAD_RESPONSE.get(r.status)}" - -            # The initial API request is made to the issues API endpoint, which will return information -            # if the issue or PR is present. However, the scope of information returned for PRs differs -            # from issues: if the 'issues' key is present in the response then we can pull the data we -            # need from the initial API call. -            if "issues" in json_data.get("html_url"): -                if json_data.get("state") == "open": -                    icon_url = Emojis.issue -                else: -                    icon_url = Emojis.issue_closed - -            # If the 'issues' key is not contained in the API response and there is no error code, then -            # we know that a PR has been requested and a call to the pulls API endpoint is necessary -            # to get the desired information for the PR. +    ) -> t.Union[IssueState, FetchError]: +        """ +        Retrieve an issue from a GitHub repository. + +        Returns IssueState on success, FetchError on failure. +        """ +        url = ISSUE_ENDPOINT.format(user=user, repository=repository, number=number) +        pulls_url = PR_ENDPOINT.format(user=user, repository=repository, number=number) +        log.trace(f"Querying GH issues API: {url}") + +        async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r: +            json_data = await r.json() + +        if r.status == 403: +            if r.headers.get("X-RateLimit-Remaining") == "0": +                log.info(f"Ratelimit reached while fetching {url}") +                return FetchError(403, "Ratelimit reached, please retry in a few minutes.") +            return FetchError(403, "Cannot access issue.") +        elif r.status in (404, 410): +            return FetchError(r.status, "Issue not found.") +        elif r.status != 200: +            return FetchError(r.status, "Error while fetching issue.") + +        # The initial API request is made to the issues API endpoint, which will return information +        # if the issue or PR is present. However, the scope of information returned for PRs differs +        # from issues: if the 'issues' key is present in the response then we can pull the data we +        # need from the initial API call. +        if "issues" in json_data["html_url"]: +            if json_data.get("state") == "open": +                emoji = Emojis.issue_open              else: -                log.trace(f"PR provided, querying GH pulls API for additional information: {merge_url}") -                async with self.bot.http_session.get(merge_url) as m: -                    if json_data.get("state") == "open": -                        icon_url = Emojis.pull_request -                    # When the status is 204 this means that the state of the PR is merged -                    elif m.status == 204: -                        icon_url = Emojis.merge -                    else: -                        icon_url = Emojis.pull_request_closed +                emoji = Emojis.issue_closed + +        # If the 'issues' key is not contained in the API response and there is no error code, then +        # we know that a PR has been requested and a call to the pulls API endpoint is necessary +        # to get the desired information for the PR. +        else: +            log.trace(f"PR provided, querying GH pulls API for additional information: {pulls_url}") +            async with self.bot.http_session.get(pulls_url) as p: +                pull_data = await p.json() +                if pull_data["draft"]: +                    emoji = Emojis.pull_request_draft +                elif pull_data["state"] == "open": +                    emoji = Emojis.pull_request_open +                # When 'merged_at' is not None, this means that the state of the PR is merged +                elif pull_data["merged_at"] is not None: +                    emoji = Emojis.pull_request_merged +                else: +                    emoji = Emojis.pull_request_closed -            issue_url = json_data.get("html_url") -            links.append([icon_url, f"[{repository}] #{number} {json_data.get('title')}", issue_url]) +        issue_url = json_data.get("html_url") -        return links +        return IssueState(repository, number, issue_url, json_data.get("title", ""), emoji)      @staticmethod -    def get_embed(result: list, user: str = "python-discord", repository: str = "") -> discord.Embed: -        """Get Response Embed.""" -        description_list = ["{0} [{1}]({2})".format(*link) for link in result] +    def format_embed( +        results: t.List[t.Union[IssueState, FetchError]], +        user: str, +        repository: t.Optional[str] = None +    ) -> discord.Embed: +        """Take a list of IssueState or FetchError and format a Discord embed for them.""" +        description_list = [] + +        for result in results: +            if isinstance(result, IssueState): +                description_list.append(f"{result.emoji} [{result.title}]({result.url})") +            elif isinstance(result, FetchError): +                description_list.append(f":x: [{result.return_code}] {result.message}") +          resp = discord.Embed(              colour=Colours.bright_green, -            description='\n'.join(description_list) +            description="\n".join(description_list)          ) -        resp.set_author(name="GitHub", url=f"https://github.com/{user}/{repository}") +        embed_url = f"https://github.com/{user}/{repository}" if repository else f"https://github.com/{user}" +        resp.set_author(name="GitHub", url=embed_url)          return resp +    @whitelist_override(channels=WHITELISTED_CHANNELS, categories=WHITELISTED_CATEGORIES)      @commands.command(aliases=("pr",))      async def issue( -            self, -            ctx: commands.Context, -            numbers: commands.Greedy[int], -            repository: str = "sir-lancebot", -            user: str = "python-discord" +        self, +        ctx: commands.Context, +        numbers: commands.Greedy[int], +        repository: str = "sir-lancebot", +        user: str = "python-discord"      ) -> None:          """Command to retrieve issue(s) from a GitHub repository.""" -        if not( -            ctx.channel.category.id in WHITELISTED_CATEGORIES -            or ctx.channel.id in WHITELISTED_CHANNELS -        ): -            return - -        result = await self.fetch_issues(set(numbers), repository, user) +        # Remove duplicates +        numbers = set(numbers) -        if result == FetchIssueErrors.value_error: -            await ctx.invoke(self.bot.get_command('help'), 'issue') - -        elif result == FetchIssueErrors.max_requests: +        if len(numbers) > MAXIMUM_ISSUES:              embed = discord.Embed(                  title=random.choice(ERROR_REPLIES),                  color=Colours.soft_red, -                description=f"Too many issues/PRs! (maximum of {MAX_REQUESTS})" +                description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})"              )              await ctx.send(embed=embed) +            await invoke_help_command(ctx) -        elif isinstance(result, list): -            # Issue/PR format: emoji to show if open/closed/merged, number and the title as a singular link. -            resp = self.get_embed(result, user, repository) -            await ctx.send(embed=resp) - -        elif isinstance(result, str): -            await ctx.send(result) +        results = [await self.fetch_issues(number, repository, user) for number in numbers] +        await ctx.send(embed=self.format_embed(results, user, repository))      @commands.Cog.listener()      async def on_message(self, message: discord.Message) -> None: -        """Command to retrieve issue(s) from a GitHub repository using automatic linking if matching <repo>#<issue>.""" -        if not( -            message.channel.category.id in WHITELISTED_CATEGORIES -            or message.channel.id in WHITELISTED_CHANNELS_ON_MESSAGE -        ): +        """ +        Automatic issue linking. + +        Listener to retrieve issue(s) from a GitHub repository using automatic linking if matching <org>/<repo>#<issue>. +        """ +        # Ignore bots +        if message.author.bot:              return -        message_repo_issue_map = re.findall(fr"({self.repo_regex})#(\d+)", message.content) +        issues = [ +            FoundIssue(*match.group("org", "repo", "number")) +            for match in AUTOMATIC_REGEX.finditer(self.remove_codeblocks(message.content)) +        ]          links = [] -        if message_repo_issue_map: -            for repo_issue in message_repo_issue_map: -                if not self.check_in_block(message, " ".join(repo_issue)): -                    result = await self.fetch_issues({repo_issue[1]}, repo_issue[0], "python-discord") -                    if isinstance(result, list): -                        links.extend(result) +        if issues: +            # Block this from working in DMs +            if not message.guild: +                await message.channel.send( +                    embed=discord.Embed( +                        title=random.choice(NEGATIVE_REPLIES), +                        description=( +                            "You can't retrieve issues from DMs. " +                            f"Try again in <#{Channels.community_bot_commands}>" +                        ), +                        colour=Colours.soft_red +                    ) +                ) +                return + +            log.trace(f"Found {issues = }") +            # Remove duplicates +            issues = set(issues) + +            if len(issues) > MAXIMUM_ISSUES: +                embed = discord.Embed( +                    title=random.choice(ERROR_REPLIES), +                    color=Colours.soft_red, +                    description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})" +                ) +                await message.channel.send(embed=embed, delete_after=5) +                return + +            for repo_issue in issues: +                result = await self.fetch_issues( +                    int(repo_issue.number), +                    repo_issue.repository, +                    repo_issue.organisation or "python-discord" +                ) +                if isinstance(result, IssueState): +                    links.append(result)          if not links:              return -        resp = self.get_embed(links, "python-discord") +        resp = self.format_embed(links, "python-discord")          await message.channel.send(embed=resp) -def setup(bot: commands.Bot) -> None: -    """Cog Retrieves Issues From Github.""" +def setup(bot: Bot) -> None: +    """Load the Issues cog."""      bot.add_cog(Issues(bot)) diff --git a/bot/exts/evergreen/latex.py b/bot/exts/evergreen/latex.py new file mode 100644 index 00000000..36c7e0ab --- /dev/null +++ b/bot/exts/evergreen/latex.py @@ -0,0 +1,101 @@ +import asyncio +import hashlib +import pathlib +import re +from concurrent.futures import ThreadPoolExecutor +from io import BytesIO + +import discord +import matplotlib.pyplot as plt +from discord.ext import commands + +from bot.bot import Bot + +# configure fonts and colors for matplotlib +plt.rcParams.update( +    { +        "font.size": 16, +        "mathtext.fontset": "cm",  # Computer Modern font set +        "mathtext.rm": "serif", +        "figure.facecolor": "36393F",  # matches Discord's dark mode background color +        "text.color": "white", +    } +) + +FORMATTED_CODE_REGEX = re.compile( +    r"(?P<delim>(?P<block>```)|``?)"        # code delimiter: 1-3 backticks; (?P=block) only matches if it's a block +    r"(?(block)(?:(?P<lang>[a-z]+)\n)?)"    # if we're in a block, match optional language (only letters plus newline) +    r"(?:[ \t]*\n)*"                        # any blank (empty or tabs/spaces only) lines before the code +    r"(?P<code>.*?)"                        # extract all code inside the markup +    r"\s*"                                  # any more whitespace before the end of the code markup +    r"(?P=delim)",                          # match the exact same delimiter from the start again +    re.DOTALL | re.IGNORECASE,              # "." also matches newlines, case insensitive +) + +CACHE_DIRECTORY = pathlib.Path("_latex_cache") +CACHE_DIRECTORY.mkdir(exist_ok=True) + + +class Latex(commands.Cog): +    """Renders latex.""" + +    @staticmethod +    def _render(text: str, filepath: pathlib.Path) -> BytesIO: +        """ +        Return the rendered image if latex compiles without errors, otherwise raise a BadArgument Exception. + +        Saves rendered image to cache. +        """ +        fig = plt.figure() +        rendered_image = BytesIO() +        fig.text(0, 1, text, horizontalalignment="left", verticalalignment="top") + +        try: +            plt.savefig(rendered_image, bbox_inches="tight", dpi=600) +        except ValueError as e: +            raise commands.BadArgument(str(e)) + +        rendered_image.seek(0) + +        with open(filepath, "wb") as f: +            f.write(rendered_image.getbuffer()) + +        return rendered_image + +    @staticmethod +    def _prepare_input(text: str) -> str: +        text = text.replace(r"\\", "$\n$")  # matplotlib uses \n for newlines, not \\ + +        if match := FORMATTED_CODE_REGEX.match(text): +            return match.group("code") +        else: +            return text + +    @commands.command() +    @commands.max_concurrency(1, commands.BucketType.guild, wait=True) +    async def latex(self, ctx: commands.Context, *, text: str) -> None: +        """Renders the text in latex and sends the image.""" +        text = self._prepare_input(text) +        query_hash = hashlib.md5(text.encode()).hexdigest() +        image_path = CACHE_DIRECTORY.joinpath(f"{query_hash}.png") +        async with ctx.typing(): +            if image_path.exists(): +                await ctx.send(file=discord.File(image_path)) +                return + +            with ThreadPoolExecutor() as pool: +                image = await asyncio.get_running_loop().run_in_executor( +                    pool, self._render, text, image_path +                ) + +            await ctx.send(file=discord.File(image, "latex.png")) + + +def setup(bot: Bot) -> None: +    """Load the Latex Cog.""" +    # As we have resource issues on this cog, +    # we have it currently disabled while we fix it. +    import logging +    logging.info("Latex cog is currently disabled. It won't be loaded.") +    return +    bot.add_cog(Latex()) diff --git a/bot/exts/evergreen/magic_8ball.py b/bot/exts/evergreen/magic_8ball.py index f974e487..28ddcea0 100644 --- a/bot/exts/evergreen/magic_8ball.py +++ b/bot/exts/evergreen/magic_8ball.py @@ -5,27 +5,26 @@ from pathlib import Path  from discord.ext import commands +from bot.bot import Bot +  log = logging.getLogger(__name__) +ANSWERS = json.loads(Path("bot/resources/evergreen/magic8ball.json").read_text("utf8")) +  class Magic8ball(commands.Cog):      """A Magic 8ball command to respond to a user's question.""" -    def __init__(self, bot: commands.Bot): -        self.bot = bot -        with open(Path("bot/resources/evergreen/magic8ball.json"), "r", encoding="utf8") as file: -            self.answers = json.load(file) -      @commands.command(name="8ball")      async def output_answer(self, ctx: commands.Context, *, question: str) -> None:          """Return a Magic 8ball answer from answers list."""          if len(question.split()) >= 3: -            answer = random.choice(self.answers) +            answer = random.choice(ANSWERS)              await ctx.send(answer)          else:              await ctx.send("Usage: .8ball <question> (minimum length of 3 eg: `will I win?`)") -def setup(bot: commands.Bot) -> None: -    """Magic 8ball Cog load.""" -    bot.add_cog(Magic8ball(bot)) +def setup(bot: Bot) -> None: +    """Load the Magic8Ball Cog.""" +    bot.add_cog(Magic8ball()) diff --git a/bot/exts/evergreen/minesweeper.py b/bot/exts/evergreen/minesweeper.py index 286ac7a5..932358f9 100644 --- a/bot/exts/evergreen/minesweeper.py +++ b/bot/exts/evergreen/minesweeper.py @@ -6,8 +6,11 @@ from random import randint, random  import discord  from discord.ext import commands +from bot.bot import Bot  from bot.constants import Client +from bot.utils.converters import CoordinateConverter  from bot.utils.exceptions import UserNotPlayingError +from bot.utils.extensions import invoke_help_command  MESSAGE_MAPPING = {      0: ":stop_button:", @@ -30,33 +33,6 @@ MESSAGE_MAPPING = {  log = logging.getLogger(__name__) -class CoordinateConverter(commands.Converter): -    """Converter for Coordinates.""" - -    async def convert(self, ctx: commands.Context, coordinate: str) -> typing.Tuple[int, int]: -        """Take in a coordinate string and turn it into an (x, y) tuple.""" -        if not 2 <= len(coordinate) <= 3: -            raise commands.BadArgument('Invalid co-ordinate provided') - -        coordinate = coordinate.lower() -        if coordinate[0].isalpha(): -            digit = coordinate[1:] -            letter = coordinate[0] -        else: -            digit = coordinate[:-1] -            letter = coordinate[-1] - -        if not digit.isdigit(): -            raise commands.BadArgument - -        x = ord(letter) - ord('a') -        y = int(digit) - 1 - -        if (not 0 <= x <= 9) or (not 0 <= y <= 9): -            raise commands.BadArgument -        return x, y - -  GameBoard = typing.List[typing.List[typing.Union[str, int]]] @@ -77,13 +53,13 @@ GamesDict = typing.Dict[int, Game]  class Minesweeper(commands.Cog):      """Play a game of Minesweeper.""" -    def __init__(self, bot: commands.Bot) -> None: +    def __init__(self) -> None:          self.games: GamesDict = {}  # Store the currently running games -    @commands.group(name='minesweeper', aliases=('ms',), invoke_without_command=True) +    @commands.group(name="minesweeper", aliases=("ms",), invoke_without_command=True)      async def minesweeper_group(self, ctx: commands.Context) -> None:          """Commands for Playing Minesweeper.""" -        await ctx.send_help(ctx.command) +        await invoke_help_command(ctx)      @staticmethod      def get_neighbours(x: int, y: int) -> typing.Generator[typing.Tuple[int, int], None, None]: @@ -147,7 +123,7 @@ class Minesweeper(commands.Cog):                  f"Close the game with `{Client.prefix}ms end`\n"              )          except discord.errors.Forbidden: -            log.debug(f"{ctx.author.name} ({ctx.author.id}) has disabled DMs from server members") +            log.debug(f"{ctx.author.name} ({ctx.author.id}) has disabled DMs from server members.")              await ctx.send(f":x: {ctx.author.mention}, please enable DMs to play minesweeper.")              return @@ -157,7 +133,7 @@ class Minesweeper(commands.Cog):          dm_msg = await ctx.author.send(f"Here's your board!\n{self.format_for_discord(revealed_board)}")          if ctx.guild: -            await ctx.send(f"{ctx.author.mention} is playing Minesweeper") +            await ctx.send(f"{ctx.author.mention} is playing Minesweeper.")              chat_msg = await ctx.send(f"Here's their board!\n{self.format_for_discord(revealed_board)}")          else:              chat_msg = None @@ -236,17 +212,17 @@ class Minesweeper(commands.Cog):              return True      async def reveal_one( -            self, -            ctx: commands.Context, -            revealed: GameBoard, -            board: GameBoard, -            x: int, -            y: int +        self, +        ctx: commands.Context, +        revealed: GameBoard, +        board: GameBoard, +        x: int, +        y: int      ) -> bool:          """          Reveal one square. -        return is True if the game ended, breaking the loop in `reveal_command` and deleting the game +        return is True if the game ended, breaking the loop in `reveal_command` and deleting the game.          """          revealed[y][x] = board[y][x]          if board[y][x] == "bomb": @@ -284,13 +260,13 @@ class Minesweeper(commands.Cog):          game = self.games[ctx.author.id]          game.revealed = game.board          await self.update_boards(ctx) -        new_msg = f":no_entry: Game canceled :no_entry:\n{game.dm_msg.content}" +        new_msg = f":no_entry: Game canceled. :no_entry:\n{game.dm_msg.content}"          await game.dm_msg.edit(content=new_msg)          if game.activated_on_server:              await game.chat_msg.edit(content=new_msg)          del self.games[ctx.author.id] -def setup(bot: commands.Bot) -> None: +def setup(bot: Bot) -> None:      """Load the Minesweeper cog.""" -    bot.add_cog(Minesweeper(bot)) +    bot.add_cog(Minesweeper()) diff --git a/bot/exts/evergreen/movie.py b/bot/exts/evergreen/movie.py index 340a5724..c6af4bcd 100644 --- a/bot/exts/evergreen/movie.py +++ b/bot/exts/evergreen/movie.py @@ -2,13 +2,14 @@ import logging  import random  from enum import Enum  from typing import Any, Dict, List, Tuple -from urllib.parse import urlencode  from aiohttp import ClientSession  from discord import Embed -from discord.ext.commands import Bot, Cog, Context, group +from discord.ext.commands import Cog, Context, group +from bot.bot import Bot  from bot.constants import Tokens +from bot.utils.extensions import invoke_help_command  from bot.utils.pagination import ImagePaginator  # Define base URL of TMDB @@ -49,10 +50,9 @@ class Movie(Cog):      """Movie Cog contains movies command that grab random movies from TMDB."""      def __init__(self, bot: Bot): -        self.bot = bot          self.http_session: ClientSession = bot.http_session -    @group(name='movies', aliases=['movie'], invoke_without_command=True) +    @group(name="movies", aliases=("movie",), invoke_without_command=True)      async def movies(self, ctx: Context, genre: str = "", amount: int = 5) -> None:          """          Get random movies by specifying genre. Also support amount parameter, that define how much movies will be shown. @@ -71,15 +71,17 @@ class Movie(Cog):          # Capitalize genre for getting data from Enum, get random page, send help when genre don't exist.          genre = genre.capitalize()          try: -            result = await self.get_movies_list(self.http_session, MovieGenres[genre].value, 1) +            result = await self.get_movies_data(self.http_session, MovieGenres[genre].value, 1)          except KeyError: -            await ctx.send_help('movies') +            await invoke_help_command(ctx)              return          # Check if "results" is in result. If not, throw error. -        if "results" not in result.keys(): -            err_msg = f"There is problem while making TMDB API request. Response Code: {result['status_code']}, " \ -                      f"{result['status_message']}." +        if "results" not in result: +            err_msg = ( +                f"There is problem while making TMDB API request. Response Code: {result['status_code']}, " +                f"{result['status_message']}." +            )              await ctx.send(err_msg)              logger.warning(err_msg) @@ -87,8 +89,8 @@ class Movie(Cog):          page = random.randint(1, result["total_pages"])          # Get movies list from TMDB, check if results key in result. When not, raise error. -        movies = await self.get_movies_list(self.http_session, MovieGenres[genre].value, page) -        if 'results' not in movies.keys(): +        movies = await self.get_movies_data(self.http_session, MovieGenres[genre].value, page) +        if "results" not in movies:              err_msg = f"There is problem while making TMDB API request. Response Code: {result['status_code']}, " \                        f"{result['status_message']}."              await ctx.send(err_msg) @@ -100,12 +102,12 @@ class Movie(Cog):          await ImagePaginator.paginate(pages, ctx, embed) -    @movies.command(name='genres', aliases=['genre', 'g']) +    @movies.command(name="genres", aliases=("genre", "g"))      async def genres(self, ctx: Context) -> None:          """Show all currently available genres for .movies command."""          await ctx.send(f"Current available genres: {', '.join('`' + genre.name + '`' for genre in MovieGenres)}") -    async def get_movies_list(self, client: ClientSession, genre_id: str, page: int) -> Dict[str, Any]: +    async def get_movies_data(self, client: ClientSession, genre_id: str, page: int) -> List[Dict[str, Any]]:          """Return JSON of TMDB discover request."""          # Define params of request          params = { @@ -118,10 +120,10 @@ class Movie(Cog):              "with_genres": genre_id          } -        url = BASE_URL + "discover/movie?" + urlencode(params) +        url = BASE_URL + "discover/movie"          # Make discover request to TMDB, return result -        async with client.get(url) as resp: +        async with client.get(url, params=params) as resp:              return await resp.json()      async def get_pages(self, client: ClientSession, movies: Dict[str, Any], amount: int) -> List[Tuple[str, str]]: @@ -129,7 +131,7 @@ class Movie(Cog):          pages = []          for i in range(amount): -            movie_id = movies['results'][i]['id'] +            movie_id = movies["results"][i]["id"]              movie = await self.get_movie(client, movie_id)              page, img = await self.create_page(movie) @@ -139,9 +141,11 @@ class Movie(Cog):      async def get_movie(self, client: ClientSession, movie: int) -> Dict:          """Get Movie by movie ID from TMDB. Return result dictionary.""" -        url = BASE_URL + f"movie/{movie}?" + urlencode(MOVIE_PARAMS) +        if not isinstance(movie, int): +            raise ValueError("Error while fetching movie from TMDB, movie argument must be integer. ") +        url = BASE_URL + f"movie/{movie}" -        async with client.get(url) as resp: +        async with client.get(url, params=MOVIE_PARAMS) as resp:              return await resp.json()      async def create_page(self, movie: Dict[str, Any]) -> Tuple[str, str]: @@ -150,7 +154,7 @@ class Movie(Cog):          # Add title + tagline (if not empty)          text += f"**{movie['title']}**\n" -        if movie['tagline']: +        if movie["tagline"]:              text += f"{movie['tagline']}\n\n"          else:              text += "\n" @@ -161,8 +165,8 @@ class Movie(Cog):          text += "__**Production Information**__\n" -        companies = movie['production_companies'] -        countries = movie['production_countries'] +        companies = movie["production_companies"] +        countries = movie["production_countries"]          text += f"**Made by:** {', '.join(company['name'] for company in companies)}\n"          text += f"**Made in:** {', '.join(country['name'] for country in countries)}\n\n" @@ -172,8 +176,8 @@ class Movie(Cog):          budget = f"{movie['budget']:,d}" if movie['budget'] else "?"          revenue = f"{movie['revenue']:,d}" if movie['revenue'] else "?" -        if movie['runtime'] is not None: -            duration = divmod(movie['runtime'], 60) +        if movie["runtime"] is not None: +            duration = divmod(movie["runtime"], 60)          else:              duration = ("?", "?") @@ -181,7 +185,7 @@ class Movie(Cog):          text += f"**Revenue:** ${revenue}\n"          text += f"**Duration:** {f'{duration[0]} hour(s) {duration[1]} minute(s)'}\n\n" -        text += movie['overview'] +        text += movie["overview"]          img = f"http://image.tmdb.org/t/p/w200{movie['poster_path']}" @@ -197,5 +201,5 @@ class Movie(Cog):  def setup(bot: Bot) -> None: -    """Load Movie Cog.""" +    """Load the Movie Cog."""      bot.add_cog(Movie(bot)) diff --git a/bot/exts/evergreen/ping.py b/bot/exts/evergreen/ping.py new file mode 100644 index 00000000..6be78117 --- /dev/null +++ b/bot/exts/evergreen/ping.py @@ -0,0 +1,45 @@ +import arrow +from dateutil.relativedelta import relativedelta +from discord import Embed +from discord.ext import commands + +from bot import start_time +from bot.bot import Bot +from bot.constants import Colours + + +class Ping(commands.Cog): +    """Get info about the bot's ping and uptime.""" + +    def __init__(self, bot: Bot): +        self.bot = bot + +    @commands.command(name="ping") +    async def ping(self, ctx: commands.Context) -> None: +        """Ping the bot to see its latency and state.""" +        embed = Embed( +            title=":ping_pong: Pong!", +            colour=Colours.bright_green, +            description=f"Gateway Latency: {round(self.bot.latency * 1000)}ms", +        ) + +        await ctx.send(embed=embed) + +    # Originally made in 70d2170a0a6594561d59c7d080c4280f1ebcd70b by lemon & gdude2002 +    @commands.command(name="uptime") +    async def uptime(self, ctx: commands.Context) -> None: +        """Get the current uptime of the bot.""" +        difference = relativedelta(start_time - arrow.utcnow()) +        uptime_string = start_time.shift( +            seconds=-difference.seconds, +            minutes=-difference.minutes, +            hours=-difference.hours, +            days=-difference.days +        ).humanize() + +        await ctx.send(f"I started up {uptime_string}.") + + +def setup(bot: Bot) -> None: +    """Load the Ping cog.""" +    bot.add_cog(Ping(bot)) diff --git a/bot/exts/evergreen/pythonfacts.py b/bot/exts/evergreen/pythonfacts.py new file mode 100644 index 00000000..80a8da5d --- /dev/null +++ b/bot/exts/evergreen/pythonfacts.py @@ -0,0 +1,36 @@ +import itertools + +import discord +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Colours + +with open("bot/resources/evergreen/python_facts.txt") as file: +    FACTS = itertools.cycle(list(file)) + +COLORS = itertools.cycle([Colours.python_blue, Colours.python_yellow]) +PYFACTS_DISCUSSION = "https://github.com/python-discord/meta/discussions/93" + + +class PythonFacts(commands.Cog): +    """Sends a random fun fact about Python.""" + +    @commands.command(name="pythonfact", aliases=("pyfact",)) +    async def get_python_fact(self, ctx: commands.Context) -> None: +        """Sends a Random fun fact about Python.""" +        embed = discord.Embed( +            title="Python Facts", +            description=next(FACTS), +            colour=next(COLORS) +        ) +        embed.add_field( +            name="Suggestions", +            value=f"Suggest more facts [here!]({PYFACTS_DISCUSSION})" +        ) +        await ctx.send(embed=embed) + + +def setup(bot: Bot) -> None: +    """Load the PythonFacts Cog.""" +    bot.add_cog(PythonFacts()) diff --git a/bot/exts/evergreen/realpython.py b/bot/exts/evergreen/realpython.py new file mode 100644 index 00000000..5d9e5c5c --- /dev/null +++ b/bot/exts/evergreen/realpython.py @@ -0,0 +1,81 @@ +import logging +from html import unescape +from urllib.parse import quote_plus + +from discord import Embed +from discord.ext import commands + +from bot import bot +from bot.constants import Colours + +logger = logging.getLogger(__name__) + + +API_ROOT = "https://realpython.com/search/api/v1/" +ARTICLE_URL = "https://realpython.com{article_url}" +SEARCH_URL = "https://realpython.com/search?q={user_search}" + + +ERROR_EMBED = Embed( +    title="Error while searching Real Python", +    description="There was an error while trying to reach Real Python. Please try again shortly.", +    color=Colours.soft_red, +) + + +class RealPython(commands.Cog): +    """User initiated command to search for a Real Python article.""" + +    def __init__(self, bot: bot.Bot): +        self.bot = bot + +    @commands.command(aliases=["rp"]) +    @commands.cooldown(1, 10, commands.cooldowns.BucketType.user) +    async def realpython(self, ctx: commands.Context, *, user_search: str) -> None: +        """Send 5 articles that match the user's search terms.""" +        params = {"q": user_search, "limit": 5, "kind": "article"} +        async with self.bot.http_session.get(url=API_ROOT, params=params) as response: +            if response.status != 200: +                logger.error( +                    f"Unexpected status code {response.status} from Real Python" +                ) +                await ctx.send(embed=ERROR_EMBED) +                return + +            data = await response.json() + +        articles = data["results"] + +        if len(articles) == 0: +            no_articles = Embed( +                title=f"No articles found for '{user_search}'", color=Colours.soft_red +            ) +            await ctx.send(embed=no_articles) +            return + +        if len(articles) == 1: +            article_description = "Here is the result:" +        else: +            article_description = f"Here are the top {len(articles)} results:" + +        article_embed = Embed( +            title="Search results - Real Python", +            url=SEARCH_URL.format(user_search=quote_plus(user_search)), +            description=article_description, +            color=Colours.orange, +        ) + +        for article in articles: +            article_embed.add_field( +                name=unescape(article["title"]), +                value=ARTICLE_URL.format(article_url=article["url"]), +                inline=False, +            ) +        article_embed.set_footer(text="Click the links to go to the articles.") + +        await ctx.send(embed=article_embed) + + +def setup(bot: bot.Bot) -> None: +    """Load the Real Python Cog.""" +    bot.add_cog(RealPython(bot)) diff --git a/bot/exts/evergreen/recommend_game.py b/bot/exts/evergreen/recommend_game.py index 5e262a5b..35d60128 100644 --- a/bot/exts/evergreen/recommend_game.py +++ b/bot/exts/evergreen/recommend_game.py @@ -6,13 +6,14 @@ from random import shuffle  import discord  from discord.ext import commands +from bot.bot import Bot +  log = logging.getLogger(__name__)  game_recs = []  # Populate the list `game_recs` with resource files  for rec_path in Path("bot/resources/evergreen/game_recs").glob("*.json"): -    with rec_path.open(encoding='utf8') as file: -        data = json.load(file) +    data = json.loads(rec_path.read_text("utf8"))      game_recs.append(data)  shuffle(game_recs) @@ -20,11 +21,11 @@ shuffle(game_recs)  class RecommendGame(commands.Cog):      """Commands related to recommending games.""" -    def __init__(self, bot: commands.Bot) -> None: +    def __init__(self, bot: Bot) -> None:          self.bot = bot          self.index = 0 -    @commands.command(name="recommendgame", aliases=['gamerec']) +    @commands.command(name="recommendgame", aliases=("gamerec",))      async def recommend_game(self, ctx: commands.Context) -> None:          """Sends an Embed of a random game recommendation."""          if self.index >= len(game_recs): @@ -33,18 +34,18 @@ class RecommendGame(commands.Cog):          game = game_recs[self.index]          self.index += 1 -        author = self.bot.get_user(int(game['author'])) +        author = self.bot.get_user(int(game["author"]))          # Creating and formatting Embed          embed = discord.Embed(color=discord.Colour.blue())          if author is not None:              embed.set_author(name=author.name, icon_url=author.avatar_url) -        embed.set_image(url=game['image']) -        embed.add_field(name='Recommendation: ' + game['title'] + '\n' + game['link'], value=game['description']) +        embed.set_image(url=game["image"]) +        embed.add_field(name=f"Recommendation: {game['title']}\n{game['link']}", value=game["description"])          await ctx.send(embed=embed) -def setup(bot: commands.Bot) -> None: +def setup(bot: Bot) -> None:      """Loads the RecommendGame cog."""      bot.add_cog(RecommendGame(bot)) diff --git a/bot/exts/evergreen/reddit.py b/bot/exts/evergreen/reddit.py index 49127bea..4df170c6 100644 --- a/bot/exts/evergreen/reddit.py +++ b/bot/exts/evergreen/reddit.py @@ -1,128 +1,368 @@ +import asyncio  import logging  import random +import textwrap +from collections import namedtuple +from datetime import datetime, timedelta +from typing import List, Union -import discord -from discord.ext import commands -from discord.ext.commands.cooldowns import BucketType +from aiohttp import BasicAuth, ClientError +from discord import Colour, Embed, TextChannel +from discord.ext.commands import Cog, Context, group, has_any_role +from discord.ext.tasks import loop +from discord.utils import escape_markdown, sleep_until -from bot.utils.pagination import ImagePaginator +from bot.bot import Bot +from bot.constants import Channels, ERROR_REPLIES, Emojis, Reddit as RedditConfig, STAFF_ROLES +from bot.utils.converters import Subreddit +from bot.utils.extensions import invoke_help_command +from bot.utils.messages import sub_clyde +from bot.utils.pagination import ImagePaginator, LinePaginator  log = logging.getLogger(__name__) +AccessToken = namedtuple("AccessToken", ["token", "expires_at"]) -class Reddit(commands.Cog): -    """Fetches reddit posts.""" -    def __init__(self, bot: commands.Bot): +class Reddit(Cog): +    """Track subreddit posts and show detailed statistics about them.""" + +    HEADERS = {"User-Agent": "python3:python-discord/bot:1.0.0 (by /u/PythonDiscord)"} +    URL = "https://www.reddit.com" +    OAUTH_URL = "https://oauth.reddit.com" +    MAX_RETRIES = 3 + +    def __init__(self, bot: Bot):          self.bot = bot -    async def fetch(self, url: str) -> dict: -        """Send a get request to the reddit API and get json response.""" -        session = self.bot.http_session -        params = { -            'limit': 50 -        } -        headers = { -            'User-Agent': 'Iceman' -        } - -        async with session.get(url=url, params=params, headers=headers) as response: -            return await response.json() - -    @commands.command(name='reddit') -    @commands.cooldown(1, 10, BucketType.user) -    async def get_reddit(self, ctx: commands.Context, subreddit: str = 'python', sort: str = "hot") -> None: -        """ -        Fetch reddit posts by using this command. +        self.webhook = None +        self.access_token = None +        self.client_auth = BasicAuth(RedditConfig.client_id, RedditConfig.secret) -        Gets a post from r/python by default. -        Usage: -        --> .reddit [subreddit_name] [hot/top/new] -        """ +        bot.loop.create_task(self.init_reddit_ready()) +        self.auto_poster_loop.start() + +    def cog_unload(self) -> None: +        """Stop the loop task and revoke the access token when the cog is unloaded.""" +        self.auto_poster_loop.cancel() +        if self.access_token and self.access_token.expires_at > datetime.utcnow(): +            asyncio.create_task(self.revoke_access_token()) + +    async def init_reddit_ready(self) -> None: +        """Sets the reddit webhook when the cog is loaded.""" +        await self.bot.wait_until_guild_available() +        if not self.webhook: +            self.webhook = await self.bot.fetch_webhook(RedditConfig.webhook) + +    @property +    def channel(self) -> TextChannel: +        """Get the #reddit channel object from the bot's cache.""" +        return self.bot.get_channel(Channels.reddit) + +    def build_pagination_pages(self, posts: List[dict], paginate: bool) -> Union[List[tuple], str]: +        """Build embed pages required for Paginator."""          pages = [] -        sort_list = ["hot", "new", "top", "rising"] -        if sort.lower() not in sort_list: -            await ctx.send(f"Invalid sorting: {sort}\nUsing default sorting: `Hot`") -            sort = "hot" +        first_page = "" +        for post in posts: +            post_page = "" +            image_url = "" -        data = await self.fetch(f'https://www.reddit.com/r/{subreddit}/{sort}/.json') +            data = post["data"] -        try: -            posts = data["data"]["children"] -        except KeyError: -            return await ctx.send('Subreddit not found!') -        if not posts: -            return await ctx.send('No posts available!') +            title = textwrap.shorten(data["title"], width=50, placeholder="...") + +            # Normal brackets interfere with Markdown. +            title = escape_markdown(title).replace("[", "⦋").replace("]", "⦌") +            link = self.URL + data["permalink"] + +            first_page += f"**[{title.replace('*', '')}]({link})**\n" + +            text = data["selftext"] +            if text: +                text = escape_markdown(text).replace("[", "⦋").replace("]", "⦌") +                first_page += textwrap.shorten(text, width=100, placeholder="...") + "\n" + +            ups = data["ups"] +            comments = data["num_comments"] +            author = data["author"] + +            content_type = Emojis.reddit_post_text +            if data["is_video"] or {"youtube", "youtu.be"}.issubset(set(data["url"].split("."))): +                # This means the content type in the post is a video. +                content_type = f"{Emojis.reddit_post_video}" + +            elif data["url"].endswith(("jpg", "png", "gif")): +                # This means the content type in the post is an image. +                content_type = f"{Emojis.reddit_post_photo}" +                image_url = data["url"] -        if posts[1]["data"]["over_18"] is True: -            return await ctx.send( -                "You cannot access this Subreddit as it is ment for those who " -                "are 18 years or older." +            first_page += ( +                f"{content_type}\u2003{Emojis.reddit_upvote}{ups}\u2003{Emojis.reddit_comments}" +                f"\u2002{comments}\u2003{Emojis.reddit_users}{author}\n\n"              ) -        embed_titles = "" +            if paginate: +                post_page += f"**[{title}]({link})**\n\n" +                if text: +                    post_page += textwrap.shorten(text, width=252, placeholder="...") + "\n\n" +                post_page += ( +                    f"{content_type}\u2003{Emojis.reddit_upvote}{ups}\u2003{Emojis.reddit_comments}\u2002" +                    f"{comments}\u2003{Emojis.reddit_users}{author}" +                ) -        # Chooses k unique random elements from a population sequence or set. -        random_posts = random.sample(posts, k=5) +                pages.append((post_page, image_url)) -        # ----------------------------------------------------------- -        # This code below is bound of change when the emojis are added. +        if not paginate: +            # Return the first summery page if pagination is not required +            return first_page -        upvote_emoji = self.bot.get_emoji(755845219890757644) -        comment_emoji = self.bot.get_emoji(755845255001014384) -        user_emoji = self.bot.get_emoji(755845303822974997) -        text_emoji = self.bot.get_emoji(676030265910493204) -        video_emoji = self.bot.get_emoji(676030265839190047) -        image_emoji = self.bot.get_emoji(676030265734201344) -        reddit_emoji = self.bot.get_emoji(676030265734332427) +        pages.insert(0, (first_page, ""))  # Using image paginator, hence settings image url to empty string +        return pages -        # ------------------------------------------------------------ +    async def get_access_token(self) -> None: +        """ +        Get a Reddit API OAuth2 access token and assign it to self.access_token. -        for i, post in enumerate(random_posts, start=1): -            post_title = post["data"]["title"][0:50] -            post_url = post['data']['url'] -            if post_title == "": -                post_title = "No Title." -            elif post_title == post_url: -                post_title = "Title is itself a link." +        A token is valid for 1 hour. There will be MAX_RETRIES to get a token, after which the cog +        will be unloaded and a ClientError raised if retrieval was still unsuccessful. +        """ +        for i in range(1, self.MAX_RETRIES + 1): +            response = await self.bot.http_session.post( +                url=f"{self.URL}/api/v1/access_token", +                headers=self.HEADERS, +                auth=self.client_auth, +                data={ +                    "grant_type": "client_credentials", +                    "duration": "temporary" +                } +            ) -            # ------------------------------------------------------------------ -            # Embed building. +            if response.status == 200 and response.content_type == "application/json": +                content = await response.json() +                expiration = int(content["expires_in"]) - 60  # Subtract 1 minute for leeway. +                self.access_token = AccessToken( +                    token=content["access_token"], +                    expires_at=datetime.utcnow() + timedelta(seconds=expiration) +                ) -            embed_titles += f"**{i}.[{post_title}]({post_url})**\n" -            image_url = " " -            post_stats = f"{text_emoji}"  # Set default content type to text. +                log.debug(f"New token acquired; expires on UTC {self.access_token.expires_at}") +                return +            else: +                log.debug( +                    f"Failed to get an access token: " +                    f"status {response.status} & content type {response.content_type}; " +                    f"retrying ({i}/{self.MAX_RETRIES})" +                ) -            if post["data"]["is_video"] is True or "youtube" in post_url.split("."): -                # This means the content type in the post is a video. -                post_stats = f"{video_emoji} " +            await asyncio.sleep(3) -            elif post_url.endswith("jpg") or post_url.endswith("png") or post_url.endswith("gif"): -                # This means the content type in the post is an image. -                post_stats = f"{image_emoji} " -                image_url = post_url - -            votes = f'{upvote_emoji}{post["data"]["ups"]}' -            comments = f'{comment_emoji}\u2002{ post["data"]["num_comments"]}' -            post_stats += ( -                f"\u2002{votes}\u2003" -                f"{comments}" -                f'\u2003{user_emoji}\u2002{post["data"]["author"]}\n' +        self.bot.remove_cog(self.qualified_name) +        raise ClientError("Authentication with the Reddit API failed. Unloading the cog.") + +    async def revoke_access_token(self) -> None: +        """ +        Revoke the OAuth2 access token for the Reddit API. + +        For security reasons, it's good practice to revoke the token when it's no longer being used. +        """ +        response = await self.bot.http_session.post( +            url=f"{self.URL}/api/v1/revoke_token", +            headers=self.HEADERS, +            auth=self.client_auth, +            data={ +                "token": self.access_token.token, +                "token_type_hint": "access_token" +            } +        ) + +        if response.status in [200, 204] and response.content_type == "application/json": +            self.access_token = None +        else: +            log.warning(f"Unable to revoke access token: status {response.status}.") + +    async def fetch_posts(self, route: str, *, amount: int = 25, params: dict = None) -> List[dict]: +        """A helper method to fetch a certain amount of Reddit posts at a given route.""" +        # Reddit's JSON responses only provide 25 posts at most. +        if not 25 >= amount > 0: +            raise ValueError("Invalid amount of subreddit posts requested.") + +        # Renew the token if necessary. +        if not self.access_token or self.access_token.expires_at < datetime.utcnow(): +            await self.get_access_token() + +        url = f"{self.OAUTH_URL}/{route}" +        for _ in range(self.MAX_RETRIES): +            response = await self.bot.http_session.get( +                url=url, +                headers={**self.HEADERS, "Authorization": f"bearer {self.access_token.token}"}, +                params=params +            ) +            if response.status == 200 and response.content_type == 'application/json': +                # Got appropriate response - process and return. +                content = await response.json() +                posts = content["data"]["children"] + +                filtered_posts = [post for post in posts if not post["data"]["over_18"]] + +                return filtered_posts[:amount] + +            await asyncio.sleep(3) + +        log.debug(f"Invalid response from: {url} - status code {response.status}, mimetype {response.content_type}") +        return list()  # Failed to get appropriate response within allowed number of retries. + +    async def get_top_posts( +            self, subreddit: Subreddit, time: str = "all", amount: int = 5, paginate: bool = False +    ) -> Union[Embed, List[tuple]]: +        """ +        Get the top amount of posts for a given subreddit within a specified timeframe. + +        A time of "all" will get posts from all time, "day" will get top daily posts and "week" will get the top +        weekly posts. + +        The amount should be between 0 and 25 as Reddit's JSON requests only provide 25 posts at most. +        """ +        embed = Embed() + +        posts = await self.fetch_posts( +            route=f"{subreddit}/top", +            amount=amount, +            params={"t": time} +        ) +        if not posts: +            embed.title = random.choice(ERROR_REPLIES) +            embed.colour = Colour.red() +            embed.description = ( +                "Sorry! We couldn't find any SFW posts from that subreddit. " +                "If this problem persists, please let us know."              ) -            embed_titles += f"{post_stats}\n" -            page_text = f"**[{post_title}]({post_url})**\n{post_stats}\n{post['data']['selftext'][0:200]}" -            embed = discord.Embed() -            page_tuple = (page_text, image_url) -            pages.append(page_tuple) +            return embed + +        if paginate: +            return self.build_pagination_pages(posts, paginate=True) + +        # Use only starting summary page for #reddit channel posts. +        embed.description = self.build_pagination_pages(posts, paginate=False) +        embed.colour = Colour.blurple() +        return embed + +    @loop() +    async def auto_poster_loop(self) -> None: +        """Post the top 5 posts daily, and the top 5 posts weekly.""" +        # once d.py get support for `time` parameter in loop decorator, +        # this can be removed and the loop can use the `time=datetime.time.min` parameter +        now = datetime.utcnow() +        tomorrow = now + timedelta(days=1) +        midnight_tomorrow = tomorrow.replace(hour=0, minute=0, second=0) + +        await sleep_until(midnight_tomorrow) + +        await self.bot.wait_until_guild_available() +        if not self.webhook: +            await self.bot.fetch_webhook(RedditConfig.webhook) + +        if datetime.utcnow().weekday() == 0: +            await self.top_weekly_posts() +            # if it's a monday send the top weekly posts + +        for subreddit in RedditConfig.subreddits: +            top_posts = await self.get_top_posts(subreddit=subreddit, time="day") +            username = sub_clyde(f"{subreddit} Top Daily Posts") +            message = await self.webhook.send(username=username, embed=top_posts, wait=True) + +            if message.channel.is_news(): +                await message.publish() + +    async def top_weekly_posts(self) -> None: +        """Post a summary of the top posts.""" +        for subreddit in RedditConfig.subreddits: +            # Send and pin the new weekly posts. +            top_posts = await self.get_top_posts(subreddit=subreddit, time="week") +            username = sub_clyde(f"{subreddit} Top Weekly Posts") +            message = await self.webhook.send(wait=True, username=username, embed=top_posts) -            # ------------------------------------------------------------------ +            if subreddit.lower() == "r/python": +                if not self.channel: +                    log.warning("Failed to get #reddit channel to remove pins in the weekly loop.") +                    return + +                # Remove the oldest pins so that only 12 remain at most. +                pins = await self.channel.pins() + +                while len(pins) >= 12: +                    await pins[-1].unpin() +                    del pins[-1] + +                await message.pin() + +                if message.channel.is_news(): +                    await message.publish() + +    @group(name="reddit", invoke_without_command=True) +    async def reddit_group(self, ctx: Context) -> None: +        """View the top posts from various subreddits.""" +        await invoke_help_command(ctx) + +    @reddit_group.command(name="top") +    async def top_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None: +        """Send the top posts of all time from a given subreddit.""" +        async with ctx.typing(): +            pages = await self.get_top_posts(subreddit=subreddit, time="all", paginate=True) + +        await ctx.send(f"Here are the top {subreddit} posts of all time!") +        embed = Embed( +            color=Colour.blurple() +        ) -        pages.insert(0, (embed_titles, " ")) -        embed.set_author(name=f"r/{posts[0]['data']['subreddit']} - {sort}", icon_url=reddit_emoji.url)          await ImagePaginator.paginate(pages, ctx, embed) +    @reddit_group.command(name="daily") +    async def daily_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None: +        """Send the top posts of today from a given subreddit.""" +        async with ctx.typing(): +            pages = await self.get_top_posts(subreddit=subreddit, time="day", paginate=True) + +        await ctx.send(f"Here are today's top {subreddit} posts!") +        embed = Embed( +            color=Colour.blurple() +        ) + +        await ImagePaginator.paginate(pages, ctx, embed) + +    @reddit_group.command(name="weekly") +    async def weekly_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None: +        """Send the top posts of this week from a given subreddit.""" +        async with ctx.typing(): +            pages = await self.get_top_posts(subreddit=subreddit, time="week", paginate=True) + +        await ctx.send(f"Here are this week's top {subreddit} posts!") +        embed = Embed( +            color=Colour.blurple() +        ) + +        await ImagePaginator.paginate(pages, ctx, embed) + +    @has_any_role(*STAFF_ROLES) +    @reddit_group.command(name="subreddits", aliases=("subs",)) +    async def subreddits_command(self, ctx: Context) -> None: +        """Send a paginated embed of all the subreddits we're relaying.""" +        embed = Embed() +        embed.title = "Relayed subreddits." +        embed.colour = Colour.blurple() + +        await LinePaginator.paginate( +            RedditConfig.subreddits, +            ctx, embed, +            footer_text="Use the reddit commands along with these to view their posts.", +            empty=False, +            max_lines=15 +        ) + -def setup(bot: commands.Bot) -> None: -    """Load the Cog.""" +def setup(bot: Bot) -> None: +    """Load the Reddit cog.""" +    if not RedditConfig.secret or not RedditConfig.client_id: +        log.error("Credentials not provided, cog not loaded.") +        return      bot.add_cog(Reddit(bot)) diff --git a/bot/exts/evergreen/rps.py b/bot/exts/evergreen/rps.py new file mode 100644 index 00000000..c6bbff46 --- /dev/null +++ b/bot/exts/evergreen/rps.py @@ -0,0 +1,57 @@ +from random import choice + +from discord.ext import commands + +from bot.bot import Bot + +CHOICES = ["rock", "paper", "scissors"] +SHORT_CHOICES = ["r", "p", "s"] + +# Using a dictionary instead of conditions to check for the winner. +WINNER_DICT = { +    "r": { +        "r": 0, +        "p": -1, +        "s": 1, +    }, +    "p": { +        "r": 1, +        "p": 0, +        "s": -1, +    }, +    "s": { +        "r": -1, +        "p": 1, +        "s": 0, +    } +} + + +class RPS(commands.Cog): +    """Rock Paper Scissors. The Classic Game!""" + +    @commands.command(case_insensitive=True) +    async def rps(self, ctx: commands.Context, move: str) -> None: +        """Play the classic game of Rock Paper Scissors with your own sir-lancebot!""" +        move = move.lower() +        player_mention = ctx.author.mention + +        if move not in CHOICES and move not in SHORT_CHOICES: +            raise commands.BadArgument(f"Invalid move. Please make move from options: {', '.join(CHOICES).upper()}.") + +        bot_move = choice(CHOICES) +        # value of player_result will be from (-1, 0, 1) as (lost, tied, won). +        player_result = WINNER_DICT[move[0]][bot_move[0]] + +        if player_result == 0: +            message_string = f"{player_mention} You and Sir Lancebot played {bot_move}, it's a tie." +            await ctx.send(message_string) +        elif player_result == 1: +            await ctx.send(f"Sir Lancebot played {bot_move}! {player_mention} won!") +        else: +            await ctx.send(f"Sir Lancebot played {bot_move}! {player_mention} lost!") + + +def setup(bot: Bot) -> None: +    """Load the RPS Cog.""" +    bot.add_cog(RPS(bot)) diff --git a/bot/exts/evergreen/snakes/__init__.py b/bot/exts/evergreen/snakes/__init__.py index bc42f0c2..7740429b 100644 --- a/bot/exts/evergreen/snakes/__init__.py +++ b/bot/exts/evergreen/snakes/__init__.py @@ -1,12 +1,11 @@  import logging -from discord.ext import commands - +from bot.bot import Bot  from bot.exts.evergreen.snakes._snakes_cog import Snakes  log = logging.getLogger(__name__) -def setup(bot: commands.Bot) -> None: -    """Snakes Cog load.""" +def setup(bot: Bot) -> None: +    """Load the Snakes Cog."""      bot.add_cog(Snakes(bot)) diff --git a/bot/exts/evergreen/snakes/_converter.py b/bot/exts/evergreen/snakes/_converter.py index eee248cf..c8d1909b 100644 --- a/bot/exts/evergreen/snakes/_converter.py +++ b/bot/exts/evergreen/snakes/_converter.py @@ -5,7 +5,7 @@ from typing import Iterable, List  import discord  from discord.ext.commands import Context, Converter -from fuzzywuzzy import fuzz +from rapidfuzz import fuzz  from bot.exts.evergreen.snakes._utils import SNAKE_RESOURCES  from bot.utils import disambiguate @@ -24,8 +24,8 @@ class Snake(Converter):          await self.build_list()          name = name.lower() -        if name == 'python': -            return 'Python (programming language)' +        if name == "python": +            return "Python (programming language)"          def get_potential(iterable: Iterable, *, threshold: int = 80) -> List[str]:              nonlocal name @@ -47,12 +47,12 @@ class Snake(Converter):          if name.lower() in self.special_cases:              return self.special_cases.get(name.lower(), name.lower()) -        names = {snake['name']: snake['scientific'] for snake in self.snakes} +        names = {snake["name"]: snake["scientific"] for snake in self.snakes}          all_names = names.keys() | names.values()          timeout = len(all_names) * (3 / 4)          embed = discord.Embed( -            title='Found multiple choices. Please choose the correct one.', colour=0x59982F) +            title="Found multiple choices. Please choose the correct one.", colour=0x59982F)          embed.set_author(name=ctx.author.display_name, icon_url=ctx.author.avatar_url)          name = await disambiguate(ctx, get_potential(all_names), timeout=timeout, embed=embed) @@ -63,14 +63,11 @@ class Snake(Converter):          """Build list of snakes from the static snake resources."""          # Get all the snakes          if cls.snakes is None: -            with (SNAKE_RESOURCES / "snake_names.json").open(encoding="utf8") as snakefile: -                cls.snakes = json.load(snakefile) - +            cls.snakes = json.loads((SNAKE_RESOURCES / "snake_names.json").read_text("utf8"))          # Get the special cases          if cls.special_cases is None: -            with (SNAKE_RESOURCES / "special_snakes.json").open(encoding="utf8") as snakefile: -                special_cases = json.load(snakefile) -            cls.special_cases = {snake['name'].lower(): snake for snake in special_cases} +            special_cases = json.loads((SNAKE_RESOURCES / "special_snakes.json").read_text("utf8")) +            cls.special_cases = {snake["name"].lower(): snake for snake in special_cases}      @classmethod      async def random(cls) -> str: @@ -81,5 +78,5 @@ class Snake(Converter):          so I can get it from here.          """          await cls.build_list() -        names = [snake['scientific'] for snake in cls.snakes] +        names = [snake["scientific"] for snake in cls.snakes]          return random.choice(names) diff --git a/bot/exts/evergreen/snakes/_snakes_cog.py b/bot/exts/evergreen/snakes/_snakes_cog.py index d5e4f206..07d3c363 100644 --- a/bot/exts/evergreen/snakes/_snakes_cog.py +++ b/bot/exts/evergreen/snakes/_snakes_cog.py @@ -9,19 +9,20 @@ import textwrap  import urllib  from functools import partial  from io import BytesIO -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional -import aiohttp  import async_timeout  from PIL import Image, ImageDraw, ImageFont  from discord import Colour, Embed, File, Member, Message, Reaction  from discord.errors import HTTPException -from discord.ext.commands import Bot, Cog, CommandError, Context, bot_has_permissions, group +from discord.ext.commands import Cog, CommandError, Context, bot_has_permissions, group +from bot.bot import Bot  from bot.constants import ERROR_REPLIES, Tokens  from bot.exts.evergreen.snakes import _utils as utils  from bot.exts.evergreen.snakes._converter import Snake  from bot.utils.decorators import locked +from bot.utils.extensions import invoke_help_command  log = logging.getLogger(__name__) @@ -142,8 +143,8 @@ class Snakes(Cog):      https://github.com/python-discord/code-jam-1      """ -    wiki_brief = re.compile(r'(.*?)(=+ (.*?) =+)', flags=re.DOTALL) -    valid_image_extensions = ('gif', 'png', 'jpeg', 'jpg', 'webp') +    wiki_brief = re.compile(r"(.*?)(=+ (.*?) =+)", flags=re.DOTALL) +    valid_image_extensions = ("gif", "png", "jpeg", "jpg", "webp")      def __init__(self, bot: Bot):          self.active_sal = {} @@ -182,28 +183,28 @@ class Snakes(Cog):          # Get the size of the snake icon, configure the height of the image box (yes, it changes)          icon_width = 347  # Hardcoded, not much i can do about that          icon_height = int((icon_width / snake.width) * snake.height) -        frame_copies = icon_height // CARD['frame'].height + 1 +        frame_copies = icon_height // CARD["frame"].height + 1          snake.thumbnail((icon_width, icon_height))          # Get the dimensions of the final image -        main_height = icon_height + CARD['top'].height + CARD['bottom'].height -        main_width = CARD['frame'].width +        main_height = icon_height + CARD["top"].height + CARD["bottom"].height +        main_width = CARD["frame"].width          # Start creating the foreground          foreground = Image.new("RGBA", (main_width, main_height), (0, 0, 0, 0)) -        foreground.paste(CARD['top'], (0, 0)) +        foreground.paste(CARD["top"], (0, 0))          # Generate the frame borders to the correct height          for offset in range(frame_copies): -            position = (0, CARD['top'].height + offset * CARD['frame'].height) -            foreground.paste(CARD['frame'], position) +            position = (0, CARD["top"].height + offset * CARD["frame"].height) +            foreground.paste(CARD["frame"], position)          # Add the image and bottom part of the image -        foreground.paste(snake, (36, CARD['top'].height))  # Also hardcoded :( -        foreground.paste(CARD['bottom'], (0, CARD['top'].height + icon_height)) +        foreground.paste(snake, (36, CARD["top"].height))  # Also hardcoded :( +        foreground.paste(CARD["bottom"], (0, CARD["top"].height + icon_height))          # Setup the background -        back = random.choice(CARD['backs']) +        back = random.choice(CARD["backs"])          back_copies = main_height // back.height + 1          full_image = Image.new("RGBA", (main_width, main_height), (0, 0, 0, 0)) @@ -215,11 +216,11 @@ class Snakes(Cog):          full_image.paste(foreground, (0, 0), foreground)          # Get the first two sentences of the info -        description = '.'.join(content['info'].split(".")[:2]) + '.' +        description = ".".join(content["info"].split(".")[:2]) + "."          # Setup positioning variables          margin = 36 -        offset = CARD['top'].height + icon_height + margin +        offset = CARD["top"].height + icon_height + margin          # Create blank rectangle image which will be behind the text          rectangle = Image.new( @@ -241,12 +242,12 @@ class Snakes(Cog):          # Draw the text onto the final image          draw = ImageDraw.Draw(full_image)          for line in textwrap.wrap(description, 36): -            draw.text([margin + 4, offset], line, font=CARD['font']) -            offset += CARD['font'].getsize(line)[1] +            draw.text([margin + 4, offset], line, font=CARD["font"]) +            offset += CARD["font"].getsize(line)[1]          # Get the image contents as a BufferIO object          buffer = BytesIO() -        full_image.save(buffer, 'PNG') +        full_image.save(buffer, "PNG")          buffer.seek(0)          return buffer @@ -274,13 +275,13 @@ class Snakes(Cog):          return message -    async def _fetch(self, session: aiohttp.ClientSession, url: str, params: dict = None) -> dict: +    async def _fetch(self, url: str, params: Optional[dict] = None) -> dict:          """Asynchronous web request helper method."""          if params is None:              params = {}          async with async_timeout.timeout(10): -            async with session.get(url, params=params) as response: +            async with self.bot.http_session.get(url, params=params) as response:                  return await response.json()      def _get_random_long_message(self, messages: List[str], retries: int = 10) -> str: @@ -308,96 +309,95 @@ class Snakes(Cog):          """          snake_info = {} -        async with aiohttp.ClientSession() as session: -            params = { -                'format': 'json', -                'action': 'query', -                'list': 'search', -                'srsearch': name, -                'utf8': '', -                'srlimit': '1', -            } - -            json = await self._fetch(session, URL, params=params) - -            # Wikipedia does have a error page -            try: -                pageid = json["query"]["search"][0]["pageid"] -            except KeyError: -                # Wikipedia error page ID(?) -                pageid = 41118 -            except IndexError: -                return None - -            params = { -                'format': 'json', -                'action': 'query', -                'prop': 'extracts|images|info', -                'exlimit': 'max', -                'explaintext': '', -                'inprop': 'url', -                'pageids': pageid -            } +        params = { +            "format": "json", +            "action": "query", +            "list": "search", +            "srsearch": name, +            "utf8": "", +            "srlimit": "1", +        } -            json = await self._fetch(session, URL, params=params) +        json = await self._fetch(URL, params=params) -            # Constructing dict - handle exceptions later -            try: -                snake_info["title"] = json["query"]["pages"][f"{pageid}"]["title"] -                snake_info["extract"] = json["query"]["pages"][f"{pageid}"]["extract"] -                snake_info["images"] = json["query"]["pages"][f"{pageid}"]["images"] -                snake_info["fullurl"] = json["query"]["pages"][f"{pageid}"]["fullurl"] -                snake_info["pageid"] = json["query"]["pages"][f"{pageid}"]["pageid"] -            except KeyError: -                snake_info["error"] = True - -            if snake_info["images"]: -                i_url = 'https://commons.wikimedia.org/wiki/Special:FilePath/' -                image_list = [] -                map_list = [] -                thumb_list = [] - -                # Wikipedia has arbitrary images that are not snakes -                banned = [ -                    'Commons-logo.svg', -                    'Red%20Pencil%20Icon.png', -                    'distribution', -                    'The%20Death%20of%20Cleopatra%20arthur.jpg', -                    'Head%20of%20holotype', -                    'locator', -                    'Woma.png', -                    '-map.', -                    '.svg', -                    'ange.', -                    'Adder%20(PSF).png' -                ] - -                for image in snake_info["images"]: -                    # Images come in the format of `File:filename.extension` -                    file, sep, filename = image["title"].partition(':') -                    filename = filename.replace(" ", "%20")  # Wikipedia returns good data! - -                    if not filename.startswith('Map'): -                        if any(ban in filename for ban in banned): -                            pass -                        else: -                            image_list.append(f"{i_url}{filename}") -                            thumb_list.append(f"{i_url}{filename}?width=100") +        # Wikipedia does have a error page +        try: +            pageid = json["query"]["search"][0]["pageid"] +        except KeyError: +            # Wikipedia error page ID(?) +            pageid = 41118 +        except IndexError: +            return None + +        params = { +            "format": "json", +            "action": "query", +            "prop": "extracts|images|info", +            "exlimit": "max", +            "explaintext": "", +            "inprop": "url", +            "pageids": pageid +        } + +        json = await self._fetch(URL, params=params) + +        # Constructing dict - handle exceptions later +        try: +            snake_info["title"] = json["query"]["pages"][f"{pageid}"]["title"] +            snake_info["extract"] = json["query"]["pages"][f"{pageid}"]["extract"] +            snake_info["images"] = json["query"]["pages"][f"{pageid}"]["images"] +            snake_info["fullurl"] = json["query"]["pages"][f"{pageid}"]["fullurl"] +            snake_info["pageid"] = json["query"]["pages"][f"{pageid}"]["pageid"] +        except KeyError: +            snake_info["error"] = True + +        if snake_info["images"]: +            i_url = "https://commons.wikimedia.org/wiki/Special:FilePath/" +            image_list = [] +            map_list = [] +            thumb_list = [] + +            # Wikipedia has arbitrary images that are not snakes +            banned = [ +                "Commons-logo.svg", +                "Red%20Pencil%20Icon.png", +                "distribution", +                "The%20Death%20of%20Cleopatra%20arthur.jpg", +                "Head%20of%20holotype", +                "locator", +                "Woma.png", +                "-map.", +                ".svg", +                "ange.", +                "Adder%20(PSF).png" +            ] + +            for image in snake_info["images"]: +                # Images come in the format of `File:filename.extension` +                file, sep, filename = image["title"].partition(":") +                filename = filename.replace(" ", "%20")  # Wikipedia returns good data! + +                if not filename.startswith("Map"): +                    if any(ban in filename for ban in banned): +                        pass                      else: -                        map_list.append(f"{i_url}{filename}") +                        image_list.append(f"{i_url}{filename}") +                        thumb_list.append(f"{i_url}{filename}?width=100") +                else: +                    map_list.append(f"{i_url}{filename}") -            snake_info["image_list"] = image_list -            snake_info["map_list"] = map_list -            snake_info["thumb_list"] = thumb_list -            snake_info["name"] = name +        snake_info["image_list"] = image_list +        snake_info["map_list"] = map_list +        snake_info["thumb_list"] = thumb_list +        snake_info["name"] = name -            match = self.wiki_brief.match(snake_info['extract']) -            info = match.group(1) if match else None +        match = self.wiki_brief.match(snake_info["extract"]) +        info = match.group(1) if match else None -            if info: -                info = info.replace("\n", "\n\n")  # Give us some proper paragraphs. +        if info: +            info = info.replace("\n", "\n\n")  # Give us some proper paragraphs. -            snake_info["info"] = info +        snake_info["info"] = info          return snake_info @@ -422,7 +422,7 @@ class Snakes(Cog):          try:              reaction, user = await ctx.bot.wait_for("reaction_add", timeout=45.0, check=predicate)          except asyncio.TimeoutError: -            await ctx.channel.send(f"You took too long. The correct answer was **{options[answer]}**.") +            await ctx.send(f"You took too long. The correct answer was **{options[answer]}**.")              await message.clear_reactions()              return @@ -437,13 +437,13 @@ class Snakes(Cog):      # endregion      # region: Commands -    @group(name='snakes', aliases=('snake',), invoke_without_command=True) +    @group(name="snakes", aliases=("snake",), invoke_without_command=True)      async def snakes_group(self, ctx: Context) -> None:          """Commands from our first code jam.""" -        await ctx.send_help(ctx.command) +        await invoke_help_command(ctx)      @bot_has_permissions(manage_messages=True) -    @snakes_group.command(name='antidote') +    @snakes_group.command(name="antidote")      @locked()      async def antidote_command(self, ctx: Context) -> None:          """ @@ -497,9 +497,11 @@ class Snakes(Cog):          for i in range(0, 10):              page_guess_list.append(f"{HOLE_EMOJI} {HOLE_EMOJI} {HOLE_EMOJI} {HOLE_EMOJI}")              page_result_list.append(f"{CROSS_EMOJI} {CROSS_EMOJI} {CROSS_EMOJI} {CROSS_EMOJI}") -            board.append(f"`{i+1:02d}` " -                         f"{page_guess_list[i]} - " -                         f"{page_result_list[i]}") +            board.append( +                f"`{i+1:02d}` " +                f"{page_guess_list[i]} - " +                f"{page_result_list[i]}" +            )              board.append(EMPTY_UNICODE)          antidote_embed.add_field(name="10 guesses remaining", value="\n".join(board))          board_id = await ctx.send(embed=antidote_embed)  # Display board @@ -577,15 +579,19 @@ class Snakes(Cog):              antidote_embed = Embed(color=SNAKE_COLOR, title="Antidote")              antidote_embed.set_author(name=ctx.author.name, icon_url=ctx.author.avatar_url)              antidote_embed.set_image(url="https://media.giphy.com/media/ceeN6U57leAhi/giphy.gif") -            antidote_embed.add_field(name=EMPTY_UNICODE, -                                     value=f"Sorry you didnt make the antidote in time.\n" -                                           f"The formula was {' '.join(antidote_answer)}") +            antidote_embed.add_field( +                name=EMPTY_UNICODE, +                value=( +                    f"Sorry you didnt make the antidote in time.\n" +                    f"The formula was {' '.join(antidote_answer)}" +                ) +            )              await board_id.edit(embed=antidote_embed)          log.debug("Ending pagination and removing all reactions...")          await board_id.clear_reactions() -    @snakes_group.command(name='draw') +    @snakes_group.command(name="draw")      async def draw_command(self, ctx: Context) -> None:          """          Draws a random snek using Perlin noise. @@ -620,10 +626,10 @@ class Snakes(Cog):                  bg_color=bg_color              )              png_bytes = utils.frame_to_png_bytes(image_frame) -            file = File(png_bytes, filename='snek.png') +            file = File(png_bytes, filename="snek.png")              await ctx.send(file=file) -    @snakes_group.command(name='get') +    @snakes_group.command(name="get")      @bot_has_permissions(manage_messages=True)      @locked()      async def get_command(self, ctx: Context, *, name: Snake = None) -> None: @@ -641,8 +647,9 @@ class Snakes(Cog):              else:                  data = await self._get_snek(name) -            if data.get('error'): -                return await ctx.send('Could not fetch data from Wikipedia.') +            if data.get("error"): +                await ctx.send("Could not fetch data from Wikipedia.") +                return              description = data["info"] @@ -660,19 +667,25 @@ class Snakes(Cog):              # Build and send the embed.              embed = Embed( -                title=data.get("title", data.get('name')), +                title=data.get("title", data.get("name")),                  description=description,                  colour=0x59982F,              ) -            emoji = 'https://emojipedia-us.s3.amazonaws.com/thumbs/60/google/3/snake_1f40d.png' -            image = next((url for url in data['image_list'] -                          if url.endswith(self.valid_image_extensions)), emoji) +            emoji = "https://emojipedia-us.s3.amazonaws.com/thumbs/60/google/3/snake_1f40d.png" + +            _iter = ( +                url +                for url in data["image_list"] +                if url.endswith(self.valid_image_extensions) +            ) +            image = next(_iter, emoji) +              embed.set_image(url=image)              await ctx.send(embed=embed) -    @snakes_group.command(name='guess', aliases=('identify',)) +    @snakes_group.command(name="guess", aliases=("identify",))      @locked()      async def guess_command(self, ctx: Context) -> None:          """ @@ -692,11 +705,15 @@ class Snakes(Cog):                  data = await self._get_snek(snake) -                image = next((url for url in data['image_list'] -                              if url.endswith(self.valid_image_extensions)), None) +                _iter = ( +                    url +                    for url in data["image_list"] +                    if url.endswith(self.valid_image_extensions) +                ) +                image = next(_iter, None)              embed = Embed( -                title='Which of the following is the snake in the image?', +                title="Which of the following is the snake in the image?",                  description="\n".join(                      f"{'ABCD'[snakes.index(snake)]}: {snake}" for snake in snakes),                  colour=SNAKE_COLOR @@ -707,7 +724,7 @@ class Snakes(Cog):          options = {f"{'abcd'[snakes.index(snake)]}": snake for snake in snakes}          await self._validate_answer(ctx, guess, answer, options) -    @snakes_group.command(name='hatch') +    @snakes_group.command(name="hatch")      async def hatch_command(self, ctx: Context) -> None:          """          Hatches your personal snake. @@ -719,7 +736,7 @@ class Snakes(Cog):          snake_image = utils.snakes[snake_name]          # Hatch the snake -        message = await ctx.channel.send(embed=Embed(description="Hatching your snake :snake:...")) +        message = await ctx.send(embed=Embed(description="Hatching your snake :snake:..."))          await asyncio.sleep(1)          for stage in utils.stages: @@ -733,12 +750,12 @@ class Snakes(Cog):          my_snake_embed = Embed(description=":tada: Congrats! You hatched: **{0}**".format(snake_name))          my_snake_embed.set_thumbnail(url=snake_image)          my_snake_embed.set_footer( -            text=" Owner: {0}#{1}".format(ctx.message.author.name, ctx.message.author.discriminator) +            text=" Owner: {0}#{1}".format(ctx.author.name, ctx.author.discriminator)          ) -        await ctx.channel.send(embed=my_snake_embed) +        await ctx.send(embed=my_snake_embed) -    @snakes_group.command(name='movie') +    @snakes_group.command(name="movie")      async def movie_command(self, ctx: Context) -> None:          """          Gets a random snake-related movie from TMDB. @@ -799,12 +816,12 @@ class Snakes(Cog):          embed.set_thumbnail(url="https://i.imgur.com/LtFtC8H.png")          try: -            await ctx.channel.send(embed=embed) +            await ctx.send(embed=embed)          except HTTPException as err: -            await ctx.channel.send("An error occurred while fetching a snake-related movie!") +            await ctx.send("An error occurred while fetching a snake-related movie!")              raise err from None -    @snakes_group.command(name='quiz') +    @snakes_group.command(name="quiz")      @locked()      async def quiz_command(self, ctx: Context) -> None:          """ @@ -827,10 +844,10 @@ class Snakes(Cog):              )          ) -        quiz = await ctx.channel.send("", embed=embed) +        quiz = await ctx.send(embed=embed)          await self._validate_answer(ctx, quiz, answer, options) -    @snakes_group.command(name='name', aliases=('name_gen',)) +    @snakes_group.command(name="name", aliases=("name_gen",))      async def name_command(self, ctx: Context, *, name: str = None) -> None:          """          Snakifies a username. @@ -854,7 +871,7 @@ class Snakes(Cog):          This was written by Iceman, and modified for inclusion into the bot by lemon.          """          snake_name = await self._get_snake_name() -        snake_name = snake_name['name'] +        snake_name = snake_name["name"]          snake_prefix = ""          # Set aside every word in the snake name except the last. @@ -899,9 +916,10 @@ class Snakes(Cog):              color=SNAKE_COLOR          ) -        return await ctx.send(embed=embed) +        await ctx.send(embed=embed) +        return -    @snakes_group.command(name='sal') +    @snakes_group.command(name="sal")      @locked()      async def sal_command(self, ctx: Context) -> None:          """ @@ -920,7 +938,7 @@ class Snakes(Cog):          await game.open_game() -    @snakes_group.command(name='about') +    @snakes_group.command(name="about")      async def about_command(self, ctx: Context) -> None:          """Show an embed with information about the event, its participants, and its winners."""          contributors = [ @@ -963,9 +981,9 @@ class Snakes(Cog):              )          ) -        await ctx.channel.send(embed=embed) +        await ctx.send(embed=embed) -    @snakes_group.command(name='card') +    @snakes_group.command(name="card")      async def card_command(self, ctx: Context, *, name: Snake = None) -> None:          """          Create an interesting little card from a snake. @@ -975,7 +993,7 @@ class Snakes(Cog):          # Get the snake data we need          if not name:              name_obj = await self._get_snake_name() -            name = name_obj['scientific'] +            name = name_obj["scientific"]              content = await self._get_snek(name)          elif isinstance(name, dict): @@ -989,7 +1007,7 @@ class Snakes(Cog):              stream = BytesIO()              async with async_timeout.timeout(10): -                async with self.bot.http_session.get(content['image_list'][0]) as response: +                async with self.bot.http_session.get(content["image_list"][0]) as response:                      stream.write(await response.read())              stream.seek(0) @@ -1000,10 +1018,10 @@ class Snakes(Cog):          # Send it!          await ctx.send(              f"A wild {content['name'].title()} appears!", -            file=File(final_buffer, filename=content['name'].replace(" ", "") + ".png") +            file=File(final_buffer, filename=content["name"].replace(" ", "") + ".png")          ) -    @snakes_group.command(name='fact') +    @snakes_group.command(name="fact")      async def fact_command(self, ctx: Context) -> None:          """          Gets a snake-related fact. @@ -1017,9 +1035,9 @@ class Snakes(Cog):              color=SNAKE_COLOR,              description=question          ) -        await ctx.channel.send(embed=embed) +        await ctx.send(embed=embed) -    @snakes_group.command(name='snakify') +    @snakes_group.command(name="snakify")      async def snakify_command(self, ctx: Context, *, message: str = None) -> None:          """          How would I talk if I were a snake? @@ -1032,14 +1050,14 @@ class Snakes(Cog):          """          with ctx.typing():              embed = Embed() -            user = ctx.message.author +            user = ctx.author              if not message:                  # Get a random message from the users history                  messages = [] -                async for message in ctx.channel.history(limit=500).filter( -                        lambda msg: msg.author == ctx.message.author  # Message was sent by author. +                async for message in ctx.history(limit=500).filter( +                        lambda msg: msg.author == ctx.author  # Message was sent by author.                  ):                      messages.append(message.content) @@ -1058,9 +1076,9 @@ class Snakes(Cog):              )              embed.description = f"*{self._snakify(message)}*" -            await ctx.channel.send(embed=embed) +            await ctx.send(embed=embed) -    @snakes_group.command(name='video', aliases=('get_video',)) +    @snakes_group.command(name="video", aliases=("get_video",))      async def video_command(self, ctx: Context, *, search: str = None) -> None:          """          Gets a YouTube video about snakes. @@ -1071,13 +1089,13 @@ class Snakes(Cog):          """          # Are we searching for anything specific?          if search: -            query = search + ' snake' +            query = search + " snake"          else:              snake = await self._get_snake_name() -            query = snake['name'] +            query = snake["name"]          # Build the URL and make the request -        url = 'https://www.googleapis.com/youtube/v3/search' +        url = "https://www.googleapis.com/youtube/v3/search"          response = await self.bot.http_session.get(              url,              params={ @@ -1093,14 +1111,14 @@ class Snakes(Cog):          # Send the user a video          if len(data) > 0:              num = random.randint(0, len(data) - 1) -            youtube_base_url = 'https://www.youtube.com/watch?v=' -            await ctx.channel.send( +            youtube_base_url = "https://www.youtube.com/watch?v=" +            await ctx.send(                  content=f"{youtube_base_url}{data[num]['id']['videoId']}"              )          else:              log.warning(f"YouTube API error. Full response looks like {response}") -    @snakes_group.command(name='zen') +    @snakes_group.command(name="zen")      async def zen_command(self, ctx: Context) -> None:          """          Gets a random quote from the Zen of Python, except as if spoken by a snake. @@ -1119,7 +1137,7 @@ class Snakes(Cog):          # Embed and send          embed.description = zen_quote -        await ctx.channel.send( +        await ctx.send(              embed=embed          )      # endregion diff --git a/bot/exts/evergreen/snakes/_utils.py b/bot/exts/evergreen/snakes/_utils.py index 7d6caf04..f996d7f8 100644 --- a/bot/exts/evergreen/snakes/_utils.py +++ b/bot/exts/evergreen/snakes/_utils.py @@ -17,45 +17,46 @@ from bot.constants import Roles  SNAKE_RESOURCES = Path("bot/resources/snakes").absolute() -h1 = r'''``` +h1 = r"""```          ----         ------       /--------\       |--------|       |--------|        \------/ -        ----```''' -h2 = r'''``` +        ----```""" +h2 = r"""```          ----         ------       /---\-/--\       |-----\--|       |--------|        \------/ -        ----```''' -h3 = r'''``` +        ----```""" +h3 = r"""```          ----         ------       /---\-/--\       |-----\--|       |-----/--|        \----\-/ -        ----```''' -h4 = r'''``` +        ----```""" +h4 = r"""```          -----         -----  \       /--|  /---\       |--\  -\---|       |--\--/--  /        \------- / -        ------```''' +        ------```"""  stages = [h1, h2, h3, h4]  snakes = {      "Baby Python": "https://i.imgur.com/SYOcmSa.png",      "Baby Rattle Snake": "https://i.imgur.com/i5jYA8f.png",      "Baby Dragon Snake": "https://i.imgur.com/SuMKM4m.png",      "Baby Garden Snake": "https://i.imgur.com/5vYx3ah.png", -    "Baby Cobra": "https://i.imgur.com/jk14ryt.png" +    "Baby Cobra": "https://i.imgur.com/jk14ryt.png", +    "Baby Anaconda": "https://i.imgur.com/EpdrnNr.png",  }  BOARD_TILE_SIZE = 56         # the size of each board tile @@ -114,8 +115,7 @@ ANGLE_RANGE = math.pi * 2  def get_resource(file: str) -> List[dict]:      """Load Snake resources JSON.""" -    with (SNAKE_RESOURCES / f"{file}.json").open(encoding="utf-8") as snakefile: -        return json.load(snakefile) +    return json.loads((SNAKE_RESOURCES / f"{file}.json").read_text("utf-8"))  def smoothstep(t: float) -> float: @@ -191,8 +191,9 @@ class PerlinNoiseFactory(object):      def get_plain_noise(self, *point) -> float:          """Get plain noise for a single point, without taking into account either octaves or tiling."""          if len(point) != self.dimension: -            raise ValueError("Expected {0} values, got {1}".format( -                self.dimension, len(point))) +            raise ValueError( +                f"Expected {self.dimension} values, got {len(point)}" +            )          # Build a list of the (min, max) bounds in each dimension          grid_coords = [] @@ -321,7 +322,7 @@ def create_snek_frame(          image_dimensions[Y] / 2 - (dimension_range[Y] / 2 + min_dimensions[Y])      ) -    image = Image.new(mode='RGB', size=image_dimensions, color=bg_color) +    image = Image.new(mode="RGB", size=image_dimensions, color=bg_color)      draw = ImageDraw(image)      for index in range(1, len(points)):          point = points[index] @@ -345,7 +346,7 @@ def create_snek_frame(  def frame_to_png_bytes(image: Image) -> io.BytesIO:      """Convert image to byte stream."""      stream = io.BytesIO() -    image.save(stream, format='PNG') +    image.save(stream, format="PNG")      stream.seek(0)      return stream @@ -373,7 +374,7 @@ class SnakeAndLaddersGame:          self.snakes = snakes          self.ctx = context          self.channel = self.ctx.channel -        self.state = 'booting' +        self.state = "booting"          self.started = False          self.author = self.ctx.author          self.players = [] @@ -413,7 +414,7 @@ class SnakeAndLaddersGame:              "**Snakes and Ladders**: A new game is about to start!",              file=File(                  str(SNAKE_RESOURCES / "snakes_and_ladders" / "banner.jpg"), -                filename='Snakes and Ladders.jpg' +                filename="Snakes and Ladders.jpg"              )          )          startup = await self.channel.send( @@ -423,7 +424,7 @@ class SnakeAndLaddersGame:          for emoji in STARTUP_SCREEN_EMOJI:              await startup.add_reaction(emoji) -        self.state = 'waiting' +        self.state = "waiting"          while not self.started:              try: @@ -460,7 +461,7 @@ class SnakeAndLaddersGame:          self.players.append(user)          self.player_tiles[user.id] = 1 -        avatar_bytes = await user.avatar_url_as(format='jpeg', size=PLAYER_ICON_IMAGE_SIZE).read() +        avatar_bytes = await user.avatar_url_as(format="jpeg", size=PLAYER_ICON_IMAGE_SIZE).read()          im = Image.open(io.BytesIO(avatar_bytes)).resize((BOARD_PLAYER_SIZE, BOARD_PLAYER_SIZE))          self.avatar_images[user.id] = im @@ -475,7 +476,7 @@ class SnakeAndLaddersGame:              if user == p:                  await self.channel.send(user.mention + " You are already in the game.", delete_after=10)                  return -        if self.state != 'waiting': +        if self.state != "waiting":              await self.channel.send(user.mention + " You cannot join at this time.", delete_after=10)              return          if len(self.players) is MAX_PLAYERS: @@ -510,7 +511,7 @@ class SnakeAndLaddersGame:                      delete_after=10                  ) -                if self.state != 'waiting' and len(self.players) == 0: +                if self.state != "waiting" and len(self.players) == 0:                      await self.channel.send("**Snakes and Ladders**: The game has been surrendered!")                      is_surrendered = True                      self._destruct() @@ -535,12 +536,12 @@ class SnakeAndLaddersGame:              await self.channel.send(user.mention + " Only the author of the game can start it.", delete_after=10)              return -        if not self.state == 'waiting': +        if not self.state == "waiting":              await self.channel.send(user.mention + " The game cannot be started at this time.", delete_after=10)              return -        self.state = 'starting' -        player_list = ', '.join(user.mention for user in self.players) +        self.state = "starting" +        player_list = ", ".join(user.mention for user in self.players)          await self.channel.send("**Snakes and Ladders**: The game is starting!\nPlayers: " + player_list)          await self.start_round() @@ -556,10 +557,10 @@ class SnakeAndLaddersGame:                  ))              ) -        self.state = 'roll' +        self.state = "roll"          for user in self.players:              self.round_has_rolled[user.id] = False -        board_img = Image.open(str(SNAKE_RESOURCES / "snakes_and_ladders" / "board.jpg")) +        board_img = Image.open(SNAKE_RESOURCES / "snakes_and_ladders" / "board.jpg")          player_row_size = math.ceil(MAX_PLAYERS / 2)          for i, player in enumerate(self.players): @@ -574,8 +575,8 @@ class SnakeAndLaddersGame:              board_img.paste(self.avatar_images[player.id],                              box=(x_offset, y_offset)) -        board_file = File(frame_to_png_bytes(board_img), filename='Board.jpg') -        player_list = '\n'.join((user.mention + ": Tile " + str(self.player_tiles[user.id])) for user in self.players) +        board_file = File(frame_to_png_bytes(board_img), filename="Board.jpg") +        player_list = "\n".join((user.mention + ": Tile " + str(self.player_tiles[user.id])) for user in self.players)          # Store and send new messages          temp_board = await self.channel.send( @@ -644,7 +645,7 @@ class SnakeAndLaddersGame:          if user.id not in self.player_tiles:              await self.channel.send(user.mention + " You are not in the match.", delete_after=10)              return -        if self.state != 'roll': +        if self.state != "roll":              await self.channel.send(user.mention + " You may not roll at this time.", delete_after=10)              return          if self.round_has_rolled[user.id]: @@ -673,7 +674,7 @@ class SnakeAndLaddersGame:      async def _complete_round(self) -> None:          """At the conclusion of a round check to see if there's been a winner.""" -        self.state = 'post_round' +        self.state = "post_round"          # check for winner          winner = self._check_winner() @@ -688,7 +689,7 @@ class SnakeAndLaddersGame:      def _check_winner(self) -> Member:          """Return a winning member if we're in the post-round state and there's a winner.""" -        if self.state != 'post_round': +        if self.state != "post_round":              return None          return next((player for player in self.players if self.player_tiles[player.id] == 100),                      None) diff --git a/bot/exts/evergreen/source.py b/bot/exts/evergreen/source.py index cdfe54ec..fc209bc3 100644 --- a/bot/exts/evergreen/source.py +++ b/bot/exts/evergreen/source.py @@ -1,39 +1,18 @@  import inspect  from pathlib import Path -from typing import Optional, Tuple, Union +from typing import Optional, Tuple  from discord import Embed  from discord.ext import commands +from bot.bot import Bot  from bot.constants import Source - -SourceType = Union[commands.Command, commands.Cog, str, commands.ExtensionNotLoaded] - - -class SourceConverter(commands.Converter): -    """Convert an argument into a help command, tag, command, or cog.""" - -    async def convert(self, ctx: commands.Context, argument: str) -> SourceType: -        """Convert argument into source object.""" -        cog = ctx.bot.get_cog(argument) -        if cog: -            return cog - -        cmd = ctx.bot.get_command(argument) -        if cmd: -            return cmd - -        raise commands.BadArgument( -            f"Unable to convert `{argument}` to valid command or Cog." -        ) +from bot.utils.converters import SourceConverter, SourceType  class BotSource(commands.Cog):      """Displays information about the bot's source code.""" -    def __init__(self, bot: commands.Bot): -        self.bot = bot -      @commands.command(name="source", aliases=("src",))      async def source_command(self, ctx: commands.Context, *, source_item: SourceConverter = None) -> None:          """Display information and a GitHub link to the source code of a command, tag, or cog.""" @@ -54,7 +33,8 @@ class BotSource(commands.Cog):          Raise BadArgument if `source_item` is a dynamically-created object (e.g. via internal eval).          """          if isinstance(source_item, commands.Command): -            src = source_item.callback.__code__ +            callback = inspect.unwrap(source_item.callback) +            src = callback.__code__              filename = src.co_filename          else:              src = type(source_item) @@ -76,7 +56,7 @@ class BotSource(commands.Cog):          file_location = Path(filename).relative_to(Path.cwd()).as_posix() -        url = f"{Source.github}/blob/master/{file_location}{lines_extension}" +        url = f"{Source.github}/blob/main/{file_location}{lines_extension}"          return url, file_location, first_line_no or None @@ -85,12 +65,8 @@ class BotSource(commands.Cog):          url, location, first_line = self.get_source_link(source_object)          if isinstance(source_object, commands.Command): -            if source_object.cog_name == 'Help': -                title = "Help Command" -                description = source_object.__doc__.splitlines()[1] -            else: -                description = source_object.short_doc -                title = f"Command: {source_object.qualified_name}" +            description = source_object.short_doc +            title = f"Command: {source_object.qualified_name}"          else:              title = f"Cog: {source_object.qualified_name}"              description = source_object.description.splitlines()[0] @@ -104,6 +80,6 @@ class BotSource(commands.Cog):          return embed -def setup(bot: commands.Bot) -> None: +def setup(bot: Bot) -> None:      """Load the BotSource cog.""" -    bot.add_cog(BotSource(bot)) +    bot.add_cog(BotSource()) diff --git a/bot/exts/evergreen/space.py b/bot/exts/evergreen/space.py index bc8e3118..5e87c6d5 100644 --- a/bot/exts/evergreen/space.py +++ b/bot/exts/evergreen/space.py @@ -1,15 +1,17 @@  import logging  import random  from datetime import date, datetime -from typing import Any, Dict, Optional, Union +from typing import Any, Dict, Optional  from urllib.parse import urlencode  from discord import Embed  from discord.ext import tasks -from discord.ext.commands import BadArgument, Cog, Context, Converter, group +from discord.ext.commands import Cog, Context, group  from bot.bot import Bot  from bot.constants import Tokens +from bot.utils.converters import DateConverter +from bot.utils.extensions import invoke_help_command  logger = logging.getLogger(__name__) @@ -20,25 +22,10 @@ NASA_EPIC_BASE_URL = "https://epic.gsfc.nasa.gov"  APOD_MIN_DATE = date(1995, 6, 16) -class DateConverter(Converter): -    """Parse SOL or earth date (in format YYYY-MM-DD) into `int` or `datetime`. When invalid input, raise error.""" - -    async def convert(self, ctx: Context, argument: str) -> Union[int, datetime]: -        """Parse date (SOL or earth) into `datetime` or `int`. When invalid value, raise error.""" -        if argument.isdigit(): -            return int(argument) -        try: -            date = datetime.strptime(argument, "%Y-%m-%d") -        except ValueError: -            raise BadArgument(f"Can't convert `{argument}` to `datetime` in format `YYYY-MM-DD` or `int` in SOL.") -        return date - -  class Space(Cog):      """Space Cog contains commands, that show images, facts or other information about space."""      def __init__(self, bot: Bot): -        self.bot = bot          self.http_session = bot.http_session          self.rovers = {} @@ -63,10 +50,10 @@ class Space(Cog):      @group(name="space", invoke_without_command=True)      async def space(self, ctx: Context) -> None:          """Head command that contains commands about space.""" -        await ctx.send_help("space") +        await invoke_help_command(ctx)      @space.command(name="apod") -    async def apod(self, ctx: Context, date: Optional[str] = None) -> None: +    async def apod(self, ctx: Context, date: Optional[str]) -> None:          """          Get Astronomy Picture of Day from NASA API. Date is optional parameter, what formatting is YYYY-MM-DD. @@ -99,7 +86,7 @@ class Space(Cog):          )      @space.command(name="nasa") -    async def nasa(self, ctx: Context, *, search_term: Optional[str] = None) -> None: +    async def nasa(self, ctx: Context, *, search_term: Optional[str]) -> None:          """Get random NASA information/facts + image. Support `search_term` parameter for more specific search."""          params = {              "media_type": "image" @@ -124,8 +111,8 @@ class Space(Cog):          )      @space.command(name="epic") -    async def epic(self, ctx: Context, date: Optional[str] = None) -> None: -        """Get one of latest random image of earth from NASA EPIC API. Support date parameter, format is YYYY-MM-DD.""" +    async def epic(self, ctx: Context, date: Optional[str]) -> None: +        """Get a random image of the Earth from the NASA EPIC API. Support date parameter, format is YYYY-MM-DD."""          if date:              try:                  show_date = datetime.strptime(date, "%Y-%m-%d").date().isoformat() @@ -160,8 +147,8 @@ class Space(Cog):      async def mars(          self,          ctx: Context, -        date: Optional[DateConverter] = None, -        rover: Optional[str] = "curiosity" +        date: Optional[DateConverter], +        rover: str = "curiosity"      ) -> None:          """          Get random Mars image by date. Support both SOL (martian solar day) and earth date and rovers. @@ -206,7 +193,7 @@ class Space(Cog):              )          ) -    @mars.command(name="dates", aliases=["d", "date", "rover", "rovers", "r"]) +    @mars.command(name="dates", aliases=("d", "date", "rover", "rovers", "r"))      async def dates(self, ctx: Context) -> None:          """Get current available rovers photo date ranges."""          await ctx.send("\n".join( @@ -241,7 +228,7 @@ class Space(Cog):  def setup(bot: Bot) -> None: -    """Load Space Cog.""" +    """Load the Space cog."""      if not Tokens.nasa:          logger.warning("Can't find NASA API key. Not loading Space Cog.")          return diff --git a/bot/exts/evergreen/speedrun.py b/bot/exts/evergreen/speedrun.py index 21aad5aa..774eff81 100644 --- a/bot/exts/evergreen/speedrun.py +++ b/bot/exts/evergreen/speedrun.py @@ -5,23 +5,22 @@ from random import choice  from discord.ext import commands +from bot.bot import Bot +  log = logging.getLogger(__name__) -with Path('bot/resources/evergreen/speedrun_links.json').open(encoding="utf8") as file: -    LINKS = json.load(file) + +LINKS = json.loads(Path("bot/resources/evergreen/speedrun_links.json").read_text("utf8"))  class Speedrun(commands.Cog):      """Commands about the video game speedrunning community.""" -    def __init__(self, bot: commands.Bot): -        self.bot = bot -      @commands.command(name="speedrun")      async def get_speedrun(self, ctx: commands.Context) -> None:          """Sends a link to a video of a random speedrun."""          await ctx.send(choice(LINKS)) -def setup(bot: commands.Bot) -> None: +def setup(bot: Bot) -> None:      """Load the Speedrun cog.""" -    bot.add_cog(Speedrun(bot)) +    bot.add_cog(Speedrun()) diff --git a/bot/exts/evergreen/stackoverflow.py b/bot/exts/evergreen/stackoverflow.py new file mode 100644 index 00000000..40f149c9 --- /dev/null +++ b/bot/exts/evergreen/stackoverflow.py @@ -0,0 +1,88 @@ +import logging +from html import unescape +from urllib.parse import quote_plus + +from discord import Embed, HTTPException +from discord.ext import commands + +from bot import bot +from bot.constants import Colours, Emojis + +logger = logging.getLogger(__name__) + +BASE_URL = "https://api.stackexchange.com/2.2/search/advanced" +SO_PARAMS = { +    "order": "desc", +    "sort": "activity", +    "site": "stackoverflow" +} +SEARCH_URL = "https://stackoverflow.com/search?q={query}" +ERR_EMBED = Embed( +    title="Error in fetching results from Stackoverflow", +    description=( +        "Sorry, there was en error while trying to fetch data from the Stackoverflow website. Please try again in some " +        "time. If this issue persists, please contact the staff or send a message in #dev-contrib." +    ), +    color=Colours.soft_red +) + + +class Stackoverflow(commands.Cog): +    """Contains command to interact with stackoverflow from discord.""" + +    def __init__(self, bot: bot.Bot): +        self.bot = bot + +    @commands.command(aliases=["so"]) +    @commands.cooldown(1, 15, commands.cooldowns.BucketType.user) +    async def stackoverflow(self, ctx: commands.Context, *, search_query: str) -> None: +        """Sends the top 5 results of a search query from stackoverflow.""" +        params = SO_PARAMS | {"q": search_query} +        async with self.bot.http_session.get(url=BASE_URL, params=params) as response: +            if response.status == 200: +                data = await response.json() +            else: +                logger.error(f'Status code is not 200, it is {response.status}') +                await ctx.send(embed=ERR_EMBED) +                return +        if not data['items']: +            no_search_result = Embed( +                title=f"No search results found for {search_query}", +                color=Colours.soft_red +            ) +            await ctx.send(embed=no_search_result) +            return + +        top5 = data["items"][:5] +        encoded_search_query = quote_plus(search_query) +        embed = Embed( +            title="Search results - Stackoverflow", +            url=SEARCH_URL.format(query=encoded_search_query), +            description=f"Here are the top {len(top5)} results:", +            color=Colours.orange +        ) +        for item in top5: +            embed.add_field( +                name=unescape(item['title']), +                value=( +                    f"[{Emojis.reddit_upvote} {item['score']}    " +                    f"{Emojis.stackoverflow_views} {item['view_count']}     " +                    f"{Emojis.reddit_comments} {item['answer_count']}   " +                    f"{Emojis.stackoverflow_tag} {', '.join(item['tags'][:3])}]" +                    f"({item['link']})" +                ), +                inline=False) +        embed.set_footer(text="View the original link for more results.") +        try: +            await ctx.send(embed=embed) +        except HTTPException: +            search_query_too_long = Embed( +                title="Your search query is too long, please try shortening your search query", +                color=Colours.soft_red +            ) +            await ctx.send(embed=search_query_too_long) + + +def setup(bot: bot.Bot) -> None: +    """Load the Stackoverflow Cog.""" +    bot.add_cog(Stackoverflow(bot)) diff --git a/bot/exts/evergreen/status_codes.py b/bot/exts/evergreen/status_codes.py index 3725afa8..7f6d160d 100644 --- a/bot/exts/evergreen/status_codes.py +++ b/bot/exts/evergreen/status_codes.py @@ -1,6 +1,10 @@ +from random import choice +  import discord  from discord.ext import commands +from bot.bot import Bot +  HTTP_DOG_URL = "https://httpstatusdogs.com/img/{code}.jpg"  HTTP_CAT_URL = "https://http.cat/{code}.jpg"  STATUS_TEMPLATE = '**Status: {code}**' @@ -9,23 +13,27 @@ ERR_UNKNOWN = 'Error attempting to retrieve status Floof for {code}.'  class HTTPStatusCodes(commands.Cog): -    """Commands that give HTTP statuses described and visualized by cats and dogs.""" +    """ +    Fetch an image depicting HTTP status codes as a dog or a cat. + +    If neither animal is selected a cat or dog is chosen randomly for the given status code. +    """ -    def __init__(self, bot: commands.Bot): +    def __init__(self, bot: Bot):          self.bot = bot -    @commands.group(name="http_status", aliases=("status", "httpstatus")) -    async def http_status_group(self, ctx: commands.Context) -> None: -        """Group containing dog and cat http status code commands.""" -        if not ctx.invoked_subcommand: -            await ctx.send_help(ctx.command) +    @commands.group(name="http_status", aliases=("status", "httpstatus"), invoke_without_command=True) +    async def http_status_group(self, ctx: commands.Context, code: int) -> None: +        """Choose a cat or dog randomly for the given status code.""" +        subcmd = choice((self.http_cat, self.http_dog)) +        await subcmd(ctx, code) -    @http_status_group.command(name='cat') +    @http_status_group.command(name="cat")      async def http_cat(self, ctx: commands.Context, code: int) -> None:          """Assemble Cat URL and build Embed."""          await self.build_embed(url=HTTP_CAT_URL.format(code=code), ctx=ctx, code=code) -    @http_status_group.command(name='dog') +    @http_status_group.command(name="dog")      async def http_dog(self, ctx: commands.Context, code: int) -> None:          """Assemble Dog URL and build Embed."""          await self.build_embed(url=HTTP_DOG_URL.format(code=code), ctx=ctx, code=code) @@ -46,6 +54,6 @@ class HTTPStatusCodes(commands.Cog):                  ) -def setup(bot: commands.Bot) -> None: +def setup(bot: Bot) -> None:      """Load the HTTPStatusCodes cog."""      bot.add_cog(HTTPStatusCodes(bot)) diff --git a/bot/exts/evergreen/tic_tac_toe.py b/bot/exts/evergreen/tic_tac_toe.py index e1190502..164e056d 100644 --- a/bot/exts/evergreen/tic_tac_toe.py +++ b/bot/exts/evergreen/tic_tac_toe.py @@ -10,8 +10,8 @@ from bot.constants import Emojis  from bot.utils.pagination import LinePaginator  CONFIRMATION_MESSAGE = ( -    "{opponent}, {requester} wants to play Tic-Tac-Toe against you. React to this message with " -    f"{Emojis.confirmation} to accept or with {Emojis.decline} to decline." +    "{opponent}, {requester} wants to play Tic-Tac-Toe against you." +    f"\nReact to this message with {Emojis.confirmation} to accept or with {Emojis.decline} to decline."  ) @@ -58,7 +58,7 @@ class Player:              )          try: -            react, _ = await self.ctx.bot.wait_for('reaction_add', timeout=30.0, check=check_for_move) +            react, _ = await self.ctx.bot.wait_for("reaction_add", timeout=30.0, check=check_for_move)          except asyncio.TimeoutError:              return True, None          else: @@ -79,7 +79,7 @@ class AI:          """Get move from AI. AI use Minimax strategy."""          possible_moves = [i for i, emoji in board.items() if emoji in list(Emojis.number_emojis.values())] -        for symbol in (Emojis.o, Emojis.x): +        for symbol in (Emojis.o_square, Emojis.x_square):              for move in possible_moves:                  board_copy = board.copy()                  board_copy[move] = symbol @@ -246,14 +246,13 @@ def is_requester_free() -> t.Callable:  class TicTacToe(Cog):      """TicTacToe cog contains tic-tac-toe game commands.""" -    def __init__(self, bot: Bot): -        self.bot = bot +    def __init__(self):          self.games: t.List[Game] = []      @guild_only()      @is_channel_free()      @is_requester_free() -    @group(name="tictactoe", aliases=("ttt",), invoke_without_command=True) +    @group(name="tictactoe", aliases=("ttt", "tic"), invoke_without_command=True)      async def tic_tac_toe(self, ctx: Context, opponent: t.Optional[discord.User]) -> None:          """Tic Tac Toe game. Play against friends or AI. Use reactions to add your mark to field."""          if opponent == ctx.author: @@ -266,16 +265,20 @@ class TicTacToe(Cog):              return          if opponent is None:              game = Game( -                [Player(ctx.author, ctx, Emojis.x), AI(Emojis.o)], +                [Player(ctx.author, ctx, Emojis.x_square), AI(Emojis.o_square)],                  ctx              )          else:              game = Game( -                [Player(ctx.author, ctx, Emojis.x), Player(opponent, ctx, Emojis.o)], +                [Player(ctx.author, ctx, Emojis.x_square), Player(opponent, ctx, Emojis.o_square)],                  ctx              )          self.games.append(game)          if opponent is not None: +            if opponent.bot:  # check whether the opponent is a bot or not +                await ctx.send("You can't play Tic-Tac-Toe with bots!") +                return +              confirmed, msg = await game.get_confirmation()              if not confirmed: @@ -314,10 +317,19 @@ class TicTacToe(Cog):              await ctx.send("Game don't exist.")              return          game = self.games[game_id - 1] -        await ctx.send(f"{game.winner} :trophy: vs {game.loser}") -        await ctx.send(game.format_board()) + +        if game.draw: +            description = f"{game.players[0]} vs {game.players[1]} (draw)\n\n{game.format_board()}" +        else: +            description = f"{game.winner} :trophy: vs {game.loser}\n\n{game.format_board()}" + +        embed = discord.Embed( +            title=f"Match #{game_id} Game Board", +            description=description, +        ) +        await ctx.send(embed=embed)  def setup(bot: Bot) -> None: -    """Load TicTacToe Cog.""" -    bot.add_cog(TicTacToe(bot)) +    """Load the TicTacToe cog.""" +    bot.add_cog(TicTacToe()) diff --git a/bot/exts/evergreen/timed.py b/bot/exts/evergreen/timed.py new file mode 100644 index 00000000..2ea6b419 --- /dev/null +++ b/bot/exts/evergreen/timed.py @@ -0,0 +1,48 @@ +from copy import copy +from time import perf_counter + +from discord import Message +from discord.ext import commands + +from bot.bot import Bot + + +class TimedCommands(commands.Cog): +    """Time the command execution of a command.""" + +    @staticmethod +    async def create_execution_context(ctx: commands.Context, command: str) -> commands.Context: +        """Get a new execution context for a command.""" +        msg: Message = copy(ctx.message) +        msg.content = f"{ctx.prefix}{command}" + +        return await ctx.bot.get_context(msg) + +    @commands.command(name="timed", aliases=("time", "t")) +    async def timed(self, ctx: commands.Context, *, command: str) -> None: +        """Time the command execution of a command.""" +        new_ctx = await self.create_execution_context(ctx, command) + +        ctx.subcontext = new_ctx + +        if not ctx.subcontext.command: +            help_command = f"{ctx.prefix}help" +            error = f"The command you are trying to time doesn't exist. Use `{help_command}` for a list of commands." + +            await ctx.send(error) +            return + +        if new_ctx.command.qualified_name == "timed": +            await ctx.send("You are not allowed to time the execution of the `timed` command.") +            return + +        t_start = perf_counter() +        await new_ctx.command.invoke(new_ctx) +        t_end = perf_counter() + +        await ctx.send(f"Command execution for `{new_ctx.command}` finished in {(t_end - t_start):.4f} seconds.") + + +def setup(bot: Bot) -> None: +    """Load the Timed cog.""" +    bot.add_cog(TimedCommands()) diff --git a/bot/exts/evergreen/trivia_quiz.py b/bot/exts/evergreen/trivia_quiz.py index fe692c2a..bc25cbf7 100644 --- a/bot/exts/evergreen/trivia_quiz.py +++ b/bot/exts/evergreen/trivia_quiz.py @@ -1,56 +1,239 @@  import asyncio  import json  import logging +import operator  import random +from dataclasses import dataclass  from pathlib import Path +from typing import Callable, List, Optional  import discord  from discord.ext import commands -from fuzzywuzzy import fuzz - -from bot.constants import Roles +from rapidfuzz import fuzz +from bot.bot import Bot +from bot.constants import Colours, NEGATIVE_REPLIES, Roles  logger = logging.getLogger(__name__) +DEFAULT_QUESTION_LIMIT = 6 +STANDARD_VARIATION_TOLERANCE = 88 +DYNAMICALLY_GEN_VARIATION_TOLERANCE = 97  WRONG_ANS_RESPONSE = [      "No one answered correctly!", -    "Better luck next time" +    "Better luck next time...", +] + +N_PREFIX_STARTS_AT = 5 +N_PREFIXES = [ +    "penta", "hexa", "hepta", "octa", "nona", +    "deca", "hendeca", "dodeca", "trideca", "tetradeca", +] + +PLANETS = [ +    ("1st", "Mercury"), +    ("2nd", "Venus"), +    ("3rd", "Earth"), +    ("4th", "Mars"), +    ("5th", "Jupiter"), +    ("6th", "Saturn"), +    ("7th", "Uranus"), +    ("8th", "Neptune"), +] + +TAXONOMIC_HIERARCHY = [ +    "species", "genus", "family", "order", +    "class", "phylum", "kingdom", "domain",  ] +UNITS_TO_BASE_UNITS = { +    "hertz": ("(unit of frequency)", "s^-1"), +    "newton": ("(unit of force)", "m*kg*s^-2"), +    "pascal": ("(unit of pressure & stress)", "m^-1*kg*s^-2"), +    "joule": ("(unit of energy & quantity of heat)", "m^2*kg*s^-2"), +    "watt": ("(unit of power)", "m^2*kg*s^-3"), +    "coulomb": ("(unit of electric charge & quantity of electricity)", "s*A"), +    "volt": ("(unit of voltage & electromotive force)", "m^2*kg*s^-3*A^-1"), +    "farad": ("(unit of capacitance)", "m^-2*kg^-1*s^4*A^2"), +    "ohm": ("(unit of electric resistance)", "m^2*kg*s^-3*A^-2"), +    "weber": ("(unit of magnetic flux)", "m^2*kg*s^-2*A^-1"), +    "tesla": ("(unit of magnetic flux density)", "kg*s^-2*A^-1"), +} + + +@dataclass(frozen=True) +class QuizEntry: +    """Dataclass for a quiz entry (a question and a string containing answers separated by commas).""" + +    question: str +    answer: str + + +def linear_system(q_format: str, a_format: str) -> QuizEntry: +    """Generate a system of linear equations with two unknowns.""" +    x, y = random.randint(2, 5), random.randint(2, 5) +    answer = a_format.format(x, y) + +    coeffs = random.sample(range(1, 6), 4) + +    question = q_format.format( +        coeffs[0], +        coeffs[1], +        coeffs[0] * x + coeffs[1] * y, +        coeffs[2], +        coeffs[3], +        coeffs[2] * x + coeffs[3] * y, +    ) + +    return QuizEntry(question, answer) + + +def mod_arith(q_format: str, a_format: str) -> QuizEntry: +    """Generate a basic modular arithmetic question.""" +    quotient, m, b = random.randint(30, 40), random.randint(10, 20), random.randint(200, 350) +    ans = random.randint(0, 9)  # max remainder is 9, since the minimum modulus is 10 +    a = quotient * m + ans - b + +    question = q_format.format(a, b, m) +    answer = a_format.format(ans) + +    return QuizEntry(question, answer) + + +def ngonal_prism(q_format: str, a_format: str) -> QuizEntry: +    """Generate a question regarding vertices on n-gonal prisms.""" +    n = random.randint(0, len(N_PREFIXES) - 1) + +    question = q_format.format(N_PREFIXES[n]) +    answer = a_format.format((n + N_PREFIX_STARTS_AT) * 2) + +    return QuizEntry(question, answer) + + +def imag_sqrt(q_format: str, a_format: str) -> QuizEntry: +    """Generate a negative square root question.""" +    ans_coeff = random.randint(3, 10) + +    question = q_format.format(ans_coeff ** 2) +    answer = a_format.format(ans_coeff) + +    return QuizEntry(question, answer) + + +def binary_calc(q_format: str, a_format: str) -> QuizEntry: +    """Generate a binary calculation question.""" +    a = random.randint(15, 20) +    b = random.randint(10, a) +    oper = random.choice( +        ( +            ("+", operator.add), +            ("-", operator.sub), +            ("*", operator.mul), +        ) +    ) + +    # if the operator is multiplication, lower the values of the two operands to make it easier +    if oper[0] == "*": +        a -= 5 +        b -= 5 + +    question = q_format.format(a, oper[0], b) +    answer = a_format.format(oper[1](a, b)) + +    return QuizEntry(question, answer) + + +def solar_system(q_format: str, a_format: str) -> QuizEntry: +    """Generate a question on the planets of the Solar System.""" +    planet = random.choice(PLANETS) + +    question = q_format.format(planet[0]) +    answer = a_format.format(planet[1]) + +    return QuizEntry(question, answer) + + +def taxonomic_rank(q_format: str, a_format: str) -> QuizEntry: +    """Generate a question on taxonomic classification.""" +    level = random.randint(0, len(TAXONOMIC_HIERARCHY) - 2) + +    question = q_format.format(TAXONOMIC_HIERARCHY[level]) +    answer = a_format.format(TAXONOMIC_HIERARCHY[level + 1]) + +    return QuizEntry(question, answer) + + +def base_units_convert(q_format: str, a_format: str) -> QuizEntry: +    """Generate a SI base units conversion question.""" +    unit = random.choice(list(UNITS_TO_BASE_UNITS)) + +    question = q_format.format( +        unit + " " + UNITS_TO_BASE_UNITS[unit][0] +    ) +    answer = a_format.format( +        UNITS_TO_BASE_UNITS[unit][1] +    ) + +    return QuizEntry(question, answer) + + +DYNAMIC_QUESTIONS_FORMAT_FUNCS = { +    201: linear_system, +    202: mod_arith, +    203: ngonal_prism, +    204: imag_sqrt, +    205: binary_calc, +    301: solar_system, +    302: taxonomic_rank, +    303: base_units_convert, +} +  class TriviaQuiz(commands.Cog):      """A cog for all quiz commands.""" -    def __init__(self, bot: commands.Bot) -> None: +    def __init__(self, bot: Bot) -> None:          self.bot = bot -        self.questions = self.load_questions() +          self.game_status = {}  # A variable to store the game status: either running or not running.          self.game_owners = {}  # A variable to store the person's ID who started the quiz game in a channel. -        self.question_limit = 4 + +        self.questions = self.load_questions() +        self.question_limit = 0 +          self.player_scores = {}  # A variable to store all player's scores for a bot session.          self.game_player_scores = {}  # A variable to store temporary game player's scores. +          self.categories = { -            "general": "Test your general knowledge" -            # "retro": "Questions related to retro gaming." +            "general": "Test your general knowledge.", +            "retro": "Questions related to retro gaming.", +            "math": "General questions about mathematics ranging from grade 8 to grade 12.", +            "science": "Put your understanding of science to the test!", +            "cs": "A large variety of computer science questions.", +            "python": "Trivia on our amazing language, Python!",          }      @staticmethod      def load_questions() -> dict:          """Load the questions from the JSON file."""          p = Path("bot", "resources", "evergreen", "trivia_quiz.json") -        with p.open(encoding="utf8") as json_data: -            questions = json.load(json_data) -            return questions + +        return json.loads(p.read_text(encoding="utf-8"))      @commands.group(name="quiz", aliases=["trivia"], invoke_without_command=True) -    async def quiz_game(self, ctx: commands.Context, category: str = None) -> None: +    async def quiz_game(self, ctx: commands.Context, category: Optional[str], questions: Optional[int]) -> None:          """          Start a quiz!          Questions for the quiz can be selected from the following categories: -        - general : Test your general knowledge. (default) +        - general: Test your general knowledge. +        - retro: Questions related to retro gaming. +        - math: General questions about mathematics ranging from grade 8 to grade 12. +        - science: Put your understanding of science to the test! +        - cs: A large variety of computer science questions. +        - python: Trivia on our amazing language, Python! +          (More to come!)          """          if ctx.channel.id not in self.game_status: @@ -60,11 +243,12 @@ class TriviaQuiz(commands.Cog):              self.game_player_scores[ctx.channel.id] = {}          # Stop game if running. -        if self.game_status[ctx.channel.id] is True: -            return await ctx.send( -                f"Game is already running..." +        if self.game_status[ctx.channel.id]: +            await ctx.send( +                "Game is already running... "                  f"do `{self.bot.command_prefix}quiz stop`"              ) +            return          # Send embed showing available categories if inputted category is invalid.          if category is None: @@ -76,20 +260,46 @@ class TriviaQuiz(commands.Cog):              await ctx.send(embed=embed)              return +        topic = self.questions[category] +        topic_length = len(topic) + +        if questions is None: +            self.question_limit = DEFAULT_QUESTION_LIMIT +        else: +            if questions > topic_length: +                await ctx.send( +                    embed=self.make_error_embed( +                        f"This category only has {topic_length} questions. " +                        "Please input a lower value!" +                    ) +                ) +                return + +            elif questions < 1: +                await ctx.send( +                    embed=self.make_error_embed( +                        "You must choose to complete at least one question. " +                        f"(or enter nothing for the default value of {DEFAULT_QUESTION_LIMIT + 1} questions)" +                    ) +                ) +                return + +            else: +                self.question_limit = questions - 1 +          # Start game if not running. -        if self.game_status[ctx.channel.id] is False: +        if not self.game_status[ctx.channel.id]:              self.game_owners[ctx.channel.id] = ctx.author              self.game_status[ctx.channel.id] = True              start_embed = self.make_start_embed(category)              await ctx.send(embed=start_embed)  # send an embed with the rules -            await asyncio.sleep(1) - -        topic = self.questions[category] +            await asyncio.sleep(5)          done_question = []          hint_no = 0 -        answer = None +        answers = None +          while self.game_status[ctx.channel.id]:              # Exit quiz if number of questions for a round are already sent.              if len(done_question) > self.question_limit and hint_no == 0: @@ -111,34 +321,58 @@ class TriviaQuiz(commands.Cog):                          done_question.append(question_dict["id"])                          break -                q = question_dict["question"] -                answer = question_dict["answer"] +                if "dynamic_id" not in question_dict: +                    question = question_dict["question"] +                    answers = question_dict["answer"].split(", ") + +                    var_tol = STANDARD_VARIATION_TOLERANCE +                else: +                    format_func = DYNAMIC_QUESTIONS_FORMAT_FUNCS[question_dict["dynamic_id"]] + +                    quiz_entry = format_func( +                        question_dict["question"], +                        question_dict["answer"], +                    ) + +                    question, answers = quiz_entry.question, quiz_entry.answer +                    answers = [answers] -                embed = discord.Embed(colour=discord.Colour.gold()) -                embed.title = f"Question #{len(done_question)}" -                embed.description = q -                await ctx.send(embed=embed)  # Send question embed. +                    var_tol = DYNAMICALLY_GEN_VARIATION_TOLERANCE -            # A function to check whether user input is the correct answer(close to the right answer) -            def check(m: discord.Message) -> bool: -                return ( -                    m.channel == ctx.channel -                    and fuzz.ratio(answer.lower(), m.content.lower()) > 85 +                embed = discord.Embed( +                    colour=Colours.gold, +                    title=f"Question #{len(done_question)}", +                    description=question,                  ) +                if img_url := question_dict.get("img_url"): +                    embed.set_image(url=img_url) + +                await ctx.send(embed=embed) + +            def check_func(variation_tolerance: int) -> Callable[[discord.Message], bool]: +                def contains_correct_answer(m: discord.Message) -> bool: +                    return m.channel == ctx.channel and any( +                        fuzz.ratio(answer.lower(), m.content.lower()) > variation_tolerance +                        for answer in answers +                    ) + +                return contains_correct_answer +              try: -                msg = await self.bot.wait_for('message', check=check, timeout=10) +                msg = await self.bot.wait_for("message", check=check_func(var_tol), timeout=10)              except asyncio.TimeoutError:                  # In case of TimeoutError and the game has been stopped, then do nothing. -                if self.game_status[ctx.channel.id] is False: +                if not self.game_status[ctx.channel.id]:                      break -                # if number of hints sent or time alerts sent is less than 2, then send one.                  if hint_no < 2:                      hint_no += 1 +                      if "hints" in question_dict:                          hints = question_dict["hints"] -                        await ctx.send(f"**Hint #{hint_no+1}\n**{hints[hint_no]}") + +                        await ctx.send(f"**Hint #{hint_no}\n**{hints[hint_no - 1]}")                      else:                          await ctx.send(f"{30 - hint_no * 10}s left!") @@ -151,10 +385,17 @@ class TriviaQuiz(commands.Cog):                      response = random.choice(WRONG_ANS_RESPONSE)                      await ctx.send(response) -                    await self.send_answer(ctx.channel, question_dict) + +                    await self.send_answer( +                        ctx.channel, +                        answers, +                        False, +                        question_dict, +                        self.question_limit - len(done_question) + 1, +                    )                      await asyncio.sleep(1) -                    hint_no = 0  # init hint_no = 0 so that 2 hints/time alerts can be sent for the new question. +                    hint_no = 0  # Reset the hint counter so that on the next round, it's in the initial state                      await self.send_score(ctx.channel, self.game_player_scores[ctx.channel.id])                      await asyncio.sleep(2) @@ -162,8 +403,7 @@ class TriviaQuiz(commands.Cog):                  if self.game_status[ctx.channel.id] is False:                      break -                # Reduce points by 25 for every hint/time alert that has been sent. -                points = 100 - 25*hint_no +                points = 100 - 25 * hint_no                  if msg.author in self.game_player_scores[ctx.channel.id]:                      self.game_player_scores[ctx.channel.id][msg.author] += points                  else: @@ -178,23 +418,47 @@ class TriviaQuiz(commands.Cog):                  hint_no = 0                  await ctx.send(f"{msg.author.mention} got the correct answer :tada: {points} points!") -                await self.send_answer(ctx.channel, question_dict) + +                await self.send_answer( +                    ctx.channel, +                    answers, +                    True, +                    question_dict, +                    self.question_limit - len(done_question) + 1, +                )                  await self.send_score(ctx.channel, self.game_player_scores[ctx.channel.id]) +                  await asyncio.sleep(2) -    @staticmethod -    def make_start_embed(category: str) -> discord.Embed: +    def make_start_embed(self, category: str) -> discord.Embed:          """Generate a starting/introduction embed for the quiz.""" -        start_embed = discord.Embed(colour=discord.Colour.red()) -        start_embed.title = "Quiz game Starting!!" -        start_embed.description = "Each game consists of 5 questions.\n" -        start_embed.description += "**Rules :**\nNo cheating and have fun!" -        start_embed.description += f"\n **Category** : {category}" -        start_embed.set_footer( -            text="Points for each question reduces by 25 after 10s or after a hint. Total time is 30s per question" +        start_embed = discord.Embed( +            colour=Colours.blue, +            title="A quiz game is starting!", +            description=( +                f"This game consists of {self.question_limit + 1} questions.\n\n" +                "**Rules: **\n" +                "1. Only enclose your answer in backticks when the question tells you to.\n" +                "2. If the question specifies an answer format, follow it or else it won't be accepted.\n" +                "3. You have 30s per question. Points for each question reduces by 25 after 10s or after a hint.\n" +                "4. No cheating and have fun!\n\n" +                f"**Category**: {category}" +            ),          ) +          return start_embed +    @staticmethod +    def make_error_embed(desc: str) -> discord.Embed: +        """Generate an error embed with the given description.""" +        error_embed = discord.Embed( +            colour=Colours.soft_red, +            title=random.choice(NEGATIVE_REPLIES), +            description=desc, +        ) + +        return error_embed +      @quiz_game.command(name="stop")      async def stop_quiz(self, ctx: commands.Context) -> None:          """ @@ -202,21 +466,24 @@ class TriviaQuiz(commands.Cog):          Note: Only mods or the owner of the quiz can stop it.          """ -        if self.game_status[ctx.channel.id] is True: -            # Check if the author is the game starter or a moderator. -            if ( -                ctx.author == self.game_owners[ctx.channel.id] -                or any(Roles.moderator == role.id for role in ctx.author.roles) -            ): -                await ctx.send("Quiz stopped.") -                await self.declare_winner(ctx.channel, self.game_player_scores[ctx.channel.id]) +        try: +            if self.game_status[ctx.channel.id]: +                # Check if the author is the game starter or a moderator. +                if ctx.author == self.game_owners[ctx.channel.id] or any( +                    Roles.moderator == role.id for role in ctx.author.roles +                ): +                    self.game_status[ctx.channel.id] = False +                    del self.game_owners[ctx.channel.id] +                    self.game_player_scores[ctx.channel.id] = {} + +                    await ctx.send("Quiz stopped.") +                    await self.declare_winner(ctx.channel, self.game_player_scores[ctx.channel.id]) -                self.game_status[ctx.channel.id] = False -                del self.game_owners[ctx.channel.id] -                self.game_player_scores[ctx.channel.id] = {} +                else: +                    await ctx.send(f"{ctx.author.mention}, you are not authorised to stop this game :ghost:!")              else: -                await ctx.send(f"{ctx.author.mention}, you are not authorised to stop this game :ghost:!") -        else: +                await ctx.send("No quiz running.") +        except KeyError:              await ctx.send("No quiz running.")      @quiz_game.command(name="leaderboard") @@ -226,18 +493,20 @@ class TriviaQuiz(commands.Cog):      @staticmethod      async def send_score(channel: discord.TextChannel, player_data: dict) -> None: -        """A function which sends the score.""" +        """Send the current scores of players in the game channel."""          if len(player_data) == 0:              await channel.send("No one has made it onto the leaderboard yet.")              return -        embed = discord.Embed(colour=discord.Colour.blue()) -        embed.title = "Score Board" -        embed.description = "" +        embed = discord.Embed( +            colour=Colours.blue, +            title="Score Board", +            description="", +        ) -        sorted_dict = sorted(player_data.items(), key=lambda a: a[1], reverse=True) +        sorted_dict = sorted(player_data.items(), key=operator.itemgetter(1), reverse=True)          for item in sorted_dict: -            embed.description += f"{item[0]} : {item[1]}\n" +            embed.description += f"{item[0]}: {item[1]}\n"          await channel.send(embed=embed) @@ -250,7 +519,6 @@ class TriviaQuiz(commands.Cog):              # Check if more than 1 player has highest points.              if no_of_winners > 1: -                word = "You guys"                  winners = []                  points_copy = list(player_data.values()).copy() @@ -261,44 +529,65 @@ class TriviaQuiz(commands.Cog):                  winners_mention = " ".join(winner.mention for winner in winners)              else: -                word = "You"                  author_index = list(player_data.values()).index(highest_points)                  winner = list(player_data.keys())[author_index]                  winners_mention = winner.mention              await channel.send(                  f"Congratulations {winners_mention} :tada: " -                f"{word} have won this quiz game with a grand total of {highest_points} points!" +                f"You have won this quiz game with a grand total of {highest_points} points!"              )      def category_embed(self) -> discord.Embed:          """Build an embed showing all available trivia categories.""" -        embed = discord.Embed(colour=discord.Colour.blue()) -        embed.title = "The available question categories are:" +        embed = discord.Embed( +            colour=Colours.blue, +            title="The available question categories are:", +            description="", +        ) +          embed.set_footer(text="If a category is not chosen, a random one will be selected.") -        embed.description = ""          for cat, description in self.categories.items(): -            embed.description += f"**- {cat.capitalize()}**\n{description.capitalize()}\n" +            embed.description += ( +                f"**- {cat.capitalize()}**\n" +                f"{description.capitalize()}\n" +            )          return embed      @staticmethod -    async def send_answer(channel: discord.TextChannel, question_dict: dict) -> None: +    async def send_answer( +        channel: discord.TextChannel, +        answers: List[str], +        answer_is_correct: bool, +        question_dict: dict, +        q_left: int, +    ) -> None:          """Send the correct answer of a question to the game channel.""" -        answer = question_dict["answer"] -        info = question_dict["info"] -        embed = discord.Embed(color=discord.Colour.red()) -        embed.title = f"The correct answer is **{answer}**\n" -        embed.description = "" +        info = question_dict.get("info") + +        plurality = " is" if len(answers) == 1 else "s are" -        if info != "": +        embed = discord.Embed( +            color=Colours.bright_green, +            title=( +                ("You got it! " if answer_is_correct else "") +                + f"The correct answer{plurality} **`{', '.join(answers)}`**\n" +            ), +            description="", +        ) + +        if info is not None:              embed.description += f"**Information**\n{info}\n\n" -        embed.description += "Let's move to the next question.\nRemaining questions: " +        embed.description += ( +            ("Let's move to the next question." if q_left > 0 else "") +            + f"\nRemaining questions: {q_left}" +        )          await channel.send(embed=embed) -def setup(bot: commands.Bot) -> None: -    """Load the cog.""" +def setup(bot: Bot) -> None: +    """Load the TriviaQuiz cog."""      bot.add_cog(TriviaQuiz(bot)) diff --git a/bot/exts/evergreen/uptime.py b/bot/exts/evergreen/uptime.py deleted file mode 100644 index a9ad9dfb..00000000 --- a/bot/exts/evergreen/uptime.py +++ /dev/null @@ -1,33 +0,0 @@ -import logging - -import arrow -from dateutil.relativedelta import relativedelta -from discord.ext import commands - -from bot import start_time - -log = logging.getLogger(__name__) - - -class Uptime(commands.Cog): -    """A cog for posting the bot's uptime.""" - -    def __init__(self, bot: commands.Bot): -        self.bot = bot - -    @commands.command(name="uptime") -    async def uptime(self, ctx: commands.Context) -> None: -        """Responds with the uptime of the bot.""" -        difference = relativedelta(start_time - arrow.utcnow()) -        uptime_string = start_time.shift( -            seconds=-difference.seconds, -            minutes=-difference.minutes, -            hours=-difference.hours, -            days=-difference.days -        ).humanize() -        await ctx.send(f"I started up {uptime_string}.") - - -def setup(bot: commands.Bot) -> None: -    """Uptime Cog load.""" -    bot.add_cog(Uptime(bot)) diff --git a/bot/exts/evergreen/wikipedia.py b/bot/exts/evergreen/wikipedia.py index 068c4f43..27e68397 100644 --- a/bot/exts/evergreen/wikipedia.py +++ b/bot/exts/evergreen/wikipedia.py @@ -2,25 +2,35 @@ import logging  import re  from datetime import datetime  from html import unescape -from typing import List, Optional +from typing import List  from discord import Color, Embed, TextChannel  from discord.ext import commands  from bot.bot import Bot  from bot.utils import LinePaginator +from bot.utils.exceptions import APIError  log = logging.getLogger(__name__)  SEARCH_API = ( -    "https://en.wikipedia.org/w/api.php?action=query&list=search&prop=info&inprop=url&utf8=&" -    "format=json&origin=*&srlimit={number_of_results}&srsearch={string}" +    "https://en.wikipedia.org/w/api.php"  ) +WIKI_PARAMS = { +    "action": "query", +    "list": "search", +    "prop": "info", +    "inprop": "url", +    "utf8": "", +    "format": "json", +    "origin": "*", + +}  WIKI_THUMBNAIL = (      "https://upload.wikimedia.org/wikipedia/en/thumb/8/80/Wikipedia-logo-v2.svg"      "/330px-Wikipedia-logo-v2.svg.png"  ) -WIKI_SNIPPET_REGEX = r'(<!--.*?-->|<[^>]*>)' +WIKI_SNIPPET_REGEX = r"(<!--.*?-->|<[^>]*>)"  WIKI_SEARCH_RESULT = (      "**[{name}]({url})**\n"      "{description}\n" @@ -33,46 +43,39 @@ class WikipediaSearch(commands.Cog):      def __init__(self, bot: Bot):          self.bot = bot -    async def wiki_request(self, channel: TextChannel, search: str) -> Optional[List[str]]: +    async def wiki_request(self, channel: TextChannel, search: str) -> List[str]:          """Search wikipedia search string and return formatted first 10 pages found.""" -        url = SEARCH_API.format(number_of_results=10, string=search) -        async with self.bot.http_session.get(url=url) as resp: -            if resp.status == 200: -                raw_data = await resp.json() -                number_of_results = raw_data['query']['searchinfo']['totalhits'] - -                if number_of_results: -                    results = raw_data['query']['search'] -                    lines = [] - -                    for article in results: -                        line = WIKI_SEARCH_RESULT.format( -                            name=article['title'], -                            description=unescape( -                                re.sub( -                                    WIKI_SNIPPET_REGEX, '', article['snippet'] -                                ) -                            ), -                            url=f"https://en.wikipedia.org/?curid={article['pageid']}" -                        ) -                        lines.append(line) - -                    return lines - -                else: -                    await channel.send( -                        "Sorry, we could not find a wikipedia article using that search term." -                    ) -                    return -            else: +        params = WIKI_PARAMS | {"srlimit": 10, "srsearch": search} +        async with self.bot.http_session.get(url=SEARCH_API, params=params) as resp: +            if resp.status != 200:                  log.info(f"Unexpected response `{resp.status}` while searching wikipedia for `{search}`") -                await channel.send( -                    "Whoops, the Wikipedia API is having some issues right now. Try again later." -                ) -                return +                raise APIError("Wikipedia API", resp.status) + +            raw_data = await resp.json() + +            if not raw_data.get("query"): +                if error := raw_data.get("errors"): +                    log.error(f"There was an error while communicating with the Wikipedia API: {error}") +                raise APIError("Wikipedia API", resp.status, error) + +            lines = [] +            if raw_data["query"]["searchinfo"]["totalhits"]: +                for article in raw_data["query"]["search"]: +                    line = WIKI_SEARCH_RESULT.format( +                        name=article["title"], +                        description=unescape( +                            re.sub( +                                WIKI_SNIPPET_REGEX, "", article["snippet"] +                            ) +                        ), +                        url=f"https://en.wikipedia.org/?curid={article['pageid']}" +                    ) +                    lines.append(line) + +            return lines      @commands.cooldown(1, 10, commands.BucketType.user) -    @commands.command(name="wikipedia", aliases=["wiki"]) +    @commands.command(name="wikipedia", aliases=("wiki",))      async def wikipedia_search_command(self, ctx: commands.Context, *, search: str) -> None:          """Sends paginated top 10 results of Wikipedia search.."""          contents = await self.wiki_request(ctx.channel, search) @@ -87,8 +90,12 @@ class WikipediaSearch(commands.Cog):              await LinePaginator.paginate(                  contents, ctx, embed              ) +        else: +            await ctx.send( +                "Sorry, we could not find a wikipedia article using that search term." +            )  def setup(bot: Bot) -> None: -    """Wikipedia Cog load.""" +    """Load the WikipediaSearch cog."""      bot.add_cog(WikipediaSearch(bot)) diff --git a/bot/exts/evergreen/wolfram.py b/bot/exts/evergreen/wolfram.py index 437d9e1a..26674d37 100644 --- a/bot/exts/evergreen/wolfram.py +++ b/bot/exts/evergreen/wolfram.py @@ -1,7 +1,7 @@  import logging  from io import BytesIO  from typing import Callable, List, Optional, Tuple -from urllib import parse +from urllib.parse import urlencode  import arrow  import discord @@ -9,6 +9,7 @@ from discord import Embed  from discord.ext import commands  from discord.ext.commands import BucketType, Cog, Context, check, group +from bot.bot import Bot  from bot.constants import Colours, STAFF_ROLES, Wolfram  from bot.utils.pagination import ImagePaginator @@ -16,7 +17,7 @@ log = logging.getLogger(__name__)  APPID = Wolfram.key  DEFAULT_OUTPUT_FORMAT = "JSON" -QUERY = "http://api.wolframalpha.com/v2/{request}?{data}" +QUERY = "http://api.wolframalpha.com/v2/{request}"  WOLF_IMAGE = "https://www.symbols.com/gi.php?type=1&id=2886&i=1"  MAX_PODS = 20 @@ -39,9 +40,11 @@ async def send_embed(      """Generate & send a response embed with Wolfram as the author."""      embed = Embed(colour=colour)      embed.description = message_txt -    embed.set_author(name="Wolfram Alpha", -                     icon_url=WOLF_IMAGE, -                     url="https://www.wolframalpha.com/") +    embed.set_author( +        name="Wolfram Alpha", +        icon_url=WOLF_IMAGE, +        url="https://www.wolframalpha.com/" +    )      if footer:          embed.set_footer(text=footer) @@ -55,14 +58,15 @@ def custom_cooldown(*ignore: List[int]) -> Callable:      """      Implement per-user and per-guild cooldowns for requests to the Wolfram API. -    A list of roles may be provided to ignore the per-user cooldown +    A list of roles may be provided to ignore the per-user cooldown.      """      async def predicate(ctx: Context) -> bool: -        if ctx.invoked_with == 'help': +        if ctx.invoked_with == "help":              # if the invoked command is help we don't want to increase the ratelimits since it's not actually              # invoking the command/making a request, so instead just check if the user/guild are on cooldown.              guild_cooldown = not guildcd.get_bucket(ctx.message).get_tokens() == 0  # if guild is on cooldown -            if not any(r.id in ignore for r in ctx.author.roles):  # check user bucket if user is not ignored +            # check the message is in a guild, and check user bucket if user is not ignored +            if ctx.guild and not any(r.id in ignore for r in ctx.author.roles):                  return guild_cooldown and not usercd.get_bucket(ctx.message).get_tokens() == 0              return guild_cooldown @@ -101,10 +105,10 @@ def custom_cooldown(*ignore: List[int]) -> Callable:      return check(predicate) -async def get_pod_pages(ctx: Context, bot: commands.Bot, query: str) -> Optional[List[Tuple]]: +async def get_pod_pages(ctx: Context, bot: Bot, query: str) -> Optional[List[Tuple]]:      """Get the Wolfram API pod pages for the provided query.""" -    async with ctx.channel.typing(): -        url_str = parse.urlencode({ +    async with ctx.typing(): +        params = {              "input": query,              "appid": APPID,              "output": DEFAULT_OUTPUT_FORMAT, @@ -112,27 +116,27 @@ async def get_pod_pages(ctx: Context, bot: commands.Bot, query: str) -> Optional              "location": "the moon",              "latlong": "0.0,0.0",              "ip": "1.1.1.1" -        }) -        request_url = QUERY.format(request="query", data=url_str) +        } +        request_url = QUERY.format(request="query") -        async with bot.http_session.get(request_url) as response: -            json = await response.json(content_type='text/plain') +        async with bot.http_session.get(url=request_url, params=params) as response: +            json = await response.json(content_type="text/plain")          result = json["queryresult"] - +        log_full_url = f"{request_url}?{urlencode(params)}"          if result["error"]:              # API key not set up correctly              if result["error"]["msg"] == "Invalid appid":                  message = "Wolfram API key is invalid or missing."                  log.warning(                      "API key seems to be missing, or invalid when " -                    f"processing a wolfram request: {url_str}, Response: {json}" +                    f"processing a wolfram request: {log_full_url}, Response: {json}"                  )                  await send_embed(ctx, message)                  return              message = "Something went wrong internally with your request, please notify staff!" -            log.warning(f"Something went wrong getting a response from wolfram: {url_str}, Response: {json}") +            log.warning(f"Something went wrong getting a response from wolfram: {log_full_url}, Response: {json}")              await send_embed(ctx, message)              return @@ -161,25 +165,25 @@ async def get_pod_pages(ctx: Context, bot: commands.Bot, query: str) -> Optional  class Wolfram(Cog):      """Commands for interacting with the Wolfram|Alpha API.""" -    def __init__(self, bot: commands.Bot): +    def __init__(self, bot: Bot):          self.bot = bot      @group(name="wolfram", aliases=("wolf", "wa"), invoke_without_command=True)      @custom_cooldown(*STAFF_ROLES)      async def wolfram_command(self, ctx: Context, *, query: str) -> None:          """Requests all answers on a single image, sends an image of all related pods.""" -        url_str = parse.urlencode({ +        params = {              "i": query,              "appid": APPID,              "location": "the moon",              "latlong": "0.0,0.0",              "ip": "1.1.1.1" -        }) -        query = QUERY.format(request="simple", data=url_str) +        } +        request_url = QUERY.format(request="simple")          # Give feedback that the bot is working. -        async with ctx.channel.typing(): -            async with self.bot.http_session.get(query) as response: +        async with ctx.typing(): +            async with self.bot.http_session.get(url=request_url, params=params) as response:                  status = response.status                  image_bytes = await response.read() @@ -187,11 +191,11 @@ class Wolfram(Cog):              image_url = "attachment://image.png"              if status == 501: -                message = "Failed to get response" +                message = "Failed to get response."                  footer = ""                  color = Colours.soft_red              elif status == 400: -                message = "No input found" +                message = "No input found."                  footer = ""                  color = Colours.soft_red              elif status == 403: @@ -220,9 +224,11 @@ class Wolfram(Cog):              return          embed = Embed() -        embed.set_author(name="Wolfram Alpha", -                         icon_url=WOLF_IMAGE, -                         url="https://www.wolframalpha.com/") +        embed.set_author( +            name="Wolfram Alpha", +            icon_url=WOLF_IMAGE, +            url="https://www.wolframalpha.com/" +        )          embed.colour = Colours.soft_orange          await ImagePaginator.paginate(pages, ctx, embed) @@ -251,28 +257,28 @@ class Wolfram(Cog):      @custom_cooldown(*STAFF_ROLES)      async def wolfram_short_command(self, ctx: Context, *, query: str) -> None:          """Requests an answer to a simple question.""" -        url_str = parse.urlencode({ +        params = {              "i": query,              "appid": APPID,              "location": "the moon",              "latlong": "0.0,0.0",              "ip": "1.1.1.1" -        }) -        query = QUERY.format(request="result", data=url_str) +        } +        request_url = QUERY.format(request="result")          # Give feedback that the bot is working. -        async with ctx.channel.typing(): -            async with self.bot.http_session.get(query) as response: +        async with ctx.typing(): +            async with self.bot.http_session.get(url=request_url, params=params) as response:                  status = response.status                  response_text = await response.text()              if status == 501: -                message = "Failed to get response" +                message = "Failed to get response."                  color = Colours.soft_red              elif status == 400: -                message = "No input found" +                message = "No input found."                  color = Colours.soft_red -            elif response_text == "Error 1: Invalid appid": +            elif response_text == "Error 1: Invalid appid.":                  message = "Wolfram API key is invalid or missing."                  color = Colours.soft_red              else: @@ -282,6 +288,6 @@ class Wolfram(Cog):              await send_embed(ctx, message, color) -def setup(bot: commands.Bot) -> None: +def setup(bot: Bot) -> None:      """Load the Wolfram cog."""      bot.add_cog(Wolfram(bot)) diff --git a/bot/exts/evergreen/wonder_twins.py b/bot/exts/evergreen/wonder_twins.py index afc5346e..40edf785 100644 --- a/bot/exts/evergreen/wonder_twins.py +++ b/bot/exts/evergreen/wonder_twins.py @@ -2,15 +2,15 @@ import random  from pathlib import Path  import yaml -from discord.ext.commands import Bot, Cog, Context, command +from discord.ext.commands import Cog, Context, command + +from bot.bot import Bot  class WonderTwins(Cog):      """Cog for a Wonder Twins inspired command.""" -    def __init__(self, bot: Bot): -        self.bot = bot - +    def __init__(self):          with open(Path.cwd() / "bot" / "resources" / "evergreen" / "wonder_twins.yaml", "r", encoding="utf-8") as f:              info = yaml.load(f, Loader=yaml.FullLoader)              self.water_types = info["water_types"] @@ -38,7 +38,7 @@ class WonderTwins(Cog):              object_name = self.append_onto(adjective, object_name)          return f"{object_name} of {water_type}" -    @command(name="formof", aliases=["wondertwins", "wondertwin", "fo"]) +    @command(name="formof", aliases=("wondertwins", "wondertwin", "fo"))      async def form_of(self, ctx: Context) -> None:          """Command to send a Wonder Twins inspired phrase to the user invoking the command."""          await ctx.send(f"Form of {self.format_phrase()}!") @@ -46,4 +46,4 @@ class WonderTwins(Cog):  def setup(bot: Bot) -> None:      """Load the WonderTwins cog.""" -    bot.add_cog(WonderTwins(bot)) +    bot.add_cog(WonderTwins()) diff --git a/bot/exts/evergreen/xkcd.py b/bot/exts/evergreen/xkcd.py index 1ff98ca2..c98830bc 100644 --- a/bot/exts/evergreen/xkcd.py +++ b/bot/exts/evergreen/xkcd.py @@ -53,7 +53,7 @@ class XKCD(Cog):              await ctx.send(embed=embed)              return -        comic = randint(1, self.latest_comic_info['num']) if comic is None else comic.group(0) +        comic = randint(1, self.latest_comic_info["num"]) if comic is None else comic.group(0)          if comic == "latest":              info = self.latest_comic_info @@ -69,7 +69,7 @@ class XKCD(Cog):                      return          embed.title = f"XKCD comic #{info['num']}" -        embed.description = info['alt'] +        embed.description = info["alt"]          embed.url = f"{BASE_URL}/{info['num']}"          if info["img"][-3:] in ("jpg", "png", "gif"): @@ -87,5 +87,5 @@ class XKCD(Cog):  def setup(bot: Bot) -> None: -    """Loading the XKCD cog.""" +    """Load the XKCD cog."""      bot.add_cog(XKCD(bot)) | 
