aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar Senjan21 <[email protected]>2020-11-05 15:34:16 +0100
committerGravatar GitHub <[email protected]>2020-11-05 15:34:16 +0100
commited55198787188fb1c70f4d88cde2c9fbc04e390a (patch)
treefb69e82de29cf2aae3ffa693de87cf24801d1a0d
parentMerge pull request #1265 from python-discord/voicegate/activity-blocks (diff)
parentFix argument offset (diff)
Merge pull request #901 from ks129/pep-improvisations
PEP command improvements
-rw-r--r--bot/exts/info/doc.py37
-rw-r--r--bot/exts/utils/utils.py194
-rw-r--r--bot/utils/cache.py41
3 files changed, 168 insertions, 104 deletions
diff --git a/bot/exts/info/doc.py b/bot/exts/info/doc.py
index c16a99225..7ec8caa4b 100644
--- a/bot/exts/info/doc.py
+++ b/bot/exts/info/doc.py
@@ -3,10 +3,9 @@ import functools
import logging
import re
import textwrap
-from collections import OrderedDict
from contextlib import suppress
from types import SimpleNamespace
-from typing import Any, Callable, Optional, Tuple
+from typing import Optional, Tuple
import discord
from bs4 import BeautifulSoup
@@ -22,6 +21,7 @@ from bot.bot import Bot
from bot.constants import MODERATION_ROLES, RedirectOutput
from bot.converters import ValidPythonIdentifier, ValidURL
from bot.pagination import LinePaginator
+from bot.utils.cache import AsyncCache
from bot.utils.messages import wait_for_deletion
@@ -65,34 +65,7 @@ WHITESPACE_AFTER_NEWLINES_RE = re.compile(r"(?<=\n\n)(\s+)")
FAILED_REQUEST_RETRY_AMOUNT = 3
NOT_FOUND_DELETE_DELAY = RedirectOutput.delete_delay
-
-def async_cache(max_size: int = 128, arg_offset: int = 0) -> Callable:
- """
- LRU cache implementation for coroutines.
-
- Once the cache exceeds the maximum size, keys are deleted in FIFO order.
-
- An offset may be optionally provided to be applied to the coroutine's arguments when creating the cache key.
- """
- # Assign the cache to the function itself so we can clear it from outside.
- async_cache.cache = OrderedDict()
-
- def decorator(function: Callable) -> Callable:
- """Define the async_cache decorator."""
- @functools.wraps(function)
- async def wrapper(*args) -> Any:
- """Decorator wrapper for the caching logic."""
- key = ':'.join(args[arg_offset:])
-
- value = async_cache.cache.get(key)
- if value is None:
- if len(async_cache.cache) > max_size:
- async_cache.cache.popitem(last=False)
-
- async_cache.cache[key] = await function(*args)
- return async_cache.cache[key]
- return wrapper
- return decorator
+symbol_cache = AsyncCache()
class DocMarkdownConverter(MarkdownConverter):
@@ -215,7 +188,7 @@ class Doc(commands.Cog):
self.base_urls.clear()
self.inventories.clear()
self.renamed_symbols.clear()
- async_cache.cache = OrderedDict()
+ symbol_cache.clear()
# Run all coroutines concurrently - since each of them performs a HTTP
# request, this speeds up fetching the inventory data heavily.
@@ -280,7 +253,7 @@ class Doc(commands.Cog):
return signatures, description.replace('ΒΆ', '')
- @async_cache(arg_offset=1)
+ @symbol_cache(arg_offset=1)
async def get_symbol_embed(self, symbol: str) -> Optional[discord.Embed]:
"""
Attempt to scrape and fetch the data for the given `symbol`, and build an embed from its contents.
diff --git a/bot/exts/utils/utils.py b/bot/exts/utils/utils.py
index 3e9230414..6d8d98695 100644
--- a/bot/exts/utils/utils.py
+++ b/bot/exts/utils/utils.py
@@ -2,9 +2,10 @@ import difflib
import logging
import re
import unicodedata
+from datetime import datetime, timedelta
from email.parser import HeaderParser
from io import StringIO
-from typing import Tuple, Union
+from typing import Dict, Optional, Tuple, Union
from discord import Colour, Embed, utils
from discord.ext.commands import BadArgument, Cog, Context, clean_content, command, has_any_role
@@ -14,6 +15,7 @@ from bot.constants import Channels, MODERATION_ROLES, STAFF_ROLES
from bot.decorators import in_whitelist
from bot.pagination import LinePaginator
from bot.utils import messages
+from bot.utils.cache import AsyncCache
log = logging.getLogger(__name__)
@@ -41,80 +43,21 @@ Namespaces are one honking great idea -- let's do more of those!
ICON_URL = "https://www.python.org/static/opengraph-icon-200x200.png"
+pep_cache = AsyncCache()
+
class Utils(Cog):
"""A selection of utilities which don't have a clear category."""
+ BASE_PEP_URL = "http://www.python.org/dev/peps/pep-"
+ BASE_GITHUB_PEP_URL = "https://raw.githubusercontent.com/python/peps/master/pep-"
+ PEPS_LISTING_API_URL = "https://api.github.com/repos/python/peps/contents?ref=master"
+
def __init__(self, bot: Bot):
self.bot = bot
-
- self.base_pep_url = "http://www.python.org/dev/peps/pep-"
- self.base_github_pep_url = "https://raw.githubusercontent.com/python/peps/master/pep-"
-
- @command(name='pep', aliases=('get_pep', 'p'))
- async def pep_command(self, ctx: Context, pep_number: str) -> None:
- """Fetches information about a PEP and sends it to the channel."""
- if pep_number.isdigit():
- pep_number = int(pep_number)
- else:
- await ctx.send_help(ctx.command)
- return
-
- # Handle PEP 0 directly because it's not in .rst or .txt so it can't be accessed like other PEPs.
- if pep_number == 0:
- return await self.send_pep_zero(ctx)
-
- possible_extensions = ['.txt', '.rst']
- found_pep = False
- for extension in possible_extensions:
- # Attempt to fetch the PEP
- pep_url = f"{self.base_github_pep_url}{pep_number:04}{extension}"
- log.trace(f"Requesting PEP {pep_number} with {pep_url}")
- response = await self.bot.http_session.get(pep_url)
-
- if response.status == 200:
- log.trace("PEP found")
- found_pep = True
-
- pep_content = await response.text()
-
- # Taken from https://github.com/python/peps/blob/master/pep0/pep.py#L179
- pep_header = HeaderParser().parse(StringIO(pep_content))
-
- # Assemble the embed
- pep_embed = Embed(
- title=f"**PEP {pep_number} - {pep_header['Title']}**",
- url=f"{self.base_pep_url}{pep_number:04}"
- )
-
- pep_embed.set_thumbnail(url=ICON_URL)
-
- # Add the interesting information
- fields_to_check = ("Status", "Python-Version", "Created", "Type")
- for field in fields_to_check:
- # Check for a PEP metadata field that is present but has an empty value
- # embed field values can't contain an empty string
- if pep_header.get(field, ""):
- pep_embed.add_field(name=field, value=pep_header[field])
-
- elif response.status != 404:
- # any response except 200 and 404 is expected
- found_pep = True # actually not, but it's easier to display this way
- log.trace(f"The user requested PEP {pep_number}, but the response had an unexpected status code: "
- f"{response.status}.\n{response.text}")
-
- error_message = "Unexpected HTTP error during PEP search. Please let us know."
- pep_embed = Embed(title="Unexpected error", description=error_message)
- pep_embed.colour = Colour.red()
- break
-
- if not found_pep:
- log.trace("PEP was not found")
- not_found = f"PEP {pep_number} does not exist."
- pep_embed = Embed(title="PEP not found", description=not_found)
- pep_embed.colour = Colour.red()
-
- await ctx.message.channel.send(embed=pep_embed)
+ self.peps: Dict[int, str] = {}
+ self.last_refreshed_peps: Optional[datetime] = None
+ self.bot.loop.create_task(self.refresh_peps_urls())
@command()
@in_whitelist(channels=(Channels.bot_commands,), roles=STAFF_ROLES)
@@ -246,8 +189,53 @@ class Utils(Cog):
for reaction in options:
await message.add_reaction(reaction)
- async def send_pep_zero(self, ctx: Context) -> None:
- """Send information about PEP 0."""
+ # region: PEP
+
+ async def refresh_peps_urls(self) -> None:
+ """Refresh PEP URLs listing in every 3 hours."""
+ # Wait until HTTP client is available
+ await self.bot.wait_until_ready()
+ log.trace("Started refreshing PEP URLs.")
+
+ async with self.bot.http_session.get(self.PEPS_LISTING_API_URL) as resp:
+ listing = await resp.json()
+
+ log.trace("Got PEP URLs listing from GitHub API")
+
+ for file in listing:
+ name = file["name"]
+ if name.startswith("pep-") and name.endswith((".rst", ".txt")):
+ pep_number = name.replace("pep-", "").split(".")[0]
+ self.peps[int(pep_number)] = file["download_url"]
+
+ self.last_refreshed_peps = datetime.now()
+ log.info("Successfully refreshed PEP URLs listing.")
+
+ @command(name='pep', aliases=('get_pep', 'p'))
+ async def pep_command(self, ctx: Context, pep_number: int) -> None:
+ """Fetches information about a PEP and sends it to the channel."""
+ # Trigger typing in chat to show users that bot is responding
+ await ctx.trigger_typing()
+
+ # Handle PEP 0 directly because it's not in .rst or .txt so it can't be accessed like other PEPs.
+ if pep_number == 0:
+ pep_embed = self.get_pep_zero_embed()
+ success = True
+ else:
+ success = False
+ if not (pep_embed := await self.validate_pep_number(pep_number)):
+ pep_embed, success = await self.get_pep_embed(pep_number)
+
+ await ctx.send(embed=pep_embed)
+ if success:
+ log.trace(f"PEP {pep_number} getting and sending finished successfully. Increasing stat.")
+ self.bot.stats.incr(f"pep_fetches.{pep_number}")
+ else:
+ log.trace(f"Getting PEP {pep_number} failed. Error embed sent.")
+
+ @staticmethod
+ def get_pep_zero_embed() -> Embed:
+ """Get information embed about PEP 0."""
pep_embed = Embed(
title="**PEP 0 - Index of Python Enhancement Proposals (PEPs)**",
url="https://www.python.org/dev/peps/"
@@ -257,7 +245,69 @@ class Utils(Cog):
pep_embed.add_field(name="Created", value="13-Jul-2000")
pep_embed.add_field(name="Type", value="Informational")
- await ctx.send(embed=pep_embed)
+ return pep_embed
+
+ async def validate_pep_number(self, pep_nr: int) -> Optional[Embed]:
+ """Validate is PEP number valid. When it isn't, return error embed, otherwise None."""
+ if (
+ pep_nr not in self.peps
+ and (self.last_refreshed_peps + timedelta(minutes=30)) <= datetime.now()
+ and len(str(pep_nr)) < 5
+ ):
+ await self.refresh_peps_urls()
+
+ if pep_nr not in self.peps:
+ log.trace(f"PEP {pep_nr} was not found")
+ return Embed(
+ title="PEP not found",
+ description=f"PEP {pep_nr} does not exist.",
+ colour=Colour.red()
+ )
+
+ return None
+
+ def generate_pep_embed(self, pep_header: Dict, pep_nr: int) -> Embed:
+ """Generate PEP embed based on PEP headers data."""
+ # Assemble the embed
+ pep_embed = Embed(
+ title=f"**PEP {pep_nr} - {pep_header['Title']}**",
+ description=f"[Link]({self.BASE_PEP_URL}{pep_nr:04})",
+ )
+
+ pep_embed.set_thumbnail(url=ICON_URL)
+
+ # Add the interesting information
+ fields_to_check = ("Status", "Python-Version", "Created", "Type")
+ for field in fields_to_check:
+ # Check for a PEP metadata field that is present but has an empty value
+ # embed field values can't contain an empty string
+ if pep_header.get(field, ""):
+ pep_embed.add_field(name=field, value=pep_header[field])
+
+ return pep_embed
+
+ @pep_cache(arg_offset=1)
+ async def get_pep_embed(self, pep_nr: int) -> Tuple[Embed, bool]:
+ """Fetch, generate and return PEP embed. Second item of return tuple show does getting success."""
+ response = await self.bot.http_session.get(self.peps[pep_nr])
+
+ if response.status == 200:
+ log.trace(f"PEP {pep_nr} found")
+ pep_content = await response.text()
+
+ # Taken from https://github.com/python/peps/blob/master/pep0/pep.py#L179
+ pep_header = HeaderParser().parse(StringIO(pep_content))
+ return self.generate_pep_embed(pep_header, pep_nr), True
+ else:
+ log.trace(
+ f"The user requested PEP {pep_nr}, but the response had an unexpected status code: {response.status}."
+ )
+ return Embed(
+ title="Unexpected error",
+ description="Unexpected HTTP error during PEP search. Please let us know.",
+ colour=Colour.red()
+ ), False
+ # endregion
def setup(bot: Bot) -> None:
diff --git a/bot/utils/cache.py b/bot/utils/cache.py
new file mode 100644
index 000000000..68ce15607
--- /dev/null
+++ b/bot/utils/cache.py
@@ -0,0 +1,41 @@
+import functools
+from collections import OrderedDict
+from typing import Any, Callable
+
+
+class AsyncCache:
+ """
+ LRU cache implementation for coroutines.
+
+ Once the cache exceeds the maximum size, keys are deleted in FIFO order.
+
+ An offset may be optionally provided to be applied to the coroutine's arguments when creating the cache key.
+ """
+
+ def __init__(self, max_size: int = 128):
+ self._cache = OrderedDict()
+ self._max_size = max_size
+
+ def __call__(self, arg_offset: int = 0) -> Callable:
+ """Decorator for async cache."""
+
+ def decorator(function: Callable) -> Callable:
+ """Define the async cache decorator."""
+
+ @functools.wraps(function)
+ async def wrapper(*args) -> Any:
+ """Decorator wrapper for the caching logic."""
+ key = args[arg_offset:]
+
+ if key not in self._cache:
+ if len(self._cache) > self._max_size:
+ self._cache.popitem(last=False)
+
+ self._cache[key] = await function(*args)
+ return self._cache[key]
+ return wrapper
+ return decorator
+
+ def clear(self) -> None:
+ """Clear cache instance."""
+ self._cache.clear()