aboutsummaryrefslogtreecommitdiffstats
path: root/bot/exts/evergreen
diff options
context:
space:
mode:
Diffstat (limited to 'bot/exts/evergreen')
-rw-r--r--bot/exts/evergreen/battleship.py2
-rw-r--r--bot/exts/evergreen/cheatsheet.py113
-rw-r--r--bot/exts/evergreen/game.py83
-rw-r--r--bot/exts/evergreen/issues.py160
-rw-r--r--bot/exts/evergreen/status_cats.py33
-rw-r--r--bot/exts/evergreen/status_codes.py71
-rw-r--r--bot/exts/evergreen/tic_tac_toe.py323
-rw-r--r--bot/exts/evergreen/xkcd.py89
8 files changed, 796 insertions, 78 deletions
diff --git a/bot/exts/evergreen/battleship.py b/bot/exts/evergreen/battleship.py
index 9bc374e6..fa3fb35c 100644
--- a/bot/exts/evergreen/battleship.py
+++ b/bot/exts/evergreen/battleship.py
@@ -140,7 +140,7 @@ class Game:
@staticmethod
def get_square(grid: Grid, square: str) -> Square:
"""Grabs a square from a grid with an inputted key."""
- index = ord(square[0]) - ord("A")
+ index = ord(square[0].upper()) - ord("A")
number = int(square[1:])
return grid[number-1][index] # -1 since lists are indexed from 0
diff --git a/bot/exts/evergreen/cheatsheet.py b/bot/exts/evergreen/cheatsheet.py
new file mode 100644
index 00000000..a64ddd69
--- /dev/null
+++ b/bot/exts/evergreen/cheatsheet.py
@@ -0,0 +1,113 @@
+import random
+import re
+import typing as t
+from urllib.parse import quote_plus
+
+from discord import Embed
+from discord.ext import commands
+from discord.ext.commands import BucketType, Context
+
+from bot import constants
+from bot.constants import Categories, Channels, Colours, ERROR_REPLIES, Roles, WHITELISTED_CHANNELS
+from bot.utils.decorators import with_role
+
+ERROR_MESSAGE = f"""
+Unknown cheat sheet. Please try to reformulate your query.
+
+**Examples**:
+```md
+{constants.Client.prefix}cht read json
+{constants.Client.prefix}cht hello world
+{constants.Client.prefix}cht lambda
+```
+If the problem persists send a message in <#{Channels.dev_contrib}>
+"""
+
+URL = 'https://cheat.sh/python/{search}'
+ESCAPE_TT = str.maketrans({"`": "\\`"})
+ANSI_RE = re.compile(r"\x1b\[.*?m")
+# We need to pass headers as curl otherwise it would default to aiohttp which would return raw html.
+HEADERS = {'User-Agent': 'curl/7.68.0'}
+
+
+class CheatSheet(commands.Cog):
+ """Commands that sends a result of a cht.sh search in code blocks."""
+
+ def __init__(self, bot: commands.Bot):
+ self.bot = bot
+
+ @staticmethod
+ def fmt_error_embed() -> Embed:
+ """
+ Format the Error Embed.
+
+ If the cht.sh search returned 404, overwrite it to send a custom error embed.
+ link -> https://github.com/chubin/cheat.sh/issues/198
+ """
+ embed = Embed(
+ title=random.choice(ERROR_REPLIES),
+ description=ERROR_MESSAGE,
+ colour=Colours.soft_red
+ )
+ return embed
+
+ def result_fmt(self, url: str, body_text: str) -> t.Tuple[bool, t.Union[str, Embed]]:
+ """Format Result."""
+ if body_text.startswith("# 404 NOT FOUND"):
+ embed = self.fmt_error_embed()
+ return True, embed
+
+ body_space = min(1986 - len(url), 1000)
+
+ if len(body_text) > body_space:
+ description = (f"**Result Of cht.sh**\n"
+ f"```python\n{body_text[:body_space]}\n"
+ f"... (truncated - too many lines)```\n"
+ f"Full results: {url} ")
+ else:
+ description = (f"**Result Of cht.sh**\n"
+ f"```python\n{body_text}```\n"
+ f"{url}")
+ return False, description
+
+ @commands.command(
+ name="cheat",
+ aliases=("cht.sh", "cheatsheet", "cheat-sheet", "cht"),
+ )
+ @commands.cooldown(1, 10, BucketType.user)
+ @with_role(Roles.everyone_role)
+ async def cheat_sheet(self, ctx: Context, *search_terms: str) -> None:
+ """
+ Search cheat.sh.
+
+ Gets a post from https://cheat.sh/python/ by default.
+ Usage:
+ --> .cht read json
+ """
+ if not (
+ ctx.channel.category.id == Categories.help_in_use
+ or ctx.channel.id in WHITELISTED_CHANNELS
+ ):
+ return
+
+ async with ctx.typing():
+ search_string = quote_plus(" ".join(search_terms))
+
+ async with self.bot.http_session.get(
+ URL.format(search=search_string), headers=HEADERS
+ ) as response:
+ result = ANSI_RE.sub("", await response.text()).translate(ESCAPE_TT)
+
+ is_embed, description = self.result_fmt(
+ URL.format(search=search_string),
+ result
+ )
+ if is_embed:
+ await ctx.send(embed=description)
+ else:
+ await ctx.send(content=description)
+
+
+def setup(bot: commands.Bot) -> None:
+ """Load the CheatSheet cog."""
+ bot.add_cog(CheatSheet(bot))
diff --git a/bot/exts/evergreen/game.py b/bot/exts/evergreen/game.py
index d0fd7a40..d37be0e2 100644
--- a/bot/exts/evergreen/game.py
+++ b/bot/exts/evergreen/game.py
@@ -2,7 +2,8 @@ import difflib
import logging
import random
import re
-from datetime import datetime as dt
+from asyncio import sleep
+from datetime import datetime as dt, timedelta
from enum import IntEnum
from typing import Any, Dict, List, Optional, Tuple
@@ -17,10 +18,25 @@ from bot.utils.decorators import with_role
from bot.utils.pagination import ImagePaginator, LinePaginator
# Base URL of IGDB API
-BASE_URL = "https://api-v3.igdb.com"
+BASE_URL = "https://api.igdb.com/v4"
-HEADERS = {
- "user-key": Tokens.igdb,
+CLIENT_ID = Tokens.igdb_client_id
+CLIENT_SECRET = Tokens.igdb_client_secret
+
+# The number of seconds before expiry that we attempt to re-fetch a new access token
+ACCESS_TOKEN_RENEWAL_WINDOW = 60*60*24*2
+
+# URL to request API access token
+OAUTH_URL = "https://id.twitch.tv/oauth2/token"
+
+OAUTH_PARAMS = {
+ "client_id": CLIENT_ID,
+ "client_secret": CLIENT_SECRET,
+ "grant_type": "client_credentials"
+}
+
+BASE_HEADERS = {
+ "Client-ID": CLIENT_ID,
"Accept": "application/json"
}
@@ -135,8 +151,47 @@ class Games(Cog):
self.http_session: ClientSession = bot.http_session
self.genres: Dict[str, int] = {}
-
- self.refresh_genres_task.start()
+ self.headers = BASE_HEADERS
+
+ self.bot.loop.create_task(self.renew_access_token())
+
+ async def renew_access_token(self) -> None:
+ """Refeshes V4 access token a number of seconds before expiry. See `ACCESS_TOKEN_RENEWAL_WINDOW`."""
+ while True:
+ async with self.http_session.post(OAUTH_URL, params=OAUTH_PARAMS) as resp:
+ result = await resp.json()
+ if resp.status != 200:
+ # If there is a valid access token continue to use that,
+ # otherwise unload cog.
+ if "Authorization" in self.headers:
+ time_delta = timedelta(seconds=ACCESS_TOKEN_RENEWAL_WINDOW)
+ logger.error(
+ "Failed to renew IGDB access token. "
+ f"Current token will last for {time_delta} "
+ f"OAuth response message: {result['message']}"
+ )
+ else:
+ logger.warning(
+ "Invalid OAuth credentials. Unloading Games cog. "
+ f"OAuth response message: {result['message']}"
+ )
+ self.bot.remove_cog('Games')
+
+ return
+
+ self.headers["Authorization"] = f"Bearer {result['access_token']}"
+
+ # Attempt to renew before the token expires
+ next_renewal = result["expires_in"] - ACCESS_TOKEN_RENEWAL_WINDOW
+
+ time_delta = timedelta(seconds=next_renewal)
+ logger.info(f"Successfully renewed access token. Refreshing again in {time_delta}")
+
+ # This will be true the first time this loop runs.
+ # Since we now have an access token, its safe to start this task.
+ if self.genres == {}:
+ self.refresh_genres_task.start()
+ await sleep(next_renewal)
@tasks.loop(hours=24.0)
async def refresh_genres_task(self) -> None:
@@ -156,9 +211,8 @@ class Games(Cog):
async def _get_genres(self) -> None:
"""Create genres variable for games command."""
body = "fields name; limit 100;"
- async with self.http_session.get(f"{BASE_URL}/genres", data=body, headers=HEADERS) as resp:
+ async with self.http_session.post(f"{BASE_URL}/genres", data=body, headers=self.headers) as resp:
result = await resp.json()
-
genres = {genre["name"].capitalize(): genre["id"] for genre in result}
# Replace complex names with names from ALIASES
@@ -306,7 +360,7 @@ class Games(Cog):
body = GAMES_LIST_BODY.format(**params)
# Do request to IGDB API, create headers, URL, define body, return result
- async with self.http_session.get(url=f"{BASE_URL}/games", data=body, headers=HEADERS) as resp:
+ async with self.http_session.post(url=f"{BASE_URL}/games", data=body, headers=self.headers) as resp:
return await resp.json()
async def create_page(self, data: Dict[str, Any]) -> Tuple[str, str]:
@@ -348,7 +402,7 @@ class Games(Cog):
# Define request body of IGDB API request and do request
body = SEARCH_BODY.format(**{"term": search_term})
- async with self.http_session.get(url=f"{BASE_URL}/games", data=body, headers=HEADERS) as resp:
+ async with self.http_session.post(url=f"{BASE_URL}/games", data=body, headers=self.headers) as resp:
data = await resp.json()
# Loop over games, format them to good format, make line and append this to total lines
@@ -377,7 +431,7 @@ class Games(Cog):
"offset": offset
})
- async with self.http_session.get(url=f"{BASE_URL}/companies", data=body, headers=HEADERS) as resp:
+ async with self.http_session.post(url=f"{BASE_URL}/companies", data=body, headers=self.headers) as resp:
return await resp.json()
async def create_company_page(self, data: Dict[str, Any]) -> Tuple[str, str]:
@@ -418,7 +472,10 @@ class Games(Cog):
def setup(bot: Bot) -> None:
"""Add/Load Games cog."""
# Check does IGDB API key exist, if not, log warning and don't load cog
- if not Tokens.igdb:
- logger.warning("No IGDB API key. Not loading Games cog.")
+ if not Tokens.igdb_client_id:
+ logger.warning("No IGDB client ID. Not loading Games cog.")
+ return
+ if not Tokens.igdb_client_secret:
+ logger.warning("No IGDB client secret. Not loading Games cog.")
return
bot.add_cog(Games(bot))
diff --git a/bot/exts/evergreen/issues.py b/bot/exts/evergreen/issues.py
index e419a6f5..73ebe547 100644
--- a/bot/exts/evergreen/issues.py
+++ b/bot/exts/evergreen/issues.py
@@ -1,11 +1,13 @@
import logging
import random
+import re
+import typing as t
+from enum import Enum
import discord
-from discord.ext import commands
+from discord.ext import commands, tasks
-from bot.constants import Channels, Colours, ERROR_REPLIES, Emojis, Tokens, WHITELISTED_CHANNELS
-from bot.utils.decorators import override_in_channel
+from bot.constants import Categories, Channels, Colours, ERROR_REPLIES, Emojis, Tokens, WHITELISTED_CHANNELS
log = logging.getLogger(__name__)
@@ -15,55 +17,86 @@ BAD_RESPONSE = {
}
MAX_REQUESTS = 10
-
REQUEST_HEADERS = dict()
+
+REPOS_API = "https://api.github.com/orgs/{org}/repos"
if GITHUB_TOKEN := Tokens.github:
REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}"
+WHITELISTED_CATEGORIES = (
+ Categories.devprojects, Categories.media, Categories.development
+)
+WHITELISTED_CHANNELS_ON_MESSAGE = (Channels.organisation, Channels.mod_meta, Channels.mod_tools)
+
+CODE_BLOCK_RE = re.compile(
+ r"^`([^`\n]+)`" # Inline codeblock
+ r"|```(.+?)```", # Multiline codeblock
+ re.DOTALL | re.MULTILINE
+)
+
+
+class FetchIssueErrors(Enum):
+ """Errors returned in fetch issues."""
+
+ value_error = "Numbers not found."
+ max_requests = "Max requests hit."
+
class Issues(commands.Cog):
"""Cog that allows users to retrieve issues from GitHub."""
def __init__(self, bot: commands.Bot):
self.bot = bot
-
- @commands.command(aliases=("pr",))
- @override_in_channel(WHITELISTED_CHANNELS + (Channels.dev_contrib, Channels.dev_branding))
- async def issue(
- self,
- ctx: commands.Context,
- numbers: commands.Greedy[int],
- repository: str = "sir-lancebot",
- user: str = "python-discord"
- ) -> None:
- """Command to retrieve issue(s) from a GitHub repository."""
+ self.repos = []
+ self.get_pydis_repos.start()
+
+ @tasks.loop(minutes=30)
+ async def get_pydis_repos(self) -> None:
+ """Get all python-discord repositories on github."""
+ async with self.bot.http_session.get(REPOS_API.format(org="python-discord")) as resp:
+ if resp.status == 200:
+ data = await resp.json()
+ for repo in data:
+ self.repos.append(repo["full_name"].split("/")[1])
+ self.repo_regex = "|".join(self.repos)
+ else:
+ log.debug(f"Failed to get latest Pydis repositories. Status code {resp.status}")
+
+ @staticmethod
+ def check_in_block(message: discord.Message, repo_issue: str) -> bool:
+ """Check whether the <repo>#<issue> is in codeblocks."""
+ block = re.findall(CODE_BLOCK_RE, message.content)
+
+ if not block:
+ return False
+ elif "#".join(repo_issue.split(" ")) in "".join([*block[0]]):
+ return True
+ return False
+
+ async def fetch_issues(
+ self,
+ numbers: set,
+ repository: str,
+ user: str
+ ) -> t.Union[FetchIssueErrors, str, list]:
+ """Retrieve issue(s) from a GitHub repository."""
links = []
- numbers = set(numbers) # Convert from list to set to remove duplicates, if any
-
if not numbers:
- await ctx.invoke(self.bot.get_command('help'), 'issue')
- return
+ return FetchIssueErrors.value_error
if len(numbers) > MAX_REQUESTS:
- embed = discord.Embed(
- title=random.choice(ERROR_REPLIES),
- color=Colours.soft_red,
- description=f"Too many issues/PRs! (maximum of {MAX_REQUESTS})"
- )
- await ctx.send(embed=embed)
- return
+ return FetchIssueErrors.max_requests
for number in numbers:
url = f"https://api.github.com/repos/{user}/{repository}/issues/{number}"
merge_url = f"https://api.github.com/repos/{user}/{repository}/pulls/{number}/merge"
-
log.trace(f"Querying GH issues API: {url}")
async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r:
json_data = await r.json()
if r.status in BAD_RESPONSE:
log.warning(f"Received response {r.status} from: {url}")
- return await ctx.send(f"[{str(r.status)}] #{number} {BAD_RESPONSE.get(r.status)}")
+ return f"[{str(r.status)}] #{number} {BAD_RESPONSE.get(r.status)}"
# The initial API request is made to the issues API endpoint, which will return information
# if the issue or PR is present. However, the scope of information returned for PRs differs
@@ -92,15 +125,80 @@ class Issues(commands.Cog):
issue_url = json_data.get("html_url")
links.append([icon_url, f"[{repository}] #{number} {json_data.get('title')}", issue_url])
- # Issue/PR format: emoji to show if open/closed/merged, number and the title as a singular link.
- description_list = ["{0} [{1}]({2})".format(*link) for link in links]
+ return links
+
+ @staticmethod
+ def get_embed(result: list, user: str = "python-discord", repository: str = "") -> discord.Embed:
+ """Get Response Embed."""
+ description_list = ["{0} [{1}]({2})".format(*link) for link in result]
resp = discord.Embed(
colour=Colours.bright_green,
description='\n'.join(description_list)
)
resp.set_author(name="GitHub", url=f"https://github.com/{user}/{repository}")
- await ctx.send(embed=resp)
+ return resp
+
+ @commands.command(aliases=("pr",))
+ async def issue(
+ self,
+ ctx: commands.Context,
+ numbers: commands.Greedy[int],
+ repository: str = "sir-lancebot",
+ user: str = "python-discord"
+ ) -> None:
+ """Command to retrieve issue(s) from a GitHub repository."""
+ if not(
+ ctx.channel.category.id in WHITELISTED_CATEGORIES
+ or ctx.channel.id in WHITELISTED_CHANNELS
+ ):
+ return
+
+ result = await self.fetch_issues(set(numbers), repository, user)
+
+ if result == FetchIssueErrors.value_error:
+ await ctx.invoke(self.bot.get_command('help'), 'issue')
+
+ elif result == FetchIssueErrors.max_requests:
+ embed = discord.Embed(
+ title=random.choice(ERROR_REPLIES),
+ color=Colours.soft_red,
+ description=f"Too many issues/PRs! (maximum of {MAX_REQUESTS})"
+ )
+ await ctx.send(embed=embed)
+
+ elif isinstance(result, list):
+ # Issue/PR format: emoji to show if open/closed/merged, number and the title as a singular link.
+ resp = self.get_embed(result, user, repository)
+ await ctx.send(embed=resp)
+
+ elif isinstance(result, str):
+ await ctx.send(result)
+
+ @commands.Cog.listener()
+ async def on_message(self, message: discord.Message) -> None:
+ """Command to retrieve issue(s) from a GitHub repository using automatic linking if matching <repo>#<issue>."""
+ if not(
+ message.channel.category.id in WHITELISTED_CATEGORIES
+ or message.channel.id in WHITELISTED_CHANNELS_ON_MESSAGE
+ ):
+ return
+
+ message_repo_issue_map = re.findall(fr"({self.repo_regex})#(\d+)", message.content)
+ links = []
+
+ if message_repo_issue_map:
+ for repo_issue in message_repo_issue_map:
+ if not self.check_in_block(message, " ".join(repo_issue)):
+ result = await self.fetch_issues({repo_issue[1]}, repo_issue[0], "python-discord")
+ if isinstance(result, list):
+ links.extend(result)
+
+ if not links:
+ return
+
+ resp = self.get_embed(links, "python-discord")
+ await message.channel.send(embed=resp)
def setup(bot: commands.Bot) -> None:
diff --git a/bot/exts/evergreen/status_cats.py b/bot/exts/evergreen/status_cats.py
deleted file mode 100644
index 586b8378..00000000
--- a/bot/exts/evergreen/status_cats.py
+++ /dev/null
@@ -1,33 +0,0 @@
-from http import HTTPStatus
-
-import discord
-from discord.ext import commands
-
-
-class StatusCats(commands.Cog):
- """Commands that give HTTP statuses described and visualized by cats."""
-
- def __init__(self, bot: commands.Bot):
- self.bot = bot
-
- @commands.command(aliases=['statuscat'])
- async def http_cat(self, ctx: commands.Context, code: int) -> None:
- """Sends an embed with an image of a cat, potraying the status code."""
- embed = discord.Embed(title=f'**Status: {code}**')
-
- try:
- HTTPStatus(code)
-
- except ValueError:
- embed.set_footer(text='Inputted status code does not exist.')
-
- else:
- embed.set_image(url=f'https://http.cat/{code}.jpg')
-
- finally:
- await ctx.send(embed=embed)
-
-
-def setup(bot: commands.Bot) -> None:
- """Load the StatusCats cog."""
- bot.add_cog(StatusCats(bot))
diff --git a/bot/exts/evergreen/status_codes.py b/bot/exts/evergreen/status_codes.py
new file mode 100644
index 00000000..874c87eb
--- /dev/null
+++ b/bot/exts/evergreen/status_codes.py
@@ -0,0 +1,71 @@
+from http import HTTPStatus
+
+import discord
+from discord.ext import commands
+
+HTTP_DOG_URL = "https://httpstatusdogs.com/img/{code}.jpg"
+HTTP_CAT_URL = "https://http.cat/{code}.jpg"
+
+
+class HTTPStatusCodes(commands.Cog):
+ """Commands that give HTTP statuses described and visualized by cats and dogs."""
+
+ def __init__(self, bot: commands.Bot):
+ self.bot = bot
+
+ @commands.group(name="http_status", aliases=("status", "httpstatus"))
+ async def http_status_group(self, ctx: commands.Context) -> None:
+ """Group containing dog and cat http status code commands."""
+ if not ctx.invoked_subcommand:
+ await ctx.send_help(ctx.command)
+
+ @http_status_group.command(name='cat')
+ async def http_cat(self, ctx: commands.Context, code: int) -> None:
+ """Sends an embed with an image of a cat, portraying the status code."""
+ embed = discord.Embed(title=f'**Status: {code}**')
+ url = HTTP_CAT_URL.format(code=code)
+
+ try:
+ HTTPStatus(code)
+ async with self.bot.http_session.get(url, allow_redirects=False) as response:
+ if response.status != 404:
+ embed.set_image(url=url)
+ else:
+ raise NotImplementedError
+
+ except ValueError:
+ embed.set_footer(text='Inputted status code does not exist.')
+
+ except NotImplementedError:
+ embed.set_footer(text='Inputted status code is not implemented by http.cat yet.')
+
+ finally:
+ await ctx.send(embed=embed)
+
+ @http_status_group.command(name='dog')
+ async def http_dog(self, ctx: commands.Context, code: int) -> None:
+ """Sends an embed with an image of a dog, portraying the status code."""
+ embed = discord.Embed(title=f'**Status: {code}**')
+ url = HTTP_DOG_URL.format(code=code)
+
+ try:
+ HTTPStatus(code)
+ async with self.bot.http_session.get(url, allow_redirects=False) as response:
+ if response.status != 302:
+ embed.set_image(url=url)
+ else:
+ raise NotImplementedError
+
+ except ValueError:
+ embed.set_footer(text='Inputted status code does not exist.')
+
+ except NotImplementedError:
+ embed.set_footer(text='Inputted status code is not implemented by httpstatusdogs.com yet.')
+
+ finally:
+ await ctx.send(embed=embed)
+
+
+def setup(bot: commands.Bot) -> None:
+ """Load the HTTPStatusCodes cog."""
+ bot.add_cog(HTTPStatusCodes(bot))
diff --git a/bot/exts/evergreen/tic_tac_toe.py b/bot/exts/evergreen/tic_tac_toe.py
new file mode 100644
index 00000000..e1190502
--- /dev/null
+++ b/bot/exts/evergreen/tic_tac_toe.py
@@ -0,0 +1,323 @@
+import asyncio
+import random
+import typing as t
+
+import discord
+from discord.ext.commands import Cog, Context, check, group, guild_only
+
+from bot.bot import Bot
+from bot.constants import Emojis
+from bot.utils.pagination import LinePaginator
+
+CONFIRMATION_MESSAGE = (
+ "{opponent}, {requester} wants to play Tic-Tac-Toe against you. React to this message with "
+ f"{Emojis.confirmation} to accept or with {Emojis.decline} to decline."
+)
+
+
+def check_win(board: t.Dict[int, str]) -> bool:
+ """Check from board, is any player won game."""
+ return any(
+ (
+ # Horizontal
+ board[1] == board[2] == board[3],
+ board[4] == board[5] == board[6],
+ board[7] == board[8] == board[9],
+ # Vertical
+ board[1] == board[4] == board[7],
+ board[2] == board[5] == board[8],
+ board[3] == board[6] == board[9],
+ # Diagonal
+ board[1] == board[5] == board[9],
+ board[3] == board[5] == board[7],
+ )
+ )
+
+
+class Player:
+ """Class that contains information about player and functions that interact with player."""
+
+ def __init__(self, user: discord.User, ctx: Context, symbol: str):
+ self.user = user
+ self.ctx = ctx
+ self.symbol = symbol
+
+ async def get_move(self, board: t.Dict[int, str], msg: discord.Message) -> t.Tuple[bool, t.Optional[int]]:
+ """
+ Get move from user.
+
+ Return is timeout reached and position of field what user will fill when timeout don't reach.
+ """
+ def check_for_move(r: discord.Reaction, u: discord.User) -> bool:
+ """Check does user who reacted is user who we want, message is board and emoji is in board values."""
+ return (
+ u.id == self.user.id
+ and msg.id == r.message.id
+ and r.emoji in board.values()
+ and r.emoji in Emojis.number_emojis.values()
+ )
+
+ try:
+ react, _ = await self.ctx.bot.wait_for('reaction_add', timeout=30.0, check=check_for_move)
+ except asyncio.TimeoutError:
+ return True, None
+ else:
+ return False, list(Emojis.number_emojis.keys())[list(Emojis.number_emojis.values()).index(react.emoji)]
+
+ def __str__(self) -> str:
+ """Return mention of user."""
+ return self.user.mention
+
+
+class AI:
+ """Tic Tac Toe AI class for against computer gaming."""
+
+ def __init__(self, symbol: str):
+ self.symbol = symbol
+
+ async def get_move(self, board: t.Dict[int, str], _: discord.Message) -> t.Tuple[bool, int]:
+ """Get move from AI. AI use Minimax strategy."""
+ possible_moves = [i for i, emoji in board.items() if emoji in list(Emojis.number_emojis.values())]
+
+ for symbol in (Emojis.o, Emojis.x):
+ for move in possible_moves:
+ board_copy = board.copy()
+ board_copy[move] = symbol
+ if check_win(board_copy):
+ return False, move
+
+ open_corners = [i for i in possible_moves if i in (1, 3, 7, 9)]
+ if len(open_corners) > 0:
+ return False, random.choice(open_corners)
+
+ if 5 in possible_moves:
+ return False, 5
+
+ open_edges = [i for i in possible_moves if i in (2, 4, 6, 8)]
+ return False, random.choice(open_edges)
+
+ def __str__(self) -> str:
+ """Return `AI` as user name."""
+ return "AI"
+
+
+class Game:
+ """Class that contains information and functions about Tic Tac Toe game."""
+
+ def __init__(self, players: t.List[t.Union[Player, AI]], ctx: Context):
+ self.players = players
+ self.ctx = ctx
+ self.board = {
+ 1: Emojis.number_emojis[1],
+ 2: Emojis.number_emojis[2],
+ 3: Emojis.number_emojis[3],
+ 4: Emojis.number_emojis[4],
+ 5: Emojis.number_emojis[5],
+ 6: Emojis.number_emojis[6],
+ 7: Emojis.number_emojis[7],
+ 8: Emojis.number_emojis[8],
+ 9: Emojis.number_emojis[9]
+ }
+
+ self.current = self.players[0]
+ self.next = self.players[1]
+
+ self.winner: t.Optional[t.Union[Player, AI]] = None
+ self.loser: t.Optional[t.Union[Player, AI]] = None
+ self.over = False
+ self.canceled = False
+ self.draw = False
+
+ async def get_confirmation(self) -> t.Tuple[bool, t.Optional[str]]:
+ """
+ Ask does user want to play TicTacToe against requester. First player is always requester.
+
+ This return tuple that have:
+ - first element boolean (is game accepted?)
+ - (optional, only when first element is False, otherwise None) reason for declining.
+ """
+ confirm_message = await self.ctx.send(
+ CONFIRMATION_MESSAGE.format(
+ opponent=self.players[1].user.mention,
+ requester=self.players[0].user.mention
+ )
+ )
+ await confirm_message.add_reaction(Emojis.confirmation)
+ await confirm_message.add_reaction(Emojis.decline)
+
+ def confirm_check(reaction: discord.Reaction, user: discord.User) -> bool:
+ """Check is user who reacted from who this was requested, message is confirmation and emoji is valid."""
+ return (
+ reaction.emoji in (Emojis.confirmation, Emojis.decline)
+ and reaction.message.id == confirm_message.id
+ and user == self.players[1].user
+ )
+
+ try:
+ reaction, user = await self.ctx.bot.wait_for(
+ "reaction_add",
+ timeout=60.0,
+ check=confirm_check
+ )
+ except asyncio.TimeoutError:
+ self.over = True
+ self.canceled = True
+ await confirm_message.delete()
+ return False, "Running out of time... Cancelled game."
+
+ await confirm_message.delete()
+ if reaction.emoji == Emojis.confirmation:
+ return True, None
+ else:
+ self.over = True
+ self.canceled = True
+ return False, "User declined"
+
+ async def add_reactions(self, msg: discord.Message) -> None:
+ """Add number emojis to message."""
+ for nr in Emojis.number_emojis.values():
+ await msg.add_reaction(nr)
+
+ def format_board(self) -> str:
+ """Get formatted tic-tac-toe board for message."""
+ board = list(self.board.values())
+ return "\n".join(
+ (f"{board[line]} {board[line + 1]} {board[line + 2]}" for line in range(0, len(board), 3))
+ )
+
+ async def play(self) -> None:
+ """Start and handle game."""
+ await self.ctx.send("It's time for the game! Let's begin.")
+ board = await self.ctx.send(
+ embed=discord.Embed(description=self.format_board())
+ )
+ await self.add_reactions(board)
+
+ for _ in range(9):
+ if isinstance(self.current, Player):
+ announce = await self.ctx.send(
+ f"{self.current.user.mention}, it's your turn! "
+ "React with an emoji to take your go."
+ )
+ timeout, pos = await self.current.get_move(self.board, board)
+ if isinstance(self.current, Player):
+ await announce.delete()
+ if timeout:
+ await self.ctx.send(f"{self.current.user.mention} ran out of time. Canceling game.")
+ self.over = True
+ self.canceled = True
+ return
+ self.board[pos] = self.current.symbol
+ await board.edit(
+ embed=discord.Embed(description=self.format_board())
+ )
+ await board.clear_reaction(Emojis.number_emojis[pos])
+ if check_win(self.board):
+ self.winner = self.current
+ self.loser = self.next
+ await self.ctx.send(
+ f":tada: {self.current} won this game! :tada:"
+ )
+ await board.clear_reactions()
+ break
+ self.current, self.next = self.next, self.current
+ if not self.winner:
+ self.draw = True
+ await self.ctx.send("It's a DRAW!")
+ self.over = True
+
+
+def is_channel_free() -> t.Callable:
+ """Check is channel where command will be invoked free."""
+ async def predicate(ctx: Context) -> bool:
+ return all(game.channel != ctx.channel for game in ctx.cog.games if not game.over)
+ return check(predicate)
+
+
+def is_requester_free() -> t.Callable:
+ """Check is requester not already in any game."""
+ async def predicate(ctx: Context) -> bool:
+ return all(
+ ctx.author not in (player.user for player in game.players) for game in ctx.cog.games if not game.over
+ )
+ return check(predicate)
+
+
+class TicTacToe(Cog):
+ """TicTacToe cog contains tic-tac-toe game commands."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+ self.games: t.List[Game] = []
+
+ @guild_only()
+ @is_channel_free()
+ @is_requester_free()
+ @group(name="tictactoe", aliases=("ttt",), invoke_without_command=True)
+ async def tic_tac_toe(self, ctx: Context, opponent: t.Optional[discord.User]) -> None:
+ """Tic Tac Toe game. Play against friends or AI. Use reactions to add your mark to field."""
+ if opponent == ctx.author:
+ await ctx.send("You can't play against yourself.")
+ return
+ if opponent is not None and not all(
+ opponent not in (player.user for player in g.players) for g in ctx.cog.games if not g.over
+ ):
+ await ctx.send("Opponent is already in game.")
+ return
+ if opponent is None:
+ game = Game(
+ [Player(ctx.author, ctx, Emojis.x), AI(Emojis.o)],
+ ctx
+ )
+ else:
+ game = Game(
+ [Player(ctx.author, ctx, Emojis.x), Player(opponent, ctx, Emojis.o)],
+ ctx
+ )
+ self.games.append(game)
+ if opponent is not None:
+ confirmed, msg = await game.get_confirmation()
+
+ if not confirmed:
+ if msg:
+ await ctx.send(msg)
+ return
+ await game.play()
+
+ @tic_tac_toe.group(name="history", aliases=("log",), invoke_without_command=True)
+ async def tic_tac_toe_logs(self, ctx: Context) -> None:
+ """Show most recent tic-tac-toe games."""
+ if len(self.games) < 1:
+ await ctx.send("No recent games.")
+ return
+ log_games = []
+ for i, game in enumerate(self.games):
+ if game.over and not game.canceled:
+ if game.draw:
+ log_games.append(
+ f"**#{i+1}**: {game.players[0]} vs {game.players[1]} (draw)"
+ )
+ else:
+ log_games.append(
+ f"**#{i+1}**: {game.winner} :trophy: vs {game.loser}"
+ )
+ await LinePaginator.paginate(
+ log_games,
+ ctx,
+ discord.Embed(title="Most recent Tic Tac Toe games")
+ )
+
+ @tic_tac_toe_logs.command(name="show", aliases=("s",))
+ async def show_tic_tac_toe_board(self, ctx: Context, game_id: int) -> None:
+ """View game board by ID (ID is possible to get by `.tictactoe history`)."""
+ if len(self.games) < game_id:
+ await ctx.send("Game don't exist.")
+ return
+ game = self.games[game_id - 1]
+ await ctx.send(f"{game.winner} :trophy: vs {game.loser}")
+ await ctx.send(game.format_board())
+
+
+def setup(bot: Bot) -> None:
+ """Load TicTacToe Cog."""
+ bot.add_cog(TicTacToe(bot))
diff --git a/bot/exts/evergreen/xkcd.py b/bot/exts/evergreen/xkcd.py
new file mode 100644
index 00000000..d3224bfe
--- /dev/null
+++ b/bot/exts/evergreen/xkcd.py
@@ -0,0 +1,89 @@
+import logging
+import re
+from random import randint
+from typing import Dict, Optional, Union
+
+from discord import Embed
+from discord.ext import tasks
+from discord.ext.commands import Cog, Context, command
+
+from bot.bot import Bot
+from bot.constants import Colours
+
+log = logging.getLogger(__name__)
+
+COMIC_FORMAT = re.compile(r"latest|[0-9]+")
+BASE_URL = "https://xkcd.com"
+
+
+class XKCD(Cog):
+ """Retrieving XKCD comics."""
+
+ def __init__(self, bot: Bot) -> None:
+ self.bot = bot
+ self.latest_comic_info: Dict[str, Union[str, int]] = {}
+ self.get_latest_comic_info.start()
+
+ def cog_unload(self) -> None:
+ """Cancels refreshing of the task for refreshing the most recent comic info."""
+ self.get_latest_comic_info.cancel()
+
+ @tasks.loop(minutes=30)
+ async def get_latest_comic_info(self) -> None:
+ """Refreshes latest comic's information ever 30 minutes. Also used for finding a random comic."""
+ async with self.bot.http_session.get(f"{BASE_URL}/info.0.json") as resp:
+ if resp.status == 200:
+ self.latest_comic_info = await resp.json()
+ else:
+ log.debug(f"Failed to get latest XKCD comic information. Status code {resp.status}")
+
+ @command(name="xkcd")
+ async def fetch_xkcd_comics(self, ctx: Context, comic: Optional[str]) -> None:
+ """
+ Getting an xkcd comic's information along with the image.
+
+ To get a random comic, don't type any number as an argument. To get the latest, type 'latest'.
+ """
+ embed = Embed(title=f"XKCD comic '{comic}'")
+
+ embed.colour = Colours.soft_red
+
+ if comic and (comic := re.match(COMIC_FORMAT, comic)) is None:
+ embed.description = "Comic parameter should either be an integer or 'latest'."
+ await ctx.send(embed=embed)
+ return
+
+ comic = randint(1, self.latest_comic_info['num']) if comic is None else comic.group(0)
+
+ if comic == "latest":
+ info = self.latest_comic_info
+ else:
+ async with self.bot.http_session.get(f"{BASE_URL}/{comic}/info.0.json") as resp:
+ if resp.status == 200:
+ info = await resp.json()
+ else:
+ embed.title = f"XKCD comic #{comic}"
+ embed.description = f"{resp.status}: Could not retrieve xkcd comic #{comic}."
+ log.debug(f"Retrieving xkcd comic #{comic} failed with status code {resp.status}.")
+ await ctx.send(embed=embed)
+ return
+
+ embed.title = f"XKCD comic #{info['num']}"
+
+ if info["img"][-3:] in ("jpg", "png", "gif"):
+ embed.set_image(url=info["img"])
+ date = f"{info['year']}/{info['month']}/{info['day']}"
+ embed.set_footer(text=f"{date} - #{info['num']}, \'{info['safe_title']}\'")
+ embed.colour = Colours.soft_green
+ else:
+ embed.description = (
+ "The selected comic is interactive, and cannot be displayed within an embed.\n"
+ f"Comic can be viewed [here](https://xkcd.com/{info['num']})."
+ )
+
+ await ctx.send(embed=embed)
+
+
+def setup(bot: Bot) -> None:
+ """Loading the XKCD cog."""
+ bot.add_cog(XKCD(bot))