aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--bot/exts/filtering/_filter_context.py4
-rw-r--r--bot/exts/filtering/_filter_lists/extension.py10
-rw-r--r--bot/exts/filtering/_filter_lists/filter_list.py2
-rw-r--r--bot/exts/filtering/_filter_lists/invite.py114
-rw-r--r--bot/exts/filtering/_filter_lists/token.py2
-rw-r--r--bot/exts/filtering/_filters/filter.py4
-rw-r--r--bot/exts/filtering/_filters/invite.py17
-rw-r--r--bot/exts/filtering/_settings_types/infraction_and_notification.py2
-rw-r--r--bot/exts/filtering/filtering.py18
9 files changed, 154 insertions, 19 deletions
diff --git a/bot/exts/filtering/_filter_context.py b/bot/exts/filtering/_filter_context.py
index ad5c8636f..2fec9ce42 100644
--- a/bot/exts/filtering/_filter_context.py
+++ b/bot/exts/filtering/_filter_context.py
@@ -22,13 +22,13 @@ class FilterContext:
event: Event # The type of event
author: User # Who triggered the event
channel: Union[TextChannel, Thread, DMChannel] # The channel involved
- content: Union[str, set[str]] # What actually needs filtering
+ content: Union[str, set] # What actually needs filtering
message: Optional[Message] # The message involved
embeds: list = field(default_factory=list) # Any embeds involved
# Output context
dm_content: str = field(default_factory=str) # The content to DM the invoker
dm_embed: str = field(default_factory=str) # The embed description to DM the invoker
- send_alert: bool = field(default=True) # Whether to send an alert for the moderators
+ send_alert: bool = field(default=False) # Whether to send an alert for the moderators
alert_content: str = field(default_factory=str) # The content of the alert
alert_embeds: list = field(default_factory=list) # Any embeds to add to the alert
action_descriptions: list = field(default_factory=list) # What actions were taken
diff --git a/bot/exts/filtering/_filter_lists/extension.py b/bot/exts/filtering/_filter_lists/extension.py
index c55cda114..ceb8bb958 100644
--- a/bot/exts/filtering/_filter_lists/extension.py
+++ b/bot/exts/filtering/_filter_lists/extension.py
@@ -43,20 +43,22 @@ class ExtensionsList(FilterList):
filtering_cog.subscribe(self, Event.MESSAGE)
self._whitelisted_description = None
- def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]:
+ async def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]:
"""Dispatch the given event to the list's filters, and return actions to take and a message to relay to mods."""
# Return early if the message doesn't have attachments.
if not ctx.message.attachments:
return None, ""
+ _, failed = self.defaults[ListType.ALLOW]["validations"].evaluate(ctx)
+ if failed: # There's no extension filtering in this context.
+ return None, ""
+
# Find all extensions in the message.
all_ext = {
(splitext(attachment.filename.lower())[1], attachment.filename) for attachment in ctx.message.attachments
}
new_ctx = ctx.replace(content={ext for ext, _ in all_ext}) # And prepare the context for the filters to read.
- triggered = self.filter_list_result(
- new_ctx, self.filter_lists[ListType.ALLOW], self.defaults[ListType.ALLOW]["validations"]
- )
+ triggered = [filter_ for filter_ in self.filter_lists[ListType.ALLOW] if filter_.triggered_on(new_ctx)]
allowed_ext = {filter_.content for filter_ in triggered} # Get the extensions in the message that are allowed.
# See if there are any extensions left which aren't allowed.
diff --git a/bot/exts/filtering/_filter_lists/filter_list.py b/bot/exts/filtering/_filter_lists/filter_list.py
index 9fb144354..672811444 100644
--- a/bot/exts/filtering/_filter_lists/filter_list.py
+++ b/bot/exts/filtering/_filter_lists/filter_list.py
@@ -64,7 +64,7 @@ class FilterList(FieldRequiring):
self.filter_lists[list_type] = filters
@abstractmethod
- def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]:
+ async def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]:
"""Dispatch the given event to the list's filters, and return actions to take and a message to relay to mods."""
@staticmethod
diff --git a/bot/exts/filtering/_filter_lists/invite.py b/bot/exts/filtering/_filter_lists/invite.py
new file mode 100644
index 000000000..04afff0f7
--- /dev/null
+++ b/bot/exts/filtering/_filter_lists/invite.py
@@ -0,0 +1,114 @@
+from __future__ import annotations
+
+import typing
+from functools import reduce
+from operator import or_
+from typing import Optional
+
+from botcore.regex import DISCORD_INVITE
+from discord import Embed, Invite
+from discord.errors import NotFound
+
+import bot
+from bot.exts.filtering._filter_context import Event, FilterContext
+from bot.exts.filtering._filter_lists.filter_list import FilterList, ListType
+from bot.exts.filtering._filters.invite import InviteFilter
+from bot.exts.filtering._settings import ActionSettings
+from bot.exts.filtering._utils import clean_input
+
+if typing.TYPE_CHECKING:
+ from bot.exts.filtering.filtering import Filtering
+
+
+class InviteList(FilterList):
+ """A list of filters, each looking for guild invites to a specific guild."""
+
+ name = "invite"
+
+ def __init__(self, filtering_cog: Filtering):
+ super().__init__(InviteFilter)
+ filtering_cog.subscribe(self, Event.MESSAGE)
+
+ async def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]:
+ """Dispatch the given event to the list's filters, and return actions to take and a message to relay to mods."""
+ _, failed = self.defaults[ListType.ALLOW]["validations"].evaluate(ctx)
+ if failed: # There's no invite filtering in this context.
+ return None, ""
+
+ text = clean_input(ctx.content)
+
+ # Avoid escape characters
+ text = text.replace("\\", "")
+
+ matches = list(DISCORD_INVITE.finditer(text))
+ invite_codes = {m.group("invite") for m in matches}
+ if not invite_codes:
+ return None, ""
+
+ # Sort the invites into three categories:
+ denied_by_default = dict() # Denied unless whitelisted.
+ allowed_by_default = dict() # Allowed unless blacklisted (partnered or verified servers).
+ disallowed_invites = dict() # Always denied (invalid invites).
+ for invite_code in invite_codes:
+ try:
+ invite = await bot.instance.fetch_invite(invite_code)
+ except NotFound:
+ disallowed_invites[invite_code] = None
+ else:
+ if not invite.guild:
+ disallowed_invites[invite_code] = invite
+ else:
+ if "PARTNERED" in invite.guild.features or "VERIFIED" in invite.guild.features:
+ allowed_by_default[invite_code] = invite
+ else:
+ denied_by_default[invite_code] = invite
+
+ # Add the disallowed by default unless they're whitelisted.
+ guilds_for_inspection = {invite.guild.id for invite in denied_by_default.values()}
+ new_ctx = ctx.replace(content=guilds_for_inspection)
+ allowed = {filter_.content for filter_ in self.filter_lists[ListType.ALLOW] if filter_.triggered_on(new_ctx)}
+ disallowed_invites.update({
+ invite_code: invite for invite_code, invite in denied_by_default.items() if invite.guild.id not in allowed
+ })
+
+ # Add the allowed by default only if they're blacklisted.
+ guilds_for_inspection = {invite.guild.id for invite in allowed_by_default.values()}
+ new_ctx = ctx.replace(content=guilds_for_inspection)
+ triggered = self.filter_list_result(
+ new_ctx, self.filter_lists[ListType.ALLOW], self.defaults[ListType.DENY]["validations"]
+ )
+ disallowed_invites.update({
+ invite_code: invite for invite_code, invite in allowed_by_default.items()
+ if invite.guild.id in {filter_.content for filter_ in triggered}
+ })
+
+ if not disallowed_invites:
+ return None, ""
+
+ actions = None
+ if len(disallowed_invites) > len(triggered): # There are invites which weren't allowed but aren't blacklisted.
+ actions = reduce(or_, (filter_.actions for filter_ in triggered), self.defaults[ListType.ALLOW]["actions"])
+ elif triggered:
+ actions = reduce(or_, (filter_.actions for filter_ in triggered))
+ ctx.matches += {match[0] for match in matches if match.group("invite") in disallowed_invites}
+ ctx.alert_embeds += (self._guild_embed(invite) for invite in disallowed_invites.values() if invite)
+ return actions, ", ".join(f"`{invite}`" for invite in disallowed_invites)
+
+ @staticmethod
+ def _guild_embed(invite: Invite) -> Embed:
+ """Return an embed representing the guild invites to."""
+ embed = Embed()
+ if invite.guild:
+ embed.title = invite.guild.name
+ embed.set_thumbnail(url=invite.guild.icon.url)
+ embed.set_footer(text=f"Guild ID: {invite.guild.id}")
+ else:
+ embed.title = "Group DM"
+
+ embed.description = (
+ f"**Invite Code:** {invite.code}\n"
+ f"**Members:** {invite.approximate_member_count}\n"
+ f"**Active:** {invite.approximate_presence_count}"
+ )
+
+ return embed
diff --git a/bot/exts/filtering/_filter_lists/token.py b/bot/exts/filtering/_filter_lists/token.py
index d4eb10591..c232b55e5 100644
--- a/bot/exts/filtering/_filter_lists/token.py
+++ b/bot/exts/filtering/_filter_lists/token.py
@@ -27,7 +27,7 @@ class TokensList(FilterList):
super().__init__(TokenFilter)
filtering_cog.subscribe(self, Event.MESSAGE, Event.MESSAGE_EDIT)
- def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]:
+ async def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]:
"""Dispatch the given event to the list's filters, and return actions to take and a message to relay to mods."""
text = ctx.content
if not text:
diff --git a/bot/exts/filtering/_filters/filter.py b/bot/exts/filtering/_filters/filter.py
index e7fff20a6..f1e5eac91 100644
--- a/bot/exts/filtering/_filters/filter.py
+++ b/bot/exts/filtering/_filters/filter.py
@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
-from typing import Dict, Optional
+from typing import Optional
from bot.exts.filtering._filter_context import FilterContext
from bot.exts.filtering._settings import ActionSettings, create_settings
@@ -13,7 +13,7 @@ class Filter(ABC):
and defines what action should be performed if it is triggered.
"""
- def __init__(self, filter_data: Dict, action_defaults: Optional[ActionSettings] = None):
+ def __init__(self, filter_data: dict, action_defaults: Optional[ActionSettings] = None):
self.id = filter_data["id"]
self.content = filter_data["content"]
self.description = filter_data["description"]
diff --git a/bot/exts/filtering/_filters/invite.py b/bot/exts/filtering/_filters/invite.py
new file mode 100644
index 000000000..afe4fdd94
--- /dev/null
+++ b/bot/exts/filtering/_filters/invite.py
@@ -0,0 +1,17 @@
+from typing import Optional
+
+from bot.exts.filtering._filter_context import FilterContext
+from bot.exts.filtering._filters.filter import Filter
+from bot.exts.filtering._settings import ActionSettings
+
+
+class InviteFilter(Filter):
+ """A filter which looks for invites to a specific guild in messages."""
+
+ def __init__(self, filter_data: dict, action_defaults: Optional[ActionSettings] = None):
+ super().__init__(filter_data, action_defaults)
+ self.content = int(self.content)
+
+ def triggered_on(self, ctx: FilterContext) -> bool:
+ """Searches for a guild ID in the context content, given as a set of IDs."""
+ return self.content in ctx.content
diff --git a/bot/exts/filtering/_settings_types/infraction_and_notification.py b/bot/exts/filtering/_settings_types/infraction_and_notification.py
index 68ffa166f..d308bf444 100644
--- a/bot/exts/filtering/_settings_types/infraction_and_notification.py
+++ b/bot/exts/filtering/_settings_types/infraction_and_notification.py
@@ -81,7 +81,7 @@ class InfractionAndNotification(ActionEntry):
if dm_content or dm_embed:
dm_content = f"Hey {ctx.author.mention}!\n{dm_content}"
- dm_embed = Embed(description=dm_embed, colour=Colour.og_blurple())
+ dm_embed = Embed(description=dm_embed, colour=Colour.og_blurple()) if dm_embed else None
try:
await ctx.author.send(dm_content, embed=dm_embed)
diff --git a/bot/exts/filtering/filtering.py b/bot/exts/filtering/filtering.py
index c22e7316f..5eefdf4e4 100644
--- a/bot/exts/filtering/filtering.py
+++ b/bot/exts/filtering/filtering.py
@@ -175,9 +175,7 @@ class Filtering(Cog):
# endregion
# region: helper functions
- async def _resolve_action(
- self, ctx: FilterContext
- ) -> tuple[Optional[ActionSettings], dict[FilterList, str]]:
+ async def _resolve_action(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], dict[FilterList, str]]:
"""
Return the actions that should be taken for all filter lists in the given context.
@@ -187,7 +185,7 @@ class Filtering(Cog):
actions = []
messages = {}
for filter_list in self._subscriptions[ctx.event]:
- list_actions, list_message = filter_list.actions_for(ctx)
+ list_actions, list_message = await filter_list.actions_for(ctx)
if list_actions:
actions.append(list_actions)
if list_message:
@@ -231,7 +229,7 @@ class Filtering(Cog):
embed_content = embed_content[:4000] + " [...]"
embed.description = embed_content
- await self.webhook.send(username=name, content=ctx.alert_content, embeds=[embed, *ctx.alert_embeds])
+ await self.webhook.send(username=name, content=ctx.alert_content, embeds=[embed, *ctx.alert_embeds][:10])
def _get_list_by_name(self, list_name: str) -> FilterList:
"""Get a filter list by its name, or raise an error if there's no such list."""
@@ -248,12 +246,16 @@ class Filtering(Cog):
async def _send_list(self, ctx: Context, list_name: str, list_type: ListType) -> None:
"""Show the list of filters identified by the list name and type."""
filter_list = self._get_list_by_name(list_name)
- lines = list(map(str, filter_list.filter_lists.get(list_type, [])))
+ type_filters = filter_list.filter_lists.get(list_type)
+ if type_filters is None:
+ await ctx.send(f":x: There is no list of {past_tense(list_type.name.lower())} {filter_list.name}s.")
+ return
+
+ lines = list(map(str, type_filters))
log.trace(f"Sending a list of {len(lines)} filters.")
- list_name_plural = list_name + ("s" if not list_name.endswith("s") else "")
embed = Embed(colour=Colour.blue())
- embed.set_author(name=f"List of {past_tense(list_type.name.lower())} {list_name_plural} ({len(lines)} total)")
+ embed.set_author(name=f"List of {past_tense(list_type.name.lower())} {filter_list.name}s ({len(lines)} total)")
await LinePaginator.paginate(lines, ctx, embed, max_lines=15, empty=False)