diff options
author | 2021-02-24 17:22:44 +0300 | |
---|---|---|
committer | 2021-02-24 17:22:44 +0300 | |
commit | 09c79fa15c200d8ba2ca61a4fbc5de218a3ca1ee (patch) | |
tree | 9b4fa77985fcee940ef1cbfce7bf2fb001edd138 | |
parent | Merge pull request #589 from python-discord/Reset-valentine-command-cooldown-... (diff) | |
parent | Correct the parentheses format (diff) |
Merge pull request #560 from Shivansh-007/feature/connect_four
Connect Four Game
-rw-r--r-- | Pipfile | 1 | ||||
-rw-r--r-- | Pipfile.lock | 21 | ||||
-rw-r--r-- | bot/constants.py | 5 | ||||
-rw-r--r-- | bot/exts/evergreen/connect_four.py | 450 | ||||
-rw-r--r-- | bot/exts/evergreen/error_handler.py | 9 |
5 files changed, 477 insertions, 9 deletions
@@ -14,6 +14,7 @@ sentry-sdk = "~=0.19" PyYAML = "~=5.3.1" "discord.py" = {extras = ["voice"], version = "~=1.5.1"} async-rediscache = {extras = ["fakeredis"], version = "~=0.1.4"} +emojis = "~=0.6.0" [dev-packages] flake8 = "~=3.8" diff --git a/Pipfile.lock b/Pipfile.lock index bd894ffa..ec801979 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "9be419062bd9db364ac9dddfcd50aef9c932384b45850363e482591fe7d12403" + "sha256": "b4aaaacbab13179145e36d7b86c736db512286f6cce8e513cc30c48d68fe3810" }, "pipfile-spec": 6, "requires": { @@ -161,6 +161,14 @@ "index": "pypi", "version": "==1.5.1" }, + "emojis": { + "hashes": [ + "sha256:7da34c8a78ae262fd68cef9e2c78a3c1feb59784489eeea0f54ba1d4b7111c7c", + "sha256:bf605d1f1a27a81cd37fe82eb65781c904467f569295a541c33710b97e4225ec" + ], + "index": "pypi", + "version": "==0.6.0" + }, "fakeredis": { "hashes": [ "sha256:01cb47d2286825a171fb49c0e445b1fa9307087e07cbb3d027ea10dbff108b6a", @@ -406,10 +414,11 @@ }, "sentry-sdk": { "hashes": [ - "sha256:3693cb47ba8d90c004ac002425770b32aaf0c83a846ec48e2d1364e7db1d072d" + "sha256:4ae8d1ced6c67f1c8ea51d82a16721c166c489b76876c9f2c202b8a50334b237", + "sha256:e75c8c58932bda8cd293ea8e4b242527129e1caaec91433d21b8b2f20fee030b" ], "index": "pypi", - "version": "==0.20.1" + "version": "==0.20.3" }, "six": { "hashes": [ @@ -576,11 +585,11 @@ }, "identify": { "hashes": [ - "sha256:70b638cf4743f33042bebb3b51e25261a0a10e80f978739f17e7fd4837664a66", - "sha256:9dfb63a2e871b807e3ba62f029813552a24b5289504f5b071dea9b041aee9fe4" + "sha256:de7129142a5c86d75a52b96f394d94d96d497881d2aaf8eafe320cdbe8ac4bcc", + "sha256:e0dae57c0397629ce13c289f6ddde0204edf518f557bfdb1e56474aa143e77c3" ], "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", - "version": "==1.5.13" + "version": "==1.5.14" }, "mccabe": { "hashes": [ diff --git a/bot/constants.py b/bot/constants.py index bb538487..682ccf6f 100644 --- a/bot/constants.py +++ b/bot/constants.py @@ -165,6 +165,7 @@ class Emojis: envelope = "\U0001F4E8" trashcan = "<:trashcan:637136429717389331>" ok_hand = ":ok_hand:" + hand_raised = "\U0001f64b" dice_1 = "<:dice_1:755891608859443290>" dice_2 = "<:dice_2:755891608741740635>" @@ -179,7 +180,6 @@ class Emojis: pull_request_closed = "<:PRClosed:629695470519713818>" merge = "<:PRMerged:629695470570176522>" - # TicTacToe Emojis number_emojis = { 1: "\u0031\ufe0f\u20e3", 2: "\u0032\ufe0f\u20e3", @@ -191,8 +191,11 @@ class Emojis: 8: "\u0038\ufe0f\u20e3", 9: "\u0039\ufe0f\u20e3" } + confirmation = "\u2705" decline = "\u274c" + incident_unactioned = "<:incident_unactioned:719645583245180960>" + x = "\U0001f1fd" o = "\U0001f1f4" diff --git a/bot/exts/evergreen/connect_four.py b/bot/exts/evergreen/connect_four.py new file mode 100644 index 00000000..7e3ec42b --- /dev/null +++ b/bot/exts/evergreen/connect_four.py @@ -0,0 +1,450 @@ +import asyncio +import random +import typing +from functools import partial + +import discord +import emojis +from discord.ext import commands +from discord.ext.commands import guild_only + +from bot.constants import Emojis + +NUMBERS = list(Emojis.number_emojis.values()) +CROSS_EMOJI = Emojis.incident_unactioned + +Coordinate = typing.Optional[typing.Tuple[int, int]] +EMOJI_CHECK = typing.Union[discord.Emoji, str] + + +class Game: + """A Connect 4 Game.""" + + def __init__( + self, + bot: commands.Bot, + channel: discord.TextChannel, + player1: discord.Member, + player2: typing.Optional[discord.Member], + tokens: typing.List[str], + size: int = 7 + ) -> None: + + self.bot = bot + self.channel = channel + self.player1 = player1 + self.player2 = player2 or AI(self.bot, game=self) + self.tokens = tokens + + self.grid = self.generate_board(size) + self.grid_size = size + + self.unicode_numbers = NUMBERS[:self.grid_size] + + self.message = None + + self.player_active = None + self.player_inactive = None + + @staticmethod + def generate_board(size: int) -> typing.List[typing.List[int]]: + """Generate the connect 4 board.""" + return [[0 for _ in range(size)] for _ in range(size)] + + async def print_grid(self) -> None: + """Formats and outputs the Connect Four grid to the channel.""" + title = ( + f'Connect 4: {self.player1.display_name}' + f' VS {self.bot.user.display_name if isinstance(self.player2, AI) else self.player2.display_name}' + ) + + rows = [" ".join(self.tokens[s] for s in row) for row in self.grid] + first_row = " ".join(x for x in NUMBERS[:self.grid_size]) + formatted_grid = "\n".join([first_row] + rows) + embed = discord.Embed(title=title, description=formatted_grid) + + if self.message: + await self.message.edit(embed=embed) + else: + self.message = await self.channel.send(content='Loading...') + for emoji in self.unicode_numbers: + await self.message.add_reaction(emoji) + await self.message.add_reaction(CROSS_EMOJI) + await self.message.edit(content=None, embed=embed) + + async def game_over(self, action: str, player1: discord.user, player2: discord.user) -> None: + """Announces to public chat.""" + if action == "win": + await self.channel.send(f"Game Over! {player1.mention} won against {player2.mention}") + elif action == "draw": + await self.channel.send(f"Game Over! {player1.mention} {player2.mention} It's A Draw :tada:") + elif action == "quit": + await self.channel.send(f"{self.player1.mention} surrendered. Game over!") + await self.print_grid() + + async def start_game(self) -> None: + """Begins the game.""" + self.player_active, self.player_inactive = self.player1, self.player2 + + while True: + await self.print_grid() + + if isinstance(self.player_active, AI): + coords = self.player_active.play() + if not coords: + await self.game_over( + "draw", + self.bot.user if isinstance(self.player_active, AI) else self.player_active, + self.bot.user if isinstance(self.player_inactive, AI) else self.player_inactive, + ) + else: + coords = await self.player_turn() + + if not coords: + return + + if self.check_win(coords, 1 if self.player_active == self.player1 else 2): + await self.game_over( + "win", + self.bot.user if isinstance(self.player_active, AI) else self.player_active, + self.bot.user if isinstance(self.player_inactive, AI) else self.player_inactive, + ) + return + + self.player_active, self.player_inactive = self.player_inactive, self.player_active + + def predicate(self, reaction: discord.Reaction, user: discord.Member) -> bool: + """The predicate to check for the player's reaction.""" + return ( + reaction.message.id == self.message.id + and user.id == self.player_active.id + and str(reaction.emoji) in (*self.unicode_numbers, CROSS_EMOJI) + ) + + async def player_turn(self) -> Coordinate: + """Initiate the player's turn.""" + message = await self.channel.send( + f"{self.player_active.mention}, it's your turn! React with the column you want to place your token in." + ) + player_num = 1 if self.player_active == self.player1 else 2 + while True: + try: + reaction, user = await self.bot.wait_for("reaction_add", check=self.predicate, timeout=30.0) + except asyncio.TimeoutError: + await self.channel.send(f"{self.player_active.mention}, you took too long. Game over!") + return + else: + await message.delete() + if str(reaction.emoji) == CROSS_EMOJI: + await self.game_over("quit", self.player_active, self.player_inactive) + return + + await self.message.remove_reaction(reaction, user) + + column_num = self.unicode_numbers.index(str(reaction.emoji)) + column = [row[column_num] for row in self.grid] + + for row_num, square in reversed(list(enumerate(column))): + if not square: + self.grid[row_num][column_num] = player_num + return row_num, column_num + message = await self.channel.send(f"Column {column_num + 1} is full. Try again") + + def check_win(self, coords: Coordinate, player_num: int) -> bool: + """Check that placing a counter here would cause the player to win.""" + vertical = [(-1, 0), (1, 0)] + horizontal = [(0, 1), (0, -1)] + forward_diag = [(-1, 1), (1, -1)] + backward_diag = [(-1, -1), (1, 1)] + axes = [vertical, horizontal, forward_diag, backward_diag] + + for axis in axes: + counters_in_a_row = 1 # The initial counter that is compared to + for (row_incr, column_incr) in axis: + row, column = coords + row += row_incr + column += column_incr + + while 0 <= row < self.grid_size and 0 <= column < self.grid_size: + if self.grid[row][column] == player_num: + counters_in_a_row += 1 + row += row_incr + column += column_incr + else: + break + if counters_in_a_row >= 4: + return True + return False + + +class AI: + """The Computer Player for Single-Player games.""" + + def __init__(self, bot: commands.Bot, game: Game) -> None: + self.game = game + self.mention = bot.user.mention + + def get_possible_places(self) -> typing.List[Coordinate]: + """Gets all the coordinates where the AI could possibly place a counter.""" + possible_coords = [] + for column_num in range(self.game.grid_size): + column = [row[column_num] for row in self.game.grid] + for row_num, square in reversed(list(enumerate(column))): + if not square: + possible_coords.append((row_num, column_num)) + break + return possible_coords + + def check_ai_win(self, coord_list: typing.List[Coordinate]) -> typing.Optional[Coordinate]: + """ + Check AI win. + + Check if placing a counter in any possible coordinate would cause the AI to win + with 10% chance of not winning and returning None + """ + if random.randint(1, 10) == 1: + return + for coords in coord_list: + if self.game.check_win(coords, 2): + return coords + + def check_player_win(self, coord_list: typing.List[Coordinate]) -> typing.Optional[Coordinate]: + """ + Check Player win. + + Check if placing a counter in possible coordinates would stop the player + from winning with 25% of not blocking them and returning None. + """ + if random.randint(1, 4) == 1: + return + for coords in coord_list: + if self.game.check_win(coords, 1): + return coords + + @staticmethod + def random_coords(coord_list: typing.List[Coordinate]) -> Coordinate: + """Picks a random coordinate from the possible ones.""" + return random.choice(coord_list) + + def play(self) -> typing.Union[Coordinate, bool]: + """ + Plays for the AI. + + Gets all possible coords, and determins the move: + 1. coords where it can win. + 2. coords where the player can win. + 3. Random coord + The first possible value is choosen. + """ + possible_coords = self.get_possible_places() + + if not possible_coords: + return False + + coords = ( + self.check_ai_win(possible_coords) + or self.check_player_win(possible_coords) + or self.random_coords(possible_coords) + ) + + row, column = coords + self.game.grid[row][column] = 2 + return coords + + +class ConnectFour(commands.Cog): + """Connect Four. The Classic Vertical Four-in-a-row Game!""" + + def __init__(self, bot: commands.Bot) -> None: + self.bot = bot + self.games: typing.List[Game] = [] + self.waiting: typing.List[discord.Member] = [] + + self.tokens = [":white_circle:", ":blue_circle:", ":red_circle:"] + + self.max_board_size = 9 + self.min_board_size = 5 + + async def check_author(self, ctx: commands.Context, board_size: int) -> bool: + """Check if the requester is free and the board size is correct.""" + if self.already_playing(ctx.author): + await ctx.send("You're already playing a game!") + return False + + if ctx.author in self.waiting: + await ctx.send("You've already sent out a request for a player 2") + return False + + if not self.min_board_size <= board_size <= self.max_board_size: + await ctx.send(f"{board_size} is not a valid board size. A valid board size is " + f"between `{self.min_board_size}` and `{self.max_board_size}`.") + return False + + return True + + def get_player( + self, + ctx: commands.Context, + announcement: discord.Message, + reaction: discord.Reaction, + user: discord.Member + ) -> bool: + """Predicate checking the criteria for the announcement message.""" + if self.already_playing(ctx.author): # If they've joined a game since requesting a player 2 + return True # Is dealt with later on + + if ( + user.id not in (ctx.me.id, ctx.author.id) + and str(reaction.emoji) == Emojis.hand_raised + and reaction.message.id == announcement.id + ): + if self.already_playing(user): + self.bot.loop.create_task(ctx.send(f"{user.mention} You're already playing a game!")) + self.bot.loop.create_task(announcement.remove_reaction(reaction, user)) + return False + + if user in self.waiting: + self.bot.loop.create_task(ctx.send( + f"{user.mention} Please cancel your game first before joining another one." + )) + self.bot.loop.create_task(announcement.remove_reaction(reaction, user)) + return False + + return True + + if ( + user.id == ctx.author.id + and str(reaction.emoji) == CROSS_EMOJI + and reaction.message.id == announcement.id + ): + return True + return False + + def already_playing(self, player: discord.Member) -> bool: + """Check if someone is already in a game.""" + return any(player in (game.player1, game.player2) for game in self.games) + + @staticmethod + def check_emojis( + e1: EMOJI_CHECK, e2: EMOJI_CHECK + ) -> typing.Tuple[bool, typing.Optional[str]]: + """Validate the emojis, the user put.""" + if isinstance(e1, str) and emojis.count(e1) != 1: + return False, e1 + if isinstance(e2, str) and emojis.count(e2) != 1: + return False, e2 + return True, None + + async def _play_game( + self, + ctx: commands.Context, + user: typing.Optional[discord.Member], + board_size: int, + emoji1: str, + emoji2: str + ) -> None: + """Helper for playing a game of connect four.""" + self.tokens = [":white_circle:", str(emoji1), str(emoji2)] + game = None # if game fails to intialize in try...except + + try: + game = Game(self.bot, ctx.channel, ctx.author, user, self.tokens, size=board_size) + self.games.append(game) + await game.start_game() + self.games.remove(game) + except Exception: + # End the game in the event of an unforeseen error so the players aren't stuck in a game + await ctx.send(f"{ctx.author.mention} {user.mention if user else ''} An error occurred. Game failed") + if game in self.games: + self.games.remove(game) + raise + + @guild_only() + @commands.group( + invoke_without_command=True, + aliases=["4inarow", "connect4", "connectfour", "c4"], + case_insensitive=True + ) + async def connect_four( + self, + ctx: commands.Context, + board_size: int = 7, + emoji1: EMOJI_CHECK = "\U0001f535", + emoji2: EMOJI_CHECK = "\U0001f534" + ) -> None: + """ + Play the classic game of Connect Four with someone! + + Sets up a message waiting for someone else to react and play along. + The game will start once someone has reacted. + All inputs will be through reactions. + """ + check, emoji = self.check_emojis(emoji1, emoji2) + if not check: + raise commands.EmojiNotFound(emoji) + + check_author_result = await self.check_author(ctx, board_size) + if not check_author_result: + return + + announcement = await ctx.send( + "**Connect Four**: A new game is about to start!\n" + f"Press {Emojis.hand_raised} to play against {ctx.author.mention}!\n" + f"(Cancel the game with {CROSS_EMOJI}.)" + ) + self.waiting.append(ctx.author) + await announcement.add_reaction(Emojis.hand_raised) + await announcement.add_reaction(CROSS_EMOJI) + + try: + reaction, user = await self.bot.wait_for( + "reaction_add", + check=partial(self.get_player, ctx, announcement), + timeout=60.0 + ) + except asyncio.TimeoutError: + self.waiting.remove(ctx.author) + await announcement.delete() + await ctx.send( + f"{ctx.author.mention} Seems like there's no one here to play. " + f"Use `{ctx.prefix}{ctx.invoked_with} ai` to play against a computer." + ) + return + + if str(reaction.emoji) == CROSS_EMOJI: + self.waiting.remove(ctx.author) + await announcement.delete() + await ctx.send(f"{ctx.author.mention} Game cancelled.") + return + + await announcement.delete() + self.waiting.remove(ctx.author) + if self.already_playing(ctx.author): + return + + await self._play_game(ctx, user, board_size, str(emoji1), str(emoji2)) + + @guild_only() + @connect_four.command(aliases=["bot", "computer", "cpu"]) + async def ai( + self, + ctx: commands.Context, + board_size: int = 7, + emoji1: EMOJI_CHECK = "\U0001f535", + emoji2: EMOJI_CHECK = "\U0001f534" + ) -> None: + """Play Connect Four against a computer player.""" + check, emoji = self.check_emojis(emoji1, emoji2) + if not check: + raise commands.EmojiNotFound(emoji) + + check_author_result = await self.check_author(ctx, board_size) + if not check_author_result: + return + + await self._play_game(ctx, None, board_size, str(emoji1), str(emoji2)) + + +def setup(bot: commands.Bot) -> None: + """Load ConnectFour Cog.""" + bot.add_cog(ConnectFour(bot)) diff --git a/bot/exts/evergreen/error_handler.py b/bot/exts/evergreen/error_handler.py index 99af1519..28902503 100644 --- a/bot/exts/evergreen/error_handler.py +++ b/bot/exts/evergreen/error_handler.py @@ -7,7 +7,7 @@ from discord import Embed, Message from discord.ext import commands from sentry_sdk import push_scope -from bot.constants import Colours, ERROR_REPLIES, NEGATIVE_REPLIES +from bot.constants import Channels, Colours, ERROR_REPLIES, NEGATIVE_REPLIES from bot.utils.decorators import InChannelCheckFailure, InMonthCheckFailure from bot.utils.exceptions import UserNotPlayingError @@ -83,7 +83,12 @@ class CommandErrorHandler(commands.Cog): return if isinstance(error, commands.NoPrivateMessage): - await ctx.send(embed=self.error_embed("This command can only be used in the server.", NEGATIVE_REPLIES)) + await ctx.send( + embed=self.error_embed( + f"This command can only be used in the server. Go to <#{Channels.community_bot_commands}> instead!", + NEGATIVE_REPLIES + ) + ) return if isinstance(error, commands.BadArgument): |