diff options
| -rw-r--r-- | bot/cogs/moderation/infractions.py | 422 | ||||
| -rw-r--r-- | bot/cogs/moderation/scheduler.py | 368 | ||||
| -rw-r--r-- | bot/cogs/moderation/superstarify.py | 284 | ||||
| -rw-r--r-- | bot/cogs/moderation/utils.py | 9 | ||||
| -rw-r--r-- | bot/cogs/watchchannels/bigbrother.py | 10 | ||||
| -rw-r--r-- | bot/cogs/watchchannels/talentpool.py | 17 | ||||
| -rw-r--r-- | bot/constants.py | 3 | ||||
| -rw-r--r-- | config-default.yml | 3 | ||||
| -rw-r--r-- | tests/bot/rules/test_links.py | 101 | 
9 files changed, 677 insertions, 540 deletions
| diff --git a/bot/cogs/moderation/infractions.py b/bot/cogs/moderation/infractions.py index 997ffe524..2713a1b68 100644 --- a/bot/cogs/moderation/infractions.py +++ b/bot/cogs/moderation/infractions.py @@ -1,24 +1,17 @@  import logging -import textwrap  import typing as t -from datetime import datetime -from gettext import ngettext -import dateutil.parser  import discord  from discord import Member  from discord.ext import commands  from discord.ext.commands import Context, command  from bot import constants -from bot.api import ResponseCodeError -from bot.constants import Colours, Event, STAFF_CHANNELS +from bot.constants import Event  from bot.decorators import respect_role_hierarchy -from bot.utils import time  from bot.utils.checks import with_role_check -from bot.utils.scheduling import Scheduler  from . import utils -from .modlog import ModLog +from .scheduler import InfractionScheduler  from .utils import MemberObject  log = logging.getLogger(__name__) @@ -26,67 +19,35 @@ log = logging.getLogger(__name__)  MemberConverter = t.Union[utils.UserTypes, utils.proxy_user] -class Infractions(Scheduler, commands.Cog): +class Infractions(InfractionScheduler, commands.Cog):      """Apply and pardon infractions on users for moderation purposes."""      category = "Moderation"      category_description = "Server moderation tools."      def __init__(self, bot: commands.Bot): -        super().__init__() +        super().__init__(bot, supported_infractions={"ban", "kick", "mute", "note", "warning"}) -        self.bot = bot          self.category = "Moderation"          self._muted_role = discord.Object(constants.Roles.muted) -        self.bot.loop.create_task(self.reschedule_infractions()) - -    @property -    def mod_log(self) -> ModLog: -        """Get currently loaded ModLog cog instance.""" -        return self.bot.get_cog("ModLog") - -    async def reschedule_infractions(self) -> None: -        """Schedule expiration for previous infractions.""" -        await self.bot.wait_until_ready() - -        infractions = await self.bot.api_client.get( -            'bot/infractions', -            params={'active': 'true'} -        ) -        for infraction in infractions: -            if infraction["expires_at"] is not None: -                self.schedule_task(self.bot.loop, infraction["id"], infraction) -      @commands.Cog.listener()      async def on_member_join(self, member: Member) -> None:          """Reapply active mute infractions for returning members."""          active_mutes = await self.bot.api_client.get( -            'bot/infractions', +            "bot/infractions",              params={ -                'user__id': str(member.id), -                'type': 'mute', -                'active': 'true' +                "active": "true", +                "type": "mute", +                "user__id": member.id              }          ) -        if not active_mutes: -            return -        # Assume a single mute because of restrictions elsewhere. -        mute = active_mutes[0] +        if active_mutes: +            reason = f"Re-applying active mute: {active_mutes[0]['id']}" +            action = member.add_roles(self._muted_role, reason=reason) -        # Calculate the time remaining, in seconds, for the mute. -        expiry = dateutil.parser.isoparse(mute["expires_at"]).replace(tzinfo=None) -        delta = (expiry - datetime.utcnow()).total_seconds() - -        # Mark as inactive if less than a minute remains. -        if delta < 60: -            await self.deactivate_infraction(mute) -            return - -        # Allowing mod log since this is a passive action that should be logged. -        await member.add_roles(self._muted_role, reason=f"Re-applying active mute: {mute['id']}") -        log.debug(f"User {member.id} has been re-muted on rejoin.") +            await self.reapply_infraction(active_mutes[0], action)      # region: Permanent infractions @@ -234,7 +195,7 @@ class Infractions(Scheduler, commands.Cog):          await self.pardon_infraction(ctx, "ban", user)      # endregion -    # region: Base infraction functions +    # region: Base apply functions      async def apply_mute(self, ctx: Context, user: Member, reason: str, **kwargs) -> None:          """Apply a mute infraction with kwargs passed to `post_infraction`.""" @@ -278,328 +239,63 @@ class Infractions(Scheduler, commands.Cog):          await self.apply_infraction(ctx, infraction, user, action)      # endregion -    # region: Utility functions - -    async def _scheduled_task(self, infraction: utils.Infraction) -> None: -        """ -        Marks an infraction expired after the delay from time of scheduling to time of expiration. - -        At the time of expiration, the infraction is marked as inactive on the website and the -        expiration task is cancelled. -        """ -        _id = infraction["id"] - -        expiry = dateutil.parser.isoparse(infraction["expires_at"]).replace(tzinfo=None) -        await time.wait_until(expiry) - -        log.debug(f"Marking infraction {_id} as inactive (expired).") -        await self.deactivate_infraction(infraction) - -    async def deactivate_infraction( -        self, -        infraction: utils.Infraction, -        send_log: bool = True -    ) -> t.Dict[str, str]: -        """ -        Deactivate an active infraction and return a dictionary of lines to send in a mod log. - -        The infraction is removed from Discord, marked as inactive in the database, and has its -        expiration task cancelled. If `send_log` is True, a mod log is sent for the -        deactivation of the infraction. - -        Supported infraction types are mute and ban. Other types will raise a ValueError. -        """ -        guild = self.bot.get_guild(constants.Guild.id) -        mod_role = guild.get_role(constants.Roles.moderator) -        user_id = infraction["user"] -        _type = infraction["type"] -        _id = infraction["id"] -        reason = f"Infraction #{_id} expired or was pardoned." - -        log.debug(f"Marking infraction #{_id} as inactive (expired).") - -        log_content = None -        log_text = { -            "Member": str(user_id), -            "Actor": str(self.bot.user), -            "Reason": infraction["reason"] -        } - -        try: -            if _type == "mute": -                user = guild.get_member(user_id) -                if user: -                    # Remove the muted role. -                    self.mod_log.ignore(Event.member_update, user.id) -                    await user.remove_roles(self._muted_role, reason=reason) - -                    # DM the user about the expiration. -                    notified = await utils.notify_pardon( -                        user=user, -                        title="You have been unmuted.", -                        content="You may now send messages in the server.", -                        icon_url=utils.INFRACTION_ICONS["mute"][1] -                    ) - -                    log_text["Member"] = f"{user.mention}(`{user.id}`)" -                    log_text["DM"] = "Sent" if notified else "**Failed**" -                else: -                    log.info(f"Failed to unmute user {user_id}: user not found") -                    log_text["Failure"] = "User was not found in the guild." -            elif _type == "ban": -                user = discord.Object(user_id) -                self.mod_log.ignore(Event.member_unban, user_id) -                try: -                    await guild.unban(user, reason=reason) -                except discord.NotFound: -                    log.info(f"Failed to unban user {user_id}: no active ban found on Discord") -                    log_text["Note"] = "No active ban found on Discord." -            else: -                raise ValueError( -                    f"Attempted to deactivate an unsupported infraction #{_id} ({_type})!" -                ) -        except discord.Forbidden: -            log.warning(f"Failed to deactivate infraction #{_id} ({_type}): bot lacks permissions") -            log_text["Failure"] = f"The bot lacks permissions to do this (role hierarchy?)" -            log_content = mod_role.mention -        except discord.HTTPException as e: -            log.exception(f"Failed to deactivate infraction #{_id} ({_type})") -            log_text["Failure"] = f"HTTPException with code {e.code}." -            log_content = mod_role.mention - -        # Check if the user is currently being watched by Big Brother. -        try: -            active_watch = await self.bot.api_client.get( -                "bot/infractions", -                params={ -                    "active": "true", -                    "type": "watch", -                    "user__id": user_id -                } +    # region: Base pardon functions + +    async def pardon_mute(self, user_id: int, guild: discord.Guild, reason: str) -> t.Dict[str, str]: +        """Remove a user's muted role, DM them a notification, and return a log dict.""" +        user = guild.get_member(user_id) +        log_text = {} + +        if user: +            # Remove the muted role. +            self.mod_log.ignore(Event.member_update, user.id) +            await user.remove_roles(self._muted_role, reason=reason) + +            # DM the user about the expiration. +            notified = await utils.notify_pardon( +                user=user, +                title="You have been unmuted", +                content="You may now send messages in the server.", +                icon_url=utils.INFRACTION_ICONS["mute"][1]              ) -            log_text["Watching"] = "Yes" if active_watch else "No" -        except ResponseCodeError: -            log.exception(f"Failed to fetch watch status for user {user_id}") -            log_text["Watching"] = "Unknown - failed to fetch watch status." - -        try: -            # Mark infraction as inactive in the database. -            await self.bot.api_client.patch( -                f"bot/infractions/{_id}", -                json={"active": False} -            ) -        except ResponseCodeError as e: -            log.exception(f"Failed to deactivate infraction #{_id} ({_type})") -            log_line = f"API request failed with code {e.status}." -            log_content = mod_role.mention - -            # Append to an existing failure message if possible -            if "Failure" in log_text: -                log_text["Failure"] += f" {log_line}" -            else: -                log_text["Failure"] = log_line - -        # Cancel the expiration task. -        if infraction["expires_at"] is not None: -            self.cancel_task(infraction["id"]) - -        # Send a log message to the mod log. -        if send_log: -            log_title = f"expiration failed" if "Failure" in log_text else "expired" - -            await self.mod_log.send_log_message( -                icon_url=utils.INFRACTION_ICONS[_type][1], -                colour=Colours.soft_green, -                title=f"Infraction {log_title}: {_type}", -                text="\n".join(f"{k}: {v}" for k, v in log_text.items()), -                footer=f"ID: {_id}", -                content=log_content, -            ) +            log_text["Member"] = f"{user.mention}(`{user.id}`)" +            log_text["DM"] = "Sent" if notified else "**Failed**" +        else: +            log.info(f"Failed to unmute user {user_id}: user not found") +            log_text["Failure"] = "User was not found in the guild."          return log_text -    async def apply_infraction( -        self, -        ctx: Context, -        infraction: utils.Infraction, -        user: MemberObject, -        action_coro: t.Optional[t.Awaitable] = None -    ) -> None: -        """Apply an infraction to the user, log the infraction, and optionally notify the user.""" -        infr_type = infraction["type"] -        icon = utils.INFRACTION_ICONS[infr_type][0] -        reason = infraction["reason"] -        expiry = infraction["expires_at"] - -        if expiry: -            expiry = time.format_infraction(expiry) - -        # Default values for the confirmation message and mod log. -        confirm_msg = f":ok_hand: applied" - -        # Specifying an expiry for a note or warning makes no sense. -        if infr_type in ("note", "warning"): -            expiry_msg = "" -        else: -            expiry_msg = f" until {expiry}" if expiry else " permanently" - -        dm_result = "" -        dm_log_text = "" -        expiry_log_text = f"Expires: {expiry}" if expiry else "" -        log_title = "applied" -        log_content = None - -        # DM the user about the infraction if it's not a shadow/hidden infraction. -        if not infraction["hidden"]: -            # Sometimes user is a discord.Object; make it a proper user. -            await self.bot.fetch_user(user.id) - -            # Accordingly display whether the user was successfully notified via DM. -            if await utils.notify_infraction(user, infr_type, expiry, reason, icon): -                dm_result = ":incoming_envelope: " -                dm_log_text = "\nDM: Sent" -            else: -                dm_log_text = "\nDM: **Failed**" -                log_content = ctx.author.mention - -        if infraction["actor"] == self.bot.user.id: -            end_msg = f" (reason: {infraction['reason']})" -        elif ctx.channel.id not in STAFF_CHANNELS: -            end_msg = '' -        else: -            infractions = await self.bot.api_client.get( -                "bot/infractions", -                params={"user__id": str(user.id)} -            ) -            total = len(infractions) -            end_msg = f" ({total} infraction{ngettext('', 's', total)} total)" - -        # Execute the necessary actions to apply the infraction on Discord. -        if action_coro: -            try: -                await action_coro -                if expiry: -                    # Schedule the expiration of the infraction. -                    self.schedule_task(ctx.bot.loop, infraction["id"], infraction) -            except discord.Forbidden: -                # Accordingly display that applying the infraction failed. -                confirm_msg = f":x: failed to apply" -                expiry_msg = "" -                log_content = ctx.author.mention -                log_title = "failed to apply" - -        # Send a confirmation message to the invoking context. -        await ctx.send( -            f"{dm_result}{confirm_msg} **{infr_type}** to {user.mention}{expiry_msg}{end_msg}." -        ) +    async def pardon_ban(self, user_id: int, guild: discord.Guild, reason: str) -> t.Dict[str, str]: +        """Remove a user's ban on the Discord guild and return a log dict.""" +        user = discord.Object(user_id) +        log_text = {} -        # Send a log message to the mod log. -        await self.mod_log.send_log_message( -            icon_url=icon, -            colour=Colours.soft_red, -            title=f"Infraction {log_title}: {infr_type}", -            thumbnail=user.avatar_url_as(static_format="png"), -            text=textwrap.dedent(f""" -                Member: {user.mention} (`{user.id}`) -                Actor: {ctx.message.author}{dm_log_text} -                Reason: {reason} -                {expiry_log_text} -            """), -            content=log_content, -            footer=f"ID {infraction['id']}" -        ) +        self.mod_log.ignore(Event.member_unban, user_id) -    async def pardon_infraction(self, ctx: Context, infr_type: str, user: MemberObject) -> None: -        """Prematurely end an infraction for a user and log the action in the mod log.""" -        # Check the current active infraction -        response = await self.bot.api_client.get( -            'bot/infractions', -            params={ -                'active': 'true', -                'type': infr_type, -                'user__id': user.id -            } -        ) +        try: +            await guild.unban(user, reason=reason) +        except discord.NotFound: +            log.info(f"Failed to unban user {user_id}: no active ban found on Discord") +            log_text["Note"] = "No active ban found on Discord." -        if not response: -            await ctx.send(f":x: There's no active {infr_type} infraction for user {user.mention}.") -            return +        return log_text -        # Deactivate the infraction and cancel its scheduled expiration task. -        log_text = await self.deactivate_infraction(response[0], send_log=False) - -        log_text["Member"] = f"{user.mention}(`{user.id}`)" -        log_text["Actor"] = str(ctx.message.author) -        log_content = None -        footer = f"ID: {response[0]['id']}" - -        # If multiple active infractions were found, mark them as inactive in the database -        # and cancel their expiration tasks. -        if len(response) > 1: -            log.warning(f"Found more than one active {infr_type} infraction for user {user.id}") - -            footer = f"Infraction IDs: {', '.join(str(infr['id']) for infr in response)}" - -            log_note = f"Found multiple **active** {infr_type} infractions in the database." -            if "Note" in log_text: -                log_text["Note"] = f" {log_note}" -            else: -                log_text["Note"] = log_note - -            # deactivate_infraction() is not called again because: -            #     1. Discord cannot store multiple active bans or assign multiples of the same role -            #     2. It would send a pardon DM for each active infraction, which is redundant -            for infraction in response[1:]: -                _id = infraction['id'] -                try: -                    # Mark infraction as inactive in the database. -                    await self.bot.api_client.patch( -                        f"bot/infractions/{_id}", -                        json={"active": False} -                    ) -                except ResponseCodeError: -                    log.exception(f"Failed to deactivate infraction #{_id} ({infr_type})") -                    # This is simpler and cleaner than trying to concatenate all the errors. -                    log_text["Failure"] = "See bot's logs for details." - -                # Cancel pending expiration task. -                if infraction["expires_at"] is not None: -                    self.cancel_task(infraction["id"]) - -        # Accordingly display whether the user was successfully notified via DM. -        dm_emoji = "" -        if log_text.get("DM") == "Sent": -            dm_emoji = ":incoming_envelope: " -        elif "DM" in log_text: -            # Mention the actor because the DM failed to send. -            log_content = ctx.author.mention - -        # Accordingly display whether the pardon failed. -        if "Failure" in log_text: -            confirm_msg = ":x: failed to pardon" -            log_title = "pardon failed" -            log_content = ctx.author.mention -        else: -            confirm_msg = f":ok_hand: pardoned" -            log_title = "pardoned" +    async def _pardon_action(self, infraction: utils.Infraction) -> t.Optional[t.Dict[str, str]]: +        """ +        Execute deactivation steps specific to the infraction's type and return a log dict. -        # Send a confirmation message to the invoking context. -        await ctx.send( -            f"{dm_emoji}{confirm_msg} infraction **{infr_type}** for {user.mention}. " -            f"{log_text.get('Failure', '')}" -        ) +        If an infraction type is unsupported, return None instead. +        """ +        guild = self.bot.get_guild(constants.Guild.id) +        user_id = infraction["user"] +        reason = f"Infraction #{infraction['id']} expired or was pardoned." -        # Send a log message to the mod log. -        await self.mod_log.send_log_message( -            icon_url=utils.INFRACTION_ICONS[infr_type][1], -            colour=Colours.soft_green, -            title=f"Infraction {log_title}: {infr_type}", -            thumbnail=user.avatar_url_as(static_format="png"), -            text="\n".join(f"{k}: {v}" for k, v in log_text.items()), -            footer=footer, -            content=log_content, -        ) +        if infraction["type"] == "mute": +            return await self.pardon_mute(user_id, guild, reason) +        elif infraction["type"] == "ban": +            return await self.pardon_ban(user_id, guild, reason)      # endregion diff --git a/bot/cogs/moderation/scheduler.py b/bot/cogs/moderation/scheduler.py new file mode 100644 index 000000000..7990df226 --- /dev/null +++ b/bot/cogs/moderation/scheduler.py @@ -0,0 +1,368 @@ +import logging +import textwrap +import typing as t +from abc import abstractmethod +from datetime import datetime +from gettext import ngettext + +import dateutil.parser +import discord +from discord.ext.commands import Bot, Context + +from bot import constants +from bot.api import ResponseCodeError +from bot.constants import Colours, STAFF_CHANNELS +from bot.utils import time +from bot.utils.scheduling import Scheduler +from . import utils +from .modlog import ModLog +from .utils import MemberObject + +log = logging.getLogger(__name__) + + +class InfractionScheduler(Scheduler): +    """Handles the application, pardoning, and expiration of infractions.""" + +    def __init__(self, bot: Bot, supported_infractions: t.Container[str]): +        super().__init__() + +        self.bot = bot +        self.bot.loop.create_task(self.reschedule_infractions(supported_infractions)) + +    @property +    def mod_log(self) -> ModLog: +        """Get the currently loaded ModLog cog instance.""" +        return self.bot.get_cog("ModLog") + +    async def reschedule_infractions(self, supported_infractions: t.Container[str]) -> None: +        """Schedule expiration for previous infractions.""" +        await self.bot.wait_until_ready() + +        infractions = await self.bot.api_client.get( +            'bot/infractions', +            params={'active': 'true'} +        ) +        for infraction in infractions: +            if infraction["expires_at"] is not None and infraction["type"] in supported_infractions: +                self.schedule_task(self.bot.loop, infraction["id"], infraction) + +    async def reapply_infraction( +        self, +        infraction: utils.Infraction, +        apply_coro: t.Optional[t.Awaitable] +    ) -> None: +        """Reapply an infraction if it's still active or deactivate it if less than 60 sec left.""" +        # Calculate the time remaining, in seconds, for the mute. +        expiry = dateutil.parser.isoparse(infraction["expires_at"]).replace(tzinfo=None) +        delta = (expiry - datetime.utcnow()).total_seconds() + +        # Mark as inactive if less than a minute remains. +        if delta < 60: +            await self.deactivate_infraction(infraction) +            return + +        # Allowing mod log since this is a passive action that should be logged. +        await apply_coro +        log.info(f"Re-applied {infraction['type']} to user {infraction['user']} upon rejoining.") + +    async def apply_infraction( +        self, +        ctx: Context, +        infraction: utils.Infraction, +        user: MemberObject, +        action_coro: t.Optional[t.Awaitable] = None +    ) -> None: +        """Apply an infraction to the user, log the infraction, and optionally notify the user.""" +        infr_type = infraction["type"] +        icon = utils.INFRACTION_ICONS[infr_type][0] +        reason = infraction["reason"] +        expiry = infraction["expires_at"] + +        if expiry: +            expiry = time.format_infraction(expiry) + +        # Default values for the confirmation message and mod log. +        confirm_msg = f":ok_hand: applied" + +        # Specifying an expiry for a note or warning makes no sense. +        if infr_type in ("note", "warning"): +            expiry_msg = "" +        else: +            expiry_msg = f" until {expiry}" if expiry else " permanently" + +        dm_result = "" +        dm_log_text = "" +        expiry_log_text = f"Expires: {expiry}" if expiry else "" +        log_title = "applied" +        log_content = None + +        # DM the user about the infraction if it's not a shadow/hidden infraction. +        if not infraction["hidden"]: +            # Sometimes user is a discord.Object; make it a proper user. +            user = await self.bot.fetch_user(user.id) + +            # Accordingly display whether the user was successfully notified via DM. +            if await utils.notify_infraction(user, infr_type, expiry, reason, icon): +                dm_result = ":incoming_envelope: " +                dm_log_text = "\nDM: Sent" +            else: +                dm_log_text = "\nDM: **Failed**" +                log_content = ctx.author.mention + +        if infraction["actor"] == self.bot.user.id: +            end_msg = f" (reason: {infraction['reason']})" +        elif ctx.channel.id not in STAFF_CHANNELS: +            end_msg = "" +        else: +            infractions = await self.bot.api_client.get( +                "bot/infractions", +                params={"user__id": str(user.id)} +            ) +            total = len(infractions) +            end_msg = f" ({total} infraction{ngettext('', 's', total)} total)" + +        # Execute the necessary actions to apply the infraction on Discord. +        if action_coro: +            try: +                await action_coro +                if expiry: +                    # Schedule the expiration of the infraction. +                    self.schedule_task(ctx.bot.loop, infraction["id"], infraction) +            except discord.Forbidden: +                # Accordingly display that applying the infraction failed. +                confirm_msg = f":x: failed to apply" +                expiry_msg = "" +                log_content = ctx.author.mention +                log_title = "failed to apply" + +        # Send a confirmation message to the invoking context. +        await ctx.send( +            f"{dm_result}{confirm_msg} **{infr_type}** to {user.mention}{expiry_msg}{end_msg}." +        ) + +        # Send a log message to the mod log. +        await self.mod_log.send_log_message( +            icon_url=icon, +            colour=Colours.soft_red, +            title=f"Infraction {log_title}: {infr_type}", +            thumbnail=user.avatar_url_as(static_format="png"), +            text=textwrap.dedent(f""" +                Member: {user.mention} (`{user.id}`) +                Actor: {ctx.message.author}{dm_log_text} +                Reason: {reason} +                {expiry_log_text} +            """), +            content=log_content, +            footer=f"ID {infraction['id']}" +        ) + +    async def pardon_infraction(self, ctx: Context, infr_type: str, user: MemberObject) -> None: +        """Prematurely end an infraction for a user and log the action in the mod log.""" +        # Check the current active infraction +        response = await self.bot.api_client.get( +            'bot/infractions', +            params={ +                'active': 'true', +                'type': infr_type, +                'user__id': user.id +            } +        ) + +        if not response: +            await ctx.send(f":x: There's no active {infr_type} infraction for user {user.mention}.") +            return + +        # Deactivate the infraction and cancel its scheduled expiration task. +        log_text = await self.deactivate_infraction(response[0], send_log=False) + +        log_text["Member"] = f"{user.mention}(`{user.id}`)" +        log_text["Actor"] = str(ctx.message.author) +        log_content = None +        footer = f"ID: {response[0]['id']}" + +        # If multiple active infractions were found, mark them as inactive in the database +        # and cancel their expiration tasks. +        if len(response) > 1: +            log.warning(f"Found more than one active {infr_type} infraction for user {user.id}") + +            footer = f"Infraction IDs: {', '.join(str(infr['id']) for infr in response)}" + +            log_note = f"Found multiple **active** {infr_type} infractions in the database." +            if "Note" in log_text: +                log_text["Note"] = f" {log_note}" +            else: +                log_text["Note"] = log_note + +            # deactivate_infraction() is not called again because: +            #     1. Discord cannot store multiple active bans or assign multiples of the same role +            #     2. It would send a pardon DM for each active infraction, which is redundant +            for infraction in response[1:]: +                _id = infraction['id'] +                try: +                    # Mark infraction as inactive in the database. +                    await self.bot.api_client.patch( +                        f"bot/infractions/{_id}", +                        json={"active": False} +                    ) +                except ResponseCodeError: +                    log.exception(f"Failed to deactivate infraction #{_id} ({infr_type})") +                    # This is simpler and cleaner than trying to concatenate all the errors. +                    log_text["Failure"] = "See bot's logs for details." + +                # Cancel pending expiration task. +                if infraction["expires_at"] is not None: +                    self.cancel_task(infraction["id"]) + +        # Accordingly display whether the user was successfully notified via DM. +        dm_emoji = "" +        if log_text.get("DM") == "Sent": +            dm_emoji = ":incoming_envelope: " +        elif "DM" in log_text: +            # Mention the actor because the DM failed to send. +            log_content = ctx.author.mention + +        # Accordingly display whether the pardon failed. +        if "Failure" in log_text: +            confirm_msg = ":x: failed to pardon" +            log_title = "pardon failed" +            log_content = ctx.author.mention +        else: +            confirm_msg = f":ok_hand: pardoned" +            log_title = "pardoned" + +        # Send a confirmation message to the invoking context. +        await ctx.send( +            f"{dm_emoji}{confirm_msg} infraction **{infr_type}** for {user.mention}. " +            f"{log_text.get('Failure', '')}" +        ) + +        # Send a log message to the mod log. +        await self.mod_log.send_log_message( +            icon_url=utils.INFRACTION_ICONS[infr_type][1], +            colour=Colours.soft_green, +            title=f"Infraction {log_title}: {infr_type}", +            thumbnail=user.avatar_url_as(static_format="png"), +            text="\n".join(f"{k}: {v}" for k, v in log_text.items()), +            footer=footer, +            content=log_content, +        ) + +    async def deactivate_infraction( +        self, +        infraction: utils.Infraction, +        send_log: bool = True +    ) -> t.Dict[str, str]: +        """ +        Deactivate an active infraction and return a dictionary of lines to send in a mod log. + +        The infraction is removed from Discord, marked as inactive in the database, and has its +        expiration task cancelled. If `send_log` is True, a mod log is sent for the +        deactivation of the infraction. + +        Infractions of unsupported types will raise a ValueError. +        """ +        guild = self.bot.get_guild(constants.Guild.id) +        mod_role = guild.get_role(constants.Roles.moderator) +        user_id = infraction["user"] +        _type = infraction["type"] +        _id = infraction["id"] + +        log.debug(f"Marking infraction #{_id} as inactive (expired).") + +        log_content = None +        log_text = { +            "Member": str(user_id), +            "Actor": str(self.bot.user), +            "Reason": infraction["reason"] +        } + +        try: +            returned_log = await self._pardon_action(infraction) +            if returned_log is not None: +                log_text = {**log_text, **returned_log}  # Merge the logs together +            else: +                raise ValueError( +                    f"Attempted to deactivate an unsupported infraction #{_id} ({_type})!" +                ) +        except discord.Forbidden: +            log.warning(f"Failed to deactivate infraction #{_id} ({_type}): bot lacks permissions") +            log_text["Failure"] = f"The bot lacks permissions to do this (role hierarchy?)" +            log_content = mod_role.mention +        except discord.HTTPException as e: +            log.exception(f"Failed to deactivate infraction #{_id} ({_type})") +            log_text["Failure"] = f"HTTPException with code {e.code}." +            log_content = mod_role.mention + +        # Check if the user is currently being watched by Big Brother. +        try: +            active_watch = await self.bot.api_client.get( +                "bot/infractions", +                params={ +                    "active": "true", +                    "type": "watch", +                    "user__id": user_id +                } +            ) + +            log_text["Watching"] = "Yes" if active_watch else "No" +        except ResponseCodeError: +            log.exception(f"Failed to fetch watch status for user {user_id}") +            log_text["Watching"] = "Unknown - failed to fetch watch status." + +        try: +            # Mark infraction as inactive in the database. +            await self.bot.api_client.patch( +                f"bot/infractions/{_id}", +                json={"active": False} +            ) +        except ResponseCodeError as e: +            log.exception(f"Failed to deactivate infraction #{_id} ({_type})") +            log_line = f"API request failed with code {e.status}." +            log_content = mod_role.mention + +            # Append to an existing failure message if possible +            if "Failure" in log_text: +                log_text["Failure"] += f" {log_line}" +            else: +                log_text["Failure"] = log_line + +        # Cancel the expiration task. +        if infraction["expires_at"] is not None: +            self.cancel_task(infraction["id"]) + +        # Send a log message to the mod log. +        if send_log: +            log_title = f"expiration failed" if "Failure" in log_text else "expired" + +            await self.mod_log.send_log_message( +                icon_url=utils.INFRACTION_ICONS[_type][1], +                colour=Colours.soft_green, +                title=f"Infraction {log_title}: {_type}", +                text="\n".join(f"{k}: {v}" for k, v in log_text.items()), +                footer=f"ID: {_id}", +                content=log_content, +            ) + +        return log_text + +    @abstractmethod +    async def _pardon_action(self, infraction: utils.Infraction) -> t.Optional[t.Dict[str, str]]: +        """ +        Execute deactivation steps specific to the infraction's type and return a log dict. + +        If an infraction type is unsupported, return None instead. +        """ +        raise NotImplementedError + +    async def _scheduled_task(self, infraction: utils.Infraction) -> None: +        """ +        Marks an infraction expired after the delay from time of scheduling to time of expiration. + +        At the time of expiration, the infraction is marked as inactive on the website and the +        expiration task is cancelled. +        """ +        expiry = dateutil.parser.isoparse(infraction["expires_at"]).replace(tzinfo=None) +        await time.wait_until(expiry) + +        await self.deactivate_infraction(infraction) diff --git a/bot/cogs/moderation/superstarify.py b/bot/cogs/moderation/superstarify.py index 82f8621fc..c66222e5a 100644 --- a/bot/cogs/moderation/superstarify.py +++ b/bot/cogs/moderation/superstarify.py @@ -1,17 +1,18 @@  import json  import logging  import random +import textwrap +import typing as t  from pathlib import Path  from discord import Colour, Embed, Member -from discord.errors import Forbidden  from discord.ext.commands import Bot, Cog, Context, command  from bot import constants  from bot.utils.checks import with_role_check  from bot.utils.time import format_infraction  from . import utils -from .modlog import ModLog +from .scheduler import InfractionScheduler  log = logging.getLogger(__name__)  NICKNAME_POLICY_URL = "https://pythondiscord.com/pages/rules/#nickname-policy" @@ -20,26 +21,15 @@ with Path("bot/resources/stars.json").open(encoding="utf-8") as stars_file:      STAR_NAMES = json.load(stars_file) -class Superstarify(Cog): +class Superstarify(InfractionScheduler, Cog):      """A set of commands to moderate terrible nicknames."""      def __init__(self, bot: Bot): -        self.bot = bot - -    @property -    def modlog(self) -> ModLog: -        """Get currently loaded ModLog cog instance.""" -        return self.bot.get_cog("ModLog") +        super().__init__(bot, supported_infractions={"superstar"})      @Cog.listener()      async def on_member_update(self, before: Member, after: Member) -> None: -        """ -        This event will trigger when someone changes their name. - -        At this point we will look up the user in our database and check whether they are allowed to -        change their names, or if they are in superstar-prison. If they are not allowed, we will -        change it back. -        """ +        """Revert nickname edits if the user has an active superstarify infraction."""          if before.display_name == after.display_name:              return  # User didn't change their nickname. Abort! @@ -49,103 +39,77 @@ class Superstarify(Cog):          )          active_superstarifies = await self.bot.api_client.get( -            'bot/infractions', +            "bot/infractions",              params={ -                'active': 'true', -                'type': 'superstar', -                'user__id': str(before.id) +                "active": "true", +                "type": "superstar", +                "user__id": str(before.id)              }          ) -        if active_superstarifies: -            [infraction] = active_superstarifies -            forced_nick = self.get_nick(infraction['id'], before.id) -            if after.display_name == forced_nick: -                return  # Nick change was triggered by this event. Ignore. - -            log.info( -                f"{after.display_name} is currently in superstar-prison. " -                f"Changing the nick back to {before.display_name}." -            ) -            await after.edit(nick=forced_nick) -            end_timestamp_human = format_infraction(infraction['expires_at']) - -            try: -                await after.send( -                    "You have tried to change your nickname on the **Python Discord** server " -                    f"from **{before.display_name}** to **{after.display_name}**, but as you " -                    "are currently in superstar-prison, you do not have permission to do so. " -                    "You will be allowed to change your nickname again at the following time:\n\n" -                    f"**{end_timestamp_human}**." -                ) -            except Forbidden: -                log.warning( -                    "The user tried to change their nickname while in superstar-prison. " -                    "This led to the bot trying to DM the user to let them know they cannot do that, " -                    "but the user had either blocked the bot or disabled DMs, so it was not possible " -                    "to DM them, and a discord.errors.Forbidden error was incurred." -                ) +        if not active_superstarifies: +            return + +        infraction = active_superstarifies[0] +        forced_nick = self.get_nick(infraction["id"], before.id) +        if after.display_name == forced_nick: +            return  # Nick change was triggered by this event. Ignore. + +        log.info( +            f"{after.display_name} is currently in superstar-prison. " +            f"Changing the nick back to {before.display_name}." +        ) +        await after.edit( +            nick=forced_nick, +            reason=f"Superstarified member tried to escape the prison: {infraction['id']}" +        ) + +        notified = await utils.notify_infraction( +            user=after, +            infr_type="Superstarify", +            expires_at=format_infraction(infraction["expires_at"]), +            reason=( +                "You have tried to change your nickname on the **Python Discord** server " +                f"from **{before.display_name}** to **{after.display_name}**, but as you " +                "are currently in superstar-prison, you do not have permission to do so." +            ), +            icon_url=utils.INFRACTION_ICONS["superstar"][0] +        ) + +        if not notified: +            log.warning("Failed to DM user about why they cannot change their nickname.")      @Cog.listener()      async def on_member_join(self, member: Member) -> None: -        """ -        This event will trigger when someone (re)joins the server. - -        At this point we will look up the user in our database and check whether they are in -        superstar-prison. If so, we will change their name back to the forced nickname. -        """ +        """Reapply active superstar infractions for returning members."""          active_superstarifies = await self.bot.api_client.get( -            'bot/infractions', +            "bot/infractions",              params={ -                'active': 'true', -                'type': 'superstar', -                'user__id': member.id +                "active": "true", +                "type": "superstar", +                "user__id": member.id              }          )          if active_superstarifies: -            [infraction] = active_superstarifies -            forced_nick = self.get_nick(infraction['id'], member.id) -            await member.edit(nick=forced_nick) -            end_timestamp_human = format_infraction(infraction['expires_at']) - -            try: -                await member.send( -                    "You have left and rejoined the **Python Discord** server, effectively resetting " -                    f"your nickname from **{forced_nick}** to **{member.name}**, " -                    "but as you are currently in superstar-prison, you do not have permission to do so. " -                    "Therefore your nickname was automatically changed back. You will be allowed to " -                    "change your nickname again at the following time:\n\n" -                    f"**{end_timestamp_human}**." -                ) -            except Forbidden: -                log.warning( -                    "The user left and rejoined the server while in superstar-prison. " -                    "This led to the bot trying to DM the user to let them know their name was restored, " -                    "but the user had either blocked the bot or disabled DMs, so it was not possible " -                    "to DM them, and a discord.errors.Forbidden error was incurred." -                ) - -            # Log to the mod_log channel -            log.trace("Logging to the #mod-log channel. This could fail because of channel permissions.") -            mod_log_message = ( -                f"**{member}** (`{member.id}`)\n\n" -                f"Superstarified member potentially tried to escape the prison.\n" -                f"Restored enforced nickname: `{forced_nick}`\n" -                f"Superstardom ends: **{end_timestamp_human}**" -            ) -            await self.modlog.send_log_message( -                icon_url=constants.Icons.user_update, -                colour=Colour.gold(), -                title="Superstar member rejoined server", -                text=mod_log_message, -                thumbnail=member.avatar_url_as(static_format="png") +            infraction = active_superstarifies[0] +            action = member.edit( +                nick=self.get_nick(infraction["id"], member.id), +                reason=f"Superstarified member tried to escape the prison: {infraction['id']}"              ) -    @command(name='superstarify', aliases=('force_nick', 'star')) -    async def superstarify(self, ctx: Context, member: Member, duration: utils.Expiry, reason: str = None) -> None: +            await self.reapply_infraction(infraction, action) + +    @command(name="superstarify", aliases=("force_nick", "star")) +    async def superstarify( +        self, +        ctx: Context, +        member: Member, +        duration: utils.Expiry, +        reason: str = None +    ) -> None:          """ -        Force a random superstar name (like Taylor Swift) to be the user's nickname for a specified duration. +        Temporarily force a random superstar name (like Taylor Swift) to be the user's nickname.          A unit of time should be appended to the duration.          Units (∗case-sensitive): @@ -165,87 +129,89 @@ class Superstarify(Cog):          if await utils.has_active_infraction(ctx, member, "superstar"):              return -        reason = reason or ('old nick: ' + member.display_name) -        infraction = await utils.post_infraction(ctx, member, 'superstar', reason, expires_at=duration) -        forced_nick = self.get_nick(infraction['id'], member.id) -        expiry_str = format_infraction(infraction["expires_at"]) +        # Post the infraction to the API +        reason = reason or f"old nick: {member.display_name}" +        infraction = await utils.post_infraction(ctx, member, "superstar", reason, duration) -        embed = Embed() -        embed.title = "Congratulations!" -        embed.description = ( -            f"Your previous nickname, **{member.display_name}**, was so bad that we have decided to change it. " -            f"Your new nickname will be **{forced_nick}**.\n\n" -            f"You will be unable to change your nickname until \n**{expiry_str}**.\n\n" -            "If you're confused by this, please read our " -            f"[official nickname policy]({NICKNAME_POLICY_URL})." -        ) +        old_nick = member.display_name +        forced_nick = self.get_nick(infraction["id"], member.id) +        expiry_str = format_infraction(infraction["expires_at"]) -        # Log to the mod_log channel -        log.trace("Logging to the #mod-log channel. This could fail because of channel permissions.") -        mod_log_message = ( -            f"**{member}** (`{member.id}`)\n\n" -            f"Superstarified by **{ctx.author.name}**\n" -            f"Old nickname: `{member.display_name}`\n" -            f"New nickname: `{forced_nick}`\n" -            f"Superstardom ends: **{expiry_str}**" -        ) -        await self.modlog.send_log_message( -            icon_url=constants.Icons.user_update, -            colour=Colour.gold(), -            title="Member Achieved Superstardom", -            text=mod_log_message, -            thumbnail=member.avatar_url_as(static_format="png") -        ) +        # Apply the infraction and schedule the expiration task. +        self.mod_log.ignore(constants.Event.member_update, member.id) +        await member.edit(nick=forced_nick, reason=reason) +        self.schedule_task(ctx.bot.loop, infraction["id"], infraction) +        # Send a DM to the user to notify them of their new infraction.          await utils.notify_infraction(              user=member,              infr_type="Superstarify",              expires_at=expiry_str, +            icon_url=utils.INFRACTION_ICONS["superstar"][0],              reason=f"Your nickname didn't comply with our [nickname policy]({NICKNAME_POLICY_URL})."          ) -        # Change the nick and return the embed -        log.trace("Changing the users nickname and sending the embed.") -        await member.edit(nick=forced_nick) +        # Send an embed with the infraction information to the invoking context. +        embed = Embed( +            title="Congratulations!", +            colour=constants.Colours.soft_orange, +            description=( +                f"Your previous nickname, **{old_nick}**, " +                f"was so bad that we have decided to change it. " +                f"Your new nickname will be **{forced_nick}**.\n\n" +                f"You will be unable to change your nickname until **{expiry_str}**.\n\n" +                "If you're confused by this, please read our " +                f"[official nickname policy]({NICKNAME_POLICY_URL})." +            ) +        )          await ctx.send(embed=embed) -    @command(name='unsuperstarify', aliases=('release_nick', 'unstar')) -    async def unsuperstarify(self, ctx: Context, member: Member) -> None: -        """Remove the superstarify entry from our database, allowing the user to change their nickname.""" -        log.debug(f"Attempting to unsuperstarify the following user: {member.display_name}") +        # Log to the mod log channel. +        await self.mod_log.send_log_message( +            icon_url=utils.INFRACTION_ICONS["superstar"][0], +            colour=Colour.gold(), +            title="Member achieved superstardom", +            thumbnail=member.avatar_url_as(static_format="png"), +            text=textwrap.dedent(f""" +                Member: {member.mention} (`{member.id}`) +                Actor: {ctx.message.author} +                Reason: {reason} +                Expires: {expiry_str} +                Old nickname: `{old_nick}` +                New nickname: `{forced_nick}` +            """), +            footer=f"ID {infraction['id']}" +        ) -        embed = Embed() -        embed.colour = Colour.blurple() +    @command(name="unsuperstarify", aliases=("release_nick", "unstar")) +    async def unsuperstarify(self, ctx: Context, member: Member) -> None: +        """Remove the superstarify infraction and allow the user to change their nickname.""" +        await self.pardon_infraction(ctx, "superstar", member) -        active_superstarifies = await self.bot.api_client.get( -            'bot/infractions', -            params={ -                'active': 'true', -                'type': 'superstar', -                'user__id': str(member.id) -            } -        ) -        if not active_superstarifies: -            await ctx.send(":x: There is no active superstarify infraction for this user.") +    async def _pardon_action(self, infraction: utils.Infraction) -> t.Optional[t.Dict[str, str]]: +        """Pardon a superstar infraction and return a log dict.""" +        if infraction["type"] != "superstar":              return -        [infraction] = active_superstarifies -        await self.bot.api_client.patch( -            'bot/infractions/' + str(infraction['id']), -            json={'active': False} -        ) +        guild = self.bot.get_guild(constants.Guild.id) +        user = guild.get_member(infraction["user"]) -        embed = Embed() -        embed.description = "User has been released from superstar-prison." -        embed.title = random.choice(constants.POSITIVE_REPLIES) +        # Don't bother sending a notification if the user left the guild. +        if not user: +            return {} -        await utils.notify_pardon( -            user=member, -            title="You are no longer superstarified.", -            content="You may now change your nickname on the server." +        # DM the user about the expiration. +        notified = await utils.notify_pardon( +            user=user, +            title="You are no longer superstarified", +            content="You may now change your nickname on the server.", +            icon_url=utils.INFRACTION_ICONS["superstar"][1]          ) -        log.trace(f"{member.display_name} was successfully released from superstar-prison.") -        await ctx.send(embed=embed) + +        return { +            "Member": f"{user.mention}(`{user.id}`)", +            "DM": "Sent" if notified else "**Failed**" +        }      @staticmethod      def get_nick(infraction_id: int, member_id: int) -> str: diff --git a/bot/cogs/moderation/utils.py b/bot/cogs/moderation/utils.py index 788a40d40..9179c0afb 100644 --- a/bot/cogs/moderation/utils.py +++ b/bot/cogs/moderation/utils.py @@ -15,11 +15,12 @@ log = logging.getLogger(__name__)  # apply icon, pardon icon  INFRACTION_ICONS = { -    "mute": (Icons.user_mute, Icons.user_unmute), -    "kick": (Icons.sign_out, None),      "ban": (Icons.user_ban, Icons.user_unban), -    "warning": (Icons.user_warn, None), +    "kick": (Icons.sign_out, None), +    "mute": (Icons.user_mute, Icons.user_unmute),      "note": (Icons.user_warn, None), +    "superstar": (Icons.superstarify, Icons.unsuperstarify), +    "warning": (Icons.user_warn, None),  }  RULES_URL = "https://pythondiscord.com/pages/rules"  APPEALABLE_INFRACTIONS = ("ban", "mute") @@ -126,7 +127,7 @@ async def notify_infraction(          colour=Colours.soft_red      ) -    embed.set_author(name="Infraction Information", icon_url=icon_url, url=RULES_URL) +    embed.set_author(name="Infraction information", icon_url=icon_url, url=RULES_URL)      embed.title = f"Please review our rules over at {RULES_URL}"      embed.url = RULES_URL diff --git a/bot/cogs/watchchannels/bigbrother.py b/bot/cogs/watchchannels/bigbrother.py index c516508ca..49783bb09 100644 --- a/bot/cogs/watchchannels/bigbrother.py +++ b/bot/cogs/watchchannels/bigbrother.py @@ -6,7 +6,7 @@ from discord import User  from discord.ext.commands import Bot, Cog, Context, group  from bot.cogs.moderation.utils import post_infraction -from bot.constants import Channels, Roles, Webhooks +from bot.constants import Channels, MODERATION_ROLES, Webhooks  from bot.decorators import with_role  from .watchchannel import WatchChannel, proxy_user @@ -27,13 +27,13 @@ class BigBrother(WatchChannel, Cog, name="Big Brother"):          )      @group(name='bigbrother', aliases=('bb',), invoke_without_command=True) -    @with_role(Roles.owner, Roles.admin, Roles.moderator) +    @with_role(*MODERATION_ROLES)      async def bigbrother_group(self, ctx: Context) -> None:          """Monitors users by relaying their messages to the Big Brother watch channel."""          await ctx.invoke(self.bot.get_command("help"), "bigbrother")      @bigbrother_group.command(name='watched', aliases=('all', 'list')) -    @with_role(Roles.owner, Roles.admin, Roles.moderator) +    @with_role(*MODERATION_ROLES)      async def watched_command(self, ctx: Context, update_cache: bool = True) -> None:          """          Shows the users that are currently being monitored by Big Brother. @@ -44,7 +44,7 @@ class BigBrother(WatchChannel, Cog, name="Big Brother"):          await self.list_watched_users(ctx, update_cache)      @bigbrother_group.command(name='watch', aliases=('w',)) -    @with_role(Roles.owner, Roles.admin, Roles.moderator) +    @with_role(*MODERATION_ROLES)      async def watch_command(self, ctx: Context, user: Union[User, proxy_user], *, reason: str) -> None:          """          Relay messages sent by the given `user` to the `#big-brother` channel. @@ -91,7 +91,7 @@ class BigBrother(WatchChannel, Cog, name="Big Brother"):          await ctx.send(msg)      @bigbrother_group.command(name='unwatch', aliases=('uw',)) -    @with_role(Roles.owner, Roles.admin, Roles.moderator) +    @with_role(*MODERATION_ROLES)      async def unwatch_command(self, ctx: Context, user: Union[User, proxy_user], *, reason: str) -> None:          """Stop relaying messages by the given `user`."""          active_watches = await self.bot.api_client.get( diff --git a/bot/cogs/watchchannels/talentpool.py b/bot/cogs/watchchannels/talentpool.py index 176c6f760..4ec42dcc1 100644 --- a/bot/cogs/watchchannels/talentpool.py +++ b/bot/cogs/watchchannels/talentpool.py @@ -7,14 +7,13 @@ from discord import Color, Embed, Member, User  from discord.ext.commands import Bot, Cog, Context, group  from bot.api import ResponseCodeError -from bot.constants import Channels, Guild, Roles, Webhooks +from bot.constants import Channels, Guild, MODERATION_ROLES, STAFF_ROLES, Webhooks  from bot.decorators import with_role  from bot.pagination import LinePaginator  from bot.utils import time  from .watchchannel import WatchChannel, proxy_user  log = logging.getLogger(__name__) -STAFF_ROLES = Roles.owner, Roles.admin, Roles.moderator, Roles.helpers    # <- In constants after the merge?  class TalentPool(WatchChannel, Cog, name="Talentpool"): @@ -31,13 +30,13 @@ class TalentPool(WatchChannel, Cog, name="Talentpool"):          )      @group(name='talentpool', aliases=('tp', 'talent', 'nomination', 'n'), invoke_without_command=True) -    @with_role(Roles.owner, Roles.admin, Roles.moderator) +    @with_role(*MODERATION_ROLES)      async def nomination_group(self, ctx: Context) -> None:          """Highlights the activity of helper nominees by relaying their messages to the talent pool channel."""          await ctx.invoke(self.bot.get_command("help"), "talentpool")      @nomination_group.command(name='watched', aliases=('all', 'list')) -    @with_role(Roles.owner, Roles.admin, Roles.moderator) +    @with_role(*MODERATION_ROLES)      async def watched_command(self, ctx: Context, update_cache: bool = True) -> None:          """          Shows the users that are currently being monitored in the talent pool. @@ -48,7 +47,7 @@ class TalentPool(WatchChannel, Cog, name="Talentpool"):          await self.list_watched_users(ctx, update_cache)      @nomination_group.command(name='watch', aliases=('w', 'add', 'a')) -    @with_role(Roles.owner, Roles.admin, Roles.moderator) +    @with_role(*STAFF_ROLES)      async def watch_command(self, ctx: Context, user: Union[Member, User, proxy_user], *, reason: str) -> None:          """          Relay messages sent by the given `user` to the `#talent-pool` channel. @@ -113,7 +112,7 @@ class TalentPool(WatchChannel, Cog, name="Talentpool"):          await ctx.send(msg)      @nomination_group.command(name='history', aliases=('info', 'search')) -    @with_role(Roles.owner, Roles.admin, Roles.moderator) +    @with_role(*MODERATION_ROLES)      async def history_command(self, ctx: Context, user: Union[User, proxy_user]) -> None:          """Shows the specified user's nomination history."""          result = await self.bot.api_client.get( @@ -142,7 +141,7 @@ class TalentPool(WatchChannel, Cog, name="Talentpool"):          )      @nomination_group.command(name='unwatch', aliases=('end', )) -    @with_role(Roles.owner, Roles.admin, Roles.moderator) +    @with_role(*MODERATION_ROLES)      async def unwatch_command(self, ctx: Context, user: Union[User, proxy_user], *, reason: str) -> None:          """          Ends the active nomination of the specified user with the given reason. @@ -170,13 +169,13 @@ class TalentPool(WatchChannel, Cog, name="Talentpool"):          self._remove_user(user.id)      @nomination_group.group(name='edit', aliases=('e',), invoke_without_command=True) -    @with_role(Roles.owner, Roles.admin, Roles.moderator) +    @with_role(*MODERATION_ROLES)      async def nomination_edit_group(self, ctx: Context) -> None:          """Commands to edit nominations."""          await ctx.invoke(self.bot.get_command("help"), "talentpool", "edit")      @nomination_edit_group.command(name='reason') -    @with_role(Roles.owner, Roles.admin, Roles.moderator) +    @with_role(*MODERATION_ROLES)      async def edit_reason_command(self, ctx: Context, nomination_id: int, *, reason: str) -> None:          """          Edits the reason/unnominate reason for the nomination with the given `id` depending on the status. diff --git a/bot/constants.py b/bot/constants.py index d3e79b4c2..45f42cf81 100644 --- a/bot/constants.py +++ b/bot/constants.py @@ -312,6 +312,9 @@ class Icons(metaclass=YAMLGetter):      questionmark: str +    superstarify: str +    unsuperstarify: str +  class CleanMessages(metaclass=YAMLGetter):      section = "bot" diff --git a/config-default.yml b/config-default.yml index bce6ea266..ee9f8a06b 100644 --- a/config-default.yml +++ b/config-default.yml @@ -86,6 +86,9 @@ style:          questionmark: "https://cdn.discordapp.com/emojis/512367613339369475.png" +        superstarify: "https://cdn.discordapp.com/emojis/636288153044516874.png" +        unsuperstarify: "https://cdn.discordapp.com/emojis/636288201258172446.png" +  guild:      id: 267624335836053506 diff --git a/tests/bot/rules/test_links.py b/tests/bot/rules/test_links.py new file mode 100644 index 000000000..be832843b --- /dev/null +++ b/tests/bot/rules/test_links.py @@ -0,0 +1,101 @@ +import unittest +from typing import List, NamedTuple, Tuple + +from bot.rules import links +from tests.helpers import async_test + + +class FakeMessage(NamedTuple): +    author: str +    content: str + + +class Case(NamedTuple): +    recent_messages: List[FakeMessage] +    relevant_messages: Tuple[FakeMessage] +    culprit: Tuple[str] +    total_links: int + + +def msg(author: str, total_links: int) -> FakeMessage: +    """Makes a message with *total_links* links.""" +    content = " ".join(["https://pydis.com"] * total_links) +    return FakeMessage(author=author, content=content) + + +class LinksTests(unittest.TestCase): +    """Tests applying the `links` rule.""" + +    def setUp(self): +        self.config = { +            "max": 2, +            "interval": 10 +        } + +    @async_test +    async def test_links_within_limit(self): +        """Messages with an allowed amount of links.""" +        cases = ( +            [msg("bob", 0)], +            [msg("bob", 2)], +            [msg("bob", 3)],  # Filter only applies if len(messages_with_links) > 1 +            [msg("bob", 1), msg("bob", 1)], +            [msg("bob", 2), msg("alice", 2)]  # Only messages from latest author count +        ) + +        for recent_messages in cases: +            last_message = recent_messages[0] + +            with self.subTest( +                last_message=last_message, +                recent_messages=recent_messages, +                config=self.config +            ): +                self.assertIsNone( +                    await links.apply(last_message, recent_messages, self.config) +                ) + +    @async_test +    async def test_links_exceeding_limit(self): +        """Messages with a a higher than allowed amount of links.""" +        cases = ( +            Case( +                [msg("bob", 1), msg("bob", 2)], +                (msg("bob", 1), msg("bob", 2)), +                ("bob",), +                3 +            ), +            Case( +                [msg("alice", 1), msg("alice", 1), msg("alice", 1)], +                (msg("alice", 1), msg("alice", 1), msg("alice", 1)), +                ("alice",), +                3 +            ), +            Case( +                [msg("alice", 2), msg("bob", 3), msg("alice", 1)], +                (msg("alice", 2), msg("alice", 1)), +                ("alice",), +                3 +            ) +        ) + +        for recent_messages, relevant_messages, culprit, total_links in cases: +            last_message = recent_messages[0] + +            with self.subTest( +                last_message=last_message, +                recent_messages=recent_messages, +                relevant_messages=relevant_messages, +                culprit=culprit, +                total_links=total_links, +                config=self.config +            ): +                desired_output = ( +                    f"sent {total_links} links in {self.config['interval']}s", +                    culprit, +                    relevant_messages +                ) +                self.assertTupleEqual( +                    await links.apply(last_message, recent_messages, self.config), +                    desired_output +                ) | 
