about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: mbaruh <[email protected]> 2022-10-26 23:39:44 +0300
committer: mbaruh <[email protected]> 2022-10-26 23:39:44 +0300
commit: 08208df37b036cfc76eea0fd276aba61b602951c (patch)
tree: 306d85c2a76469b23a2992a442882965d7652d4a
parent: Add rich embed filter (diff)
Add Discord token filter
Also fix a bug with the cog trying to serialize a set when trying to modify the DB with no UI. Also fix a bug with the domain setting description having a mismatching name.
-rw-r--r-- bot/exts/filtering/_filter_lists/filter_list.py | 7
-rw-r--r-- bot/exts/filtering/_filters/domain.py | 2
-rw-r--r-- bot/exts/filtering/_filters/unique/discord_token.py | 217
-rw-r--r-- bot/exts/filtering/_settings_types/actions/ping.py | 32
-rw-r--r-- bot/exts/filtering/_ui/ui.py | 2
-rw-r--r-- bot/exts/filtering/_utils.py | 30
6 files changed, 255 insertions, 35 deletions
diff --git a/bot/exts/filtering/_filter_lists/filter_list.py b/bot/exts/filtering/_filter_lists/filter_list.py
index 55204335b..50793b085 100644
--- a/bot/exts/filtering/_filter_lists/filter_list.py
+++ b/bot/exts/filtering/_filter_lists/filter_list.py
@@ -202,7 +202,7 @@ class SubscribingAtomicList(AtomicList):
Each unique filter is subscribed to a subset of events to respond to.
"""
- subscriptions: defaultdict[Event, list[Filter]] = dataclasses.field(default_factory=lambda: defaultdict(list))
+ subscriptions: defaultdict[Event, list[int]] = dataclasses.field(default_factory=lambda: defaultdict(list))
def subscribe(self, filter_: UniqueFilter, *events: Event) -> None:
"""
@@ -213,8 +213,9 @@ class SubscribingAtomicList(AtomicList):
"""
for event in events:
if filter_ not in self.subscriptions[event]:
- self.subscriptions[event].append(filter_)
+ self.subscriptions[event].append(filter_.id)
def filter_list_result(self, ctx: FilterContext) -> list[Filter]:
"""Sift through the list of filters, and return only the ones which apply to the given context."""
- return self._create_filter_list_result(ctx, self.defaults, self.subscriptions[ctx.event])
+ event_filters = [self.filters[id_] for id_ in self.subscriptions[ctx.event]]
+ return self._create_filter_list_result(ctx, self.defaults, event_filters)
diff --git a/bot/exts/filtering/_filters/domain.py b/bot/exts/filtering/_filters/domain.py
index 4976198cd..e22cafbb7 100644
--- a/bot/exts/filtering/_filters/domain.py
+++ b/bot/exts/filtering/_filters/domain.py
@@ -15,7 +15,7 @@ URL_RE = re.compile(r"(?:https?://)?(\S+?)[\\/]*", flags=re.IGNORECASE)
class ExtraDomainSettings(BaseModel):
"""Extra settings for how domains should be matched in a message."""
- exact_description: ClassVar[str] = (
+ subdomains_description: ClassVar[str] = (
"A boolean. If True, will will only trigger for subdomains and subpaths, and not for the domain itself."
)
diff --git a/bot/exts/filtering/_filters/unique/discord_token.py b/bot/exts/filtering/_filters/unique/discord_token.py
new file mode 100644
index 000000000..571a8a9b1
--- /dev/null
+++ b/bot/exts/filtering/_filters/unique/discord_token.py
@@ -0,0 +1,217 @@
+import base64
+import re
+from collections.abc import Callable, Coroutine
+from typing import ClassVar, NamedTuple
+
+import discord
+from botcore.utils.logging import get_logger
+from botcore.utils.members import get_or_fetch_member
+from pydantic import BaseModel, Field
+
+import bot
+from bot import constants, utils
+from bot.exts.filtering._filter_context import Event, FilterContext
+from bot.exts.filtering._filters.filter import UniqueFilter
+from bot.exts.filtering._utils import resolve_mention
+from bot.exts.moderation.modlog import ModLog
+from bot.utils.messages import format_user
+
+log = get_logger(__name__)
+
+
+LOG_MESSAGE = (
+ "Censored a seemingly valid token sent by {author} in {channel}. "
+ "Token was: `{user_id}.{timestamp}.{hmac}`."
+)
+UNKNOWN_USER_LOG_MESSAGE = "Decoded user ID: `{user_id}` (Not present in server)."
+KNOWN_USER_LOG_MESSAGE = (
+ "Decoded user ID: `{user_id}` **(Present in server)**.\n"
+ "This matches `{user_name}` and means this is likely a valid **{kind}** token."
+)
+DISCORD_EPOCH = 1_420_070_400
+TOKEN_EPOCH = 1_293_840_000
+
+# Three parts delimited by dots: user ID, creation timestamp, HMAC.
+# The HMAC isn't parsed further, but it's in the regex to ensure it at least exists in the string.
+# Each part only matches base64 URL-safe characters.
+# These regexes were taken from discord-developers, which are used by the client itself.
+TOKEN_RE = re.compile(r"([\w-]{10,})\.([\w-]{5,})\.([\w-]{10,})")
+
+
+class ExtraDiscordTokenSettings(BaseModel):
+ """Extra settings for who should be pinged when a Discord token is detected."""
+
+ pings_for_bot_description: ClassVar[str] = "A sequence. Who should be pinged if the token found belongs to a bot."
+ pings_for_user_description: ClassVar[str] = "A sequence. Who should be pinged if the token found belongs to a user."
+
+ pings_for_bot: set[str] = Field(default_factory=set)
+ pings_for_user: set[str] = Field(default_factory=lambda: {"Moderators"})
+
+
+class Token(NamedTuple):
+ """A Discord Bot token."""
+
+ user_id: str
+ timestamp: str
+ hmac: str
+
+
+class DiscordTokenFilter(UniqueFilter):
+ """Scans messages for potential discord client tokens and removes them."""
+
+ name = "discord_token"
+ events = (Event.MESSAGE, Event.MESSAGE_EDIT)
+ extra_fields_type = ExtraDiscordTokenSettings
+
+ @property
+ def mod_log(self) -> ModLog | None:
+ """Get currently loaded ModLog cog instance."""
+ return bot.instance.get_cog("ModLog")
+
+ def triggered_on(self, ctx: FilterContext) -> bool:
+ """Return whether the message contains Discord client tokens."""
+ found_token = self.find_token_in_message(ctx.content)
+ if not found_token:
+ return False
+
+ if mod_log := self.mod_log:
+ mod_log.ignore(constants.Event.message_delete, ctx.message.id)
+ ctx.content = ctx.content.replace(found_token.hmac, self.censor_hmac(found_token.hmac))
+ ctx.additional_actions.append(self._create_token_alert_embed_wrapper(found_token))
+ return True
+
+ def _create_token_alert_embed_wrapper(self, found_token: Token) -> Callable[[FilterContext], Coroutine]:
+ """Create the action to perform when an alert should be sent for a message containing a Discord token."""
+ async def _create_token_alert_embed(ctx: FilterContext) -> None:
+ """Add an alert embed to the context with info about the token sent."""
+ userid_message, is_user = await self.format_userid_log_message(ctx.message, found_token)
+ log_message = self.format_log_message(ctx.message, found_token)
+ log.debug(log_message)
+
+ if is_user:
+ mentions = map(resolve_mention, self.extra_fields.pings_for_user)
+ color = discord.Colour.red()
+ else:
+ mentions = map(resolve_mention, self.extra_fields.pings_for_bot)
+ color = discord.Colour.blue()
+ unmentioned = [mention for mention in mentions if mention not in ctx.alert_content]
+ if unmentioned:
+ ctx.alert_content = f"{' '.join(unmentioned)} {ctx.alert_content}"
+ ctx.alert_embeds.append(discord.Embed(colour=color, description=userid_message))
+
+ return _create_token_alert_embed
+
+ @classmethod
+ async def format_userid_log_message(cls, msg: discord.Message, token: Token) -> tuple[str, bool]:
+ """
+ Format the portion of the log message that includes details about the detected user ID.
+
+ If the user is resolved to a member, the format includes the user ID, name, and the
+ kind of user detected.
+ If it is resolved to a user or a member, and it is not a bot, also return True.
+ Returns a tuple of (log_message, is_user)
+ """
+ user_id = cls.extract_user_id(token.user_id)
+ user = await get_or_fetch_member(msg.guild, user_id)
+
+ if user:
+ return KNOWN_USER_LOG_MESSAGE.format(
+ user_id=user_id,
+ user_name=str(user),
+ kind="BOT" if user.bot else "USER",
+ ), True
+ else:
+ return UNKNOWN_USER_LOG_MESSAGE.format(user_id=user_id), False
+
+ @staticmethod
+ def censor_hmac(hmac: str) -> str:
+ """Return a censored version of the hmac."""
+ return 'x' * (len(hmac) - 3) + hmac[-3:]
+
+ @classmethod
+ def format_log_message(cls, msg: discord.Message, token: Token) -> str:
+ """Return the generic portion of the log message to send for `token` being censored in `msg`."""
+ return LOG_MESSAGE.format(
+ author=format_user(msg.author),
+ channel=msg.channel.mention,
+ user_id=token.user_id,
+ timestamp=token.timestamp,
+ hmac=cls.censor_hmac(token.hmac),
+ )
+
+ @classmethod
+ def find_token_in_message(cls, content: str) -> Token | None:
+ """Return a seemingly valid token found in `msg` or `None` if no token is found."""
+ # Use finditer rather than search to guard against method calls prematurely returning the
+ # token check (e.g. `message.channel.send` also matches our token pattern)
+ for match in TOKEN_RE.finditer(content):
+ token = Token(*match.groups())
+ if (
+ (cls.extract_user_id(token.user_id) is not None)
+ and cls.is_valid_timestamp(token.timestamp)
+ and cls.is_maybe_valid_hmac(token.hmac)
+ ):
+ # Short-circuit on first match
+ return token
+
+ # No matching substring
+ return None
+
+ @staticmethod
+ def extract_user_id(b64_content: str) -> int | None:
+ """Return a user ID integer from part of a potential token, or None if it couldn't be decoded."""
+ b64_content = utils.pad_base64(b64_content)
+
+ try:
+ decoded_bytes = base64.urlsafe_b64decode(b64_content)
+ string = decoded_bytes.decode('utf-8')
+ if not (string.isascii() and string.isdigit()):
+ # This case triggers if there are fancy unicode digits in the base64 encoding,
+ # that means it's not a valid user id.
+ return None
+ return int(string)
+ except ValueError:
+ return None
+
+ @staticmethod
+ def is_valid_timestamp(b64_content: str) -> bool:
+ """
+ Return True if `b64_content` decodes to a valid timestamp.
+
+ If the timestamp is greater than the Discord epoch, it's probably valid.
+ See: https://i.imgur.com/7WdehGn.png
+ """
+ b64_content = utils.pad_base64(b64_content)
+
+ try:
+ decoded_bytes = base64.urlsafe_b64decode(b64_content)
+ timestamp = int.from_bytes(decoded_bytes, byteorder="big")
+ except ValueError as e:
+ log.debug(f"Failed to decode token timestamp '{b64_content}': {e}")
+ return False
+
+ # Seems like newer tokens don't need the epoch added, but add anyway since an upper bound
+ # is not checked.
+ if timestamp + TOKEN_EPOCH >= DISCORD_EPOCH:
+ return True
+ else:
+ log.debug(f"Invalid token timestamp '{b64_content}': smaller than Discord epoch")
+ return False
+
+ @staticmethod
+ def is_maybe_valid_hmac(b64_content: str) -> bool:
+ """
+ Determine if a given HMAC portion of a token is potentially valid.
+
+ If the HMAC has 3 or fewer characters, it's probably a dummy value like "xxxxxxxxxx",
+ and thus the token can probably be skipped.
+ """
+ unique = len(set(b64_content.lower()))
+ if unique <= 3:
+ log.debug(
+ f"Considering the HMAC {b64_content} a dummy because it has {unique}"
+ " case-insensitively unique characters"
+ )
+ return False
+ else:
+ return True
diff --git a/bot/exts/filtering/_settings_types/actions/ping.py b/bot/exts/filtering/_settings_types/actions/ping.py
index faac8f4b9..5597bdd59 100644
--- a/bot/exts/filtering/_settings_types/actions/ping.py
+++ b/bot/exts/filtering/_settings_types/actions/ping.py
@@ -1,12 +1,10 @@
-from functools import cache
from typing import ClassVar
from pydantic import validator
-import bot
-from bot.constants import Guild
from bot.exts.filtering._filter_context import FilterContext
from bot.exts.filtering._settings_types.settings_entry import ActionEntry
+from bot.exts.filtering._utils import resolve_mention
class Ping(ActionEntry):
@@ -38,7 +36,7 @@ class Ping(ActionEntry):
async def action(self, ctx: FilterContext) -> None:
"""Add the stored pings to the alert message content."""
mentions = self.guild_pings if ctx.channel.guild else self.dm_pings
- new_content = " ".join([self._resolve_mention(mention) for mention in mentions])
+ new_content = " ".join([resolve_mention(mention) for mention in mentions])
ctx.alert_content = f"{new_content} {ctx.alert_content}"
def __or__(self, other: ActionEntry):
@@ -47,29 +45,3 @@ class Ping(ActionEntry):
return NotImplemented
return Ping(guild_pings=self.guild_pings | other.guild_pings, dm_pings=self.dm_pings | other.dm_pings)
-
- @staticmethod
- @cache
- def _resolve_mention(mention: str) -> str:
- """Return the appropriate formatting for the formatting, be it a literal, a user ID, or a role ID."""
- guild = bot.instance.get_guild(Guild.id)
- if mention in ("here", "everyone"):
- return f"@{mention}"
- try:
- mention = int(mention) # It's an ID.
- except ValueError:
- pass
- else:
- if any(mention == role.id for role in guild.roles):
- return f"<@&{mention}>"
- else:
- return f"<@{mention}>"
-
- # It's a name
- for role in guild.roles:
- if role.name == mention:
- return role.mention
- for member in guild.members:
- if str(member) == mention:
- return member.mention
- return mention
diff --git a/bot/exts/filtering/_ui/ui.py b/bot/exts/filtering/_ui/ui.py
index c506db1fe..6a261bc46 100644
--- a/bot/exts/filtering/_ui/ui.py
+++ b/bot/exts/filtering/_ui/ui.py
@@ -74,7 +74,7 @@ def parse_value(value: str, type_: type[T]) -> T:
if hasattr(type_, "__origin__"): # In case this is a types.GenericAlias or a typing._GenericAlias
type_ = type_.__origin__
if type_ in (tuple, list, set):
- return type_(value.split(","))
+ return list(value.split(","))
if type_ is bool:
return value.lower() == "true" or value == "1"
if isinstance(type_, EnumMeta):
diff --git a/bot/exts/filtering/_utils.py b/bot/exts/filtering/_utils.py
index a38fa22e4..d5dfbfc83 100644
--- a/bot/exts/filtering/_utils.py
+++ b/bot/exts/filtering/_utils.py
@@ -4,10 +4,14 @@ import inspect
import pkgutil
from abc import ABC, abstractmethod
from collections import defaultdict
+from functools import cache
from typing import Any, Iterable, TypeVar, Union
import regex
+import bot
+from bot.constants import Guild
+
VARIATION_SELECTORS = r"\uFE00-\uFE0F\U000E0100-\U000E01EF"
INVISIBLE_RE = regex.compile(rf"[{VARIATION_SELECTORS}\p{{UNASSIGNED}}\p{{FORMAT}}\p{{CONTROL}}--\s]", regex.V1)
ZALGO_RE = regex.compile(rf"[\p{{NONSPACING MARK}}\p{{ENCLOSING MARK}}--[{VARIATION_SELECTORS}]]", regex.V1)
@@ -69,6 +73,32 @@ def to_serializable(item: Any) -> Union[bool, int, float, str, list, dict, None]
return str(item)
+@cache
+def resolve_mention(mention: str) -> str:
+ """Return the appropriate formatting for the mention, be it a literal, a user ID, or a role ID."""
+ guild = bot.instance.get_guild(Guild.id)
+ if mention in ("here", "everyone"):
+ return f"@{mention}"
+ try:
+ mention = int(mention) # It's an ID.
+ except ValueError:
+ pass
+ else:
+ if any(mention == role.id for role in guild.roles):
+ return f"<@&{mention}>"
+ else:
+ return f"<@{mention}>"
+
+ # It's a name
+ for role in guild.roles:
+ if role.name == mention:
+ return role.mention
+ for member in guild.members:
+ if str(member) == mention:
+ return member.mention
+ return mention
+
+
def repr_equals(override: Any, default: Any) -> bool:
"""Return whether the override and the default have the same representation."""
if override is None: # It's not an override