aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar Boris Muratov <[email protected]>2025-08-13 02:17:24 +0300
committerGravatar Boris Muratov <[email protected]>2025-08-13 17:42:40 +0300
commit494a0a045bd7957c971953215eaf84434eb85ec8 (patch)
tree14d66d2d74533f1b377e4398d04577069d3ed783
parentDependency bumps (diff)
Initial cleanup and map out issues
-rw-r--r--bot/exts/moderation/modpings.py117
1 files changed, 65 insertions, 52 deletions
diff --git a/bot/exts/moderation/modpings.py b/bot/exts/moderation/modpings.py
index 002bc4cfe..289790a32 100644
--- a/bot/exts/moderation/modpings.py
+++ b/bot/exts/moderation/modpings.py
@@ -1,11 +1,12 @@
import asyncio
-import datetime
+from datetime import UTC, datetime, timedelta
import arrow
+import dateutil
from async_rediscache import RedisCache
from dateutil.parser import isoparse, parse as dateutil_parse
from discord import Member
-from discord.ext.commands import Cog, Context, group, has_any_role
+from discord.ext.commands import BadArgument, Cog, Context, group, has_any_role
from pydis_core.utils.members import get_or_fetch_member
from pydis_core.utils.scheduling import Scheduler
@@ -17,7 +18,7 @@ from bot.utils.time import TimestampFormats, discord_timestamp
log = get_logger(__name__)
-MAXIMUM_WORK_LIMIT = 16
+MAXIMUM_WORK_LIMIT = 23
class ModPings(Cog):
@@ -29,29 +30,33 @@ class ModPings(Cog):
pings_off_mods = RedisCache()
# RedisCache[discord.Member.id, 'start timestamp|total worktime in seconds']
- # The cache's keys are mod's ID
+ # The cache's keys are mods' IDs
# The cache's values are their pings on schedule timestamp and the total seconds (work time) until pings off
modpings_schedule = RedisCache()
def __init__(self, bot: Bot):
self.bot = bot
self._role_scheduler = Scheduler("ModPingsOnOff")
- self._modpings_scheduler = Scheduler("ModPingsSchedule")
+ self._shift_scheduler = Scheduler("ModPingsSchedule")
self.guild = None
self.moderators_role = None
+ async def cog_check(self, ctx: Context) -> bool:
+ """Only allow moderators to invoke the commands in this cog."""
+ return await has_any_role(*MODERATION_ROLES).predicate(ctx)
+
async def cog_load(self) -> None:
"""Schedule both when to reapply role and all mod ping schedules."""
- # await self.reschedule_modpings_schedule()
- await self.reschedule_roles()
-
- async def reschedule_roles(self) -> None:
- """Reschedule moderators role re-apply times."""
await self.bot.wait_until_guild_available()
self.guild = self.bot.get_guild(Guild.id)
self.moderators_role = self.guild.get_role(Roles.moderators)
+ # await self.reschedule_modpings_schedule() TODO uncomment
+ await self.reschedule_roles()
+
+ async def reschedule_roles(self) -> None:
+ """Reschedule moderators role re-apply times."""
mod_team = self.guild.get_role(Roles.mod_team)
pings_on = self.moderators_role.members
pings_off = await self.pings_off_mods.to_dict()
@@ -83,40 +88,39 @@ class ModPings(Cog):
async def reschedule_modpings_schedule(self) -> None:
"""Reschedule moderators schedule ping."""
- await self.bot.wait_until_guild_available()
schedule_cache = await self.modpings_schedule.to_dict()
log.info("Scheduling modpings schedule for applicable moderators found in cache.")
for mod_id, schedule in schedule_cache.items():
start_timestamp, work_time = schedule.split("|")
- start = datetime.datetime.fromtimestamp(float(start_timestamp), tz=datetime.UTC)
+ start = datetime.fromtimestamp(float(start_timestamp), tz=UTC) # TODO What if it's in the past?
mod = await self.bot.fetch_user(mod_id)
- self._modpings_scheduler.schedule_at(
+ self._shift_scheduler.schedule_at(
start,
mod_id,
- self.add_role_schedule(mod, work_time, start)
+ self.add_role_by_schedule(mod, work_time, start)
)
- async def remove_role_schedule(self, mod: Member, work_time: int, schedule_start: datetime.datetime) -> None:
- """Removes the moderator's role to the given moderator."""
+ async def remove_role_by_schedule(self, mod: Member, shift_time: float, schedule_start: datetime) -> None:
+ """Removes the moderators role from the given moderator according to schedule."""
log.trace(f"Removing moderator role from mod with ID {mod.id}")
await mod.remove_roles(self.moderators_role, reason="Moderator schedule time expired.")
# Remove the task before scheduling it again
- self._modpings_scheduler.cancel(mod.id)
+ self._shift_scheduler.cancel(mod.id)
# Add the task again
log.trace(f"Adding mod pings schedule task again for mod with ID {mod.id}")
- schedule_start += datetime.timedelta(days=1)
- self._modpings_scheduler.schedule_at(
+ schedule_start += timedelta(days=1)
+ self._shift_scheduler.schedule_at(
schedule_start,
mod.id,
- self.add_role_schedule(mod, work_time, schedule_start)
+ self.add_role_by_schedule(mod, shift_time, schedule_start)
)
- async def add_role_schedule(self, mod: Member, work_time: int, schedule_start: datetime.datetime) -> None:
- """Adds the moderator's role to the given moderator."""
+ async def add_role_by_schedule(self, mod: Member, shift_time: float, schedule_start: datetime) -> None:
+ """Adds the moderators role to the given moderator."""
# If the moderator has pings off, then skip adding role
if mod.id in await self.pings_off_mods.to_dict():
log.trace(f"Skipping adding moderator role to mod with ID {mod.id} - found in pings off cache.")
@@ -124,24 +128,23 @@ class ModPings(Cog):
log.trace(f"Applying moderator role to mod with ID {mod.id}")
await mod.add_roles(self.moderators_role, reason="Moderator scheduled time started!")
- log.trace(f"Sleeping for {work_time} seconds, worktime for mod with ID {mod.id}")
- await asyncio.sleep(work_time)
- await self.remove_role_schedule(mod, work_time, schedule_start)
+ log.trace(f"Sleeping for {shift_time} seconds, worktime for mod with ID {mod.id}")
+ await asyncio.sleep(shift_time) # TODO don't hang the coroutine or call directly, rely on the scheduler.
+ await self.remove_role_by_schedule(mod, shift_time, schedule_start)
async def reapply_role(self, mod: Member) -> None:
- """Reapply the moderator's role to the given moderator."""
+ """Reapply the moderators role to the given moderator."""
log.trace(f"Re-applying role to mod with ID {mod.id}.")
+ # TODO currently doesn't care about whether mod is off schedule
await mod.add_roles(self.moderators_role, reason="Pings off period expired.")
await self.pings_off_mods.delete(mod.id)
@group(name="modpings", aliases=("modping",), invoke_without_command=True)
- @has_any_role(*MODERATION_ROLES)
async def modpings_group(self, ctx: Context) -> None:
"""Allow the removal and re-addition of the pingable moderators role."""
await ctx.send_help(ctx.command)
@modpings_group.command(name="off")
- @has_any_role(*MODERATION_ROLES)
async def off_command(self, ctx: Context, duration: Expiry) -> None:
"""
Temporarily removes the pingable moderators role for a set amount of time.
@@ -161,7 +164,7 @@ class ModPings(Cog):
The duration cannot be longer than 30 days.
""" # noqa: RUF002
delta = duration - arrow.utcnow()
- if delta > datetime.timedelta(days=30):
+ if delta > timedelta(days=30):
await ctx.send(":x: Cannot remove the role for longer than 30 days.")
return
@@ -183,7 +186,6 @@ class ModPings(Cog):
)
@modpings_group.command(name="on")
- @has_any_role(*MODERATION_ROLES)
async def on_command(self, ctx: Context) -> None:
"""Re-apply the pingable moderators role."""
mod = ctx.author
@@ -205,41 +207,51 @@ class ModPings(Cog):
aliases=("s",),
invoke_without_command=True
)
- @has_any_role(*MODERATION_ROLES)
- async def schedule_modpings(self, ctx: Context, start: str, end: str) -> None:
- """Schedule modpings role to be added at <start> and removed at <end> everyday at UTC time!"""
- start, end = dateutil_parse(start), dateutil_parse(end)
+ async def schedule_modpings(self, ctx: Context, start: str, end: str, tz: int | None) -> None:
+ """
+ Schedule modpings role to be added at <start> time and removed at <end> time.
+
+ Start and end times should be specified in a HH:MM format.
+
+ You may specify a time zone offset for convenience. Times are considered in UTC by default.
+
+ The schedule may be temporarily overridden using the on/off commands.
+ """
+ try:
+ start, end = dateutil_parse(start), dateutil_parse(end)
+ except dateutil.parser._parser.ParserError as e:
+ raise BadArgument(str(e).capitalize())
if end < start:
- end += datetime.timedelta(days=1)
+ end += timedelta(days=1)
- if (end - start) > datetime.timedelta(hours=MAXIMUM_WORK_LIMIT):
- await ctx.send(
- f":x: {ctx.author.mention} You can't have the modpings role for"
- f" more than {MAXIMUM_WORK_LIMIT} hours!"
+ if (end - start) > timedelta(hours=MAXIMUM_WORK_LIMIT):
+ await ctx.reply(
+ f":x: You can't have a schedule with mod pings on for more than {MAXIMUM_WORK_LIMIT} hours!"
+ " If you want to remove your schedule use the `modpings schedule delete` command."
)
return
- if start < datetime.datetime.now(datetime.UTC):
+ if start < datetime.now(UTC):
# The datetime has already gone for the day, so make it tomorrow
- # otherwise the scheduler would schedule it immediately
- start += datetime.timedelta(days=1)
+ # otherwise the scheduler would schedule it immediately TODO but why not?
+ start += timedelta(days=1)
- work_time = (end - start).total_seconds()
+ shift_time = (end - start).total_seconds()
- await self.modpings_schedule.set(ctx.author.id, f"{start.timestamp()}|{work_time}")
+ await self.modpings_schedule.set(ctx.author.id, f"{start.timestamp()}|{shift_time}")
- if ctx.author.id in self._modpings_scheduler:
- self._modpings_scheduler.cancel(ctx.author.id)
+ if ctx.author.id in self._shift_scheduler:
+ self._shift_scheduler.cancel(ctx.author.id) # TODO here as well need to see if role should be re-applied.
- self._modpings_scheduler.schedule_at(
+ self._shift_scheduler.schedule_at(
start,
ctx.author.id,
- self.add_role_schedule(ctx.author, work_time, start)
+ self.add_role_by_schedule(ctx.author, shift_time, start)
)
- await ctx.send(
- f"{Emojis.ok_hand} {ctx.author.mention} Scheduled mod pings from "
+ await ctx.reply(
+ f"{Emojis.ok_hand} Scheduled mod pings from "
f"{discord_timestamp(start, TimestampFormats.TIME)} to "
f"{discord_timestamp(end, TimestampFormats.TIME)}!"
)
@@ -247,15 +259,16 @@ class ModPings(Cog):
@schedule_modpings.command(name="delete", aliases=("del", "d"))
async def modpings_schedule_delete(self, ctx: Context) -> None:
"""Delete your modpings schedule."""
- self._modpings_scheduler.cancel(ctx.author.id)
+ self._shift_scheduler.cancel(ctx.author.id)
await self.modpings_schedule.delete(ctx.author.id)
+ # TODO: Apply the pingable role if was off schedule and pings not off
await ctx.send(f"{Emojis.ok_hand} {ctx.author.mention} Deleted your modpings schedule!")
async def cog_unload(self) -> None:
"""Cancel role tasks when the cog unloads."""
log.trace("Cog unload: cancelling all scheduled tasks.")
self._role_scheduler.cancel_all()
- self._modpings_scheduler.cancel_all()
+ self._shift_scheduler.cancel_all()
async def setup(bot: Bot) -> None: