diff options
author | 2022-09-29 22:11:50 +0300 | |
---|---|---|
committer | 2022-09-29 22:11:50 +0300 | |
commit | f10d3de8699ae52e656396e9fdac0f6f37f2fe47 (patch) | |
tree | 7ddd2d60046537f26594214e5f61678ca817723c | |
parent | Merge branch 'main' into new-filters (diff) |
Filter adding commands, simplify infraction
This commit adds the mechanic to add new filters.
The overrides of the settings can be provided in the command itself, but also through a UI made of Discord components.
The UI adjusts itself to the data type of the setting, e.g a boolean setting will invoke a select with "True" and "False" options.
This commit additionally gets rid of the mechanic to apply a superstar alongside another higher tier infraction for the same message. This is an edge case that isn't worth the complexity at the moment.
Includes a small fix to the Ping setting which made the __or__ method malfunction.
-rw-r--r-- | bot/exts/filtering/_filter_lists/domain.py | 4 | ||||
-rw-r--r-- | bot/exts/filtering/_filter_lists/extension.py | 4 | ||||
-rw-r--r-- | bot/exts/filtering/_filter_lists/filter_list.py | 24 | ||||
-rw-r--r-- | bot/exts/filtering/_filter_lists/invite.py | 4 | ||||
-rw-r--r-- | bot/exts/filtering/_filter_lists/token.py | 4 | ||||
-rw-r--r-- | bot/exts/filtering/_filters/filter.py | 16 | ||||
-rw-r--r-- | bot/exts/filtering/_settings_types/infraction_and_notification.py | 95 | ||||
-rw-r--r-- | bot/exts/filtering/_settings_types/ping.py | 2 | ||||
-rw-r--r-- | bot/exts/filtering/_ui.py | 621 | ||||
-rw-r--r-- | bot/exts/filtering/filtering.py | 217 |
10 files changed, 869 insertions, 122 deletions
diff --git a/bot/exts/filtering/_filter_lists/domain.py b/bot/exts/filtering/_filter_lists/domain.py index 7f92b62e8..6e12dad41 100644 --- a/bot/exts/filtering/_filter_lists/domain.py +++ b/bot/exts/filtering/_filter_lists/domain.py @@ -36,6 +36,10 @@ class DomainsList(FilterList): super().__init__(DomainFilter) filtering_cog.subscribe(self, Event.MESSAGE, Event.MESSAGE_EDIT) + def get_filter_type(self, content: str) -> Type[Filter]: + """Get a subclass of filter matching the filter list and the filter's content.""" + return DomainFilter + @property def filter_types(self) -> set[Type[Filter]]: """Return the types of filters used by this list.""" diff --git a/bot/exts/filtering/_filter_lists/extension.py b/bot/exts/filtering/_filter_lists/extension.py index 2447bebde..e34ead393 100644 --- a/bot/exts/filtering/_filter_lists/extension.py +++ b/bot/exts/filtering/_filter_lists/extension.py @@ -53,6 +53,10 @@ class ExtensionsList(FilterList): filtering_cog.subscribe(self, Event.MESSAGE) self._whitelisted_description = None + def get_filter_type(self, content: str) -> Type[Filter]: + """Get a subclass of filter matching the filter list and the filter's content.""" + return ExtensionFilter + @property def filter_types(self) -> set[Type[Filter]]: """Return the types of filters used by this list.""" diff --git a/bot/exts/filtering/_filter_lists/filter_list.py b/bot/exts/filtering/_filter_lists/filter_list.py index 3b5138fe4..c34f46878 100644 --- a/bot/exts/filtering/_filter_lists/filter_list.py +++ b/bot/exts/filtering/_filter_lists/filter_list.py @@ -44,6 +44,7 @@ class FilterList(FieldRequiring): name = FieldRequiring.MUST_SET_UNIQUE def __init__(self, filter_type: Type[Filter]): + self.list_ids = {} self.filter_lists: dict[ListType, dict[int, Filter]] = {} self.defaults = {} @@ -54,14 +55,25 @@ class FilterList(FieldRequiring): actions, validations = create_settings(list_data["settings"], keep_empty=True) list_type = ListType(list_data["list_type"]) self.defaults[list_type] = {"actions": actions, "validations": validations} + self.list_ids[list_type] = list_data["id"] - filters = {} + self.filter_lists[list_type] = {} for filter_data in list_data["filters"]: - try: - filters[filter_data["id"]] = self.filter_type(filter_data) - except TypeError as e: - log.warning(e) - self.filter_lists[list_type] = filters + self.add_filter(filter_data, list_type) + + def add_filter(self, filter_data: dict, list_type: ListType) -> Filter: + """Add a filter to the list of the specified type.""" + try: + new_filter = self.filter_type(filter_data) + self.filter_lists[list_type][filter_data["id"]] = new_filter + except TypeError as e: + log.warning(e) + else: + return new_filter + + @abstractmethod + def get_filter_type(self, content: str) -> Type[Filter]: + """Get a subclass of filter matching the filter list and the filter's content.""" @property @abstractmethod diff --git a/bot/exts/filtering/_filter_lists/invite.py b/bot/exts/filtering/_filter_lists/invite.py index 4e8d74d8a..095699597 100644 --- a/bot/exts/filtering/_filter_lists/invite.py +++ b/bot/exts/filtering/_filter_lists/invite.py @@ -42,6 +42,10 @@ class InviteList(FilterList): super().__init__(InviteFilter) filtering_cog.subscribe(self, Event.MESSAGE) + def get_filter_type(self, content: str) -> Type[Filter]: + """Get a subclass of filter matching the filter list and the filter's content.""" + return InviteFilter + @property def filter_types(self) -> set[Type[Filter]]: """Return the types of filters used by this list.""" diff --git a/bot/exts/filtering/_filter_lists/token.py b/bot/exts/filtering/_filter_lists/token.py index c989b06b9..aca0fdedf 100644 --- a/bot/exts/filtering/_filter_lists/token.py +++ b/bot/exts/filtering/_filter_lists/token.py @@ -37,6 +37,10 @@ class TokensList(FilterList): super().__init__(TokenFilter) filtering_cog.subscribe(self, Event.MESSAGE, Event.MESSAGE_EDIT) + def get_filter_type(self, content: str) -> Type[Filter]: + """Get a subclass of filter matching the filter list and the filter's content.""" + return TokenFilter + @property def filter_types(self) -> set[Type[Filter]]: """Return the types of filters used by this list.""" diff --git a/bot/exts/filtering/_filters/filter.py b/bot/exts/filtering/_filters/filter.py index da149dce6..92393871a 100644 --- a/bot/exts/filtering/_filters/filter.py +++ b/bot/exts/filtering/_filters/filter.py @@ -1,4 +1,7 @@ from abc import abstractmethod +from typing import Optional + +from pydantic import ValidationError from bot.exts.filtering._filter_context import FilterContext from bot.exts.filtering._settings import create_settings @@ -32,6 +35,19 @@ class Filter(FieldRequiring): def triggered_on(self, ctx: FilterContext) -> bool: """Search for the filter's content within a given context.""" + @classmethod + def validate_filter_settings(cls, extra_fields: dict) -> tuple[bool, Optional[str]]: + """Validate whether the supplied fields are valid for the filter, and provide the error message if not.""" + if cls.extra_fields_type is None: + return True, None + + try: + cls.extra_fields_type(**extra_fields) + except ValidationError as e: + return False, repr(e) + else: + return True, None + def __str__(self) -> str: """A string representation of the filter.""" string = f"#{self.id}. `{self.content}`" diff --git a/bot/exts/filtering/_settings_types/infraction_and_notification.py b/bot/exts/filtering/_settings_types/infraction_and_notification.py index 9c7d7b8ff..c1fc00760 100644 --- a/bot/exts/filtering/_settings_types/infraction_and_notification.py +++ b/bot/exts/filtering/_settings_types/infraction_and_notification.py @@ -1,7 +1,6 @@ -from collections import namedtuple from datetime import timedelta from enum import Enum, auto -from typing import ClassVar, Optional +from typing import ClassVar import arrow from discord import Colour, Embed @@ -25,25 +24,11 @@ class Infraction(Enum): WARNING = auto() WATCH = auto() NOTE = auto() - NONE = auto() # Allows making operations on an entry with no infraction without checking for None. - - def __bool__(self) -> bool: - """ - Make the NONE value false-y. - - This is useful for Settings.create to evaluate whether the entry contains anything. - """ - return self != Infraction.NONE def __str__(self) -> str: - if self == Infraction.NONE: - return "" return self.name -superstar = namedtuple("superstar", ["reason", "duration"]) - - class InfractionAndNotification(ActionEntry): """ A setting entry which specifies what infraction to issue and the notification to DM the user. @@ -66,23 +51,22 @@ class InfractionAndNotification(ActionEntry): "dm_embed": "The contents of the embed to be DMed to the offending user." } - dm_content: str - dm_embed: str - infraction_type: Optional[Infraction] - infraction_reason: Optional[str] - infraction_duration: Optional[float] - superstar: Optional[superstar] = None + dm_content: str | None + dm_embed: str | None + infraction_type: Infraction | None + infraction_reason: str | None + infraction_duration: float | None @validator("infraction_type", pre=True) @classmethod def convert_infraction_name(cls, infr_type: str) -> Infraction: """Convert the string to an Infraction by name.""" - return Infraction[infr_type.replace(" ", "_").upper()] if infr_type else Infraction.NONE + return Infraction[infr_type.replace(" ", "_").upper()] if infr_type else None async def action(self, ctx: FilterContext) -> None: """Send the notification to the user, and apply any specified infractions.""" # If there is no infraction to apply, any DM contents already provided in the context take precedence. - if self.infraction_type == Infraction.NONE and (ctx.dm_content or ctx.dm_embed): + if self.infraction_type is None and (ctx.dm_content or ctx.dm_embed): dm_content = ctx.dm_content dm_embed = ctx.dm_embed else: @@ -107,21 +91,11 @@ class InfractionAndNotification(ActionEntry): msg_ctx.guild = bot.instance.get_guild(Guild.id) msg_ctx.author = ctx.author msg_ctx.channel = ctx.channel - if self.superstar: - msg_ctx.command = bot.instance.get_command("superstarify") - await msg_ctx.invoke( - msg_ctx.command, - ctx.author, - arrow.utcnow() + timedelta(seconds=self.superstar.duration) - if self.superstar.duration is not None else None, - reason=self.superstar.reason - ) - ctx.action_descriptions.append("superstar") - if self.infraction_type != Infraction.NONE: + if self.infraction_type is not None: if self.infraction_type == Infraction.BAN or not hasattr(ctx.channel, "guild"): msg_ctx.channel = bot.instance.get_channel(Channels.mod_alerts) - msg_ctx.command = bot.instance.get_command(self.infraction_type.name) + msg_ctx.command = bot.instance.get_command(self.infraction_type.name.lower()) await msg_ctx.invoke( msg_ctx.command, ctx.author, @@ -137,12 +111,6 @@ class InfractionAndNotification(ActionEntry): If the infractions are different, take the data of the one higher up the hierarchy. - A special case is made for superstar infractions. Even if we decide to auto-mute a user, if they have a - particularly problematic username we will still want to superstarify them. - - This is a "best attempt" implementation. Trying to account for any type of combination would create an - extremely complex ruleset. For example, we could special-case watches as well. - There is no clear way to properly combine several notification messages, especially when it's in two parts. To avoid bombarding the user with several notifications, the message with the more significant infraction is used. @@ -151,42 +119,19 @@ class InfractionAndNotification(ActionEntry): return NotImplemented # Lower number -> higher in the hierarchy - if self.infraction_type.value < other.infraction_type.value and other.infraction_type != Infraction.SUPERSTAR: - result = self.copy() - result.superstar = self._merge_superstars(self.superstar, other.superstar) - return result - elif self.infraction_type.value > other.infraction_type.value and self.infraction_type != Infraction.SUPERSTAR: - result = other.copy() - result.superstar = self._merge_superstars(self.superstar, other.superstar) - return result - - if self.infraction_type == other.infraction_type: + if self.infraction_type is None: + return other.copy() + elif other.infraction_type is None: + return self.copy() + elif self.infraction_type.value < other.infraction_type.value: + return self.copy() + elif self.infraction_type.value > other.infraction_type.value: + return other.copy() + else: if self.infraction_duration is None or ( - other.infraction_duration is not None and self.infraction_duration > other.infraction_duration + other.infraction_duration is not None and self.infraction_duration > other.infraction_duration ): result = self.copy() else: result = other.copy() - result.superstar = self._merge_superstars(self.superstar, other.superstar) return result - - # At this stage the infraction types are different, and the lower one is a superstar. - if self.infraction_type.value < other.infraction_type.value: - result = self.copy() - result.superstar = superstar(other.infraction_reason, other.infraction_duration) - else: - result = other.copy() - result.superstar = superstar(self.infraction_reason, self.infraction_duration) - return result - - @staticmethod - def _merge_superstars(superstar1: Optional[superstar], superstar2: Optional[superstar]) -> Optional[superstar]: - """Take the superstar with the greater duration.""" - if not superstar1: - return superstar2 - if not superstar2: - return superstar1 - - if superstar1.duration is None or superstar1.duration > superstar2.duration: - return superstar1 - return superstar2 diff --git a/bot/exts/filtering/_settings_types/ping.py b/bot/exts/filtering/_settings_types/ping.py index 8a3403b59..0bfc12809 100644 --- a/bot/exts/filtering/_settings_types/ping.py +++ b/bot/exts/filtering/_settings_types/ping.py @@ -45,7 +45,7 @@ class Ping(ActionEntry): if not isinstance(other, Ping): return NotImplemented - return Ping(ping_type=self.guild_pings | other.guild_pings, dm_ping_type=self.dm_pings | other.dm_pings) + return Ping(guild_pings=self.guild_pings | other.guild_pings, dm_pings=self.dm_pings | other.dm_pings) @staticmethod @cache diff --git a/bot/exts/filtering/_ui.py b/bot/exts/filtering/_ui.py index efedb2c0c..d40d4d41e 100644 --- a/bot/exts/filtering/_ui.py +++ b/bot/exts/filtering/_ui.py @@ -1,13 +1,42 @@ -from typing import Callable, Optional +from __future__ import annotations + +import re +from enum import EnumMeta +from functools import partial +from typing import Any, Callable, Coroutine, Optional, TypeVar, Union import discord import discord.ui -from discord.ext.commands import Context +from botcore.site_api import ResponseCodeError +from botcore.utils import scheduling +from discord import Embed, Interaction, User +from discord.ext.commands import BadArgument, Context +from discord.ui.select import MISSING, SelectOption +from bot.exts.filtering._filter_lists.filter_list import FilterList, ListType +from bot.exts.filtering._filters.filter import Filter +from bot.exts.filtering._utils import to_serializable from bot.log import get_logger log = get_logger(__name__) +# Max number of characters in a Discord embed field value, minus 6 characters for a placeholder. +MAX_FIELD_SIZE = 1018 +# Max number of characters for an embed field's value before it should take its own line. +MAX_INLINE_SIZE = 50 +# Number of seconds before a settings editing view timeout. +EDIT_TIMEOUT = 600 +# Number of seconds before timeout of an editing component. +COMPONENT_TIMEOUT = 180 +# Max length of modal title +MAX_MODAL_TITLE_LENGTH = 45 +# Max length of modal text component label +MAX_MODAL_LABEL_LENGTH = 45 +# Max number of items in a select +MAX_SELECT_ITEMS = 25 + +T = TypeVar('T') + class ArgumentCompletionSelect(discord.ui.Select): """A select detailing the options that can be picked to assign to a missing argument.""" @@ -66,3 +95,591 @@ class ArgumentCompletionView(discord.ui.View): await interaction.response.send_message(embed=embed, ephemeral=True) return False return True + + +def build_filter_repr_dict( + filter_list: FilterList, + list_type: ListType, + filter_type: type[Filter], + settings_overrides: dict, + extra_fields_overrides: dict +) -> dict: + """Build a dictionary of field names and values to pass to `_build_embed_from_dict`.""" + # Get filter list settings + default_setting_values = {} + for type_ in ("actions", "validations"): + for _, setting in filter_list.defaults[list_type][type_].items(): + default_setting_values.update(to_serializable(setting.dict())) + + # Add overrides. It's done in this way to preserve field order, since the filter won't have all settings. + total_values = {} + for name, value in default_setting_values.items(): + if name not in settings_overrides: + total_values[name] = value + else: + total_values[f"{name}*"] = settings_overrides[name] + + # Add the filter-specific settings. + if filter_type.extra_fields_type: + # This iterates over the default values of the extra fields model. + for name, value in filter_type.extra_fields_type().dict().items(): + if name not in extra_fields_overrides: + total_values[f"{filter_type.name}/{name}"] = value + else: + total_values[f"{filter_type.name}/{name}*"] = value + + return total_values + + +def populate_embed_from_dict(embed: Embed, data: dict) -> None: + """Populate a Discord embed by populating fields from the given dict.""" + for setting, value in data.items(): + if setting.startswith("_"): + continue + if type(value) in (set, tuple): + value = list(value) + value = str(value) if value not in ("", None) else "-" + if len(value) > MAX_FIELD_SIZE: + value = value[:MAX_FIELD_SIZE] + " [...]" + embed.add_field(name=setting, value=value, inline=len(value) < MAX_INLINE_SIZE) + + +class CustomCallbackSelect(discord.ui.Select): + """A selection which calls the provided callback on interaction.""" + + def __init__( + self, + callback: Callable[[Interaction, discord.ui.Select], Coroutine[None]], + *, + custom_id: str = MISSING, + placeholder: str | None = None, + min_values: int = 1, + max_values: int = 1, + options: list[SelectOption] = MISSING, + disabled: bool = False, + row: int | None = None, + ): + super().__init__( + custom_id=custom_id, + placeholder=placeholder, + min_values=min_values, + max_values=max_values, + options=options, + disabled=disabled, + row=row + ) + self.custom_callback = callback + + async def callback(self, interaction: Interaction) -> Any: + """Invoke the provided callback.""" + await self.custom_callback(interaction, self) + + +class EditContentModal(discord.ui.Modal, title="Edit Content"): + """A modal to input a filter's content.""" + + content = discord.ui.TextInput(label="Content") + + def __init__(self, embed_view: SettingsEditView, message: discord.Message): + super().__init__(timeout=COMPONENT_TIMEOUT) + self.embed_view = embed_view + self.message = message + + async def on_submit(self, interaction: Interaction) -> None: + """Update the embed with the new content.""" + await interaction.response.defer() + await self.embed_view.update_embed(self.message, content=self.content.value) + + +class EditDescriptionModal(discord.ui.Modal, title="Edit Description"): + """A modal to input a filter's description.""" + + description = discord.ui.TextInput(label="Description") + + def __init__(self, embed_view: SettingsEditView, message: discord.Message): + super().__init__(timeout=COMPONENT_TIMEOUT) + self.embed_view = embed_view + self.message = message + + async def on_submit(self, interaction: Interaction) -> None: + """Update the embed with the new description.""" + await interaction.response.defer() + await self.embed_view.update_embed(self.message, description=self.description.value) + + +class BooleanSelectView(discord.ui.View): + """A view containing an instance of BooleanSelect.""" + + class BooleanSelect(discord.ui.Select): + """Select a true or false value and send it to the supplied callback.""" + + def __init__(self, setting_name: str, update_callback: Callable): + super().__init__(options=[SelectOption(label="True"), SelectOption(label="False")]) + self.setting_name = setting_name + self.update_callback = update_callback + + async def callback(self, interaction: Interaction) -> Any: + """Respond to the interaction by sending the boolean value to the update callback.""" + await interaction.response.edit_message(content="✅ Edit confirmed", view=None) + value = self.values[0] == "True" + await self.update_callback(setting_name=self.setting_name, setting_value=value) + + def __init__(self, setting_name: str, update_callback: Callable): + super().__init__(timeout=COMPONENT_TIMEOUT) + self.add_item(self.BooleanSelect(setting_name, update_callback)) + + +class FreeInputModal(discord.ui.Modal): + """A modal to freely enter a value for a setting.""" + + def __init__(self, setting_name: str, required: bool, type_: type, update_callback: Callable): + title = f"{setting_name} Input" if len(setting_name) < MAX_MODAL_TITLE_LENGTH - 6 else "Setting Input" + super().__init__(timeout=COMPONENT_TIMEOUT, title=title) + + self.setting_name = setting_name + self.type_ = type_ + self.update_callback = update_callback + + label = setting_name if len(setting_name) < MAX_MODAL_TITLE_LENGTH else "Value" + self.setting_input = discord.ui.TextInput(label=label, style=discord.TextStyle.paragraph, required=required) + self.add_item(self.setting_input) + + async def on_submit(self, interaction: Interaction) -> None: + """Update the setting with the new value in the embed.""" + try: + value = self.type_(self.setting_input.value) or None + except (ValueError, TypeError): + await interaction.response.send_message( + f"Could not process the input value for `{self.setting_name}`.", ephemeral=True + ) + else: + await interaction.response.defer() + await self.update_callback(setting_name=self.setting_name, setting_value=value) + + +class SequenceEditView(discord.ui.View): + """A view to modify the contents of a sequence of values.""" + + class SingleItemModal(discord.ui.Modal): + """A modal to enter a single list item.""" + + new_item = discord.ui.TextInput(label="New Item") + + def __init__(self, view: SequenceEditView): + super().__init__(title="Item Addition", timeout=COMPONENT_TIMEOUT) + self.view = view + + async def on_submit(self, interaction: Interaction) -> None: + """Send the submitted value to be added to the list.""" + await self.view.apply_addition(interaction, self.new_item.value) + + class NewListModal(discord.ui.Modal): + """A modal to enter new contents for the list.""" + + new_value = discord.ui.TextInput(label="Enter comma separated values", style=discord.TextStyle.paragraph) + + def __init__(self, view: SequenceEditView): + super().__init__(title="New List", timeout=COMPONENT_TIMEOUT) + self.view = view + + async def on_submit(self, interaction: Interaction) -> None: + """Send the submitted value to be added to the list.""" + await self.view.apply_edit(interaction, self.new_value.value) + + def __init__(self, setting_name: str, starting_value: list, type_: type, update_callback: Callable): + super().__init__(timeout=COMPONENT_TIMEOUT) + self.setting_name = setting_name + self.stored_value = starting_value + self.type_ = type_ + self.update_callback = update_callback + + options = [SelectOption(label=item) for item in starting_value[:MAX_SELECT_ITEMS]] + self.removal_select = CustomCallbackSelect( + self.apply_removal, placeholder="Enter an item to remove", options=options, row=1 + ) + if starting_value: + self.add_item(self.removal_select) + + async def apply_removal(self, interaction: Interaction, select: discord.ui.Select) -> None: + """Remove an item from the list.""" + self.stored_value.remove(select.values[0]) + select.options = [SelectOption(label=item) for item in self.stored_value[:MAX_SELECT_ITEMS]] + if not self.stored_value: + self.remove_item(self.removal_select) + await interaction.response.edit_message(content=f"Current list: {self.stored_value}", view=self) + + async def apply_addition(self, interaction: Interaction, item: str) -> None: + """Add an item to the list.""" + self.stored_value.append(item) + self.removal_select.options = [SelectOption(label=item) for item in self.stored_value[:MAX_SELECT_ITEMS]] + if len(self.stored_value) == 1: + self.add_item(self.removal_select) + await interaction.response.edit_message(content=f"Current list: {self.stored_value}", view=self) + + async def apply_edit(self, interaction: Interaction, new_list: str) -> None: + """Change the contents of the list.""" + self.stored_value = new_list.split(",") + self.removal_select.options = [SelectOption(label=item) for item in self.stored_value[:MAX_SELECT_ITEMS]] + if len(self.stored_value) == 1: + self.add_item(self.removal_select) + await interaction.response.edit_message(content=f"Current list: {self.stored_value}", view=self) + + @discord.ui.button(label="Add Value") + async def add_value(self, interaction: Interaction, button: discord.ui.Button) -> None: + """A button to add an item to the list.""" + await interaction.response.send_modal(self.SingleItemModal(self)) + + @discord.ui.button(label="Free Input") + async def free_input(self, interaction: Interaction, button: discord.ui.Button) -> None: + """A button to change the entire list.""" + await interaction.response.send_modal(self.NewListModal(self)) + + @discord.ui.button(label="✅ Confirm", style=discord.ButtonStyle.green) + async def confirm(self, interaction: Interaction, button: discord.ui.Button) -> None: + """Send the final value to the embed editor.""" + final_value = self.type_(self.stored_value) + # Edit first, it might time out otherwise. + await interaction.response.edit_message(content="✅ Edit confirmed", view=None) + await self.update_callback(setting_name=self.setting_name, setting_value=final_value) + self.stop() + + @discord.ui.button(label="🚫 Cancel", style=discord.ButtonStyle.red) + async def cancel(self, interaction: Interaction, button: discord.ui.Button) -> None: + """Cancel the list editing.""" + await interaction.response.edit_message(content="🚫 Canceled", view=None) + self.stop() + + +class EnumSelectView(discord.ui.View): + """A view containing an instance of EnumSelect.""" + + class EnumSelect(discord.ui.Select): + """Select an enum value and send it to the supplied callback.""" + + def __init__(self, setting_name: str, enum_cls: EnumMeta, update_callback: Callable): + super().__init__(options=[SelectOption(label=elem.name) for elem in enum_cls]) + self.setting_name = setting_name + self.enum_cls = enum_cls + self.update_callback = update_callback + + async def callback(self, interaction: Interaction) -> Any: + """Respond to the interaction by sending the enum value to the update callback.""" + await interaction.response.edit_message(content="✅ Edit confirmed", view=None) + await self.update_callback(setting_name=self.setting_name, setting_value=self.values[0]) + + def __init__(self, setting_name: str, enum_cls: EnumMeta, update_callback: Callable): + super().__init__(timeout=COMPONENT_TIMEOUT) + self.add_item(self.EnumSelect(setting_name, enum_cls, update_callback)) + + +class SettingsEditView(discord.ui.View): + """A view used to edit a filter's settings before updating the database.""" + + class _REMOVE: + """Sentinel value for when an override should be removed.""" + + def __init__( + self, + filter_list: FilterList, + list_type: ListType, + filter_type: type[Filter], + content: str | None, + description: str | None, + settings_overrides: dict, + filter_settings_overrides: dict, + loaded_settings: dict, + loaded_filter_settings: dict, + author: User, + embed: Embed, + confirm_callback: Callable + ): + super().__init__(timeout=EDIT_TIMEOUT) + self.filter_list = filter_list + self.list_type = list_type + self.filter_type = filter_type + self.content = content + self.description = description + self.settings_overrides = settings_overrides + self.filter_settings_overrides = filter_settings_overrides + self.loaded_settings = loaded_settings + self.loaded_filter_settings = loaded_filter_settings + self.author = author + self.embed = embed + self.confirm_callback = confirm_callback + + all_settings_repr_dict = build_filter_repr_dict( + filter_list, list_type, filter_type, settings_overrides, filter_settings_overrides + ) + populate_embed_from_dict(embed, all_settings_repr_dict) + + self.type_per_setting_name = {setting: info[2] for setting, info in loaded_settings.items()} + self.type_per_setting_name.update({ + f"{filter_type.name}/{name}": type_ + for name, (_, _, type_) in loaded_filter_settings.get(filter_type.name, {}).items() + }) + + add_select = CustomCallbackSelect( + self._prompt_new_override, + placeholder="Select a setting to edit", + options=[SelectOption(label=name) for name in self.type_per_setting_name], + row=1 + ) + self.add_item(add_select) + + override_names = list(settings_overrides) + list(filter_settings_overrides) + remove_select = CustomCallbackSelect( + self._remove_override, + placeholder="Select an override to remove", + options=[SelectOption(label=name) for name in override_names], + row=2 + ) + if remove_select.options: + self.add_item(remove_select) + + async def interaction_check(self, interaction: Interaction) -> bool: + """Only allow interactions from the command invoker.""" + return interaction.user.id == self.author.id + + @discord.ui.button(label="Edit Content", row=3) + async def edit_content(self, interaction: Interaction, button: discord.ui.Button) -> None: + """A button to edit the filter's content. Pressing the button invokes a modal.""" + modal = EditContentModal(self, interaction.message) + await interaction.response.send_modal(modal) + + @discord.ui.button(label="Edit Description", row=3) + async def edit_description(self, interaction: Interaction, button: discord.ui.Button) -> None: + """A button to edit the filter's description. Pressing the button invokes a modal.""" + modal = EditDescriptionModal(self, interaction.message) + await interaction.response.send_modal(modal) + + @discord.ui.button(label="Empty Description", row=3) + async def empty_description(self, interaction: Interaction, button: discord.ui.Button) -> None: + """A button to empty the filter's description.""" + await self.update_embed(interaction, description=self._REMOVE) + + @discord.ui.button(label="✅ Confirm", style=discord.ButtonStyle.green, row=4) + async def confirm(self, interaction: Interaction, button: discord.ui.Button) -> None: + """Confirm the content, description, and settings, and update the filters database.""" + if self.content is None: + await interaction.response.send_message( + ":x: Cannot add a filter with no content.", ephemeral=True, reference=interaction.message + ) + if self.description is None: + self.description = "" + await interaction.response.edit_message(view=None) # Make sure the interaction succeeds first. + try: + await self.confirm_callback( + interaction.message, + self.filter_list, + self.list_type, + self.filter_type, + self.content, + self.description, + self.settings_overrides, + self.filter_settings_overrides + ) + except ResponseCodeError as e: + await interaction.message.channel.send(f"An error occurred: ```{e}```", reference=interaction.message) + + self.stop() + + @discord.ui.button(label="🚫 Cancel", style=discord.ButtonStyle.red, row=4) + async def cancel(self, interaction: Interaction, button: discord.ui.Button) -> None: + """Cancel the operation.""" + await interaction.response.edit_message(content="🚫 Operation canceled.", embed=None, view=None) + self.stop() + + async def _prompt_new_override(self, interaction: Interaction, select: discord.ui.Select) -> None: + """Prompt the user to give an override value for the setting they selected, and respond to the interaction.""" + setting_name = select.values[0] + type_ = self.type_per_setting_name[setting_name] + is_optional, type_ = _remove_optional(type_) + if hasattr(type_, "__origin__"): # In case this is a types.GenericAlias or a typing._GenericAlias + type_ = type_.__origin__ + new_view = self.copy() + # This is in order to not block the interaction response. There's a potential race condition here, since + # a view's method is used without guaranteeing the task completed, but since it depends on user input + # realistically it shouldn't happen. + scheduling.create_task(interaction.message.edit(view=new_view)) + update_callback = partial(new_view.update_embed, interaction_or_msg=interaction.message) + if type_ is bool: + view = BooleanSelectView(setting_name, update_callback) + await interaction.response.send_message(f"Choose a value for `{setting_name}`:", view=view, ephemeral=True) + elif type_ in (set, list, tuple): + current_value = self.settings_overrides.get(setting_name, []) + await interaction.response.send_message( + f"Current list: {current_value}", + view=SequenceEditView(setting_name, current_value, type_, update_callback), + ephemeral=True + ) + elif isinstance(type_, EnumMeta): + view = EnumSelectView(setting_name, type_, update_callback) + await interaction.response.send_message(f"Choose a value for `{setting_name}`:", view=view, ephemeral=True) + else: + await interaction.response.send_modal(FreeInputModal(setting_name, not is_optional, type_, update_callback)) + self.stop() + + async def update_embed( + self, + interaction_or_msg: discord.Interaction | discord.Message, + *, + content: str | None = None, + description: str | type[SettingsEditView._REMOVE] | None = None, + setting_name: str | None = None, + setting_value: str | type[SettingsEditView._REMOVE] | None = None, + ) -> None: + """ + Update the embed with the new information. + + If a setting name is provided with a _REMOVE value, remove the override. + If `interaction_or_msg` is a Message, the invoking Interaction must be deferred before calling this function. + """ + if content is not None or description is not None: + if content is not None: + self.content = content + else: + content = self.content # If there's no content or description, use the existing values. + if description is self._REMOVE: + self.description = None + elif description is not None: + self.description = description + else: + description = self.description + + # Update the embed with the new content and/or description. + self.embed.description = f"`{content}`" if content else "*No content*" + if description is not None and description is not self._REMOVE: + self.embed.description += f" - {description}" + + if setting_name: + # Find the right dictionary to update. + if "/" in setting_name: + filter_name, setting_name = setting_name.split("/", maxsplit=1) + dict_to_edit = self.filter_settings_overrides[filter_name] + else: + dict_to_edit = self.settings_overrides + # Update the setting override value or remove it + if setting_value is not self._REMOVE: + dict_to_edit[setting_name] = setting_value + else: + del dict_to_edit[setting_name] + + # This is inefficient, but otherwise the selects go insane if the user attempts to edit the same setting + # multiple times, even when replacing the select with a new one. + self.embed.clear_fields() + new_view = self.copy() + + if isinstance(interaction_or_msg, discord.Interaction): + await interaction_or_msg.response.edit_message(embed=self.embed, view=new_view) + else: + await interaction_or_msg.edit(embed=self.embed, view=new_view) + self.stop() + + async def edit_setting_override(self, interaction: Interaction, setting_name: str, override_value: Any) -> None: + """ + Update the overrides with the new value and edit the embed. + + The interaction needs to be the selection of the setting attached to the embed. + """ + await self.update_embed(interaction, setting_name=setting_name, setting_value=override_value) + + async def _remove_override(self, interaction: Interaction, select: discord.ui.Select) -> None: + """ + Remove the override for the setting the user selected, and edit the embed. + + The interaction needs to be the selection of the setting attached to the embed. + """ + await self.update_embed(interaction, setting_name=select.values[0], setting_value=self._REMOVE) + + def copy(self) -> SettingsEditView: + """Create a copy of this view.""" + return SettingsEditView( + self.filter_list, + self.list_type, + self.filter_type, + self.content, + self.description, + self.settings_overrides, + self.filter_settings_overrides, + self.loaded_settings, + self.loaded_filter_settings, + self.author, + self.embed, + self.confirm_callback + ) + + +def _remove_optional(type_: type) -> tuple[bool, type]: + """Return whether the type is Optional, and the Union of types which aren't None.""" + if not hasattr(type_, "__args__"): + return False, type_ + args = list(type_.__args__) + if type(None) not in args: + return False, type_ + args.remove(type(None)) + return True, Union[tuple(args)] + + +def _parse_value(value: str, type_: type[T]) -> T: + """Parse the value and attempt to convert it to the provided type.""" + is_optional, type_ = _remove_optional(type_) + if is_optional and value == '""': + return None + if hasattr(type_, "__origin__"): # In case this is a types.GenericAlias or a typing._GenericAlias + type_ = type_.__origin__ + if type_ in (tuple, list, set): + return type_(value.split(",")) + if type_ is bool: + return value == "True" + if isinstance(type_, EnumMeta): + return type_[value.upper()] + + return type_(value) + + +def description_and_settings_converter( + list_name: str, loaded_settings: dict, loaded_filter_settings: dict, input_data: str +) -> tuple[str, dict[str, Any], dict[str, Any]]: + """Parse a string representing a possible description and setting overrides, and validate the setting names.""" + if not input_data: + return "", {}, {} + + settings_pattern = re.compile(r"\s+(?=\S+=\S+)") + single_setting_pattern = re.compile(r"\w+=.+") + + parsed = settings_pattern.split(input_data) + if not parsed: + return "", {}, {} + + description = "" + if not single_setting_pattern.match(parsed[0]): + description, *parsed = parsed + + settings = {setting: value for setting, value in [part.split("=", maxsplit=1) for part in parsed]} + + filter_settings = {} + for setting, _ in list(settings.items()): + if setting not in loaded_settings: + if "/" in setting: + setting_list_name, filter_setting_name = setting.split("/", maxsplit=1) + if setting_list_name.lower() != list_name.lower(): + raise BadArgument( + f"A setting for a {setting_list_name!r} filter was provided, but the list name is {list_name!r}" + ) + if filter_setting_name not in loaded_filter_settings[list_name]: + raise BadArgument(f"{setting!r} is not a recognized setting.") + type_ = loaded_filter_settings[list_name][filter_setting_name][2] + try: + filter_settings[filter_setting_name] = _parse_value(settings.pop(setting), type_) + except (TypeError, ValueError) as e: + raise BadArgument(e) + else: + raise BadArgument(f"{setting!r} is not a recognized setting.") + else: + type_ = loaded_settings[setting][2] + try: + settings[setting] = _parse_value(settings.pop(setting), type_) + except (TypeError, ValueError) as e: + raise BadArgument(e) + + return description, settings, filter_settings diff --git a/bot/exts/filtering/filtering.py b/bot/exts/filtering/filtering.py index 630474c13..7ffa121e1 100644 --- a/bot/exts/filtering/filtering.py +++ b/bot/exts/filtering/filtering.py @@ -1,21 +1,26 @@ +import json import operator import re from collections import defaultdict from functools import reduce -from typing import Optional +from typing import Literal, Optional, get_type_hints from discord import Colour, Embed, HTTPException, Message from discord.ext import commands from discord.ext.commands import BadArgument, Cog, Context, has_any_role from discord.utils import escape_markdown +import bot +import bot.exts.filtering._ui as filters_ui from bot.bot import Bot from bot.constants import Colours, MODERATION_ROLES, Webhooks from bot.exts.filtering._filter_context import Event, FilterContext from bot.exts.filtering._filter_lists import FilterList, ListType, filter_list_types, list_type_converter from bot.exts.filtering._filters.filter import Filter from bot.exts.filtering._settings import ActionSettings -from bot.exts.filtering._ui import ArgumentCompletionView +from bot.exts.filtering._ui import ( + ArgumentCompletionView, build_filter_repr_dict, description_and_settings_converter, populate_embed_from_dict +) from bot.exts.filtering._utils import past_tense, to_serializable from bot.log import get_logger from bot.pagination import LinePaginator @@ -23,11 +28,6 @@ from bot.utils.messages import format_channel, format_user log = get_logger(__name__) -# Max number of characters in a Discord embed field value, minus 6 characters for a placeholder. -MAX_FIELD_SIZE = 1018 -# Max number of characters for an embed field's value before it should take its own line. -MAX_INLINE_SIZE = 50 - class Filtering(Cog): """Filtering and alerting for content posted on the server.""" @@ -111,15 +111,18 @@ class Filtering(Cog): settings_types.update(type(setting) for _, setting in list_defaults["actions"].items()) settings_types.update(type(setting) for _, setting in list_defaults["validations"].items()) for setting_type in settings_types: + type_hints = get_type_hints(setting_type) # The description should be either a string or a dictionary. if isinstance(setting_type.description, str): # If it's a string, then the setting matches a single field in the DB, # and its name is the setting type's name attribute. - self.loaded_settings[setting_type.name] = setting_type.description, setting_type + self.loaded_settings[setting_type.name] = ( + setting_type.description, setting_type, type_hints[setting_type.name] + ) else: # Otherwise, the setting type works with compound settings. self.loaded_settings.update({ - subsetting: (description, setting_type) + subsetting: (description, setting_type, type_hints[subsetting]) for subsetting, description in setting_type.description.items() }) @@ -128,9 +131,14 @@ class Filtering(Cog): extra_fields_type = filter_type.extra_fields_type if not extra_fields_type: continue + type_hints = get_type_hints(extra_fields_type) # A class var with a `_description` suffix is expected per field name. self.loaded_filter_settings[filter_name] = { - field_name: (getattr(extra_fields_type, f"{field_name}_description", ""), extra_fields_type) + field_name: ( + getattr(extra_fields_type, f"{field_name}_description", ""), + extra_fields_type, + type_hints[field_name] + ) for field_name in extra_fields_type.__fields__ } @@ -173,6 +181,31 @@ class Filtering(Cog): list_type, filter_list = result await self._send_list(ctx, filter_list, list_type) + @blocklist.command(name="add", aliases=("a",)) + async def bl_add( + self, + ctx: Context, + noui: Optional[Literal["noui"]], + list_name: Optional[str], + content: str, + *, + description_and_settings: Optional[str] = None + ) -> None: + """ + Add a blocked filter to the specified filter list. + + Unless `noui` is specified, a UI will be provided to edit the content, description, and settings + before confirmation. + + The settings can be provided in the command itself, in the format of `setting_name=value` (no spaces around the + equal sign). The value doesn't need to (shouldn't) be surrounded in quotes even if it contains spaces. + """ + result = await self._resolve_list_type_and_name(ctx, ListType.DENY, list_name) + if result is None: + return + list_type, filter_list = result + await self._add_filter(ctx, noui, list_type, filter_list, content, description_and_settings) + # endregion # region: whitelist commands @@ -191,6 +224,31 @@ class Filtering(Cog): list_type, filter_list = result await self._send_list(ctx, filter_list, list_type) + @allowlist.command(name="add", aliases=("a",)) + async def al_add( + self, + ctx: Context, + noui: Optional[Literal["noui"]], + list_name: Optional[str], + content: str, + *, + description_and_settings: Optional[str] = None + ) -> None: + """ + Add an allowed filter to the specified filter list. + + Unless `noui` is specified, a UI will be provided to edit the content, description, and settings + before confirmation. + + The settings can be provided in the command itself, in the format of `setting_name=value` (no spaces around the + equal sign). The value doesn't need to (shouldn't) be surrounded in quotes even if it contains spaces. + """ + result = await self._resolve_list_type_and_name(ctx, ListType.ALLOW, list_name) + if result is None: + return + list_type, filter_list = result + await self._add_filter(ctx, noui, list_type, filter_list, content, description_and_settings) + # endregion # region: filter commands @@ -224,23 +282,16 @@ class Filtering(Cog): for _, setting in settings.items(): overrides_values.update(to_serializable(setting.dict())) - # Combine them. It's done in this way to preserve field order, since the filter won't have all settings. - total_values = {} - for name, value in default_setting_values.items(): - if name not in overrides_values: - total_values[name] = value - else: - total_values[f"{name}*"] = overrides_values[name] - # Add the filter-specific settings. - if hasattr(filter_.extra_fields, "dict"): + if filter_.extra_fields_type: extra_fields_overrides = filter_.extra_fields.dict(exclude_unset=True) - for name, value in filter_.extra_fields.dict().items(): - if name not in extra_fields_overrides: - total_values[f"{filter_.name}/{name}"] = value - else: - total_values[f"{filter_.name}/{name}*"] = value + else: + extra_fields_overrides = {} - embed = self._build_embed_from_dict(total_values) + all_settings_repr_dict = build_filter_repr_dict( + filter_list, list_type, type(filter_), overrides_values, extra_fields_overrides + ) + embed = Embed(colour=Colour.blue()) + populate_embed_from_dict(embed, all_settings_repr_dict) embed.description = f"`{filter_.content}`" if filter_.description: embed.description += f" - {filter_.description}" @@ -253,7 +304,7 @@ class Filtering(Cog): @filter.command(name="list", aliases=("get",)) async def f_list( - self, ctx: Context, list_type: Optional[list_type_converter] = None, list_name: Optional[str] = None + self, ctx: Context, list_type: Optional[list_type_converter] = None, list_name: Optional[str] = None ) -> None: """List the contents of a specified list of filters.""" result = await self._resolve_list_type_and_name(ctx, list_type, list_name) @@ -282,6 +333,34 @@ class Filtering(Cog): embed.colour = Colour.blue() await ctx.send(embed=embed) + @filter.command(name="add", aliases=("a",)) + async def f_add( + self, + ctx: Context, + noui: Optional[Literal["noui"]], + list_type: Optional[list_type_converter], + list_name: Optional[str], + content: str, + *, + description_and_settings: Optional[str] = None + ) -> None: + """ + Add a filter to the specified filter list. + + Unless `noui` is specified, a UI will be provided to edit the content, description, and settings + before confirmation. + + The settings can be provided in the command itself, in the format of `setting_name=value` (no spaces around the + equal sign). The value doesn't need to (shouldn't) be surrounded in quotes even if it contains spaces. + + Example: `!filter add denied token "Scaleios is great" delete_messages=True send_alert=False` + """ + result = await self._resolve_list_type_and_name(ctx, list_type, list_name) + if result is None: + return + list_type, filter_list = result + await self._add_filter(ctx, noui, list_type, filter_list, content, description_and_settings) + @filter.group(aliases=("settings",)) async def setting(self, ctx: Context) -> None: """Group for settings-related commands.""" @@ -347,7 +426,8 @@ class Filtering(Cog): for _, setting in list_defaults[type_].items(): setting_values.update(to_serializable(setting.dict())) - embed = self._build_embed_from_dict(setting_values) + embed = Embed(colour=Colour.blue()) + populate_embed_from_dict(embed, setting_values) # Use the class's docstring, and ignore single newlines. embed.description = re.sub(r"(?<!\n)\n(?!\n)", " ", filter_list.__doc__) embed.set_author( @@ -473,18 +553,79 @@ class Filtering(Cog): if id_ in sublist: return sublist[id_], filter_list, list_type + async def _add_filter( + self, + ctx: Context, + noui: Optional[Literal["noui"]], + list_type: ListType, + filter_list: FilterList, + content: str, + description_and_settings: Optional[str] = None + ) -> None: + """Add a filter to the database.""" + description, settings, filter_settings = description_and_settings_converter( + filter_list.name, self.loaded_settings, self.loaded_filter_settings, description_and_settings + ) + filter_type = filter_list.get_filter_type(content) + + if noui: + await self._post_new_filter( + ctx.message, filter_list, list_type, filter_type, content, description, settings, filter_settings + ) + + else: + embed = Embed(colour=Colour.blue()) + embed.description = f"`{content}`" if content else "*No content*" + if description: + embed.description += f" - {description}" + embed.set_author( + name=f"New Filter - {past_tense(list_type.name.lower())} {filter_list.name}".title()) + embed.set_footer(text=( + "Field names with an asterisk have values which override the defaults of the containing filter list. " + f"To view all defaults of the list, run `!filterlist describe {list_type.name} {filter_list.name}`." + )) + + view = filters_ui.SettingsEditView( + filter_list, + list_type, + filter_type, + content, + description, + settings, + filter_settings, + self.loaded_settings, + self.loaded_filter_settings, + ctx.author, + embed, + self._post_new_filter + ) + await ctx.send(embed=embed, reference=ctx.message, view=view) + @staticmethod - def _build_embed_from_dict(data: dict) -> Embed: - """Build a Discord embed by populating fields from the given dict.""" - embed = Embed(description="", colour=Colour.blue()) - for setting, value in data.items(): - if setting.startswith("_"): - continue - value = str(value) if value not in ("", None) else "-" - if len(value) > MAX_FIELD_SIZE: - value = value[:MAX_FIELD_SIZE] + " [...]" - embed.add_field(name=setting, value=value, inline=len(value) < MAX_INLINE_SIZE) - return embed + async def _post_new_filter( + msg: Message, + filter_list: FilterList, + list_type: ListType, + filter_type: type[Filter], + content: str, + description: str | None, + settings: dict, + filter_settings: dict + ) -> None: + """POST the data of the new filter to the site API.""" + valid, error_msg = filter_type.validate_filter_settings(filter_settings) + if not valid: + raise BadArgument(f"Error while validating filter-specific settings: {error_msg}") + + list_id = filter_list.list_ids[list_type] + description = description or None + payload = { + "filter_list": list_id, "content": content, "description": description, + "additional_field": json.dumps(filter_settings), **settings + } + response = await bot.instance.api_client.post('bot/filter/filters', json=payload) + new_filter = filter_list.add_filter(response, list_type) + await msg.channel.send(f"✅ Added filter: {new_filter}", reference=msg) # endregion |