aboutsummaryrefslogtreecommitdiffstats
path: root/botcore/utils/scheduling.py
diff options
context:
space:
mode:
Diffstat (limited to 'botcore/utils/scheduling.py')
-rw-r--r--botcore/utils/scheduling.py32
1 files changed, 18 insertions, 14 deletions
diff --git a/botcore/utils/scheduling.py b/botcore/utils/scheduling.py
index 164f6b10..ebc42665 100644
--- a/botcore/utils/scheduling.py
+++ b/botcore/utils/scheduling.py
@@ -4,6 +4,7 @@ import asyncio
import contextlib
import inspect
import typing
+from collections import abc
from datetime import datetime
from functools import partial
@@ -38,9 +39,9 @@ class Scheduler:
self.name = name
self._log = logging.get_logger(f"{__name__}.{name}")
- self._scheduled_tasks: typing.Dict[typing.Hashable, asyncio.Task] = {}
+ self._scheduled_tasks: dict[abc.Hashable, asyncio.Task] = {}
- def __contains__(self, task_id: typing.Hashable) -> bool:
+ def __contains__(self, task_id: abc.Hashable) -> bool:
"""
Return :obj:`True` if a task with the given ``task_id`` is currently scheduled.
@@ -52,7 +53,7 @@ class Scheduler:
"""
return task_id in self._scheduled_tasks
- def schedule(self, task_id: typing.Hashable, coroutine: typing.Coroutine) -> None:
+ def schedule(self, task_id: abc.Hashable, coroutine: abc.Coroutine) -> None:
"""
Schedule the execution of a ``coroutine``.
@@ -79,7 +80,7 @@ class Scheduler:
self._scheduled_tasks[task_id] = task
self._log.debug(f"Scheduled task #{task_id} {id(task)}.")
- def schedule_at(self, time: datetime, task_id: typing.Hashable, coroutine: typing.Coroutine) -> None:
+ def schedule_at(self, time: datetime, task_id: abc.Hashable, coroutine: abc.Coroutine) -> None:
"""
Schedule ``coroutine`` to be executed at the given ``time``.
@@ -106,8 +107,8 @@ class Scheduler:
def schedule_later(
self,
delay: typing.Union[int, float],
- task_id: typing.Hashable,
- coroutine: typing.Coroutine
+ task_id: abc.Hashable,
+ coroutine: abc.Coroutine
) -> None:
"""
Schedule ``coroutine`` to be executed after ``delay`` seconds.
@@ -122,7 +123,7 @@ class Scheduler:
"""
self.schedule(task_id, self._await_later(delay, task_id, coroutine))
- def cancel(self, task_id: typing.Hashable) -> None:
+ def cancel(self, task_id: abc.Hashable) -> None:
"""
Unschedule the task identified by ``task_id``. Log a warning if the task doesn't exist.
@@ -150,8 +151,8 @@ class Scheduler:
async def _await_later(
self,
delay: typing.Union[int, float],
- task_id: typing.Hashable,
- coroutine: typing.Coroutine
+ task_id: abc.Hashable,
+ coroutine: abc.Coroutine
) -> None:
"""Await ``coroutine`` after ``delay`` seconds."""
try:
@@ -173,7 +174,7 @@ class Scheduler:
else:
self._log.debug(f"Finally block reached for #{task_id}; {state=}")
- def _task_done_callback(self, task_id: typing.Hashable, done_task: asyncio.Task) -> None:
+ def _task_done_callback(self, task_id: abc.Hashable, done_task: asyncio.Task) -> None:
"""
Delete the task and raise its exception if one exists.
@@ -208,13 +209,16 @@ class Scheduler:
self._log.error(f"Error in task #{task_id} {id(done_task)}!", exc_info=exception)
+TASK_RETURN = typing.TypeVar("TASK_RETURN")
+
+
def create_task(
- coro: typing.Awaitable,
+ coro: abc.Coroutine[typing.Any, typing.Any, TASK_RETURN],
*,
- suppressed_exceptions: tuple[typing.Type[Exception]] = (),
+ suppressed_exceptions: tuple[type[Exception]] = (),
event_loop: typing.Optional[asyncio.AbstractEventLoop] = None,
**kwargs,
-) -> asyncio.Task:
+) -> asyncio.Task[TASK_RETURN]:
"""
Wrapper for creating an :obj:`asyncio.Task` which logs exceptions raised in the task.
@@ -238,7 +242,7 @@ def create_task(
return task
-def _log_task_exception(task: asyncio.Task, *, suppressed_exceptions: typing.Tuple[typing.Type[Exception]]) -> None:
+def _log_task_exception(task: asyncio.Task, *, suppressed_exceptions: tuple[type[Exception]]) -> None:
"""Retrieve and log the exception raised in ``task`` if one exists."""
with contextlib.suppress(asyncio.CancelledError):
exception = task.exception()