aboutsummaryrefslogtreecommitdiffstats
path: root/bot/utils/scheduling.py
diff options
context:
space:
mode:
authorGravatar Kingsley McDonald <[email protected]>2018-10-07 13:33:42 +0100
committerGravatar Kingsley McDonald <[email protected]>2018-10-07 13:33:42 +0100
commitd079b3d34ba1fb045e63d332b70bc91940246492 (patch)
tree8e91735ad9b4de23bda71fa7cb10771cc619282b /bot/utils/scheduling.py
parentMerge branch 'remind-command' into 'master' (diff)
common scheduling methods have been moved to a separate abstract class.
Diffstat (limited to '')
-rw-r--r--bot/utils/scheduling.py55
1 files changed, 55 insertions, 0 deletions
diff --git a/bot/utils/scheduling.py b/bot/utils/scheduling.py
index f9b844046..ded6401b0 100644
--- a/bot/utils/scheduling.py
+++ b/bot/utils/scheduling.py
@@ -1,5 +1,60 @@
import asyncio
import contextlib
+import logging
+from abc import ABC, abstractmethod
+from typing import Dict
+
+log = logging.getLogger(__name__)
+
+
+class Scheduler(ABC):
+
+ def __init__(self):
+
+ self.cog_name = self.__class__.__name__ # keep track of the child cog's name so the logs are clear.
+ self.scheduled_tasks: Dict[str, asyncio.Task] = {}
+
+ @abstractmethod
+ async def _scheduled_task(self, task_object: dict):
+ """
+ A coroutine which handles the scheduling. This is added to the scheduled tasks,
+ and should wait the task duration, execute the desired code, and clean up the task.
+ For example, in Reminders this will wait for the reminder duration, send the reminder,
+ then make a site API request to delete the reminder from the database.
+
+ :param task_object:
+ """
+
+ def schedule_task(self, loop: asyncio.AbstractEventLoop, task_id: str, task_data: dict):
+ """
+ Schedules a task.
+ :param loop: the asyncio event loop
+ :param task_id: the ID of the task.
+ :param task_data: the data of the task, passed to `Scheduler._scheduled_expiration`.
+ """
+
+ if task_id in self.scheduled_tasks:
+ return
+
+ task: asyncio.Task = create_task(loop, self._scheduled_task(task_data))
+
+ self.scheduled_tasks[task_id] = task
+
+ def cancel_task(self, task_id: str):
+ """
+ Un-schedules a task.
+ :param task_id: the ID of the infraction in question
+ """
+
+ task = self.scheduled_tasks.get(task_id)
+
+ if task is None:
+ log.warning(f"{self.cog_name}: Failed to unschedule {task_id} (no task found).")
+ return
+
+ task.cancel()
+ log.debug(f"{self.cog_name}: Unscheduled {task_id}.")
+ del self.scheduled_tasks[task_id]
def create_task(loop: asyncio.AbstractEventLoop, coro_or_future):