diff options
Diffstat (limited to 'bot/exts/fun/game.py')
| -rw-r--r-- | bot/exts/fun/game.py | 485 | 
1 files changed, 485 insertions, 0 deletions
| diff --git a/bot/exts/fun/game.py b/bot/exts/fun/game.py new file mode 100644 index 00000000..f9c150e6 --- /dev/null +++ b/bot/exts/fun/game.py @@ -0,0 +1,485 @@ +import difflib +import logging +import random +import re +from asyncio import sleep +from datetime import datetime as dt, timedelta +from enum import IntEnum +from typing import Any, Optional + +from aiohttp import ClientSession +from discord import Embed +from discord.ext import tasks +from discord.ext.commands import Cog, Context, group + +from bot.bot import Bot +from bot.constants import STAFF_ROLES, Tokens +from bot.utils.decorators import with_role +from bot.utils.extensions import invoke_help_command +from bot.utils.pagination import ImagePaginator, LinePaginator + +# Base URL of IGDB API +BASE_URL = "https://api.igdb.com/v4" + +CLIENT_ID = Tokens.igdb_client_id +CLIENT_SECRET = Tokens.igdb_client_secret + +# The number of seconds before expiry that we attempt to re-fetch a new access token +ACCESS_TOKEN_RENEWAL_WINDOW = 60*60*24*2 + +# URL to request API access token +OAUTH_URL = "https://id.twitch.tv/oauth2/token" + +OAUTH_PARAMS = { +    "client_id": CLIENT_ID, +    "client_secret": CLIENT_SECRET, +    "grant_type": "client_credentials" +} + +BASE_HEADERS = { +    "Client-ID": CLIENT_ID, +    "Accept": "application/json" +} + +logger = logging.getLogger(__name__) + +REGEX_NON_ALPHABET = re.compile(r"[^a-z0-9]", re.IGNORECASE) + +# --------- +# TEMPLATES +# --------- + +# Body templates +# Request body template for get_games_list +GAMES_LIST_BODY = ( +    "fields cover.image_id, first_release_date, total_rating, name, storyline, url, platforms.name, status," +    "involved_companies.company.name, summary, age_ratings.category, age_ratings.rating, total_rating_count;" +    "{sort} {limit} {offset} {genre} {additional}" +) + +# Request body template for get_companies_list +COMPANIES_LIST_BODY = ( +    "fields name, url, start_date, logo.image_id, developed.name, published.name, description;" +    "offset {offset}; limit {limit};" +) + +# Request body template for games search +SEARCH_BODY = 'fields name, url, storyline, total_rating, total_rating_count; limit 50; search "{term}";' + +# Pages templates +# Game embed layout +GAME_PAGE = ( +    "**[{name}]({url})**\n" +    "{description}" +    "**Release Date:** {release_date}\n" +    "**Rating:** {rating}/100 :star: (based on {rating_count} ratings)\n" +    "**Platforms:** {platforms}\n" +    "**Status:** {status}\n" +    "**Age Ratings:** {age_ratings}\n" +    "**Made by:** {made_by}\n\n" +    "{storyline}" +) + +# .games company command page layout +COMPANY_PAGE = ( +    "**[{name}]({url})**\n" +    "{description}" +    "**Founded:** {founded}\n" +    "**Developed:** {developed}\n" +    "**Published:** {published}" +) + +# For .games search command line layout +GAME_SEARCH_LINE = ( +    "**[{name}]({url})**\n" +    "{rating}/100 :star: (based on {rating_count} ratings)\n" +) + +# URL templates +COVER_URL = "https://images.igdb.com/igdb/image/upload/t_cover_big/{image_id}.jpg" +LOGO_URL = "https://images.igdb.com/igdb/image/upload/t_logo_med/{image_id}.png" + +# Create aliases for complex genre names +ALIASES = { +    "Role-playing (rpg)": ["Role playing", "Rpg"], +    "Turn-based strategy (tbs)": ["Turn based strategy", "Tbs"], +    "Real time strategy (rts)": ["Real time strategy", "Rts"], +    "Hack and slash/beat 'em up": ["Hack and slash"] +} + + +class GameStatus(IntEnum): +    """Game statuses in IGDB API.""" + +    Released = 0 +    Alpha = 2 +    Beta = 3 +    Early = 4 +    Offline = 5 +    Cancelled = 6 +    Rumored = 7 + + +class AgeRatingCategories(IntEnum): +    """IGDB API Age Rating categories IDs.""" + +    ESRB = 1 +    PEGI = 2 + + +class AgeRatings(IntEnum): +    """PEGI/ESRB ratings IGDB API IDs.""" + +    Three = 1 +    Seven = 2 +    Twelve = 3 +    Sixteen = 4 +    Eighteen = 5 +    RP = 6 +    EC = 7 +    E = 8 +    E10 = 9 +    T = 10 +    M = 11 +    AO = 12 + + +class Games(Cog): +    """Games Cog contains commands that collect data from IGDB.""" + +    def __init__(self, bot: Bot): +        self.bot = bot +        self.http_session: ClientSession = bot.http_session + +        self.genres: dict[str, int] = {} +        self.headers = BASE_HEADERS + +        self.bot.loop.create_task(self.renew_access_token()) + +    async def renew_access_token(self) -> None: +        """Refeshes V4 access token a number of seconds before expiry. See `ACCESS_TOKEN_RENEWAL_WINDOW`.""" +        while True: +            async with self.http_session.post(OAUTH_URL, params=OAUTH_PARAMS) as resp: +                result = await resp.json() +                if resp.status != 200: +                    # If there is a valid access token continue to use that, +                    # otherwise unload cog. +                    if "Authorization" in self.headers: +                        time_delta = timedelta(seconds=ACCESS_TOKEN_RENEWAL_WINDOW) +                        logger.error( +                            "Failed to renew IGDB access token. " +                            f"Current token will last for {time_delta} " +                            f"OAuth response message: {result['message']}" +                        ) +                    else: +                        logger.warning( +                            "Invalid OAuth credentials. Unloading Games cog. " +                            f"OAuth response message: {result['message']}" +                        ) +                        self.bot.remove_cog("Games") + +                    return + +            self.headers["Authorization"] = f"Bearer {result['access_token']}" + +            # Attempt to renew before the token expires +            next_renewal = result["expires_in"] - ACCESS_TOKEN_RENEWAL_WINDOW + +            time_delta = timedelta(seconds=next_renewal) +            logger.info(f"Successfully renewed access token. Refreshing again in {time_delta}") + +            # This will be true the first time this loop runs. +            # Since we now have an access token, its safe to start this task. +            if self.genres == {}: +                self.refresh_genres_task.start() +            await sleep(next_renewal) + +    @tasks.loop(hours=24.0) +    async def refresh_genres_task(self) -> None: +        """Refresh genres in every hour.""" +        try: +            await self._get_genres() +        except Exception as e: +            logger.warning(f"There was error while refreshing genres: {e}") +            return +        logger.info("Successfully refreshed genres.") + +    def cog_unload(self) -> None: +        """Cancel genres refreshing start when unloading Cog.""" +        self.refresh_genres_task.cancel() +        logger.info("Successfully stopped Genres Refreshing task.") + +    async def _get_genres(self) -> None: +        """Create genres variable for games command.""" +        body = "fields name; limit 100;" +        async with self.http_session.post(f"{BASE_URL}/genres", data=body, headers=self.headers) as resp: +            result = await resp.json() +        genres = {genre["name"].capitalize(): genre["id"] for genre in result} + +        # Replace complex names with names from ALIASES +        for genre_name, genre in genres.items(): +            if genre_name in ALIASES: +                for alias in ALIASES[genre_name]: +                    self.genres[alias] = genre +            else: +                self.genres[genre_name] = genre + +    @group(name="games", aliases=("game",), invoke_without_command=True) +    async def games(self, ctx: Context, amount: Optional[int] = 5, *, genre: Optional[str]) -> None: +        """ +        Get random game(s) by genre from IGDB. Use .games genres command to get all available genres. + +        Also support amount parameter, what max is 25 and min 1, default 5. Supported formats: +        - .games <genre> +        - .games <amount> <genre> +        """ +        # When user didn't specified genre, send help message +        if genre is None: +            await invoke_help_command(ctx) +            return + +        # Capitalize genre for check +        genre = "".join(genre).capitalize() + +        # Check for amounts, max is 25 and min 1 +        if not 1 <= amount <= 25: +            await ctx.send("Your provided amount is out of range. Our minimum is 1 and maximum 25.") +            return + +        # Get games listing, if genre don't exist, show error message with possibilities. +        # Offset must be random, due otherwise we will get always same result (offset show in which position should +        # API start returning result) +        try: +            games = await self.get_games_list(amount, self.genres[genre], offset=random.randint(0, 150)) +        except KeyError: +            possibilities = await self.get_best_results(genre) +            # If there is more than 1 possibilities, show these. +            # If there is only 1 possibility, use it as genre. +            # Otherwise send message about invalid genre. +            if len(possibilities) > 1: +                display_possibilities = "`, `".join(p[1] for p in possibilities) +                await ctx.send( +                    f"Invalid genre `{genre}`. " +                    f"{f'Maybe you meant `{display_possibilities}`?' if display_possibilities else ''}" +                ) +                return +            elif len(possibilities) == 1: +                games = await self.get_games_list( +                    amount, self.genres[possibilities[0][1]], offset=random.randint(0, 150) +                ) +                genre = possibilities[0][1] +            else: +                await ctx.send(f"Invalid genre `{genre}`.") +                return + +        # Create pages and paginate +        pages = [await self.create_page(game) for game in games] + +        await ImagePaginator.paginate(pages, ctx, Embed(title=f"Random {genre.title()} Games")) + +    @games.command(name="top", aliases=("t",)) +    async def top(self, ctx: Context, amount: int = 10) -> None: +        """ +        Get current Top games in IGDB. + +        Support amount parameter. Max is 25, min is 1. +        """ +        if not 1 <= amount <= 25: +            await ctx.send("Your provided amount is out of range. Our minimum is 1 and maximum 25.") +            return + +        games = await self.get_games_list(amount, sort="total_rating desc", +                                          additional_body="where total_rating >= 90; sort total_rating_count desc;") + +        pages = [await self.create_page(game) for game in games] +        await ImagePaginator.paginate(pages, ctx, Embed(title=f"Top {amount} Games")) + +    @games.command(name="genres", aliases=("genre", "g")) +    async def genres(self, ctx: Context) -> None: +        """Get all available genres.""" +        await ctx.send(f"Currently available genres: {', '.join(f'`{genre}`' for genre in self.genres)}") + +    @games.command(name="search", aliases=("s",)) +    async def search(self, ctx: Context, *, search_term: str) -> None: +        """Find games by name.""" +        lines = await self.search_games(search_term) + +        await LinePaginator.paginate(lines, ctx, Embed(title=f"Game Search Results: {search_term}"), empty=False) + +    @games.command(name="company", aliases=("companies",)) +    async def company(self, ctx: Context, amount: int = 5) -> None: +        """ +        Get random Game Companies companies from IGDB API. + +        Support amount parameter. Max is 25, min is 1. +        """ +        if not 1 <= amount <= 25: +            await ctx.send("Your provided amount is out of range. Our minimum is 1 and maximum 25.") +            return + +        # Get companies listing. Provide limit for limiting how much companies will be returned. Get random offset to +        # get (almost) every time different companies (offset show in which position should API start returning result) +        companies = await self.get_companies_list(limit=amount, offset=random.randint(0, 150)) +        pages = [await self.create_company_page(co) for co in companies] + +        await ImagePaginator.paginate(pages, ctx, Embed(title="Random Game Companies")) + +    @with_role(*STAFF_ROLES) +    @games.command(name="refresh", aliases=("r",)) +    async def refresh_genres_command(self, ctx: Context) -> None: +        """Refresh .games command genres.""" +        try: +            await self._get_genres() +        except Exception as e: +            await ctx.send(f"There was error while refreshing genres: `{e}`") +            return +        await ctx.send("Successfully refreshed genres.") + +    async def get_games_list( +        self, +        amount: int, +        genre: Optional[str] = None, +        sort: Optional[str] = None, +        additional_body: str = "", +        offset: int = 0 +    ) -> list[dict[str, Any]]: +        """ +        Get list of games from IGDB API by parameters that is provided. + +        Amount param show how much games this get, genre is genre ID and at least one genre in game must this when +        provided. Sort is sorting by specific field and direction, ex. total_rating desc/asc (total_rating is field, +        desc/asc is direction). Additional_body is field where you can pass extra search parameters. Offset show start +        position in API. +        """ +        # Create body of IGDB API request, define fields, sorting, offset, limit and genre +        params = { +            "sort": f"sort {sort};" if sort else "", +            "limit": f"limit {amount};", +            "offset": f"offset {offset};" if offset else "", +            "genre": f"where genres = ({genre});" if genre else "", +            "additional": additional_body +        } +        body = GAMES_LIST_BODY.format(**params) + +        # Do request to IGDB API, create headers, URL, define body, return result +        async with self.http_session.post(url=f"{BASE_URL}/games", data=body, headers=self.headers) as resp: +            return await resp.json() + +    async def create_page(self, data: dict[str, Any]) -> tuple[str, str]: +        """Create content of Game Page.""" +        # Create cover image URL from template +        url = COVER_URL.format(**{"image_id": data["cover"]["image_id"] if "cover" in data else ""}) + +        # Get release date separately with checking +        release_date = dt.utcfromtimestamp(data["first_release_date"]).date() if "first_release_date" in data else "?" + +        # Create Age Ratings value +        rating = ", ".join( +            f"{AgeRatingCategories(age['category']).name} {AgeRatings(age['rating']).name}" +            for age in data["age_ratings"] +        ) if "age_ratings" in data else "?" + +        companies = [c["company"]["name"] for c in data["involved_companies"]] if "involved_companies" in data else "?" + +        # Create formatting for template page +        formatting = { +            "name": data["name"], +            "url": data["url"], +            "description": f"{data['summary']}\n\n" if "summary" in data else "\n", +            "release_date": release_date, +            "rating": round(data["total_rating"] if "total_rating" in data else 0, 2), +            "rating_count": data["total_rating_count"] if "total_rating_count" in data else "?", +            "platforms": ", ".join(platform["name"] for platform in data["platforms"]) if "platforms" in data else "?", +            "status": GameStatus(data["status"]).name if "status" in data else "?", +            "age_ratings": rating, +            "made_by": ", ".join(companies), +            "storyline": data["storyline"] if "storyline" in data else "" +        } +        page = GAME_PAGE.format(**formatting) + +        return page, url + +    async def search_games(self, search_term: str) -> list[str]: +        """Search game from IGDB API by string, return listing of pages.""" +        lines = [] + +        # Define request body of IGDB API request and do request +        body = SEARCH_BODY.format(**{"term": search_term}) + +        async with self.http_session.post(url=f"{BASE_URL}/games", data=body, headers=self.headers) as resp: +            data = await resp.json() + +        # Loop over games, format them to good format, make line and append this to total lines +        for game in data: +            formatting = { +                "name": game["name"], +                "url": game["url"], +                "rating": round(game["total_rating"] if "total_rating" in game else 0, 2), +                "rating_count": game["total_rating_count"] if "total_rating" in game else "?" +            } +            line = GAME_SEARCH_LINE.format(**formatting) +            lines.append(line) + +        return lines + +    async def get_companies_list(self, limit: int, offset: int = 0) -> list[dict[str, Any]]: +        """ +        Get random Game Companies from IGDB API. + +        Limit is parameter, that show how much movies this should return, offset show in which position should API start +        returning results. +        """ +        # Create request body from template +        body = COMPANIES_LIST_BODY.format(**{ +            "limit": limit, +            "offset": offset +        }) + +        async with self.http_session.post(url=f"{BASE_URL}/companies", data=body, headers=self.headers) as resp: +            return await resp.json() + +    async def create_company_page(self, data: dict[str, Any]) -> tuple[str, str]: +        """Create good formatted Game Company page.""" +        # Generate URL of company logo +        url = LOGO_URL.format(**{"image_id": data["logo"]["image_id"] if "logo" in data else ""}) + +        # Try to get found date of company +        founded = dt.utcfromtimestamp(data["start_date"]).date() if "start_date" in data else "?" + +        # Generate list of games, that company have developed or published +        developed = ", ".join(game["name"] for game in data["developed"]) if "developed" in data else "?" +        published = ", ".join(game["name"] for game in data["published"]) if "published" in data else "?" + +        formatting = { +            "name": data["name"], +            "url": data["url"], +            "description": f"{data['description']}\n\n" if "description" in data else "\n", +            "founded": founded, +            "developed": developed, +            "published": published +        } +        page = COMPANY_PAGE.format(**formatting) + +        return page, url + +    async def get_best_results(self, query: str) -> list[tuple[float, str]]: +        """Get best match result of genre when original genre is invalid.""" +        results = [] +        for genre in self.genres: +            ratios = [difflib.SequenceMatcher(None, query, genre).ratio()] +            for word in REGEX_NON_ALPHABET.split(genre): +                ratios.append(difflib.SequenceMatcher(None, query, word).ratio()) +            results.append((round(max(ratios), 2), genre)) +        return sorted((item for item in results if item[0] >= 0.60), reverse=True)[:4] + + +def setup(bot: Bot) -> None: +    """Load the Games cog.""" +    # Check does IGDB API key exist, if not, log warning and don't load cog +    if not Tokens.igdb_client_id: +        logger.warning("No IGDB client ID. Not loading Games cog.") +        return +    if not Tokens.igdb_client_secret: +        logger.warning("No IGDB client secret. Not loading Games cog.") +        return +    bot.add_cog(Games(bot)) | 
