diff options
| author | 2019-09-23 16:16:10 +0200 | |
|---|---|---|
| committer | 2019-09-23 16:16:10 +0200 | |
| commit | f4f3b2a15ecdce5291ef2d2d98b0af6d77fbc228 (patch) | |
| tree | 250036c9d5162c6ee62d1a7bd6c999a03a2caad5 /bot/cogs/moderation.py | |
| parent | Change log.error to log.exception (diff) | |
| parent | Merge branch 'master' of https://github.com/python-discord/bot into python-di... (diff) | |
Merge branch 'python-discord-master'
Diffstat (limited to '')
| -rw-r--r-- | bot/cogs/moderation.py | 816 |
1 files changed, 367 insertions, 449 deletions
diff --git a/bot/cogs/moderation.py b/bot/cogs/moderation.py index 73359c88c..81b3864a7 100644 --- a/bot/cogs/moderation.py +++ b/bot/cogs/moderation.py @@ -1,25 +1,25 @@ import asyncio import logging import textwrap -from typing import Union +from datetime import datetime +from typing import Dict, Union -from aiohttp import ClientError from discord import ( - Colour, Embed, Forbidden, Guild, HTTPException, Member, Object, User + Colour, Embed, Forbidden, Guild, HTTPException, Member, NotFound, Object, User ) from discord.ext.commands import ( - BadArgument, BadUnionArgument, Bot, Context, command, group + BadArgument, BadUnionArgument, Bot, Cog, Context, command, group ) from bot import constants from bot.cogs.modlog import ModLog -from bot.constants import Colours, Event, Icons, Keys, MODERATION_ROLES, URLs -from bot.converters import InfractionSearchQuery +from bot.constants import Colours, Event, Icons, MODERATION_ROLES +from bot.converters import ExpirationDate, InfractionSearchQuery from bot.decorators import with_role from bot.pagination import LinePaginator -from bot.utils.moderation import post_infraction +from bot.utils.moderation import already_has_active_infraction, post_infraction from bot.utils.scheduling import Scheduler, create_task -from bot.utils.time import parse_rfc1123, wait_until +from bot.utils.time import wait_until log = logging.getLogger(__name__) @@ -28,11 +28,12 @@ INFRACTION_ICONS = { "Kick": Icons.sign_out, "Ban": Icons.user_ban } -RULES_URL = "https://pythondiscord.com/about/rules" +RULES_URL = "https://pythondiscord.com/pages/rules" APPEALABLE_INFRACTIONS = ("Ban", "Mute") def proxy_user(user_id: str) -> Object: + """Create a proxy user for the provided user_id for situations where a Member or User object cannot be resolved.""" try: user_id = int(user_id) except ValueError: @@ -46,54 +47,41 @@ def proxy_user(user_id: str) -> Object: UserTypes = Union[Member, User, proxy_user] -class Moderation(Scheduler): - """ - Server moderation tools. - """ +class Moderation(Scheduler, Cog): + """Server moderation tools.""" def __init__(self, bot: Bot): self.bot = bot - self.headers = {"X-API-KEY": Keys.site_api} self._muted_role = Object(constants.Roles.muted) super().__init__() @property def mod_log(self) -> ModLog: + """Get currently loaded ModLog cog instance.""" return self.bot.get_cog("ModLog") - async def on_ready(self): + @Cog.listener() + async def on_ready(self) -> None: + """Schedule expiration for previous infractions.""" # Schedule expiration for previous infractions - response = await self.bot.http_session.get( - URLs.site_infractions, - params={"dangling": "true"}, - headers=self.headers + infractions = await self.bot.api_client.get( + 'bot/infractions', params={'active': 'true'} ) - infraction_list = await response.json() - for infraction_object in infraction_list: - if infraction_object["expires_at"] is not None: - self.schedule_task(self.bot.loop, infraction_object["id"], infraction_object) + for infraction in infractions: + if infraction["expires_at"] is not None: + self.schedule_task(self.bot.loop, infraction["id"], infraction) # region: Permanent infractions @with_role(*MODERATION_ROLES) @command() - async def warn(self, ctx: Context, user: UserTypes, *, reason: str = None): - """ - Create a warning infraction in the database for a user. - - **`user`:** Accepts user mention, ID, etc. - **`reason`:** The reason for the warning. - """ - - response_object = await post_infraction(ctx, user, type="warning", reason=reason) - if response_object is None: + async def warn(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None: + """Create a warning infraction in the database for a user.""" + infraction = await post_infraction(ctx, user, type="warning", reason=reason) + if infraction is None: return - notified = await self.notify_infraction( - user=user, - infr_type="Warning", - reason=reason - ) + notified = await self.notify_infraction(user=user, infr_type="Warning", reason=reason) dm_result = ":incoming_envelope: " if notified else "" action = f"{dm_result}:ok_hand: warned {user.mention}" @@ -122,33 +110,23 @@ class Moderation(Scheduler): Reason: {reason} """), content=log_content, - footer=f"ID {response_object['infraction']['id']}" + footer=f"ID {infraction['id']}" ) @with_role(*MODERATION_ROLES) @command() - async def kick(self, ctx: Context, user: Member, *, reason: str = None): - """ - Kicks a user. - - **`user`:** Accepts user mention, ID, etc. - **`reason`:** The reason for the kick. - """ - + async def kick(self, ctx: Context, user: Member, *, reason: str = None) -> None: + """Kicks a user with the provided reason.""" if not await self.respect_role_hierarchy(ctx, user, 'kick'): # Ensure ctx author has a higher top role than the target user # Warning is sent to ctx by the helper method return - response_object = await post_infraction(ctx, user, type="kick", reason=reason) - if response_object is None: + infraction = await post_infraction(ctx, user, type="kick", reason=reason) + if infraction is None: return - notified = await self.notify_infraction( - user=user, - infr_type="Kick", - reason=reason - ) + notified = await self.notify_infraction(user=user, infr_type="Kick", reason=reason) self.mod_log.ignore(Event.member_remove, user.id) @@ -182,32 +160,28 @@ class Moderation(Scheduler): Reason: {reason} """), content=log_content, - footer=f"ID {response_object['infraction']['id']}" + footer=f"ID {infraction['id']}" ) @with_role(*MODERATION_ROLES) @command() - async def ban(self, ctx: Context, user: UserTypes, *, reason: str = None): - """ - Create a permanent ban infraction in the database for a user. - - **`user`:** Accepts user mention, ID, etc. - **`reason`:** The reason for the ban. - """ - + async def ban(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None: + """Create a permanent ban infraction for a user with the provided reason.""" if not await self.respect_role_hierarchy(ctx, user, 'ban'): # Ensure ctx author has a higher top role than the target user # Warning is sent to ctx by the helper method return - response_object = await post_infraction(ctx, user, type="ban", reason=reason) - if response_object is None: + if await already_has_active_infraction(ctx=ctx, user=user, type="ban"): + return + + infraction = await post_infraction(ctx, user, type="ban", reason=reason) + if infraction is None: return notified = await self.notify_infraction( user=user, infr_type="Ban", - duration="Permanent", reason=reason ) @@ -246,21 +220,18 @@ class Moderation(Scheduler): Reason: {reason} """), content=log_content, - footer=f"ID {response_object['infraction']['id']}" + footer=f"ID {infraction['id']}" ) @with_role(*MODERATION_ROLES) @command() - async def mute(self, ctx: Context, user: Member, *, reason: str = None): - """ - Create a permanent mute infraction in the database for a user. - - **`user`:** Accepts user mention, ID, etc. - **`reason`:** The reason for the mute. - """ + async def mute(self, ctx: Context, user: Member, *, reason: str = None) -> None: + """Create a permanent mute infraction for a user with the provided reason.""" + if await already_has_active_infraction(ctx=ctx, user=user, type="mute"): + return - response_object = await post_infraction(ctx, user, type="mute", reason=reason) - if response_object is None: + infraction = await post_infraction(ctx, user, type="mute", reason=reason) + if infraction is None: return self.mod_log.ignore(Event.member_update, user.id) @@ -269,7 +240,7 @@ class Moderation(Scheduler): notified = await self.notify_infraction( user=user, infr_type="Mute", - duration="Permanent", + expires_at="Permanent", reason=reason ) @@ -300,7 +271,7 @@ class Moderation(Scheduler): Reason: {reason} """), content=log_content, - footer=f"ID {response_object['infraction']['id']}" + footer=f"ID {infraction['id']}" ) # endregion @@ -308,19 +279,19 @@ class Moderation(Scheduler): @with_role(*MODERATION_ROLES) @command() - async def tempmute(self, ctx: Context, user: Member, duration: str, *, reason: str = None): + async def tempmute(self, ctx: Context, user: Member, duration: ExpirationDate, *, reason: str = None) -> None: """ - Create a temporary mute infraction in the database for a user. + Create a temporary mute infraction for a user with the provided expiration and reason. - **`user`:** Accepts user mention, ID, etc. - **`duration`:** The duration for the temporary mute infraction - **`reason`:** The reason for the temporary mute. + Duration strings are parsed per: http://strftime.org/ """ + expiration = duration - response_object = await post_infraction( - ctx, user, type="mute", reason=reason, duration=duration - ) - if response_object is None: + if await already_has_active_infraction(ctx=ctx, user=user, type="mute"): + return + + infraction = await post_infraction(ctx, user, type="mute", reason=reason, expires_at=expiration) + if infraction is None: return self.mod_log.ignore(Event.member_update, user.id) @@ -329,14 +300,17 @@ class Moderation(Scheduler): notified = await self.notify_infraction( user=user, infr_type="Mute", - duration=duration, + expires_at=expiration, reason=reason ) - infraction_object = response_object["infraction"] - infraction_expiration = infraction_object["expires_at"] + infraction_expiration = ( + datetime + .fromisoformat(infraction["expires_at"][:-1]) + .strftime('%c') + ) - self.schedule_task(ctx.bot.loop, infraction_object["id"], infraction_object) + self.schedule_task(ctx.bot.loop, infraction["id"], infraction) dm_result = ":incoming_envelope: " if notified else "" action = f"{dm_result}:ok_hand: muted {user.mention} until {infraction_expiration}" @@ -363,41 +337,38 @@ class Moderation(Scheduler): Actor: {ctx.message.author} DM: {dm_status} Reason: {reason} - Duration: {duration} Expires: {infraction_expiration} """), content=log_content, - footer=f"ID {response_object['infraction']['id']}" + footer=f"ID {infraction['id']}" ) @with_role(*MODERATION_ROLES) @command() - async def tempban( - self, ctx: Context, user: UserTypes, duration: str, *, reason: str = None - ): + async def tempban(self, ctx: Context, user: UserTypes, duration: ExpirationDate, *, reason: str = None) -> None: """ - Create a temporary ban infraction in the database for a user. + Create a temporary ban infraction for a user with the provided expiration and reason. - **`user`:** Accepts user mention, ID, etc. - **`duration`:** The duration for the temporary ban infraction - **`reason`:** The reason for the temporary ban. + Duration strings are parsed per: http://strftime.org/ """ + expiration = duration if not await self.respect_role_hierarchy(ctx, user, 'tempban'): # Ensure ctx author has a higher top role than the target user # Warning is sent to ctx by the helper method return - response_object = await post_infraction( - ctx, user, type="ban", reason=reason, duration=duration - ) - if response_object is None: + if await already_has_active_infraction(ctx=ctx, user=user, type="ban"): + return + + infraction = await post_infraction(ctx, user, type="ban", reason=reason, expires_at=expiration) + if infraction is None: return notified = await self.notify_infraction( user=user, infr_type="Ban", - duration=duration, + expires_at=expiration, reason=reason ) @@ -410,10 +381,13 @@ class Moderation(Scheduler): except Forbidden: action_result = False - infraction_object = response_object["infraction"] - infraction_expiration = infraction_object["expires_at"] + infraction_expiration = ( + datetime + .fromisoformat(infraction["expires_at"][:-1]) + .strftime('%c') + ) - self.schedule_task(ctx.bot.loop, infraction_object["id"], infraction_object) + self.schedule_task(ctx.bot.loop, infraction["id"], infraction) dm_result = ":incoming_envelope: " if notified else "" action = f"{dm_result}:ok_hand: banned {user.mention} until {infraction_expiration}" @@ -439,11 +413,10 @@ class Moderation(Scheduler): Actor: {ctx.message.author} DM: {dm_status} Reason: {reason} - Duration: {duration} Expires: {infraction_expiration} """), content=log_content, - footer=f"ID {response_object['infraction']['id']}" + footer=f"ID {infraction['id']}" ) # endregion @@ -451,18 +424,14 @@ class Moderation(Scheduler): @with_role(*MODERATION_ROLES) @command(hidden=True, aliases=['shadowwarn', 'swarn', 'shadow_warn']) - async def note(self, ctx: Context, user: UserTypes, *, reason: str = None): + async def note(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None: """ - Create a private infraction note in the database for a user. + Create a private infraction note in the database for a user with the provided reason. - **`user`:** accepts user mention, ID, etc. - **`reason`:** The reason for the warning. + This does not send the user a notification """ - - response_object = await post_infraction( - ctx, user, type="warning", reason=reason, hidden=True - ) - if response_object is None: + infraction = await post_infraction(ctx, user, type="warning", reason=reason, hidden=True) + if infraction is None: return if reason is None: @@ -480,26 +449,24 @@ class Moderation(Scheduler): Actor: {ctx.message.author} Reason: {reason} """), - footer=f"ID {response_object['infraction']['id']}" + footer=f"ID {infraction['id']}" ) @with_role(*MODERATION_ROLES) @command(hidden=True, aliases=['shadowkick', 'skick']) - async def shadow_kick(self, ctx: Context, user: Member, *, reason: str = None): + async def shadow_kick(self, ctx: Context, user: Member, *, reason: str = None) -> None: """ - Kicks a user. + Kick a user for the provided reason. - **`user`:** accepts user mention, ID, etc. - **`reason`:** The reason for the kick. + This does not send the user a notification. """ - if not await self.respect_role_hierarchy(ctx, user, 'shadowkick'): # Ensure ctx author has a higher top role than the target user # Warning is sent to ctx by the helper method return - response_object = await post_infraction(ctx, user, type="kick", reason=reason, hidden=True) - if response_object is None: + infraction = await post_infraction(ctx, user, type="kick", reason=reason, hidden=True) + if infraction is None: return self.mod_log.ignore(Event.member_remove, user.id) @@ -533,26 +500,27 @@ class Moderation(Scheduler): Reason: {reason} """), content=log_content, - footer=f"ID {response_object['infraction']['id']}" + footer=f"ID {infraction['id']}" ) @with_role(*MODERATION_ROLES) @command(hidden=True, aliases=['shadowban', 'sban']) - async def shadow_ban(self, ctx: Context, user: UserTypes, *, reason: str = None): + async def shadow_ban(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None: """ - Create a permanent ban infraction in the database for a user. + Create a permanent ban infraction for a user with the provided reason. - **`user`:** Accepts user mention, ID, etc. - **`reason`:** The reason for the ban. + This does not send the user a notification. """ - if not await self.respect_role_hierarchy(ctx, user, 'shadowban'): # Ensure ctx author has a higher top role than the target user # Warning is sent to ctx by the helper method return - response_object = await post_infraction(ctx, user, type="ban", reason=reason, hidden=True) - if response_object is None: + if await already_has_active_infraction(ctx=ctx, user=user, type="ban"): + return + + infraction = await post_infraction(ctx, user, type="ban", reason=reason, hidden=True) + if infraction is None: return self.mod_log.ignore(Event.member_ban, user.id) @@ -587,21 +555,22 @@ class Moderation(Scheduler): Reason: {reason} """), content=log_content, - footer=f"ID {response_object['infraction']['id']}" + footer=f"ID {infraction['id']}" ) @with_role(*MODERATION_ROLES) @command(hidden=True, aliases=['shadowmute', 'smute']) - async def shadow_mute(self, ctx: Context, user: Member, *, reason: str = None): + async def shadow_mute(self, ctx: Context, user: Member, *, reason: str = None) -> None: """ - Create a permanent mute infraction in the database for a user. + Create a permanent mute infraction for a user with the provided reason. - **`user`:** Accepts user mention, ID, etc. - **`reason`:** The reason for the mute. + This does not send the user a notification. """ + if await already_has_active_infraction(ctx=ctx, user=user, type="mute"): + return - response_object = await post_infraction(ctx, user, type="mute", reason=reason, hidden=True) - if response_object is None: + infraction = await post_infraction(ctx, user, type="mute", reason=reason, hidden=True) + if infraction is None: return self.mod_log.ignore(Event.member_update, user.id) @@ -622,7 +591,7 @@ class Moderation(Scheduler): Actor: {ctx.message.author} Reason: {reason} """), - footer=f"ID {response_object['infraction']['id']}" + footer=f"ID {infraction['id']}" ) # endregion @@ -631,29 +600,34 @@ class Moderation(Scheduler): @with_role(*MODERATION_ROLES) @command(hidden=True, aliases=["shadowtempmute, stempmute"]) async def shadow_tempmute( - self, ctx: Context, user: Member, duration: str, *, reason: str = None - ): + self, ctx: Context, user: Member, duration: ExpirationDate, *, reason: str = None + ) -> None: """ - Create a temporary mute infraction in the database for a user. + Create a temporary mute infraction for a user with the provided reason. - **`user`:** Accepts user mention, ID, etc. - **`duration`:** The duration for the temporary mute infraction - **`reason`:** The reason for the temporary mute. + Duration strings are parsed per: http://strftime.org/ + + This does not send the user a notification. """ + expiration = duration - response_object = await post_infraction( - ctx, user, type="mute", reason=reason, duration=duration, hidden=True - ) - if response_object is None: + if await already_has_active_infraction(ctx=ctx, user=user, type="mute"): + return + + infraction = await post_infraction(ctx, user, type="mute", reason=reason, expires_at=expiration, hidden=True) + if infraction is None: return self.mod_log.ignore(Event.member_update, user.id) await user.add_roles(self._muted_role, reason=reason) - infraction_object = response_object["infraction"] - infraction_expiration = infraction_object["expires_at"] + infraction_expiration = ( + datetime + .fromisoformat(infraction["expires_at"][:-1]) + .strftime('%c') + ) - self.schedule_expiration(ctx.bot.loop, infraction_object) + self.schedule_task(ctx.bot.loop, infraction["id"], infraction) if reason is None: await ctx.send(f":ok_hand: muted {user.mention} until {infraction_expiration}.") @@ -671,34 +645,35 @@ class Moderation(Scheduler): Member: {user.mention} (`{user.id}`) Actor: {ctx.message.author} Reason: {reason} - Duration: {duration} Expires: {infraction_expiration} """), - footer=f"ID {response_object['infraction']['id']}" + footer=f"ID {infraction['id']}" ) @with_role(*MODERATION_ROLES) @command(hidden=True, aliases=["shadowtempban, stempban"]) async def shadow_tempban( - self, ctx: Context, user: UserTypes, duration: str, *, reason: str = None - ): + self, ctx: Context, user: UserTypes, duration: ExpirationDate, *, reason: str = None + ) -> None: """ - Create a temporary ban infraction in the database for a user. + Create a temporary ban infraction for a user with the provided reason. - **`user`:** Accepts user mention, ID, etc. - **`duration`:** The duration for the temporary ban infraction - **`reason`:** The reason for the temporary ban. + Duration strings are parsed per: http://strftime.org/ + + This does not send the user a notification. """ + expiration = duration if not await self.respect_role_hierarchy(ctx, user, 'shadowtempban'): # Ensure ctx author has a higher top role than the target user # Warning is sent to ctx by the helper method return - response_object = await post_infraction( - ctx, user, type="ban", reason=reason, duration=duration, hidden=True - ) - if response_object is None: + if await already_has_active_infraction(ctx=ctx, user=user, type="ban"): + return + + infraction = await post_infraction(ctx, user, type="ban", reason=reason, expires_at=expiration, hidden=True) + if infraction is None: return self.mod_log.ignore(Event.member_ban, user.id) @@ -710,10 +685,13 @@ class Moderation(Scheduler): except Forbidden: action_result = False - infraction_object = response_object["infraction"] - infraction_expiration = infraction_object["expires_at"] + infraction_expiration = ( + datetime + .fromisoformat(infraction["expires_at"][:-1]) + .strftime('%c') + ) - self.schedule_expiration(ctx.bot.loop, infraction_object) + self.schedule_task(ctx.bot.loop, infraction["id"], infraction) if reason is None: await ctx.send(f":ok_hand: banned {user.mention} until {infraction_expiration}.") @@ -739,11 +717,10 @@ class Moderation(Scheduler): Member: {user.mention} (`{user.id}`) Actor: {ctx.message.author} Reason: {reason} - Duration: {duration} Expires: {infraction_expiration} """), content=log_content, - footer=f"ID {response_object['infraction']['id']}" + footer=f"ID {infraction['id']}" ) # endregion @@ -751,40 +728,32 @@ class Moderation(Scheduler): @with_role(*MODERATION_ROLES) @command() - async def unmute(self, ctx: Context, user: Member): - """ - Deactivates the active mute infraction for a user. - - **`user`:** Accepts user mention, ID, etc. - """ - + async def unmute(self, ctx: Context, user: UserTypes) -> None: + """Deactivates the active mute infraction for a user.""" try: # check the current active infraction - response = await self.bot.http_session.get( - URLs.site_infractions_user_type_current.format( - user_id=user.id, - infraction_type="mute" - ), - headers=self.headers + response = await self.bot.api_client.get( + 'bot/infractions', + params={ + 'active': 'true', + 'type': 'mute', + 'user__id': user.id + } ) + if len(response) > 1: + log.warning("Found more than one active mute infraction for user `%d`", user.id) - response_object = await response.json() - if "error_code" in response_object: - return await ctx.send( - ":x: There was an error removing the infraction: " - f"{response_object['error_message']}" - ) - - infraction_object = response_object["infraction"] - if infraction_object is None: + if not response: # no active infraction - return await ctx.send( + await ctx.send( f":x: There is no active mute infraction for user {user.mention}." ) + return - await self._deactivate_infraction(infraction_object) - if infraction_object["expires_at"] is not None: - self.cancel_expiration(infraction_object["id"]) + for infraction in response: + await self._deactivate_infraction(infraction) + if infraction["expires_at"] is not None: + self.cancel_expiration(infraction["id"]) notified = await self.notify_pardon( user=user, @@ -804,61 +773,82 @@ class Moderation(Scheduler): await ctx.send(f"{dm_emoji}:ok_hand: Un-muted {user.mention}.") + embed_text = textwrap.dedent( + f""" + Member: {user.mention} (`{user.id}`) + Actor: {ctx.message.author} + DM: {dm_status} + """ + ) + + if len(response) > 1: + footer = f"Infraction IDs: {', '.join(str(infr['id']) for infr in response)}" + title = "Member unmuted" + embed_text += "Note: User had multiple **active** mute infractions in the database." + else: + infraction = response[0] + footer = f"Infraction ID: {infraction['id']}" + title = "Member unmuted" + # Send a log message to the mod log await self.mod_log.send_log_message( icon_url=Icons.user_unmute, colour=Colour(Colours.soft_green), - title="Member unmuted", + title=title, thumbnail=user.avatar_url_as(static_format="png"), - text=textwrap.dedent(f""" - Member: {user.mention} (`{user.id}`) - Actor: {ctx.message.author} - Intended expiry: {infraction_object['expires_at']} - DM: {dm_status} - """), - footer=infraction_object["id"], + text=embed_text, + footer=footer, content=log_content ) - - except Exception as e: - log.exception("There was an error removing an infraction.", exc_info=e) + except Exception: + log.exception("There was an error removing an infraction.") await ctx.send(":x: There was an error removing the infraction.") @with_role(*MODERATION_ROLES) @command() - async def unban(self, ctx: Context, user: UserTypes): - """ - Deactivates the active ban infraction for a user. - - **`user`:** Accepts user mention, ID, etc. - """ - + async def unban(self, ctx: Context, user: UserTypes) -> None: + """Deactivates the active ban infraction for a user.""" try: # check the current active infraction - response = await self.bot.http_session.get( - URLs.site_infractions_user_type_current.format( - user_id=user.id, - infraction_type="ban" - ), - headers=self.headers + response = await self.bot.api_client.get( + 'bot/infractions', + params={ + 'active': 'true', + 'type': 'ban', + 'user__id': str(user.id) + } ) - response_object = await response.json() - if "error_code" in response_object: - return await ctx.send( - ":x: There was an error removing the infraction: " - f"{response_object['error_message']}" + if len(response) > 1: + log.warning( + "More than one active ban infraction found for user `%d`.", + user.id ) - infraction_object = response_object["infraction"] - if infraction_object is None: + if not response: # no active infraction - return await ctx.send( + await ctx.send( f":x: There is no active ban infraction for user {user.mention}." ) + return + + for infraction in response: + await self._deactivate_infraction(infraction) + if infraction["expires_at"] is not None: + self.cancel_expiration(infraction["id"]) - await self._deactivate_infraction(infraction_object) - if infraction_object["expires_at"] is not None: - self.cancel_expiration(infraction_object["id"]) + embed_text = textwrap.dedent( + f""" + Member: {user.mention} (`{user.id}`) + Actor: {ctx.message.author} + """ + ) + + if len(response) > 1: + footer = f"Infraction IDs: {', '.join(str(infr['id']) for infr in response)}" + embed_text += "Note: User had multiple **active** ban infractions in the database." + else: + infraction = response[0] + footer = f"Infraction ID: {infraction['id']}" await ctx.send(f":ok_hand: Un-banned {user.mention}.") @@ -868,11 +858,8 @@ class Moderation(Scheduler): colour=Colour(Colours.soft_green), title="Member unbanned", thumbnail=user.avatar_url_as(static_format="png"), - text=textwrap.dedent(f""" - Member: {user.mention} (`{user.id}`) - Actor: {ctx.message.author} - Intended expiry: {infraction_object['expires_at']} - """) + text=embed_text, + footer=footer, ) except Exception: log.exception("There was an error removing an infraction.") @@ -883,70 +870,68 @@ class Moderation(Scheduler): @with_role(*MODERATION_ROLES) @group(name='infraction', aliases=('infr', 'infractions', 'inf'), invoke_without_command=True) - async def infraction_group(self, ctx: Context): + async def infraction_group(self, ctx: Context) -> None: """Infraction manipulation commands.""" - await ctx.invoke(self.bot.get_command("help"), "infraction") @with_role(*MODERATION_ROLES) @infraction_group.group(name='edit', invoke_without_command=True) - async def infraction_edit_group(self, ctx: Context): + async def infraction_edit_group(self, ctx: Context) -> None: """Infraction editing commands.""" - await ctx.invoke(self.bot.get_command("help"), "infraction", "edit") @with_role(*MODERATION_ROLES) @infraction_edit_group.command(name="duration") - async def edit_duration(self, ctx: Context, infraction_id: str, duration: str): + async def edit_duration( + self, ctx: Context, + infraction_id: int, expires_at: Union[ExpirationDate, str] + ) -> None: """ - Sets the duration of the given infraction, relative to the time of - updating. + Sets the duration of the given infraction, relative to the time of updating. - **`infraction_id`:** The ID (UUID) of the infraction. - **`duration`:** The new duration of the infraction, relative to the - time of updating. Use "permanent" to the infraction as permanent. + Duration strings are parsed per: http://strftime.org/, use "permanent" to mark the infraction as permanent. """ + if isinstance(expires_at, str) and expires_at != 'permanent': + raise BadArgument( + "If `expires_at` is given as a non-datetime, " + "it must be `permanent`." + ) + if expires_at == 'permanent': + expires_at = None try: - previous = await self.bot.http_session.get( - URLs.site_infractions_by_id.format( - infraction_id=infraction_id - ), - headers=self.headers + previous_infraction = await self.bot.api_client.get( + 'bot/infractions/' + str(infraction_id) ) - previous_object = await previous.json() - - if duration == "permanent": - duration = None # check the current active infraction - response = await self.bot.http_session.patch( - URLs.site_infractions, + infraction = await self.bot.api_client.patch( + 'bot/infractions/' + str(infraction_id), json={ - "id": infraction_id, - "duration": duration - }, - headers=self.headers + 'expires_at': ( + expires_at.isoformat() + if expires_at is not None + else None + ) + } ) - response_object = await response.json() - if "error_code" in response_object or response_object.get("success") is False: - return await ctx.send( - ":x: There was an error updating the infraction: " - f"{response_object['error_message']}" - ) - infraction_object = response_object["infraction"] # Re-schedule - self.cancel_task(infraction_id) + self.cancel_task(infraction['id']) loop = asyncio.get_event_loop() - self.schedule_task(loop, infraction_object["id"], infraction_object) + self.schedule_task(loop, infraction['id'], infraction) - if duration is None: + if expires_at is None: await ctx.send(f":ok_hand: Updated infraction: marked as permanent.") else: + human_expiry = ( + datetime + .fromisoformat(infraction['expires_at'][:-1]) + .strftime('%c') + ) await ctx.send( ":ok_hand: Updated infraction: set to expire on " - f"{infraction_object['expires_at']}." + f"{human_expiry}." ) except Exception: @@ -954,10 +939,8 @@ class Moderation(Scheduler): await ctx.send(":x: There was an error updating the infraction.") return - prev_infraction = previous_object["infraction"] - # Get information about the infraction's user - user_id = int(infraction_object["user"]["user_id"]) + user_id = infraction["user"] user = ctx.guild.get_member(user_id) if user: @@ -968,7 +951,7 @@ class Moderation(Scheduler): thumbnail = None # The infraction's actor - actor_id = int(infraction_object["actor"]["user_id"]) + actor_id = infraction["actor"] actor = ctx.guild.get_member(actor_id) or f"`{actor_id}`" await self.mod_log.send_log_message( @@ -980,55 +963,33 @@ class Moderation(Scheduler): Member: {member_text} Actor: {actor} Edited by: {ctx.message.author} - Previous expiry: {prev_infraction['expires_at']} - New expiry: {infraction_object['expires_at']} + Previous expiry: {previous_infraction['expires_at']} + New expiry: {infraction['expires_at']} """) ) @with_role(*MODERATION_ROLES) @infraction_edit_group.command(name="reason") - async def edit_reason(self, ctx: Context, infraction_id: str, *, reason: str): - """ - Sets the reason of the given infraction. - **`infraction_id`:** The ID (UUID) of the infraction. - **`reason`:** The new reason of the infraction. - """ - + async def edit_reason(self, ctx: Context, infraction_id: int, *, reason: str) -> None: + """Edit the reason of the given infraction.""" try: - previous = await self.bot.http_session.get( - URLs.site_infractions_by_id.format( - infraction_id=infraction_id - ), - headers=self.headers + old_infraction = await self.bot.api_client.get( + 'bot/infractions/' + str(infraction_id) ) - previous_object = await previous.json() - - response = await self.bot.http_session.patch( - URLs.site_infractions, - json={ - "id": infraction_id, - "reason": reason - }, - headers=self.headers + updated_infraction = await self.bot.api_client.patch( + 'bot/infractions/' + str(infraction_id), + json={'reason': reason} ) - response_object = await response.json() - if "error_code" in response_object or response_object.get("success") is False: - return await ctx.send( - ":x: There was an error updating the infraction: " - f"{response_object['error_message']}" - ) - await ctx.send(f":ok_hand: Updated infraction: set reason to \"{reason}\".") + except Exception: log.exception("There was an error updating an infraction.") - return await ctx.send(":x: There was an error updating the infraction.") - - new_infraction = response_object["infraction"] - prev_infraction = previous_object["infraction"] + await ctx.send(":x: There was an error updating the infraction.") + return # Get information about the infraction's user - user_id = int(new_infraction["user"]["user_id"]) + user_id = updated_infraction['user'] user = ctx.guild.get_member(user_id) if user: @@ -1039,7 +1000,7 @@ class Moderation(Scheduler): thumbnail = None # The infraction's actor - actor_id = int(new_infraction["actor"]["user_id"]) + actor_id = updated_infraction['actor'] actor = ctx.guild.get_member(actor_id) or f"`{actor_id}`" await self.mod_log.send_log_message( @@ -1051,8 +1012,8 @@ class Moderation(Scheduler): Member: {user_text} Actor: {actor} Edited by: {ctx.message.author} - Previous reason: {prev_infraction['reason']} - New reason: {new_infraction['reason']} + Previous reason: {old_infraction['reason']} + New reason: {updated_infraction['reason']} """) ) @@ -1061,11 +1022,8 @@ class Moderation(Scheduler): @with_role(*MODERATION_ROLES) @infraction_group.group(name="search", invoke_without_command=True) - async def infraction_search_group(self, ctx: Context, query: InfractionSearchQuery): - """ - Searches for infractions in the database. - """ - + async def infraction_search_group(self, ctx: Context, query: InfractionSearchQuery) -> None: + """Searches for infractions in the database.""" if isinstance(query, User): await ctx.invoke(self.search_user, query) @@ -1074,72 +1032,44 @@ class Moderation(Scheduler): @with_role(*MODERATION_ROLES) @infraction_search_group.command(name="user", aliases=("member", "id")) - async def search_user(self, ctx: Context, user: Union[User, proxy_user]): - """ - Search for infractions by member. - """ - - try: - response = await self.bot.http_session.get( - URLs.site_infractions_user.format( - user_id=user.id - ), - params={"hidden": "True"}, - headers=self.headers - ) - infraction_list = await response.json() - except ClientError: - log.exception(f"Failed to fetch infractions for user {user} ({user.id}).") - await ctx.send(":x: An error occurred while fetching infractions.") - return - + async def search_user(self, ctx: Context, user: Union[User, proxy_user]) -> None: + """Search for infractions by member.""" + infraction_list = await self.bot.api_client.get( + 'bot/infractions', + params={'user__id': str(user.id)} + ) embed = Embed( title=f"Infractions for {user} ({len(infraction_list)} total)", colour=Colour.orange() ) - await self.send_infraction_list(ctx, embed, infraction_list) @with_role(*MODERATION_ROLES) @infraction_search_group.command(name="reason", aliases=("match", "regex", "re")) - async def search_reason(self, ctx: Context, reason: str): - """ - Search for infractions by their reason. Use Re2 for matching. - """ - - try: - response = await self.bot.http_session.get( - URLs.site_infractions, - params={"search": reason, "hidden": "True"}, - headers=self.headers - ) - infraction_list = await response.json() - except ClientError: - log.exception(f"Failed to fetch infractions matching reason `{reason}`.") - await ctx.send(":x: An error occurred while fetching infractions.") - return - + async def search_reason(self, ctx: Context, reason: str) -> None: + """Search for infractions by their reason. Use Re2 for matching.""" + infraction_list = await self.bot.api_client.get( + 'bot/infractions', params={'search': reason} + ) embed = Embed( title=f"Infractions matching `{reason}` ({len(infraction_list)} total)", colour=Colour.orange() ) - await self.send_infraction_list(ctx, embed, infraction_list) # endregion # region: Utility functions - async def send_infraction_list(self, ctx: Context, embed: Embed, infractions: list): - + async def send_infraction_list(self, ctx: Context, embed: Embed, infractions: list) -> None: + """Send a paginated embed of infractions for the specified user.""" if not infractions: await ctx.send(f":warning: No infractions could be found for that query.") return - lines = [] - for infraction in infractions: - lines.append( - self._infraction_to_string(infraction) - ) + lines = tuple( + self._infraction_to_string(infraction) + for infraction in infractions + ) await LinePaginator.paginate( lines, @@ -1153,14 +1083,10 @@ class Moderation(Scheduler): # endregion # region: Utility functions - def schedule_expiration(self, loop: asyncio.AbstractEventLoop, infraction_object: dict): - """ - Schedules a task to expire a temporary infraction. - - :param loop: the asyncio event loop - :param infraction_object: the infraction object to expire at the end of the task - """ - + def schedule_expiration( + self, loop: asyncio.AbstractEventLoop, infraction_object: Dict[str, Union[str, int, bool]] + ) -> None: + """Schedules a task to expire a temporary infraction.""" infraction_id = infraction_object["id"] if infraction_id in self.scheduled_tasks: return @@ -1169,12 +1095,8 @@ class Moderation(Scheduler): self.scheduled_tasks[infraction_id] = task - def cancel_expiration(self, infraction_id: str): - """ - Un-schedules a task set to expire a temporary infraction. - :param infraction_id: the ID of the infraction in question - """ - + def cancel_expiration(self, infraction_id: str) -> None: + """Un-schedules a task set to expire a temporary infraction.""" task = self.scheduled_tasks.get(infraction_id) if task is None: log.warning(f"Failed to unschedule {infraction_id}: no task found.") @@ -1183,19 +1105,17 @@ class Moderation(Scheduler): log.debug(f"Unscheduled {infraction_id}.") del self.scheduled_tasks[infraction_id] - async def _scheduled_task(self, infraction_object: dict): + async def _scheduled_task(self, infraction_object: Dict[str, Union[str, int, bool]]) -> None: """ - A co-routine which marks an infraction as expired after the delay from the time of - scheduling to the time of expiration. At the time of expiration, the infraction is - marked as inactive on the website, and the expiration task is cancelled. + Marks an infraction expired after the delay from time of scheduling to time of expiration. - :param infraction_object: the infraction in question + At the time of expiration, the infraction is marked as inactive on the website, and the + expiration task is cancelled. The user is then notified via DM. """ - infraction_id = infraction_object["id"] # transform expiration to delay in seconds - expiration_datetime = parse_rfc1123(infraction_object["expires_at"]) + expiration_datetime = datetime.fromisoformat(infraction_object["expires_at"][:-1]) await wait_until(expiration_datetime) log.debug(f"Marking infraction {infraction_id} as inactive (expired).") @@ -1204,7 +1124,7 @@ class Moderation(Scheduler): self.cancel_task(infraction_object["id"]) # Notify the user that they've been unmuted. - user_id = int(infraction_object["user"]["user_id"]) + user_id = infraction_object["user"] guild = self.bot.get_guild(constants.Guild.id) await self.notify_pardon( user=guild.get_member(user_id), @@ -1213,16 +1133,14 @@ class Moderation(Scheduler): icon_url=Icons.user_unmute ) - async def _deactivate_infraction(self, infraction_object): + async def _deactivate_infraction(self, infraction_object: Dict[str, Union[str, int, bool]]) -> None: """ - A co-routine which marks an infraction as inactive on the website. This co-routine does - not cancel or un-schedule an expiration task. + A co-routine which marks an infraction as inactive on the website. - :param infraction_object: the infraction in question + This co-routine does not cancel or un-schedule an expiration task. """ - guild: Guild = self.bot.get_guild(constants.Guild.id) - user_id = int(infraction_object["user"]["user_id"]) + user_id = infraction_object["user"] infraction_type = infraction_object["type"] if infraction_type == "mute": @@ -1235,24 +1153,29 @@ class Moderation(Scheduler): log.warning(f"Failed to un-mute user: {user_id} (not found)") elif infraction_type == "ban": user: Object = Object(user_id) - await guild.unban(user) - - await self.bot.http_session.patch( - URLs.site_infractions, - headers=self.headers, - json={ - "id": infraction_object["id"], - "active": False - } + try: + await guild.unban(user) + except NotFound: + log.info(f"Tried to unban user `{user_id}`, but Discord does not have an active ban registered.") + + await self.bot.api_client.patch( + 'bot/infractions/' + str(infraction_object['id']), + json={"active": False} ) - def _infraction_to_string(self, infraction_object): - actor_id = int(infraction_object["actor"]["user_id"]) + def _infraction_to_string(self, infraction_object: Dict[str, Union[str, int, bool]]) -> str: + """Convert the infraction object to a string representation.""" + actor_id = infraction_object["actor"] guild: Guild = self.bot.get_guild(constants.Guild.id) actor = guild.get_member(actor_id) - active = infraction_object["active"] is True - user_id = int(infraction_object["user"]["user_id"]) - hidden = infraction_object.get("hidden", False) is True + active = infraction_object["active"] + user_id = infraction_object["user"] + hidden = infraction_object["hidden"] + created = datetime.fromisoformat(infraction_object["inserted_at"][:-1]).strftime("%Y-%m-%d %H:%M") + if infraction_object["expires_at"] is None: + expires = "*Permanent*" + else: + expires = datetime.fromisoformat(infraction_object["expires_at"][:-1]).strftime("%Y-%m-%d %H:%M") lines = textwrap.dedent(f""" {"**===============**" if active else "==============="} @@ -1261,8 +1184,8 @@ class Moderation(Scheduler): Type: **{infraction_object["type"]}** Shadow: {hidden} Reason: {infraction_object["reason"] or "*None*"} - Created: {infraction_object["inserted_at"]} - Expires: {infraction_object["expires_at"] or "*Permanent*"} + Created: {created} + Expires: {expires} Actor: {actor.mention if actor else actor_id} ID: `{infraction_object["id"]}` {"**===============**" if active else "==============="} @@ -1271,28 +1194,24 @@ class Moderation(Scheduler): return lines.strip() async def notify_infraction( - self, user: Union[User, Member], infr_type: str, duration: str = None, - reason: str = None - ): + self, + user: Union[User, Member], + infr_type: str, + expires_at: Union[datetime, str] = 'N/A', + reason: str = "No reason provided." + ) -> bool: """ - Notify a user of their fresh infraction :) + Attempt to notify a user, via DM, of their fresh infraction. - :param user: The user to send the message to. - :param infr_type: The type of infraction, as a string. - :param duration: The duration of the infraction. - :param reason: The reason for the infraction. + Returns a boolean indicator of whether the DM was successful. """ - - if duration is None: - duration = "N/A" - - if reason is None: - reason = "No reason provided." + if isinstance(expires_at, datetime): + expires_at = expires_at.strftime('%c') embed = Embed( description=textwrap.dedent(f""" **Type:** {infr_type} - **Duration:** {duration} + **Expires:** {expires_at} **Reason:** {reason} """), colour=Colour(Colours.soft_red) @@ -1309,18 +1228,17 @@ class Moderation(Scheduler): return await self.send_private_embed(user, embed) async def notify_pardon( - self, user: Union[User, Member], title: str, content: str, - icon_url: str = Icons.user_verified - ): + self, + user: Union[User, Member], + title: str, + content: str, + icon_url: str = Icons.user_verified + ) -> bool: """ - Notify a user that an infraction has been lifted. + Attempt to notify a user, via DM, of their expired infraction. - :param user: The user to send the message to. - :param title: The title of the embed. - :param content: The content of the embed. - :param icon_url: URL for the title icon. + Optionally returns a boolean indicator of whether the DM was successful. """ - embed = Embed( description=content, colour=Colour(Colours.soft_green) @@ -1330,16 +1248,14 @@ class Moderation(Scheduler): return await self.send_private_embed(user, embed) - async def send_private_embed(self, user: Union[User, Member], embed: Embed): + async def send_private_embed(self, user: Union[User, Member], embed: Embed) -> bool: """ A helper method for sending an embed to a user's DMs. - :param user: The user to send the embed to. - :param embed: The embed to send. + Returns a boolean indicator of DM success. """ - # sometimes `user` is a `discord.Object`, so let's make it a proper user. - user = await self.bot.get_user_info(user.id) + user = await self.bot.fetch_user(user.id) try: await user.send(embed=embed) @@ -1351,7 +1267,8 @@ class Moderation(Scheduler): ) return False - async def log_notify_failure(self, target: str, actor: Member, infraction_type: str): + async def log_notify_failure(self, target: str, actor: Member, infraction_type: str) -> None: + """Send a mod log entry if an attempt to DM the target user has failed.""" await self.mod_log.send_log_message( icon_url=Icons.token_removed, content=actor.mention, @@ -1365,23 +1282,23 @@ class Moderation(Scheduler): # endregion - async def __error(self, ctx, error): + @staticmethod + async def cog_command_error(ctx: Context, error: Exception) -> None: + """Send a notification to the invoking context on a Union failure.""" if isinstance(error, BadUnionArgument): if User in error.converters: await ctx.send(str(error.errors[0])) + error.handled = True - async def respect_role_hierarchy(self, ctx: Context, target: UserTypes, infr_type: str) -> bool: + @staticmethod + async def respect_role_hierarchy(ctx: Context, target: UserTypes, infr_type: str) -> bool: """ Check if the highest role of the invoking member is greater than that of the target member. + If this check fails, a warning is sent to the invoking ctx. Returns True always if target is not a discord.Member instance. - - :param ctx: The command context when invoked. - :param target: The target of the infraction. - :param infr_type: The type of infraction. """ - if not isinstance(target, Member): return True @@ -1400,6 +1317,7 @@ class Moderation(Scheduler): return target_is_lower -def setup(bot): +def setup(bot: Bot) -> None: + """Moderation cog load.""" bot.add_cog(Moderation(bot)) log.info("Cog loaded: Moderation") |