diff options
author | 2025-08-13 02:17:24 +0300 | |
---|---|---|
committer | 2025-08-13 17:42:40 +0300 | |
commit | 494a0a045bd7957c971953215eaf84434eb85ec8 (patch) | |
tree | 14d66d2d74533f1b377e4398d04577069d3ed783 | |
parent | Dependency bumps (diff) |
Initial cleanup and map out issues
-rw-r--r-- | bot/exts/moderation/modpings.py | 117 |
1 files changed, 65 insertions, 52 deletions
diff --git a/bot/exts/moderation/modpings.py b/bot/exts/moderation/modpings.py index 002bc4cfe..289790a32 100644 --- a/bot/exts/moderation/modpings.py +++ b/bot/exts/moderation/modpings.py @@ -1,11 +1,12 @@ import asyncio -import datetime +from datetime import UTC, datetime, timedelta import arrow +import dateutil from async_rediscache import RedisCache from dateutil.parser import isoparse, parse as dateutil_parse from discord import Member -from discord.ext.commands import Cog, Context, group, has_any_role +from discord.ext.commands import BadArgument, Cog, Context, group, has_any_role from pydis_core.utils.members import get_or_fetch_member from pydis_core.utils.scheduling import Scheduler @@ -17,7 +18,7 @@ from bot.utils.time import TimestampFormats, discord_timestamp log = get_logger(__name__) -MAXIMUM_WORK_LIMIT = 16 +MAXIMUM_WORK_LIMIT = 23 class ModPings(Cog): @@ -29,29 +30,33 @@ class ModPings(Cog): pings_off_mods = RedisCache() # RedisCache[discord.Member.id, 'start timestamp|total worktime in seconds'] - # The cache's keys are mod's ID + # The cache's keys are mods' IDs # The cache's values are their pings on schedule timestamp and the total seconds (work time) until pings off modpings_schedule = RedisCache() def __init__(self, bot: Bot): self.bot = bot self._role_scheduler = Scheduler("ModPingsOnOff") - self._modpings_scheduler = Scheduler("ModPingsSchedule") + self._shift_scheduler = Scheduler("ModPingsSchedule") self.guild = None self.moderators_role = None + async def cog_check(self, ctx: Context) -> bool: + """Only allow moderators to invoke the commands in this cog.""" + return await has_any_role(*MODERATION_ROLES).predicate(ctx) + async def cog_load(self) -> None: """Schedule both when to reapply role and all mod ping schedules.""" - # await self.reschedule_modpings_schedule() - await self.reschedule_roles() - - async def reschedule_roles(self) -> None: - """Reschedule moderators role re-apply times.""" await self.bot.wait_until_guild_available() self.guild = self.bot.get_guild(Guild.id) self.moderators_role = self.guild.get_role(Roles.moderators) + # await self.reschedule_modpings_schedule() TODO uncomment + await self.reschedule_roles() + + async def reschedule_roles(self) -> None: + """Reschedule moderators role re-apply times.""" mod_team = self.guild.get_role(Roles.mod_team) pings_on = self.moderators_role.members pings_off = await self.pings_off_mods.to_dict() @@ -83,40 +88,39 @@ class ModPings(Cog): async def reschedule_modpings_schedule(self) -> None: """Reschedule moderators schedule ping.""" - await self.bot.wait_until_guild_available() schedule_cache = await self.modpings_schedule.to_dict() log.info("Scheduling modpings schedule for applicable moderators found in cache.") for mod_id, schedule in schedule_cache.items(): start_timestamp, work_time = schedule.split("|") - start = datetime.datetime.fromtimestamp(float(start_timestamp), tz=datetime.UTC) + start = datetime.fromtimestamp(float(start_timestamp), tz=UTC) # TODO What if it's in the past? mod = await self.bot.fetch_user(mod_id) - self._modpings_scheduler.schedule_at( + self._shift_scheduler.schedule_at( start, mod_id, - self.add_role_schedule(mod, work_time, start) + self.add_role_by_schedule(mod, work_time, start) ) - async def remove_role_schedule(self, mod: Member, work_time: int, schedule_start: datetime.datetime) -> None: - """Removes the moderator's role to the given moderator.""" + async def remove_role_by_schedule(self, mod: Member, shift_time: float, schedule_start: datetime) -> None: + """Removes the moderators role from the given moderator according to schedule.""" log.trace(f"Removing moderator role from mod with ID {mod.id}") await mod.remove_roles(self.moderators_role, reason="Moderator schedule time expired.") # Remove the task before scheduling it again - self._modpings_scheduler.cancel(mod.id) + self._shift_scheduler.cancel(mod.id) # Add the task again log.trace(f"Adding mod pings schedule task again for mod with ID {mod.id}") - schedule_start += datetime.timedelta(days=1) - self._modpings_scheduler.schedule_at( + schedule_start += timedelta(days=1) + self._shift_scheduler.schedule_at( schedule_start, mod.id, - self.add_role_schedule(mod, work_time, schedule_start) + self.add_role_by_schedule(mod, shift_time, schedule_start) ) - async def add_role_schedule(self, mod: Member, work_time: int, schedule_start: datetime.datetime) -> None: - """Adds the moderator's role to the given moderator.""" + async def add_role_by_schedule(self, mod: Member, shift_time: float, schedule_start: datetime) -> None: + """Adds the moderators role to the given moderator.""" # If the moderator has pings off, then skip adding role if mod.id in await self.pings_off_mods.to_dict(): log.trace(f"Skipping adding moderator role to mod with ID {mod.id} - found in pings off cache.") @@ -124,24 +128,23 @@ class ModPings(Cog): log.trace(f"Applying moderator role to mod with ID {mod.id}") await mod.add_roles(self.moderators_role, reason="Moderator scheduled time started!") - log.trace(f"Sleeping for {work_time} seconds, worktime for mod with ID {mod.id}") - await asyncio.sleep(work_time) - await self.remove_role_schedule(mod, work_time, schedule_start) + log.trace(f"Sleeping for {shift_time} seconds, worktime for mod with ID {mod.id}") + await asyncio.sleep(shift_time) # TODO don't hang the coroutine or call directly, rely on the scheduler. + await self.remove_role_by_schedule(mod, shift_time, schedule_start) async def reapply_role(self, mod: Member) -> None: - """Reapply the moderator's role to the given moderator.""" + """Reapply the moderators role to the given moderator.""" log.trace(f"Re-applying role to mod with ID {mod.id}.") + # TODO currently doesn't care about whether mod is off schedule await mod.add_roles(self.moderators_role, reason="Pings off period expired.") await self.pings_off_mods.delete(mod.id) @group(name="modpings", aliases=("modping",), invoke_without_command=True) - @has_any_role(*MODERATION_ROLES) async def modpings_group(self, ctx: Context) -> None: """Allow the removal and re-addition of the pingable moderators role.""" await ctx.send_help(ctx.command) @modpings_group.command(name="off") - @has_any_role(*MODERATION_ROLES) async def off_command(self, ctx: Context, duration: Expiry) -> None: """ Temporarily removes the pingable moderators role for a set amount of time. @@ -161,7 +164,7 @@ class ModPings(Cog): The duration cannot be longer than 30 days. """ # noqa: RUF002 delta = duration - arrow.utcnow() - if delta > datetime.timedelta(days=30): + if delta > timedelta(days=30): await ctx.send(":x: Cannot remove the role for longer than 30 days.") return @@ -183,7 +186,6 @@ class ModPings(Cog): ) @modpings_group.command(name="on") - @has_any_role(*MODERATION_ROLES) async def on_command(self, ctx: Context) -> None: """Re-apply the pingable moderators role.""" mod = ctx.author @@ -205,41 +207,51 @@ class ModPings(Cog): aliases=("s",), invoke_without_command=True ) - @has_any_role(*MODERATION_ROLES) - async def schedule_modpings(self, ctx: Context, start: str, end: str) -> None: - """Schedule modpings role to be added at <start> and removed at <end> everyday at UTC time!""" - start, end = dateutil_parse(start), dateutil_parse(end) + async def schedule_modpings(self, ctx: Context, start: str, end: str, tz: int | None) -> None: + """ + Schedule modpings role to be added at <start> time and removed at <end> time. + + Start and end times should be specified in a HH:MM format. + + You may specify a time zone offset for convenience. Times are considered in UTC by default. + + The schedule may be temporarily overridden using the on/off commands. + """ + try: + start, end = dateutil_parse(start), dateutil_parse(end) + except dateutil.parser._parser.ParserError as e: + raise BadArgument(str(e).capitalize()) if end < start: - end += datetime.timedelta(days=1) + end += timedelta(days=1) - if (end - start) > datetime.timedelta(hours=MAXIMUM_WORK_LIMIT): - await ctx.send( - f":x: {ctx.author.mention} You can't have the modpings role for" - f" more than {MAXIMUM_WORK_LIMIT} hours!" + if (end - start) > timedelta(hours=MAXIMUM_WORK_LIMIT): + await ctx.reply( + f":x: You can't have a schedule with mod pings on for more than {MAXIMUM_WORK_LIMIT} hours!" + " If you want to remove your schedule use the `modpings schedule delete` command." ) return - if start < datetime.datetime.now(datetime.UTC): + if start < datetime.now(UTC): # The datetime has already gone for the day, so make it tomorrow - # otherwise the scheduler would schedule it immediately - start += datetime.timedelta(days=1) + # otherwise the scheduler would schedule it immediately TODO but why not? + start += timedelta(days=1) - work_time = (end - start).total_seconds() + shift_time = (end - start).total_seconds() - await self.modpings_schedule.set(ctx.author.id, f"{start.timestamp()}|{work_time}") + await self.modpings_schedule.set(ctx.author.id, f"{start.timestamp()}|{shift_time}") - if ctx.author.id in self._modpings_scheduler: - self._modpings_scheduler.cancel(ctx.author.id) + if ctx.author.id in self._shift_scheduler: + self._shift_scheduler.cancel(ctx.author.id) # TODO here as well need to see if role should be re-applied. - self._modpings_scheduler.schedule_at( + self._shift_scheduler.schedule_at( start, ctx.author.id, - self.add_role_schedule(ctx.author, work_time, start) + self.add_role_by_schedule(ctx.author, shift_time, start) ) - await ctx.send( - f"{Emojis.ok_hand} {ctx.author.mention} Scheduled mod pings from " + await ctx.reply( + f"{Emojis.ok_hand} Scheduled mod pings from " f"{discord_timestamp(start, TimestampFormats.TIME)} to " f"{discord_timestamp(end, TimestampFormats.TIME)}!" ) @@ -247,15 +259,16 @@ class ModPings(Cog): @schedule_modpings.command(name="delete", aliases=("del", "d")) async def modpings_schedule_delete(self, ctx: Context) -> None: """Delete your modpings schedule.""" - self._modpings_scheduler.cancel(ctx.author.id) + self._shift_scheduler.cancel(ctx.author.id) await self.modpings_schedule.delete(ctx.author.id) + # TODO: Apply the pingable role if was off schedule and pings not off await ctx.send(f"{Emojis.ok_hand} {ctx.author.mention} Deleted your modpings schedule!") async def cog_unload(self) -> None: """Cancel role tasks when the cog unloads.""" log.trace("Cog unload: cancelling all scheduled tasks.") self._role_scheduler.cancel_all() - self._modpings_scheduler.cancel_all() + self._shift_scheduler.cancel_all() async def setup(bot: Bot) -> None: |