aboutsummaryrefslogtreecommitdiffstats
path: root/bot/exts/fun/game.py
diff options
context:
space:
mode:
Diffstat (limited to 'bot/exts/fun/game.py')
-rw-r--r--bot/exts/fun/game.py485
1 files changed, 485 insertions, 0 deletions
diff --git a/bot/exts/fun/game.py b/bot/exts/fun/game.py
new file mode 100644
index 00000000..f9c150e6
--- /dev/null
+++ b/bot/exts/fun/game.py
@@ -0,0 +1,485 @@
+import difflib
+import logging
+import random
+import re
+from asyncio import sleep
+from datetime import datetime as dt, timedelta
+from enum import IntEnum
+from typing import Any, Optional
+
+from aiohttp import ClientSession
+from discord import Embed
+from discord.ext import tasks
+from discord.ext.commands import Cog, Context, group
+
+from bot.bot import Bot
+from bot.constants import STAFF_ROLES, Tokens
+from bot.utils.decorators import with_role
+from bot.utils.extensions import invoke_help_command
+from bot.utils.pagination import ImagePaginator, LinePaginator
+
+# Base URL of IGDB API
+BASE_URL = "https://api.igdb.com/v4"
+
+CLIENT_ID = Tokens.igdb_client_id
+CLIENT_SECRET = Tokens.igdb_client_secret
+
+# The number of seconds before expiry that we attempt to re-fetch a new access token
+ACCESS_TOKEN_RENEWAL_WINDOW = 60*60*24*2
+
+# URL to request API access token
+OAUTH_URL = "https://id.twitch.tv/oauth2/token"
+
+OAUTH_PARAMS = {
+ "client_id": CLIENT_ID,
+ "client_secret": CLIENT_SECRET,
+ "grant_type": "client_credentials"
+}
+
+BASE_HEADERS = {
+ "Client-ID": CLIENT_ID,
+ "Accept": "application/json"
+}
+
+logger = logging.getLogger(__name__)
+
+REGEX_NON_ALPHABET = re.compile(r"[^a-z0-9]", re.IGNORECASE)
+
+# ---------
+# TEMPLATES
+# ---------
+
+# Body templates
+# Request body template for get_games_list
+GAMES_LIST_BODY = (
+ "fields cover.image_id, first_release_date, total_rating, name, storyline, url, platforms.name, status,"
+ "involved_companies.company.name, summary, age_ratings.category, age_ratings.rating, total_rating_count;"
+ "{sort} {limit} {offset} {genre} {additional}"
+)
+
+# Request body template for get_companies_list
+COMPANIES_LIST_BODY = (
+ "fields name, url, start_date, logo.image_id, developed.name, published.name, description;"
+ "offset {offset}; limit {limit};"
+)
+
+# Request body template for games search
+SEARCH_BODY = 'fields name, url, storyline, total_rating, total_rating_count; limit 50; search "{term}";'
+
+# Pages templates
+# Game embed layout
+GAME_PAGE = (
+ "**[{name}]({url})**\n"
+ "{description}"
+ "**Release Date:** {release_date}\n"
+ "**Rating:** {rating}/100 :star: (based on {rating_count} ratings)\n"
+ "**Platforms:** {platforms}\n"
+ "**Status:** {status}\n"
+ "**Age Ratings:** {age_ratings}\n"
+ "**Made by:** {made_by}\n\n"
+ "{storyline}"
+)
+
+# .games company command page layout
+COMPANY_PAGE = (
+ "**[{name}]({url})**\n"
+ "{description}"
+ "**Founded:** {founded}\n"
+ "**Developed:** {developed}\n"
+ "**Published:** {published}"
+)
+
+# For .games search command line layout
+GAME_SEARCH_LINE = (
+ "**[{name}]({url})**\n"
+ "{rating}/100 :star: (based on {rating_count} ratings)\n"
+)
+
+# URL templates
+COVER_URL = "https://images.igdb.com/igdb/image/upload/t_cover_big/{image_id}.jpg"
+LOGO_URL = "https://images.igdb.com/igdb/image/upload/t_logo_med/{image_id}.png"
+
+# Create aliases for complex genre names
+ALIASES = {
+ "Role-playing (rpg)": ["Role playing", "Rpg"],
+ "Turn-based strategy (tbs)": ["Turn based strategy", "Tbs"],
+ "Real time strategy (rts)": ["Real time strategy", "Rts"],
+ "Hack and slash/beat 'em up": ["Hack and slash"]
+}
+
+
+class GameStatus(IntEnum):
+ """Game statuses in IGDB API."""
+
+ Released = 0
+ Alpha = 2
+ Beta = 3
+ Early = 4
+ Offline = 5
+ Cancelled = 6
+ Rumored = 7
+
+
+class AgeRatingCategories(IntEnum):
+ """IGDB API Age Rating categories IDs."""
+
+ ESRB = 1
+ PEGI = 2
+
+
+class AgeRatings(IntEnum):
+ """PEGI/ESRB ratings IGDB API IDs."""
+
+ Three = 1
+ Seven = 2
+ Twelve = 3
+ Sixteen = 4
+ Eighteen = 5
+ RP = 6
+ EC = 7
+ E = 8
+ E10 = 9
+ T = 10
+ M = 11
+ AO = 12
+
+
+class Games(Cog):
+ """Games Cog contains commands that collect data from IGDB."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+ self.http_session: ClientSession = bot.http_session
+
+ self.genres: dict[str, int] = {}
+ self.headers = BASE_HEADERS
+
+ self.bot.loop.create_task(self.renew_access_token())
+
+ async def renew_access_token(self) -> None:
+ """Refeshes V4 access token a number of seconds before expiry. See `ACCESS_TOKEN_RENEWAL_WINDOW`."""
+ while True:
+ async with self.http_session.post(OAUTH_URL, params=OAUTH_PARAMS) as resp:
+ result = await resp.json()
+ if resp.status != 200:
+ # If there is a valid access token continue to use that,
+ # otherwise unload cog.
+ if "Authorization" in self.headers:
+ time_delta = timedelta(seconds=ACCESS_TOKEN_RENEWAL_WINDOW)
+ logger.error(
+ "Failed to renew IGDB access token. "
+ f"Current token will last for {time_delta} "
+ f"OAuth response message: {result['message']}"
+ )
+ else:
+ logger.warning(
+ "Invalid OAuth credentials. Unloading Games cog. "
+ f"OAuth response message: {result['message']}"
+ )
+ self.bot.remove_cog("Games")
+
+ return
+
+ self.headers["Authorization"] = f"Bearer {result['access_token']}"
+
+ # Attempt to renew before the token expires
+ next_renewal = result["expires_in"] - ACCESS_TOKEN_RENEWAL_WINDOW
+
+ time_delta = timedelta(seconds=next_renewal)
+ logger.info(f"Successfully renewed access token. Refreshing again in {time_delta}")
+
+ # This will be true the first time this loop runs.
+ # Since we now have an access token, its safe to start this task.
+ if self.genres == {}:
+ self.refresh_genres_task.start()
+ await sleep(next_renewal)
+
+ @tasks.loop(hours=24.0)
+ async def refresh_genres_task(self) -> None:
+ """Refresh genres in every hour."""
+ try:
+ await self._get_genres()
+ except Exception as e:
+ logger.warning(f"There was error while refreshing genres: {e}")
+ return
+ logger.info("Successfully refreshed genres.")
+
+ def cog_unload(self) -> None:
+ """Cancel genres refreshing start when unloading Cog."""
+ self.refresh_genres_task.cancel()
+ logger.info("Successfully stopped Genres Refreshing task.")
+
+ async def _get_genres(self) -> None:
+ """Create genres variable for games command."""
+ body = "fields name; limit 100;"
+ async with self.http_session.post(f"{BASE_URL}/genres", data=body, headers=self.headers) as resp:
+ result = await resp.json()
+ genres = {genre["name"].capitalize(): genre["id"] for genre in result}
+
+ # Replace complex names with names from ALIASES
+ for genre_name, genre in genres.items():
+ if genre_name in ALIASES:
+ for alias in ALIASES[genre_name]:
+ self.genres[alias] = genre
+ else:
+ self.genres[genre_name] = genre
+
+ @group(name="games", aliases=("game",), invoke_without_command=True)
+ async def games(self, ctx: Context, amount: Optional[int] = 5, *, genre: Optional[str]) -> None:
+ """
+ Get random game(s) by genre from IGDB. Use .games genres command to get all available genres.
+
+ Also support amount parameter, what max is 25 and min 1, default 5. Supported formats:
+ - .games <genre>
+ - .games <amount> <genre>
+ """
+ # When user didn't specified genre, send help message
+ if genre is None:
+ await invoke_help_command(ctx)
+ return
+
+ # Capitalize genre for check
+ genre = "".join(genre).capitalize()
+
+ # Check for amounts, max is 25 and min 1
+ if not 1 <= amount <= 25:
+ await ctx.send("Your provided amount is out of range. Our minimum is 1 and maximum 25.")
+ return
+
+ # Get games listing, if genre don't exist, show error message with possibilities.
+ # Offset must be random, due otherwise we will get always same result (offset show in which position should
+ # API start returning result)
+ try:
+ games = await self.get_games_list(amount, self.genres[genre], offset=random.randint(0, 150))
+ except KeyError:
+ possibilities = await self.get_best_results(genre)
+ # If there is more than 1 possibilities, show these.
+ # If there is only 1 possibility, use it as genre.
+ # Otherwise send message about invalid genre.
+ if len(possibilities) > 1:
+ display_possibilities = "`, `".join(p[1] for p in possibilities)
+ await ctx.send(
+ f"Invalid genre `{genre}`. "
+ f"{f'Maybe you meant `{display_possibilities}`?' if display_possibilities else ''}"
+ )
+ return
+ elif len(possibilities) == 1:
+ games = await self.get_games_list(
+ amount, self.genres[possibilities[0][1]], offset=random.randint(0, 150)
+ )
+ genre = possibilities[0][1]
+ else:
+ await ctx.send(f"Invalid genre `{genre}`.")
+ return
+
+ # Create pages and paginate
+ pages = [await self.create_page(game) for game in games]
+
+ await ImagePaginator.paginate(pages, ctx, Embed(title=f"Random {genre.title()} Games"))
+
+ @games.command(name="top", aliases=("t",))
+ async def top(self, ctx: Context, amount: int = 10) -> None:
+ """
+ Get current Top games in IGDB.
+
+ Support amount parameter. Max is 25, min is 1.
+ """
+ if not 1 <= amount <= 25:
+ await ctx.send("Your provided amount is out of range. Our minimum is 1 and maximum 25.")
+ return
+
+ games = await self.get_games_list(amount, sort="total_rating desc",
+ additional_body="where total_rating >= 90; sort total_rating_count desc;")
+
+ pages = [await self.create_page(game) for game in games]
+ await ImagePaginator.paginate(pages, ctx, Embed(title=f"Top {amount} Games"))
+
+ @games.command(name="genres", aliases=("genre", "g"))
+ async def genres(self, ctx: Context) -> None:
+ """Get all available genres."""
+ await ctx.send(f"Currently available genres: {', '.join(f'`{genre}`' for genre in self.genres)}")
+
+ @games.command(name="search", aliases=("s",))
+ async def search(self, ctx: Context, *, search_term: str) -> None:
+ """Find games by name."""
+ lines = await self.search_games(search_term)
+
+ await LinePaginator.paginate(lines, ctx, Embed(title=f"Game Search Results: {search_term}"), empty=False)
+
+ @games.command(name="company", aliases=("companies",))
+ async def company(self, ctx: Context, amount: int = 5) -> None:
+ """
+ Get random Game Companies companies from IGDB API.
+
+ Support amount parameter. Max is 25, min is 1.
+ """
+ if not 1 <= amount <= 25:
+ await ctx.send("Your provided amount is out of range. Our minimum is 1 and maximum 25.")
+ return
+
+ # Get companies listing. Provide limit for limiting how much companies will be returned. Get random offset to
+ # get (almost) every time different companies (offset show in which position should API start returning result)
+ companies = await self.get_companies_list(limit=amount, offset=random.randint(0, 150))
+ pages = [await self.create_company_page(co) for co in companies]
+
+ await ImagePaginator.paginate(pages, ctx, Embed(title="Random Game Companies"))
+
+ @with_role(*STAFF_ROLES)
+ @games.command(name="refresh", aliases=("r",))
+ async def refresh_genres_command(self, ctx: Context) -> None:
+ """Refresh .games command genres."""
+ try:
+ await self._get_genres()
+ except Exception as e:
+ await ctx.send(f"There was error while refreshing genres: `{e}`")
+ return
+ await ctx.send("Successfully refreshed genres.")
+
+ async def get_games_list(
+ self,
+ amount: int,
+ genre: Optional[str] = None,
+ sort: Optional[str] = None,
+ additional_body: str = "",
+ offset: int = 0
+ ) -> list[dict[str, Any]]:
+ """
+ Get list of games from IGDB API by parameters that is provided.
+
+ Amount param show how much games this get, genre is genre ID and at least one genre in game must this when
+ provided. Sort is sorting by specific field and direction, ex. total_rating desc/asc (total_rating is field,
+ desc/asc is direction). Additional_body is field where you can pass extra search parameters. Offset show start
+ position in API.
+ """
+ # Create body of IGDB API request, define fields, sorting, offset, limit and genre
+ params = {
+ "sort": f"sort {sort};" if sort else "",
+ "limit": f"limit {amount};",
+ "offset": f"offset {offset};" if offset else "",
+ "genre": f"where genres = ({genre});" if genre else "",
+ "additional": additional_body
+ }
+ body = GAMES_LIST_BODY.format(**params)
+
+ # Do request to IGDB API, create headers, URL, define body, return result
+ async with self.http_session.post(url=f"{BASE_URL}/games", data=body, headers=self.headers) as resp:
+ return await resp.json()
+
+ async def create_page(self, data: dict[str, Any]) -> tuple[str, str]:
+ """Create content of Game Page."""
+ # Create cover image URL from template
+ url = COVER_URL.format(**{"image_id": data["cover"]["image_id"] if "cover" in data else ""})
+
+ # Get release date separately with checking
+ release_date = dt.utcfromtimestamp(data["first_release_date"]).date() if "first_release_date" in data else "?"
+
+ # Create Age Ratings value
+ rating = ", ".join(
+ f"{AgeRatingCategories(age['category']).name} {AgeRatings(age['rating']).name}"
+ for age in data["age_ratings"]
+ ) if "age_ratings" in data else "?"
+
+ companies = [c["company"]["name"] for c in data["involved_companies"]] if "involved_companies" in data else "?"
+
+ # Create formatting for template page
+ formatting = {
+ "name": data["name"],
+ "url": data["url"],
+ "description": f"{data['summary']}\n\n" if "summary" in data else "\n",
+ "release_date": release_date,
+ "rating": round(data["total_rating"] if "total_rating" in data else 0, 2),
+ "rating_count": data["total_rating_count"] if "total_rating_count" in data else "?",
+ "platforms": ", ".join(platform["name"] for platform in data["platforms"]) if "platforms" in data else "?",
+ "status": GameStatus(data["status"]).name if "status" in data else "?",
+ "age_ratings": rating,
+ "made_by": ", ".join(companies),
+ "storyline": data["storyline"] if "storyline" in data else ""
+ }
+ page = GAME_PAGE.format(**formatting)
+
+ return page, url
+
+ async def search_games(self, search_term: str) -> list[str]:
+ """Search game from IGDB API by string, return listing of pages."""
+ lines = []
+
+ # Define request body of IGDB API request and do request
+ body = SEARCH_BODY.format(**{"term": search_term})
+
+ async with self.http_session.post(url=f"{BASE_URL}/games", data=body, headers=self.headers) as resp:
+ data = await resp.json()
+
+ # Loop over games, format them to good format, make line and append this to total lines
+ for game in data:
+ formatting = {
+ "name": game["name"],
+ "url": game["url"],
+ "rating": round(game["total_rating"] if "total_rating" in game else 0, 2),
+ "rating_count": game["total_rating_count"] if "total_rating" in game else "?"
+ }
+ line = GAME_SEARCH_LINE.format(**formatting)
+ lines.append(line)
+
+ return lines
+
+ async def get_companies_list(self, limit: int, offset: int = 0) -> list[dict[str, Any]]:
+ """
+ Get random Game Companies from IGDB API.
+
+ Limit is parameter, that show how much movies this should return, offset show in which position should API start
+ returning results.
+ """
+ # Create request body from template
+ body = COMPANIES_LIST_BODY.format(**{
+ "limit": limit,
+ "offset": offset
+ })
+
+ async with self.http_session.post(url=f"{BASE_URL}/companies", data=body, headers=self.headers) as resp:
+ return await resp.json()
+
+ async def create_company_page(self, data: dict[str, Any]) -> tuple[str, str]:
+ """Create good formatted Game Company page."""
+ # Generate URL of company logo
+ url = LOGO_URL.format(**{"image_id": data["logo"]["image_id"] if "logo" in data else ""})
+
+ # Try to get found date of company
+ founded = dt.utcfromtimestamp(data["start_date"]).date() if "start_date" in data else "?"
+
+ # Generate list of games, that company have developed or published
+ developed = ", ".join(game["name"] for game in data["developed"]) if "developed" in data else "?"
+ published = ", ".join(game["name"] for game in data["published"]) if "published" in data else "?"
+
+ formatting = {
+ "name": data["name"],
+ "url": data["url"],
+ "description": f"{data['description']}\n\n" if "description" in data else "\n",
+ "founded": founded,
+ "developed": developed,
+ "published": published
+ }
+ page = COMPANY_PAGE.format(**formatting)
+
+ return page, url
+
+ async def get_best_results(self, query: str) -> list[tuple[float, str]]:
+ """Get best match result of genre when original genre is invalid."""
+ results = []
+ for genre in self.genres:
+ ratios = [difflib.SequenceMatcher(None, query, genre).ratio()]
+ for word in REGEX_NON_ALPHABET.split(genre):
+ ratios.append(difflib.SequenceMatcher(None, query, word).ratio())
+ results.append((round(max(ratios), 2), genre))
+ return sorted((item for item in results if item[0] >= 0.60), reverse=True)[:4]
+
+
+def setup(bot: Bot) -> None:
+ """Load the Games cog."""
+ # Check does IGDB API key exist, if not, log warning and don't load cog
+ if not Tokens.igdb_client_id:
+ logger.warning("No IGDB client ID. Not loading Games cog.")
+ return
+ if not Tokens.igdb_client_secret:
+ logger.warning("No IGDB client secret. Not loading Games cog.")
+ return
+ bot.add_cog(Games(bot))