diff options
-rw-r--r-- | bot/exts/filtering/_filter_context.py | 6 | ||||
-rw-r--r-- | bot/exts/filtering/_filter_lists/extension.py | 92 | ||||
-rw-r--r-- | bot/exts/filtering/_filter_lists/filter_list.py | 10 | ||||
-rw-r--r-- | bot/exts/filtering/_filter_lists/token.py | 24 | ||||
-rw-r--r-- | bot/exts/filtering/_filters/extension.py | 10 | ||||
-rw-r--r-- | bot/exts/filtering/_settings_types/infraction_and_notification.py | 14 | ||||
-rw-r--r-- | bot/exts/filtering/filtering.py | 56 |
7 files changed, 163 insertions, 49 deletions
diff --git a/bot/exts/filtering/_filter_context.py b/bot/exts/filtering/_filter_context.py index ee9e87f56..ad5c8636f 100644 --- a/bot/exts/filtering/_filter_context.py +++ b/bot/exts/filtering/_filter_context.py @@ -4,7 +4,7 @@ from dataclasses import dataclass, field, replace from enum import Enum, auto from typing import Optional, Union -from discord import DMChannel, Embed, Message, TextChannel, Thread, User +from discord import DMChannel, Message, TextChannel, Thread, User class Event(Enum): @@ -22,12 +22,12 @@ class FilterContext: event: Event # The type of event author: User # Who triggered the event channel: Union[TextChannel, Thread, DMChannel] # The channel involved - content: str # What actually needs filtering + content: Union[str, set[str]] # What actually needs filtering message: Optional[Message] # The message involved embeds: list = field(default_factory=list) # Any embeds involved # Output context dm_content: str = field(default_factory=str) # The content to DM the invoker - dm_embed: Embed = field(default_factory=Embed) # The embed to DM the invoker + dm_embed: str = field(default_factory=str) # The embed description to DM the invoker send_alert: bool = field(default=True) # Whether to send an alert for the moderators alert_content: str = field(default_factory=str) # The content of the alert alert_embeds: list = field(default_factory=list) # Any embeds to add to the alert diff --git a/bot/exts/filtering/_filter_lists/extension.py b/bot/exts/filtering/_filter_lists/extension.py new file mode 100644 index 000000000..c55cda114 --- /dev/null +++ b/bot/exts/filtering/_filter_lists/extension.py @@ -0,0 +1,92 @@ +from __future__ import annotations + +import typing +from os.path import splitext +from typing import Optional + +import bot +from bot.constants import Channels, URLs +from bot.exts.filtering._filter_context import Event, FilterContext +from bot.exts.filtering._filter_lists.filter_list import FilterList, ListType +from bot.exts.filtering._filters.extension import ExtensionFilter +from bot.exts.filtering._settings import ActionSettings + +if typing.TYPE_CHECKING: + from bot.exts.filtering.filtering import Filtering + + +PY_EMBED_DESCRIPTION = ( + "It looks like you tried to attach a Python file - " + f"please use a code-pasting service such as {URLs.site_schema}{URLs.site_paste}" +) + +TXT_LIKE_FILES = {".txt", ".csv", ".json"} +TXT_EMBED_DESCRIPTION = ( + "You either uploaded a `{blocked_extension}` file or entered a message that was too long. " + f"Please use our [paste bin]({URLs.site_schema}{URLs.site_paste}) instead." +) + +DISALLOWED_EMBED_DESCRIPTION = ( + "It looks like you tried to attach file type(s) that we do not allow ({blocked_extensions_str}). " + "We currently allow the following file types: **{joined_whitelist}**.\n\n" + "Feel free to ask in {meta_channel_mention} if you think this is a mistake." +) + + +class ExtensionsList(FilterList): + """A list of filters, each looking for an attachment with a specific extension.""" + + name = "extension" + + def __init__(self, filtering_cog: Filtering): + super().__init__(ExtensionFilter) + filtering_cog.subscribe(self, Event.MESSAGE) + self._whitelisted_description = None + + def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]: + """Dispatch the given event to the list's filters, and return actions to take and a message to relay to mods.""" + # Return early if the message doesn't have attachments. + if not ctx.message.attachments: + return None, "" + + # Find all extensions in the message. + all_ext = { + (splitext(attachment.filename.lower())[1], attachment.filename) for attachment in ctx.message.attachments + } + new_ctx = ctx.replace(content={ext for ext, _ in all_ext}) # And prepare the context for the filters to read. + triggered = self.filter_list_result( + new_ctx, self.filter_lists[ListType.ALLOW], self.defaults[ListType.ALLOW]["validations"] + ) + allowed_ext = {filter_.content for filter_ in triggered} # Get the extensions in the message that are allowed. + + # See if there are any extensions left which aren't allowed. + not_allowed = {ext: filename for ext, filename in all_ext if ext not in allowed_ext} + + if not not_allowed: # Yes, it's a double negative. Meaning all attachments are allowed :) + return None, "" + + # Something is disallowed. + if ".py" in not_allowed: + # Provide a pastebin link for .py files. + ctx.dm_embed = PY_EMBED_DESCRIPTION + elif txt_extensions := {ext for ext in TXT_LIKE_FILES if ext in not_allowed}: + # Work around Discord auto-conversion of messages longer than 2000 chars to .txt + cmd_channel = bot.instance.get_channel(Channels.bot_commands) + ctx.dm_embed = TXT_EMBED_DESCRIPTION.format( + blocked_extension=txt_extensions.pop(), + cmd_channel_mention=cmd_channel.mention + ) + else: + meta_channel = bot.instance.get_channel(Channels.meta) + if not self._whitelisted_description: + self._whitelisted_description = ', '.join( + filter_.content for filter_ in self.filter_lists[ListType.ALLOW] + ) + ctx.dm_embed = DISALLOWED_EMBED_DESCRIPTION.format( + joined_whitelist=self._whitelisted_description, + blocked_extensions_str=", ".join(not_allowed), + meta_channel_mention=meta_channel.mention, + ) + + ctx.matches += not_allowed.values() + return self.defaults[ListType.ALLOW]["actions"], ", ".join(f"`{ext}`" for ext in not_allowed) diff --git a/bot/exts/filtering/_filter_lists/filter_list.py b/bot/exts/filtering/_filter_lists/filter_list.py index 1060f11db..9fb144354 100644 --- a/bot/exts/filtering/_filter_lists/filter_list.py +++ b/bot/exts/filtering/_filter_lists/filter_list.py @@ -1,12 +1,12 @@ from abc import abstractmethod from enum import Enum -from typing import Dict, List, Type +from typing import Dict, List, Optional, Type from discord.ext.commands import BadArgument, Context, Converter from bot.exts.filtering._filter_context import FilterContext from bot.exts.filtering._filters.filter import Filter -from bot.exts.filtering._settings import Settings, ValidationSettings, create_settings +from bot.exts.filtering._settings import ActionSettings, ValidationSettings, create_settings from bot.exts.filtering._utils import FieldRequiring, past_tense from bot.log import get_logger @@ -45,7 +45,7 @@ class FilterList(FieldRequiring): def __init__(self, filter_type: Type[Filter]): self.filter_lists: dict[ListType, list[Filter]] = {} - self.defaults: dict[ListType, dict[str, Settings]] = {} + self.defaults = {} self.filter_type = filter_type @@ -64,8 +64,8 @@ class FilterList(FieldRequiring): self.filter_lists[list_type] = filters @abstractmethod - def triggers_for(self, ctx: FilterContext) -> list[Filter]: - """Dispatch the given event to the list's filters, and return filters triggered.""" + def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]: + """Dispatch the given event to the list's filters, and return actions to take and a message to relay to mods.""" @staticmethod def filter_list_result(ctx: FilterContext, filters: List[Filter], defaults: ValidationSettings) -> list[Filter]: diff --git a/bot/exts/filtering/_filter_lists/token.py b/bot/exts/filtering/_filter_lists/token.py index 01e586132..d4eb10591 100644 --- a/bot/exts/filtering/_filter_lists/token.py +++ b/bot/exts/filtering/_filter_lists/token.py @@ -2,11 +2,14 @@ from __future__ import annotations import re import typing +from functools import reduce +from operator import or_ +from typing import Optional from bot.exts.filtering._filter_context import Event, FilterContext from bot.exts.filtering._filter_lists.filter_list import FilterList, ListType -from bot.exts.filtering._filters.filter import Filter from bot.exts.filtering._filters.token import TokenFilter +from bot.exts.filtering._settings import ActionSettings from bot.exts.filtering._utils import clean_input if typing.TYPE_CHECKING: @@ -24,17 +27,30 @@ class TokensList(FilterList): super().__init__(TokenFilter) filtering_cog.subscribe(self, Event.MESSAGE, Event.MESSAGE_EDIT) - def triggers_for(self, ctx: FilterContext) -> list[Filter]: - """Dispatch the given event to the list's filters, and return filters triggered.""" + def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]: + """Dispatch the given event to the list's filters, and return actions to take and a message to relay to mods.""" text = ctx.content + if not text: + return None, "" if SPOILER_RE.search(text): text = self._expand_spoilers(text) text = clean_input(text) ctx = ctx.replace(content=text) - return self.filter_list_result( + triggers = self.filter_list_result( ctx, self.filter_lists[ListType.DENY], self.defaults[ListType.DENY]["validations"] ) + actions = None + message = "" + if triggers: + actions = reduce(or_, (filter_.actions for filter_ in triggers)) + if len(triggers) == 1: + message = f"#{triggers[0].id} (`{triggers[0].content}`)" + if triggers[0].description: + message += f" - {triggers[0].description}" + else: + message = ", ".join(f"#{filter_.id} (`{filter_.content}`)" for filter_ in triggers) + return actions, message @staticmethod def _expand_spoilers(text: str) -> str: diff --git a/bot/exts/filtering/_filters/extension.py b/bot/exts/filtering/_filters/extension.py new file mode 100644 index 000000000..85bfd05b2 --- /dev/null +++ b/bot/exts/filtering/_filters/extension.py @@ -0,0 +1,10 @@ +from bot.exts.filtering._filter_context import FilterContext +from bot.exts.filtering._filters.filter import Filter + + +class ExtensionFilter(Filter): + """A filter which looks for a specific attachment extension in messages.""" + + def triggered_on(self, ctx: FilterContext) -> bool: + """Searches for an attachment extension in the context content, given as a set of extensions.""" + return self.content in ctx.content diff --git a/bot/exts/filtering/_settings_types/infraction_and_notification.py b/bot/exts/filtering/_settings_types/infraction_and_notification.py index 263fd851c..68ffa166f 100644 --- a/bot/exts/filtering/_settings_types/infraction_and_notification.py +++ b/bot/exts/filtering/_settings_types/infraction_and_notification.py @@ -4,7 +4,7 @@ from enum import Enum, auto from typing import Any, Optional import arrow -from discord import Colour +from discord import Colour, Embed from discord.errors import Forbidden import bot @@ -74,22 +74,20 @@ class InfractionAndNotification(ActionEntry): # If there is no infraction to apply, any DM contents already provided in the context take precedence. if self.infraction_type == Infraction.NONE and (ctx.dm_content or ctx.dm_embed): dm_content = ctx.dm_content - dm_embed = ctx.dm_embed.description + dm_embed = ctx.dm_embed else: dm_content = self.dm_content dm_embed = self.dm_embed if dm_content or dm_embed: dm_content = f"Hey {ctx.author.mention}!\n{dm_content}" - ctx.dm_embed.description = dm_embed - if not ctx.dm_embed.colour: - ctx.dm_embed.colour = Colour.og_blurple() + dm_embed = Embed(description=dm_embed, colour=Colour.og_blurple()) try: - await ctx.author.send(dm_content, embed=ctx.dm_embed) + await ctx.author.send(dm_content, embed=dm_embed) + ctx.action_descriptions.append("notified") except Forbidden: - await ctx.channel.send(ctx.dm_content, embed=ctx.dm_embed) - ctx.action_descriptions.append("notified") + ctx.action_descriptions.append("notified (failed)") msg_ctx = await bot.instance.get_context(ctx.message) msg_ctx.guild = bot.instance.get_guild(Guild.id) diff --git a/bot/exts/filtering/filtering.py b/bot/exts/filtering/filtering.py index d34b4928a..c22e7316f 100644 --- a/bot/exts/filtering/filtering.py +++ b/bot/exts/filtering/filtering.py @@ -12,7 +12,6 @@ from bot.bot import Bot from bot.constants import Colours, MODERATION_ROLES, Webhooks from bot.exts.filtering._filter_context import Event, FilterContext from bot.exts.filtering._filter_lists import FilterList, ListType, ListTypeConverter, filter_list_types -from bot.exts.filtering._filters.filter import Filter from bot.exts.filtering._settings import ActionSettings from bot.exts.filtering._ui import ArgumentCompletionView from bot.exts.filtering._utils import past_tense @@ -94,11 +93,11 @@ class Filtering(Cog): ctx = FilterContext(Event.MESSAGE, msg.author, msg.channel, msg.content, msg, msg.embeds) - triggered, result_actions = await self._resolve_action(ctx) + result_actions, list_messages = await self._resolve_action(ctx) if result_actions: await result_actions.action(ctx) - if ctx.send_alert: - await self._send_alert(ctx, triggered) + if ctx.send_alert: + await self._send_alert(ctx, list_messages) # endregion # region: blacklist commands @@ -178,23 +177,29 @@ class Filtering(Cog): async def _resolve_action( self, ctx: FilterContext - ) -> tuple[dict[FilterList, list[Filter]], Optional[ActionSettings]]: - """Get the filters triggered per list, and resolve from them the action that needs to be taken for the event.""" - triggered = {} + ) -> tuple[Optional[ActionSettings], dict[FilterList, str]]: + """ + Return the actions that should be taken for all filter lists in the given context. + + Additionally, a message is possibly provided from each filter list describing the triggers, + which should be relayed to the moderators. + """ + actions = [] + messages = {} for filter_list in self._subscriptions[ctx.event]: - result = filter_list.triggers_for(ctx) - if result: - triggered[filter_list] = result + list_actions, list_message = filter_list.actions_for(ctx) + if list_actions: + actions.append(list_actions) + if list_message: + messages[filter_list] = list_message result_actions = None - if triggered: - result_actions = reduce( - operator.or_, (filter_.actions for filters in triggered.values() for filter_ in filters) - ) + if actions: + result_actions = reduce(operator.or_, (action for action in actions)) - return triggered, result_actions + return result_actions, messages - async def _send_alert(self, ctx: FilterContext, triggered_filters: dict[FilterList, list[Filter]]) -> None: + async def _send_alert(self, ctx: FilterContext, triggered_filters: dict[FilterList, str]) -> None: """Build an alert message from the filter context, and send it via the alert webhook.""" if not self.webhook: return @@ -208,19 +213,12 @@ class Filtering(Cog): triggered_in = f"**Triggered in:** {format_channel(ctx.channel)}" else: triggered_in = "**DM**" - if len(triggered_filters) == 1 and len(list(triggered_filters.values())[0]) == 1: - filter_list, (filter_,) = next(iter(triggered_filters.items())) - filters = f"**{filter_list.name.title()} Filter:** #{filter_.id} (`{filter_.content}`)" - if filter_.description: - filters += f" - {filter_.description}" - else: - filters = [] - for filter_list, list_filters in triggered_filters.items(): - filters.append( - (f"**{filter_list.name.title()} Filters:** " - ", ".join(f"#{filter_.id} (`{filter_.content}`)" for filter_ in list_filters)) - ) - filters = "\n".join(filters) + + filters = [] + for filter_list, list_message in triggered_filters.items(): + if list_message: + filters.append(f"**{filter_list.name.title()} Filters:** {list_message}") + filters = "\n".join(filters) matches = "**Matches:** " + ", ".join(repr(match) for match in ctx.matches) actions = "**Actions Taken:** " + (", ".join(ctx.action_descriptions) if ctx.action_descriptions else "-") |