diff options
Diffstat (limited to 'botcore/utils/scheduling.py')
-rw-r--r-- | botcore/utils/scheduling.py | 32 |
1 files changed, 18 insertions, 14 deletions
diff --git a/botcore/utils/scheduling.py b/botcore/utils/scheduling.py index 164f6b10..ebc42665 100644 --- a/botcore/utils/scheduling.py +++ b/botcore/utils/scheduling.py @@ -4,6 +4,7 @@ import asyncio import contextlib import inspect import typing +from collections import abc from datetime import datetime from functools import partial @@ -38,9 +39,9 @@ class Scheduler: self.name = name self._log = logging.get_logger(f"{__name__}.{name}") - self._scheduled_tasks: typing.Dict[typing.Hashable, asyncio.Task] = {} + self._scheduled_tasks: dict[abc.Hashable, asyncio.Task] = {} - def __contains__(self, task_id: typing.Hashable) -> bool: + def __contains__(self, task_id: abc.Hashable) -> bool: """ Return :obj:`True` if a task with the given ``task_id`` is currently scheduled. @@ -52,7 +53,7 @@ class Scheduler: """ return task_id in self._scheduled_tasks - def schedule(self, task_id: typing.Hashable, coroutine: typing.Coroutine) -> None: + def schedule(self, task_id: abc.Hashable, coroutine: abc.Coroutine) -> None: """ Schedule the execution of a ``coroutine``. @@ -79,7 +80,7 @@ class Scheduler: self._scheduled_tasks[task_id] = task self._log.debug(f"Scheduled task #{task_id} {id(task)}.") - def schedule_at(self, time: datetime, task_id: typing.Hashable, coroutine: typing.Coroutine) -> None: + def schedule_at(self, time: datetime, task_id: abc.Hashable, coroutine: abc.Coroutine) -> None: """ Schedule ``coroutine`` to be executed at the given ``time``. @@ -106,8 +107,8 @@ class Scheduler: def schedule_later( self, delay: typing.Union[int, float], - task_id: typing.Hashable, - coroutine: typing.Coroutine + task_id: abc.Hashable, + coroutine: abc.Coroutine ) -> None: """ Schedule ``coroutine`` to be executed after ``delay`` seconds. @@ -122,7 +123,7 @@ class Scheduler: """ self.schedule(task_id, self._await_later(delay, task_id, coroutine)) - def cancel(self, task_id: typing.Hashable) -> None: + def cancel(self, task_id: abc.Hashable) -> None: """ Unschedule the task identified by ``task_id``. Log a warning if the task doesn't exist. @@ -150,8 +151,8 @@ class Scheduler: async def _await_later( self, delay: typing.Union[int, float], - task_id: typing.Hashable, - coroutine: typing.Coroutine + task_id: abc.Hashable, + coroutine: abc.Coroutine ) -> None: """Await ``coroutine`` after ``delay`` seconds.""" try: @@ -173,7 +174,7 @@ class Scheduler: else: self._log.debug(f"Finally block reached for #{task_id}; {state=}") - def _task_done_callback(self, task_id: typing.Hashable, done_task: asyncio.Task) -> None: + def _task_done_callback(self, task_id: abc.Hashable, done_task: asyncio.Task) -> None: """ Delete the task and raise its exception if one exists. @@ -208,13 +209,16 @@ class Scheduler: self._log.error(f"Error in task #{task_id} {id(done_task)}!", exc_info=exception) +TASK_RETURN = typing.TypeVar("TASK_RETURN") + + def create_task( - coro: typing.Awaitable, + coro: abc.Coroutine[typing.Any, typing.Any, TASK_RETURN], *, - suppressed_exceptions: tuple[typing.Type[Exception]] = (), + suppressed_exceptions: tuple[type[Exception]] = (), event_loop: typing.Optional[asyncio.AbstractEventLoop] = None, **kwargs, -) -> asyncio.Task: +) -> asyncio.Task[TASK_RETURN]: """ Wrapper for creating an :obj:`asyncio.Task` which logs exceptions raised in the task. @@ -238,7 +242,7 @@ def create_task( return task -def _log_task_exception(task: asyncio.Task, *, suppressed_exceptions: typing.Tuple[typing.Type[Exception]]) -> None: +def _log_task_exception(task: asyncio.Task, *, suppressed_exceptions: tuple[type[Exception]]) -> None: """Retrieve and log the exception raised in ``task`` if one exists.""" with contextlib.suppress(asyncio.CancelledError): exception = task.exception() |