aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--bot/exts/filtering/_filter_lists/antispam.py12
-rw-r--r--bot/exts/filtering/_filter_lists/domain.py8
-rw-r--r--bot/exts/filtering/_filter_lists/extension.py12
-rw-r--r--bot/exts/filtering/_filter_lists/filter_list.py4
-rw-r--r--bot/exts/filtering/_filter_lists/invite.py19
-rw-r--r--bot/exts/filtering/_filter_lists/token.py8
-rw-r--r--bot/exts/filtering/_filter_lists/unique.py8
-rw-r--r--bot/exts/filtering/filtering.py37
8 files changed, 67 insertions, 41 deletions
diff --git a/bot/exts/filtering/_filter_lists/antispam.py b/bot/exts/filtering/_filter_lists/antispam.py
index b2f873094..ed86c92c0 100644
--- a/bot/exts/filtering/_filter_lists/antispam.py
+++ b/bot/exts/filtering/_filter_lists/antispam.py
@@ -17,7 +17,7 @@ from bot.constants import Webhooks
from bot.exts.filtering._filter_context import FilterContext
from bot.exts.filtering._filter_lists.filter_list import ListType, SubscribingAtomicList, UniquesListBase
from bot.exts.filtering._filters.antispam import antispam_filter_types
-from bot.exts.filtering._filters.filter import UniqueFilter
+from bot.exts.filtering._filters.filter import Filter, UniqueFilter
from bot.exts.filtering._settings import ActionSettings
from bot.exts.filtering._settings_types.actions.infraction_and_notification import Infraction
from bot.exts.filtering._ui.ui import build_mod_alert
@@ -55,10 +55,12 @@ class AntispamList(UniquesListBase):
self._already_warned.add(content)
return None
- async def actions_for(self, ctx: FilterContext) -> tuple[ActionSettings | None, list[str]]:
+ async def actions_for(
+ self, ctx: FilterContext
+ ) -> tuple[ActionSettings | None, list[str], dict[ListType, list[Filter]]]:
"""Dispatch the given event to the list's filters, and return actions to take and messages to relay to mods."""
if not ctx.message:
- return None, []
+ return None, [], {}
sublist: SubscribingAtomicList = self[ListType.DENY]
potential_filters = [sublist.filters[id_] for id_ in sublist.subscriptions[ctx.event]]
@@ -71,7 +73,7 @@ class AntispamList(UniquesListBase):
new_ctx = ctx.replace(content=relevant_messages)
triggers = await sublist.filter_list_result(new_ctx)
if not triggers:
- return None, []
+ return None, [], {}
if ctx.author not in self.message_deletion_queue:
self.message_deletion_queue[ctx.author] = DeletionContext()
@@ -99,7 +101,7 @@ class AntispamList(UniquesListBase):
current_actions.pop("infraction_and_notification", None)
# Provide some message in case another filter list wants there to be an alert.
- return current_actions, ["Handling spam event..."]
+ return current_actions, ["Handling spam event..."], {ListType.DENY: triggers}
def _create_deletion_context_handler(self, context_id: Member) -> Callable[[FilterContext], Coroutine]:
async def schedule_processing(ctx: FilterContext) -> None:
diff --git a/bot/exts/filtering/_filter_lists/domain.py b/bot/exts/filtering/_filter_lists/domain.py
index 0b56e8d73..f4062edfe 100644
--- a/bot/exts/filtering/_filter_lists/domain.py
+++ b/bot/exts/filtering/_filter_lists/domain.py
@@ -42,11 +42,13 @@ class DomainsList(FilterList[DomainFilter]):
"""Return the types of filters used by this list."""
return {DomainFilter}
- async def actions_for(self, ctx: FilterContext) -> tuple[ActionSettings | None, list[str]]:
+ async def actions_for(
+ self, ctx: FilterContext
+ ) -> tuple[ActionSettings | None, list[str], dict[ListType, list[Filter]]]:
"""Dispatch the given event to the list's filters, and return actions to take and messages to relay to mods."""
text = ctx.content
if not text:
- return None, []
+ return None, [], {}
text = clean_input(text)
urls = {match.group(1).lower().rstrip("/") for match in URL_RE.finditer(text)}
@@ -59,4 +61,4 @@ class DomainsList(FilterList[DomainFilter]):
if triggers:
actions = self[ListType.DENY].merge_actions(triggers)
messages = self[ListType.DENY].format_messages(triggers)
- return actions, messages
+ return actions, messages, {ListType.DENY: triggers}
diff --git a/bot/exts/filtering/_filter_lists/extension.py b/bot/exts/filtering/_filter_lists/extension.py
index a53520bf7..a739d7191 100644
--- a/bot/exts/filtering/_filter_lists/extension.py
+++ b/bot/exts/filtering/_filter_lists/extension.py
@@ -61,15 +61,17 @@ class ExtensionsList(FilterList[ExtensionFilter]):
"""Return the types of filters used by this list."""
return {ExtensionFilter}
- async def actions_for(self, ctx: FilterContext) -> tuple[ActionSettings | None, list[str]]:
+ async def actions_for(
+ self, ctx: FilterContext
+ ) -> tuple[ActionSettings | None, list[str], dict[ListType, list[Filter]]]:
"""Dispatch the given event to the list's filters, and return actions to take and messages to relay to mods."""
# Return early if the message doesn't have attachments.
if not ctx.message or not ctx.message.attachments:
- return None, []
+ return None, [], {}
_, failed = self[ListType.ALLOW].defaults.validations.evaluate(ctx)
if failed: # There's no extension filtering in this context.
- return None, []
+ return None, [], {}
# Find all extensions in the message.
all_ext = {
@@ -85,7 +87,7 @@ class ExtensionsList(FilterList[ExtensionFilter]):
not_allowed = {ext: filename for ext, filename in all_ext if ext not in allowed_ext}
if not not_allowed: # Yes, it's a double negative. Meaning all attachments are allowed :)
- return None, []
+ return None, [], {ListType.ALLOW: triggered}
# Something is disallowed.
if ".py" in not_allowed:
@@ -111,4 +113,4 @@ class ExtensionsList(FilterList[ExtensionFilter]):
)
ctx.matches += not_allowed.values()
- return self[ListType.ALLOW].defaults.actions, [f"`{ext}`" for ext in not_allowed]
+ return self[ListType.ALLOW].defaults.actions, [f"`{ext}`" for ext in not_allowed], {ListType.ALLOW: triggered}
diff --git a/bot/exts/filtering/_filter_lists/filter_list.py b/bot/exts/filtering/_filter_lists/filter_list.py
index 938766aca..f3761fbf9 100644
--- a/bot/exts/filtering/_filter_lists/filter_list.py
+++ b/bot/exts/filtering/_filter_lists/filter_list.py
@@ -184,7 +184,9 @@ class FilterList(dict[ListType, AtomicList], typing.Generic[T], FieldRequiring):
"""Return the types of filters used by this list."""
@abstractmethod
- async def actions_for(self, ctx: FilterContext) -> tuple[ActionSettings | None, list[str]]:
+ async def actions_for(
+ self, ctx: FilterContext
+ ) -> tuple[ActionSettings | None, list[str], dict[ListType, list[Filter]]]:
"""Dispatch the given event to the list's filters, and return actions to take and messages to relay to mods."""
def _create_filter(self, filter_data: dict, defaults: Defaults) -> T | None:
diff --git a/bot/exts/filtering/_filter_lists/invite.py b/bot/exts/filtering/_filter_lists/invite.py
index 911b951dd..36031f276 100644
--- a/bot/exts/filtering/_filter_lists/invite.py
+++ b/bot/exts/filtering/_filter_lists/invite.py
@@ -48,7 +48,9 @@ class InviteList(FilterList[InviteFilter]):
"""Return the types of filters used by this list."""
return {InviteFilter}
- async def actions_for(self, ctx: FilterContext) -> tuple[ActionSettings | None, list[str]]:
+ async def actions_for(
+ self, ctx: FilterContext
+ ) -> tuple[ActionSettings | None, list[str], dict[ListType, list[Filter]]]:
"""Dispatch the given event to the list's filters, and return actions to take and messages to relay to mods."""
text = clean_input(ctx.content)
@@ -58,7 +60,8 @@ class InviteList(FilterList[InviteFilter]):
matches = list(DISCORD_INVITE.finditer(text))
invite_codes = {m.group("invite") for m in matches}
if not invite_codes:
- return None, []
+ return None, [], {}
+ all_triggers = {}
_, failed = self[ListType.ALLOW].defaults.validations.evaluate(ctx)
# If the allowed list doesn't operate in the context, unknown invites are allowed.
@@ -99,16 +102,17 @@ class InviteList(FilterList[InviteFilter]):
if check_if_allowed: # Whether unknown invites need to be checked.
new_ctx = ctx.replace(content=guilds_for_inspection)
- allowed = {
- filter_.content for filter_ in self[ListType.ALLOW].filters.values()
+ all_triggers[ListType.ALLOW] = [
+ filter_ for filter_ in self[ListType.ALLOW].filters.values()
if await filter_.triggered_on(new_ctx)
- }
+ ]
+ allowed = {filter_.content for filter_ in all_triggers[ListType.ALLOW]}
unknown_invites.update({
code: invite for code, invite in invites_for_inspection.items() if invite.guild.id not in allowed
})
if not triggered and not unknown_invites:
- return None, []
+ return None, [], all_triggers
actions = None
if unknown_invites: # There are invites which weren't allowed but aren't explicitly blocked.
@@ -119,6 +123,7 @@ class InviteList(FilterList[InviteFilter]):
actions |= self[ListType.DENY].merge_actions(triggered)
else:
actions = self[ListType.DENY].merge_actions(triggered)
+ all_triggers[ListType.DENY] = triggered
blocked_invites |= unknown_invites
ctx.matches += {match[0] for match in matches if match.group("invite") in blocked_invites}
@@ -127,7 +132,7 @@ class InviteList(FilterList[InviteFilter]):
messages += [
f"`{code} - {invite.guild.id}`" if invite else f"`{code}`" for code, invite in unknown_invites.items()
]
- return actions, messages
+ return actions, messages, all_triggers
@staticmethod
def _guild_embed(invite: Invite) -> Embed:
diff --git a/bot/exts/filtering/_filter_lists/token.py b/bot/exts/filtering/_filter_lists/token.py
index 274dc5ea7..e4dbf4717 100644
--- a/bot/exts/filtering/_filter_lists/token.py
+++ b/bot/exts/filtering/_filter_lists/token.py
@@ -43,11 +43,13 @@ class TokensList(FilterList[TokenFilter]):
"""Return the types of filters used by this list."""
return {TokenFilter}
- async def actions_for(self, ctx: FilterContext) -> tuple[ActionSettings | None, list[str]]:
+ async def actions_for(
+ self, ctx: FilterContext
+ ) -> tuple[ActionSettings | None, list[str], dict[ListType, list[Filter]]]:
"""Dispatch the given event to the list's filters, and return actions to take and messages to relay to mods."""
text = ctx.content
if not text:
- return None, []
+ return None, [], {}
if SPOILER_RE.search(text):
text = self._expand_spoilers(text)
text = clean_input(text)
@@ -59,7 +61,7 @@ class TokensList(FilterList[TokenFilter]):
if triggers:
actions = self[ListType.DENY].merge_actions(triggers)
messages = self[ListType.DENY].format_messages(triggers)
- return actions, messages
+ return actions, messages, {ListType.DENY: triggers}
@staticmethod
def _expand_spoilers(text: str) -> str:
diff --git a/bot/exts/filtering/_filter_lists/unique.py b/bot/exts/filtering/_filter_lists/unique.py
index ecc49af87..2cc1b78b2 100644
--- a/bot/exts/filtering/_filter_lists/unique.py
+++ b/bot/exts/filtering/_filter_lists/unique.py
@@ -2,7 +2,7 @@ from botcore.utils.logging import get_logger
from bot.exts.filtering._filter_context import FilterContext
from bot.exts.filtering._filter_lists.filter_list import ListType, UniquesListBase
-from bot.exts.filtering._filters.filter import UniqueFilter
+from bot.exts.filtering._filters.filter import Filter, UniqueFilter
from bot.exts.filtering._filters.unique import unique_filter_types
from bot.exts.filtering._settings import ActionSettings
@@ -29,7 +29,9 @@ class UniquesList(UniquesListBase):
self._already_warned.add(content)
return None
- async def actions_for(self, ctx: FilterContext) -> tuple[ActionSettings | None, list[str]]:
+ async def actions_for(
+ self, ctx: FilterContext
+ ) -> tuple[ActionSettings | None, list[str], dict[ListType, list[Filter]]]:
"""Dispatch the given event to the list's filters, and return actions to take and messages to relay to mods."""
triggers = await self[ListType.DENY].filter_list_result(ctx)
actions = None
@@ -37,4 +39,4 @@ class UniquesList(UniquesListBase):
if triggers:
actions = self[ListType.DENY].merge_actions(triggers)
messages = self[ListType.DENY].format_messages(triggers)
- return actions, messages
+ return actions, messages, {ListType.DENY: triggers}
diff --git a/bot/exts/filtering/filtering.py b/bot/exts/filtering/filtering.py
index 34ca9f6e5..bd1345bc0 100644
--- a/bot/exts/filtering/filtering.py
+++ b/bot/exts/filtering/filtering.py
@@ -175,7 +175,7 @@ class Filtering(Cog):
self.message_cache.append(msg)
ctx = FilterContext(Event.MESSAGE, msg.author, msg.channel, msg.content, msg, msg.embeds)
- result_actions, list_messages = await self._resolve_action(ctx)
+ result_actions, list_messages, _ = await self._resolve_action(ctx)
if result_actions:
await result_actions.action(ctx)
if ctx.send_alert:
@@ -498,31 +498,37 @@ class Filtering(Cog):
await ctx.send(embed=embed)
@filter.command(name="match")
- async def f_match(self, ctx: Context, message: Message | None, *, string: str | None) -> None:
+ async def f_match(
+ self, ctx: Context, no_user: bool | None, message: Message | None, *, string: str | None
+ ) -> None:
"""
Post any responses from the filter lists for the given message or string.
- If there's a message the string will be ignored. Note that if a message is provided, it will go through all
- validations appropriate to where it was sent and who sent it.
+ If there's a `message`, the `string` will be ignored. Note that if a `message` is provided, it will go through
+ all validations appropriate to where it was sent and who sent it. To check for matches regardless of the author
+ (for example if the message was sent by another staff member or yourself) set `no_user` to '1' or 'True'.
- If a string is provided, it will be validated in the context of a user with no roles in python-general.
+ If a `string` is provided, it will be validated in the context of a user with no roles in python-general.
"""
if not message and not string:
- raise BadArgument(":x: Please provide input.")
+ raise BadArgument("Please provide input.")
if message:
+ user = None if no_user else message.author
filter_ctx = FilterContext(
- Event.MESSAGE, message.author, message.channel, message.content, message, message.embeds
+ Event.MESSAGE, user, message.channel, message.content, message, message.embeds
)
else:
filter_ctx = FilterContext(
Event.MESSAGE, None, ctx.guild.get_channel(Channels.python_general), string, None
)
- _, list_messages = await self._resolve_action(filter_ctx)
+ _, _, triggers = await self._resolve_action(filter_ctx)
lines = []
- for filter_list, list_message_list in list_messages.items():
- if list_message_list:
- lines.extend([f"**{filter_list.name.title()}s**", *list_message_list, "\n"])
+ for filter_list, list_triggers in triggers.items():
+ for sublist_type, sublist_triggers in list_triggers.items():
+ if sublist_triggers:
+ triggers_repr = map(str, sublist_triggers)
+ lines.extend([f"**{filter_list[sublist_type].label.title()}s**", *triggers_repr, "\n"])
lines = lines[:-1] # Remove last newline.
embed = Embed(colour=Colour.blue(), title="Match results")
@@ -745,7 +751,9 @@ class Filtering(Cog):
self.filter_lists[list_name] = filter_list_types[list_name](self)
return self.filter_lists[list_name].add_list(list_data)
- async def _resolve_action(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], dict[FilterList, list[str]]]:
+ async def _resolve_action(
+ self, ctx: FilterContext
+ ) -> tuple[Optional[ActionSettings], dict[FilterList, list[str]], dict[FilterList, dict[ListType, list[Filter]]]]:
"""
Return the actions that should be taken for all filter lists in the given context.
@@ -754,8 +762,9 @@ class Filtering(Cog):
"""
actions = []
messages = {}
+ triggers = {}
for filter_list in self._subscriptions[ctx.event]:
- list_actions, list_message = await filter_list.actions_for(ctx)
+ list_actions, list_message, triggers[filter_list] = await filter_list.actions_for(ctx)
if list_actions:
actions.append(list_actions)
if list_message:
@@ -765,7 +774,7 @@ class Filtering(Cog):
if actions:
result_actions = reduce(operator.or_, (action for action in actions))
- return result_actions, messages
+ return result_actions, messages, triggers
async def _send_alert(self, ctx: FilterContext, triggered_filters: dict[FilterList, list[str]]) -> None:
"""Build an alert message from the filter context, and send it via the alert webhook."""