diff options
| author | 2022-10-26 23:39:44 +0300 | |
|---|---|---|
| committer | 2022-10-26 23:39:44 +0300 | |
| commit | 08208df37b036cfc76eea0fd276aba61b602951c (patch) | |
| tree | 306d85c2a76469b23a2992a442882965d7652d4a | |
| parent | Add rich embed filter (diff) | |
Add Discord token filter
Also fix a bug where the cog tried to serialize a set when modifying the DB without the UI.
Also fix a bug where the domain setting description had a mismatched name.
| -rw-r--r-- | bot/exts/filtering/_filter_lists/filter_list.py | 7 | ||||
| -rw-r--r-- | bot/exts/filtering/_filters/domain.py | 2 | ||||
| -rw-r--r-- | bot/exts/filtering/_filters/unique/discord_token.py | 217 | ||||
| -rw-r--r-- | bot/exts/filtering/_settings_types/actions/ping.py | 32 | ||||
| -rw-r--r-- | bot/exts/filtering/_ui/ui.py | 2 | ||||
| -rw-r--r-- | bot/exts/filtering/_utils.py | 30 |
6 files changed, 255 insertions, 35 deletions
diff --git a/bot/exts/filtering/_filter_lists/filter_list.py b/bot/exts/filtering/_filter_lists/filter_list.py index 55204335b..50793b085 100644 --- a/bot/exts/filtering/_filter_lists/filter_list.py +++ b/bot/exts/filtering/_filter_lists/filter_list.py @@ -202,7 +202,7 @@ class SubscribingAtomicList(AtomicList): Each unique filter is subscribed to a subset of events to respond to. """ - subscriptions: defaultdict[Event, list[Filter]] = dataclasses.field(default_factory=lambda: defaultdict(list)) + subscriptions: defaultdict[Event, list[int]] = dataclasses.field(default_factory=lambda: defaultdict(list)) def subscribe(self, filter_: UniqueFilter, *events: Event) -> None: """ @@ -213,8 +213,9 @@ class SubscribingAtomicList(AtomicList): """ for event in events: if filter_ not in self.subscriptions[event]: - self.subscriptions[event].append(filter_) + self.subscriptions[event].append(filter_.id) def filter_list_result(self, ctx: FilterContext) -> list[Filter]: """Sift through the list of filters, and return only the ones which apply to the given context.""" - return self._create_filter_list_result(ctx, self.defaults, self.subscriptions[ctx.event]) + event_filters = [self.filters[id_] for id_ in self.subscriptions[ctx.event]] + return self._create_filter_list_result(ctx, self.defaults, event_filters) diff --git a/bot/exts/filtering/_filters/domain.py b/bot/exts/filtering/_filters/domain.py index 4976198cd..e22cafbb7 100644 --- a/bot/exts/filtering/_filters/domain.py +++ b/bot/exts/filtering/_filters/domain.py @@ -15,7 +15,7 @@ URL_RE = re.compile(r"(?:https?://)?(\S+?)[\\/]*", flags=re.IGNORECASE) class ExtraDomainSettings(BaseModel): """Extra settings for how domains should be matched in a message.""" - exact_description: ClassVar[str] = ( + subdomains_description: ClassVar[str] = ( "A boolean. If True, will will only trigger for subdomains and subpaths, and not for the domain itself." 
) diff --git a/bot/exts/filtering/_filters/unique/discord_token.py b/bot/exts/filtering/_filters/unique/discord_token.py new file mode 100644 index 000000000..571a8a9b1 --- /dev/null +++ b/bot/exts/filtering/_filters/unique/discord_token.py @@ -0,0 +1,217 @@ +import base64 +import re +from collections.abc import Callable, Coroutine +from typing import ClassVar, NamedTuple + +import discord +from botcore.utils.logging import get_logger +from botcore.utils.members import get_or_fetch_member +from pydantic import BaseModel, Field + +import bot +from bot import constants, utils +from bot.exts.filtering._filter_context import Event, FilterContext +from bot.exts.filtering._filters.filter import UniqueFilter +from bot.exts.filtering._utils import resolve_mention +from bot.exts.moderation.modlog import ModLog +from bot.utils.messages import format_user + +log = get_logger(__name__) + + +LOG_MESSAGE = ( + "Censored a seemingly valid token sent by {author} in {channel}. " + "Token was: `{user_id}.{timestamp}.{hmac}`." +) +UNKNOWN_USER_LOG_MESSAGE = "Decoded user ID: `{user_id}` (Not present in server)." +KNOWN_USER_LOG_MESSAGE = ( + "Decoded user ID: `{user_id}` **(Present in server)**.\n" + "This matches `{user_name}` and means this is likely a valid **{kind}** token." +) +DISCORD_EPOCH = 1_420_070_400 +TOKEN_EPOCH = 1_293_840_000 + +# Three parts delimited by dots: user ID, creation timestamp, HMAC. +# The HMAC isn't parsed further, but it's in the regex to ensure it at least exists in the string. +# Each part only matches base64 URL-safe characters. +# These regexes were taken from discord-developers, which are used by the client itself. +TOKEN_RE = re.compile(r"([\w-]{10,})\.([\w-]{5,})\.([\w-]{10,})") + + +class ExtraDiscordTokenSettings(BaseModel): + """Extra settings for who should be pinged when a Discord token is detected.""" + + pings_for_bot_description: ClassVar[str] = "A sequence. Who should be pinged if the token found belongs to a bot." 
+ pings_for_user_description: ClassVar[str] = "A sequence. Who should be pinged if the token found belongs to a user." + + pings_for_bot: set[str] = Field(default_factory=set) + pings_for_user: set[str] = Field(default_factory=lambda: {"Moderators"}) + + +class Token(NamedTuple): + """A Discord Bot token.""" + + user_id: str + timestamp: str + hmac: str + + +class DiscordTokenFilter(UniqueFilter): + """Scans messages for potential discord client tokens and removes them.""" + + name = "discord_token" + events = (Event.MESSAGE, Event.MESSAGE_EDIT) + extra_fields_type = ExtraDiscordTokenSettings + + @property + def mod_log(self) -> ModLog | None: + """Get currently loaded ModLog cog instance.""" + return bot.instance.get_cog("ModLog") + + def triggered_on(self, ctx: FilterContext) -> bool: + """Return whether the message contains Discord client tokens.""" + found_token = self.find_token_in_message(ctx.content) + if not found_token: + return False + + if mod_log := self.mod_log: + mod_log.ignore(constants.Event.message_delete, ctx.message.id) + ctx.content = ctx.content.replace(found_token.hmac, self.censor_hmac(found_token.hmac)) + ctx.additional_actions.append(self._create_token_alert_embed_wrapper(found_token)) + return True + + def _create_token_alert_embed_wrapper(self, found_token: Token) -> Callable[[FilterContext], Coroutine]: + """Create the action to perform when an alert should be sent for a message containing a Discord token.""" + async def _create_token_alert_embed(ctx: FilterContext) -> None: + """Add an alert embed to the context with info about the token sent.""" + userid_message, is_user = await self.format_userid_log_message(ctx.message, found_token) + log_message = self.format_log_message(ctx.message, found_token) + log.debug(log_message) + + if is_user: + mentions = map(resolve_mention, self.extra_fields.pings_for_user) + color = discord.Colour.red() + else: + mentions = map(resolve_mention, self.extra_fields.pings_for_bot) + color = 
discord.Colour.blue() + unmentioned = [mention for mention in mentions if mention not in ctx.alert_content] + if unmentioned: + ctx.alert_content = f"{' '.join(unmentioned)} {ctx.alert_content}" + ctx.alert_embeds.append(discord.Embed(colour=color, description=userid_message)) + + return _create_token_alert_embed + + @classmethod + async def format_userid_log_message(cls, msg: discord.Message, token: Token) -> tuple[str, bool]: + """ + Format the portion of the log message that includes details about the detected user ID. + + If the user is resolved to a member, the format includes the user ID, name, and the + kind of user detected. + If it is resolved to a user or a member, and it is not a bot, also return True. + Returns a tuple of (log_message, is_user) + """ + user_id = cls.extract_user_id(token.user_id) + user = await get_or_fetch_member(msg.guild, user_id) + + if user: + return KNOWN_USER_LOG_MESSAGE.format( + user_id=user_id, + user_name=str(user), + kind="BOT" if user.bot else "USER", + ), True + else: + return UNKNOWN_USER_LOG_MESSAGE.format(user_id=user_id), False + + @staticmethod + def censor_hmac(hmac: str) -> str: + """Return a censored version of the hmac.""" + return 'x' * (len(hmac) - 3) + hmac[-3:] + + @classmethod + def format_log_message(cls, msg: discord.Message, token: Token) -> str: + """Return the generic portion of the log message to send for `token` being censored in `msg`.""" + return LOG_MESSAGE.format( + author=format_user(msg.author), + channel=msg.channel.mention, + user_id=token.user_id, + timestamp=token.timestamp, + hmac=cls.censor_hmac(token.hmac), + ) + + @classmethod + def find_token_in_message(cls, content: str) -> Token | None: + """Return a seemingly valid token found in `msg` or `None` if no token is found.""" + # Use finditer rather than search to guard against method calls prematurely returning the + # token check (e.g. 
`message.channel.send` also matches our token pattern) + for match in TOKEN_RE.finditer(content): + token = Token(*match.groups()) + if ( + (cls.extract_user_id(token.user_id) is not None) + and cls.is_valid_timestamp(token.timestamp) + and cls.is_maybe_valid_hmac(token.hmac) + ): + # Short-circuit on first match + return token + + # No matching substring + return None + + @staticmethod + def extract_user_id(b64_content: str) -> int | None: + """Return a user ID integer from part of a potential token, or None if it couldn't be decoded.""" + b64_content = utils.pad_base64(b64_content) + + try: + decoded_bytes = base64.urlsafe_b64decode(b64_content) + string = decoded_bytes.decode('utf-8') + if not (string.isascii() and string.isdigit()): + # This case triggers if there are fancy unicode digits in the base64 encoding, + # that means it's not a valid user id. + return None + return int(string) + except ValueError: + return None + + @staticmethod + def is_valid_timestamp(b64_content: str) -> bool: + """ + Return True if `b64_content` decodes to a valid timestamp. + + If the timestamp is greater than the Discord epoch, it's probably valid. + See: https://i.imgur.com/7WdehGn.png + """ + b64_content = utils.pad_base64(b64_content) + + try: + decoded_bytes = base64.urlsafe_b64decode(b64_content) + timestamp = int.from_bytes(decoded_bytes, byteorder="big") + except ValueError as e: + log.debug(f"Failed to decode token timestamp '{b64_content}': {e}") + return False + + # Seems like newer tokens don't need the epoch added, but add anyway since an upper bound + # is not checked. + if timestamp + TOKEN_EPOCH >= DISCORD_EPOCH: + return True + else: + log.debug(f"Invalid token timestamp '{b64_content}': smaller than Discord epoch") + return False + + @staticmethod + def is_maybe_valid_hmac(b64_content: str) -> bool: + """ + Determine if a given HMAC portion of a token is potentially valid. 
+ + If the HMAC has 3 or fewer characters, it's probably a dummy value like "xxxxxxxxxx", + and thus the token can probably be skipped. + """ + unique = len(set(b64_content.lower())) + if unique <= 3: + log.debug( + f"Considering the HMAC {b64_content} a dummy because it has {unique}" + " case-insensitively unique characters" + ) + return False + else: + return True diff --git a/bot/exts/filtering/_settings_types/actions/ping.py b/bot/exts/filtering/_settings_types/actions/ping.py index faac8f4b9..5597bdd59 100644 --- a/bot/exts/filtering/_settings_types/actions/ping.py +++ b/bot/exts/filtering/_settings_types/actions/ping.py @@ -1,12 +1,10 @@ -from functools import cache from typing import ClassVar from pydantic import validator -import bot -from bot.constants import Guild from bot.exts.filtering._filter_context import FilterContext from bot.exts.filtering._settings_types.settings_entry import ActionEntry +from bot.exts.filtering._utils import resolve_mention class Ping(ActionEntry): @@ -38,7 +36,7 @@ class Ping(ActionEntry): async def action(self, ctx: FilterContext) -> None: """Add the stored pings to the alert message content.""" mentions = self.guild_pings if ctx.channel.guild else self.dm_pings - new_content = " ".join([self._resolve_mention(mention) for mention in mentions]) + new_content = " ".join([resolve_mention(mention) for mention in mentions]) ctx.alert_content = f"{new_content} {ctx.alert_content}" def __or__(self, other: ActionEntry): @@ -47,29 +45,3 @@ class Ping(ActionEntry): return NotImplemented return Ping(guild_pings=self.guild_pings | other.guild_pings, dm_pings=self.dm_pings | other.dm_pings) - - @staticmethod - @cache - def _resolve_mention(mention: str) -> str: - """Return the appropriate formatting for the formatting, be it a literal, a user ID, or a role ID.""" - guild = bot.instance.get_guild(Guild.id) - if mention in ("here", "everyone"): - return f"@{mention}" - try: - mention = int(mention) # It's an ID. 
- except ValueError: - pass - else: - if any(mention == role.id for role in guild.roles): - return f"<@&{mention}>" - else: - return f"<@{mention}>" - - # It's a name - for role in guild.roles: - if role.name == mention: - return role.mention - for member in guild.members: - if str(member) == mention: - return member.mention - return mention diff --git a/bot/exts/filtering/_ui/ui.py b/bot/exts/filtering/_ui/ui.py index c506db1fe..6a261bc46 100644 --- a/bot/exts/filtering/_ui/ui.py +++ b/bot/exts/filtering/_ui/ui.py @@ -74,7 +74,7 @@ def parse_value(value: str, type_: type[T]) -> T: if hasattr(type_, "__origin__"): # In case this is a types.GenericAlias or a typing._GenericAlias type_ = type_.__origin__ if type_ in (tuple, list, set): - return type_(value.split(",")) + return list(value.split(",")) if type_ is bool: return value.lower() == "true" or value == "1" if isinstance(type_, EnumMeta): diff --git a/bot/exts/filtering/_utils.py b/bot/exts/filtering/_utils.py index a38fa22e4..d5dfbfc83 100644 --- a/bot/exts/filtering/_utils.py +++ b/bot/exts/filtering/_utils.py @@ -4,10 +4,14 @@ import inspect import pkgutil from abc import ABC, abstractmethod from collections import defaultdict +from functools import cache from typing import Any, Iterable, TypeVar, Union import regex +import bot +from bot.constants import Guild + VARIATION_SELECTORS = r"\uFE00-\uFE0F\U000E0100-\U000E01EF" INVISIBLE_RE = regex.compile(rf"[{VARIATION_SELECTORS}\p{{UNASSIGNED}}\p{{FORMAT}}\p{{CONTROL}}--\s]", regex.V1) ZALGO_RE = regex.compile(rf"[\p{{NONSPACING MARK}}\p{{ENCLOSING MARK}}--[{VARIATION_SELECTORS}]]", regex.V1) @@ -69,6 +73,32 @@ def to_serializable(item: Any) -> Union[bool, int, float, str, list, dict, None] return str(item) +@cache +def resolve_mention(mention: str) -> str: + """Return the appropriate formatting for the mention, be it a literal, a user ID, or a role ID.""" + guild = bot.instance.get_guild(Guild.id) + if mention in ("here", "everyone"): + return f"@{mention}" + 
try: + mention = int(mention) # It's an ID. + except ValueError: + pass + else: + if any(mention == role.id for role in guild.roles): + return f"<@&{mention}>" + else: + return f"<@{mention}>" + + # It's a name + for role in guild.roles: + if role.name == mention: + return role.mention + for member in guild.members: + if str(member) == mention: + return member.mention + return mention + + def repr_equals(override: Any, default: Any) -> bool: """Return whether the override and the default have the same representation.""" if override is None: # It's not an override |