aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar mbaruh <[email protected]>2021-12-01 22:15:31 +0200
committerGravatar mbaruh <[email protected]>2022-07-15 14:52:42 +0300
commitd2226cc067aebd61113c584ccf833d55cf227a2d (patch)
treed28875e15d74e1bd9f1b23cc6a94d20578ce5d13
parentMerge pull request #2215 from python-discord/revival-of-code-role (diff)
Tear down the old filtering system
Tests and dependent functionality in other extensions will be re-added later on.
-rw-r--r--bot/bot.py25
-rw-r--r--bot/converters.py48
-rw-r--r--bot/exts/filters/__init__.py0
-rw-r--r--bot/exts/filters/antimalware.py106
-rw-r--r--bot/exts/filters/antispam.py324
-rw-r--r--bot/exts/filters/filter_lists.py297
-rw-r--r--bot/exts/filters/filtering.py735
-rw-r--r--bot/exts/filters/security.py30
-rw-r--r--bot/exts/filters/token_remover.py233
-rw-r--r--bot/exts/filters/webhook_remover.py94
-rw-r--r--bot/exts/info/codeblock/_cog.py4
-rw-r--r--bot/exts/moderation/watchchannels/_watchchannel.py6
-rw-r--r--bot/rules/__init__.py12
-rw-r--r--bot/rules/attachments.py26
-rw-r--r--bot/rules/burst.py23
-rw-r--r--bot/rules/burst_shared.py18
-rw-r--r--bot/rules/chars.py24
-rw-r--r--bot/rules/discord_emojis.py34
-rw-r--r--bot/rules/duplicates.py28
-rw-r--r--bot/rules/links.py36
-rw-r--r--bot/rules/mentions.py28
-rw-r--r--bot/rules/newlines.py45
-rw-r--r--bot/rules/role_mentions.py24
-rw-r--r--tests/bot/exts/filters/__init__.py0
-rw-r--r--tests/bot/exts/filters/test_antimalware.py202
-rw-r--r--tests/bot/exts/filters/test_antispam.py35
-rw-r--r--tests/bot/exts/filters/test_filtering.py40
-rw-r--r--tests/bot/exts/filters/test_security.py53
-rw-r--r--tests/bot/exts/filters/test_token_remover.py409
-rw-r--r--tests/bot/rules/__init__.py76
-rw-r--r--tests/bot/rules/test_attachments.py69
-rw-r--r--tests/bot/rules/test_burst.py54
-rw-r--r--tests/bot/rules/test_burst_shared.py57
-rw-r--r--tests/bot/rules/test_chars.py64
-rw-r--r--tests/bot/rules/test_discord_emojis.py73
-rw-r--r--tests/bot/rules/test_duplicates.py64
-rw-r--r--tests/bot/rules/test_links.py67
-rw-r--r--tests/bot/rules/test_mentions.py83
-rw-r--r--tests/bot/rules/test_newlines.py102
-rw-r--r--tests/bot/rules/test_role_mentions.py55
40 files changed, 1 insertions, 3702 deletions
diff --git a/bot/bot.py b/bot/bot.py
index aff07cd32..e40c3f8c1 100644
--- a/bot/bot.py
+++ b/bot/bot.py
@@ -27,8 +27,6 @@ class Bot(BotBase):
super().__init__(*args, **kwargs)
- self.filter_list_cache = defaultdict(dict)
-
async def ping_services(self) -> None:
"""A helper to make sure all the services the bot relies on are available on startup."""
# Connect Site/API
@@ -45,33 +43,10 @@ class Bot(BotBase):
raise
await asyncio.sleep(constants.URLs.connect_cooldown)
- def insert_item_into_filter_list_cache(self, item: dict[str, str]) -> None:
- """Add an item to the bots filter_list_cache."""
- type_ = item["type"]
- allowed = item["allowed"]
- content = item["content"]
-
- self.filter_list_cache[f"{type_}.{allowed}"][content] = {
- "id": item["id"],
- "comment": item["comment"],
- "created_at": item["created_at"],
- "updated_at": item["updated_at"],
- }
-
- async def cache_filter_list_data(self) -> None:
- """Cache all the data in the FilterList on the site."""
- full_cache = await self.api_client.get('bot/filter-lists')
-
- for item in full_cache:
- self.insert_item_into_filter_list_cache(item)
-
async def setup_hook(self) -> None:
"""Default async initialisation method for discord.py."""
await super().setup_hook()
- # Build the FilterList cache
- await self.cache_filter_list_data()
-
# This is not awaited to avoid a deadlock with any cogs that have
# wait_until_guild_available in their cog_load method.
scheduling.create_task(self.load_extensions(exts))
diff --git a/bot/converters.py b/bot/converters.py
index 5800ea044..23bef0dcc 100644
--- a/bot/converters.py
+++ b/bot/converters.py
@@ -68,54 +68,6 @@ class ValidDiscordServerInvite(Converter):
raise BadArgument("This does not appear to be a valid Discord server invite.")
-class ValidFilterListType(Converter):
- """
- A converter that checks whether the given string is a valid FilterList type.
-
- Raises `BadArgument` if the argument is not a valid FilterList type, and simply
- passes through the given argument otherwise.
- """
-
- @staticmethod
- async def get_valid_types(bot: Bot) -> list:
- """
- Try to get a list of valid filter list types.
-
- Raise a BadArgument if the API can't respond.
- """
- try:
- valid_types = await bot.api_client.get('bot/filter-lists/get-types')
- except ResponseCodeError:
- raise BadArgument("Cannot validate list_type: Unable to fetch valid types from API.")
-
- return [enum for enum, classname in valid_types]
-
- async def convert(self, ctx: Context, list_type: str) -> str:
- """Checks whether the given string is a valid FilterList type."""
- valid_types = await self.get_valid_types(ctx.bot)
- list_type = list_type.upper()
-
- if list_type not in valid_types:
-
- # Maybe the user is using the plural form of this type,
- # e.g. "guild_invites" instead of "guild_invite".
- #
- # This code will support the simple plural form (a single 's' at the end),
- # which works for all current list types, but if a list type is added in the future
- # which has an irregular plural form (like 'ies'), this code will need to be
- # refactored to support this.
- if list_type.endswith("S") and list_type[:-1] in valid_types:
- list_type = list_type[:-1]
-
- else:
- valid_types_list = '\n'.join([f"• {type_.lower()}" for type_ in valid_types])
- raise BadArgument(
- f"You have provided an invalid list type!\n\n"
- f"Please provide one of the following: \n{valid_types_list}"
- )
- return list_type
-
-
class Extension(Converter):
"""
Fully qualify the name of an extension and ensure it exists.
diff --git a/bot/exts/filters/__init__.py b/bot/exts/filters/__init__.py
deleted file mode 100644
index e69de29bb..000000000
--- a/bot/exts/filters/__init__.py
+++ /dev/null
diff --git a/bot/exts/filters/antimalware.py b/bot/exts/filters/antimalware.py
deleted file mode 100644
index ff39700a6..000000000
--- a/bot/exts/filters/antimalware.py
+++ /dev/null
@@ -1,106 +0,0 @@
-import typing as t
-from os.path import splitext
-
-from discord import Embed, Message, NotFound
-from discord.ext.commands import Cog
-
-from bot.bot import Bot
-from bot.constants import Channels, Filter, URLs
-from bot.exts.events.code_jams._channels import CATEGORY_NAME as JAM_CATEGORY_NAME
-from bot.log import get_logger
-
-log = get_logger(__name__)
-
-PY_EMBED_DESCRIPTION = (
- "It looks like you tried to attach a Python file - "
- f"please use a code-pasting service such as {URLs.site_schema}{URLs.site_paste}"
-)
-
-TXT_LIKE_FILES = {".txt", ".csv", ".json"}
-TXT_EMBED_DESCRIPTION = (
- "You either uploaded a `{blocked_extension}` file or entered a message that was too long. "
- f"Please use our [paste bin]({URLs.site_schema}{URLs.site_paste}) instead."
-)
-
-DISALLOWED_EMBED_DESCRIPTION = (
- "It looks like you tried to attach file type(s) that we do not allow ({blocked_extensions_str}). "
- "We currently allow the following file types: **{joined_whitelist}**.\n\n"
- "Feel free to ask in {meta_channel_mention} if you think this is a mistake."
-)
-
-
-class AntiMalware(Cog):
- """Delete messages which contain attachments with non-whitelisted file extensions."""
-
- def __init__(self, bot: Bot):
- self.bot = bot
-
- def _get_whitelisted_file_formats(self) -> list:
- """Get the file formats currently on the whitelist."""
- return self.bot.filter_list_cache['FILE_FORMAT.True'].keys()
-
- def _get_disallowed_extensions(self, message: Message) -> t.Iterable[str]:
- """Get an iterable containing all the disallowed extensions of attachments."""
- file_extensions = {splitext(attachment.filename.lower())[1] for attachment in message.attachments}
- extensions_blocked = file_extensions - set(self._get_whitelisted_file_formats())
- return extensions_blocked
-
- @Cog.listener()
- async def on_message(self, message: Message) -> None:
- """Identify messages with prohibited attachments."""
- # Return when message don't have attachment and don't moderate DMs
- if not message.attachments or not message.guild:
- return
-
- # Ignore webhook and bot messages
- if message.webhook_id or message.author.bot:
- return
-
- # Ignore code jam channels
- if getattr(message.channel, "category", None) and message.channel.category.name == JAM_CATEGORY_NAME:
- return
-
- # Check if user is staff, if is, return
- # Since we only care that roles exist to iterate over, check for the attr rather than a User/Member instance
- if hasattr(message.author, "roles") and any(role.id in Filter.role_whitelist for role in message.author.roles):
- return
-
- embed = Embed()
- extensions_blocked = self._get_disallowed_extensions(message)
- blocked_extensions_str = ', '.join(extensions_blocked)
- if ".py" in extensions_blocked:
- # Short-circuit on *.py files to provide a pastebin link
- embed.description = PY_EMBED_DESCRIPTION
- elif extensions := TXT_LIKE_FILES.intersection(extensions_blocked):
- # Work around Discord AutoConversion of messages longer than 2000 chars to .txt
- cmd_channel = self.bot.get_channel(Channels.bot_commands)
- embed.description = TXT_EMBED_DESCRIPTION.format(
- blocked_extension=extensions.pop(),
- cmd_channel_mention=cmd_channel.mention
- )
- elif extensions_blocked:
- meta_channel = self.bot.get_channel(Channels.meta)
- embed.description = DISALLOWED_EMBED_DESCRIPTION.format(
- joined_whitelist=', '.join(self._get_whitelisted_file_formats()),
- blocked_extensions_str=blocked_extensions_str,
- meta_channel_mention=meta_channel.mention,
- )
-
- if embed.description:
- log.info(
- f"User '{message.author}' ({message.author.id}) uploaded blacklisted file(s): {blocked_extensions_str}",
- extra={"attachment_list": [attachment.filename for attachment in message.attachments]}
- )
-
- await message.channel.send(f"Hey {message.author.mention}!", embed=embed)
-
- # Delete the offending message:
- try:
- await message.delete()
- except NotFound:
- log.info(f"Tried to delete message `{message.id}`, but message could not be found.")
-
-
-async def setup(bot: Bot) -> None:
- """Load the AntiMalware cog."""
- await bot.add_cog(AntiMalware(bot))
diff --git a/bot/exts/filters/antispam.py b/bot/exts/filters/antispam.py
deleted file mode 100644
index 3b925bacd..000000000
--- a/bot/exts/filters/antispam.py
+++ /dev/null
@@ -1,324 +0,0 @@
-import asyncio
-from collections import defaultdict
-from collections.abc import Mapping
-from dataclasses import dataclass, field
-from datetime import timedelta
-from itertools import takewhile
-from operator import attrgetter, itemgetter
-from typing import Dict, Iterable, List, Set
-
-import arrow
-from botcore.utils import scheduling
-from discord import Colour, Member, Message, MessageType, NotFound, Object, TextChannel
-from discord.ext.commands import Cog
-
-from bot import rules
-from bot.bot import Bot
-from bot.constants import (
- AntiSpam as AntiSpamConfig, Channels, Colours, DEBUG_MODE, Event, Filter, Guild as GuildConfig, Icons
-)
-from bot.converters import Duration
-from bot.exts.events.code_jams._channels import CATEGORY_NAME as JAM_CATEGORY_NAME
-from bot.exts.moderation.modlog import ModLog
-from bot.log import get_logger
-from bot.utils import lock
-from bot.utils.message_cache import MessageCache
-from bot.utils.messages import format_user, send_attachments
-
-log = get_logger(__name__)
-
-RULE_FUNCTION_MAPPING = {
- 'attachments': rules.apply_attachments,
- 'burst': rules.apply_burst,
- # burst shared is temporarily disabled due to a bug
- # 'burst_shared': rules.apply_burst_shared,
- 'chars': rules.apply_chars,
- 'discord_emojis': rules.apply_discord_emojis,
- 'duplicates': rules.apply_duplicates,
- 'links': rules.apply_links,
- 'mentions': rules.apply_mentions,
- 'newlines': rules.apply_newlines,
- 'role_mentions': rules.apply_role_mentions,
-}
-
-
-@dataclass
-class DeletionContext:
- """Represents a Deletion Context for a single spam event."""
-
- members: frozenset[Member]
- triggered_in: TextChannel
- channels: set[TextChannel] = field(default_factory=set)
- rules: Set[str] = field(default_factory=set)
- messages: Dict[int, Message] = field(default_factory=dict)
- attachments: List[List[str]] = field(default_factory=list)
-
- async def add(self, rule_name: str, channels: Iterable[TextChannel], messages: Iterable[Message]) -> None:
- """Adds new rule violation events to the deletion context."""
- self.rules.add(rule_name)
-
- self.channels.update(channels)
-
- for message in messages:
- if message.id not in self.messages:
- self.messages[message.id] = message
-
- # Re-upload attachments
- destination = message.guild.get_channel(Channels.attachment_log)
- urls = await send_attachments(message, destination, link_large=False)
- self.attachments.append(urls)
-
- async def upload_messages(self, actor_id: int, modlog: ModLog) -> None:
- """Method that takes care of uploading the queue and posting modlog alert."""
- triggered_by_users = ", ".join(format_user(m) for m in self.members)
- triggered_in_channel = f"**Triggered in:** {self.triggered_in.mention}\n" if len(self.channels) > 1 else ""
- channels_description = ", ".join(channel.mention for channel in self.channels)
-
- mod_alert_message = (
- f"**Triggered by:** {triggered_by_users}\n"
- f"{triggered_in_channel}"
- f"**Channels:** {channels_description}\n"
- f"**Rules:** {', '.join(rule for rule in self.rules)}\n"
- )
-
- messages_as_list = list(self.messages.values())
- first_message = messages_as_list[0]
- # For multiple messages and those with attachments or excessive newlines, use the logs API
- if any((
- len(messages_as_list) > 1,
- len(first_message.attachments) > 0,
- first_message.content.count('\n') > 15
- )):
- url = await modlog.upload_log(self.messages.values(), actor_id, self.attachments)
- mod_alert_message += f"A complete log of the offending messages can be found [here]({url})"
- else:
- mod_alert_message += "Message:\n"
- content = first_message.clean_content
- remaining_chars = 4080 - len(mod_alert_message)
-
- if len(content) > remaining_chars:
- url = await modlog.upload_log([first_message], actor_id, self.attachments)
- log_site_msg = f"The full message can be found [here]({url})"
- content = content[:remaining_chars - (3 + len(log_site_msg))] + "..."
-
- mod_alert_message += content
-
- await modlog.send_log_message(
- content=", ".join(str(m.id) for m in self.members), # quality-of-life improvement for mobile moderators
- icon_url=Icons.filtering,
- colour=Colour(Colours.soft_red),
- title="Spam detected!",
- text=mod_alert_message,
- thumbnail=first_message.author.display_avatar.url,
- channel_id=Channels.mod_alerts,
- ping_everyone=AntiSpamConfig.ping_everyone
- )
-
-
-class AntiSpam(Cog):
- """Cog that controls our anti-spam measures."""
-
- def __init__(self, bot: Bot, validation_errors: Dict[str, str]) -> None:
- self.bot = bot
- self.validation_errors = validation_errors
- role_id = AntiSpamConfig.punishment['role_id']
- self.muted_role = Object(role_id)
- self.expiration_date_converter = Duration()
-
- self.message_deletion_queue = dict()
-
- # Fetch the rule configuration with the highest rule interval.
- max_interval_config = max(
- AntiSpamConfig.rules.values(),
- key=itemgetter('interval')
- )
- self.max_interval = max_interval_config['interval']
- self.cache = MessageCache(AntiSpamConfig.cache_size, newest_first=True)
-
- @property
- def mod_log(self) -> ModLog:
- """Allows for easy access of the ModLog cog."""
- return self.bot.get_cog("ModLog")
-
- async def cog_load(self) -> None:
- """Unloads the cog and alerts admins if configuration validation failed."""
- await self.bot.wait_until_guild_available()
- if self.validation_errors:
- body = "**The following errors were encountered:**\n"
- body += "\n".join(f"- {error}" for error in self.validation_errors.values())
- body += "\n\n**The cog has been unloaded.**"
-
- await self.mod_log.send_log_message(
- title="Error: AntiSpam configuration validation failed!",
- text=body,
- ping_everyone=True,
- icon_url=Icons.token_removed,
- colour=Colour.red()
- )
-
- self.bot.remove_cog(self.__class__.__name__)
- return
-
- @Cog.listener()
- async def on_message(self, message: Message) -> None:
- """Applies the antispam rules to each received message."""
- if (
- not message.guild
- or message.guild.id != GuildConfig.id
- or message.author.bot
- or (getattr(message.channel, "category", None) and message.channel.category.name == JAM_CATEGORY_NAME)
- or (message.channel.id in Filter.channel_whitelist and not DEBUG_MODE)
- or (any(role.id in Filter.role_whitelist for role in message.author.roles) and not DEBUG_MODE)
- or message.type == MessageType.auto_moderation_action
- ):
- return
-
- self.cache.append(message)
-
- earliest_relevant_at = arrow.utcnow() - timedelta(seconds=self.max_interval)
- relevant_messages = list(takewhile(lambda msg: msg.created_at > earliest_relevant_at, self.cache))
-
- for rule_name in AntiSpamConfig.rules:
- rule_config = AntiSpamConfig.rules[rule_name]
- rule_function = RULE_FUNCTION_MAPPING[rule_name]
-
- # Create a list of messages that were sent in the interval that the rule cares about.
- latest_interesting_stamp = arrow.utcnow() - timedelta(seconds=rule_config['interval'])
- messages_for_rule = list(
- takewhile(lambda msg: msg.created_at > latest_interesting_stamp, relevant_messages)
- )
-
- result = await rule_function(message, messages_for_rule, rule_config)
-
- # If the rule returns `None`, that means the message didn't violate it.
- # If it doesn't, it returns a tuple in the form `(str, Iterable[discord.Member])`
- # which contains the reason for why the message violated the rule and
- # an iterable of all members that violated the rule.
- if result is not None:
- self.bot.stats.incr(f"mod_alerts.{rule_name}")
- reason, members, relevant_messages = result
- full_reason = f"`{rule_name}` rule: {reason}"
-
- # If there's no spam event going on for this channel, start a new Message Deletion Context
- authors_set = frozenset(members)
- if authors_set not in self.message_deletion_queue:
- log.trace(f"Creating queue for members `{authors_set}`")
- self.message_deletion_queue[authors_set] = DeletionContext(authors_set, message.channel)
- scheduling.create_task(
- self._process_deletion_context(authors_set),
- name=f"AntiSpam._process_deletion_context({authors_set})"
- )
-
- # Add the relevant of this trigger to the Deletion Context
- await self.message_deletion_queue[authors_set].add(
- rule_name=rule_name,
- channels=set(message.channel for message in relevant_messages),
- messages=relevant_messages
- )
-
- for member in members:
- scheduling.create_task(
- self.punish(message, member, full_reason),
- name=f"AntiSpam.punish(message={message.id}, member={member.id}, rule={rule_name})"
- )
-
- await self.maybe_delete_messages(relevant_messages)
- break
-
- @lock.lock_arg("antispam.punish", "member", attrgetter("id"))
- async def punish(self, msg: Message, member: Member, reason: str) -> None:
- """Punishes the given member for triggering an antispam rule."""
- if not any(role.id == self.muted_role.id for role in member.roles):
- remove_role_after = AntiSpamConfig.punishment['remove_after']
-
- # Get context and make sure the bot becomes the actor of infraction by patching the `author` attributes
- context = await self.bot.get_context(msg)
- context.author = self.bot.user
-
- # Since we're going to invoke the tempmute command directly, we need to manually call the converter.
- dt_remove_role_after = await self.expiration_date_converter.convert(context, f"{remove_role_after}S")
- await context.invoke(
- self.bot.get_command('tempmute'),
- member,
- dt_remove_role_after,
- reason=reason
- )
-
- async def maybe_delete_messages(self, messages: List[Message]) -> None:
- """Cleans the messages if cleaning is configured."""
- if AntiSpamConfig.clean_offending:
- # If we have more than one message, we can use bulk delete.
- if len(messages) > 1:
- message_ids = [message.id for message in messages]
- self.mod_log.ignore(Event.message_delete, *message_ids)
- channel_messages = defaultdict(list)
- for message in messages:
- channel_messages[message.channel].append(message)
- for channel, messages in channel_messages.items():
- try:
- await channel.delete_messages(messages)
- except NotFound:
- # In the rare case where we found messages matching the
- # spam filter across multiple channels, it is possible
- # that a single channel will only contain a single message
- # to delete. If that should be the case, discord.py will
- # use the "delete single message" endpoint instead of the
- # bulk delete endpoint, and the single message deletion
- # endpoint will complain if you give it that does not exist.
- # As this means that we have no other message to delete in
- # this channel (and message deletes work per-channel),
- # we can just log an exception and carry on with business.
- log.info(f"Tried to delete message `{messages[0].id}`, but message could not be found.")
-
- # Otherwise, the bulk delete endpoint will throw up.
- # Delete the message directly instead.
- else:
- self.mod_log.ignore(Event.message_delete, messages[0].id)
- try:
- await messages[0].delete()
- except NotFound:
- log.info(f"Tried to delete message `{messages[0].id}`, but message could not be found.")
-
- async def _process_deletion_context(self, context_id: frozenset) -> None:
- """Processes the Deletion Context queue."""
- log.trace("Sleeping before processing message deletion queue.")
- await asyncio.sleep(10)
-
- if context_id not in self.message_deletion_queue:
- log.error(f"Started processing deletion queue for context `{context_id}`, but it was not found!")
- return
-
- deletion_context = self.message_deletion_queue.pop(context_id)
- await deletion_context.upload_messages(self.bot.user.id, self.mod_log)
-
- @Cog.listener()
- async def on_message_edit(self, before: Message, after: Message) -> None:
- """Updates the message in the cache, if it's cached."""
- self.cache.update(after)
-
-
-def validate_config(rules_: Mapping = AntiSpamConfig.rules) -> Dict[str, str]:
- """Validates the antispam configs."""
- validation_errors = {}
- for name, config in rules_.items():
- if name not in RULE_FUNCTION_MAPPING:
- log.error(
- f"Unrecognized antispam rule `{name}`. "
- f"Valid rules are: {', '.join(RULE_FUNCTION_MAPPING)}"
- )
- validation_errors[name] = f"`{name}` is not recognized as an antispam rule."
- continue
- for required_key in ('interval', 'max'):
- if required_key not in config:
- log.error(
- f"`{required_key}` is required but was not "
- f"set in rule `{name}`'s configuration."
- )
- validation_errors[name] = f"Key `{required_key}` is required but not set for rule `{name}`"
- return validation_errors
-
-
-async def setup(bot: Bot) -> None:
- """Validate the AntiSpam configs and load the AntiSpam cog."""
- validation_errors = validate_config()
- await bot.add_cog(AntiSpam(bot, validation_errors))
diff --git a/bot/exts/filters/filter_lists.py b/bot/exts/filters/filter_lists.py
deleted file mode 100644
index c643f9a84..000000000
--- a/bot/exts/filters/filter_lists.py
+++ /dev/null
@@ -1,297 +0,0 @@
-import re
-from typing import Optional
-
-from botcore.site_api import ResponseCodeError
-from discord import Colour, Embed
-from discord.ext.commands import BadArgument, Cog, Context, IDConverter, group, has_any_role
-
-from bot import constants
-from bot.bot import Bot
-from bot.constants import Channels
-from bot.converters import ValidDiscordServerInvite, ValidFilterListType
-from bot.log import get_logger
-from bot.pagination import LinePaginator
-
-log = get_logger(__name__)
-
-
-class FilterLists(Cog):
- """Commands for blacklisting and whitelisting things."""
-
- methods_with_filterlist_types = [
- "allow_add",
- "allow_delete",
- "allow_get",
- "deny_add",
- "deny_delete",
- "deny_get",
- ]
-
- def __init__(self, bot: Bot) -> None:
- self.bot = bot
-
- async def cog_load(self) -> None:
- """Add the valid FilterList types to the docstrings, so they'll appear in !help invocations."""
- await self.bot.wait_until_guild_available()
-
- # Add valid filterlist types to the docstrings
- valid_types = await ValidFilterListType.get_valid_types(self.bot)
- valid_types = [f"`{type_.lower()}`" for type_ in valid_types]
-
- for method_name in self.methods_with_filterlist_types:
- command = getattr(self, method_name)
- command.help = (
- f"{command.help}\n\nValid **list_type** values are {', '.join(valid_types)}."
- )
-
- async def _add_data(
- self,
- ctx: Context,
- allowed: bool,
- list_type: ValidFilterListType,
- content: str,
- comment: Optional[str] = None,
- ) -> None:
- """Add an item to a filterlist."""
- allow_type = "whitelist" if allowed else "blacklist"
-
- # If this is a guild invite, we gotta validate it.
- if list_type == "GUILD_INVITE":
- guild_data = await self._validate_guild_invite(ctx, content)
- content = guild_data.get("id")
-
- # Some guild invites are autoban filters, which require the mod
- # to set a comment which includes [autoban].
- # Having the guild name in the comment is still useful when reviewing
- # filter list, so prepend it to the set comment in case some mod forgets.
- guild_name_part = f'Guild "{guild_data["name"]}"' if "name" in guild_data else None
-
- comment = " - ".join(
- comment_part
- for comment_part in (guild_name_part, comment)
- if comment_part
- )
-
- # If it's a file format, let's make sure it has a leading dot.
- elif list_type == "FILE_FORMAT" and not content.startswith("."):
- content = f".{content}"
-
- # If it's a filter token, validate the passed regex
- elif list_type == "FILTER_TOKEN":
- try:
- re.compile(content)
- except re.error as e:
- await ctx.message.add_reaction("❌")
- await ctx.send(
- f"{ctx.author.mention} that's not a valid regex! "
- f"Regex error message: {e.msg}."
- )
- return
-
- # Try to add the item to the database
- log.trace(f"Trying to add the {content} item to the {list_type} {allow_type}")
- payload = {
- "allowed": allowed,
- "type": list_type,
- "content": content,
- "comment": comment,
- }
-
- try:
- item = await self.bot.api_client.post(
- "bot/filter-lists",
- json=payload
- )
- except ResponseCodeError as e:
- if e.status == 400:
- await ctx.message.add_reaction("❌")
- log.debug(
- f"{ctx.author} tried to add data to a {allow_type}, but the API returned 400, "
- "probably because the request violated the UniqueConstraint."
- )
- raise BadArgument(
- f"Unable to add the item to the {allow_type}. "
- "The item probably already exists. Keep in mind that a "
- "blacklist and a whitelist for the same item cannot co-exist, "
- "and we do not permit any duplicates."
- )
- raise
-
- # If it is an autoban trigger we send a warning in #mod-meta
- if comment and "[autoban]" in comment:
- await self.bot.get_channel(Channels.mod_meta).send(
- f":warning: Heads-up! The new `{list_type}` filter "
- f"`{content}` (`{comment}`) will automatically ban users."
- )
-
- # Insert the item into the cache
- self.bot.insert_item_into_filter_list_cache(item)
- await ctx.message.add_reaction("✅")
-
- async def _delete_data(self, ctx: Context, allowed: bool, list_type: ValidFilterListType, content: str) -> None:
- """Remove an item from a filterlist."""
- allow_type = "whitelist" if allowed else "blacklist"
-
- # If this is a server invite, we need to convert it.
- if list_type == "GUILD_INVITE" and not IDConverter()._get_id_match(content):
- guild_data = await self._validate_guild_invite(ctx, content)
- content = guild_data.get("id")
-
- # If it's a file format, let's make sure it has a leading dot.
- elif list_type == "FILE_FORMAT" and not content.startswith("."):
- content = f".{content}"
-
- # Find the content and delete it.
- log.trace(f"Trying to delete the {content} item from the {list_type} {allow_type}")
- item = self.bot.filter_list_cache[f"{list_type}.{allowed}"].get(content)
-
- if item is not None:
- try:
- await self.bot.api_client.delete(
- f"bot/filter-lists/{item['id']}"
- )
- del self.bot.filter_list_cache[f"{list_type}.{allowed}"][content]
- await ctx.message.add_reaction("✅")
- except ResponseCodeError as e:
- log.debug(
- f"{ctx.author} tried to delete an item with the id {item['id']}, but "
- f"the API raised an unexpected error: {e}"
- )
- await ctx.message.add_reaction("❌")
- else:
- await ctx.message.add_reaction("❌")
-
- async def _list_all_data(self, ctx: Context, allowed: bool, list_type: ValidFilterListType) -> None:
- """Paginate and display all items in a filterlist."""
- allow_type = "whitelist" if allowed else "blacklist"
- result = self.bot.filter_list_cache[f"{list_type}.{allowed}"]
-
- # Build a list of lines we want to show in the paginator
- lines = []
- for content, metadata in result.items():
- line = f"• `{content}`"
-
- if comment := metadata.get("comment"):
- line += f" - {comment}"
-
- lines.append(line)
- lines = sorted(lines)
-
- # Build the embed
- list_type_plural = list_type.lower().replace("_", " ").title() + "s"
- embed = Embed(
- title=f"{allow_type.title()}ed {list_type_plural} ({len(result)} total)",
- colour=Colour.blue()
- )
- log.trace(f"Trying to list {len(result)} items from the {list_type.lower()} {allow_type}")
-
- if result:
- await LinePaginator.paginate(lines, ctx, embed, max_lines=15, empty=False)
- else:
- embed.description = "Hmmm, seems like there's nothing here yet."
- await ctx.send(embed=embed)
- await ctx.message.add_reaction("❌")
-
- async def _sync_data(self, ctx: Context) -> None:
- """Syncs the filterlists with the API."""
- try:
- log.trace("Attempting to sync FilterList cache with data from the API.")
- await self.bot.cache_filter_list_data()
- await ctx.message.add_reaction("✅")
- except ResponseCodeError as e:
- log.debug(
- f"{ctx.author} tried to sync FilterList cache data but "
- f"the API raised an unexpected error: {e}"
- )
- await ctx.message.add_reaction("❌")
-
- @staticmethod
- async def _validate_guild_invite(ctx: Context, invite: str) -> dict:
- """
- Validates a guild invite, and returns the guild info as a dict.
-
- Will raise a BadArgument if the guild invite is invalid.
- """
- log.trace(f"Attempting to validate whether or not {invite} is a guild invite.")
- validator = ValidDiscordServerInvite()
- guild_data = await validator.convert(ctx, invite)
-
- # If we make it this far without raising a BadArgument, the invite is
- # valid. Let's return a dict of guild information.
- log.trace(f"{invite} validated as server invite. Converting to ID.")
- return guild_data
-
- @group(aliases=("allowlist", "allow", "al", "wl"))
- async def whitelist(self, ctx: Context) -> None:
- """Group for whitelisting commands."""
- if not ctx.invoked_subcommand:
- await ctx.send_help(ctx.command)
-
- @group(aliases=("denylist", "deny", "bl", "dl"))
- async def blacklist(self, ctx: Context) -> None:
- """Group for blacklisting commands."""
- if not ctx.invoked_subcommand:
- await ctx.send_help(ctx.command)
-
- @whitelist.command(name="add", aliases=("a", "set"))
- async def allow_add(
- self,
- ctx: Context,
- list_type: ValidFilterListType,
- content: str,
- *,
- comment: Optional[str] = None,
- ) -> None:
- """Add an item to the specified allowlist."""
- await self._add_data(ctx, True, list_type, content, comment)
-
- @blacklist.command(name="add", aliases=("a", "set"))
- async def deny_add(
- self,
- ctx: Context,
- list_type: ValidFilterListType,
- content: str,
- *,
- comment: Optional[str] = None,
- ) -> None:
- """Add an item to the specified denylist."""
- await self._add_data(ctx, False, list_type, content, comment)
-
- @whitelist.command(name="remove", aliases=("delete", "rm",))
- async def allow_delete(self, ctx: Context, list_type: ValidFilterListType, content: str) -> None:
- """Remove an item from the specified allowlist."""
- await self._delete_data(ctx, True, list_type, content)
-
- @blacklist.command(name="remove", aliases=("delete", "rm",))
- async def deny_delete(self, ctx: Context, list_type: ValidFilterListType, content: str) -> None:
- """Remove an item from the specified denylist."""
- await self._delete_data(ctx, False, list_type, content)
-
- @whitelist.command(name="get", aliases=("list", "ls", "fetch", "show"))
- async def allow_get(self, ctx: Context, list_type: ValidFilterListType) -> None:
- """Get the contents of a specified allowlist."""
- await self._list_all_data(ctx, True, list_type)
-
- @blacklist.command(name="get", aliases=("list", "ls", "fetch", "show"))
- async def deny_get(self, ctx: Context, list_type: ValidFilterListType) -> None:
- """Get the contents of a specified denylist."""
- await self._list_all_data(ctx, False, list_type)
-
- @whitelist.command(name="sync", aliases=("s",))
- async def allow_sync(self, ctx: Context) -> None:
- """Syncs both allowlists and denylists with the API."""
- await self._sync_data(ctx)
-
- @blacklist.command(name="sync", aliases=("s",))
- async def deny_sync(self, ctx: Context) -> None:
- """Syncs both allowlists and denylists with the API."""
- await self._sync_data(ctx)
-
- async def cog_check(self, ctx: Context) -> bool:
- """Only allow moderators to invoke the commands in this cog."""
- return await has_any_role(*constants.MODERATION_ROLES).predicate(ctx)
-
-
-async def setup(bot: Bot) -> None:
- """Load the FilterLists cog."""
- await bot.add_cog(FilterLists(bot))
diff --git a/bot/exts/filters/filtering.py b/bot/exts/filters/filtering.py
deleted file mode 100644
index ca6ad0064..000000000
--- a/bot/exts/filters/filtering.py
+++ /dev/null
@@ -1,735 +0,0 @@
-import asyncio
-import re
-import unicodedata
-import urllib.parse
-from datetime import timedelta
-from typing import Any, Dict, List, Mapping, NamedTuple, Optional, Tuple, Union
-
-import arrow
-import dateutil.parser
-import regex
-import tldextract
-from async_rediscache import RedisCache
-from botcore.site_api import ResponseCodeError
-from botcore.utils import scheduling
-from botcore.utils.regex import DISCORD_INVITE
-from dateutil.relativedelta import relativedelta
-from discord import ChannelType, Colour, Embed, Forbidden, HTTPException, Member, Message, NotFound, TextChannel
-from discord.ext.commands import Cog
-from discord.utils import escape_markdown
-
-from bot.bot import Bot
-from bot.constants import Bot as BotConfig, Channels, Colours, Filter, Guild, Icons, URLs
-from bot.exts.events.code_jams._channels import CATEGORY_NAME as JAM_CATEGORY_NAME
-from bot.exts.moderation.modlog import ModLog
-from bot.log import get_logger
-from bot.utils.messages import format_user
-
-log = get_logger(__name__)
-
-# Regular expressions
-CODE_BLOCK_RE = re.compile(
- r"(?P<delim>``?)[^`]+?(?P=delim)(?!`+)" # Inline codeblock
- r"|```(.+?)```", # Multiline codeblock
- re.DOTALL | re.MULTILINE
-)
-EVERYONE_PING_RE = re.compile(rf"@everyone|<@&{Guild.id}>|@here")
-SPOILER_RE = re.compile(r"(\|\|.+?\|\|)", re.DOTALL)
-URL_RE = re.compile(r"(https?://[^\s]+)", flags=re.IGNORECASE)
-
-# Exclude variation selectors from zalgo because they're actually invisible.
-VARIATION_SELECTORS = r"\uFE00-\uFE0F\U000E0100-\U000E01EF"
-INVISIBLE_RE = regex.compile(rf"[{VARIATION_SELECTORS}\p{{UNASSIGNED}}\p{{FORMAT}}\p{{CONTROL}}--\s]", regex.V1)
-ZALGO_RE = regex.compile(rf"[\p{{NONSPACING MARK}}\p{{ENCLOSING MARK}}--[{VARIATION_SELECTORS}]]", regex.V1)
-
-# Other constants.
-DAYS_BETWEEN_ALERTS = 3
-OFFENSIVE_MSG_DELETE_TIME = timedelta(days=Filter.offensive_msg_delete_days)
-
-# Autoban
-LINK_PASSWORD = "https://support.discord.com/hc/en-us/articles/218410947-I-forgot-my-Password-Where-can-I-set-a-new-one"
-LINK_2FA = "https://support.discord.com/hc/en-us/articles/219576828-Setting-up-Two-Factor-Authentication"
-AUTO_BAN_REASON = (
- "Your account has been used to send links to a phishing website. You have been automatically banned. "
- "If you are not aware of sending them, that means your account has been compromised.\n\n"
-
- f"Here is a guide from Discord on [how to change your password]({LINK_PASSWORD}).\n\n"
-
- f"We also highly recommend that you [enable 2 factor authentication on your account]({LINK_2FA}), "
- "for heightened security.\n\n"
-
- "Once you have changed your password, feel free to follow the instructions at the bottom of "
- "this message to appeal your ban."
-)
-AUTO_BAN_DURATION = timedelta(days=4)
-
-FilterMatch = Union[re.Match, dict, bool, List[Embed]]
-
-
-class Stats(NamedTuple):
- """Additional stats on a triggered filter to append to a mod log."""
-
- message_content: str
- additional_embeds: Optional[List[Embed]]
-
-
-class Filtering(Cog):
- """Filtering out invites, blacklisting domains, and warning us of certain regular expressions."""
-
- # Redis cache mapping a user ID to the last timestamp a bad nickname alert was sent
- name_alerts = RedisCache()
-
- def __init__(self, bot: Bot):
- self.bot = bot
- self.scheduler = scheduling.Scheduler(self.__class__.__name__)
- self.name_lock = asyncio.Lock()
-
- staff_mistake_str = "If you believe this was a mistake, please let staff know!"
- self.filters = {
- "filter_zalgo": {
- "enabled": Filter.filter_zalgo,
- "function": self._has_zalgo,
- "type": "filter",
- "content_only": True,
- "user_notification": Filter.notify_user_zalgo,
- "notification_msg": (
- "Your post has been removed for abusing Unicode character rendering (aka Zalgo text). "
- f"{staff_mistake_str}"
- ),
- "schedule_deletion": False
- },
- "filter_invites": {
- "enabled": Filter.filter_invites,
- "function": self._has_invites,
- "type": "filter",
- "content_only": True,
- "user_notification": Filter.notify_user_invites,
- "notification_msg": (
- f"Per Rule 6, your invite link has been removed. {staff_mistake_str}\n\n"
- r"Our server rules can be found here: <https://pythondiscord.com/pages/rules>"
- ),
- "schedule_deletion": False
- },
- "filter_domains": {
- "enabled": Filter.filter_domains,
- "function": self._has_urls,
- "type": "filter",
- "content_only": True,
- "user_notification": Filter.notify_user_domains,
- "notification_msg": (
- f"Your URL has been removed because it matched a blacklisted domain. {staff_mistake_str}"
- ),
- "schedule_deletion": False
- },
- "watch_regex": {
- "enabled": Filter.watch_regex,
- "function": self._has_watch_regex_match,
- "type": "watchlist",
- "content_only": True,
- "schedule_deletion": True
- },
- "watch_rich_embeds": {
- "enabled": Filter.watch_rich_embeds,
- "function": self._has_rich_embed,
- "type": "watchlist",
- "content_only": False,
- "schedule_deletion": False
- },
- "filter_everyone_ping": {
- "enabled": Filter.filter_everyone_ping,
- "function": self._has_everyone_ping,
- "type": "filter",
- "content_only": True,
- "user_notification": Filter.notify_user_everyone_ping,
- "notification_msg": (
- "Please don't try to ping `@everyone` or `@here`. "
- f"Your message has been removed. {staff_mistake_str}"
- ),
- "schedule_deletion": False,
- "ping_everyone": False
- },
- }
-
- async def cog_unload(self) -> None:
- """Cancel scheduled tasks."""
- self.scheduler.cancel_all()
-
- def _get_filterlist_items(self, list_type: str, *, allowed: bool) -> list:
- """Fetch items from the filter_list_cache."""
- return self.bot.filter_list_cache[f"{list_type.upper()}.{allowed}"].keys()
-
- def _get_filterlist_value(self, list_type: str, value: Any, *, allowed: bool) -> dict:
- """Fetch one specific value from filter_list_cache."""
- return self.bot.filter_list_cache[f"{list_type.upper()}.{allowed}"][value]
-
- @staticmethod
- def _expand_spoilers(text: str) -> str:
- """Return a string containing all interpretations of a spoilered message."""
- split_text = SPOILER_RE.split(text)
- return ''.join(
- split_text[0::2] + split_text[1::2] + split_text
- )
-
- @property
- def mod_log(self) -> ModLog:
- """Get currently loaded ModLog cog instance."""
- return self.bot.get_cog("ModLog")
-
- @Cog.listener()
- async def on_message(self, msg: Message) -> None:
- """Invoke message filter for new messages."""
- await self._filter_message(msg)
-
- # Ignore webhook messages.
- if msg.webhook_id is None:
- await self.check_bad_words_in_name(msg.author)
-
- @Cog.listener()
- async def on_message_edit(self, before: Message, after: Message) -> None:
- """
- Invoke message filter for message edits.
-
- Also calculates the time delta from the previous edit or when message was sent if there's no prior edits.
- """
- # We only care about changes to the message contents/attachments and embed additions, not pin status etc.
- if all((
- before.content == after.content, # content hasn't changed
- before.attachments == after.attachments, # attachments haven't changed
- len(before.embeds) >= len(after.embeds) # embeds haven't been added
- )):
- return
-
- if not before.edited_at:
- delta = relativedelta(after.edited_at, before.created_at).microseconds
- else:
- delta = relativedelta(after.edited_at, before.edited_at).microseconds
- await self._filter_message(after, delta)
-
- @Cog.listener()
- async def on_voice_state_update(self, member: Member, *_) -> None:
- """Checks for bad words in usernames when users join, switch or leave a voice channel."""
- await self.check_bad_words_in_name(member)
-
- def get_name_match(self, name: str) -> Optional[re.Match]:
- """Check bad words from passed string (name). Return the first match found."""
- normalised_name = unicodedata.normalize("NFKC", name)
- cleaned_normalised_name = "".join([c for c in normalised_name if not unicodedata.combining(c)])
-
- # Run filters against normalised, cleaned normalised and the original name,
- # in case we have filters for one but not the other.
- names_to_check = (name, normalised_name, cleaned_normalised_name)
-
- watchlist_patterns = self._get_filterlist_items('filter_token', allowed=False)
- for pattern in watchlist_patterns:
- for name in names_to_check:
- if match := re.search(pattern, name, flags=re.IGNORECASE):
- return match
- return None
-
- async def check_send_alert(self, member: Member) -> bool:
- """When there is less than 3 days after last alert, return `False`, otherwise `True`."""
- if last_alert := await self.name_alerts.get(member.id):
- last_alert = arrow.get(last_alert)
- if arrow.utcnow() - timedelta(days=DAYS_BETWEEN_ALERTS) < last_alert:
- log.trace(f"Last alert was too recent for {member}'s nickname.")
- return False
-
- return True
-
- async def check_bad_words_in_name(self, member: Member) -> None:
- """Send a mod alert every 3 days if a username still matches a watchlist pattern."""
- # Use lock to avoid race conditions
- async with self.name_lock:
- # Check if we recently alerted about this user first,
- # to avoid running all the filter tokens against their name again.
- if not await self.check_send_alert(member):
- return
-
- # Check whether the users display name contains any words in our blacklist
- match = self.get_name_match(member.display_name)
- if not match:
- return
-
- log.info(f"Sending bad nickname alert for '{member.display_name}' ({member.id}).")
-
- log_string = (
- f"**User:** {format_user(member)}\n"
- f"**Display Name:** {escape_markdown(member.display_name)}\n"
- f"**Bad Match:** {match.group()}"
- )
-
- await self.mod_log.send_log_message(
- content=str(member.id), # quality-of-life improvement for mobile moderators
- icon_url=Icons.token_removed,
- colour=Colours.soft_red,
- title="Username filtering alert",
- text=log_string,
- channel_id=Channels.mod_alerts,
- thumbnail=member.display_avatar.url,
- ping_everyone=True
- )
-
- # Update time when alert sent
- await self.name_alerts.set(member.id, arrow.utcnow().timestamp())
-
- async def filter_snekbox_output(self, result: str, msg: Message) -> bool:
- """
- Filter the result of a snekbox command to see if it violates any of our rules, and then respond accordingly.
-
- Also requires the original message, to check whether to filter and for mod logs.
- Returns whether a filter was triggered or not.
- """
- filter_triggered = False
- # Should we filter this message?
- if self._check_filter(msg):
- for filter_name, _filter in self.filters.items():
- # Is this specific filter enabled in the config?
- # We also do not need to worry about filters that take the full message,
- # since all we have is an arbitrary string.
- if _filter["enabled"] and _filter["content_only"]:
- filter_result = await _filter["function"](result)
- reason = None
-
- if isinstance(filter_result, tuple):
- match, reason = filter_result
- else:
- match = filter_result
-
- if match:
- # If this is a filter (not a watchlist), we set the variable so we know
- # that it has been triggered
- if _filter["type"] == "filter":
- filter_triggered = True
-
- stats = self._add_stats(filter_name, match, result)
- await self._send_log(filter_name, _filter, msg, stats, reason, is_eval=True)
-
- break # We don't want multiple filters to trigger
-
- return filter_triggered
-
- async def _filter_message(self, msg: Message, delta: Optional[int] = None) -> None:
- """Filter the input message to see if it violates any of our rules, and then respond accordingly."""
- # Should we filter this message?
- if self._check_filter(msg):
- for filter_name, _filter in self.filters.items():
- # Is this specific filter enabled in the config?
- if _filter["enabled"]:
- # Double trigger check for the embeds filter
- if filter_name == "watch_rich_embeds":
- # If the edit delta is less than 0.001 seconds, then we're probably dealing
- # with a double filter trigger.
- if delta is not None and delta < 100:
- continue
-
- if filter_name in ("filter_invites", "filter_everyone_ping"):
- # Disable invites filter in codejam team channels
- category = getattr(msg.channel, "category", None)
- if category and category.name == JAM_CATEGORY_NAME:
- continue
-
- # Does the filter only need the message content or the full message?
- if _filter["content_only"]:
- payload = msg.content
- else:
- payload = msg
-
- result = await _filter["function"](payload)
- reason = None
-
- if isinstance(result, tuple):
- match, reason = result
- else:
- match = result
-
- if match:
- is_private = msg.channel.type is ChannelType.private
-
- # If this is a filter (not a watchlist) and not in a DM, delete the message.
- if _filter["type"] == "filter" and not is_private:
- try:
- # Embeds (can?) trigger both the `on_message` and `on_message_edit`
- # event handlers, triggering filtering twice for the same message.
- #
- # If `on_message`-triggered filtering already deleted the message
- # then `on_message_edit`-triggered filtering will raise exception
- # since the message no longer exists.
- #
- # In addition, to avoid sending two notifications to the user, the
- # logs, and mod_alert, we return if the message no longer exists.
- await msg.delete()
- except NotFound:
- return
-
- # Notify the user if the filter specifies
- if _filter["user_notification"]:
- await self.notify_member(msg.author, _filter["notification_msg"], msg.channel)
-
- # If the message is classed as offensive, we store it in the site db and
- # it will be deleted after one week.
- if _filter["schedule_deletion"] and not is_private:
- delete_date = (msg.created_at + OFFENSIVE_MSG_DELETE_TIME).isoformat()
- data = {
- 'id': msg.id,
- 'channel_id': msg.channel.id,
- 'delete_date': delete_date
- }
-
- try:
- await self.bot.api_client.post('bot/offensive-messages', json=data)
- except ResponseCodeError as e:
- if e.status == 400 and "already exists" in e.response_json.get("id", [""])[0]:
- log.debug(f"Offensive message {msg.id} already exists.")
- else:
- log.error(f"Offensive message {msg.id} failed to post: {e}")
- else:
- self.schedule_msg_delete(data)
- log.trace(f"Offensive message {msg.id} will be deleted on {delete_date}")
-
- stats = self._add_stats(filter_name, match, msg.content)
-
- # If the filter reason contains `[autoban]`, we want to auto-ban the user.
- # Also pass this to _send_log so mods are not pinged filter matches that are auto-actioned
- autoban = reason and "[autoban]" in reason.lower()
- if not autoban and filter_name == "filter_invites" and isinstance(result, dict):
- autoban = any(
- "[autoban]" in invite_info["reason"].lower()
- for invite_info in result.values()
- if invite_info.get("reason")
- )
-
- await self._send_log(filter_name, _filter, msg, stats, reason, autoban=autoban)
-
- if autoban:
- # Create a new context, with the author as is the bot, and the channel as #mod-alerts.
- # This sends the ban confirmation directly under watchlist trigger embed, to inform
- # mods that the user was auto-banned for the message.
- context = await self.bot.get_context(msg)
- context.guild = self.bot.get_guild(Guild.id)
- context.author = context.guild.get_member(self.bot.user.id)
- context.channel = self.bot.get_channel(Channels.mod_alerts)
- context.command = self.bot.get_command("tempban")
-
- await context.invoke(
- context.command,
- msg.author,
- arrow.utcnow() + AUTO_BAN_DURATION,
- reason=AUTO_BAN_REASON
- )
-
- break # We don't want multiple filters to trigger
-
- async def _send_log(
- self,
- filter_name: str,
- _filter: Dict[str, Any],
- msg: Message,
- stats: Stats,
- reason: Optional[str] = None,
- *,
- is_eval: bool = False,
- autoban: bool = False,
- ) -> None:
- """Send a mod log for a triggered filter."""
- if msg.channel.type is ChannelType.private:
- channel_str = "via DM"
- ping_everyone = False
- else:
- channel_str = f"in {msg.channel.mention}"
- # Allow specific filters to override ping_everyone
- ping_everyone = Filter.ping_everyone and _filter.get("ping_everyone", True)
-
- content = str(msg.author.id) # quality-of-life improvement for mobile moderators
-
- # If we are going to autoban, we don't want to ping and don't need the user ID
- if autoban:
- ping_everyone = False
- content = None
-
- eval_msg = f"using {BotConfig.prefix}eval " if is_eval else ""
- footer = f"Reason: {reason}" if reason else None
- message = (
- f"The {filter_name} {_filter['type']} was triggered by {format_user(msg.author)} "
- f"{channel_str} {eval_msg}with [the following message]({msg.jump_url}):\n\n"
- f"{stats.message_content}"
- )
-
- log.debug(message)
-
- # Send pretty mod log embed to mod-alerts
- await self.mod_log.send_log_message(
- content=content,
- icon_url=Icons.filtering,
- colour=Colour(Colours.soft_red),
- title=f"{_filter['type'].title()} triggered!",
- text=message,
- thumbnail=msg.author.display_avatar.url,
- channel_id=Channels.mod_alerts,
- ping_everyone=ping_everyone,
- additional_embeds=stats.additional_embeds,
- footer=footer,
- )
-
- def _add_stats(self, name: str, match: FilterMatch, content: str) -> Stats:
- """Adds relevant statistical information to the relevant filter and increments the bot's stats."""
- # Word and match stats for watch_regex
- if name == "watch_regex":
- surroundings = match.string[max(match.start() - 10, 0): match.end() + 10]
- message_content = (
- f"**Match:** '{match[0]}'\n"
- f"**Location:** '...{escape_markdown(surroundings)}...'\n"
- f"\n**Original Message:**\n{escape_markdown(content)}"
- )
- else: # Use original content
- message_content = content
-
- additional_embeds = None
-
- self.bot.stats.incr(f"filters.{name}")
-
- # The function returns True for invalid invites.
- # They have no data so additional embeds can't be created for them.
- if name == "filter_invites" and match is not True:
- additional_embeds = []
- for _, data in match.items():
- reason = f"Reason: {data['reason']} | " if data.get('reason') else ""
- embed = Embed(description=(
- f"**Members:**\n{data['members']}\n"
- f"**Active:**\n{data['active']}"
- ))
- embed.set_author(name=data["name"])
- embed.set_thumbnail(url=data["icon"])
- embed.set_footer(text=f"{reason}Guild ID: {data['id']}")
- additional_embeds.append(embed)
-
- elif name == "watch_rich_embeds":
- additional_embeds = match
-
- return Stats(message_content, additional_embeds)
-
- @staticmethod
- def _check_filter(msg: Message) -> bool:
- """Check whitelists to see if we should filter this message."""
- role_whitelisted = False
-
- if type(msg.author) is Member: # Only Member has roles, not User.
- for role in msg.author.roles:
- if role.id in Filter.role_whitelist:
- role_whitelisted = True
-
- return (
- msg.channel.id not in Filter.channel_whitelist # Channel not in whitelist
- and not role_whitelisted # Role not in whitelist
- and not msg.author.bot # Author not a bot
- )
-
- async def _has_watch_regex_match(self, text: str) -> Tuple[Union[bool, re.Match], Optional[str]]:
- """
- Return True if `text` matches any regex from `word_watchlist` or `token_watchlist` configs.
-
- `word_watchlist`'s patterns are placed between word boundaries while `token_watchlist` is
- matched as-is. Spoilers are expanded, if any, and URLs are ignored.
- Second return value is a reason written to database about blacklist entry (can be None).
- """
- if SPOILER_RE.search(text):
- text = self._expand_spoilers(text)
-
- text = self.clean_input(text)
-
- watchlist_patterns = self._get_filterlist_items('filter_token', allowed=False)
- for pattern in watchlist_patterns:
- match = re.search(pattern, text, flags=re.IGNORECASE)
- if match:
- return match, self._get_filterlist_value('filter_token', pattern, allowed=False)['comment']
-
- return False, None
-
- async def _has_urls(self, text: str) -> Tuple[bool, Optional[str]]:
- """
- Returns True if the text contains one of the blacklisted URLs from the config file.
-
- Second return value is a reason of URL blacklisting (can be None).
- """
- text = self.clean_input(text)
-
- domain_blacklist = self._get_filterlist_items("domain_name", allowed=False)
- for match in URL_RE.finditer(text):
- for url in domain_blacklist:
- if url.lower() in match.group(1).lower():
- blacklisted_parsed = tldextract.extract(url.lower())
- url_parsed = tldextract.extract(match.group(1).lower())
- if blacklisted_parsed.registered_domain == url_parsed.registered_domain:
- return True, self._get_filterlist_value("domain_name", url, allowed=False)["comment"]
- return False, None
-
- @staticmethod
- async def _has_zalgo(text: str) -> bool:
- """
- Returns True if the text contains zalgo characters.
-
- Zalgo range is \u0300 – \u036F and \u0489.
- """
- return bool(ZALGO_RE.search(text))
-
- async def _has_invites(self, text: str) -> Union[dict, bool]:
- """
- Checks if there's any invites in the text content that aren't in the guild whitelist.
-
- If any are detected, a dictionary of invite data is returned, with a key per invite.
- If none are detected, False is returned.
- If we are unable to process an invite, True is returned.
-
- Attempts to catch some of common ways to try to cheat the system.
- """
- text = self.clean_input(text)
-
- # Remove backslashes to prevent escape character aroundfuckery like
- # discord\.gg/gdudes-pony-farm
- text = text.replace("\\", "")
-
- invites = [m.group("invite") for m in DISCORD_INVITE.finditer(text)]
- invite_data = dict()
- for invite in invites:
- invite = urllib.parse.quote_plus(invite.rstrip("/"))
- if invite in invite_data:
- continue
-
- response = await self.bot.http_session.get(
- f"{URLs.discord_invite_api}/{invite}", params={"with_counts": "true"}
- )
- response = await response.json()
- guild = response.get("guild")
- if guild is None:
- # Lack of a "guild" key in the JSON response indicates either an group DM invite, an
- # expired invite, or an invalid invite. The API does not currently differentiate
- # between invalid and expired invites
- return True
-
- guild_id = guild.get("id")
- guild_invite_whitelist = self._get_filterlist_items("guild_invite", allowed=True)
- guild_invite_blacklist = self._get_filterlist_items("guild_invite", allowed=False)
-
- # Is this invite allowed?
- guild_partnered_or_verified = (
- 'PARTNERED' in guild.get("features", [])
- or 'VERIFIED' in guild.get("features", [])
- )
- invite_not_allowed = (
- guild_id in guild_invite_blacklist # Blacklisted guilds are never permitted.
- or guild_id not in guild_invite_whitelist # Whitelisted guilds are always permitted.
- and not guild_partnered_or_verified # Otherwise guilds have to be Verified or Partnered.
- )
-
- if invite_not_allowed:
- reason = None
- if guild_id in guild_invite_blacklist:
- reason = self._get_filterlist_value("guild_invite", guild_id, allowed=False)["comment"]
-
- guild_icon_hash = guild["icon"]
- guild_icon = (
- "https://cdn.discordapp.com/icons/"
- f"{guild_id}/{guild_icon_hash}.png?size=512"
- )
-
- invite_data[invite] = {
- "name": guild["name"],
- "id": guild['id'],
- "icon": guild_icon,
- "members": response["approximate_member_count"],
- "active": response["approximate_presence_count"],
- "reason": reason
- }
-
- return invite_data if invite_data else False
-
- @staticmethod
- async def _has_rich_embed(msg: Message) -> Union[bool, List[Embed]]:
- """Determines if `msg` contains any rich embeds not auto-generated from a URL."""
- if msg.embeds:
- for embed in msg.embeds:
- if embed.type == "rich":
- urls = URL_RE.findall(msg.content)
- if not embed.url or embed.url not in urls:
- # If `embed.url` does not exist or if `embed.url` is not part of the content
- # of the message, it's unlikely to be an auto-generated embed by Discord.
- return msg.embeds
- else:
- log.trace(
- "Found a rich embed sent by a regular user account, "
- "but it was likely just an automatic URL embed."
- )
- return False
- return False
-
- @staticmethod
- async def _has_everyone_ping(text: str) -> bool:
- """Determines if `msg` contains an @everyone or @here ping outside of a codeblock."""
- # First pass to avoid running re.sub on every message
- if not EVERYONE_PING_RE.search(text):
- return False
-
- content_without_codeblocks = CODE_BLOCK_RE.sub("", text)
- return bool(EVERYONE_PING_RE.search(content_without_codeblocks))
-
- async def notify_member(self, filtered_member: Member, reason: str, channel: TextChannel) -> None:
- """
- Notify filtered_member about a moderation action with the reason str.
-
- First attempts to DM the user, fall back to in-channel notification if user has DMs disabled
- """
- try:
- await filtered_member.send(reason)
- except Forbidden:
- await channel.send(f"{filtered_member.mention} {reason}")
-
- def schedule_msg_delete(self, msg: dict) -> None:
- """Delete an offensive message once its deletion date is reached."""
- delete_at = dateutil.parser.isoparse(msg['delete_date'])
- self.scheduler.schedule_at(delete_at, msg['id'], self.delete_offensive_msg(msg))
-
- async def cog_load(self) -> None:
- """Get all the pending message deletion from the API and reschedule them."""
- await self.bot.wait_until_ready()
- response = await self.bot.api_client.get('bot/offensive-messages',)
-
- now = arrow.utcnow()
-
- for msg in response:
- delete_at = dateutil.parser.isoparse(msg['delete_date'])
-
- if delete_at < now:
- await self.delete_offensive_msg(msg)
- else:
- self.schedule_msg_delete(msg)
-
- async def delete_offensive_msg(self, msg: Mapping[str, int]) -> None:
- """Delete an offensive message, and then delete it from the db."""
- try:
- channel = self.bot.get_channel(msg['channel_id'])
- if channel:
- msg_obj = await channel.fetch_message(msg['id'])
- await msg_obj.delete()
- except NotFound:
- log.info(
- f"Tried to delete message {msg['id']}, but the message can't be found "
- f"(it has been probably already deleted)."
- )
- except HTTPException as e:
- log.warning(f"Failed to delete message {msg['id']}: status {e.status}")
-
- await self.bot.api_client.delete(f'bot/offensive-messages/{msg["id"]}')
- log.info(f"Deleted the offensive message with id {msg['id']}.")
-
- @staticmethod
- def clean_input(string: str) -> str:
- """Remove zalgo and invisible characters from `string`."""
- # For future consideration: remove characters in the Mc, Sk, and Lm categories too.
- # Can be normalised with form C to merge char + combining char into a single char to avoid
- # removing legit diacritics, but this would open up a way to bypass filters.
- no_zalgo = ZALGO_RE.sub("", string)
- return INVISIBLE_RE.sub("", no_zalgo)
-
-
-async def setup(bot: Bot) -> None:
- """Load the Filtering cog."""
- await bot.add_cog(Filtering(bot))
diff --git a/bot/exts/filters/security.py b/bot/exts/filters/security.py
deleted file mode 100644
index 27e4d9752..000000000
--- a/bot/exts/filters/security.py
+++ /dev/null
@@ -1,30 +0,0 @@
-from discord.ext.commands import Cog, Context, NoPrivateMessage
-
-from bot.bot import Bot
-from bot.log import get_logger
-
-log = get_logger(__name__)
-
-
-class Security(Cog):
- """Security-related helpers."""
-
- def __init__(self, bot: Bot):
- self.bot = bot
- self.bot.check(self.check_not_bot) # Global commands check - no bots can run any commands at all
- self.bot.check(self.check_on_guild) # Global commands check - commands can't be run in a DM
-
- def check_not_bot(self, ctx: Context) -> bool:
- """Check if the context is a bot user."""
- return not ctx.author.bot
-
- def check_on_guild(self, ctx: Context) -> bool:
- """Check if the context is in a guild."""
- if ctx.guild is None:
- raise NoPrivateMessage("This command cannot be used in private messages.")
- return True
-
-
-async def setup(bot: Bot) -> None:
- """Load the Security cog."""
- await bot.add_cog(Security(bot))
diff --git a/bot/exts/filters/token_remover.py b/bot/exts/filters/token_remover.py
deleted file mode 100644
index a0d5aa7b6..000000000
--- a/bot/exts/filters/token_remover.py
+++ /dev/null
@@ -1,233 +0,0 @@
-import base64
-import re
-import typing as t
-
-from discord import Colour, Message, NotFound
-from discord.ext.commands import Cog
-
-from bot import utils
-from bot.bot import Bot
-from bot.constants import Channels, Colours, Event, Icons
-from bot.exts.moderation.modlog import ModLog
-from bot.log import get_logger
-from bot.utils.members import get_or_fetch_member
-from bot.utils.messages import format_user
-
-log = get_logger(__name__)
-
-LOG_MESSAGE = (
- "Censored a seemingly valid token sent by {author} in {channel}, "
- "token was `{user_id}.{timestamp}.{hmac}`"
-)
-UNKNOWN_USER_LOG_MESSAGE = "Decoded user ID: `{user_id}` (Not present in server)."
-KNOWN_USER_LOG_MESSAGE = (
- "Decoded user ID: `{user_id}` **(Present in server)**.\n"
- "This matches `{user_name}` and means this is likely a valid **{kind}** token."
-)
-DELETION_MESSAGE_TEMPLATE = (
- "Hey {mention}! I noticed you posted a seemingly valid Discord API "
- "token in your message and have removed your message. "
- "This means that your token has been **compromised**. "
- "Please change your token **immediately** at: "
- "<https://discordapp.com/developers/applications/me>\n\n"
- "Feel free to re-post it with the token removed. "
- "If you believe this was a mistake, please let us know!"
-)
-DISCORD_EPOCH = 1_420_070_400
-TOKEN_EPOCH = 1_293_840_000
-
-# Three parts delimited by dots: user ID, creation timestamp, HMAC.
-# The HMAC isn't parsed further, but it's in the regex to ensure it at least exists in the string.
-# Each part only matches base64 URL-safe characters.
-# Padding has never been observed, but the padding character '=' is matched just in case.
-TOKEN_RE = re.compile(r"([\w\-=]+)\.([\w\-=]+)\.([\w\-=]+)", re.ASCII)
-
-
-class Token(t.NamedTuple):
- """A Discord Bot token."""
-
- user_id: str
- timestamp: str
- hmac: str
-
-
-class TokenRemover(Cog):
- """Scans messages for potential discord.py bot tokens and removes them."""
-
- def __init__(self, bot: Bot):
- self.bot = bot
-
- @property
- def mod_log(self) -> ModLog:
- """Get currently loaded ModLog cog instance."""
- return self.bot.get_cog("ModLog")
-
- @Cog.listener()
- async def on_message(self, msg: Message) -> None:
- """
- Check each message for a string that matches Discord's token pattern.
-
- See: https://discordapp.com/developers/docs/reference#snowflakes
- """
- # Ignore DMs; can't delete messages in there anyway.
- if not msg.guild or msg.author.bot:
- return
-
- found_token = self.find_token_in_message(msg)
- if found_token:
- await self.take_action(msg, found_token)
-
- @Cog.listener()
- async def on_message_edit(self, before: Message, after: Message) -> None:
- """
- Check each edit for a string that matches Discord's token pattern.
-
- See: https://discordapp.com/developers/docs/reference#snowflakes
- """
- await self.on_message(after)
-
- async def take_action(self, msg: Message, found_token: Token) -> None:
- """Remove the `msg` containing the `found_token` and send a mod log message."""
- self.mod_log.ignore(Event.message_delete, msg.id)
-
- try:
- await msg.delete()
- except NotFound:
- log.debug(f"Failed to remove token in message {msg.id}: message already deleted.")
- return
-
- await msg.channel.send(DELETION_MESSAGE_TEMPLATE.format(mention=msg.author.mention))
-
- log_message = self.format_log_message(msg, found_token)
- userid_message, mention_everyone = await self.format_userid_log_message(msg, found_token)
- log.debug(log_message)
-
- # Send pretty mod log embed to mod-alerts
- await self.mod_log.send_log_message(
- icon_url=Icons.token_removed,
- colour=Colour(Colours.soft_red),
- title="Token removed!",
- text=log_message + "\n" + userid_message,
- thumbnail=msg.author.display_avatar.url,
- channel_id=Channels.mod_alerts,
- ping_everyone=mention_everyone,
- )
-
- self.bot.stats.incr("tokens.removed_tokens")
-
- @classmethod
- async def format_userid_log_message(cls, msg: Message, token: Token) -> t.Tuple[str, bool]:
- """
- Format the portion of the log message that includes details about the detected user ID.
-
- If the user is resolved to a member, the format includes the user ID, name, and the
- kind of user detected.
-
- If we resolve to a member and it is not a bot, we also return True to ping everyone.
-
- Returns a tuple of (log_message, mention_everyone)
- """
- user_id = cls.extract_user_id(token.user_id)
- user = await get_or_fetch_member(msg.guild, user_id)
-
- if user:
- return KNOWN_USER_LOG_MESSAGE.format(
- user_id=user_id,
- user_name=str(user),
- kind="BOT" if user.bot else "USER",
- ), True
- else:
- return UNKNOWN_USER_LOG_MESSAGE.format(user_id=user_id), False
-
- @staticmethod
- def format_log_message(msg: Message, token: Token) -> str:
- """Return the generic portion of the log message to send for `token` being censored in `msg`."""
- return LOG_MESSAGE.format(
- author=format_user(msg.author),
- channel=msg.channel.mention,
- user_id=token.user_id,
- timestamp=token.timestamp,
- hmac='x' * (len(token.hmac) - 3) + token.hmac[-3:],
- )
-
- @classmethod
- def find_token_in_message(cls, msg: Message) -> t.Optional[Token]:
- """Return a seemingly valid token found in `msg` or `None` if no token is found."""
- # Use finditer rather than search to guard against method calls prematurely returning the
- # token check (e.g. `message.channel.send` also matches our token pattern)
- for match in TOKEN_RE.finditer(msg.content):
- token = Token(*match.groups())
- if (
- (cls.extract_user_id(token.user_id) is not None)
- and cls.is_valid_timestamp(token.timestamp)
- and cls.is_maybe_valid_hmac(token.hmac)
- ):
- # Short-circuit on first match
- return token
-
- # No matching substring
- return
-
- @staticmethod
- def extract_user_id(b64_content: str) -> t.Optional[int]:
- """Return a user ID integer from part of a potential token, or None if it couldn't be decoded."""
- b64_content = utils.pad_base64(b64_content)
-
- try:
- decoded_bytes = base64.urlsafe_b64decode(b64_content)
- string = decoded_bytes.decode('utf-8')
- if not (string.isascii() and string.isdigit()):
- # This case triggers if there are fancy unicode digits in the base64 encoding,
- # that means it's not a valid user id.
- return None
- return int(string)
- except ValueError:
- return None
-
- @staticmethod
- def is_valid_timestamp(b64_content: str) -> bool:
- """
- Return True if `b64_content` decodes to a valid timestamp.
-
- If the timestamp is greater than the Discord epoch, it's probably valid.
- See: https://i.imgur.com/7WdehGn.png
- """
- b64_content = utils.pad_base64(b64_content)
-
- try:
- decoded_bytes = base64.urlsafe_b64decode(b64_content)
- timestamp = int.from_bytes(decoded_bytes, byteorder="big")
- except ValueError as e:
- log.debug(f"Failed to decode token timestamp '{b64_content}': {e}")
- return False
-
- # Seems like newer tokens don't need the epoch added, but add anyway since an upper bound
- # is not checked.
- if timestamp + TOKEN_EPOCH >= DISCORD_EPOCH:
- return True
- else:
- log.debug(f"Invalid token timestamp '{b64_content}': smaller than Discord epoch")
- return False
-
- @staticmethod
- def is_maybe_valid_hmac(b64_content: str) -> bool:
- """
- Determine if a given HMAC portion of a token is potentially valid.
-
- If the HMAC has 3 or less characters, it's probably a dummy value like "xxxxxxxxxx",
- and thus the token can probably be skipped.
- """
- unique = len(set(b64_content.lower()))
- if unique <= 3:
- log.debug(
- f"Considering the HMAC {b64_content} a dummy because it has {unique}"
- " case-insensitively unique characters"
- )
- return False
- else:
- return True
-
-
-async def setup(bot: Bot) -> None:
- """Load the TokenRemover cog."""
- await bot.add_cog(TokenRemover(bot))
diff --git a/bot/exts/filters/webhook_remover.py b/bot/exts/filters/webhook_remover.py
deleted file mode 100644
index b42613804..000000000
--- a/bot/exts/filters/webhook_remover.py
+++ /dev/null
@@ -1,94 +0,0 @@
-import re
-
-from discord import Colour, Message, NotFound
-from discord.ext.commands import Cog
-
-from bot.bot import Bot
-from bot.constants import Channels, Colours, Event, Icons
-from bot.exts.moderation.modlog import ModLog
-from bot.log import get_logger
-from bot.utils.messages import format_user
-
-WEBHOOK_URL_RE = re.compile(
- r"((?:https?:\/\/)?(?:ptb\.|canary\.)?discord(?:app)?\.com\/api\/webhooks\/\d+\/)\S+\/?",
- re.IGNORECASE
-)
-
-ALERT_MESSAGE_TEMPLATE = (
- "{user}, looks like you posted a Discord webhook URL. Therefore, your "
- "message has been removed, and your webhook has been deleted. "
- "You can re-create it if you wish to. If you believe this was a "
- "mistake, please let us know."
-)
-
-log = get_logger(__name__)
-
-
-class WebhookRemover(Cog):
- """Scan messages to detect Discord webhooks links."""
-
- def __init__(self, bot: Bot):
- self.bot = bot
-
- @property
- def mod_log(self) -> ModLog:
- """Get current instance of `ModLog`."""
- return self.bot.get_cog("ModLog")
-
- async def delete_and_respond(self, msg: Message, redacted_url: str, *, webhook_deleted: bool) -> None:
- """Delete `msg` and send a warning that it contained the Discord webhook `redacted_url`."""
- # Don't log this, due internal delete, not by user. Will make different entry.
- self.mod_log.ignore(Event.message_delete, msg.id)
-
- try:
- await msg.delete()
- except NotFound:
- log.debug(f"Failed to remove webhook in message {msg.id}: message already deleted.")
- return
-
- await msg.channel.send(ALERT_MESSAGE_TEMPLATE.format(user=msg.author.mention))
- if webhook_deleted:
- delete_state = "The webhook was successfully deleted."
- else:
- delete_state = "There was an error when deleting the webhook, it might have already been removed."
- message = (
- f"{format_user(msg.author)} posted a Discord webhook URL to {msg.channel.mention}. {delete_state} "
- f"Webhook URL was `{redacted_url}`"
- )
- log.debug(message)
-
- # Send entry to moderation alerts.
- await self.mod_log.send_log_message(
- icon_url=Icons.token_removed,
- colour=Colour(Colours.soft_red),
- title="Discord webhook URL removed!",
- text=message,
- thumbnail=msg.author.display_avatar.url,
- channel_id=Channels.mod_alerts
- )
-
- self.bot.stats.incr("tokens.removed_webhooks")
-
- @Cog.listener()
- async def on_message(self, msg: Message) -> None:
- """Check if a Discord webhook URL is in `message`."""
- # Ignore DMs; can't delete messages in there anyway.
- if not msg.guild or msg.author.bot:
- return
-
- matches = WEBHOOK_URL_RE.search(msg.content)
- if matches:
- async with self.bot.http_session.delete(matches[0]) as resp:
- # The Discord API Returns a 204 NO CONTENT response on success.
- deleted_successfully = resp.status == 204
- await self.delete_and_respond(msg, matches[1] + "xxx", webhook_deleted=deleted_successfully)
-
- @Cog.listener()
- async def on_message_edit(self, before: Message, after: Message) -> None:
- """Check if a Discord webhook URL is in the edited message `after`."""
- await self.on_message(after)
-
-
-async def setup(bot: Bot) -> None:
- """Load `WebhookRemover` cog."""
- await bot.add_cog(WebhookRemover(bot))
diff --git a/bot/exts/info/codeblock/_cog.py b/bot/exts/info/codeblock/_cog.py
index 9027105d9..cc5862131 100644
--- a/bot/exts/info/codeblock/_cog.py
+++ b/bot/exts/info/codeblock/_cog.py
@@ -8,8 +8,6 @@ from discord.ext.commands import Cog
from bot import constants
from bot.bot import Bot
-from bot.exts.filters.token_remover import TokenRemover
-from bot.exts.filters.webhook_remover import WEBHOOK_URL_RE
from bot.exts.info.codeblock._instructions import get_instructions
from bot.log import get_logger
from bot.utils import has_lines
@@ -135,8 +133,6 @@ class CodeBlockCog(Cog, name="Code Block"):
not message.author.bot
and self.is_valid_channel(message.channel)
and has_lines(message.content, constants.CodeBlock.minimum_lines)
- and not TokenRemover.find_token_in_message(message)
- and not WEBHOOK_URL_RE.search(message.content)
)
@Cog.listener()
diff --git a/bot/exts/moderation/watchchannels/_watchchannel.py b/bot/exts/moderation/watchchannels/_watchchannel.py
index 46f9c296e..30b1f342b 100644
--- a/bot/exts/moderation/watchchannels/_watchchannel.py
+++ b/bot/exts/moderation/watchchannels/_watchchannel.py
@@ -14,8 +14,6 @@ from discord.ext.commands import Cog, Context
from bot.bot import Bot
from bot.constants import BigBrother as BigBrotherConfig, Guild as GuildConfig, Icons
-from bot.exts.filters.token_remover import TokenRemover
-from bot.exts.filters.webhook_remover import WEBHOOK_URL_RE
from bot.exts.moderation.modlog import ModLog
from bot.log import CustomLogger, get_logger
from bot.pagination import LinePaginator
@@ -235,9 +233,7 @@ class WatchChannel(metaclass=CogABCMeta):
await self.send_header(msg)
- if TokenRemover.find_token_in_message(msg) or WEBHOOK_URL_RE.search(msg.content):
- cleaned_content = "Content is censored because it contains a bot or webhook token."
- elif cleaned_content := msg.clean_content:
+ if cleaned_content := msg.clean_content:
# Put all non-media URLs in a code block to prevent embeds
media_urls = {embed.url for embed in msg.embeds if embed.type in ("image", "video")}
for url in URL_RE.findall(cleaned_content):
diff --git a/bot/rules/__init__.py b/bot/rules/__init__.py
deleted file mode 100644
index a01ceae73..000000000
--- a/bot/rules/__init__.py
+++ /dev/null
@@ -1,12 +0,0 @@
-# flake8: noqa
-
-from .attachments import apply as apply_attachments
-from .burst import apply as apply_burst
-from .burst_shared import apply as apply_burst_shared
-from .chars import apply as apply_chars
-from .discord_emojis import apply as apply_discord_emojis
-from .duplicates import apply as apply_duplicates
-from .links import apply as apply_links
-from .mentions import apply as apply_mentions
-from .newlines import apply as apply_newlines
-from .role_mentions import apply as apply_role_mentions
diff --git a/bot/rules/attachments.py b/bot/rules/attachments.py
deleted file mode 100644
index 8903c385c..000000000
--- a/bot/rules/attachments.py
+++ /dev/null
@@ -1,26 +0,0 @@
-from typing import Dict, Iterable, List, Optional, Tuple
-
-from discord import Member, Message
-
-
-async def apply(
- last_message: Message, recent_messages: List[Message], config: Dict[str, int]
-) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
- """Detects total attachments exceeding the limit sent by a single user."""
- relevant_messages = tuple(
- msg
- for msg in recent_messages
- if (
- msg.author == last_message.author
- and len(msg.attachments) > 0
- )
- )
- total_recent_attachments = sum(len(msg.attachments) for msg in relevant_messages)
-
- if total_recent_attachments > config['max']:
- return (
- f"sent {total_recent_attachments} attachments in {config['interval']}s",
- (last_message.author,),
- relevant_messages
- )
- return None
diff --git a/bot/rules/burst.py b/bot/rules/burst.py
deleted file mode 100644
index 25c5a2f33..000000000
--- a/bot/rules/burst.py
+++ /dev/null
@@ -1,23 +0,0 @@
-from typing import Dict, Iterable, List, Optional, Tuple
-
-from discord import Member, Message
-
-
-async def apply(
- last_message: Message, recent_messages: List[Message], config: Dict[str, int]
-) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
- """Detects repeated messages sent by a single user."""
- relevant_messages = tuple(
- msg
- for msg in recent_messages
- if msg.author == last_message.author
- )
- total_relevant = len(relevant_messages)
-
- if total_relevant > config['max']:
- return (
- f"sent {total_relevant} messages in {config['interval']}s",
- (last_message.author,),
- relevant_messages
- )
- return None
diff --git a/bot/rules/burst_shared.py b/bot/rules/burst_shared.py
deleted file mode 100644
index bbe9271b3..000000000
--- a/bot/rules/burst_shared.py
+++ /dev/null
@@ -1,18 +0,0 @@
-from typing import Dict, Iterable, List, Optional, Tuple
-
-from discord import Member, Message
-
-
-async def apply(
- last_message: Message, recent_messages: List[Message], config: Dict[str, int]
-) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
- """Detects repeated messages sent by multiple users."""
- total_recent = len(recent_messages)
-
- if total_recent > config['max']:
- return (
- f"sent {total_recent} messages in {config['interval']}s",
- set(msg.author for msg in recent_messages),
- recent_messages
- )
- return None
diff --git a/bot/rules/chars.py b/bot/rules/chars.py
deleted file mode 100644
index 1f587422c..000000000
--- a/bot/rules/chars.py
+++ /dev/null
@@ -1,24 +0,0 @@
-from typing import Dict, Iterable, List, Optional, Tuple
-
-from discord import Member, Message
-
-
-async def apply(
- last_message: Message, recent_messages: List[Message], config: Dict[str, int]
-) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
- """Detects total message char count exceeding the limit sent by a single user."""
- relevant_messages = tuple(
- msg
- for msg in recent_messages
- if msg.author == last_message.author
- )
-
- total_recent_chars = sum(len(msg.content) for msg in relevant_messages)
-
- if total_recent_chars > config['max']:
- return (
- f"sent {total_recent_chars} characters in {config['interval']}s",
- (last_message.author,),
- relevant_messages
- )
- return None
diff --git a/bot/rules/discord_emojis.py b/bot/rules/discord_emojis.py
deleted file mode 100644
index d979ac5e7..000000000
--- a/bot/rules/discord_emojis.py
+++ /dev/null
@@ -1,34 +0,0 @@
-import re
-from typing import Dict, Iterable, List, Optional, Tuple
-
-from discord import Member, Message
-from emoji import demojize
-
-DISCORD_EMOJI_RE = re.compile(r"<:\w+:\d+>|:\w+:")
-CODE_BLOCK_RE = re.compile(r"```.*?```", flags=re.DOTALL)
-
-
-async def apply(
- last_message: Message, recent_messages: List[Message], config: Dict[str, int]
-) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
- """Detects total Discord emojis exceeding the limit sent by a single user."""
- relevant_messages = tuple(
- msg
- for msg in recent_messages
- if msg.author == last_message.author
- )
-
- # Get rid of code blocks in the message before searching for emojis.
- # Convert Unicode emojis to :emoji: format to get their count.
- total_emojis = sum(
- len(DISCORD_EMOJI_RE.findall(demojize(CODE_BLOCK_RE.sub("", msg.content))))
- for msg in relevant_messages
- )
-
- if total_emojis > config['max']:
- return (
- f"sent {total_emojis} emojis in {config['interval']}s",
- (last_message.author,),
- relevant_messages
- )
- return None
diff --git a/bot/rules/duplicates.py b/bot/rules/duplicates.py
deleted file mode 100644
index 8e4fbc12d..000000000
--- a/bot/rules/duplicates.py
+++ /dev/null
@@ -1,28 +0,0 @@
-from typing import Dict, Iterable, List, Optional, Tuple
-
-from discord import Member, Message
-
-
-async def apply(
- last_message: Message, recent_messages: List[Message], config: Dict[str, int]
-) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
- """Detects duplicated messages sent by a single user."""
- relevant_messages = tuple(
- msg
- for msg in recent_messages
- if (
- msg.author == last_message.author
- and msg.content == last_message.content
- and msg.content
- )
- )
-
- total_duplicated = len(relevant_messages)
-
- if total_duplicated > config['max']:
- return (
- f"sent {total_duplicated} duplicated messages in {config['interval']}s",
- (last_message.author,),
- relevant_messages
- )
- return None
diff --git a/bot/rules/links.py b/bot/rules/links.py
deleted file mode 100644
index c46b783c5..000000000
--- a/bot/rules/links.py
+++ /dev/null
@@ -1,36 +0,0 @@
-import re
-from typing import Dict, Iterable, List, Optional, Tuple
-
-from discord import Member, Message
-
-LINK_RE = re.compile(r"(https?://[^\s]+)")
-
-
-async def apply(
- last_message: Message, recent_messages: List[Message], config: Dict[str, int]
-) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
- """Detects total links exceeding the limit sent by a single user."""
- relevant_messages = tuple(
- msg
- for msg in recent_messages
- if msg.author == last_message.author
- )
- total_links = 0
- messages_with_links = 0
-
- for msg in relevant_messages:
- total_matches = len(LINK_RE.findall(msg.content))
- if total_matches:
- messages_with_links += 1
- total_links += total_matches
-
- # Only apply the filter if we found more than one message with
- # links to prevent wrongfully firing the rule on users posting
- # e.g. an installation log of pip packages from GitHub.
- if total_links > config['max'] and messages_with_links > 1:
- return (
- f"sent {total_links} links in {config['interval']}s",
- (last_message.author,),
- relevant_messages
- )
- return None
diff --git a/bot/rules/mentions.py b/bot/rules/mentions.py
deleted file mode 100644
index 6f5addad1..000000000
--- a/bot/rules/mentions.py
+++ /dev/null
@@ -1,28 +0,0 @@
-from typing import Dict, Iterable, List, Optional, Tuple
-
-from discord import Member, Message
-
-
-async def apply(
- last_message: Message, recent_messages: List[Message], config: Dict[str, int]
-) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
- """Detects total mentions exceeding the limit sent by a single user."""
- relevant_messages = tuple(
- msg
- for msg in recent_messages
- if msg.author == last_message.author
- )
-
- total_recent_mentions = sum(
- not user.bot
- for msg in relevant_messages
- for user in msg.mentions
- )
-
- if total_recent_mentions > config['max']:
- return (
- f"sent {total_recent_mentions} mentions in {config['interval']}s",
- (last_message.author,),
- relevant_messages
- )
- return None
diff --git a/bot/rules/newlines.py b/bot/rules/newlines.py
deleted file mode 100644
index 4e66e1359..000000000
--- a/bot/rules/newlines.py
+++ /dev/null
@@ -1,45 +0,0 @@
-import re
-from typing import Dict, Iterable, List, Optional, Tuple
-
-from discord import Member, Message
-
-
-async def apply(
- last_message: Message, recent_messages: List[Message], config: Dict[str, int]
-) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
- """Detects total newlines exceeding the set limit sent by a single user."""
- relevant_messages = tuple(
- msg
- for msg in recent_messages
- if msg.author == last_message.author
- )
-
- # Identify groups of newline characters and get group & total counts
- exp = r"(\n+)"
- newline_counts = []
- for msg in relevant_messages:
- newline_counts += [len(group) for group in re.findall(exp, msg.content)]
- total_recent_newlines = sum(newline_counts)
-
- # Get maximum newline group size
- if newline_counts:
- max_newline_group = max(newline_counts)
- else:
- # If no newlines are found, newline_counts will be an empty list, which will error out max()
- max_newline_group = 0
-
- # Check first for total newlines, if this passes then check for large groupings
- if total_recent_newlines > config['max']:
- return (
- f"sent {total_recent_newlines} newlines in {config['interval']}s",
- (last_message.author,),
- relevant_messages
- )
- elif max_newline_group > config['max_consecutive']:
- return (
- f"sent {max_newline_group} consecutive newlines in {config['interval']}s",
- (last_message.author,),
- relevant_messages
- )
-
- return None
diff --git a/bot/rules/role_mentions.py b/bot/rules/role_mentions.py
deleted file mode 100644
index 0649540b6..000000000
--- a/bot/rules/role_mentions.py
+++ /dev/null
@@ -1,24 +0,0 @@
-from typing import Dict, Iterable, List, Optional, Tuple
-
-from discord import Member, Message
-
-
-async def apply(
- last_message: Message, recent_messages: List[Message], config: Dict[str, int]
-) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
- """Detects total role mentions exceeding the limit sent by a single user."""
- relevant_messages = tuple(
- msg
- for msg in recent_messages
- if msg.author == last_message.author
- )
-
- total_recent_mentions = sum(len(msg.role_mentions) for msg in relevant_messages)
-
- if total_recent_mentions > config['max']:
- return (
- f"sent {total_recent_mentions} role mentions in {config['interval']}s",
- (last_message.author,),
- relevant_messages
- )
- return None
diff --git a/tests/bot/exts/filters/__init__.py b/tests/bot/exts/filters/__init__.py
deleted file mode 100644
index e69de29bb..000000000
--- a/tests/bot/exts/filters/__init__.py
+++ /dev/null
diff --git a/tests/bot/exts/filters/test_antimalware.py b/tests/bot/exts/filters/test_antimalware.py
deleted file mode 100644
index 7282334e2..000000000
--- a/tests/bot/exts/filters/test_antimalware.py
+++ /dev/null
@@ -1,202 +0,0 @@
-import unittest
-from unittest.mock import AsyncMock, Mock
-
-from discord import NotFound
-
-from bot.constants import Channels, STAFF_ROLES
-from bot.exts.filters import antimalware
-from tests.helpers import MockAttachment, MockBot, MockMessage, MockRole
-
-
-class AntiMalwareCogTests(unittest.IsolatedAsyncioTestCase):
- """Test the AntiMalware cog."""
-
- def setUp(self):
- """Sets up fresh objects for each test."""
- self.bot = MockBot()
- self.bot.filter_list_cache = {
- "FILE_FORMAT.True": {
- ".first": {},
- ".second": {},
- ".third": {},
- }
- }
- self.cog = antimalware.AntiMalware(self.bot)
- self.message = MockMessage()
- self.message.webhook_id = None
- self.message.author.bot = None
- self.whitelist = [".first", ".second", ".third"]
-
- async def test_message_with_allowed_attachment(self):
- """Messages with allowed extensions should not be deleted"""
- attachment = MockAttachment(filename="python.first")
- self.message.attachments = [attachment]
-
- await self.cog.on_message(self.message)
- self.message.delete.assert_not_called()
-
- async def test_message_without_attachment(self):
- """Messages without attachments should result in no action."""
- await self.cog.on_message(self.message)
- self.message.delete.assert_not_called()
-
- async def test_direct_message_with_attachment(self):
- """Direct messages should have no action taken."""
- attachment = MockAttachment(filename="python.disallowed")
- self.message.attachments = [attachment]
- self.message.guild = None
-
- await self.cog.on_message(self.message)
-
- self.message.delete.assert_not_called()
-
- async def test_webhook_message_with_illegal_extension(self):
- """A webhook message containing an illegal extension should be ignored."""
- attachment = MockAttachment(filename="python.disallowed")
- self.message.webhook_id = 697140105563078727
- self.message.attachments = [attachment]
-
- await self.cog.on_message(self.message)
-
- self.message.delete.assert_not_called()
-
- async def test_bot_message_with_illegal_extension(self):
- """A bot message containing an illegal extension should be ignored."""
- attachment = MockAttachment(filename="python.disallowed")
- self.message.author.bot = 409107086526644234
- self.message.attachments = [attachment]
-
- await self.cog.on_message(self.message)
-
- self.message.delete.assert_not_called()
-
- async def test_message_with_illegal_extension_gets_deleted(self):
- """A message containing an illegal extension should send an embed."""
- attachment = MockAttachment(filename="python.disallowed")
- self.message.attachments = [attachment]
-
- await self.cog.on_message(self.message)
-
- self.message.delete.assert_called_once()
-
- async def test_message_send_by_staff(self):
- """A message send by a member of staff should be ignored."""
- staff_role = MockRole(id=STAFF_ROLES[0])
- self.message.author.roles.append(staff_role)
- attachment = MockAttachment(filename="python.disallowed")
- self.message.attachments = [attachment]
-
- await self.cog.on_message(self.message)
-
- self.message.delete.assert_not_called()
-
- async def test_python_file_redirect_embed_description(self):
- """A message containing a .py file should result in an embed redirecting the user to our paste site"""
- attachment = MockAttachment(filename="python.py")
- self.message.attachments = [attachment]
- self.message.channel.send = AsyncMock()
-
- await self.cog.on_message(self.message)
- self.message.channel.send.assert_called_once()
- args, kwargs = self.message.channel.send.call_args
- embed = kwargs.pop("embed")
-
- self.assertEqual(embed.description, antimalware.PY_EMBED_DESCRIPTION)
-
- async def test_txt_file_redirect_embed_description(self):
- """A message containing a .txt/.json/.csv file should result in the correct embed."""
- test_values = (
- ("text", ".txt"),
- ("json", ".json"),
- ("csv", ".csv"),
- )
-
- for file_name, disallowed_extension in test_values:
- with self.subTest(file_name=file_name, disallowed_extension=disallowed_extension):
-
- attachment = MockAttachment(filename=f"{file_name}{disallowed_extension}")
- self.message.attachments = [attachment]
- self.message.channel.send = AsyncMock()
- antimalware.TXT_EMBED_DESCRIPTION = Mock()
- antimalware.TXT_EMBED_DESCRIPTION.format.return_value = "test"
-
- await self.cog.on_message(self.message)
- self.message.channel.send.assert_called_once()
- args, kwargs = self.message.channel.send.call_args
- embed = kwargs.pop("embed")
- cmd_channel = self.bot.get_channel(Channels.bot_commands)
-
- self.assertEqual(
- embed.description,
- antimalware.TXT_EMBED_DESCRIPTION.format.return_value
- )
- antimalware.TXT_EMBED_DESCRIPTION.format.assert_called_with(
- blocked_extension=disallowed_extension,
- cmd_channel_mention=cmd_channel.mention
- )
-
- async def test_other_disallowed_extension_embed_description(self):
- """Test the description for a non .py/.txt/.json/.csv disallowed extension."""
- attachment = MockAttachment(filename="python.disallowed")
- self.message.attachments = [attachment]
- self.message.channel.send = AsyncMock()
- antimalware.DISALLOWED_EMBED_DESCRIPTION = Mock()
- antimalware.DISALLOWED_EMBED_DESCRIPTION.format.return_value = "test"
-
- await self.cog.on_message(self.message)
- self.message.channel.send.assert_called_once()
- args, kwargs = self.message.channel.send.call_args
- embed = kwargs.pop("embed")
- meta_channel = self.bot.get_channel(Channels.meta)
-
- self.assertEqual(embed.description, antimalware.DISALLOWED_EMBED_DESCRIPTION.format.return_value)
- antimalware.DISALLOWED_EMBED_DESCRIPTION.format.assert_called_with(
- joined_whitelist=", ".join(self.whitelist),
- blocked_extensions_str=".disallowed",
- meta_channel_mention=meta_channel.mention
- )
-
- async def test_removing_deleted_message_logs(self):
- """Removing an already deleted message logs the correct message"""
- attachment = MockAttachment(filename="python.disallowed")
- self.message.attachments = [attachment]
- self.message.delete = AsyncMock(side_effect=NotFound(response=Mock(status=""), message=""))
-
- with self.assertLogs(logger=antimalware.log, level="INFO"):
- await self.cog.on_message(self.message)
- self.message.delete.assert_called_once()
-
- async def test_message_with_illegal_attachment_logs(self):
- """Deleting a message with an illegal attachment should result in a log."""
- attachment = MockAttachment(filename="python.disallowed")
- self.message.attachments = [attachment]
-
- with self.assertLogs(logger=antimalware.log, level="INFO"):
- await self.cog.on_message(self.message)
-
- async def test_get_disallowed_extensions(self):
- """The return value should include all non-whitelisted extensions."""
- test_values = (
- ([], []),
- (self.whitelist, []),
- ([".first"], []),
- ([".first", ".disallowed"], [".disallowed"]),
- ([".disallowed"], [".disallowed"]),
- ([".disallowed", ".illegal"], [".disallowed", ".illegal"]),
- )
-
- for extensions, expected_disallowed_extensions in test_values:
- with self.subTest(extensions=extensions, expected_disallowed_extensions=expected_disallowed_extensions):
- self.message.attachments = [MockAttachment(filename=f"filename{extension}") for extension in extensions]
- disallowed_extensions = self.cog._get_disallowed_extensions(self.message)
- self.assertCountEqual(disallowed_extensions, expected_disallowed_extensions)
-
-
-class AntiMalwareSetupTests(unittest.IsolatedAsyncioTestCase):
- """Tests setup of the `AntiMalware` cog."""
-
- async def test_setup(self):
- """Setup of the extension should call add_cog."""
- bot = MockBot()
- await antimalware.setup(bot)
- bot.add_cog.assert_awaited_once()
diff --git a/tests/bot/exts/filters/test_antispam.py b/tests/bot/exts/filters/test_antispam.py
deleted file mode 100644
index 6a0e4fded..000000000
--- a/tests/bot/exts/filters/test_antispam.py
+++ /dev/null
@@ -1,35 +0,0 @@
-import unittest
-
-from bot.exts.filters import antispam
-
-
-class AntispamConfigurationValidationTests(unittest.TestCase):
- """Tests validation of the antispam cog configuration."""
-
- def test_default_antispam_config_is_valid(self):
- """The default antispam configuration is valid."""
- validation_errors = antispam.validate_config()
- self.assertEqual(validation_errors, {})
-
- def test_unknown_rule_returns_error(self):
- """Configuring an unknown rule returns an error."""
- self.assertEqual(
- antispam.validate_config({'invalid-rule': {}}),
- {'invalid-rule': "`invalid-rule` is not recognized as an antispam rule."}
- )
-
- def test_missing_keys_returns_error(self):
- """Not configuring required keys returns an error."""
- keys = (('interval', 'max'), ('max', 'interval'))
- for configured_key, unconfigured_key in keys:
- with self.subTest(
- configured_key=configured_key,
- unconfigured_key=unconfigured_key
- ):
- config = {'burst': {configured_key: 10}}
- error = f"Key `{unconfigured_key}` is required but not set for rule `burst`"
-
- self.assertEqual(
- antispam.validate_config(config),
- {'burst': error}
- )
diff --git a/tests/bot/exts/filters/test_filtering.py b/tests/bot/exts/filters/test_filtering.py
deleted file mode 100644
index bd26532f1..000000000
--- a/tests/bot/exts/filters/test_filtering.py
+++ /dev/null
@@ -1,40 +0,0 @@
-import unittest
-from unittest.mock import patch
-
-from bot.exts.filters import filtering
-from tests.helpers import MockBot, autospec
-
-
-class FilteringCogTests(unittest.IsolatedAsyncioTestCase):
- """Tests the `Filtering` cog."""
-
- def setUp(self):
- """Instantiate the bot and cog."""
- self.bot = MockBot()
- with patch("botcore.utils.scheduling.create_task", new=lambda task, **_: task.close()):
- self.cog = filtering.Filtering(self.bot)
-
- @autospec(filtering.Filtering, "_get_filterlist_items", pass_mocks=False, return_value=["TOKEN"])
- async def test_token_filter(self):
- """Ensure that a filter token is correctly detected in a message."""
- messages = {
- "": False,
- "no matches": False,
- "TOKEN": True,
-
- # See advisory https://github.com/python-discord/bot/security/advisories/GHSA-j8c3-8x46-8pp6
- "https://google.com TOKEN": True,
- "https://google.com something else": False,
- }
-
- for message, match in messages.items():
- with self.subTest(input=message, match=match):
- result, _ = await self.cog._has_watch_regex_match(message)
-
- self.assertEqual(
- match,
- bool(result),
- msg=f"Hit was {'expected' if match else 'not expected'} for this input."
- )
- if result:
- self.assertEqual("TOKEN", result.group())
diff --git a/tests/bot/exts/filters/test_security.py b/tests/bot/exts/filters/test_security.py
deleted file mode 100644
index 007b7b1eb..000000000
--- a/tests/bot/exts/filters/test_security.py
+++ /dev/null
@@ -1,53 +0,0 @@
-import unittest
-
-from discord.ext.commands import NoPrivateMessage
-
-from bot.exts.filters import security
-from tests.helpers import MockBot, MockContext
-
-
-class SecurityCogTests(unittest.TestCase):
- """Tests the `Security` cog."""
-
- def setUp(self):
- """Attach an instance of the cog to the class for tests."""
- self.bot = MockBot()
- self.cog = security.Security(self.bot)
- self.ctx = MockContext()
-
- def test_check_additions(self):
- """The cog should add its checks after initialization."""
- self.bot.check.assert_any_call(self.cog.check_on_guild)
- self.bot.check.assert_any_call(self.cog.check_not_bot)
-
- def test_check_not_bot_returns_false_for_humans(self):
- """The bot check should return `True` when invoked with human authors."""
- self.ctx.author.bot = False
- self.assertTrue(self.cog.check_not_bot(self.ctx))
-
- def test_check_not_bot_returns_true_for_robots(self):
- """The bot check should return `False` when invoked with robotic authors."""
- self.ctx.author.bot = True
- self.assertFalse(self.cog.check_not_bot(self.ctx))
-
- def test_check_on_guild_raises_when_outside_of_guild(self):
- """When invoked outside of a guild, `check_on_guild` should cause an error."""
- self.ctx.guild = None
-
- with self.assertRaises(NoPrivateMessage, msg="This command cannot be used in private messages."):
- self.cog.check_on_guild(self.ctx)
-
- def test_check_on_guild_returns_true_inside_of_guild(self):
- """When invoked inside of a guild, `check_on_guild` should return `True`."""
- self.ctx.guild = "lemon's lemonade stand"
- self.assertTrue(self.cog.check_on_guild(self.ctx))
-
-
-class SecurityCogLoadTests(unittest.IsolatedAsyncioTestCase):
- """Tests loading the `Security` cog."""
-
- async def test_security_cog_load(self):
- """Setup of the extension should call add_cog."""
- bot = MockBot()
- await security.setup(bot)
- bot.add_cog.assert_awaited_once()
diff --git a/tests/bot/exts/filters/test_token_remover.py b/tests/bot/exts/filters/test_token_remover.py
deleted file mode 100644
index c1f3762ac..000000000
--- a/tests/bot/exts/filters/test_token_remover.py
+++ /dev/null
@@ -1,409 +0,0 @@
-import unittest
-from re import Match
-from unittest import mock
-from unittest.mock import MagicMock
-
-from discord import Colour, NotFound
-
-from bot import constants
-from bot.exts.filters import token_remover
-from bot.exts.filters.token_remover import Token, TokenRemover
-from bot.exts.moderation.modlog import ModLog
-from bot.utils.messages import format_user
-from tests.helpers import MockBot, MockMessage, autospec
-
-
-class TokenRemoverTests(unittest.IsolatedAsyncioTestCase):
- """Tests the `TokenRemover` cog."""
-
- def setUp(self):
- """Adds the cog, a bot, and a message to the instance for usage in tests."""
- self.bot = MockBot()
- self.cog = TokenRemover(bot=self.bot)
-
- self.msg = MockMessage(id=555, content="hello world")
- self.msg.channel.mention = "#lemonade-stand"
- self.msg.guild.get_member.return_value.bot = False
- self.msg.guild.get_member.return_value.__str__.return_value = "Woody"
- self.msg.author.__str__ = MagicMock(return_value=self.msg.author.name)
- self.msg.author.display_avatar.url = "picture-lemon.png"
-
- def test_extract_user_id_valid(self):
- """Should consider user IDs valid if they decode into an integer ID."""
- id_pairs = (
- ("NDcyMjY1OTQzMDYyNDEzMzMy", 472265943062413332),
- ("NDc1MDczNjI5Mzk5NTQ3OTA0", 475073629399547904),
- ("NDY3MjIzMjMwNjUwNzc3NjQx", 467223230650777641),
- )
-
- for token_id, user_id in id_pairs:
- with self.subTest(token_id=token_id):
- result = TokenRemover.extract_user_id(token_id)
- self.assertEqual(result, user_id)
-
- def test_extract_user_id_invalid(self):
- """Should consider non-digit and non-ASCII IDs invalid."""
- ids = (
- ("SGVsbG8gd29ybGQ", "non-digit ASCII"),
- ("0J_RgNC40LLQtdGCINC80LjRgA", "cyrillic text"),
- ("4pO14p6L4p6C4pG34p264pGl8J-EiOKSj-KCieKBsA", "Unicode digits"),
- ("4oaA4oaB4oWh4oWi4Lyz4Lyq4Lyr4LG9", "Unicode numerals"),
- ("8J2fjvCdn5nwnZ-k8J2fr_Cdn7rgravvvJngr6c", "Unicode decimals"),
- ("{hello}[world]&(bye!)", "ASCII invalid Base64"),
- ("Þíß-ï§-ňøẗ-våłìÐ", "Unicode invalid Base64"),
- )
-
- for user_id, msg in ids:
- with self.subTest(msg=msg):
- result = TokenRemover.extract_user_id(user_id)
- self.assertIsNone(result)
-
- def test_is_valid_timestamp_valid(self):
- """Should consider timestamps valid if they're greater than the Discord epoch."""
- timestamps = (
- "XsyRkw",
- "Xrim9Q",
- "XsyR-w",
- "XsySD_",
- "Dn9r_A",
- )
-
- for timestamp in timestamps:
- with self.subTest(timestamp=timestamp):
- result = TokenRemover.is_valid_timestamp(timestamp)
- self.assertTrue(result)
-
- def test_is_valid_timestamp_invalid(self):
- """Should consider timestamps invalid if they're before Discord epoch or can't be parsed."""
- timestamps = (
- ("B4Yffw", "DISCORD_EPOCH - TOKEN_EPOCH - 1"),
- ("ew", "123"),
- ("AoIKgA", "42076800"),
- ("{hello}[world]&(bye!)", "ASCII invalid Base64"),
- ("Þíß-ï§-ňøẗ-våłìÐ", "Unicode invalid Base64"),
- )
-
- for timestamp, msg in timestamps:
- with self.subTest(msg=msg):
- result = TokenRemover.is_valid_timestamp(timestamp)
- self.assertFalse(result)
-
- def test_is_valid_hmac_valid(self):
- """Should consider an HMAC valid if it has at least 3 unique characters."""
- valid_hmacs = (
- "VXmErH7j511turNpfURmb0rVNm8",
- "Ysnu2wacjaKs7qnoo46S8Dm2us8",
- "sJf6omBPORBPju3WJEIAcwW9Zds",
- "s45jqDV_Iisn-symw0yDRrk_jf4",
- )
-
- for hmac in valid_hmacs:
- with self.subTest(msg=hmac):
- result = TokenRemover.is_maybe_valid_hmac(hmac)
- self.assertTrue(result)
-
- def test_is_invalid_hmac_invalid(self):
- """Should consider an HMAC invalid if has fewer than 3 unique characters."""
- invalid_hmacs = (
- ("xxxxxxxxxxxxxxxxxx", "Single character"),
- ("XxXxXxXxXxXxXxXxXx", "Single character alternating case"),
- ("ASFasfASFasfASFASsf", "Three characters alternating-case"),
- ("asdasdasdasdasdasdasd", "Three characters one case"),
- )
-
- for hmac, msg in invalid_hmacs:
- with self.subTest(msg=msg):
- result = TokenRemover.is_maybe_valid_hmac(hmac)
- self.assertFalse(result)
-
- def test_mod_log_property(self):
- """The `mod_log` property should ask the bot to return the `ModLog` cog."""
- self.bot.get_cog.return_value = 'lemon'
- self.assertEqual(self.cog.mod_log, self.bot.get_cog.return_value)
- self.bot.get_cog.assert_called_once_with('ModLog')
-
- async def test_on_message_edit_uses_on_message(self):
- """The edit listener should delegate handling of the message to the normal listener."""
- self.cog.on_message = mock.create_autospec(self.cog.on_message, spec_set=True)
-
- await self.cog.on_message_edit(MockMessage(), self.msg)
- self.cog.on_message.assert_awaited_once_with(self.msg)
-
- @autospec(TokenRemover, "find_token_in_message", "take_action")
- async def test_on_message_takes_action(self, find_token_in_message, take_action):
- """Should take action if a valid token is found when a message is sent."""
- cog = TokenRemover(self.bot)
- found_token = "foobar"
- find_token_in_message.return_value = found_token
-
- await cog.on_message(self.msg)
-
- find_token_in_message.assert_called_once_with(self.msg)
- take_action.assert_awaited_once_with(cog, self.msg, found_token)
-
- @autospec(TokenRemover, "find_token_in_message", "take_action")
- async def test_on_message_skips_missing_token(self, find_token_in_message, take_action):
- """Shouldn't take action if a valid token isn't found when a message is sent."""
- cog = TokenRemover(self.bot)
- find_token_in_message.return_value = False
-
- await cog.on_message(self.msg)
-
- find_token_in_message.assert_called_once_with(self.msg)
- take_action.assert_not_awaited()
-
- @autospec(TokenRemover, "find_token_in_message")
- async def test_on_message_ignores_dms_bots(self, find_token_in_message):
- """Shouldn't parse a message if it is a DM or authored by a bot."""
- cog = TokenRemover(self.bot)
- dm_msg = MockMessage(guild=None)
- bot_msg = MockMessage(author=MagicMock(bot=True))
-
- for msg in (dm_msg, bot_msg):
- await cog.on_message(msg)
- find_token_in_message.assert_not_called()
-
- @autospec("bot.exts.filters.token_remover", "TOKEN_RE")
- def test_find_token_no_matches(self, token_re):
- """None should be returned if the regex matches no tokens in a message."""
- token_re.finditer.return_value = ()
-
- return_value = TokenRemover.find_token_in_message(self.msg)
-
- self.assertIsNone(return_value)
- token_re.finditer.assert_called_once_with(self.msg.content)
-
- @autospec(TokenRemover, "extract_user_id", "is_valid_timestamp", "is_maybe_valid_hmac")
- @autospec("bot.exts.filters.token_remover", "Token")
- @autospec("bot.exts.filters.token_remover", "TOKEN_RE")
- def test_find_token_valid_match(
- self,
- token_re,
- token_cls,
- extract_user_id,
- is_valid_timestamp,
- is_maybe_valid_hmac,
- ):
- """The first match with a valid user ID, timestamp, and HMAC should be returned as a `Token`."""
- matches = [
- mock.create_autospec(Match, spec_set=True, instance=True),
- mock.create_autospec(Match, spec_set=True, instance=True),
- ]
- tokens = [
- mock.create_autospec(Token, spec_set=True, instance=True),
- mock.create_autospec(Token, spec_set=True, instance=True),
- ]
-
- token_re.finditer.return_value = matches
- token_cls.side_effect = tokens
- extract_user_id.side_effect = (None, True) # The 1st match will be invalid, 2nd one valid.
- is_valid_timestamp.return_value = True
- is_maybe_valid_hmac.return_value = True
-
- return_value = TokenRemover.find_token_in_message(self.msg)
-
- self.assertEqual(tokens[1], return_value)
- token_re.finditer.assert_called_once_with(self.msg.content)
-
- @autospec(TokenRemover, "extract_user_id", "is_valid_timestamp", "is_maybe_valid_hmac")
- @autospec("bot.exts.filters.token_remover", "Token")
- @autospec("bot.exts.filters.token_remover", "TOKEN_RE")
- def test_find_token_invalid_matches(
- self,
- token_re,
- token_cls,
- extract_user_id,
- is_valid_timestamp,
- is_maybe_valid_hmac,
- ):
- """None should be returned if no matches have valid user IDs, HMACs, and timestamps."""
- token_re.finditer.return_value = [mock.create_autospec(Match, spec_set=True, instance=True)]
- token_cls.return_value = mock.create_autospec(Token, spec_set=True, instance=True)
- extract_user_id.return_value = None
- is_valid_timestamp.return_value = False
- is_maybe_valid_hmac.return_value = False
-
- return_value = TokenRemover.find_token_in_message(self.msg)
-
- self.assertIsNone(return_value)
- token_re.finditer.assert_called_once_with(self.msg.content)
-
- def test_regex_invalid_tokens(self):
- """Messages without anything looking like a token are not matched."""
- tokens = (
- "",
- "lemon wins",
- "..",
- "x.y",
- "x.y.",
- ".y.z",
- ".y.",
- "..z",
- "x..z",
- " . . ",
- "\n.\n.\n",
- "hellö.world.bye",
- "base64.nötbåse64.morebase64",
- "19jd3J.dfkm3d.€víł§tüff",
- )
-
- for token in tokens:
- with self.subTest(token=token):
- results = token_remover.TOKEN_RE.findall(token)
- self.assertEqual(len(results), 0)
-
- def test_regex_valid_tokens(self):
- """Messages that look like tokens should be matched."""
- # Don't worry, these tokens have been invalidated.
- tokens = (
- "NDcyMjY1OTQzMDYy_DEzMz-y.XsyRkw.VXmErH7j511turNpfURmb0rVNm8",
- "NDcyMjY1OTQzMDYyNDEzMzMy.Xrim9Q.Ysnu2wacjaKs7qnoo46S8Dm2us8",
- "NDc1MDczNjI5Mzk5NTQ3OTA0.XsyR-w.sJf6omBPORBPju3WJEIAcwW9Zds",
- "NDY3MjIzMjMwNjUwNzc3NjQx.XsySD_.s45jqDV_Iisn-symw0yDRrk_jf4",
- )
-
- for token in tokens:
- with self.subTest(token=token):
- results = token_remover.TOKEN_RE.fullmatch(token)
- self.assertIsNotNone(results, f"{token} was not matched by the regex")
-
- def test_regex_matches_multiple_valid(self):
- """Should support multiple matches in the middle of a string."""
- token_1 = "NDY3MjIzMjMwNjUwNzc3NjQx.XsyWGg.uFNEQPCc4ePwGh7egG8UicQssz8"
- token_2 = "NDcyMjY1OTQzMDYyNDEzMzMy.XsyWMw.l8XPnDqb0lp-EiQ2g_0xVFT1pyc"
- message = f"garbage {token_1} hello {token_2} world"
-
- results = token_remover.TOKEN_RE.finditer(message)
- results = [match[0] for match in results]
- self.assertCountEqual((token_1, token_2), results)
-
- @autospec("bot.exts.filters.token_remover", "LOG_MESSAGE")
- def test_format_log_message(self, log_message):
- """Should correctly format the log message with info from the message and token."""
- token = Token("NDcyMjY1OTQzMDYyNDEzMzMy", "XsySD_", "s45jqDV_Iisn-symw0yDRrk_jf4")
- log_message.format.return_value = "Howdy"
-
- return_value = TokenRemover.format_log_message(self.msg, token)
-
- self.assertEqual(return_value, log_message.format.return_value)
- log_message.format.assert_called_once_with(
- author=format_user(self.msg.author),
- channel=self.msg.channel.mention,
- user_id=token.user_id,
- timestamp=token.timestamp,
- hmac="xxxxxxxxxxxxxxxxxxxxxxxxjf4",
- )
-
- @autospec("bot.exts.filters.token_remover", "UNKNOWN_USER_LOG_MESSAGE")
- async def test_format_userid_log_message_unknown(self, unknown_user_log_message,):
- """Should correctly format the user ID portion when the actual user it belongs to is unknown."""
- token = Token("NDcyMjY1OTQzMDYyNDEzMzMy", "XsySD_", "s45jqDV_Iisn-symw0yDRrk_jf4")
- unknown_user_log_message.format.return_value = " Partner"
- msg = MockMessage(id=555, content="hello world")
- msg.guild.get_member.return_value = None
- msg.guild.fetch_member.side_effect = NotFound(mock.Mock(status=404), "Not found")
-
- return_value = await TokenRemover.format_userid_log_message(msg, token)
-
- self.assertEqual(return_value, (unknown_user_log_message.format.return_value, False))
- unknown_user_log_message.format.assert_called_once_with(user_id=472265943062413332)
-
- @autospec("bot.exts.filters.token_remover", "KNOWN_USER_LOG_MESSAGE")
- async def test_format_userid_log_message_bot(self, known_user_log_message):
- """Should correctly format the user ID portion when the ID belongs to a known bot."""
- token = Token("NDcyMjY1OTQzMDYyNDEzMzMy", "XsySD_", "s45jqDV_Iisn-symw0yDRrk_jf4")
- known_user_log_message.format.return_value = " Partner"
- msg = MockMessage(id=555, content="hello world")
- msg.guild.get_member.return_value.__str__.return_value = "Sam"
- msg.guild.get_member.return_value.bot = True
-
- return_value = await TokenRemover.format_userid_log_message(msg, token)
-
- self.assertEqual(return_value, (known_user_log_message.format.return_value, True))
-
- known_user_log_message.format.assert_called_once_with(
- user_id=472265943062413332,
- user_name="Sam",
- kind="BOT",
- )
-
- @autospec("bot.exts.filters.token_remover", "KNOWN_USER_LOG_MESSAGE")
- async def test_format_log_message_user_token_user(self, user_token_message):
- """Should correctly format the user ID portion when the ID belongs to a known user."""
- token = Token("NDY3MjIzMjMwNjUwNzc3NjQx", "XsySD_", "s45jqDV_Iisn-symw0yDRrk_jf4")
- user_token_message.format.return_value = "Partner"
-
- return_value = await TokenRemover.format_userid_log_message(self.msg, token)
-
- self.assertEqual(return_value, (user_token_message.format.return_value, True))
- user_token_message.format.assert_called_once_with(
- user_id=467223230650777641,
- user_name="Woody",
- kind="USER",
- )
-
- @mock.patch.object(TokenRemover, "mod_log", new_callable=mock.PropertyMock)
- @autospec("bot.exts.filters.token_remover", "log")
- @autospec(TokenRemover, "format_log_message", "format_userid_log_message")
- async def test_take_action(self, format_log_message, format_userid_log_message, logger, mod_log_property):
- """Should delete the message and send a mod log."""
- cog = TokenRemover(self.bot)
- mod_log = mock.create_autospec(ModLog, spec_set=True, instance=True)
- token = mock.create_autospec(Token, spec_set=True, instance=True)
- token.user_id = "no-id"
- log_msg = "testing123"
- userid_log_message = "userid-log-message"
-
- mod_log_property.return_value = mod_log
- format_log_message.return_value = log_msg
- format_userid_log_message.return_value = (userid_log_message, True)
-
- await cog.take_action(self.msg, token)
-
- self.msg.delete.assert_called_once_with()
- self.msg.channel.send.assert_called_once_with(
- token_remover.DELETION_MESSAGE_TEMPLATE.format(mention=self.msg.author.mention)
- )
-
- format_log_message.assert_called_once_with(self.msg, token)
- format_userid_log_message.assert_called_once_with(self.msg, token)
- logger.debug.assert_called_with(log_msg)
- self.bot.stats.incr.assert_called_once_with("tokens.removed_tokens")
-
- mod_log.ignore.assert_called_once_with(constants.Event.message_delete, self.msg.id)
- mod_log.send_log_message.assert_called_once_with(
- icon_url=constants.Icons.token_removed,
- colour=Colour(constants.Colours.soft_red),
- title="Token removed!",
- text=log_msg + "\n" + userid_log_message,
- thumbnail=self.msg.author.display_avatar.url,
- channel_id=constants.Channels.mod_alerts,
- ping_everyone=True,
- )
-
- @mock.patch.object(TokenRemover, "mod_log", new_callable=mock.PropertyMock)
- async def test_take_action_delete_failure(self, mod_log_property):
- """Shouldn't send any messages if the token message can't be deleted."""
- cog = TokenRemover(self.bot)
- mod_log_property.return_value = mock.create_autospec(ModLog, spec_set=True, instance=True)
- self.msg.delete.side_effect = NotFound(MagicMock(), MagicMock())
-
- token = mock.create_autospec(Token, spec_set=True, instance=True)
- await cog.take_action(self.msg, token)
-
- self.msg.delete.assert_called_once_with()
- self.msg.channel.send.assert_not_awaited()
-
-
-class TokenRemoverExtensionTests(unittest.IsolatedAsyncioTestCase):
- """Tests for the token_remover extension."""
-
- @autospec("bot.exts.filters.token_remover", "TokenRemover")
- async def test_extension_setup(self, cog):
- """The TokenRemover cog should be added."""
- bot = MockBot()
- await token_remover.setup(bot)
-
- cog.assert_called_once_with(bot)
- bot.add_cog.assert_awaited_once()
- self.assertTrue(isinstance(bot.add_cog.call_args.args[0], TokenRemover))
diff --git a/tests/bot/rules/__init__.py b/tests/bot/rules/__init__.py
deleted file mode 100644
index 0d570f5a3..000000000
--- a/tests/bot/rules/__init__.py
+++ /dev/null
@@ -1,76 +0,0 @@
-import unittest
-from abc import ABCMeta, abstractmethod
-from typing import Callable, Dict, Iterable, List, NamedTuple, Tuple
-
-from tests.helpers import MockMessage
-
-
-class DisallowedCase(NamedTuple):
- """Encapsulation for test cases expected to fail."""
- recent_messages: List[MockMessage]
- culprits: Iterable[str]
- n_violations: int
-
-
-class RuleTest(unittest.IsolatedAsyncioTestCase, metaclass=ABCMeta):
- """
- Abstract class for antispam rule test cases.
-
- Tests for specific rules should inherit from `RuleTest` and implement
- `relevant_messages` and `get_report`. Each instance should also set the
- `apply` and `config` attributes as necessary.
-
- The execution of test cases can then be delegated to the `run_allowed`
- and `run_disallowed` methods.
- """
-
- apply: Callable # The tested rule's apply function
- config: Dict[str, int]
-
- async def run_allowed(self, cases: Tuple[List[MockMessage], ...]) -> None:
- """Run all `cases` against `self.apply` expecting them to pass."""
- for recent_messages in cases:
- last_message = recent_messages[0]
-
- with self.subTest(
- last_message=last_message,
- recent_messages=recent_messages,
- config=self.config,
- ):
- self.assertIsNone(
- await self.apply(last_message, recent_messages, self.config)
- )
-
- async def run_disallowed(self, cases: Tuple[DisallowedCase, ...]) -> None:
- """Run all `cases` against `self.apply` expecting them to fail."""
- for case in cases:
- recent_messages, culprits, n_violations = case
- last_message = recent_messages[0]
- relevant_messages = self.relevant_messages(case)
- desired_output = (
- self.get_report(case),
- culprits,
- relevant_messages,
- )
-
- with self.subTest(
- last_message=last_message,
- recent_messages=recent_messages,
- relevant_messages=relevant_messages,
- n_violations=n_violations,
- config=self.config,
- ):
- self.assertTupleEqual(
- await self.apply(last_message, recent_messages, self.config),
- desired_output,
- )
-
- @abstractmethod
- def relevant_messages(self, case: DisallowedCase) -> Iterable[MockMessage]:
- """Give expected relevant messages for `case`."""
- raise NotImplementedError # pragma: no cover
-
- @abstractmethod
- def get_report(self, case: DisallowedCase) -> str:
- """Give expected error report for `case`."""
- raise NotImplementedError # pragma: no cover
diff --git a/tests/bot/rules/test_attachments.py b/tests/bot/rules/test_attachments.py
deleted file mode 100644
index d7e779221..000000000
--- a/tests/bot/rules/test_attachments.py
+++ /dev/null
@@ -1,69 +0,0 @@
-from typing import Iterable
-
-from bot.rules import attachments
-from tests.bot.rules import DisallowedCase, RuleTest
-from tests.helpers import MockMessage
-
-
-def make_msg(author: str, total_attachments: int) -> MockMessage:
- """Builds a message with `total_attachments` attachments."""
- return MockMessage(author=author, attachments=list(range(total_attachments)))
-
-
-class AttachmentRuleTests(RuleTest):
- """Tests applying the `attachments` antispam rule."""
-
- def setUp(self):
- self.apply = attachments.apply
- self.config = {"max": 5, "interval": 10}
-
- async def test_allows_messages_without_too_many_attachments(self):
- """Messages without too many attachments are allowed as-is."""
- cases = (
- [make_msg("bob", 0), make_msg("bob", 0), make_msg("bob", 0)],
- [make_msg("bob", 2), make_msg("bob", 2)],
- [make_msg("bob", 2), make_msg("alice", 2), make_msg("bob", 2)],
- )
-
- await self.run_allowed(cases)
-
- async def test_disallows_messages_with_too_many_attachments(self):
- """Messages with too many attachments trigger the rule."""
- cases = (
- DisallowedCase(
- [make_msg("bob", 4), make_msg("bob", 0), make_msg("bob", 6)],
- ("bob",),
- 10,
- ),
- DisallowedCase(
- [make_msg("bob", 4), make_msg("alice", 6), make_msg("bob", 2)],
- ("bob",),
- 6,
- ),
- DisallowedCase(
- [make_msg("alice", 6)],
- ("alice",),
- 6,
- ),
- DisallowedCase(
- [make_msg("alice", 1) for _ in range(6)],
- ("alice",),
- 6,
- ),
- )
-
- await self.run_disallowed(cases)
-
- def relevant_messages(self, case: DisallowedCase) -> Iterable[MockMessage]:
- last_message = case.recent_messages[0]
- return tuple(
- msg
- for msg in case.recent_messages
- if (
- msg.author == last_message.author
- and len(msg.attachments) > 0
- )
- )
-
- def get_report(self, case: DisallowedCase) -> str:
- return f"sent {case.n_violations} attachments in {self.config['interval']}s"
diff --git a/tests/bot/rules/test_burst.py b/tests/bot/rules/test_burst.py
deleted file mode 100644
index 03682966b..000000000
--- a/tests/bot/rules/test_burst.py
+++ /dev/null
@@ -1,54 +0,0 @@
-from typing import Iterable
-
-from bot.rules import burst
-from tests.bot.rules import DisallowedCase, RuleTest
-from tests.helpers import MockMessage
-
-
-def make_msg(author: str) -> MockMessage:
- """
- Init a MockMessage instance with author set to `author`.
-
- This serves as a shorthand / alias to keep the test cases visually clean.
- """
- return MockMessage(author=author)
-
-
-class BurstRuleTests(RuleTest):
- """Tests the `burst` antispam rule."""
-
- def setUp(self):
- self.apply = burst.apply
- self.config = {"max": 2, "interval": 10}
-
- async def test_allows_messages_within_limit(self):
- """Cases which do not violate the rule."""
- cases = (
- [make_msg("bob"), make_msg("bob")],
- [make_msg("bob"), make_msg("alice"), make_msg("bob")],
- )
-
- await self.run_allowed(cases)
-
- async def test_disallows_messages_beyond_limit(self):
- """Cases where the amount of messages exceeds the limit, triggering the rule."""
- cases = (
- DisallowedCase(
- [make_msg("bob"), make_msg("bob"), make_msg("bob")],
- ("bob",),
- 3,
- ),
- DisallowedCase(
- [make_msg("bob"), make_msg("bob"), make_msg("alice"), make_msg("bob")],
- ("bob",),
- 3,
- ),
- )
-
- await self.run_disallowed(cases)
-
- def relevant_messages(self, case: DisallowedCase) -> Iterable[MockMessage]:
- return tuple(msg for msg in case.recent_messages if msg.author in case.culprits)
-
- def get_report(self, case: DisallowedCase) -> str:
- return f"sent {case.n_violations} messages in {self.config['interval']}s"
diff --git a/tests/bot/rules/test_burst_shared.py b/tests/bot/rules/test_burst_shared.py
deleted file mode 100644
index 3275143d5..000000000
--- a/tests/bot/rules/test_burst_shared.py
+++ /dev/null
@@ -1,57 +0,0 @@
-from typing import Iterable
-
-from bot.rules import burst_shared
-from tests.bot.rules import DisallowedCase, RuleTest
-from tests.helpers import MockMessage
-
-
-def make_msg(author: str) -> MockMessage:
- """
- Init a MockMessage instance with the passed arg.
-
- This serves as a shorthand / alias to keep the test cases visually clean.
- """
- return MockMessage(author=author)
-
-
-class BurstSharedRuleTests(RuleTest):
- """Tests the `burst_shared` antispam rule."""
-
- def setUp(self):
- self.apply = burst_shared.apply
- self.config = {"max": 2, "interval": 10}
-
- async def test_allows_messages_within_limit(self):
- """
- Cases that do not violate the rule.
-
- There really isn't more to test here than a single case.
- """
- cases = (
- [make_msg("spongebob"), make_msg("patrick")],
- )
-
- await self.run_allowed(cases)
-
- async def test_disallows_messages_beyond_limit(self):
- """Cases where the amount of messages exceeds the limit, triggering the rule."""
- cases = (
- DisallowedCase(
- [make_msg("bob"), make_msg("bob"), make_msg("bob")],
- {"bob"},
- 3,
- ),
- DisallowedCase(
- [make_msg("bob"), make_msg("bob"), make_msg("alice"), make_msg("bob")],
- {"bob", "alice"},
- 4,
- ),
- )
-
- await self.run_disallowed(cases)
-
- def relevant_messages(self, case: DisallowedCase) -> Iterable[MockMessage]:
- return case.recent_messages
-
- def get_report(self, case: DisallowedCase) -> str:
- return f"sent {case.n_violations} messages in {self.config['interval']}s"
diff --git a/tests/bot/rules/test_chars.py b/tests/bot/rules/test_chars.py
deleted file mode 100644
index f1e3c76a7..000000000
--- a/tests/bot/rules/test_chars.py
+++ /dev/null
@@ -1,64 +0,0 @@
-from typing import Iterable
-
-from bot.rules import chars
-from tests.bot.rules import DisallowedCase, RuleTest
-from tests.helpers import MockMessage
-
-
-def make_msg(author: str, n_chars: int) -> MockMessage:
- """Build a message with arbitrary content of `n_chars` length."""
- return MockMessage(author=author, content="A" * n_chars)
-
-
-class CharsRuleTests(RuleTest):
- """Tests the `chars` antispam rule."""
-
- def setUp(self):
- self.apply = chars.apply
- self.config = {
- "max": 20, # Max allowed sum of chars per user
- "interval": 10,
- }
-
- async def test_allows_messages_within_limit(self):
- """Cases with a total amount of chars within limit."""
- cases = (
- [make_msg("bob", 0)],
- [make_msg("bob", 20)],
- [make_msg("bob", 15), make_msg("alice", 15)],
- )
-
- await self.run_allowed(cases)
-
- async def test_disallows_messages_beyond_limit(self):
- """Cases where the total amount of chars exceeds the limit, triggering the rule."""
- cases = (
- DisallowedCase(
- [make_msg("bob", 21)],
- ("bob",),
- 21,
- ),
- DisallowedCase(
- [make_msg("bob", 15), make_msg("bob", 15)],
- ("bob",),
- 30,
- ),
- DisallowedCase(
- [make_msg("alice", 15), make_msg("bob", 20), make_msg("alice", 15)],
- ("alice",),
- 30,
- ),
- )
-
- await self.run_disallowed(cases)
-
- def relevant_messages(self, case: DisallowedCase) -> Iterable[MockMessage]:
- last_message = case.recent_messages[0]
- return tuple(
- msg
- for msg in case.recent_messages
- if msg.author == last_message.author
- )
-
- def get_report(self, case: DisallowedCase) -> str:
- return f"sent {case.n_violations} characters in {self.config['interval']}s"
diff --git a/tests/bot/rules/test_discord_emojis.py b/tests/bot/rules/test_discord_emojis.py
deleted file mode 100644
index 66c2d9f92..000000000
--- a/tests/bot/rules/test_discord_emojis.py
+++ /dev/null
@@ -1,73 +0,0 @@
-from typing import Iterable
-
-from bot.rules import discord_emojis
-from tests.bot.rules import DisallowedCase, RuleTest
-from tests.helpers import MockMessage
-
-discord_emoji = "<:abcd:1234>" # Discord emojis follow the format <:name:id>
-unicode_emoji = "🧪"
-
-
-def make_msg(author: str, n_emojis: int, emoji: str = discord_emoji) -> MockMessage:
- """Build a MockMessage instance with content containing `n_emojis` arbitrary emojis."""
- return MockMessage(author=author, content=emoji * n_emojis)
-
-
-class DiscordEmojisRuleTests(RuleTest):
- """Tests for the `discord_emojis` antispam rule."""
-
- def setUp(self):
- self.apply = discord_emojis.apply
- self.config = {"max": 2, "interval": 10}
-
- async def test_allows_messages_within_limit(self):
- """Cases with a total amount of discord and unicode emojis within limit."""
- cases = (
- [make_msg("bob", 2)],
- [make_msg("alice", 1), make_msg("bob", 2), make_msg("alice", 1)],
- [make_msg("bob", 2, unicode_emoji)],
- [
- make_msg("alice", 1, unicode_emoji),
- make_msg("bob", 2, unicode_emoji),
- make_msg("alice", 1, unicode_emoji)
- ],
- )
-
- await self.run_allowed(cases)
-
- async def test_disallows_messages_beyond_limit(self):
- """Cases with more than the allowed amount of discord and unicode emojis."""
- cases = (
- DisallowedCase(
- [make_msg("bob", 3)],
- ("bob",),
- 3,
- ),
- DisallowedCase(
- [make_msg("alice", 2), make_msg("bob", 2), make_msg("alice", 2)],
- ("alice",),
- 4,
- ),
- DisallowedCase(
- [make_msg("bob", 3, unicode_emoji)],
- ("bob",),
- 3,
- ),
- DisallowedCase(
- [
- make_msg("alice", 2, unicode_emoji),
- make_msg("bob", 2, unicode_emoji),
- make_msg("alice", 2, unicode_emoji)
- ],
- ("alice",),
- 4
- )
- )
-
- await self.run_disallowed(cases)
-
- def relevant_messages(self, case: DisallowedCase) -> Iterable[MockMessage]:
- return tuple(msg for msg in case.recent_messages if msg.author in case.culprits)
-
- def get_report(self, case: DisallowedCase) -> str:
- return f"sent {case.n_violations} emojis in {self.config['interval']}s"
diff --git a/tests/bot/rules/test_duplicates.py b/tests/bot/rules/test_duplicates.py
deleted file mode 100644
index 9bd886a77..000000000
--- a/tests/bot/rules/test_duplicates.py
+++ /dev/null
@@ -1,64 +0,0 @@
-from typing import Iterable
-
-from bot.rules import duplicates
-from tests.bot.rules import DisallowedCase, RuleTest
-from tests.helpers import MockMessage
-
-
-def make_msg(author: str, content: str) -> MockMessage:
- """Give a MockMessage instance with `author` and `content` attrs."""
- return MockMessage(author=author, content=content)
-
-
-class DuplicatesRuleTests(RuleTest):
- """Tests the `duplicates` antispam rule."""
-
- def setUp(self):
- self.apply = duplicates.apply
- self.config = {"max": 2, "interval": 10}
-
- async def test_allows_messages_within_limit(self):
- """Cases which do not violate the rule."""
- cases = (
- [make_msg("alice", "A"), make_msg("alice", "A")],
- [make_msg("alice", "A"), make_msg("alice", "B"), make_msg("alice", "C")], # Non-duplicate
- [make_msg("alice", "A"), make_msg("bob", "A"), make_msg("alice", "A")], # Different author
- )
-
- await self.run_allowed(cases)
-
- async def test_disallows_messages_beyond_limit(self):
- """Cases with too many duplicate messages from the same author."""
- cases = (
- DisallowedCase(
- [make_msg("alice", "A"), make_msg("alice", "A"), make_msg("alice", "A")],
- ("alice",),
- 3,
- ),
- DisallowedCase(
- [make_msg("bob", "A"), make_msg("alice", "A"), make_msg("bob", "A"), make_msg("bob", "A")],
- ("bob",),
- 3, # 4 duplicate messages, but only 3 from bob
- ),
- DisallowedCase(
- [make_msg("bob", "A"), make_msg("bob", "B"), make_msg("bob", "A"), make_msg("bob", "A")],
- ("bob",),
- 3, # 4 message from bob, but only 3 duplicates
- ),
- )
-
- await self.run_disallowed(cases)
-
- def relevant_messages(self, case: DisallowedCase) -> Iterable[MockMessage]:
- last_message = case.recent_messages[0]
- return tuple(
- msg
- for msg in case.recent_messages
- if (
- msg.author == last_message.author
- and msg.content == last_message.content
- )
- )
-
- def get_report(self, case: DisallowedCase) -> str:
- return f"sent {case.n_violations} duplicated messages in {self.config['interval']}s"
diff --git a/tests/bot/rules/test_links.py b/tests/bot/rules/test_links.py
deleted file mode 100644
index b091bd9d7..000000000
--- a/tests/bot/rules/test_links.py
+++ /dev/null
@@ -1,67 +0,0 @@
-from typing import Iterable
-
-from bot.rules import links
-from tests.bot.rules import DisallowedCase, RuleTest
-from tests.helpers import MockMessage
-
-
-def make_msg(author: str, total_links: int) -> MockMessage:
- """Makes a message with `total_links` links."""
- content = " ".join(["https://pydis.com"] * total_links)
- return MockMessage(author=author, content=content)
-
-
-class LinksTests(RuleTest):
- """Tests applying the `links` rule."""
-
- def setUp(self):
- self.apply = links.apply
- self.config = {
- "max": 2,
- "interval": 10
- }
-
- async def test_links_within_limit(self):
- """Messages with an allowed amount of links."""
- cases = (
- [make_msg("bob", 0)],
- [make_msg("bob", 2)],
- [make_msg("bob", 3)], # Filter only applies if len(messages_with_links) > 1
- [make_msg("bob", 1), make_msg("bob", 1)],
- [make_msg("bob", 2), make_msg("alice", 2)] # Only messages from latest author count
- )
-
- await self.run_allowed(cases)
-
- async def test_links_exceeding_limit(self):
- """Messages with a a higher than allowed amount of links."""
- cases = (
- DisallowedCase(
- [make_msg("bob", 1), make_msg("bob", 2)],
- ("bob",),
- 3
- ),
- DisallowedCase(
- [make_msg("alice", 1), make_msg("alice", 1), make_msg("alice", 1)],
- ("alice",),
- 3
- ),
- DisallowedCase(
- [make_msg("alice", 2), make_msg("bob", 3), make_msg("alice", 1)],
- ("alice",),
- 3
- )
- )
-
- await self.run_disallowed(cases)
-
- def relevant_messages(self, case: DisallowedCase) -> Iterable[MockMessage]:
- last_message = case.recent_messages[0]
- return tuple(
- msg
- for msg in case.recent_messages
- if msg.author == last_message.author
- )
-
- def get_report(self, case: DisallowedCase) -> str:
- return f"sent {case.n_violations} links in {self.config['interval']}s"
diff --git a/tests/bot/rules/test_mentions.py b/tests/bot/rules/test_mentions.py
deleted file mode 100644
index f8805ac48..000000000
--- a/tests/bot/rules/test_mentions.py
+++ /dev/null
@@ -1,83 +0,0 @@
-from typing import Iterable
-
-from bot.rules import mentions
-from tests.bot.rules import DisallowedCase, RuleTest
-from tests.helpers import MockMember, MockMessage
-
-
-def make_msg(author: str, total_user_mentions: int, total_bot_mentions: int = 0) -> MockMessage:
- """Makes a message with `total_mentions` mentions."""
- user_mentions = [MockMember() for _ in range(total_user_mentions)]
- bot_mentions = [MockMember(bot=True) for _ in range(total_bot_mentions)]
- return MockMessage(author=author, mentions=user_mentions+bot_mentions)
-
-
-class TestMentions(RuleTest):
- """Tests applying the `mentions` antispam rule."""
-
- def setUp(self):
- self.apply = mentions.apply
- self.config = {
- "max": 2,
- "interval": 10,
- }
-
- async def test_mentions_within_limit(self):
- """Messages with an allowed amount of mentions."""
- cases = (
- [make_msg("bob", 0)],
- [make_msg("bob", 2)],
- [make_msg("bob", 1), make_msg("bob", 1)],
- [make_msg("bob", 1), make_msg("alice", 2)],
- )
-
- await self.run_allowed(cases)
-
- async def test_mentions_exceeding_limit(self):
- """Messages with a higher than allowed amount of mentions."""
- cases = (
- DisallowedCase(
- [make_msg("bob", 3)],
- ("bob",),
- 3,
- ),
- DisallowedCase(
- [make_msg("alice", 2), make_msg("alice", 0), make_msg("alice", 1)],
- ("alice",),
- 3,
- ),
- DisallowedCase(
- [make_msg("bob", 2), make_msg("alice", 3), make_msg("bob", 2)],
- ("bob",),
- 4,
- ),
- DisallowedCase(
- [make_msg("bob", 3, 1)],
- ("bob",),
- 3,
- ),
- )
-
- await self.run_disallowed(cases)
-
- async def test_ignore_bot_mentions(self):
- """Messages with an allowed amount of mentions, also containing bot mentions."""
- cases = (
- [make_msg("bob", 0, 3)],
- [make_msg("bob", 2, 1)],
- [make_msg("bob", 1, 2), make_msg("bob", 1, 2)],
- [make_msg("bob", 1, 5), make_msg("alice", 2, 5)]
- )
-
- await self.run_allowed(cases)
-
- def relevant_messages(self, case: DisallowedCase) -> Iterable[MockMessage]:
- last_message = case.recent_messages[0]
- return tuple(
- msg
- for msg in case.recent_messages
- if msg.author == last_message.author
- )
-
- def get_report(self, case: DisallowedCase) -> str:
- return f"sent {case.n_violations} mentions in {self.config['interval']}s"
diff --git a/tests/bot/rules/test_newlines.py b/tests/bot/rules/test_newlines.py
deleted file mode 100644
index e35377773..000000000
--- a/tests/bot/rules/test_newlines.py
+++ /dev/null
@@ -1,102 +0,0 @@
-from typing import Iterable, List
-
-from bot.rules import newlines
-from tests.bot.rules import DisallowedCase, RuleTest
-from tests.helpers import MockMessage
-
-
-def make_msg(author: str, newline_groups: List[int]) -> MockMessage:
- """Init a MockMessage instance with `author` and content configured by `newline_groups".
-
- Configure content by passing a list of ints, where each int `n` will generate
- a separate group of `n` newlines.
-
- Example:
- newline_groups=[3, 1, 2] -> content="\n\n\n \n \n\n"
- """
- content = " ".join("\n" * n for n in newline_groups)
- return MockMessage(author=author, content=content)
-
-
-class TotalNewlinesRuleTests(RuleTest):
- """Tests the `newlines` antispam rule against allowed cases and total newline count violations."""
-
- def setUp(self):
- self.apply = newlines.apply
- self.config = {
- "max": 5, # Max sum of newlines in relevant messages
- "max_consecutive": 3, # Max newlines in one group, in one message
- "interval": 10,
- }
-
- async def test_allows_messages_within_limit(self):
- """Cases which do not violate the rule."""
- cases = (
- [make_msg("alice", [])], # Single message with no newlines
- [make_msg("alice", [1, 2]), make_msg("alice", [1, 1])], # 5 newlines in 2 messages
- [make_msg("alice", [2, 2, 1]), make_msg("bob", [2, 3])], # 5 newlines from each author
- [make_msg("bob", [1]), make_msg("alice", [5])], # Alice breaks the rule, but only bob is relevant
- )
-
- await self.run_allowed(cases)
-
- async def test_disallows_messages_total(self):
- """Cases which violate the rule by having too many newlines in total."""
- cases = (
- DisallowedCase( # Alice sends a total of 6 newlines (disallowed)
- [make_msg("alice", [2, 2]), make_msg("alice", [2])],
- ("alice",),
- 6,
- ),
- DisallowedCase( # Here we test that only alice's newlines count in the sum
- [make_msg("alice", [2, 2]), make_msg("bob", [3]), make_msg("alice", [3])],
- ("alice",),
- 7,
- ),
- )
-
- await self.run_disallowed(cases)
-
- def relevant_messages(self, case: DisallowedCase) -> Iterable[MockMessage]:
- last_author = case.recent_messages[0].author
- return tuple(msg for msg in case.recent_messages if msg.author == last_author)
-
- def get_report(self, case: DisallowedCase) -> str:
- return f"sent {case.n_violations} newlines in {self.config['interval']}s"
-
-
-class GroupNewlinesRuleTests(RuleTest):
- """
- Tests the `newlines` antispam rule against max consecutive newline violations.
-
- As these violations yield a different error report, they require a different
- `get_report` implementation.
- """
-
- def setUp(self):
- self.apply = newlines.apply
- self.config = {"max": 5, "max_consecutive": 3, "interval": 10}
-
- async def test_disallows_messages_consecutive(self):
- """Cases which violate the rule due to having too many consecutive newlines."""
- cases = (
- DisallowedCase( # Bob sends a group of newlines too large
- [make_msg("bob", [4])],
- ("bob",),
- 4,
- ),
- DisallowedCase( # Alice sends 5 in total (allowed), but 4 in one group (disallowed)
- [make_msg("alice", [1]), make_msg("alice", [4])],
- ("alice",),
- 4,
- ),
- )
-
- await self.run_disallowed(cases)
-
- def relevant_messages(self, case: DisallowedCase) -> Iterable[MockMessage]:
- last_author = case.recent_messages[0].author
- return tuple(msg for msg in case.recent_messages if msg.author == last_author)
-
- def get_report(self, case: DisallowedCase) -> str:
- return f"sent {case.n_violations} consecutive newlines in {self.config['interval']}s"
diff --git a/tests/bot/rules/test_role_mentions.py b/tests/bot/rules/test_role_mentions.py
deleted file mode 100644
index 26c05d527..000000000
--- a/tests/bot/rules/test_role_mentions.py
+++ /dev/null
@@ -1,55 +0,0 @@
-from typing import Iterable
-
-from bot.rules import role_mentions
-from tests.bot.rules import DisallowedCase, RuleTest
-from tests.helpers import MockMessage
-
-
-def make_msg(author: str, n_mentions: int) -> MockMessage:
- """Build a MockMessage instance with `n_mentions` role mentions."""
- return MockMessage(author=author, role_mentions=[None] * n_mentions)
-
-
-class RoleMentionsRuleTests(RuleTest):
- """Tests for the `role_mentions` antispam rule."""
-
- def setUp(self):
- self.apply = role_mentions.apply
- self.config = {"max": 2, "interval": 10}
-
- async def test_allows_messages_within_limit(self):
- """Cases with a total amount of role mentions within limit."""
- cases = (
- [make_msg("bob", 2)],
- [make_msg("bob", 1), make_msg("alice", 1), make_msg("bob", 1)],
- )
-
- await self.run_allowed(cases)
-
- async def test_disallows_messages_beyond_limit(self):
- """Cases with more than the allowed amount of role mentions."""
- cases = (
- DisallowedCase(
- [make_msg("bob", 3)],
- ("bob",),
- 3,
- ),
- DisallowedCase(
- [make_msg("alice", 2), make_msg("bob", 2), make_msg("alice", 2)],
- ("alice",),
- 4,
- ),
- )
-
- await self.run_disallowed(cases)
-
- def relevant_messages(self, case: DisallowedCase) -> Iterable[MockMessage]:
- last_message = case.recent_messages[0]
- return tuple(
- msg
- for msg in case.recent_messages
- if msg.author == last_message.author
- )
-
- def get_report(self, case: DisallowedCase) -> str:
- return f"sent {case.n_violations} role mentions in {self.config['interval']}s"