diff options
Diffstat (limited to 'bot/exts/evergreen')
| -rw-r--r-- | bot/exts/evergreen/snakes/__init__.py | 11 | ||||
| -rw-r--r-- | bot/exts/evergreen/snakes/_converter.py | 82 | ||||
| -rw-r--r-- | bot/exts/evergreen/snakes/_snakes_cog.py | 1151 | ||||
| -rw-r--r-- | bot/exts/evergreen/snakes/_utils.py | 721 |
4 files changed, 0 insertions, 1965 deletions
diff --git a/bot/exts/evergreen/snakes/__init__.py b/bot/exts/evergreen/snakes/__init__.py deleted file mode 100644 index 7740429b..00000000 --- a/bot/exts/evergreen/snakes/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -import logging - -from bot.bot import Bot -from bot.exts.evergreen.snakes._snakes_cog import Snakes - -log = logging.getLogger(__name__) - - -def setup(bot: Bot) -> None: - """Load the Snakes Cog.""" - bot.add_cog(Snakes(bot)) diff --git a/bot/exts/evergreen/snakes/_converter.py b/bot/exts/evergreen/snakes/_converter.py deleted file mode 100644 index 765b983d..00000000 --- a/bot/exts/evergreen/snakes/_converter.py +++ /dev/null @@ -1,82 +0,0 @@ -import json -import logging -import random -from collections.abc import Iterable - -import discord -from discord.ext.commands import Context, Converter -from rapidfuzz import fuzz - -from bot.exts.evergreen.snakes._utils import SNAKE_RESOURCES -from bot.utils import disambiguate - -log = logging.getLogger(__name__) - - -class Snake(Converter): - """Snake converter for the Snakes Cog.""" - - snakes = None - special_cases = None - - async def convert(self, ctx: Context, name: str) -> str: - """Convert the input snake name to the closest matching Snake object.""" - await self.build_list() - name = name.lower() - - if name == "python": - return "Python (programming language)" - - def get_potential(iterable: Iterable, *, threshold: int = 80) -> list[str]: - nonlocal name - potential = [] - - for item in iterable: - original, item = item, item.lower() - - if name == item: - return [original] - - a, b = fuzz.ratio(name, item), fuzz.partial_ratio(name, item) - if a >= threshold or b >= threshold: - potential.append(original) - - return potential - - # Handle special cases - if name.lower() in self.special_cases: - return self.special_cases.get(name.lower(), name.lower()) - - names = {snake["name"]: snake["scientific"] for snake in self.snakes} - all_names = names.keys() | names.values() - timeout = len(all_names) * (3 / 4) - - embed = discord.Embed( - title="Found multiple choices. Please choose the correct one.", colour=0x59982F) - embed.set_author(name=ctx.author.display_name, icon_url=ctx.author.display_avatar.url) - - name = await disambiguate(ctx, get_potential(all_names), timeout=timeout, embed=embed) - return names.get(name, name) - - @classmethod - async def build_list(cls) -> None: - """Build list of snakes from the static snake resources.""" - # Get all the snakes - if cls.snakes is None: - cls.snakes = json.loads((SNAKE_RESOURCES / "snake_names.json").read_text("utf8")) - # Get the special cases - if cls.special_cases is None: - special_cases = json.loads((SNAKE_RESOURCES / "special_snakes.json").read_text("utf8")) - cls.special_cases = {snake["name"].lower(): snake for snake in special_cases} - - @classmethod - async def random(cls) -> str: - """ - Get a random Snake from the loaded resources. - - This is stupid. We should find a way to somehow get the global session into a global context, - so I can get it from here. - """ - await cls.build_list() - names = [snake["scientific"] for snake in cls.snakes] - return random.choice(names) diff --git a/bot/exts/evergreen/snakes/_snakes_cog.py b/bot/exts/evergreen/snakes/_snakes_cog.py deleted file mode 100644 index 04804222..00000000 --- a/bot/exts/evergreen/snakes/_snakes_cog.py +++ /dev/null @@ -1,1151 +0,0 @@ -import asyncio -import colorsys -import logging -import os -import random -import re -import string -import textwrap -import urllib -from functools import partial -from io import BytesIO -from typing import Any, Optional - -import async_timeout -from PIL import Image, ImageDraw, ImageFont -from discord import Colour, Embed, File, Member, Message, Reaction -from discord.errors import HTTPException -from discord.ext.commands import Cog, CommandError, Context, bot_has_permissions, group - -from bot.bot import Bot -from bot.constants import ERROR_REPLIES, Tokens -from bot.exts.evergreen.snakes import _utils as utils -from bot.exts.evergreen.snakes._converter import Snake -from bot.utils.decorators import locked -from bot.utils.extensions import invoke_help_command - -log = logging.getLogger(__name__) - - -# region: Constants -# Color -SNAKE_COLOR = 0x399600 - -# Antidote constants -SYRINGE_EMOJI = "\U0001F489" # :syringe: -PILL_EMOJI = "\U0001F48A" # :pill: -HOURGLASS_EMOJI = "\u231B" # :hourglass: -CROSSBONES_EMOJI = "\u2620" # :skull_crossbones: -ALEMBIC_EMOJI = "\u2697" # :alembic: -TICK_EMOJI = "\u2705" # :white_check_mark: - Correct peg, correct hole -CROSS_EMOJI = "\u274C" # :x: - Wrong peg, wrong hole -BLANK_EMOJI = "\u26AA" # :white_circle: - Correct peg, wrong hole -HOLE_EMOJI = "\u2B1C" # :white_square: - Used in guesses -EMPTY_UNICODE = "\u200b" # literally just an empty space - -ANTIDOTE_EMOJI = ( - SYRINGE_EMOJI, - PILL_EMOJI, - HOURGLASS_EMOJI, - CROSSBONES_EMOJI, - ALEMBIC_EMOJI, -) - -# Quiz constants -ANSWERS_EMOJI = { - "a": "\U0001F1E6", # :regional_indicator_a: 🇦 - "b": "\U0001F1E7", # :regional_indicator_b: 🇧 - "c": "\U0001F1E8", # :regional_indicator_c: 🇨 - "d": "\U0001F1E9", # :regional_indicator_d: 🇩 -} - -ANSWERS_EMOJI_REVERSE = { - "\U0001F1E6": "A", # :regional_indicator_a: 🇦 - "\U0001F1E7": "B", # :regional_indicator_b: 🇧 - "\U0001F1E8": "C", # :regional_indicator_c: 🇨 - "\U0001F1E9": "D", # :regional_indicator_d: 🇩 -} - -# Zzzen of pythhhon constant -ZEN = """ -Beautiful is better than ugly. -Explicit is better than implicit. -Simple is better than complex. -Complex is better than complicated. -Flat is better than nested. -Sparse is better than dense. -Readability counts. -Special cases aren't special enough to break the rules. -Although practicality beats purity. -Errors should never pass silently. -Unless explicitly silenced. -In the face of ambiguity, refuse the temptation to guess. -There should be one-- and preferably only one --obvious way to do it. -Now is better than never. -Although never is often better than *right* now. -If the implementation is hard to explain, it's a bad idea. -If the implementation is easy to explain, it may be a good idea. -""" - -# Max messages to train snake_chat on -MSG_MAX = 100 - -# get_snek constants -URL = "https://en.wikipedia.org/w/api.php?" - -# snake guess responses -INCORRECT_GUESS = ( - "Nope, that's not what it is.", - "Not quite.", - "Not even close.", - "Terrible guess.", - "Nnnno.", - "Dude. No.", - "I thought everyone knew this one.", - "Guess you suck at snakes.", - "Bet you feel stupid now.", - "Hahahaha, no.", - "Did you hit the wrong key?" -) - -CORRECT_GUESS = ( - "**WRONG**. Wait, no, actually you're right.", - "Yeah, you got it!", - "Yep, that's exactly what it is.", - "Uh-huh. Yep yep yep.", - "Yeah that's right.", - "Yup. How did you know that?", - "Are you a herpetologist?", - "Sure, okay, but I bet you can't pronounce it.", - "Are you cheating?" -) - -# snake card consts -CARD = { - "top": Image.open("bot/resources/snakes/snake_cards/card_top.png"), - "frame": Image.open("bot/resources/snakes/snake_cards/card_frame.png"), - "bottom": Image.open("bot/resources/snakes/snake_cards/card_bottom.png"), - "backs": [ - Image.open(f"bot/resources/snakes/snake_cards/backs/{file}") - for file in os.listdir("bot/resources/snakes/snake_cards/backs") - ], - "font": ImageFont.truetype("bot/resources/snakes/snake_cards/expressway.ttf", 20) -} -# endregion - - -class Snakes(Cog): - """ - Commands related to snakes, created by our community during the first code jam. - - More information can be found in the code-jam-1 repo. - - https://github.com/python-discord/code-jam-1 - """ - - wiki_brief = re.compile(r"(.*?)(=+ (.*?) =+)", flags=re.DOTALL) - valid_image_extensions = ("gif", "png", "jpeg", "jpg", "webp") - - def __init__(self, bot: Bot): - self.active_sal = {} - self.bot = bot - self.snake_names = utils.get_resource("snake_names") - self.snake_idioms = utils.get_resource("snake_idioms") - self.snake_quizzes = utils.get_resource("snake_quiz") - self.snake_facts = utils.get_resource("snake_facts") - self.num_movie_pages = None - - # region: Helper methods - @staticmethod - def _beautiful_pastel(hue: float) -> int: - """Returns random bright pastels.""" - light = random.uniform(0.7, 0.85) - saturation = 1 - - rgb = colorsys.hls_to_rgb(hue, light, saturation) - hex_rgb = "" - - for part in rgb: - value = int(part * 0xFF) - hex_rgb += f"{value:02x}" - - return int(hex_rgb, 16) - - @staticmethod - def _generate_card(buffer: BytesIO, content: dict) -> BytesIO: - """ - Generate a card from snake information. - - Written by juan and Someone during the first code jam. - """ - snake = Image.open(buffer) - - # Get the size of the snake icon, configure the height of the image box (yes, it changes) - icon_width = 347 # Hardcoded, not much i can do about that - icon_height = int((icon_width / snake.width) * snake.height) - frame_copies = icon_height // CARD["frame"].height + 1 - snake.thumbnail((icon_width, icon_height)) - - # Get the dimensions of the final image - main_height = icon_height + CARD["top"].height + CARD["bottom"].height - main_width = CARD["frame"].width - - # Start creating the foreground - foreground = Image.new("RGBA", (main_width, main_height), (0, 0, 0, 0)) - foreground.paste(CARD["top"], (0, 0)) - - # Generate the frame borders to the correct height - for offset in range(frame_copies): - position = (0, CARD["top"].height + offset * CARD["frame"].height) - foreground.paste(CARD["frame"], position) - - # Add the image and bottom part of the image - foreground.paste(snake, (36, CARD["top"].height)) # Also hardcoded :( - foreground.paste(CARD["bottom"], (0, CARD["top"].height + icon_height)) - - # Setup the background - back = random.choice(CARD["backs"]) - back_copies = main_height // back.height + 1 - full_image = Image.new("RGBA", (main_width, main_height), (0, 0, 0, 0)) - - # Generate the tiled background - for offset in range(back_copies): - full_image.paste(back, (16, 16 + offset * back.height)) - - # Place the foreground onto the final image - full_image.paste(foreground, (0, 0), foreground) - - # Get the first two sentences of the info - description = ".".join(content["info"].split(".")[:2]) + "." - - # Setup positioning variables - margin = 36 - offset = CARD["top"].height + icon_height + margin - - # Create blank rectangle image which will be behind the text - rectangle = Image.new( - "RGBA", - (main_width, main_height), - (0, 0, 0, 0) - ) - - # Draw a semi-transparent rectangle on it - rect = ImageDraw.Draw(rectangle) - rect.rectangle( - (margin, offset, main_width - margin, main_height - margin), - fill=(63, 63, 63, 128) - ) - - # Paste it onto the final image - full_image.paste(rectangle, (0, 0), mask=rectangle) - - # Draw the text onto the final image - draw = ImageDraw.Draw(full_image) - for line in textwrap.wrap(description, 36): - draw.text([margin + 4, offset], line, font=CARD["font"]) - offset += CARD["font"].getsize(line)[1] - - # Get the image contents as a BufferIO object - buffer = BytesIO() - full_image.save(buffer, "PNG") - buffer.seek(0) - - return buffer - - @staticmethod - def _snakify(message: str) -> str: - """Sssnakifffiesss a sstring.""" - # Replace fricatives with exaggerated snake fricatives. - simple_fricatives = [ - "f", "s", "z", "h", - "F", "S", "Z", "H", - ] - complex_fricatives = [ - "th", "sh", "Th", "Sh" - ] - - for letter in simple_fricatives: - if letter.islower(): - message = message.replace(letter, letter * random.randint(2, 4)) - else: - message = message.replace(letter, (letter * random.randint(2, 4)).title()) - - for fricative in complex_fricatives: - message = message.replace(fricative, fricative[0] + fricative[1] * random.randint(2, 4)) - - return message - - async def _fetch(self, url: str, params: Optional[dict] = None) -> dict: - """Asynchronous web request helper method.""" - if params is None: - params = {} - - async with async_timeout.timeout(10): - async with self.bot.http_session.get(url, params=params) as response: - return await response.json() - - def _get_random_long_message(self, messages: list[str], retries: int = 10) -> str: - """ - Fetch a message that's at least 3 words long, if possible to do so in retries attempts. - - Else, just return whatever the last message is. - """ - long_message = random.choice(messages) - if len(long_message.split()) < 3 and retries > 0: - return self._get_random_long_message( - messages, - retries=retries - 1 - ) - - return long_message - - async def _get_snek(self, name: str) -> dict[str, Any]: - """ - Fetches all the data from a wikipedia article about a snake. - - Builds a dict that the .get() method can use. - - Created by Ava and eivl. - """ - snake_info = {} - - params = { - "format": "json", - "action": "query", - "list": "search", - "srsearch": name, - "utf8": "", - "srlimit": "1", - } - - json = await self._fetch(URL, params=params) - - # Wikipedia does have a error page - try: - pageid = json["query"]["search"][0]["pageid"] - except KeyError: - # Wikipedia error page ID(?) - pageid = 41118 - except IndexError: - return None - - params = { - "format": "json", - "action": "query", - "prop": "extracts|images|info", - "exlimit": "max", - "explaintext": "", - "inprop": "url", - "pageids": pageid - } - - json = await self._fetch(URL, params=params) - - # Constructing dict - handle exceptions later - try: - snake_info["title"] = json["query"]["pages"][f"{pageid}"]["title"] - snake_info["extract"] = json["query"]["pages"][f"{pageid}"]["extract"] - snake_info["images"] = json["query"]["pages"][f"{pageid}"]["images"] - snake_info["fullurl"] = json["query"]["pages"][f"{pageid}"]["fullurl"] - snake_info["pageid"] = json["query"]["pages"][f"{pageid}"]["pageid"] - except KeyError: - snake_info["error"] = True - - if snake_info["images"]: - i_url = "https://commons.wikimedia.org/wiki/Special:FilePath/" - image_list = [] - map_list = [] - thumb_list = [] - - # Wikipedia has arbitrary images that are not snakes - banned = [ - "Commons-logo.svg", - "Red%20Pencil%20Icon.png", - "distribution", - "The%20Death%20of%20Cleopatra%20arthur.jpg", - "Head%20of%20holotype", - "locator", - "Woma.png", - "-map.", - ".svg", - "ange.", - "Adder%20(PSF).png" - ] - - for image in snake_info["images"]: - # Images come in the format of `File:filename.extension` - file, sep, filename = image["title"].partition(":") - filename = filename.replace(" ", "%20") # Wikipedia returns good data! - - if not filename.startswith("Map"): - if any(ban in filename for ban in banned): - pass - else: - image_list.append(f"{i_url}{filename}") - thumb_list.append(f"{i_url}{filename}?width=100") - else: - map_list.append(f"{i_url}{filename}") - - snake_info["image_list"] = image_list - snake_info["map_list"] = map_list - snake_info["thumb_list"] = thumb_list - snake_info["name"] = name - - match = self.wiki_brief.match(snake_info["extract"]) - info = match.group(1) if match else None - - if info: - info = info.replace("\n", "\n\n") # Give us some proper paragraphs. - - snake_info["info"] = info - - return snake_info - - async def _get_snake_name(self) -> dict[str, str]: - """Gets a random snake name.""" - return random.choice(self.snake_names) - - async def _validate_answer(self, ctx: Context, message: Message, answer: str, options: dict[str, str]) -> None: - """Validate the answer using a reaction event loop.""" - def predicate(reaction: Reaction, user: Member) -> bool: - """Test if the the answer is valid and can be evaluated.""" - return ( - reaction.message.id == message.id # The reaction is attached to the question we asked. - and user == ctx.author # It's the user who triggered the quiz. - and str(reaction.emoji) in ANSWERS_EMOJI.values() # The reaction is one of the options. - ) - - for emoji in ANSWERS_EMOJI.values(): - await message.add_reaction(emoji) - - # Validate the answer - try: - reaction, user = await ctx.bot.wait_for("reaction_add", timeout=45.0, check=predicate) - except asyncio.TimeoutError: - await ctx.send(f"You took too long. The correct answer was **{options[answer]}**.") - await message.clear_reactions() - return - - if str(reaction.emoji) == ANSWERS_EMOJI[answer]: - await ctx.send(f"{random.choice(CORRECT_GUESS)} The correct answer was **{options[answer]}**.") - else: - await ctx.send( - f"{random.choice(INCORRECT_GUESS)} The correct answer was **{options[answer]}**." - ) - - await message.clear_reactions() - # endregion - - # region: Commands - @group(name="snakes", aliases=("snake",), invoke_without_command=True) - async def snakes_group(self, ctx: Context) -> None: - """Commands from our first code jam.""" - await invoke_help_command(ctx) - - @bot_has_permissions(manage_messages=True) - @snakes_group.command(name="antidote") - @locked() - async def antidote_command(self, ctx: Context) -> None: - """ - Antidote! Can you create the antivenom before the patient dies? - - Rules: You have 4 ingredients for each antidote, you only have 10 attempts - Once you synthesize the antidote, you will be presented with 4 markers - Tick: This means you have a CORRECT ingredient in the CORRECT position - Circle: This means you have a CORRECT ingredient in the WRONG position - Cross: This means you have a WRONG ingredient in the WRONG position - - Info: The game automatically ends after 5 minutes inactivity. - You should only use each ingredient once. - - This game was created by Lord Bisk and Runew0lf. - """ - def predicate(reaction_: Reaction, user_: Member) -> bool: - """Make sure that this reaction is what we want to operate on.""" - return ( - all(( - # Reaction is on this message - reaction_.message.id == board_id.id, - # Reaction is one of the pagination emotes - reaction_.emoji in ANTIDOTE_EMOJI, - # Reaction was not made by the Bot - user_.id != self.bot.user.id, - # Reaction was made by author - user_.id == ctx.author.id - )) - ) - - # Initialize variables - antidote_tries = 0 - antidote_guess_count = 0 - antidote_guess_list = [] - guess_result = [] - board = [] - page_guess_list = [] - page_result_list = [] - win = False - - antidote_embed = Embed(color=SNAKE_COLOR, title="Antidote") - antidote_embed.set_author(name=ctx.author.name, icon_url=ctx.author.display_avatar.url) - - # Generate answer - antidote_answer = list(ANTIDOTE_EMOJI) # Duplicate list, not reference it - random.shuffle(antidote_answer) - antidote_answer.pop() - - # Begin initial board building - for i in range(0, 10): - page_guess_list.append(f"{HOLE_EMOJI} {HOLE_EMOJI} {HOLE_EMOJI} {HOLE_EMOJI}") - page_result_list.append(f"{CROSS_EMOJI} {CROSS_EMOJI} {CROSS_EMOJI} {CROSS_EMOJI}") - board.append( - f"`{i+1:02d}` " - f"{page_guess_list[i]} - " - f"{page_result_list[i]}" - ) - board.append(EMPTY_UNICODE) - antidote_embed.add_field(name="10 guesses remaining", value="\n".join(board)) - board_id = await ctx.send(embed=antidote_embed) # Display board - - # Add our player reactions - for emoji in ANTIDOTE_EMOJI: - await board_id.add_reaction(emoji) - - # Begin main game loop - while not win and antidote_tries < 10: - try: - reaction, user = await ctx.bot.wait_for( - "reaction_add", timeout=300, check=predicate) - except asyncio.TimeoutError: - log.debug("Antidote timed out waiting for a reaction") - break # We're done, no reactions for the last 5 minutes - - if antidote_tries < 10: - if antidote_guess_count < 4: - if reaction.emoji in ANTIDOTE_EMOJI: - antidote_guess_list.append(reaction.emoji) - antidote_guess_count += 1 - - if antidote_guess_count == 4: # Guesses complete - antidote_guess_count = 0 - page_guess_list[antidote_tries] = " ".join(antidote_guess_list) - - # Now check guess - for i in range(0, len(antidote_answer)): - if antidote_guess_list[i] == antidote_answer[i]: - guess_result.append(TICK_EMOJI) - elif antidote_guess_list[i] in antidote_answer: - guess_result.append(BLANK_EMOJI) - else: - guess_result.append(CROSS_EMOJI) - guess_result.sort() - page_result_list[antidote_tries] = " ".join(guess_result) - - # Rebuild the board - board = [] - for i in range(0, 10): - board.append(f"`{i+1:02d}` " - f"{page_guess_list[i]} - " - f"{page_result_list[i]}") - board.append(EMPTY_UNICODE) - - # Remove Reactions - for emoji in antidote_guess_list: - await board_id.remove_reaction(emoji, user) - - if antidote_guess_list == antidote_answer: - win = True - - antidote_tries += 1 - guess_result = [] - antidote_guess_list = [] - - antidote_embed.clear_fields() - antidote_embed.add_field(name=f"{10 - antidote_tries} " - f"guesses remaining", - value="\n".join(board)) - # Redisplay the board - await board_id.edit(embed=antidote_embed) - - # Winning / Ending Screen - if win is True: - antidote_embed = Embed(color=SNAKE_COLOR, title="Antidote") - antidote_embed.set_author(name=ctx.author.name, icon_url=ctx.author.display_avatar.url) - antidote_embed.set_image(url="https://i.makeagif.com/media/7-12-2015/Cj1pts.gif") - antidote_embed.add_field(name="You have created the snake antidote!", - value=f"The solution was: {' '.join(antidote_answer)}\n" - f"You had {10 - antidote_tries} tries remaining.") - await board_id.edit(embed=antidote_embed) - else: - antidote_embed = Embed(color=SNAKE_COLOR, title="Antidote") - antidote_embed.set_author(name=ctx.author.name, icon_url=ctx.author.display_avatar.url) - antidote_embed.set_image(url="https://media.giphy.com/media/ceeN6U57leAhi/giphy.gif") - antidote_embed.add_field( - name=EMPTY_UNICODE, - value=( - f"Sorry you didnt make the antidote in time.\n" - f"The formula was {' '.join(antidote_answer)}" - ) - ) - await board_id.edit(embed=antidote_embed) - - log.debug("Ending pagination and removing all reactions...") - await board_id.clear_reactions() - - @snakes_group.command(name="draw") - async def draw_command(self, ctx: Context) -> None: - """ - Draws a random snek using Perlin noise. - - Written by Momo and kel. - Modified by juan and lemon. - """ - with ctx.typing(): - - # Generate random snake attributes - width = random.randint(6, 10) - length = random.randint(15, 22) - random_hue = random.random() - snek_color = self._beautiful_pastel(random_hue) - text_color = self._beautiful_pastel((random_hue + 0.5) % 1) - bg_color = ( - random.randint(32, 50), - random.randint(32, 50), - random.randint(50, 70), - ) - - # Build and send the snek - text = random.choice(self.snake_idioms)["idiom"] - factory = utils.PerlinNoiseFactory(dimension=1, octaves=2) - image_frame = utils.create_snek_frame( - factory, - snake_width=width, - snake_length=length, - snake_color=snek_color, - text=text, - text_color=text_color, - bg_color=bg_color - ) - png_bytes = utils.frame_to_png_bytes(image_frame) - file = File(png_bytes, filename="snek.png") - await ctx.send(file=file) - - @snakes_group.command(name="get") - @bot_has_permissions(manage_messages=True) - @locked() - async def get_command(self, ctx: Context, *, name: Snake = None) -> None: - """ - Fetches information about a snake from Wikipedia. - - Created by Ava and eivl. - """ - with ctx.typing(): - if name is None: - name = await Snake.random() - - if isinstance(name, dict): - data = name - else: - data = await self._get_snek(name) - - if data.get("error"): - await ctx.send("Could not fetch data from Wikipedia.") - return - - description = data["info"] - - # Shorten the description if needed - if len(description) > 1000: - description = description[:1000] - last_newline = description.rfind("\n") - if last_newline > 0: - description = description[:last_newline] - - # Strip and add the Wiki link. - if "fullurl" in data: - description = description.strip("\n") - description += f"\n\nRead more on [Wikipedia]({data['fullurl']})" - - # Build and send the embed. - embed = Embed( - title=data.get("title", data.get("name")), - description=description, - colour=0x59982F, - ) - - emoji = "https://emojipedia-us.s3.amazonaws.com/thumbs/60/google/3/snake_1f40d.png" - - _iter = ( - url - for url in data["image_list"] - if url.endswith(self.valid_image_extensions) - ) - image = next(_iter, emoji) - - embed.set_image(url=image) - - await ctx.send(embed=embed) - - @snakes_group.command(name="guess", aliases=("identify",)) - @locked() - async def guess_command(self, ctx: Context) -> None: - """ - Snake identifying game. - - Made by Ava and eivl. - Modified by lemon. - """ - with ctx.typing(): - - image = None - - while image is None: - snakes = [await Snake.random() for _ in range(4)] - snake = random.choice(snakes) - answer = "abcd"[snakes.index(snake)] - - data = await self._get_snek(snake) - - _iter = ( - url - for url in data["image_list"] - if url.endswith(self.valid_image_extensions) - ) - image = next(_iter, None) - - embed = Embed( - title="Which of the following is the snake in the image?", - description="\n".join( - f"{'ABCD'[snakes.index(snake)]}: {snake}" for snake in snakes), - colour=SNAKE_COLOR - ) - embed.set_image(url=image) - - guess = await ctx.send(embed=embed) - options = {f"{'abcd'[snakes.index(snake)]}": snake for snake in snakes} - await self._validate_answer(ctx, guess, answer, options) - - @snakes_group.command(name="hatch") - async def hatch_command(self, ctx: Context) -> None: - """ - Hatches your personal snake. - - Written by Momo and kel. - """ - # Pick a random snake to hatch. - snake_name = random.choice(list(utils.snakes.keys())) - snake_image = utils.snakes[snake_name] - - # Hatch the snake - message = await ctx.send(embed=Embed(description="Hatching your snake :snake:...")) - await asyncio.sleep(1) - - for stage in utils.stages: - hatch_embed = Embed(description=stage) - await message.edit(embed=hatch_embed) - await asyncio.sleep(1) - await asyncio.sleep(1) - await message.delete() - - # Build and send the embed. - my_snake_embed = Embed(description=":tada: Congrats! You hatched: **{0}**".format(snake_name)) - my_snake_embed.set_thumbnail(url=snake_image) - my_snake_embed.set_footer( - text=" Owner: {0}#{1}".format(ctx.author.name, ctx.author.discriminator) - ) - - await ctx.send(embed=my_snake_embed) - - @snakes_group.command(name="movie") - async def movie_command(self, ctx: Context) -> None: - """ - Gets a random snake-related movie from TMDB. - - Written by Samuel. - Modified by gdude. - Modified by Will Da Silva. - """ - # Initially 8 pages are fetched. The actual number of pages is set after the first request. - page = random.randint(1, self.num_movie_pages or 8) - - async with ctx.typing(): - response = await self.bot.http_session.get( - "https://api.themoviedb.org/3/search/movie", - params={ - "query": "snake", - "page": page, - "language": "en-US", - "api_key": Tokens.tmdb, - } - ) - data = await response.json() - if self.num_movie_pages is None: - self.num_movie_pages = data["total_pages"] - movie = random.choice(data["results"])["id"] - - response = await self.bot.http_session.get( - f"https://api.themoviedb.org/3/movie/{movie}", - params={ - "language": "en-US", - "api_key": Tokens.tmdb, - } - ) - data = await response.json() - - embed = Embed(title=data["title"], color=SNAKE_COLOR) - - if data["poster_path"] is not None: - embed.set_image(url=f"https://images.tmdb.org/t/p/original{data['poster_path']}") - - if data["overview"]: - embed.add_field(name="Overview", value=data["overview"]) - - if data["release_date"]: - embed.add_field(name="Release Date", value=data["release_date"]) - - if data["genres"]: - embed.add_field(name="Genres", value=", ".join([x["name"] for x in data["genres"]])) - - if data["vote_count"]: - embed.add_field(name="Rating", value=f"{data['vote_average']}/10 ({data['vote_count']} votes)", inline=True) - - if data["budget"] and data["revenue"]: - embed.add_field(name="Budget", value=data["budget"], inline=True) - embed.add_field(name="Revenue", value=data["revenue"], inline=True) - - embed.set_footer(text="This product uses the TMDb API but is not endorsed or certified by TMDb.") - embed.set_thumbnail(url="https://i.imgur.com/LtFtC8H.png") - - try: - await ctx.send(embed=embed) - except HTTPException as err: - await ctx.send("An error occurred while fetching a snake-related movie!") - raise err from None - - @snakes_group.command(name="quiz") - @locked() - async def quiz_command(self, ctx: Context) -> None: - """ - Asks a snake-related question in the chat and validates the user's guess. - - This was created by Mushy and Cardium, - and modified by Urthas and lemon. - """ - # Prepare a question. - question = random.choice(self.snake_quizzes) - answer = question["answerkey"] - options = {key: question["options"][key] for key in ANSWERS_EMOJI.keys()} - - # Build and send the embed. - embed = Embed( - color=SNAKE_COLOR, - title=question["question"], - description="\n".join( - [f"**{key.upper()}**: {answer}" for key, answer in options.items()] - ) - ) - - quiz = await ctx.send(embed=embed) - await self._validate_answer(ctx, quiz, answer, options) - - @snakes_group.command(name="name", aliases=("name_gen",)) - async def name_command(self, ctx: Context, *, name: str = None) -> None: - """ - Snakifies a username. - - Slices the users name at the last vowel (or second last if the name - ends with a vowel), and then combines it with a random snake name, - which is sliced at the first vowel (or second if the name starts with - a vowel). - - If the name contains no vowels, it just appends the snakename - to the end of the name. - - Examples: - lemon + anaconda = lemoconda - krzsn + anaconda = krzsnconda - gdude + anaconda = gduconda - aperture + anaconda = apertuconda - lucy + python = luthon - joseph + taipan = joseipan - - This was written by Iceman, and modified for inclusion into the bot by lemon. - """ - snake_name = await self._get_snake_name() - snake_name = snake_name["name"] - snake_prefix = "" - - # Set aside every word in the snake name except the last. - if " " in snake_name: - snake_prefix = " ".join(snake_name.split()[:-1]) - snake_name = snake_name.split()[-1] - - # If no name is provided, use whoever called the command. - if name: - user_name = name - else: - user_name = ctx.author.display_name - - # Get the index of the vowel to slice the username at - user_slice_index = len(user_name) - for index, char in enumerate(reversed(user_name)): - if index == 0: - continue - if char.lower() in "aeiouy": - user_slice_index -= index - break - - # Now, get the index of the vowel to slice the snake_name at - snake_slice_index = 0 - for index, char in enumerate(snake_name): - if index == 0: - continue - if char.lower() in "aeiouy": - snake_slice_index = index + 1 - break - - # Combine! - snake_name = snake_name[snake_slice_index:] - user_name = user_name[:user_slice_index] - result = f"{snake_prefix} {user_name}{snake_name}" - result = string.capwords(result) - - # Embed and send - embed = Embed( - title="Snake name", - description=f"Your snake-name is **{result}**", - color=SNAKE_COLOR - ) - - await ctx.send(embed=embed) - return - - @snakes_group.command(name="sal") - @locked() - async def sal_command(self, ctx: Context) -> None: - """ - Play a game of Snakes and Ladders. - - Written by Momo and kel. - Modified by lemon. - """ - # Check if there is already a game in this channel - if ctx.channel in self.active_sal: - await ctx.send(f"{ctx.author.mention} A game is already in progress in this channel.") - return - - game = utils.SnakeAndLaddersGame(snakes=self, context=ctx) - self.active_sal[ctx.channel] = game - - await game.open_game() - - @snakes_group.command(name="about") - async def about_command(self, ctx: Context) -> None: - """Show an embed with information about the event, its participants, and its winners.""" - contributors = [ - "<@!245270749919576066>", - "<@!396290259907903491>", - "<@!172395097705414656>", - "<@!361708843425726474>", - "<@!300302216663793665>", - "<@!210248051430916096>", - "<@!174588005745557505>", - "<@!87793066227822592>", - "<@!211619754039967744>", - "<@!97347867923976192>", - "<@!136081839474343936>", - "<@!263560579770220554>", - "<@!104749643715387392>", - "<@!303940835005825024>", - ] - - embed = Embed( - title="About the snake cog", - description=( - "The features in this cog were created by members of the community " - "during our first ever " - "[code jam event](https://pythondiscord.com/pages/code-jams/code-jam-1-snakes-bot/). \n\n" - "The event saw over 50 participants, who competed to write a discord bot cog with a snake theme over " - "48 hours. The staff then selected the best features from all the best teams, and made modifications " - "to ensure they would all work together before integrating them into the community bot.\n\n" - "It was a tight race, but in the end, <@!104749643715387392> and <@!303940835005825024> " - f"walked away as grand champions. Make sure you check out `{ctx.prefix}snakes sal`," - f"`{ctx.prefix}snakes draw` and `{ctx.prefix}snakes hatch` " - "to see what they came up with." - ) - ) - - embed.add_field( - name="Contributors", - value=( - ", ".join(contributors) - ) - ) - - await ctx.send(embed=embed) - - @snakes_group.command(name="card") - async def card_command(self, ctx: Context, *, name: Snake = None) -> None: - """ - Create an interesting little card from a snake. - - Created by juan and Someone during the first code jam. - """ - # Get the snake data we need - if not name: - name_obj = await self._get_snake_name() - name = name_obj["scientific"] - content = await self._get_snek(name) - - elif isinstance(name, dict): - content = name - - else: - content = await self._get_snek(name) - - # Make the card - async with ctx.typing(): - - stream = BytesIO() - async with async_timeout.timeout(10): - async with self.bot.http_session.get(content["image_list"][0]) as response: - stream.write(await response.read()) - - stream.seek(0) - - func = partial(self._generate_card, stream, content) - final_buffer = await self.bot.loop.run_in_executor(None, func) - - # Send it! - await ctx.send( - f"A wild {content['name'].title()} appears!", - file=File(final_buffer, filename=content["name"].replace(" ", "") + ".png") - ) - - @snakes_group.command(name="fact") - async def fact_command(self, ctx: Context) -> None: - """ - Gets a snake-related fact. - - Written by Andrew and Prithaj. - Modified by lemon. - """ - question = random.choice(self.snake_facts)["fact"] - embed = Embed( - title="Snake fact", - color=SNAKE_COLOR, - description=question - ) - await ctx.send(embed=embed) - - @snakes_group.command(name="snakify") - async def snakify_command(self, ctx: Context, *, message: str = None) -> None: - """ - How would I talk if I were a snake? - - If `message` is passed, the bot will snakify the message. - Otherwise, a random message from the user's history is snakified. - - Written by Momo and kel. - Modified by lemon. - """ - with ctx.typing(): - embed = Embed() - user = ctx.author - - if not message: - - # Get a random message from the users history - messages = [] - async for message in ctx.history(limit=500).filter( - lambda msg: msg.author == ctx.author # Message was sent by author. - ): - messages.append(message.content) - - message = self._get_random_long_message(messages) - - # Build and send the embed - embed.set_author( - name=f"{user.name}#{user.discriminator}", - icon_url=user.display_avatar.url, - ) - embed.description = f"*{self._snakify(message)}*" - - await ctx.send(embed=embed) - - @snakes_group.command(name="video", aliases=("get_video",)) - async def video_command(self, ctx: Context, *, search: str = None) -> None: - """ - Gets a YouTube video about snakes. - - If `search` is given, a snake with that name will be searched on Youtube. - - Written by Andrew and Prithaj. - """ - # Are we searching for anything specific? - if search: - query = search + " snake" - else: - snake = await self._get_snake_name() - query = snake["name"] - - # Build the URL and make the request - url = "https://www.googleapis.com/youtube/v3/search" - response = await self.bot.http_session.get( - url, - params={ - "part": "snippet", - "q": urllib.parse.quote_plus(query), - "type": "video", - "key": Tokens.youtube - } - ) - response = await response.json() - data = response.get("items", []) - - # Send the user a video - if len(data) > 0: - num = random.randint(0, len(data) - 1) - youtube_base_url = "https://www.youtube.com/watch?v=" - await ctx.send( - content=f"{youtube_base_url}{data[num]['id']['videoId']}" - ) - else: - log.warning(f"YouTube API error. Full response looks like {response}") - - @snakes_group.command(name="zen") - async def zen_command(self, ctx: Context) -> None: - """ - Gets a random quote from the Zen of Python, except as if spoken by a snake. - - Written by Prithaj and Andrew. - Modified by lemon. - """ - embed = Embed( - title="Zzzen of Pythhon", - color=SNAKE_COLOR - ) - - # Get the zen quote and snakify it - zen_quote = random.choice(ZEN.splitlines()) - zen_quote = self._snakify(zen_quote) - - # Embed and send - embed.description = zen_quote - await ctx.send( - embed=embed - ) - # endregion - - # region: Error handlers - @card_command.error - async def command_error(self, ctx: Context, error: CommandError) -> None: - """Local error handler for the Snake Cog.""" - original_error = getattr(error, "original", None) - if isinstance(original_error, OSError): - error.handled = True - embed = Embed() - embed.colour = Colour.red() - log.error(f"snake_card encountered an OSError: {error} ({original_error})") - embed.description = "Could not generate the snake card! Please try again." - embed.title = random.choice(ERROR_REPLIES) - await ctx.send(embed=embed) diff --git a/bot/exts/evergreen/snakes/_utils.py b/bot/exts/evergreen/snakes/_utils.py deleted file mode 100644 index b5f13c53..00000000 --- a/bot/exts/evergreen/snakes/_utils.py +++ /dev/null @@ -1,721 +0,0 @@ -import asyncio -import io -import json -import logging -import math -import random -from itertools import product -from pathlib import Path - -from PIL import Image -from PIL.ImageDraw import ImageDraw -from discord import File, Member, Reaction -from discord.ext.commands import Cog, Context - -from bot.constants import Roles - -SNAKE_RESOURCES = Path("bot/resources/snakes").absolute() - -h1 = r"""``` - ---- - ------ -/--------\ -|--------| -|--------| - \------/ - ---- -```""" -h2 = r"""``` - ---- - ------ -/---\-/--\ -|-----\--| -|--------| - \------/ - ---- -```""" -h3 = r"""``` - ---- - ------ -/---\-/--\ -|-----\--| -|-----/--| - \----\-/ - ---- -```""" -h4 = r"""``` - ----- - ----- \ -/--| /---\ -|--\ -\---| -|--\--/-- / - \------- / - ------ -```""" -stages = [h1, h2, h3, h4] -snakes = { - "Baby Python": "https://i.imgur.com/SYOcmSa.png", - "Baby Rattle Snake": "https://i.imgur.com/i5jYA8f.png", - "Baby Dragon Snake": "https://i.imgur.com/SuMKM4m.png", - "Baby Garden Snake": "https://i.imgur.com/5vYx3ah.png", - "Baby Cobra": "https://i.imgur.com/jk14ryt.png", - "Baby Anaconda": "https://i.imgur.com/EpdrnNr.png", -} - -BOARD_TILE_SIZE = 56 # the size of each board tile -BOARD_PLAYER_SIZE = 20 # the size of each player icon -BOARD_MARGIN = (10, 0) # margins, in pixels (for player icons) -# The size of the image to download -# Should a power of 2 and higher than BOARD_PLAYER_SIZE -PLAYER_ICON_IMAGE_SIZE = 32 -MAX_PLAYERS = 4 # depends on the board size/quality, 4 is for the default board - -# board definition (from, to) -BOARD = { - # ladders - 2: 38, - 7: 14, - 8: 31, - 15: 26, - 21: 42, - 28: 84, - 36: 44, - 51: 67, - 71: 91, - 78: 98, - 87: 94, - - # snakes - 99: 80, - 95: 75, - 92: 88, - 89: 68, - 74: 53, - 64: 60, - 62: 19, - 49: 11, - 46: 25, - 16: 6 -} - -DEFAULT_SNAKE_COLOR = 0x15c7ea -DEFAULT_BACKGROUND_COLOR = 0 -DEFAULT_IMAGE_DIMENSIONS = (200, 200) -DEFAULT_SNAKE_LENGTH = 22 -DEFAULT_SNAKE_WIDTH = 8 -DEFAULT_SEGMENT_LENGTH_RANGE = (7, 10) -DEFAULT_IMAGE_MARGINS = (50, 50) -DEFAULT_TEXT = "snek\nit\nup" -DEFAULT_TEXT_POSITION = ( - 10, - 10 -) -DEFAULT_TEXT_COLOR = 0xf2ea15 -X = 0 -Y = 1 -ANGLE_RANGE = math.pi * 2 - - -def get_resource(file: str) -> list[dict]: - """Load Snake resources JSON.""" - return json.loads((SNAKE_RESOURCES / f"{file}.json").read_text("utf-8")) - - -def smoothstep(t: float) -> float: - """Smooth curve with a zero derivative at 0 and 1, making it useful for interpolating.""" - return t * t * (3. - 2. * t) - - -def lerp(t: float, a: float, b: float) -> float: - """Linear interpolation between a and b, given a fraction t.""" - return a + t * (b - a) - - -class PerlinNoiseFactory(object): - """ - Callable that produces Perlin noise for an arbitrary point in an arbitrary number of dimensions. - - The underlying grid is aligned with the integers. - - There is no limit to the coordinates used; new gradients are generated on the fly as necessary. - - Taken from: https://gist.github.com/eevee/26f547457522755cb1fb8739d0ea89a1 - Licensed under ISC - """ - - def __init__(self, dimension: int, octaves: int = 1, tile: tuple[int, ...] = (), unbias: bool = False): - """ - Create a new Perlin noise factory in the given number of dimensions. - - dimension should be an integer and at least 1. - - More octaves create a foggier and more-detailed noise pattern. More than 4 octaves is rather excessive. - - ``tile`` can be used to make a seamlessly tiling pattern. - For example: - pnf = PerlinNoiseFactory(2, tile=(0, 3)) - - This will produce noise that tiles every 3 units vertically, but never tiles horizontally. - - If ``unbias`` is True, the smoothstep function will be applied to the output before returning - it, to counteract some of Perlin noise's significant bias towards the center of its output range. - """ - self.dimension = dimension - self.octaves = octaves - self.tile = tile + (0,) * dimension - self.unbias = unbias - - # For n dimensions, the range of Perlin noise is ±sqrt(n)/2; multiply - # by this to scale to ±1 - self.scale_factor = 2 * dimension ** -0.5 - - self.gradient = {} - - def _generate_gradient(self) -> tuple[float, ...]: - """ - Generate a random unit vector at each grid point. - - This is the "gradient" vector, in that the grid tile slopes towards it - """ - # 1 dimension is special, since the only unit vector is trivial; - # instead, use a slope between -1 and 1 - if self.dimension == 1: - return (random.uniform(-1, 1),) - - # Generate a random point on the surface of the unit n-hypersphere; - # this is the same as a random unit vector in n dimensions. Thanks - # to: http://mathworld.wolfram.com/SpherePointPicking.html - # Pick n normal random variables with stddev 1 - random_point = [random.gauss(0, 1) for _ in range(self.dimension)] - # Then scale the result to a unit vector - scale = sum(n * n for n in random_point) ** -0.5 - return tuple(coord * scale for coord in random_point) - - def get_plain_noise(self, *point) -> float: - """Get plain noise for a single point, without taking into account either octaves or tiling.""" - if len(point) != self.dimension: - raise ValueError( - f"Expected {self.dimension} values, got {len(point)}" - ) - - # Build a list of the (min, max) bounds in each dimension - grid_coords = [] - for coord in point: - min_coord = math.floor(coord) - max_coord = min_coord + 1 - grid_coords.append((min_coord, max_coord)) - - # Compute the dot product of each gradient vector and the point's - # distance from the corresponding grid point. This gives you each - # gradient's "influence" on the chosen point. - dots = [] - for grid_point in product(*grid_coords): - if grid_point not in self.gradient: - self.gradient[grid_point] = self._generate_gradient() - gradient = self.gradient[grid_point] - - dot = 0 - for i in range(self.dimension): - dot += gradient[i] * (point[i] - grid_point[i]) - dots.append(dot) - - # Interpolate all those dot products together. The interpolation is - # done with smoothstep to smooth out the slope as you pass from one - # grid cell into the next. - # Due to the way product() works, dot products are ordered such that - # the last dimension alternates: (..., min), (..., max), etc. So we - # can interpolate adjacent pairs to "collapse" that last dimension. Then - # the results will alternate in their second-to-last dimension, and so - # forth, until we only have a single value left. - dim = self.dimension - while len(dots) > 1: - dim -= 1 - s = smoothstep(point[dim] - grid_coords[dim][0]) - - next_dots = [] - while dots: - next_dots.append(lerp(s, dots.pop(0), dots.pop(0))) - - dots = next_dots - - return dots[0] * self.scale_factor - - def __call__(self, *point) -> float: - """ - Get the value of this Perlin noise function at the given point. - - The number of values given should match the number of dimensions. - """ - ret = 0 - for o in range(self.octaves): - o2 = 1 << o - new_point = [] - for i, coord in enumerate(point): - coord *= o2 - if self.tile[i]: - coord %= self.tile[i] * o2 - new_point.append(coord) - ret += self.get_plain_noise(*new_point) / o2 - - # Need to scale n back down since adding all those extra octaves has - # probably expanded it beyond ±1 - # 1 octave: ±1 - # 2 octaves: ±1½ - # 3 octaves: ±1¾ - ret /= 2 - 2 ** (1 - self.octaves) - - if self.unbias: - # The output of the plain Perlin noise algorithm has a fairly - # strong bias towards the center due to the central limit theorem - # -- in fact the top and bottom 1/8 virtually never happen. That's - # a quarter of our entire output range! If only we had a function - # in [0..1] that could introduce a bias towards the endpoints... - r = (ret + 1) / 2 - # Doing it this many times is a completely made-up heuristic. - for _ in range(int(self.octaves / 2 + 0.5)): - r = smoothstep(r) - ret = r * 2 - 1 - - return ret - - -def create_snek_frame( - perlin_factory: PerlinNoiseFactory, perlin_lookup_vertical_shift: float = 0, - image_dimensions: tuple[int, int] = DEFAULT_IMAGE_DIMENSIONS, - image_margins: tuple[int, int] = DEFAULT_IMAGE_MARGINS, - snake_length: int = DEFAULT_SNAKE_LENGTH, - snake_color: int = DEFAULT_SNAKE_COLOR, bg_color: int = DEFAULT_BACKGROUND_COLOR, - segment_length_range: tuple[int, int] = DEFAULT_SEGMENT_LENGTH_RANGE, snake_width: int = DEFAULT_SNAKE_WIDTH, - text: str = DEFAULT_TEXT, text_position: tuple[float, float] = DEFAULT_TEXT_POSITION, - text_color: int = DEFAULT_TEXT_COLOR -) -> Image.Image: - """ - Creates a single random snek frame using Perlin noise. - - `perlin_lookup_vertical_shift` represents the Perlin noise shift in the Y-dimension for this frame. - If `text` is given, display the given text with the snek. - """ - start_x = random.randint(image_margins[X], image_dimensions[X] - image_margins[X]) - start_y = random.randint(image_margins[Y], image_dimensions[Y] - image_margins[Y]) - points: list[tuple[float, float]] = [(start_x, start_y)] - - for index in range(0, snake_length): - angle = perlin_factory.get_plain_noise( - ((1 / (snake_length + 1)) * (index + 1)) + perlin_lookup_vertical_shift - ) * ANGLE_RANGE - current_point = points[index] - segment_length = random.randint(segment_length_range[0], segment_length_range[1]) - points.append(( - current_point[X] + segment_length * math.cos(angle), - current_point[Y] + segment_length * math.sin(angle) - )) - - # normalize bounds - min_dimensions: list[float] = [start_x, start_y] - max_dimensions: list[float] = [start_x, start_y] - for point in points: - min_dimensions[X] = min(point[X], min_dimensions[X]) - min_dimensions[Y] = min(point[Y], min_dimensions[Y]) - max_dimensions[X] = max(point[X], max_dimensions[X]) - max_dimensions[Y] = max(point[Y], max_dimensions[Y]) - - # shift towards middle - dimension_range = (max_dimensions[X] - min_dimensions[X], max_dimensions[Y] - min_dimensions[Y]) - shift = ( - image_dimensions[X] / 2 - (dimension_range[X] / 2 + min_dimensions[X]), - image_dimensions[Y] / 2 - (dimension_range[Y] / 2 + min_dimensions[Y]) - ) - - image = Image.new(mode="RGB", size=image_dimensions, color=bg_color) - draw = ImageDraw(image) - for index in range(1, len(points)): - point = points[index] - previous = points[index - 1] - draw.line( - ( - shift[X] + previous[X], - shift[Y] + previous[Y], - shift[X] + point[X], - shift[Y] + point[Y] - ), - width=snake_width, - fill=snake_color - ) - if text is not None: - draw.multiline_text(text_position, text, fill=text_color) - del draw - return image - - -def frame_to_png_bytes(image: Image) -> io.BytesIO: - """Convert image to byte stream.""" - stream = io.BytesIO() - image.save(stream, format="PNG") - stream.seek(0) - return stream - - -log = logging.getLogger(__name__) -START_EMOJI = "\u2611" # :ballot_box_with_check: - Start the game -CANCEL_EMOJI = "\u274C" # :x: - Cancel or leave the game -ROLL_EMOJI = "\U0001F3B2" # :game_die: - Roll the die! -JOIN_EMOJI = "\U0001F64B" # :raising_hand: - Join the game. -STARTUP_SCREEN_EMOJI = [ - JOIN_EMOJI, - START_EMOJI, - CANCEL_EMOJI -] -GAME_SCREEN_EMOJI = [ - ROLL_EMOJI, - CANCEL_EMOJI -] - - -class SnakeAndLaddersGame: - """Snakes and Ladders game Cog.""" - - def __init__(self, snakes: Cog, context: Context): - self.snakes = snakes - self.ctx = context - self.channel = self.ctx.channel - self.state = "booting" - self.started = False - self.author = self.ctx.author - self.players = [] - self.player_tiles = {} - self.round_has_rolled = {} - self.avatar_images = {} - self.board = None - self.positions = None - self.rolls = [] - - async def open_game(self) -> None: - """ - Create a new Snakes and Ladders game. - - Listen for reactions until players have joined, and the game has been started. - """ - def startup_event_check(reaction_: Reaction, user_: Member) -> bool: - """Make sure that this reaction is what we want to operate on.""" - return ( - all(( - reaction_.message.id == startup.id, # Reaction is on startup message - reaction_.emoji in STARTUP_SCREEN_EMOJI, # Reaction is one of the startup emotes - user_.id != self.ctx.bot.user.id, # Reaction was not made by the bot - )) - ) - - # Check to see if the bot can remove reactions - if not self.channel.permissions_for(self.ctx.guild.me).manage_messages: - log.warning( - "Unable to start Snakes and Ladders - " - f"Missing manage_messages permissions in {self.channel}" - ) - return - - await self._add_player(self.author) - await self.channel.send( - "**Snakes and Ladders**: A new game is about to start!", - file=File( - str(SNAKE_RESOURCES / "snakes_and_ladders" / "banner.jpg"), - filename="Snakes and Ladders.jpg" - ) - ) - startup = await self.channel.send( - f"Press {JOIN_EMOJI} to participate, and press " - f"{START_EMOJI} to start the game" - ) - for emoji in STARTUP_SCREEN_EMOJI: - await startup.add_reaction(emoji) - - self.state = "waiting" - - while not self.started: - try: - reaction, user = await self.ctx.bot.wait_for( - "reaction_add", - timeout=300, - check=startup_event_check - ) - if reaction.emoji == JOIN_EMOJI: - await self.player_join(user) - elif reaction.emoji == CANCEL_EMOJI: - if user == self.author or (self._is_moderator(user) and user not in self.players): - # Allow game author or non-playing moderation staff to cancel a waiting game - await self.cancel_game() - return - else: - await self.player_leave(user) - elif reaction.emoji == START_EMOJI: - if self.ctx.author == user: - self.started = True - await self.start_game(user) - await startup.delete() - break - - await startup.remove_reaction(reaction.emoji, user) - - except asyncio.TimeoutError: - log.debug("Snakes and Ladders timed out waiting for a reaction") - await self.cancel_game() - return # We're done, no reactions for the last 5 minutes - - async def _add_player(self, user: Member) -> None: - """Add player to game.""" - self.players.append(user) - self.player_tiles[user.id] = 1 - - avatar_bytes = await user.display_avatar.replace(size=PLAYER_ICON_IMAGE_SIZE).read() - im = Image.open(io.BytesIO(avatar_bytes)).resize((BOARD_PLAYER_SIZE, BOARD_PLAYER_SIZE)) - self.avatar_images[user.id] = im - - async def player_join(self, user: Member) -> None: - """ - Handle players joining the game. - - Prevent player joining if they have already joined, if the game is full, or if the game is - in a waiting state. - """ - for p in self.players: - if user == p: - await self.channel.send(user.mention + " You are already in the game.", delete_after=10) - return - if self.state != "waiting": - await self.channel.send(user.mention + " You cannot join at this time.", delete_after=10) - return - if len(self.players) is MAX_PLAYERS: - await self.channel.send(user.mention + " The game is full!", delete_after=10) - return - - await self._add_player(user) - - await self.channel.send( - f"**Snakes and Ladders**: {user.mention} has joined the game.\n" - f"There are now {str(len(self.players))} players in the game.", - delete_after=10 - ) - - async def player_leave(self, user: Member) -> bool: - """ - Handle players leaving the game. - - Leaving is prevented if the user wasn't part of the game. - - If the number of players reaches 0, the game is terminated. In this case, a sentinel boolean - is returned True to prevent a game from continuing after it's destroyed. - """ - is_surrendered = False # Sentinel value to assist with stopping a surrendered game - for p in self.players: - if user == p: - self.players.remove(p) - self.player_tiles.pop(p.id, None) - self.round_has_rolled.pop(p.id, None) - await self.channel.send( - "**Snakes and Ladders**: " + user.mention + " has left the game.", - delete_after=10 - ) - - if self.state != "waiting" and len(self.players) == 0: - await self.channel.send("**Snakes and Ladders**: The game has been surrendered!") - is_surrendered = True - self._destruct() - - return is_surrendered - else: - await self.channel.send(user.mention + " You are not in the match.", delete_after=10) - return is_surrendered - - async def cancel_game(self) -> None: - """Cancel the running game.""" - await self.channel.send("**Snakes and Ladders**: Game has been canceled.") - self._destruct() - - async def start_game(self, user: Member) -> None: - """ - Allow the game author to begin the game. - - The game cannot be started if the game is in a waiting state. - """ - if not user == self.author: - await self.channel.send(user.mention + " Only the author of the game can start it.", delete_after=10) - return - - if not self.state == "waiting": - await self.channel.send(user.mention + " The game cannot be started at this time.", delete_after=10) - return - - self.state = "starting" - player_list = ", ".join(user.mention for user in self.players) - await self.channel.send("**Snakes and Ladders**: The game is starting!\nPlayers: " + player_list) - await self.start_round() - - async def start_round(self) -> None: - """Begin the round.""" - def game_event_check(reaction_: Reaction, user_: Member) -> bool: - """Make sure that this reaction is what we want to operate on.""" - return ( - all(( - reaction_.message.id == self.positions.id, # Reaction is on positions message - reaction_.emoji in GAME_SCREEN_EMOJI, # Reaction is one of the game emotes - user_.id != self.ctx.bot.user.id, # Reaction was not made by the bot - )) - ) - - self.state = "roll" - for user in self.players: - self.round_has_rolled[user.id] = False - board_img = Image.open(SNAKE_RESOURCES / "snakes_and_ladders" / "board.jpg") - player_row_size = math.ceil(MAX_PLAYERS / 2) - - for i, player in enumerate(self.players): - tile = self.player_tiles[player.id] - tile_coordinates = self._board_coordinate_from_index(tile) - x_offset = BOARD_MARGIN[0] + tile_coordinates[0] * BOARD_TILE_SIZE - y_offset = \ - BOARD_MARGIN[1] + ( - (10 * BOARD_TILE_SIZE) - (9 - tile_coordinates[1]) * BOARD_TILE_SIZE - BOARD_PLAYER_SIZE) - x_offset += BOARD_PLAYER_SIZE * (i % player_row_size) - y_offset -= BOARD_PLAYER_SIZE * math.floor(i / player_row_size) - board_img.paste(self.avatar_images[player.id], - box=(x_offset, y_offset)) - - board_file = File(frame_to_png_bytes(board_img), filename="Board.jpg") - player_list = "\n".join((user.mention + ": Tile " + str(self.player_tiles[user.id])) for user in self.players) - - # Store and send new messages - temp_board = await self.channel.send( - "**Snakes and Ladders**: A new round has started! Current board:", - file=board_file - ) - temp_positions = await self.channel.send( - f"**Current positions**:\n{player_list}\n\nUse {ROLL_EMOJI} to roll the dice!" - ) - - # Delete the previous messages - if self.board and self.positions: - await self.board.delete() - await self.positions.delete() - - # remove the roll messages - for roll in self.rolls: - await roll.delete() - self.rolls = [] - - # Save new messages - self.board = temp_board - self.positions = temp_positions - - # Wait for rolls - for emoji in GAME_SCREEN_EMOJI: - await self.positions.add_reaction(emoji) - - is_surrendered = False - while True: - try: - reaction, user = await self.ctx.bot.wait_for( - "reaction_add", - timeout=300, - check=game_event_check - ) - - if reaction.emoji == ROLL_EMOJI: - await self.player_roll(user) - elif reaction.emoji == CANCEL_EMOJI: - if self._is_moderator(user) and user not in self.players: - # Only allow non-playing moderation staff to cancel a running game - await self.cancel_game() - return - else: - is_surrendered = await self.player_leave(user) - - await self.positions.remove_reaction(reaction.emoji, user) - - if self._check_all_rolled(): - break - - except asyncio.TimeoutError: - log.debug("Snakes and Ladders timed out waiting for a reaction") - await self.cancel_game() - return # We're done, no reactions for the last 5 minutes - - # Round completed - # Check to see if the game was surrendered before completing the round, without this - # sentinel, the game object would be deleted but the next round still posted into purgatory - if not is_surrendered: - await self._complete_round() - - async def player_roll(self, user: Member) -> None: - """Handle the player's roll.""" - if user.id not in self.player_tiles: - await self.channel.send(user.mention + " You are not in the match.", delete_after=10) - return - if self.state != "roll": - await self.channel.send(user.mention + " You may not roll at this time.", delete_after=10) - return - if self.round_has_rolled[user.id]: - return - roll = random.randint(1, 6) - self.rolls.append(await self.channel.send(f"{user.mention} rolled a **{roll}**!")) - next_tile = self.player_tiles[user.id] + roll - - # apply snakes and ladders - if next_tile in BOARD: - target = BOARD[next_tile] - if target < next_tile: - await self.channel.send( - f"{user.mention} slips on a snake and falls back to **{target}**", - delete_after=15 - ) - else: - await self.channel.send( - f"{user.mention} climbs a ladder to **{target}**", - delete_after=15 - ) - next_tile = target - - self.player_tiles[user.id] = min(100, next_tile) - self.round_has_rolled[user.id] = True - - async def _complete_round(self) -> None: - """At the conclusion of a round check to see if there's been a winner.""" - self.state = "post_round" - - # check for winner - winner = self._check_winner() - if winner is None: - # there is no winner, start the next round - await self.start_round() - return - - # announce winner and exit - await self.channel.send("**Snakes and Ladders**: " + winner.mention + " has won the game! :tada:") - self._destruct() - - def _check_winner(self) -> Member: - """Return a winning member if we're in the post-round state and there's a winner.""" - if self.state != "post_round": - return None - return next((player for player in self.players if self.player_tiles[player.id] == 100), - None) - - def _check_all_rolled(self) -> bool: - """Check if all members have made their roll.""" - return all(rolled for rolled in self.round_has_rolled.values()) - - def _destruct(self) -> None: - """Clean up the finished game object.""" - del self.snakes.active_sal[self.channel] - - def _board_coordinate_from_index(self, index: int) -> tuple[int, int]: - """Convert the tile number to the x/y coordinates for graphical purposes.""" - y_level = 9 - math.floor((index - 1) / 10) - is_reversed = math.floor((index - 1) / 10) % 2 != 0 - x_level = (index - 1) % 10 - if is_reversed: - x_level = 9 - x_level - return x_level, y_level - - @staticmethod - def _is_moderator(user: Member) -> bool: - """Return True if the user is a Moderator.""" - return any(Roles.moderator == role.id for role in user.roles) |