aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar MarkKoz <[email protected]>2020-02-28 09:59:57 -0800
committerGravatar MarkKoz <[email protected]>2020-02-28 09:59:57 -0800
commit5daf1db8ea9e86568da4907d42507aa3286eb3c1 (patch)
tree661ac6e9b2e9006999200207d1c5129e49feb6f4
parentScheduler: log the exception instead of raising (diff)
Scheduler: correct type annotations
-rw-r--r--bot/utils/scheduling.py12
1 files changed, 6 insertions, 6 deletions
diff --git a/bot/utils/scheduling.py b/bot/utils/scheduling.py
index 1eae817c1..5760ec2d4 100644
--- a/bot/utils/scheduling.py
+++ b/bot/utils/scheduling.py
@@ -1,9 +1,9 @@
import asyncio
import contextlib
import logging
+import typing as t
from abc import abstractmethod
from functools import partial
-from typing import Dict
from bot.utils import CogABCMeta
@@ -17,10 +17,10 @@ class Scheduler(metaclass=CogABCMeta):
# Keep track of the child cog's name so the logs are clear.
self.cog_name = self.__class__.__name__
- self._scheduled_tasks: Dict[str, asyncio.Task] = {}
+ self._scheduled_tasks: t.Dict[t.Hashable, asyncio.Task] = {}
@abstractmethod
- async def _scheduled_task(self, task_object: dict) -> None:
+ async def _scheduled_task(self, task_object: t.Any) -> None:
"""
A coroutine which handles the scheduling.
@@ -31,7 +31,7 @@ class Scheduler(metaclass=CogABCMeta):
then make a site API request to delete the reminder from the database.
"""
- def schedule_task(self, task_id: str, task_data: dict) -> None:
+ def schedule_task(self, task_id: t.Hashable, task_data: t.Any) -> None:
"""
Schedules a task.
@@ -51,7 +51,7 @@ class Scheduler(metaclass=CogABCMeta):
self._scheduled_tasks[task_id] = task
log.debug(f"{self.cog_name}: scheduled task #{task_id} {id(task)}.")
- def cancel_task(self, task_id: str) -> None:
+ def cancel_task(self, task_id: t.Hashable) -> None:
"""Unschedule the task identified by `task_id`."""
log.trace(f"{self.cog_name}: cancelling task #{task_id}...")
task = self._scheduled_tasks.get(task_id)
@@ -65,7 +65,7 @@ class Scheduler(metaclass=CogABCMeta):
log.debug(f"{self.cog_name}: unscheduled task #{task_id} {id(task)}.")
- def _task_done_callback(self, task_id: str, done_task: asyncio.Task) -> None:
+ def _task_done_callback(self, task_id: t.Hashable, done_task: asyncio.Task) -> None:
"""
Delete the task and raise its exception if one exists.