diff options
| author | 2021-01-10 06:15:27 +0100 | |
|---|---|---|
| committer | 2021-01-10 19:07:53 +0100 | |
| commit | 33b408d9e2cc805e2cfc6851225929c50725ea80 (patch) | |
| tree | ad0123b2b5a5d115fccf77057179597df2115eea | |
| parent | Defer import to avoid circular imports (diff) | |
Rename CachedParser to BatchParser and move it to its own module
| -rw-r--r-- | bot/exts/info/doc/__init__.py | 3 |
| -rw-r--r-- | bot/exts/info/doc/_batch_parser.py | 173 |
| -rw-r--r-- | bot/exts/info/doc/_cog.py | 170 |
3 files changed, 180 insertions, 166 deletions
diff --git a/bot/exts/info/doc/__init__.py b/bot/exts/info/doc/__init__.py index dff7a0269..2bb43a950 100644 --- a/bot/exts/info/doc/__init__.py +++ b/bot/exts/info/doc/__init__.py @@ -1,10 +1,13 @@ from bot.bot import Bot +from ._redis_cache import DocRedisCache MAX_SIGNATURE_AMOUNT = 3 PRIORITY_PACKAGES = ( "python", ) +doc_cache = DocRedisCache(namespace="Docs") + def setup(bot: Bot) -> None: """Load the Doc cog.""" diff --git a/bot/exts/info/doc/_batch_parser.py b/bot/exts/info/doc/_batch_parser.py new file mode 100644 index 000000000..edd6bb090 --- /dev/null +++ b/bot/exts/info/doc/_batch_parser.py @@ -0,0 +1,173 @@ +from __future__ import annotations + +import asyncio +import logging +import time +from collections import defaultdict +from contextlib import suppress +from functools import partial +from operator import attrgetter +from typing import Dict, List, NamedTuple, TYPE_CHECKING, Union + +from bs4 import BeautifulSoup + +import bot +from . import doc_cache +from ._parsing import get_symbol_markdown +if TYPE_CHECKING: + from ._cog import DocItem + +log = logging.getLogger(__name__) + + +class QueueItem(NamedTuple): + """Contains a symbol and the BeautifulSoup object needed to parse it.""" + + symbol: DocItem + soup: BeautifulSoup + + def __eq__(self, other: Union[QueueItem, DocItem]): + if isinstance(other, type(self.symbol)): + return self.symbol == other + return NamedTuple.__eq__(self, other) + + +class ParseResultFuture(asyncio.Future): + """ + Future with metadata for the parser class. + + `user_requested` is set by the parser when a Future is requested by an user and moved to the front, + allowing the futures to only be waited for when clearing if they were user requested. 
+ + `result_set_time` provides the time at which the future's result has been set, + or -inf if the result hasn't been set yet + """ + + def __init__(self): + super().__init__() + self.user_requested = False + self.result_set_time = float("inf") + + def set_result(self, result: str, /) -> None: + """Set `self.result_set_time` to current time when the result is set.""" + self.result_set_time = time.time() + super().set_result(result) + + +class BatchParser: + """ + Get the Markdown of all symbols on a page and send them to redis when a symbol is requested. + + DocItems are added through the `add_item` method which adds them to the `_page_symbols` dict. + `get_markdown` is used to fetch the Markdown; when this is used for the first time on a page, + all of the symbols are queued to be parsed to avoid multiple web requests to the same page. + """ + + def __init__(self): + self._queue: List[QueueItem] = [] + self._page_symbols: Dict[str, List[DocItem]] = defaultdict(list) + self._item_futures: Dict[DocItem, ParseResultFuture] = {} + self._parse_task = None + + self.cleanup_futures_task = bot.instance.loop.create_task(self._cleanup_futures()) + + async def get_markdown(self, doc_item: DocItem) -> str: + """ + Get the result Markdown of `doc_item`. + + If no symbols were fetched from `doc_item`s page before, + the HTML has to be fetched and then all items from the page are put into the parse queue. + + Not safe to run while `self.clear` is running. 
+ """ + if (symbols_to_queue := self._page_symbols.get(doc_item.url)) is not None: + async with bot.instance.http_session.get(doc_item.url) as response: + soup = BeautifulSoup(await response.text(encoding="utf8"), "lxml") + + self._queue.extend(QueueItem(symbol, soup) for symbol in symbols_to_queue) + self._item_futures.update((symbol, ParseResultFuture()) for symbol in symbols_to_queue) + del self._page_symbols[doc_item.url] + log.debug(f"Added symbols from {doc_item.url} to parse queue.") + + if self._parse_task is None: + self._parse_task = asyncio.create_task(self._parse_queue()) + + with suppress(ValueError): + # If the item is not in the list then the item is already parsed or is being parsed + self._move_to_front(doc_item) + self._item_futures[doc_item].user_requested = True + return await self._item_futures[doc_item] + + async def _parse_queue(self) -> None: + """ + Parse all item from the queue, setting their result markdown on the futures and sending them to redis. + + The coroutine will run as long as the queue is not empty, resetting `self._parse_task` to None when finished. + """ + log.trace("Starting queue parsing.") + try: + while self._queue: + item, soup = self._queue.pop() + try: + if (future := self._item_futures[item]).done(): + # Some items are present in the inventories multiple times under different symbols, + # if we already parsed an equal item, we can just skip it. 
+ continue + + markdown = await bot.instance.loop.run_in_executor( + None, + partial(get_symbol_markdown, soup, item), + ) + if markdown is not None: + await doc_cache.set(item, markdown) + except Exception as e: + log.exception(f"Unexpected error when handling {item}") + future.set_exception(e) + else: + future.set_result(markdown) + await asyncio.sleep(0.1) + finally: + self._parse_task = None + log.trace("Finished parsing queue.") + + def _move_to_front(self, item: Union[QueueItem, DocItem]) -> None: + """Move `item` to the front of the parse queue.""" + # The parse queue stores soups along with the doc symbols in QueueItem objects, + # in case we're moving a DocItem we have to get the associated QueueItem first and then move it. + item_index = self._queue.index(item) + queue_item = self._queue.pop(item_index) + + self._queue.append(queue_item) + + def add_item(self, doc_item: DocItem) -> None: + """Map a DocItem to its page so that the symbol will be parsed once the page is requested.""" + self._page_symbols[doc_item.url].append(doc_item) + + async def clear(self) -> None: + """ + Clear all internal symbol data. + + All currently requested items are waited to be parsed before clearing. + """ + for future in filter(attrgetter("user_requested"), self._item_futures.values()): + await future + if self._parse_task is not None: + self._parse_task.cancel() + self._queue.clear() + self._page_symbols.clear() + self._item_futures.clear() + + async def _cleanup_futures(self) -> None: + """ + Clear old futures from internal results. + + After a future is set, we only need to wait for old requests to its associated `DocItem` to finish + as all new requests will get the value from the redis cache in the cog first. + Keeping them around for longer than a second is unnecessary and keeps the parsed Markdown strings alive. 
+ """ + while True: + current_time = time.time() + for key, future in self._item_futures.copy().items(): + if current_time - future.result_set_time > 5: + del self._item_futures[key] + await asyncio.sleep(5) diff --git a/bot/exts/info/doc/_cog.py b/bot/exts/info/doc/_cog.py index fd211d9f1..7a943f1a4 100644 --- a/bot/exts/info/doc/_cog.py +++ b/bot/exts/info/doc/_cog.py @@ -4,19 +4,13 @@ import asyncio import logging import re import sys -import time -from collections import defaultdict from contextlib import suppress -from functools import partial -from operator import attrgetter from types import SimpleNamespace -from typing import Dict, List, NamedTuple, Optional, Union +from typing import Dict, NamedTuple, Optional import discord -from bs4 import BeautifulSoup from discord.ext import commands -from bot import instance as bot_instance from bot.bot import Bot from bot.constants import MODERATION_ROLES, RedirectOutput from bot.converters import Inventory, PackageName, ValidURL @@ -24,10 +18,9 @@ from bot.pagination import LinePaginator from bot.utils.lock import lock from bot.utils.messages import send_denial, wait_for_deletion from bot.utils.scheduling import Scheduler -from . import PRIORITY_PACKAGES +from . 
import PRIORITY_PACKAGES, doc_cache +from ._batch_parser import BatchParser from ._inventory_parser import INVENTORY_DICT, fetch_inventory -from ._parsing import get_symbol_markdown -from ._redis_cache import DocRedisCache log = logging.getLogger(__name__) @@ -48,8 +41,6 @@ REFRESH_EVENT = asyncio.Event() REFRESH_EVENT.set() COMMAND_LOCK_SINGLETON = "inventory refresh" -doc_cache = DocRedisCache(namespace="Docs") - class DocItem(NamedTuple): """Holds inventory symbol information.""" @@ -66,159 +57,6 @@ class DocItem(NamedTuple): return self.base_url + self.relative_url_path -class QueueItem(NamedTuple): - """Contains a symbol and the BeautifulSoup object needed to parse it.""" - - symbol: DocItem - soup: BeautifulSoup - - def __eq__(self, other: Union[QueueItem, DocItem]): - if isinstance(other, DocItem): - return self.symbol == other - return NamedTuple.__eq__(self, other) - - -class ParseResultFuture(asyncio.Future): - """ - Future with metadata for the parser class. - - `user_requested` is set by the parser when a Future is requested by an user and moved to the front, - allowing the futures to only be waited for when clearing if they were user requested. - - `result_set_time` provides the time at which the future's result has been set, - or -inf if the result hasn't been set yet - """ - - def __init__(self): - super().__init__() - self.user_requested = False - self.result_set_time = float("inf") - - def set_result(self, result: str, /) -> None: - """Set `self.result_set_time` to current time when the result is set.""" - self.result_set_time = time.time() - super().set_result(result) - - -class CachedParser: - """ - Get the symbol Markdown from pages with smarter caching. - - DocItems are added through the `add_item` method which adds them to the `_page_symbols` dict. - `get_markdown` is used to fetch the Markdown; when this is used for the first time on a page, - all of the symbols are queued to be parsed to avoid multiple web requests to the same page. 
- """ - - def __init__(self): - self._queue: List[QueueItem] = [] - self._page_symbols: Dict[str, List[DocItem]] = defaultdict(list) - self._item_futures: Dict[DocItem, ParseResultFuture] = {} - self._parse_task = None - - self.cleanup_futures_task = bot_instance.loop.create_task(self._cleanup_futures()) - - async def get_markdown(self, doc_item: DocItem) -> str: - """ - Get the result Markdown of `doc_item`. - - If no symbols were fetched from `doc_item`s page before, - the HTML has to be fetched before parsing can be queued. - - Not safe to run while `self.clear` is running. - """ - if (symbols_to_queue := self._page_symbols.get(doc_item.url)) is not None: - async with bot_instance.http_session.get(doc_item.url) as response: - soup = BeautifulSoup(await response.text(encoding="utf8"), "lxml") - - self._queue.extend(QueueItem(symbol, soup) for symbol in symbols_to_queue) - self._item_futures.update((symbol, ParseResultFuture()) for symbol in symbols_to_queue) - del self._page_symbols[doc_item.url] - log.debug(f"Added symbols from {doc_item.url} to parse queue.") - - if self._parse_task is None: - self._parse_task = asyncio.create_task(self._parse_queue()) - - with suppress(ValueError): - # If the item is not in the list then the item is already parsed or is being parsed - self._move_to_front(doc_item) - self._item_futures[doc_item].user_requested = True - return await self._item_futures[doc_item] - - async def _parse_queue(self) -> None: - """ - Parse all item from the queue, setting associated events for symbols if present. - - The coroutine will run as long as the queue is not empty, resetting `self._parse_task` to None when finished. - """ - log.trace("Starting queue parsing.") - try: - while self._queue: - item, soup = self._queue.pop() - try: - if (future := self._item_futures[item]).done(): - # Some items are present in the inventories multiple times under different symbols, - # if we already parsed an equal item, we can just skip it. 
- continue - - markdown = await bot_instance.loop.run_in_executor( - None, - partial(get_symbol_markdown, soup, item), - ) - if markdown is not None: - await doc_cache.set(item, markdown) - except Exception as e: - log.exception(f"Unexpected error when handling {item}") - future.set_exception(e) - else: - future.set_result(markdown) - await asyncio.sleep(0.1) - finally: - self._parse_task = None - log.trace("Finished parsing queue.") - - def _move_to_front(self, item: Union[QueueItem, DocItem]) -> None: - """Move `item` to the front of the parse queue.""" - # The parse queue stores soups along with the doc symbols in QueueItem objects, - # in case we're moving a DocItem we have to get the associated QueueItem first and then move it. - item_index = self._queue.index(item) - queue_item = self._queue.pop(item_index) - - self._queue.append(queue_item) - - def add_item(self, doc_item: DocItem) -> None: - """Map a DocItem to its page so that the symbol will be parsed once the page is requested.""" - self._page_symbols[doc_item.url].append(doc_item) - - async def clear(self) -> None: - """ - Clear all internal symbol data. - - All currently requested items are waited to be parsed before clearing. - """ - for future in filter(attrgetter("user_requested"), self._item_futures.values()): - await future - if self._parse_task is not None: - self._parse_task.cancel() - self._queue.clear() - self._page_symbols.clear() - self._item_futures.clear() - - async def _cleanup_futures(self) -> None: - """ - Clear old futures from internal results. - - After a future is set, we only need to wait for old requests to its associated DocItem to finish - as all new requests will get the value from the redis cache in the cog first. - Keeping them around for longer than a second is unnecessary and keeps the parsed Markdown strings alive. 
- """ - while True: - current_time = time.time() - for key, future in self._item_futures.copy().items(): - if current_time - future.result_set_time > 5: - del self._item_futures[key] - await asyncio.sleep(5) - - class DocCog(commands.Cog): """A set of commands for querying & displaying documentation.""" @@ -226,7 +64,7 @@ class DocCog(commands.Cog): self.base_urls = {} self.bot = bot self.doc_symbols: Dict[str, DocItem] = {} - self.item_fetcher = CachedParser() + self.item_fetcher = BatchParser() self.renamed_symbols = set() self.inventory_scheduler = Scheduler(self.__class__.__name__) |