aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar mbaruh <[email protected]>2022-10-23 01:11:59 +0300
committerGravatar mbaruh <[email protected]>2022-10-23 01:11:59 +0300
commite86fe0b9bde9504a9cbc468bd0200ffe1233a47c (patch)
tree0d886e50f2db6db6dd1ab338f2b441f94eb06c22
parentAdd everyone filter, fix invite filtering (diff)
Add webhook filter
This adds a way to add custom coroutines to the context, to execute when the filtering result is actioned.
-rw-r--r--bot/exts/filtering/_filter_context.py2
-rw-r--r--bot/exts/filtering/_filters/unique/webhook.py58
-rw-r--r--bot/exts/filtering/_settings.py11
-rw-r--r--bot/exts/filtering/_settings_types/actions/delete_messages.py8
-rw-r--r--bot/exts/filtering/_settings_types/actions/infraction_and_notification.py2
5 files changed, 76 insertions, 5 deletions
diff --git a/bot/exts/filtering/_filter_context.py b/bot/exts/filtering/_filter_context.py
index 5e2f5b45b..da7ba0c77 100644
--- a/bot/exts/filtering/_filter_context.py
+++ b/bot/exts/filtering/_filter_context.py
@@ -1,5 +1,6 @@
from __future__ import annotations
+from collections.abc import Coroutine
from dataclasses import dataclass, field, replace
from enum import Enum, auto
from typing import Optional, Union
@@ -34,6 +35,7 @@ class FilterContext:
action_descriptions: list = field(default_factory=list) # What actions were taken
matches: list = field(default_factory=list) # What exactly was found
notification_domain: str = field(default_factory=str) # A domain to send the user for context
+ additional_actions: list[Coroutine] = field(default_factory=list) # Additional actions to perform
def replace(self, **changes) -> FilterContext:
"""Return a new context object assigning new values to the specified fields."""
diff --git a/bot/exts/filtering/_filters/unique/webhook.py b/bot/exts/filtering/_filters/unique/webhook.py
new file mode 100644
index 000000000..10f27a922
--- /dev/null
+++ b/bot/exts/filtering/_filters/unique/webhook.py
@@ -0,0 +1,58 @@
+import re
+
+from botcore.utils.logging import get_logger
+
+import bot
+from bot import constants
+from bot.exts.filtering._filter_context import Event, FilterContext
+from bot.exts.filtering._filters.filter import UniqueFilter
+from bot.exts.moderation.modlog import ModLog
+
+log = get_logger(__name__)
+
+
+WEBHOOK_URL_RE = re.compile(
+ r"((?:https?://)?(?:ptb\.|canary\.)?discord(?:app)?\.com/api/webhooks/\d+/)\S+/?",
+ re.IGNORECASE
+)
+
+
+class WebhookFilter(UniqueFilter):
+ """Scan messages to detect Discord webhooks links."""
+
+ name = "webhook"
+ events = (Event.MESSAGE, Event.MESSAGE_EDIT)
+
+ @property
+ def mod_log(self) -> ModLog | None:
+ """Get current instance of `ModLog`."""
+ return bot.instance.get_cog("ModLog")
+
+ def triggered_on(self, ctx: FilterContext) -> bool:
+ """Search for a webhook in the given content. If found, attempt to delete it."""
+ matches = set(WEBHOOK_URL_RE.finditer(ctx.content))
+ if not matches:
+ return False
+
+ # Don't log this.
+ if mod_log := self.mod_log:
+ mod_log.ignore(constants.Event.message_delete, ctx.message.id)
+
+ for i, match in enumerate(matches, start=1):
+ extra = "" if len(matches) == 1 else f" ({i})"
+ # Queue the webhook for deletion.
+ ctx.additional_actions.append(self._delete_webhook(ctx, match[0], extra))
+ # Don't show the full webhook in places such as the mod alert.
+ ctx.content = ctx.content.replace(match[0], match[1] + "xxx")
+
+ return True
+
+ @staticmethod
+ async def _delete_webhook(ctx: FilterContext, webhook_url: str, extra_message: str) -> None:
+ """Delete the given webhook and update the filter context."""
+ async with bot.instance.http_session.delete(webhook_url) as resp:
+ # The Discord API Returns a 204 NO CONTENT response on success.
+ if resp.status == 204:
+ ctx.action_descriptions.append("webhook deleted" + extra_message)
+ else:
+ ctx.action_descriptions.append("failed to delete webhook" + extra_message)
diff --git a/bot/exts/filtering/_settings.py b/bot/exts/filtering/_settings.py
index 066c7a369..d53334c1c 100644
--- a/bot/exts/filtering/_settings.py
+++ b/bot/exts/filtering/_settings.py
@@ -193,10 +193,19 @@ class ActionSettings(Settings[ActionEntry]):
return result
async def action(self, ctx: FilterContext) -> None:
- """Execute the action of every action entry stored."""
+ """Execute the action of every action entry stored, as well as any additional actions in the context."""
for entry in self.values():
await entry.action(ctx)
+ _i = len(ctx.additional_actions)
+ try:
+ for _i, action in enumerate(ctx.additional_actions):
+ await action
+ except Exception:
+ for action in ctx.additional_actions[_i+1:]:
+ action.close()
+ raise
+
def fallback_to(self, fallback: ActionSettings) -> ActionSettings:
"""Fill in missing entries from `fallback`."""
new_actions = self.copy()
diff --git a/bot/exts/filtering/_settings_types/actions/delete_messages.py b/bot/exts/filtering/_settings_types/actions/delete_messages.py
index d1ddf8241..1770c29ec 100644
--- a/bot/exts/filtering/_settings_types/actions/delete_messages.py
+++ b/bot/exts/filtering/_settings_types/actions/delete_messages.py
@@ -1,4 +1,3 @@
-from contextlib import suppress
from typing import ClassVar
from discord.errors import NotFound
@@ -25,9 +24,12 @@ class DeleteMessages(ActionEntry):
if not ctx.message.guild:
return
- with suppress(NotFound):
+ try:
await ctx.message.delete()
- ctx.action_descriptions.append("deleted")
+ except NotFound:
+ ctx.action_descriptions.append("failed to delete")
+ else:
+ ctx.action_descriptions.append("deleted")
def __or__(self, other: ActionEntry):
"""Combines two actions of the same type. Each type of action is executed once per filter."""
diff --git a/bot/exts/filtering/_settings_types/actions/infraction_and_notification.py b/bot/exts/filtering/_settings_types/actions/infraction_and_notification.py
index 7835a7d0b..922101d6d 100644
--- a/bot/exts/filtering/_settings_types/actions/infraction_and_notification.py
+++ b/bot/exts/filtering/_settings_types/actions/infraction_and_notification.py
@@ -149,7 +149,7 @@ class InfractionAndNotification(ActionEntry):
await ctx.author.send(dm_content, embed=dm_embed)
ctx.action_descriptions.append("notified")
except Forbidden:
- ctx.action_descriptions.append("notified (failed)")
+ ctx.action_descriptions.append("failed to notify")
if self.infraction_type is not None:
await self.infraction_type.invoke(