aboutsummaryrefslogtreecommitdiffstats
path: root/bot/exts/fun
diff options
context:
space:
mode:
Diffstat (limited to 'bot/exts/fun')
-rw-r--r--bot/exts/fun/__init__.py0
-rw-r--r--bot/exts/fun/battleship.py448
-rw-r--r--bot/exts/fun/catify.py86
-rw-r--r--bot/exts/fun/coinflip.py53
-rw-r--r--bot/exts/fun/connect_four.py452
-rw-r--r--bot/exts/fun/duck_game.py336
-rw-r--r--bot/exts/fun/fun.py250
-rw-r--r--bot/exts/fun/game.py485
-rw-r--r--bot/exts/fun/magic_8ball.py30
-rw-r--r--bot/exts/fun/minesweeper.py270
-rw-r--r--bot/exts/fun/movie.py205
-rw-r--r--bot/exts/fun/recommend_game.py51
-rw-r--r--bot/exts/fun/rps.py57
-rw-r--r--bot/exts/fun/snakes/__init__.py11
-rw-r--r--bot/exts/fun/snakes/_converter.py82
-rw-r--r--bot/exts/fun/snakes/_snakes_cog.py1151
-rw-r--r--bot/exts/fun/snakes/_utils.py721
-rw-r--r--bot/exts/fun/space.py236
-rw-r--r--bot/exts/fun/speedrun.py26
-rw-r--r--bot/exts/fun/status_codes.py87
-rw-r--r--bot/exts/fun/tic_tac_toe.py335
-rw-r--r--bot/exts/fun/trivia_quiz.py593
-rw-r--r--bot/exts/fun/wonder_twins.py49
-rw-r--r--bot/exts/fun/xkcd.py91
24 files changed, 6105 insertions, 0 deletions
diff --git a/bot/exts/fun/__init__.py b/bot/exts/fun/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/bot/exts/fun/__init__.py
diff --git a/bot/exts/fun/battleship.py b/bot/exts/fun/battleship.py
new file mode 100644
index 00000000..f4351954
--- /dev/null
+++ b/bot/exts/fun/battleship.py
@@ -0,0 +1,448 @@
+import asyncio
+import logging
+import random
+import re
+from dataclasses import dataclass
+from functools import partial
+from typing import Optional
+
+import discord
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.constants import Colours
+
+log = logging.getLogger(__name__)
+
+
+@dataclass
+class Square:
+ """Each square on the battleship grid - if they contain a boat and if they've been aimed at."""
+
+ boat: Optional[str]
+ aimed: bool
+
+
+Grid = list[list[Square]]
+EmojiSet = dict[tuple[bool, bool], str]
+
+
+@dataclass
+class Player:
+ """Each player in the game - their messages for the boards and their current grid."""
+
+ user: Optional[discord.Member]
+ board: Optional[discord.Message]
+ opponent_board: discord.Message
+ grid: Grid
+
+
+# The name of the ship and its size
+SHIPS = {
+ "Carrier": 5,
+ "Battleship": 4,
+ "Cruiser": 3,
+ "Submarine": 3,
+ "Destroyer": 2,
+}
+
+
+# For these two variables, the first boolean is whether the square is a ship (True) or not (False).
+# The second boolean is whether the player has aimed for that square (True) or not (False)
+
+# This is for the player's own board which shows the location of their own ships.
+SHIP_EMOJIS = {
+ (True, True): ":fire:",
+ (True, False): ":ship:",
+ (False, True): ":anger:",
+ (False, False): ":ocean:",
+}
+
+# This is for the opposing player's board which only shows aimed locations.
+HIDDEN_EMOJIS = {
+ (True, True): ":red_circle:",
+ (True, False): ":black_circle:",
+ (False, True): ":white_circle:",
+ (False, False): ":black_circle:",
+}
+
+# For the top row of the board
+LETTERS = (
+ ":stop_button::regional_indicator_a::regional_indicator_b::regional_indicator_c::regional_indicator_d:"
+ ":regional_indicator_e::regional_indicator_f::regional_indicator_g::regional_indicator_h:"
+ ":regional_indicator_i::regional_indicator_j:"
+)
+
+# For the first column of the board
+NUMBERS = [
+ ":one:",
+ ":two:",
+ ":three:",
+ ":four:",
+ ":five:",
+ ":six:",
+ ":seven:",
+ ":eight:",
+ ":nine:",
+ ":keycap_ten:",
+]
+
+CROSS_EMOJI = "\u274e"
+HAND_RAISED_EMOJI = "\U0001f64b"
+
+
+class Game:
+ """A Battleship Game."""
+
+ def __init__(
+ self,
+ bot: Bot,
+ channel: discord.TextChannel,
+ player1: discord.Member,
+ player2: discord.Member
+ ):
+
+ self.bot = bot
+ self.public_channel = channel
+
+ self.p1 = Player(player1, None, None, self.generate_grid())
+ self.p2 = Player(player2, None, None, self.generate_grid())
+
+ self.gameover: bool = False
+
+ self.turn: Optional[discord.Member] = None
+ self.next: Optional[discord.Member] = None
+
+ self.match: Optional[re.Match] = None
+ self.surrender: bool = False
+
+ self.setup_grids()
+
+ @staticmethod
+ def generate_grid() -> Grid:
+ """Generates a grid by instantiating the Squares."""
+ return [[Square(None, False) for _ in range(10)] for _ in range(10)]
+
+ @staticmethod
+ def format_grid(player: Player, emojiset: EmojiSet) -> str:
+ """
+ Gets and formats the grid as a list into a string to be output to the DM.
+
+ Also adds the Letter and Number indexes.
+ """
+ grid = [
+ [emojiset[bool(square.boat), square.aimed] for square in row]
+ for row in player.grid
+ ]
+
+ rows = ["".join([number] + row) for number, row in zip(NUMBERS, grid)]
+ return "\n".join([LETTERS] + rows)
+
+ @staticmethod
+ def get_square(grid: Grid, square: str) -> Square:
+ """Grabs a square from a grid with an inputted key."""
+ index = ord(square[0].upper()) - ord("A")
+ number = int(square[1:])
+
+ return grid[number-1][index] # -1 since lists are indexed from 0
+
+ async def game_over(
+ self,
+ *,
+ winner: discord.Member,
+ loser: discord.Member
+ ) -> None:
+ """Removes games from list of current games and announces to public chat."""
+ await self.public_channel.send(f"Game Over! {winner.mention} won against {loser.mention}")
+
+ for player in (self.p1, self.p2):
+ grid = self.format_grid(player, SHIP_EMOJIS)
+ await self.public_channel.send(f"{player.user}'s Board:\n{grid}")
+
+ @staticmethod
+ def check_sink(grid: Grid, boat: str) -> bool:
+ """Checks if all squares containing a given boat have sunk."""
+ return all(square.aimed for row in grid for square in row if square.boat == boat)
+
+ @staticmethod
+ def check_gameover(grid: Grid) -> bool:
+ """Checks if all boats have been sunk."""
+ return all(square.aimed for row in grid for square in row if square.boat)
+
+ def setup_grids(self) -> None:
+ """Places the boats on the grids to initialise the game."""
+ for player in (self.p1, self.p2):
+ for name, size in SHIPS.items():
+ while True: # Repeats if about to overwrite another boat
+ ship_collision = False
+ coords = []
+
+ coord1 = random.randint(0, 9)
+ coord2 = random.randint(0, 10 - size)
+
+ if random.choice((True, False)): # Vertical or Horizontal
+ x, y = coord1, coord2
+ xincr, yincr = 0, 1
+ else:
+ x, y = coord2, coord1
+ xincr, yincr = 1, 0
+
+ for i in range(size):
+ new_x = x + (xincr * i)
+ new_y = y + (yincr * i)
+ if player.grid[new_x][new_y].boat: # Check if there's already a boat
+ ship_collision = True
+ break
+ coords.append((new_x, new_y))
+ if not ship_collision: # If not overwriting any other boat spaces, break loop
+ break
+
+ for x, y in coords:
+ player.grid[x][y].boat = name
+
+ async def print_grids(self) -> None:
+ """Prints grids to the DM channels."""
+ # Convert squares into Emoji
+
+ boards = [
+ self.format_grid(player, emojiset)
+ for emojiset in (HIDDEN_EMOJIS, SHIP_EMOJIS)
+ for player in (self.p1, self.p2)
+ ]
+
+ locations = (
+ (self.p2, "opponent_board"), (self.p1, "opponent_board"),
+ (self.p1, "board"), (self.p2, "board")
+ )
+
+ for board, location in zip(boards, locations):
+ player, attr = location
+ if getattr(player, attr):
+ await getattr(player, attr).edit(content=board)
+ else:
+ setattr(player, attr, await player.user.send(board))
+
+ def predicate(self, message: discord.Message) -> bool:
+ """Predicate checking the message typed for each turn."""
+ if message.author == self.turn.user and message.channel == self.turn.user.dm_channel:
+ if message.content.lower() == "surrender":
+ self.surrender = True
+ return True
+ self.match = re.fullmatch("([A-J]|[a-j]) ?((10)|[1-9])", message.content.strip())
+ if not self.match:
+ self.bot.loop.create_task(message.add_reaction(CROSS_EMOJI))
+ return bool(self.match)
+
+ async def take_turn(self) -> Optional[Square]:
+ """Lets the player who's turn it is choose a square."""
+ square = None
+ turn_message = await self.turn.user.send(
+ "It's your turn! Type the square you want to fire at. Format it like this: A1\n"
+ "Type `surrender` to give up."
+ )
+ await self.next.user.send("Their turn", delete_after=3.0)
+ while True:
+ try:
+ await self.bot.wait_for("message", check=self.predicate, timeout=60.0)
+ except asyncio.TimeoutError:
+ await self.turn.user.send("You took too long. Game over!")
+ await self.next.user.send(f"{self.turn.user} took too long. Game over!")
+ await self.public_channel.send(
+ f"Game over! {self.turn.user.mention} timed out so {self.next.user.mention} wins!"
+ )
+ self.gameover = True
+ break
+ else:
+ if self.surrender:
+ await self.next.user.send(f"{self.turn.user} surrendered. Game over!")
+ await self.public_channel.send(
+ f"Game over! {self.turn.user.mention} surrendered to {self.next.user.mention}!"
+ )
+ self.gameover = True
+ break
+ square = self.get_square(self.next.grid, self.match.string)
+ if square.aimed:
+ await self.turn.user.send("You've already aimed at this square!", delete_after=3.0)
+ else:
+ break
+ await turn_message.delete()
+ return square
+
+ async def hit(self, square: Square, alert_messages: list[discord.Message]) -> None:
+ """Occurs when a player successfully aims for a ship."""
+ await self.turn.user.send("Hit!", delete_after=3.0)
+ alert_messages.append(await self.next.user.send("Hit!"))
+ if self.check_sink(self.next.grid, square.boat):
+ await self.turn.user.send(f"You've sunk their {square.boat} ship!", delete_after=3.0)
+ alert_messages.append(await self.next.user.send(f"Oh no! Your {square.boat} ship sunk!"))
+ if self.check_gameover(self.next.grid):
+ await self.turn.user.send("You win!")
+ await self.next.user.send("You lose!")
+ self.gameover = True
+ await self.game_over(winner=self.turn.user, loser=self.next.user)
+
+ async def start_game(self) -> None:
+ """Begins the game."""
+ await self.p1.user.send(f"You're playing battleship with {self.p2.user}.")
+ await self.p2.user.send(f"You're playing battleship with {self.p1.user}.")
+
+ alert_messages = []
+
+ self.turn = self.p1
+ self.next = self.p2
+
+ while True:
+ await self.print_grids()
+
+ if self.gameover:
+ return
+
+ square = await self.take_turn()
+ if not square:
+ return
+ square.aimed = True
+
+ for message in alert_messages:
+ await message.delete()
+
+ alert_messages = []
+ alert_messages.append(await self.next.user.send(f"{self.turn.user} aimed at {self.match.string}!"))
+
+ if square.boat:
+ await self.hit(square, alert_messages)
+ if self.gameover:
+ return
+ else:
+ await self.turn.user.send("Miss!", delete_after=3.0)
+ alert_messages.append(await self.next.user.send("Miss!"))
+
+ self.turn, self.next = self.next, self.turn
+
+
+class Battleship(commands.Cog):
+ """Play the classic game Battleship!"""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+ self.games: list[Game] = []
+ self.waiting: list[discord.Member] = []
+
+ def predicate(
+ self,
+ ctx: commands.Context,
+ announcement: discord.Message,
+ reaction: discord.Reaction,
+ user: discord.Member
+ ) -> bool:
+ """Predicate checking the criteria for the announcement message."""
+ if self.already_playing(ctx.author): # If they've joined a game since requesting a player 2
+ return True # Is dealt with later on
+ if (
+ user.id not in (ctx.me.id, ctx.author.id)
+ and str(reaction.emoji) == HAND_RAISED_EMOJI
+ and reaction.message.id == announcement.id
+ ):
+ if self.already_playing(user):
+ self.bot.loop.create_task(ctx.send(f"{user.mention} You're already playing a game!"))
+ self.bot.loop.create_task(announcement.remove_reaction(reaction, user))
+ return False
+
+ if user in self.waiting:
+ self.bot.loop.create_task(ctx.send(
+ f"{user.mention} Please cancel your game first before joining another one."
+ ))
+ self.bot.loop.create_task(announcement.remove_reaction(reaction, user))
+ return False
+
+ return True
+
+ if (
+ user.id == ctx.author.id
+ and str(reaction.emoji) == CROSS_EMOJI
+ and reaction.message.id == announcement.id
+ ):
+ return True
+ return False
+
+ def already_playing(self, player: discord.Member) -> bool:
+ """Check if someone is already in a game."""
+ return any(player in (game.p1.user, game.p2.user) for game in self.games)
+
+ @commands.group(invoke_without_command=True)
+ @commands.guild_only()
+ async def battleship(self, ctx: commands.Context) -> None:
+ """
+ Play a game of Battleship with someone else!
+
+ This will set up a message waiting for someone else to react and play along.
+ The game takes place entirely in DMs.
+ Make sure you have your DMs open so that the bot can message you.
+ """
+ if self.already_playing(ctx.author):
+ await ctx.send("You're already playing a game!")
+ return
+
+ if ctx.author in self.waiting:
+ await ctx.send("You've already sent out a request for a player 2.")
+ return
+
+ announcement = await ctx.send(
+ "**Battleship**: A new game is about to start!\n"
+ f"Press {HAND_RAISED_EMOJI} to play against {ctx.author.mention}!\n"
+ f"(Cancel the game with {CROSS_EMOJI}.)"
+ )
+ self.waiting.append(ctx.author)
+ await announcement.add_reaction(HAND_RAISED_EMOJI)
+ await announcement.add_reaction(CROSS_EMOJI)
+
+ try:
+ reaction, user = await self.bot.wait_for(
+ "reaction_add",
+ check=partial(self.predicate, ctx, announcement),
+ timeout=60.0
+ )
+ except asyncio.TimeoutError:
+ self.waiting.remove(ctx.author)
+ await announcement.delete()
+ await ctx.send(f"{ctx.author.mention} Seems like there's no one here to play...")
+ return
+
+ if str(reaction.emoji) == CROSS_EMOJI:
+ self.waiting.remove(ctx.author)
+ await announcement.delete()
+ await ctx.send(f"{ctx.author.mention} Game cancelled.")
+ return
+
+ await announcement.delete()
+ self.waiting.remove(ctx.author)
+ if self.already_playing(ctx.author):
+ return
+ game = Game(self.bot, ctx.channel, ctx.author, user)
+ self.games.append(game)
+ try:
+ await game.start_game()
+ self.games.remove(game)
+ except discord.Forbidden:
+ await ctx.send(
+ f"{ctx.author.mention} {user.mention} "
+ "Game failed. This is likely due to you not having your DMs open. Check and try again."
+ )
+ self.games.remove(game)
+ except Exception:
+ # End the game in the event of an unforseen error so the players aren't stuck in a game
+ await ctx.send(f"{ctx.author.mention} {user.mention} An error occurred. Game failed.")
+ self.games.remove(game)
+ raise
+
+ @battleship.command(name="ships", aliases=("boats",))
+ async def battleship_ships(self, ctx: commands.Context) -> None:
+ """Lists the ships that are found on the battleship grid."""
+ embed = discord.Embed(colour=Colours.blue)
+ embed.add_field(name="Name", value="\n".join(SHIPS))
+ embed.add_field(name="Size", value="\n".join(str(size) for size in SHIPS.values()))
+ await ctx.send(embed=embed)
+
+
+def setup(bot: Bot) -> None:
+ """Load the Battleship Cog."""
+ bot.add_cog(Battleship(bot))
diff --git a/bot/exts/fun/catify.py b/bot/exts/fun/catify.py
new file mode 100644
index 00000000..32dfae09
--- /dev/null
+++ b/bot/exts/fun/catify.py
@@ -0,0 +1,86 @@
+import random
+from contextlib import suppress
+from typing import Optional
+
+from discord import AllowedMentions, Embed, Forbidden
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.constants import Cats, Colours, NEGATIVE_REPLIES
+from bot.utils import helpers
+
+
+class Catify(commands.Cog):
+ """Cog for the catify command."""
+
+ @commands.command(aliases=("ᓚᘏᗢify", "ᓚᘏᗢ"))
+ @commands.cooldown(1, 5, commands.BucketType.user)
+ async def catify(self, ctx: commands.Context, *, text: Optional[str]) -> None:
+ """
+ Convert the provided text into a cat themed sentence by interspercing cats throughout text.
+
+ If no text is given then the users nickname is edited.
+ """
+ if not text:
+ display_name = ctx.author.display_name
+
+ if len(display_name) > 26:
+ embed = Embed(
+ title=random.choice(NEGATIVE_REPLIES),
+ description=(
+ "Your display name is too long to be catified! "
+ "Please change it to be under 26 characters."
+ ),
+ color=Colours.soft_red
+ )
+ await ctx.send(embed=embed)
+ return
+
+ else:
+ display_name += f" | {random.choice(Cats.cats)}"
+
+ await ctx.send(f"Your catified nickname is: `{display_name}`", allowed_mentions=AllowedMentions.none())
+
+ with suppress(Forbidden):
+ await ctx.author.edit(nick=display_name)
+ else:
+ if len(text) >= 1500:
+ embed = Embed(
+ title=random.choice(NEGATIVE_REPLIES),
+ description="Submitted text was too large! Please submit something under 1500 characters.",
+ color=Colours.soft_red
+ )
+ await ctx.send(embed=embed)
+ return
+
+ string_list = text.split()
+ for index, name in enumerate(string_list):
+ name = name.lower()
+ if "cat" in name:
+ if random.randint(0, 5) == 5:
+ string_list[index] = name.replace("cat", f"**{random.choice(Cats.cats)}**")
+ else:
+ string_list[index] = name.replace("cat", random.choice(Cats.cats))
+ for element in Cats.cats:
+ if element in name:
+ string_list[index] = name.replace(element, "cat")
+
+ string_len = len(string_list) // 3 or len(string_list)
+
+ for _ in range(random.randint(1, string_len)):
+ # insert cat at random index
+ if random.randint(0, 5) == 5:
+ string_list.insert(random.randint(0, len(string_list)), f"**{random.choice(Cats.cats)}**")
+ else:
+ string_list.insert(random.randint(0, len(string_list)), random.choice(Cats.cats))
+
+ text = helpers.suppress_links(" ".join(string_list))
+ await ctx.send(
+ f">>> {text}",
+ allowed_mentions=AllowedMentions.none()
+ )
+
+
+def setup(bot: Bot) -> None:
+ """Loads the catify cog."""
+ bot.add_cog(Catify())
diff --git a/bot/exts/fun/coinflip.py b/bot/exts/fun/coinflip.py
new file mode 100644
index 00000000..804306bd
--- /dev/null
+++ b/bot/exts/fun/coinflip.py
@@ -0,0 +1,53 @@
+import random
+
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.constants import Emojis
+
+
+class CoinSide(commands.Converter):
+ """Class used to convert the `side` parameter of coinflip command."""
+
+ HEADS = ("h", "head", "heads")
+ TAILS = ("t", "tail", "tails")
+
+ async def convert(self, ctx: commands.Context, side: str) -> str:
+ """Converts the provided `side` into the corresponding string."""
+ side = side.lower()
+ if side in self.HEADS:
+ return "heads"
+
+ if side in self.TAILS:
+ return "tails"
+
+ raise commands.BadArgument(f"{side!r} is not a valid coin side.")
+
+
+class CoinFlip(commands.Cog):
+ """Cog for the CoinFlip command."""
+
+ @commands.command(name="coinflip", aliases=("flip", "coin", "cf"))
+ async def coinflip_command(self, ctx: commands.Context, side: CoinSide = None) -> None:
+ """
+ Flips a coin.
+
+ If `side` is provided will state whether you guessed the side correctly.
+ """
+ flipped_side = random.choice(["heads", "tails"])
+
+ message = f"{ctx.author.mention} flipped **{flipped_side}**. "
+ if not side:
+ await ctx.send(message)
+ return
+
+ if side == flipped_side:
+ message += f"You guessed correctly! {Emojis.lemon_hyperpleased}"
+ else:
+ message += f"You guessed incorrectly. {Emojis.lemon_pensive}"
+ await ctx.send(message)
+
+
+def setup(bot: Bot) -> None:
+ """Loads the coinflip cog."""
+ bot.add_cog(CoinFlip())
diff --git a/bot/exts/fun/connect_four.py b/bot/exts/fun/connect_four.py
new file mode 100644
index 00000000..647bb2b7
--- /dev/null
+++ b/bot/exts/fun/connect_four.py
@@ -0,0 +1,452 @@
+import asyncio
+import random
+from functools import partial
+from typing import Optional, Union
+
+import discord
+import emojis
+from discord.ext import commands
+from discord.ext.commands import guild_only
+
+from bot.bot import Bot
+from bot.constants import Emojis
+
+NUMBERS = list(Emojis.number_emojis.values())
+CROSS_EMOJI = Emojis.incident_unactioned
+
+Coordinate = Optional[tuple[int, int]]
+EMOJI_CHECK = Union[discord.Emoji, str]
+
+
+class Game:
+ """A Connect 4 Game."""
+
+ def __init__(
+ self,
+ bot: Bot,
+ channel: discord.TextChannel,
+ player1: discord.Member,
+ player2: Optional[discord.Member],
+ tokens: list[str],
+ size: int = 7
+ ):
+ self.bot = bot
+ self.channel = channel
+ self.player1 = player1
+ self.player2 = player2 or AI(self.bot, game=self)
+ self.tokens = tokens
+
+ self.grid = self.generate_board(size)
+ self.grid_size = size
+
+ self.unicode_numbers = NUMBERS[:self.grid_size]
+
+ self.message = None
+
+ self.player_active = None
+ self.player_inactive = None
+
+ @staticmethod
+ def generate_board(size: int) -> list[list[int]]:
+ """Generate the connect 4 board."""
+ return [[0 for _ in range(size)] for _ in range(size)]
+
+ async def print_grid(self) -> None:
+ """Formats and outputs the Connect Four grid to the channel."""
+ title = (
+ f"Connect 4: {self.player1.display_name}"
+ f" VS {self.bot.user.display_name if isinstance(self.player2, AI) else self.player2.display_name}"
+ )
+
+ rows = [" ".join(self.tokens[s] for s in row) for row in self.grid]
+ first_row = " ".join(x for x in NUMBERS[:self.grid_size])
+ formatted_grid = "\n".join([first_row] + rows)
+ embed = discord.Embed(title=title, description=formatted_grid)
+
+ if self.message:
+ await self.message.edit(embed=embed)
+ else:
+ self.message = await self.channel.send(content="Loading...")
+ for emoji in self.unicode_numbers:
+ await self.message.add_reaction(emoji)
+ await self.message.add_reaction(CROSS_EMOJI)
+ await self.message.edit(content=None, embed=embed)
+
+ async def game_over(self, action: str, player1: discord.user, player2: discord.user) -> None:
+ """Announces to public chat."""
+ if action == "win":
+ await self.channel.send(f"Game Over! {player1.mention} won against {player2.mention}")
+ elif action == "draw":
+ await self.channel.send(f"Game Over! {player1.mention} {player2.mention} It's A Draw :tada:")
+ elif action == "quit":
+ await self.channel.send(f"{self.player1.mention} surrendered. Game over!")
+ await self.print_grid()
+
+ async def start_game(self) -> None:
+ """Begins the game."""
+ self.player_active, self.player_inactive = self.player1, self.player2
+
+ while True:
+ await self.print_grid()
+
+ if isinstance(self.player_active, AI):
+ coords = self.player_active.play()
+ if not coords:
+ await self.game_over(
+ "draw",
+ self.bot.user if isinstance(self.player_active, AI) else self.player_active,
+ self.bot.user if isinstance(self.player_inactive, AI) else self.player_inactive,
+ )
+ else:
+ coords = await self.player_turn()
+
+ if not coords:
+ return
+
+ if self.check_win(coords, 1 if self.player_active == self.player1 else 2):
+ await self.game_over(
+ "win",
+ self.bot.user if isinstance(self.player_active, AI) else self.player_active,
+ self.bot.user if isinstance(self.player_inactive, AI) else self.player_inactive,
+ )
+ return
+
+ self.player_active, self.player_inactive = self.player_inactive, self.player_active
+
+ def predicate(self, reaction: discord.Reaction, user: discord.Member) -> bool:
+ """The predicate to check for the player's reaction."""
+ return (
+ reaction.message.id == self.message.id
+ and user.id == self.player_active.id
+ and str(reaction.emoji) in (*self.unicode_numbers, CROSS_EMOJI)
+ )
+
+ async def player_turn(self) -> Coordinate:
+ """Initiate the player's turn."""
+ message = await self.channel.send(
+ f"{self.player_active.mention}, it's your turn! React with the column you want to place your token in."
+ )
+ player_num = 1 if self.player_active == self.player1 else 2
+ while True:
+ try:
+ reaction, user = await self.bot.wait_for("reaction_add", check=self.predicate, timeout=30.0)
+ except asyncio.TimeoutError:
+ await self.channel.send(f"{self.player_active.mention}, you took too long. Game over!")
+ return
+ else:
+ await message.delete()
+ if str(reaction.emoji) == CROSS_EMOJI:
+ await self.game_over("quit", self.player_active, self.player_inactive)
+ return
+
+ await self.message.remove_reaction(reaction, user)
+
+ column_num = self.unicode_numbers.index(str(reaction.emoji))
+ column = [row[column_num] for row in self.grid]
+
+ for row_num, square in reversed(list(enumerate(column))):
+ if not square:
+ self.grid[row_num][column_num] = player_num
+ return row_num, column_num
+ message = await self.channel.send(f"Column {column_num + 1} is full. Try again")
+
+ def check_win(self, coords: Coordinate, player_num: int) -> bool:
+ """Check that placing a counter here would cause the player to win."""
+ vertical = [(-1, 0), (1, 0)]
+ horizontal = [(0, 1), (0, -1)]
+ forward_diag = [(-1, 1), (1, -1)]
+ backward_diag = [(-1, -1), (1, 1)]
+ axes = [vertical, horizontal, forward_diag, backward_diag]
+
+ for axis in axes:
+ counters_in_a_row = 1 # The initial counter that is compared to
+ for (row_incr, column_incr) in axis:
+ row, column = coords
+ row += row_incr
+ column += column_incr
+
+ while 0 <= row < self.grid_size and 0 <= column < self.grid_size:
+ if self.grid[row][column] == player_num:
+ counters_in_a_row += 1
+ row += row_incr
+ column += column_incr
+ else:
+ break
+ if counters_in_a_row >= 4:
+ return True
+ return False
+
+
+class AI:
+ """The Computer Player for Single-Player games."""
+
+ def __init__(self, bot: Bot, game: Game):
+ self.game = game
+ self.mention = bot.user.mention
+
+ def get_possible_places(self) -> list[Coordinate]:
+ """Gets all the coordinates where the AI could possibly place a counter."""
+ possible_coords = []
+ for column_num in range(self.game.grid_size):
+ column = [row[column_num] for row in self.game.grid]
+ for row_num, square in reversed(list(enumerate(column))):
+ if not square:
+ possible_coords.append((row_num, column_num))
+ break
+ return possible_coords
+
+ def check_ai_win(self, coord_list: list[Coordinate]) -> Optional[Coordinate]:
+ """
+ Check AI win.
+
+ Check if placing a counter in any possible coordinate would cause the AI to win
+ with 10% chance of not winning and returning None
+ """
+ if random.randint(1, 10) == 1:
+ return
+ for coords in coord_list:
+ if self.game.check_win(coords, 2):
+ return coords
+
+ def check_player_win(self, coord_list: list[Coordinate]) -> Optional[Coordinate]:
+ """
+ Check Player win.
+
+ Check if placing a counter in possible coordinates would stop the player
+ from winning with 25% of not blocking them and returning None.
+ """
+ if random.randint(1, 4) == 1:
+ return
+ for coords in coord_list:
+ if self.game.check_win(coords, 1):
+ return coords
+
+ @staticmethod
+ def random_coords(coord_list: list[Coordinate]) -> Coordinate:
+ """Picks a random coordinate from the possible ones."""
+ return random.choice(coord_list)
+
+ def play(self) -> Union[Coordinate, bool]:
+ """
+ Plays for the AI.
+
+ Gets all possible coords, and determins the move:
+ 1. coords where it can win.
+ 2. coords where the player can win.
+ 3. Random coord
+ The first possible value is choosen.
+ """
+ possible_coords = self.get_possible_places()
+
+ if not possible_coords:
+ return False
+
+ coords = (
+ self.check_ai_win(possible_coords)
+ or self.check_player_win(possible_coords)
+ or self.random_coords(possible_coords)
+ )
+
+ row, column = coords
+ self.game.grid[row][column] = 2
+ return coords
+
+
+class ConnectFour(commands.Cog):
+ """Connect Four. The Classic Vertical Four-in-a-row Game!"""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+ self.games: list[Game] = []
+ self.waiting: list[discord.Member] = []
+
+ self.tokens = [":white_circle:", ":blue_circle:", ":red_circle:"]
+
+ self.max_board_size = 9
+ self.min_board_size = 5
+
+ async def check_author(self, ctx: commands.Context, board_size: int) -> bool:
+ """Check if the requester is free and the board size is correct."""
+ if self.already_playing(ctx.author):
+ await ctx.send("You're already playing a game!")
+ return False
+
+ if ctx.author in self.waiting:
+ await ctx.send("You've already sent out a request for a player 2")
+ return False
+
+ if not self.min_board_size <= board_size <= self.max_board_size:
+ await ctx.send(
+ f"{board_size} is not a valid board size. A valid board size is "
+ f"between `{self.min_board_size}` and `{self.max_board_size}`."
+ )
+ return False
+
+ return True
+
+ def get_player(
+ self,
+ ctx: commands.Context,
+ announcement: discord.Message,
+ reaction: discord.Reaction,
+ user: discord.Member
+ ) -> bool:
+ """Predicate checking the criteria for the announcement message."""
+ if self.already_playing(ctx.author): # If they've joined a game since requesting a player 2
+ return True # Is dealt with later on
+
+ if (
+ user.id not in (ctx.me.id, ctx.author.id)
+ and str(reaction.emoji) == Emojis.hand_raised
+ and reaction.message.id == announcement.id
+ ):
+ if self.already_playing(user):
+ self.bot.loop.create_task(ctx.send(f"{user.mention} You're already playing a game!"))
+ self.bot.loop.create_task(announcement.remove_reaction(reaction, user))
+ return False
+
+ if user in self.waiting:
+ self.bot.loop.create_task(ctx.send(
+ f"{user.mention} Please cancel your game first before joining another one."
+ ))
+ self.bot.loop.create_task(announcement.remove_reaction(reaction, user))
+ return False
+
+ return True
+
+ if (
+ user.id == ctx.author.id
+ and str(reaction.emoji) == CROSS_EMOJI
+ and reaction.message.id == announcement.id
+ ):
+ return True
+ return False
+
+ def already_playing(self, player: discord.Member) -> bool:
+ """Check if someone is already in a game."""
+ return any(player in (game.player1, game.player2) for game in self.games)
+
+ @staticmethod
+ def check_emojis(
+ e1: EMOJI_CHECK, e2: EMOJI_CHECK
+ ) -> tuple[bool, Optional[str]]:
+ """Validate the emojis, the user put."""
+ if isinstance(e1, str) and emojis.count(e1) != 1:
+ return False, e1
+ if isinstance(e2, str) and emojis.count(e2) != 1:
+ return False, e2
+ return True, None
+
+ async def _play_game(
+ self,
+ ctx: commands.Context,
+ user: Optional[discord.Member],
+ board_size: int,
+ emoji1: str,
+ emoji2: str
+ ) -> None:
+ """Helper for playing a game of connect four."""
+ self.tokens = [":white_circle:", str(emoji1), str(emoji2)]
+ game = None # if game fails to intialize in try...except
+
+ try:
+ game = Game(self.bot, ctx.channel, ctx.author, user, self.tokens, size=board_size)
+ self.games.append(game)
+ await game.start_game()
+ self.games.remove(game)
+ except Exception:
+ # End the game in the event of an unforeseen error so the players aren't stuck in a game
+ await ctx.send(f"{ctx.author.mention} {user.mention if user else ''} An error occurred. Game failed.")
+ if game in self.games:
+ self.games.remove(game)
+ raise
+
+ @guild_only()
+ @commands.group(
+ invoke_without_command=True,
+ aliases=("4inarow", "connect4", "connectfour", "c4"),
+ case_insensitive=True
+ )
+ async def connect_four(
+ self,
+ ctx: commands.Context,
+ board_size: int = 7,
+ emoji1: EMOJI_CHECK = "\U0001f535",
+ emoji2: EMOJI_CHECK = "\U0001f534"
+ ) -> None:
+ """
+ Play the classic game of Connect Four with someone!
+
+ Sets up a message waiting for someone else to react and play along.
+ The game will start once someone has reacted.
+ All inputs will be through reactions.
+ """
+ check, emoji = self.check_emojis(emoji1, emoji2)
+ if not check:
+ raise commands.EmojiNotFound(emoji)
+
+ check_author_result = await self.check_author(ctx, board_size)
+ if not check_author_result:
+ return
+
+ announcement = await ctx.send(
+ "**Connect Four**: A new game is about to start!\n"
+ f"Press {Emojis.hand_raised} to play against {ctx.author.mention}!\n"
+ f"(Cancel the game with {CROSS_EMOJI}.)"
+ )
+ self.waiting.append(ctx.author)
+ await announcement.add_reaction(Emojis.hand_raised)
+ await announcement.add_reaction(CROSS_EMOJI)
+
+ try:
+ reaction, user = await self.bot.wait_for(
+ "reaction_add",
+ check=partial(self.get_player, ctx, announcement),
+ timeout=60.0
+ )
+ except asyncio.TimeoutError:
+ self.waiting.remove(ctx.author)
+ await announcement.delete()
+ await ctx.send(
+ f"{ctx.author.mention} Seems like there's no one here to play. "
+ f"Use `{ctx.prefix}{ctx.invoked_with} ai` to play against a computer."
+ )
+ return
+
+ if str(reaction.emoji) == CROSS_EMOJI:
+ self.waiting.remove(ctx.author)
+ await announcement.delete()
+ await ctx.send(f"{ctx.author.mention} Game cancelled.")
+ return
+
+ await announcement.delete()
+ self.waiting.remove(ctx.author)
+ if self.already_playing(ctx.author):
+ return
+
+ await self._play_game(ctx, user, board_size, str(emoji1), str(emoji2))
+
+ @guild_only()
+ @connect_four.command(aliases=("bot", "computer", "cpu"))
+ async def ai(
+ self,
+ ctx: commands.Context,
+ board_size: int = 7,
+ emoji1: EMOJI_CHECK = "\U0001f535",
+ emoji2: EMOJI_CHECK = "\U0001f534"
+ ) -> None:
+ """Play Connect Four against a computer player."""
+ check, emoji = self.check_emojis(emoji1, emoji2)
+ if not check:
+ raise commands.EmojiNotFound(emoji)
+
+ check_author_result = await self.check_author(ctx, board_size)
+ if not check_author_result:
+ return
+
+ await self._play_game(ctx, None, board_size, str(emoji1), str(emoji2))
+
+
+def setup(bot: Bot) -> None:
+ """Load ConnectFour Cog."""
+ bot.add_cog(ConnectFour(bot))
diff --git a/bot/exts/fun/duck_game.py b/bot/exts/fun/duck_game.py
new file mode 100644
index 00000000..1ef7513f
--- /dev/null
+++ b/bot/exts/fun/duck_game.py
@@ -0,0 +1,336 @@
+import asyncio
+import random
+import re
+from collections import defaultdict
+from io import BytesIO
+from itertools import product
+from pathlib import Path
+
+import discord
+from PIL import Image, ImageDraw, ImageFont
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.constants import Colours, MODERATION_ROLES
+from bot.utils.decorators import with_role
+
+DECK = list(product(*[(0, 1, 2)]*4))
+
+GAME_DURATION = 180
+
+# Scoring
+CORRECT_SOLN = 1
+INCORRECT_SOLN = -1
+CORRECT_GOOSE = 2
+INCORRECT_GOOSE = -1
+
+# Distribution of minimum acceptable solutions at board generation.
+# This is for gameplay reasons, to shift the number of solutions per board up,
+# while still making the end of the game unpredictable.
+# Note: this is *not* the same as the distribution of number of solutions.
+
+SOLN_DISTR = 0, 0.05, 0.05, 0.1, 0.15, 0.25, 0.2, 0.15, .05
+
+IMAGE_PATH = Path("bot", "resources", "fun", "all_cards.png")
+FONT_PATH = Path("bot", "resources", "fun", "LuckiestGuy-Regular.ttf")
+HELP_IMAGE_PATH = Path("bot", "resources", "fun", "ducks_help_ex.png")
+
+ALL_CARDS = Image.open(IMAGE_PATH)
+LABEL_FONT = ImageFont.truetype(str(FONT_PATH), size=16)
+CARD_WIDTH = 155
+CARD_HEIGHT = 97
+
+EMOJI_WRONG = "\u274C"
+
+ANSWER_REGEX = re.compile(r'^\D*(\d+)\D+(\d+)\D+(\d+)\D*$')
+
+HELP_TEXT = """
+**Each card has 4 features**
+Color, Number, Hat, and Accessory
+
+**A valid flight**
+3 cards where each feature is either all the same or all different
+
+**Call "GOOSE"**
+if you think there are no more flights
+
+**+1** for each valid flight
+**+2** for a correct "GOOSE" call
+**-1** for any wrong answer
+
+The first flight below is invalid: the first card has swords while the other two have no accessory.\
+ It would be valid if the first card was empty-handed, or one of the other two had paintbrushes.
+
+The second flight is valid because there are no 2:1 splits; each feature is either all the same or all different.
+"""
+
+
+def assemble_board_image(board: list[tuple[int]], rows: int, columns: int) -> Image:
+ """Cut and paste images representing the given cards into an image representing the board."""
+ new_im = Image.new("RGBA", (CARD_WIDTH*columns, CARD_HEIGHT*rows))
+ draw = ImageDraw.Draw(new_im)
+ for idx, card in enumerate(board):
+ card_image = get_card_image(card)
+ row, col = divmod(idx, columns)
+ top, left = row * CARD_HEIGHT, col * CARD_WIDTH
+ new_im.paste(card_image, (left, top))
+ draw.text(
+ xy=(left+5, top+5), # magic numbers are buffers for the card labels
+ text=str(idx),
+ fill=(0, 0, 0),
+ font=LABEL_FONT,
+ )
+ return new_im
+
+
+def get_card_image(card: tuple[int]) -> Image:
+ """Slice the image containing all the cards to get just this card."""
+ # The master card image file should have 9x9 cards,
+ # arranged such that their features can be interpreted as ordered trinary.
+ row, col = divmod(as_trinary(card), 9)
+ x1 = col * CARD_WIDTH
+ x2 = x1 + CARD_WIDTH
+ y1 = row * CARD_HEIGHT
+ y2 = y1 + CARD_HEIGHT
+ return ALL_CARDS.crop((x1, y1, x2, y2))
+
+
+def as_trinary(card: tuple[int]) -> int:
+ """Find the card's unique index by interpreting its features as trinary."""
+ return int(''.join(str(x) for x in card), base=3)
+
+
+class DuckGame:
+ """A class for a single game."""
+
+ def __init__(
+ self,
+ rows: int = 4,
+ columns: int = 3,
+ minimum_solutions: int = 1,
+ ):
+ """
+ Take samples from the deck to generate a board.
+
+ Args:
+ rows (int, optional): Rows in the game board. Defaults to 4.
+ columns (int, optional): Columns in the game board. Defaults to 3.
+ minimum_solutions (int, optional): Minimum acceptable number of solutions in the board. Defaults to 1.
+ """
+ self.rows = rows
+ self.columns = columns
+ size = rows * columns
+
+ self._solutions = None
+ self.claimed_answers = {}
+ self.scores = defaultdict(int)
+ self.editing_embed = asyncio.Lock()
+
+ self.board = random.sample(DECK, size)
+ while len(self.solutions) < minimum_solutions:
+ self.board = random.sample(DECK, size)
+
+ @property
+ def board(self) -> list[tuple[int]]:
+ """Accesses board property."""
+ return self._board
+
+ @board.setter
+ def board(self, val: list[tuple[int]]) -> None:
+ """Erases calculated solutions if the board changes."""
+ self._solutions = None
+ self._board = val
+
+ @property
+ def solutions(self) -> None:
+ """Calculate valid solutions and cache to avoid redoing work."""
+ if self._solutions is None:
+ self._solutions = set()
+ for idx_a, card_a in enumerate(self.board):
+ for idx_b, card_b in enumerate(self.board[idx_a+1:], start=idx_a+1):
+ # Two points determine a line, and there are exactly 3 points per line in {0,1,2}^4.
+ # The completion of a line will only be a duplicate point if the other two points are the same,
+ # which is prevented by the triangle iteration.
+ completion = tuple(
+ feat_a if feat_a == feat_b else 3-feat_a-feat_b
+ for feat_a, feat_b in zip(card_a, card_b)
+ )
+ try:
+ idx_c = self.board.index(completion)
+ except ValueError:
+ continue
+
+ # Indices within the solution are sorted to detect duplicate solutions modulo order.
+ solution = tuple(sorted((idx_a, idx_b, idx_c)))
+ self._solutions.add(solution)
+
+ return self._solutions
+
+
+class DuckGamesDirector(commands.Cog):
+ """A cog for running Duck Duck Duck Goose games."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+ self.current_games = {}
+
+ @commands.group(
+ name='duckduckduckgoose',
+ aliases=['dddg', 'ddg', 'duckduckgoose', 'duckgoose'],
+ invoke_without_command=True
+ )
+ @commands.cooldown(rate=1, per=2, type=commands.BucketType.channel)
+ async def start_game(self, ctx: commands.Context) -> None:
+ """Generate a board, send the game embed, and end the game after a time limit."""
+ if ctx.channel.id in self.current_games:
+ await ctx.send("There's already a game running!")
+ return
+
+ minimum_solutions, = random.choices(range(len(SOLN_DISTR)), weights=SOLN_DISTR)
+ game = DuckGame(minimum_solutions=minimum_solutions)
+ game.running = True
+ self.current_games[ctx.channel.id] = game
+
+ game.msg_content = ""
+ game.embed_msg = await self.send_board_embed(ctx, game)
+ await asyncio.sleep(GAME_DURATION)
+
+ # Checking for the channel ID in the currently running games is not sufficient.
+ # The game could have been ended by a player, and a new game already started in the same channel.
+ if game.running:
+ try:
+ del self.current_games[ctx.channel.id]
+ await self.end_game(ctx.channel, game, end_message="Time's up!")
+ except KeyError:
+ pass
+
+ @commands.Cog.listener()
+ async def on_message(self, msg: discord.Message) -> None:
+ """Listen for messages and process them as answers if appropriate."""
+ if msg.author.bot:
+ return
+
+ channel = msg.channel
+ if channel.id not in self.current_games:
+ return
+
+ game = self.current_games[channel.id]
+ if msg.content.strip().lower() == 'goose':
+ # If all of the solutions have been claimed, i.e. the "goose" call is correct.
+ if len(game.solutions) == len(game.claimed_answers):
+ try:
+ del self.current_games[channel.id]
+ game.scores[msg.author] += CORRECT_GOOSE
+ await self.end_game(channel, game, end_message=f"{msg.author.display_name} GOOSED!")
+ except KeyError:
+ pass
+ else:
+ await msg.add_reaction(EMOJI_WRONG)
+ game.scores[msg.author] += INCORRECT_GOOSE
+ return
+
+ # Valid answers contain 3 numbers.
+ if not (match := re.match(ANSWER_REGEX, msg.content)):
+ return
+ answer = tuple(sorted(int(m) for m in match.groups()))
+
+ # Be forgiving for answers that use indices not on the board.
+ if not all(0 <= n < len(game.board) for n in answer):
+ return
+
+ # Also be forgiving for answers that have already been claimed (and avoid penalizing for racing conditions).
+ if answer in game.claimed_answers:
+ return
+
+ if answer in game.solutions:
+ game.claimed_answers[answer] = msg.author
+ game.scores[msg.author] += CORRECT_SOLN
+ await self.display_claimed_answer(game, msg.author, answer)
+ else:
+ await msg.add_reaction(EMOJI_WRONG)
+ game.scores[msg.author] += INCORRECT_SOLN
+
+ async def send_board_embed(self, ctx: commands.Context, game: DuckGame) -> discord.Message:
+ """Create and send the initial game embed. This will be edited as the game goes on."""
+ image = assemble_board_image(game.board, game.rows, game.columns)
+ with BytesIO() as image_stream:
+ image.save(image_stream, format="png")
+ image_stream.seek(0)
+ file = discord.File(fp=image_stream, filename="board.png")
+ embed = discord.Embed(
+ title="Duck Duck Duck Goose!",
+ color=Colours.bright_green,
+ )
+ embed.set_image(url="attachment://board.png")
+ return await ctx.send(embed=embed, file=file)
+
+ async def display_claimed_answer(self, game: DuckGame, author: discord.Member, answer: tuple[int]) -> None:
+ """Add a claimed answer to the game embed."""
+ async with game.editing_embed:
+ # We specifically edit the message contents instead of the embed
+ # Because we load in the image from the file, editing any portion of the embed
+ # Does weird things to the image and this works around that weirdness
+ game.msg_content = f"{game.msg_content}\n{str(answer):12s} - {author.display_name}"
+ await game.embed_msg.edit(content=game.msg_content)
+
+ async def end_game(self, channel: discord.TextChannel, game: DuckGame, end_message: str) -> None:
+ """Edit the game embed to reflect the end of the game and mark the game as not running."""
+ game.running = False
+
+ scoreboard_embed = discord.Embed(
+ title=end_message,
+ color=discord.Color.dark_purple(),
+ )
+ scores = sorted(
+ game.scores.items(),
+ key=lambda item: item[1],
+ reverse=True,
+ )
+ scoreboard = "Final scores:\n\n"
+ scoreboard += "\n".join(f"{member.display_name}: {score}" for member, score in scores)
+ scoreboard_embed.description = scoreboard
+ await channel.send(embed=scoreboard_embed)
+
+ missed = [ans for ans in game.solutions if ans not in game.claimed_answers]
+ if missed:
+ missed_text = "Flights everyone missed:\n" + "\n".join(f"{ans}" for ans in missed)
+ else:
+ missed_text = "All the flights were found!"
+
+ await game.embed_msg.edit(content=f"{missed_text}")
+
+ @start_game.command(name="help")
+ async def show_rules(self, ctx: commands.Context) -> None:
+ """Explain the rules of the game."""
+ await self.send_help_embed(ctx)
+
+ @start_game.command(name="stop")
+ @with_role(*MODERATION_ROLES)
+ async def stop_game(self, ctx: commands.Context) -> None:
+ """Stop a currently running game. Only available to mods."""
+ try:
+ game = self.current_games.pop(ctx.channel.id)
+ except KeyError:
+ await ctx.send("No game currently running in this channel")
+ return
+ await self.end_game(ctx.channel, game, end_message="Game canceled.")
+
+ @staticmethod
+ async def send_help_embed(ctx: commands.Context) -> discord.Message:
+ """Send rules embed."""
+ embed = discord.Embed(
+ title="Compete against other players to find valid flights!",
+ color=discord.Color.dark_purple(),
+ )
+ embed.description = HELP_TEXT
+ file = discord.File(HELP_IMAGE_PATH, filename="help.png")
+ embed.set_image(url="attachment://help.png")
+ embed.set_footer(
+ text="Tip: using Discord's compact message display mode can help keep the board on the screen"
+ )
+ return await ctx.send(file=file, embed=embed)
+
+
+def setup(bot: Bot) -> None:
+ """Load the DuckGamesDirector cog."""
+ bot.add_cog(DuckGamesDirector(bot))
diff --git a/bot/exts/fun/fun.py b/bot/exts/fun/fun.py
new file mode 100644
index 00000000..b148f1f3
--- /dev/null
+++ b/bot/exts/fun/fun.py
@@ -0,0 +1,250 @@
+import functools
+import json
+import logging
+import random
+from collections.abc import Iterable
+from pathlib import Path
+from typing import Callable, Optional, Union
+
+from discord import Embed, Message
+from discord.ext import commands
+from discord.ext.commands import BadArgument, Cog, Context, MessageConverter, clean_content
+
+from bot import utils
+from bot.bot import Bot
+from bot.constants import Client, Colours, Emojis
+from bot.utils import helpers
+
+log = logging.getLogger(__name__)
+
+UWU_WORDS = {
+ "fi": "fwi",
+ "l": "w",
+ "r": "w",
+ "some": "sum",
+ "th": "d",
+ "thing": "fing",
+ "tho": "fo",
+ "you're": "yuw'we",
+ "your": "yur",
+ "you": "yuw",
+}
+
+
+def caesar_cipher(text: str, offset: int) -> Iterable[str]:
+ """
+ Implements a lazy Caesar Cipher algorithm.
+
+ Encrypts a `text` given a specific integer `offset`. The sign
+ of the `offset` dictates the direction in which it shifts to,
+ with a negative value shifting to the left, and a positive
+ value shifting to the right.
+ """
+ for char in text:
+ if not char.isascii() or not char.isalpha() or char.isspace():
+ yield char
+ continue
+
+ case_start = 65 if char.isupper() else 97
+ true_offset = (ord(char) - case_start + offset) % 26
+
+ yield chr(case_start + true_offset)
+
+
+class Fun(Cog):
+ """A collection of general commands for fun."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ self._caesar_cipher_embed = json.loads(Path("bot/resources/fun/caesar_info.json").read_text("UTF-8"))
+
+ @staticmethod
+ def _get_random_die() -> str:
+ """Generate a random die emoji, ready to be sent on Discord."""
+ die_name = f"dice_{random.randint(1, 6)}"
+ return getattr(Emojis, die_name)
+
+ @commands.command()
+ async def roll(self, ctx: Context, num_rolls: int = 1) -> None:
+ """Outputs a number of random dice emotes (up to 6)."""
+ if 1 <= num_rolls <= 6:
+ dice = " ".join(self._get_random_die() for _ in range(num_rolls))
+ await ctx.send(dice)
+ else:
+ raise BadArgument(f"`{Client.prefix}roll` only supports between 1 and 6 rolls.")
+
+ @commands.command(name="uwu", aliases=("uwuwize", "uwuify",))
+ async def uwu_command(self, ctx: Context, *, text: clean_content(fix_channel_mentions=True)) -> None:
+ """Converts a given `text` into it's uwu equivalent."""
+ conversion_func = functools.partial(
+ utils.replace_many, replacements=UWU_WORDS, ignore_case=True, match_case=True
+ )
+ text, embed = await Fun._get_text_and_embed(ctx, text)
+ # Convert embed if it exists
+ if embed is not None:
+ embed = Fun._convert_embed(conversion_func, embed)
+ converted_text = conversion_func(text)
+ converted_text = helpers.suppress_links(converted_text)
+ # Don't put >>> if only embed present
+ if converted_text:
+ converted_text = f">>> {converted_text.lstrip('> ')}"
+ await ctx.send(content=converted_text, embed=embed)
+
+ @commands.command(name="randomcase", aliases=("rcase", "randomcaps", "rcaps",))
+ async def randomcase_command(self, ctx: Context, *, text: clean_content(fix_channel_mentions=True)) -> None:
+ """Randomly converts the casing of a given `text`."""
+ def conversion_func(text: str) -> str:
+ """Randomly converts the casing of a given string."""
+ return "".join(
+ char.upper() if round(random.random()) else char.lower() for char in text
+ )
+ text, embed = await Fun._get_text_and_embed(ctx, text)
+ # Convert embed if it exists
+ if embed is not None:
+ embed = Fun._convert_embed(conversion_func, embed)
+ converted_text = conversion_func(text)
+ converted_text = helpers.suppress_links(converted_text)
+ # Don't put >>> if only embed present
+ if converted_text:
+ converted_text = f">>> {converted_text.lstrip('> ')}"
+ await ctx.send(content=converted_text, embed=embed)
+
+ @commands.group(name="caesarcipher", aliases=("caesar", "cc",))
+ async def caesarcipher_group(self, ctx: Context) -> None:
+ """
+ Translates a message using the Caesar Cipher.
+
+ See `decrypt`, `encrypt`, and `info` subcommands.
+ """
+ if ctx.invoked_subcommand is None:
+ await ctx.invoke(self.bot.get_command("help"), "caesarcipher")
+
+ @caesarcipher_group.command(name="info")
+ async def caesarcipher_info(self, ctx: Context) -> None:
+ """Information about the Caesar Cipher."""
+ embed = Embed.from_dict(self._caesar_cipher_embed)
+ embed.colour = Colours.dark_green
+
+ await ctx.send(embed=embed)
+
+ @staticmethod
+ async def _caesar_cipher(ctx: Context, offset: int, msg: str, left_shift: bool = False) -> None:
+ """
+ Given a positive integer `offset`, translates and sends the given `msg`.
+
+ Performs a right shift by default unless `left_shift` is specified as `True`.
+
+ Also accepts a valid Discord Message ID or link.
+ """
+ if offset < 0:
+ await ctx.send(":no_entry: Cannot use a negative offset.")
+ return
+
+ if left_shift:
+ offset = -offset
+
+ def conversion_func(text: str) -> str:
+ """Encrypts the given string using the Caesar Cipher."""
+ return "".join(caesar_cipher(text, offset))
+
+ text, embed = await Fun._get_text_and_embed(ctx, msg)
+
+ if embed is not None:
+ embed = Fun._convert_embed(conversion_func, embed)
+
+ converted_text = conversion_func(text)
+
+ if converted_text:
+ converted_text = f">>> {converted_text.lstrip('> ')}"
+
+ await ctx.send(content=converted_text, embed=embed)
+
+ @caesarcipher_group.command(name="encrypt", aliases=("rightshift", "rshift", "enc",))
+ async def caesarcipher_encrypt(self, ctx: Context, offset: int, *, msg: str) -> None:
+ """
+ Given a positive integer `offset`, encrypt the given `msg`.
+
+ Performs a right shift of the letters in the message.
+
+ Also accepts a valid Discord Message ID or link.
+ """
+ await self._caesar_cipher(ctx, offset, msg, left_shift=False)
+
+ @caesarcipher_group.command(name="decrypt", aliases=("leftshift", "lshift", "dec",))
+ async def caesarcipher_decrypt(self, ctx: Context, offset: int, *, msg: str) -> None:
+ """
+ Given a positive integer `offset`, decrypt the given `msg`.
+
+ Performs a left shift of the letters in the message.
+
+ Also accepts a valid Discord Message ID or link.
+ """
+ await self._caesar_cipher(ctx, offset, msg, left_shift=True)
+
+ @staticmethod
+ async def _get_text_and_embed(ctx: Context, text: str) -> tuple[str, Optional[Embed]]:
+ """
+ Attempts to extract the text and embed from a possible link to a discord Message.
+
+ Does not retrieve the text and embed from the Message if it is in a channel the user does
+ not have read permissions in.
+
+ Returns a tuple of:
+ str: If `text` is a valid discord Message, the contents of the message, else `text`.
+ Optional[Embed]: The embed if found in the valid Message, else None
+ """
+ embed = None
+
+ msg = await Fun._get_discord_message(ctx, text)
+ # Ensure the user has read permissions for the channel the message is in
+ if isinstance(msg, Message):
+ permissions = msg.channel.permissions_for(ctx.author)
+ if permissions.read_messages:
+ text = msg.clean_content
+ # Take first embed because we can't send multiple embeds
+ if msg.embeds:
+ embed = msg.embeds[0]
+
+ return (text, embed)
+
+ @staticmethod
+ async def _get_discord_message(ctx: Context, text: str) -> Union[Message, str]:
+ """
+ Attempts to convert a given `text` to a discord Message object and return it.
+
+ Conversion will succeed if given a discord Message ID or link.
+ Returns `text` if the conversion fails.
+ """
+ try:
+ text = await MessageConverter().convert(ctx, text)
+ except commands.BadArgument:
+ log.debug(f"Input '{text:.20}...' is not a valid Discord Message")
+ return text
+
+ @staticmethod
+ def _convert_embed(func: Callable[[str, ], str], embed: Embed) -> Embed:
+ """
+ Converts the text in an embed using a given conversion function, then return the embed.
+
+ Only modifies the following fields: title, description, footer, fields
+ """
+ embed_dict = embed.to_dict()
+
+ embed_dict["title"] = func(embed_dict.get("title", ""))
+ embed_dict["description"] = func(embed_dict.get("description", ""))
+
+ if "footer" in embed_dict:
+ embed_dict["footer"]["text"] = func(embed_dict["footer"].get("text", ""))
+
+ if "fields" in embed_dict:
+ for field in embed_dict["fields"]:
+ field["name"] = func(field.get("name", ""))
+ field["value"] = func(field.get("value", ""))
+
+ return Embed.from_dict(embed_dict)
+
+
+def setup(bot: Bot) -> None:
+ """Load the Fun cog."""
+ bot.add_cog(Fun(bot))
diff --git a/bot/exts/fun/game.py b/bot/exts/fun/game.py
new file mode 100644
index 00000000..f9c150e6
--- /dev/null
+++ b/bot/exts/fun/game.py
@@ -0,0 +1,485 @@
+import difflib
+import logging
+import random
+import re
+from asyncio import sleep
+from datetime import datetime as dt, timedelta
+from enum import IntEnum
+from typing import Any, Optional
+
+from aiohttp import ClientSession
+from discord import Embed
+from discord.ext import tasks
+from discord.ext.commands import Cog, Context, group
+
+from bot.bot import Bot
+from bot.constants import STAFF_ROLES, Tokens
+from bot.utils.decorators import with_role
+from bot.utils.extensions import invoke_help_command
+from bot.utils.pagination import ImagePaginator, LinePaginator
+
+# Base URL of IGDB API
+BASE_URL = "https://api.igdb.com/v4"
+
+CLIENT_ID = Tokens.igdb_client_id
+CLIENT_SECRET = Tokens.igdb_client_secret
+
+# The number of seconds before expiry that we attempt to re-fetch a new access token
+ACCESS_TOKEN_RENEWAL_WINDOW = 60*60*24*2
+
+# URL to request API access token
+OAUTH_URL = "https://id.twitch.tv/oauth2/token"
+
+OAUTH_PARAMS = {
+ "client_id": CLIENT_ID,
+ "client_secret": CLIENT_SECRET,
+ "grant_type": "client_credentials"
+}
+
+BASE_HEADERS = {
+ "Client-ID": CLIENT_ID,
+ "Accept": "application/json"
+}
+
+logger = logging.getLogger(__name__)
+
+REGEX_NON_ALPHABET = re.compile(r"[^a-z0-9]", re.IGNORECASE)
+
+# ---------
+# TEMPLATES
+# ---------
+
+# Body templates
+# Request body template for get_games_list
+GAMES_LIST_BODY = (
+ "fields cover.image_id, first_release_date, total_rating, name, storyline, url, platforms.name, status,"
+ "involved_companies.company.name, summary, age_ratings.category, age_ratings.rating, total_rating_count;"
+ "{sort} {limit} {offset} {genre} {additional}"
+)
+
+# Request body template for get_companies_list
+COMPANIES_LIST_BODY = (
+ "fields name, url, start_date, logo.image_id, developed.name, published.name, description;"
+ "offset {offset}; limit {limit};"
+)
+
+# Request body template for games search
+SEARCH_BODY = 'fields name, url, storyline, total_rating, total_rating_count; limit 50; search "{term}";'
+
+# Pages templates
+# Game embed layout
+GAME_PAGE = (
+ "**[{name}]({url})**\n"
+ "{description}"
+ "**Release Date:** {release_date}\n"
+ "**Rating:** {rating}/100 :star: (based on {rating_count} ratings)\n"
+ "**Platforms:** {platforms}\n"
+ "**Status:** {status}\n"
+ "**Age Ratings:** {age_ratings}\n"
+ "**Made by:** {made_by}\n\n"
+ "{storyline}"
+)
+
+# .games company command page layout
+COMPANY_PAGE = (
+ "**[{name}]({url})**\n"
+ "{description}"
+ "**Founded:** {founded}\n"
+ "**Developed:** {developed}\n"
+ "**Published:** {published}"
+)
+
+# For .games search command line layout
+GAME_SEARCH_LINE = (
+ "**[{name}]({url})**\n"
+ "{rating}/100 :star: (based on {rating_count} ratings)\n"
+)
+
+# URL templates
+COVER_URL = "https://images.igdb.com/igdb/image/upload/t_cover_big/{image_id}.jpg"
+LOGO_URL = "https://images.igdb.com/igdb/image/upload/t_logo_med/{image_id}.png"
+
+# Create aliases for complex genre names
+ALIASES = {
+ "Role-playing (rpg)": ["Role playing", "Rpg"],
+ "Turn-based strategy (tbs)": ["Turn based strategy", "Tbs"],
+ "Real time strategy (rts)": ["Real time strategy", "Rts"],
+ "Hack and slash/beat 'em up": ["Hack and slash"]
+}
+
+
+class GameStatus(IntEnum):
+ """Game statuses in IGDB API."""
+
+ Released = 0
+ Alpha = 2
+ Beta = 3
+ Early = 4
+ Offline = 5
+ Cancelled = 6
+ Rumored = 7
+
+
+class AgeRatingCategories(IntEnum):
+ """IGDB API Age Rating categories IDs."""
+
+ ESRB = 1
+ PEGI = 2
+
+
+class AgeRatings(IntEnum):
+ """PEGI/ESRB ratings IGDB API IDs."""
+
+ Three = 1
+ Seven = 2
+ Twelve = 3
+ Sixteen = 4
+ Eighteen = 5
+ RP = 6
+ EC = 7
+ E = 8
+ E10 = 9
+ T = 10
+ M = 11
+ AO = 12
+
+
+class Games(Cog):
+ """Games Cog contains commands that collect data from IGDB."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+ self.http_session: ClientSession = bot.http_session
+
+ self.genres: dict[str, int] = {}
+ self.headers = BASE_HEADERS
+
+ self.bot.loop.create_task(self.renew_access_token())
+
+ async def renew_access_token(self) -> None:
+ """Refeshes V4 access token a number of seconds before expiry. See `ACCESS_TOKEN_RENEWAL_WINDOW`."""
+ while True:
+ async with self.http_session.post(OAUTH_URL, params=OAUTH_PARAMS) as resp:
+ result = await resp.json()
+ if resp.status != 200:
+ # If there is a valid access token continue to use that,
+ # otherwise unload cog.
+ if "Authorization" in self.headers:
+ time_delta = timedelta(seconds=ACCESS_TOKEN_RENEWAL_WINDOW)
+ logger.error(
+ "Failed to renew IGDB access token. "
+ f"Current token will last for {time_delta} "
+ f"OAuth response message: {result['message']}"
+ )
+ else:
+ logger.warning(
+ "Invalid OAuth credentials. Unloading Games cog. "
+ f"OAuth response message: {result['message']}"
+ )
+ self.bot.remove_cog("Games")
+
+ return
+
+ self.headers["Authorization"] = f"Bearer {result['access_token']}"
+
+ # Attempt to renew before the token expires
+ next_renewal = result["expires_in"] - ACCESS_TOKEN_RENEWAL_WINDOW
+
+ time_delta = timedelta(seconds=next_renewal)
+ logger.info(f"Successfully renewed access token. Refreshing again in {time_delta}")
+
+ # This will be true the first time this loop runs.
+ # Since we now have an access token, its safe to start this task.
+ if self.genres == {}:
+ self.refresh_genres_task.start()
+ await sleep(next_renewal)
+
+ @tasks.loop(hours=24.0)
+ async def refresh_genres_task(self) -> None:
+ """Refresh genres in every hour."""
+ try:
+ await self._get_genres()
+ except Exception as e:
+ logger.warning(f"There was error while refreshing genres: {e}")
+ return
+ logger.info("Successfully refreshed genres.")
+
+ def cog_unload(self) -> None:
+ """Cancel genres refreshing start when unloading Cog."""
+ self.refresh_genres_task.cancel()
+ logger.info("Successfully stopped Genres Refreshing task.")
+
+ async def _get_genres(self) -> None:
+ """Create genres variable for games command."""
+ body = "fields name; limit 100;"
+ async with self.http_session.post(f"{BASE_URL}/genres", data=body, headers=self.headers) as resp:
+ result = await resp.json()
+ genres = {genre["name"].capitalize(): genre["id"] for genre in result}
+
+ # Replace complex names with names from ALIASES
+ for genre_name, genre in genres.items():
+ if genre_name in ALIASES:
+ for alias in ALIASES[genre_name]:
+ self.genres[alias] = genre
+ else:
+ self.genres[genre_name] = genre
+
+ @group(name="games", aliases=("game",), invoke_without_command=True)
+ async def games(self, ctx: Context, amount: Optional[int] = 5, *, genre: Optional[str]) -> None:
+ """
+ Get random game(s) by genre from IGDB. Use .games genres command to get all available genres.
+
+ Also support amount parameter, what max is 25 and min 1, default 5. Supported formats:
+ - .games <genre>
+ - .games <amount> <genre>
+ """
+ # When user didn't specified genre, send help message
+ if genre is None:
+ await invoke_help_command(ctx)
+ return
+
+ # Capitalize genre for check
+ genre = "".join(genre).capitalize()
+
+ # Check for amounts, max is 25 and min 1
+ if not 1 <= amount <= 25:
+ await ctx.send("Your provided amount is out of range. Our minimum is 1 and maximum 25.")
+ return
+
+ # Get games listing, if genre don't exist, show error message with possibilities.
+ # Offset must be random, due otherwise we will get always same result (offset show in which position should
+ # API start returning result)
+ try:
+ games = await self.get_games_list(amount, self.genres[genre], offset=random.randint(0, 150))
+ except KeyError:
+ possibilities = await self.get_best_results(genre)
+ # If there is more than 1 possibilities, show these.
+ # If there is only 1 possibility, use it as genre.
+ # Otherwise send message about invalid genre.
+ if len(possibilities) > 1:
+ display_possibilities = "`, `".join(p[1] for p in possibilities)
+ await ctx.send(
+ f"Invalid genre `{genre}`. "
+ f"{f'Maybe you meant `{display_possibilities}`?' if display_possibilities else ''}"
+ )
+ return
+ elif len(possibilities) == 1:
+ games = await self.get_games_list(
+ amount, self.genres[possibilities[0][1]], offset=random.randint(0, 150)
+ )
+ genre = possibilities[0][1]
+ else:
+ await ctx.send(f"Invalid genre `{genre}`.")
+ return
+
+ # Create pages and paginate
+ pages = [await self.create_page(game) for game in games]
+
+ await ImagePaginator.paginate(pages, ctx, Embed(title=f"Random {genre.title()} Games"))
+
+ @games.command(name="top", aliases=("t",))
+ async def top(self, ctx: Context, amount: int = 10) -> None:
+ """
+ Get current Top games in IGDB.
+
+ Support amount parameter. Max is 25, min is 1.
+ """
+ if not 1 <= amount <= 25:
+ await ctx.send("Your provided amount is out of range. Our minimum is 1 and maximum 25.")
+ return
+
+ games = await self.get_games_list(amount, sort="total_rating desc",
+ additional_body="where total_rating >= 90; sort total_rating_count desc;")
+
+ pages = [await self.create_page(game) for game in games]
+ await ImagePaginator.paginate(pages, ctx, Embed(title=f"Top {amount} Games"))
+
+ @games.command(name="genres", aliases=("genre", "g"))
+ async def genres(self, ctx: Context) -> None:
+ """Get all available genres."""
+ await ctx.send(f"Currently available genres: {', '.join(f'`{genre}`' for genre in self.genres)}")
+
+ @games.command(name="search", aliases=("s",))
+ async def search(self, ctx: Context, *, search_term: str) -> None:
+ """Find games by name."""
+ lines = await self.search_games(search_term)
+
+ await LinePaginator.paginate(lines, ctx, Embed(title=f"Game Search Results: {search_term}"), empty=False)
+
+ @games.command(name="company", aliases=("companies",))
+ async def company(self, ctx: Context, amount: int = 5) -> None:
+ """
+ Get random Game Companies companies from IGDB API.
+
+ Support amount parameter. Max is 25, min is 1.
+ """
+ if not 1 <= amount <= 25:
+ await ctx.send("Your provided amount is out of range. Our minimum is 1 and maximum 25.")
+ return
+
+ # Get companies listing. Provide limit for limiting how much companies will be returned. Get random offset to
+ # get (almost) every time different companies (offset show in which position should API start returning result)
+ companies = await self.get_companies_list(limit=amount, offset=random.randint(0, 150))
+ pages = [await self.create_company_page(co) for co in companies]
+
+ await ImagePaginator.paginate(pages, ctx, Embed(title="Random Game Companies"))
+
+ @with_role(*STAFF_ROLES)
+ @games.command(name="refresh", aliases=("r",))
+ async def refresh_genres_command(self, ctx: Context) -> None:
+ """Refresh .games command genres."""
+ try:
+ await self._get_genres()
+ except Exception as e:
+ await ctx.send(f"There was error while refreshing genres: `{e}`")
+ return
+ await ctx.send("Successfully refreshed genres.")
+
+ async def get_games_list(
+ self,
+ amount: int,
+ genre: Optional[str] = None,
+ sort: Optional[str] = None,
+ additional_body: str = "",
+ offset: int = 0
+ ) -> list[dict[str, Any]]:
+ """
+ Get list of games from IGDB API by parameters that is provided.
+
+ Amount param show how much games this get, genre is genre ID and at least one genre in game must this when
+ provided. Sort is sorting by specific field and direction, ex. total_rating desc/asc (total_rating is field,
+ desc/asc is direction). Additional_body is field where you can pass extra search parameters. Offset show start
+ position in API.
+ """
+ # Create body of IGDB API request, define fields, sorting, offset, limit and genre
+ params = {
+ "sort": f"sort {sort};" if sort else "",
+ "limit": f"limit {amount};",
+ "offset": f"offset {offset};" if offset else "",
+ "genre": f"where genres = ({genre});" if genre else "",
+ "additional": additional_body
+ }
+ body = GAMES_LIST_BODY.format(**params)
+
+ # Do request to IGDB API, create headers, URL, define body, return result
+ async with self.http_session.post(url=f"{BASE_URL}/games", data=body, headers=self.headers) as resp:
+ return await resp.json()
+
+ async def create_page(self, data: dict[str, Any]) -> tuple[str, str]:
+ """Create content of Game Page."""
+ # Create cover image URL from template
+ url = COVER_URL.format(**{"image_id": data["cover"]["image_id"] if "cover" in data else ""})
+
+ # Get release date separately with checking
+ release_date = dt.utcfromtimestamp(data["first_release_date"]).date() if "first_release_date" in data else "?"
+
+ # Create Age Ratings value
+ rating = ", ".join(
+ f"{AgeRatingCategories(age['category']).name} {AgeRatings(age['rating']).name}"
+ for age in data["age_ratings"]
+ ) if "age_ratings" in data else "?"
+
+ companies = [c["company"]["name"] for c in data["involved_companies"]] if "involved_companies" in data else "?"
+
+ # Create formatting for template page
+ formatting = {
+ "name": data["name"],
+ "url": data["url"],
+ "description": f"{data['summary']}\n\n" if "summary" in data else "\n",
+ "release_date": release_date,
+ "rating": round(data["total_rating"] if "total_rating" in data else 0, 2),
+ "rating_count": data["total_rating_count"] if "total_rating_count" in data else "?",
+ "platforms": ", ".join(platform["name"] for platform in data["platforms"]) if "platforms" in data else "?",
+ "status": GameStatus(data["status"]).name if "status" in data else "?",
+ "age_ratings": rating,
+ "made_by": ", ".join(companies),
+ "storyline": data["storyline"] if "storyline" in data else ""
+ }
+ page = GAME_PAGE.format(**formatting)
+
+ return page, url
+
+ async def search_games(self, search_term: str) -> list[str]:
+ """Search game from IGDB API by string, return listing of pages."""
+ lines = []
+
+ # Define request body of IGDB API request and do request
+ body = SEARCH_BODY.format(**{"term": search_term})
+
+ async with self.http_session.post(url=f"{BASE_URL}/games", data=body, headers=self.headers) as resp:
+ data = await resp.json()
+
+ # Loop over games, format them to good format, make line and append this to total lines
+ for game in data:
+ formatting = {
+ "name": game["name"],
+ "url": game["url"],
+ "rating": round(game["total_rating"] if "total_rating" in game else 0, 2),
+ "rating_count": game["total_rating_count"] if "total_rating" in game else "?"
+ }
+ line = GAME_SEARCH_LINE.format(**formatting)
+ lines.append(line)
+
+ return lines
+
+ async def get_companies_list(self, limit: int, offset: int = 0) -> list[dict[str, Any]]:
+ """
+ Get random Game Companies from IGDB API.
+
+ Limit is parameter, that show how much movies this should return, offset show in which position should API start
+ returning results.
+ """
+ # Create request body from template
+ body = COMPANIES_LIST_BODY.format(**{
+ "limit": limit,
+ "offset": offset
+ })
+
+ async with self.http_session.post(url=f"{BASE_URL}/companies", data=body, headers=self.headers) as resp:
+ return await resp.json()
+
+ async def create_company_page(self, data: dict[str, Any]) -> tuple[str, str]:
+ """Create good formatted Game Company page."""
+ # Generate URL of company logo
+ url = LOGO_URL.format(**{"image_id": data["logo"]["image_id"] if "logo" in data else ""})
+
+ # Try to get found date of company
+ founded = dt.utcfromtimestamp(data["start_date"]).date() if "start_date" in data else "?"
+
+ # Generate list of games, that company have developed or published
+ developed = ", ".join(game["name"] for game in data["developed"]) if "developed" in data else "?"
+ published = ", ".join(game["name"] for game in data["published"]) if "published" in data else "?"
+
+ formatting = {
+ "name": data["name"],
+ "url": data["url"],
+ "description": f"{data['description']}\n\n" if "description" in data else "\n",
+ "founded": founded,
+ "developed": developed,
+ "published": published
+ }
+ page = COMPANY_PAGE.format(**formatting)
+
+ return page, url
+
+ async def get_best_results(self, query: str) -> list[tuple[float, str]]:
+ """Get best match result of genre when original genre is invalid."""
+ results = []
+ for genre in self.genres:
+ ratios = [difflib.SequenceMatcher(None, query, genre).ratio()]
+ for word in REGEX_NON_ALPHABET.split(genre):
+ ratios.append(difflib.SequenceMatcher(None, query, word).ratio())
+ results.append((round(max(ratios), 2), genre))
+ return sorted((item for item in results if item[0] >= 0.60), reverse=True)[:4]
+
+
+def setup(bot: Bot) -> None:
+ """Load the Games cog."""
+ # Check does IGDB API key exist, if not, log warning and don't load cog
+ if not Tokens.igdb_client_id:
+ logger.warning("No IGDB client ID. Not loading Games cog.")
+ return
+ if not Tokens.igdb_client_secret:
+ logger.warning("No IGDB client secret. Not loading Games cog.")
+ return
+ bot.add_cog(Games(bot))
diff --git a/bot/exts/fun/magic_8ball.py b/bot/exts/fun/magic_8ball.py
new file mode 100644
index 00000000..a7b682ca
--- /dev/null
+++ b/bot/exts/fun/magic_8ball.py
@@ -0,0 +1,30 @@
+import json
+import logging
+import random
+from pathlib import Path
+
+from discord.ext import commands
+
+from bot.bot import Bot
+
+log = logging.getLogger(__name__)
+
+ANSWERS = json.loads(Path("bot/resources/fun/magic8ball.json").read_text("utf8"))
+
+
+class Magic8ball(commands.Cog):
+ """A Magic 8ball command to respond to a user's question."""
+
+ @commands.command(name="8ball")
+ async def output_answer(self, ctx: commands.Context, *, question: str) -> None:
+ """Return a Magic 8ball answer from answers list."""
+ if len(question.split()) >= 3:
+ answer = random.choice(ANSWERS)
+ await ctx.send(answer)
+ else:
+ await ctx.send("Usage: .8ball <question> (minimum length of 3 eg: `will I win?`)")
+
+
+def setup(bot: Bot) -> None:
+ """Load the Magic8Ball Cog."""
+ bot.add_cog(Magic8ball())
diff --git a/bot/exts/fun/minesweeper.py b/bot/exts/fun/minesweeper.py
new file mode 100644
index 00000000..a48b5051
--- /dev/null
+++ b/bot/exts/fun/minesweeper.py
@@ -0,0 +1,270 @@
+import logging
+from collections.abc import Iterator
+from dataclasses import dataclass
+from random import randint, random
+from typing import Union
+
+import discord
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.constants import Client
+from bot.utils.converters import CoordinateConverter
+from bot.utils.exceptions import UserNotPlayingError
+from bot.utils.extensions import invoke_help_command
+
+MESSAGE_MAPPING = {
+ 0: ":stop_button:",
+ 1: ":one:",
+ 2: ":two:",
+ 3: ":three:",
+ 4: ":four:",
+ 5: ":five:",
+ 6: ":six:",
+ 7: ":seven:",
+ 8: ":eight:",
+ 9: ":nine:",
+ 10: ":keycap_ten:",
+ "bomb": ":bomb:",
+ "hidden": ":grey_question:",
+ "flag": ":flag_black:",
+ "x": ":x:"
+}
+
+log = logging.getLogger(__name__)
+
+
+GameBoard = list[list[Union[str, int]]]
+
+
+@dataclass
+class Game:
+ """The data for a game."""
+
+ board: GameBoard
+ revealed: GameBoard
+ dm_msg: discord.Message
+ chat_msg: discord.Message
+ activated_on_server: bool
+
+
+class Minesweeper(commands.Cog):
+ """Play a game of Minesweeper."""
+
+ def __init__(self):
+ self.games: dict[int, Game] = {}
+
+ @commands.group(name="minesweeper", aliases=("ms",), invoke_without_command=True)
+ async def minesweeper_group(self, ctx: commands.Context) -> None:
+ """Commands for Playing Minesweeper."""
+ await invoke_help_command(ctx)
+
+ @staticmethod
+ def get_neighbours(x: int, y: int) -> Iterator[tuple[int, int]]:
+ """Get all the neighbouring x and y including it self."""
+ for x_ in [x - 1, x, x + 1]:
+ for y_ in [y - 1, y, y + 1]:
+ if x_ != -1 and x_ != 10 and y_ != -1 and y_ != 10:
+ yield x_, y_
+
+ def generate_board(self, bomb_chance: float) -> GameBoard:
+ """Generate a 2d array for the board."""
+ board: GameBoard = [
+ [
+ "bomb" if random() <= bomb_chance else "number"
+ for _ in range(10)
+ ] for _ in range(10)
+ ]
+
+ # make sure there is always a free cell
+ board[randint(0, 9)][randint(0, 9)] = "number"
+
+ for y, row in enumerate(board):
+ for x, cell in enumerate(row):
+ if cell == "number":
+ # calculate bombs near it
+ bombs = 0
+ for x_, y_ in self.get_neighbours(x, y):
+ if board[y_][x_] == "bomb":
+ bombs += 1
+ board[y][x] = bombs
+ return board
+
+ @staticmethod
+ def format_for_discord(board: GameBoard) -> str:
+ """Format the board as a string for Discord."""
+ discord_msg = (
+ ":stop_button: :regional_indicator_a: :regional_indicator_b: :regional_indicator_c: "
+ ":regional_indicator_d: :regional_indicator_e: :regional_indicator_f: :regional_indicator_g: "
+ ":regional_indicator_h: :regional_indicator_i: :regional_indicator_j:\n\n"
+ )
+ rows = []
+ for row_number, row in enumerate(board):
+ new_row = f"{MESSAGE_MAPPING[row_number + 1]} "
+ new_row += " ".join(MESSAGE_MAPPING[cell] for cell in row)
+ rows.append(new_row)
+
+ discord_msg += "\n".join(rows)
+ return discord_msg
+
+ @minesweeper_group.command(name="start")
+ async def start_command(self, ctx: commands.Context, bomb_chance: float = .2) -> None:
+ """Start a game of Minesweeper."""
+ if ctx.author.id in self.games: # Player is already playing
+ await ctx.send(f"{ctx.author.mention} you already have a game running!", delete_after=2)
+ await ctx.message.delete(delay=2)
+ return
+
+ try:
+ await ctx.author.send(
+ f"Play by typing: `{Client.prefix}ms reveal xy [xy]` or `{Client.prefix}ms flag xy [xy]` \n"
+ f"Close the game with `{Client.prefix}ms end`\n"
+ )
+ except discord.errors.Forbidden:
+ log.debug(f"{ctx.author.name} ({ctx.author.id}) has disabled DMs from server members.")
+ await ctx.send(f":x: {ctx.author.mention}, please enable DMs to play minesweeper.")
+ return
+
+ # Add game to list
+ board: GameBoard = self.generate_board(bomb_chance)
+ revealed_board: GameBoard = [["hidden"] * 10 for _ in range(10)]
+ dm_msg = await ctx.author.send(f"Here's your board!\n{self.format_for_discord(revealed_board)}")
+
+ if ctx.guild:
+ await ctx.send(f"{ctx.author.mention} is playing Minesweeper.")
+ chat_msg = await ctx.send(f"Here's their board!\n{self.format_for_discord(revealed_board)}")
+ else:
+ chat_msg = None
+
+ self.games[ctx.author.id] = Game(
+ board=board,
+ revealed=revealed_board,
+ dm_msg=dm_msg,
+ chat_msg=chat_msg,
+ activated_on_server=ctx.guild is not None
+ )
+
+ async def update_boards(self, ctx: commands.Context) -> None:
+ """Update both playing boards."""
+ game = self.games[ctx.author.id]
+ await game.dm_msg.delete()
+ game.dm_msg = await ctx.author.send(f"Here's your board!\n{self.format_for_discord(game.revealed)}")
+ if game.activated_on_server:
+ await game.chat_msg.edit(content=f"Here's their board!\n{self.format_for_discord(game.revealed)}")
+
+ @commands.dm_only()
+ @minesweeper_group.command(name="flag")
+ async def flag_command(self, ctx: commands.Context, *coordinates: CoordinateConverter) -> None:
+ """Place multiple flags on the board."""
+ if ctx.author.id not in self.games:
+ raise UserNotPlayingError
+ board: GameBoard = self.games[ctx.author.id].revealed
+ for x, y in coordinates:
+ if board[y][x] == "hidden":
+ board[y][x] = "flag"
+
+ await self.update_boards(ctx)
+
+ @staticmethod
+ def reveal_bombs(revealed: GameBoard, board: GameBoard) -> None:
+ """Reveals all the bombs."""
+ for y, row in enumerate(board):
+ for x, cell in enumerate(row):
+ if cell == "bomb":
+ revealed[y][x] = cell
+
+ async def lost(self, ctx: commands.Context) -> None:
+ """The player lost the game."""
+ game = self.games[ctx.author.id]
+ self.reveal_bombs(game.revealed, game.board)
+ await ctx.author.send(":fire: You lost! :fire:")
+ if game.activated_on_server:
+ await game.chat_msg.channel.send(f":fire: {ctx.author.mention} just lost Minesweeper! :fire:")
+
+ async def won(self, ctx: commands.Context) -> None:
+ """The player won the game."""
+ game = self.games[ctx.author.id]
+ await ctx.author.send(":tada: You won! :tada:")
+ if game.activated_on_server:
+ await game.chat_msg.channel.send(f":tada: {ctx.author.mention} just won Minesweeper! :tada:")
+
+ def reveal_zeros(self, revealed: GameBoard, board: GameBoard, x: int, y: int) -> None:
+ """Recursively reveal adjacent cells when a 0 cell is encountered."""
+ for x_, y_ in self.get_neighbours(x, y):
+ if revealed[y_][x_] != "hidden":
+ continue
+ revealed[y_][x_] = board[y_][x_]
+ if board[y_][x_] == 0:
+ self.reveal_zeros(revealed, board, x_, y_)
+
+ async def check_if_won(self, ctx: commands.Context, revealed: GameBoard, board: GameBoard) -> bool:
+ """Checks if a player has won."""
+ if any(
+ revealed[y][x] in ["hidden", "flag"] and board[y][x] != "bomb"
+ for x in range(10)
+ for y in range(10)
+ ):
+ return False
+ else:
+ await self.won(ctx)
+ return True
+
+ async def reveal_one(
+ self,
+ ctx: commands.Context,
+ revealed: GameBoard,
+ board: GameBoard,
+ x: int,
+ y: int
+ ) -> bool:
+ """
+ Reveal one square.
+
+ return is True if the game ended, breaking the loop in `reveal_command` and deleting the game.
+ """
+ revealed[y][x] = board[y][x]
+ if board[y][x] == "bomb":
+ await self.lost(ctx)
+ revealed[y][x] = "x" # mark bomb that made you lose with a x
+ return True
+ elif board[y][x] == 0:
+ self.reveal_zeros(revealed, board, x, y)
+ return await self.check_if_won(ctx, revealed, board)
+
+ @commands.dm_only()
+ @minesweeper_group.command(name="reveal")
+ async def reveal_command(self, ctx: commands.Context, *coordinates: CoordinateConverter) -> None:
+ """Reveal multiple cells."""
+ if ctx.author.id not in self.games:
+ raise UserNotPlayingError
+ game = self.games[ctx.author.id]
+ revealed: GameBoard = game.revealed
+ board: GameBoard = game.board
+
+ for x, y in coordinates:
+ # reveal_one returns True if the revealed cell is a bomb or the player won, ending the game
+ if await self.reveal_one(ctx, revealed, board, x, y):
+ await self.update_boards(ctx)
+ del self.games[ctx.author.id]
+ break
+ else:
+ await self.update_boards(ctx)
+
+ @minesweeper_group.command(name="end")
+ async def end_command(self, ctx: commands.Context) -> None:
+ """End your current game."""
+ if ctx.author.id not in self.games:
+ raise UserNotPlayingError
+ game = self.games[ctx.author.id]
+ game.revealed = game.board
+ await self.update_boards(ctx)
+ new_msg = f":no_entry: Game canceled. :no_entry:\n{game.dm_msg.content}"
+ await game.dm_msg.edit(content=new_msg)
+ if game.activated_on_server:
+ await game.chat_msg.edit(content=new_msg)
+ del self.games[ctx.author.id]
+
+
+def setup(bot: Bot) -> None:
+ """Load the Minesweeper cog."""
+ bot.add_cog(Minesweeper())
diff --git a/bot/exts/fun/movie.py b/bot/exts/fun/movie.py
new file mode 100644
index 00000000..a04eeb41
--- /dev/null
+++ b/bot/exts/fun/movie.py
@@ -0,0 +1,205 @@
+import logging
+import random
+from enum import Enum
+from typing import Any
+
+from aiohttp import ClientSession
+from discord import Embed
+from discord.ext.commands import Cog, Context, group
+
+from bot.bot import Bot
+from bot.constants import Tokens
+from bot.utils.extensions import invoke_help_command
+from bot.utils.pagination import ImagePaginator
+
+# Define base URL of TMDB
+BASE_URL = "https://api.themoviedb.org/3/"
+
+logger = logging.getLogger(__name__)
+
+# Define movie params, that will be used for every movie request
+MOVIE_PARAMS = {
+ "api_key": Tokens.tmdb,
+ "language": "en-US"
+}
+
+
+class MovieGenres(Enum):
+ """Movies Genre names and IDs."""
+
+ Action = "28"
+ Adventure = "12"
+ Animation = "16"
+ Comedy = "35"
+ Crime = "80"
+ Documentary = "99"
+ Drama = "18"
+ Family = "10751"
+ Fantasy = "14"
+ History = "36"
+ Horror = "27"
+ Music = "10402"
+ Mystery = "9648"
+ Romance = "10749"
+ Science = "878"
+ Thriller = "53"
+ Western = "37"
+
+
+class Movie(Cog):
+ """Movie Cog contains movies command that grab random movies from TMDB."""
+
+ def __init__(self, bot: Bot):
+ self.http_session: ClientSession = bot.http_session
+
+ @group(name="movies", aliases=("movie",), invoke_without_command=True)
+ async def movies(self, ctx: Context, genre: str = "", amount: int = 5) -> None:
+ """
+ Get random movies by specifying genre. Also support amount parameter, that define how much movies will be shown.
+
+ Default 5. Use .movies genres to get all available genres.
+ """
+ # Check is there more than 20 movies specified, due TMDB return 20 movies
+ # per page, so this is max. Also you can't get less movies than 1, just logic
+ if amount > 20:
+ await ctx.send("You can't get more than 20 movies at once. (TMDB limits)")
+ return
+ elif amount < 1:
+ await ctx.send("You can't get less than 1 movie.")
+ return
+
+ # Capitalize genre for getting data from Enum, get random page, send help when genre don't exist.
+ genre = genre.capitalize()
+ try:
+ result = await self.get_movies_data(self.http_session, MovieGenres[genre].value, 1)
+ except KeyError:
+ await invoke_help_command(ctx)
+ return
+
+ # Check if "results" is in result. If not, throw error.
+ if "results" not in result:
+ err_msg = (
+ f"There is problem while making TMDB API request. Response Code: {result['status_code']}, "
+ f"{result['status_message']}."
+ )
+ await ctx.send(err_msg)
+ logger.warning(err_msg)
+
+ # Get random page. Max page is last page where is movies with this genre.
+ page = random.randint(1, result["total_pages"])
+
+ # Get movies list from TMDB, check if results key in result. When not, raise error.
+ movies = await self.get_movies_data(self.http_session, MovieGenres[genre].value, page)
+ if "results" not in movies:
+ err_msg = f"There is problem while making TMDB API request. Response Code: {result['status_code']}, " \
+ f"{result['status_message']}."
+ await ctx.send(err_msg)
+ logger.warning(err_msg)
+
+ # Get all pages and embed
+ pages = await self.get_pages(self.http_session, movies, amount)
+ embed = await self.get_embed(genre)
+
+ await ImagePaginator.paginate(pages, ctx, embed)
+
+ @movies.command(name="genres", aliases=("genre", "g"))
+ async def genres(self, ctx: Context) -> None:
+ """Show all currently available genres for .movies command."""
+ await ctx.send(f"Current available genres: {', '.join('`' + genre.name + '`' for genre in MovieGenres)}")
+
+ async def get_movies_data(self, client: ClientSession, genre_id: str, page: int) -> list[dict[str, Any]]:
+ """Return JSON of TMDB discover request."""
+ # Define params of request
+ params = {
+ "api_key": Tokens.tmdb,
+ "language": "en-US",
+ "sort_by": "popularity.desc",
+ "include_adult": "false",
+ "include_video": "false",
+ "page": page,
+ "with_genres": genre_id
+ }
+
+ url = BASE_URL + "discover/movie"
+
+ # Make discover request to TMDB, return result
+ async with client.get(url, params=params) as resp:
+ return await resp.json()
+
+ async def get_pages(self, client: ClientSession, movies: dict[str, Any], amount: int) -> list[tuple[str, str]]:
+ """Fetch all movie pages from movies dictionary. Return list of pages."""
+ pages = []
+
+ for i in range(amount):
+ movie_id = movies["results"][i]["id"]
+ movie = await self.get_movie(client, movie_id)
+
+ page, img = await self.create_page(movie)
+ pages.append((page, img))
+
+ return pages
+
+ async def get_movie(self, client: ClientSession, movie: int) -> dict[str, Any]:
+ """Get Movie by movie ID from TMDB. Return result dictionary."""
+ if not isinstance(movie, int):
+ raise ValueError("Error while fetching movie from TMDB, movie argument must be integer. ")
+ url = BASE_URL + f"movie/{movie}"
+
+ async with client.get(url, params=MOVIE_PARAMS) as resp:
+ return await resp.json()
+
+ async def create_page(self, movie: dict[str, Any]) -> tuple[str, str]:
+ """Create page from TMDB movie request result. Return formatted page + image."""
+ text = ""
+
+ # Add title + tagline (if not empty)
+ text += f"**{movie['title']}**\n"
+ if movie["tagline"]:
+ text += f"{movie['tagline']}\n\n"
+ else:
+ text += "\n"
+
+ # Add other information
+ text += f"**Rating:** {movie['vote_average']}/10 :star:\n"
+ text += f"**Release Date:** {movie['release_date']}\n\n"
+
+ text += "__**Production Information**__\n"
+
+ companies = movie["production_companies"]
+ countries = movie["production_countries"]
+
+ text += f"**Made by:** {', '.join(company['name'] for company in companies)}\n"
+ text += f"**Made in:** {', '.join(country['name'] for country in countries)}\n\n"
+
+ text += "__**Some Numbers**__\n"
+
+ budget = f"{movie['budget']:,d}" if movie['budget'] else "?"
+ revenue = f"{movie['revenue']:,d}" if movie['revenue'] else "?"
+
+ if movie["runtime"] is not None:
+ duration = divmod(movie["runtime"], 60)
+ else:
+ duration = ("?", "?")
+
+ text += f"**Budget:** ${budget}\n"
+ text += f"**Revenue:** ${revenue}\n"
+ text += f"**Duration:** {f'{duration[0]} hour(s) {duration[1]} minute(s)'}\n\n"
+
+ text += movie["overview"]
+
+ img = f"http://image.tmdb.org/t/p/w200{movie['poster_path']}"
+
+ # Return page content and image
+ return text, img
+
+ async def get_embed(self, name: str) -> Embed:
+ """Return embed of random movies. Uses name in title."""
+ embed = Embed(title=f"Random {name} Movies")
+ embed.set_footer(text="This product uses the TMDb API but is not endorsed or certified by TMDb.")
+ embed.set_thumbnail(url="https://i.imgur.com/LtFtC8H.png")
+ return embed
+
+
+def setup(bot: Bot) -> None:
+ """Load the Movie Cog."""
+ bot.add_cog(Movie(bot))
diff --git a/bot/exts/fun/recommend_game.py b/bot/exts/fun/recommend_game.py
new file mode 100644
index 00000000..42c9f7c2
--- /dev/null
+++ b/bot/exts/fun/recommend_game.py
@@ -0,0 +1,51 @@
+import json
+import logging
+from pathlib import Path
+from random import shuffle
+
+import discord
+from discord.ext import commands
+
+from bot.bot import Bot
+
+log = logging.getLogger(__name__)
+game_recs = []
+
+# Populate the list `game_recs` with resource files
+for rec_path in Path("bot/resources/fun/game_recs").glob("*.json"):
+ data = json.loads(rec_path.read_text("utf8"))
+ game_recs.append(data)
+shuffle(game_recs)
+
+
+class RecommendGame(commands.Cog):
+ """Commands related to recommending games."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+ self.index = 0
+
+ @commands.command(name="recommendgame", aliases=("gamerec",))
+ async def recommend_game(self, ctx: commands.Context) -> None:
+ """Sends an Embed of a random game recommendation."""
+ if self.index >= len(game_recs):
+ self.index = 0
+ shuffle(game_recs)
+ game = game_recs[self.index]
+ self.index += 1
+
+ author = self.bot.get_user(int(game["author"]))
+
+ # Creating and formatting Embed
+ embed = discord.Embed(color=discord.Colour.blue())
+ if author is not None:
+ embed.set_author(name=author.name, icon_url=author.display_avatar.url)
+ embed.set_image(url=game["image"])
+ embed.add_field(name=f"Recommendation: {game['title']}\n{game['link']}", value=game["description"])
+
+ await ctx.send(embed=embed)
+
+
+def setup(bot: Bot) -> None:
+ """Loads the RecommendGame cog."""
+ bot.add_cog(RecommendGame(bot))
diff --git a/bot/exts/fun/rps.py b/bot/exts/fun/rps.py
new file mode 100644
index 00000000..c6bbff46
--- /dev/null
+++ b/bot/exts/fun/rps.py
@@ -0,0 +1,57 @@
+from random import choice
+
+from discord.ext import commands
+
+from bot.bot import Bot
+
+CHOICES = ["rock", "paper", "scissors"]
+SHORT_CHOICES = ["r", "p", "s"]
+
+# Using a dictionary instead of conditions to check for the winner.
+WINNER_DICT = {
+ "r": {
+ "r": 0,
+ "p": -1,
+ "s": 1,
+ },
+ "p": {
+ "r": 1,
+ "p": 0,
+ "s": -1,
+ },
+ "s": {
+ "r": -1,
+ "p": 1,
+ "s": 0,
+ }
+}
+
+
+class RPS(commands.Cog):
+ """Rock Paper Scissors. The Classic Game!"""
+
+ @commands.command(case_insensitive=True)
+ async def rps(self, ctx: commands.Context, move: str) -> None:
+ """Play the classic game of Rock Paper Scissors with your own sir-lancebot!"""
+ move = move.lower()
+ player_mention = ctx.author.mention
+
+ if move not in CHOICES and move not in SHORT_CHOICES:
+ raise commands.BadArgument(f"Invalid move. Please make move from options: {', '.join(CHOICES).upper()}.")
+
+ bot_move = choice(CHOICES)
+ # value of player_result will be from (-1, 0, 1) as (lost, tied, won).
+ player_result = WINNER_DICT[move[0]][bot_move[0]]
+
+ if player_result == 0:
+ message_string = f"{player_mention} You and Sir Lancebot played {bot_move}, it's a tie."
+ await ctx.send(message_string)
+ elif player_result == 1:
+ await ctx.send(f"Sir Lancebot played {bot_move}! {player_mention} won!")
+ else:
+ await ctx.send(f"Sir Lancebot played {bot_move}! {player_mention} lost!")
+
+
+def setup(bot: Bot) -> None:
+ """Load the RPS Cog."""
+ bot.add_cog(RPS(bot))
diff --git a/bot/exts/fun/snakes/__init__.py b/bot/exts/fun/snakes/__init__.py
new file mode 100644
index 00000000..ba8333fd
--- /dev/null
+++ b/bot/exts/fun/snakes/__init__.py
@@ -0,0 +1,11 @@
+import logging
+
+from bot.bot import Bot
+from bot.exts.fun.snakes._snakes_cog import Snakes
+
+log = logging.getLogger(__name__)
+
+
+def setup(bot: Bot) -> None:
+ """Load the Snakes Cog."""
+ bot.add_cog(Snakes(bot))
diff --git a/bot/exts/fun/snakes/_converter.py b/bot/exts/fun/snakes/_converter.py
new file mode 100644
index 00000000..c24ba8c6
--- /dev/null
+++ b/bot/exts/fun/snakes/_converter.py
@@ -0,0 +1,82 @@
+import json
+import logging
+import random
+from collections.abc import Iterable
+
+import discord
+from discord.ext.commands import Context, Converter
+from rapidfuzz import fuzz
+
+from bot.exts.fun.snakes._utils import SNAKE_RESOURCES
+from bot.utils import disambiguate
+
+log = logging.getLogger(__name__)
+
+
+class Snake(Converter):
+ """Snake converter for the Snakes Cog."""
+
+ snakes = None
+ special_cases = None
+
+ async def convert(self, ctx: Context, name: str) -> str:
+ """Convert the input snake name to the closest matching Snake object."""
+ await self.build_list()
+ name = name.lower()
+
+ if name == "python":
+ return "Python (programming language)"
+
+ def get_potential(iterable: Iterable, *, threshold: int = 80) -> list[str]:
+ nonlocal name
+ potential = []
+
+ for item in iterable:
+ original, item = item, item.lower()
+
+ if name == item:
+ return [original]
+
+ a, b = fuzz.ratio(name, item), fuzz.partial_ratio(name, item)
+ if a >= threshold or b >= threshold:
+ potential.append(original)
+
+ return potential
+
+ # Handle special cases
+ if name.lower() in self.special_cases:
+ return self.special_cases.get(name.lower(), name.lower())
+
+ names = {snake["name"]: snake["scientific"] for snake in self.snakes}
+ all_names = names.keys() | names.values()
+ timeout = len(all_names) * (3 / 4)
+
+ embed = discord.Embed(
+ title="Found multiple choices. Please choose the correct one.", colour=0x59982F)
+ embed.set_author(name=ctx.author.display_name, icon_url=ctx.author.display_avatar.url)
+
+ name = await disambiguate(ctx, get_potential(all_names), timeout=timeout, embed=embed)
+ return names.get(name, name)
+
+ @classmethod
+ async def build_list(cls) -> None:
+ """Build list of snakes from the static snake resources."""
+ # Get all the snakes
+ if cls.snakes is None:
+ cls.snakes = json.loads((SNAKE_RESOURCES / "snake_names.json").read_text("utf8"))
+ # Get the special cases
+ if cls.special_cases is None:
+ special_cases = json.loads((SNAKE_RESOURCES / "special_snakes.json").read_text("utf8"))
+ cls.special_cases = {snake["name"].lower(): snake for snake in special_cases}
+
+ @classmethod
+ async def random(cls) -> str:
+ """
+ Get a random Snake from the loaded resources.
+
+ This is stupid. We should find a way to somehow get the global session into a global context,
+ so I can get it from here.
+ """
+ await cls.build_list()
+ names = [snake["scientific"] for snake in cls.snakes]
+ return random.choice(names)
diff --git a/bot/exts/fun/snakes/_snakes_cog.py b/bot/exts/fun/snakes/_snakes_cog.py
new file mode 100644
index 00000000..59e57199
--- /dev/null
+++ b/bot/exts/fun/snakes/_snakes_cog.py
@@ -0,0 +1,1151 @@
+import asyncio
+import colorsys
+import logging
+import os
+import random
+import re
+import string
+import textwrap
+import urllib
+from functools import partial
+from io import BytesIO
+from typing import Any, Optional
+
+import async_timeout
+from PIL import Image, ImageDraw, ImageFont
+from discord import Colour, Embed, File, Member, Message, Reaction
+from discord.errors import HTTPException
+from discord.ext.commands import Cog, CommandError, Context, bot_has_permissions, group
+
+from bot.bot import Bot
+from bot.constants import ERROR_REPLIES, Tokens
+from bot.exts.fun.snakes import _utils as utils
+from bot.exts.fun.snakes._converter import Snake
+from bot.utils.decorators import locked
+from bot.utils.extensions import invoke_help_command
+
+log = logging.getLogger(__name__)
+
+
+# region: Constants
+# Color
+SNAKE_COLOR = 0x399600
+
+# Antidote constants
+SYRINGE_EMOJI = "\U0001F489" # :syringe:
+PILL_EMOJI = "\U0001F48A" # :pill:
+HOURGLASS_EMOJI = "\u231B" # :hourglass:
+CROSSBONES_EMOJI = "\u2620" # :skull_crossbones:
+ALEMBIC_EMOJI = "\u2697" # :alembic:
+TICK_EMOJI = "\u2705" # :white_check_mark: - Correct peg, correct hole
+CROSS_EMOJI = "\u274C" # :x: - Wrong peg, wrong hole
+BLANK_EMOJI = "\u26AA" # :white_circle: - Correct peg, wrong hole
+HOLE_EMOJI = "\u2B1C" # :white_square: - Used in guesses
+EMPTY_UNICODE = "\u200b" # literally just an empty space
+
+ANTIDOTE_EMOJI = (
+ SYRINGE_EMOJI,
+ PILL_EMOJI,
+ HOURGLASS_EMOJI,
+ CROSSBONES_EMOJI,
+ ALEMBIC_EMOJI,
+)
+
+# Quiz constants
+ANSWERS_EMOJI = {
+ "a": "\U0001F1E6", # :regional_indicator_a: 🇦
+ "b": "\U0001F1E7", # :regional_indicator_b: 🇧
+ "c": "\U0001F1E8", # :regional_indicator_c: 🇨
+ "d": "\U0001F1E9", # :regional_indicator_d: 🇩
+}
+
+ANSWERS_EMOJI_REVERSE = {
+ "\U0001F1E6": "A", # :regional_indicator_a: 🇦
+ "\U0001F1E7": "B", # :regional_indicator_b: 🇧
+ "\U0001F1E8": "C", # :regional_indicator_c: 🇨
+ "\U0001F1E9": "D", # :regional_indicator_d: 🇩
+}
+
+# Zzzen of pythhhon constant
+ZEN = """
+Beautiful is better than ugly.
+Explicit is better than implicit.
+Simple is better than complex.
+Complex is better than complicated.
+Flat is better than nested.
+Sparse is better than dense.
+Readability counts.
+Special cases aren't special enough to break the rules.
+Although practicality beats purity.
+Errors should never pass silently.
+Unless explicitly silenced.
+In the face of ambiguity, refuse the temptation to guess.
+There should be one-- and preferably only one --obvious way to do it.
+Now is better than never.
+Although never is often better than *right* now.
+If the implementation is hard to explain, it's a bad idea.
+If the implementation is easy to explain, it may be a good idea.
+"""
+
+# Max messages to train snake_chat on
+MSG_MAX = 100
+
+# get_snek constants
+URL = "https://en.wikipedia.org/w/api.php?"
+
+# snake guess responses
+INCORRECT_GUESS = (
+ "Nope, that's not what it is.",
+ "Not quite.",
+ "Not even close.",
+ "Terrible guess.",
+ "Nnnno.",
+ "Dude. No.",
+ "I thought everyone knew this one.",
+ "Guess you suck at snakes.",
+ "Bet you feel stupid now.",
+ "Hahahaha, no.",
+ "Did you hit the wrong key?"
+)
+
+CORRECT_GUESS = (
+ "**WRONG**. Wait, no, actually you're right.",
+ "Yeah, you got it!",
+ "Yep, that's exactly what it is.",
+ "Uh-huh. Yep yep yep.",
+ "Yeah that's right.",
+ "Yup. How did you know that?",
+ "Are you a herpetologist?",
+ "Sure, okay, but I bet you can't pronounce it.",
+ "Are you cheating?"
+)
+
+# snake card consts
+CARD = {
+ "top": Image.open("bot/resources/fun/snakes/snake_cards/card_top.png"),
+ "frame": Image.open("bot/resources/fun/snakes/snake_cards/card_frame.png"),
+ "bottom": Image.open("bot/resources/fun/snakes/snake_cards/card_bottom.png"),
+ "backs": [
+ Image.open(f"bot/resources/fun/snakes/snake_cards/backs/{file}")
+ for file in os.listdir("bot/resources/fun/snakes/snake_cards/backs")
+ ],
+ "font": ImageFont.truetype("bot/resources/fun/snakes/snake_cards/expressway.ttf", 20)
+}
+# endregion
+
+
+class Snakes(Cog):
+ """
+ Commands related to snakes, created by our community during the first code jam.
+
+ More information can be found in the code-jam-1 repo.
+
+ https://github.com/python-discord/code-jam-1
+ """
+
+ wiki_brief = re.compile(r"(.*?)(=+ (.*?) =+)", flags=re.DOTALL)
+ valid_image_extensions = ("gif", "png", "jpeg", "jpg", "webp")
+
+ def __init__(self, bot: Bot):
+ self.active_sal = {}
+ self.bot = bot
+ self.snake_names = utils.get_resource("snake_names")
+ self.snake_idioms = utils.get_resource("snake_idioms")
+ self.snake_quizzes = utils.get_resource("snake_quiz")
+ self.snake_facts = utils.get_resource("snake_facts")
+ self.num_movie_pages = None
+
+ # region: Helper methods
+ @staticmethod
+ def _beautiful_pastel(hue: float) -> int:
+ """Returns random bright pastels."""
+ light = random.uniform(0.7, 0.85)
+ saturation = 1
+
+ rgb = colorsys.hls_to_rgb(hue, light, saturation)
+ hex_rgb = ""
+
+ for part in rgb:
+ value = int(part * 0xFF)
+ hex_rgb += f"{value:02x}"
+
+ return int(hex_rgb, 16)
+
+ @staticmethod
+ def _generate_card(buffer: BytesIO, content: dict) -> BytesIO:
+ """
+ Generate a card from snake information.
+
+ Written by juan and Someone during the first code jam.
+ """
+ snake = Image.open(buffer)
+
+ # Get the size of the snake icon, configure the height of the image box (yes, it changes)
+ icon_width = 347 # Hardcoded, not much i can do about that
+ icon_height = int((icon_width / snake.width) * snake.height)
+ frame_copies = icon_height // CARD["frame"].height + 1
+ snake.thumbnail((icon_width, icon_height))
+
+ # Get the dimensions of the final image
+ main_height = icon_height + CARD["top"].height + CARD["bottom"].height
+ main_width = CARD["frame"].width
+
+ # Start creating the foreground
+ foreground = Image.new("RGBA", (main_width, main_height), (0, 0, 0, 0))
+ foreground.paste(CARD["top"], (0, 0))
+
+ # Generate the frame borders to the correct height
+ for offset in range(frame_copies):
+ position = (0, CARD["top"].height + offset * CARD["frame"].height)
+ foreground.paste(CARD["frame"], position)
+
+ # Add the image and bottom part of the image
+ foreground.paste(snake, (36, CARD["top"].height)) # Also hardcoded :(
+ foreground.paste(CARD["bottom"], (0, CARD["top"].height + icon_height))
+
+ # Setup the background
+ back = random.choice(CARD["backs"])
+ back_copies = main_height // back.height + 1
+ full_image = Image.new("RGBA", (main_width, main_height), (0, 0, 0, 0))
+
+ # Generate the tiled background
+ for offset in range(back_copies):
+ full_image.paste(back, (16, 16 + offset * back.height))
+
+ # Place the foreground onto the final image
+ full_image.paste(foreground, (0, 0), foreground)
+
+ # Get the first two sentences of the info
+ description = ".".join(content["info"].split(".")[:2]) + "."
+
+ # Setup positioning variables
+ margin = 36
+ offset = CARD["top"].height + icon_height + margin
+
+ # Create blank rectangle image which will be behind the text
+ rectangle = Image.new(
+ "RGBA",
+ (main_width, main_height),
+ (0, 0, 0, 0)
+ )
+
+ # Draw a semi-transparent rectangle on it
+ rect = ImageDraw.Draw(rectangle)
+ rect.rectangle(
+ (margin, offset, main_width - margin, main_height - margin),
+ fill=(63, 63, 63, 128)
+ )
+
+ # Paste it onto the final image
+ full_image.paste(rectangle, (0, 0), mask=rectangle)
+
+ # Draw the text onto the final image
+ draw = ImageDraw.Draw(full_image)
+ for line in textwrap.wrap(description, 36):
+ draw.text([margin + 4, offset], line, font=CARD["font"])
+ offset += CARD["font"].getsize(line)[1]
+
+ # Get the image contents as a BufferIO object
+ buffer = BytesIO()
+ full_image.save(buffer, "PNG")
+ buffer.seek(0)
+
+ return buffer
+
+ @staticmethod
+ def _snakify(message: str) -> str:
+ """Sssnakifffiesss a sstring."""
+ # Replace fricatives with exaggerated snake fricatives.
+ simple_fricatives = [
+ "f", "s", "z", "h",
+ "F", "S", "Z", "H",
+ ]
+ complex_fricatives = [
+ "th", "sh", "Th", "Sh"
+ ]
+
+ for letter in simple_fricatives:
+ if letter.islower():
+ message = message.replace(letter, letter * random.randint(2, 4))
+ else:
+ message = message.replace(letter, (letter * random.randint(2, 4)).title())
+
+ for fricative in complex_fricatives:
+ message = message.replace(fricative, fricative[0] + fricative[1] * random.randint(2, 4))
+
+ return message
+
+ async def _fetch(self, url: str, params: Optional[dict] = None) -> dict:
+ """Asynchronous web request helper method."""
+ if params is None:
+ params = {}
+
+ async with async_timeout.timeout(10):
+ async with self.bot.http_session.get(url, params=params) as response:
+ return await response.json()
+
+ def _get_random_long_message(self, messages: list[str], retries: int = 10) -> str:
+ """
+ Fetch a message that's at least 3 words long, if possible to do so in retries attempts.
+
+ Else, just return whatever the last message is.
+ """
+ long_message = random.choice(messages)
+ if len(long_message.split()) < 3 and retries > 0:
+ return self._get_random_long_message(
+ messages,
+ retries=retries - 1
+ )
+
+ return long_message
+
+ async def _get_snek(self, name: str) -> dict[str, Any]:
+ """
+ Fetches all the data from a wikipedia article about a snake.
+
+ Builds a dict that the .get() method can use.
+
+ Created by Ava and eivl.
+ """
+ snake_info = {}
+
+ params = {
+ "format": "json",
+ "action": "query",
+ "list": "search",
+ "srsearch": name,
+ "utf8": "",
+ "srlimit": "1",
+ }
+
+ json = await self._fetch(URL, params=params)
+
+ # Wikipedia does have a error page
+ try:
+ pageid = json["query"]["search"][0]["pageid"]
+ except KeyError:
+ # Wikipedia error page ID(?)
+ pageid = 41118
+ except IndexError:
+ return None
+
+ params = {
+ "format": "json",
+ "action": "query",
+ "prop": "extracts|images|info",
+ "exlimit": "max",
+ "explaintext": "",
+ "inprop": "url",
+ "pageids": pageid
+ }
+
+ json = await self._fetch(URL, params=params)
+
+ # Constructing dict - handle exceptions later
+ try:
+ snake_info["title"] = json["query"]["pages"][f"{pageid}"]["title"]
+ snake_info["extract"] = json["query"]["pages"][f"{pageid}"]["extract"]
+ snake_info["images"] = json["query"]["pages"][f"{pageid}"]["images"]
+ snake_info["fullurl"] = json["query"]["pages"][f"{pageid}"]["fullurl"]
+ snake_info["pageid"] = json["query"]["pages"][f"{pageid}"]["pageid"]
+ except KeyError:
+ snake_info["error"] = True
+
+ if snake_info["images"]:
+ i_url = "https://commons.wikimedia.org/wiki/Special:FilePath/"
+ image_list = []
+ map_list = []
+ thumb_list = []
+
+ # Wikipedia has arbitrary images that are not snakes
+ banned = [
+ "Commons-logo.svg",
+ "Red%20Pencil%20Icon.png",
+ "distribution",
+ "The%20Death%20of%20Cleopatra%20arthur.jpg",
+ "Head%20of%20holotype",
+ "locator",
+ "Woma.png",
+ "-map.",
+ ".svg",
+ "ange.",
+ "Adder%20(PSF).png"
+ ]
+
+ for image in snake_info["images"]:
+ # Images come in the format of `File:filename.extension`
+ file, sep, filename = image["title"].partition(":")
+ filename = filename.replace(" ", "%20") # Wikipedia returns good data!
+
+ if not filename.startswith("Map"):
+ if any(ban in filename for ban in banned):
+ pass
+ else:
+ image_list.append(f"{i_url}{filename}")
+ thumb_list.append(f"{i_url}{filename}?width=100")
+ else:
+ map_list.append(f"{i_url}{filename}")
+
+ snake_info["image_list"] = image_list
+ snake_info["map_list"] = map_list
+ snake_info["thumb_list"] = thumb_list
+ snake_info["name"] = name
+
+ match = self.wiki_brief.match(snake_info["extract"])
+ info = match.group(1) if match else None
+
+ if info:
+ info = info.replace("\n", "\n\n") # Give us some proper paragraphs.
+
+ snake_info["info"] = info
+
+ return snake_info
+
+ async def _get_snake_name(self) -> dict[str, str]:
+ """Gets a random snake name."""
+ return random.choice(self.snake_names)
+
+ async def _validate_answer(self, ctx: Context, message: Message, answer: str, options: dict[str, str]) -> None:
+ """Validate the answer using a reaction event loop."""
+ def predicate(reaction: Reaction, user: Member) -> bool:
+ """Test if the the answer is valid and can be evaluated."""
+ return (
+ reaction.message.id == message.id # The reaction is attached to the question we asked.
+ and user == ctx.author # It's the user who triggered the quiz.
+ and str(reaction.emoji) in ANSWERS_EMOJI.values() # The reaction is one of the options.
+ )
+
+ for emoji in ANSWERS_EMOJI.values():
+ await message.add_reaction(emoji)
+
+ # Validate the answer
+ try:
+ reaction, user = await ctx.bot.wait_for("reaction_add", timeout=45.0, check=predicate)
+ except asyncio.TimeoutError:
+ await ctx.send(f"You took too long. The correct answer was **{options[answer]}**.")
+ await message.clear_reactions()
+ return
+
+ if str(reaction.emoji) == ANSWERS_EMOJI[answer]:
+ await ctx.send(f"{random.choice(CORRECT_GUESS)} The correct answer was **{options[answer]}**.")
+ else:
+ await ctx.send(
+ f"{random.choice(INCORRECT_GUESS)} The correct answer was **{options[answer]}**."
+ )
+
+ await message.clear_reactions()
+ # endregion
+
+ # region: Commands
+ @group(name="snakes", aliases=("snake",), invoke_without_command=True)
+ async def snakes_group(self, ctx: Context) -> None:
+ """Commands from our first code jam."""
+ await invoke_help_command(ctx)
+
+ @bot_has_permissions(manage_messages=True)
+ @snakes_group.command(name="antidote")
+ @locked()
+ async def antidote_command(self, ctx: Context) -> None:
+ """
+ Antidote! Can you create the antivenom before the patient dies?
+
+ Rules: You have 4 ingredients for each antidote, you only have 10 attempts
+ Once you synthesize the antidote, you will be presented with 4 markers
+ Tick: This means you have a CORRECT ingredient in the CORRECT position
+ Circle: This means you have a CORRECT ingredient in the WRONG position
+ Cross: This means you have a WRONG ingredient in the WRONG position
+
+ Info: The game automatically ends after 5 minutes inactivity.
+ You should only use each ingredient once.
+
+ This game was created by Lord Bisk and Runew0lf.
+ """
+ def predicate(reaction_: Reaction, user_: Member) -> bool:
+ """Make sure that this reaction is what we want to operate on."""
+ return (
+ all((
+ # Reaction is on this message
+ reaction_.message.id == board_id.id,
+ # Reaction is one of the pagination emotes
+ reaction_.emoji in ANTIDOTE_EMOJI,
+ # Reaction was not made by the Bot
+ user_.id != self.bot.user.id,
+ # Reaction was made by author
+ user_.id == ctx.author.id
+ ))
+ )
+
+ # Initialize variables
+ antidote_tries = 0
+ antidote_guess_count = 0
+ antidote_guess_list = []
+ guess_result = []
+ board = []
+ page_guess_list = []
+ page_result_list = []
+ win = False
+
+ antidote_embed = Embed(color=SNAKE_COLOR, title="Antidote")
+ antidote_embed.set_author(name=ctx.author.name, icon_url=ctx.author.display_avatar.url)
+
+ # Generate answer
+ antidote_answer = list(ANTIDOTE_EMOJI) # Duplicate list, not reference it
+ random.shuffle(antidote_answer)
+ antidote_answer.pop()
+
+ # Begin initial board building
+ for i in range(0, 10):
+ page_guess_list.append(f"{HOLE_EMOJI} {HOLE_EMOJI} {HOLE_EMOJI} {HOLE_EMOJI}")
+ page_result_list.append(f"{CROSS_EMOJI} {CROSS_EMOJI} {CROSS_EMOJI} {CROSS_EMOJI}")
+ board.append(
+ f"`{i+1:02d}` "
+ f"{page_guess_list[i]} - "
+ f"{page_result_list[i]}"
+ )
+ board.append(EMPTY_UNICODE)
+ antidote_embed.add_field(name="10 guesses remaining", value="\n".join(board))
+ board_id = await ctx.send(embed=antidote_embed) # Display board
+
+ # Add our player reactions
+ for emoji in ANTIDOTE_EMOJI:
+ await board_id.add_reaction(emoji)
+
+ # Begin main game loop
+ while not win and antidote_tries < 10:
+ try:
+ reaction, user = await ctx.bot.wait_for(
+ "reaction_add", timeout=300, check=predicate)
+ except asyncio.TimeoutError:
+ log.debug("Antidote timed out waiting for a reaction")
+ break # We're done, no reactions for the last 5 minutes
+
+ if antidote_tries < 10:
+ if antidote_guess_count < 4:
+ if reaction.emoji in ANTIDOTE_EMOJI:
+ antidote_guess_list.append(reaction.emoji)
+ antidote_guess_count += 1
+
+ if antidote_guess_count == 4: # Guesses complete
+ antidote_guess_count = 0
+ page_guess_list[antidote_tries] = " ".join(antidote_guess_list)
+
+ # Now check guess
+ for i in range(0, len(antidote_answer)):
+ if antidote_guess_list[i] == antidote_answer[i]:
+ guess_result.append(TICK_EMOJI)
+ elif antidote_guess_list[i] in antidote_answer:
+ guess_result.append(BLANK_EMOJI)
+ else:
+ guess_result.append(CROSS_EMOJI)
+ guess_result.sort()
+ page_result_list[antidote_tries] = " ".join(guess_result)
+
+ # Rebuild the board
+ board = []
+ for i in range(0, 10):
+ board.append(f"`{i+1:02d}` "
+ f"{page_guess_list[i]} - "
+ f"{page_result_list[i]}")
+ board.append(EMPTY_UNICODE)
+
+ # Remove Reactions
+ for emoji in antidote_guess_list:
+ await board_id.remove_reaction(emoji, user)
+
+ if antidote_guess_list == antidote_answer:
+ win = True
+
+ antidote_tries += 1
+ guess_result = []
+ antidote_guess_list = []
+
+ antidote_embed.clear_fields()
+ antidote_embed.add_field(name=f"{10 - antidote_tries} "
+ f"guesses remaining",
+ value="\n".join(board))
+ # Redisplay the board
+ await board_id.edit(embed=antidote_embed)
+
+ # Winning / Ending Screen
+ if win is True:
+ antidote_embed = Embed(color=SNAKE_COLOR, title="Antidote")
+ antidote_embed.set_author(name=ctx.author.name, icon_url=ctx.author.display_avatar.url)
+ antidote_embed.set_image(url="https://i.makeagif.com/media/7-12-2015/Cj1pts.gif")
+ antidote_embed.add_field(name="You have created the snake antidote!",
+ value=f"The solution was: {' '.join(antidote_answer)}\n"
+ f"You had {10 - antidote_tries} tries remaining.")
+ await board_id.edit(embed=antidote_embed)
+ else:
+ antidote_embed = Embed(color=SNAKE_COLOR, title="Antidote")
+ antidote_embed.set_author(name=ctx.author.name, icon_url=ctx.author.display_avatar.url)
+ antidote_embed.set_image(url="https://media.giphy.com/media/ceeN6U57leAhi/giphy.gif")
+ antidote_embed.add_field(
+ name=EMPTY_UNICODE,
+ value=(
+ f"Sorry you didnt make the antidote in time.\n"
+ f"The formula was {' '.join(antidote_answer)}"
+ )
+ )
+ await board_id.edit(embed=antidote_embed)
+
+ log.debug("Ending pagination and removing all reactions...")
+ await board_id.clear_reactions()
+
+ @snakes_group.command(name="draw")
+ async def draw_command(self, ctx: Context) -> None:
+ """
+ Draws a random snek using Perlin noise.
+
+ Written by Momo and kel.
+ Modified by juan and lemon.
+ """
+ with ctx.typing():
+
+ # Generate random snake attributes
+ width = random.randint(6, 10)
+ length = random.randint(15, 22)
+ random_hue = random.random()
+ snek_color = self._beautiful_pastel(random_hue)
+ text_color = self._beautiful_pastel((random_hue + 0.5) % 1)
+ bg_color = (
+ random.randint(32, 50),
+ random.randint(32, 50),
+ random.randint(50, 70),
+ )
+
+ # Build and send the snek
+ text = random.choice(self.snake_idioms)["idiom"]
+ factory = utils.PerlinNoiseFactory(dimension=1, octaves=2)
+ image_frame = utils.create_snek_frame(
+ factory,
+ snake_width=width,
+ snake_length=length,
+ snake_color=snek_color,
+ text=text,
+ text_color=text_color,
+ bg_color=bg_color
+ )
+ png_bytes = utils.frame_to_png_bytes(image_frame)
+ file = File(png_bytes, filename="snek.png")
+ await ctx.send(file=file)
+
+ @snakes_group.command(name="get")
+ @bot_has_permissions(manage_messages=True)
+ @locked()
+ async def get_command(self, ctx: Context, *, name: Snake = None) -> None:
+ """
+ Fetches information about a snake from Wikipedia.
+
+ Created by Ava and eivl.
+ """
+ with ctx.typing():
+ if name is None:
+ name = await Snake.random()
+
+ if isinstance(name, dict):
+ data = name
+ else:
+ data = await self._get_snek(name)
+
+ if data.get("error"):
+ await ctx.send("Could not fetch data from Wikipedia.")
+ return
+
+ description = data["info"]
+
+ # Shorten the description if needed
+ if len(description) > 1000:
+ description = description[:1000]
+ last_newline = description.rfind("\n")
+ if last_newline > 0:
+ description = description[:last_newline]
+
+ # Strip and add the Wiki link.
+ if "fullurl" in data:
+ description = description.strip("\n")
+ description += f"\n\nRead more on [Wikipedia]({data['fullurl']})"
+
+ # Build and send the embed.
+ embed = Embed(
+ title=data.get("title", data.get("name")),
+ description=description,
+ colour=0x59982F,
+ )
+
+ emoji = "https://emojipedia-us.s3.amazonaws.com/thumbs/60/google/3/snake_1f40d.png"
+
+ _iter = (
+ url
+ for url in data["image_list"]
+ if url.endswith(self.valid_image_extensions)
+ )
+ image = next(_iter, emoji)
+
+ embed.set_image(url=image)
+
+ await ctx.send(embed=embed)
+
+ @snakes_group.command(name="guess", aliases=("identify",))
+ @locked()
+ async def guess_command(self, ctx: Context) -> None:
+ """
+ Snake identifying game.
+
+ Made by Ava and eivl.
+ Modified by lemon.
+ """
+ with ctx.typing():
+
+ image = None
+
+ while image is None:
+ snakes = [await Snake.random() for _ in range(4)]
+ snake = random.choice(snakes)
+ answer = "abcd"[snakes.index(snake)]
+
+ data = await self._get_snek(snake)
+
+ _iter = (
+ url
+ for url in data["image_list"]
+ if url.endswith(self.valid_image_extensions)
+ )
+ image = next(_iter, None)
+
+ embed = Embed(
+ title="Which of the following is the snake in the image?",
+ description="\n".join(
+ f"{'ABCD'[snakes.index(snake)]}: {snake}" for snake in snakes),
+ colour=SNAKE_COLOR
+ )
+ embed.set_image(url=image)
+
+ guess = await ctx.send(embed=embed)
+ options = {f"{'abcd'[snakes.index(snake)]}": snake for snake in snakes}
+ await self._validate_answer(ctx, guess, answer, options)
+
+ @snakes_group.command(name="hatch")
+ async def hatch_command(self, ctx: Context) -> None:
+ """
+ Hatches your personal snake.
+
+ Written by Momo and kel.
+ """
+ # Pick a random snake to hatch.
+ snake_name = random.choice(list(utils.snakes.keys()))
+ snake_image = utils.snakes[snake_name]
+
+ # Hatch the snake
+ message = await ctx.send(embed=Embed(description="Hatching your snake :snake:..."))
+ await asyncio.sleep(1)
+
+ for stage in utils.stages:
+ hatch_embed = Embed(description=stage)
+ await message.edit(embed=hatch_embed)
+ await asyncio.sleep(1)
+ await asyncio.sleep(1)
+ await message.delete()
+
+ # Build and send the embed.
+ my_snake_embed = Embed(description=":tada: Congrats! You hatched: **{0}**".format(snake_name))
+ my_snake_embed.set_thumbnail(url=snake_image)
+ my_snake_embed.set_footer(
+ text=" Owner: {0}#{1}".format(ctx.author.name, ctx.author.discriminator)
+ )
+
+ await ctx.send(embed=my_snake_embed)
+
+ @snakes_group.command(name="movie")
+ async def movie_command(self, ctx: Context) -> None:
+ """
+ Gets a random snake-related movie from TMDB.
+
+ Written by Samuel.
+ Modified by gdude.
+ Modified by Will Da Silva.
+ """
+ # Initially 8 pages are fetched. The actual number of pages is set after the first request.
+ page = random.randint(1, self.num_movie_pages or 8)
+
+ async with ctx.typing():
+ response = await self.bot.http_session.get(
+ "https://api.themoviedb.org/3/search/movie",
+ params={
+ "query": "snake",
+ "page": page,
+ "language": "en-US",
+ "api_key": Tokens.tmdb,
+ }
+ )
+ data = await response.json()
+ if self.num_movie_pages is None:
+ self.num_movie_pages = data["total_pages"]
+ movie = random.choice(data["results"])["id"]
+
+ response = await self.bot.http_session.get(
+ f"https://api.themoviedb.org/3/movie/{movie}",
+ params={
+ "language": "en-US",
+ "api_key": Tokens.tmdb,
+ }
+ )
+ data = await response.json()
+
+ embed = Embed(title=data["title"], color=SNAKE_COLOR)
+
+ if data["poster_path"] is not None:
+ embed.set_image(url=f"https://images.tmdb.org/t/p/original{data['poster_path']}")
+
+ if data["overview"]:
+ embed.add_field(name="Overview", value=data["overview"])
+
+ if data["release_date"]:
+ embed.add_field(name="Release Date", value=data["release_date"])
+
+ if data["genres"]:
+ embed.add_field(name="Genres", value=", ".join([x["name"] for x in data["genres"]]))
+
+ if data["vote_count"]:
+ embed.add_field(name="Rating", value=f"{data['vote_average']}/10 ({data['vote_count']} votes)", inline=True)
+
+ if data["budget"] and data["revenue"]:
+ embed.add_field(name="Budget", value=data["budget"], inline=True)
+ embed.add_field(name="Revenue", value=data["revenue"], inline=True)
+
+ embed.set_footer(text="This product uses the TMDb API but is not endorsed or certified by TMDb.")
+ embed.set_thumbnail(url="https://i.imgur.com/LtFtC8H.png")
+
+ try:
+ await ctx.send(embed=embed)
+ except HTTPException as err:
+ await ctx.send("An error occurred while fetching a snake-related movie!")
+ raise err from None
+
+ @snakes_group.command(name="quiz")
+ @locked()
+ async def quiz_command(self, ctx: Context) -> None:
+ """
+ Asks a snake-related question in the chat and validates the user's guess.
+
+ This was created by Mushy and Cardium,
+ and modified by Urthas and lemon.
+ """
+ # Prepare a question.
+ question = random.choice(self.snake_quizzes)
+ answer = question["answerkey"]
+ options = {key: question["options"][key] for key in ANSWERS_EMOJI.keys()}
+
+ # Build and send the embed.
+ embed = Embed(
+ color=SNAKE_COLOR,
+ title=question["question"],
+ description="\n".join(
+ [f"**{key.upper()}**: {answer}" for key, answer in options.items()]
+ )
+ )
+
+ quiz = await ctx.send(embed=embed)
+ await self._validate_answer(ctx, quiz, answer, options)
+
+ @snakes_group.command(name="name", aliases=("name_gen",))
+ async def name_command(self, ctx: Context, *, name: str = None) -> None:
+ """
+ Snakifies a username.
+
+ Slices the users name at the last vowel (or second last if the name
+ ends with a vowel), and then combines it with a random snake name,
+ which is sliced at the first vowel (or second if the name starts with
+ a vowel).
+
+ If the name contains no vowels, it just appends the snakename
+ to the end of the name.
+
+ Examples:
+ lemon + anaconda = lemoconda
+ krzsn + anaconda = krzsnconda
+ gdude + anaconda = gduconda
+ aperture + anaconda = apertuconda
+ lucy + python = luthon
+ joseph + taipan = joseipan
+
+ This was written by Iceman, and modified for inclusion into the bot by lemon.
+ """
+ snake_name = await self._get_snake_name()
+ snake_name = snake_name["name"]
+ snake_prefix = ""
+
+ # Set aside every word in the snake name except the last.
+ if " " in snake_name:
+ snake_prefix = " ".join(snake_name.split()[:-1])
+ snake_name = snake_name.split()[-1]
+
+ # If no name is provided, use whoever called the command.
+ if name:
+ user_name = name
+ else:
+ user_name = ctx.author.display_name
+
+ # Get the index of the vowel to slice the username at
+ user_slice_index = len(user_name)
+ for index, char in enumerate(reversed(user_name)):
+ if index == 0:
+ continue
+ if char.lower() in "aeiouy":
+ user_slice_index -= index
+ break
+
+ # Now, get the index of the vowel to slice the snake_name at
+ snake_slice_index = 0
+ for index, char in enumerate(snake_name):
+ if index == 0:
+ continue
+ if char.lower() in "aeiouy":
+ snake_slice_index = index + 1
+ break
+
+ # Combine!
+ snake_name = snake_name[snake_slice_index:]
+ user_name = user_name[:user_slice_index]
+ result = f"{snake_prefix} {user_name}{snake_name}"
+ result = string.capwords(result)
+
+ # Embed and send
+ embed = Embed(
+ title="Snake name",
+ description=f"Your snake-name is **{result}**",
+ color=SNAKE_COLOR
+ )
+
+ await ctx.send(embed=embed)
+ return
+
+ @snakes_group.command(name="sal")
+ @locked()
+ async def sal_command(self, ctx: Context) -> None:
+ """
+ Play a game of Snakes and Ladders.
+
+ Written by Momo and kel.
+ Modified by lemon.
+ """
+ # Check if there is already a game in this channel
+ if ctx.channel in self.active_sal:
+ await ctx.send(f"{ctx.author.mention} A game is already in progress in this channel.")
+ return
+
+ game = utils.SnakeAndLaddersGame(snakes=self, context=ctx)
+ self.active_sal[ctx.channel] = game
+
+ await game.open_game()
+
+ @snakes_group.command(name="about")
+ async def about_command(self, ctx: Context) -> None:
+ """Show an embed with information about the event, its participants, and its winners."""
+ contributors = [
+ "<@!245270749919576066>",
+ "<@!396290259907903491>",
+ "<@!172395097705414656>",
+ "<@!361708843425726474>",
+ "<@!300302216663793665>",
+ "<@!210248051430916096>",
+ "<@!174588005745557505>",
+ "<@!87793066227822592>",
+ "<@!211619754039967744>",
+ "<@!97347867923976192>",
+ "<@!136081839474343936>",
+ "<@!263560579770220554>",
+ "<@!104749643715387392>",
+ "<@!303940835005825024>",
+ ]
+
+ embed = Embed(
+ title="About the snake cog",
+ description=(
+ "The features in this cog were created by members of the community "
+ "during our first ever "
+ "[code jam event](https://pythondiscord.com/pages/code-jams/code-jam-1-snakes-bot/). \n\n"
+ "The event saw over 50 participants, who competed to write a discord bot cog with a snake theme over "
+ "48 hours. The staff then selected the best features from all the best teams, and made modifications "
+ "to ensure they would all work together before integrating them into the community bot.\n\n"
+ "It was a tight race, but in the end, <@!104749643715387392> and <@!303940835005825024> "
+ f"walked away as grand champions. Make sure you check out `{ctx.prefix}snakes sal`,"
+ f"`{ctx.prefix}snakes draw` and `{ctx.prefix}snakes hatch` "
+ "to see what they came up with."
+ )
+ )
+
+ embed.add_field(
+ name="Contributors",
+ value=(
+ ", ".join(contributors)
+ )
+ )
+
+ await ctx.send(embed=embed)
+
+ @snakes_group.command(name="card")
+ async def card_command(self, ctx: Context, *, name: Snake = None) -> None:
+ """
+ Create an interesting little card from a snake.
+
+ Created by juan and Someone during the first code jam.
+ """
+ # Get the snake data we need
+ if not name:
+ name_obj = await self._get_snake_name()
+ name = name_obj["scientific"]
+ content = await self._get_snek(name)
+
+ elif isinstance(name, dict):
+ content = name
+
+ else:
+ content = await self._get_snek(name)
+
+ # Make the card
+ async with ctx.typing():
+
+ stream = BytesIO()
+ async with async_timeout.timeout(10):
+ async with self.bot.http_session.get(content["image_list"][0]) as response:
+ stream.write(await response.read())
+
+ stream.seek(0)
+
+ func = partial(self._generate_card, stream, content)
+ final_buffer = await self.bot.loop.run_in_executor(None, func)
+
+ # Send it!
+ await ctx.send(
+ f"A wild {content['name'].title()} appears!",
+ file=File(final_buffer, filename=content["name"].replace(" ", "") + ".png")
+ )
+
+ @snakes_group.command(name="fact")
+ async def fact_command(self, ctx: Context) -> None:
+ """
+ Gets a snake-related fact.
+
+ Written by Andrew and Prithaj.
+ Modified by lemon.
+ """
+ question = random.choice(self.snake_facts)["fact"]
+ embed = Embed(
+ title="Snake fact",
+ color=SNAKE_COLOR,
+ description=question
+ )
+ await ctx.send(embed=embed)
+
+ @snakes_group.command(name="snakify")
+ async def snakify_command(self, ctx: Context, *, message: str = None) -> None:
+ """
+ How would I talk if I were a snake?
+
+ If `message` is passed, the bot will snakify the message.
+ Otherwise, a random message from the user's history is snakified.
+
+ Written by Momo and kel.
+ Modified by lemon.
+ """
+ with ctx.typing():
+ embed = Embed()
+ user = ctx.author
+
+ if not message:
+
+ # Get a random message from the users history
+ messages = []
+ async for message in ctx.history(limit=500).filter(
+ lambda msg: msg.author == ctx.author # Message was sent by author.
+ ):
+ messages.append(message.content)
+
+ message = self._get_random_long_message(messages)
+
+ # Build and send the embed
+ embed.set_author(
+ name=f"{user.name}#{user.discriminator}",
+ icon_url=user.display_avatar.url,
+ )
+ embed.description = f"*{self._snakify(message)}*"
+
+ await ctx.send(embed=embed)
+
+ @snakes_group.command(name="video", aliases=("get_video",))
+ async def video_command(self, ctx: Context, *, search: str = None) -> None:
+ """
+ Gets a YouTube video about snakes.
+
+ If `search` is given, a snake with that name will be searched on Youtube.
+
+ Written by Andrew and Prithaj.
+ """
+ # Are we searching for anything specific?
+ if search:
+ query = search + " snake"
+ else:
+ snake = await self._get_snake_name()
+ query = snake["name"]
+
+ # Build the URL and make the request
+ url = "https://www.googleapis.com/youtube/v3/search"
+ response = await self.bot.http_session.get(
+ url,
+ params={
+ "part": "snippet",
+ "q": urllib.parse.quote_plus(query),
+ "type": "video",
+ "key": Tokens.youtube
+ }
+ )
+ response = await response.json()
+ data = response.get("items", [])
+
+ # Send the user a video
+ if len(data) > 0:
+ num = random.randint(0, len(data) - 1)
+ youtube_base_url = "https://www.youtube.com/watch?v="
+ await ctx.send(
+ content=f"{youtube_base_url}{data[num]['id']['videoId']}"
+ )
+ else:
+ log.warning(f"YouTube API error. Full response looks like {response}")
+
+ @snakes_group.command(name="zen")
+ async def zen_command(self, ctx: Context) -> None:
+ """
+ Gets a random quote from the Zen of Python, except as if spoken by a snake.
+
+ Written by Prithaj and Andrew.
+ Modified by lemon.
+ """
+ embed = Embed(
+ title="Zzzen of Pythhon",
+ color=SNAKE_COLOR
+ )
+
+ # Get the zen quote and snakify it
+ zen_quote = random.choice(ZEN.splitlines())
+ zen_quote = self._snakify(zen_quote)
+
+ # Embed and send
+ embed.description = zen_quote
+ await ctx.send(
+ embed=embed
+ )
+ # endregion
+
+ # region: Error handlers
+ @card_command.error
+ async def command_error(self, ctx: Context, error: CommandError) -> None:
+ """Local error handler for the Snake Cog."""
+ original_error = getattr(error, "original", None)
+ if isinstance(original_error, OSError):
+ error.handled = True
+ embed = Embed()
+ embed.colour = Colour.red()
+ log.error(f"snake_card encountered an OSError: {error} ({original_error})")
+ embed.description = "Could not generate the snake card! Please try again."
+ embed.title = random.choice(ERROR_REPLIES)
+ await ctx.send(embed=embed)
diff --git a/bot/exts/fun/snakes/_utils.py b/bot/exts/fun/snakes/_utils.py
new file mode 100644
index 00000000..de51339d
--- /dev/null
+++ b/bot/exts/fun/snakes/_utils.py
@@ -0,0 +1,721 @@
+import asyncio
+import io
+import json
+import logging
+import math
+import random
+from itertools import product
+from pathlib import Path
+
+from PIL import Image
+from PIL.ImageDraw import ImageDraw
+from discord import File, Member, Reaction
+from discord.ext.commands import Cog, Context
+
+from bot.constants import Roles
+
+SNAKE_RESOURCES = Path("bot/resources/fun/snakes").absolute()
+
+h1 = r"""```
+ ----
+ ------
+/--------\
+|--------|
+|--------|
+ \------/
+ ----
+```"""
+h2 = r"""```
+ ----
+ ------
+/---\-/--\
+|-----\--|
+|--------|
+ \------/
+ ----
+```"""
+h3 = r"""```
+ ----
+ ------
+/---\-/--\
+|-----\--|
+|-----/--|
+ \----\-/
+ ----
+```"""
+h4 = r"""```
+ -----
+ ----- \
+/--| /---\
+|--\ -\---|
+|--\--/-- /
+ \------- /
+ ------
+```"""
+stages = [h1, h2, h3, h4]
+snakes = {
+ "Baby Python": "https://i.imgur.com/SYOcmSa.png",
+ "Baby Rattle Snake": "https://i.imgur.com/i5jYA8f.png",
+ "Baby Dragon Snake": "https://i.imgur.com/SuMKM4m.png",
+ "Baby Garden Snake": "https://i.imgur.com/5vYx3ah.png",
+ "Baby Cobra": "https://i.imgur.com/jk14ryt.png",
+ "Baby Anaconda": "https://i.imgur.com/EpdrnNr.png",
+}
+
+BOARD_TILE_SIZE = 56 # the size of each board tile
+BOARD_PLAYER_SIZE = 20 # the size of each player icon
+BOARD_MARGIN = (10, 0) # margins, in pixels (for player icons)
+# The size of the image to download
+# Should a power of 2 and higher than BOARD_PLAYER_SIZE
+PLAYER_ICON_IMAGE_SIZE = 32
+MAX_PLAYERS = 4 # depends on the board size/quality, 4 is for the default board
+
+# board definition (from, to)
+BOARD = {
+ # ladders
+ 2: 38,
+ 7: 14,
+ 8: 31,
+ 15: 26,
+ 21: 42,
+ 28: 84,
+ 36: 44,
+ 51: 67,
+ 71: 91,
+ 78: 98,
+ 87: 94,
+
+ # snakes
+ 99: 80,
+ 95: 75,
+ 92: 88,
+ 89: 68,
+ 74: 53,
+ 64: 60,
+ 62: 19,
+ 49: 11,
+ 46: 25,
+ 16: 6
+}
+
+DEFAULT_SNAKE_COLOR = 0x15c7ea
+DEFAULT_BACKGROUND_COLOR = 0
+DEFAULT_IMAGE_DIMENSIONS = (200, 200)
+DEFAULT_SNAKE_LENGTH = 22
+DEFAULT_SNAKE_WIDTH = 8
+DEFAULT_SEGMENT_LENGTH_RANGE = (7, 10)
+DEFAULT_IMAGE_MARGINS = (50, 50)
+DEFAULT_TEXT = "snek\nit\nup"
+DEFAULT_TEXT_POSITION = (
+ 10,
+ 10
+)
+DEFAULT_TEXT_COLOR = 0xf2ea15
+X = 0
+Y = 1
+ANGLE_RANGE = math.pi * 2
+
+
+def get_resource(file: str) -> list[dict]:
+ """Load Snake resources JSON."""
+ return json.loads((SNAKE_RESOURCES / f"{file}.json").read_text("utf-8"))
+
+
+def smoothstep(t: float) -> float:
+ """Smooth curve with a zero derivative at 0 and 1, making it useful for interpolating."""
+ return t * t * (3. - 2. * t)
+
+
+def lerp(t: float, a: float, b: float) -> float:
+ """Linear interpolation between a and b, given a fraction t."""
+ return a + t * (b - a)
+
+
+class PerlinNoiseFactory(object):
+ """
+ Callable that produces Perlin noise for an arbitrary point in an arbitrary number of dimensions.
+
+ The underlying grid is aligned with the integers.
+
+ There is no limit to the coordinates used; new gradients are generated on the fly as necessary.
+
+ Taken from: https://gist.github.com/eevee/26f547457522755cb1fb8739d0ea89a1
+ Licensed under ISC
+ """
+
+ def __init__(self, dimension: int, octaves: int = 1, tile: tuple[int, ...] = (), unbias: bool = False):
+ """
+ Create a new Perlin noise factory in the given number of dimensions.
+
+ dimension should be an integer and at least 1.
+
+ More octaves create a foggier and more-detailed noise pattern. More than 4 octaves is rather excessive.
+
+ ``tile`` can be used to make a seamlessly tiling pattern.
+ For example:
+ pnf = PerlinNoiseFactory(2, tile=(0, 3))
+
+ This will produce noise that tiles every 3 units vertically, but never tiles horizontally.
+
+ If ``unbias`` is True, the smoothstep function will be applied to the output before returning
+ it, to counteract some of Perlin noise's significant bias towards the center of its output range.
+ """
+ self.dimension = dimension
+ self.octaves = octaves
+ self.tile = tile + (0,) * dimension
+ self.unbias = unbias
+
+ # For n dimensions, the range of Perlin noise is ±sqrt(n)/2; multiply
+ # by this to scale to ±1
+ self.scale_factor = 2 * dimension ** -0.5
+
+ self.gradient = {}
+
+ def _generate_gradient(self) -> tuple[float, ...]:
+ """
+ Generate a random unit vector at each grid point.
+
+ This is the "gradient" vector, in that the grid tile slopes towards it
+ """
+ # 1 dimension is special, since the only unit vector is trivial;
+ # instead, use a slope between -1 and 1
+ if self.dimension == 1:
+ return (random.uniform(-1, 1),)
+
+ # Generate a random point on the surface of the unit n-hypersphere;
+ # this is the same as a random unit vector in n dimensions. Thanks
+ # to: http://mathworld.wolfram.com/SpherePointPicking.html
+ # Pick n normal random variables with stddev 1
+ random_point = [random.gauss(0, 1) for _ in range(self.dimension)]
+ # Then scale the result to a unit vector
+ scale = sum(n * n for n in random_point) ** -0.5
+ return tuple(coord * scale for coord in random_point)
+
+ def get_plain_noise(self, *point) -> float:
+ """Get plain noise for a single point, without taking into account either octaves or tiling."""
+ if len(point) != self.dimension:
+ raise ValueError(
+ f"Expected {self.dimension} values, got {len(point)}"
+ )
+
+ # Build a list of the (min, max) bounds in each dimension
+ grid_coords = []
+ for coord in point:
+ min_coord = math.floor(coord)
+ max_coord = min_coord + 1
+ grid_coords.append((min_coord, max_coord))
+
+ # Compute the dot product of each gradient vector and the point's
+ # distance from the corresponding grid point. This gives you each
+ # gradient's "influence" on the chosen point.
+ dots = []
+ for grid_point in product(*grid_coords):
+ if grid_point not in self.gradient:
+ self.gradient[grid_point] = self._generate_gradient()
+ gradient = self.gradient[grid_point]
+
+ dot = 0
+ for i in range(self.dimension):
+ dot += gradient[i] * (point[i] - grid_point[i])
+ dots.append(dot)
+
+ # Interpolate all those dot products together. The interpolation is
+ # done with smoothstep to smooth out the slope as you pass from one
+ # grid cell into the next.
+ # Due to the way product() works, dot products are ordered such that
+ # the last dimension alternates: (..., min), (..., max), etc. So we
+ # can interpolate adjacent pairs to "collapse" that last dimension. Then
+ # the results will alternate in their second-to-last dimension, and so
+ # forth, until we only have a single value left.
+ dim = self.dimension
+ while len(dots) > 1:
+ dim -= 1
+ s = smoothstep(point[dim] - grid_coords[dim][0])
+
+ next_dots = []
+ while dots:
+ next_dots.append(lerp(s, dots.pop(0), dots.pop(0)))
+
+ dots = next_dots
+
+ return dots[0] * self.scale_factor
+
+ def __call__(self, *point) -> float:
+ """
+ Get the value of this Perlin noise function at the given point.
+
+ The number of values given should match the number of dimensions.
+ """
+ ret = 0
+ for o in range(self.octaves):
+ o2 = 1 << o
+ new_point = []
+ for i, coord in enumerate(point):
+ coord *= o2
+ if self.tile[i]:
+ coord %= self.tile[i] * o2
+ new_point.append(coord)
+ ret += self.get_plain_noise(*new_point) / o2
+
+ # Need to scale n back down since adding all those extra octaves has
+ # probably expanded it beyond ±1
+ # 1 octave: ±1
+ # 2 octaves: ±1½
+ # 3 octaves: ±1¾
+ ret /= 2 - 2 ** (1 - self.octaves)
+
+ if self.unbias:
+ # The output of the plain Perlin noise algorithm has a fairly
+ # strong bias towards the center due to the central limit theorem
+ # -- in fact the top and bottom 1/8 virtually never happen. That's
+ # a quarter of our entire output range! If only we had a function
+ # in [0..1] that could introduce a bias towards the endpoints...
+ r = (ret + 1) / 2
+ # Doing it this many times is a completely made-up heuristic.
+ for _ in range(int(self.octaves / 2 + 0.5)):
+ r = smoothstep(r)
+ ret = r * 2 - 1
+
+ return ret
+
+
+def create_snek_frame(
+ perlin_factory: PerlinNoiseFactory, perlin_lookup_vertical_shift: float = 0,
+ image_dimensions: tuple[int, int] = DEFAULT_IMAGE_DIMENSIONS,
+ image_margins: tuple[int, int] = DEFAULT_IMAGE_MARGINS,
+ snake_length: int = DEFAULT_SNAKE_LENGTH,
+ snake_color: int = DEFAULT_SNAKE_COLOR, bg_color: int = DEFAULT_BACKGROUND_COLOR,
+ segment_length_range: tuple[int, int] = DEFAULT_SEGMENT_LENGTH_RANGE, snake_width: int = DEFAULT_SNAKE_WIDTH,
+ text: str = DEFAULT_TEXT, text_position: tuple[float, float] = DEFAULT_TEXT_POSITION,
+ text_color: int = DEFAULT_TEXT_COLOR
+) -> Image.Image:
+ """
+ Creates a single random snek frame using Perlin noise.
+
+ `perlin_lookup_vertical_shift` represents the Perlin noise shift in the Y-dimension for this frame.
+ If `text` is given, display the given text with the snek.
+ """
+ start_x = random.randint(image_margins[X], image_dimensions[X] - image_margins[X])
+ start_y = random.randint(image_margins[Y], image_dimensions[Y] - image_margins[Y])
+ points: list[tuple[float, float]] = [(start_x, start_y)]
+
+ for index in range(0, snake_length):
+ angle = perlin_factory.get_plain_noise(
+ ((1 / (snake_length + 1)) * (index + 1)) + perlin_lookup_vertical_shift
+ ) * ANGLE_RANGE
+ current_point = points[index]
+ segment_length = random.randint(segment_length_range[0], segment_length_range[1])
+ points.append((
+ current_point[X] + segment_length * math.cos(angle),
+ current_point[Y] + segment_length * math.sin(angle)
+ ))
+
+ # normalize bounds
+ min_dimensions: list[float] = [start_x, start_y]
+ max_dimensions: list[float] = [start_x, start_y]
+ for point in points:
+ min_dimensions[X] = min(point[X], min_dimensions[X])
+ min_dimensions[Y] = min(point[Y], min_dimensions[Y])
+ max_dimensions[X] = max(point[X], max_dimensions[X])
+ max_dimensions[Y] = max(point[Y], max_dimensions[Y])
+
+ # shift towards middle
+ dimension_range = (max_dimensions[X] - min_dimensions[X], max_dimensions[Y] - min_dimensions[Y])
+ shift = (
+ image_dimensions[X] / 2 - (dimension_range[X] / 2 + min_dimensions[X]),
+ image_dimensions[Y] / 2 - (dimension_range[Y] / 2 + min_dimensions[Y])
+ )
+
+ image = Image.new(mode="RGB", size=image_dimensions, color=bg_color)
+ draw = ImageDraw(image)
+ for index in range(1, len(points)):
+ point = points[index]
+ previous = points[index - 1]
+ draw.line(
+ (
+ shift[X] + previous[X],
+ shift[Y] + previous[Y],
+ shift[X] + point[X],
+ shift[Y] + point[Y]
+ ),
+ width=snake_width,
+ fill=snake_color
+ )
+ if text is not None:
+ draw.multiline_text(text_position, text, fill=text_color)
+ del draw
+ return image
+
+
+def frame_to_png_bytes(image: Image) -> io.BytesIO:
+ """Convert image to byte stream."""
+ stream = io.BytesIO()
+ image.save(stream, format="PNG")
+ stream.seek(0)
+ return stream
+
+
+log = logging.getLogger(__name__)
+START_EMOJI = "\u2611" # :ballot_box_with_check: - Start the game
+CANCEL_EMOJI = "\u274C" # :x: - Cancel or leave the game
+ROLL_EMOJI = "\U0001F3B2" # :game_die: - Roll the die!
+JOIN_EMOJI = "\U0001F64B" # :raising_hand: - Join the game.
+STARTUP_SCREEN_EMOJI = [
+ JOIN_EMOJI,
+ START_EMOJI,
+ CANCEL_EMOJI
+]
+GAME_SCREEN_EMOJI = [
+ ROLL_EMOJI,
+ CANCEL_EMOJI
+]
+
+
+class SnakeAndLaddersGame:
+ """Snakes and Ladders game Cog."""
+
+ def __init__(self, snakes: Cog, context: Context):
+ self.snakes = snakes
+ self.ctx = context
+ self.channel = self.ctx.channel
+ self.state = "booting"
+ self.started = False
+ self.author = self.ctx.author
+ self.players = []
+ self.player_tiles = {}
+ self.round_has_rolled = {}
+ self.avatar_images = {}
+ self.board = None
+ self.positions = None
+ self.rolls = []
+
+ async def open_game(self) -> None:
+ """
+ Create a new Snakes and Ladders game.
+
+ Listen for reactions until players have joined, and the game has been started.
+ """
+ def startup_event_check(reaction_: Reaction, user_: Member) -> bool:
+ """Make sure that this reaction is what we want to operate on."""
+ return (
+ all((
+ reaction_.message.id == startup.id, # Reaction is on startup message
+ reaction_.emoji in STARTUP_SCREEN_EMOJI, # Reaction is one of the startup emotes
+ user_.id != self.ctx.bot.user.id, # Reaction was not made by the bot
+ ))
+ )
+
+ # Check to see if the bot can remove reactions
+ if not self.channel.permissions_for(self.ctx.guild.me).manage_messages:
+ log.warning(
+ "Unable to start Snakes and Ladders - "
+ f"Missing manage_messages permissions in {self.channel}"
+ )
+ return
+
+ await self._add_player(self.author)
+ await self.channel.send(
+ "**Snakes and Ladders**: A new game is about to start!",
+ file=File(
+ str(SNAKE_RESOURCES / "snakes_and_ladders" / "banner.jpg"),
+ filename="Snakes and Ladders.jpg"
+ )
+ )
+ startup = await self.channel.send(
+ f"Press {JOIN_EMOJI} to participate, and press "
+ f"{START_EMOJI} to start the game"
+ )
+ for emoji in STARTUP_SCREEN_EMOJI:
+ await startup.add_reaction(emoji)
+
+ self.state = "waiting"
+
+ while not self.started:
+ try:
+ reaction, user = await self.ctx.bot.wait_for(
+ "reaction_add",
+ timeout=300,
+ check=startup_event_check
+ )
+ if reaction.emoji == JOIN_EMOJI:
+ await self.player_join(user)
+ elif reaction.emoji == CANCEL_EMOJI:
+ if user == self.author or (self._is_moderator(user) and user not in self.players):
+ # Allow game author or non-playing moderation staff to cancel a waiting game
+ await self.cancel_game()
+ return
+ else:
+ await self.player_leave(user)
+ elif reaction.emoji == START_EMOJI:
+ if self.ctx.author == user:
+ self.started = True
+ await self.start_game(user)
+ await startup.delete()
+ break
+
+ await startup.remove_reaction(reaction.emoji, user)
+
+ except asyncio.TimeoutError:
+ log.debug("Snakes and Ladders timed out waiting for a reaction")
+ await self.cancel_game()
+ return # We're done, no reactions for the last 5 minutes
+
+ async def _add_player(self, user: Member) -> None:
+ """Add player to game."""
+ self.players.append(user)
+ self.player_tiles[user.id] = 1
+
+ avatar_bytes = await user.display_avatar.replace(size=PLAYER_ICON_IMAGE_SIZE).read()
+ im = Image.open(io.BytesIO(avatar_bytes)).resize((BOARD_PLAYER_SIZE, BOARD_PLAYER_SIZE))
+ self.avatar_images[user.id] = im
+
+ async def player_join(self, user: Member) -> None:
+ """
+ Handle players joining the game.
+
+ Prevent player joining if they have already joined, if the game is full, or if the game is
+ in a waiting state.
+ """
+ for p in self.players:
+ if user == p:
+ await self.channel.send(user.mention + " You are already in the game.", delete_after=10)
+ return
+ if self.state != "waiting":
+ await self.channel.send(user.mention + " You cannot join at this time.", delete_after=10)
+ return
+ if len(self.players) is MAX_PLAYERS:
+ await self.channel.send(user.mention + " The game is full!", delete_after=10)
+ return
+
+ await self._add_player(user)
+
+ await self.channel.send(
+ f"**Snakes and Ladders**: {user.mention} has joined the game.\n"
+ f"There are now {str(len(self.players))} players in the game.",
+ delete_after=10
+ )
+
+ async def player_leave(self, user: Member) -> bool:
+ """
+ Handle players leaving the game.
+
+ Leaving is prevented if the user wasn't part of the game.
+
+ If the number of players reaches 0, the game is terminated. In this case, a sentinel boolean
+ is returned True to prevent a game from continuing after it's destroyed.
+ """
+ is_surrendered = False # Sentinel value to assist with stopping a surrendered game
+ for p in self.players:
+ if user == p:
+ self.players.remove(p)
+ self.player_tiles.pop(p.id, None)
+ self.round_has_rolled.pop(p.id, None)
+ await self.channel.send(
+ "**Snakes and Ladders**: " + user.mention + " has left the game.",
+ delete_after=10
+ )
+
+ if self.state != "waiting" and len(self.players) == 0:
+ await self.channel.send("**Snakes and Ladders**: The game has been surrendered!")
+ is_surrendered = True
+ self._destruct()
+
+ return is_surrendered
+ else:
+ await self.channel.send(user.mention + " You are not in the match.", delete_after=10)
+ return is_surrendered
+
+ async def cancel_game(self) -> None:
+ """Cancel the running game."""
+ await self.channel.send("**Snakes and Ladders**: Game has been canceled.")
+ self._destruct()
+
+ async def start_game(self, user: Member) -> None:
+ """
+ Allow the game author to begin the game.
+
+ The game cannot be started if the game is in a waiting state.
+ """
+ if not user == self.author:
+ await self.channel.send(user.mention + " Only the author of the game can start it.", delete_after=10)
+ return
+
+ if not self.state == "waiting":
+ await self.channel.send(user.mention + " The game cannot be started at this time.", delete_after=10)
+ return
+
+ self.state = "starting"
+ player_list = ", ".join(user.mention for user in self.players)
+ await self.channel.send("**Snakes and Ladders**: The game is starting!\nPlayers: " + player_list)
+ await self.start_round()
+
+ async def start_round(self) -> None:
+ """Begin the round."""
+ def game_event_check(reaction_: Reaction, user_: Member) -> bool:
+ """Make sure that this reaction is what we want to operate on."""
+ return (
+ all((
+ reaction_.message.id == self.positions.id, # Reaction is on positions message
+ reaction_.emoji in GAME_SCREEN_EMOJI, # Reaction is one of the game emotes
+ user_.id != self.ctx.bot.user.id, # Reaction was not made by the bot
+ ))
+ )
+
+ self.state = "roll"
+ for user in self.players:
+ self.round_has_rolled[user.id] = False
+ board_img = Image.open(SNAKE_RESOURCES / "snakes_and_ladders" / "board.jpg")
+ player_row_size = math.ceil(MAX_PLAYERS / 2)
+
+ for i, player in enumerate(self.players):
+ tile = self.player_tiles[player.id]
+ tile_coordinates = self._board_coordinate_from_index(tile)
+ x_offset = BOARD_MARGIN[0] + tile_coordinates[0] * BOARD_TILE_SIZE
+ y_offset = \
+ BOARD_MARGIN[1] + (
+ (10 * BOARD_TILE_SIZE) - (9 - tile_coordinates[1]) * BOARD_TILE_SIZE - BOARD_PLAYER_SIZE)
+ x_offset += BOARD_PLAYER_SIZE * (i % player_row_size)
+ y_offset -= BOARD_PLAYER_SIZE * math.floor(i / player_row_size)
+ board_img.paste(self.avatar_images[player.id],
+ box=(x_offset, y_offset))
+
+ board_file = File(frame_to_png_bytes(board_img), filename="Board.jpg")
+ player_list = "\n".join((user.mention + ": Tile " + str(self.player_tiles[user.id])) for user in self.players)
+
+ # Store and send new messages
+ temp_board = await self.channel.send(
+ "**Snakes and Ladders**: A new round has started! Current board:",
+ file=board_file
+ )
+ temp_positions = await self.channel.send(
+ f"**Current positions**:\n{player_list}\n\nUse {ROLL_EMOJI} to roll the dice!"
+ )
+
+ # Delete the previous messages
+ if self.board and self.positions:
+ await self.board.delete()
+ await self.positions.delete()
+
+ # remove the roll messages
+ for roll in self.rolls:
+ await roll.delete()
+ self.rolls = []
+
+ # Save new messages
+ self.board = temp_board
+ self.positions = temp_positions
+
+ # Wait for rolls
+ for emoji in GAME_SCREEN_EMOJI:
+ await self.positions.add_reaction(emoji)
+
+ is_surrendered = False
+ while True:
+ try:
+ reaction, user = await self.ctx.bot.wait_for(
+ "reaction_add",
+ timeout=300,
+ check=game_event_check
+ )
+
+ if reaction.emoji == ROLL_EMOJI:
+ await self.player_roll(user)
+ elif reaction.emoji == CANCEL_EMOJI:
+ if self._is_moderator(user) and user not in self.players:
+ # Only allow non-playing moderation staff to cancel a running game
+ await self.cancel_game()
+ return
+ else:
+ is_surrendered = await self.player_leave(user)
+
+ await self.positions.remove_reaction(reaction.emoji, user)
+
+ if self._check_all_rolled():
+ break
+
+ except asyncio.TimeoutError:
+ log.debug("Snakes and Ladders timed out waiting for a reaction")
+ await self.cancel_game()
+ return # We're done, no reactions for the last 5 minutes
+
+ # Round completed
+ # Check to see if the game was surrendered before completing the round, without this
+ # sentinel, the game object would be deleted but the next round still posted into purgatory
+ if not is_surrendered:
+ await self._complete_round()
+
+ async def player_roll(self, user: Member) -> None:
+ """Handle the player's roll."""
+ if user.id not in self.player_tiles:
+ await self.channel.send(user.mention + " You are not in the match.", delete_after=10)
+ return
+ if self.state != "roll":
+ await self.channel.send(user.mention + " You may not roll at this time.", delete_after=10)
+ return
+ if self.round_has_rolled[user.id]:
+ return
+ roll = random.randint(1, 6)
+ self.rolls.append(await self.channel.send(f"{user.mention} rolled a **{roll}**!"))
+ next_tile = self.player_tiles[user.id] + roll
+
+ # apply snakes and ladders
+ if next_tile in BOARD:
+ target = BOARD[next_tile]
+ if target < next_tile:
+ await self.channel.send(
+ f"{user.mention} slips on a snake and falls back to **{target}**",
+ delete_after=15
+ )
+ else:
+ await self.channel.send(
+ f"{user.mention} climbs a ladder to **{target}**",
+ delete_after=15
+ )
+ next_tile = target
+
+ self.player_tiles[user.id] = min(100, next_tile)
+ self.round_has_rolled[user.id] = True
+
+ async def _complete_round(self) -> None:
+ """At the conclusion of a round check to see if there's been a winner."""
+ self.state = "post_round"
+
+ # check for winner
+ winner = self._check_winner()
+ if winner is None:
+ # there is no winner, start the next round
+ await self.start_round()
+ return
+
+ # announce winner and exit
+ await self.channel.send("**Snakes and Ladders**: " + winner.mention + " has won the game! :tada:")
+ self._destruct()
+
+ def _check_winner(self) -> Member:
+ """Return a winning member if we're in the post-round state and there's a winner."""
+ if self.state != "post_round":
+ return None
+ return next((player for player in self.players if self.player_tiles[player.id] == 100),
+ None)
+
+ def _check_all_rolled(self) -> bool:
+ """Check if all members have made their roll."""
+ return all(rolled for rolled in self.round_has_rolled.values())
+
+ def _destruct(self) -> None:
+ """Clean up the finished game object."""
+ del self.snakes.active_sal[self.channel]
+
+ def _board_coordinate_from_index(self, index: int) -> tuple[int, int]:
+ """Convert the tile number to the x/y coordinates for graphical purposes."""
+ y_level = 9 - math.floor((index - 1) / 10)
+ is_reversed = math.floor((index - 1) / 10) % 2 != 0
+ x_level = (index - 1) % 10
+ if is_reversed:
+ x_level = 9 - x_level
+ return x_level, y_level
+
+ @staticmethod
+ def _is_moderator(user: Member) -> bool:
+ """Return True if the user is a Moderator."""
+ return any(Roles.moderator == role.id for role in user.roles)
diff --git a/bot/exts/fun/space.py b/bot/exts/fun/space.py
new file mode 100644
index 00000000..48ad0f96
--- /dev/null
+++ b/bot/exts/fun/space.py
@@ -0,0 +1,236 @@
+import logging
+import random
+from datetime import date, datetime
+from typing import Any, Optional
+from urllib.parse import urlencode
+
+from discord import Embed
+from discord.ext import tasks
+from discord.ext.commands import Cog, Context, group
+
+from bot.bot import Bot
+from bot.constants import Tokens
+from bot.utils.converters import DateConverter
+from bot.utils.extensions import invoke_help_command
+
+logger = logging.getLogger(__name__)
+
+NASA_BASE_URL = "https://api.nasa.gov"
+NASA_IMAGES_BASE_URL = "https://images-api.nasa.gov"
+NASA_EPIC_BASE_URL = "https://epic.gsfc.nasa.gov"
+
+APOD_MIN_DATE = date(1995, 6, 16)
+
+
+class Space(Cog):
+ """Space Cog contains commands, that show images, facts or other information about space."""
+
+ def __init__(self, bot: Bot):
+ self.http_session = bot.http_session
+
+ self.rovers = {}
+ self.get_rovers.start()
+
+ def cog_unload(self) -> None:
+ """Cancel `get_rovers` task when Cog will unload."""
+ self.get_rovers.cancel()
+
+ @tasks.loop(hours=24)
+ async def get_rovers(self) -> None:
+ """Get listing of rovers from NASA API and info about their start and end dates."""
+ data = await self.fetch_from_nasa("mars-photos/api/v1/rovers")
+
+ for rover in data["rovers"]:
+ self.rovers[rover["name"].lower()] = {
+ "min_date": rover["landing_date"],
+ "max_date": rover["max_date"],
+ "max_sol": rover["max_sol"]
+ }
+
+ @group(name="space", invoke_without_command=True)
+ async def space(self, ctx: Context) -> None:
+ """Head command that contains commands about space."""
+ await invoke_help_command(ctx)
+
+ @space.command(name="apod")
+ async def apod(self, ctx: Context, date: Optional[str]) -> None:
+ """
+ Get Astronomy Picture of Day from NASA API. Date is optional parameter, what formatting is YYYY-MM-DD.
+
+ If date is not specified, this will get today APOD.
+ """
+ params = {}
+ # Parse date to params, when provided. Show error message when invalid formatting
+ if date:
+ try:
+ apod_date = datetime.strptime(date, "%Y-%m-%d").date()
+ except ValueError:
+ await ctx.send(f"Invalid date {date}. Please make sure your date is in format YYYY-MM-DD.")
+ return
+
+ now = datetime.now().date()
+ if APOD_MIN_DATE > apod_date or now < apod_date:
+ await ctx.send(f"Date must be between {APOD_MIN_DATE.isoformat()} and {now.isoformat()} (today).")
+ return
+
+ params["date"] = apod_date.isoformat()
+
+ result = await self.fetch_from_nasa("planetary/apod", params)
+
+ await ctx.send(
+ embed=self.create_nasa_embed(
+ f"Astronomy Picture of the Day - {result['date']}",
+ result["explanation"],
+ result["url"]
+ )
+ )
+
+ @space.command(name="nasa")
+ async def nasa(self, ctx: Context, *, search_term: Optional[str]) -> None:
+ """Get random NASA information/facts + image. Support `search_term` parameter for more specific search."""
+ params = {
+ "media_type": "image"
+ }
+ if search_term:
+ params["q"] = search_term
+
+ # Don't use API key, no need for this.
+ data = await self.fetch_from_nasa("search", params, NASA_IMAGES_BASE_URL, use_api_key=False)
+ if len(data["collection"]["items"]) == 0:
+ await ctx.send(f"Can't find any items with search term `{search_term}`.")
+ return
+
+ item = random.choice(data["collection"]["items"])
+
+ await ctx.send(
+ embed=self.create_nasa_embed(
+ item["data"][0]["title"],
+ item["data"][0]["description"],
+ item["links"][0]["href"]
+ )
+ )
+
+ @space.command(name="epic")
+ async def epic(self, ctx: Context, date: Optional[str]) -> None:
+ """Get a random image of the Earth from the NASA EPIC API. Support date parameter, format is YYYY-MM-DD."""
+ if date:
+ try:
+ show_date = datetime.strptime(date, "%Y-%m-%d").date().isoformat()
+ except ValueError:
+ await ctx.send(f"Invalid date {date}. Please make sure your date is in format YYYY-MM-DD.")
+ return
+ else:
+ show_date = None
+
+ # Don't use API key, no need for this.
+ data = await self.fetch_from_nasa(
+ f"api/natural{f'/date/{show_date}' if show_date else ''}",
+ base=NASA_EPIC_BASE_URL,
+ use_api_key=False
+ )
+ if len(data) < 1:
+ await ctx.send("Can't find any images in this date.")
+ return
+
+ item = random.choice(data)
+
+ year, month, day = item["date"].split(" ")[0].split("-")
+ image_url = f"{NASA_EPIC_BASE_URL}/archive/natural/{year}/{month}/{day}/jpg/{item['image']}.jpg"
+
+ await ctx.send(
+ embed=self.create_nasa_embed(
+ "Earth Image", item["caption"], image_url, f" \u2022 Identifier: {item['identifier']}"
+ )
+ )
+
+ @space.group(name="mars", invoke_without_command=True)
+ async def mars(
+ self,
+ ctx: Context,
+ date: Optional[DateConverter],
+ rover: str = "curiosity"
+ ) -> None:
+ """
+ Get random Mars image by date. Support both SOL (martian solar day) and earth date and rovers.
+
+ Earth date formatting is YYYY-MM-DD. Use `.space mars dates` to get all currently available rovers.
+ """
+ rover = rover.lower()
+ if rover not in self.rovers:
+ await ctx.send(
+ (
+ f"Invalid rover `{rover}`.\n"
+ f"**Rovers:** `{'`, `'.join(f'{r.capitalize()}' for r in self.rovers)}`"
+ )
+ )
+ return
+
+ # When date not provided, get random SOL date between 0 and rover's max.
+ if date is None:
+ date = random.randint(0, self.rovers[rover]["max_sol"])
+
+ params = {}
+ if isinstance(date, int):
+ params["sol"] = date
+ else:
+ params["earth_date"] = date.date().isoformat()
+
+ result = await self.fetch_from_nasa(f"mars-photos/api/v1/rovers/{rover}/photos", params)
+ if len(result["photos"]) < 1:
+ err_msg = (
+ f"We can't find result in date "
+ f"{date.date().isoformat() if isinstance(date, datetime) else f'{date} SOL'}.\n"
+ f"**Note:** Dates must match with rover's working dates. Please use `{ctx.prefix}space mars dates` to "
+ "see working dates for each rover."
+ )
+ await ctx.send(err_msg)
+ return
+
+ item = random.choice(result["photos"])
+ await ctx.send(
+ embed=self.create_nasa_embed(
+ f"{item['rover']['name']}'s {item['camera']['full_name']} Mars Image", "", item["img_src"],
+ )
+ )
+
+ @mars.command(name="dates", aliases=("d", "date", "rover", "rovers", "r"))
+ async def dates(self, ctx: Context) -> None:
+ """Get current available rovers photo date ranges."""
+ await ctx.send("\n".join(
+ f"**{r.capitalize()}:** {i['min_date']} **-** {i['max_date']}" for r, i in self.rovers.items()
+ ))
+
+ async def fetch_from_nasa(
+ self,
+ endpoint: str,
+ additional_params: Optional[dict[str, Any]] = None,
+ base: Optional[str] = NASA_BASE_URL,
+ use_api_key: bool = True
+ ) -> dict[str, Any]:
+ """Fetch information from NASA API, return result."""
+ params = {}
+ if use_api_key:
+ params["api_key"] = Tokens.nasa
+
+ # Add additional parameters to request parameters only when they provided by user
+ if additional_params is not None:
+ params.update(additional_params)
+
+ async with self.http_session.get(url=f"{base}/{endpoint}?{urlencode(params)}") as resp:
+ return await resp.json()
+
+ def create_nasa_embed(self, title: str, description: str, image: str, footer: Optional[str] = "") -> Embed:
+ """Generate NASA commands embeds. Required: title, description and image URL, footer (addition) is optional."""
+ return Embed(
+ title=title,
+ description=description
+ ).set_image(url=image).set_footer(text="Powered by NASA API" + footer)
+
+
+def setup(bot: Bot) -> None:
+ """Load the Space cog."""
+ if not Tokens.nasa:
+ logger.warning("Can't find NASA API key. Not loading Space Cog.")
+ return
+
+ bot.add_cog(Space(bot))
diff --git a/bot/exts/fun/speedrun.py b/bot/exts/fun/speedrun.py
new file mode 100644
index 00000000..c2966ce1
--- /dev/null
+++ b/bot/exts/fun/speedrun.py
@@ -0,0 +1,26 @@
+import json
+import logging
+from pathlib import Path
+from random import choice
+
+from discord.ext import commands
+
+from bot.bot import Bot
+
+log = logging.getLogger(__name__)
+
+LINKS = json.loads(Path("bot/resources/fun/speedrun_links.json").read_text("utf8"))
+
+
+class Speedrun(commands.Cog):
+ """Commands about the video game speedrunning community."""
+
+ @commands.command(name="speedrun")
+ async def get_speedrun(self, ctx: commands.Context) -> None:
+ """Sends a link to a video of a random speedrun."""
+ await ctx.send(choice(LINKS))
+
+
+def setup(bot: Bot) -> None:
+ """Load the Speedrun cog."""
+ bot.add_cog(Speedrun())
diff --git a/bot/exts/fun/status_codes.py b/bot/exts/fun/status_codes.py
new file mode 100644
index 00000000..501cbe0a
--- /dev/null
+++ b/bot/exts/fun/status_codes.py
@@ -0,0 +1,87 @@
+from random import choice
+
+import discord
+from discord.ext import commands
+
+from bot.bot import Bot
+
+HTTP_DOG_URL = "https://httpstatusdogs.com/img/{code}.jpg"
+HTTP_CAT_URL = "https://http.cat/{code}.jpg"
+STATUS_TEMPLATE = "**Status: {code}**"
+ERR_404 = "Unable to find status floof for {code}."
+ERR_UNKNOWN = "Error attempting to retrieve status floof for {code}."
+ERROR_LENGTH_EMBED = discord.Embed(
+ title="Input status code does not exist",
+ description="The range of valid status codes is 100 to 599",
+)
+
+
+class HTTPStatusCodes(commands.Cog):
+ """
+ Fetch an image depicting HTTP status codes as a dog or a cat.
+
+ If neither animal is selected a cat or dog is chosen randomly for the given status code.
+ """
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ @commands.group(
+ name="http_status",
+ aliases=("status", "httpstatus"),
+ invoke_without_command=True,
+ )
+ async def http_status_group(self, ctx: commands.Context, code: int) -> None:
+ """Choose a cat or dog randomly for the given status code."""
+ subcmd = choice((self.http_cat, self.http_dog))
+ await subcmd(ctx, code)
+
+ @http_status_group.command(name="cat")
+ async def http_cat(self, ctx: commands.Context, code: int) -> None:
+ """Send a cat version of the requested HTTP status code."""
+ if code in range(100, 600):
+ await self.build_embed(url=HTTP_CAT_URL.format(code=code), ctx=ctx, code=code)
+ return
+ await ctx.send(embed=ERROR_LENGTH_EMBED)
+
+ @http_status_group.command(name="dog")
+ async def http_dog(self, ctx: commands.Context, code: int) -> None:
+ """Send a dog version of the requested HTTP status code."""
+ if code in range(100, 600):
+ await self.build_embed(url=HTTP_DOG_URL.format(code=code), ctx=ctx, code=code)
+ return
+ await ctx.send(embed=ERROR_LENGTH_EMBED)
+
+ async def build_embed(self, url: str, ctx: commands.Context, code: int) -> None:
+ """Attempt to build and dispatch embed. Append error message instead if something goes wrong."""
+ async with self.bot.http_session.get(url, allow_redirects=False) as response:
+ if response.status in range(200, 300):
+ await ctx.send(
+ embed=discord.Embed(
+ title=STATUS_TEMPLATE.format(code=code)
+ ).set_image(url=url)
+ )
+ elif response.status in (302, 404): # dog URL returns 302 instead of 404
+ if "dog" in url:
+ await ctx.send(
+ embed=discord.Embed(
+ title=ERR_404.format(code=code)
+ ).set_image(url="https://httpstatusdogs.com/img/404.jpg")
+ )
+ return
+ await ctx.send(
+ embed=discord.Embed(
+ title=ERR_404.format(code=code)
+ ).set_image(url="https://http.cat/404.jpg")
+ )
+ else:
+ await ctx.send(
+ embed=discord.Embed(
+ title=STATUS_TEMPLATE.format(code=code)
+ ).set_footer(text=ERR_UNKNOWN.format(code=code))
+ )
+
+
+def setup(bot: Bot) -> None:
+ """Load the HTTPStatusCodes cog."""
+ bot.add_cog(HTTPStatusCodes(bot))
diff --git a/bot/exts/fun/tic_tac_toe.py b/bot/exts/fun/tic_tac_toe.py
new file mode 100644
index 00000000..5c4f8051
--- /dev/null
+++ b/bot/exts/fun/tic_tac_toe.py
@@ -0,0 +1,335 @@
+import asyncio
+import random
+from typing import Callable, Optional, Union
+
+import discord
+from discord.ext.commands import Cog, Context, check, group, guild_only
+
+from bot.bot import Bot
+from bot.constants import Emojis
+from bot.utils.pagination import LinePaginator
+
+CONFIRMATION_MESSAGE = (
+ "{opponent}, {requester} wants to play Tic-Tac-Toe against you."
+ f"\nReact to this message with {Emojis.confirmation} to accept or with {Emojis.decline} to decline."
+)
+
+
+def check_win(board: dict[int, str]) -> bool:
+ """Check from board, is any player won game."""
+ return any(
+ (
+ # Horizontal
+ board[1] == board[2] == board[3],
+ board[4] == board[5] == board[6],
+ board[7] == board[8] == board[9],
+ # Vertical
+ board[1] == board[4] == board[7],
+ board[2] == board[5] == board[8],
+ board[3] == board[6] == board[9],
+ # Diagonal
+ board[1] == board[5] == board[9],
+ board[3] == board[5] == board[7],
+ )
+ )
+
+
+class Player:
+ """Class that contains information about player and functions that interact with player."""
+
+ def __init__(self, user: discord.User, ctx: Context, symbol: str):
+ self.user = user
+ self.ctx = ctx
+ self.symbol = symbol
+
+ async def get_move(self, board: dict[int, str], msg: discord.Message) -> tuple[bool, Optional[int]]:
+ """
+ Get move from user.
+
+ Return is timeout reached and position of field what user will fill when timeout don't reach.
+ """
+ def check_for_move(r: discord.Reaction, u: discord.User) -> bool:
+ """Check does user who reacted is user who we want, message is board and emoji is in board values."""
+ return (
+ u.id == self.user.id
+ and msg.id == r.message.id
+ and r.emoji in board.values()
+ and r.emoji in Emojis.number_emojis.values()
+ )
+
+ try:
+ react, _ = await self.ctx.bot.wait_for("reaction_add", timeout=30.0, check=check_for_move)
+ except asyncio.TimeoutError:
+ return True, None
+ else:
+ return False, list(Emojis.number_emojis.keys())[list(Emojis.number_emojis.values()).index(react.emoji)]
+
+ def __str__(self) -> str:
+ """Return mention of user."""
+ return self.user.mention
+
+
+class AI:
+ """Tic Tac Toe AI class for against computer gaming."""
+
+ def __init__(self, symbol: str):
+ self.symbol = symbol
+
+ async def get_move(self, board: dict[int, str], _: discord.Message) -> tuple[bool, int]:
+ """Get move from AI. AI use Minimax strategy."""
+ possible_moves = [i for i, emoji in board.items() if emoji in list(Emojis.number_emojis.values())]
+
+ for symbol in (Emojis.o_square, Emojis.x_square):
+ for move in possible_moves:
+ board_copy = board.copy()
+ board_copy[move] = symbol
+ if check_win(board_copy):
+ return False, move
+
+ open_corners = [i for i in possible_moves if i in (1, 3, 7, 9)]
+ if len(open_corners) > 0:
+ return False, random.choice(open_corners)
+
+ if 5 in possible_moves:
+ return False, 5
+
+ open_edges = [i for i in possible_moves if i in (2, 4, 6, 8)]
+ return False, random.choice(open_edges)
+
+ def __str__(self) -> str:
+ """Return `AI` as user name."""
+ return "AI"
+
+
+class Game:
+ """Class that contains information and functions about Tic Tac Toe game."""
+
+ def __init__(self, players: list[Union[Player, AI]], ctx: Context):
+ self.players = players
+ self.ctx = ctx
+ self.board = {
+ 1: Emojis.number_emojis[1],
+ 2: Emojis.number_emojis[2],
+ 3: Emojis.number_emojis[3],
+ 4: Emojis.number_emojis[4],
+ 5: Emojis.number_emojis[5],
+ 6: Emojis.number_emojis[6],
+ 7: Emojis.number_emojis[7],
+ 8: Emojis.number_emojis[8],
+ 9: Emojis.number_emojis[9]
+ }
+
+ self.current = self.players[0]
+ self.next = self.players[1]
+
+ self.winner: Optional[Union[Player, AI]] = None
+ self.loser: Optional[Union[Player, AI]] = None
+ self.over = False
+ self.canceled = False
+ self.draw = False
+
+ async def get_confirmation(self) -> tuple[bool, Optional[str]]:
+ """
+ Ask does user want to play TicTacToe against requester. First player is always requester.
+
+ This return tuple that have:
+ - first element boolean (is game accepted?)
+ - (optional, only when first element is False, otherwise None) reason for declining.
+ """
+ confirm_message = await self.ctx.send(
+ CONFIRMATION_MESSAGE.format(
+ opponent=self.players[1].user.mention,
+ requester=self.players[0].user.mention
+ )
+ )
+ await confirm_message.add_reaction(Emojis.confirmation)
+ await confirm_message.add_reaction(Emojis.decline)
+
+ def confirm_check(reaction: discord.Reaction, user: discord.User) -> bool:
+ """Check is user who reacted from who this was requested, message is confirmation and emoji is valid."""
+ return (
+ reaction.emoji in (Emojis.confirmation, Emojis.decline)
+ and reaction.message.id == confirm_message.id
+ and user == self.players[1].user
+ )
+
+ try:
+ reaction, user = await self.ctx.bot.wait_for(
+ "reaction_add",
+ timeout=60.0,
+ check=confirm_check
+ )
+ except asyncio.TimeoutError:
+ self.over = True
+ self.canceled = True
+ await confirm_message.delete()
+ return False, "Running out of time... Cancelled game."
+
+ await confirm_message.delete()
+ if reaction.emoji == Emojis.confirmation:
+ return True, None
+ else:
+ self.over = True
+ self.canceled = True
+ return False, "User declined"
+
+ async def add_reactions(self, msg: discord.Message) -> None:
+ """Add number emojis to message."""
+ for nr in Emojis.number_emojis.values():
+ await msg.add_reaction(nr)
+
+ def format_board(self) -> str:
+ """Get formatted tic-tac-toe board for message."""
+ board = list(self.board.values())
+ return "\n".join(
+ (f"{board[line]} {board[line + 1]} {board[line + 2]}" for line in range(0, len(board), 3))
+ )
+
+ async def play(self) -> None:
+ """Start and handle game."""
+ await self.ctx.send("It's time for the game! Let's begin.")
+ board = await self.ctx.send(
+ embed=discord.Embed(description=self.format_board())
+ )
+ await self.add_reactions(board)
+
+ for _ in range(9):
+ if isinstance(self.current, Player):
+ announce = await self.ctx.send(
+ f"{self.current.user.mention}, it's your turn! "
+ "React with an emoji to take your go."
+ )
+ timeout, pos = await self.current.get_move(self.board, board)
+ if isinstance(self.current, Player):
+ await announce.delete()
+ if timeout:
+ await self.ctx.send(f"{self.current.user.mention} ran out of time. Canceling game.")
+ self.over = True
+ self.canceled = True
+ return
+ self.board[pos] = self.current.symbol
+ await board.edit(
+ embed=discord.Embed(description=self.format_board())
+ )
+ await board.clear_reaction(Emojis.number_emojis[pos])
+ if check_win(self.board):
+ self.winner = self.current
+ self.loser = self.next
+ await self.ctx.send(
+ f":tada: {self.current} won this game! :tada:"
+ )
+ await board.clear_reactions()
+ break
+ self.current, self.next = self.next, self.current
+ if not self.winner:
+ self.draw = True
+ await self.ctx.send("It's a DRAW!")
+ self.over = True
+
+
+def is_channel_free() -> Callable:
+ """Check is channel where command will be invoked free."""
+ async def predicate(ctx: Context) -> bool:
+ return all(game.channel != ctx.channel for game in ctx.cog.games if not game.over)
+ return check(predicate)
+
+
+def is_requester_free() -> Callable:
+ """Check is requester not already in any game."""
+ async def predicate(ctx: Context) -> bool:
+ return all(
+ ctx.author not in (player.user for player in game.players) for game in ctx.cog.games if not game.over
+ )
+ return check(predicate)
+
+
+class TicTacToe(Cog):
+ """TicTacToe cog contains tic-tac-toe game commands."""
+
+ def __init__(self):
+ self.games: list[Game] = []
+
+ @guild_only()
+ @is_channel_free()
+ @is_requester_free()
+ @group(name="tictactoe", aliases=("ttt", "tic"), invoke_without_command=True)
+ async def tic_tac_toe(self, ctx: Context, opponent: Optional[discord.User]) -> None:
+ """Tic Tac Toe game. Play against friends or AI. Use reactions to add your mark to field."""
+ if opponent == ctx.author:
+ await ctx.send("You can't play against yourself.")
+ return
+ if opponent is not None and not all(
+ opponent not in (player.user for player in g.players) for g in ctx.cog.games if not g.over
+ ):
+ await ctx.send("Opponent is already in game.")
+ return
+ if opponent is None:
+ game = Game(
+ [Player(ctx.author, ctx, Emojis.x_square), AI(Emojis.o_square)],
+ ctx
+ )
+ else:
+ game = Game(
+ [Player(ctx.author, ctx, Emojis.x_square), Player(opponent, ctx, Emojis.o_square)],
+ ctx
+ )
+ self.games.append(game)
+ if opponent is not None:
+ if opponent.bot: # check whether the opponent is a bot or not
+ await ctx.send("You can't play Tic-Tac-Toe with bots!")
+ return
+
+ confirmed, msg = await game.get_confirmation()
+
+ if not confirmed:
+ if msg:
+ await ctx.send(msg)
+ return
+ await game.play()
+
+ @tic_tac_toe.group(name="history", aliases=("log",), invoke_without_command=True)
+ async def tic_tac_toe_logs(self, ctx: Context) -> None:
+ """Show most recent tic-tac-toe games."""
+ if len(self.games) < 1:
+ await ctx.send("No recent games.")
+ return
+ log_games = []
+ for i, game in enumerate(self.games):
+ if game.over and not game.canceled:
+ if game.draw:
+ log_games.append(
+ f"**#{i+1}**: {game.players[0]} vs {game.players[1]} (draw)"
+ )
+ else:
+ log_games.append(
+ f"**#{i+1}**: {game.winner} :trophy: vs {game.loser}"
+ )
+ await LinePaginator.paginate(
+ log_games,
+ ctx,
+ discord.Embed(title="Most recent Tic Tac Toe games")
+ )
+
+ @tic_tac_toe_logs.command(name="show", aliases=("s",))
+ async def show_tic_tac_toe_board(self, ctx: Context, game_id: int) -> None:
+ """View game board by ID (ID is possible to get by `.tictactoe history`)."""
+ if len(self.games) < game_id:
+ await ctx.send("Game don't exist.")
+ return
+ game = self.games[game_id - 1]
+
+ if game.draw:
+ description = f"{game.players[0]} vs {game.players[1]} (draw)\n\n{game.format_board()}"
+ else:
+ description = f"{game.winner} :trophy: vs {game.loser}\n\n{game.format_board()}"
+
+ embed = discord.Embed(
+ title=f"Match #{game_id} Game Board",
+ description=description,
+ )
+ await ctx.send(embed=embed)
+
+
+def setup(bot: Bot) -> None:
+ """Load the TicTacToe cog."""
+ bot.add_cog(TicTacToe())
diff --git a/bot/exts/fun/trivia_quiz.py b/bot/exts/fun/trivia_quiz.py
new file mode 100644
index 00000000..cf9e6cd3
--- /dev/null
+++ b/bot/exts/fun/trivia_quiz.py
@@ -0,0 +1,593 @@
+import asyncio
+import json
+import logging
+import operator
+import random
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Callable, Optional
+
+import discord
+from discord.ext import commands
+from rapidfuzz import fuzz
+
+from bot.bot import Bot
+from bot.constants import Colours, NEGATIVE_REPLIES, Roles
+
+logger = logging.getLogger(__name__)
+
+DEFAULT_QUESTION_LIMIT = 6
+STANDARD_VARIATION_TOLERANCE = 88
+DYNAMICALLY_GEN_VARIATION_TOLERANCE = 97
+
+WRONG_ANS_RESPONSE = [
+ "No one answered correctly!",
+ "Better luck next time...",
+]
+
+N_PREFIX_STARTS_AT = 5
+N_PREFIXES = [
+ "penta", "hexa", "hepta", "octa", "nona",
+ "deca", "hendeca", "dodeca", "trideca", "tetradeca",
+]
+
+PLANETS = [
+ ("1st", "Mercury"),
+ ("2nd", "Venus"),
+ ("3rd", "Earth"),
+ ("4th", "Mars"),
+ ("5th", "Jupiter"),
+ ("6th", "Saturn"),
+ ("7th", "Uranus"),
+ ("8th", "Neptune"),
+]
+
+TAXONOMIC_HIERARCHY = [
+ "species", "genus", "family", "order",
+ "class", "phylum", "kingdom", "domain",
+]
+
+UNITS_TO_BASE_UNITS = {
+ "hertz": ("(unit of frequency)", "s^-1"),
+ "newton": ("(unit of force)", "m*kg*s^-2"),
+ "pascal": ("(unit of pressure & stress)", "m^-1*kg*s^-2"),
+ "joule": ("(unit of energy & quantity of heat)", "m^2*kg*s^-2"),
+ "watt": ("(unit of power)", "m^2*kg*s^-3"),
+ "coulomb": ("(unit of electric charge & quantity of electricity)", "s*A"),
+ "volt": ("(unit of voltage & electromotive force)", "m^2*kg*s^-3*A^-1"),
+ "farad": ("(unit of capacitance)", "m^-2*kg^-1*s^4*A^2"),
+ "ohm": ("(unit of electric resistance)", "m^2*kg*s^-3*A^-2"),
+ "weber": ("(unit of magnetic flux)", "m^2*kg*s^-2*A^-1"),
+ "tesla": ("(unit of magnetic flux density)", "kg*s^-2*A^-1"),
+}
+
+
+@dataclass(frozen=True)
+class QuizEntry:
+ """Dataclass for a quiz entry (a question and a string containing answers separated by commas)."""
+
+ question: str
+ answer: str
+
+
+def linear_system(q_format: str, a_format: str) -> QuizEntry:
+ """Generate a system of linear equations with two unknowns."""
+ x, y = random.randint(2, 5), random.randint(2, 5)
+ answer = a_format.format(x, y)
+
+ coeffs = random.sample(range(1, 6), 4)
+
+ question = q_format.format(
+ coeffs[0],
+ coeffs[1],
+ coeffs[0] * x + coeffs[1] * y,
+ coeffs[2],
+ coeffs[3],
+ coeffs[2] * x + coeffs[3] * y,
+ )
+
+ return QuizEntry(question, answer)
+
+
+def mod_arith(q_format: str, a_format: str) -> QuizEntry:
+ """Generate a basic modular arithmetic question."""
+ quotient, m, b = random.randint(30, 40), random.randint(10, 20), random.randint(200, 350)
+ ans = random.randint(0, 9) # max remainder is 9, since the minimum modulus is 10
+ a = quotient * m + ans - b
+
+ question = q_format.format(a, b, m)
+ answer = a_format.format(ans)
+
+ return QuizEntry(question, answer)
+
+
+def ngonal_prism(q_format: str, a_format: str) -> QuizEntry:
+ """Generate a question regarding vertices on n-gonal prisms."""
+ n = random.randint(0, len(N_PREFIXES) - 1)
+
+ question = q_format.format(N_PREFIXES[n])
+ answer = a_format.format((n + N_PREFIX_STARTS_AT) * 2)
+
+ return QuizEntry(question, answer)
+
+
+def imag_sqrt(q_format: str, a_format: str) -> QuizEntry:
+ """Generate a negative square root question."""
+ ans_coeff = random.randint(3, 10)
+
+ question = q_format.format(ans_coeff ** 2)
+ answer = a_format.format(ans_coeff)
+
+ return QuizEntry(question, answer)
+
+
+def binary_calc(q_format: str, a_format: str) -> QuizEntry:
+ """Generate a binary calculation question."""
+ a = random.randint(15, 20)
+ b = random.randint(10, a)
+ oper = random.choice(
+ (
+ ("+", operator.add),
+ ("-", operator.sub),
+ ("*", operator.mul),
+ )
+ )
+
+ # if the operator is multiplication, lower the values of the two operands to make it easier
+ if oper[0] == "*":
+ a -= 5
+ b -= 5
+
+ question = q_format.format(a, oper[0], b)
+ answer = a_format.format(oper[1](a, b))
+
+ return QuizEntry(question, answer)
+
+
+def solar_system(q_format: str, a_format: str) -> QuizEntry:
+ """Generate a question on the planets of the Solar System."""
+ planet = random.choice(PLANETS)
+
+ question = q_format.format(planet[0])
+ answer = a_format.format(planet[1])
+
+ return QuizEntry(question, answer)
+
+
+def taxonomic_rank(q_format: str, a_format: str) -> QuizEntry:
+ """Generate a question on taxonomic classification."""
+ level = random.randint(0, len(TAXONOMIC_HIERARCHY) - 2)
+
+ question = q_format.format(TAXONOMIC_HIERARCHY[level])
+ answer = a_format.format(TAXONOMIC_HIERARCHY[level + 1])
+
+ return QuizEntry(question, answer)
+
+
+def base_units_convert(q_format: str, a_format: str) -> QuizEntry:
+ """Generate a SI base units conversion question."""
+ unit = random.choice(list(UNITS_TO_BASE_UNITS))
+
+ question = q_format.format(
+ unit + " " + UNITS_TO_BASE_UNITS[unit][0]
+ )
+ answer = a_format.format(
+ UNITS_TO_BASE_UNITS[unit][1]
+ )
+
+ return QuizEntry(question, answer)
+
+
+DYNAMIC_QUESTIONS_FORMAT_FUNCS = {
+ 201: linear_system,
+ 202: mod_arith,
+ 203: ngonal_prism,
+ 204: imag_sqrt,
+ 205: binary_calc,
+ 301: solar_system,
+ 302: taxonomic_rank,
+ 303: base_units_convert,
+}
+
+
+class TriviaQuiz(commands.Cog):
+ """A cog for all quiz commands."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ self.game_status = {} # A variable to store the game status: either running or not running.
+ self.game_owners = {} # A variable to store the person's ID who started the quiz game in a channel.
+
+ self.questions = self.load_questions()
+ self.question_limit = 0
+
+ self.player_scores = {} # A variable to store all player's scores for a bot session.
+ self.game_player_scores = {} # A variable to store temporary game player's scores.
+
+ self.categories = {
+ "general": "Test your general knowledge.",
+ "retro": "Questions related to retro gaming.",
+ "math": "General questions about mathematics ranging from grade 8 to grade 12.",
+ "science": "Put your understanding of science to the test!",
+ "cs": "A large variety of computer science questions.",
+ "python": "Trivia on our amazing language, Python!",
+ }
+
+ @staticmethod
+ def load_questions() -> dict:
+ """Load the questions from the JSON file."""
+ p = Path("bot", "resources", "fun", "trivia_quiz.json")
+
+ return json.loads(p.read_text(encoding="utf-8"))
+
+ @commands.group(name="quiz", aliases=["trivia"], invoke_without_command=True)
+ async def quiz_game(self, ctx: commands.Context, category: Optional[str], questions: Optional[int]) -> None:
+ """
+ Start a quiz!
+
+ Questions for the quiz can be selected from the following categories:
+ - general: Test your general knowledge.
+ - retro: Questions related to retro gaming.
+ - math: General questions about mathematics ranging from grade 8 to grade 12.
+ - science: Put your understanding of science to the test!
+ - cs: A large variety of computer science questions.
+ - python: Trivia on our amazing language, Python!
+
+ (More to come!)
+ """
+ if ctx.channel.id not in self.game_status:
+ self.game_status[ctx.channel.id] = False
+
+ if ctx.channel.id not in self.game_player_scores:
+ self.game_player_scores[ctx.channel.id] = {}
+
+ # Stop game if running.
+ if self.game_status[ctx.channel.id]:
+ await ctx.send(
+ "Game is already running... "
+ f"do `{self.bot.command_prefix}quiz stop`"
+ )
+ return
+
+ # Send embed showing available categories if inputted category is invalid.
+ if category is None:
+ category = random.choice(list(self.categories))
+
+ category = category.lower()
+ if category not in self.categories:
+ embed = self.category_embed()
+ await ctx.send(embed=embed)
+ return
+
+ topic = self.questions[category]
+ topic_length = len(topic)
+
+ if questions is None:
+ self.question_limit = DEFAULT_QUESTION_LIMIT
+ else:
+ if questions > topic_length:
+ await ctx.send(
+ embed=self.make_error_embed(
+ f"This category only has {topic_length} questions. "
+ "Please input a lower value!"
+ )
+ )
+ return
+
+ elif questions < 1:
+ await ctx.send(
+ embed=self.make_error_embed(
+ "You must choose to complete at least one question. "
+ f"(or enter nothing for the default value of {DEFAULT_QUESTION_LIMIT + 1} questions)"
+ )
+ )
+ return
+
+ else:
+ self.question_limit = questions - 1
+
+ # Start game if not running.
+ if not self.game_status[ctx.channel.id]:
+ self.game_owners[ctx.channel.id] = ctx.author
+ self.game_status[ctx.channel.id] = True
+ start_embed = self.make_start_embed(category)
+
+ await ctx.send(embed=start_embed) # send an embed with the rules
+ await asyncio.sleep(5)
+
+ done_question = []
+ hint_no = 0
+ answers = None
+
+ while self.game_status[ctx.channel.id]:
+ # Exit quiz if number of questions for a round are already sent.
+ if len(done_question) > self.question_limit and hint_no == 0:
+ await ctx.send("The round has ended.")
+ await self.declare_winner(ctx.channel, self.game_player_scores[ctx.channel.id])
+
+ self.game_status[ctx.channel.id] = False
+ del self.game_owners[ctx.channel.id]
+ self.game_player_scores[ctx.channel.id] = {}
+
+ break
+
+ # If no hint has been sent or any time alert. Basically if hint_no = 0 means it is a new question.
+ if hint_no == 0:
+ # Select a random question which has not been used yet.
+ while True:
+ question_dict = random.choice(topic)
+ if question_dict["id"] not in done_question:
+ done_question.append(question_dict["id"])
+ break
+
+ if "dynamic_id" not in question_dict:
+ question = question_dict["question"]
+ answers = question_dict["answer"].split(", ")
+
+ var_tol = STANDARD_VARIATION_TOLERANCE
+ else:
+ format_func = DYNAMIC_QUESTIONS_FORMAT_FUNCS[question_dict["dynamic_id"]]
+
+ quiz_entry = format_func(
+ question_dict["question"],
+ question_dict["answer"],
+ )
+
+ question, answers = quiz_entry.question, quiz_entry.answer
+ answers = [answers]
+
+ var_tol = DYNAMICALLY_GEN_VARIATION_TOLERANCE
+
+ embed = discord.Embed(
+ colour=Colours.gold,
+ title=f"Question #{len(done_question)}",
+ description=question,
+ )
+
+ if img_url := question_dict.get("img_url"):
+ embed.set_image(url=img_url)
+
+ await ctx.send(embed=embed)
+
+ def check_func(variation_tolerance: int) -> Callable[[discord.Message], bool]:
+ def contains_correct_answer(m: discord.Message) -> bool:
+ return m.channel == ctx.channel and any(
+ fuzz.ratio(answer.lower(), m.content.lower()) > variation_tolerance
+ for answer in answers
+ )
+
+ return contains_correct_answer
+
+ try:
+ msg = await self.bot.wait_for("message", check=check_func(var_tol), timeout=10)
+ except asyncio.TimeoutError:
+ # In case of TimeoutError and the game has been stopped, then do nothing.
+ if not self.game_status[ctx.channel.id]:
+ break
+
+ if hint_no < 2:
+ hint_no += 1
+
+ if "hints" in question_dict:
+ hints = question_dict["hints"]
+
+ await ctx.send(f"**Hint #{hint_no}\n**{hints[hint_no - 1]}")
+ else:
+ await ctx.send(f"{30 - hint_no * 10}s left!")
+
+ # Once hint or time alerts has been sent 2 times, the hint_no value will be 3
+ # If hint_no > 2, then it means that all hints/time alerts have been sent.
+ # Also means that the answer is not yet given and the bot sends the answer and the next question.
+ else:
+ if self.game_status[ctx.channel.id] is False:
+ break
+
+ response = random.choice(WRONG_ANS_RESPONSE)
+ await ctx.send(response)
+
+ await self.send_answer(
+ ctx.channel,
+ answers,
+ False,
+ question_dict,
+ self.question_limit - len(done_question) + 1,
+ )
+ await asyncio.sleep(1)
+
+ hint_no = 0 # Reset the hint counter so that on the next round, it's in the initial state
+
+ await self.send_score(ctx.channel, self.game_player_scores[ctx.channel.id])
+ await asyncio.sleep(2)
+ else:
+ if self.game_status[ctx.channel.id] is False:
+ break
+
+ points = 100 - 25 * hint_no
+ if msg.author in self.game_player_scores[ctx.channel.id]:
+ self.game_player_scores[ctx.channel.id][msg.author] += points
+ else:
+ self.game_player_scores[ctx.channel.id][msg.author] = points
+
+ # Also updating the overall scoreboard.
+ if msg.author in self.player_scores:
+ self.player_scores[msg.author] += points
+ else:
+ self.player_scores[msg.author] = points
+
+ hint_no = 0
+
+ await ctx.send(f"{msg.author.mention} got the correct answer :tada: {points} points!")
+
+ await self.send_answer(
+ ctx.channel,
+ answers,
+ True,
+ question_dict,
+ self.question_limit - len(done_question) + 1,
+ )
+ await self.send_score(ctx.channel, self.game_player_scores[ctx.channel.id])
+
+ await asyncio.sleep(2)
+
+ def make_start_embed(self, category: str) -> discord.Embed:
+ """Generate a starting/introduction embed for the quiz."""
+ start_embed = discord.Embed(
+ colour=Colours.blue,
+ title="A quiz game is starting!",
+ description=(
+ f"This game consists of {self.question_limit + 1} questions.\n\n"
+ "**Rules: **\n"
+ "1. Only enclose your answer in backticks when the question tells you to.\n"
+ "2. If the question specifies an answer format, follow it or else it won't be accepted.\n"
+ "3. You have 30s per question. Points for each question reduces by 25 after 10s or after a hint.\n"
+ "4. No cheating and have fun!\n\n"
+ f"**Category**: {category}"
+ ),
+ )
+
+ return start_embed
+
+ @staticmethod
+ def make_error_embed(desc: str) -> discord.Embed:
+ """Generate an error embed with the given description."""
+ error_embed = discord.Embed(
+ colour=Colours.soft_red,
+ title=random.choice(NEGATIVE_REPLIES),
+ description=desc,
+ )
+
+ return error_embed
+
+ @quiz_game.command(name="stop")
+ async def stop_quiz(self, ctx: commands.Context) -> None:
+ """
+ Stop a quiz game if its running in the channel.
+
+ Note: Only mods or the owner of the quiz can stop it.
+ """
+ try:
+ if self.game_status[ctx.channel.id]:
+ # Check if the author is the game starter or a moderator.
+ if ctx.author == self.game_owners[ctx.channel.id] or any(
+ Roles.moderator == role.id for role in ctx.author.roles
+ ):
+ self.game_status[ctx.channel.id] = False
+ del self.game_owners[ctx.channel.id]
+ self.game_player_scores[ctx.channel.id] = {}
+
+ await ctx.send("Quiz stopped.")
+ await self.declare_winner(ctx.channel, self.game_player_scores[ctx.channel.id])
+
+ else:
+ await ctx.send(f"{ctx.author.mention}, you are not authorised to stop this game :ghost:!")
+ else:
+ await ctx.send("No quiz running.")
+ except KeyError:
+ await ctx.send("No quiz running.")
+
+ @quiz_game.command(name="leaderboard")
+ async def leaderboard(self, ctx: commands.Context) -> None:
+ """View everyone's score for this bot session."""
+ await self.send_score(ctx.channel, self.player_scores)
+
+ @staticmethod
+ async def send_score(channel: discord.TextChannel, player_data: dict) -> None:
+ """Send the current scores of players in the game channel."""
+ if len(player_data) == 0:
+ await channel.send("No one has made it onto the leaderboard yet.")
+ return
+
+ embed = discord.Embed(
+ colour=Colours.blue,
+ title="Score Board",
+ description="",
+ )
+
+ sorted_dict = sorted(player_data.items(), key=operator.itemgetter(1), reverse=True)
+ for item in sorted_dict:
+ embed.description += f"{item[0]}: {item[1]}\n"
+
+ await channel.send(embed=embed)
+
+ @staticmethod
+ async def declare_winner(channel: discord.TextChannel, player_data: dict) -> None:
+ """Announce the winner of the quiz in the game channel."""
+ if player_data:
+ highest_points = max(list(player_data.values()))
+ no_of_winners = list(player_data.values()).count(highest_points)
+
+ # Check if more than 1 player has highest points.
+ if no_of_winners > 1:
+ winners = []
+ points_copy = list(player_data.values()).copy()
+
+ for _ in range(no_of_winners):
+ index = points_copy.index(highest_points)
+ winners.append(list(player_data.keys())[index])
+ points_copy[index] = 0
+
+ winners_mention = " ".join(winner.mention for winner in winners)
+ else:
+ author_index = list(player_data.values()).index(highest_points)
+ winner = list(player_data.keys())[author_index]
+ winners_mention = winner.mention
+
+ await channel.send(
+ f"Congratulations {winners_mention} :tada: "
+ f"You have won this quiz game with a grand total of {highest_points} points!"
+ )
+
+ def category_embed(self) -> discord.Embed:
+ """Build an embed showing all available trivia categories."""
+ embed = discord.Embed(
+ colour=Colours.blue,
+ title="The available question categories are:",
+ description="",
+ )
+
+ embed.set_footer(text="If a category is not chosen, a random one will be selected.")
+
+ for cat, description in self.categories.items():
+ embed.description += (
+ f"**- {cat.capitalize()}**\n"
+ f"{description.capitalize()}\n"
+ )
+
+ return embed
+
+ @staticmethod
+ async def send_answer(
+ channel: discord.TextChannel,
+ answers: list[str],
+ answer_is_correct: bool,
+ question_dict: dict,
+ q_left: int,
+ ) -> None:
+ """Send the correct answer of a question to the game channel."""
+ info = question_dict.get("info")
+
+ plurality = " is" if len(answers) == 1 else "s are"
+
+ embed = discord.Embed(
+ color=Colours.bright_green,
+ title=(
+ ("You got it! " if answer_is_correct else "")
+ + f"The correct answer{plurality} **`{', '.join(answers)}`**\n"
+ ),
+ description="",
+ )
+
+ if info is not None:
+ embed.description += f"**Information**\n{info}\n\n"
+
+ embed.description += (
+ ("Let's move to the next question." if q_left > 0 else "")
+ + f"\nRemaining questions: {q_left}"
+ )
+ await channel.send(embed=embed)
+
+
+def setup(bot: Bot) -> None:
+ """Load the TriviaQuiz cog."""
+ bot.add_cog(TriviaQuiz(bot))
diff --git a/bot/exts/fun/wonder_twins.py b/bot/exts/fun/wonder_twins.py
new file mode 100644
index 00000000..79d6b6d9
--- /dev/null
+++ b/bot/exts/fun/wonder_twins.py
@@ -0,0 +1,49 @@
+import random
+from pathlib import Path
+
+import yaml
+from discord.ext.commands import Cog, Context, command
+
+from bot.bot import Bot
+
+
+class WonderTwins(Cog):
+ """Cog for a Wonder Twins inspired command."""
+
+ def __init__(self):
+ with open(Path.cwd() / "bot" / "resources" / "fun" / "wonder_twins.yaml", "r", encoding="utf-8") as f:
+ info = yaml.load(f, Loader=yaml.FullLoader)
+ self.water_types = info["water_types"]
+ self.objects = info["objects"]
+ self.adjectives = info["adjectives"]
+
+ @staticmethod
+ def append_onto(phrase: str, insert_word: str) -> str:
+ """Appends one word onto the end of another phrase in order to format with the proper determiner."""
+ if insert_word.endswith("s"):
+ phrase = phrase.split()
+ del phrase[0]
+ phrase = " ".join(phrase)
+
+ insert_word = insert_word.split()[-1]
+ return " ".join([phrase, insert_word])
+
+ def format_phrase(self) -> str:
+ """Creates a transformation phrase from available words."""
+ adjective = random.choice((None, random.choice(self.adjectives)))
+ object_name = random.choice(self.objects)
+ water_type = random.choice(self.water_types)
+
+ if adjective:
+ object_name = self.append_onto(adjective, object_name)
+ return f"{object_name} of {water_type}"
+
+ @command(name="formof", aliases=("wondertwins", "wondertwin", "fo"))
+ async def form_of(self, ctx: Context) -> None:
+ """Command to send a Wonder Twins inspired phrase to the user invoking the command."""
+ await ctx.send(f"Form of {self.format_phrase()}!")
+
+
+def setup(bot: Bot) -> None:
+ """Load the WonderTwins cog."""
+ bot.add_cog(WonderTwins())
diff --git a/bot/exts/fun/xkcd.py b/bot/exts/fun/xkcd.py
new file mode 100644
index 00000000..b56c53d9
--- /dev/null
+++ b/bot/exts/fun/xkcd.py
@@ -0,0 +1,91 @@
+import logging
+import re
+from random import randint
+from typing import Optional, Union
+
+from discord import Embed
+from discord.ext import tasks
+from discord.ext.commands import Cog, Context, command
+
+from bot.bot import Bot
+from bot.constants import Colours
+
+log = logging.getLogger(__name__)
+
+COMIC_FORMAT = re.compile(r"latest|[0-9]+")
+BASE_URL = "https://xkcd.com"
+
+
+class XKCD(Cog):
+ """Retrieving XKCD comics."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+ self.latest_comic_info: dict[str, Union[str, int]] = {}
+ self.get_latest_comic_info.start()
+
+ def cog_unload(self) -> None:
+ """Cancels refreshing of the task for refreshing the most recent comic info."""
+ self.get_latest_comic_info.cancel()
+
+ @tasks.loop(minutes=30)
+ async def get_latest_comic_info(self) -> None:
+ """Refreshes latest comic's information ever 30 minutes. Also used for finding a random comic."""
+ async with self.bot.http_session.get(f"{BASE_URL}/info.0.json") as resp:
+ if resp.status == 200:
+ self.latest_comic_info = await resp.json()
+ else:
+ log.debug(f"Failed to get latest XKCD comic information. Status code {resp.status}")
+
+ @command(name="xkcd")
+ async def fetch_xkcd_comics(self, ctx: Context, comic: Optional[str]) -> None:
+ """
+ Getting an xkcd comic's information along with the image.
+
+ To get a random comic, don't type any number as an argument. To get the latest, type 'latest'.
+ """
+ embed = Embed(title=f"XKCD comic '{comic}'")
+
+ embed.colour = Colours.soft_red
+
+ if comic and (comic := re.match(COMIC_FORMAT, comic)) is None:
+ embed.description = "Comic parameter should either be an integer or 'latest'."
+ await ctx.send(embed=embed)
+ return
+
+ comic = randint(1, self.latest_comic_info["num"]) if comic is None else comic.group(0)
+
+ if comic == "latest":
+ info = self.latest_comic_info
+ else:
+ async with self.bot.http_session.get(f"{BASE_URL}/{comic}/info.0.json") as resp:
+ if resp.status == 200:
+ info = await resp.json()
+ else:
+ embed.title = f"XKCD comic #{comic}"
+ embed.description = f"{resp.status}: Could not retrieve xkcd comic #{comic}."
+ log.debug(f"Retrieving xkcd comic #{comic} failed with status code {resp.status}.")
+ await ctx.send(embed=embed)
+ return
+
+ embed.title = f"XKCD comic #{info['num']}"
+ embed.description = info["alt"]
+ embed.url = f"{BASE_URL}/{info['num']}"
+
+ if info["img"][-3:] in ("jpg", "png", "gif"):
+ embed.set_image(url=info["img"])
+ date = f"{info['year']}/{info['month']}/{info['day']}"
+ embed.set_footer(text=f"{date} - #{info['num']}, \'{info['safe_title']}\'")
+ embed.colour = Colours.soft_green
+ else:
+ embed.description = (
+ "The selected comic is interactive, and cannot be displayed within an embed.\n"
+ f"Comic can be viewed [here](https://xkcd.com/{info['num']})."
+ )
+
+ await ctx.send(embed=embed)
+
+
+def setup(bot: Bot) -> None:
+ """Load the XKCD cog."""
+ bot.add_cog(XKCD(bot))