diff options
-rw-r--r-- | bot/exts/filtering/_filter_lists/__init__.py | 4 | ||||
-rw-r--r-- | bot/exts/filtering/_filter_lists/filter_list.py | 34 | ||||
-rw-r--r-- | bot/exts/filtering/_filter_lists/token.py | 2 | ||||
-rw-r--r-- | bot/exts/filtering/_filters/filter.py | 7 | ||||
-rw-r--r-- | bot/exts/filtering/_ui.py | 48 | ||||
-rw-r--r-- | bot/exts/filtering/_utils.py | 11 | ||||
-rw-r--r-- | bot/exts/filtering/filtering.py | 155 |
7 files changed, 232 insertions, 29 deletions
diff --git a/bot/exts/filtering/_filter_lists/__init__.py b/bot/exts/filtering/_filter_lists/__init__.py index 415e3a6bf..1273e5588 100644 --- a/bot/exts/filtering/_filter_lists/__init__.py +++ b/bot/exts/filtering/_filter_lists/__init__.py @@ -1,9 +1,9 @@ from os.path import dirname -from bot.exts.filtering._filter_lists.filter_list import FilterList +from bot.exts.filtering._filter_lists.filter_list import FilterList, ListType, ListTypeConverter from bot.exts.filtering._utils import subclasses_in_package filter_list_types = subclasses_in_package(dirname(__file__), f"{__name__}.", FilterList) filter_list_types = {filter_list.name: filter_list for filter_list in filter_list_types} -__all__ = [filter_list_types, FilterList] +__all__ = [filter_list_types, FilterList, ListType, ListTypeConverter] diff --git a/bot/exts/filtering/_filter_lists/filter_list.py b/bot/exts/filtering/_filter_lists/filter_list.py index f9e304b59..1060f11db 100644 --- a/bot/exts/filtering/_filter_lists/filter_list.py +++ b/bot/exts/filtering/_filter_lists/filter_list.py @@ -2,20 +2,40 @@ from abc import abstractmethod from enum import Enum from typing import Dict, List, Type -from bot.exts.filtering._settings import Settings, ValidationSettings, create_settings -from bot.exts.filtering._filters.filter import Filter +from discord.ext.commands import BadArgument, Context, Converter + from bot.exts.filtering._filter_context import FilterContext -from bot.exts.filtering._utils import FieldRequiring +from bot.exts.filtering._filters.filter import Filter +from bot.exts.filtering._settings import Settings, ValidationSettings, create_settings +from bot.exts.filtering._utils import FieldRequiring, past_tense from bot.log import get_logger log = get_logger(__name__) class ListType(Enum): + """An enumeration of list types.""" + DENY = 0 ALLOW = 1 +class ListTypeConverter(Converter): + """A Converter to get the appropriate list type.""" + + aliases = ( + (ListType.DENY, {"deny", "blocklist", "blacklist", "denylist", "bl", "dl"}), + (ListType.ALLOW, {"allow", "allowlist", "whitelist", "al", "wl"}) + ) + + async def convert(self, ctx: Context, argument: str) -> ListType: + """Get the appropriate list type.""" + for list_type, aliases in self.aliases: + if argument in aliases or argument in map(past_tense, aliases): + return list_type + raise BadArgument(f"No matching list type found for {argument!r}.") + + class FilterList(FieldRequiring): """Dispatches events to lists of _filters, and aggregates the responses into a single list of actions to take.""" @@ -24,8 +44,8 @@ class FilterList(FieldRequiring): name = FieldRequiring.MUST_SET_UNIQUE def __init__(self, filter_type: Type[Filter]): - self._filter_lists: dict[ListType, list[Filter]] = {} - self._defaults: dict[ListType, dict[str, Settings]] = {} + self.filter_lists: dict[ListType, list[Filter]] = {} + self.defaults: dict[ListType, dict[str, Settings]] = {} self.filter_type = filter_type @@ -33,7 +53,7 @@ class FilterList(FieldRequiring): """Add a new type of list (such as a whitelist or a blacklist) this filter list.""" actions, validations = create_settings(list_data["settings"]) list_type = ListType(list_data["list_type"]) - self._defaults[list_type] = {"actions": actions, "validations": validations} + self.defaults[list_type] = {"actions": actions, "validations": validations} filters = [] for filter_data in list_data["filters"]: @@ -41,7 +61,7 @@ class FilterList(FieldRequiring): filters.append(self.filter_type(filter_data, actions)) except TypeError as e: log.warning(e) - self._filter_lists[list_type] = filters + self.filter_lists[list_type] = filters @abstractmethod def triggers_for(self, ctx: FilterContext) -> list[Filter]: diff --git a/bot/exts/filtering/_filter_lists/token.py b/bot/exts/filtering/_filter_lists/token.py index 4495f4414..01e586132 100644 --- a/bot/exts/filtering/_filter_lists/token.py +++ b/bot/exts/filtering/_filter_lists/token.py @@ -33,7 +33,7 @@ class TokensList(FilterList): ctx = ctx.replace(content=text) return self.filter_list_result( - ctx, self._filter_lists[ListType.DENY], self._defaults[ListType.DENY]["validations"] + ctx, self.filter_lists[ListType.DENY], self.defaults[ListType.DENY]["validations"] ) @staticmethod diff --git a/bot/exts/filtering/_filters/filter.py b/bot/exts/filtering/_filters/filter.py index 484e506fc..e7fff20a6 100644 --- a/bot/exts/filtering/_filters/filter.py +++ b/bot/exts/filtering/_filters/filter.py @@ -27,3 +27,10 @@ class Filter(ABC): @abstractmethod def triggered_on(self, ctx: FilterContext) -> bool: """Search for the filter's content within a given context.""" + + def __str__(self) -> str: + """A string representation of the filter.""" + string = f"#{self.id}. `{self.content}`" + if self.description: + string += f" - {self.description}" + return string diff --git a/bot/exts/filtering/_ui.py b/bot/exts/filtering/_ui.py new file mode 100644 index 000000000..95a840be8 --- /dev/null +++ b/bot/exts/filtering/_ui.py @@ -0,0 +1,48 @@ +from copy import copy + +import discord +import discord.ui +from discord.ext.commands import Context + +import bot +from bot.log import get_logger + +log = get_logger(__name__) + + +class ArgumentCompletionSelect(discord.ui.Select): + """A select detailing the options that can be picked to assign to a missing argument.""" + + def __init__(self, ctx: Context, arg_name: str, options: list[str]): + super().__init__( + placeholder=f"Select a value for {arg_name!r}", + options=[discord.SelectOption(label=option) for option in options] + ) + self.ctx = ctx + + async def callback(self, interaction: discord.Interaction) -> None: + """re-invoke the context command with the completed argument value.""" + await interaction.response.defer() + value = interaction.data['values'][0] + message = copy(self.ctx.message) + message.content = f'{message.content} "{value}"' + log.trace(f"Argument filled with the value {value}. Invoking {message.content!r}") + await bot.instance.process_commands(message) + + +class ArgumentCompletionView(discord.ui.View): + """A view used to complete a missing argument in an in invoked command.""" + + def __init__(self, ctx: Context, arg_name: str, options: list[str]): + super().__init__() + log.trace(f"The {arg_name} argument was designated missing in the invocation {ctx.view.buffer!r}") + self.add_item(ArgumentCompletionSelect(ctx, arg_name, options)) + self.ctx = ctx + + async def interaction_check(self, interaction: discord.Interaction) -> bool: + """Check to ensure that the interacting user is the user who invoked the command.""" + if interaction.user != self.ctx.author: + embed = discord.Embed(description="Sorry, but this dropdown menu can only be used by the original author.") + await interaction.response.send_message(embed=embed, ephemeral=True) + return False + return True diff --git a/bot/exts/filtering/_utils.py b/bot/exts/filtering/_utils.py index a769001f6..790f70ee5 100644 --- a/bot/exts/filtering/_utils.py +++ b/bot/exts/filtering/_utils.py @@ -47,6 +47,17 @@ def clean_input(string: str) -> str: return INVISIBLE_RE.sub("", no_zalgo) +def past_tense(word: str) -> str: + """Return the past tense form of the input word.""" + if not word: + return word + if word.endswith("e"): + return word + "d" + if word.endswith("y") and len(word) > 1 and word[-2] not in "aeiou": + return word[:-1] + "ied" + return word + "ed" + + class FieldRequiring(ABC): """A mixin class that can force its concrete subclasses to set a value for specific class attributes.""" diff --git a/bot/exts/filtering/filtering.py b/bot/exts/filtering/filtering.py index c74b85698..58e16043a 100644 --- a/bot/exts/filtering/filtering.py +++ b/bot/exts/filtering/filtering.py @@ -3,17 +3,21 @@ from collections import defaultdict from functools import reduce from typing import Optional -from discord import Embed, HTTPException, Message -from discord.ext.commands import Cog +from discord import Colour, Embed, HTTPException, Message +from discord.ext import commands +from discord.ext.commands import BadArgument, Cog, Context, has_any_role from discord.utils import escape_markdown from bot.bot import Bot -from bot.constants import Colours, Webhooks +from bot.constants import Colours, MODERATION_ROLES, Webhooks from bot.exts.filtering._filter_context import Event, FilterContext -from bot.exts.filtering._filter_lists import FilterList, filter_list_types +from bot.exts.filtering._filter_lists import FilterList, ListType, ListTypeConverter, filter_list_types from bot.exts.filtering._filters.filter import Filter from bot.exts.filtering._settings import ActionSettings +from bot.exts.filtering._ui import ArgumentCompletionView +from bot.exts.filtering._utils import past_tense from bot.log import get_logger +from bot.pagination import LinePaginator from bot.utils.messages import format_channel, format_user log = get_logger(__name__) @@ -22,6 +26,8 @@ log = get_logger(__name__) class Filtering(Cog): """Filtering and alerting for content posted on the server.""" + # region: init + def __init__(self, bot: Bot): self.bot = bot self.filter_lists: dict[str, FilterList] = {} @@ -54,7 +60,7 @@ class Filtering(Cog): try: self.webhook = await self.bot.fetch_webhook(Webhooks.filters) except HTTPException: - log.error(f"Failed to fetch incidents webhook with id `{Webhooks.incidents}`.") + log.error(f"Failed to fetch incidents webhook with ID `{Webhooks.incidents}`.") def subscribe(self, filter_list: FilterList, *events: Event) -> None: """ @@ -73,21 +79,12 @@ class Filtering(Cog): if filter_list not in self._subscriptions[event]: self._subscriptions[event].append(filter_list) - async def _resolve_action( - self, ctx: FilterContext - ) -> tuple[dict[FilterList, list[Filter]], Optional[ActionSettings]]: - """Get the filters triggered per list, and resolve from them the action that needs to be taken for the event.""" - triggered = {} - for filter_list in self._subscriptions[ctx.event]: - triggered[filter_list] = filter_list.triggers_for(ctx) - - result_actions = None - if triggered: - result_actions = reduce( - operator.or_, (filter_.actions for filters in triggered.values() for filter_ in filters) - ) + async def cog_check(self, ctx: Context) -> bool: + """Only allow moderators to invoke the commands in this cog.""" + return await has_any_role(*MODERATION_ROLES).predicate(ctx) - return triggered, result_actions + # endregion + # region: listeners @Cog.listener() async def on_message(self, msg: Message) -> None: @@ -103,6 +100,100 @@ class Filtering(Cog): if ctx.send_alert: await self._send_alert(ctx, triggered) + # endregion + # region: blacklist commands + + @commands.group(aliases=("bl", "blacklist", "denylist", "dl")) + async def blocklist(self, ctx: Context) -> None: + """Group for managing blacklisted items.""" + if not ctx.invoked_subcommand: + await ctx.send_help(ctx.command) + + @blocklist.command(name="list", aliases=("get",)) + async def bl_list(self, ctx: Context, list_name: Optional[str] = None) -> None: + """List the contents of a specified blacklist.""" + if list_name is None: + await ctx.send( + "The **list_name** argument is unspecified. Please pick a value from the options below:", + view=ArgumentCompletionView(ctx, "list_name", list(self.filter_lists)) + ) + return + await self._send_list(ctx, list_name, ListType.DENY) + + # endregion + # region: whitelist commands + + @commands.group(aliases=("wl", "whitelist", "al")) + async def allowlist(self, ctx: Context) -> None: + """Group for managing blacklisted items.""" + if not ctx.invoked_subcommand: + await ctx.send_help(ctx.command) + + @allowlist.command(name="list", aliases=("get",)) + async def al_list(self, ctx: Context, list_name: Optional[str] = None) -> None: + """List the contents of a specified whitelist.""" + if list_name is None: + await ctx.send( + "The **list_name** argument is unspecified. Please pick a value from the options below:", + view=ArgumentCompletionView(ctx, "list_name", list(self.filter_lists)) + ) + return + await self._send_list(ctx, list_name, ListType.ALLOW) + + # endregion + # region: filter commands + + @commands.group(aliases=("filters", "f")) + async def filter(self, ctx: Context) -> None: + """Group for managing filters.""" + if not ctx.invoked_subcommand: + await ctx.send_help(ctx.command) + + @filter.command(name="list", aliases=("get",)) + async def f_list( + self, ctx: Context, list_type: Optional[ListTypeConverter] = None, list_name: Optional[str] = None + ) -> None: + """List the contents of a specified list of filters.""" + if list_name is None: + await ctx.send( + "The **list_name** argument is unspecified. Please pick a value from the options below:", + view=ArgumentCompletionView(ctx, "list_name", list(self.filter_lists)) + ) + return + + if list_type is None: + filter_list = self._get_list_by_name(list_name) + if len(filter_list.filter_lists) > 1: + await ctx.send( + "The **list_type** argument is unspecified. Please pick a value from the options below:", + view=ArgumentCompletionView(ctx, "list_type", [option.name for option in ListType]) + ) + return + list_type = list(filter_list.filter_lists)[0] + + await self._send_list(ctx, list_name, list_type) + + # endregion + # region: helper functions + + async def _resolve_action( + self, ctx: FilterContext + ) -> tuple[dict[FilterList, list[Filter]], Optional[ActionSettings]]: + """Get the filters triggered per list, and resolve from them the action that needs to be taken for the event.""" + triggered = {} + for filter_list in self._subscriptions[ctx.event]: + result = filter_list.triggers_for(ctx) + if result: + triggered[filter_list] = result + + result_actions = None + if triggered: + result_actions = reduce( + operator.or_, (filter_.actions for filters in triggered.values() for filter_ in filters) + ) + + return triggered, result_actions + async def _send_alert(self, ctx: FilterContext, triggered_filters: dict[FilterList, list[Filter]]) -> None: """Build an alert message from the filter context, and send it via the alert webhook.""" if not self.webhook: @@ -144,6 +235,32 @@ class Filtering(Cog): await self.webhook.send(username=name, content=ctx.alert_content, embeds=[embed, *ctx.alert_embeds]) + def _get_list_by_name(self, list_name: str) -> FilterList: + """Get a filter list by its name, or raise an error if there's no such list.""" + log.trace(f"Getting the filter list matching the name {list_name}") + filter_list = self.filter_lists.get(list_name) + if not filter_list: + if list_name.endswith("s"): # The user may have attempted to use the plural form. + filter_list = self.filter_lists.get(list_name[:-1]) + if not filter_list: + raise BadArgument(f"There's no filter list named {list_name!r}.") + log.trace(f"Found list named {filter_list.name}") + return filter_list + + async def _send_list(self, ctx: Context, list_name: str, list_type: ListType) -> None: + """Show the list of filters identified by the list name and type.""" + filter_list = self._get_list_by_name(list_name) + lines = list(map(str, filter_list.filter_lists.get(list_type, []))) + log.trace(f"Sending a list of {len(lines)} filters.") + + list_name_plural = list_name + ("s" if not list_name.endswith("s") else "") + embed = Embed(colour=Colour.blue()) + embed.set_author(name=f"List of {past_tense(list_type.name.lower())} {list_name_plural} ({len(lines)} total)") + + await LinePaginator.paginate(lines, ctx, embed, max_lines=15, empty=False) + + # endregion + async def setup(bot: Bot) -> None: """Load the Filtering cog.""" |