diff options
-rw-r--r-- | bot/exts/utils/snekbox/__init__.py | 12 | ||||
-rw-r--r-- | bot/exts/utils/snekbox/_cog.py (renamed from bot/exts/utils/snekbox.py) | 112 | ||||
-rw-r--r-- | bot/exts/utils/snekbox/_eval.py | 117 | ||||
-rw-r--r-- | bot/exts/utils/snekbox/_io.py (renamed from bot/exts/utils/snekio.py) | 0 | ||||
-rw-r--r-- | tests/bot/exts/utils/test_snekbox.py | 40 |
5 files changed, 152 insertions, 129 deletions
diff --git a/bot/exts/utils/snekbox/__init__.py b/bot/exts/utils/snekbox/__init__.py new file mode 100644 index 000000000..cd1d3b059 --- /dev/null +++ b/bot/exts/utils/snekbox/__init__.py @@ -0,0 +1,12 @@ +from bot.bot import Bot +from bot.exts.utils.snekbox._cog import CodeblockConverter, Snekbox +from bot.exts.utils.snekbox._eval import EvalJob, EvalResult + +__all__ = ("CodeblockConverter", "Snekbox", "EvalJob", "EvalResult") + + +async def setup(bot: Bot) -> None: + """Load the Snekbox cog.""" + # Defer import to reduce side effects from importing the codeblock package. + from bot.exts.utils.snekbox._cog import Snekbox + await bot.add_cog(Snekbox(bot)) diff --git a/bot/exts/utils/snekbox.py b/bot/exts/utils/snekbox/_cog.py index b89882a65..9abbbcfc4 100644 --- a/bot/exts/utils/snekbox.py +++ b/bot/exts/utils/snekbox/_cog.py @@ -3,10 +3,8 @@ from __future__ import annotations import asyncio import contextlib import re -from dataclasses import dataclass, field from functools import partial from operator import attrgetter -from signal import Signals from textwrap import dedent from typing import Literal, Optional, TYPE_CHECKING, Tuple @@ -19,7 +17,7 @@ from bot.bot import Bot from bot.constants import Channels, MODERATION_ROLES, Roles, URLs from bot.decorators import redirect_output from bot.exts.help_channels._channel import is_help_forum_post -from bot.exts.utils.snekio import FILE_SIZE_LIMIT, FileAttachment, sizeof_fmt +from bot.exts.utils.snekbox._eval import EvalJob, EvalResult from bot.log import get_logger from bot.utils import send_to_paste_service from bot.utils.lock import LockedResourceError, lock_arg @@ -81,115 +79,12 @@ NO_SNEKBOX_CHANNELS = (Channels.python_general,) NO_SNEKBOX_CATEGORIES = () SNEKBOX_ROLES = (Roles.helpers, Roles.moderators, Roles.admins, Roles.owners, Roles.python_community, Roles.partners) -SIGKILL = 9 - REDO_EMOJI = '\U0001f501' # :repeat: REDO_TIMEOUT = 30 PythonVersion = Literal["3.10", "3.11"] -@dataclass -class EvalJob: - """Job to be evaluated by snekbox.""" - - args: list[str] - files: list[FileAttachment] = field(default_factory=list) - name: str = "eval" - version: PythonVersion = "3.11" - - @classmethod - def from_code(cls, code: str, path: str = "main.py") -> EvalJob: - """Create an EvalJob from a code string.""" - return cls( - args=[path], - files=[FileAttachment(path, code.encode())], - ) - - def as_version(self, version: PythonVersion) -> EvalJob: - """Return a copy of the job with a different Python version.""" - return EvalJob( - args=self.args, - files=self.files, - name=self.name, - version=version, - ) - - def to_dict(self) -> dict[str, list[str | dict[str, str]]]: - """Convert the job to a dict.""" - return { - "args": self.args, - "files": [file.to_dict() for file in self.files], - } - - -@dataclass(frozen=True) -class EvalResult: - """The result of an eval job.""" - - stdout: str - returncode: int | None - files: list[FileAttachment] = field(default_factory=list) - err_files: list[str] = field(default_factory=list) - - @property - def status_emoji(self) -> str: - """Return an emoji corresponding to the status code or lack of output in result.""" - # If there are attachments, skip empty output warning - if not self.stdout.strip() and not self.files: # No output - return ":warning:" - elif self.returncode == 0: # No error - return ":white_check_mark:" - else: # Exception - return ":x:" - - def get_message(self, job: EvalJob) -> tuple[str, str]: - """Return a user-friendly message and error corresponding to the process's return code.""" - msg = f"Your {job.version} {job.name} job has completed with return code {self.returncode}" - error = "" - - if self.returncode is None: - msg = f"Your {job.version} {job.name} job has failed" - error = self.stdout.strip() - elif self.returncode == 128 + SIGKILL: - msg = f"Your {job.version} {job.name} job timed out or ran out of memory" - elif self.returncode == 255: - msg = f"Your {job.version} {job.name} job has failed" - error = "A fatal NsJail error occurred" - else: - # Try to append signal's name if one exists - with contextlib.suppress(ValueError): - name = Signals(self.returncode - 128).name - msg = f"{msg} ({name})" - - # Add error message for failed attachments - if self.err_files: - failed_files = f"({', '.join(self.err_files)})" - msg += ( - f".\n\n> Some attached files were not able to be uploaded {failed_files}." - f" Check that the file size is less than {sizeof_fmt(FILE_SIZE_LIMIT)}" - ) - - return msg, error - - @classmethod - def from_dict(cls, data: dict[str, str | int | list[dict[str, str]]]) -> EvalResult: - """Create an EvalResult from a dict.""" - res = cls( - stdout=data["stdout"], - returncode=data["returncode"], - ) - - for file in data.get("files", []): - try: - res.files.append(FileAttachment.from_dict(file)) - except ValueError as e: - log.info(f"Failed to parse file from snekbox response: {e}") - res.err_files.append(file["path"]) - - return res - - class CodeblockConverter(Converter): """Attempts to extract code from a codeblock, if provided.""" @@ -622,8 +517,3 @@ def predicate_message_edit(ctx: Context, old_msg: Message, new_msg: Message) -> def predicate_emoji_reaction(ctx: Context, reaction: Reaction, user: User) -> bool: """Return True if the reaction REDO_EMOJI was added by the context message author on this message.""" return reaction.message.id == ctx.message.id and user.id == ctx.author.id and str(reaction) == REDO_EMOJI - - -async def setup(bot: Bot) -> None: - """Load the Snekbox cog.""" - await bot.add_cog(Snekbox(bot)) diff --git a/bot/exts/utils/snekbox/_eval.py b/bot/exts/utils/snekbox/_eval.py new file mode 100644 index 000000000..784de5a10 --- /dev/null +++ b/bot/exts/utils/snekbox/_eval.py @@ -0,0 +1,117 @@ +from __future__ import annotations + +import contextlib +from dataclasses import dataclass, field +from signal import Signals +from typing import TYPE_CHECKING + +from bot.exts.utils.snekbox._io import FILE_SIZE_LIMIT, FileAttachment, sizeof_fmt +from bot.log import get_logger + +if TYPE_CHECKING: + from bot.exts.utils.snekbox._cog import PythonVersion + +log = get_logger(__name__) + +SIGKILL = 9 + + +@dataclass +class EvalJob: + """Job to be evaluated by snekbox.""" + + args: list[str] + files: list[FileAttachment] = field(default_factory=list) + name: str = "eval" + version: PythonVersion = "3.11" + + @classmethod + def from_code(cls, code: str, path: str = "main.py") -> EvalJob: + """Create an EvalJob from a code string.""" + return cls( + args=[path], + files=[FileAttachment(path, code.encode())], + ) + + def as_version(self, version: PythonVersion) -> EvalJob: + """Return a copy of the job with a different Python version.""" + return EvalJob( + args=self.args, + files=self.files, + name=self.name, + version=version, + ) + + def to_dict(self) -> dict[str, list[str | dict[str, str]]]: + """Convert the job to a dict.""" + return { + "args": self.args, + "files": [file.to_dict() for file in self.files], + } + + +@dataclass(frozen=True) +class EvalResult: + """The result of an eval job.""" + + stdout: str + returncode: int | None + files: list[FileAttachment] = field(default_factory=list) + err_files: list[str] = field(default_factory=list) + + @property + def status_emoji(self) -> str: + """Return an emoji corresponding to the status code or lack of output in result.""" + # If there are attachments, skip empty output warning + if not self.stdout.strip() and not self.files: # No output + return ":warning:" + elif self.returncode == 0: # No error + return ":white_check_mark:" + else: # Exception + return ":x:" + + def get_message(self, job: EvalJob) -> tuple[str, str]: + """Return a user-friendly message and error corresponding to the process's return code.""" + msg = f"Your {job.version} {job.name} job has completed with return code {self.returncode}" + error = "" + + if self.returncode is None: + msg = f"Your {job.version} {job.name} job has failed" + error = self.stdout.strip() + elif self.returncode == 128 + SIGKILL: + msg = f"Your {job.version} {job.name} job timed out or ran out of memory" + elif self.returncode == 255: + msg = f"Your {job.version} {job.name} job has failed" + error = "A fatal NsJail error occurred" + else: + # Try to append signal's name if one exists + with contextlib.suppress(ValueError): + name = Signals(self.returncode - 128).name + msg = f"{msg} ({name})" + + # Add error message for failed attachments + if self.err_files: + failed_files = f"({', '.join(self.err_files)})" + msg += ( + f".\n\n> Some attached files were not able to be uploaded {failed_files}." + f" Check that the file size is less than {sizeof_fmt(FILE_SIZE_LIMIT)}" + ) + + return msg, error + + @classmethod + def from_dict(cls, data: dict[str, str | int | list[dict[str, str]]]) -> EvalResult: + """Create an EvalResult from a dict.""" + res = cls( + stdout=data["stdout"], + returncode=data["returncode"], + ) + + for file in data.get("files", []): + try: + res.files.append(FileAttachment.from_dict(file)) + except ValueError as e: + log.info(f"Failed to parse file from snekbox response: {e}") + res.err_files.append(file["path"]) + + return res diff --git a/bot/exts/utils/snekio.py b/bot/exts/utils/snekbox/_io.py index a7f84a241..a7f84a241 100644 --- a/bot/exts/utils/snekio.py +++ b/bot/exts/utils/snekbox/_io.py diff --git a/tests/bot/exts/utils/test_snekbox.py b/tests/bot/exts/utils/test_snekbox.py index 722c5c569..31b1ca260 100644 --- a/tests/bot/exts/utils/test_snekbox.py +++ b/tests/bot/exts/utils/test_snekbox.py @@ -55,14 +55,18 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase): async def test_upload_output_reject_too_long(self): """Reject output longer than MAX_PASTE_LENGTH.""" - result = await self.cog.upload_output("-" * (snekbox.MAX_PASTE_LENGTH + 1)) + result = await self.cog.upload_output("-" * (snekbox._cog.MAX_PASTE_LENGTH + 1)) self.assertEqual(result, "too long to upload") - @patch("bot.exts.utils.snekbox.send_to_paste_service") + @patch("bot.exts.utils.snekbox._cog.send_to_paste_service") async def test_upload_output(self, mock_paste_util): """Upload the eval output to the URLs.paste_service.format(key="documents") endpoint.""" await self.cog.upload_output("Test output.") - mock_paste_util.assert_called_once_with("Test output.", extension="txt", max_length=snekbox.MAX_PASTE_LENGTH) + mock_paste_util.assert_called_once_with( + "Test output.", + extension="txt", + max_length=snekbox._cog.MAX_PASTE_LENGTH + ) async def test_codeblock_converter(self): ctx = MockContext() @@ -95,7 +99,7 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase): ) for case, setup_code, test_name in cases: - setup = snekbox.TIMEIT_SETUP_WRAPPER.format(setup=setup_code) + setup = snekbox._cog.TIMEIT_SETUP_WRAPPER.format(setup=setup_code) expected = [*base_args, setup, '\n'.join(case[1:] if setup_code else case)] with self.subTest(msg=f'Test with {test_name} and expected return {expected}'): self.assertEqual(self.cog.prepare_timeit_input(case), expected) @@ -104,7 +108,7 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase): """EvalResult.message, should return error and message.""" cases = ( ('ERROR', None, ('Your 3.11 eval job has failed', 'ERROR')), - ('', 128 + snekbox.SIGKILL, ('Your 3.11 eval job timed out or ran out of memory', '')), + ('', 128 + snekbox._eval.SIGKILL, ('Your 3.11 eval job timed out or ran out of memory', '')), ('', 255, ('Your 3.11 eval job has failed', 'A fatal NsJail error occurred')) ) for stdout, returncode, expected in cases: @@ -113,7 +117,7 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase): job = EvalJob([]) self.assertEqual(result.get_message(job), expected) - @patch('bot.exts.utils.snekbox.Signals', side_effect=ValueError) + @patch('bot.exts.utils.snekbox._eval.Signals', side_effect=ValueError) def test_eval_result_message_invalid_signal(self, _mock_signals: Mock): result = EvalResult(stdout="", returncode=127) self.assertEqual( @@ -121,7 +125,7 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase): ("Your 3.10 eval job has completed with return code 127", "") ) - @patch('bot.exts.utils.snekbox.Signals') + @patch('bot.exts.utils.snekbox._eval.Signals') def test_eval_result_message_valid_signal(self, mock_signals: Mock): mock_signals.return_value.name = "SIGTEST" result = EvalResult(stdout="", returncode=127) @@ -328,7 +332,7 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase): self.cog.post_job.assert_called_once_with(job) self.cog.upload_output.assert_not_called() - @patch("bot.exts.utils.snekbox.partial") + @patch("bot.exts.utils.snekbox._cog.partial") async def test_continue_job_does_continue(self, partial_mock): """Test that the continue_job function does continue if required conditions are met.""" ctx = MockContext( @@ -353,14 +357,14 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase): ( call( 'message_edit', - check=partial_mock(snekbox.predicate_message_edit, ctx), - timeout=snekbox.REDO_TIMEOUT, + check=partial_mock(snekbox._cog.predicate_message_edit, ctx), + timeout=snekbox._cog.REDO_TIMEOUT, ), - call('reaction_add', check=partial_mock(snekbox.predicate_emoji_reaction, ctx), timeout=10) + call('reaction_add', check=partial_mock(snekbox._cog.predicate_emoji_reaction, ctx), timeout=10) ) ) - ctx.message.add_reaction.assert_called_once_with(snekbox.REDO_EMOJI) - ctx.message.clear_reaction.assert_called_once_with(snekbox.REDO_EMOJI) + ctx.message.add_reaction.assert_called_once_with(snekbox._cog.REDO_EMOJI) + ctx.message.clear_reaction.assert_called_once_with(snekbox._cog.REDO_EMOJI) response.delete.assert_called_once() async def test_continue_job_does_not_continue(self): @@ -369,7 +373,7 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase): actual = await self.cog.continue_job(ctx, MockMessage(), self.cog.eval_command) self.assertEqual(actual, None) - ctx.message.clear_reaction.assert_called_once_with(snekbox.REDO_EMOJI) + ctx.message.clear_reaction.assert_called_once_with(snekbox._cog.REDO_EMOJI) async def test_get_code(self): """Should return 1st arg (or None) if eval cmd in message, otherwise return full content.""" @@ -411,18 +415,18 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase): for ctx_msg, new_msg, expected, testname in cases: with self.subTest(msg=f'Messages with {testname} return {expected}'): ctx = MockContext(message=ctx_msg) - actual = snekbox.predicate_message_edit(ctx, ctx_msg, new_msg) + actual = snekbox._cog.predicate_message_edit(ctx, ctx_msg, new_msg) self.assertEqual(actual, expected) def test_predicate_emoji_reaction(self): """Test the predicate_emoji_reaction function.""" valid_reaction = MockReaction(message=MockMessage(id=1)) - valid_reaction.__str__.return_value = snekbox.REDO_EMOJI + valid_reaction.__str__.return_value = snekbox._cog.REDO_EMOJI valid_ctx = MockContext(message=MockMessage(id=1), author=MockUser(id=2)) valid_user = MockUser(id=2) invalid_reaction_id = MockReaction(message=MockMessage(id=42)) - invalid_reaction_id.__str__.return_value = snekbox.REDO_EMOJI + invalid_reaction_id.__str__.return_value = snekbox._cog.REDO_EMOJI invalid_user_id = MockUser(id=42) invalid_reaction_str = MockReaction(message=MockMessage(id=1)) invalid_reaction_str.__str__.return_value = ':longbeard:' @@ -435,7 +439,7 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase): ) for reaction, user, expected, testname in cases: with self.subTest(msg=f'Test with {testname} and expected return {expected}'): - actual = snekbox.predicate_emoji_reaction(valid_ctx, reaction, user) + actual = snekbox._cog.predicate_emoji_reaction(valid_ctx, reaction, user) self.assertEqual(actual, expected) |