diff options
author | 2021-09-05 00:31:20 -0400 | |
---|---|---|
committer | 2021-09-05 00:31:20 -0400 | |
commit | 02512e43f3d68ffd89654c5f2e9e3e9a27c0c018 (patch) | |
tree | 4b62a6dbb39601f02aa435c7eb8a10433585c3bb /bot/exts/fun/game.py | |
parent | Move snakes commands into fun folder (diff) |
Move game and fun commands to Fun folder, fix ddg
This moves all the fun commands and games into the fun folder.
This commit also makes changes to the duck_game.
It was setting a footer during an embed init, which is no longer
possible with the version of d.py we use. Additionally, an issue with
editing an embed that had a local image loaded.
The workaround for the time being is to update the message,
not the embed.
Diffstat (limited to 'bot/exts/fun/game.py')
-rw-r--r-- | bot/exts/fun/game.py | 485 |
1 files changed, 485 insertions, 0 deletions
diff --git a/bot/exts/fun/game.py b/bot/exts/fun/game.py new file mode 100644 index 00000000..f9c150e6 --- /dev/null +++ b/bot/exts/fun/game.py @@ -0,0 +1,485 @@ +import difflib +import logging +import random +import re +from asyncio import sleep +from datetime import datetime as dt, timedelta +from enum import IntEnum +from typing import Any, Optional + +from aiohttp import ClientSession +from discord import Embed +from discord.ext import tasks +from discord.ext.commands import Cog, Context, group + +from bot.bot import Bot +from bot.constants import STAFF_ROLES, Tokens +from bot.utils.decorators import with_role +from bot.utils.extensions import invoke_help_command +from bot.utils.pagination import ImagePaginator, LinePaginator + +# Base URL of IGDB API +BASE_URL = "https://api.igdb.com/v4" + +CLIENT_ID = Tokens.igdb_client_id +CLIENT_SECRET = Tokens.igdb_client_secret + +# The number of seconds before expiry that we attempt to re-fetch a new access token +ACCESS_TOKEN_RENEWAL_WINDOW = 60*60*24*2 + +# URL to request API access token +OAUTH_URL = "https://id.twitch.tv/oauth2/token" + +OAUTH_PARAMS = { + "client_id": CLIENT_ID, + "client_secret": CLIENT_SECRET, + "grant_type": "client_credentials" +} + +BASE_HEADERS = { + "Client-ID": CLIENT_ID, + "Accept": "application/json" +} + +logger = logging.getLogger(__name__) + +REGEX_NON_ALPHABET = re.compile(r"[^a-z0-9]", re.IGNORECASE) + +# --------- +# TEMPLATES +# --------- + +# Body templates +# Request body template for get_games_list +GAMES_LIST_BODY = ( + "fields cover.image_id, first_release_date, total_rating, name, storyline, url, platforms.name, status," + "involved_companies.company.name, summary, age_ratings.category, age_ratings.rating, total_rating_count;" + "{sort} {limit} {offset} {genre} {additional}" +) + +# Request body template for get_companies_list +COMPANIES_LIST_BODY = ( + "fields name, url, start_date, logo.image_id, developed.name, published.name, description;" + "offset {offset}; limit {limit};" +) + +# Request body template for games search +SEARCH_BODY = 'fields name, url, storyline, total_rating, total_rating_count; limit 50; search "{term}";' + +# Pages templates +# Game embed layout +GAME_PAGE = ( + "**[{name}]({url})**\n" + "{description}" + "**Release Date:** {release_date}\n" + "**Rating:** {rating}/100 :star: (based on {rating_count} ratings)\n" + "**Platforms:** {platforms}\n" + "**Status:** {status}\n" + "**Age Ratings:** {age_ratings}\n" + "**Made by:** {made_by}\n\n" + "{storyline}" +) + +# .games company command page layout +COMPANY_PAGE = ( + "**[{name}]({url})**\n" + "{description}" + "**Founded:** {founded}\n" + "**Developed:** {developed}\n" + "**Published:** {published}" +) + +# For .games search command line layout +GAME_SEARCH_LINE = ( + "**[{name}]({url})**\n" + "{rating}/100 :star: (based on {rating_count} ratings)\n" +) + +# URL templates +COVER_URL = "https://images.igdb.com/igdb/image/upload/t_cover_big/{image_id}.jpg" +LOGO_URL = "https://images.igdb.com/igdb/image/upload/t_logo_med/{image_id}.png" + +# Create aliases for complex genre names +ALIASES = { + "Role-playing (rpg)": ["Role playing", "Rpg"], + "Turn-based strategy (tbs)": ["Turn based strategy", "Tbs"], + "Real time strategy (rts)": ["Real time strategy", "Rts"], + "Hack and slash/beat 'em up": ["Hack and slash"] +} + + +class GameStatus(IntEnum): + """Game statuses in IGDB API.""" + + Released = 0 + Alpha = 2 + Beta = 3 + Early = 4 + Offline = 5 + Cancelled = 6 + Rumored = 7 + + +class AgeRatingCategories(IntEnum): + """IGDB API Age Rating categories IDs.""" + + ESRB = 1 + PEGI = 2 + + +class AgeRatings(IntEnum): + """PEGI/ESRB ratings IGDB API IDs.""" + + Three = 1 + Seven = 2 + Twelve = 3 + Sixteen = 4 + Eighteen = 5 + RP = 6 + EC = 7 + E = 8 + E10 = 9 + T = 10 + M = 11 + AO = 12 + + +class Games(Cog): + """Games Cog contains commands that collect data from IGDB.""" + + def __init__(self, bot: Bot): + self.bot = bot + self.http_session: ClientSession = bot.http_session + + self.genres: dict[str, int] = {} + self.headers = BASE_HEADERS + + self.bot.loop.create_task(self.renew_access_token()) + + async def renew_access_token(self) -> None: + """Refeshes V4 access token a number of seconds before expiry. See `ACCESS_TOKEN_RENEWAL_WINDOW`.""" + while True: + async with self.http_session.post(OAUTH_URL, params=OAUTH_PARAMS) as resp: + result = await resp.json() + if resp.status != 200: + # If there is a valid access token continue to use that, + # otherwise unload cog. + if "Authorization" in self.headers: + time_delta = timedelta(seconds=ACCESS_TOKEN_RENEWAL_WINDOW) + logger.error( + "Failed to renew IGDB access token. " + f"Current token will last for {time_delta} " + f"OAuth response message: {result['message']}" + ) + else: + logger.warning( + "Invalid OAuth credentials. Unloading Games cog. " + f"OAuth response message: {result['message']}" + ) + self.bot.remove_cog("Games") + + return + + self.headers["Authorization"] = f"Bearer {result['access_token']}" + + # Attempt to renew before the token expires + next_renewal = result["expires_in"] - ACCESS_TOKEN_RENEWAL_WINDOW + + time_delta = timedelta(seconds=next_renewal) + logger.info(f"Successfully renewed access token. Refreshing again in {time_delta}") + + # This will be true the first time this loop runs. + # Since we now have an access token, its safe to start this task. + if self.genres == {}: + self.refresh_genres_task.start() + await sleep(next_renewal) + + @tasks.loop(hours=24.0) + async def refresh_genres_task(self) -> None: + """Refresh genres in every hour.""" + try: + await self._get_genres() + except Exception as e: + logger.warning(f"There was error while refreshing genres: {e}") + return + logger.info("Successfully refreshed genres.") + + def cog_unload(self) -> None: + """Cancel genres refreshing start when unloading Cog.""" + self.refresh_genres_task.cancel() + logger.info("Successfully stopped Genres Refreshing task.") + + async def _get_genres(self) -> None: + """Create genres variable for games command.""" + body = "fields name; limit 100;" + async with self.http_session.post(f"{BASE_URL}/genres", data=body, headers=self.headers) as resp: + result = await resp.json() + genres = {genre["name"].capitalize(): genre["id"] for genre in result} + + # Replace complex names with names from ALIASES + for genre_name, genre in genres.items(): + if genre_name in ALIASES: + for alias in ALIASES[genre_name]: + self.genres[alias] = genre + else: + self.genres[genre_name] = genre + + @group(name="games", aliases=("game",), invoke_without_command=True) + async def games(self, ctx: Context, amount: Optional[int] = 5, *, genre: Optional[str]) -> None: + """ + Get random game(s) by genre from IGDB. Use .games genres command to get all available genres. + + Also support amount parameter, what max is 25 and min 1, default 5. Supported formats: + - .games <genre> + - .games <amount> <genre> + """ + # When user didn't specified genre, send help message + if genre is None: + await invoke_help_command(ctx) + return + + # Capitalize genre for check + genre = "".join(genre).capitalize() + + # Check for amounts, max is 25 and min 1 + if not 1 <= amount <= 25: + await ctx.send("Your provided amount is out of range. Our minimum is 1 and maximum 25.") + return + + # Get games listing, if genre don't exist, show error message with possibilities. + # Offset must be random, due otherwise we will get always same result (offset show in which position should + # API start returning result) + try: + games = await self.get_games_list(amount, self.genres[genre], offset=random.randint(0, 150)) + except KeyError: + possibilities = await self.get_best_results(genre) + # If there is more than 1 possibilities, show these. + # If there is only 1 possibility, use it as genre. + # Otherwise send message about invalid genre. + if len(possibilities) > 1: + display_possibilities = "`, `".join(p[1] for p in possibilities) + await ctx.send( + f"Invalid genre `{genre}`. " + f"{f'Maybe you meant `{display_possibilities}`?' if display_possibilities else ''}" + ) + return + elif len(possibilities) == 1: + games = await self.get_games_list( + amount, self.genres[possibilities[0][1]], offset=random.randint(0, 150) + ) + genre = possibilities[0][1] + else: + await ctx.send(f"Invalid genre `{genre}`.") + return + + # Create pages and paginate + pages = [await self.create_page(game) for game in games] + + await ImagePaginator.paginate(pages, ctx, Embed(title=f"Random {genre.title()} Games")) + + @games.command(name="top", aliases=("t",)) + async def top(self, ctx: Context, amount: int = 10) -> None: + """ + Get current Top games in IGDB. + + Support amount parameter. Max is 25, min is 1. + """ + if not 1 <= amount <= 25: + await ctx.send("Your provided amount is out of range. Our minimum is 1 and maximum 25.") + return + + games = await self.get_games_list(amount, sort="total_rating desc", + additional_body="where total_rating >= 90; sort total_rating_count desc;") + + pages = [await self.create_page(game) for game in games] + await ImagePaginator.paginate(pages, ctx, Embed(title=f"Top {amount} Games")) + + @games.command(name="genres", aliases=("genre", "g")) + async def genres(self, ctx: Context) -> None: + """Get all available genres.""" + await ctx.send(f"Currently available genres: {', '.join(f'`{genre}`' for genre in self.genres)}") + + @games.command(name="search", aliases=("s",)) + async def search(self, ctx: Context, *, search_term: str) -> None: + """Find games by name.""" + lines = await self.search_games(search_term) + + await LinePaginator.paginate(lines, ctx, Embed(title=f"Game Search Results: {search_term}"), empty=False) + + @games.command(name="company", aliases=("companies",)) + async def company(self, ctx: Context, amount: int = 5) -> None: + """ + Get random Game Companies companies from IGDB API. + + Support amount parameter. Max is 25, min is 1. + """ + if not 1 <= amount <= 25: + await ctx.send("Your provided amount is out of range. Our minimum is 1 and maximum 25.") + return + + # Get companies listing. Provide limit for limiting how much companies will be returned. Get random offset to + # get (almost) every time different companies (offset show in which position should API start returning result) + companies = await self.get_companies_list(limit=amount, offset=random.randint(0, 150)) + pages = [await self.create_company_page(co) for co in companies] + + await ImagePaginator.paginate(pages, ctx, Embed(title="Random Game Companies")) + + @with_role(*STAFF_ROLES) + @games.command(name="refresh", aliases=("r",)) + async def refresh_genres_command(self, ctx: Context) -> None: + """Refresh .games command genres.""" + try: + await self._get_genres() + except Exception as e: + await ctx.send(f"There was error while refreshing genres: `{e}`") + return + await ctx.send("Successfully refreshed genres.") + + async def get_games_list( + self, + amount: int, + genre: Optional[str] = None, + sort: Optional[str] = None, + additional_body: str = "", + offset: int = 0 + ) -> list[dict[str, Any]]: + """ + Get list of games from IGDB API by parameters that is provided. + + Amount param show how much games this get, genre is genre ID and at least one genre in game must this when + provided. Sort is sorting by specific field and direction, ex. total_rating desc/asc (total_rating is field, + desc/asc is direction). Additional_body is field where you can pass extra search parameters. Offset show start + position in API. + """ + # Create body of IGDB API request, define fields, sorting, offset, limit and genre + params = { + "sort": f"sort {sort};" if sort else "", + "limit": f"limit {amount};", + "offset": f"offset {offset};" if offset else "", + "genre": f"where genres = ({genre});" if genre else "", + "additional": additional_body + } + body = GAMES_LIST_BODY.format(**params) + + # Do request to IGDB API, create headers, URL, define body, return result + async with self.http_session.post(url=f"{BASE_URL}/games", data=body, headers=self.headers) as resp: + return await resp.json() + + async def create_page(self, data: dict[str, Any]) -> tuple[str, str]: + """Create content of Game Page.""" + # Create cover image URL from template + url = COVER_URL.format(**{"image_id": data["cover"]["image_id"] if "cover" in data else ""}) + + # Get release date separately with checking + release_date = dt.utcfromtimestamp(data["first_release_date"]).date() if "first_release_date" in data else "?" + + # Create Age Ratings value + rating = ", ".join( + f"{AgeRatingCategories(age['category']).name} {AgeRatings(age['rating']).name}" + for age in data["age_ratings"] + ) if "age_ratings" in data else "?" + + companies = [c["company"]["name"] for c in data["involved_companies"]] if "involved_companies" in data else "?" + + # Create formatting for template page + formatting = { + "name": data["name"], + "url": data["url"], + "description": f"{data['summary']}\n\n" if "summary" in data else "\n", + "release_date": release_date, + "rating": round(data["total_rating"] if "total_rating" in data else 0, 2), + "rating_count": data["total_rating_count"] if "total_rating_count" in data else "?", + "platforms": ", ".join(platform["name"] for platform in data["platforms"]) if "platforms" in data else "?", + "status": GameStatus(data["status"]).name if "status" in data else "?", + "age_ratings": rating, + "made_by": ", ".join(companies), + "storyline": data["storyline"] if "storyline" in data else "" + } + page = GAME_PAGE.format(**formatting) + + return page, url + + async def search_games(self, search_term: str) -> list[str]: + """Search game from IGDB API by string, return listing of pages.""" + lines = [] + + # Define request body of IGDB API request and do request + body = SEARCH_BODY.format(**{"term": search_term}) + + async with self.http_session.post(url=f"{BASE_URL}/games", data=body, headers=self.headers) as resp: + data = await resp.json() + + # Loop over games, format them to good format, make line and append this to total lines + for game in data: + formatting = { + "name": game["name"], + "url": game["url"], + "rating": round(game["total_rating"] if "total_rating" in game else 0, 2), + "rating_count": game["total_rating_count"] if "total_rating" in game else "?" + } + line = GAME_SEARCH_LINE.format(**formatting) + lines.append(line) + + return lines + + async def get_companies_list(self, limit: int, offset: int = 0) -> list[dict[str, Any]]: + """ + Get random Game Companies from IGDB API. + + Limit is parameter, that show how much movies this should return, offset show in which position should API start + returning results. + """ + # Create request body from template + body = COMPANIES_LIST_BODY.format(**{ + "limit": limit, + "offset": offset + }) + + async with self.http_session.post(url=f"{BASE_URL}/companies", data=body, headers=self.headers) as resp: + return await resp.json() + + async def create_company_page(self, data: dict[str, Any]) -> tuple[str, str]: + """Create good formatted Game Company page.""" + # Generate URL of company logo + url = LOGO_URL.format(**{"image_id": data["logo"]["image_id"] if "logo" in data else ""}) + + # Try to get found date of company + founded = dt.utcfromtimestamp(data["start_date"]).date() if "start_date" in data else "?" + + # Generate list of games, that company have developed or published + developed = ", ".join(game["name"] for game in data["developed"]) if "developed" in data else "?" + published = ", ".join(game["name"] for game in data["published"]) if "published" in data else "?" + + formatting = { + "name": data["name"], + "url": data["url"], + "description": f"{data['description']}\n\n" if "description" in data else "\n", + "founded": founded, + "developed": developed, + "published": published + } + page = COMPANY_PAGE.format(**formatting) + + return page, url + + async def get_best_results(self, query: str) -> list[tuple[float, str]]: + """Get best match result of genre when original genre is invalid.""" + results = [] + for genre in self.genres: + ratios = [difflib.SequenceMatcher(None, query, genre).ratio()] + for word in REGEX_NON_ALPHABET.split(genre): + ratios.append(difflib.SequenceMatcher(None, query, word).ratio()) + results.append((round(max(ratios), 2), genre)) + return sorted((item for item in results if item[0] >= 0.60), reverse=True)[:4] + + +def setup(bot: Bot) -> None: + """Load the Games cog.""" + # Check does IGDB API key exist, if not, log warning and don't load cog + if not Tokens.igdb_client_id: + logger.warning("No IGDB client ID. Not loading Games cog.") + return + if not Tokens.igdb_client_secret: + logger.warning("No IGDB client secret. Not loading Games cog.") + return + bot.add_cog(Games(bot)) |