aboutsummaryrefslogtreecommitdiffstats
path: root/bot/exts/fun
diff options
context:
space:
mode:
Diffstat (limited to 'bot/exts/fun')
-rw-r--r--bot/exts/fun/snakes/__init__.py11
-rw-r--r--bot/exts/fun/snakes/_converter.py82
-rw-r--r--bot/exts/fun/snakes/_snakes_cog.py1151
-rw-r--r--bot/exts/fun/snakes/_utils.py721
4 files changed, 1965 insertions, 0 deletions
diff --git a/bot/exts/fun/snakes/__init__.py b/bot/exts/fun/snakes/__init__.py
new file mode 100644
index 00000000..ba8333fd
--- /dev/null
+++ b/bot/exts/fun/snakes/__init__.py
@@ -0,0 +1,11 @@
+import logging
+
+from bot.bot import Bot
+from bot.exts.fun.snakes._snakes_cog import Snakes
+
+log = logging.getLogger(__name__)
+
+
+def setup(bot: Bot) -> None:
+ """Load the Snakes Cog."""
+ bot.add_cog(Snakes(bot))
diff --git a/bot/exts/fun/snakes/_converter.py b/bot/exts/fun/snakes/_converter.py
new file mode 100644
index 00000000..c24ba8c6
--- /dev/null
+++ b/bot/exts/fun/snakes/_converter.py
@@ -0,0 +1,82 @@
+import json
+import logging
+import random
+from collections.abc import Iterable
+
+import discord
+from discord.ext.commands import Context, Converter
+from rapidfuzz import fuzz
+
+from bot.exts.fun.snakes._utils import SNAKE_RESOURCES
+from bot.utils import disambiguate
+
+log = logging.getLogger(__name__)
+
+
+class Snake(Converter):
+ """Snake converter for the Snakes Cog."""
+
+ snakes = None
+ special_cases = None
+
+ async def convert(self, ctx: Context, name: str) -> str:
+ """Convert the input snake name to the closest matching Snake object."""
+ await self.build_list()
+ name = name.lower()
+
+ if name == "python":
+ return "Python (programming language)"
+
+ def get_potential(iterable: Iterable, *, threshold: int = 80) -> list[str]:
+ nonlocal name
+ potential = []
+
+ for item in iterable:
+ original, item = item, item.lower()
+
+ if name == item:
+ return [original]
+
+ a, b = fuzz.ratio(name, item), fuzz.partial_ratio(name, item)
+ if a >= threshold or b >= threshold:
+ potential.append(original)
+
+ return potential
+
+ # Handle special cases
+ if name.lower() in self.special_cases:
+ return self.special_cases.get(name.lower(), name.lower())
+
+ names = {snake["name"]: snake["scientific"] for snake in self.snakes}
+ all_names = names.keys() | names.values()
+ timeout = len(all_names) * (3 / 4)
+
+ embed = discord.Embed(
+ title="Found multiple choices. Please choose the correct one.", colour=0x59982F)
+ embed.set_author(name=ctx.author.display_name, icon_url=ctx.author.display_avatar.url)
+
+ name = await disambiguate(ctx, get_potential(all_names), timeout=timeout, embed=embed)
+ return names.get(name, name)
+
+ @classmethod
+ async def build_list(cls) -> None:
+ """Build list of snakes from the static snake resources."""
+ # Get all the snakes
+ if cls.snakes is None:
+ cls.snakes = json.loads((SNAKE_RESOURCES / "snake_names.json").read_text("utf8"))
+ # Get the special cases
+ if cls.special_cases is None:
+ special_cases = json.loads((SNAKE_RESOURCES / "special_snakes.json").read_text("utf8"))
+ cls.special_cases = {snake["name"].lower(): snake for snake in special_cases}
+
+ @classmethod
+ async def random(cls) -> str:
+ """
+ Get a random Snake from the loaded resources.
+
+ This is stupid. We should find a way to somehow get the global session into a global context,
+ so I can get it from here.
+ """
+ await cls.build_list()
+ names = [snake["scientific"] for snake in cls.snakes]
+ return random.choice(names)
diff --git a/bot/exts/fun/snakes/_snakes_cog.py b/bot/exts/fun/snakes/_snakes_cog.py
new file mode 100644
index 00000000..59e57199
--- /dev/null
+++ b/bot/exts/fun/snakes/_snakes_cog.py
@@ -0,0 +1,1151 @@
+import asyncio
+import colorsys
+import logging
+import os
+import random
+import re
+import string
+import textwrap
+import urllib
+from functools import partial
+from io import BytesIO
+from typing import Any, Optional
+
+import async_timeout
+from PIL import Image, ImageDraw, ImageFont
+from discord import Colour, Embed, File, Member, Message, Reaction
+from discord.errors import HTTPException
+from discord.ext.commands import Cog, CommandError, Context, bot_has_permissions, group
+
+from bot.bot import Bot
+from bot.constants import ERROR_REPLIES, Tokens
+from bot.exts.fun.snakes import _utils as utils
+from bot.exts.fun.snakes._converter import Snake
+from bot.utils.decorators import locked
+from bot.utils.extensions import invoke_help_command
+
+log = logging.getLogger(__name__)
+
+
+# region: Constants
+# Color
+SNAKE_COLOR = 0x399600
+
+# Antidote constants
+SYRINGE_EMOJI = "\U0001F489" # :syringe:
+PILL_EMOJI = "\U0001F48A" # :pill:
+HOURGLASS_EMOJI = "\u231B" # :hourglass:
+CROSSBONES_EMOJI = "\u2620" # :skull_crossbones:
+ALEMBIC_EMOJI = "\u2697" # :alembic:
+TICK_EMOJI = "\u2705" # :white_check_mark: - Correct peg, correct hole
+CROSS_EMOJI = "\u274C" # :x: - Wrong peg, wrong hole
+BLANK_EMOJI = "\u26AA" # :white_circle: - Correct peg, wrong hole
+HOLE_EMOJI = "\u2B1C" # :white_square: - Used in guesses
+EMPTY_UNICODE = "\u200b" # literally just an empty space
+
+ANTIDOTE_EMOJI = (
+ SYRINGE_EMOJI,
+ PILL_EMOJI,
+ HOURGLASS_EMOJI,
+ CROSSBONES_EMOJI,
+ ALEMBIC_EMOJI,
+)
+
+# Quiz constants
+ANSWERS_EMOJI = {
+ "a": "\U0001F1E6", # :regional_indicator_a: 🇦
+ "b": "\U0001F1E7", # :regional_indicator_b: 🇧
+ "c": "\U0001F1E8", # :regional_indicator_c: 🇨
+ "d": "\U0001F1E9", # :regional_indicator_d: 🇩
+}
+
+ANSWERS_EMOJI_REVERSE = {
+ "\U0001F1E6": "A", # :regional_indicator_a: 🇦
+ "\U0001F1E7": "B", # :regional_indicator_b: 🇧
+ "\U0001F1E8": "C", # :regional_indicator_c: 🇨
+ "\U0001F1E9": "D", # :regional_indicator_d: 🇩
+}
+
+# Zzzen of pythhhon constant
+ZEN = """
+Beautiful is better than ugly.
+Explicit is better than implicit.
+Simple is better than complex.
+Complex is better than complicated.
+Flat is better than nested.
+Sparse is better than dense.
+Readability counts.
+Special cases aren't special enough to break the rules.
+Although practicality beats purity.
+Errors should never pass silently.
+Unless explicitly silenced.
+In the face of ambiguity, refuse the temptation to guess.
+There should be one-- and preferably only one --obvious way to do it.
+Now is better than never.
+Although never is often better than *right* now.
+If the implementation is hard to explain, it's a bad idea.
+If the implementation is easy to explain, it may be a good idea.
+"""
+
+# Max messages to train snake_chat on
+MSG_MAX = 100
+
+# get_snek constants
+URL = "https://en.wikipedia.org/w/api.php?"
+
+# snake guess responses
+INCORRECT_GUESS = (
+ "Nope, that's not what it is.",
+ "Not quite.",
+ "Not even close.",
+ "Terrible guess.",
+ "Nnnno.",
+ "Dude. No.",
+ "I thought everyone knew this one.",
+ "Guess you suck at snakes.",
+ "Bet you feel stupid now.",
+ "Hahahaha, no.",
+ "Did you hit the wrong key?"
+)
+
+CORRECT_GUESS = (
+ "**WRONG**. Wait, no, actually you're right.",
+ "Yeah, you got it!",
+ "Yep, that's exactly what it is.",
+ "Uh-huh. Yep yep yep.",
+ "Yeah that's right.",
+ "Yup. How did you know that?",
+ "Are you a herpetologist?",
+ "Sure, okay, but I bet you can't pronounce it.",
+ "Are you cheating?"
+)
+
+# snake card consts
+CARD = {
+ "top": Image.open("bot/resources/fun/snakes/snake_cards/card_top.png"),
+ "frame": Image.open("bot/resources/fun/snakes/snake_cards/card_frame.png"),
+ "bottom": Image.open("bot/resources/fun/snakes/snake_cards/card_bottom.png"),
+ "backs": [
+ Image.open(f"bot/resources/fun/snakes/snake_cards/backs/{file}")
+ for file in os.listdir("bot/resources/fun/snakes/snake_cards/backs")
+ ],
+ "font": ImageFont.truetype("bot/resources/fun/snakes/snake_cards/expressway.ttf", 20)
+}
+# endregion
+
+
+class Snakes(Cog):
+ """
+ Commands related to snakes, created by our community during the first code jam.
+
+ More information can be found in the code-jam-1 repo.
+
+ https://github.com/python-discord/code-jam-1
+ """
+
+ wiki_brief = re.compile(r"(.*?)(=+ (.*?) =+)", flags=re.DOTALL)
+ valid_image_extensions = ("gif", "png", "jpeg", "jpg", "webp")
+
+ def __init__(self, bot: Bot):
+ self.active_sal = {}
+ self.bot = bot
+ self.snake_names = utils.get_resource("snake_names")
+ self.snake_idioms = utils.get_resource("snake_idioms")
+ self.snake_quizzes = utils.get_resource("snake_quiz")
+ self.snake_facts = utils.get_resource("snake_facts")
+ self.num_movie_pages = None
+
+ # region: Helper methods
+ @staticmethod
+ def _beautiful_pastel(hue: float) -> int:
+ """Returns random bright pastels."""
+ light = random.uniform(0.7, 0.85)
+ saturation = 1
+
+ rgb = colorsys.hls_to_rgb(hue, light, saturation)
+ hex_rgb = ""
+
+ for part in rgb:
+ value = int(part * 0xFF)
+ hex_rgb += f"{value:02x}"
+
+ return int(hex_rgb, 16)
+
+ @staticmethod
+ def _generate_card(buffer: BytesIO, content: dict) -> BytesIO:
+ """
+ Generate a card from snake information.
+
+ Written by juan and Someone during the first code jam.
+ """
+ snake = Image.open(buffer)
+
+ # Get the size of the snake icon, configure the height of the image box (yes, it changes)
+ icon_width = 347 # Hardcoded, not much i can do about that
+ icon_height = int((icon_width / snake.width) * snake.height)
+ frame_copies = icon_height // CARD["frame"].height + 1
+ snake.thumbnail((icon_width, icon_height))
+
+ # Get the dimensions of the final image
+ main_height = icon_height + CARD["top"].height + CARD["bottom"].height
+ main_width = CARD["frame"].width
+
+ # Start creating the foreground
+ foreground = Image.new("RGBA", (main_width, main_height), (0, 0, 0, 0))
+ foreground.paste(CARD["top"], (0, 0))
+
+ # Generate the frame borders to the correct height
+ for offset in range(frame_copies):
+ position = (0, CARD["top"].height + offset * CARD["frame"].height)
+ foreground.paste(CARD["frame"], position)
+
+ # Add the image and bottom part of the image
+ foreground.paste(snake, (36, CARD["top"].height)) # Also hardcoded :(
+ foreground.paste(CARD["bottom"], (0, CARD["top"].height + icon_height))
+
+ # Setup the background
+ back = random.choice(CARD["backs"])
+ back_copies = main_height // back.height + 1
+ full_image = Image.new("RGBA", (main_width, main_height), (0, 0, 0, 0))
+
+ # Generate the tiled background
+ for offset in range(back_copies):
+ full_image.paste(back, (16, 16 + offset * back.height))
+
+ # Place the foreground onto the final image
+ full_image.paste(foreground, (0, 0), foreground)
+
+ # Get the first two sentences of the info
+ description = ".".join(content["info"].split(".")[:2]) + "."
+
+ # Setup positioning variables
+ margin = 36
+ offset = CARD["top"].height + icon_height + margin
+
+ # Create blank rectangle image which will be behind the text
+ rectangle = Image.new(
+ "RGBA",
+ (main_width, main_height),
+ (0, 0, 0, 0)
+ )
+
+ # Draw a semi-transparent rectangle on it
+ rect = ImageDraw.Draw(rectangle)
+ rect.rectangle(
+ (margin, offset, main_width - margin, main_height - margin),
+ fill=(63, 63, 63, 128)
+ )
+
+ # Paste it onto the final image
+ full_image.paste(rectangle, (0, 0), mask=rectangle)
+
+ # Draw the text onto the final image
+ draw = ImageDraw.Draw(full_image)
+ for line in textwrap.wrap(description, 36):
+ draw.text([margin + 4, offset], line, font=CARD["font"])
+ offset += CARD["font"].getsize(line)[1]
+
+ # Get the image contents as a BufferIO object
+ buffer = BytesIO()
+ full_image.save(buffer, "PNG")
+ buffer.seek(0)
+
+ return buffer
+
+ @staticmethod
+ def _snakify(message: str) -> str:
+ """Sssnakifffiesss a sstring."""
+ # Replace fricatives with exaggerated snake fricatives.
+ simple_fricatives = [
+ "f", "s", "z", "h",
+ "F", "S", "Z", "H",
+ ]
+ complex_fricatives = [
+ "th", "sh", "Th", "Sh"
+ ]
+
+ for letter in simple_fricatives:
+ if letter.islower():
+ message = message.replace(letter, letter * random.randint(2, 4))
+ else:
+ message = message.replace(letter, (letter * random.randint(2, 4)).title())
+
+ for fricative in complex_fricatives:
+ message = message.replace(fricative, fricative[0] + fricative[1] * random.randint(2, 4))
+
+ return message
+
+ async def _fetch(self, url: str, params: Optional[dict] = None) -> dict:
+ """Asynchronous web request helper method."""
+ if params is None:
+ params = {}
+
+ async with async_timeout.timeout(10):
+ async with self.bot.http_session.get(url, params=params) as response:
+ return await response.json()
+
+ def _get_random_long_message(self, messages: list[str], retries: int = 10) -> str:
+ """
+ Fetch a message that's at least 3 words long, if possible to do so in retries attempts.
+
+ Else, just return whatever the last message is.
+ """
+ long_message = random.choice(messages)
+ if len(long_message.split()) < 3 and retries > 0:
+ return self._get_random_long_message(
+ messages,
+ retries=retries - 1
+ )
+
+ return long_message
+
+ async def _get_snek(self, name: str) -> dict[str, Any]:
+ """
+ Fetches all the data from a wikipedia article about a snake.
+
+ Builds a dict that the .get() method can use.
+
+ Created by Ava and eivl.
+ """
+ snake_info = {}
+
+ params = {
+ "format": "json",
+ "action": "query",
+ "list": "search",
+ "srsearch": name,
+ "utf8": "",
+ "srlimit": "1",
+ }
+
+ json = await self._fetch(URL, params=params)
+
+ # Wikipedia does have a error page
+ try:
+ pageid = json["query"]["search"][0]["pageid"]
+ except KeyError:
+ # Wikipedia error page ID(?)
+ pageid = 41118
+ except IndexError:
+ return None
+
+ params = {
+ "format": "json",
+ "action": "query",
+ "prop": "extracts|images|info",
+ "exlimit": "max",
+ "explaintext": "",
+ "inprop": "url",
+ "pageids": pageid
+ }
+
+ json = await self._fetch(URL, params=params)
+
+ # Constructing dict - handle exceptions later
+ try:
+ snake_info["title"] = json["query"]["pages"][f"{pageid}"]["title"]
+ snake_info["extract"] = json["query"]["pages"][f"{pageid}"]["extract"]
+ snake_info["images"] = json["query"]["pages"][f"{pageid}"]["images"]
+ snake_info["fullurl"] = json["query"]["pages"][f"{pageid}"]["fullurl"]
+ snake_info["pageid"] = json["query"]["pages"][f"{pageid}"]["pageid"]
+ except KeyError:
+ snake_info["error"] = True
+
+ if snake_info["images"]:
+ i_url = "https://commons.wikimedia.org/wiki/Special:FilePath/"
+ image_list = []
+ map_list = []
+ thumb_list = []
+
+ # Wikipedia has arbitrary images that are not snakes
+ banned = [
+ "Commons-logo.svg",
+ "Red%20Pencil%20Icon.png",
+ "distribution",
+ "The%20Death%20of%20Cleopatra%20arthur.jpg",
+ "Head%20of%20holotype",
+ "locator",
+ "Woma.png",
+ "-map.",
+ ".svg",
+ "ange.",
+ "Adder%20(PSF).png"
+ ]
+
+ for image in snake_info["images"]:
+ # Images come in the format of `File:filename.extension`
+ file, sep, filename = image["title"].partition(":")
+ filename = filename.replace(" ", "%20") # Wikipedia returns good data!
+
+ if not filename.startswith("Map"):
+ if any(ban in filename for ban in banned):
+ pass
+ else:
+ image_list.append(f"{i_url}{filename}")
+ thumb_list.append(f"{i_url}{filename}?width=100")
+ else:
+ map_list.append(f"{i_url}{filename}")
+
+ snake_info["image_list"] = image_list
+ snake_info["map_list"] = map_list
+ snake_info["thumb_list"] = thumb_list
+ snake_info["name"] = name
+
+ match = self.wiki_brief.match(snake_info["extract"])
+ info = match.group(1) if match else None
+
+ if info:
+ info = info.replace("\n", "\n\n") # Give us some proper paragraphs.
+
+ snake_info["info"] = info
+
+ return snake_info
+
+ async def _get_snake_name(self) -> dict[str, str]:
+ """Gets a random snake name."""
+ return random.choice(self.snake_names)
+
+ async def _validate_answer(self, ctx: Context, message: Message, answer: str, options: dict[str, str]) -> None:
+ """Validate the answer using a reaction event loop."""
+ def predicate(reaction: Reaction, user: Member) -> bool:
+ """Test if the the answer is valid and can be evaluated."""
+ return (
+ reaction.message.id == message.id # The reaction is attached to the question we asked.
+ and user == ctx.author # It's the user who triggered the quiz.
+ and str(reaction.emoji) in ANSWERS_EMOJI.values() # The reaction is one of the options.
+ )
+
+ for emoji in ANSWERS_EMOJI.values():
+ await message.add_reaction(emoji)
+
+ # Validate the answer
+ try:
+ reaction, user = await ctx.bot.wait_for("reaction_add", timeout=45.0, check=predicate)
+ except asyncio.TimeoutError:
+ await ctx.send(f"You took too long. The correct answer was **{options[answer]}**.")
+ await message.clear_reactions()
+ return
+
+ if str(reaction.emoji) == ANSWERS_EMOJI[answer]:
+ await ctx.send(f"{random.choice(CORRECT_GUESS)} The correct answer was **{options[answer]}**.")
+ else:
+ await ctx.send(
+ f"{random.choice(INCORRECT_GUESS)} The correct answer was **{options[answer]}**."
+ )
+
+ await message.clear_reactions()
+ # endregion
+
+ # region: Commands
+ @group(name="snakes", aliases=("snake",), invoke_without_command=True)
+ async def snakes_group(self, ctx: Context) -> None:
+ """Commands from our first code jam."""
+ await invoke_help_command(ctx)
+
+ @bot_has_permissions(manage_messages=True)
+ @snakes_group.command(name="antidote")
+ @locked()
+ async def antidote_command(self, ctx: Context) -> None:
+ """
+ Antidote! Can you create the antivenom before the patient dies?
+
+ Rules: You have 4 ingredients for each antidote, you only have 10 attempts
+ Once you synthesize the antidote, you will be presented with 4 markers
+ Tick: This means you have a CORRECT ingredient in the CORRECT position
+ Circle: This means you have a CORRECT ingredient in the WRONG position
+ Cross: This means you have a WRONG ingredient in the WRONG position
+
+ Info: The game automatically ends after 5 minutes inactivity.
+ You should only use each ingredient once.
+
+ This game was created by Lord Bisk and Runew0lf.
+ """
+ def predicate(reaction_: Reaction, user_: Member) -> bool:
+ """Make sure that this reaction is what we want to operate on."""
+ return (
+ all((
+ # Reaction is on this message
+ reaction_.message.id == board_id.id,
+ # Reaction is one of the pagination emotes
+ reaction_.emoji in ANTIDOTE_EMOJI,
+ # Reaction was not made by the Bot
+ user_.id != self.bot.user.id,
+ # Reaction was made by author
+ user_.id == ctx.author.id
+ ))
+ )
+
+ # Initialize variables
+ antidote_tries = 0
+ antidote_guess_count = 0
+ antidote_guess_list = []
+ guess_result = []
+ board = []
+ page_guess_list = []
+ page_result_list = []
+ win = False
+
+ antidote_embed = Embed(color=SNAKE_COLOR, title="Antidote")
+ antidote_embed.set_author(name=ctx.author.name, icon_url=ctx.author.display_avatar.url)
+
+ # Generate answer
+ antidote_answer = list(ANTIDOTE_EMOJI) # Duplicate list, not reference it
+ random.shuffle(antidote_answer)
+ antidote_answer.pop()
+
+ # Begin initial board building
+ for i in range(0, 10):
+ page_guess_list.append(f"{HOLE_EMOJI} {HOLE_EMOJI} {HOLE_EMOJI} {HOLE_EMOJI}")
+ page_result_list.append(f"{CROSS_EMOJI} {CROSS_EMOJI} {CROSS_EMOJI} {CROSS_EMOJI}")
+ board.append(
+ f"`{i+1:02d}` "
+ f"{page_guess_list[i]} - "
+ f"{page_result_list[i]}"
+ )
+ board.append(EMPTY_UNICODE)
+ antidote_embed.add_field(name="10 guesses remaining", value="\n".join(board))
+ board_id = await ctx.send(embed=antidote_embed) # Display board
+
+ # Add our player reactions
+ for emoji in ANTIDOTE_EMOJI:
+ await board_id.add_reaction(emoji)
+
+ # Begin main game loop
+ while not win and antidote_tries < 10:
+ try:
+ reaction, user = await ctx.bot.wait_for(
+ "reaction_add", timeout=300, check=predicate)
+ except asyncio.TimeoutError:
+ log.debug("Antidote timed out waiting for a reaction")
+ break # We're done, no reactions for the last 5 minutes
+
+ if antidote_tries < 10:
+ if antidote_guess_count < 4:
+ if reaction.emoji in ANTIDOTE_EMOJI:
+ antidote_guess_list.append(reaction.emoji)
+ antidote_guess_count += 1
+
+ if antidote_guess_count == 4: # Guesses complete
+ antidote_guess_count = 0
+ page_guess_list[antidote_tries] = " ".join(antidote_guess_list)
+
+ # Now check guess
+ for i in range(0, len(antidote_answer)):
+ if antidote_guess_list[i] == antidote_answer[i]:
+ guess_result.append(TICK_EMOJI)
+ elif antidote_guess_list[i] in antidote_answer:
+ guess_result.append(BLANK_EMOJI)
+ else:
+ guess_result.append(CROSS_EMOJI)
+ guess_result.sort()
+ page_result_list[antidote_tries] = " ".join(guess_result)
+
+ # Rebuild the board
+ board = []
+ for i in range(0, 10):
+ board.append(f"`{i+1:02d}` "
+ f"{page_guess_list[i]} - "
+ f"{page_result_list[i]}")
+ board.append(EMPTY_UNICODE)
+
+ # Remove Reactions
+ for emoji in antidote_guess_list:
+ await board_id.remove_reaction(emoji, user)
+
+ if antidote_guess_list == antidote_answer:
+ win = True
+
+ antidote_tries += 1
+ guess_result = []
+ antidote_guess_list = []
+
+ antidote_embed.clear_fields()
+ antidote_embed.add_field(name=f"{10 - antidote_tries} "
+ f"guesses remaining",
+ value="\n".join(board))
+ # Redisplay the board
+ await board_id.edit(embed=antidote_embed)
+
+ # Winning / Ending Screen
+ if win is True:
+ antidote_embed = Embed(color=SNAKE_COLOR, title="Antidote")
+ antidote_embed.set_author(name=ctx.author.name, icon_url=ctx.author.display_avatar.url)
+ antidote_embed.set_image(url="https://i.makeagif.com/media/7-12-2015/Cj1pts.gif")
+ antidote_embed.add_field(name="You have created the snake antidote!",
+ value=f"The solution was: {' '.join(antidote_answer)}\n"
+ f"You had {10 - antidote_tries} tries remaining.")
+ await board_id.edit(embed=antidote_embed)
+ else:
+ antidote_embed = Embed(color=SNAKE_COLOR, title="Antidote")
+ antidote_embed.set_author(name=ctx.author.name, icon_url=ctx.author.display_avatar.url)
+ antidote_embed.set_image(url="https://media.giphy.com/media/ceeN6U57leAhi/giphy.gif")
+ antidote_embed.add_field(
+ name=EMPTY_UNICODE,
+ value=(
+ f"Sorry you didnt make the antidote in time.\n"
+ f"The formula was {' '.join(antidote_answer)}"
+ )
+ )
+ await board_id.edit(embed=antidote_embed)
+
+ log.debug("Ending pagination and removing all reactions...")
+ await board_id.clear_reactions()
+
+ @snakes_group.command(name="draw")
+ async def draw_command(self, ctx: Context) -> None:
+ """
+ Draws a random snek using Perlin noise.
+
+ Written by Momo and kel.
+ Modified by juan and lemon.
+ """
+ with ctx.typing():
+
+ # Generate random snake attributes
+ width = random.randint(6, 10)
+ length = random.randint(15, 22)
+ random_hue = random.random()
+ snek_color = self._beautiful_pastel(random_hue)
+ text_color = self._beautiful_pastel((random_hue + 0.5) % 1)
+ bg_color = (
+ random.randint(32, 50),
+ random.randint(32, 50),
+ random.randint(50, 70),
+ )
+
+ # Build and send the snek
+ text = random.choice(self.snake_idioms)["idiom"]
+ factory = utils.PerlinNoiseFactory(dimension=1, octaves=2)
+ image_frame = utils.create_snek_frame(
+ factory,
+ snake_width=width,
+ snake_length=length,
+ snake_color=snek_color,
+ text=text,
+ text_color=text_color,
+ bg_color=bg_color
+ )
+ png_bytes = utils.frame_to_png_bytes(image_frame)
+ file = File(png_bytes, filename="snek.png")
+ await ctx.send(file=file)
+
+ @snakes_group.command(name="get")
+ @bot_has_permissions(manage_messages=True)
+ @locked()
+ async def get_command(self, ctx: Context, *, name: Snake = None) -> None:
+ """
+ Fetches information about a snake from Wikipedia.
+
+ Created by Ava and eivl.
+ """
+ with ctx.typing():
+ if name is None:
+ name = await Snake.random()
+
+ if isinstance(name, dict):
+ data = name
+ else:
+ data = await self._get_snek(name)
+
+ if data.get("error"):
+ await ctx.send("Could not fetch data from Wikipedia.")
+ return
+
+ description = data["info"]
+
+ # Shorten the description if needed
+ if len(description) > 1000:
+ description = description[:1000]
+ last_newline = description.rfind("\n")
+ if last_newline > 0:
+ description = description[:last_newline]
+
+ # Strip and add the Wiki link.
+ if "fullurl" in data:
+ description = description.strip("\n")
+ description += f"\n\nRead more on [Wikipedia]({data['fullurl']})"
+
+ # Build and send the embed.
+ embed = Embed(
+ title=data.get("title", data.get("name")),
+ description=description,
+ colour=0x59982F,
+ )
+
+ emoji = "https://emojipedia-us.s3.amazonaws.com/thumbs/60/google/3/snake_1f40d.png"
+
+ _iter = (
+ url
+ for url in data["image_list"]
+ if url.endswith(self.valid_image_extensions)
+ )
+ image = next(_iter, emoji)
+
+ embed.set_image(url=image)
+
+ await ctx.send(embed=embed)
+
+ @snakes_group.command(name="guess", aliases=("identify",))
+ @locked()
+ async def guess_command(self, ctx: Context) -> None:
+ """
+ Snake identifying game.
+
+ Made by Ava and eivl.
+ Modified by lemon.
+ """
+ with ctx.typing():
+
+ image = None
+
+ while image is None:
+ snakes = [await Snake.random() for _ in range(4)]
+ snake = random.choice(snakes)
+ answer = "abcd"[snakes.index(snake)]
+
+ data = await self._get_snek(snake)
+
+ _iter = (
+ url
+ for url in data["image_list"]
+ if url.endswith(self.valid_image_extensions)
+ )
+ image = next(_iter, None)
+
+ embed = Embed(
+ title="Which of the following is the snake in the image?",
+ description="\n".join(
+ f"{'ABCD'[snakes.index(snake)]}: {snake}" for snake in snakes),
+ colour=SNAKE_COLOR
+ )
+ embed.set_image(url=image)
+
+ guess = await ctx.send(embed=embed)
+ options = {f"{'abcd'[snakes.index(snake)]}": snake for snake in snakes}
+ await self._validate_answer(ctx, guess, answer, options)
+
+ @snakes_group.command(name="hatch")
+ async def hatch_command(self, ctx: Context) -> None:
+ """
+ Hatches your personal snake.
+
+ Written by Momo and kel.
+ """
+ # Pick a random snake to hatch.
+ snake_name = random.choice(list(utils.snakes.keys()))
+ snake_image = utils.snakes[snake_name]
+
+ # Hatch the snake
+ message = await ctx.send(embed=Embed(description="Hatching your snake :snake:..."))
+ await asyncio.sleep(1)
+
+ for stage in utils.stages:
+ hatch_embed = Embed(description=stage)
+ await message.edit(embed=hatch_embed)
+ await asyncio.sleep(1)
+ await asyncio.sleep(1)
+ await message.delete()
+
+ # Build and send the embed.
+ my_snake_embed = Embed(description=":tada: Congrats! You hatched: **{0}**".format(snake_name))
+ my_snake_embed.set_thumbnail(url=snake_image)
+ my_snake_embed.set_footer(
+ text=" Owner: {0}#{1}".format(ctx.author.name, ctx.author.discriminator)
+ )
+
+ await ctx.send(embed=my_snake_embed)
+
+ @snakes_group.command(name="movie")
+ async def movie_command(self, ctx: Context) -> None:
+ """
+ Gets a random snake-related movie from TMDB.
+
+ Written by Samuel.
+ Modified by gdude.
+ Modified by Will Da Silva.
+ """
+ # Initially 8 pages are fetched. The actual number of pages is set after the first request.
+ page = random.randint(1, self.num_movie_pages or 8)
+
+ async with ctx.typing():
+ response = await self.bot.http_session.get(
+ "https://api.themoviedb.org/3/search/movie",
+ params={
+ "query": "snake",
+ "page": page,
+ "language": "en-US",
+ "api_key": Tokens.tmdb,
+ }
+ )
+ data = await response.json()
+ if self.num_movie_pages is None:
+ self.num_movie_pages = data["total_pages"]
+ movie = random.choice(data["results"])["id"]
+
+ response = await self.bot.http_session.get(
+ f"https://api.themoviedb.org/3/movie/{movie}",
+ params={
+ "language": "en-US",
+ "api_key": Tokens.tmdb,
+ }
+ )
+ data = await response.json()
+
+ embed = Embed(title=data["title"], color=SNAKE_COLOR)
+
+ if data["poster_path"] is not None:
+ embed.set_image(url=f"https://images.tmdb.org/t/p/original{data['poster_path']}")
+
+ if data["overview"]:
+ embed.add_field(name="Overview", value=data["overview"])
+
+ if data["release_date"]:
+ embed.add_field(name="Release Date", value=data["release_date"])
+
+ if data["genres"]:
+ embed.add_field(name="Genres", value=", ".join([x["name"] for x in data["genres"]]))
+
+ if data["vote_count"]:
+ embed.add_field(name="Rating", value=f"{data['vote_average']}/10 ({data['vote_count']} votes)", inline=True)
+
+ if data["budget"] and data["revenue"]:
+ embed.add_field(name="Budget", value=data["budget"], inline=True)
+ embed.add_field(name="Revenue", value=data["revenue"], inline=True)
+
+ embed.set_footer(text="This product uses the TMDb API but is not endorsed or certified by TMDb.")
+ embed.set_thumbnail(url="https://i.imgur.com/LtFtC8H.png")
+
+ try:
+ await ctx.send(embed=embed)
+ except HTTPException as err:
+ await ctx.send("An error occurred while fetching a snake-related movie!")
+ raise err from None
+
+ @snakes_group.command(name="quiz")
+ @locked()
+ async def quiz_command(self, ctx: Context) -> None:
+ """
+ Asks a snake-related question in the chat and validates the user's guess.
+
+ This was created by Mushy and Cardium,
+ and modified by Urthas and lemon.
+ """
+ # Prepare a question.
+ question = random.choice(self.snake_quizzes)
+ answer = question["answerkey"]
+ options = {key: question["options"][key] for key in ANSWERS_EMOJI.keys()}
+
+ # Build and send the embed.
+ embed = Embed(
+ color=SNAKE_COLOR,
+ title=question["question"],
+ description="\n".join(
+ [f"**{key.upper()}**: {answer}" for key, answer in options.items()]
+ )
+ )
+
+ quiz = await ctx.send(embed=embed)
+ await self._validate_answer(ctx, quiz, answer, options)
+
+ @snakes_group.command(name="name", aliases=("name_gen",))
+ async def name_command(self, ctx: Context, *, name: str = None) -> None:
+ """
+ Snakifies a username.
+
+ Slices the users name at the last vowel (or second last if the name
+ ends with a vowel), and then combines it with a random snake name,
+ which is sliced at the first vowel (or second if the name starts with
+ a vowel).
+
+ If the name contains no vowels, it just appends the snakename
+ to the end of the name.
+
+ Examples:
+ lemon + anaconda = lemoconda
+ krzsn + anaconda = krzsnconda
+ gdude + anaconda = gduconda
+ aperture + anaconda = apertuconda
+ lucy + python = luthon
+ joseph + taipan = joseipan
+
+ This was written by Iceman, and modified for inclusion into the bot by lemon.
+ """
+ snake_name = await self._get_snake_name()
+ snake_name = snake_name["name"]
+ snake_prefix = ""
+
+ # Set aside every word in the snake name except the last.
+ if " " in snake_name:
+ snake_prefix = " ".join(snake_name.split()[:-1])
+ snake_name = snake_name.split()[-1]
+
+ # If no name is provided, use whoever called the command.
+ if name:
+ user_name = name
+ else:
+ user_name = ctx.author.display_name
+
+ # Get the index of the vowel to slice the username at
+ user_slice_index = len(user_name)
+ for index, char in enumerate(reversed(user_name)):
+ if index == 0:
+ continue
+ if char.lower() in "aeiouy":
+ user_slice_index -= index
+ break
+
+ # Now, get the index of the vowel to slice the snake_name at
+ snake_slice_index = 0
+ for index, char in enumerate(snake_name):
+ if index == 0:
+ continue
+ if char.lower() in "aeiouy":
+ snake_slice_index = index + 1
+ break
+
+ # Combine!
+ snake_name = snake_name[snake_slice_index:]
+ user_name = user_name[:user_slice_index]
+ result = f"{snake_prefix} {user_name}{snake_name}"
+ result = string.capwords(result)
+
+ # Embed and send
+ embed = Embed(
+ title="Snake name",
+ description=f"Your snake-name is **{result}**",
+ color=SNAKE_COLOR
+ )
+
+ await ctx.send(embed=embed)
+ return
+
+ @snakes_group.command(name="sal")
+ @locked()
+ async def sal_command(self, ctx: Context) -> None:
+ """
+ Play a game of Snakes and Ladders.
+
+ Written by Momo and kel.
+ Modified by lemon.
+ """
+ # Check if there is already a game in this channel
+ if ctx.channel in self.active_sal:
+ await ctx.send(f"{ctx.author.mention} A game is already in progress in this channel.")
+ return
+
+ game = utils.SnakeAndLaddersGame(snakes=self, context=ctx)
+ self.active_sal[ctx.channel] = game
+
+ await game.open_game()
+
+ @snakes_group.command(name="about")
+ async def about_command(self, ctx: Context) -> None:
+ """Show an embed with information about the event, its participants, and its winners."""
+ contributors = [
+ "<@!245270749919576066>",
+ "<@!396290259907903491>",
+ "<@!172395097705414656>",
+ "<@!361708843425726474>",
+ "<@!300302216663793665>",
+ "<@!210248051430916096>",
+ "<@!174588005745557505>",
+ "<@!87793066227822592>",
+ "<@!211619754039967744>",
+ "<@!97347867923976192>",
+ "<@!136081839474343936>",
+ "<@!263560579770220554>",
+ "<@!104749643715387392>",
+ "<@!303940835005825024>",
+ ]
+
+ embed = Embed(
+ title="About the snake cog",
+ description=(
+ "The features in this cog were created by members of the community "
+ "during our first ever "
+ "[code jam event](https://pythondiscord.com/pages/code-jams/code-jam-1-snakes-bot/). \n\n"
+ "The event saw over 50 participants, who competed to write a discord bot cog with a snake theme over "
+ "48 hours. The staff then selected the best features from all the best teams, and made modifications "
+ "to ensure they would all work together before integrating them into the community bot.\n\n"
+ "It was a tight race, but in the end, <@!104749643715387392> and <@!303940835005825024> "
+ f"walked away as grand champions. Make sure you check out `{ctx.prefix}snakes sal`,"
+ f"`{ctx.prefix}snakes draw` and `{ctx.prefix}snakes hatch` "
+ "to see what they came up with."
+ )
+ )
+
+ embed.add_field(
+ name="Contributors",
+ value=(
+ ", ".join(contributors)
+ )
+ )
+
+ await ctx.send(embed=embed)
+
+ @snakes_group.command(name="card")
+ async def card_command(self, ctx: Context, *, name: Snake = None) -> None:
+ """
+ Create an interesting little card from a snake.
+
+ Created by juan and Someone during the first code jam.
+ """
+ # Get the snake data we need
+ if not name:
+ name_obj = await self._get_snake_name()
+ name = name_obj["scientific"]
+ content = await self._get_snek(name)
+
+ elif isinstance(name, dict):
+ content = name
+
+ else:
+ content = await self._get_snek(name)
+
+ # Make the card
+ async with ctx.typing():
+
+ stream = BytesIO()
+ async with async_timeout.timeout(10):
+ async with self.bot.http_session.get(content["image_list"][0]) as response:
+ stream.write(await response.read())
+
+ stream.seek(0)
+
+ func = partial(self._generate_card, stream, content)
+ final_buffer = await self.bot.loop.run_in_executor(None, func)
+
+ # Send it!
+ await ctx.send(
+ f"A wild {content['name'].title()} appears!",
+ file=File(final_buffer, filename=content["name"].replace(" ", "") + ".png")
+ )
+
+ @snakes_group.command(name="fact")
+ async def fact_command(self, ctx: Context) -> None:
+ """
+ Gets a snake-related fact.
+
+ Written by Andrew and Prithaj.
+ Modified by lemon.
+ """
+ question = random.choice(self.snake_facts)["fact"]
+ embed = Embed(
+ title="Snake fact",
+ color=SNAKE_COLOR,
+ description=question
+ )
+ await ctx.send(embed=embed)
+
+ @snakes_group.command(name="snakify")
+ async def snakify_command(self, ctx: Context, *, message: str = None) -> None:
+ """
+ How would I talk if I were a snake?
+
+ If `message` is passed, the bot will snakify the message.
+ Otherwise, a random message from the user's history is snakified.
+
+ Written by Momo and kel.
+ Modified by lemon.
+ """
+ with ctx.typing():
+ embed = Embed()
+ user = ctx.author
+
+ if not message:
+
+ # Get a random message from the users history
+ messages = []
+ async for message in ctx.history(limit=500).filter(
+ lambda msg: msg.author == ctx.author # Message was sent by author.
+ ):
+ messages.append(message.content)
+
+ message = self._get_random_long_message(messages)
+
+ # Build and send the embed
+ embed.set_author(
+ name=f"{user.name}#{user.discriminator}",
+ icon_url=user.display_avatar.url,
+ )
+ embed.description = f"*{self._snakify(message)}*"
+
+ await ctx.send(embed=embed)
+
+ @snakes_group.command(name="video", aliases=("get_video",))
+ async def video_command(self, ctx: Context, *, search: str = None) -> None:
+ """
+ Gets a YouTube video about snakes.
+
+ If `search` is given, a snake with that name will be searched on Youtube.
+
+ Written by Andrew and Prithaj.
+ """
+ # Are we searching for anything specific?
+ if search:
+ query = search + " snake"
+ else:
+ snake = await self._get_snake_name()
+ query = snake["name"]
+
+ # Build the URL and make the request
+ url = "https://www.googleapis.com/youtube/v3/search"
+ response = await self.bot.http_session.get(
+ url,
+ params={
+ "part": "snippet",
+ "q": urllib.parse.quote_plus(query),
+ "type": "video",
+ "key": Tokens.youtube
+ }
+ )
+ response = await response.json()
+ data = response.get("items", [])
+
+ # Send the user a video
+ if len(data) > 0:
+ num = random.randint(0, len(data) - 1)
+ youtube_base_url = "https://www.youtube.com/watch?v="
+ await ctx.send(
+ content=f"{youtube_base_url}{data[num]['id']['videoId']}"
+ )
+ else:
+ log.warning(f"YouTube API error. Full response looks like {response}")
+
+ @snakes_group.command(name="zen")
+ async def zen_command(self, ctx: Context) -> None:
+ """
+ Gets a random quote from the Zen of Python, except as if spoken by a snake.
+
+ Written by Prithaj and Andrew.
+ Modified by lemon.
+ """
+ embed = Embed(
+ title="Zzzen of Pythhon",
+ color=SNAKE_COLOR
+ )
+
+ # Get the zen quote and snakify it
+ zen_quote = random.choice(ZEN.splitlines())
+ zen_quote = self._snakify(zen_quote)
+
+ # Embed and send
+ embed.description = zen_quote
+ await ctx.send(
+ embed=embed
+ )
+ # endregion
+
+ # region: Error handlers
+ @card_command.error
+ async def command_error(self, ctx: Context, error: CommandError) -> None:
+ """Local error handler for the Snake Cog."""
+ original_error = getattr(error, "original", None)
+ if isinstance(original_error, OSError):
+ error.handled = True
+ embed = Embed()
+ embed.colour = Colour.red()
+ log.error(f"snake_card encountered an OSError: {error} ({original_error})")
+ embed.description = "Could not generate the snake card! Please try again."
+ embed.title = random.choice(ERROR_REPLIES)
+ await ctx.send(embed=embed)
diff --git a/bot/exts/fun/snakes/_utils.py b/bot/exts/fun/snakes/_utils.py
new file mode 100644
index 00000000..de51339d
--- /dev/null
+++ b/bot/exts/fun/snakes/_utils.py
@@ -0,0 +1,721 @@
+import asyncio
+import io
+import json
+import logging
+import math
+import random
+from itertools import product
+from pathlib import Path
+
+from PIL import Image
+from PIL.ImageDraw import ImageDraw
+from discord import File, Member, Reaction
+from discord.ext.commands import Cog, Context
+
+from bot.constants import Roles
+
+SNAKE_RESOURCES = Path("bot/resources/fun/snakes").absolute()
+
+h1 = r"""```
+ ----
+ ------
+/--------\
+|--------|
+|--------|
+ \------/
+ ----
+```"""
+h2 = r"""```
+ ----
+ ------
+/---\-/--\
+|-----\--|
+|--------|
+ \------/
+ ----
+```"""
+h3 = r"""```
+ ----
+ ------
+/---\-/--\
+|-----\--|
+|-----/--|
+ \----\-/
+ ----
+```"""
+h4 = r"""```
+ -----
+ ----- \
+/--| /---\
+|--\ -\---|
+|--\--/-- /
+ \------- /
+ ------
+```"""
+stages = [h1, h2, h3, h4]
+snakes = {
+ "Baby Python": "https://i.imgur.com/SYOcmSa.png",
+ "Baby Rattle Snake": "https://i.imgur.com/i5jYA8f.png",
+ "Baby Dragon Snake": "https://i.imgur.com/SuMKM4m.png",
+ "Baby Garden Snake": "https://i.imgur.com/5vYx3ah.png",
+ "Baby Cobra": "https://i.imgur.com/jk14ryt.png",
+ "Baby Anaconda": "https://i.imgur.com/EpdrnNr.png",
+}
+
+BOARD_TILE_SIZE = 56 # the size of each board tile
+BOARD_PLAYER_SIZE = 20 # the size of each player icon
+BOARD_MARGIN = (10, 0) # margins, in pixels (for player icons)
+# The size of the image to download
+# Should a power of 2 and higher than BOARD_PLAYER_SIZE
+PLAYER_ICON_IMAGE_SIZE = 32
+MAX_PLAYERS = 4 # depends on the board size/quality, 4 is for the default board
+
+# board definition (from, to)
+BOARD = {
+ # ladders
+ 2: 38,
+ 7: 14,
+ 8: 31,
+ 15: 26,
+ 21: 42,
+ 28: 84,
+ 36: 44,
+ 51: 67,
+ 71: 91,
+ 78: 98,
+ 87: 94,
+
+ # snakes
+ 99: 80,
+ 95: 75,
+ 92: 88,
+ 89: 68,
+ 74: 53,
+ 64: 60,
+ 62: 19,
+ 49: 11,
+ 46: 25,
+ 16: 6
+}
+
+DEFAULT_SNAKE_COLOR = 0x15c7ea
+DEFAULT_BACKGROUND_COLOR = 0
+DEFAULT_IMAGE_DIMENSIONS = (200, 200)
+DEFAULT_SNAKE_LENGTH = 22
+DEFAULT_SNAKE_WIDTH = 8
+DEFAULT_SEGMENT_LENGTH_RANGE = (7, 10)
+DEFAULT_IMAGE_MARGINS = (50, 50)
+DEFAULT_TEXT = "snek\nit\nup"
+DEFAULT_TEXT_POSITION = (
+ 10,
+ 10
+)
+DEFAULT_TEXT_COLOR = 0xf2ea15
+X = 0
+Y = 1
+ANGLE_RANGE = math.pi * 2
+
+
+def get_resource(file: str) -> list[dict]:
+ """Load Snake resources JSON."""
+ return json.loads((SNAKE_RESOURCES / f"{file}.json").read_text("utf-8"))
+
+
+def smoothstep(t: float) -> float:
+ """Smooth curve with a zero derivative at 0 and 1, making it useful for interpolating."""
+ return t * t * (3. - 2. * t)
+
+
+def lerp(t: float, a: float, b: float) -> float:
+ """Linear interpolation between a and b, given a fraction t."""
+ return a + t * (b - a)
+
+
+class PerlinNoiseFactory(object):
+ """
+ Callable that produces Perlin noise for an arbitrary point in an arbitrary number of dimensions.
+
+ The underlying grid is aligned with the integers.
+
+ There is no limit to the coordinates used; new gradients are generated on the fly as necessary.
+
+ Taken from: https://gist.github.com/eevee/26f547457522755cb1fb8739d0ea89a1
+ Licensed under ISC
+ """
+
+ def __init__(self, dimension: int, octaves: int = 1, tile: tuple[int, ...] = (), unbias: bool = False):
+ """
+ Create a new Perlin noise factory in the given number of dimensions.
+
+ dimension should be an integer and at least 1.
+
+ More octaves create a foggier and more-detailed noise pattern. More than 4 octaves is rather excessive.
+
+ ``tile`` can be used to make a seamlessly tiling pattern.
+ For example:
+ pnf = PerlinNoiseFactory(2, tile=(0, 3))
+
+ This will produce noise that tiles every 3 units vertically, but never tiles horizontally.
+
+ If ``unbias`` is True, the smoothstep function will be applied to the output before returning
+ it, to counteract some of Perlin noise's significant bias towards the center of its output range.
+ """
+ self.dimension = dimension
+ self.octaves = octaves
+ self.tile = tile + (0,) * dimension
+ self.unbias = unbias
+
+ # For n dimensions, the range of Perlin noise is ±sqrt(n)/2; multiply
+ # by this to scale to ±1
+ self.scale_factor = 2 * dimension ** -0.5
+
+ self.gradient = {}
+
+ def _generate_gradient(self) -> tuple[float, ...]:
+ """
+ Generate a random unit vector at each grid point.
+
+ This is the "gradient" vector, in that the grid tile slopes towards it
+ """
+ # 1 dimension is special, since the only unit vector is trivial;
+ # instead, use a slope between -1 and 1
+ if self.dimension == 1:
+ return (random.uniform(-1, 1),)
+
+ # Generate a random point on the surface of the unit n-hypersphere;
+ # this is the same as a random unit vector in n dimensions. Thanks
+ # to: http://mathworld.wolfram.com/SpherePointPicking.html
+ # Pick n normal random variables with stddev 1
+ random_point = [random.gauss(0, 1) for _ in range(self.dimension)]
+ # Then scale the result to a unit vector
+ scale = sum(n * n for n in random_point) ** -0.5
+ return tuple(coord * scale for coord in random_point)
+
+ def get_plain_noise(self, *point) -> float:
+ """Get plain noise for a single point, without taking into account either octaves or tiling."""
+ if len(point) != self.dimension:
+ raise ValueError(
+ f"Expected {self.dimension} values, got {len(point)}"
+ )
+
+ # Build a list of the (min, max) bounds in each dimension
+ grid_coords = []
+ for coord in point:
+ min_coord = math.floor(coord)
+ max_coord = min_coord + 1
+ grid_coords.append((min_coord, max_coord))
+
+ # Compute the dot product of each gradient vector and the point's
+ # distance from the corresponding grid point. This gives you each
+ # gradient's "influence" on the chosen point.
+ dots = []
+ for grid_point in product(*grid_coords):
+ if grid_point not in self.gradient:
+ self.gradient[grid_point] = self._generate_gradient()
+ gradient = self.gradient[grid_point]
+
+ dot = 0
+ for i in range(self.dimension):
+ dot += gradient[i] * (point[i] - grid_point[i])
+ dots.append(dot)
+
+ # Interpolate all those dot products together. The interpolation is
+ # done with smoothstep to smooth out the slope as you pass from one
+ # grid cell into the next.
+ # Due to the way product() works, dot products are ordered such that
+ # the last dimension alternates: (..., min), (..., max), etc. So we
+ # can interpolate adjacent pairs to "collapse" that last dimension. Then
+ # the results will alternate in their second-to-last dimension, and so
+ # forth, until we only have a single value left.
+ dim = self.dimension
+ while len(dots) > 1:
+ dim -= 1
+ s = smoothstep(point[dim] - grid_coords[dim][0])
+
+ next_dots = []
+ while dots:
+ next_dots.append(lerp(s, dots.pop(0), dots.pop(0)))
+
+ dots = next_dots
+
+ return dots[0] * self.scale_factor
+
+ def __call__(self, *point) -> float:
+ """
+ Get the value of this Perlin noise function at the given point.
+
+ The number of values given should match the number of dimensions.
+ """
+ ret = 0
+ for o in range(self.octaves):
+ o2 = 1 << o
+ new_point = []
+ for i, coord in enumerate(point):
+ coord *= o2
+ if self.tile[i]:
+ coord %= self.tile[i] * o2
+ new_point.append(coord)
+ ret += self.get_plain_noise(*new_point) / o2
+
+ # Need to scale n back down since adding all those extra octaves has
+ # probably expanded it beyond ±1
+ # 1 octave: ±1
+ # 2 octaves: ±1½
+ # 3 octaves: ±1¾
+ ret /= 2 - 2 ** (1 - self.octaves)
+
+ if self.unbias:
+ # The output of the plain Perlin noise algorithm has a fairly
+ # strong bias towards the center due to the central limit theorem
+ # -- in fact the top and bottom 1/8 virtually never happen. That's
+ # a quarter of our entire output range! If only we had a function
+ # in [0..1] that could introduce a bias towards the endpoints...
+ r = (ret + 1) / 2
+ # Doing it this many times is a completely made-up heuristic.
+ for _ in range(int(self.octaves / 2 + 0.5)):
+ r = smoothstep(r)
+ ret = r * 2 - 1
+
+ return ret
+
+
+def create_snek_frame(
+ perlin_factory: PerlinNoiseFactory, perlin_lookup_vertical_shift: float = 0,
+ image_dimensions: tuple[int, int] = DEFAULT_IMAGE_DIMENSIONS,
+ image_margins: tuple[int, int] = DEFAULT_IMAGE_MARGINS,
+ snake_length: int = DEFAULT_SNAKE_LENGTH,
+ snake_color: int = DEFAULT_SNAKE_COLOR, bg_color: int = DEFAULT_BACKGROUND_COLOR,
+ segment_length_range: tuple[int, int] = DEFAULT_SEGMENT_LENGTH_RANGE, snake_width: int = DEFAULT_SNAKE_WIDTH,
+ text: str = DEFAULT_TEXT, text_position: tuple[float, float] = DEFAULT_TEXT_POSITION,
+ text_color: int = DEFAULT_TEXT_COLOR
+) -> Image.Image:
+ """
+ Creates a single random snek frame using Perlin noise.
+
+ `perlin_lookup_vertical_shift` represents the Perlin noise shift in the Y-dimension for this frame.
+ If `text` is given, display the given text with the snek.
+ """
+ start_x = random.randint(image_margins[X], image_dimensions[X] - image_margins[X])
+ start_y = random.randint(image_margins[Y], image_dimensions[Y] - image_margins[Y])
+ points: list[tuple[float, float]] = [(start_x, start_y)]
+
+ for index in range(0, snake_length):
+ angle = perlin_factory.get_plain_noise(
+ ((1 / (snake_length + 1)) * (index + 1)) + perlin_lookup_vertical_shift
+ ) * ANGLE_RANGE
+ current_point = points[index]
+ segment_length = random.randint(segment_length_range[0], segment_length_range[1])
+ points.append((
+ current_point[X] + segment_length * math.cos(angle),
+ current_point[Y] + segment_length * math.sin(angle)
+ ))
+
+ # normalize bounds
+ min_dimensions: list[float] = [start_x, start_y]
+ max_dimensions: list[float] = [start_x, start_y]
+ for point in points:
+ min_dimensions[X] = min(point[X], min_dimensions[X])
+ min_dimensions[Y] = min(point[Y], min_dimensions[Y])
+ max_dimensions[X] = max(point[X], max_dimensions[X])
+ max_dimensions[Y] = max(point[Y], max_dimensions[Y])
+
+ # shift towards middle
+ dimension_range = (max_dimensions[X] - min_dimensions[X], max_dimensions[Y] - min_dimensions[Y])
+ shift = (
+ image_dimensions[X] / 2 - (dimension_range[X] / 2 + min_dimensions[X]),
+ image_dimensions[Y] / 2 - (dimension_range[Y] / 2 + min_dimensions[Y])
+ )
+
+ image = Image.new(mode="RGB", size=image_dimensions, color=bg_color)
+ draw = ImageDraw(image)
+ for index in range(1, len(points)):
+ point = points[index]
+ previous = points[index - 1]
+ draw.line(
+ (
+ shift[X] + previous[X],
+ shift[Y] + previous[Y],
+ shift[X] + point[X],
+ shift[Y] + point[Y]
+ ),
+ width=snake_width,
+ fill=snake_color
+ )
+ if text is not None:
+ draw.multiline_text(text_position, text, fill=text_color)
+ del draw
+ return image
+
+
+def frame_to_png_bytes(image: Image) -> io.BytesIO:
+ """Convert image to byte stream."""
+ stream = io.BytesIO()
+ image.save(stream, format="PNG")
+ stream.seek(0)
+ return stream
+
+
+log = logging.getLogger(__name__)
+START_EMOJI = "\u2611" # :ballot_box_with_check: - Start the game
+CANCEL_EMOJI = "\u274C" # :x: - Cancel or leave the game
+ROLL_EMOJI = "\U0001F3B2" # :game_die: - Roll the die!
+JOIN_EMOJI = "\U0001F64B" # :raising_hand: - Join the game.
+STARTUP_SCREEN_EMOJI = [
+ JOIN_EMOJI,
+ START_EMOJI,
+ CANCEL_EMOJI
+]
+GAME_SCREEN_EMOJI = [
+ ROLL_EMOJI,
+ CANCEL_EMOJI
+]
+
+
+class SnakeAndLaddersGame:
+ """Snakes and Ladders game Cog."""
+
+ def __init__(self, snakes: Cog, context: Context):
+ self.snakes = snakes
+ self.ctx = context
+ self.channel = self.ctx.channel
+ self.state = "booting"
+ self.started = False
+ self.author = self.ctx.author
+ self.players = []
+ self.player_tiles = {}
+ self.round_has_rolled = {}
+ self.avatar_images = {}
+ self.board = None
+ self.positions = None
+ self.rolls = []
+
+ async def open_game(self) -> None:
+ """
+ Create a new Snakes and Ladders game.
+
+ Listen for reactions until players have joined, and the game has been started.
+ """
+ def startup_event_check(reaction_: Reaction, user_: Member) -> bool:
+ """Make sure that this reaction is what we want to operate on."""
+ return (
+ all((
+ reaction_.message.id == startup.id, # Reaction is on startup message
+ reaction_.emoji in STARTUP_SCREEN_EMOJI, # Reaction is one of the startup emotes
+ user_.id != self.ctx.bot.user.id, # Reaction was not made by the bot
+ ))
+ )
+
+ # Check to see if the bot can remove reactions
+ if not self.channel.permissions_for(self.ctx.guild.me).manage_messages:
+ log.warning(
+ "Unable to start Snakes and Ladders - "
+ f"Missing manage_messages permissions in {self.channel}"
+ )
+ return
+
+ await self._add_player(self.author)
+ await self.channel.send(
+ "**Snakes and Ladders**: A new game is about to start!",
+ file=File(
+ str(SNAKE_RESOURCES / "snakes_and_ladders" / "banner.jpg"),
+ filename="Snakes and Ladders.jpg"
+ )
+ )
+ startup = await self.channel.send(
+ f"Press {JOIN_EMOJI} to participate, and press "
+ f"{START_EMOJI} to start the game"
+ )
+ for emoji in STARTUP_SCREEN_EMOJI:
+ await startup.add_reaction(emoji)
+
+ self.state = "waiting"
+
+ while not self.started:
+ try:
+ reaction, user = await self.ctx.bot.wait_for(
+ "reaction_add",
+ timeout=300,
+ check=startup_event_check
+ )
+ if reaction.emoji == JOIN_EMOJI:
+ await self.player_join(user)
+ elif reaction.emoji == CANCEL_EMOJI:
+ if user == self.author or (self._is_moderator(user) and user not in self.players):
+ # Allow game author or non-playing moderation staff to cancel a waiting game
+ await self.cancel_game()
+ return
+ else:
+ await self.player_leave(user)
+ elif reaction.emoji == START_EMOJI:
+ if self.ctx.author == user:
+ self.started = True
+ await self.start_game(user)
+ await startup.delete()
+ break
+
+ await startup.remove_reaction(reaction.emoji, user)
+
+ except asyncio.TimeoutError:
+ log.debug("Snakes and Ladders timed out waiting for a reaction")
+ await self.cancel_game()
+ return # We're done, no reactions for the last 5 minutes
+
+ async def _add_player(self, user: Member) -> None:
+ """Add player to game."""
+ self.players.append(user)
+ self.player_tiles[user.id] = 1
+
+ avatar_bytes = await user.display_avatar.replace(size=PLAYER_ICON_IMAGE_SIZE).read()
+ im = Image.open(io.BytesIO(avatar_bytes)).resize((BOARD_PLAYER_SIZE, BOARD_PLAYER_SIZE))
+ self.avatar_images[user.id] = im
+
+ async def player_join(self, user: Member) -> None:
+ """
+ Handle players joining the game.
+
+ Prevent player joining if they have already joined, if the game is full, or if the game is
+ in a waiting state.
+ """
+ for p in self.players:
+ if user == p:
+ await self.channel.send(user.mention + " You are already in the game.", delete_after=10)
+ return
+ if self.state != "waiting":
+ await self.channel.send(user.mention + " You cannot join at this time.", delete_after=10)
+ return
+ if len(self.players) is MAX_PLAYERS:
+ await self.channel.send(user.mention + " The game is full!", delete_after=10)
+ return
+
+ await self._add_player(user)
+
+ await self.channel.send(
+ f"**Snakes and Ladders**: {user.mention} has joined the game.\n"
+ f"There are now {str(len(self.players))} players in the game.",
+ delete_after=10
+ )
+
+ async def player_leave(self, user: Member) -> bool:
+ """
+ Handle players leaving the game.
+
+ Leaving is prevented if the user wasn't part of the game.
+
+ If the number of players reaches 0, the game is terminated. In this case, a sentinel boolean
+ is returned True to prevent a game from continuing after it's destroyed.
+ """
+ is_surrendered = False # Sentinel value to assist with stopping a surrendered game
+ for p in self.players:
+ if user == p:
+ self.players.remove(p)
+ self.player_tiles.pop(p.id, None)
+ self.round_has_rolled.pop(p.id, None)
+ await self.channel.send(
+ "**Snakes and Ladders**: " + user.mention + " has left the game.",
+ delete_after=10
+ )
+
+ if self.state != "waiting" and len(self.players) == 0:
+ await self.channel.send("**Snakes and Ladders**: The game has been surrendered!")
+ is_surrendered = True
+ self._destruct()
+
+ return is_surrendered
+ else:
+ await self.channel.send(user.mention + " You are not in the match.", delete_after=10)
+ return is_surrendered
+
+ async def cancel_game(self) -> None:
+ """Cancel the running game."""
+ await self.channel.send("**Snakes and Ladders**: Game has been canceled.")
+ self._destruct()
+
+ async def start_game(self, user: Member) -> None:
+ """
+ Allow the game author to begin the game.
+
+ The game cannot be started if the game is in a waiting state.
+ """
+ if not user == self.author:
+ await self.channel.send(user.mention + " Only the author of the game can start it.", delete_after=10)
+ return
+
+ if not self.state == "waiting":
+ await self.channel.send(user.mention + " The game cannot be started at this time.", delete_after=10)
+ return
+
+ self.state = "starting"
+ player_list = ", ".join(user.mention for user in self.players)
+ await self.channel.send("**Snakes and Ladders**: The game is starting!\nPlayers: " + player_list)
+ await self.start_round()
+
+ async def start_round(self) -> None:
+ """Begin the round."""
+ def game_event_check(reaction_: Reaction, user_: Member) -> bool:
+ """Make sure that this reaction is what we want to operate on."""
+ return (
+ all((
+ reaction_.message.id == self.positions.id, # Reaction is on positions message
+ reaction_.emoji in GAME_SCREEN_EMOJI, # Reaction is one of the game emotes
+ user_.id != self.ctx.bot.user.id, # Reaction was not made by the bot
+ ))
+ )
+
+ self.state = "roll"
+ for user in self.players:
+ self.round_has_rolled[user.id] = False
+ board_img = Image.open(SNAKE_RESOURCES / "snakes_and_ladders" / "board.jpg")
+ player_row_size = math.ceil(MAX_PLAYERS / 2)
+
+ for i, player in enumerate(self.players):
+ tile = self.player_tiles[player.id]
+ tile_coordinates = self._board_coordinate_from_index(tile)
+ x_offset = BOARD_MARGIN[0] + tile_coordinates[0] * BOARD_TILE_SIZE
+ y_offset = \
+ BOARD_MARGIN[1] + (
+ (10 * BOARD_TILE_SIZE) - (9 - tile_coordinates[1]) * BOARD_TILE_SIZE - BOARD_PLAYER_SIZE)
+ x_offset += BOARD_PLAYER_SIZE * (i % player_row_size)
+ y_offset -= BOARD_PLAYER_SIZE * math.floor(i / player_row_size)
+ board_img.paste(self.avatar_images[player.id],
+ box=(x_offset, y_offset))
+
+ board_file = File(frame_to_png_bytes(board_img), filename="Board.jpg")
+ player_list = "\n".join((user.mention + ": Tile " + str(self.player_tiles[user.id])) for user in self.players)
+
+ # Store and send new messages
+ temp_board = await self.channel.send(
+ "**Snakes and Ladders**: A new round has started! Current board:",
+ file=board_file
+ )
+ temp_positions = await self.channel.send(
+ f"**Current positions**:\n{player_list}\n\nUse {ROLL_EMOJI} to roll the dice!"
+ )
+
+ # Delete the previous messages
+ if self.board and self.positions:
+ await self.board.delete()
+ await self.positions.delete()
+
+ # remove the roll messages
+ for roll in self.rolls:
+ await roll.delete()
+ self.rolls = []
+
+ # Save new messages
+ self.board = temp_board
+ self.positions = temp_positions
+
+ # Wait for rolls
+ for emoji in GAME_SCREEN_EMOJI:
+ await self.positions.add_reaction(emoji)
+
+ is_surrendered = False
+ while True:
+ try:
+ reaction, user = await self.ctx.bot.wait_for(
+ "reaction_add",
+ timeout=300,
+ check=game_event_check
+ )
+
+ if reaction.emoji == ROLL_EMOJI:
+ await self.player_roll(user)
+ elif reaction.emoji == CANCEL_EMOJI:
+ if self._is_moderator(user) and user not in self.players:
+ # Only allow non-playing moderation staff to cancel a running game
+ await self.cancel_game()
+ return
+ else:
+ is_surrendered = await self.player_leave(user)
+
+ await self.positions.remove_reaction(reaction.emoji, user)
+
+ if self._check_all_rolled():
+ break
+
+ except asyncio.TimeoutError:
+ log.debug("Snakes and Ladders timed out waiting for a reaction")
+ await self.cancel_game()
+ return # We're done, no reactions for the last 5 minutes
+
+ # Round completed
+ # Check to see if the game was surrendered before completing the round, without this
+ # sentinel, the game object would be deleted but the next round still posted into purgatory
+ if not is_surrendered:
+ await self._complete_round()
+
+ async def player_roll(self, user: Member) -> None:
+ """Handle the player's roll."""
+ if user.id not in self.player_tiles:
+ await self.channel.send(user.mention + " You are not in the match.", delete_after=10)
+ return
+ if self.state != "roll":
+ await self.channel.send(user.mention + " You may not roll at this time.", delete_after=10)
+ return
+ if self.round_has_rolled[user.id]:
+ return
+ roll = random.randint(1, 6)
+ self.rolls.append(await self.channel.send(f"{user.mention} rolled a **{roll}**!"))
+ next_tile = self.player_tiles[user.id] + roll
+
+ # apply snakes and ladders
+ if next_tile in BOARD:
+ target = BOARD[next_tile]
+ if target < next_tile:
+ await self.channel.send(
+ f"{user.mention} slips on a snake and falls back to **{target}**",
+ delete_after=15
+ )
+ else:
+ await self.channel.send(
+ f"{user.mention} climbs a ladder to **{target}**",
+ delete_after=15
+ )
+ next_tile = target
+
+ self.player_tiles[user.id] = min(100, next_tile)
+ self.round_has_rolled[user.id] = True
+
+ async def _complete_round(self) -> None:
+ """At the conclusion of a round check to see if there's been a winner."""
+ self.state = "post_round"
+
+ # check for winner
+ winner = self._check_winner()
+ if winner is None:
+ # there is no winner, start the next round
+ await self.start_round()
+ return
+
+ # announce winner and exit
+ await self.channel.send("**Snakes and Ladders**: " + winner.mention + " has won the game! :tada:")
+ self._destruct()
+
+ def _check_winner(self) -> Member:
+ """Return a winning member if we're in the post-round state and there's a winner."""
+ if self.state != "post_round":
+ return None
+ return next((player for player in self.players if self.player_tiles[player.id] == 100),
+ None)
+
+ def _check_all_rolled(self) -> bool:
+ """Check if all members have made their roll."""
+ return all(rolled for rolled in self.round_has_rolled.values())
+
+ def _destruct(self) -> None:
+ """Clean up the finished game object."""
+ del self.snakes.active_sal[self.channel]
+
+ def _board_coordinate_from_index(self, index: int) -> tuple[int, int]:
+ """Convert the tile number to the x/y coordinates for graphical purposes."""
+ y_level = 9 - math.floor((index - 1) / 10)
+ is_reversed = math.floor((index - 1) / 10) % 2 != 0
+ x_level = (index - 1) % 10
+ if is_reversed:
+ x_level = 9 - x_level
+ return x_level, y_level
+
+ @staticmethod
+ def _is_moderator(user: Member) -> bool:
+ """Return True if the user is a Moderator."""
+ return any(Roles.moderator == role.id for role in user.roles)