From cf00aff24d20a57c2c9178d6d9e30f5d33d9a426 Mon Sep 17 00:00:00 2001 From: Numerlor <25886452+Numerlor@users.noreply.github.com> Date: Tue, 15 Dec 2020 00:30:17 +0100 Subject: Create futures for all items in the queue Creating futures for everything and then awaiting at the end takes care of all the potential race conditions that may pop up from items that are parsed and sent to redis while the get_markdown method is in the middle of fetching a page. In case it happens with the implementation we'll just need to move the item to the front and the future will get a result set soon afterwards. --- bot/exts/info/doc/_cog.py | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/bot/exts/info/doc/_cog.py b/bot/exts/info/doc/_cog.py index 6c51ab738..0d344c363 100644 --- a/bot/exts/info/doc/_cog.py +++ b/bot/exts/info/doc/_cog.py @@ -7,6 +7,7 @@ import sys from collections import defaultdict from contextlib import suppress from functools import partial +from operator import attrgetter from types import SimpleNamespace from typing import Dict, List, NamedTuple, Optional, Union @@ -78,6 +79,14 @@ class QueueItem(NamedTuple): return NamedTuple.__eq__(self, other) +class ParseResultFuture(asyncio.Future): + """Future with the user_requested attribute to know which futures need to be waited for before clearing.""" + + def __init__(self): + super().__init__() + self.user_requested = False + + class CachedParser: """ Get the symbol Markdown from pages with smarter caching. @@ -90,7 +99,7 @@ class CachedParser: def __init__(self): self._queue: List[QueueItem] = [] self._page_symbols: Dict[str, List[DocItem]] = defaultdict(list) - self._item_futures: Dict[DocItem, asyncio.Future] = {} + self._item_futures: Dict[DocItem, ParseResultFuture] = {} self._parse_task = None async def get_markdown(self, doc_item: DocItem) -> str: @@ -99,21 +108,25 @@ class CachedParser: If no symbols were fetched from `doc_item`s page before, the HTML has to be fetched before parsing can be queued. + + Not safe to run while `self.clear` is running. """ if (symbols_to_queue := self._page_symbols.get(doc_item.url)) is not None: async with bot_instance.http_session.get(doc_item.url) as response: soup = BeautifulSoup(await response.text(encoding="utf8"), "lxml") self._queue.extend(QueueItem(symbol, soup) for symbol in symbols_to_queue) + self._item_futures.update((symbol, ParseResultFuture()) for symbol in symbols_to_queue) del self._page_symbols[doc_item.url] log.debug(f"Added symbols from {doc_item.url} to parse queue.") if self._parse_task is None: self._parse_task = asyncio.create_task(self._parse_queue()) - self._move_to_front(doc_item) - if doc_item not in self._item_futures: - self._item_futures[doc_item] = bot_instance.loop.create_future() + with suppress(ValueError): + # If the item is not in the list then the item is already parsed or is being parsed + self._move_to_front(doc_item) + self._item_futures[doc_item].user_requested = True return await self._item_futures[doc_item] async def _parse_queue(self) -> None: @@ -161,7 +174,7 @@ class CachedParser: All currently requested items are waited to be parsed before clearing. """ - for future in self._item_futures.values(): + for future in filter(attrgetter("user_requested"), self._item_futures.values()): await future if self._parse_task is not None: self._parse_task.cancel() -- cgit v1.2.3