diff options
| -rw-r--r-- | bot/exts/internal_eval/__init__.py | 0 | ||||
| -rw-r--r-- | bot/exts/internal_eval/internal_eval.py | 152 | 
2 files changed, 152 insertions, 0 deletions
diff --git a/bot/exts/internal_eval/__init__.py b/bot/exts/internal_eval/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/bot/exts/internal_eval/__init__.py diff --git a/bot/exts/internal_eval/internal_eval.py b/bot/exts/internal_eval/internal_eval.py new file mode 100644 index 00000000..f6812942 --- /dev/null +++ b/bot/exts/internal_eval/internal_eval.py @@ -0,0 +1,152 @@ +import logging +import re +import textwrap +import typing + +import discord +from discord.ext import commands + +from rattlesnake.bot import Rattlesnake +from rattlesnake.constants import ADMIN_ROLES +from rattlesnake.utils import in_whitelist +from .helpers import EvalContext + +__all__ = ["InternalEval"] + +log = logging.getLogger("rattlesnake.exts.admin_tools.internal_eval") + +CODEBLOCK_REGEX = re.compile(r"(^```(py(thon)?)?\n)|(```$)") + + +class InternalEval(commands.Cog): +    """Top secret code evaluation for admins and owners.""" + +    def __init__(self, bot: Rattlesnake): +        self.bot = bot +        self.locals = {} + +    @staticmethod +    def shorten_output( +            output: str, +            max_length: int = 1900, +            placeholder: str = "\n[output truncated]" +    ) -> str: +        """ +        Shorten the `output` so it's shorter than `max_length`. +        There are three tactics for this, tried in the following order: +        - Shorten the output on a line-by-line basis +        - Shorten the output on any whitespace character +        - Shorten the output solely on character count +        """ +        max_length = max_length - len(placeholder) + +        shortened_output = [] +        char_count = 0 +        for line in output.split("\n"): +            if char_count + len(line) > max_length: +                break +            shortened_output.append(line) +            char_count += len(line) + 1  # account for (possible) line ending + +        if shortened_output: +            shortened_output.append(placeholder) +            return "\n".join(shortened_output) + +        shortened_output = textwrap.shorten(output, width=max_length, placeholder=placeholder) + +        if shortened_output.strip() == placeholder.strip(): +            # `textwrap` was unable to find whitespace to shorten on, so it has +            # reduced the output to just the placeholder. Let's shorten based on +            # characters instead. +            shortened_output = output[:max_length] + placeholder + +        return shortened_output + +    async def _upload_output(self, output: str) -> typing.Optional[str]: +        """Upload `internal eval` output to our pastebin and return the url.""" +        try: +            async with self.bot.http_session.post( +                "https://paste.pythondiscord.com/documents", data=output, raise_for_status=True +            ) as resp: +                data = await resp.json() + +            if "key" in data: +                return f"https://paste.pythondiscord.com/{data['key']}" +        except Exception: +            # 400 (Bad Request) means there are too many characters +            log.exception("Failed to upload `internal eval` output to paste service!") + +    async def _send_output(self, ctx: commands.Context, output: str) -> None: +        """Send the `internal eval` output to the command invocation context.""" +        upload_message = "" +        if len(output) >= 1980: +            # The output is too long, let's truncate it for in-channel output and +            # upload the complete output to the paste service. +            url = await self._upload_output(output) + +            if url: +                upload_message = f"\nFull output here: {url}" +            else: +                upload_message = "\n:warning: Failed to upload full output!" + +            output = self.shorten_output(output) + +        await ctx.send(f"```py\n{output}```{upload_message}") + +    async def _eval(self, ctx: commands.Context, code: str) -> None: +        """Evaluate the `code` in the current evaluation context.""" +        if code.startswith("exit"): +            self.locals = {} +            await ctx.send("The evaluation context was reset.") +            return + +        context_vars = { +            "message": ctx.message, +            "author": ctx.message.author, +            "channel": ctx.channel, +            "guild": ctx.guild, +            "ctx": ctx, +            "self": self, +            "bot": self.bot, +            "discord": discord, +        } + +        eval_context = EvalContext(context_vars, self.locals) + +        log.trace("Preparing the evaluation by parsing the AST of the code") +        error = eval_context.prepare_eval(code) + +        if error: +            log.trace("The code can't be evaluated due to an error") +            await ctx.send(f"```py\n{error}\n```") +            return + +        log.trace("Evaluate the AST we've generated for the evaluation") +        new_locals = await eval_context.run_eval() + +        log.trace("Updating locals with those set during evaluation") +        self.locals.update(new_locals) + +        log.trace("Sending the formatted output back to the context") +        await self._send_output(ctx, eval_context.format_output()) + +    @commands.group(name='internal', aliases=('int',)) +    @in_whitelist(roles=ADMIN_ROLES) +    async def internal_group(self, ctx: commands.Context) -> None: +        """Internal commands. Top secret!""" +        if not ctx.invoked_subcommand: +            await ctx.send_help(ctx.command) + +    @internal_group.command(name='eval', aliases=('e',)) +    @in_whitelist(roles=ADMIN_ROLES) +    async def eval(self, ctx: commands.Context, *, code: str) -> None: +        """Run eval in a REPL-like format.""" +        code = CODEBLOCK_REGEX.sub("", code.strip()) +        await self._eval(ctx, code) + +    @internal_group.command(name='reset', aliases=("clear", "exit", "r", "c")) +    @in_whitelist(roles=ADMIN_ROLES) +    async def reset(self, ctx: commands.Context) -> None: +        """Run eval in a REPL-like format.""" +        self.locals = {} +        await ctx.send("The evaluation context was reset.")  |