diff options
| author | 2020-02-06 21:40:40 +0000 | |
|---|---|---|
| committer | 2020-02-06 21:40:40 +0000 | |
| commit | 9e758d49a905b5e0c7ee8e722c331157888b7ba9 (patch) | |
| tree | 64e95c43444cd9b65bd8f939d9da4dc7736089f8 /bot/seasons/evergreen/snakes/utils.py | |
| parent | Post results and boards to initial channel (diff) | |
| parent | Update CODEOWNERS (diff) | |
Merge branch 'master' into battleships
Diffstat (limited to 'bot/seasons/evergreen/snakes/utils.py')
| -rw-r--r-- | bot/seasons/evergreen/snakes/utils.py | 149 | 
1 files changed, 71 insertions, 78 deletions
| diff --git a/bot/seasons/evergreen/snakes/utils.py b/bot/seasons/evergreen/snakes/utils.py index 88fb2032..7d6caf04 100644 --- a/bot/seasons/evergreen/snakes/utils.py +++ b/bot/seasons/evergreen/snakes/utils.py @@ -11,7 +11,9 @@ from typing import List, Tuple  from PIL import Image  from PIL.ImageDraw import ImageDraw  from discord import File, Member, Reaction -from discord.ext.commands import Context +from discord.ext.commands import Cog, Context + +from bot.constants import Roles  SNAKE_RESOURCES = Path("bot/resources/snakes").absolute() @@ -116,12 +118,12 @@ def get_resource(file: str) -> List[dict]:          return json.load(snakefile) -def smoothstep(t): +def smoothstep(t: float) -> float:      """Smooth curve with a zero derivative at 0 and 1, making it useful for interpolating."""      return t * t * (3. - 2. * t) -def lerp(t, a, b): +def lerp(t: float, a: float, b: float) -> float:      """Linear interpolation between a and b, given a fraction t."""      return a + t * (b - a) @@ -138,7 +140,7 @@ class PerlinNoiseFactory(object):      Licensed under ISC      """ -    def __init__(self, dimension, octaves=1, tile=(), unbias=False): +    def __init__(self, dimension: int, octaves: int = 1, tile: Tuple[int] = (), unbias: bool = False):          """          Create a new Perlin noise factory in the given number of dimensions. @@ -152,7 +154,7 @@ class PerlinNoiseFactory(object):          This will produce noise that tiles every 3 units vertically, but never tiles horizontally. -        If ``unbias`` is true, the smoothstep function will be applied to the output before returning +        If ``unbias`` is True, the smoothstep function will be applied to the output before returning          it, to counteract some of Perlin noise's significant bias towards the center of its output range.          """          self.dimension = dimension @@ -166,7 +168,7 @@ class PerlinNoiseFactory(object):          self.gradient = {} -    def _generate_gradient(self): +    def _generate_gradient(self) -> Tuple[float, ...]:          """          Generate a random unit vector at each grid point. @@ -186,7 +188,7 @@ class PerlinNoiseFactory(object):          scale = sum(n * n for n in random_point) ** -0.5          return tuple(coord * scale for coord in random_point) -    def get_plain_noise(self, *point): +    def get_plain_noise(self, *point) -> float:          """Get plain noise for a single point, without taking into account either octaves or tiling."""          if len(point) != self.dimension:              raise ValueError("Expected {0} values, got {1}".format( @@ -234,7 +236,7 @@ class PerlinNoiseFactory(object):          return dots[0] * self.scale_factor -    def __call__(self, *point): +    def __call__(self, *point) -> float:          """          Get the value of this Perlin noise function at the given point. @@ -285,20 +287,8 @@ def create_snek_frame(      """      Creates a single random snek frame using Perlin noise. -    :param perlin_factory: the perlin noise factory used. Required. -    :param perlin_lookup_vertical_shift: the Perlin noise shift in the Y-dimension for this frame -    :param image_dimensions: the size of the output image. -    :param image_margins: the margins to respect inside of the image. -    :param snake_length: the length of the snake, in segments. -    :param snake_color: the color of the snake. -    :param bg_color: the background color. -    :param segment_length_range: the range of the segment length. Values will be generated inside -                                 this range, including the bounds. -    :param snake_width: the width of the snek, in pixels. -    :param text: the text to display with the snek. Set to None for no text. -    :param text_position: the position of the text. -    :param text_color: the color of the text. -    :return: a PIL image, representing a single frame. +    `perlin_lookup_vertical_shift` represents the Perlin noise shift in the Y-dimension for this frame. +    If `text` is given, display the given text with the snek.      """      start_x = random.randint(image_margins[X], image_dimensions[X] - image_margins[X])      start_y = random.randint(image_margins[Y], image_dimensions[Y] - image_margins[Y]) @@ -379,7 +369,7 @@ GAME_SCREEN_EMOJI = [  class SnakeAndLaddersGame:      """Snakes and Ladders game Cog.""" -    def __init__(self, snakes, context: Context): +    def __init__(self, snakes: Cog, context: Context):          self.snakes = snakes          self.ctx = context          self.channel = self.ctx.channel @@ -394,14 +384,13 @@ class SnakeAndLaddersGame:          self.positions = None          self.rolls = [] -    async def open_game(self): +    async def open_game(self) -> None:          """          Create a new Snakes and Ladders game. -        Listen for reactions until players have joined, -        and the game has been started. +        Listen for reactions until players have joined, and the game has been started.          """ -        def startup_event_check(reaction_: Reaction, user_: Member): +        def startup_event_check(reaction_: Reaction, user_: Member) -> bool:              """Make sure that this reaction is what we want to operate on."""              return (                  all(( @@ -424,7 +413,6 @@ class SnakeAndLaddersGame:              "**Snakes and Ladders**: A new game is about to start!",              file=File(                  str(SNAKE_RESOURCES / "snakes_and_ladders" / "banner.jpg"), -                # os.path.join("bot", "resources", "snakes", "snakes_and_ladders", "banner.jpg"),                  filename='Snakes and Ladders.jpg'              )          ) @@ -447,8 +435,9 @@ class SnakeAndLaddersGame:                  if reaction.emoji == JOIN_EMOJI:                      await self.player_join(user)                  elif reaction.emoji == CANCEL_EMOJI: -                    if self.ctx.author == user: -                        await self.cancel_game(user) +                    if user == self.author or (self._is_moderator(user) and user not in self.players): +                        # Allow game author or non-playing moderation staff to cancel a waiting game +                        await self.cancel_game()                          return                      else:                          await self.player_leave(user) @@ -463,10 +452,11 @@ class SnakeAndLaddersGame:              except asyncio.TimeoutError:                  log.debug("Snakes and Ladders timed out waiting for a reaction") -                self.cancel_game(self.author) +                await self.cancel_game()                  return  # We're done, no reactions for the last 5 minutes -    async def _add_player(self, user: Member): +    async def _add_player(self, user: Member) -> None: +        """Add player to game."""          self.players.append(user)          self.player_tiles[user.id] = 1 @@ -474,7 +464,7 @@ class SnakeAndLaddersGame:          im = Image.open(io.BytesIO(avatar_bytes)).resize((BOARD_PLAYER_SIZE, BOARD_PLAYER_SIZE))          self.avatar_images[user.id] = im -    async def player_join(self, user: Member): +    async def player_join(self, user: Member) -> None:          """          Handle players joining the game. @@ -500,20 +490,16 @@ class SnakeAndLaddersGame:              delete_after=10          ) -    async def player_leave(self, user: Member): +    async def player_leave(self, user: Member) -> bool:          """          Handle players leaving the game. -        Leaving is prevented if the user initiated the game or if they weren't part of it in the -        first place. +        Leaving is prevented if the user wasn't part of the game. + +        If the number of players reaches 0, the game is terminated. In this case, a sentinel boolean +        is returned True to prevent a game from continuing after it's destroyed.          """ -        if user == self.author: -            await self.channel.send( -                user.mention + " You are the author, and cannot leave the game. Execute " -                "`sal cancel` to cancel the game.", -                delete_after=10 -            ) -            return +        is_surrendered = False  # Sentinel value to assist with stopping a surrendered game          for p in self.players:              if user == p:                  self.players.remove(p) @@ -524,47 +510,43 @@ class SnakeAndLaddersGame:                      delete_after=10                  ) -                if self.state != 'waiting' and len(self.players) == 1: +                if self.state != 'waiting' and len(self.players) == 0:                      await self.channel.send("**Snakes and Ladders**: The game has been surrendered!") +                    is_surrendered = True                      self._destruct() -                return -        await self.channel.send(user.mention + " You are not in the match.", delete_after=10) -    async def cancel_game(self, user: Member): -        """Allow the game author to cancel the running game.""" -        if not user == self.author: -            await self.channel.send(user.mention + " Only the author of the game can cancel it.", delete_after=10) -            return +                return is_surrendered +        else: +            await self.channel.send(user.mention + " You are not in the match.", delete_after=10) +            return is_surrendered + +    async def cancel_game(self) -> None: +        """Cancel the running game."""          await self.channel.send("**Snakes and Ladders**: Game has been canceled.")          self._destruct() -    async def start_game(self, user: Member): +    async def start_game(self, user: Member) -> None:          """          Allow the game author to begin the game. -        The game cannot be started if there aren't enough players joined or if the game is in a -        waiting state. +        The game cannot be started if the game is in a waiting state.          """          if not user == self.author:              await self.channel.send(user.mention + " Only the author of the game can start it.", delete_after=10)              return -        if len(self.players) < 1: -            await self.channel.send( -                user.mention + " A minimum of 2 players is required to start the game.", -                delete_after=10 -            ) -            return +          if not self.state == 'waiting':              await self.channel.send(user.mention + " The game cannot be started at this time.", delete_after=10)              return +          self.state = 'starting'          player_list = ', '.join(user.mention for user in self.players)          await self.channel.send("**Snakes and Ladders**: The game is starting!\nPlayers: " + player_list)          await self.start_round() -    async def start_round(self): +    async def start_round(self) -> None:          """Begin the round.""" -        def game_event_check(reaction_: Reaction, user_: Member): +        def game_event_check(reaction_: Reaction, user_: Member) -> bool:              """Make sure that this reaction is what we want to operate on."""              return (                  all(( @@ -577,8 +559,6 @@ class SnakeAndLaddersGame:          self.state = 'roll'          for user in self.players:              self.round_has_rolled[user.id] = False -        # board_img = Image.open(os.path.join( -        #     "bot", "resources", "snakes", "snakes_and_ladders", "board.jpg"))          board_img = Image.open(str(SNAKE_RESOURCES / "snakes_and_ladders" / "board.jpg"))          player_row_size = math.ceil(MAX_PLAYERS / 2) @@ -593,9 +573,8 @@ class SnakeAndLaddersGame:              y_offset -= BOARD_PLAYER_SIZE * math.floor(i / player_row_size)              board_img.paste(self.avatar_images[player.id],                              box=(x_offset, y_offset)) -        stream = io.BytesIO() -        board_img.save(stream, format='JPEG') -        board_file = File(stream.getvalue(), filename='Board.jpg') + +        board_file = File(frame_to_png_bytes(board_img), filename='Board.jpg')          player_list = '\n'.join((user.mention + ": Tile " + str(self.player_tiles[user.id])) for user in self.players)          # Store and send new messages @@ -625,6 +604,7 @@ class SnakeAndLaddersGame:          for emoji in GAME_SCREEN_EMOJI:              await self.positions.add_reaction(emoji) +        is_surrendered = False          while True:              try:                  reaction, user = await self.ctx.bot.wait_for( @@ -636,11 +616,12 @@ class SnakeAndLaddersGame:                  if reaction.emoji == ROLL_EMOJI:                      await self.player_roll(user)                  elif reaction.emoji == CANCEL_EMOJI: -                    if self.ctx.author == user: -                        await self.cancel_game(user) +                    if self._is_moderator(user) and user not in self.players: +                        # Only allow non-playing moderation staff to cancel a running game +                        await self.cancel_game()                          return                      else: -                        await self.player_leave(user) +                        is_surrendered = await self.player_leave(user)                  await self.positions.remove_reaction(reaction.emoji, user) @@ -649,13 +630,16 @@ class SnakeAndLaddersGame:              except asyncio.TimeoutError:                  log.debug("Snakes and Ladders timed out waiting for a reaction") -                await self.cancel_game(self.author) +                await self.cancel_game()                  return  # We're done, no reactions for the last 5 minutes          # Round completed -        await self._complete_round() +        # Check to see if the game was surrendered before completing the round, without this +        # sentinel, the game object would be deleted but the next round still posted into purgatory +        if not is_surrendered: +            await self._complete_round() -    async def player_roll(self, user: Member): +    async def player_roll(self, user: Member) -> None:          """Handle the player's roll."""          if user.id not in self.player_tiles:              await self.channel.send(user.mention + " You are not in the match.", delete_after=10) @@ -687,7 +671,8 @@ class SnakeAndLaddersGame:          self.player_tiles[user.id] = min(100, next_tile)          self.round_has_rolled[user.id] = True -    async def _complete_round(self): +    async def _complete_round(self) -> None: +        """At the conclusion of a round check to see if there's been a winner."""          self.state = 'post_round'          # check for winner @@ -702,22 +687,30 @@ class SnakeAndLaddersGame:          self._destruct()      def _check_winner(self) -> Member: +        """Return a winning member if we're in the post-round state and there's a winner."""          if self.state != 'post_round':              return None          return next((player for player in self.players if self.player_tiles[player.id] == 100),                      None) -    def _check_all_rolled(self): +    def _check_all_rolled(self) -> bool: +        """Check if all members have made their roll."""          return all(rolled for rolled in self.round_has_rolled.values()) -    def _destruct(self): +    def _destruct(self) -> None: +        """Clean up the finished game object."""          del self.snakes.active_sal[self.channel] -    def _board_coordinate_from_index(self, index: int): -        # converts the tile number to the x/y coordinates for graphical purposes +    def _board_coordinate_from_index(self, index: int) -> Tuple[int, int]: +        """Convert the tile number to the x/y coordinates for graphical purposes."""          y_level = 9 - math.floor((index - 1) / 10)          is_reversed = math.floor((index - 1) / 10) % 2 != 0          x_level = (index - 1) % 10          if is_reversed:              x_level = 9 - x_level          return x_level, y_level + +    @staticmethod +    def _is_moderator(user: Member) -> bool: +        """Return True if the user is a Moderator.""" +        return any(Roles.moderator == role.id for role in user.roles) | 
