diff options
Diffstat (limited to '')
| -rw-r--r-- | bot/exts/info/doc.py | 37 | ||||
| -rw-r--r-- | bot/exts/utils/utils.py | 193 | ||||
| -rw-r--r-- | bot/utils/cache.py | 41 | 
3 files changed, 167 insertions, 104 deletions
| diff --git a/bot/exts/info/doc.py b/bot/exts/info/doc.py index c16a99225..7ec8caa4b 100644 --- a/bot/exts/info/doc.py +++ b/bot/exts/info/doc.py @@ -3,10 +3,9 @@ import functools  import logging  import re  import textwrap -from collections import OrderedDict  from contextlib import suppress  from types import SimpleNamespace -from typing import Any, Callable, Optional, Tuple +from typing import Optional, Tuple  import discord  from bs4 import BeautifulSoup @@ -22,6 +21,7 @@ from bot.bot import Bot  from bot.constants import MODERATION_ROLES, RedirectOutput  from bot.converters import ValidPythonIdentifier, ValidURL  from bot.pagination import LinePaginator +from bot.utils.cache import AsyncCache  from bot.utils.messages import wait_for_deletion @@ -65,34 +65,7 @@ WHITESPACE_AFTER_NEWLINES_RE = re.compile(r"(?<=\n\n)(\s+)")  FAILED_REQUEST_RETRY_AMOUNT = 3  NOT_FOUND_DELETE_DELAY = RedirectOutput.delete_delay - -def async_cache(max_size: int = 128, arg_offset: int = 0) -> Callable: -    """ -    LRU cache implementation for coroutines. - -    Once the cache exceeds the maximum size, keys are deleted in FIFO order. - -    An offset may be optionally provided to be applied to the coroutine's arguments when creating the cache key. -    """ -    # Assign the cache to the function itself so we can clear it from outside. -    async_cache.cache = OrderedDict() - -    def decorator(function: Callable) -> Callable: -        """Define the async_cache decorator.""" -        @functools.wraps(function) -        async def wrapper(*args) -> Any: -            """Decorator wrapper for the caching logic.""" -            key = ':'.join(args[arg_offset:]) - -            value = async_cache.cache.get(key) -            if value is None: -                if len(async_cache.cache) > max_size: -                    async_cache.cache.popitem(last=False) - -                async_cache.cache[key] = await function(*args) -            return async_cache.cache[key] -        return wrapper -    return decorator +symbol_cache = AsyncCache()  class DocMarkdownConverter(MarkdownConverter): @@ -215,7 +188,7 @@ class Doc(commands.Cog):          self.base_urls.clear()          self.inventories.clear()          self.renamed_symbols.clear() -        async_cache.cache = OrderedDict() +        symbol_cache.clear()          # Run all coroutines concurrently - since each of them performs a HTTP          # request, this speeds up fetching the inventory data heavily. @@ -280,7 +253,7 @@ class Doc(commands.Cog):          return signatures, description.replace('ΒΆ', '') -    @async_cache(arg_offset=1) +    @symbol_cache(arg_offset=1)      async def get_symbol_embed(self, symbol: str) -> Optional[discord.Embed]:          """          Attempt to scrape and fetch the data for the given `symbol`, and build an embed from its contents. diff --git a/bot/exts/utils/utils.py b/bot/exts/utils/utils.py index 3e9230414..558d0cf72 100644 --- a/bot/exts/utils/utils.py +++ b/bot/exts/utils/utils.py @@ -2,9 +2,10 @@ import difflib  import logging  import re  import unicodedata +from datetime import datetime, timedelta  from email.parser import HeaderParser  from io import StringIO -from typing import Tuple, Union +from typing import Dict, Optional, Tuple, Union  from discord import Colour, Embed, utils  from discord.ext.commands import BadArgument, Cog, Context, clean_content, command, has_any_role @@ -14,6 +15,7 @@ from bot.constants import Channels, MODERATION_ROLES, STAFF_ROLES  from bot.decorators import in_whitelist  from bot.pagination import LinePaginator  from bot.utils import messages +from bot.utils.cache import AsyncCache  log = logging.getLogger(__name__) @@ -41,80 +43,21 @@ Namespaces are one honking great idea -- let's do more of those!  ICON_URL = "https://www.python.org/static/opengraph-icon-200x200.png" +pep_cache = AsyncCache() +  class Utils(Cog):      """A selection of utilities which don't have a clear category.""" +    BASE_PEP_URL = "http://www.python.org/dev/peps/pep-" +    BASE_GITHUB_PEP_URL = "https://raw.githubusercontent.com/python/peps/master/pep-" +    PEPS_LISTING_API_URL = "https://api.github.com/repos/python/peps/contents?ref=master" +      def __init__(self, bot: Bot):          self.bot = bot - -        self.base_pep_url = "http://www.python.org/dev/peps/pep-" -        self.base_github_pep_url = "https://raw.githubusercontent.com/python/peps/master/pep-" - -    @command(name='pep', aliases=('get_pep', 'p')) -    async def pep_command(self, ctx: Context, pep_number: str) -> None: -        """Fetches information about a PEP and sends it to the channel.""" -        if pep_number.isdigit(): -            pep_number = int(pep_number) -        else: -            await ctx.send_help(ctx.command) -            return - -        # Handle PEP 0 directly because it's not in .rst or .txt so it can't be accessed like other PEPs. -        if pep_number == 0: -            return await self.send_pep_zero(ctx) - -        possible_extensions = ['.txt', '.rst'] -        found_pep = False -        for extension in possible_extensions: -            # Attempt to fetch the PEP -            pep_url = f"{self.base_github_pep_url}{pep_number:04}{extension}" -            log.trace(f"Requesting PEP {pep_number} with {pep_url}") -            response = await self.bot.http_session.get(pep_url) - -            if response.status == 200: -                log.trace("PEP found") -                found_pep = True - -                pep_content = await response.text() - -                # Taken from https://github.com/python/peps/blob/master/pep0/pep.py#L179 -                pep_header = HeaderParser().parse(StringIO(pep_content)) - -                # Assemble the embed -                pep_embed = Embed( -                    title=f"**PEP {pep_number} - {pep_header['Title']}**", -                    url=f"{self.base_pep_url}{pep_number:04}" -                ) - -                pep_embed.set_thumbnail(url=ICON_URL) - -                # Add the interesting information -                fields_to_check = ("Status", "Python-Version", "Created", "Type") -                for field in fields_to_check: -                    # Check for a PEP metadata field that is present but has an empty value -                    # embed field values can't contain an empty string -                    if pep_header.get(field, ""): -                        pep_embed.add_field(name=field, value=pep_header[field]) - -            elif response.status != 404: -                # any response except 200 and 404 is expected -                found_pep = True  # actually not, but it's easier to display this way -                log.trace(f"The user requested PEP {pep_number}, but the response had an unexpected status code: " -                          f"{response.status}.\n{response.text}") - -                error_message = "Unexpected HTTP error during PEP search. Please let us know." -                pep_embed = Embed(title="Unexpected error", description=error_message) -                pep_embed.colour = Colour.red() -                break - -        if not found_pep: -            log.trace("PEP was not found") -            not_found = f"PEP {pep_number} does not exist." -            pep_embed = Embed(title="PEP not found", description=not_found) -            pep_embed.colour = Colour.red() - -        await ctx.message.channel.send(embed=pep_embed) +        self.peps: Dict[int, str] = {} +        self.last_refreshed_peps: Optional[datetime] = None +        self.bot.loop.create_task(self.refresh_peps_urls())      @command()      @in_whitelist(channels=(Channels.bot_commands,), roles=STAFF_ROLES) @@ -246,8 +189,51 @@ class Utils(Cog):          for reaction in options:              await message.add_reaction(reaction) -    async def send_pep_zero(self, ctx: Context) -> None: -        """Send information about PEP 0.""" +    # region: PEP + +    async def refresh_peps_urls(self) -> None: +        """Refresh PEP URLs listing in every 3 hours.""" +        # Wait until HTTP client is available +        await self.bot.wait_until_ready() +        log.trace("Started refreshing PEP URLs.") + +        async with self.bot.http_session.get(self.PEPS_LISTING_API_URL) as resp: +            listing = await resp.json() + +        log.trace("Got PEP URLs listing from GitHub API") + +        for file in listing: +            name = file["name"] +            if name.startswith("pep-") and name.endswith((".rst", ".txt")): +                pep_number = name.replace("pep-", "").split(".")[0] +                self.peps[int(pep_number)] = file["download_url"] + +        self.last_refreshed_peps = datetime.now() +        log.info("Successfully refreshed PEP URLs listing.") + +    @command(name='pep', aliases=('get_pep', 'p')) +    async def pep_command(self, ctx: Context, pep_number: int) -> None: +        """Fetches information about a PEP and sends it to the channel.""" +        # Trigger typing in chat to show users that bot is responding +        await ctx.trigger_typing() + +        # Handle PEP 0 directly because it's not in .rst or .txt so it can't be accessed like other PEPs. +        if pep_number == 0: +            pep_embed = self.get_pep_zero_embed() +        else: +            if not await self.validate_pep_number(ctx, pep_number): +                return + +            pep_embed = await self.get_pep_embed(ctx, pep_number) + +        if pep_embed: +            await ctx.send(embed=pep_embed) +            log.trace(f"PEP {pep_number} getting and sending finished successfully. Increasing stat.") +            self.bot.stats.incr(f"pep_fetches.{pep_number}") + +    @staticmethod +    def get_pep_zero_embed() -> Embed: +        """Get information embed about PEP 0."""          pep_embed = Embed(              title="**PEP 0 - Index of Python Enhancement Proposals (PEPs)**",              url="https://www.python.org/dev/peps/" @@ -257,7 +243,70 @@ class Utils(Cog):          pep_embed.add_field(name="Created", value="13-Jul-2000")          pep_embed.add_field(name="Type", value="Informational") -        await ctx.send(embed=pep_embed) +        return pep_embed + +    async def validate_pep_number(self, ctx: Context, pep_nr: int) -> bool: +        """Validate is PEP number valid. When it isn't, send error and return False. Otherwise return True.""" +        if ( +            pep_nr not in self.peps +            and (self.last_refreshed_peps + timedelta(minutes=30)) <= datetime.now() +            and len(str(pep_nr)) < 5 +        ): +            await self.refresh_peps_urls() + +        if pep_nr not in self.peps: +            log.trace(f"PEP {pep_nr} was not found") +            not_found = f"PEP {pep_nr} does not exist." +            await self.send_pep_error_embed(ctx, "PEP not found", not_found) +            return False + +        return True + +    def generate_pep_embed(self, pep_header: Dict, pep_nr: int) -> Embed: +        """Generate PEP embed based on PEP headers data.""" +        # Assemble the embed +        pep_embed = Embed( +            title=f"**PEP {pep_nr} - {pep_header['Title']}**", +            description=f"[Link]({self.BASE_PEP_URL}{pep_nr:04})", +        ) + +        pep_embed.set_thumbnail(url=ICON_URL) + +        # Add the interesting information +        fields_to_check = ("Status", "Python-Version", "Created", "Type") +        for field in fields_to_check: +            # Check for a PEP metadata field that is present but has an empty value +            # embed field values can't contain an empty string +            if pep_header.get(field, ""): +                pep_embed.add_field(name=field, value=pep_header[field]) + +        return pep_embed + +    @pep_cache(arg_offset=2) +    async def get_pep_embed(self, ctx: Context, pep_nr: int) -> Optional[Embed]: +        """Fetch, generate and return PEP embed. When any error occur, use `self.send_pep_error_embed`.""" +        response = await self.bot.http_session.get(self.peps[pep_nr]) + +        if response.status == 200: +            log.trace(f"PEP {pep_nr} found") +            pep_content = await response.text() + +            # Taken from https://github.com/python/peps/blob/master/pep0/pep.py#L179 +            pep_header = HeaderParser().parse(StringIO(pep_content)) +            return self.generate_pep_embed(pep_header, pep_nr) +        else: +            log.trace( +                f"The user requested PEP {pep_nr}, but the response had an unexpected status code: {response.status}." +            ) +            error_message = "Unexpected HTTP error during PEP search. Please let us know." +            return await self.send_pep_error_embed(ctx, "Unexpected error", error_message) + +    @staticmethod +    async def send_pep_error_embed(ctx: Context, title: str, description: str) -> None: +        """Send error PEP embed with `ctx.send`.""" +        embed = Embed(title=title, description=description, colour=Colour.red()) +        await ctx.send(embed=embed) +    # endregion  def setup(bot: Bot) -> None: diff --git a/bot/utils/cache.py b/bot/utils/cache.py new file mode 100644 index 000000000..68ce15607 --- /dev/null +++ b/bot/utils/cache.py @@ -0,0 +1,41 @@ +import functools +from collections import OrderedDict +from typing import Any, Callable + + +class AsyncCache: +    """ +    LRU cache implementation for coroutines. + +    Once the cache exceeds the maximum size, keys are deleted in FIFO order. + +    An offset may be optionally provided to be applied to the coroutine's arguments when creating the cache key. +    """ + +    def __init__(self, max_size: int = 128): +        self._cache = OrderedDict() +        self._max_size = max_size + +    def __call__(self, arg_offset: int = 0) -> Callable: +        """Decorator for async cache.""" + +        def decorator(function: Callable) -> Callable: +            """Define the async cache decorator.""" + +            @functools.wraps(function) +            async def wrapper(*args) -> Any: +                """Decorator wrapper for the caching logic.""" +                key = args[arg_offset:] + +                if key not in self._cache: +                    if len(self._cache) > self._max_size: +                        self._cache.popitem(last=False) + +                    self._cache[key] = await function(*args) +                return self._cache[key] +            return wrapper +        return decorator + +    def clear(self) -> None: +        """Clear cache instance.""" +        self._cache.clear() | 
