aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar Ionite <[email protected]>2023-02-06 21:27:20 -0500
committerGravatar Ionite <[email protected]>2023-02-06 21:27:20 -0500
commitbebd241756cc4ec35839a3abdc20ffdf18b1b463 (patch)
tree142f5478156c06fe9136cdda1c02d97b41aafe1a
parentChange failed files str to truncate on chars only (diff)
Add file extension filtering
-rw-r--r--bot/exts/utils/snekbox/_cog.py76
-rw-r--r--bot/exts/utils/snekbox/_io.py5
2 files changed, 67 insertions, 14 deletions
diff --git a/bot/exts/utils/snekbox/_cog.py b/bot/exts/utils/snekbox/_cog.py
index 5fe20d339..ffa7d4f57 100644
--- a/bot/exts/utils/snekbox/_cog.py
+++ b/bot/exts/utils/snekbox/_cog.py
@@ -6,7 +6,7 @@ import re
from functools import partial
from operator import attrgetter
from textwrap import dedent
-from typing import Literal, Optional, TYPE_CHECKING, Tuple
+from typing import Literal, NamedTuple, Optional, TYPE_CHECKING, Tuple
from discord import AllowedMentions, HTTPException, Interaction, Message, NotFound, Reaction, User, enums, ui
from discord.ext.commands import Cog, Command, Context, Converter, command, guild_only
@@ -14,10 +14,13 @@ from pydis_core.utils import interactions
from pydis_core.utils.regex import FORMATTED_CODE_REGEX, RAW_CODE_REGEX
from bot.bot import Bot
-from bot.constants import Channels, MODERATION_ROLES, Roles, URLs
+from bot.constants import Channels, Filter, MODERATION_ROLES, Roles, URLs
from bot.decorators import redirect_output
+from bot.exts.events.code_jams._channels import CATEGORY_NAME as JAM_CATEGORY_NAME
+from bot.exts.filters.antimalware import TXT_LIKE_FILES
from bot.exts.help_channels._channel import is_help_forum_post
from bot.exts.utils.snekbox._eval import EvalJob, EvalResult
+from bot.exts.utils.snekbox._io import FileAttachment
from bot.log import get_logger
from bot.utils import send_to_paste_service
from bot.utils.lock import LockedResourceError, lock_arg
@@ -84,6 +87,8 @@ REDO_TIMEOUT = 30
PythonVersion = Literal["3.10", "3.11"]
+FilteredFiles = NamedTuple("FilteredFiles", [("allowed", list[FileAttachment]), ("blocked", list[FileAttachment])])
+
class CodeblockConverter(Converter):
"""Attempts to extract code from a codeblock, if provided."""
@@ -269,6 +274,41 @@ class Snekbox(Cog):
return output, paste_link
+ def get_extensions_whitelist(self) -> set[str]:
+ """Return a set of whitelisted file extensions."""
+ return set(self.bot.filter_list_cache['FILE_FORMAT.True'].keys()) | TXT_LIKE_FILES
+
+ def _filter_files(self, ctx: Context, files: list[FileAttachment]) -> FilteredFiles:
+ """Filter to restrict files to allowed extensions. Return a named tuple of allowed and blocked files lists."""
+ # Check if user is staff, if is, return
+ # Since we only care that roles exist to iterate over, check for the attr rather than a User/Member instance
+ if hasattr(ctx.author, "roles") and any(role.id in Filter.role_whitelist for role in ctx.author.roles):
+ return FilteredFiles(files, [])
+ # Ignore code jam channels
+ if getattr(ctx.channel, "category", None) and ctx.channel.category.name == JAM_CATEGORY_NAME:
+ return FilteredFiles(files, [])
+
+ # Get whitelisted extensions
+ whitelist = self.get_extensions_whitelist()
+
+ # Filter files into allowed and blocked
+ blocked = []
+ allowed = []
+ for file in files:
+ if file.suffix in whitelist:
+ allowed.append(file)
+ else:
+ blocked.append(file)
+
+ if blocked:
+ blocked_str = ", ".join(f.suffix for f in blocked)
+ log.info(
+ f"User '{ctx.author}' ({ctx.author.id}) uploaded blacklisted file(s) in eval: {blocked_str}",
+ extra={"attachment_list": [f.path for f in files]}
+ )
+
+ return FilteredFiles(allowed, blocked)
+
@lock_arg("snekbox.send_job", "ctx", attrgetter("author.id"), raise_error=True)
async def send_job(self, ctx: Context, job: EvalJob) -> Message:
"""
@@ -305,20 +345,28 @@ class Snekbox(Cog):
else:
self.bot.stats.incr("snekbox.python.success")
+ allowed_mentions = AllowedMentions(everyone=False, roles=False, users=[ctx.author])
+ view = self.build_python_version_switcher_view(job.version, ctx, job)
+
+ # Filter file extensions
+ allowed, blocked = self._filter_files(ctx, result.files)
+ # Add notice if any files were blocked
+ if blocked:
+ file_s = "file was" if len(blocked) == 1 else "files were"
+ ext_s = "extension" if len(blocked) == 1 else "extensions"
+ msg += (
+ f"\n{len(blocked)} {file_s} not uploaded due to disallowed {ext_s}: "
+ f"**{', '.join(f.suffix for f in blocked)}**"
+ )
+
filter_cog: Filtering | None = self.bot.get_cog("Filtering")
- filter_triggered = False
- if filter_cog:
- filter_triggered = await filter_cog.filter_snekbox_output(msg, ctx.message)
- if filter_triggered:
- response = await ctx.send("Attempt to circumvent filter detected. Moderator team has been alerted.")
- else:
- allowed_mentions = AllowedMentions(everyone=False, roles=False, users=[ctx.author])
- view = self.build_python_version_switcher_view(job.version, ctx, job)
+ if filter_cog and (await filter_cog.filter_snekbox_output(msg, ctx.message)):
+ return await ctx.send("Attempt to circumvent filter detected. Moderator team has been alerted.")
- # Attach files if provided
- files = [f.to_file() for f in result.files]
- response = await ctx.send(msg, allowed_mentions=allowed_mentions, view=view, files=files)
- view.message = response
+ # Attach files if provided
+ files = [f.to_file() for f in allowed]
+ response = await ctx.send(msg, allowed_mentions=allowed_mentions, view=view, files=files)
+ view.message = response
log.info(f"{ctx.author}'s {job.name} job had a return code of {result.returncode}")
return response
diff --git a/bot/exts/utils/snekbox/_io.py b/bot/exts/utils/snekbox/_io.py
index faa7d3bb3..ce645dbca 100644
--- a/bot/exts/utils/snekbox/_io.py
+++ b/bot/exts/utils/snekbox/_io.py
@@ -61,6 +61,11 @@ class FileAttachment:
content = f"{self.content[:10]}..." if len(self.content) > 10 else self.content
return f"FileAttachment(path={self.path!r}, content={content})"
+ @property
+ def suffix(self) -> str:
+ """Return the file suffix."""
+ return Path(self.path).suffix
+
@classmethod
def from_dict(cls, data: dict, size_limit: int = FILE_SIZE_LIMIT) -> FileAttachment:
"""Create a FileAttachment from a dict response."""