diff options
author | 2022-02-26 22:03:56 +0200 | |
---|---|---|
committer | 2022-07-16 02:09:08 +0300 | |
commit | d763305d02a1bfd3cae1af3533837dc9dfb7966b (patch) | |
tree | 2b180c3d182162d1d51e813510ca95826953eec7 | |
parent | Add file extension filtering (diff) |
Add guild invite filtering
This commit adds the invite filtering implementation to the new system.
This also fixes an issue with the implementation of the extension filtering, where there was no way to tell the bot to ignore a user when they posted a non-whitelisted file extension, since there's no relevant filter in this scenario.
Instead the extensions and invites filters now use the whitelist validation defaults to dictate when filtering should be done at all. For example, if the list validations are to ignore Helpers, then no invite filtering will occur no matter what.
The meaning of this is that the system is somewhat less configurable, because settings overrides in filters belonging to a whitelist are meaningless.
Additionally this commit contains the following fixes:
- If the user tries to show the filters in a list which doesn't exist, it says it doesn't exist instead of saying there are 0 filters.
- The filter context content is `Union[str, set]` instead of `Union[str, set[str]]`.
- An empty embed will no longer be created when `dm_embed` is empty.
-rw-r--r-- | bot/exts/filtering/_filter_context.py | 4 | ||||
-rw-r--r-- | bot/exts/filtering/_filter_lists/extension.py | 10 | ||||
-rw-r--r-- | bot/exts/filtering/_filter_lists/filter_list.py | 2 | ||||
-rw-r--r-- | bot/exts/filtering/_filter_lists/invite.py | 114 | ||||
-rw-r--r-- | bot/exts/filtering/_filter_lists/token.py | 2 | ||||
-rw-r--r-- | bot/exts/filtering/_filters/filter.py | 4 | ||||
-rw-r--r-- | bot/exts/filtering/_filters/invite.py | 17 | ||||
-rw-r--r-- | bot/exts/filtering/_settings_types/infraction_and_notification.py | 2 | ||||
-rw-r--r-- | bot/exts/filtering/filtering.py | 18 |
9 files changed, 154 insertions, 19 deletions
diff --git a/bot/exts/filtering/_filter_context.py b/bot/exts/filtering/_filter_context.py index ad5c8636f..2fec9ce42 100644 --- a/bot/exts/filtering/_filter_context.py +++ b/bot/exts/filtering/_filter_context.py @@ -22,13 +22,13 @@ class FilterContext: event: Event # The type of event author: User # Who triggered the event channel: Union[TextChannel, Thread, DMChannel] # The channel involved - content: Union[str, set[str]] # What actually needs filtering + content: Union[str, set] # What actually needs filtering message: Optional[Message] # The message involved embeds: list = field(default_factory=list) # Any embeds involved # Output context dm_content: str = field(default_factory=str) # The content to DM the invoker dm_embed: str = field(default_factory=str) # The embed description to DM the invoker - send_alert: bool = field(default=True) # Whether to send an alert for the moderators + send_alert: bool = field(default=False) # Whether to send an alert for the moderators alert_content: str = field(default_factory=str) # The content of the alert alert_embeds: list = field(default_factory=list) # Any embeds to add to the alert action_descriptions: list = field(default_factory=list) # What actions were taken diff --git a/bot/exts/filtering/_filter_lists/extension.py b/bot/exts/filtering/_filter_lists/extension.py index c55cda114..ceb8bb958 100644 --- a/bot/exts/filtering/_filter_lists/extension.py +++ b/bot/exts/filtering/_filter_lists/extension.py @@ -43,20 +43,22 @@ class ExtensionsList(FilterList): filtering_cog.subscribe(self, Event.MESSAGE) self._whitelisted_description = None - def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]: + async def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]: """Dispatch the given event to the list's filters, and return actions to take and a message to relay to mods.""" # Return early if the message doesn't have attachments. if not ctx.message.attachments: return None, "" + _, failed = self.defaults[ListType.ALLOW]["validations"].evaluate(ctx) + if failed: # There's no extension filtering in this context. + return None, "" + # Find all extensions in the message. all_ext = { (splitext(attachment.filename.lower())[1], attachment.filename) for attachment in ctx.message.attachments } new_ctx = ctx.replace(content={ext for ext, _ in all_ext}) # And prepare the context for the filters to read. - triggered = self.filter_list_result( - new_ctx, self.filter_lists[ListType.ALLOW], self.defaults[ListType.ALLOW]["validations"] - ) + triggered = [filter_ for filter_ in self.filter_lists[ListType.ALLOW] if filter_.triggered_on(new_ctx)] allowed_ext = {filter_.content for filter_ in triggered} # Get the extensions in the message that are allowed. # See if there are any extensions left which aren't allowed. diff --git a/bot/exts/filtering/_filter_lists/filter_list.py b/bot/exts/filtering/_filter_lists/filter_list.py index 9fb144354..672811444 100644 --- a/bot/exts/filtering/_filter_lists/filter_list.py +++ b/bot/exts/filtering/_filter_lists/filter_list.py @@ -64,7 +64,7 @@ class FilterList(FieldRequiring): self.filter_lists[list_type] = filters @abstractmethod - def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]: + async def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]: """Dispatch the given event to the list's filters, and return actions to take and a message to relay to mods.""" @staticmethod diff --git a/bot/exts/filtering/_filter_lists/invite.py b/bot/exts/filtering/_filter_lists/invite.py new file mode 100644 index 000000000..04afff0f7 --- /dev/null +++ b/bot/exts/filtering/_filter_lists/invite.py @@ -0,0 +1,114 @@ +from __future__ import annotations + +import typing +from functools import reduce +from operator import or_ +from typing import Optional + +from botcore.regex import DISCORD_INVITE +from discord import Embed, Invite +from discord.errors import NotFound + +import bot +from bot.exts.filtering._filter_context import Event, FilterContext +from bot.exts.filtering._filter_lists.filter_list import FilterList, ListType +from bot.exts.filtering._filters.invite import InviteFilter +from bot.exts.filtering._settings import ActionSettings +from bot.exts.filtering._utils import clean_input + +if typing.TYPE_CHECKING: + from bot.exts.filtering.filtering import Filtering + + +class InviteList(FilterList): + """A list of filters, each looking for guild invites to a specific guild.""" + + name = "invite" + + def __init__(self, filtering_cog: Filtering): + super().__init__(InviteFilter) + filtering_cog.subscribe(self, Event.MESSAGE) + + async def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]: + """Dispatch the given event to the list's filters, and return actions to take and a message to relay to mods.""" + _, failed = self.defaults[ListType.ALLOW]["validations"].evaluate(ctx) + if failed: # There's no invite filtering in this context. + return None, "" + + text = clean_input(ctx.content) + + # Avoid escape characters + text = text.replace("\\", "") + + matches = list(DISCORD_INVITE.finditer(text)) + invite_codes = {m.group("invite") for m in matches} + if not invite_codes: + return None, "" + + # Sort the invites into three categories: + denied_by_default = dict() # Denied unless whitelisted. + allowed_by_default = dict() # Allowed unless blacklisted (partnered or verified servers). + disallowed_invites = dict() # Always denied (invalid invites). + for invite_code in invite_codes: + try: + invite = await bot.instance.fetch_invite(invite_code) + except NotFound: + disallowed_invites[invite_code] = None + else: + if not invite.guild: + disallowed_invites[invite_code] = invite + else: + if "PARTNERED" in invite.guild.features or "VERIFIED" in invite.guild.features: + allowed_by_default[invite_code] = invite + else: + denied_by_default[invite_code] = invite + + # Add the disallowed by default unless they're whitelisted. + guilds_for_inspection = {invite.guild.id for invite in denied_by_default.values()} + new_ctx = ctx.replace(content=guilds_for_inspection) + allowed = {filter_.content for filter_ in self.filter_lists[ListType.ALLOW] if filter_.triggered_on(new_ctx)} + disallowed_invites.update({ + invite_code: invite for invite_code, invite in denied_by_default.items() if invite.guild.id not in allowed + }) + + # Add the allowed by default only if they're blacklisted. + guilds_for_inspection = {invite.guild.id for invite in allowed_by_default.values()} + new_ctx = ctx.replace(content=guilds_for_inspection) + triggered = self.filter_list_result( + new_ctx, self.filter_lists[ListType.ALLOW], self.defaults[ListType.DENY]["validations"] + ) + disallowed_invites.update({ + invite_code: invite for invite_code, invite in allowed_by_default.items() + if invite.guild.id in {filter_.content for filter_ in triggered} + }) + + if not disallowed_invites: + return None, "" + + actions = None + if len(disallowed_invites) > len(triggered): # There are invites which weren't allowed but aren't blacklisted. + actions = reduce(or_, (filter_.actions for filter_ in triggered), self.defaults[ListType.ALLOW]["actions"]) + elif triggered: + actions = reduce(or_, (filter_.actions for filter_ in triggered)) + ctx.matches += {match[0] for match in matches if match.group("invite") in disallowed_invites} + ctx.alert_embeds += (self._guild_embed(invite) for invite in disallowed_invites.values() if invite) + return actions, ", ".join(f"`{invite}`" for invite in disallowed_invites) + + @staticmethod + def _guild_embed(invite: Invite) -> Embed: + """Return an embed representing the guild invites to.""" + embed = Embed() + if invite.guild: + embed.title = invite.guild.name + embed.set_thumbnail(url=invite.guild.icon.url) + embed.set_footer(text=f"Guild ID: {invite.guild.id}") + else: + embed.title = "Group DM" + + embed.description = ( + f"**Invite Code:** {invite.code}\n" + f"**Members:** {invite.approximate_member_count}\n" + f"**Active:** {invite.approximate_presence_count}" + ) + + return embed diff --git a/bot/exts/filtering/_filter_lists/token.py b/bot/exts/filtering/_filter_lists/token.py index d4eb10591..c232b55e5 100644 --- a/bot/exts/filtering/_filter_lists/token.py +++ b/bot/exts/filtering/_filter_lists/token.py @@ -27,7 +27,7 @@ class TokensList(FilterList): super().__init__(TokenFilter) filtering_cog.subscribe(self, Event.MESSAGE, Event.MESSAGE_EDIT) - def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]: + async def actions_for(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], Optional[str]]: """Dispatch the given event to the list's filters, and return actions to take and a message to relay to mods.""" text = ctx.content if not text: diff --git a/bot/exts/filtering/_filters/filter.py b/bot/exts/filtering/_filters/filter.py index e7fff20a6..f1e5eac91 100644 --- a/bot/exts/filtering/_filters/filter.py +++ b/bot/exts/filtering/_filters/filter.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Dict, Optional +from typing import Optional from bot.exts.filtering._filter_context import FilterContext from bot.exts.filtering._settings import ActionSettings, create_settings @@ -13,7 +13,7 @@ class Filter(ABC): and defines what action should be performed if it is triggered. """ - def __init__(self, filter_data: Dict, action_defaults: Optional[ActionSettings] = None): + def __init__(self, filter_data: dict, action_defaults: Optional[ActionSettings] = None): self.id = filter_data["id"] self.content = filter_data["content"] self.description = filter_data["description"] diff --git a/bot/exts/filtering/_filters/invite.py b/bot/exts/filtering/_filters/invite.py new file mode 100644 index 000000000..afe4fdd94 --- /dev/null +++ b/bot/exts/filtering/_filters/invite.py @@ -0,0 +1,17 @@ +from typing import Optional + +from bot.exts.filtering._filter_context import FilterContext +from bot.exts.filtering._filters.filter import Filter +from bot.exts.filtering._settings import ActionSettings + + +class InviteFilter(Filter): + """A filter which looks for invites to a specific guild in messages.""" + + def __init__(self, filter_data: dict, action_defaults: Optional[ActionSettings] = None): + super().__init__(filter_data, action_defaults) + self.content = int(self.content) + + def triggered_on(self, ctx: FilterContext) -> bool: + """Searches for a guild ID in the context content, given as a set of IDs.""" + return self.content in ctx.content diff --git a/bot/exts/filtering/_settings_types/infraction_and_notification.py b/bot/exts/filtering/_settings_types/infraction_and_notification.py index 68ffa166f..d308bf444 100644 --- a/bot/exts/filtering/_settings_types/infraction_and_notification.py +++ b/bot/exts/filtering/_settings_types/infraction_and_notification.py @@ -81,7 +81,7 @@ class InfractionAndNotification(ActionEntry): if dm_content or dm_embed: dm_content = f"Hey {ctx.author.mention}!\n{dm_content}" - dm_embed = Embed(description=dm_embed, colour=Colour.og_blurple()) + dm_embed = Embed(description=dm_embed, colour=Colour.og_blurple()) if dm_embed else None try: await ctx.author.send(dm_content, embed=dm_embed) diff --git a/bot/exts/filtering/filtering.py b/bot/exts/filtering/filtering.py index c22e7316f..5eefdf4e4 100644 --- a/bot/exts/filtering/filtering.py +++ b/bot/exts/filtering/filtering.py @@ -175,9 +175,7 @@ class Filtering(Cog): # endregion # region: helper functions - async def _resolve_action( - self, ctx: FilterContext - ) -> tuple[Optional[ActionSettings], dict[FilterList, str]]: + async def _resolve_action(self, ctx: FilterContext) -> tuple[Optional[ActionSettings], dict[FilterList, str]]: """ Return the actions that should be taken for all filter lists in the given context. @@ -187,7 +185,7 @@ class Filtering(Cog): actions = [] messages = {} for filter_list in self._subscriptions[ctx.event]: - list_actions, list_message = filter_list.actions_for(ctx) + list_actions, list_message = await filter_list.actions_for(ctx) if list_actions: actions.append(list_actions) if list_message: @@ -231,7 +229,7 @@ class Filtering(Cog): embed_content = embed_content[:4000] + " [...]" embed.description = embed_content - await self.webhook.send(username=name, content=ctx.alert_content, embeds=[embed, *ctx.alert_embeds]) + await self.webhook.send(username=name, content=ctx.alert_content, embeds=[embed, *ctx.alert_embeds][:10]) def _get_list_by_name(self, list_name: str) -> FilterList: """Get a filter list by its name, or raise an error if there's no such list.""" @@ -248,12 +246,16 @@ class Filtering(Cog): async def _send_list(self, ctx: Context, list_name: str, list_type: ListType) -> None: """Show the list of filters identified by the list name and type.""" filter_list = self._get_list_by_name(list_name) - lines = list(map(str, filter_list.filter_lists.get(list_type, []))) + type_filters = filter_list.filter_lists.get(list_type) + if type_filters is None: + await ctx.send(f":x: There is no list of {past_tense(list_type.name.lower())} {filter_list.name}s.") + return + + lines = list(map(str, type_filters)) log.trace(f"Sending a list of {len(lines)} filters.") - list_name_plural = list_name + ("s" if not list_name.endswith("s") else "") embed = Embed(colour=Colour.blue()) - embed.set_author(name=f"List of {past_tense(list_type.name.lower())} {list_name_plural} ({len(lines)} total)") + embed.set_author(name=f"List of {past_tense(list_type.name.lower())} {filter_list.name}s ({len(lines)} total)") await LinePaginator.paginate(lines, ctx, embed, max_lines=15, empty=False) |