diff options
| -rw-r--r-- | bot/exts/filters/filtering.py | 60 | 
1 files changed, 44 insertions, 16 deletions
| diff --git a/bot/exts/filters/filtering.py b/bot/exts/filters/filtering.py index 3527bf8bb..4093ba4ad 100644 --- a/bot/exts/filters/filtering.py +++ b/bot/exts/filters/filtering.py @@ -2,7 +2,7 @@ import asyncio  import logging  import re  from datetime import datetime, timedelta -from typing import Any, Dict, List, Mapping, NamedTuple, Optional, Union +from typing import Any, Dict, List, Mapping, NamedTuple, Optional, Tuple, Union  import dateutil  import discord.errors @@ -137,6 +137,10 @@ class Filtering(Cog):          """Fetch items from the filter_list_cache."""          return self.bot.filter_list_cache[f"{list_type.upper()}.{allowed}"].keys() +    def _get_filterlist_value(self, list_type: str, value: Any, *, allowed: bool) -> dict: +        """Fetch one specific value from filter_list_cache.""" +        return self.bot.filter_list_cache[f"{list_type.upper()}.{allowed}"][value] +      @staticmethod      def _expand_spoilers(text: str) -> str:          """Return a string containing all interpretations of a spoilered message.""" @@ -236,7 +240,7 @@ class Filtering(Cog):                  # We also do not need to worry about filters that take the full message,                  # since all we have is an arbitrary string.                  if _filter["enabled"] and _filter["content_only"]: -                    match = await _filter["function"](result) +                    match, reason = await _filter["function"](result)                      if match:                          # If this is a filter (not a watchlist), we set the variable so we know @@ -245,7 +249,7 @@ class Filtering(Cog):                              filter_triggered = True                          stats = self._add_stats(filter_name, match, result) -                        await self._send_log(filter_name, _filter, msg, stats, is_eval=True) +                        await self._send_log(filter_name, _filter, msg, stats, reason, is_eval=True)                          break  # We don't want multiple filters to trigger @@ -267,9 +271,17 @@ class Filtering(Cog):                      # Does the filter only need the message content or the full message?                      if _filter["content_only"]: -                        match = await _filter["function"](msg.content) +                        payload = msg.content +                    else: +                        payload = msg + +                    result = await _filter["function"](payload) +                    reason = None + +                    if isinstance(result, tuple): +                        match, reason = result                      else: -                        match = await _filter["function"](msg) +                        match = result                      if match:                          is_private = msg.channel.type is discord.ChannelType.private @@ -316,7 +328,7 @@ class Filtering(Cog):                                  log.trace(f"Offensive message {msg.id} will be deleted on {delete_date}")                          stats = self._add_stats(filter_name, match, msg.content) -                        await self._send_log(filter_name, _filter, msg, stats) +                        await self._send_log(filter_name, _filter, msg, stats, reason)                          break  # We don't want multiple filters to trigger @@ -326,6 +338,7 @@ class Filtering(Cog):          _filter: Dict[str, Any],          msg: discord.Message,          stats: Stats, +        reason: Optional[str] = None,          *,          is_eval: bool = False,      ) -> None: @@ -339,6 +352,7 @@ class Filtering(Cog):              ping_everyone = Filter.ping_everyone and _filter.get("ping_everyone", True)          eval_msg = "using !eval " if is_eval else "" +        footer = f"Reason: {reason}" if reason else None          message = (              f"The {filter_name} {_filter['type']} was triggered by {format_user(msg.author)} "              f"{channel_str} {eval_msg}with [the following message]({msg.jump_url}):\n\n" @@ -357,6 +371,7 @@ class Filtering(Cog):              channel_id=Channels.mod_alerts,              ping_everyone=ping_everyone,              additional_embeds=stats.additional_embeds, +            footer=footer,          )      def _add_stats(self, name: str, match: FilterMatch, content: str) -> Stats: @@ -381,13 +396,14 @@ class Filtering(Cog):          if name == "filter_invites" and match is not True:              additional_embeds = []              for _, data in match.items(): +                reason = f"Reason: {data['reason']} | " if data.get('reason') else ""                  embed = discord.Embed(description=(                      f"**Members:**\n{data['members']}\n"                      f"**Active:**\n{data['active']}"                  ))                  embed.set_author(name=data["name"])                  embed.set_thumbnail(url=data["icon"]) -                embed.set_footer(text=f"Guild ID: {data['id']}") +                embed.set_footer(text=f"{reason}Guild ID: {data['id']}")                  additional_embeds.append(embed)          elif name == "watch_rich_embeds": @@ -411,39 +427,46 @@ class Filtering(Cog):              and not msg.author.bot                          # Author not a bot          ) -    async def _has_watch_regex_match(self, text: str) -> Union[bool, re.Match]: +    async def _has_watch_regex_match(self, text: str) -> Tuple[Union[bool, re.Match], Optional[str]]:          """          Return True if `text` matches any regex from `word_watchlist` or `token_watchlist` configs.          `word_watchlist`'s patterns are placed between word boundaries while `token_watchlist` is          matched as-is. Spoilers are expanded, if any, and URLs are ignored. +        Second return value is a reason written to database about blacklist entry (can be None).          """          if SPOILER_RE.search(text):              text = self._expand_spoilers(text)          # Make sure it's not a URL          if URL_RE.search(text): -            return False +            return False, None          watchlist_patterns = self._get_filterlist_items('filter_token', allowed=False)          for pattern in watchlist_patterns:              match = re.search(pattern, text, flags=re.IGNORECASE)              if match: -                return match +                return match, self._get_filterlist_value('filter_token', pattern, allowed=False)['comment'] + +        return False, None -    async def _has_urls(self, text: str) -> bool: -        """Returns True if the text contains one of the blacklisted URLs from the config file.""" +    async def _has_urls(self, text: str) -> Tuple[bool, Optional[str]]: +        """ +        Returns True if the text contains one of the blacklisted URLs from the config file. + +        Second return value is a reason of URL blacklisting (can be None). +        """          if not URL_RE.search(text): -            return False +            return False, None          text = text.lower()          domain_blacklist = self._get_filterlist_items("domain_name", allowed=False)          for url in domain_blacklist:              if url.lower() in text: -                return True +                return True, self._get_filterlist_value("domain_name", url, allowed=False)["comment"] -        return False +        return False, None      @staticmethod      async def _has_zalgo(text: str) -> bool: @@ -500,6 +523,10 @@ class Filtering(Cog):              )              if invite_not_allowed: +                reason = None +                if guild_id in guild_invite_blacklist: +                    reason = self._get_filterlist_value("guild_invite", guild_id, allowed=False)["comment"] +                  guild_icon_hash = guild["icon"]                  guild_icon = (                      "https://cdn.discordapp.com/icons/" @@ -511,7 +538,8 @@ class Filtering(Cog):                      "id": guild['id'],                      "icon": guild_icon,                      "members": response["approximate_member_count"], -                    "active": response["approximate_presence_count"] +                    "active": response["approximate_presence_count"], +                    "reason": reason                  }          return invite_data if invite_data else False | 
