aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--bot/exts/filtering/_filter_context.py6
-rw-r--r--bot/exts/filtering/_filter_lists/extension.py92
-rw-r--r--bot/exts/filtering/_filter_lists/filter_list.py10
-rw-r--r--bot/exts/filtering/_filter_lists/token.py24
-rw-r--r--bot/exts/filtering/_filters/extension.py10
-rw-r--r--bot/exts/filtering/_settings_types/infraction_and_notification.py14
-rw-r--r--bot/exts/filtering/filtering.py56
7 files changed, 163 insertions, 49 deletions
diff --git a/bot/exts/filtering/_filter_context.py b/bot/exts/filtering/_filter_context.py
index ee9e87f56..ad5c8636f 100644
--- a/bot/exts/filtering/_filter_context.py
+++ b/bot/exts/filtering/_filter_context.py
@@ -4,7 +4,7 @@ from dataclasses import dataclass, field, replace
from enum import Enum, auto
from typing import Optional, Union
-from discord import DMChannel, Embed, Message, TextChannel, Thread, User
+from discord import DMChannel, Message, TextChannel, Thread, User
class Event(Enum):
@@ -22,12 +22,12 @@ class FilterContext:
event: Event # The type of event
author: User # Who triggered the event
channel: Union[TextChannel, Thread, DMChannel] # The channel involved
- content: str # What actually needs filtering
+ content: Union[str, set[str]] # What actually needs filtering
message: Optional[Message] # The message involved
embeds: list = field(default_factory=list) # Any embeds involved
# Output context
dm_content: str = field(default_factory=str) # The content to DM the invoker
- dm_embed: Embed = field(default_factory=Embed) # The embed to DM the invoker
+ dm_embed: str = field(default_factory=str) # The embed description to DM the invoker
send_alert: bool = field(default=True) # Whether to send an alert for the moderators
alert_content: str = field(default_factory=str) # The content of the alert
alert_embeds: list = field(default_factory=list) # Any embeds to add to the alert
diff --git a/bot/exts/filtering/_filter_lists/extension.py b/bot/exts/filtering/_filter_lists/extension.py
new file mode 100644
index 000000000..c55cda114
--- /dev/null
+++ b/bot/exts/filtering/_filter_lists/extension.py
@@ -0,0 +1,92 @@
+from __future__ import annotations
+
+import typing
+from os.path import splitext
+from typing import Optional
+
+import bot
+from bot.constants import Channels, URLs
+from bot.exts.filtering._filter_context import Event, FilterContext
+from bot.exts.filtering._filter_lists.filter_list import FilterList, ListType
+from bot.exts.filtering._filters.extension import ExtensionFilter
+from bot.exts.filtering._settings import ActionSettings
+
+if typing.TYPE_CHECKING:
+ from bot.exts.filtering.filtering import Filtering
+
+
+PY_EMBED_DESCRIPTION = (
+ "It looks like you tried to attach a Python file - "
+ f"please use a code-pasting service such as {URLs.site_schema}{URLs.site_paste}"
+)
+
+TXT_LIKE_FILES = {".txt", ".csv", ".json"}
+TXT_EMBED_DESCRIPTION = (
+ "You either uploaded a `{blocked_extension}` file or entered a message that was too long. "
+ f"Please use our [paste bin]({URLs.site_schema}{URLs.site_paste}) instead."
+)
+
+DISALLOWED_EMBED_DESCRIPTION = (
+ "It looks like you tried to attach file type(s) that we do not allow ({blocked_extensions_str}). "
+ "We currently allow the following file types: **{joined_whitelist}**.\n\n"
+ "Feel free to ask in {meta_channel_mention} if you think this is a mistake."
+)
+
+
+class ExtensionsList(FilterList):
+ """A list of filters, each looking for an attachment with a specific extension."""
+
+ name = "extension"
+
+ def __init__(self, filtering_cog: Filtering):
+ super().__init__(ExtensionFilter)
+ filtering_cog.subscribe(self, Event.MESSAGE)
+ self._whitelisted_description = None
+
+ def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]:
+ """Dispatch the given event to the list's filters, and return actions to take and a message to relay to mods."""
+ # Return early if the message doesn't have attachments.
+ if not ctx.message.attachments:
+ return None, ""
+
+ # Find all extensions in the message.
+ all_ext = {
+ (splitext(attachment.filename.lower())[1], attachment.filename) for attachment in ctx.message.attachments
+ }
+ new_ctx = ctx.replace(content={ext for ext, _ in all_ext}) # And prepare the context for the filters to read.
+ triggered = self.filter_list_result(
+ new_ctx, self.filter_lists[ListType.ALLOW], self.defaults[ListType.ALLOW]["validations"]
+ )
+ allowed_ext = {filter_.content for filter_ in triggered} # Get the extensions in the message that are allowed.
+
+ # See if there are any extensions left which aren't allowed.
+ not_allowed = {ext: filename for ext, filename in all_ext if ext not in allowed_ext}
+
+ if not not_allowed: # Yes, it's a double negative. Meaning all attachments are allowed :)
+ return None, ""
+
+ # Something is disallowed.
+ if ".py" in not_allowed:
+ # Provide a pastebin link for .py files.
+ ctx.dm_embed = PY_EMBED_DESCRIPTION
+ elif txt_extensions := {ext for ext in TXT_LIKE_FILES if ext in not_allowed}:
+ # Work around Discord auto-conversion of messages longer than 2000 chars to .txt
+ cmd_channel = bot.instance.get_channel(Channels.bot_commands)
+ ctx.dm_embed = TXT_EMBED_DESCRIPTION.format(
+ blocked_extension=txt_extensions.pop(),
+ cmd_channel_mention=cmd_channel.mention
+ )
+ else:
+ meta_channel = bot.instance.get_channel(Channels.meta)
+ if not self._whitelisted_description:
+ self._whitelisted_description = ', '.join(
+ filter_.content for filter_ in self.filter_lists[ListType.ALLOW]
+ )
+ ctx.dm_embed = DISALLOWED_EMBED_DESCRIPTION.format(
+ joined_whitelist=self._whitelisted_description,
+ blocked_extensions_str=", ".join(not_allowed),
+ meta_channel_mention=meta_channel.mention,
+ )
+
+ ctx.matches += not_allowed.values()
+ return self.defaults[ListType.ALLOW]["actions"], ", ".join(f"`{ext}`" for ext in not_allowed)
diff --git a/bot/exts/filtering/_filter_lists/filter_list.py b/bot/exts/filtering/_filter_lists/filter_list.py
index 1060f11db..9fb144354 100644
--- a/bot/exts/filtering/_filter_lists/filter_list.py
+++ b/bot/exts/filtering/_filter_lists/filter_list.py
@@ -1,12 +1,12 @@
from abc import abstractmethod
from enum import Enum
-from typing import Dict, List, Type
+from typing import Dict, List, Optional, Type
from discord.ext.commands import BadArgument, Context, Converter
from bot.exts.filtering._filter_context import FilterContext
from bot.exts.filtering._filters.filter import Filter
-from bot.exts.filtering._settings import Settings, ValidationSettings, create_settings
+from bot.exts.filtering._settings import ActionSettings, ValidationSettings, create_settings
from bot.exts.filtering._utils import FieldRequiring, past_tense
from bot.log import get_logger
@@ -45,7 +45,7 @@ class FilterList(FieldRequiring):
def __init__(self, filter_type: Type[Filter]):
self.filter_lists: dict[ListType, list[Filter]] = {}
- self.defaults: dict[ListType, dict[str, Settings]] = {}
+ self.defaults = {}
self.filter_type = filter_type
@@ -64,8 +64,8 @@ class FilterList(FieldRequiring):
self.filter_lists[list_type] = filters
@abstractmethod
- def triggers_for(self, ctx: FilterContext) -> list[Filter]:
- """Dispatch the given event to the list's filters, and return filters triggered."""
+ def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]:
+ """Dispatch the given event to the list's filters, and return actions to take and a message to relay to mods."""
@staticmethod
def filter_list_result(ctx: FilterContext, filters: List[Filter], defaults: ValidationSettings) -> list[Filter]:
diff --git a/bot/exts/filtering/_filter_lists/token.py b/bot/exts/filtering/_filter_lists/token.py
index 01e586132..d4eb10591 100644
--- a/bot/exts/filtering/_filter_lists/token.py
+++ b/bot/exts/filtering/_filter_lists/token.py
@@ -2,11 +2,14 @@ from __future__ import annotations
import re
import typing
+from functools import reduce
+from operator import or_
+from typing import Optional
from bot.exts.filtering._filter_context import Event, FilterContext
from bot.exts.filtering._filter_lists.filter_list import FilterList, ListType
-from bot.exts.filtering._filters.filter import Filter
from bot.exts.filtering._filters.token import TokenFilter
+from bot.exts.filtering._settings import ActionSettings
from bot.exts.filtering._utils import clean_input
if typing.TYPE_CHECKING:
@@ -24,17 +27,30 @@ class TokensList(FilterList):
super().__init__(TokenFilter)
filtering_cog.subscribe(self, Event.MESSAGE, Event.MESSAGE_EDIT)
- def triggers_for(self, ctx: FilterContext) -> list[Filter]:
- """Dispatch the given event to the list's filters, and return filters triggered."""
+ def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]:
+ """Dispatch the given event to the list's filters, and return actions to take and a message to relay to mods."""
text = ctx.content
+ if not text:
+ return None, ""
if SPOILER_RE.search(text):
text = self._expand_spoilers(text)
text = clean_input(text)
ctx = ctx.replace(content=text)
- return self.filter_list_result(
+ triggers = self.filter_list_result(
ctx, self.filter_lists[ListType.DENY], self.defaults[ListType.DENY]["validations"]
)
+ actions = None
+ message = ""
+ if triggers:
+ actions = reduce(or_, (filter_.actions for filter_ in triggers))
+ if len(triggers) == 1:
+ message = f"#{triggers[0].id} (`{triggers[0].content}`)"
+ if triggers[0].description:
+ message += f" - {triggers[0].description}"
+ else:
+ message = ", ".join(f"#{filter_.id} (`{filter_.content}`)" for filter_ in triggers)
+ return actions, message
@staticmethod
def _expand_spoilers(text: str) -> str:
diff --git a/bot/exts/filtering/_filters/extension.py b/bot/exts/filtering/_filters/extension.py
new file mode 100644
index 000000000..85bfd05b2
--- /dev/null
+++ b/bot/exts/filtering/_filters/extension.py
@@ -0,0 +1,10 @@
+from bot.exts.filtering._filter_context import FilterContext
+from bot.exts.filtering._filters.filter import Filter
+
+
+class ExtensionFilter(Filter):
+ """A filter which looks for a specific attachment extension in messages."""
+
+ def triggered_on(self, ctx: FilterContext) -> bool:
+ """Searches for an attachment extension in the context content, given as a set of extensions."""
+ return self.content in ctx.content
diff --git a/bot/exts/filtering/_settings_types/infraction_and_notification.py b/bot/exts/filtering/_settings_types/infraction_and_notification.py
index 263fd851c..68ffa166f 100644
--- a/bot/exts/filtering/_settings_types/infraction_and_notification.py
+++ b/bot/exts/filtering/_settings_types/infraction_and_notification.py
@@ -4,7 +4,7 @@ from enum import Enum, auto
from typing import Any, Optional
import arrow
-from discord import Colour
+from discord import Colour, Embed
from discord.errors import Forbidden
import bot
@@ -74,22 +74,20 @@ class InfractionAndNotification(ActionEntry):
# If there is no infraction to apply, any DM contents already provided in the context take precedence.
if self.infraction_type == Infraction.NONE and (ctx.dm_content or ctx.dm_embed):
dm_content = ctx.dm_content
- dm_embed = ctx.dm_embed.description
+ dm_embed = ctx.dm_embed
else:
dm_content = self.dm_content
dm_embed = self.dm_embed
if dm_content or dm_embed:
dm_content = f"Hey {ctx.author.mention}!\n{dm_content}"
- ctx.dm_embed.description = dm_embed
- if not ctx.dm_embed.colour:
- ctx.dm_embed.colour = Colour.og_blurple()
+ dm_embed = Embed(description=dm_embed, colour=Colour.og_blurple())
try:
- await ctx.author.send(dm_content, embed=ctx.dm_embed)
+ await ctx.author.send(dm_content, embed=dm_embed)
+ ctx.action_descriptions.append("notified")
except Forbidden:
- await ctx.channel.send(ctx.dm_content, embed=ctx.dm_embed)
- ctx.action_descriptions.append("notified")
+ ctx.action_descriptions.append("notified (failed)")
msg_ctx = await bot.instance.get_context(ctx.message)
msg_ctx.guild = bot.instance.get_guild(Guild.id)
diff --git a/bot/exts/filtering/filtering.py b/bot/exts/filtering/filtering.py
index d34b4928a..c22e7316f 100644
--- a/bot/exts/filtering/filtering.py
+++ b/bot/exts/filtering/filtering.py
@@ -12,7 +12,6 @@ from bot.bot import Bot
from bot.constants import Colours, MODERATION_ROLES, Webhooks
from bot.exts.filtering._filter_context import Event, FilterContext
from bot.exts.filtering._filter_lists import FilterList, ListType, ListTypeConverter, filter_list_types
-from bot.exts.filtering._filters.filter import Filter
from bot.exts.filtering._settings import ActionSettings
from bot.exts.filtering._ui import ArgumentCompletionView
from bot.exts.filtering._utils import past_tense
@@ -94,11 +93,11 @@ class Filtering(Cog):
ctx = FilterContext(Event.MESSAGE, msg.author, msg.channel, msg.content, msg, msg.embeds)
- triggered, result_actions = await self._resolve_action(ctx)
+ result_actions, list_messages = await self._resolve_action(ctx)
if result_actions:
await result_actions.action(ctx)
- if ctx.send_alert:
- await self._send_alert(ctx, triggered)
+ if ctx.send_alert:
+ await self._send_alert(ctx, list_messages)
# endregion
# region: blacklist commands
@@ -178,23 +177,29 @@ class Filtering(Cog):
async def _resolve_action(
self, ctx: FilterContext
- ) -> tuple[dict[FilterList, list[Filter]], Optional[ActionSettings]]:
- """Get the filters triggered per list, and resolve from them the action that needs to be taken for the event."""
- triggered = {}
+ ) -> tuple[Optional[ActionSettings], dict[FilterList, str]]:
+ """
+ Return the actions that should be taken for all filter lists in the given context.
+
+ Additionally, a message is possibly provided from each filter list describing the triggers,
+ which should be relayed to the moderators.
+ """
+ actions = []
+ messages = {}
for filter_list in self._subscriptions[ctx.event]:
- result = filter_list.triggers_for(ctx)
- if result:
- triggered[filter_list] = result
+ list_actions, list_message = filter_list.actions_for(ctx)
+ if list_actions:
+ actions.append(list_actions)
+ if list_message:
+ messages[filter_list] = list_message
result_actions = None
- if triggered:
- result_actions = reduce(
- operator.or_, (filter_.actions for filters in triggered.values() for filter_ in filters)
- )
+ if actions:
+ result_actions = reduce(operator.or_, (action for action in actions))
- return triggered, result_actions
+ return result_actions, messages
- async def _send_alert(self, ctx: FilterContext, triggered_filters: dict[FilterList, list[Filter]]) -> None:
+ async def _send_alert(self, ctx: FilterContext, triggered_filters: dict[FilterList, str]) -> None:
"""Build an alert message from the filter context, and send it via the alert webhook."""
if not self.webhook:
return
@@ -208,19 +213,12 @@ class Filtering(Cog):
triggered_in = f"**Triggered in:** {format_channel(ctx.channel)}"
else:
triggered_in = "**DM**"
- if len(triggered_filters) == 1 and len(list(triggered_filters.values())[0]) == 1:
- filter_list, (filter_,) = next(iter(triggered_filters.items()))
- filters = f"**{filter_list.name.title()} Filter:** #{filter_.id} (`{filter_.content}`)"
- if filter_.description:
- filters += f" - {filter_.description}"
- else:
- filters = []
- for filter_list, list_filters in triggered_filters.items():
- filters.append(
- (f"**{filter_list.name.title()} Filters:** "
- ", ".join(f"#{filter_.id} (`{filter_.content}`)" for filter_ in list_filters))
- )
- filters = "\n".join(filters)
+
+ filters = []
+ for filter_list, list_message in triggered_filters.items():
+ if list_message:
+ filters.append(f"**{filter_list.name.title()} Filters:** {list_message}")
+ filters = "\n".join(filters)
matches = "**Matches:** " + ", ".join(repr(match) for match in ctx.matches)
actions = "**Actions Taken:** " + (", ".join(ctx.action_descriptions) if ctx.action_descriptions else "-")