aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar Boris Muratov <[email protected]>2025-08-14 20:38:59 +0300
committerGravatar Boris Muratov <[email protected]>2025-08-14 20:38:59 +0300
commit29238c56f326f417e25a423768db1c8e7052c034 (patch)
tree7a691ea1eb05e96c392226d3b294234bc2a3070e
parentMake the cog check more specific (diff)
-rw-r--r--bot/exts/moderation/modpings.py73
1 files changed, 58 insertions, 15 deletions
diff --git a/bot/exts/moderation/modpings.py b/bot/exts/moderation/modpings.py
index fdb2ada8a..0be1afdd0 100644
--- a/bot/exts/moderation/modpings.py
+++ b/bot/exts/moderation/modpings.py
@@ -1,4 +1,4 @@
-from datetime import UTC, timedelta
+from datetime import UTC, datetime, timedelta
import arrow
import dateutil
@@ -89,6 +89,24 @@ class ModPings(Cog):
else:
await self.handle_moderator_state(mod)
+ @staticmethod
+ async def _parse_schedule(schedule: str) -> tuple[datetime, datetime]:
+ """Parse the schedule string stored in the schedules cache into the closest start and end times."""
+ start_time, shift_duration = schedule.split("|")
+ start = dateutil_parse(start_time).replace(tzinfo=UTC)
+ end = start + timedelta(seconds=int(shift_duration))
+ now = arrow.utcnow()
+
+ # Move the shift's day such that the end time is in the future and is closest.
+ if start - timedelta(days=1) < now < end - timedelta(days=1): # The shift started yesterday and is ongoing.
+ start -= timedelta(days=1)
+ end -= timedelta(days=1)
+ elif now > end: # Today's shift already ended, next one is tomorrow.
+ start += timedelta(days=1)
+ end += timedelta(days=1)
+
+ return start, end
+
async def handle_moderator_state(self, mod: Member) -> None:
"""Add/remove and/or schedule add/remove of the moderators role according to the mod's state in the caches."""
expiry_iso = await self.pings_off_mods.get(mod.id, None)
@@ -103,23 +121,12 @@ class ModPings(Cog):
await mod.add_roles(self.moderators_role, reason="Pings off period expired.")
return
- start_time, shift_duration = schedule_str.split("|")
- start = dateutil_parse(start_time).replace(tzinfo=UTC)
- end = start + timedelta(seconds=int(shift_duration))
- now = arrow.utcnow()
-
- # Move the shift's day such that the end time is in the future and is closest.
- if start - timedelta(days=1) < now < end - timedelta(days=1): # The shift started yesterday and is ongoing.
- start -= timedelta(days=1)
- end -= timedelta(days=1)
- elif now > end: # Today's shift already ended, next one is tomorrow.
- start += timedelta(days=1)
- end += timedelta(days=1)
+ start, end = await self._parse_schedule(schedule_str)
# The calls to `handle_moderator_state` here aren't recursive as the scheduler creates separate tasks.
# Start/end have to be differentiated in scheduler task ID. The task is removed from the scheduler only after
# completion. That means that task with ID X can't schedule a task with the same ID X.
- if start < now < end:
+ if start < arrow.utcnow() < end:
if mod.get_role(self.moderators_role.id) is None:
await mod.add_roles(self.moderators_role, reason="Mod active hours started.")
if f"{mod.id}_end" not in self._shift_scheduler:
@@ -136,6 +143,33 @@ class ModPings(Cog):
await self.pings_off_mods.delete(mod.id)
await self.handle_moderator_state(mod)
+ async def _get_current_status(self, mod_id: int) -> str:
+ """Build a string summarizing the moderator's current state and schedule (if one exists)."""
+ state = "on"
+ expiry_iso = await self.pings_off_mods.get(mod_id)
+ if expiry_iso is not None:
+ state = f"off until {discord_timestamp(isoparse(expiry_iso), format=TimestampFormats.DAY_TIME)}"
+
+ schedule = ""
+ schedule_str = await self.modpings_schedules.get(mod_id, None)
+ if schedule_str is not None:
+ start, end = await self._parse_schedule(schedule_str)
+ if state == "on":
+ if start < arrow.utcnow() < end:
+ state = "on according to schedule"
+ else:
+ state = "off according to schedule"
+ if state.startswith("off until"):
+ schedule = " Otherwise, pings are on every day between "
+ else:
+ schedule = " Pings are on every day between "
+ schedule += (
+ f"{discord_timestamp(start, TimestampFormats.TIME)} and "
+ f"{discord_timestamp(end, TimestampFormats.TIME)}."
+ )
+
+ return f"Pings are {state}.{schedule}"
+
@group(name="modpings", aliases=("modping",), invoke_without_command=True)
async def modpings_group(self, ctx: Context) -> None:
"""Allow the removal and re-addition of the pingable moderators role."""
@@ -201,7 +235,8 @@ class ModPings(Cog):
await self.handle_moderator_state(mod)
- await ctx.send(f"{Emojis.check_mark} Moderators role has been re-applied.") # TODO make message more accurate.
+ status = await self._get_current_status(mod.id)
+ await ctx.send(f"{Emojis.check_mark} {status}")
@modpings_group.group(name="schedule", aliases=("s",), invoke_without_command=True)
async def schedule_modpings(self, ctx: Context, start_time: str, end_time: str, tz: float | None) -> None:
@@ -259,11 +294,19 @@ class ModPings(Cog):
await self.handle_moderator_state(ctx.author)
await ctx.reply(f"{Emojis.ok_hand} Deleted your modpings schedule.")
+ @modpings_group.command(name="status", aliases=("state",))
+ async def status_command(self, ctx: Context) -> None:
+ """Show your current state and schedule (if one exists)."""
+ status = await self._get_current_status(ctx.author.id)
+ await ctx.reply(f":information: {status}")
+
@modpings_group.command(name="sync")
async def sync_command(self, ctx: Context) -> None:
"""
Attempt to re-sync your pingable moderators role with the stored state.
+ You can view your stored state using the `modpings status` command.
+
If there is a reoccurring problem, please report it.
"""
await self.handle_moderator_state(ctx.author)