diff options
author | 2021-09-05 00:06:04 -0400 | |
---|---|---|
committer | 2021-09-05 00:06:04 -0400 | |
commit | f0b5c14e1f59e5135f27a4966021f30c77d1fc7d (patch) | |
tree | 4c83b68de1e56e6fea10b26b489d493030dbf88a /bot/exts/core | |
parent | Update paths to new resource links (diff) |
Move internal eval and rename utils to core
Part of this restructure involves splitting out the massive evergreen
folder into a `fun` folder and then a `utilities` folder. To help with
that we've rename the `util` folder to `core`.
The core functions to run the bot have been moved into this folder.
`.source`, `.ping`, and `.int e` have been moved into this folder.
Diffstat (limited to 'bot/exts/core')
-rw-r--r-- | bot/exts/core/__init__.py | 0 | ||||
-rw-r--r-- | bot/exts/core/error_handler.py | 182 | ||||
-rw-r--r-- | bot/exts/core/extensions.py | 266 | ||||
-rw-r--r-- | bot/exts/core/help.py | 562 | ||||
-rw-r--r-- | bot/exts/core/internal_eval/__init__.py | 10 | ||||
-rw-r--r-- | bot/exts/core/internal_eval/_helpers.py | 248 | ||||
-rw-r--r-- | bot/exts/core/internal_eval/_internal_eval.py | 179 | ||||
-rw-r--r-- | bot/exts/core/ping.py | 45 | ||||
-rw-r--r-- | bot/exts/core/source.py | 85 |
9 files changed, 1577 insertions, 0 deletions
diff --git a/bot/exts/core/__init__.py b/bot/exts/core/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/bot/exts/core/__init__.py diff --git a/bot/exts/core/error_handler.py b/bot/exts/core/error_handler.py new file mode 100644 index 00000000..fd2123e7 --- /dev/null +++ b/bot/exts/core/error_handler.py @@ -0,0 +1,182 @@ +import difflib +import logging +import math +import random +from collections.abc import Iterable +from typing import Union + +from discord import Embed, Message +from discord.ext import commands +from sentry_sdk import push_scope + +from bot.bot import Bot +from bot.constants import Channels, Colours, ERROR_REPLIES, NEGATIVE_REPLIES, RedirectOutput +from bot.utils.decorators import InChannelCheckFailure, InMonthCheckFailure +from bot.utils.exceptions import APIError, UserNotPlayingError + +log = logging.getLogger(__name__) + + +QUESTION_MARK_ICON = "https://cdn.discordapp.com/emojis/512367613339369475.png" + + +class CommandErrorHandler(commands.Cog): + """A error handler for the PythonDiscord server.""" + + def __init__(self, bot: Bot): + self.bot = bot + + @staticmethod + def revert_cooldown_counter(command: commands.Command, message: Message) -> None: + """Undoes the last cooldown counter for user-error cases.""" + if command._buckets.valid: + bucket = command._buckets.get_bucket(message) + bucket._tokens = min(bucket.rate, bucket._tokens + 1) + logging.debug("Cooldown counter reverted as the command was not used correctly.") + + @staticmethod + def error_embed(message: str, title: Union[Iterable, str] = ERROR_REPLIES) -> Embed: + """Build a basic embed with red colour and either a random error title or a title provided.""" + embed = Embed(colour=Colours.soft_red) + if isinstance(title, str): + embed.title = title + else: + embed.title = random.choice(title) + embed.description = message + return embed + + @commands.Cog.listener() + async def on_command_error(self, ctx: commands.Context, error: commands.CommandError) -> None: + """Activates when a command raises an error.""" + if getattr(error, "handled", False): + logging.debug(f"Command {ctx.command} had its error already handled locally; ignoring.") + return + + parent_command = "" + if subctx := getattr(ctx, "subcontext", None): + parent_command = f"{ctx.command} " + ctx = subctx + + error = getattr(error, "original", error) + logging.debug( + f"Error Encountered: {type(error).__name__} - {str(error)}, " + f"Command: {ctx.command}, " + f"Author: {ctx.author}, " + f"Channel: {ctx.channel}" + ) + + if isinstance(error, commands.CommandNotFound): + await self.send_command_suggestion(ctx, ctx.invoked_with) + return + + if isinstance(error, (InChannelCheckFailure, InMonthCheckFailure)): + await ctx.send(embed=self.error_embed(str(error), NEGATIVE_REPLIES), delete_after=7.5) + return + + if isinstance(error, commands.UserInputError): + self.revert_cooldown_counter(ctx.command, ctx.message) + usage = f"```\n{ctx.prefix}{parent_command}{ctx.command} {ctx.command.signature}\n```" + embed = self.error_embed( + f"Your input was invalid: {error}\n\nUsage:{usage}" + ) + await ctx.send(embed=embed) + return + + if isinstance(error, commands.CommandOnCooldown): + mins, secs = divmod(math.ceil(error.retry_after), 60) + embed = self.error_embed( + f"This command is on cooldown:\nPlease retry in {mins} minutes {secs} seconds.", + NEGATIVE_REPLIES + ) + await ctx.send(embed=embed, delete_after=7.5) + return + + if isinstance(error, commands.DisabledCommand): + await ctx.send(embed=self.error_embed("This command has been disabled.", NEGATIVE_REPLIES)) + return + + if isinstance(error, commands.NoPrivateMessage): + await ctx.send( + embed=self.error_embed( + f"This command can only be used in the server. Go to <#{Channels.community_bot_commands}> instead!", + NEGATIVE_REPLIES + ) + ) + return + + if isinstance(error, commands.BadArgument): + self.revert_cooldown_counter(ctx.command, ctx.message) + embed = self.error_embed( + "The argument you provided was invalid: " + f"{error}\n\nUsage:\n```\n{ctx.prefix}{parent_command}{ctx.command} {ctx.command.signature}\n```" + ) + await ctx.send(embed=embed) + return + + if isinstance(error, commands.CheckFailure): + await ctx.send(embed=self.error_embed("You are not authorized to use this command.", NEGATIVE_REPLIES)) + return + + if isinstance(error, UserNotPlayingError): + await ctx.send("Game not found.") + return + + if isinstance(error, APIError): + await ctx.send( + embed=self.error_embed( + f"There was an error when communicating with the {error.api}", + NEGATIVE_REPLIES + ) + ) + return + + with push_scope() as scope: + scope.user = { + "id": ctx.author.id, + "username": str(ctx.author) + } + + scope.set_tag("command", ctx.command.qualified_name) + scope.set_tag("message_id", ctx.message.id) + scope.set_tag("channel_id", ctx.channel.id) + + scope.set_extra("full_message", ctx.message.content) + + if ctx.guild is not None: + scope.set_extra("jump_to", ctx.message.jump_url) + + log.exception(f"Unhandled command error: {str(error)}", exc_info=error) + + async def send_command_suggestion(self, ctx: commands.Context, command_name: str) -> None: + """Sends user similar commands if any can be found.""" + raw_commands = [] + for cmd in self.bot.walk_commands(): + if not cmd.hidden: + raw_commands += (cmd.name, *cmd.aliases) + if similar_command_data := difflib.get_close_matches(command_name, raw_commands, 1): + similar_command_name = similar_command_data[0] + similar_command = self.bot.get_command(similar_command_name) + + if not similar_command: + return + + log_msg = "Cancelling attempt to suggest a command due to failed checks." + try: + if not await similar_command.can_run(ctx): + log.debug(log_msg) + return + except commands.errors.CommandError as cmd_error: + log.debug(log_msg) + await self.on_command_error(ctx, cmd_error) + return + + misspelled_content = ctx.message.content + e = Embed() + e.set_author(name="Did you mean:", icon_url=QUESTION_MARK_ICON) + e.description = misspelled_content.replace(command_name, similar_command_name, 1) + await ctx.send(embed=e, delete_after=RedirectOutput.delete_delay) + + +def setup(bot: Bot) -> None: + """Load the ErrorHandler cog.""" + bot.add_cog(CommandErrorHandler(bot)) diff --git a/bot/exts/core/extensions.py b/bot/exts/core/extensions.py new file mode 100644 index 00000000..424bacac --- /dev/null +++ b/bot/exts/core/extensions.py @@ -0,0 +1,266 @@ +import functools +import logging +from collections.abc import Mapping +from enum import Enum +from typing import Optional + +from discord import Colour, Embed +from discord.ext import commands +from discord.ext.commands import Context, group + +from bot import exts +from bot.bot import Bot +from bot.constants import Client, Emojis, MODERATION_ROLES, Roles +from bot.utils.checks import with_role_check +from bot.utils.extensions import EXTENSIONS, invoke_help_command, unqualify +from bot.utils.pagination import LinePaginator + +log = logging.getLogger(__name__) + + +UNLOAD_BLACKLIST = {f"{exts.__name__}.utils.extensions"} +BASE_PATH_LEN = len(exts.__name__.split(".")) + + +class Action(Enum): + """Represents an action to perform on an extension.""" + + # Need to be partial otherwise they are considered to be function definitions. + LOAD = functools.partial(Bot.load_extension) + UNLOAD = functools.partial(Bot.unload_extension) + RELOAD = functools.partial(Bot.reload_extension) + + +class Extension(commands.Converter): + """ + Fully qualify the name of an extension and ensure it exists. + + The * and ** values bypass this when used with the reload command. + """ + + async def convert(self, ctx: Context, argument: str) -> str: + """Fully qualify the name of an extension and ensure it exists.""" + # Special values to reload all extensions + if argument == "*" or argument == "**": + return argument + + argument = argument.lower() + + if argument in EXTENSIONS: + return argument + elif (qualified_arg := f"{exts.__name__}.{argument}") in EXTENSIONS: + return qualified_arg + + matches = [] + for ext in EXTENSIONS: + if argument == unqualify(ext): + matches.append(ext) + + if len(matches) > 1: + matches.sort() + names = "\n".join(matches) + raise commands.BadArgument( + f":x: `{argument}` is an ambiguous extension name. " + f"Please use one of the following fully-qualified names.```\n{names}\n```" + ) + elif matches: + return matches[0] + else: + raise commands.BadArgument(f":x: Could not find the extension `{argument}`.") + + +class Extensions(commands.Cog): + """Extension management commands.""" + + def __init__(self, bot: Bot): + self.bot = bot + + @group(name="extensions", aliases=("ext", "exts", "c", "cogs"), invoke_without_command=True) + async def extensions_group(self, ctx: Context) -> None: + """Load, unload, reload, and list loaded extensions.""" + await invoke_help_command(ctx) + + @extensions_group.command(name="load", aliases=("l",)) + async def load_command(self, ctx: Context, *extensions: Extension) -> None: + r""" + Load extensions given their fully qualified or unqualified names. + + If '\*' or '\*\*' is given as the name, all unloaded extensions will be loaded. + """ # noqa: W605 + if not extensions: + await invoke_help_command(ctx) + return + + if "*" in extensions or "**" in extensions: + extensions = set(EXTENSIONS) - set(self.bot.extensions.keys()) + + msg = self.batch_manage(Action.LOAD, *extensions) + await ctx.send(msg) + + @extensions_group.command(name="unload", aliases=("ul",)) + async def unload_command(self, ctx: Context, *extensions: Extension) -> None: + r""" + Unload currently loaded extensions given their fully qualified or unqualified names. + + If '\*' or '\*\*' is given as the name, all loaded extensions will be unloaded. + """ # noqa: W605 + if not extensions: + await invoke_help_command(ctx) + return + + blacklisted = "\n".join(UNLOAD_BLACKLIST & set(extensions)) + + if blacklisted: + msg = f":x: The following extension(s) may not be unloaded:```\n{blacklisted}\n```" + else: + if "*" in extensions or "**" in extensions: + extensions = set(self.bot.extensions.keys()) - UNLOAD_BLACKLIST + + msg = self.batch_manage(Action.UNLOAD, *extensions) + + await ctx.send(msg) + + @extensions_group.command(name="reload", aliases=("r",), root_aliases=("reload",)) + async def reload_command(self, ctx: Context, *extensions: Extension) -> None: + r""" + Reload extensions given their fully qualified or unqualified names. + + If an extension fails to be reloaded, it will be rolled-back to the prior working state. + + If '\*' is given as the name, all currently loaded extensions will be reloaded. + If '\*\*' is given as the name, all extensions, including unloaded ones, will be reloaded. + """ # noqa: W605 + if not extensions: + await invoke_help_command(ctx) + return + + if "**" in extensions: + extensions = EXTENSIONS + elif "*" in extensions: + extensions = set(self.bot.extensions.keys()) | set(extensions) + extensions.remove("*") + + msg = self.batch_manage(Action.RELOAD, *extensions) + + await ctx.send(msg) + + @extensions_group.command(name="list", aliases=("all",)) + async def list_command(self, ctx: Context) -> None: + """ + Get a list of all extensions, including their loaded status. + + Grey indicates that the extension is unloaded. + Green indicates that the extension is currently loaded. + """ + embed = Embed(colour=Colour.blurple()) + embed.set_author( + name="Extensions List", + url=Client.github_bot_repo, + icon_url=str(self.bot.user.display_avatar.url) + ) + + lines = [] + categories = self.group_extension_statuses() + for category, extensions in sorted(categories.items()): + # Treat each category as a single line by concatenating everything. + # This ensures the paginator will not cut off a page in the middle of a category. + category = category.replace("_", " ").title() + extensions = "\n".join(sorted(extensions)) + lines.append(f"**{category}**\n{extensions}\n") + + log.debug(f"{ctx.author} requested a list of all cogs. Returning a paginated list.") + await LinePaginator.paginate(lines, ctx, embed, max_size=1200, empty=False) + + def group_extension_statuses(self) -> Mapping[str, str]: + """Return a mapping of extension names and statuses to their categories.""" + categories = {} + + for ext in EXTENSIONS: + if ext in self.bot.extensions: + status = Emojis.status_online + else: + status = Emojis.status_offline + + path = ext.split(".") + if len(path) > BASE_PATH_LEN + 1: + category = " - ".join(path[BASE_PATH_LEN:-1]) + else: + category = "uncategorised" + + categories.setdefault(category, []).append(f"{status} {path[-1]}") + + return categories + + def batch_manage(self, action: Action, *extensions: str) -> str: + """ + Apply an action to multiple extensions and return a message with the results. + + If only one extension is given, it is deferred to `manage()`. + """ + if len(extensions) == 1: + msg, _ = self.manage(action, extensions[0]) + return msg + + verb = action.name.lower() + failures = {} + + for extension in extensions: + _, error = self.manage(action, extension) + if error: + failures[extension] = error + + emoji = ":x:" if failures else ":ok_hand:" + msg = f"{emoji} {len(extensions) - len(failures)} / {len(extensions)} extensions {verb}ed." + + if failures: + failures = "\n".join(f"{ext}\n {err}" for ext, err in failures.items()) + msg += f"\nFailures:```\n{failures}\n```" + + log.debug(f"Batch {verb}ed extensions.") + + return msg + + def manage(self, action: Action, ext: str) -> tuple[str, Optional[str]]: + """Apply an action to an extension and return the status message and any error message.""" + verb = action.name.lower() + error_msg = None + + try: + action.value(self.bot, ext) + except (commands.ExtensionAlreadyLoaded, commands.ExtensionNotLoaded): + if action is Action.RELOAD: + # When reloading, just load the extension if it was not loaded. + return self.manage(Action.LOAD, ext) + + msg = f":x: Extension `{ext}` is already {verb}ed." + log.debug(msg[4:]) + except Exception as e: + if hasattr(e, "original"): + e = e.original + + log.exception(f"Extension '{ext}' failed to {verb}.") + + error_msg = f"{e.__class__.__name__}: {e}" + msg = f":x: Failed to {verb} extension `{ext}`:\n```\n{error_msg}\n```" + else: + msg = f":ok_hand: Extension successfully {verb}ed: `{ext}`." + log.debug(msg[10:]) + + return msg, error_msg + + # This cannot be static (must have a __func__ attribute). + def cog_check(self, ctx: Context) -> bool: + """Only allow moderators and core developers to invoke the commands in this cog.""" + return with_role_check(ctx, *MODERATION_ROLES, Roles.core_developers) + + # This cannot be static (must have a __func__ attribute). + async def cog_command_error(self, ctx: Context, error: Exception) -> None: + """Handle BadArgument errors locally to prevent the help command from showing.""" + if isinstance(error, commands.BadArgument): + await ctx.send(str(error)) + error.handled = True + + +def setup(bot: Bot) -> None: + """Load the Extensions cog.""" + bot.add_cog(Extensions(bot)) diff --git a/bot/exts/core/help.py b/bot/exts/core/help.py new file mode 100644 index 00000000..4b766b50 --- /dev/null +++ b/bot/exts/core/help.py @@ -0,0 +1,562 @@ +# Help command from Python bot. All commands that will be added to there in futures should be added to here too. +import asyncio +import itertools +import logging +from contextlib import suppress +from typing import NamedTuple, Union + +from discord import Colour, Embed, HTTPException, Message, Reaction, User +from discord.ext import commands +from discord.ext.commands import CheckFailure, Cog as DiscordCog, Command, Context +from rapidfuzz import process + +from bot import constants +from bot.bot import Bot +from bot.constants import Emojis +from bot.utils.pagination import ( + FIRST_EMOJI, LAST_EMOJI, + LEFT_EMOJI, LinePaginator, RIGHT_EMOJI, +) + +DELETE_EMOJI = Emojis.trashcan + +REACTIONS = { + FIRST_EMOJI: "first", + LEFT_EMOJI: "back", + RIGHT_EMOJI: "next", + LAST_EMOJI: "end", + DELETE_EMOJI: "stop", +} + + +class Cog(NamedTuple): + """Show information about a Cog's name, description and commands.""" + + name: str + description: str + commands: list[Command] + + +log = logging.getLogger(__name__) + + +class HelpQueryNotFound(ValueError): + """ + Raised when a HelpSession Query doesn't match a command or cog. + + Contains the custom attribute of ``possible_matches``. + Instances of this object contain a dictionary of any command(s) that were close to matching the + query, where keys are the possible matched command names and values are the likeness match scores. + """ + + def __init__(self, arg: str, possible_matches: dict = None): + super().__init__(arg) + self.possible_matches = possible_matches + + +class HelpSession: + """ + An interactive session for bot and command help output. + + Expected attributes include: + * title: str + The title of the help message. + * query: Union[discord.ext.commands.Bot, discord.ext.commands.Command] + * description: str + The description of the query. + * pages: list[str] + A list of the help content split into manageable pages. + * message: `discord.Message` + The message object that's showing the help contents. + * destination: `discord.abc.Messageable` + Where the help message is to be sent to. + Cogs can be grouped into custom categories. All cogs with the same category will be displayed + under a single category name in the help output. Custom categories are defined inside the cogs + as a class attribute named `category`. A description can also be specified with the attribute + `category_description`. If a description is not found in at least one cog, the default will be + the regular description (class docstring) of the first cog found in the category. + """ + + def __init__( + self, + ctx: Context, + *command, + cleanup: bool = False, + only_can_run: bool = True, + show_hidden: bool = False, + max_lines: int = 15 + ): + """Creates an instance of the HelpSession class.""" + self._ctx = ctx + self._bot = ctx.bot + self.title = "Command Help" + + # set the query details for the session + if command: + query_str = " ".join(command) + self.query = self._get_query(query_str) + self.description = self.query.description or self.query.help + else: + self.query = ctx.bot + self.description = self.query.description + self.author = ctx.author + self.destination = ctx.channel + + # set the config for the session + self._cleanup = cleanup + self._only_can_run = only_can_run + self._show_hidden = show_hidden + self._max_lines = max_lines + + # init session states + self._pages = None + self._current_page = 0 + self.message = None + self._timeout_task = None + self.reset_timeout() + + def _get_query(self, query: str) -> Union[Command, Cog]: + """Attempts to match the provided query with a valid command or cog.""" + command = self._bot.get_command(query) + if command: + return command + + # Find all cog categories that match. + cog_matches = [] + description = None + for cog in self._bot.cogs.values(): + if hasattr(cog, "category") and cog.category == query: + cog_matches.append(cog) + if hasattr(cog, "category_description"): + description = cog.category_description + + # Try to search by cog name if no categories match. + if not cog_matches: + cog = self._bot.cogs.get(query) + + # Don't consider it a match if the cog has a category. + if cog and not hasattr(cog, "category"): + cog_matches = [cog] + + if cog_matches: + cog = cog_matches[0] + cmds = (cog.get_commands() for cog in cog_matches) # Commands of all cogs + + return Cog( + name=cog.category if hasattr(cog, "category") else cog.qualified_name, + description=description or cog.description, + commands=tuple(itertools.chain.from_iterable(cmds)) # Flatten the list + ) + + self._handle_not_found(query) + + def _handle_not_found(self, query: str) -> None: + """ + Handles when a query does not match a valid command or cog. + + Will pass on possible close matches along with the `HelpQueryNotFound` exception. + """ + # Combine command and cog names + choices = list(self._bot.all_commands) + list(self._bot.cogs) + + result = process.extract(query, choices, score_cutoff=90) + + raise HelpQueryNotFound(f'Query "{query}" not found.', dict(result)) + + async def timeout(self, seconds: int = 30) -> None: + """Waits for a set number of seconds, then stops the help session.""" + await asyncio.sleep(seconds) + await self.stop() + + def reset_timeout(self) -> None: + """Cancels the original timeout task and sets it again from the start.""" + # cancel original if it exists + if self._timeout_task: + if not self._timeout_task.cancelled(): + self._timeout_task.cancel() + + # recreate the timeout task + self._timeout_task = self._bot.loop.create_task(self.timeout()) + + async def on_reaction_add(self, reaction: Reaction, user: User) -> None: + """Event handler for when reactions are added on the help message.""" + # ensure it was the relevant session message + if reaction.message.id != self.message.id: + return + + # ensure it was the session author who reacted + if user.id != self.author.id: + return + + emoji = str(reaction.emoji) + + # check if valid action + if emoji not in REACTIONS: + return + + self.reset_timeout() + + # Run relevant action method + action = getattr(self, f"do_{REACTIONS[emoji]}", None) + if action: + await action() + + # remove the added reaction to prep for re-use + with suppress(HTTPException): + await self.message.remove_reaction(reaction, user) + + async def on_message_delete(self, message: Message) -> None: + """Closes the help session when the help message is deleted.""" + if message.id == self.message.id: + await self.stop() + + async def prepare(self) -> None: + """Sets up the help session pages, events, message and reactions.""" + await self.build_pages() + + self._bot.add_listener(self.on_reaction_add) + self._bot.add_listener(self.on_message_delete) + + await self.update_page() + self.add_reactions() + + def add_reactions(self) -> None: + """Adds the relevant reactions to the help message based on if pagination is required.""" + # if paginating + if len(self._pages) > 1: + for reaction in REACTIONS: + self._bot.loop.create_task(self.message.add_reaction(reaction)) + + # if single-page + else: + self._bot.loop.create_task(self.message.add_reaction(DELETE_EMOJI)) + + def _category_key(self, cmd: Command) -> str: + """ + Returns a cog name of a given command for use as a key for `sorted` and `groupby`. + + A zero width space is used as a prefix for results with no cogs to force them last in ordering. + """ + if cmd.cog: + try: + if cmd.cog.category: + return f"**{cmd.cog.category}**" + except AttributeError: + pass + + return f"**{cmd.cog_name}**" + else: + return "**\u200bNo Category:**" + + def _get_command_params(self, cmd: Command) -> str: + """ + Returns the command usage signature. + + This is a custom implementation of `command.signature` in order to format the command + signature without aliases. + """ + results = [] + for name, param in cmd.clean_params.items(): + + # if argument has a default value + if param.default is not param.empty: + + if isinstance(param.default, str): + show_default = param.default + else: + show_default = param.default is not None + + # if default is not an empty string or None + if show_default: + results.append(f"[{name}={param.default}]") + else: + results.append(f"[{name}]") + + # if variable length argument + elif param.kind == param.VAR_POSITIONAL: + results.append(f"[{name}...]") + + # if required + else: + results.append(f"<{name}>") + + return f"{cmd.name} {' '.join(results)}" + + async def build_pages(self) -> None: + """Builds the list of content pages to be paginated through in the help message, as a list of str.""" + # Use LinePaginator to restrict embed line height + paginator = LinePaginator(prefix="", suffix="", max_lines=self._max_lines) + + # show signature if query is a command + if isinstance(self.query, commands.Command): + await self._add_command_signature(paginator) + + if isinstance(self.query, Cog): + paginator.add_line(f"**{self.query.name}**") + + if self.description: + paginator.add_line(f"*{self.description}*") + + # list all children commands of the queried object + if isinstance(self.query, (commands.GroupMixin, Cog)): + await self._list_child_commands(paginator) + + self._pages = paginator.pages + + async def _add_command_signature(self, paginator: LinePaginator) -> None: + prefix = constants.Client.prefix + + signature = self._get_command_params(self.query) + parent = self.query.full_parent_name + " " if self.query.parent else "" + paginator.add_line(f"**```\n{prefix}{parent}{signature}\n```**") + aliases = [f"`{alias}`" if not parent else f"`{parent} {alias}`" for alias in self.query.aliases] + aliases += [f"`{alias}`" for alias in getattr(self.query, "root_aliases", ())] + aliases = ", ".join(sorted(aliases)) + if aliases: + paginator.add_line(f"**Can also use:** {aliases}\n") + if not await self.query.can_run(self._ctx): + paginator.add_line("***You cannot run this command.***\n") + + async def _list_child_commands(self, paginator: LinePaginator) -> None: + # remove hidden commands if session is not wanting hiddens + if not self._show_hidden: + filtered = [c for c in self.query.commands if not c.hidden] + else: + filtered = self.query.commands + + # if after filter there are no commands, finish up + if not filtered: + self._pages = paginator.pages + return + + if isinstance(self.query, Cog): + grouped = (("**Commands:**", self.query.commands),) + + elif isinstance(self.query, commands.Command): + grouped = (("**Subcommands:**", self.query.commands),) + + # otherwise sort and organise all commands into categories + else: + cat_sort = sorted(filtered, key=self._category_key) + grouped = itertools.groupby(cat_sort, key=self._category_key) + + for category, cmds in grouped: + await self._format_command_category(paginator, category, list(cmds)) + + async def _format_command_category(self, paginator: LinePaginator, category: str, cmds: list[Command]) -> None: + cmds = sorted(cmds, key=lambda c: c.name) + cat_cmds = [] + for command in cmds: + cat_cmds += await self._format_command(command) + + # state var for if the category should be added next + print_cat = 1 + new_page = True + + for details in cat_cmds: + + # keep details together, paginating early if it won"t fit + lines_adding = len(details.split("\n")) + print_cat + if paginator._linecount + lines_adding > self._max_lines: + paginator._linecount = 0 + new_page = True + paginator.close_page() + + # new page so print category title again + print_cat = 1 + + if print_cat: + if new_page: + paginator.add_line("") + paginator.add_line(category) + print_cat = 0 + + paginator.add_line(details) + + async def _format_command(self, command: Command) -> list[str]: + # skip if hidden and hide if session is set to + if command.hidden and not self._show_hidden: + return [] + + # Patch to make the !help command work outside of #bot-commands again + # This probably needs a proper rewrite, but this will make it work in + # the mean time. + try: + can_run = await command.can_run(self._ctx) + except CheckFailure: + can_run = False + + # see if the user can run the command + strikeout = "" + if not can_run: + # skip if we don't show commands they can't run + if self._only_can_run: + return [] + strikeout = "~~" + + if isinstance(self.query, commands.Command): + prefix = "" + else: + prefix = constants.Client.prefix + + signature = self._get_command_params(command) + info = f"{strikeout}**`{prefix}{signature}`**{strikeout}" + + # handle if the command has no docstring + short_doc = command.short_doc or "No details provided" + return [f"{info}\n*{short_doc}*"] + + def embed_page(self, page_number: int = 0) -> Embed: + """Returns an Embed with the requested page formatted within.""" + embed = Embed() + + if isinstance(self.query, (commands.Command, Cog)) and page_number > 0: + title = f'Command Help | "{self.query.name}"' + else: + title = self.title + + embed.set_author(name=title, icon_url=constants.Icons.questionmark) + embed.description = self._pages[page_number] + + page_count = len(self._pages) + if page_count > 1: + embed.set_footer(text=f"Page {self._current_page+1} / {page_count}") + + return embed + + async def update_page(self, page_number: int = 0) -> None: + """Sends the intial message, or changes the existing one to the given page number.""" + self._current_page = page_number + embed_page = self.embed_page(page_number) + + if not self.message: + self.message = await self.destination.send(embed=embed_page) + else: + await self.message.edit(embed=embed_page) + + @classmethod + async def start(cls, ctx: Context, *command, **options) -> "HelpSession": + """ + Create and begin a help session based on the given command context. + + Available options kwargs: + * cleanup: Optional[bool] + Set to `True` to have the message deleted on session end. Defaults to `False`. + * only_can_run: Optional[bool] + Set to `True` to hide commands the user can't run. Defaults to `False`. + * show_hidden: Optional[bool] + Set to `True` to include hidden commands. Defaults to `False`. + * max_lines: Optional[int] + Sets the max number of lines the paginator will add to a single page. Defaults to 20. + """ + session = cls(ctx, *command, **options) + await session.prepare() + + return session + + async def stop(self) -> None: + """Stops the help session, removes event listeners and attempts to delete the help message.""" + self._bot.remove_listener(self.on_reaction_add) + self._bot.remove_listener(self.on_message_delete) + + # ignore if permission issue, or the message doesn't exist + with suppress(HTTPException, AttributeError): + if self._cleanup: + await self.message.delete() + else: + await self.message.clear_reactions() + + @property + def is_first_page(self) -> bool: + """Check if session is currently showing the first page.""" + return self._current_page == 0 + + @property + def is_last_page(self) -> bool: + """Check if the session is currently showing the last page.""" + return self._current_page == (len(self._pages)-1) + + async def do_first(self) -> None: + """Event that is called when the user requests the first page.""" + if not self.is_first_page: + await self.update_page(0) + + async def do_back(self) -> None: + """Event that is called when the user requests the previous page.""" + if not self.is_first_page: + await self.update_page(self._current_page-1) + + async def do_next(self) -> None: + """Event that is called when the user requests the next page.""" + if not self.is_last_page: + await self.update_page(self._current_page+1) + + async def do_end(self) -> None: + """Event that is called when the user requests the last page.""" + if not self.is_last_page: + await self.update_page(len(self._pages)-1) + + async def do_stop(self) -> None: + """Event that is called when the user requests to stop the help session.""" + await self.message.delete() + + +class Help(DiscordCog): + """Custom Embed Pagination Help feature.""" + + @commands.command("help") + async def new_help(self, ctx: Context, *commands) -> None: + """Shows Command Help.""" + try: + await HelpSession.start(ctx, *commands) + except HelpQueryNotFound as error: + embed = Embed() + embed.colour = Colour.red() + embed.title = str(error) + + if error.possible_matches: + matches = "\n".join(error.possible_matches.keys()) + embed.description = f"**Did you mean:**\n`{matches}`" + + await ctx.send(embed=embed) + + +def unload(bot: Bot) -> None: + """ + Reinstates the original help command. + + This is run if the cog raises an exception on load, or if the extension is unloaded. + """ + bot.remove_command("help") + bot.add_command(bot._old_help) + + +def setup(bot: Bot) -> None: + """ + The setup for the help extension. + + This is called automatically on `bot.load_extension` being run. + Stores the original help command instance on the `bot._old_help` attribute for later + reinstatement, before removing it from the command registry so the new help command can be + loaded successfully. + If an exception is raised during the loading of the cog, `unload` will be called in order to + reinstate the original help command. + """ + bot._old_help = bot.get_command("help") + bot.remove_command("help") + + try: + bot.add_cog(Help()) + except Exception: + unload(bot) + raise + + +def teardown(bot: Bot) -> None: + """ + The teardown for the help extension. + + This is called automatically on `bot.unload_extension` being run. + Calls `unload` in order to reinstate the original help command. + """ + unload(bot) diff --git a/bot/exts/core/internal_eval/__init__.py b/bot/exts/core/internal_eval/__init__.py new file mode 100644 index 00000000..695fa74d --- /dev/null +++ b/bot/exts/core/internal_eval/__init__.py @@ -0,0 +1,10 @@ +from bot.bot import Bot + + +def setup(bot: Bot) -> None: + """Set up the Internal Eval extension.""" + # Import the Cog at runtime to prevent side effects like defining + # RedisCache instances too early. + from ._internal_eval import InternalEval + + bot.add_cog(InternalEval(bot)) diff --git a/bot/exts/core/internal_eval/_helpers.py b/bot/exts/core/internal_eval/_helpers.py new file mode 100644 index 00000000..5b2f8f5d --- /dev/null +++ b/bot/exts/core/internal_eval/_helpers.py @@ -0,0 +1,248 @@ +import ast +import collections +import contextlib +import functools +import inspect +import io +import logging +import sys +import traceback +import types +from typing import Any, Optional, Union + +log = logging.getLogger(__name__) + +# A type alias to annotate the tuples returned from `sys.exc_info()` +ExcInfo = tuple[type[Exception], Exception, types.TracebackType] +Namespace = dict[str, Any] + +# This will be used as an coroutine function wrapper for the code +# to be evaluated. The wrapper contains one `pass` statement which +# will be replaced with `ast` with the code that we want to have +# evaluated. +# The function redirects output and captures exceptions that were +# raised in the code we evaluate. The latter is used to provide a +# meaningful traceback to the end user. +EVAL_WRAPPER = """ +async def _eval_wrapper_function(): + try: + with contextlib.redirect_stdout(_eval_context.stdout): + pass + if '_value_last_expression' in locals(): + if inspect.isawaitable(_value_last_expression): + _value_last_expression = await _value_last_expression + _eval_context._value_last_expression = _value_last_expression + else: + _eval_context._value_last_expression = None + except Exception: + _eval_context.exc_info = sys.exc_info() + finally: + _eval_context.locals = locals() +_eval_context.function = _eval_wrapper_function +""" +INTERNAL_EVAL_FRAMENAME = "<internal eval>" +EVAL_WRAPPER_FUNCTION_FRAMENAME = "_eval_wrapper_function" + + +def format_internal_eval_exception(exc_info: ExcInfo, code: str) -> str: + """Format an exception caught while evaluation code by inserting lines.""" + exc_type, exc_value, tb = exc_info + stack_summary = traceback.StackSummary.extract(traceback.walk_tb(tb)) + code = code.split("\n") + + output = ["Traceback (most recent call last):"] + for frame in stack_summary: + if frame.filename == INTERNAL_EVAL_FRAMENAME: + line = code[frame.lineno - 1].lstrip() + + if frame.name == EVAL_WRAPPER_FUNCTION_FRAMENAME: + name = INTERNAL_EVAL_FRAMENAME + else: + name = frame.name + else: + line = frame.line + name = frame.name + + output.append( + f' File "{frame.filename}", line {frame.lineno}, in {name}\n' + f" {line}" + ) + + output.extend(traceback.format_exception_only(exc_type, exc_value)) + return "\n".join(output) + + +class EvalContext: + """ + Represents the current `internal eval` context. + + The context remembers names set during earlier runs of `internal eval`. To + clear the context, use the `.internal clear` command. + """ + + def __init__(self, context_vars: Namespace, local_vars: Namespace): + self._locals = dict(local_vars) + self.context_vars = dict(context_vars) + + self.stdout = io.StringIO() + self._value_last_expression = None + self.exc_info = None + self.code = "" + self.function = None + self.eval_tree = None + + @property + def dependencies(self) -> dict[str, Any]: + """ + Return a mapping of the dependencies for the wrapper function. + + By using a property descriptor, the mapping can't be accidentally + mutated during evaluation. This ensures the dependencies are always + available. + """ + return { + "print": functools.partial(print, file=self.stdout), + "contextlib": contextlib, + "inspect": inspect, + "sys": sys, + "_eval_context": self, + "_": self._value_last_expression, + } + + @property + def locals(self) -> dict[str, Any]: + """Return a mapping of names->values needed for evaluation.""" + return {**collections.ChainMap(self.dependencies, self.context_vars, self._locals)} + + @locals.setter + def locals(self, locals_: dict[str, Any]) -> None: + """Update the contextual mapping of names to values.""" + log.trace(f"Updating {self._locals} with {locals_}") + self._locals.update(locals_) + + def prepare_eval(self, code: str) -> Optional[str]: + """Prepare an evaluation by processing the code and setting up the context.""" + self.code = code + + if not self.code: + log.debug("No code was attached to the evaluation command") + return "[No code detected]" + + try: + code_tree = ast.parse(code, filename=INTERNAL_EVAL_FRAMENAME) + except SyntaxError: + log.debug("Got a SyntaxError while parsing the eval code") + return "".join(traceback.format_exception(*sys.exc_info(), limit=0)) + + log.trace("Parsing the AST to see if there's a trailing expression we need to capture") + code_tree = CaptureLastExpression(code_tree).capture() + + log.trace("Wrapping the AST in the AST of the wrapper coroutine") + eval_tree = WrapEvalCodeTree(code_tree).wrap() + + self.eval_tree = eval_tree + return None + + async def run_eval(self) -> Namespace: + """Run the evaluation and return the updated locals.""" + log.trace("Compiling the AST to bytecode using `exec` mode") + compiled_code = compile(self.eval_tree, filename=INTERNAL_EVAL_FRAMENAME, mode="exec") + + log.trace("Executing the compiled code with the desired namespace environment") + exec(compiled_code, self.locals) # noqa: B102,S102 + + log.trace("Awaiting the created evaluation wrapper coroutine.") + await self.function() + + log.trace("Returning the updated captured locals.") + return self._locals + + def format_output(self) -> str: + """Format the output of the most recent evaluation.""" + output = [] + + log.trace(f"Getting output from stdout `{id(self.stdout)}`") + stdout_text = self.stdout.getvalue() + if stdout_text: + log.trace("Appending output captured from stdout/print") + output.append(stdout_text) + + if self._value_last_expression is not None: + log.trace("Appending the output of a captured trialing expression") + output.append(f"[Captured] {self._value_last_expression!r}") + + if self.exc_info: + log.trace("Appending exception information") + output.append(format_internal_eval_exception(self.exc_info, self.code)) + + log.trace(f"Generated output: {output!r}") + return "\n".join(output) or "[No output]" + + +class WrapEvalCodeTree(ast.NodeTransformer): + """Wraps the AST of eval code with the wrapper function.""" + + def __init__(self, eval_code_tree: ast.AST, *args, **kwargs): + super().__init__(*args, **kwargs) + self.eval_code_tree = eval_code_tree + + # To avoid mutable aliasing, parse the WRAPPER_FUNC for each wrapping + self.wrapper = ast.parse(EVAL_WRAPPER, filename=INTERNAL_EVAL_FRAMENAME) + + def wrap(self) -> ast.AST: + """Wrap the tree of the code by the tree of the wrapper function.""" + new_tree = self.visit(self.wrapper) + return ast.fix_missing_locations(new_tree) + + def visit_Pass(self, node: ast.Pass) -> list[ast.AST]: # noqa: N802 + """ + Replace the `_ast.Pass` node in the wrapper function by the eval AST. + + This method works on the assumption that there's a single `pass` + statement in the wrapper function. + """ + return list(ast.iter_child_nodes(self.eval_code_tree)) + + +class CaptureLastExpression(ast.NodeTransformer): + """Captures the return value from a loose expression.""" + + def __init__(self, tree: ast.AST, *args, **kwargs): + super().__init__(*args, **kwargs) + self.tree = tree + self.last_node = list(ast.iter_child_nodes(tree))[-1] + + def visit_Expr(self, node: ast.Expr) -> Union[ast.Expr, ast.Assign]: # noqa: N802 + """ + Replace the Expr node that is last child node of Module with an assignment. + + We use an assignment to capture the value of the last node, if it's a loose + Expr node. Normally, the value of an Expr node is lost, meaning we don't get + the output of such a last "loose" expression. By assigning it a name, we can + retrieve it for our output. + """ + if node is not self.last_node: + return node + + log.trace("Found a trailing last expression in the evaluation code") + + log.trace("Creating assignment statement with trailing expression as the right-hand side") + right_hand_side = list(ast.iter_child_nodes(node))[0] + + assignment = ast.Assign( + targets=[ast.Name(id='_value_last_expression', ctx=ast.Store())], + value=right_hand_side, + lineno=node.lineno, + col_offset=0, + ) + ast.fix_missing_locations(assignment) + return assignment + + def capture(self) -> ast.AST: + """Capture the value of the last expression with an assignment.""" + if not isinstance(self.last_node, ast.Expr): + # We only have to replace a node if the very last node is an Expr node + return self.tree + + new_tree = self.visit(self.tree) + return ast.fix_missing_locations(new_tree) diff --git a/bot/exts/core/internal_eval/_internal_eval.py b/bot/exts/core/internal_eval/_internal_eval.py new file mode 100644 index 00000000..4f6b4321 --- /dev/null +++ b/bot/exts/core/internal_eval/_internal_eval.py @@ -0,0 +1,179 @@ +import logging +import re +import textwrap +from typing import Optional + +import discord +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Client, Roles +from bot.utils.decorators import with_role +from bot.utils.extensions import invoke_help_command +from ._helpers import EvalContext + +__all__ = ["InternalEval"] + +log = logging.getLogger(__name__) + +FORMATTED_CODE_REGEX = re.compile( + r"(?P<delim>(?P<block>```)|``?)" # code delimiter: 1-3 backticks; (?P=block) only matches if it's a block + r"(?(block)(?:(?P<lang>[a-z]+)\n)?)" # if we're in a block, match optional language (only letters plus newline) + r"(?:[ \t]*\n)*" # any blank (empty or tabs/spaces only) lines before the code + r"(?P<code>.*?)" # extract all code inside the markup + r"\s*" # any more whitespace before the end of the code markup + r"(?P=delim)", # match the exact same delimiter from the start again + re.DOTALL | re.IGNORECASE # "." also matches newlines, case insensitive +) + +RAW_CODE_REGEX = re.compile( + r"^(?:[ \t]*\n)*" # any blank (empty or tabs/spaces only) lines before the code + r"(?P<code>.*?)" # extract all the rest as code + r"\s*$", # any trailing whitespace until the end of the string + re.DOTALL # "." also matches newlines +) + + +class InternalEval(commands.Cog): + """Top secret code evaluation for admins and owners.""" + + def __init__(self, bot: Bot): + self.bot = bot + self.locals = {} + + if Client.debug: + self.internal_group.add_check(commands.is_owner().predicate) + + @staticmethod + def shorten_output( + output: str, + max_length: int = 1900, + placeholder: str = "\n[output truncated]" + ) -> str: + """ + Shorten the `output` so it's shorter than `max_length`. + + There are three tactics for this, tried in the following order: + - Shorten the output on a line-by-line basis + - Shorten the output on any whitespace character + - Shorten the output solely on character count + """ + max_length = max_length - len(placeholder) + + shortened_output = [] + char_count = 0 + for line in output.split("\n"): + if char_count + len(line) > max_length: + break + shortened_output.append(line) + char_count += len(line) + 1 # account for (possible) line ending + + if shortened_output: + shortened_output.append(placeholder) + return "\n".join(shortened_output) + + shortened_output = textwrap.shorten(output, width=max_length, placeholder=placeholder) + + if shortened_output.strip() == placeholder.strip(): + # `textwrap` was unable to find whitespace to shorten on, so it has + # reduced the output to just the placeholder. Let's shorten based on + # characters instead. + shortened_output = output[:max_length] + placeholder + + return shortened_output + + async def _upload_output(self, output: str) -> Optional[str]: + """Upload `internal eval` output to our pastebin and return the url.""" + try: + async with self.bot.http_session.post( + "https://paste.pythondiscord.com/documents", data=output, raise_for_status=True + ) as resp: + data = await resp.json() + + if "key" in data: + return f"https://paste.pythondiscord.com/{data['key']}" + except Exception: + # 400 (Bad Request) means there are too many characters + log.exception("Failed to upload `internal eval` output to paste service!") + + async def _send_output(self, ctx: commands.Context, output: str) -> None: + """Send the `internal eval` output to the command invocation context.""" + upload_message = "" + if len(output) >= 1980: + # The output is too long, let's truncate it for in-channel output and + # upload the complete output to the paste service. + url = await self._upload_output(output) + + if url: + upload_message = f"\nFull output here: {url}" + else: + upload_message = "\n:warning: Failed to upload full output!" + + output = self.shorten_output(output) + + await ctx.send(f"```py\n{output}\n```{upload_message}") + + async def _eval(self, ctx: commands.Context, code: str) -> None: + """Evaluate the `code` in the current evaluation context.""" + context_vars = { + "message": ctx.message, + "author": ctx.author, + "channel": ctx.channel, + "guild": ctx.guild, + "ctx": ctx, + "self": self, + "bot": self.bot, + "discord": discord, + } + + eval_context = EvalContext(context_vars, self.locals) + + log.trace("Preparing the evaluation by parsing the AST of the code") + error = eval_context.prepare_eval(code) + + if error: + log.trace("The code can't be evaluated due to an error") + await ctx.send(f"```py\n{error}\n```") + return + + log.trace("Evaluate the AST we've generated for the evaluation") + new_locals = await eval_context.run_eval() + + log.trace("Updating locals with those set during evaluation") + self.locals.update(new_locals) + + log.trace("Sending the formatted output back to the context") + await self._send_output(ctx, eval_context.format_output()) + + @commands.group(name="internal", aliases=("int",)) + @with_role(Roles.admin) + async def internal_group(self, ctx: commands.Context) -> None: + """Internal commands. Top secret!""" + if not ctx.invoked_subcommand: + await invoke_help_command(ctx) + + @internal_group.command(name="eval", aliases=("e",)) + @with_role(Roles.admin) + async def eval(self, ctx: commands.Context, *, code: str) -> None: + """Run eval in a REPL-like format.""" + if match := list(FORMATTED_CODE_REGEX.finditer(code)): + blocks = [block for block in match if block.group("block")] + + if len(blocks) > 1: + code = "\n".join(block.group("code") for block in blocks) + else: + match = match[0] if len(blocks) == 0 else blocks[0] + code, block, lang, delim = match.group("code", "block", "lang", "delim") + + else: + code = RAW_CODE_REGEX.fullmatch(code).group("code") + + code = textwrap.dedent(code) + await self._eval(ctx, code) + + @internal_group.command(name="reset", aliases=("clear", "exit", "r", "c")) + @with_role(Roles.admin) + async def reset(self, ctx: commands.Context) -> None: + """Reset the context and locals of the eval session.""" + self.locals = {} + await ctx.send("The evaluation context was reset.") diff --git a/bot/exts/core/ping.py b/bot/exts/core/ping.py new file mode 100644 index 00000000..6be78117 --- /dev/null +++ b/bot/exts/core/ping.py @@ -0,0 +1,45 @@ +import arrow +from dateutil.relativedelta import relativedelta +from discord import Embed +from discord.ext import commands + +from bot import start_time +from bot.bot import Bot +from bot.constants import Colours + + +class Ping(commands.Cog): + """Get info about the bot's ping and uptime.""" + + def __init__(self, bot: Bot): + self.bot = bot + + @commands.command(name="ping") + async def ping(self, ctx: commands.Context) -> None: + """Ping the bot to see its latency and state.""" + embed = Embed( + title=":ping_pong: Pong!", + colour=Colours.bright_green, + description=f"Gateway Latency: {round(self.bot.latency * 1000)}ms", + ) + + await ctx.send(embed=embed) + + # Originally made in 70d2170a0a6594561d59c7d080c4280f1ebcd70b by lemon & gdude2002 + @commands.command(name="uptime") + async def uptime(self, ctx: commands.Context) -> None: + """Get the current uptime of the bot.""" + difference = relativedelta(start_time - arrow.utcnow()) + uptime_string = start_time.shift( + seconds=-difference.seconds, + minutes=-difference.minutes, + hours=-difference.hours, + days=-difference.days + ).humanize() + + await ctx.send(f"I started up {uptime_string}.") + + +def setup(bot: Bot) -> None: + """Load the Ping cog.""" + bot.add_cog(Ping(bot)) diff --git a/bot/exts/core/source.py b/bot/exts/core/source.py new file mode 100644 index 00000000..7572ce51 --- /dev/null +++ b/bot/exts/core/source.py @@ -0,0 +1,85 @@ +import inspect +from pathlib import Path +from typing import Optional + +from discord import Embed +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Source +from bot.utils.converters import SourceConverter, SourceType + + +class BotSource(commands.Cog): + """Displays information about the bot's source code.""" + + @commands.command(name="source", aliases=("src",)) + async def source_command(self, ctx: commands.Context, *, source_item: SourceConverter = None) -> None: + """Display information and a GitHub link to the source code of a command, tag, or cog.""" + if not source_item: + embed = Embed(title="Sir Lancebot's GitHub Repository") + embed.add_field(name="Repository", value=f"[Go to GitHub]({Source.github})") + embed.set_thumbnail(url=Source.github_avatar_url) + await ctx.send(embed=embed) + return + + embed = await self.build_embed(source_item) + await ctx.send(embed=embed) + + def get_source_link(self, source_item: SourceType) -> tuple[str, str, Optional[int]]: + """ + Build GitHub link of source item, return this link, file location and first line number. + + Raise BadArgument if `source_item` is a dynamically-created object (e.g. via internal eval). + """ + if isinstance(source_item, commands.Command): + callback = inspect.unwrap(source_item.callback) + src = callback.__code__ + filename = src.co_filename + else: + src = type(source_item) + try: + filename = inspect.getsourcefile(src) + except TypeError: + raise commands.BadArgument("Cannot get source for a dynamically-created object.") + + if not isinstance(source_item, str): + try: + lines, first_line_no = inspect.getsourcelines(src) + except OSError: + raise commands.BadArgument("Cannot get source for a dynamically-created object.") + + lines_extension = f"#L{first_line_no}-L{first_line_no+len(lines)-1}" + else: + first_line_no = None + lines_extension = "" + + file_location = Path(filename).relative_to(Path.cwd()).as_posix() + + url = f"{Source.github}/blob/main/{file_location}{lines_extension}" + + return url, file_location, first_line_no or None + + async def build_embed(self, source_object: SourceType) -> Optional[Embed]: + """Build embed based on source object.""" + url, location, first_line = self.get_source_link(source_object) + + if isinstance(source_object, commands.Command): + description = source_object.short_doc + title = f"Command: {source_object.qualified_name}" + else: + title = f"Cog: {source_object.qualified_name}" + description = source_object.description.splitlines()[0] + + embed = Embed(title=title, description=description) + embed.set_thumbnail(url=Source.github_avatar_url) + embed.add_field(name="Source Code", value=f"[Go to GitHub]({url})") + line_text = f":{first_line}" if first_line else "" + embed.set_footer(text=f"{location}{line_text}") + + return embed + + +def setup(bot: Bot) -> None: + """Load the BotSource cog.""" + bot.add_cog(BotSource()) |