diff options
| -rw-r--r-- | bot/exts/utils/snekbox/_cog.py | 76 | ||||
| -rw-r--r-- | bot/exts/utils/snekbox/_io.py | 5 | 
2 files changed, 67 insertions, 14 deletions
| diff --git a/bot/exts/utils/snekbox/_cog.py b/bot/exts/utils/snekbox/_cog.py index 5fe20d339..ffa7d4f57 100644 --- a/bot/exts/utils/snekbox/_cog.py +++ b/bot/exts/utils/snekbox/_cog.py @@ -6,7 +6,7 @@ import re  from functools import partial  from operator import attrgetter  from textwrap import dedent -from typing import Literal, Optional, TYPE_CHECKING, Tuple +from typing import Literal, NamedTuple, Optional, TYPE_CHECKING, Tuple  from discord import AllowedMentions, HTTPException, Interaction, Message, NotFound, Reaction, User, enums, ui  from discord.ext.commands import Cog, Command, Context, Converter, command, guild_only @@ -14,10 +14,13 @@ from pydis_core.utils import interactions  from pydis_core.utils.regex import FORMATTED_CODE_REGEX, RAW_CODE_REGEX  from bot.bot import Bot -from bot.constants import Channels, MODERATION_ROLES, Roles, URLs +from bot.constants import Channels, Filter, MODERATION_ROLES, Roles, URLs  from bot.decorators import redirect_output +from bot.exts.events.code_jams._channels import CATEGORY_NAME as JAM_CATEGORY_NAME +from bot.exts.filters.antimalware import TXT_LIKE_FILES  from bot.exts.help_channels._channel import is_help_forum_post  from bot.exts.utils.snekbox._eval import EvalJob, EvalResult +from bot.exts.utils.snekbox._io import FileAttachment  from bot.log import get_logger  from bot.utils import send_to_paste_service  from bot.utils.lock import LockedResourceError, lock_arg @@ -84,6 +87,8 @@ REDO_TIMEOUT = 30  PythonVersion = Literal["3.10", "3.11"] +FilteredFiles = NamedTuple("FilteredFiles", [("allowed", list[FileAttachment]), ("blocked", list[FileAttachment])]) +  class CodeblockConverter(Converter):      """Attempts to extract code from a codeblock, if provided.""" @@ -269,6 +274,41 @@ class Snekbox(Cog):          return output, paste_link +    def get_extensions_whitelist(self) -> set[str]: +        """Return a set of whitelisted file extensions.""" +        return set(self.bot.filter_list_cache['FILE_FORMAT.True'].keys()) | TXT_LIKE_FILES + +    def _filter_files(self, ctx: Context, files: list[FileAttachment]) -> FilteredFiles: +        """Filter to restrict files to allowed extensions. Return a named tuple of allowed and blocked files lists.""" +        # Check if user is staff, if is, return +        # Since we only care that roles exist to iterate over, check for the attr rather than a User/Member instance +        if hasattr(ctx.author, "roles") and any(role.id in Filter.role_whitelist for role in ctx.author.roles): +            return FilteredFiles(files, []) +        # Ignore code jam channels +        if getattr(ctx.channel, "category", None) and ctx.channel.category.name == JAM_CATEGORY_NAME: +            return FilteredFiles(files, []) + +        # Get whitelisted extensions +        whitelist = self.get_extensions_whitelist() + +        # Filter files into allowed and blocked +        blocked = [] +        allowed = [] +        for file in files: +            if file.suffix in whitelist: +                allowed.append(file) +            else: +                blocked.append(file) + +        if blocked: +            blocked_str = ", ".join(f.suffix for f in blocked) +            log.info( +                f"User '{ctx.author}' ({ctx.author.id}) uploaded blacklisted file(s) in eval: {blocked_str}", +                extra={"attachment_list": [f.path for f in files]} +            ) + +        return FilteredFiles(allowed, blocked) +      @lock_arg("snekbox.send_job", "ctx", attrgetter("author.id"), raise_error=True)      async def send_job(self, ctx: Context, job: EvalJob) -> Message:          """ @@ -305,20 +345,28 @@ class Snekbox(Cog):              else:                  self.bot.stats.incr("snekbox.python.success") +            allowed_mentions = AllowedMentions(everyone=False, roles=False, users=[ctx.author]) +            view = self.build_python_version_switcher_view(job.version, ctx, job) + +            # Filter file extensions +            allowed, blocked = self._filter_files(ctx, result.files) +            # Add notice if any files were blocked +            if blocked: +                file_s = "file was" if len(blocked) == 1 else "files were" +                ext_s = "extension" if len(blocked) == 1 else "extensions" +                msg += ( +                    f"\n{len(blocked)} {file_s} not uploaded due to disallowed {ext_s}: " +                    f"**{', '.join(f.suffix for f in blocked)}**" +                ) +              filter_cog: Filtering | None = self.bot.get_cog("Filtering") -            filter_triggered = False -            if filter_cog: -                filter_triggered = await filter_cog.filter_snekbox_output(msg, ctx.message) -            if filter_triggered: -                response = await ctx.send("Attempt to circumvent filter detected. Moderator team has been alerted.") -            else: -                allowed_mentions = AllowedMentions(everyone=False, roles=False, users=[ctx.author]) -                view = self.build_python_version_switcher_view(job.version, ctx, job) +            if filter_cog and (await filter_cog.filter_snekbox_output(msg, ctx.message)): +                return await ctx.send("Attempt to circumvent filter detected. Moderator team has been alerted.") -                # Attach files if provided -                files = [f.to_file() for f in result.files] -                response = await ctx.send(msg, allowed_mentions=allowed_mentions, view=view, files=files) -                view.message = response +            # Attach files if provided +            files = [f.to_file() for f in allowed] +            response = await ctx.send(msg, allowed_mentions=allowed_mentions, view=view, files=files) +            view.message = response              log.info(f"{ctx.author}'s {job.name} job had a return code of {result.returncode}")          return response diff --git a/bot/exts/utils/snekbox/_io.py b/bot/exts/utils/snekbox/_io.py index faa7d3bb3..ce645dbca 100644 --- a/bot/exts/utils/snekbox/_io.py +++ b/bot/exts/utils/snekbox/_io.py @@ -61,6 +61,11 @@ class FileAttachment:          content = f"{self.content[:10]}..." if len(self.content) > 10 else self.content          return f"FileAttachment(path={self.path!r}, content={content})" +    @property +    def suffix(self) -> str: +        """Return the file suffix.""" +        return Path(self.path).suffix +      @classmethod      def from_dict(cls, data: dict, size_limit: int = FILE_SIZE_LIMIT) -> FileAttachment:          """Create a FileAttachment from a dict response.""" | 
