aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--bot/exts/filtering/_filter_lists/__init__.py4
-rw-r--r--bot/exts/filtering/_filter_lists/filter_list.py34
-rw-r--r--bot/exts/filtering/_filter_lists/token.py2
-rw-r--r--bot/exts/filtering/_filters/filter.py7
-rw-r--r--bot/exts/filtering/_ui.py48
-rw-r--r--bot/exts/filtering/_utils.py11
-rw-r--r--bot/exts/filtering/filtering.py155
7 files changed, 232 insertions, 29 deletions
diff --git a/bot/exts/filtering/_filter_lists/__init__.py b/bot/exts/filtering/_filter_lists/__init__.py
index 415e3a6bf..1273e5588 100644
--- a/bot/exts/filtering/_filter_lists/__init__.py
+++ b/bot/exts/filtering/_filter_lists/__init__.py
@@ -1,9 +1,9 @@
from os.path import dirname
-from bot.exts.filtering._filter_lists.filter_list import FilterList
+from bot.exts.filtering._filter_lists.filter_list import FilterList, ListType, ListTypeConverter
from bot.exts.filtering._utils import subclasses_in_package
filter_list_types = subclasses_in_package(dirname(__file__), f"{__name__}.", FilterList)
filter_list_types = {filter_list.name: filter_list for filter_list in filter_list_types}
-__all__ = [filter_list_types, FilterList]
+__all__ = [filter_list_types, FilterList, ListType, ListTypeConverter]
diff --git a/bot/exts/filtering/_filter_lists/filter_list.py b/bot/exts/filtering/_filter_lists/filter_list.py
index f9e304b59..1060f11db 100644
--- a/bot/exts/filtering/_filter_lists/filter_list.py
+++ b/bot/exts/filtering/_filter_lists/filter_list.py
@@ -2,20 +2,40 @@ from abc import abstractmethod
from enum import Enum
from typing import Dict, List, Type
-from bot.exts.filtering._settings import Settings, ValidationSettings, create_settings
-from bot.exts.filtering._filters.filter import Filter
+from discord.ext.commands import BadArgument, Context, Converter
+
from bot.exts.filtering._filter_context import FilterContext
-from bot.exts.filtering._utils import FieldRequiring
+from bot.exts.filtering._filters.filter import Filter
+from bot.exts.filtering._settings import Settings, ValidationSettings, create_settings
+from bot.exts.filtering._utils import FieldRequiring, past_tense
from bot.log import get_logger
log = get_logger(__name__)
class ListType(Enum):
+ """An enumeration of list types."""
+
DENY = 0
ALLOW = 1
+class ListTypeConverter(Converter):
+ """A Converter to get the appropriate list type."""
+
+ aliases = (
+ (ListType.DENY, {"deny", "blocklist", "blacklist", "denylist", "bl", "dl"}),
+ (ListType.ALLOW, {"allow", "allowlist", "whitelist", "al", "wl"})
+ )
+
+ async def convert(self, ctx: Context, argument: str) -> ListType:
+ """Get the appropriate list type."""
+ for list_type, aliases in self.aliases:
+ if argument in aliases or argument in map(past_tense, aliases):
+ return list_type
+ raise BadArgument(f"No matching list type found for {argument!r}.")
+
+
class FilterList(FieldRequiring):
"""Dispatches events to lists of _filters, and aggregates the responses into a single list of actions to take."""
@@ -24,8 +44,8 @@ class FilterList(FieldRequiring):
name = FieldRequiring.MUST_SET_UNIQUE
def __init__(self, filter_type: Type[Filter]):
- self._filter_lists: dict[ListType, list[Filter]] = {}
- self._defaults: dict[ListType, dict[str, Settings]] = {}
+ self.filter_lists: dict[ListType, list[Filter]] = {}
+ self.defaults: dict[ListType, dict[str, Settings]] = {}
self.filter_type = filter_type
@@ -33,7 +53,7 @@ class FilterList(FieldRequiring):
"""Add a new type of list (such as a whitelist or a blacklist) this filter list."""
actions, validations = create_settings(list_data["settings"])
list_type = ListType(list_data["list_type"])
- self._defaults[list_type] = {"actions": actions, "validations": validations}
+ self.defaults[list_type] = {"actions": actions, "validations": validations}
filters = []
for filter_data in list_data["filters"]:
@@ -41,7 +61,7 @@ class FilterList(FieldRequiring):
filters.append(self.filter_type(filter_data, actions))
except TypeError as e:
log.warning(e)
- self._filter_lists[list_type] = filters
+ self.filter_lists[list_type] = filters
@abstractmethod
def triggers_for(self, ctx: FilterContext) -> list[Filter]:
diff --git a/bot/exts/filtering/_filter_lists/token.py b/bot/exts/filtering/_filter_lists/token.py
index 4495f4414..01e586132 100644
--- a/bot/exts/filtering/_filter_lists/token.py
+++ b/bot/exts/filtering/_filter_lists/token.py
@@ -33,7 +33,7 @@ class TokensList(FilterList):
ctx = ctx.replace(content=text)
return self.filter_list_result(
- ctx, self._filter_lists[ListType.DENY], self._defaults[ListType.DENY]["validations"]
+ ctx, self.filter_lists[ListType.DENY], self.defaults[ListType.DENY]["validations"]
)
@staticmethod
diff --git a/bot/exts/filtering/_filters/filter.py b/bot/exts/filtering/_filters/filter.py
index 484e506fc..e7fff20a6 100644
--- a/bot/exts/filtering/_filters/filter.py
+++ b/bot/exts/filtering/_filters/filter.py
@@ -27,3 +27,10 @@ class Filter(ABC):
@abstractmethod
def triggered_on(self, ctx: FilterContext) -> bool:
"""Search for the filter's content within a given context."""
+
+ def __str__(self) -> str:
+ """A string representation of the filter."""
+ string = f"#{self.id}. `{self.content}`"
+ if self.description:
+ string += f" - {self.description}"
+ return string
diff --git a/bot/exts/filtering/_ui.py b/bot/exts/filtering/_ui.py
new file mode 100644
index 000000000..95a840be8
--- /dev/null
+++ b/bot/exts/filtering/_ui.py
@@ -0,0 +1,48 @@
+from copy import copy
+
+import discord
+import discord.ui
+from discord.ext.commands import Context
+
+import bot
+from bot.log import get_logger
+
+log = get_logger(__name__)
+
+
+class ArgumentCompletionSelect(discord.ui.Select):
+ """A select detailing the options that can be picked to assign to a missing argument."""
+
+ def __init__(self, ctx: Context, arg_name: str, options: list[str]):
+ super().__init__(
+ placeholder=f"Select a value for {arg_name!r}",
+ options=[discord.SelectOption(label=option) for option in options]
+ )
+ self.ctx = ctx
+
+ async def callback(self, interaction: discord.Interaction) -> None:
+ """re-invoke the context command with the completed argument value."""
+ await interaction.response.defer()
+ value = interaction.data['values'][0]
+ message = copy(self.ctx.message)
+ message.content = f'{message.content} "{value}"'
+ log.trace(f"Argument filled with the value {value}. Invoking {message.content!r}")
+ await bot.instance.process_commands(message)
+
+
+class ArgumentCompletionView(discord.ui.View):
+ """A view used to complete a missing argument in an in invoked command."""
+
+ def __init__(self, ctx: Context, arg_name: str, options: list[str]):
+ super().__init__()
+ log.trace(f"The {arg_name} argument was designated missing in the invocation {ctx.view.buffer!r}")
+ self.add_item(ArgumentCompletionSelect(ctx, arg_name, options))
+ self.ctx = ctx
+
+ async def interaction_check(self, interaction: discord.Interaction) -> bool:
+ """Check to ensure that the interacting user is the user who invoked the command."""
+ if interaction.user != self.ctx.author:
+ embed = discord.Embed(description="Sorry, but this dropdown menu can only be used by the original author.")
+ await interaction.response.send_message(embed=embed, ephemeral=True)
+ return False
+ return True
diff --git a/bot/exts/filtering/_utils.py b/bot/exts/filtering/_utils.py
index a769001f6..790f70ee5 100644
--- a/bot/exts/filtering/_utils.py
+++ b/bot/exts/filtering/_utils.py
@@ -47,6 +47,17 @@ def clean_input(string: str) -> str:
return INVISIBLE_RE.sub("", no_zalgo)
+def past_tense(word: str) -> str:
+ """Return the past tense form of the input word."""
+ if not word:
+ return word
+ if word.endswith("e"):
+ return word + "d"
+ if word.endswith("y") and len(word) > 1 and word[-2] not in "aeiou":
+ return word[:-1] + "ied"
+ return word + "ed"
+
+
class FieldRequiring(ABC):
"""A mixin class that can force its concrete subclasses to set a value for specific class attributes."""
diff --git a/bot/exts/filtering/filtering.py b/bot/exts/filtering/filtering.py
index c74b85698..58e16043a 100644
--- a/bot/exts/filtering/filtering.py
+++ b/bot/exts/filtering/filtering.py
@@ -3,17 +3,21 @@ from collections import defaultdict
from functools import reduce
from typing import Optional
-from discord import Embed, HTTPException, Message
-from discord.ext.commands import Cog
+from discord import Colour, Embed, HTTPException, Message
+from discord.ext import commands
+from discord.ext.commands import BadArgument, Cog, Context, has_any_role
from discord.utils import escape_markdown
from bot.bot import Bot
-from bot.constants import Colours, Webhooks
+from bot.constants import Colours, MODERATION_ROLES, Webhooks
from bot.exts.filtering._filter_context import Event, FilterContext
-from bot.exts.filtering._filter_lists import FilterList, filter_list_types
+from bot.exts.filtering._filter_lists import FilterList, ListType, ListTypeConverter, filter_list_types
from bot.exts.filtering._filters.filter import Filter
from bot.exts.filtering._settings import ActionSettings
+from bot.exts.filtering._ui import ArgumentCompletionView
+from bot.exts.filtering._utils import past_tense
from bot.log import get_logger
+from bot.pagination import LinePaginator
from bot.utils.messages import format_channel, format_user
log = get_logger(__name__)
@@ -22,6 +26,8 @@ log = get_logger(__name__)
class Filtering(Cog):
"""Filtering and alerting for content posted on the server."""
+ # region: init
+
def __init__(self, bot: Bot):
self.bot = bot
self.filter_lists: dict[str, FilterList] = {}
@@ -54,7 +60,7 @@ class Filtering(Cog):
try:
self.webhook = await self.bot.fetch_webhook(Webhooks.filters)
except HTTPException:
- log.error(f"Failed to fetch incidents webhook with id `{Webhooks.incidents}`.")
+ log.error(f"Failed to fetch incidents webhook with ID `{Webhooks.incidents}`.")
def subscribe(self, filter_list: FilterList, *events: Event) -> None:
"""
@@ -73,21 +79,12 @@ class Filtering(Cog):
if filter_list not in self._subscriptions[event]:
self._subscriptions[event].append(filter_list)
- async def _resolve_action(
- self, ctx: FilterContext
- ) -> tuple[dict[FilterList, list[Filter]], Optional[ActionSettings]]:
- """Get the filters triggered per list, and resolve from them the action that needs to be taken for the event."""
- triggered = {}
- for filter_list in self._subscriptions[ctx.event]:
- triggered[filter_list] = filter_list.triggers_for(ctx)
-
- result_actions = None
- if triggered:
- result_actions = reduce(
- operator.or_, (filter_.actions for filters in triggered.values() for filter_ in filters)
- )
+ async def cog_check(self, ctx: Context) -> bool:
+ """Only allow moderators to invoke the commands in this cog."""
+ return await has_any_role(*MODERATION_ROLES).predicate(ctx)
- return triggered, result_actions
+ # endregion
+ # region: listeners
@Cog.listener()
async def on_message(self, msg: Message) -> None:
@@ -103,6 +100,100 @@ class Filtering(Cog):
if ctx.send_alert:
await self._send_alert(ctx, triggered)
+ # endregion
+ # region: blacklist commands
+
+ @commands.group(aliases=("bl", "blacklist", "denylist", "dl"))
+ async def blocklist(self, ctx: Context) -> None:
+ """Group for managing blacklisted items."""
+ if not ctx.invoked_subcommand:
+ await ctx.send_help(ctx.command)
+
+ @blocklist.command(name="list", aliases=("get",))
+ async def bl_list(self, ctx: Context, list_name: Optional[str] = None) -> None:
+ """List the contents of a specified blacklist."""
+ if list_name is None:
+ await ctx.send(
+ "The **list_name** argument is unspecified. Please pick a value from the options below:",
+ view=ArgumentCompletionView(ctx, "list_name", list(self.filter_lists))
+ )
+ return
+ await self._send_list(ctx, list_name, ListType.DENY)
+
+ # endregion
+ # region: whitelist commands
+
+ @commands.group(aliases=("wl", "whitelist", "al"))
+ async def allowlist(self, ctx: Context) -> None:
+ """Group for managing blacklisted items."""
+ if not ctx.invoked_subcommand:
+ await ctx.send_help(ctx.command)
+
+ @allowlist.command(name="list", aliases=("get",))
+ async def al_list(self, ctx: Context, list_name: Optional[str] = None) -> None:
+ """List the contents of a specified whitelist."""
+ if list_name is None:
+ await ctx.send(
+ "The **list_name** argument is unspecified. Please pick a value from the options below:",
+ view=ArgumentCompletionView(ctx, "list_name", list(self.filter_lists))
+ )
+ return
+ await self._send_list(ctx, list_name, ListType.ALLOW)
+
+ # endregion
+ # region: filter commands
+
+ @commands.group(aliases=("filters", "f"))
+ async def filter(self, ctx: Context) -> None:
+ """Group for managing filters."""
+ if not ctx.invoked_subcommand:
+ await ctx.send_help(ctx.command)
+
+ @filter.command(name="list", aliases=("get",))
+ async def f_list(
+ self, ctx: Context, list_type: Optional[ListTypeConverter] = None, list_name: Optional[str] = None
+ ) -> None:
+ """List the contents of a specified list of filters."""
+ if list_name is None:
+ await ctx.send(
+ "The **list_name** argument is unspecified. Please pick a value from the options below:",
+ view=ArgumentCompletionView(ctx, "list_name", list(self.filter_lists))
+ )
+ return
+
+ if list_type is None:
+ filter_list = self._get_list_by_name(list_name)
+ if len(filter_list.filter_lists) > 1:
+ await ctx.send(
+ "The **list_type** argument is unspecified. Please pick a value from the options below:",
+ view=ArgumentCompletionView(ctx, "list_type", [option.name for option in ListType])
+ )
+ return
+ list_type = list(filter_list.filter_lists)[0]
+
+ await self._send_list(ctx, list_name, list_type)
+
+ # endregion
+ # region: helper functions
+
+ async def _resolve_action(
+ self, ctx: FilterContext
+ ) -> tuple[dict[FilterList, list[Filter]], Optional[ActionSettings]]:
+ """Get the filters triggered per list, and resolve from them the action that needs to be taken for the event."""
+ triggered = {}
+ for filter_list in self._subscriptions[ctx.event]:
+ result = filter_list.triggers_for(ctx)
+ if result:
+ triggered[filter_list] = result
+
+ result_actions = None
+ if triggered:
+ result_actions = reduce(
+ operator.or_, (filter_.actions for filters in triggered.values() for filter_ in filters)
+ )
+
+ return triggered, result_actions
+
async def _send_alert(self, ctx: FilterContext, triggered_filters: dict[FilterList, list[Filter]]) -> None:
"""Build an alert message from the filter context, and send it via the alert webhook."""
if not self.webhook:
@@ -144,6 +235,32 @@ class Filtering(Cog):
await self.webhook.send(username=name, content=ctx.alert_content, embeds=[embed, *ctx.alert_embeds])
+ def _get_list_by_name(self, list_name: str) -> FilterList:
+ """Get a filter list by its name, or raise an error if there's no such list."""
+ log.trace(f"Getting the filter list matching the name {list_name}")
+ filter_list = self.filter_lists.get(list_name)
+ if not filter_list:
+ if list_name.endswith("s"): # The user may have attempted to use the plural form.
+ filter_list = self.filter_lists.get(list_name[:-1])
+ if not filter_list:
+ raise BadArgument(f"There's no filter list named {list_name!r}.")
+ log.trace(f"Found list named {filter_list.name}")
+ return filter_list
+
+ async def _send_list(self, ctx: Context, list_name: str, list_type: ListType) -> None:
+ """Show the list of filters identified by the list name and type."""
+ filter_list = self._get_list_by_name(list_name)
+ lines = list(map(str, filter_list.filter_lists.get(list_type, [])))
+ log.trace(f"Sending a list of {len(lines)} filters.")
+
+ list_name_plural = list_name + ("s" if not list_name.endswith("s") else "")
+ embed = Embed(colour=Colour.blue())
+ embed.set_author(name=f"List of {past_tense(list_type.name.lower())} {list_name_plural} ({len(lines)} total)")
+
+ await LinePaginator.paginate(lines, ctx, embed, max_lines=15, empty=False)
+
+ # endregion
+
async def setup(bot: Bot) -> None:
"""Load the Filtering cog."""