diff options
-rw-r--r-- | bot/exts/filtering/_filter_context.py | 4 | ||||
-rw-r--r-- | bot/exts/filtering/_filter_lists/extension.py | 10 | ||||
-rw-r--r-- | bot/exts/filtering/_filter_lists/filter_list.py | 2 | ||||
-rw-r--r-- | bot/exts/filtering/_filter_lists/invite.py | 114 | ||||
-rw-r--r-- | bot/exts/filtering/_filter_lists/token.py | 2 | ||||
-rw-r--r-- | bot/exts/filtering/_filters/filter.py | 4 | ||||
-rw-r--r-- | bot/exts/filtering/_filters/invite.py | 17 | ||||
-rw-r--r-- | bot/exts/filtering/_settings_types/infraction_and_notification.py | 2 | ||||
-rw-r--r-- | bot/exts/filtering/filtering.py | 18 |
9 files changed, 154 insertions, 19 deletions
diff --git a/bot/exts/filtering/_filter_context.py b/bot/exts/filtering/_filter_context.py index ad5c8636f..2fec9ce42 100644 --- a/bot/exts/filtering/_filter_context.py +++ b/bot/exts/filtering/_filter_context.py @@ -22,13 +22,13 @@ class FilterContext: event: Event # The type of event author: User # Who triggered the event channel: Union[TextChannel, Thread, DMChannel] # The channel involved - content: Union[str, set[str]] # What actually needs filtering + content: Union[str, set] # What actually needs filtering message: Optional[Message] # The message involved embeds: list = field(default_factory=list) # Any embeds involved # Output context dm_content: str = field(default_factory=str) # The content to DM the invoker dm_embed: str = field(default_factory=str) # The embed description to DM the invoker - send_alert: bool = field(default=True) # Whether to send an alert for the moderators + send_alert: bool = field(default=False) # Whether to send an alert for the moderators alert_content: str = field(default_factory=str) # The content of the alert alert_embeds: list = field(default_factory=list) # Any embeds to add to the alert action_descriptions: list = field(default_factory=list) # What actions were taken diff --git a/bot/exts/filtering/_filter_lists/extension.py b/bot/exts/filtering/_filter_lists/extension.py index c55cda114..ceb8bb958 100644 --- a/bot/exts/filtering/_filter_lists/extension.py +++ b/bot/exts/filtering/_filter_lists/extension.py @@ -43,20 +43,22 @@ class ExtensionsList(FilterList): filtering_cog.subscribe(self, Event.MESSAGE) self._whitelisted_description = None - def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]: + async def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]: """Dispatch the given event to the list's filters, and return actions to take and a message to relay to mods.""" # Return early if the message doesn't have attachments. if not ctx.message.attachments: return None, "" + _, failed = self.defaults[ListType.ALLOW]["validations"].evaluate(ctx) + if failed: # There's no extension filtering in this context. + return None, "" + # Find all extensions in the message. all_ext = { (splitext(attachment.filename.lower())[1], attachment.filename) for attachment in ctx.message.attachments } new_ctx = ctx.replace(content={ext for ext, _ in all_ext}) # And prepare the context for the filters to read. - triggered = self.filter_list_result( - new_ctx, self.filter_lists[ListType.ALLOW], self.defaults[ListType.ALLOW]["validations"] - ) + triggered = [filter_ for filter_ in self.filter_lists[ListType.ALLOW] if filter_.triggered_on(new_ctx)] allowed_ext = {filter_.content for filter_ in triggered} # Get the extensions in the message that are allowed. # See if there are any extensions left which aren't allowed. diff --git a/bot/exts/filtering/_filter_lists/filter_list.py b/bot/exts/filtering/_filter_lists/filter_list.py index 9fb144354..672811444 100644 --- a/bot/exts/filtering/_filter_lists/filter_list.py +++ b/bot/exts/filtering/_filter_lists/filter_list.py @@ -64,7 +64,7 @@ class FilterList(FieldRequiring): self.filter_lists[list_type] = filters @abstractmethod - def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]: + async def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]: """Dispatch the given event to the list's filters, and return actions to take and a message to relay to mods.""" @staticmethod diff --git a/bot/exts/filtering/_filter_lists/invite.py b/bot/exts/filtering/_filter_lists/invite.py new file mode 100644 index 000000000..04afff0f7 --- /dev/null +++ b/bot/exts/filtering/_filter_lists/invite.py @@ -0,0 +1,114 @@ +from __future__ import annotations + +import typing +from functools import reduce +from operator import or_ +from typing import Optional + +from botcore.regex import DISCORD_INVITE +from discord import Embed, Invite +from discord.errors import NotFound + +import bot +from bot.exts.filtering._filter_context import Event, FilterContext +from bot.exts.filtering._filter_lists.filter_list import FilterList, ListType +from bot.exts.filtering._filters.invite import InviteFilter +from bot.exts.filtering._settings import ActionSettings +from bot.exts.filtering._utils import clean_input + +if typing.TYPE_CHECKING: + from bot.exts.filtering.filtering import Filtering + + +class InviteList(FilterList): + """A list of filters, each looking for guild invites to a specific guild.""" + + name = "invite" + + def __init__(self, filtering_cog: Filtering): + super().__init__(InviteFilter) + filtering_cog.subscribe(self, Event.MESSAGE) + + async def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]: + """Dispatch the given event to the list's filters, and return actions to take and a message to relay to mods.""" + _, failed = self.defaults[ListType.ALLOW]["validations"].evaluate(ctx) + if failed: # There's no invite filtering in this context. + return None, "" + + text = clean_input(ctx.content) + + # Avoid escape characters + text = text.replace("\\", "") + + matches = list(DISCORD_INVITE.finditer(text)) + invite_codes = {m.group("invite") for m in matches} + if not invite_codes: + return None, "" + + # Sort the invites into three categories: + denied_by_default = dict() # Denied unless whitelisted. + allowed_by_default = dict() # Allowed unless blacklisted (partnered or verified servers). + disallowed_invites = dict() # Always denied (invalid invites). + for invite_code in invite_codes: + try: + invite = await bot.instance.fetch_invite(invite_code) + except NotFound: + disallowed_invites[invite_code] = None + else: + if not invite.guild: + disallowed_invites[invite_code] = invite + else: + if "PARTNERED" in invite.guild.features or "VERIFIED" in invite.guild.features: + allowed_by_default[invite_code] = invite + else: + denied_by_default[invite_code] = invite + + # Add the disallowed by default unless they're whitelisted. + guilds_for_inspection = {invite.guild.id for invite in denied_by_default.values()} + new_ctx = ctx.replace(content=guilds_for_inspection) + allowed = {filter_.content for filter_ in self.filter_lists[ListType.ALLOW] if filter_.triggered_on(new_ctx)} + disallowed_invites.update({ + invite_code: invite for invite_code, invite in denied_by_default.items() if invite.guild.id not in allowed + }) + + # Add the allowed by default only if they're blacklisted. + guilds_for_inspection = {invite.guild.id for invite in allowed_by_default.values()} + new_ctx = ctx.replace(content=guilds_for_inspection) + triggered = self.filter_list_result( + new_ctx, self.filter_lists[ListType.ALLOW], self.defaults[ListType.DENY]["validations"] + ) + disallowed_invites.update({ + invite_code: invite for invite_code, invite in allowed_by_default.items() + if invite.guild.id in {filter_.content for filter_ in triggered} + }) + + if not disallowed_invites: + return None, "" + + actions = None + if len(disallowed_invites) > len(triggered): # There are invites which weren't allowed but aren't blacklisted. + actions = reduce(or_, (filter_.actions for filter_ in triggered), self.defaults[ListType.ALLOW]["actions"]) + elif triggered: + actions = reduce(or_, (filter_.actions for filter_ in triggered)) + ctx.matches += {match[0] for match in matches if match.group("invite") in disallowed_invites} + ctx.alert_embeds += (self._guild_embed(invite) for invite in disallowed_invites.values() if invite) + return actions, ", ".join(f"`{invite}`" for invite in disallowed_invites) + + @staticmethod + def _guild_embed(invite: Invite) -> Embed: + """Return an embed representing the guild invites to.""" + embed = Embed() + if invite.guild: + embed.title = invite.guild.name + embed.set_thumbnail(url=invite.guild.icon.url) + embed.set_footer(text=f"Guild ID: {invite.guild.id}") + else: + embed.title = "Group DM" + + embed.description = ( + f"**Invite Code:** {invite.code}\n" + f"**Members:** {invite.approximate_member_count}\n" + f"**Active:** {invite.approximate_presence_count}" + ) + + return embed diff --git a/bot/exts/filtering/_filter_lists/token.py b/bot/exts/filtering/_filter_lists/token.py index d4eb10591..c232b55e5 100644 --- a/bot/exts/filtering/_filter_lists/token.py +++ b/bot/exts/filtering/_filter_lists/token.py @@ -27,7 +27,7 @@ class TokensList(FilterList): super().__init__(TokenFilter) filtering_cog.subscribe(self, Event.MESSAGE, Event.MESSAGE_EDIT) - def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]: + async def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]: """Dispatch the given event to the list's filters, and return actions to take and a message to relay to mods.""" text = ctx.content if not text: diff --git a/bot/exts/filtering/_filters/filter.py b/bot/exts/filtering/_filters/filter.py index e7fff20a6..f1e5eac91 100644 --- a/bot/exts/filtering/_filters/filter.py +++ b/bot/exts/filtering/_filters/filter.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Dict, Optional +from typing import Optional from bot.exts.filtering._filter_context import FilterContext from bot.exts.filtering._settings import ActionSettings, create_settings @@ -13,7 +13,7 @@ class Filter(ABC): and defines what action should be performed if it is triggered. """ - def __init__(self, filter_data: Dict, action_defaults: Optional[ActionSettings] = None): + def __init__(self, filter_data: dict, action_defaults: Optional[ActionSettings] = None): self.id = filter_data["id"] self.content = filter_data["content"] self.description = filter_data["description"] diff --git a/bot/exts/filtering/_filters/invite.py b/bot/exts/filtering/_filters/invite.py new file mode 100644 index 000000000..afe4fdd94 --- /dev/null +++ b/bot/exts/filtering/_filters/invite.py @@ -0,0 +1,17 @@ +from typing import Optional + +from bot.exts.filtering._filter_context import FilterContext +from bot.exts.filtering._filters.filter import Filter +from bot.exts.filtering._settings import ActionSettings + + +class InviteFilter(Filter): + """A filter which looks for invites to a specific guild in messages.""" + + def __init__(self, filter_data: dict, action_defaults: Optional[ActionSettings] = None): + super().__init__(filter_data, action_defaults) + self.content = int(self.content) + + def triggered_on(self, ctx: FilterContext) -> bool: + """Searches for a guild ID in the context content, given as a set of IDs.""" + return self.content in ctx.content diff --git a/bot/exts/filtering/_settings_types/infraction_and_notification.py b/bot/exts/filtering/_settings_types/infraction_and_notification.py index 68ffa166f..d308bf444 100644 --- a/bot/exts/filtering/_settings_types/infraction_and_notification.py +++ b/bot/exts/filtering/_settings_types/infraction_and_notification.py @@ -81,7 +81,7 @@ class InfractionAndNotification(ActionEntry): if dm_content or dm_embed: dm_content = f"Hey {ctx.author.mention}!\n{dm_content}" - dm_embed = Embed(description=dm_embed, colour=Colour.og_blurple()) + dm_embed = Embed(description=dm_embed, colour=Colour.og_blurple()) if dm_embed else None try: await ctx.author.send(dm_content, embed=dm_embed) diff --git a/bot/exts/filtering/filtering.py b/bot/exts/filtering/filtering.py index c22e7316f..5eefdf4e4 100644 --- a/bot/exts/filtering/filtering.py +++ b/bot/exts/filtering/filtering.py @@ -175,9 +175,7 @@ class Filtering(Cog): # endregion # region: helper functions - async def _resolve_action( - self, ctx: FilterContext - ) -> tuple[Optional[ActionSettings], dict[FilterList, str]]: + async def _resolve_action(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], dict[FilterList, str]]: """ Return the actions that should be taken for all filter lists in the given context. @@ -187,7 +185,7 @@ class Filtering(Cog): actions = [] messages = {} for filter_list in self._subscriptions[ctx.event]: - list_actions, list_message = filter_list.actions_for(ctx) + list_actions, list_message = await filter_list.actions_for(ctx) if list_actions: actions.append(list_actions) if list_message: @@ -231,7 +229,7 @@ class Filtering(Cog): embed_content = embed_content[:4000] + " [...]" embed.description = embed_content - await self.webhook.send(username=name, content=ctx.alert_content, embeds=[embed, *ctx.alert_embeds]) + await self.webhook.send(username=name, content=ctx.alert_content, embeds=[embed, *ctx.alert_embeds][:10]) def _get_list_by_name(self, list_name: str) -> FilterList: """Get a filter list by its name, or raise an error if there's no such list.""" @@ -248,12 +246,16 @@ class Filtering(Cog): async def _send_list(self, ctx: Context, list_name: str, list_type: ListType) -> None: """Show the list of filters identified by the list name and type.""" filter_list = self._get_list_by_name(list_name) - lines = list(map(str, filter_list.filter_lists.get(list_type, []))) + type_filters = filter_list.filter_lists.get(list_type) + if type_filters is None: + await ctx.send(f":x: There is no list of {past_tense(list_type.name.lower())} {filter_list.name}s.") + return + + lines = list(map(str, type_filters)) log.trace(f"Sending a list of {len(lines)} filters.") - list_name_plural = list_name + ("s" if not list_name.endswith("s") else "") embed = Embed(colour=Colour.blue()) - embed.set_author(name=f"List of {past_tense(list_type.name.lower())} {list_name_plural} ({len(lines)} total)") + embed.set_author(name=f"List of {past_tense(list_type.name.lower())} {filter_list.name}s ({len(lines)} total)") await LinePaginator.paginate(lines, ctx, embed, max_lines=15, empty=False) |