diff options
Diffstat (limited to '')
| -rw-r--r-- | bot/cogs/doc.py | 32 | ||||
| -rw-r--r-- | bot/cogs/utils.py | 166 | ||||
| -rw-r--r-- | bot/utils/cache.py | 31 | 
3 files changed, 131 insertions, 98 deletions
diff --git a/bot/cogs/doc.py b/bot/cogs/doc.py index 204cffb37..ff60fc80a 100644 --- a/bot/cogs/doc.py +++ b/bot/cogs/doc.py @@ -6,7 +6,7 @@ import textwrap  from collections import OrderedDict  from contextlib import suppress  from types import SimpleNamespace -from typing import Any, Callable, Optional, Tuple +from typing import Optional, Tuple  import discord  from bs4 import BeautifulSoup @@ -23,6 +23,7 @@ from bot.constants import MODERATION_ROLES, RedirectOutput  from bot.converters import ValidPythonIdentifier, ValidURL  from bot.decorators import with_role  from bot.pagination import LinePaginator +from bot.utils.cache import async_cache  log = logging.getLogger(__name__) @@ -66,35 +67,6 @@ FAILED_REQUEST_RETRY_AMOUNT = 3  NOT_FOUND_DELETE_DELAY = RedirectOutput.delete_delay -def async_cache(max_size: int = 128, arg_offset: int = 0) -> Callable: -    """ -    LRU cache implementation for coroutines. - -    Once the cache exceeds the maximum size, keys are deleted in FIFO order. - -    An offset may be optionally provided to be applied to the coroutine's arguments when creating the cache key. -    """ -    # Assign the cache to the function itself so we can clear it from outside. -    async_cache.cache = OrderedDict() - -    def decorator(function: Callable) -> Callable: -        """Define the async_cache decorator.""" -        @functools.wraps(function) -        async def wrapper(*args) -> Any: -            """Decorator wrapper for the caching logic.""" -            key = ':'.join(args[arg_offset:]) - -            value = async_cache.cache.get(key) -            if value is None: -                if len(async_cache.cache) > max_size: -                    async_cache.cache.popitem(last=False) - -                async_cache.cache[key] = await function(*args) -            return async_cache.cache[key] -        return wrapper -    return decorator - -  class DocMarkdownConverter(MarkdownConverter):      """Subclass markdownify's MarkdownCoverter to provide custom conversion methods.""" diff --git a/bot/cogs/utils.py b/bot/cogs/utils.py index 6b59d37c8..73337f012 100644 --- a/bot/cogs/utils.py +++ b/bot/cogs/utils.py @@ -2,9 +2,10 @@ import difflib  import logging  import re  import unicodedata +from datetime import datetime, timedelta  from email.parser import HeaderParser  from io import StringIO -from typing import Tuple, Union +from typing import Dict, Optional, Tuple, Union  from discord import Colour, Embed  from discord.ext.commands import BadArgument, Cog, Context, command @@ -12,6 +13,7 @@ from discord.ext.commands import BadArgument, Cog, Context, command  from bot.bot import Bot  from bot.constants import Channels, MODERATION_ROLES, STAFF_ROLES  from bot.decorators import in_whitelist, with_role +from bot.utils.cache import async_cache  log = logging.getLogger(__name__) @@ -48,71 +50,11 @@ class Utils(Cog):          self.base_pep_url = "http://www.python.org/dev/peps/pep-"          self.base_github_pep_url = "https://raw.githubusercontent.com/python/peps/master/pep-" +        self.peps_listing_api_url = "https://api.github.com/repos/python/peps/contents?ref=master" -    @command(name='pep', aliases=('get_pep', 'p')) -    async def pep_command(self, ctx: Context, pep_number: str) -> None: -        """Fetches information about a PEP and sends it to the channel.""" -        if pep_number.isdigit(): -            pep_number = int(pep_number) -        else: -            await ctx.send_help(ctx.command) -            return - -        # Handle PEP 0 directly because it's not in .rst or .txt so it can't be accessed like other PEPs. -        if pep_number == 0: -            return await self.send_pep_zero(ctx) - -        possible_extensions = ['.txt', '.rst'] -        found_pep = False -        for extension in possible_extensions: -            # Attempt to fetch the PEP -            pep_url = f"{self.base_github_pep_url}{pep_number:04}{extension}" -            log.trace(f"Requesting PEP {pep_number} with {pep_url}") -            response = await self.bot.http_session.get(pep_url) - -            if response.status == 200: -                log.trace("PEP found") -                found_pep = True - -                pep_content = await response.text() - -                # Taken from https://github.com/python/peps/blob/master/pep0/pep.py#L179 -                pep_header = HeaderParser().parse(StringIO(pep_content)) - -                # Assemble the embed -                pep_embed = Embed( -                    title=f"**PEP {pep_number} - {pep_header['Title']}**", -                    description=f"[Link]({self.base_pep_url}{pep_number:04})", -                ) - -                pep_embed.set_thumbnail(url=ICON_URL) - -                # Add the interesting information -                fields_to_check = ("Status", "Python-Version", "Created", "Type") -                for field in fields_to_check: -                    # Check for a PEP metadata field that is present but has an empty value -                    # embed field values can't contain an empty string -                    if pep_header.get(field, ""): -                        pep_embed.add_field(name=field, value=pep_header[field]) - -            elif response.status != 404: -                # any response except 200 and 404 is expected -                found_pep = True  # actually not, but it's easier to display this way -                log.trace(f"The user requested PEP {pep_number}, but the response had an unexpected status code: " -                          f"{response.status}.\n{response.text}") - -                error_message = "Unexpected HTTP error during PEP search. Please let us know." -                pep_embed = Embed(title="Unexpected error", description=error_message) -                pep_embed.colour = Colour.red() -                break - -        if not found_pep: -            log.trace("PEP was not found") -            not_found = f"PEP {pep_number} does not exist." -            pep_embed = Embed(title="PEP not found", description=not_found) -            pep_embed.colour = Colour.red() - -        await ctx.message.channel.send(embed=pep_embed) +        self.peps: Dict[int, str] = {} +        self.last_refreshed_peps: Optional[datetime] = None +        self.bot.loop.create_task(self.refresh_peps_urls())      @command()      @in_whitelist(channels=(Channels.bot_commands,), roles=STAFF_ROLES) @@ -250,8 +192,48 @@ class Utils(Cog):          for reaction in options:              await message.add_reaction(reaction) -    async def send_pep_zero(self, ctx: Context) -> None: -        """Send information about PEP 0.""" +    # PEPs area + +    async def refresh_peps_urls(self) -> None: +        """Refresh PEP URLs listing in every 3 hours.""" +        # Wait until HTTP client is available +        await self.bot.wait_until_ready() +        log.trace("Started refreshing PEP URLs.") + +        async with self.bot.http_session.get(self.peps_listing_api_url) as resp: +            listing = await resp.json() + +        log.trace("Got PEP URLs listing from GitHub API") + +        for file in listing: +            name = file["name"] +            if name.startswith("pep-") and name.endswith((".rst", ".txt")): +                pep_number = name.replace("pep-", "").split(".")[0] +                self.peps[int(pep_number)] = file["download_url"] + +        self.last_refreshed_peps = datetime.now() +        log.info("Successfully refreshed PEP URLs listing.") + +    @command(name='pep', aliases=('get_pep', 'p')) +    async def pep_command(self, ctx: Context, pep_number: int) -> None: +        """Fetches information about a PEP and sends it to the channel.""" +        # Trigger typing in chat to show users that bot is responding +        await ctx.trigger_typing() + +        # Handle PEP 0 directly because it's not in .rst or .txt so it can't be accessed like other PEPs. +        if pep_number == 0: +            pep_embed = self.get_pep_zero_embed() +        else: +            pep_embed = await self.get_pep_embed(ctx, pep_number) + +        if pep_embed: +            await ctx.send(embed=pep_embed) +            log.trace(f"PEP {pep_number} getting and sending finished successfully. Increasing stat.") +            self.bot.stats.incr(f"pep_fetches.{pep_number}") + +    @staticmethod +    def get_pep_zero_embed() -> Embed: +        """Get information embed about PEP 0."""          pep_embed = Embed(              title=f"**PEP 0 - Index of Python Enhancement Proposals (PEPs)**",              description=f"[Link](https://www.python.org/dev/peps/)" @@ -261,7 +243,55 @@ class Utils(Cog):          pep_embed.add_field(name="Created", value="13-Jul-2000")          pep_embed.add_field(name="Type", value="Informational") -        await ctx.send(embed=pep_embed) +        return pep_embed + +    @async_cache(arg_offset=2) +    async def get_pep_embed(self, ctx: Context, pep_nr: int) -> Optional[Embed]: +        """Fetch, generate and return PEP embed.""" +        if pep_nr not in self.peps and (self.last_refreshed_peps + timedelta(minutes=30)) <= datetime.now(): +            await self.refresh_peps_urls() + +        if pep_nr not in self.peps: +            log.trace(f"PEP {pep_nr} was not found") +            not_found = f"PEP {pep_nr} does not exist." +            embed = Embed(title="PEP not found", description=not_found, colour=Colour.red()) +            await ctx.send(embed=embed) +            return + +        response = await self.bot.http_session.get(self.peps[pep_nr]) + +        if response.status == 200: +            log.trace(f"PEP {pep_nr} found") +            pep_content = await response.text() + +            # Taken from https://github.com/python/peps/blob/master/pep0/pep.py#L179 +            pep_header = HeaderParser().parse(StringIO(pep_content)) + +            # Assemble the embed +            pep_embed = Embed( +                title=f"**PEP {pep_nr} - {pep_header['Title']}**", +                description=f"[Link]({self.base_pep_url}{pep_nr:04})", +            ) + +            pep_embed.set_thumbnail(url=ICON_URL) + +            # Add the interesting information +            fields_to_check = ("Status", "Python-Version", "Created", "Type") +            for field in fields_to_check: +                # Check for a PEP metadata field that is present but has an empty value +                # embed field values can't contain an empty string +                if pep_header.get(field, ""): +                    pep_embed.add_field(name=field, value=pep_header[field]) +            return pep_embed +        else: +            log.trace( +                f"The user requested PEP {pep_nr}, but the response had an unexpected status code: {response.status}." +            ) + +            error_message = "Unexpected HTTP error during PEP search. Please let us know." +            embed = Embed(title="Unexpected error", description=error_message, colour=Colour.red()) +            await ctx.send(embed=embed) +            return  def setup(bot: Bot) -> None: diff --git a/bot/utils/cache.py b/bot/utils/cache.py new file mode 100644 index 000000000..96e1aef95 --- /dev/null +++ b/bot/utils/cache.py @@ -0,0 +1,31 @@ +import functools +from collections import OrderedDict +from typing import Any, Callable + + +def async_cache(max_size: int = 128, arg_offset: int = 0) -> Callable: +    """ +    LRU cache implementation for coroutines. + +    Once the cache exceeds the maximum size, keys are deleted in FIFO order. + +    An offset may be optionally provided to be applied to the coroutine's arguments when creating the cache key. +    """ +    # Assign the cache to the function itself so we can clear it from outside. +    async_cache.cache = OrderedDict() + +    def decorator(function: Callable) -> Callable: +        """Define the async_cache decorator.""" +        @functools.wraps(function) +        async def wrapper(*args) -> Any: +            """Decorator wrapper for the caching logic.""" +            key = ':'.join(str(args[arg_offset:])) + +            if key not in async_cache.cache: +                if len(async_cache.cache) > max_size: +                    async_cache.cache.popitem(last=False) + +                async_cache.cache[key] = await function(*args) +            return async_cache.cache[key] +        return wrapper +    return decorator  |