aboutsummaryrefslogtreecommitdiffstats
path: root/bot/exts/core
diff options
context:
space:
mode:
authorGravatar Janine vN <[email protected]>2021-09-05 00:06:04 -0400
committerGravatar Janine vN <[email protected]>2021-09-05 00:06:04 -0400
commitf0b5c14e1f59e5135f27a4966021f30c77d1fc7d (patch)
tree4c83b68de1e56e6fea10b26b489d493030dbf88a /bot/exts/core
parentUpdate paths to new resource links (diff)
Move internal eval and rename utils to core
Part of this restructure involves splitting out the massive evergreen folder into a `fun` folder and then a `utilities` folder. To help with that we've rename the `util` folder to `core`. The core functions to run the bot have been moved into this folder. `.source`, `.ping`, and `.int e` have been moved into this folder.
Diffstat (limited to 'bot/exts/core')
-rw-r--r--bot/exts/core/__init__.py0
-rw-r--r--bot/exts/core/error_handler.py182
-rw-r--r--bot/exts/core/extensions.py266
-rw-r--r--bot/exts/core/help.py562
-rw-r--r--bot/exts/core/internal_eval/__init__.py10
-rw-r--r--bot/exts/core/internal_eval/_helpers.py248
-rw-r--r--bot/exts/core/internal_eval/_internal_eval.py179
-rw-r--r--bot/exts/core/ping.py45
-rw-r--r--bot/exts/core/source.py85
9 files changed, 1577 insertions, 0 deletions
diff --git a/bot/exts/core/__init__.py b/bot/exts/core/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/bot/exts/core/__init__.py
diff --git a/bot/exts/core/error_handler.py b/bot/exts/core/error_handler.py
new file mode 100644
index 00000000..fd2123e7
--- /dev/null
+++ b/bot/exts/core/error_handler.py
@@ -0,0 +1,182 @@
+import difflib
+import logging
+import math
+import random
+from collections.abc import Iterable
+from typing import Union
+
+from discord import Embed, Message
+from discord.ext import commands
+from sentry_sdk import push_scope
+
+from bot.bot import Bot
+from bot.constants import Channels, Colours, ERROR_REPLIES, NEGATIVE_REPLIES, RedirectOutput
+from bot.utils.decorators import InChannelCheckFailure, InMonthCheckFailure
+from bot.utils.exceptions import APIError, UserNotPlayingError
+
+log = logging.getLogger(__name__)
+
+
+QUESTION_MARK_ICON = "https://cdn.discordapp.com/emojis/512367613339369475.png"
+
+
+class CommandErrorHandler(commands.Cog):
+ """A error handler for the PythonDiscord server."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ @staticmethod
+ def revert_cooldown_counter(command: commands.Command, message: Message) -> None:
+ """Undoes the last cooldown counter for user-error cases."""
+ if command._buckets.valid:
+ bucket = command._buckets.get_bucket(message)
+ bucket._tokens = min(bucket.rate, bucket._tokens + 1)
+ logging.debug("Cooldown counter reverted as the command was not used correctly.")
+
+ @staticmethod
+ def error_embed(message: str, title: Union[Iterable, str] = ERROR_REPLIES) -> Embed:
+ """Build a basic embed with red colour and either a random error title or a title provided."""
+ embed = Embed(colour=Colours.soft_red)
+ if isinstance(title, str):
+ embed.title = title
+ else:
+ embed.title = random.choice(title)
+ embed.description = message
+ return embed
+
+ @commands.Cog.listener()
+ async def on_command_error(self, ctx: commands.Context, error: commands.CommandError) -> None:
+ """Activates when a command raises an error."""
+ if getattr(error, "handled", False):
+ logging.debug(f"Command {ctx.command} had its error already handled locally; ignoring.")
+ return
+
+ parent_command = ""
+ if subctx := getattr(ctx, "subcontext", None):
+ parent_command = f"{ctx.command} "
+ ctx = subctx
+
+ error = getattr(error, "original", error)
+ logging.debug(
+ f"Error Encountered: {type(error).__name__} - {str(error)}, "
+ f"Command: {ctx.command}, "
+ f"Author: {ctx.author}, "
+ f"Channel: {ctx.channel}"
+ )
+
+ if isinstance(error, commands.CommandNotFound):
+ await self.send_command_suggestion(ctx, ctx.invoked_with)
+ return
+
+ if isinstance(error, (InChannelCheckFailure, InMonthCheckFailure)):
+ await ctx.send(embed=self.error_embed(str(error), NEGATIVE_REPLIES), delete_after=7.5)
+ return
+
+ if isinstance(error, commands.UserInputError):
+ self.revert_cooldown_counter(ctx.command, ctx.message)
+ usage = f"```\n{ctx.prefix}{parent_command}{ctx.command} {ctx.command.signature}\n```"
+ embed = self.error_embed(
+ f"Your input was invalid: {error}\n\nUsage:{usage}"
+ )
+ await ctx.send(embed=embed)
+ return
+
+ if isinstance(error, commands.CommandOnCooldown):
+ mins, secs = divmod(math.ceil(error.retry_after), 60)
+ embed = self.error_embed(
+ f"This command is on cooldown:\nPlease retry in {mins} minutes {secs} seconds.",
+ NEGATIVE_REPLIES
+ )
+ await ctx.send(embed=embed, delete_after=7.5)
+ return
+
+ if isinstance(error, commands.DisabledCommand):
+ await ctx.send(embed=self.error_embed("This command has been disabled.", NEGATIVE_REPLIES))
+ return
+
+ if isinstance(error, commands.NoPrivateMessage):
+ await ctx.send(
+ embed=self.error_embed(
+ f"This command can only be used in the server. Go to <#{Channels.community_bot_commands}> instead!",
+ NEGATIVE_REPLIES
+ )
+ )
+ return
+
+ if isinstance(error, commands.BadArgument):
+ self.revert_cooldown_counter(ctx.command, ctx.message)
+ embed = self.error_embed(
+ "The argument you provided was invalid: "
+ f"{error}\n\nUsage:\n```\n{ctx.prefix}{parent_command}{ctx.command} {ctx.command.signature}\n```"
+ )
+ await ctx.send(embed=embed)
+ return
+
+ if isinstance(error, commands.CheckFailure):
+ await ctx.send(embed=self.error_embed("You are not authorized to use this command.", NEGATIVE_REPLIES))
+ return
+
+ if isinstance(error, UserNotPlayingError):
+ await ctx.send("Game not found.")
+ return
+
+ if isinstance(error, APIError):
+ await ctx.send(
+ embed=self.error_embed(
+ f"There was an error when communicating with the {error.api}",
+ NEGATIVE_REPLIES
+ )
+ )
+ return
+
+ with push_scope() as scope:
+ scope.user = {
+ "id": ctx.author.id,
+ "username": str(ctx.author)
+ }
+
+ scope.set_tag("command", ctx.command.qualified_name)
+ scope.set_tag("message_id", ctx.message.id)
+ scope.set_tag("channel_id", ctx.channel.id)
+
+ scope.set_extra("full_message", ctx.message.content)
+
+ if ctx.guild is not None:
+ scope.set_extra("jump_to", ctx.message.jump_url)
+
+ log.exception(f"Unhandled command error: {str(error)}", exc_info=error)
+
+ async def send_command_suggestion(self, ctx: commands.Context, command_name: str) -> None:
+ """Sends user similar commands if any can be found."""
+ raw_commands = []
+ for cmd in self.bot.walk_commands():
+ if not cmd.hidden:
+ raw_commands += (cmd.name, *cmd.aliases)
+ if similar_command_data := difflib.get_close_matches(command_name, raw_commands, 1):
+ similar_command_name = similar_command_data[0]
+ similar_command = self.bot.get_command(similar_command_name)
+
+ if not similar_command:
+ return
+
+ log_msg = "Cancelling attempt to suggest a command due to failed checks."
+ try:
+ if not await similar_command.can_run(ctx):
+ log.debug(log_msg)
+ return
+ except commands.errors.CommandError as cmd_error:
+ log.debug(log_msg)
+ await self.on_command_error(ctx, cmd_error)
+ return
+
+ misspelled_content = ctx.message.content
+ e = Embed()
+ e.set_author(name="Did you mean:", icon_url=QUESTION_MARK_ICON)
+ e.description = misspelled_content.replace(command_name, similar_command_name, 1)
+ await ctx.send(embed=e, delete_after=RedirectOutput.delete_delay)
+
+
+def setup(bot: Bot) -> None:
+ """Load the ErrorHandler cog."""
+ bot.add_cog(CommandErrorHandler(bot))
diff --git a/bot/exts/core/extensions.py b/bot/exts/core/extensions.py
new file mode 100644
index 00000000..424bacac
--- /dev/null
+++ b/bot/exts/core/extensions.py
@@ -0,0 +1,266 @@
+import functools
+import logging
+from collections.abc import Mapping
+from enum import Enum
+from typing import Optional
+
+from discord import Colour, Embed
+from discord.ext import commands
+from discord.ext.commands import Context, group
+
+from bot import exts
+from bot.bot import Bot
+from bot.constants import Client, Emojis, MODERATION_ROLES, Roles
+from bot.utils.checks import with_role_check
+from bot.utils.extensions import EXTENSIONS, invoke_help_command, unqualify
+from bot.utils.pagination import LinePaginator
+
+log = logging.getLogger(__name__)
+
+
+UNLOAD_BLACKLIST = {f"{exts.__name__}.utils.extensions"}
+BASE_PATH_LEN = len(exts.__name__.split("."))
+
+
+class Action(Enum):
+ """Represents an action to perform on an extension."""
+
+ # Need to be partial otherwise they are considered to be function definitions.
+ LOAD = functools.partial(Bot.load_extension)
+ UNLOAD = functools.partial(Bot.unload_extension)
+ RELOAD = functools.partial(Bot.reload_extension)
+
+
+class Extension(commands.Converter):
+ """
+ Fully qualify the name of an extension and ensure it exists.
+
+ The * and ** values bypass this when used with the reload command.
+ """
+
+ async def convert(self, ctx: Context, argument: str) -> str:
+ """Fully qualify the name of an extension and ensure it exists."""
+ # Special values to reload all extensions
+ if argument == "*" or argument == "**":
+ return argument
+
+ argument = argument.lower()
+
+ if argument in EXTENSIONS:
+ return argument
+ elif (qualified_arg := f"{exts.__name__}.{argument}") in EXTENSIONS:
+ return qualified_arg
+
+ matches = []
+ for ext in EXTENSIONS:
+ if argument == unqualify(ext):
+ matches.append(ext)
+
+ if len(matches) > 1:
+ matches.sort()
+ names = "\n".join(matches)
+ raise commands.BadArgument(
+ f":x: `{argument}` is an ambiguous extension name. "
+ f"Please use one of the following fully-qualified names.```\n{names}\n```"
+ )
+ elif matches:
+ return matches[0]
+ else:
+ raise commands.BadArgument(f":x: Could not find the extension `{argument}`.")
+
+
+class Extensions(commands.Cog):
+ """Extension management commands."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ @group(name="extensions", aliases=("ext", "exts", "c", "cogs"), invoke_without_command=True)
+ async def extensions_group(self, ctx: Context) -> None:
+ """Load, unload, reload, and list loaded extensions."""
+ await invoke_help_command(ctx)
+
+ @extensions_group.command(name="load", aliases=("l",))
+ async def load_command(self, ctx: Context, *extensions: Extension) -> None:
+ r"""
+ Load extensions given their fully qualified or unqualified names.
+
+ If '\*' or '\*\*' is given as the name, all unloaded extensions will be loaded.
+ """ # noqa: W605
+ if not extensions:
+ await invoke_help_command(ctx)
+ return
+
+ if "*" in extensions or "**" in extensions:
+ extensions = set(EXTENSIONS) - set(self.bot.extensions.keys())
+
+ msg = self.batch_manage(Action.LOAD, *extensions)
+ await ctx.send(msg)
+
+ @extensions_group.command(name="unload", aliases=("ul",))
+ async def unload_command(self, ctx: Context, *extensions: Extension) -> None:
+ r"""
+ Unload currently loaded extensions given their fully qualified or unqualified names.
+
+ If '\*' or '\*\*' is given as the name, all loaded extensions will be unloaded.
+ """ # noqa: W605
+ if not extensions:
+ await invoke_help_command(ctx)
+ return
+
+ blacklisted = "\n".join(UNLOAD_BLACKLIST & set(extensions))
+
+ if blacklisted:
+ msg = f":x: The following extension(s) may not be unloaded:```\n{blacklisted}\n```"
+ else:
+ if "*" in extensions or "**" in extensions:
+ extensions = set(self.bot.extensions.keys()) - UNLOAD_BLACKLIST
+
+ msg = self.batch_manage(Action.UNLOAD, *extensions)
+
+ await ctx.send(msg)
+
+ @extensions_group.command(name="reload", aliases=("r",), root_aliases=("reload",))
+ async def reload_command(self, ctx: Context, *extensions: Extension) -> None:
+ r"""
+ Reload extensions given their fully qualified or unqualified names.
+
+ If an extension fails to be reloaded, it will be rolled-back to the prior working state.
+
+ If '\*' is given as the name, all currently loaded extensions will be reloaded.
+ If '\*\*' is given as the name, all extensions, including unloaded ones, will be reloaded.
+ """ # noqa: W605
+ if not extensions:
+ await invoke_help_command(ctx)
+ return
+
+ if "**" in extensions:
+ extensions = EXTENSIONS
+ elif "*" in extensions:
+ extensions = set(self.bot.extensions.keys()) | set(extensions)
+ extensions.remove("*")
+
+ msg = self.batch_manage(Action.RELOAD, *extensions)
+
+ await ctx.send(msg)
+
+ @extensions_group.command(name="list", aliases=("all",))
+ async def list_command(self, ctx: Context) -> None:
+ """
+ Get a list of all extensions, including their loaded status.
+
+ Grey indicates that the extension is unloaded.
+ Green indicates that the extension is currently loaded.
+ """
+ embed = Embed(colour=Colour.blurple())
+ embed.set_author(
+ name="Extensions List",
+ url=Client.github_bot_repo,
+ icon_url=str(self.bot.user.display_avatar.url)
+ )
+
+ lines = []
+ categories = self.group_extension_statuses()
+ for category, extensions in sorted(categories.items()):
+ # Treat each category as a single line by concatenating everything.
+ # This ensures the paginator will not cut off a page in the middle of a category.
+ category = category.replace("_", " ").title()
+ extensions = "\n".join(sorted(extensions))
+ lines.append(f"**{category}**\n{extensions}\n")
+
+ log.debug(f"{ctx.author} requested a list of all cogs. Returning a paginated list.")
+ await LinePaginator.paginate(lines, ctx, embed, max_size=1200, empty=False)
+
+ def group_extension_statuses(self) -> Mapping[str, str]:
+ """Return a mapping of extension names and statuses to their categories."""
+ categories = {}
+
+ for ext in EXTENSIONS:
+ if ext in self.bot.extensions:
+ status = Emojis.status_online
+ else:
+ status = Emojis.status_offline
+
+ path = ext.split(".")
+ if len(path) > BASE_PATH_LEN + 1:
+ category = " - ".join(path[BASE_PATH_LEN:-1])
+ else:
+ category = "uncategorised"
+
+ categories.setdefault(category, []).append(f"{status} {path[-1]}")
+
+ return categories
+
+ def batch_manage(self, action: Action, *extensions: str) -> str:
+ """
+ Apply an action to multiple extensions and return a message with the results.
+
+ If only one extension is given, it is deferred to `manage()`.
+ """
+ if len(extensions) == 1:
+ msg, _ = self.manage(action, extensions[0])
+ return msg
+
+ verb = action.name.lower()
+ failures = {}
+
+ for extension in extensions:
+ _, error = self.manage(action, extension)
+ if error:
+ failures[extension] = error
+
+ emoji = ":x:" if failures else ":ok_hand:"
+ msg = f"{emoji} {len(extensions) - len(failures)} / {len(extensions)} extensions {verb}ed."
+
+ if failures:
+ failures = "\n".join(f"{ext}\n {err}" for ext, err in failures.items())
+ msg += f"\nFailures:```\n{failures}\n```"
+
+ log.debug(f"Batch {verb}ed extensions.")
+
+ return msg
+
+ def manage(self, action: Action, ext: str) -> tuple[str, Optional[str]]:
+ """Apply an action to an extension and return the status message and any error message."""
+ verb = action.name.lower()
+ error_msg = None
+
+ try:
+ action.value(self.bot, ext)
+ except (commands.ExtensionAlreadyLoaded, commands.ExtensionNotLoaded):
+ if action is Action.RELOAD:
+ # When reloading, just load the extension if it was not loaded.
+ return self.manage(Action.LOAD, ext)
+
+ msg = f":x: Extension `{ext}` is already {verb}ed."
+ log.debug(msg[4:])
+ except Exception as e:
+ if hasattr(e, "original"):
+ e = e.original
+
+ log.exception(f"Extension '{ext}' failed to {verb}.")
+
+ error_msg = f"{e.__class__.__name__}: {e}"
+ msg = f":x: Failed to {verb} extension `{ext}`:\n```\n{error_msg}\n```"
+ else:
+ msg = f":ok_hand: Extension successfully {verb}ed: `{ext}`."
+ log.debug(msg[10:])
+
+ return msg, error_msg
+
+ # This cannot be static (must have a __func__ attribute).
+ def cog_check(self, ctx: Context) -> bool:
+ """Only allow moderators and core developers to invoke the commands in this cog."""
+ return with_role_check(ctx, *MODERATION_ROLES, Roles.core_developers)
+
+ # This cannot be static (must have a __func__ attribute).
+ async def cog_command_error(self, ctx: Context, error: Exception) -> None:
+ """Handle BadArgument errors locally to prevent the help command from showing."""
+ if isinstance(error, commands.BadArgument):
+ await ctx.send(str(error))
+ error.handled = True
+
+
+def setup(bot: Bot) -> None:
+ """Load the Extensions cog."""
+ bot.add_cog(Extensions(bot))
diff --git a/bot/exts/core/help.py b/bot/exts/core/help.py
new file mode 100644
index 00000000..4b766b50
--- /dev/null
+++ b/bot/exts/core/help.py
@@ -0,0 +1,562 @@
+# Help command from Python bot. All commands that will be added to there in futures should be added to here too.
+import asyncio
+import itertools
+import logging
+from contextlib import suppress
+from typing import NamedTuple, Union
+
+from discord import Colour, Embed, HTTPException, Message, Reaction, User
+from discord.ext import commands
+from discord.ext.commands import CheckFailure, Cog as DiscordCog, Command, Context
+from rapidfuzz import process
+
+from bot import constants
+from bot.bot import Bot
+from bot.constants import Emojis
+from bot.utils.pagination import (
+ FIRST_EMOJI, LAST_EMOJI,
+ LEFT_EMOJI, LinePaginator, RIGHT_EMOJI,
+)
+
+DELETE_EMOJI = Emojis.trashcan
+
+REACTIONS = {
+ FIRST_EMOJI: "first",
+ LEFT_EMOJI: "back",
+ RIGHT_EMOJI: "next",
+ LAST_EMOJI: "end",
+ DELETE_EMOJI: "stop",
+}
+
+
+class Cog(NamedTuple):
+ """Show information about a Cog's name, description and commands."""
+
+ name: str
+ description: str
+ commands: list[Command]
+
+
+log = logging.getLogger(__name__)
+
+
+class HelpQueryNotFound(ValueError):
+ """
+ Raised when a HelpSession Query doesn't match a command or cog.
+
+ Contains the custom attribute of ``possible_matches``.
+ Instances of this object contain a dictionary of any command(s) that were close to matching the
+ query, where keys are the possible matched command names and values are the likeness match scores.
+ """
+
+ def __init__(self, arg: str, possible_matches: dict = None):
+ super().__init__(arg)
+ self.possible_matches = possible_matches
+
+
+class HelpSession:
+ """
+ An interactive session for bot and command help output.
+
+ Expected attributes include:
+ * title: str
+ The title of the help message.
+ * query: Union[discord.ext.commands.Bot, discord.ext.commands.Command]
+ * description: str
+ The description of the query.
+ * pages: list[str]
+ A list of the help content split into manageable pages.
+ * message: `discord.Message`
+ The message object that's showing the help contents.
+ * destination: `discord.abc.Messageable`
+ Where the help message is to be sent to.
+ Cogs can be grouped into custom categories. All cogs with the same category will be displayed
+ under a single category name in the help output. Custom categories are defined inside the cogs
+ as a class attribute named `category`. A description can also be specified with the attribute
+ `category_description`. If a description is not found in at least one cog, the default will be
+ the regular description (class docstring) of the first cog found in the category.
+ """
+
+ def __init__(
+ self,
+ ctx: Context,
+ *command,
+ cleanup: bool = False,
+ only_can_run: bool = True,
+ show_hidden: bool = False,
+ max_lines: int = 15
+ ):
+ """Creates an instance of the HelpSession class."""
+ self._ctx = ctx
+ self._bot = ctx.bot
+ self.title = "Command Help"
+
+ # set the query details for the session
+ if command:
+ query_str = " ".join(command)
+ self.query = self._get_query(query_str)
+ self.description = self.query.description or self.query.help
+ else:
+ self.query = ctx.bot
+ self.description = self.query.description
+ self.author = ctx.author
+ self.destination = ctx.channel
+
+ # set the config for the session
+ self._cleanup = cleanup
+ self._only_can_run = only_can_run
+ self._show_hidden = show_hidden
+ self._max_lines = max_lines
+
+ # init session states
+ self._pages = None
+ self._current_page = 0
+ self.message = None
+ self._timeout_task = None
+ self.reset_timeout()
+
+ def _get_query(self, query: str) -> Union[Command, Cog]:
+ """Attempts to match the provided query with a valid command or cog."""
+ command = self._bot.get_command(query)
+ if command:
+ return command
+
+ # Find all cog categories that match.
+ cog_matches = []
+ description = None
+ for cog in self._bot.cogs.values():
+ if hasattr(cog, "category") and cog.category == query:
+ cog_matches.append(cog)
+ if hasattr(cog, "category_description"):
+ description = cog.category_description
+
+ # Try to search by cog name if no categories match.
+ if not cog_matches:
+ cog = self._bot.cogs.get(query)
+
+ # Don't consider it a match if the cog has a category.
+ if cog and not hasattr(cog, "category"):
+ cog_matches = [cog]
+
+ if cog_matches:
+ cog = cog_matches[0]
+ cmds = (cog.get_commands() for cog in cog_matches) # Commands of all cogs
+
+ return Cog(
+ name=cog.category if hasattr(cog, "category") else cog.qualified_name,
+ description=description or cog.description,
+ commands=tuple(itertools.chain.from_iterable(cmds)) # Flatten the list
+ )
+
+ self._handle_not_found(query)
+
+ def _handle_not_found(self, query: str) -> None:
+ """
+ Handles when a query does not match a valid command or cog.
+
+ Will pass on possible close matches along with the `HelpQueryNotFound` exception.
+ """
+ # Combine command and cog names
+ choices = list(self._bot.all_commands) + list(self._bot.cogs)
+
+ result = process.extract(query, choices, score_cutoff=90)
+
+ raise HelpQueryNotFound(f'Query "{query}" not found.', dict(result))
+
+ async def timeout(self, seconds: int = 30) -> None:
+ """Waits for a set number of seconds, then stops the help session."""
+ await asyncio.sleep(seconds)
+ await self.stop()
+
+ def reset_timeout(self) -> None:
+ """Cancels the original timeout task and sets it again from the start."""
+ # cancel original if it exists
+ if self._timeout_task:
+ if not self._timeout_task.cancelled():
+ self._timeout_task.cancel()
+
+ # recreate the timeout task
+ self._timeout_task = self._bot.loop.create_task(self.timeout())
+
+ async def on_reaction_add(self, reaction: Reaction, user: User) -> None:
+ """Event handler for when reactions are added on the help message."""
+ # ensure it was the relevant session message
+ if reaction.message.id != self.message.id:
+ return
+
+ # ensure it was the session author who reacted
+ if user.id != self.author.id:
+ return
+
+ emoji = str(reaction.emoji)
+
+ # check if valid action
+ if emoji not in REACTIONS:
+ return
+
+ self.reset_timeout()
+
+ # Run relevant action method
+ action = getattr(self, f"do_{REACTIONS[emoji]}", None)
+ if action:
+ await action()
+
+ # remove the added reaction to prep for re-use
+ with suppress(HTTPException):
+ await self.message.remove_reaction(reaction, user)
+
+ async def on_message_delete(self, message: Message) -> None:
+ """Closes the help session when the help message is deleted."""
+ if message.id == self.message.id:
+ await self.stop()
+
+ async def prepare(self) -> None:
+ """Sets up the help session pages, events, message and reactions."""
+ await self.build_pages()
+
+ self._bot.add_listener(self.on_reaction_add)
+ self._bot.add_listener(self.on_message_delete)
+
+ await self.update_page()
+ self.add_reactions()
+
+ def add_reactions(self) -> None:
+ """Adds the relevant reactions to the help message based on if pagination is required."""
+ # if paginating
+ if len(self._pages) > 1:
+ for reaction in REACTIONS:
+ self._bot.loop.create_task(self.message.add_reaction(reaction))
+
+ # if single-page
+ else:
+ self._bot.loop.create_task(self.message.add_reaction(DELETE_EMOJI))
+
+ def _category_key(self, cmd: Command) -> str:
+ """
+ Returns a cog name of a given command for use as a key for `sorted` and `groupby`.
+
+ A zero width space is used as a prefix for results with no cogs to force them last in ordering.
+ """
+ if cmd.cog:
+ try:
+ if cmd.cog.category:
+ return f"**{cmd.cog.category}**"
+ except AttributeError:
+ pass
+
+ return f"**{cmd.cog_name}**"
+ else:
+ return "**\u200bNo Category:**"
+
+ def _get_command_params(self, cmd: Command) -> str:
+ """
+ Returns the command usage signature.
+
+ This is a custom implementation of `command.signature` in order to format the command
+ signature without aliases.
+ """
+ results = []
+ for name, param in cmd.clean_params.items():
+
+ # if argument has a default value
+ if param.default is not param.empty:
+
+ if isinstance(param.default, str):
+ show_default = param.default
+ else:
+ show_default = param.default is not None
+
+ # if default is not an empty string or None
+ if show_default:
+ results.append(f"[{name}={param.default}]")
+ else:
+ results.append(f"[{name}]")
+
+ # if variable length argument
+ elif param.kind == param.VAR_POSITIONAL:
+ results.append(f"[{name}...]")
+
+ # if required
+ else:
+ results.append(f"<{name}>")
+
+ return f"{cmd.name} {' '.join(results)}"
+
+ async def build_pages(self) -> None:
+ """Builds the list of content pages to be paginated through in the help message, as a list of str."""
+ # Use LinePaginator to restrict embed line height
+ paginator = LinePaginator(prefix="", suffix="", max_lines=self._max_lines)
+
+ # show signature if query is a command
+ if isinstance(self.query, commands.Command):
+ await self._add_command_signature(paginator)
+
+ if isinstance(self.query, Cog):
+ paginator.add_line(f"**{self.query.name}**")
+
+ if self.description:
+ paginator.add_line(f"*{self.description}*")
+
+ # list all children commands of the queried object
+ if isinstance(self.query, (commands.GroupMixin, Cog)):
+ await self._list_child_commands(paginator)
+
+ self._pages = paginator.pages
+
+ async def _add_command_signature(self, paginator: LinePaginator) -> None:
+ prefix = constants.Client.prefix
+
+ signature = self._get_command_params(self.query)
+ parent = self.query.full_parent_name + " " if self.query.parent else ""
+ paginator.add_line(f"**```\n{prefix}{parent}{signature}\n```**")
+ aliases = [f"`{alias}`" if not parent else f"`{parent} {alias}`" for alias in self.query.aliases]
+ aliases += [f"`{alias}`" for alias in getattr(self.query, "root_aliases", ())]
+ aliases = ", ".join(sorted(aliases))
+ if aliases:
+ paginator.add_line(f"**Can also use:** {aliases}\n")
+ if not await self.query.can_run(self._ctx):
+ paginator.add_line("***You cannot run this command.***\n")
+
+ async def _list_child_commands(self, paginator: LinePaginator) -> None:
+ # remove hidden commands if session is not wanting hiddens
+ if not self._show_hidden:
+ filtered = [c for c in self.query.commands if not c.hidden]
+ else:
+ filtered = self.query.commands
+
+ # if after filter there are no commands, finish up
+ if not filtered:
+ self._pages = paginator.pages
+ return
+
+ if isinstance(self.query, Cog):
+ grouped = (("**Commands:**", self.query.commands),)
+
+ elif isinstance(self.query, commands.Command):
+ grouped = (("**Subcommands:**", self.query.commands),)
+
+ # otherwise sort and organise all commands into categories
+ else:
+ cat_sort = sorted(filtered, key=self._category_key)
+ grouped = itertools.groupby(cat_sort, key=self._category_key)
+
+ for category, cmds in grouped:
+ await self._format_command_category(paginator, category, list(cmds))
+
+ async def _format_command_category(self, paginator: LinePaginator, category: str, cmds: list[Command]) -> None:
+ cmds = sorted(cmds, key=lambda c: c.name)
+ cat_cmds = []
+ for command in cmds:
+ cat_cmds += await self._format_command(command)
+
+ # state var for if the category should be added next
+ print_cat = 1
+ new_page = True
+
+ for details in cat_cmds:
+
+ # keep details together, paginating early if it won"t fit
+ lines_adding = len(details.split("\n")) + print_cat
+ if paginator._linecount + lines_adding > self._max_lines:
+ paginator._linecount = 0
+ new_page = True
+ paginator.close_page()
+
+ # new page so print category title again
+ print_cat = 1
+
+ if print_cat:
+ if new_page:
+ paginator.add_line("")
+ paginator.add_line(category)
+ print_cat = 0
+
+ paginator.add_line(details)
+
+ async def _format_command(self, command: Command) -> list[str]:
+ # skip if hidden and hide if session is set to
+ if command.hidden and not self._show_hidden:
+ return []
+
+ # Patch to make the !help command work outside of #bot-commands again
+ # This probably needs a proper rewrite, but this will make it work in
+ # the mean time.
+ try:
+ can_run = await command.can_run(self._ctx)
+ except CheckFailure:
+ can_run = False
+
+ # see if the user can run the command
+ strikeout = ""
+ if not can_run:
+ # skip if we don't show commands they can't run
+ if self._only_can_run:
+ return []
+ strikeout = "~~"
+
+ if isinstance(self.query, commands.Command):
+ prefix = ""
+ else:
+ prefix = constants.Client.prefix
+
+ signature = self._get_command_params(command)
+ info = f"{strikeout}**`{prefix}{signature}`**{strikeout}"
+
+ # handle if the command has no docstring
+ short_doc = command.short_doc or "No details provided"
+ return [f"{info}\n*{short_doc}*"]
+
+ def embed_page(self, page_number: int = 0) -> Embed:
+ """Returns an Embed with the requested page formatted within."""
+ embed = Embed()
+
+ if isinstance(self.query, (commands.Command, Cog)) and page_number > 0:
+ title = f'Command Help | "{self.query.name}"'
+ else:
+ title = self.title
+
+ embed.set_author(name=title, icon_url=constants.Icons.questionmark)
+ embed.description = self._pages[page_number]
+
+ page_count = len(self._pages)
+ if page_count > 1:
+ embed.set_footer(text=f"Page {self._current_page+1} / {page_count}")
+
+ return embed
+
+ async def update_page(self, page_number: int = 0) -> None:
+ """Sends the intial message, or changes the existing one to the given page number."""
+ self._current_page = page_number
+ embed_page = self.embed_page(page_number)
+
+ if not self.message:
+ self.message = await self.destination.send(embed=embed_page)
+ else:
+ await self.message.edit(embed=embed_page)
+
+ @classmethod
+ async def start(cls, ctx: Context, *command, **options) -> "HelpSession":
+ """
+ Create and begin a help session based on the given command context.
+
+ Available options kwargs:
+ * cleanup: Optional[bool]
+ Set to `True` to have the message deleted on session end. Defaults to `False`.
+ * only_can_run: Optional[bool]
+ Set to `True` to hide commands the user can't run. Defaults to `False`.
+ * show_hidden: Optional[bool]
+ Set to `True` to include hidden commands. Defaults to `False`.
+ * max_lines: Optional[int]
+ Sets the max number of lines the paginator will add to a single page. Defaults to 20.
+ """
+ session = cls(ctx, *command, **options)
+ await session.prepare()
+
+ return session
+
+ async def stop(self) -> None:
+ """Stops the help session, removes event listeners and attempts to delete the help message."""
+ self._bot.remove_listener(self.on_reaction_add)
+ self._bot.remove_listener(self.on_message_delete)
+
+ # ignore if permission issue, or the message doesn't exist
+ with suppress(HTTPException, AttributeError):
+ if self._cleanup:
+ await self.message.delete()
+ else:
+ await self.message.clear_reactions()
+
+ @property
+ def is_first_page(self) -> bool:
+ """Check if session is currently showing the first page."""
+ return self._current_page == 0
+
+ @property
+ def is_last_page(self) -> bool:
+ """Check if the session is currently showing the last page."""
+ return self._current_page == (len(self._pages)-1)
+
+ async def do_first(self) -> None:
+ """Event that is called when the user requests the first page."""
+ if not self.is_first_page:
+ await self.update_page(0)
+
+ async def do_back(self) -> None:
+ """Event that is called when the user requests the previous page."""
+ if not self.is_first_page:
+ await self.update_page(self._current_page-1)
+
+ async def do_next(self) -> None:
+ """Event that is called when the user requests the next page."""
+ if not self.is_last_page:
+ await self.update_page(self._current_page+1)
+
+ async def do_end(self) -> None:
+ """Event that is called when the user requests the last page."""
+ if not self.is_last_page:
+ await self.update_page(len(self._pages)-1)
+
+ async def do_stop(self) -> None:
+ """Event that is called when the user requests to stop the help session."""
+ await self.message.delete()
+
+
+class Help(DiscordCog):
+ """Custom Embed Pagination Help feature."""
+
+ @commands.command("help")
+ async def new_help(self, ctx: Context, *commands) -> None:
+ """Shows Command Help."""
+ try:
+ await HelpSession.start(ctx, *commands)
+ except HelpQueryNotFound as error:
+ embed = Embed()
+ embed.colour = Colour.red()
+ embed.title = str(error)
+
+ if error.possible_matches:
+ matches = "\n".join(error.possible_matches.keys())
+ embed.description = f"**Did you mean:**\n`{matches}`"
+
+ await ctx.send(embed=embed)
+
+
+def unload(bot: Bot) -> None:
+ """
+ Reinstates the original help command.
+
+ This is run if the cog raises an exception on load, or if the extension is unloaded.
+ """
+ bot.remove_command("help")
+ bot.add_command(bot._old_help)
+
+
+def setup(bot: Bot) -> None:
+ """
+ The setup for the help extension.
+
+ This is called automatically on `bot.load_extension` being run.
+ Stores the original help command instance on the `bot._old_help` attribute for later
+ reinstatement, before removing it from the command registry so the new help command can be
+ loaded successfully.
+ If an exception is raised during the loading of the cog, `unload` will be called in order to
+ reinstate the original help command.
+ """
+ bot._old_help = bot.get_command("help")
+ bot.remove_command("help")
+
+ try:
+ bot.add_cog(Help())
+ except Exception:
+ unload(bot)
+ raise
+
+
+def teardown(bot: Bot) -> None:
+ """
+ The teardown for the help extension.
+
+ This is called automatically on `bot.unload_extension` being run.
+ Calls `unload` in order to reinstate the original help command.
+ """
+ unload(bot)
diff --git a/bot/exts/core/internal_eval/__init__.py b/bot/exts/core/internal_eval/__init__.py
new file mode 100644
index 00000000..695fa74d
--- /dev/null
+++ b/bot/exts/core/internal_eval/__init__.py
@@ -0,0 +1,10 @@
+from bot.bot import Bot
+
+
+def setup(bot: Bot) -> None:
+ """Set up the Internal Eval extension."""
+ # Import the Cog at runtime to prevent side effects like defining
+ # RedisCache instances too early.
+ from ._internal_eval import InternalEval
+
+ bot.add_cog(InternalEval(bot))
diff --git a/bot/exts/core/internal_eval/_helpers.py b/bot/exts/core/internal_eval/_helpers.py
new file mode 100644
index 00000000..5b2f8f5d
--- /dev/null
+++ b/bot/exts/core/internal_eval/_helpers.py
@@ -0,0 +1,248 @@
+import ast
+import collections
+import contextlib
+import functools
+import inspect
+import io
+import logging
+import sys
+import traceback
+import types
+from typing import Any, Optional, Union
+
+log = logging.getLogger(__name__)
+
+# A type alias to annotate the tuples returned from `sys.exc_info()`
+ExcInfo = tuple[type[Exception], Exception, types.TracebackType]
+Namespace = dict[str, Any]
+
+# This will be used as an coroutine function wrapper for the code
+# to be evaluated. The wrapper contains one `pass` statement which
+# will be replaced with `ast` with the code that we want to have
+# evaluated.
+# The function redirects output and captures exceptions that were
+# raised in the code we evaluate. The latter is used to provide a
+# meaningful traceback to the end user.
+EVAL_WRAPPER = """
+async def _eval_wrapper_function():
+ try:
+ with contextlib.redirect_stdout(_eval_context.stdout):
+ pass
+ if '_value_last_expression' in locals():
+ if inspect.isawaitable(_value_last_expression):
+ _value_last_expression = await _value_last_expression
+ _eval_context._value_last_expression = _value_last_expression
+ else:
+ _eval_context._value_last_expression = None
+ except Exception:
+ _eval_context.exc_info = sys.exc_info()
+ finally:
+ _eval_context.locals = locals()
+_eval_context.function = _eval_wrapper_function
+"""
+INTERNAL_EVAL_FRAMENAME = "<internal eval>"
+EVAL_WRAPPER_FUNCTION_FRAMENAME = "_eval_wrapper_function"
+
+
+def format_internal_eval_exception(exc_info: ExcInfo, code: str) -> str:
+ """Format an exception caught while evaluation code by inserting lines."""
+ exc_type, exc_value, tb = exc_info
+ stack_summary = traceback.StackSummary.extract(traceback.walk_tb(tb))
+ code = code.split("\n")
+
+ output = ["Traceback (most recent call last):"]
+ for frame in stack_summary:
+ if frame.filename == INTERNAL_EVAL_FRAMENAME:
+ line = code[frame.lineno - 1].lstrip()
+
+ if frame.name == EVAL_WRAPPER_FUNCTION_FRAMENAME:
+ name = INTERNAL_EVAL_FRAMENAME
+ else:
+ name = frame.name
+ else:
+ line = frame.line
+ name = frame.name
+
+ output.append(
+ f' File "{frame.filename}", line {frame.lineno}, in {name}\n'
+ f" {line}"
+ )
+
+ output.extend(traceback.format_exception_only(exc_type, exc_value))
+ return "\n".join(output)
+
+
+class EvalContext:
+ """
+ Represents the current `internal eval` context.
+
+ The context remembers names set during earlier runs of `internal eval`. To
+ clear the context, use the `.internal clear` command.
+ """
+
+ def __init__(self, context_vars: Namespace, local_vars: Namespace):
+ self._locals = dict(local_vars)
+ self.context_vars = dict(context_vars)
+
+ self.stdout = io.StringIO()
+ self._value_last_expression = None
+ self.exc_info = None
+ self.code = ""
+ self.function = None
+ self.eval_tree = None
+
+ @property
+ def dependencies(self) -> dict[str, Any]:
+ """
+ Return a mapping of the dependencies for the wrapper function.
+
+ By using a property descriptor, the mapping can't be accidentally
+ mutated during evaluation. This ensures the dependencies are always
+ available.
+ """
+ return {
+ "print": functools.partial(print, file=self.stdout),
+ "contextlib": contextlib,
+ "inspect": inspect,
+ "sys": sys,
+ "_eval_context": self,
+ "_": self._value_last_expression,
+ }
+
+ @property
+ def locals(self) -> dict[str, Any]:
+ """Return a mapping of names->values needed for evaluation."""
+ return {**collections.ChainMap(self.dependencies, self.context_vars, self._locals)}
+
+ @locals.setter
+ def locals(self, locals_: dict[str, Any]) -> None:
+ """Update the contextual mapping of names to values."""
+ log.trace(f"Updating {self._locals} with {locals_}")
+ self._locals.update(locals_)
+
+ def prepare_eval(self, code: str) -> Optional[str]:
+ """Prepare an evaluation by processing the code and setting up the context."""
+ self.code = code
+
+ if not self.code:
+ log.debug("No code was attached to the evaluation command")
+ return "[No code detected]"
+
+ try:
+ code_tree = ast.parse(code, filename=INTERNAL_EVAL_FRAMENAME)
+ except SyntaxError:
+ log.debug("Got a SyntaxError while parsing the eval code")
+ return "".join(traceback.format_exception(*sys.exc_info(), limit=0))
+
+ log.trace("Parsing the AST to see if there's a trailing expression we need to capture")
+ code_tree = CaptureLastExpression(code_tree).capture()
+
+ log.trace("Wrapping the AST in the AST of the wrapper coroutine")
+ eval_tree = WrapEvalCodeTree(code_tree).wrap()
+
+ self.eval_tree = eval_tree
+ return None
+
+ async def run_eval(self) -> Namespace:
+ """Run the evaluation and return the updated locals."""
+ log.trace("Compiling the AST to bytecode using `exec` mode")
+ compiled_code = compile(self.eval_tree, filename=INTERNAL_EVAL_FRAMENAME, mode="exec")
+
+ log.trace("Executing the compiled code with the desired namespace environment")
+ exec(compiled_code, self.locals) # noqa: B102,S102
+
+ log.trace("Awaiting the created evaluation wrapper coroutine.")
+ await self.function()
+
+ log.trace("Returning the updated captured locals.")
+ return self._locals
+
+ def format_output(self) -> str:
+ """Format the output of the most recent evaluation."""
+ output = []
+
+ log.trace(f"Getting output from stdout `{id(self.stdout)}`")
+ stdout_text = self.stdout.getvalue()
+ if stdout_text:
+ log.trace("Appending output captured from stdout/print")
+ output.append(stdout_text)
+
+ if self._value_last_expression is not None:
+ log.trace("Appending the output of a captured trialing expression")
+ output.append(f"[Captured] {self._value_last_expression!r}")
+
+ if self.exc_info:
+ log.trace("Appending exception information")
+ output.append(format_internal_eval_exception(self.exc_info, self.code))
+
+ log.trace(f"Generated output: {output!r}")
+ return "\n".join(output) or "[No output]"
+
+
+class WrapEvalCodeTree(ast.NodeTransformer):
+ """Wraps the AST of eval code with the wrapper function."""
+
+ def __init__(self, eval_code_tree: ast.AST, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.eval_code_tree = eval_code_tree
+
+ # To avoid mutable aliasing, parse the WRAPPER_FUNC for each wrapping
+ self.wrapper = ast.parse(EVAL_WRAPPER, filename=INTERNAL_EVAL_FRAMENAME)
+
+ def wrap(self) -> ast.AST:
+ """Wrap the tree of the code by the tree of the wrapper function."""
+ new_tree = self.visit(self.wrapper)
+ return ast.fix_missing_locations(new_tree)
+
+ def visit_Pass(self, node: ast.Pass) -> list[ast.AST]: # noqa: N802
+ """
+ Replace the `_ast.Pass` node in the wrapper function by the eval AST.
+
+ This method works on the assumption that there's a single `pass`
+ statement in the wrapper function.
+ """
+ return list(ast.iter_child_nodes(self.eval_code_tree))
+
+
+class CaptureLastExpression(ast.NodeTransformer):
+ """Captures the return value from a loose expression."""
+
+ def __init__(self, tree: ast.AST, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.tree = tree
+ self.last_node = list(ast.iter_child_nodes(tree))[-1]
+
+ def visit_Expr(self, node: ast.Expr) -> Union[ast.Expr, ast.Assign]: # noqa: N802
+ """
+ Replace the Expr node that is last child node of Module with an assignment.
+
+ We use an assignment to capture the value of the last node, if it's a loose
+ Expr node. Normally, the value of an Expr node is lost, meaning we don't get
+ the output of such a last "loose" expression. By assigning it a name, we can
+ retrieve it for our output.
+ """
+ if node is not self.last_node:
+ return node
+
+ log.trace("Found a trailing last expression in the evaluation code")
+
+ log.trace("Creating assignment statement with trailing expression as the right-hand side")
+ right_hand_side = list(ast.iter_child_nodes(node))[0]
+
+ assignment = ast.Assign(
+ targets=[ast.Name(id='_value_last_expression', ctx=ast.Store())],
+ value=right_hand_side,
+ lineno=node.lineno,
+ col_offset=0,
+ )
+ ast.fix_missing_locations(assignment)
+ return assignment
+
+ def capture(self) -> ast.AST:
+ """Capture the value of the last expression with an assignment."""
+ if not isinstance(self.last_node, ast.Expr):
+ # We only have to replace a node if the very last node is an Expr node
+ return self.tree
+
+ new_tree = self.visit(self.tree)
+ return ast.fix_missing_locations(new_tree)
diff --git a/bot/exts/core/internal_eval/_internal_eval.py b/bot/exts/core/internal_eval/_internal_eval.py
new file mode 100644
index 00000000..4f6b4321
--- /dev/null
+++ b/bot/exts/core/internal_eval/_internal_eval.py
@@ -0,0 +1,179 @@
+import logging
+import re
+import textwrap
+from typing import Optional
+
+import discord
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.constants import Client, Roles
+from bot.utils.decorators import with_role
+from bot.utils.extensions import invoke_help_command
+from ._helpers import EvalContext
+
+__all__ = ["InternalEval"]
+
+log = logging.getLogger(__name__)
+
+FORMATTED_CODE_REGEX = re.compile(
+ r"(?P<delim>(?P<block>```)|``?)" # code delimiter: 1-3 backticks; (?P=block) only matches if it's a block
+ r"(?(block)(?:(?P<lang>[a-z]+)\n)?)" # if we're in a block, match optional language (only letters plus newline)
+ r"(?:[ \t]*\n)*" # any blank (empty or tabs/spaces only) lines before the code
+ r"(?P<code>.*?)" # extract all code inside the markup
+ r"\s*" # any more whitespace before the end of the code markup
+ r"(?P=delim)", # match the exact same delimiter from the start again
+ re.DOTALL | re.IGNORECASE # "." also matches newlines, case insensitive
+)
+
+RAW_CODE_REGEX = re.compile(
+ r"^(?:[ \t]*\n)*" # any blank (empty or tabs/spaces only) lines before the code
+ r"(?P<code>.*?)" # extract all the rest as code
+ r"\s*$", # any trailing whitespace until the end of the string
+ re.DOTALL # "." also matches newlines
+)
+
+
+class InternalEval(commands.Cog):
+ """Top secret code evaluation for admins and owners."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+ self.locals = {}
+
+ if Client.debug:
+ self.internal_group.add_check(commands.is_owner().predicate)
+
+ @staticmethod
+ def shorten_output(
+ output: str,
+ max_length: int = 1900,
+ placeholder: str = "\n[output truncated]"
+ ) -> str:
+ """
+ Shorten the `output` so it's shorter than `max_length`.
+
+ There are three tactics for this, tried in the following order:
+ - Shorten the output on a line-by-line basis
+ - Shorten the output on any whitespace character
+ - Shorten the output solely on character count
+ """
+ max_length = max_length - len(placeholder)
+
+ shortened_output = []
+ char_count = 0
+ for line in output.split("\n"):
+ if char_count + len(line) > max_length:
+ break
+ shortened_output.append(line)
+ char_count += len(line) + 1 # account for (possible) line ending
+
+ if shortened_output:
+ shortened_output.append(placeholder)
+ return "\n".join(shortened_output)
+
+ shortened_output = textwrap.shorten(output, width=max_length, placeholder=placeholder)
+
+ if shortened_output.strip() == placeholder.strip():
+ # `textwrap` was unable to find whitespace to shorten on, so it has
+ # reduced the output to just the placeholder. Let's shorten based on
+ # characters instead.
+ shortened_output = output[:max_length] + placeholder
+
+ return shortened_output
+
+ async def _upload_output(self, output: str) -> Optional[str]:
+ """Upload `internal eval` output to our pastebin and return the url."""
+ try:
+ async with self.bot.http_session.post(
+ "https://paste.pythondiscord.com/documents", data=output, raise_for_status=True
+ ) as resp:
+ data = await resp.json()
+
+ if "key" in data:
+ return f"https://paste.pythondiscord.com/{data['key']}"
+ except Exception:
+ # 400 (Bad Request) means there are too many characters
+ log.exception("Failed to upload `internal eval` output to paste service!")
+
+ async def _send_output(self, ctx: commands.Context, output: str) -> None:
+ """Send the `internal eval` output to the command invocation context."""
+ upload_message = ""
+ if len(output) >= 1980:
+ # The output is too long, let's truncate it for in-channel output and
+ # upload the complete output to the paste service.
+ url = await self._upload_output(output)
+
+ if url:
+ upload_message = f"\nFull output here: {url}"
+ else:
+ upload_message = "\n:warning: Failed to upload full output!"
+
+ output = self.shorten_output(output)
+
+ await ctx.send(f"```py\n{output}\n```{upload_message}")
+
+ async def _eval(self, ctx: commands.Context, code: str) -> None:
+ """Evaluate the `code` in the current evaluation context."""
+ context_vars = {
+ "message": ctx.message,
+ "author": ctx.author,
+ "channel": ctx.channel,
+ "guild": ctx.guild,
+ "ctx": ctx,
+ "self": self,
+ "bot": self.bot,
+ "discord": discord,
+ }
+
+ eval_context = EvalContext(context_vars, self.locals)
+
+ log.trace("Preparing the evaluation by parsing the AST of the code")
+ error = eval_context.prepare_eval(code)
+
+ if error:
+ log.trace("The code can't be evaluated due to an error")
+ await ctx.send(f"```py\n{error}\n```")
+ return
+
+ log.trace("Evaluate the AST we've generated for the evaluation")
+ new_locals = await eval_context.run_eval()
+
+ log.trace("Updating locals with those set during evaluation")
+ self.locals.update(new_locals)
+
+ log.trace("Sending the formatted output back to the context")
+ await self._send_output(ctx, eval_context.format_output())
+
+ @commands.group(name="internal", aliases=("int",))
+ @with_role(Roles.admin)
+ async def internal_group(self, ctx: commands.Context) -> None:
+ """Internal commands. Top secret!"""
+ if not ctx.invoked_subcommand:
+ await invoke_help_command(ctx)
+
+ @internal_group.command(name="eval", aliases=("e",))
+ @with_role(Roles.admin)
+ async def eval(self, ctx: commands.Context, *, code: str) -> None:
+ """Run eval in a REPL-like format."""
+ if match := list(FORMATTED_CODE_REGEX.finditer(code)):
+ blocks = [block for block in match if block.group("block")]
+
+ if len(blocks) > 1:
+ code = "\n".join(block.group("code") for block in blocks)
+ else:
+ match = match[0] if len(blocks) == 0 else blocks[0]
+ code, block, lang, delim = match.group("code", "block", "lang", "delim")
+
+ else:
+ code = RAW_CODE_REGEX.fullmatch(code).group("code")
+
+ code = textwrap.dedent(code)
+ await self._eval(ctx, code)
+
+ @internal_group.command(name="reset", aliases=("clear", "exit", "r", "c"))
+ @with_role(Roles.admin)
+ async def reset(self, ctx: commands.Context) -> None:
+ """Reset the context and locals of the eval session."""
+ self.locals = {}
+ await ctx.send("The evaluation context was reset.")
diff --git a/bot/exts/core/ping.py b/bot/exts/core/ping.py
new file mode 100644
index 00000000..6be78117
--- /dev/null
+++ b/bot/exts/core/ping.py
@@ -0,0 +1,45 @@
+import arrow
+from dateutil.relativedelta import relativedelta
+from discord import Embed
+from discord.ext import commands
+
+from bot import start_time
+from bot.bot import Bot
+from bot.constants import Colours
+
+
+class Ping(commands.Cog):
+ """Get info about the bot's ping and uptime."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ @commands.command(name="ping")
+ async def ping(self, ctx: commands.Context) -> None:
+ """Ping the bot to see its latency and state."""
+ embed = Embed(
+ title=":ping_pong: Pong!",
+ colour=Colours.bright_green,
+ description=f"Gateway Latency: {round(self.bot.latency * 1000)}ms",
+ )
+
+ await ctx.send(embed=embed)
+
+ # Originally made in 70d2170a0a6594561d59c7d080c4280f1ebcd70b by lemon & gdude2002
+ @commands.command(name="uptime")
+ async def uptime(self, ctx: commands.Context) -> None:
+ """Get the current uptime of the bot."""
+ difference = relativedelta(start_time - arrow.utcnow())
+ uptime_string = start_time.shift(
+ seconds=-difference.seconds,
+ minutes=-difference.minutes,
+ hours=-difference.hours,
+ days=-difference.days
+ ).humanize()
+
+ await ctx.send(f"I started up {uptime_string}.")
+
+
+def setup(bot: Bot) -> None:
+ """Load the Ping cog."""
+ bot.add_cog(Ping(bot))
diff --git a/bot/exts/core/source.py b/bot/exts/core/source.py
new file mode 100644
index 00000000..7572ce51
--- /dev/null
+++ b/bot/exts/core/source.py
@@ -0,0 +1,85 @@
+import inspect
+from pathlib import Path
+from typing import Optional
+
+from discord import Embed
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.constants import Source
+from bot.utils.converters import SourceConverter, SourceType
+
+
+class BotSource(commands.Cog):
+ """Displays information about the bot's source code."""
+
+ @commands.command(name="source", aliases=("src",))
+ async def source_command(self, ctx: commands.Context, *, source_item: SourceConverter = None) -> None:
+ """Display information and a GitHub link to the source code of a command, tag, or cog."""
+ if not source_item:
+ embed = Embed(title="Sir Lancebot's GitHub Repository")
+ embed.add_field(name="Repository", value=f"[Go to GitHub]({Source.github})")
+ embed.set_thumbnail(url=Source.github_avatar_url)
+ await ctx.send(embed=embed)
+ return
+
+ embed = await self.build_embed(source_item)
+ await ctx.send(embed=embed)
+
+ def get_source_link(self, source_item: SourceType) -> tuple[str, str, Optional[int]]:
+ """
+ Build GitHub link of source item, return this link, file location and first line number.
+
+ Raise BadArgument if `source_item` is a dynamically-created object (e.g. via internal eval).
+ """
+ if isinstance(source_item, commands.Command):
+ callback = inspect.unwrap(source_item.callback)
+ src = callback.__code__
+ filename = src.co_filename
+ else:
+ src = type(source_item)
+ try:
+ filename = inspect.getsourcefile(src)
+ except TypeError:
+ raise commands.BadArgument("Cannot get source for a dynamically-created object.")
+
+ if not isinstance(source_item, str):
+ try:
+ lines, first_line_no = inspect.getsourcelines(src)
+ except OSError:
+ raise commands.BadArgument("Cannot get source for a dynamically-created object.")
+
+ lines_extension = f"#L{first_line_no}-L{first_line_no+len(lines)-1}"
+ else:
+ first_line_no = None
+ lines_extension = ""
+
+ file_location = Path(filename).relative_to(Path.cwd()).as_posix()
+
+ url = f"{Source.github}/blob/main/{file_location}{lines_extension}"
+
+ return url, file_location, first_line_no or None
+
+ async def build_embed(self, source_object: SourceType) -> Optional[Embed]:
+ """Build embed based on source object."""
+ url, location, first_line = self.get_source_link(source_object)
+
+ if isinstance(source_object, commands.Command):
+ description = source_object.short_doc
+ title = f"Command: {source_object.qualified_name}"
+ else:
+ title = f"Cog: {source_object.qualified_name}"
+ description = source_object.description.splitlines()[0]
+
+ embed = Embed(title=title, description=description)
+ embed.set_thumbnail(url=Source.github_avatar_url)
+ embed.add_field(name="Source Code", value=f"[Go to GitHub]({url})")
+ line_text = f":{first_line}" if first_line else ""
+ embed.set_footer(text=f"{location}{line_text}")
+
+ return embed
+
+
+def setup(bot: Bot) -> None:
+ """Load the BotSource cog."""
+ bot.add_cog(BotSource())