From a3b0ffb72a1e72fc3be0e96c7407ddff2ade67c9 Mon Sep 17 00:00:00 2001 From: Numerlor Date: Tue, 14 Jun 2022 00:37:07 +0200 Subject: Add decorator to block duplicate command invocations in a channel --- botcore/utils/cooldown.py | 184 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 184 insertions(+) create mode 100644 botcore/utils/cooldown.py (limited to 'botcore/utils/cooldown.py') diff --git a/botcore/utils/cooldown.py b/botcore/utils/cooldown.py new file mode 100644 index 00000000..a06dce46 --- /dev/null +++ b/botcore/utils/cooldown.py @@ -0,0 +1,184 @@ +"""Helpers for setting a cooldown on commands.""" + +from __future__ import annotations + +import asyncio +import math +import random +import time +import typing +from collections.abc import Awaitable, Hashable +from contextlib import suppress +from dataclasses import dataclass +from typing import Callable # sphinx-autodoc-typehints breaks with collections.abc.Callable + +import discord +from discord.ext.commands import CommandError, Context + +from botcore.utils import scheduling +from botcore.utils.function import command_wraps + +__all__ = ["CommandOnCooldown", "block_duplicate_invocations", "P", "R"] + +_ArgsTuple = tuple[object] + +if typing.TYPE_CHECKING: + from botcore import BotBase + import typing_extensions + P = typing_extensions.ParamSpec("P") + P.__constraints__ = () +else: + P = typing.TypeVar("P") + """The command's signature.""" + +R = typing.TypeVar("R") +"""The command's return value.""" + + +class CommandOnCooldown(CommandError, typing.Generic[P, R]): + """Raised when a command is invoked while on cooldown.""" + + def __init__( + self, + message: str | None, + function: Callable[P, Awaitable[R]], + *args: object, + **kwargs: object, + ): + super().__init__(message, function, args, kwargs) + self._function = function + self._args = args + self._kwargs = kwargs + + async def call_without_cooldown(self) -> R: + """ + Run the command this cooldown blocked. + + Returns: + The command's return value. + """ + return await self._function(*self._args, **self._kwargs) + + +@dataclass +class _CooldownItem: + call_arguments: _ArgsTuple + timeout_timestamp: float + + +class _CommandCooldownManager: + """ + Manage invocation cooldowns for a command through the arguments the command is called with. + + A cooldown is set through `set_cooldown` for a channel with the given `call_arguments`, + if `is_on_cooldown` is checked within `cooldown_duration` seconds + of the call to `set_cooldown` with the same arguments, True is returned. + """ + + def __init__(self, *, cooldown_duration: float): + self._cooldowns = dict[tuple[Hashable, _ArgsTuple], float]() + self._cooldowns_non_hashable = dict[Hashable, list[_CooldownItem]]() + self._cooldown_duration = cooldown_duration + self.cleanup_task = scheduling.create_task( + self._periodical_cleanup(random.uniform(0, 10)), + name="CooldownManager cleanup", + ) + + def set_cooldown(self, channel: Hashable, call_arguments: _ArgsTuple) -> None: + """Set `call_arguments` arguments on cooldown in `channel`.""" + timeout_timestamp = time.monotonic() + self._cooldown_duration + + try: + self._cooldowns[(channel, call_arguments)] = timeout_timestamp + except TypeError: + cooldowns_list = self._cooldowns_non_hashable.setdefault(channel, []) + for item in cooldowns_list: + if item.call_arguments == call_arguments: + item.timeout_timestamp = timeout_timestamp + else: + cooldowns_list.append(_CooldownItem(call_arguments, timeout_timestamp)) + + def is_on_cooldown(self, channel: Hashable, call_arguments: _ArgsTuple) -> bool: + """Check whether ``call_arguments`` is on cooldown in ``channel``.""" + current_time = time.monotonic() + try: + return self._cooldowns.get((channel, call_arguments), -math.inf) > current_time + except TypeError: + cooldowns_list = self._cooldowns_non_hashable.get(channel, None) + if cooldowns_list is None: + return False + + for item in cooldowns_list: + if item.call_arguments == call_arguments: + return item.timeout_timestamp > current_time + return False + + async def _periodical_cleanup(self, initial_delay: float) -> None: + """ + Wait for `initial_delay`, after that delete stale items every hour. + + The `initial_delay` ensures we're not running cleanups for every command at the same time. + """ + await asyncio.sleep(initial_delay) + while True: + await asyncio.sleep(60 * 60) + self._delete_stale_items() + + def _delete_stale_items(self) -> None: + """Remove expired items from internal collections.""" + current_time = time.monotonic() + + for key, timeout_timestamp in self._cooldowns.copy().items(): + if timeout_timestamp < current_time: + del self._cooldowns[key] + + for key, cooldowns_list in self._cooldowns_non_hashable.copy().items(): + filtered_cooldowns = [ + cooldown_item for cooldown_item in cooldowns_list if cooldown_item.timeout_timestamp < current_time + ] + + if not filtered_cooldowns: + del self._cooldowns_non_hashable[key] + else: + self._cooldowns_non_hashable[key] = filtered_cooldowns + + +def block_duplicate_invocations( + *, cooldown_duration: float = 5, send_notice: bool = False +) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]: + """ + Prevent duplicate invocations of a command with the same arguments in a channel for ``cooldown_duration`` seconds. + + Args: + cooldown_duration: Length of the cooldown in seconds. + send_notice: If True, the user is notified of the cooldown with a reply. + + Returns: + A decorator that adds a wrapper which applies the cooldowns. + + Warning: + The created wrapper raises :exc:`CommandOnCooldown` when the command is on cooldown. + """ + + def decorator(func: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[R]]: + mgr = _CommandCooldownManager(cooldown_duration=cooldown_duration) + + @command_wraps(func) + async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: + arg_tuple = (*args[2:], *kwargs.items()) + ctx = typing.cast("Context[BotBase]", args[1]) + channel = ctx.channel + + if not isinstance(channel, discord.DMChannel): + if mgr.is_on_cooldown(ctx.channel, arg_tuple): + if send_notice: + with suppress(discord.NotFound): + await ctx.reply("The command is on cooldown with the given arguments.") + raise CommandOnCooldown(ctx.message.content, func, *args, **kwargs) + mgr.set_cooldown(ctx.channel, arg_tuple) + + return await func(*args, **kwargs) + + return wrapper + + return decorator -- cgit v1.2.3 From c8b23bbbd25372a55d6e3640b68cd96828922af0 Mon Sep 17 00:00:00 2001 From: Numerlor Date: Tue, 21 Jun 2022 15:09:57 +0200 Subject: reword docstrings Co-authored-by: MarkKoz --- botcore/utils/cooldown.py | 14 +++++++------- botcore/utils/function.py | 9 +++------ docs/utils.py | 2 +- 3 files changed, 11 insertions(+), 14 deletions(-) (limited to 'botcore/utils/cooldown.py') diff --git a/botcore/utils/cooldown.py b/botcore/utils/cooldown.py index a06dce46..9e79e48a 100644 --- a/botcore/utils/cooldown.py +++ b/botcore/utils/cooldown.py @@ -70,9 +70,9 @@ class _CommandCooldownManager: """ Manage invocation cooldowns for a command through the arguments the command is called with. - A cooldown is set through `set_cooldown` for a channel with the given `call_arguments`, - if `is_on_cooldown` is checked within `cooldown_duration` seconds - of the call to `set_cooldown` with the same arguments, True is returned. + Use `set_cooldown` to set a cooldown, + and `is_on_cooldown` to check for a cooldown for a channel with the given arguments. + A cooldown lasts for `cooldown_duration` seconds. """ def __init__(self, *, cooldown_duration: float): @@ -99,7 +99,7 @@ class _CommandCooldownManager: cooldowns_list.append(_CooldownItem(call_arguments, timeout_timestamp)) def is_on_cooldown(self, channel: Hashable, call_arguments: _ArgsTuple) -> bool: - """Check whether ``call_arguments`` is on cooldown in ``channel``.""" + """Check whether `call_arguments` is on cooldown in `channel`.""" current_time = time.monotonic() try: return self._cooldowns.get((channel, call_arguments), -math.inf) > current_time @@ -115,9 +115,9 @@ class _CommandCooldownManager: async def _periodical_cleanup(self, initial_delay: float) -> None: """ - Wait for `initial_delay`, after that delete stale items every hour. + Delete stale items every hour after waiting for `initial_delay`. - The `initial_delay` ensures we're not running cleanups for every command at the same time. + The `initial_delay` ensures cleanups are not running for every command at the same time. """ await asyncio.sleep(initial_delay) while True: @@ -151,7 +151,7 @@ def block_duplicate_invocations( Args: cooldown_duration: Length of the cooldown in seconds. - send_notice: If True, the user is notified of the cooldown with a reply. + send_notice: If :obj:`True`, notify the user about the cooldown with a reply. Returns: A decorator that adds a wrapper which applies the cooldowns. diff --git a/botcore/utils/function.py b/botcore/utils/function.py index 1cde5cd9..e8d24e90 100644 --- a/botcore/utils/function.py +++ b/botcore/utils/function.py @@ -28,17 +28,14 @@ def update_wrapper_globals( ignored_conflict_names: Set[str] = frozenset(), ) -> Callable[_P, _R]: r""" - Update globals of the ``wrapper`` function with the globals from the ``wrapped`` function. + Create a copy of ``wrapper``\, the copy's globals are updated with ``wrapped``\'s globals. For forwardrefs in command annotations, discord.py uses the ``__global__`` attribute of the function - to resolve their values, with decorators that replace the function this breaks because they have + to resolve their values. This breaks for decorators that replace the function because they have their own globals. - This function creates a new function functionally identical to ``wrapper``\, which has the globals replaced with - a merge of ``wrapped``\s globals and the ``wrapper``\s globals. - .. warning:: - This function captures the state of ``wrapped``\'s module's globals when it's called, + This function captures the state of ``wrapped``\'s module's globals when it's called; changes won't be reflected in the new function's globals. Args: diff --git a/docs/utils.py b/docs/utils.py index a4662ba4..9d299ebf 100644 --- a/docs/utils.py +++ b/docs/utils.py @@ -107,7 +107,7 @@ def _global_assign_pos(ast_: NodeWithBody, name: str) -> typing.Union[tuple[int, """ Find the first instance where the `name` global is defined in `ast_`. - Top level assignments, and assignments nested in top level ifs are checked. + Check top-level assignments and assignments nested in top-level if blocks. """ for ast_obj in ast_.body: if isinstance(ast_obj, ast.Assign): -- cgit v1.2.3 From 4d92e185f8e862642a70cc15cbca2eb6d442a767 Mon Sep 17 00:00:00 2001 From: Numerlor Date: Tue, 21 Jun 2022 15:16:12 +0200 Subject: Add comment for skipped arguments --- botcore/utils/cooldown.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'botcore/utils/cooldown.py') diff --git a/botcore/utils/cooldown.py b/botcore/utils/cooldown.py index 9e79e48a..b5641063 100644 --- a/botcore/utils/cooldown.py +++ b/botcore/utils/cooldown.py @@ -165,7 +165,7 @@ def block_duplicate_invocations( @command_wraps(func) async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: - arg_tuple = (*args[2:], *kwargs.items()) + arg_tuple = (*args[2:], *kwargs.items()) # skip self and ctx from the command ctx = typing.cast("Context[BotBase]", args[1]) channel = ctx.channel -- cgit v1.2.3 From 5322db03696c5b73be00ca03e8958227b256f29d Mon Sep 17 00:00:00 2001 From: Numerlor Date: Tue, 21 Jun 2022 15:56:51 +0200 Subject: Fix typehint --- botcore/utils/cooldown.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'botcore/utils/cooldown.py') diff --git a/botcore/utils/cooldown.py b/botcore/utils/cooldown.py index b5641063..34e88901 100644 --- a/botcore/utils/cooldown.py +++ b/botcore/utils/cooldown.py @@ -20,7 +20,7 @@ from botcore.utils.function import command_wraps __all__ = ["CommandOnCooldown", "block_duplicate_invocations", "P", "R"] -_ArgsTuple = tuple[object] +_ArgsTuple = tuple[object, ...] if typing.TYPE_CHECKING: from botcore import BotBase @@ -42,8 +42,8 @@ class CommandOnCooldown(CommandError, typing.Generic[P, R]): self, message: str | None, function: Callable[P, Awaitable[R]], - *args: object, - **kwargs: object, + *args: P.args, + **kwargs: P.kwargs, ): super().__init__(message, function, args, kwargs) self._function = function -- cgit v1.2.3 From 977363d6945ceb9b6bd6750f16c23998bbf3edc1 Mon Sep 17 00:00:00 2001 From: Numerlor Date: Tue, 21 Jun 2022 16:05:02 +0200 Subject: generalize handling of fully hashable args, and args with non-hashable parts --- botcore/utils/cooldown.py | 91 +++++++++++++++++++++++++++++------------------ 1 file changed, 57 insertions(+), 34 deletions(-) (limited to 'botcore/utils/cooldown.py') diff --git a/botcore/utils/cooldown.py b/botcore/utils/cooldown.py index 34e88901..59b0722e 100644 --- a/botcore/utils/cooldown.py +++ b/botcore/utils/cooldown.py @@ -3,11 +3,10 @@ from __future__ import annotations import asyncio -import math import random import time import typing -from collections.abc import Awaitable, Hashable +from collections.abc import Awaitable, Hashable, Iterable from contextlib import suppress from dataclasses import dataclass from typing import Callable # sphinx-autodoc-typehints breaks with collections.abc.Callable @@ -20,7 +19,8 @@ from botcore.utils.function import command_wraps __all__ = ["CommandOnCooldown", "block_duplicate_invocations", "P", "R"] -_ArgsTuple = tuple[object, ...] +_ArgsList = list[object] +_HashableArgsTuple = tuple[Hashable, ...] if typing.TYPE_CHECKING: from botcore import BotBase @@ -62,10 +62,34 @@ class CommandOnCooldown(CommandError, typing.Generic[P, R]): @dataclass class _CooldownItem: - call_arguments: _ArgsTuple + arguments: _ArgsList timeout_timestamp: float +@dataclass +class _SeparatedArguments: + """Arguments separated into their hashable and non-hashable parts.""" + + hashable: _HashableArgsTuple + non_hashable: _ArgsList + + @classmethod + def from_full_arguments(cls, call_arguments: Iterable[object]) -> typing_extensions.Self: + """Create a new instance from full call arguments.""" + hashable = list[Hashable]() + non_hashable = list[object]() + + for item in call_arguments: + try: + hash(item) + except TypeError: + non_hashable.append(item) + else: + hashable.append(item) + + return cls(tuple(hashable), non_hashable) + + class _CommandCooldownManager: """ Manage invocation cooldowns for a command through the arguments the command is called with. @@ -76,43 +100,46 @@ class _CommandCooldownManager: """ def __init__(self, *, cooldown_duration: float): - self._cooldowns = dict[tuple[Hashable, _ArgsTuple], float]() - self._cooldowns_non_hashable = dict[Hashable, list[_CooldownItem]]() + self._cooldowns = dict[tuple[Hashable, _HashableArgsTuple], list[_CooldownItem]]() self._cooldown_duration = cooldown_duration self.cleanup_task = scheduling.create_task( self._periodical_cleanup(random.uniform(0, 10)), name="CooldownManager cleanup", ) - def set_cooldown(self, channel: Hashable, call_arguments: _ArgsTuple) -> None: + def set_cooldown(self, channel: Hashable, call_arguments: Iterable[object]) -> None: """Set `call_arguments` arguments on cooldown in `channel`.""" timeout_timestamp = time.monotonic() + self._cooldown_duration + separated_arguments = _SeparatedArguments.from_full_arguments(call_arguments) + cooldowns_list = self._cooldowns.setdefault( + (channel, separated_arguments.hashable), + [] + ) - try: - self._cooldowns[(channel, call_arguments)] = timeout_timestamp - except TypeError: - cooldowns_list = self._cooldowns_non_hashable.setdefault(channel, []) - for item in cooldowns_list: - if item.call_arguments == call_arguments: - item.timeout_timestamp = timeout_timestamp - else: - cooldowns_list.append(_CooldownItem(call_arguments, timeout_timestamp)) + for item in cooldowns_list: + if item.arguments == separated_arguments.non_hashable: + item.timeout_timestamp = timeout_timestamp + return - def is_on_cooldown(self, channel: Hashable, call_arguments: _ArgsTuple) -> bool: + cooldowns_list.append(_CooldownItem(separated_arguments.non_hashable, timeout_timestamp)) + + def is_on_cooldown(self, channel: Hashable, call_arguments: Iterable[object]) -> bool: """Check whether `call_arguments` is on cooldown in `channel`.""" current_time = time.monotonic() - try: - return self._cooldowns.get((channel, call_arguments), -math.inf) > current_time - except TypeError: - cooldowns_list = self._cooldowns_non_hashable.get(channel, None) - if cooldowns_list is None: - return False - - for item in cooldowns_list: - if item.call_arguments == call_arguments: - return item.timeout_timestamp > current_time + separated_arguments = _SeparatedArguments.from_full_arguments(call_arguments) + cooldowns_list = self._cooldowns.get( + (channel, separated_arguments.hashable), + None + ) + + if cooldowns_list is None: return False + for item in cooldowns_list: + if item.arguments == separated_arguments.non_hashable: + return item.timeout_timestamp > current_time + return False + async def _periodical_cleanup(self, initial_delay: float) -> None: """ Delete stale items every hour after waiting for `initial_delay`. @@ -128,19 +155,15 @@ class _CommandCooldownManager: """Remove expired items from internal collections.""" current_time = time.monotonic() - for key, timeout_timestamp in self._cooldowns.copy().items(): - if timeout_timestamp < current_time: - del self._cooldowns[key] - - for key, cooldowns_list in self._cooldowns_non_hashable.copy().items(): + for key, cooldowns_list in self._cooldowns.copy().items(): filtered_cooldowns = [ cooldown_item for cooldown_item in cooldowns_list if cooldown_item.timeout_timestamp < current_time ] if not filtered_cooldowns: - del self._cooldowns_non_hashable[key] + del self._cooldowns[key] else: - self._cooldowns_non_hashable[key] = filtered_cooldowns + self._cooldowns[key] = filtered_cooldowns def block_duplicate_invocations( -- cgit v1.2.3 From 805c60437e50e2153d61e484872b207affd8db1f Mon Sep 17 00:00:00 2001 From: Numerlor Date: Tue, 21 Jun 2022 19:20:03 +0200 Subject: stop cleanup task when manager is destroyed --- botcore/utils/cooldown.py | 8 +++++++- tests/botcore/utils/test_cooldown.py | 5 +++-- 2 files changed, 10 insertions(+), 3 deletions(-) (limited to 'botcore/utils/cooldown.py') diff --git a/botcore/utils/cooldown.py b/botcore/utils/cooldown.py index 59b0722e..099edba0 100644 --- a/botcore/utils/cooldown.py +++ b/botcore/utils/cooldown.py @@ -6,6 +6,7 @@ import asyncio import random import time import typing +import weakref from collections.abc import Awaitable, Hashable, Iterable from contextlib import suppress from dataclasses import dataclass @@ -106,6 +107,7 @@ class _CommandCooldownManager: self._periodical_cleanup(random.uniform(0, 10)), name="CooldownManager cleanup", ) + weakref.finalize(self, self.cleanup_task.cancel) def set_cooldown(self, channel: Hashable, call_arguments: Iterable[object]) -> None: """Set `call_arguments` arguments on cooldown in `channel`.""" @@ -145,11 +147,15 @@ class _CommandCooldownManager: Delete stale items every hour after waiting for `initial_delay`. The `initial_delay` ensures cleanups are not running for every command at the same time. + A strong reference to self is only kept while cleanup is running. """ + weak_self = weakref.ref(self) + del self + await asyncio.sleep(initial_delay) while True: await asyncio.sleep(60 * 60) - self._delete_stale_items() + weak_self()._delete_stale_items() def _delete_stale_items(self) -> None: """Remove expired items from internal collections.""" diff --git a/tests/botcore/utils/test_cooldown.py b/tests/botcore/utils/test_cooldown.py index e7fe0f59..87c433ce 100644 --- a/tests/botcore/utils/test_cooldown.py +++ b/tests/botcore/utils/test_cooldown.py @@ -1,10 +1,11 @@ import unittest +from collections.abc import Iterable from unittest.mock import patch -from botcore.utils.cooldown import _ArgsTuple, _CommandCooldownManager +from botcore.utils.cooldown import _CommandCooldownManager -def create_argument_tuple(*args, **kwargs) -> _ArgsTuple: +def create_argument_tuple(*args, **kwargs) -> Iterable[object]: return (*args, *kwargs.items()) -- cgit v1.2.3 From 67003153c718925844447127f291501adddb49c0 Mon Sep 17 00:00:00 2001 From: Numerlor Date: Tue, 21 Jun 2022 19:48:36 +0200 Subject: ensure tuples from pos arg and kwarg tuples are differentiated --- botcore/utils/cooldown.py | 8 +++++++- tests/botcore/utils/test_cooldown.py | 26 +++++++++++++------------- 2 files changed, 20 insertions(+), 14 deletions(-) (limited to 'botcore/utils/cooldown.py') diff --git a/botcore/utils/cooldown.py b/botcore/utils/cooldown.py index 099edba0..b9149b48 100644 --- a/botcore/utils/cooldown.py +++ b/botcore/utils/cooldown.py @@ -20,6 +20,8 @@ from botcore.utils.function import command_wraps __all__ = ["CommandOnCooldown", "block_duplicate_invocations", "P", "R"] +_KEYWORD_SEP_SENTINEL = object() + _ArgsList = list[object] _HashableArgsTuple = tuple[Hashable, ...] @@ -172,6 +174,10 @@ class _CommandCooldownManager: self._cooldowns[key] = filtered_cooldowns +def _create_argument_tuple(*args: object, **kwargs: object) -> Iterable[object]: + return (*args, _KEYWORD_SEP_SENTINEL, *kwargs.items()) + + def block_duplicate_invocations( *, cooldown_duration: float = 5, send_notice: bool = False ) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]: @@ -194,7 +200,7 @@ def block_duplicate_invocations( @command_wraps(func) async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: - arg_tuple = (*args[2:], *kwargs.items()) # skip self and ctx from the command + arg_tuple = _create_argument_tuple(*args[2:], **kwargs) # skip self and ctx from the command ctx = typing.cast("Context[BotBase]", args[1]) channel = ctx.channel diff --git a/tests/botcore/utils/test_cooldown.py b/tests/botcore/utils/test_cooldown.py index 87c433ce..00e5a052 100644 --- a/tests/botcore/utils/test_cooldown.py +++ b/tests/botcore/utils/test_cooldown.py @@ -1,23 +1,18 @@ import unittest -from collections.abc import Iterable from unittest.mock import patch -from botcore.utils.cooldown import _CommandCooldownManager - - -def create_argument_tuple(*args, **kwargs) -> Iterable[object]: - return (*args, *kwargs.items()) +from botcore.utils.cooldown import _CommandCooldownManager, _create_argument_tuple class CommandCooldownManagerTests(unittest.IsolatedAsyncioTestCase): test_call_args = ( - create_argument_tuple(0), - create_argument_tuple(a=0), - create_argument_tuple([]), - create_argument_tuple(a=[]), - create_argument_tuple(1, 2, 3, a=4, b=5, c=6), - create_argument_tuple([1], [2], [3], a=[4], b=[5], c=[6]), - create_argument_tuple([1], 2, [3], a=4, b=[5], c=6), + _create_argument_tuple(0), + _create_argument_tuple(a=0), + _create_argument_tuple([]), + _create_argument_tuple(a=[]), + _create_argument_tuple(1, 2, 3, a=4, b=5, c=6), + _create_argument_tuple([1], [2], [3], a=[4], b=[5], c=[6]), + _create_argument_tuple([1], 2, [3], a=4, b=[5], c=6), ) async def asyncSetUp(self): @@ -47,3 +42,8 @@ class CommandCooldownManagerTests(unittest.IsolatedAsyncioTestCase): with self.subTest(arguments_tuple=call_args): self.cooldown_manager.set_cooldown(0, call_args) self.assertFalse(self.cooldown_manager.is_on_cooldown(0, call_args)) + + def test_keywords_and_tuples_differentiated(self): + self.cooldown_manager.set_cooldown(0, _create_argument_tuple(("a", 0))) + self.assertFalse(self.cooldown_manager.is_on_cooldown(0, _create_argument_tuple(a=0))) + self.assertTrue(self.cooldown_manager.is_on_cooldown(0, _create_argument_tuple(("a", 0)))) -- cgit v1.2.3 From ed9890abd8c07d6f414e273139e8715f3917b7fc Mon Sep 17 00:00:00 2001 From: Numerlor Date: Sun, 18 Sep 2022 20:41:47 +0200 Subject: use paramspec from typing the package now requires python 3.10 --- botcore/utils/cooldown.py | 9 +++------ botcore/utils/function.py | 3 +-- 2 files changed, 4 insertions(+), 8 deletions(-) (limited to 'botcore/utils/cooldown.py') diff --git a/botcore/utils/cooldown.py b/botcore/utils/cooldown.py index ee65033d..5fb974e2 100644 --- a/botcore/utils/cooldown.py +++ b/botcore/utils/cooldown.py @@ -25,14 +25,11 @@ _ArgsList = list[object] _HashableArgsTuple = tuple[Hashable, ...] if typing.TYPE_CHECKING: - from botcore import BotBase import typing_extensions - P = typing_extensions.ParamSpec("P") - P.__constraints__ = () -else: - P = typing.TypeVar("P") - """The command's signature.""" + from botcore import BotBase +P = typing.ParamSpec("P") +"""The command's signature.""" R = typing.TypeVar("R") """The command's return value.""" diff --git a/botcore/utils/function.py b/botcore/utils/function.py index 0e90d4c5..d89163ec 100644 --- a/botcore/utils/function.py +++ b/botcore/utils/function.py @@ -11,8 +11,7 @@ __all__ = ["command_wraps", "GlobalNameConflictError", "update_wrapper_globals"] if typing.TYPE_CHECKING: - import typing_extensions - _P = typing_extensions.ParamSpec("_P") + _P = typing.ParamSpec("_P") _R = typing.TypeVar("_R") -- cgit v1.2.3 From 824d1ebfd7b1b26df10d5712ad7b7b033007e43e Mon Sep 17 00:00:00 2001 From: Numerlor Date: Mon, 19 Sep 2022 00:37:45 +0200 Subject: use a clearer name --- botcore/utils/cooldown.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'botcore/utils/cooldown.py') diff --git a/botcore/utils/cooldown.py b/botcore/utils/cooldown.py index 5fb974e2..817f6947 100644 --- a/botcore/utils/cooldown.py +++ b/botcore/utils/cooldown.py @@ -61,7 +61,7 @@ class CommandOnCooldown(CommandError, typing.Generic[P, R]): @dataclass class _CooldownItem: - arguments: _ArgsList + non_hashable_arguments: _ArgsList timeout_timestamp: float @@ -117,7 +117,7 @@ class _CommandCooldownManager: ) for item in cooldowns_list: - if item.arguments == separated_arguments.non_hashable: + if item.non_hashable_arguments == separated_arguments.non_hashable: item.timeout_timestamp = timeout_timestamp return @@ -136,7 +136,7 @@ class _CommandCooldownManager: return False for item in cooldowns_list: - if item.arguments == separated_arguments.non_hashable: + if item.non_hashable_arguments == separated_arguments.non_hashable: return item.timeout_timestamp > current_time return False -- cgit v1.2.3 From 27a9e8051b074558a302ac297ec7758044dc65fd Mon Sep 17 00:00:00 2001 From: Numerlor Date: Mon, 19 Sep 2022 02:15:32 +0200 Subject: get rid of unnecessary check --- botcore/utils/cooldown.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) (limited to 'botcore/utils/cooldown.py') diff --git a/botcore/utils/cooldown.py b/botcore/utils/cooldown.py index 817f6947..28ad8795 100644 --- a/botcore/utils/cooldown.py +++ b/botcore/utils/cooldown.py @@ -129,12 +129,9 @@ class _CommandCooldownManager: separated_arguments = _SeparatedArguments.from_full_arguments(call_arguments) cooldowns_list = self._cooldowns.get( (channel, separated_arguments.hashable), - None + [], ) - if cooldowns_list is None: - return False - for item in cooldowns_list: if item.non_hashable_arguments == separated_arguments.non_hashable: return item.timeout_timestamp > current_time -- cgit v1.2.3 From e94b47fc84704d5b4c945e310caee9a2bbae938a Mon Sep 17 00:00:00 2001 From: Numerlor Date: Mon, 19 Sep 2022 12:46:56 +0200 Subject: use ctx.channel directly --- botcore/utils/cooldown.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'botcore/utils/cooldown.py') diff --git a/botcore/utils/cooldown.py b/botcore/utils/cooldown.py index 28ad8795..879a2894 100644 --- a/botcore/utils/cooldown.py +++ b/botcore/utils/cooldown.py @@ -195,9 +195,8 @@ def block_duplicate_invocations( async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: arg_tuple = _create_argument_tuple(*args[2:], **kwargs) # skip self and ctx from the command ctx = typing.cast("Context[BotBase]", args[1]) - channel = ctx.channel - if not isinstance(channel, discord.DMChannel): + if not isinstance(ctx.channel, discord.DMChannel): if mgr.is_on_cooldown(ctx.channel, arg_tuple): if send_notice: with suppress(discord.NotFound): -- cgit v1.2.3 From 825b14565eec85a5ae1bd52ecc99162705178e42 Mon Sep 17 00:00:00 2001 From: Numerlor Date: Mon, 19 Sep 2022 12:48:12 +0200 Subject: type return with tuple while only iterable is strictly needed, any reader would expect the function to return a tuple --- botcore/utils/cooldown.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'botcore/utils/cooldown.py') diff --git a/botcore/utils/cooldown.py b/botcore/utils/cooldown.py index 879a2894..8de7c243 100644 --- a/botcore/utils/cooldown.py +++ b/botcore/utils/cooldown.py @@ -167,7 +167,7 @@ class _CommandCooldownManager: self._cooldowns[key] = filtered_cooldowns -def _create_argument_tuple(*args: object, **kwargs: object) -> Iterable[object]: +def _create_argument_tuple(*args: object, **kwargs: object) -> tuple[object, ...]: return (*args, _KEYWORD_SEP_SENTINEL, *kwargs.items()) -- cgit v1.2.3 From 2c903cfc72e80aaa1873cdc50f14353c52201976 Mon Sep 17 00:00:00 2001 From: Numerlor Date: Mon, 19 Sep 2022 12:49:48 +0200 Subject: use more consistent styling --- botcore/utils/cooldown.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'botcore/utils/cooldown.py') diff --git a/botcore/utils/cooldown.py b/botcore/utils/cooldown.py index 8de7c243..2e722d20 100644 --- a/botcore/utils/cooldown.py +++ b/botcore/utils/cooldown.py @@ -113,7 +113,7 @@ class _CommandCooldownManager: separated_arguments = _SeparatedArguments.from_full_arguments(call_arguments) cooldowns_list = self._cooldowns.setdefault( (channel, separated_arguments.hashable), - [] + [], ) for item in cooldowns_list: @@ -172,7 +172,9 @@ def _create_argument_tuple(*args: object, **kwargs: object) -> tuple[object, ... def block_duplicate_invocations( - *, cooldown_duration: float = 5, send_notice: bool = False + *, + cooldown_duration: float = 5, + send_notice: bool = False, ) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]: """ Prevent duplicate invocations of a command with the same arguments in a channel for ``cooldown_duration`` seconds. -- cgit v1.2.3 From 49d60ac37f368cba72c4863b96f191ba17e0ab22 Mon Sep 17 00:00:00 2001 From: Numerlor Date: Mon, 19 Sep 2022 13:04:01 +0200 Subject: allow user specified function to manipulate cooldown args --- botcore/utils/cooldown.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) (limited to 'botcore/utils/cooldown.py') diff --git a/botcore/utils/cooldown.py b/botcore/utils/cooldown.py index 2e722d20..31505ace 100644 --- a/botcore/utils/cooldown.py +++ b/botcore/utils/cooldown.py @@ -175,6 +175,7 @@ def block_duplicate_invocations( *, cooldown_duration: float = 5, send_notice: bool = False, + args_preprocessor: Callable[P, Iterable[object]] | None = None, ) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]: """ Prevent duplicate invocations of a command with the same arguments in a channel for ``cooldown_duration`` seconds. @@ -182,6 +183,8 @@ def block_duplicate_invocations( Args: cooldown_duration: Length of the cooldown in seconds. send_notice: If :obj:`True`, notify the user about the cooldown with a reply. + args_preprocessor: If specified, this function is called with the args and kwargs the function is called with, + its return value is then used to check for the cooldown instead of the raw arguments. Returns: A decorator that adds a wrapper which applies the cooldowns. @@ -195,16 +198,19 @@ def block_duplicate_invocations( @command_wraps(func) async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: - arg_tuple = _create_argument_tuple(*args[2:], **kwargs) # skip self and ctx from the command + if args_preprocessor is not None: + all_args = args_preprocessor(*args, **kwargs) + else: + all_args = _create_argument_tuple(*args[2:], **kwargs) # skip self and ctx from the command ctx = typing.cast("Context[BotBase]", args[1]) if not isinstance(ctx.channel, discord.DMChannel): - if mgr.is_on_cooldown(ctx.channel, arg_tuple): + if mgr.is_on_cooldown(ctx.channel, all_args): if send_notice: with suppress(discord.NotFound): await ctx.reply("The command is on cooldown with the given arguments.") raise CommandOnCooldown(ctx.message.content, func, *args, **kwargs) - mgr.set_cooldown(ctx.channel, arg_tuple) + mgr.set_cooldown(ctx.channel, all_args) return await func(*args, **kwargs) -- cgit v1.2.3 From 123f054e5490c0b789f70a7a9efcc6b55a8c61fa Mon Sep 17 00:00:00 2001 From: Numerlor Date: Tue, 27 Sep 2022 23:30:53 +0200 Subject: make function and message positional only to free up names for kwargs --- botcore/utils/cooldown.py | 1 + 1 file changed, 1 insertion(+) (limited to 'botcore/utils/cooldown.py') diff --git a/botcore/utils/cooldown.py b/botcore/utils/cooldown.py index 31505ace..015734d2 100644 --- a/botcore/utils/cooldown.py +++ b/botcore/utils/cooldown.py @@ -41,6 +41,7 @@ class CommandOnCooldown(CommandError, typing.Generic[P, R]): self, message: str | None, function: Callable[P, Awaitable[R]], + /, *args: P.args, **kwargs: P.kwargs, ): -- cgit v1.2.3