diff options
Diffstat (limited to 'bot/exts/evergreen')
46 files changed, 0 insertions, 9801 deletions
diff --git a/bot/exts/evergreen/__init__.py b/bot/exts/evergreen/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/bot/exts/evergreen/__init__.py +++ /dev/null diff --git a/bot/exts/evergreen/avatar_modification/__init__.py b/bot/exts/evergreen/avatar_modification/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/bot/exts/evergreen/avatar_modification/__init__.py +++ /dev/null diff --git a/bot/exts/evergreen/avatar_modification/_effects.py b/bot/exts/evergreen/avatar_modification/_effects.py deleted file mode 100644 index df741973..00000000 --- a/bot/exts/evergreen/avatar_modification/_effects.py +++ /dev/null @@ -1,296 +0,0 @@ -import math -import random -from io import BytesIO -from pathlib import Path -from typing import Callable, Optional - -import discord -from PIL import Image, ImageDraw, ImageOps - -from bot.constants import Colours - - -class PfpEffects: - """ - Implements various image modifying effects, for the PfpModify cog. - - All of these functions are slow, and blocking, so they should be ran in executors. - """ - - @staticmethod - def apply_effect(image_bytes: bytes, effect: Callable, filename: str, *args) -> discord.File: - """Applies the given effect to the image passed to it.""" - im = Image.open(BytesIO(image_bytes)) - im = im.convert("RGBA") - im = im.resize((1024, 1024)) - im = effect(im, *args) - - bufferedio = BytesIO() - im.save(bufferedio, format="PNG") - bufferedio.seek(0) - - return discord.File(bufferedio, filename=filename) - - @staticmethod - def closest(x: tuple[int, int, int]) -> tuple[int, int, int]: - """ - Finds the closest "easter" colour to a given pixel. - - Returns a merge between the original colour and the closest colour. - """ - r1, g1, b1 = x - - def distance(point: tuple[int, int, int]) -> int: - """Finds the difference between a pastel colour and the original pixel colour.""" - r2, g2, b2 = point - return (r1 - r2) ** 2 + (g1 - g2) ** 2 + (b1 - b2) ** 2 - - closest_colours = sorted(Colours.easter_like_colours, key=distance) - r2, g2, b2 = closest_colours[0] - r = (r1 + r2) // 2 - g = (g1 + g2) // 2 - b = (b1 + b2) // 2 - - return r, g, b - - @staticmethod - def crop_avatar_circle(avatar: Image.Image) -> Image.Image: - """This crops the avatar given into a circle.""" - mask = Image.new("L", avatar.size, 0) - draw = ImageDraw.Draw(mask) - draw.ellipse((0, 0) + avatar.size, fill=255) - avatar.putalpha(mask) - return avatar - - @staticmethod - def crop_ring(ring: Image.Image, px: int) -> Image.Image: - """This crops the given ring into a circle.""" - mask = Image.new("L", ring.size, 0) - draw = ImageDraw.Draw(mask) - draw.ellipse((0, 0) + ring.size, fill=255) - draw.ellipse((px, px, 1024-px, 1024-px), fill=0) - ring.putalpha(mask) - return ring - - @staticmethod - def pridify_effect(image: Image.Image, pixels: int, flag: str) -> Image.Image: - """Applies the given pride effect to the given image.""" - image = PfpEffects.crop_avatar_circle(image) - - ring = Image.open(Path(f"bot/resources/pride/flags/{flag}.png")).resize((1024, 1024)) - ring = ring.convert("RGBA") - ring = PfpEffects.crop_ring(ring, pixels) - - image.alpha_composite(ring, (0, 0)) - return image - - @staticmethod - def eight_bitify_effect(image: Image.Image) -> Image.Image: - """ - Applies the 8bit effect to the given image. - - This is done by reducing the image to 32x32 and then back up to 1024x1024. - We then quantize the image before returning too. - """ - image = image.resize((32, 32), resample=Image.NEAREST) - image = image.resize((1024, 1024), resample=Image.NEAREST) - return image.quantize() - - @staticmethod - def flip_effect(image: Image.Image) -> Image.Image: - """ - Flips the image horizontally. - - This is done by just using ImageOps.mirror(). - """ - image = ImageOps.mirror(image) - - return image - - @staticmethod - def easterify_effect(image: Image.Image, overlay_image: Optional[Image.Image] = None) -> Image.Image: - """ - Applies the easter effect to the given image. - - This is done by getting the closest "easter" colour to each pixel and changing the colour - to the half-way RGB value. - - We also then add an overlay image on top in middle right, a chocolate bunny by default. - """ - if overlay_image: - ratio = 64 / overlay_image.height - overlay_image = overlay_image.resize(( - round(overlay_image.width * ratio), - round(overlay_image.height * ratio) - )) - overlay_image = overlay_image.convert("RGBA") - else: - overlay_image = Image.open(Path("bot/resources/easter/chocolate_bunny.png")) - - alpha = image.getchannel("A").getdata() - image = image.convert("RGB") - image = ImageOps.posterize(image, 6) - - data = image.getdata() - data_set = set(data) - easterified_data_set = {} - - for x in data_set: - easterified_data_set[x] = PfpEffects.closest(x) - new_pixel_data = [ - (*easterified_data_set[x], alpha[i]) - if x in easterified_data_set else x - for i, x in enumerate(data) - ] - - im = Image.new("RGBA", image.size) - im.putdata(new_pixel_data) - im.alpha_composite( - overlay_image, - (im.width - overlay_image.width, (im.height - overlay_image.height) // 2) - ) - return im - - @staticmethod - def split_image(img: Image.Image, squares: int) -> list: - """ - Split an image into a selection of squares, specified by the squares argument. - - Explanation: - - 1. It gets the width and the height of the Image passed to the function. - - 2. It gets the root of a number of squares (number of squares) passed, which is called xy. Reason: if let's say - 25 squares (number of squares) were passed, that is the total squares (split pieces) that the image is supposed - to be split into. As it is known, a 2D shape has a height and a width, and in this case the program thinks of it - as rows and columns. Rows multiplied by columns is equal to the passed squares (number of squares). To get rows - and columns, since in this case, each square (split piece) is identical, rows are equal to columns and the - program treats the image as a square-shaped, it gets the root out of the squares (number of squares) passed. - - 3. Now width and height are both of the original Image, Discord PFP, so when it comes to forming the squares, - the program divides the original image's height and width by the xy. In a case of 25 squares (number of squares) - passed, xy would be 5, so if an image was 250x300, x_frac would be 50 and y_frac - 60. Note: - x_frac stands for a fracture of width. The reason it's called that is because it is shorter to use x for width - in mind and then it's just half of the word fracture, same applies to y_frac, just height instead of width. - x_frac and y_frac are width and height of a single square (split piece). - - 4. With left, top, right, bottom, = 0, 0, x_frac, y_frac, the program sets these variables to create the initial - square (split piece). Explanation: all of these 4 variables start at the top left corner of the Image, by adding - value to right and bottom, it's creating the initial square (split piece). - - 5. In the for loop, it keeps adding those squares (split pieces) in a row and once (index + 1) % xy == 0 is - True, it adds to top and bottom to lower them and reset right and left to recreate the initial space between - them, forming a square (split piece), it also adds the newly created square (split piece) into the new_imgs list - where it stores them. The program keeps repeating this process till all 25 squares get added to the list. - - 6. It returns new_imgs, a list of squares (split pieces). - """ - width, heigth = img.size - - xy = math.sqrt(squares) - - x_frac = width // xy - y_frac = heigth // xy - - left, top, right, bottom, = 0, 0, x_frac, y_frac - - new_imgs = [] - - for index in range(squares): - new_img = img.crop((left, top, right, bottom)) - new_imgs.append(new_img) - - if (index + 1) % xy == 0: - top += y_frac - bottom += y_frac - left = 0 - right = x_frac - else: - left += x_frac - right += x_frac - - return new_imgs - - @staticmethod - def join_images(images: list[Image.Image]) -> Image.Image: - """ - Stitches all the image squares into a new image. - - Explanation: - - 1. Shuffles the passed images to randomize the pieces. - - 2. The program gets a single square (split piece) out of the list and defines single_width as the square's width - and single_height as the square's height. - - 3. It gets the root of type integer of the number of images (split pieces) in the list and calls it multiplier. - Program then proceeds to calculate total height and width of the new image that it's creating using the same - multiplier. - - 4. The program then defines new_image as the image that it's creating, using the previously obtained total_width - and total_height. - - 5. Now it defines width_multiplier as well as height with values of 0. These will be used to correctly position - squares (split pieces) onto the new_image canvas. - - 6. Similar to how in the split_image function, the program gets the root of number of images in the list. - In split_image function, it was the passed squares (number of squares) instead of a number of imgs in the - list that it got the square of here. - - 7. In the for loop, as it iterates, the program multiplies single_width by width_multiplier to correctly - position a square (split piece) width wise. It then proceeds to paste the newly positioned square (split piece) - onto the new_image. The program increases the width_multiplier by 1 every iteration so the image wouldn't get - pasted in the same spot and the positioning would move accordingly. It makes sure to increase the - width_multiplier before the check, which checks if the end of a row has been reached, - - (index + 1) % pieces == 0, so after it, if it was True, width_multiplier would have been reset to 0 (start of - the row). If the check returns True, the height gets increased by a single square's (split piece) height to - lower the positioning height wise and, as mentioned, the width_multiplier gets reset to 0 and width will - then be calculated from the start of the new row. The for loop finishes once all the squares (split pieces) were - positioned accordingly. - - 8. Finally, it returns the new_image, the randomized squares (split pieces) stitched back into the format of the - original image - user's PFP. - """ - random.shuffle(images) - single_img = images[0] - - single_wdith = single_img.size[0] - single_height = single_img.size[1] - - multiplier = int(math.sqrt(len(images))) - - total_width = multiplier * single_wdith - total_height = multiplier * single_height - - new_image = Image.new("RGBA", (total_width, total_height), (250, 250, 250)) - - width_multiplier = 0 - height = 0 - - squares = math.sqrt(len(images)) - - for index, image in enumerate(images): - width = single_wdith * width_multiplier - - new_image.paste(image, (width, height)) - - width_multiplier += 1 - - if (index + 1) % squares == 0: - width_multiplier = 0 - height += single_height - - return new_image - - @staticmethod - def mosaic_effect(image: Image.Image, squares: int) -> Image.Image: - """ - Applies a mosaic effect to the given image. - - The "squares" argument specifies the number of squares to split - the image into. This should be a square number. - """ - img_squares = PfpEffects.split_image(image, squares) - new_img = PfpEffects.join_images(img_squares) - - return new_img diff --git a/bot/exts/evergreen/avatar_modification/avatar_modify.py b/bot/exts/evergreen/avatar_modification/avatar_modify.py deleted file mode 100644 index 18202902..00000000 --- a/bot/exts/evergreen/avatar_modification/avatar_modify.py +++ /dev/null @@ -1,372 +0,0 @@ -import asyncio -import json -import logging -import math -import string -import unicodedata -from concurrent.futures import ThreadPoolExecutor -from pathlib import Path -from typing import Callable, Optional, TypeVar, Union - -import discord -from discord.ext import commands - -from bot.bot import Bot -from bot.constants import Colours, Emojis -from bot.exts.evergreen.avatar_modification._effects import PfpEffects -from bot.utils.extensions import invoke_help_command -from bot.utils.halloween import spookifications - -log = logging.getLogger(__name__) - -_EXECUTOR = ThreadPoolExecutor(10) - -FILENAME_STRING = "{effect}_{author}.png" - -MAX_SQUARES = 10_000 - -T = TypeVar("T") - -GENDER_OPTIONS = json.loads(Path("bot/resources/pride/gender_options.json").read_text("utf8")) - - -async def in_executor(func: Callable[..., T], *args) -> T: - """ - Runs the given synchronous function `func` in an executor. - - This is useful for running slow, blocking code within async - functions, so that they don't block the bot. - """ - log.trace(f"Running {func.__name__} in an executor.") - loop = asyncio.get_event_loop() - return await loop.run_in_executor(_EXECUTOR, func, *args) - - -def file_safe_name(effect: str, display_name: str) -> str: - """Returns a file safe filename based on the given effect and display name.""" - valid_filename_chars = f"-_. {string.ascii_letters}{string.digits}" - - file_name = FILENAME_STRING.format(effect=effect, author=display_name) - - # Replace spaces - file_name = file_name.replace(" ", "_") - - # Normalize unicode characters - cleaned_filename = unicodedata.normalize("NFKD", file_name).encode("ASCII", "ignore").decode() - - # Remove invalid filename characters - cleaned_filename = "".join(c for c in cleaned_filename if c in valid_filename_chars) - return cleaned_filename - - -class AvatarModify(commands.Cog): - """Various commands for users to apply affects to their own avatars.""" - - def __init__(self, bot: Bot): - self.bot = bot - - async def _fetch_user(self, user_id: int) -> Optional[discord.User]: - """ - Fetches a user and handles errors. - - This helper function is required as the member cache doesn't always have the most up to date - profile picture. This can lead to errors if the image is deleted from the Discord CDN. - fetch_member can't be used due to the avatar url being part of the user object, and - some weird caching that D.py does - """ - try: - user = await self.bot.fetch_user(user_id) - except discord.errors.NotFound: - log.debug(f"User {user_id} could not be found.") - return None - except discord.HTTPException: - log.exception(f"Exception while trying to retrieve user {user_id} from Discord.") - return None - - return user - - @commands.group(aliases=("avatar_mod", "pfp_mod", "avatarmod", "pfpmod")) - async def avatar_modify(self, ctx: commands.Context) -> None: - """Groups all of the pfp modifying commands to allow a single concurrency limit.""" - if not ctx.invoked_subcommand: - await invoke_help_command(ctx) - - @avatar_modify.command(name="8bitify", root_aliases=("8bitify",)) - async def eightbit_command(self, ctx: commands.Context) -> None: - """Pixelates your avatar and changes the palette to an 8bit one.""" - async with ctx.typing(): - user = await self._fetch_user(ctx.author.id) - if not user: - await ctx.send(f"{Emojis.cross_mark} Could not get user info.") - return - - image_bytes = await user.display_avatar.replace(size=1024).read() - file_name = file_safe_name("eightbit_avatar", ctx.author.display_name) - - file = await in_executor( - PfpEffects.apply_effect, - image_bytes, - PfpEffects.eight_bitify_effect, - file_name - ) - - embed = discord.Embed( - title="Your 8-bit avatar", - description="Here is your avatar. I think it looks all cool and 'retro'." - ) - - embed.set_image(url=f"attachment://{file_name}") - embed.set_footer(text=f"Made by {ctx.author.display_name}.", icon_url=user.display_avatar.url) - - await ctx.send(embed=embed, file=file) - - @avatar_modify.command(name="reverse", root_aliases=("reverse",)) - async def reverse(self, ctx: commands.Context, *, text: Optional[str]) -> None: - """ - Reverses the sent text. - - If no text is provided, the user's profile picture will be reversed. - """ - if text: - await ctx.send(f"> {text[::-1]}", allowed_mentions=discord.AllowedMentions.none()) - return - - async with ctx.typing(): - user = await self._fetch_user(ctx.author.id) - if not user: - await ctx.send(f"{Emojis.cross_mark} Could not get user info.") - return - - image_bytes = await user.display_avatar.replace(size=1024).read() - filename = file_safe_name("reverse_avatar", ctx.author.display_name) - - file = await in_executor( - PfpEffects.apply_effect, - image_bytes, - PfpEffects.flip_effect, - filename - ) - - embed = discord.Embed( - title="Your reversed avatar.", - description="Here is your reversed avatar. I think it is a spitting image of you." - ) - - embed.set_image(url=f"attachment://{filename}") - embed.set_footer(text=f"Made by {ctx.author.display_name}.", icon_url=user.display_avatar.url) - - await ctx.send(embed=embed, file=file) - - @avatar_modify.command(aliases=("easterify",), root_aliases=("easterify", "avatareasterify")) - async def avatareasterify(self, ctx: commands.Context, *colours: Union[discord.Colour, str]) -> None: - """ - This "Easterifies" the user's avatar. - - Given colours will produce a personalised egg in the corner, similar to the egg_decorate command. - If colours are not given, a nice little chocolate bunny will sit in the corner. - Colours are split by spaces, unless you wrap the colour name in double quotes. - Discord colour names, HTML colour names, XKCD colour names and hex values are accepted. - """ - async def send(*args, **kwargs) -> str: - """ - This replaces the original ctx.send. - - When invoking the egg decorating command, the egg itself doesn't print to to the channel. - Returns the message content so that if any errors occur, the error message can be output. - """ - if args: - return args[0] - - async with ctx.typing(): - user = await self._fetch_user(ctx.author.id) - if not user: - await ctx.send(f"{Emojis.cross_mark} Could not get user info.") - return - - egg = None - if colours: - send_message = ctx.send - ctx.send = send # Assigns ctx.send to a fake send - egg = await ctx.invoke(self.bot.get_command("eggdecorate"), *colours) - if isinstance(egg, str): # When an error message occurs in eggdecorate. - await send_message(egg) - return - ctx.send = send_message # Reassigns ctx.send - - image_bytes = await user.display_avatar.replace(size=256).read() - file_name = file_safe_name("easterified_avatar", ctx.author.display_name) - - file = await in_executor( - PfpEffects.apply_effect, - image_bytes, - PfpEffects.easterify_effect, - file_name, - egg - ) - - embed = discord.Embed( - title="Your Lovely Easterified Avatar!", - description="Here is your lovely avatar, all bright and colourful\nwith Easter pastel colours. Enjoy :D" - ) - embed.set_image(url=f"attachment://{file_name}") - embed.set_footer(text=f"Made by {ctx.author.display_name}.", icon_url=user.display_avatar.url) - - await ctx.send(file=file, embed=embed) - - @staticmethod - async def send_pride_image( - ctx: commands.Context, - image_bytes: bytes, - pixels: int, - flag: str, - option: str - ) -> None: - """Gets and sends the image in an embed. Used by the pride commands.""" - async with ctx.typing(): - file_name = file_safe_name("pride_avatar", ctx.author.display_name) - - file = await in_executor( - PfpEffects.apply_effect, - image_bytes, - PfpEffects.pridify_effect, - file_name, - pixels, - flag - ) - - embed = discord.Embed( - title="Your Lovely Pride Avatar!", - description=f"Here is your lovely avatar, surrounded by\n a beautiful {option} flag. Enjoy :D" - ) - embed.set_image(url=f"attachment://{file_name}") - embed.set_footer(text=f"Made by {ctx.author.display_name}.", icon_url=ctx.author.avatar_url) - await ctx.send(file=file, embed=embed) - - @avatar_modify.group( - aliases=("avatarpride", "pridepfp", "prideprofile"), - root_aliases=("prideavatar", "avatarpride", "pridepfp", "prideprofile"), - invoke_without_command=True - ) - async def prideavatar(self, ctx: commands.Context, option: str = "lgbt", pixels: int = 64) -> None: - """ - This surrounds an avatar with a border of a specified LGBT flag. - - This defaults to the LGBT rainbow flag if none is given. - The amount of pixels can be given which determines the thickness of the flag border. - This has a maximum of 512px and defaults to a 64px border. - The full image is 1024x1024. - """ - option = option.lower() - pixels = max(0, min(512, pixels)) - flag = GENDER_OPTIONS.get(option) - if flag is None: - await ctx.send("I don't have that flag!") - return - - async with ctx.typing(): - user = await self._fetch_user(ctx.author.id) - if not user: - await ctx.send(f"{Emojis.cross_mark} Could not get user info.") - return - image_bytes = await user.display_avatar.replace(size=1024).read() - await self.send_pride_image(ctx, image_bytes, pixels, flag, option) - - @prideavatar.command() - async def flags(self, ctx: commands.Context) -> None: - """This lists the flags that can be used with the prideavatar command.""" - choices = sorted(set(GENDER_OPTIONS.values())) - options = "• " + "\n• ".join(choices) - embed = discord.Embed( - title="I have the following flags:", - description=options, - colour=Colours.soft_red - ) - await ctx.send(embed=embed) - - @avatar_modify.command( - aliases=("savatar", "spookify"), - root_aliases=("spookyavatar", "spookify", "savatar"), - brief="Spookify an user's avatar." - ) - async def spookyavatar(self, ctx: commands.Context) -> None: - """This "spookifies" the user's avatar, with a random *spooky* effect.""" - user = await self._fetch_user(ctx.author.id) - if not user: - await ctx.send(f"{Emojis.cross_mark} Could not get user info.") - return - - async with ctx.typing(): - image_bytes = await user.display_avatar.replace(size=1024).read() - - file_name = file_safe_name("spooky_avatar", ctx.author.display_name) - - file = await in_executor( - PfpEffects.apply_effect, - image_bytes, - spookifications.get_random_effect, - file_name - ) - - embed = discord.Embed( - title="Is this you or am I just really paranoid?", - colour=Colours.soft_red - ) - embed.set_image(url=f"attachment://{file_name}") - embed.set_footer(text=f"Made by {ctx.author.display_name}.", icon_url=ctx.author.display_avatar.url) - - await ctx.send(file=file, embed=embed) - - @avatar_modify.command(name="mosaic", root_aliases=("mosaic",)) - async def mosaic_command(self, ctx: commands.Context, squares: int = 16) -> None: - """Splits your avatar into x squares, randomizes them and stitches them back into a new image!""" - async with ctx.typing(): - user = await self._fetch_user(ctx.author.id) - if not user: - await ctx.send(f"{Emojis.cross_mark} Could not get user info.") - return - - if not 1 <= squares <= MAX_SQUARES: - raise commands.BadArgument(f"Squares must be a positive number less than or equal to {MAX_SQUARES:,}.") - - sqrt = math.sqrt(squares) - - if not sqrt.is_integer(): - squares = math.ceil(sqrt) ** 2 # Get the next perfect square - - file_name = file_safe_name("mosaic_avatar", ctx.author.display_name) - - img_bytes = await user.display_avatar.replace(size=1024).read() - - file = await in_executor( - PfpEffects.apply_effect, - img_bytes, - PfpEffects.mosaic_effect, - file_name, - squares, - ) - - if squares == 1: - title = "Hooh... that was a lot of work" - description = "I present to you... Yourself!" - elif squares == MAX_SQUARES: - title = "Testing the limits I see..." - description = "What a masterpiece. :star:" - else: - title = "Your mosaic avatar" - description = f"Here is your avatar. I think it looks a bit *puzzling*\nMade with {squares} squares." - - embed = discord.Embed( - title=title, - description=description, - colour=Colours.blue - ) - - embed.set_image(url=f"attachment://{file_name}") - embed.set_footer(text=f"Made by {ctx.author.display_name}", icon_url=user.display_avatar.url) - - await ctx.send(file=file, embed=embed) - - -def setup(bot: Bot) -> None: - """Load the AvatarModify cog.""" - bot.add_cog(AvatarModify(bot)) diff --git a/bot/exts/evergreen/battleship.py b/bot/exts/evergreen/battleship.py deleted file mode 100644 index f4351954..00000000 --- a/bot/exts/evergreen/battleship.py +++ /dev/null @@ -1,448 +0,0 @@ -import asyncio -import logging -import random -import re -from dataclasses import dataclass -from functools import partial -from typing import Optional - -import discord -from discord.ext import commands - -from bot.bot import Bot -from bot.constants import Colours - -log = logging.getLogger(__name__) - - -@dataclass -class Square: - """Each square on the battleship grid - if they contain a boat and if they've been aimed at.""" - - boat: Optional[str] - aimed: bool - - -Grid = list[list[Square]] -EmojiSet = dict[tuple[bool, bool], str] - - -@dataclass -class Player: - """Each player in the game - their messages for the boards and their current grid.""" - - user: Optional[discord.Member] - board: Optional[discord.Message] - opponent_board: discord.Message - grid: Grid - - -# The name of the ship and its size -SHIPS = { - "Carrier": 5, - "Battleship": 4, - "Cruiser": 3, - "Submarine": 3, - "Destroyer": 2, -} - - -# For these two variables, the first boolean is whether the square is a ship (True) or not (False). -# The second boolean is whether the player has aimed for that square (True) or not (False) - -# This is for the player's own board which shows the location of their own ships. -SHIP_EMOJIS = { - (True, True): ":fire:", - (True, False): ":ship:", - (False, True): ":anger:", - (False, False): ":ocean:", -} - -# This is for the opposing player's board which only shows aimed locations. -HIDDEN_EMOJIS = { - (True, True): ":red_circle:", - (True, False): ":black_circle:", - (False, True): ":white_circle:", - (False, False): ":black_circle:", -} - -# For the top row of the board -LETTERS = ( - ":stop_button::regional_indicator_a::regional_indicator_b::regional_indicator_c::regional_indicator_d:" - ":regional_indicator_e::regional_indicator_f::regional_indicator_g::regional_indicator_h:" - ":regional_indicator_i::regional_indicator_j:" -) - -# For the first column of the board -NUMBERS = [ - ":one:", - ":two:", - ":three:", - ":four:", - ":five:", - ":six:", - ":seven:", - ":eight:", - ":nine:", - ":keycap_ten:", -] - -CROSS_EMOJI = "\u274e" -HAND_RAISED_EMOJI = "\U0001f64b" - - -class Game: - """A Battleship Game.""" - - def __init__( - self, - bot: Bot, - channel: discord.TextChannel, - player1: discord.Member, - player2: discord.Member - ): - - self.bot = bot - self.public_channel = channel - - self.p1 = Player(player1, None, None, self.generate_grid()) - self.p2 = Player(player2, None, None, self.generate_grid()) - - self.gameover: bool = False - - self.turn: Optional[discord.Member] = None - self.next: Optional[discord.Member] = None - - self.match: Optional[re.Match] = None - self.surrender: bool = False - - self.setup_grids() - - @staticmethod - def generate_grid() -> Grid: - """Generates a grid by instantiating the Squares.""" - return [[Square(None, False) for _ in range(10)] for _ in range(10)] - - @staticmethod - def format_grid(player: Player, emojiset: EmojiSet) -> str: - """ - Gets and formats the grid as a list into a string to be output to the DM. - - Also adds the Letter and Number indexes. - """ - grid = [ - [emojiset[bool(square.boat), square.aimed] for square in row] - for row in player.grid - ] - - rows = ["".join([number] + row) for number, row in zip(NUMBERS, grid)] - return "\n".join([LETTERS] + rows) - - @staticmethod - def get_square(grid: Grid, square: str) -> Square: - """Grabs a square from a grid with an inputted key.""" - index = ord(square[0].upper()) - ord("A") - number = int(square[1:]) - - return grid[number-1][index] # -1 since lists are indexed from 0 - - async def game_over( - self, - *, - winner: discord.Member, - loser: discord.Member - ) -> None: - """Removes games from list of current games and announces to public chat.""" - await self.public_channel.send(f"Game Over! {winner.mention} won against {loser.mention}") - - for player in (self.p1, self.p2): - grid = self.format_grid(player, SHIP_EMOJIS) - await self.public_channel.send(f"{player.user}'s Board:\n{grid}") - - @staticmethod - def check_sink(grid: Grid, boat: str) -> bool: - """Checks if all squares containing a given boat have sunk.""" - return all(square.aimed for row in grid for square in row if square.boat == boat) - - @staticmethod - def check_gameover(grid: Grid) -> bool: - """Checks if all boats have been sunk.""" - return all(square.aimed for row in grid for square in row if square.boat) - - def setup_grids(self) -> None: - """Places the boats on the grids to initialise the game.""" - for player in (self.p1, self.p2): - for name, size in SHIPS.items(): - while True: # Repeats if about to overwrite another boat - ship_collision = False - coords = [] - - coord1 = random.randint(0, 9) - coord2 = random.randint(0, 10 - size) - - if random.choice((True, False)): # Vertical or Horizontal - x, y = coord1, coord2 - xincr, yincr = 0, 1 - else: - x, y = coord2, coord1 - xincr, yincr = 1, 0 - - for i in range(size): - new_x = x + (xincr * i) - new_y = y + (yincr * i) - if player.grid[new_x][new_y].boat: # Check if there's already a boat - ship_collision = True - break - coords.append((new_x, new_y)) - if not ship_collision: # If not overwriting any other boat spaces, break loop - break - - for x, y in coords: - player.grid[x][y].boat = name - - async def print_grids(self) -> None: - """Prints grids to the DM channels.""" - # Convert squares into Emoji - - boards = [ - self.format_grid(player, emojiset) - for emojiset in (HIDDEN_EMOJIS, SHIP_EMOJIS) - for player in (self.p1, self.p2) - ] - - locations = ( - (self.p2, "opponent_board"), (self.p1, "opponent_board"), - (self.p1, "board"), (self.p2, "board") - ) - - for board, location in zip(boards, locations): - player, attr = location - if getattr(player, attr): - await getattr(player, attr).edit(content=board) - else: - setattr(player, attr, await player.user.send(board)) - - def predicate(self, message: discord.Message) -> bool: - """Predicate checking the message typed for each turn.""" - if message.author == self.turn.user and message.channel == self.turn.user.dm_channel: - if message.content.lower() == "surrender": - self.surrender = True - return True - self.match = re.fullmatch("([A-J]|[a-j]) ?((10)|[1-9])", message.content.strip()) - if not self.match: - self.bot.loop.create_task(message.add_reaction(CROSS_EMOJI)) - return bool(self.match) - - async def take_turn(self) -> Optional[Square]: - """Lets the player who's turn it is choose a square.""" - square = None - turn_message = await self.turn.user.send( - "It's your turn! Type the square you want to fire at. Format it like this: A1\n" - "Type `surrender` to give up." - ) - await self.next.user.send("Their turn", delete_after=3.0) - while True: - try: - await self.bot.wait_for("message", check=self.predicate, timeout=60.0) - except asyncio.TimeoutError: - await self.turn.user.send("You took too long. Game over!") - await self.next.user.send(f"{self.turn.user} took too long. Game over!") - await self.public_channel.send( - f"Game over! {self.turn.user.mention} timed out so {self.next.user.mention} wins!" - ) - self.gameover = True - break - else: - if self.surrender: - await self.next.user.send(f"{self.turn.user} surrendered. Game over!") - await self.public_channel.send( - f"Game over! {self.turn.user.mention} surrendered to {self.next.user.mention}!" - ) - self.gameover = True - break - square = self.get_square(self.next.grid, self.match.string) - if square.aimed: - await self.turn.user.send("You've already aimed at this square!", delete_after=3.0) - else: - break - await turn_message.delete() - return square - - async def hit(self, square: Square, alert_messages: list[discord.Message]) -> None: - """Occurs when a player successfully aims for a ship.""" - await self.turn.user.send("Hit!", delete_after=3.0) - alert_messages.append(await self.next.user.send("Hit!")) - if self.check_sink(self.next.grid, square.boat): - await self.turn.user.send(f"You've sunk their {square.boat} ship!", delete_after=3.0) - alert_messages.append(await self.next.user.send(f"Oh no! Your {square.boat} ship sunk!")) - if self.check_gameover(self.next.grid): - await self.turn.user.send("You win!") - await self.next.user.send("You lose!") - self.gameover = True - await self.game_over(winner=self.turn.user, loser=self.next.user) - - async def start_game(self) -> None: - """Begins the game.""" - await self.p1.user.send(f"You're playing battleship with {self.p2.user}.") - await self.p2.user.send(f"You're playing battleship with {self.p1.user}.") - - alert_messages = [] - - self.turn = self.p1 - self.next = self.p2 - - while True: - await self.print_grids() - - if self.gameover: - return - - square = await self.take_turn() - if not square: - return - square.aimed = True - - for message in alert_messages: - await message.delete() - - alert_messages = [] - alert_messages.append(await self.next.user.send(f"{self.turn.user} aimed at {self.match.string}!")) - - if square.boat: - await self.hit(square, alert_messages) - if self.gameover: - return - else: - await self.turn.user.send("Miss!", delete_after=3.0) - alert_messages.append(await self.next.user.send("Miss!")) - - self.turn, self.next = self.next, self.turn - - -class Battleship(commands.Cog): - """Play the classic game Battleship!""" - - def __init__(self, bot: Bot): - self.bot = bot - self.games: list[Game] = [] - self.waiting: list[discord.Member] = [] - - def predicate( - self, - ctx: commands.Context, - announcement: discord.Message, - reaction: discord.Reaction, - user: discord.Member - ) -> bool: - """Predicate checking the criteria for the announcement message.""" - if self.already_playing(ctx.author): # If they've joined a game since requesting a player 2 - return True # Is dealt with later on - if ( - user.id not in (ctx.me.id, ctx.author.id) - and str(reaction.emoji) == HAND_RAISED_EMOJI - and reaction.message.id == announcement.id - ): - if self.already_playing(user): - self.bot.loop.create_task(ctx.send(f"{user.mention} You're already playing a game!")) - self.bot.loop.create_task(announcement.remove_reaction(reaction, user)) - return False - - if user in self.waiting: - self.bot.loop.create_task(ctx.send( - f"{user.mention} Please cancel your game first before joining another one." - )) - self.bot.loop.create_task(announcement.remove_reaction(reaction, user)) - return False - - return True - - if ( - user.id == ctx.author.id - and str(reaction.emoji) == CROSS_EMOJI - and reaction.message.id == announcement.id - ): - return True - return False - - def already_playing(self, player: discord.Member) -> bool: - """Check if someone is already in a game.""" - return any(player in (game.p1.user, game.p2.user) for game in self.games) - - @commands.group(invoke_without_command=True) - @commands.guild_only() - async def battleship(self, ctx: commands.Context) -> None: - """ - Play a game of Battleship with someone else! - - This will set up a message waiting for someone else to react and play along. - The game takes place entirely in DMs. - Make sure you have your DMs open so that the bot can message you. - """ - if self.already_playing(ctx.author): - await ctx.send("You're already playing a game!") - return - - if ctx.author in self.waiting: - await ctx.send("You've already sent out a request for a player 2.") - return - - announcement = await ctx.send( - "**Battleship**: A new game is about to start!\n" - f"Press {HAND_RAISED_EMOJI} to play against {ctx.author.mention}!\n" - f"(Cancel the game with {CROSS_EMOJI}.)" - ) - self.waiting.append(ctx.author) - await announcement.add_reaction(HAND_RAISED_EMOJI) - await announcement.add_reaction(CROSS_EMOJI) - - try: - reaction, user = await self.bot.wait_for( - "reaction_add", - check=partial(self.predicate, ctx, announcement), - timeout=60.0 - ) - except asyncio.TimeoutError: - self.waiting.remove(ctx.author) - await announcement.delete() - await ctx.send(f"{ctx.author.mention} Seems like there's no one here to play...") - return - - if str(reaction.emoji) == CROSS_EMOJI: - self.waiting.remove(ctx.author) - await announcement.delete() - await ctx.send(f"{ctx.author.mention} Game cancelled.") - return - - await announcement.delete() - self.waiting.remove(ctx.author) - if self.already_playing(ctx.author): - return - game = Game(self.bot, ctx.channel, ctx.author, user) - self.games.append(game) - try: - await game.start_game() - self.games.remove(game) - except discord.Forbidden: - await ctx.send( - f"{ctx.author.mention} {user.mention} " - "Game failed. This is likely due to you not having your DMs open. Check and try again." - ) - self.games.remove(game) - except Exception: - # End the game in the event of an unforseen error so the players aren't stuck in a game - await ctx.send(f"{ctx.author.mention} {user.mention} An error occurred. Game failed.") - self.games.remove(game) - raise - - @battleship.command(name="ships", aliases=("boats",)) - async def battleship_ships(self, ctx: commands.Context) -> None: - """Lists the ships that are found on the battleship grid.""" - embed = discord.Embed(colour=Colours.blue) - embed.add_field(name="Name", value="\n".join(SHIPS)) - embed.add_field(name="Size", value="\n".join(str(size) for size in SHIPS.values())) - await ctx.send(embed=embed) - - -def setup(bot: Bot) -> None: - """Load the Battleship Cog.""" - bot.add_cog(Battleship(bot)) diff --git a/bot/exts/evergreen/bookmark.py b/bot/exts/evergreen/bookmark.py deleted file mode 100644 index a91ef1c0..00000000 --- a/bot/exts/evergreen/bookmark.py +++ /dev/null @@ -1,153 +0,0 @@ -import asyncio -import logging -import random -from typing import Optional - -import discord -from discord.ext import commands - -from bot.bot import Bot -from bot.constants import Categories, Colours, ERROR_REPLIES, Icons, WHITELISTED_CHANNELS -from bot.utils.converters import WrappedMessageConverter -from bot.utils.decorators import whitelist_override - -log = logging.getLogger(__name__) - -# Number of seconds to wait for other users to bookmark the same message -TIMEOUT = 120 -BOOKMARK_EMOJI = "📌" -WHITELISTED_CATEGORIES = (Categories.help_in_use,) - - -class Bookmark(commands.Cog): - """Creates personal bookmarks by relaying a message link to the user's DMs.""" - - def __init__(self, bot: Bot): - self.bot = bot - - @staticmethod - def build_bookmark_dm(target_message: discord.Message, title: str) -> discord.Embed: - """Build the embed to DM the bookmark requester.""" - embed = discord.Embed( - title=title, - description=target_message.content, - colour=Colours.soft_green - ) - embed.add_field( - name="Wanna give it a visit?", - value=f"[Visit original message]({target_message.jump_url})" - ) - embed.set_author(name=target_message.author, icon_url=target_message.author.display_avatar.url) - embed.set_thumbnail(url=Icons.bookmark) - - return embed - - @staticmethod - def build_error_embed(user: discord.Member) -> discord.Embed: - """Builds an error embed for when a bookmark requester has DMs disabled.""" - return discord.Embed( - title=random.choice(ERROR_REPLIES), - description=f"{user.mention}, please enable your DMs to receive the bookmark.", - colour=Colours.soft_red - ) - - async def action_bookmark( - self, - channel: discord.TextChannel, - user: discord.Member, - target_message: discord.Message, - title: str - ) -> None: - """Sends the bookmark DM, or sends an error embed when a user bookmarks a message.""" - try: - embed = self.build_bookmark_dm(target_message, title) - await user.send(embed=embed) - except discord.Forbidden: - error_embed = self.build_error_embed(user) - await channel.send(embed=error_embed) - else: - log.info(f"{user} bookmarked {target_message.jump_url} with title '{title}'") - - @staticmethod - async def send_reaction_embed( - channel: discord.TextChannel, - target_message: discord.Message - ) -> discord.Message: - """Sends an embed, with a reaction, so users can react to bookmark the message too.""" - message = await channel.send( - embed=discord.Embed( - description=( - f"React with {BOOKMARK_EMOJI} to be sent your very own bookmark to " - f"[this message]({target_message.jump_url})." - ), - colour=Colours.soft_green - ) - ) - - await message.add_reaction(BOOKMARK_EMOJI) - return message - - @whitelist_override(channels=WHITELISTED_CHANNELS, categories=WHITELISTED_CATEGORIES) - @commands.command(name="bookmark", aliases=("bm", "pin")) - async def bookmark( - self, - ctx: commands.Context, - target_message: Optional[WrappedMessageConverter], - *, - title: str = "Bookmark" - ) -> None: - """Send the author a link to `target_message` via DMs.""" - if not target_message: - if not ctx.message.reference: - raise commands.UserInputError("You must either provide a valid message to bookmark, or reply to one.") - target_message = ctx.message.reference.resolved - - # Prevent users from bookmarking a message in a channel they don't have access to - permissions = target_message.channel.permissions_for(ctx.author) - if not permissions.read_messages: - log.info(f"{ctx.author} tried to bookmark a message in #{target_message.channel} but has no permissions.") - embed = discord.Embed( - title=random.choice(ERROR_REPLIES), - color=Colours.soft_red, - description="You don't have permission to view this channel." - ) - await ctx.send(embed=embed) - return - - def event_check(reaction: discord.Reaction, user: discord.Member) -> bool: - """Make sure that this reaction is what we want to operate on.""" - return ( - # Conditions for a successful pagination: - all(( - # Reaction is on this message - reaction.message.id == reaction_message.id, - # User has not already bookmarked this message - user.id not in bookmarked_users, - # Reaction is the `BOOKMARK_EMOJI` emoji - str(reaction.emoji) == BOOKMARK_EMOJI, - # Reaction was not made by the Bot - user.id != self.bot.user.id - )) - ) - await self.action_bookmark(ctx.channel, ctx.author, target_message, title) - - # Keep track of who has already bookmarked, so users can't spam reactions and cause loads of DMs - bookmarked_users = [ctx.author.id] - reaction_message = await self.send_reaction_embed(ctx.channel, target_message) - - while True: - try: - _, user = await self.bot.wait_for("reaction_add", timeout=TIMEOUT, check=event_check) - except asyncio.TimeoutError: - log.debug("Timed out waiting for a reaction") - break - log.trace(f"{user} has successfully bookmarked from a reaction, attempting to DM them.") - await self.action_bookmark(ctx.channel, user, target_message, title) - bookmarked_users.append(user.id) - - await reaction_message.delete() - - -def setup(bot: Bot) -> None: - """Load the Bookmark cog.""" - bot.add_cog(Bookmark(bot)) diff --git a/bot/exts/evergreen/catify.py b/bot/exts/evergreen/catify.py deleted file mode 100644 index 32dfae09..00000000 --- a/bot/exts/evergreen/catify.py +++ /dev/null @@ -1,86 +0,0 @@ -import random -from contextlib import suppress -from typing import Optional - -from discord import AllowedMentions, Embed, Forbidden -from discord.ext import commands - -from bot.bot import Bot -from bot.constants import Cats, Colours, NEGATIVE_REPLIES -from bot.utils import helpers - - -class Catify(commands.Cog): - """Cog for the catify command.""" - - @commands.command(aliases=("ᓚᘏᗢify", "ᓚᘏᗢ")) - @commands.cooldown(1, 5, commands.BucketType.user) - async def catify(self, ctx: commands.Context, *, text: Optional[str]) -> None: - """ - Convert the provided text into a cat themed sentence by interspercing cats throughout text. - - If no text is given then the users nickname is edited. - """ - if not text: - display_name = ctx.author.display_name - - if len(display_name) > 26: - embed = Embed( - title=random.choice(NEGATIVE_REPLIES), - description=( - "Your display name is too long to be catified! " - "Please change it to be under 26 characters." - ), - color=Colours.soft_red - ) - await ctx.send(embed=embed) - return - - else: - display_name += f" | {random.choice(Cats.cats)}" - - await ctx.send(f"Your catified nickname is: `{display_name}`", allowed_mentions=AllowedMentions.none()) - - with suppress(Forbidden): - await ctx.author.edit(nick=display_name) - else: - if len(text) >= 1500: - embed = Embed( - title=random.choice(NEGATIVE_REPLIES), - description="Submitted text was too large! Please submit something under 1500 characters.", - color=Colours.soft_red - ) - await ctx.send(embed=embed) - return - - string_list = text.split() - for index, name in enumerate(string_list): - name = name.lower() - if "cat" in name: - if random.randint(0, 5) == 5: - string_list[index] = name.replace("cat", f"**{random.choice(Cats.cats)}**") - else: - string_list[index] = name.replace("cat", random.choice(Cats.cats)) - for element in Cats.cats: - if element in name: - string_list[index] = name.replace(element, "cat") - - string_len = len(string_list) // 3 or len(string_list) - - for _ in range(random.randint(1, string_len)): - # insert cat at random index - if random.randint(0, 5) == 5: - string_list.insert(random.randint(0, len(string_list)), f"**{random.choice(Cats.cats)}**") - else: - string_list.insert(random.randint(0, len(string_list)), random.choice(Cats.cats)) - - text = helpers.suppress_links(" ".join(string_list)) - await ctx.send( - f">>> {text}", - allowed_mentions=AllowedMentions.none() - ) - - -def setup(bot: Bot) -> None: - """Loads the catify cog.""" - bot.add_cog(Catify()) diff --git a/bot/exts/evergreen/cheatsheet.py b/bot/exts/evergreen/cheatsheet.py deleted file mode 100644 index 33d29f67..00000000 --- a/bot/exts/evergreen/cheatsheet.py +++ /dev/null @@ -1,112 +0,0 @@ -import random -import re -from typing import Union -from urllib.parse import quote_plus - -from discord import Embed -from discord.ext import commands -from discord.ext.commands import BucketType, Context - -from bot import constants -from bot.bot import Bot -from bot.constants import Categories, Channels, Colours, ERROR_REPLIES -from bot.utils.decorators import whitelist_override - -ERROR_MESSAGE = f""" -Unknown cheat sheet. Please try to reformulate your query. - -**Examples**: -```md -{constants.Client.prefix}cht read json -{constants.Client.prefix}cht hello world -{constants.Client.prefix}cht lambda -``` -If the problem persists send a message in <#{Channels.dev_contrib}> -""" - -URL = "https://cheat.sh/python/{search}" -ESCAPE_TT = str.maketrans({"`": "\\`"}) -ANSI_RE = re.compile(r"\x1b\[.*?m") -# We need to pass headers as curl otherwise it would default to aiohttp which would return raw html. -HEADERS = {"User-Agent": "curl/7.68.0"} - - -class CheatSheet(commands.Cog): - """Commands that sends a result of a cht.sh search in code blocks.""" - - def __init__(self, bot: Bot): - self.bot = bot - - @staticmethod - def fmt_error_embed() -> Embed: - """ - Format the Error Embed. - - If the cht.sh search returned 404, overwrite it to send a custom error embed. - link -> https://github.com/chubin/cheat.sh/issues/198 - """ - embed = Embed( - title=random.choice(ERROR_REPLIES), - description=ERROR_MESSAGE, - colour=Colours.soft_red - ) - return embed - - def result_fmt(self, url: str, body_text: str) -> tuple[bool, Union[str, Embed]]: - """Format Result.""" - if body_text.startswith("# 404 NOT FOUND"): - embed = self.fmt_error_embed() - return True, embed - - body_space = min(1986 - len(url), 1000) - - if len(body_text) > body_space: - description = ( - f"**Result Of cht.sh**\n" - f"```python\n{body_text[:body_space]}\n" - f"... (truncated - too many lines)\n```\n" - f"Full results: {url} " - ) - else: - description = ( - f"**Result Of cht.sh**\n" - f"```python\n{body_text}\n```\n" - f"{url}" - ) - return False, description - - @commands.command( - name="cheat", - aliases=("cht.sh", "cheatsheet", "cheat-sheet", "cht"), - ) - @commands.cooldown(1, 10, BucketType.user) - @whitelist_override(categories=[Categories.help_in_use]) - async def cheat_sheet(self, ctx: Context, *search_terms: str) -> None: - """ - Search cheat.sh. - - Gets a post from https://cheat.sh/python/ by default. - Usage: - --> .cht read json - """ - async with ctx.typing(): - search_string = quote_plus(" ".join(search_terms)) - - async with self.bot.http_session.get( - URL.format(search=search_string), headers=HEADERS - ) as response: - result = ANSI_RE.sub("", await response.text()).translate(ESCAPE_TT) - - is_embed, description = self.result_fmt( - URL.format(search=search_string), - result - ) - if is_embed: - await ctx.send(embed=description) - else: - await ctx.send(content=description) - - -def setup(bot: Bot) -> None: - """Load the CheatSheet cog.""" - bot.add_cog(CheatSheet(bot)) diff --git a/bot/exts/evergreen/coinflip.py b/bot/exts/evergreen/coinflip.py deleted file mode 100644 index 804306bd..00000000 --- a/bot/exts/evergreen/coinflip.py +++ /dev/null @@ -1,53 +0,0 @@ -import random - -from discord.ext import commands - -from bot.bot import Bot -from bot.constants import Emojis - - -class CoinSide(commands.Converter): - """Class used to convert the `side` parameter of coinflip command.""" - - HEADS = ("h", "head", "heads") - TAILS = ("t", "tail", "tails") - - async def convert(self, ctx: commands.Context, side: str) -> str: - """Converts the provided `side` into the corresponding string.""" - side = side.lower() - if side in self.HEADS: - return "heads" - - if side in self.TAILS: - return "tails" - - raise commands.BadArgument(f"{side!r} is not a valid coin side.") - - -class CoinFlip(commands.Cog): - """Cog for the CoinFlip command.""" - - @commands.command(name="coinflip", aliases=("flip", "coin", "cf")) - async def coinflip_command(self, ctx: commands.Context, side: CoinSide = None) -> None: - """ - Flips a coin. - - If `side` is provided will state whether you guessed the side correctly. - """ - flipped_side = random.choice(["heads", "tails"]) - - message = f"{ctx.author.mention} flipped **{flipped_side}**. " - if not side: - await ctx.send(message) - return - - if side == flipped_side: - message += f"You guessed correctly! {Emojis.lemon_hyperpleased}" - else: - message += f"You guessed incorrectly. {Emojis.lemon_pensive}" - await ctx.send(message) - - -def setup(bot: Bot) -> None: - """Loads the coinflip cog.""" - bot.add_cog(CoinFlip()) diff --git a/bot/exts/evergreen/color.py b/bot/exts/evergreen/color.py deleted file mode 100644 index a00a956b..00000000 --- a/bot/exts/evergreen/color.py +++ /dev/null @@ -1,113 +0,0 @@ -# imports -import logging - -import colorsys -import pillow -from discord import Embed -# ! need to install discord-flags and add to poetry.lock file -from discord.ext import commands, flags -from rapidfuzz import process - -from bot.bot import Bot -from bot.constants import Colours - -logger = logging.getLogger(__name__) - -# constants if needed -# Color URLs - will be replaced by JSON file? -COLOR_JSON_PATH = ".bot//exts//resources//evergreen//" -COLOR_URL_XKCD = "https://xkcd.com/color/rgb/" -COLOR_URL_NAME_THAT_COLOR = "https://github.com/ryanzec/name-that-color/blob/master/lib/ntc.js#L116-L1681" - - -COLOR_ERROR = Embed( - title="Input color is not possible", - description="The color code {user_color} is not a possible color combination." - "\nThe range of possible values are: " - "\nRGB & HSV: 0-255" - "\nCMYK: 0-100%" - "\nHSL: 0-360 degrees" - "\nHex: #000000-#FFFFFF" -) -COLOR_EMBED = Embed( - title="{color_name}", - description="RGB" - "\n{RGB}" - "\nHSV" - "\n{HSV}" - "\nCMYK" - "\n{CMYK}" - "\nHSL" - "\n{HSL}" - "\nHex" - "\n{Hex}" -) - - -# define color command -class Color(commands.cog): - """User initiated command to receive color information.""" - - def __init__(self, bot: Bot): - self.bot = bot - - # ? possible to use discord-flags to allow user to decide on color - # https://pypi.org/project/discord-flags/ - # @flags.add_flag("--rgb", type=str) - # @flags.add_flag("--hsv", type=str) - # @flags.add_flag("--cmyk", type=str) - # @flags.add_flag("--hsl", type=str) - # @flags.add_flag("--hex", type=str) - # @flags.add_flag("--name", type=str) - # @flags.command() - @commands.command(aliases=["color", "colour"]) - @commands.cooldown(1, 10, commands.cooldowns.BucketType.user) - async def color(self, ctx: commands.Context, *, user_color: str) -> None: - """Send information on input color code or color name.""" - # need to check if user_color is RGB, HSV, CMYK, HSL, Hex or color name - # should we assume the color is RGB if not defined? - # should discord tags be used? - # need to review discord.py V2.0 - - # TODO code to check if color code is possible - await ctx.send(embed=COLOR_ERROR.format(color=user_color)) - # await ctx.send(embed=COLOR_EMBED.format( - # RGB=color_dict["RGB"], - # HSV=color_dict["HSV"], - # HSL=color_dict["HSL"], - # CMYK=color_dict["CMYK"], - # HSL=color_dict["HSL"], - # Hex=color_dict["Hex"], - # color_name=color_dict["color_name"] - # ).set_image() # url for image? - # ) - - # TODO pass for now - pass - - # if user_color in color_lists: - # # TODO fuzzy match for color - # pass - - async def color_converter(self, color: str, code_type: str) -> dict: - """Generate alternative color codes for use in the embed.""" - # TODO add code to take color and code type and return other types - # color_dict = { - # "RGB": color_RGB, - # "HSV": color_HSV, - # "HSL": color_HSL, - # "CMYK": color_CMYK, - # "HSL": color_HSL, - # "Hex": color_Hex, - # "color_name": color_name, - # } - pass - - async def photo_generator(self, color: str) -> None: - """Generate photo to use in embed.""" - # TODO need to find a way to store photo in cache to add to embed, then remove - - -def setup(bot: Bot) -> None: - """Load the Color Cog.""" - bot.add_cog(Color(bot)) diff --git a/bot/exts/evergreen/connect_four.py b/bot/exts/evergreen/connect_four.py deleted file mode 100644 index 647bb2b7..00000000 --- a/bot/exts/evergreen/connect_four.py +++ /dev/null @@ -1,452 +0,0 @@ -import asyncio -import random -from functools import partial -from typing import Optional, Union - -import discord -import emojis -from discord.ext import commands -from discord.ext.commands import guild_only - -from bot.bot import Bot -from bot.constants import Emojis - -NUMBERS = list(Emojis.number_emojis.values()) -CROSS_EMOJI = Emojis.incident_unactioned - -Coordinate = Optional[tuple[int, int]] -EMOJI_CHECK = Union[discord.Emoji, str] - - -class Game: - """A Connect 4 Game.""" - - def __init__( - self, - bot: Bot, - channel: discord.TextChannel, - player1: discord.Member, - player2: Optional[discord.Member], - tokens: list[str], - size: int = 7 - ): - self.bot = bot - self.channel = channel - self.player1 = player1 - self.player2 = player2 or AI(self.bot, game=self) - self.tokens = tokens - - self.grid = self.generate_board(size) - self.grid_size = size - - self.unicode_numbers = NUMBERS[:self.grid_size] - - self.message = None - - self.player_active = None - self.player_inactive = None - - @staticmethod - def generate_board(size: int) -> list[list[int]]: - """Generate the connect 4 board.""" - return [[0 for _ in range(size)] for _ in range(size)] - - async def print_grid(self) -> None: - """Formats and outputs the Connect Four grid to the channel.""" - title = ( - f"Connect 4: {self.player1.display_name}" - f" VS {self.bot.user.display_name if isinstance(self.player2, AI) else self.player2.display_name}" - ) - - rows = [" ".join(self.tokens[s] for s in row) for row in self.grid] - first_row = " ".join(x for x in NUMBERS[:self.grid_size]) - formatted_grid = "\n".join([first_row] + rows) - embed = discord.Embed(title=title, description=formatted_grid) - - if self.message: - await self.message.edit(embed=embed) - else: - self.message = await self.channel.send(content="Loading...") - for emoji in self.unicode_numbers: - await self.message.add_reaction(emoji) - await self.message.add_reaction(CROSS_EMOJI) - await self.message.edit(content=None, embed=embed) - - async def game_over(self, action: str, player1: discord.user, player2: discord.user) -> None: - """Announces to public chat.""" - if action == "win": - await self.channel.send(f"Game Over! {player1.mention} won against {player2.mention}") - elif action == "draw": - await self.channel.send(f"Game Over! {player1.mention} {player2.mention} It's A Draw :tada:") - elif action == "quit": - await self.channel.send(f"{self.player1.mention} surrendered. Game over!") - await self.print_grid() - - async def start_game(self) -> None: - """Begins the game.""" - self.player_active, self.player_inactive = self.player1, self.player2 - - while True: - await self.print_grid() - - if isinstance(self.player_active, AI): - coords = self.player_active.play() - if not coords: - await self.game_over( - "draw", - self.bot.user if isinstance(self.player_active, AI) else self.player_active, - self.bot.user if isinstance(self.player_inactive, AI) else self.player_inactive, - ) - else: - coords = await self.player_turn() - - if not coords: - return - - if self.check_win(coords, 1 if self.player_active == self.player1 else 2): - await self.game_over( - "win", - self.bot.user if isinstance(self.player_active, AI) else self.player_active, - self.bot.user if isinstance(self.player_inactive, AI) else self.player_inactive, - ) - return - - self.player_active, self.player_inactive = self.player_inactive, self.player_active - - def predicate(self, reaction: discord.Reaction, user: discord.Member) -> bool: - """The predicate to check for the player's reaction.""" - return ( - reaction.message.id == self.message.id - and user.id == self.player_active.id - and str(reaction.emoji) in (*self.unicode_numbers, CROSS_EMOJI) - ) - - async def player_turn(self) -> Coordinate: - """Initiate the player's turn.""" - message = await self.channel.send( - f"{self.player_active.mention}, it's your turn! React with the column you want to place your token in." - ) - player_num = 1 if self.player_active == self.player1 else 2 - while True: - try: - reaction, user = await self.bot.wait_for("reaction_add", check=self.predicate, timeout=30.0) - except asyncio.TimeoutError: - await self.channel.send(f"{self.player_active.mention}, you took too long. Game over!") - return - else: - await message.delete() - if str(reaction.emoji) == CROSS_EMOJI: - await self.game_over("quit", self.player_active, self.player_inactive) - return - - await self.message.remove_reaction(reaction, user) - - column_num = self.unicode_numbers.index(str(reaction.emoji)) - column = [row[column_num] for row in self.grid] - - for row_num, square in reversed(list(enumerate(column))): - if not square: - self.grid[row_num][column_num] = player_num - return row_num, column_num - message = await self.channel.send(f"Column {column_num + 1} is full. Try again") - - def check_win(self, coords: Coordinate, player_num: int) -> bool: - """Check that placing a counter here would cause the player to win.""" - vertical = [(-1, 0), (1, 0)] - horizontal = [(0, 1), (0, -1)] - forward_diag = [(-1, 1), (1, -1)] - backward_diag = [(-1, -1), (1, 1)] - axes = [vertical, horizontal, forward_diag, backward_diag] - - for axis in axes: - counters_in_a_row = 1 # The initial counter that is compared to - for (row_incr, column_incr) in axis: - row, column = coords - row += row_incr - column += column_incr - - while 0 <= row < self.grid_size and 0 <= column < self.grid_size: - if self.grid[row][column] == player_num: - counters_in_a_row += 1 - row += row_incr - column += column_incr - else: - break - if counters_in_a_row >= 4: - return True - return False - - -class AI: - """The Computer Player for Single-Player games.""" - - def __init__(self, bot: Bot, game: Game): - self.game = game - self.mention = bot.user.mention - - def get_possible_places(self) -> list[Coordinate]: - """Gets all the coordinates where the AI could possibly place a counter.""" - possible_coords = [] - for column_num in range(self.game.grid_size): - column = [row[column_num] for row in self.game.grid] - for row_num, square in reversed(list(enumerate(column))): - if not square: - possible_coords.append((row_num, column_num)) - break - return possible_coords - - def check_ai_win(self, coord_list: list[Coordinate]) -> Optional[Coordinate]: - """ - Check AI win. - - Check if placing a counter in any possible coordinate would cause the AI to win - with 10% chance of not winning and returning None - """ - if random.randint(1, 10) == 1: - return - for coords in coord_list: - if self.game.check_win(coords, 2): - return coords - - def check_player_win(self, coord_list: list[Coordinate]) -> Optional[Coordinate]: - """ - Check Player win. - - Check if placing a counter in possible coordinates would stop the player - from winning with 25% of not blocking them and returning None. - """ - if random.randint(1, 4) == 1: - return - for coords in coord_list: - if self.game.check_win(coords, 1): - return coords - - @staticmethod - def random_coords(coord_list: list[Coordinate]) -> Coordinate: - """Picks a random coordinate from the possible ones.""" - return random.choice(coord_list) - - def play(self) -> Union[Coordinate, bool]: - """ - Plays for the AI. - - Gets all possible coords, and determins the move: - 1. coords where it can win. - 2. coords where the player can win. - 3. Random coord - The first possible value is choosen. - """ - possible_coords = self.get_possible_places() - - if not possible_coords: - return False - - coords = ( - self.check_ai_win(possible_coords) - or self.check_player_win(possible_coords) - or self.random_coords(possible_coords) - ) - - row, column = coords - self.game.grid[row][column] = 2 - return coords - - -class ConnectFour(commands.Cog): - """Connect Four. The Classic Vertical Four-in-a-row Game!""" - - def __init__(self, bot: Bot): - self.bot = bot - self.games: list[Game] = [] - self.waiting: list[discord.Member] = [] - - self.tokens = [":white_circle:", ":blue_circle:", ":red_circle:"] - - self.max_board_size = 9 - self.min_board_size = 5 - - async def check_author(self, ctx: commands.Context, board_size: int) -> bool: - """Check if the requester is free and the board size is correct.""" - if self.already_playing(ctx.author): - await ctx.send("You're already playing a game!") - return False - - if ctx.author in self.waiting: - await ctx.send("You've already sent out a request for a player 2") - return False - - if not self.min_board_size <= board_size <= self.max_board_size: - await ctx.send( - f"{board_size} is not a valid board size. A valid board size is " - f"between `{self.min_board_size}` and `{self.max_board_size}`." - ) - return False - - return True - - def get_player( - self, - ctx: commands.Context, - announcement: discord.Message, - reaction: discord.Reaction, - user: discord.Member - ) -> bool: - """Predicate checking the criteria for the announcement message.""" - if self.already_playing(ctx.author): # If they've joined a game since requesting a player 2 - return True # Is dealt with later on - - if ( - user.id not in (ctx.me.id, ctx.author.id) - and str(reaction.emoji) == Emojis.hand_raised - and reaction.message.id == announcement.id - ): - if self.already_playing(user): - self.bot.loop.create_task(ctx.send(f"{user.mention} You're already playing a game!")) - self.bot.loop.create_task(announcement.remove_reaction(reaction, user)) - return False - - if user in self.waiting: - self.bot.loop.create_task(ctx.send( - f"{user.mention} Please cancel your game first before joining another one." - )) - self.bot.loop.create_task(announcement.remove_reaction(reaction, user)) - return False - - return True - - if ( - user.id == ctx.author.id - and str(reaction.emoji) == CROSS_EMOJI - and reaction.message.id == announcement.id - ): - return True - return False - - def already_playing(self, player: discord.Member) -> bool: - """Check if someone is already in a game.""" - return any(player in (game.player1, game.player2) for game in self.games) - - @staticmethod - def check_emojis( - e1: EMOJI_CHECK, e2: EMOJI_CHECK - ) -> tuple[bool, Optional[str]]: - """Validate the emojis, the user put.""" - if isinstance(e1, str) and emojis.count(e1) != 1: - return False, e1 - if isinstance(e2, str) and emojis.count(e2) != 1: - return False, e2 - return True, None - - async def _play_game( - self, - ctx: commands.Context, - user: Optional[discord.Member], - board_size: int, - emoji1: str, - emoji2: str - ) -> None: - """Helper for playing a game of connect four.""" - self.tokens = [":white_circle:", str(emoji1), str(emoji2)] - game = None # if game fails to intialize in try...except - - try: - game = Game(self.bot, ctx.channel, ctx.author, user, self.tokens, size=board_size) - self.games.append(game) - await game.start_game() - self.games.remove(game) - except Exception: - # End the game in the event of an unforeseen error so the players aren't stuck in a game - await ctx.send(f"{ctx.author.mention} {user.mention if user else ''} An error occurred. Game failed.") - if game in self.games: - self.games.remove(game) - raise - - @guild_only() - @commands.group( - invoke_without_command=True, - aliases=("4inarow", "connect4", "connectfour", "c4"), - case_insensitive=True - ) - async def connect_four( - self, - ctx: commands.Context, - board_size: int = 7, - emoji1: EMOJI_CHECK = "\U0001f535", - emoji2: EMOJI_CHECK = "\U0001f534" - ) -> None: - """ - Play the classic game of Connect Four with someone! - - Sets up a message waiting for someone else to react and play along. - The game will start once someone has reacted. - All inputs will be through reactions. - """ - check, emoji = self.check_emojis(emoji1, emoji2) - if not check: - raise commands.EmojiNotFound(emoji) - - check_author_result = await self.check_author(ctx, board_size) - if not check_author_result: - return - - announcement = await ctx.send( - "**Connect Four**: A new game is about to start!\n" - f"Press {Emojis.hand_raised} to play against {ctx.author.mention}!\n" - f"(Cancel the game with {CROSS_EMOJI}.)" - ) - self.waiting.append(ctx.author) - await announcement.add_reaction(Emojis.hand_raised) - await announcement.add_reaction(CROSS_EMOJI) - - try: - reaction, user = await self.bot.wait_for( - "reaction_add", - check=partial(self.get_player, ctx, announcement), - timeout=60.0 - ) - except asyncio.TimeoutError: - self.waiting.remove(ctx.author) - await announcement.delete() - await ctx.send( - f"{ctx.author.mention} Seems like there's no one here to play. " - f"Use `{ctx.prefix}{ctx.invoked_with} ai` to play against a computer." - ) - return - - if str(reaction.emoji) == CROSS_EMOJI: - self.waiting.remove(ctx.author) - await announcement.delete() - await ctx.send(f"{ctx.author.mention} Game cancelled.") - return - - await announcement.delete() - self.waiting.remove(ctx.author) - if self.already_playing(ctx.author): - return - - await self._play_game(ctx, user, board_size, str(emoji1), str(emoji2)) - - @guild_only() - @connect_four.command(aliases=("bot", "computer", "cpu")) - async def ai( - self, - ctx: commands.Context, - board_size: int = 7, - emoji1: EMOJI_CHECK = "\U0001f535", - emoji2: EMOJI_CHECK = "\U0001f534" - ) -> None: - """Play Connect Four against a computer player.""" - check, emoji = self.check_emojis(emoji1, emoji2) - if not check: - raise commands.EmojiNotFound(emoji) - - check_author_result = await self.check_author(ctx, board_size) - if not check_author_result: - return - - await self._play_game(ctx, None, board_size, str(emoji1), str(emoji2)) - - -def setup(bot: Bot) -> None: - """Load ConnectFour Cog.""" - bot.add_cog(ConnectFour(bot)) diff --git a/bot/exts/evergreen/conversationstarters.py b/bot/exts/evergreen/conversationstarters.py deleted file mode 100644 index fdc4467a..00000000 --- a/bot/exts/evergreen/conversationstarters.py +++ /dev/null @@ -1,69 +0,0 @@ -from pathlib import Path - -import yaml -from discord import Color, Embed -from discord.ext import commands - -from bot.bot import Bot -from bot.constants import WHITELISTED_CHANNELS -from bot.utils.decorators import whitelist_override -from bot.utils.randomization import RandomCycle - -SUGGESTION_FORM = "https://forms.gle/zw6kkJqv8U43Nfjg9" - -with Path("bot/resources/evergreen/starter.yaml").open("r", encoding="utf8") as f: - STARTERS = yaml.load(f, Loader=yaml.FullLoader) - -with Path("bot/resources/evergreen/py_topics.yaml").open("r", encoding="utf8") as f: - # First ID is #python-general and the rest are top to bottom categories of Topical Chat/Help. - PY_TOPICS = yaml.load(f, Loader=yaml.FullLoader) - - # Removing `None` from lists of topics, if not a list, it is changed to an empty one. - PY_TOPICS = {k: [i for i in v if i] if isinstance(v, list) else [] for k, v in PY_TOPICS.items()} - - # All the allowed channels that the ".topic" command is allowed to be executed in. - ALL_ALLOWED_CHANNELS = list(PY_TOPICS.keys()) + list(WHITELISTED_CHANNELS) - -# Putting all topics into one dictionary and shuffling lists to reduce same-topic repetitions. -ALL_TOPICS = {"default": STARTERS, **PY_TOPICS} -TOPICS = { - channel: RandomCycle(topics or ["No topics found for this channel."]) - for channel, topics in ALL_TOPICS.items() -} - - -class ConvoStarters(commands.Cog): - """Evergreen conversation topics.""" - - @commands.command() - @whitelist_override(channels=ALL_ALLOWED_CHANNELS) - async def topic(self, ctx: commands.Context) -> None: - """ - Responds with a random topic to start a conversation. - - If in a Python channel, a python-related topic will be given. - - Otherwise, a random conversation topic will be received by the user. - """ - # No matter what, the form will be shown. - embed = Embed(description=f"Suggest more topics [here]({SUGGESTION_FORM})!", color=Color.blurple()) - - try: - # Fetching topics. - channel_topics = TOPICS[ctx.channel.id] - - # If the channel isn't Python-related. - except KeyError: - embed.title = f"**{next(TOPICS['default'])}**" - - # If the channel ID doesn't have any topics. - else: - embed.title = f"**{next(channel_topics)}**" - - finally: - await ctx.send(embed=embed) - - -def setup(bot: Bot) -> None: - """Load the ConvoStarters cog.""" - bot.add_cog(ConvoStarters()) diff --git a/bot/exts/evergreen/duck_game.py b/bot/exts/evergreen/duck_game.py deleted file mode 100644 index d592f3df..00000000 --- a/bot/exts/evergreen/duck_game.py +++ /dev/null @@ -1,356 +0,0 @@ -import asyncio -import random -import re -from collections import defaultdict -from io import BytesIO -from itertools import product -from pathlib import Path -from urllib.parse import urlparse - -import discord -from PIL import Image, ImageDraw, ImageFont -from discord.ext import commands - -from bot.bot import Bot -from bot.constants import Colours, MODERATION_ROLES -from bot.utils.decorators import with_role - - -DECK = list(product(*[(0, 1, 2)]*4)) - -GAME_DURATION = 180 - -# Scoring -CORRECT_SOLN = 1 -INCORRECT_SOLN = -1 -CORRECT_GOOSE = 2 -INCORRECT_GOOSE = -1 - -# Distribution of minimum acceptable solutions at board generation. -# This is for gameplay reasons, to shift the number of solutions per board up, -# while still making the end of the game unpredictable. -# Note: this is *not* the same as the distribution of number of solutions. - -SOLN_DISTR = 0, 0.05, 0.05, 0.1, 0.15, 0.25, 0.2, 0.15, .05 - -IMAGE_PATH = Path("bot", "resources", "evergreen", "all_cards.png") -FONT_PATH = Path("bot", "resources", "evergreen", "LuckiestGuy-Regular.ttf") -HELP_IMAGE_PATH = Path("bot", "resources", "evergreen", "ducks_help_ex.png") - -ALL_CARDS = Image.open(IMAGE_PATH) -LABEL_FONT = ImageFont.truetype(str(FONT_PATH), size=16) -CARD_WIDTH = 155 -CARD_HEIGHT = 97 - -EMOJI_WRONG = "\u274C" - -ANSWER_REGEX = re.compile(r'^\D*(\d+)\D+(\d+)\D+(\d+)\D*$') - -HELP_TEXT = """ -**Each card has 4 features** -Color, Number, Hat, and Accessory - -**A valid flight** -3 cards where each feature is either all the same or all different - -**Call "GOOSE"** -if you think there are no more flights - -**+1** for each valid flight -**+2** for a correct "GOOSE" call -**-1** for any wrong answer - -The first flight below is invalid: the first card has swords while the other two have no accessory.\ - It would be valid if the first card was empty-handed, or one of the other two had paintbrushes. - -The second flight is valid because there are no 2:1 splits; each feature is either all the same or all different. -""" - - -def assemble_board_image(board: list[tuple[int]], rows: int, columns: int) -> Image: - """Cut and paste images representing the given cards into an image representing the board.""" - new_im = Image.new("RGBA", (CARD_WIDTH*columns, CARD_HEIGHT*rows)) - draw = ImageDraw.Draw(new_im) - for idx, card in enumerate(board): - card_image = get_card_image(card) - row, col = divmod(idx, columns) - top, left = row * CARD_HEIGHT, col * CARD_WIDTH - new_im.paste(card_image, (left, top)) - draw.text( - xy=(left+5, top+5), # magic numbers are buffers for the card labels - text=str(idx), - fill=(0, 0, 0), - font=LABEL_FONT, - ) - return new_im - - -def get_card_image(card: tuple[int]) -> Image: - """Slice the image containing all the cards to get just this card.""" - # The master card image file should have 9x9 cards, - # arranged such that their features can be interpreted as ordered trinary. - row, col = divmod(as_trinary(card), 9) - x1 = col * CARD_WIDTH - x2 = x1 + CARD_WIDTH - y1 = row * CARD_HEIGHT - y2 = y1 + CARD_HEIGHT - return ALL_CARDS.crop((x1, y1, x2, y2)) - - -def as_trinary(card: tuple[int]) -> int: - """Find the card's unique index by interpreting its features as trinary.""" - return int(''.join(str(x) for x in card), base=3) - - -class DuckGame: - """A class for a single game.""" - - def __init__( - self, - rows: int = 4, - columns: int = 3, - minimum_solutions: int = 1, - ): - """ - Take samples from the deck to generate a board. - - Args: - rows (int, optional): Rows in the game board. Defaults to 4. - columns (int, optional): Columns in the game board. Defaults to 3. - minimum_solutions (int, optional): Minimum acceptable number of solutions in the board. Defaults to 1. - """ - self.rows = rows - self.columns = columns - size = rows * columns - - self._solutions = None - self.claimed_answers = {} - self.scores = defaultdict(int) - self.editing_embed = asyncio.Lock() - - self.board = random.sample(DECK, size) - while len(self.solutions) < minimum_solutions: - self.board = random.sample(DECK, size) - - @property - def board(self) -> list[tuple[int]]: - """Accesses board property.""" - return self._board - - @board.setter - def board(self, val: list[tuple[int]]) -> None: - """Erases calculated solutions if the board changes.""" - self._solutions = None - self._board = val - - @property - def solutions(self) -> None: - """Calculate valid solutions and cache to avoid redoing work.""" - if self._solutions is None: - self._solutions = set() - for idx_a, card_a in enumerate(self.board): - for idx_b, card_b in enumerate(self.board[idx_a+1:], start=idx_a+1): - # Two points determine a line, and there are exactly 3 points per line in {0,1,2}^4. - # The completion of a line will only be a duplicate point if the other two points are the same, - # which is prevented by the triangle iteration. - completion = tuple( - feat_a if feat_a == feat_b else 3-feat_a-feat_b - for feat_a, feat_b in zip(card_a, card_b) - ) - try: - idx_c = self.board.index(completion) - except ValueError: - continue - - # Indices within the solution are sorted to detect duplicate solutions modulo order. - solution = tuple(sorted((idx_a, idx_b, idx_c))) - self._solutions.add(solution) - - return self._solutions - - -class DuckGamesDirector(commands.Cog): - """A cog for running Duck Duck Duck Goose games.""" - - def __init__(self, bot: Bot): - self.bot = bot - self.current_games = {} - - @commands.group( - name='duckduckduckgoose', - aliases=['dddg', 'ddg', 'duckduckgoose', 'duckgoose'], - invoke_without_command=True - ) - @commands.cooldown(rate=1, per=2, type=commands.BucketType.channel) - async def start_game(self, ctx: commands.Context) -> None: - """Generate a board, send the game embed, and end the game after a time limit.""" - if ctx.channel.id in self.current_games: - await ctx.send("There's already a game running!") - return - - minimum_solutions, = random.choices(range(len(SOLN_DISTR)), weights=SOLN_DISTR) - game = DuckGame(minimum_solutions=minimum_solutions) - game.running = True - self.current_games[ctx.channel.id] = game - - game.embed_msg = await self.send_board_embed(ctx, game) - await asyncio.sleep(GAME_DURATION) - - # Checking for the channel ID in the currently running games is not sufficient. - # The game could have been ended by a player, and a new game already started in the same channel. - if game.running: - try: - del self.current_games[ctx.channel.id] - await self.end_game(ctx.channel, game, end_message="Time's up!") - except KeyError: - pass - - @commands.Cog.listener() - async def on_message(self, msg: discord.Message) -> None: - """Listen for messages and process them as answers if appropriate.""" - if msg.author.bot: - return - - channel = msg.channel - if channel.id not in self.current_games: - return - - game = self.current_games[channel.id] - if msg.content.strip().lower() == 'goose': - # If all of the solutions have been claimed, i.e. the "goose" call is correct. - if len(game.solutions) == len(game.claimed_answers): - try: - del self.current_games[channel.id] - game.scores[msg.author] += CORRECT_GOOSE - await self.end_game(channel, game, end_message=f"{msg.author.display_name} GOOSED!") - except KeyError: - pass - else: - await msg.add_reaction(EMOJI_WRONG) - game.scores[msg.author] += INCORRECT_GOOSE - return - - # Valid answers contain 3 numbers. - if not (match := re.match(ANSWER_REGEX, msg.content)): - return - answer = tuple(sorted(int(m) for m in match.groups())) - - # Be forgiving for answers that use indices not on the board. - if not all(0 <= n < len(game.board) for n in answer): - return - - # Also be forgiving for answers that have already been claimed (and avoid penalizing for racing conditions). - if answer in game.claimed_answers: - return - - if answer in game.solutions: - game.claimed_answers[answer] = msg.author - game.scores[msg.author] += CORRECT_SOLN - await self.display_claimed_answer(game, msg.author, answer) - else: - await msg.add_reaction(EMOJI_WRONG) - game.scores[msg.author] += INCORRECT_SOLN - - async def send_board_embed(self, ctx: commands.Context, game: DuckGame) -> discord.Message: - """Create and send the initial game embed. This will be edited as the game goes on.""" - image = assemble_board_image(game.board, game.rows, game.columns) - with BytesIO() as image_stream: - image.save(image_stream, format="png") - image_stream.seek(0) - file = discord.File(fp=image_stream, filename="board.png") - embed = discord.Embed( - title="Duck Duck Duck Goose!", - color=Colours.bright_green, - footer="" - ) - embed.set_image(url="attachment://board.png") - return await ctx.send(embed=embed, file=file) - - async def display_claimed_answer(self, game: DuckGame, author: discord.Member, answer: tuple[int]) -> None: - """Add a claimed answer to the game embed.""" - async with game.editing_embed: - game_embed, = game.embed_msg.embeds - old_footer = game_embed.footer.text - if old_footer == discord.Embed.Empty: - old_footer = "" - game_embed.set_footer(text=f"{old_footer}\n{str(answer):12s} - {author.display_name}") - await self.edit_embed_with_image(game.embed_msg, game_embed) - - async def end_game(self, channel: discord.TextChannel, game: DuckGame, end_message: str) -> None: - """Edit the game embed to reflect the end of the game and mark the game as not running.""" - game.running = False - - scoreboard_embed = discord.Embed( - title=end_message, - color=discord.Color.dark_purple(), - ) - scores = sorted( - game.scores.items(), - key=lambda item: item[1], - reverse=True, - ) - scoreboard = "Final scores:\n\n" - scoreboard += "\n".join(f"{member.display_name}: {score}" for member, score in scores) - scoreboard_embed.description = scoreboard - await channel.send(embed=scoreboard_embed) - - missed = [ans for ans in game.solutions if ans not in game.claimed_answers] - if missed: - missed_text = "Flights everyone missed:\n" + "\n".join(f"{ans}" for ans in missed) - else: - missed_text = "All the flights were found!" - - game_embed, = game.embed_msg.embeds - old_footer = game_embed.footer.text - if old_footer == discord.Embed.Empty: - old_footer = "" - embed_as_dict = game_embed.to_dict() # Cannot set embed color after initialization - embed_as_dict["color"] = discord.Color.red().value - game_embed = discord.Embed.from_dict(embed_as_dict) - game_embed.set_footer( - text=f"{old_footer.rstrip()}\n\n{missed_text}" - ) - await self.edit_embed_with_image(game.embed_msg, game_embed) - - @start_game.command(name="help") - async def show_rules(self, ctx: commands.Context) -> None: - """Explain the rules of the game.""" - await self.send_help_embed(ctx) - - @start_game.command(name="stop") - @with_role(*MODERATION_ROLES) - async def stop_game(self, ctx: commands.Context) -> None: - """Stop a currently running game. Only available to mods.""" - try: - game = self.current_games.pop(ctx.channel.id) - except KeyError: - await ctx.send("No game currently running in this channel") - return - await self.end_game(ctx.channel, game, end_message="Game canceled.") - - @staticmethod - async def send_help_embed(ctx: commands.Context) -> discord.Message: - """Send rules embed.""" - embed = discord.Embed( - title="Compete against other players to find valid flights!", - color=discord.Color.dark_purple(), - ) - embed.description = HELP_TEXT - file = discord.File(HELP_IMAGE_PATH, filename="help.png") - embed.set_image(url="attachment://help.png") - embed.set_footer( - text="Tip: using Discord's compact message display mode can help keep the board on the screen" - ) - return await ctx.send(file=file, embed=embed) - - @staticmethod - async def edit_embed_with_image(msg: discord.Message, embed: discord.Embed) -> None: - """Edit an embed without the attached image going wonky.""" - attach_name = urlparse(embed.image.url).path.split("/")[-1] - embed.set_image(url=f"attachment://{attach_name}") - await msg.edit(embed=embed) - - -def setup(bot: Bot) -> None: - """Load the DuckGamesDirector cog.""" - bot.add_cog(DuckGamesDirector(bot)) diff --git a/bot/exts/evergreen/emoji.py b/bot/exts/evergreen/emoji.py deleted file mode 100644 index 55d6b8e9..00000000 --- a/bot/exts/evergreen/emoji.py +++ /dev/null @@ -1,123 +0,0 @@ -import logging -import random -import textwrap -from collections import defaultdict -from datetime import datetime -from typing import Optional - -from discord import Color, Embed, Emoji -from discord.ext import commands - -from bot.bot import Bot -from bot.constants import Colours, ERROR_REPLIES -from bot.utils.extensions import invoke_help_command -from bot.utils.pagination import LinePaginator -from bot.utils.time import time_since - -log = logging.getLogger(__name__) - - -class Emojis(commands.Cog): - """A collection of commands related to emojis in the server.""" - - @staticmethod - def embed_builder(emoji: dict) -> tuple[Embed, list[str]]: - """Generates an embed with the emoji names and count.""" - embed = Embed( - color=Colours.orange, - title="Emoji Count", - timestamp=datetime.utcnow() - ) - msg = [] - - if len(emoji) == 1: - for category_name, category_emojis in emoji.items(): - if len(category_emojis) == 1: - msg.append(f"There is **{len(category_emojis)}** emoji in the **{category_name}** category.") - else: - msg.append(f"There are **{len(category_emojis)}** emojis in the **{category_name}** category.") - embed.set_thumbnail(url=random.choice(category_emojis).url) - - else: - for category_name, category_emojis in emoji.items(): - emoji_choice = random.choice(category_emojis) - if len(category_emojis) > 1: - emoji_info = f"There are **{len(category_emojis)}** emojis in the **{category_name}** category." - else: - emoji_info = f"There is **{len(category_emojis)}** emoji in the **{category_name}** category." - if emoji_choice.animated: - msg.append(f"<a:{emoji_choice.name}:{emoji_choice.id}> {emoji_info}") - else: - msg.append(f"<:{emoji_choice.name}:{emoji_choice.id}> {emoji_info}") - return embed, msg - - @staticmethod - def generate_invalid_embed(emojis: list[Emoji]) -> tuple[Embed, list[str]]: - """Generates error embed for invalid emoji categories.""" - embed = Embed( - color=Colours.soft_red, - title=random.choice(ERROR_REPLIES) - ) - msg = [] - - emoji_dict = defaultdict(list) - for emoji in emojis: - emoji_dict[emoji.name.split("_")[0]].append(emoji) - - error_comp = ", ".join(emoji_dict) - msg.append(f"These are the valid emoji categories:\n```\n{error_comp}\n```") - return embed, msg - - @commands.group(name="emoji", invoke_without_command=True) - async def emoji_group(self, ctx: commands.Context, emoji: Optional[Emoji]) -> None: - """A group of commands related to emojis.""" - if emoji is not None: - await ctx.invoke(self.info_command, emoji) - else: - await invoke_help_command(ctx) - - @emoji_group.command(name="count", aliases=("c",)) - async def count_command(self, ctx: commands.Context, *, category_query: str = None) -> None: - """Returns embed with emoji category and info given by the user.""" - emoji_dict = defaultdict(list) - - if not ctx.guild.emojis: - await ctx.send("No emojis found.") - return - log.trace(f"Emoji Category {'' if category_query else 'not '}provided by the user.") - for emoji in ctx.guild.emojis: - emoji_category = emoji.name.split("_")[0] - - if category_query is not None and emoji_category not in category_query: - continue - - emoji_dict[emoji_category].append(emoji) - - if not emoji_dict: - log.trace("Invalid name provided by the user") - embed, msg = self.generate_invalid_embed(ctx.guild.emojis) - else: - embed, msg = self.embed_builder(emoji_dict) - await LinePaginator.paginate(lines=msg, ctx=ctx, embed=embed) - - @emoji_group.command(name="info", aliases=("i",)) - async def info_command(self, ctx: commands.Context, emoji: Emoji) -> None: - """Returns relevant information about a Discord Emoji.""" - emoji_information = Embed( - title=f"Emoji Information: {emoji.name}", - description=textwrap.dedent(f""" - **Name:** {emoji.name} - **Created:** {time_since(emoji.created_at, precision="hours")} - **Date:** {datetime.strftime(emoji.created_at, "%d/%m/%Y")} - **ID:** {emoji.id} - """), - color=Color.blurple(), - url=str(emoji.url), - ).set_thumbnail(url=emoji.url) - - await ctx.send(embed=emoji_information) - - -def setup(bot: Bot) -> None: - """Load the Emojis cog.""" - bot.add_cog(Emojis()) diff --git a/bot/exts/evergreen/error_handler.py b/bot/exts/evergreen/error_handler.py deleted file mode 100644 index fd2123e7..00000000 --- a/bot/exts/evergreen/error_handler.py +++ /dev/null @@ -1,182 +0,0 @@ -import difflib -import logging -import math -import random -from collections.abc import Iterable -from typing import Union - -from discord import Embed, Message -from discord.ext import commands -from sentry_sdk import push_scope - -from bot.bot import Bot -from bot.constants import Channels, Colours, ERROR_REPLIES, NEGATIVE_REPLIES, RedirectOutput -from bot.utils.decorators import InChannelCheckFailure, InMonthCheckFailure -from bot.utils.exceptions import APIError, UserNotPlayingError - -log = logging.getLogger(__name__) - - -QUESTION_MARK_ICON = "https://cdn.discordapp.com/emojis/512367613339369475.png" - - -class CommandErrorHandler(commands.Cog): - """A error handler for the PythonDiscord server.""" - - def __init__(self, bot: Bot): - self.bot = bot - - @staticmethod - def revert_cooldown_counter(command: commands.Command, message: Message) -> None: - """Undoes the last cooldown counter for user-error cases.""" - if command._buckets.valid: - bucket = command._buckets.get_bucket(message) - bucket._tokens = min(bucket.rate, bucket._tokens + 1) - logging.debug("Cooldown counter reverted as the command was not used correctly.") - - @staticmethod - def error_embed(message: str, title: Union[Iterable, str] = ERROR_REPLIES) -> Embed: - """Build a basic embed with red colour and either a random error title or a title provided.""" - embed = Embed(colour=Colours.soft_red) - if isinstance(title, str): - embed.title = title - else: - embed.title = random.choice(title) - embed.description = message - return embed - - @commands.Cog.listener() - async def on_command_error(self, ctx: commands.Context, error: commands.CommandError) -> None: - """Activates when a command raises an error.""" - if getattr(error, "handled", False): - logging.debug(f"Command {ctx.command} had its error already handled locally; ignoring.") - return - - parent_command = "" - if subctx := getattr(ctx, "subcontext", None): - parent_command = f"{ctx.command} " - ctx = subctx - - error = getattr(error, "original", error) - logging.debug( - f"Error Encountered: {type(error).__name__} - {str(error)}, " - f"Command: {ctx.command}, " - f"Author: {ctx.author}, " - f"Channel: {ctx.channel}" - ) - - if isinstance(error, commands.CommandNotFound): - await self.send_command_suggestion(ctx, ctx.invoked_with) - return - - if isinstance(error, (InChannelCheckFailure, InMonthCheckFailure)): - await ctx.send(embed=self.error_embed(str(error), NEGATIVE_REPLIES), delete_after=7.5) - return - - if isinstance(error, commands.UserInputError): - self.revert_cooldown_counter(ctx.command, ctx.message) - usage = f"```\n{ctx.prefix}{parent_command}{ctx.command} {ctx.command.signature}\n```" - embed = self.error_embed( - f"Your input was invalid: {error}\n\nUsage:{usage}" - ) - await ctx.send(embed=embed) - return - - if isinstance(error, commands.CommandOnCooldown): - mins, secs = divmod(math.ceil(error.retry_after), 60) - embed = self.error_embed( - f"This command is on cooldown:\nPlease retry in {mins} minutes {secs} seconds.", - NEGATIVE_REPLIES - ) - await ctx.send(embed=embed, delete_after=7.5) - return - - if isinstance(error, commands.DisabledCommand): - await ctx.send(embed=self.error_embed("This command has been disabled.", NEGATIVE_REPLIES)) - return - - if isinstance(error, commands.NoPrivateMessage): - await ctx.send( - embed=self.error_embed( - f"This command can only be used in the server. Go to <#{Channels.community_bot_commands}> instead!", - NEGATIVE_REPLIES - ) - ) - return - - if isinstance(error, commands.BadArgument): - self.revert_cooldown_counter(ctx.command, ctx.message) - embed = self.error_embed( - "The argument you provided was invalid: " - f"{error}\n\nUsage:\n```\n{ctx.prefix}{parent_command}{ctx.command} {ctx.command.signature}\n```" - ) - await ctx.send(embed=embed) - return - - if isinstance(error, commands.CheckFailure): - await ctx.send(embed=self.error_embed("You are not authorized to use this command.", NEGATIVE_REPLIES)) - return - - if isinstance(error, UserNotPlayingError): - await ctx.send("Game not found.") - return - - if isinstance(error, APIError): - await ctx.send( - embed=self.error_embed( - f"There was an error when communicating with the {error.api}", - NEGATIVE_REPLIES - ) - ) - return - - with push_scope() as scope: - scope.user = { - "id": ctx.author.id, - "username": str(ctx.author) - } - - scope.set_tag("command", ctx.command.qualified_name) - scope.set_tag("message_id", ctx.message.id) - scope.set_tag("channel_id", ctx.channel.id) - - scope.set_extra("full_message", ctx.message.content) - - if ctx.guild is not None: - scope.set_extra("jump_to", ctx.message.jump_url) - - log.exception(f"Unhandled command error: {str(error)}", exc_info=error) - - async def send_command_suggestion(self, ctx: commands.Context, command_name: str) -> None: - """Sends user similar commands if any can be found.""" - raw_commands = [] - for cmd in self.bot.walk_commands(): - if not cmd.hidden: - raw_commands += (cmd.name, *cmd.aliases) - if similar_command_data := difflib.get_close_matches(command_name, raw_commands, 1): - similar_command_name = similar_command_data[0] - similar_command = self.bot.get_command(similar_command_name) - - if not similar_command: - return - - log_msg = "Cancelling attempt to suggest a command due to failed checks." - try: - if not await similar_command.can_run(ctx): - log.debug(log_msg) - return - except commands.errors.CommandError as cmd_error: - log.debug(log_msg) - await self.on_command_error(ctx, cmd_error) - return - - misspelled_content = ctx.message.content - e = Embed() - e.set_author(name="Did you mean:", icon_url=QUESTION_MARK_ICON) - e.description = misspelled_content.replace(command_name, similar_command_name, 1) - await ctx.send(embed=e, delete_after=RedirectOutput.delete_delay) - - -def setup(bot: Bot) -> None: - """Load the ErrorHandler cog.""" - bot.add_cog(CommandErrorHandler(bot)) diff --git a/bot/exts/evergreen/fun.py b/bot/exts/evergreen/fun.py deleted file mode 100644 index 4bbfe859..00000000 --- a/bot/exts/evergreen/fun.py +++ /dev/null @@ -1,250 +0,0 @@ -import functools -import json -import logging -import random -from collections.abc import Iterable -from pathlib import Path -from typing import Callable, Optional, Union - -from discord import Embed, Message -from discord.ext import commands -from discord.ext.commands import BadArgument, Cog, Context, MessageConverter, clean_content - -from bot import utils -from bot.bot import Bot -from bot.constants import Client, Colours, Emojis -from bot.utils import helpers - -log = logging.getLogger(__name__) - -UWU_WORDS = { - "fi": "fwi", - "l": "w", - "r": "w", - "some": "sum", - "th": "d", - "thing": "fing", - "tho": "fo", - "you're": "yuw'we", - "your": "yur", - "you": "yuw", -} - - -def caesar_cipher(text: str, offset: int) -> Iterable[str]: - """ - Implements a lazy Caesar Cipher algorithm. - - Encrypts a `text` given a specific integer `offset`. The sign - of the `offset` dictates the direction in which it shifts to, - with a negative value shifting to the left, and a positive - value shifting to the right. - """ - for char in text: - if not char.isascii() or not char.isalpha() or char.isspace(): - yield char - continue - - case_start = 65 if char.isupper() else 97 - true_offset = (ord(char) - case_start + offset) % 26 - - yield chr(case_start + true_offset) - - -class Fun(Cog): - """A collection of general commands for fun.""" - - def __init__(self, bot: Bot): - self.bot = bot - - self._caesar_cipher_embed = json.loads(Path("bot/resources/evergreen/caesar_info.json").read_text("UTF-8")) - - @staticmethod - def _get_random_die() -> str: - """Generate a random die emoji, ready to be sent on Discord.""" - die_name = f"dice_{random.randint(1, 6)}" - return getattr(Emojis, die_name) - - @commands.command() - async def roll(self, ctx: Context, num_rolls: int = 1) -> None: - """Outputs a number of random dice emotes (up to 6).""" - if 1 <= num_rolls <= 6: - dice = " ".join(self._get_random_die() for _ in range(num_rolls)) - await ctx.send(dice) - else: - raise BadArgument(f"`{Client.prefix}roll` only supports between 1 and 6 rolls.") - - @commands.command(name="uwu", aliases=("uwuwize", "uwuify",)) - async def uwu_command(self, ctx: Context, *, text: clean_content(fix_channel_mentions=True)) -> None: - """Converts a given `text` into it's uwu equivalent.""" - conversion_func = functools.partial( - utils.replace_many, replacements=UWU_WORDS, ignore_case=True, match_case=True - ) - text, embed = await Fun._get_text_and_embed(ctx, text) - # Convert embed if it exists - if embed is not None: - embed = Fun._convert_embed(conversion_func, embed) - converted_text = conversion_func(text) - converted_text = helpers.suppress_links(converted_text) - # Don't put >>> if only embed present - if converted_text: - converted_text = f">>> {converted_text.lstrip('> ')}" - await ctx.send(content=converted_text, embed=embed) - - @commands.command(name="randomcase", aliases=("rcase", "randomcaps", "rcaps",)) - async def randomcase_command(self, ctx: Context, *, text: clean_content(fix_channel_mentions=True)) -> None: - """Randomly converts the casing of a given `text`.""" - def conversion_func(text: str) -> str: - """Randomly converts the casing of a given string.""" - return "".join( - char.upper() if round(random.random()) else char.lower() for char in text - ) - text, embed = await Fun._get_text_and_embed(ctx, text) - # Convert embed if it exists - if embed is not None: - embed = Fun._convert_embed(conversion_func, embed) - converted_text = conversion_func(text) - converted_text = helpers.suppress_links(converted_text) - # Don't put >>> if only embed present - if converted_text: - converted_text = f">>> {converted_text.lstrip('> ')}" - await ctx.send(content=converted_text, embed=embed) - - @commands.group(name="caesarcipher", aliases=("caesar", "cc",)) - async def caesarcipher_group(self, ctx: Context) -> None: - """ - Translates a message using the Caesar Cipher. - - See `decrypt`, `encrypt`, and `info` subcommands. - """ - if ctx.invoked_subcommand is None: - await ctx.invoke(self.bot.get_command("help"), "caesarcipher") - - @caesarcipher_group.command(name="info") - async def caesarcipher_info(self, ctx: Context) -> None: - """Information about the Caesar Cipher.""" - embed = Embed.from_dict(self._caesar_cipher_embed) - embed.colour = Colours.dark_green - - await ctx.send(embed=embed) - - @staticmethod - async def _caesar_cipher(ctx: Context, offset: int, msg: str, left_shift: bool = False) -> None: - """ - Given a positive integer `offset`, translates and sends the given `msg`. - - Performs a right shift by default unless `left_shift` is specified as `True`. - - Also accepts a valid Discord Message ID or link. - """ - if offset < 0: - await ctx.send(":no_entry: Cannot use a negative offset.") - return - - if left_shift: - offset = -offset - - def conversion_func(text: str) -> str: - """Encrypts the given string using the Caesar Cipher.""" - return "".join(caesar_cipher(text, offset)) - - text, embed = await Fun._get_text_and_embed(ctx, msg) - - if embed is not None: - embed = Fun._convert_embed(conversion_func, embed) - - converted_text = conversion_func(text) - - if converted_text: - converted_text = f">>> {converted_text.lstrip('> ')}" - - await ctx.send(content=converted_text, embed=embed) - - @caesarcipher_group.command(name="encrypt", aliases=("rightshift", "rshift", "enc",)) - async def caesarcipher_encrypt(self, ctx: Context, offset: int, *, msg: str) -> None: - """ - Given a positive integer `offset`, encrypt the given `msg`. - - Performs a right shift of the letters in the message. - - Also accepts a valid Discord Message ID or link. - """ - await self._caesar_cipher(ctx, offset, msg, left_shift=False) - - @caesarcipher_group.command(name="decrypt", aliases=("leftshift", "lshift", "dec",)) - async def caesarcipher_decrypt(self, ctx: Context, offset: int, *, msg: str) -> None: - """ - Given a positive integer `offset`, decrypt the given `msg`. - - Performs a left shift of the letters in the message. - - Also accepts a valid Discord Message ID or link. - """ - await self._caesar_cipher(ctx, offset, msg, left_shift=True) - - @staticmethod - async def _get_text_and_embed(ctx: Context, text: str) -> tuple[str, Optional[Embed]]: - """ - Attempts to extract the text and embed from a possible link to a discord Message. - - Does not retrieve the text and embed from the Message if it is in a channel the user does - not have read permissions in. - - Returns a tuple of: - str: If `text` is a valid discord Message, the contents of the message, else `text`. - Optional[Embed]: The embed if found in the valid Message, else None - """ - embed = None - - msg = await Fun._get_discord_message(ctx, text) - # Ensure the user has read permissions for the channel the message is in - if isinstance(msg, Message): - permissions = msg.channel.permissions_for(ctx.author) - if permissions.read_messages: - text = msg.clean_content - # Take first embed because we can't send multiple embeds - if msg.embeds: - embed = msg.embeds[0] - - return (text, embed) - - @staticmethod - async def _get_discord_message(ctx: Context, text: str) -> Union[Message, str]: - """ - Attempts to convert a given `text` to a discord Message object and return it. - - Conversion will succeed if given a discord Message ID or link. - Returns `text` if the conversion fails. - """ - try: - text = await MessageConverter().convert(ctx, text) - except commands.BadArgument: - log.debug(f"Input '{text:.20}...' is not a valid Discord Message") - return text - - @staticmethod - def _convert_embed(func: Callable[[str, ], str], embed: Embed) -> Embed: - """ - Converts the text in an embed using a given conversion function, then return the embed. - - Only modifies the following fields: title, description, footer, fields - """ - embed_dict = embed.to_dict() - - embed_dict["title"] = func(embed_dict.get("title", "")) - embed_dict["description"] = func(embed_dict.get("description", "")) - - if "footer" in embed_dict: - embed_dict["footer"]["text"] = func(embed_dict["footer"].get("text", "")) - - if "fields" in embed_dict: - for field in embed_dict["fields"]: - field["name"] = func(field.get("name", "")) - field["value"] = func(field.get("value", "")) - - return Embed.from_dict(embed_dict) - - -def setup(bot: Bot) -> None: - """Load the Fun cog.""" - bot.add_cog(Fun(bot)) diff --git a/bot/exts/evergreen/game.py b/bot/exts/evergreen/game.py deleted file mode 100644 index f9c150e6..00000000 --- a/bot/exts/evergreen/game.py +++ /dev/null @@ -1,485 +0,0 @@ -import difflib -import logging -import random -import re -from asyncio import sleep -from datetime import datetime as dt, timedelta -from enum import IntEnum -from typing import Any, Optional - -from aiohttp import ClientSession -from discord import Embed -from discord.ext import tasks -from discord.ext.commands import Cog, Context, group - -from bot.bot import Bot -from bot.constants import STAFF_ROLES, Tokens -from bot.utils.decorators import with_role -from bot.utils.extensions import invoke_help_command -from bot.utils.pagination import ImagePaginator, LinePaginator - -# Base URL of IGDB API -BASE_URL = "https://api.igdb.com/v4" - -CLIENT_ID = Tokens.igdb_client_id -CLIENT_SECRET = Tokens.igdb_client_secret - -# The number of seconds before expiry that we attempt to re-fetch a new access token -ACCESS_TOKEN_RENEWAL_WINDOW = 60*60*24*2 - -# URL to request API access token -OAUTH_URL = "https://id.twitch.tv/oauth2/token" - -OAUTH_PARAMS = { - "client_id": CLIENT_ID, - "client_secret": CLIENT_SECRET, - "grant_type": "client_credentials" -} - -BASE_HEADERS = { - "Client-ID": CLIENT_ID, - "Accept": "application/json" -} - -logger = logging.getLogger(__name__) - -REGEX_NON_ALPHABET = re.compile(r"[^a-z0-9]", re.IGNORECASE) - -# --------- -# TEMPLATES -# --------- - -# Body templates -# Request body template for get_games_list -GAMES_LIST_BODY = ( - "fields cover.image_id, first_release_date, total_rating, name, storyline, url, platforms.name, status," - "involved_companies.company.name, summary, age_ratings.category, age_ratings.rating, total_rating_count;" - "{sort} {limit} {offset} {genre} {additional}" -) - -# Request body template for get_companies_list -COMPANIES_LIST_BODY = ( - "fields name, url, start_date, logo.image_id, developed.name, published.name, description;" - "offset {offset}; limit {limit};" -) - -# Request body template for games search -SEARCH_BODY = 'fields name, url, storyline, total_rating, total_rating_count; limit 50; search "{term}";' - -# Pages templates -# Game embed layout -GAME_PAGE = ( - "**[{name}]({url})**\n" - "{description}" - "**Release Date:** {release_date}\n" - "**Rating:** {rating}/100 :star: (based on {rating_count} ratings)\n" - "**Platforms:** {platforms}\n" - "**Status:** {status}\n" - "**Age Ratings:** {age_ratings}\n" - "**Made by:** {made_by}\n\n" - "{storyline}" -) - -# .games company command page layout -COMPANY_PAGE = ( - "**[{name}]({url})**\n" - "{description}" - "**Founded:** {founded}\n" - "**Developed:** {developed}\n" - "**Published:** {published}" -) - -# For .games search command line layout -GAME_SEARCH_LINE = ( - "**[{name}]({url})**\n" - "{rating}/100 :star: (based on {rating_count} ratings)\n" -) - -# URL templates -COVER_URL = "https://images.igdb.com/igdb/image/upload/t_cover_big/{image_id}.jpg" -LOGO_URL = "https://images.igdb.com/igdb/image/upload/t_logo_med/{image_id}.png" - -# Create aliases for complex genre names -ALIASES = { - "Role-playing (rpg)": ["Role playing", "Rpg"], - "Turn-based strategy (tbs)": ["Turn based strategy", "Tbs"], - "Real time strategy (rts)": ["Real time strategy", "Rts"], - "Hack and slash/beat 'em up": ["Hack and slash"] -} - - -class GameStatus(IntEnum): - """Game statuses in IGDB API.""" - - Released = 0 - Alpha = 2 - Beta = 3 - Early = 4 - Offline = 5 - Cancelled = 6 - Rumored = 7 - - -class AgeRatingCategories(IntEnum): - """IGDB API Age Rating categories IDs.""" - - ESRB = 1 - PEGI = 2 - - -class AgeRatings(IntEnum): - """PEGI/ESRB ratings IGDB API IDs.""" - - Three = 1 - Seven = 2 - Twelve = 3 - Sixteen = 4 - Eighteen = 5 - RP = 6 - EC = 7 - E = 8 - E10 = 9 - T = 10 - M = 11 - AO = 12 - - -class Games(Cog): - """Games Cog contains commands that collect data from IGDB.""" - - def __init__(self, bot: Bot): - self.bot = bot - self.http_session: ClientSession = bot.http_session - - self.genres: dict[str, int] = {} - self.headers = BASE_HEADERS - - self.bot.loop.create_task(self.renew_access_token()) - - async def renew_access_token(self) -> None: - """Refeshes V4 access token a number of seconds before expiry. See `ACCESS_TOKEN_RENEWAL_WINDOW`.""" - while True: - async with self.http_session.post(OAUTH_URL, params=OAUTH_PARAMS) as resp: - result = await resp.json() - if resp.status != 200: - # If there is a valid access token continue to use that, - # otherwise unload cog. - if "Authorization" in self.headers: - time_delta = timedelta(seconds=ACCESS_TOKEN_RENEWAL_WINDOW) - logger.error( - "Failed to renew IGDB access token. " - f"Current token will last for {time_delta} " - f"OAuth response message: {result['message']}" - ) - else: - logger.warning( - "Invalid OAuth credentials. Unloading Games cog. " - f"OAuth response message: {result['message']}" - ) - self.bot.remove_cog("Games") - - return - - self.headers["Authorization"] = f"Bearer {result['access_token']}" - - # Attempt to renew before the token expires - next_renewal = result["expires_in"] - ACCESS_TOKEN_RENEWAL_WINDOW - - time_delta = timedelta(seconds=next_renewal) - logger.info(f"Successfully renewed access token. Refreshing again in {time_delta}") - - # This will be true the first time this loop runs. - # Since we now have an access token, its safe to start this task. - if self.genres == {}: - self.refresh_genres_task.start() - await sleep(next_renewal) - - @tasks.loop(hours=24.0) - async def refresh_genres_task(self) -> None: - """Refresh genres in every hour.""" - try: - await self._get_genres() - except Exception as e: - logger.warning(f"There was error while refreshing genres: {e}") - return - logger.info("Successfully refreshed genres.") - - def cog_unload(self) -> None: - """Cancel genres refreshing start when unloading Cog.""" - self.refresh_genres_task.cancel() - logger.info("Successfully stopped Genres Refreshing task.") - - async def _get_genres(self) -> None: - """Create genres variable for games command.""" - body = "fields name; limit 100;" - async with self.http_session.post(f"{BASE_URL}/genres", data=body, headers=self.headers) as resp: - result = await resp.json() - genres = {genre["name"].capitalize(): genre["id"] for genre in result} - - # Replace complex names with names from ALIASES - for genre_name, genre in genres.items(): - if genre_name in ALIASES: - for alias in ALIASES[genre_name]: - self.genres[alias] = genre - else: - self.genres[genre_name] = genre - - @group(name="games", aliases=("game",), invoke_without_command=True) - async def games(self, ctx: Context, amount: Optional[int] = 5, *, genre: Optional[str]) -> None: - """ - Get random game(s) by genre from IGDB. Use .games genres command to get all available genres. - - Also support amount parameter, what max is 25 and min 1, default 5. Supported formats: - - .games <genre> - - .games <amount> <genre> - """ - # When user didn't specified genre, send help message - if genre is None: - await invoke_help_command(ctx) - return - - # Capitalize genre for check - genre = "".join(genre).capitalize() - - # Check for amounts, max is 25 and min 1 - if not 1 <= amount <= 25: - await ctx.send("Your provided amount is out of range. Our minimum is 1 and maximum 25.") - return - - # Get games listing, if genre don't exist, show error message with possibilities. - # Offset must be random, due otherwise we will get always same result (offset show in which position should - # API start returning result) - try: - games = await self.get_games_list(amount, self.genres[genre], offset=random.randint(0, 150)) - except KeyError: - possibilities = await self.get_best_results(genre) - # If there is more than 1 possibilities, show these. - # If there is only 1 possibility, use it as genre. - # Otherwise send message about invalid genre. - if len(possibilities) > 1: - display_possibilities = "`, `".join(p[1] for p in possibilities) - await ctx.send( - f"Invalid genre `{genre}`. " - f"{f'Maybe you meant `{display_possibilities}`?' if display_possibilities else ''}" - ) - return - elif len(possibilities) == 1: - games = await self.get_games_list( - amount, self.genres[possibilities[0][1]], offset=random.randint(0, 150) - ) - genre = possibilities[0][1] - else: - await ctx.send(f"Invalid genre `{genre}`.") - return - - # Create pages and paginate - pages = [await self.create_page(game) for game in games] - - await ImagePaginator.paginate(pages, ctx, Embed(title=f"Random {genre.title()} Games")) - - @games.command(name="top", aliases=("t",)) - async def top(self, ctx: Context, amount: int = 10) -> None: - """ - Get current Top games in IGDB. - - Support amount parameter. Max is 25, min is 1. - """ - if not 1 <= amount <= 25: - await ctx.send("Your provided amount is out of range. Our minimum is 1 and maximum 25.") - return - - games = await self.get_games_list(amount, sort="total_rating desc", - additional_body="where total_rating >= 90; sort total_rating_count desc;") - - pages = [await self.create_page(game) for game in games] - await ImagePaginator.paginate(pages, ctx, Embed(title=f"Top {amount} Games")) - - @games.command(name="genres", aliases=("genre", "g")) - async def genres(self, ctx: Context) -> None: - """Get all available genres.""" - await ctx.send(f"Currently available genres: {', '.join(f'`{genre}`' for genre in self.genres)}") - - @games.command(name="search", aliases=("s",)) - async def search(self, ctx: Context, *, search_term: str) -> None: - """Find games by name.""" - lines = await self.search_games(search_term) - - await LinePaginator.paginate(lines, ctx, Embed(title=f"Game Search Results: {search_term}"), empty=False) - - @games.command(name="company", aliases=("companies",)) - async def company(self, ctx: Context, amount: int = 5) -> None: - """ - Get random Game Companies companies from IGDB API. - - Support amount parameter. Max is 25, min is 1. - """ - if not 1 <= amount <= 25: - await ctx.send("Your provided amount is out of range. Our minimum is 1 and maximum 25.") - return - - # Get companies listing. Provide limit for limiting how much companies will be returned. Get random offset to - # get (almost) every time different companies (offset show in which position should API start returning result) - companies = await self.get_companies_list(limit=amount, offset=random.randint(0, 150)) - pages = [await self.create_company_page(co) for co in companies] - - await ImagePaginator.paginate(pages, ctx, Embed(title="Random Game Companies")) - - @with_role(*STAFF_ROLES) - @games.command(name="refresh", aliases=("r",)) - async def refresh_genres_command(self, ctx: Context) -> None: - """Refresh .games command genres.""" - try: - await self._get_genres() - except Exception as e: - await ctx.send(f"There was error while refreshing genres: `{e}`") - return - await ctx.send("Successfully refreshed genres.") - - async def get_games_list( - self, - amount: int, - genre: Optional[str] = None, - sort: Optional[str] = None, - additional_body: str = "", - offset: int = 0 - ) -> list[dict[str, Any]]: - """ - Get list of games from IGDB API by parameters that is provided. - - Amount param show how much games this get, genre is genre ID and at least one genre in game must this when - provided. Sort is sorting by specific field and direction, ex. total_rating desc/asc (total_rating is field, - desc/asc is direction). Additional_body is field where you can pass extra search parameters. Offset show start - position in API. - """ - # Create body of IGDB API request, define fields, sorting, offset, limit and genre - params = { - "sort": f"sort {sort};" if sort else "", - "limit": f"limit {amount};", - "offset": f"offset {offset};" if offset else "", - "genre": f"where genres = ({genre});" if genre else "", - "additional": additional_body - } - body = GAMES_LIST_BODY.format(**params) - - # Do request to IGDB API, create headers, URL, define body, return result - async with self.http_session.post(url=f"{BASE_URL}/games", data=body, headers=self.headers) as resp: - return await resp.json() - - async def create_page(self, data: dict[str, Any]) -> tuple[str, str]: - """Create content of Game Page.""" - # Create cover image URL from template - url = COVER_URL.format(**{"image_id": data["cover"]["image_id"] if "cover" in data else ""}) - - # Get release date separately with checking - release_date = dt.utcfromtimestamp(data["first_release_date"]).date() if "first_release_date" in data else "?" - - # Create Age Ratings value - rating = ", ".join( - f"{AgeRatingCategories(age['category']).name} {AgeRatings(age['rating']).name}" - for age in data["age_ratings"] - ) if "age_ratings" in data else "?" - - companies = [c["company"]["name"] for c in data["involved_companies"]] if "involved_companies" in data else "?" - - # Create formatting for template page - formatting = { - "name": data["name"], - "url": data["url"], - "description": f"{data['summary']}\n\n" if "summary" in data else "\n", - "release_date": release_date, - "rating": round(data["total_rating"] if "total_rating" in data else 0, 2), - "rating_count": data["total_rating_count"] if "total_rating_count" in data else "?", - "platforms": ", ".join(platform["name"] for platform in data["platforms"]) if "platforms" in data else "?", - "status": GameStatus(data["status"]).name if "status" in data else "?", - "age_ratings": rating, - "made_by": ", ".join(companies), - "storyline": data["storyline"] if "storyline" in data else "" - } - page = GAME_PAGE.format(**formatting) - - return page, url - - async def search_games(self, search_term: str) -> list[str]: - """Search game from IGDB API by string, return listing of pages.""" - lines = [] - - # Define request body of IGDB API request and do request - body = SEARCH_BODY.format(**{"term": search_term}) - - async with self.http_session.post(url=f"{BASE_URL}/games", data=body, headers=self.headers) as resp: - data = await resp.json() - - # Loop over games, format them to good format, make line and append this to total lines - for game in data: - formatting = { - "name": game["name"], - "url": game["url"], - "rating": round(game["total_rating"] if "total_rating" in game else 0, 2), - "rating_count": game["total_rating_count"] if "total_rating" in game else "?" - } - line = GAME_SEARCH_LINE.format(**formatting) - lines.append(line) - - return lines - - async def get_companies_list(self, limit: int, offset: int = 0) -> list[dict[str, Any]]: - """ - Get random Game Companies from IGDB API. - - Limit is parameter, that show how much movies this should return, offset show in which position should API start - returning results. - """ - # Create request body from template - body = COMPANIES_LIST_BODY.format(**{ - "limit": limit, - "offset": offset - }) - - async with self.http_session.post(url=f"{BASE_URL}/companies", data=body, headers=self.headers) as resp: - return await resp.json() - - async def create_company_page(self, data: dict[str, Any]) -> tuple[str, str]: - """Create good formatted Game Company page.""" - # Generate URL of company logo - url = LOGO_URL.format(**{"image_id": data["logo"]["image_id"] if "logo" in data else ""}) - - # Try to get found date of company - founded = dt.utcfromtimestamp(data["start_date"]).date() if "start_date" in data else "?" - - # Generate list of games, that company have developed or published - developed = ", ".join(game["name"] for game in data["developed"]) if "developed" in data else "?" - published = ", ".join(game["name"] for game in data["published"]) if "published" in data else "?" - - formatting = { - "name": data["name"], - "url": data["url"], - "description": f"{data['description']}\n\n" if "description" in data else "\n", - "founded": founded, - "developed": developed, - "published": published - } - page = COMPANY_PAGE.format(**formatting) - - return page, url - - async def get_best_results(self, query: str) -> list[tuple[float, str]]: - """Get best match result of genre when original genre is invalid.""" - results = [] - for genre in self.genres: - ratios = [difflib.SequenceMatcher(None, query, genre).ratio()] - for word in REGEX_NON_ALPHABET.split(genre): - ratios.append(difflib.SequenceMatcher(None, query, word).ratio()) - results.append((round(max(ratios), 2), genre)) - return sorted((item for item in results if item[0] >= 0.60), reverse=True)[:4] - - -def setup(bot: Bot) -> None: - """Load the Games cog.""" - # Check does IGDB API key exist, if not, log warning and don't load cog - if not Tokens.igdb_client_id: - logger.warning("No IGDB client ID. Not loading Games cog.") - return - if not Tokens.igdb_client_secret: - logger.warning("No IGDB client secret. Not loading Games cog.") - return - bot.add_cog(Games(bot)) diff --git a/bot/exts/evergreen/githubinfo.py b/bot/exts/evergreen/githubinfo.py deleted file mode 100644 index bbc9061a..00000000 --- a/bot/exts/evergreen/githubinfo.py +++ /dev/null @@ -1,178 +0,0 @@ -import logging -import random -from datetime import datetime -from urllib.parse import quote, quote_plus - -import discord -from discord.ext import commands - -from bot.bot import Bot -from bot.constants import Colours, NEGATIVE_REPLIES -from bot.exts.utils.extensions import invoke_help_command - -log = logging.getLogger(__name__) - -GITHUB_API_URL = "https://api.github.com" - - -class GithubInfo(commands.Cog): - """Fetches info from GitHub.""" - - def __init__(self, bot: Bot): - self.bot = bot - - async def fetch_data(self, url: str) -> dict: - """Retrieve data as a dictionary.""" - async with self.bot.http_session.get(url) as r: - return await r.json() - - @commands.group(name="github", aliases=("gh", "git")) - @commands.cooldown(1, 10, commands.BucketType.user) - async def github_group(self, ctx: commands.Context) -> None: - """Commands for finding information related to GitHub.""" - if ctx.invoked_subcommand is None: - await invoke_help_command(ctx) - - @github_group.command(name="user", aliases=("userinfo",)) - async def github_user_info(self, ctx: commands.Context, username: str) -> None: - """Fetches a user's GitHub information.""" - async with ctx.typing(): - user_data = await self.fetch_data(f"{GITHUB_API_URL}/users/{quote_plus(username)}") - - # User_data will not have a message key if the user exists - if "message" in user_data: - embed = discord.Embed( - title=random.choice(NEGATIVE_REPLIES), - description=f"The profile for `{username}` was not found.", - colour=Colours.soft_red - ) - - await ctx.send(embed=embed) - return - - org_data = await self.fetch_data(user_data["organizations_url"]) - orgs = [f"[{org['login']}](https://github.com/{org['login']})" for org in org_data] - orgs_to_add = " | ".join(orgs) - - gists = user_data["public_gists"] - - # Forming blog link - if user_data["blog"].startswith("http"): # Blog link is complete - blog = user_data["blog"] - elif user_data["blog"]: # Blog exists but the link is not complete - blog = f"https://{user_data['blog']}" - else: - blog = "No website link available" - - embed = discord.Embed( - title=f"`{user_data['login']}`'s GitHub profile info", - description=f"```\n{user_data['bio']}\n```\n" if user_data["bio"] else "", - colour=discord.Colour.blurple(), - url=user_data["html_url"], - timestamp=datetime.strptime(user_data["created_at"], "%Y-%m-%dT%H:%M:%SZ") - ) - embed.set_thumbnail(url=user_data["avatar_url"]) - embed.set_footer(text="Account created at") - - if user_data["type"] == "User": - - embed.add_field( - name="Followers", - value=f"[{user_data['followers']}]({user_data['html_url']}?tab=followers)" - ) - embed.add_field( - name="Following", - value=f"[{user_data['following']}]({user_data['html_url']}?tab=following)" - ) - - embed.add_field( - name="Public repos", - value=f"[{user_data['public_repos']}]({user_data['html_url']}?tab=repositories)" - ) - - if user_data["type"] == "User": - embed.add_field( - name="Gists", - value=f"[{gists}](https://gist.github.com/{quote_plus(username, safe='')})" - ) - - embed.add_field( - name=f"Organization{'s' if len(orgs)!=1 else ''}", - value=orgs_to_add if orgs else "No organizations." - ) - embed.add_field(name="Website", value=blog) - - await ctx.send(embed=embed) - - @github_group.command(name='repository', aliases=('repo',)) - async def github_repo_info(self, ctx: commands.Context, *repo: str) -> None: - """ - Fetches a repositories' GitHub information. - - The repository should look like `user/reponame` or `user reponame`. - """ - repo = "/".join(repo) - if repo.count("/") != 1: - embed = discord.Embed( - title=random.choice(NEGATIVE_REPLIES), - description="The repository should look like `user/reponame` or `user reponame`.", - colour=Colours.soft_red - ) - - await ctx.send(embed=embed) - return - - async with ctx.typing(): - repo_data = await self.fetch_data(f"{GITHUB_API_URL}/repos/{quote(repo)}") - - # There won't be a message key if this repo exists - if "message" in repo_data: - embed = discord.Embed( - title=random.choice(NEGATIVE_REPLIES), - description="The requested repository was not found.", - colour=Colours.soft_red - ) - - await ctx.send(embed=embed) - return - - embed = discord.Embed( - title=repo_data["name"], - description=repo_data["description"], - colour=discord.Colour.blurple(), - url=repo_data["html_url"] - ) - - # If it's a fork, then it will have a parent key - try: - parent = repo_data["parent"] - embed.description += f"\n\nForked from [{parent['full_name']}]({parent['html_url']})" - except KeyError: - log.debug("Repository is not a fork.") - - repo_owner = repo_data["owner"] - - embed.set_author( - name=repo_owner["login"], - url=repo_owner["html_url"], - icon_url=repo_owner["avatar_url"] - ) - - repo_created_at = datetime.strptime(repo_data["created_at"], "%Y-%m-%dT%H:%M:%SZ").strftime("%d/%m/%Y") - last_pushed = datetime.strptime(repo_data["pushed_at"], "%Y-%m-%dT%H:%M:%SZ").strftime("%d/%m/%Y at %H:%M") - - embed.set_footer( - text=( - f"{repo_data['forks_count']} ⑂ " - f"• {repo_data['stargazers_count']} ⭐ " - f"• Created At {repo_created_at} " - f"• Last Commit {last_pushed}" - ) - ) - - await ctx.send(embed=embed) - - -def setup(bot: Bot) -> None: - """Load the GithubInfo cog.""" - bot.add_cog(GithubInfo(bot)) diff --git a/bot/exts/evergreen/help.py b/bot/exts/evergreen/help.py deleted file mode 100644 index 4b766b50..00000000 --- a/bot/exts/evergreen/help.py +++ /dev/null @@ -1,562 +0,0 @@ -# Help command from Python bot. All commands that will be added to there in futures should be added to here too. -import asyncio -import itertools -import logging -from contextlib import suppress -from typing import NamedTuple, Union - -from discord import Colour, Embed, HTTPException, Message, Reaction, User -from discord.ext import commands -from discord.ext.commands import CheckFailure, Cog as DiscordCog, Command, Context -from rapidfuzz import process - -from bot import constants -from bot.bot import Bot -from bot.constants import Emojis -from bot.utils.pagination import ( - FIRST_EMOJI, LAST_EMOJI, - LEFT_EMOJI, LinePaginator, RIGHT_EMOJI, -) - -DELETE_EMOJI = Emojis.trashcan - -REACTIONS = { - FIRST_EMOJI: "first", - LEFT_EMOJI: "back", - RIGHT_EMOJI: "next", - LAST_EMOJI: "end", - DELETE_EMOJI: "stop", -} - - -class Cog(NamedTuple): - """Show information about a Cog's name, description and commands.""" - - name: str - description: str - commands: list[Command] - - -log = logging.getLogger(__name__) - - -class HelpQueryNotFound(ValueError): - """ - Raised when a HelpSession Query doesn't match a command or cog. - - Contains the custom attribute of ``possible_matches``. - Instances of this object contain a dictionary of any command(s) that were close to matching the - query, where keys are the possible matched command names and values are the likeness match scores. - """ - - def __init__(self, arg: str, possible_matches: dict = None): - super().__init__(arg) - self.possible_matches = possible_matches - - -class HelpSession: - """ - An interactive session for bot and command help output. - - Expected attributes include: - * title: str - The title of the help message. - * query: Union[discord.ext.commands.Bot, discord.ext.commands.Command] - * description: str - The description of the query. - * pages: list[str] - A list of the help content split into manageable pages. - * message: `discord.Message` - The message object that's showing the help contents. - * destination: `discord.abc.Messageable` - Where the help message is to be sent to. - Cogs can be grouped into custom categories. All cogs with the same category will be displayed - under a single category name in the help output. Custom categories are defined inside the cogs - as a class attribute named `category`. A description can also be specified with the attribute - `category_description`. If a description is not found in at least one cog, the default will be - the regular description (class docstring) of the first cog found in the category. - """ - - def __init__( - self, - ctx: Context, - *command, - cleanup: bool = False, - only_can_run: bool = True, - show_hidden: bool = False, - max_lines: int = 15 - ): - """Creates an instance of the HelpSession class.""" - self._ctx = ctx - self._bot = ctx.bot - self.title = "Command Help" - - # set the query details for the session - if command: - query_str = " ".join(command) - self.query = self._get_query(query_str) - self.description = self.query.description or self.query.help - else: - self.query = ctx.bot - self.description = self.query.description - self.author = ctx.author - self.destination = ctx.channel - - # set the config for the session - self._cleanup = cleanup - self._only_can_run = only_can_run - self._show_hidden = show_hidden - self._max_lines = max_lines - - # init session states - self._pages = None - self._current_page = 0 - self.message = None - self._timeout_task = None - self.reset_timeout() - - def _get_query(self, query: str) -> Union[Command, Cog]: - """Attempts to match the provided query with a valid command or cog.""" - command = self._bot.get_command(query) - if command: - return command - - # Find all cog categories that match. - cog_matches = [] - description = None - for cog in self._bot.cogs.values(): - if hasattr(cog, "category") and cog.category == query: - cog_matches.append(cog) - if hasattr(cog, "category_description"): - description = cog.category_description - - # Try to search by cog name if no categories match. - if not cog_matches: - cog = self._bot.cogs.get(query) - - # Don't consider it a match if the cog has a category. - if cog and not hasattr(cog, "category"): - cog_matches = [cog] - - if cog_matches: - cog = cog_matches[0] - cmds = (cog.get_commands() for cog in cog_matches) # Commands of all cogs - - return Cog( - name=cog.category if hasattr(cog, "category") else cog.qualified_name, - description=description or cog.description, - commands=tuple(itertools.chain.from_iterable(cmds)) # Flatten the list - ) - - self._handle_not_found(query) - - def _handle_not_found(self, query: str) -> None: - """ - Handles when a query does not match a valid command or cog. - - Will pass on possible close matches along with the `HelpQueryNotFound` exception. - """ - # Combine command and cog names - choices = list(self._bot.all_commands) + list(self._bot.cogs) - - result = process.extract(query, choices, score_cutoff=90) - - raise HelpQueryNotFound(f'Query "{query}" not found.', dict(result)) - - async def timeout(self, seconds: int = 30) -> None: - """Waits for a set number of seconds, then stops the help session.""" - await asyncio.sleep(seconds) - await self.stop() - - def reset_timeout(self) -> None: - """Cancels the original timeout task and sets it again from the start.""" - # cancel original if it exists - if self._timeout_task: - if not self._timeout_task.cancelled(): - self._timeout_task.cancel() - - # recreate the timeout task - self._timeout_task = self._bot.loop.create_task(self.timeout()) - - async def on_reaction_add(self, reaction: Reaction, user: User) -> None: - """Event handler for when reactions are added on the help message.""" - # ensure it was the relevant session message - if reaction.message.id != self.message.id: - return - - # ensure it was the session author who reacted - if user.id != self.author.id: - return - - emoji = str(reaction.emoji) - - # check if valid action - if emoji not in REACTIONS: - return - - self.reset_timeout() - - # Run relevant action method - action = getattr(self, f"do_{REACTIONS[emoji]}", None) - if action: - await action() - - # remove the added reaction to prep for re-use - with suppress(HTTPException): - await self.message.remove_reaction(reaction, user) - - async def on_message_delete(self, message: Message) -> None: - """Closes the help session when the help message is deleted.""" - if message.id == self.message.id: - await self.stop() - - async def prepare(self) -> None: - """Sets up the help session pages, events, message and reactions.""" - await self.build_pages() - - self._bot.add_listener(self.on_reaction_add) - self._bot.add_listener(self.on_message_delete) - - await self.update_page() - self.add_reactions() - - def add_reactions(self) -> None: - """Adds the relevant reactions to the help message based on if pagination is required.""" - # if paginating - if len(self._pages) > 1: - for reaction in REACTIONS: - self._bot.loop.create_task(self.message.add_reaction(reaction)) - - # if single-page - else: - self._bot.loop.create_task(self.message.add_reaction(DELETE_EMOJI)) - - def _category_key(self, cmd: Command) -> str: - """ - Returns a cog name of a given command for use as a key for `sorted` and `groupby`. - - A zero width space is used as a prefix for results with no cogs to force them last in ordering. - """ - if cmd.cog: - try: - if cmd.cog.category: - return f"**{cmd.cog.category}**" - except AttributeError: - pass - - return f"**{cmd.cog_name}**" - else: - return "**\u200bNo Category:**" - - def _get_command_params(self, cmd: Command) -> str: - """ - Returns the command usage signature. - - This is a custom implementation of `command.signature` in order to format the command - signature without aliases. - """ - results = [] - for name, param in cmd.clean_params.items(): - - # if argument has a default value - if param.default is not param.empty: - - if isinstance(param.default, str): - show_default = param.default - else: - show_default = param.default is not None - - # if default is not an empty string or None - if show_default: - results.append(f"[{name}={param.default}]") - else: - results.append(f"[{name}]") - - # if variable length argument - elif param.kind == param.VAR_POSITIONAL: - results.append(f"[{name}...]") - - # if required - else: - results.append(f"<{name}>") - - return f"{cmd.name} {' '.join(results)}" - - async def build_pages(self) -> None: - """Builds the list of content pages to be paginated through in the help message, as a list of str.""" - # Use LinePaginator to restrict embed line height - paginator = LinePaginator(prefix="", suffix="", max_lines=self._max_lines) - - # show signature if query is a command - if isinstance(self.query, commands.Command): - await self._add_command_signature(paginator) - - if isinstance(self.query, Cog): - paginator.add_line(f"**{self.query.name}**") - - if self.description: - paginator.add_line(f"*{self.description}*") - - # list all children commands of the queried object - if isinstance(self.query, (commands.GroupMixin, Cog)): - await self._list_child_commands(paginator) - - self._pages = paginator.pages - - async def _add_command_signature(self, paginator: LinePaginator) -> None: - prefix = constants.Client.prefix - - signature = self._get_command_params(self.query) - parent = self.query.full_parent_name + " " if self.query.parent else "" - paginator.add_line(f"**```\n{prefix}{parent}{signature}\n```**") - aliases = [f"`{alias}`" if not parent else f"`{parent} {alias}`" for alias in self.query.aliases] - aliases += [f"`{alias}`" for alias in getattr(self.query, "root_aliases", ())] - aliases = ", ".join(sorted(aliases)) - if aliases: - paginator.add_line(f"**Can also use:** {aliases}\n") - if not await self.query.can_run(self._ctx): - paginator.add_line("***You cannot run this command.***\n") - - async def _list_child_commands(self, paginator: LinePaginator) -> None: - # remove hidden commands if session is not wanting hiddens - if not self._show_hidden: - filtered = [c for c in self.query.commands if not c.hidden] - else: - filtered = self.query.commands - - # if after filter there are no commands, finish up - if not filtered: - self._pages = paginator.pages - return - - if isinstance(self.query, Cog): - grouped = (("**Commands:**", self.query.commands),) - - elif isinstance(self.query, commands.Command): - grouped = (("**Subcommands:**", self.query.commands),) - - # otherwise sort and organise all commands into categories - else: - cat_sort = sorted(filtered, key=self._category_key) - grouped = itertools.groupby(cat_sort, key=self._category_key) - - for category, cmds in grouped: - await self._format_command_category(paginator, category, list(cmds)) - - async def _format_command_category(self, paginator: LinePaginator, category: str, cmds: list[Command]) -> None: - cmds = sorted(cmds, key=lambda c: c.name) - cat_cmds = [] - for command in cmds: - cat_cmds += await self._format_command(command) - - # state var for if the category should be added next - print_cat = 1 - new_page = True - - for details in cat_cmds: - - # keep details together, paginating early if it won"t fit - lines_adding = len(details.split("\n")) + print_cat - if paginator._linecount + lines_adding > self._max_lines: - paginator._linecount = 0 - new_page = True - paginator.close_page() - - # new page so print category title again - print_cat = 1 - - if print_cat: - if new_page: - paginator.add_line("") - paginator.add_line(category) - print_cat = 0 - - paginator.add_line(details) - - async def _format_command(self, command: Command) -> list[str]: - # skip if hidden and hide if session is set to - if command.hidden and not self._show_hidden: - return [] - - # Patch to make the !help command work outside of #bot-commands again - # This probably needs a proper rewrite, but this will make it work in - # the mean time. - try: - can_run = await command.can_run(self._ctx) - except CheckFailure: - can_run = False - - # see if the user can run the command - strikeout = "" - if not can_run: - # skip if we don't show commands they can't run - if self._only_can_run: - return [] - strikeout = "~~" - - if isinstance(self.query, commands.Command): - prefix = "" - else: - prefix = constants.Client.prefix - - signature = self._get_command_params(command) - info = f"{strikeout}**`{prefix}{signature}`**{strikeout}" - - # handle if the command has no docstring - short_doc = command.short_doc or "No details provided" - return [f"{info}\n*{short_doc}*"] - - def embed_page(self, page_number: int = 0) -> Embed: - """Returns an Embed with the requested page formatted within.""" - embed = Embed() - - if isinstance(self.query, (commands.Command, Cog)) and page_number > 0: - title = f'Command Help | "{self.query.name}"' - else: - title = self.title - - embed.set_author(name=title, icon_url=constants.Icons.questionmark) - embed.description = self._pages[page_number] - - page_count = len(self._pages) - if page_count > 1: - embed.set_footer(text=f"Page {self._current_page+1} / {page_count}") - - return embed - - async def update_page(self, page_number: int = 0) -> None: - """Sends the intial message, or changes the existing one to the given page number.""" - self._current_page = page_number - embed_page = self.embed_page(page_number) - - if not self.message: - self.message = await self.destination.send(embed=embed_page) - else: - await self.message.edit(embed=embed_page) - - @classmethod - async def start(cls, ctx: Context, *command, **options) -> "HelpSession": - """ - Create and begin a help session based on the given command context. - - Available options kwargs: - * cleanup: Optional[bool] - Set to `True` to have the message deleted on session end. Defaults to `False`. - * only_can_run: Optional[bool] - Set to `True` to hide commands the user can't run. Defaults to `False`. - * show_hidden: Optional[bool] - Set to `True` to include hidden commands. Defaults to `False`. - * max_lines: Optional[int] - Sets the max number of lines the paginator will add to a single page. Defaults to 20. - """ - session = cls(ctx, *command, **options) - await session.prepare() - - return session - - async def stop(self) -> None: - """Stops the help session, removes event listeners and attempts to delete the help message.""" - self._bot.remove_listener(self.on_reaction_add) - self._bot.remove_listener(self.on_message_delete) - - # ignore if permission issue, or the message doesn't exist - with suppress(HTTPException, AttributeError): - if self._cleanup: - await self.message.delete() - else: - await self.message.clear_reactions() - - @property - def is_first_page(self) -> bool: - """Check if session is currently showing the first page.""" - return self._current_page == 0 - - @property - def is_last_page(self) -> bool: - """Check if the session is currently showing the last page.""" - return self._current_page == (len(self._pages)-1) - - async def do_first(self) -> None: - """Event that is called when the user requests the first page.""" - if not self.is_first_page: - await self.update_page(0) - - async def do_back(self) -> None: - """Event that is called when the user requests the previous page.""" - if not self.is_first_page: - await self.update_page(self._current_page-1) - - async def do_next(self) -> None: - """Event that is called when the user requests the next page.""" - if not self.is_last_page: - await self.update_page(self._current_page+1) - - async def do_end(self) -> None: - """Event that is called when the user requests the last page.""" - if not self.is_last_page: - await self.update_page(len(self._pages)-1) - - async def do_stop(self) -> None: - """Event that is called when the user requests to stop the help session.""" - await self.message.delete() - - -class Help(DiscordCog): - """Custom Embed Pagination Help feature.""" - - @commands.command("help") - async def new_help(self, ctx: Context, *commands) -> None: - """Shows Command Help.""" - try: - await HelpSession.start(ctx, *commands) - except HelpQueryNotFound as error: - embed = Embed() - embed.colour = Colour.red() - embed.title = str(error) - - if error.possible_matches: - matches = "\n".join(error.possible_matches.keys()) - embed.description = f"**Did you mean:**\n`{matches}`" - - await ctx.send(embed=embed) - - -def unload(bot: Bot) -> None: - """ - Reinstates the original help command. - - This is run if the cog raises an exception on load, or if the extension is unloaded. - """ - bot.remove_command("help") - bot.add_command(bot._old_help) - - -def setup(bot: Bot) -> None: - """ - The setup for the help extension. - - This is called automatically on `bot.load_extension` being run. - Stores the original help command instance on the `bot._old_help` attribute for later - reinstatement, before removing it from the command registry so the new help command can be - loaded successfully. - If an exception is raised during the loading of the cog, `unload` will be called in order to - reinstate the original help command. - """ - bot._old_help = bot.get_command("help") - bot.remove_command("help") - - try: - bot.add_cog(Help()) - except Exception: - unload(bot) - raise - - -def teardown(bot: Bot) -> None: - """ - The teardown for the help extension. - - This is called automatically on `bot.unload_extension` being run. - Calls `unload` in order to reinstate the original help command. - """ - unload(bot) diff --git a/bot/exts/evergreen/issues.py b/bot/exts/evergreen/issues.py deleted file mode 100644 index 8a7ebed0..00000000 --- a/bot/exts/evergreen/issues.py +++ /dev/null @@ -1,275 +0,0 @@ -import logging -import random -import re -from dataclasses import dataclass -from typing import Optional, Union - -import discord -from discord.ext import commands - -from bot.bot import Bot -from bot.constants import ( - Categories, - Channels, - Colours, - ERROR_REPLIES, - Emojis, - NEGATIVE_REPLIES, - Tokens, - WHITELISTED_CHANNELS -) -from bot.utils.decorators import whitelist_override -from bot.utils.extensions import invoke_help_command - -log = logging.getLogger(__name__) - -BAD_RESPONSE = { - 404: "Issue/pull request not located! Please enter a valid number!", - 403: "Rate limit has been hit! Please try again later!" -} -REQUEST_HEADERS = { - "Accept": "application/vnd.github.v3+json" -} - -REPOSITORY_ENDPOINT = "https://api.github.com/orgs/{org}/repos?per_page=100&type=public" -ISSUE_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/issues/{number}" -PR_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/pulls/{number}" - -if GITHUB_TOKEN := Tokens.github: - REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}" - -WHITELISTED_CATEGORIES = ( - Categories.development, Categories.devprojects, Categories.media, Categories.staff -) - -CODE_BLOCK_RE = re.compile( - r"^`([^`\n]+)`" # Inline codeblock - r"|```(.+?)```", # Multiline codeblock - re.DOTALL | re.MULTILINE -) - -# Maximum number of issues in one message -MAXIMUM_ISSUES = 5 - -# Regex used when looking for automatic linking in messages -# regex101 of current regex https://regex101.com/r/V2ji8M/6 -AUTOMATIC_REGEX = re.compile( - r"((?P<org>[a-zA-Z0-9][a-zA-Z0-9\-]{1,39})\/)?(?P<repo>[\w\-\.]{1,100})#(?P<number>[0-9]+)" -) - - -@dataclass -class FoundIssue: - """Dataclass representing an issue found by the regex.""" - - organisation: Optional[str] - repository: str - number: str - - def __hash__(self) -> int: - return hash((self.organisation, self.repository, self.number)) - - -@dataclass -class FetchError: - """Dataclass representing an error while fetching an issue.""" - - return_code: int - message: str - - -@dataclass -class IssueState: - """Dataclass representing the state of an issue.""" - - repository: str - number: int - url: str - title: str - emoji: str - - -class Issues(commands.Cog): - """Cog that allows users to retrieve issues from GitHub.""" - - def __init__(self, bot: Bot): - self.bot = bot - self.repos = [] - - @staticmethod - def remove_codeblocks(message: str) -> str: - """Remove any codeblock in a message.""" - return re.sub(CODE_BLOCK_RE, "", message) - - async def fetch_issues( - self, - number: int, - repository: str, - user: str - ) -> Union[IssueState, FetchError]: - """ - Retrieve an issue from a GitHub repository. - - Returns IssueState on success, FetchError on failure. - """ - url = ISSUE_ENDPOINT.format(user=user, repository=repository, number=number) - pulls_url = PR_ENDPOINT.format(user=user, repository=repository, number=number) - log.trace(f"Querying GH issues API: {url}") - - async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r: - json_data = await r.json() - - if r.status == 403: - if r.headers.get("X-RateLimit-Remaining") == "0": - log.info(f"Ratelimit reached while fetching {url}") - return FetchError(403, "Ratelimit reached, please retry in a few minutes.") - return FetchError(403, "Cannot access issue.") - elif r.status in (404, 410): - return FetchError(r.status, "Issue not found.") - elif r.status != 200: - return FetchError(r.status, "Error while fetching issue.") - - # The initial API request is made to the issues API endpoint, which will return information - # if the issue or PR is present. However, the scope of information returned for PRs differs - # from issues: if the 'issues' key is present in the response then we can pull the data we - # need from the initial API call. - if "issues" in json_data["html_url"]: - if json_data.get("state") == "open": - emoji = Emojis.issue_open - else: - emoji = Emojis.issue_closed - - # If the 'issues' key is not contained in the API response and there is no error code, then - # we know that a PR has been requested and a call to the pulls API endpoint is necessary - # to get the desired information for the PR. - else: - log.trace(f"PR provided, querying GH pulls API for additional information: {pulls_url}") - async with self.bot.http_session.get(pulls_url) as p: - pull_data = await p.json() - if pull_data["draft"]: - emoji = Emojis.pull_request_draft - elif pull_data["state"] == "open": - emoji = Emojis.pull_request_open - # When 'merged_at' is not None, this means that the state of the PR is merged - elif pull_data["merged_at"] is not None: - emoji = Emojis.pull_request_merged - else: - emoji = Emojis.pull_request_closed - - issue_url = json_data.get("html_url") - - return IssueState(repository, number, issue_url, json_data.get("title", ""), emoji) - - @staticmethod - def format_embed( - results: list[Union[IssueState, FetchError]], - user: str, - repository: Optional[str] = None - ) -> discord.Embed: - """Take a list of IssueState or FetchError and format a Discord embed for them.""" - description_list = [] - - for result in results: - if isinstance(result, IssueState): - description_list.append(f"{result.emoji} [{result.title}]({result.url})") - elif isinstance(result, FetchError): - description_list.append(f":x: [{result.return_code}] {result.message}") - - resp = discord.Embed( - colour=Colours.bright_green, - description="\n".join(description_list) - ) - - embed_url = f"https://github.com/{user}/{repository}" if repository else f"https://github.com/{user}" - resp.set_author(name="GitHub", url=embed_url) - return resp - - @whitelist_override(channels=WHITELISTED_CHANNELS, categories=WHITELISTED_CATEGORIES) - @commands.command(aliases=("pr",)) - async def issue( - self, - ctx: commands.Context, - numbers: commands.Greedy[int], - repository: str = "sir-lancebot", - user: str = "python-discord" - ) -> None: - """Command to retrieve issue(s) from a GitHub repository.""" - # Remove duplicates - numbers = set(numbers) - - if len(numbers) > MAXIMUM_ISSUES: - embed = discord.Embed( - title=random.choice(ERROR_REPLIES), - color=Colours.soft_red, - description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})" - ) - await ctx.send(embed=embed) - await invoke_help_command(ctx) - - results = [await self.fetch_issues(number, repository, user) for number in numbers] - await ctx.send(embed=self.format_embed(results, user, repository)) - - @commands.Cog.listener() - async def on_message(self, message: discord.Message) -> None: - """ - Automatic issue linking. - - Listener to retrieve issue(s) from a GitHub repository using automatic linking if matching <org>/<repo>#<issue>. - """ - # Ignore bots - if message.author.bot: - return - - issues = [ - FoundIssue(*match.group("org", "repo", "number")) - for match in AUTOMATIC_REGEX.finditer(self.remove_codeblocks(message.content)) - ] - links = [] - - if issues: - # Block this from working in DMs - if not message.guild: - await message.channel.send( - embed=discord.Embed( - title=random.choice(NEGATIVE_REPLIES), - description=( - "You can't retrieve issues from DMs. " - f"Try again in <#{Channels.community_bot_commands}>" - ), - colour=Colours.soft_red - ) - ) - return - - log.trace(f"Found {issues = }") - # Remove duplicates - issues = set(issues) - - if len(issues) > MAXIMUM_ISSUES: - embed = discord.Embed( - title=random.choice(ERROR_REPLIES), - color=Colours.soft_red, - description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})" - ) - await message.channel.send(embed=embed, delete_after=5) - return - - for repo_issue in issues: - result = await self.fetch_issues( - int(repo_issue.number), - repo_issue.repository, - repo_issue.organisation or "python-discord" - ) - if isinstance(result, IssueState): - links.append(result) - - if not links: - return - - resp = self.format_embed(links, "python-discord") - await message.channel.send(embed=resp) - - -def setup(bot: Bot) -> None: - """Load the Issues cog.""" - bot.add_cog(Issues(bot)) diff --git a/bot/exts/evergreen/latex.py b/bot/exts/evergreen/latex.py deleted file mode 100644 index 36c7e0ab..00000000 --- a/bot/exts/evergreen/latex.py +++ /dev/null @@ -1,101 +0,0 @@ -import asyncio -import hashlib -import pathlib -import re -from concurrent.futures import ThreadPoolExecutor -from io import BytesIO - -import discord -import matplotlib.pyplot as plt -from discord.ext import commands - -from bot.bot import Bot - -# configure fonts and colors for matplotlib -plt.rcParams.update( - { - "font.size": 16, - "mathtext.fontset": "cm", # Computer Modern font set - "mathtext.rm": "serif", - "figure.facecolor": "36393F", # matches Discord's dark mode background color - "text.color": "white", - } -) - -FORMATTED_CODE_REGEX = re.compile( - r"(?P<delim>(?P<block>```)|``?)" # code delimiter: 1-3 backticks; (?P=block) only matches if it's a block - r"(?(block)(?:(?P<lang>[a-z]+)\n)?)" # if we're in a block, match optional language (only letters plus newline) - r"(?:[ \t]*\n)*" # any blank (empty or tabs/spaces only) lines before the code - r"(?P<code>.*?)" # extract all code inside the markup - r"\s*" # any more whitespace before the end of the code markup - r"(?P=delim)", # match the exact same delimiter from the start again - re.DOTALL | re.IGNORECASE, # "." also matches newlines, case insensitive -) - -CACHE_DIRECTORY = pathlib.Path("_latex_cache") -CACHE_DIRECTORY.mkdir(exist_ok=True) - - -class Latex(commands.Cog): - """Renders latex.""" - - @staticmethod - def _render(text: str, filepath: pathlib.Path) -> BytesIO: - """ - Return the rendered image if latex compiles without errors, otherwise raise a BadArgument Exception. - - Saves rendered image to cache. - """ - fig = plt.figure() - rendered_image = BytesIO() - fig.text(0, 1, text, horizontalalignment="left", verticalalignment="top") - - try: - plt.savefig(rendered_image, bbox_inches="tight", dpi=600) - except ValueError as e: - raise commands.BadArgument(str(e)) - - rendered_image.seek(0) - - with open(filepath, "wb") as f: - f.write(rendered_image.getbuffer()) - - return rendered_image - - @staticmethod - def _prepare_input(text: str) -> str: - text = text.replace(r"\\", "$\n$") # matplotlib uses \n for newlines, not \\ - - if match := FORMATTED_CODE_REGEX.match(text): - return match.group("code") - else: - return text - - @commands.command() - @commands.max_concurrency(1, commands.BucketType.guild, wait=True) - async def latex(self, ctx: commands.Context, *, text: str) -> None: - """Renders the text in latex and sends the image.""" - text = self._prepare_input(text) - query_hash = hashlib.md5(text.encode()).hexdigest() - image_path = CACHE_DIRECTORY.joinpath(f"{query_hash}.png") - async with ctx.typing(): - if image_path.exists(): - await ctx.send(file=discord.File(image_path)) - return - - with ThreadPoolExecutor() as pool: - image = await asyncio.get_running_loop().run_in_executor( - pool, self._render, text, image_path - ) - - await ctx.send(file=discord.File(image, "latex.png")) - - -def setup(bot: Bot) -> None: - """Load the Latex Cog.""" - # As we have resource issues on this cog, - # we have it currently disabled while we fix it. - import logging - logging.info("Latex cog is currently disabled. It won't be loaded.") - return - bot.add_cog(Latex()) diff --git a/bot/exts/evergreen/magic_8ball.py b/bot/exts/evergreen/magic_8ball.py deleted file mode 100644 index 28ddcea0..00000000 --- a/bot/exts/evergreen/magic_8ball.py +++ /dev/null @@ -1,30 +0,0 @@ -import json -import logging -import random -from pathlib import Path - -from discord.ext import commands - -from bot.bot import Bot - -log = logging.getLogger(__name__) - -ANSWERS = json.loads(Path("bot/resources/evergreen/magic8ball.json").read_text("utf8")) - - -class Magic8ball(commands.Cog): - """A Magic 8ball command to respond to a user's question.""" - - @commands.command(name="8ball") - async def output_answer(self, ctx: commands.Context, *, question: str) -> None: - """Return a Magic 8ball answer from answers list.""" - if len(question.split()) >= 3: - answer = random.choice(ANSWERS) - await ctx.send(answer) - else: - await ctx.send("Usage: .8ball <question> (minimum length of 3 eg: `will I win?`)") - - -def setup(bot: Bot) -> None: - """Load the Magic8Ball Cog.""" - bot.add_cog(Magic8ball()) diff --git a/bot/exts/evergreen/minesweeper.py b/bot/exts/evergreen/minesweeper.py deleted file mode 100644 index a48b5051..00000000 --- a/bot/exts/evergreen/minesweeper.py +++ /dev/null @@ -1,270 +0,0 @@ -import logging -from collections.abc import Iterator -from dataclasses import dataclass -from random import randint, random -from typing import Union - -import discord -from discord.ext import commands - -from bot.bot import Bot -from bot.constants import Client -from bot.utils.converters import CoordinateConverter -from bot.utils.exceptions import UserNotPlayingError -from bot.utils.extensions import invoke_help_command - -MESSAGE_MAPPING = { - 0: ":stop_button:", - 1: ":one:", - 2: ":two:", - 3: ":three:", - 4: ":four:", - 5: ":five:", - 6: ":six:", - 7: ":seven:", - 8: ":eight:", - 9: ":nine:", - 10: ":keycap_ten:", - "bomb": ":bomb:", - "hidden": ":grey_question:", - "flag": ":flag_black:", - "x": ":x:" -} - -log = logging.getLogger(__name__) - - -GameBoard = list[list[Union[str, int]]] - - -@dataclass -class Game: - """The data for a game.""" - - board: GameBoard - revealed: GameBoard - dm_msg: discord.Message - chat_msg: discord.Message - activated_on_server: bool - - -class Minesweeper(commands.Cog): - """Play a game of Minesweeper.""" - - def __init__(self): - self.games: dict[int, Game] = {} - - @commands.group(name="minesweeper", aliases=("ms",), invoke_without_command=True) - async def minesweeper_group(self, ctx: commands.Context) -> None: - """Commands for Playing Minesweeper.""" - await invoke_help_command(ctx) - - @staticmethod - def get_neighbours(x: int, y: int) -> Iterator[tuple[int, int]]: - """Get all the neighbouring x and y including it self.""" - for x_ in [x - 1, x, x + 1]: - for y_ in [y - 1, y, y + 1]: - if x_ != -1 and x_ != 10 and y_ != -1 and y_ != 10: - yield x_, y_ - - def generate_board(self, bomb_chance: float) -> GameBoard: - """Generate a 2d array for the board.""" - board: GameBoard = [ - [ - "bomb" if random() <= bomb_chance else "number" - for _ in range(10) - ] for _ in range(10) - ] - - # make sure there is always a free cell - board[randint(0, 9)][randint(0, 9)] = "number" - - for y, row in enumerate(board): - for x, cell in enumerate(row): - if cell == "number": - # calculate bombs near it - bombs = 0 - for x_, y_ in self.get_neighbours(x, y): - if board[y_][x_] == "bomb": - bombs += 1 - board[y][x] = bombs - return board - - @staticmethod - def format_for_discord(board: GameBoard) -> str: - """Format the board as a string for Discord.""" - discord_msg = ( - ":stop_button: :regional_indicator_a: :regional_indicator_b: :regional_indicator_c: " - ":regional_indicator_d: :regional_indicator_e: :regional_indicator_f: :regional_indicator_g: " - ":regional_indicator_h: :regional_indicator_i: :regional_indicator_j:\n\n" - ) - rows = [] - for row_number, row in enumerate(board): - new_row = f"{MESSAGE_MAPPING[row_number + 1]} " - new_row += " ".join(MESSAGE_MAPPING[cell] for cell in row) - rows.append(new_row) - - discord_msg += "\n".join(rows) - return discord_msg - - @minesweeper_group.command(name="start") - async def start_command(self, ctx: commands.Context, bomb_chance: float = .2) -> None: - """Start a game of Minesweeper.""" - if ctx.author.id in self.games: # Player is already playing - await ctx.send(f"{ctx.author.mention} you already have a game running!", delete_after=2) - await ctx.message.delete(delay=2) - return - - try: - await ctx.author.send( - f"Play by typing: `{Client.prefix}ms reveal xy [xy]` or `{Client.prefix}ms flag xy [xy]` \n" - f"Close the game with `{Client.prefix}ms end`\n" - ) - except discord.errors.Forbidden: - log.debug(f"{ctx.author.name} ({ctx.author.id}) has disabled DMs from server members.") - await ctx.send(f":x: {ctx.author.mention}, please enable DMs to play minesweeper.") - return - - # Add game to list - board: GameBoard = self.generate_board(bomb_chance) - revealed_board: GameBoard = [["hidden"] * 10 for _ in range(10)] - dm_msg = await ctx.author.send(f"Here's your board!\n{self.format_for_discord(revealed_board)}") - - if ctx.guild: - await ctx.send(f"{ctx.author.mention} is playing Minesweeper.") - chat_msg = await ctx.send(f"Here's their board!\n{self.format_for_discord(revealed_board)}") - else: - chat_msg = None - - self.games[ctx.author.id] = Game( - board=board, - revealed=revealed_board, - dm_msg=dm_msg, - chat_msg=chat_msg, - activated_on_server=ctx.guild is not None - ) - - async def update_boards(self, ctx: commands.Context) -> None: - """Update both playing boards.""" - game = self.games[ctx.author.id] - await game.dm_msg.delete() - game.dm_msg = await ctx.author.send(f"Here's your board!\n{self.format_for_discord(game.revealed)}") - if game.activated_on_server: - await game.chat_msg.edit(content=f"Here's their board!\n{self.format_for_discord(game.revealed)}") - - @commands.dm_only() - @minesweeper_group.command(name="flag") - async def flag_command(self, ctx: commands.Context, *coordinates: CoordinateConverter) -> None: - """Place multiple flags on the board.""" - if ctx.author.id not in self.games: - raise UserNotPlayingError - board: GameBoard = self.games[ctx.author.id].revealed - for x, y in coordinates: - if board[y][x] == "hidden": - board[y][x] = "flag" - - await self.update_boards(ctx) - - @staticmethod - def reveal_bombs(revealed: GameBoard, board: GameBoard) -> None: - """Reveals all the bombs.""" - for y, row in enumerate(board): - for x, cell in enumerate(row): - if cell == "bomb": - revealed[y][x] = cell - - async def lost(self, ctx: commands.Context) -> None: - """The player lost the game.""" - game = self.games[ctx.author.id] - self.reveal_bombs(game.revealed, game.board) - await ctx.author.send(":fire: You lost! :fire:") - if game.activated_on_server: - await game.chat_msg.channel.send(f":fire: {ctx.author.mention} just lost Minesweeper! :fire:") - - async def won(self, ctx: commands.Context) -> None: - """The player won the game.""" - game = self.games[ctx.author.id] - await ctx.author.send(":tada: You won! :tada:") - if game.activated_on_server: - await game.chat_msg.channel.send(f":tada: {ctx.author.mention} just won Minesweeper! :tada:") - - def reveal_zeros(self, revealed: GameBoard, board: GameBoard, x: int, y: int) -> None: - """Recursively reveal adjacent cells when a 0 cell is encountered.""" - for x_, y_ in self.get_neighbours(x, y): - if revealed[y_][x_] != "hidden": - continue - revealed[y_][x_] = board[y_][x_] - if board[y_][x_] == 0: - self.reveal_zeros(revealed, board, x_, y_) - - async def check_if_won(self, ctx: commands.Context, revealed: GameBoard, board: GameBoard) -> bool: - """Checks if a player has won.""" - if any( - revealed[y][x] in ["hidden", "flag"] and board[y][x] != "bomb" - for x in range(10) - for y in range(10) - ): - return False - else: - await self.won(ctx) - return True - - async def reveal_one( - self, - ctx: commands.Context, - revealed: GameBoard, - board: GameBoard, - x: int, - y: int - ) -> bool: - """ - Reveal one square. - - return is True if the game ended, breaking the loop in `reveal_command` and deleting the game. - """ - revealed[y][x] = board[y][x] - if board[y][x] == "bomb": - await self.lost(ctx) - revealed[y][x] = "x" # mark bomb that made you lose with a x - return True - elif board[y][x] == 0: - self.reveal_zeros(revealed, board, x, y) - return await self.check_if_won(ctx, revealed, board) - - @commands.dm_only() - @minesweeper_group.command(name="reveal") - async def reveal_command(self, ctx: commands.Context, *coordinates: CoordinateConverter) -> None: - """Reveal multiple cells.""" - if ctx.author.id not in self.games: - raise UserNotPlayingError - game = self.games[ctx.author.id] - revealed: GameBoard = game.revealed - board: GameBoard = game.board - - for x, y in coordinates: - # reveal_one returns True if the revealed cell is a bomb or the player won, ending the game - if await self.reveal_one(ctx, revealed, board, x, y): - await self.update_boards(ctx) - del self.games[ctx.author.id] - break - else: - await self.update_boards(ctx) - - @minesweeper_group.command(name="end") - async def end_command(self, ctx: commands.Context) -> None: - """End your current game.""" - if ctx.author.id not in self.games: - raise UserNotPlayingError - game = self.games[ctx.author.id] - game.revealed = game.board - await self.update_boards(ctx) - new_msg = f":no_entry: Game canceled. :no_entry:\n{game.dm_msg.content}" - await game.dm_msg.edit(content=new_msg) - if game.activated_on_server: - await game.chat_msg.edit(content=new_msg) - del self.games[ctx.author.id] - - -def setup(bot: Bot) -> None: - """Load the Minesweeper cog.""" - bot.add_cog(Minesweeper()) diff --git a/bot/exts/evergreen/movie.py b/bot/exts/evergreen/movie.py deleted file mode 100644 index a04eeb41..00000000 --- a/bot/exts/evergreen/movie.py +++ /dev/null @@ -1,205 +0,0 @@ -import logging -import random -from enum import Enum -from typing import Any - -from aiohttp import ClientSession -from discord import Embed -from discord.ext.commands import Cog, Context, group - -from bot.bot import Bot -from bot.constants import Tokens -from bot.utils.extensions import invoke_help_command -from bot.utils.pagination import ImagePaginator - -# Define base URL of TMDB -BASE_URL = "https://api.themoviedb.org/3/" - -logger = logging.getLogger(__name__) - -# Define movie params, that will be used for every movie request -MOVIE_PARAMS = { - "api_key": Tokens.tmdb, - "language": "en-US" -} - - -class MovieGenres(Enum): - """Movies Genre names and IDs.""" - - Action = "28" - Adventure = "12" - Animation = "16" - Comedy = "35" - Crime = "80" - Documentary = "99" - Drama = "18" - Family = "10751" - Fantasy = "14" - History = "36" - Horror = "27" - Music = "10402" - Mystery = "9648" - Romance = "10749" - Science = "878" - Thriller = "53" - Western = "37" - - -class Movie(Cog): - """Movie Cog contains movies command that grab random movies from TMDB.""" - - def __init__(self, bot: Bot): - self.http_session: ClientSession = bot.http_session - - @group(name="movies", aliases=("movie",), invoke_without_command=True) - async def movies(self, ctx: Context, genre: str = "", amount: int = 5) -> None: - """ - Get random movies by specifying genre. Also support amount parameter, that define how much movies will be shown. - - Default 5. Use .movies genres to get all available genres. - """ - # Check is there more than 20 movies specified, due TMDB return 20 movies - # per page, so this is max. Also you can't get less movies than 1, just logic - if amount > 20: - await ctx.send("You can't get more than 20 movies at once. (TMDB limits)") - return - elif amount < 1: - await ctx.send("You can't get less than 1 movie.") - return - - # Capitalize genre for getting data from Enum, get random page, send help when genre don't exist. - genre = genre.capitalize() - try: - result = await self.get_movies_data(self.http_session, MovieGenres[genre].value, 1) - except KeyError: - await invoke_help_command(ctx) - return - - # Check if "results" is in result. If not, throw error. - if "results" not in result: - err_msg = ( - f"There is problem while making TMDB API request. Response Code: {result['status_code']}, " - f"{result['status_message']}." - ) - await ctx.send(err_msg) - logger.warning(err_msg) - - # Get random page. Max page is last page where is movies with this genre. - page = random.randint(1, result["total_pages"]) - - # Get movies list from TMDB, check if results key in result. When not, raise error. - movies = await self.get_movies_data(self.http_session, MovieGenres[genre].value, page) - if "results" not in movies: - err_msg = f"There is problem while making TMDB API request. Response Code: {result['status_code']}, " \ - f"{result['status_message']}." - await ctx.send(err_msg) - logger.warning(err_msg) - - # Get all pages and embed - pages = await self.get_pages(self.http_session, movies, amount) - embed = await self.get_embed(genre) - - await ImagePaginator.paginate(pages, ctx, embed) - - @movies.command(name="genres", aliases=("genre", "g")) - async def genres(self, ctx: Context) -> None: - """Show all currently available genres for .movies command.""" - await ctx.send(f"Current available genres: {', '.join('`' + genre.name + '`' for genre in MovieGenres)}") - - async def get_movies_data(self, client: ClientSession, genre_id: str, page: int) -> list[dict[str, Any]]: - """Return JSON of TMDB discover request.""" - # Define params of request - params = { - "api_key": Tokens.tmdb, - "language": "en-US", - "sort_by": "popularity.desc", - "include_adult": "false", - "include_video": "false", - "page": page, - "with_genres": genre_id - } - - url = BASE_URL + "discover/movie" - - # Make discover request to TMDB, return result - async with client.get(url, params=params) as resp: - return await resp.json() - - async def get_pages(self, client: ClientSession, movies: dict[str, Any], amount: int) -> list[tuple[str, str]]: - """Fetch all movie pages from movies dictionary. Return list of pages.""" - pages = [] - - for i in range(amount): - movie_id = movies["results"][i]["id"] - movie = await self.get_movie(client, movie_id) - - page, img = await self.create_page(movie) - pages.append((page, img)) - - return pages - - async def get_movie(self, client: ClientSession, movie: int) -> dict[str, Any]: - """Get Movie by movie ID from TMDB. Return result dictionary.""" - if not isinstance(movie, int): - raise ValueError("Error while fetching movie from TMDB, movie argument must be integer. ") - url = BASE_URL + f"movie/{movie}" - - async with client.get(url, params=MOVIE_PARAMS) as resp: - return await resp.json() - - async def create_page(self, movie: dict[str, Any]) -> tuple[str, str]: - """Create page from TMDB movie request result. Return formatted page + image.""" - text = "" - - # Add title + tagline (if not empty) - text += f"**{movie['title']}**\n" - if movie["tagline"]: - text += f"{movie['tagline']}\n\n" - else: - text += "\n" - - # Add other information - text += f"**Rating:** {movie['vote_average']}/10 :star:\n" - text += f"**Release Date:** {movie['release_date']}\n\n" - - text += "__**Production Information**__\n" - - companies = movie["production_companies"] - countries = movie["production_countries"] - - text += f"**Made by:** {', '.join(company['name'] for company in companies)}\n" - text += f"**Made in:** {', '.join(country['name'] for country in countries)}\n\n" - - text += "__**Some Numbers**__\n" - - budget = f"{movie['budget']:,d}" if movie['budget'] else "?" - revenue = f"{movie['revenue']:,d}" if movie['revenue'] else "?" - - if movie["runtime"] is not None: - duration = divmod(movie["runtime"], 60) - else: - duration = ("?", "?") - - text += f"**Budget:** ${budget}\n" - text += f"**Revenue:** ${revenue}\n" - text += f"**Duration:** {f'{duration[0]} hour(s) {duration[1]} minute(s)'}\n\n" - - text += movie["overview"] - - img = f"http://image.tmdb.org/t/p/w200{movie['poster_path']}" - - # Return page content and image - return text, img - - async def get_embed(self, name: str) -> Embed: - """Return embed of random movies. Uses name in title.""" - embed = Embed(title=f"Random {name} Movies") - embed.set_footer(text="This product uses the TMDb API but is not endorsed or certified by TMDb.") - embed.set_thumbnail(url="https://i.imgur.com/LtFtC8H.png") - return embed - - -def setup(bot: Bot) -> None: - """Load the Movie Cog.""" - bot.add_cog(Movie(bot)) diff --git a/bot/exts/evergreen/ping.py b/bot/exts/evergreen/ping.py deleted file mode 100644 index 6be78117..00000000 --- a/bot/exts/evergreen/ping.py +++ /dev/null @@ -1,45 +0,0 @@ -import arrow -from dateutil.relativedelta import relativedelta -from discord import Embed -from discord.ext import commands - -from bot import start_time -from bot.bot import Bot -from bot.constants import Colours - - -class Ping(commands.Cog): - """Get info about the bot's ping and uptime.""" - - def __init__(self, bot: Bot): - self.bot = bot - - @commands.command(name="ping") - async def ping(self, ctx: commands.Context) -> None: - """Ping the bot to see its latency and state.""" - embed = Embed( - title=":ping_pong: Pong!", - colour=Colours.bright_green, - description=f"Gateway Latency: {round(self.bot.latency * 1000)}ms", - ) - - await ctx.send(embed=embed) - - # Originally made in 70d2170a0a6594561d59c7d080c4280f1ebcd70b by lemon & gdude2002 - @commands.command(name="uptime") - async def uptime(self, ctx: commands.Context) -> None: - """Get the current uptime of the bot.""" - difference = relativedelta(start_time - arrow.utcnow()) - uptime_string = start_time.shift( - seconds=-difference.seconds, - minutes=-difference.minutes, - hours=-difference.hours, - days=-difference.days - ).humanize() - - await ctx.send(f"I started up {uptime_string}.") - - -def setup(bot: Bot) -> None: - """Load the Ping cog.""" - bot.add_cog(Ping(bot)) diff --git a/bot/exts/evergreen/pythonfacts.py b/bot/exts/evergreen/pythonfacts.py deleted file mode 100644 index 80a8da5d..00000000 --- a/bot/exts/evergreen/pythonfacts.py +++ /dev/null @@ -1,36 +0,0 @@ -import itertools - -import discord -from discord.ext import commands - -from bot.bot import Bot -from bot.constants import Colours - -with open("bot/resources/evergreen/python_facts.txt") as file: - FACTS = itertools.cycle(list(file)) - -COLORS = itertools.cycle([Colours.python_blue, Colours.python_yellow]) -PYFACTS_DISCUSSION = "https://github.com/python-discord/meta/discussions/93" - - -class PythonFacts(commands.Cog): - """Sends a random fun fact about Python.""" - - @commands.command(name="pythonfact", aliases=("pyfact",)) - async def get_python_fact(self, ctx: commands.Context) -> None: - """Sends a Random fun fact about Python.""" - embed = discord.Embed( - title="Python Facts", - description=next(FACTS), - colour=next(COLORS) - ) - embed.add_field( - name="Suggestions", - value=f"Suggest more facts [here!]({PYFACTS_DISCUSSION})" - ) - await ctx.send(embed=embed) - - -def setup(bot: Bot) -> None: - """Load the PythonFacts Cog.""" - bot.add_cog(PythonFacts()) diff --git a/bot/exts/evergreen/realpython.py b/bot/exts/evergreen/realpython.py deleted file mode 100644 index ef8b2638..00000000 --- a/bot/exts/evergreen/realpython.py +++ /dev/null @@ -1,81 +0,0 @@ -import logging -from html import unescape -from urllib.parse import quote_plus - -from discord import Embed -from discord.ext import commands - -from bot.bot import Bot -from bot.constants import Colours - -logger = logging.getLogger(__name__) - - -API_ROOT = "https://realpython.com/search/api/v1/" -ARTICLE_URL = "https://realpython.com{article_url}" -SEARCH_URL = "https://realpython.com/search?q={user_search}" - - -ERROR_EMBED = Embed( - title="Error while searching Real Python", - description="There was an error while trying to reach Real Python. Please try again shortly.", - color=Colours.soft_red, -) - - -class RealPython(commands.Cog): - """User initiated command to search for a Real Python article.""" - - def __init__(self, bot: Bot): - self.bot = bot - - @commands.command(aliases=["rp"]) - @commands.cooldown(1, 10, commands.cooldowns.BucketType.user) - async def realpython(self, ctx: commands.Context, *, user_search: str) -> None: - """Send 5 articles that match the user's search terms.""" - params = {"q": user_search, "limit": 5, "kind": "article"} - async with self.bot.http_session.get(url=API_ROOT, params=params) as response: - if response.status != 200: - logger.error( - f"Unexpected status code {response.status} from Real Python" - ) - await ctx.send(embed=ERROR_EMBED) - return - - data = await response.json() - - articles = data["results"] - - if len(articles) == 0: - no_articles = Embed( - title=f"No articles found for '{user_search}'", color=Colours.soft_red - ) - await ctx.send(embed=no_articles) - return - - if len(articles) == 1: - article_description = "Here is the result:" - else: - article_description = f"Here are the top {len(articles)} results:" - - article_embed = Embed( - title="Search results - Real Python", - url=SEARCH_URL.format(user_search=quote_plus(user_search)), - description=article_description, - color=Colours.orange, - ) - - for article in articles: - article_embed.add_field( - name=unescape(article["title"]), - value=ARTICLE_URL.format(article_url=article["url"]), - inline=False, - ) - article_embed.set_footer(text="Click the links to go to the articles.") - - await ctx.send(embed=article_embed) - - -def setup(bot: Bot) -> None: - """Load the Real Python Cog.""" - bot.add_cog(RealPython(bot)) diff --git a/bot/exts/evergreen/recommend_game.py b/bot/exts/evergreen/recommend_game.py deleted file mode 100644 index bdd3acb1..00000000 --- a/bot/exts/evergreen/recommend_game.py +++ /dev/null @@ -1,51 +0,0 @@ -import json -import logging -from pathlib import Path -from random import shuffle - -import discord -from discord.ext import commands - -from bot.bot import Bot - -log = logging.getLogger(__name__) -game_recs = [] - -# Populate the list `game_recs` with resource files -for rec_path in Path("bot/resources/evergreen/game_recs").glob("*.json"): - data = json.loads(rec_path.read_text("utf8")) - game_recs.append(data) -shuffle(game_recs) - - -class RecommendGame(commands.Cog): - """Commands related to recommending games.""" - - def __init__(self, bot: Bot): - self.bot = bot - self.index = 0 - - @commands.command(name="recommendgame", aliases=("gamerec",)) - async def recommend_game(self, ctx: commands.Context) -> None: - """Sends an Embed of a random game recommendation.""" - if self.index >= len(game_recs): - self.index = 0 - shuffle(game_recs) - game = game_recs[self.index] - self.index += 1 - - author = self.bot.get_user(int(game["author"])) - - # Creating and formatting Embed - embed = discord.Embed(color=discord.Colour.blue()) - if author is not None: - embed.set_author(name=author.name, icon_url=author.display_avatar.url) - embed.set_image(url=game["image"]) - embed.add_field(name=f"Recommendation: {game['title']}\n{game['link']}", value=game["description"]) - - await ctx.send(embed=embed) - - -def setup(bot: Bot) -> None: - """Loads the RecommendGame cog.""" - bot.add_cog(RecommendGame(bot)) diff --git a/bot/exts/evergreen/reddit.py b/bot/exts/evergreen/reddit.py deleted file mode 100644 index e6cb5337..00000000 --- a/bot/exts/evergreen/reddit.py +++ /dev/null @@ -1,368 +0,0 @@ -import asyncio -import logging -import random -import textwrap -from collections import namedtuple -from datetime import datetime, timedelta -from typing import Union - -from aiohttp import BasicAuth, ClientError -from discord import Colour, Embed, TextChannel -from discord.ext.commands import Cog, Context, group, has_any_role -from discord.ext.tasks import loop -from discord.utils import escape_markdown, sleep_until - -from bot.bot import Bot -from bot.constants import Channels, ERROR_REPLIES, Emojis, Reddit as RedditConfig, STAFF_ROLES -from bot.utils.converters import Subreddit -from bot.utils.extensions import invoke_help_command -from bot.utils.messages import sub_clyde -from bot.utils.pagination import ImagePaginator, LinePaginator - -log = logging.getLogger(__name__) - -AccessToken = namedtuple("AccessToken", ["token", "expires_at"]) - - -class Reddit(Cog): - """Track subreddit posts and show detailed statistics about them.""" - - HEADERS = {"User-Agent": "python3:python-discord/bot:1.0.0 (by /u/PythonDiscord)"} - URL = "https://www.reddit.com" - OAUTH_URL = "https://oauth.reddit.com" - MAX_RETRIES = 3 - - def __init__(self, bot: Bot): - self.bot = bot - - self.webhook = None - self.access_token = None - self.client_auth = BasicAuth(RedditConfig.client_id, RedditConfig.secret) - - bot.loop.create_task(self.init_reddit_ready()) - self.auto_poster_loop.start() - - def cog_unload(self) -> None: - """Stop the loop task and revoke the access token when the cog is unloaded.""" - self.auto_poster_loop.cancel() - if self.access_token and self.access_token.expires_at > datetime.utcnow(): - asyncio.create_task(self.revoke_access_token()) - - async def init_reddit_ready(self) -> None: - """Sets the reddit webhook when the cog is loaded.""" - await self.bot.wait_until_guild_available() - if not self.webhook: - self.webhook = await self.bot.fetch_webhook(RedditConfig.webhook) - - @property - def channel(self) -> TextChannel: - """Get the #reddit channel object from the bot's cache.""" - return self.bot.get_channel(Channels.reddit) - - def build_pagination_pages(self, posts: list[dict], paginate: bool) -> Union[list[tuple], str]: - """Build embed pages required for Paginator.""" - pages = [] - first_page = "" - for post in posts: - post_page = "" - image_url = "" - - data = post["data"] - - title = textwrap.shorten(data["title"], width=50, placeholder="...") - - # Normal brackets interfere with Markdown. - title = escape_markdown(title).replace("[", "⦋").replace("]", "⦌") - link = self.URL + data["permalink"] - - first_page += f"**[{title.replace('*', '')}]({link})**\n" - - text = data["selftext"] - if text: - text = escape_markdown(text).replace("[", "⦋").replace("]", "⦌") - first_page += textwrap.shorten(text, width=100, placeholder="...") + "\n" - - ups = data["ups"] - comments = data["num_comments"] - author = data["author"] - - content_type = Emojis.reddit_post_text - if data["is_video"] or {"youtube", "youtu.be"}.issubset(set(data["url"].split("."))): - # This means the content type in the post is a video. - content_type = f"{Emojis.reddit_post_video}" - - elif data["url"].endswith(("jpg", "png", "gif")): - # This means the content type in the post is an image. - content_type = f"{Emojis.reddit_post_photo}" - image_url = data["url"] - - first_page += ( - f"{content_type}\u2003{Emojis.reddit_upvote}{ups}\u2003{Emojis.reddit_comments}" - f"\u2002{comments}\u2003{Emojis.reddit_users}{author}\n\n" - ) - - if paginate: - post_page += f"**[{title}]({link})**\n\n" - if text: - post_page += textwrap.shorten(text, width=252, placeholder="...") + "\n\n" - post_page += ( - f"{content_type}\u2003{Emojis.reddit_upvote}{ups}\u2003{Emojis.reddit_comments}\u2002" - f"{comments}\u2003{Emojis.reddit_users}{author}" - ) - - pages.append((post_page, image_url)) - - if not paginate: - # Return the first summery page if pagination is not required - return first_page - - pages.insert(0, (first_page, "")) # Using image paginator, hence settings image url to empty string - return pages - - async def get_access_token(self) -> None: - """ - Get a Reddit API OAuth2 access token and assign it to self.access_token. - - A token is valid for 1 hour. There will be MAX_RETRIES to get a token, after which the cog - will be unloaded and a ClientError raised if retrieval was still unsuccessful. - """ - for i in range(1, self.MAX_RETRIES + 1): - response = await self.bot.http_session.post( - url=f"{self.URL}/api/v1/access_token", - headers=self.HEADERS, - auth=self.client_auth, - data={ - "grant_type": "client_credentials", - "duration": "temporary" - } - ) - - if response.status == 200 and response.content_type == "application/json": - content = await response.json() - expiration = int(content["expires_in"]) - 60 # Subtract 1 minute for leeway. - self.access_token = AccessToken( - token=content["access_token"], - expires_at=datetime.utcnow() + timedelta(seconds=expiration) - ) - - log.debug(f"New token acquired; expires on UTC {self.access_token.expires_at}") - return - else: - log.debug( - f"Failed to get an access token: " - f"status {response.status} & content type {response.content_type}; " - f"retrying ({i}/{self.MAX_RETRIES})" - ) - - await asyncio.sleep(3) - - self.bot.remove_cog(self.qualified_name) - raise ClientError("Authentication with the Reddit API failed. Unloading the cog.") - - async def revoke_access_token(self) -> None: - """ - Revoke the OAuth2 access token for the Reddit API. - - For security reasons, it's good practice to revoke the token when it's no longer being used. - """ - response = await self.bot.http_session.post( - url=f"{self.URL}/api/v1/revoke_token", - headers=self.HEADERS, - auth=self.client_auth, - data={ - "token": self.access_token.token, - "token_type_hint": "access_token" - } - ) - - if response.status in [200, 204] and response.content_type == "application/json": - self.access_token = None - else: - log.warning(f"Unable to revoke access token: status {response.status}.") - - async def fetch_posts(self, route: str, *, amount: int = 25, params: dict = None) -> list[dict]: - """A helper method to fetch a certain amount of Reddit posts at a given route.""" - # Reddit's JSON responses only provide 25 posts at most. - if not 25 >= amount > 0: - raise ValueError("Invalid amount of subreddit posts requested.") - - # Renew the token if necessary. - if not self.access_token or self.access_token.expires_at < datetime.utcnow(): - await self.get_access_token() - - url = f"{self.OAUTH_URL}/{route}" - for _ in range(self.MAX_RETRIES): - response = await self.bot.http_session.get( - url=url, - headers={**self.HEADERS, "Authorization": f"bearer {self.access_token.token}"}, - params=params - ) - if response.status == 200 and response.content_type == 'application/json': - # Got appropriate response - process and return. - content = await response.json() - posts = content["data"]["children"] - - filtered_posts = [post for post in posts if not post["data"]["over_18"]] - - return filtered_posts[:amount] - - await asyncio.sleep(3) - - log.debug(f"Invalid response from: {url} - status code {response.status}, mimetype {response.content_type}") - return list() # Failed to get appropriate response within allowed number of retries. - - async def get_top_posts( - self, subreddit: Subreddit, time: str = "all", amount: int = 5, paginate: bool = False - ) -> Union[Embed, list[tuple]]: - """ - Get the top amount of posts for a given subreddit within a specified timeframe. - - A time of "all" will get posts from all time, "day" will get top daily posts and "week" will get the top - weekly posts. - - The amount should be between 0 and 25 as Reddit's JSON requests only provide 25 posts at most. - """ - embed = Embed() - - posts = await self.fetch_posts( - route=f"{subreddit}/top", - amount=amount, - params={"t": time} - ) - if not posts: - embed.title = random.choice(ERROR_REPLIES) - embed.colour = Colour.red() - embed.description = ( - "Sorry! We couldn't find any SFW posts from that subreddit. " - "If this problem persists, please let us know." - ) - - return embed - - if paginate: - return self.build_pagination_pages(posts, paginate=True) - - # Use only starting summary page for #reddit channel posts. - embed.description = self.build_pagination_pages(posts, paginate=False) - embed.colour = Colour.blurple() - return embed - - @loop() - async def auto_poster_loop(self) -> None: - """Post the top 5 posts daily, and the top 5 posts weekly.""" - # once d.py get support for `time` parameter in loop decorator, - # this can be removed and the loop can use the `time=datetime.time.min` parameter - now = datetime.utcnow() - tomorrow = now + timedelta(days=1) - midnight_tomorrow = tomorrow.replace(hour=0, minute=0, second=0) - - await sleep_until(midnight_tomorrow) - - await self.bot.wait_until_guild_available() - if not self.webhook: - await self.bot.fetch_webhook(RedditConfig.webhook) - - if datetime.utcnow().weekday() == 0: - await self.top_weekly_posts() - # if it's a monday send the top weekly posts - - for subreddit in RedditConfig.subreddits: - top_posts = await self.get_top_posts(subreddit=subreddit, time="day") - username = sub_clyde(f"{subreddit} Top Daily Posts") - message = await self.webhook.send(username=username, embed=top_posts, wait=True) - - if message.channel.is_news(): - await message.publish() - - async def top_weekly_posts(self) -> None: - """Post a summary of the top posts.""" - for subreddit in RedditConfig.subreddits: - # Send and pin the new weekly posts. - top_posts = await self.get_top_posts(subreddit=subreddit, time="week") - username = sub_clyde(f"{subreddit} Top Weekly Posts") - message = await self.webhook.send(wait=True, username=username, embed=top_posts) - - if subreddit.lower() == "r/python": - if not self.channel: - log.warning("Failed to get #reddit channel to remove pins in the weekly loop.") - return - - # Remove the oldest pins so that only 12 remain at most. - pins = await self.channel.pins() - - while len(pins) >= 12: - await pins[-1].unpin() - del pins[-1] - - await message.pin() - - if message.channel.is_news(): - await message.publish() - - @group(name="reddit", invoke_without_command=True) - async def reddit_group(self, ctx: Context) -> None: - """View the top posts from various subreddits.""" - await invoke_help_command(ctx) - - @reddit_group.command(name="top") - async def top_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None: - """Send the top posts of all time from a given subreddit.""" - async with ctx.typing(): - pages = await self.get_top_posts(subreddit=subreddit, time="all", paginate=True) - - await ctx.send(f"Here are the top {subreddit} posts of all time!") - embed = Embed( - color=Colour.blurple() - ) - - await ImagePaginator.paginate(pages, ctx, embed) - - @reddit_group.command(name="daily") - async def daily_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None: - """Send the top posts of today from a given subreddit.""" - async with ctx.typing(): - pages = await self.get_top_posts(subreddit=subreddit, time="day", paginate=True) - - await ctx.send(f"Here are today's top {subreddit} posts!") - embed = Embed( - color=Colour.blurple() - ) - - await ImagePaginator.paginate(pages, ctx, embed) - - @reddit_group.command(name="weekly") - async def weekly_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None: - """Send the top posts of this week from a given subreddit.""" - async with ctx.typing(): - pages = await self.get_top_posts(subreddit=subreddit, time="week", paginate=True) - - await ctx.send(f"Here are this week's top {subreddit} posts!") - embed = Embed( - color=Colour.blurple() - ) - - await ImagePaginator.paginate(pages, ctx, embed) - - @has_any_role(*STAFF_ROLES) - @reddit_group.command(name="subreddits", aliases=("subs",)) - async def subreddits_command(self, ctx: Context) -> None: - """Send a paginated embed of all the subreddits we're relaying.""" - embed = Embed() - embed.title = "Relayed subreddits." - embed.colour = Colour.blurple() - - await LinePaginator.paginate( - RedditConfig.subreddits, - ctx, embed, - footer_text="Use the reddit commands along with these to view their posts.", - empty=False, - max_lines=15 - ) - - -def setup(bot: Bot) -> None: - """Load the Reddit cog.""" - if not RedditConfig.secret or not RedditConfig.client_id: - log.error("Credentials not provided, cog not loaded.") - return - bot.add_cog(Reddit(bot)) diff --git a/bot/exts/evergreen/rps.py b/bot/exts/evergreen/rps.py deleted file mode 100644 index c6bbff46..00000000 --- a/bot/exts/evergreen/rps.py +++ /dev/null @@ -1,57 +0,0 @@ -from random import choice - -from discord.ext import commands - -from bot.bot import Bot - -CHOICES = ["rock", "paper", "scissors"] -SHORT_CHOICES = ["r", "p", "s"] - -# Using a dictionary instead of conditions to check for the winner. -WINNER_DICT = { - "r": { - "r": 0, - "p": -1, - "s": 1, - }, - "p": { - "r": 1, - "p": 0, - "s": -1, - }, - "s": { - "r": -1, - "p": 1, - "s": 0, - } -} - - -class RPS(commands.Cog): - """Rock Paper Scissors. The Classic Game!""" - - @commands.command(case_insensitive=True) - async def rps(self, ctx: commands.Context, move: str) -> None: - """Play the classic game of Rock Paper Scissors with your own sir-lancebot!""" - move = move.lower() - player_mention = ctx.author.mention - - if move not in CHOICES and move not in SHORT_CHOICES: - raise commands.BadArgument(f"Invalid move. Please make move from options: {', '.join(CHOICES).upper()}.") - - bot_move = choice(CHOICES) - # value of player_result will be from (-1, 0, 1) as (lost, tied, won). - player_result = WINNER_DICT[move[0]][bot_move[0]] - - if player_result == 0: - message_string = f"{player_mention} You and Sir Lancebot played {bot_move}, it's a tie." - await ctx.send(message_string) - elif player_result == 1: - await ctx.send(f"Sir Lancebot played {bot_move}! {player_mention} won!") - else: - await ctx.send(f"Sir Lancebot played {bot_move}! {player_mention} lost!") - - -def setup(bot: Bot) -> None: - """Load the RPS Cog.""" - bot.add_cog(RPS(bot)) diff --git a/bot/exts/evergreen/snakes/__init__.py b/bot/exts/evergreen/snakes/__init__.py deleted file mode 100644 index 7740429b..00000000 --- a/bot/exts/evergreen/snakes/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -import logging - -from bot.bot import Bot -from bot.exts.evergreen.snakes._snakes_cog import Snakes - -log = logging.getLogger(__name__) - - -def setup(bot: Bot) -> None: - """Load the Snakes Cog.""" - bot.add_cog(Snakes(bot)) diff --git a/bot/exts/evergreen/snakes/_converter.py b/bot/exts/evergreen/snakes/_converter.py deleted file mode 100644 index 765b983d..00000000 --- a/bot/exts/evergreen/snakes/_converter.py +++ /dev/null @@ -1,82 +0,0 @@ -import json -import logging -import random -from collections.abc import Iterable - -import discord -from discord.ext.commands import Context, Converter -from rapidfuzz import fuzz - -from bot.exts.evergreen.snakes._utils import SNAKE_RESOURCES -from bot.utils import disambiguate - -log = logging.getLogger(__name__) - - -class Snake(Converter): - """Snake converter for the Snakes Cog.""" - - snakes = None - special_cases = None - - async def convert(self, ctx: Context, name: str) -> str: - """Convert the input snake name to the closest matching Snake object.""" - await self.build_list() - name = name.lower() - - if name == "python": - return "Python (programming language)" - - def get_potential(iterable: Iterable, *, threshold: int = 80) -> list[str]: - nonlocal name - potential = [] - - for item in iterable: - original, item = item, item.lower() - - if name == item: - return [original] - - a, b = fuzz.ratio(name, item), fuzz.partial_ratio(name, item) - if a >= threshold or b >= threshold: - potential.append(original) - - return potential - - # Handle special cases - if name.lower() in self.special_cases: - return self.special_cases.get(name.lower(), name.lower()) - - names = {snake["name"]: snake["scientific"] for snake in self.snakes} - all_names = names.keys() | names.values() - timeout = len(all_names) * (3 / 4) - - embed = discord.Embed( - title="Found multiple choices. Please choose the correct one.", colour=0x59982F) - embed.set_author(name=ctx.author.display_name, icon_url=ctx.author.display_avatar.url) - - name = await disambiguate(ctx, get_potential(all_names), timeout=timeout, embed=embed) - return names.get(name, name) - - @classmethod - async def build_list(cls) -> None: - """Build list of snakes from the static snake resources.""" - # Get all the snakes - if cls.snakes is None: - cls.snakes = json.loads((SNAKE_RESOURCES / "snake_names.json").read_text("utf8")) - # Get the special cases - if cls.special_cases is None: - special_cases = json.loads((SNAKE_RESOURCES / "special_snakes.json").read_text("utf8")) - cls.special_cases = {snake["name"].lower(): snake for snake in special_cases} - - @classmethod - async def random(cls) -> str: - """ - Get a random Snake from the loaded resources. - - This is stupid. We should find a way to somehow get the global session into a global context, - so I can get it from here. - """ - await cls.build_list() - names = [snake["scientific"] for snake in cls.snakes] - return random.choice(names) diff --git a/bot/exts/evergreen/snakes/_snakes_cog.py b/bot/exts/evergreen/snakes/_snakes_cog.py deleted file mode 100644 index 04804222..00000000 --- a/bot/exts/evergreen/snakes/_snakes_cog.py +++ /dev/null @@ -1,1151 +0,0 @@ -import asyncio -import colorsys -import logging -import os -import random -import re -import string -import textwrap -import urllib -from functools import partial -from io import BytesIO -from typing import Any, Optional - -import async_timeout -from PIL import Image, ImageDraw, ImageFont -from discord import Colour, Embed, File, Member, Message, Reaction -from discord.errors import HTTPException -from discord.ext.commands import Cog, CommandError, Context, bot_has_permissions, group - -from bot.bot import Bot -from bot.constants import ERROR_REPLIES, Tokens -from bot.exts.evergreen.snakes import _utils as utils -from bot.exts.evergreen.snakes._converter import Snake -from bot.utils.decorators import locked -from bot.utils.extensions import invoke_help_command - -log = logging.getLogger(__name__) - - -# region: Constants -# Color -SNAKE_COLOR = 0x399600 - -# Antidote constants -SYRINGE_EMOJI = "\U0001F489" # :syringe: -PILL_EMOJI = "\U0001F48A" # :pill: -HOURGLASS_EMOJI = "\u231B" # :hourglass: -CROSSBONES_EMOJI = "\u2620" # :skull_crossbones: -ALEMBIC_EMOJI = "\u2697" # :alembic: -TICK_EMOJI = "\u2705" # :white_check_mark: - Correct peg, correct hole -CROSS_EMOJI = "\u274C" # :x: - Wrong peg, wrong hole -BLANK_EMOJI = "\u26AA" # :white_circle: - Correct peg, wrong hole -HOLE_EMOJI = "\u2B1C" # :white_square: - Used in guesses -EMPTY_UNICODE = "\u200b" # literally just an empty space - -ANTIDOTE_EMOJI = ( - SYRINGE_EMOJI, - PILL_EMOJI, - HOURGLASS_EMOJI, - CROSSBONES_EMOJI, - ALEMBIC_EMOJI, -) - -# Quiz constants -ANSWERS_EMOJI = { - "a": "\U0001F1E6", # :regional_indicator_a: 🇦 - "b": "\U0001F1E7", # :regional_indicator_b: 🇧 - "c": "\U0001F1E8", # :regional_indicator_c: 🇨 - "d": "\U0001F1E9", # :regional_indicator_d: 🇩 -} - -ANSWERS_EMOJI_REVERSE = { - "\U0001F1E6": "A", # :regional_indicator_a: 🇦 - "\U0001F1E7": "B", # :regional_indicator_b: 🇧 - "\U0001F1E8": "C", # :regional_indicator_c: 🇨 - "\U0001F1E9": "D", # :regional_indicator_d: 🇩 -} - -# Zzzen of pythhhon constant -ZEN = """ -Beautiful is better than ugly. -Explicit is better than implicit. -Simple is better than complex. -Complex is better than complicated. -Flat is better than nested. -Sparse is better than dense. -Readability counts. -Special cases aren't special enough to break the rules. -Although practicality beats purity. -Errors should never pass silently. -Unless explicitly silenced. -In the face of ambiguity, refuse the temptation to guess. -There should be one-- and preferably only one --obvious way to do it. -Now is better than never. -Although never is often better than *right* now. -If the implementation is hard to explain, it's a bad idea. -If the implementation is easy to explain, it may be a good idea. -""" - -# Max messages to train snake_chat on -MSG_MAX = 100 - -# get_snek constants -URL = "https://en.wikipedia.org/w/api.php?" - -# snake guess responses -INCORRECT_GUESS = ( - "Nope, that's not what it is.", - "Not quite.", - "Not even close.", - "Terrible guess.", - "Nnnno.", - "Dude. No.", - "I thought everyone knew this one.", - "Guess you suck at snakes.", - "Bet you feel stupid now.", - "Hahahaha, no.", - "Did you hit the wrong key?" -) - -CORRECT_GUESS = ( - "**WRONG**. Wait, no, actually you're right.", - "Yeah, you got it!", - "Yep, that's exactly what it is.", - "Uh-huh. Yep yep yep.", - "Yeah that's right.", - "Yup. How did you know that?", - "Are you a herpetologist?", - "Sure, okay, but I bet you can't pronounce it.", - "Are you cheating?" -) - -# snake card consts -CARD = { - "top": Image.open("bot/resources/snakes/snake_cards/card_top.png"), - "frame": Image.open("bot/resources/snakes/snake_cards/card_frame.png"), - "bottom": Image.open("bot/resources/snakes/snake_cards/card_bottom.png"), - "backs": [ - Image.open(f"bot/resources/snakes/snake_cards/backs/{file}") - for file in os.listdir("bot/resources/snakes/snake_cards/backs") - ], - "font": ImageFont.truetype("bot/resources/snakes/snake_cards/expressway.ttf", 20) -} -# endregion - - -class Snakes(Cog): - """ - Commands related to snakes, created by our community during the first code jam. - - More information can be found in the code-jam-1 repo. - - https://github.com/python-discord/code-jam-1 - """ - - wiki_brief = re.compile(r"(.*?)(=+ (.*?) =+)", flags=re.DOTALL) - valid_image_extensions = ("gif", "png", "jpeg", "jpg", "webp") - - def __init__(self, bot: Bot): - self.active_sal = {} - self.bot = bot - self.snake_names = utils.get_resource("snake_names") - self.snake_idioms = utils.get_resource("snake_idioms") - self.snake_quizzes = utils.get_resource("snake_quiz") - self.snake_facts = utils.get_resource("snake_facts") - self.num_movie_pages = None - - # region: Helper methods - @staticmethod - def _beautiful_pastel(hue: float) -> int: - """Returns random bright pastels.""" - light = random.uniform(0.7, 0.85) - saturation = 1 - - rgb = colorsys.hls_to_rgb(hue, light, saturation) - hex_rgb = "" - - for part in rgb: - value = int(part * 0xFF) - hex_rgb += f"{value:02x}" - - return int(hex_rgb, 16) - - @staticmethod - def _generate_card(buffer: BytesIO, content: dict) -> BytesIO: - """ - Generate a card from snake information. - - Written by juan and Someone during the first code jam. - """ - snake = Image.open(buffer) - - # Get the size of the snake icon, configure the height of the image box (yes, it changes) - icon_width = 347 # Hardcoded, not much i can do about that - icon_height = int((icon_width / snake.width) * snake.height) - frame_copies = icon_height // CARD["frame"].height + 1 - snake.thumbnail((icon_width, icon_height)) - - # Get the dimensions of the final image - main_height = icon_height + CARD["top"].height + CARD["bottom"].height - main_width = CARD["frame"].width - - # Start creating the foreground - foreground = Image.new("RGBA", (main_width, main_height), (0, 0, 0, 0)) - foreground.paste(CARD["top"], (0, 0)) - - # Generate the frame borders to the correct height - for offset in range(frame_copies): - position = (0, CARD["top"].height + offset * CARD["frame"].height) - foreground.paste(CARD["frame"], position) - - # Add the image and bottom part of the image - foreground.paste(snake, (36, CARD["top"].height)) # Also hardcoded :( - foreground.paste(CARD["bottom"], (0, CARD["top"].height + icon_height)) - - # Setup the background - back = random.choice(CARD["backs"]) - back_copies = main_height // back.height + 1 - full_image = Image.new("RGBA", (main_width, main_height), (0, 0, 0, 0)) - - # Generate the tiled background - for offset in range(back_copies): - full_image.paste(back, (16, 16 + offset * back.height)) - - # Place the foreground onto the final image - full_image.paste(foreground, (0, 0), foreground) - - # Get the first two sentences of the info - description = ".".join(content["info"].split(".")[:2]) + "." - - # Setup positioning variables - margin = 36 - offset = CARD["top"].height + icon_height + margin - - # Create blank rectangle image which will be behind the text - rectangle = Image.new( - "RGBA", - (main_width, main_height), - (0, 0, 0, 0) - ) - - # Draw a semi-transparent rectangle on it - rect = ImageDraw.Draw(rectangle) - rect.rectangle( - (margin, offset, main_width - margin, main_height - margin), - fill=(63, 63, 63, 128) - ) - - # Paste it onto the final image - full_image.paste(rectangle, (0, 0), mask=rectangle) - - # Draw the text onto the final image - draw = ImageDraw.Draw(full_image) - for line in textwrap.wrap(description, 36): - draw.text([margin + 4, offset], line, font=CARD["font"]) - offset += CARD["font"].getsize(line)[1] - - # Get the image contents as a BufferIO object - buffer = BytesIO() - full_image.save(buffer, "PNG") - buffer.seek(0) - - return buffer - - @staticmethod - def _snakify(message: str) -> str: - """Sssnakifffiesss a sstring.""" - # Replace fricatives with exaggerated snake fricatives. - simple_fricatives = [ - "f", "s", "z", "h", - "F", "S", "Z", "H", - ] - complex_fricatives = [ - "th", "sh", "Th", "Sh" - ] - - for letter in simple_fricatives: - if letter.islower(): - message = message.replace(letter, letter * random.randint(2, 4)) - else: - message = message.replace(letter, (letter * random.randint(2, 4)).title()) - - for fricative in complex_fricatives: - message = message.replace(fricative, fricative[0] + fricative[1] * random.randint(2, 4)) - - return message - - async def _fetch(self, url: str, params: Optional[dict] = None) -> dict: - """Asynchronous web request helper method.""" - if params is None: - params = {} - - async with async_timeout.timeout(10): - async with self.bot.http_session.get(url, params=params) as response: - return await response.json() - - def _get_random_long_message(self, messages: list[str], retries: int = 10) -> str: - """ - Fetch a message that's at least 3 words long, if possible to do so in retries attempts. - - Else, just return whatever the last message is. - """ - long_message = random.choice(messages) - if len(long_message.split()) < 3 and retries > 0: - return self._get_random_long_message( - messages, - retries=retries - 1 - ) - - return long_message - - async def _get_snek(self, name: str) -> dict[str, Any]: - """ - Fetches all the data from a wikipedia article about a snake. - - Builds a dict that the .get() method can use. - - Created by Ava and eivl. - """ - snake_info = {} - - params = { - "format": "json", - "action": "query", - "list": "search", - "srsearch": name, - "utf8": "", - "srlimit": "1", - } - - json = await self._fetch(URL, params=params) - - # Wikipedia does have a error page - try: - pageid = json["query"]["search"][0]["pageid"] - except KeyError: - # Wikipedia error page ID(?) - pageid = 41118 - except IndexError: - return None - - params = { - "format": "json", - "action": "query", - "prop": "extracts|images|info", - "exlimit": "max", - "explaintext": "", - "inprop": "url", - "pageids": pageid - } - - json = await self._fetch(URL, params=params) - - # Constructing dict - handle exceptions later - try: - snake_info["title"] = json["query"]["pages"][f"{pageid}"]["title"] - snake_info["extract"] = json["query"]["pages"][f"{pageid}"]["extract"] - snake_info["images"] = json["query"]["pages"][f"{pageid}"]["images"] - snake_info["fullurl"] = json["query"]["pages"][f"{pageid}"]["fullurl"] - snake_info["pageid"] = json["query"]["pages"][f"{pageid}"]["pageid"] - except KeyError: - snake_info["error"] = True - - if snake_info["images"]: - i_url = "https://commons.wikimedia.org/wiki/Special:FilePath/" - image_list = [] - map_list = [] - thumb_list = [] - - # Wikipedia has arbitrary images that are not snakes - banned = [ - "Commons-logo.svg", - "Red%20Pencil%20Icon.png", - "distribution", - "The%20Death%20of%20Cleopatra%20arthur.jpg", - "Head%20of%20holotype", - "locator", - "Woma.png", - "-map.", - ".svg", - "ange.", - "Adder%20(PSF).png" - ] - - for image in snake_info["images"]: - # Images come in the format of `File:filename.extension` - file, sep, filename = image["title"].partition(":") - filename = filename.replace(" ", "%20") # Wikipedia returns good data! - - if not filename.startswith("Map"): - if any(ban in filename for ban in banned): - pass - else: - image_list.append(f"{i_url}{filename}") - thumb_list.append(f"{i_url}{filename}?width=100") - else: - map_list.append(f"{i_url}{filename}") - - snake_info["image_list"] = image_list - snake_info["map_list"] = map_list - snake_info["thumb_list"] = thumb_list - snake_info["name"] = name - - match = self.wiki_brief.match(snake_info["extract"]) - info = match.group(1) if match else None - - if info: - info = info.replace("\n", "\n\n") # Give us some proper paragraphs. - - snake_info["info"] = info - - return snake_info - - async def _get_snake_name(self) -> dict[str, str]: - """Gets a random snake name.""" - return random.choice(self.snake_names) - - async def _validate_answer(self, ctx: Context, message: Message, answer: str, options: dict[str, str]) -> None: - """Validate the answer using a reaction event loop.""" - def predicate(reaction: Reaction, user: Member) -> bool: - """Test if the the answer is valid and can be evaluated.""" - return ( - reaction.message.id == message.id # The reaction is attached to the question we asked. - and user == ctx.author # It's the user who triggered the quiz. - and str(reaction.emoji) in ANSWERS_EMOJI.values() # The reaction is one of the options. - ) - - for emoji in ANSWERS_EMOJI.values(): - await message.add_reaction(emoji) - - # Validate the answer - try: - reaction, user = await ctx.bot.wait_for("reaction_add", timeout=45.0, check=predicate) - except asyncio.TimeoutError: - await ctx.send(f"You took too long. The correct answer was **{options[answer]}**.") - await message.clear_reactions() - return - - if str(reaction.emoji) == ANSWERS_EMOJI[answer]: - await ctx.send(f"{random.choice(CORRECT_GUESS)} The correct answer was **{options[answer]}**.") - else: - await ctx.send( - f"{random.choice(INCORRECT_GUESS)} The correct answer was **{options[answer]}**." - ) - - await message.clear_reactions() - # endregion - - # region: Commands - @group(name="snakes", aliases=("snake",), invoke_without_command=True) - async def snakes_group(self, ctx: Context) -> None: - """Commands from our first code jam.""" - await invoke_help_command(ctx) - - @bot_has_permissions(manage_messages=True) - @snakes_group.command(name="antidote") - @locked() - async def antidote_command(self, ctx: Context) -> None: - """ - Antidote! Can you create the antivenom before the patient dies? - - Rules: You have 4 ingredients for each antidote, you only have 10 attempts - Once you synthesize the antidote, you will be presented with 4 markers - Tick: This means you have a CORRECT ingredient in the CORRECT position - Circle: This means you have a CORRECT ingredient in the WRONG position - Cross: This means you have a WRONG ingredient in the WRONG position - - Info: The game automatically ends after 5 minutes inactivity. - You should only use each ingredient once. - - This game was created by Lord Bisk and Runew0lf. - """ - def predicate(reaction_: Reaction, user_: Member) -> bool: - """Make sure that this reaction is what we want to operate on.""" - return ( - all(( - # Reaction is on this message - reaction_.message.id == board_id.id, - # Reaction is one of the pagination emotes - reaction_.emoji in ANTIDOTE_EMOJI, - # Reaction was not made by the Bot - user_.id != self.bot.user.id, - # Reaction was made by author - user_.id == ctx.author.id - )) - ) - - # Initialize variables - antidote_tries = 0 - antidote_guess_count = 0 - antidote_guess_list = [] - guess_result = [] - board = [] - page_guess_list = [] - page_result_list = [] - win = False - - antidote_embed = Embed(color=SNAKE_COLOR, title="Antidote") - antidote_embed.set_author(name=ctx.author.name, icon_url=ctx.author.display_avatar.url) - - # Generate answer - antidote_answer = list(ANTIDOTE_EMOJI) # Duplicate list, not reference it - random.shuffle(antidote_answer) - antidote_answer.pop() - - # Begin initial board building - for i in range(0, 10): - page_guess_list.append(f"{HOLE_EMOJI} {HOLE_EMOJI} {HOLE_EMOJI} {HOLE_EMOJI}") - page_result_list.append(f"{CROSS_EMOJI} {CROSS_EMOJI} {CROSS_EMOJI} {CROSS_EMOJI}") - board.append( - f"`{i+1:02d}` " - f"{page_guess_list[i]} - " - f"{page_result_list[i]}" - ) - board.append(EMPTY_UNICODE) - antidote_embed.add_field(name="10 guesses remaining", value="\n".join(board)) - board_id = await ctx.send(embed=antidote_embed) # Display board - - # Add our player reactions - for emoji in ANTIDOTE_EMOJI: - await board_id.add_reaction(emoji) - - # Begin main game loop - while not win and antidote_tries < 10: - try: - reaction, user = await ctx.bot.wait_for( - "reaction_add", timeout=300, check=predicate) - except asyncio.TimeoutError: - log.debug("Antidote timed out waiting for a reaction") - break # We're done, no reactions for the last 5 minutes - - if antidote_tries < 10: - if antidote_guess_count < 4: - if reaction.emoji in ANTIDOTE_EMOJI: - antidote_guess_list.append(reaction.emoji) - antidote_guess_count += 1 - - if antidote_guess_count == 4: # Guesses complete - antidote_guess_count = 0 - page_guess_list[antidote_tries] = " ".join(antidote_guess_list) - - # Now check guess - for i in range(0, len(antidote_answer)): - if antidote_guess_list[i] == antidote_answer[i]: - guess_result.append(TICK_EMOJI) - elif antidote_guess_list[i] in antidote_answer: - guess_result.append(BLANK_EMOJI) - else: - guess_result.append(CROSS_EMOJI) - guess_result.sort() - page_result_list[antidote_tries] = " ".join(guess_result) - - # Rebuild the board - board = [] - for i in range(0, 10): - board.append(f"`{i+1:02d}` " - f"{page_guess_list[i]} - " - f"{page_result_list[i]}") - board.append(EMPTY_UNICODE) - - # Remove Reactions - for emoji in antidote_guess_list: - await board_id.remove_reaction(emoji, user) - - if antidote_guess_list == antidote_answer: - win = True - - antidote_tries += 1 - guess_result = [] - antidote_guess_list = [] - - antidote_embed.clear_fields() - antidote_embed.add_field(name=f"{10 - antidote_tries} " - f"guesses remaining", - value="\n".join(board)) - # Redisplay the board - await board_id.edit(embed=antidote_embed) - - # Winning / Ending Screen - if win is True: - antidote_embed = Embed(color=SNAKE_COLOR, title="Antidote") - antidote_embed.set_author(name=ctx.author.name, icon_url=ctx.author.display_avatar.url) - antidote_embed.set_image(url="https://i.makeagif.com/media/7-12-2015/Cj1pts.gif") - antidote_embed.add_field(name="You have created the snake antidote!", - value=f"The solution was: {' '.join(antidote_answer)}\n" - f"You had {10 - antidote_tries} tries remaining.") - await board_id.edit(embed=antidote_embed) - else: - antidote_embed = Embed(color=SNAKE_COLOR, title="Antidote") - antidote_embed.set_author(name=ctx.author.name, icon_url=ctx.author.display_avatar.url) - antidote_embed.set_image(url="https://media.giphy.com/media/ceeN6U57leAhi/giphy.gif") - antidote_embed.add_field( - name=EMPTY_UNICODE, - value=( - f"Sorry you didnt make the antidote in time.\n" - f"The formula was {' '.join(antidote_answer)}" - ) - ) - await board_id.edit(embed=antidote_embed) - - log.debug("Ending pagination and removing all reactions...") - await board_id.clear_reactions() - - @snakes_group.command(name="draw") - async def draw_command(self, ctx: Context) -> None: - """ - Draws a random snek using Perlin noise. - - Written by Momo and kel. - Modified by juan and lemon. - """ - with ctx.typing(): - - # Generate random snake attributes - width = random.randint(6, 10) - length = random.randint(15, 22) - random_hue = random.random() - snek_color = self._beautiful_pastel(random_hue) - text_color = self._beautiful_pastel((random_hue + 0.5) % 1) - bg_color = ( - random.randint(32, 50), - random.randint(32, 50), - random.randint(50, 70), - ) - - # Build and send the snek - text = random.choice(self.snake_idioms)["idiom"] - factory = utils.PerlinNoiseFactory(dimension=1, octaves=2) - image_frame = utils.create_snek_frame( - factory, - snake_width=width, - snake_length=length, - snake_color=snek_color, - text=text, - text_color=text_color, - bg_color=bg_color - ) - png_bytes = utils.frame_to_png_bytes(image_frame) - file = File(png_bytes, filename="snek.png") - await ctx.send(file=file) - - @snakes_group.command(name="get") - @bot_has_permissions(manage_messages=True) - @locked() - async def get_command(self, ctx: Context, *, name: Snake = None) -> None: - """ - Fetches information about a snake from Wikipedia. - - Created by Ava and eivl. - """ - with ctx.typing(): - if name is None: - name = await Snake.random() - - if isinstance(name, dict): - data = name - else: - data = await self._get_snek(name) - - if data.get("error"): - await ctx.send("Could not fetch data from Wikipedia.") - return - - description = data["info"] - - # Shorten the description if needed - if len(description) > 1000: - description = description[:1000] - last_newline = description.rfind("\n") - if last_newline > 0: - description = description[:last_newline] - - # Strip and add the Wiki link. - if "fullurl" in data: - description = description.strip("\n") - description += f"\n\nRead more on [Wikipedia]({data['fullurl']})" - - # Build and send the embed. - embed = Embed( - title=data.get("title", data.get("name")), - description=description, - colour=0x59982F, - ) - - emoji = "https://emojipedia-us.s3.amazonaws.com/thumbs/60/google/3/snake_1f40d.png" - - _iter = ( - url - for url in data["image_list"] - if url.endswith(self.valid_image_extensions) - ) - image = next(_iter, emoji) - - embed.set_image(url=image) - - await ctx.send(embed=embed) - - @snakes_group.command(name="guess", aliases=("identify",)) - @locked() - async def guess_command(self, ctx: Context) -> None: - """ - Snake identifying game. - - Made by Ava and eivl. - Modified by lemon. - """ - with ctx.typing(): - - image = None - - while image is None: - snakes = [await Snake.random() for _ in range(4)] - snake = random.choice(snakes) - answer = "abcd"[snakes.index(snake)] - - data = await self._get_snek(snake) - - _iter = ( - url - for url in data["image_list"] - if url.endswith(self.valid_image_extensions) - ) - image = next(_iter, None) - - embed = Embed( - title="Which of the following is the snake in the image?", - description="\n".join( - f"{'ABCD'[snakes.index(snake)]}: {snake}" for snake in snakes), - colour=SNAKE_COLOR - ) - embed.set_image(url=image) - - guess = await ctx.send(embed=embed) - options = {f"{'abcd'[snakes.index(snake)]}": snake for snake in snakes} - await self._validate_answer(ctx, guess, answer, options) - - @snakes_group.command(name="hatch") - async def hatch_command(self, ctx: Context) -> None: - """ - Hatches your personal snake. - - Written by Momo and kel. - """ - # Pick a random snake to hatch. - snake_name = random.choice(list(utils.snakes.keys())) - snake_image = utils.snakes[snake_name] - - # Hatch the snake - message = await ctx.send(embed=Embed(description="Hatching your snake :snake:...")) - await asyncio.sleep(1) - - for stage in utils.stages: - hatch_embed = Embed(description=stage) - await message.edit(embed=hatch_embed) - await asyncio.sleep(1) - await asyncio.sleep(1) - await message.delete() - - # Build and send the embed. - my_snake_embed = Embed(description=":tada: Congrats! You hatched: **{0}**".format(snake_name)) - my_snake_embed.set_thumbnail(url=snake_image) - my_snake_embed.set_footer( - text=" Owner: {0}#{1}".format(ctx.author.name, ctx.author.discriminator) - ) - - await ctx.send(embed=my_snake_embed) - - @snakes_group.command(name="movie") - async def movie_command(self, ctx: Context) -> None: - """ - Gets a random snake-related movie from TMDB. - - Written by Samuel. - Modified by gdude. - Modified by Will Da Silva. - """ - # Initially 8 pages are fetched. The actual number of pages is set after the first request. - page = random.randint(1, self.num_movie_pages or 8) - - async with ctx.typing(): - response = await self.bot.http_session.get( - "https://api.themoviedb.org/3/search/movie", - params={ - "query": "snake", - "page": page, - "language": "en-US", - "api_key": Tokens.tmdb, - } - ) - data = await response.json() - if self.num_movie_pages is None: - self.num_movie_pages = data["total_pages"] - movie = random.choice(data["results"])["id"] - - response = await self.bot.http_session.get( - f"https://api.themoviedb.org/3/movie/{movie}", - params={ - "language": "en-US", - "api_key": Tokens.tmdb, - } - ) - data = await response.json() - - embed = Embed(title=data["title"], color=SNAKE_COLOR) - - if data["poster_path"] is not None: - embed.set_image(url=f"https://images.tmdb.org/t/p/original{data['poster_path']}") - - if data["overview"]: - embed.add_field(name="Overview", value=data["overview"]) - - if data["release_date"]: - embed.add_field(name="Release Date", value=data["release_date"]) - - if data["genres"]: - embed.add_field(name="Genres", value=", ".join([x["name"] for x in data["genres"]])) - - if data["vote_count"]: - embed.add_field(name="Rating", value=f"{data['vote_average']}/10 ({data['vote_count']} votes)", inline=True) - - if data["budget"] and data["revenue"]: - embed.add_field(name="Budget", value=data["budget"], inline=True) - embed.add_field(name="Revenue", value=data["revenue"], inline=True) - - embed.set_footer(text="This product uses the TMDb API but is not endorsed or certified by TMDb.") - embed.set_thumbnail(url="https://i.imgur.com/LtFtC8H.png") - - try: - await ctx.send(embed=embed) - except HTTPException as err: - await ctx.send("An error occurred while fetching a snake-related movie!") - raise err from None - - @snakes_group.command(name="quiz") - @locked() - async def quiz_command(self, ctx: Context) -> None: - """ - Asks a snake-related question in the chat and validates the user's guess. - - This was created by Mushy and Cardium, - and modified by Urthas and lemon. - """ - # Prepare a question. - question = random.choice(self.snake_quizzes) - answer = question["answerkey"] - options = {key: question["options"][key] for key in ANSWERS_EMOJI.keys()} - - # Build and send the embed. - embed = Embed( - color=SNAKE_COLOR, - title=question["question"], - description="\n".join( - [f"**{key.upper()}**: {answer}" for key, answer in options.items()] - ) - ) - - quiz = await ctx.send(embed=embed) - await self._validate_answer(ctx, quiz, answer, options) - - @snakes_group.command(name="name", aliases=("name_gen",)) - async def name_command(self, ctx: Context, *, name: str = None) -> None: - """ - Snakifies a username. - - Slices the users name at the last vowel (or second last if the name - ends with a vowel), and then combines it with a random snake name, - which is sliced at the first vowel (or second if the name starts with - a vowel). - - If the name contains no vowels, it just appends the snakename - to the end of the name. - - Examples: - lemon + anaconda = lemoconda - krzsn + anaconda = krzsnconda - gdude + anaconda = gduconda - aperture + anaconda = apertuconda - lucy + python = luthon - joseph + taipan = joseipan - - This was written by Iceman, and modified for inclusion into the bot by lemon. - """ - snake_name = await self._get_snake_name() - snake_name = snake_name["name"] - snake_prefix = "" - - # Set aside every word in the snake name except the last. - if " " in snake_name: - snake_prefix = " ".join(snake_name.split()[:-1]) - snake_name = snake_name.split()[-1] - - # If no name is provided, use whoever called the command. - if name: - user_name = name - else: - user_name = ctx.author.display_name - - # Get the index of the vowel to slice the username at - user_slice_index = len(user_name) - for index, char in enumerate(reversed(user_name)): - if index == 0: - continue - if char.lower() in "aeiouy": - user_slice_index -= index - break - - # Now, get the index of the vowel to slice the snake_name at - snake_slice_index = 0 - for index, char in enumerate(snake_name): - if index == 0: - continue - if char.lower() in "aeiouy": - snake_slice_index = index + 1 - break - - # Combine! - snake_name = snake_name[snake_slice_index:] - user_name = user_name[:user_slice_index] - result = f"{snake_prefix} {user_name}{snake_name}" - result = string.capwords(result) - - # Embed and send - embed = Embed( - title="Snake name", - description=f"Your snake-name is **{result}**", - color=SNAKE_COLOR - ) - - await ctx.send(embed=embed) - return - - @snakes_group.command(name="sal") - @locked() - async def sal_command(self, ctx: Context) -> None: - """ - Play a game of Snakes and Ladders. - - Written by Momo and kel. - Modified by lemon. - """ - # Check if there is already a game in this channel - if ctx.channel in self.active_sal: - await ctx.send(f"{ctx.author.mention} A game is already in progress in this channel.") - return - - game = utils.SnakeAndLaddersGame(snakes=self, context=ctx) - self.active_sal[ctx.channel] = game - - await game.open_game() - - @snakes_group.command(name="about") - async def about_command(self, ctx: Context) -> None: - """Show an embed with information about the event, its participants, and its winners.""" - contributors = [ - "<@!245270749919576066>", - "<@!396290259907903491>", - "<@!172395097705414656>", - "<@!361708843425726474>", - "<@!300302216663793665>", - "<@!210248051430916096>", - "<@!174588005745557505>", - "<@!87793066227822592>", - "<@!211619754039967744>", - "<@!97347867923976192>", - "<@!136081839474343936>", - "<@!263560579770220554>", - "<@!104749643715387392>", - "<@!303940835005825024>", - ] - - embed = Embed( - title="About the snake cog", - description=( - "The features in this cog were created by members of the community " - "during our first ever " - "[code jam event](https://pythondiscord.com/pages/code-jams/code-jam-1-snakes-bot/). \n\n" - "The event saw over 50 participants, who competed to write a discord bot cog with a snake theme over " - "48 hours. The staff then selected the best features from all the best teams, and made modifications " - "to ensure they would all work together before integrating them into the community bot.\n\n" - "It was a tight race, but in the end, <@!104749643715387392> and <@!303940835005825024> " - f"walked away as grand champions. Make sure you check out `{ctx.prefix}snakes sal`," - f"`{ctx.prefix}snakes draw` and `{ctx.prefix}snakes hatch` " - "to see what they came up with." - ) - ) - - embed.add_field( - name="Contributors", - value=( - ", ".join(contributors) - ) - ) - - await ctx.send(embed=embed) - - @snakes_group.command(name="card") - async def card_command(self, ctx: Context, *, name: Snake = None) -> None: - """ - Create an interesting little card from a snake. - - Created by juan and Someone during the first code jam. - """ - # Get the snake data we need - if not name: - name_obj = await self._get_snake_name() - name = name_obj["scientific"] - content = await self._get_snek(name) - - elif isinstance(name, dict): - content = name - - else: - content = await self._get_snek(name) - - # Make the card - async with ctx.typing(): - - stream = BytesIO() - async with async_timeout.timeout(10): - async with self.bot.http_session.get(content["image_list"][0]) as response: - stream.write(await response.read()) - - stream.seek(0) - - func = partial(self._generate_card, stream, content) - final_buffer = await self.bot.loop.run_in_executor(None, func) - - # Send it! - await ctx.send( - f"A wild {content['name'].title()} appears!", - file=File(final_buffer, filename=content["name"].replace(" ", "") + ".png") - ) - - @snakes_group.command(name="fact") - async def fact_command(self, ctx: Context) -> None: - """ - Gets a snake-related fact. - - Written by Andrew and Prithaj. - Modified by lemon. - """ - question = random.choice(self.snake_facts)["fact"] - embed = Embed( - title="Snake fact", - color=SNAKE_COLOR, - description=question - ) - await ctx.send(embed=embed) - - @snakes_group.command(name="snakify") - async def snakify_command(self, ctx: Context, *, message: str = None) -> None: - """ - How would I talk if I were a snake? - - If `message` is passed, the bot will snakify the message. - Otherwise, a random message from the user's history is snakified. - - Written by Momo and kel. - Modified by lemon. - """ - with ctx.typing(): - embed = Embed() - user = ctx.author - - if not message: - - # Get a random message from the users history - messages = [] - async for message in ctx.history(limit=500).filter( - lambda msg: msg.author == ctx.author # Message was sent by author. - ): - messages.append(message.content) - - message = self._get_random_long_message(messages) - - # Build and send the embed - embed.set_author( - name=f"{user.name}#{user.discriminator}", - icon_url=user.display_avatar.url, - ) - embed.description = f"*{self._snakify(message)}*" - - await ctx.send(embed=embed) - - @snakes_group.command(name="video", aliases=("get_video",)) - async def video_command(self, ctx: Context, *, search: str = None) -> None: - """ - Gets a YouTube video about snakes. - - If `search` is given, a snake with that name will be searched on Youtube. - - Written by Andrew and Prithaj. - """ - # Are we searching for anything specific? - if search: - query = search + " snake" - else: - snake = await self._get_snake_name() - query = snake["name"] - - # Build the URL and make the request - url = "https://www.googleapis.com/youtube/v3/search" - response = await self.bot.http_session.get( - url, - params={ - "part": "snippet", - "q": urllib.parse.quote_plus(query), - "type": "video", - "key": Tokens.youtube - } - ) - response = await response.json() - data = response.get("items", []) - - # Send the user a video - if len(data) > 0: - num = random.randint(0, len(data) - 1) - youtube_base_url = "https://www.youtube.com/watch?v=" - await ctx.send( - content=f"{youtube_base_url}{data[num]['id']['videoId']}" - ) - else: - log.warning(f"YouTube API error. Full response looks like {response}") - - @snakes_group.command(name="zen") - async def zen_command(self, ctx: Context) -> None: - """ - Gets a random quote from the Zen of Python, except as if spoken by a snake. - - Written by Prithaj and Andrew. - Modified by lemon. - """ - embed = Embed( - title="Zzzen of Pythhon", - color=SNAKE_COLOR - ) - - # Get the zen quote and snakify it - zen_quote = random.choice(ZEN.splitlines()) - zen_quote = self._snakify(zen_quote) - - # Embed and send - embed.description = zen_quote - await ctx.send( - embed=embed - ) - # endregion - - # region: Error handlers - @card_command.error - async def command_error(self, ctx: Context, error: CommandError) -> None: - """Local error handler for the Snake Cog.""" - original_error = getattr(error, "original", None) - if isinstance(original_error, OSError): - error.handled = True - embed = Embed() - embed.colour = Colour.red() - log.error(f"snake_card encountered an OSError: {error} ({original_error})") - embed.description = "Could not generate the snake card! Please try again." - embed.title = random.choice(ERROR_REPLIES) - await ctx.send(embed=embed) diff --git a/bot/exts/evergreen/snakes/_utils.py b/bot/exts/evergreen/snakes/_utils.py deleted file mode 100644 index b5f13c53..00000000 --- a/bot/exts/evergreen/snakes/_utils.py +++ /dev/null @@ -1,721 +0,0 @@ -import asyncio -import io -import json -import logging -import math -import random -from itertools import product -from pathlib import Path - -from PIL import Image -from PIL.ImageDraw import ImageDraw -from discord import File, Member, Reaction -from discord.ext.commands import Cog, Context - -from bot.constants import Roles - -SNAKE_RESOURCES = Path("bot/resources/snakes").absolute() - -h1 = r"""``` - ---- - ------ -/--------\ -|--------| -|--------| - \------/ - ---- -```""" -h2 = r"""``` - ---- - ------ -/---\-/--\ -|-----\--| -|--------| - \------/ - ---- -```""" -h3 = r"""``` - ---- - ------ -/---\-/--\ -|-----\--| -|-----/--| - \----\-/ - ---- -```""" -h4 = r"""``` - ----- - ----- \ -/--| /---\ -|--\ -\---| -|--\--/-- / - \------- / - ------ -```""" -stages = [h1, h2, h3, h4] -snakes = { - "Baby Python": "https://i.imgur.com/SYOcmSa.png", - "Baby Rattle Snake": "https://i.imgur.com/i5jYA8f.png", - "Baby Dragon Snake": "https://i.imgur.com/SuMKM4m.png", - "Baby Garden Snake": "https://i.imgur.com/5vYx3ah.png", - "Baby Cobra": "https://i.imgur.com/jk14ryt.png", - "Baby Anaconda": "https://i.imgur.com/EpdrnNr.png", -} - -BOARD_TILE_SIZE = 56 # the size of each board tile -BOARD_PLAYER_SIZE = 20 # the size of each player icon -BOARD_MARGIN = (10, 0) # margins, in pixels (for player icons) -# The size of the image to download -# Should a power of 2 and higher than BOARD_PLAYER_SIZE -PLAYER_ICON_IMAGE_SIZE = 32 -MAX_PLAYERS = 4 # depends on the board size/quality, 4 is for the default board - -# board definition (from, to) -BOARD = { - # ladders - 2: 38, - 7: 14, - 8: 31, - 15: 26, - 21: 42, - 28: 84, - 36: 44, - 51: 67, - 71: 91, - 78: 98, - 87: 94, - - # snakes - 99: 80, - 95: 75, - 92: 88, - 89: 68, - 74: 53, - 64: 60, - 62: 19, - 49: 11, - 46: 25, - 16: 6 -} - -DEFAULT_SNAKE_COLOR = 0x15c7ea -DEFAULT_BACKGROUND_COLOR = 0 -DEFAULT_IMAGE_DIMENSIONS = (200, 200) -DEFAULT_SNAKE_LENGTH = 22 -DEFAULT_SNAKE_WIDTH = 8 -DEFAULT_SEGMENT_LENGTH_RANGE = (7, 10) -DEFAULT_IMAGE_MARGINS = (50, 50) -DEFAULT_TEXT = "snek\nit\nup" -DEFAULT_TEXT_POSITION = ( - 10, - 10 -) -DEFAULT_TEXT_COLOR = 0xf2ea15 -X = 0 -Y = 1 -ANGLE_RANGE = math.pi * 2 - - -def get_resource(file: str) -> list[dict]: - """Load Snake resources JSON.""" - return json.loads((SNAKE_RESOURCES / f"{file}.json").read_text("utf-8")) - - -def smoothstep(t: float) -> float: - """Smooth curve with a zero derivative at 0 and 1, making it useful for interpolating.""" - return t * t * (3. - 2. * t) - - -def lerp(t: float, a: float, b: float) -> float: - """Linear interpolation between a and b, given a fraction t.""" - return a + t * (b - a) - - -class PerlinNoiseFactory(object): - """ - Callable that produces Perlin noise for an arbitrary point in an arbitrary number of dimensions. - - The underlying grid is aligned with the integers. - - There is no limit to the coordinates used; new gradients are generated on the fly as necessary. - - Taken from: https://gist.github.com/eevee/26f547457522755cb1fb8739d0ea89a1 - Licensed under ISC - """ - - def __init__(self, dimension: int, octaves: int = 1, tile: tuple[int, ...] = (), unbias: bool = False): - """ - Create a new Perlin noise factory in the given number of dimensions. - - dimension should be an integer and at least 1. - - More octaves create a foggier and more-detailed noise pattern. More than 4 octaves is rather excessive. - - ``tile`` can be used to make a seamlessly tiling pattern. - For example: - pnf = PerlinNoiseFactory(2, tile=(0, 3)) - - This will produce noise that tiles every 3 units vertically, but never tiles horizontally. - - If ``unbias`` is True, the smoothstep function will be applied to the output before returning - it, to counteract some of Perlin noise's significant bias towards the center of its output range. - """ - self.dimension = dimension - self.octaves = octaves - self.tile = tile + (0,) * dimension - self.unbias = unbias - - # For n dimensions, the range of Perlin noise is ±sqrt(n)/2; multiply - # by this to scale to ±1 - self.scale_factor = 2 * dimension ** -0.5 - - self.gradient = {} - - def _generate_gradient(self) -> tuple[float, ...]: - """ - Generate a random unit vector at each grid point. - - This is the "gradient" vector, in that the grid tile slopes towards it - """ - # 1 dimension is special, since the only unit vector is trivial; - # instead, use a slope between -1 and 1 - if self.dimension == 1: - return (random.uniform(-1, 1),) - - # Generate a random point on the surface of the unit n-hypersphere; - # this is the same as a random unit vector in n dimensions. Thanks - # to: http://mathworld.wolfram.com/SpherePointPicking.html - # Pick n normal random variables with stddev 1 - random_point = [random.gauss(0, 1) for _ in range(self.dimension)] - # Then scale the result to a unit vector - scale = sum(n * n for n in random_point) ** -0.5 - return tuple(coord * scale for coord in random_point) - - def get_plain_noise(self, *point) -> float: - """Get plain noise for a single point, without taking into account either octaves or tiling.""" - if len(point) != self.dimension: - raise ValueError( - f"Expected {self.dimension} values, got {len(point)}" - ) - - # Build a list of the (min, max) bounds in each dimension - grid_coords = [] - for coord in point: - min_coord = math.floor(coord) - max_coord = min_coord + 1 - grid_coords.append((min_coord, max_coord)) - - # Compute the dot product of each gradient vector and the point's - # distance from the corresponding grid point. This gives you each - # gradient's "influence" on the chosen point. - dots = [] - for grid_point in product(*grid_coords): - if grid_point not in self.gradient: - self.gradient[grid_point] = self._generate_gradient() - gradient = self.gradient[grid_point] - - dot = 0 - for i in range(self.dimension): - dot += gradient[i] * (point[i] - grid_point[i]) - dots.append(dot) - - # Interpolate all those dot products together. The interpolation is - # done with smoothstep to smooth out the slope as you pass from one - # grid cell into the next. - # Due to the way product() works, dot products are ordered such that - # the last dimension alternates: (..., min), (..., max), etc. So we - # can interpolate adjacent pairs to "collapse" that last dimension. Then - # the results will alternate in their second-to-last dimension, and so - # forth, until we only have a single value left. - dim = self.dimension - while len(dots) > 1: - dim -= 1 - s = smoothstep(point[dim] - grid_coords[dim][0]) - - next_dots = [] - while dots: - next_dots.append(lerp(s, dots.pop(0), dots.pop(0))) - - dots = next_dots - - return dots[0] * self.scale_factor - - def __call__(self, *point) -> float: - """ - Get the value of this Perlin noise function at the given point. - - The number of values given should match the number of dimensions. - """ - ret = 0 - for o in range(self.octaves): - o2 = 1 << o - new_point = [] - for i, coord in enumerate(point): - coord *= o2 - if self.tile[i]: - coord %= self.tile[i] * o2 - new_point.append(coord) - ret += self.get_plain_noise(*new_point) / o2 - - # Need to scale n back down since adding all those extra octaves has - # probably expanded it beyond ±1 - # 1 octave: ±1 - # 2 octaves: ±1½ - # 3 octaves: ±1¾ - ret /= 2 - 2 ** (1 - self.octaves) - - if self.unbias: - # The output of the plain Perlin noise algorithm has a fairly - # strong bias towards the center due to the central limit theorem - # -- in fact the top and bottom 1/8 virtually never happen. That's - # a quarter of our entire output range! If only we had a function - # in [0..1] that could introduce a bias towards the endpoints... - r = (ret + 1) / 2 - # Doing it this many times is a completely made-up heuristic. - for _ in range(int(self.octaves / 2 + 0.5)): - r = smoothstep(r) - ret = r * 2 - 1 - - return ret - - -def create_snek_frame( - perlin_factory: PerlinNoiseFactory, perlin_lookup_vertical_shift: float = 0, - image_dimensions: tuple[int, int] = DEFAULT_IMAGE_DIMENSIONS, - image_margins: tuple[int, int] = DEFAULT_IMAGE_MARGINS, - snake_length: int = DEFAULT_SNAKE_LENGTH, - snake_color: int = DEFAULT_SNAKE_COLOR, bg_color: int = DEFAULT_BACKGROUND_COLOR, - segment_length_range: tuple[int, int] = DEFAULT_SEGMENT_LENGTH_RANGE, snake_width: int = DEFAULT_SNAKE_WIDTH, - text: str = DEFAULT_TEXT, text_position: tuple[float, float] = DEFAULT_TEXT_POSITION, - text_color: int = DEFAULT_TEXT_COLOR -) -> Image.Image: - """ - Creates a single random snek frame using Perlin noise. - - `perlin_lookup_vertical_shift` represents the Perlin noise shift in the Y-dimension for this frame. - If `text` is given, display the given text with the snek. - """ - start_x = random.randint(image_margins[X], image_dimensions[X] - image_margins[X]) - start_y = random.randint(image_margins[Y], image_dimensions[Y] - image_margins[Y]) - points: list[tuple[float, float]] = [(start_x, start_y)] - - for index in range(0, snake_length): - angle = perlin_factory.get_plain_noise( - ((1 / (snake_length + 1)) * (index + 1)) + perlin_lookup_vertical_shift - ) * ANGLE_RANGE - current_point = points[index] - segment_length = random.randint(segment_length_range[0], segment_length_range[1]) - points.append(( - current_point[X] + segment_length * math.cos(angle), - current_point[Y] + segment_length * math.sin(angle) - )) - - # normalize bounds - min_dimensions: list[float] = [start_x, start_y] - max_dimensions: list[float] = [start_x, start_y] - for point in points: - min_dimensions[X] = min(point[X], min_dimensions[X]) - min_dimensions[Y] = min(point[Y], min_dimensions[Y]) - max_dimensions[X] = max(point[X], max_dimensions[X]) - max_dimensions[Y] = max(point[Y], max_dimensions[Y]) - - # shift towards middle - dimension_range = (max_dimensions[X] - min_dimensions[X], max_dimensions[Y] - min_dimensions[Y]) - shift = ( - image_dimensions[X] / 2 - (dimension_range[X] / 2 + min_dimensions[X]), - image_dimensions[Y] / 2 - (dimension_range[Y] / 2 + min_dimensions[Y]) - ) - - image = Image.new(mode="RGB", size=image_dimensions, color=bg_color) - draw = ImageDraw(image) - for index in range(1, len(points)): - point = points[index] - previous = points[index - 1] - draw.line( - ( - shift[X] + previous[X], - shift[Y] + previous[Y], - shift[X] + point[X], - shift[Y] + point[Y] - ), - width=snake_width, - fill=snake_color - ) - if text is not None: - draw.multiline_text(text_position, text, fill=text_color) - del draw - return image - - -def frame_to_png_bytes(image: Image) -> io.BytesIO: - """Convert image to byte stream.""" - stream = io.BytesIO() - image.save(stream, format="PNG") - stream.seek(0) - return stream - - -log = logging.getLogger(__name__) -START_EMOJI = "\u2611" # :ballot_box_with_check: - Start the game -CANCEL_EMOJI = "\u274C" # :x: - Cancel or leave the game -ROLL_EMOJI = "\U0001F3B2" # :game_die: - Roll the die! -JOIN_EMOJI = "\U0001F64B" # :raising_hand: - Join the game. -STARTUP_SCREEN_EMOJI = [ - JOIN_EMOJI, - START_EMOJI, - CANCEL_EMOJI -] -GAME_SCREEN_EMOJI = [ - ROLL_EMOJI, - CANCEL_EMOJI -] - - -class SnakeAndLaddersGame: - """Snakes and Ladders game Cog.""" - - def __init__(self, snakes: Cog, context: Context): - self.snakes = snakes - self.ctx = context - self.channel = self.ctx.channel - self.state = "booting" - self.started = False - self.author = self.ctx.author - self.players = [] - self.player_tiles = {} - self.round_has_rolled = {} - self.avatar_images = {} - self.board = None - self.positions = None - self.rolls = [] - - async def open_game(self) -> None: - """ - Create a new Snakes and Ladders game. - - Listen for reactions until players have joined, and the game has been started. - """ - def startup_event_check(reaction_: Reaction, user_: Member) -> bool: - """Make sure that this reaction is what we want to operate on.""" - return ( - all(( - reaction_.message.id == startup.id, # Reaction is on startup message - reaction_.emoji in STARTUP_SCREEN_EMOJI, # Reaction is one of the startup emotes - user_.id != self.ctx.bot.user.id, # Reaction was not made by the bot - )) - ) - - # Check to see if the bot can remove reactions - if not self.channel.permissions_for(self.ctx.guild.me).manage_messages: - log.warning( - "Unable to start Snakes and Ladders - " - f"Missing manage_messages permissions in {self.channel}" - ) - return - - await self._add_player(self.author) - await self.channel.send( - "**Snakes and Ladders**: A new game is about to start!", - file=File( - str(SNAKE_RESOURCES / "snakes_and_ladders" / "banner.jpg"), - filename="Snakes and Ladders.jpg" - ) - ) - startup = await self.channel.send( - f"Press {JOIN_EMOJI} to participate, and press " - f"{START_EMOJI} to start the game" - ) - for emoji in STARTUP_SCREEN_EMOJI: - await startup.add_reaction(emoji) - - self.state = "waiting" - - while not self.started: - try: - reaction, user = await self.ctx.bot.wait_for( - "reaction_add", - timeout=300, - check=startup_event_check - ) - if reaction.emoji == JOIN_EMOJI: - await self.player_join(user) - elif reaction.emoji == CANCEL_EMOJI: - if user == self.author or (self._is_moderator(user) and user not in self.players): - # Allow game author or non-playing moderation staff to cancel a waiting game - await self.cancel_game() - return - else: - await self.player_leave(user) - elif reaction.emoji == START_EMOJI: - if self.ctx.author == user: - self.started = True - await self.start_game(user) - await startup.delete() - break - - await startup.remove_reaction(reaction.emoji, user) - - except asyncio.TimeoutError: - log.debug("Snakes and Ladders timed out waiting for a reaction") - await self.cancel_game() - return # We're done, no reactions for the last 5 minutes - - async def _add_player(self, user: Member) -> None: - """Add player to game.""" - self.players.append(user) - self.player_tiles[user.id] = 1 - - avatar_bytes = await user.display_avatar.replace(size=PLAYER_ICON_IMAGE_SIZE).read() - im = Image.open(io.BytesIO(avatar_bytes)).resize((BOARD_PLAYER_SIZE, BOARD_PLAYER_SIZE)) - self.avatar_images[user.id] = im - - async def player_join(self, user: Member) -> None: - """ - Handle players joining the game. - - Prevent player joining if they have already joined, if the game is full, or if the game is - in a waiting state. - """ - for p in self.players: - if user == p: - await self.channel.send(user.mention + " You are already in the game.", delete_after=10) - return - if self.state != "waiting": - await self.channel.send(user.mention + " You cannot join at this time.", delete_after=10) - return - if len(self.players) is MAX_PLAYERS: - await self.channel.send(user.mention + " The game is full!", delete_after=10) - return - - await self._add_player(user) - - await self.channel.send( - f"**Snakes and Ladders**: {user.mention} has joined the game.\n" - f"There are now {str(len(self.players))} players in the game.", - delete_after=10 - ) - - async def player_leave(self, user: Member) -> bool: - """ - Handle players leaving the game. - - Leaving is prevented if the user wasn't part of the game. - - If the number of players reaches 0, the game is terminated. In this case, a sentinel boolean - is returned True to prevent a game from continuing after it's destroyed. - """ - is_surrendered = False # Sentinel value to assist with stopping a surrendered game - for p in self.players: - if user == p: - self.players.remove(p) - self.player_tiles.pop(p.id, None) - self.round_has_rolled.pop(p.id, None) - await self.channel.send( - "**Snakes and Ladders**: " + user.mention + " has left the game.", - delete_after=10 - ) - - if self.state != "waiting" and len(self.players) == 0: - await self.channel.send("**Snakes and Ladders**: The game has been surrendered!") - is_surrendered = True - self._destruct() - - return is_surrendered - else: - await self.channel.send(user.mention + " You are not in the match.", delete_after=10) - return is_surrendered - - async def cancel_game(self) -> None: - """Cancel the running game.""" - await self.channel.send("**Snakes and Ladders**: Game has been canceled.") - self._destruct() - - async def start_game(self, user: Member) -> None: - """ - Allow the game author to begin the game. - - The game cannot be started if the game is in a waiting state. - """ - if not user == self.author: - await self.channel.send(user.mention + " Only the author of the game can start it.", delete_after=10) - return - - if not self.state == "waiting": - await self.channel.send(user.mention + " The game cannot be started at this time.", delete_after=10) - return - - self.state = "starting" - player_list = ", ".join(user.mention for user in self.players) - await self.channel.send("**Snakes and Ladders**: The game is starting!\nPlayers: " + player_list) - await self.start_round() - - async def start_round(self) -> None: - """Begin the round.""" - def game_event_check(reaction_: Reaction, user_: Member) -> bool: - """Make sure that this reaction is what we want to operate on.""" - return ( - all(( - reaction_.message.id == self.positions.id, # Reaction is on positions message - reaction_.emoji in GAME_SCREEN_EMOJI, # Reaction is one of the game emotes - user_.id != self.ctx.bot.user.id, # Reaction was not made by the bot - )) - ) - - self.state = "roll" - for user in self.players: - self.round_has_rolled[user.id] = False - board_img = Image.open(SNAKE_RESOURCES / "snakes_and_ladders" / "board.jpg") - player_row_size = math.ceil(MAX_PLAYERS / 2) - - for i, player in enumerate(self.players): - tile = self.player_tiles[player.id] - tile_coordinates = self._board_coordinate_from_index(tile) - x_offset = BOARD_MARGIN[0] + tile_coordinates[0] * BOARD_TILE_SIZE - y_offset = \ - BOARD_MARGIN[1] + ( - (10 * BOARD_TILE_SIZE) - (9 - tile_coordinates[1]) * BOARD_TILE_SIZE - BOARD_PLAYER_SIZE) - x_offset += BOARD_PLAYER_SIZE * (i % player_row_size) - y_offset -= BOARD_PLAYER_SIZE * math.floor(i / player_row_size) - board_img.paste(self.avatar_images[player.id], - box=(x_offset, y_offset)) - - board_file = File(frame_to_png_bytes(board_img), filename="Board.jpg") - player_list = "\n".join((user.mention + ": Tile " + str(self.player_tiles[user.id])) for user in self.players) - - # Store and send new messages - temp_board = await self.channel.send( - "**Snakes and Ladders**: A new round has started! Current board:", - file=board_file - ) - temp_positions = await self.channel.send( - f"**Current positions**:\n{player_list}\n\nUse {ROLL_EMOJI} to roll the dice!" - ) - - # Delete the previous messages - if self.board and self.positions: - await self.board.delete() - await self.positions.delete() - - # remove the roll messages - for roll in self.rolls: - await roll.delete() - self.rolls = [] - - # Save new messages - self.board = temp_board - self.positions = temp_positions - - # Wait for rolls - for emoji in GAME_SCREEN_EMOJI: - await self.positions.add_reaction(emoji) - - is_surrendered = False - while True: - try: - reaction, user = await self.ctx.bot.wait_for( - "reaction_add", - timeout=300, - check=game_event_check - ) - - if reaction.emoji == ROLL_EMOJI: - await self.player_roll(user) - elif reaction.emoji == CANCEL_EMOJI: - if self._is_moderator(user) and user not in self.players: - # Only allow non-playing moderation staff to cancel a running game - await self.cancel_game() - return - else: - is_surrendered = await self.player_leave(user) - - await self.positions.remove_reaction(reaction.emoji, user) - - if self._check_all_rolled(): - break - - except asyncio.TimeoutError: - log.debug("Snakes and Ladders timed out waiting for a reaction") - await self.cancel_game() - return # We're done, no reactions for the last 5 minutes - - # Round completed - # Check to see if the game was surrendered before completing the round, without this - # sentinel, the game object would be deleted but the next round still posted into purgatory - if not is_surrendered: - await self._complete_round() - - async def player_roll(self, user: Member) -> None: - """Handle the player's roll.""" - if user.id not in self.player_tiles: - await self.channel.send(user.mention + " You are not in the match.", delete_after=10) - return - if self.state != "roll": - await self.channel.send(user.mention + " You may not roll at this time.", delete_after=10) - return - if self.round_has_rolled[user.id]: - return - roll = random.randint(1, 6) - self.rolls.append(await self.channel.send(f"{user.mention} rolled a **{roll}**!")) - next_tile = self.player_tiles[user.id] + roll - - # apply snakes and ladders - if next_tile in BOARD: - target = BOARD[next_tile] - if target < next_tile: - await self.channel.send( - f"{user.mention} slips on a snake and falls back to **{target}**", - delete_after=15 - ) - else: - await self.channel.send( - f"{user.mention} climbs a ladder to **{target}**", - delete_after=15 - ) - next_tile = target - - self.player_tiles[user.id] = min(100, next_tile) - self.round_has_rolled[user.id] = True - - async def _complete_round(self) -> None: - """At the conclusion of a round check to see if there's been a winner.""" - self.state = "post_round" - - # check for winner - winner = self._check_winner() - if winner is None: - # there is no winner, start the next round - await self.start_round() - return - - # announce winner and exit - await self.channel.send("**Snakes and Ladders**: " + winner.mention + " has won the game! :tada:") - self._destruct() - - def _check_winner(self) -> Member: - """Return a winning member if we're in the post-round state and there's a winner.""" - if self.state != "post_round": - return None - return next((player for player in self.players if self.player_tiles[player.id] == 100), - None) - - def _check_all_rolled(self) -> bool: - """Check if all members have made their roll.""" - return all(rolled for rolled in self.round_has_rolled.values()) - - def _destruct(self) -> None: - """Clean up the finished game object.""" - del self.snakes.active_sal[self.channel] - - def _board_coordinate_from_index(self, index: int) -> tuple[int, int]: - """Convert the tile number to the x/y coordinates for graphical purposes.""" - y_level = 9 - math.floor((index - 1) / 10) - is_reversed = math.floor((index - 1) / 10) % 2 != 0 - x_level = (index - 1) % 10 - if is_reversed: - x_level = 9 - x_level - return x_level, y_level - - @staticmethod - def _is_moderator(user: Member) -> bool: - """Return True if the user is a Moderator.""" - return any(Roles.moderator == role.id for role in user.roles) diff --git a/bot/exts/evergreen/source.py b/bot/exts/evergreen/source.py deleted file mode 100644 index 7572ce51..00000000 --- a/bot/exts/evergreen/source.py +++ /dev/null @@ -1,85 +0,0 @@ -import inspect -from pathlib import Path -from typing import Optional - -from discord import Embed -from discord.ext import commands - -from bot.bot import Bot -from bot.constants import Source -from bot.utils.converters import SourceConverter, SourceType - - -class BotSource(commands.Cog): - """Displays information about the bot's source code.""" - - @commands.command(name="source", aliases=("src",)) - async def source_command(self, ctx: commands.Context, *, source_item: SourceConverter = None) -> None: - """Display information and a GitHub link to the source code of a command, tag, or cog.""" - if not source_item: - embed = Embed(title="Sir Lancebot's GitHub Repository") - embed.add_field(name="Repository", value=f"[Go to GitHub]({Source.github})") - embed.set_thumbnail(url=Source.github_avatar_url) - await ctx.send(embed=embed) - return - - embed = await self.build_embed(source_item) - await ctx.send(embed=embed) - - def get_source_link(self, source_item: SourceType) -> tuple[str, str, Optional[int]]: - """ - Build GitHub link of source item, return this link, file location and first line number. - - Raise BadArgument if `source_item` is a dynamically-created object (e.g. via internal eval). - """ - if isinstance(source_item, commands.Command): - callback = inspect.unwrap(source_item.callback) - src = callback.__code__ - filename = src.co_filename - else: - src = type(source_item) - try: - filename = inspect.getsourcefile(src) - except TypeError: - raise commands.BadArgument("Cannot get source for a dynamically-created object.") - - if not isinstance(source_item, str): - try: - lines, first_line_no = inspect.getsourcelines(src) - except OSError: - raise commands.BadArgument("Cannot get source for a dynamically-created object.") - - lines_extension = f"#L{first_line_no}-L{first_line_no+len(lines)-1}" - else: - first_line_no = None - lines_extension = "" - - file_location = Path(filename).relative_to(Path.cwd()).as_posix() - - url = f"{Source.github}/blob/main/{file_location}{lines_extension}" - - return url, file_location, first_line_no or None - - async def build_embed(self, source_object: SourceType) -> Optional[Embed]: - """Build embed based on source object.""" - url, location, first_line = self.get_source_link(source_object) - - if isinstance(source_object, commands.Command): - description = source_object.short_doc - title = f"Command: {source_object.qualified_name}" - else: - title = f"Cog: {source_object.qualified_name}" - description = source_object.description.splitlines()[0] - - embed = Embed(title=title, description=description) - embed.set_thumbnail(url=Source.github_avatar_url) - embed.add_field(name="Source Code", value=f"[Go to GitHub]({url})") - line_text = f":{first_line}" if first_line else "" - embed.set_footer(text=f"{location}{line_text}") - - return embed - - -def setup(bot: Bot) -> None: - """Load the BotSource cog.""" - bot.add_cog(BotSource()) diff --git a/bot/exts/evergreen/space.py b/bot/exts/evergreen/space.py deleted file mode 100644 index 48ad0f96..00000000 --- a/bot/exts/evergreen/space.py +++ /dev/null @@ -1,236 +0,0 @@ -import logging -import random -from datetime import date, datetime -from typing import Any, Optional -from urllib.parse import urlencode - -from discord import Embed -from discord.ext import tasks -from discord.ext.commands import Cog, Context, group - -from bot.bot import Bot -from bot.constants import Tokens -from bot.utils.converters import DateConverter -from bot.utils.extensions import invoke_help_command - -logger = logging.getLogger(__name__) - -NASA_BASE_URL = "https://api.nasa.gov" -NASA_IMAGES_BASE_URL = "https://images-api.nasa.gov" -NASA_EPIC_BASE_URL = "https://epic.gsfc.nasa.gov" - -APOD_MIN_DATE = date(1995, 6, 16) - - -class Space(Cog): - """Space Cog contains commands, that show images, facts or other information about space.""" - - def __init__(self, bot: Bot): - self.http_session = bot.http_session - - self.rovers = {} - self.get_rovers.start() - - def cog_unload(self) -> None: - """Cancel `get_rovers` task when Cog will unload.""" - self.get_rovers.cancel() - - @tasks.loop(hours=24) - async def get_rovers(self) -> None: - """Get listing of rovers from NASA API and info about their start and end dates.""" - data = await self.fetch_from_nasa("mars-photos/api/v1/rovers") - - for rover in data["rovers"]: - self.rovers[rover["name"].lower()] = { - "min_date": rover["landing_date"], - "max_date": rover["max_date"], - "max_sol": rover["max_sol"] - } - - @group(name="space", invoke_without_command=True) - async def space(self, ctx: Context) -> None: - """Head command that contains commands about space.""" - await invoke_help_command(ctx) - - @space.command(name="apod") - async def apod(self, ctx: Context, date: Optional[str]) -> None: - """ - Get Astronomy Picture of Day from NASA API. Date is optional parameter, what formatting is YYYY-MM-DD. - - If date is not specified, this will get today APOD. - """ - params = {} - # Parse date to params, when provided. Show error message when invalid formatting - if date: - try: - apod_date = datetime.strptime(date, "%Y-%m-%d").date() - except ValueError: - await ctx.send(f"Invalid date {date}. Please make sure your date is in format YYYY-MM-DD.") - return - - now = datetime.now().date() - if APOD_MIN_DATE > apod_date or now < apod_date: - await ctx.send(f"Date must be between {APOD_MIN_DATE.isoformat()} and {now.isoformat()} (today).") - return - - params["date"] = apod_date.isoformat() - - result = await self.fetch_from_nasa("planetary/apod", params) - - await ctx.send( - embed=self.create_nasa_embed( - f"Astronomy Picture of the Day - {result['date']}", - result["explanation"], - result["url"] - ) - ) - - @space.command(name="nasa") - async def nasa(self, ctx: Context, *, search_term: Optional[str]) -> None: - """Get random NASA information/facts + image. Support `search_term` parameter for more specific search.""" - params = { - "media_type": "image" - } - if search_term: - params["q"] = search_term - - # Don't use API key, no need for this. - data = await self.fetch_from_nasa("search", params, NASA_IMAGES_BASE_URL, use_api_key=False) - if len(data["collection"]["items"]) == 0: - await ctx.send(f"Can't find any items with search term `{search_term}`.") - return - - item = random.choice(data["collection"]["items"]) - - await ctx.send( - embed=self.create_nasa_embed( - item["data"][0]["title"], - item["data"][0]["description"], - item["links"][0]["href"] - ) - ) - - @space.command(name="epic") - async def epic(self, ctx: Context, date: Optional[str]) -> None: - """Get a random image of the Earth from the NASA EPIC API. Support date parameter, format is YYYY-MM-DD.""" - if date: - try: - show_date = datetime.strptime(date, "%Y-%m-%d").date().isoformat() - except ValueError: - await ctx.send(f"Invalid date {date}. Please make sure your date is in format YYYY-MM-DD.") - return - else: - show_date = None - - # Don't use API key, no need for this. - data = await self.fetch_from_nasa( - f"api/natural{f'/date/{show_date}' if show_date else ''}", - base=NASA_EPIC_BASE_URL, - use_api_key=False - ) - if len(data) < 1: - await ctx.send("Can't find any images in this date.") - return - - item = random.choice(data) - - year, month, day = item["date"].split(" ")[0].split("-") - image_url = f"{NASA_EPIC_BASE_URL}/archive/natural/{year}/{month}/{day}/jpg/{item['image']}.jpg" - - await ctx.send( - embed=self.create_nasa_embed( - "Earth Image", item["caption"], image_url, f" \u2022 Identifier: {item['identifier']}" - ) - ) - - @space.group(name="mars", invoke_without_command=True) - async def mars( - self, - ctx: Context, - date: Optional[DateConverter], - rover: str = "curiosity" - ) -> None: - """ - Get random Mars image by date. Support both SOL (martian solar day) and earth date and rovers. - - Earth date formatting is YYYY-MM-DD. Use `.space mars dates` to get all currently available rovers. - """ - rover = rover.lower() - if rover not in self.rovers: - await ctx.send( - ( - f"Invalid rover `{rover}`.\n" - f"**Rovers:** `{'`, `'.join(f'{r.capitalize()}' for r in self.rovers)}`" - ) - ) - return - - # When date not provided, get random SOL date between 0 and rover's max. - if date is None: - date = random.randint(0, self.rovers[rover]["max_sol"]) - - params = {} - if isinstance(date, int): - params["sol"] = date - else: - params["earth_date"] = date.date().isoformat() - - result = await self.fetch_from_nasa(f"mars-photos/api/v1/rovers/{rover}/photos", params) - if len(result["photos"]) < 1: - err_msg = ( - f"We can't find result in date " - f"{date.date().isoformat() if isinstance(date, datetime) else f'{date} SOL'}.\n" - f"**Note:** Dates must match with rover's working dates. Please use `{ctx.prefix}space mars dates` to " - "see working dates for each rover." - ) - await ctx.send(err_msg) - return - - item = random.choice(result["photos"]) - await ctx.send( - embed=self.create_nasa_embed( - f"{item['rover']['name']}'s {item['camera']['full_name']} Mars Image", "", item["img_src"], - ) - ) - - @mars.command(name="dates", aliases=("d", "date", "rover", "rovers", "r")) - async def dates(self, ctx: Context) -> None: - """Get current available rovers photo date ranges.""" - await ctx.send("\n".join( - f"**{r.capitalize()}:** {i['min_date']} **-** {i['max_date']}" for r, i in self.rovers.items() - )) - - async def fetch_from_nasa( - self, - endpoint: str, - additional_params: Optional[dict[str, Any]] = None, - base: Optional[str] = NASA_BASE_URL, - use_api_key: bool = True - ) -> dict[str, Any]: - """Fetch information from NASA API, return result.""" - params = {} - if use_api_key: - params["api_key"] = Tokens.nasa - - # Add additional parameters to request parameters only when they provided by user - if additional_params is not None: - params.update(additional_params) - - async with self.http_session.get(url=f"{base}/{endpoint}?{urlencode(params)}") as resp: - return await resp.json() - - def create_nasa_embed(self, title: str, description: str, image: str, footer: Optional[str] = "") -> Embed: - """Generate NASA commands embeds. Required: title, description and image URL, footer (addition) is optional.""" - return Embed( - title=title, - description=description - ).set_image(url=image).set_footer(text="Powered by NASA API" + footer) - - -def setup(bot: Bot) -> None: - """Load the Space cog.""" - if not Tokens.nasa: - logger.warning("Can't find NASA API key. Not loading Space Cog.") - return - - bot.add_cog(Space(bot)) diff --git a/bot/exts/evergreen/speedrun.py b/bot/exts/evergreen/speedrun.py deleted file mode 100644 index 774eff81..00000000 --- a/bot/exts/evergreen/speedrun.py +++ /dev/null @@ -1,26 +0,0 @@ -import json -import logging -from pathlib import Path -from random import choice - -from discord.ext import commands - -from bot.bot import Bot - -log = logging.getLogger(__name__) - -LINKS = json.loads(Path("bot/resources/evergreen/speedrun_links.json").read_text("utf8")) - - -class Speedrun(commands.Cog): - """Commands about the video game speedrunning community.""" - - @commands.command(name="speedrun") - async def get_speedrun(self, ctx: commands.Context) -> None: - """Sends a link to a video of a random speedrun.""" - await ctx.send(choice(LINKS)) - - -def setup(bot: Bot) -> None: - """Load the Speedrun cog.""" - bot.add_cog(Speedrun()) diff --git a/bot/exts/evergreen/stackoverflow.py b/bot/exts/evergreen/stackoverflow.py deleted file mode 100644 index 64455e33..00000000 --- a/bot/exts/evergreen/stackoverflow.py +++ /dev/null @@ -1,88 +0,0 @@ -import logging -from html import unescape -from urllib.parse import quote_plus - -from discord import Embed, HTTPException -from discord.ext import commands - -from bot.bot import Bot -from bot.constants import Colours, Emojis - -logger = logging.getLogger(__name__) - -BASE_URL = "https://api.stackexchange.com/2.2/search/advanced" -SO_PARAMS = { - "order": "desc", - "sort": "activity", - "site": "stackoverflow" -} -SEARCH_URL = "https://stackoverflow.com/search?q={query}" -ERR_EMBED = Embed( - title="Error in fetching results from Stackoverflow", - description=( - "Sorry, there was en error while trying to fetch data from the Stackoverflow website. Please try again in some " - "time. If this issue persists, please contact the staff or send a message in #dev-contrib." - ), - color=Colours.soft_red -) - - -class Stackoverflow(commands.Cog): - """Contains command to interact with stackoverflow from discord.""" - - def __init__(self, bot: Bot): - self.bot = bot - - @commands.command(aliases=["so"]) - @commands.cooldown(1, 15, commands.cooldowns.BucketType.user) - async def stackoverflow(self, ctx: commands.Context, *, search_query: str) -> None: - """Sends the top 5 results of a search query from stackoverflow.""" - params = SO_PARAMS | {"q": search_query} - async with self.bot.http_session.get(url=BASE_URL, params=params) as response: - if response.status == 200: - data = await response.json() - else: - logger.error(f'Status code is not 200, it is {response.status}') - await ctx.send(embed=ERR_EMBED) - return - if not data['items']: - no_search_result = Embed( - title=f"No search results found for {search_query}", - color=Colours.soft_red - ) - await ctx.send(embed=no_search_result) - return - - top5 = data["items"][:5] - encoded_search_query = quote_plus(search_query) - embed = Embed( - title="Search results - Stackoverflow", - url=SEARCH_URL.format(query=encoded_search_query), - description=f"Here are the top {len(top5)} results:", - color=Colours.orange - ) - for item in top5: - embed.add_field( - name=unescape(item['title']), - value=( - f"[{Emojis.reddit_upvote} {item['score']} " - f"{Emojis.stackoverflow_views} {item['view_count']} " - f"{Emojis.reddit_comments} {item['answer_count']} " - f"{Emojis.stackoverflow_tag} {', '.join(item['tags'][:3])}]" - f"({item['link']})" - ), - inline=False) - embed.set_footer(text="View the original link for more results.") - try: - await ctx.send(embed=embed) - except HTTPException: - search_query_too_long = Embed( - title="Your search query is too long, please try shortening your search query", - color=Colours.soft_red - ) - await ctx.send(embed=search_query_too_long) - - -def setup(bot: Bot) -> None: - """Load the Stackoverflow Cog.""" - bot.add_cog(Stackoverflow(bot)) diff --git a/bot/exts/evergreen/status_codes.py b/bot/exts/evergreen/status_codes.py deleted file mode 100644 index 181c71ce..00000000 --- a/bot/exts/evergreen/status_codes.py +++ /dev/null @@ -1,83 +0,0 @@ -from http import HTTPStatus -from random import choice - -import discord -from discord.ext import commands - -from bot.bot import Bot - -HTTP_DOG_URL = "https://httpstatusdogs.com/img/{code}.jpg" -HTTP_CAT_URL = "https://http.cat/{code}.jpg" - - -class HTTPStatusCodes(commands.Cog): - """ - Fetch an image depicting HTTP status codes as a dog or a cat. - - If neither animal is selected a cat or dog is chosen randomly for the given status code. - """ - - def __init__(self, bot: Bot): - self.bot = bot - - @commands.group(name="http_status", aliases=("status", "httpstatus"), invoke_without_command=True) - async def http_status_group(self, ctx: commands.Context, code: int) -> None: - """Choose a cat or dog randomly for the given status code.""" - subcmd = choice((self.http_cat, self.http_dog)) - await subcmd(ctx, code) - - @http_status_group.command(name="cat") - async def http_cat(self, ctx: commands.Context, code: int) -> None: - """Sends an embed with an image of a cat, portraying the status code.""" - embed = discord.Embed(title=f"**Status: {code}**") - url = HTTP_CAT_URL.format(code=code) - - try: - HTTPStatus(code) - async with self.bot.http_session.get(url, allow_redirects=False) as response: - if response.status != 404: - embed.set_image(url=url) - else: - raise NotImplementedError - - except ValueError: - embed.set_footer(text="Inputted status code does not exist.") - - except NotImplementedError: - embed.set_footer(text="Inputted status code is not implemented by http.cat yet.") - - finally: - await ctx.send(embed=embed) - - @http_status_group.command(name="dog") - async def http_dog(self, ctx: commands.Context, code: int) -> None: - """Sends an embed with an image of a dog, portraying the status code.""" - # These codes aren't server-friendly. - if code in (304, 422): - await self.http_cat(ctx, code) - return - - embed = discord.Embed(title=f"**Status: {code}**") - url = HTTP_DOG_URL.format(code=code) - - try: - HTTPStatus(code) - async with self.bot.http_session.get(url, allow_redirects=False) as response: - if response.status != 302: - embed.set_image(url=url) - else: - raise NotImplementedError - - except ValueError: - embed.set_footer(text="Inputted status code does not exist.") - - except NotImplementedError: - embed.set_footer(text="Inputted status code is not implemented by httpstatusdogs.com yet.") - - finally: - await ctx.send(embed=embed) - - -def setup(bot: Bot) -> None: - """Load the HTTPStatusCodes cog.""" - bot.add_cog(HTTPStatusCodes(bot)) diff --git a/bot/exts/evergreen/tic_tac_toe.py b/bot/exts/evergreen/tic_tac_toe.py deleted file mode 100644 index 5c4f8051..00000000 --- a/bot/exts/evergreen/tic_tac_toe.py +++ /dev/null @@ -1,335 +0,0 @@ -import asyncio -import random -from typing import Callable, Optional, Union - -import discord -from discord.ext.commands import Cog, Context, check, group, guild_only - -from bot.bot import Bot -from bot.constants import Emojis -from bot.utils.pagination import LinePaginator - -CONFIRMATION_MESSAGE = ( - "{opponent}, {requester} wants to play Tic-Tac-Toe against you." - f"\nReact to this message with {Emojis.confirmation} to accept or with {Emojis.decline} to decline." -) - - -def check_win(board: dict[int, str]) -> bool: - """Check from board, is any player won game.""" - return any( - ( - # Horizontal - board[1] == board[2] == board[3], - board[4] == board[5] == board[6], - board[7] == board[8] == board[9], - # Vertical - board[1] == board[4] == board[7], - board[2] == board[5] == board[8], - board[3] == board[6] == board[9], - # Diagonal - board[1] == board[5] == board[9], - board[3] == board[5] == board[7], - ) - ) - - -class Player: - """Class that contains information about player and functions that interact with player.""" - - def __init__(self, user: discord.User, ctx: Context, symbol: str): - self.user = user - self.ctx = ctx - self.symbol = symbol - - async def get_move(self, board: dict[int, str], msg: discord.Message) -> tuple[bool, Optional[int]]: - """ - Get move from user. - - Return is timeout reached and position of field what user will fill when timeout don't reach. - """ - def check_for_move(r: discord.Reaction, u: discord.User) -> bool: - """Check does user who reacted is user who we want, message is board and emoji is in board values.""" - return ( - u.id == self.user.id - and msg.id == r.message.id - and r.emoji in board.values() - and r.emoji in Emojis.number_emojis.values() - ) - - try: - react, _ = await self.ctx.bot.wait_for("reaction_add", timeout=30.0, check=check_for_move) - except asyncio.TimeoutError: - return True, None - else: - return False, list(Emojis.number_emojis.keys())[list(Emojis.number_emojis.values()).index(react.emoji)] - - def __str__(self) -> str: - """Return mention of user.""" - return self.user.mention - - -class AI: - """Tic Tac Toe AI class for against computer gaming.""" - - def __init__(self, symbol: str): - self.symbol = symbol - - async def get_move(self, board: dict[int, str], _: discord.Message) -> tuple[bool, int]: - """Get move from AI. AI use Minimax strategy.""" - possible_moves = [i for i, emoji in board.items() if emoji in list(Emojis.number_emojis.values())] - - for symbol in (Emojis.o_square, Emojis.x_square): - for move in possible_moves: - board_copy = board.copy() - board_copy[move] = symbol - if check_win(board_copy): - return False, move - - open_corners = [i for i in possible_moves if i in (1, 3, 7, 9)] - if len(open_corners) > 0: - return False, random.choice(open_corners) - - if 5 in possible_moves: - return False, 5 - - open_edges = [i for i in possible_moves if i in (2, 4, 6, 8)] - return False, random.choice(open_edges) - - def __str__(self) -> str: - """Return `AI` as user name.""" - return "AI" - - -class Game: - """Class that contains information and functions about Tic Tac Toe game.""" - - def __init__(self, players: list[Union[Player, AI]], ctx: Context): - self.players = players - self.ctx = ctx - self.board = { - 1: Emojis.number_emojis[1], - 2: Emojis.number_emojis[2], - 3: Emojis.number_emojis[3], - 4: Emojis.number_emojis[4], - 5: Emojis.number_emojis[5], - 6: Emojis.number_emojis[6], - 7: Emojis.number_emojis[7], - 8: Emojis.number_emojis[8], - 9: Emojis.number_emojis[9] - } - - self.current = self.players[0] - self.next = self.players[1] - - self.winner: Optional[Union[Player, AI]] = None - self.loser: Optional[Union[Player, AI]] = None - self.over = False - self.canceled = False - self.draw = False - - async def get_confirmation(self) -> tuple[bool, Optional[str]]: - """ - Ask does user want to play TicTacToe against requester. First player is always requester. - - This return tuple that have: - - first element boolean (is game accepted?) - - (optional, only when first element is False, otherwise None) reason for declining. - """ - confirm_message = await self.ctx.send( - CONFIRMATION_MESSAGE.format( - opponent=self.players[1].user.mention, - requester=self.players[0].user.mention - ) - ) - await confirm_message.add_reaction(Emojis.confirmation) - await confirm_message.add_reaction(Emojis.decline) - - def confirm_check(reaction: discord.Reaction, user: discord.User) -> bool: - """Check is user who reacted from who this was requested, message is confirmation and emoji is valid.""" - return ( - reaction.emoji in (Emojis.confirmation, Emojis.decline) - and reaction.message.id == confirm_message.id - and user == self.players[1].user - ) - - try: - reaction, user = await self.ctx.bot.wait_for( - "reaction_add", - timeout=60.0, - check=confirm_check - ) - except asyncio.TimeoutError: - self.over = True - self.canceled = True - await confirm_message.delete() - return False, "Running out of time... Cancelled game." - - await confirm_message.delete() - if reaction.emoji == Emojis.confirmation: - return True, None - else: - self.over = True - self.canceled = True - return False, "User declined" - - async def add_reactions(self, msg: discord.Message) -> None: - """Add number emojis to message.""" - for nr in Emojis.number_emojis.values(): - await msg.add_reaction(nr) - - def format_board(self) -> str: - """Get formatted tic-tac-toe board for message.""" - board = list(self.board.values()) - return "\n".join( - (f"{board[line]} {board[line + 1]} {board[line + 2]}" for line in range(0, len(board), 3)) - ) - - async def play(self) -> None: - """Start and handle game.""" - await self.ctx.send("It's time for the game! Let's begin.") - board = await self.ctx.send( - embed=discord.Embed(description=self.format_board()) - ) - await self.add_reactions(board) - - for _ in range(9): - if isinstance(self.current, Player): - announce = await self.ctx.send( - f"{self.current.user.mention}, it's your turn! " - "React with an emoji to take your go." - ) - timeout, pos = await self.current.get_move(self.board, board) - if isinstance(self.current, Player): - await announce.delete() - if timeout: - await self.ctx.send(f"{self.current.user.mention} ran out of time. Canceling game.") - self.over = True - self.canceled = True - return - self.board[pos] = self.current.symbol - await board.edit( - embed=discord.Embed(description=self.format_board()) - ) - await board.clear_reaction(Emojis.number_emojis[pos]) - if check_win(self.board): - self.winner = self.current - self.loser = self.next - await self.ctx.send( - f":tada: {self.current} won this game! :tada:" - ) - await board.clear_reactions() - break - self.current, self.next = self.next, self.current - if not self.winner: - self.draw = True - await self.ctx.send("It's a DRAW!") - self.over = True - - -def is_channel_free() -> Callable: - """Check is channel where command will be invoked free.""" - async def predicate(ctx: Context) -> bool: - return all(game.channel != ctx.channel for game in ctx.cog.games if not game.over) - return check(predicate) - - -def is_requester_free() -> Callable: - """Check is requester not already in any game.""" - async def predicate(ctx: Context) -> bool: - return all( - ctx.author not in (player.user for player in game.players) for game in ctx.cog.games if not game.over - ) - return check(predicate) - - -class TicTacToe(Cog): - """TicTacToe cog contains tic-tac-toe game commands.""" - - def __init__(self): - self.games: list[Game] = [] - - @guild_only() - @is_channel_free() - @is_requester_free() - @group(name="tictactoe", aliases=("ttt", "tic"), invoke_without_command=True) - async def tic_tac_toe(self, ctx: Context, opponent: Optional[discord.User]) -> None: - """Tic Tac Toe game. Play against friends or AI. Use reactions to add your mark to field.""" - if opponent == ctx.author: - await ctx.send("You can't play against yourself.") - return - if opponent is not None and not all( - opponent not in (player.user for player in g.players) for g in ctx.cog.games if not g.over - ): - await ctx.send("Opponent is already in game.") - return - if opponent is None: - game = Game( - [Player(ctx.author, ctx, Emojis.x_square), AI(Emojis.o_square)], - ctx - ) - else: - game = Game( - [Player(ctx.author, ctx, Emojis.x_square), Player(opponent, ctx, Emojis.o_square)], - ctx - ) - self.games.append(game) - if opponent is not None: - if opponent.bot: # check whether the opponent is a bot or not - await ctx.send("You can't play Tic-Tac-Toe with bots!") - return - - confirmed, msg = await game.get_confirmation() - - if not confirmed: - if msg: - await ctx.send(msg) - return - await game.play() - - @tic_tac_toe.group(name="history", aliases=("log",), invoke_without_command=True) - async def tic_tac_toe_logs(self, ctx: Context) -> None: - """Show most recent tic-tac-toe games.""" - if len(self.games) < 1: - await ctx.send("No recent games.") - return - log_games = [] - for i, game in enumerate(self.games): - if game.over and not game.canceled: - if game.draw: - log_games.append( - f"**#{i+1}**: {game.players[0]} vs {game.players[1]} (draw)" - ) - else: - log_games.append( - f"**#{i+1}**: {game.winner} :trophy: vs {game.loser}" - ) - await LinePaginator.paginate( - log_games, - ctx, - discord.Embed(title="Most recent Tic Tac Toe games") - ) - - @tic_tac_toe_logs.command(name="show", aliases=("s",)) - async def show_tic_tac_toe_board(self, ctx: Context, game_id: int) -> None: - """View game board by ID (ID is possible to get by `.tictactoe history`).""" - if len(self.games) < game_id: - await ctx.send("Game don't exist.") - return - game = self.games[game_id - 1] - - if game.draw: - description = f"{game.players[0]} vs {game.players[1]} (draw)\n\n{game.format_board()}" - else: - description = f"{game.winner} :trophy: vs {game.loser}\n\n{game.format_board()}" - - embed = discord.Embed( - title=f"Match #{game_id} Game Board", - description=description, - ) - await ctx.send(embed=embed) - - -def setup(bot: Bot) -> None: - """Load the TicTacToe cog.""" - bot.add_cog(TicTacToe()) diff --git a/bot/exts/evergreen/timed.py b/bot/exts/evergreen/timed.py deleted file mode 100644 index 2ea6b419..00000000 --- a/bot/exts/evergreen/timed.py +++ /dev/null @@ -1,48 +0,0 @@ -from copy import copy -from time import perf_counter - -from discord import Message -from discord.ext import commands - -from bot.bot import Bot - - -class TimedCommands(commands.Cog): - """Time the command execution of a command.""" - - @staticmethod - async def create_execution_context(ctx: commands.Context, command: str) -> commands.Context: - """Get a new execution context for a command.""" - msg: Message = copy(ctx.message) - msg.content = f"{ctx.prefix}{command}" - - return await ctx.bot.get_context(msg) - - @commands.command(name="timed", aliases=("time", "t")) - async def timed(self, ctx: commands.Context, *, command: str) -> None: - """Time the command execution of a command.""" - new_ctx = await self.create_execution_context(ctx, command) - - ctx.subcontext = new_ctx - - if not ctx.subcontext.command: - help_command = f"{ctx.prefix}help" - error = f"The command you are trying to time doesn't exist. Use `{help_command}` for a list of commands." - - await ctx.send(error) - return - - if new_ctx.command.qualified_name == "timed": - await ctx.send("You are not allowed to time the execution of the `timed` command.") - return - - t_start = perf_counter() - await new_ctx.command.invoke(new_ctx) - t_end = perf_counter() - - await ctx.send(f"Command execution for `{new_ctx.command}` finished in {(t_end - t_start):.4f} seconds.") - - -def setup(bot: Bot) -> None: - """Load the Timed cog.""" - bot.add_cog(TimedCommands()) diff --git a/bot/exts/evergreen/trivia_quiz.py b/bot/exts/evergreen/trivia_quiz.py deleted file mode 100644 index aa4020d6..00000000 --- a/bot/exts/evergreen/trivia_quiz.py +++ /dev/null @@ -1,593 +0,0 @@ -import asyncio -import json -import logging -import operator -import random -from dataclasses import dataclass -from pathlib import Path -from typing import Callable, Optional - -import discord -from discord.ext import commands -from rapidfuzz import fuzz - -from bot.bot import Bot -from bot.constants import Colours, NEGATIVE_REPLIES, Roles - -logger = logging.getLogger(__name__) - -DEFAULT_QUESTION_LIMIT = 6 -STANDARD_VARIATION_TOLERANCE = 88 -DYNAMICALLY_GEN_VARIATION_TOLERANCE = 97 - -WRONG_ANS_RESPONSE = [ - "No one answered correctly!", - "Better luck next time...", -] - -N_PREFIX_STARTS_AT = 5 -N_PREFIXES = [ - "penta", "hexa", "hepta", "octa", "nona", - "deca", "hendeca", "dodeca", "trideca", "tetradeca", -] - -PLANETS = [ - ("1st", "Mercury"), - ("2nd", "Venus"), - ("3rd", "Earth"), - ("4th", "Mars"), - ("5th", "Jupiter"), - ("6th", "Saturn"), - ("7th", "Uranus"), - ("8th", "Neptune"), -] - -TAXONOMIC_HIERARCHY = [ - "species", "genus", "family", "order", - "class", "phylum", "kingdom", "domain", -] - -UNITS_TO_BASE_UNITS = { - "hertz": ("(unit of frequency)", "s^-1"), - "newton": ("(unit of force)", "m*kg*s^-2"), - "pascal": ("(unit of pressure & stress)", "m^-1*kg*s^-2"), - "joule": ("(unit of energy & quantity of heat)", "m^2*kg*s^-2"), - "watt": ("(unit of power)", "m^2*kg*s^-3"), - "coulomb": ("(unit of electric charge & quantity of electricity)", "s*A"), - "volt": ("(unit of voltage & electromotive force)", "m^2*kg*s^-3*A^-1"), - "farad": ("(unit of capacitance)", "m^-2*kg^-1*s^4*A^2"), - "ohm": ("(unit of electric resistance)", "m^2*kg*s^-3*A^-2"), - "weber": ("(unit of magnetic flux)", "m^2*kg*s^-2*A^-1"), - "tesla": ("(unit of magnetic flux density)", "kg*s^-2*A^-1"), -} - - -@dataclass(frozen=True) -class QuizEntry: - """Dataclass for a quiz entry (a question and a string containing answers separated by commas).""" - - question: str - answer: str - - -def linear_system(q_format: str, a_format: str) -> QuizEntry: - """Generate a system of linear equations with two unknowns.""" - x, y = random.randint(2, 5), random.randint(2, 5) - answer = a_format.format(x, y) - - coeffs = random.sample(range(1, 6), 4) - - question = q_format.format( - coeffs[0], - coeffs[1], - coeffs[0] * x + coeffs[1] * y, - coeffs[2], - coeffs[3], - coeffs[2] * x + coeffs[3] * y, - ) - - return QuizEntry(question, answer) - - -def mod_arith(q_format: str, a_format: str) -> QuizEntry: - """Generate a basic modular arithmetic question.""" - quotient, m, b = random.randint(30, 40), random.randint(10, 20), random.randint(200, 350) - ans = random.randint(0, 9) # max remainder is 9, since the minimum modulus is 10 - a = quotient * m + ans - b - - question = q_format.format(a, b, m) - answer = a_format.format(ans) - - return QuizEntry(question, answer) - - -def ngonal_prism(q_format: str, a_format: str) -> QuizEntry: - """Generate a question regarding vertices on n-gonal prisms.""" - n = random.randint(0, len(N_PREFIXES) - 1) - - question = q_format.format(N_PREFIXES[n]) - answer = a_format.format((n + N_PREFIX_STARTS_AT) * 2) - - return QuizEntry(question, answer) - - -def imag_sqrt(q_format: str, a_format: str) -> QuizEntry: - """Generate a negative square root question.""" - ans_coeff = random.randint(3, 10) - - question = q_format.format(ans_coeff ** 2) - answer = a_format.format(ans_coeff) - - return QuizEntry(question, answer) - - -def binary_calc(q_format: str, a_format: str) -> QuizEntry: - """Generate a binary calculation question.""" - a = random.randint(15, 20) - b = random.randint(10, a) - oper = random.choice( - ( - ("+", operator.add), - ("-", operator.sub), - ("*", operator.mul), - ) - ) - - # if the operator is multiplication, lower the values of the two operands to make it easier - if oper[0] == "*": - a -= 5 - b -= 5 - - question = q_format.format(a, oper[0], b) - answer = a_format.format(oper[1](a, b)) - - return QuizEntry(question, answer) - - -def solar_system(q_format: str, a_format: str) -> QuizEntry: - """Generate a question on the planets of the Solar System.""" - planet = random.choice(PLANETS) - - question = q_format.format(planet[0]) - answer = a_format.format(planet[1]) - - return QuizEntry(question, answer) - - -def taxonomic_rank(q_format: str, a_format: str) -> QuizEntry: - """Generate a question on taxonomic classification.""" - level = random.randint(0, len(TAXONOMIC_HIERARCHY) - 2) - - question = q_format.format(TAXONOMIC_HIERARCHY[level]) - answer = a_format.format(TAXONOMIC_HIERARCHY[level + 1]) - - return QuizEntry(question, answer) - - -def base_units_convert(q_format: str, a_format: str) -> QuizEntry: - """Generate a SI base units conversion question.""" - unit = random.choice(list(UNITS_TO_BASE_UNITS)) - - question = q_format.format( - unit + " " + UNITS_TO_BASE_UNITS[unit][0] - ) - answer = a_format.format( - UNITS_TO_BASE_UNITS[unit][1] - ) - - return QuizEntry(question, answer) - - -DYNAMIC_QUESTIONS_FORMAT_FUNCS = { - 201: linear_system, - 202: mod_arith, - 203: ngonal_prism, - 204: imag_sqrt, - 205: binary_calc, - 301: solar_system, - 302: taxonomic_rank, - 303: base_units_convert, -} - - -class TriviaQuiz(commands.Cog): - """A cog for all quiz commands.""" - - def __init__(self, bot: Bot): - self.bot = bot - - self.game_status = {} # A variable to store the game status: either running or not running. - self.game_owners = {} # A variable to store the person's ID who started the quiz game in a channel. - - self.questions = self.load_questions() - self.question_limit = 0 - - self.player_scores = {} # A variable to store all player's scores for a bot session. - self.game_player_scores = {} # A variable to store temporary game player's scores. - - self.categories = { - "general": "Test your general knowledge.", - "retro": "Questions related to retro gaming.", - "math": "General questions about mathematics ranging from grade 8 to grade 12.", - "science": "Put your understanding of science to the test!", - "cs": "A large variety of computer science questions.", - "python": "Trivia on our amazing language, Python!", - } - - @staticmethod - def load_questions() -> dict: - """Load the questions from the JSON file.""" - p = Path("bot", "resources", "evergreen", "trivia_quiz.json") - - return json.loads(p.read_text(encoding="utf-8")) - - @commands.group(name="quiz", aliases=["trivia"], invoke_without_command=True) - async def quiz_game(self, ctx: commands.Context, category: Optional[str], questions: Optional[int]) -> None: - """ - Start a quiz! - - Questions for the quiz can be selected from the following categories: - - general: Test your general knowledge. - - retro: Questions related to retro gaming. - - math: General questions about mathematics ranging from grade 8 to grade 12. - - science: Put your understanding of science to the test! - - cs: A large variety of computer science questions. - - python: Trivia on our amazing language, Python! - - (More to come!) - """ - if ctx.channel.id not in self.game_status: - self.game_status[ctx.channel.id] = False - - if ctx.channel.id not in self.game_player_scores: - self.game_player_scores[ctx.channel.id] = {} - - # Stop game if running. - if self.game_status[ctx.channel.id]: - await ctx.send( - "Game is already running... " - f"do `{self.bot.command_prefix}quiz stop`" - ) - return - - # Send embed showing available categories if inputted category is invalid. - if category is None: - category = random.choice(list(self.categories)) - - category = category.lower() - if category not in self.categories: - embed = self.category_embed() - await ctx.send(embed=embed) - return - - topic = self.questions[category] - topic_length = len(topic) - - if questions is None: - self.question_limit = DEFAULT_QUESTION_LIMIT - else: - if questions > topic_length: - await ctx.send( - embed=self.make_error_embed( - f"This category only has {topic_length} questions. " - "Please input a lower value!" - ) - ) - return - - elif questions < 1: - await ctx.send( - embed=self.make_error_embed( - "You must choose to complete at least one question. " - f"(or enter nothing for the default value of {DEFAULT_QUESTION_LIMIT + 1} questions)" - ) - ) - return - - else: - self.question_limit = questions - 1 - - # Start game if not running. - if not self.game_status[ctx.channel.id]: - self.game_owners[ctx.channel.id] = ctx.author - self.game_status[ctx.channel.id] = True - start_embed = self.make_start_embed(category) - - await ctx.send(embed=start_embed) # send an embed with the rules - await asyncio.sleep(5) - - done_question = [] - hint_no = 0 - answers = None - - while self.game_status[ctx.channel.id]: - # Exit quiz if number of questions for a round are already sent. - if len(done_question) > self.question_limit and hint_no == 0: - await ctx.send("The round has ended.") - await self.declare_winner(ctx.channel, self.game_player_scores[ctx.channel.id]) - - self.game_status[ctx.channel.id] = False - del self.game_owners[ctx.channel.id] - self.game_player_scores[ctx.channel.id] = {} - - break - - # If no hint has been sent or any time alert. Basically if hint_no = 0 means it is a new question. - if hint_no == 0: - # Select a random question which has not been used yet. - while True: - question_dict = random.choice(topic) - if question_dict["id"] not in done_question: - done_question.append(question_dict["id"]) - break - - if "dynamic_id" not in question_dict: - question = question_dict["question"] - answers = question_dict["answer"].split(", ") - - var_tol = STANDARD_VARIATION_TOLERANCE - else: - format_func = DYNAMIC_QUESTIONS_FORMAT_FUNCS[question_dict["dynamic_id"]] - - quiz_entry = format_func( - question_dict["question"], - question_dict["answer"], - ) - - question, answers = quiz_entry.question, quiz_entry.answer - answers = [answers] - - var_tol = DYNAMICALLY_GEN_VARIATION_TOLERANCE - - embed = discord.Embed( - colour=Colours.gold, - title=f"Question #{len(done_question)}", - description=question, - ) - - if img_url := question_dict.get("img_url"): - embed.set_image(url=img_url) - - await ctx.send(embed=embed) - - def check_func(variation_tolerance: int) -> Callable[[discord.Message], bool]: - def contains_correct_answer(m: discord.Message) -> bool: - return m.channel == ctx.channel and any( - fuzz.ratio(answer.lower(), m.content.lower()) > variation_tolerance - for answer in answers - ) - - return contains_correct_answer - - try: - msg = await self.bot.wait_for("message", check=check_func(var_tol), timeout=10) - except asyncio.TimeoutError: - # In case of TimeoutError and the game has been stopped, then do nothing. - if not self.game_status[ctx.channel.id]: - break - - if hint_no < 2: - hint_no += 1 - - if "hints" in question_dict: - hints = question_dict["hints"] - - await ctx.send(f"**Hint #{hint_no}\n**{hints[hint_no - 1]}") - else: - await ctx.send(f"{30 - hint_no * 10}s left!") - - # Once hint or time alerts has been sent 2 times, the hint_no value will be 3 - # If hint_no > 2, then it means that all hints/time alerts have been sent. - # Also means that the answer is not yet given and the bot sends the answer and the next question. - else: - if self.game_status[ctx.channel.id] is False: - break - - response = random.choice(WRONG_ANS_RESPONSE) - await ctx.send(response) - - await self.send_answer( - ctx.channel, - answers, - False, - question_dict, - self.question_limit - len(done_question) + 1, - ) - await asyncio.sleep(1) - - hint_no = 0 # Reset the hint counter so that on the next round, it's in the initial state - - await self.send_score(ctx.channel, self.game_player_scores[ctx.channel.id]) - await asyncio.sleep(2) - else: - if self.game_status[ctx.channel.id] is False: - break - - points = 100 - 25 * hint_no - if msg.author in self.game_player_scores[ctx.channel.id]: - self.game_player_scores[ctx.channel.id][msg.author] += points - else: - self.game_player_scores[ctx.channel.id][msg.author] = points - - # Also updating the overall scoreboard. - if msg.author in self.player_scores: - self.player_scores[msg.author] += points - else: - self.player_scores[msg.author] = points - - hint_no = 0 - - await ctx.send(f"{msg.author.mention} got the correct answer :tada: {points} points!") - - await self.send_answer( - ctx.channel, - answers, - True, - question_dict, - self.question_limit - len(done_question) + 1, - ) - await self.send_score(ctx.channel, self.game_player_scores[ctx.channel.id]) - - await asyncio.sleep(2) - - def make_start_embed(self, category: str) -> discord.Embed: - """Generate a starting/introduction embed for the quiz.""" - start_embed = discord.Embed( - colour=Colours.blue, - title="A quiz game is starting!", - description=( - f"This game consists of {self.question_limit + 1} questions.\n\n" - "**Rules: **\n" - "1. Only enclose your answer in backticks when the question tells you to.\n" - "2. If the question specifies an answer format, follow it or else it won't be accepted.\n" - "3. You have 30s per question. Points for each question reduces by 25 after 10s or after a hint.\n" - "4. No cheating and have fun!\n\n" - f"**Category**: {category}" - ), - ) - - return start_embed - - @staticmethod - def make_error_embed(desc: str) -> discord.Embed: - """Generate an error embed with the given description.""" - error_embed = discord.Embed( - colour=Colours.soft_red, - title=random.choice(NEGATIVE_REPLIES), - description=desc, - ) - - return error_embed - - @quiz_game.command(name="stop") - async def stop_quiz(self, ctx: commands.Context) -> None: - """ - Stop a quiz game if its running in the channel. - - Note: Only mods or the owner of the quiz can stop it. - """ - try: - if self.game_status[ctx.channel.id]: - # Check if the author is the game starter or a moderator. - if ctx.author == self.game_owners[ctx.channel.id] or any( - Roles.moderator == role.id for role in ctx.author.roles - ): - self.game_status[ctx.channel.id] = False - del self.game_owners[ctx.channel.id] - self.game_player_scores[ctx.channel.id] = {} - - await ctx.send("Quiz stopped.") - await self.declare_winner(ctx.channel, self.game_player_scores[ctx.channel.id]) - - else: - await ctx.send(f"{ctx.author.mention}, you are not authorised to stop this game :ghost:!") - else: - await ctx.send("No quiz running.") - except KeyError: - await ctx.send("No quiz running.") - - @quiz_game.command(name="leaderboard") - async def leaderboard(self, ctx: commands.Context) -> None: - """View everyone's score for this bot session.""" - await self.send_score(ctx.channel, self.player_scores) - - @staticmethod - async def send_score(channel: discord.TextChannel, player_data: dict) -> None: - """Send the current scores of players in the game channel.""" - if len(player_data) == 0: - await channel.send("No one has made it onto the leaderboard yet.") - return - - embed = discord.Embed( - colour=Colours.blue, - title="Score Board", - description="", - ) - - sorted_dict = sorted(player_data.items(), key=operator.itemgetter(1), reverse=True) - for item in sorted_dict: - embed.description += f"{item[0]}: {item[1]}\n" - - await channel.send(embed=embed) - - @staticmethod - async def declare_winner(channel: discord.TextChannel, player_data: dict) -> None: - """Announce the winner of the quiz in the game channel.""" - if player_data: - highest_points = max(list(player_data.values())) - no_of_winners = list(player_data.values()).count(highest_points) - - # Check if more than 1 player has highest points. - if no_of_winners > 1: - winners = [] - points_copy = list(player_data.values()).copy() - - for _ in range(no_of_winners): - index = points_copy.index(highest_points) - winners.append(list(player_data.keys())[index]) - points_copy[index] = 0 - - winners_mention = " ".join(winner.mention for winner in winners) - else: - author_index = list(player_data.values()).index(highest_points) - winner = list(player_data.keys())[author_index] - winners_mention = winner.mention - - await channel.send( - f"Congratulations {winners_mention} :tada: " - f"You have won this quiz game with a grand total of {highest_points} points!" - ) - - def category_embed(self) -> discord.Embed: - """Build an embed showing all available trivia categories.""" - embed = discord.Embed( - colour=Colours.blue, - title="The available question categories are:", - description="", - ) - - embed.set_footer(text="If a category is not chosen, a random one will be selected.") - - for cat, description in self.categories.items(): - embed.description += ( - f"**- {cat.capitalize()}**\n" - f"{description.capitalize()}\n" - ) - - return embed - - @staticmethod - async def send_answer( - channel: discord.TextChannel, - answers: list[str], - answer_is_correct: bool, - question_dict: dict, - q_left: int, - ) -> None: - """Send the correct answer of a question to the game channel.""" - info = question_dict.get("info") - - plurality = " is" if len(answers) == 1 else "s are" - - embed = discord.Embed( - color=Colours.bright_green, - title=( - ("You got it! " if answer_is_correct else "") - + f"The correct answer{plurality} **`{', '.join(answers)}`**\n" - ), - description="", - ) - - if info is not None: - embed.description += f"**Information**\n{info}\n\n" - - embed.description += ( - ("Let's move to the next question." if q_left > 0 else "") - + f"\nRemaining questions: {q_left}" - ) - await channel.send(embed=embed) - - -def setup(bot: Bot) -> None: - """Load the TriviaQuiz cog.""" - bot.add_cog(TriviaQuiz(bot)) diff --git a/bot/exts/evergreen/wikipedia.py b/bot/exts/evergreen/wikipedia.py deleted file mode 100644 index eccc1f8c..00000000 --- a/bot/exts/evergreen/wikipedia.py +++ /dev/null @@ -1,100 +0,0 @@ -import logging -import re -from datetime import datetime -from html import unescape - -from discord import Color, Embed, TextChannel -from discord.ext import commands - -from bot.bot import Bot -from bot.utils import LinePaginator -from bot.utils.exceptions import APIError - -log = logging.getLogger(__name__) - -SEARCH_API = ( - "https://en.wikipedia.org/w/api.php" -) -WIKI_PARAMS = { - "action": "query", - "list": "search", - "prop": "info", - "inprop": "url", - "utf8": "", - "format": "json", - "origin": "*", - -} -WIKI_THUMBNAIL = ( - "https://upload.wikimedia.org/wikipedia/en/thumb/8/80/Wikipedia-logo-v2.svg" - "/330px-Wikipedia-logo-v2.svg.png" -) -WIKI_SNIPPET_REGEX = r"(<!--.*?-->|<[^>]*>)" -WIKI_SEARCH_RESULT = ( - "**[{name}]({url})**\n" - "{description}\n" -) - - -class WikipediaSearch(commands.Cog): - """Get info from wikipedia.""" - - def __init__(self, bot: Bot): - self.bot = bot - - async def wiki_request(self, channel: TextChannel, search: str) -> list[str]: - """Search wikipedia search string and return formatted first 10 pages found.""" - params = WIKI_PARAMS | {"srlimit": 10, "srsearch": search} - async with self.bot.http_session.get(url=SEARCH_API, params=params) as resp: - if resp.status != 200: - log.info(f"Unexpected response `{resp.status}` while searching wikipedia for `{search}`") - raise APIError("Wikipedia API", resp.status) - - raw_data = await resp.json() - - if not raw_data.get("query"): - if error := raw_data.get("errors"): - log.error(f"There was an error while communicating with the Wikipedia API: {error}") - raise APIError("Wikipedia API", resp.status, error) - - lines = [] - if raw_data["query"]["searchinfo"]["totalhits"]: - for article in raw_data["query"]["search"]: - line = WIKI_SEARCH_RESULT.format( - name=article["title"], - description=unescape( - re.sub( - WIKI_SNIPPET_REGEX, "", article["snippet"] - ) - ), - url=f"https://en.wikipedia.org/?curid={article['pageid']}" - ) - lines.append(line) - - return lines - - @commands.cooldown(1, 10, commands.BucketType.user) - @commands.command(name="wikipedia", aliases=("wiki",)) - async def wikipedia_search_command(self, ctx: commands.Context, *, search: str) -> None: - """Sends paginated top 10 results of Wikipedia search..""" - contents = await self.wiki_request(ctx.channel, search) - - if contents: - embed = Embed( - title="Wikipedia Search Results", - colour=Color.blurple() - ) - embed.set_thumbnail(url=WIKI_THUMBNAIL) - embed.timestamp = datetime.utcnow() - await LinePaginator.paginate( - contents, ctx, embed - ) - else: - await ctx.send( - "Sorry, we could not find a wikipedia article using that search term." - ) - - -def setup(bot: Bot) -> None: - """Load the WikipediaSearch cog.""" - bot.add_cog(WikipediaSearch(bot)) diff --git a/bot/exts/evergreen/wolfram.py b/bot/exts/evergreen/wolfram.py deleted file mode 100644 index 9a26e545..00000000 --- a/bot/exts/evergreen/wolfram.py +++ /dev/null @@ -1,293 +0,0 @@ -import logging -from io import BytesIO -from typing import Callable, Optional -from urllib.parse import urlencode - -import arrow -import discord -from discord import Embed -from discord.ext import commands -from discord.ext.commands import BucketType, Cog, Context, check, group - -from bot.bot import Bot -from bot.constants import Colours, STAFF_ROLES, Wolfram -from bot.utils.pagination import ImagePaginator - -log = logging.getLogger(__name__) - -APPID = Wolfram.key -DEFAULT_OUTPUT_FORMAT = "JSON" -QUERY = "http://api.wolframalpha.com/v2/{request}" -WOLF_IMAGE = "https://www.symbols.com/gi.php?type=1&id=2886&i=1" - -MAX_PODS = 20 - -# Allows for 10 wolfram calls pr user pr day -usercd = commands.CooldownMapping.from_cooldown(Wolfram.user_limit_day, 60 * 60 * 24, BucketType.user) - -# Allows for max api requests / days in month per day for the entire guild (Temporary) -guildcd = commands.CooldownMapping.from_cooldown(Wolfram.guild_limit_day, 60 * 60 * 24, BucketType.guild) - - -async def send_embed( - ctx: Context, - message_txt: str, - colour: int = Colours.soft_red, - footer: str = None, - img_url: str = None, - f: discord.File = None -) -> None: - """Generate & send a response embed with Wolfram as the author.""" - embed = Embed(colour=colour) - embed.description = message_txt - embed.set_author( - name="Wolfram Alpha", - icon_url=WOLF_IMAGE, - url="https://www.wolframalpha.com/" - ) - if footer: - embed.set_footer(text=footer) - - if img_url: - embed.set_image(url=img_url) - - await ctx.send(embed=embed, file=f) - - -def custom_cooldown(*ignore: int) -> Callable: - """ - Implement per-user and per-guild cooldowns for requests to the Wolfram API. - - A list of roles may be provided to ignore the per-user cooldown. - """ - async def predicate(ctx: Context) -> bool: - if ctx.invoked_with == "help": - # if the invoked command is help we don't want to increase the ratelimits since it's not actually - # invoking the command/making a request, so instead just check if the user/guild are on cooldown. - guild_cooldown = not guildcd.get_bucket(ctx.message).get_tokens() == 0 # if guild is on cooldown - # check the message is in a guild, and check user bucket if user is not ignored - if ctx.guild and not any(r.id in ignore for r in ctx.author.roles): - return guild_cooldown and not usercd.get_bucket(ctx.message).get_tokens() == 0 - return guild_cooldown - - user_bucket = usercd.get_bucket(ctx.message) - - if all(role.id not in ignore for role in ctx.author.roles): - user_rate = user_bucket.update_rate_limit() - - if user_rate: - # Can't use api; cause: member limit - cooldown = arrow.utcnow().shift(seconds=int(user_rate)).humanize(only_distance=True) - message = ( - "You've used up your limit for Wolfram|Alpha requests.\n" - f"Cooldown: {cooldown}" - ) - await send_embed(ctx, message) - return False - - guild_bucket = guildcd.get_bucket(ctx.message) - guild_rate = guild_bucket.update_rate_limit() - - # Repr has a token attribute to read requests left - log.debug(guild_bucket) - - if guild_rate: - # Can't use api; cause: guild limit - message = ( - "The max limit of requests for the server has been reached for today.\n" - f"Cooldown: {int(guild_rate)}" - ) - await send_embed(ctx, message) - return False - - return True - - return check(predicate) - - -async def get_pod_pages(ctx: Context, bot: Bot, query: str) -> Optional[list[tuple[str, str]]]: - """Get the Wolfram API pod pages for the provided query.""" - async with ctx.typing(): - params = { - "input": query, - "appid": APPID, - "output": DEFAULT_OUTPUT_FORMAT, - "format": "image,plaintext", - "location": "the moon", - "latlong": "0.0,0.0", - "ip": "1.1.1.1" - } - request_url = QUERY.format(request="query") - - async with bot.http_session.get(url=request_url, params=params) as response: - json = await response.json(content_type="text/plain") - - result = json["queryresult"] - log_full_url = f"{request_url}?{urlencode(params)}" - if result["error"]: - # API key not set up correctly - if result["error"]["msg"] == "Invalid appid": - message = "Wolfram API key is invalid or missing." - log.warning( - "API key seems to be missing, or invalid when " - f"processing a wolfram request: {log_full_url}, Response: {json}" - ) - await send_embed(ctx, message) - return None - - message = "Something went wrong internally with your request, please notify staff!" - log.warning(f"Something went wrong getting a response from wolfram: {log_full_url}, Response: {json}") - await send_embed(ctx, message) - return None - - if not result["success"]: - message = f"I couldn't find anything for {query}." - await send_embed(ctx, message) - return None - - if not result["numpods"]: - message = "Could not find any results." - await send_embed(ctx, message) - return None - - pods = result["pods"] - pages = [] - for pod in pods[:MAX_PODS]: - subs = pod.get("subpods") - - for sub in subs: - title = sub.get("title") or sub.get("plaintext") or sub.get("id", "") - img = sub["img"]["src"] - pages.append((title, img)) - return pages - - -class Wolfram(Cog): - """Commands for interacting with the Wolfram|Alpha API.""" - - def __init__(self, bot: Bot): - self.bot = bot - - @group(name="wolfram", aliases=("wolf", "wa"), invoke_without_command=True) - @custom_cooldown(*STAFF_ROLES) - async def wolfram_command(self, ctx: Context, *, query: str) -> None: - """Requests all answers on a single image, sends an image of all related pods.""" - params = { - "i": query, - "appid": APPID, - "location": "the moon", - "latlong": "0.0,0.0", - "ip": "1.1.1.1" - } - request_url = QUERY.format(request="simple") - - # Give feedback that the bot is working. - async with ctx.typing(): - async with self.bot.http_session.get(url=request_url, params=params) as response: - status = response.status - image_bytes = await response.read() - - f = discord.File(BytesIO(image_bytes), filename="image.png") - image_url = "attachment://image.png" - - if status == 501: - message = "Failed to get response." - footer = "" - color = Colours.soft_red - elif status == 400: - message = "No input found." - footer = "" - color = Colours.soft_red - elif status == 403: - message = "Wolfram API key is invalid or missing." - footer = "" - color = Colours.soft_red - else: - message = "" - footer = "View original for a bigger picture." - color = Colours.soft_orange - - # Sends a "blank" embed if no request is received, unsure how to fix - await send_embed(ctx, message, color, footer=footer, img_url=image_url, f=f) - - @wolfram_command.command(name="page", aliases=("pa", "p")) - @custom_cooldown(*STAFF_ROLES) - async def wolfram_page_command(self, ctx: Context, *, query: str) -> None: - """ - Requests a drawn image of given query. - - Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc. - """ - pages = await get_pod_pages(ctx, self.bot, query) - - if not pages: - return - - embed = Embed() - embed.set_author( - name="Wolfram Alpha", - icon_url=WOLF_IMAGE, - url="https://www.wolframalpha.com/" - ) - embed.colour = Colours.soft_orange - - await ImagePaginator.paginate(pages, ctx, embed) - - @wolfram_command.command(name="cut", aliases=("c",)) - @custom_cooldown(*STAFF_ROLES) - async def wolfram_cut_command(self, ctx: Context, *, query: str) -> None: - """ - Requests a drawn image of given query. - - Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc. - """ - pages = await get_pod_pages(ctx, self.bot, query) - - if not pages: - return - - if len(pages) >= 2: - page = pages[1] - else: - page = pages[0] - - await send_embed(ctx, page[0], colour=Colours.soft_orange, img_url=page[1]) - - @wolfram_command.command(name="short", aliases=("sh", "s")) - @custom_cooldown(*STAFF_ROLES) - async def wolfram_short_command(self, ctx: Context, *, query: str) -> None: - """Requests an answer to a simple question.""" - params = { - "i": query, - "appid": APPID, - "location": "the moon", - "latlong": "0.0,0.0", - "ip": "1.1.1.1" - } - request_url = QUERY.format(request="result") - - # Give feedback that the bot is working. - async with ctx.typing(): - async with self.bot.http_session.get(url=request_url, params=params) as response: - status = response.status - response_text = await response.text() - - if status == 501: - message = "Failed to get response." - color = Colours.soft_red - elif status == 400: - message = "No input found." - color = Colours.soft_red - elif response_text == "Error 1: Invalid appid.": - message = "Wolfram API key is invalid or missing." - color = Colours.soft_red - else: - message = response_text - color = Colours.soft_orange - - await send_embed(ctx, message, color) - - -def setup(bot: Bot) -> None: - """Load the Wolfram cog.""" - bot.add_cog(Wolfram(bot)) diff --git a/bot/exts/evergreen/wonder_twins.py b/bot/exts/evergreen/wonder_twins.py deleted file mode 100644 index 40edf785..00000000 --- a/bot/exts/evergreen/wonder_twins.py +++ /dev/null @@ -1,49 +0,0 @@ -import random -from pathlib import Path - -import yaml -from discord.ext.commands import Cog, Context, command - -from bot.bot import Bot - - -class WonderTwins(Cog): - """Cog for a Wonder Twins inspired command.""" - - def __init__(self): - with open(Path.cwd() / "bot" / "resources" / "evergreen" / "wonder_twins.yaml", "r", encoding="utf-8") as f: - info = yaml.load(f, Loader=yaml.FullLoader) - self.water_types = info["water_types"] - self.objects = info["objects"] - self.adjectives = info["adjectives"] - - @staticmethod - def append_onto(phrase: str, insert_word: str) -> str: - """Appends one word onto the end of another phrase in order to format with the proper determiner.""" - if insert_word.endswith("s"): - phrase = phrase.split() - del phrase[0] - phrase = " ".join(phrase) - - insert_word = insert_word.split()[-1] - return " ".join([phrase, insert_word]) - - def format_phrase(self) -> str: - """Creates a transformation phrase from available words.""" - adjective = random.choice((None, random.choice(self.adjectives))) - object_name = random.choice(self.objects) - water_type = random.choice(self.water_types) - - if adjective: - object_name = self.append_onto(adjective, object_name) - return f"{object_name} of {water_type}" - - @command(name="formof", aliases=("wondertwins", "wondertwin", "fo")) - async def form_of(self, ctx: Context) -> None: - """Command to send a Wonder Twins inspired phrase to the user invoking the command.""" - await ctx.send(f"Form of {self.format_phrase()}!") - - -def setup(bot: Bot) -> None: - """Load the WonderTwins cog.""" - bot.add_cog(WonderTwins()) diff --git a/bot/exts/evergreen/xkcd.py b/bot/exts/evergreen/xkcd.py deleted file mode 100644 index b56c53d9..00000000 --- a/bot/exts/evergreen/xkcd.py +++ /dev/null @@ -1,91 +0,0 @@ -import logging -import re -from random import randint -from typing import Optional, Union - -from discord import Embed -from discord.ext import tasks -from discord.ext.commands import Cog, Context, command - -from bot.bot import Bot -from bot.constants import Colours - -log = logging.getLogger(__name__) - -COMIC_FORMAT = re.compile(r"latest|[0-9]+") -BASE_URL = "https://xkcd.com" - - -class XKCD(Cog): - """Retrieving XKCD comics.""" - - def __init__(self, bot: Bot): - self.bot = bot - self.latest_comic_info: dict[str, Union[str, int]] = {} - self.get_latest_comic_info.start() - - def cog_unload(self) -> None: - """Cancels refreshing of the task for refreshing the most recent comic info.""" - self.get_latest_comic_info.cancel() - - @tasks.loop(minutes=30) - async def get_latest_comic_info(self) -> None: - """Refreshes latest comic's information ever 30 minutes. Also used for finding a random comic.""" - async with self.bot.http_session.get(f"{BASE_URL}/info.0.json") as resp: - if resp.status == 200: - self.latest_comic_info = await resp.json() - else: - log.debug(f"Failed to get latest XKCD comic information. Status code {resp.status}") - - @command(name="xkcd") - async def fetch_xkcd_comics(self, ctx: Context, comic: Optional[str]) -> None: - """ - Getting an xkcd comic's information along with the image. - - To get a random comic, don't type any number as an argument. To get the latest, type 'latest'. - """ - embed = Embed(title=f"XKCD comic '{comic}'") - - embed.colour = Colours.soft_red - - if comic and (comic := re.match(COMIC_FORMAT, comic)) is None: - embed.description = "Comic parameter should either be an integer or 'latest'." - await ctx.send(embed=embed) - return - - comic = randint(1, self.latest_comic_info["num"]) if comic is None else comic.group(0) - - if comic == "latest": - info = self.latest_comic_info - else: - async with self.bot.http_session.get(f"{BASE_URL}/{comic}/info.0.json") as resp: - if resp.status == 200: - info = await resp.json() - else: - embed.title = f"XKCD comic #{comic}" - embed.description = f"{resp.status}: Could not retrieve xkcd comic #{comic}." - log.debug(f"Retrieving xkcd comic #{comic} failed with status code {resp.status}.") - await ctx.send(embed=embed) - return - - embed.title = f"XKCD comic #{info['num']}" - embed.description = info["alt"] - embed.url = f"{BASE_URL}/{info['num']}" - - if info["img"][-3:] in ("jpg", "png", "gif"): - embed.set_image(url=info["img"]) - date = f"{info['year']}/{info['month']}/{info['day']}" - embed.set_footer(text=f"{date} - #{info['num']}, \'{info['safe_title']}\'") - embed.colour = Colours.soft_green - else: - embed.description = ( - "The selected comic is interactive, and cannot be displayed within an embed.\n" - f"Comic can be viewed [here](https://xkcd.com/{info['num']})." - ) - - await ctx.send(embed=embed) - - -def setup(bot: Bot) -> None: - """Load the XKCD cog.""" - bot.add_cog(XKCD(bot)) |