diff options
| -rw-r--r-- | bot/cogs/snekbox.py | 112 | ||||
| -rw-r--r-- | tests/bot/cogs/test_snekbox.py | 150 | 
2 files changed, 148 insertions, 114 deletions
| diff --git a/bot/cogs/snekbox.py b/bot/cogs/snekbox.py index efa4696b5..25b2455e8 100644 --- a/bot/cogs/snekbox.py +++ b/bot/cogs/snekbox.py @@ -179,6 +179,68 @@ class Snekbox(Cog):          return output, paste_link +    async def send_eval(self, ctx: Context, code: str) -> Message: +        """ +        Evaluate code, format it, and send the output to the corresponding channel. + +        Return the bot response. +        """ +        async with ctx.typing(): +            results = await self.post_eval(code) +            msg, error = self.get_results_message(results) + +            if error: +                output, paste_link = error, None +            else: +                output, paste_link = await self.format_output(results["stdout"]) + +            icon = self.get_status_emoji(results) +            msg = f"{ctx.author.mention} {icon} {msg}.\n\n```py\n{output}\n```" +            if paste_link: +                msg = f"{msg}\nFull output: {paste_link}" + +            response = await ctx.send(msg) +            self.bot.loop.create_task( +                wait_for_deletion(response, user_ids=(ctx.author.id,), client=ctx.bot) +            ) + +            log.info(f"{ctx.author}'s job had a return code of {results['returncode']}") +        return response + +    async def continue_eval(self, ctx: Context, response: Message) -> Tuple[bool, Optional[str]]: +        """ +        Check if the eval session should continue. + +        First item of the returned tuple is if the eval session should continue, +        the second is the new code to evaluate. +        """ +        _predicate_eval_message_edit = partial(predicate_eval_message_edit, ctx) +        _predicate_emoji_reaction = partial(predicate_eval_emoji_reaction, ctx) + +        try: +            _, new_message = await self.bot.wait_for( +                'message_edit', +                check=_predicate_eval_message_edit, +                timeout=10 +            ) +            await ctx.message.add_reaction('🔁') +            await self.bot.wait_for( +                'reaction_add', +                check=_predicate_emoji_reaction, +                timeout=10 +            ) + +            code = new_message.content.split(' ', maxsplit=1)[1] +            await ctx.message.clear_reactions() +            with contextlib.suppress(HTTPException): +                await response.delete() + +        except asyncio.TimeoutError: +            await ctx.message.clear_reactions() +            return False, None + +        return True, code +      @command(name="eval", aliases=("e",))      @guild_only()      @in_channel(Channels.bot, hidden_channels=(Channels.esoteric,), bypass_roles=EVAL_ROLES) @@ -203,58 +265,18 @@ class Snekbox(Cog):          log.info(f"Received code from {ctx.author} for evaluation:\n{code}") -        _predicate_eval_message_edit = partial(predicate_eval_message_edit, ctx) -        _predicate_emoji_reaction = partial(predicate_eval_emoji_reaction, ctx) -          while True:              self.jobs[ctx.author.id] = datetime.datetime.now()              code = self.prepare_input(code) -              try: -                async with ctx.typing(): -                    results = await self.post_eval(code) -                    msg, error = self.get_results_message(results) - -                    if error: -                        output, paste_link = error, None -                    else: -                        output, paste_link = await self.format_output(results["stdout"]) - -                    icon = self.get_status_emoji(results) -                    msg = f"{ctx.author.mention} {icon} {msg}.\n\n```py\n{output}\n```" -                    if paste_link: -                        msg = f"{msg}\nFull output: {paste_link}" - -                    response = await ctx.send(msg) -                    self.bot.loop.create_task( -                        wait_for_deletion(response, user_ids=(ctx.author.id,), client=ctx.bot) -                    ) - -                    log.info(f"{ctx.author}'s job had a return code of {results['returncode']}") +                response = await self.send_eval(ctx, code)              finally:                  del self.jobs[ctx.author.id] -            try: -                _, new_message = await self.bot.wait_for( -                    'message_edit', -                    check=_predicate_eval_message_edit, -                    timeout=10 -                ) -                await ctx.message.add_reaction('🔁') -                await self.bot.wait_for( -                    'reaction_add', -                    check=_predicate_emoji_reaction, -                    timeout=10 -                ) - -                log.info(f"Re-evaluating message {ctx.message.id}") -                code = new_message.content.split(' ', maxsplit=1)[1] -                await ctx.message.clear_reactions() -                with contextlib.suppress(HTTPException): -                    await response.delete() -            except asyncio.TimeoutError: -                await ctx.message.clear_reactions() -                return +            continue_eval, code = await self.continue_eval(ctx, response) +            if not continue_eval: +                break +            log.info(f"Re-evaluating message {ctx.message.id}")  def predicate_eval_message_edit(ctx: Context, old_msg: Message, new_msg: Message) -> bool: diff --git a/tests/bot/cogs/test_snekbox.py b/tests/bot/cogs/test_snekbox.py index 112c923c8..c1c0f8d47 100644 --- a/tests/bot/cogs/test_snekbox.py +++ b/tests/bot/cogs/test_snekbox.py @@ -1,6 +1,7 @@  import asyncio  import logging  import unittest +from functools import partial  from unittest.mock import MagicMock, Mock, call, patch  from bot.cogs import snekbox @@ -175,28 +176,33 @@ class SnekboxTests(unittest.TestCase):      async def test_eval_command_evaluate_once(self):          """Test the eval command procedure."""          ctx = MockContext() -        ctx.message = MockMessage() -        ctx.send = AsyncMock() -        ctx.author.mention = '@LemonLemonishBeard#0042' -        ctx.typing = MagicMock(return_value=AsyncContextManagerMock(None)) -        self.cog.post_eval = AsyncMock(return_value={'stdout': '', 'returncode': 0}) -        self.cog.get_results_message = MagicMock(return_value=('Return code 0', '')) -        self.cog.get_status_emoji = MagicMock(return_value=':yay!:') -        self.cog.format_output = AsyncMock(return_value=('[No output]', None)) -        self.bot.wait_for.side_effect = asyncio.TimeoutError +        response = MockMessage() +        self.cog.prepare_input = MagicMock(return_value='MyAwesomeFormattedCode') +        self.cog.send_eval = AsyncMock(return_value=response) +        self.cog.continue_eval = AsyncMock(return_value=(False, None))          await self.cog.eval_command.callback(self.cog, ctx=ctx, code='MyAwesomeCode') +        self.cog.prepare_input.assert_called_once_with('MyAwesomeCode') +        self.cog.send_eval.assert_called_once_with(ctx, 'MyAwesomeFormattedCode') +        self.cog.continue_eval.assert_called_once_with(ctx, response) -        ctx.send.assert_called_once_with( -            '@LemonLemonishBeard#0042 :yay!: Return code 0.\n\n```py\n[No output]\n```' -        ) -        self.cog.post_eval.assert_called_once_with('MyAwesomeCode') -        self.cog.get_status_emoji.assert_called_once_with({'stdout': '', 'returncode': 0}) -        self.cog.get_results_message.assert_called_once_with({'stdout': '', 'returncode': 0}) -        self.cog.format_output.assert_called_once_with('') +    @async_test +    async def test_eval_command_evaluate_twice(self): +        """Test the eval and re-eval command procedure.""" +        ctx = MockContext() +        response = MockMessage() +        self.cog.prepare_input = MagicMock(return_value='MyAwesomeFormattedCode') +        self.cog.send_eval = AsyncMock(return_value=response) +        self.cog.continue_eval = AsyncMock() +        self.cog.continue_eval.side_effect = ((True, 'MyAwesomeCode-2'), (False, None)) + +        await self.cog.eval_command.callback(self.cog, ctx=ctx, code='MyAwesomeCode') +        self.cog.prepare_input.has_calls(call('MyAwesomeCode'), call('MyAwesomeCode-2')) +        self.cog.send_eval.assert_called_with(ctx, 'MyAwesomeFormattedCode') +        self.cog.continue_eval.assert_called_with(ctx, response)      @async_test -    async def test_eval_command_reject_two_eval(self): +    async def test_eval_command_reject_two_eval_at_the_same_time(self):          """Test if the eval command rejects an eval if the author already have a running eval."""          ctx = MockContext()          ctx.author.id = 42 @@ -217,92 +223,98 @@ class SnekboxTests(unittest.TestCase):          ctx.invoke.assert_called_once_with(self.bot.get_command("help"), "eval")      @async_test -    async def test_eval_command_return_error(self): -        """Test the eval command error handling.""" +    async def test_send_eval(self): +        """Test the send_eval function."""          ctx = MockContext()          ctx.message = MockMessage()          ctx.send = AsyncMock()          ctx.author.mention = '@LemonLemonishBeard#0042'          ctx.typing = MagicMock(return_value=AsyncContextManagerMock(None)) -        self.cog.post_eval = AsyncMock(return_value={'stdout': 'ERROR', 'returncode': 127}) -        self.cog.get_results_message = MagicMock(return_value=('Return code 127', 'Error occurred')) -        self.cog.get_status_emoji = MagicMock(return_value=':nope!:') -        self.cog.format_output = AsyncMock() -        self.bot.wait_for.side_effect = asyncio.TimeoutError - -        await self.cog.eval_command.callback(self.cog, ctx=ctx, code='MyAwesomeCode') +        self.cog.post_eval = AsyncMock(return_value={'stdout': '', 'returncode': 0}) +        self.cog.get_results_message = MagicMock(return_value=('Return code 0', '')) +        self.cog.get_status_emoji = MagicMock(return_value=':yay!:') +        self.cog.format_output = AsyncMock(return_value=('[No output]', None)) +        await self.cog.send_eval(ctx, 'MyAwesomeCode')          ctx.send.assert_called_once_with( -            '@LemonLemonishBeard#0042 :nope!: Return code 127.\n\n```py\nError occurred\n```' +            '@LemonLemonishBeard#0042 :yay!: Return code 0.\n\n```py\n[No output]\n```'          )          self.cog.post_eval.assert_called_once_with('MyAwesomeCode') -        self.cog.get_results_message.assert_called_once_with({'stdout': 'ERROR', 'returncode': 127}) -        self.cog.get_status_emoji.assert_called_once_with({'stdout': 'ERROR', 'returncode': 127}) -        self.cog.format_output.assert_not_called() +        self.cog.get_status_emoji.assert_called_once_with({'stdout': '', 'returncode': 0}) +        self.cog.get_results_message.assert_called_once_with({'stdout': '', 'returncode': 0}) +        self.cog.format_output.assert_called_once_with('')      @async_test -    async def test_eval_command_with_paste_link(self): -        """Test the eval command procedure with the use of a paste link.""" +    async def test_send_eval_with_paste_link(self): +        """Test the send_eval function with a too long output that generate a paste link."""          ctx = MockContext()          ctx.message = MockMessage()          ctx.send = AsyncMock()          ctx.author.mention = '@LemonLemonishBeard#0042'          ctx.typing = MagicMock(return_value=AsyncContextManagerMock(None)) -        self.cog.post_eval = AsyncMock(return_value={'stdout': 'SuperLongBeard', 'returncode': 0}) +        self.cog.post_eval = AsyncMock(return_value={'stdout': 'Way too long beard', 'returncode': 0})          self.cog.get_results_message = MagicMock(return_value=('Return code 0', ''))          self.cog.get_status_emoji = MagicMock(return_value=':yay!:') -        self.cog.format_output = AsyncMock(return_value=('Truncated - too long beard', 'https://testificate.com/')) -        self.bot.wait_for.side_effect = asyncio.TimeoutError - -        await self.cog.eval_command.callback(self.cog, ctx=ctx, code='MyAwesomeCode') +        self.cog.format_output = AsyncMock(return_value=('Way too long beard', 'lookatmybeard.com')) +        await self.cog.send_eval(ctx, 'MyAwesomeCode')          ctx.send.assert_called_once_with( -            '@LemonLemonishBeard#0042 :yay!: Return code 0.\n\n```py\n' -            'Truncated - too long beard\n```\nFull output: https://testificate.com/' +            '@LemonLemonishBeard#0042 :yay!: Return code 0.' +            '\n\n```py\nWay too long beard\n```\nFull output: lookatmybeard.com'          )          self.cog.post_eval.assert_called_once_with('MyAwesomeCode') -        self.cog.get_status_emoji.assert_called_once_with({'stdout': 'SuperLongBeard', 'returncode': 0}) -        self.cog.get_results_message.assert_called_once_with({'stdout': 'SuperLongBeard', 'returncode': 0}) -        self.cog.format_output.assert_called_with('SuperLongBeard') +        self.cog.get_status_emoji.assert_called_once_with({'stdout': 'Way too long beard', 'returncode': 0}) +        self.cog.get_results_message.assert_called_once_with({'stdout': 'Way too long beard', 'returncode': 0}) +        self.cog.format_output.assert_called_once_with('Way too long beard')      @async_test -    async def test_eval_command_evaluate_twice(self): -        """Test the eval command re-evaluation procedure.""" +    async def test_send_eval_with_non_zero_eval(self): +        """Test the send_eval function with a code returning a non-zero code."""          ctx = MockContext()          ctx.message = MockMessage() -        ctx.message.content = '!e MyAwesomeCode' -        updated_msg = MockMessage() -        updated_msg .content = '!e MyAwesomeCode-2' -        response_msg = MockMessage() -        response_msg.delete = AsyncMock() -        ctx.send = AsyncMock(return_value=response_msg) +        ctx.send = AsyncMock()          ctx.author.mention = '@LemonLemonishBeard#0042'          ctx.typing = MagicMock(return_value=AsyncContextManagerMock(None)) -        self.cog.post_eval = AsyncMock(return_value={'stdout': '', 'returncode': 0}) -        self.cog.get_results_message = MagicMock(return_value=('Return code 0', '')) -        self.cog.get_status_emoji = MagicMock(return_value=':yay!:') -        self.cog.format_output = AsyncMock(return_value=('[No output]', None)) -        self.bot.wait_for.side_effect = ((None, updated_msg), None, asyncio.TimeoutError) - -        await self.cog.eval_command.callback(self.cog, ctx=ctx, code='MyAwesomeCode') - -        self.cog.post_eval.assert_has_calls((call('MyAwesomeCode'), call('MyAwesomeCode-2'))) +        self.cog.post_eval = AsyncMock(return_value={'stdout': 'ERROR', 'returncode': 127}) +        self.cog.get_results_message = MagicMock(return_value=('Return code 127', 'Beard got stuck in the eval')) +        self.cog.get_status_emoji = MagicMock(return_value=':nope!:') +        self.cog.format_output = AsyncMock()  # This function isn't called -        # Multiplied by 2 because we expect it to be called twice -        ctx.send.assert_has_calls( -            [call('@LemonLemonishBeard#0042 :yay!: Return code 0.\n\n```py\n[No output]\n```')] * 2 +        await self.cog.send_eval(ctx, 'MyAwesomeCode') +        ctx.send.assert_called_once_with( +            '@LemonLemonishBeard#0042 :nope!: Return code 127.\n\n```py\nBeard got stuck in the eval\n```'          ) -        self.cog.get_status_emoji.assert_has_calls([call({'stdout': '', 'returncode': 0})] * 2) -        self.cog.get_results_message.assert_has_calls([call({'stdout': '', 'returncode': 0})] * 2) -        self.cog.format_output.assert_has_calls([call('')] * 2) +        self.cog.post_eval.assert_called_once_with('MyAwesomeCode') +        self.cog.get_status_emoji.assert_called_once_with({'stdout': 'ERROR', 'returncode': 127}) +        self.cog.get_results_message.assert_called_once_with({'stdout': 'ERROR', 'returncode': 127}) +        self.cog.format_output.assert_not_called() +    @async_test +    async def test_continue_eval_does_continue(self): +        """Test that the continue_eval function does continue if required conditions are met.""" +        ctx = MockContext(message=MockMessage(add_reaction=AsyncMock(), clear_reactions=AsyncMock())) +        response = MockMessage(delete=AsyncMock()) +        new_msg = MockMessage(content='!e NewCode') +        self.bot.wait_for.side_effect = ((None, new_msg), None) + +        actual = await self.cog.continue_eval(ctx, response) +        self.assertEqual(actual, (True, 'NewCode'))          self.bot.wait_for.has_calls( -            call('message_edit', check=snekbox.predicate_eval_message_edit, timeout=10), -            call('reaction_add', check=snekbox.predicate_eval_emoji_reaction, timeout=10) +            call('message_edit', partial(snekbox.predicate_eval_message_edit, ctx), timeout=10), +            call('reaction_add', partial(snekbox.predicate_eval_emoji_reaction, ctx), timeout=10)          )          ctx.message.add_reaction.assert_called_once_with('🔁') -        ctx.message.clear_reactions.assert_called() -        response_msg.delete.assert_called_once() +        ctx.message.clear_reactions.assert_called_once() +        response.delete.assert_called_once() + +    @async_test +    async def test_continue_eval_does_not_continue(self): +        ctx = MockContext(message=MockMessage(clear_reactions=AsyncMock())) +        self.bot.wait_for.side_effect = asyncio.TimeoutError + +        actual = await self.cog.continue_eval(ctx, MockMessage()) +        self.assertEqual(actual, (False, None)) +        ctx.message.clear_reactions.assert_called_once()      def test_predicate_eval_message_edit(self):          """Test the predicate_eval_message_edit function.""" | 
