diff options
author | 2021-12-01 22:15:31 +0200 | |
---|---|---|
committer | 2022-07-15 14:52:42 +0300 | |
commit | d2226cc067aebd61113c584ccf833d55cf227a2d (patch) | |
tree | d28875e15d74e1bd9f1b23cc6a94d20578ce5d13 | |
parent | Merge pull request #2215 from python-discord/revival-of-code-role (diff) |
Tear down the old filtering system
Tests and dependent functionality in other extensions will be re-added later on.
40 files changed, 1 insertions, 3702 deletions
diff --git a/bot/bot.py b/bot/bot.py index aff07cd32..e40c3f8c1 100644 --- a/bot/bot.py +++ b/bot/bot.py @@ -27,8 +27,6 @@ class Bot(BotBase): super().__init__(*args, **kwargs) - self.filter_list_cache = defaultdict(dict) - async def ping_services(self) -> None: """A helper to make sure all the services the bot relies on are available on startup.""" # Connect Site/API @@ -45,33 +43,10 @@ class Bot(BotBase): raise await asyncio.sleep(constants.URLs.connect_cooldown) - def insert_item_into_filter_list_cache(self, item: dict[str, str]) -> None: - """Add an item to the bots filter_list_cache.""" - type_ = item["type"] - allowed = item["allowed"] - content = item["content"] - - self.filter_list_cache[f"{type_}.{allowed}"][content] = { - "id": item["id"], - "comment": item["comment"], - "created_at": item["created_at"], - "updated_at": item["updated_at"], - } - - async def cache_filter_list_data(self) -> None: - """Cache all the data in the FilterList on the site.""" - full_cache = await self.api_client.get('bot/filter-lists') - - for item in full_cache: - self.insert_item_into_filter_list_cache(item) - async def setup_hook(self) -> None: """Default async initialisation method for discord.py.""" await super().setup_hook() - # Build the FilterList cache - await self.cache_filter_list_data() - # This is not awaited to avoid a deadlock with any cogs that have # wait_until_guild_available in their cog_load method. scheduling.create_task(self.load_extensions(exts)) diff --git a/bot/converters.py b/bot/converters.py index 5800ea044..23bef0dcc 100644 --- a/bot/converters.py +++ b/bot/converters.py @@ -68,54 +68,6 @@ class ValidDiscordServerInvite(Converter): raise BadArgument("This does not appear to be a valid Discord server invite.") -class ValidFilterListType(Converter): - """ - A converter that checks whether the given string is a valid FilterList type. - - Raises `BadArgument` if the argument is not a valid FilterList type, and simply - passes through the given argument otherwise. - """ - - @staticmethod - async def get_valid_types(bot: Bot) -> list: - """ - Try to get a list of valid filter list types. - - Raise a BadArgument if the API can't respond. - """ - try: - valid_types = await bot.api_client.get('bot/filter-lists/get-types') - except ResponseCodeError: - raise BadArgument("Cannot validate list_type: Unable to fetch valid types from API.") - - return [enum for enum, classname in valid_types] - - async def convert(self, ctx: Context, list_type: str) -> str: - """Checks whether the given string is a valid FilterList type.""" - valid_types = await self.get_valid_types(ctx.bot) - list_type = list_type.upper() - - if list_type not in valid_types: - - # Maybe the user is using the plural form of this type, - # e.g. "guild_invites" instead of "guild_invite". - # - # This code will support the simple plural form (a single 's' at the end), - # which works for all current list types, but if a list type is added in the future - # which has an irregular plural form (like 'ies'), this code will need to be - # refactored to support this. - if list_type.endswith("S") and list_type[:-1] in valid_types: - list_type = list_type[:-1] - - else: - valid_types_list = '\n'.join([f"• {type_.lower()}" for type_ in valid_types]) - raise BadArgument( - f"You have provided an invalid list type!\n\n" - f"Please provide one of the following: \n{valid_types_list}" - ) - return list_type - - class Extension(Converter): """ Fully qualify the name of an extension and ensure it exists. diff --git a/bot/exts/filters/__init__.py b/bot/exts/filters/__init__.py deleted file mode 100644 index e69de29bb..000000000 --- a/bot/exts/filters/__init__.py +++ /dev/null diff --git a/bot/exts/filters/antimalware.py b/bot/exts/filters/antimalware.py deleted file mode 100644 index ff39700a6..000000000 --- a/bot/exts/filters/antimalware.py +++ /dev/null @@ -1,106 +0,0 @@ -import typing as t -from os.path import splitext - -from discord import Embed, Message, NotFound -from discord.ext.commands import Cog - -from bot.bot import Bot -from bot.constants import Channels, Filter, URLs -from bot.exts.events.code_jams._channels import CATEGORY_NAME as JAM_CATEGORY_NAME -from bot.log import get_logger - -log = get_logger(__name__) - -PY_EMBED_DESCRIPTION = ( - "It looks like you tried to attach a Python file - " - f"please use a code-pasting service such as {URLs.site_schema}{URLs.site_paste}" -) - -TXT_LIKE_FILES = {".txt", ".csv", ".json"} -TXT_EMBED_DESCRIPTION = ( - "You either uploaded a `{blocked_extension}` file or entered a message that was too long. " - f"Please use our [paste bin]({URLs.site_schema}{URLs.site_paste}) instead." -) - -DISALLOWED_EMBED_DESCRIPTION = ( - "It looks like you tried to attach file type(s) that we do not allow ({blocked_extensions_str}). " - "We currently allow the following file types: **{joined_whitelist}**.\n\n" - "Feel free to ask in {meta_channel_mention} if you think this is a mistake." -) - - -class AntiMalware(Cog): - """Delete messages which contain attachments with non-whitelisted file extensions.""" - - def __init__(self, bot: Bot): - self.bot = bot - - def _get_whitelisted_file_formats(self) -> list: - """Get the file formats currently on the whitelist.""" - return self.bot.filter_list_cache['FILE_FORMAT.True'].keys() - - def _get_disallowed_extensions(self, message: Message) -> t.Iterable[str]: - """Get an iterable containing all the disallowed extensions of attachments.""" - file_extensions = {splitext(attachment.filename.lower())[1] for attachment in message.attachments} - extensions_blocked = file_extensions - set(self._get_whitelisted_file_formats()) - return extensions_blocked - - @Cog.listener() - async def on_message(self, message: Message) -> None: - """Identify messages with prohibited attachments.""" - # Return when message don't have attachment and don't moderate DMs - if not message.attachments or not message.guild: - return - - # Ignore webhook and bot messages - if message.webhook_id or message.author.bot: - return - - # Ignore code jam channels - if getattr(message.channel, "category", None) and message.channel.category.name == JAM_CATEGORY_NAME: - return - - # Check if user is staff, if is, return - # Since we only care that roles exist to iterate over, check for the attr rather than a User/Member instance - if hasattr(message.author, "roles") and any(role.id in Filter.role_whitelist for role in message.author.roles): - return - - embed = Embed() - extensions_blocked = self._get_disallowed_extensions(message) - blocked_extensions_str = ', '.join(extensions_blocked) - if ".py" in extensions_blocked: - # Short-circuit on *.py files to provide a pastebin link - embed.description = PY_EMBED_DESCRIPTION - elif extensions := TXT_LIKE_FILES.intersection(extensions_blocked): - # Work around Discord AutoConversion of messages longer than 2000 chars to .txt - cmd_channel = self.bot.get_channel(Channels.bot_commands) - embed.description = TXT_EMBED_DESCRIPTION.format( - blocked_extension=extensions.pop(), - cmd_channel_mention=cmd_channel.mention - ) - elif extensions_blocked: - meta_channel = self.bot.get_channel(Channels.meta) - embed.description = DISALLOWED_EMBED_DESCRIPTION.format( - joined_whitelist=', '.join(self._get_whitelisted_file_formats()), - blocked_extensions_str=blocked_extensions_str, - meta_channel_mention=meta_channel.mention, - ) - - if embed.description: - log.info( - f"User '{message.author}' ({message.author.id}) uploaded blacklisted file(s): {blocked_extensions_str}", - extra={"attachment_list": [attachment.filename for attachment in message.attachments]} - ) - - await message.channel.send(f"Hey {message.author.mention}!", embed=embed) - - # Delete the offending message: - try: - await message.delete() - except NotFound: - log.info(f"Tried to delete message `{message.id}`, but message could not be found.") - - -async def setup(bot: Bot) -> None: - """Load the AntiMalware cog.""" - await bot.add_cog(AntiMalware(bot)) diff --git a/bot/exts/filters/antispam.py b/bot/exts/filters/antispam.py deleted file mode 100644 index 3b925bacd..000000000 --- a/bot/exts/filters/antispam.py +++ /dev/null @@ -1,324 +0,0 @@ -import asyncio -from collections import defaultdict -from collections.abc import Mapping -from dataclasses import dataclass, field -from datetime import timedelta -from itertools import takewhile -from operator import attrgetter, itemgetter -from typing import Dict, Iterable, List, Set - -import arrow -from botcore.utils import scheduling -from discord import Colour, Member, Message, MessageType, NotFound, Object, TextChannel -from discord.ext.commands import Cog - -from bot import rules -from bot.bot import Bot -from bot.constants import ( - AntiSpam as AntiSpamConfig, Channels, Colours, DEBUG_MODE, Event, Filter, Guild as GuildConfig, Icons -) -from bot.converters import Duration -from bot.exts.events.code_jams._channels import CATEGORY_NAME as JAM_CATEGORY_NAME -from bot.exts.moderation.modlog import ModLog -from bot.log import get_logger -from bot.utils import lock -from bot.utils.message_cache import MessageCache -from bot.utils.messages import format_user, send_attachments - -log = get_logger(__name__) - -RULE_FUNCTION_MAPPING = { - 'attachments': rules.apply_attachments, - 'burst': rules.apply_burst, - # burst shared is temporarily disabled due to a bug - # 'burst_shared': rules.apply_burst_shared, - 'chars': rules.apply_chars, - 'discord_emojis': rules.apply_discord_emojis, - 'duplicates': rules.apply_duplicates, - 'links': rules.apply_links, - 'mentions': rules.apply_mentions, - 'newlines': rules.apply_newlines, - 'role_mentions': rules.apply_role_mentions, -} - - -@dataclass -class DeletionContext: - """Represents a Deletion Context for a single spam event.""" - - members: frozenset[Member] - triggered_in: TextChannel - channels: set[TextChannel] = field(default_factory=set) - rules: Set[str] = field(default_factory=set) - messages: Dict[int, Message] = field(default_factory=dict) - attachments: List[List[str]] = field(default_factory=list) - - async def add(self, rule_name: str, channels: Iterable[TextChannel], messages: Iterable[Message]) -> None: - """Adds new rule violation events to the deletion context.""" - self.rules.add(rule_name) - - self.channels.update(channels) - - for message in messages: - if message.id not in self.messages: - self.messages[message.id] = message - - # Re-upload attachments - destination = message.guild.get_channel(Channels.attachment_log) - urls = await send_attachments(message, destination, link_large=False) - self.attachments.append(urls) - - async def upload_messages(self, actor_id: int, modlog: ModLog) -> None: - """Method that takes care of uploading the queue and posting modlog alert.""" - triggered_by_users = ", ".join(format_user(m) for m in self.members) - triggered_in_channel = f"**Triggered in:** {self.triggered_in.mention}\n" if len(self.channels) > 1 else "" - channels_description = ", ".join(channel.mention for channel in self.channels) - - mod_alert_message = ( - f"**Triggered by:** {triggered_by_users}\n" - f"{triggered_in_channel}" - f"**Channels:** {channels_description}\n" - f"**Rules:** {', '.join(rule for rule in self.rules)}\n" - ) - - messages_as_list = list(self.messages.values()) - first_message = messages_as_list[0] - # For multiple messages and those with attachments or excessive newlines, use the logs API - if any(( - len(messages_as_list) > 1, - len(first_message.attachments) > 0, - first_message.content.count('\n') > 15 - )): - url = await modlog.upload_log(self.messages.values(), actor_id, self.attachments) - mod_alert_message += f"A complete log of the offending messages can be found [here]({url})" - else: - mod_alert_message += "Message:\n" - content = first_message.clean_content - remaining_chars = 4080 - len(mod_alert_message) - - if len(content) > remaining_chars: - url = await modlog.upload_log([first_message], actor_id, self.attachments) - log_site_msg = f"The full message can be found [here]({url})" - content = content[:remaining_chars - (3 + len(log_site_msg))] + "..." - - mod_alert_message += content - - await modlog.send_log_message( - content=", ".join(str(m.id) for m in self.members), # quality-of-life improvement for mobile moderators - icon_url=Icons.filtering, - colour=Colour(Colours.soft_red), - title="Spam detected!", - text=mod_alert_message, - thumbnail=first_message.author.display_avatar.url, - channel_id=Channels.mod_alerts, - ping_everyone=AntiSpamConfig.ping_everyone - ) - - -class AntiSpam(Cog): - """Cog that controls our anti-spam measures.""" - - def __init__(self, bot: Bot, validation_errors: Dict[str, str]) -> None: - self.bot = bot - self.validation_errors = validation_errors - role_id = AntiSpamConfig.punishment['role_id'] - self.muted_role = Object(role_id) - self.expiration_date_converter = Duration() - - self.message_deletion_queue = dict() - - # Fetch the rule configuration with the highest rule interval. - max_interval_config = max( - AntiSpamConfig.rules.values(), - key=itemgetter('interval') - ) - self.max_interval = max_interval_config['interval'] - self.cache = MessageCache(AntiSpamConfig.cache_size, newest_first=True) - - @property - def mod_log(self) -> ModLog: - """Allows for easy access of the ModLog cog.""" - return self.bot.get_cog("ModLog") - - async def cog_load(self) -> None: - """Unloads the cog and alerts admins if configuration validation failed.""" - await self.bot.wait_until_guild_available() - if self.validation_errors: - body = "**The following errors were encountered:**\n" - body += "\n".join(f"- {error}" for error in self.validation_errors.values()) - body += "\n\n**The cog has been unloaded.**" - - await self.mod_log.send_log_message( - title="Error: AntiSpam configuration validation failed!", - text=body, - ping_everyone=True, - icon_url=Icons.token_removed, - colour=Colour.red() - ) - - self.bot.remove_cog(self.__class__.__name__) - return - - @Cog.listener() - async def on_message(self, message: Message) -> None: - """Applies the antispam rules to each received message.""" - if ( - not message.guild - or message.guild.id != GuildConfig.id - or message.author.bot - or (getattr(message.channel, "category", None) and message.channel.category.name == JAM_CATEGORY_NAME) - or (message.channel.id in Filter.channel_whitelist and not DEBUG_MODE) - or (any(role.id in Filter.role_whitelist for role in message.author.roles) and not DEBUG_MODE) - or message.type == MessageType.auto_moderation_action - ): - return - - self.cache.append(message) - - earliest_relevant_at = arrow.utcnow() - timedelta(seconds=self.max_interval) - relevant_messages = list(takewhile(lambda msg: msg.created_at > earliest_relevant_at, self.cache)) - - for rule_name in AntiSpamConfig.rules: - rule_config = AntiSpamConfig.rules[rule_name] - rule_function = RULE_FUNCTION_MAPPING[rule_name] - - # Create a list of messages that were sent in the interval that the rule cares about. - latest_interesting_stamp = arrow.utcnow() - timedelta(seconds=rule_config['interval']) - messages_for_rule = list( - takewhile(lambda msg: msg.created_at > latest_interesting_stamp, relevant_messages) - ) - - result = await rule_function(message, messages_for_rule, rule_config) - - # If the rule returns `None`, that means the message didn't violate it. - # If it doesn't, it returns a tuple in the form `(str, Iterable[discord.Member])` - # which contains the reason for why the message violated the rule and - # an iterable of all members that violated the rule. - if result is not None: - self.bot.stats.incr(f"mod_alerts.{rule_name}") - reason, members, relevant_messages = result - full_reason = f"`{rule_name}` rule: {reason}" - - # If there's no spam event going on for this channel, start a new Message Deletion Context - authors_set = frozenset(members) - if authors_set not in self.message_deletion_queue: - log.trace(f"Creating queue for members `{authors_set}`") - self.message_deletion_queue[authors_set] = DeletionContext(authors_set, message.channel) - scheduling.create_task( - self._process_deletion_context(authors_set), - name=f"AntiSpam._process_deletion_context({authors_set})" - ) - - # Add the relevant of this trigger to the Deletion Context - await self.message_deletion_queue[authors_set].add( - rule_name=rule_name, - channels=set(message.channel for message in relevant_messages), - messages=relevant_messages - ) - - for member in members: - scheduling.create_task( - self.punish(message, member, full_reason), - name=f"AntiSpam.punish(message={message.id}, member={member.id}, rule={rule_name})" - ) - - await self.maybe_delete_messages(relevant_messages) - break - - @lock.lock_arg("antispam.punish", "member", attrgetter("id")) - async def punish(self, msg: Message, member: Member, reason: str) -> None: - """Punishes the given member for triggering an antispam rule.""" - if not any(role.id == self.muted_role.id for role in member.roles): - remove_role_after = AntiSpamConfig.punishment['remove_after'] - - # Get context and make sure the bot becomes the actor of infraction by patching the `author` attributes - context = await self.bot.get_context(msg) - context.author = self.bot.user - - # Since we're going to invoke the tempmute command directly, we need to manually call the converter. - dt_remove_role_after = await self.expiration_date_converter.convert(context, f"{remove_role_after}S") - await context.invoke( - self.bot.get_command('tempmute'), - member, - dt_remove_role_after, - reason=reason - ) - - async def maybe_delete_messages(self, messages: List[Message]) -> None: - """Cleans the messages if cleaning is configured.""" - if AntiSpamConfig.clean_offending: - # If we have more than one message, we can use bulk delete. - if len(messages) > 1: - message_ids = [message.id for message in messages] - self.mod_log.ignore(Event.message_delete, *message_ids) - channel_messages = defaultdict(list) - for message in messages: - channel_messages[message.channel].append(message) - for channel, messages in channel_messages.items(): - try: - await channel.delete_messages(messages) - except NotFound: - # In the rare case where we found messages matching the - # spam filter across multiple channels, it is possible - # that a single channel will only contain a single message - # to delete. If that should be the case, discord.py will - # use the "delete single message" endpoint instead of the - # bulk delete endpoint, and the single message deletion - # endpoint will complain if you give it that does not exist. - # As this means that we have no other message to delete in - # this channel (and message deletes work per-channel), - # we can just log an exception and carry on with business. - log.info(f"Tried to delete message `{messages[0].id}`, but message could not be found.") - - # Otherwise, the bulk delete endpoint will throw up. - # Delete the message directly instead. - else: - self.mod_log.ignore(Event.message_delete, messages[0].id) - try: - await messages[0].delete() - except NotFound: - log.info(f"Tried to delete message `{messages[0].id}`, but message could not be found.") - - async def _process_deletion_context(self, context_id: frozenset) -> None: - """Processes the Deletion Context queue.""" - log.trace("Sleeping before processing message deletion queue.") - await asyncio.sleep(10) - - if context_id not in self.message_deletion_queue: - log.error(f"Started processing deletion queue for context `{context_id}`, but it was not found!") - return - - deletion_context = self.message_deletion_queue.pop(context_id) - await deletion_context.upload_messages(self.bot.user.id, self.mod_log) - - @Cog.listener() - async def on_message_edit(self, before: Message, after: Message) -> None: - """Updates the message in the cache, if it's cached.""" - self.cache.update(after) - - -def validate_config(rules_: Mapping = AntiSpamConfig.rules) -> Dict[str, str]: - """Validates the antispam configs.""" - validation_errors = {} - for name, config in rules_.items(): - if name not in RULE_FUNCTION_MAPPING: - log.error( - f"Unrecognized antispam rule `{name}`. " - f"Valid rules are: {', '.join(RULE_FUNCTION_MAPPING)}" - ) - validation_errors[name] = f"`{name}` is not recognized as an antispam rule." - continue - for required_key in ('interval', 'max'): - if required_key not in config: - log.error( - f"`{required_key}` is required but was not " - f"set in rule `{name}`'s configuration." - ) - validation_errors[name] = f"Key `{required_key}` is required but not set for rule `{name}`" - return validation_errors - - -async def setup(bot: Bot) -> None: - """Validate the AntiSpam configs and load the AntiSpam cog.""" - validation_errors = validate_config() - await bot.add_cog(AntiSpam(bot, validation_errors)) diff --git a/bot/exts/filters/filter_lists.py b/bot/exts/filters/filter_lists.py deleted file mode 100644 index c643f9a84..000000000 --- a/bot/exts/filters/filter_lists.py +++ /dev/null @@ -1,297 +0,0 @@ -import re -from typing import Optional - -from botcore.site_api import ResponseCodeError -from discord import Colour, Embed -from discord.ext.commands import BadArgument, Cog, Context, IDConverter, group, has_any_role - -from bot import constants -from bot.bot import Bot -from bot.constants import Channels -from bot.converters import ValidDiscordServerInvite, ValidFilterListType -from bot.log import get_logger -from bot.pagination import LinePaginator - -log = get_logger(__name__) - - -class FilterLists(Cog): - """Commands for blacklisting and whitelisting things.""" - - methods_with_filterlist_types = [ - "allow_add", - "allow_delete", - "allow_get", - "deny_add", - "deny_delete", - "deny_get", - ] - - def __init__(self, bot: Bot) -> None: - self.bot = bot - - async def cog_load(self) -> None: - """Add the valid FilterList types to the docstrings, so they'll appear in !help invocations.""" - await self.bot.wait_until_guild_available() - - # Add valid filterlist types to the docstrings - valid_types = await ValidFilterListType.get_valid_types(self.bot) - valid_types = [f"`{type_.lower()}`" for type_ in valid_types] - - for method_name in self.methods_with_filterlist_types: - command = getattr(self, method_name) - command.help = ( - f"{command.help}\n\nValid **list_type** values are {', '.join(valid_types)}." - ) - - async def _add_data( - self, - ctx: Context, - allowed: bool, - list_type: ValidFilterListType, - content: str, - comment: Optional[str] = None, - ) -> None: - """Add an item to a filterlist.""" - allow_type = "whitelist" if allowed else "blacklist" - - # If this is a guild invite, we gotta validate it. - if list_type == "GUILD_INVITE": - guild_data = await self._validate_guild_invite(ctx, content) - content = guild_data.get("id") - - # Some guild invites are autoban filters, which require the mod - # to set a comment which includes [autoban]. - # Having the guild name in the comment is still useful when reviewing - # filter list, so prepend it to the set comment in case some mod forgets. - guild_name_part = f'Guild "{guild_data["name"]}"' if "name" in guild_data else None - - comment = " - ".join( - comment_part - for comment_part in (guild_name_part, comment) - if comment_part - ) - - # If it's a file format, let's make sure it has a leading dot. - elif list_type == "FILE_FORMAT" and not content.startswith("."): - content = f".{content}" - - # If it's a filter token, validate the passed regex - elif list_type == "FILTER_TOKEN": - try: - re.compile(content) - except re.error as e: - await ctx.message.add_reaction("❌") - await ctx.send( - f"{ctx.author.mention} that's not a valid regex! " - f"Regex error message: {e.msg}." - ) - return - - # Try to add the item to the database - log.trace(f"Trying to add the {content} item to the {list_type} {allow_type}") - payload = { - "allowed": allowed, - "type": list_type, - "content": content, - "comment": comment, - } - - try: - item = await self.bot.api_client.post( - "bot/filter-lists", - json=payload - ) - except ResponseCodeError as e: - if e.status == 400: - await ctx.message.add_reaction("❌") - log.debug( - f"{ctx.author} tried to add data to a {allow_type}, but the API returned 400, " - "probably because the request violated the UniqueConstraint." - ) - raise BadArgument( - f"Unable to add the item to the {allow_type}. " - "The item probably already exists. Keep in mind that a " - "blacklist and a whitelist for the same item cannot co-exist, " - "and we do not permit any duplicates." - ) - raise - - # If it is an autoban trigger we send a warning in #mod-meta - if comment and "[autoban]" in comment: - await self.bot.get_channel(Channels.mod_meta).send( - f":warning: Heads-up! The new `{list_type}` filter " - f"`{content}` (`{comment}`) will automatically ban users." - ) - - # Insert the item into the cache - self.bot.insert_item_into_filter_list_cache(item) - await ctx.message.add_reaction("✅") - - async def _delete_data(self, ctx: Context, allowed: bool, list_type: ValidFilterListType, content: str) -> None: - """Remove an item from a filterlist.""" - allow_type = "whitelist" if allowed else "blacklist" - - # If this is a server invite, we need to convert it. - if list_type == "GUILD_INVITE" and not IDConverter()._get_id_match(content): - guild_data = await self._validate_guild_invite(ctx, content) - content = guild_data.get("id") - - # If it's a file format, let's make sure it has a leading dot. - elif list_type == "FILE_FORMAT" and not content.startswith("."): - content = f".{content}" - - # Find the content and delete it. - log.trace(f"Trying to delete the {content} item from the {list_type} {allow_type}") - item = self.bot.filter_list_cache[f"{list_type}.{allowed}"].get(content) - - if item is not None: - try: - await self.bot.api_client.delete( - f"bot/filter-lists/{item['id']}" - ) - del self.bot.filter_list_cache[f"{list_type}.{allowed}"][content] - await ctx.message.add_reaction("✅") - except ResponseCodeError as e: - log.debug( - f"{ctx.author} tried to delete an item with the id {item['id']}, but " - f"the API raised an unexpected error: {e}" - ) - await ctx.message.add_reaction("❌") - else: - await ctx.message.add_reaction("❌") - - async def _list_all_data(self, ctx: Context, allowed: bool, list_type: ValidFilterListType) -> None: - """Paginate and display all items in a filterlist.""" - allow_type = "whitelist" if allowed else "blacklist" - result = self.bot.filter_list_cache[f"{list_type}.{allowed}"] - - # Build a list of lines we want to show in the paginator - lines = [] - for content, metadata in result.items(): - line = f"• `{content}`" - - if comment := metadata.get("comment"): - line += f" - {comment}" - - lines.append(line) - lines = sorted(lines) - - # Build the embed - list_type_plural = list_type.lower().replace("_", " ").title() + "s" - embed = Embed( - title=f"{allow_type.title()}ed {list_type_plural} ({len(result)} total)", - colour=Colour.blue() - ) - log.trace(f"Trying to list {len(result)} items from the {list_type.lower()} {allow_type}") - - if result: - await LinePaginator.paginate(lines, ctx, embed, max_lines=15, empty=False) - else: - embed.description = "Hmmm, seems like there's nothing here yet." - await ctx.send(embed=embed) - await ctx.message.add_reaction("❌") - - async def _sync_data(self, ctx: Context) -> None: - """Syncs the filterlists with the API.""" - try: - log.trace("Attempting to sync FilterList cache with data from the API.") - await self.bot.cache_filter_list_data() - await ctx.message.add_reaction("✅") - except ResponseCodeError as e: - log.debug( - f"{ctx.author} tried to sync FilterList cache data but " - f"the API raised an unexpected error: {e}" - ) - await ctx.message.add_reaction("❌") - - @staticmethod - async def _validate_guild_invite(ctx: Context, invite: str) -> dict: - """ - Validates a guild invite, and returns the guild info as a dict. - - Will raise a BadArgument if the guild invite is invalid. - """ - log.trace(f"Attempting to validate whether or not {invite} is a guild invite.") - validator = ValidDiscordServerInvite() - guild_data = await validator.convert(ctx, invite) - - # If we make it this far without raising a BadArgument, the invite is - # valid. Let's return a dict of guild information. - log.trace(f"{invite} validated as server invite. Converting to ID.") - return guild_data - - @group(aliases=("allowlist", "allow", "al", "wl")) - async def whitelist(self, ctx: Context) -> None: - """Group for whitelisting commands.""" - if not ctx.invoked_subcommand: - await ctx.send_help(ctx.command) - - @group(aliases=("denylist", "deny", "bl", "dl")) - async def blacklist(self, ctx: Context) -> None: - """Group for blacklisting commands.""" - if not ctx.invoked_subcommand: - await ctx.send_help(ctx.command) - - @whitelist.command(name="add", aliases=("a", "set")) - async def allow_add( - self, - ctx: Context, - list_type: ValidFilterListType, - content: str, - *, - comment: Optional[str] = None, - ) -> None: - """Add an item to the specified allowlist.""" - await self._add_data(ctx, True, list_type, content, comment) - - @blacklist.command(name="add", aliases=("a", "set")) - async def deny_add( - self, - ctx: Context, - list_type: ValidFilterListType, - content: str, - *, - comment: Optional[str] = None, - ) -> None: - """Add an item to the specified denylist.""" - await self._add_data(ctx, False, list_type, content, comment) - - @whitelist.command(name="remove", aliases=("delete", "rm",)) - async def allow_delete(self, ctx: Context, list_type: ValidFilterListType, content: str) -> None: - """Remove an item from the specified allowlist.""" - await self._delete_data(ctx, True, list_type, content) - - @blacklist.command(name="remove", aliases=("delete", "rm",)) - async def deny_delete(self, ctx: Context, list_type: ValidFilterListType, content: str) -> None: - """Remove an item from the specified denylist.""" - await self._delete_data(ctx, False, list_type, content) - - @whitelist.command(name="get", aliases=("list", "ls", "fetch", "show")) - async def allow_get(self, ctx: Context, list_type: ValidFilterListType) -> None: - """Get the contents of a specified allowlist.""" - await self._list_all_data(ctx, True, list_type) - - @blacklist.command(name="get", aliases=("list", "ls", "fetch", "show")) - async def deny_get(self, ctx: Context, list_type: ValidFilterListType) -> None: - """Get the contents of a specified denylist.""" - await self._list_all_data(ctx, False, list_type) - - @whitelist.command(name="sync", aliases=("s",)) - async def allow_sync(self, ctx: Context) -> None: - """Syncs both allowlists and denylists with the API.""" - await self._sync_data(ctx) - - @blacklist.command(name="sync", aliases=("s",)) - async def deny_sync(self, ctx: Context) -> None: - """Syncs both allowlists and denylists with the API.""" - await self._sync_data(ctx) - - async def cog_check(self, ctx: Context) -> bool: - """Only allow moderators to invoke the commands in this cog.""" - return await has_any_role(*constants.MODERATION_ROLES).predicate(ctx) - - -async def setup(bot: Bot) -> None: - """Load the FilterLists cog.""" - await bot.add_cog(FilterLists(bot)) diff --git a/bot/exts/filters/filtering.py b/bot/exts/filters/filtering.py deleted file mode 100644 index ca6ad0064..000000000 --- a/bot/exts/filters/filtering.py +++ /dev/null @@ -1,735 +0,0 @@ -import asyncio -import re -import unicodedata -import urllib.parse -from datetime import timedelta -from typing import Any, Dict, List, Mapping, NamedTuple, Optional, Tuple, Union - -import arrow -import dateutil.parser -import regex -import tldextract -from async_rediscache import RedisCache -from botcore.site_api import ResponseCodeError -from botcore.utils import scheduling -from botcore.utils.regex import DISCORD_INVITE -from dateutil.relativedelta import relativedelta -from discord import ChannelType, Colour, Embed, Forbidden, HTTPException, Member, Message, NotFound, TextChannel -from discord.ext.commands import Cog -from discord.utils import escape_markdown - -from bot.bot import Bot -from bot.constants import Bot as BotConfig, Channels, Colours, Filter, Guild, Icons, URLs -from bot.exts.events.code_jams._channels import CATEGORY_NAME as JAM_CATEGORY_NAME -from bot.exts.moderation.modlog import ModLog -from bot.log import get_logger -from bot.utils.messages import format_user - -log = get_logger(__name__) - -# Regular expressions -CODE_BLOCK_RE = re.compile( - r"(?P<delim>``?)[^`]+?(?P=delim)(?!`+)" # Inline codeblock - r"|```(.+?)```", # Multiline codeblock - re.DOTALL | re.MULTILINE -) -EVERYONE_PING_RE = re.compile(rf"@everyone|<@&{Guild.id}>|@here") -SPOILER_RE = re.compile(r"(\|\|.+?\|\|)", re.DOTALL) -URL_RE = re.compile(r"(https?://[^\s]+)", flags=re.IGNORECASE) - -# Exclude variation selectors from zalgo because they're actually invisible. -VARIATION_SELECTORS = r"\uFE00-\uFE0F\U000E0100-\U000E01EF" -INVISIBLE_RE = regex.compile(rf"[{VARIATION_SELECTORS}\p{{UNASSIGNED}}\p{{FORMAT}}\p{{CONTROL}}--\s]", regex.V1) -ZALGO_RE = regex.compile(rf"[\p{{NONSPACING MARK}}\p{{ENCLOSING MARK}}--[{VARIATION_SELECTORS}]]", regex.V1) - -# Other constants. -DAYS_BETWEEN_ALERTS = 3 -OFFENSIVE_MSG_DELETE_TIME = timedelta(days=Filter.offensive_msg_delete_days) - -# Autoban -LINK_PASSWORD = "https://support.discord.com/hc/en-us/articles/218410947-I-forgot-my-Password-Where-can-I-set-a-new-one" -LINK_2FA = "https://support.discord.com/hc/en-us/articles/219576828-Setting-up-Two-Factor-Authentication" -AUTO_BAN_REASON = ( - "Your account has been used to send links to a phishing website. You have been automatically banned. " - "If you are not aware of sending them, that means your account has been compromised.\n\n" - - f"Here is a guide from Discord on [how to change your password]({LINK_PASSWORD}).\n\n" - - f"We also highly recommend that you [enable 2 factor authentication on your account]({LINK_2FA}), " - "for heightened security.\n\n" - - "Once you have changed your password, feel free to follow the instructions at the bottom of " - "this message to appeal your ban." -) -AUTO_BAN_DURATION = timedelta(days=4) - -FilterMatch = Union[re.Match, dict, bool, List[Embed]] - - -class Stats(NamedTuple): - """Additional stats on a triggered filter to append to a mod log.""" - - message_content: str - additional_embeds: Optional[List[Embed]] - - -class Filtering(Cog): - """Filtering out invites, blacklisting domains, and warning us of certain regular expressions.""" - - # Redis cache mapping a user ID to the last timestamp a bad nickname alert was sent - name_alerts = RedisCache() - - def __init__(self, bot: Bot): - self.bot = bot - self.scheduler = scheduling.Scheduler(self.__class__.__name__) - self.name_lock = asyncio.Lock() - - staff_mistake_str = "If you believe this was a mistake, please let staff know!" - self.filters = { - "filter_zalgo": { - "enabled": Filter.filter_zalgo, - "function": self._has_zalgo, - "type": "filter", - "content_only": True, - "user_notification": Filter.notify_user_zalgo, - "notification_msg": ( - "Your post has been removed for abusing Unicode character rendering (aka Zalgo text). " - f"{staff_mistake_str}" - ), - "schedule_deletion": False - }, - "filter_invites": { - "enabled": Filter.filter_invites, - "function": self._has_invites, - "type": "filter", - "content_only": True, - "user_notification": Filter.notify_user_invites, - "notification_msg": ( - f"Per Rule 6, your invite link has been removed. {staff_mistake_str}\n\n" - r"Our server rules can be found here: <https://pythondiscord.com/pages/rules>" - ), - "schedule_deletion": False - }, - "filter_domains": { - "enabled": Filter.filter_domains, - "function": self._has_urls, - "type": "filter", - "content_only": True, - "user_notification": Filter.notify_user_domains, - "notification_msg": ( - f"Your URL has been removed because it matched a blacklisted domain. {staff_mistake_str}" - ), - "schedule_deletion": False - }, - "watch_regex": { - "enabled": Filter.watch_regex, - "function": self._has_watch_regex_match, - "type": "watchlist", - "content_only": True, - "schedule_deletion": True - }, - "watch_rich_embeds": { - "enabled": Filter.watch_rich_embeds, - "function": self._has_rich_embed, - "type": "watchlist", - "content_only": False, - "schedule_deletion": False - }, - "filter_everyone_ping": { - "enabled": Filter.filter_everyone_ping, - "function": self._has_everyone_ping, - "type": "filter", - "content_only": True, - "user_notification": Filter.notify_user_everyone_ping, - "notification_msg": ( - "Please don't try to ping `@everyone` or `@here`. " - f"Your message has been removed. {staff_mistake_str}" - ), - "schedule_deletion": False, - "ping_everyone": False - }, - } - - async def cog_unload(self) -> None: - """Cancel scheduled tasks.""" - self.scheduler.cancel_all() - - def _get_filterlist_items(self, list_type: str, *, allowed: bool) -> list: - """Fetch items from the filter_list_cache.""" - return self.bot.filter_list_cache[f"{list_type.upper()}.{allowed}"].keys() - - def _get_filterlist_value(self, list_type: str, value: Any, *, allowed: bool) -> dict: - """Fetch one specific value from filter_list_cache.""" - return self.bot.filter_list_cache[f"{list_type.upper()}.{allowed}"][value] - - @staticmethod - def _expand_spoilers(text: str) -> str: - """Return a string containing all interpretations of a spoilered message.""" - split_text = SPOILER_RE.split(text) - return ''.join( - split_text[0::2] + split_text[1::2] + split_text - ) - - @property - def mod_log(self) -> ModLog: - """Get currently loaded ModLog cog instance.""" - return self.bot.get_cog("ModLog") - - @Cog.listener() - async def on_message(self, msg: Message) -> None: - """Invoke message filter for new messages.""" - await self._filter_message(msg) - - # Ignore webhook messages. - if msg.webhook_id is None: - await self.check_bad_words_in_name(msg.author) - - @Cog.listener() - async def on_message_edit(self, before: Message, after: Message) -> None: - """ - Invoke message filter for message edits. - - Also calculates the time delta from the previous edit or when message was sent if there's no prior edits. - """ - # We only care about changes to the message contents/attachments and embed additions, not pin status etc. - if all(( - before.content == after.content, # content hasn't changed - before.attachments == after.attachments, # attachments haven't changed - len(before.embeds) >= len(after.embeds) # embeds haven't been added - )): - return - - if not before.edited_at: - delta = relativedelta(after.edited_at, before.created_at).microseconds - else: - delta = relativedelta(after.edited_at, before.edited_at).microseconds - await self._filter_message(after, delta) - - @Cog.listener() - async def on_voice_state_update(self, member: Member, *_) -> None: - """Checks for bad words in usernames when users join, switch or leave a voice channel.""" - await self.check_bad_words_in_name(member) - - def get_name_match(self, name: str) -> Optional[re.Match]: - """Check bad words from passed string (name). Return the first match found.""" - normalised_name = unicodedata.normalize("NFKC", name) - cleaned_normalised_name = "".join([c for c in normalised_name if not unicodedata.combining(c)]) - - # Run filters against normalised, cleaned normalised and the original name, - # in case we have filters for one but not the other. - names_to_check = (name, normalised_name, cleaned_normalised_name) - - watchlist_patterns = self._get_filterlist_items('filter_token', allowed=False) - for pattern in watchlist_patterns: - for name in names_to_check: - if match := re.search(pattern, name, flags=re.IGNORECASE): - return match - return None - - async def check_send_alert(self, member: Member) -> bool: - """When there is less than 3 days after last alert, return `False`, otherwise `True`.""" - if last_alert := await self.name_alerts.get(member.id): - last_alert = arrow.get(last_alert) - if arrow.utcnow() - timedelta(days=DAYS_BETWEEN_ALERTS) < last_alert: - log.trace(f"Last alert was too recent for {member}'s nickname.") - return False - - return True - - async def check_bad_words_in_name(self, member: Member) -> None: - """Send a mod alert every 3 days if a username still matches a watchlist pattern.""" - # Use lock to avoid race conditions - async with self.name_lock: - # Check if we recently alerted about this user first, - # to avoid running all the filter tokens against their name again. - if not await self.check_send_alert(member): - return - - # Check whether the users display name contains any words in our blacklist - match = self.get_name_match(member.display_name) - if not match: - return - - log.info(f"Sending bad nickname alert for '{member.display_name}' ({member.id}).") - - log_string = ( - f"**User:** {format_user(member)}\n" - f"**Display Name:** {escape_markdown(member.display_name)}\n" - f"**Bad Match:** {match.group()}" - ) - - await self.mod_log.send_log_message( - content=str(member.id), # quality-of-life improvement for mobile moderators - icon_url=Icons.token_removed, - colour=Colours.soft_red, - title="Username filtering alert", - text=log_string, - channel_id=Channels.mod_alerts, - thumbnail=member.display_avatar.url, - ping_everyone=True - ) - - # Update time when alert sent - await self.name_alerts.set(member.id, arrow.utcnow().timestamp()) - - async def filter_snekbox_output(self, result: str, msg: Message) -> bool: - """ - Filter the result of a snekbox command to see if it violates any of our rules, and then respond accordingly. - - Also requires the original message, to check whether to filter and for mod logs. - Returns whether a filter was triggered or not. - """ - filter_triggered = False - # Should we filter this message? - if self._check_filter(msg): - for filter_name, _filter in self.filters.items(): - # Is this specific filter enabled in the config? - # We also do not need to worry about filters that take the full message, - # since all we have is an arbitrary string. - if _filter["enabled"] and _filter["content_only"]: - filter_result = await _filter["function"](result) - reason = None - - if isinstance(filter_result, tuple): - match, reason = filter_result - else: - match = filter_result - - if match: - # If this is a filter (not a watchlist), we set the variable so we know - # that it has been triggered - if _filter["type"] == "filter": - filter_triggered = True - - stats = self._add_stats(filter_name, match, result) - await self._send_log(filter_name, _filter, msg, stats, reason, is_eval=True) - - break # We don't want multiple filters to trigger - - return filter_triggered - - async def _filter_message(self, msg: Message, delta: Optional[int] = None) -> None: - """Filter the input message to see if it violates any of our rules, and then respond accordingly.""" - # Should we filter this message? - if self._check_filter(msg): - for filter_name, _filter in self.filters.items(): - # Is this specific filter enabled in the config? - if _filter["enabled"]: - # Double trigger check for the embeds filter - if filter_name == "watch_rich_embeds": - # If the edit delta is less than 0.001 seconds, then we're probably dealing - # with a double filter trigger. - if delta is not None and delta < 100: - continue - - if filter_name in ("filter_invites", "filter_everyone_ping"): - # Disable invites filter in codejam team channels - category = getattr(msg.channel, "category", None) - if category and category.name == JAM_CATEGORY_NAME: - continue - - # Does the filter only need the message content or the full message? - if _filter["content_only"]: - payload = msg.content - else: - payload = msg - - result = await _filter["function"](payload) - reason = None - - if isinstance(result, tuple): - match, reason = result - else: - match = result - - if match: - is_private = msg.channel.type is ChannelType.private - - # If this is a filter (not a watchlist) and not in a DM, delete the message. - if _filter["type"] == "filter" and not is_private: - try: - # Embeds (can?) trigger both the `on_message` and `on_message_edit` - # event handlers, triggering filtering twice for the same message. - # - # If `on_message`-triggered filtering already deleted the message - # then `on_message_edit`-triggered filtering will raise exception - # since the message no longer exists. - # - # In addition, to avoid sending two notifications to the user, the - # logs, and mod_alert, we return if the message no longer exists. - await msg.delete() - except NotFound: - return - - # Notify the user if the filter specifies - if _filter["user_notification"]: - await self.notify_member(msg.author, _filter["notification_msg"], msg.channel) - - # If the message is classed as offensive, we store it in the site db and - # it will be deleted after one week. - if _filter["schedule_deletion"] and not is_private: - delete_date = (msg.created_at + OFFENSIVE_MSG_DELETE_TIME).isoformat() - data = { - 'id': msg.id, - 'channel_id': msg.channel.id, - 'delete_date': delete_date - } - - try: - await self.bot.api_client.post('bot/offensive-messages', json=data) - except ResponseCodeError as e: - if e.status == 400 and "already exists" in e.response_json.get("id", [""])[0]: - log.debug(f"Offensive message {msg.id} already exists.") - else: - log.error(f"Offensive message {msg.id} failed to post: {e}") - else: - self.schedule_msg_delete(data) - log.trace(f"Offensive message {msg.id} will be deleted on {delete_date}") - - stats = self._add_stats(filter_name, match, msg.content) - - # If the filter reason contains `[autoban]`, we want to auto-ban the user. - # Also pass this to _send_log so mods are not pinged filter matches that are auto-actioned - autoban = reason and "[autoban]" in reason.lower() - if not autoban and filter_name == "filter_invites" and isinstance(result, dict): - autoban = any( - "[autoban]" in invite_info["reason"].lower() - for invite_info in result.values() - if invite_info.get("reason") - ) - - await self._send_log(filter_name, _filter, msg, stats, reason, autoban=autoban) - - if autoban: - # Create a new context, with the author as is the bot, and the channel as #mod-alerts. - # This sends the ban confirmation directly under watchlist trigger embed, to inform - # mods that the user was auto-banned for the message. - context = await self.bot.get_context(msg) - context.guild = self.bot.get_guild(Guild.id) - context.author = context.guild.get_member(self.bot.user.id) - context.channel = self.bot.get_channel(Channels.mod_alerts) - context.command = self.bot.get_command("tempban") - - await context.invoke( - context.command, - msg.author, - arrow.utcnow() + AUTO_BAN_DURATION, - reason=AUTO_BAN_REASON - ) - - break # We don't want multiple filters to trigger - - async def _send_log( - self, - filter_name: str, - _filter: Dict[str, Any], - msg: Message, - stats: Stats, - reason: Optional[str] = None, - *, - is_eval: bool = False, - autoban: bool = False, - ) -> None: - """Send a mod log for a triggered filter.""" - if msg.channel.type is ChannelType.private: - channel_str = "via DM" - ping_everyone = False - else: - channel_str = f"in {msg.channel.mention}" - # Allow specific filters to override ping_everyone - ping_everyone = Filter.ping_everyone and _filter.get("ping_everyone", True) - - content = str(msg.author.id) # quality-of-life improvement for mobile moderators - - # If we are going to autoban, we don't want to ping and don't need the user ID - if autoban: - ping_everyone = False - content = None - - eval_msg = f"using {BotConfig.prefix}eval " if is_eval else "" - footer = f"Reason: {reason}" if reason else None - message = ( - f"The {filter_name} {_filter['type']} was triggered by {format_user(msg.author)} " - f"{channel_str} {eval_msg}with [the following message]({msg.jump_url}):\n\n" - f"{stats.message_content}" - ) - - log.debug(message) - - # Send pretty mod log embed to mod-alerts - await self.mod_log.send_log_message( - content=content, - icon_url=Icons.filtering, - colour=Colour(Colours.soft_red), - title=f"{_filter['type'].title()} triggered!", - text=message, - thumbnail=msg.author.display_avatar.url, - channel_id=Channels.mod_alerts, - ping_everyone=ping_everyone, - additional_embeds=stats.additional_embeds, - footer=footer, - ) - - def _add_stats(self, name: str, match: FilterMatch, content: str) -> Stats: - """Adds relevant statistical information to the relevant filter and increments the bot's stats.""" - # Word and match stats for watch_regex - if name == "watch_regex": - surroundings = match.string[max(match.start() - 10, 0): match.end() + 10] - message_content = ( - f"**Match:** '{match[0]}'\n" - f"**Location:** '...{escape_markdown(surroundings)}...'\n" - f"\n**Original Message:**\n{escape_markdown(content)}" - ) - else: # Use original content - message_content = content - - additional_embeds = None - - self.bot.stats.incr(f"filters.{name}") - - # The function returns True for invalid invites. - # They have no data so additional embeds can't be created for them. - if name == "filter_invites" and match is not True: - additional_embeds = [] - for _, data in match.items(): - reason = f"Reason: {data['reason']} | " if data.get('reason') else "" - embed = Embed(description=( - f"**Members:**\n{data['members']}\n" - f"**Active:**\n{data['active']}" - )) - embed.set_author(name=data["name"]) - embed.set_thumbnail(url=data["icon"]) - embed.set_footer(text=f"{reason}Guild ID: {data['id']}") - additional_embeds.append(embed) - - elif name == "watch_rich_embeds": - additional_embeds = match - - return Stats(message_content, additional_embeds) - - @staticmethod - def _check_filter(msg: Message) -> bool: - """Check whitelists to see if we should filter this message.""" - role_whitelisted = False - - if type(msg.author) is Member: # Only Member has roles, not User. - for role in msg.author.roles: - if role.id in Filter.role_whitelist: - role_whitelisted = True - - return ( - msg.channel.id not in Filter.channel_whitelist # Channel not in whitelist - and not role_whitelisted # Role not in whitelist - and not msg.author.bot # Author not a bot - ) - - async def _has_watch_regex_match(self, text: str) -> Tuple[Union[bool, re.Match], Optional[str]]: - """ - Return True if `text` matches any regex from `word_watchlist` or `token_watchlist` configs. - - `word_watchlist`'s patterns are placed between word boundaries while `token_watchlist` is - matched as-is. Spoilers are expanded, if any, and URLs are ignored. - Second return value is a reason written to database about blacklist entry (can be None). - """ - if SPOILER_RE.search(text): - text = self._expand_spoilers(text) - - text = self.clean_input(text) - - watchlist_patterns = self._get_filterlist_items('filter_token', allowed=False) - for pattern in watchlist_patterns: - match = re.search(pattern, text, flags=re.IGNORECASE) - if match: - return match, self._get_filterlist_value('filter_token', pattern, allowed=False)['comment'] - - return False, None - - async def _has_urls(self, text: str) -> Tuple[bool, Optional[str]]: - """ - Returns True if the text contains one of the blacklisted URLs from the config file. - - Second return value is a reason of URL blacklisting (can be None). - """ - text = self.clean_input(text) - - domain_blacklist = self._get_filterlist_items("domain_name", allowed=False) - for match in URL_RE.finditer(text): - for url in domain_blacklist: - if url.lower() in match.group(1).lower(): - blacklisted_parsed = tldextract.extract(url.lower()) - url_parsed = tldextract.extract(match.group(1).lower()) - if blacklisted_parsed.registered_domain == url_parsed.registered_domain: - return True, self._get_filterlist_value("domain_name", url, allowed=False)["comment"] - return False, None - - @staticmethod - async def _has_zalgo(text: str) -> bool: - """ - Returns True if the text contains zalgo characters. - - Zalgo range is \u0300 – \u036F and \u0489. - """ - return bool(ZALGO_RE.search(text)) - - async def _has_invites(self, text: str) -> Union[dict, bool]: - """ - Checks if there's any invites in the text content that aren't in the guild whitelist. - - If any are detected, a dictionary of invite data is returned, with a key per invite. - If none are detected, False is returned. - If we are unable to process an invite, True is returned. - - Attempts to catch some of common ways to try to cheat the system. - """ - text = self.clean_input(text) - - # Remove backslashes to prevent escape character aroundfuckery like - # discord\.gg/gdudes-pony-farm - text = text.replace("\\", "") - - invites = [m.group("invite") for m in DISCORD_INVITE.finditer(text)] - invite_data = dict() - for invite in invites: - invite = urllib.parse.quote_plus(invite.rstrip("/")) - if invite in invite_data: - continue - - response = await self.bot.http_session.get( - f"{URLs.discord_invite_api}/{invite}", params={"with_counts": "true"} - ) - response = await response.json() - guild = response.get("guild") - if guild is None: - # Lack of a "guild" key in the JSON response indicates either an group DM invite, an - # expired invite, or an invalid invite. The API does not currently differentiate - # between invalid and expired invites - return True - - guild_id = guild.get("id") - guild_invite_whitelist = self._get_filterlist_items("guild_invite", allowed=True) - guild_invite_blacklist = self._get_filterlist_items("guild_invite", allowed=False) - - # Is this invite allowed? - guild_partnered_or_verified = ( - 'PARTNERED' in guild.get("features", []) - or 'VERIFIED' in guild.get("features", []) - ) - invite_not_allowed = ( - guild_id in guild_invite_blacklist # Blacklisted guilds are never permitted. - or guild_id not in guild_invite_whitelist # Whitelisted guilds are always permitted. - and not guild_partnered_or_verified # Otherwise guilds have to be Verified or Partnered. - ) - - if invite_not_allowed: - reason = None - if guild_id in guild_invite_blacklist: - reason = self._get_filterlist_value("guild_invite", guild_id, allowed=False)["comment"] - - guild_icon_hash = guild["icon"] - guild_icon = ( - "https://cdn.discordapp.com/icons/" - f"{guild_id}/{guild_icon_hash}.png?size=512" - ) - - invite_data[invite] = { - "name": guild["name"], - "id": guild['id'], - "icon": guild_icon, - "members": response["approximate_member_count"], - "active": response["approximate_presence_count"], - "reason": reason - } - - return invite_data if invite_data else False - - @staticmethod - async def _has_rich_embed(msg: Message) -> Union[bool, List[Embed]]: - """Determines if `msg` contains any rich embeds not auto-generated from a URL.""" - if msg.embeds: - for embed in msg.embeds: - if embed.type == "rich": - urls = URL_RE.findall(msg.content) - if not embed.url or embed.url not in urls: - # If `embed.url` does not exist or if `embed.url` is not part of the content - # of the message, it's unlikely to be an auto-generated embed by Discord. - return msg.embeds - else: - log.trace( - "Found a rich embed sent by a regular user account, " - "but it was likely just an automatic URL embed." - ) - return False - return False - - @staticmethod - async def _has_everyone_ping(text: str) -> bool: - """Determines if `msg` contains an @everyone or @here ping outside of a codeblock.""" - # First pass to avoid running re.sub on every message - if not EVERYONE_PING_RE.search(text): - return False - - content_without_codeblocks = CODE_BLOCK_RE.sub("", text) - return bool(EVERYONE_PING_RE.search(content_without_codeblocks)) - - async def notify_member(self, filtered_member: Member, reason: str, channel: TextChannel) -> None: - """ - Notify filtered_member about a moderation action with the reason str. - - First attempts to DM the user, fall back to in-channel notification if user has DMs disabled - """ - try: - await filtered_member.send(reason) - except Forbidden: - await channel.send(f"{filtered_member.mention} {reason}") - - def schedule_msg_delete(self, msg: dict) -> None: - """Delete an offensive message once its deletion date is reached.""" - delete_at = dateutil.parser.isoparse(msg['delete_date']) - self.scheduler.schedule_at(delete_at, msg['id'], self.delete_offensive_msg(msg)) - - async def cog_load(self) -> None: - """Get all the pending message deletion from the API and reschedule them.""" - await self.bot.wait_until_ready() - response = await self.bot.api_client.get('bot/offensive-messages',) - - now = arrow.utcnow() - - for msg in response: - delete_at = dateutil.parser.isoparse(msg['delete_date']) - - if delete_at < now: - await self.delete_offensive_msg(msg) - else: - self.schedule_msg_delete(msg) - - async def delete_offensive_msg(self, msg: Mapping[str, int]) -> None: - """Delete an offensive message, and then delete it from the db.""" - try: - channel = self.bot.get_channel(msg['channel_id']) - if channel: - msg_obj = await channel.fetch_message(msg['id']) - await msg_obj.delete() - except NotFound: - log.info( - f"Tried to delete message {msg['id']}, but the message can't be found " - f"(it has been probably already deleted)." - ) - except HTTPException as e: - log.warning(f"Failed to delete message {msg['id']}: status {e.status}") - - await self.bot.api_client.delete(f'bot/offensive-messages/{msg["id"]}') - log.info(f"Deleted the offensive message with id {msg['id']}.") - - @staticmethod - def clean_input(string: str) -> str: - """Remove zalgo and invisible characters from `string`.""" - # For future consideration: remove characters in the Mc, Sk, and Lm categories too. - # Can be normalised with form C to merge char + combining char into a single char to avoid - # removing legit diacritics, but this would open up a way to bypass filters. - no_zalgo = ZALGO_RE.sub("", string) - return INVISIBLE_RE.sub("", no_zalgo) - - -async def setup(bot: Bot) -> None: - """Load the Filtering cog.""" - await bot.add_cog(Filtering(bot)) diff --git a/bot/exts/filters/security.py b/bot/exts/filters/security.py deleted file mode 100644 index 27e4d9752..000000000 --- a/bot/exts/filters/security.py +++ /dev/null @@ -1,30 +0,0 @@ -from discord.ext.commands import Cog, Context, NoPrivateMessage - -from bot.bot import Bot -from bot.log import get_logger - -log = get_logger(__name__) - - -class Security(Cog): - """Security-related helpers.""" - - def __init__(self, bot: Bot): - self.bot = bot - self.bot.check(self.check_not_bot) # Global commands check - no bots can run any commands at all - self.bot.check(self.check_on_guild) # Global commands check - commands can't be run in a DM - - def check_not_bot(self, ctx: Context) -> bool: - """Check if the context is a bot user.""" - return not ctx.author.bot - - def check_on_guild(self, ctx: Context) -> bool: - """Check if the context is in a guild.""" - if ctx.guild is None: - raise NoPrivateMessage("This command cannot be used in private messages.") - return True - - -async def setup(bot: Bot) -> None: - """Load the Security cog.""" - await bot.add_cog(Security(bot)) diff --git a/bot/exts/filters/token_remover.py b/bot/exts/filters/token_remover.py deleted file mode 100644 index a0d5aa7b6..000000000 --- a/bot/exts/filters/token_remover.py +++ /dev/null @@ -1,233 +0,0 @@ -import base64 -import re -import typing as t - -from discord import Colour, Message, NotFound -from discord.ext.commands import Cog - -from bot import utils -from bot.bot import Bot -from bot.constants import Channels, Colours, Event, Icons -from bot.exts.moderation.modlog import ModLog -from bot.log import get_logger -from bot.utils.members import get_or_fetch_member -from bot.utils.messages import format_user - -log = get_logger(__name__) - -LOG_MESSAGE = ( - "Censored a seemingly valid token sent by {author} in {channel}, " - "token was `{user_id}.{timestamp}.{hmac}`" -) -UNKNOWN_USER_LOG_MESSAGE = "Decoded user ID: `{user_id}` (Not present in server)." -KNOWN_USER_LOG_MESSAGE = ( - "Decoded user ID: `{user_id}` **(Present in server)**.\n" - "This matches `{user_name}` and means this is likely a valid **{kind}** token." -) -DELETION_MESSAGE_TEMPLATE = ( - "Hey {mention}! I noticed you posted a seemingly valid Discord API " - "token in your message and have removed your message. " - "This means that your token has been **compromised**. " - "Please change your token **immediately** at: " - "<https://discordapp.com/developers/applications/me>\n\n" - "Feel free to re-post it with the token removed. " - "If you believe this was a mistake, please let us know!" -) -DISCORD_EPOCH = 1_420_070_400 -TOKEN_EPOCH = 1_293_840_000 - -# Three parts delimited by dots: user ID, creation timestamp, HMAC. -# The HMAC isn't parsed further, but it's in the regex to ensure it at least exists in the string. -# Each part only matches base64 URL-safe characters. -# Padding has never been observed, but the padding character '=' is matched just in case. -TOKEN_RE = re.compile(r"([\w\-=]+)\.([\w\-=]+)\.([\w\-=]+)", re.ASCII) - - -class Token(t.NamedTuple): - """A Discord Bot token.""" - - user_id: str - timestamp: str - hmac: str - - -class TokenRemover(Cog): - """Scans messages for potential discord.py bot tokens and removes them.""" - - def __init__(self, bot: Bot): - self.bot = bot - - @property - def mod_log(self) -> ModLog: - """Get currently loaded ModLog cog instance.""" - return self.bot.get_cog("ModLog") - - @Cog.listener() - async def on_message(self, msg: Message) -> None: - """ - Check each message for a string that matches Discord's token pattern. - - See: https://discordapp.com/developers/docs/reference#snowflakes - """ - # Ignore DMs; can't delete messages in there anyway. - if not msg.guild or msg.author.bot: - return - - found_token = self.find_token_in_message(msg) - if found_token: - await self.take_action(msg, found_token) - - @Cog.listener() - async def on_message_edit(self, before: Message, after: Message) -> None: - """ - Check each edit for a string that matches Discord's token pattern. - - See: https://discordapp.com/developers/docs/reference#snowflakes - """ - await self.on_message(after) - - async def take_action(self, msg: Message, found_token: Token) -> None: - """Remove the `msg` containing the `found_token` and send a mod log message.""" - self.mod_log.ignore(Event.message_delete, msg.id) - - try: - await msg.delete() - except NotFound: - log.debug(f"Failed to remove token in message {msg.id}: message already deleted.") - return - - await msg.channel.send(DELETION_MESSAGE_TEMPLATE.format(mention=msg.author.mention)) - - log_message = self.format_log_message(msg, found_token) - userid_message, mention_everyone = await self.format_userid_log_message(msg, found_token) - log.debug(log_message) - - # Send pretty mod log embed to mod-alerts - await self.mod_log.send_log_message( - icon_url=Icons.token_removed, - colour=Colour(Colours.soft_red), - title="Token removed!", - text=log_message + "\n" + userid_message, - thumbnail=msg.author.display_avatar.url, - channel_id=Channels.mod_alerts, - ping_everyone=mention_everyone, - ) - - self.bot.stats.incr("tokens.removed_tokens") - - @classmethod - async def format_userid_log_message(cls, msg: Message, token: Token) -> t.Tuple[str, bool]: - """ - Format the portion of the log message that includes details about the detected user ID. - - If the user is resolved to a member, the format includes the user ID, name, and the - kind of user detected. - - If we resolve to a member and it is not a bot, we also return True to ping everyone. - - Returns a tuple of (log_message, mention_everyone) - """ - user_id = cls.extract_user_id(token.user_id) - user = await get_or_fetch_member(msg.guild, user_id) - - if user: - return KNOWN_USER_LOG_MESSAGE.format( - user_id=user_id, - user_name=str(user), - kind="BOT" if user.bot else "USER", - ), True - else: - return UNKNOWN_USER_LOG_MESSAGE.format(user_id=user_id), False - - @staticmethod - def format_log_message(msg: Message, token: Token) -> str: - """Return the generic portion of the log message to send for `token` being censored in `msg`.""" - return LOG_MESSAGE.format( - author=format_user(msg.author), - channel=msg.channel.mention, - user_id=token.user_id, - timestamp=token.timestamp, - hmac='x' * (len(token.hmac) - 3) + token.hmac[-3:], - ) - - @classmethod - def find_token_in_message(cls, msg: Message) -> t.Optional[Token]: - """Return a seemingly valid token found in `msg` or `None` if no token is found.""" - # Use finditer rather than search to guard against method calls prematurely returning the - # token check (e.g. `message.channel.send` also matches our token pattern) - for match in TOKEN_RE.finditer(msg.content): - token = Token(*match.groups()) - if ( - (cls.extract_user_id(token.user_id) is not None) - and cls.is_valid_timestamp(token.timestamp) - and cls.is_maybe_valid_hmac(token.hmac) - ): - # Short-circuit on first match - return token - - # No matching substring - return - - @staticmethod - def extract_user_id(b64_content: str) -> t.Optional[int]: - """Return a user ID integer from part of a potential token, or None if it couldn't be decoded.""" - b64_content = utils.pad_base64(b64_content) - - try: - decoded_bytes = base64.urlsafe_b64decode(b64_content) - string = decoded_bytes.decode('utf-8') - if not (string.isascii() and string.isdigit()): - # This case triggers if there are fancy unicode digits in the base64 encoding, - # that means it's not a valid user id. - return None - return int(string) - except ValueError: - return None - - @staticmethod - def is_valid_timestamp(b64_content: str) -> bool: - """ - Return True if `b64_content` decodes to a valid timestamp. - - If the timestamp is greater than the Discord epoch, it's probably valid. - See: https://i.imgur.com/7WdehGn.png - """ - b64_content = utils.pad_base64(b64_content) - - try: - decoded_bytes = base64.urlsafe_b64decode(b64_content) - timestamp = int.from_bytes(decoded_bytes, byteorder="big") - except ValueError as e: - log.debug(f"Failed to decode token timestamp '{b64_content}': {e}") - return False - - # Seems like newer tokens don't need the epoch added, but add anyway since an upper bound - # is not checked. - if timestamp + TOKEN_EPOCH >= DISCORD_EPOCH: - return True - else: - log.debug(f"Invalid token timestamp '{b64_content}': smaller than Discord epoch") - return False - - @staticmethod - def is_maybe_valid_hmac(b64_content: str) -> bool: - """ - Determine if a given HMAC portion of a token is potentially valid. - - If the HMAC has 3 or less characters, it's probably a dummy value like "xxxxxxxxxx", - and thus the token can probably be skipped. - """ - unique = len(set(b64_content.lower())) - if unique <= 3: - log.debug( - f"Considering the HMAC {b64_content} a dummy because it has {unique}" - " case-insensitively unique characters" - ) - return False - else: - return True - - -async def setup(bot: Bot) -> None: - """Load the TokenRemover cog.""" - await bot.add_cog(TokenRemover(bot)) diff --git a/bot/exts/filters/webhook_remover.py b/bot/exts/filters/webhook_remover.py deleted file mode 100644 index b42613804..000000000 --- a/bot/exts/filters/webhook_remover.py +++ /dev/null @@ -1,94 +0,0 @@ -import re - -from discord import Colour, Message, NotFound -from discord.ext.commands import Cog - -from bot.bot import Bot -from bot.constants import Channels, Colours, Event, Icons -from bot.exts.moderation.modlog import ModLog -from bot.log import get_logger -from bot.utils.messages import format_user - -WEBHOOK_URL_RE = re.compile( - r"((?:https?:\/\/)?(?:ptb\.|canary\.)?discord(?:app)?\.com\/api\/webhooks\/\d+\/)\S+\/?", - re.IGNORECASE -) - -ALERT_MESSAGE_TEMPLATE = ( - "{user}, looks like you posted a Discord webhook URL. Therefore, your " - "message has been removed, and your webhook has been deleted. " - "You can re-create it if you wish to. If you believe this was a " - "mistake, please let us know." -) - -log = get_logger(__name__) - - -class WebhookRemover(Cog): - """Scan messages to detect Discord webhooks links.""" - - def __init__(self, bot: Bot): - self.bot = bot - - @property - def mod_log(self) -> ModLog: - """Get current instance of `ModLog`.""" - return self.bot.get_cog("ModLog") - - async def delete_and_respond(self, msg: Message, redacted_url: str, *, webhook_deleted: bool) -> None: - """Delete `msg` and send a warning that it contained the Discord webhook `redacted_url`.""" - # Don't log this, due internal delete, not by user. Will make different entry. - self.mod_log.ignore(Event.message_delete, msg.id) - - try: - await msg.delete() - except NotFound: - log.debug(f"Failed to remove webhook in message {msg.id}: message already deleted.") - return - - await msg.channel.send(ALERT_MESSAGE_TEMPLATE.format(user=msg.author.mention)) - if webhook_deleted: - delete_state = "The webhook was successfully deleted." - else: - delete_state = "There was an error when deleting the webhook, it might have already been removed." - message = ( - f"{format_user(msg.author)} posted a Discord webhook URL to {msg.channel.mention}. {delete_state} " - f"Webhook URL was `{redacted_url}`" - ) - log.debug(message) - - # Send entry to moderation alerts. - await self.mod_log.send_log_message( - icon_url=Icons.token_removed, - colour=Colour(Colours.soft_red), - title="Discord webhook URL removed!", - text=message, - thumbnail=msg.author.display_avatar.url, - channel_id=Channels.mod_alerts - ) - - self.bot.stats.incr("tokens.removed_webhooks") - - @Cog.listener() - async def on_message(self, msg: Message) -> None: - """Check if a Discord webhook URL is in `message`.""" - # Ignore DMs; can't delete messages in there anyway. - if not msg.guild or msg.author.bot: - return - - matches = WEBHOOK_URL_RE.search(msg.content) - if matches: - async with self.bot.http_session.delete(matches[0]) as resp: - # The Discord API Returns a 204 NO CONTENT response on success. - deleted_successfully = resp.status == 204 - await self.delete_and_respond(msg, matches[1] + "xxx", webhook_deleted=deleted_successfully) - - @Cog.listener() - async def on_message_edit(self, before: Message, after: Message) -> None: - """Check if a Discord webhook URL is in the edited message `after`.""" - await self.on_message(after) - - -async def setup(bot: Bot) -> None: - """Load `WebhookRemover` cog.""" - await bot.add_cog(WebhookRemover(bot)) diff --git a/bot/exts/info/codeblock/_cog.py b/bot/exts/info/codeblock/_cog.py index 9027105d9..cc5862131 100644 --- a/bot/exts/info/codeblock/_cog.py +++ b/bot/exts/info/codeblock/_cog.py @@ -8,8 +8,6 @@ from discord.ext.commands import Cog from bot import constants from bot.bot import Bot -from bot.exts.filters.token_remover import TokenRemover -from bot.exts.filters.webhook_remover import WEBHOOK_URL_RE from bot.exts.info.codeblock._instructions import get_instructions from bot.log import get_logger from bot.utils import has_lines @@ -135,8 +133,6 @@ class CodeBlockCog(Cog, name="Code Block"): not message.author.bot and self.is_valid_channel(message.channel) and has_lines(message.content, constants.CodeBlock.minimum_lines) - and not TokenRemover.find_token_in_message(message) - and not WEBHOOK_URL_RE.search(message.content) ) @Cog.listener() diff --git a/bot/exts/moderation/watchchannels/_watchchannel.py b/bot/exts/moderation/watchchannels/_watchchannel.py index 46f9c296e..30b1f342b 100644 --- a/bot/exts/moderation/watchchannels/_watchchannel.py +++ b/bot/exts/moderation/watchchannels/_watchchannel.py @@ -14,8 +14,6 @@ from discord.ext.commands import Cog, Context from bot.bot import Bot from bot.constants import BigBrother as BigBrotherConfig, Guild as GuildConfig, Icons -from bot.exts.filters.token_remover import TokenRemover -from bot.exts.filters.webhook_remover import WEBHOOK_URL_RE from bot.exts.moderation.modlog import ModLog from bot.log import CustomLogger, get_logger from bot.pagination import LinePaginator @@ -235,9 +233,7 @@ class WatchChannel(metaclass=CogABCMeta): await self.send_header(msg) - if TokenRemover.find_token_in_message(msg) or WEBHOOK_URL_RE.search(msg.content): - cleaned_content = "Content is censored because it contains a bot or webhook token." - elif cleaned_content := msg.clean_content: + if cleaned_content := msg.clean_content: # Put all non-media URLs in a code block to prevent embeds media_urls = {embed.url for embed in msg.embeds if embed.type in ("image", "video")} for url in URL_RE.findall(cleaned_content): diff --git a/bot/rules/__init__.py b/bot/rules/__init__.py deleted file mode 100644 index a01ceae73..000000000 --- a/bot/rules/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ -# flake8: noqa - -from .attachments import apply as apply_attachments -from .burst import apply as apply_burst -from .burst_shared import apply as apply_burst_shared -from .chars import apply as apply_chars -from .discord_emojis import apply as apply_discord_emojis -from .duplicates import apply as apply_duplicates -from .links import apply as apply_links -from .mentions import apply as apply_mentions -from .newlines import apply as apply_newlines -from .role_mentions import apply as apply_role_mentions diff --git a/bot/rules/attachments.py b/bot/rules/attachments.py deleted file mode 100644 index 8903c385c..000000000 --- a/bot/rules/attachments.py +++ /dev/null @@ -1,26 +0,0 @@ -from typing import Dict, Iterable, List, Optional, Tuple - -from discord import Member, Message - - -async def apply( - last_message: Message, recent_messages: List[Message], config: Dict[str, int] -) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]: - """Detects total attachments exceeding the limit sent by a single user.""" - relevant_messages = tuple( - msg - for msg in recent_messages - if ( - msg.author == last_message.author - and len(msg.attachments) > 0 - ) - ) - total_recent_attachments = sum(len(msg.attachments) for msg in relevant_messages) - - if total_recent_attachments > config['max']: - return ( - f"sent {total_recent_attachments} attachments in {config['interval']}s", - (last_message.author,), - relevant_messages - ) - return None diff --git a/bot/rules/burst.py b/bot/rules/burst.py deleted file mode 100644 index 25c5a2f33..000000000 --- a/bot/rules/burst.py +++ /dev/null @@ -1,23 +0,0 @@ -from typing import Dict, Iterable, List, Optional, Tuple - -from discord import Member, Message - - -async def apply( - last_message: Message, recent_messages: List[Message], config: Dict[str, int] -) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]: - """Detects repeated messages sent by a single user.""" - relevant_messages = tuple( - msg - for msg in recent_messages - if msg.author == last_message.author - ) - total_relevant = len(relevant_messages) - - if total_relevant > config['max']: - return ( - f"sent {total_relevant} messages in {config['interval']}s", - (last_message.author,), - relevant_messages - ) - return None diff --git a/bot/rules/burst_shared.py b/bot/rules/burst_shared.py deleted file mode 100644 index bbe9271b3..000000000 --- a/bot/rules/burst_shared.py +++ /dev/null @@ -1,18 +0,0 @@ -from typing import Dict, Iterable, List, Optional, Tuple - -from discord import Member, Message - - -async def apply( - last_message: Message, recent_messages: List[Message], config: Dict[str, int] -) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]: - """Detects repeated messages sent by multiple users.""" - total_recent = len(recent_messages) - - if total_recent > config['max']: - return ( - f"sent {total_recent} messages in {config['interval']}s", - set(msg.author for msg in recent_messages), - recent_messages - ) - return None diff --git a/bot/rules/chars.py b/bot/rules/chars.py deleted file mode 100644 index 1f587422c..000000000 --- a/bot/rules/chars.py +++ /dev/null @@ -1,24 +0,0 @@ -from typing import Dict, Iterable, List, Optional, Tuple - -from discord import Member, Message - - -async def apply( - last_message: Message, recent_messages: List[Message], config: Dict[str, int] -) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]: - """Detects total message char count exceeding the limit sent by a single user.""" - relevant_messages = tuple( - msg - for msg in recent_messages - if msg.author == last_message.author - ) - - total_recent_chars = sum(len(msg.content) for msg in relevant_messages) - - if total_recent_chars > config['max']: - return ( - f"sent {total_recent_chars} characters in {config['interval']}s", - (last_message.author,), - relevant_messages - ) - return None diff --git a/bot/rules/discord_emojis.py b/bot/rules/discord_emojis.py deleted file mode 100644 index d979ac5e7..000000000 --- a/bot/rules/discord_emojis.py +++ /dev/null @@ -1,34 +0,0 @@ -import re -from typing import Dict, Iterable, List, Optional, Tuple - -from discord import Member, Message -from emoji import demojize - -DISCORD_EMOJI_RE = re.compile(r"<:\w+:\d+>|:\w+:") -CODE_BLOCK_RE = re.compile(r"```.*?```", flags=re.DOTALL) - - -async def apply( - last_message: Message, recent_messages: List[Message], config: Dict[str, int] -) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]: - """Detects total Discord emojis exceeding the limit sent by a single user.""" - relevant_messages = tuple( - msg - for msg in recent_messages - if msg.author == last_message.author - ) - - # Get rid of code blocks in the message before searching for emojis. - # Convert Unicode emojis to :emoji: format to get their count. - total_emojis = sum( - len(DISCORD_EMOJI_RE.findall(demojize(CODE_BLOCK_RE.sub("", msg.content)))) - for msg in relevant_messages - ) - - if total_emojis > config['max']: - return ( - f"sent {total_emojis} emojis in {config['interval']}s", - (last_message.author,), - relevant_messages - ) - return None diff --git a/bot/rules/duplicates.py b/bot/rules/duplicates.py deleted file mode 100644 index 8e4fbc12d..000000000 --- a/bot/rules/duplicates.py +++ /dev/null @@ -1,28 +0,0 @@ -from typing import Dict, Iterable, List, Optional, Tuple - -from discord import Member, Message - - -async def apply( - last_message: Message, recent_messages: List[Message], config: Dict[str, int] -) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]: - """Detects duplicated messages sent by a single user.""" - relevant_messages = tuple( - msg - for msg in recent_messages - if ( - msg.author == last_message.author - and msg.content == last_message.content - and msg.content - ) - ) - - total_duplicated = len(relevant_messages) - - if total_duplicated > config['max']: - return ( - f"sent {total_duplicated} duplicated messages in {config['interval']}s", - (last_message.author,), - relevant_messages - ) - return None diff --git a/bot/rules/links.py b/bot/rules/links.py deleted file mode 100644 index c46b783c5..000000000 --- a/bot/rules/links.py +++ /dev/null @@ -1,36 +0,0 @@ -import re -from typing import Dict, Iterable, List, Optional, Tuple - -from discord import Member, Message - -LINK_RE = re.compile(r"(https?://[^\s]+)") - - -async def apply( - last_message: Message, recent_messages: List[Message], config: Dict[str, int] -) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]: - """Detects total links exceeding the limit sent by a single user.""" - relevant_messages = tuple( - msg - for msg in recent_messages - if msg.author == last_message.author - ) - total_links = 0 - messages_with_links = 0 - - for msg in relevant_messages: - total_matches = len(LINK_RE.findall(msg.content)) - if total_matches: - messages_with_links += 1 - total_links += total_matches - - # Only apply the filter if we found more than one message with - # links to prevent wrongfully firing the rule on users posting - # e.g. an installation log of pip packages from GitHub. - if total_links > config['max'] and messages_with_links > 1: - return ( - f"sent {total_links} links in {config['interval']}s", - (last_message.author,), - relevant_messages - ) - return None diff --git a/bot/rules/mentions.py b/bot/rules/mentions.py deleted file mode 100644 index 6f5addad1..000000000 --- a/bot/rules/mentions.py +++ /dev/null @@ -1,28 +0,0 @@ -from typing import Dict, Iterable, List, Optional, Tuple - -from discord import Member, Message - - -async def apply( - last_message: Message, recent_messages: List[Message], config: Dict[str, int] -) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]: - """Detects total mentions exceeding the limit sent by a single user.""" - relevant_messages = tuple( - msg - for msg in recent_messages - if msg.author == last_message.author - ) - - total_recent_mentions = sum( - not user.bot - for msg in relevant_messages - for user in msg.mentions - ) - - if total_recent_mentions > config['max']: - return ( - f"sent {total_recent_mentions} mentions in {config['interval']}s", - (last_message.author,), - relevant_messages - ) - return None diff --git a/bot/rules/newlines.py b/bot/rules/newlines.py deleted file mode 100644 index 4e66e1359..000000000 --- a/bot/rules/newlines.py +++ /dev/null @@ -1,45 +0,0 @@ -import re -from typing import Dict, Iterable, List, Optional, Tuple - -from discord import Member, Message - - -async def apply( - last_message: Message, recent_messages: List[Message], config: Dict[str, int] -) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]: - """Detects total newlines exceeding the set limit sent by a single user.""" - relevant_messages = tuple( - msg - for msg in recent_messages - if msg.author == last_message.author - ) - - # Identify groups of newline characters and get group & total counts - exp = r"(\n+)" - newline_counts = [] - for msg in relevant_messages: - newline_counts += [len(group) for group in re.findall(exp, msg.content)] - total_recent_newlines = sum(newline_counts) - - # Get maximum newline group size - if newline_counts: - max_newline_group = max(newline_counts) - else: - # If no newlines are found, newline_counts will be an empty list, which will error out max() - max_newline_group = 0 - - # Check first for total newlines, if this passes then check for large groupings - if total_recent_newlines > config['max']: - return ( - f"sent {total_recent_newlines} newlines in {config['interval']}s", - (last_message.author,), - relevant_messages - ) - elif max_newline_group > config['max_consecutive']: - return ( - f"sent {max_newline_group} consecutive newlines in {config['interval']}s", - (last_message.author,), - relevant_messages - ) - - return None diff --git a/bot/rules/role_mentions.py b/bot/rules/role_mentions.py deleted file mode 100644 index 0649540b6..000000000 --- a/bot/rules/role_mentions.py +++ /dev/null @@ -1,24 +0,0 @@ -from typing import Dict, Iterable, List, Optional, Tuple - -from discord import Member, Message - - -async def apply( - last_message: Message, recent_messages: List[Message], config: Dict[str, int] -) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]: - """Detects total role mentions exceeding the limit sent by a single user.""" - relevant_messages = tuple( - msg - for msg in recent_messages - if msg.author == last_message.author - ) - - total_recent_mentions = sum(len(msg.role_mentions) for msg in relevant_messages) - - if total_recent_mentions > config['max']: - return ( - f"sent {total_recent_mentions} role mentions in {config['interval']}s", - (last_message.author,), - relevant_messages - ) - return None diff --git a/tests/bot/exts/filters/__init__.py b/tests/bot/exts/filters/__init__.py deleted file mode 100644 index e69de29bb..000000000 --- a/tests/bot/exts/filters/__init__.py +++ /dev/null diff --git a/tests/bot/exts/filters/test_antimalware.py b/tests/bot/exts/filters/test_antimalware.py deleted file mode 100644 index 7282334e2..000000000 --- a/tests/bot/exts/filters/test_antimalware.py +++ /dev/null @@ -1,202 +0,0 @@ -import unittest -from unittest.mock import AsyncMock, Mock - -from discord import NotFound - -from bot.constants import Channels, STAFF_ROLES -from bot.exts.filters import antimalware -from tests.helpers import MockAttachment, MockBot, MockMessage, MockRole - - -class AntiMalwareCogTests(unittest.IsolatedAsyncioTestCase): - """Test the AntiMalware cog.""" - - def setUp(self): - """Sets up fresh objects for each test.""" - self.bot = MockBot() - self.bot.filter_list_cache = { - "FILE_FORMAT.True": { - ".first": {}, - ".second": {}, - ".third": {}, - } - } - self.cog = antimalware.AntiMalware(self.bot) - self.message = MockMessage() - self.message.webhook_id = None - self.message.author.bot = None - self.whitelist = [".first", ".second", ".third"] - - async def test_message_with_allowed_attachment(self): - """Messages with allowed extensions should not be deleted""" - attachment = MockAttachment(filename="python.first") - self.message.attachments = [attachment] - - await self.cog.on_message(self.message) - self.message.delete.assert_not_called() - - async def test_message_without_attachment(self): - """Messages without attachments should result in no action.""" - await self.cog.on_message(self.message) - self.message.delete.assert_not_called() - - async def test_direct_message_with_attachment(self): - """Direct messages should have no action taken.""" - attachment = MockAttachment(filename="python.disallowed") - self.message.attachments = [attachment] - self.message.guild = None - - await self.cog.on_message(self.message) - - self.message.delete.assert_not_called() - - async def test_webhook_message_with_illegal_extension(self): - """A webhook message containing an illegal extension should be ignored.""" - attachment = MockAttachment(filename="python.disallowed") - self.message.webhook_id = 697140105563078727 - self.message.attachments = [attachment] - - await self.cog.on_message(self.message) - - self.message.delete.assert_not_called() - - async def test_bot_message_with_illegal_extension(self): - """A bot message containing an illegal extension should be ignored.""" - attachment = MockAttachment(filename="python.disallowed") - self.message.author.bot = 409107086526644234 - self.message.attachments = [attachment] - - await self.cog.on_message(self.message) - - self.message.delete.assert_not_called() - - async def test_message_with_illegal_extension_gets_deleted(self): - """A message containing an illegal extension should send an embed.""" - attachment = MockAttachment(filename="python.disallowed") - self.message.attachments = [attachment] - - await self.cog.on_message(self.message) - - self.message.delete.assert_called_once() - - async def test_message_send_by_staff(self): - """A message send by a member of staff should be ignored.""" - staff_role = MockRole(id=STAFF_ROLES[0]) - self.message.author.roles.append(staff_role) - attachment = MockAttachment(filename="python.disallowed") - self.message.attachments = [attachment] - - await self.cog.on_message(self.message) - - self.message.delete.assert_not_called() - - async def test_python_file_redirect_embed_description(self): - """A message containing a .py file should result in an embed redirecting the user to our paste site""" - attachment = MockAttachment(filename="python.py") - self.message.attachments = [attachment] - self.message.channel.send = AsyncMock() - - await self.cog.on_message(self.message) - self.message.channel.send.assert_called_once() - args, kwargs = self.message.channel.send.call_args - embed = kwargs.pop("embed") - - self.assertEqual(embed.description, antimalware.PY_EMBED_DESCRIPTION) - - async def test_txt_file_redirect_embed_description(self): - """A message containing a .txt/.json/.csv file should result in the correct embed.""" - test_values = ( - ("text", ".txt"), - ("json", ".json"), - ("csv", ".csv"), - ) - - for file_name, disallowed_extension in test_values: - with self.subTest(file_name=file_name, disallowed_extension=disallowed_extension): - - attachment = MockAttachment(filename=f"{file_name}{disallowed_extension}") - self.message.attachments = [attachment] - self.message.channel.send = AsyncMock() - antimalware.TXT_EMBED_DESCRIPTION = Mock() - antimalware.TXT_EMBED_DESCRIPTION.format.return_value = "test" - - await self.cog.on_message(self.message) - self.message.channel.send.assert_called_once() - args, kwargs = self.message.channel.send.call_args - embed = kwargs.pop("embed") - cmd_channel = self.bot.get_channel(Channels.bot_commands) - - self.assertEqual( - embed.description, - antimalware.TXT_EMBED_DESCRIPTION.format.return_value - ) - antimalware.TXT_EMBED_DESCRIPTION.format.assert_called_with( - blocked_extension=disallowed_extension, - cmd_channel_mention=cmd_channel.mention - ) - - async def test_other_disallowed_extension_embed_description(self): - """Test the description for a non .py/.txt/.json/.csv disallowed extension.""" - attachment = MockAttachment(filename="python.disallowed") - self.message.attachments = [attachment] - self.message.channel.send = AsyncMock() - antimalware.DISALLOWED_EMBED_DESCRIPTION = Mock() - antimalware.DISALLOWED_EMBED_DESCRIPTION.format.return_value = "test" - - await self.cog.on_message(self.message) - self.message.channel.send.assert_called_once() - args, kwargs = self.message.channel.send.call_args - embed = kwargs.pop("embed") - meta_channel = self.bot.get_channel(Channels.meta) - - self.assertEqual(embed.description, antimalware.DISALLOWED_EMBED_DESCRIPTION.format.return_value) - antimalware.DISALLOWED_EMBED_DESCRIPTION.format.assert_called_with( - joined_whitelist=", ".join(self.whitelist), - blocked_extensions_str=".disallowed", - meta_channel_mention=meta_channel.mention - ) - - async def test_removing_deleted_message_logs(self): - """Removing an already deleted message logs the correct message""" - attachment = MockAttachment(filename="python.disallowed") - self.message.attachments = [attachment] - self.message.delete = AsyncMock(side_effect=NotFound(response=Mock(status=""), message="")) - - with self.assertLogs(logger=antimalware.log, level="INFO"): - await self.cog.on_message(self.message) - self.message.delete.assert_called_once() - - async def test_message_with_illegal_attachment_logs(self): - """Deleting a message with an illegal attachment should result in a log.""" - attachment = MockAttachment(filename="python.disallowed") - self.message.attachments = [attachment] - - with self.assertLogs(logger=antimalware.log, level="INFO"): - await self.cog.on_message(self.message) - - async def test_get_disallowed_extensions(self): - """The return value should include all non-whitelisted extensions.""" - test_values = ( - ([], []), - (self.whitelist, []), - ([".first"], []), - ([".first", ".disallowed"], [".disallowed"]), - ([".disallowed"], [".disallowed"]), - ([".disallowed", ".illegal"], [".disallowed", ".illegal"]), - ) - - for extensions, expected_disallowed_extensions in test_values: - with self.subTest(extensions=extensions, expected_disallowed_extensions=expected_disallowed_extensions): - self.message.attachments = [MockAttachment(filename=f"filename{extension}") for extension in extensions] - disallowed_extensions = self.cog._get_disallowed_extensions(self.message) - self.assertCountEqual(disallowed_extensions, expected_disallowed_extensions) - - -class AntiMalwareSetupTests(unittest.IsolatedAsyncioTestCase): - """Tests setup of the `AntiMalware` cog.""" - - async def test_setup(self): - """Setup of the extension should call add_cog.""" - bot = MockBot() - await antimalware.setup(bot) - bot.add_cog.assert_awaited_once() diff --git a/tests/bot/exts/filters/test_antispam.py b/tests/bot/exts/filters/test_antispam.py deleted file mode 100644 index 6a0e4fded..000000000 --- a/tests/bot/exts/filters/test_antispam.py +++ /dev/null @@ -1,35 +0,0 @@ -import unittest - -from bot.exts.filters import antispam - - -class AntispamConfigurationValidationTests(unittest.TestCase): - """Tests validation of the antispam cog configuration.""" - - def test_default_antispam_config_is_valid(self): - """The default antispam configuration is valid.""" - validation_errors = antispam.validate_config() - self.assertEqual(validation_errors, {}) - - def test_unknown_rule_returns_error(self): - """Configuring an unknown rule returns an error.""" - self.assertEqual( - antispam.validate_config({'invalid-rule': {}}), - {'invalid-rule': "`invalid-rule` is not recognized as an antispam rule."} - ) - - def test_missing_keys_returns_error(self): - """Not configuring required keys returns an error.""" - keys = (('interval', 'max'), ('max', 'interval')) - for configured_key, unconfigured_key in keys: - with self.subTest( - configured_key=configured_key, - unconfigured_key=unconfigured_key - ): - config = {'burst': {configured_key: 10}} - error = f"Key `{unconfigured_key}` is required but not set for rule `burst`" - - self.assertEqual( - antispam.validate_config(config), - {'burst': error} - ) diff --git a/tests/bot/exts/filters/test_filtering.py b/tests/bot/exts/filters/test_filtering.py deleted file mode 100644 index bd26532f1..000000000 --- a/tests/bot/exts/filters/test_filtering.py +++ /dev/null @@ -1,40 +0,0 @@ -import unittest -from unittest.mock import patch - -from bot.exts.filters import filtering -from tests.helpers import MockBot, autospec - - -class FilteringCogTests(unittest.IsolatedAsyncioTestCase): - """Tests the `Filtering` cog.""" - - def setUp(self): - """Instantiate the bot and cog.""" - self.bot = MockBot() - with patch("botcore.utils.scheduling.create_task", new=lambda task, **_: task.close()): - self.cog = filtering.Filtering(self.bot) - - @autospec(filtering.Filtering, "_get_filterlist_items", pass_mocks=False, return_value=["TOKEN"]) - async def test_token_filter(self): - """Ensure that a filter token is correctly detected in a message.""" - messages = { - "": False, - "no matches": False, - "TOKEN": True, - - # See advisory https://github.com/python-discord/bot/security/advisories/GHSA-j8c3-8x46-8pp6 - "https://google.com TOKEN": True, - "https://google.com something else": False, - } - - for message, match in messages.items(): - with self.subTest(input=message, match=match): - result, _ = await self.cog._has_watch_regex_match(message) - - self.assertEqual( - match, - bool(result), - msg=f"Hit was {'expected' if match else 'not expected'} for this input." - ) - if result: - self.assertEqual("TOKEN", result.group()) diff --git a/tests/bot/exts/filters/test_security.py b/tests/bot/exts/filters/test_security.py deleted file mode 100644 index 007b7b1eb..000000000 --- a/tests/bot/exts/filters/test_security.py +++ /dev/null @@ -1,53 +0,0 @@ -import unittest - -from discord.ext.commands import NoPrivateMessage - -from bot.exts.filters import security -from tests.helpers import MockBot, MockContext - - -class SecurityCogTests(unittest.TestCase): - """Tests the `Security` cog.""" - - def setUp(self): - """Attach an instance of the cog to the class for tests.""" - self.bot = MockBot() - self.cog = security.Security(self.bot) - self.ctx = MockContext() - - def test_check_additions(self): - """The cog should add its checks after initialization.""" - self.bot.check.assert_any_call(self.cog.check_on_guild) - self.bot.check.assert_any_call(self.cog.check_not_bot) - - def test_check_not_bot_returns_false_for_humans(self): - """The bot check should return `True` when invoked with human authors.""" - self.ctx.author.bot = False - self.assertTrue(self.cog.check_not_bot(self.ctx)) - - def test_check_not_bot_returns_true_for_robots(self): - """The bot check should return `False` when invoked with robotic authors.""" - self.ctx.author.bot = True - self.assertFalse(self.cog.check_not_bot(self.ctx)) - - def test_check_on_guild_raises_when_outside_of_guild(self): - """When invoked outside of a guild, `check_on_guild` should cause an error.""" - self.ctx.guild = None - - with self.assertRaises(NoPrivateMessage, msg="This command cannot be used in private messages."): - self.cog.check_on_guild(self.ctx) - - def test_check_on_guild_returns_true_inside_of_guild(self): - """When invoked inside of a guild, `check_on_guild` should return `True`.""" - self.ctx.guild = "lemon's lemonade stand" - self.assertTrue(self.cog.check_on_guild(self.ctx)) - - -class SecurityCogLoadTests(unittest.IsolatedAsyncioTestCase): - """Tests loading the `Security` cog.""" - - async def test_security_cog_load(self): - """Setup of the extension should call add_cog.""" - bot = MockBot() - await security.setup(bot) - bot.add_cog.assert_awaited_once() diff --git a/tests/bot/exts/filters/test_token_remover.py b/tests/bot/exts/filters/test_token_remover.py deleted file mode 100644 index c1f3762ac..000000000 --- a/tests/bot/exts/filters/test_token_remover.py +++ /dev/null @@ -1,409 +0,0 @@ -import unittest -from re import Match -from unittest import mock -from unittest.mock import MagicMock - -from discord import Colour, NotFound - -from bot import constants -from bot.exts.filters import token_remover -from bot.exts.filters.token_remover import Token, TokenRemover -from bot.exts.moderation.modlog import ModLog -from bot.utils.messages import format_user -from tests.helpers import MockBot, MockMessage, autospec - - -class TokenRemoverTests(unittest.IsolatedAsyncioTestCase): - """Tests the `TokenRemover` cog.""" - - def setUp(self): - """Adds the cog, a bot, and a message to the instance for usage in tests.""" - self.bot = MockBot() - self.cog = TokenRemover(bot=self.bot) - - self.msg = MockMessage(id=555, content="hello world") - self.msg.channel.mention = "#lemonade-stand" - self.msg.guild.get_member.return_value.bot = False - self.msg.guild.get_member.return_value.__str__.return_value = "Woody" - self.msg.author.__str__ = MagicMock(return_value=self.msg.author.name) - self.msg.author.display_avatar.url = "picture-lemon.png" - - def test_extract_user_id_valid(self): - """Should consider user IDs valid if they decode into an integer ID.""" - id_pairs = ( - ("NDcyMjY1OTQzMDYyNDEzMzMy", 472265943062413332), - ("NDc1MDczNjI5Mzk5NTQ3OTA0", 475073629399547904), - ("NDY3MjIzMjMwNjUwNzc3NjQx", 467223230650777641), - ) - - for token_id, user_id in id_pairs: - with self.subTest(token_id=token_id): - result = TokenRemover.extract_user_id(token_id) - self.assertEqual(result, user_id) - - def test_extract_user_id_invalid(self): - """Should consider non-digit and non-ASCII IDs invalid.""" - ids = ( - ("SGVsbG8gd29ybGQ", "non-digit ASCII"), - ("0J_RgNC40LLQtdGCINC80LjRgA", "cyrillic text"), - ("4pO14p6L4p6C4pG34p264pGl8J-EiOKSj-KCieKBsA", "Unicode digits"), - ("4oaA4oaB4oWh4oWi4Lyz4Lyq4Lyr4LG9", "Unicode numerals"), - ("8J2fjvCdn5nwnZ-k8J2fr_Cdn7rgravvvJngr6c", "Unicode decimals"), - ("{hello}[world]&(bye!)", "ASCII invalid Base64"), - ("Þíß-ï§-ňøẗ-våłìÐ", "Unicode invalid Base64"), - ) - - for user_id, msg in ids: - with self.subTest(msg=msg): - result = TokenRemover.extract_user_id(user_id) - self.assertIsNone(result) - - def test_is_valid_timestamp_valid(self): - """Should consider timestamps valid if they're greater than the Discord epoch.""" - timestamps = ( - "XsyRkw", - "Xrim9Q", - "XsyR-w", - "XsySD_", - "Dn9r_A", - ) - - for timestamp in timestamps: - with self.subTest(timestamp=timestamp): - result = TokenRemover.is_valid_timestamp(timestamp) - self.assertTrue(result) - - def test_is_valid_timestamp_invalid(self): - """Should consider timestamps invalid if they're before Discord epoch or can't be parsed.""" - timestamps = ( - ("B4Yffw", "DISCORD_EPOCH - TOKEN_EPOCH - 1"), - ("ew", "123"), - ("AoIKgA", "42076800"), - ("{hello}[world]&(bye!)", "ASCII invalid Base64"), - ("Þíß-ï§-ňøẗ-våłìÐ", "Unicode invalid Base64"), - ) - - for timestamp, msg in timestamps: - with self.subTest(msg=msg): - result = TokenRemover.is_valid_timestamp(timestamp) - self.assertFalse(result) - - def test_is_valid_hmac_valid(self): - """Should consider an HMAC valid if it has at least 3 unique characters.""" - valid_hmacs = ( - "VXmErH7j511turNpfURmb0rVNm8", - "Ysnu2wacjaKs7qnoo46S8Dm2us8", - "sJf6omBPORBPju3WJEIAcwW9Zds", - "s45jqDV_Iisn-symw0yDRrk_jf4", - ) - - for hmac in valid_hmacs: - with self.subTest(msg=hmac): - result = TokenRemover.is_maybe_valid_hmac(hmac) - self.assertTrue(result) - - def test_is_invalid_hmac_invalid(self): - """Should consider an HMAC invalid if has fewer than 3 unique characters.""" - invalid_hmacs = ( - ("xxxxxxxxxxxxxxxxxx", "Single character"), - ("XxXxXxXxXxXxXxXxXx", "Single character alternating case"), - ("ASFasfASFasfASFASsf", "Three characters alternating-case"), - ("asdasdasdasdasdasdasd", "Three characters one case"), - ) - - for hmac, msg in invalid_hmacs: - with self.subTest(msg=msg): - result = TokenRemover.is_maybe_valid_hmac(hmac) - self.assertFalse(result) - - def test_mod_log_property(self): - """The `mod_log` property should ask the bot to return the `ModLog` cog.""" - self.bot.get_cog.return_value = 'lemon' - self.assertEqual(self.cog.mod_log, self.bot.get_cog.return_value) - self.bot.get_cog.assert_called_once_with('ModLog') - - async def test_on_message_edit_uses_on_message(self): - """The edit listener should delegate handling of the message to the normal listener.""" - self.cog.on_message = mock.create_autospec(self.cog.on_message, spec_set=True) - - await self.cog.on_message_edit(MockMessage(), self.msg) - self.cog.on_message.assert_awaited_once_with(self.msg) - - @autospec(TokenRemover, "find_token_in_message", "take_action") - async def test_on_message_takes_action(self, find_token_in_message, take_action): - """Should take action if a valid token is found when a message is sent.""" - cog = TokenRemover(self.bot) - found_token = "foobar" - find_token_in_message.return_value = found_token - - await cog.on_message(self.msg) - - find_token_in_message.assert_called_once_with(self.msg) - take_action.assert_awaited_once_with(cog, self.msg, found_token) - - @autospec(TokenRemover, "find_token_in_message", "take_action") - async def test_on_message_skips_missing_token(self, find_token_in_message, take_action): - """Shouldn't take action if a valid token isn't found when a message is sent.""" - cog = TokenRemover(self.bot) - find_token_in_message.return_value = False - - await cog.on_message(self.msg) - - find_token_in_message.assert_called_once_with(self.msg) - take_action.assert_not_awaited() - - @autospec(TokenRemover, "find_token_in_message") - async def test_on_message_ignores_dms_bots(self, find_token_in_message): - """Shouldn't parse a message if it is a DM or authored by a bot.""" - cog = TokenRemover(self.bot) - dm_msg = MockMessage(guild=None) - bot_msg = MockMessage(author=MagicMock(bot=True)) - - for msg in (dm_msg, bot_msg): - await cog.on_message(msg) - find_token_in_message.assert_not_called() - - @autospec("bot.exts.filters.token_remover", "TOKEN_RE") - def test_find_token_no_matches(self, token_re): - """None should be returned if the regex matches no tokens in a message.""" - token_re.finditer.return_value = () - - return_value = TokenRemover.find_token_in_message(self.msg) - - self.assertIsNone(return_value) - token_re.finditer.assert_called_once_with(self.msg.content) - - @autospec(TokenRemover, "extract_user_id", "is_valid_timestamp", "is_maybe_valid_hmac") - @autospec("bot.exts.filters.token_remover", "Token") - @autospec("bot.exts.filters.token_remover", "TOKEN_RE") - def test_find_token_valid_match( - self, - token_re, - token_cls, - extract_user_id, - is_valid_timestamp, - is_maybe_valid_hmac, - ): - """The first match with a valid user ID, timestamp, and HMAC should be returned as a `Token`.""" - matches = [ - mock.create_autospec(Match, spec_set=True, instance=True), - mock.create_autospec(Match, spec_set=True, instance=True), - ] - tokens = [ - mock.create_autospec(Token, spec_set=True, instance=True), - mock.create_autospec(Token, spec_set=True, instance=True), - ] - - token_re.finditer.return_value = matches - token_cls.side_effect = tokens - extract_user_id.side_effect = (None, True) # The 1st match will be invalid, 2nd one valid. - is_valid_timestamp.return_value = True - is_maybe_valid_hmac.return_value = True - - return_value = TokenRemover.find_token_in_message(self.msg) - - self.assertEqual(tokens[1], return_value) - token_re.finditer.assert_called_once_with(self.msg.content) - - @autospec(TokenRemover, "extract_user_id", "is_valid_timestamp", "is_maybe_valid_hmac") - @autospec("bot.exts.filters.token_remover", "Token") - @autospec("bot.exts.filters.token_remover", "TOKEN_RE") - def test_find_token_invalid_matches( - self, - token_re, - token_cls, - extract_user_id, - is_valid_timestamp, - is_maybe_valid_hmac, - ): - """None should be returned if no matches have valid user IDs, HMACs, and timestamps.""" - token_re.finditer.return_value = [mock.create_autospec(Match, spec_set=True, instance=True)] - token_cls.return_value = mock.create_autospec(Token, spec_set=True, instance=True) - extract_user_id.return_value = None - is_valid_timestamp.return_value = False - is_maybe_valid_hmac.return_value = False - - return_value = TokenRemover.find_token_in_message(self.msg) - - self.assertIsNone(return_value) - token_re.finditer.assert_called_once_with(self.msg.content) - - def test_regex_invalid_tokens(self): - """Messages without anything looking like a token are not matched.""" - tokens = ( - "", - "lemon wins", - "..", - "x.y", - "x.y.", - ".y.z", - ".y.", - "..z", - "x..z", - " . . ", - "\n.\n.\n", - "hellö.world.bye", - "base64.nötbåse64.morebase64", - "19jd3J.dfkm3d.€víł§tüff", - ) - - for token in tokens: - with self.subTest(token=token): - results = token_remover.TOKEN_RE.findall(token) - self.assertEqual(len(results), 0) - - def test_regex_valid_tokens(self): - """Messages that look like tokens should be matched.""" - # Don't worry, these tokens have been invalidated. - tokens = ( - "NDcyMjY1OTQzMDYy_DEzMz-y.XsyRkw.VXmErH7j511turNpfURmb0rVNm8", - "NDcyMjY1OTQzMDYyNDEzMzMy.Xrim9Q.Ysnu2wacjaKs7qnoo46S8Dm2us8", - "NDc1MDczNjI5Mzk5NTQ3OTA0.XsyR-w.sJf6omBPORBPju3WJEIAcwW9Zds", - "NDY3MjIzMjMwNjUwNzc3NjQx.XsySD_.s45jqDV_Iisn-symw0yDRrk_jf4", - ) - - for token in tokens: - with self.subTest(token=token): - results = token_remover.TOKEN_RE.fullmatch(token) - self.assertIsNotNone(results, f"{token} was not matched by the regex") - - def test_regex_matches_multiple_valid(self): - """Should support multiple matches in the middle of a string.""" - token_1 = "NDY3MjIzMjMwNjUwNzc3NjQx.XsyWGg.uFNEQPCc4ePwGh7egG8UicQssz8" - token_2 = "NDcyMjY1OTQzMDYyNDEzMzMy.XsyWMw.l8XPnDqb0lp-EiQ2g_0xVFT1pyc" - message = f"garbage {token_1} hello {token_2} world" - - results = token_remover.TOKEN_RE.finditer(message) - results = [match[0] for match in results] - self.assertCountEqual((token_1, token_2), results) - - @autospec("bot.exts.filters.token_remover", "LOG_MESSAGE") - def test_format_log_message(self, log_message): - """Should correctly format the log message with info from the message and token.""" - token = Token("NDcyMjY1OTQzMDYyNDEzMzMy", "XsySD_", "s45jqDV_Iisn-symw0yDRrk_jf4") - log_message.format.return_value = "Howdy" - - return_value = TokenRemover.format_log_message(self.msg, token) - - self.assertEqual(return_value, log_message.format.return_value) - log_message.format.assert_called_once_with( - author=format_user(self.msg.author), - channel=self.msg.channel.mention, - user_id=token.user_id, - timestamp=token.timestamp, - hmac="xxxxxxxxxxxxxxxxxxxxxxxxjf4", - ) - - @autospec("bot.exts.filters.token_remover", "UNKNOWN_USER_LOG_MESSAGE") - async def test_format_userid_log_message_unknown(self, unknown_user_log_message,): - """Should correctly format the user ID portion when the actual user it belongs to is unknown.""" - token = Token("NDcyMjY1OTQzMDYyNDEzMzMy", "XsySD_", "s45jqDV_Iisn-symw0yDRrk_jf4") - unknown_user_log_message.format.return_value = " Partner" - msg = MockMessage(id=555, content="hello world") - msg.guild.get_member.return_value = None - msg.guild.fetch_member.side_effect = NotFound(mock.Mock(status=404), "Not found") - - return_value = await TokenRemover.format_userid_log_message(msg, token) - - self.assertEqual(return_value, (unknown_user_log_message.format.return_value, False)) - unknown_user_log_message.format.assert_called_once_with(user_id=472265943062413332) - - @autospec("bot.exts.filters.token_remover", "KNOWN_USER_LOG_MESSAGE") - async def test_format_userid_log_message_bot(self, known_user_log_message): - """Should correctly format the user ID portion when the ID belongs to a known bot.""" - token = Token("NDcyMjY1OTQzMDYyNDEzMzMy", "XsySD_", "s45jqDV_Iisn-symw0yDRrk_jf4") - known_user_log_message.format.return_value = " Partner" - msg = MockMessage(id=555, content="hello world") - msg.guild.get_member.return_value.__str__.return_value = "Sam" - msg.guild.get_member.return_value.bot = True - - return_value = await TokenRemover.format_userid_log_message(msg, token) - - self.assertEqual(return_value, (known_user_log_message.format.return_value, True)) - - known_user_log_message.format.assert_called_once_with( - user_id=472265943062413332, - user_name="Sam", - kind="BOT", - ) - - @autospec("bot.exts.filters.token_remover", "KNOWN_USER_LOG_MESSAGE") - async def test_format_log_message_user_token_user(self, user_token_message): - """Should correctly format the user ID portion when the ID belongs to a known user.""" - token = Token("NDY3MjIzMjMwNjUwNzc3NjQx", "XsySD_", "s45jqDV_Iisn-symw0yDRrk_jf4") - user_token_message.format.return_value = "Partner" - - return_value = await TokenRemover.format_userid_log_message(self.msg, token) - - self.assertEqual(return_value, (user_token_message.format.return_value, True)) - user_token_message.format.assert_called_once_with( - user_id=467223230650777641, - user_name="Woody", - kind="USER", - ) - - @mock.patch.object(TokenRemover, "mod_log", new_callable=mock.PropertyMock) - @autospec("bot.exts.filters.token_remover", "log") - @autospec(TokenRemover, "format_log_message", "format_userid_log_message") - async def test_take_action(self, format_log_message, format_userid_log_message, logger, mod_log_property): - """Should delete the message and send a mod log.""" - cog = TokenRemover(self.bot) - mod_log = mock.create_autospec(ModLog, spec_set=True, instance=True) - token = mock.create_autospec(Token, spec_set=True, instance=True) - token.user_id = "no-id" - log_msg = "testing123" - userid_log_message = "userid-log-message" - - mod_log_property.return_value = mod_log - format_log_message.return_value = log_msg - format_userid_log_message.return_value = (userid_log_message, True) - - await cog.take_action(self.msg, token) - - self.msg.delete.assert_called_once_with() - self.msg.channel.send.assert_called_once_with( - token_remover.DELETION_MESSAGE_TEMPLATE.format(mention=self.msg.author.mention) - ) - - format_log_message.assert_called_once_with(self.msg, token) - format_userid_log_message.assert_called_once_with(self.msg, token) - logger.debug.assert_called_with(log_msg) - self.bot.stats.incr.assert_called_once_with("tokens.removed_tokens") - - mod_log.ignore.assert_called_once_with(constants.Event.message_delete, self.msg.id) - mod_log.send_log_message.assert_called_once_with( - icon_url=constants.Icons.token_removed, - colour=Colour(constants.Colours.soft_red), - title="Token removed!", - text=log_msg + "\n" + userid_log_message, - thumbnail=self.msg.author.display_avatar.url, - channel_id=constants.Channels.mod_alerts, - ping_everyone=True, - ) - - @mock.patch.object(TokenRemover, "mod_log", new_callable=mock.PropertyMock) - async def test_take_action_delete_failure(self, mod_log_property): - """Shouldn't send any messages if the token message can't be deleted.""" - cog = TokenRemover(self.bot) - mod_log_property.return_value = mock.create_autospec(ModLog, spec_set=True, instance=True) - self.msg.delete.side_effect = NotFound(MagicMock(), MagicMock()) - - token = mock.create_autospec(Token, spec_set=True, instance=True) - await cog.take_action(self.msg, token) - - self.msg.delete.assert_called_once_with() - self.msg.channel.send.assert_not_awaited() - - -class TokenRemoverExtensionTests(unittest.IsolatedAsyncioTestCase): - """Tests for the token_remover extension.""" - - @autospec("bot.exts.filters.token_remover", "TokenRemover") - async def test_extension_setup(self, cog): - """The TokenRemover cog should be added.""" - bot = MockBot() - await token_remover.setup(bot) - - cog.assert_called_once_with(bot) - bot.add_cog.assert_awaited_once() - self.assertTrue(isinstance(bot.add_cog.call_args.args[0], TokenRemover)) diff --git a/tests/bot/rules/__init__.py b/tests/bot/rules/__init__.py deleted file mode 100644 index 0d570f5a3..000000000 --- a/tests/bot/rules/__init__.py +++ /dev/null @@ -1,76 +0,0 @@ -import unittest -from abc import ABCMeta, abstractmethod -from typing import Callable, Dict, Iterable, List, NamedTuple, Tuple - -from tests.helpers import MockMessage - - -class DisallowedCase(NamedTuple): - """Encapsulation for test cases expected to fail.""" - recent_messages: List[MockMessage] - culprits: Iterable[str] - n_violations: int - - -class RuleTest(unittest.IsolatedAsyncioTestCase, metaclass=ABCMeta): - """ - Abstract class for antispam rule test cases. - - Tests for specific rules should inherit from `RuleTest` and implement - `relevant_messages` and `get_report`. Each instance should also set the - `apply` and `config` attributes as necessary. - - The execution of test cases can then be delegated to the `run_allowed` - and `run_disallowed` methods. - """ - - apply: Callable # The tested rule's apply function - config: Dict[str, int] - - async def run_allowed(self, cases: Tuple[List[MockMessage], ...]) -> None: - """Run all `cases` against `self.apply` expecting them to pass.""" - for recent_messages in cases: - last_message = recent_messages[0] - - with self.subTest( - last_message=last_message, - recent_messages=recent_messages, - config=self.config, - ): - self.assertIsNone( - await self.apply(last_message, recent_messages, self.config) - ) - - async def run_disallowed(self, cases: Tuple[DisallowedCase, ...]) -> None: - """Run all `cases` against `self.apply` expecting them to fail.""" - for case in cases: - recent_messages, culprits, n_violations = case - last_message = recent_messages[0] - relevant_messages = self.relevant_messages(case) - desired_output = ( - self.get_report(case), - culprits, - relevant_messages, - ) - - with self.subTest( - last_message=last_message, - recent_messages=recent_messages, - relevant_messages=relevant_messages, - n_violations=n_violations, - config=self.config, - ): - self.assertTupleEqual( - await self.apply(last_message, recent_messages, self.config), - desired_output, - ) - - @abstractmethod - def relevant_messages(self, case: DisallowedCase) -> Iterable[MockMessage]: - """Give expected relevant messages for `case`.""" - raise NotImplementedError # pragma: no cover - - @abstractmethod - def get_report(self, case: DisallowedCase) -> str: - """Give expected error report for `case`.""" - raise NotImplementedError # pragma: no cover diff --git a/tests/bot/rules/test_attachments.py b/tests/bot/rules/test_attachments.py deleted file mode 100644 index d7e779221..000000000 --- a/tests/bot/rules/test_attachments.py +++ /dev/null @@ -1,69 +0,0 @@ -from typing import Iterable - -from bot.rules import attachments -from tests.bot.rules import DisallowedCase, RuleTest -from tests.helpers import MockMessage - - -def make_msg(author: str, total_attachments: int) -> MockMessage: - """Builds a message with `total_attachments` attachments.""" - return MockMessage(author=author, attachments=list(range(total_attachments))) - - -class AttachmentRuleTests(RuleTest): - """Tests applying the `attachments` antispam rule.""" - - def setUp(self): - self.apply = attachments.apply - self.config = {"max": 5, "interval": 10} - - async def test_allows_messages_without_too_many_attachments(self): - """Messages without too many attachments are allowed as-is.""" - cases = ( - [make_msg("bob", 0), make_msg("bob", 0), make_msg("bob", 0)], - [make_msg("bob", 2), make_msg("bob", 2)], - [make_msg("bob", 2), make_msg("alice", 2), make_msg("bob", 2)], - ) - - await self.run_allowed(cases) - - async def test_disallows_messages_with_too_many_attachments(self): - """Messages with too many attachments trigger the rule.""" - cases = ( - DisallowedCase( - [make_msg("bob", 4), make_msg("bob", 0), make_msg("bob", 6)], - ("bob",), - 10, - ), - DisallowedCase( - [make_msg("bob", 4), make_msg("alice", 6), make_msg("bob", 2)], - ("bob",), - 6, - ), - DisallowedCase( - [make_msg("alice", 6)], - ("alice",), - 6, - ), - DisallowedCase( - [make_msg("alice", 1) for _ in range(6)], - ("alice",), - 6, - ), - ) - - await self.run_disallowed(cases) - - def relevant_messages(self, case: DisallowedCase) -> Iterable[MockMessage]: - last_message = case.recent_messages[0] - return tuple( - msg - for msg in case.recent_messages - if ( - msg.author == last_message.author - and len(msg.attachments) > 0 - ) - ) - - def get_report(self, case: DisallowedCase) -> str: - return f"sent {case.n_violations} attachments in {self.config['interval']}s" diff --git a/tests/bot/rules/test_burst.py b/tests/bot/rules/test_burst.py deleted file mode 100644 index 03682966b..000000000 --- a/tests/bot/rules/test_burst.py +++ /dev/null @@ -1,54 +0,0 @@ -from typing import Iterable - -from bot.rules import burst -from tests.bot.rules import DisallowedCase, RuleTest -from tests.helpers import MockMessage - - -def make_msg(author: str) -> MockMessage: - """ - Init a MockMessage instance with author set to `author`. - - This serves as a shorthand / alias to keep the test cases visually clean. - """ - return MockMessage(author=author) - - -class BurstRuleTests(RuleTest): - """Tests the `burst` antispam rule.""" - - def setUp(self): - self.apply = burst.apply - self.config = {"max": 2, "interval": 10} - - async def test_allows_messages_within_limit(self): - """Cases which do not violate the rule.""" - cases = ( - [make_msg("bob"), make_msg("bob")], - [make_msg("bob"), make_msg("alice"), make_msg("bob")], - ) - - await self.run_allowed(cases) - - async def test_disallows_messages_beyond_limit(self): - """Cases where the amount of messages exceeds the limit, triggering the rule.""" - cases = ( - DisallowedCase( - [make_msg("bob"), make_msg("bob"), make_msg("bob")], - ("bob",), - 3, - ), - DisallowedCase( - [make_msg("bob"), make_msg("bob"), make_msg("alice"), make_msg("bob")], - ("bob",), - 3, - ), - ) - - await self.run_disallowed(cases) - - def relevant_messages(self, case: DisallowedCase) -> Iterable[MockMessage]: - return tuple(msg for msg in case.recent_messages if msg.author in case.culprits) - - def get_report(self, case: DisallowedCase) -> str: - return f"sent {case.n_violations} messages in {self.config['interval']}s" diff --git a/tests/bot/rules/test_burst_shared.py b/tests/bot/rules/test_burst_shared.py deleted file mode 100644 index 3275143d5..000000000 --- a/tests/bot/rules/test_burst_shared.py +++ /dev/null @@ -1,57 +0,0 @@ -from typing import Iterable - -from bot.rules import burst_shared -from tests.bot.rules import DisallowedCase, RuleTest -from tests.helpers import MockMessage - - -def make_msg(author: str) -> MockMessage: - """ - Init a MockMessage instance with the passed arg. - - This serves as a shorthand / alias to keep the test cases visually clean. - """ - return MockMessage(author=author) - - -class BurstSharedRuleTests(RuleTest): - """Tests the `burst_shared` antispam rule.""" - - def setUp(self): - self.apply = burst_shared.apply - self.config = {"max": 2, "interval": 10} - - async def test_allows_messages_within_limit(self): - """ - Cases that do not violate the rule. - - There really isn't more to test here than a single case. - """ - cases = ( - [make_msg("spongebob"), make_msg("patrick")], - ) - - await self.run_allowed(cases) - - async def test_disallows_messages_beyond_limit(self): - """Cases where the amount of messages exceeds the limit, triggering the rule.""" - cases = ( - DisallowedCase( - [make_msg("bob"), make_msg("bob"), make_msg("bob")], - {"bob"}, - 3, - ), - DisallowedCase( - [make_msg("bob"), make_msg("bob"), make_msg("alice"), make_msg("bob")], - {"bob", "alice"}, - 4, - ), - ) - - await self.run_disallowed(cases) - - def relevant_messages(self, case: DisallowedCase) -> Iterable[MockMessage]: - return case.recent_messages - - def get_report(self, case: DisallowedCase) -> str: - return f"sent {case.n_violations} messages in {self.config['interval']}s" diff --git a/tests/bot/rules/test_chars.py b/tests/bot/rules/test_chars.py deleted file mode 100644 index f1e3c76a7..000000000 --- a/tests/bot/rules/test_chars.py +++ /dev/null @@ -1,64 +0,0 @@ -from typing import Iterable - -from bot.rules import chars -from tests.bot.rules import DisallowedCase, RuleTest -from tests.helpers import MockMessage - - -def make_msg(author: str, n_chars: int) -> MockMessage: - """Build a message with arbitrary content of `n_chars` length.""" - return MockMessage(author=author, content="A" * n_chars) - - -class CharsRuleTests(RuleTest): - """Tests the `chars` antispam rule.""" - - def setUp(self): - self.apply = chars.apply - self.config = { - "max": 20, # Max allowed sum of chars per user - "interval": 10, - } - - async def test_allows_messages_within_limit(self): - """Cases with a total amount of chars within limit.""" - cases = ( - [make_msg("bob", 0)], - [make_msg("bob", 20)], - [make_msg("bob", 15), make_msg("alice", 15)], - ) - - await self.run_allowed(cases) - - async def test_disallows_messages_beyond_limit(self): - """Cases where the total amount of chars exceeds the limit, triggering the rule.""" - cases = ( - DisallowedCase( - [make_msg("bob", 21)], - ("bob",), - 21, - ), - DisallowedCase( - [make_msg("bob", 15), make_msg("bob", 15)], - ("bob",), - 30, - ), - DisallowedCase( - [make_msg("alice", 15), make_msg("bob", 20), make_msg("alice", 15)], - ("alice",), - 30, - ), - ) - - await self.run_disallowed(cases) - - def relevant_messages(self, case: DisallowedCase) -> Iterable[MockMessage]: - last_message = case.recent_messages[0] - return tuple( - msg - for msg in case.recent_messages - if msg.author == last_message.author - ) - - def get_report(self, case: DisallowedCase) -> str: - return f"sent {case.n_violations} characters in {self.config['interval']}s" diff --git a/tests/bot/rules/test_discord_emojis.py b/tests/bot/rules/test_discord_emojis.py deleted file mode 100644 index 66c2d9f92..000000000 --- a/tests/bot/rules/test_discord_emojis.py +++ /dev/null @@ -1,73 +0,0 @@ -from typing import Iterable - -from bot.rules import discord_emojis -from tests.bot.rules import DisallowedCase, RuleTest -from tests.helpers import MockMessage - -discord_emoji = "<:abcd:1234>" # Discord emojis follow the format <:name:id> -unicode_emoji = "🧪" - - -def make_msg(author: str, n_emojis: int, emoji: str = discord_emoji) -> MockMessage: - """Build a MockMessage instance with content containing `n_emojis` arbitrary emojis.""" - return MockMessage(author=author, content=emoji * n_emojis) - - -class DiscordEmojisRuleTests(RuleTest): - """Tests for the `discord_emojis` antispam rule.""" - - def setUp(self): - self.apply = discord_emojis.apply - self.config = {"max": 2, "interval": 10} - - async def test_allows_messages_within_limit(self): - """Cases with a total amount of discord and unicode emojis within limit.""" - cases = ( - [make_msg("bob", 2)], - [make_msg("alice", 1), make_msg("bob", 2), make_msg("alice", 1)], - [make_msg("bob", 2, unicode_emoji)], - [ - make_msg("alice", 1, unicode_emoji), - make_msg("bob", 2, unicode_emoji), - make_msg("alice", 1, unicode_emoji) - ], - ) - - await self.run_allowed(cases) - - async def test_disallows_messages_beyond_limit(self): - """Cases with more than the allowed amount of discord and unicode emojis.""" - cases = ( - DisallowedCase( - [make_msg("bob", 3)], - ("bob",), - 3, - ), - DisallowedCase( - [make_msg("alice", 2), make_msg("bob", 2), make_msg("alice", 2)], - ("alice",), - 4, - ), - DisallowedCase( - [make_msg("bob", 3, unicode_emoji)], - ("bob",), - 3, - ), - DisallowedCase( - [ - make_msg("alice", 2, unicode_emoji), - make_msg("bob", 2, unicode_emoji), - make_msg("alice", 2, unicode_emoji) - ], - ("alice",), - 4 - ) - ) - - await self.run_disallowed(cases) - - def relevant_messages(self, case: DisallowedCase) -> Iterable[MockMessage]: - return tuple(msg for msg in case.recent_messages if msg.author in case.culprits) - - def get_report(self, case: DisallowedCase) -> str: - return f"sent {case.n_violations} emojis in {self.config['interval']}s" diff --git a/tests/bot/rules/test_duplicates.py b/tests/bot/rules/test_duplicates.py deleted file mode 100644 index 9bd886a77..000000000 --- a/tests/bot/rules/test_duplicates.py +++ /dev/null @@ -1,64 +0,0 @@ -from typing import Iterable - -from bot.rules import duplicates -from tests.bot.rules import DisallowedCase, RuleTest -from tests.helpers import MockMessage - - -def make_msg(author: str, content: str) -> MockMessage: - """Give a MockMessage instance with `author` and `content` attrs.""" - return MockMessage(author=author, content=content) - - -class DuplicatesRuleTests(RuleTest): - """Tests the `duplicates` antispam rule.""" - - def setUp(self): - self.apply = duplicates.apply - self.config = {"max": 2, "interval": 10} - - async def test_allows_messages_within_limit(self): - """Cases which do not violate the rule.""" - cases = ( - [make_msg("alice", "A"), make_msg("alice", "A")], - [make_msg("alice", "A"), make_msg("alice", "B"), make_msg("alice", "C")], # Non-duplicate - [make_msg("alice", "A"), make_msg("bob", "A"), make_msg("alice", "A")], # Different author - ) - - await self.run_allowed(cases) - - async def test_disallows_messages_beyond_limit(self): - """Cases with too many duplicate messages from the same author.""" - cases = ( - DisallowedCase( - [make_msg("alice", "A"), make_msg("alice", "A"), make_msg("alice", "A")], - ("alice",), - 3, - ), - DisallowedCase( - [make_msg("bob", "A"), make_msg("alice", "A"), make_msg("bob", "A"), make_msg("bob", "A")], - ("bob",), - 3, # 4 duplicate messages, but only 3 from bob - ), - DisallowedCase( - [make_msg("bob", "A"), make_msg("bob", "B"), make_msg("bob", "A"), make_msg("bob", "A")], - ("bob",), - 3, # 4 message from bob, but only 3 duplicates - ), - ) - - await self.run_disallowed(cases) - - def relevant_messages(self, case: DisallowedCase) -> Iterable[MockMessage]: - last_message = case.recent_messages[0] - return tuple( - msg - for msg in case.recent_messages - if ( - msg.author == last_message.author - and msg.content == last_message.content - ) - ) - - def get_report(self, case: DisallowedCase) -> str: - return f"sent {case.n_violations} duplicated messages in {self.config['interval']}s" diff --git a/tests/bot/rules/test_links.py b/tests/bot/rules/test_links.py deleted file mode 100644 index b091bd9d7..000000000 --- a/tests/bot/rules/test_links.py +++ /dev/null @@ -1,67 +0,0 @@ -from typing import Iterable - -from bot.rules import links -from tests.bot.rules import DisallowedCase, RuleTest -from tests.helpers import MockMessage - - -def make_msg(author: str, total_links: int) -> MockMessage: - """Makes a message with `total_links` links.""" - content = " ".join(["https://pydis.com"] * total_links) - return MockMessage(author=author, content=content) - - -class LinksTests(RuleTest): - """Tests applying the `links` rule.""" - - def setUp(self): - self.apply = links.apply - self.config = { - "max": 2, - "interval": 10 - } - - async def test_links_within_limit(self): - """Messages with an allowed amount of links.""" - cases = ( - [make_msg("bob", 0)], - [make_msg("bob", 2)], - [make_msg("bob", 3)], # Filter only applies if len(messages_with_links) > 1 - [make_msg("bob", 1), make_msg("bob", 1)], - [make_msg("bob", 2), make_msg("alice", 2)] # Only messages from latest author count - ) - - await self.run_allowed(cases) - - async def test_links_exceeding_limit(self): - """Messages with a a higher than allowed amount of links.""" - cases = ( - DisallowedCase( - [make_msg("bob", 1), make_msg("bob", 2)], - ("bob",), - 3 - ), - DisallowedCase( - [make_msg("alice", 1), make_msg("alice", 1), make_msg("alice", 1)], - ("alice",), - 3 - ), - DisallowedCase( - [make_msg("alice", 2), make_msg("bob", 3), make_msg("alice", 1)], - ("alice",), - 3 - ) - ) - - await self.run_disallowed(cases) - - def relevant_messages(self, case: DisallowedCase) -> Iterable[MockMessage]: - last_message = case.recent_messages[0] - return tuple( - msg - for msg in case.recent_messages - if msg.author == last_message.author - ) - - def get_report(self, case: DisallowedCase) -> str: - return f"sent {case.n_violations} links in {self.config['interval']}s" diff --git a/tests/bot/rules/test_mentions.py b/tests/bot/rules/test_mentions.py deleted file mode 100644 index f8805ac48..000000000 --- a/tests/bot/rules/test_mentions.py +++ /dev/null @@ -1,83 +0,0 @@ -from typing import Iterable - -from bot.rules import mentions -from tests.bot.rules import DisallowedCase, RuleTest -from tests.helpers import MockMember, MockMessage - - -def make_msg(author: str, total_user_mentions: int, total_bot_mentions: int = 0) -> MockMessage: - """Makes a message with `total_mentions` mentions.""" - user_mentions = [MockMember() for _ in range(total_user_mentions)] - bot_mentions = [MockMember(bot=True) for _ in range(total_bot_mentions)] - return MockMessage(author=author, mentions=user_mentions+bot_mentions) - - -class TestMentions(RuleTest): - """Tests applying the `mentions` antispam rule.""" - - def setUp(self): - self.apply = mentions.apply - self.config = { - "max": 2, - "interval": 10, - } - - async def test_mentions_within_limit(self): - """Messages with an allowed amount of mentions.""" - cases = ( - [make_msg("bob", 0)], - [make_msg("bob", 2)], - [make_msg("bob", 1), make_msg("bob", 1)], - [make_msg("bob", 1), make_msg("alice", 2)], - ) - - await self.run_allowed(cases) - - async def test_mentions_exceeding_limit(self): - """Messages with a higher than allowed amount of mentions.""" - cases = ( - DisallowedCase( - [make_msg("bob", 3)], - ("bob",), - 3, - ), - DisallowedCase( - [make_msg("alice", 2), make_msg("alice", 0), make_msg("alice", 1)], - ("alice",), - 3, - ), - DisallowedCase( - [make_msg("bob", 2), make_msg("alice", 3), make_msg("bob", 2)], - ("bob",), - 4, - ), - DisallowedCase( - [make_msg("bob", 3, 1)], - ("bob",), - 3, - ), - ) - - await self.run_disallowed(cases) - - async def test_ignore_bot_mentions(self): - """Messages with an allowed amount of mentions, also containing bot mentions.""" - cases = ( - [make_msg("bob", 0, 3)], - [make_msg("bob", 2, 1)], - [make_msg("bob", 1, 2), make_msg("bob", 1, 2)], - [make_msg("bob", 1, 5), make_msg("alice", 2, 5)] - ) - - await self.run_allowed(cases) - - def relevant_messages(self, case: DisallowedCase) -> Iterable[MockMessage]: - last_message = case.recent_messages[0] - return tuple( - msg - for msg in case.recent_messages - if msg.author == last_message.author - ) - - def get_report(self, case: DisallowedCase) -> str: - return f"sent {case.n_violations} mentions in {self.config['interval']}s" diff --git a/tests/bot/rules/test_newlines.py b/tests/bot/rules/test_newlines.py deleted file mode 100644 index e35377773..000000000 --- a/tests/bot/rules/test_newlines.py +++ /dev/null @@ -1,102 +0,0 @@ -from typing import Iterable, List - -from bot.rules import newlines -from tests.bot.rules import DisallowedCase, RuleTest -from tests.helpers import MockMessage - - -def make_msg(author: str, newline_groups: List[int]) -> MockMessage: - """Init a MockMessage instance with `author` and content configured by `newline_groups". - - Configure content by passing a list of ints, where each int `n` will generate - a separate group of `n` newlines. - - Example: - newline_groups=[3, 1, 2] -> content="\n\n\n \n \n\n" - """ - content = " ".join("\n" * n for n in newline_groups) - return MockMessage(author=author, content=content) - - -class TotalNewlinesRuleTests(RuleTest): - """Tests the `newlines` antispam rule against allowed cases and total newline count violations.""" - - def setUp(self): - self.apply = newlines.apply - self.config = { - "max": 5, # Max sum of newlines in relevant messages - "max_consecutive": 3, # Max newlines in one group, in one message - "interval": 10, - } - - async def test_allows_messages_within_limit(self): - """Cases which do not violate the rule.""" - cases = ( - [make_msg("alice", [])], # Single message with no newlines - [make_msg("alice", [1, 2]), make_msg("alice", [1, 1])], # 5 newlines in 2 messages - [make_msg("alice", [2, 2, 1]), make_msg("bob", [2, 3])], # 5 newlines from each author - [make_msg("bob", [1]), make_msg("alice", [5])], # Alice breaks the rule, but only bob is relevant - ) - - await self.run_allowed(cases) - - async def test_disallows_messages_total(self): - """Cases which violate the rule by having too many newlines in total.""" - cases = ( - DisallowedCase( # Alice sends a total of 6 newlines (disallowed) - [make_msg("alice", [2, 2]), make_msg("alice", [2])], - ("alice",), - 6, - ), - DisallowedCase( # Here we test that only alice's newlines count in the sum - [make_msg("alice", [2, 2]), make_msg("bob", [3]), make_msg("alice", [3])], - ("alice",), - 7, - ), - ) - - await self.run_disallowed(cases) - - def relevant_messages(self, case: DisallowedCase) -> Iterable[MockMessage]: - last_author = case.recent_messages[0].author - return tuple(msg for msg in case.recent_messages if msg.author == last_author) - - def get_report(self, case: DisallowedCase) -> str: - return f"sent {case.n_violations} newlines in {self.config['interval']}s" - - -class GroupNewlinesRuleTests(RuleTest): - """ - Tests the `newlines` antispam rule against max consecutive newline violations. - - As these violations yield a different error report, they require a different - `get_report` implementation. - """ - - def setUp(self): - self.apply = newlines.apply - self.config = {"max": 5, "max_consecutive": 3, "interval": 10} - - async def test_disallows_messages_consecutive(self): - """Cases which violate the rule due to having too many consecutive newlines.""" - cases = ( - DisallowedCase( # Bob sends a group of newlines too large - [make_msg("bob", [4])], - ("bob",), - 4, - ), - DisallowedCase( # Alice sends 5 in total (allowed), but 4 in one group (disallowed) - [make_msg("alice", [1]), make_msg("alice", [4])], - ("alice",), - 4, - ), - ) - - await self.run_disallowed(cases) - - def relevant_messages(self, case: DisallowedCase) -> Iterable[MockMessage]: - last_author = case.recent_messages[0].author - return tuple(msg for msg in case.recent_messages if msg.author == last_author) - - def get_report(self, case: DisallowedCase) -> str: - return f"sent {case.n_violations} consecutive newlines in {self.config['interval']}s" diff --git a/tests/bot/rules/test_role_mentions.py b/tests/bot/rules/test_role_mentions.py deleted file mode 100644 index 26c05d527..000000000 --- a/tests/bot/rules/test_role_mentions.py +++ /dev/null @@ -1,55 +0,0 @@ -from typing import Iterable - -from bot.rules import role_mentions -from tests.bot.rules import DisallowedCase, RuleTest -from tests.helpers import MockMessage - - -def make_msg(author: str, n_mentions: int) -> MockMessage: - """Build a MockMessage instance with `n_mentions` role mentions.""" - return MockMessage(author=author, role_mentions=[None] * n_mentions) - - -class RoleMentionsRuleTests(RuleTest): - """Tests for the `role_mentions` antispam rule.""" - - def setUp(self): - self.apply = role_mentions.apply - self.config = {"max": 2, "interval": 10} - - async def test_allows_messages_within_limit(self): - """Cases with a total amount of role mentions within limit.""" - cases = ( - [make_msg("bob", 2)], - [make_msg("bob", 1), make_msg("alice", 1), make_msg("bob", 1)], - ) - - await self.run_allowed(cases) - - async def test_disallows_messages_beyond_limit(self): - """Cases with more than the allowed amount of role mentions.""" - cases = ( - DisallowedCase( - [make_msg("bob", 3)], - ("bob",), - 3, - ), - DisallowedCase( - [make_msg("alice", 2), make_msg("bob", 2), make_msg("alice", 2)], - ("alice",), - 4, - ), - ) - - await self.run_disallowed(cases) - - def relevant_messages(self, case: DisallowedCase) -> Iterable[MockMessage]: - last_message = case.recent_messages[0] - return tuple( - msg - for msg in case.recent_messages - if msg.author == last_message.author - ) - - def get_report(self, case: DisallowedCase) -> str: - return f"sent {case.n_violations} role mentions in {self.config['interval']}s" |