aboutsummaryrefslogtreecommitdiffstats
path: root/bot/utils/scheduling.py
diff options
context:
space:
mode:
authorGravatar Sebastiaan Zeeff <[email protected]>2020-02-29 13:29:57 +0100
committerGravatar GitHub <[email protected]>2020-02-29 13:29:57 +0100
commitd2941da52face1d0b235f6748342ae67cc38fc9f (patch)
treeb4054c0e052ec4839f3a1cc06b42f8ecd01c646a /bot/utils/scheduling.py
parentMerge pull request #797 from Numerlor/fuzzy_zero_div (diff)
parentMerge branch 'master' into bug/backend/b754/scheduler-suppresses-errors (diff)
Merge pull request #755 from python-discord/bug/backend/b754/scheduler-suppresses-errors
Don't suppress all errors in scheduler
Diffstat (limited to '')
-rw-r--r--bot/utils/scheduling.py83
1 files changed, 56 insertions, 27 deletions
diff --git a/bot/utils/scheduling.py b/bot/utils/scheduling.py
index ee6c0a8e6..5760ec2d4 100644
--- a/bot/utils/scheduling.py
+++ b/bot/utils/scheduling.py
@@ -1,8 +1,9 @@
import asyncio
import contextlib
import logging
+import typing as t
from abc import abstractmethod
-from typing import Coroutine, Dict, Union
+from functools import partial
from bot.utils import CogABCMeta
@@ -13,12 +14,13 @@ class Scheduler(metaclass=CogABCMeta):
"""Task scheduler."""
def __init__(self):
+ # Keep track of the child cog's name so the logs are clear.
+ self.cog_name = self.__class__.__name__
- self.cog_name = self.__class__.__name__ # keep track of the child cog's name so the logs are clear.
- self.scheduled_tasks: Dict[str, asyncio.Task] = {}
+ self._scheduled_tasks: t.Dict[t.Hashable, asyncio.Task] = {}
@abstractmethod
- async def _scheduled_task(self, task_object: dict) -> None:
+ async def _scheduled_task(self, task_object: t.Any) -> None:
"""
A coroutine which handles the scheduling.
@@ -29,46 +31,73 @@ class Scheduler(metaclass=CogABCMeta):
then make a site API request to delete the reminder from the database.
"""
- def schedule_task(self, loop: asyncio.AbstractEventLoop, task_id: str, task_data: dict) -> None:
+ def schedule_task(self, task_id: t.Hashable, task_data: t.Any) -> None:
"""
Schedules a task.
- `task_data` is passed to `Scheduler._scheduled_expiration`
+ `task_data` is passed to the `Scheduler._scheduled_task()` coroutine.
"""
- if task_id in self.scheduled_tasks:
+ log.trace(f"{self.cog_name}: scheduling task #{task_id}...")
+
+ if task_id in self._scheduled_tasks:
log.debug(
f"{self.cog_name}: did not schedule task #{task_id}; task was already scheduled."
)
return
- task: asyncio.Task = create_task(loop, self._scheduled_task(task_data))
+ task = asyncio.create_task(self._scheduled_task(task_data))
+ task.add_done_callback(partial(self._task_done_callback, task_id))
- self.scheduled_tasks[task_id] = task
- log.debug(f"{self.cog_name}: scheduled task #{task_id}.")
+ self._scheduled_tasks[task_id] = task
+ log.debug(f"{self.cog_name}: scheduled task #{task_id} {id(task)}.")
- def cancel_task(self, task_id: str) -> None:
- """Un-schedules a task."""
- task = self.scheduled_tasks.get(task_id)
+ def cancel_task(self, task_id: t.Hashable) -> None:
+ """Unschedule the task identified by `task_id`."""
+ log.trace(f"{self.cog_name}: cancelling task #{task_id}...")
+ task = self._scheduled_tasks.get(task_id)
- if task is None:
- log.warning(f"{self.cog_name}: Failed to unschedule {task_id} (no task found).")
+ if not task:
+ log.warning(f"{self.cog_name}: failed to unschedule {task_id} (no task found).")
return
task.cancel()
- log.debug(f"{self.cog_name}: unscheduled task #{task_id}.")
- del self.scheduled_tasks[task_id]
+ del self._scheduled_tasks[task_id]
+
+ log.debug(f"{self.cog_name}: unscheduled task #{task_id} {id(task)}.")
+ def _task_done_callback(self, task_id: t.Hashable, done_task: asyncio.Task) -> None:
+ """
+ Delete the task and raise its exception if one exists.
-def create_task(loop: asyncio.AbstractEventLoop, coro_or_future: Union[Coroutine, asyncio.Future]) -> asyncio.Task:
- """Creates an asyncio.Task object from a coroutine or future object."""
- task: asyncio.Task = asyncio.ensure_future(coro_or_future, loop=loop)
+ If `done_task` and the task associated with `task_id` are different, then the latter
+ will not be deleted. In this case, a new task was likely rescheduled with the same ID.
+ """
+ log.trace(f"{self.cog_name}: performing done callback for task #{task_id} {id(done_task)}.")
- # Silently ignore exceptions in a callback (handles the CancelledError nonsense)
- task.add_done_callback(_silent_exception)
- return task
+ scheduled_task = self._scheduled_tasks.get(task_id)
+ if scheduled_task and done_task is scheduled_task:
+ # A task for the ID exists and its the same as the done task.
+ # Since this is the done callback, the task is already done so no need to cancel it.
+ log.trace(f"{self.cog_name}: deleting task #{task_id} {id(done_task)}.")
+ del self._scheduled_tasks[task_id]
+ elif scheduled_task:
+ # A new task was likely rescheduled with the same ID.
+ log.debug(
+ f"{self.cog_name}: the scheduled task #{task_id} {id(scheduled_task)} "
+ f"and the done task {id(done_task)} differ."
+ )
+ elif not done_task.cancelled():
+ log.warning(
+ f"{self.cog_name}: task #{task_id} not found while handling task {id(done_task)}! "
+ f"A task somehow got unscheduled improperly (i.e. deleted but not cancelled)."
+ )
-def _silent_exception(future: asyncio.Future) -> None:
- """Suppress future's exception."""
- with contextlib.suppress(Exception):
- future.exception()
+ with contextlib.suppress(asyncio.CancelledError):
+ exception = done_task.exception()
+ # Log the exception if one exists.
+ if exception:
+ log.error(
+ f"{self.cog_name}: error in task #{task_id} {id(scheduled_task)}!",
+ exc_info=exception
+ )