diff options
Diffstat (limited to 'bot/seasons/evergreen/snakes/utils.py')
| -rw-r--r-- | bot/seasons/evergreen/snakes/utils.py | 172 | 
1 files changed, 74 insertions, 98 deletions
| diff --git a/bot/seasons/evergreen/snakes/utils.py b/bot/seasons/evergreen/snakes/utils.py index a7cb70a7..7d6caf04 100644 --- a/bot/seasons/evergreen/snakes/utils.py +++ b/bot/seasons/evergreen/snakes/utils.py @@ -11,9 +11,11 @@ from typing import List, Tuple  from PIL import Image  from PIL.ImageDraw import ImageDraw  from discord import File, Member, Reaction -from discord.ext.commands import Context +from discord.ext.commands import Cog, Context -SNAKE_RESOURCES = Path('bot', 'resources', 'snakes').absolute() +from bot.constants import Roles + +SNAKE_RESOURCES = Path("bot/resources/snakes").absolute()  h1 = r'''```          ---- @@ -112,20 +114,17 @@ ANGLE_RANGE = math.pi * 2  def get_resource(file: str) -> List[dict]:      """Load Snake resources JSON.""" -      with (SNAKE_RESOURCES / f"{file}.json").open(encoding="utf-8") as snakefile:          return json.load(snakefile) -def smoothstep(t): +def smoothstep(t: float) -> float:      """Smooth curve with a zero derivative at 0 and 1, making it useful for interpolating.""" -      return t * t * (3. - 2. * t) -def lerp(t, a, b): +def lerp(t: float, a: float, b: float) -> float:      """Linear interpolation between a and b, given a fraction t.""" -      return a + t * (b - a) @@ -141,7 +140,7 @@ class PerlinNoiseFactory(object):      Licensed under ISC      """ -    def __init__(self, dimension, octaves=1, tile=(), unbias=False): +    def __init__(self, dimension: int, octaves: int = 1, tile: Tuple[int] = (), unbias: bool = False):          """          Create a new Perlin noise factory in the given number of dimensions. @@ -155,10 +154,9 @@ class PerlinNoiseFactory(object):          This will produce noise that tiles every 3 units vertically, but never tiles horizontally. -        If ``unbias`` is true, the smoothstep function will be applied to the output before returning +        If ``unbias`` is True, the smoothstep function will be applied to the output before returning          it, to counteract some of Perlin noise's significant bias towards the center of its output range.          """ -          self.dimension = dimension          self.octaves = octaves          self.tile = tile + (0,) * dimension @@ -170,13 +168,12 @@ class PerlinNoiseFactory(object):          self.gradient = {} -    def _generate_gradient(self): +    def _generate_gradient(self) -> Tuple[float, ...]:          """          Generate a random unit vector at each grid point.          This is the "gradient" vector, in that the grid tile slopes towards it          """ -          # 1 dimension is special, since the only unit vector is trivial;          # instead, use a slope between -1 and 1          if self.dimension == 1: @@ -191,9 +188,8 @@ class PerlinNoiseFactory(object):          scale = sum(n * n for n in random_point) ** -0.5          return tuple(coord * scale for coord in random_point) -    def get_plain_noise(self, *point): +    def get_plain_noise(self, *point) -> float:          """Get plain noise for a single point, without taking into account either octaves or tiling.""" -          if len(point) != self.dimension:              raise ValueError("Expected {0} values, got {1}".format(                  self.dimension, len(point))) @@ -240,13 +236,12 @@ class PerlinNoiseFactory(object):          return dots[0] * self.scale_factor -    def __call__(self, *point): +    def __call__(self, *point) -> float:          """          Get the value of this Perlin noise function at the given point.          The number of values given should match the number of dimensions.          """ -          ret = 0          for o in range(self.octaves):              o2 = 1 << o @@ -292,22 +287,9 @@ def create_snek_frame(      """      Creates a single random snek frame using Perlin noise. -    :param perlin_factory: the perlin noise factory used. Required. -    :param perlin_lookup_vertical_shift: the Perlin noise shift in the Y-dimension for this frame -    :param image_dimensions: the size of the output image. -    :param image_margins: the margins to respect inside of the image. -    :param snake_length: the length of the snake, in segments. -    :param snake_color: the color of the snake. -    :param bg_color: the background color. -    :param segment_length_range: the range of the segment length. Values will be generated inside -                                 this range, including the bounds. -    :param snake_width: the width of the snek, in pixels. -    :param text: the text to display with the snek. Set to None for no text. -    :param text_position: the position of the text. -    :param text_color: the color of the text. -    :return: a PIL image, representing a single frame. +    `perlin_lookup_vertical_shift` represents the Perlin noise shift in the Y-dimension for this frame. +    If `text` is given, display the given text with the snek.      """ -      start_x = random.randint(image_margins[X], image_dimensions[X] - image_margins[X])      start_y = random.randint(image_margins[Y], image_dimensions[Y] - image_margins[Y])      points = [(start_x, start_y)] @@ -360,12 +342,12 @@ def create_snek_frame(      return image -def frame_to_png_bytes(image: Image): +def frame_to_png_bytes(image: Image) -> io.BytesIO:      """Convert image to byte stream.""" -      stream = io.BytesIO()      image.save(stream, format='PNG') -    return stream.getvalue() +    stream.seek(0) +    return stream  log = logging.getLogger(__name__) @@ -387,7 +369,7 @@ GAME_SCREEN_EMOJI = [  class SnakeAndLaddersGame:      """Snakes and Ladders game Cog.""" -    def __init__(self, snakes, context: Context): +    def __init__(self, snakes: Cog, context: Context):          self.snakes = snakes          self.ctx = context          self.channel = self.ctx.channel @@ -402,17 +384,14 @@ class SnakeAndLaddersGame:          self.positions = None          self.rolls = [] -    async def open_game(self): +    async def open_game(self) -> None:          """          Create a new Snakes and Ladders game. -        Listen for reactions until players have joined, -        and the game has been started. +        Listen for reactions until players have joined, and the game has been started.          """ - -        def startup_event_check(reaction_: Reaction, user_: Member): +        def startup_event_check(reaction_: Reaction, user_: Member) -> bool:              """Make sure that this reaction is what we want to operate on.""" -              return (                  all((                      reaction_.message.id == startup.id,       # Reaction is on startup message @@ -434,7 +413,6 @@ class SnakeAndLaddersGame:              "**Snakes and Ladders**: A new game is about to start!",              file=File(                  str(SNAKE_RESOURCES / "snakes_and_ladders" / "banner.jpg"), -                # os.path.join("bot", "resources", "snakes", "snakes_and_ladders", "banner.jpg"),                  filename='Snakes and Ladders.jpg'              )          ) @@ -457,8 +435,9 @@ class SnakeAndLaddersGame:                  if reaction.emoji == JOIN_EMOJI:                      await self.player_join(user)                  elif reaction.emoji == CANCEL_EMOJI: -                    if self.ctx.author == user: -                        await self.cancel_game(user) +                    if user == self.author or (self._is_moderator(user) and user not in self.players): +                        # Allow game author or non-playing moderation staff to cancel a waiting game +                        await self.cancel_game()                          return                      else:                          await self.player_leave(user) @@ -473,10 +452,11 @@ class SnakeAndLaddersGame:              except asyncio.TimeoutError:                  log.debug("Snakes and Ladders timed out waiting for a reaction") -                self.cancel_game(self.author) +                await self.cancel_game()                  return  # We're done, no reactions for the last 5 minutes -    async def _add_player(self, user: Member): +    async def _add_player(self, user: Member) -> None: +        """Add player to game."""          self.players.append(user)          self.player_tiles[user.id] = 1 @@ -484,14 +464,13 @@ class SnakeAndLaddersGame:          im = Image.open(io.BytesIO(avatar_bytes)).resize((BOARD_PLAYER_SIZE, BOARD_PLAYER_SIZE))          self.avatar_images[user.id] = im -    async def player_join(self, user: Member): +    async def player_join(self, user: Member) -> None:          """          Handle players joining the game.          Prevent player joining if they have already joined, if the game is full, or if the game is          in a waiting state.          """ -          for p in self.players:              if user == p:                  await self.channel.send(user.mention + " You are already in the game.", delete_after=10) @@ -511,21 +490,16 @@ class SnakeAndLaddersGame:              delete_after=10          ) -    async def player_leave(self, user: Member): +    async def player_leave(self, user: Member) -> bool:          """          Handle players leaving the game. -        Leaving is prevented if the user initiated the game or if they weren't part of it in the -        first place. -        """ +        Leaving is prevented if the user wasn't part of the game. -        if user == self.author: -            await self.channel.send( -                user.mention + " You are the author, and cannot leave the game. Execute " -                "`sal cancel` to cancel the game.", -                delete_after=10 -            ) -            return +        If the number of players reaches 0, the game is terminated. In this case, a sentinel boolean +        is returned True to prevent a game from continuing after it's destroyed. +        """ +        is_surrendered = False  # Sentinel value to assist with stopping a surrendered game          for p in self.players:              if user == p:                  self.players.remove(p) @@ -536,52 +510,44 @@ class SnakeAndLaddersGame:                      delete_after=10                  ) -                if self.state != 'waiting' and len(self.players) == 1: +                if self.state != 'waiting' and len(self.players) == 0:                      await self.channel.send("**Snakes and Ladders**: The game has been surrendered!") +                    is_surrendered = True                      self._destruct() -                return -        await self.channel.send(user.mention + " You are not in the match.", delete_after=10) -    async def cancel_game(self, user: Member): -        """Allow the game author to cancel the running game.""" +                return is_surrendered +        else: +            await self.channel.send(user.mention + " You are not in the match.", delete_after=10) +            return is_surrendered -        if not user == self.author: -            await self.channel.send(user.mention + " Only the author of the game can cancel it.", delete_after=10) -            return +    async def cancel_game(self) -> None: +        """Cancel the running game."""          await self.channel.send("**Snakes and Ladders**: Game has been canceled.")          self._destruct() -    async def start_game(self, user: Member): +    async def start_game(self, user: Member) -> None:          """          Allow the game author to begin the game. -        The game cannot be started if there aren't enough players joined or if the game is in a -        waiting state. +        The game cannot be started if the game is in a waiting state.          """ -          if not user == self.author:              await self.channel.send(user.mention + " Only the author of the game can start it.", delete_after=10)              return -        if len(self.players) < 1: -            await self.channel.send( -                user.mention + " A minimum of 2 players is required to start the game.", -                delete_after=10 -            ) -            return +          if not self.state == 'waiting':              await self.channel.send(user.mention + " The game cannot be started at this time.", delete_after=10)              return +          self.state = 'starting'          player_list = ', '.join(user.mention for user in self.players)          await self.channel.send("**Snakes and Ladders**: The game is starting!\nPlayers: " + player_list)          await self.start_round() -    async def start_round(self): +    async def start_round(self) -> None:          """Begin the round.""" - -        def game_event_check(reaction_: Reaction, user_: Member): +        def game_event_check(reaction_: Reaction, user_: Member) -> bool:              """Make sure that this reaction is what we want to operate on.""" -              return (                  all((                      reaction_.message.id == self.positions.id,  # Reaction is on positions message @@ -593,8 +559,6 @@ class SnakeAndLaddersGame:          self.state = 'roll'          for user in self.players:              self.round_has_rolled[user.id] = False -        # board_img = Image.open(os.path.join( -        #     "bot", "resources", "snakes", "snakes_and_ladders", "board.jpg"))          board_img = Image.open(str(SNAKE_RESOURCES / "snakes_and_ladders" / "board.jpg"))          player_row_size = math.ceil(MAX_PLAYERS / 2) @@ -609,9 +573,8 @@ class SnakeAndLaddersGame:              y_offset -= BOARD_PLAYER_SIZE * math.floor(i / player_row_size)              board_img.paste(self.avatar_images[player.id],                              box=(x_offset, y_offset)) -        stream = io.BytesIO() -        board_img.save(stream, format='JPEG') -        board_file = File(stream.getvalue(), filename='Board.jpg') + +        board_file = File(frame_to_png_bytes(board_img), filename='Board.jpg')          player_list = '\n'.join((user.mention + ": Tile " + str(self.player_tiles[user.id])) for user in self.players)          # Store and send new messages @@ -641,6 +604,7 @@ class SnakeAndLaddersGame:          for emoji in GAME_SCREEN_EMOJI:              await self.positions.add_reaction(emoji) +        is_surrendered = False          while True:              try:                  reaction, user = await self.ctx.bot.wait_for( @@ -652,11 +616,12 @@ class SnakeAndLaddersGame:                  if reaction.emoji == ROLL_EMOJI:                      await self.player_roll(user)                  elif reaction.emoji == CANCEL_EMOJI: -                    if self.ctx.author == user: -                        await self.cancel_game(user) +                    if self._is_moderator(user) and user not in self.players: +                        # Only allow non-playing moderation staff to cancel a running game +                        await self.cancel_game()                          return                      else: -                        await self.player_leave(user) +                        is_surrendered = await self.player_leave(user)                  await self.positions.remove_reaction(reaction.emoji, user) @@ -665,15 +630,17 @@ class SnakeAndLaddersGame:              except asyncio.TimeoutError:                  log.debug("Snakes and Ladders timed out waiting for a reaction") -                await self.cancel_game(self.author) +                await self.cancel_game()                  return  # We're done, no reactions for the last 5 minutes          # Round completed -        await self._complete_round() +        # Check to see if the game was surrendered before completing the round, without this +        # sentinel, the game object would be deleted but the next round still posted into purgatory +        if not is_surrendered: +            await self._complete_round() -    async def player_roll(self, user: Member): +    async def player_roll(self, user: Member) -> None:          """Handle the player's roll.""" -          if user.id not in self.player_tiles:              await self.channel.send(user.mention + " You are not in the match.", delete_after=10)              return @@ -704,7 +671,8 @@ class SnakeAndLaddersGame:          self.player_tiles[user.id] = min(100, next_tile)          self.round_has_rolled[user.id] = True -    async def _complete_round(self): +    async def _complete_round(self) -> None: +        """At the conclusion of a round check to see if there's been a winner."""          self.state = 'post_round'          # check for winner @@ -719,22 +687,30 @@ class SnakeAndLaddersGame:          self._destruct()      def _check_winner(self) -> Member: +        """Return a winning member if we're in the post-round state and there's a winner."""          if self.state != 'post_round':              return None          return next((player for player in self.players if self.player_tiles[player.id] == 100),                      None) -    def _check_all_rolled(self): +    def _check_all_rolled(self) -> bool: +        """Check if all members have made their roll."""          return all(rolled for rolled in self.round_has_rolled.values()) -    def _destruct(self): +    def _destruct(self) -> None: +        """Clean up the finished game object."""          del self.snakes.active_sal[self.channel] -    def _board_coordinate_from_index(self, index: int): -        # converts the tile number to the x/y coordinates for graphical purposes +    def _board_coordinate_from_index(self, index: int) -> Tuple[int, int]: +        """Convert the tile number to the x/y coordinates for graphical purposes."""          y_level = 9 - math.floor((index - 1) / 10)          is_reversed = math.floor((index - 1) / 10) % 2 != 0          x_level = (index - 1) % 10          if is_reversed:              x_level = 9 - x_level          return x_level, y_level + +    @staticmethod +    def _is_moderator(user: Member) -> bool: +        """Return True if the user is a Moderator.""" +        return any(Roles.moderator == role.id for role in user.roles) | 
