diff options
Diffstat (limited to 'bot/exts/moderation/infraction/infractions.py')
| -rw-r--r-- | bot/exts/moderation/infraction/infractions.py | 278 | 
1 files changed, 191 insertions, 87 deletions
| diff --git a/bot/exts/moderation/infraction/infractions.py b/bot/exts/moderation/infraction/infractions.py index f19323c7c..af42ab1b8 100644 --- a/bot/exts/moderation/infraction/infractions.py +++ b/bot/exts/moderation/infraction/infractions.py @@ -1,4 +1,3 @@ -import logging  import textwrap  import typing as t @@ -10,14 +9,20 @@ from discord.ext.commands import Context, command  from bot import constants  from bot.bot import Bot  from bot.constants import Event -from bot.converters import Duration, Expiry, FetchedMember +from bot.converters import Age, Duration, Expiry, MemberOrUser, UnambiguousMemberOrUser  from bot.decorators import respect_role_hierarchy  from bot.exts.moderation.infraction import _utils  from bot.exts.moderation.infraction._scheduler import InfractionScheduler -from bot.exts.moderation.infraction._utils import UserSnowflake +from bot.log import get_logger +from bot.utils.members import get_or_fetch_member  from bot.utils.messages import format_user -log = logging.getLogger(__name__) +log = get_logger(__name__) + +if t.TYPE_CHECKING: +    from bot.exts.moderation.clean import Clean +    from bot.exts.moderation.infraction.management import ModManagement +    from bot.exts.moderation.watchchannels.bigbrother import BigBrother  class Infractions(InfractionScheduler, commands.Cog): @@ -27,7 +32,7 @@ class Infractions(InfractionScheduler, commands.Cog):      category_description = "Server moderation tools."      def __init__(self, bot: Bot): -        super().__init__(bot, supported_infractions={"ban", "kick", "mute", "note", "warning", "voice_ban"}) +        super().__init__(bot, supported_infractions={"ban", "kick", "mute", "note", "warning", "voice_mute"})          self.category = "Moderation"          self._muted_role = discord.Object(constants.Roles.muted) @@ -54,7 +59,7 @@ class Infractions(InfractionScheduler, commands.Cog):      # region: Permanent infractions      @command() -    async def warn(self, ctx: Context, user: FetchedMember, *, reason: t.Optional[str] = None) -> None: +    async def warn(self, ctx: Context, user: UnambiguousMemberOrUser, *, reason: t.Optional[str] = None) -> None:          """Warn a user for the given reason."""          if not isinstance(user, Member):              await ctx.send(":x: The user doesn't appear to be on the server.") @@ -67,7 +72,7 @@ class Infractions(InfractionScheduler, commands.Cog):          await self.apply_infraction(ctx, infraction, user)      @command() -    async def kick(self, ctx: Context, user: FetchedMember, *, reason: t.Optional[str] = None) -> None: +    async def kick(self, ctx: Context, user: UnambiguousMemberOrUser, *, reason: t.Optional[str] = None) -> None:          """Kick a user for the given reason."""          if not isinstance(user, Member):              await ctx.send(":x: The user doesn't appear to be on the server.") @@ -79,7 +84,7 @@ class Infractions(InfractionScheduler, commands.Cog):      async def ban(          self,          ctx: Context, -        user: FetchedMember, +        user: UnambiguousMemberOrUser,          duration: t.Optional[Expiry] = None,          *,          reason: t.Optional[str] = None @@ -91,37 +96,85 @@ class Infractions(InfractionScheduler, commands.Cog):          """          await self.apply_ban(ctx, user, reason, expires_at=duration) -    @command(aliases=('pban',)) -    async def purgeban( +    @command(aliases=("cban", "purgeban", "pban")) +    async def cleanban(          self,          ctx: Context, -        user: FetchedMember, +        user: UnambiguousMemberOrUser,          duration: t.Optional[Expiry] = None,          *,          reason: t.Optional[str] = None      ) -> None:          """ -        Same as ban but removes all their messages of the last 24 hours. +        Same as ban, but also cleans all their messages from the last hour.          If duration is specified, it temporarily bans that user for the given duration.          """ -        await self.apply_ban(ctx, user, reason, 1, expires_at=duration) +        clean_cog: t.Optional[Clean] = self.bot.get_cog("Clean") +        if clean_cog is None: +            # If we can't get the clean cog, fall back to native purgeban. +            await self.apply_ban(ctx, user, reason, purge_days=1, expires_at=duration) +            return + +        infraction = await self.apply_ban(ctx, user, reason, expires_at=duration) +        if not infraction or not infraction.get("id"): +            # Ban was unsuccessful, quit early. +            await ctx.send(":x: Failed to apply ban.") +            log.error("Failed to apply ban to user %d", user.id) +            return + +        # Calling commands directly skips Discord.py's convertors, so we need to convert args manually. +        clean_time = await Age().convert(ctx, "1h") + +        log_url = await clean_cog._clean_messages( +            ctx, +            users=[user], +            channels="*", +            first_limit=clean_time, +            attempt_delete_invocation=False, +        ) +        if not log_url: +            # Cleaning failed, or there were no messages to clean, exit early. +            return + +        infr_manage_cog: t.Optional[ModManagement] = self.bot.get_cog("ModManagement") +        if infr_manage_cog is None: +            # If we can't get the mod management cog, don't bother appending the log. +            return + +        # Overwrite the context's send function so infraction append +        # doesn't output the update infraction confirmation message. +        async def send(*args, **kwargs) -> None: +            pass +        ctx.send = send +        await infr_manage_cog.infraction_append(ctx, infraction, None, reason=f"[Clean log]({log_url})") + +    @command(aliases=("vban",)) +    async def voiceban(self, ctx: Context) -> None: +        """ +        NOT IMPLEMENTED. -    @command(aliases=('vban',)) -    async def voiceban( +        Permanently ban a user from joining voice channels. + +        If duration is specified, it temporarily voice bans that user for the given duration. +        """ +        await ctx.send(":x: This command is not yet implemented. Maybe you meant to use `voicemute`?") + +    @command(aliases=("vmute",)) +    async def voicemute(          self,          ctx: Context, -        user: FetchedMember, +        user: UnambiguousMemberOrUser,          duration: t.Optional[Expiry] = None,          *,          reason: t.Optional[str]      ) -> None:          """ -        Permanently ban user from using voice channels. +        Permanently mute user in voice channels. -        If duration is specified, it temporarily voice bans that user for the given duration. +        If duration is specified, it temporarily voice mutes that user for the given duration.          """ -        await self.apply_voice_ban(ctx, user, reason, expires_at=duration) +        await self.apply_voice_mute(ctx, user, reason, expires_at=duration)      # endregion      # region: Temporary infractions @@ -129,7 +182,7 @@ class Infractions(InfractionScheduler, commands.Cog):      @command(aliases=["mute"])      async def tempmute(          self, ctx: Context, -        user: FetchedMember, +        user: UnambiguousMemberOrUser,          duration: t.Optional[Expiry] = None,          *,          reason: t.Optional[str] = None @@ -163,7 +216,7 @@ class Infractions(InfractionScheduler, commands.Cog):      async def tempban(          self,          ctx: Context, -        user: FetchedMember, +        user: UnambiguousMemberOrUser,          duration: Expiry,          *,          reason: t.Optional[str] = None @@ -186,16 +239,25 @@ class Infractions(InfractionScheduler, commands.Cog):          await self.apply_ban(ctx, user, reason, expires_at=duration)      @command(aliases=("tempvban", "tvban")) -    async def tempvoiceban( -            self, -            ctx: Context, -            user: FetchedMember, -            duration: Expiry, -            *, -            reason: t.Optional[str] +    async def tempvoiceban(self, ctx: Context) -> None: +        """ +        NOT IMPLEMENTED. + +        Temporarily voice bans that user for the given duration. +        """ +        await ctx.send(":x: This command is not yet implemented. Maybe you meant to use `tempvoicemute`?") + +    @command(aliases=("tempvmute", "tvmute")) +    async def tempvoicemute( +        self, +        ctx: Context, +        user: UnambiguousMemberOrUser, +        duration: Expiry, +        *, +        reason: t.Optional[str]      ) -> None:          """ -        Temporarily voice ban a user for the given reason and duration. +        Temporarily voice mute a user for the given reason and duration.          A unit of time should be appended to the duration.          Units (∗case-sensitive): @@ -209,13 +271,13 @@ class Infractions(InfractionScheduler, commands.Cog):          Alternatively, an ISO 8601 timestamp can be provided for the duration.          """ -        await self.apply_voice_ban(ctx, user, reason, expires_at=duration) +        await self.apply_voice_mute(ctx, user, reason, expires_at=duration)      # endregion      # region: Permanent shadow infractions      @command(hidden=True) -    async def note(self, ctx: Context, user: FetchedMember, *, reason: t.Optional[str] = None) -> None: +    async def note(self, ctx: Context, user: UnambiguousMemberOrUser, *, reason: t.Optional[str] = None) -> None:          """Create a private note for a user with the given reason without notifying the user."""          infraction = await _utils.post_infraction(ctx, user, "note", reason, hidden=True, active=False)          if infraction is None: @@ -224,7 +286,7 @@ class Infractions(InfractionScheduler, commands.Cog):          await self.apply_infraction(ctx, infraction, user)      @command(hidden=True, aliases=['shadowban', 'sban']) -    async def shadow_ban(self, ctx: Context, user: FetchedMember, *, reason: t.Optional[str] = None) -> None: +    async def shadow_ban(self, ctx: Context, user: UnambiguousMemberOrUser, *, reason: t.Optional[str] = None) -> None:          """Permanently ban a user for the given reason without notifying the user."""          await self.apply_ban(ctx, user, reason, hidden=True) @@ -235,7 +297,7 @@ class Infractions(InfractionScheduler, commands.Cog):      async def shadow_tempban(          self,          ctx: Context, -        user: FetchedMember, +        user: UnambiguousMemberOrUser,          duration: Expiry,          *,          reason: t.Optional[str] = None @@ -261,27 +323,47 @@ class Infractions(InfractionScheduler, commands.Cog):      # region: Remove infractions (un- commands)      @command() -    async def unmute(self, ctx: Context, user: FetchedMember) -> None: +    async def unmute(self, ctx: Context, user: UnambiguousMemberOrUser) -> None:          """Prematurely end the active mute infraction for the user."""          await self.pardon_infraction(ctx, "mute", user)      @command() -    async def unban(self, ctx: Context, user: FetchedMember) -> None: +    async def unban(self, ctx: Context, user: UnambiguousMemberOrUser) -> None:          """Prematurely end the active ban infraction for the user."""          await self.pardon_infraction(ctx, "ban", user)      @command(aliases=("uvban",)) -    async def unvoiceban(self, ctx: Context, user: FetchedMember) -> None: -        """Prematurely end the active voice ban infraction for the user.""" -        await self.pardon_infraction(ctx, "voice_ban", user) +    async def unvoiceban(self, ctx: Context) -> None: +        """ +        NOT IMPLEMENTED. + +        Temporarily voice bans that user for the given duration. +        """ +        await ctx.send(":x: This command is not yet implemented. Maybe you meant to use `unvoicemute`?") + +    @command(aliases=("uvmute",)) +    async def unvoicemute(self, ctx: Context, user: UnambiguousMemberOrUser) -> None: +        """Prematurely end the active voice mute infraction for the user.""" +        await self.pardon_infraction(ctx, "voice_mute", user)      # endregion      # region: Base apply functions      async def apply_mute(self, ctx: Context, user: Member, reason: t.Optional[str], **kwargs) -> None:          """Apply a mute infraction with kwargs passed to `post_infraction`.""" -        if await _utils.get_active_infraction(ctx, user, "mute"): -            return +        if active := await _utils.get_active_infraction(ctx, user, "mute", send_msg=False): +            if active["actor"] != self.bot.user.id: +                await _utils.send_active_infraction_message(ctx, active) +                return + +            # Allow the current mute attempt to override an automatically triggered mute. +            log_text = await self.deactivate_infraction(active, notify=False) +            if "Failure" in log_text: +                await ctx.send( +                    f":x: can't override infraction **mute** for {user.mention}: " +                    f"failed to deactivate. {log_text['Failure']}" +                ) +                return          infraction = await _utils.post_infraction(ctx, user, "mute", reason, active=True, **kwargs)          if infraction is None: @@ -304,6 +386,10 @@ class Infractions(InfractionScheduler, commands.Cog):      @respect_role_hierarchy(member_arg=2)      async def apply_kick(self, ctx: Context, user: Member, reason: t.Optional[str], **kwargs) -> None:          """Apply a kick infraction with kwargs passed to `post_infraction`.""" +        if user.top_role >= ctx.me.top_role: +            await ctx.send(":x: I can't kick users above or equal to me in the role hierarchy.") +            return +          infraction = await _utils.post_infraction(ctx, user, "kick", reason, active=False, **kwargs)          if infraction is None:              return @@ -320,16 +406,20 @@ class Infractions(InfractionScheduler, commands.Cog):      async def apply_ban(          self,          ctx: Context, -        user: UserSnowflake, +        user: MemberOrUser,          reason: t.Optional[str],          purge_days: t.Optional[int] = 0,          **kwargs -    ) -> None: +    ) -> t.Optional[dict]:          """          Apply a ban infraction with kwargs passed to `post_infraction`.          Will also remove the banned user from the Big Brother watch list if applicable.          """ +        if isinstance(user, Member) and user.top_role >= ctx.me.top_role: +            await ctx.send(":x: I can't ban users above or equal to me in the role hierarchy.") +            return None +          # In the case of a permanent ban, we don't need get_active_infractions to tell us if one is active          is_temporary = kwargs.get("expires_at") is not None          active_infraction = await _utils.get_active_infraction(ctx, user, "ban", is_temporary) @@ -337,19 +427,19 @@ class Infractions(InfractionScheduler, commands.Cog):          if active_infraction:              if is_temporary:                  log.trace("Tempban ignored as it cannot overwrite an active ban.") -                return +                return None              if active_infraction.get('expires_at') is None:                  log.trace("Permaban already exists, notify.")                  await ctx.send(f":x: User is already permanently banned (#{active_infraction['id']}).") -                return +                return None              log.trace("Old tempban is being replaced by new permaban.") -            await self.pardon_infraction(ctx, "ban", user, is_temporary) +            await self.pardon_infraction(ctx, "ban", user, send_msg=is_temporary)          infraction = await _utils.post_infraction(ctx, user, "ban", reason, active=True, **kwargs)          if infraction is None: -            return +            return None          infraction["purge"] = "purge " if purge_days else "" @@ -361,27 +451,25 @@ class Infractions(InfractionScheduler, commands.Cog):          action = ctx.guild.ban(user, reason=reason, delete_message_days=purge_days)          await self.apply_infraction(ctx, infraction, user, action) +        bb_cog: t.Optional[BigBrother] = self.bot.get_cog("Big Brother")          if infraction.get('expires_at') is not None:              log.trace(f"Ban isn't permanent; user {user} won't be unwatched by Big Brother.") -            return - -        bb_cog = self.bot.get_cog("Big Brother") -        if not bb_cog: +        elif not bb_cog:              log.error(f"Big Brother cog not loaded; perma-banned user {user} won't be unwatched.") -            return - -        log.trace(f"Big Brother cog loaded; attempting to unwatch perma-banned user {user}.") +        else: +            log.trace(f"Big Brother cog loaded; attempting to unwatch perma-banned user {user}.") +            bb_reason = "User has been permanently banned from the server. Automatically removed." +            await bb_cog.apply_unwatch(ctx, user, bb_reason, send_message=False) -        bb_reason = "User has been permanently banned from the server. Automatically removed." -        await bb_cog.apply_unwatch(ctx, user, bb_reason, send_message=False) +        return infraction      @respect_role_hierarchy(member_arg=2) -    async def apply_voice_ban(self, ctx: Context, user: UserSnowflake, reason: t.Optional[str], **kwargs) -> None: -        """Apply a voice ban infraction with kwargs passed to `post_infraction`.""" -        if await _utils.get_active_infraction(ctx, user, "voice_ban"): +    async def apply_voice_mute(self, ctx: Context, user: MemberOrUser, reason: t.Optional[str], **kwargs) -> None: +        """Apply a voice mute infraction with kwargs passed to `post_infraction`.""" +        if await _utils.get_active_infraction(ctx, user, "voice_mute"):              return -        infraction = await _utils.post_infraction(ctx, user, "voice_ban", reason, active=True, **kwargs) +        infraction = await _utils.post_infraction(ctx, user, "voice_mute", reason, active=True, **kwargs)          if infraction is None:              return @@ -395,7 +483,7 @@ class Infractions(InfractionScheduler, commands.Cog):              if not isinstance(user, Member):                  return -            await user.move_to(None, reason="Disconnected from voice to apply voiceban.") +            await user.move_to(None, reason="Disconnected from voice to apply voice mute.")              await user.remove_roles(self._voice_verified_role, reason=reason)          await self.apply_infraction(ctx, infraction, user, action()) @@ -403,9 +491,16 @@ class Infractions(InfractionScheduler, commands.Cog):      # endregion      # region: Base pardon functions -    async def pardon_mute(self, user_id: int, guild: discord.Guild, reason: t.Optional[str]) -> t.Dict[str, str]: -        """Remove a user's muted role, DM them a notification, and return a log dict.""" -        user = guild.get_member(user_id) +    async def pardon_mute( +        self, +        user_id: int, +        guild: discord.Guild, +        reason: t.Optional[str], +        *, +        notify: bool = True +    ) -> t.Dict[str, str]: +        """Remove a user's muted role, optionally DM them a notification, and return a log dict.""" +        user = await get_or_fetch_member(guild, user_id)          log_text = {}          if user: @@ -413,16 +508,17 @@ class Infractions(InfractionScheduler, commands.Cog):              self.mod_log.ignore(Event.member_update, user.id)              await user.remove_roles(self._muted_role, reason=reason) -            # DM the user about the expiration. -            notified = await _utils.notify_pardon( -                user=user, -                title="You have been unmuted", -                content="You may now send messages in the server.", -                icon_url=_utils.INFRACTION_ICONS["mute"][1] -            ) +            if notify: +                # DM the user about the expiration. +                notified = await _utils.notify_pardon( +                    user=user, +                    title="You have been unmuted", +                    content="You may now send messages in the server.", +                    icon_url=_utils.INFRACTION_ICONS["mute"][1] +                ) +                log_text["DM"] = "Sent" if notified else "**Failed**"              log_text["Member"] = format_user(user) -            log_text["DM"] = "Sent" if notified else "**Failed**"          else:              log.info(f"Failed to unmute user {user_id}: user not found")              log_text["Failure"] = "User was not found in the guild." @@ -444,31 +540,39 @@ class Infractions(InfractionScheduler, commands.Cog):          return log_text -    async def pardon_voice_ban(self, user_id: int, guild: discord.Guild, reason: t.Optional[str]) -> t.Dict[str, str]: -        """Add Voice Verified role back to user, DM them a notification, and return a log dict.""" -        user = guild.get_member(user_id) +    async def pardon_voice_mute( +        self, +        user_id: int, +        guild: discord.Guild, +        *, +        notify: bool = True +    ) -> t.Dict[str, str]: +        """Optionally DM the user a pardon notification and return a log dict.""" +        user = await get_or_fetch_member(guild, user_id)          log_text = {}          if user: -            # DM user about infraction expiration -            notified = await _utils.notify_pardon( -                user=user, -                title="Voice ban ended", -                content="You have been unbanned and can verify yourself again in the server.", -                icon_url=_utils.INFRACTION_ICONS["voice_ban"][1] -            ) +            if notify: +                # DM user about infraction expiration +                notified = await _utils.notify_pardon( +                    user=user, +                    title="Voice mute ended", +                    content="You have been unmuted and can verify yourself again in the server.", +                    icon_url=_utils.INFRACTION_ICONS["voice_mute"][1] +                ) +                log_text["DM"] = "Sent" if notified else "**Failed**"              log_text["Member"] = format_user(user) -            log_text["DM"] = "Sent" if notified else "**Failed**"          else:              log_text["Info"] = "User was not found in the guild."          return log_text -    async def _pardon_action(self, infraction: _utils.Infraction) -> t.Optional[t.Dict[str, str]]: +    async def _pardon_action(self, infraction: _utils.Infraction, notify: bool) -> t.Optional[t.Dict[str, str]]:          """          Execute deactivation steps specific to the infraction's type and return a log dict. +        If `notify` is True, notify the user of the pardon via DM where applicable.          If an infraction type is unsupported, return None instead.          """          guild = self.bot.get_guild(constants.Guild.id) @@ -476,11 +580,11 @@ class Infractions(InfractionScheduler, commands.Cog):          reason = f"Infraction #{infraction['id']} expired or was pardoned."          if infraction["type"] == "mute": -            return await self.pardon_mute(user_id, guild, reason) +            return await self.pardon_mute(user_id, guild, reason, notify=notify)          elif infraction["type"] == "ban":              return await self.pardon_ban(user_id, guild, reason) -        elif infraction["type"] == "voice_ban": -            return await self.pardon_voice_ban(user_id, guild, reason) +        elif infraction["type"] == "voice_mute": +            return await self.pardon_voice_mute(user_id, guild, notify=notify)      # endregion @@ -493,7 +597,7 @@ class Infractions(InfractionScheduler, commands.Cog):      async def cog_command_error(self, ctx: Context, error: Exception) -> None:          """Send a notification to the invoking context on a Union failure."""          if isinstance(error, commands.BadUnionArgument): -            if discord.User in error.converters or discord.Member in error.converters: +            if discord.User in error.converters or Member in error.converters:                  await ctx.send(str(error.errors[0]))                  error.handled = True | 
