diff options
Diffstat (limited to 'bot/exts/fun')
| -rw-r--r-- | bot/exts/fun/__init__.py | 0 | ||||
| -rw-r--r-- | bot/exts/fun/battleship.py | 448 | ||||
| -rw-r--r-- | bot/exts/fun/catify.py | 86 | ||||
| -rw-r--r-- | bot/exts/fun/coinflip.py | 53 | ||||
| -rw-r--r-- | bot/exts/fun/connect_four.py | 452 | ||||
| -rw-r--r-- | bot/exts/fun/duck_game.py | 336 | ||||
| -rw-r--r-- | bot/exts/fun/fun.py | 250 | ||||
| -rw-r--r-- | bot/exts/fun/game.py | 485 | ||||
| -rw-r--r-- | bot/exts/fun/magic_8ball.py | 30 | ||||
| -rw-r--r-- | bot/exts/fun/minesweeper.py | 270 | ||||
| -rw-r--r-- | bot/exts/fun/movie.py | 205 | ||||
| -rw-r--r-- | bot/exts/fun/recommend_game.py | 51 | ||||
| -rw-r--r-- | bot/exts/fun/rps.py | 57 | ||||
| -rw-r--r-- | bot/exts/fun/snakes/__init__.py | 11 | ||||
| -rw-r--r-- | bot/exts/fun/snakes/_converter.py | 82 | ||||
| -rw-r--r-- | bot/exts/fun/snakes/_snakes_cog.py | 1151 | ||||
| -rw-r--r-- | bot/exts/fun/snakes/_utils.py | 721 | ||||
| -rw-r--r-- | bot/exts/fun/space.py | 236 | ||||
| -rw-r--r-- | bot/exts/fun/speedrun.py | 26 | ||||
| -rw-r--r-- | bot/exts/fun/status_codes.py | 87 | ||||
| -rw-r--r-- | bot/exts/fun/tic_tac_toe.py | 335 | ||||
| -rw-r--r-- | bot/exts/fun/trivia_quiz.py | 593 | ||||
| -rw-r--r-- | bot/exts/fun/wonder_twins.py | 49 | ||||
| -rw-r--r-- | bot/exts/fun/xkcd.py | 91 | 
24 files changed, 6105 insertions, 0 deletions
| diff --git a/bot/exts/fun/__init__.py b/bot/exts/fun/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/bot/exts/fun/__init__.py diff --git a/bot/exts/fun/battleship.py b/bot/exts/fun/battleship.py new file mode 100644 index 00000000..f4351954 --- /dev/null +++ b/bot/exts/fun/battleship.py @@ -0,0 +1,448 @@ +import asyncio +import logging +import random +import re +from dataclasses import dataclass +from functools import partial +from typing import Optional + +import discord +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Colours + +log = logging.getLogger(__name__) + + +@dataclass +class Square: +    """Each square on the battleship grid - if they contain a boat and if they've been aimed at.""" + +    boat: Optional[str] +    aimed: bool + + +Grid = list[list[Square]] +EmojiSet = dict[tuple[bool, bool], str] + + +@dataclass +class Player: +    """Each player in the game - their messages for the boards and their current grid.""" + +    user: Optional[discord.Member] +    board: Optional[discord.Message] +    opponent_board: discord.Message +    grid: Grid + + +# The name of the ship and its size +SHIPS = { +    "Carrier": 5, +    "Battleship": 4, +    "Cruiser": 3, +    "Submarine": 3, +    "Destroyer": 2, +} + + +# For these two variables, the first boolean is whether the square is a ship (True) or not (False). +# The second boolean is whether the player has aimed for that square (True) or not (False) + +# This is for the player's own board which shows the location of their own ships. +SHIP_EMOJIS = { +    (True, True): ":fire:", +    (True, False): ":ship:", +    (False, True): ":anger:", +    (False, False): ":ocean:", +} + +# This is for the opposing player's board which only shows aimed locations. +HIDDEN_EMOJIS = { +    (True, True): ":red_circle:", +    (True, False): ":black_circle:", +    (False, True): ":white_circle:", +    (False, False): ":black_circle:", +} + +# For the top row of the board +LETTERS = ( +    ":stop_button::regional_indicator_a::regional_indicator_b::regional_indicator_c::regional_indicator_d:" +    ":regional_indicator_e::regional_indicator_f::regional_indicator_g::regional_indicator_h:" +    ":regional_indicator_i::regional_indicator_j:" +) + +# For the first column of the board +NUMBERS = [ +    ":one:", +    ":two:", +    ":three:", +    ":four:", +    ":five:", +    ":six:", +    ":seven:", +    ":eight:", +    ":nine:", +    ":keycap_ten:", +] + +CROSS_EMOJI = "\u274e" +HAND_RAISED_EMOJI = "\U0001f64b" + + +class Game: +    """A Battleship Game.""" + +    def __init__( +        self, +        bot: Bot, +        channel: discord.TextChannel, +        player1: discord.Member, +        player2: discord.Member +    ): + +        self.bot = bot +        self.public_channel = channel + +        self.p1 = Player(player1, None, None, self.generate_grid()) +        self.p2 = Player(player2, None, None, self.generate_grid()) + +        self.gameover: bool = False + +        self.turn: Optional[discord.Member] = None +        self.next: Optional[discord.Member] = None + +        self.match: Optional[re.Match] = None +        self.surrender: bool = False + +        self.setup_grids() + +    @staticmethod +    def generate_grid() -> Grid: +        """Generates a grid by instantiating the Squares.""" +        return [[Square(None, False) for _ in range(10)] for _ in range(10)] + +    @staticmethod +    def format_grid(player: Player, emojiset: EmojiSet) -> str: +        """ +        Gets and formats the grid as a list into a string to be output to the DM. + +        Also adds the Letter and Number indexes. +        """ +        grid = [ +            [emojiset[bool(square.boat), square.aimed] for square in row] +            for row in player.grid +        ] + +        rows = ["".join([number] + row) for number, row in zip(NUMBERS, grid)] +        return "\n".join([LETTERS] + rows) + +    @staticmethod +    def get_square(grid: Grid, square: str) -> Square: +        """Grabs a square from a grid with an inputted key.""" +        index = ord(square[0].upper()) - ord("A") +        number = int(square[1:]) + +        return grid[number-1][index]  # -1 since lists are indexed from 0 + +    async def game_over( +        self, +        *, +        winner: discord.Member, +        loser: discord.Member +    ) -> None: +        """Removes games from list of current games and announces to public chat.""" +        await self.public_channel.send(f"Game Over! {winner.mention} won against {loser.mention}") + +        for player in (self.p1, self.p2): +            grid = self.format_grid(player, SHIP_EMOJIS) +            await self.public_channel.send(f"{player.user}'s Board:\n{grid}") + +    @staticmethod +    def check_sink(grid: Grid, boat: str) -> bool: +        """Checks if all squares containing a given boat have sunk.""" +        return all(square.aimed for row in grid for square in row if square.boat == boat) + +    @staticmethod +    def check_gameover(grid: Grid) -> bool: +        """Checks if all boats have been sunk.""" +        return all(square.aimed for row in grid for square in row if square.boat) + +    def setup_grids(self) -> None: +        """Places the boats on the grids to initialise the game.""" +        for player in (self.p1, self.p2): +            for name, size in SHIPS.items(): +                while True:  # Repeats if about to overwrite another boat +                    ship_collision = False +                    coords = [] + +                    coord1 = random.randint(0, 9) +                    coord2 = random.randint(0, 10 - size) + +                    if random.choice((True, False)):  # Vertical or Horizontal +                        x, y = coord1, coord2 +                        xincr, yincr = 0, 1 +                    else: +                        x, y = coord2, coord1 +                        xincr, yincr = 1, 0 + +                    for i in range(size): +                        new_x = x + (xincr * i) +                        new_y = y + (yincr * i) +                        if player.grid[new_x][new_y].boat:  # Check if there's already a boat +                            ship_collision = True +                            break +                        coords.append((new_x, new_y)) +                    if not ship_collision:  # If not overwriting any other boat spaces, break loop +                        break + +                for x, y in coords: +                    player.grid[x][y].boat = name + +    async def print_grids(self) -> None: +        """Prints grids to the DM channels.""" +        # Convert squares into Emoji + +        boards = [ +            self.format_grid(player, emojiset) +            for emojiset in (HIDDEN_EMOJIS, SHIP_EMOJIS) +            for player in (self.p1, self.p2) +        ] + +        locations = ( +            (self.p2, "opponent_board"), (self.p1, "opponent_board"), +            (self.p1, "board"), (self.p2, "board") +        ) + +        for board, location in zip(boards, locations): +            player, attr = location +            if getattr(player, attr): +                await getattr(player, attr).edit(content=board) +            else: +                setattr(player, attr, await player.user.send(board)) + +    def predicate(self, message: discord.Message) -> bool: +        """Predicate checking the message typed for each turn.""" +        if message.author == self.turn.user and message.channel == self.turn.user.dm_channel: +            if message.content.lower() == "surrender": +                self.surrender = True +                return True +            self.match = re.fullmatch("([A-J]|[a-j]) ?((10)|[1-9])", message.content.strip()) +            if not self.match: +                self.bot.loop.create_task(message.add_reaction(CROSS_EMOJI)) +            return bool(self.match) + +    async def take_turn(self) -> Optional[Square]: +        """Lets the player who's turn it is choose a square.""" +        square = None +        turn_message = await self.turn.user.send( +            "It's your turn! Type the square you want to fire at. Format it like this: A1\n" +            "Type `surrender` to give up." +        ) +        await self.next.user.send("Their turn", delete_after=3.0) +        while True: +            try: +                await self.bot.wait_for("message", check=self.predicate, timeout=60.0) +            except asyncio.TimeoutError: +                await self.turn.user.send("You took too long. Game over!") +                await self.next.user.send(f"{self.turn.user} took too long. Game over!") +                await self.public_channel.send( +                    f"Game over! {self.turn.user.mention} timed out so {self.next.user.mention} wins!" +                ) +                self.gameover = True +                break +            else: +                if self.surrender: +                    await self.next.user.send(f"{self.turn.user} surrendered. Game over!") +                    await self.public_channel.send( +                        f"Game over! {self.turn.user.mention} surrendered to {self.next.user.mention}!" +                    ) +                    self.gameover = True +                    break +                square = self.get_square(self.next.grid, self.match.string) +                if square.aimed: +                    await self.turn.user.send("You've already aimed at this square!", delete_after=3.0) +                else: +                    break +        await turn_message.delete() +        return square + +    async def hit(self, square: Square, alert_messages: list[discord.Message]) -> None: +        """Occurs when a player successfully aims for a ship.""" +        await self.turn.user.send("Hit!", delete_after=3.0) +        alert_messages.append(await self.next.user.send("Hit!")) +        if self.check_sink(self.next.grid, square.boat): +            await self.turn.user.send(f"You've sunk their {square.boat} ship!", delete_after=3.0) +            alert_messages.append(await self.next.user.send(f"Oh no! Your {square.boat} ship sunk!")) +            if self.check_gameover(self.next.grid): +                await self.turn.user.send("You win!") +                await self.next.user.send("You lose!") +                self.gameover = True +                await self.game_over(winner=self.turn.user, loser=self.next.user) + +    async def start_game(self) -> None: +        """Begins the game.""" +        await self.p1.user.send(f"You're playing battleship with {self.p2.user}.") +        await self.p2.user.send(f"You're playing battleship with {self.p1.user}.") + +        alert_messages = [] + +        self.turn = self.p1 +        self.next = self.p2 + +        while True: +            await self.print_grids() + +            if self.gameover: +                return + +            square = await self.take_turn() +            if not square: +                return +            square.aimed = True + +            for message in alert_messages: +                await message.delete() + +            alert_messages = [] +            alert_messages.append(await self.next.user.send(f"{self.turn.user} aimed at {self.match.string}!")) + +            if square.boat: +                await self.hit(square, alert_messages) +                if self.gameover: +                    return +            else: +                await self.turn.user.send("Miss!", delete_after=3.0) +                alert_messages.append(await self.next.user.send("Miss!")) + +            self.turn, self.next = self.next, self.turn + + +class Battleship(commands.Cog): +    """Play the classic game Battleship!""" + +    def __init__(self, bot: Bot): +        self.bot = bot +        self.games: list[Game] = [] +        self.waiting: list[discord.Member] = [] + +    def predicate( +        self, +        ctx: commands.Context, +        announcement: discord.Message, +        reaction: discord.Reaction, +        user: discord.Member +    ) -> bool: +        """Predicate checking the criteria for the announcement message.""" +        if self.already_playing(ctx.author):  # If they've joined a game since requesting a player 2 +            return True  # Is dealt with later on +        if ( +            user.id not in (ctx.me.id, ctx.author.id) +            and str(reaction.emoji) == HAND_RAISED_EMOJI +            and reaction.message.id == announcement.id +        ): +            if self.already_playing(user): +                self.bot.loop.create_task(ctx.send(f"{user.mention} You're already playing a game!")) +                self.bot.loop.create_task(announcement.remove_reaction(reaction, user)) +                return False + +            if user in self.waiting: +                self.bot.loop.create_task(ctx.send( +                    f"{user.mention} Please cancel your game first before joining another one." +                )) +                self.bot.loop.create_task(announcement.remove_reaction(reaction, user)) +                return False + +            return True + +        if ( +            user.id == ctx.author.id +            and str(reaction.emoji) == CROSS_EMOJI +            and reaction.message.id == announcement.id +        ): +            return True +        return False + +    def already_playing(self, player: discord.Member) -> bool: +        """Check if someone is already in a game.""" +        return any(player in (game.p1.user, game.p2.user) for game in self.games) + +    @commands.group(invoke_without_command=True) +    @commands.guild_only() +    async def battleship(self, ctx: commands.Context) -> None: +        """ +        Play a game of Battleship with someone else! + +        This will set up a message waiting for someone else to react and play along. +        The game takes place entirely in DMs. +        Make sure you have your DMs open so that the bot can message you. +        """ +        if self.already_playing(ctx.author): +            await ctx.send("You're already playing a game!") +            return + +        if ctx.author in self.waiting: +            await ctx.send("You've already sent out a request for a player 2.") +            return + +        announcement = await ctx.send( +            "**Battleship**: A new game is about to start!\n" +            f"Press {HAND_RAISED_EMOJI} to play against {ctx.author.mention}!\n" +            f"(Cancel the game with {CROSS_EMOJI}.)" +        ) +        self.waiting.append(ctx.author) +        await announcement.add_reaction(HAND_RAISED_EMOJI) +        await announcement.add_reaction(CROSS_EMOJI) + +        try: +            reaction, user = await self.bot.wait_for( +                "reaction_add", +                check=partial(self.predicate, ctx, announcement), +                timeout=60.0 +            ) +        except asyncio.TimeoutError: +            self.waiting.remove(ctx.author) +            await announcement.delete() +            await ctx.send(f"{ctx.author.mention} Seems like there's no one here to play...") +            return + +        if str(reaction.emoji) == CROSS_EMOJI: +            self.waiting.remove(ctx.author) +            await announcement.delete() +            await ctx.send(f"{ctx.author.mention} Game cancelled.") +            return + +        await announcement.delete() +        self.waiting.remove(ctx.author) +        if self.already_playing(ctx.author): +            return +        game = Game(self.bot, ctx.channel, ctx.author, user) +        self.games.append(game) +        try: +            await game.start_game() +            self.games.remove(game) +        except discord.Forbidden: +            await ctx.send( +                f"{ctx.author.mention} {user.mention} " +                "Game failed. This is likely due to you not having your DMs open. Check and try again." +            ) +            self.games.remove(game) +        except Exception: +            # End the game in the event of an unforseen error so the players aren't stuck in a game +            await ctx.send(f"{ctx.author.mention} {user.mention} An error occurred. Game failed.") +            self.games.remove(game) +            raise + +    @battleship.command(name="ships", aliases=("boats",)) +    async def battleship_ships(self, ctx: commands.Context) -> None: +        """Lists the ships that are found on the battleship grid.""" +        embed = discord.Embed(colour=Colours.blue) +        embed.add_field(name="Name", value="\n".join(SHIPS)) +        embed.add_field(name="Size", value="\n".join(str(size) for size in SHIPS.values())) +        await ctx.send(embed=embed) + + +def setup(bot: Bot) -> None: +    """Load the Battleship Cog.""" +    bot.add_cog(Battleship(bot)) diff --git a/bot/exts/fun/catify.py b/bot/exts/fun/catify.py new file mode 100644 index 00000000..32dfae09 --- /dev/null +++ b/bot/exts/fun/catify.py @@ -0,0 +1,86 @@ +import random +from contextlib import suppress +from typing import Optional + +from discord import AllowedMentions, Embed, Forbidden +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Cats, Colours, NEGATIVE_REPLIES +from bot.utils import helpers + + +class Catify(commands.Cog): +    """Cog for the catify command.""" + +    @commands.command(aliases=("ᓚᘏᗢify", "ᓚᘏᗢ")) +    @commands.cooldown(1, 5, commands.BucketType.user) +    async def catify(self, ctx: commands.Context, *, text: Optional[str]) -> None: +        """ +        Convert the provided text into a cat themed sentence by interspercing cats throughout text. + +        If no text is given then the users nickname is edited. +        """ +        if not text: +            display_name = ctx.author.display_name + +            if len(display_name) > 26: +                embed = Embed( +                    title=random.choice(NEGATIVE_REPLIES), +                    description=( +                        "Your display name is too long to be catified! " +                        "Please change it to be under 26 characters." +                    ), +                    color=Colours.soft_red +                ) +                await ctx.send(embed=embed) +                return + +            else: +                display_name += f" | {random.choice(Cats.cats)}" + +                await ctx.send(f"Your catified nickname is: `{display_name}`", allowed_mentions=AllowedMentions.none()) + +                with suppress(Forbidden): +                    await ctx.author.edit(nick=display_name) +        else: +            if len(text) >= 1500: +                embed = Embed( +                    title=random.choice(NEGATIVE_REPLIES), +                    description="Submitted text was too large! Please submit something under 1500 characters.", +                    color=Colours.soft_red +                ) +                await ctx.send(embed=embed) +                return + +            string_list = text.split() +            for index, name in enumerate(string_list): +                name = name.lower() +                if "cat" in name: +                    if random.randint(0, 5) == 5: +                        string_list[index] = name.replace("cat", f"**{random.choice(Cats.cats)}**") +                    else: +                        string_list[index] = name.replace("cat", random.choice(Cats.cats)) +                for element in Cats.cats: +                    if element in name: +                        string_list[index] = name.replace(element, "cat") + +            string_len = len(string_list) // 3 or len(string_list) + +            for _ in range(random.randint(1, string_len)): +                # insert cat at random index +                if random.randint(0, 5) == 5: +                    string_list.insert(random.randint(0, len(string_list)), f"**{random.choice(Cats.cats)}**") +                else: +                    string_list.insert(random.randint(0, len(string_list)), random.choice(Cats.cats)) + +            text = helpers.suppress_links(" ".join(string_list)) +            await ctx.send( +                f">>> {text}", +                allowed_mentions=AllowedMentions.none() +            ) + + +def setup(bot: Bot) -> None: +    """Loads the catify cog.""" +    bot.add_cog(Catify()) diff --git a/bot/exts/fun/coinflip.py b/bot/exts/fun/coinflip.py new file mode 100644 index 00000000..804306bd --- /dev/null +++ b/bot/exts/fun/coinflip.py @@ -0,0 +1,53 @@ +import random + +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Emojis + + +class CoinSide(commands.Converter): +    """Class used to convert the `side` parameter of coinflip command.""" + +    HEADS = ("h", "head", "heads") +    TAILS = ("t", "tail", "tails") + +    async def convert(self, ctx: commands.Context, side: str) -> str: +        """Converts the provided `side` into the corresponding string.""" +        side = side.lower() +        if side in self.HEADS: +            return "heads" + +        if side in self.TAILS: +            return "tails" + +        raise commands.BadArgument(f"{side!r} is not a valid coin side.") + + +class CoinFlip(commands.Cog): +    """Cog for the CoinFlip command.""" + +    @commands.command(name="coinflip", aliases=("flip", "coin", "cf")) +    async def coinflip_command(self, ctx: commands.Context, side: CoinSide = None) -> None: +        """ +        Flips a coin. + +        If `side` is provided will state whether you guessed the side correctly. +        """ +        flipped_side = random.choice(["heads", "tails"]) + +        message = f"{ctx.author.mention} flipped **{flipped_side}**. " +        if not side: +            await ctx.send(message) +            return + +        if side == flipped_side: +            message += f"You guessed correctly! {Emojis.lemon_hyperpleased}" +        else: +            message += f"You guessed incorrectly. {Emojis.lemon_pensive}" +        await ctx.send(message) + + +def setup(bot: Bot) -> None: +    """Loads the coinflip cog.""" +    bot.add_cog(CoinFlip()) diff --git a/bot/exts/fun/connect_four.py b/bot/exts/fun/connect_four.py new file mode 100644 index 00000000..647bb2b7 --- /dev/null +++ b/bot/exts/fun/connect_four.py @@ -0,0 +1,452 @@ +import asyncio +import random +from functools import partial +from typing import Optional, Union + +import discord +import emojis +from discord.ext import commands +from discord.ext.commands import guild_only + +from bot.bot import Bot +from bot.constants import Emojis + +NUMBERS = list(Emojis.number_emojis.values()) +CROSS_EMOJI = Emojis.incident_unactioned + +Coordinate = Optional[tuple[int, int]] +EMOJI_CHECK = Union[discord.Emoji, str] + + +class Game: +    """A Connect 4 Game.""" + +    def __init__( +        self, +        bot: Bot, +        channel: discord.TextChannel, +        player1: discord.Member, +        player2: Optional[discord.Member], +        tokens: list[str], +        size: int = 7 +    ): +        self.bot = bot +        self.channel = channel +        self.player1 = player1 +        self.player2 = player2 or AI(self.bot, game=self) +        self.tokens = tokens + +        self.grid = self.generate_board(size) +        self.grid_size = size + +        self.unicode_numbers = NUMBERS[:self.grid_size] + +        self.message = None + +        self.player_active = None +        self.player_inactive = None + +    @staticmethod +    def generate_board(size: int) -> list[list[int]]: +        """Generate the connect 4 board.""" +        return [[0 for _ in range(size)] for _ in range(size)] + +    async def print_grid(self) -> None: +        """Formats and outputs the Connect Four grid to the channel.""" +        title = ( +            f"Connect 4: {self.player1.display_name}" +            f" VS {self.bot.user.display_name if isinstance(self.player2, AI) else self.player2.display_name}" +        ) + +        rows = [" ".join(self.tokens[s] for s in row) for row in self.grid] +        first_row = " ".join(x for x in NUMBERS[:self.grid_size]) +        formatted_grid = "\n".join([first_row] + rows) +        embed = discord.Embed(title=title, description=formatted_grid) + +        if self.message: +            await self.message.edit(embed=embed) +        else: +            self.message = await self.channel.send(content="Loading...") +            for emoji in self.unicode_numbers: +                await self.message.add_reaction(emoji) +            await self.message.add_reaction(CROSS_EMOJI) +            await self.message.edit(content=None, embed=embed) + +    async def game_over(self, action: str, player1: discord.user, player2: discord.user) -> None: +        """Announces to public chat.""" +        if action == "win": +            await self.channel.send(f"Game Over! {player1.mention} won against {player2.mention}") +        elif action == "draw": +            await self.channel.send(f"Game Over! {player1.mention} {player2.mention} It's A Draw :tada:") +        elif action == "quit": +            await self.channel.send(f"{self.player1.mention} surrendered. Game over!") +        await self.print_grid() + +    async def start_game(self) -> None: +        """Begins the game.""" +        self.player_active, self.player_inactive = self.player1, self.player2 + +        while True: +            await self.print_grid() + +            if isinstance(self.player_active, AI): +                coords = self.player_active.play() +                if not coords: +                    await self.game_over( +                        "draw", +                        self.bot.user if isinstance(self.player_active, AI) else self.player_active, +                        self.bot.user if isinstance(self.player_inactive, AI) else self.player_inactive, +                    ) +            else: +                coords = await self.player_turn() + +            if not coords: +                return + +            if self.check_win(coords, 1 if self.player_active == self.player1 else 2): +                await self.game_over( +                    "win", +                    self.bot.user if isinstance(self.player_active, AI) else self.player_active, +                    self.bot.user if isinstance(self.player_inactive, AI) else self.player_inactive, +                ) +                return + +            self.player_active, self.player_inactive = self.player_inactive, self.player_active + +    def predicate(self, reaction: discord.Reaction, user: discord.Member) -> bool: +        """The predicate to check for the player's reaction.""" +        return ( +            reaction.message.id == self.message.id +            and user.id == self.player_active.id +            and str(reaction.emoji) in (*self.unicode_numbers, CROSS_EMOJI) +        ) + +    async def player_turn(self) -> Coordinate: +        """Initiate the player's turn.""" +        message = await self.channel.send( +            f"{self.player_active.mention}, it's your turn! React with the column you want to place your token in." +        ) +        player_num = 1 if self.player_active == self.player1 else 2 +        while True: +            try: +                reaction, user = await self.bot.wait_for("reaction_add", check=self.predicate, timeout=30.0) +            except asyncio.TimeoutError: +                await self.channel.send(f"{self.player_active.mention}, you took too long. Game over!") +                return +            else: +                await message.delete() +                if str(reaction.emoji) == CROSS_EMOJI: +                    await self.game_over("quit", self.player_active, self.player_inactive) +                    return + +                await self.message.remove_reaction(reaction, user) + +                column_num = self.unicode_numbers.index(str(reaction.emoji)) +                column = [row[column_num] for row in self.grid] + +                for row_num, square in reversed(list(enumerate(column))): +                    if not square: +                        self.grid[row_num][column_num] = player_num +                        return row_num, column_num +                message = await self.channel.send(f"Column {column_num + 1} is full. Try again") + +    def check_win(self, coords: Coordinate, player_num: int) -> bool: +        """Check that placing a counter here would cause the player to win.""" +        vertical = [(-1, 0), (1, 0)] +        horizontal = [(0, 1), (0, -1)] +        forward_diag = [(-1, 1), (1, -1)] +        backward_diag = [(-1, -1), (1, 1)] +        axes = [vertical, horizontal, forward_diag, backward_diag] + +        for axis in axes: +            counters_in_a_row = 1  # The initial counter that is compared to +            for (row_incr, column_incr) in axis: +                row, column = coords +                row += row_incr +                column += column_incr + +                while 0 <= row < self.grid_size and 0 <= column < self.grid_size: +                    if self.grid[row][column] == player_num: +                        counters_in_a_row += 1 +                        row += row_incr +                        column += column_incr +                    else: +                        break +            if counters_in_a_row >= 4: +                return True +        return False + + +class AI: +    """The Computer Player for Single-Player games.""" + +    def __init__(self, bot: Bot, game: Game): +        self.game = game +        self.mention = bot.user.mention + +    def get_possible_places(self) -> list[Coordinate]: +        """Gets all the coordinates where the AI could possibly place a counter.""" +        possible_coords = [] +        for column_num in range(self.game.grid_size): +            column = [row[column_num] for row in self.game.grid] +            for row_num, square in reversed(list(enumerate(column))): +                if not square: +                    possible_coords.append((row_num, column_num)) +                    break +        return possible_coords + +    def check_ai_win(self, coord_list: list[Coordinate]) -> Optional[Coordinate]: +        """ +        Check AI win. + +        Check if placing a counter in any possible coordinate would cause the AI to win +        with 10% chance of not winning and returning None +        """ +        if random.randint(1, 10) == 1: +            return +        for coords in coord_list: +            if self.game.check_win(coords, 2): +                return coords + +    def check_player_win(self, coord_list: list[Coordinate]) -> Optional[Coordinate]: +        """ +        Check Player win. + +        Check if placing a counter in possible coordinates would stop the player +        from winning with 25% of not blocking them  and returning None. +        """ +        if random.randint(1, 4) == 1: +            return +        for coords in coord_list: +            if self.game.check_win(coords, 1): +                return coords + +    @staticmethod +    def random_coords(coord_list: list[Coordinate]) -> Coordinate: +        """Picks a random coordinate from the possible ones.""" +        return random.choice(coord_list) + +    def play(self) -> Union[Coordinate, bool]: +        """ +        Plays for the AI. + +        Gets all possible coords, and determins the move: +        1. coords where it can win. +        2. coords where the player can win. +        3. Random coord +        The first possible value is choosen. +        """ +        possible_coords = self.get_possible_places() + +        if not possible_coords: +            return False + +        coords = ( +            self.check_ai_win(possible_coords) +            or self.check_player_win(possible_coords) +            or self.random_coords(possible_coords) +        ) + +        row, column = coords +        self.game.grid[row][column] = 2 +        return coords + + +class ConnectFour(commands.Cog): +    """Connect Four. The Classic Vertical Four-in-a-row Game!""" + +    def __init__(self, bot: Bot): +        self.bot = bot +        self.games: list[Game] = [] +        self.waiting: list[discord.Member] = [] + +        self.tokens = [":white_circle:", ":blue_circle:", ":red_circle:"] + +        self.max_board_size = 9 +        self.min_board_size = 5 + +    async def check_author(self, ctx: commands.Context, board_size: int) -> bool: +        """Check if the requester is free and the board size is correct.""" +        if self.already_playing(ctx.author): +            await ctx.send("You're already playing a game!") +            return False + +        if ctx.author in self.waiting: +            await ctx.send("You've already sent out a request for a player 2") +            return False + +        if not self.min_board_size <= board_size <= self.max_board_size: +            await ctx.send( +                f"{board_size} is not a valid board size. A valid board size is " +                f"between `{self.min_board_size}` and `{self.max_board_size}`." +            ) +            return False + +        return True + +    def get_player( +        self, +        ctx: commands.Context, +        announcement: discord.Message, +        reaction: discord.Reaction, +        user: discord.Member +    ) -> bool: +        """Predicate checking the criteria for the announcement message.""" +        if self.already_playing(ctx.author):  # If they've joined a game since requesting a player 2 +            return True  # Is dealt with later on + +        if ( +            user.id not in (ctx.me.id, ctx.author.id) +            and str(reaction.emoji) == Emojis.hand_raised +            and reaction.message.id == announcement.id +        ): +            if self.already_playing(user): +                self.bot.loop.create_task(ctx.send(f"{user.mention} You're already playing a game!")) +                self.bot.loop.create_task(announcement.remove_reaction(reaction, user)) +                return False + +            if user in self.waiting: +                self.bot.loop.create_task(ctx.send( +                    f"{user.mention} Please cancel your game first before joining another one." +                )) +                self.bot.loop.create_task(announcement.remove_reaction(reaction, user)) +                return False + +            return True + +        if ( +            user.id == ctx.author.id +            and str(reaction.emoji) == CROSS_EMOJI +            and reaction.message.id == announcement.id +        ): +            return True +        return False + +    def already_playing(self, player: discord.Member) -> bool: +        """Check if someone is already in a game.""" +        return any(player in (game.player1, game.player2) for game in self.games) + +    @staticmethod +    def check_emojis( +        e1: EMOJI_CHECK, e2: EMOJI_CHECK +    ) -> tuple[bool, Optional[str]]: +        """Validate the emojis, the user put.""" +        if isinstance(e1, str) and emojis.count(e1) != 1: +            return False, e1 +        if isinstance(e2, str) and emojis.count(e2) != 1: +            return False, e2 +        return True, None + +    async def _play_game( +        self, +        ctx: commands.Context, +        user: Optional[discord.Member], +        board_size: int, +        emoji1: str, +        emoji2: str +    ) -> None: +        """Helper for playing a game of connect four.""" +        self.tokens = [":white_circle:", str(emoji1), str(emoji2)] +        game = None  # if game fails to intialize in try...except + +        try: +            game = Game(self.bot, ctx.channel, ctx.author, user, self.tokens, size=board_size) +            self.games.append(game) +            await game.start_game() +            self.games.remove(game) +        except Exception: +            # End the game in the event of an unforeseen error so the players aren't stuck in a game +            await ctx.send(f"{ctx.author.mention} {user.mention if user else ''} An error occurred. Game failed.") +            if game in self.games: +                self.games.remove(game) +            raise + +    @guild_only() +    @commands.group( +        invoke_without_command=True, +        aliases=("4inarow", "connect4", "connectfour", "c4"), +        case_insensitive=True +    ) +    async def connect_four( +        self, +        ctx: commands.Context, +        board_size: int = 7, +        emoji1: EMOJI_CHECK = "\U0001f535", +        emoji2: EMOJI_CHECK = "\U0001f534" +    ) -> None: +        """ +        Play the classic game of Connect Four with someone! + +        Sets up a message waiting for someone else to react and play along. +        The game will start once someone has reacted. +        All inputs will be through reactions. +        """ +        check, emoji = self.check_emojis(emoji1, emoji2) +        if not check: +            raise commands.EmojiNotFound(emoji) + +        check_author_result = await self.check_author(ctx, board_size) +        if not check_author_result: +            return + +        announcement = await ctx.send( +            "**Connect Four**: A new game is about to start!\n" +            f"Press {Emojis.hand_raised} to play against {ctx.author.mention}!\n" +            f"(Cancel the game with {CROSS_EMOJI}.)" +        ) +        self.waiting.append(ctx.author) +        await announcement.add_reaction(Emojis.hand_raised) +        await announcement.add_reaction(CROSS_EMOJI) + +        try: +            reaction, user = await self.bot.wait_for( +                "reaction_add", +                check=partial(self.get_player, ctx, announcement), +                timeout=60.0 +            ) +        except asyncio.TimeoutError: +            self.waiting.remove(ctx.author) +            await announcement.delete() +            await ctx.send( +                f"{ctx.author.mention} Seems like there's no one here to play. " +                f"Use `{ctx.prefix}{ctx.invoked_with} ai` to play against a computer." +            ) +            return + +        if str(reaction.emoji) == CROSS_EMOJI: +            self.waiting.remove(ctx.author) +            await announcement.delete() +            await ctx.send(f"{ctx.author.mention} Game cancelled.") +            return + +        await announcement.delete() +        self.waiting.remove(ctx.author) +        if self.already_playing(ctx.author): +            return + +        await self._play_game(ctx, user, board_size, str(emoji1), str(emoji2)) + +    @guild_only() +    @connect_four.command(aliases=("bot", "computer", "cpu")) +    async def ai( +        self, +        ctx: commands.Context, +        board_size: int = 7, +        emoji1: EMOJI_CHECK = "\U0001f535", +        emoji2: EMOJI_CHECK = "\U0001f534" +    ) -> None: +        """Play Connect Four against a computer player.""" +        check, emoji = self.check_emojis(emoji1, emoji2) +        if not check: +            raise commands.EmojiNotFound(emoji) + +        check_author_result = await self.check_author(ctx, board_size) +        if not check_author_result: +            return + +        await self._play_game(ctx, None, board_size, str(emoji1), str(emoji2)) + + +def setup(bot: Bot) -> None: +    """Load ConnectFour Cog.""" +    bot.add_cog(ConnectFour(bot)) diff --git a/bot/exts/fun/duck_game.py b/bot/exts/fun/duck_game.py new file mode 100644 index 00000000..1ef7513f --- /dev/null +++ b/bot/exts/fun/duck_game.py @@ -0,0 +1,336 @@ +import asyncio +import random +import re +from collections import defaultdict +from io import BytesIO +from itertools import product +from pathlib import Path + +import discord +from PIL import Image, ImageDraw, ImageFont +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Colours, MODERATION_ROLES +from bot.utils.decorators import with_role + +DECK = list(product(*[(0, 1, 2)]*4)) + +GAME_DURATION = 180 + +# Scoring +CORRECT_SOLN = 1 +INCORRECT_SOLN = -1 +CORRECT_GOOSE = 2 +INCORRECT_GOOSE = -1 + +# Distribution of minimum acceptable solutions at board generation. +# This is for gameplay reasons, to shift the number of solutions per board up, +# while still making the end of the game unpredictable. +# Note: this is *not* the same as the distribution of number of solutions. + +SOLN_DISTR = 0, 0.05, 0.05, 0.1, 0.15, 0.25, 0.2, 0.15, .05 + +IMAGE_PATH = Path("bot", "resources", "fun", "all_cards.png") +FONT_PATH = Path("bot", "resources", "fun", "LuckiestGuy-Regular.ttf") +HELP_IMAGE_PATH = Path("bot", "resources", "fun", "ducks_help_ex.png") + +ALL_CARDS = Image.open(IMAGE_PATH) +LABEL_FONT = ImageFont.truetype(str(FONT_PATH), size=16) +CARD_WIDTH = 155 +CARD_HEIGHT = 97 + +EMOJI_WRONG = "\u274C" + +ANSWER_REGEX = re.compile(r'^\D*(\d+)\D+(\d+)\D+(\d+)\D*$') + +HELP_TEXT = """ +**Each card has 4 features** +Color, Number, Hat, and Accessory + +**A valid flight** +3 cards where each feature is either all the same or all different + +**Call "GOOSE"** +if you think there are no more flights + +**+1** for each valid flight +**+2** for a correct "GOOSE" call +**-1** for any wrong answer + +The first flight below is invalid: the first card has swords while the other two have no accessory.\ + It would be valid if the first card was empty-handed, or one of the other two had paintbrushes. + +The second flight is valid because there are no 2:1 splits; each feature is either all the same or all different. +""" + + +def assemble_board_image(board: list[tuple[int]], rows: int, columns: int) -> Image: +    """Cut and paste images representing the given cards into an image representing the board.""" +    new_im = Image.new("RGBA", (CARD_WIDTH*columns, CARD_HEIGHT*rows)) +    draw = ImageDraw.Draw(new_im) +    for idx, card in enumerate(board): +        card_image = get_card_image(card) +        row, col = divmod(idx, columns) +        top, left = row * CARD_HEIGHT, col * CARD_WIDTH +        new_im.paste(card_image, (left, top)) +        draw.text( +            xy=(left+5, top+5),  # magic numbers are buffers for the card labels +            text=str(idx), +            fill=(0, 0, 0), +            font=LABEL_FONT, +        ) +    return new_im + + +def get_card_image(card: tuple[int]) -> Image: +    """Slice the image containing all the cards to get just this card.""" +    # The master card image file should have 9x9 cards, +    # arranged such that their features can be interpreted as ordered trinary. +    row, col = divmod(as_trinary(card), 9) +    x1 = col * CARD_WIDTH +    x2 = x1 + CARD_WIDTH +    y1 = row * CARD_HEIGHT +    y2 = y1 + CARD_HEIGHT +    return ALL_CARDS.crop((x1, y1, x2, y2)) + + +def as_trinary(card: tuple[int]) -> int: +    """Find the card's unique index by interpreting its features as trinary.""" +    return int(''.join(str(x) for x in card), base=3) + + +class DuckGame: +    """A class for a single game.""" + +    def __init__( +        self, +        rows: int = 4, +        columns: int = 3, +        minimum_solutions: int = 1, +    ): +        """ +        Take samples from the deck to generate a board. + +        Args: +            rows (int, optional): Rows in the game board. Defaults to 4. +            columns (int, optional): Columns in the game board. Defaults to 3. +            minimum_solutions (int, optional): Minimum acceptable number of solutions in the board. Defaults to 1. +        """ +        self.rows = rows +        self.columns = columns +        size = rows * columns + +        self._solutions = None +        self.claimed_answers = {} +        self.scores = defaultdict(int) +        self.editing_embed = asyncio.Lock() + +        self.board = random.sample(DECK, size) +        while len(self.solutions) < minimum_solutions: +            self.board = random.sample(DECK, size) + +    @property +    def board(self) -> list[tuple[int]]: +        """Accesses board property.""" +        return self._board + +    @board.setter +    def board(self, val: list[tuple[int]]) -> None: +        """Erases calculated solutions if the board changes.""" +        self._solutions = None +        self._board = val + +    @property +    def solutions(self) -> None: +        """Calculate valid solutions and cache to avoid redoing work.""" +        if self._solutions is None: +            self._solutions = set() +            for idx_a, card_a in enumerate(self.board): +                for idx_b, card_b in enumerate(self.board[idx_a+1:], start=idx_a+1): +                    # Two points determine a line, and there are exactly 3 points per line in {0,1,2}^4. +                    # The completion of a line will only be a duplicate point if the other two points are the same, +                    # which is prevented by the triangle iteration. +                    completion = tuple( +                        feat_a if feat_a == feat_b else 3-feat_a-feat_b +                        for feat_a, feat_b in zip(card_a, card_b) +                    ) +                    try: +                        idx_c = self.board.index(completion) +                    except ValueError: +                        continue + +                    # Indices within the solution are sorted to detect duplicate solutions modulo order. +                    solution = tuple(sorted((idx_a, idx_b, idx_c))) +                    self._solutions.add(solution) + +        return self._solutions + + +class DuckGamesDirector(commands.Cog): +    """A cog for running Duck Duck Duck Goose games.""" + +    def __init__(self, bot: Bot): +        self.bot = bot +        self.current_games = {} + +    @commands.group( +        name='duckduckduckgoose', +        aliases=['dddg', 'ddg', 'duckduckgoose', 'duckgoose'], +        invoke_without_command=True +    ) +    @commands.cooldown(rate=1, per=2, type=commands.BucketType.channel) +    async def start_game(self, ctx: commands.Context) -> None: +        """Generate a board, send the game embed, and end the game after a time limit.""" +        if ctx.channel.id in self.current_games: +            await ctx.send("There's already a game running!") +            return + +        minimum_solutions, = random.choices(range(len(SOLN_DISTR)), weights=SOLN_DISTR) +        game = DuckGame(minimum_solutions=minimum_solutions) +        game.running = True +        self.current_games[ctx.channel.id] = game + +        game.msg_content = "" +        game.embed_msg = await self.send_board_embed(ctx, game) +        await asyncio.sleep(GAME_DURATION) + +        # Checking for the channel ID in the currently running games is not sufficient. +        # The game could have been ended by a player, and a new game already started in the same channel. +        if game.running: +            try: +                del self.current_games[ctx.channel.id] +                await self.end_game(ctx.channel, game, end_message="Time's up!") +            except KeyError: +                pass + +    @commands.Cog.listener() +    async def on_message(self, msg: discord.Message) -> None: +        """Listen for messages and process them as answers if appropriate.""" +        if msg.author.bot: +            return + +        channel = msg.channel +        if channel.id not in self.current_games: +            return + +        game = self.current_games[channel.id] +        if msg.content.strip().lower() == 'goose': +            # If all of the solutions have been claimed, i.e. the "goose" call is correct. +            if len(game.solutions) == len(game.claimed_answers): +                try: +                    del self.current_games[channel.id] +                    game.scores[msg.author] += CORRECT_GOOSE +                    await self.end_game(channel, game, end_message=f"{msg.author.display_name} GOOSED!") +                except KeyError: +                    pass +            else: +                await msg.add_reaction(EMOJI_WRONG) +                game.scores[msg.author] += INCORRECT_GOOSE +            return + +        # Valid answers contain 3 numbers. +        if not (match := re.match(ANSWER_REGEX, msg.content)): +            return +        answer = tuple(sorted(int(m) for m in match.groups())) + +        # Be forgiving for answers that use indices not on the board. +        if not all(0 <= n < len(game.board) for n in answer): +            return + +        # Also be forgiving for answers that have already been claimed (and avoid penalizing for racing conditions). +        if answer in game.claimed_answers: +            return + +        if answer in game.solutions: +            game.claimed_answers[answer] = msg.author +            game.scores[msg.author] += CORRECT_SOLN +            await self.display_claimed_answer(game, msg.author, answer) +        else: +            await msg.add_reaction(EMOJI_WRONG) +            game.scores[msg.author] += INCORRECT_SOLN + +    async def send_board_embed(self, ctx: commands.Context, game: DuckGame) -> discord.Message: +        """Create and send the initial game embed. This will be edited as the game goes on.""" +        image = assemble_board_image(game.board, game.rows, game.columns) +        with BytesIO() as image_stream: +            image.save(image_stream, format="png") +            image_stream.seek(0) +            file = discord.File(fp=image_stream, filename="board.png") +        embed = discord.Embed( +            title="Duck Duck Duck Goose!", +            color=Colours.bright_green, +        ) +        embed.set_image(url="attachment://board.png") +        return await ctx.send(embed=embed, file=file) + +    async def display_claimed_answer(self, game: DuckGame, author: discord.Member, answer: tuple[int]) -> None: +        """Add a claimed answer to the game embed.""" +        async with game.editing_embed: +            # We specifically edit the message contents instead of the embed +            # Because we load in the image from the file, editing any portion of the embed +            # Does weird things to the image and this works around that weirdness +            game.msg_content = f"{game.msg_content}\n{str(answer):12s}  -  {author.display_name}" +            await game.embed_msg.edit(content=game.msg_content) + +    async def end_game(self, channel: discord.TextChannel, game: DuckGame, end_message: str) -> None: +        """Edit the game embed to reflect the end of the game and mark the game as not running.""" +        game.running = False + +        scoreboard_embed = discord.Embed( +            title=end_message, +            color=discord.Color.dark_purple(), +        ) +        scores = sorted( +            game.scores.items(), +            key=lambda item: item[1], +            reverse=True, +        ) +        scoreboard = "Final scores:\n\n" +        scoreboard += "\n".join(f"{member.display_name}: {score}" for member, score in scores) +        scoreboard_embed.description = scoreboard +        await channel.send(embed=scoreboard_embed) + +        missed = [ans for ans in game.solutions if ans not in game.claimed_answers] +        if missed: +            missed_text = "Flights everyone missed:\n" + "\n".join(f"{ans}" for ans in missed) +        else: +            missed_text = "All the flights were found!" + +        await game.embed_msg.edit(content=f"{missed_text}") + +    @start_game.command(name="help") +    async def show_rules(self, ctx: commands.Context) -> None: +        """Explain the rules of the game.""" +        await self.send_help_embed(ctx) + +    @start_game.command(name="stop") +    @with_role(*MODERATION_ROLES) +    async def stop_game(self, ctx: commands.Context) -> None: +        """Stop a currently running game. Only available to mods.""" +        try: +            game = self.current_games.pop(ctx.channel.id) +        except KeyError: +            await ctx.send("No game currently running in this channel") +            return +        await self.end_game(ctx.channel, game, end_message="Game canceled.") + +    @staticmethod +    async def send_help_embed(ctx: commands.Context) -> discord.Message: +        """Send rules embed.""" +        embed = discord.Embed( +            title="Compete against other players to find valid flights!", +            color=discord.Color.dark_purple(), +        ) +        embed.description = HELP_TEXT +        file = discord.File(HELP_IMAGE_PATH, filename="help.png") +        embed.set_image(url="attachment://help.png") +        embed.set_footer( +            text="Tip: using Discord's compact message display mode can help keep the board on the screen" +        ) +        return await ctx.send(file=file, embed=embed) + + +def setup(bot: Bot) -> None: +    """Load the DuckGamesDirector cog.""" +    bot.add_cog(DuckGamesDirector(bot)) diff --git a/bot/exts/fun/fun.py b/bot/exts/fun/fun.py new file mode 100644 index 00000000..b148f1f3 --- /dev/null +++ b/bot/exts/fun/fun.py @@ -0,0 +1,250 @@ +import functools +import json +import logging +import random +from collections.abc import Iterable +from pathlib import Path +from typing import Callable, Optional, Union + +from discord import Embed, Message +from discord.ext import commands +from discord.ext.commands import BadArgument, Cog, Context, MessageConverter, clean_content + +from bot import utils +from bot.bot import Bot +from bot.constants import Client, Colours, Emojis +from bot.utils import helpers + +log = logging.getLogger(__name__) + +UWU_WORDS = { +    "fi": "fwi", +    "l": "w", +    "r": "w", +    "some": "sum", +    "th": "d", +    "thing": "fing", +    "tho": "fo", +    "you're": "yuw'we", +    "your": "yur", +    "you": "yuw", +} + + +def caesar_cipher(text: str, offset: int) -> Iterable[str]: +    """ +    Implements a lazy Caesar Cipher algorithm. + +    Encrypts a `text` given a specific integer `offset`. The sign +    of the `offset` dictates the direction in which it shifts to, +    with a negative value shifting to the left, and a positive +    value shifting to the right. +    """ +    for char in text: +        if not char.isascii() or not char.isalpha() or char.isspace(): +            yield char +            continue + +        case_start = 65 if char.isupper() else 97 +        true_offset = (ord(char) - case_start + offset) % 26 + +        yield chr(case_start + true_offset) + + +class Fun(Cog): +    """A collection of general commands for fun.""" + +    def __init__(self, bot: Bot): +        self.bot = bot + +        self._caesar_cipher_embed = json.loads(Path("bot/resources/fun/caesar_info.json").read_text("UTF-8")) + +    @staticmethod +    def _get_random_die() -> str: +        """Generate a random die emoji, ready to be sent on Discord.""" +        die_name = f"dice_{random.randint(1, 6)}" +        return getattr(Emojis, die_name) + +    @commands.command() +    async def roll(self, ctx: Context, num_rolls: int = 1) -> None: +        """Outputs a number of random dice emotes (up to 6).""" +        if 1 <= num_rolls <= 6: +            dice = " ".join(self._get_random_die() for _ in range(num_rolls)) +            await ctx.send(dice) +        else: +            raise BadArgument(f"`{Client.prefix}roll` only supports between 1 and 6 rolls.") + +    @commands.command(name="uwu", aliases=("uwuwize", "uwuify",)) +    async def uwu_command(self, ctx: Context, *, text: clean_content(fix_channel_mentions=True)) -> None: +        """Converts a given `text` into it's uwu equivalent.""" +        conversion_func = functools.partial( +            utils.replace_many, replacements=UWU_WORDS, ignore_case=True, match_case=True +        ) +        text, embed = await Fun._get_text_and_embed(ctx, text) +        # Convert embed if it exists +        if embed is not None: +            embed = Fun._convert_embed(conversion_func, embed) +        converted_text = conversion_func(text) +        converted_text = helpers.suppress_links(converted_text) +        # Don't put >>> if only embed present +        if converted_text: +            converted_text = f">>> {converted_text.lstrip('> ')}" +        await ctx.send(content=converted_text, embed=embed) + +    @commands.command(name="randomcase", aliases=("rcase", "randomcaps", "rcaps",)) +    async def randomcase_command(self, ctx: Context, *, text: clean_content(fix_channel_mentions=True)) -> None: +        """Randomly converts the casing of a given `text`.""" +        def conversion_func(text: str) -> str: +            """Randomly converts the casing of a given string.""" +            return "".join( +                char.upper() if round(random.random()) else char.lower() for char in text +            ) +        text, embed = await Fun._get_text_and_embed(ctx, text) +        # Convert embed if it exists +        if embed is not None: +            embed = Fun._convert_embed(conversion_func, embed) +        converted_text = conversion_func(text) +        converted_text = helpers.suppress_links(converted_text) +        # Don't put >>> if only embed present +        if converted_text: +            converted_text = f">>> {converted_text.lstrip('> ')}" +        await ctx.send(content=converted_text, embed=embed) + +    @commands.group(name="caesarcipher", aliases=("caesar", "cc",)) +    async def caesarcipher_group(self, ctx: Context) -> None: +        """ +        Translates a message using the Caesar Cipher. + +        See `decrypt`, `encrypt`, and `info` subcommands. +        """ +        if ctx.invoked_subcommand is None: +            await ctx.invoke(self.bot.get_command("help"), "caesarcipher") + +    @caesarcipher_group.command(name="info") +    async def caesarcipher_info(self, ctx: Context) -> None: +        """Information about the Caesar Cipher.""" +        embed = Embed.from_dict(self._caesar_cipher_embed) +        embed.colour = Colours.dark_green + +        await ctx.send(embed=embed) + +    @staticmethod +    async def _caesar_cipher(ctx: Context, offset: int, msg: str, left_shift: bool = False) -> None: +        """ +        Given a positive integer `offset`, translates and sends the given `msg`. + +        Performs a right shift by default unless `left_shift` is specified as `True`. + +        Also accepts a valid Discord Message ID or link. +        """ +        if offset < 0: +            await ctx.send(":no_entry: Cannot use a negative offset.") +            return + +        if left_shift: +            offset = -offset + +        def conversion_func(text: str) -> str: +            """Encrypts the given string using the Caesar Cipher.""" +            return "".join(caesar_cipher(text, offset)) + +        text, embed = await Fun._get_text_and_embed(ctx, msg) + +        if embed is not None: +            embed = Fun._convert_embed(conversion_func, embed) + +        converted_text = conversion_func(text) + +        if converted_text: +            converted_text = f">>> {converted_text.lstrip('> ')}" + +        await ctx.send(content=converted_text, embed=embed) + +    @caesarcipher_group.command(name="encrypt", aliases=("rightshift", "rshift", "enc",)) +    async def caesarcipher_encrypt(self, ctx: Context, offset: int, *, msg: str) -> None: +        """ +        Given a positive integer `offset`, encrypt the given `msg`. + +        Performs a right shift of the letters in the message. + +        Also accepts a valid Discord Message ID or link. +        """ +        await self._caesar_cipher(ctx, offset, msg, left_shift=False) + +    @caesarcipher_group.command(name="decrypt", aliases=("leftshift", "lshift", "dec",)) +    async def caesarcipher_decrypt(self, ctx: Context, offset: int, *, msg: str) -> None: +        """ +        Given a positive integer `offset`, decrypt the given `msg`. + +        Performs a left shift of the letters in the message. + +        Also accepts a valid Discord Message ID or link. +        """ +        await self._caesar_cipher(ctx, offset, msg, left_shift=True) + +    @staticmethod +    async def _get_text_and_embed(ctx: Context, text: str) -> tuple[str, Optional[Embed]]: +        """ +        Attempts to extract the text and embed from a possible link to a discord Message. + +        Does not retrieve the text and embed from the Message if it is in a channel the user does +        not have read permissions in. + +        Returns a tuple of: +            str: If `text` is a valid discord Message, the contents of the message, else `text`. +            Optional[Embed]: The embed if found in the valid Message, else None +        """ +        embed = None + +        msg = await Fun._get_discord_message(ctx, text) +        # Ensure the user has read permissions for the channel the message is in +        if isinstance(msg, Message): +            permissions = msg.channel.permissions_for(ctx.author) +            if permissions.read_messages: +                text = msg.clean_content +                # Take first embed because we can't send multiple embeds +                if msg.embeds: +                    embed = msg.embeds[0] + +        return (text, embed) + +    @staticmethod +    async def _get_discord_message(ctx: Context, text: str) -> Union[Message, str]: +        """ +        Attempts to convert a given `text` to a discord Message object and return it. + +        Conversion will succeed if given a discord Message ID or link. +        Returns `text` if the conversion fails. +        """ +        try: +            text = await MessageConverter().convert(ctx, text) +        except commands.BadArgument: +            log.debug(f"Input '{text:.20}...' is not a valid Discord Message") +        return text + +    @staticmethod +    def _convert_embed(func: Callable[[str, ], str], embed: Embed) -> Embed: +        """ +        Converts the text in an embed using a given conversion function, then return the embed. + +        Only modifies the following fields: title, description, footer, fields +        """ +        embed_dict = embed.to_dict() + +        embed_dict["title"] = func(embed_dict.get("title", "")) +        embed_dict["description"] = func(embed_dict.get("description", "")) + +        if "footer" in embed_dict: +            embed_dict["footer"]["text"] = func(embed_dict["footer"].get("text", "")) + +        if "fields" in embed_dict: +            for field in embed_dict["fields"]: +                field["name"] = func(field.get("name", "")) +                field["value"] = func(field.get("value", "")) + +        return Embed.from_dict(embed_dict) + + +def setup(bot: Bot) -> None: +    """Load the Fun cog.""" +    bot.add_cog(Fun(bot)) diff --git a/bot/exts/fun/game.py b/bot/exts/fun/game.py new file mode 100644 index 00000000..f9c150e6 --- /dev/null +++ b/bot/exts/fun/game.py @@ -0,0 +1,485 @@ +import difflib +import logging +import random +import re +from asyncio import sleep +from datetime import datetime as dt, timedelta +from enum import IntEnum +from typing import Any, Optional + +from aiohttp import ClientSession +from discord import Embed +from discord.ext import tasks +from discord.ext.commands import Cog, Context, group + +from bot.bot import Bot +from bot.constants import STAFF_ROLES, Tokens +from bot.utils.decorators import with_role +from bot.utils.extensions import invoke_help_command +from bot.utils.pagination import ImagePaginator, LinePaginator + +# Base URL of IGDB API +BASE_URL = "https://api.igdb.com/v4" + +CLIENT_ID = Tokens.igdb_client_id +CLIENT_SECRET = Tokens.igdb_client_secret + +# The number of seconds before expiry that we attempt to re-fetch a new access token +ACCESS_TOKEN_RENEWAL_WINDOW = 60*60*24*2 + +# URL to request API access token +OAUTH_URL = "https://id.twitch.tv/oauth2/token" + +OAUTH_PARAMS = { +    "client_id": CLIENT_ID, +    "client_secret": CLIENT_SECRET, +    "grant_type": "client_credentials" +} + +BASE_HEADERS = { +    "Client-ID": CLIENT_ID, +    "Accept": "application/json" +} + +logger = logging.getLogger(__name__) + +REGEX_NON_ALPHABET = re.compile(r"[^a-z0-9]", re.IGNORECASE) + +# --------- +# TEMPLATES +# --------- + +# Body templates +# Request body template for get_games_list +GAMES_LIST_BODY = ( +    "fields cover.image_id, first_release_date, total_rating, name, storyline, url, platforms.name, status," +    "involved_companies.company.name, summary, age_ratings.category, age_ratings.rating, total_rating_count;" +    "{sort} {limit} {offset} {genre} {additional}" +) + +# Request body template for get_companies_list +COMPANIES_LIST_BODY = ( +    "fields name, url, start_date, logo.image_id, developed.name, published.name, description;" +    "offset {offset}; limit {limit};" +) + +# Request body template for games search +SEARCH_BODY = 'fields name, url, storyline, total_rating, total_rating_count; limit 50; search "{term}";' + +# Pages templates +# Game embed layout +GAME_PAGE = ( +    "**[{name}]({url})**\n" +    "{description}" +    "**Release Date:** {release_date}\n" +    "**Rating:** {rating}/100 :star: (based on {rating_count} ratings)\n" +    "**Platforms:** {platforms}\n" +    "**Status:** {status}\n" +    "**Age Ratings:** {age_ratings}\n" +    "**Made by:** {made_by}\n\n" +    "{storyline}" +) + +# .games company command page layout +COMPANY_PAGE = ( +    "**[{name}]({url})**\n" +    "{description}" +    "**Founded:** {founded}\n" +    "**Developed:** {developed}\n" +    "**Published:** {published}" +) + +# For .games search command line layout +GAME_SEARCH_LINE = ( +    "**[{name}]({url})**\n" +    "{rating}/100 :star: (based on {rating_count} ratings)\n" +) + +# URL templates +COVER_URL = "https://images.igdb.com/igdb/image/upload/t_cover_big/{image_id}.jpg" +LOGO_URL = "https://images.igdb.com/igdb/image/upload/t_logo_med/{image_id}.png" + +# Create aliases for complex genre names +ALIASES = { +    "Role-playing (rpg)": ["Role playing", "Rpg"], +    "Turn-based strategy (tbs)": ["Turn based strategy", "Tbs"], +    "Real time strategy (rts)": ["Real time strategy", "Rts"], +    "Hack and slash/beat 'em up": ["Hack and slash"] +} + + +class GameStatus(IntEnum): +    """Game statuses in IGDB API.""" + +    Released = 0 +    Alpha = 2 +    Beta = 3 +    Early = 4 +    Offline = 5 +    Cancelled = 6 +    Rumored = 7 + + +class AgeRatingCategories(IntEnum): +    """IGDB API Age Rating categories IDs.""" + +    ESRB = 1 +    PEGI = 2 + + +class AgeRatings(IntEnum): +    """PEGI/ESRB ratings IGDB API IDs.""" + +    Three = 1 +    Seven = 2 +    Twelve = 3 +    Sixteen = 4 +    Eighteen = 5 +    RP = 6 +    EC = 7 +    E = 8 +    E10 = 9 +    T = 10 +    M = 11 +    AO = 12 + + +class Games(Cog): +    """Games Cog contains commands that collect data from IGDB.""" + +    def __init__(self, bot: Bot): +        self.bot = bot +        self.http_session: ClientSession = bot.http_session + +        self.genres: dict[str, int] = {} +        self.headers = BASE_HEADERS + +        self.bot.loop.create_task(self.renew_access_token()) + +    async def renew_access_token(self) -> None: +        """Refeshes V4 access token a number of seconds before expiry. See `ACCESS_TOKEN_RENEWAL_WINDOW`.""" +        while True: +            async with self.http_session.post(OAUTH_URL, params=OAUTH_PARAMS) as resp: +                result = await resp.json() +                if resp.status != 200: +                    # If there is a valid access token continue to use that, +                    # otherwise unload cog. +                    if "Authorization" in self.headers: +                        time_delta = timedelta(seconds=ACCESS_TOKEN_RENEWAL_WINDOW) +                        logger.error( +                            "Failed to renew IGDB access token. " +                            f"Current token will last for {time_delta} " +                            f"OAuth response message: {result['message']}" +                        ) +                    else: +                        logger.warning( +                            "Invalid OAuth credentials. Unloading Games cog. " +                            f"OAuth response message: {result['message']}" +                        ) +                        self.bot.remove_cog("Games") + +                    return + +            self.headers["Authorization"] = f"Bearer {result['access_token']}" + +            # Attempt to renew before the token expires +            next_renewal = result["expires_in"] - ACCESS_TOKEN_RENEWAL_WINDOW + +            time_delta = timedelta(seconds=next_renewal) +            logger.info(f"Successfully renewed access token. Refreshing again in {time_delta}") + +            # This will be true the first time this loop runs. +            # Since we now have an access token, its safe to start this task. +            if self.genres == {}: +                self.refresh_genres_task.start() +            await sleep(next_renewal) + +    @tasks.loop(hours=24.0) +    async def refresh_genres_task(self) -> None: +        """Refresh genres in every hour.""" +        try: +            await self._get_genres() +        except Exception as e: +            logger.warning(f"There was error while refreshing genres: {e}") +            return +        logger.info("Successfully refreshed genres.") + +    def cog_unload(self) -> None: +        """Cancel genres refreshing start when unloading Cog.""" +        self.refresh_genres_task.cancel() +        logger.info("Successfully stopped Genres Refreshing task.") + +    async def _get_genres(self) -> None: +        """Create genres variable for games command.""" +        body = "fields name; limit 100;" +        async with self.http_session.post(f"{BASE_URL}/genres", data=body, headers=self.headers) as resp: +            result = await resp.json() +        genres = {genre["name"].capitalize(): genre["id"] for genre in result} + +        # Replace complex names with names from ALIASES +        for genre_name, genre in genres.items(): +            if genre_name in ALIASES: +                for alias in ALIASES[genre_name]: +                    self.genres[alias] = genre +            else: +                self.genres[genre_name] = genre + +    @group(name="games", aliases=("game",), invoke_without_command=True) +    async def games(self, ctx: Context, amount: Optional[int] = 5, *, genre: Optional[str]) -> None: +        """ +        Get random game(s) by genre from IGDB. Use .games genres command to get all available genres. + +        Also support amount parameter, what max is 25 and min 1, default 5. Supported formats: +        - .games <genre> +        - .games <amount> <genre> +        """ +        # When user didn't specified genre, send help message +        if genre is None: +            await invoke_help_command(ctx) +            return + +        # Capitalize genre for check +        genre = "".join(genre).capitalize() + +        # Check for amounts, max is 25 and min 1 +        if not 1 <= amount <= 25: +            await ctx.send("Your provided amount is out of range. Our minimum is 1 and maximum 25.") +            return + +        # Get games listing, if genre don't exist, show error message with possibilities. +        # Offset must be random, due otherwise we will get always same result (offset show in which position should +        # API start returning result) +        try: +            games = await self.get_games_list(amount, self.genres[genre], offset=random.randint(0, 150)) +        except KeyError: +            possibilities = await self.get_best_results(genre) +            # If there is more than 1 possibilities, show these. +            # If there is only 1 possibility, use it as genre. +            # Otherwise send message about invalid genre. +            if len(possibilities) > 1: +                display_possibilities = "`, `".join(p[1] for p in possibilities) +                await ctx.send( +                    f"Invalid genre `{genre}`. " +                    f"{f'Maybe you meant `{display_possibilities}`?' if display_possibilities else ''}" +                ) +                return +            elif len(possibilities) == 1: +                games = await self.get_games_list( +                    amount, self.genres[possibilities[0][1]], offset=random.randint(0, 150) +                ) +                genre = possibilities[0][1] +            else: +                await ctx.send(f"Invalid genre `{genre}`.") +                return + +        # Create pages and paginate +        pages = [await self.create_page(game) for game in games] + +        await ImagePaginator.paginate(pages, ctx, Embed(title=f"Random {genre.title()} Games")) + +    @games.command(name="top", aliases=("t",)) +    async def top(self, ctx: Context, amount: int = 10) -> None: +        """ +        Get current Top games in IGDB. + +        Support amount parameter. Max is 25, min is 1. +        """ +        if not 1 <= amount <= 25: +            await ctx.send("Your provided amount is out of range. Our minimum is 1 and maximum 25.") +            return + +        games = await self.get_games_list(amount, sort="total_rating desc", +                                          additional_body="where total_rating >= 90; sort total_rating_count desc;") + +        pages = [await self.create_page(game) for game in games] +        await ImagePaginator.paginate(pages, ctx, Embed(title=f"Top {amount} Games")) + +    @games.command(name="genres", aliases=("genre", "g")) +    async def genres(self, ctx: Context) -> None: +        """Get all available genres.""" +        await ctx.send(f"Currently available genres: {', '.join(f'`{genre}`' for genre in self.genres)}") + +    @games.command(name="search", aliases=("s",)) +    async def search(self, ctx: Context, *, search_term: str) -> None: +        """Find games by name.""" +        lines = await self.search_games(search_term) + +        await LinePaginator.paginate(lines, ctx, Embed(title=f"Game Search Results: {search_term}"), empty=False) + +    @games.command(name="company", aliases=("companies",)) +    async def company(self, ctx: Context, amount: int = 5) -> None: +        """ +        Get random Game Companies companies from IGDB API. + +        Support amount parameter. Max is 25, min is 1. +        """ +        if not 1 <= amount <= 25: +            await ctx.send("Your provided amount is out of range. Our minimum is 1 and maximum 25.") +            return + +        # Get companies listing. Provide limit for limiting how much companies will be returned. Get random offset to +        # get (almost) every time different companies (offset show in which position should API start returning result) +        companies = await self.get_companies_list(limit=amount, offset=random.randint(0, 150)) +        pages = [await self.create_company_page(co) for co in companies] + +        await ImagePaginator.paginate(pages, ctx, Embed(title="Random Game Companies")) + +    @with_role(*STAFF_ROLES) +    @games.command(name="refresh", aliases=("r",)) +    async def refresh_genres_command(self, ctx: Context) -> None: +        """Refresh .games command genres.""" +        try: +            await self._get_genres() +        except Exception as e: +            await ctx.send(f"There was error while refreshing genres: `{e}`") +            return +        await ctx.send("Successfully refreshed genres.") + +    async def get_games_list( +        self, +        amount: int, +        genre: Optional[str] = None, +        sort: Optional[str] = None, +        additional_body: str = "", +        offset: int = 0 +    ) -> list[dict[str, Any]]: +        """ +        Get list of games from IGDB API by parameters that is provided. + +        Amount param show how much games this get, genre is genre ID and at least one genre in game must this when +        provided. Sort is sorting by specific field and direction, ex. total_rating desc/asc (total_rating is field, +        desc/asc is direction). Additional_body is field where you can pass extra search parameters. Offset show start +        position in API. +        """ +        # Create body of IGDB API request, define fields, sorting, offset, limit and genre +        params = { +            "sort": f"sort {sort};" if sort else "", +            "limit": f"limit {amount};", +            "offset": f"offset {offset};" if offset else "", +            "genre": f"where genres = ({genre});" if genre else "", +            "additional": additional_body +        } +        body = GAMES_LIST_BODY.format(**params) + +        # Do request to IGDB API, create headers, URL, define body, return result +        async with self.http_session.post(url=f"{BASE_URL}/games", data=body, headers=self.headers) as resp: +            return await resp.json() + +    async def create_page(self, data: dict[str, Any]) -> tuple[str, str]: +        """Create content of Game Page.""" +        # Create cover image URL from template +        url = COVER_URL.format(**{"image_id": data["cover"]["image_id"] if "cover" in data else ""}) + +        # Get release date separately with checking +        release_date = dt.utcfromtimestamp(data["first_release_date"]).date() if "first_release_date" in data else "?" + +        # Create Age Ratings value +        rating = ", ".join( +            f"{AgeRatingCategories(age['category']).name} {AgeRatings(age['rating']).name}" +            for age in data["age_ratings"] +        ) if "age_ratings" in data else "?" + +        companies = [c["company"]["name"] for c in data["involved_companies"]] if "involved_companies" in data else "?" + +        # Create formatting for template page +        formatting = { +            "name": data["name"], +            "url": data["url"], +            "description": f"{data['summary']}\n\n" if "summary" in data else "\n", +            "release_date": release_date, +            "rating": round(data["total_rating"] if "total_rating" in data else 0, 2), +            "rating_count": data["total_rating_count"] if "total_rating_count" in data else "?", +            "platforms": ", ".join(platform["name"] for platform in data["platforms"]) if "platforms" in data else "?", +            "status": GameStatus(data["status"]).name if "status" in data else "?", +            "age_ratings": rating, +            "made_by": ", ".join(companies), +            "storyline": data["storyline"] if "storyline" in data else "" +        } +        page = GAME_PAGE.format(**formatting) + +        return page, url + +    async def search_games(self, search_term: str) -> list[str]: +        """Search game from IGDB API by string, return listing of pages.""" +        lines = [] + +        # Define request body of IGDB API request and do request +        body = SEARCH_BODY.format(**{"term": search_term}) + +        async with self.http_session.post(url=f"{BASE_URL}/games", data=body, headers=self.headers) as resp: +            data = await resp.json() + +        # Loop over games, format them to good format, make line and append this to total lines +        for game in data: +            formatting = { +                "name": game["name"], +                "url": game["url"], +                "rating": round(game["total_rating"] if "total_rating" in game else 0, 2), +                "rating_count": game["total_rating_count"] if "total_rating" in game else "?" +            } +            line = GAME_SEARCH_LINE.format(**formatting) +            lines.append(line) + +        return lines + +    async def get_companies_list(self, limit: int, offset: int = 0) -> list[dict[str, Any]]: +        """ +        Get random Game Companies from IGDB API. + +        Limit is parameter, that show how much movies this should return, offset show in which position should API start +        returning results. +        """ +        # Create request body from template +        body = COMPANIES_LIST_BODY.format(**{ +            "limit": limit, +            "offset": offset +        }) + +        async with self.http_session.post(url=f"{BASE_URL}/companies", data=body, headers=self.headers) as resp: +            return await resp.json() + +    async def create_company_page(self, data: dict[str, Any]) -> tuple[str, str]: +        """Create good formatted Game Company page.""" +        # Generate URL of company logo +        url = LOGO_URL.format(**{"image_id": data["logo"]["image_id"] if "logo" in data else ""}) + +        # Try to get found date of company +        founded = dt.utcfromtimestamp(data["start_date"]).date() if "start_date" in data else "?" + +        # Generate list of games, that company have developed or published +        developed = ", ".join(game["name"] for game in data["developed"]) if "developed" in data else "?" +        published = ", ".join(game["name"] for game in data["published"]) if "published" in data else "?" + +        formatting = { +            "name": data["name"], +            "url": data["url"], +            "description": f"{data['description']}\n\n" if "description" in data else "\n", +            "founded": founded, +            "developed": developed, +            "published": published +        } +        page = COMPANY_PAGE.format(**formatting) + +        return page, url + +    async def get_best_results(self, query: str) -> list[tuple[float, str]]: +        """Get best match result of genre when original genre is invalid.""" +        results = [] +        for genre in self.genres: +            ratios = [difflib.SequenceMatcher(None, query, genre).ratio()] +            for word in REGEX_NON_ALPHABET.split(genre): +                ratios.append(difflib.SequenceMatcher(None, query, word).ratio()) +            results.append((round(max(ratios), 2), genre)) +        return sorted((item for item in results if item[0] >= 0.60), reverse=True)[:4] + + +def setup(bot: Bot) -> None: +    """Load the Games cog.""" +    # Check does IGDB API key exist, if not, log warning and don't load cog +    if not Tokens.igdb_client_id: +        logger.warning("No IGDB client ID. Not loading Games cog.") +        return +    if not Tokens.igdb_client_secret: +        logger.warning("No IGDB client secret. Not loading Games cog.") +        return +    bot.add_cog(Games(bot)) diff --git a/bot/exts/fun/magic_8ball.py b/bot/exts/fun/magic_8ball.py new file mode 100644 index 00000000..a7b682ca --- /dev/null +++ b/bot/exts/fun/magic_8ball.py @@ -0,0 +1,30 @@ +import json +import logging +import random +from pathlib import Path + +from discord.ext import commands + +from bot.bot import Bot + +log = logging.getLogger(__name__) + +ANSWERS = json.loads(Path("bot/resources/fun/magic8ball.json").read_text("utf8")) + + +class Magic8ball(commands.Cog): +    """A Magic 8ball command to respond to a user's question.""" + +    @commands.command(name="8ball") +    async def output_answer(self, ctx: commands.Context, *, question: str) -> None: +        """Return a Magic 8ball answer from answers list.""" +        if len(question.split()) >= 3: +            answer = random.choice(ANSWERS) +            await ctx.send(answer) +        else: +            await ctx.send("Usage: .8ball <question> (minimum length of 3 eg: `will I win?`)") + + +def setup(bot: Bot) -> None: +    """Load the Magic8Ball Cog.""" +    bot.add_cog(Magic8ball()) diff --git a/bot/exts/fun/minesweeper.py b/bot/exts/fun/minesweeper.py new file mode 100644 index 00000000..a48b5051 --- /dev/null +++ b/bot/exts/fun/minesweeper.py @@ -0,0 +1,270 @@ +import logging +from collections.abc import Iterator +from dataclasses import dataclass +from random import randint, random +from typing import Union + +import discord +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Client +from bot.utils.converters import CoordinateConverter +from bot.utils.exceptions import UserNotPlayingError +from bot.utils.extensions import invoke_help_command + +MESSAGE_MAPPING = { +    0: ":stop_button:", +    1: ":one:", +    2: ":two:", +    3: ":three:", +    4: ":four:", +    5: ":five:", +    6: ":six:", +    7: ":seven:", +    8: ":eight:", +    9: ":nine:", +    10: ":keycap_ten:", +    "bomb": ":bomb:", +    "hidden": ":grey_question:", +    "flag": ":flag_black:", +    "x": ":x:" +} + +log = logging.getLogger(__name__) + + +GameBoard = list[list[Union[str, int]]] + + +@dataclass +class Game: +    """The data for a game.""" + +    board: GameBoard +    revealed: GameBoard +    dm_msg: discord.Message +    chat_msg: discord.Message +    activated_on_server: bool + + +class Minesweeper(commands.Cog): +    """Play a game of Minesweeper.""" + +    def __init__(self): +        self.games: dict[int, Game] = {} + +    @commands.group(name="minesweeper", aliases=("ms",), invoke_without_command=True) +    async def minesweeper_group(self, ctx: commands.Context) -> None: +        """Commands for Playing Minesweeper.""" +        await invoke_help_command(ctx) + +    @staticmethod +    def get_neighbours(x: int, y: int) -> Iterator[tuple[int, int]]: +        """Get all the neighbouring x and y including it self.""" +        for x_ in [x - 1, x, x + 1]: +            for y_ in [y - 1, y, y + 1]: +                if x_ != -1 and x_ != 10 and y_ != -1 and y_ != 10: +                    yield x_, y_ + +    def generate_board(self, bomb_chance: float) -> GameBoard: +        """Generate a 2d array for the board.""" +        board: GameBoard = [ +            [ +                "bomb" if random() <= bomb_chance else "number" +                for _ in range(10) +            ] for _ in range(10) +        ] + +        # make sure there is always a free cell +        board[randint(0, 9)][randint(0, 9)] = "number" + +        for y, row in enumerate(board): +            for x, cell in enumerate(row): +                if cell == "number": +                    # calculate bombs near it +                    bombs = 0 +                    for x_, y_ in self.get_neighbours(x, y): +                        if board[y_][x_] == "bomb": +                            bombs += 1 +                    board[y][x] = bombs +        return board + +    @staticmethod +    def format_for_discord(board: GameBoard) -> str: +        """Format the board as a string for Discord.""" +        discord_msg = ( +            ":stop_button:    :regional_indicator_a: :regional_indicator_b: :regional_indicator_c: " +            ":regional_indicator_d: :regional_indicator_e: :regional_indicator_f: :regional_indicator_g: " +            ":regional_indicator_h: :regional_indicator_i: :regional_indicator_j:\n\n" +        ) +        rows = [] +        for row_number, row in enumerate(board): +            new_row = f"{MESSAGE_MAPPING[row_number + 1]}    " +            new_row += " ".join(MESSAGE_MAPPING[cell] for cell in row) +            rows.append(new_row) + +        discord_msg += "\n".join(rows) +        return discord_msg + +    @minesweeper_group.command(name="start") +    async def start_command(self, ctx: commands.Context, bomb_chance: float = .2) -> None: +        """Start a game of Minesweeper.""" +        if ctx.author.id in self.games:  # Player is already playing +            await ctx.send(f"{ctx.author.mention} you already have a game running!", delete_after=2) +            await ctx.message.delete(delay=2) +            return + +        try: +            await ctx.author.send( +                f"Play by typing: `{Client.prefix}ms reveal xy [xy]` or `{Client.prefix}ms flag xy [xy]` \n" +                f"Close the game with `{Client.prefix}ms end`\n" +            ) +        except discord.errors.Forbidden: +            log.debug(f"{ctx.author.name} ({ctx.author.id}) has disabled DMs from server members.") +            await ctx.send(f":x: {ctx.author.mention}, please enable DMs to play minesweeper.") +            return + +        # Add game to list +        board: GameBoard = self.generate_board(bomb_chance) +        revealed_board: GameBoard = [["hidden"] * 10 for _ in range(10)] +        dm_msg = await ctx.author.send(f"Here's your board!\n{self.format_for_discord(revealed_board)}") + +        if ctx.guild: +            await ctx.send(f"{ctx.author.mention} is playing Minesweeper.") +            chat_msg = await ctx.send(f"Here's their board!\n{self.format_for_discord(revealed_board)}") +        else: +            chat_msg = None + +        self.games[ctx.author.id] = Game( +            board=board, +            revealed=revealed_board, +            dm_msg=dm_msg, +            chat_msg=chat_msg, +            activated_on_server=ctx.guild is not None +        ) + +    async def update_boards(self, ctx: commands.Context) -> None: +        """Update both playing boards.""" +        game = self.games[ctx.author.id] +        await game.dm_msg.delete() +        game.dm_msg = await ctx.author.send(f"Here's your board!\n{self.format_for_discord(game.revealed)}") +        if game.activated_on_server: +            await game.chat_msg.edit(content=f"Here's their board!\n{self.format_for_discord(game.revealed)}") + +    @commands.dm_only() +    @minesweeper_group.command(name="flag") +    async def flag_command(self, ctx: commands.Context, *coordinates: CoordinateConverter) -> None: +        """Place multiple flags on the board.""" +        if ctx.author.id not in self.games: +            raise UserNotPlayingError +        board: GameBoard = self.games[ctx.author.id].revealed +        for x, y in coordinates: +            if board[y][x] == "hidden": +                board[y][x] = "flag" + +        await self.update_boards(ctx) + +    @staticmethod +    def reveal_bombs(revealed: GameBoard, board: GameBoard) -> None: +        """Reveals all the bombs.""" +        for y, row in enumerate(board): +            for x, cell in enumerate(row): +                if cell == "bomb": +                    revealed[y][x] = cell + +    async def lost(self, ctx: commands.Context) -> None: +        """The player lost the game.""" +        game = self.games[ctx.author.id] +        self.reveal_bombs(game.revealed, game.board) +        await ctx.author.send(":fire: You lost! :fire:") +        if game.activated_on_server: +            await game.chat_msg.channel.send(f":fire: {ctx.author.mention} just lost Minesweeper! :fire:") + +    async def won(self, ctx: commands.Context) -> None: +        """The player won the game.""" +        game = self.games[ctx.author.id] +        await ctx.author.send(":tada: You won! :tada:") +        if game.activated_on_server: +            await game.chat_msg.channel.send(f":tada: {ctx.author.mention} just won Minesweeper! :tada:") + +    def reveal_zeros(self, revealed: GameBoard, board: GameBoard, x: int, y: int) -> None: +        """Recursively reveal adjacent cells when a 0 cell is encountered.""" +        for x_, y_ in self.get_neighbours(x, y): +            if revealed[y_][x_] != "hidden": +                continue +            revealed[y_][x_] = board[y_][x_] +            if board[y_][x_] == 0: +                self.reveal_zeros(revealed, board, x_, y_) + +    async def check_if_won(self, ctx: commands.Context, revealed: GameBoard, board: GameBoard) -> bool: +        """Checks if a player has won.""" +        if any( +            revealed[y][x] in ["hidden", "flag"] and board[y][x] != "bomb" +            for x in range(10) +            for y in range(10) +        ): +            return False +        else: +            await self.won(ctx) +            return True + +    async def reveal_one( +        self, +        ctx: commands.Context, +        revealed: GameBoard, +        board: GameBoard, +        x: int, +        y: int +    ) -> bool: +        """ +        Reveal one square. + +        return is True if the game ended, breaking the loop in `reveal_command` and deleting the game. +        """ +        revealed[y][x] = board[y][x] +        if board[y][x] == "bomb": +            await self.lost(ctx) +            revealed[y][x] = "x"  # mark bomb that made you lose with a x +            return True +        elif board[y][x] == 0: +            self.reveal_zeros(revealed, board, x, y) +        return await self.check_if_won(ctx, revealed, board) + +    @commands.dm_only() +    @minesweeper_group.command(name="reveal") +    async def reveal_command(self, ctx: commands.Context, *coordinates: CoordinateConverter) -> None: +        """Reveal multiple cells.""" +        if ctx.author.id not in self.games: +            raise UserNotPlayingError +        game = self.games[ctx.author.id] +        revealed: GameBoard = game.revealed +        board: GameBoard = game.board + +        for x, y in coordinates: +            # reveal_one returns True if the revealed cell is a bomb or the player won, ending the game +            if await self.reveal_one(ctx, revealed, board, x, y): +                await self.update_boards(ctx) +                del self.games[ctx.author.id] +                break +        else: +            await self.update_boards(ctx) + +    @minesweeper_group.command(name="end") +    async def end_command(self, ctx: commands.Context) -> None: +        """End your current game.""" +        if ctx.author.id not in self.games: +            raise UserNotPlayingError +        game = self.games[ctx.author.id] +        game.revealed = game.board +        await self.update_boards(ctx) +        new_msg = f":no_entry: Game canceled. :no_entry:\n{game.dm_msg.content}" +        await game.dm_msg.edit(content=new_msg) +        if game.activated_on_server: +            await game.chat_msg.edit(content=new_msg) +        del self.games[ctx.author.id] + + +def setup(bot: Bot) -> None: +    """Load the Minesweeper cog.""" +    bot.add_cog(Minesweeper()) diff --git a/bot/exts/fun/movie.py b/bot/exts/fun/movie.py new file mode 100644 index 00000000..a04eeb41 --- /dev/null +++ b/bot/exts/fun/movie.py @@ -0,0 +1,205 @@ +import logging +import random +from enum import Enum +from typing import Any + +from aiohttp import ClientSession +from discord import Embed +from discord.ext.commands import Cog, Context, group + +from bot.bot import Bot +from bot.constants import Tokens +from bot.utils.extensions import invoke_help_command +from bot.utils.pagination import ImagePaginator + +# Define base URL of TMDB +BASE_URL = "https://api.themoviedb.org/3/" + +logger = logging.getLogger(__name__) + +# Define movie params, that will be used for every movie request +MOVIE_PARAMS = { +    "api_key": Tokens.tmdb, +    "language": "en-US" +} + + +class MovieGenres(Enum): +    """Movies Genre names and IDs.""" + +    Action = "28" +    Adventure = "12" +    Animation = "16" +    Comedy = "35" +    Crime = "80" +    Documentary = "99" +    Drama = "18" +    Family = "10751" +    Fantasy = "14" +    History = "36" +    Horror = "27" +    Music = "10402" +    Mystery = "9648" +    Romance = "10749" +    Science = "878" +    Thriller = "53" +    Western = "37" + + +class Movie(Cog): +    """Movie Cog contains movies command that grab random movies from TMDB.""" + +    def __init__(self, bot: Bot): +        self.http_session: ClientSession = bot.http_session + +    @group(name="movies", aliases=("movie",), invoke_without_command=True) +    async def movies(self, ctx: Context, genre: str = "", amount: int = 5) -> None: +        """ +        Get random movies by specifying genre. Also support amount parameter, that define how much movies will be shown. + +        Default 5. Use .movies genres to get all available genres. +        """ +        # Check is there more than 20 movies specified, due TMDB return 20 movies +        # per page, so this is max. Also you can't get less movies than 1, just logic +        if amount > 20: +            await ctx.send("You can't get more than 20 movies at once. (TMDB limits)") +            return +        elif amount < 1: +            await ctx.send("You can't get less than 1 movie.") +            return + +        # Capitalize genre for getting data from Enum, get random page, send help when genre don't exist. +        genre = genre.capitalize() +        try: +            result = await self.get_movies_data(self.http_session, MovieGenres[genre].value, 1) +        except KeyError: +            await invoke_help_command(ctx) +            return + +        # Check if "results" is in result. If not, throw error. +        if "results" not in result: +            err_msg = ( +                f"There is problem while making TMDB API request. Response Code: {result['status_code']}, " +                f"{result['status_message']}." +            ) +            await ctx.send(err_msg) +            logger.warning(err_msg) + +        # Get random page. Max page is last page where is movies with this genre. +        page = random.randint(1, result["total_pages"]) + +        # Get movies list from TMDB, check if results key in result. When not, raise error. +        movies = await self.get_movies_data(self.http_session, MovieGenres[genre].value, page) +        if "results" not in movies: +            err_msg = f"There is problem while making TMDB API request. Response Code: {result['status_code']}, " \ +                      f"{result['status_message']}." +            await ctx.send(err_msg) +            logger.warning(err_msg) + +        # Get all pages and embed +        pages = await self.get_pages(self.http_session, movies, amount) +        embed = await self.get_embed(genre) + +        await ImagePaginator.paginate(pages, ctx, embed) + +    @movies.command(name="genres", aliases=("genre", "g")) +    async def genres(self, ctx: Context) -> None: +        """Show all currently available genres for .movies command.""" +        await ctx.send(f"Current available genres: {', '.join('`' + genre.name + '`' for genre in MovieGenres)}") + +    async def get_movies_data(self, client: ClientSession, genre_id: str, page: int) -> list[dict[str, Any]]: +        """Return JSON of TMDB discover request.""" +        # Define params of request +        params = { +            "api_key": Tokens.tmdb, +            "language": "en-US", +            "sort_by": "popularity.desc", +            "include_adult": "false", +            "include_video": "false", +            "page": page, +            "with_genres": genre_id +        } + +        url = BASE_URL + "discover/movie" + +        # Make discover request to TMDB, return result +        async with client.get(url, params=params) as resp: +            return await resp.json() + +    async def get_pages(self, client: ClientSession, movies: dict[str, Any], amount: int) -> list[tuple[str, str]]: +        """Fetch all movie pages from movies dictionary. Return list of pages.""" +        pages = [] + +        for i in range(amount): +            movie_id = movies["results"][i]["id"] +            movie = await self.get_movie(client, movie_id) + +            page, img = await self.create_page(movie) +            pages.append((page, img)) + +        return pages + +    async def get_movie(self, client: ClientSession, movie: int) -> dict[str, Any]: +        """Get Movie by movie ID from TMDB. Return result dictionary.""" +        if not isinstance(movie, int): +            raise ValueError("Error while fetching movie from TMDB, movie argument must be integer. ") +        url = BASE_URL + f"movie/{movie}" + +        async with client.get(url, params=MOVIE_PARAMS) as resp: +            return await resp.json() + +    async def create_page(self, movie: dict[str, Any]) -> tuple[str, str]: +        """Create page from TMDB movie request result. Return formatted page + image.""" +        text = "" + +        # Add title + tagline (if not empty) +        text += f"**{movie['title']}**\n" +        if movie["tagline"]: +            text += f"{movie['tagline']}\n\n" +        else: +            text += "\n" + +        # Add other information +        text += f"**Rating:** {movie['vote_average']}/10 :star:\n" +        text += f"**Release Date:** {movie['release_date']}\n\n" + +        text += "__**Production Information**__\n" + +        companies = movie["production_companies"] +        countries = movie["production_countries"] + +        text += f"**Made by:** {', '.join(company['name'] for company in companies)}\n" +        text += f"**Made in:** {', '.join(country['name'] for country in countries)}\n\n" + +        text += "__**Some Numbers**__\n" + +        budget = f"{movie['budget']:,d}" if movie['budget'] else "?" +        revenue = f"{movie['revenue']:,d}" if movie['revenue'] else "?" + +        if movie["runtime"] is not None: +            duration = divmod(movie["runtime"], 60) +        else: +            duration = ("?", "?") + +        text += f"**Budget:** ${budget}\n" +        text += f"**Revenue:** ${revenue}\n" +        text += f"**Duration:** {f'{duration[0]} hour(s) {duration[1]} minute(s)'}\n\n" + +        text += movie["overview"] + +        img = f"http://image.tmdb.org/t/p/w200{movie['poster_path']}" + +        # Return page content and image +        return text, img + +    async def get_embed(self, name: str) -> Embed: +        """Return embed of random movies. Uses name in title.""" +        embed = Embed(title=f"Random {name} Movies") +        embed.set_footer(text="This product uses the TMDb API but is not endorsed or certified by TMDb.") +        embed.set_thumbnail(url="https://i.imgur.com/LtFtC8H.png") +        return embed + + +def setup(bot: Bot) -> None: +    """Load the Movie Cog.""" +    bot.add_cog(Movie(bot)) diff --git a/bot/exts/fun/recommend_game.py b/bot/exts/fun/recommend_game.py new file mode 100644 index 00000000..42c9f7c2 --- /dev/null +++ b/bot/exts/fun/recommend_game.py @@ -0,0 +1,51 @@ +import json +import logging +from pathlib import Path +from random import shuffle + +import discord +from discord.ext import commands + +from bot.bot import Bot + +log = logging.getLogger(__name__) +game_recs = [] + +# Populate the list `game_recs` with resource files +for rec_path in Path("bot/resources/fun/game_recs").glob("*.json"): +    data = json.loads(rec_path.read_text("utf8")) +    game_recs.append(data) +shuffle(game_recs) + + +class RecommendGame(commands.Cog): +    """Commands related to recommending games.""" + +    def __init__(self, bot: Bot): +        self.bot = bot +        self.index = 0 + +    @commands.command(name="recommendgame", aliases=("gamerec",)) +    async def recommend_game(self, ctx: commands.Context) -> None: +        """Sends an Embed of a random game recommendation.""" +        if self.index >= len(game_recs): +            self.index = 0 +            shuffle(game_recs) +        game = game_recs[self.index] +        self.index += 1 + +        author = self.bot.get_user(int(game["author"])) + +        # Creating and formatting Embed +        embed = discord.Embed(color=discord.Colour.blue()) +        if author is not None: +            embed.set_author(name=author.name, icon_url=author.display_avatar.url) +        embed.set_image(url=game["image"]) +        embed.add_field(name=f"Recommendation: {game['title']}\n{game['link']}", value=game["description"]) + +        await ctx.send(embed=embed) + + +def setup(bot: Bot) -> None: +    """Loads the RecommendGame cog.""" +    bot.add_cog(RecommendGame(bot)) diff --git a/bot/exts/fun/rps.py b/bot/exts/fun/rps.py new file mode 100644 index 00000000..c6bbff46 --- /dev/null +++ b/bot/exts/fun/rps.py @@ -0,0 +1,57 @@ +from random import choice + +from discord.ext import commands + +from bot.bot import Bot + +CHOICES = ["rock", "paper", "scissors"] +SHORT_CHOICES = ["r", "p", "s"] + +# Using a dictionary instead of conditions to check for the winner. +WINNER_DICT = { +    "r": { +        "r": 0, +        "p": -1, +        "s": 1, +    }, +    "p": { +        "r": 1, +        "p": 0, +        "s": -1, +    }, +    "s": { +        "r": -1, +        "p": 1, +        "s": 0, +    } +} + + +class RPS(commands.Cog): +    """Rock Paper Scissors. The Classic Game!""" + +    @commands.command(case_insensitive=True) +    async def rps(self, ctx: commands.Context, move: str) -> None: +        """Play the classic game of Rock Paper Scissors with your own sir-lancebot!""" +        move = move.lower() +        player_mention = ctx.author.mention + +        if move not in CHOICES and move not in SHORT_CHOICES: +            raise commands.BadArgument(f"Invalid move. Please make move from options: {', '.join(CHOICES).upper()}.") + +        bot_move = choice(CHOICES) +        # value of player_result will be from (-1, 0, 1) as (lost, tied, won). +        player_result = WINNER_DICT[move[0]][bot_move[0]] + +        if player_result == 0: +            message_string = f"{player_mention} You and Sir Lancebot played {bot_move}, it's a tie." +            await ctx.send(message_string) +        elif player_result == 1: +            await ctx.send(f"Sir Lancebot played {bot_move}! {player_mention} won!") +        else: +            await ctx.send(f"Sir Lancebot played {bot_move}! {player_mention} lost!") + + +def setup(bot: Bot) -> None: +    """Load the RPS Cog.""" +    bot.add_cog(RPS(bot)) diff --git a/bot/exts/fun/snakes/__init__.py b/bot/exts/fun/snakes/__init__.py new file mode 100644 index 00000000..ba8333fd --- /dev/null +++ b/bot/exts/fun/snakes/__init__.py @@ -0,0 +1,11 @@ +import logging + +from bot.bot import Bot +from bot.exts.fun.snakes._snakes_cog import Snakes + +log = logging.getLogger(__name__) + + +def setup(bot: Bot) -> None: +    """Load the Snakes Cog.""" +    bot.add_cog(Snakes(bot)) diff --git a/bot/exts/fun/snakes/_converter.py b/bot/exts/fun/snakes/_converter.py new file mode 100644 index 00000000..c24ba8c6 --- /dev/null +++ b/bot/exts/fun/snakes/_converter.py @@ -0,0 +1,82 @@ +import json +import logging +import random +from collections.abc import Iterable + +import discord +from discord.ext.commands import Context, Converter +from rapidfuzz import fuzz + +from bot.exts.fun.snakes._utils import SNAKE_RESOURCES +from bot.utils import disambiguate + +log = logging.getLogger(__name__) + + +class Snake(Converter): +    """Snake converter for the Snakes Cog.""" + +    snakes = None +    special_cases = None + +    async def convert(self, ctx: Context, name: str) -> str: +        """Convert the input snake name to the closest matching Snake object.""" +        await self.build_list() +        name = name.lower() + +        if name == "python": +            return "Python (programming language)" + +        def get_potential(iterable: Iterable, *, threshold: int = 80) -> list[str]: +            nonlocal name +            potential = [] + +            for item in iterable: +                original, item = item, item.lower() + +                if name == item: +                    return [original] + +                a, b = fuzz.ratio(name, item), fuzz.partial_ratio(name, item) +                if a >= threshold or b >= threshold: +                    potential.append(original) + +            return potential + +        # Handle special cases +        if name.lower() in self.special_cases: +            return self.special_cases.get(name.lower(), name.lower()) + +        names = {snake["name"]: snake["scientific"] for snake in self.snakes} +        all_names = names.keys() | names.values() +        timeout = len(all_names) * (3 / 4) + +        embed = discord.Embed( +            title="Found multiple choices. Please choose the correct one.", colour=0x59982F) +        embed.set_author(name=ctx.author.display_name, icon_url=ctx.author.display_avatar.url) + +        name = await disambiguate(ctx, get_potential(all_names), timeout=timeout, embed=embed) +        return names.get(name, name) + +    @classmethod +    async def build_list(cls) -> None: +        """Build list of snakes from the static snake resources.""" +        # Get all the snakes +        if cls.snakes is None: +            cls.snakes = json.loads((SNAKE_RESOURCES / "snake_names.json").read_text("utf8")) +        # Get the special cases +        if cls.special_cases is None: +            special_cases = json.loads((SNAKE_RESOURCES / "special_snakes.json").read_text("utf8")) +            cls.special_cases = {snake["name"].lower(): snake for snake in special_cases} + +    @classmethod +    async def random(cls) -> str: +        """ +        Get a random Snake from the loaded resources. + +        This is stupid. We should find a way to somehow get the global session into a global context, +        so I can get it from here. +        """ +        await cls.build_list() +        names = [snake["scientific"] for snake in cls.snakes] +        return random.choice(names) diff --git a/bot/exts/fun/snakes/_snakes_cog.py b/bot/exts/fun/snakes/_snakes_cog.py new file mode 100644 index 00000000..59e57199 --- /dev/null +++ b/bot/exts/fun/snakes/_snakes_cog.py @@ -0,0 +1,1151 @@ +import asyncio +import colorsys +import logging +import os +import random +import re +import string +import textwrap +import urllib +from functools import partial +from io import BytesIO +from typing import Any, Optional + +import async_timeout +from PIL import Image, ImageDraw, ImageFont +from discord import Colour, Embed, File, Member, Message, Reaction +from discord.errors import HTTPException +from discord.ext.commands import Cog, CommandError, Context, bot_has_permissions, group + +from bot.bot import Bot +from bot.constants import ERROR_REPLIES, Tokens +from bot.exts.fun.snakes import _utils as utils +from bot.exts.fun.snakes._converter import Snake +from bot.utils.decorators import locked +from bot.utils.extensions import invoke_help_command + +log = logging.getLogger(__name__) + + +# region: Constants +# Color +SNAKE_COLOR = 0x399600 + +# Antidote constants +SYRINGE_EMOJI = "\U0001F489"  # :syringe: +PILL_EMOJI = "\U0001F48A"     # :pill: +HOURGLASS_EMOJI = "\u231B"    # :hourglass: +CROSSBONES_EMOJI = "\u2620"   # :skull_crossbones: +ALEMBIC_EMOJI = "\u2697"      # :alembic: +TICK_EMOJI = "\u2705"         # :white_check_mark: - Correct peg, correct hole +CROSS_EMOJI = "\u274C"        # :x: - Wrong peg, wrong hole +BLANK_EMOJI = "\u26AA"        # :white_circle: - Correct peg, wrong hole +HOLE_EMOJI = "\u2B1C"         # :white_square: - Used in guesses +EMPTY_UNICODE = "\u200b"      # literally just an empty space + +ANTIDOTE_EMOJI = ( +    SYRINGE_EMOJI, +    PILL_EMOJI, +    HOURGLASS_EMOJI, +    CROSSBONES_EMOJI, +    ALEMBIC_EMOJI, +) + +# Quiz constants +ANSWERS_EMOJI = { +    "a": "\U0001F1E6",  # :regional_indicator_a: 🇦 +    "b": "\U0001F1E7",  # :regional_indicator_b: 🇧 +    "c": "\U0001F1E8",  # :regional_indicator_c: 🇨 +    "d": "\U0001F1E9",  # :regional_indicator_d: 🇩 +} + +ANSWERS_EMOJI_REVERSE = { +    "\U0001F1E6": "A",  # :regional_indicator_a: 🇦 +    "\U0001F1E7": "B",  # :regional_indicator_b: 🇧 +    "\U0001F1E8": "C",  # :regional_indicator_c: 🇨 +    "\U0001F1E9": "D",  # :regional_indicator_d: 🇩 +} + +# Zzzen of pythhhon constant +ZEN = """ +Beautiful is better than ugly. +Explicit is better than implicit. +Simple is better than complex. +Complex is better than complicated. +Flat is better than nested. +Sparse is better than dense. +Readability counts. +Special cases aren't special enough to break the rules. +Although practicality beats purity. +Errors should never pass silently. +Unless explicitly silenced. +In the face of ambiguity, refuse the temptation to guess. +There should be one-- and preferably only one --obvious way to do it. +Now is better than never. +Although never is often better than *right* now. +If the implementation is hard to explain, it's a bad idea. +If the implementation is easy to explain, it may be a good idea. +""" + +# Max messages to train snake_chat on +MSG_MAX = 100 + +# get_snek constants +URL = "https://en.wikipedia.org/w/api.php?" + +# snake guess responses +INCORRECT_GUESS = ( +    "Nope, that's not what it is.", +    "Not quite.", +    "Not even close.", +    "Terrible guess.", +    "Nnnno.", +    "Dude. No.", +    "I thought everyone knew this one.", +    "Guess you suck at snakes.", +    "Bet you feel stupid now.", +    "Hahahaha, no.", +    "Did you hit the wrong key?" +) + +CORRECT_GUESS = ( +    "**WRONG**. Wait, no, actually you're right.", +    "Yeah, you got it!", +    "Yep, that's exactly what it is.", +    "Uh-huh. Yep yep yep.", +    "Yeah that's right.", +    "Yup. How did you know that?", +    "Are you a herpetologist?", +    "Sure, okay, but I bet you can't pronounce it.", +    "Are you cheating?" +) + +# snake card consts +CARD = { +    "top": Image.open("bot/resources/fun/snakes/snake_cards/card_top.png"), +    "frame": Image.open("bot/resources/fun/snakes/snake_cards/card_frame.png"), +    "bottom": Image.open("bot/resources/fun/snakes/snake_cards/card_bottom.png"), +    "backs": [ +        Image.open(f"bot/resources/fun/snakes/snake_cards/backs/{file}") +        for file in os.listdir("bot/resources/fun/snakes/snake_cards/backs") +    ], +    "font": ImageFont.truetype("bot/resources/fun/snakes/snake_cards/expressway.ttf", 20) +} +# endregion + + +class Snakes(Cog): +    """ +    Commands related to snakes, created by our community during the first code jam. + +    More information can be found in the code-jam-1 repo. + +    https://github.com/python-discord/code-jam-1 +    """ + +    wiki_brief = re.compile(r"(.*?)(=+ (.*?) =+)", flags=re.DOTALL) +    valid_image_extensions = ("gif", "png", "jpeg", "jpg", "webp") + +    def __init__(self, bot: Bot): +        self.active_sal = {} +        self.bot = bot +        self.snake_names = utils.get_resource("snake_names") +        self.snake_idioms = utils.get_resource("snake_idioms") +        self.snake_quizzes = utils.get_resource("snake_quiz") +        self.snake_facts = utils.get_resource("snake_facts") +        self.num_movie_pages = None + +    # region: Helper methods +    @staticmethod +    def _beautiful_pastel(hue: float) -> int: +        """Returns random bright pastels.""" +        light = random.uniform(0.7, 0.85) +        saturation = 1 + +        rgb = colorsys.hls_to_rgb(hue, light, saturation) +        hex_rgb = "" + +        for part in rgb: +            value = int(part * 0xFF) +            hex_rgb += f"{value:02x}" + +        return int(hex_rgb, 16) + +    @staticmethod +    def _generate_card(buffer: BytesIO, content: dict) -> BytesIO: +        """ +        Generate a card from snake information. + +        Written by juan and Someone during the first code jam. +        """ +        snake = Image.open(buffer) + +        # Get the size of the snake icon, configure the height of the image box (yes, it changes) +        icon_width = 347  # Hardcoded, not much i can do about that +        icon_height = int((icon_width / snake.width) * snake.height) +        frame_copies = icon_height // CARD["frame"].height + 1 +        snake.thumbnail((icon_width, icon_height)) + +        # Get the dimensions of the final image +        main_height = icon_height + CARD["top"].height + CARD["bottom"].height +        main_width = CARD["frame"].width + +        # Start creating the foreground +        foreground = Image.new("RGBA", (main_width, main_height), (0, 0, 0, 0)) +        foreground.paste(CARD["top"], (0, 0)) + +        # Generate the frame borders to the correct height +        for offset in range(frame_copies): +            position = (0, CARD["top"].height + offset * CARD["frame"].height) +            foreground.paste(CARD["frame"], position) + +        # Add the image and bottom part of the image +        foreground.paste(snake, (36, CARD["top"].height))  # Also hardcoded :( +        foreground.paste(CARD["bottom"], (0, CARD["top"].height + icon_height)) + +        # Setup the background +        back = random.choice(CARD["backs"]) +        back_copies = main_height // back.height + 1 +        full_image = Image.new("RGBA", (main_width, main_height), (0, 0, 0, 0)) + +        # Generate the tiled background +        for offset in range(back_copies): +            full_image.paste(back, (16, 16 + offset * back.height)) + +        # Place the foreground onto the final image +        full_image.paste(foreground, (0, 0), foreground) + +        # Get the first two sentences of the info +        description = ".".join(content["info"].split(".")[:2]) + "." + +        # Setup positioning variables +        margin = 36 +        offset = CARD["top"].height + icon_height + margin + +        # Create blank rectangle image which will be behind the text +        rectangle = Image.new( +            "RGBA", +            (main_width, main_height), +            (0, 0, 0, 0) +        ) + +        # Draw a semi-transparent rectangle on it +        rect = ImageDraw.Draw(rectangle) +        rect.rectangle( +            (margin, offset, main_width - margin, main_height - margin), +            fill=(63, 63, 63, 128) +        ) + +        # Paste it onto the final image +        full_image.paste(rectangle, (0, 0), mask=rectangle) + +        # Draw the text onto the final image +        draw = ImageDraw.Draw(full_image) +        for line in textwrap.wrap(description, 36): +            draw.text([margin + 4, offset], line, font=CARD["font"]) +            offset += CARD["font"].getsize(line)[1] + +        # Get the image contents as a BufferIO object +        buffer = BytesIO() +        full_image.save(buffer, "PNG") +        buffer.seek(0) + +        return buffer + +    @staticmethod +    def _snakify(message: str) -> str: +        """Sssnakifffiesss a sstring.""" +        # Replace fricatives with exaggerated snake fricatives. +        simple_fricatives = [ +            "f", "s", "z", "h", +            "F", "S", "Z", "H", +        ] +        complex_fricatives = [ +            "th", "sh", "Th", "Sh" +        ] + +        for letter in simple_fricatives: +            if letter.islower(): +                message = message.replace(letter, letter * random.randint(2, 4)) +            else: +                message = message.replace(letter, (letter * random.randint(2, 4)).title()) + +        for fricative in complex_fricatives: +            message = message.replace(fricative, fricative[0] + fricative[1] * random.randint(2, 4)) + +        return message + +    async def _fetch(self, url: str, params: Optional[dict] = None) -> dict: +        """Asynchronous web request helper method.""" +        if params is None: +            params = {} + +        async with async_timeout.timeout(10): +            async with self.bot.http_session.get(url, params=params) as response: +                return await response.json() + +    def _get_random_long_message(self, messages: list[str], retries: int = 10) -> str: +        """ +        Fetch a message that's at least 3 words long, if possible to do so in retries attempts. + +        Else, just return whatever the last message is. +        """ +        long_message = random.choice(messages) +        if len(long_message.split()) < 3 and retries > 0: +            return self._get_random_long_message( +                messages, +                retries=retries - 1 +            ) + +        return long_message + +    async def _get_snek(self, name: str) -> dict[str, Any]: +        """ +        Fetches all the data from a wikipedia article about a snake. + +        Builds a dict that the .get() method can use. + +        Created by Ava and eivl. +        """ +        snake_info = {} + +        params = { +            "format": "json", +            "action": "query", +            "list": "search", +            "srsearch": name, +            "utf8": "", +            "srlimit": "1", +        } + +        json = await self._fetch(URL, params=params) + +        # Wikipedia does have a error page +        try: +            pageid = json["query"]["search"][0]["pageid"] +        except KeyError: +            # Wikipedia error page ID(?) +            pageid = 41118 +        except IndexError: +            return None + +        params = { +            "format": "json", +            "action": "query", +            "prop": "extracts|images|info", +            "exlimit": "max", +            "explaintext": "", +            "inprop": "url", +            "pageids": pageid +        } + +        json = await self._fetch(URL, params=params) + +        # Constructing dict - handle exceptions later +        try: +            snake_info["title"] = json["query"]["pages"][f"{pageid}"]["title"] +            snake_info["extract"] = json["query"]["pages"][f"{pageid}"]["extract"] +            snake_info["images"] = json["query"]["pages"][f"{pageid}"]["images"] +            snake_info["fullurl"] = json["query"]["pages"][f"{pageid}"]["fullurl"] +            snake_info["pageid"] = json["query"]["pages"][f"{pageid}"]["pageid"] +        except KeyError: +            snake_info["error"] = True + +        if snake_info["images"]: +            i_url = "https://commons.wikimedia.org/wiki/Special:FilePath/" +            image_list = [] +            map_list = [] +            thumb_list = [] + +            # Wikipedia has arbitrary images that are not snakes +            banned = [ +                "Commons-logo.svg", +                "Red%20Pencil%20Icon.png", +                "distribution", +                "The%20Death%20of%20Cleopatra%20arthur.jpg", +                "Head%20of%20holotype", +                "locator", +                "Woma.png", +                "-map.", +                ".svg", +                "ange.", +                "Adder%20(PSF).png" +            ] + +            for image in snake_info["images"]: +                # Images come in the format of `File:filename.extension` +                file, sep, filename = image["title"].partition(":") +                filename = filename.replace(" ", "%20")  # Wikipedia returns good data! + +                if not filename.startswith("Map"): +                    if any(ban in filename for ban in banned): +                        pass +                    else: +                        image_list.append(f"{i_url}{filename}") +                        thumb_list.append(f"{i_url}{filename}?width=100") +                else: +                    map_list.append(f"{i_url}{filename}") + +        snake_info["image_list"] = image_list +        snake_info["map_list"] = map_list +        snake_info["thumb_list"] = thumb_list +        snake_info["name"] = name + +        match = self.wiki_brief.match(snake_info["extract"]) +        info = match.group(1) if match else None + +        if info: +            info = info.replace("\n", "\n\n")  # Give us some proper paragraphs. + +        snake_info["info"] = info + +        return snake_info + +    async def _get_snake_name(self) -> dict[str, str]: +        """Gets a random snake name.""" +        return random.choice(self.snake_names) + +    async def _validate_answer(self, ctx: Context, message: Message, answer: str, options: dict[str, str]) -> None: +        """Validate the answer using a reaction event loop.""" +        def predicate(reaction: Reaction, user: Member) -> bool: +            """Test if the the answer is valid and can be evaluated.""" +            return ( +                reaction.message.id == message.id                  # The reaction is attached to the question we asked. +                and user == ctx.author                             # It's the user who triggered the quiz. +                and str(reaction.emoji) in ANSWERS_EMOJI.values()  # The reaction is one of the options. +            ) + +        for emoji in ANSWERS_EMOJI.values(): +            await message.add_reaction(emoji) + +        # Validate the answer +        try: +            reaction, user = await ctx.bot.wait_for("reaction_add", timeout=45.0, check=predicate) +        except asyncio.TimeoutError: +            await ctx.send(f"You took too long. The correct answer was **{options[answer]}**.") +            await message.clear_reactions() +            return + +        if str(reaction.emoji) == ANSWERS_EMOJI[answer]: +            await ctx.send(f"{random.choice(CORRECT_GUESS)} The correct answer was **{options[answer]}**.") +        else: +            await ctx.send( +                f"{random.choice(INCORRECT_GUESS)} The correct answer was **{options[answer]}**." +            ) + +        await message.clear_reactions() +    # endregion + +    # region: Commands +    @group(name="snakes", aliases=("snake",), invoke_without_command=True) +    async def snakes_group(self, ctx: Context) -> None: +        """Commands from our first code jam.""" +        await invoke_help_command(ctx) + +    @bot_has_permissions(manage_messages=True) +    @snakes_group.command(name="antidote") +    @locked() +    async def antidote_command(self, ctx: Context) -> None: +        """ +        Antidote! Can you create the antivenom before the patient dies? + +        Rules:  You have 4 ingredients for each antidote, you only have 10 attempts +                Once you synthesize the antidote, you will be presented with 4 markers +                Tick: This means you have a CORRECT ingredient in the CORRECT position +                Circle: This means you have a CORRECT ingredient in the WRONG position +                Cross: This means you have a WRONG ingredient in the WRONG position + +        Info:   The game automatically ends after 5 minutes inactivity. +                You should only use each ingredient once. + +        This game was created by Lord Bisk and Runew0lf. +        """ +        def predicate(reaction_: Reaction, user_: Member) -> bool: +            """Make sure that this reaction is what we want to operate on.""" +            return ( +                all(( +                    # Reaction is on this message +                    reaction_.message.id == board_id.id, +                    # Reaction is one of the pagination emotes +                    reaction_.emoji in ANTIDOTE_EMOJI, +                    # Reaction was not made by the Bot +                    user_.id != self.bot.user.id, +                    # Reaction was made by author +                    user_.id == ctx.author.id +                )) +            ) + +        # Initialize variables +        antidote_tries = 0 +        antidote_guess_count = 0 +        antidote_guess_list = [] +        guess_result = [] +        board = [] +        page_guess_list = [] +        page_result_list = [] +        win = False + +        antidote_embed = Embed(color=SNAKE_COLOR, title="Antidote") +        antidote_embed.set_author(name=ctx.author.name, icon_url=ctx.author.display_avatar.url) + +        # Generate answer +        antidote_answer = list(ANTIDOTE_EMOJI)  # Duplicate list, not reference it +        random.shuffle(antidote_answer) +        antidote_answer.pop() + +        # Begin initial board building +        for i in range(0, 10): +            page_guess_list.append(f"{HOLE_EMOJI} {HOLE_EMOJI} {HOLE_EMOJI} {HOLE_EMOJI}") +            page_result_list.append(f"{CROSS_EMOJI} {CROSS_EMOJI} {CROSS_EMOJI} {CROSS_EMOJI}") +            board.append( +                f"`{i+1:02d}` " +                f"{page_guess_list[i]} - " +                f"{page_result_list[i]}" +            ) +            board.append(EMPTY_UNICODE) +        antidote_embed.add_field(name="10 guesses remaining", value="\n".join(board)) +        board_id = await ctx.send(embed=antidote_embed)  # Display board + +        # Add our player reactions +        for emoji in ANTIDOTE_EMOJI: +            await board_id.add_reaction(emoji) + +        # Begin main game loop +        while not win and antidote_tries < 10: +            try: +                reaction, user = await ctx.bot.wait_for( +                    "reaction_add", timeout=300, check=predicate) +            except asyncio.TimeoutError: +                log.debug("Antidote timed out waiting for a reaction") +                break  # We're done, no reactions for the last 5 minutes + +            if antidote_tries < 10: +                if antidote_guess_count < 4: +                    if reaction.emoji in ANTIDOTE_EMOJI: +                        antidote_guess_list.append(reaction.emoji) +                        antidote_guess_count += 1 + +                    if antidote_guess_count == 4:  # Guesses complete +                        antidote_guess_count = 0 +                        page_guess_list[antidote_tries] = " ".join(antidote_guess_list) + +                        # Now check guess +                        for i in range(0, len(antidote_answer)): +                            if antidote_guess_list[i] == antidote_answer[i]: +                                guess_result.append(TICK_EMOJI) +                            elif antidote_guess_list[i] in antidote_answer: +                                guess_result.append(BLANK_EMOJI) +                            else: +                                guess_result.append(CROSS_EMOJI) +                        guess_result.sort() +                        page_result_list[antidote_tries] = " ".join(guess_result) + +                        # Rebuild the board +                        board = [] +                        for i in range(0, 10): +                            board.append(f"`{i+1:02d}` " +                                         f"{page_guess_list[i]} - " +                                         f"{page_result_list[i]}") +                            board.append(EMPTY_UNICODE) + +                        # Remove Reactions +                        for emoji in antidote_guess_list: +                            await board_id.remove_reaction(emoji, user) + +                        if antidote_guess_list == antidote_answer: +                            win = True + +                        antidote_tries += 1 +                        guess_result = [] +                        antidote_guess_list = [] + +                        antidote_embed.clear_fields() +                        antidote_embed.add_field(name=f"{10 - antidote_tries} " +                                                      f"guesses remaining", +                                                 value="\n".join(board)) +                        # Redisplay the board +                        await board_id.edit(embed=antidote_embed) + +        # Winning / Ending Screen +        if win is True: +            antidote_embed = Embed(color=SNAKE_COLOR, title="Antidote") +            antidote_embed.set_author(name=ctx.author.name, icon_url=ctx.author.display_avatar.url) +            antidote_embed.set_image(url="https://i.makeagif.com/media/7-12-2015/Cj1pts.gif") +            antidote_embed.add_field(name="You have created the snake antidote!", +                                     value=f"The solution was: {' '.join(antidote_answer)}\n" +                                           f"You had {10 - antidote_tries} tries remaining.") +            await board_id.edit(embed=antidote_embed) +        else: +            antidote_embed = Embed(color=SNAKE_COLOR, title="Antidote") +            antidote_embed.set_author(name=ctx.author.name, icon_url=ctx.author.display_avatar.url) +            antidote_embed.set_image(url="https://media.giphy.com/media/ceeN6U57leAhi/giphy.gif") +            antidote_embed.add_field( +                name=EMPTY_UNICODE, +                value=( +                    f"Sorry you didnt make the antidote in time.\n" +                    f"The formula was {' '.join(antidote_answer)}" +                ) +            ) +            await board_id.edit(embed=antidote_embed) + +        log.debug("Ending pagination and removing all reactions...") +        await board_id.clear_reactions() + +    @snakes_group.command(name="draw") +    async def draw_command(self, ctx: Context) -> None: +        """ +        Draws a random snek using Perlin noise. + +        Written by Momo and kel. +        Modified by juan and lemon. +        """ +        with ctx.typing(): + +            # Generate random snake attributes +            width = random.randint(6, 10) +            length = random.randint(15, 22) +            random_hue = random.random() +            snek_color = self._beautiful_pastel(random_hue) +            text_color = self._beautiful_pastel((random_hue + 0.5) % 1) +            bg_color = ( +                random.randint(32, 50), +                random.randint(32, 50), +                random.randint(50, 70), +            ) + +            # Build and send the snek +            text = random.choice(self.snake_idioms)["idiom"] +            factory = utils.PerlinNoiseFactory(dimension=1, octaves=2) +            image_frame = utils.create_snek_frame( +                factory, +                snake_width=width, +                snake_length=length, +                snake_color=snek_color, +                text=text, +                text_color=text_color, +                bg_color=bg_color +            ) +            png_bytes = utils.frame_to_png_bytes(image_frame) +            file = File(png_bytes, filename="snek.png") +            await ctx.send(file=file) + +    @snakes_group.command(name="get") +    @bot_has_permissions(manage_messages=True) +    @locked() +    async def get_command(self, ctx: Context, *, name: Snake = None) -> None: +        """ +        Fetches information about a snake from Wikipedia. + +        Created by Ava and eivl. +        """ +        with ctx.typing(): +            if name is None: +                name = await Snake.random() + +            if isinstance(name, dict): +                data = name +            else: +                data = await self._get_snek(name) + +            if data.get("error"): +                await ctx.send("Could not fetch data from Wikipedia.") +                return + +            description = data["info"] + +            # Shorten the description if needed +            if len(description) > 1000: +                description = description[:1000] +                last_newline = description.rfind("\n") +                if last_newline > 0: +                    description = description[:last_newline] + +            # Strip and add the Wiki link. +            if "fullurl" in data: +                description = description.strip("\n") +                description += f"\n\nRead more on [Wikipedia]({data['fullurl']})" + +            # Build and send the embed. +            embed = Embed( +                title=data.get("title", data.get("name")), +                description=description, +                colour=0x59982F, +            ) + +            emoji = "https://emojipedia-us.s3.amazonaws.com/thumbs/60/google/3/snake_1f40d.png" + +            _iter = ( +                url +                for url in data["image_list"] +                if url.endswith(self.valid_image_extensions) +            ) +            image = next(_iter, emoji) + +            embed.set_image(url=image) + +            await ctx.send(embed=embed) + +    @snakes_group.command(name="guess", aliases=("identify",)) +    @locked() +    async def guess_command(self, ctx: Context) -> None: +        """ +        Snake identifying game. + +        Made by Ava and eivl. +        Modified by lemon. +        """ +        with ctx.typing(): + +            image = None + +            while image is None: +                snakes = [await Snake.random() for _ in range(4)] +                snake = random.choice(snakes) +                answer = "abcd"[snakes.index(snake)] + +                data = await self._get_snek(snake) + +                _iter = ( +                    url +                    for url in data["image_list"] +                    if url.endswith(self.valid_image_extensions) +                ) +                image = next(_iter, None) + +            embed = Embed( +                title="Which of the following is the snake in the image?", +                description="\n".join( +                    f"{'ABCD'[snakes.index(snake)]}: {snake}" for snake in snakes), +                colour=SNAKE_COLOR +            ) +            embed.set_image(url=image) + +        guess = await ctx.send(embed=embed) +        options = {f"{'abcd'[snakes.index(snake)]}": snake for snake in snakes} +        await self._validate_answer(ctx, guess, answer, options) + +    @snakes_group.command(name="hatch") +    async def hatch_command(self, ctx: Context) -> None: +        """ +        Hatches your personal snake. + +        Written by Momo and kel. +        """ +        # Pick a random snake to hatch. +        snake_name = random.choice(list(utils.snakes.keys())) +        snake_image = utils.snakes[snake_name] + +        # Hatch the snake +        message = await ctx.send(embed=Embed(description="Hatching your snake :snake:...")) +        await asyncio.sleep(1) + +        for stage in utils.stages: +            hatch_embed = Embed(description=stage) +            await message.edit(embed=hatch_embed) +            await asyncio.sleep(1) +        await asyncio.sleep(1) +        await message.delete() + +        # Build and send the embed. +        my_snake_embed = Embed(description=":tada: Congrats! You hatched: **{0}**".format(snake_name)) +        my_snake_embed.set_thumbnail(url=snake_image) +        my_snake_embed.set_footer( +            text=" Owner: {0}#{1}".format(ctx.author.name, ctx.author.discriminator) +        ) + +        await ctx.send(embed=my_snake_embed) + +    @snakes_group.command(name="movie") +    async def movie_command(self, ctx: Context) -> None: +        """ +        Gets a random snake-related movie from TMDB. + +        Written by Samuel. +        Modified by gdude. +        Modified by Will Da Silva. +        """ +        # Initially 8 pages are fetched. The actual number of pages is set after the first request. +        page = random.randint(1, self.num_movie_pages or 8) + +        async with ctx.typing(): +            response = await self.bot.http_session.get( +                "https://api.themoviedb.org/3/search/movie", +                params={ +                    "query": "snake", +                    "page": page, +                    "language": "en-US", +                    "api_key": Tokens.tmdb, +                } +            ) +            data = await response.json() +            if self.num_movie_pages is None: +                self.num_movie_pages = data["total_pages"] +            movie = random.choice(data["results"])["id"] + +            response = await self.bot.http_session.get( +                f"https://api.themoviedb.org/3/movie/{movie}", +                params={ +                    "language": "en-US", +                    "api_key": Tokens.tmdb, +                } +            ) +            data = await response.json() + +        embed = Embed(title=data["title"], color=SNAKE_COLOR) + +        if data["poster_path"] is not None: +            embed.set_image(url=f"https://images.tmdb.org/t/p/original{data['poster_path']}") + +        if data["overview"]: +            embed.add_field(name="Overview", value=data["overview"]) + +        if data["release_date"]: +            embed.add_field(name="Release Date", value=data["release_date"]) + +        if data["genres"]: +            embed.add_field(name="Genres", value=", ".join([x["name"] for x in data["genres"]])) + +        if data["vote_count"]: +            embed.add_field(name="Rating", value=f"{data['vote_average']}/10 ({data['vote_count']} votes)", inline=True) + +        if data["budget"] and data["revenue"]: +            embed.add_field(name="Budget", value=data["budget"], inline=True) +            embed.add_field(name="Revenue", value=data["revenue"], inline=True) + +        embed.set_footer(text="This product uses the TMDb API but is not endorsed or certified by TMDb.") +        embed.set_thumbnail(url="https://i.imgur.com/LtFtC8H.png") + +        try: +            await ctx.send(embed=embed) +        except HTTPException as err: +            await ctx.send("An error occurred while fetching a snake-related movie!") +            raise err from None + +    @snakes_group.command(name="quiz") +    @locked() +    async def quiz_command(self, ctx: Context) -> None: +        """ +        Asks a snake-related question in the chat and validates the user's guess. + +        This was created by Mushy and Cardium, +        and modified by Urthas and lemon. +        """ +        # Prepare a question. +        question = random.choice(self.snake_quizzes) +        answer = question["answerkey"] +        options = {key: question["options"][key] for key in ANSWERS_EMOJI.keys()} + +        # Build and send the embed. +        embed = Embed( +            color=SNAKE_COLOR, +            title=question["question"], +            description="\n".join( +                [f"**{key.upper()}**: {answer}" for key, answer in options.items()] +            ) +        ) + +        quiz = await ctx.send(embed=embed) +        await self._validate_answer(ctx, quiz, answer, options) + +    @snakes_group.command(name="name", aliases=("name_gen",)) +    async def name_command(self, ctx: Context, *, name: str = None) -> None: +        """ +        Snakifies a username. + +        Slices the users name at the last vowel (or second last if the name +        ends with a vowel), and then combines it with a random snake name, +        which is sliced at the first vowel (or second if the name starts with +        a vowel). + +        If the name contains no vowels, it just appends the snakename +        to the end of the name. + +        Examples: +            lemon + anaconda = lemoconda +            krzsn + anaconda = krzsnconda +            gdude + anaconda = gduconda +            aperture + anaconda = apertuconda +            lucy + python = luthon +            joseph + taipan = joseipan + +        This was written by Iceman, and modified for inclusion into the bot by lemon. +        """ +        snake_name = await self._get_snake_name() +        snake_name = snake_name["name"] +        snake_prefix = "" + +        # Set aside every word in the snake name except the last. +        if " " in snake_name: +            snake_prefix = " ".join(snake_name.split()[:-1]) +            snake_name = snake_name.split()[-1] + +        # If no name is provided, use whoever called the command. +        if name: +            user_name = name +        else: +            user_name = ctx.author.display_name + +        # Get the index of the vowel to slice the username at +        user_slice_index = len(user_name) +        for index, char in enumerate(reversed(user_name)): +            if index == 0: +                continue +            if char.lower() in "aeiouy": +                user_slice_index -= index +                break + +        # Now, get the index of the vowel to slice the snake_name at +        snake_slice_index = 0 +        for index, char in enumerate(snake_name): +            if index == 0: +                continue +            if char.lower() in "aeiouy": +                snake_slice_index = index + 1 +                break + +        # Combine! +        snake_name = snake_name[snake_slice_index:] +        user_name = user_name[:user_slice_index] +        result = f"{snake_prefix} {user_name}{snake_name}" +        result = string.capwords(result) + +        # Embed and send +        embed = Embed( +            title="Snake name", +            description=f"Your snake-name is **{result}**", +            color=SNAKE_COLOR +        ) + +        await ctx.send(embed=embed) +        return + +    @snakes_group.command(name="sal") +    @locked() +    async def sal_command(self, ctx: Context) -> None: +        """ +        Play a game of Snakes and Ladders. + +        Written by Momo and kel. +        Modified by lemon. +        """ +        # Check if there is already a game in this channel +        if ctx.channel in self.active_sal: +            await ctx.send(f"{ctx.author.mention} A game is already in progress in this channel.") +            return + +        game = utils.SnakeAndLaddersGame(snakes=self, context=ctx) +        self.active_sal[ctx.channel] = game + +        await game.open_game() + +    @snakes_group.command(name="about") +    async def about_command(self, ctx: Context) -> None: +        """Show an embed with information about the event, its participants, and its winners.""" +        contributors = [ +            "<@!245270749919576066>", +            "<@!396290259907903491>", +            "<@!172395097705414656>", +            "<@!361708843425726474>", +            "<@!300302216663793665>", +            "<@!210248051430916096>", +            "<@!174588005745557505>", +            "<@!87793066227822592>", +            "<@!211619754039967744>", +            "<@!97347867923976192>", +            "<@!136081839474343936>", +            "<@!263560579770220554>", +            "<@!104749643715387392>", +            "<@!303940835005825024>", +        ] + +        embed = Embed( +            title="About the snake cog", +            description=( +                "The features in this cog were created by members of the community " +                "during our first ever " +                "[code jam event](https://pythondiscord.com/pages/code-jams/code-jam-1-snakes-bot/). \n\n" +                "The event saw over 50 participants, who competed to write a discord bot cog with a snake theme over " +                "48 hours. The staff then selected the best features from all the best teams, and made modifications " +                "to ensure they would all work together before integrating them into the community bot.\n\n" +                "It was a tight race, but in the end, <@!104749643715387392> and <@!303940835005825024> " +                f"walked away as grand champions. Make sure you check out `{ctx.prefix}snakes sal`," +                f"`{ctx.prefix}snakes draw` and `{ctx.prefix}snakes hatch` " +                "to see what they came up with." +            ) +        ) + +        embed.add_field( +            name="Contributors", +            value=( +                ", ".join(contributors) +            ) +        ) + +        await ctx.send(embed=embed) + +    @snakes_group.command(name="card") +    async def card_command(self, ctx: Context, *, name: Snake = None) -> None: +        """ +        Create an interesting little card from a snake. + +        Created by juan and Someone during the first code jam. +        """ +        # Get the snake data we need +        if not name: +            name_obj = await self._get_snake_name() +            name = name_obj["scientific"] +            content = await self._get_snek(name) + +        elif isinstance(name, dict): +            content = name + +        else: +            content = await self._get_snek(name) + +        # Make the card +        async with ctx.typing(): + +            stream = BytesIO() +            async with async_timeout.timeout(10): +                async with self.bot.http_session.get(content["image_list"][0]) as response: +                    stream.write(await response.read()) + +            stream.seek(0) + +            func = partial(self._generate_card, stream, content) +            final_buffer = await self.bot.loop.run_in_executor(None, func) + +        # Send it! +        await ctx.send( +            f"A wild {content['name'].title()} appears!", +            file=File(final_buffer, filename=content["name"].replace(" ", "") + ".png") +        ) + +    @snakes_group.command(name="fact") +    async def fact_command(self, ctx: Context) -> None: +        """ +        Gets a snake-related fact. + +        Written by Andrew and Prithaj. +        Modified by lemon. +        """ +        question = random.choice(self.snake_facts)["fact"] +        embed = Embed( +            title="Snake fact", +            color=SNAKE_COLOR, +            description=question +        ) +        await ctx.send(embed=embed) + +    @snakes_group.command(name="snakify") +    async def snakify_command(self, ctx: Context, *, message: str = None) -> None: +        """ +        How would I talk if I were a snake? + +        If `message` is passed, the bot will snakify the message. +        Otherwise, a random message from the user's history is snakified. + +        Written by Momo and kel. +        Modified by lemon. +        """ +        with ctx.typing(): +            embed = Embed() +            user = ctx.author + +            if not message: + +                # Get a random message from the users history +                messages = [] +                async for message in ctx.history(limit=500).filter( +                        lambda msg: msg.author == ctx.author  # Message was sent by author. +                ): +                    messages.append(message.content) + +                message = self._get_random_long_message(messages) + +            # Build and send the embed +            embed.set_author( +                name=f"{user.name}#{user.discriminator}", +                icon_url=user.display_avatar.url, +            ) +            embed.description = f"*{self._snakify(message)}*" + +            await ctx.send(embed=embed) + +    @snakes_group.command(name="video", aliases=("get_video",)) +    async def video_command(self, ctx: Context, *, search: str = None) -> None: +        """ +        Gets a YouTube video about snakes. + +        If `search` is given, a snake with that name will be searched on Youtube. + +        Written by Andrew and Prithaj. +        """ +        # Are we searching for anything specific? +        if search: +            query = search + " snake" +        else: +            snake = await self._get_snake_name() +            query = snake["name"] + +        # Build the URL and make the request +        url = "https://www.googleapis.com/youtube/v3/search" +        response = await self.bot.http_session.get( +            url, +            params={ +                "part": "snippet", +                "q": urllib.parse.quote_plus(query), +                "type": "video", +                "key": Tokens.youtube +            } +        ) +        response = await response.json() +        data = response.get("items", []) + +        # Send the user a video +        if len(data) > 0: +            num = random.randint(0, len(data) - 1) +            youtube_base_url = "https://www.youtube.com/watch?v=" +            await ctx.send( +                content=f"{youtube_base_url}{data[num]['id']['videoId']}" +            ) +        else: +            log.warning(f"YouTube API error. Full response looks like {response}") + +    @snakes_group.command(name="zen") +    async def zen_command(self, ctx: Context) -> None: +        """ +        Gets a random quote from the Zen of Python, except as if spoken by a snake. + +        Written by Prithaj and Andrew. +        Modified by lemon. +        """ +        embed = Embed( +            title="Zzzen of Pythhon", +            color=SNAKE_COLOR +        ) + +        # Get the zen quote and snakify it +        zen_quote = random.choice(ZEN.splitlines()) +        zen_quote = self._snakify(zen_quote) + +        # Embed and send +        embed.description = zen_quote +        await ctx.send( +            embed=embed +        ) +    # endregion + +    # region: Error handlers +    @card_command.error +    async def command_error(self, ctx: Context, error: CommandError) -> None: +        """Local error handler for the Snake Cog.""" +        original_error = getattr(error, "original", None) +        if isinstance(original_error, OSError): +            error.handled = True +            embed = Embed() +            embed.colour = Colour.red() +            log.error(f"snake_card encountered an OSError: {error} ({original_error})") +            embed.description = "Could not generate the snake card! Please try again." +            embed.title = random.choice(ERROR_REPLIES) +            await ctx.send(embed=embed) diff --git a/bot/exts/fun/snakes/_utils.py b/bot/exts/fun/snakes/_utils.py new file mode 100644 index 00000000..de51339d --- /dev/null +++ b/bot/exts/fun/snakes/_utils.py @@ -0,0 +1,721 @@ +import asyncio +import io +import json +import logging +import math +import random +from itertools import product +from pathlib import Path + +from PIL import Image +from PIL.ImageDraw import ImageDraw +from discord import File, Member, Reaction +from discord.ext.commands import Cog, Context + +from bot.constants import Roles + +SNAKE_RESOURCES = Path("bot/resources/fun/snakes").absolute() + +h1 = r"""``` +   ---- +  ------ +/--------\ +|--------| +|--------| + \------/ +   ---- +```""" +h2 = r"""``` +   ---- +  ------ +/---\-/--\ +|-----\--| +|--------| + \------/ +   ---- +```""" +h3 = r"""``` +   ---- +  ------ +/---\-/--\ +|-----\--| +|-----/--| + \----\-/ +   ---- +```""" +h4 = r"""``` +   ----- +  -----  \ +/--|  /---\ +|--\  -\---| +|--\--/--  / + \------- / +   ------ +```""" +stages = [h1, h2, h3, h4] +snakes = { +    "Baby Python": "https://i.imgur.com/SYOcmSa.png", +    "Baby Rattle Snake": "https://i.imgur.com/i5jYA8f.png", +    "Baby Dragon Snake": "https://i.imgur.com/SuMKM4m.png", +    "Baby Garden Snake": "https://i.imgur.com/5vYx3ah.png", +    "Baby Cobra": "https://i.imgur.com/jk14ryt.png", +    "Baby Anaconda": "https://i.imgur.com/EpdrnNr.png", +} + +BOARD_TILE_SIZE = 56         # the size of each board tile +BOARD_PLAYER_SIZE = 20       # the size of each player icon +BOARD_MARGIN = (10, 0)       # margins, in pixels (for player icons) +# The size of the image to download +# Should a power of 2 and higher than BOARD_PLAYER_SIZE +PLAYER_ICON_IMAGE_SIZE = 32 +MAX_PLAYERS = 4              # depends on the board size/quality, 4 is for the default board + +# board definition (from, to) +BOARD = { +    # ladders +    2: 38, +    7: 14, +    8: 31, +    15: 26, +    21: 42, +    28: 84, +    36: 44, +    51: 67, +    71: 91, +    78: 98, +    87: 94, + +    # snakes +    99: 80, +    95: 75, +    92: 88, +    89: 68, +    74: 53, +    64: 60, +    62: 19, +    49: 11, +    46: 25, +    16: 6 +} + +DEFAULT_SNAKE_COLOR = 0x15c7ea +DEFAULT_BACKGROUND_COLOR = 0 +DEFAULT_IMAGE_DIMENSIONS = (200, 200) +DEFAULT_SNAKE_LENGTH = 22 +DEFAULT_SNAKE_WIDTH = 8 +DEFAULT_SEGMENT_LENGTH_RANGE = (7, 10) +DEFAULT_IMAGE_MARGINS = (50, 50) +DEFAULT_TEXT = "snek\nit\nup" +DEFAULT_TEXT_POSITION = ( +    10, +    10 +) +DEFAULT_TEXT_COLOR = 0xf2ea15 +X = 0 +Y = 1 +ANGLE_RANGE = math.pi * 2 + + +def get_resource(file: str) -> list[dict]: +    """Load Snake resources JSON.""" +    return json.loads((SNAKE_RESOURCES / f"{file}.json").read_text("utf-8")) + + +def smoothstep(t: float) -> float: +    """Smooth curve with a zero derivative at 0 and 1, making it useful for interpolating.""" +    return t * t * (3. - 2. * t) + + +def lerp(t: float, a: float, b: float) -> float: +    """Linear interpolation between a and b, given a fraction t.""" +    return a + t * (b - a) + + +class PerlinNoiseFactory(object): +    """ +    Callable that produces Perlin noise for an arbitrary point in an arbitrary number of dimensions. + +    The underlying grid is aligned with the integers. + +    There is no limit to the coordinates used; new gradients are generated on the fly as necessary. + +    Taken from: https://gist.github.com/eevee/26f547457522755cb1fb8739d0ea89a1 +    Licensed under ISC +    """ + +    def __init__(self, dimension: int, octaves: int = 1, tile: tuple[int, ...] = (), unbias: bool = False): +        """ +        Create a new Perlin noise factory in the given number of dimensions. + +        dimension should be an integer and at least 1. + +        More octaves create a foggier and more-detailed noise pattern.  More than 4 octaves is rather excessive. + +        ``tile`` can be used to make a seamlessly tiling pattern. +        For example: +            pnf = PerlinNoiseFactory(2, tile=(0, 3)) + +        This will produce noise that tiles every 3 units vertically, but never tiles horizontally. + +        If ``unbias`` is True, the smoothstep function will be applied to the output before returning +        it, to counteract some of Perlin noise's significant bias towards the center of its output range. +        """ +        self.dimension = dimension +        self.octaves = octaves +        self.tile = tile + (0,) * dimension +        self.unbias = unbias + +        # For n dimensions, the range of Perlin noise is ±sqrt(n)/2; multiply +        # by this to scale to ±1 +        self.scale_factor = 2 * dimension ** -0.5 + +        self.gradient = {} + +    def _generate_gradient(self) -> tuple[float, ...]: +        """ +        Generate a random unit vector at each grid point. + +        This is the "gradient" vector, in that the grid tile slopes towards it +        """ +        # 1 dimension is special, since the only unit vector is trivial; +        # instead, use a slope between -1 and 1 +        if self.dimension == 1: +            return (random.uniform(-1, 1),) + +        # Generate a random point on the surface of the unit n-hypersphere; +        # this is the same as a random unit vector in n dimensions.  Thanks +        # to: http://mathworld.wolfram.com/SpherePointPicking.html +        # Pick n normal random variables with stddev 1 +        random_point = [random.gauss(0, 1) for _ in range(self.dimension)] +        # Then scale the result to a unit vector +        scale = sum(n * n for n in random_point) ** -0.5 +        return tuple(coord * scale for coord in random_point) + +    def get_plain_noise(self, *point) -> float: +        """Get plain noise for a single point, without taking into account either octaves or tiling.""" +        if len(point) != self.dimension: +            raise ValueError( +                f"Expected {self.dimension} values, got {len(point)}" +            ) + +        # Build a list of the (min, max) bounds in each dimension +        grid_coords = [] +        for coord in point: +            min_coord = math.floor(coord) +            max_coord = min_coord + 1 +            grid_coords.append((min_coord, max_coord)) + +        # Compute the dot product of each gradient vector and the point's +        # distance from the corresponding grid point.  This gives you each +        # gradient's "influence" on the chosen point. +        dots = [] +        for grid_point in product(*grid_coords): +            if grid_point not in self.gradient: +                self.gradient[grid_point] = self._generate_gradient() +            gradient = self.gradient[grid_point] + +            dot = 0 +            for i in range(self.dimension): +                dot += gradient[i] * (point[i] - grid_point[i]) +            dots.append(dot) + +        # Interpolate all those dot products together.  The interpolation is +        # done with smoothstep to smooth out the slope as you pass from one +        # grid cell into the next. +        # Due to the way product() works, dot products are ordered such that +        # the last dimension alternates: (..., min), (..., max), etc.  So we +        # can interpolate adjacent pairs to "collapse" that last dimension.  Then +        # the results will alternate in their second-to-last dimension, and so +        # forth, until we only have a single value left. +        dim = self.dimension +        while len(dots) > 1: +            dim -= 1 +            s = smoothstep(point[dim] - grid_coords[dim][0]) + +            next_dots = [] +            while dots: +                next_dots.append(lerp(s, dots.pop(0), dots.pop(0))) + +            dots = next_dots + +        return dots[0] * self.scale_factor + +    def __call__(self, *point) -> float: +        """ +        Get the value of this Perlin noise function at the given point. + +        The number of values given should match the number of dimensions. +        """ +        ret = 0 +        for o in range(self.octaves): +            o2 = 1 << o +            new_point = [] +            for i, coord in enumerate(point): +                coord *= o2 +                if self.tile[i]: +                    coord %= self.tile[i] * o2 +                new_point.append(coord) +            ret += self.get_plain_noise(*new_point) / o2 + +        # Need to scale n back down since adding all those extra octaves has +        # probably expanded it beyond ±1 +        # 1 octave: ±1 +        # 2 octaves: ±1½ +        # 3 octaves: ±1¾ +        ret /= 2 - 2 ** (1 - self.octaves) + +        if self.unbias: +            # The output of the plain Perlin noise algorithm has a fairly +            # strong bias towards the center due to the central limit theorem +            # -- in fact the top and bottom 1/8 virtually never happen.  That's +            # a quarter of our entire output range!  If only we had a function +            # in [0..1] that could introduce a bias towards the endpoints... +            r = (ret + 1) / 2 +            # Doing it this many times is a completely made-up heuristic. +            for _ in range(int(self.octaves / 2 + 0.5)): +                r = smoothstep(r) +            ret = r * 2 - 1 + +        return ret + + +def create_snek_frame( +        perlin_factory: PerlinNoiseFactory, perlin_lookup_vertical_shift: float = 0, +        image_dimensions: tuple[int, int] = DEFAULT_IMAGE_DIMENSIONS, +        image_margins: tuple[int, int] = DEFAULT_IMAGE_MARGINS, +        snake_length: int = DEFAULT_SNAKE_LENGTH, +        snake_color: int = DEFAULT_SNAKE_COLOR, bg_color: int = DEFAULT_BACKGROUND_COLOR, +        segment_length_range: tuple[int, int] = DEFAULT_SEGMENT_LENGTH_RANGE, snake_width: int = DEFAULT_SNAKE_WIDTH, +        text: str = DEFAULT_TEXT, text_position: tuple[float, float] = DEFAULT_TEXT_POSITION, +        text_color: int = DEFAULT_TEXT_COLOR +) -> Image.Image: +    """ +    Creates a single random snek frame using Perlin noise. + +    `perlin_lookup_vertical_shift` represents the Perlin noise shift in the Y-dimension for this frame. +    If `text` is given, display the given text with the snek. +    """ +    start_x = random.randint(image_margins[X], image_dimensions[X] - image_margins[X]) +    start_y = random.randint(image_margins[Y], image_dimensions[Y] - image_margins[Y]) +    points: list[tuple[float, float]] = [(start_x, start_y)] + +    for index in range(0, snake_length): +        angle = perlin_factory.get_plain_noise( +            ((1 / (snake_length + 1)) * (index + 1)) + perlin_lookup_vertical_shift +        ) * ANGLE_RANGE +        current_point = points[index] +        segment_length = random.randint(segment_length_range[0], segment_length_range[1]) +        points.append(( +            current_point[X] + segment_length * math.cos(angle), +            current_point[Y] + segment_length * math.sin(angle) +        )) + +    # normalize bounds +    min_dimensions: list[float] = [start_x, start_y] +    max_dimensions: list[float] = [start_x, start_y] +    for point in points: +        min_dimensions[X] = min(point[X], min_dimensions[X]) +        min_dimensions[Y] = min(point[Y], min_dimensions[Y]) +        max_dimensions[X] = max(point[X], max_dimensions[X]) +        max_dimensions[Y] = max(point[Y], max_dimensions[Y]) + +    # shift towards middle +    dimension_range = (max_dimensions[X] - min_dimensions[X], max_dimensions[Y] - min_dimensions[Y]) +    shift = ( +        image_dimensions[X] / 2 - (dimension_range[X] / 2 + min_dimensions[X]), +        image_dimensions[Y] / 2 - (dimension_range[Y] / 2 + min_dimensions[Y]) +    ) + +    image = Image.new(mode="RGB", size=image_dimensions, color=bg_color) +    draw = ImageDraw(image) +    for index in range(1, len(points)): +        point = points[index] +        previous = points[index - 1] +        draw.line( +            ( +                shift[X] + previous[X], +                shift[Y] + previous[Y], +                shift[X] + point[X], +                shift[Y] + point[Y] +            ), +            width=snake_width, +            fill=snake_color +        ) +    if text is not None: +        draw.multiline_text(text_position, text, fill=text_color) +    del draw +    return image + + +def frame_to_png_bytes(image: Image) -> io.BytesIO: +    """Convert image to byte stream.""" +    stream = io.BytesIO() +    image.save(stream, format="PNG") +    stream.seek(0) +    return stream + + +log = logging.getLogger(__name__) +START_EMOJI = "\u2611"     # :ballot_box_with_check: - Start the game +CANCEL_EMOJI = "\u274C"    # :x: - Cancel or leave the game +ROLL_EMOJI = "\U0001F3B2"  # :game_die: - Roll the die! +JOIN_EMOJI = "\U0001F64B"  # :raising_hand: - Join the game. +STARTUP_SCREEN_EMOJI = [ +    JOIN_EMOJI, +    START_EMOJI, +    CANCEL_EMOJI +] +GAME_SCREEN_EMOJI = [ +    ROLL_EMOJI, +    CANCEL_EMOJI +] + + +class SnakeAndLaddersGame: +    """Snakes and Ladders game Cog.""" + +    def __init__(self, snakes: Cog, context: Context): +        self.snakes = snakes +        self.ctx = context +        self.channel = self.ctx.channel +        self.state = "booting" +        self.started = False +        self.author = self.ctx.author +        self.players = [] +        self.player_tiles = {} +        self.round_has_rolled = {} +        self.avatar_images = {} +        self.board = None +        self.positions = None +        self.rolls = [] + +    async def open_game(self) -> None: +        """ +        Create a new Snakes and Ladders game. + +        Listen for reactions until players have joined, and the game has been started. +        """ +        def startup_event_check(reaction_: Reaction, user_: Member) -> bool: +            """Make sure that this reaction is what we want to operate on.""" +            return ( +                all(( +                    reaction_.message.id == startup.id,       # Reaction is on startup message +                    reaction_.emoji in STARTUP_SCREEN_EMOJI,  # Reaction is one of the startup emotes +                    user_.id != self.ctx.bot.user.id,         # Reaction was not made by the bot +                )) +            ) + +        # Check to see if the bot can remove reactions +        if not self.channel.permissions_for(self.ctx.guild.me).manage_messages: +            log.warning( +                "Unable to start Snakes and Ladders - " +                f"Missing manage_messages permissions in {self.channel}" +            ) +            return + +        await self._add_player(self.author) +        await self.channel.send( +            "**Snakes and Ladders**: A new game is about to start!", +            file=File( +                str(SNAKE_RESOURCES / "snakes_and_ladders" / "banner.jpg"), +                filename="Snakes and Ladders.jpg" +            ) +        ) +        startup = await self.channel.send( +            f"Press {JOIN_EMOJI} to participate, and press " +            f"{START_EMOJI} to start the game" +        ) +        for emoji in STARTUP_SCREEN_EMOJI: +            await startup.add_reaction(emoji) + +        self.state = "waiting" + +        while not self.started: +            try: +                reaction, user = await self.ctx.bot.wait_for( +                    "reaction_add", +                    timeout=300, +                    check=startup_event_check +                ) +                if reaction.emoji == JOIN_EMOJI: +                    await self.player_join(user) +                elif reaction.emoji == CANCEL_EMOJI: +                    if user == self.author or (self._is_moderator(user) and user not in self.players): +                        # Allow game author or non-playing moderation staff to cancel a waiting game +                        await self.cancel_game() +                        return +                    else: +                        await self.player_leave(user) +                elif reaction.emoji == START_EMOJI: +                    if self.ctx.author == user: +                        self.started = True +                        await self.start_game(user) +                        await startup.delete() +                        break + +                await startup.remove_reaction(reaction.emoji, user) + +            except asyncio.TimeoutError: +                log.debug("Snakes and Ladders timed out waiting for a reaction") +                await self.cancel_game() +                return  # We're done, no reactions for the last 5 minutes + +    async def _add_player(self, user: Member) -> None: +        """Add player to game.""" +        self.players.append(user) +        self.player_tiles[user.id] = 1 + +        avatar_bytes = await user.display_avatar.replace(size=PLAYER_ICON_IMAGE_SIZE).read() +        im = Image.open(io.BytesIO(avatar_bytes)).resize((BOARD_PLAYER_SIZE, BOARD_PLAYER_SIZE)) +        self.avatar_images[user.id] = im + +    async def player_join(self, user: Member) -> None: +        """ +        Handle players joining the game. + +        Prevent player joining if they have already joined, if the game is full, or if the game is +        in a waiting state. +        """ +        for p in self.players: +            if user == p: +                await self.channel.send(user.mention + " You are already in the game.", delete_after=10) +                return +        if self.state != "waiting": +            await self.channel.send(user.mention + " You cannot join at this time.", delete_after=10) +            return +        if len(self.players) is MAX_PLAYERS: +            await self.channel.send(user.mention + " The game is full!", delete_after=10) +            return + +        await self._add_player(user) + +        await self.channel.send( +            f"**Snakes and Ladders**: {user.mention} has joined the game.\n" +            f"There are now {str(len(self.players))} players in the game.", +            delete_after=10 +        ) + +    async def player_leave(self, user: Member) -> bool: +        """ +        Handle players leaving the game. + +        Leaving is prevented if the user wasn't part of the game. + +        If the number of players reaches 0, the game is terminated. In this case, a sentinel boolean +        is returned True to prevent a game from continuing after it's destroyed. +        """ +        is_surrendered = False  # Sentinel value to assist with stopping a surrendered game +        for p in self.players: +            if user == p: +                self.players.remove(p) +                self.player_tiles.pop(p.id, None) +                self.round_has_rolled.pop(p.id, None) +                await self.channel.send( +                    "**Snakes and Ladders**: " + user.mention + " has left the game.", +                    delete_after=10 +                ) + +                if self.state != "waiting" and len(self.players) == 0: +                    await self.channel.send("**Snakes and Ladders**: The game has been surrendered!") +                    is_surrendered = True +                    self._destruct() + +                return is_surrendered +        else: +            await self.channel.send(user.mention + " You are not in the match.", delete_after=10) +            return is_surrendered + +    async def cancel_game(self) -> None: +        """Cancel the running game.""" +        await self.channel.send("**Snakes and Ladders**: Game has been canceled.") +        self._destruct() + +    async def start_game(self, user: Member) -> None: +        """ +        Allow the game author to begin the game. + +        The game cannot be started if the game is in a waiting state. +        """ +        if not user == self.author: +            await self.channel.send(user.mention + " Only the author of the game can start it.", delete_after=10) +            return + +        if not self.state == "waiting": +            await self.channel.send(user.mention + " The game cannot be started at this time.", delete_after=10) +            return + +        self.state = "starting" +        player_list = ", ".join(user.mention for user in self.players) +        await self.channel.send("**Snakes and Ladders**: The game is starting!\nPlayers: " + player_list) +        await self.start_round() + +    async def start_round(self) -> None: +        """Begin the round.""" +        def game_event_check(reaction_: Reaction, user_: Member) -> bool: +            """Make sure that this reaction is what we want to operate on.""" +            return ( +                all(( +                    reaction_.message.id == self.positions.id,  # Reaction is on positions message +                    reaction_.emoji in GAME_SCREEN_EMOJI,       # Reaction is one of the game emotes +                    user_.id != self.ctx.bot.user.id,           # Reaction was not made by the bot +                )) +            ) + +        self.state = "roll" +        for user in self.players: +            self.round_has_rolled[user.id] = False +        board_img = Image.open(SNAKE_RESOURCES / "snakes_and_ladders" / "board.jpg") +        player_row_size = math.ceil(MAX_PLAYERS / 2) + +        for i, player in enumerate(self.players): +            tile = self.player_tiles[player.id] +            tile_coordinates = self._board_coordinate_from_index(tile) +            x_offset = BOARD_MARGIN[0] + tile_coordinates[0] * BOARD_TILE_SIZE +            y_offset = \ +                BOARD_MARGIN[1] + ( +                    (10 * BOARD_TILE_SIZE) - (9 - tile_coordinates[1]) * BOARD_TILE_SIZE - BOARD_PLAYER_SIZE) +            x_offset += BOARD_PLAYER_SIZE * (i % player_row_size) +            y_offset -= BOARD_PLAYER_SIZE * math.floor(i / player_row_size) +            board_img.paste(self.avatar_images[player.id], +                            box=(x_offset, y_offset)) + +        board_file = File(frame_to_png_bytes(board_img), filename="Board.jpg") +        player_list = "\n".join((user.mention + ": Tile " + str(self.player_tiles[user.id])) for user in self.players) + +        # Store and send new messages +        temp_board = await self.channel.send( +            "**Snakes and Ladders**: A new round has started! Current board:", +            file=board_file +        ) +        temp_positions = await self.channel.send( +            f"**Current positions**:\n{player_list}\n\nUse {ROLL_EMOJI} to roll the dice!" +        ) + +        # Delete the previous messages +        if self.board and self.positions: +            await self.board.delete() +            await self.positions.delete() + +        # remove the roll messages +        for roll in self.rolls: +            await roll.delete() +        self.rolls = [] + +        # Save new messages +        self.board = temp_board +        self.positions = temp_positions + +        # Wait for rolls +        for emoji in GAME_SCREEN_EMOJI: +            await self.positions.add_reaction(emoji) + +        is_surrendered = False +        while True: +            try: +                reaction, user = await self.ctx.bot.wait_for( +                    "reaction_add", +                    timeout=300, +                    check=game_event_check +                ) + +                if reaction.emoji == ROLL_EMOJI: +                    await self.player_roll(user) +                elif reaction.emoji == CANCEL_EMOJI: +                    if self._is_moderator(user) and user not in self.players: +                        # Only allow non-playing moderation staff to cancel a running game +                        await self.cancel_game() +                        return +                    else: +                        is_surrendered = await self.player_leave(user) + +                await self.positions.remove_reaction(reaction.emoji, user) + +                if self._check_all_rolled(): +                    break + +            except asyncio.TimeoutError: +                log.debug("Snakes and Ladders timed out waiting for a reaction") +                await self.cancel_game() +                return  # We're done, no reactions for the last 5 minutes + +        # Round completed +        # Check to see if the game was surrendered before completing the round, without this +        # sentinel, the game object would be deleted but the next round still posted into purgatory +        if not is_surrendered: +            await self._complete_round() + +    async def player_roll(self, user: Member) -> None: +        """Handle the player's roll.""" +        if user.id not in self.player_tiles: +            await self.channel.send(user.mention + " You are not in the match.", delete_after=10) +            return +        if self.state != "roll": +            await self.channel.send(user.mention + " You may not roll at this time.", delete_after=10) +            return +        if self.round_has_rolled[user.id]: +            return +        roll = random.randint(1, 6) +        self.rolls.append(await self.channel.send(f"{user.mention} rolled a **{roll}**!")) +        next_tile = self.player_tiles[user.id] + roll + +        # apply snakes and ladders +        if next_tile in BOARD: +            target = BOARD[next_tile] +            if target < next_tile: +                await self.channel.send( +                    f"{user.mention} slips on a snake and falls back to **{target}**", +                    delete_after=15 +                ) +            else: +                await self.channel.send( +                    f"{user.mention} climbs a ladder to **{target}**", +                    delete_after=15 +                ) +            next_tile = target + +        self.player_tiles[user.id] = min(100, next_tile) +        self.round_has_rolled[user.id] = True + +    async def _complete_round(self) -> None: +        """At the conclusion of a round check to see if there's been a winner.""" +        self.state = "post_round" + +        # check for winner +        winner = self._check_winner() +        if winner is None: +            # there is no winner, start the next round +            await self.start_round() +            return + +        # announce winner and exit +        await self.channel.send("**Snakes and Ladders**: " + winner.mention + " has won the game! :tada:") +        self._destruct() + +    def _check_winner(self) -> Member: +        """Return a winning member if we're in the post-round state and there's a winner.""" +        if self.state != "post_round": +            return None +        return next((player for player in self.players if self.player_tiles[player.id] == 100), +                    None) + +    def _check_all_rolled(self) -> bool: +        """Check if all members have made their roll.""" +        return all(rolled for rolled in self.round_has_rolled.values()) + +    def _destruct(self) -> None: +        """Clean up the finished game object.""" +        del self.snakes.active_sal[self.channel] + +    def _board_coordinate_from_index(self, index: int) -> tuple[int, int]: +        """Convert the tile number to the x/y coordinates for graphical purposes.""" +        y_level = 9 - math.floor((index - 1) / 10) +        is_reversed = math.floor((index - 1) / 10) % 2 != 0 +        x_level = (index - 1) % 10 +        if is_reversed: +            x_level = 9 - x_level +        return x_level, y_level + +    @staticmethod +    def _is_moderator(user: Member) -> bool: +        """Return True if the user is a Moderator.""" +        return any(Roles.moderator == role.id for role in user.roles) diff --git a/bot/exts/fun/space.py b/bot/exts/fun/space.py new file mode 100644 index 00000000..48ad0f96 --- /dev/null +++ b/bot/exts/fun/space.py @@ -0,0 +1,236 @@ +import logging +import random +from datetime import date, datetime +from typing import Any, Optional +from urllib.parse import urlencode + +from discord import Embed +from discord.ext import tasks +from discord.ext.commands import Cog, Context, group + +from bot.bot import Bot +from bot.constants import Tokens +from bot.utils.converters import DateConverter +from bot.utils.extensions import invoke_help_command + +logger = logging.getLogger(__name__) + +NASA_BASE_URL = "https://api.nasa.gov" +NASA_IMAGES_BASE_URL = "https://images-api.nasa.gov" +NASA_EPIC_BASE_URL = "https://epic.gsfc.nasa.gov" + +APOD_MIN_DATE = date(1995, 6, 16) + + +class Space(Cog): +    """Space Cog contains commands, that show images, facts or other information about space.""" + +    def __init__(self, bot: Bot): +        self.http_session = bot.http_session + +        self.rovers = {} +        self.get_rovers.start() + +    def cog_unload(self) -> None: +        """Cancel `get_rovers` task when Cog will unload.""" +        self.get_rovers.cancel() + +    @tasks.loop(hours=24) +    async def get_rovers(self) -> None: +        """Get listing of rovers from NASA API and info about their start and end dates.""" +        data = await self.fetch_from_nasa("mars-photos/api/v1/rovers") + +        for rover in data["rovers"]: +            self.rovers[rover["name"].lower()] = { +                "min_date": rover["landing_date"], +                "max_date": rover["max_date"], +                "max_sol": rover["max_sol"] +            } + +    @group(name="space", invoke_without_command=True) +    async def space(self, ctx: Context) -> None: +        """Head command that contains commands about space.""" +        await invoke_help_command(ctx) + +    @space.command(name="apod") +    async def apod(self, ctx: Context, date: Optional[str]) -> None: +        """ +        Get Astronomy Picture of Day from NASA API. Date is optional parameter, what formatting is YYYY-MM-DD. + +        If date is not specified, this will get today APOD. +        """ +        params = {} +        # Parse date to params, when provided. Show error message when invalid formatting +        if date: +            try: +                apod_date = datetime.strptime(date, "%Y-%m-%d").date() +            except ValueError: +                await ctx.send(f"Invalid date {date}. Please make sure your date is in format YYYY-MM-DD.") +                return + +            now = datetime.now().date() +            if APOD_MIN_DATE > apod_date or now < apod_date: +                await ctx.send(f"Date must be between {APOD_MIN_DATE.isoformat()} and {now.isoformat()} (today).") +                return + +            params["date"] = apod_date.isoformat() + +        result = await self.fetch_from_nasa("planetary/apod", params) + +        await ctx.send( +            embed=self.create_nasa_embed( +                f"Astronomy Picture of the Day - {result['date']}", +                result["explanation"], +                result["url"] +            ) +        ) + +    @space.command(name="nasa") +    async def nasa(self, ctx: Context, *, search_term: Optional[str]) -> None: +        """Get random NASA information/facts + image. Support `search_term` parameter for more specific search.""" +        params = { +            "media_type": "image" +        } +        if search_term: +            params["q"] = search_term + +        # Don't use API key, no need for this. +        data = await self.fetch_from_nasa("search", params, NASA_IMAGES_BASE_URL, use_api_key=False) +        if len(data["collection"]["items"]) == 0: +            await ctx.send(f"Can't find any items with search term `{search_term}`.") +            return + +        item = random.choice(data["collection"]["items"]) + +        await ctx.send( +            embed=self.create_nasa_embed( +                item["data"][0]["title"], +                item["data"][0]["description"], +                item["links"][0]["href"] +            ) +        ) + +    @space.command(name="epic") +    async def epic(self, ctx: Context, date: Optional[str]) -> None: +        """Get a random image of the Earth from the NASA EPIC API. Support date parameter, format is YYYY-MM-DD.""" +        if date: +            try: +                show_date = datetime.strptime(date, "%Y-%m-%d").date().isoformat() +            except ValueError: +                await ctx.send(f"Invalid date {date}. Please make sure your date is in format YYYY-MM-DD.") +                return +        else: +            show_date = None + +        # Don't use API key, no need for this. +        data = await self.fetch_from_nasa( +            f"api/natural{f'/date/{show_date}' if show_date else ''}", +            base=NASA_EPIC_BASE_URL, +            use_api_key=False +        ) +        if len(data) < 1: +            await ctx.send("Can't find any images in this date.") +            return + +        item = random.choice(data) + +        year, month, day = item["date"].split(" ")[0].split("-") +        image_url = f"{NASA_EPIC_BASE_URL}/archive/natural/{year}/{month}/{day}/jpg/{item['image']}.jpg" + +        await ctx.send( +            embed=self.create_nasa_embed( +                "Earth Image", item["caption"], image_url, f" \u2022 Identifier: {item['identifier']}" +            ) +        ) + +    @space.group(name="mars", invoke_without_command=True) +    async def mars( +        self, +        ctx: Context, +        date: Optional[DateConverter], +        rover: str = "curiosity" +    ) -> None: +        """ +        Get random Mars image by date. Support both SOL (martian solar day) and earth date and rovers. + +        Earth date formatting is YYYY-MM-DD. Use `.space mars dates` to get all currently available rovers. +        """ +        rover = rover.lower() +        if rover not in self.rovers: +            await ctx.send( +                ( +                    f"Invalid rover `{rover}`.\n" +                    f"**Rovers:** `{'`, `'.join(f'{r.capitalize()}' for r in self.rovers)}`" +                ) +            ) +            return + +        # When date not provided, get random SOL date between 0 and rover's max. +        if date is None: +            date = random.randint(0, self.rovers[rover]["max_sol"]) + +        params = {} +        if isinstance(date, int): +            params["sol"] = date +        else: +            params["earth_date"] = date.date().isoformat() + +        result = await self.fetch_from_nasa(f"mars-photos/api/v1/rovers/{rover}/photos", params) +        if len(result["photos"]) < 1: +            err_msg = ( +                f"We can't find result in date " +                f"{date.date().isoformat() if isinstance(date, datetime) else f'{date} SOL'}.\n" +                f"**Note:** Dates must match with rover's working dates. Please use `{ctx.prefix}space mars dates` to " +                "see working dates for each rover." +            ) +            await ctx.send(err_msg) +            return + +        item = random.choice(result["photos"]) +        await ctx.send( +            embed=self.create_nasa_embed( +                f"{item['rover']['name']}'s {item['camera']['full_name']} Mars Image", "", item["img_src"], +            ) +        ) + +    @mars.command(name="dates", aliases=("d", "date", "rover", "rovers", "r")) +    async def dates(self, ctx: Context) -> None: +        """Get current available rovers photo date ranges.""" +        await ctx.send("\n".join( +            f"**{r.capitalize()}:** {i['min_date']} **-** {i['max_date']}" for r, i in self.rovers.items() +        )) + +    async def fetch_from_nasa( +        self, +        endpoint: str, +        additional_params: Optional[dict[str, Any]] = None, +        base: Optional[str] = NASA_BASE_URL, +        use_api_key: bool = True +    ) -> dict[str, Any]: +        """Fetch information from NASA API, return result.""" +        params = {} +        if use_api_key: +            params["api_key"] = Tokens.nasa + +        # Add additional parameters to request parameters only when they provided by user +        if additional_params is not None: +            params.update(additional_params) + +        async with self.http_session.get(url=f"{base}/{endpoint}?{urlencode(params)}") as resp: +            return await resp.json() + +    def create_nasa_embed(self, title: str, description: str, image: str, footer: Optional[str] = "") -> Embed: +        """Generate NASA commands embeds. Required: title, description and image URL, footer (addition) is optional.""" +        return Embed( +            title=title, +            description=description +        ).set_image(url=image).set_footer(text="Powered by NASA API" + footer) + + +def setup(bot: Bot) -> None: +    """Load the Space cog.""" +    if not Tokens.nasa: +        logger.warning("Can't find NASA API key. Not loading Space Cog.") +        return + +    bot.add_cog(Space(bot)) diff --git a/bot/exts/fun/speedrun.py b/bot/exts/fun/speedrun.py new file mode 100644 index 00000000..c2966ce1 --- /dev/null +++ b/bot/exts/fun/speedrun.py @@ -0,0 +1,26 @@ +import json +import logging +from pathlib import Path +from random import choice + +from discord.ext import commands + +from bot.bot import Bot + +log = logging.getLogger(__name__) + +LINKS = json.loads(Path("bot/resources/fun/speedrun_links.json").read_text("utf8")) + + +class Speedrun(commands.Cog): +    """Commands about the video game speedrunning community.""" + +    @commands.command(name="speedrun") +    async def get_speedrun(self, ctx: commands.Context) -> None: +        """Sends a link to a video of a random speedrun.""" +        await ctx.send(choice(LINKS)) + + +def setup(bot: Bot) -> None: +    """Load the Speedrun cog.""" +    bot.add_cog(Speedrun()) diff --git a/bot/exts/fun/status_codes.py b/bot/exts/fun/status_codes.py new file mode 100644 index 00000000..501cbe0a --- /dev/null +++ b/bot/exts/fun/status_codes.py @@ -0,0 +1,87 @@ +from random import choice + +import discord +from discord.ext import commands + +from bot.bot import Bot + +HTTP_DOG_URL = "https://httpstatusdogs.com/img/{code}.jpg" +HTTP_CAT_URL = "https://http.cat/{code}.jpg" +STATUS_TEMPLATE = "**Status: {code}**" +ERR_404 = "Unable to find status floof for {code}." +ERR_UNKNOWN = "Error attempting to retrieve status floof for {code}." +ERROR_LENGTH_EMBED = discord.Embed( +    title="Input status code does not exist", +    description="The range of valid status codes is 100 to 599", +) + + +class HTTPStatusCodes(commands.Cog): +    """ +    Fetch an image depicting HTTP status codes as a dog or a cat. + +    If neither animal is selected a cat or dog is chosen randomly for the given status code. +    """ + +    def __init__(self, bot: Bot): +        self.bot = bot + +    @commands.group( +        name="http_status", +        aliases=("status", "httpstatus"), +        invoke_without_command=True, +    ) +    async def http_status_group(self, ctx: commands.Context, code: int) -> None: +        """Choose a cat or dog randomly for the given status code.""" +        subcmd = choice((self.http_cat, self.http_dog)) +        await subcmd(ctx, code) + +    @http_status_group.command(name="cat") +    async def http_cat(self, ctx: commands.Context, code: int) -> None: +        """Send a cat version of the requested HTTP status code.""" +        if code in range(100, 600): +            await self.build_embed(url=HTTP_CAT_URL.format(code=code), ctx=ctx, code=code) +            return +        await ctx.send(embed=ERROR_LENGTH_EMBED) + +    @http_status_group.command(name="dog") +    async def http_dog(self, ctx: commands.Context, code: int) -> None: +        """Send a dog version of the requested HTTP status code.""" +        if code in range(100, 600): +            await self.build_embed(url=HTTP_DOG_URL.format(code=code), ctx=ctx, code=code) +            return +        await ctx.send(embed=ERROR_LENGTH_EMBED) + +    async def build_embed(self, url: str, ctx: commands.Context, code: int) -> None: +        """Attempt to build and dispatch embed. Append error message instead if something goes wrong.""" +        async with self.bot.http_session.get(url, allow_redirects=False) as response: +            if response.status in range(200, 300): +                await ctx.send( +                    embed=discord.Embed( +                        title=STATUS_TEMPLATE.format(code=code) +                    ).set_image(url=url) +                ) +            elif response.status in (302, 404):  # dog URL returns 302 instead of 404 +                if "dog" in url: +                    await ctx.send( +                        embed=discord.Embed( +                            title=ERR_404.format(code=code) +                        ).set_image(url="https://httpstatusdogs.com/img/404.jpg") +                    ) +                    return +                await ctx.send( +                    embed=discord.Embed( +                        title=ERR_404.format(code=code) +                    ).set_image(url="https://http.cat/404.jpg") +                ) +            else: +                await ctx.send( +                    embed=discord.Embed( +                        title=STATUS_TEMPLATE.format(code=code) +                    ).set_footer(text=ERR_UNKNOWN.format(code=code)) +                ) + + +def setup(bot: Bot) -> None: +    """Load the HTTPStatusCodes cog.""" +    bot.add_cog(HTTPStatusCodes(bot)) diff --git a/bot/exts/fun/tic_tac_toe.py b/bot/exts/fun/tic_tac_toe.py new file mode 100644 index 00000000..5c4f8051 --- /dev/null +++ b/bot/exts/fun/tic_tac_toe.py @@ -0,0 +1,335 @@ +import asyncio +import random +from typing import Callable, Optional, Union + +import discord +from discord.ext.commands import Cog, Context, check, group, guild_only + +from bot.bot import Bot +from bot.constants import Emojis +from bot.utils.pagination import LinePaginator + +CONFIRMATION_MESSAGE = ( +    "{opponent}, {requester} wants to play Tic-Tac-Toe against you." +    f"\nReact to this message with {Emojis.confirmation} to accept or with {Emojis.decline} to decline." +) + + +def check_win(board: dict[int, str]) -> bool: +    """Check from board, is any player won game.""" +    return any( +        ( +            # Horizontal +            board[1] == board[2] == board[3], +            board[4] == board[5] == board[6], +            board[7] == board[8] == board[9], +            # Vertical +            board[1] == board[4] == board[7], +            board[2] == board[5] == board[8], +            board[3] == board[6] == board[9], +            # Diagonal +            board[1] == board[5] == board[9], +            board[3] == board[5] == board[7], +        ) +    ) + + +class Player: +    """Class that contains information about player and functions that interact with player.""" + +    def __init__(self, user: discord.User, ctx: Context, symbol: str): +        self.user = user +        self.ctx = ctx +        self.symbol = symbol + +    async def get_move(self, board: dict[int, str], msg: discord.Message) -> tuple[bool, Optional[int]]: +        """ +        Get move from user. + +        Return is timeout reached and position of field what user will fill when timeout don't reach. +        """ +        def check_for_move(r: discord.Reaction, u: discord.User) -> bool: +            """Check does user who reacted is user who we want, message is board and emoji is in board values.""" +            return ( +                u.id == self.user.id +                and msg.id == r.message.id +                and r.emoji in board.values() +                and r.emoji in Emojis.number_emojis.values() +            ) + +        try: +            react, _ = await self.ctx.bot.wait_for("reaction_add", timeout=30.0, check=check_for_move) +        except asyncio.TimeoutError: +            return True, None +        else: +            return False, list(Emojis.number_emojis.keys())[list(Emojis.number_emojis.values()).index(react.emoji)] + +    def __str__(self) -> str: +        """Return mention of user.""" +        return self.user.mention + + +class AI: +    """Tic Tac Toe AI class for against computer gaming.""" + +    def __init__(self, symbol: str): +        self.symbol = symbol + +    async def get_move(self, board: dict[int, str], _: discord.Message) -> tuple[bool, int]: +        """Get move from AI. AI use Minimax strategy.""" +        possible_moves = [i for i, emoji in board.items() if emoji in list(Emojis.number_emojis.values())] + +        for symbol in (Emojis.o_square, Emojis.x_square): +            for move in possible_moves: +                board_copy = board.copy() +                board_copy[move] = symbol +                if check_win(board_copy): +                    return False, move + +        open_corners = [i for i in possible_moves if i in (1, 3, 7, 9)] +        if len(open_corners) > 0: +            return False, random.choice(open_corners) + +        if 5 in possible_moves: +            return False, 5 + +        open_edges = [i for i in possible_moves if i in (2, 4, 6, 8)] +        return False, random.choice(open_edges) + +    def __str__(self) -> str: +        """Return `AI` as user name.""" +        return "AI" + + +class Game: +    """Class that contains information and functions about Tic Tac Toe game.""" + +    def __init__(self, players: list[Union[Player, AI]], ctx: Context): +        self.players = players +        self.ctx = ctx +        self.board = { +            1: Emojis.number_emojis[1], +            2: Emojis.number_emojis[2], +            3: Emojis.number_emojis[3], +            4: Emojis.number_emojis[4], +            5: Emojis.number_emojis[5], +            6: Emojis.number_emojis[6], +            7: Emojis.number_emojis[7], +            8: Emojis.number_emojis[8], +            9: Emojis.number_emojis[9] +        } + +        self.current = self.players[0] +        self.next = self.players[1] + +        self.winner: Optional[Union[Player, AI]] = None +        self.loser: Optional[Union[Player, AI]] = None +        self.over = False +        self.canceled = False +        self.draw = False + +    async def get_confirmation(self) -> tuple[bool, Optional[str]]: +        """ +        Ask does user want to play TicTacToe against requester. First player is always requester. + +        This return tuple that have: +        - first element boolean (is game accepted?) +        - (optional, only when first element is False, otherwise None) reason for declining. +        """ +        confirm_message = await self.ctx.send( +            CONFIRMATION_MESSAGE.format( +                opponent=self.players[1].user.mention, +                requester=self.players[0].user.mention +            ) +        ) +        await confirm_message.add_reaction(Emojis.confirmation) +        await confirm_message.add_reaction(Emojis.decline) + +        def confirm_check(reaction: discord.Reaction, user: discord.User) -> bool: +            """Check is user who reacted from who this was requested, message is confirmation and emoji is valid.""" +            return ( +                reaction.emoji in (Emojis.confirmation, Emojis.decline) +                and reaction.message.id == confirm_message.id +                and user == self.players[1].user +            ) + +        try: +            reaction, user = await self.ctx.bot.wait_for( +                "reaction_add", +                timeout=60.0, +                check=confirm_check +            ) +        except asyncio.TimeoutError: +            self.over = True +            self.canceled = True +            await confirm_message.delete() +            return False, "Running out of time... Cancelled game." + +        await confirm_message.delete() +        if reaction.emoji == Emojis.confirmation: +            return True, None +        else: +            self.over = True +            self.canceled = True +            return False, "User declined" + +    async def add_reactions(self, msg: discord.Message) -> None: +        """Add number emojis to message.""" +        for nr in Emojis.number_emojis.values(): +            await msg.add_reaction(nr) + +    def format_board(self) -> str: +        """Get formatted tic-tac-toe board for message.""" +        board = list(self.board.values()) +        return "\n".join( +            (f"{board[line]} {board[line + 1]} {board[line + 2]}" for line in range(0, len(board), 3)) +        ) + +    async def play(self) -> None: +        """Start and handle game.""" +        await self.ctx.send("It's time for the game! Let's begin.") +        board = await self.ctx.send( +            embed=discord.Embed(description=self.format_board()) +        ) +        await self.add_reactions(board) + +        for _ in range(9): +            if isinstance(self.current, Player): +                announce = await self.ctx.send( +                    f"{self.current.user.mention}, it's your turn! " +                    "React with an emoji to take your go." +                ) +            timeout, pos = await self.current.get_move(self.board, board) +            if isinstance(self.current, Player): +                await announce.delete() +            if timeout: +                await self.ctx.send(f"{self.current.user.mention} ran out of time. Canceling game.") +                self.over = True +                self.canceled = True +                return +            self.board[pos] = self.current.symbol +            await board.edit( +                embed=discord.Embed(description=self.format_board()) +            ) +            await board.clear_reaction(Emojis.number_emojis[pos]) +            if check_win(self.board): +                self.winner = self.current +                self.loser = self.next +                await self.ctx.send( +                    f":tada: {self.current} won this game! :tada:" +                ) +                await board.clear_reactions() +                break +            self.current, self.next = self.next, self.current +        if not self.winner: +            self.draw = True +            await self.ctx.send("It's a DRAW!") +        self.over = True + + +def is_channel_free() -> Callable: +    """Check is channel where command will be invoked free.""" +    async def predicate(ctx: Context) -> bool: +        return all(game.channel != ctx.channel for game in ctx.cog.games if not game.over) +    return check(predicate) + + +def is_requester_free() -> Callable: +    """Check is requester not already in any game.""" +    async def predicate(ctx: Context) -> bool: +        return all( +            ctx.author not in (player.user for player in game.players) for game in ctx.cog.games if not game.over +        ) +    return check(predicate) + + +class TicTacToe(Cog): +    """TicTacToe cog contains tic-tac-toe game commands.""" + +    def __init__(self): +        self.games: list[Game] = [] + +    @guild_only() +    @is_channel_free() +    @is_requester_free() +    @group(name="tictactoe", aliases=("ttt", "tic"), invoke_without_command=True) +    async def tic_tac_toe(self, ctx: Context, opponent: Optional[discord.User]) -> None: +        """Tic Tac Toe game. Play against friends or AI. Use reactions to add your mark to field.""" +        if opponent == ctx.author: +            await ctx.send("You can't play against yourself.") +            return +        if opponent is not None and not all( +            opponent not in (player.user for player in g.players) for g in ctx.cog.games if not g.over +        ): +            await ctx.send("Opponent is already in game.") +            return +        if opponent is None: +            game = Game( +                [Player(ctx.author, ctx, Emojis.x_square), AI(Emojis.o_square)], +                ctx +            ) +        else: +            game = Game( +                [Player(ctx.author, ctx, Emojis.x_square), Player(opponent, ctx, Emojis.o_square)], +                ctx +            ) +        self.games.append(game) +        if opponent is not None: +            if opponent.bot:  # check whether the opponent is a bot or not +                await ctx.send("You can't play Tic-Tac-Toe with bots!") +                return + +            confirmed, msg = await game.get_confirmation() + +            if not confirmed: +                if msg: +                    await ctx.send(msg) +                return +        await game.play() + +    @tic_tac_toe.group(name="history", aliases=("log",), invoke_without_command=True) +    async def tic_tac_toe_logs(self, ctx: Context) -> None: +        """Show most recent tic-tac-toe games.""" +        if len(self.games) < 1: +            await ctx.send("No recent games.") +            return +        log_games = [] +        for i, game in enumerate(self.games): +            if game.over and not game.canceled: +                if game.draw: +                    log_games.append( +                        f"**#{i+1}**: {game.players[0]} vs {game.players[1]} (draw)" +                    ) +                else: +                    log_games.append( +                        f"**#{i+1}**: {game.winner} :trophy: vs {game.loser}" +                    ) +        await LinePaginator.paginate( +            log_games, +            ctx, +            discord.Embed(title="Most recent Tic Tac Toe games") +        ) + +    @tic_tac_toe_logs.command(name="show", aliases=("s",)) +    async def show_tic_tac_toe_board(self, ctx: Context, game_id: int) -> None: +        """View game board by ID (ID is possible to get by `.tictactoe history`).""" +        if len(self.games) < game_id: +            await ctx.send("Game don't exist.") +            return +        game = self.games[game_id - 1] + +        if game.draw: +            description = f"{game.players[0]} vs {game.players[1]} (draw)\n\n{game.format_board()}" +        else: +            description = f"{game.winner} :trophy: vs {game.loser}\n\n{game.format_board()}" + +        embed = discord.Embed( +            title=f"Match #{game_id} Game Board", +            description=description, +        ) +        await ctx.send(embed=embed) + + +def setup(bot: Bot) -> None: +    """Load the TicTacToe cog.""" +    bot.add_cog(TicTacToe()) diff --git a/bot/exts/fun/trivia_quiz.py b/bot/exts/fun/trivia_quiz.py new file mode 100644 index 00000000..cf9e6cd3 --- /dev/null +++ b/bot/exts/fun/trivia_quiz.py @@ -0,0 +1,593 @@ +import asyncio +import json +import logging +import operator +import random +from dataclasses import dataclass +from pathlib import Path +from typing import Callable, Optional + +import discord +from discord.ext import commands +from rapidfuzz import fuzz + +from bot.bot import Bot +from bot.constants import Colours, NEGATIVE_REPLIES, Roles + +logger = logging.getLogger(__name__) + +DEFAULT_QUESTION_LIMIT = 6 +STANDARD_VARIATION_TOLERANCE = 88 +DYNAMICALLY_GEN_VARIATION_TOLERANCE = 97 + +WRONG_ANS_RESPONSE = [ +    "No one answered correctly!", +    "Better luck next time...", +] + +N_PREFIX_STARTS_AT = 5 +N_PREFIXES = [ +    "penta", "hexa", "hepta", "octa", "nona", +    "deca", "hendeca", "dodeca", "trideca", "tetradeca", +] + +PLANETS = [ +    ("1st", "Mercury"), +    ("2nd", "Venus"), +    ("3rd", "Earth"), +    ("4th", "Mars"), +    ("5th", "Jupiter"), +    ("6th", "Saturn"), +    ("7th", "Uranus"), +    ("8th", "Neptune"), +] + +TAXONOMIC_HIERARCHY = [ +    "species", "genus", "family", "order", +    "class", "phylum", "kingdom", "domain", +] + +UNITS_TO_BASE_UNITS = { +    "hertz": ("(unit of frequency)", "s^-1"), +    "newton": ("(unit of force)", "m*kg*s^-2"), +    "pascal": ("(unit of pressure & stress)", "m^-1*kg*s^-2"), +    "joule": ("(unit of energy & quantity of heat)", "m^2*kg*s^-2"), +    "watt": ("(unit of power)", "m^2*kg*s^-3"), +    "coulomb": ("(unit of electric charge & quantity of electricity)", "s*A"), +    "volt": ("(unit of voltage & electromotive force)", "m^2*kg*s^-3*A^-1"), +    "farad": ("(unit of capacitance)", "m^-2*kg^-1*s^4*A^2"), +    "ohm": ("(unit of electric resistance)", "m^2*kg*s^-3*A^-2"), +    "weber": ("(unit of magnetic flux)", "m^2*kg*s^-2*A^-1"), +    "tesla": ("(unit of magnetic flux density)", "kg*s^-2*A^-1"), +} + + +@dataclass(frozen=True) +class QuizEntry: +    """Dataclass for a quiz entry (a question and a string containing answers separated by commas).""" + +    question: str +    answer: str + + +def linear_system(q_format: str, a_format: str) -> QuizEntry: +    """Generate a system of linear equations with two unknowns.""" +    x, y = random.randint(2, 5), random.randint(2, 5) +    answer = a_format.format(x, y) + +    coeffs = random.sample(range(1, 6), 4) + +    question = q_format.format( +        coeffs[0], +        coeffs[1], +        coeffs[0] * x + coeffs[1] * y, +        coeffs[2], +        coeffs[3], +        coeffs[2] * x + coeffs[3] * y, +    ) + +    return QuizEntry(question, answer) + + +def mod_arith(q_format: str, a_format: str) -> QuizEntry: +    """Generate a basic modular arithmetic question.""" +    quotient, m, b = random.randint(30, 40), random.randint(10, 20), random.randint(200, 350) +    ans = random.randint(0, 9)  # max remainder is 9, since the minimum modulus is 10 +    a = quotient * m + ans - b + +    question = q_format.format(a, b, m) +    answer = a_format.format(ans) + +    return QuizEntry(question, answer) + + +def ngonal_prism(q_format: str, a_format: str) -> QuizEntry: +    """Generate a question regarding vertices on n-gonal prisms.""" +    n = random.randint(0, len(N_PREFIXES) - 1) + +    question = q_format.format(N_PREFIXES[n]) +    answer = a_format.format((n + N_PREFIX_STARTS_AT) * 2) + +    return QuizEntry(question, answer) + + +def imag_sqrt(q_format: str, a_format: str) -> QuizEntry: +    """Generate a negative square root question.""" +    ans_coeff = random.randint(3, 10) + +    question = q_format.format(ans_coeff ** 2) +    answer = a_format.format(ans_coeff) + +    return QuizEntry(question, answer) + + +def binary_calc(q_format: str, a_format: str) -> QuizEntry: +    """Generate a binary calculation question.""" +    a = random.randint(15, 20) +    b = random.randint(10, a) +    oper = random.choice( +        ( +            ("+", operator.add), +            ("-", operator.sub), +            ("*", operator.mul), +        ) +    ) + +    # if the operator is multiplication, lower the values of the two operands to make it easier +    if oper[0] == "*": +        a -= 5 +        b -= 5 + +    question = q_format.format(a, oper[0], b) +    answer = a_format.format(oper[1](a, b)) + +    return QuizEntry(question, answer) + + +def solar_system(q_format: str, a_format: str) -> QuizEntry: +    """Generate a question on the planets of the Solar System.""" +    planet = random.choice(PLANETS) + +    question = q_format.format(planet[0]) +    answer = a_format.format(planet[1]) + +    return QuizEntry(question, answer) + + +def taxonomic_rank(q_format: str, a_format: str) -> QuizEntry: +    """Generate a question on taxonomic classification.""" +    level = random.randint(0, len(TAXONOMIC_HIERARCHY) - 2) + +    question = q_format.format(TAXONOMIC_HIERARCHY[level]) +    answer = a_format.format(TAXONOMIC_HIERARCHY[level + 1]) + +    return QuizEntry(question, answer) + + +def base_units_convert(q_format: str, a_format: str) -> QuizEntry: +    """Generate a SI base units conversion question.""" +    unit = random.choice(list(UNITS_TO_BASE_UNITS)) + +    question = q_format.format( +        unit + " " + UNITS_TO_BASE_UNITS[unit][0] +    ) +    answer = a_format.format( +        UNITS_TO_BASE_UNITS[unit][1] +    ) + +    return QuizEntry(question, answer) + + +DYNAMIC_QUESTIONS_FORMAT_FUNCS = { +    201: linear_system, +    202: mod_arith, +    203: ngonal_prism, +    204: imag_sqrt, +    205: binary_calc, +    301: solar_system, +    302: taxonomic_rank, +    303: base_units_convert, +} + + +class TriviaQuiz(commands.Cog): +    """A cog for all quiz commands.""" + +    def __init__(self, bot: Bot): +        self.bot = bot + +        self.game_status = {}  # A variable to store the game status: either running or not running. +        self.game_owners = {}  # A variable to store the person's ID who started the quiz game in a channel. + +        self.questions = self.load_questions() +        self.question_limit = 0 + +        self.player_scores = {}  # A variable to store all player's scores for a bot session. +        self.game_player_scores = {}  # A variable to store temporary game player's scores. + +        self.categories = { +            "general": "Test your general knowledge.", +            "retro": "Questions related to retro gaming.", +            "math": "General questions about mathematics ranging from grade 8 to grade 12.", +            "science": "Put your understanding of science to the test!", +            "cs": "A large variety of computer science questions.", +            "python": "Trivia on our amazing language, Python!", +        } + +    @staticmethod +    def load_questions() -> dict: +        """Load the questions from the JSON file.""" +        p = Path("bot", "resources", "fun", "trivia_quiz.json") + +        return json.loads(p.read_text(encoding="utf-8")) + +    @commands.group(name="quiz", aliases=["trivia"], invoke_without_command=True) +    async def quiz_game(self, ctx: commands.Context, category: Optional[str], questions: Optional[int]) -> None: +        """ +        Start a quiz! + +        Questions for the quiz can be selected from the following categories: +        - general: Test your general knowledge. +        - retro: Questions related to retro gaming. +        - math: General questions about mathematics ranging from grade 8 to grade 12. +        - science: Put your understanding of science to the test! +        - cs: A large variety of computer science questions. +        - python: Trivia on our amazing language, Python! + +        (More to come!) +        """ +        if ctx.channel.id not in self.game_status: +            self.game_status[ctx.channel.id] = False + +        if ctx.channel.id not in self.game_player_scores: +            self.game_player_scores[ctx.channel.id] = {} + +        # Stop game if running. +        if self.game_status[ctx.channel.id]: +            await ctx.send( +                "Game is already running... " +                f"do `{self.bot.command_prefix}quiz stop`" +            ) +            return + +        # Send embed showing available categories if inputted category is invalid. +        if category is None: +            category = random.choice(list(self.categories)) + +        category = category.lower() +        if category not in self.categories: +            embed = self.category_embed() +            await ctx.send(embed=embed) +            return + +        topic = self.questions[category] +        topic_length = len(topic) + +        if questions is None: +            self.question_limit = DEFAULT_QUESTION_LIMIT +        else: +            if questions > topic_length: +                await ctx.send( +                    embed=self.make_error_embed( +                        f"This category only has {topic_length} questions. " +                        "Please input a lower value!" +                    ) +                ) +                return + +            elif questions < 1: +                await ctx.send( +                    embed=self.make_error_embed( +                        "You must choose to complete at least one question. " +                        f"(or enter nothing for the default value of {DEFAULT_QUESTION_LIMIT + 1} questions)" +                    ) +                ) +                return + +            else: +                self.question_limit = questions - 1 + +        # Start game if not running. +        if not self.game_status[ctx.channel.id]: +            self.game_owners[ctx.channel.id] = ctx.author +            self.game_status[ctx.channel.id] = True +            start_embed = self.make_start_embed(category) + +            await ctx.send(embed=start_embed)  # send an embed with the rules +            await asyncio.sleep(5) + +        done_question = [] +        hint_no = 0 +        answers = None + +        while self.game_status[ctx.channel.id]: +            # Exit quiz if number of questions for a round are already sent. +            if len(done_question) > self.question_limit and hint_no == 0: +                await ctx.send("The round has ended.") +                await self.declare_winner(ctx.channel, self.game_player_scores[ctx.channel.id]) + +                self.game_status[ctx.channel.id] = False +                del self.game_owners[ctx.channel.id] +                self.game_player_scores[ctx.channel.id] = {} + +                break + +            # If no hint has been sent or any time alert. Basically if hint_no = 0  means it is a new question. +            if hint_no == 0: +                # Select a random question which has not been used yet. +                while True: +                    question_dict = random.choice(topic) +                    if question_dict["id"] not in done_question: +                        done_question.append(question_dict["id"]) +                        break + +                if "dynamic_id" not in question_dict: +                    question = question_dict["question"] +                    answers = question_dict["answer"].split(", ") + +                    var_tol = STANDARD_VARIATION_TOLERANCE +                else: +                    format_func = DYNAMIC_QUESTIONS_FORMAT_FUNCS[question_dict["dynamic_id"]] + +                    quiz_entry = format_func( +                        question_dict["question"], +                        question_dict["answer"], +                    ) + +                    question, answers = quiz_entry.question, quiz_entry.answer +                    answers = [answers] + +                    var_tol = DYNAMICALLY_GEN_VARIATION_TOLERANCE + +                embed = discord.Embed( +                    colour=Colours.gold, +                    title=f"Question #{len(done_question)}", +                    description=question, +                ) + +                if img_url := question_dict.get("img_url"): +                    embed.set_image(url=img_url) + +                await ctx.send(embed=embed) + +            def check_func(variation_tolerance: int) -> Callable[[discord.Message], bool]: +                def contains_correct_answer(m: discord.Message) -> bool: +                    return m.channel == ctx.channel and any( +                        fuzz.ratio(answer.lower(), m.content.lower()) > variation_tolerance +                        for answer in answers +                    ) + +                return contains_correct_answer + +            try: +                msg = await self.bot.wait_for("message", check=check_func(var_tol), timeout=10) +            except asyncio.TimeoutError: +                # In case of TimeoutError and the game has been stopped, then do nothing. +                if not self.game_status[ctx.channel.id]: +                    break + +                if hint_no < 2: +                    hint_no += 1 + +                    if "hints" in question_dict: +                        hints = question_dict["hints"] + +                        await ctx.send(f"**Hint #{hint_no}\n**{hints[hint_no - 1]}") +                    else: +                        await ctx.send(f"{30 - hint_no * 10}s left!") + +                # Once hint or time alerts has been sent 2 times, the hint_no value will be 3 +                # If hint_no > 2, then it means that all hints/time alerts have been sent. +                # Also means that the answer is not yet given and the bot sends the answer and the next question. +                else: +                    if self.game_status[ctx.channel.id] is False: +                        break + +                    response = random.choice(WRONG_ANS_RESPONSE) +                    await ctx.send(response) + +                    await self.send_answer( +                        ctx.channel, +                        answers, +                        False, +                        question_dict, +                        self.question_limit - len(done_question) + 1, +                    ) +                    await asyncio.sleep(1) + +                    hint_no = 0  # Reset the hint counter so that on the next round, it's in the initial state + +                    await self.send_score(ctx.channel, self.game_player_scores[ctx.channel.id]) +                    await asyncio.sleep(2) +            else: +                if self.game_status[ctx.channel.id] is False: +                    break + +                points = 100 - 25 * hint_no +                if msg.author in self.game_player_scores[ctx.channel.id]: +                    self.game_player_scores[ctx.channel.id][msg.author] += points +                else: +                    self.game_player_scores[ctx.channel.id][msg.author] = points + +                # Also updating the overall scoreboard. +                if msg.author in self.player_scores: +                    self.player_scores[msg.author] += points +                else: +                    self.player_scores[msg.author] = points + +                hint_no = 0 + +                await ctx.send(f"{msg.author.mention} got the correct answer :tada: {points} points!") + +                await self.send_answer( +                    ctx.channel, +                    answers, +                    True, +                    question_dict, +                    self.question_limit - len(done_question) + 1, +                ) +                await self.send_score(ctx.channel, self.game_player_scores[ctx.channel.id]) + +                await asyncio.sleep(2) + +    def make_start_embed(self, category: str) -> discord.Embed: +        """Generate a starting/introduction embed for the quiz.""" +        start_embed = discord.Embed( +            colour=Colours.blue, +            title="A quiz game is starting!", +            description=( +                f"This game consists of {self.question_limit + 1} questions.\n\n" +                "**Rules: **\n" +                "1. Only enclose your answer in backticks when the question tells you to.\n" +                "2. If the question specifies an answer format, follow it or else it won't be accepted.\n" +                "3. You have 30s per question. Points for each question reduces by 25 after 10s or after a hint.\n" +                "4. No cheating and have fun!\n\n" +                f"**Category**: {category}" +            ), +        ) + +        return start_embed + +    @staticmethod +    def make_error_embed(desc: str) -> discord.Embed: +        """Generate an error embed with the given description.""" +        error_embed = discord.Embed( +            colour=Colours.soft_red, +            title=random.choice(NEGATIVE_REPLIES), +            description=desc, +        ) + +        return error_embed + +    @quiz_game.command(name="stop") +    async def stop_quiz(self, ctx: commands.Context) -> None: +        """ +        Stop a quiz game if its running in the channel. + +        Note: Only mods or the owner of the quiz can stop it. +        """ +        try: +            if self.game_status[ctx.channel.id]: +                # Check if the author is the game starter or a moderator. +                if ctx.author == self.game_owners[ctx.channel.id] or any( +                    Roles.moderator == role.id for role in ctx.author.roles +                ): +                    self.game_status[ctx.channel.id] = False +                    del self.game_owners[ctx.channel.id] +                    self.game_player_scores[ctx.channel.id] = {} + +                    await ctx.send("Quiz stopped.") +                    await self.declare_winner(ctx.channel, self.game_player_scores[ctx.channel.id]) + +                else: +                    await ctx.send(f"{ctx.author.mention}, you are not authorised to stop this game :ghost:!") +            else: +                await ctx.send("No quiz running.") +        except KeyError: +            await ctx.send("No quiz running.") + +    @quiz_game.command(name="leaderboard") +    async def leaderboard(self, ctx: commands.Context) -> None: +        """View everyone's score for this bot session.""" +        await self.send_score(ctx.channel, self.player_scores) + +    @staticmethod +    async def send_score(channel: discord.TextChannel, player_data: dict) -> None: +        """Send the current scores of players in the game channel.""" +        if len(player_data) == 0: +            await channel.send("No one has made it onto the leaderboard yet.") +            return + +        embed = discord.Embed( +            colour=Colours.blue, +            title="Score Board", +            description="", +        ) + +        sorted_dict = sorted(player_data.items(), key=operator.itemgetter(1), reverse=True) +        for item in sorted_dict: +            embed.description += f"{item[0]}: {item[1]}\n" + +        await channel.send(embed=embed) + +    @staticmethod +    async def declare_winner(channel: discord.TextChannel, player_data: dict) -> None: +        """Announce the winner of the quiz in the game channel.""" +        if player_data: +            highest_points = max(list(player_data.values())) +            no_of_winners = list(player_data.values()).count(highest_points) + +            # Check if more than 1 player has highest points. +            if no_of_winners > 1: +                winners = [] +                points_copy = list(player_data.values()).copy() + +                for _ in range(no_of_winners): +                    index = points_copy.index(highest_points) +                    winners.append(list(player_data.keys())[index]) +                    points_copy[index] = 0 + +                winners_mention = " ".join(winner.mention for winner in winners) +            else: +                author_index = list(player_data.values()).index(highest_points) +                winner = list(player_data.keys())[author_index] +                winners_mention = winner.mention + +            await channel.send( +                f"Congratulations {winners_mention} :tada: " +                f"You have won this quiz game with a grand total of {highest_points} points!" +            ) + +    def category_embed(self) -> discord.Embed: +        """Build an embed showing all available trivia categories.""" +        embed = discord.Embed( +            colour=Colours.blue, +            title="The available question categories are:", +            description="", +        ) + +        embed.set_footer(text="If a category is not chosen, a random one will be selected.") + +        for cat, description in self.categories.items(): +            embed.description += ( +                f"**- {cat.capitalize()}**\n" +                f"{description.capitalize()}\n" +            ) + +        return embed + +    @staticmethod +    async def send_answer( +        channel: discord.TextChannel, +        answers: list[str], +        answer_is_correct: bool, +        question_dict: dict, +        q_left: int, +    ) -> None: +        """Send the correct answer of a question to the game channel.""" +        info = question_dict.get("info") + +        plurality = " is" if len(answers) == 1 else "s are" + +        embed = discord.Embed( +            color=Colours.bright_green, +            title=( +                ("You got it! " if answer_is_correct else "") +                + f"The correct answer{plurality} **`{', '.join(answers)}`**\n" +            ), +            description="", +        ) + +        if info is not None: +            embed.description += f"**Information**\n{info}\n\n" + +        embed.description += ( +            ("Let's move to the next question." if q_left > 0 else "") +            + f"\nRemaining questions: {q_left}" +        ) +        await channel.send(embed=embed) + + +def setup(bot: Bot) -> None: +    """Load the TriviaQuiz cog.""" +    bot.add_cog(TriviaQuiz(bot)) diff --git a/bot/exts/fun/wonder_twins.py b/bot/exts/fun/wonder_twins.py new file mode 100644 index 00000000..79d6b6d9 --- /dev/null +++ b/bot/exts/fun/wonder_twins.py @@ -0,0 +1,49 @@ +import random +from pathlib import Path + +import yaml +from discord.ext.commands import Cog, Context, command + +from bot.bot import Bot + + +class WonderTwins(Cog): +    """Cog for a Wonder Twins inspired command.""" + +    def __init__(self): +        with open(Path.cwd() / "bot" / "resources" / "fun" / "wonder_twins.yaml", "r", encoding="utf-8") as f: +            info = yaml.load(f, Loader=yaml.FullLoader) +            self.water_types = info["water_types"] +            self.objects = info["objects"] +            self.adjectives = info["adjectives"] + +    @staticmethod +    def append_onto(phrase: str, insert_word: str) -> str: +        """Appends one word onto the end of another phrase in order to format with the proper determiner.""" +        if insert_word.endswith("s"): +            phrase = phrase.split() +            del phrase[0] +            phrase = " ".join(phrase) + +        insert_word = insert_word.split()[-1] +        return " ".join([phrase, insert_word]) + +    def format_phrase(self) -> str: +        """Creates a transformation phrase from available words.""" +        adjective = random.choice((None, random.choice(self.adjectives))) +        object_name = random.choice(self.objects) +        water_type = random.choice(self.water_types) + +        if adjective: +            object_name = self.append_onto(adjective, object_name) +        return f"{object_name} of {water_type}" + +    @command(name="formof", aliases=("wondertwins", "wondertwin", "fo")) +    async def form_of(self, ctx: Context) -> None: +        """Command to send a Wonder Twins inspired phrase to the user invoking the command.""" +        await ctx.send(f"Form of {self.format_phrase()}!") + + +def setup(bot: Bot) -> None: +    """Load the WonderTwins cog.""" +    bot.add_cog(WonderTwins()) diff --git a/bot/exts/fun/xkcd.py b/bot/exts/fun/xkcd.py new file mode 100644 index 00000000..b56c53d9 --- /dev/null +++ b/bot/exts/fun/xkcd.py @@ -0,0 +1,91 @@ +import logging +import re +from random import randint +from typing import Optional, Union + +from discord import Embed +from discord.ext import tasks +from discord.ext.commands import Cog, Context, command + +from bot.bot import Bot +from bot.constants import Colours + +log = logging.getLogger(__name__) + +COMIC_FORMAT = re.compile(r"latest|[0-9]+") +BASE_URL = "https://xkcd.com" + + +class XKCD(Cog): +    """Retrieving XKCD comics.""" + +    def __init__(self, bot: Bot): +        self.bot = bot +        self.latest_comic_info: dict[str, Union[str, int]] = {} +        self.get_latest_comic_info.start() + +    def cog_unload(self) -> None: +        """Cancels refreshing of the task for refreshing the most recent comic info.""" +        self.get_latest_comic_info.cancel() + +    @tasks.loop(minutes=30) +    async def get_latest_comic_info(self) -> None: +        """Refreshes latest comic's information ever 30 minutes. Also used for finding a random comic.""" +        async with self.bot.http_session.get(f"{BASE_URL}/info.0.json") as resp: +            if resp.status == 200: +                self.latest_comic_info = await resp.json() +            else: +                log.debug(f"Failed to get latest XKCD comic information. Status code {resp.status}") + +    @command(name="xkcd") +    async def fetch_xkcd_comics(self, ctx: Context, comic: Optional[str]) -> None: +        """ +        Getting an xkcd comic's information along with the image. + +        To get a random comic, don't type any number as an argument. To get the latest, type 'latest'. +        """ +        embed = Embed(title=f"XKCD comic '{comic}'") + +        embed.colour = Colours.soft_red + +        if comic and (comic := re.match(COMIC_FORMAT, comic)) is None: +            embed.description = "Comic parameter should either be an integer or 'latest'." +            await ctx.send(embed=embed) +            return + +        comic = randint(1, self.latest_comic_info["num"]) if comic is None else comic.group(0) + +        if comic == "latest": +            info = self.latest_comic_info +        else: +            async with self.bot.http_session.get(f"{BASE_URL}/{comic}/info.0.json") as resp: +                if resp.status == 200: +                    info = await resp.json() +                else: +                    embed.title = f"XKCD comic #{comic}" +                    embed.description = f"{resp.status}: Could not retrieve xkcd comic #{comic}." +                    log.debug(f"Retrieving xkcd comic #{comic} failed with status code {resp.status}.") +                    await ctx.send(embed=embed) +                    return + +        embed.title = f"XKCD comic #{info['num']}" +        embed.description = info["alt"] +        embed.url = f"{BASE_URL}/{info['num']}" + +        if info["img"][-3:] in ("jpg", "png", "gif"): +            embed.set_image(url=info["img"]) +            date = f"{info['year']}/{info['month']}/{info['day']}" +            embed.set_footer(text=f"{date} - #{info['num']}, \'{info['safe_title']}\'") +            embed.colour = Colours.soft_green +        else: +            embed.description = ( +                "The selected comic is interactive, and cannot be displayed within an embed.\n" +                f"Comic can be viewed [here](https://xkcd.com/{info['num']})." +            ) + +        await ctx.send(embed=embed) + + +def setup(bot: Bot) -> None: +    """Load the XKCD cog.""" +    bot.add_cog(XKCD(bot)) | 
