diff options
| -rw-r--r-- | bot/cogs/error_handler.py | 2 | ||||
| -rw-r--r-- | bot/cogs/filtering.py | 171 | ||||
| -rw-r--r-- | bot/cogs/snekbox.py | 9 | ||||
| -rw-r--r-- | tests/bot/cogs/test_snekbox.py | 12 | 
4 files changed, 141 insertions, 53 deletions
| diff --git a/bot/cogs/error_handler.py b/bot/cogs/error_handler.py index 5de961116..233851e41 100644 --- a/bot/cogs/error_handler.py +++ b/bot/cogs/error_handler.py @@ -170,7 +170,7 @@ class ErrorHandler(Cog):              await prepared_help_command              self.bot.stats.incr("errors.too_many_arguments")          elif isinstance(e, errors.BadArgument): -            await ctx.send(f"Bad argument: {e}\n") +            await ctx.send("Bad argument: Please double-check your input arguments and try again.\n")              await prepared_help_command              self.bot.stats.incr("errors.bad_argument")          elif isinstance(e, errors.BadUnionArgument): diff --git a/bot/cogs/filtering.py b/bot/cogs/filtering.py index 099606b82..bd665f424 100644 --- a/bot/cogs/filtering.py +++ b/bot/cogs/filtering.py @@ -2,7 +2,7 @@ import asyncio  import logging  import re  from datetime import datetime, timedelta -from typing import List, Mapping, Optional, Union +from typing import List, Mapping, Optional, Tuple, Union  import dateutil  import discord.errors @@ -198,24 +198,67 @@ class Filtering(Cog):              # Update time when alert sent              await self.name_alerts.set(member.id, datetime.utcnow().timestamp()) -    async def _filter_message(self, msg: Message, delta: Optional[int] = None) -> None: -        """Filter the input message to see if it violates any of our rules, and then respond accordingly.""" +    async def filter_eval(self, result: str, msg: Message) -> bool: +        """ +        Filter the result of an !eval to see if it violates any of our rules, and then respond accordingly. + +        Also requires the original message, to check whether to filter and for mod logs. +        Returns whether a filter was triggered or not. +        """ +        filter_triggered = False          # Should we filter this message? -        role_whitelisted = False +        if self._check_filter(msg): +            for filter_name, _filter in self.filters.items(): +                # Is this specific filter enabled in the config? +                # We also do not need to worry about filters that take the full message, +                # since all we have is an arbitrary string. +                if _filter["enabled"] and _filter["content_only"]: +                    match = await _filter["function"](result) -        if type(msg.author) is Member:  # Only Member has roles, not User. -            for role in msg.author.roles: -                if role.id in Filter.role_whitelist: -                    role_whitelisted = True +                    if match: +                        # If this is a filter (not a watchlist), we set the variable so we know +                        # that it has been triggered +                        if _filter["type"] == "filter": +                            filter_triggered = True -        filter_message = ( -            msg.channel.id not in Filter.channel_whitelist  # Channel not in whitelist -            and not role_whitelisted                        # Role not in whitelist -            and not msg.author.bot                          # Author not a bot -        ) +                        # We do not have to check against DM channels since !eval cannot be used there. +                        channel_str = f"in {msg.channel.mention}" + +                        message_content, additional_embeds, additional_embeds_msg = self._add_stats( +                            filter_name, match, result +                        ) + +                        message = ( +                            f"The {filter_name} {_filter['type']} was triggered " +                            f"by **{msg.author}** " +                            f"(`{msg.author.id}`) {channel_str} using !eval with " +                            f"[the following message]({msg.jump_url}):\n\n" +                            f"{message_content}" +                        ) + +                        log.debug(message) -        # If none of the above, we can start filtering. -        if filter_message: +                        # Send pretty mod log embed to mod-alerts +                        await self.mod_log.send_log_message( +                            icon_url=Icons.filtering, +                            colour=Colour(Colours.soft_red), +                            title=f"{_filter['type'].title()} triggered!", +                            text=message, +                            thumbnail=msg.author.avatar_url_as(static_format="png"), +                            channel_id=Channels.mod_alerts, +                            ping_everyone=Filter.ping_everyone, +                            additional_embeds=additional_embeds, +                            additional_embeds_msg=additional_embeds_msg +                        ) + +                        break  # We don't want multiple filters to trigger + +        return filter_triggered + +    async def _filter_message(self, msg: Message, delta: Optional[int] = None) -> None: +        """Filter the input message to see if it violates any of our rules, and then respond accordingly.""" +        # Should we filter this message? +        if self._check_filter(msg):              for filter_name, _filter in self.filters.items():                  # Is this specific filter enabled in the config?                  if _filter["enabled"]: @@ -274,16 +317,9 @@ class Filtering(Cog):                          else:                              channel_str = f"in {msg.channel.mention}" -                        # Word and match stats for watch_regex -                        if filter_name == "watch_regex": -                            surroundings = match.string[max(match.start() - 10, 0): match.end() + 10] -                            message_content = ( -                                f"**Match:** '{match[0]}'\n" -                                f"**Location:** '...{escape_markdown(surroundings)}...'\n" -                                f"\n**Original Message:**\n{escape_markdown(msg.content)}" -                            ) -                        else:  # Use content of discord Message -                            message_content = msg.content +                        message_content, additional_embeds, additional_embeds_msg = self._add_stats( +                            filter_name, match, msg.content +                        )                          message = (                              f"The {filter_name} {_filter['type']} was triggered " @@ -295,30 +331,6 @@ class Filtering(Cog):                          log.debug(message) -                        self.bot.stats.incr(f"filters.{filter_name}") - -                        additional_embeds = None -                        additional_embeds_msg = None - -                        # The function returns True for invalid invites. -                        # They have no data so additional embeds can't be created for them. -                        if filter_name == "filter_invites" and match is not True: -                            additional_embeds = [] -                            for invite, data in match.items(): -                                embed = discord.Embed(description=( -                                    f"**Members:**\n{data['members']}\n" -                                    f"**Active:**\n{data['active']}" -                                )) -                                embed.set_author(name=data["name"]) -                                embed.set_thumbnail(url=data["icon"]) -                                embed.set_footer(text=f"Guild Invite Code: {invite}") -                                additional_embeds.append(embed) -                            additional_embeds_msg = "For the following guild(s):" - -                        elif filter_name == "watch_rich_embeds": -                            additional_embeds = msg.embeds -                            additional_embeds_msg = "With the following embed(s):" -                          # Send pretty mod log embed to mod-alerts                          await self.mod_log.send_log_message(                              icon_url=Icons.filtering, @@ -334,6 +346,63 @@ class Filtering(Cog):                          break  # We don't want multiple filters to trigger +    def _add_stats(self, name: str, match: Union[re.Match, dict, bool, List[discord.Embed]], content: str) -> Tuple[ +        str, Optional[List[discord.Embed]], Optional[str] +    ]: +        """Adds relevant statistical information to the relevant filter and increments the bot's stats.""" +        # Word and match stats for watch_regex +        if name == "watch_regex": +            surroundings = match.string[max(match.start() - 10, 0): match.end() + 10] +            message_content = ( +                f"**Match:** '{match[0]}'\n" +                f"**Location:** '...{escape_markdown(surroundings)}...'\n" +                f"\n**Original Message:**\n{escape_markdown(content)}" +            ) +        else:  # Use original content +            message_content = content + +        additional_embeds = None +        additional_embeds_msg = None + +        self.bot.stats.incr(f"filters.{name}") + +        # The function returns True for invalid invites. +        # They have no data so additional embeds can't be created for them. +        if name == "filter_invites" and match is not True: +            additional_embeds = [] +            for invite, data in match.items(): +                embed = discord.Embed(description=( +                    f"**Members:**\n{data['members']}\n" +                    f"**Active:**\n{data['active']}" +                )) +                embed.set_author(name=data["name"]) +                embed.set_thumbnail(url=data["icon"]) +                embed.set_footer(text=f"Guild Invite Code: {invite}") +                additional_embeds.append(embed) +            additional_embeds_msg = "For the following guild(s):" + +        elif name == "watch_rich_embeds": +            additional_embeds = match +            additional_embeds_msg = "With the following embed(s):" + +        return message_content, additional_embeds, additional_embeds_msg + +    @staticmethod +    def _check_filter(msg: Message) -> bool: +        """Check whitelists to see if we should filter this message.""" +        role_whitelisted = False + +        if type(msg.author) is Member:  # Only Member has roles, not User. +            for role in msg.author.roles: +                if role.id in Filter.role_whitelist: +                    role_whitelisted = True + +        return ( +            msg.channel.id not in Filter.channel_whitelist  # Channel not in whitelist +            and not role_whitelisted                        # Role not in whitelist +            and not msg.author.bot                          # Author not a bot +        ) +      @staticmethod      async def _has_watch_regex_match(text: str) -> Union[bool, re.Match]:          """ @@ -426,7 +495,7 @@ class Filtering(Cog):          return invite_data if invite_data else False      @staticmethod -    async def _has_rich_embed(msg: Message) -> bool: +    async def _has_rich_embed(msg: Message) -> Union[bool, List[discord.Embed]]:          """Determines if `msg` contains any rich embeds not auto-generated from a URL."""          if msg.embeds:              for embed in msg.embeds: @@ -435,7 +504,7 @@ class Filtering(Cog):                      if not embed.url or embed.url not in urls:                          # If `embed.url` does not exist or if `embed.url` is not part of the content                          # of the message, it's unlikely to be an auto-generated embed by Discord. -                        return True +                        return msg.embeds                      else:                          log.trace(                              "Found a rich embed sent by a regular user account, " diff --git a/bot/cogs/snekbox.py b/bot/cogs/snekbox.py index a2a7574d4..662f90869 100644 --- a/bot/cogs/snekbox.py +++ b/bot/cogs/snekbox.py @@ -212,7 +212,14 @@ class Snekbox(Cog):              else:                  self.bot.stats.incr("snekbox.python.success") -            response = await ctx.send(msg) +            filter_cog = self.bot.get_cog("Filtering") +            filter_triggered = False +            if filter_cog: +                filter_triggered = await filter_cog.filter_eval(msg, ctx.message) +            if filter_triggered: +                response = await ctx.send("Attempt to circumvent filter detected. Moderator team has been alerted.") +            else: +                response = await ctx.send(msg)              self.bot.loop.create_task(                  wait_for_deletion(response, user_ids=(ctx.author.id,), client=ctx.bot)              ) diff --git a/tests/bot/cogs/test_snekbox.py b/tests/bot/cogs/test_snekbox.py index cf9adbee0..98dee7a1b 100644 --- a/tests/bot/cogs/test_snekbox.py +++ b/tests/bot/cogs/test_snekbox.py @@ -233,6 +233,10 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase):          self.cog.get_status_emoji = MagicMock(return_value=':yay!:')          self.cog.format_output = AsyncMock(return_value=('[No output]', None)) +        mocked_filter_cog = MagicMock() +        mocked_filter_cog.filter_eval = AsyncMock(return_value=False) +        self.bot.get_cog.return_value = mocked_filter_cog +          await self.cog.send_eval(ctx, 'MyAwesomeCode')          ctx.send.assert_called_once_with(              '@LemonLemonishBeard#0042 :yay!: Return code 0.\n\n```py\n[No output]\n```' @@ -254,6 +258,10 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase):          self.cog.get_status_emoji = MagicMock(return_value=':yay!:')          self.cog.format_output = AsyncMock(return_value=('Way too long beard', 'lookatmybeard.com')) +        mocked_filter_cog = MagicMock() +        mocked_filter_cog.filter_eval = AsyncMock(return_value=False) +        self.bot.get_cog.return_value = mocked_filter_cog +          await self.cog.send_eval(ctx, 'MyAwesomeCode')          ctx.send.assert_called_once_with(              '@LemonLemonishBeard#0042 :yay!: Return code 0.' @@ -275,6 +283,10 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase):          self.cog.get_status_emoji = MagicMock(return_value=':nope!:')          self.cog.format_output = AsyncMock()  # This function isn't called +        mocked_filter_cog = MagicMock() +        mocked_filter_cog.filter_eval = AsyncMock(return_value=False) +        self.bot.get_cog.return_value = mocked_filter_cog +          await self.cog.send_eval(ctx, 'MyAwesomeCode')          ctx.send.assert_called_once_with(              '@LemonLemonishBeard#0042 :nope!: Return code 127.\n\n```py\nBeard got stuck in the eval\n```' | 
