diff options
Diffstat (limited to 'bot/exts')
| -rw-r--r-- | bot/exts/easter/april_fools_vids.py | 26 | ||||
| -rw-r--r-- | bot/exts/easter/egg_decorating.py | 4 | ||||
| -rw-r--r-- | bot/exts/evergreen/battleship.py | 2 | ||||
| -rw-r--r-- | bot/exts/evergreen/catify.py | 87 | ||||
| -rw-r--r-- | bot/exts/evergreen/error_handler.py | 10 | ||||
| -rw-r--r-- | bot/exts/evergreen/fun.py | 3 | ||||
| -rw-r--r-- | bot/exts/evergreen/issues.py | 5 | ||||
| -rw-r--r-- | bot/exts/evergreen/ping.py | 27 | ||||
| -rw-r--r-- | bot/exts/evergreen/reddit.py | 6 | ||||
| -rw-r--r-- | bot/exts/evergreen/timed.py | 4 | ||||
| -rw-r--r-- | bot/exts/internal_eval/__init__.py | 10 | ||||
| -rw-r--r-- | bot/exts/internal_eval/_helpers.py | 249 | ||||
| -rw-r--r-- | bot/exts/internal_eval/_internal_eval.py | 176 | 
13 files changed, 582 insertions, 27 deletions
| diff --git a/bot/exts/easter/april_fools_vids.py b/bot/exts/easter/april_fools_vids.py index efe7e677..c7a3c014 100644 --- a/bot/exts/easter/april_fools_vids.py +++ b/bot/exts/easter/april_fools_vids.py @@ -1,36 +1,26 @@  import logging  import random  from json import load -from pathlib import Path  from discord.ext import commands  log = logging.getLogger(__name__) +with open("bot/resources/easter/april_fools_vids.json", encoding="utf-8") as f: +    ALL_VIDS = load(f) +  class AprilFoolVideos(commands.Cog):      """A cog for April Fools' that gets a random April Fools' video from Youtube.""" -    def __init__(self, bot: commands.Bot): -        self.bot = bot -        self.yt_vids = self.load_json() -        self.youtubers = ['google']  # will add more in future - -    @staticmethod -    def load_json() -> dict: -        """A function to load JSON data.""" -        p = Path('bot/resources/easter/april_fools_vids.json') -        with p.open(encoding="utf-8") as json_file: -            all_vids = load(json_file) -        return all_vids -      @commands.command(name='fool')      async def april_fools(self, ctx: commands.Context) -> None:          """Get a random April Fools' video from Youtube.""" -        random_youtuber = random.choice(self.youtubers) -        category = self.yt_vids[random_youtuber] -        random_vid = random.choice(category) -        await ctx.send(f"Check out this April Fools' video by {random_youtuber}.\n\n{random_vid['link']}") +        video = random.choice(ALL_VIDS) + +        channel, url = video["channel"], video["url"] + +        await ctx.send(f"Check out this April Fools' video by {channel}.\n\n{url}")  def setup(bot: commands.Bot) -> None: diff --git a/bot/exts/easter/egg_decorating.py b/bot/exts/easter/egg_decorating.py index b18e6636..a847388d 100644 --- a/bot/exts/easter/egg_decorating.py +++ b/bot/exts/easter/egg_decorating.py @@ -10,6 +10,8 @@ import discord  from PIL import Image  from discord.ext import commands +from bot.utils import helpers +  log = logging.getLogger(__name__)  with open(Path("bot/resources/evergreen/html_colours.json"), encoding="utf8") as f: @@ -65,7 +67,7 @@ class EggDecorating(commands.Cog):              if value:                  colours[idx] = discord.Colour(value)              else: -                invalid.append(colour) +                invalid.append(helpers.suppress_links(colour))          if len(invalid) > 1:              return await ctx.send(f"Sorry, I don't know these colours: {' '.join(invalid)}") diff --git a/bot/exts/evergreen/battleship.py b/bot/exts/evergreen/battleship.py index fa3fb35c..1681434f 100644 --- a/bot/exts/evergreen/battleship.py +++ b/bot/exts/evergreen/battleship.py @@ -227,7 +227,7 @@ class Game:              if message.content.lower() == "surrender":                  self.surrender = True                  return True -            self.match = re.match("([A-J]|[a-j]) ?((10)|[1-9])", message.content.strip()) +            self.match = re.fullmatch("([A-J]|[a-j]) ?((10)|[1-9])", message.content.strip())              if not self.match:                  self.bot.loop.create_task(message.add_reaction(CROSS_EMOJI))              return bool(self.match) diff --git a/bot/exts/evergreen/catify.py b/bot/exts/evergreen/catify.py new file mode 100644 index 00000000..d8a7442d --- /dev/null +++ b/bot/exts/evergreen/catify.py @@ -0,0 +1,87 @@ +import random +from contextlib import suppress +from typing import Optional + +from discord import AllowedMentions, Embed, Forbidden +from discord.ext import commands + +from bot.constants import Cats, Colours, NEGATIVE_REPLIES +from bot.utils import helpers + + +class Catify(commands.Cog): +    """Cog for the catify command.""" + +    def __init__(self, bot: commands.Bot): +        self.bot = bot + +    @commands.command(aliases=["ᓚᘏᗢify", "ᓚᘏᗢ"]) +    async def catify(self, ctx: commands.Context, *, text: Optional[str]) -> None: +        """ +        Convert the provided text into a cat themed sentence by interspercing cats throughout text. + +        If no text is given then the users nickname is edited. +        """ +        if not text: +            display_name = ctx.author.display_name + +            if len(display_name) > 26: +                embed = Embed( +                    title=random.choice(NEGATIVE_REPLIES), +                    description=( +                        "Your display name is too long to be catified! " +                        "Please change it to be under 26 characters." +                    ), +                    color=Colours.soft_red +                ) +                await ctx.send(embed=embed) +                return + +            else: +                display_name += f" | {random.choice(Cats.cats)}" + +                await ctx.send(f"Your catified nickname is: `{display_name}`", allowed_mentions=AllowedMentions.none()) + +                with suppress(Forbidden): +                    await ctx.author.edit(nick=display_name) +        else: +            if len(text) >= 1500: +                embed = Embed( +                    title=random.choice(NEGATIVE_REPLIES), +                    description="Submitted text was too large! Please submit something under 1500 characters.", +                    color=Colours.soft_red +                ) +                await ctx.send(embed=embed) +                return + +            string_list = text.split() +            for index, name in enumerate(string_list): +                name = name.lower() +                if "cat" in name: +                    if random.randint(0, 5) == 5: +                        string_list[index] = name.replace("cat", f"**{random.choice(Cats.cats)}**") +                    else: +                        string_list[index] = name.replace("cat", random.choice(Cats.cats)) +                for element in Cats.cats: +                    if element in name: +                        string_list[index] = name.replace(element, "cat") + +            string_len = len(string_list) // 3 or len(string_list) + +            for _ in range(random.randint(1, string_len)): +                # insert cat at random index +                if random.randint(0, 5) == 5: +                    string_list.insert(random.randint(0, len(string_list)), f"**{random.choice(Cats.cats)}**") +                else: +                    string_list.insert(random.randint(0, len(string_list)), random.choice(Cats.cats)) + +            text = helpers.suppress_links(" ".join(string_list)) +            await ctx.send( +                f">>> {text}", +                allowed_mentions=AllowedMentions.none() +            ) + + +def setup(bot: commands.Bot) -> None: +    """Loads the catify cog.""" +    bot.add_cog(Catify(bot)) diff --git a/bot/exts/evergreen/error_handler.py b/bot/exts/evergreen/error_handler.py index 28902503..8db49748 100644 --- a/bot/exts/evergreen/error_handler.py +++ b/bot/exts/evergreen/error_handler.py @@ -46,6 +46,11 @@ class CommandErrorHandler(commands.Cog):              logging.debug(f"Command {ctx.command} had its error already handled locally; ignoring.")              return +        parent_command = "" +        if subctx := getattr(ctx, "subcontext", None): +            parent_command = f"{ctx.command} " +            ctx = subctx +          error = getattr(error, 'original', error)          logging.debug(              f"Error Encountered: {type(error).__name__} - {str(error)}, " @@ -63,8 +68,9 @@ class CommandErrorHandler(commands.Cog):          if isinstance(error, commands.UserInputError):              self.revert_cooldown_counter(ctx.command, ctx.message) +            usage = f"```{ctx.prefix}{parent_command}{ctx.command} {ctx.command.signature}```"              embed = self.error_embed( -                f"Your input was invalid: {error}\n\nUsage:\n```{ctx.prefix}{ctx.command} {ctx.command.signature}```" +                f"Your input was invalid: {error}\n\nUsage:{usage}"              )              await ctx.send(embed=embed)              return @@ -95,7 +101,7 @@ class CommandErrorHandler(commands.Cog):              self.revert_cooldown_counter(ctx.command, ctx.message)              embed = self.error_embed(                  "The argument you provided was invalid: " -                f"{error}\n\nUsage:\n```{ctx.prefix}{ctx.command} {ctx.command.signature}```" +                f"{error}\n\nUsage:\n```{ctx.prefix}{parent_command}{ctx.command} {ctx.command.signature}```"              )              await ctx.send(embed=embed)              return diff --git a/bot/exts/evergreen/fun.py b/bot/exts/evergreen/fun.py index 101725da..7152d0cb 100644 --- a/bot/exts/evergreen/fun.py +++ b/bot/exts/evergreen/fun.py @@ -11,6 +11,7 @@ from discord.ext.commands import BadArgument, Bot, Cog, Context, MessageConverte  from bot import utils  from bot.constants import Client, Colours, Emojis +from bot.utils import helpers  log = logging.getLogger(__name__) @@ -83,6 +84,7 @@ class Fun(Cog):          if embed is not None:              embed = Fun._convert_embed(conversion_func, embed)          converted_text = conversion_func(text) +        converted_text = helpers.suppress_links(converted_text)          # Don't put >>> if only embed present          if converted_text:              converted_text = f">>> {converted_text.lstrip('> ')}" @@ -101,6 +103,7 @@ class Fun(Cog):          if embed is not None:              embed = Fun._convert_embed(conversion_func, embed)          converted_text = conversion_func(text) +        converted_text = helpers.suppress_links(converted_text)          # Don't put >>> if only embed present          if converted_text:              converted_text = f">>> {converted_text.lstrip('> ')}" diff --git a/bot/exts/evergreen/issues.py b/bot/exts/evergreen/issues.py index bb6273bb..a0316080 100644 --- a/bot/exts/evergreen/issues.py +++ b/bot/exts/evergreen/issues.py @@ -51,7 +51,10 @@ CODE_BLOCK_RE = re.compile(  MAXIMUM_ISSUES = 5  # Regex used when looking for automatic linking in messages -AUTOMATIC_REGEX = re.compile(r"((?P<org>.+?)\/)?(?P<repo>.+?)#(?P<number>.+?)") +# regex101 of current regex https://regex101.com/r/V2ji8M/6 +AUTOMATIC_REGEX = re.compile( +    r"((?P<org>[a-zA-Z0-9][a-zA-Z0-9\-]{1,39})\/)?(?P<repo>[\w\-\.]{1,100})#(?P<number>[0-9]+)" +)  @dataclass diff --git a/bot/exts/evergreen/ping.py b/bot/exts/evergreen/ping.py new file mode 100644 index 00000000..97f8b34d --- /dev/null +++ b/bot/exts/evergreen/ping.py @@ -0,0 +1,27 @@ +from discord import Embed +from discord.ext import commands + +from bot.constants import Colours + + +class Ping(commands.Cog): +    """Ping the bot to see its latency and state.""" + +    def __init__(self, bot: commands.Bot): +        self.bot = bot + +    @commands.command(name="ping") +    async def ping(self, ctx: commands.Context) -> None: +        """Ping the bot to see its latency and state.""" +        embed = Embed( +            title=":ping_pong: Pong!", +            colour=Colours.bright_green, +            description=f"Gateway Latency: {round(self.bot.latency * 1000)}ms", +        ) + +        await ctx.send(embed=embed) + + +def setup(bot: commands.Bot) -> None: +    """Cog load.""" +    bot.add_cog(Ping(bot)) diff --git a/bot/exts/evergreen/reddit.py b/bot/exts/evergreen/reddit.py index 49127bea..2be511c8 100644 --- a/bot/exts/evergreen/reddit.py +++ b/bot/exts/evergreen/reddit.py @@ -54,16 +54,16 @@ class Reddit(commands.Cog):          if not posts:              return await ctx.send('No posts available!') -        if posts[1]["data"]["over_18"] is True: +        if posts[0]["data"]["over_18"] is True:              return await ctx.send( -                "You cannot access this Subreddit as it is ment for those who " +                "You cannot access this Subreddit as it is meant for those who "                  "are 18 years or older."              )          embed_titles = ""          # Chooses k unique random elements from a population sequence or set. -        random_posts = random.sample(posts, k=5) +        random_posts = random.sample(posts, k=min(len(posts), 5))          # -----------------------------------------------------------          # This code below is bound of change when the emojis are added. diff --git a/bot/exts/evergreen/timed.py b/bot/exts/evergreen/timed.py index 635ccb32..5f177fd6 100644 --- a/bot/exts/evergreen/timed.py +++ b/bot/exts/evergreen/timed.py @@ -21,7 +21,9 @@ class TimedCommands(commands.Cog):          """Time the command execution of a command."""          new_ctx = await self.create_execution_context(ctx, command) -        if not new_ctx.command: +        ctx.subcontext = new_ctx + +        if not ctx.subcontext.command:              help_command = f"{ctx.prefix}help"              error = f"The command you are trying to time doesn't exist. Use `{help_command}` for a list of commands." diff --git a/bot/exts/internal_eval/__init__.py b/bot/exts/internal_eval/__init__.py new file mode 100644 index 00000000..695fa74d --- /dev/null +++ b/bot/exts/internal_eval/__init__.py @@ -0,0 +1,10 @@ +from bot.bot import Bot + + +def setup(bot: Bot) -> None: +    """Set up the Internal Eval extension.""" +    # Import the Cog at runtime to prevent side effects like defining +    # RedisCache instances too early. +    from ._internal_eval import InternalEval + +    bot.add_cog(InternalEval(bot)) diff --git a/bot/exts/internal_eval/_helpers.py b/bot/exts/internal_eval/_helpers.py new file mode 100644 index 00000000..3a50b9f3 --- /dev/null +++ b/bot/exts/internal_eval/_helpers.py @@ -0,0 +1,249 @@ +import ast +import collections +import contextlib +import functools +import inspect +import io +import logging +import sys +import traceback +import types +import typing + + +log = logging.getLogger(__name__) + +# A type alias to annotate the tuples returned from `sys.exc_info()` +ExcInfo = typing.Tuple[typing.Type[Exception], Exception, types.TracebackType] +Namespace = typing.Dict[str, typing.Any] + +# This will be used as an coroutine function wrapper for the code +# to be evaluated. The wrapper contains one `pass` statement which +# will be replaced with `ast` with the code that we want to have +# evaluated. +# The function redirects output and captures exceptions that were +# raised in the code we evaluate. The latter is used to provide a +# meaningful traceback to the end user. +EVAL_WRAPPER = """ +async def _eval_wrapper_function(): +    try: +        with contextlib.redirect_stdout(_eval_context.stdout): +            pass +        if '_value_last_expression' in locals(): +            if inspect.isawaitable(_value_last_expression): +                _value_last_expression = await _value_last_expression +            _eval_context._value_last_expression = _value_last_expression +        else: +            _eval_context._value_last_expression = None +    except Exception: +        _eval_context.exc_info = sys.exc_info() +    finally: +        _eval_context.locals = locals() +_eval_context.function = _eval_wrapper_function +""" +INTERNAL_EVAL_FRAMENAME = "<internal eval>" +EVAL_WRAPPER_FUNCTION_FRAMENAME = "_eval_wrapper_function" + + +def format_internal_eval_exception(exc_info: ExcInfo, code: str) -> str: +    """Format an exception caught while evaluation code by inserting lines.""" +    exc_type, exc_value, tb = exc_info +    stack_summary = traceback.StackSummary.extract(traceback.walk_tb(tb)) +    code = code.split("\n") + +    output = ["Traceback (most recent call last):"] +    for frame in stack_summary: +        if frame.filename == INTERNAL_EVAL_FRAMENAME: +            line = code[frame.lineno - 1].lstrip() + +            if frame.name == EVAL_WRAPPER_FUNCTION_FRAMENAME: +                name = INTERNAL_EVAL_FRAMENAME +            else: +                name = frame.name +        else: +            line = frame.line +            name = frame.name + +        output.append( +            f'  File "{frame.filename}", line {frame.lineno}, in {name}\n' +            f"    {line}" +        ) + +    output.extend(traceback.format_exception_only(exc_type, exc_value)) +    return "\n".join(output) + + +class EvalContext: +    """ +    Represents the current `internal eval` context. + +    The context remembers names set during earlier runs of `internal eval`. To +    clear the context, use the `.internal clear` command. +    """ + +    def __init__(self, context_vars: Namespace, local_vars: Namespace) -> None: +        self._locals = dict(local_vars) +        self.context_vars = dict(context_vars) + +        self.stdout = io.StringIO() +        self._value_last_expression = None +        self.exc_info = None +        self.code = "" +        self.function = None +        self.eval_tree = None + +    @property +    def dependencies(self) -> typing.Dict[str, typing.Any]: +        """ +        Return a mapping of the dependencies for the wrapper function. + +        By using a property descriptor, the mapping can't be accidentally +        mutated during evaluation. This ensures the dependencies are always +        available. +        """ +        return { +            "print": functools.partial(print, file=self.stdout), +            "contextlib": contextlib, +            "inspect": inspect, +            "sys": sys, +            "_eval_context": self, +            "_": self._value_last_expression, +        } + +    @property +    def locals(self) -> typing.Dict[str, typing.Any]: +        """Return a mapping of names->values needed for evaluation.""" +        return {**collections.ChainMap(self.dependencies, self.context_vars, self._locals)} + +    @locals.setter +    def locals(self, locals_: typing.Dict[str, typing.Any]) -> None: +        """Update the contextual mapping of names to values.""" +        log.trace(f"Updating {self._locals} with {locals_}") +        self._locals.update(locals_) + +    def prepare_eval(self, code: str) -> typing.Optional[str]: +        """Prepare an evaluation by processing the code and setting up the context.""" +        self.code = code + +        if not self.code: +            log.debug("No code was attached to the evaluation command") +            return "[No code detected]" + +        try: +            code_tree = ast.parse(code, filename=INTERNAL_EVAL_FRAMENAME) +        except SyntaxError: +            log.debug("Got a SyntaxError while parsing the eval code") +            return "".join(traceback.format_exception(*sys.exc_info(), limit=0)) + +        log.trace("Parsing the AST to see if there's a trailing expression we need to capture") +        code_tree = CaptureLastExpression(code_tree).capture() + +        log.trace("Wrapping the AST in the AST of the wrapper coroutine") +        eval_tree = WrapEvalCodeTree(code_tree).wrap() + +        self.eval_tree = eval_tree +        return None + +    async def run_eval(self) -> Namespace: +        """Run the evaluation and return the updated locals.""" +        log.trace("Compiling the AST to bytecode using `exec` mode") +        compiled_code = compile(self.eval_tree, filename=INTERNAL_EVAL_FRAMENAME, mode="exec") + +        log.trace("Executing the compiled code with the desired namespace environment") +        exec(compiled_code, self.locals)  # noqa: B102,S102 + +        log.trace("Awaiting the created evaluation wrapper coroutine.") +        await self.function() + +        log.trace("Returning the updated captured locals.") +        return self._locals + +    def format_output(self) -> str: +        """Format the output of the most recent evaluation.""" +        output = [] + +        log.trace(f"Getting output from stdout `{id(self.stdout)}`") +        stdout_text = self.stdout.getvalue() +        if stdout_text: +            log.trace("Appending output captured from stdout/print") +            output.append(stdout_text) + +        if self._value_last_expression is not None: +            log.trace("Appending the output of a captured trialing expression") +            output.append(f"[Captured] {self._value_last_expression!r}") + +        if self.exc_info: +            log.trace("Appending exception information") +            output.append(format_internal_eval_exception(self.exc_info, self.code)) + +        log.trace(f"Generated output: {output!r}") +        return "\n".join(output) or "[No output]" + + +class WrapEvalCodeTree(ast.NodeTransformer): +    """Wraps the AST of eval code with the wrapper function.""" + +    def __init__(self, eval_code_tree: ast.AST, *args, **kwargs) -> None: +        super().__init__(*args, **kwargs) +        self.eval_code_tree = eval_code_tree + +        # To avoid mutable aliasing, parse the WRAPPER_FUNC for each wrapping +        self.wrapper = ast.parse(EVAL_WRAPPER, filename=INTERNAL_EVAL_FRAMENAME) + +    def wrap(self) -> ast.AST: +        """Wrap the tree of the code by the tree of the wrapper function.""" +        new_tree = self.visit(self.wrapper) +        return ast.fix_missing_locations(new_tree) + +    def visit_Pass(self, node: ast.Pass) -> typing.List[ast.AST]:  # noqa: N802 +        """ +        Replace the `_ast.Pass` node in the wrapper function by the eval AST. + +        This method works on the assumption that there's a single `pass` +        statement in the wrapper function. +        """ +        return list(ast.iter_child_nodes(self.eval_code_tree)) + + +class CaptureLastExpression(ast.NodeTransformer): +    """Captures the return value from a loose expression.""" + +    def __init__(self, tree: ast.AST, *args, **kwargs) -> None: +        super().__init__(*args, **kwargs) +        self.tree = tree +        self.last_node = list(ast.iter_child_nodes(tree))[-1] + +    def visit_Expr(self, node: ast.Expr) -> typing.Union[ast.Expr, ast.Assign]:  # noqa: N802 +        """ +        Replace the Expr node that is last child node of Module with an assignment. + +        We use an assignment to capture the value of the last node, if it's a loose +        Expr node. Normally, the value of an Expr node is lost, meaning we don't get +        the output of such a last "loose" expression. By assigning it a name, we can +        retrieve it for our output. +        """ +        if node is not self.last_node: +            return node + +        log.trace("Found a trailing last expression in the evaluation code") + +        log.trace("Creating assignment statement with trailing expression as the right-hand side") +        right_hand_side = list(ast.iter_child_nodes(node))[0] + +        assignment = ast.Assign( +            targets=[ast.Name(id='_value_last_expression', ctx=ast.Store())], +            value=right_hand_side, +            lineno=node.lineno, +            col_offset=0, +        ) +        ast.fix_missing_locations(assignment) +        return assignment + +    def capture(self) -> ast.AST: +        """Capture the value of the last expression with an assignment.""" +        if not isinstance(self.last_node, ast.Expr): +            # We only have to replace a node if the very last node is an Expr node +            return self.tree + +        new_tree = self.visit(self.tree) +        return ast.fix_missing_locations(new_tree) diff --git a/bot/exts/internal_eval/_internal_eval.py b/bot/exts/internal_eval/_internal_eval.py new file mode 100644 index 00000000..757a2a1e --- /dev/null +++ b/bot/exts/internal_eval/_internal_eval.py @@ -0,0 +1,176 @@ +import logging +import re +import textwrap +import typing + +import discord +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Roles +from bot.utils.decorators import with_role +from bot.utils.extensions import invoke_help_command +from ._helpers import EvalContext + +__all__ = ["InternalEval"] + +log = logging.getLogger(__name__) + +FORMATTED_CODE_REGEX = re.compile( +    r"(?P<delim>(?P<block>```)|``?)"        # code delimiter: 1-3 backticks; (?P=block) only matches if it's a block +    r"(?(block)(?:(?P<lang>[a-z]+)\n)?)"    # if we're in a block, match optional language (only letters plus newline) +    r"(?:[ \t]*\n)*"                        # any blank (empty or tabs/spaces only) lines before the code +    r"(?P<code>.*?)"                        # extract all code inside the markup +    r"\s*"                                  # any more whitespace before the end of the code markup +    r"(?P=delim)",                          # match the exact same delimiter from the start again +    re.DOTALL | re.IGNORECASE               # "." also matches newlines, case insensitive +) + +RAW_CODE_REGEX = re.compile( +    r"^(?:[ \t]*\n)*"                       # any blank (empty or tabs/spaces only) lines before the code +    r"(?P<code>.*?)"                        # extract all the rest as code +    r"\s*$",                                # any trailing whitespace until the end of the string +    re.DOTALL                               # "." also matches newlines +) + + +class InternalEval(commands.Cog): +    """Top secret code evaluation for admins and owners.""" + +    def __init__(self, bot: Bot): +        self.bot = bot +        self.locals = {} + +    @staticmethod +    def shorten_output( +            output: str, +            max_length: int = 1900, +            placeholder: str = "\n[output truncated]" +    ) -> str: +        """ +        Shorten the `output` so it's shorter than `max_length`. + +        There are three tactics for this, tried in the following order: +        - Shorten the output on a line-by-line basis +        - Shorten the output on any whitespace character +        - Shorten the output solely on character count +        """ +        max_length = max_length - len(placeholder) + +        shortened_output = [] +        char_count = 0 +        for line in output.split("\n"): +            if char_count + len(line) > max_length: +                break +            shortened_output.append(line) +            char_count += len(line) + 1  # account for (possible) line ending + +        if shortened_output: +            shortened_output.append(placeholder) +            return "\n".join(shortened_output) + +        shortened_output = textwrap.shorten(output, width=max_length, placeholder=placeholder) + +        if shortened_output.strip() == placeholder.strip(): +            # `textwrap` was unable to find whitespace to shorten on, so it has +            # reduced the output to just the placeholder. Let's shorten based on +            # characters instead. +            shortened_output = output[:max_length] + placeholder + +        return shortened_output + +    async def _upload_output(self, output: str) -> typing.Optional[str]: +        """Upload `internal eval` output to our pastebin and return the url.""" +        try: +            async with self.bot.http_session.post( +                "https://paste.pythondiscord.com/documents", data=output, raise_for_status=True +            ) as resp: +                data = await resp.json() + +            if "key" in data: +                return f"https://paste.pythondiscord.com/{data['key']}" +        except Exception: +            # 400 (Bad Request) means there are too many characters +            log.exception("Failed to upload `internal eval` output to paste service!") + +    async def _send_output(self, ctx: commands.Context, output: str) -> None: +        """Send the `internal eval` output to the command invocation context.""" +        upload_message = "" +        if len(output) >= 1980: +            # The output is too long, let's truncate it for in-channel output and +            # upload the complete output to the paste service. +            url = await self._upload_output(output) + +            if url: +                upload_message = f"\nFull output here: {url}" +            else: +                upload_message = "\n:warning: Failed to upload full output!" + +            output = self.shorten_output(output) + +        await ctx.send(f"```py\n{output}\n```{upload_message}") + +    async def _eval(self, ctx: commands.Context, code: str) -> None: +        """Evaluate the `code` in the current evaluation context.""" +        context_vars = { +            "message": ctx.message, +            "author": ctx.message.author, +            "channel": ctx.channel, +            "guild": ctx.guild, +            "ctx": ctx, +            "self": self, +            "bot": self.bot, +            "discord": discord, +        } + +        eval_context = EvalContext(context_vars, self.locals) + +        log.trace("Preparing the evaluation by parsing the AST of the code") +        error = eval_context.prepare_eval(code) + +        if error: +            log.trace("The code can't be evaluated due to an error") +            await ctx.send(f"```py\n{error}\n```") +            return + +        log.trace("Evaluate the AST we've generated for the evaluation") +        new_locals = await eval_context.run_eval() + +        log.trace("Updating locals with those set during evaluation") +        self.locals.update(new_locals) + +        log.trace("Sending the formatted output back to the context") +        await self._send_output(ctx, eval_context.format_output()) + +    @commands.group(name='internal', aliases=('int',)) +    @with_role(Roles.admin) +    async def internal_group(self, ctx: commands.Context) -> None: +        """Internal commands. Top secret!""" +        if not ctx.invoked_subcommand: +            await invoke_help_command(ctx) + +    @internal_group.command(name='eval', aliases=('e',)) +    @with_role(Roles.admin) +    async def eval(self, ctx: commands.Context, *, code: str) -> None: +        """Run eval in a REPL-like format.""" +        if match := list(FORMATTED_CODE_REGEX.finditer(code)): +            blocks = [block for block in match if block.group("block")] + +            if len(blocks) > 1: +                code = '\n'.join(block.group("code") for block in blocks) +            else: +                match = match[0] if len(blocks) == 0 else blocks[0] +                code, block, lang, delim = match.group("code", "block", "lang", "delim") + +        else: +            code = RAW_CODE_REGEX.fullmatch(code).group("code") + +        code = textwrap.dedent(code) +        await self._eval(ctx, code) + +    @internal_group.command(name='reset', aliases=("clear", "exit", "r", "c")) +    @with_role(Roles.admin) +    async def reset(self, ctx: commands.Context) -> None: +        """Reset the context and locals of the eval session.""" +        self.locals = {} +        await ctx.send("The evaluation context was reset.") | 
