diff options
Diffstat (limited to 'bot/exts/evergreen')
| -rw-r--r-- | bot/exts/evergreen/battleship.py | 2 | ||||
| -rw-r--r-- | bot/exts/evergreen/cheatsheet.py | 113 | ||||
| -rw-r--r-- | bot/exts/evergreen/game.py | 83 | ||||
| -rw-r--r-- | bot/exts/evergreen/issues.py | 160 | ||||
| -rw-r--r-- | bot/exts/evergreen/status_cats.py | 33 | ||||
| -rw-r--r-- | bot/exts/evergreen/status_codes.py | 71 | ||||
| -rw-r--r-- | bot/exts/evergreen/tic_tac_toe.py | 323 | ||||
| -rw-r--r-- | bot/exts/evergreen/xkcd.py | 89 |
8 files changed, 796 insertions, 78 deletions
diff --git a/bot/exts/evergreen/battleship.py b/bot/exts/evergreen/battleship.py index 9bc374e6..fa3fb35c 100644 --- a/bot/exts/evergreen/battleship.py +++ b/bot/exts/evergreen/battleship.py @@ -140,7 +140,7 @@ class Game: @staticmethod def get_square(grid: Grid, square: str) -> Square: """Grabs a square from a grid with an inputted key.""" - index = ord(square[0]) - ord("A") + index = ord(square[0].upper()) - ord("A") number = int(square[1:]) return grid[number-1][index] # -1 since lists are indexed from 0 diff --git a/bot/exts/evergreen/cheatsheet.py b/bot/exts/evergreen/cheatsheet.py new file mode 100644 index 00000000..a64ddd69 --- /dev/null +++ b/bot/exts/evergreen/cheatsheet.py @@ -0,0 +1,113 @@ +import random +import re +import typing as t +from urllib.parse import quote_plus + +from discord import Embed +from discord.ext import commands +from discord.ext.commands import BucketType, Context + +from bot import constants +from bot.constants import Categories, Channels, Colours, ERROR_REPLIES, Roles, WHITELISTED_CHANNELS +from bot.utils.decorators import with_role + +ERROR_MESSAGE = f""" +Unknown cheat sheet. Please try to reformulate your query. + +**Examples**: +```md +{constants.Client.prefix}cht read json +{constants.Client.prefix}cht hello world +{constants.Client.prefix}cht lambda +``` +If the problem persists send a message in <#{Channels.dev_contrib}> +""" + +URL = 'https://cheat.sh/python/{search}' +ESCAPE_TT = str.maketrans({"`": "\\`"}) +ANSI_RE = re.compile(r"\x1b\[.*?m") +# We need to pass headers as curl otherwise it would default to aiohttp which would return raw html. +HEADERS = {'User-Agent': 'curl/7.68.0'} + + +class CheatSheet(commands.Cog): + """Commands that sends a result of a cht.sh search in code blocks.""" + + def __init__(self, bot: commands.Bot): + self.bot = bot + + @staticmethod + def fmt_error_embed() -> Embed: + """ + Format the Error Embed. + + If the cht.sh search returned 404, overwrite it to send a custom error embed. + link -> https://github.com/chubin/cheat.sh/issues/198 + """ + embed = Embed( + title=random.choice(ERROR_REPLIES), + description=ERROR_MESSAGE, + colour=Colours.soft_red + ) + return embed + + def result_fmt(self, url: str, body_text: str) -> t.Tuple[bool, t.Union[str, Embed]]: + """Format Result.""" + if body_text.startswith("# 404 NOT FOUND"): + embed = self.fmt_error_embed() + return True, embed + + body_space = min(1986 - len(url), 1000) + + if len(body_text) > body_space: + description = (f"**Result Of cht.sh**\n" + f"```python\n{body_text[:body_space]}\n" + f"... (truncated - too many lines)```\n" + f"Full results: {url} ") + else: + description = (f"**Result Of cht.sh**\n" + f"```python\n{body_text}```\n" + f"{url}") + return False, description + + @commands.command( + name="cheat", + aliases=("cht.sh", "cheatsheet", "cheat-sheet", "cht"), + ) + @commands.cooldown(1, 10, BucketType.user) + @with_role(Roles.everyone_role) + async def cheat_sheet(self, ctx: Context, *search_terms: str) -> None: + """ + Search cheat.sh. + + Gets a post from https://cheat.sh/python/ by default. + Usage: + --> .cht read json + """ + if not ( + ctx.channel.category.id == Categories.help_in_use + or ctx.channel.id in WHITELISTED_CHANNELS + ): + return + + async with ctx.typing(): + search_string = quote_plus(" ".join(search_terms)) + + async with self.bot.http_session.get( + URL.format(search=search_string), headers=HEADERS + ) as response: + result = ANSI_RE.sub("", await response.text()).translate(ESCAPE_TT) + + is_embed, description = self.result_fmt( + URL.format(search=search_string), + result + ) + if is_embed: + await ctx.send(embed=description) + else: + await ctx.send(content=description) + + +def setup(bot: commands.Bot) -> None: + """Load the CheatSheet cog.""" + bot.add_cog(CheatSheet(bot)) diff --git a/bot/exts/evergreen/game.py b/bot/exts/evergreen/game.py index d0fd7a40..d37be0e2 100644 --- a/bot/exts/evergreen/game.py +++ b/bot/exts/evergreen/game.py @@ -2,7 +2,8 @@ import difflib import logging import random import re -from datetime import datetime as dt +from asyncio import sleep +from datetime import datetime as dt, timedelta from enum import IntEnum from typing import Any, Dict, List, Optional, Tuple @@ -17,10 +18,25 @@ from bot.utils.decorators import with_role from bot.utils.pagination import ImagePaginator, LinePaginator # Base URL of IGDB API -BASE_URL = "https://api-v3.igdb.com" +BASE_URL = "https://api.igdb.com/v4" -HEADERS = { - "user-key": Tokens.igdb, +CLIENT_ID = Tokens.igdb_client_id +CLIENT_SECRET = Tokens.igdb_client_secret + +# The number of seconds before expiry that we attempt to re-fetch a new access token +ACCESS_TOKEN_RENEWAL_WINDOW = 60*60*24*2 + +# URL to request API access token +OAUTH_URL = "https://id.twitch.tv/oauth2/token" + +OAUTH_PARAMS = { + "client_id": CLIENT_ID, + "client_secret": CLIENT_SECRET, + "grant_type": "client_credentials" +} + +BASE_HEADERS = { + "Client-ID": CLIENT_ID, "Accept": "application/json" } @@ -135,8 +151,47 @@ class Games(Cog): self.http_session: ClientSession = bot.http_session self.genres: Dict[str, int] = {} - - self.refresh_genres_task.start() + self.headers = BASE_HEADERS + + self.bot.loop.create_task(self.renew_access_token()) + + async def renew_access_token(self) -> None: + """Refeshes V4 access token a number of seconds before expiry. See `ACCESS_TOKEN_RENEWAL_WINDOW`.""" + while True: + async with self.http_session.post(OAUTH_URL, params=OAUTH_PARAMS) as resp: + result = await resp.json() + if resp.status != 200: + # If there is a valid access token continue to use that, + # otherwise unload cog. + if "Authorization" in self.headers: + time_delta = timedelta(seconds=ACCESS_TOKEN_RENEWAL_WINDOW) + logger.error( + "Failed to renew IGDB access token. " + f"Current token will last for {time_delta} " + f"OAuth response message: {result['message']}" + ) + else: + logger.warning( + "Invalid OAuth credentials. Unloading Games cog. " + f"OAuth response message: {result['message']}" + ) + self.bot.remove_cog('Games') + + return + + self.headers["Authorization"] = f"Bearer {result['access_token']}" + + # Attempt to renew before the token expires + next_renewal = result["expires_in"] - ACCESS_TOKEN_RENEWAL_WINDOW + + time_delta = timedelta(seconds=next_renewal) + logger.info(f"Successfully renewed access token. Refreshing again in {time_delta}") + + # This will be true the first time this loop runs. + # Since we now have an access token, its safe to start this task. + if self.genres == {}: + self.refresh_genres_task.start() + await sleep(next_renewal) @tasks.loop(hours=24.0) async def refresh_genres_task(self) -> None: @@ -156,9 +211,8 @@ class Games(Cog): async def _get_genres(self) -> None: """Create genres variable for games command.""" body = "fields name; limit 100;" - async with self.http_session.get(f"{BASE_URL}/genres", data=body, headers=HEADERS) as resp: + async with self.http_session.post(f"{BASE_URL}/genres", data=body, headers=self.headers) as resp: result = await resp.json() - genres = {genre["name"].capitalize(): genre["id"] for genre in result} # Replace complex names with names from ALIASES @@ -306,7 +360,7 @@ class Games(Cog): body = GAMES_LIST_BODY.format(**params) # Do request to IGDB API, create headers, URL, define body, return result - async with self.http_session.get(url=f"{BASE_URL}/games", data=body, headers=HEADERS) as resp: + async with self.http_session.post(url=f"{BASE_URL}/games", data=body, headers=self.headers) as resp: return await resp.json() async def create_page(self, data: Dict[str, Any]) -> Tuple[str, str]: @@ -348,7 +402,7 @@ class Games(Cog): # Define request body of IGDB API request and do request body = SEARCH_BODY.format(**{"term": search_term}) - async with self.http_session.get(url=f"{BASE_URL}/games", data=body, headers=HEADERS) as resp: + async with self.http_session.post(url=f"{BASE_URL}/games", data=body, headers=self.headers) as resp: data = await resp.json() # Loop over games, format them to good format, make line and append this to total lines @@ -377,7 +431,7 @@ class Games(Cog): "offset": offset }) - async with self.http_session.get(url=f"{BASE_URL}/companies", data=body, headers=HEADERS) as resp: + async with self.http_session.post(url=f"{BASE_URL}/companies", data=body, headers=self.headers) as resp: return await resp.json() async def create_company_page(self, data: Dict[str, Any]) -> Tuple[str, str]: @@ -418,7 +472,10 @@ class Games(Cog): def setup(bot: Bot) -> None: """Add/Load Games cog.""" # Check does IGDB API key exist, if not, log warning and don't load cog - if not Tokens.igdb: - logger.warning("No IGDB API key. Not loading Games cog.") + if not Tokens.igdb_client_id: + logger.warning("No IGDB client ID. Not loading Games cog.") + return + if not Tokens.igdb_client_secret: + logger.warning("No IGDB client secret. Not loading Games cog.") return bot.add_cog(Games(bot)) diff --git a/bot/exts/evergreen/issues.py b/bot/exts/evergreen/issues.py index e419a6f5..73ebe547 100644 --- a/bot/exts/evergreen/issues.py +++ b/bot/exts/evergreen/issues.py @@ -1,11 +1,13 @@ import logging import random +import re +import typing as t +from enum import Enum import discord -from discord.ext import commands +from discord.ext import commands, tasks -from bot.constants import Channels, Colours, ERROR_REPLIES, Emojis, Tokens, WHITELISTED_CHANNELS -from bot.utils.decorators import override_in_channel +from bot.constants import Categories, Channels, Colours, ERROR_REPLIES, Emojis, Tokens, WHITELISTED_CHANNELS log = logging.getLogger(__name__) @@ -15,55 +17,86 @@ BAD_RESPONSE = { } MAX_REQUESTS = 10 - REQUEST_HEADERS = dict() + +REPOS_API = "https://api.github.com/orgs/{org}/repos" if GITHUB_TOKEN := Tokens.github: REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}" +WHITELISTED_CATEGORIES = ( + Categories.devprojects, Categories.media, Categories.development +) +WHITELISTED_CHANNELS_ON_MESSAGE = (Channels.organisation, Channels.mod_meta, Channels.mod_tools) + +CODE_BLOCK_RE = re.compile( + r"^`([^`\n]+)`" # Inline codeblock + r"|```(.+?)```", # Multiline codeblock + re.DOTALL | re.MULTILINE +) + + +class FetchIssueErrors(Enum): + """Errors returned in fetch issues.""" + + value_error = "Numbers not found." + max_requests = "Max requests hit." + class Issues(commands.Cog): """Cog that allows users to retrieve issues from GitHub.""" def __init__(self, bot: commands.Bot): self.bot = bot - - @commands.command(aliases=("pr",)) - @override_in_channel(WHITELISTED_CHANNELS + (Channels.dev_contrib, Channels.dev_branding)) - async def issue( - self, - ctx: commands.Context, - numbers: commands.Greedy[int], - repository: str = "sir-lancebot", - user: str = "python-discord" - ) -> None: - """Command to retrieve issue(s) from a GitHub repository.""" + self.repos = [] + self.get_pydis_repos.start() + + @tasks.loop(minutes=30) + async def get_pydis_repos(self) -> None: + """Get all python-discord repositories on github.""" + async with self.bot.http_session.get(REPOS_API.format(org="python-discord")) as resp: + if resp.status == 200: + data = await resp.json() + for repo in data: + self.repos.append(repo["full_name"].split("/")[1]) + self.repo_regex = "|".join(self.repos) + else: + log.debug(f"Failed to get latest Pydis repositories. Status code {resp.status}") + + @staticmethod + def check_in_block(message: discord.Message, repo_issue: str) -> bool: + """Check whether the <repo>#<issue> is in codeblocks.""" + block = re.findall(CODE_BLOCK_RE, message.content) + + if not block: + return False + elif "#".join(repo_issue.split(" ")) in "".join([*block[0]]): + return True + return False + + async def fetch_issues( + self, + numbers: set, + repository: str, + user: str + ) -> t.Union[FetchIssueErrors, str, list]: + """Retrieve issue(s) from a GitHub repository.""" links = [] - numbers = set(numbers) # Convert from list to set to remove duplicates, if any - if not numbers: - await ctx.invoke(self.bot.get_command('help'), 'issue') - return + return FetchIssueErrors.value_error if len(numbers) > MAX_REQUESTS: - embed = discord.Embed( - title=random.choice(ERROR_REPLIES), - color=Colours.soft_red, - description=f"Too many issues/PRs! (maximum of {MAX_REQUESTS})" - ) - await ctx.send(embed=embed) - return + return FetchIssueErrors.max_requests for number in numbers: url = f"https://api.github.com/repos/{user}/{repository}/issues/{number}" merge_url = f"https://api.github.com/repos/{user}/{repository}/pulls/{number}/merge" - log.trace(f"Querying GH issues API: {url}") async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r: json_data = await r.json() if r.status in BAD_RESPONSE: log.warning(f"Received response {r.status} from: {url}") - return await ctx.send(f"[{str(r.status)}] #{number} {BAD_RESPONSE.get(r.status)}") + return f"[{str(r.status)}] #{number} {BAD_RESPONSE.get(r.status)}" # The initial API request is made to the issues API endpoint, which will return information # if the issue or PR is present. However, the scope of information returned for PRs differs @@ -92,15 +125,80 @@ class Issues(commands.Cog): issue_url = json_data.get("html_url") links.append([icon_url, f"[{repository}] #{number} {json_data.get('title')}", issue_url]) - # Issue/PR format: emoji to show if open/closed/merged, number and the title as a singular link. - description_list = ["{0} [{1}]({2})".format(*link) for link in links] + return links + + @staticmethod + def get_embed(result: list, user: str = "python-discord", repository: str = "") -> discord.Embed: + """Get Response Embed.""" + description_list = ["{0} [{1}]({2})".format(*link) for link in result] resp = discord.Embed( colour=Colours.bright_green, description='\n'.join(description_list) ) resp.set_author(name="GitHub", url=f"https://github.com/{user}/{repository}") - await ctx.send(embed=resp) + return resp + + @commands.command(aliases=("pr",)) + async def issue( + self, + ctx: commands.Context, + numbers: commands.Greedy[int], + repository: str = "sir-lancebot", + user: str = "python-discord" + ) -> None: + """Command to retrieve issue(s) from a GitHub repository.""" + if not( + ctx.channel.category.id in WHITELISTED_CATEGORIES + or ctx.channel.id in WHITELISTED_CHANNELS + ): + return + + result = await self.fetch_issues(set(numbers), repository, user) + + if result == FetchIssueErrors.value_error: + await ctx.invoke(self.bot.get_command('help'), 'issue') + + elif result == FetchIssueErrors.max_requests: + embed = discord.Embed( + title=random.choice(ERROR_REPLIES), + color=Colours.soft_red, + description=f"Too many issues/PRs! (maximum of {MAX_REQUESTS})" + ) + await ctx.send(embed=embed) + + elif isinstance(result, list): + # Issue/PR format: emoji to show if open/closed/merged, number and the title as a singular link. + resp = self.get_embed(result, user, repository) + await ctx.send(embed=resp) + + elif isinstance(result, str): + await ctx.send(result) + + @commands.Cog.listener() + async def on_message(self, message: discord.Message) -> None: + """Command to retrieve issue(s) from a GitHub repository using automatic linking if matching <repo>#<issue>.""" + if not( + message.channel.category.id in WHITELISTED_CATEGORIES + or message.channel.id in WHITELISTED_CHANNELS_ON_MESSAGE + ): + return + + message_repo_issue_map = re.findall(fr"({self.repo_regex})#(\d+)", message.content) + links = [] + + if message_repo_issue_map: + for repo_issue in message_repo_issue_map: + if not self.check_in_block(message, " ".join(repo_issue)): + result = await self.fetch_issues({repo_issue[1]}, repo_issue[0], "python-discord") + if isinstance(result, list): + links.extend(result) + + if not links: + return + + resp = self.get_embed(links, "python-discord") + await message.channel.send(embed=resp) def setup(bot: commands.Bot) -> None: diff --git a/bot/exts/evergreen/status_cats.py b/bot/exts/evergreen/status_cats.py deleted file mode 100644 index 586b8378..00000000 --- a/bot/exts/evergreen/status_cats.py +++ /dev/null @@ -1,33 +0,0 @@ -from http import HTTPStatus - -import discord -from discord.ext import commands - - -class StatusCats(commands.Cog): - """Commands that give HTTP statuses described and visualized by cats.""" - - def __init__(self, bot: commands.Bot): - self.bot = bot - - @commands.command(aliases=['statuscat']) - async def http_cat(self, ctx: commands.Context, code: int) -> None: - """Sends an embed with an image of a cat, potraying the status code.""" - embed = discord.Embed(title=f'**Status: {code}**') - - try: - HTTPStatus(code) - - except ValueError: - embed.set_footer(text='Inputted status code does not exist.') - - else: - embed.set_image(url=f'https://http.cat/{code}.jpg') - - finally: - await ctx.send(embed=embed) - - -def setup(bot: commands.Bot) -> None: - """Load the StatusCats cog.""" - bot.add_cog(StatusCats(bot)) diff --git a/bot/exts/evergreen/status_codes.py b/bot/exts/evergreen/status_codes.py new file mode 100644 index 00000000..874c87eb --- /dev/null +++ b/bot/exts/evergreen/status_codes.py @@ -0,0 +1,71 @@ +from http import HTTPStatus + +import discord +from discord.ext import commands + +HTTP_DOG_URL = "https://httpstatusdogs.com/img/{code}.jpg" +HTTP_CAT_URL = "https://http.cat/{code}.jpg" + + +class HTTPStatusCodes(commands.Cog): + """Commands that give HTTP statuses described and visualized by cats and dogs.""" + + def __init__(self, bot: commands.Bot): + self.bot = bot + + @commands.group(name="http_status", aliases=("status", "httpstatus")) + async def http_status_group(self, ctx: commands.Context) -> None: + """Group containing dog and cat http status code commands.""" + if not ctx.invoked_subcommand: + await ctx.send_help(ctx.command) + + @http_status_group.command(name='cat') + async def http_cat(self, ctx: commands.Context, code: int) -> None: + """Sends an embed with an image of a cat, portraying the status code.""" + embed = discord.Embed(title=f'**Status: {code}**') + url = HTTP_CAT_URL.format(code=code) + + try: + HTTPStatus(code) + async with self.bot.http_session.get(url, allow_redirects=False) as response: + if response.status != 404: + embed.set_image(url=url) + else: + raise NotImplementedError + + except ValueError: + embed.set_footer(text='Inputted status code does not exist.') + + except NotImplementedError: + embed.set_footer(text='Inputted status code is not implemented by http.cat yet.') + + finally: + await ctx.send(embed=embed) + + @http_status_group.command(name='dog') + async def http_dog(self, ctx: commands.Context, code: int) -> None: + """Sends an embed with an image of a dog, portraying the status code.""" + embed = discord.Embed(title=f'**Status: {code}**') + url = HTTP_DOG_URL.format(code=code) + + try: + HTTPStatus(code) + async with self.bot.http_session.get(url, allow_redirects=False) as response: + if response.status != 302: + embed.set_image(url=url) + else: + raise NotImplementedError + + except ValueError: + embed.set_footer(text='Inputted status code does not exist.') + + except NotImplementedError: + embed.set_footer(text='Inputted status code is not implemented by httpstatusdogs.com yet.') + + finally: + await ctx.send(embed=embed) + + +def setup(bot: commands.Bot) -> None: + """Load the HTTPStatusCodes cog.""" + bot.add_cog(HTTPStatusCodes(bot)) diff --git a/bot/exts/evergreen/tic_tac_toe.py b/bot/exts/evergreen/tic_tac_toe.py new file mode 100644 index 00000000..e1190502 --- /dev/null +++ b/bot/exts/evergreen/tic_tac_toe.py @@ -0,0 +1,323 @@ +import asyncio +import random +import typing as t + +import discord +from discord.ext.commands import Cog, Context, check, group, guild_only + +from bot.bot import Bot +from bot.constants import Emojis +from bot.utils.pagination import LinePaginator + +CONFIRMATION_MESSAGE = ( + "{opponent}, {requester} wants to play Tic-Tac-Toe against you. React to this message with " + f"{Emojis.confirmation} to accept or with {Emojis.decline} to decline." +) + + +def check_win(board: t.Dict[int, str]) -> bool: + """Check from board, is any player won game.""" + return any( + ( + # Horizontal + board[1] == board[2] == board[3], + board[4] == board[5] == board[6], + board[7] == board[8] == board[9], + # Vertical + board[1] == board[4] == board[7], + board[2] == board[5] == board[8], + board[3] == board[6] == board[9], + # Diagonal + board[1] == board[5] == board[9], + board[3] == board[5] == board[7], + ) + ) + + +class Player: + """Class that contains information about player and functions that interact with player.""" + + def __init__(self, user: discord.User, ctx: Context, symbol: str): + self.user = user + self.ctx = ctx + self.symbol = symbol + + async def get_move(self, board: t.Dict[int, str], msg: discord.Message) -> t.Tuple[bool, t.Optional[int]]: + """ + Get move from user. + + Return is timeout reached and position of field what user will fill when timeout don't reach. + """ + def check_for_move(r: discord.Reaction, u: discord.User) -> bool: + """Check does user who reacted is user who we want, message is board and emoji is in board values.""" + return ( + u.id == self.user.id + and msg.id == r.message.id + and r.emoji in board.values() + and r.emoji in Emojis.number_emojis.values() + ) + + try: + react, _ = await self.ctx.bot.wait_for('reaction_add', timeout=30.0, check=check_for_move) + except asyncio.TimeoutError: + return True, None + else: + return False, list(Emojis.number_emojis.keys())[list(Emojis.number_emojis.values()).index(react.emoji)] + + def __str__(self) -> str: + """Return mention of user.""" + return self.user.mention + + +class AI: + """Tic Tac Toe AI class for against computer gaming.""" + + def __init__(self, symbol: str): + self.symbol = symbol + + async def get_move(self, board: t.Dict[int, str], _: discord.Message) -> t.Tuple[bool, int]: + """Get move from AI. AI use Minimax strategy.""" + possible_moves = [i for i, emoji in board.items() if emoji in list(Emojis.number_emojis.values())] + + for symbol in (Emojis.o, Emojis.x): + for move in possible_moves: + board_copy = board.copy() + board_copy[move] = symbol + if check_win(board_copy): + return False, move + + open_corners = [i for i in possible_moves if i in (1, 3, 7, 9)] + if len(open_corners) > 0: + return False, random.choice(open_corners) + + if 5 in possible_moves: + return False, 5 + + open_edges = [i for i in possible_moves if i in (2, 4, 6, 8)] + return False, random.choice(open_edges) + + def __str__(self) -> str: + """Return `AI` as user name.""" + return "AI" + + +class Game: + """Class that contains information and functions about Tic Tac Toe game.""" + + def __init__(self, players: t.List[t.Union[Player, AI]], ctx: Context): + self.players = players + self.ctx = ctx + self.board = { + 1: Emojis.number_emojis[1], + 2: Emojis.number_emojis[2], + 3: Emojis.number_emojis[3], + 4: Emojis.number_emojis[4], + 5: Emojis.number_emojis[5], + 6: Emojis.number_emojis[6], + 7: Emojis.number_emojis[7], + 8: Emojis.number_emojis[8], + 9: Emojis.number_emojis[9] + } + + self.current = self.players[0] + self.next = self.players[1] + + self.winner: t.Optional[t.Union[Player, AI]] = None + self.loser: t.Optional[t.Union[Player, AI]] = None + self.over = False + self.canceled = False + self.draw = False + + async def get_confirmation(self) -> t.Tuple[bool, t.Optional[str]]: + """ + Ask does user want to play TicTacToe against requester. First player is always requester. + + This return tuple that have: + - first element boolean (is game accepted?) + - (optional, only when first element is False, otherwise None) reason for declining. + """ + confirm_message = await self.ctx.send( + CONFIRMATION_MESSAGE.format( + opponent=self.players[1].user.mention, + requester=self.players[0].user.mention + ) + ) + await confirm_message.add_reaction(Emojis.confirmation) + await confirm_message.add_reaction(Emojis.decline) + + def confirm_check(reaction: discord.Reaction, user: discord.User) -> bool: + """Check is user who reacted from who this was requested, message is confirmation and emoji is valid.""" + return ( + reaction.emoji in (Emojis.confirmation, Emojis.decline) + and reaction.message.id == confirm_message.id + and user == self.players[1].user + ) + + try: + reaction, user = await self.ctx.bot.wait_for( + "reaction_add", + timeout=60.0, + check=confirm_check + ) + except asyncio.TimeoutError: + self.over = True + self.canceled = True + await confirm_message.delete() + return False, "Running out of time... Cancelled game." + + await confirm_message.delete() + if reaction.emoji == Emojis.confirmation: + return True, None + else: + self.over = True + self.canceled = True + return False, "User declined" + + async def add_reactions(self, msg: discord.Message) -> None: + """Add number emojis to message.""" + for nr in Emojis.number_emojis.values(): + await msg.add_reaction(nr) + + def format_board(self) -> str: + """Get formatted tic-tac-toe board for message.""" + board = list(self.board.values()) + return "\n".join( + (f"{board[line]} {board[line + 1]} {board[line + 2]}" for line in range(0, len(board), 3)) + ) + + async def play(self) -> None: + """Start and handle game.""" + await self.ctx.send("It's time for the game! Let's begin.") + board = await self.ctx.send( + embed=discord.Embed(description=self.format_board()) + ) + await self.add_reactions(board) + + for _ in range(9): + if isinstance(self.current, Player): + announce = await self.ctx.send( + f"{self.current.user.mention}, it's your turn! " + "React with an emoji to take your go." + ) + timeout, pos = await self.current.get_move(self.board, board) + if isinstance(self.current, Player): + await announce.delete() + if timeout: + await self.ctx.send(f"{self.current.user.mention} ran out of time. Canceling game.") + self.over = True + self.canceled = True + return + self.board[pos] = self.current.symbol + await board.edit( + embed=discord.Embed(description=self.format_board()) + ) + await board.clear_reaction(Emojis.number_emojis[pos]) + if check_win(self.board): + self.winner = self.current + self.loser = self.next + await self.ctx.send( + f":tada: {self.current} won this game! :tada:" + ) + await board.clear_reactions() + break + self.current, self.next = self.next, self.current + if not self.winner: + self.draw = True + await self.ctx.send("It's a DRAW!") + self.over = True + + +def is_channel_free() -> t.Callable: + """Check is channel where command will be invoked free.""" + async def predicate(ctx: Context) -> bool: + return all(game.channel != ctx.channel for game in ctx.cog.games if not game.over) + return check(predicate) + + +def is_requester_free() -> t.Callable: + """Check is requester not already in any game.""" + async def predicate(ctx: Context) -> bool: + return all( + ctx.author not in (player.user for player in game.players) for game in ctx.cog.games if not game.over + ) + return check(predicate) + + +class TicTacToe(Cog): + """TicTacToe cog contains tic-tac-toe game commands.""" + + def __init__(self, bot: Bot): + self.bot = bot + self.games: t.List[Game] = [] + + @guild_only() + @is_channel_free() + @is_requester_free() + @group(name="tictactoe", aliases=("ttt",), invoke_without_command=True) + async def tic_tac_toe(self, ctx: Context, opponent: t.Optional[discord.User]) -> None: + """Tic Tac Toe game. Play against friends or AI. Use reactions to add your mark to field.""" + if opponent == ctx.author: + await ctx.send("You can't play against yourself.") + return + if opponent is not None and not all( + opponent not in (player.user for player in g.players) for g in ctx.cog.games if not g.over + ): + await ctx.send("Opponent is already in game.") + return + if opponent is None: + game = Game( + [Player(ctx.author, ctx, Emojis.x), AI(Emojis.o)], + ctx + ) + else: + game = Game( + [Player(ctx.author, ctx, Emojis.x), Player(opponent, ctx, Emojis.o)], + ctx + ) + self.games.append(game) + if opponent is not None: + confirmed, msg = await game.get_confirmation() + + if not confirmed: + if msg: + await ctx.send(msg) + return + await game.play() + + @tic_tac_toe.group(name="history", aliases=("log",), invoke_without_command=True) + async def tic_tac_toe_logs(self, ctx: Context) -> None: + """Show most recent tic-tac-toe games.""" + if len(self.games) < 1: + await ctx.send("No recent games.") + return + log_games = [] + for i, game in enumerate(self.games): + if game.over and not game.canceled: + if game.draw: + log_games.append( + f"**#{i+1}**: {game.players[0]} vs {game.players[1]} (draw)" + ) + else: + log_games.append( + f"**#{i+1}**: {game.winner} :trophy: vs {game.loser}" + ) + await LinePaginator.paginate( + log_games, + ctx, + discord.Embed(title="Most recent Tic Tac Toe games") + ) + + @tic_tac_toe_logs.command(name="show", aliases=("s",)) + async def show_tic_tac_toe_board(self, ctx: Context, game_id: int) -> None: + """View game board by ID (ID is possible to get by `.tictactoe history`).""" + if len(self.games) < game_id: + await ctx.send("Game don't exist.") + return + game = self.games[game_id - 1] + await ctx.send(f"{game.winner} :trophy: vs {game.loser}") + await ctx.send(game.format_board()) + + +def setup(bot: Bot) -> None: + """Load TicTacToe Cog.""" + bot.add_cog(TicTacToe(bot)) diff --git a/bot/exts/evergreen/xkcd.py b/bot/exts/evergreen/xkcd.py new file mode 100644 index 00000000..d3224bfe --- /dev/null +++ b/bot/exts/evergreen/xkcd.py @@ -0,0 +1,89 @@ +import logging +import re +from random import randint +from typing import Dict, Optional, Union + +from discord import Embed +from discord.ext import tasks +from discord.ext.commands import Cog, Context, command + +from bot.bot import Bot +from bot.constants import Colours + +log = logging.getLogger(__name__) + +COMIC_FORMAT = re.compile(r"latest|[0-9]+") +BASE_URL = "https://xkcd.com" + + +class XKCD(Cog): + """Retrieving XKCD comics.""" + + def __init__(self, bot: Bot) -> None: + self.bot = bot + self.latest_comic_info: Dict[str, Union[str, int]] = {} + self.get_latest_comic_info.start() + + def cog_unload(self) -> None: + """Cancels refreshing of the task for refreshing the most recent comic info.""" + self.get_latest_comic_info.cancel() + + @tasks.loop(minutes=30) + async def get_latest_comic_info(self) -> None: + """Refreshes latest comic's information ever 30 minutes. Also used for finding a random comic.""" + async with self.bot.http_session.get(f"{BASE_URL}/info.0.json") as resp: + if resp.status == 200: + self.latest_comic_info = await resp.json() + else: + log.debug(f"Failed to get latest XKCD comic information. Status code {resp.status}") + + @command(name="xkcd") + async def fetch_xkcd_comics(self, ctx: Context, comic: Optional[str]) -> None: + """ + Getting an xkcd comic's information along with the image. + + To get a random comic, don't type any number as an argument. To get the latest, type 'latest'. + """ + embed = Embed(title=f"XKCD comic '{comic}'") + + embed.colour = Colours.soft_red + + if comic and (comic := re.match(COMIC_FORMAT, comic)) is None: + embed.description = "Comic parameter should either be an integer or 'latest'." + await ctx.send(embed=embed) + return + + comic = randint(1, self.latest_comic_info['num']) if comic is None else comic.group(0) + + if comic == "latest": + info = self.latest_comic_info + else: + async with self.bot.http_session.get(f"{BASE_URL}/{comic}/info.0.json") as resp: + if resp.status == 200: + info = await resp.json() + else: + embed.title = f"XKCD comic #{comic}" + embed.description = f"{resp.status}: Could not retrieve xkcd comic #{comic}." + log.debug(f"Retrieving xkcd comic #{comic} failed with status code {resp.status}.") + await ctx.send(embed=embed) + return + + embed.title = f"XKCD comic #{info['num']}" + + if info["img"][-3:] in ("jpg", "png", "gif"): + embed.set_image(url=info["img"]) + date = f"{info['year']}/{info['month']}/{info['day']}" + embed.set_footer(text=f"{date} - #{info['num']}, \'{info['safe_title']}\'") + embed.colour = Colours.soft_green + else: + embed.description = ( + "The selected comic is interactive, and cannot be displayed within an embed.\n" + f"Comic can be viewed [here](https://xkcd.com/{info['num']})." + ) + + await ctx.send(embed=embed) + + +def setup(bot: Bot) -> None: + """Loading the XKCD cog.""" + bot.add_cog(XKCD(bot)) |