aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--bot/cogs/snekbox.py112
-rw-r--r--tests/bot/cogs/test_snekbox.py150
2 files changed, 148 insertions, 114 deletions
diff --git a/bot/cogs/snekbox.py b/bot/cogs/snekbox.py
index efa4696b5..25b2455e8 100644
--- a/bot/cogs/snekbox.py
+++ b/bot/cogs/snekbox.py
@@ -179,6 +179,68 @@ class Snekbox(Cog):
return output, paste_link
+ async def send_eval(self, ctx: Context, code: str) -> Message:
+ """
+ Evaluate code, format it, and send the output to the corresponding channel.
+
+ Return the bot response.
+ """
+ async with ctx.typing():
+ results = await self.post_eval(code)
+ msg, error = self.get_results_message(results)
+
+ if error:
+ output, paste_link = error, None
+ else:
+ output, paste_link = await self.format_output(results["stdout"])
+
+ icon = self.get_status_emoji(results)
+ msg = f"{ctx.author.mention} {icon} {msg}.\n\n```py\n{output}\n```"
+ if paste_link:
+ msg = f"{msg}\nFull output: {paste_link}"
+
+ response = await ctx.send(msg)
+ self.bot.loop.create_task(
+ wait_for_deletion(response, user_ids=(ctx.author.id,), client=ctx.bot)
+ )
+
+ log.info(f"{ctx.author}'s job had a return code of {results['returncode']}")
+ return response
+
+ async def continue_eval(self, ctx: Context, response: Message) -> Tuple[bool, Optional[str]]:
+ """
+ Check if the eval session should continue.
+
+ First item of the returned tuple is if the eval session should continue,
+ the second is the new code to evaluate.
+ """
+ _predicate_eval_message_edit = partial(predicate_eval_message_edit, ctx)
+ _predicate_emoji_reaction = partial(predicate_eval_emoji_reaction, ctx)
+
+ try:
+ _, new_message = await self.bot.wait_for(
+ 'message_edit',
+ check=_predicate_eval_message_edit,
+ timeout=10
+ )
+ await ctx.message.add_reaction('🔁')
+ await self.bot.wait_for(
+ 'reaction_add',
+ check=_predicate_emoji_reaction,
+ timeout=10
+ )
+
+ code = new_message.content.split(' ', maxsplit=1)[1]
+ await ctx.message.clear_reactions()
+ with contextlib.suppress(HTTPException):
+ await response.delete()
+
+ except asyncio.TimeoutError:
+ await ctx.message.clear_reactions()
+ return False, None
+
+ return True, code
+
@command(name="eval", aliases=("e",))
@guild_only()
@in_channel(Channels.bot, hidden_channels=(Channels.esoteric,), bypass_roles=EVAL_ROLES)
@@ -203,58 +265,18 @@ class Snekbox(Cog):
log.info(f"Received code from {ctx.author} for evaluation:\n{code}")
- _predicate_eval_message_edit = partial(predicate_eval_message_edit, ctx)
- _predicate_emoji_reaction = partial(predicate_eval_emoji_reaction, ctx)
-
while True:
self.jobs[ctx.author.id] = datetime.datetime.now()
code = self.prepare_input(code)
-
try:
- async with ctx.typing():
- results = await self.post_eval(code)
- msg, error = self.get_results_message(results)
-
- if error:
- output, paste_link = error, None
- else:
- output, paste_link = await self.format_output(results["stdout"])
-
- icon = self.get_status_emoji(results)
- msg = f"{ctx.author.mention} {icon} {msg}.\n\n```py\n{output}\n```"
- if paste_link:
- msg = f"{msg}\nFull output: {paste_link}"
-
- response = await ctx.send(msg)
- self.bot.loop.create_task(
- wait_for_deletion(response, user_ids=(ctx.author.id,), client=ctx.bot)
- )
-
- log.info(f"{ctx.author}'s job had a return code of {results['returncode']}")
+ response = await self.send_eval(ctx, code)
finally:
del self.jobs[ctx.author.id]
- try:
- _, new_message = await self.bot.wait_for(
- 'message_edit',
- check=_predicate_eval_message_edit,
- timeout=10
- )
- await ctx.message.add_reaction('🔁')
- await self.bot.wait_for(
- 'reaction_add',
- check=_predicate_emoji_reaction,
- timeout=10
- )
-
- log.info(f"Re-evaluating message {ctx.message.id}")
- code = new_message.content.split(' ', maxsplit=1)[1]
- await ctx.message.clear_reactions()
- with contextlib.suppress(HTTPException):
- await response.delete()
- except asyncio.TimeoutError:
- await ctx.message.clear_reactions()
- return
+ continue_eval, code = await self.continue_eval(ctx, response)
+ if not continue_eval:
+ break
+ log.info(f"Re-evaluating message {ctx.message.id}")
def predicate_eval_message_edit(ctx: Context, old_msg: Message, new_msg: Message) -> bool:
diff --git a/tests/bot/cogs/test_snekbox.py b/tests/bot/cogs/test_snekbox.py
index 112c923c8..c1c0f8d47 100644
--- a/tests/bot/cogs/test_snekbox.py
+++ b/tests/bot/cogs/test_snekbox.py
@@ -1,6 +1,7 @@
import asyncio
import logging
import unittest
+from functools import partial
from unittest.mock import MagicMock, Mock, call, patch
from bot.cogs import snekbox
@@ -175,28 +176,33 @@ class SnekboxTests(unittest.TestCase):
async def test_eval_command_evaluate_once(self):
"""Test the eval command procedure."""
ctx = MockContext()
- ctx.message = MockMessage()
- ctx.send = AsyncMock()
- ctx.author.mention = '@LemonLemonishBeard#0042'
- ctx.typing = MagicMock(return_value=AsyncContextManagerMock(None))
- self.cog.post_eval = AsyncMock(return_value={'stdout': '', 'returncode': 0})
- self.cog.get_results_message = MagicMock(return_value=('Return code 0', ''))
- self.cog.get_status_emoji = MagicMock(return_value=':yay!:')
- self.cog.format_output = AsyncMock(return_value=('[No output]', None))
- self.bot.wait_for.side_effect = asyncio.TimeoutError
+ response = MockMessage()
+ self.cog.prepare_input = MagicMock(return_value='MyAwesomeFormattedCode')
+ self.cog.send_eval = AsyncMock(return_value=response)
+ self.cog.continue_eval = AsyncMock(return_value=(False, None))
await self.cog.eval_command.callback(self.cog, ctx=ctx, code='MyAwesomeCode')
+ self.cog.prepare_input.assert_called_once_with('MyAwesomeCode')
+ self.cog.send_eval.assert_called_once_with(ctx, 'MyAwesomeFormattedCode')
+ self.cog.continue_eval.assert_called_once_with(ctx, response)
- ctx.send.assert_called_once_with(
- '@LemonLemonishBeard#0042 :yay!: Return code 0.\n\n```py\n[No output]\n```'
- )
- self.cog.post_eval.assert_called_once_with('MyAwesomeCode')
- self.cog.get_status_emoji.assert_called_once_with({'stdout': '', 'returncode': 0})
- self.cog.get_results_message.assert_called_once_with({'stdout': '', 'returncode': 0})
- self.cog.format_output.assert_called_once_with('')
+ @async_test
+ async def test_eval_command_evaluate_twice(self):
+ """Test the eval and re-eval command procedure."""
+ ctx = MockContext()
+ response = MockMessage()
+ self.cog.prepare_input = MagicMock(return_value='MyAwesomeFormattedCode')
+ self.cog.send_eval = AsyncMock(return_value=response)
+ self.cog.continue_eval = AsyncMock()
+ self.cog.continue_eval.side_effect = ((True, 'MyAwesomeCode-2'), (False, None))
+
+ await self.cog.eval_command.callback(self.cog, ctx=ctx, code='MyAwesomeCode')
+ self.cog.prepare_input.has_calls(call('MyAwesomeCode'), call('MyAwesomeCode-2'))
+ self.cog.send_eval.assert_called_with(ctx, 'MyAwesomeFormattedCode')
+ self.cog.continue_eval.assert_called_with(ctx, response)
@async_test
- async def test_eval_command_reject_two_eval(self):
+ async def test_eval_command_reject_two_eval_at_the_same_time(self):
"""Test if the eval command rejects an eval if the author already have a running eval."""
ctx = MockContext()
ctx.author.id = 42
@@ -217,92 +223,98 @@ class SnekboxTests(unittest.TestCase):
ctx.invoke.assert_called_once_with(self.bot.get_command("help"), "eval")
@async_test
- async def test_eval_command_return_error(self):
- """Test the eval command error handling."""
+ async def test_send_eval(self):
+ """Test the send_eval function."""
ctx = MockContext()
ctx.message = MockMessage()
ctx.send = AsyncMock()
ctx.author.mention = '@LemonLemonishBeard#0042'
ctx.typing = MagicMock(return_value=AsyncContextManagerMock(None))
- self.cog.post_eval = AsyncMock(return_value={'stdout': 'ERROR', 'returncode': 127})
- self.cog.get_results_message = MagicMock(return_value=('Return code 127', 'Error occurred'))
- self.cog.get_status_emoji = MagicMock(return_value=':nope!:')
- self.cog.format_output = AsyncMock()
- self.bot.wait_for.side_effect = asyncio.TimeoutError
-
- await self.cog.eval_command.callback(self.cog, ctx=ctx, code='MyAwesomeCode')
+ self.cog.post_eval = AsyncMock(return_value={'stdout': '', 'returncode': 0})
+ self.cog.get_results_message = MagicMock(return_value=('Return code 0', ''))
+ self.cog.get_status_emoji = MagicMock(return_value=':yay!:')
+ self.cog.format_output = AsyncMock(return_value=('[No output]', None))
+ await self.cog.send_eval(ctx, 'MyAwesomeCode')
ctx.send.assert_called_once_with(
- '@LemonLemonishBeard#0042 :nope!: Return code 127.\n\n```py\nError occurred\n```'
+ '@LemonLemonishBeard#0042 :yay!: Return code 0.\n\n```py\n[No output]\n```'
)
self.cog.post_eval.assert_called_once_with('MyAwesomeCode')
- self.cog.get_results_message.assert_called_once_with({'stdout': 'ERROR', 'returncode': 127})
- self.cog.get_status_emoji.assert_called_once_with({'stdout': 'ERROR', 'returncode': 127})
- self.cog.format_output.assert_not_called()
+ self.cog.get_status_emoji.assert_called_once_with({'stdout': '', 'returncode': 0})
+ self.cog.get_results_message.assert_called_once_with({'stdout': '', 'returncode': 0})
+ self.cog.format_output.assert_called_once_with('')
@async_test
- async def test_eval_command_with_paste_link(self):
- """Test the eval command procedure with the use of a paste link."""
+ async def test_send_eval_with_paste_link(self):
+ """Test the send_eval function with a too long output that generate a paste link."""
ctx = MockContext()
ctx.message = MockMessage()
ctx.send = AsyncMock()
ctx.author.mention = '@LemonLemonishBeard#0042'
ctx.typing = MagicMock(return_value=AsyncContextManagerMock(None))
- self.cog.post_eval = AsyncMock(return_value={'stdout': 'SuperLongBeard', 'returncode': 0})
+ self.cog.post_eval = AsyncMock(return_value={'stdout': 'Way too long beard', 'returncode': 0})
self.cog.get_results_message = MagicMock(return_value=('Return code 0', ''))
self.cog.get_status_emoji = MagicMock(return_value=':yay!:')
- self.cog.format_output = AsyncMock(return_value=('Truncated - too long beard', 'https://testificate.com/'))
- self.bot.wait_for.side_effect = asyncio.TimeoutError
-
- await self.cog.eval_command.callback(self.cog, ctx=ctx, code='MyAwesomeCode')
+ self.cog.format_output = AsyncMock(return_value=('Way too long beard', 'lookatmybeard.com'))
+ await self.cog.send_eval(ctx, 'MyAwesomeCode')
ctx.send.assert_called_once_with(
- '@LemonLemonishBeard#0042 :yay!: Return code 0.\n\n```py\n'
- 'Truncated - too long beard\n```\nFull output: https://testificate.com/'
+ '@LemonLemonishBeard#0042 :yay!: Return code 0.'
+ '\n\n```py\nWay too long beard\n```\nFull output: lookatmybeard.com'
)
self.cog.post_eval.assert_called_once_with('MyAwesomeCode')
- self.cog.get_status_emoji.assert_called_once_with({'stdout': 'SuperLongBeard', 'returncode': 0})
- self.cog.get_results_message.assert_called_once_with({'stdout': 'SuperLongBeard', 'returncode': 0})
- self.cog.format_output.assert_called_with('SuperLongBeard')
+ self.cog.get_status_emoji.assert_called_once_with({'stdout': 'Way too long beard', 'returncode': 0})
+ self.cog.get_results_message.assert_called_once_with({'stdout': 'Way too long beard', 'returncode': 0})
+ self.cog.format_output.assert_called_once_with('Way too long beard')
@async_test
- async def test_eval_command_evaluate_twice(self):
- """Test the eval command re-evaluation procedure."""
+ async def test_send_eval_with_non_zero_eval(self):
+ """Test the send_eval function with a code returning a non-zero code."""
ctx = MockContext()
ctx.message = MockMessage()
- ctx.message.content = '!e MyAwesomeCode'
- updated_msg = MockMessage()
- updated_msg .content = '!e MyAwesomeCode-2'
- response_msg = MockMessage()
- response_msg.delete = AsyncMock()
- ctx.send = AsyncMock(return_value=response_msg)
+ ctx.send = AsyncMock()
ctx.author.mention = '@LemonLemonishBeard#0042'
ctx.typing = MagicMock(return_value=AsyncContextManagerMock(None))
- self.cog.post_eval = AsyncMock(return_value={'stdout': '', 'returncode': 0})
- self.cog.get_results_message = MagicMock(return_value=('Return code 0', ''))
- self.cog.get_status_emoji = MagicMock(return_value=':yay!:')
- self.cog.format_output = AsyncMock(return_value=('[No output]', None))
- self.bot.wait_for.side_effect = ((None, updated_msg), None, asyncio.TimeoutError)
-
- await self.cog.eval_command.callback(self.cog, ctx=ctx, code='MyAwesomeCode')
-
- self.cog.post_eval.assert_has_calls((call('MyAwesomeCode'), call('MyAwesomeCode-2')))
+ self.cog.post_eval = AsyncMock(return_value={'stdout': 'ERROR', 'returncode': 127})
+ self.cog.get_results_message = MagicMock(return_value=('Return code 127', 'Beard got stuck in the eval'))
+ self.cog.get_status_emoji = MagicMock(return_value=':nope!:')
+ self.cog.format_output = AsyncMock() # This function isn't called
- # Multiplied by 2 because we expect it to be called twice
- ctx.send.assert_has_calls(
- [call('@LemonLemonishBeard#0042 :yay!: Return code 0.\n\n```py\n[No output]\n```')] * 2
+ await self.cog.send_eval(ctx, 'MyAwesomeCode')
+ ctx.send.assert_called_once_with(
+ '@LemonLemonishBeard#0042 :nope!: Return code 127.\n\n```py\nBeard got stuck in the eval\n```'
)
- self.cog.get_status_emoji.assert_has_calls([call({'stdout': '', 'returncode': 0})] * 2)
- self.cog.get_results_message.assert_has_calls([call({'stdout': '', 'returncode': 0})] * 2)
- self.cog.format_output.assert_has_calls([call('')] * 2)
+ self.cog.post_eval.assert_called_once_with('MyAwesomeCode')
+ self.cog.get_status_emoji.assert_called_once_with({'stdout': 'ERROR', 'returncode': 127})
+ self.cog.get_results_message.assert_called_once_with({'stdout': 'ERROR', 'returncode': 127})
+ self.cog.format_output.assert_not_called()
+ @async_test
+ async def test_continue_eval_does_continue(self):
+ """Test that the continue_eval function does continue if required conditions are met."""
+ ctx = MockContext(message=MockMessage(add_reaction=AsyncMock(), clear_reactions=AsyncMock()))
+ response = MockMessage(delete=AsyncMock())
+ new_msg = MockMessage(content='!e NewCode')
+ self.bot.wait_for.side_effect = ((None, new_msg), None)
+
+ actual = await self.cog.continue_eval(ctx, response)
+ self.assertEqual(actual, (True, 'NewCode'))
self.bot.wait_for.has_calls(
- call('message_edit', check=snekbox.predicate_eval_message_edit, timeout=10),
- call('reaction_add', check=snekbox.predicate_eval_emoji_reaction, timeout=10)
+ call('message_edit', partial(snekbox.predicate_eval_message_edit, ctx), timeout=10),
+ call('reaction_add', partial(snekbox.predicate_eval_emoji_reaction, ctx), timeout=10)
)
ctx.message.add_reaction.assert_called_once_with('🔁')
- ctx.message.clear_reactions.assert_called()
- response_msg.delete.assert_called_once()
+ ctx.message.clear_reactions.assert_called_once()
+ response.delete.assert_called_once()
+
+ @async_test
+ async def test_continue_eval_does_not_continue(self):
+ ctx = MockContext(message=MockMessage(clear_reactions=AsyncMock()))
+ self.bot.wait_for.side_effect = asyncio.TimeoutError
+
+ actual = await self.cog.continue_eval(ctx, MockMessage())
+ self.assertEqual(actual, (False, None))
+ ctx.message.clear_reactions.assert_called_once()
def test_predicate_eval_message_edit(self):
"""Test the predicate_eval_message_edit function."""