diff options
| -rw-r--r-- | bot/cogs/doc.py | 32 | ||||
| -rw-r--r-- | bot/cogs/utils.py | 166 | ||||
| -rw-r--r-- | bot/utils/cache.py | 31 |
3 files changed, 131 insertions, 98 deletions
diff --git a/bot/cogs/doc.py b/bot/cogs/doc.py index 204cffb37..ff60fc80a 100644 --- a/bot/cogs/doc.py +++ b/bot/cogs/doc.py @@ -6,7 +6,7 @@ import textwrap from collections import OrderedDict from contextlib import suppress from types import SimpleNamespace -from typing import Any, Callable, Optional, Tuple +from typing import Optional, Tuple import discord from bs4 import BeautifulSoup @@ -23,6 +23,7 @@ from bot.constants import MODERATION_ROLES, RedirectOutput from bot.converters import ValidPythonIdentifier, ValidURL from bot.decorators import with_role from bot.pagination import LinePaginator +from bot.utils.cache import async_cache log = logging.getLogger(__name__) @@ -66,35 +67,6 @@ FAILED_REQUEST_RETRY_AMOUNT = 3 NOT_FOUND_DELETE_DELAY = RedirectOutput.delete_delay -def async_cache(max_size: int = 128, arg_offset: int = 0) -> Callable: - """ - LRU cache implementation for coroutines. - - Once the cache exceeds the maximum size, keys are deleted in FIFO order. - - An offset may be optionally provided to be applied to the coroutine's arguments when creating the cache key. - """ - # Assign the cache to the function itself so we can clear it from outside. - async_cache.cache = OrderedDict() - - def decorator(function: Callable) -> Callable: - """Define the async_cache decorator.""" - @functools.wraps(function) - async def wrapper(*args) -> Any: - """Decorator wrapper for the caching logic.""" - key = ':'.join(args[arg_offset:]) - - value = async_cache.cache.get(key) - if value is None: - if len(async_cache.cache) > max_size: - async_cache.cache.popitem(last=False) - - async_cache.cache[key] = await function(*args) - return async_cache.cache[key] - return wrapper - return decorator - - class DocMarkdownConverter(MarkdownConverter): """Subclass markdownify's MarkdownCoverter to provide custom conversion methods.""" diff --git a/bot/cogs/utils.py b/bot/cogs/utils.py index 6b59d37c8..73337f012 100644 --- a/bot/cogs/utils.py +++ b/bot/cogs/utils.py @@ -2,9 +2,10 @@ import difflib import logging import re import unicodedata +from datetime import datetime, timedelta from email.parser import HeaderParser from io import StringIO -from typing import Tuple, Union +from typing import Dict, Optional, Tuple, Union from discord import Colour, Embed from discord.ext.commands import BadArgument, Cog, Context, command @@ -12,6 +13,7 @@ from discord.ext.commands import BadArgument, Cog, Context, command from bot.bot import Bot from bot.constants import Channels, MODERATION_ROLES, STAFF_ROLES from bot.decorators import in_whitelist, with_role +from bot.utils.cache import async_cache log = logging.getLogger(__name__) @@ -48,71 +50,11 @@ class Utils(Cog): self.base_pep_url = "http://www.python.org/dev/peps/pep-" self.base_github_pep_url = "https://raw.githubusercontent.com/python/peps/master/pep-" + self.peps_listing_api_url = "https://api.github.com/repos/python/peps/contents?ref=master" - @command(name='pep', aliases=('get_pep', 'p')) - async def pep_command(self, ctx: Context, pep_number: str) -> None: - """Fetches information about a PEP and sends it to the channel.""" - if pep_number.isdigit(): - pep_number = int(pep_number) - else: - await ctx.send_help(ctx.command) - return - - # Handle PEP 0 directly because it's not in .rst or .txt so it can't be accessed like other PEPs. - if pep_number == 0: - return await self.send_pep_zero(ctx) - - possible_extensions = ['.txt', '.rst'] - found_pep = False - for extension in possible_extensions: - # Attempt to fetch the PEP - pep_url = f"{self.base_github_pep_url}{pep_number:04}{extension}" - log.trace(f"Requesting PEP {pep_number} with {pep_url}") - response = await self.bot.http_session.get(pep_url) - - if response.status == 200: - log.trace("PEP found") - found_pep = True - - pep_content = await response.text() - - # Taken from https://github.com/python/peps/blob/master/pep0/pep.py#L179 - pep_header = HeaderParser().parse(StringIO(pep_content)) - - # Assemble the embed - pep_embed = Embed( - title=f"**PEP {pep_number} - {pep_header['Title']}**", - description=f"[Link]({self.base_pep_url}{pep_number:04})", - ) - - pep_embed.set_thumbnail(url=ICON_URL) - - # Add the interesting information - fields_to_check = ("Status", "Python-Version", "Created", "Type") - for field in fields_to_check: - # Check for a PEP metadata field that is present but has an empty value - # embed field values can't contain an empty string - if pep_header.get(field, ""): - pep_embed.add_field(name=field, value=pep_header[field]) - - elif response.status != 404: - # any response except 200 and 404 is expected - found_pep = True # actually not, but it's easier to display this way - log.trace(f"The user requested PEP {pep_number}, but the response had an unexpected status code: " - f"{response.status}.\n{response.text}") - - error_message = "Unexpected HTTP error during PEP search. Please let us know." - pep_embed = Embed(title="Unexpected error", description=error_message) - pep_embed.colour = Colour.red() - break - - if not found_pep: - log.trace("PEP was not found") - not_found = f"PEP {pep_number} does not exist." - pep_embed = Embed(title="PEP not found", description=not_found) - pep_embed.colour = Colour.red() - - await ctx.message.channel.send(embed=pep_embed) + self.peps: Dict[int, str] = {} + self.last_refreshed_peps: Optional[datetime] = None + self.bot.loop.create_task(self.refresh_peps_urls()) @command() @in_whitelist(channels=(Channels.bot_commands,), roles=STAFF_ROLES) @@ -250,8 +192,48 @@ class Utils(Cog): for reaction in options: await message.add_reaction(reaction) - async def send_pep_zero(self, ctx: Context) -> None: - """Send information about PEP 0.""" + # PEPs area + + async def refresh_peps_urls(self) -> None: + """Refresh PEP URLs listing in every 3 hours.""" + # Wait until HTTP client is available + await self.bot.wait_until_ready() + log.trace("Started refreshing PEP URLs.") + + async with self.bot.http_session.get(self.peps_listing_api_url) as resp: + listing = await resp.json() + + log.trace("Got PEP URLs listing from GitHub API") + + for file in listing: + name = file["name"] + if name.startswith("pep-") and name.endswith((".rst", ".txt")): + pep_number = name.replace("pep-", "").split(".")[0] + self.peps[int(pep_number)] = file["download_url"] + + self.last_refreshed_peps = datetime.now() + log.info("Successfully refreshed PEP URLs listing.") + + @command(name='pep', aliases=('get_pep', 'p')) + async def pep_command(self, ctx: Context, pep_number: int) -> None: + """Fetches information about a PEP and sends it to the channel.""" + # Trigger typing in chat to show users that bot is responding + await ctx.trigger_typing() + + # Handle PEP 0 directly because it's not in .rst or .txt so it can't be accessed like other PEPs. + if pep_number == 0: + pep_embed = self.get_pep_zero_embed() + else: + pep_embed = await self.get_pep_embed(ctx, pep_number) + + if pep_embed: + await ctx.send(embed=pep_embed) + log.trace(f"PEP {pep_number} getting and sending finished successfully. Increasing stat.") + self.bot.stats.incr(f"pep_fetches.{pep_number}") + + @staticmethod + def get_pep_zero_embed() -> Embed: + """Get information embed about PEP 0.""" pep_embed = Embed( title=f"**PEP 0 - Index of Python Enhancement Proposals (PEPs)**", description=f"[Link](https://www.python.org/dev/peps/)" @@ -261,7 +243,55 @@ class Utils(Cog): pep_embed.add_field(name="Created", value="13-Jul-2000") pep_embed.add_field(name="Type", value="Informational") - await ctx.send(embed=pep_embed) + return pep_embed + + @async_cache(arg_offset=2) + async def get_pep_embed(self, ctx: Context, pep_nr: int) -> Optional[Embed]: + """Fetch, generate and return PEP embed.""" + if pep_nr not in self.peps and (self.last_refreshed_peps + timedelta(minutes=30)) <= datetime.now(): + await self.refresh_peps_urls() + + if pep_nr not in self.peps: + log.trace(f"PEP {pep_nr} was not found") + not_found = f"PEP {pep_nr} does not exist." + embed = Embed(title="PEP not found", description=not_found, colour=Colour.red()) + await ctx.send(embed=embed) + return + + response = await self.bot.http_session.get(self.peps[pep_nr]) + + if response.status == 200: + log.trace(f"PEP {pep_nr} found") + pep_content = await response.text() + + # Taken from https://github.com/python/peps/blob/master/pep0/pep.py#L179 + pep_header = HeaderParser().parse(StringIO(pep_content)) + + # Assemble the embed + pep_embed = Embed( + title=f"**PEP {pep_nr} - {pep_header['Title']}**", + description=f"[Link]({self.base_pep_url}{pep_nr:04})", + ) + + pep_embed.set_thumbnail(url=ICON_URL) + + # Add the interesting information + fields_to_check = ("Status", "Python-Version", "Created", "Type") + for field in fields_to_check: + # Check for a PEP metadata field that is present but has an empty value + # embed field values can't contain an empty string + if pep_header.get(field, ""): + pep_embed.add_field(name=field, value=pep_header[field]) + return pep_embed + else: + log.trace( + f"The user requested PEP {pep_nr}, but the response had an unexpected status code: {response.status}." + ) + + error_message = "Unexpected HTTP error during PEP search. Please let us know." + embed = Embed(title="Unexpected error", description=error_message, colour=Colour.red()) + await ctx.send(embed=embed) + return def setup(bot: Bot) -> None: diff --git a/bot/utils/cache.py b/bot/utils/cache.py new file mode 100644 index 000000000..96e1aef95 --- /dev/null +++ b/bot/utils/cache.py @@ -0,0 +1,31 @@ +import functools +from collections import OrderedDict +from typing import Any, Callable + + +def async_cache(max_size: int = 128, arg_offset: int = 0) -> Callable: + """ + LRU cache implementation for coroutines. + + Once the cache exceeds the maximum size, keys are deleted in FIFO order. + + An offset may be optionally provided to be applied to the coroutine's arguments when creating the cache key. + """ + # Assign the cache to the function itself so we can clear it from outside. + async_cache.cache = OrderedDict() + + def decorator(function: Callable) -> Callable: + """Define the async_cache decorator.""" + @functools.wraps(function) + async def wrapper(*args) -> Any: + """Decorator wrapper for the caching logic.""" + key = ':'.join(str(args[arg_offset:])) + + if key not in async_cache.cache: + if len(async_cache.cache) > max_size: + async_cache.cache.popitem(last=False) + + async_cache.cache[key] = await function(*args) + return async_cache.cache[key] + return wrapper + return decorator |