aboutsummaryrefslogtreecommitdiffstats
path: root/bot/exts/internal_eval/internal_eval.py
blob: f68129428cef93a908e6cff6cb68c5606e07bdb8 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import logging
import re
import textwrap
import typing

import discord
from discord.ext import commands

from rattlesnake.bot import Rattlesnake
from rattlesnake.constants import ADMIN_ROLES
from rattlesnake.utils import in_whitelist
from .helpers import EvalContext

# Only the cog class is exported from this module.
__all__ = ["InternalEval"]

log = logging.getLogger("rattlesnake.exts.admin_tools.internal_eval")

# Matches an opening Markdown code fence (optionally tagged `py` or `python`
# and followed by a newline) at the start of the text, or a closing fence at
# the end of the text. Without `re.MULTILINE`, fences in the middle of the
# text are left alone.
CODEBLOCK_REGEX = re.compile(r"(^```(py(thon)?)?\n)|(```$)")


class InternalEval(commands.Cog):
    """Top secret code evaluation for admins and owners."""

    def __init__(self, bot: Rattlesnake):
        self.bot = bot
        # Names bound by earlier evaluations. They are passed to each new
        # `EvalContext` and updated after every run, until the context is reset.
        self.locals = {}

    @staticmethod
    def shorten_output(
            output: str,
            max_length: int = 1900,
            placeholder: str = "\n[output truncated]"
    ) -> str:
        """
        Shorten the `output` so it's no longer than `max_length`.

        Output that already fits within `max_length` is returned unchanged.
        Otherwise, there are three tactics, tried in the following order:
        - Shorten the output on a line-by-line basis
        - Shorten the output on any whitespace character
        - Shorten the output solely on character count

        The length of the `placeholder` counts towards `max_length`, so
        `max_length` should be larger than the length of the `placeholder`.
        """
        if len(output) <= max_length:
            # Nothing has to be cut, so don't claim the output was truncated.
            return output

        # Reserve room for the placeholder that marks the truncation.
        max_length = max_length - len(placeholder)

        kept_lines = []
        char_count = 0
        for line in output.split("\n"):
            # Each kept line costs its own length plus the newline that joins
            # it to whatever follows (the next line or the placeholder).
            line_cost = len(line) + 1
            if char_count + line_cost > max_length:
                break
            kept_lines.append(line)
            char_count += line_cost

        if kept_lines:
            kept_lines.append(placeholder)
            return "\n".join(kept_lines)

        # Not even the first line fits. `textwrap.shorten` collapses whitespace
        # and truncates on word boundaries; it counts the placeholder towards
        # `width` itself, so using the reduced `max_length` is conservative.
        shortened_output = textwrap.shorten(output, width=max_length, placeholder=placeholder)

        if shortened_output.strip() == placeholder.strip():
            # `textwrap` was unable to find whitespace to shorten on, so it has
            # reduced the output to just the placeholder. Let's shorten based on
            # characters instead.
            shortened_output = output[:max_length] + placeholder

        return shortened_output

    async def _upload_output(self, output: str) -> typing.Optional[str]:
        """
        Upload `internal eval` output to our pastebin and return the url.

        Return `None` if the request fails or the response has no `key`; a
        failed request is logged rather than raised.
        """
        try:
            async with self.bot.http_session.post(
                "https://paste.pythondiscord.com/documents", data=output, raise_for_status=True
            ) as resp:
                data = await resp.json()

            if "key" in data:
                return f"https://paste.pythondiscord.com/{data['key']}"
        except Exception:
            # Uploading is best-effort: the caller reports the failure to the
            # user, so we only log here instead of letting the command fail.
            # 400 (Bad Request) means there are too many characters
            log.exception("Failed to upload `internal eval` output to paste service!")

        return None

    async def _send_output(self, ctx: commands.Context, output: str) -> None:
        """
        Send the `internal eval` output to the command invocation context.

        Output of 1980 characters or more is uploaded to the paste service in
        full and sent to the channel in shortened form, followed by either the
        paste url or a warning that the upload failed.
        """
        upload_message = ""
        # NOTE(review): 1980 presumably keeps the message, including the code
        # fence, under Discord's message length limit - confirm.
        if len(output) >= 1980:
            # The output is too long, let's truncate it for in-channel output and
            # upload the complete output to the paste service.
            url = await self._upload_output(output)

            if url:
                upload_message = f"\nFull output here: {url}"
            else:
                upload_message = "\n:warning: Failed to upload full output!"

            output = self.shorten_output(output)

        await ctx.send(f"```py\n{output}```{upload_message}")

    async def _eval(self, ctx: commands.Context, code: str) -> None:
        """
        Evaluate the `code` in the current evaluation context.

        Code that starts with the word `exit` (e.g. `exit` or `exit()`) is not
        evaluated; it resets the evaluation context instead.
        """
        # Match `exit` as a whole word only, so that code which merely starts
        # with those letters (`exit_code = 1`, `exits = []`) is still evaluated
        # instead of silently wiping the context.
        if re.match(r"exit\b", code):
            self.locals = {}
            await ctx.send("The evaluation context was reset.")
            return

        # Names that are made available to the evaluated code.
        context_vars = {
            "message": ctx.message,
            "author": ctx.message.author,
            "channel": ctx.channel,
            "guild": ctx.guild,
            "ctx": ctx,
            "self": self,
            "bot": self.bot,
            "discord": discord,
        }

        eval_context = EvalContext(context_vars, self.locals)

        log.trace("Preparing the evaluation by parsing the AST of the code")
        error = eval_context.prepare_eval(code)

        if error:
            log.trace("The code can't be evaluated due to an error")
            await ctx.send(f"```py\n{error}\n```")
            return

        log.trace("Evaluate the AST we've generated for the evaluation")
        new_locals = await eval_context.run_eval()

        log.trace("Updating locals with those set during evaluation")
        self.locals.update(new_locals)

        log.trace("Sending the formatted output back to the context")
        await self._send_output(ctx, eval_context.format_output())

    @commands.group(name='internal', aliases=('int',))
    @in_whitelist(roles=ADMIN_ROLES)
    async def internal_group(self, ctx: commands.Context) -> None:
        """Internal commands. Top secret!"""
        # Show the group's help when it is invoked without a subcommand.
        if not ctx.invoked_subcommand:
            await ctx.send_help(ctx.command)

    @internal_group.command(name='eval', aliases=('e',))
    @in_whitelist(roles=ADMIN_ROLES)
    async def eval(self, ctx: commands.Context, *, code: str) -> None:
        """Run eval in a REPL-like format."""
        # Drop a surrounding Markdown code fence so only the code remains.
        code = CODEBLOCK_REGEX.sub("", code.strip())
        await self._eval(ctx, code)

    @internal_group.command(name='reset', aliases=("clear", "exit", "r", "c"))
    @in_whitelist(roles=ADMIN_ROLES)
    async def reset(self, ctx: commands.Context) -> None:
        """Reset the evaluation context, discarding all stored variables."""
        self.locals = {}
        await ctx.send("The evaluation context was reset.")