import logging import re import textwrap import typing import discord from discord.ext import commands from rattlesnake.bot import Rattlesnake from rattlesnake.constants import ADMIN_ROLES from rattlesnake.utils import in_whitelist from .helpers import EvalContext __all__ = ["InternalEval"] log = logging.getLogger("rattlesnake.exts.admin_tools.internal_eval") CODEBLOCK_REGEX = re.compile(r"(^```(py(thon)?)?\n)|(```$)") class InternalEval(commands.Cog): """Top secret code evaluation for admins and owners.""" def __init__(self, bot: Rattlesnake): self.bot = bot self.locals = {} @staticmethod def shorten_output( output: str, max_length: int = 1900, placeholder: str = "\n[output truncated]" ) -> str: """ Shorten the `output` so it's shorter than `max_length`. There are three tactics for this, tried in the following order: - Shorten the output on a line-by-line basis - Shorten the output on any whitespace character - Shorten the output solely on character count """ max_length = max_length - len(placeholder) shortened_output = [] char_count = 0 for line in output.split("\n"): if char_count + len(line) > max_length: break shortened_output.append(line) char_count += len(line) + 1 # account for (possible) line ending if shortened_output: shortened_output.append(placeholder) return "\n".join(shortened_output) shortened_output = textwrap.shorten(output, width=max_length, placeholder=placeholder) if shortened_output.strip() == placeholder.strip(): # `textwrap` was unable to find whitespace to shorten on, so it has # reduced the output to just the placeholder. Let's shorten based on # characters instead. shortened_output = output[:max_length] + placeholder return shortened_output async def _upload_output(self, output: str) -> typing.Optional[str]: """Upload `internal eval` output to our pastebin and return the url.""" try: async with self.bot.http_session.post( "https://paste.pythondiscord.com/documents", data=output, raise_for_status=True ) as resp: data = await resp.json() if "key" in data: return f"https://paste.pythondiscord.com/{data['key']}" except Exception: # 400 (Bad Request) means there are too many characters log.exception("Failed to upload `internal eval` output to paste service!") async def _send_output(self, ctx: commands.Context, output: str) -> None: """Send the `internal eval` output to the command invocation context.""" upload_message = "" if len(output) >= 1980: # The output is too long, let's truncate it for in-channel output and # upload the complete output to the paste service. url = await self._upload_output(output) if url: upload_message = f"\nFull output here: {url}" else: upload_message = "\n:warning: Failed to upload full output!" output = self.shorten_output(output) await ctx.send(f"```py\n{output}```{upload_message}") async def _eval(self, ctx: commands.Context, code: str) -> None: """Evaluate the `code` in the current evaluation context.""" if code.startswith("exit"): self.locals = {} await ctx.send("The evaluation context was reset.") return context_vars = { "message": ctx.message, "author": ctx.message.author, "channel": ctx.channel, "guild": ctx.guild, "ctx": ctx, "self": self, "bot": self.bot, "discord": discord, } eval_context = EvalContext(context_vars, self.locals) log.trace("Preparing the evaluation by parsing the AST of the code") error = eval_context.prepare_eval(code) if error: log.trace("The code can't be evaluated due to an error") await ctx.send(f"```py\n{error}\n```") return log.trace("Evaluate the AST we've generated for the evaluation") new_locals = await eval_context.run_eval() log.trace("Updating locals with those set during evaluation") self.locals.update(new_locals) log.trace("Sending the formatted output back to the context") await self._send_output(ctx, eval_context.format_output()) @commands.group(name='internal', aliases=('int',)) @in_whitelist(roles=ADMIN_ROLES) async def internal_group(self, ctx: commands.Context) -> None: """Internal commands. Top secret!""" if not ctx.invoked_subcommand: await ctx.send_help(ctx.command) @internal_group.command(name='eval', aliases=('e',)) @in_whitelist(roles=ADMIN_ROLES) async def eval(self, ctx: commands.Context, *, code: str) -> None: """Run eval in a REPL-like format.""" code = CODEBLOCK_REGEX.sub("", code.strip()) await self._eval(ctx, code) @internal_group.command(name='reset', aliases=("clear", "exit", "r", "c")) @in_whitelist(roles=ADMIN_ROLES) async def reset(self, ctx: commands.Context) -> None: """Run eval in a REPL-like format.""" self.locals = {} await ctx.send("The evaluation context was reset.")