aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--bot/exts/utils/snekbox.py164
1 files changed, 135 insertions, 29 deletions
diff --git a/bot/exts/utils/snekbox.py b/bot/exts/utils/snekbox.py
index 48bb1d3de..6841527df 100644
--- a/bot/exts/utils/snekbox.py
+++ b/bot/exts/utils/snekbox.py
@@ -1,22 +1,23 @@
import asyncio
import contextlib
-import datetime
import re
from functools import partial
+from operator import attrgetter
from signal import Signals
from textwrap import dedent
-from typing import Optional, Tuple
+from typing import Literal, Optional, Tuple
from botcore.utils import scheduling
from botcore.utils.regex import FORMATTED_CODE_REGEX, RAW_CODE_REGEX
-from discord import AllowedMentions, HTTPException, Message, NotFound, Reaction, User
-from discord.ext.commands import Cog, Command, Context, Converter, command, guild_only
+from discord import AllowedMentions, HTTPException, Interaction, Member, Message, NotFound, Reaction, User, ui
+from discord.ext.commands import Cog, Command, Context, Converter, command, errors, guild_only
from bot.bot import Bot
from bot.constants import Categories, Channels, Roles, URLs
from bot.decorators import redirect_output
from bot.log import get_logger
from bot.utils import send_to_paste_service
+from bot.utils.lock import LockedResourceError, lock_arg
from bot.utils.messages import wait_for_deletion
from bot.utils.services import PasteTooLongError, PasteUploadError
@@ -116,6 +117,58 @@ class CodeblockConverter(Converter):
return codeblocks
+class PythonVersionSwitcherView(ui.View):
+ """
+ A view to hold the Try in X Python version button.
+
+ A subclass is required to implement a custom interaction_check so only the command invoker
+ can change the Python version used.
+ """
+
+ def __init__(self, owner_id: int) -> None:
+ super().__init__()
+ self.owner_id = owner_id
+
+ async def interaction_check(self, interaction: Interaction) -> bool:
+ """Ensure the user clicking the button is the view owner."""
+ if self.owner_id == interaction.user.id:
+ return True
+
+ await interaction.response.send_message("This is not your button to click!", ephemeral=True)
+ return False
+
+
+class PythonVersionSwitcherButton(ui.Button):
+ """A button that allows users to re-run their eval command in a different Python version."""
+
+ def __init__(
+ self,
+ job_name: str,
+ version_to_switch_to: Literal["3.10", "3.11"],
+ snekbox_cog: "Snekbox",
+ ctx: Context,
+ code: str
+ ) -> None:
+ self.version_to_switch_to = version_to_switch_to
+ super().__init__(label=f"Run in {self.version_to_switch_to}")
+
+ self.snekbox_cog = snekbox_cog
+ self.ctx = ctx
+ self.job_name = job_name
+ self.code = code
+
+ async def callback(self, interaction: Interaction) -> None:
+ """
+ Tell snekbox to re-run the user's code in the alternative Python version.
+
+ Use a task calling snekbox, as run_job is blocking while it waits for edit/reaction on the message.
+ """
+ # Call run_job in a task as blocking here would cause the interaction to be reported as failed by Discord's UI.
+ # Discord.py only ACKs the interaction after this callback finishes.
+ scheduling.create_task(self.snekbox_cog.run_job(self.job_name, self.ctx, self.version_to_switch_to, self.code))
+ await interaction.message.delete()
+
+
class Snekbox(Cog):
"""Safe evaluation of Python code using Snekbox."""
@@ -123,9 +176,37 @@ class Snekbox(Cog):
self.bot = bot
self.jobs = {}
- async def post_job(self, code: str, *, args: Optional[list[str]] = None) -> dict:
+ def build_python_version_switcher_view(
+ self,
+ job_name: str,
+ member: Member,
+ current_python_version: Literal["3.10", "3.11"],
+ ctx: Context,
+ code: str
+ ) -> None:
+ """Return a view that allows the user to change what version of Python their code is run on."""
+ if current_python_version == "3.10":
+ alt_python_version = "3.11"
+ else:
+ alt_python_version = "3.10"
+
+ view = PythonVersionSwitcherView(member.id)
+ view.add_item(PythonVersionSwitcherButton(job_name, alt_python_version, self, ctx, code))
+ return view
+
+ async def post_job(
+ self,
+ code: str,
+ python_version: Literal["3.10", "3.11"],
+ *,
+ args: Optional[list[str]] = None
+ ) -> dict:
"""Send a POST request to the Snekbox API to evaluate code and return the results."""
- url = URLs.snekbox_eval_api
+ if python_version == "3.10":
+ url = URLs.snekbox_eval_api
+ else:
+ url = URLs.snekbox_311_eval_api
+
data = {"input": code}
if args is not None:
@@ -244,9 +325,11 @@ class Snekbox(Cog):
return output, paste_link
+ @lock_arg("snekbox.send_job", "ctx", attrgetter("author.id"), raise_error=True)
async def send_job(
self,
ctx: Context,
+ python_version: Literal["3.10", "3.11"],
code: str,
*,
args: Optional[list[str]] = None,
@@ -258,7 +341,7 @@ class Snekbox(Cog):
Return the bot response.
"""
async with ctx.typing():
- results = await self.post_job(code, args=args)
+ results = await self.post_job(code, python_version, args=args)
msg, error = self.get_results_message(results, job_name)
if error:
@@ -286,14 +369,15 @@ class Snekbox(Cog):
response = await ctx.send("Attempt to circumvent filter detected. Moderator team has been alerted.")
else:
allowed_mentions = AllowedMentions(everyone=False, roles=False, users=[ctx.author])
- response = await ctx.send(msg, allowed_mentions=allowed_mentions)
+ view = self.build_python_version_switcher_view(job_name, ctx.author, python_version, ctx, code)
+ response = await ctx.send(msg, allowed_mentions=allowed_mentions, view=view)
scheduling.create_task(wait_for_deletion(response, (ctx.author.id,)), event_loop=self.bot.loop)
log.info(f"{ctx.author}'s {job_name} job had a return code of {results['returncode']}")
return response
async def continue_job(
- self, ctx: Context, response: Message, command: Command
+ self, ctx: Context, response: Message, job_name: str
) -> tuple[Optional[str], Optional[list[str]]]:
"""
Check if the job's session should continue.
@@ -318,6 +402,11 @@ class Snekbox(Cog):
timeout=10
)
+ # Ensure the response that's about to be edited is still the most recent.
+ # This could have already been updated via a button press to switch to an alt Python version.
+ if self.jobs[ctx.message.id] != response.id:
+ return None, None
+
code = await self.get_code(new_message, ctx.command)
await ctx.message.clear_reaction(REDO_EMOJI)
with contextlib.suppress(HTTPException):
@@ -332,7 +421,7 @@ class Snekbox(Cog):
codeblocks = await CodeblockConverter.convert(ctx, code)
- if command is self.timeit_command:
+ if job_name == "timeit":
return self.prepare_timeit_input(codeblocks)
else:
return "\n".join(codeblocks), None
@@ -363,18 +452,12 @@ class Snekbox(Cog):
self,
job_name: str,
ctx: Context,
+ python_version: Literal["3.10", "3.11"],
code: str,
*,
args: Optional[list[str]] = None,
) -> None:
"""Handles checks, stats and re-evaluation of a snekbox job."""
- if ctx.author.id in self.jobs:
- await ctx.send(
- f"{ctx.author.mention} You've already got a job running - "
- "please wait for it to finish!"
- )
- return
-
if Roles.helpers in (role.id for role in ctx.author.roles):
self.bot.stats.incr("snekbox_usages.roles.helpers")
else:
@@ -390,18 +473,19 @@ class Snekbox(Cog):
log.info(f"Received code from {ctx.author} for evaluation:\n{code}")
while True:
- self.jobs[ctx.author.id] = datetime.datetime.now()
- try:
- response = await self.send_job(ctx, code, args=args, job_name=job_name)
- finally:
- del self.jobs[ctx.author.id]
+ response = await self.send_job(ctx, python_version, code, args=args, job_name=job_name)
- code, args = await self.continue_job(ctx, response, ctx.command)
+ # Store the bot's response message id per invocation, to ensure the `wait_for`s in `continue_job`
+ # don't trigger if the response has already been replaced by a new response.
+ # This can happen when a button is pressed and then original code is edited and re-run.
+ self.jobs[ctx.message.id] = response.id
+
+ code, args = await self.continue_job(ctx, response, job_name)
if not code:
break
log.info(f"Re-evaluating code from message {ctx.message.id}:\n{code}")
- @command(name="eval", aliases=("e",), usage="<code, ...>")
+ @command(name="eval", aliases=("e",), usage="[python_version] <code, ...>")
@guild_only()
@redirect_output(
destination_channel=Channels.bot_commands,
@@ -410,7 +494,13 @@ class Snekbox(Cog):
channels=NO_SNEKBOX_CHANNELS,
ping_user=False
)
- async def eval_command(self, ctx: Context, *, code: CodeblockConverter) -> None:
+ async def eval_command(
+ self,
+ ctx: Context,
+ python_version: Optional[Literal["3.10", "3.11"]],
+ *,
+ code: CodeblockConverter
+ ) -> None:
"""
Run Python code and get the results.
@@ -421,12 +511,17 @@ class Snekbox(Cog):
If multiple codeblocks are in a message, all of them will be joined and evaluated,
ignoring the text outside of them.
+ By default your code is run on Python's 3.11 beta release, to assist with testing. If you
+ run into issues related to this Python version, you can request the bot to use Python
+ 3.10 by specifying the `python_version` arg and setting it to `3.10`.
+
We've done our best to make this sandboxed, but do let us know if you manage to find an
issue with it!
"""
- await self.run_job("eval", ctx, "\n".join(code))
+ python_version = python_version or "3.11"
+ await self.run_job("eval", ctx, python_version, "\n".join(code))
- @command(name="timeit", aliases=("ti",), usage="[setup_code] <code, ...>")
+ @command(name="timeit", aliases=("ti",), usage="[python_version] [setup_code] <code, ...>")
@guild_only()
@redirect_output(
destination_channel=Channels.bot_commands,
@@ -435,7 +530,13 @@ class Snekbox(Cog):
channels=NO_SNEKBOX_CHANNELS,
ping_user=False
)
- async def timeit_command(self, ctx: Context, *, code: CodeblockConverter) -> None:
+ async def timeit_command(
+ self,
+ ctx: Context,
+ python_version: Optional[Literal["3.10", "3.11"]],
+ *,
+ code: CodeblockConverter
+ ) -> None:
"""
Profile Python Code to find execution time.
@@ -446,12 +547,17 @@ class Snekbox(Cog):
If multiple formatted codeblocks are provided, the first one will be the setup code, which will
not be timed. The remaining codeblocks will be joined together and timed.
+ By default your code is run on Python's 3.11 beta release, to assist with testing. If you
+ run into issues related to this Python version, you can request the bot to use Python
+ 3.10 by specifying the `python_version` arg and setting it to `3.10`.
+
We've done our best to make this sandboxed, but do let us know if you manage to find an
issue with it!
"""
+ python_version = python_version or "3.11"
code, args = self.prepare_timeit_input(code)
- await self.run_job("timeit", ctx, code=code, args=args)
+ await self.run_job("timeit", ctx, python_version, code=code, args=args)
def predicate_message_edit(ctx: Context, old_msg: Message, new_msg: Message) -> bool: