diff options
Diffstat (limited to 'bot/exts/fun')
| -rw-r--r-- | bot/exts/fun/__init__.py | 0 | ||||
| -rw-r--r-- | bot/exts/fun/battleship.py | 448 | ||||
| -rw-r--r-- | bot/exts/fun/catify.py | 86 | ||||
| -rw-r--r-- | bot/exts/fun/coinflip.py | 53 | ||||
| -rw-r--r-- | bot/exts/fun/connect_four.py | 452 | ||||
| -rw-r--r-- | bot/exts/fun/duck_game.py | 336 | ||||
| -rw-r--r-- | bot/exts/fun/fun.py | 250 | ||||
| -rw-r--r-- | bot/exts/fun/game.py | 485 | ||||
| -rw-r--r-- | bot/exts/fun/magic_8ball.py | 30 | ||||
| -rw-r--r-- | bot/exts/fun/minesweeper.py | 270 | ||||
| -rw-r--r-- | bot/exts/fun/movie.py | 205 | ||||
| -rw-r--r-- | bot/exts/fun/recommend_game.py | 51 | ||||
| -rw-r--r-- | bot/exts/fun/rps.py | 57 | ||||
| -rw-r--r-- | bot/exts/fun/space.py | 236 | ||||
| -rw-r--r-- | bot/exts/fun/speedrun.py | 26 | ||||
| -rw-r--r-- | bot/exts/fun/status_codes.py | 87 | ||||
| -rw-r--r-- | bot/exts/fun/tic_tac_toe.py | 335 | ||||
| -rw-r--r-- | bot/exts/fun/trivia_quiz.py | 593 | ||||
| -rw-r--r-- | bot/exts/fun/wonder_twins.py | 49 | ||||
| -rw-r--r-- | bot/exts/fun/xkcd.py | 91 |
20 files changed, 4140 insertions, 0 deletions
diff --git a/bot/exts/fun/__init__.py b/bot/exts/fun/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/bot/exts/fun/__init__.py diff --git a/bot/exts/fun/battleship.py b/bot/exts/fun/battleship.py new file mode 100644 index 00000000..f4351954 --- /dev/null +++ b/bot/exts/fun/battleship.py @@ -0,0 +1,448 @@ +import asyncio +import logging +import random +import re +from dataclasses import dataclass +from functools import partial +from typing import Optional + +import discord +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Colours + +log = logging.getLogger(__name__) + + +@dataclass +class Square: + """Each square on the battleship grid - if they contain a boat and if they've been aimed at.""" + + boat: Optional[str] + aimed: bool + + +Grid = list[list[Square]] +EmojiSet = dict[tuple[bool, bool], str] + + +@dataclass +class Player: + """Each player in the game - their messages for the boards and their current grid.""" + + user: Optional[discord.Member] + board: Optional[discord.Message] + opponent_board: discord.Message + grid: Grid + + +# The name of the ship and its size +SHIPS = { + "Carrier": 5, + "Battleship": 4, + "Cruiser": 3, + "Submarine": 3, + "Destroyer": 2, +} + + +# For these two variables, the first boolean is whether the square is a ship (True) or not (False). +# The second boolean is whether the player has aimed for that square (True) or not (False) + +# This is for the player's own board which shows the location of their own ships. +SHIP_EMOJIS = { + (True, True): ":fire:", + (True, False): ":ship:", + (False, True): ":anger:", + (False, False): ":ocean:", +} + +# This is for the opposing player's board which only shows aimed locations. +HIDDEN_EMOJIS = { + (True, True): ":red_circle:", + (True, False): ":black_circle:", + (False, True): ":white_circle:", + (False, False): ":black_circle:", +} + +# For the top row of the board +LETTERS = ( + ":stop_button::regional_indicator_a::regional_indicator_b::regional_indicator_c::regional_indicator_d:" + ":regional_indicator_e::regional_indicator_f::regional_indicator_g::regional_indicator_h:" + ":regional_indicator_i::regional_indicator_j:" +) + +# For the first column of the board +NUMBERS = [ + ":one:", + ":two:", + ":three:", + ":four:", + ":five:", + ":six:", + ":seven:", + ":eight:", + ":nine:", + ":keycap_ten:", +] + +CROSS_EMOJI = "\u274e" +HAND_RAISED_EMOJI = "\U0001f64b" + + +class Game: + """A Battleship Game.""" + + def __init__( + self, + bot: Bot, + channel: discord.TextChannel, + player1: discord.Member, + player2: discord.Member + ): + + self.bot = bot + self.public_channel = channel + + self.p1 = Player(player1, None, None, self.generate_grid()) + self.p2 = Player(player2, None, None, self.generate_grid()) + + self.gameover: bool = False + + self.turn: Optional[discord.Member] = None + self.next: Optional[discord.Member] = None + + self.match: Optional[re.Match] = None + self.surrender: bool = False + + self.setup_grids() + + @staticmethod + def generate_grid() -> Grid: + """Generates a grid by instantiating the Squares.""" + return [[Square(None, False) for _ in range(10)] for _ in range(10)] + + @staticmethod + def format_grid(player: Player, emojiset: EmojiSet) -> str: + """ + Gets and formats the grid as a list into a string to be output to the DM. + + Also adds the Letter and Number indexes. + """ + grid = [ + [emojiset[bool(square.boat), square.aimed] for square in row] + for row in player.grid + ] + + rows = ["".join([number] + row) for number, row in zip(NUMBERS, grid)] + return "\n".join([LETTERS] + rows) + + @staticmethod + def get_square(grid: Grid, square: str) -> Square: + """Grabs a square from a grid with an inputted key.""" + index = ord(square[0].upper()) - ord("A") + number = int(square[1:]) + + return grid[number-1][index] # -1 since lists are indexed from 0 + + async def game_over( + self, + *, + winner: discord.Member, + loser: discord.Member + ) -> None: + """Removes games from list of current games and announces to public chat.""" + await self.public_channel.send(f"Game Over! {winner.mention} won against {loser.mention}") + + for player in (self.p1, self.p2): + grid = self.format_grid(player, SHIP_EMOJIS) + await self.public_channel.send(f"{player.user}'s Board:\n{grid}") + + @staticmethod + def check_sink(grid: Grid, boat: str) -> bool: + """Checks if all squares containing a given boat have sunk.""" + return all(square.aimed for row in grid for square in row if square.boat == boat) + + @staticmethod + def check_gameover(grid: Grid) -> bool: + """Checks if all boats have been sunk.""" + return all(square.aimed for row in grid for square in row if square.boat) + + def setup_grids(self) -> None: + """Places the boats on the grids to initialise the game.""" + for player in (self.p1, self.p2): + for name, size in SHIPS.items(): + while True: # Repeats if about to overwrite another boat + ship_collision = False + coords = [] + + coord1 = random.randint(0, 9) + coord2 = random.randint(0, 10 - size) + + if random.choice((True, False)): # Vertical or Horizontal + x, y = coord1, coord2 + xincr, yincr = 0, 1 + else: + x, y = coord2, coord1 + xincr, yincr = 1, 0 + + for i in range(size): + new_x = x + (xincr * i) + new_y = y + (yincr * i) + if player.grid[new_x][new_y].boat: # Check if there's already a boat + ship_collision = True + break + coords.append((new_x, new_y)) + if not ship_collision: # If not overwriting any other boat spaces, break loop + break + + for x, y in coords: + player.grid[x][y].boat = name + + async def print_grids(self) -> None: + """Prints grids to the DM channels.""" + # Convert squares into Emoji + + boards = [ + self.format_grid(player, emojiset) + for emojiset in (HIDDEN_EMOJIS, SHIP_EMOJIS) + for player in (self.p1, self.p2) + ] + + locations = ( + (self.p2, "opponent_board"), (self.p1, "opponent_board"), + (self.p1, "board"), (self.p2, "board") + ) + + for board, location in zip(boards, locations): + player, attr = location + if getattr(player, attr): + await getattr(player, attr).edit(content=board) + else: + setattr(player, attr, await player.user.send(board)) + + def predicate(self, message: discord.Message) -> bool: + """Predicate checking the message typed for each turn.""" + if message.author == self.turn.user and message.channel == self.turn.user.dm_channel: + if message.content.lower() == "surrender": + self.surrender = True + return True + self.match = re.fullmatch("([A-J]|[a-j]) ?((10)|[1-9])", message.content.strip()) + if not self.match: + self.bot.loop.create_task(message.add_reaction(CROSS_EMOJI)) + return bool(self.match) + + async def take_turn(self) -> Optional[Square]: + """Lets the player who's turn it is choose a square.""" + square = None + turn_message = await self.turn.user.send( + "It's your turn! Type the square you want to fire at. Format it like this: A1\n" + "Type `surrender` to give up." + ) + await self.next.user.send("Their turn", delete_after=3.0) + while True: + try: + await self.bot.wait_for("message", check=self.predicate, timeout=60.0) + except asyncio.TimeoutError: + await self.turn.user.send("You took too long. Game over!") + await self.next.user.send(f"{self.turn.user} took too long. Game over!") + await self.public_channel.send( + f"Game over! {self.turn.user.mention} timed out so {self.next.user.mention} wins!" + ) + self.gameover = True + break + else: + if self.surrender: + await self.next.user.send(f"{self.turn.user} surrendered. Game over!") + await self.public_channel.send( + f"Game over! {self.turn.user.mention} surrendered to {self.next.user.mention}!" + ) + self.gameover = True + break + square = self.get_square(self.next.grid, self.match.string) + if square.aimed: + await self.turn.user.send("You've already aimed at this square!", delete_after=3.0) + else: + break + await turn_message.delete() + return square + + async def hit(self, square: Square, alert_messages: list[discord.Message]) -> None: + """Occurs when a player successfully aims for a ship.""" + await self.turn.user.send("Hit!", delete_after=3.0) + alert_messages.append(await self.next.user.send("Hit!")) + if self.check_sink(self.next.grid, square.boat): + await self.turn.user.send(f"You've sunk their {square.boat} ship!", delete_after=3.0) + alert_messages.append(await self.next.user.send(f"Oh no! Your {square.boat} ship sunk!")) + if self.check_gameover(self.next.grid): + await self.turn.user.send("You win!") + await self.next.user.send("You lose!") + self.gameover = True + await self.game_over(winner=self.turn.user, loser=self.next.user) + + async def start_game(self) -> None: + """Begins the game.""" + await self.p1.user.send(f"You're playing battleship with {self.p2.user}.") + await self.p2.user.send(f"You're playing battleship with {self.p1.user}.") + + alert_messages = [] + + self.turn = self.p1 + self.next = self.p2 + + while True: + await self.print_grids() + + if self.gameover: + return + + square = await self.take_turn() + if not square: + return + square.aimed = True + + for message in alert_messages: + await message.delete() + + alert_messages = [] + alert_messages.append(await self.next.user.send(f"{self.turn.user} aimed at {self.match.string}!")) + + if square.boat: + await self.hit(square, alert_messages) + if self.gameover: + return + else: + await self.turn.user.send("Miss!", delete_after=3.0) + alert_messages.append(await self.next.user.send("Miss!")) + + self.turn, self.next = self.next, self.turn + + +class Battleship(commands.Cog): + """Play the classic game Battleship!""" + + def __init__(self, bot: Bot): + self.bot = bot + self.games: list[Game] = [] + self.waiting: list[discord.Member] = [] + + def predicate( + self, + ctx: commands.Context, + announcement: discord.Message, + reaction: discord.Reaction, + user: discord.Member + ) -> bool: + """Predicate checking the criteria for the announcement message.""" + if self.already_playing(ctx.author): # If they've joined a game since requesting a player 2 + return True # Is dealt with later on + if ( + user.id not in (ctx.me.id, ctx.author.id) + and str(reaction.emoji) == HAND_RAISED_EMOJI + and reaction.message.id == announcement.id + ): + if self.already_playing(user): + self.bot.loop.create_task(ctx.send(f"{user.mention} You're already playing a game!")) + self.bot.loop.create_task(announcement.remove_reaction(reaction, user)) + return False + + if user in self.waiting: + self.bot.loop.create_task(ctx.send( + f"{user.mention} Please cancel your game first before joining another one." + )) + self.bot.loop.create_task(announcement.remove_reaction(reaction, user)) + return False + + return True + + if ( + user.id == ctx.author.id + and str(reaction.emoji) == CROSS_EMOJI + and reaction.message.id == announcement.id + ): + return True + return False + + def already_playing(self, player: discord.Member) -> bool: + """Check if someone is already in a game.""" + return any(player in (game.p1.user, game.p2.user) for game in self.games) + + @commands.group(invoke_without_command=True) + @commands.guild_only() + async def battleship(self, ctx: commands.Context) -> None: + """ + Play a game of Battleship with someone else! + + This will set up a message waiting for someone else to react and play along. + The game takes place entirely in DMs. + Make sure you have your DMs open so that the bot can message you. + """ + if self.already_playing(ctx.author): + await ctx.send("You're already playing a game!") + return + + if ctx.author in self.waiting: + await ctx.send("You've already sent out a request for a player 2.") + return + + announcement = await ctx.send( + "**Battleship**: A new game is about to start!\n" + f"Press {HAND_RAISED_EMOJI} to play against {ctx.author.mention}!\n" + f"(Cancel the game with {CROSS_EMOJI}.)" + ) + self.waiting.append(ctx.author) + await announcement.add_reaction(HAND_RAISED_EMOJI) + await announcement.add_reaction(CROSS_EMOJI) + + try: + reaction, user = await self.bot.wait_for( + "reaction_add", + check=partial(self.predicate, ctx, announcement), + timeout=60.0 + ) + except asyncio.TimeoutError: + self.waiting.remove(ctx.author) + await announcement.delete() + await ctx.send(f"{ctx.author.mention} Seems like there's no one here to play...") + return + + if str(reaction.emoji) == CROSS_EMOJI: + self.waiting.remove(ctx.author) + await announcement.delete() + await ctx.send(f"{ctx.author.mention} Game cancelled.") + return + + await announcement.delete() + self.waiting.remove(ctx.author) + if self.already_playing(ctx.author): + return + game = Game(self.bot, ctx.channel, ctx.author, user) + self.games.append(game) + try: + await game.start_game() + self.games.remove(game) + except discord.Forbidden: + await ctx.send( + f"{ctx.author.mention} {user.mention} " + "Game failed. This is likely due to you not having your DMs open. Check and try again." + ) + self.games.remove(game) + except Exception: + # End the game in the event of an unforseen error so the players aren't stuck in a game + await ctx.send(f"{ctx.author.mention} {user.mention} An error occurred. Game failed.") + self.games.remove(game) + raise + + @battleship.command(name="ships", aliases=("boats",)) + async def battleship_ships(self, ctx: commands.Context) -> None: + """Lists the ships that are found on the battleship grid.""" + embed = discord.Embed(colour=Colours.blue) + embed.add_field(name="Name", value="\n".join(SHIPS)) + embed.add_field(name="Size", value="\n".join(str(size) for size in SHIPS.values())) + await ctx.send(embed=embed) + + +def setup(bot: Bot) -> None: + """Load the Battleship Cog.""" + bot.add_cog(Battleship(bot)) diff --git a/bot/exts/fun/catify.py b/bot/exts/fun/catify.py new file mode 100644 index 00000000..32dfae09 --- /dev/null +++ b/bot/exts/fun/catify.py @@ -0,0 +1,86 @@ +import random +from contextlib import suppress +from typing import Optional + +from discord import AllowedMentions, Embed, Forbidden +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Cats, Colours, NEGATIVE_REPLIES +from bot.utils import helpers + + +class Catify(commands.Cog): + """Cog for the catify command.""" + + @commands.command(aliases=("ᓚᘏᗢify", "ᓚᘏᗢ")) + @commands.cooldown(1, 5, commands.BucketType.user) + async def catify(self, ctx: commands.Context, *, text: Optional[str]) -> None: + """ + Convert the provided text into a cat themed sentence by interspercing cats throughout text. + + If no text is given then the users nickname is edited. + """ + if not text: + display_name = ctx.author.display_name + + if len(display_name) > 26: + embed = Embed( + title=random.choice(NEGATIVE_REPLIES), + description=( + "Your display name is too long to be catified! " + "Please change it to be under 26 characters." + ), + color=Colours.soft_red + ) + await ctx.send(embed=embed) + return + + else: + display_name += f" | {random.choice(Cats.cats)}" + + await ctx.send(f"Your catified nickname is: `{display_name}`", allowed_mentions=AllowedMentions.none()) + + with suppress(Forbidden): + await ctx.author.edit(nick=display_name) + else: + if len(text) >= 1500: + embed = Embed( + title=random.choice(NEGATIVE_REPLIES), + description="Submitted text was too large! Please submit something under 1500 characters.", + color=Colours.soft_red + ) + await ctx.send(embed=embed) + return + + string_list = text.split() + for index, name in enumerate(string_list): + name = name.lower() + if "cat" in name: + if random.randint(0, 5) == 5: + string_list[index] = name.replace("cat", f"**{random.choice(Cats.cats)}**") + else: + string_list[index] = name.replace("cat", random.choice(Cats.cats)) + for element in Cats.cats: + if element in name: + string_list[index] = name.replace(element, "cat") + + string_len = len(string_list) // 3 or len(string_list) + + for _ in range(random.randint(1, string_len)): + # insert cat at random index + if random.randint(0, 5) == 5: + string_list.insert(random.randint(0, len(string_list)), f"**{random.choice(Cats.cats)}**") + else: + string_list.insert(random.randint(0, len(string_list)), random.choice(Cats.cats)) + + text = helpers.suppress_links(" ".join(string_list)) + await ctx.send( + f">>> {text}", + allowed_mentions=AllowedMentions.none() + ) + + +def setup(bot: Bot) -> None: + """Loads the catify cog.""" + bot.add_cog(Catify()) diff --git a/bot/exts/fun/coinflip.py b/bot/exts/fun/coinflip.py new file mode 100644 index 00000000..804306bd --- /dev/null +++ b/bot/exts/fun/coinflip.py @@ -0,0 +1,53 @@ +import random + +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Emojis + + +class CoinSide(commands.Converter): + """Class used to convert the `side` parameter of coinflip command.""" + + HEADS = ("h", "head", "heads") + TAILS = ("t", "tail", "tails") + + async def convert(self, ctx: commands.Context, side: str) -> str: + """Converts the provided `side` into the corresponding string.""" + side = side.lower() + if side in self.HEADS: + return "heads" + + if side in self.TAILS: + return "tails" + + raise commands.BadArgument(f"{side!r} is not a valid coin side.") + + +class CoinFlip(commands.Cog): + """Cog for the CoinFlip command.""" + + @commands.command(name="coinflip", aliases=("flip", "coin", "cf")) + async def coinflip_command(self, ctx: commands.Context, side: CoinSide = None) -> None: + """ + Flips a coin. + + If `side` is provided will state whether you guessed the side correctly. + """ + flipped_side = random.choice(["heads", "tails"]) + + message = f"{ctx.author.mention} flipped **{flipped_side}**. " + if not side: + await ctx.send(message) + return + + if side == flipped_side: + message += f"You guessed correctly! {Emojis.lemon_hyperpleased}" + else: + message += f"You guessed incorrectly. {Emojis.lemon_pensive}" + await ctx.send(message) + + +def setup(bot: Bot) -> None: + """Loads the coinflip cog.""" + bot.add_cog(CoinFlip()) diff --git a/bot/exts/fun/connect_four.py b/bot/exts/fun/connect_four.py new file mode 100644 index 00000000..647bb2b7 --- /dev/null +++ b/bot/exts/fun/connect_four.py @@ -0,0 +1,452 @@ +import asyncio +import random +from functools import partial +from typing import Optional, Union + +import discord +import emojis +from discord.ext import commands +from discord.ext.commands import guild_only + +from bot.bot import Bot +from bot.constants import Emojis + +NUMBERS = list(Emojis.number_emojis.values()) +CROSS_EMOJI = Emojis.incident_unactioned + +Coordinate = Optional[tuple[int, int]] +EMOJI_CHECK = Union[discord.Emoji, str] + + +class Game: + """A Connect 4 Game.""" + + def __init__( + self, + bot: Bot, + channel: discord.TextChannel, + player1: discord.Member, + player2: Optional[discord.Member], + tokens: list[str], + size: int = 7 + ): + self.bot = bot + self.channel = channel + self.player1 = player1 + self.player2 = player2 or AI(self.bot, game=self) + self.tokens = tokens + + self.grid = self.generate_board(size) + self.grid_size = size + + self.unicode_numbers = NUMBERS[:self.grid_size] + + self.message = None + + self.player_active = None + self.player_inactive = None + + @staticmethod + def generate_board(size: int) -> list[list[int]]: + """Generate the connect 4 board.""" + return [[0 for _ in range(size)] for _ in range(size)] + + async def print_grid(self) -> None: + """Formats and outputs the Connect Four grid to the channel.""" + title = ( + f"Connect 4: {self.player1.display_name}" + f" VS {self.bot.user.display_name if isinstance(self.player2, AI) else self.player2.display_name}" + ) + + rows = [" ".join(self.tokens[s] for s in row) for row in self.grid] + first_row = " ".join(x for x in NUMBERS[:self.grid_size]) + formatted_grid = "\n".join([first_row] + rows) + embed = discord.Embed(title=title, description=formatted_grid) + + if self.message: + await self.message.edit(embed=embed) + else: + self.message = await self.channel.send(content="Loading...") + for emoji in self.unicode_numbers: + await self.message.add_reaction(emoji) + await self.message.add_reaction(CROSS_EMOJI) + await self.message.edit(content=None, embed=embed) + + async def game_over(self, action: str, player1: discord.user, player2: discord.user) -> None: + """Announces to public chat.""" + if action == "win": + await self.channel.send(f"Game Over! {player1.mention} won against {player2.mention}") + elif action == "draw": + await self.channel.send(f"Game Over! {player1.mention} {player2.mention} It's A Draw :tada:") + elif action == "quit": + await self.channel.send(f"{self.player1.mention} surrendered. Game over!") + await self.print_grid() + + async def start_game(self) -> None: + """Begins the game.""" + self.player_active, self.player_inactive = self.player1, self.player2 + + while True: + await self.print_grid() + + if isinstance(self.player_active, AI): + coords = self.player_active.play() + if not coords: + await self.game_over( + "draw", + self.bot.user if isinstance(self.player_active, AI) else self.player_active, + self.bot.user if isinstance(self.player_inactive, AI) else self.player_inactive, + ) + else: + coords = await self.player_turn() + + if not coords: + return + + if self.check_win(coords, 1 if self.player_active == self.player1 else 2): + await self.game_over( + "win", + self.bot.user if isinstance(self.player_active, AI) else self.player_active, + self.bot.user if isinstance(self.player_inactive, AI) else self.player_inactive, + ) + return + + self.player_active, self.player_inactive = self.player_inactive, self.player_active + + def predicate(self, reaction: discord.Reaction, user: discord.Member) -> bool: + """The predicate to check for the player's reaction.""" + return ( + reaction.message.id == self.message.id + and user.id == self.player_active.id + and str(reaction.emoji) in (*self.unicode_numbers, CROSS_EMOJI) + ) + + async def player_turn(self) -> Coordinate: + """Initiate the player's turn.""" + message = await self.channel.send( + f"{self.player_active.mention}, it's your turn! React with the column you want to place your token in." + ) + player_num = 1 if self.player_active == self.player1 else 2 + while True: + try: + reaction, user = await self.bot.wait_for("reaction_add", check=self.predicate, timeout=30.0) + except asyncio.TimeoutError: + await self.channel.send(f"{self.player_active.mention}, you took too long. Game over!") + return + else: + await message.delete() + if str(reaction.emoji) == CROSS_EMOJI: + await self.game_over("quit", self.player_active, self.player_inactive) + return + + await self.message.remove_reaction(reaction, user) + + column_num = self.unicode_numbers.index(str(reaction.emoji)) + column = [row[column_num] for row in self.grid] + + for row_num, square in reversed(list(enumerate(column))): + if not square: + self.grid[row_num][column_num] = player_num + return row_num, column_num + message = await self.channel.send(f"Column {column_num + 1} is full. Try again") + + def check_win(self, coords: Coordinate, player_num: int) -> bool: + """Check that placing a counter here would cause the player to win.""" + vertical = [(-1, 0), (1, 0)] + horizontal = [(0, 1), (0, -1)] + forward_diag = [(-1, 1), (1, -1)] + backward_diag = [(-1, -1), (1, 1)] + axes = [vertical, horizontal, forward_diag, backward_diag] + + for axis in axes: + counters_in_a_row = 1 # The initial counter that is compared to + for (row_incr, column_incr) in axis: + row, column = coords + row += row_incr + column += column_incr + + while 0 <= row < self.grid_size and 0 <= column < self.grid_size: + if self.grid[row][column] == player_num: + counters_in_a_row += 1 + row += row_incr + column += column_incr + else: + break + if counters_in_a_row >= 4: + return True + return False + + +class AI: + """The Computer Player for Single-Player games.""" + + def __init__(self, bot: Bot, game: Game): + self.game = game + self.mention = bot.user.mention + + def get_possible_places(self) -> list[Coordinate]: + """Gets all the coordinates where the AI could possibly place a counter.""" + possible_coords = [] + for column_num in range(self.game.grid_size): + column = [row[column_num] for row in self.game.grid] + for row_num, square in reversed(list(enumerate(column))): + if not square: + possible_coords.append((row_num, column_num)) + break + return possible_coords + + def check_ai_win(self, coord_list: list[Coordinate]) -> Optional[Coordinate]: + """ + Check AI win. + + Check if placing a counter in any possible coordinate would cause the AI to win + with 10% chance of not winning and returning None + """ + if random.randint(1, 10) == 1: + return + for coords in coord_list: + if self.game.check_win(coords, 2): + return coords + + def check_player_win(self, coord_list: list[Coordinate]) -> Optional[Coordinate]: + """ + Check Player win. + + Check if placing a counter in possible coordinates would stop the player + from winning with 25% of not blocking them and returning None. + """ + if random.randint(1, 4) == 1: + return + for coords in coord_list: + if self.game.check_win(coords, 1): + return coords + + @staticmethod + def random_coords(coord_list: list[Coordinate]) -> Coordinate: + """Picks a random coordinate from the possible ones.""" + return random.choice(coord_list) + + def play(self) -> Union[Coordinate, bool]: + """ + Plays for the AI. + + Gets all possible coords, and determins the move: + 1. coords where it can win. + 2. coords where the player can win. + 3. Random coord + The first possible value is choosen. + """ + possible_coords = self.get_possible_places() + + if not possible_coords: + return False + + coords = ( + self.check_ai_win(possible_coords) + or self.check_player_win(possible_coords) + or self.random_coords(possible_coords) + ) + + row, column = coords + self.game.grid[row][column] = 2 + return coords + + +class ConnectFour(commands.Cog): + """Connect Four. The Classic Vertical Four-in-a-row Game!""" + + def __init__(self, bot: Bot): + self.bot = bot + self.games: list[Game] = [] + self.waiting: list[discord.Member] = [] + + self.tokens = [":white_circle:", ":blue_circle:", ":red_circle:"] + + self.max_board_size = 9 + self.min_board_size = 5 + + async def check_author(self, ctx: commands.Context, board_size: int) -> bool: + """Check if the requester is free and the board size is correct.""" + if self.already_playing(ctx.author): + await ctx.send("You're already playing a game!") + return False + + if ctx.author in self.waiting: + await ctx.send("You've already sent out a request for a player 2") + return False + + if not self.min_board_size <= board_size <= self.max_board_size: + await ctx.send( + f"{board_size} is not a valid board size. A valid board size is " + f"between `{self.min_board_size}` and `{self.max_board_size}`." + ) + return False + + return True + + def get_player( + self, + ctx: commands.Context, + announcement: discord.Message, + reaction: discord.Reaction, + user: discord.Member + ) -> bool: + """Predicate checking the criteria for the announcement message.""" + if self.already_playing(ctx.author): # If they've joined a game since requesting a player 2 + return True # Is dealt with later on + + if ( + user.id not in (ctx.me.id, ctx.author.id) + and str(reaction.emoji) == Emojis.hand_raised + and reaction.message.id == announcement.id + ): + if self.already_playing(user): + self.bot.loop.create_task(ctx.send(f"{user.mention} You're already playing a game!")) + self.bot.loop.create_task(announcement.remove_reaction(reaction, user)) + return False + + if user in self.waiting: + self.bot.loop.create_task(ctx.send( + f"{user.mention} Please cancel your game first before joining another one." + )) + self.bot.loop.create_task(announcement.remove_reaction(reaction, user)) + return False + + return True + + if ( + user.id == ctx.author.id + and str(reaction.emoji) == CROSS_EMOJI + and reaction.message.id == announcement.id + ): + return True + return False + + def already_playing(self, player: discord.Member) -> bool: + """Check if someone is already in a game.""" + return any(player in (game.player1, game.player2) for game in self.games) + + @staticmethod + def check_emojis( + e1: EMOJI_CHECK, e2: EMOJI_CHECK + ) -> tuple[bool, Optional[str]]: + """Validate the emojis, the user put.""" + if isinstance(e1, str) and emojis.count(e1) != 1: + return False, e1 + if isinstance(e2, str) and emojis.count(e2) != 1: + return False, e2 + return True, None + + async def _play_game( + self, + ctx: commands.Context, + user: Optional[discord.Member], + board_size: int, + emoji1: str, + emoji2: str + ) -> None: + """Helper for playing a game of connect four.""" + self.tokens = [":white_circle:", str(emoji1), str(emoji2)] + game = None # if game fails to intialize in try...except + + try: + game = Game(self.bot, ctx.channel, ctx.author, user, self.tokens, size=board_size) + self.games.append(game) + await game.start_game() + self.games.remove(game) + except Exception: + # End the game in the event of an unforeseen error so the players aren't stuck in a game + await ctx.send(f"{ctx.author.mention} {user.mention if user else ''} An error occurred. Game failed.") + if game in self.games: + self.games.remove(game) + raise + + @guild_only() + @commands.group( + invoke_without_command=True, + aliases=("4inarow", "connect4", "connectfour", "c4"), + case_insensitive=True + ) + async def connect_four( + self, + ctx: commands.Context, + board_size: int = 7, + emoji1: EMOJI_CHECK = "\U0001f535", + emoji2: EMOJI_CHECK = "\U0001f534" + ) -> None: + """ + Play the classic game of Connect Four with someone! + + Sets up a message waiting for someone else to react and play along. + The game will start once someone has reacted. + All inputs will be through reactions. + """ + check, emoji = self.check_emojis(emoji1, emoji2) + if not check: + raise commands.EmojiNotFound(emoji) + + check_author_result = await self.check_author(ctx, board_size) + if not check_author_result: + return + + announcement = await ctx.send( + "**Connect Four**: A new game is about to start!\n" + f"Press {Emojis.hand_raised} to play against {ctx.author.mention}!\n" + f"(Cancel the game with {CROSS_EMOJI}.)" + ) + self.waiting.append(ctx.author) + await announcement.add_reaction(Emojis.hand_raised) + await announcement.add_reaction(CROSS_EMOJI) + + try: + reaction, user = await self.bot.wait_for( + "reaction_add", + check=partial(self.get_player, ctx, announcement), + timeout=60.0 + ) + except asyncio.TimeoutError: + self.waiting.remove(ctx.author) + await announcement.delete() + await ctx.send( + f"{ctx.author.mention} Seems like there's no one here to play. " + f"Use `{ctx.prefix}{ctx.invoked_with} ai` to play against a computer." + ) + return + + if str(reaction.emoji) == CROSS_EMOJI: + self.waiting.remove(ctx.author) + await announcement.delete() + await ctx.send(f"{ctx.author.mention} Game cancelled.") + return + + await announcement.delete() + self.waiting.remove(ctx.author) + if self.already_playing(ctx.author): + return + + await self._play_game(ctx, user, board_size, str(emoji1), str(emoji2)) + + @guild_only() + @connect_four.command(aliases=("bot", "computer", "cpu")) + async def ai( + self, + ctx: commands.Context, + board_size: int = 7, + emoji1: EMOJI_CHECK = "\U0001f535", + emoji2: EMOJI_CHECK = "\U0001f534" + ) -> None: + """Play Connect Four against a computer player.""" + check, emoji = self.check_emojis(emoji1, emoji2) + if not check: + raise commands.EmojiNotFound(emoji) + + check_author_result = await self.check_author(ctx, board_size) + if not check_author_result: + return + + await self._play_game(ctx, None, board_size, str(emoji1), str(emoji2)) + + +def setup(bot: Bot) -> None: + """Load ConnectFour Cog.""" + bot.add_cog(ConnectFour(bot)) diff --git a/bot/exts/fun/duck_game.py b/bot/exts/fun/duck_game.py new file mode 100644 index 00000000..1ef7513f --- /dev/null +++ b/bot/exts/fun/duck_game.py @@ -0,0 +1,336 @@ +import asyncio +import random +import re +from collections import defaultdict +from io import BytesIO +from itertools import product +from pathlib import Path + +import discord +from PIL import Image, ImageDraw, ImageFont +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Colours, MODERATION_ROLES +from bot.utils.decorators import with_role + +DECK = list(product(*[(0, 1, 2)]*4)) + +GAME_DURATION = 180 + +# Scoring +CORRECT_SOLN = 1 +INCORRECT_SOLN = -1 +CORRECT_GOOSE = 2 +INCORRECT_GOOSE = -1 + +# Distribution of minimum acceptable solutions at board generation. +# This is for gameplay reasons, to shift the number of solutions per board up, +# while still making the end of the game unpredictable. +# Note: this is *not* the same as the distribution of number of solutions. + +SOLN_DISTR = 0, 0.05, 0.05, 0.1, 0.15, 0.25, 0.2, 0.15, .05 + +IMAGE_PATH = Path("bot", "resources", "fun", "all_cards.png") +FONT_PATH = Path("bot", "resources", "fun", "LuckiestGuy-Regular.ttf") +HELP_IMAGE_PATH = Path("bot", "resources", "fun", "ducks_help_ex.png") + +ALL_CARDS = Image.open(IMAGE_PATH) +LABEL_FONT = ImageFont.truetype(str(FONT_PATH), size=16) +CARD_WIDTH = 155 +CARD_HEIGHT = 97 + +EMOJI_WRONG = "\u274C" + +ANSWER_REGEX = re.compile(r'^\D*(\d+)\D+(\d+)\D+(\d+)\D*$') + +HELP_TEXT = """ +**Each card has 4 features** +Color, Number, Hat, and Accessory + +**A valid flight** +3 cards where each feature is either all the same or all different + +**Call "GOOSE"** +if you think there are no more flights + +**+1** for each valid flight +**+2** for a correct "GOOSE" call +**-1** for any wrong answer + +The first flight below is invalid: the first card has swords while the other two have no accessory.\ + It would be valid if the first card was empty-handed, or one of the other two had paintbrushes. + +The second flight is valid because there are no 2:1 splits; each feature is either all the same or all different. +""" + + +def assemble_board_image(board: list[tuple[int]], rows: int, columns: int) -> Image: + """Cut and paste images representing the given cards into an image representing the board.""" + new_im = Image.new("RGBA", (CARD_WIDTH*columns, CARD_HEIGHT*rows)) + draw = ImageDraw.Draw(new_im) + for idx, card in enumerate(board): + card_image = get_card_image(card) + row, col = divmod(idx, columns) + top, left = row * CARD_HEIGHT, col * CARD_WIDTH + new_im.paste(card_image, (left, top)) + draw.text( + xy=(left+5, top+5), # magic numbers are buffers for the card labels + text=str(idx), + fill=(0, 0, 0), + font=LABEL_FONT, + ) + return new_im + + +def get_card_image(card: tuple[int]) -> Image: + """Slice the image containing all the cards to get just this card.""" + # The master card image file should have 9x9 cards, + # arranged such that their features can be interpreted as ordered trinary. + row, col = divmod(as_trinary(card), 9) + x1 = col * CARD_WIDTH + x2 = x1 + CARD_WIDTH + y1 = row * CARD_HEIGHT + y2 = y1 + CARD_HEIGHT + return ALL_CARDS.crop((x1, y1, x2, y2)) + + +def as_trinary(card: tuple[int]) -> int: + """Find the card's unique index by interpreting its features as trinary.""" + return int(''.join(str(x) for x in card), base=3) + + +class DuckGame: + """A class for a single game.""" + + def __init__( + self, + rows: int = 4, + columns: int = 3, + minimum_solutions: int = 1, + ): + """ + Take samples from the deck to generate a board. + + Args: + rows (int, optional): Rows in the game board. Defaults to 4. + columns (int, optional): Columns in the game board. Defaults to 3. + minimum_solutions (int, optional): Minimum acceptable number of solutions in the board. Defaults to 1. + """ + self.rows = rows + self.columns = columns + size = rows * columns + + self._solutions = None + self.claimed_answers = {} + self.scores = defaultdict(int) + self.editing_embed = asyncio.Lock() + + self.board = random.sample(DECK, size) + while len(self.solutions) < minimum_solutions: + self.board = random.sample(DECK, size) + + @property + def board(self) -> list[tuple[int]]: + """Accesses board property.""" + return self._board + + @board.setter + def board(self, val: list[tuple[int]]) -> None: + """Erases calculated solutions if the board changes.""" + self._solutions = None + self._board = val + + @property + def solutions(self) -> None: + """Calculate valid solutions and cache to avoid redoing work.""" + if self._solutions is None: + self._solutions = set() + for idx_a, card_a in enumerate(self.board): + for idx_b, card_b in enumerate(self.board[idx_a+1:], start=idx_a+1): + # Two points determine a line, and there are exactly 3 points per line in {0,1,2}^4. + # The completion of a line will only be a duplicate point if the other two points are the same, + # which is prevented by the triangle iteration. + completion = tuple( + feat_a if feat_a == feat_b else 3-feat_a-feat_b + for feat_a, feat_b in zip(card_a, card_b) + ) + try: + idx_c = self.board.index(completion) + except ValueError: + continue + + # Indices within the solution are sorted to detect duplicate solutions modulo order. + solution = tuple(sorted((idx_a, idx_b, idx_c))) + self._solutions.add(solution) + + return self._solutions + + +class DuckGamesDirector(commands.Cog): + """A cog for running Duck Duck Duck Goose games.""" + + def __init__(self, bot: Bot): + self.bot = bot + self.current_games = {} + + @commands.group( + name='duckduckduckgoose', + aliases=['dddg', 'ddg', 'duckduckgoose', 'duckgoose'], + invoke_without_command=True + ) + @commands.cooldown(rate=1, per=2, type=commands.BucketType.channel) + async def start_game(self, ctx: commands.Context) -> None: + """Generate a board, send the game embed, and end the game after a time limit.""" + if ctx.channel.id in self.current_games: + await ctx.send("There's already a game running!") + return + + minimum_solutions, = random.choices(range(len(SOLN_DISTR)), weights=SOLN_DISTR) + game = DuckGame(minimum_solutions=minimum_solutions) + game.running = True + self.current_games[ctx.channel.id] = game + + game.msg_content = "" + game.embed_msg = await self.send_board_embed(ctx, game) + await asyncio.sleep(GAME_DURATION) + + # Checking for the channel ID in the currently running games is not sufficient. + # The game could have been ended by a player, and a new game already started in the same channel. + if game.running: + try: + del self.current_games[ctx.channel.id] + await self.end_game(ctx.channel, game, end_message="Time's up!") + except KeyError: + pass + + @commands.Cog.listener() + async def on_message(self, msg: discord.Message) -> None: + """Listen for messages and process them as answers if appropriate.""" + if msg.author.bot: + return + + channel = msg.channel + if channel.id not in self.current_games: + return + + game = self.current_games[channel.id] + if msg.content.strip().lower() == 'goose': + # If all of the solutions have been claimed, i.e. the "goose" call is correct. + if len(game.solutions) == len(game.claimed_answers): + try: + del self.current_games[channel.id] + game.scores[msg.author] += CORRECT_GOOSE + await self.end_game(channel, game, end_message=f"{msg.author.display_name} GOOSED!") + except KeyError: + pass + else: + await msg.add_reaction(EMOJI_WRONG) + game.scores[msg.author] += INCORRECT_GOOSE + return + + # Valid answers contain 3 numbers. + if not (match := re.match(ANSWER_REGEX, msg.content)): + return + answer = tuple(sorted(int(m) for m in match.groups())) + + # Be forgiving for answers that use indices not on the board. + if not all(0 <= n < len(game.board) for n in answer): + return + + # Also be forgiving for answers that have already been claimed (and avoid penalizing for racing conditions). + if answer in game.claimed_answers: + return + + if answer in game.solutions: + game.claimed_answers[answer] = msg.author + game.scores[msg.author] += CORRECT_SOLN + await self.display_claimed_answer(game, msg.author, answer) + else: + await msg.add_reaction(EMOJI_WRONG) + game.scores[msg.author] += INCORRECT_SOLN + + async def send_board_embed(self, ctx: commands.Context, game: DuckGame) -> discord.Message: + """Create and send the initial game embed. This will be edited as the game goes on.""" + image = assemble_board_image(game.board, game.rows, game.columns) + with BytesIO() as image_stream: + image.save(image_stream, format="png") + image_stream.seek(0) + file = discord.File(fp=image_stream, filename="board.png") + embed = discord.Embed( + title="Duck Duck Duck Goose!", + color=Colours.bright_green, + ) + embed.set_image(url="attachment://board.png") + return await ctx.send(embed=embed, file=file) + + async def display_claimed_answer(self, game: DuckGame, author: discord.Member, answer: tuple[int]) -> None: + """Add a claimed answer to the game embed.""" + async with game.editing_embed: + # We specifically edit the message contents instead of the embed + # Because we load in the image from the file, editing any portion of the embed + # Does weird things to the image and this works around that weirdness + game.msg_content = f"{game.msg_content}\n{str(answer):12s} - {author.display_name}" + await game.embed_msg.edit(content=game.msg_content) + + async def end_game(self, channel: discord.TextChannel, game: DuckGame, end_message: str) -> None: + """Edit the game embed to reflect the end of the game and mark the game as not running.""" + game.running = False + + scoreboard_embed = discord.Embed( + title=end_message, + color=discord.Color.dark_purple(), + ) + scores = sorted( + game.scores.items(), + key=lambda item: item[1], + reverse=True, + ) + scoreboard = "Final scores:\n\n" + scoreboard += "\n".join(f"{member.display_name}: {score}" for member, score in scores) + scoreboard_embed.description = scoreboard + await channel.send(embed=scoreboard_embed) + + missed = [ans for ans in game.solutions if ans not in game.claimed_answers] + if missed: + missed_text = "Flights everyone missed:\n" + "\n".join(f"{ans}" for ans in missed) + else: + missed_text = "All the flights were found!" + + await game.embed_msg.edit(content=f"{missed_text}") + + @start_game.command(name="help") + async def show_rules(self, ctx: commands.Context) -> None: + """Explain the rules of the game.""" + await self.send_help_embed(ctx) + + @start_game.command(name="stop") + @with_role(*MODERATION_ROLES) + async def stop_game(self, ctx: commands.Context) -> None: + """Stop a currently running game. Only available to mods.""" + try: + game = self.current_games.pop(ctx.channel.id) + except KeyError: + await ctx.send("No game currently running in this channel") + return + await self.end_game(ctx.channel, game, end_message="Game canceled.") + + @staticmethod + async def send_help_embed(ctx: commands.Context) -> discord.Message: + """Send rules embed.""" + embed = discord.Embed( + title="Compete against other players to find valid flights!", + color=discord.Color.dark_purple(), + ) + embed.description = HELP_TEXT + file = discord.File(HELP_IMAGE_PATH, filename="help.png") + embed.set_image(url="attachment://help.png") + embed.set_footer( + text="Tip: using Discord's compact message display mode can help keep the board on the screen" + ) + return await ctx.send(file=file, embed=embed) + + +def setup(bot: Bot) -> None: + """Load the DuckGamesDirector cog.""" + bot.add_cog(DuckGamesDirector(bot)) diff --git a/bot/exts/fun/fun.py b/bot/exts/fun/fun.py new file mode 100644 index 00000000..b148f1f3 --- /dev/null +++ b/bot/exts/fun/fun.py @@ -0,0 +1,250 @@ +import functools +import json +import logging +import random +from collections.abc import Iterable +from pathlib import Path +from typing import Callable, Optional, Union + +from discord import Embed, Message +from discord.ext import commands +from discord.ext.commands import BadArgument, Cog, Context, MessageConverter, clean_content + +from bot import utils +from bot.bot import Bot +from bot.constants import Client, Colours, Emojis +from bot.utils import helpers + +log = logging.getLogger(__name__) + +UWU_WORDS = { + "fi": "fwi", + "l": "w", + "r": "w", + "some": "sum", + "th": "d", + "thing": "fing", + "tho": "fo", + "you're": "yuw'we", + "your": "yur", + "you": "yuw", +} + + +def caesar_cipher(text: str, offset: int) -> Iterable[str]: + """ + Implements a lazy Caesar Cipher algorithm. + + Encrypts a `text` given a specific integer `offset`. The sign + of the `offset` dictates the direction in which it shifts to, + with a negative value shifting to the left, and a positive + value shifting to the right. + """ + for char in text: + if not char.isascii() or not char.isalpha() or char.isspace(): + yield char + continue + + case_start = 65 if char.isupper() else 97 + true_offset = (ord(char) - case_start + offset) % 26 + + yield chr(case_start + true_offset) + + +class Fun(Cog): + """A collection of general commands for fun.""" + + def __init__(self, bot: Bot): + self.bot = bot + + self._caesar_cipher_embed = json.loads(Path("bot/resources/fun/caesar_info.json").read_text("UTF-8")) + + @staticmethod + def _get_random_die() -> str: + """Generate a random die emoji, ready to be sent on Discord.""" + die_name = f"dice_{random.randint(1, 6)}" + return getattr(Emojis, die_name) + + @commands.command() + async def roll(self, ctx: Context, num_rolls: int = 1) -> None: + """Outputs a number of random dice emotes (up to 6).""" + if 1 <= num_rolls <= 6: + dice = " ".join(self._get_random_die() for _ in range(num_rolls)) + await ctx.send(dice) + else: + raise BadArgument(f"`{Client.prefix}roll` only supports between 1 and 6 rolls.") + + @commands.command(name="uwu", aliases=("uwuwize", "uwuify",)) + async def uwu_command(self, ctx: Context, *, text: clean_content(fix_channel_mentions=True)) -> None: + """Converts a given `text` into it's uwu equivalent.""" + conversion_func = functools.partial( + utils.replace_many, replacements=UWU_WORDS, ignore_case=True, match_case=True + ) + text, embed = await Fun._get_text_and_embed(ctx, text) + # Convert embed if it exists + if embed is not None: + embed = Fun._convert_embed(conversion_func, embed) + converted_text = conversion_func(text) + converted_text = helpers.suppress_links(converted_text) + # Don't put >>> if only embed present + if converted_text: + converted_text = f">>> {converted_text.lstrip('> ')}" + await ctx.send(content=converted_text, embed=embed) + + @commands.command(name="randomcase", aliases=("rcase", "randomcaps", "rcaps",)) + async def randomcase_command(self, ctx: Context, *, text: clean_content(fix_channel_mentions=True)) -> None: + """Randomly converts the casing of a given `text`.""" + def conversion_func(text: str) -> str: + """Randomly converts the casing of a given string.""" + return "".join( + char.upper() if round(random.random()) else char.lower() for char in text + ) + text, embed = await Fun._get_text_and_embed(ctx, text) + # Convert embed if it exists + if embed is not None: + embed = Fun._convert_embed(conversion_func, embed) + converted_text = conversion_func(text) + converted_text = helpers.suppress_links(converted_text) + # Don't put >>> if only embed present + if converted_text: + converted_text = f">>> {converted_text.lstrip('> ')}" + await ctx.send(content=converted_text, embed=embed) + + @commands.group(name="caesarcipher", aliases=("caesar", "cc",)) + async def caesarcipher_group(self, ctx: Context) -> None: + """ + Translates a message using the Caesar Cipher. + + See `decrypt`, `encrypt`, and `info` subcommands. + """ + if ctx.invoked_subcommand is None: + await ctx.invoke(self.bot.get_command("help"), "caesarcipher") + + @caesarcipher_group.command(name="info") + async def caesarcipher_info(self, ctx: Context) -> None: + """Information about the Caesar Cipher.""" + embed = Embed.from_dict(self._caesar_cipher_embed) + embed.colour = Colours.dark_green + + await ctx.send(embed=embed) + + @staticmethod + async def _caesar_cipher(ctx: Context, offset: int, msg: str, left_shift: bool = False) -> None: + """ + Given a positive integer `offset`, translates and sends the given `msg`. + + Performs a right shift by default unless `left_shift` is specified as `True`. + + Also accepts a valid Discord Message ID or link. + """ + if offset < 0: + await ctx.send(":no_entry: Cannot use a negative offset.") + return + + if left_shift: + offset = -offset + + def conversion_func(text: str) -> str: + """Encrypts the given string using the Caesar Cipher.""" + return "".join(caesar_cipher(text, offset)) + + text, embed = await Fun._get_text_and_embed(ctx, msg) + + if embed is not None: + embed = Fun._convert_embed(conversion_func, embed) + + converted_text = conversion_func(text) + + if converted_text: + converted_text = f">>> {converted_text.lstrip('> ')}" + + await ctx.send(content=converted_text, embed=embed) + + @caesarcipher_group.command(name="encrypt", aliases=("rightshift", "rshift", "enc",)) + async def caesarcipher_encrypt(self, ctx: Context, offset: int, *, msg: str) -> None: + """ + Given a positive integer `offset`, encrypt the given `msg`. + + Performs a right shift of the letters in the message. + + Also accepts a valid Discord Message ID or link. + """ + await self._caesar_cipher(ctx, offset, msg, left_shift=False) + + @caesarcipher_group.command(name="decrypt", aliases=("leftshift", "lshift", "dec",)) + async def caesarcipher_decrypt(self, ctx: Context, offset: int, *, msg: str) -> None: + """ + Given a positive integer `offset`, decrypt the given `msg`. + + Performs a left shift of the letters in the message. + + Also accepts a valid Discord Message ID or link. + """ + await self._caesar_cipher(ctx, offset, msg, left_shift=True) + + @staticmethod + async def _get_text_and_embed(ctx: Context, text: str) -> tuple[str, Optional[Embed]]: + """ + Attempts to extract the text and embed from a possible link to a discord Message. + + Does not retrieve the text and embed from the Message if it is in a channel the user does + not have read permissions in. + + Returns a tuple of: + str: If `text` is a valid discord Message, the contents of the message, else `text`. + Optional[Embed]: The embed if found in the valid Message, else None + """ + embed = None + + msg = await Fun._get_discord_message(ctx, text) + # Ensure the user has read permissions for the channel the message is in + if isinstance(msg, Message): + permissions = msg.channel.permissions_for(ctx.author) + if permissions.read_messages: + text = msg.clean_content + # Take first embed because we can't send multiple embeds + if msg.embeds: + embed = msg.embeds[0] + + return (text, embed) + + @staticmethod + async def _get_discord_message(ctx: Context, text: str) -> Union[Message, str]: + """ + Attempts to convert a given `text` to a discord Message object and return it. + + Conversion will succeed if given a discord Message ID or link. + Returns `text` if the conversion fails. + """ + try: + text = await MessageConverter().convert(ctx, text) + except commands.BadArgument: + log.debug(f"Input '{text:.20}...' is not a valid Discord Message") + return text + + @staticmethod + def _convert_embed(func: Callable[[str, ], str], embed: Embed) -> Embed: + """ + Converts the text in an embed using a given conversion function, then return the embed. + + Only modifies the following fields: title, description, footer, fields + """ + embed_dict = embed.to_dict() + + embed_dict["title"] = func(embed_dict.get("title", "")) + embed_dict["description"] = func(embed_dict.get("description", "")) + + if "footer" in embed_dict: + embed_dict["footer"]["text"] = func(embed_dict["footer"].get("text", "")) + + if "fields" in embed_dict: + for field in embed_dict["fields"]: + field["name"] = func(field.get("name", "")) + field["value"] = func(field.get("value", "")) + + return Embed.from_dict(embed_dict) + + +def setup(bot: Bot) -> None: + """Load the Fun cog.""" + bot.add_cog(Fun(bot)) diff --git a/bot/exts/fun/game.py b/bot/exts/fun/game.py new file mode 100644 index 00000000..f9c150e6 --- /dev/null +++ b/bot/exts/fun/game.py @@ -0,0 +1,485 @@ +import difflib +import logging +import random +import re +from asyncio import sleep +from datetime import datetime as dt, timedelta +from enum import IntEnum +from typing import Any, Optional + +from aiohttp import ClientSession +from discord import Embed +from discord.ext import tasks +from discord.ext.commands import Cog, Context, group + +from bot.bot import Bot +from bot.constants import STAFF_ROLES, Tokens +from bot.utils.decorators import with_role +from bot.utils.extensions import invoke_help_command +from bot.utils.pagination import ImagePaginator, LinePaginator + +# Base URL of IGDB API +BASE_URL = "https://api.igdb.com/v4" + +CLIENT_ID = Tokens.igdb_client_id +CLIENT_SECRET = Tokens.igdb_client_secret + +# The number of seconds before expiry that we attempt to re-fetch a new access token +ACCESS_TOKEN_RENEWAL_WINDOW = 60*60*24*2 + +# URL to request API access token +OAUTH_URL = "https://id.twitch.tv/oauth2/token" + +OAUTH_PARAMS = { + "client_id": CLIENT_ID, + "client_secret": CLIENT_SECRET, + "grant_type": "client_credentials" +} + +BASE_HEADERS = { + "Client-ID": CLIENT_ID, + "Accept": "application/json" +} + +logger = logging.getLogger(__name__) + +REGEX_NON_ALPHABET = re.compile(r"[^a-z0-9]", re.IGNORECASE) + +# --------- +# TEMPLATES +# --------- + +# Body templates +# Request body template for get_games_list +GAMES_LIST_BODY = ( + "fields cover.image_id, first_release_date, total_rating, name, storyline, url, platforms.name, status," + "involved_companies.company.name, summary, age_ratings.category, age_ratings.rating, total_rating_count;" + "{sort} {limit} {offset} {genre} {additional}" +) + +# Request body template for get_companies_list +COMPANIES_LIST_BODY = ( + "fields name, url, start_date, logo.image_id, developed.name, published.name, description;" + "offset {offset}; limit {limit};" +) + +# Request body template for games search +SEARCH_BODY = 'fields name, url, storyline, total_rating, total_rating_count; limit 50; search "{term}";' + +# Pages templates +# Game embed layout +GAME_PAGE = ( + "**[{name}]({url})**\n" + "{description}" + "**Release Date:** {release_date}\n" + "**Rating:** {rating}/100 :star: (based on {rating_count} ratings)\n" + "**Platforms:** {platforms}\n" + "**Status:** {status}\n" + "**Age Ratings:** {age_ratings}\n" + "**Made by:** {made_by}\n\n" + "{storyline}" +) + +# .games company command page layout +COMPANY_PAGE = ( + "**[{name}]({url})**\n" + "{description}" + "**Founded:** {founded}\n" + "**Developed:** {developed}\n" + "**Published:** {published}" +) + +# For .games search command line layout +GAME_SEARCH_LINE = ( + "**[{name}]({url})**\n" + "{rating}/100 :star: (based on {rating_count} ratings)\n" +) + +# URL templates +COVER_URL = "https://images.igdb.com/igdb/image/upload/t_cover_big/{image_id}.jpg" +LOGO_URL = "https://images.igdb.com/igdb/image/upload/t_logo_med/{image_id}.png" + +# Create aliases for complex genre names +ALIASES = { + "Role-playing (rpg)": ["Role playing", "Rpg"], + "Turn-based strategy (tbs)": ["Turn based strategy", "Tbs"], + "Real time strategy (rts)": ["Real time strategy", "Rts"], + "Hack and slash/beat 'em up": ["Hack and slash"] +} + + +class GameStatus(IntEnum): + """Game statuses in IGDB API.""" + + Released = 0 + Alpha = 2 + Beta = 3 + Early = 4 + Offline = 5 + Cancelled = 6 + Rumored = 7 + + +class AgeRatingCategories(IntEnum): + """IGDB API Age Rating categories IDs.""" + + ESRB = 1 + PEGI = 2 + + +class AgeRatings(IntEnum): + """PEGI/ESRB ratings IGDB API IDs.""" + + Three = 1 + Seven = 2 + Twelve = 3 + Sixteen = 4 + Eighteen = 5 + RP = 6 + EC = 7 + E = 8 + E10 = 9 + T = 10 + M = 11 + AO = 12 + + +class Games(Cog): + """Games Cog contains commands that collect data from IGDB.""" + + def __init__(self, bot: Bot): + self.bot = bot + self.http_session: ClientSession = bot.http_session + + self.genres: dict[str, int] = {} + self.headers = BASE_HEADERS + + self.bot.loop.create_task(self.renew_access_token()) + + async def renew_access_token(self) -> None: + """Refeshes V4 access token a number of seconds before expiry. See `ACCESS_TOKEN_RENEWAL_WINDOW`.""" + while True: + async with self.http_session.post(OAUTH_URL, params=OAUTH_PARAMS) as resp: + result = await resp.json() + if resp.status != 200: + # If there is a valid access token continue to use that, + # otherwise unload cog. + if "Authorization" in self.headers: + time_delta = timedelta(seconds=ACCESS_TOKEN_RENEWAL_WINDOW) + logger.error( + "Failed to renew IGDB access token. " + f"Current token will last for {time_delta} " + f"OAuth response message: {result['message']}" + ) + else: + logger.warning( + "Invalid OAuth credentials. Unloading Games cog. " + f"OAuth response message: {result['message']}" + ) + self.bot.remove_cog("Games") + + return + + self.headers["Authorization"] = f"Bearer {result['access_token']}" + + # Attempt to renew before the token expires + next_renewal = result["expires_in"] - ACCESS_TOKEN_RENEWAL_WINDOW + + time_delta = timedelta(seconds=next_renewal) + logger.info(f"Successfully renewed access token. Refreshing again in {time_delta}") + + # This will be true the first time this loop runs. + # Since we now have an access token, its safe to start this task. + if self.genres == {}: + self.refresh_genres_task.start() + await sleep(next_renewal) + + @tasks.loop(hours=24.0) + async def refresh_genres_task(self) -> None: + """Refresh genres in every hour.""" + try: + await self._get_genres() + except Exception as e: + logger.warning(f"There was error while refreshing genres: {e}") + return + logger.info("Successfully refreshed genres.") + + def cog_unload(self) -> None: + """Cancel genres refreshing start when unloading Cog.""" + self.refresh_genres_task.cancel() + logger.info("Successfully stopped Genres Refreshing task.") + + async def _get_genres(self) -> None: + """Create genres variable for games command.""" + body = "fields name; limit 100;" + async with self.http_session.post(f"{BASE_URL}/genres", data=body, headers=self.headers) as resp: + result = await resp.json() + genres = {genre["name"].capitalize(): genre["id"] for genre in result} + + # Replace complex names with names from ALIASES + for genre_name, genre in genres.items(): + if genre_name in ALIASES: + for alias in ALIASES[genre_name]: + self.genres[alias] = genre + else: + self.genres[genre_name] = genre + + @group(name="games", aliases=("game",), invoke_without_command=True) + async def games(self, ctx: Context, amount: Optional[int] = 5, *, genre: Optional[str]) -> None: + """ + Get random game(s) by genre from IGDB. Use .games genres command to get all available genres. + + Also support amount parameter, what max is 25 and min 1, default 5. Supported formats: + - .games <genre> + - .games <amount> <genre> + """ + # When user didn't specified genre, send help message + if genre is None: + await invoke_help_command(ctx) + return + + # Capitalize genre for check + genre = "".join(genre).capitalize() + + # Check for amounts, max is 25 and min 1 + if not 1 <= amount <= 25: + await ctx.send("Your provided amount is out of range. Our minimum is 1 and maximum 25.") + return + + # Get games listing, if genre don't exist, show error message with possibilities. + # Offset must be random, due otherwise we will get always same result (offset show in which position should + # API start returning result) + try: + games = await self.get_games_list(amount, self.genres[genre], offset=random.randint(0, 150)) + except KeyError: + possibilities = await self.get_best_results(genre) + # If there is more than 1 possibilities, show these. + # If there is only 1 possibility, use it as genre. + # Otherwise send message about invalid genre. + if len(possibilities) > 1: + display_possibilities = "`, `".join(p[1] for p in possibilities) + await ctx.send( + f"Invalid genre `{genre}`. " + f"{f'Maybe you meant `{display_possibilities}`?' if display_possibilities else ''}" + ) + return + elif len(possibilities) == 1: + games = await self.get_games_list( + amount, self.genres[possibilities[0][1]], offset=random.randint(0, 150) + ) + genre = possibilities[0][1] + else: + await ctx.send(f"Invalid genre `{genre}`.") + return + + # Create pages and paginate + pages = [await self.create_page(game) for game in games] + + await ImagePaginator.paginate(pages, ctx, Embed(title=f"Random {genre.title()} Games")) + + @games.command(name="top", aliases=("t",)) + async def top(self, ctx: Context, amount: int = 10) -> None: + """ + Get current Top games in IGDB. + + Support amount parameter. Max is 25, min is 1. + """ + if not 1 <= amount <= 25: + await ctx.send("Your provided amount is out of range. Our minimum is 1 and maximum 25.") + return + + games = await self.get_games_list(amount, sort="total_rating desc", + additional_body="where total_rating >= 90; sort total_rating_count desc;") + + pages = [await self.create_page(game) for game in games] + await ImagePaginator.paginate(pages, ctx, Embed(title=f"Top {amount} Games")) + + @games.command(name="genres", aliases=("genre", "g")) + async def genres(self, ctx: Context) -> None: + """Get all available genres.""" + await ctx.send(f"Currently available genres: {', '.join(f'`{genre}`' for genre in self.genres)}") + + @games.command(name="search", aliases=("s",)) + async def search(self, ctx: Context, *, search_term: str) -> None: + """Find games by name.""" + lines = await self.search_games(search_term) + + await LinePaginator.paginate(lines, ctx, Embed(title=f"Game Search Results: {search_term}"), empty=False) + + @games.command(name="company", aliases=("companies",)) + async def company(self, ctx: Context, amount: int = 5) -> None: + """ + Get random Game Companies companies from IGDB API. + + Support amount parameter. Max is 25, min is 1. + """ + if not 1 <= amount <= 25: + await ctx.send("Your provided amount is out of range. Our minimum is 1 and maximum 25.") + return + + # Get companies listing. Provide limit for limiting how much companies will be returned. Get random offset to + # get (almost) every time different companies (offset show in which position should API start returning result) + companies = await self.get_companies_list(limit=amount, offset=random.randint(0, 150)) + pages = [await self.create_company_page(co) for co in companies] + + await ImagePaginator.paginate(pages, ctx, Embed(title="Random Game Companies")) + + @with_role(*STAFF_ROLES) + @games.command(name="refresh", aliases=("r",)) + async def refresh_genres_command(self, ctx: Context) -> None: + """Refresh .games command genres.""" + try: + await self._get_genres() + except Exception as e: + await ctx.send(f"There was error while refreshing genres: `{e}`") + return + await ctx.send("Successfully refreshed genres.") + + async def get_games_list( + self, + amount: int, + genre: Optional[str] = None, + sort: Optional[str] = None, + additional_body: str = "", + offset: int = 0 + ) -> list[dict[str, Any]]: + """ + Get list of games from IGDB API by parameters that is provided. + + Amount param show how much games this get, genre is genre ID and at least one genre in game must this when + provided. Sort is sorting by specific field and direction, ex. total_rating desc/asc (total_rating is field, + desc/asc is direction). Additional_body is field where you can pass extra search parameters. Offset show start + position in API. + """ + # Create body of IGDB API request, define fields, sorting, offset, limit and genre + params = { + "sort": f"sort {sort};" if sort else "", + "limit": f"limit {amount};", + "offset": f"offset {offset};" if offset else "", + "genre": f"where genres = ({genre});" if genre else "", + "additional": additional_body + } + body = GAMES_LIST_BODY.format(**params) + + # Do request to IGDB API, create headers, URL, define body, return result + async with self.http_session.post(url=f"{BASE_URL}/games", data=body, headers=self.headers) as resp: + return await resp.json() + + async def create_page(self, data: dict[str, Any]) -> tuple[str, str]: + """Create content of Game Page.""" + # Create cover image URL from template + url = COVER_URL.format(**{"image_id": data["cover"]["image_id"] if "cover" in data else ""}) + + # Get release date separately with checking + release_date = dt.utcfromtimestamp(data["first_release_date"]).date() if "first_release_date" in data else "?" + + # Create Age Ratings value + rating = ", ".join( + f"{AgeRatingCategories(age['category']).name} {AgeRatings(age['rating']).name}" + for age in data["age_ratings"] + ) if "age_ratings" in data else "?" + + companies = [c["company"]["name"] for c in data["involved_companies"]] if "involved_companies" in data else "?" + + # Create formatting for template page + formatting = { + "name": data["name"], + "url": data["url"], + "description": f"{data['summary']}\n\n" if "summary" in data else "\n", + "release_date": release_date, + "rating": round(data["total_rating"] if "total_rating" in data else 0, 2), + "rating_count": data["total_rating_count"] if "total_rating_count" in data else "?", + "platforms": ", ".join(platform["name"] for platform in data["platforms"]) if "platforms" in data else "?", + "status": GameStatus(data["status"]).name if "status" in data else "?", + "age_ratings": rating, + "made_by": ", ".join(companies), + "storyline": data["storyline"] if "storyline" in data else "" + } + page = GAME_PAGE.format(**formatting) + + return page, url + + async def search_games(self, search_term: str) -> list[str]: + """Search game from IGDB API by string, return listing of pages.""" + lines = [] + + # Define request body of IGDB API request and do request + body = SEARCH_BODY.format(**{"term": search_term}) + + async with self.http_session.post(url=f"{BASE_URL}/games", data=body, headers=self.headers) as resp: + data = await resp.json() + + # Loop over games, format them to good format, make line and append this to total lines + for game in data: + formatting = { + "name": game["name"], + "url": game["url"], + "rating": round(game["total_rating"] if "total_rating" in game else 0, 2), + "rating_count": game["total_rating_count"] if "total_rating" in game else "?" + } + line = GAME_SEARCH_LINE.format(**formatting) + lines.append(line) + + return lines + + async def get_companies_list(self, limit: int, offset: int = 0) -> list[dict[str, Any]]: + """ + Get random Game Companies from IGDB API. + + Limit is parameter, that show how much movies this should return, offset show in which position should API start + returning results. + """ + # Create request body from template + body = COMPANIES_LIST_BODY.format(**{ + "limit": limit, + "offset": offset + }) + + async with self.http_session.post(url=f"{BASE_URL}/companies", data=body, headers=self.headers) as resp: + return await resp.json() + + async def create_company_page(self, data: dict[str, Any]) -> tuple[str, str]: + """Create good formatted Game Company page.""" + # Generate URL of company logo + url = LOGO_URL.format(**{"image_id": data["logo"]["image_id"] if "logo" in data else ""}) + + # Try to get found date of company + founded = dt.utcfromtimestamp(data["start_date"]).date() if "start_date" in data else "?" + + # Generate list of games, that company have developed or published + developed = ", ".join(game["name"] for game in data["developed"]) if "developed" in data else "?" + published = ", ".join(game["name"] for game in data["published"]) if "published" in data else "?" + + formatting = { + "name": data["name"], + "url": data["url"], + "description": f"{data['description']}\n\n" if "description" in data else "\n", + "founded": founded, + "developed": developed, + "published": published + } + page = COMPANY_PAGE.format(**formatting) + + return page, url + + async def get_best_results(self, query: str) -> list[tuple[float, str]]: + """Get best match result of genre when original genre is invalid.""" + results = [] + for genre in self.genres: + ratios = [difflib.SequenceMatcher(None, query, genre).ratio()] + for word in REGEX_NON_ALPHABET.split(genre): + ratios.append(difflib.SequenceMatcher(None, query, word).ratio()) + results.append((round(max(ratios), 2), genre)) + return sorted((item for item in results if item[0] >= 0.60), reverse=True)[:4] + + +def setup(bot: Bot) -> None: + """Load the Games cog.""" + # Check does IGDB API key exist, if not, log warning and don't load cog + if not Tokens.igdb_client_id: + logger.warning("No IGDB client ID. Not loading Games cog.") + return + if not Tokens.igdb_client_secret: + logger.warning("No IGDB client secret. Not loading Games cog.") + return + bot.add_cog(Games(bot)) diff --git a/bot/exts/fun/magic_8ball.py b/bot/exts/fun/magic_8ball.py new file mode 100644 index 00000000..a7b682ca --- /dev/null +++ b/bot/exts/fun/magic_8ball.py @@ -0,0 +1,30 @@ +import json +import logging +import random +from pathlib import Path + +from discord.ext import commands + +from bot.bot import Bot + +log = logging.getLogger(__name__) + +ANSWERS = json.loads(Path("bot/resources/fun/magic8ball.json").read_text("utf8")) + + +class Magic8ball(commands.Cog): + """A Magic 8ball command to respond to a user's question.""" + + @commands.command(name="8ball") + async def output_answer(self, ctx: commands.Context, *, question: str) -> None: + """Return a Magic 8ball answer from answers list.""" + if len(question.split()) >= 3: + answer = random.choice(ANSWERS) + await ctx.send(answer) + else: + await ctx.send("Usage: .8ball <question> (minimum length of 3 eg: `will I win?`)") + + +def setup(bot: Bot) -> None: + """Load the Magic8Ball Cog.""" + bot.add_cog(Magic8ball()) diff --git a/bot/exts/fun/minesweeper.py b/bot/exts/fun/minesweeper.py new file mode 100644 index 00000000..a48b5051 --- /dev/null +++ b/bot/exts/fun/minesweeper.py @@ -0,0 +1,270 @@ +import logging +from collections.abc import Iterator +from dataclasses import dataclass +from random import randint, random +from typing import Union + +import discord +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Client +from bot.utils.converters import CoordinateConverter +from bot.utils.exceptions import UserNotPlayingError +from bot.utils.extensions import invoke_help_command + +MESSAGE_MAPPING = { + 0: ":stop_button:", + 1: ":one:", + 2: ":two:", + 3: ":three:", + 4: ":four:", + 5: ":five:", + 6: ":six:", + 7: ":seven:", + 8: ":eight:", + 9: ":nine:", + 10: ":keycap_ten:", + "bomb": ":bomb:", + "hidden": ":grey_question:", + "flag": ":flag_black:", + "x": ":x:" +} + +log = logging.getLogger(__name__) + + +GameBoard = list[list[Union[str, int]]] + + +@dataclass +class Game: + """The data for a game.""" + + board: GameBoard + revealed: GameBoard + dm_msg: discord.Message + chat_msg: discord.Message + activated_on_server: bool + + +class Minesweeper(commands.Cog): + """Play a game of Minesweeper.""" + + def __init__(self): + self.games: dict[int, Game] = {} + + @commands.group(name="minesweeper", aliases=("ms",), invoke_without_command=True) + async def minesweeper_group(self, ctx: commands.Context) -> None: + """Commands for Playing Minesweeper.""" + await invoke_help_command(ctx) + + @staticmethod + def get_neighbours(x: int, y: int) -> Iterator[tuple[int, int]]: + """Get all the neighbouring x and y including it self.""" + for x_ in [x - 1, x, x + 1]: + for y_ in [y - 1, y, y + 1]: + if x_ != -1 and x_ != 10 and y_ != -1 and y_ != 10: + yield x_, y_ + + def generate_board(self, bomb_chance: float) -> GameBoard: + """Generate a 2d array for the board.""" + board: GameBoard = [ + [ + "bomb" if random() <= bomb_chance else "number" + for _ in range(10) + ] for _ in range(10) + ] + + # make sure there is always a free cell + board[randint(0, 9)][randint(0, 9)] = "number" + + for y, row in enumerate(board): + for x, cell in enumerate(row): + if cell == "number": + # calculate bombs near it + bombs = 0 + for x_, y_ in self.get_neighbours(x, y): + if board[y_][x_] == "bomb": + bombs += 1 + board[y][x] = bombs + return board + + @staticmethod + def format_for_discord(board: GameBoard) -> str: + """Format the board as a string for Discord.""" + discord_msg = ( + ":stop_button: :regional_indicator_a: :regional_indicator_b: :regional_indicator_c: " + ":regional_indicator_d: :regional_indicator_e: :regional_indicator_f: :regional_indicator_g: " + ":regional_indicator_h: :regional_indicator_i: :regional_indicator_j:\n\n" + ) + rows = [] + for row_number, row in enumerate(board): + new_row = f"{MESSAGE_MAPPING[row_number + 1]} " + new_row += " ".join(MESSAGE_MAPPING[cell] for cell in row) + rows.append(new_row) + + discord_msg += "\n".join(rows) + return discord_msg + + @minesweeper_group.command(name="start") + async def start_command(self, ctx: commands.Context, bomb_chance: float = .2) -> None: + """Start a game of Minesweeper.""" + if ctx.author.id in self.games: # Player is already playing + await ctx.send(f"{ctx.author.mention} you already have a game running!", delete_after=2) + await ctx.message.delete(delay=2) + return + + try: + await ctx.author.send( + f"Play by typing: `{Client.prefix}ms reveal xy [xy]` or `{Client.prefix}ms flag xy [xy]` \n" + f"Close the game with `{Client.prefix}ms end`\n" + ) + except discord.errors.Forbidden: + log.debug(f"{ctx.author.name} ({ctx.author.id}) has disabled DMs from server members.") + await ctx.send(f":x: {ctx.author.mention}, please enable DMs to play minesweeper.") + return + + # Add game to list + board: GameBoard = self.generate_board(bomb_chance) + revealed_board: GameBoard = [["hidden"] * 10 for _ in range(10)] + dm_msg = await ctx.author.send(f"Here's your board!\n{self.format_for_discord(revealed_board)}") + + if ctx.guild: + await ctx.send(f"{ctx.author.mention} is playing Minesweeper.") + chat_msg = await ctx.send(f"Here's their board!\n{self.format_for_discord(revealed_board)}") + else: + chat_msg = None + + self.games[ctx.author.id] = Game( + board=board, + revealed=revealed_board, + dm_msg=dm_msg, + chat_msg=chat_msg, + activated_on_server=ctx.guild is not None + ) + + async def update_boards(self, ctx: commands.Context) -> None: + """Update both playing boards.""" + game = self.games[ctx.author.id] + await game.dm_msg.delete() + game.dm_msg = await ctx.author.send(f"Here's your board!\n{self.format_for_discord(game.revealed)}") + if game.activated_on_server: + await game.chat_msg.edit(content=f"Here's their board!\n{self.format_for_discord(game.revealed)}") + + @commands.dm_only() + @minesweeper_group.command(name="flag") + async def flag_command(self, ctx: commands.Context, *coordinates: CoordinateConverter) -> None: + """Place multiple flags on the board.""" + if ctx.author.id not in self.games: + raise UserNotPlayingError + board: GameBoard = self.games[ctx.author.id].revealed + for x, y in coordinates: + if board[y][x] == "hidden": + board[y][x] = "flag" + + await self.update_boards(ctx) + + @staticmethod + def reveal_bombs(revealed: GameBoard, board: GameBoard) -> None: + """Reveals all the bombs.""" + for y, row in enumerate(board): + for x, cell in enumerate(row): + if cell == "bomb": + revealed[y][x] = cell + + async def lost(self, ctx: commands.Context) -> None: + """The player lost the game.""" + game = self.games[ctx.author.id] + self.reveal_bombs(game.revealed, game.board) + await ctx.author.send(":fire: You lost! :fire:") + if game.activated_on_server: + await game.chat_msg.channel.send(f":fire: {ctx.author.mention} just lost Minesweeper! :fire:") + + async def won(self, ctx: commands.Context) -> None: + """The player won the game.""" + game = self.games[ctx.author.id] + await ctx.author.send(":tada: You won! :tada:") + if game.activated_on_server: + await game.chat_msg.channel.send(f":tada: {ctx.author.mention} just won Minesweeper! :tada:") + + def reveal_zeros(self, revealed: GameBoard, board: GameBoard, x: int, y: int) -> None: + """Recursively reveal adjacent cells when a 0 cell is encountered.""" + for x_, y_ in self.get_neighbours(x, y): + if revealed[y_][x_] != "hidden": + continue + revealed[y_][x_] = board[y_][x_] + if board[y_][x_] == 0: + self.reveal_zeros(revealed, board, x_, y_) + + async def check_if_won(self, ctx: commands.Context, revealed: GameBoard, board: GameBoard) -> bool: + """Checks if a player has won.""" + if any( + revealed[y][x] in ["hidden", "flag"] and board[y][x] != "bomb" + for x in range(10) + for y in range(10) + ): + return False + else: + await self.won(ctx) + return True + + async def reveal_one( + self, + ctx: commands.Context, + revealed: GameBoard, + board: GameBoard, + x: int, + y: int + ) -> bool: + """ + Reveal one square. + + return is True if the game ended, breaking the loop in `reveal_command` and deleting the game. + """ + revealed[y][x] = board[y][x] + if board[y][x] == "bomb": + await self.lost(ctx) + revealed[y][x] = "x" # mark bomb that made you lose with a x + return True + elif board[y][x] == 0: + self.reveal_zeros(revealed, board, x, y) + return await self.check_if_won(ctx, revealed, board) + + @commands.dm_only() + @minesweeper_group.command(name="reveal") + async def reveal_command(self, ctx: commands.Context, *coordinates: CoordinateConverter) -> None: + """Reveal multiple cells.""" + if ctx.author.id not in self.games: + raise UserNotPlayingError + game = self.games[ctx.author.id] + revealed: GameBoard = game.revealed + board: GameBoard = game.board + + for x, y in coordinates: + # reveal_one returns True if the revealed cell is a bomb or the player won, ending the game + if await self.reveal_one(ctx, revealed, board, x, y): + await self.update_boards(ctx) + del self.games[ctx.author.id] + break + else: + await self.update_boards(ctx) + + @minesweeper_group.command(name="end") + async def end_command(self, ctx: commands.Context) -> None: + """End your current game.""" + if ctx.author.id not in self.games: + raise UserNotPlayingError + game = self.games[ctx.author.id] + game.revealed = game.board + await self.update_boards(ctx) + new_msg = f":no_entry: Game canceled. :no_entry:\n{game.dm_msg.content}" + await game.dm_msg.edit(content=new_msg) + if game.activated_on_server: + await game.chat_msg.edit(content=new_msg) + del self.games[ctx.author.id] + + +def setup(bot: Bot) -> None: + """Load the Minesweeper cog.""" + bot.add_cog(Minesweeper()) diff --git a/bot/exts/fun/movie.py b/bot/exts/fun/movie.py new file mode 100644 index 00000000..a04eeb41 --- /dev/null +++ b/bot/exts/fun/movie.py @@ -0,0 +1,205 @@ +import logging +import random +from enum import Enum +from typing import Any + +from aiohttp import ClientSession +from discord import Embed +from discord.ext.commands import Cog, Context, group + +from bot.bot import Bot +from bot.constants import Tokens +from bot.utils.extensions import invoke_help_command +from bot.utils.pagination import ImagePaginator + +# Define base URL of TMDB +BASE_URL = "https://api.themoviedb.org/3/" + +logger = logging.getLogger(__name__) + +# Define movie params, that will be used for every movie request +MOVIE_PARAMS = { + "api_key": Tokens.tmdb, + "language": "en-US" +} + + +class MovieGenres(Enum): + """Movies Genre names and IDs.""" + + Action = "28" + Adventure = "12" + Animation = "16" + Comedy = "35" + Crime = "80" + Documentary = "99" + Drama = "18" + Family = "10751" + Fantasy = "14" + History = "36" + Horror = "27" + Music = "10402" + Mystery = "9648" + Romance = "10749" + Science = "878" + Thriller = "53" + Western = "37" + + +class Movie(Cog): + """Movie Cog contains movies command that grab random movies from TMDB.""" + + def __init__(self, bot: Bot): + self.http_session: ClientSession = bot.http_session + + @group(name="movies", aliases=("movie",), invoke_without_command=True) + async def movies(self, ctx: Context, genre: str = "", amount: int = 5) -> None: + """ + Get random movies by specifying genre. Also support amount parameter, that define how much movies will be shown. + + Default 5. Use .movies genres to get all available genres. + """ + # Check is there more than 20 movies specified, due TMDB return 20 movies + # per page, so this is max. Also you can't get less movies than 1, just logic + if amount > 20: + await ctx.send("You can't get more than 20 movies at once. (TMDB limits)") + return + elif amount < 1: + await ctx.send("You can't get less than 1 movie.") + return + + # Capitalize genre for getting data from Enum, get random page, send help when genre don't exist. + genre = genre.capitalize() + try: + result = await self.get_movies_data(self.http_session, MovieGenres[genre].value, 1) + except KeyError: + await invoke_help_command(ctx) + return + + # Check if "results" is in result. If not, throw error. + if "results" not in result: + err_msg = ( + f"There is problem while making TMDB API request. Response Code: {result['status_code']}, " + f"{result['status_message']}." + ) + await ctx.send(err_msg) + logger.warning(err_msg) + + # Get random page. Max page is last page where is movies with this genre. + page = random.randint(1, result["total_pages"]) + + # Get movies list from TMDB, check if results key in result. When not, raise error. + movies = await self.get_movies_data(self.http_session, MovieGenres[genre].value, page) + if "results" not in movies: + err_msg = f"There is problem while making TMDB API request. Response Code: {result['status_code']}, " \ + f"{result['status_message']}." + await ctx.send(err_msg) + logger.warning(err_msg) + + # Get all pages and embed + pages = await self.get_pages(self.http_session, movies, amount) + embed = await self.get_embed(genre) + + await ImagePaginator.paginate(pages, ctx, embed) + + @movies.command(name="genres", aliases=("genre", "g")) + async def genres(self, ctx: Context) -> None: + """Show all currently available genres for .movies command.""" + await ctx.send(f"Current available genres: {', '.join('`' + genre.name + '`' for genre in MovieGenres)}") + + async def get_movies_data(self, client: ClientSession, genre_id: str, page: int) -> list[dict[str, Any]]: + """Return JSON of TMDB discover request.""" + # Define params of request + params = { + "api_key": Tokens.tmdb, + "language": "en-US", + "sort_by": "popularity.desc", + "include_adult": "false", + "include_video": "false", + "page": page, + "with_genres": genre_id + } + + url = BASE_URL + "discover/movie" + + # Make discover request to TMDB, return result + async with client.get(url, params=params) as resp: + return await resp.json() + + async def get_pages(self, client: ClientSession, movies: dict[str, Any], amount: int) -> list[tuple[str, str]]: + """Fetch all movie pages from movies dictionary. Return list of pages.""" + pages = [] + + for i in range(amount): + movie_id = movies["results"][i]["id"] + movie = await self.get_movie(client, movie_id) + + page, img = await self.create_page(movie) + pages.append((page, img)) + + return pages + + async def get_movie(self, client: ClientSession, movie: int) -> dict[str, Any]: + """Get Movie by movie ID from TMDB. Return result dictionary.""" + if not isinstance(movie, int): + raise ValueError("Error while fetching movie from TMDB, movie argument must be integer. ") + url = BASE_URL + f"movie/{movie}" + + async with client.get(url, params=MOVIE_PARAMS) as resp: + return await resp.json() + + async def create_page(self, movie: dict[str, Any]) -> tuple[str, str]: + """Create page from TMDB movie request result. Return formatted page + image.""" + text = "" + + # Add title + tagline (if not empty) + text += f"**{movie['title']}**\n" + if movie["tagline"]: + text += f"{movie['tagline']}\n\n" + else: + text += "\n" + + # Add other information + text += f"**Rating:** {movie['vote_average']}/10 :star:\n" + text += f"**Release Date:** {movie['release_date']}\n\n" + + text += "__**Production Information**__\n" + + companies = movie["production_companies"] + countries = movie["production_countries"] + + text += f"**Made by:** {', '.join(company['name'] for company in companies)}\n" + text += f"**Made in:** {', '.join(country['name'] for country in countries)}\n\n" + + text += "__**Some Numbers**__\n" + + budget = f"{movie['budget']:,d}" if movie['budget'] else "?" + revenue = f"{movie['revenue']:,d}" if movie['revenue'] else "?" + + if movie["runtime"] is not None: + duration = divmod(movie["runtime"], 60) + else: + duration = ("?", "?") + + text += f"**Budget:** ${budget}\n" + text += f"**Revenue:** ${revenue}\n" + text += f"**Duration:** {f'{duration[0]} hour(s) {duration[1]} minute(s)'}\n\n" + + text += movie["overview"] + + img = f"http://image.tmdb.org/t/p/w200{movie['poster_path']}" + + # Return page content and image + return text, img + + async def get_embed(self, name: str) -> Embed: + """Return embed of random movies. Uses name in title.""" + embed = Embed(title=f"Random {name} Movies") + embed.set_footer(text="This product uses the TMDb API but is not endorsed or certified by TMDb.") + embed.set_thumbnail(url="https://i.imgur.com/LtFtC8H.png") + return embed + + +def setup(bot: Bot) -> None: + """Load the Movie Cog.""" + bot.add_cog(Movie(bot)) diff --git a/bot/exts/fun/recommend_game.py b/bot/exts/fun/recommend_game.py new file mode 100644 index 00000000..42c9f7c2 --- /dev/null +++ b/bot/exts/fun/recommend_game.py @@ -0,0 +1,51 @@ +import json +import logging +from pathlib import Path +from random import shuffle + +import discord +from discord.ext import commands + +from bot.bot import Bot + +log = logging.getLogger(__name__) +game_recs = [] + +# Populate the list `game_recs` with resource files +for rec_path in Path("bot/resources/fun/game_recs").glob("*.json"): + data = json.loads(rec_path.read_text("utf8")) + game_recs.append(data) +shuffle(game_recs) + + +class RecommendGame(commands.Cog): + """Commands related to recommending games.""" + + def __init__(self, bot: Bot): + self.bot = bot + self.index = 0 + + @commands.command(name="recommendgame", aliases=("gamerec",)) + async def recommend_game(self, ctx: commands.Context) -> None: + """Sends an Embed of a random game recommendation.""" + if self.index >= len(game_recs): + self.index = 0 + shuffle(game_recs) + game = game_recs[self.index] + self.index += 1 + + author = self.bot.get_user(int(game["author"])) + + # Creating and formatting Embed + embed = discord.Embed(color=discord.Colour.blue()) + if author is not None: + embed.set_author(name=author.name, icon_url=author.display_avatar.url) + embed.set_image(url=game["image"]) + embed.add_field(name=f"Recommendation: {game['title']}\n{game['link']}", value=game["description"]) + + await ctx.send(embed=embed) + + +def setup(bot: Bot) -> None: + """Loads the RecommendGame cog.""" + bot.add_cog(RecommendGame(bot)) diff --git a/bot/exts/fun/rps.py b/bot/exts/fun/rps.py new file mode 100644 index 00000000..c6bbff46 --- /dev/null +++ b/bot/exts/fun/rps.py @@ -0,0 +1,57 @@ +from random import choice + +from discord.ext import commands + +from bot.bot import Bot + +CHOICES = ["rock", "paper", "scissors"] +SHORT_CHOICES = ["r", "p", "s"] + +# Using a dictionary instead of conditions to check for the winner. +WINNER_DICT = { + "r": { + "r": 0, + "p": -1, + "s": 1, + }, + "p": { + "r": 1, + "p": 0, + "s": -1, + }, + "s": { + "r": -1, + "p": 1, + "s": 0, + } +} + + +class RPS(commands.Cog): + """Rock Paper Scissors. The Classic Game!""" + + @commands.command(case_insensitive=True) + async def rps(self, ctx: commands.Context, move: str) -> None: + """Play the classic game of Rock Paper Scissors with your own sir-lancebot!""" + move = move.lower() + player_mention = ctx.author.mention + + if move not in CHOICES and move not in SHORT_CHOICES: + raise commands.BadArgument(f"Invalid move. Please make move from options: {', '.join(CHOICES).upper()}.") + + bot_move = choice(CHOICES) + # value of player_result will be from (-1, 0, 1) as (lost, tied, won). + player_result = WINNER_DICT[move[0]][bot_move[0]] + + if player_result == 0: + message_string = f"{player_mention} You and Sir Lancebot played {bot_move}, it's a tie." + await ctx.send(message_string) + elif player_result == 1: + await ctx.send(f"Sir Lancebot played {bot_move}! {player_mention} won!") + else: + await ctx.send(f"Sir Lancebot played {bot_move}! {player_mention} lost!") + + +def setup(bot: Bot) -> None: + """Load the RPS Cog.""" + bot.add_cog(RPS(bot)) diff --git a/bot/exts/fun/space.py b/bot/exts/fun/space.py new file mode 100644 index 00000000..48ad0f96 --- /dev/null +++ b/bot/exts/fun/space.py @@ -0,0 +1,236 @@ +import logging +import random +from datetime import date, datetime +from typing import Any, Optional +from urllib.parse import urlencode + +from discord import Embed +from discord.ext import tasks +from discord.ext.commands import Cog, Context, group + +from bot.bot import Bot +from bot.constants import Tokens +from bot.utils.converters import DateConverter +from bot.utils.extensions import invoke_help_command + +logger = logging.getLogger(__name__) + +NASA_BASE_URL = "https://api.nasa.gov" +NASA_IMAGES_BASE_URL = "https://images-api.nasa.gov" +NASA_EPIC_BASE_URL = "https://epic.gsfc.nasa.gov" + +APOD_MIN_DATE = date(1995, 6, 16) + + +class Space(Cog): + """Space Cog contains commands, that show images, facts or other information about space.""" + + def __init__(self, bot: Bot): + self.http_session = bot.http_session + + self.rovers = {} + self.get_rovers.start() + + def cog_unload(self) -> None: + """Cancel `get_rovers` task when Cog will unload.""" + self.get_rovers.cancel() + + @tasks.loop(hours=24) + async def get_rovers(self) -> None: + """Get listing of rovers from NASA API and info about their start and end dates.""" + data = await self.fetch_from_nasa("mars-photos/api/v1/rovers") + + for rover in data["rovers"]: + self.rovers[rover["name"].lower()] = { + "min_date": rover["landing_date"], + "max_date": rover["max_date"], + "max_sol": rover["max_sol"] + } + + @group(name="space", invoke_without_command=True) + async def space(self, ctx: Context) -> None: + """Head command that contains commands about space.""" + await invoke_help_command(ctx) + + @space.command(name="apod") + async def apod(self, ctx: Context, date: Optional[str]) -> None: + """ + Get Astronomy Picture of Day from NASA API. Date is optional parameter, what formatting is YYYY-MM-DD. + + If date is not specified, this will get today APOD. + """ + params = {} + # Parse date to params, when provided. Show error message when invalid formatting + if date: + try: + apod_date = datetime.strptime(date, "%Y-%m-%d").date() + except ValueError: + await ctx.send(f"Invalid date {date}. Please make sure your date is in format YYYY-MM-DD.") + return + + now = datetime.now().date() + if APOD_MIN_DATE > apod_date or now < apod_date: + await ctx.send(f"Date must be between {APOD_MIN_DATE.isoformat()} and {now.isoformat()} (today).") + return + + params["date"] = apod_date.isoformat() + + result = await self.fetch_from_nasa("planetary/apod", params) + + await ctx.send( + embed=self.create_nasa_embed( + f"Astronomy Picture of the Day - {result['date']}", + result["explanation"], + result["url"] + ) + ) + + @space.command(name="nasa") + async def nasa(self, ctx: Context, *, search_term: Optional[str]) -> None: + """Get random NASA information/facts + image. Support `search_term` parameter for more specific search.""" + params = { + "media_type": "image" + } + if search_term: + params["q"] = search_term + + # Don't use API key, no need for this. + data = await self.fetch_from_nasa("search", params, NASA_IMAGES_BASE_URL, use_api_key=False) + if len(data["collection"]["items"]) == 0: + await ctx.send(f"Can't find any items with search term `{search_term}`.") + return + + item = random.choice(data["collection"]["items"]) + + await ctx.send( + embed=self.create_nasa_embed( + item["data"][0]["title"], + item["data"][0]["description"], + item["links"][0]["href"] + ) + ) + + @space.command(name="epic") + async def epic(self, ctx: Context, date: Optional[str]) -> None: + """Get a random image of the Earth from the NASA EPIC API. Support date parameter, format is YYYY-MM-DD.""" + if date: + try: + show_date = datetime.strptime(date, "%Y-%m-%d").date().isoformat() + except ValueError: + await ctx.send(f"Invalid date {date}. Please make sure your date is in format YYYY-MM-DD.") + return + else: + show_date = None + + # Don't use API key, no need for this. + data = await self.fetch_from_nasa( + f"api/natural{f'/date/{show_date}' if show_date else ''}", + base=NASA_EPIC_BASE_URL, + use_api_key=False + ) + if len(data) < 1: + await ctx.send("Can't find any images in this date.") + return + + item = random.choice(data) + + year, month, day = item["date"].split(" ")[0].split("-") + image_url = f"{NASA_EPIC_BASE_URL}/archive/natural/{year}/{month}/{day}/jpg/{item['image']}.jpg" + + await ctx.send( + embed=self.create_nasa_embed( + "Earth Image", item["caption"], image_url, f" \u2022 Identifier: {item['identifier']}" + ) + ) + + @space.group(name="mars", invoke_without_command=True) + async def mars( + self, + ctx: Context, + date: Optional[DateConverter], + rover: str = "curiosity" + ) -> None: + """ + Get random Mars image by date. Support both SOL (martian solar day) and earth date and rovers. + + Earth date formatting is YYYY-MM-DD. Use `.space mars dates` to get all currently available rovers. + """ + rover = rover.lower() + if rover not in self.rovers: + await ctx.send( + ( + f"Invalid rover `{rover}`.\n" + f"**Rovers:** `{'`, `'.join(f'{r.capitalize()}' for r in self.rovers)}`" + ) + ) + return + + # When date not provided, get random SOL date between 0 and rover's max. + if date is None: + date = random.randint(0, self.rovers[rover]["max_sol"]) + + params = {} + if isinstance(date, int): + params["sol"] = date + else: + params["earth_date"] = date.date().isoformat() + + result = await self.fetch_from_nasa(f"mars-photos/api/v1/rovers/{rover}/photos", params) + if len(result["photos"]) < 1: + err_msg = ( + f"We can't find result in date " + f"{date.date().isoformat() if isinstance(date, datetime) else f'{date} SOL'}.\n" + f"**Note:** Dates must match with rover's working dates. Please use `{ctx.prefix}space mars dates` to " + "see working dates for each rover." + ) + await ctx.send(err_msg) + return + + item = random.choice(result["photos"]) + await ctx.send( + embed=self.create_nasa_embed( + f"{item['rover']['name']}'s {item['camera']['full_name']} Mars Image", "", item["img_src"], + ) + ) + + @mars.command(name="dates", aliases=("d", "date", "rover", "rovers", "r")) + async def dates(self, ctx: Context) -> None: + """Get current available rovers photo date ranges.""" + await ctx.send("\n".join( + f"**{r.capitalize()}:** {i['min_date']} **-** {i['max_date']}" for r, i in self.rovers.items() + )) + + async def fetch_from_nasa( + self, + endpoint: str, + additional_params: Optional[dict[str, Any]] = None, + base: Optional[str] = NASA_BASE_URL, + use_api_key: bool = True + ) -> dict[str, Any]: + """Fetch information from NASA API, return result.""" + params = {} + if use_api_key: + params["api_key"] = Tokens.nasa + + # Add additional parameters to request parameters only when they provided by user + if additional_params is not None: + params.update(additional_params) + + async with self.http_session.get(url=f"{base}/{endpoint}?{urlencode(params)}") as resp: + return await resp.json() + + def create_nasa_embed(self, title: str, description: str, image: str, footer: Optional[str] = "") -> Embed: + """Generate NASA commands embeds. Required: title, description and image URL, footer (addition) is optional.""" + return Embed( + title=title, + description=description + ).set_image(url=image).set_footer(text="Powered by NASA API" + footer) + + +def setup(bot: Bot) -> None: + """Load the Space cog.""" + if not Tokens.nasa: + logger.warning("Can't find NASA API key. Not loading Space Cog.") + return + + bot.add_cog(Space(bot)) diff --git a/bot/exts/fun/speedrun.py b/bot/exts/fun/speedrun.py new file mode 100644 index 00000000..c2966ce1 --- /dev/null +++ b/bot/exts/fun/speedrun.py @@ -0,0 +1,26 @@ +import json +import logging +from pathlib import Path +from random import choice + +from discord.ext import commands + +from bot.bot import Bot + +log = logging.getLogger(__name__) + +LINKS = json.loads(Path("bot/resources/fun/speedrun_links.json").read_text("utf8")) + + +class Speedrun(commands.Cog): + """Commands about the video game speedrunning community.""" + + @commands.command(name="speedrun") + async def get_speedrun(self, ctx: commands.Context) -> None: + """Sends a link to a video of a random speedrun.""" + await ctx.send(choice(LINKS)) + + +def setup(bot: Bot) -> None: + """Load the Speedrun cog.""" + bot.add_cog(Speedrun()) diff --git a/bot/exts/fun/status_codes.py b/bot/exts/fun/status_codes.py new file mode 100644 index 00000000..501cbe0a --- /dev/null +++ b/bot/exts/fun/status_codes.py @@ -0,0 +1,87 @@ +from random import choice + +import discord +from discord.ext import commands + +from bot.bot import Bot + +HTTP_DOG_URL = "https://httpstatusdogs.com/img/{code}.jpg" +HTTP_CAT_URL = "https://http.cat/{code}.jpg" +STATUS_TEMPLATE = "**Status: {code}**" +ERR_404 = "Unable to find status floof for {code}." +ERR_UNKNOWN = "Error attempting to retrieve status floof for {code}." +ERROR_LENGTH_EMBED = discord.Embed( + title="Input status code does not exist", + description="The range of valid status codes is 100 to 599", +) + + +class HTTPStatusCodes(commands.Cog): + """ + Fetch an image depicting HTTP status codes as a dog or a cat. + + If neither animal is selected a cat or dog is chosen randomly for the given status code. + """ + + def __init__(self, bot: Bot): + self.bot = bot + + @commands.group( + name="http_status", + aliases=("status", "httpstatus"), + invoke_without_command=True, + ) + async def http_status_group(self, ctx: commands.Context, code: int) -> None: + """Choose a cat or dog randomly for the given status code.""" + subcmd = choice((self.http_cat, self.http_dog)) + await subcmd(ctx, code) + + @http_status_group.command(name="cat") + async def http_cat(self, ctx: commands.Context, code: int) -> None: + """Send a cat version of the requested HTTP status code.""" + if code in range(100, 600): + await self.build_embed(url=HTTP_CAT_URL.format(code=code), ctx=ctx, code=code) + return + await ctx.send(embed=ERROR_LENGTH_EMBED) + + @http_status_group.command(name="dog") + async def http_dog(self, ctx: commands.Context, code: int) -> None: + """Send a dog version of the requested HTTP status code.""" + if code in range(100, 600): + await self.build_embed(url=HTTP_DOG_URL.format(code=code), ctx=ctx, code=code) + return + await ctx.send(embed=ERROR_LENGTH_EMBED) + + async def build_embed(self, url: str, ctx: commands.Context, code: int) -> None: + """Attempt to build and dispatch embed. Append error message instead if something goes wrong.""" + async with self.bot.http_session.get(url, allow_redirects=False) as response: + if response.status in range(200, 300): + await ctx.send( + embed=discord.Embed( + title=STATUS_TEMPLATE.format(code=code) + ).set_image(url=url) + ) + elif response.status in (302, 404): # dog URL returns 302 instead of 404 + if "dog" in url: + await ctx.send( + embed=discord.Embed( + title=ERR_404.format(code=code) + ).set_image(url="https://httpstatusdogs.com/img/404.jpg") + ) + return + await ctx.send( + embed=discord.Embed( + title=ERR_404.format(code=code) + ).set_image(url="https://http.cat/404.jpg") + ) + else: + await ctx.send( + embed=discord.Embed( + title=STATUS_TEMPLATE.format(code=code) + ).set_footer(text=ERR_UNKNOWN.format(code=code)) + ) + + +def setup(bot: Bot) -> None: + """Load the HTTPStatusCodes cog.""" + bot.add_cog(HTTPStatusCodes(bot)) diff --git a/bot/exts/fun/tic_tac_toe.py b/bot/exts/fun/tic_tac_toe.py new file mode 100644 index 00000000..5c4f8051 --- /dev/null +++ b/bot/exts/fun/tic_tac_toe.py @@ -0,0 +1,335 @@ +import asyncio +import random +from typing import Callable, Optional, Union + +import discord +from discord.ext.commands import Cog, Context, check, group, guild_only + +from bot.bot import Bot +from bot.constants import Emojis +from bot.utils.pagination import LinePaginator + +CONFIRMATION_MESSAGE = ( + "{opponent}, {requester} wants to play Tic-Tac-Toe against you." + f"\nReact to this message with {Emojis.confirmation} to accept or with {Emojis.decline} to decline." +) + + +def check_win(board: dict[int, str]) -> bool: + """Check from board, is any player won game.""" + return any( + ( + # Horizontal + board[1] == board[2] == board[3], + board[4] == board[5] == board[6], + board[7] == board[8] == board[9], + # Vertical + board[1] == board[4] == board[7], + board[2] == board[5] == board[8], + board[3] == board[6] == board[9], + # Diagonal + board[1] == board[5] == board[9], + board[3] == board[5] == board[7], + ) + ) + + +class Player: + """Class that contains information about player and functions that interact with player.""" + + def __init__(self, user: discord.User, ctx: Context, symbol: str): + self.user = user + self.ctx = ctx + self.symbol = symbol + + async def get_move(self, board: dict[int, str], msg: discord.Message) -> tuple[bool, Optional[int]]: + """ + Get move from user. + + Return is timeout reached and position of field what user will fill when timeout don't reach. + """ + def check_for_move(r: discord.Reaction, u: discord.User) -> bool: + """Check does user who reacted is user who we want, message is board and emoji is in board values.""" + return ( + u.id == self.user.id + and msg.id == r.message.id + and r.emoji in board.values() + and r.emoji in Emojis.number_emojis.values() + ) + + try: + react, _ = await self.ctx.bot.wait_for("reaction_add", timeout=30.0, check=check_for_move) + except asyncio.TimeoutError: + return True, None + else: + return False, list(Emojis.number_emojis.keys())[list(Emojis.number_emojis.values()).index(react.emoji)] + + def __str__(self) -> str: + """Return mention of user.""" + return self.user.mention + + +class AI: + """Tic Tac Toe AI class for against computer gaming.""" + + def __init__(self, symbol: str): + self.symbol = symbol + + async def get_move(self, board: dict[int, str], _: discord.Message) -> tuple[bool, int]: + """Get move from AI. AI use Minimax strategy.""" + possible_moves = [i for i, emoji in board.items() if emoji in list(Emojis.number_emojis.values())] + + for symbol in (Emojis.o_square, Emojis.x_square): + for move in possible_moves: + board_copy = board.copy() + board_copy[move] = symbol + if check_win(board_copy): + return False, move + + open_corners = [i for i in possible_moves if i in (1, 3, 7, 9)] + if len(open_corners) > 0: + return False, random.choice(open_corners) + + if 5 in possible_moves: + return False, 5 + + open_edges = [i for i in possible_moves if i in (2, 4, 6, 8)] + return False, random.choice(open_edges) + + def __str__(self) -> str: + """Return `AI` as user name.""" + return "AI" + + +class Game: + """Class that contains information and functions about Tic Tac Toe game.""" + + def __init__(self, players: list[Union[Player, AI]], ctx: Context): + self.players = players + self.ctx = ctx + self.board = { + 1: Emojis.number_emojis[1], + 2: Emojis.number_emojis[2], + 3: Emojis.number_emojis[3], + 4: Emojis.number_emojis[4], + 5: Emojis.number_emojis[5], + 6: Emojis.number_emojis[6], + 7: Emojis.number_emojis[7], + 8: Emojis.number_emojis[8], + 9: Emojis.number_emojis[9] + } + + self.current = self.players[0] + self.next = self.players[1] + + self.winner: Optional[Union[Player, AI]] = None + self.loser: Optional[Union[Player, AI]] = None + self.over = False + self.canceled = False + self.draw = False + + async def get_confirmation(self) -> tuple[bool, Optional[str]]: + """ + Ask does user want to play TicTacToe against requester. First player is always requester. + + This return tuple that have: + - first element boolean (is game accepted?) + - (optional, only when first element is False, otherwise None) reason for declining. + """ + confirm_message = await self.ctx.send( + CONFIRMATION_MESSAGE.format( + opponent=self.players[1].user.mention, + requester=self.players[0].user.mention + ) + ) + await confirm_message.add_reaction(Emojis.confirmation) + await confirm_message.add_reaction(Emojis.decline) + + def confirm_check(reaction: discord.Reaction, user: discord.User) -> bool: + """Check is user who reacted from who this was requested, message is confirmation and emoji is valid.""" + return ( + reaction.emoji in (Emojis.confirmation, Emojis.decline) + and reaction.message.id == confirm_message.id + and user == self.players[1].user + ) + + try: + reaction, user = await self.ctx.bot.wait_for( + "reaction_add", + timeout=60.0, + check=confirm_check + ) + except asyncio.TimeoutError: + self.over = True + self.canceled = True + await confirm_message.delete() + return False, "Running out of time... Cancelled game." + + await confirm_message.delete() + if reaction.emoji == Emojis.confirmation: + return True, None + else: + self.over = True + self.canceled = True + return False, "User declined" + + async def add_reactions(self, msg: discord.Message) -> None: + """Add number emojis to message.""" + for nr in Emojis.number_emojis.values(): + await msg.add_reaction(nr) + + def format_board(self) -> str: + """Get formatted tic-tac-toe board for message.""" + board = list(self.board.values()) + return "\n".join( + (f"{board[line]} {board[line + 1]} {board[line + 2]}" for line in range(0, len(board), 3)) + ) + + async def play(self) -> None: + """Start and handle game.""" + await self.ctx.send("It's time for the game! Let's begin.") + board = await self.ctx.send( + embed=discord.Embed(description=self.format_board()) + ) + await self.add_reactions(board) + + for _ in range(9): + if isinstance(self.current, Player): + announce = await self.ctx.send( + f"{self.current.user.mention}, it's your turn! " + "React with an emoji to take your go." + ) + timeout, pos = await self.current.get_move(self.board, board) + if isinstance(self.current, Player): + await announce.delete() + if timeout: + await self.ctx.send(f"{self.current.user.mention} ran out of time. Canceling game.") + self.over = True + self.canceled = True + return + self.board[pos] = self.current.symbol + await board.edit( + embed=discord.Embed(description=self.format_board()) + ) + await board.clear_reaction(Emojis.number_emojis[pos]) + if check_win(self.board): + self.winner = self.current + self.loser = self.next + await self.ctx.send( + f":tada: {self.current} won this game! :tada:" + ) + await board.clear_reactions() + break + self.current, self.next = self.next, self.current + if not self.winner: + self.draw = True + await self.ctx.send("It's a DRAW!") + self.over = True + + +def is_channel_free() -> Callable: + """Check is channel where command will be invoked free.""" + async def predicate(ctx: Context) -> bool: + return all(game.channel != ctx.channel for game in ctx.cog.games if not game.over) + return check(predicate) + + +def is_requester_free() -> Callable: + """Check is requester not already in any game.""" + async def predicate(ctx: Context) -> bool: + return all( + ctx.author not in (player.user for player in game.players) for game in ctx.cog.games if not game.over + ) + return check(predicate) + + +class TicTacToe(Cog): + """TicTacToe cog contains tic-tac-toe game commands.""" + + def __init__(self): + self.games: list[Game] = [] + + @guild_only() + @is_channel_free() + @is_requester_free() + @group(name="tictactoe", aliases=("ttt", "tic"), invoke_without_command=True) + async def tic_tac_toe(self, ctx: Context, opponent: Optional[discord.User]) -> None: + """Tic Tac Toe game. Play against friends or AI. Use reactions to add your mark to field.""" + if opponent == ctx.author: + await ctx.send("You can't play against yourself.") + return + if opponent is not None and not all( + opponent not in (player.user for player in g.players) for g in ctx.cog.games if not g.over + ): + await ctx.send("Opponent is already in game.") + return + if opponent is None: + game = Game( + [Player(ctx.author, ctx, Emojis.x_square), AI(Emojis.o_square)], + ctx + ) + else: + game = Game( + [Player(ctx.author, ctx, Emojis.x_square), Player(opponent, ctx, Emojis.o_square)], + ctx + ) + self.games.append(game) + if opponent is not None: + if opponent.bot: # check whether the opponent is a bot or not + await ctx.send("You can't play Tic-Tac-Toe with bots!") + return + + confirmed, msg = await game.get_confirmation() + + if not confirmed: + if msg: + await ctx.send(msg) + return + await game.play() + + @tic_tac_toe.group(name="history", aliases=("log",), invoke_without_command=True) + async def tic_tac_toe_logs(self, ctx: Context) -> None: + """Show most recent tic-tac-toe games.""" + if len(self.games) < 1: + await ctx.send("No recent games.") + return + log_games = [] + for i, game in enumerate(self.games): + if game.over and not game.canceled: + if game.draw: + log_games.append( + f"**#{i+1}**: {game.players[0]} vs {game.players[1]} (draw)" + ) + else: + log_games.append( + f"**#{i+1}**: {game.winner} :trophy: vs {game.loser}" + ) + await LinePaginator.paginate( + log_games, + ctx, + discord.Embed(title="Most recent Tic Tac Toe games") + ) + + @tic_tac_toe_logs.command(name="show", aliases=("s",)) + async def show_tic_tac_toe_board(self, ctx: Context, game_id: int) -> None: + """View game board by ID (ID is possible to get by `.tictactoe history`).""" + if len(self.games) < game_id: + await ctx.send("Game don't exist.") + return + game = self.games[game_id - 1] + + if game.draw: + description = f"{game.players[0]} vs {game.players[1]} (draw)\n\n{game.format_board()}" + else: + description = f"{game.winner} :trophy: vs {game.loser}\n\n{game.format_board()}" + + embed = discord.Embed( + title=f"Match #{game_id} Game Board", + description=description, + ) + await ctx.send(embed=embed) + + +def setup(bot: Bot) -> None: + """Load the TicTacToe cog.""" + bot.add_cog(TicTacToe()) diff --git a/bot/exts/fun/trivia_quiz.py b/bot/exts/fun/trivia_quiz.py new file mode 100644 index 00000000..cf9e6cd3 --- /dev/null +++ b/bot/exts/fun/trivia_quiz.py @@ -0,0 +1,593 @@ +import asyncio +import json +import logging +import operator +import random +from dataclasses import dataclass +from pathlib import Path +from typing import Callable, Optional + +import discord +from discord.ext import commands +from rapidfuzz import fuzz + +from bot.bot import Bot +from bot.constants import Colours, NEGATIVE_REPLIES, Roles + +logger = logging.getLogger(__name__) + +DEFAULT_QUESTION_LIMIT = 6 +STANDARD_VARIATION_TOLERANCE = 88 +DYNAMICALLY_GEN_VARIATION_TOLERANCE = 97 + +WRONG_ANS_RESPONSE = [ + "No one answered correctly!", + "Better luck next time...", +] + +N_PREFIX_STARTS_AT = 5 +N_PREFIXES = [ + "penta", "hexa", "hepta", "octa", "nona", + "deca", "hendeca", "dodeca", "trideca", "tetradeca", +] + +PLANETS = [ + ("1st", "Mercury"), + ("2nd", "Venus"), + ("3rd", "Earth"), + ("4th", "Mars"), + ("5th", "Jupiter"), + ("6th", "Saturn"), + ("7th", "Uranus"), + ("8th", "Neptune"), +] + +TAXONOMIC_HIERARCHY = [ + "species", "genus", "family", "order", + "class", "phylum", "kingdom", "domain", +] + +UNITS_TO_BASE_UNITS = { + "hertz": ("(unit of frequency)", "s^-1"), + "newton": ("(unit of force)", "m*kg*s^-2"), + "pascal": ("(unit of pressure & stress)", "m^-1*kg*s^-2"), + "joule": ("(unit of energy & quantity of heat)", "m^2*kg*s^-2"), + "watt": ("(unit of power)", "m^2*kg*s^-3"), + "coulomb": ("(unit of electric charge & quantity of electricity)", "s*A"), + "volt": ("(unit of voltage & electromotive force)", "m^2*kg*s^-3*A^-1"), + "farad": ("(unit of capacitance)", "m^-2*kg^-1*s^4*A^2"), + "ohm": ("(unit of electric resistance)", "m^2*kg*s^-3*A^-2"), + "weber": ("(unit of magnetic flux)", "m^2*kg*s^-2*A^-1"), + "tesla": ("(unit of magnetic flux density)", "kg*s^-2*A^-1"), +} + + +@dataclass(frozen=True) +class QuizEntry: + """Dataclass for a quiz entry (a question and a string containing answers separated by commas).""" + + question: str + answer: str + + +def linear_system(q_format: str, a_format: str) -> QuizEntry: + """Generate a system of linear equations with two unknowns.""" + x, y = random.randint(2, 5), random.randint(2, 5) + answer = a_format.format(x, y) + + coeffs = random.sample(range(1, 6), 4) + + question = q_format.format( + coeffs[0], + coeffs[1], + coeffs[0] * x + coeffs[1] * y, + coeffs[2], + coeffs[3], + coeffs[2] * x + coeffs[3] * y, + ) + + return QuizEntry(question, answer) + + +def mod_arith(q_format: str, a_format: str) -> QuizEntry: + """Generate a basic modular arithmetic question.""" + quotient, m, b = random.randint(30, 40), random.randint(10, 20), random.randint(200, 350) + ans = random.randint(0, 9) # max remainder is 9, since the minimum modulus is 10 + a = quotient * m + ans - b + + question = q_format.format(a, b, m) + answer = a_format.format(ans) + + return QuizEntry(question, answer) + + +def ngonal_prism(q_format: str, a_format: str) -> QuizEntry: + """Generate a question regarding vertices on n-gonal prisms.""" + n = random.randint(0, len(N_PREFIXES) - 1) + + question = q_format.format(N_PREFIXES[n]) + answer = a_format.format((n + N_PREFIX_STARTS_AT) * 2) + + return QuizEntry(question, answer) + + +def imag_sqrt(q_format: str, a_format: str) -> QuizEntry: + """Generate a negative square root question.""" + ans_coeff = random.randint(3, 10) + + question = q_format.format(ans_coeff ** 2) + answer = a_format.format(ans_coeff) + + return QuizEntry(question, answer) + + +def binary_calc(q_format: str, a_format: str) -> QuizEntry: + """Generate a binary calculation question.""" + a = random.randint(15, 20) + b = random.randint(10, a) + oper = random.choice( + ( + ("+", operator.add), + ("-", operator.sub), + ("*", operator.mul), + ) + ) + + # if the operator is multiplication, lower the values of the two operands to make it easier + if oper[0] == "*": + a -= 5 + b -= 5 + + question = q_format.format(a, oper[0], b) + answer = a_format.format(oper[1](a, b)) + + return QuizEntry(question, answer) + + +def solar_system(q_format: str, a_format: str) -> QuizEntry: + """Generate a question on the planets of the Solar System.""" + planet = random.choice(PLANETS) + + question = q_format.format(planet[0]) + answer = a_format.format(planet[1]) + + return QuizEntry(question, answer) + + +def taxonomic_rank(q_format: str, a_format: str) -> QuizEntry: + """Generate a question on taxonomic classification.""" + level = random.randint(0, len(TAXONOMIC_HIERARCHY) - 2) + + question = q_format.format(TAXONOMIC_HIERARCHY[level]) + answer = a_format.format(TAXONOMIC_HIERARCHY[level + 1]) + + return QuizEntry(question, answer) + + +def base_units_convert(q_format: str, a_format: str) -> QuizEntry: + """Generate a SI base units conversion question.""" + unit = random.choice(list(UNITS_TO_BASE_UNITS)) + + question = q_format.format( + unit + " " + UNITS_TO_BASE_UNITS[unit][0] + ) + answer = a_format.format( + UNITS_TO_BASE_UNITS[unit][1] + ) + + return QuizEntry(question, answer) + + +DYNAMIC_QUESTIONS_FORMAT_FUNCS = { + 201: linear_system, + 202: mod_arith, + 203: ngonal_prism, + 204: imag_sqrt, + 205: binary_calc, + 301: solar_system, + 302: taxonomic_rank, + 303: base_units_convert, +} + + +class TriviaQuiz(commands.Cog): + """A cog for all quiz commands.""" + + def __init__(self, bot: Bot): + self.bot = bot + + self.game_status = {} # A variable to store the game status: either running or not running. + self.game_owners = {} # A variable to store the person's ID who started the quiz game in a channel. + + self.questions = self.load_questions() + self.question_limit = 0 + + self.player_scores = {} # A variable to store all player's scores for a bot session. + self.game_player_scores = {} # A variable to store temporary game player's scores. + + self.categories = { + "general": "Test your general knowledge.", + "retro": "Questions related to retro gaming.", + "math": "General questions about mathematics ranging from grade 8 to grade 12.", + "science": "Put your understanding of science to the test!", + "cs": "A large variety of computer science questions.", + "python": "Trivia on our amazing language, Python!", + } + + @staticmethod + def load_questions() -> dict: + """Load the questions from the JSON file.""" + p = Path("bot", "resources", "fun", "trivia_quiz.json") + + return json.loads(p.read_text(encoding="utf-8")) + + @commands.group(name="quiz", aliases=["trivia"], invoke_without_command=True) + async def quiz_game(self, ctx: commands.Context, category: Optional[str], questions: Optional[int]) -> None: + """ + Start a quiz! + + Questions for the quiz can be selected from the following categories: + - general: Test your general knowledge. + - retro: Questions related to retro gaming. + - math: General questions about mathematics ranging from grade 8 to grade 12. + - science: Put your understanding of science to the test! + - cs: A large variety of computer science questions. + - python: Trivia on our amazing language, Python! + + (More to come!) + """ + if ctx.channel.id not in self.game_status: + self.game_status[ctx.channel.id] = False + + if ctx.channel.id not in self.game_player_scores: + self.game_player_scores[ctx.channel.id] = {} + + # Stop game if running. + if self.game_status[ctx.channel.id]: + await ctx.send( + "Game is already running... " + f"do `{self.bot.command_prefix}quiz stop`" + ) + return + + # Send embed showing available categories if inputted category is invalid. + if category is None: + category = random.choice(list(self.categories)) + + category = category.lower() + if category not in self.categories: + embed = self.category_embed() + await ctx.send(embed=embed) + return + + topic = self.questions[category] + topic_length = len(topic) + + if questions is None: + self.question_limit = DEFAULT_QUESTION_LIMIT + else: + if questions > topic_length: + await ctx.send( + embed=self.make_error_embed( + f"This category only has {topic_length} questions. " + "Please input a lower value!" + ) + ) + return + + elif questions < 1: + await ctx.send( + embed=self.make_error_embed( + "You must choose to complete at least one question. " + f"(or enter nothing for the default value of {DEFAULT_QUESTION_LIMIT + 1} questions)" + ) + ) + return + + else: + self.question_limit = questions - 1 + + # Start game if not running. + if not self.game_status[ctx.channel.id]: + self.game_owners[ctx.channel.id] = ctx.author + self.game_status[ctx.channel.id] = True + start_embed = self.make_start_embed(category) + + await ctx.send(embed=start_embed) # send an embed with the rules + await asyncio.sleep(5) + + done_question = [] + hint_no = 0 + answers = None + + while self.game_status[ctx.channel.id]: + # Exit quiz if number of questions for a round are already sent. + if len(done_question) > self.question_limit and hint_no == 0: + await ctx.send("The round has ended.") + await self.declare_winner(ctx.channel, self.game_player_scores[ctx.channel.id]) + + self.game_status[ctx.channel.id] = False + del self.game_owners[ctx.channel.id] + self.game_player_scores[ctx.channel.id] = {} + + break + + # If no hint has been sent or any time alert. Basically if hint_no = 0 means it is a new question. + if hint_no == 0: + # Select a random question which has not been used yet. + while True: + question_dict = random.choice(topic) + if question_dict["id"] not in done_question: + done_question.append(question_dict["id"]) + break + + if "dynamic_id" not in question_dict: + question = question_dict["question"] + answers = question_dict["answer"].split(", ") + + var_tol = STANDARD_VARIATION_TOLERANCE + else: + format_func = DYNAMIC_QUESTIONS_FORMAT_FUNCS[question_dict["dynamic_id"]] + + quiz_entry = format_func( + question_dict["question"], + question_dict["answer"], + ) + + question, answers = quiz_entry.question, quiz_entry.answer + answers = [answers] + + var_tol = DYNAMICALLY_GEN_VARIATION_TOLERANCE + + embed = discord.Embed( + colour=Colours.gold, + title=f"Question #{len(done_question)}", + description=question, + ) + + if img_url := question_dict.get("img_url"): + embed.set_image(url=img_url) + + await ctx.send(embed=embed) + + def check_func(variation_tolerance: int) -> Callable[[discord.Message], bool]: + def contains_correct_answer(m: discord.Message) -> bool: + return m.channel == ctx.channel and any( + fuzz.ratio(answer.lower(), m.content.lower()) > variation_tolerance + for answer in answers + ) + + return contains_correct_answer + + try: + msg = await self.bot.wait_for("message", check=check_func(var_tol), timeout=10) + except asyncio.TimeoutError: + # In case of TimeoutError and the game has been stopped, then do nothing. + if not self.game_status[ctx.channel.id]: + break + + if hint_no < 2: + hint_no += 1 + + if "hints" in question_dict: + hints = question_dict["hints"] + + await ctx.send(f"**Hint #{hint_no}\n**{hints[hint_no - 1]}") + else: + await ctx.send(f"{30 - hint_no * 10}s left!") + + # Once hint or time alerts has been sent 2 times, the hint_no value will be 3 + # If hint_no > 2, then it means that all hints/time alerts have been sent. + # Also means that the answer is not yet given and the bot sends the answer and the next question. + else: + if self.game_status[ctx.channel.id] is False: + break + + response = random.choice(WRONG_ANS_RESPONSE) + await ctx.send(response) + + await self.send_answer( + ctx.channel, + answers, + False, + question_dict, + self.question_limit - len(done_question) + 1, + ) + await asyncio.sleep(1) + + hint_no = 0 # Reset the hint counter so that on the next round, it's in the initial state + + await self.send_score(ctx.channel, self.game_player_scores[ctx.channel.id]) + await asyncio.sleep(2) + else: + if self.game_status[ctx.channel.id] is False: + break + + points = 100 - 25 * hint_no + if msg.author in self.game_player_scores[ctx.channel.id]: + self.game_player_scores[ctx.channel.id][msg.author] += points + else: + self.game_player_scores[ctx.channel.id][msg.author] = points + + # Also updating the overall scoreboard. + if msg.author in self.player_scores: + self.player_scores[msg.author] += points + else: + self.player_scores[msg.author] = points + + hint_no = 0 + + await ctx.send(f"{msg.author.mention} got the correct answer :tada: {points} points!") + + await self.send_answer( + ctx.channel, + answers, + True, + question_dict, + self.question_limit - len(done_question) + 1, + ) + await self.send_score(ctx.channel, self.game_player_scores[ctx.channel.id]) + + await asyncio.sleep(2) + + def make_start_embed(self, category: str) -> discord.Embed: + """Generate a starting/introduction embed for the quiz.""" + start_embed = discord.Embed( + colour=Colours.blue, + title="A quiz game is starting!", + description=( + f"This game consists of {self.question_limit + 1} questions.\n\n" + "**Rules: **\n" + "1. Only enclose your answer in backticks when the question tells you to.\n" + "2. If the question specifies an answer format, follow it or else it won't be accepted.\n" + "3. You have 30s per question. Points for each question reduces by 25 after 10s or after a hint.\n" + "4. No cheating and have fun!\n\n" + f"**Category**: {category}" + ), + ) + + return start_embed + + @staticmethod + def make_error_embed(desc: str) -> discord.Embed: + """Generate an error embed with the given description.""" + error_embed = discord.Embed( + colour=Colours.soft_red, + title=random.choice(NEGATIVE_REPLIES), + description=desc, + ) + + return error_embed + + @quiz_game.command(name="stop") + async def stop_quiz(self, ctx: commands.Context) -> None: + """ + Stop a quiz game if its running in the channel. + + Note: Only mods or the owner of the quiz can stop it. + """ + try: + if self.game_status[ctx.channel.id]: + # Check if the author is the game starter or a moderator. + if ctx.author == self.game_owners[ctx.channel.id] or any( + Roles.moderator == role.id for role in ctx.author.roles + ): + self.game_status[ctx.channel.id] = False + del self.game_owners[ctx.channel.id] + self.game_player_scores[ctx.channel.id] = {} + + await ctx.send("Quiz stopped.") + await self.declare_winner(ctx.channel, self.game_player_scores[ctx.channel.id]) + + else: + await ctx.send(f"{ctx.author.mention}, you are not authorised to stop this game :ghost:!") + else: + await ctx.send("No quiz running.") + except KeyError: + await ctx.send("No quiz running.") + + @quiz_game.command(name="leaderboard") + async def leaderboard(self, ctx: commands.Context) -> None: + """View everyone's score for this bot session.""" + await self.send_score(ctx.channel, self.player_scores) + + @staticmethod + async def send_score(channel: discord.TextChannel, player_data: dict) -> None: + """Send the current scores of players in the game channel.""" + if len(player_data) == 0: + await channel.send("No one has made it onto the leaderboard yet.") + return + + embed = discord.Embed( + colour=Colours.blue, + title="Score Board", + description="", + ) + + sorted_dict = sorted(player_data.items(), key=operator.itemgetter(1), reverse=True) + for item in sorted_dict: + embed.description += f"{item[0]}: {item[1]}\n" + + await channel.send(embed=embed) + + @staticmethod + async def declare_winner(channel: discord.TextChannel, player_data: dict) -> None: + """Announce the winner of the quiz in the game channel.""" + if player_data: + highest_points = max(list(player_data.values())) + no_of_winners = list(player_data.values()).count(highest_points) + + # Check if more than 1 player has highest points. + if no_of_winners > 1: + winners = [] + points_copy = list(player_data.values()).copy() + + for _ in range(no_of_winners): + index = points_copy.index(highest_points) + winners.append(list(player_data.keys())[index]) + points_copy[index] = 0 + + winners_mention = " ".join(winner.mention for winner in winners) + else: + author_index = list(player_data.values()).index(highest_points) + winner = list(player_data.keys())[author_index] + winners_mention = winner.mention + + await channel.send( + f"Congratulations {winners_mention} :tada: " + f"You have won this quiz game with a grand total of {highest_points} points!" + ) + + def category_embed(self) -> discord.Embed: + """Build an embed showing all available trivia categories.""" + embed = discord.Embed( + colour=Colours.blue, + title="The available question categories are:", + description="", + ) + + embed.set_footer(text="If a category is not chosen, a random one will be selected.") + + for cat, description in self.categories.items(): + embed.description += ( + f"**- {cat.capitalize()}**\n" + f"{description.capitalize()}\n" + ) + + return embed + + @staticmethod + async def send_answer( + channel: discord.TextChannel, + answers: list[str], + answer_is_correct: bool, + question_dict: dict, + q_left: int, + ) -> None: + """Send the correct answer of a question to the game channel.""" + info = question_dict.get("info") + + plurality = " is" if len(answers) == 1 else "s are" + + embed = discord.Embed( + color=Colours.bright_green, + title=( + ("You got it! " if answer_is_correct else "") + + f"The correct answer{plurality} **`{', '.join(answers)}`**\n" + ), + description="", + ) + + if info is not None: + embed.description += f"**Information**\n{info}\n\n" + + embed.description += ( + ("Let's move to the next question." if q_left > 0 else "") + + f"\nRemaining questions: {q_left}" + ) + await channel.send(embed=embed) + + +def setup(bot: Bot) -> None: + """Load the TriviaQuiz cog.""" + bot.add_cog(TriviaQuiz(bot)) diff --git a/bot/exts/fun/wonder_twins.py b/bot/exts/fun/wonder_twins.py new file mode 100644 index 00000000..79d6b6d9 --- /dev/null +++ b/bot/exts/fun/wonder_twins.py @@ -0,0 +1,49 @@ +import random +from pathlib import Path + +import yaml +from discord.ext.commands import Cog, Context, command + +from bot.bot import Bot + + +class WonderTwins(Cog): + """Cog for a Wonder Twins inspired command.""" + + def __init__(self): + with open(Path.cwd() / "bot" / "resources" / "fun" / "wonder_twins.yaml", "r", encoding="utf-8") as f: + info = yaml.load(f, Loader=yaml.FullLoader) + self.water_types = info["water_types"] + self.objects = info["objects"] + self.adjectives = info["adjectives"] + + @staticmethod + def append_onto(phrase: str, insert_word: str) -> str: + """Appends one word onto the end of another phrase in order to format with the proper determiner.""" + if insert_word.endswith("s"): + phrase = phrase.split() + del phrase[0] + phrase = " ".join(phrase) + + insert_word = insert_word.split()[-1] + return " ".join([phrase, insert_word]) + + def format_phrase(self) -> str: + """Creates a transformation phrase from available words.""" + adjective = random.choice((None, random.choice(self.adjectives))) + object_name = random.choice(self.objects) + water_type = random.choice(self.water_types) + + if adjective: + object_name = self.append_onto(adjective, object_name) + return f"{object_name} of {water_type}" + + @command(name="formof", aliases=("wondertwins", "wondertwin", "fo")) + async def form_of(self, ctx: Context) -> None: + """Command to send a Wonder Twins inspired phrase to the user invoking the command.""" + await ctx.send(f"Form of {self.format_phrase()}!") + + +def setup(bot: Bot) -> None: + """Load the WonderTwins cog.""" + bot.add_cog(WonderTwins()) diff --git a/bot/exts/fun/xkcd.py b/bot/exts/fun/xkcd.py new file mode 100644 index 00000000..b56c53d9 --- /dev/null +++ b/bot/exts/fun/xkcd.py @@ -0,0 +1,91 @@ +import logging +import re +from random import randint +from typing import Optional, Union + +from discord import Embed +from discord.ext import tasks +from discord.ext.commands import Cog, Context, command + +from bot.bot import Bot +from bot.constants import Colours + +log = logging.getLogger(__name__) + +COMIC_FORMAT = re.compile(r"latest|[0-9]+") +BASE_URL = "https://xkcd.com" + + +class XKCD(Cog): + """Retrieving XKCD comics.""" + + def __init__(self, bot: Bot): + self.bot = bot + self.latest_comic_info: dict[str, Union[str, int]] = {} + self.get_latest_comic_info.start() + + def cog_unload(self) -> None: + """Cancels refreshing of the task for refreshing the most recent comic info.""" + self.get_latest_comic_info.cancel() + + @tasks.loop(minutes=30) + async def get_latest_comic_info(self) -> None: + """Refreshes latest comic's information ever 30 minutes. Also used for finding a random comic.""" + async with self.bot.http_session.get(f"{BASE_URL}/info.0.json") as resp: + if resp.status == 200: + self.latest_comic_info = await resp.json() + else: + log.debug(f"Failed to get latest XKCD comic information. Status code {resp.status}") + + @command(name="xkcd") + async def fetch_xkcd_comics(self, ctx: Context, comic: Optional[str]) -> None: + """ + Getting an xkcd comic's information along with the image. + + To get a random comic, don't type any number as an argument. To get the latest, type 'latest'. + """ + embed = Embed(title=f"XKCD comic '{comic}'") + + embed.colour = Colours.soft_red + + if comic and (comic := re.match(COMIC_FORMAT, comic)) is None: + embed.description = "Comic parameter should either be an integer or 'latest'." + await ctx.send(embed=embed) + return + + comic = randint(1, self.latest_comic_info["num"]) if comic is None else comic.group(0) + + if comic == "latest": + info = self.latest_comic_info + else: + async with self.bot.http_session.get(f"{BASE_URL}/{comic}/info.0.json") as resp: + if resp.status == 200: + info = await resp.json() + else: + embed.title = f"XKCD comic #{comic}" + embed.description = f"{resp.status}: Could not retrieve xkcd comic #{comic}." + log.debug(f"Retrieving xkcd comic #{comic} failed with status code {resp.status}.") + await ctx.send(embed=embed) + return + + embed.title = f"XKCD comic #{info['num']}" + embed.description = info["alt"] + embed.url = f"{BASE_URL}/{info['num']}" + + if info["img"][-3:] in ("jpg", "png", "gif"): + embed.set_image(url=info["img"]) + date = f"{info['year']}/{info['month']}/{info['day']}" + embed.set_footer(text=f"{date} - #{info['num']}, \'{info['safe_title']}\'") + embed.colour = Colours.soft_green + else: + embed.description = ( + "The selected comic is interactive, and cannot be displayed within an embed.\n" + f"Comic can be viewed [here](https://xkcd.com/{info['num']})." + ) + + await ctx.send(embed=embed) + + +def setup(bot: Bot) -> None: + """Load the XKCD cog.""" + bot.add_cog(XKCD(bot)) |