aboutsummaryrefslogtreecommitdiffstats
path: root/bot/seasons/evergreen/snakes/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'bot/seasons/evergreen/snakes/utils.py')
-rw-r--r--bot/seasons/evergreen/snakes/utils.py149
1 files changed, 71 insertions, 78 deletions
diff --git a/bot/seasons/evergreen/snakes/utils.py b/bot/seasons/evergreen/snakes/utils.py
index 88fb2032..7d6caf04 100644
--- a/bot/seasons/evergreen/snakes/utils.py
+++ b/bot/seasons/evergreen/snakes/utils.py
@@ -11,7 +11,9 @@ from typing import List, Tuple
from PIL import Image
from PIL.ImageDraw import ImageDraw
from discord import File, Member, Reaction
-from discord.ext.commands import Context
+from discord.ext.commands import Cog, Context
+
+from bot.constants import Roles
SNAKE_RESOURCES = Path("bot/resources/snakes").absolute()
@@ -116,12 +118,12 @@ def get_resource(file: str) -> List[dict]:
return json.load(snakefile)
-def smoothstep(t):
+def smoothstep(t: float) -> float:
"""Smooth curve with a zero derivative at 0 and 1, making it useful for interpolating."""
return t * t * (3. - 2. * t)
-def lerp(t, a, b):
+def lerp(t: float, a: float, b: float) -> float:
"""Linear interpolation between a and b, given a fraction t."""
return a + t * (b - a)
@@ -138,7 +140,7 @@ class PerlinNoiseFactory(object):
Licensed under ISC
"""
- def __init__(self, dimension, octaves=1, tile=(), unbias=False):
+ def __init__(self, dimension: int, octaves: int = 1, tile: Tuple[int] = (), unbias: bool = False):
"""
Create a new Perlin noise factory in the given number of dimensions.
@@ -152,7 +154,7 @@ class PerlinNoiseFactory(object):
This will produce noise that tiles every 3 units vertically, but never tiles horizontally.
- If ``unbias`` is true, the smoothstep function will be applied to the output before returning
+ If ``unbias`` is True, the smoothstep function will be applied to the output before returning
it, to counteract some of Perlin noise's significant bias towards the center of its output range.
"""
self.dimension = dimension
@@ -166,7 +168,7 @@ class PerlinNoiseFactory(object):
self.gradient = {}
- def _generate_gradient(self):
+ def _generate_gradient(self) -> Tuple[float, ...]:
"""
Generate a random unit vector at each grid point.
@@ -186,7 +188,7 @@ class PerlinNoiseFactory(object):
scale = sum(n * n for n in random_point) ** -0.5
return tuple(coord * scale for coord in random_point)
- def get_plain_noise(self, *point):
+ def get_plain_noise(self, *point) -> float:
"""Get plain noise for a single point, without taking into account either octaves or tiling."""
if len(point) != self.dimension:
raise ValueError("Expected {0} values, got {1}".format(
@@ -234,7 +236,7 @@ class PerlinNoiseFactory(object):
return dots[0] * self.scale_factor
- def __call__(self, *point):
+ def __call__(self, *point) -> float:
"""
Get the value of this Perlin noise function at the given point.
@@ -285,20 +287,8 @@ def create_snek_frame(
"""
Creates a single random snek frame using Perlin noise.
- :param perlin_factory: the perlin noise factory used. Required.
- :param perlin_lookup_vertical_shift: the Perlin noise shift in the Y-dimension for this frame
- :param image_dimensions: the size of the output image.
- :param image_margins: the margins to respect inside of the image.
- :param snake_length: the length of the snake, in segments.
- :param snake_color: the color of the snake.
- :param bg_color: the background color.
- :param segment_length_range: the range of the segment length. Values will be generated inside
- this range, including the bounds.
- :param snake_width: the width of the snek, in pixels.
- :param text: the text to display with the snek. Set to None for no text.
- :param text_position: the position of the text.
- :param text_color: the color of the text.
- :return: a PIL image, representing a single frame.
+ `perlin_lookup_vertical_shift` represents the Perlin noise shift in the Y-dimension for this frame.
+ If `text` is given, display the given text with the snek.
"""
start_x = random.randint(image_margins[X], image_dimensions[X] - image_margins[X])
start_y = random.randint(image_margins[Y], image_dimensions[Y] - image_margins[Y])
@@ -379,7 +369,7 @@ GAME_SCREEN_EMOJI = [
class SnakeAndLaddersGame:
"""Snakes and Ladders game Cog."""
- def __init__(self, snakes, context: Context):
+ def __init__(self, snakes: Cog, context: Context):
self.snakes = snakes
self.ctx = context
self.channel = self.ctx.channel
@@ -394,14 +384,13 @@ class SnakeAndLaddersGame:
self.positions = None
self.rolls = []
- async def open_game(self):
+ async def open_game(self) -> None:
"""
Create a new Snakes and Ladders game.
- Listen for reactions until players have joined,
- and the game has been started.
+ Listen for reactions until players have joined, and the game has been started.
"""
- def startup_event_check(reaction_: Reaction, user_: Member):
+ def startup_event_check(reaction_: Reaction, user_: Member) -> bool:
"""Make sure that this reaction is what we want to operate on."""
return (
all((
@@ -424,7 +413,6 @@ class SnakeAndLaddersGame:
"**Snakes and Ladders**: A new game is about to start!",
file=File(
str(SNAKE_RESOURCES / "snakes_and_ladders" / "banner.jpg"),
- # os.path.join("bot", "resources", "snakes", "snakes_and_ladders", "banner.jpg"),
filename='Snakes and Ladders.jpg'
)
)
@@ -447,8 +435,9 @@ class SnakeAndLaddersGame:
if reaction.emoji == JOIN_EMOJI:
await self.player_join(user)
elif reaction.emoji == CANCEL_EMOJI:
- if self.ctx.author == user:
- await self.cancel_game(user)
+ if user == self.author or (self._is_moderator(user) and user not in self.players):
+ # Allow game author or non-playing moderation staff to cancel a waiting game
+ await self.cancel_game()
return
else:
await self.player_leave(user)
@@ -463,10 +452,11 @@ class SnakeAndLaddersGame:
except asyncio.TimeoutError:
log.debug("Snakes and Ladders timed out waiting for a reaction")
- self.cancel_game(self.author)
+ await self.cancel_game()
return # We're done, no reactions for the last 5 minutes
- async def _add_player(self, user: Member):
+ async def _add_player(self, user: Member) -> None:
+ """Add player to game."""
self.players.append(user)
self.player_tiles[user.id] = 1
@@ -474,7 +464,7 @@ class SnakeAndLaddersGame:
im = Image.open(io.BytesIO(avatar_bytes)).resize((BOARD_PLAYER_SIZE, BOARD_PLAYER_SIZE))
self.avatar_images[user.id] = im
- async def player_join(self, user: Member):
+ async def player_join(self, user: Member) -> None:
"""
Handle players joining the game.
@@ -500,20 +490,16 @@ class SnakeAndLaddersGame:
delete_after=10
)
- async def player_leave(self, user: Member):
+ async def player_leave(self, user: Member) -> bool:
"""
Handle players leaving the game.
- Leaving is prevented if the user initiated the game or if they weren't part of it in the
- first place.
+ Leaving is prevented if the user wasn't part of the game.
+
+ If the number of players reaches 0, the game is terminated. In this case, a sentinel boolean
+ is returned True to prevent a game from continuing after it's destroyed.
"""
- if user == self.author:
- await self.channel.send(
- user.mention + " You are the author, and cannot leave the game. Execute "
- "`sal cancel` to cancel the game.",
- delete_after=10
- )
- return
+ is_surrendered = False # Sentinel value to assist with stopping a surrendered game
for p in self.players:
if user == p:
self.players.remove(p)
@@ -524,47 +510,43 @@ class SnakeAndLaddersGame:
delete_after=10
)
- if self.state != 'waiting' and len(self.players) == 1:
+ if self.state != 'waiting' and len(self.players) == 0:
await self.channel.send("**Snakes and Ladders**: The game has been surrendered!")
+ is_surrendered = True
self._destruct()
- return
- await self.channel.send(user.mention + " You are not in the match.", delete_after=10)
- async def cancel_game(self, user: Member):
- """Allow the game author to cancel the running game."""
- if not user == self.author:
- await self.channel.send(user.mention + " Only the author of the game can cancel it.", delete_after=10)
- return
+ return is_surrendered
+ else:
+ await self.channel.send(user.mention + " You are not in the match.", delete_after=10)
+ return is_surrendered
+
+ async def cancel_game(self) -> None:
+ """Cancel the running game."""
await self.channel.send("**Snakes and Ladders**: Game has been canceled.")
self._destruct()
- async def start_game(self, user: Member):
+ async def start_game(self, user: Member) -> None:
"""
Allow the game author to begin the game.
- The game cannot be started if there aren't enough players joined or if the game is in a
- waiting state.
+ The game cannot be started if the game is in a waiting state.
"""
if not user == self.author:
await self.channel.send(user.mention + " Only the author of the game can start it.", delete_after=10)
return
- if len(self.players) < 1:
- await self.channel.send(
- user.mention + " A minimum of 2 players is required to start the game.",
- delete_after=10
- )
- return
+
if not self.state == 'waiting':
await self.channel.send(user.mention + " The game cannot be started at this time.", delete_after=10)
return
+
self.state = 'starting'
player_list = ', '.join(user.mention for user in self.players)
await self.channel.send("**Snakes and Ladders**: The game is starting!\nPlayers: " + player_list)
await self.start_round()
- async def start_round(self):
+ async def start_round(self) -> None:
"""Begin the round."""
- def game_event_check(reaction_: Reaction, user_: Member):
+ def game_event_check(reaction_: Reaction, user_: Member) -> bool:
"""Make sure that this reaction is what we want to operate on."""
return (
all((
@@ -577,8 +559,6 @@ class SnakeAndLaddersGame:
self.state = 'roll'
for user in self.players:
self.round_has_rolled[user.id] = False
- # board_img = Image.open(os.path.join(
- # "bot", "resources", "snakes", "snakes_and_ladders", "board.jpg"))
board_img = Image.open(str(SNAKE_RESOURCES / "snakes_and_ladders" / "board.jpg"))
player_row_size = math.ceil(MAX_PLAYERS / 2)
@@ -593,9 +573,8 @@ class SnakeAndLaddersGame:
y_offset -= BOARD_PLAYER_SIZE * math.floor(i / player_row_size)
board_img.paste(self.avatar_images[player.id],
box=(x_offset, y_offset))
- stream = io.BytesIO()
- board_img.save(stream, format='JPEG')
- board_file = File(stream.getvalue(), filename='Board.jpg')
+
+ board_file = File(frame_to_png_bytes(board_img), filename='Board.jpg')
player_list = '\n'.join((user.mention + ": Tile " + str(self.player_tiles[user.id])) for user in self.players)
# Store and send new messages
@@ -625,6 +604,7 @@ class SnakeAndLaddersGame:
for emoji in GAME_SCREEN_EMOJI:
await self.positions.add_reaction(emoji)
+ is_surrendered = False
while True:
try:
reaction, user = await self.ctx.bot.wait_for(
@@ -636,11 +616,12 @@ class SnakeAndLaddersGame:
if reaction.emoji == ROLL_EMOJI:
await self.player_roll(user)
elif reaction.emoji == CANCEL_EMOJI:
- if self.ctx.author == user:
- await self.cancel_game(user)
+ if self._is_moderator(user) and user not in self.players:
+ # Only allow non-playing moderation staff to cancel a running game
+ await self.cancel_game()
return
else:
- await self.player_leave(user)
+ is_surrendered = await self.player_leave(user)
await self.positions.remove_reaction(reaction.emoji, user)
@@ -649,13 +630,16 @@ class SnakeAndLaddersGame:
except asyncio.TimeoutError:
log.debug("Snakes and Ladders timed out waiting for a reaction")
- await self.cancel_game(self.author)
+ await self.cancel_game()
return # We're done, no reactions for the last 5 minutes
# Round completed
- await self._complete_round()
+ # Check to see if the game was surrendered before completing the round, without this
+ # sentinel, the game object would be deleted but the next round still posted into purgatory
+ if not is_surrendered:
+ await self._complete_round()
- async def player_roll(self, user: Member):
+ async def player_roll(self, user: Member) -> None:
"""Handle the player's roll."""
if user.id not in self.player_tiles:
await self.channel.send(user.mention + " You are not in the match.", delete_after=10)
@@ -687,7 +671,8 @@ class SnakeAndLaddersGame:
self.player_tiles[user.id] = min(100, next_tile)
self.round_has_rolled[user.id] = True
- async def _complete_round(self):
+ async def _complete_round(self) -> None:
+ """At the conclusion of a round check to see if there's been a winner."""
self.state = 'post_round'
# check for winner
@@ -702,22 +687,30 @@ class SnakeAndLaddersGame:
self._destruct()
def _check_winner(self) -> Member:
+ """Return a winning member if we're in the post-round state and there's a winner."""
if self.state != 'post_round':
return None
return next((player for player in self.players if self.player_tiles[player.id] == 100),
None)
- def _check_all_rolled(self):
+ def _check_all_rolled(self) -> bool:
+ """Check if all members have made their roll."""
return all(rolled for rolled in self.round_has_rolled.values())
- def _destruct(self):
+ def _destruct(self) -> None:
+ """Clean up the finished game object."""
del self.snakes.active_sal[self.channel]
- def _board_coordinate_from_index(self, index: int):
- # converts the tile number to the x/y coordinates for graphical purposes
+ def _board_coordinate_from_index(self, index: int) -> Tuple[int, int]:
+ """Convert the tile number to the x/y coordinates for graphical purposes."""
y_level = 9 - math.floor((index - 1) / 10)
is_reversed = math.floor((index - 1) / 10) % 2 != 0
x_level = (index - 1) % 10
if is_reversed:
x_level = 9 - x_level
return x_level, y_level
+
+ @staticmethod
+ def _is_moderator(user: Member) -> bool:
+ """Return True if the user is a Moderator."""
+ return any(Roles.moderator == role.id for role in user.roles)