diff options
-rw-r--r-- | bot/exts/utils/snekbox.py | 164 |
1 files changed, 135 insertions, 29 deletions
diff --git a/bot/exts/utils/snekbox.py b/bot/exts/utils/snekbox.py index 48bb1d3de..6841527df 100644 --- a/bot/exts/utils/snekbox.py +++ b/bot/exts/utils/snekbox.py @@ -1,22 +1,23 @@ import asyncio import contextlib -import datetime import re from functools import partial +from operator import attrgetter from signal import Signals from textwrap import dedent -from typing import Optional, Tuple +from typing import Literal, Optional, Tuple from botcore.utils import scheduling from botcore.utils.regex import FORMATTED_CODE_REGEX, RAW_CODE_REGEX -from discord import AllowedMentions, HTTPException, Message, NotFound, Reaction, User -from discord.ext.commands import Cog, Command, Context, Converter, command, guild_only +from discord import AllowedMentions, HTTPException, Interaction, Member, Message, NotFound, Reaction, User, ui +from discord.ext.commands import Cog, Command, Context, Converter, command, errors, guild_only from bot.bot import Bot from bot.constants import Categories, Channels, Roles, URLs from bot.decorators import redirect_output from bot.log import get_logger from bot.utils import send_to_paste_service +from bot.utils.lock import LockedResourceError, lock_arg from bot.utils.messages import wait_for_deletion from bot.utils.services import PasteTooLongError, PasteUploadError @@ -116,6 +117,58 @@ class CodeblockConverter(Converter): return codeblocks +class PythonVersionSwitcherView(ui.View): + """ + A view to hold the Try in X Python version button. + + A subclass is required to implement a custom interaction_check so only the command invoker + can change the Python version used. + """ + + def __init__(self, owner_id: int) -> None: + super().__init__() + self.owner_id = owner_id + + async def interaction_check(self, interaction: Interaction) -> bool: + """Ensure the user clicking the button is the view owner.""" + if self.owner_id == interaction.user.id: + return True + + await interaction.response.send_message("This is not your button to click!", ephemeral=True) + return False + + +class PythonVersionSwitcherButton(ui.Button): + """A button that allows users to re-run their eval command in a different Python version.""" + + def __init__( + self, + job_name: str, + version_to_switch_to: Literal["3.10", "3.11"], + snekbox_cog: "Snekbox", + ctx: Context, + code: str + ) -> None: + self.version_to_switch_to = version_to_switch_to + super().__init__(label=f"Run in {self.version_to_switch_to}") + + self.snekbox_cog = snekbox_cog + self.ctx = ctx + self.job_name = job_name + self.code = code + + async def callback(self, interaction: Interaction) -> None: + """ + Tell snekbox to re-run the user's code in the alternative Python version. + + Use a task calling snekbox, as run_job is blocking while it waits for edit/reaction on the message. + """ + # Call run_job in a task as blocking here would cause the interaction to be reported as failed by Discord's UI. + # Discord.py only ACKs the interaction after this callback finishes. + scheduling.create_task(self.snekbox_cog.run_job(self.job_name, self.ctx, self.version_to_switch_to, self.code)) + await interaction.message.delete() + + class Snekbox(Cog): """Safe evaluation of Python code using Snekbox.""" @@ -123,9 +176,37 @@ class Snekbox(Cog): self.bot = bot self.jobs = {} - async def post_job(self, code: str, *, args: Optional[list[str]] = None) -> dict: + def build_python_version_switcher_view( + self, + job_name: str, + member: Member, + current_python_version: Literal["3.10", "3.11"], + ctx: Context, + code: str + ) -> None: + """Return a view that allows the user to change what version of Python their code is run on.""" + if current_python_version == "3.10": + alt_python_version = "3.11" + else: + alt_python_version = "3.10" + + view = PythonVersionSwitcherView(member.id) + view.add_item(PythonVersionSwitcherButton(job_name, alt_python_version, self, ctx, code)) + return view + + async def post_job( + self, + code: str, + python_version: Literal["3.10", "3.11"], + *, + args: Optional[list[str]] = None + ) -> dict: """Send a POST request to the Snekbox API to evaluate code and return the results.""" - url = URLs.snekbox_eval_api + if python_version == "3.10": + url = URLs.snekbox_eval_api + else: + url = URLs.snekbox_311_eval_api + data = {"input": code} if args is not None: @@ -244,9 +325,11 @@ class Snekbox(Cog): return output, paste_link + @lock_arg("snekbox.send_job", "ctx", attrgetter("author.id"), raise_error=True) async def send_job( self, ctx: Context, + python_version: Literal["3.10", "3.11"], code: str, *, args: Optional[list[str]] = None, @@ -258,7 +341,7 @@ class Snekbox(Cog): Return the bot response. """ async with ctx.typing(): - results = await self.post_job(code, args=args) + results = await self.post_job(code, python_version, args=args) msg, error = self.get_results_message(results, job_name) if error: @@ -286,14 +369,15 @@ class Snekbox(Cog): response = await ctx.send("Attempt to circumvent filter detected. Moderator team has been alerted.") else: allowed_mentions = AllowedMentions(everyone=False, roles=False, users=[ctx.author]) - response = await ctx.send(msg, allowed_mentions=allowed_mentions) + view = self.build_python_version_switcher_view(job_name, ctx.author, python_version, ctx, code) + response = await ctx.send(msg, allowed_mentions=allowed_mentions, view=view) scheduling.create_task(wait_for_deletion(response, (ctx.author.id,)), event_loop=self.bot.loop) log.info(f"{ctx.author}'s {job_name} job had a return code of {results['returncode']}") return response async def continue_job( - self, ctx: Context, response: Message, command: Command + self, ctx: Context, response: Message, job_name: str ) -> tuple[Optional[str], Optional[list[str]]]: """ Check if the job's session should continue. @@ -318,6 +402,11 @@ class Snekbox(Cog): timeout=10 ) + # Ensure the response that's about to be edited is still the most recent. + # This could have already been updated via a button press to switch to an alt Python version. + if self.jobs[ctx.message.id] != response.id: + return None, None + code = await self.get_code(new_message, ctx.command) await ctx.message.clear_reaction(REDO_EMOJI) with contextlib.suppress(HTTPException): @@ -332,7 +421,7 @@ class Snekbox(Cog): codeblocks = await CodeblockConverter.convert(ctx, code) - if command is self.timeit_command: + if job_name == "timeit": return self.prepare_timeit_input(codeblocks) else: return "\n".join(codeblocks), None @@ -363,18 +452,12 @@ class Snekbox(Cog): self, job_name: str, ctx: Context, + python_version: Literal["3.10", "3.11"], code: str, *, args: Optional[list[str]] = None, ) -> None: """Handles checks, stats and re-evaluation of a snekbox job.""" - if ctx.author.id in self.jobs: - await ctx.send( - f"{ctx.author.mention} You've already got a job running - " - "please wait for it to finish!" - ) - return - if Roles.helpers in (role.id for role in ctx.author.roles): self.bot.stats.incr("snekbox_usages.roles.helpers") else: @@ -390,18 +473,19 @@ class Snekbox(Cog): log.info(f"Received code from {ctx.author} for evaluation:\n{code}") while True: - self.jobs[ctx.author.id] = datetime.datetime.now() - try: - response = await self.send_job(ctx, code, args=args, job_name=job_name) - finally: - del self.jobs[ctx.author.id] + response = await self.send_job(ctx, python_version, code, args=args, job_name=job_name) - code, args = await self.continue_job(ctx, response, ctx.command) + # Store the bot's response message id per invocation, to ensure the `wait_for`s in `continue_job` + # don't trigger if the response has already been replaced by a new response. + # This can happen when a button is pressed and then original code is edited and re-run. + self.jobs[ctx.message.id] = response.id + + code, args = await self.continue_job(ctx, response, job_name) if not code: break log.info(f"Re-evaluating code from message {ctx.message.id}:\n{code}") - @command(name="eval", aliases=("e",), usage="<code, ...>") + @command(name="eval", aliases=("e",), usage="[python_version] <code, ...>") @guild_only() @redirect_output( destination_channel=Channels.bot_commands, @@ -410,7 +494,13 @@ class Snekbox(Cog): channels=NO_SNEKBOX_CHANNELS, ping_user=False ) - async def eval_command(self, ctx: Context, *, code: CodeblockConverter) -> None: + async def eval_command( + self, + ctx: Context, + python_version: Optional[Literal["3.10", "3.11"]], + *, + code: CodeblockConverter + ) -> None: """ Run Python code and get the results. @@ -421,12 +511,17 @@ class Snekbox(Cog): If multiple codeblocks are in a message, all of them will be joined and evaluated, ignoring the text outside of them. + By default your code is run on Python's 3.11 beta release, to assist with testing. If you + run into issues related to this Python version, you can request the bot to use Python + 3.10 by specifying the `python_version` arg and setting it to `3.10`. + We've done our best to make this sandboxed, but do let us know if you manage to find an issue with it! """ - await self.run_job("eval", ctx, "\n".join(code)) + python_version = python_version or "3.11" + await self.run_job("eval", ctx, python_version, "\n".join(code)) - @command(name="timeit", aliases=("ti",), usage="[setup_code] <code, ...>") + @command(name="timeit", aliases=("ti",), usage="[python_version] [setup_code] <code, ...>") @guild_only() @redirect_output( destination_channel=Channels.bot_commands, @@ -435,7 +530,13 @@ class Snekbox(Cog): channels=NO_SNEKBOX_CHANNELS, ping_user=False ) - async def timeit_command(self, ctx: Context, *, code: CodeblockConverter) -> None: + async def timeit_command( + self, + ctx: Context, + python_version: Optional[Literal["3.10", "3.11"]], + *, + code: CodeblockConverter + ) -> None: """ Profile Python Code to find execution time. @@ -446,12 +547,17 @@ class Snekbox(Cog): If multiple formatted codeblocks are provided, the first one will be the setup code, which will not be timed. The remaining codeblocks will be joined together and timed. + By default your code is run on Python's 3.11 beta release, to assist with testing. If you + run into issues related to this Python version, you can request the bot to use Python + 3.10 by specifying the `python_version` arg and setting it to `3.10`. + We've done our best to make this sandboxed, but do let us know if you manage to find an issue with it! """ + python_version = python_version or "3.11" code, args = self.prepare_timeit_input(code) - await self.run_job("timeit", ctx, code=code, args=args) + await self.run_job("timeit", ctx, python_version, code=code, args=args) def predicate_message_edit(ctx: Context, old_msg: Message, new_msg: Message) -> bool: |