diff options
author | 2022-02-24 21:20:37 +0200 | |
---|---|---|
committer | 2022-07-16 02:08:35 +0300 | |
commit | 555be653507baeda16f069114dbf5e7a2752d6e3 (patch) | |
tree | 8efd9c80d60cf085f63519fe549a3eae21dcdf9b | |
parent | Accept strings in channel scope and change role string interpretation (diff) |
Add file extension filtering
This commmit migrates the AntiMalware cog to a new filter list which goes over a message's attachments.
Some changes were needed to accomodate the new list, primarily what a filter list returns for a given context:
Instead of returning a list of filters, it will return the action itself that should be taken. This adds the flexibility of not needing existing filters to dictate the action. For example, in the case of the extensions list, an action should be taken when filters were *not* triggered. Or more precisely, when not all attachment extensions are whitelisted. Therefore, the action in that case is dictated by the filter list (stored as the list's default actions).
Additionally each filter list can now return its own message for the alert embed, instead of the cog formatting it according to the filters raised. Because again, an action might be taken without any deny filters being triggered. This is going to be especially relevant for the invites list.
Additionally, the infraction_and_notification action now doesn't redirect the notification to the context channel when the DM fails, since this can be incredibly noisy in cases of spam. If we want this functionality, a more suitable solution should be found.
-rw-r--r-- | bot/exts/filtering/_filter_context.py | 6 | ||||
-rw-r--r-- | bot/exts/filtering/_filter_lists/extension.py | 92 | ||||
-rw-r--r-- | bot/exts/filtering/_filter_lists/filter_list.py | 10 | ||||
-rw-r--r-- | bot/exts/filtering/_filter_lists/token.py | 24 | ||||
-rw-r--r-- | bot/exts/filtering/_filters/extension.py | 10 | ||||
-rw-r--r-- | bot/exts/filtering/_settings_types/infraction_and_notification.py | 14 | ||||
-rw-r--r-- | bot/exts/filtering/filtering.py | 56 |
7 files changed, 163 insertions, 49 deletions
diff --git a/bot/exts/filtering/_filter_context.py b/bot/exts/filtering/_filter_context.py index ee9e87f56..ad5c8636f 100644 --- a/bot/exts/filtering/_filter_context.py +++ b/bot/exts/filtering/_filter_context.py @@ -4,7 +4,7 @@ from dataclasses import dataclass, field, replace from enum import Enum, auto from typing import Optional, Union -from discord import DMChannel, Embed, Message, TextChannel, Thread, User +from discord import DMChannel, Message, TextChannel, Thread, User class Event(Enum): @@ -22,12 +22,12 @@ class FilterContext: event: Event # The type of event author: User # Who triggered the event channel: Union[TextChannel, Thread, DMChannel] # The channel involved - content: str # What actually needs filtering + content: Union[str, set[str]] # What actually needs filtering message: Optional[Message] # The message involved embeds: list = field(default_factory=list) # Any embeds involved # Output context dm_content: str = field(default_factory=str) # The content to DM the invoker - dm_embed: Embed = field(default_factory=Embed) # The embed to DM the invoker + dm_embed: str = field(default_factory=str) # The embed description to DM the invoker send_alert: bool = field(default=True) # Whether to send an alert for the moderators alert_content: str = field(default_factory=str) # The content of the alert alert_embeds: list = field(default_factory=list) # Any embeds to add to the alert diff --git a/bot/exts/filtering/_filter_lists/extension.py b/bot/exts/filtering/_filter_lists/extension.py new file mode 100644 index 000000000..c55cda114 --- /dev/null +++ b/bot/exts/filtering/_filter_lists/extension.py @@ -0,0 +1,92 @@ +from __future__ import annotations + +import typing +from os.path import splitext +from typing import Optional + +import bot +from bot.constants import Channels, URLs +from bot.exts.filtering._filter_context import Event, FilterContext +from bot.exts.filtering._filter_lists.filter_list import FilterList, ListType +from bot.exts.filtering._filters.extension import ExtensionFilter +from bot.exts.filtering._settings import ActionSettings + +if typing.TYPE_CHECKING: + from bot.exts.filtering.filtering import Filtering + + +PY_EMBED_DESCRIPTION = ( + "It looks like you tried to attach a Python file - " + f"please use a code-pasting service such as {URLs.site_schema}{URLs.site_paste}" +) + +TXT_LIKE_FILES = {".txt", ".csv", ".json"} +TXT_EMBED_DESCRIPTION = ( + "You either uploaded a `{blocked_extension}` file or entered a message that was too long. " + f"Please use our [paste bin]({URLs.site_schema}{URLs.site_paste}) instead." +) + +DISALLOWED_EMBED_DESCRIPTION = ( + "It looks like you tried to attach file type(s) that we do not allow ({blocked_extensions_str}). " + "We currently allow the following file types: **{joined_whitelist}**.\n\n" + "Feel free to ask in {meta_channel_mention} if you think this is a mistake." +) + + +class ExtensionsList(FilterList): + """A list of filters, each looking for an attachment with a specific extension.""" + + name = "extension" + + def __init__(self, filtering_cog: Filtering): + super().__init__(ExtensionFilter) + filtering_cog.subscribe(self, Event.MESSAGE) + self._whitelisted_description = None + + def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]: + """Dispatch the given event to the list's filters, and return actions to take and a message to relay to mods.""" + # Return early if the message doesn't have attachments. + if not ctx.message.attachments: + return None, "" + + # Find all extensions in the message. + all_ext = { + (splitext(attachment.filename.lower())[1], attachment.filename) for attachment in ctx.message.attachments + } + new_ctx = ctx.replace(content={ext for ext, _ in all_ext}) # And prepare the context for the filters to read. + triggered = self.filter_list_result( + new_ctx, self.filter_lists[ListType.ALLOW], self.defaults[ListType.ALLOW]["validations"] + ) + allowed_ext = {filter_.content for filter_ in triggered} # Get the extensions in the message that are allowed. + + # See if there are any extensions left which aren't allowed. + not_allowed = {ext: filename for ext, filename in all_ext if ext not in allowed_ext} + + if not not_allowed: # Yes, it's a double negative. Meaning all attachments are allowed :) + return None, "" + + # Something is disallowed. + if ".py" in not_allowed: + # Provide a pastebin link for .py files. + ctx.dm_embed = PY_EMBED_DESCRIPTION + elif txt_extensions := {ext for ext in TXT_LIKE_FILES if ext in not_allowed}: + # Work around Discord auto-conversion of messages longer than 2000 chars to .txt + cmd_channel = bot.instance.get_channel(Channels.bot_commands) + ctx.dm_embed = TXT_EMBED_DESCRIPTION.format( + blocked_extension=txt_extensions.pop(), + cmd_channel_mention=cmd_channel.mention + ) + else: + meta_channel = bot.instance.get_channel(Channels.meta) + if not self._whitelisted_description: + self._whitelisted_description = ', '.join( + filter_.content for filter_ in self.filter_lists[ListType.ALLOW] + ) + ctx.dm_embed = DISALLOWED_EMBED_DESCRIPTION.format( + joined_whitelist=self._whitelisted_description, + blocked_extensions_str=", ".join(not_allowed), + meta_channel_mention=meta_channel.mention, + ) + + ctx.matches += not_allowed.values() + return self.defaults[ListType.ALLOW]["actions"], ", ".join(f"`{ext}`" for ext in not_allowed) diff --git a/bot/exts/filtering/_filter_lists/filter_list.py b/bot/exts/filtering/_filter_lists/filter_list.py index 1060f11db..9fb144354 100644 --- a/bot/exts/filtering/_filter_lists/filter_list.py +++ b/bot/exts/filtering/_filter_lists/filter_list.py @@ -1,12 +1,12 @@ from abc import abstractmethod from enum import Enum -from typing import Dict, List, Type +from typing import Dict, List, Optional, Type from discord.ext.commands import BadArgument, Context, Converter from bot.exts.filtering._filter_context import FilterContext from bot.exts.filtering._filters.filter import Filter -from bot.exts.filtering._settings import Settings, ValidationSettings, create_settings +from bot.exts.filtering._settings import ActionSettings, ValidationSettings, create_settings from bot.exts.filtering._utils import FieldRequiring, past_tense from bot.log import get_logger @@ -45,7 +45,7 @@ class FilterList(FieldRequiring): def __init__(self, filter_type: Type[Filter]): self.filter_lists: dict[ListType, list[Filter]] = {} - self.defaults: dict[ListType, dict[str, Settings]] = {} + self.defaults = {} self.filter_type = filter_type @@ -64,8 +64,8 @@ class FilterList(FieldRequiring): self.filter_lists[list_type] = filters @abstractmethod - def triggers_for(self, ctx: FilterContext) -> list[Filter]: - """Dispatch the given event to the list's filters, and return filters triggered.""" + def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]: + """Dispatch the given event to the list's filters, and return actions to take and a message to relay to mods.""" @staticmethod def filter_list_result(ctx: FilterContext, filters: List[Filter], defaults: ValidationSettings) -> list[Filter]: diff --git a/bot/exts/filtering/_filter_lists/token.py b/bot/exts/filtering/_filter_lists/token.py index 01e586132..d4eb10591 100644 --- a/bot/exts/filtering/_filter_lists/token.py +++ b/bot/exts/filtering/_filter_lists/token.py @@ -2,11 +2,14 @@ from __future__ import annotations import re import typing +from functools import reduce +from operator import or_ +from typing import Optional from bot.exts.filtering._filter_context import Event, FilterContext from bot.exts.filtering._filter_lists.filter_list import FilterList, ListType -from bot.exts.filtering._filters.filter import Filter from bot.exts.filtering._filters.token import TokenFilter +from bot.exts.filtering._settings import ActionSettings from bot.exts.filtering._utils import clean_input if typing.TYPE_CHECKING: @@ -24,17 +27,30 @@ class TokensList(FilterList): super().__init__(TokenFilter) filtering_cog.subscribe(self, Event.MESSAGE, Event.MESSAGE_EDIT) - def triggers_for(self, ctx: FilterContext) -> list[Filter]: - """Dispatch the given event to the list's filters, and return filters triggered.""" + def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]: + """Dispatch the given event to the list's filters, and return actions to take and a message to relay to mods.""" text = ctx.content + if not text: + return None, "" if SPOILER_RE.search(text): text = self._expand_spoilers(text) text = clean_input(text) ctx = ctx.replace(content=text) - return self.filter_list_result( + triggers = self.filter_list_result( ctx, self.filter_lists[ListType.DENY], self.defaults[ListType.DENY]["validations"] ) + actions = None + message = "" + if triggers: + actions = reduce(or_, (filter_.actions for filter_ in triggers)) + if len(triggers) == 1: + message = f"#{triggers[0].id} (`{triggers[0].content}`)" + if triggers[0].description: + message += f" - {triggers[0].description}" + else: + message = ", ".join(f"#{filter_.id} (`{filter_.content}`)" for filter_ in triggers) + return actions, message @staticmethod def _expand_spoilers(text: str) -> str: diff --git a/bot/exts/filtering/_filters/extension.py b/bot/exts/filtering/_filters/extension.py new file mode 100644 index 000000000..85bfd05b2 --- /dev/null +++ b/bot/exts/filtering/_filters/extension.py @@ -0,0 +1,10 @@ +from bot.exts.filtering._filter_context import FilterContext +from bot.exts.filtering._filters.filter import Filter + + +class ExtensionFilter(Filter): + """A filter which looks for a specific attachment extension in messages.""" + + def triggered_on(self, ctx: FilterContext) -> bool: + """Searches for an attachment extension in the context content, given as a set of extensions.""" + return self.content in ctx.content diff --git a/bot/exts/filtering/_settings_types/infraction_and_notification.py b/bot/exts/filtering/_settings_types/infraction_and_notification.py index 263fd851c..68ffa166f 100644 --- a/bot/exts/filtering/_settings_types/infraction_and_notification.py +++ b/bot/exts/filtering/_settings_types/infraction_and_notification.py @@ -4,7 +4,7 @@ from enum import Enum, auto from typing import Any, Optional import arrow -from discord import Colour +from discord import Colour, Embed from discord.errors import Forbidden import bot @@ -74,22 +74,20 @@ class InfractionAndNotification(ActionEntry): # If there is no infraction to apply, any DM contents already provided in the context take precedence. if self.infraction_type == Infraction.NONE and (ctx.dm_content or ctx.dm_embed): dm_content = ctx.dm_content - dm_embed = ctx.dm_embed.description + dm_embed = ctx.dm_embed else: dm_content = self.dm_content dm_embed = self.dm_embed if dm_content or dm_embed: dm_content = f"Hey {ctx.author.mention}!\n{dm_content}" - ctx.dm_embed.description = dm_embed - if not ctx.dm_embed.colour: - ctx.dm_embed.colour = Colour.og_blurple() + dm_embed = Embed(description=dm_embed, colour=Colour.og_blurple()) try: - await ctx.author.send(dm_content, embed=ctx.dm_embed) + await ctx.author.send(dm_content, embed=dm_embed) + ctx.action_descriptions.append("notified") except Forbidden: - await ctx.channel.send(ctx.dm_content, embed=ctx.dm_embed) - ctx.action_descriptions.append("notified") + ctx.action_descriptions.append("notified (failed)") msg_ctx = await bot.instance.get_context(ctx.message) msg_ctx.guild = bot.instance.get_guild(Guild.id) diff --git a/bot/exts/filtering/filtering.py b/bot/exts/filtering/filtering.py index d34b4928a..c22e7316f 100644 --- a/bot/exts/filtering/filtering.py +++ b/bot/exts/filtering/filtering.py @@ -12,7 +12,6 @@ from bot.bot import Bot from bot.constants import Colours, MODERATION_ROLES, Webhooks from bot.exts.filtering._filter_context import Event, FilterContext from bot.exts.filtering._filter_lists import FilterList, ListType, ListTypeConverter, filter_list_types -from bot.exts.filtering._filters.filter import Filter from bot.exts.filtering._settings import ActionSettings from bot.exts.filtering._ui import ArgumentCompletionView from bot.exts.filtering._utils import past_tense @@ -94,11 +93,11 @@ class Filtering(Cog): ctx = FilterContext(Event.MESSAGE, msg.author, msg.channel, msg.content, msg, msg.embeds) - triggered, result_actions = await self._resolve_action(ctx) + result_actions, list_messages = await self._resolve_action(ctx) if result_actions: await result_actions.action(ctx) - if ctx.send_alert: - await self._send_alert(ctx, triggered) + if ctx.send_alert: + await self._send_alert(ctx, list_messages) # endregion # region: blacklist commands @@ -178,23 +177,29 @@ class Filtering(Cog): async def _resolve_action( self, ctx: FilterContext - ) -> tuple[dict[FilterList, list[Filter]], Optional[ActionSettings]]: - """Get the filters triggered per list, and resolve from them the action that needs to be taken for the event.""" - triggered = {} + ) -> tuple[Optional[ActionSettings], dict[FilterList, str]]: + """ + Return the actions that should be taken for all filter lists in the given context. + + Additionally, a message is possibly provided from each filter list describing the triggers, + which should be relayed to the moderators. + """ + actions = [] + messages = {} for filter_list in self._subscriptions[ctx.event]: - result = filter_list.triggers_for(ctx) - if result: - triggered[filter_list] = result + list_actions, list_message = filter_list.actions_for(ctx) + if list_actions: + actions.append(list_actions) + if list_message: + messages[filter_list] = list_message result_actions = None - if triggered: - result_actions = reduce( - operator.or_, (filter_.actions for filters in triggered.values() for filter_ in filters) - ) + if actions: + result_actions = reduce(operator.or_, (action for action in actions)) - return triggered, result_actions + return result_actions, messages - async def _send_alert(self, ctx: FilterContext, triggered_filters: dict[FilterList, list[Filter]]) -> None: + async def _send_alert(self, ctx: FilterContext, triggered_filters: dict[FilterList, str]) -> None: """Build an alert message from the filter context, and send it via the alert webhook.""" if not self.webhook: return @@ -208,19 +213,12 @@ class Filtering(Cog): triggered_in = f"**Triggered in:** {format_channel(ctx.channel)}" else: triggered_in = "**DM**" - if len(triggered_filters) == 1 and len(list(triggered_filters.values())[0]) == 1: - filter_list, (filter_,) = next(iter(triggered_filters.items())) - filters = f"**{filter_list.name.title()} Filter:** #{filter_.id} (`{filter_.content}`)" - if filter_.description: - filters += f" - {filter_.description}" - else: - filters = [] - for filter_list, list_filters in triggered_filters.items(): - filters.append( - (f"**{filter_list.name.title()} Filters:** " - ", ".join(f"#{filter_.id} (`{filter_.content}`)" for filter_ in list_filters)) - ) - filters = "\n".join(filters) + + filters = [] + for filter_list, list_message in triggered_filters.items(): + if list_message: + filters.append(f"**{filter_list.name.title()} Filters:** {list_message}") + filters = "\n".join(filters) matches = "**Matches:** " + ", ".join(repr(match) for match in ctx.matches) actions = "**Actions Taken:** " + (", ".join(ctx.action_descriptions) if ctx.action_descriptions else "-") |