aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--bot/cogs/doc.py32
-rw-r--r--bot/cogs/utils.py172
-rw-r--r--bot/utils/cache.py32
3 files changed, 130 insertions, 106 deletions
diff --git a/bot/cogs/doc.py b/bot/cogs/doc.py
index 204cffb37..ff60fc80a 100644
--- a/bot/cogs/doc.py
+++ b/bot/cogs/doc.py
@@ -6,7 +6,7 @@ import textwrap
from collections import OrderedDict
from contextlib import suppress
from types import SimpleNamespace
-from typing import Any, Callable, Optional, Tuple
+from typing import Optional, Tuple
import discord
from bs4 import BeautifulSoup
@@ -23,6 +23,7 @@ from bot.constants import MODERATION_ROLES, RedirectOutput
from bot.converters import ValidPythonIdentifier, ValidURL
from bot.decorators import with_role
from bot.pagination import LinePaginator
+from bot.utils.cache import async_cache
log = logging.getLogger(__name__)
@@ -66,35 +67,6 @@ FAILED_REQUEST_RETRY_AMOUNT = 3
NOT_FOUND_DELETE_DELAY = RedirectOutput.delete_delay
-def async_cache(max_size: int = 128, arg_offset: int = 0) -> Callable:
- """
- LRU cache implementation for coroutines.
-
- Once the cache exceeds the maximum size, keys are deleted in FIFO order.
-
- An offset may be optionally provided to be applied to the coroutine's arguments when creating the cache key.
- """
- # Assign the cache to the function itself so we can clear it from outside.
- async_cache.cache = OrderedDict()
-
- def decorator(function: Callable) -> Callable:
- """Define the async_cache decorator."""
- @functools.wraps(function)
- async def wrapper(*args) -> Any:
- """Decorator wrapper for the caching logic."""
- key = ':'.join(args[arg_offset:])
-
- value = async_cache.cache.get(key)
- if value is None:
- if len(async_cache.cache) > max_size:
- async_cache.cache.popitem(last=False)
-
- async_cache.cache[key] = await function(*args)
- return async_cache.cache[key]
- return wrapper
- return decorator
-
-
class DocMarkdownConverter(MarkdownConverter):
"""Subclass markdownify's MarkdownCoverter to provide custom conversion methods."""
diff --git a/bot/cogs/utils.py b/bot/cogs/utils.py
index 89d556f58..0b1436f3a 100644
--- a/bot/cogs/utils.py
+++ b/bot/cogs/utils.py
@@ -4,14 +4,16 @@ import re
import unicodedata
from email.parser import HeaderParser
from io import StringIO
-from typing import Tuple, Union
+from typing import Dict, Tuple, Union
from discord import Colour, Embed
from discord.ext.commands import BadArgument, Cog, Context, command
+from discord.ext.tasks import loop
from bot.bot import Bot
from bot.constants import Channels, MODERATION_ROLES, STAFF_ROLES
-from bot.decorators import in_whitelist, with_role
+from bot.decorators import in_channel, with_role
+from bot.utils.cache import async_cache
log = logging.getLogger(__name__)
@@ -40,6 +42,20 @@ Namespaces are one honking great idea -- let's do more of those!
ICON_URL = "https://www.python.org/static/opengraph-icon-200x200.png"
+def get_pep_zero_embed() -> Embed:
+ """Send information about PEP 0."""
+ pep_embed = Embed(
+ title=f"**PEP 0 - Index of Python Enhancement Proposals (PEPs)**",
+ description=f"[Link](https://www.python.org/dev/peps/)"
+ )
+ pep_embed.set_thumbnail(url=ICON_URL)
+ pep_embed.add_field(name="Status", value="Active")
+ pep_embed.add_field(name="Created", value="13-Jul-2000")
+ pep_embed.add_field(name="Type", value="Informational")
+
+ return pep_embed
+
+
class Utils(Cog):
"""A selection of utilities which don't have a clear category."""
@@ -48,71 +64,10 @@ class Utils(Cog):
self.base_pep_url = "http://www.python.org/dev/peps/pep-"
self.base_github_pep_url = "https://raw.githubusercontent.com/python/peps/master/pep-"
+ self.peps_listing_api_url = "https://api.github.com/repos/python/peps/contents?ref=master"
- @command(name='pep', aliases=('get_pep', 'p'))
- async def pep_command(self, ctx: Context, pep_number: str) -> None:
- """Fetches information about a PEP and sends it to the channel."""
- if pep_number.isdigit():
- pep_number = int(pep_number)
- else:
- await ctx.invoke(self.bot.get_command("help"), "pep")
- return
-
- # Handle PEP 0 directly because it's not in .rst or .txt so it can't be accessed like other PEPs.
- if pep_number == 0:
- return await self.send_pep_zero(ctx)
-
- possible_extensions = ['.txt', '.rst']
- found_pep = False
- for extension in possible_extensions:
- # Attempt to fetch the PEP
- pep_url = f"{self.base_github_pep_url}{pep_number:04}{extension}"
- log.trace(f"Requesting PEP {pep_number} with {pep_url}")
- response = await self.bot.http_session.get(pep_url)
-
- if response.status == 200:
- log.trace("PEP found")
- found_pep = True
-
- pep_content = await response.text()
-
- # Taken from https://github.com/python/peps/blob/master/pep0/pep.py#L179
- pep_header = HeaderParser().parse(StringIO(pep_content))
-
- # Assemble the embed
- pep_embed = Embed(
- title=f"**PEP {pep_number} - {pep_header['Title']}**",
- description=f"[Link]({self.base_pep_url}{pep_number:04})",
- )
-
- pep_embed.set_thumbnail(url=ICON_URL)
-
- # Add the interesting information
- fields_to_check = ("Status", "Python-Version", "Created", "Type")
- for field in fields_to_check:
- # Check for a PEP metadata field that is present but has an empty value
- # embed field values can't contain an empty string
- if pep_header.get(field, ""):
- pep_embed.add_field(name=field, value=pep_header[field])
-
- elif response.status != 404:
- # any response except 200 and 404 is expected
- found_pep = True # actually not, but it's easier to display this way
- log.trace(f"The user requested PEP {pep_number}, but the response had an unexpected status code: "
- f"{response.status}.\n{response.text}")
-
- error_message = "Unexpected HTTP error during PEP search. Please let us know."
- pep_embed = Embed(title="Unexpected error", description=error_message)
- pep_embed.colour = Colour.red()
- break
-
- if not found_pep:
- log.trace("PEP was not found")
- not_found = f"PEP {pep_number} does not exist."
- pep_embed = Embed(title="PEP not found", description=not_found)
- pep_embed.colour = Colour.red()
-
- await ctx.message.channel.send(embed=pep_embed)
+ self.peps: Dict[int, str] = {}
+ self.refresh_peps_urls.start()
@command()
@in_whitelist(channels=(Channels.bot_commands,), roles=STAFF_ROLES)
@@ -250,19 +205,84 @@ class Utils(Cog):
for reaction in options:
await message.add_reaction(reaction)
- async def send_pep_zero(self, ctx: Context) -> None:
- """Send information about PEP 0."""
- pep_embed = Embed(
- title=f"**PEP 0 - Index of Python Enhancement Proposals (PEPs)**",
- description=f"[Link](https://www.python.org/dev/peps/)"
- )
- pep_embed.set_thumbnail(url=ICON_URL)
- pep_embed.add_field(name="Status", value="Active")
- pep_embed.add_field(name="Created", value="13-Jul-2000")
- pep_embed.add_field(name="Type", value="Informational")
+ # PEPs area
+
+ @loop(hours=3)
+ async def refresh_peps_urls(self) -> None:
+ """Refresh PEP URLs listing in every 3 hours."""
+ # Wait until HTTP client is available
+ await self.bot.wait_until_ready()
+ log.trace("Started refreshing PEP URLs.")
+
+ async with self.bot.http_session.get(self.peps_listing_api_url) as resp:
+ listing = await resp.json()
+ log.trace("Got PEP URLs listing from GitHub API")
+
+ for file in listing:
+ name = file["name"]
+ name: str
+ if name.startswith("pep-") and name.endswith((".rst", ".txt")):
+ pep_number = name.replace("pep-", "").split(".")[0]
+ self.peps[int(pep_number)] = file["download_url"]
+ log.info("Successfully refreshed PEP URLs listing.")
+
+ @command(name='pep', aliases=('get_pep', 'p'))
+ async def pep_command(self, ctx: Context, pep_number: int) -> None:
+ """Fetches information about a PEP and sends it to the channel."""
+ # Trigger typing in chat to show users that bot is responding
+ await ctx.trigger_typing()
+ # Handle PEP 0 directly because it's not in .rst or .txt so it can't be accessed like other PEPs.
+ if pep_number == 0:
+ pep_embed = get_pep_zero_embed()
+ success = True
+ else:
+ pep_embed, success = await self.get_pep_embed(pep_number)
await ctx.send(embed=pep_embed)
+ if success:
+ log.trace(f"PEP {pep_number} getting and sending finished successfully. Increasing stat.")
+ self.bot.stats.incr(f"pep_fetches.{pep_number}")
+
+ @async_cache(arg_offset=1)
+ async def get_pep_embed(self, pep_nr: int) -> Tuple[Embed, bool]:
+ """Fetch, generate and return PEP embed. Implement `async_cache`."""
+ if pep_nr not in self.peps:
+ log.trace(f"PEP {pep_nr} was not found")
+ not_found = f"PEP {pep_nr} does not exist."
+ return Embed(title="PEP not found", description=not_found, colour=Colour.red()), False
+ response = await self.bot.http_session.get(self.peps[pep_nr])
+
+ if response.status == 200:
+ log.trace(f"PEP {pep_nr} found")
+ pep_content = await response.text()
+
+ # Taken from https://github.com/python/peps/blob/master/pep0/pep.py#L179
+ pep_header = HeaderParser().parse(StringIO(pep_content))
+
+ # Assemble the embed
+ pep_embed = Embed(
+ title=f"**PEP {pep_nr} - {pep_header['Title']}**",
+ description=f"[Link]({self.base_pep_url}{pep_nr:04})",
+ )
+
+ pep_embed.set_thumbnail(url=ICON_URL)
+
+ # Add the interesting information
+ fields_to_check = ("Status", "Python-Version", "Created", "Type")
+ for field in fields_to_check:
+ # Check for a PEP metadata field that is present but has an empty value
+ # embed field values can't contain an empty string
+ if pep_header.get(field, ""):
+ pep_embed.add_field(name=field, value=pep_header[field])
+ return pep_embed, True
+ else:
+ log.trace(f"The user requested PEP {pep_nr}, but the response had an unexpected status code: "
+ f"{response.status}.\n{response.text}")
+
+ error_message = "Unexpected HTTP error during PEP search. Please let us know."
+ return Embed(title="Unexpected error", description=error_message, colour=Colour.red()), False
+
def setup(bot: Bot) -> None:
"""Load the Utils cog."""
diff --git a/bot/utils/cache.py b/bot/utils/cache.py
new file mode 100644
index 000000000..338924df8
--- /dev/null
+++ b/bot/utils/cache.py
@@ -0,0 +1,32 @@
+import functools
+from collections import OrderedDict
+from typing import Any, Callable
+
+
+def async_cache(max_size: int = 128, arg_offset: int = 0) -> Callable:
+ """
+ LRU cache implementation for coroutines.
+
+ Once the cache exceeds the maximum size, keys are deleted in FIFO order.
+
+ An offset may be optionally provided to be applied to the coroutine's arguments when creating the cache key.
+ """
+ # Assign the cache to the function itself so we can clear it from outside.
+ async_cache.cache = OrderedDict()
+
+ def decorator(function: Callable) -> Callable:
+ """Define the async_cache decorator."""
+ @functools.wraps(function)
+ async def wrapper(*args) -> Any:
+ """Decorator wrapper for the caching logic."""
+ key = ':'.join(str(args[arg_offset:]))
+
+ value = async_cache.cache.get(key)
+ if value is None:
+ if len(async_cache.cache) > max_size:
+ async_cache.cache.popitem(last=False)
+
+ async_cache.cache[key] = await function(*args)
+ return async_cache.cache[key]
+ return wrapper
+ return decorator