diff options
-rw-r--r-- | bot/exts/help_channels/_channel.py | 42 | ||||
-rw-r--r-- | bot/exts/help_channels/_cog.py | 255 | ||||
-rw-r--r-- | bot/exts/help_channels/_stats.py | 42 | ||||
-rw-r--r-- | bot/log.py | 3 | ||||
-rw-r--r-- | bot/utils/lock.py | 62 | ||||
-rw-r--r-- | bot/utils/scheduling.py | 17 |
6 files changed, 228 insertions, 193 deletions
diff --git a/bot/exts/help_channels/_channel.py b/bot/exts/help_channels/_channel.py index e717d7af8..224214b00 100644 --- a/bot/exts/help_channels/_channel.py +++ b/bot/exts/help_channels/_channel.py @@ -4,8 +4,10 @@ from datetime import datetime, timedelta import discord +import bot from bot import constants from bot.exts.help_channels import _caches, _message +from bot.utils.channel import try_get_channel log = logging.getLogger(__name__) @@ -55,3 +57,43 @@ async def get_in_use_time(channel_id: int) -> t.Optional[timedelta]: def is_excluded_channel(channel: discord.abc.GuildChannel) -> bool: """Check if a channel should be excluded from the help channel system.""" return not isinstance(channel, discord.TextChannel) or channel.id in EXCLUDED_CHANNELS + + +async def move_to_bottom(channel: discord.TextChannel, category_id: int, **options) -> None: + """ + Move the `channel` to the bottom position of `category` and edit channel attributes. + + To ensure "stable sorting", we use the `bulk_channel_update` endpoint and provide the current + positions of the other channels in the category as-is. This should make sure that the channel + really ends up at the bottom of the category. + + If `options` are provided, the channel will be edited after the move is completed. This is the + same order of operations that `discord.TextChannel.edit` uses. For information on available + options, see the documentation on `discord.TextChannel.edit`. While possible, position-related + options should be avoided, as it may interfere with the category move we perform. + """ + # Get a fresh copy of the category from the bot to avoid the cache mismatch issue we had. + category = await try_get_channel(category_id) + + payload = [{"id": c.id, "position": c.position} for c in category.channels] + + # Calculate the bottom position based on the current highest position in the category. If the + # category is currently empty, we simply use the current position of the channel to avoid making + # unnecessary changes to positions in the guild. + bottom_position = payload[-1]["position"] + 1 if payload else channel.position + + payload.append( + { + "id": channel.id, + "position": bottom_position, + "parent_id": category.id, + "lock_permissions": True, + } + ) + + # We use d.py's method to ensure our request is processed by d.py's rate limit manager + await bot.instance.http.bulk_channel_update(category.guild.id, payload) + + # Now that the channel is moved, we can edit the other attributes + if options: + await channel.edit(**options) diff --git a/bot/exts/help_channels/_cog.py b/bot/exts/help_channels/_cog.py index 983c5d183..0995c8a79 100644 --- a/bot/exts/help_channels/_cog.py +++ b/bot/exts/help_channels/_cog.py @@ -3,6 +3,7 @@ import logging import random import typing as t from datetime import datetime, timezone +from operator import attrgetter import discord import discord.abc @@ -10,12 +11,12 @@ from discord.ext import commands from bot import constants from bot.bot import Bot -from bot.exts.help_channels import _caches, _channel, _cooldown, _message, _name -from bot.utils import channel as channel_utils -from bot.utils.scheduling import Scheduler +from bot.exts.help_channels import _caches, _channel, _cooldown, _message, _name, _stats +from bot.utils import channel as channel_utils, lock, scheduling log = logging.getLogger(__name__) +NAMESPACE = "help" HELP_CHANNEL_TOPIC = """ This is a Python help channel. You can claim your own help channel in the Python Help: Available category. """ @@ -58,7 +59,7 @@ class HelpChannels(commands.Cog): def __init__(self, bot: Bot): self.bot = bot - self.scheduler = Scheduler(self.__class__.__name__) + self.scheduler = scheduling.Scheduler(self.__class__.__name__) # Categories self.available_category: discord.CategoryChannel = None @@ -73,7 +74,6 @@ class HelpChannels(commands.Cog): # Asyncio stuff self.queue_tasks: t.List[asyncio.Task] = [] - self.on_message_lock = asyncio.Lock() self.init_task = self.bot.loop.create_task(self.init_cog()) def cog_unload(self) -> None: @@ -87,6 +87,36 @@ class HelpChannels(commands.Cog): self.scheduler.cancel_all() + @lock.lock_arg(NAMESPACE, "message", attrgetter("channel.id")) + @lock.lock_arg(NAMESPACE, "message", attrgetter("author.id")) + @lock.lock_arg(f"{NAMESPACE}.unclaim", "message", attrgetter("author.id"), wait=True) + async def claim_channel(self, message: discord.Message) -> None: + """ + Claim the channel in which the question `message` was sent. + + Move the channel to the In Use category and pin the `message`. Add a cooldown to the + claimant to prevent them from asking another question. Lastly, make a new channel available. + """ + log.info(f"Channel #{message.channel} was claimed by `{message.author.id}`.") + await self.move_to_in_use(message.channel) + await _cooldown.revoke_send_permissions(message.author, self.scheduler) + + await _message.pin(message) + + # Add user with channel for dormant check. + await _caches.claimants.set(message.channel.id, message.author.id) + + self.bot.stats.incr("help.claimed") + + # Must use a timezone-aware datetime to ensure a correct POSIX timestamp. + timestamp = datetime.now(timezone.utc).timestamp() + await _caches.claim_times.set(message.channel.id, timestamp) + + await _caches.unanswered.set(message.channel.id, True) + + # Not awaited because it may indefinitely hold the lock while waiting for a channel. + scheduling.create_task(self.move_to_available(), name=f"help_claim_{message.id}") + def create_channel_queue(self) -> asyncio.Queue: """ Return a queue of dormant channels to use for getting the next available channel. @@ -124,8 +154,12 @@ class HelpChannels(commands.Cog): log.debug(f"Creating a new dormant channel named {name}.") return await self.dormant_category.create_text_channel(name, topic=HELP_CHANNEL_TOPIC) - async def dormant_check(self, ctx: commands.Context) -> bool: - """Return True if the user is the help channel claimant or passes the role check.""" + async def close_check(self, ctx: commands.Context) -> bool: + """Return True if the channel is in use and the user is the claimant or has a whitelisted role.""" + if ctx.channel.category != self.in_use_category: + log.debug(f"{ctx.author} invoked command 'close' outside an in-use help channel") + return False + if await _caches.claimants.get(ctx.channel.id) == ctx.author.id: log.trace(f"{ctx.author} is the help channel claimant, passing the check for dormant.") self.bot.stats.incr("help.dormant_invoke.claimant") @@ -144,18 +178,12 @@ class HelpChannels(commands.Cog): """ Make the current in-use help channel dormant. - Make the channel dormant if the user passes the `dormant_check`, - delete the message that invoked this. + May only be invoked by the channel's claimant or by staff. """ - log.trace("close command invoked; checking if the channel is in-use.") - - if ctx.channel.category != self.in_use_category: - log.debug(f"{ctx.author} invoked command 'dormant' outside an in-use help channel") - return - - if await self.dormant_check(ctx): - await self.move_to_dormant(ctx.channel, "command") - self.scheduler.cancel(ctx.channel.id) + # Don't use a discord.py check because the check needs to fail silently. + if await self.close_check(ctx): + log.info(f"Close command invoked by {ctx.author} in #{ctx.channel}.") + await self.unclaim_channel(ctx.channel, is_auto=False) async def get_available_candidate(self) -> discord.TextChannel: """ @@ -201,7 +229,7 @@ class HelpChannels(commands.Cog): elif missing < 0: log.trace(f"Moving {abs(missing)} superfluous available channels over to the Dormant category.") for channel in channels[:abs(missing)]: - await self.move_to_dormant(channel, "auto") + await self.unclaim_channel(channel) async def init_categories(self) -> None: """Get the help category objects. Remove the cog if retrieval fails.""" @@ -248,20 +276,10 @@ class HelpChannels(commands.Cog): self.close_command.enabled = True await self.init_available() - self.report_stats() + _stats.report_counts() log.info("Cog is ready!") - def report_stats(self) -> None: - """Report the channel count stats.""" - total_in_use = sum(1 for _ in _channel.get_category_channels(self.in_use_category)) - total_available = sum(1 for _ in _channel.get_category_channels(self.available_category)) - total_dormant = sum(1 for _ in _channel.get_category_channels(self.dormant_category)) - - self.bot.stats.gauge("help.total.in_use", total_in_use) - self.bot.stats.gauge("help.total.available", total_available) - self.bot.stats.gauge("help.total.dormant", total_dormant) - async def move_idle_channel(self, channel: discord.TextChannel, has_task: bool = True) -> None: """ Make the `channel` dormant if idle or schedule the move if still active. @@ -284,7 +302,7 @@ class HelpChannels(commands.Cog): f"and will be made dormant." ) - await self.move_to_dormant(channel, "auto") + await self.unclaim_channel(channel) else: # Cancel the existing task, if any. if has_task: @@ -298,45 +316,6 @@ class HelpChannels(commands.Cog): self.scheduler.schedule_later(delay, channel.id, self.move_idle_channel(channel)) - async def move_to_bottom_position(self, channel: discord.TextChannel, category_id: int, **options) -> None: - """ - Move the `channel` to the bottom position of `category` and edit channel attributes. - - To ensure "stable sorting", we use the `bulk_channel_update` endpoint and provide the current - positions of the other channels in the category as-is. This should make sure that the channel - really ends up at the bottom of the category. - - If `options` are provided, the channel will be edited after the move is completed. This is the - same order of operations that `discord.TextChannel.edit` uses. For information on available - options, see the documentation on `discord.TextChannel.edit`. While possible, position-related - options should be avoided, as it may interfere with the category move we perform. - """ - # Get a fresh copy of the category from the bot to avoid the cache mismatch issue we had. - category = await channel_utils.try_get_channel(category_id) - - payload = [{"id": c.id, "position": c.position} for c in category.channels] - - # Calculate the bottom position based on the current highest position in the category. If the - # category is currently empty, we simply use the current position of the channel to avoid making - # unnecessary changes to positions in the guild. - bottom_position = payload[-1]["position"] + 1 if payload else channel.position - - payload.append( - { - "id": channel.id, - "position": bottom_position, - "parent_id": category.id, - "lock_permissions": True, - } - ) - - # We use d.py's method to ensure our request is processed by d.py's rate limit manager - await self.bot.http.bulk_channel_update(category.guild.id, payload) - - # Now that the channel is moved, we can edit the other attributes - if options: - await channel.edit(**options) - async def move_to_available(self) -> None: """Make a channel available.""" log.trace("Making a channel available.") @@ -348,78 +327,81 @@ class HelpChannels(commands.Cog): log.trace(f"Moving #{channel} ({channel.id}) to the Available category.") - await self.move_to_bottom_position( + await _channel.move_to_bottom( channel=channel, category_id=constants.Categories.help_available, ) - self.report_stats() - - async def move_to_dormant(self, channel: discord.TextChannel, caller: str) -> None: - """ - Make the `channel` dormant. + _stats.report_counts() - A caller argument is provided for metrics. - """ + async def move_to_dormant(self, channel: discord.TextChannel) -> None: + """Make the `channel` dormant.""" log.info(f"Moving #{channel} ({channel.id}) to the Dormant category.") - - await self.move_to_bottom_position( + await _channel.move_to_bottom( channel=channel, category_id=constants.Categories.help_dormant, ) - await self.unclaim_channel(channel) - - self.bot.stats.incr(f"help.dormant_calls.{caller}") - - in_use_time = await _channel.get_in_use_time(channel.id) - if in_use_time: - self.bot.stats.timing("help.in_use_time", in_use_time) - - unanswered = await _caches.unanswered.get(channel.id) - if unanswered: - self.bot.stats.incr("help.sessions.unanswered") - elif unanswered is not None: - self.bot.stats.incr("help.sessions.answered") - - log.trace(f"Position of #{channel} ({channel.id}) is actually {channel.position}.") log.trace(f"Sending dormant message for #{channel} ({channel.id}).") embed = discord.Embed(description=_message.DORMANT_MSG) await channel.send(embed=embed) - await _message.unpin(channel) - log.trace(f"Pushing #{channel} ({channel.id}) into the channel queue.") self.channel_queue.put_nowait(channel) - self.report_stats() - async def unclaim_channel(self, channel: discord.TextChannel) -> None: + _stats.report_counts() + + @lock.lock_arg(f"{NAMESPACE}.unclaim", "channel") + async def unclaim_channel(self, channel: discord.TextChannel, *, is_auto: bool = True) -> None: """ - Mark the channel as unclaimed and remove the cooldown role from the claimant if needed. + Unclaim an in-use help `channel` to make it dormant. + + Unpin the claimant's question message and move the channel to the Dormant category. + Remove the cooldown role from the channel claimant if they have no other channels claimed. + Cancel the scheduled cooldown role removal task. - The role is only removed if they have no claimed channels left once the current one is unclaimed. - This method also handles canceling the automatic removal of the cooldown role. + Set `is_auto` to True if the channel was automatically closed or False if manually closed. """ - claimant_id = await _caches.claimants.pop(channel.id) + claimant_id = await _caches.claimants.get(channel.id) + _unclaim_channel = self._unclaim_channel + + # It could be possible that there is no claimant cached. In such case, it'd be useless and + # possibly incorrect to lock on None. Therefore, the lock is applied conditionally. + if claimant_id is not None: + decorator = lock.lock_arg(f"{NAMESPACE}.unclaim", "claimant_id", wait=True) + _unclaim_channel = decorator(_unclaim_channel) + + return await _unclaim_channel(channel, claimant_id, is_auto) - # Ignore missing task when cooldown has passed but the channel still isn't dormant. + async def _unclaim_channel(self, channel: discord.TextChannel, claimant_id: int, is_auto: bool) -> None: + """Actual implementation of `unclaim_channel`. See that for full documentation.""" + await _caches.claimants.delete(channel.id) + + # Ignore missing tasks because a channel may still be dormant after the cooldown expires. if claimant_id in self.scheduler: self.scheduler.cancel(claimant_id) claimant = self.bot.get_guild(constants.Guild.id).get_member(claimant_id) if claimant is None: log.info(f"{claimant_id} left the guild during their help session; the cooldown role won't be removed") - return - - # Remove the cooldown role if the claimant has no other channels left - if not any(claimant.id == user_id for _, user_id in await _caches.claimants.items()): + elif not any(claimant.id == user_id for _, user_id in await _caches.claimants.items()): + # Remove the cooldown role if the claimant has no other channels left await _cooldown.remove_cooldown_role(claimant) + await _message.unpin(channel) + await _stats.report_complete_session(channel.id, is_auto) + await self.move_to_dormant(channel) + + # Cancel the task that makes the channel dormant only if called by the close command. + # In other cases, the task is either already done or not-existent. + if not is_auto: + self.scheduler.cancel(channel.id) + async def move_to_in_use(self, channel: discord.TextChannel) -> None: """Make a channel in-use and schedule it to be made dormant.""" log.info(f"Moving #{channel} ({channel.id}) to the In Use category.") - await self.move_to_bottom_position( + await _channel.move_to_bottom( channel=channel, category_id=constants.Categories.help_in_use, ) @@ -428,7 +410,7 @@ class HelpChannels(commands.Cog): log.trace(f"Scheduling #{channel} ({channel.id}) to become dormant in {timeout} sec.") self.scheduler.schedule_later(timeout, channel.id, self.move_idle_channel(channel)) - self.report_stats() + _stats.report_counts() @commands.Cog.listener() async def on_message(self, message: discord.Message) -> None: @@ -436,51 +418,13 @@ class HelpChannels(commands.Cog): if message.author.bot: return # Ignore messages sent by bots. - channel = message.channel - - await _message.check_for_answer(message) - - is_available = channel_utils.is_in_category(channel, constants.Categories.help_available) - if not is_available or _channel.is_excluded_channel(channel): - return # Ignore messages outside the Available category or in excluded channels. - - log.trace("Waiting for the cog to be ready before processing messages.") await self.init_task - log.trace("Acquiring lock to prevent a channel from being processed twice...") - async with self.on_message_lock: - log.trace(f"on_message lock acquired for {message.id}.") - - if not channel_utils.is_in_category(channel, constants.Categories.help_available): - log.debug( - f"Message {message.id} will not make #{channel} ({channel.id}) in-use " - f"because another message in the channel already triggered that." - ) - return - - log.info(f"Channel #{channel} was claimed by `{message.author.id}`.") - await self.move_to_in_use(channel) - await _cooldown.revoke_send_permissions(message.author, self.scheduler) - - await _message.pin(message) - - # Add user with channel for dormant check. - await _caches.claimants.set(channel.id, message.author.id) - - self.bot.stats.incr("help.claimed") - - # Must use a timezone-aware datetime to ensure a correct POSIX timestamp. - timestamp = datetime.now(timezone.utc).timestamp() - await _caches.claim_times.set(channel.id, timestamp) - - await _caches.unanswered.set(channel.id, True) - - log.trace(f"Releasing on_message lock for {message.id}.") - - # Move a dormant channel to the Available category to fill in the gap. - # This is done last and outside the lock because it may wait indefinitely for a channel to - # be put in the queue. - await self.move_to_available() + if channel_utils.is_in_category(message.channel, constants.Categories.help_available): + if not _channel.is_excluded_channel(message.channel): + await self.claim_channel(message) + else: + await _message.check_for_answer(message) @commands.Cog.listener() async def on_message_delete(self, msg: discord.Message) -> None: @@ -489,15 +433,14 @@ class HelpChannels(commands.Cog): The new time for the dormant task is configured with `HelpChannels.deleted_idle_minutes`. """ + await self.init_task + if not channel_utils.is_in_category(msg.channel, constants.Categories.help_in_use): return if not await _message.is_empty(msg.channel): return - log.trace("Waiting for the cog to be ready before processing deleted messages.") - await self.init_task - log.info(f"Claimant of #{msg.channel} ({msg.author}) deleted message, channel is empty now. Rescheduling task.") # Cancel existing dormant task before scheduling new. diff --git a/bot/exts/help_channels/_stats.py b/bot/exts/help_channels/_stats.py new file mode 100644 index 000000000..b8778e7d9 --- /dev/null +++ b/bot/exts/help_channels/_stats.py @@ -0,0 +1,42 @@ +import logging + +from more_itertools import ilen + +import bot +from bot import constants +from bot.exts.help_channels import _caches, _channel + +log = logging.getLogger(__name__) + + +def report_counts() -> None: + """Report channel count stats of each help category.""" + for name in ("in_use", "available", "dormant"): + id_ = getattr(constants.Categories, f"help_{name}") + category = bot.instance.get_channel(id_) + + if category: + total = ilen(_channel.get_category_channels(category)) + bot.instance.stats.gauge(f"help.total.{name}", total) + else: + log.warning(f"Couldn't find category {name!r} to track channel count stats.") + + +async def report_complete_session(channel_id: int, is_auto: bool) -> None: + """ + Report stats for a completed help session channel `channel_id`. + + Set `is_auto` to True if the channel was automatically closed or False if manually closed. + """ + caller = "auto" if is_auto else "command" + bot.instance.stats.incr(f"help.dormant_calls.{caller}") + + in_use_time = await _channel.get_in_use_time(channel_id) + if in_use_time: + bot.instance.stats.timing("help.in_use_time", in_use_time) + + unanswered = await _caches.unanswered.get(channel_id) + if unanswered: + bot.instance.stats.incr("help.sessions.unanswered") + elif unanswered is not None: + bot.instance.stats.incr("help.sessions.answered") diff --git a/bot/log.py b/bot/log.py index 0935666d1..e92233a33 100644 --- a/bot/log.py +++ b/bot/log.py @@ -54,6 +54,9 @@ def setup() -> None: logging.getLogger("chardet").setLevel(logging.WARNING) logging.getLogger("async_rediscache").setLevel(logging.WARNING) + # Set back to the default of INFO even if asyncio's debug mode is enabled. + logging.getLogger("asyncio").setLevel(logging.INFO) + def setup_sentry() -> None: """Set up the Sentry logging integrations.""" diff --git a/bot/utils/lock.py b/bot/utils/lock.py index 7aaafbc88..e44776340 100644 --- a/bot/utils/lock.py +++ b/bot/utils/lock.py @@ -1,3 +1,4 @@ +import asyncio import inspect import logging from collections import defaultdict @@ -16,39 +17,21 @@ _IdCallable = Callable[[function.BoundArgs], _IdCallableReturn] ResourceId = Union[Hashable, _IdCallable] -class LockGuard: - """ - A context manager which acquires and releases a lock (mutex). - - Raise RuntimeError if trying to acquire a locked lock. - """ - - def __init__(self): - self._locked = False - - @property - def locked(self) -> bool: - """Return True if currently locked or False if unlocked.""" - return self._locked - - def __enter__(self): - if self._locked: - raise RuntimeError("Cannot acquire a locked lock.") - - self._locked = True - - def __exit__(self, _exc_type, _exc_value, _traceback): # noqa: ANN001 - self._locked = False - return False # Indicate any raised exception shouldn't be suppressed. - - -def lock(namespace: Hashable, resource_id: ResourceId, *, raise_error: bool = False) -> Callable: +def lock( + namespace: Hashable, + resource_id: ResourceId, + *, + raise_error: bool = False, + wait: bool = False, +) -> Callable: """ Turn the decorated coroutine function into a mutually exclusive operation on a `resource_id`. - If any other mutually exclusive function currently holds the lock for a resource, do not run the - decorated function and return None. If `raise_error` is True, raise `LockedResourceError` if - the lock cannot be acquired. + If `wait` is True, wait until the lock becomes available. Otherwise, if any other mutually + exclusive function currently holds the lock for a resource, do not run the decorated function + and return None. + + If `raise_error` is True, raise `LockedResourceError` if the lock cannot be acquired. `namespace` is an identifier used to prevent collisions among resource IDs. @@ -78,15 +61,19 @@ def lock(namespace: Hashable, resource_id: ResourceId, *, raise_error: bool = Fa else: id_ = resource_id - log.trace(f"{name}: getting lock for resource {id_!r} under namespace {namespace!r}") + log.trace(f"{name}: getting the lock object for resource {namespace!r}:{id_!r}") # Get the lock for the ID. Create a lock if one doesn't exist yet. locks = __lock_dicts[namespace] - lock_guard = locks.setdefault(id_, LockGuard()) - - if not lock_guard.locked: - log.debug(f"{name}: resource {namespace!r}:{id_!r} is free; acquiring it...") - with lock_guard: + lock_ = locks.setdefault(id_, asyncio.Lock()) + + # It's safe to check an asyncio.Lock is free before acquiring it because: + # 1. Synchronous code like `if not lock_.locked()` does not yield execution + # 2. `asyncio.Lock.acquire()` does not internally await anything if the lock is free + # 3. awaits only yield execution to the event loop at actual I/O boundaries + if wait or not lock_.locked(): + log.debug(f"{name}: acquiring lock for resource {namespace!r}:{id_!r}...") + async with lock_: return await func(*args, **kwargs) else: log.info(f"{name}: aborted because resource {namespace!r}:{id_!r} is locked") @@ -103,6 +90,7 @@ def lock_arg( func: Callable[[Any], _IdCallableReturn] = None, *, raise_error: bool = False, + wait: bool = False, ) -> Callable: """ Apply the `lock` decorator using the value of the arg at the given name/position as the ID. @@ -110,5 +98,5 @@ def lock_arg( `func` is an optional callable or awaitable which will return the ID given the argument value. See `lock` docs for more information. """ - decorator_func = partial(lock, namespace, raise_error=raise_error) + decorator_func = partial(lock, namespace, raise_error=raise_error, wait=wait) return function.get_arg_value_wrapper(decorator_func, name_or_pos, func) diff --git a/bot/utils/scheduling.py b/bot/utils/scheduling.py index 03f31d78f..4dd036e4f 100644 --- a/bot/utils/scheduling.py +++ b/bot/utils/scheduling.py @@ -155,3 +155,20 @@ class Scheduler: # Log the exception if one exists. if exception: self._log.error(f"Error in task #{task_id} {id(done_task)}!", exc_info=exception) + + +def create_task(*args, **kwargs) -> asyncio.Task: + """Wrapper for `asyncio.create_task` which logs exceptions raised in the task.""" + task = asyncio.create_task(*args, **kwargs) + task.add_done_callback(_log_task_exception) + return task + + +def _log_task_exception(task: asyncio.Task) -> None: + """Retrieve and log the exception raised in `task` if one exists.""" + with contextlib.suppress(asyncio.CancelledError): + exception = task.exception() + # Log the exception if one exists. + if exception: + log = logging.getLogger(__name__) + log.error(f"Error in task {task.get_name()} {id(task)}!", exc_info=exception) |