aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--bot/cogs/error_handler.py2
-rw-r--r--bot/cogs/filtering.py171
-rw-r--r--bot/cogs/snekbox.py9
-rw-r--r--tests/bot/cogs/test_snekbox.py12
4 files changed, 141 insertions, 53 deletions
diff --git a/bot/cogs/error_handler.py b/bot/cogs/error_handler.py
index 5de961116..233851e41 100644
--- a/bot/cogs/error_handler.py
+++ b/bot/cogs/error_handler.py
@@ -170,7 +170,7 @@ class ErrorHandler(Cog):
await prepared_help_command
self.bot.stats.incr("errors.too_many_arguments")
elif isinstance(e, errors.BadArgument):
- await ctx.send(f"Bad argument: {e}\n")
+ await ctx.send("Bad argument: Please double-check your input arguments and try again.\n")
await prepared_help_command
self.bot.stats.incr("errors.bad_argument")
elif isinstance(e, errors.BadUnionArgument):
diff --git a/bot/cogs/filtering.py b/bot/cogs/filtering.py
index 099606b82..bd665f424 100644
--- a/bot/cogs/filtering.py
+++ b/bot/cogs/filtering.py
@@ -2,7 +2,7 @@ import asyncio
import logging
import re
from datetime import datetime, timedelta
-from typing import List, Mapping, Optional, Union
+from typing import List, Mapping, Optional, Tuple, Union
import dateutil
import discord.errors
@@ -198,24 +198,67 @@ class Filtering(Cog):
# Update time when alert sent
await self.name_alerts.set(member.id, datetime.utcnow().timestamp())
- async def _filter_message(self, msg: Message, delta: Optional[int] = None) -> None:
- """Filter the input message to see if it violates any of our rules, and then respond accordingly."""
+ async def filter_eval(self, result: str, msg: Message) -> bool:
+ """
+ Filter the result of an !eval to see if it violates any of our rules, and then respond accordingly.
+
+ Also requires the original message, to check whether to filter and for mod logs.
+ Returns whether a filter was triggered or not.
+ """
+ filter_triggered = False
# Should we filter this message?
- role_whitelisted = False
+ if self._check_filter(msg):
+ for filter_name, _filter in self.filters.items():
+ # Is this specific filter enabled in the config?
+ # We also do not need to worry about filters that take the full message,
+ # since all we have is an arbitrary string.
+ if _filter["enabled"] and _filter["content_only"]:
+ match = await _filter["function"](result)
- if type(msg.author) is Member: # Only Member has roles, not User.
- for role in msg.author.roles:
- if role.id in Filter.role_whitelist:
- role_whitelisted = True
+ if match:
+ # If this is a filter (not a watchlist), we set the variable so we know
+ # that it has been triggered
+ if _filter["type"] == "filter":
+ filter_triggered = True
- filter_message = (
- msg.channel.id not in Filter.channel_whitelist # Channel not in whitelist
- and not role_whitelisted # Role not in whitelist
- and not msg.author.bot # Author not a bot
- )
+ # We do not have to check against DM channels since !eval cannot be used there.
+ channel_str = f"in {msg.channel.mention}"
+
+ message_content, additional_embeds, additional_embeds_msg = self._add_stats(
+ filter_name, match, result
+ )
+
+ message = (
+ f"The {filter_name} {_filter['type']} was triggered "
+ f"by **{msg.author}** "
+ f"(`{msg.author.id}`) {channel_str} using !eval with "
+ f"[the following message]({msg.jump_url}):\n\n"
+ f"{message_content}"
+ )
+
+ log.debug(message)
- # If none of the above, we can start filtering.
- if filter_message:
+ # Send pretty mod log embed to mod-alerts
+ await self.mod_log.send_log_message(
+ icon_url=Icons.filtering,
+ colour=Colour(Colours.soft_red),
+ title=f"{_filter['type'].title()} triggered!",
+ text=message,
+ thumbnail=msg.author.avatar_url_as(static_format="png"),
+ channel_id=Channels.mod_alerts,
+ ping_everyone=Filter.ping_everyone,
+ additional_embeds=additional_embeds,
+ additional_embeds_msg=additional_embeds_msg
+ )
+
+ break # We don't want multiple filters to trigger
+
+ return filter_triggered
+
+ async def _filter_message(self, msg: Message, delta: Optional[int] = None) -> None:
+ """Filter the input message to see if it violates any of our rules, and then respond accordingly."""
+ # Should we filter this message?
+ if self._check_filter(msg):
for filter_name, _filter in self.filters.items():
# Is this specific filter enabled in the config?
if _filter["enabled"]:
@@ -274,16 +317,9 @@ class Filtering(Cog):
else:
channel_str = f"in {msg.channel.mention}"
- # Word and match stats for watch_regex
- if filter_name == "watch_regex":
- surroundings = match.string[max(match.start() - 10, 0): match.end() + 10]
- message_content = (
- f"**Match:** '{match[0]}'\n"
- f"**Location:** '...{escape_markdown(surroundings)}...'\n"
- f"\n**Original Message:**\n{escape_markdown(msg.content)}"
- )
- else: # Use content of discord Message
- message_content = msg.content
+ message_content, additional_embeds, additional_embeds_msg = self._add_stats(
+ filter_name, match, msg.content
+ )
message = (
f"The {filter_name} {_filter['type']} was triggered "
@@ -295,30 +331,6 @@ class Filtering(Cog):
log.debug(message)
- self.bot.stats.incr(f"filters.{filter_name}")
-
- additional_embeds = None
- additional_embeds_msg = None
-
- # The function returns True for invalid invites.
- # They have no data so additional embeds can't be created for them.
- if filter_name == "filter_invites" and match is not True:
- additional_embeds = []
- for invite, data in match.items():
- embed = discord.Embed(description=(
- f"**Members:**\n{data['members']}\n"
- f"**Active:**\n{data['active']}"
- ))
- embed.set_author(name=data["name"])
- embed.set_thumbnail(url=data["icon"])
- embed.set_footer(text=f"Guild Invite Code: {invite}")
- additional_embeds.append(embed)
- additional_embeds_msg = "For the following guild(s):"
-
- elif filter_name == "watch_rich_embeds":
- additional_embeds = msg.embeds
- additional_embeds_msg = "With the following embed(s):"
-
# Send pretty mod log embed to mod-alerts
await self.mod_log.send_log_message(
icon_url=Icons.filtering,
@@ -334,6 +346,63 @@ class Filtering(Cog):
break # We don't want multiple filters to trigger
+ def _add_stats(self, name: str, match: Union[re.Match, dict, bool, List[discord.Embed]], content: str) -> Tuple[
+ str, Optional[List[discord.Embed]], Optional[str]
+ ]:
+ """Adds relevant statistical information to the relevant filter and increments the bot's stats."""
+ # Word and match stats for watch_regex
+ if name == "watch_regex":
+ surroundings = match.string[max(match.start() - 10, 0): match.end() + 10]
+ message_content = (
+ f"**Match:** '{match[0]}'\n"
+ f"**Location:** '...{escape_markdown(surroundings)}...'\n"
+ f"\n**Original Message:**\n{escape_markdown(content)}"
+ )
+ else: # Use original content
+ message_content = content
+
+ additional_embeds = None
+ additional_embeds_msg = None
+
+ self.bot.stats.incr(f"filters.{name}")
+
+ # The function returns True for invalid invites.
+ # They have no data so additional embeds can't be created for them.
+ if name == "filter_invites" and match is not True:
+ additional_embeds = []
+ for invite, data in match.items():
+ embed = discord.Embed(description=(
+ f"**Members:**\n{data['members']}\n"
+ f"**Active:**\n{data['active']}"
+ ))
+ embed.set_author(name=data["name"])
+ embed.set_thumbnail(url=data["icon"])
+ embed.set_footer(text=f"Guild Invite Code: {invite}")
+ additional_embeds.append(embed)
+ additional_embeds_msg = "For the following guild(s):"
+
+ elif name == "watch_rich_embeds":
+ additional_embeds = match
+ additional_embeds_msg = "With the following embed(s):"
+
+ return message_content, additional_embeds, additional_embeds_msg
+
+ @staticmethod
+ def _check_filter(msg: Message) -> bool:
+ """Check whitelists to see if we should filter this message."""
+ role_whitelisted = False
+
+ if type(msg.author) is Member: # Only Member has roles, not User.
+ for role in msg.author.roles:
+ if role.id in Filter.role_whitelist:
+ role_whitelisted = True
+
+ return (
+ msg.channel.id not in Filter.channel_whitelist # Channel not in whitelist
+ and not role_whitelisted # Role not in whitelist
+ and not msg.author.bot # Author not a bot
+ )
+
@staticmethod
async def _has_watch_regex_match(text: str) -> Union[bool, re.Match]:
"""
@@ -426,7 +495,7 @@ class Filtering(Cog):
return invite_data if invite_data else False
@staticmethod
- async def _has_rich_embed(msg: Message) -> bool:
+ async def _has_rich_embed(msg: Message) -> Union[bool, List[discord.Embed]]:
"""Determines if `msg` contains any rich embeds not auto-generated from a URL."""
if msg.embeds:
for embed in msg.embeds:
@@ -435,7 +504,7 @@ class Filtering(Cog):
if not embed.url or embed.url not in urls:
# If `embed.url` does not exist or if `embed.url` is not part of the content
# of the message, it's unlikely to be an auto-generated embed by Discord.
- return True
+ return msg.embeds
else:
log.trace(
"Found a rich embed sent by a regular user account, "
diff --git a/bot/cogs/snekbox.py b/bot/cogs/snekbox.py
index a2a7574d4..662f90869 100644
--- a/bot/cogs/snekbox.py
+++ b/bot/cogs/snekbox.py
@@ -212,7 +212,14 @@ class Snekbox(Cog):
else:
self.bot.stats.incr("snekbox.python.success")
- response = await ctx.send(msg)
+ filter_cog = self.bot.get_cog("Filtering")
+ filter_triggered = False
+ if filter_cog:
+ filter_triggered = await filter_cog.filter_eval(msg, ctx.message)
+ if filter_triggered:
+ response = await ctx.send("Attempt to circumvent filter detected. Moderator team has been alerted.")
+ else:
+ response = await ctx.send(msg)
self.bot.loop.create_task(
wait_for_deletion(response, user_ids=(ctx.author.id,), client=ctx.bot)
)
diff --git a/tests/bot/cogs/test_snekbox.py b/tests/bot/cogs/test_snekbox.py
index cf9adbee0..98dee7a1b 100644
--- a/tests/bot/cogs/test_snekbox.py
+++ b/tests/bot/cogs/test_snekbox.py
@@ -233,6 +233,10 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase):
self.cog.get_status_emoji = MagicMock(return_value=':yay!:')
self.cog.format_output = AsyncMock(return_value=('[No output]', None))
+ mocked_filter_cog = MagicMock()
+ mocked_filter_cog.filter_eval = AsyncMock(return_value=False)
+ self.bot.get_cog.return_value = mocked_filter_cog
+
await self.cog.send_eval(ctx, 'MyAwesomeCode')
ctx.send.assert_called_once_with(
'@LemonLemonishBeard#0042 :yay!: Return code 0.\n\n```py\n[No output]\n```'
@@ -254,6 +258,10 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase):
self.cog.get_status_emoji = MagicMock(return_value=':yay!:')
self.cog.format_output = AsyncMock(return_value=('Way too long beard', 'lookatmybeard.com'))
+ mocked_filter_cog = MagicMock()
+ mocked_filter_cog.filter_eval = AsyncMock(return_value=False)
+ self.bot.get_cog.return_value = mocked_filter_cog
+
await self.cog.send_eval(ctx, 'MyAwesomeCode')
ctx.send.assert_called_once_with(
'@LemonLemonishBeard#0042 :yay!: Return code 0.'
@@ -275,6 +283,10 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase):
self.cog.get_status_emoji = MagicMock(return_value=':nope!:')
self.cog.format_output = AsyncMock() # This function isn't called
+ mocked_filter_cog = MagicMock()
+ mocked_filter_cog.filter_eval = AsyncMock(return_value=False)
+ self.bot.get_cog.return_value = mocked_filter_cog
+
await self.cog.send_eval(ctx, 'MyAwesomeCode')
ctx.send.assert_called_once_with(
'@LemonLemonishBeard#0042 :nope!: Return code 127.\n\n```py\nBeard got stuck in the eval\n```'