aboutsummaryrefslogtreecommitdiffstats
path: root/bot/exts/evergreen
diff options
context:
space:
mode:
Diffstat (limited to 'bot/exts/evergreen')
-rw-r--r--bot/exts/evergreen/8bitify.py54
-rw-r--r--bot/exts/evergreen/__init__.py0
-rw-r--r--bot/exts/evergreen/battleship.py443
-rw-r--r--bot/exts/evergreen/bookmark.py64
-rw-r--r--bot/exts/evergreen/branding.py543
-rw-r--r--bot/exts/evergreen/error_handler.py129
-rw-r--r--bot/exts/evergreen/fun.py147
-rw-r--r--bot/exts/evergreen/game.py424
-rw-r--r--bot/exts/evergreen/help.py552
-rw-r--r--bot/exts/evergreen/issues.py76
-rw-r--r--bot/exts/evergreen/magic_8ball.py31
-rw-r--r--bot/exts/evergreen/minesweeper.py284
-rw-r--r--bot/exts/evergreen/movie.py198
-rw-r--r--bot/exts/evergreen/recommend_game.py50
-rw-r--r--bot/exts/evergreen/reddit.py128
-rw-r--r--bot/exts/evergreen/showprojects.py33
-rw-r--r--bot/exts/evergreen/snakes/__init__.py12
-rw-r--r--bot/exts/evergreen/snakes/converter.py85
-rw-r--r--bot/exts/evergreen/snakes/snakes_cog.py1149
-rw-r--r--bot/exts/evergreen/snakes/utils.py716
-rw-r--r--bot/exts/evergreen/space.py240
-rw-r--r--bot/exts/evergreen/speedrun.py27
-rw-r--r--bot/exts/evergreen/trivia_quiz.py302
-rw-r--r--bot/exts/evergreen/uptime.py33
24 files changed, 5720 insertions, 0 deletions
diff --git a/bot/exts/evergreen/8bitify.py b/bot/exts/evergreen/8bitify.py
new file mode 100644
index 00000000..60062fc1
--- /dev/null
+++ b/bot/exts/evergreen/8bitify.py
@@ -0,0 +1,54 @@
+from io import BytesIO
+
+import discord
+from PIL import Image
+from discord.ext import commands
+
+
+class EightBitify(commands.Cog):
+ """Make your avatar 8bit!"""
+
+ def __init__(self, bot: commands.Bot) -> None:
+ self.bot = bot
+
+ @staticmethod
+ def pixelate(image: Image) -> Image:
+ """Takes an image and pixelates it."""
+ return image.resize((32, 32)).resize((1024, 1024))
+
+ @staticmethod
+ def quantize(image: Image) -> Image:
+ """Reduces colour palette to 256 colours."""
+ return image.quantize(colors=32)
+
+ @commands.command(name="8bitify")
+ async def eightbit_command(self, ctx: commands.Context) -> None:
+ """Pixelates your avatar and changes the palette to an 8bit one."""
+ async with ctx.typing():
+ image_bytes = await ctx.author.avatar_url.read()
+ avatar = Image.open(BytesIO(image_bytes))
+ avatar = avatar.convert("RGBA").resize((1024, 1024))
+
+ eightbit = self.pixelate(avatar)
+ eightbit = self.quantize(eightbit)
+
+ bufferedio = BytesIO()
+ eightbit.save(bufferedio, format="PNG")
+ bufferedio.seek(0)
+
+ file = discord.File(bufferedio, filename="8bitavatar.png")
+
+ embed = discord.Embed(
+ title="Your 8-bit avatar",
+ description='Here is your avatar. I think it looks all cool and "retro"'
+ )
+
+ embed.set_image(url="attachment://8bitavatar.png")
+ embed.set_footer(text=f"Made by {ctx.author.display_name}", icon_url=ctx.author.avatar_url)
+
+ await ctx.send(file=file, embed=embed)
+
+
+def setup(bot: commands.Bot) -> None:
+ """Cog load."""
+ bot.add_cog(EightBitify(bot))
diff --git a/bot/exts/evergreen/__init__.py b/bot/exts/evergreen/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/bot/exts/evergreen/__init__.py
diff --git a/bot/exts/evergreen/battleship.py b/bot/exts/evergreen/battleship.py
new file mode 100644
index 00000000..9bc374e6
--- /dev/null
+++ b/bot/exts/evergreen/battleship.py
@@ -0,0 +1,443 @@
+import asyncio
+import logging
+import random
+import re
+import typing
+from dataclasses import dataclass
+from functools import partial
+
+import discord
+from discord.ext import commands
+
+from bot.constants import Colours
+
+log = logging.getLogger(__name__)
+
+
+@dataclass
+class Square:
+ """Each square on the battleship grid - if they contain a boat and if they've been aimed at."""
+
+ boat: typing.Optional[str]
+ aimed: bool
+
+
+Grid = typing.List[typing.List[Square]]
+EmojiSet = typing.Dict[typing.Tuple[bool, bool], str]
+
+
+@dataclass
+class Player:
+ """Each player in the game - their messages for the boards and their current grid."""
+
+ user: discord.Member
+ board: discord.Message
+ opponent_board: discord.Message
+ grid: Grid
+
+
+# The name of the ship and its size
+SHIPS = {
+ "Carrier": 5,
+ "Battleship": 4,
+ "Cruiser": 3,
+ "Submarine": 3,
+ "Destroyer": 2,
+}
+
+
+# For these two variables, the first boolean is whether the square is a ship (True) or not (False).
+# The second boolean is whether the player has aimed for that square (True) or not (False)
+
+# This is for the player's own board which shows the location of their own ships.
+SHIP_EMOJIS = {
+ (True, True): ":fire:",
+ (True, False): ":ship:",
+ (False, True): ":anger:",
+ (False, False): ":ocean:",
+}
+
+# This is for the opposing player's board which only shows aimed locations.
+HIDDEN_EMOJIS = {
+ (True, True): ":red_circle:",
+ (True, False): ":black_circle:",
+ (False, True): ":white_circle:",
+ (False, False): ":black_circle:",
+}
+
+# For the top row of the board
+LETTERS = (
+ ":stop_button::regional_indicator_a::regional_indicator_b::regional_indicator_c::regional_indicator_d:"
+ ":regional_indicator_e::regional_indicator_f::regional_indicator_g::regional_indicator_h:"
+ ":regional_indicator_i::regional_indicator_j:"
+)
+
+# For the first column of the board
+NUMBERS = [
+ ":one:",
+ ":two:",
+ ":three:",
+ ":four:",
+ ":five:",
+ ":six:",
+ ":seven:",
+ ":eight:",
+ ":nine:",
+ ":keycap_ten:",
+]
+
+CROSS_EMOJI = "\u274e"
+HAND_RAISED_EMOJI = "\U0001f64b"
+
+
+class Game:
+ """A Battleship Game."""
+
+ def __init__(
+ self,
+ bot: commands.Bot,
+ channel: discord.TextChannel,
+ player1: discord.Member,
+ player2: discord.Member
+ ) -> None:
+
+ self.bot = bot
+ self.public_channel = channel
+
+ self.p1 = Player(player1, None, None, self.generate_grid())
+ self.p2 = Player(player2, None, None, self.generate_grid())
+
+ self.gameover: bool = False
+
+ self.turn: typing.Optional[discord.Member] = None
+ self.next: typing.Optional[discord.Member] = None
+
+ self.match: typing.Optional[typing.Match] = None
+ self.surrender: bool = False
+
+ self.setup_grids()
+
+ @staticmethod
+ def generate_grid() -> Grid:
+ """Generates a grid by instantiating the Squares."""
+ return [[Square(None, False) for _ in range(10)] for _ in range(10)]
+
+ @staticmethod
+ def format_grid(player: Player, emojiset: EmojiSet) -> str:
+ """
+ Gets and formats the grid as a list into a string to be output to the DM.
+
+ Also adds the Letter and Number indexes.
+ """
+ grid = [
+ [emojiset[bool(square.boat), square.aimed] for square in row]
+ for row in player.grid
+ ]
+
+ rows = ["".join([number] + row) for number, row in zip(NUMBERS, grid)]
+ return "\n".join([LETTERS] + rows)
+
+ @staticmethod
+ def get_square(grid: Grid, square: str) -> Square:
+ """Grabs a square from a grid with an inputted key."""
+ index = ord(square[0]) - ord("A")
+ number = int(square[1:])
+
+ return grid[number-1][index] # -1 since lists are indexed from 0
+
+ async def game_over(
+ self,
+ *,
+ winner: discord.Member,
+ loser: discord.Member
+ ) -> None:
+ """Removes games from list of current games and announces to public chat."""
+ await self.public_channel.send(f"Game Over! {winner.mention} won against {loser.mention}")
+
+ for player in (self.p1, self.p2):
+ grid = self.format_grid(player, SHIP_EMOJIS)
+ await self.public_channel.send(f"{player.user}'s Board:\n{grid}")
+
+ @staticmethod
+ def check_sink(grid: Grid, boat: str) -> bool:
+ """Checks if all squares containing a given boat have sunk."""
+ return all(square.aimed for row in grid for square in row if square.boat == boat)
+
+ @staticmethod
+ def check_gameover(grid: Grid) -> bool:
+ """Checks if all boats have been sunk."""
+ return all(square.aimed for row in grid for square in row if square.boat)
+
+ def setup_grids(self) -> None:
+ """Places the boats on the grids to initialise the game."""
+ for player in (self.p1, self.p2):
+ for name, size in SHIPS.items():
+ while True: # Repeats if about to overwrite another boat
+ ship_collision = False
+ coords = []
+
+ coord1 = random.randint(0, 9)
+ coord2 = random.randint(0, 10 - size)
+
+ if random.choice((True, False)): # Vertical or Horizontal
+ x, y = coord1, coord2
+ xincr, yincr = 0, 1
+ else:
+ x, y = coord2, coord1
+ xincr, yincr = 1, 0
+
+ for i in range(size):
+ new_x = x + (xincr * i)
+ new_y = y + (yincr * i)
+ if player.grid[new_x][new_y].boat: # Check if there's already a boat
+ ship_collision = True
+ break
+ coords.append((new_x, new_y))
+ if not ship_collision: # If not overwriting any other boat spaces, break loop
+ break
+
+ for x, y in coords:
+ player.grid[x][y].boat = name
+
+ async def print_grids(self) -> None:
+ """Prints grids to the DM channels."""
+ # Convert squares into Emoji
+
+ boards = [
+ self.format_grid(player, emojiset)
+ for emojiset in (HIDDEN_EMOJIS, SHIP_EMOJIS)
+ for player in (self.p1, self.p2)
+ ]
+
+ locations = (
+ (self.p2, "opponent_board"), (self.p1, "opponent_board"),
+ (self.p1, "board"), (self.p2, "board")
+ )
+
+ for board, location in zip(boards, locations):
+ player, attr = location
+ if getattr(player, attr):
+ await getattr(player, attr).edit(content=board)
+ else:
+ setattr(player, attr, await player.user.send(board))
+
+ def predicate(self, message: discord.Message) -> bool:
+ """Predicate checking the message typed for each turn."""
+ if message.author == self.turn.user and message.channel == self.turn.user.dm_channel:
+ if message.content.lower() == "surrender":
+ self.surrender = True
+ return True
+ self.match = re.match("([A-J]|[a-j]) ?((10)|[1-9])", message.content.strip())
+ if not self.match:
+ self.bot.loop.create_task(message.add_reaction(CROSS_EMOJI))
+ return bool(self.match)
+
+ async def take_turn(self) -> typing.Optional[Square]:
+ """Lets the player who's turn it is choose a square."""
+ square = None
+ turn_message = await self.turn.user.send(
+ "It's your turn! Type the square you want to fire at. Format it like this: A1\n"
+ "Type `surrender` to give up"
+ )
+ await self.next.user.send("Their turn", delete_after=3.0)
+ while True:
+ try:
+ await self.bot.wait_for("message", check=self.predicate, timeout=60.0)
+ except asyncio.TimeoutError:
+ await self.turn.user.send("You took too long. Game over!")
+ await self.next.user.send(f"{self.turn.user} took too long. Game over!")
+ await self.public_channel.send(
+ f"Game over! {self.turn.user.mention} timed out so {self.next.user.mention} wins!"
+ )
+ self.gameover = True
+ break
+ else:
+ if self.surrender:
+ await self.next.user.send(f"{self.turn.user} surrendered. Game over!")
+ await self.public_channel.send(
+ f"Game over! {self.turn.user.mention} surrendered to {self.next.user.mention}!"
+ )
+ self.gameover = True
+ break
+ square = self.get_square(self.next.grid, self.match.string)
+ if square.aimed:
+ await self.turn.user.send("You've already aimed at this square!", delete_after=3.0)
+ else:
+ break
+ await turn_message.delete()
+ return square
+
+ async def hit(self, square: Square, alert_messages: typing.List[discord.Message]) -> None:
+ """Occurs when a player successfully aims for a ship."""
+ await self.turn.user.send("Hit!", delete_after=3.0)
+ alert_messages.append(await self.next.user.send("Hit!"))
+ if self.check_sink(self.next.grid, square.boat):
+ await self.turn.user.send(f"You've sunk their {square.boat} ship!", delete_after=3.0)
+ alert_messages.append(await self.next.user.send(f"Oh no! Your {square.boat} ship sunk!"))
+ if self.check_gameover(self.next.grid):
+ await self.turn.user.send("You win!")
+ await self.next.user.send("You lose!")
+ self.gameover = True
+ await self.game_over(winner=self.turn.user, loser=self.next.user)
+
+ async def start_game(self) -> None:
+ """Begins the game."""
+ await self.p1.user.send(f"You're playing battleship with {self.p2.user}.")
+ await self.p2.user.send(f"You're playing battleship with {self.p1.user}.")
+
+ alert_messages = []
+
+ self.turn = self.p1
+ self.next = self.p2
+
+ while True:
+ await self.print_grids()
+
+ if self.gameover:
+ return
+
+ square = await self.take_turn()
+ if not square:
+ return
+ square.aimed = True
+
+ for message in alert_messages:
+ await message.delete()
+
+ alert_messages = []
+ alert_messages.append(await self.next.user.send(f"{self.turn.user} aimed at {self.match.string}!"))
+
+ if square.boat:
+ await self.hit(square, alert_messages)
+ if self.gameover:
+ return
+ else:
+ await self.turn.user.send("Miss!", delete_after=3.0)
+ alert_messages.append(await self.next.user.send("Miss!"))
+
+ self.turn, self.next = self.next, self.turn
+
+
+class Battleship(commands.Cog):
+ """Play the classic game Battleship!"""
+
+ def __init__(self, bot: commands.Bot) -> None:
+ self.bot = bot
+ self.games: typing.List[Game] = []
+ self.waiting: typing.List[discord.Member] = []
+
+ def predicate(
+ self,
+ ctx: commands.Context,
+ announcement: discord.Message,
+ reaction: discord.Reaction,
+ user: discord.Member
+ ) -> bool:
+ """Predicate checking the criteria for the announcement message."""
+ if self.already_playing(ctx.author): # If they've joined a game since requesting a player 2
+ return True # Is dealt with later on
+ if (
+ user.id not in (ctx.me.id, ctx.author.id)
+ and str(reaction.emoji) == HAND_RAISED_EMOJI
+ and reaction.message.id == announcement.id
+ ):
+ if self.already_playing(user):
+ self.bot.loop.create_task(ctx.send(f"{user.mention} You're already playing a game!"))
+ self.bot.loop.create_task(announcement.remove_reaction(reaction, user))
+ return False
+
+ if user in self.waiting:
+ self.bot.loop.create_task(ctx.send(
+ f"{user.mention} Please cancel your game first before joining another one."
+ ))
+ self.bot.loop.create_task(announcement.remove_reaction(reaction, user))
+ return False
+
+ return True
+
+ if (
+ user.id == ctx.author.id
+ and str(reaction.emoji) == CROSS_EMOJI
+ and reaction.message.id == announcement.id
+ ):
+ return True
+ return False
+
+ def already_playing(self, player: discord.Member) -> bool:
+ """Check if someone is already in a game."""
+ return any(player in (game.p1.user, game.p2.user) for game in self.games)
+
+ @commands.group(invoke_without_command=True)
+ @commands.guild_only()
+ async def battleship(self, ctx: commands.Context) -> None:
+ """
+ Play a game of Battleship with someone else!
+
+ This will set up a message waiting for someone else to react and play along.
+ The game takes place entirely in DMs.
+ Make sure you have your DMs open so that the bot can message you.
+ """
+ if self.already_playing(ctx.author):
+ return await ctx.send("You're already playing a game!")
+
+ if ctx.author in self.waiting:
+ return await ctx.send("You've already sent out a request for a player 2")
+
+ announcement = await ctx.send(
+ "**Battleship**: A new game is about to start!\n"
+ f"Press {HAND_RAISED_EMOJI} to play against {ctx.author.mention}!\n"
+ f"(Cancel the game with {CROSS_EMOJI}.)"
+ )
+ self.waiting.append(ctx.author)
+ await announcement.add_reaction(HAND_RAISED_EMOJI)
+ await announcement.add_reaction(CROSS_EMOJI)
+
+ try:
+ reaction, user = await self.bot.wait_for(
+ "reaction_add",
+ check=partial(self.predicate, ctx, announcement),
+ timeout=60.0
+ )
+ except asyncio.TimeoutError:
+ self.waiting.remove(ctx.author)
+ await announcement.delete()
+ return await ctx.send(f"{ctx.author.mention} Seems like there's no one here to play...")
+
+ if str(reaction.emoji) == CROSS_EMOJI:
+ self.waiting.remove(ctx.author)
+ await announcement.delete()
+ return await ctx.send(f"{ctx.author.mention} Game cancelled.")
+
+ await announcement.delete()
+ self.waiting.remove(ctx.author)
+ if self.already_playing(ctx.author):
+ return
+ try:
+ game = Game(self.bot, ctx.channel, ctx.author, user)
+ self.games.append(game)
+ await game.start_game()
+ self.games.remove(game)
+ except discord.Forbidden:
+ await ctx.send(
+ f"{ctx.author.mention} {user.mention} "
+ "Game failed. This is likely due to you not having your DMs open. Check and try again."
+ )
+ self.games.remove(game)
+ except Exception:
+ # End the game in the event of an unforseen error so the players aren't stuck in a game
+ await ctx.send(f"{ctx.author.mention} {user.mention} An error occurred. Game failed")
+ self.games.remove(game)
+ raise
+
+ @battleship.command(name="ships", aliases=["boats"])
+ async def battleship_ships(self, ctx: commands.Context) -> None:
+ """Lists the ships that are found on the battleship grid."""
+ embed = discord.Embed(colour=Colours.blue)
+ embed.add_field(name="Name", value="\n".join(SHIPS))
+ embed.add_field(name="Size", value="\n".join(str(size) for size in SHIPS.values()))
+ await ctx.send(embed=embed)
+
+
+def setup(bot: commands.Bot) -> None:
+ """Cog load."""
+ bot.add_cog(Battleship(bot))
diff --git a/bot/exts/evergreen/bookmark.py b/bot/exts/evergreen/bookmark.py
new file mode 100644
index 00000000..73908702
--- /dev/null
+++ b/bot/exts/evergreen/bookmark.py
@@ -0,0 +1,64 @@
+import logging
+import random
+
+import discord
+from discord.ext import commands
+
+from bot.constants import Colours, ERROR_REPLIES, Emojis, Icons
+
+log = logging.getLogger(__name__)
+
+
+class Bookmark(commands.Cog):
+ """Creates personal bookmarks by relaying a message link to the user's DMs."""
+
+ def __init__(self, bot: commands.Bot):
+ self.bot = bot
+
+ @commands.command(name="bookmark", aliases=("bm", "pin"))
+ async def bookmark(
+ self,
+ ctx: commands.Context,
+ target_message: discord.Message,
+ *,
+ title: str = "Bookmark"
+ ) -> None:
+ """Send the author a link to `target_message` via DMs."""
+ # Prevent users from bookmarking a message in a channel they don't have access to
+ permissions = ctx.author.permissions_in(target_message.channel)
+ if not permissions.read_messages:
+ log.info(f"{ctx.author} tried to bookmark a message in #{target_message.channel} but has no permissions")
+ embed = discord.Embed(
+ title=random.choice(ERROR_REPLIES),
+ color=Colours.soft_red,
+ description="You don't have permission to view this channel."
+ )
+ await ctx.send(embed=embed)
+ return
+
+ embed = discord.Embed(
+ title=title,
+ colour=Colours.soft_green,
+ description=target_message.content
+ )
+ embed.add_field(name="Wanna give it a visit?", value=f"[Visit original message]({target_message.jump_url})")
+ embed.set_author(name=target_message.author, icon_url=target_message.author.avatar_url)
+ embed.set_thumbnail(url=Icons.bookmark)
+
+ try:
+ await ctx.author.send(embed=embed)
+ except discord.Forbidden:
+ error_embed = discord.Embed(
+ title=random.choice(ERROR_REPLIES),
+ description=f"{ctx.author.mention}, please enable your DMs to receive the bookmark",
+ colour=Colours.soft_red
+ )
+ await ctx.send(embed=error_embed)
+ else:
+ log.info(f"{ctx.author} bookmarked {target_message.jump_url} with title '{title}'")
+ await ctx.message.add_reaction(Emojis.envelope)
+
+
+def setup(bot: commands.Bot) -> None:
+ """Load the Bookmark cog."""
+ bot.add_cog(Bookmark(bot))
diff --git a/bot/exts/evergreen/branding.py b/bot/exts/evergreen/branding.py
new file mode 100644
index 00000000..72f31042
--- /dev/null
+++ b/bot/exts/evergreen/branding.py
@@ -0,0 +1,543 @@
+import asyncio
+import itertools
+import json
+import logging
+import random
+import typing as t
+from datetime import datetime, time, timedelta
+from pathlib import Path
+
+import arrow
+import discord
+from discord.embeds import EmptyEmbed
+from discord.ext import commands
+
+from bot.bot import SeasonalBot
+from bot.constants import Branding, Colours, Emojis, MODERATION_ROLES, Tokens
+from bot.seasons import SeasonBase, get_all_seasons, get_current_season, get_season
+from bot.utils import human_months
+from bot.utils.decorators import with_role
+from bot.utils.exceptions import BrandingError
+from bot.utils.persist import make_persistent
+
+log = logging.getLogger(__name__)
+
+STATUS_OK = 200 # HTTP status code
+
+FILE_BANNER = "banner.png"
+FILE_AVATAR = "avatar.png"
+SERVER_ICONS = "server_icons"
+
+BRANDING_URL = "https://api.github.com/repos/python-discord/branding/contents"
+
+PARAMS = {"ref": "master"} # Target branch
+HEADERS = {"Accept": "application/vnd.github.v3+json"} # Ensure we use API v3
+
+# A GitHub token is not necessary for the cog to operate,
+# unauthorized requests are however limited to 60 per hour
+if Tokens.github:
+ HEADERS["Authorization"] = f"token {Tokens.github}"
+
+
+class GitHubFile(t.NamedTuple):
+ """
+ Represents a remote file on GitHub.
+
+ The `sha` hash is kept so that we can determine that a file has changed,
+ despite its filename remaining unchanged.
+ """
+
+ download_url: str
+ path: str
+ sha: str
+
+
+def pretty_files(files: t.Iterable[GitHubFile]) -> str:
+ """Provide a human-friendly representation of `files`."""
+ return "\n".join(file.path for file in files)
+
+
+def time_until_midnight() -> timedelta:
+ """
+ Determine amount of time until the next-up UTC midnight.
+
+ The exact `midnight` moment is actually delayed to 5 seconds after, in order
+ to avoid potential problems due to imprecise sleep.
+ """
+ now = datetime.utcnow()
+ tomorrow = now + timedelta(days=1)
+ midnight = datetime.combine(tomorrow, time(second=5))
+
+ return midnight - now
+
+
+class BrandingManager(commands.Cog):
+ """
+ Manages the guild's branding.
+
+ The purpose of this cog is to help automate the synchronization of the branding
+ repository with the guild. It is capable of discovering assets in the repository
+ via GitHub's API, resolving download urls for them, and delegating
+ to the `bot` instance to upload them to the guild.
+
+ BrandingManager is designed to be entirely autonomous. Its `daemon` background task awakens
+ once a day (see `time_until_midnight`) to detect new seasons, or to cycle icons within a single
+ season. The daemon can be turned on and off via the `daemon` cmd group. The value set via
+ its `start` and `stop` commands is persisted across sessions. If turned on, the daemon will
+ automatically start on the next bot start-up. Otherwise, it will wait to be started manually.
+
+ All supported operations, e.g. setting seasons, applying the branding, or cycling icons, can
+ also be invoked manually, via the following API:
+
+ branding list
+ - Show all available seasons
+
+ branding set <season_name>
+ - Set the cog's internal state to represent `season_name`, if it exists
+ - If no `season_name` is given, set chronologically current season
+ - This will not automatically apply the season's branding to the guild,
+ the cog's state can be detached from the guild
+ - Seasons can therefore be 'previewed' using this command
+
+ branding info
+ - View detailed information about resolved assets for current season
+
+ branding refresh
+ - Refresh internal state, i.e. synchronize with branding repository
+
+ branding apply
+ - Apply the current internal state to the guild, i.e. upload the assets
+
+ branding cycle
+ - If there are multiple available icons for current season, randomly pick
+ and apply the next one
+
+ The daemon calls these methods autonomously as appropriate. The use of this cog
+ is locked to moderation roles. As it performs media asset uploads, it is prone to
+ rate-limits - the `apply` command should be used with caution. The `set` command can,
+ however, be used freely to 'preview' seasonal branding and check whether paths have been
+ resolved as appropriate.
+
+ While the bot is in debug mode, it will 'mock' asset uploads by logging the passed
+ download urls and pretending that the upload was successful. Make use of this
+ to test this cog's behaviour.
+ """
+
+ current_season: t.Type[SeasonBase]
+
+ banner: t.Optional[GitHubFile]
+ avatar: t.Optional[GitHubFile]
+
+ available_icons: t.List[GitHubFile]
+ remaining_icons: t.List[GitHubFile]
+
+ days_since_cycle: t.Iterator
+
+ config_file: Path
+
+ daemon: t.Optional[asyncio.Task]
+
+ def __init__(self, bot: SeasonalBot) -> None:
+ """
+ Assign safe default values on init.
+
+ At this point, we don't have information about currently available branding.
+ Most of these attributes will be overwritten once the daemon connects, or once
+ the `refresh` command is used.
+ """
+ self.bot = bot
+ self.current_season = get_current_season()
+
+ self.banner = None
+ self.avatar = None
+
+ self.available_icons = []
+ self.remaining_icons = []
+
+ self.days_since_cycle = itertools.cycle([None])
+
+ self.config_file = make_persistent(Path("bot", "resources", "evergreen", "branding.json"))
+ should_run = self._read_config()["daemon_active"]
+
+ if should_run:
+ self.daemon = self.bot.loop.create_task(self._daemon_func())
+ else:
+ self.daemon = None
+
+ @property
+ def _daemon_running(self) -> bool:
+ """True if the daemon is currently active, False otherwise."""
+ return self.daemon is not None and not self.daemon.done()
+
+ def _read_config(self) -> t.Dict[str, bool]:
+ """Read and return persistent config file."""
+ with self.config_file.open("r") as persistent_file:
+ return json.load(persistent_file)
+
+ def _write_config(self, key: str, value: bool) -> None:
+ """Write a `key`, `value` pair to persistent config file."""
+ current_config = self._read_config()
+ current_config[key] = value
+
+ with self.config_file.open("w") as persistent_file:
+ json.dump(current_config, persistent_file)
+
+ async def _daemon_func(self) -> None:
+ """
+ Manage all automated behaviour of the BrandingManager cog.
+
+ Once a day, the daemon will perform the following tasks:
+ - Update `current_season`
+ - Poll GitHub API to see if the available branding for `current_season` has changed
+ - Update assets if changes are detected (banner, guild icon, bot avatar, bot nickname)
+ - Check whether it's time to cycle guild icons
+
+ The internal loop runs once when activated, then periodically at the time
+ given by `time_until_midnight`.
+
+ All method calls in the internal loop are considered safe, i.e. no errors propagate
+ to the daemon's loop. The daemon itself does not perform any error handling on its own.
+ """
+ await self.bot.wait_until_ready()
+
+ while True:
+ self.current_season = get_current_season()
+ branding_changed = await self.refresh()
+
+ if branding_changed:
+ await self.apply()
+
+ elif next(self.days_since_cycle) == Branding.cycle_frequency:
+ await self.cycle()
+
+ until_midnight = time_until_midnight()
+ await asyncio.sleep(until_midnight.total_seconds())
+
+ async def _info_embed(self) -> discord.Embed:
+ """Make an informative embed representing current season."""
+ info_embed = discord.Embed(description=self.current_season.description, colour=self.current_season.colour)
+
+ # If we're in a non-evergreen season, also show active months
+ if self.current_season is not SeasonBase:
+ title = f"{self.current_season.season_name} ({human_months(self.current_season.months)})"
+ else:
+ title = self.current_season.season_name
+
+ # Use the author field to show the season's name and avatar if available
+ info_embed.set_author(name=title, icon_url=self.avatar.download_url if self.avatar else EmptyEmbed)
+
+ banner = self.banner.path if self.banner is not None else "Unavailable"
+ info_embed.add_field(name="Banner", value=banner, inline=False)
+
+ avatar = self.avatar.path if self.avatar is not None else "Unavailable"
+ info_embed.add_field(name="Avatar", value=avatar, inline=False)
+
+ icons = pretty_files(self.available_icons) or "Unavailable"
+ info_embed.add_field(name="Available icons", value=icons, inline=False)
+
+ # Only display cycle frequency if we're actually cycling
+ if len(self.available_icons) > 1 and Branding.cycle_frequency:
+ info_embed.set_footer(text=f"Icon cycle frequency: {Branding.cycle_frequency}")
+
+ return info_embed
+
+ async def _reset_remaining_icons(self) -> None:
+ """Set `remaining_icons` to a shuffled copy of `available_icons`."""
+ self.remaining_icons = random.sample(self.available_icons, k=len(self.available_icons))
+
+ async def _reset_days_since_cycle(self) -> None:
+ """
+ Reset the `days_since_cycle` iterator based on configured frequency.
+
+ If the current season only has 1 icon, or if `Branding.cycle_frequency` is falsey,
+ the iterator will always yield None. This signals that the icon shouldn't be cycled.
+
+ Otherwise, it will yield ints in range [1, `Branding.cycle_frequency`] indefinitely.
+ When the iterator yields a value equal to `Branding.cycle_frequency`, it is time to cycle.
+ """
+ if len(self.available_icons) > 1 and Branding.cycle_frequency:
+ sequence = range(1, Branding.cycle_frequency + 1)
+ else:
+ sequence = [None]
+
+ self.days_since_cycle = itertools.cycle(sequence)
+
+ async def _get_files(self, path: str, include_dirs: bool = False) -> t.Dict[str, GitHubFile]:
+ """
+ Get files at `path` in the branding repository.
+
+ If `include_dirs` is False (default), only returns files at `path`.
+ Otherwise, will return both files and directories. Never returns symlinks.
+
+ Return dict mapping from filename to corresponding `GitHubFile` instance.
+ This may return an empty dict if the response status is non-200,
+ or if the target directory is empty.
+ """
+ url = f"{BRANDING_URL}/{path}"
+ async with self.bot.http_session.get(url, headers=HEADERS, params=PARAMS) as resp:
+ # Short-circuit if we get non-200 response
+ if resp.status != STATUS_OK:
+ log.error(f"GitHub API returned non-200 response: {resp}")
+ return {}
+ directory = await resp.json() # Directory at `path`
+
+ allowed_types = {"file", "dir"} if include_dirs else {"file"}
+ return {
+ file["name"]: GitHubFile(file["download_url"], file["path"], file["sha"])
+ for file in directory
+ if file["type"] in allowed_types
+ }
+
+ async def refresh(self) -> bool:
+ """
+ Synchronize available assets with branding repository.
+
+ If the current season is not the evergreen, and lacks at least one asset,
+ we use the evergreen seasonal dir as fallback for missing assets.
+
+ Finally, if neither the seasonal nor fallback branding directories contain
+ an asset, it will simply be ignored.
+
+ Return True if the branding has changed. This will be the case when we enter
+ a new season, or when something changes in the current seasons's directory
+ in the branding repository.
+ """
+ old_branding = (self.banner, self.avatar, self.available_icons)
+ seasonal_dir = await self._get_files(self.current_season.branding_path, include_dirs=True)
+
+ # Only make a call to the fallback directory if there is something to be gained
+ branding_incomplete = any(
+ asset not in seasonal_dir
+ for asset in (FILE_BANNER, FILE_AVATAR, SERVER_ICONS)
+ )
+ if branding_incomplete and self.current_season is not SeasonBase:
+ fallback_dir = await self._get_files(SeasonBase.branding_path, include_dirs=True)
+ else:
+ fallback_dir = {}
+
+ # Resolve assets in this directory, None is a safe value
+ self.banner = seasonal_dir.get(FILE_BANNER) or fallback_dir.get(FILE_BANNER)
+ self.avatar = seasonal_dir.get(FILE_AVATAR) or fallback_dir.get(FILE_AVATAR)
+
+ # Now resolve server icons by making a call to the proper sub-directory
+ if SERVER_ICONS in seasonal_dir:
+ icons_dir = await self._get_files(f"{self.current_season.branding_path}/{SERVER_ICONS}")
+ self.available_icons = list(icons_dir.values())
+
+ elif SERVER_ICONS in fallback_dir:
+ icons_dir = await self._get_files(f"{SeasonBase.branding_path}/{SERVER_ICONS}")
+ self.available_icons = list(icons_dir.values())
+
+ else:
+ self.available_icons = [] # This should never be the case, but an empty list is a safe value
+
+ # GitHubFile instances carry a `sha` attr so this will pick up if a file changes
+ branding_changed = old_branding != (self.banner, self.avatar, self.available_icons)
+
+ if branding_changed:
+ log.info(f"New branding detected (season: {self.current_season.season_name})")
+ await self._reset_remaining_icons()
+ await self._reset_days_since_cycle()
+
+ return branding_changed
+
+ async def cycle(self) -> bool:
+ """
+ Apply the next-up server icon.
+
+ Returns True if an icon is available and successfully gets applied, False otherwise.
+ """
+ if not self.available_icons:
+ log.info("Cannot cycle: no icons for this season")
+ return False
+
+ if not self.remaining_icons:
+ log.info("Reset & shuffle remaining icons")
+ await self._reset_remaining_icons()
+
+ next_up = self.remaining_icons.pop(0)
+ success = await self.bot.set_icon(next_up.download_url)
+
+ return success
+
+ async def apply(self) -> t.List[str]:
+ """
+ Apply current branding to the guild and bot.
+
+ This delegates to the bot instance to do all the work. We only provide download urls
+ for available assets. Assets unavailable in the branding repo will be ignored.
+
+ Returns a list of names of all failed assets. An asset is considered failed
+ if it isn't found in the branding repo, or if something goes wrong while the
+ bot is trying to apply it.
+
+ An empty list denotes that all assets have been applied successfully.
+ """
+ report = {asset: False for asset in ("banner", "avatar", "nickname", "icon")}
+
+ if self.banner is not None:
+ report["banner"] = await self.bot.set_banner(self.banner.download_url)
+
+ if self.avatar is not None:
+ report["avatar"] = await self.bot.set_avatar(self.avatar.download_url)
+
+ if self.current_season.bot_name:
+ report["nickname"] = await self.bot.set_nickname(self.current_season.bot_name)
+
+ report["icon"] = await self.cycle()
+
+ failed_assets = [asset for asset, succeeded in report.items() if not succeeded]
+ return failed_assets
+
+ @with_role(*MODERATION_ROLES)
+ @commands.group(name="branding")
+ async def branding_cmds(self, ctx: commands.Context) -> None:
+ """Manual branding control."""
+ if not ctx.invoked_subcommand:
+ await ctx.send_help(ctx.command)
+
+ @branding_cmds.command(name="list", aliases=["ls"])
+ async def branding_list(self, ctx: commands.Context) -> None:
+ """List all available seasons and branding sources."""
+ embed = discord.Embed(title="Available seasons", colour=Colours.soft_green)
+
+ for season in get_all_seasons():
+ if season is SeasonBase:
+ active_when = "always"
+ else:
+ active_when = f"in {human_months(season.months)}"
+
+ description = (
+ f"Active {active_when}\n"
+ f"Branding: {season.branding_path}"
+ )
+ embed.add_field(name=season.season_name, value=description, inline=False)
+
+ await ctx.send(embed=embed)
+
+ @branding_cmds.command(name="set")
+ async def branding_set(self, ctx: commands.Context, *, season_name: t.Optional[str] = None) -> None:
+ """
+ Manually set season, or reset to current if none given.
+
+ Season search is a case-less comparison against both seasonal class name,
+ and its `season_name` attr.
+
+ This only pre-loads the cog's internal state to the chosen season, but does not
+ automatically apply the branding. As that is an expensive operation, the `apply`
+ command must be called explicitly after this command finishes.
+
+ This means that this command can be used to 'preview' a season gathering info
+ about its available assets, without applying them to the guild.
+
+ If the daemon is running, it will automatically reset the season to current when
+ it wakes up. The season set via this command can therefore remain 'detached' from
+ what it should be - the daemon will make sure that it's set back properly.
+ """
+ if season_name is None:
+ new_season = get_current_season()
+ else:
+ new_season = get_season(season_name)
+ if new_season is None:
+ raise BrandingError("No such season exists")
+
+ if self.current_season is new_season:
+ raise BrandingError(f"Season {self.current_season.season_name} already active")
+
+ self.current_season = new_season
+ await self.branding_refresh(ctx)
+
+ @branding_cmds.command(name="info", aliases=["status"])
+ async def branding_info(self, ctx: commands.Context) -> None:
+ """
+ Show available assets for current season.
+
+ This can be used to confirm that assets have been resolved properly.
+ When `apply` is used, it attempts to upload exactly the assets listed here.
+ """
+ await ctx.send(embed=await self._info_embed())
+
+ @branding_cmds.command(name="refresh")
+ async def branding_refresh(self, ctx: commands.Context) -> None:
+ """Sync currently available assets with branding repository."""
+ async with ctx.typing():
+ await self.refresh()
+ await self.branding_info(ctx)
+
+ @branding_cmds.command(name="apply")
+ async def branding_apply(self, ctx: commands.Context) -> None:
+ """
+ Apply current season's branding to the guild.
+
+ Use `info` to check which assets will be applied. Shows which assets have
+ failed to be applied, if any.
+ """
+ async with ctx.typing():
+ failed_assets = await self.apply()
+ if failed_assets:
+ raise BrandingError(f"Failed to apply following assets: {', '.join(failed_assets)}")
+
+ response = discord.Embed(description=f"All assets applied {Emojis.ok_hand}", colour=Colours.soft_green)
+ await ctx.send(embed=response)
+
+ @branding_cmds.command(name="cycle")
+ async def branding_cycle(self, ctx: commands.Context) -> None:
+ """
+ Apply the next-up guild icon, if multiple are available.
+
+ The order is random.
+ """
+ async with ctx.typing():
+ success = await self.cycle()
+ if not success:
+ raise BrandingError("Failed to cycle icon")
+
+ response = discord.Embed(description=f"Success {Emojis.ok_hand}", colour=Colours.soft_green)
+ await ctx.send(embed=response)
+
+ @branding_cmds.group(name="daemon", aliases=["d", "task"])
+ async def daemon_group(self, ctx: commands.Context) -> None:
+ """Control the background daemon."""
+ if not ctx.invoked_subcommand:
+ await ctx.send_help(ctx.command)
+
+ @daemon_group.command(name="status")
+ async def daemon_status(self, ctx: commands.Context) -> None:
+ """Check whether daemon is currently active."""
+ if self._daemon_running:
+ remaining_time = (arrow.utcnow() + time_until_midnight()).humanize()
+ response = discord.Embed(description=f"Daemon running {Emojis.ok_hand}", colour=Colours.soft_green)
+ response.set_footer(text=f"Next refresh {remaining_time}")
+ else:
+ response = discord.Embed(description="Daemon not running", colour=Colours.soft_red)
+
+ await ctx.send(embed=response)
+
+ @daemon_group.command(name="start")
+ async def daemon_start(self, ctx: commands.Context) -> None:
+ """If the daemon isn't running, start it."""
+ if self._daemon_running:
+ raise BrandingError("Daemon already running!")
+
+ self.daemon = self.bot.loop.create_task(self._daemon_func())
+ self._write_config("daemon_active", True)
+
+ response = discord.Embed(description=f"Daemon started {Emojis.ok_hand}", colour=Colours.soft_green)
+ await ctx.send(embed=response)
+
+ @daemon_group.command(name="stop")
+ async def daemon_stop(self, ctx: commands.Context) -> None:
+ """If the daemon is running, stop it."""
+ if not self._daemon_running:
+ raise BrandingError("Daemon not running!")
+
+ self.daemon.cancel()
+ self._write_config("daemon_active", False)
+
+ response = discord.Embed(description=f"Daemon stopped {Emojis.ok_hand}", colour=Colours.soft_green)
+ await ctx.send(embed=response)
+
+
+def setup(bot: SeasonalBot) -> None:
+ """Load BrandingManager cog."""
+ bot.add_cog(BrandingManager(bot))
diff --git a/bot/exts/evergreen/error_handler.py b/bot/exts/evergreen/error_handler.py
new file mode 100644
index 00000000..33b1a3f2
--- /dev/null
+++ b/bot/exts/evergreen/error_handler.py
@@ -0,0 +1,129 @@
+import logging
+import math
+import random
+from typing import Iterable, Union
+
+from discord import Embed, Message
+from discord.ext import commands
+from sentry_sdk import push_scope
+
+from bot.constants import Colours, ERROR_REPLIES, NEGATIVE_REPLIES
+from bot.utils.decorators import InChannelCheckFailure, InMonthCheckFailure
+from bot.utils.exceptions import BrandingError
+
+log = logging.getLogger(__name__)
+
+
+class CommandErrorHandler(commands.Cog):
+ """A error handler for the PythonDiscord server."""
+
+ def __init__(self, bot: commands.Bot):
+ self.bot = bot
+
+ @staticmethod
+ def revert_cooldown_counter(command: commands.Command, message: Message) -> None:
+ """Undoes the last cooldown counter for user-error cases."""
+ if command._buckets.valid:
+ bucket = command._buckets.get_bucket(message)
+ bucket._tokens = min(bucket.rate, bucket._tokens + 1)
+ logging.debug("Cooldown counter reverted as the command was not used correctly.")
+
+ @staticmethod
+ def error_embed(message: str, title: Union[Iterable, str] = ERROR_REPLIES) -> Embed:
+ """Build a basic embed with red colour and either a random error title or a title provided."""
+ embed = Embed(colour=Colours.soft_red)
+ if isinstance(title, str):
+ embed.title = title
+ else:
+ embed.title = random.choice(title)
+ embed.description = message
+ return embed
+
+ @commands.Cog.listener()
+ async def on_command_error(self, ctx: commands.Context, error: commands.CommandError) -> None:
+ """Activates when a command opens an error."""
+ if hasattr(ctx.command, 'on_error'):
+ logging.debug("A command error occured but the command had it's own error handler.")
+ return
+
+ error = getattr(error, 'original', error)
+ logging.debug(
+ f"Error Encountered: {type(error).__name__} - {str(error)}, "
+ f"Command: {ctx.command}, "
+ f"Author: {ctx.author}, "
+ f"Channel: {ctx.channel}"
+ )
+
+ if isinstance(error, commands.CommandNotFound):
+ return
+
+ if isinstance(error, BrandingError):
+ await ctx.send(embed=self.error_embed(str(error)))
+ return
+
+ if isinstance(error, (InChannelCheckFailure, InMonthCheckFailure)):
+ await ctx.send(embed=self.error_embed(str(error), NEGATIVE_REPLIES), delete_after=7.5)
+ return
+
+ if isinstance(error, commands.UserInputError):
+ self.revert_cooldown_counter(ctx.command, ctx.message)
+ embed = self.error_embed(
+ f"Your input was invalid: {error}\n\nUsage:\n```{ctx.prefix}{ctx.command} {ctx.command.signature}```"
+ )
+ await ctx.send(embed=embed)
+ return
+
+ if isinstance(error, commands.CommandOnCooldown):
+ mins, secs = divmod(math.ceil(error.retry_after), 60)
+ embed = self.error_embed(
+ f"This command is on cooldown:\nPlease retry in {mins} minutes {secs} seconds.",
+ NEGATIVE_REPLIES
+ )
+ await ctx.send(embed=embed, delete_after=7.5)
+ return
+
+ if isinstance(error, commands.DisabledCommand):
+ await ctx.send(embed=self.error_embed("This command has been disabled.", NEGATIVE_REPLIES))
+ return
+
+ if isinstance(error, commands.NoPrivateMessage):
+ await ctx.send(embed=self.error_embed("This command can only be used in the server.", NEGATIVE_REPLIES))
+ return
+
+ if isinstance(error, commands.BadArgument):
+ self.revert_cooldown_counter(ctx.command, ctx.message)
+ embed = self.error_embed(
+ "The argument you provided was invalid: "
+ f"{error}\n\nUsage:\n```{ctx.prefix}{ctx.command} {ctx.command.signature}```"
+ )
+ await ctx.send(embed=embed)
+ return
+
+ if isinstance(error, commands.CheckFailure):
+ await ctx.send(embed=self.error_embed("You are not authorized to use this command.", NEGATIVE_REPLIES))
+ return
+
+ with push_scope() as scope:
+ scope.user = {
+ "id": ctx.author.id,
+ "username": str(ctx.author)
+ }
+
+ scope.set_tag("command", ctx.command.qualified_name)
+ scope.set_tag("message_id", ctx.message.id)
+ scope.set_tag("channel_id", ctx.channel.id)
+
+ scope.set_extra("full_message", ctx.message.content)
+
+ if ctx.guild is not None:
+ scope.set_extra(
+ "jump_to",
+ f"https://discordapp.com/channels/{ctx.guild.id}/{ctx.channel.id}/{ctx.message.id}"
+ )
+
+ log.exception(f"Unhandled command error: {str(error)}", exc_info=error)
+
+
+def setup(bot: commands.Bot) -> None:
+ """Error handler Cog load."""
+ bot.add_cog(CommandErrorHandler(bot))
diff --git a/bot/exts/evergreen/fun.py b/bot/exts/evergreen/fun.py
new file mode 100644
index 00000000..67a4bae5
--- /dev/null
+++ b/bot/exts/evergreen/fun.py
@@ -0,0 +1,147 @@
+import functools
+import logging
+import random
+from typing import Callable, Tuple, Union
+
+from discord import Embed, Message
+from discord.ext import commands
+from discord.ext.commands import Bot, Cog, Context, MessageConverter
+
+from bot import utils
+from bot.constants import Emojis
+
+log = logging.getLogger(__name__)
+
+UWU_WORDS = {
+ "fi": "fwi",
+ "l": "w",
+ "r": "w",
+ "some": "sum",
+ "th": "d",
+ "thing": "fing",
+ "tho": "fo",
+ "you're": "yuw'we",
+ "your": "yur",
+ "you": "yuw",
+}
+
+
+class Fun(Cog):
+ """A collection of general commands for fun."""
+
+ def __init__(self, bot: Bot) -> None:
+ self.bot = bot
+
+ @commands.command()
+ async def roll(self, ctx: Context, num_rolls: int = 1) -> None:
+ """Outputs a number of random dice emotes (up to 6)."""
+ output = ""
+ if num_rolls > 6:
+ num_rolls = 6
+ elif num_rolls < 1:
+ output = ":no_entry: You must roll at least once."
+ for _ in range(num_rolls):
+ terning = f"terning{random.randint(1, 6)}"
+ output += getattr(Emojis, terning, '')
+ await ctx.send(output)
+
+ @commands.command(name="uwu", aliases=("uwuwize", "uwuify",))
+ async def uwu_command(self, ctx: Context, *, text: str) -> None:
+ """
+ Converts a given `text` into it's uwu equivalent.
+
+ Also accepts a valid discord Message ID or link.
+ """
+ conversion_func = functools.partial(
+ utils.replace_many, replacements=UWU_WORDS, ignore_case=True, match_case=True
+ )
+ text, embed = await Fun._get_text_and_embed(ctx, text)
+ # Convert embed if it exists
+ if embed is not None:
+ embed = Fun._convert_embed(conversion_func, embed)
+ converted_text = conversion_func(text)
+ # Don't put >>> if only embed present
+ if converted_text:
+ converted_text = f">>> {converted_text.lstrip('> ')}"
+ await ctx.send(content=converted_text, embed=embed)
+
+ @commands.command(name="randomcase", aliases=("rcase", "randomcaps", "rcaps",))
+ async def randomcase_command(self, ctx: Context, *, text: str) -> None:
+ """
+ Randomly converts the casing of a given `text`.
+
+ Also accepts a valid discord Message ID or link.
+ """
+ def conversion_func(text: str) -> str:
+ """Randomly converts the casing of a given string."""
+ return "".join(
+ char.upper() if round(random.random()) else char.lower() for char in text
+ )
+ text, embed = await Fun._get_text_and_embed(ctx, text)
+ # Convert embed if it exists
+ if embed is not None:
+ embed = Fun._convert_embed(conversion_func, embed)
+ converted_text = conversion_func(text)
+ # Don't put >>> if only embed present
+ if converted_text:
+ converted_text = f">>> {converted_text.lstrip('> ')}"
+ await ctx.send(content=converted_text, embed=embed)
+
+ @staticmethod
+ async def _get_text_and_embed(ctx: Context, text: str) -> Tuple[str, Union[Embed, None]]:
+ """
+ Attempts to extract the text and embed from a possible link to a discord Message.
+
+ Returns a tuple of:
+ str: If `text` is a valid discord Message, the contents of the message, else `text`.
+ Union[Embed, None]: The embed if found in the valid Message, else None
+ """
+ embed = None
+ message = await Fun._get_discord_message(ctx, text)
+ if isinstance(message, Message):
+ text = message.content
+ # Take first embed because we can't send multiple embeds
+ if message.embeds:
+ embed = message.embeds[0]
+ return (text, embed)
+
+ @staticmethod
+ async def _get_discord_message(ctx: Context, text: str) -> Union[Message, str]:
+ """
+ Attempts to convert a given `text` to a discord Message object and return it.
+
+ Conversion will succeed if given a discord Message ID or link.
+ Returns `text` if the conversion fails.
+ """
+ try:
+ text = await MessageConverter().convert(ctx, text)
+ except commands.BadArgument:
+ log.debug(f"Input '{text:.20}...' is not a valid Discord Message")
+ return text
+
+ @staticmethod
+ def _convert_embed(func: Callable[[str, ], str], embed: Embed) -> Embed:
+ """
+ Converts the text in an embed using a given conversion function, then return the embed.
+
+ Only modifies the following fields: title, description, footer, fields
+ """
+ embed_dict = embed.to_dict()
+
+ embed_dict["title"] = func(embed_dict.get("title", ""))
+ embed_dict["description"] = func(embed_dict.get("description", ""))
+
+ if "footer" in embed_dict:
+ embed_dict["footer"]["text"] = func(embed_dict["footer"].get("text", ""))
+
+ if "fields" in embed_dict:
+ for field in embed_dict["fields"]:
+ field["name"] = func(field.get("name", ""))
+ field["value"] = func(field.get("value", ""))
+
+ return Embed.from_dict(embed_dict)
+
+
+def setup(bot: commands.Bot) -> None:
+ """Fun Cog load."""
+ bot.add_cog(Fun(bot))
diff --git a/bot/exts/evergreen/game.py b/bot/exts/evergreen/game.py
new file mode 100644
index 00000000..3c8b2725
--- /dev/null
+++ b/bot/exts/evergreen/game.py
@@ -0,0 +1,424 @@
+import difflib
+import logging
+import random
+import re
+from datetime import datetime as dt
+from enum import IntEnum
+from typing import Any, Dict, List, Optional, Tuple
+
+from aiohttp import ClientSession
+from discord import Embed
+from discord.ext import tasks
+from discord.ext.commands import Cog, Context, group
+
+from bot.bot import SeasonalBot
+from bot.constants import STAFF_ROLES, Tokens
+from bot.utils.decorators import with_role
+from bot.utils.pagination import ImagePaginator, LinePaginator
+
+# Base URL of IGDB API
+BASE_URL = "https://api-v3.igdb.com"
+
+HEADERS = {
+ "user-key": Tokens.igdb,
+ "Accept": "application/json"
+}
+
+logger = logging.getLogger(__name__)
+
+REGEX_NON_ALPHABET = re.compile(r"[^a-z0-9]", re.IGNORECASE)
+
+# ---------
+# TEMPLATES
+# ---------
+
+# Body templates
+# Request body template for get_games_list
+GAMES_LIST_BODY = (
+ "fields cover.image_id, first_release_date, total_rating, name, storyline, url, platforms.name, status,"
+ "involved_companies.company.name, summary, age_ratings.category, age_ratings.rating, total_rating_count;"
+ "{sort} {limit} {offset} {genre} {additional}"
+)
+
+# Request body template for get_companies_list
+COMPANIES_LIST_BODY = (
+ "fields name, url, start_date, logo.image_id, developed.name, published.name, description;"
+ "offset {offset}; limit {limit};"
+)
+
+# Request body template for games search
+SEARCH_BODY = 'fields name, url, storyline, total_rating, total_rating_count; limit 50; search "{term}";'
+
+# Pages templates
+# Game embed layout
+GAME_PAGE = (
+ "**[{name}]({url})**\n"
+ "{description}"
+ "**Release Date:** {release_date}\n"
+ "**Rating:** {rating}/100 :star: (based on {rating_count} ratings)\n"
+ "**Platforms:** {platforms}\n"
+ "**Status:** {status}\n"
+ "**Age Ratings:** {age_ratings}\n"
+ "**Made by:** {made_by}\n\n"
+ "{storyline}"
+)
+
+# .games company command page layout
+COMPANY_PAGE = (
+ "**[{name}]({url})**\n"
+ "{description}"
+ "**Founded:** {founded}\n"
+ "**Developed:** {developed}\n"
+ "**Published:** {published}"
+)
+
+# For .games search command line layout
+GAME_SEARCH_LINE = (
+ "**[{name}]({url})**\n"
+ "{rating}/100 :star: (based on {rating_count} ratings)\n"
+)
+
+# URL templates
+COVER_URL = "https://images.igdb.com/igdb/image/upload/t_cover_big/{image_id}.jpg"
+LOGO_URL = "https://images.igdb.com/igdb/image/upload/t_logo_med/{image_id}.png"
+
+# Create aliases for complex genre names
+ALIASES = {
+ "Role-playing (rpg)": ["Role playing", "Rpg"],
+ "Turn-based strategy (tbs)": ["Turn based strategy", "Tbs"],
+ "Real time strategy (rts)": ["Real time strategy", "Rts"],
+ "Hack and slash/beat 'em up": ["Hack and slash"]
+}
+
+
+class GameStatus(IntEnum):
+ """Game statuses in IGDB API."""
+
+ Released = 0
+ Alpha = 2
+ Beta = 3
+ Early = 4
+ Offline = 5
+ Cancelled = 6
+ Rumored = 7
+
+
+class AgeRatingCategories(IntEnum):
+ """IGDB API Age Rating categories IDs."""
+
+ ESRB = 1
+ PEGI = 2
+
+
+class AgeRatings(IntEnum):
+ """PEGI/ESRB ratings IGDB API IDs."""
+
+ Three = 1
+ Seven = 2
+ Twelve = 3
+ Sixteen = 4
+ Eighteen = 5
+ RP = 6
+ EC = 7
+ E = 8
+ E10 = 9
+ T = 10
+ M = 11
+ AO = 12
+
+
+class Games(Cog):
+ """Games Cog contains commands that collect data from IGDB."""
+
+ def __init__(self, bot: SeasonalBot):
+ self.bot = bot
+ self.http_session: ClientSession = bot.http_session
+
+ self.genres: Dict[str, int] = {}
+
+ self.refresh_genres_task.start()
+
+ @tasks.loop(hours=24.0)
+ async def refresh_genres_task(self) -> None:
+ """Refresh genres in every hour."""
+ try:
+ await self._get_genres()
+ except Exception as e:
+ logger.warning(f"There was error while refreshing genres: {e}")
+ return
+ logger.info("Successfully refreshed genres.")
+
+ def cog_unload(self) -> None:
+ """Cancel genres refreshing start when unloading Cog."""
+ self.refresh_genres_task.cancel()
+ logger.info("Successfully stopped Genres Refreshing task.")
+
+ async def _get_genres(self) -> None:
+ """Create genres variable for games command."""
+ body = "fields name; limit 100;"
+ async with self.http_session.get(f"{BASE_URL}/genres", data=body, headers=HEADERS) as resp:
+ result = await resp.json()
+
+ genres = {genre["name"].capitalize(): genre["id"] for genre in result}
+
+ # Replace complex names with names from ALIASES
+ for genre_name, genre in genres.items():
+ if genre_name in ALIASES:
+ for alias in ALIASES[genre_name]:
+ self.genres[alias] = genre
+ else:
+ self.genres[genre_name] = genre
+
+ @group(name="games", aliases=["game"], invoke_without_command=True)
+ async def games(self, ctx: Context, amount: Optional[int] = 5, *, genre: Optional[str] = None) -> None:
+ """
+ Get random game(s) by genre from IGDB. Use .games genres command to get all available genres.
+
+ Also support amount parameter, what max is 25 and min 1, default 5. Supported formats:
+ - .games <genre>
+ - .games <amount> <genre>
+ """
+ # When user didn't specified genre, send help message
+ if genre is None:
+ await ctx.send_help("games")
+ return
+
+ # Capitalize genre for check
+ genre = "".join(genre).capitalize()
+
+ # Check for amounts, max is 25 and min 1
+ if not 1 <= amount <= 25:
+ await ctx.send("Your provided amount is out of range. Our minimum is 1 and maximum 25.")
+ return
+
+ # Get games listing, if genre don't exist, show error message with possibilities.
+ # Offset must be random, due otherwise we will get always same result (offset show in which position should
+ # API start returning result)
+ try:
+ games = await self.get_games_list(amount, self.genres[genre], offset=random.randint(0, 150))
+ except KeyError:
+ possibilities = await self.get_best_results(genre)
+ # If there is more than 1 possibilities, show these.
+ # If there is only 1 possibility, use it as genre.
+ # Otherwise send message about invalid genre.
+ if len(possibilities) > 1:
+ display_possibilities = "`, `".join(p[1] for p in possibilities)
+ await ctx.send(
+ f"Invalid genre `{genre}`. "
+ f"{f'Maybe you meant `{display_possibilities}`?' if display_possibilities else ''}"
+ )
+ return
+ elif len(possibilities) == 1:
+ games = await self.get_games_list(
+ amount, self.genres[possibilities[0][1]], offset=random.randint(0, 150)
+ )
+ genre = possibilities[0][1]
+ else:
+ await ctx.send(f"Invalid genre `{genre}`.")
+ return
+
+ # Create pages and paginate
+ pages = [await self.create_page(game) for game in games]
+
+ await ImagePaginator.paginate(pages, ctx, Embed(title=f"Random {genre.title()} Games"))
+
+ @games.command(name="top", aliases=["t"])
+ async def top(self, ctx: Context, amount: int = 10) -> None:
+ """
+ Get current Top games in IGDB.
+
+ Support amount parameter. Max is 25, min is 1.
+ """
+ if not 1 <= amount <= 25:
+ await ctx.send("Your provided amount is out of range. Our minimum is 1 and maximum 25.")
+ return
+
+ games = await self.get_games_list(amount, sort="total_rating desc",
+ additional_body="where total_rating >= 90; sort total_rating_count desc;")
+
+ pages = [await self.create_page(game) for game in games]
+ await ImagePaginator.paginate(pages, ctx, Embed(title=f"Top {amount} Games"))
+
+ @games.command(name="genres", aliases=["genre", "g"])
+ async def genres(self, ctx: Context) -> None:
+ """Get all available genres."""
+ await ctx.send(f"Currently available genres: {', '.join(f'`{genre}`' for genre in self.genres)}")
+
+ @games.command(name="search", aliases=["s"])
+ async def search(self, ctx: Context, *, search_term: str) -> None:
+ """Find games by name."""
+ lines = await self.search_games(search_term)
+
+ await LinePaginator.paginate(lines, ctx, Embed(title=f"Game Search Results: {search_term}"), empty=False)
+
+ @games.command(name="company", aliases=["companies"])
+ async def company(self, ctx: Context, amount: int = 5) -> None:
+ """
+ Get random Game Companies companies from IGDB API.
+
+ Support amount parameter. Max is 25, min is 1.
+ """
+ if not 1 <= amount <= 25:
+ await ctx.send("Your provided amount is out of range. Our minimum is 1 and maximum 25.")
+ return
+
+ # Get companies listing. Provide limit for limiting how much companies will be returned. Get random offset to
+ # get (almost) every time different companies (offset show in which position should API start returning result)
+ companies = await self.get_companies_list(limit=amount, offset=random.randint(0, 150))
+ pages = [await self.create_company_page(co) for co in companies]
+
+ await ImagePaginator.paginate(pages, ctx, Embed(title="Random Game Companies"))
+
+ @with_role(*STAFF_ROLES)
+ @games.command(name="refresh", aliases=["r"])
+ async def refresh_genres_command(self, ctx: Context) -> None:
+ """Refresh .games command genres."""
+ try:
+ await self._get_genres()
+ except Exception as e:
+ await ctx.send(f"There was error while refreshing genres: `{e}`")
+ return
+ await ctx.send("Successfully refreshed genres.")
+
+ async def get_games_list(self,
+ amount: int,
+ genre: Optional[str] = None,
+ sort: Optional[str] = None,
+ additional_body: str = "",
+ offset: int = 0
+ ) -> List[Dict[str, Any]]:
+ """
+ Get list of games from IGDB API by parameters that is provided.
+
+ Amount param show how much games this get, genre is genre ID and at least one genre in game must this when
+ provided. Sort is sorting by specific field and direction, ex. total_rating desc/asc (total_rating is field,
+ desc/asc is direction). Additional_body is field where you can pass extra search parameters. Offset show start
+ position in API.
+ """
+ # Create body of IGDB API request, define fields, sorting, offset, limit and genre
+ params = {
+ "sort": f"sort {sort};" if sort else "",
+ "limit": f"limit {amount};",
+ "offset": f"offset {offset};" if offset else "",
+ "genre": f"where genres = ({genre});" if genre else "",
+ "additional": additional_body
+ }
+ body = GAMES_LIST_BODY.format(**params)
+
+ # Do request to IGDB API, create headers, URL, define body, return result
+ async with self.http_session.get(url=f"{BASE_URL}/games", data=body, headers=HEADERS) as resp:
+ return await resp.json()
+
+ async def create_page(self, data: Dict[str, Any]) -> Tuple[str, str]:
+ """Create content of Game Page."""
+ # Create cover image URL from template
+ url = COVER_URL.format(**{"image_id": data["cover"]["image_id"] if "cover" in data else ""})
+
+ # Get release date separately with checking
+ release_date = dt.utcfromtimestamp(data["first_release_date"]).date() if "first_release_date" in data else "?"
+
+ # Create Age Ratings value
+ rating = ", ".join(f"{AgeRatingCategories(age['category']).name} {AgeRatings(age['rating']).name}"
+ for age in data["age_ratings"]) if "age_ratings" in data else "?"
+
+ companies = [c["company"]["name"] for c in data["involved_companies"]] if "involved_companies" in data else "?"
+
+ # Create formatting for template page
+ formatting = {
+ "name": data["name"],
+ "url": data["url"],
+ "description": f"{data['summary']}\n\n" if "summary" in data else "\n",
+ "release_date": release_date,
+ "rating": round(data["total_rating"] if "total_rating" in data else 0, 2),
+ "rating_count": data["total_rating_count"] if "total_rating_count" in data else "?",
+ "platforms": ", ".join(platform["name"] for platform in data["platforms"]) if "platforms" in data else "?",
+ "status": GameStatus(data["status"]).name if "status" in data else "?",
+ "age_ratings": rating,
+ "made_by": ", ".join(companies),
+ "storyline": data["storyline"] if "storyline" in data else ""
+ }
+ page = GAME_PAGE.format(**formatting)
+
+ return page, url
+
+ async def search_games(self, search_term: str) -> List[str]:
+ """Search game from IGDB API by string, return listing of pages."""
+ lines = []
+
+ # Define request body of IGDB API request and do request
+ body = SEARCH_BODY.format(**{"term": search_term})
+
+ async with self.http_session.get(url=f"{BASE_URL}/games", data=body, headers=HEADERS) as resp:
+ data = await resp.json()
+
+ # Loop over games, format them to good format, make line and append this to total lines
+ for game in data:
+ formatting = {
+ "name": game["name"],
+ "url": game["url"],
+ "rating": round(game["total_rating"] if "total_rating" in game else 0, 2),
+ "rating_count": game["total_rating_count"] if "total_rating" in game else "?"
+ }
+ line = GAME_SEARCH_LINE.format(**formatting)
+ lines.append(line)
+
+ return lines
+
+ async def get_companies_list(self, limit: int, offset: int = 0) -> List[Dict[str, Any]]:
+ """
+ Get random Game Companies from IGDB API.
+
+ Limit is parameter, that show how much movies this should return, offset show in which position should API start
+ returning results.
+ """
+ # Create request body from template
+ body = COMPANIES_LIST_BODY.format(**{
+ "limit": limit,
+ "offset": offset
+ })
+
+ async with self.http_session.get(url=f"{BASE_URL}/companies", data=body, headers=HEADERS) as resp:
+ return await resp.json()
+
+ async def create_company_page(self, data: Dict[str, Any]) -> Tuple[str, str]:
+ """Create good formatted Game Company page."""
+ # Generate URL of company logo
+ url = LOGO_URL.format(**{"image_id": data["logo"]["image_id"] if "logo" in data else ""})
+
+ # Try to get found date of company
+ founded = dt.utcfromtimestamp(data["start_date"]).date() if "start_date" in data else "?"
+
+ # Generate list of games, that company have developed or published
+ developed = ", ".join(game["name"] for game in data["developed"]) if "developed" in data else "?"
+ published = ", ".join(game["name"] for game in data["published"]) if "published" in data else "?"
+
+ formatting = {
+ "name": data["name"],
+ "url": data["url"],
+ "description": f"{data['description']}\n\n" if "description" in data else "\n",
+ "founded": founded,
+ "developed": developed,
+ "published": published
+ }
+ page = COMPANY_PAGE.format(**formatting)
+
+ return page, url
+
+ async def get_best_results(self, query: str) -> List[Tuple[float, str]]:
+ """Get best match result of genre when original genre is invalid."""
+ results = []
+ for genre in self.genres:
+ ratios = [difflib.SequenceMatcher(None, query, genre).ratio()]
+ for word in REGEX_NON_ALPHABET.split(genre):
+ ratios.append(difflib.SequenceMatcher(None, query, word).ratio())
+ results.append((round(max(ratios), 2), genre))
+ return sorted((item for item in results if item[0] >= 0.60), reverse=True)[:4]
+
+
+def setup(bot: SeasonalBot) -> None:
+ """Add/Load Games cog."""
+ # Check does IGDB API key exist, if not, log warning and don't load cog
+ if not Tokens.igdb:
+ logger.warning("No IGDB API key. Not loading Games cog.")
+ return
+ bot.add_cog(Games(bot))
diff --git a/bot/exts/evergreen/help.py b/bot/exts/evergreen/help.py
new file mode 100644
index 00000000..ccd76d76
--- /dev/null
+++ b/bot/exts/evergreen/help.py
@@ -0,0 +1,552 @@
+# Help command from Python bot. All commands that will be added to there in futures should be added to here too.
+import asyncio
+import itertools
+import logging
+from collections import namedtuple
+from contextlib import suppress
+from typing import Union
+
+from discord import Colour, Embed, HTTPException, Message, Reaction, User
+from discord.ext import commands
+from discord.ext.commands import CheckFailure, Cog as DiscordCog, Command, Context
+from fuzzywuzzy import fuzz, process
+
+from bot import constants
+from bot.bot import SeasonalBot
+from bot.constants import Emojis
+from bot.utils.pagination import (
+ FIRST_EMOJI, LAST_EMOJI,
+ LEFT_EMOJI, LinePaginator, RIGHT_EMOJI,
+)
+
+DELETE_EMOJI = Emojis.trashcan
+
+REACTIONS = {
+ FIRST_EMOJI: 'first',
+ LEFT_EMOJI: 'back',
+ RIGHT_EMOJI: 'next',
+ LAST_EMOJI: 'end',
+ DELETE_EMOJI: 'stop',
+}
+
+Cog = namedtuple('Cog', ['name', 'description', 'commands'])
+
+log = logging.getLogger(__name__)
+
+
+class HelpQueryNotFound(ValueError):
+ """
+ Raised when a HelpSession Query doesn't match a command or cog.
+
+ Contains the custom attribute of ``possible_matches``.
+ Instances of this object contain a dictionary of any command(s) that were close to matching the
+ query, where keys are the possible matched command names and values are the likeness match scores.
+ """
+
+ def __init__(self, arg: str, possible_matches: dict = None):
+ super().__init__(arg)
+ self.possible_matches = possible_matches
+
+
+class HelpSession:
+ """
+ An interactive session for bot and command help output.
+
+ Expected attributes include:
+ * title: str
+ The title of the help message.
+ * query: Union[discord.ext.commands.Bot, discord.ext.commands.Command]
+ * description: str
+ The description of the query.
+ * pages: list[str]
+ A list of the help content split into manageable pages.
+ * message: `discord.Message`
+ The message object that's showing the help contents.
+ * destination: `discord.abc.Messageable`
+ Where the help message is to be sent to.
+ Cogs can be grouped into custom categories. All cogs with the same category will be displayed
+ under a single category name in the help output. Custom categories are defined inside the cogs
+ as a class attribute named `category`. A description can also be specified with the attribute
+ `category_description`. If a description is not found in at least one cog, the default will be
+ the regular description (class docstring) of the first cog found in the category.
+ """
+
+ def __init__(
+ self,
+ ctx: Context,
+ *command,
+ cleanup: bool = False,
+ only_can_run: bool = True,
+ show_hidden: bool = False,
+ max_lines: int = 15
+ ):
+ """Creates an instance of the HelpSession class."""
+ self._ctx = ctx
+ self._bot = ctx.bot
+ self.title = "Command Help"
+
+ # set the query details for the session
+ if command:
+ query_str = ' '.join(command)
+ self.query = self._get_query(query_str)
+ self.description = self.query.description or self.query.help
+ else:
+ self.query = ctx.bot
+ self.description = self.query.description
+ self.author = ctx.author
+ self.destination = ctx.channel
+
+ # set the config for the session
+ self._cleanup = cleanup
+ self._only_can_run = only_can_run
+ self._show_hidden = show_hidden
+ self._max_lines = max_lines
+
+ # init session states
+ self._pages = None
+ self._current_page = 0
+ self.message = None
+ self._timeout_task = None
+ self.reset_timeout()
+
+ def _get_query(self, query: str) -> Union[Command, Cog]:
+ """Attempts to match the provided query with a valid command or cog."""
+ command = self._bot.get_command(query)
+ if command:
+ return command
+
+ # Find all cog categories that match.
+ cog_matches = []
+ description = None
+ for cog in self._bot.cogs.values():
+ if hasattr(cog, "category") and cog.category == query:
+ cog_matches.append(cog)
+ if hasattr(cog, "category_description"):
+ description = cog.category_description
+
+ # Try to search by cog name if no categories match.
+ if not cog_matches:
+ cog = self._bot.cogs.get(query)
+
+ # Don't consider it a match if the cog has a category.
+ if cog and not hasattr(cog, "category"):
+ cog_matches = [cog]
+
+ if cog_matches:
+ cog = cog_matches[0]
+ cmds = (cog.get_commands() for cog in cog_matches) # Commands of all cogs
+
+ return Cog(
+ name=cog.category if hasattr(cog, "category") else cog.qualified_name,
+ description=description or cog.description,
+ commands=tuple(itertools.chain.from_iterable(cmds)) # Flatten the list
+ )
+
+ self._handle_not_found(query)
+
+ def _handle_not_found(self, query: str) -> None:
+ """
+ Handles when a query does not match a valid command or cog.
+
+ Will pass on possible close matches along with the `HelpQueryNotFound` exception.
+ """
+ # Combine command and cog names
+ choices = list(self._bot.all_commands) + list(self._bot.cogs)
+
+ result = process.extractBests(query, choices, scorer=fuzz.ratio, score_cutoff=90)
+
+ raise HelpQueryNotFound(f'Query "{query}" not found.', dict(result))
+
+ async def timeout(self, seconds: int = 30) -> None:
+ """Waits for a set number of seconds, then stops the help session."""
+ await asyncio.sleep(seconds)
+ await self.stop()
+
+ def reset_timeout(self) -> None:
+ """Cancels the original timeout task and sets it again from the start."""
+ # cancel original if it exists
+ if self._timeout_task:
+ if not self._timeout_task.cancelled():
+ self._timeout_task.cancel()
+
+ # recreate the timeout task
+ self._timeout_task = self._bot.loop.create_task(self.timeout())
+
+ async def on_reaction_add(self, reaction: Reaction, user: User) -> None:
+ """Event handler for when reactions are added on the help message."""
+ # ensure it was the relevant session message
+ if reaction.message.id != self.message.id:
+ return
+
+ # ensure it was the session author who reacted
+ if user.id != self.author.id:
+ return
+
+ emoji = str(reaction.emoji)
+
+ # check if valid action
+ if emoji not in REACTIONS:
+ return
+
+ self.reset_timeout()
+
+ # Run relevant action method
+ action = getattr(self, f'do_{REACTIONS[emoji]}', None)
+ if action:
+ await action()
+
+ # remove the added reaction to prep for re-use
+ with suppress(HTTPException):
+ await self.message.remove_reaction(reaction, user)
+
+ async def on_message_delete(self, message: Message) -> None:
+ """Closes the help session when the help message is deleted."""
+ if message.id == self.message.id:
+ await self.stop()
+
+ async def prepare(self) -> None:
+ """Sets up the help session pages, events, message and reactions."""
+ await self.build_pages()
+
+ self._bot.add_listener(self.on_reaction_add)
+ self._bot.add_listener(self.on_message_delete)
+
+ await self.update_page()
+ self.add_reactions()
+
+ def add_reactions(self) -> None:
+ """Adds the relevant reactions to the help message based on if pagination is required."""
+ # if paginating
+ if len(self._pages) > 1:
+ for reaction in REACTIONS:
+ self._bot.loop.create_task(self.message.add_reaction(reaction))
+
+ # if single-page
+ else:
+ self._bot.loop.create_task(self.message.add_reaction(DELETE_EMOJI))
+
+ def _category_key(self, cmd: Command) -> str:
+ """
+ Returns a cog name of a given command for use as a key for `sorted` and `groupby`.
+
+ A zero width space is used as a prefix for results with no cogs to force them last in ordering.
+ """
+ if cmd.cog:
+ try:
+ if cmd.cog.category:
+ return f'**{cmd.cog.category}**'
+ except AttributeError:
+ pass
+
+ return f'**{cmd.cog_name}**'
+ else:
+ return "**\u200bNo Category:**"
+
+ def _get_command_params(self, cmd: Command) -> str:
+ """
+ Returns the command usage signature.
+
+ This is a custom implementation of `command.signature` in order to format the command
+ signature without aliases.
+ """
+ results = []
+ for name, param in cmd.clean_params.items():
+
+ # if argument has a default value
+ if param.default is not param.empty:
+
+ if isinstance(param.default, str):
+ show_default = param.default
+ else:
+ show_default = param.default is not None
+
+ # if default is not an empty string or None
+ if show_default:
+ results.append(f'[{name}={param.default}]')
+ else:
+ results.append(f'[{name}]')
+
+ # if variable length argument
+ elif param.kind == param.VAR_POSITIONAL:
+ results.append(f'[{name}...]')
+
+ # if required
+ else:
+ results.append(f'<{name}>')
+
+ return f"{cmd.name} {' '.join(results)}"
+
+ async def build_pages(self) -> None:
+ """Builds the list of content pages to be paginated through in the help message, as a list of str."""
+ # Use LinePaginator to restrict embed line height
+ paginator = LinePaginator(prefix='', suffix='', max_lines=self._max_lines)
+
+ prefix = constants.Client.prefix
+
+ # show signature if query is a command
+ if isinstance(self.query, commands.Command):
+ signature = self._get_command_params(self.query)
+ parent = self.query.full_parent_name + ' ' if self.query.parent else ''
+ paginator.add_line(f'**```{prefix}{parent}{signature}```**')
+
+ aliases = ', '.join(f'`{a}`' for a in self.query.aliases)
+ if aliases:
+ paginator.add_line(f'**Can also use:** {aliases}\n')
+
+ if not await self.query.can_run(self._ctx):
+ paginator.add_line('***You cannot run this command.***\n')
+
+ if isinstance(self.query, Cog):
+ paginator.add_line(f'**{self.query.name}**')
+
+ if self.description:
+ paginator.add_line(f'*{self.description}*')
+
+ # list all children commands of the queried object
+ if isinstance(self.query, (commands.GroupMixin, Cog)):
+
+ # remove hidden commands if session is not wanting hiddens
+ if not self._show_hidden:
+ filtered = [c for c in self.query.commands if not c.hidden]
+ else:
+ filtered = self.query.commands
+
+ # if after filter there are no commands, finish up
+ if not filtered:
+ self._pages = paginator.pages
+ return
+
+ if isinstance(self.query, Cog):
+ grouped = (('**Commands:**', self.query.commands),)
+
+ elif isinstance(self.query, commands.Command):
+ grouped = (('**Subcommands:**', self.query.commands),)
+
+ # don't show prefix for subcommands
+ prefix = ''
+
+ # otherwise sort and organise all commands into categories
+ else:
+ cat_sort = sorted(filtered, key=self._category_key)
+ grouped = itertools.groupby(cat_sort, key=self._category_key)
+
+ for category, cmds in grouped:
+ cmds = sorted(cmds, key=lambda c: c.name)
+
+ if len(cmds) == 0:
+ continue
+
+ cat_cmds = []
+
+ for command in cmds:
+
+ # skip if hidden and hide if session is set to
+ if command.hidden and not self._show_hidden:
+ continue
+
+ # see if the user can run the command
+ strikeout = ''
+
+ # Patch to make the !help command work outside of #bot-commands again
+ # This probably needs a proper rewrite, but this will make it work in
+ # the mean time.
+ try:
+ can_run = await command.can_run(self._ctx)
+ except CheckFailure:
+ can_run = False
+
+ if not can_run:
+ # skip if we don't show commands they can't run
+ if self._only_can_run:
+ continue
+ strikeout = '~~'
+
+ signature = self._get_command_params(command)
+ info = f"{strikeout}**`{prefix}{signature}`**{strikeout}"
+
+ # handle if the command has no docstring
+ if command.short_doc:
+ cat_cmds.append(f'{info}\n*{command.short_doc}*')
+ else:
+ cat_cmds.append(f'{info}\n*No details provided.*')
+
+ # state var for if the category should be added next
+ print_cat = 1
+ new_page = True
+
+ for details in cat_cmds:
+
+ # keep details together, paginating early if it won't fit
+ lines_adding = len(details.split('\n')) + print_cat
+ if paginator._linecount + lines_adding > self._max_lines:
+ paginator._linecount = 0
+ new_page = True
+ paginator.close_page()
+
+ # new page so print category title again
+ print_cat = 1
+
+ if print_cat:
+ if new_page:
+ paginator.add_line('')
+ paginator.add_line(category)
+ print_cat = 0
+
+ paginator.add_line(details)
+
+ self._pages = paginator.pages
+
+ def embed_page(self, page_number: int = 0) -> Embed:
+ """Returns an Embed with the requested page formatted within."""
+ embed = Embed()
+
+ if isinstance(self.query, (commands.Command, Cog)) and page_number > 0:
+ title = f'Command Help | "{self.query.name}"'
+ else:
+ title = self.title
+
+ embed.set_author(name=title, icon_url=constants.Icons.questionmark)
+ embed.description = self._pages[page_number]
+
+ page_count = len(self._pages)
+ if page_count > 1:
+ embed.set_footer(text=f'Page {self._current_page+1} / {page_count}')
+
+ return embed
+
+ async def update_page(self, page_number: int = 0) -> None:
+ """Sends the intial message, or changes the existing one to the given page number."""
+ self._current_page = page_number
+ embed_page = self.embed_page(page_number)
+
+ if not self.message:
+ self.message = await self.destination.send(embed=embed_page)
+ else:
+ await self.message.edit(embed=embed_page)
+
+ @classmethod
+ async def start(cls, ctx: Context, *command, **options) -> "HelpSession":
+ """
+ Create and begin a help session based on the given command context.
+
+ Available options kwargs:
+ * cleanup: Optional[bool]
+ Set to `True` to have the message deleted on session end. Defaults to `False`.
+ * only_can_run: Optional[bool]
+ Set to `True` to hide commands the user can't run. Defaults to `False`.
+ * show_hidden: Optional[bool]
+ Set to `True` to include hidden commands. Defaults to `False`.
+ * max_lines: Optional[int]
+ Sets the max number of lines the paginator will add to a single page. Defaults to 20.
+ """
+ session = cls(ctx, *command, **options)
+ await session.prepare()
+
+ return session
+
+ async def stop(self) -> None:
+ """Stops the help session, removes event listeners and attempts to delete the help message."""
+ self._bot.remove_listener(self.on_reaction_add)
+ self._bot.remove_listener(self.on_message_delete)
+
+ # ignore if permission issue, or the message doesn't exist
+ with suppress(HTTPException, AttributeError):
+ if self._cleanup:
+ await self.message.delete()
+ else:
+ await self.message.clear_reactions()
+
+ @property
+ def is_first_page(self) -> bool:
+ """Check if session is currently showing the first page."""
+ return self._current_page == 0
+
+ @property
+ def is_last_page(self) -> bool:
+ """Check if the session is currently showing the last page."""
+ return self._current_page == (len(self._pages)-1)
+
+ async def do_first(self) -> None:
+ """Event that is called when the user requests the first page."""
+ if not self.is_first_page:
+ await self.update_page(0)
+
+ async def do_back(self) -> None:
+ """Event that is called when the user requests the previous page."""
+ if not self.is_first_page:
+ await self.update_page(self._current_page-1)
+
+ async def do_next(self) -> None:
+ """Event that is called when the user requests the next page."""
+ if not self.is_last_page:
+ await self.update_page(self._current_page+1)
+
+ async def do_end(self) -> None:
+ """Event that is called when the user requests the last page."""
+ if not self.is_last_page:
+ await self.update_page(len(self._pages)-1)
+
+ async def do_stop(self) -> None:
+ """Event that is called when the user requests to stop the help session."""
+ await self.message.delete()
+
+
+class Help(DiscordCog):
+ """Custom Embed Pagination Help feature."""
+
+ @commands.command('help')
+ async def new_help(self, ctx: Context, *commands) -> None:
+ """Shows Command Help."""
+ try:
+ await HelpSession.start(ctx, *commands)
+ except HelpQueryNotFound as error:
+ embed = Embed()
+ embed.colour = Colour.red()
+ embed.title = str(error)
+
+ if error.possible_matches:
+ matches = '\n'.join(error.possible_matches.keys())
+ embed.description = f'**Did you mean:**\n`{matches}`'
+
+ await ctx.send(embed=embed)
+
+
+def unload(bot: SeasonalBot) -> None:
+ """
+ Reinstates the original help command.
+
+ This is run if the cog raises an exception on load, or if the extension is unloaded.
+ """
+ bot.remove_command('help')
+ bot.add_command(bot._old_help)
+
+
+def setup(bot: SeasonalBot) -> None:
+ """
+ The setup for the help extension.
+
+ This is called automatically on `bot.load_extension` being run.
+ Stores the original help command instance on the `bot._old_help` attribute for later
+ reinstatement, before removing it from the command registry so the new help command can be
+ loaded successfully.
+ If an exception is raised during the loading of the cog, `unload` will be called in order to
+ reinstate the original help command.
+ """
+ bot._old_help = bot.get_command('help')
+ bot.remove_command('help')
+
+ try:
+ bot.add_cog(Help())
+ except Exception:
+ unload(bot)
+ raise
+
+
+def teardown(bot: SeasonalBot) -> None:
+ """
+ The teardown for the help extension.
+
+ This is called automatically on `bot.unload_extension` being run.
+ Calls `unload` in order to reinstate the original help command.
+ """
+ unload(bot)
diff --git a/bot/exts/evergreen/issues.py b/bot/exts/evergreen/issues.py
new file mode 100644
index 00000000..4129156a
--- /dev/null
+++ b/bot/exts/evergreen/issues.py
@@ -0,0 +1,76 @@
+import logging
+
+import discord
+from discord.ext import commands
+
+from bot.constants import Colours, Emojis, WHITELISTED_CHANNELS
+from bot.utils.decorators import override_in_channel
+
+log = logging.getLogger(__name__)
+
+BAD_RESPONSE = {
+ 404: "Issue/pull request not located! Please enter a valid number!",
+ 403: "Rate limit has been hit! Please try again later!"
+}
+
+
+class Issues(commands.Cog):
+ """Cog that allows users to retrieve issues from GitHub."""
+
+ def __init__(self, bot: commands.Bot):
+ self.bot = bot
+
+ @commands.command(aliases=("pr",))
+ @override_in_channel(WHITELISTED_CHANNELS)
+ async def issue(
+ self, ctx: commands.Context, number: int, repository: str = "seasonalbot", user: str = "python-discord"
+ ) -> None:
+ """Command to retrieve issues from a GitHub repository."""
+ url = f"https://api.github.com/repos/{user}/{repository}/issues/{number}"
+ merge_url = f"https://api.github.com/repos/{user}/{repository}/pulls/{number}/merge"
+
+ log.trace(f"Querying GH issues API: {url}")
+ async with self.bot.http_session.get(url) as r:
+ json_data = await r.json()
+
+ if r.status in BAD_RESPONSE:
+ log.warning(f"Received response {r.status} from: {url}")
+ return await ctx.send(f"[{str(r.status)}] {BAD_RESPONSE.get(r.status)}")
+
+ # The initial API request is made to the issues API endpoint, which will return information
+ # if the issue or PR is present. However, the scope of information returned for PRs differs
+ # from issues: if the 'issues' key is present in the response then we can pull the data we
+ # need from the initial API call.
+ if "issues" in json_data.get("html_url"):
+ if json_data.get("state") == "open":
+ icon_url = Emojis.issue
+ else:
+ icon_url = Emojis.issue_closed
+
+ # If the 'issues' key is not contained in the API response and there is no error code, then
+ # we know that a PR has been requested and a call to the pulls API endpoint is necessary
+ # to get the desired information for the PR.
+ else:
+ log.trace(f"PR provided, querying GH pulls API for additional information: {merge_url}")
+ async with self.bot.http_session.get(merge_url) as m:
+ if json_data.get("state") == "open":
+ icon_url = Emojis.pull_request
+ # When the status is 204 this means that the state of the PR is merged
+ elif m.status == 204:
+ icon_url = Emojis.merge
+ else:
+ icon_url = Emojis.pull_request_closed
+
+ issue_url = json_data.get("html_url")
+ description_text = f"[{repository}] #{number} {json_data.get('title')}"
+ resp = discord.Embed(
+ colour=Colours.bright_green,
+ description=f"{icon_url} [{description_text}]({issue_url})"
+ )
+ resp.set_author(name="GitHub", url=issue_url)
+ await ctx.send(embed=resp)
+
+
+def setup(bot: commands.Bot) -> None:
+ """Cog Retrieves Issues From Github."""
+ bot.add_cog(Issues(bot))
diff --git a/bot/exts/evergreen/magic_8ball.py b/bot/exts/evergreen/magic_8ball.py
new file mode 100644
index 00000000..c10f1f51
--- /dev/null
+++ b/bot/exts/evergreen/magic_8ball.py
@@ -0,0 +1,31 @@
+import json
+import logging
+import random
+from pathlib import Path
+
+from discord.ext import commands
+
+log = logging.getLogger(__name__)
+
+
+class Magic8ball(commands.Cog):
+ """A Magic 8ball command to respond to a user's question."""
+
+ def __init__(self, bot: commands.Bot):
+ self.bot = bot
+ with open(Path("bot/resources/evergreen/magic8ball.json"), "r") as file:
+ self.answers = json.load(file)
+
+ @commands.command(name="8ball")
+ async def output_answer(self, ctx: commands.Context, *, question: str) -> None:
+ """Return a Magic 8ball answer from answers list."""
+ if len(question.split()) >= 3:
+ answer = random.choice(self.answers)
+ await ctx.send(answer)
+ else:
+ await ctx.send("Usage: .8ball <question> (minimum length of 3 eg: `will I win?`)")
+
+
+def setup(bot: commands.Bot) -> None:
+ """Magic 8ball Cog load."""
+ bot.add_cog(Magic8ball(bot))
diff --git a/bot/exts/evergreen/minesweeper.py b/bot/exts/evergreen/minesweeper.py
new file mode 100644
index 00000000..b59cdb14
--- /dev/null
+++ b/bot/exts/evergreen/minesweeper.py
@@ -0,0 +1,284 @@
+import logging
+import typing
+from dataclasses import dataclass
+from random import randint, random
+
+import discord
+from discord.ext import commands
+
+from bot.constants import Client
+
+MESSAGE_MAPPING = {
+ 0: ":stop_button:",
+ 1: ":one:",
+ 2: ":two:",
+ 3: ":three:",
+ 4: ":four:",
+ 5: ":five:",
+ 6: ":six:",
+ 7: ":seven:",
+ 8: ":eight:",
+ 9: ":nine:",
+ 10: ":keycap_ten:",
+ "bomb": ":bomb:",
+ "hidden": ":grey_question:",
+ "flag": ":flag_black:",
+ "x": ":x:"
+}
+
+log = logging.getLogger(__name__)
+
+
+class CoordinateConverter(commands.Converter):
+ """Converter for Coordinates."""
+
+ async def convert(self, ctx: commands.Context, coordinate: str) -> typing.Tuple[int, int]:
+ """Take in a coordinate string and turn it into an (x, y) tuple."""
+ if not 2 <= len(coordinate) <= 3:
+ raise commands.BadArgument('Invalid co-ordinate provided')
+
+ coordinate = coordinate.lower()
+ if coordinate[0].isalpha():
+ digit = coordinate[1:]
+ letter = coordinate[0]
+ else:
+ digit = coordinate[:-1]
+ letter = coordinate[-1]
+
+ if not digit.isdigit():
+ raise commands.BadArgument
+
+ x = ord(letter) - ord('a')
+ y = int(digit) - 1
+
+ if (not 0 <= x <= 9) or (not 0 <= y <= 9):
+ raise commands.BadArgument
+ return x, y
+
+
+GameBoard = typing.List[typing.List[typing.Union[str, int]]]
+
+
+@dataclass
+class Game:
+ """The data for a game."""
+
+ board: GameBoard
+ revealed: GameBoard
+ dm_msg: discord.Message
+ chat_msg: discord.Message
+ activated_on_server: bool
+
+
+GamesDict = typing.Dict[int, Game]
+
+
+class Minesweeper(commands.Cog):
+ """Play a game of Minesweeper."""
+
+ def __init__(self, bot: commands.Bot) -> None:
+ self.games: GamesDict = {} # Store the currently running games
+
+ @commands.group(name='minesweeper', aliases=('ms',), invoke_without_command=True)
+ async def minesweeper_group(self, ctx: commands.Context) -> None:
+ """Commands for Playing Minesweeper."""
+ await ctx.send_help(ctx.command)
+
+ @staticmethod
+ def get_neighbours(x: int, y: int) -> typing.Generator[typing.Tuple[int, int], None, None]:
+ """Get all the neighbouring x and y including it self."""
+ for x_ in [x - 1, x, x + 1]:
+ for y_ in [y - 1, y, y + 1]:
+ if x_ != -1 and x_ != 10 and y_ != -1 and y_ != 10:
+ yield x_, y_
+
+ def generate_board(self, bomb_chance: float) -> GameBoard:
+ """Generate a 2d array for the board."""
+ board: GameBoard = [
+ [
+ "bomb" if random() <= bomb_chance else "number"
+ for _ in range(10)
+ ] for _ in range(10)
+ ]
+
+ # make sure there is always a free cell
+ board[randint(0, 9)][randint(0, 9)] = "number"
+
+ for y, row in enumerate(board):
+ for x, cell in enumerate(row):
+ if cell == "number":
+ # calculate bombs near it
+ bombs = 0
+ for x_, y_ in self.get_neighbours(x, y):
+ if board[y_][x_] == "bomb":
+ bombs += 1
+ board[y][x] = bombs
+ return board
+
+ @staticmethod
+ def format_for_discord(board: GameBoard) -> str:
+ """Format the board as a string for Discord."""
+ discord_msg = (
+ ":stop_button: :regional_indicator_a::regional_indicator_b::regional_indicator_c:"
+ ":regional_indicator_d::regional_indicator_e::regional_indicator_f::regional_indicator_g:"
+ ":regional_indicator_h::regional_indicator_i::regional_indicator_j:\n\n"
+ )
+ rows = []
+ for row_number, row in enumerate(board):
+ new_row = f"{MESSAGE_MAPPING[row_number + 1]} "
+ new_row += "".join(MESSAGE_MAPPING[cell] for cell in row)
+ rows.append(new_row)
+
+ discord_msg += "\n".join(rows)
+ return discord_msg
+
+ @minesweeper_group.command(name="start")
+ async def start_command(self, ctx: commands.Context, bomb_chance: float = .2) -> None:
+ """Start a game of Minesweeper."""
+ if ctx.author.id in self.games: # Player is already playing
+ await ctx.send(f"{ctx.author.mention} you already have a game running!", delete_after=2)
+ await ctx.message.delete(delay=2)
+ return
+
+ # Add game to list
+ board: GameBoard = self.generate_board(bomb_chance)
+ revealed_board: GameBoard = [["hidden"] * 10 for _ in range(10)]
+
+ if ctx.guild:
+ await ctx.send(f"{ctx.author.mention} is playing Minesweeper")
+ chat_msg = await ctx.send(f"Here's there board!\n{self.format_for_discord(revealed_board)}")
+ else:
+ chat_msg = None
+
+ await ctx.author.send(
+ f"Play by typing: `{Client.prefix}ms reveal xy [xy]` or `{Client.prefix}ms flag xy [xy]` \n"
+ f"Close the game with `{Client.prefix}ms end`\n"
+ )
+ dm_msg = await ctx.author.send(f"Here's your board!\n{self.format_for_discord(revealed_board)}")
+
+ self.games[ctx.author.id] = Game(
+ board=board,
+ revealed=revealed_board,
+ dm_msg=dm_msg,
+ chat_msg=chat_msg,
+ activated_on_server=ctx.guild is not None
+ )
+
+ async def update_boards(self, ctx: commands.Context) -> None:
+ """Update both playing boards."""
+ game = self.games[ctx.author.id]
+ await game.dm_msg.delete()
+ game.dm_msg = await ctx.author.send(f"Here's your board!\n{self.format_for_discord(game.revealed)}")
+ if game.activated_on_server:
+ await game.chat_msg.edit(content=f"Here's there board!\n{self.format_for_discord(game.revealed)}")
+
+ @commands.dm_only()
+ @minesweeper_group.command(name="flag")
+ async def flag_command(self, ctx: commands.Context, *coordinates: CoordinateConverter) -> None:
+ """Place multiple flags on the board."""
+ board: GameBoard = self.games[ctx.author.id].revealed
+ for x, y in coordinates:
+ if board[y][x] == "hidden":
+ board[y][x] = "flag"
+
+ await self.update_boards(ctx)
+
+ @staticmethod
+ def reveal_bombs(revealed: GameBoard, board: GameBoard) -> None:
+ """Reveals all the bombs."""
+ for y, row in enumerate(board):
+ for x, cell in enumerate(row):
+ if cell == "bomb":
+ revealed[y][x] = cell
+
+ async def lost(self, ctx: commands.Context) -> None:
+ """The player lost the game."""
+ game = self.games[ctx.author.id]
+ self.reveal_bombs(game.revealed, game.board)
+ await ctx.author.send(":fire: You lost! :fire:")
+ if game.activated_on_server:
+ await game.chat_msg.channel.send(f":fire: {ctx.author.mention} just lost Minesweeper! :fire:")
+
+ async def won(self, ctx: commands.Context) -> None:
+ """The player won the game."""
+ game = self.games[ctx.author.id]
+ await ctx.author.send(":tada: You won! :tada:")
+ if game.activated_on_server:
+ await game.chat_msg.channel.send(f":tada: {ctx.author.mention} just won Minesweeper! :tada:")
+
+ def reveal_zeros(self, revealed: GameBoard, board: GameBoard, x: int, y: int) -> None:
+ """Recursively reveal adjacent cells when a 0 cell is encountered."""
+ for x_, y_ in self.get_neighbours(x, y):
+ if revealed[y_][x_] != "hidden":
+ continue
+ revealed[y_][x_] = board[y_][x_]
+ if board[y_][x_] == 0:
+ self.reveal_zeros(revealed, board, x_, y_)
+
+ async def check_if_won(self, ctx: commands.Context, revealed: GameBoard, board: GameBoard) -> bool:
+ """Checks if a player has won."""
+ if any(
+ revealed[y][x] in ["hidden", "flag"] and board[y][x] != "bomb"
+ for x in range(10)
+ for y in range(10)
+ ):
+ return False
+ else:
+ await self.won(ctx)
+ return True
+
+ async def reveal_one(
+ self,
+ ctx: commands.Context,
+ revealed: GameBoard,
+ board: GameBoard,
+ x: int,
+ y: int
+ ) -> bool:
+ """
+ Reveal one square.
+
+ return is True if the game ended, breaking the loop in `reveal_command` and deleting the game
+ """
+ revealed[y][x] = board[y][x]
+ if board[y][x] == "bomb":
+ await self.lost(ctx)
+ revealed[y][x] = "x" # mark bomb that made you lose with a x
+ return True
+ elif board[y][x] == 0:
+ self.reveal_zeros(revealed, board, x, y)
+ return await self.check_if_won(ctx, revealed, board)
+
+ @commands.dm_only()
+ @minesweeper_group.command(name="reveal")
+ async def reveal_command(self, ctx: commands.Context, *coordinates: CoordinateConverter) -> None:
+ """Reveal multiple cells."""
+ game = self.games[ctx.author.id]
+ revealed: GameBoard = game.revealed
+ board: GameBoard = game.board
+
+ for x, y in coordinates:
+ # reveal_one returns True if the revealed cell is a bomb or the player won, ending the game
+ if await self.reveal_one(ctx, revealed, board, x, y):
+ await self.update_boards(ctx)
+ del self.games[ctx.author.id]
+ break
+ else:
+ await self.update_boards(ctx)
+
+ @minesweeper_group.command(name="end")
+ async def end_command(self, ctx: commands.Context) -> None:
+ """End your current game."""
+ game = self.games[ctx.author.id]
+ game.revealed = game.board
+ await self.update_boards(ctx)
+ new_msg = f":no_entry: Game canceled :no_entry:\n{game.dm_msg.content}"
+ await game.dm_msg.edit(content=new_msg)
+ if game.activated_on_server:
+ await game.chat_msg.edit(content=new_msg)
+ del self.games[ctx.author.id]
+
+
+def setup(bot: commands.Bot) -> None:
+ """Load the Minesweeper cog."""
+ bot.add_cog(Minesweeper(bot))
diff --git a/bot/exts/evergreen/movie.py b/bot/exts/evergreen/movie.py
new file mode 100644
index 00000000..93aeef30
--- /dev/null
+++ b/bot/exts/evergreen/movie.py
@@ -0,0 +1,198 @@
+import logging
+import random
+from enum import Enum
+from typing import Any, Dict, List, Tuple
+from urllib.parse import urlencode
+
+from aiohttp import ClientSession
+from discord import Embed
+from discord.ext.commands import Bot, Cog, Context, group
+
+from bot.constants import Tokens
+from bot.utils.pagination import ImagePaginator
+
+# Define base URL of TMDB
+BASE_URL = "https://api.themoviedb.org/3/"
+
+logger = logging.getLogger(__name__)
+
+# Define movie params, that will be used for every movie request
+MOVIE_PARAMS = {
+ "api_key": Tokens.tmdb,
+ "language": "en-US"
+}
+
+
+class MovieGenres(Enum):
+ """Movies Genre names and IDs."""
+
+ Action = "28"
+ Adventure = "12"
+ Animation = "16"
+ Comedy = "35"
+ Crime = "80"
+ Documentary = "99"
+ Drama = "18"
+ Family = "10751"
+ Fantasy = "14"
+ History = "36"
+ Horror = "27"
+ Music = "10402"
+ Mystery = "9648"
+ Romance = "10749"
+ Science = "878"
+ Thriller = "53"
+ Western = "37"
+
+
+class Movie(Cog):
+ """Movie Cog contains movies command that grab random movies from TMDB."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+ self.http_session: ClientSession = bot.http_session
+
+ @group(name='movies', aliases=['movie'], invoke_without_command=True)
+ async def movies(self, ctx: Context, genre: str = "", amount: int = 5) -> None:
+ """
+ Get random movies by specifying genre. Also support amount parameter, that define how much movies will be shown.
+
+ Default 5. Use .movies genres to get all available genres.
+ """
+ # Check is there more than 20 movies specified, due TMDB return 20 movies
+ # per page, so this is max. Also you can't get less movies than 1, just logic
+ if amount > 20:
+ await ctx.send("You can't get more than 20 movies at once. (TMDB limits)")
+ return
+ elif amount < 1:
+ await ctx.send("You can't get less than 1 movie.")
+ return
+
+ # Capitalize genre for getting data from Enum, get random page, send help when genre don't exist.
+ genre = genre.capitalize()
+ try:
+ result = await self.get_movies_list(self.http_session, MovieGenres[genre].value, 1)
+ except KeyError:
+ await ctx.send_help('movies')
+ return
+
+ # Check if "results" is in result. If not, throw error.
+ if "results" not in result.keys():
+ err_msg = f"There is problem while making TMDB API request. Response Code: {result['status_code']}, " \
+ f"{result['status_message']}."
+ await ctx.send(err_msg)
+ logger.warning(err_msg)
+
+ # Get random page. Max page is last page where is movies with this genre.
+ page = random.randint(1, result["total_pages"])
+
+ # Get movies list from TMDB, check if results key in result. When not, raise error.
+ movies = await self.get_movies_list(self.http_session, MovieGenres[genre].value, page)
+ if 'results' not in movies.keys():
+ err_msg = f"There is problem while making TMDB API request. Response Code: {result['status_code']}, " \
+ f"{result['status_message']}."
+ await ctx.send(err_msg)
+ logger.warning(err_msg)
+
+ # Get all pages and embed
+ pages = await self.get_pages(self.http_session, movies, amount)
+ embed = await self.get_embed(genre)
+
+ await ImagePaginator.paginate(pages, ctx, embed)
+
+ @movies.command(name='genres', aliases=['genre', 'g'])
+ async def genres(self, ctx: Context) -> None:
+ """Show all currently available genres for .movies command."""
+ await ctx.send(f"Current available genres: {', '.join('`' + genre.name + '`' for genre in MovieGenres)}")
+
+ async def get_movies_list(self, client: ClientSession, genre_id: str, page: int) -> Dict[str, Any]:
+ """Return JSON of TMDB discover request."""
+ # Define params of request
+ params = {
+ "api_key": Tokens.tmdb,
+ "language": "en-US",
+ "sort_by": "popularity.desc",
+ "include_adult": "false",
+ "include_video": "false",
+ "page": page,
+ "with_genres": genre_id
+ }
+
+ url = BASE_URL + "discover/movie?" + urlencode(params)
+
+ # Make discover request to TMDB, return result
+ async with client.get(url) as resp:
+ return await resp.json()
+
+ async def get_pages(self, client: ClientSession, movies: Dict[str, Any], amount: int) -> List[Tuple[str, str]]:
+ """Fetch all movie pages from movies dictionary. Return list of pages."""
+ pages = []
+
+ for i in range(amount):
+ movie_id = movies['results'][i]['id']
+ movie = await self.get_movie(client, movie_id)
+
+ page, img = await self.create_page(movie)
+ pages.append((page, img))
+
+ return pages
+
+ async def get_movie(self, client: ClientSession, movie: int) -> Dict:
+ """Get Movie by movie ID from TMDB. Return result dictionary."""
+ url = BASE_URL + f"movie/{movie}?" + urlencode(MOVIE_PARAMS)
+
+ async with client.get(url) as resp:
+ return await resp.json()
+
+ async def create_page(self, movie: Dict[str, Any]) -> Tuple[str, str]:
+ """Create page from TMDB movie request result. Return formatted page + image."""
+ text = ""
+
+ # Add title + tagline (if not empty)
+ text += f"**{movie['title']}**\n"
+ if movie['tagline']:
+ text += f"{movie['tagline']}\n\n"
+ else:
+ text += "\n"
+
+ # Add other information
+ text += f"**Rating:** {movie['vote_average']}/10 :star:\n"
+ text += f"**Release Date:** {movie['release_date']}\n\n"
+
+ text += "__**Production Information**__\n"
+
+ companies = movie['production_companies']
+ countries = movie['production_countries']
+
+ text += f"**Made by:** {', '.join(company['name'] for company in companies)}\n"
+ text += f"**Made in:** {', '.join(country['name'] for country in countries)}\n\n"
+
+ text += "__**Some Numbers**__\n"
+
+ budget = f"{movie['budget']:,d}" if movie['budget'] else "?"
+ revenue = f"{movie['revenue']:,d}" if movie['revenue'] else "?"
+
+ if movie['runtime'] is not None:
+ duration = divmod(movie['runtime'], 60)
+ else:
+ duration = ("?", "?")
+
+ text += f"**Budget:** ${budget}\n"
+ text += f"**Revenue:** ${revenue}\n"
+ text += f"**Duration:** {f'{duration[0]} hour(s) {duration[1]} minute(s)'}\n\n"
+
+ text += movie['overview']
+
+ img = f"http://image.tmdb.org/t/p/w200{movie['poster_path']}"
+
+ # Return page content and image
+ return text, img
+
+ async def get_embed(self, name: str) -> Embed:
+ """Return embed of random movies. Uses name in title."""
+ return Embed(title=f'Random {name} Movies').set_footer(text='Powered by TMDB (themoviedb.org)')
+
+
+def setup(bot: Bot) -> None:
+ """Load Movie Cog."""
+ bot.add_cog(Movie(bot))
diff --git a/bot/exts/evergreen/recommend_game.py b/bot/exts/evergreen/recommend_game.py
new file mode 100644
index 00000000..7cd52c2c
--- /dev/null
+++ b/bot/exts/evergreen/recommend_game.py
@@ -0,0 +1,50 @@
+import json
+import logging
+from pathlib import Path
+from random import shuffle
+
+import discord
+from discord.ext import commands
+
+log = logging.getLogger(__name__)
+game_recs = []
+
+# Populate the list `game_recs` with resource files
+for rec_path in Path("bot/resources/evergreen/game_recs").glob("*.json"):
+ with rec_path.open(encoding='utf-8') as file:
+ data = json.load(file)
+ game_recs.append(data)
+shuffle(game_recs)
+
+
+class RecommendGame(commands.Cog):
+ """Commands related to recommending games."""
+
+ def __init__(self, bot: commands.Bot) -> None:
+ self.bot = bot
+ self.index = 0
+
+ @commands.command(name="recommendgame", aliases=['gamerec'])
+ async def recommend_game(self, ctx: commands.Context) -> None:
+ """Sends an Embed of a random game recommendation."""
+ if self.index >= len(game_recs):
+ self.index = 0
+ shuffle(game_recs)
+ game = game_recs[self.index]
+ self.index += 1
+
+ author = self.bot.get_user(int(game['author']))
+
+ # Creating and formatting Embed
+ embed = discord.Embed(color=discord.Colour.blue())
+ if author is not None:
+ embed.set_author(name=author.name, icon_url=author.avatar_url)
+ embed.set_image(url=game['image'])
+ embed.add_field(name='Recommendation: ' + game['title'] + '\n' + game['link'], value=game['description'])
+
+ await ctx.send(embed=embed)
+
+
+def setup(bot: commands.Bot) -> None:
+ """Loads the RecommendGame cog."""
+ bot.add_cog(RecommendGame(bot))
diff --git a/bot/exts/evergreen/reddit.py b/bot/exts/evergreen/reddit.py
new file mode 100644
index 00000000..fe204419
--- /dev/null
+++ b/bot/exts/evergreen/reddit.py
@@ -0,0 +1,128 @@
+import logging
+import random
+
+import discord
+from discord.ext import commands
+from discord.ext.commands.cooldowns import BucketType
+
+from bot.utils.pagination import ImagePaginator
+
+log = logging.getLogger(__name__)
+
+
+class Reddit(commands.Cog):
+ """Fetches reddit posts."""
+
+ def __init__(self, bot: commands.Bot):
+ self.bot = bot
+
+ async def fetch(self, url: str) -> dict:
+ """Send a get request to the reddit API and get json response."""
+ session = self.bot.http_session
+ params = {
+ 'limit': 50
+ }
+ headers = {
+ 'User-Agent': 'Iceman'
+ }
+
+ async with session.get(url=url, params=params, headers=headers) as response:
+ return await response.json()
+
+ @commands.command(name='reddit')
+ @commands.cooldown(1, 10, BucketType.user)
+ async def get_reddit(self, ctx: commands.Context, subreddit: str = 'python', sort: str = "hot") -> None:
+ """
+ Fetch reddit posts by using this command.
+
+ Gets a post from r/python by default.
+ Usage:
+ --> .reddit [subreddit_name] [hot/top/new]
+ """
+ pages = []
+ sort_list = ["hot", "new", "top", "rising"]
+ if sort.lower() not in sort_list:
+ await ctx.send(f"Invalid sorting: {sort}\nUsing default sorting: `Hot`")
+ sort = "hot"
+
+ data = await self.fetch(f'https://www.reddit.com/r/{subreddit}/{sort}/.json')
+
+ try:
+ posts = data["data"]["children"]
+ except KeyError:
+ return await ctx.send('Subreddit not found!')
+ if not posts:
+ return await ctx.send('No posts available!')
+
+ if posts[1]["data"]["over_18"] is True:
+ return await ctx.send(
+ "You cannot access this Subreddit as it is ment for those who "
+ "are 18 years or older."
+ )
+
+ embed_titles = ""
+
+ # Chooses k unique random elements from a population sequence or set.
+ random_posts = random.sample(posts, k=5)
+
+ # -----------------------------------------------------------
+ # This code below is bound of change when the emojis are added.
+
+ upvote_emoji = self.bot.get_emoji(638729835245731840)
+ comment_emoji = self.bot.get_emoji(638729835073765387)
+ user_emoji = self.bot.get_emoji(638729835442602003)
+ text_emoji = self.bot.get_emoji(676030265910493204)
+ video_emoji = self.bot.get_emoji(676030265839190047)
+ image_emoji = self.bot.get_emoji(676030265734201344)
+ reddit_emoji = self.bot.get_emoji(676030265734332427)
+
+ # ------------------------------------------------------------
+
+ for i, post in enumerate(random_posts, start=1):
+ post_title = post["data"]["title"][0:50]
+ post_url = post['data']['url']
+ if post_title == "":
+ post_title = "No Title."
+ elif post_title == post_url:
+ post_title = "Title is itself a link."
+
+ # ------------------------------------------------------------------
+ # Embed building.
+
+ embed_titles += f"**{i}.[{post_title}]({post_url})**\n"
+ image_url = " "
+ post_stats = f"{text_emoji}" # Set default content type to text.
+
+ if post["data"]["is_video"] is True or "youtube" in post_url.split("."):
+ # This means the content type in the post is a video.
+ post_stats = f"{video_emoji} "
+
+ elif post_url.endswith("jpg") or post_url.endswith("png") or post_url.endswith("gif"):
+ # This means the content type in the post is an image.
+ post_stats = f"{image_emoji} "
+ image_url = post_url
+
+ votes = f'{upvote_emoji}{post["data"]["ups"]}'
+ comments = f'{comment_emoji}\u2002{ post["data"]["num_comments"]}'
+ post_stats += (
+ f"\u2002{votes}\u2003"
+ f"{comments}"
+ f'\u2003{user_emoji}\u2002{post["data"]["author"]}\n'
+ )
+ embed_titles += f"{post_stats}\n"
+ page_text = f"**[{post_title}]({post_url})**\n{post_stats}\n{post['data']['selftext'][0:200]}"
+
+ embed = discord.Embed()
+ page_tuple = (page_text, image_url)
+ pages.append(page_tuple)
+
+ # ------------------------------------------------------------------
+
+ pages.insert(0, (embed_titles, " "))
+ embed.set_author(name=f"r/{posts[0]['data']['subreddit']} - {sort}", icon_url=reddit_emoji.url)
+ await ImagePaginator.paginate(pages, ctx, embed)
+
+
+def setup(bot: commands.Bot) -> None:
+ """Load the Cog."""
+ bot.add_cog(Reddit(bot))
diff --git a/bot/exts/evergreen/showprojects.py b/bot/exts/evergreen/showprojects.py
new file mode 100644
index 00000000..328a7aa5
--- /dev/null
+++ b/bot/exts/evergreen/showprojects.py
@@ -0,0 +1,33 @@
+import logging
+
+from discord import Message
+from discord.ext import commands
+
+from bot.constants import Channels
+
+log = logging.getLogger(__name__)
+
+
+class ShowProjects(commands.Cog):
+ """Cog that reacts to posts in the #show-your-projects."""
+
+ def __init__(self, bot: commands.Bot):
+ self.bot = bot
+ self.lastPoster = 0 # Given 0 as the default last poster ID as no user can actually have 0 assigned to them
+
+ @commands.Cog.listener()
+ async def on_message(self, message: Message) -> None:
+ """Adds reactions to posts in #show-your-projects."""
+ reactions = ["\U0001f44d", "\U00002764", "\U0001f440", "\U0001f389", "\U0001f680", "\U00002b50", "\U0001f6a9"]
+ if (message.channel.id == Channels.show_your_projects
+ and message.author.bot is False
+ and message.author.id != self.lastPoster):
+ for reaction in reactions:
+ await message.add_reaction(reaction)
+
+ self.lastPoster = message.author.id
+
+
+def setup(bot: commands.Bot) -> None:
+ """Show Projects Reaction Cog."""
+ bot.add_cog(ShowProjects(bot))
diff --git a/bot/exts/evergreen/snakes/__init__.py b/bot/exts/evergreen/snakes/__init__.py
new file mode 100644
index 00000000..2eae2751
--- /dev/null
+++ b/bot/exts/evergreen/snakes/__init__.py
@@ -0,0 +1,12 @@
+import logging
+
+from discord.ext import commands
+
+from bot.exts.evergreen.snakes.snakes_cog import Snakes
+
+log = logging.getLogger(__name__)
+
+
+def setup(bot: commands.Bot) -> None:
+ """Snakes Cog load."""
+ bot.add_cog(Snakes(bot))
diff --git a/bot/exts/evergreen/snakes/converter.py b/bot/exts/evergreen/snakes/converter.py
new file mode 100644
index 00000000..d4e93b56
--- /dev/null
+++ b/bot/exts/evergreen/snakes/converter.py
@@ -0,0 +1,85 @@
+import json
+import logging
+import random
+from typing import Iterable, List
+
+import discord
+from discord.ext.commands import Context, Converter
+from fuzzywuzzy import fuzz
+
+from bot.exts.evergreen.snakes.utils import SNAKE_RESOURCES
+from bot.utils import disambiguate
+
+log = logging.getLogger(__name__)
+
+
+class Snake(Converter):
+ """Snake converter for the Snakes Cog."""
+
+ snakes = None
+ special_cases = None
+
+ async def convert(self, ctx: Context, name: str) -> str:
+ """Convert the input snake name to the closest matching Snake object."""
+ await self.build_list()
+ name = name.lower()
+
+ if name == 'python':
+ return 'Python (programming language)'
+
+ def get_potential(iterable: Iterable, *, threshold: int = 80) -> List[str]:
+ nonlocal name
+ potential = []
+
+ for item in iterable:
+ original, item = item, item.lower()
+
+ if name == item:
+ return [original]
+
+ a, b = fuzz.ratio(name, item), fuzz.partial_ratio(name, item)
+ if a >= threshold or b >= threshold:
+ potential.append(original)
+
+ return potential
+
+ # Handle special cases
+ if name.lower() in self.special_cases:
+ return self.special_cases.get(name.lower(), name.lower())
+
+ names = {snake['name']: snake['scientific'] for snake in self.snakes}
+ all_names = names.keys() | names.values()
+ timeout = len(all_names) * (3 / 4)
+
+ embed = discord.Embed(
+ title='Found multiple choices. Please choose the correct one.', colour=0x59982F)
+ embed.set_author(name=ctx.author.display_name, icon_url=ctx.author.avatar_url)
+
+ name = await disambiguate(ctx, get_potential(all_names), timeout=timeout, embed=embed)
+ return names.get(name, name)
+
+ @classmethod
+ async def build_list(cls) -> None:
+ """Build list of snakes from the static snake resources."""
+ # Get all the snakes
+ if cls.snakes is None:
+ with (SNAKE_RESOURCES / "snake_names.json").open() as snakefile:
+ cls.snakes = json.load(snakefile)
+
+ # Get the special cases
+ if cls.special_cases is None:
+ with (SNAKE_RESOURCES / "special_snakes.json").open() as snakefile:
+ special_cases = json.load(snakefile)
+ cls.special_cases = {snake['name'].lower(): snake for snake in special_cases}
+
+ @classmethod
+ async def random(cls) -> str:
+ """
+ Get a random Snake from the loaded resources.
+
+ This is stupid. We should find a way to somehow get the global session into a global context,
+ so I can get it from here.
+ """
+ await cls.build_list()
+ names = [snake['scientific'] for snake in cls.snakes]
+ return random.choice(names)
diff --git a/bot/exts/evergreen/snakes/snakes_cog.py b/bot/exts/evergreen/snakes/snakes_cog.py
new file mode 100644
index 00000000..36c176ce
--- /dev/null
+++ b/bot/exts/evergreen/snakes/snakes_cog.py
@@ -0,0 +1,1149 @@
+import asyncio
+import colorsys
+import logging
+import os
+import random
+import re
+import string
+import textwrap
+import urllib
+from functools import partial
+from io import BytesIO
+from typing import Any, Dict, List
+
+import aiohttp
+import async_timeout
+from PIL import Image, ImageDraw, ImageFont
+from discord import Colour, Embed, File, Member, Message, Reaction
+from discord.ext.commands import BadArgument, Bot, Cog, CommandError, Context, bot_has_permissions, group
+
+from bot.constants import ERROR_REPLIES, Tokens
+from bot.exts.evergreen.snakes import utils
+from bot.exts.evergreen.snakes.converter import Snake
+from bot.utils.decorators import locked
+
+log = logging.getLogger(__name__)
+
+
+# region: Constants
+# Color
+SNAKE_COLOR = 0x399600
+
+# Antidote constants
+SYRINGE_EMOJI = "\U0001F489" # :syringe:
+PILL_EMOJI = "\U0001F48A" # :pill:
+HOURGLASS_EMOJI = "\u231B" # :hourglass:
+CROSSBONES_EMOJI = "\u2620" # :skull_crossbones:
+ALEMBIC_EMOJI = "\u2697" # :alembic:
+TICK_EMOJI = "\u2705" # :white_check_mark: - Correct peg, correct hole
+CROSS_EMOJI = "\u274C" # :x: - Wrong peg, wrong hole
+BLANK_EMOJI = "\u26AA" # :white_circle: - Correct peg, wrong hole
+HOLE_EMOJI = "\u2B1C" # :white_square: - Used in guesses
+EMPTY_UNICODE = "\u200b" # literally just an empty space
+
+ANTIDOTE_EMOJI = (
+ SYRINGE_EMOJI,
+ PILL_EMOJI,
+ HOURGLASS_EMOJI,
+ CROSSBONES_EMOJI,
+ ALEMBIC_EMOJI,
+)
+
+# Quiz constants
+ANSWERS_EMOJI = {
+ "a": "\U0001F1E6", # :regional_indicator_a: 🇦
+ "b": "\U0001F1E7", # :regional_indicator_b: 🇧
+ "c": "\U0001F1E8", # :regional_indicator_c: 🇨
+ "d": "\U0001F1E9", # :regional_indicator_d: 🇩
+}
+
+ANSWERS_EMOJI_REVERSE = {
+ "\U0001F1E6": "A", # :regional_indicator_a: 🇦
+ "\U0001F1E7": "B", # :regional_indicator_b: 🇧
+ "\U0001F1E8": "C", # :regional_indicator_c: 🇨
+ "\U0001F1E9": "D", # :regional_indicator_d: 🇩
+}
+
+# Zzzen of pythhhon constant
+ZEN = """
+Beautiful is better than ugly.
+Explicit is better than implicit.
+Simple is better than complex.
+Complex is better than complicated.
+Flat is better than nested.
+Sparse is better than dense.
+Readability counts.
+Special cases aren't special enough to break the rules.
+Although practicality beats purity.
+Errors should never pass silently.
+Unless explicitly silenced.
+In the face of ambiguity, refuse the temptation to guess.
+There should be one-- and preferably only one --obvious way to do it.
+Now is better than never.
+Although never is often better than *right* now.
+If the implementation is hard to explain, it's a bad idea.
+If the implementation is easy to explain, it may be a good idea.
+"""
+
+# Max messages to train snake_chat on
+MSG_MAX = 100
+
+# get_snek constants
+URL = "https://en.wikipedia.org/w/api.php?"
+
+# snake guess responses
+INCORRECT_GUESS = (
+ "Nope, that's not what it is.",
+ "Not quite.",
+ "Not even close.",
+ "Terrible guess.",
+ "Nnnno.",
+ "Dude. No.",
+ "I thought everyone knew this one.",
+ "Guess you suck at snakes.",
+ "Bet you feel stupid now.",
+ "Hahahaha, no.",
+ "Did you hit the wrong key?"
+)
+
+CORRECT_GUESS = (
+ "**WRONG**. Wait, no, actually you're right.",
+ "Yeah, you got it!",
+ "Yep, that's exactly what it is.",
+ "Uh-huh. Yep yep yep.",
+ "Yeah that's right.",
+ "Yup. How did you know that?",
+ "Are you a herpetologist?",
+ "Sure, okay, but I bet you can't pronounce it.",
+ "Are you cheating?"
+)
+
+# snake card consts
+CARD = {
+ "top": Image.open("bot/resources/snakes/snake_cards/card_top.png"),
+ "frame": Image.open("bot/resources/snakes/snake_cards/card_frame.png"),
+ "bottom": Image.open("bot/resources/snakes/snake_cards/card_bottom.png"),
+ "backs": [
+ Image.open(f"bot/resources/snakes/snake_cards/backs/{file}")
+ for file in os.listdir("bot/resources/snakes/snake_cards/backs")
+ ],
+ "font": ImageFont.truetype("bot/resources/snakes/snake_cards/expressway.ttf", 20)
+}
+# endregion
+
+
+class Snakes(Cog):
+ """
+ Commands related to snakes, created by our community during the first code jam.
+
+ More information can be found in the code-jam-1 repo.
+
+ https://github.com/python-discord/code-jam-1
+ """
+
+ wiki_brief = re.compile(r'(.*?)(=+ (.*?) =+)', flags=re.DOTALL)
+ valid_image_extensions = ('gif', 'png', 'jpeg', 'jpg', 'webp')
+
+ def __init__(self, bot: Bot):
+ self.active_sal = {}
+ self.bot = bot
+ self.snake_names = utils.get_resource("snake_names")
+ self.snake_idioms = utils.get_resource("snake_idioms")
+ self.snake_quizzes = utils.get_resource("snake_quiz")
+ self.snake_facts = utils.get_resource("snake_facts")
+
+ # region: Helper methods
+ @staticmethod
+ def _beautiful_pastel(hue: float) -> int:
+ """Returns random bright pastels."""
+ light = random.uniform(0.7, 0.85)
+ saturation = 1
+
+ rgb = colorsys.hls_to_rgb(hue, light, saturation)
+ hex_rgb = ""
+
+ for part in rgb:
+ value = int(part * 0xFF)
+ hex_rgb += f"{value:02x}"
+
+ return int(hex_rgb, 16)
+
+ @staticmethod
+ def _generate_card(buffer: BytesIO, content: dict) -> BytesIO:
+ """
+ Generate a card from snake information.
+
+ Written by juan and Someone during the first code jam.
+ """
+ snake = Image.open(buffer)
+
+ # Get the size of the snake icon, configure the height of the image box (yes, it changes)
+ icon_width = 347 # Hardcoded, not much i can do about that
+ icon_height = int((icon_width / snake.width) * snake.height)
+ frame_copies = icon_height // CARD['frame'].height + 1
+ snake.thumbnail((icon_width, icon_height))
+
+ # Get the dimensions of the final image
+ main_height = icon_height + CARD['top'].height + CARD['bottom'].height
+ main_width = CARD['frame'].width
+
+ # Start creating the foreground
+ foreground = Image.new("RGBA", (main_width, main_height), (0, 0, 0, 0))
+ foreground.paste(CARD['top'], (0, 0))
+
+ # Generate the frame borders to the correct height
+ for offset in range(frame_copies):
+ position = (0, CARD['top'].height + offset * CARD['frame'].height)
+ foreground.paste(CARD['frame'], position)
+
+ # Add the image and bottom part of the image
+ foreground.paste(snake, (36, CARD['top'].height)) # Also hardcoded :(
+ foreground.paste(CARD['bottom'], (0, CARD['top'].height + icon_height))
+
+ # Setup the background
+ back = random.choice(CARD['backs'])
+ back_copies = main_height // back.height + 1
+ full_image = Image.new("RGBA", (main_width, main_height), (0, 0, 0, 0))
+
+ # Generate the tiled background
+ for offset in range(back_copies):
+ full_image.paste(back, (16, 16 + offset * back.height))
+
+ # Place the foreground onto the final image
+ full_image.paste(foreground, (0, 0), foreground)
+
+ # Get the first two sentences of the info
+ description = '.'.join(content['info'].split(".")[:2]) + '.'
+
+ # Setup positioning variables
+ margin = 36
+ offset = CARD['top'].height + icon_height + margin
+
+ # Create blank rectangle image which will be behind the text
+ rectangle = Image.new(
+ "RGBA",
+ (main_width, main_height),
+ (0, 0, 0, 0)
+ )
+
+ # Draw a semi-transparent rectangle on it
+ rect = ImageDraw.Draw(rectangle)
+ rect.rectangle(
+ (margin, offset, main_width - margin, main_height - margin),
+ fill=(63, 63, 63, 128)
+ )
+
+ # Paste it onto the final image
+ full_image.paste(rectangle, (0, 0), mask=rectangle)
+
+ # Draw the text onto the final image
+ draw = ImageDraw.Draw(full_image)
+ for line in textwrap.wrap(description, 36):
+ draw.text([margin + 4, offset], line, font=CARD['font'])
+ offset += CARD['font'].getsize(line)[1]
+
+ # Get the image contents as a BufferIO object
+ buffer = BytesIO()
+ full_image.save(buffer, 'PNG')
+ buffer.seek(0)
+
+ return buffer
+
+ @staticmethod
+ def _snakify(message: str) -> str:
+ """Sssnakifffiesss a sstring."""
+ # Replace fricatives with exaggerated snake fricatives.
+ simple_fricatives = [
+ "f", "s", "z", "h",
+ "F", "S", "Z", "H",
+ ]
+ complex_fricatives = [
+ "th", "sh", "Th", "Sh"
+ ]
+
+ for letter in simple_fricatives:
+ if letter.islower():
+ message = message.replace(letter, letter * random.randint(2, 4))
+ else:
+ message = message.replace(letter, (letter * random.randint(2, 4)).title())
+
+ for fricative in complex_fricatives:
+ message = message.replace(fricative, fricative[0] + fricative[1] * random.randint(2, 4))
+
+ return message
+
+ async def _fetch(self, session: aiohttp.ClientSession, url: str, params: dict = None) -> dict:
+ """Asynchronous web request helper method."""
+ if params is None:
+ params = {}
+
+ async with async_timeout.timeout(10):
+ async with session.get(url, params=params) as response:
+ return await response.json()
+
+ def _get_random_long_message(self, messages: List[str], retries: int = 10) -> str:
+ """
+ Fetch a message that's at least 3 words long, if possible to do so in retries attempts.
+
+ Else, just return whatever the last message is.
+ """
+ long_message = random.choice(messages)
+ if len(long_message.split()) < 3 and retries > 0:
+ return self._get_random_long_message(
+ messages,
+ retries=retries - 1
+ )
+
+ return long_message
+
+ async def _get_snek(self, name: str) -> Dict[str, Any]:
+ """
+ Fetches all the data from a wikipedia article about a snake.
+
+ Builds a dict that the .get() method can use.
+
+ Created by Ava and eivl.
+ """
+ snake_info = {}
+
+ async with aiohttp.ClientSession() as session:
+ params = {
+ 'format': 'json',
+ 'action': 'query',
+ 'list': 'search',
+ 'srsearch': name,
+ 'utf8': '',
+ 'srlimit': '1',
+ }
+
+ json = await self._fetch(session, URL, params=params)
+
+ # Wikipedia does have a error page
+ try:
+ pageid = json["query"]["search"][0]["pageid"]
+ except KeyError:
+ # Wikipedia error page ID(?)
+ pageid = 41118
+ except IndexError:
+ return None
+
+ params = {
+ 'format': 'json',
+ 'action': 'query',
+ 'prop': 'extracts|images|info',
+ 'exlimit': 'max',
+ 'explaintext': '',
+ 'inprop': 'url',
+ 'pageids': pageid
+ }
+
+ json = await self._fetch(session, URL, params=params)
+
+ # Constructing dict - handle exceptions later
+ try:
+ snake_info["title"] = json["query"]["pages"][f"{pageid}"]["title"]
+ snake_info["extract"] = json["query"]["pages"][f"{pageid}"]["extract"]
+ snake_info["images"] = json["query"]["pages"][f"{pageid}"]["images"]
+ snake_info["fullurl"] = json["query"]["pages"][f"{pageid}"]["fullurl"]
+ snake_info["pageid"] = json["query"]["pages"][f"{pageid}"]["pageid"]
+ except KeyError:
+ snake_info["error"] = True
+
+ if snake_info["images"]:
+ i_url = 'https://commons.wikimedia.org/wiki/Special:FilePath/'
+ image_list = []
+ map_list = []
+ thumb_list = []
+
+ # Wikipedia has arbitrary images that are not snakes
+ banned = [
+ 'Commons-logo.svg',
+ 'Red%20Pencil%20Icon.png',
+ 'distribution',
+ 'The%20Death%20of%20Cleopatra%20arthur.jpg',
+ 'Head%20of%20holotype',
+ 'locator',
+ 'Woma.png',
+ '-map.',
+ '.svg',
+ 'ange.',
+ 'Adder%20(PSF).png'
+ ]
+
+ for image in snake_info["images"]:
+ # Images come in the format of `File:filename.extension`
+ file, sep, filename = image["title"].partition(':')
+ filename = filename.replace(" ", "%20") # Wikipedia returns good data!
+
+ if not filename.startswith('Map'):
+ if any(ban in filename for ban in banned):
+ pass
+ else:
+ image_list.append(f"{i_url}{filename}")
+ thumb_list.append(f"{i_url}{filename}?width=100")
+ else:
+ map_list.append(f"{i_url}{filename}")
+
+ snake_info["image_list"] = image_list
+ snake_info["map_list"] = map_list
+ snake_info["thumb_list"] = thumb_list
+ snake_info["name"] = name
+
+ match = self.wiki_brief.match(snake_info['extract'])
+ info = match.group(1) if match else None
+
+ if info:
+ info = info.replace("\n", "\n\n") # Give us some proper paragraphs.
+
+ snake_info["info"] = info
+
+ return snake_info
+
+ async def _get_snake_name(self) -> Dict[str, str]:
+ """Gets a random snake name."""
+ return random.choice(self.snake_names)
+
+ async def _validate_answer(self, ctx: Context, message: Message, answer: str, options: list) -> None:
+ """Validate the answer using a reaction event loop."""
+ def predicate(reaction: Reaction, user: Member) -> bool:
+ """Test if the the answer is valid and can be evaluated."""
+ return (
+ reaction.message.id == message.id # The reaction is attached to the question we asked.
+ and user == ctx.author # It's the user who triggered the quiz.
+ and str(reaction.emoji) in ANSWERS_EMOJI.values() # The reaction is one of the options.
+ )
+
+ for emoji in ANSWERS_EMOJI.values():
+ await message.add_reaction(emoji)
+
+ # Validate the answer
+ try:
+ reaction, user = await ctx.bot.wait_for("reaction_add", timeout=45.0, check=predicate)
+ except asyncio.TimeoutError:
+ await ctx.channel.send(f"You took too long. The correct answer was **{options[answer]}**.")
+ await message.clear_reactions()
+ return
+
+ if str(reaction.emoji) == ANSWERS_EMOJI[answer]:
+ await ctx.send(f"{random.choice(CORRECT_GUESS)} The correct answer was **{options[answer]}**.")
+ else:
+ await ctx.send(
+ f"{random.choice(INCORRECT_GUESS)} The correct answer was **{options[answer]}**."
+ )
+
+ await message.clear_reactions()
+ # endregion
+
+ # region: Commands
+ @group(name='snakes', aliases=('snake',), invoke_without_command=True)
+ async def snakes_group(self, ctx: Context) -> None:
+ """Commands from our first code jam."""
+ await ctx.send_help(ctx.command)
+
+ @bot_has_permissions(manage_messages=True)
+ @snakes_group.command(name='antidote')
+ @locked()
+ async def antidote_command(self, ctx: Context) -> None:
+ """
+ Antidote! Can you create the antivenom before the patient dies?
+
+ Rules: You have 4 ingredients for each antidote, you only have 10 attempts
+ Once you synthesize the antidote, you will be presented with 4 markers
+ Tick: This means you have a CORRECT ingredient in the CORRECT position
+ Circle: This means you have a CORRECT ingredient in the WRONG position
+ Cross: This means you have a WRONG ingredient in the WRONG position
+
+ Info: The game automatically ends after 5 minutes inactivity.
+ You should only use each ingredient once.
+
+ This game was created by Lord Bisk and Runew0lf.
+ """
+ def predicate(reaction_: Reaction, user_: Member) -> bool:
+ """Make sure that this reaction is what we want to operate on."""
+ return (
+ all((
+ # Reaction is on this message
+ reaction_.message.id == board_id.id,
+ # Reaction is one of the pagination emotes
+ reaction_.emoji in ANTIDOTE_EMOJI,
+ # Reaction was not made by the Bot
+ user_.id != self.bot.user.id,
+ # Reaction was made by author
+ user_.id == ctx.author.id
+ ))
+ )
+
+ # Initialize variables
+ antidote_tries = 0
+ antidote_guess_count = 0
+ antidote_guess_list = []
+ guess_result = []
+ board = []
+ page_guess_list = []
+ page_result_list = []
+ win = False
+
+ antidote_embed = Embed(color=SNAKE_COLOR, title="Antidote")
+ antidote_embed.set_author(name=ctx.author.name, icon_url=ctx.author.avatar_url)
+
+ # Generate answer
+ antidote_answer = list(ANTIDOTE_EMOJI) # Duplicate list, not reference it
+ random.shuffle(antidote_answer)
+ antidote_answer.pop()
+
+ # Begin initial board building
+ for i in range(0, 10):
+ page_guess_list.append(f"{HOLE_EMOJI} {HOLE_EMOJI} {HOLE_EMOJI} {HOLE_EMOJI}")
+ page_result_list.append(f"{CROSS_EMOJI} {CROSS_EMOJI} {CROSS_EMOJI} {CROSS_EMOJI}")
+ board.append(f"`{i+1:02d}` "
+ f"{page_guess_list[i]} - "
+ f"{page_result_list[i]}")
+ board.append(EMPTY_UNICODE)
+ antidote_embed.add_field(name="10 guesses remaining", value="\n".join(board))
+ board_id = await ctx.send(embed=antidote_embed) # Display board
+
+ # Add our player reactions
+ for emoji in ANTIDOTE_EMOJI:
+ await board_id.add_reaction(emoji)
+
+ # Begin main game loop
+ while not win and antidote_tries < 10:
+ try:
+ reaction, user = await ctx.bot.wait_for(
+ "reaction_add", timeout=300, check=predicate)
+ except asyncio.TimeoutError:
+ log.debug("Antidote timed out waiting for a reaction")
+ break # We're done, no reactions for the last 5 minutes
+
+ if antidote_tries < 10:
+ if antidote_guess_count < 4:
+ if reaction.emoji in ANTIDOTE_EMOJI:
+ antidote_guess_list.append(reaction.emoji)
+ antidote_guess_count += 1
+
+ if antidote_guess_count == 4: # Guesses complete
+ antidote_guess_count = 0
+ page_guess_list[antidote_tries] = " ".join(antidote_guess_list)
+
+ # Now check guess
+ for i in range(0, len(antidote_answer)):
+ if antidote_guess_list[i] == antidote_answer[i]:
+ guess_result.append(TICK_EMOJI)
+ elif antidote_guess_list[i] in antidote_answer:
+ guess_result.append(BLANK_EMOJI)
+ else:
+ guess_result.append(CROSS_EMOJI)
+ guess_result.sort()
+ page_result_list[antidote_tries] = " ".join(guess_result)
+
+ # Rebuild the board
+ board = []
+ for i in range(0, 10):
+ board.append(f"`{i+1:02d}` "
+ f"{page_guess_list[i]} - "
+ f"{page_result_list[i]}")
+ board.append(EMPTY_UNICODE)
+
+ # Remove Reactions
+ for emoji in antidote_guess_list:
+ await board_id.remove_reaction(emoji, user)
+
+ if antidote_guess_list == antidote_answer:
+ win = True
+
+ antidote_tries += 1
+ guess_result = []
+ antidote_guess_list = []
+
+ antidote_embed.clear_fields()
+ antidote_embed.add_field(name=f"{10 - antidote_tries} "
+ f"guesses remaining",
+ value="\n".join(board))
+ # Redisplay the board
+ await board_id.edit(embed=antidote_embed)
+
+ # Winning / Ending Screen
+ if win is True:
+ antidote_embed = Embed(color=SNAKE_COLOR, title="Antidote")
+ antidote_embed.set_author(name=ctx.author.name, icon_url=ctx.author.avatar_url)
+ antidote_embed.set_image(url="https://i.makeagif.com/media/7-12-2015/Cj1pts.gif")
+ antidote_embed.add_field(name=f"You have created the snake antidote!",
+ value=f"The solution was: {' '.join(antidote_answer)}\n"
+ f"You had {10 - antidote_tries} tries remaining.")
+ await board_id.edit(embed=antidote_embed)
+ else:
+ antidote_embed = Embed(color=SNAKE_COLOR, title="Antidote")
+ antidote_embed.set_author(name=ctx.author.name, icon_url=ctx.author.avatar_url)
+ antidote_embed.set_image(url="https://media.giphy.com/media/ceeN6U57leAhi/giphy.gif")
+ antidote_embed.add_field(name=EMPTY_UNICODE,
+ value=f"Sorry you didnt make the antidote in time.\n"
+ f"The formula was {' '.join(antidote_answer)}")
+ await board_id.edit(embed=antidote_embed)
+
+ log.debug("Ending pagination and removing all reactions...")
+ await board_id.clear_reactions()
+
+ @snakes_group.command(name='draw')
+ async def draw_command(self, ctx: Context) -> None:
+ """
+ Draws a random snek using Perlin noise.
+
+ Written by Momo and kel.
+ Modified by juan and lemon.
+ """
+ with ctx.typing():
+
+ # Generate random snake attributes
+ width = random.randint(6, 10)
+ length = random.randint(15, 22)
+ random_hue = random.random()
+ snek_color = self._beautiful_pastel(random_hue)
+ text_color = self._beautiful_pastel((random_hue + 0.5) % 1)
+ bg_color = (
+ random.randint(32, 50),
+ random.randint(32, 50),
+ random.randint(50, 70),
+ )
+
+ # Build and send the snek
+ text = random.choice(self.snake_idioms)["idiom"]
+ factory = utils.PerlinNoiseFactory(dimension=1, octaves=2)
+ image_frame = utils.create_snek_frame(
+ factory,
+ snake_width=width,
+ snake_length=length,
+ snake_color=snek_color,
+ text=text,
+ text_color=text_color,
+ bg_color=bg_color
+ )
+ png_bytes = utils.frame_to_png_bytes(image_frame)
+ file = File(png_bytes, filename='snek.png')
+ await ctx.send(file=file)
+
+ @snakes_group.command(name='get')
+ @bot_has_permissions(manage_messages=True)
+ @locked()
+ async def get_command(self, ctx: Context, *, name: Snake = None) -> None:
+ """
+ Fetches information about a snake from Wikipedia.
+
+ Created by Ava and eivl.
+ """
+ with ctx.typing():
+ if name is None:
+ name = await Snake.random()
+
+ if isinstance(name, dict):
+ data = name
+ else:
+ data = await self._get_snek(name)
+
+ if data.get('error'):
+ return await ctx.send('Could not fetch data from Wikipedia.')
+
+ description = data["info"]
+
+ # Shorten the description if needed
+ if len(description) > 1000:
+ description = description[:1000]
+ last_newline = description.rfind("\n")
+ if last_newline > 0:
+ description = description[:last_newline]
+
+ # Strip and add the Wiki link.
+ if "fullurl" in data:
+ description = description.strip("\n")
+ description += f"\n\nRead more on [Wikipedia]({data['fullurl']})"
+
+ # Build and send the embed.
+ embed = Embed(
+ title=data.get("title", data.get('name')),
+ description=description,
+ colour=0x59982F,
+ )
+
+ emoji = 'https://emojipedia-us.s3.amazonaws.com/thumbs/60/google/3/snake_1f40d.png'
+ image = next((url for url in data['image_list']
+ if url.endswith(self.valid_image_extensions)), emoji)
+ embed.set_image(url=image)
+
+ await ctx.send(embed=embed)
+
+ @snakes_group.command(name='guess', aliases=('identify',))
+ @locked()
+ async def guess_command(self, ctx: Context) -> None:
+ """
+ Snake identifying game.
+
+ Made by Ava and eivl.
+ Modified by lemon.
+ """
+ with ctx.typing():
+
+ image = None
+
+ while image is None:
+ snakes = [await Snake.random() for _ in range(4)]
+ snake = random.choice(snakes)
+ answer = "abcd"[snakes.index(snake)]
+
+ data = await self._get_snek(snake)
+
+ image = next((url for url in data['image_list']
+ if url.endswith(self.valid_image_extensions)), None)
+
+ embed = Embed(
+ title='Which of the following is the snake in the image?',
+ description="\n".join(
+ f"{'ABCD'[snakes.index(snake)]}: {snake}" for snake in snakes),
+ colour=SNAKE_COLOR
+ )
+ embed.set_image(url=image)
+
+ guess = await ctx.send(embed=embed)
+ options = {f"{'abcd'[snakes.index(snake)]}": snake for snake in snakes}
+ await self._validate_answer(ctx, guess, answer, options)
+
+ @snakes_group.command(name='hatch')
+ async def hatch_command(self, ctx: Context) -> None:
+ """
+ Hatches your personal snake.
+
+ Written by Momo and kel.
+ """
+ # Pick a random snake to hatch.
+ snake_name = random.choice(list(utils.snakes.keys()))
+ snake_image = utils.snakes[snake_name]
+
+ # Hatch the snake
+ message = await ctx.channel.send(embed=Embed(description="Hatching your snake :snake:..."))
+ await asyncio.sleep(1)
+
+ for stage in utils.stages:
+ hatch_embed = Embed(description=stage)
+ await message.edit(embed=hatch_embed)
+ await asyncio.sleep(1)
+ await asyncio.sleep(1)
+ await message.delete()
+
+ # Build and send the embed.
+ my_snake_embed = Embed(description=":tada: Congrats! You hatched: **{0}**".format(snake_name))
+ my_snake_embed.set_thumbnail(url=snake_image)
+ my_snake_embed.set_footer(
+ text=" Owner: {0}#{1}".format(ctx.message.author.name, ctx.message.author.discriminator)
+ )
+
+ await ctx.channel.send(embed=my_snake_embed)
+
+ @snakes_group.command(name='movie')
+ async def movie_command(self, ctx: Context) -> None:
+ """
+ Gets a random snake-related movie from OMDB.
+
+ Written by Samuel.
+ Modified by gdude.
+ """
+ url = "http://www.omdbapi.com/"
+ page = random.randint(1, 27)
+
+ response = await self.bot.http_session.get(
+ url,
+ params={
+ "s": "snake",
+ "page": page,
+ "type": "movie",
+ "apikey": Tokens.omdb
+ }
+ )
+ data = await response.json()
+ movie = random.choice(data["Search"])["imdbID"]
+
+ response = await self.bot.http_session.get(
+ url,
+ params={
+ "i": movie,
+ "apikey": Tokens.omdb
+ }
+ )
+ data = await response.json()
+
+ embed = Embed(
+ title=data["Title"],
+ color=SNAKE_COLOR
+ )
+
+ del data["Response"], data["imdbID"], data["Title"]
+
+ for key, value in data.items():
+ if not value or value == "N/A" or key in ("Response", "imdbID", "Title", "Type"):
+ continue
+
+ if key == "Ratings": # [{'Source': 'Internet Movie Database', 'Value': '7.6/10'}]
+ rating = random.choice(value)
+
+ if rating["Source"] != "Internet Movie Database":
+ embed.add_field(name=f"Rating: {rating['Source']}", value=rating["Value"])
+
+ continue
+
+ if key == "Poster":
+ embed.set_image(url=value)
+ continue
+
+ elif key == "imdbRating":
+ key = "IMDB Rating"
+
+ elif key == "imdbVotes":
+ key = "IMDB Votes"
+
+ embed.add_field(name=key, value=value, inline=True)
+
+ embed.set_footer(text="Data provided by the OMDB API")
+
+ await ctx.channel.send(
+ embed=embed
+ )
+
+ @snakes_group.command(name='quiz')
+ @locked()
+ async def quiz_command(self, ctx: Context) -> None:
+ """
+ Asks a snake-related question in the chat and validates the user's guess.
+
+ This was created by Mushy and Cardium,
+ and modified by Urthas and lemon.
+ """
+ # Prepare a question.
+ question = random.choice(self.snake_quizzes)
+ answer = question["answerkey"]
+ options = {key: question["options"][key] for key in ANSWERS_EMOJI.keys()}
+
+ # Build and send the embed.
+ embed = Embed(
+ color=SNAKE_COLOR,
+ title=question["question"],
+ description="\n".join(
+ [f"**{key.upper()}**: {answer}" for key, answer in options.items()]
+ )
+ )
+
+ quiz = await ctx.channel.send("", embed=embed)
+ await self._validate_answer(ctx, quiz, answer, options)
+
+ @snakes_group.command(name='name', aliases=('name_gen',))
+ async def name_command(self, ctx: Context, *, name: str = None) -> None:
+ """
+ Snakifies a username.
+
+ Slices the users name at the last vowel (or second last if the name
+ ends with a vowel), and then combines it with a random snake name,
+ which is sliced at the first vowel (or second if the name starts with
+ a vowel).
+
+ If the name contains no vowels, it just appends the snakename
+ to the end of the name.
+
+ Examples:
+ lemon + anaconda = lemoconda
+ krzsn + anaconda = krzsnconda
+ gdude + anaconda = gduconda
+ aperture + anaconda = apertuconda
+ lucy + python = luthon
+ joseph + taipan = joseipan
+
+ This was written by Iceman, and modified for inclusion into the bot by lemon.
+ """
+ snake_name = await self._get_snake_name()
+ snake_name = snake_name['name']
+ snake_prefix = ""
+
+ # Set aside every word in the snake name except the last.
+ if " " in snake_name:
+ snake_prefix = " ".join(snake_name.split()[:-1])
+ snake_name = snake_name.split()[-1]
+
+ # If no name is provided, use whoever called the command.
+ if name:
+ user_name = name
+ else:
+ user_name = ctx.author.display_name
+
+ # Get the index of the vowel to slice the username at
+ user_slice_index = len(user_name)
+ for index, char in enumerate(reversed(user_name)):
+ if index == 0:
+ continue
+ if char.lower() in "aeiouy":
+ user_slice_index -= index
+ break
+
+ # Now, get the index of the vowel to slice the snake_name at
+ snake_slice_index = 0
+ for index, char in enumerate(snake_name):
+ if index == 0:
+ continue
+ if char.lower() in "aeiouy":
+ snake_slice_index = index + 1
+ break
+
+ # Combine!
+ snake_name = snake_name[snake_slice_index:]
+ user_name = user_name[:user_slice_index]
+ result = f"{snake_prefix} {user_name}{snake_name}"
+ result = string.capwords(result)
+
+ # Embed and send
+ embed = Embed(
+ title="Snake name",
+ description=f"Your snake-name is **{result}**",
+ color=SNAKE_COLOR
+ )
+
+ return await ctx.send(embed=embed)
+
+ @snakes_group.command(name='sal')
+ @locked()
+ async def sal_command(self, ctx: Context) -> None:
+ """
+ Play a game of Snakes and Ladders.
+
+ Written by Momo and kel.
+ Modified by lemon.
+ """
+ # Check if there is already a game in this channel
+ if ctx.channel in self.active_sal:
+ await ctx.send(f"{ctx.author.mention} A game is already in progress in this channel.")
+ return
+
+ game = utils.SnakeAndLaddersGame(snakes=self, context=ctx)
+ self.active_sal[ctx.channel] = game
+
+ await game.open_game()
+
+ @snakes_group.command(name='about')
+ async def about_command(self, ctx: Context) -> None:
+ """Show an embed with information about the event, its participants, and its winners."""
+ contributors = [
+ "<@!245270749919576066>",
+ "<@!396290259907903491>",
+ "<@!172395097705414656>",
+ "<@!361708843425726474>",
+ "<@!300302216663793665>",
+ "<@!210248051430916096>",
+ "<@!174588005745557505>",
+ "<@!87793066227822592>",
+ "<@!211619754039967744>",
+ "<@!97347867923976192>",
+ "<@!136081839474343936>",
+ "<@!263560579770220554>",
+ "<@!104749643715387392>",
+ "<@!303940835005825024>",
+ ]
+
+ embed = Embed(
+ title="About the snake cog",
+ description=(
+ "The features in this cog were created by members of the community "
+ "during our first ever [code jam event](https://gitlab.com/discord-python/code-jams/code-jam-1). \n\n"
+ "The event saw over 50 participants, who competed to write a discord bot cog with a snake theme over "
+ "48 hours. The staff then selected the best features from all the best teams, and made modifications "
+ "to ensure they would all work together before integrating them into the community bot.\n\n"
+ "It was a tight race, but in the end, <@!104749643715387392> and <@!303940835005825024> "
+ "walked away as grand champions. Make sure you check out `!snakes sal`, `!snakes draw` "
+ "and `!snakes hatch` to see what they came up with."
+ )
+ )
+
+ embed.add_field(
+ name="Contributors",
+ value=(
+ ", ".join(contributors)
+ )
+ )
+
+ await ctx.channel.send(embed=embed)
+
+ @snakes_group.command(name='card')
+ async def card_command(self, ctx: Context, *, name: Snake = None) -> None:
+ """
+ Create an interesting little card from a snake.
+
+ Created by juan and Someone during the first code jam.
+ """
+ # Get the snake data we need
+ if not name:
+ name_obj = await self._get_snake_name()
+ name = name_obj['scientific']
+ content = await self._get_snek(name)
+
+ elif isinstance(name, dict):
+ content = name
+
+ else:
+ content = await self._get_snek(name)
+
+ # Make the card
+ async with ctx.typing():
+
+ stream = BytesIO()
+ async with async_timeout.timeout(10):
+ async with self.bot.http_session.get(content['image_list'][0]) as response:
+ stream.write(await response.read())
+
+ stream.seek(0)
+
+ func = partial(self._generate_card, stream, content)
+ final_buffer = await self.bot.loop.run_in_executor(None, func)
+
+ # Send it!
+ await ctx.send(
+ f"A wild {content['name'].title()} appears!",
+ file=File(final_buffer, filename=content['name'].replace(" ", "") + ".png")
+ )
+
+ @snakes_group.command(name='fact')
+ async def fact_command(self, ctx: Context) -> None:
+ """
+ Gets a snake-related fact.
+
+ Written by Andrew and Prithaj.
+ Modified by lemon.
+ """
+ question = random.choice(self.snake_facts)["fact"]
+ embed = Embed(
+ title="Snake fact",
+ color=SNAKE_COLOR,
+ description=question
+ )
+ await ctx.channel.send(embed=embed)
+
+ @snakes_group.command(name='snakify')
+ async def snakify_command(self, ctx: Context, *, message: str = None) -> None:
+ """
+ How would I talk if I were a snake?
+
+ If `message` is passed, the bot will snakify the message.
+ Otherwise, a random message from the user's history is snakified.
+
+ Written by Momo and kel.
+ Modified by lemon.
+ """
+ with ctx.typing():
+ embed = Embed()
+ user = ctx.message.author
+
+ if not message:
+
+ # Get a random message from the users history
+ messages = []
+ async for message in ctx.channel.history(limit=500).filter(
+ lambda msg: msg.author == ctx.message.author # Message was sent by author.
+ ):
+ messages.append(message.content)
+
+ message = self._get_random_long_message(messages)
+
+ # Set the avatar
+ if user.avatar is not None:
+ avatar = f"https://cdn.discordapp.com/avatars/{user.id}/{user.avatar}"
+ else:
+ avatar = ctx.author.default_avatar_url
+
+ # Build and send the embed
+ embed.set_author(
+ name=f"{user.name}#{user.discriminator}",
+ icon_url=avatar,
+ )
+ embed.description = f"*{self._snakify(message)}*"
+
+ await ctx.channel.send(embed=embed)
+
+ @snakes_group.command(name='video', aliases=('get_video',))
+ async def video_command(self, ctx: Context, *, search: str = None) -> None:
+ """
+ Gets a YouTube video about snakes.
+
+ If `search` is given, a snake with that name will be searched on Youtube.
+
+ Written by Andrew and Prithaj.
+ """
+ # Are we searching for anything specific?
+ if search:
+ query = search + ' snake'
+ else:
+ snake = await self._get_snake_name()
+ query = snake['name']
+
+ # Build the URL and make the request
+ url = f'https://www.googleapis.com/youtube/v3/search'
+ response = await self.bot.http_session.get(
+ url,
+ params={
+ "part": "snippet",
+ "q": urllib.parse.quote(query),
+ "type": "video",
+ "key": Tokens.youtube
+ }
+ )
+ response = await response.json()
+ data = response['items']
+
+ # Send the user a video
+ if len(data) > 0:
+ num = random.randint(0, len(data) - 1)
+ youtube_base_url = 'https://www.youtube.com/watch?v='
+ await ctx.channel.send(
+ content=f"{youtube_base_url}{data[num]['id']['videoId']}"
+ )
+ else:
+ log.warning(f"YouTube API error. Full response looks like {response}")
+
+ @snakes_group.command(name='zen')
+ async def zen_command(self, ctx: Context) -> None:
+ """
+ Gets a random quote from the Zen of Python, except as if spoken by a snake.
+
+ Written by Prithaj and Andrew.
+ Modified by lemon.
+ """
+ embed = Embed(
+ title="Zzzen of Pythhon",
+ color=SNAKE_COLOR
+ )
+
+ # Get the zen quote and snakify it
+ zen_quote = random.choice(ZEN.splitlines())
+ zen_quote = self._snakify(zen_quote)
+
+ # Embed and send
+ embed.description = zen_quote
+ await ctx.channel.send(
+ embed=embed
+ )
+ # endregion
+
+ # region: Error handlers
+ @get_command.error
+ @card_command.error
+ @video_command.error
+ async def command_error(self, ctx: Context, error: CommandError) -> None:
+ """Local error handler for the Snake Cog."""
+ embed = Embed()
+ embed.colour = Colour.red()
+
+ if isinstance(error, BadArgument):
+ embed.description = str(error)
+ embed.title = random.choice(ERROR_REPLIES)
+
+ elif isinstance(error, OSError):
+ log.error(f"snake_card encountered an OSError: {error} ({error.original})")
+ embed.description = "Could not generate the snake card! Please try again."
+ embed.title = random.choice(ERROR_REPLIES)
+
+ else:
+ log.error(f"Unhandled tag command error: {error} ({error.original})")
+ return
+
+ await ctx.send(embed=embed)
+ # endregion
diff --git a/bot/exts/evergreen/snakes/utils.py b/bot/exts/evergreen/snakes/utils.py
new file mode 100644
index 00000000..7d6caf04
--- /dev/null
+++ b/bot/exts/evergreen/snakes/utils.py
@@ -0,0 +1,716 @@
+import asyncio
+import io
+import json
+import logging
+import math
+import random
+from itertools import product
+from pathlib import Path
+from typing import List, Tuple
+
+from PIL import Image
+from PIL.ImageDraw import ImageDraw
+from discord import File, Member, Reaction
+from discord.ext.commands import Cog, Context
+
+from bot.constants import Roles
+
+SNAKE_RESOURCES = Path("bot/resources/snakes").absolute()
+
+h1 = r'''```
+ ----
+ ------
+ /--------\
+ |--------|
+ |--------|
+ \------/
+ ----```'''
+h2 = r'''```
+ ----
+ ------
+ /---\-/--\
+ |-----\--|
+ |--------|
+ \------/
+ ----```'''
+h3 = r'''```
+ ----
+ ------
+ /---\-/--\
+ |-----\--|
+ |-----/--|
+ \----\-/
+ ----```'''
+h4 = r'''```
+ -----
+ ----- \
+ /--| /---\
+ |--\ -\---|
+ |--\--/-- /
+ \------- /
+ ------```'''
+stages = [h1, h2, h3, h4]
+snakes = {
+ "Baby Python": "https://i.imgur.com/SYOcmSa.png",
+ "Baby Rattle Snake": "https://i.imgur.com/i5jYA8f.png",
+ "Baby Dragon Snake": "https://i.imgur.com/SuMKM4m.png",
+ "Baby Garden Snake": "https://i.imgur.com/5vYx3ah.png",
+ "Baby Cobra": "https://i.imgur.com/jk14ryt.png"
+}
+
+BOARD_TILE_SIZE = 56 # the size of each board tile
+BOARD_PLAYER_SIZE = 20 # the size of each player icon
+BOARD_MARGIN = (10, 0) # margins, in pixels (for player icons)
+# The size of the image to download
+# Should a power of 2 and higher than BOARD_PLAYER_SIZE
+PLAYER_ICON_IMAGE_SIZE = 32
+MAX_PLAYERS = 4 # depends on the board size/quality, 4 is for the default board
+
+# board definition (from, to)
+BOARD = {
+ # ladders
+ 2: 38,
+ 7: 14,
+ 8: 31,
+ 15: 26,
+ 21: 42,
+ 28: 84,
+ 36: 44,
+ 51: 67,
+ 71: 91,
+ 78: 98,
+ 87: 94,
+
+ # snakes
+ 99: 80,
+ 95: 75,
+ 92: 88,
+ 89: 68,
+ 74: 53,
+ 64: 60,
+ 62: 19,
+ 49: 11,
+ 46: 25,
+ 16: 6
+}
+
+DEFAULT_SNAKE_COLOR: int = 0x15c7ea
+DEFAULT_BACKGROUND_COLOR: int = 0
+DEFAULT_IMAGE_DIMENSIONS: Tuple[int] = (200, 200)
+DEFAULT_SNAKE_LENGTH: int = 22
+DEFAULT_SNAKE_WIDTH: int = 8
+DEFAULT_SEGMENT_LENGTH_RANGE: Tuple[int] = (7, 10)
+DEFAULT_IMAGE_MARGINS: Tuple[int] = (50, 50)
+DEFAULT_TEXT: str = "snek\nit\nup"
+DEFAULT_TEXT_POSITION: Tuple[int] = (
+ 10,
+ 10
+)
+DEFAULT_TEXT_COLOR: int = 0xf2ea15
+X = 0
+Y = 1
+ANGLE_RANGE = math.pi * 2
+
+
+def get_resource(file: str) -> List[dict]:
+ """Load Snake resources JSON."""
+ with (SNAKE_RESOURCES / f"{file}.json").open(encoding="utf-8") as snakefile:
+ return json.load(snakefile)
+
+
+def smoothstep(t: float) -> float:
+ """Smooth curve with a zero derivative at 0 and 1, making it useful for interpolating."""
+ return t * t * (3. - 2. * t)
+
+
+def lerp(t: float, a: float, b: float) -> float:
+ """Linear interpolation between a and b, given a fraction t."""
+ return a + t * (b - a)
+
+
+class PerlinNoiseFactory(object):
+ """
+ Callable that produces Perlin noise for an arbitrary point in an arbitrary number of dimensions.
+
+ The underlying grid is aligned with the integers.
+
+ There is no limit to the coordinates used; new gradients are generated on the fly as necessary.
+
+ Taken from: https://gist.github.com/eevee/26f547457522755cb1fb8739d0ea89a1
+ Licensed under ISC
+ """
+
+ def __init__(self, dimension: int, octaves: int = 1, tile: Tuple[int] = (), unbias: bool = False):
+ """
+ Create a new Perlin noise factory in the given number of dimensions.
+
+ dimension should be an integer and at least 1.
+
+ More octaves create a foggier and more-detailed noise pattern. More than 4 octaves is rather excessive.
+
+ ``tile`` can be used to make a seamlessly tiling pattern.
+ For example:
+ pnf = PerlinNoiseFactory(2, tile=(0, 3))
+
+ This will produce noise that tiles every 3 units vertically, but never tiles horizontally.
+
+ If ``unbias`` is True, the smoothstep function will be applied to the output before returning
+ it, to counteract some of Perlin noise's significant bias towards the center of its output range.
+ """
+ self.dimension = dimension
+ self.octaves = octaves
+ self.tile = tile + (0,) * dimension
+ self.unbias = unbias
+
+ # For n dimensions, the range of Perlin noise is ±sqrt(n)/2; multiply
+ # by this to scale to ±1
+ self.scale_factor = 2 * dimension ** -0.5
+
+ self.gradient = {}
+
+ def _generate_gradient(self) -> Tuple[float, ...]:
+ """
+ Generate a random unit vector at each grid point.
+
+ This is the "gradient" vector, in that the grid tile slopes towards it
+ """
+ # 1 dimension is special, since the only unit vector is trivial;
+ # instead, use a slope between -1 and 1
+ if self.dimension == 1:
+ return (random.uniform(-1, 1),)
+
+ # Generate a random point on the surface of the unit n-hypersphere;
+ # this is the same as a random unit vector in n dimensions. Thanks
+ # to: http://mathworld.wolfram.com/SpherePointPicking.html
+ # Pick n normal random variables with stddev 1
+ random_point = [random.gauss(0, 1) for _ in range(self.dimension)]
+ # Then scale the result to a unit vector
+ scale = sum(n * n for n in random_point) ** -0.5
+ return tuple(coord * scale for coord in random_point)
+
+ def get_plain_noise(self, *point) -> float:
+ """Get plain noise for a single point, without taking into account either octaves or tiling."""
+ if len(point) != self.dimension:
+ raise ValueError("Expected {0} values, got {1}".format(
+ self.dimension, len(point)))
+
+ # Build a list of the (min, max) bounds in each dimension
+ grid_coords = []
+ for coord in point:
+ min_coord = math.floor(coord)
+ max_coord = min_coord + 1
+ grid_coords.append((min_coord, max_coord))
+
+ # Compute the dot product of each gradient vector and the point's
+ # distance from the corresponding grid point. This gives you each
+ # gradient's "influence" on the chosen point.
+ dots = []
+ for grid_point in product(*grid_coords):
+ if grid_point not in self.gradient:
+ self.gradient[grid_point] = self._generate_gradient()
+ gradient = self.gradient[grid_point]
+
+ dot = 0
+ for i in range(self.dimension):
+ dot += gradient[i] * (point[i] - grid_point[i])
+ dots.append(dot)
+
+ # Interpolate all those dot products together. The interpolation is
+ # done with smoothstep to smooth out the slope as you pass from one
+ # grid cell into the next.
+ # Due to the way product() works, dot products are ordered such that
+ # the last dimension alternates: (..., min), (..., max), etc. So we
+ # can interpolate adjacent pairs to "collapse" that last dimension. Then
+ # the results will alternate in their second-to-last dimension, and so
+ # forth, until we only have a single value left.
+ dim = self.dimension
+ while len(dots) > 1:
+ dim -= 1
+ s = smoothstep(point[dim] - grid_coords[dim][0])
+
+ next_dots = []
+ while dots:
+ next_dots.append(lerp(s, dots.pop(0), dots.pop(0)))
+
+ dots = next_dots
+
+ return dots[0] * self.scale_factor
+
+ def __call__(self, *point) -> float:
+ """
+ Get the value of this Perlin noise function at the given point.
+
+ The number of values given should match the number of dimensions.
+ """
+ ret = 0
+ for o in range(self.octaves):
+ o2 = 1 << o
+ new_point = []
+ for i, coord in enumerate(point):
+ coord *= o2
+ if self.tile[i]:
+ coord %= self.tile[i] * o2
+ new_point.append(coord)
+ ret += self.get_plain_noise(*new_point) / o2
+
+ # Need to scale n back down since adding all those extra octaves has
+ # probably expanded it beyond ±1
+ # 1 octave: ±1
+ # 2 octaves: ±1½
+ # 3 octaves: ±1¾
+ ret /= 2 - 2 ** (1 - self.octaves)
+
+ if self.unbias:
+ # The output of the plain Perlin noise algorithm has a fairly
+ # strong bias towards the center due to the central limit theorem
+ # -- in fact the top and bottom 1/8 virtually never happen. That's
+ # a quarter of our entire output range! If only we had a function
+ # in [0..1] that could introduce a bias towards the endpoints...
+ r = (ret + 1) / 2
+ # Doing it this many times is a completely made-up heuristic.
+ for _ in range(int(self.octaves / 2 + 0.5)):
+ r = smoothstep(r)
+ ret = r * 2 - 1
+
+ return ret
+
+
+def create_snek_frame(
+ perlin_factory: PerlinNoiseFactory, perlin_lookup_vertical_shift: float = 0,
+ image_dimensions: Tuple[int] = DEFAULT_IMAGE_DIMENSIONS, image_margins: Tuple[int] = DEFAULT_IMAGE_MARGINS,
+ snake_length: int = DEFAULT_SNAKE_LENGTH,
+ snake_color: int = DEFAULT_SNAKE_COLOR, bg_color: int = DEFAULT_BACKGROUND_COLOR,
+ segment_length_range: Tuple[int] = DEFAULT_SEGMENT_LENGTH_RANGE, snake_width: int = DEFAULT_SNAKE_WIDTH,
+ text: str = DEFAULT_TEXT, text_position: Tuple[int] = DEFAULT_TEXT_POSITION,
+ text_color: Tuple[int] = DEFAULT_TEXT_COLOR
+) -> Image:
+ """
+ Creates a single random snek frame using Perlin noise.
+
+ `perlin_lookup_vertical_shift` represents the Perlin noise shift in the Y-dimension for this frame.
+ If `text` is given, display the given text with the snek.
+ """
+ start_x = random.randint(image_margins[X], image_dimensions[X] - image_margins[X])
+ start_y = random.randint(image_margins[Y], image_dimensions[Y] - image_margins[Y])
+ points = [(start_x, start_y)]
+
+ for index in range(0, snake_length):
+ angle = perlin_factory.get_plain_noise(
+ ((1 / (snake_length + 1)) * (index + 1)) + perlin_lookup_vertical_shift
+ ) * ANGLE_RANGE
+ current_point = points[index]
+ segment_length = random.randint(segment_length_range[0], segment_length_range[1])
+ points.append((
+ current_point[X] + segment_length * math.cos(angle),
+ current_point[Y] + segment_length * math.sin(angle)
+ ))
+
+ # normalize bounds
+ min_dimensions = [start_x, start_y]
+ max_dimensions = [start_x, start_y]
+ for point in points:
+ min_dimensions[X] = min(point[X], min_dimensions[X])
+ min_dimensions[Y] = min(point[Y], min_dimensions[Y])
+ max_dimensions[X] = max(point[X], max_dimensions[X])
+ max_dimensions[Y] = max(point[Y], max_dimensions[Y])
+
+ # shift towards middle
+ dimension_range = (max_dimensions[X] - min_dimensions[X], max_dimensions[Y] - min_dimensions[Y])
+ shift = (
+ image_dimensions[X] / 2 - (dimension_range[X] / 2 + min_dimensions[X]),
+ image_dimensions[Y] / 2 - (dimension_range[Y] / 2 + min_dimensions[Y])
+ )
+
+ image = Image.new(mode='RGB', size=image_dimensions, color=bg_color)
+ draw = ImageDraw(image)
+ for index in range(1, len(points)):
+ point = points[index]
+ previous = points[index - 1]
+ draw.line(
+ (
+ shift[X] + previous[X],
+ shift[Y] + previous[Y],
+ shift[X] + point[X],
+ shift[Y] + point[Y]
+ ),
+ width=snake_width,
+ fill=snake_color
+ )
+ if text is not None:
+ draw.multiline_text(text_position, text, fill=text_color)
+ del draw
+ return image
+
+
+def frame_to_png_bytes(image: Image) -> io.BytesIO:
+ """Convert image to byte stream."""
+ stream = io.BytesIO()
+ image.save(stream, format='PNG')
+ stream.seek(0)
+ return stream
+
+
+log = logging.getLogger(__name__)
+START_EMOJI = "\u2611" # :ballot_box_with_check: - Start the game
+CANCEL_EMOJI = "\u274C" # :x: - Cancel or leave the game
+ROLL_EMOJI = "\U0001F3B2" # :game_die: - Roll the die!
+JOIN_EMOJI = "\U0001F64B" # :raising_hand: - Join the game.
+STARTUP_SCREEN_EMOJI = [
+ JOIN_EMOJI,
+ START_EMOJI,
+ CANCEL_EMOJI
+]
+GAME_SCREEN_EMOJI = [
+ ROLL_EMOJI,
+ CANCEL_EMOJI
+]
+
+
+class SnakeAndLaddersGame:
+ """Snakes and Ladders game Cog."""
+
+ def __init__(self, snakes: Cog, context: Context):
+ self.snakes = snakes
+ self.ctx = context
+ self.channel = self.ctx.channel
+ self.state = 'booting'
+ self.started = False
+ self.author = self.ctx.author
+ self.players = []
+ self.player_tiles = {}
+ self.round_has_rolled = {}
+ self.avatar_images = {}
+ self.board = None
+ self.positions = None
+ self.rolls = []
+
+ async def open_game(self) -> None:
+ """
+ Create a new Snakes and Ladders game.
+
+ Listen for reactions until players have joined, and the game has been started.
+ """
+ def startup_event_check(reaction_: Reaction, user_: Member) -> bool:
+ """Make sure that this reaction is what we want to operate on."""
+ return (
+ all((
+ reaction_.message.id == startup.id, # Reaction is on startup message
+ reaction_.emoji in STARTUP_SCREEN_EMOJI, # Reaction is one of the startup emotes
+ user_.id != self.ctx.bot.user.id, # Reaction was not made by the bot
+ ))
+ )
+
+ # Check to see if the bot can remove reactions
+ if not self.channel.permissions_for(self.ctx.guild.me).manage_messages:
+ log.warning(
+ "Unable to start Snakes and Ladders - "
+ f"Missing manage_messages permissions in {self.channel}"
+ )
+ return
+
+ await self._add_player(self.author)
+ await self.channel.send(
+ "**Snakes and Ladders**: A new game is about to start!",
+ file=File(
+ str(SNAKE_RESOURCES / "snakes_and_ladders" / "banner.jpg"),
+ filename='Snakes and Ladders.jpg'
+ )
+ )
+ startup = await self.channel.send(
+ f"Press {JOIN_EMOJI} to participate, and press "
+ f"{START_EMOJI} to start the game"
+ )
+ for emoji in STARTUP_SCREEN_EMOJI:
+ await startup.add_reaction(emoji)
+
+ self.state = 'waiting'
+
+ while not self.started:
+ try:
+ reaction, user = await self.ctx.bot.wait_for(
+ "reaction_add",
+ timeout=300,
+ check=startup_event_check
+ )
+ if reaction.emoji == JOIN_EMOJI:
+ await self.player_join(user)
+ elif reaction.emoji == CANCEL_EMOJI:
+ if user == self.author or (self._is_moderator(user) and user not in self.players):
+ # Allow game author or non-playing moderation staff to cancel a waiting game
+ await self.cancel_game()
+ return
+ else:
+ await self.player_leave(user)
+ elif reaction.emoji == START_EMOJI:
+ if self.ctx.author == user:
+ self.started = True
+ await self.start_game(user)
+ await startup.delete()
+ break
+
+ await startup.remove_reaction(reaction.emoji, user)
+
+ except asyncio.TimeoutError:
+ log.debug("Snakes and Ladders timed out waiting for a reaction")
+ await self.cancel_game()
+ return # We're done, no reactions for the last 5 minutes
+
+ async def _add_player(self, user: Member) -> None:
+ """Add player to game."""
+ self.players.append(user)
+ self.player_tiles[user.id] = 1
+
+ avatar_bytes = await user.avatar_url_as(format='jpeg', size=PLAYER_ICON_IMAGE_SIZE).read()
+ im = Image.open(io.BytesIO(avatar_bytes)).resize((BOARD_PLAYER_SIZE, BOARD_PLAYER_SIZE))
+ self.avatar_images[user.id] = im
+
+ async def player_join(self, user: Member) -> None:
+ """
+ Handle players joining the game.
+
+ Prevent player joining if they have already joined, if the game is full, or if the game is
+ in a waiting state.
+ """
+ for p in self.players:
+ if user == p:
+ await self.channel.send(user.mention + " You are already in the game.", delete_after=10)
+ return
+ if self.state != 'waiting':
+ await self.channel.send(user.mention + " You cannot join at this time.", delete_after=10)
+ return
+ if len(self.players) is MAX_PLAYERS:
+ await self.channel.send(user.mention + " The game is full!", delete_after=10)
+ return
+
+ await self._add_player(user)
+
+ await self.channel.send(
+ f"**Snakes and Ladders**: {user.mention} has joined the game.\n"
+ f"There are now {str(len(self.players))} players in the game.",
+ delete_after=10
+ )
+
+ async def player_leave(self, user: Member) -> bool:
+ """
+ Handle players leaving the game.
+
+ Leaving is prevented if the user wasn't part of the game.
+
+ If the number of players reaches 0, the game is terminated. In this case, a sentinel boolean
+ is returned True to prevent a game from continuing after it's destroyed.
+ """
+ is_surrendered = False # Sentinel value to assist with stopping a surrendered game
+ for p in self.players:
+ if user == p:
+ self.players.remove(p)
+ self.player_tiles.pop(p.id, None)
+ self.round_has_rolled.pop(p.id, None)
+ await self.channel.send(
+ "**Snakes and Ladders**: " + user.mention + " has left the game.",
+ delete_after=10
+ )
+
+ if self.state != 'waiting' and len(self.players) == 0:
+ await self.channel.send("**Snakes and Ladders**: The game has been surrendered!")
+ is_surrendered = True
+ self._destruct()
+
+ return is_surrendered
+ else:
+ await self.channel.send(user.mention + " You are not in the match.", delete_after=10)
+ return is_surrendered
+
+ async def cancel_game(self) -> None:
+ """Cancel the running game."""
+ await self.channel.send("**Snakes and Ladders**: Game has been canceled.")
+ self._destruct()
+
+ async def start_game(self, user: Member) -> None:
+ """
+ Allow the game author to begin the game.
+
+ The game cannot be started if the game is in a waiting state.
+ """
+ if not user == self.author:
+ await self.channel.send(user.mention + " Only the author of the game can start it.", delete_after=10)
+ return
+
+ if not self.state == 'waiting':
+ await self.channel.send(user.mention + " The game cannot be started at this time.", delete_after=10)
+ return
+
+ self.state = 'starting'
+ player_list = ', '.join(user.mention for user in self.players)
+ await self.channel.send("**Snakes and Ladders**: The game is starting!\nPlayers: " + player_list)
+ await self.start_round()
+
+ async def start_round(self) -> None:
+ """Begin the round."""
+ def game_event_check(reaction_: Reaction, user_: Member) -> bool:
+ """Make sure that this reaction is what we want to operate on."""
+ return (
+ all((
+ reaction_.message.id == self.positions.id, # Reaction is on positions message
+ reaction_.emoji in GAME_SCREEN_EMOJI, # Reaction is one of the game emotes
+ user_.id != self.ctx.bot.user.id, # Reaction was not made by the bot
+ ))
+ )
+
+ self.state = 'roll'
+ for user in self.players:
+ self.round_has_rolled[user.id] = False
+ board_img = Image.open(str(SNAKE_RESOURCES / "snakes_and_ladders" / "board.jpg"))
+ player_row_size = math.ceil(MAX_PLAYERS / 2)
+
+ for i, player in enumerate(self.players):
+ tile = self.player_tiles[player.id]
+ tile_coordinates = self._board_coordinate_from_index(tile)
+ x_offset = BOARD_MARGIN[0] + tile_coordinates[0] * BOARD_TILE_SIZE
+ y_offset = \
+ BOARD_MARGIN[1] + (
+ (10 * BOARD_TILE_SIZE) - (9 - tile_coordinates[1]) * BOARD_TILE_SIZE - BOARD_PLAYER_SIZE)
+ x_offset += BOARD_PLAYER_SIZE * (i % player_row_size)
+ y_offset -= BOARD_PLAYER_SIZE * math.floor(i / player_row_size)
+ board_img.paste(self.avatar_images[player.id],
+ box=(x_offset, y_offset))
+
+ board_file = File(frame_to_png_bytes(board_img), filename='Board.jpg')
+ player_list = '\n'.join((user.mention + ": Tile " + str(self.player_tiles[user.id])) for user in self.players)
+
+ # Store and send new messages
+ temp_board = await self.channel.send(
+ "**Snakes and Ladders**: A new round has started! Current board:",
+ file=board_file
+ )
+ temp_positions = await self.channel.send(
+ f"**Current positions**:\n{player_list}\n\nUse {ROLL_EMOJI} to roll the dice!"
+ )
+
+ # Delete the previous messages
+ if self.board and self.positions:
+ await self.board.delete()
+ await self.positions.delete()
+
+ # remove the roll messages
+ for roll in self.rolls:
+ await roll.delete()
+ self.rolls = []
+
+ # Save new messages
+ self.board = temp_board
+ self.positions = temp_positions
+
+ # Wait for rolls
+ for emoji in GAME_SCREEN_EMOJI:
+ await self.positions.add_reaction(emoji)
+
+ is_surrendered = False
+ while True:
+ try:
+ reaction, user = await self.ctx.bot.wait_for(
+ "reaction_add",
+ timeout=300,
+ check=game_event_check
+ )
+
+ if reaction.emoji == ROLL_EMOJI:
+ await self.player_roll(user)
+ elif reaction.emoji == CANCEL_EMOJI:
+ if self._is_moderator(user) and user not in self.players:
+ # Only allow non-playing moderation staff to cancel a running game
+ await self.cancel_game()
+ return
+ else:
+ is_surrendered = await self.player_leave(user)
+
+ await self.positions.remove_reaction(reaction.emoji, user)
+
+ if self._check_all_rolled():
+ break
+
+ except asyncio.TimeoutError:
+ log.debug("Snakes and Ladders timed out waiting for a reaction")
+ await self.cancel_game()
+ return # We're done, no reactions for the last 5 minutes
+
+ # Round completed
+ # Check to see if the game was surrendered before completing the round, without this
+ # sentinel, the game object would be deleted but the next round still posted into purgatory
+ if not is_surrendered:
+ await self._complete_round()
+
+ async def player_roll(self, user: Member) -> None:
+ """Handle the player's roll."""
+ if user.id not in self.player_tiles:
+ await self.channel.send(user.mention + " You are not in the match.", delete_after=10)
+ return
+ if self.state != 'roll':
+ await self.channel.send(user.mention + " You may not roll at this time.", delete_after=10)
+ return
+ if self.round_has_rolled[user.id]:
+ return
+ roll = random.randint(1, 6)
+ self.rolls.append(await self.channel.send(f"{user.mention} rolled a **{roll}**!"))
+ next_tile = self.player_tiles[user.id] + roll
+
+ # apply snakes and ladders
+ if next_tile in BOARD:
+ target = BOARD[next_tile]
+ if target < next_tile:
+ await self.channel.send(
+ f"{user.mention} slips on a snake and falls back to **{target}**",
+ delete_after=15
+ )
+ else:
+ await self.channel.send(
+ f"{user.mention} climbs a ladder to **{target}**",
+ delete_after=15
+ )
+ next_tile = target
+
+ self.player_tiles[user.id] = min(100, next_tile)
+ self.round_has_rolled[user.id] = True
+
+ async def _complete_round(self) -> None:
+ """At the conclusion of a round check to see if there's been a winner."""
+ self.state = 'post_round'
+
+ # check for winner
+ winner = self._check_winner()
+ if winner is None:
+ # there is no winner, start the next round
+ await self.start_round()
+ return
+
+ # announce winner and exit
+ await self.channel.send("**Snakes and Ladders**: " + winner.mention + " has won the game! :tada:")
+ self._destruct()
+
+ def _check_winner(self) -> Member:
+ """Return a winning member if we're in the post-round state and there's a winner."""
+ if self.state != 'post_round':
+ return None
+ return next((player for player in self.players if self.player_tiles[player.id] == 100),
+ None)
+
+ def _check_all_rolled(self) -> bool:
+ """Check if all members have made their roll."""
+ return all(rolled for rolled in self.round_has_rolled.values())
+
+ def _destruct(self) -> None:
+ """Clean up the finished game object."""
+ del self.snakes.active_sal[self.channel]
+
+ def _board_coordinate_from_index(self, index: int) -> Tuple[int, int]:
+ """Convert the tile number to the x/y coordinates for graphical purposes."""
+ y_level = 9 - math.floor((index - 1) / 10)
+ is_reversed = math.floor((index - 1) / 10) % 2 != 0
+ x_level = (index - 1) % 10
+ if is_reversed:
+ x_level = 9 - x_level
+ return x_level, y_level
+
+ @staticmethod
+ def _is_moderator(user: Member) -> bool:
+ """Return True if the user is a Moderator."""
+ return any(Roles.moderator == role.id for role in user.roles)
diff --git a/bot/exts/evergreen/space.py b/bot/exts/evergreen/space.py
new file mode 100644
index 00000000..89b31e87
--- /dev/null
+++ b/bot/exts/evergreen/space.py
@@ -0,0 +1,240 @@
+import logging
+import random
+from datetime import datetime
+from typing import Any, Dict, Optional, Union
+from urllib.parse import urlencode
+
+from discord import Embed
+from discord.ext import tasks
+from discord.ext.commands import BadArgument, Cog, Context, Converter, group
+
+from bot.bot import SeasonalBot
+from bot.constants import Tokens
+
+logger = logging.getLogger(__name__)
+
+NASA_BASE_URL = "https://api.nasa.gov"
+NASA_IMAGES_BASE_URL = "https://images-api.nasa.gov"
+NASA_EPIC_BASE_URL = "https://epic.gsfc.nasa.gov"
+
+
+class DateConverter(Converter):
+ """Parse SOL or earth date (in format YYYY-MM-DD) into `int` or `datetime`. When invalid input, raise error."""
+
+ async def convert(self, ctx: Context, argument: str) -> Union[int, datetime]:
+ """Parse date (SOL or earth) into `datetime` or `int`. When invalid value, raise error."""
+ if argument.isdigit():
+ return int(argument)
+ try:
+ date = datetime.strptime(argument, "%Y-%m-%d")
+ except ValueError:
+ raise BadArgument(f"Can't convert `{argument}` to `datetime` in format `YYYY-MM-DD` or `int` in SOL.")
+ return date
+
+
+class Space(Cog):
+ """Space Cog contains commands, that show images, facts or other information about space."""
+
+ def __init__(self, bot: SeasonalBot):
+ self.bot = bot
+ self.http_session = bot.http_session
+
+ self.rovers = {}
+ self.get_rovers.start()
+
+ def cog_unload(self) -> None:
+ """Cancel `get_rovers` task when Cog will unload."""
+ self.get_rovers.cancel()
+
+ @tasks.loop(hours=24)
+ async def get_rovers(self) -> None:
+ """Get listing of rovers from NASA API and info about their start and end dates."""
+ data = await self.fetch_from_nasa("mars-photos/api/v1/rovers")
+
+ for rover in data["rovers"]:
+ self.rovers[rover["name"].lower()] = {
+ "min_date": rover["landing_date"],
+ "max_date": rover["max_date"],
+ "max_sol": rover["max_sol"]
+ }
+
+ @group(name="space", invoke_without_command=True)
+ async def space(self, ctx: Context) -> None:
+ """Head command that contains commands about space."""
+ await ctx.send_help("space")
+
+ @space.command(name="apod")
+ async def apod(self, ctx: Context, date: Optional[str] = None) -> None:
+ """
+ Get Astronomy Picture of Day from NASA API. Date is optional parameter, what formatting is YYYY-MM-DD.
+
+ If date is not specified, this will get today APOD.
+ """
+ params = {}
+ # Parse date to params, when provided. Show error message when invalid formatting
+ if date:
+ try:
+ params["date"] = datetime.strptime(date, "%Y-%m-%d").date().isoformat()
+ except ValueError:
+ await ctx.send(f"Invalid date {date}. Please make sure your date is in format YYYY-MM-DD.")
+ return
+
+ result = await self.fetch_from_nasa("planetary/apod", params)
+
+ await ctx.send(
+ embed=self.create_nasa_embed(
+ f"Astronomy Picture of the Day - {result['date']}",
+ result["explanation"],
+ result["url"]
+ )
+ )
+
+ @space.command(name="nasa")
+ async def nasa(self, ctx: Context, *, search_term: Optional[str] = None) -> None:
+ """Get random NASA information/facts + image. Support `search_term` parameter for more specific search."""
+ params = {
+ "media_type": "image"
+ }
+ if search_term:
+ params["q"] = search_term
+
+ # Don't use API key, no need for this.
+ data = await self.fetch_from_nasa("search", params, NASA_IMAGES_BASE_URL, use_api_key=False)
+ if len(data["collection"]["items"]) == 0:
+ await ctx.send(f"Can't find any items with search term `{search_term}`.")
+ return
+
+ item = random.choice(data["collection"]["items"])
+
+ await ctx.send(
+ embed=self.create_nasa_embed(
+ item["data"][0]["title"],
+ item["data"][0]["description"],
+ item["links"][0]["href"]
+ )
+ )
+
+ @space.command(name="epic")
+ async def epic(self, ctx: Context, date: Optional[str] = None) -> None:
+ """Get one of latest random image of earth from NASA EPIC API. Support date parameter, format is YYYY-MM-DD."""
+ if date:
+ try:
+ show_date = datetime.strptime(date, "%Y-%m-%d").date().isoformat()
+ except ValueError:
+ await ctx.send(f"Invalid date {date}. Please make sure your date is in format YYYY-MM-DD.")
+ return
+ else:
+ show_date = None
+
+ # Don't use API key, no need for this.
+ data = await self.fetch_from_nasa(
+ f"api/natural{f'/date/{show_date}' if show_date else ''}",
+ base=NASA_EPIC_BASE_URL,
+ use_api_key=False
+ )
+ if len(data) < 1:
+ await ctx.send("Can't find any images in this date.")
+ return
+
+ item = random.choice(data)
+
+ year, month, day = item["date"].split(" ")[0].split("-")
+ image_url = f"{NASA_EPIC_BASE_URL}/archive/natural/{year}/{month}/{day}/jpg/{item['image']}.jpg"
+
+ await ctx.send(
+ embed=self.create_nasa_embed(
+ "Earth Image", item["caption"], image_url, f" \u2022 Identifier: {item['identifier']}"
+ )
+ )
+
+ @space.group(name="mars", invoke_without_command=True)
+ async def mars(
+ self,
+ ctx: Context,
+ date: Optional[DateConverter] = None,
+ rover: Optional[str] = "curiosity"
+ ) -> None:
+ """
+ Get random Mars image by date. Support both SOL (martian solar day) and earth date and rovers.
+
+ Earth date formatting is YYYY-MM-DD. Use `.space mars dates` to get all currently available rovers.
+ """
+ rover = rover.lower()
+ if rover not in self.rovers:
+ await ctx.send(
+ (
+ f"Invalid rover `{rover}`.\n"
+ f"**Rovers:** `{'`, `'.join(f'{r.capitalize()}' for r in self.rovers)}`"
+ )
+ )
+ return
+
+ # When date not provided, get random SOL date between 0 and rover's max.
+ if date is None:
+ date = random.randint(0, self.rovers[rover]["max_sol"])
+
+ params = {}
+ if isinstance(date, int):
+ params["sol"] = date
+ else:
+ params["earth_date"] = date.date().isoformat()
+
+ result = await self.fetch_from_nasa(f"mars-photos/api/v1/rovers/{rover}/photos", params)
+ if len(result["photos"]) < 1:
+ err_msg = (
+ f"We can't find result in date "
+ f"{date.date().isoformat() if isinstance(date, datetime) else f'{date} SOL'}.\n"
+ f"**Note:** Dates must match with rover's working dates. Please use `{ctx.prefix}space mars dates` to "
+ "see working dates for each rover."
+ )
+ await ctx.send(err_msg)
+ return
+
+ item = random.choice(result["photos"])
+ await ctx.send(
+ embed=self.create_nasa_embed(
+ f"{item['rover']['name']}'s {item['camera']['full_name']} Mars Image", "", item["img_src"],
+ )
+ )
+
+ @mars.command(name="dates", aliases=["d", "date", "rover", "rovers", "r"])
+ async def dates(self, ctx: Context) -> None:
+ """Get current available rovers photo date ranges."""
+ await ctx.send("\n".join(
+ f"**{r.capitalize()}:** {i['min_date']} **-** {i['max_date']}" for r, i in self.rovers.items()
+ ))
+
+ async def fetch_from_nasa(
+ self,
+ endpoint: str,
+ additional_params: Optional[Dict[str, Any]] = None,
+ base: Optional[str] = NASA_BASE_URL,
+ use_api_key: bool = True
+ ) -> Dict[str, Any]:
+ """Fetch information from NASA API, return result."""
+ params = {}
+ if use_api_key:
+ params["api_key"] = Tokens.nasa
+
+ # Add additional parameters to request parameters only when they provided by user
+ if additional_params is not None:
+ params.update(additional_params)
+
+ async with self.http_session.get(url=f"{base}/{endpoint}?{urlencode(params)}") as resp:
+ return await resp.json()
+
+ def create_nasa_embed(self, title: str, description: str, image: str, footer: Optional[str] = "") -> Embed:
+ """Generate NASA commands embeds. Required: title, description and image URL, footer (addition) is optional."""
+ return Embed(
+ title=title,
+ description=description
+ ).set_image(url=image).set_footer(text="Powered by NASA API" + footer)
+
+
+def setup(bot: SeasonalBot) -> None:
+ """Load Space Cog."""
+ if not Tokens.nasa:
+ logger.warning("Can't find NASA API key. Not loading Space Cog.")
+ return
+
+ bot.add_cog(Space(bot))
diff --git a/bot/exts/evergreen/speedrun.py b/bot/exts/evergreen/speedrun.py
new file mode 100644
index 00000000..4e8d7aee
--- /dev/null
+++ b/bot/exts/evergreen/speedrun.py
@@ -0,0 +1,27 @@
+import json
+import logging
+from pathlib import Path
+from random import choice
+
+from discord.ext import commands
+
+log = logging.getLogger(__name__)
+with Path('bot/resources/evergreen/speedrun_links.json').open(encoding="utf-8") as file:
+ LINKS = json.load(file)
+
+
+class Speedrun(commands.Cog):
+ """Commands about the video game speedrunning community."""
+
+ def __init__(self, bot: commands.Bot):
+ self.bot = bot
+
+ @commands.command(name="speedrun")
+ async def get_speedrun(self, ctx: commands.Context) -> None:
+ """Sends a link to a video of a random speedrun."""
+ await ctx.send(choice(LINKS))
+
+
+def setup(bot: commands.Bot) -> None:
+ """Load the Speedrun cog."""
+ bot.add_cog(Speedrun(bot))
diff --git a/bot/exts/evergreen/trivia_quiz.py b/bot/exts/evergreen/trivia_quiz.py
new file mode 100644
index 00000000..c1a271e8
--- /dev/null
+++ b/bot/exts/evergreen/trivia_quiz.py
@@ -0,0 +1,302 @@
+import asyncio
+import json
+import logging
+import random
+from pathlib import Path
+
+import discord
+from discord.ext import commands
+from fuzzywuzzy import fuzz
+
+from bot.constants import Roles
+
+
+logger = logging.getLogger(__name__)
+
+
+WRONG_ANS_RESPONSE = [
+ "No one answered correctly!",
+ "Better luck next time"
+]
+
+
+class TriviaQuiz(commands.Cog):
+ """A cog for all quiz commands."""
+
+ def __init__(self, bot: commands.Bot) -> None:
+ self.bot = bot
+ self.questions = self.load_questions()
+ self.game_status = {} # A variable to store the game status: either running or not running.
+ self.game_owners = {} # A variable to store the person's ID who started the quiz game in a channel.
+ self.question_limit = 4
+ self.player_scores = {} # A variable to store all player's scores for a bot session.
+ self.game_player_scores = {} # A variable to store temporary game player's scores.
+ self.categories = {
+ "general": "Test your general knowledge"
+ # "retro": "Questions related to retro gaming."
+ }
+
+ @staticmethod
+ def load_questions() -> dict:
+ """Load the questions from the JSON file."""
+ p = Path("bot", "resources", "evergreen", "trivia_quiz.json")
+ with p.open() as json_data:
+ questions = json.load(json_data)
+ return questions
+
+ @commands.group(name="quiz", aliases=["trivia"], invoke_without_command=True)
+ async def quiz_game(self, ctx: commands.Context, category: str = None) -> None:
+ """
+ Start a quiz!
+
+ Questions for the quiz can be selected from the following categories:
+ - general : Test your general knowledge. (default)
+ (More to come!)
+ """
+ if ctx.channel.id not in self.game_status:
+ self.game_status[ctx.channel.id] = False
+
+ if ctx.channel.id not in self.game_player_scores:
+ self.game_player_scores[ctx.channel.id] = {}
+
+ # Stop game if running.
+ if self.game_status[ctx.channel.id] is True:
+ return await ctx.send(
+ f"Game is already running..."
+ f"do `{self.bot.command_prefix}quiz stop`"
+ )
+
+ # Send embed showing available categories if inputted category is invalid.
+ if category is None:
+ category = random.choice(list(self.categories))
+
+ category = category.lower()
+ if category not in self.categories:
+ embed = self.category_embed()
+ await ctx.send(embed=embed)
+ return
+
+ # Start game if not running.
+ if self.game_status[ctx.channel.id] is False:
+ self.game_owners[ctx.channel.id] = ctx.author
+ self.game_status[ctx.channel.id] = True
+ start_embed = self.make_start_embed(category)
+
+ await ctx.send(embed=start_embed) # send an embed with the rules
+ await asyncio.sleep(1)
+
+ topic = self.questions[category]
+
+ done_question = []
+ hint_no = 0
+ answer = None
+ while self.game_status[ctx.channel.id]:
+ # Exit quiz if number of questions for a round are already sent.
+ if len(done_question) > self.question_limit and hint_no == 0:
+ await ctx.send("The round has ended.")
+ await self.declare_winner(ctx.channel, self.game_player_scores[ctx.channel.id])
+
+ self.game_status[ctx.channel.id] = False
+ del self.game_owners[ctx.channel.id]
+ self.game_player_scores[ctx.channel.id] = {}
+
+ break
+
+ # If no hint has been sent or any time alert. Basically if hint_no = 0 means it is a new question.
+ if hint_no == 0:
+ # Select a random question which has not been used yet.
+ while True:
+ question_dict = random.choice(topic)
+ if question_dict["id"] not in done_question:
+ done_question.append(question_dict["id"])
+ break
+
+ q = question_dict["question"]
+ answer = question_dict["answer"]
+
+ embed = discord.Embed(colour=discord.Colour.gold())
+ embed.title = f"Question #{len(done_question)}"
+ embed.description = q
+ await ctx.send(embed=embed) # Send question embed.
+
+ # A function to check whether user input is the correct answer(close to the right answer)
+ def check(m: discord.Message) -> bool:
+ ratio = fuzz.ratio(answer.lower(), m.content.lower())
+ return ratio > 85 and m.channel == ctx.channel
+
+ try:
+ msg = await self.bot.wait_for('message', check=check, timeout=10)
+ except asyncio.TimeoutError:
+ # In case of TimeoutError and the game has been stopped, then do nothing.
+ if self.game_status[ctx.channel.id] is False:
+ break
+
+ # if number of hints sent or time alerts sent is less than 2, then send one.
+ if hint_no < 2:
+ hint_no += 1
+ if "hints" in question_dict:
+ hints = question_dict["hints"]
+ await ctx.send(f"**Hint #{hint_no+1}\n**{hints[hint_no]}")
+ else:
+ await ctx.send(f"{30 - hint_no * 10}s left!")
+
+ # Once hint or time alerts has been sent 2 times, the hint_no value will be 3
+ # If hint_no > 2, then it means that all hints/time alerts have been sent.
+ # Also means that the answer is not yet given and the bot sends the answer and the next question.
+ else:
+ if self.game_status[ctx.channel.id] is False:
+ break
+
+ response = random.choice(WRONG_ANS_RESPONSE)
+ await ctx.send(response)
+ await self.send_answer(ctx.channel, question_dict)
+ await asyncio.sleep(1)
+
+ hint_no = 0 # init hint_no = 0 so that 2 hints/time alerts can be sent for the new question.
+
+ await self.send_score(ctx.channel, self.game_player_scores[ctx.channel.id])
+ await asyncio.sleep(2)
+ else:
+ if self.game_status[ctx.channel.id] is False:
+ break
+
+ # Reduce points by 25 for every hint/time alert that has been sent.
+ points = 100 - 25*hint_no
+ if msg.author in self.game_player_scores[ctx.channel.id]:
+ self.game_player_scores[ctx.channel.id][msg.author] += points
+ else:
+ self.game_player_scores[ctx.channel.id][msg.author] = points
+
+ # Also updating the overall scoreboard.
+ if msg.author in self.player_scores:
+ self.player_scores[msg.author] += points
+ else:
+ self.player_scores[msg.author] = points
+
+ hint_no = 0
+
+ await ctx.send(f"{msg.author.mention} got the correct answer :tada: {points} points!")
+ await self.send_answer(ctx.channel, question_dict)
+ await self.send_score(ctx.channel, self.game_player_scores[ctx.channel.id])
+ await asyncio.sleep(2)
+
+ @staticmethod
+ def make_start_embed(category: str) -> discord.Embed:
+ """Generate a starting/introduction embed for the quiz."""
+ start_embed = discord.Embed(colour=discord.Colour.red())
+ start_embed.title = "Quiz game Starting!!"
+ start_embed.description = "Each game consists of 5 questions.\n"
+ start_embed.description += "**Rules :**\nNo cheating and have fun!"
+ start_embed.description += f"\n **Category** : {category}"
+ start_embed.set_footer(
+ text="Points for each question reduces by 25 after 10s or after a hint. Total time is 30s per question"
+ )
+ return start_embed
+
+ @quiz_game.command(name="stop")
+ async def stop_quiz(self, ctx: commands.Context) -> None:
+ """
+ Stop a quiz game if its running in the channel.
+
+ Note: Only mods or the owner of the quiz can stop it.
+ """
+ if self.game_status[ctx.channel.id] is True:
+ # Check if the author is the game starter or a moderator.
+ if (
+ ctx.author == self.game_owners[ctx.channel.id]
+ or any(Roles.moderator == role.id for role in ctx.author.roles)
+ ):
+ await ctx.send("Quiz stopped.")
+ await self.declare_winner(ctx.channel, self.game_player_scores[ctx.channel.id])
+
+ self.game_status[ctx.channel.id] = False
+ del self.game_owners[ctx.channel.id]
+ self.game_player_scores[ctx.channel.id] = {}
+ else:
+ await ctx.send(f"{ctx.author.mention}, you are not authorised to stop this game :ghost:!")
+ else:
+ await ctx.send("No quiz running.")
+
+ @quiz_game.command(name="leaderboard")
+ async def leaderboard(self, ctx: commands.Context) -> None:
+ """View everyone's score for this bot session."""
+ await self.send_score(ctx.channel, self.player_scores)
+
+ @staticmethod
+ async def send_score(channel: discord.TextChannel, player_data: dict) -> None:
+ """A function which sends the score."""
+ if len(player_data) == 0:
+ await channel.send("No one has made it onto the leaderboard yet.")
+ return
+
+ embed = discord.Embed(colour=discord.Colour.blue())
+ embed.title = "Score Board"
+ embed.description = ""
+
+ sorted_dict = sorted(player_data.items(), key=lambda a: a[1], reverse=True)
+ for item in sorted_dict:
+ embed.description += f"{item[0]} : {item[1]}\n"
+
+ await channel.send(embed=embed)
+
+ @staticmethod
+ async def declare_winner(channel: discord.TextChannel, player_data: dict) -> None:
+ """Announce the winner of the quiz in the game channel."""
+ if player_data:
+ highest_points = max(list(player_data.values()))
+ no_of_winners = list(player_data.values()).count(highest_points)
+
+ # Check if more than 1 player has highest points.
+ if no_of_winners > 1:
+ word = "You guys"
+ winners = []
+ points_copy = list(player_data.values()).copy()
+
+ for _ in range(no_of_winners):
+ index = points_copy.index(highest_points)
+ winners.append(list(player_data.keys())[index])
+ points_copy[index] = 0
+
+ winners_mention = " ".join(winner.mention for winner in winners)
+ else:
+ word = "You"
+ author_index = list(player_data.values()).index(highest_points)
+ winner = list(player_data.keys())[author_index]
+ winners_mention = winner.mention
+
+ await channel.send(
+ f"Congratulations {winners_mention} :tada: "
+ f"{word} have won this quiz game with a grand total of {highest_points} points!"
+ )
+
+ def category_embed(self) -> discord.Embed:
+ """Build an embed showing all available trivia categories."""
+ embed = discord.Embed(colour=discord.Colour.blue())
+ embed.title = "The available question categories are:"
+ embed.set_footer(text="If a category is not chosen, a random one will be selected.")
+ embed.description = ""
+
+ for cat, description in self.categories.items():
+ embed.description += f"**- {cat.capitalize()}**\n{description.capitalize()}\n"
+
+ return embed
+
+ @staticmethod
+ async def send_answer(channel: discord.TextChannel, question_dict: dict) -> None:
+ """Send the correct answer of a question to the game channel."""
+ answer = question_dict["answer"]
+ info = question_dict["info"]
+ embed = discord.Embed(color=discord.Colour.red())
+ embed.title = f"The correct answer is **{answer}**\n"
+ embed.description = ""
+
+ if info != "":
+ embed.description += f"**Information**\n{info}\n\n"
+
+ embed.description += "Let's move to the next question.\nRemaining questions: "
+ await channel.send(embed=embed)
+
+
+def setup(bot: commands.Bot) -> None:
+ """Load the cog."""
+ bot.add_cog(TriviaQuiz(bot))
diff --git a/bot/exts/evergreen/uptime.py b/bot/exts/evergreen/uptime.py
new file mode 100644
index 00000000..a9ad9dfb
--- /dev/null
+++ b/bot/exts/evergreen/uptime.py
@@ -0,0 +1,33 @@
+import logging
+
+import arrow
+from dateutil.relativedelta import relativedelta
+from discord.ext import commands
+
+from bot import start_time
+
+log = logging.getLogger(__name__)
+
+
+class Uptime(commands.Cog):
+ """A cog for posting the bot's uptime."""
+
+ def __init__(self, bot: commands.Bot):
+ self.bot = bot
+
+ @commands.command(name="uptime")
+ async def uptime(self, ctx: commands.Context) -> None:
+ """Responds with the uptime of the bot."""
+ difference = relativedelta(start_time - arrow.utcnow())
+ uptime_string = start_time.shift(
+ seconds=-difference.seconds,
+ minutes=-difference.minutes,
+ hours=-difference.hours,
+ days=-difference.days
+ ).humanize()
+ await ctx.send(f"I started up {uptime_string}.")
+
+
+def setup(bot: commands.Bot) -> None:
+ """Uptime Cog load."""
+ bot.add_cog(Uptime(bot))