diff options
| author | 2022-12-14 17:30:33 +0800 | |
|---|---|---|
| committer | 2022-12-14 17:30:33 +0800 | |
| commit | 727a146f2de0f37c43d6939dc4368ef780373cd4 (patch) | |
| tree | 846e96788a3cf09d91c576725e0575267d46c9a2 | |
| parent | Merge branch 'main' into snekbox-files (diff) | |
Refactor to move snekbox to module
Diffstat (limited to '')
| -rw-r--r-- | bot/exts/utils/snekbox/__init__.py | 12 | ||||
| -rw-r--r-- | bot/exts/utils/snekbox/_cog.py (renamed from bot/exts/utils/snekbox.py) | 112 | ||||
| -rw-r--r-- | bot/exts/utils/snekbox/_eval.py | 117 | ||||
| -rw-r--r-- | bot/exts/utils/snekbox/_io.py (renamed from bot/exts/utils/snekio.py) | 0 | ||||
| -rw-r--r-- | tests/bot/exts/utils/test_snekbox.py | 40 | 
5 files changed, 152 insertions, 129 deletions
diff --git a/bot/exts/utils/snekbox/__init__.py b/bot/exts/utils/snekbox/__init__.py new file mode 100644 index 000000000..cd1d3b059 --- /dev/null +++ b/bot/exts/utils/snekbox/__init__.py @@ -0,0 +1,12 @@ +from bot.bot import Bot +from bot.exts.utils.snekbox._cog import CodeblockConverter, Snekbox +from bot.exts.utils.snekbox._eval import EvalJob, EvalResult + +__all__ = ("CodeblockConverter", "Snekbox", "EvalJob", "EvalResult") + + +async def setup(bot: Bot) -> None: +    """Load the Snekbox cog.""" +    # Defer import to reduce side effects from importing the codeblock package. +    from bot.exts.utils.snekbox._cog import Snekbox +    await bot.add_cog(Snekbox(bot)) diff --git a/bot/exts/utils/snekbox.py b/bot/exts/utils/snekbox/_cog.py index b89882a65..9abbbcfc4 100644 --- a/bot/exts/utils/snekbox.py +++ b/bot/exts/utils/snekbox/_cog.py @@ -3,10 +3,8 @@ from __future__ import annotations  import asyncio  import contextlib  import re -from dataclasses import dataclass, field  from functools import partial  from operator import attrgetter -from signal import Signals  from textwrap import dedent  from typing import Literal, Optional, TYPE_CHECKING, Tuple @@ -19,7 +17,7 @@ from bot.bot import Bot  from bot.constants import Channels, MODERATION_ROLES, Roles, URLs  from bot.decorators import redirect_output  from bot.exts.help_channels._channel import is_help_forum_post -from bot.exts.utils.snekio import FILE_SIZE_LIMIT, FileAttachment, sizeof_fmt +from bot.exts.utils.snekbox._eval import EvalJob, EvalResult  from bot.log import get_logger  from bot.utils import send_to_paste_service  from bot.utils.lock import LockedResourceError, lock_arg @@ -81,115 +79,12 @@ NO_SNEKBOX_CHANNELS = (Channels.python_general,)  NO_SNEKBOX_CATEGORIES = ()  SNEKBOX_ROLES = (Roles.helpers, Roles.moderators, Roles.admins, Roles.owners, Roles.python_community, Roles.partners) -SIGKILL = 9 -  REDO_EMOJI = '\U0001f501'  # :repeat:  REDO_TIMEOUT = 30  PythonVersion = Literal["3.10", "3.11"] -@dataclass -class EvalJob: -    """Job to be evaluated by snekbox.""" - -    args: list[str] -    files: list[FileAttachment] = field(default_factory=list) -    name: str = "eval" -    version: PythonVersion = "3.11" - -    @classmethod -    def from_code(cls, code: str, path: str = "main.py") -> EvalJob: -        """Create an EvalJob from a code string.""" -        return cls( -            args=[path], -            files=[FileAttachment(path, code.encode())], -        ) - -    def as_version(self, version: PythonVersion) -> EvalJob: -        """Return a copy of the job with a different Python version.""" -        return EvalJob( -            args=self.args, -            files=self.files, -            name=self.name, -            version=version, -        ) - -    def to_dict(self) -> dict[str, list[str | dict[str, str]]]: -        """Convert the job to a dict.""" -        return { -            "args": self.args, -            "files": [file.to_dict() for file in self.files], -        } - - -@dataclass(frozen=True) -class EvalResult: -    """The result of an eval job.""" - -    stdout: str -    returncode: int | None -    files: list[FileAttachment] = field(default_factory=list) -    err_files: list[str] = field(default_factory=list) - -    @property -    def status_emoji(self) -> str: -        """Return an emoji corresponding to the status code or lack of output in result.""" -        # If there are attachments, skip empty output warning -        if not self.stdout.strip() and not self.files:  # No output -            return ":warning:" -        elif self.returncode == 0:  # No error -            return ":white_check_mark:" -        else:  # Exception -            return ":x:" - -    def get_message(self, job: EvalJob) -> tuple[str, str]: -        """Return a user-friendly message and error corresponding to the process's return code.""" -        msg = f"Your {job.version} {job.name} job has completed with return code {self.returncode}" -        error = "" - -        if self.returncode is None: -            msg = f"Your {job.version} {job.name} job has failed" -            error = self.stdout.strip() -        elif self.returncode == 128 + SIGKILL: -            msg = f"Your {job.version} {job.name} job timed out or ran out of memory" -        elif self.returncode == 255: -            msg = f"Your {job.version} {job.name} job has failed" -            error = "A fatal NsJail error occurred" -        else: -            # Try to append signal's name if one exists -            with contextlib.suppress(ValueError): -                name = Signals(self.returncode - 128).name -                msg = f"{msg} ({name})" - -        # Add error message for failed attachments -        if self.err_files: -            failed_files = f"({', '.join(self.err_files)})" -            msg += ( -                f".\n\n> Some attached files were not able to be uploaded {failed_files}." -                f" Check that the file size is less than {sizeof_fmt(FILE_SIZE_LIMIT)}" -            ) - -        return msg, error - -    @classmethod -    def from_dict(cls, data: dict[str, str | int | list[dict[str, str]]]) -> EvalResult: -        """Create an EvalResult from a dict.""" -        res = cls( -            stdout=data["stdout"], -            returncode=data["returncode"], -        ) - -        for file in data.get("files", []): -            try: -                res.files.append(FileAttachment.from_dict(file)) -            except ValueError as e: -                log.info(f"Failed to parse file from snekbox response: {e}") -                res.err_files.append(file["path"]) - -        return res - -  class CodeblockConverter(Converter):      """Attempts to extract code from a codeblock, if provided.""" @@ -622,8 +517,3 @@ def predicate_message_edit(ctx: Context, old_msg: Message, new_msg: Message) ->  def predicate_emoji_reaction(ctx: Context, reaction: Reaction, user: User) -> bool:      """Return True if the reaction REDO_EMOJI was added by the context message author on this message."""      return reaction.message.id == ctx.message.id and user.id == ctx.author.id and str(reaction) == REDO_EMOJI - - -async def setup(bot: Bot) -> None: -    """Load the Snekbox cog.""" -    await bot.add_cog(Snekbox(bot)) diff --git a/bot/exts/utils/snekbox/_eval.py b/bot/exts/utils/snekbox/_eval.py new file mode 100644 index 000000000..784de5a10 --- /dev/null +++ b/bot/exts/utils/snekbox/_eval.py @@ -0,0 +1,117 @@ +from __future__ import annotations + +import contextlib +from dataclasses import dataclass, field +from signal import Signals +from typing import TYPE_CHECKING + +from bot.exts.utils.snekbox._io import FILE_SIZE_LIMIT, FileAttachment, sizeof_fmt +from bot.log import get_logger + +if TYPE_CHECKING: +    from bot.exts.utils.snekbox._cog import PythonVersion + +log = get_logger(__name__) + +SIGKILL = 9 + + +@dataclass +class EvalJob: +    """Job to be evaluated by snekbox.""" + +    args: list[str] +    files: list[FileAttachment] = field(default_factory=list) +    name: str = "eval" +    version: PythonVersion = "3.11" + +    @classmethod +    def from_code(cls, code: str, path: str = "main.py") -> EvalJob: +        """Create an EvalJob from a code string.""" +        return cls( +            args=[path], +            files=[FileAttachment(path, code.encode())], +        ) + +    def as_version(self, version: PythonVersion) -> EvalJob: +        """Return a copy of the job with a different Python version.""" +        return EvalJob( +            args=self.args, +            files=self.files, +            name=self.name, +            version=version, +        ) + +    def to_dict(self) -> dict[str, list[str | dict[str, str]]]: +        """Convert the job to a dict.""" +        return { +            "args": self.args, +            "files": [file.to_dict() for file in self.files], +        } + + +@dataclass(frozen=True) +class EvalResult: +    """The result of an eval job.""" + +    stdout: str +    returncode: int | None +    files: list[FileAttachment] = field(default_factory=list) +    err_files: list[str] = field(default_factory=list) + +    @property +    def status_emoji(self) -> str: +        """Return an emoji corresponding to the status code or lack of output in result.""" +        # If there are attachments, skip empty output warning +        if not self.stdout.strip() and not self.files:  # No output +            return ":warning:" +        elif self.returncode == 0:  # No error +            return ":white_check_mark:" +        else:  # Exception +            return ":x:" + +    def get_message(self, job: EvalJob) -> tuple[str, str]: +        """Return a user-friendly message and error corresponding to the process's return code.""" +        msg = f"Your {job.version} {job.name} job has completed with return code {self.returncode}" +        error = "" + +        if self.returncode is None: +            msg = f"Your {job.version} {job.name} job has failed" +            error = self.stdout.strip() +        elif self.returncode == 128 + SIGKILL: +            msg = f"Your {job.version} {job.name} job timed out or ran out of memory" +        elif self.returncode == 255: +            msg = f"Your {job.version} {job.name} job has failed" +            error = "A fatal NsJail error occurred" +        else: +            # Try to append signal's name if one exists +            with contextlib.suppress(ValueError): +                name = Signals(self.returncode - 128).name +                msg = f"{msg} ({name})" + +        # Add error message for failed attachments +        if self.err_files: +            failed_files = f"({', '.join(self.err_files)})" +            msg += ( +                f".\n\n> Some attached files were not able to be uploaded {failed_files}." +                f" Check that the file size is less than {sizeof_fmt(FILE_SIZE_LIMIT)}" +            ) + +        return msg, error + +    @classmethod +    def from_dict(cls, data: dict[str, str | int | list[dict[str, str]]]) -> EvalResult: +        """Create an EvalResult from a dict.""" +        res = cls( +            stdout=data["stdout"], +            returncode=data["returncode"], +        ) + +        for file in data.get("files", []): +            try: +                res.files.append(FileAttachment.from_dict(file)) +            except ValueError as e: +                log.info(f"Failed to parse file from snekbox response: {e}") +                res.err_files.append(file["path"]) + +        return res diff --git a/bot/exts/utils/snekio.py b/bot/exts/utils/snekbox/_io.py index a7f84a241..a7f84a241 100644 --- a/bot/exts/utils/snekio.py +++ b/bot/exts/utils/snekbox/_io.py diff --git a/tests/bot/exts/utils/test_snekbox.py b/tests/bot/exts/utils/test_snekbox.py index 722c5c569..31b1ca260 100644 --- a/tests/bot/exts/utils/test_snekbox.py +++ b/tests/bot/exts/utils/test_snekbox.py @@ -55,14 +55,18 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase):      async def test_upload_output_reject_too_long(self):          """Reject output longer than MAX_PASTE_LENGTH.""" -        result = await self.cog.upload_output("-" * (snekbox.MAX_PASTE_LENGTH + 1)) +        result = await self.cog.upload_output("-" * (snekbox._cog.MAX_PASTE_LENGTH + 1))          self.assertEqual(result, "too long to upload") -    @patch("bot.exts.utils.snekbox.send_to_paste_service") +    @patch("bot.exts.utils.snekbox._cog.send_to_paste_service")      async def test_upload_output(self, mock_paste_util):          """Upload the eval output to the URLs.paste_service.format(key="documents") endpoint."""          await self.cog.upload_output("Test output.") -        mock_paste_util.assert_called_once_with("Test output.", extension="txt", max_length=snekbox.MAX_PASTE_LENGTH) +        mock_paste_util.assert_called_once_with( +            "Test output.", +            extension="txt", +            max_length=snekbox._cog.MAX_PASTE_LENGTH +        )      async def test_codeblock_converter(self):          ctx = MockContext() @@ -95,7 +99,7 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase):          )          for case, setup_code, test_name in cases: -            setup = snekbox.TIMEIT_SETUP_WRAPPER.format(setup=setup_code) +            setup = snekbox._cog.TIMEIT_SETUP_WRAPPER.format(setup=setup_code)              expected = [*base_args, setup, '\n'.join(case[1:] if setup_code else case)]              with self.subTest(msg=f'Test with {test_name} and expected return {expected}'):                  self.assertEqual(self.cog.prepare_timeit_input(case), expected) @@ -104,7 +108,7 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase):          """EvalResult.message, should return error and message."""          cases = (              ('ERROR', None, ('Your 3.11 eval job has failed', 'ERROR')), -            ('', 128 + snekbox.SIGKILL, ('Your 3.11 eval job timed out or ran out of memory', '')), +            ('', 128 + snekbox._eval.SIGKILL, ('Your 3.11 eval job timed out or ran out of memory', '')),              ('', 255, ('Your 3.11 eval job has failed', 'A fatal NsJail error occurred'))          )          for stdout, returncode, expected in cases: @@ -113,7 +117,7 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase):                  job = EvalJob([])                  self.assertEqual(result.get_message(job), expected) -    @patch('bot.exts.utils.snekbox.Signals', side_effect=ValueError) +    @patch('bot.exts.utils.snekbox._eval.Signals', side_effect=ValueError)      def test_eval_result_message_invalid_signal(self, _mock_signals: Mock):          result = EvalResult(stdout="", returncode=127)          self.assertEqual( @@ -121,7 +125,7 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase):              ("Your 3.10 eval job has completed with return code 127", "")          ) -    @patch('bot.exts.utils.snekbox.Signals') +    @patch('bot.exts.utils.snekbox._eval.Signals')      def test_eval_result_message_valid_signal(self, mock_signals: Mock):          mock_signals.return_value.name = "SIGTEST"          result = EvalResult(stdout="", returncode=127) @@ -328,7 +332,7 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase):          self.cog.post_job.assert_called_once_with(job)          self.cog.upload_output.assert_not_called() -    @patch("bot.exts.utils.snekbox.partial") +    @patch("bot.exts.utils.snekbox._cog.partial")      async def test_continue_job_does_continue(self, partial_mock):          """Test that the continue_job function does continue if required conditions are met."""          ctx = MockContext( @@ -353,14 +357,14 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase):              (                  call(                      'message_edit', -                    check=partial_mock(snekbox.predicate_message_edit, ctx), -                    timeout=snekbox.REDO_TIMEOUT, +                    check=partial_mock(snekbox._cog.predicate_message_edit, ctx), +                    timeout=snekbox._cog.REDO_TIMEOUT,                  ), -                call('reaction_add', check=partial_mock(snekbox.predicate_emoji_reaction, ctx), timeout=10) +                call('reaction_add', check=partial_mock(snekbox._cog.predicate_emoji_reaction, ctx), timeout=10)              )          ) -        ctx.message.add_reaction.assert_called_once_with(snekbox.REDO_EMOJI) -        ctx.message.clear_reaction.assert_called_once_with(snekbox.REDO_EMOJI) +        ctx.message.add_reaction.assert_called_once_with(snekbox._cog.REDO_EMOJI) +        ctx.message.clear_reaction.assert_called_once_with(snekbox._cog.REDO_EMOJI)          response.delete.assert_called_once()      async def test_continue_job_does_not_continue(self): @@ -369,7 +373,7 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase):          actual = await self.cog.continue_job(ctx, MockMessage(), self.cog.eval_command)          self.assertEqual(actual, None) -        ctx.message.clear_reaction.assert_called_once_with(snekbox.REDO_EMOJI) +        ctx.message.clear_reaction.assert_called_once_with(snekbox._cog.REDO_EMOJI)      async def test_get_code(self):          """Should return 1st arg (or None) if eval cmd in message, otherwise return full content.""" @@ -411,18 +415,18 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase):          for ctx_msg, new_msg, expected, testname in cases:              with self.subTest(msg=f'Messages with {testname} return {expected}'):                  ctx = MockContext(message=ctx_msg) -                actual = snekbox.predicate_message_edit(ctx, ctx_msg, new_msg) +                actual = snekbox._cog.predicate_message_edit(ctx, ctx_msg, new_msg)                  self.assertEqual(actual, expected)      def test_predicate_emoji_reaction(self):          """Test the predicate_emoji_reaction function."""          valid_reaction = MockReaction(message=MockMessage(id=1)) -        valid_reaction.__str__.return_value = snekbox.REDO_EMOJI +        valid_reaction.__str__.return_value = snekbox._cog.REDO_EMOJI          valid_ctx = MockContext(message=MockMessage(id=1), author=MockUser(id=2))          valid_user = MockUser(id=2)          invalid_reaction_id = MockReaction(message=MockMessage(id=42)) -        invalid_reaction_id.__str__.return_value = snekbox.REDO_EMOJI +        invalid_reaction_id.__str__.return_value = snekbox._cog.REDO_EMOJI          invalid_user_id = MockUser(id=42)          invalid_reaction_str = MockReaction(message=MockMessage(id=1))          invalid_reaction_str.__str__.return_value = ':longbeard:' @@ -435,7 +439,7 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase):          )          for reaction, user, expected, testname in cases:              with self.subTest(msg=f'Test with {testname} and expected return {expected}'): -                actual = snekbox.predicate_emoji_reaction(valid_ctx, reaction, user) +                actual = snekbox._cog.predicate_emoji_reaction(valid_ctx, reaction, user)                  self.assertEqual(actual, expected)  |