aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--bot/exts/filtering/_filter_lists/domain.py20
-rw-r--r--bot/exts/filtering/_filter_lists/extension.py2
-rw-r--r--bot/exts/filtering/_filter_lists/filter_list.py104
-rw-r--r--bot/exts/filtering/_filter_lists/invite.py113
-rw-r--r--bot/exts/filtering/_filter_lists/token.py20
-rw-r--r--bot/exts/filtering/_filter_lists/unique.py75
-rw-r--r--bot/exts/filtering/_filters/filter.py14
-rw-r--r--bot/exts/filtering/_filters/unique/__init__.py9
-rw-r--r--bot/exts/filtering/_filters/unique/everyone.py28
-rw-r--r--bot/exts/filtering/_utils.py2
-rw-r--r--bot/exts/filtering/filtering.py8
11 files changed, 285 insertions, 110 deletions
diff --git a/bot/exts/filtering/_filter_lists/domain.py b/bot/exts/filtering/_filter_lists/domain.py
index 17984e276..d97aa252b 100644
--- a/bot/exts/filtering/_filter_lists/domain.py
+++ b/bot/exts/filtering/_filter_lists/domain.py
@@ -2,8 +2,6 @@ from __future__ import annotations
import re
import typing
-from functools import reduce
-from operator import or_
from bot.exts.filtering._filter_context import Event, FilterContext
from bot.exts.filtering._filter_lists.filter_list import FilterList, ListType
@@ -18,7 +16,7 @@ if typing.TYPE_CHECKING:
URL_RE = re.compile(r"https?://(\S+)", flags=re.IGNORECASE)
-class DomainsList(FilterList):
+class DomainsList(FilterList[DomainFilter]):
"""
A list of filters, each looking for a specific domain given by URL.
@@ -59,18 +57,6 @@ class DomainsList(FilterList):
actions = None
messages = []
if triggers:
- action_defaults = self[ListType.DENY].defaults.actions
- actions = reduce(
- or_,
- (filter_.actions.fallback_to(action_defaults) if filter_.actions else action_defaults
- for filter_ in triggers
- )
- )
- if len(triggers) == 1:
- message = f"#{triggers[0].id} (`{triggers[0].content}`)"
- if triggers[0].description:
- message += f" - {triggers[0].description}"
- messages = [message]
- else:
- messages = [f"#{filter_.id} (`{filter_.content}`)" for filter_ in triggers]
+ actions = self[ListType.DENY].merge_actions(triggers)
+ messages = self[ListType.DENY].format_messages(triggers)
return actions, messages
diff --git a/bot/exts/filtering/_filter_lists/extension.py b/bot/exts/filtering/_filter_lists/extension.py
index a58c6c45e..3f9d2b287 100644
--- a/bot/exts/filtering/_filter_lists/extension.py
+++ b/bot/exts/filtering/_filter_lists/extension.py
@@ -33,7 +33,7 @@ DISALLOWED_EMBED_DESCRIPTION = (
)
-class ExtensionsList(FilterList):
+class ExtensionsList(FilterList[ExtensionFilter]):
"""
A list of filters, each looking for a file attachment with a specific extension.
diff --git a/bot/exts/filtering/_filter_lists/filter_list.py b/bot/exts/filtering/_filter_lists/filter_list.py
index 84a43072b..55204335b 100644
--- a/bot/exts/filtering/_filter_lists/filter_list.py
+++ b/bot/exts/filtering/_filter_lists/filter_list.py
@@ -1,11 +1,18 @@
+import dataclasses
+import typing
from abc import abstractmethod
+from collections import defaultdict
+from collections.abc import Iterable
+from dataclasses import dataclass
from enum import Enum
-from typing import Any, NamedTuple
+from functools import reduce
+from operator import or_
+from typing import Any
from discord.ext.commands import BadArgument
-from bot.exts.filtering._filter_context import FilterContext
-from bot.exts.filtering._filters.filter import Filter
+from bot.exts.filtering._filter_context import Event, FilterContext
+from bot.exts.filtering._filters.filter import Filter, UniqueFilter
from bot.exts.filtering._settings import ActionSettings, Defaults, create_settings
from bot.exts.filtering._utils import FieldRequiring, past_tense
from bot.log import get_logger
@@ -36,7 +43,8 @@ def list_type_converter(argument: str) -> ListType:
raise BadArgument(f"No matching list type found for {argument!r}.")
-class AtomicList(NamedTuple):
+@dataclass(frozen=True)
+class AtomicList:
"""
Represents the atomic structure of a single filter list as it appears in the database.
@@ -68,11 +76,16 @@ class AtomicList(NamedTuple):
If the filter is relevant in context, see if it actually triggers.
"""
- passed_by_default, failed_by_default = self.defaults.validations.evaluate(ctx)
+ return self._create_filter_list_result(ctx, self.defaults, self.filters.values())
+
+ @staticmethod
+ def _create_filter_list_result(ctx: FilterContext, defaults: Defaults, filters: Iterable[Filter]) -> list[Filter]:
+ """A helper function to evaluate the result of `filter_list_result`."""
+ passed_by_default, failed_by_default = defaults.validations.evaluate(ctx)
default_answer = not bool(failed_by_default)
relevant_filters = []
- for filter_ in self.filters.values():
+ for filter_ in filters:
if not filter_.validations:
if default_answer and filter_.triggered_on(ctx):
relevant_filters.append(filter_)
@@ -94,8 +107,36 @@ class AtomicList(NamedTuple):
raise ValueError(f"Couldn't find a setting named {setting_name!r}.")
return value
+ def merge_actions(self, filters: list[Filter]) -> ActionSettings | None:
+ """
+ Merge the settings of the given filters, with the list's defaults as fallback.
+
+ If `merge_default` is True, include it in the merge instead of using it as a fallback.
+ """
+ try:
+ result = reduce(or_, (filter_.actions for filter_ in filters if filter_.actions))
+ except TypeError: # The sequence fed to reduce is empty.
+ return None
+
+ return result.fallback_to(self.defaults.actions)
+
+ @staticmethod
+ def format_messages(triggers: list[Filter], *, expand_single_filter: bool = True) -> list[str]:
+ """Convert the filters into strings that can be added to the alert embed."""
+ if len(triggers) == 1 and expand_single_filter:
+ message = f"#{triggers[0].id} (`{triggers[0].content}`)"
+ if triggers[0].description:
+ message += f" - {triggers[0].description}"
+ messages = [message]
+ else:
+ messages = [f"#{filter_.id} (`{filter_.content}`)" for filter_ in triggers]
+ return messages
+
+
+T = typing.TypeVar("T", bound=Filter)
+
-class FilterList(FieldRequiring, dict[ListType, AtomicList]):
+class FilterList(dict[ListType, AtomicList], typing.Generic[T], FieldRequiring):
"""Dispatches events to lists of _filters, and aggregates the responses into a single list of actions to take."""
# Each subclass must define a name matching the filter_list name we're expecting to receive from the database.
@@ -110,39 +151,70 @@ class FilterList(FieldRequiring, dict[ListType, AtomicList]):
filters = {}
for filter_data in list_data["filters"]:
- filters[filter_data["id"]] = self._create_filter(filter_data, defaults)
+ new_filter = self._create_filter(filter_data, defaults)
+ if new_filter:
+ filters[filter_data["id"]] = new_filter
self[list_type] = AtomicList(list_data["id"], self.name, list_type, defaults, filters)
return self[list_type]
- def add_filter(self, list_type: ListType, filter_data: dict) -> Filter:
+ def add_filter(self, list_type: ListType, filter_data: dict) -> T | None:
"""Add a filter to the list of the specified type."""
new_filter = self._create_filter(filter_data, self[list_type].defaults)
- self[list_type].filters[filter_data["id"]] = new_filter
+ if new_filter:
+ self[list_type].filters[filter_data["id"]] = new_filter
return new_filter
@abstractmethod
- def get_filter_type(self, content: str) -> type[Filter]:
+ def get_filter_type(self, content: str) -> type[T]:
"""Get a subclass of filter matching the filter list and the filter's content."""
@property
@abstractmethod
- def filter_types(self) -> set[type[Filter]]:
+ def filter_types(self) -> set[type[T]]:
"""Return the types of filters used by this list."""
@abstractmethod
async def actions_for(self, ctx: FilterContext) -> tuple[ActionSettings | None, list[str]]:
"""Dispatch the given event to the list's filters, and return actions to take and messages to relay to mods."""
- def _create_filter(self, filter_data: dict, defaults: Defaults) -> Filter:
+ def _create_filter(self, filter_data: dict, defaults: Defaults) -> T | None:
"""Create a filter from the given data."""
try:
filter_type = self.get_filter_type(filter_data["content"])
- new_filter = filter_type(filter_data, defaults)
+ if filter_type:
+ return filter_type(filter_data, defaults)
+ else:
+ return None
except TypeError as e:
log.warning(e)
- else:
- return new_filter
def __hash__(self):
return hash(id(self))
+
+
+@dataclass(frozen=True)
+class SubscribingAtomicList(AtomicList):
+ """
+ A base class for a list of unique filters.
+
+ Unique filters are ones that should only be run once in a given context.
+ Each unique filter is subscribed to a subset of events to respond to.
+ """
+
+ subscriptions: defaultdict[Event, list[Filter]] = dataclasses.field(default_factory=lambda: defaultdict(list))
+
+ def subscribe(self, filter_: UniqueFilter, *events: Event) -> None:
+ """
+ Subscribe a unique filter to the given events.
+
+ The filter is added to a list for each event. When the event is triggered, the filter context will be
+ dispatched to the subscribed filters.
+ """
+ for event in events:
+ if filter_ not in self.subscriptions[event]:
+ self.subscriptions[event].append(filter_)
+
+ def filter_list_result(self, ctx: FilterContext) -> list[Filter]:
+ """Sift through the list of filters, and return only the ones which apply to the given context."""
+ return self._create_filter_list_result(ctx, self.defaults, self.subscriptions[ctx.event])
diff --git a/bot/exts/filtering/_filter_lists/invite.py b/bot/exts/filtering/_filter_lists/invite.py
index d35fdd4a4..0b84aec0e 100644
--- a/bot/exts/filtering/_filter_lists/invite.py
+++ b/bot/exts/filtering/_filter_lists/invite.py
@@ -1,8 +1,6 @@
from __future__ import annotations
import typing
-from functools import reduce
-from operator import or_
from botcore.utils.regex import DISCORD_INVITE
from discord import Embed, Invite
@@ -20,7 +18,7 @@ if typing.TYPE_CHECKING:
from bot.exts.filtering.filtering import Filtering
-class InviteList(FilterList):
+class InviteList(FilterList[InviteFilter]):
"""
A list of filters, each looking for guild invites to a specific guild.
@@ -52,10 +50,6 @@ class InviteList(FilterList):
async def actions_for(self, ctx: FilterContext) -> tuple[ActionSettings | None, list[str]]:
"""Dispatch the given event to the list's filters, and return actions to take and messages to relay to mods."""
- _, failed = self[ListType.ALLOW].defaults.validations.evaluate(ctx)
- if failed: # There's no invite filtering in this context.
- return None, []
-
text = clean_input(ctx.content)
# Avoid escape characters
@@ -66,62 +60,73 @@ class InviteList(FilterList):
if not invite_codes:
return None, []
- # Sort the invites into three categories:
- denied_by_default = dict() # Denied unless whitelisted.
- allowed_by_default = dict() # Allowed unless blacklisted (partnered or verified servers).
- disallowed_invites = dict() # Always denied (invalid invites).
+ _, failed = self[ListType.ALLOW].defaults.validations.evaluate(ctx)
+ # If the allowed list doesn't operate in the context, unknown invites are allowed.
+ check_if_allowed = not failed
+
+ # Sort the invites into two categories:
+ invites_for_inspection = dict() # Found guild invites requiring further inspection.
+ unknown_invites = dict() # Either don't resolve or group DMs.
for invite_code in invite_codes:
try:
invite = await bot.instance.fetch_invite(invite_code)
except NotFound:
- disallowed_invites[invite_code] = None
+ if check_if_allowed:
+ unknown_invites[invite_code] = None
else:
- if not invite.guild:
- disallowed_invites[invite_code] = invite
- else:
- if "PARTNERED" in invite.guild.features or "VERIFIED" in invite.guild.features:
- allowed_by_default[invite_code] = invite
- else:
- denied_by_default[invite_code] = invite
-
- # Add the disallowed by default unless they're whitelisted.
- guilds_for_inspection = {invite.guild.id for invite in denied_by_default.values()}
- new_ctx = ctx.replace(content=guilds_for_inspection)
- allowed = {
- filter_.content for filter_ in self[ListType.ALLOW].filters.values() if filter_.triggered_on(new_ctx)
+ if invite.guild:
+ invites_for_inspection[invite_code] = invite
+ elif check_if_allowed: # Group DM
+ unknown_invites[invite_code] = invite
+
+ # Find any blocked invites
+ new_ctx = ctx.replace(content={invite.guild.id for invite in invites_for_inspection.values()})
+ triggered = self[ListType.DENY].filter_list_result(new_ctx)
+ blocked_guilds = {filter_.content for filter_ in triggered}
+ blocked_invites = {
+ code: invite for code, invite in invites_for_inspection.items() if invite.guild.id in blocked_guilds
}
- disallowed_invites.update({
- invite_code: invite for invite_code, invite in denied_by_default.items() if invite.guild.id not in allowed
- })
-
- # Add the allowed by default only if they're blacklisted.
- guilds_for_inspection = {invite.guild.id for invite in allowed_by_default.values()}
- new_ctx = ctx.replace(content=guilds_for_inspection)
- triggered = self[ListType.ALLOW].filter_list_result(new_ctx)
- disallowed_invites.update({
- invite_code: invite for invite_code, invite in allowed_by_default.items()
- if invite.guild.id in {filter_.content for filter_ in triggered}
- })
-
- if not disallowed_invites:
+
+ # Remove the ones which are already confirmed as blocked, or otherwise ones which are partnered or verified.
+ invites_for_inspection = {
+ code: invite for code, invite in invites_for_inspection.items()
+ if invite.guild.id not in blocked_guilds
+ and "PARTNERED" not in invite.guild.features and "VERIFIED" not in invite.guild.features
+ }
+
+ # Remove any remaining invites which are allowed
+ guilds_for_inspection = {invite.guild.id for invite in invites_for_inspection.values()}
+
+ if check_if_allowed: # Whether unknown invites need to be checked.
+ new_ctx = ctx.replace(content=guilds_for_inspection)
+ allowed = {
+ filter_.content for filter_ in self[ListType.ALLOW].filters.values() if filter_.triggered_on(new_ctx)
+ }
+ unknown_invites.update({
+ code: invite for code, invite in invites_for_inspection.items() if invite.guild.id not in allowed
+ })
+
+ if not triggered and not unknown_invites:
return None, []
actions = None
- if len(disallowed_invites) > len(triggered): # There are invites which weren't allowed but aren't blacklisted.
- deny_defaults = self[ListType.DENY].defaults.actions
- actions = reduce(
- or_,
- (
- filter_.actions.fallback_to(deny_defaults) if filter_.actions else deny_defaults
- for filter_ in triggered
- ),
- self[ListType.ALLOW].defaults.actions
- )
- elif triggered:
- actions = reduce(or_, (filter_.actions for filter_ in triggered))
- ctx.matches += {match[0] for match in matches if match.group("invite") in disallowed_invites}
- ctx.alert_embeds += (self._guild_embed(invite) for invite in disallowed_invites.values() if invite)
- return actions, [f"`{invite}`" for invite in disallowed_invites]
+ if unknown_invites: # There are invites which weren't allowed but aren't explicitly blocked.
+ actions = self[ListType.ALLOW].defaults.actions
+ # Blocked invites come second so that their actions have preference.
+ if triggered:
+ if actions:
+ actions |= self[ListType.DENY].merge_actions(triggered)
+ else:
+ actions = self[ListType.DENY].merge_actions(triggered)
+
+ blocked_invites |= unknown_invites
+ ctx.matches += {match[0] for match in matches if match.group("invite") in blocked_invites}
+ ctx.alert_embeds += (self._guild_embed(invite) for invite in blocked_invites.values() if invite)
+ messages = self[ListType.DENY].format_messages(triggered)
+ messages += [
+ f"`{code} - {invite.guild.id}`" if invite else f"`{code}`" for code, invite in unknown_invites.items()
+ ]
+ return actions, messages
@staticmethod
def _guild_embed(invite: Invite) -> Embed:
diff --git a/bot/exts/filtering/_filter_lists/token.py b/bot/exts/filtering/_filter_lists/token.py
index 4b161d9b7..c7d7cb444 100644
--- a/bot/exts/filtering/_filter_lists/token.py
+++ b/bot/exts/filtering/_filter_lists/token.py
@@ -2,8 +2,6 @@ from __future__ import annotations
import re
import typing
-from functools import reduce
-from operator import or_
from bot.exts.filtering._filter_context import Event, FilterContext
from bot.exts.filtering._filter_lists.filter_list import FilterList, ListType
@@ -18,7 +16,7 @@ if typing.TYPE_CHECKING:
SPOILER_RE = re.compile(r"(\|\|.+?\|\|)", re.DOTALL)
-class TokensList(FilterList):
+class TokensList(FilterList[TokenFilter]):
"""
A list of filters, each looking for a specific token in the given content given as regex.
@@ -59,20 +57,8 @@ class TokensList(FilterList):
actions = None
messages = []
if triggers:
- action_defaults = self[ListType.DENY].defaults.actions
- actions = reduce(
- or_,
- (filter_.actions.fallback_to(action_defaults) if filter_.actions else action_defaults
- for filter_ in triggers
- )
- )
- if len(triggers) == 1:
- message = f"#{triggers[0].id} (`{triggers[0].content}`)"
- if triggers[0].description:
- message += f" - {triggers[0].description}"
- messages = [message]
- else:
- messages = [f"#{filter_.id} (`{filter_.content}`)" for filter_ in triggers]
+ actions = self[ListType.DENY].merge_actions(triggers)
+ messages = self[ListType.DENY].format_messages(triggers)
return actions, messages
@staticmethod
diff --git a/bot/exts/filtering/_filter_lists/unique.py b/bot/exts/filtering/_filter_lists/unique.py
new file mode 100644
index 000000000..63caa7d36
--- /dev/null
+++ b/bot/exts/filtering/_filter_lists/unique.py
@@ -0,0 +1,75 @@
+from botcore.utils.logging import get_logger
+from discord.ext.commands import Cog
+
+from bot.exts.filtering._filter_context import FilterContext
+from bot.exts.filtering._filter_lists.filter_list import FilterList, ListType, SubscribingAtomicList
+from bot.exts.filtering._filters.filter import UniqueFilter
+from bot.exts.filtering._filters.unique import unique_filter_types
+from bot.exts.filtering._settings import ActionSettings, Defaults, create_settings
+
+log = get_logger(__name__)
+
+
+class UniquesList(FilterList[UniqueFilter]):
+ """
+ A list of unique filters.
+
+ Unique filters are ones that should only be run once in a given context.
+ Each unique filter subscribes to a subset of events to respond to.
+ """
+
+ name = "unique"
+ _already_warned = set()
+
+ def __init__(self, filtering_cog: Cog):
+ super().__init__()
+ self.filtering_cog = filtering_cog # This is typed as a Cog to avoid a circular import.
+ self.loaded_types: dict[str, type[UniqueFilter]] = {}
+
+ def add_list(self, list_data: dict) -> SubscribingAtomicList:
+ """Add a new type of list (such as a whitelist or a blacklist) this filter list."""
+ actions, validations = create_settings(list_data["settings"], keep_empty=True)
+ list_type = ListType(list_data["list_type"])
+ defaults = Defaults(actions, validations)
+ new_list = SubscribingAtomicList(list_data["id"], self.name, list_type, defaults, {})
+ self[list_type] = new_list
+
+ filters = {}
+ events = set()
+ for filter_data in list_data["filters"]:
+ new_filter = self._create_filter(filter_data, defaults)
+ if new_filter:
+ new_list.subscribe(new_filter, *new_filter.events)
+ filters[filter_data["id"]] = new_filter
+ self.loaded_types[new_filter.name] = type(new_filter)
+ events.update(new_filter.events)
+
+ new_list.filters.update(filters)
+ if hasattr(self.filtering_cog, "subscribe"): # Subscribe the filter list to any new events found.
+ self.filtering_cog.subscribe(self, *events)
+ return new_list
+
+ def get_filter_type(self, content: str) -> type[UniqueFilter] | None:
+ """Get a subclass of filter matching the filter list and the filter's content."""
+ try:
+ return unique_filter_types[content]
+ except KeyError:
+ if content not in self._already_warned:
+ log.warn(f"A unique filter named {content} was supplied, but no matching implementation found.")
+ self._already_warned.add(content)
+ return None
+
+ @property
+ def filter_types(self) -> set[type[UniqueFilter]]:
+ """Return the types of filters used by this list."""
+ return set(self.loaded_types.values())
+
+ async def actions_for(self, ctx: FilterContext) -> tuple[ActionSettings | None, list[str]]:
+ """Dispatch the given event to the list's filters, and return actions to take and messages to relay to mods."""
+ triggers = self[ListType.DENY].filter_list_result(ctx)
+ actions = None
+ messages = []
+ if triggers:
+ actions = self[ListType.DENY].merge_actions(triggers)
+ messages = self[ListType.DENY].format_messages(triggers)
+ return actions, messages
diff --git a/bot/exts/filtering/_filters/filter.py b/bot/exts/filtering/_filters/filter.py
index 095799781..b0d19d3a8 100644
--- a/bot/exts/filtering/_filters/filter.py
+++ b/bot/exts/filtering/_filters/filter.py
@@ -1,9 +1,9 @@
-from abc import abstractmethod
+from abc import ABC, abstractmethod
from typing import Any
from pydantic import ValidationError
-from bot.exts.filtering._filter_context import FilterContext
+from bot.exts.filtering._filter_context import Event, FilterContext
from bot.exts.filtering._settings import Defaults, create_settings
from bot.exts.filtering._utils import FieldRequiring
@@ -79,3 +79,13 @@ class Filter(FieldRequiring):
if self.description:
string += f" - {self.description}"
return string
+
+
+class UniqueFilter(Filter, ABC):
+ """
+ Unique filters are ones that should only be run once in a given context.
+
+ This is as opposed to say running many domain filters on the same message.
+ """
+
+ events: tuple[Event, ...] = FieldRequiring.MUST_SET
diff --git a/bot/exts/filtering/_filters/unique/__init__.py b/bot/exts/filtering/_filters/unique/__init__.py
new file mode 100644
index 000000000..ce78d6922
--- /dev/null
+++ b/bot/exts/filtering/_filters/unique/__init__.py
@@ -0,0 +1,9 @@
+from os.path import dirname
+
+from bot.exts.filtering._filters.filter import UniqueFilter
+from bot.exts.filtering._utils import subclasses_in_package
+
+unique_filter_types = subclasses_in_package(dirname(__file__), f"{__name__}.", UniqueFilter)
+unique_filter_types = {filter_.name: filter_ for filter_ in unique_filter_types}
+
+__all__ = [unique_filter_types]
diff --git a/bot/exts/filtering/_filters/unique/everyone.py b/bot/exts/filtering/_filters/unique/everyone.py
new file mode 100644
index 000000000..06d3a19bb
--- /dev/null
+++ b/bot/exts/filtering/_filters/unique/everyone.py
@@ -0,0 +1,28 @@
+import re
+
+from bot.constants import Guild
+from bot.exts.filtering._filter_context import Event, FilterContext
+from bot.exts.filtering._filters.filter import UniqueFilter
+
+EVERYONE_PING_RE = re.compile(rf"@everyone|<@&{Guild.id}>|@here")
+CODE_BLOCK_RE = re.compile(
+ r"(?P<delim>``?)[^`]+?(?P=delim)(?!`+)" # Inline codeblock
+ r"|```(.+?)```", # Multiline codeblock
+ re.DOTALL | re.MULTILINE
+)
+
+
+class EveryoneFilter(UniqueFilter):
+ """Filter messages which contain `@everyone` and `@here` tags outside a codeblock."""
+
+ name = "everyone"
+ events = (Event.MESSAGE, Event.MESSAGE_EDIT)
+
+ def triggered_on(self, ctx: FilterContext) -> bool:
+ """Search for the filter's content within a given context."""
+ # First pass to avoid running re.sub on every message
+ if not EVERYONE_PING_RE.search(ctx.content):
+ return False
+
+ content_without_codeblocks = CODE_BLOCK_RE.sub("", ctx.content)
+ return bool(EVERYONE_PING_RE.search(content_without_codeblocks))
diff --git a/bot/exts/filtering/_utils.py b/bot/exts/filtering/_utils.py
index 7149f7254..a38fa22e4 100644
--- a/bot/exts/filtering/_utils.py
+++ b/bot/exts/filtering/_utils.py
@@ -16,7 +16,7 @@ ZALGO_RE = regex.compile(rf"[\p{{NONSPACING MARK}}\p{{ENCLOSING MARK}}--[{VARIAT
T = TypeVar('T')
-def subclasses_in_package(package: str, prefix: str, parent: type) -> set[type]:
+def subclasses_in_package(package: str, prefix: str, parent: T) -> set[T]:
"""Return all the subclasses of class `parent`, found in the top-level of `package`, given by absolute path."""
subclasses = set()
diff --git a/bot/exts/filtering/filtering.py b/bot/exts/filtering/filtering.py
index 890b25718..837cd45c1 100644
--- a/bot/exts/filtering/filtering.py
+++ b/bot/exts/filtering/filtering.py
@@ -954,8 +954,11 @@ class Filtering(Cog):
}
response = await bot.instance.api_client.post('bot/filter/filters', json=to_serializable(payload))
new_filter = filter_list.add_filter(list_type, response)
- extra_msg = Filtering._identical_filters_message(content, filter_list, list_type, new_filter)
- await msg.reply(f"✅ Added filter: {new_filter}" + extra_msg)
+ if new_filter:
+ extra_msg = Filtering._identical_filters_message(content, filter_list, list_type, new_filter)
+ await msg.reply(f"✅ Added filter: {new_filter}" + extra_msg)
+ else:
+ await msg.reply(":x: Could not create the filter. Are you sure it's implemented?")
@staticmethod
async def _patch_filter(
@@ -990,6 +993,7 @@ class Filtering(Cog):
response = await bot.instance.api_client.patch(
f'bot/filter/filters/{filter_.id}', json=to_serializable(payload)
)
+ # Return type can be None, but if it's being edited then it's not supposed to be.
edited_filter = filter_list.add_filter(list_type, response)
extra_msg = Filtering._identical_filters_message(content, filter_list, list_type, edited_filter)
await msg.reply(f"✅ Edited filter: {edited_filter}" + extra_msg)