diff options
author | 2019-09-30 14:48:49 +0200 | |
---|---|---|
committer | 2019-09-30 14:48:49 +0200 | |
commit | 6120f9d90272cab33aa1da857dbfaeb1b5adbd9a (patch) | |
tree | 740c8bd789e24ca22913caad26bcd5897cd6d270 /bot/seasons/evergreen/snakes/utils.py | |
parent | Edit dates (diff) | |
parent | Merge pull request #276 from python-discord/update-flake8-annotations (diff) |
Merge branch 'master' into date-fix
Diffstat (limited to 'bot/seasons/evergreen/snakes/utils.py')
-rw-r--r-- | bot/seasons/evergreen/snakes/utils.py | 149 |
1 files changed, 71 insertions, 78 deletions
diff --git a/bot/seasons/evergreen/snakes/utils.py b/bot/seasons/evergreen/snakes/utils.py index 88fb2032..7d6caf04 100644 --- a/bot/seasons/evergreen/snakes/utils.py +++ b/bot/seasons/evergreen/snakes/utils.py @@ -11,7 +11,9 @@ from typing import List, Tuple from PIL import Image from PIL.ImageDraw import ImageDraw from discord import File, Member, Reaction -from discord.ext.commands import Context +from discord.ext.commands import Cog, Context + +from bot.constants import Roles SNAKE_RESOURCES = Path("bot/resources/snakes").absolute() @@ -116,12 +118,12 @@ def get_resource(file: str) -> List[dict]: return json.load(snakefile) -def smoothstep(t): +def smoothstep(t: float) -> float: """Smooth curve with a zero derivative at 0 and 1, making it useful for interpolating.""" return t * t * (3. - 2. * t) -def lerp(t, a, b): +def lerp(t: float, a: float, b: float) -> float: """Linear interpolation between a and b, given a fraction t.""" return a + t * (b - a) @@ -138,7 +140,7 @@ class PerlinNoiseFactory(object): Licensed under ISC """ - def __init__(self, dimension, octaves=1, tile=(), unbias=False): + def __init__(self, dimension: int, octaves: int = 1, tile: Tuple[int] = (), unbias: bool = False): """ Create a new Perlin noise factory in the given number of dimensions. @@ -152,7 +154,7 @@ class PerlinNoiseFactory(object): This will produce noise that tiles every 3 units vertically, but never tiles horizontally. - If ``unbias`` is true, the smoothstep function will be applied to the output before returning + If ``unbias`` is True, the smoothstep function will be applied to the output before returning it, to counteract some of Perlin noise's significant bias towards the center of its output range. """ self.dimension = dimension @@ -166,7 +168,7 @@ class PerlinNoiseFactory(object): self.gradient = {} - def _generate_gradient(self): + def _generate_gradient(self) -> Tuple[float, ...]: """ Generate a random unit vector at each grid point. @@ -186,7 +188,7 @@ class PerlinNoiseFactory(object): scale = sum(n * n for n in random_point) ** -0.5 return tuple(coord * scale for coord in random_point) - def get_plain_noise(self, *point): + def get_plain_noise(self, *point) -> float: """Get plain noise for a single point, without taking into account either octaves or tiling.""" if len(point) != self.dimension: raise ValueError("Expected {0} values, got {1}".format( @@ -234,7 +236,7 @@ class PerlinNoiseFactory(object): return dots[0] * self.scale_factor - def __call__(self, *point): + def __call__(self, *point) -> float: """ Get the value of this Perlin noise function at the given point. @@ -285,20 +287,8 @@ def create_snek_frame( """ Creates a single random snek frame using Perlin noise. - :param perlin_factory: the perlin noise factory used. Required. - :param perlin_lookup_vertical_shift: the Perlin noise shift in the Y-dimension for this frame - :param image_dimensions: the size of the output image. - :param image_margins: the margins to respect inside of the image. - :param snake_length: the length of the snake, in segments. - :param snake_color: the color of the snake. - :param bg_color: the background color. - :param segment_length_range: the range of the segment length. Values will be generated inside - this range, including the bounds. - :param snake_width: the width of the snek, in pixels. - :param text: the text to display with the snek. Set to None for no text. - :param text_position: the position of the text. - :param text_color: the color of the text. - :return: a PIL image, representing a single frame. + `perlin_lookup_vertical_shift` represents the Perlin noise shift in the Y-dimension for this frame. + If `text` is given, display the given text with the snek. """ start_x = random.randint(image_margins[X], image_dimensions[X] - image_margins[X]) start_y = random.randint(image_margins[Y], image_dimensions[Y] - image_margins[Y]) @@ -379,7 +369,7 @@ GAME_SCREEN_EMOJI = [ class SnakeAndLaddersGame: """Snakes and Ladders game Cog.""" - def __init__(self, snakes, context: Context): + def __init__(self, snakes: Cog, context: Context): self.snakes = snakes self.ctx = context self.channel = self.ctx.channel @@ -394,14 +384,13 @@ class SnakeAndLaddersGame: self.positions = None self.rolls = [] - async def open_game(self): + async def open_game(self) -> None: """ Create a new Snakes and Ladders game. - Listen for reactions until players have joined, - and the game has been started. + Listen for reactions until players have joined, and the game has been started. """ - def startup_event_check(reaction_: Reaction, user_: Member): + def startup_event_check(reaction_: Reaction, user_: Member) -> bool: """Make sure that this reaction is what we want to operate on.""" return ( all(( @@ -424,7 +413,6 @@ class SnakeAndLaddersGame: "**Snakes and Ladders**: A new game is about to start!", file=File( str(SNAKE_RESOURCES / "snakes_and_ladders" / "banner.jpg"), - # os.path.join("bot", "resources", "snakes", "snakes_and_ladders", "banner.jpg"), filename='Snakes and Ladders.jpg' ) ) @@ -447,8 +435,9 @@ class SnakeAndLaddersGame: if reaction.emoji == JOIN_EMOJI: await self.player_join(user) elif reaction.emoji == CANCEL_EMOJI: - if self.ctx.author == user: - await self.cancel_game(user) + if user == self.author or (self._is_moderator(user) and user not in self.players): + # Allow game author or non-playing moderation staff to cancel a waiting game + await self.cancel_game() return else: await self.player_leave(user) @@ -463,10 +452,11 @@ class SnakeAndLaddersGame: except asyncio.TimeoutError: log.debug("Snakes and Ladders timed out waiting for a reaction") - self.cancel_game(self.author) + await self.cancel_game() return # We're done, no reactions for the last 5 minutes - async def _add_player(self, user: Member): + async def _add_player(self, user: Member) -> None: + """Add player to game.""" self.players.append(user) self.player_tiles[user.id] = 1 @@ -474,7 +464,7 @@ class SnakeAndLaddersGame: im = Image.open(io.BytesIO(avatar_bytes)).resize((BOARD_PLAYER_SIZE, BOARD_PLAYER_SIZE)) self.avatar_images[user.id] = im - async def player_join(self, user: Member): + async def player_join(self, user: Member) -> None: """ Handle players joining the game. @@ -500,20 +490,16 @@ class SnakeAndLaddersGame: delete_after=10 ) - async def player_leave(self, user: Member): + async def player_leave(self, user: Member) -> bool: """ Handle players leaving the game. - Leaving is prevented if the user initiated the game or if they weren't part of it in the - first place. + Leaving is prevented if the user wasn't part of the game. + + If the number of players reaches 0, the game is terminated. In this case, a sentinel boolean + is returned True to prevent a game from continuing after it's destroyed. """ - if user == self.author: - await self.channel.send( - user.mention + " You are the author, and cannot leave the game. Execute " - "`sal cancel` to cancel the game.", - delete_after=10 - ) - return + is_surrendered = False # Sentinel value to assist with stopping a surrendered game for p in self.players: if user == p: self.players.remove(p) @@ -524,47 +510,43 @@ class SnakeAndLaddersGame: delete_after=10 ) - if self.state != 'waiting' and len(self.players) == 1: + if self.state != 'waiting' and len(self.players) == 0: await self.channel.send("**Snakes and Ladders**: The game has been surrendered!") + is_surrendered = True self._destruct() - return - await self.channel.send(user.mention + " You are not in the match.", delete_after=10) - async def cancel_game(self, user: Member): - """Allow the game author to cancel the running game.""" - if not user == self.author: - await self.channel.send(user.mention + " Only the author of the game can cancel it.", delete_after=10) - return + return is_surrendered + else: + await self.channel.send(user.mention + " You are not in the match.", delete_after=10) + return is_surrendered + + async def cancel_game(self) -> None: + """Cancel the running game.""" await self.channel.send("**Snakes and Ladders**: Game has been canceled.") self._destruct() - async def start_game(self, user: Member): + async def start_game(self, user: Member) -> None: """ Allow the game author to begin the game. - The game cannot be started if there aren't enough players joined or if the game is in a - waiting state. + The game cannot be started if the game is in a waiting state. """ if not user == self.author: await self.channel.send(user.mention + " Only the author of the game can start it.", delete_after=10) return - if len(self.players) < 1: - await self.channel.send( - user.mention + " A minimum of 2 players is required to start the game.", - delete_after=10 - ) - return + if not self.state == 'waiting': await self.channel.send(user.mention + " The game cannot be started at this time.", delete_after=10) return + self.state = 'starting' player_list = ', '.join(user.mention for user in self.players) await self.channel.send("**Snakes and Ladders**: The game is starting!\nPlayers: " + player_list) await self.start_round() - async def start_round(self): + async def start_round(self) -> None: """Begin the round.""" - def game_event_check(reaction_: Reaction, user_: Member): + def game_event_check(reaction_: Reaction, user_: Member) -> bool: """Make sure that this reaction is what we want to operate on.""" return ( all(( @@ -577,8 +559,6 @@ class SnakeAndLaddersGame: self.state = 'roll' for user in self.players: self.round_has_rolled[user.id] = False - # board_img = Image.open(os.path.join( - # "bot", "resources", "snakes", "snakes_and_ladders", "board.jpg")) board_img = Image.open(str(SNAKE_RESOURCES / "snakes_and_ladders" / "board.jpg")) player_row_size = math.ceil(MAX_PLAYERS / 2) @@ -593,9 +573,8 @@ class SnakeAndLaddersGame: y_offset -= BOARD_PLAYER_SIZE * math.floor(i / player_row_size) board_img.paste(self.avatar_images[player.id], box=(x_offset, y_offset)) - stream = io.BytesIO() - board_img.save(stream, format='JPEG') - board_file = File(stream.getvalue(), filename='Board.jpg') + + board_file = File(frame_to_png_bytes(board_img), filename='Board.jpg') player_list = '\n'.join((user.mention + ": Tile " + str(self.player_tiles[user.id])) for user in self.players) # Store and send new messages @@ -625,6 +604,7 @@ class SnakeAndLaddersGame: for emoji in GAME_SCREEN_EMOJI: await self.positions.add_reaction(emoji) + is_surrendered = False while True: try: reaction, user = await self.ctx.bot.wait_for( @@ -636,11 +616,12 @@ class SnakeAndLaddersGame: if reaction.emoji == ROLL_EMOJI: await self.player_roll(user) elif reaction.emoji == CANCEL_EMOJI: - if self.ctx.author == user: - await self.cancel_game(user) + if self._is_moderator(user) and user not in self.players: + # Only allow non-playing moderation staff to cancel a running game + await self.cancel_game() return else: - await self.player_leave(user) + is_surrendered = await self.player_leave(user) await self.positions.remove_reaction(reaction.emoji, user) @@ -649,13 +630,16 @@ class SnakeAndLaddersGame: except asyncio.TimeoutError: log.debug("Snakes and Ladders timed out waiting for a reaction") - await self.cancel_game(self.author) + await self.cancel_game() return # We're done, no reactions for the last 5 minutes # Round completed - await self._complete_round() + # Check to see if the game was surrendered before completing the round, without this + # sentinel, the game object would be deleted but the next round still posted into purgatory + if not is_surrendered: + await self._complete_round() - async def player_roll(self, user: Member): + async def player_roll(self, user: Member) -> None: """Handle the player's roll.""" if user.id not in self.player_tiles: await self.channel.send(user.mention + " You are not in the match.", delete_after=10) @@ -687,7 +671,8 @@ class SnakeAndLaddersGame: self.player_tiles[user.id] = min(100, next_tile) self.round_has_rolled[user.id] = True - async def _complete_round(self): + async def _complete_round(self) -> None: + """At the conclusion of a round check to see if there's been a winner.""" self.state = 'post_round' # check for winner @@ -702,22 +687,30 @@ class SnakeAndLaddersGame: self._destruct() def _check_winner(self) -> Member: + """Return a winning member if we're in the post-round state and there's a winner.""" if self.state != 'post_round': return None return next((player for player in self.players if self.player_tiles[player.id] == 100), None) - def _check_all_rolled(self): + def _check_all_rolled(self) -> bool: + """Check if all members have made their roll.""" return all(rolled for rolled in self.round_has_rolled.values()) - def _destruct(self): + def _destruct(self) -> None: + """Clean up the finished game object.""" del self.snakes.active_sal[self.channel] - def _board_coordinate_from_index(self, index: int): - # converts the tile number to the x/y coordinates for graphical purposes + def _board_coordinate_from_index(self, index: int) -> Tuple[int, int]: + """Convert the tile number to the x/y coordinates for graphical purposes.""" y_level = 9 - math.floor((index - 1) / 10) is_reversed = math.floor((index - 1) / 10) % 2 != 0 x_level = (index - 1) % 10 if is_reversed: x_level = 9 - x_level return x_level, y_level + + @staticmethod + def _is_moderator(user: Member) -> bool: + """Return True if the user is a Moderator.""" + return any(Roles.moderator == role.id for role in user.roles) |