diff options
| -rw-r--r-- | bot/cogs/moderation/infractions.py | 221 | 
1 files changed, 91 insertions, 130 deletions
| diff --git a/bot/cogs/moderation/infractions.py b/bot/cogs/moderation/infractions.py index 84a58df66..59276279e 100644 --- a/bot/cogs/moderation/infractions.py +++ b/bot/cogs/moderation/infractions.py @@ -161,139 +161,12 @@ class Infractions(Scheduler, Cog):      @command()      async def unmute(self, ctx: Context, user: MemberConverter) -> None:          """Deactivates the active mute infraction for a user.""" -        try: -            # check the current active infraction -            response = await self.bot.api_client.get( -                'bot/infractions', -                params={ -                    'active': 'true', -                    'type': 'mute', -                    'user__id': user.id -                } -            ) -            if len(response) > 1: -                log.warning("Found more than one active mute infraction for user `%d`", user.id) - -            if not response: -                # no active infraction -                await ctx.send( -                    f":x: There is no active mute infraction for user {user.mention}." -                ) -                return - -            for infraction in response: -                await self.deactivate_infraction(infraction) -                if infraction["expires_at"] is not None: -                    self.cancel_expiration(infraction["id"]) - -            notified = await self.notify_pardon( -                user=user, -                title="You have been unmuted.", -                content="You may now send messages in the server.", -                icon_url=Icons.user_unmute -            ) - -            if notified: -                dm_status = "Sent" -                dm_emoji = ":incoming_envelope: " -                log_content = None -            else: -                dm_status = "**Failed**" -                dm_emoji = "" -                log_content = ctx.author.mention - -            await ctx.send(f"{dm_emoji}:ok_hand: Un-muted {user.mention}.") - -            embed_text = textwrap.dedent( -                f""" -                    Member: {user.mention} (`{user.id}`) -                    Actor: {ctx.message.author} -                    DM: {dm_status} -                """ -            ) - -            if len(response) > 1: -                footer = f"Infraction IDs: {', '.join(str(infr['id']) for infr in response)}" -                title = "Member unmuted" -                embed_text += "Note: User had multiple **active** mute infractions in the database." -            else: -                infraction = response[0] -                footer = f"Infraction ID: {infraction['id']}" -                title = "Member unmuted" - -            # Send a log message to the mod log -            await self.mod_log.send_log_message( -                icon_url=Icons.user_unmute, -                colour=Colour(Colours.soft_green), -                title=title, -                thumbnail=user.avatar_url_as(static_format="png"), -                text=embed_text, -                footer=footer, -                content=log_content -            ) -        except Exception: -            log.exception("There was an error removing an infraction.") -            await ctx.send(":x: There was an error removing the infraction.") +        await self.pardon_infraction(ctx, "mute", user)      @command()      async def unban(self, ctx: Context, user: MemberConverter) -> None:          """Deactivates the active ban infraction for a user.""" -        try: -            # check the current active infraction -            response = await self.bot.api_client.get( -                'bot/infractions', -                params={ -                    'active': 'true', -                    'type': 'ban', -                    'user__id': str(user.id) -                } -            ) -            if len(response) > 1: -                log.warning( -                    "More than one active ban infraction found for user `%d`.", -                    user.id -                ) - -            if not response: -                # no active infraction -                await ctx.send( -                    f":x: There is no active ban infraction for user {user.mention}." -                ) -                return - -            for infraction in response: -                await self.deactivate_infraction(infraction) -                if infraction["expires_at"] is not None: -                    self.cancel_expiration(infraction["id"]) - -            embed_text = textwrap.dedent( -                f""" -                    Member: {user.mention} (`{user.id}`) -                    Actor: {ctx.message.author} -                """ -            ) - -            if len(response) > 1: -                footer = f"Infraction IDs: {', '.join(str(infr['id']) for infr in response)}" -                embed_text += "Note: User had multiple **active** ban infractions in the database." -            else: -                infraction = response[0] -                footer = f"Infraction ID: {infraction['id']}" - -            await ctx.send(f":ok_hand: Un-banned {user.mention}.") - -            # Send a log message to the mod log -            await self.mod_log.send_log_message( -                icon_url=Icons.user_unban, -                colour=Colour(Colours.soft_green), -                title="Member unbanned", -                thumbnail=user.avatar_url_as(static_format="png"), -                text=embed_text, -                footer=footer, -            ) -        except Exception: -            log.exception("There was an error removing an infraction.") -            await ctx.send(":x: There was an error removing the infraction.") +        await self.pardon_infraction(ctx, "ban", user)      # endregion      # region: Base infraction functions @@ -422,7 +295,7 @@ class Infractions(Scheduler, Cog):                          icon_url=INFRACTION_ICONS["mute"][1]                      ) -                    log_text["DM"] = "Sent" if notified else "Failed" +                    log_text["DM"] = "Sent" if notified else "**Failed**"                  else:                      log.info(f"Failed to unmute user {user_id}: user not found")                      log_text["Failure"] = "User was not found in the guild." @@ -605,6 +478,94 @@ class Infractions(Scheduler, Cog):              footer=f"ID {infraction['id']}"          ) +    async def pardon_infraction(self, ctx: Context, infr_type: str, user: MemberObject) -> None: +        """Prematurely end an infraction for a user and log the action in the mod log.""" +        # Check the current active infraction +        response = await self.bot.api_client.get( +            'bot/infractions', +            params={ +                'active': 'true', +                'type': infr_type, +                'user__id': user.id +            } +        ) + +        if not response: +            await ctx.send(f":x: There's no active {infr_type} infraction for user {user.mention}.") +            return + +        # Deactivate the infraction and cancel its scheduled expiration task. +        log_text = await self.deactivate_infraction(response[0], send_log=False) +        if response[0]["expires_at"] is not None: +            self.cancel_expiration(response[0]["id"]) + +        log_text["Member"] = f"{user.mention}(`{user.id}`)" +        log_text["Actor"] = str(ctx.message.author) +        log_content = None +        footer = f"Infraction ID: {response[0]['id']}" + +        # If multiple active infractions were found, mark them as inactive in the database +        # and cancel their expiration tasks. +        if len(response) > 1: +            log.warning(f"Found more than one active {infr_type} infraction for user {user.id}") + +            footer = f"Infraction IDs: {', '.join(str(infr['id']) for infr in response)}" +            log_text["Note"] = f"Found multiple **active** {infr_type} infractions in the database." + +            # deactivate_infraction() is not called again because: +            #     1. Discord cannot store multiple active bans or assign multiples of the same role +            #     2. It would send a pardon DM for each active infraction, which is redundant +            for infraction in response[1:]: +                _id = infraction['id'] +                try: +                    # Mark infraction as inactive in the database. +                    await self.bot.api_client.patch( +                        f"bot/infractions/{_id}", +                        json={"active": False} +                    ) +                except ResponseCodeError: +                    log.exception(f"Failed to deactivate infraction #{_id} ({infr_type})") +                    # This is simpler and cleaner than trying to concatenate all the errors. +                    log_text["Failure"] = "See bot's logs for details." + +                # Cancel pending expiration tasks. +                if infraction["expires_at"] is not None: +                    self.cancel_expiration(infraction["id"]) + +        # Accordingly display whether the user was successfully notified via DM. +        dm_emoji = "" +        if log_text.get("DM") == "Sent": +            dm_emoji = ":incoming_envelope: " +        elif "DM" in log_text: +            # Mention the actor because the DM failed to send. +            log_content = ctx.author.mention + +        # Accordingly display whether the pardon failed. +        if "Failure" in log_text: +            confirm_msg = ":x: failed to pardon" +            log_title = "pardon failed" +            log_content = ctx.author.mention +        else: +            confirm_msg = f":ok_hand: pardoned" +            log_title = "pardoned" + +        # Send the confirmation message to the invoking context. +        await ctx.send( +            f"{dm_emoji}{confirm_msg} infraction **{infr_type}** for {user.mention}. " +            f"{log_text.get('Failure', '')}" +        ) + +        # Send a log message to the mod log. +        await self.mod_log.send_log_message( +            icon_url=INFRACTION_ICONS[infr_type][1], +            colour=Colour(Colours.soft_green), +            title=f"Infraction {log_title}: {infr_type}", +            thumbnail=user.avatar_url_as(static_format="png"), +            text="\n".join(f"{k}: {v}" for k, v in log_text), +            footer=footer, +            content=log_content, +        ) +      # endregion      # This cannot be static (must have a __func__ attribute). | 
