aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar mbaruh <[email protected]>2022-02-24 21:20:37 +0200
committerGravatar mbaruh <[email protected]>2022-07-16 02:08:35 +0300
commit555be653507baeda16f069114dbf5e7a2752d6e3 (patch)
tree8efd9c80d60cf085f63519fe549a3eae21dcdf9b
parentAccept strings in channel scope and change role string interpretation (diff)
Add file extension filtering
This commmit migrates the AntiMalware cog to a new filter list which goes over a message's attachments. Some changes were needed to accomodate the new list, primarily what a filter list returns for a given context: Instead of returning a list of filters, it will return the action itself that should be taken. This adds the flexibility of not needing existing filters to dictate the action. For example, in the case of the extensions list, an action should be taken when filters were *not* triggered. Or more precisely, when not all attachment extensions are whitelisted. Therefore, the action in that case is dictated by the filter list (stored as the list's default actions). Additionally each filter list can now return its own message for the alert embed, instead of the cog formatting it according to the filters raised. Because again, an action might be taken without any deny filters being triggered. This is going to be especially relevant for the invites list. Additionally, the infraction_and_notification action now doesn't redirect the notification to the context channel when the DM fails, since this can be incredibly noisy in cases of spam. If we want this functionality, a more suitable solution should be found.
-rw-r--r--bot/exts/filtering/_filter_context.py6
-rw-r--r--bot/exts/filtering/_filter_lists/extension.py92
-rw-r--r--bot/exts/filtering/_filter_lists/filter_list.py10
-rw-r--r--bot/exts/filtering/_filter_lists/token.py24
-rw-r--r--bot/exts/filtering/_filters/extension.py10
-rw-r--r--bot/exts/filtering/_settings_types/infraction_and_notification.py14
-rw-r--r--bot/exts/filtering/filtering.py56
7 files changed, 163 insertions, 49 deletions
diff --git a/bot/exts/filtering/_filter_context.py b/bot/exts/filtering/_filter_context.py
index ee9e87f56..ad5c8636f 100644
--- a/bot/exts/filtering/_filter_context.py
+++ b/bot/exts/filtering/_filter_context.py
@@ -4,7 +4,7 @@ from dataclasses import dataclass, field, replace
from enum import Enum, auto
from typing import Optional, Union
-from discord import DMChannel, Embed, Message, TextChannel, Thread, User
+from discord import DMChannel, Message, TextChannel, Thread, User
class Event(Enum):
@@ -22,12 +22,12 @@ class FilterContext:
event: Event # The type of event
author: User # Who triggered the event
channel: Union[TextChannel, Thread, DMChannel] # The channel involved
- content: str # What actually needs filtering
+ content: Union[str, set[str]] # What actually needs filtering
message: Optional[Message] # The message involved
embeds: list = field(default_factory=list) # Any embeds involved
# Output context
dm_content: str = field(default_factory=str) # The content to DM the invoker
- dm_embed: Embed = field(default_factory=Embed) # The embed to DM the invoker
+ dm_embed: str = field(default_factory=str) # The embed description to DM the invoker
send_alert: bool = field(default=True) # Whether to send an alert for the moderators
alert_content: str = field(default_factory=str) # The content of the alert
alert_embeds: list = field(default_factory=list) # Any embeds to add to the alert
diff --git a/bot/exts/filtering/_filter_lists/extension.py b/bot/exts/filtering/_filter_lists/extension.py
new file mode 100644
index 000000000..c55cda114
--- /dev/null
+++ b/bot/exts/filtering/_filter_lists/extension.py
@@ -0,0 +1,92 @@
+from __future__ import annotations
+
+import typing
+from os.path import splitext
+from typing import Optional
+
+import bot
+from bot.constants import Channels, URLs
+from bot.exts.filtering._filter_context import Event, FilterContext
+from bot.exts.filtering._filter_lists.filter_list import FilterList, ListType
+from bot.exts.filtering._filters.extension import ExtensionFilter
+from bot.exts.filtering._settings import ActionSettings
+
+if typing.TYPE_CHECKING:
+ from bot.exts.filtering.filtering import Filtering
+
+
+PY_EMBED_DESCRIPTION = (
+ "It looks like you tried to attach a Python file - "
+ f"please use a code-pasting service such as {URLs.site_schema}{URLs.site_paste}"
+)
+
+TXT_LIKE_FILES = {".txt", ".csv", ".json"}
+TXT_EMBED_DESCRIPTION = (
+ "You either uploaded a `{blocked_extension}` file or entered a message that was too long. "
+ f"Please use our [paste bin]({URLs.site_schema}{URLs.site_paste}) instead."
+)
+
+DISALLOWED_EMBED_DESCRIPTION = (
+ "It looks like you tried to attach file type(s) that we do not allow ({blocked_extensions_str}). "
+ "We currently allow the following file types: **{joined_whitelist}**.\n\n"
+ "Feel free to ask in {meta_channel_mention} if you think this is a mistake."
+)
+
+
+class ExtensionsList(FilterList):
+ """A list of filters, each looking for an attachment with a specific extension."""
+
+ name = "extension"
+
+ def __init__(self, filtering_cog: Filtering):
+ super().__init__(ExtensionFilter)
+ filtering_cog.subscribe(self, Event.MESSAGE)
+ self._whitelisted_description = None
+
+ def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]:
+ """Dispatch the given event to the list's filters, and return actions to take and a message to relay to mods."""
+ # Return early if the message doesn't have attachments.
+ if not ctx.message.attachments:
+ return None, ""
+
+ # Find all extensions in the message.
+ all_ext = {
+ (splitext(attachment.filename.lower())[1], attachment.filename) for attachment in ctx.message.attachments
+ }
+ new_ctx = ctx.replace(content={ext for ext, _ in all_ext}) # And prepare the context for the filters to read.
+ triggered = self.filter_list_result(
+ new_ctx, self.filter_lists[ListType.ALLOW], self.defaults[ListType.ALLOW]["validations"]
+ )
+ allowed_ext = {filter_.content for filter_ in triggered} # Get the extensions in the message that are allowed.
+
+ # See if there are any extensions left which aren't allowed.
+ not_allowed = {ext: filename for ext, filename in all_ext if ext not in allowed_ext}
+
+ if not not_allowed: # Yes, it's a double negative. Meaning all attachments are allowed :)
+ return None, ""
+
+ # Something is disallowed.
+ if ".py" in not_allowed:
+ # Provide a pastebin link for .py files.
+ ctx.dm_embed = PY_EMBED_DESCRIPTION
+ elif txt_extensions := {ext for ext in TXT_LIKE_FILES if ext in not_allowed}:
+ # Work around Discord auto-conversion of messages longer than 2000 chars to .txt
+ cmd_channel = bot.instance.get_channel(Channels.bot_commands)
+ ctx.dm_embed = TXT_EMBED_DESCRIPTION.format(
+ blocked_extension=txt_extensions.pop(),
+ cmd_channel_mention=cmd_channel.mention
+ )
+ else:
+ meta_channel = bot.instance.get_channel(Channels.meta)
+ if not self._whitelisted_description:
+ self._whitelisted_description = ', '.join(
+ filter_.content for filter_ in self.filter_lists[ListType.ALLOW]
+ )
+ ctx.dm_embed = DISALLOWED_EMBED_DESCRIPTION.format(
+ joined_whitelist=self._whitelisted_description,
+ blocked_extensions_str=", ".join(not_allowed),
+ meta_channel_mention=meta_channel.mention,
+ )
+
+ ctx.matches += not_allowed.values()
+ return self.defaults[ListType.ALLOW]["actions"], ", ".join(f"`{ext}`" for ext in not_allowed)
diff --git a/bot/exts/filtering/_filter_lists/filter_list.py b/bot/exts/filtering/_filter_lists/filter_list.py
index 1060f11db..9fb144354 100644
--- a/bot/exts/filtering/_filter_lists/filter_list.py
+++ b/bot/exts/filtering/_filter_lists/filter_list.py
@@ -1,12 +1,12 @@
from abc import abstractmethod
from enum import Enum
-from typing import Dict, List, Type
+from typing import Dict, List, Optional, Type
from discord.ext.commands import BadArgument, Context, Converter
from bot.exts.filtering._filter_context import FilterContext
from bot.exts.filtering._filters.filter import Filter
-from bot.exts.filtering._settings import Settings, ValidationSettings, create_settings
+from bot.exts.filtering._settings import ActionSettings, ValidationSettings, create_settings
from bot.exts.filtering._utils import FieldRequiring, past_tense
from bot.log import get_logger
@@ -45,7 +45,7 @@ class FilterList(FieldRequiring):
def __init__(self, filter_type: Type[Filter]):
self.filter_lists: dict[ListType, list[Filter]] = {}
- self.defaults: dict[ListType, dict[str, Settings]] = {}
+ self.defaults = {}
self.filter_type = filter_type
@@ -64,8 +64,8 @@ class FilterList(FieldRequiring):
self.filter_lists[list_type] = filters
@abstractmethod
- def triggers_for(self, ctx: FilterContext) -> list[Filter]:
- """Dispatch the given event to the list's filters, and return filters triggered."""
+ def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]:
+ """Dispatch the given event to the list's filters, and return actions to take and a message to relay to mods."""
@staticmethod
def filter_list_result(ctx: FilterContext, filters: List[Filter], defaults: ValidationSettings) -> list[Filter]:
diff --git a/bot/exts/filtering/_filter_lists/token.py b/bot/exts/filtering/_filter_lists/token.py
index 01e586132..d4eb10591 100644
--- a/bot/exts/filtering/_filter_lists/token.py
+++ b/bot/exts/filtering/_filter_lists/token.py
@@ -2,11 +2,14 @@ from __future__ import annotations
import re
import typing
+from functools import reduce
+from operator import or_
+from typing import Optional
from bot.exts.filtering._filter_context import Event, FilterContext
from bot.exts.filtering._filter_lists.filter_list import FilterList, ListType
-from bot.exts.filtering._filters.filter import Filter
from bot.exts.filtering._filters.token import TokenFilter
+from bot.exts.filtering._settings import ActionSettings
from bot.exts.filtering._utils import clean_input
if typing.TYPE_CHECKING:
@@ -24,17 +27,30 @@ class TokensList(FilterList):
super().__init__(TokenFilter)
filtering_cog.subscribe(self, Event.MESSAGE, Event.MESSAGE_EDIT)
- def triggers_for(self, ctx: FilterContext) -> list[Filter]:
- """Dispatch the given event to the list's filters, and return filters triggered."""
+ def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]:
+ """Dispatch the given event to the list's filters, and return actions to take and a message to relay to mods."""
text = ctx.content
+ if not text:
+ return None, ""
if SPOILER_RE.search(text):
text = self._expand_spoilers(text)
text = clean_input(text)
ctx = ctx.replace(content=text)
- return self.filter_list_result(
+ triggers = self.filter_list_result(
ctx, self.filter_lists[ListType.DENY], self.defaults[ListType.DENY]["validations"]
)
+ actions = None
+ message = ""
+ if triggers:
+ actions = reduce(or_, (filter_.actions for filter_ in triggers))
+ if len(triggers) == 1:
+ message = f"#{triggers[0].id} (`{triggers[0].content}`)"
+ if triggers[0].description:
+ message += f" - {triggers[0].description}"
+ else:
+ message = ", ".join(f"#{filter_.id} (`{filter_.content}`)" for filter_ in triggers)
+ return actions, message
@staticmethod
def _expand_spoilers(text: str) -> str:
diff --git a/bot/exts/filtering/_filters/extension.py b/bot/exts/filtering/_filters/extension.py
new file mode 100644
index 000000000..85bfd05b2
--- /dev/null
+++ b/bot/exts/filtering/_filters/extension.py
@@ -0,0 +1,10 @@
+from bot.exts.filtering._filter_context import FilterContext
+from bot.exts.filtering._filters.filter import Filter
+
+
+class ExtensionFilter(Filter):
+ """A filter which looks for a specific attachment extension in messages."""
+
+ def triggered_on(self, ctx: FilterContext) -> bool:
+ """Searches for an attachment extension in the context content, given as a set of extensions."""
+ return self.content in ctx.content
diff --git a/bot/exts/filtering/_settings_types/infraction_and_notification.py b/bot/exts/filtering/_settings_types/infraction_and_notification.py
index 263fd851c..68ffa166f 100644
--- a/bot/exts/filtering/_settings_types/infraction_and_notification.py
+++ b/bot/exts/filtering/_settings_types/infraction_and_notification.py
@@ -4,7 +4,7 @@ from enum import Enum, auto
from typing import Any, Optional
import arrow
-from discord import Colour
+from discord import Colour, Embed
from discord.errors import Forbidden
import bot
@@ -74,22 +74,20 @@ class InfractionAndNotification(ActionEntry):
# If there is no infraction to apply, any DM contents already provided in the context take precedence.
if self.infraction_type == Infraction.NONE and (ctx.dm_content or ctx.dm_embed):
dm_content = ctx.dm_content
- dm_embed = ctx.dm_embed.description
+ dm_embed = ctx.dm_embed
else:
dm_content = self.dm_content
dm_embed = self.dm_embed
if dm_content or dm_embed:
dm_content = f"Hey {ctx.author.mention}!\n{dm_content}"
- ctx.dm_embed.description = dm_embed
- if not ctx.dm_embed.colour:
- ctx.dm_embed.colour = Colour.og_blurple()
+ dm_embed = Embed(description=dm_embed, colour=Colour.og_blurple())
try:
- await ctx.author.send(dm_content, embed=ctx.dm_embed)
+ await ctx.author.send(dm_content, embed=dm_embed)
+ ctx.action_descriptions.append("notified")
except Forbidden:
- await ctx.channel.send(ctx.dm_content, embed=ctx.dm_embed)
- ctx.action_descriptions.append("notified")
+ ctx.action_descriptions.append("notified (failed)")
msg_ctx = await bot.instance.get_context(ctx.message)
msg_ctx.guild = bot.instance.get_guild(Guild.id)
diff --git a/bot/exts/filtering/filtering.py b/bot/exts/filtering/filtering.py
index d34b4928a..c22e7316f 100644
--- a/bot/exts/filtering/filtering.py
+++ b/bot/exts/filtering/filtering.py
@@ -12,7 +12,6 @@ from bot.bot import Bot
from bot.constants import Colours, MODERATION_ROLES, Webhooks
from bot.exts.filtering._filter_context import Event, FilterContext
from bot.exts.filtering._filter_lists import FilterList, ListType, ListTypeConverter, filter_list_types
-from bot.exts.filtering._filters.filter import Filter
from bot.exts.filtering._settings import ActionSettings
from bot.exts.filtering._ui import ArgumentCompletionView
from bot.exts.filtering._utils import past_tense
@@ -94,11 +93,11 @@ class Filtering(Cog):
ctx = FilterContext(Event.MESSAGE, msg.author, msg.channel, msg.content, msg, msg.embeds)
- triggered, result_actions = await self._resolve_action(ctx)
+ result_actions, list_messages = await self._resolve_action(ctx)
if result_actions:
await result_actions.action(ctx)
- if ctx.send_alert:
- await self._send_alert(ctx, triggered)
+ if ctx.send_alert:
+ await self._send_alert(ctx, list_messages)
# endregion
# region: blacklist commands
@@ -178,23 +177,29 @@ class Filtering(Cog):
async def _resolve_action(
self, ctx: FilterContext
- ) -> tuple[dict[FilterList, list[Filter]], Optional[ActionSettings]]:
- """Get the filters triggered per list, and resolve from them the action that needs to be taken for the event."""
- triggered = {}
+ ) -> tuple[Optional[ActionSettings], dict[FilterList, str]]:
+ """
+ Return the actions that should be taken for all filter lists in the given context.
+
+ Additionally, a message is possibly provided from each filter list describing the triggers,
+ which should be relayed to the moderators.
+ """
+ actions = []
+ messages = {}
for filter_list in self._subscriptions[ctx.event]:
- result = filter_list.triggers_for(ctx)
- if result:
- triggered[filter_list] = result
+ list_actions, list_message = filter_list.actions_for(ctx)
+ if list_actions:
+ actions.append(list_actions)
+ if list_message:
+ messages[filter_list] = list_message
result_actions = None
- if triggered:
- result_actions = reduce(
- operator.or_, (filter_.actions for filters in triggered.values() for filter_ in filters)
- )
+ if actions:
+ result_actions = reduce(operator.or_, (action for action in actions))
- return triggered, result_actions
+ return result_actions, messages
- async def _send_alert(self, ctx: FilterContext, triggered_filters: dict[FilterList, list[Filter]]) -> None:
+ async def _send_alert(self, ctx: FilterContext, triggered_filters: dict[FilterList, str]) -> None:
"""Build an alert message from the filter context, and send it via the alert webhook."""
if not self.webhook:
return
@@ -208,19 +213,12 @@ class Filtering(Cog):
triggered_in = f"**Triggered in:** {format_channel(ctx.channel)}"
else:
triggered_in = "**DM**"
- if len(triggered_filters) == 1 and len(list(triggered_filters.values())[0]) == 1:
- filter_list, (filter_,) = next(iter(triggered_filters.items()))
- filters = f"**{filter_list.name.title()} Filter:** #{filter_.id} (`{filter_.content}`)"
- if filter_.description:
- filters += f" - {filter_.description}"
- else:
- filters = []
- for filter_list, list_filters in triggered_filters.items():
- filters.append(
- (f"**{filter_list.name.title()} Filters:** "
- ", ".join(f"#{filter_.id} (`{filter_.content}`)" for filter_ in list_filters))
- )
- filters = "\n".join(filters)
+
+ filters = []
+ for filter_list, list_message in triggered_filters.items():
+ if list_message:
+ filters.append(f"**{filter_list.name.title()} Filters:** {list_message}")
+ filters = "\n".join(filters)
matches = "**Matches:** " + ", ".join(repr(match) for match in ctx.matches)
actions = "**Actions Taken:** " + (", ".join(ctx.action_descriptions) if ctx.action_descriptions else "-")