diff options
Diffstat (limited to 'bot/exts/core')
| -rw-r--r-- | bot/exts/core/__init__.py | 0 | ||||
| -rw-r--r-- | bot/exts/core/error_handler.py | 182 | ||||
| -rw-r--r-- | bot/exts/core/extensions.py | 266 | ||||
| -rw-r--r-- | bot/exts/core/help.py | 562 | ||||
| -rw-r--r-- | bot/exts/core/internal_eval/__init__.py | 10 | ||||
| -rw-r--r-- | bot/exts/core/internal_eval/_helpers.py | 248 | ||||
| -rw-r--r-- | bot/exts/core/internal_eval/_internal_eval.py | 179 | ||||
| -rw-r--r-- | bot/exts/core/ping.py | 45 | ||||
| -rw-r--r-- | bot/exts/core/source.py | 85 | 
9 files changed, 1577 insertions, 0 deletions
| diff --git a/bot/exts/core/__init__.py b/bot/exts/core/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/bot/exts/core/__init__.py diff --git a/bot/exts/core/error_handler.py b/bot/exts/core/error_handler.py new file mode 100644 index 00000000..fd2123e7 --- /dev/null +++ b/bot/exts/core/error_handler.py @@ -0,0 +1,182 @@ +import difflib +import logging +import math +import random +from collections.abc import Iterable +from typing import Union + +from discord import Embed, Message +from discord.ext import commands +from sentry_sdk import push_scope + +from bot.bot import Bot +from bot.constants import Channels, Colours, ERROR_REPLIES, NEGATIVE_REPLIES, RedirectOutput +from bot.utils.decorators import InChannelCheckFailure, InMonthCheckFailure +from bot.utils.exceptions import APIError, UserNotPlayingError + +log = logging.getLogger(__name__) + + +QUESTION_MARK_ICON = "https://cdn.discordapp.com/emojis/512367613339369475.png" + + +class CommandErrorHandler(commands.Cog): +    """A error handler for the PythonDiscord server.""" + +    def __init__(self, bot: Bot): +        self.bot = bot + +    @staticmethod +    def revert_cooldown_counter(command: commands.Command, message: Message) -> None: +        """Undoes the last cooldown counter for user-error cases.""" +        if command._buckets.valid: +            bucket = command._buckets.get_bucket(message) +            bucket._tokens = min(bucket.rate, bucket._tokens + 1) +            logging.debug("Cooldown counter reverted as the command was not used correctly.") + +    @staticmethod +    def error_embed(message: str, title: Union[Iterable, str] = ERROR_REPLIES) -> Embed: +        """Build a basic embed with red colour and either a random error title or a title provided.""" +        embed = Embed(colour=Colours.soft_red) +        if isinstance(title, str): +            embed.title = title +        else: +            embed.title = random.choice(title) +        embed.description = message +        return embed + +    @commands.Cog.listener() +    async def on_command_error(self, ctx: commands.Context, error: commands.CommandError) -> None: +        """Activates when a command raises an error.""" +        if getattr(error, "handled", False): +            logging.debug(f"Command {ctx.command} had its error already handled locally; ignoring.") +            return + +        parent_command = "" +        if subctx := getattr(ctx, "subcontext", None): +            parent_command = f"{ctx.command} " +            ctx = subctx + +        error = getattr(error, "original", error) +        logging.debug( +            f"Error Encountered: {type(error).__name__} - {str(error)}, " +            f"Command: {ctx.command}, " +            f"Author: {ctx.author}, " +            f"Channel: {ctx.channel}" +        ) + +        if isinstance(error, commands.CommandNotFound): +            await self.send_command_suggestion(ctx, ctx.invoked_with) +            return + +        if isinstance(error, (InChannelCheckFailure, InMonthCheckFailure)): +            await ctx.send(embed=self.error_embed(str(error), NEGATIVE_REPLIES), delete_after=7.5) +            return + +        if isinstance(error, commands.UserInputError): +            self.revert_cooldown_counter(ctx.command, ctx.message) +            usage = f"```\n{ctx.prefix}{parent_command}{ctx.command} {ctx.command.signature}\n```" +            embed = self.error_embed( +                f"Your input was invalid: {error}\n\nUsage:{usage}" +            ) +            await ctx.send(embed=embed) +            return + +        if isinstance(error, commands.CommandOnCooldown): +            mins, secs = divmod(math.ceil(error.retry_after), 60) +            embed = self.error_embed( +                f"This command is on cooldown:\nPlease retry in {mins} minutes {secs} seconds.", +                NEGATIVE_REPLIES +            ) +            await ctx.send(embed=embed, delete_after=7.5) +            return + +        if isinstance(error, commands.DisabledCommand): +            await ctx.send(embed=self.error_embed("This command has been disabled.", NEGATIVE_REPLIES)) +            return + +        if isinstance(error, commands.NoPrivateMessage): +            await ctx.send( +                embed=self.error_embed( +                    f"This command can only be used in the server. Go to <#{Channels.community_bot_commands}> instead!", +                    NEGATIVE_REPLIES +                ) +            ) +            return + +        if isinstance(error, commands.BadArgument): +            self.revert_cooldown_counter(ctx.command, ctx.message) +            embed = self.error_embed( +                "The argument you provided was invalid: " +                f"{error}\n\nUsage:\n```\n{ctx.prefix}{parent_command}{ctx.command} {ctx.command.signature}\n```" +            ) +            await ctx.send(embed=embed) +            return + +        if isinstance(error, commands.CheckFailure): +            await ctx.send(embed=self.error_embed("You are not authorized to use this command.", NEGATIVE_REPLIES)) +            return + +        if isinstance(error, UserNotPlayingError): +            await ctx.send("Game not found.") +            return + +        if isinstance(error, APIError): +            await ctx.send( +                embed=self.error_embed( +                    f"There was an error when communicating with the {error.api}", +                    NEGATIVE_REPLIES +                ) +            ) +            return + +        with push_scope() as scope: +            scope.user = { +                "id": ctx.author.id, +                "username": str(ctx.author) +            } + +            scope.set_tag("command", ctx.command.qualified_name) +            scope.set_tag("message_id", ctx.message.id) +            scope.set_tag("channel_id", ctx.channel.id) + +            scope.set_extra("full_message", ctx.message.content) + +            if ctx.guild is not None: +                scope.set_extra("jump_to", ctx.message.jump_url) + +            log.exception(f"Unhandled command error: {str(error)}", exc_info=error) + +    async def send_command_suggestion(self, ctx: commands.Context, command_name: str) -> None: +        """Sends user similar commands if any can be found.""" +        raw_commands = [] +        for cmd in self.bot.walk_commands(): +            if not cmd.hidden: +                raw_commands += (cmd.name, *cmd.aliases) +        if similar_command_data := difflib.get_close_matches(command_name, raw_commands, 1): +            similar_command_name = similar_command_data[0] +            similar_command = self.bot.get_command(similar_command_name) + +            if not similar_command: +                return + +            log_msg = "Cancelling attempt to suggest a command due to failed checks." +            try: +                if not await similar_command.can_run(ctx): +                    log.debug(log_msg) +                    return +            except commands.errors.CommandError as cmd_error: +                log.debug(log_msg) +                await self.on_command_error(ctx, cmd_error) +                return + +            misspelled_content = ctx.message.content +            e = Embed() +            e.set_author(name="Did you mean:", icon_url=QUESTION_MARK_ICON) +            e.description = misspelled_content.replace(command_name, similar_command_name, 1) +            await ctx.send(embed=e, delete_after=RedirectOutput.delete_delay) + + +def setup(bot: Bot) -> None: +    """Load the ErrorHandler cog.""" +    bot.add_cog(CommandErrorHandler(bot)) diff --git a/bot/exts/core/extensions.py b/bot/exts/core/extensions.py new file mode 100644 index 00000000..424bacac --- /dev/null +++ b/bot/exts/core/extensions.py @@ -0,0 +1,266 @@ +import functools +import logging +from collections.abc import Mapping +from enum import Enum +from typing import Optional + +from discord import Colour, Embed +from discord.ext import commands +from discord.ext.commands import Context, group + +from bot import exts +from bot.bot import Bot +from bot.constants import Client, Emojis, MODERATION_ROLES, Roles +from bot.utils.checks import with_role_check +from bot.utils.extensions import EXTENSIONS, invoke_help_command, unqualify +from bot.utils.pagination import LinePaginator + +log = logging.getLogger(__name__) + + +UNLOAD_BLACKLIST = {f"{exts.__name__}.utils.extensions"} +BASE_PATH_LEN = len(exts.__name__.split(".")) + + +class Action(Enum): +    """Represents an action to perform on an extension.""" + +    # Need to be partial otherwise they are considered to be function definitions. +    LOAD = functools.partial(Bot.load_extension) +    UNLOAD = functools.partial(Bot.unload_extension) +    RELOAD = functools.partial(Bot.reload_extension) + + +class Extension(commands.Converter): +    """ +    Fully qualify the name of an extension and ensure it exists. + +    The * and ** values bypass this when used with the reload command. +    """ + +    async def convert(self, ctx: Context, argument: str) -> str: +        """Fully qualify the name of an extension and ensure it exists.""" +        # Special values to reload all extensions +        if argument == "*" or argument == "**": +            return argument + +        argument = argument.lower() + +        if argument in EXTENSIONS: +            return argument +        elif (qualified_arg := f"{exts.__name__}.{argument}") in EXTENSIONS: +            return qualified_arg + +        matches = [] +        for ext in EXTENSIONS: +            if argument == unqualify(ext): +                matches.append(ext) + +        if len(matches) > 1: +            matches.sort() +            names = "\n".join(matches) +            raise commands.BadArgument( +                f":x: `{argument}` is an ambiguous extension name. " +                f"Please use one of the following fully-qualified names.```\n{names}\n```" +            ) +        elif matches: +            return matches[0] +        else: +            raise commands.BadArgument(f":x: Could not find the extension `{argument}`.") + + +class Extensions(commands.Cog): +    """Extension management commands.""" + +    def __init__(self, bot: Bot): +        self.bot = bot + +    @group(name="extensions", aliases=("ext", "exts", "c", "cogs"), invoke_without_command=True) +    async def extensions_group(self, ctx: Context) -> None: +        """Load, unload, reload, and list loaded extensions.""" +        await invoke_help_command(ctx) + +    @extensions_group.command(name="load", aliases=("l",)) +    async def load_command(self, ctx: Context, *extensions: Extension) -> None: +        r""" +        Load extensions given their fully qualified or unqualified names. + +        If '\*' or '\*\*' is given as the name, all unloaded extensions will be loaded. +        """  # noqa: W605 +        if not extensions: +            await invoke_help_command(ctx) +            return + +        if "*" in extensions or "**" in extensions: +            extensions = set(EXTENSIONS) - set(self.bot.extensions.keys()) + +        msg = self.batch_manage(Action.LOAD, *extensions) +        await ctx.send(msg) + +    @extensions_group.command(name="unload", aliases=("ul",)) +    async def unload_command(self, ctx: Context, *extensions: Extension) -> None: +        r""" +        Unload currently loaded extensions given their fully qualified or unqualified names. + +        If '\*' or '\*\*' is given as the name, all loaded extensions will be unloaded. +        """  # noqa: W605 +        if not extensions: +            await invoke_help_command(ctx) +            return + +        blacklisted = "\n".join(UNLOAD_BLACKLIST & set(extensions)) + +        if blacklisted: +            msg = f":x: The following extension(s) may not be unloaded:```\n{blacklisted}\n```" +        else: +            if "*" in extensions or "**" in extensions: +                extensions = set(self.bot.extensions.keys()) - UNLOAD_BLACKLIST + +            msg = self.batch_manage(Action.UNLOAD, *extensions) + +        await ctx.send(msg) + +    @extensions_group.command(name="reload", aliases=("r",), root_aliases=("reload",)) +    async def reload_command(self, ctx: Context, *extensions: Extension) -> None: +        r""" +        Reload extensions given their fully qualified or unqualified names. + +        If an extension fails to be reloaded, it will be rolled-back to the prior working state. + +        If '\*' is given as the name, all currently loaded extensions will be reloaded. +        If '\*\*' is given as the name, all extensions, including unloaded ones, will be reloaded. +        """  # noqa: W605 +        if not extensions: +            await invoke_help_command(ctx) +            return + +        if "**" in extensions: +            extensions = EXTENSIONS +        elif "*" in extensions: +            extensions = set(self.bot.extensions.keys()) | set(extensions) +            extensions.remove("*") + +        msg = self.batch_manage(Action.RELOAD, *extensions) + +        await ctx.send(msg) + +    @extensions_group.command(name="list", aliases=("all",)) +    async def list_command(self, ctx: Context) -> None: +        """ +        Get a list of all extensions, including their loaded status. + +        Grey indicates that the extension is unloaded. +        Green indicates that the extension is currently loaded. +        """ +        embed = Embed(colour=Colour.blurple()) +        embed.set_author( +            name="Extensions List", +            url=Client.github_bot_repo, +            icon_url=str(self.bot.user.display_avatar.url) +        ) + +        lines = [] +        categories = self.group_extension_statuses() +        for category, extensions in sorted(categories.items()): +            # Treat each category as a single line by concatenating everything. +            # This ensures the paginator will not cut off a page in the middle of a category. +            category = category.replace("_", " ").title() +            extensions = "\n".join(sorted(extensions)) +            lines.append(f"**{category}**\n{extensions}\n") + +        log.debug(f"{ctx.author} requested a list of all cogs. Returning a paginated list.") +        await LinePaginator.paginate(lines, ctx, embed, max_size=1200, empty=False) + +    def group_extension_statuses(self) -> Mapping[str, str]: +        """Return a mapping of extension names and statuses to their categories.""" +        categories = {} + +        for ext in EXTENSIONS: +            if ext in self.bot.extensions: +                status = Emojis.status_online +            else: +                status = Emojis.status_offline + +            path = ext.split(".") +            if len(path) > BASE_PATH_LEN + 1: +                category = " - ".join(path[BASE_PATH_LEN:-1]) +            else: +                category = "uncategorised" + +            categories.setdefault(category, []).append(f"{status}  {path[-1]}") + +        return categories + +    def batch_manage(self, action: Action, *extensions: str) -> str: +        """ +        Apply an action to multiple extensions and return a message with the results. + +        If only one extension is given, it is deferred to `manage()`. +        """ +        if len(extensions) == 1: +            msg, _ = self.manage(action, extensions[0]) +            return msg + +        verb = action.name.lower() +        failures = {} + +        for extension in extensions: +            _, error = self.manage(action, extension) +            if error: +                failures[extension] = error + +        emoji = ":x:" if failures else ":ok_hand:" +        msg = f"{emoji} {len(extensions) - len(failures)} / {len(extensions)} extensions {verb}ed." + +        if failures: +            failures = "\n".join(f"{ext}\n    {err}" for ext, err in failures.items()) +            msg += f"\nFailures:```\n{failures}\n```" + +        log.debug(f"Batch {verb}ed extensions.") + +        return msg + +    def manage(self, action: Action, ext: str) -> tuple[str, Optional[str]]: +        """Apply an action to an extension and return the status message and any error message.""" +        verb = action.name.lower() +        error_msg = None + +        try: +            action.value(self.bot, ext) +        except (commands.ExtensionAlreadyLoaded, commands.ExtensionNotLoaded): +            if action is Action.RELOAD: +                # When reloading, just load the extension if it was not loaded. +                return self.manage(Action.LOAD, ext) + +            msg = f":x: Extension `{ext}` is already {verb}ed." +            log.debug(msg[4:]) +        except Exception as e: +            if hasattr(e, "original"): +                e = e.original + +            log.exception(f"Extension '{ext}' failed to {verb}.") + +            error_msg = f"{e.__class__.__name__}: {e}" +            msg = f":x: Failed to {verb} extension `{ext}`:\n```\n{error_msg}\n```" +        else: +            msg = f":ok_hand: Extension successfully {verb}ed: `{ext}`." +            log.debug(msg[10:]) + +        return msg, error_msg + +    # This cannot be static (must have a __func__ attribute). +    def cog_check(self, ctx: Context) -> bool: +        """Only allow moderators and core developers to invoke the commands in this cog.""" +        return with_role_check(ctx, *MODERATION_ROLES, Roles.core_developers) + +    # This cannot be static (must have a __func__ attribute). +    async def cog_command_error(self, ctx: Context, error: Exception) -> None: +        """Handle BadArgument errors locally to prevent the help command from showing.""" +        if isinstance(error, commands.BadArgument): +            await ctx.send(str(error)) +            error.handled = True + + +def setup(bot: Bot) -> None: +    """Load the Extensions cog.""" +    bot.add_cog(Extensions(bot)) diff --git a/bot/exts/core/help.py b/bot/exts/core/help.py new file mode 100644 index 00000000..4b766b50 --- /dev/null +++ b/bot/exts/core/help.py @@ -0,0 +1,562 @@ +# Help command from Python bot. All commands that will be added to there in futures should be added to here too. +import asyncio +import itertools +import logging +from contextlib import suppress +from typing import NamedTuple, Union + +from discord import Colour, Embed, HTTPException, Message, Reaction, User +from discord.ext import commands +from discord.ext.commands import CheckFailure, Cog as DiscordCog, Command, Context +from rapidfuzz import process + +from bot import constants +from bot.bot import Bot +from bot.constants import Emojis +from bot.utils.pagination import ( +    FIRST_EMOJI, LAST_EMOJI, +    LEFT_EMOJI, LinePaginator, RIGHT_EMOJI, +) + +DELETE_EMOJI = Emojis.trashcan + +REACTIONS = { +    FIRST_EMOJI: "first", +    LEFT_EMOJI: "back", +    RIGHT_EMOJI: "next", +    LAST_EMOJI: "end", +    DELETE_EMOJI: "stop", +} + + +class Cog(NamedTuple): +    """Show information about a Cog's name, description and commands.""" + +    name: str +    description: str +    commands: list[Command] + + +log = logging.getLogger(__name__) + + +class HelpQueryNotFound(ValueError): +    """ +    Raised when a HelpSession Query doesn't match a command or cog. + +    Contains the custom attribute of ``possible_matches``. +    Instances of this object contain a dictionary of any command(s) that were close to matching the +    query, where keys are the possible matched command names and values are the likeness match scores. +    """ + +    def __init__(self, arg: str, possible_matches: dict = None): +        super().__init__(arg) +        self.possible_matches = possible_matches + + +class HelpSession: +    """ +    An interactive session for bot and command help output. + +    Expected attributes include: +        * title: str +            The title of the help message. +        * query: Union[discord.ext.commands.Bot, discord.ext.commands.Command] +        * description: str +            The description of the query. +        * pages: list[str] +            A list of the help content split into manageable pages. +        * message: `discord.Message` +            The message object that's showing the help contents. +        * destination: `discord.abc.Messageable` +            Where the help message is to be sent to. +    Cogs can be grouped into custom categories. All cogs with the same category will be displayed +    under a single category name in the help output. Custom categories are defined inside the cogs +    as a class attribute named `category`. A description can also be specified with the attribute +    `category_description`. If a description is not found in at least one cog, the default will be +    the regular description (class docstring) of the first cog found in the category. +    """ + +    def __init__( +        self, +        ctx: Context, +        *command, +        cleanup: bool = False, +        only_can_run: bool = True, +        show_hidden: bool = False, +        max_lines: int = 15 +    ): +        """Creates an instance of the HelpSession class.""" +        self._ctx = ctx +        self._bot = ctx.bot +        self.title = "Command Help" + +        # set the query details for the session +        if command: +            query_str = " ".join(command) +            self.query = self._get_query(query_str) +            self.description = self.query.description or self.query.help +        else: +            self.query = ctx.bot +            self.description = self.query.description +        self.author = ctx.author +        self.destination = ctx.channel + +        # set the config for the session +        self._cleanup = cleanup +        self._only_can_run = only_can_run +        self._show_hidden = show_hidden +        self._max_lines = max_lines + +        # init session states +        self._pages = None +        self._current_page = 0 +        self.message = None +        self._timeout_task = None +        self.reset_timeout() + +    def _get_query(self, query: str) -> Union[Command, Cog]: +        """Attempts to match the provided query with a valid command or cog.""" +        command = self._bot.get_command(query) +        if command: +            return command + +        # Find all cog categories that match. +        cog_matches = [] +        description = None +        for cog in self._bot.cogs.values(): +            if hasattr(cog, "category") and cog.category == query: +                cog_matches.append(cog) +                if hasattr(cog, "category_description"): +                    description = cog.category_description + +        # Try to search by cog name if no categories match. +        if not cog_matches: +            cog = self._bot.cogs.get(query) + +            # Don't consider it a match if the cog has a category. +            if cog and not hasattr(cog, "category"): +                cog_matches = [cog] + +        if cog_matches: +            cog = cog_matches[0] +            cmds = (cog.get_commands() for cog in cog_matches)  # Commands of all cogs + +            return Cog( +                name=cog.category if hasattr(cog, "category") else cog.qualified_name, +                description=description or cog.description, +                commands=tuple(itertools.chain.from_iterable(cmds))  # Flatten the list +            ) + +        self._handle_not_found(query) + +    def _handle_not_found(self, query: str) -> None: +        """ +        Handles when a query does not match a valid command or cog. + +        Will pass on possible close matches along with the `HelpQueryNotFound` exception. +        """ +        # Combine command and cog names +        choices = list(self._bot.all_commands) + list(self._bot.cogs) + +        result = process.extract(query, choices, score_cutoff=90) + +        raise HelpQueryNotFound(f'Query "{query}" not found.', dict(result)) + +    async def timeout(self, seconds: int = 30) -> None: +        """Waits for a set number of seconds, then stops the help session.""" +        await asyncio.sleep(seconds) +        await self.stop() + +    def reset_timeout(self) -> None: +        """Cancels the original timeout task and sets it again from the start.""" +        # cancel original if it exists +        if self._timeout_task: +            if not self._timeout_task.cancelled(): +                self._timeout_task.cancel() + +        # recreate the timeout task +        self._timeout_task = self._bot.loop.create_task(self.timeout()) + +    async def on_reaction_add(self, reaction: Reaction, user: User) -> None: +        """Event handler for when reactions are added on the help message.""" +        # ensure it was the relevant session message +        if reaction.message.id != self.message.id: +            return + +        # ensure it was the session author who reacted +        if user.id != self.author.id: +            return + +        emoji = str(reaction.emoji) + +        # check if valid action +        if emoji not in REACTIONS: +            return + +        self.reset_timeout() + +        # Run relevant action method +        action = getattr(self, f"do_{REACTIONS[emoji]}", None) +        if action: +            await action() + +        # remove the added reaction to prep for re-use +        with suppress(HTTPException): +            await self.message.remove_reaction(reaction, user) + +    async def on_message_delete(self, message: Message) -> None: +        """Closes the help session when the help message is deleted.""" +        if message.id == self.message.id: +            await self.stop() + +    async def prepare(self) -> None: +        """Sets up the help session pages, events, message and reactions.""" +        await self.build_pages() + +        self._bot.add_listener(self.on_reaction_add) +        self._bot.add_listener(self.on_message_delete) + +        await self.update_page() +        self.add_reactions() + +    def add_reactions(self) -> None: +        """Adds the relevant reactions to the help message based on if pagination is required.""" +        # if paginating +        if len(self._pages) > 1: +            for reaction in REACTIONS: +                self._bot.loop.create_task(self.message.add_reaction(reaction)) + +        # if single-page +        else: +            self._bot.loop.create_task(self.message.add_reaction(DELETE_EMOJI)) + +    def _category_key(self, cmd: Command) -> str: +        """ +        Returns a cog name of a given command for use as a key for `sorted` and `groupby`. + +        A zero width space is used as a prefix for results with no cogs to force them last in ordering. +        """ +        if cmd.cog: +            try: +                if cmd.cog.category: +                    return f"**{cmd.cog.category}**" +            except AttributeError: +                pass + +            return f"**{cmd.cog_name}**" +        else: +            return "**\u200bNo Category:**" + +    def _get_command_params(self, cmd: Command) -> str: +        """ +        Returns the command usage signature. + +        This is a custom implementation of `command.signature` in order to format the command +        signature without aliases. +        """ +        results = [] +        for name, param in cmd.clean_params.items(): + +            # if argument has a default value +            if param.default is not param.empty: + +                if isinstance(param.default, str): +                    show_default = param.default +                else: +                    show_default = param.default is not None + +                # if default is not an empty string or None +                if show_default: +                    results.append(f"[{name}={param.default}]") +                else: +                    results.append(f"[{name}]") + +            # if variable length argument +            elif param.kind == param.VAR_POSITIONAL: +                results.append(f"[{name}...]") + +            # if required +            else: +                results.append(f"<{name}>") + +        return f"{cmd.name} {' '.join(results)}" + +    async def build_pages(self) -> None: +        """Builds the list of content pages to be paginated through in the help message, as a list of str.""" +        # Use LinePaginator to restrict embed line height +        paginator = LinePaginator(prefix="", suffix="", max_lines=self._max_lines) + +        # show signature if query is a command +        if isinstance(self.query, commands.Command): +            await self._add_command_signature(paginator) + +        if isinstance(self.query, Cog): +            paginator.add_line(f"**{self.query.name}**") + +        if self.description: +            paginator.add_line(f"*{self.description}*") + +        # list all children commands of the queried object +        if isinstance(self.query, (commands.GroupMixin, Cog)): +            await self._list_child_commands(paginator) + +        self._pages = paginator.pages + +    async def _add_command_signature(self, paginator: LinePaginator) -> None: +        prefix = constants.Client.prefix + +        signature = self._get_command_params(self.query) +        parent = self.query.full_parent_name + " " if self.query.parent else "" +        paginator.add_line(f"**```\n{prefix}{parent}{signature}\n```**") +        aliases = [f"`{alias}`" if not parent else f"`{parent} {alias}`" for alias in self.query.aliases] +        aliases += [f"`{alias}`" for alias in getattr(self.query, "root_aliases", ())] +        aliases = ", ".join(sorted(aliases)) +        if aliases: +            paginator.add_line(f"**Can also use:** {aliases}\n") +        if not await self.query.can_run(self._ctx): +            paginator.add_line("***You cannot run this command.***\n") + +    async def _list_child_commands(self, paginator: LinePaginator) -> None: +        # remove hidden commands if session is not wanting hiddens +        if not self._show_hidden: +            filtered = [c for c in self.query.commands if not c.hidden] +        else: +            filtered = self.query.commands + +        # if after filter there are no commands, finish up +        if not filtered: +            self._pages = paginator.pages +            return + +        if isinstance(self.query, Cog): +            grouped = (("**Commands:**", self.query.commands),) + +        elif isinstance(self.query, commands.Command): +            grouped = (("**Subcommands:**", self.query.commands),) + +        # otherwise sort and organise all commands into categories +        else: +            cat_sort = sorted(filtered, key=self._category_key) +            grouped = itertools.groupby(cat_sort, key=self._category_key) + +        for category, cmds in grouped: +            await self._format_command_category(paginator, category, list(cmds)) + +    async def _format_command_category(self, paginator: LinePaginator, category: str, cmds: list[Command]) -> None: +        cmds = sorted(cmds, key=lambda c: c.name) +        cat_cmds = [] +        for command in cmds: +            cat_cmds += await self._format_command(command) + +        # state var for if the category should be added next +        print_cat = 1 +        new_page = True + +        for details in cat_cmds: + +            # keep details together, paginating early if it won"t fit +            lines_adding = len(details.split("\n")) + print_cat +            if paginator._linecount + lines_adding > self._max_lines: +                paginator._linecount = 0 +                new_page = True +                paginator.close_page() + +                # new page so print category title again +                print_cat = 1 + +            if print_cat: +                if new_page: +                    paginator.add_line("") +                paginator.add_line(category) +                print_cat = 0 + +            paginator.add_line(details) + +    async def _format_command(self, command: Command) -> list[str]: +        # skip if hidden and hide if session is set to +        if command.hidden and not self._show_hidden: +            return [] + +        # Patch to make the !help command work outside of #bot-commands again +        # This probably needs a proper rewrite, but this will make it work in +        # the mean time. +        try: +            can_run = await command.can_run(self._ctx) +        except CheckFailure: +            can_run = False + +        # see if the user can run the command +        strikeout = "" +        if not can_run: +            # skip if we don't show commands they can't run +            if self._only_can_run: +                return [] +            strikeout = "~~" + +        if isinstance(self.query, commands.Command): +            prefix = "" +        else: +            prefix = constants.Client.prefix + +        signature = self._get_command_params(command) +        info = f"{strikeout}**`{prefix}{signature}`**{strikeout}" + +        # handle if the command has no docstring +        short_doc = command.short_doc or "No details provided" +        return [f"{info}\n*{short_doc}*"] + +    def embed_page(self, page_number: int = 0) -> Embed: +        """Returns an Embed with the requested page formatted within.""" +        embed = Embed() + +        if isinstance(self.query, (commands.Command, Cog)) and page_number > 0: +            title = f'Command Help | "{self.query.name}"' +        else: +            title = self.title + +        embed.set_author(name=title, icon_url=constants.Icons.questionmark) +        embed.description = self._pages[page_number] + +        page_count = len(self._pages) +        if page_count > 1: +            embed.set_footer(text=f"Page {self._current_page+1} / {page_count}") + +        return embed + +    async def update_page(self, page_number: int = 0) -> None: +        """Sends the intial message, or changes the existing one to the given page number.""" +        self._current_page = page_number +        embed_page = self.embed_page(page_number) + +        if not self.message: +            self.message = await self.destination.send(embed=embed_page) +        else: +            await self.message.edit(embed=embed_page) + +    @classmethod +    async def start(cls, ctx: Context, *command, **options) -> "HelpSession": +        """ +        Create and begin a help session based on the given command context. + +        Available options kwargs: +            * cleanup: Optional[bool] +                Set to `True` to have the message deleted on session end. Defaults to `False`. +            * only_can_run: Optional[bool] +                Set to `True` to hide commands the user can't run. Defaults to `False`. +            * show_hidden: Optional[bool] +                Set to `True` to include hidden commands. Defaults to `False`. +            * max_lines: Optional[int] +                Sets the max number of lines the paginator will add to a single page. Defaults to 20. +        """ +        session = cls(ctx, *command, **options) +        await session.prepare() + +        return session + +    async def stop(self) -> None: +        """Stops the help session, removes event listeners and attempts to delete the help message.""" +        self._bot.remove_listener(self.on_reaction_add) +        self._bot.remove_listener(self.on_message_delete) + +        # ignore if permission issue, or the message doesn't exist +        with suppress(HTTPException, AttributeError): +            if self._cleanup: +                await self.message.delete() +            else: +                await self.message.clear_reactions() + +    @property +    def is_first_page(self) -> bool: +        """Check if session is currently showing the first page.""" +        return self._current_page == 0 + +    @property +    def is_last_page(self) -> bool: +        """Check if the session is currently showing the last page.""" +        return self._current_page == (len(self._pages)-1) + +    async def do_first(self) -> None: +        """Event that is called when the user requests the first page.""" +        if not self.is_first_page: +            await self.update_page(0) + +    async def do_back(self) -> None: +        """Event that is called when the user requests the previous page.""" +        if not self.is_first_page: +            await self.update_page(self._current_page-1) + +    async def do_next(self) -> None: +        """Event that is called when the user requests the next page.""" +        if not self.is_last_page: +            await self.update_page(self._current_page+1) + +    async def do_end(self) -> None: +        """Event that is called when the user requests the last page.""" +        if not self.is_last_page: +            await self.update_page(len(self._pages)-1) + +    async def do_stop(self) -> None: +        """Event that is called when the user requests to stop the help session.""" +        await self.message.delete() + + +class Help(DiscordCog): +    """Custom Embed Pagination Help feature.""" + +    @commands.command("help") +    async def new_help(self, ctx: Context, *commands) -> None: +        """Shows Command Help.""" +        try: +            await HelpSession.start(ctx, *commands) +        except HelpQueryNotFound as error: +            embed = Embed() +            embed.colour = Colour.red() +            embed.title = str(error) + +            if error.possible_matches: +                matches = "\n".join(error.possible_matches.keys()) +                embed.description = f"**Did you mean:**\n`{matches}`" + +            await ctx.send(embed=embed) + + +def unload(bot: Bot) -> None: +    """ +    Reinstates the original help command. + +    This is run if the cog raises an exception on load, or if the extension is unloaded. +    """ +    bot.remove_command("help") +    bot.add_command(bot._old_help) + + +def setup(bot: Bot) -> None: +    """ +    The setup for the help extension. + +    This is called automatically on `bot.load_extension` being run. +    Stores the original help command instance on the `bot._old_help` attribute for later +    reinstatement, before removing it from the command registry so the new help command can be +    loaded successfully. +    If an exception is raised during the loading of the cog, `unload` will be called in order to +    reinstate the original help command. +    """ +    bot._old_help = bot.get_command("help") +    bot.remove_command("help") + +    try: +        bot.add_cog(Help()) +    except Exception: +        unload(bot) +        raise + + +def teardown(bot: Bot) -> None: +    """ +    The teardown for the help extension. + +    This is called automatically on `bot.unload_extension` being run. +    Calls `unload` in order to reinstate the original help command. +    """ +    unload(bot) diff --git a/bot/exts/core/internal_eval/__init__.py b/bot/exts/core/internal_eval/__init__.py new file mode 100644 index 00000000..695fa74d --- /dev/null +++ b/bot/exts/core/internal_eval/__init__.py @@ -0,0 +1,10 @@ +from bot.bot import Bot + + +def setup(bot: Bot) -> None: +    """Set up the Internal Eval extension.""" +    # Import the Cog at runtime to prevent side effects like defining +    # RedisCache instances too early. +    from ._internal_eval import InternalEval + +    bot.add_cog(InternalEval(bot)) diff --git a/bot/exts/core/internal_eval/_helpers.py b/bot/exts/core/internal_eval/_helpers.py new file mode 100644 index 00000000..5b2f8f5d --- /dev/null +++ b/bot/exts/core/internal_eval/_helpers.py @@ -0,0 +1,248 @@ +import ast +import collections +import contextlib +import functools +import inspect +import io +import logging +import sys +import traceback +import types +from typing import Any, Optional, Union + +log = logging.getLogger(__name__) + +# A type alias to annotate the tuples returned from `sys.exc_info()` +ExcInfo = tuple[type[Exception], Exception, types.TracebackType] +Namespace = dict[str, Any] + +# This will be used as an coroutine function wrapper for the code +# to be evaluated. The wrapper contains one `pass` statement which +# will be replaced with `ast` with the code that we want to have +# evaluated. +# The function redirects output and captures exceptions that were +# raised in the code we evaluate. The latter is used to provide a +# meaningful traceback to the end user. +EVAL_WRAPPER = """ +async def _eval_wrapper_function(): +    try: +        with contextlib.redirect_stdout(_eval_context.stdout): +            pass +        if '_value_last_expression' in locals(): +            if inspect.isawaitable(_value_last_expression): +                _value_last_expression = await _value_last_expression +            _eval_context._value_last_expression = _value_last_expression +        else: +            _eval_context._value_last_expression = None +    except Exception: +        _eval_context.exc_info = sys.exc_info() +    finally: +        _eval_context.locals = locals() +_eval_context.function = _eval_wrapper_function +""" +INTERNAL_EVAL_FRAMENAME = "<internal eval>" +EVAL_WRAPPER_FUNCTION_FRAMENAME = "_eval_wrapper_function" + + +def format_internal_eval_exception(exc_info: ExcInfo, code: str) -> str: +    """Format an exception caught while evaluation code by inserting lines.""" +    exc_type, exc_value, tb = exc_info +    stack_summary = traceback.StackSummary.extract(traceback.walk_tb(tb)) +    code = code.split("\n") + +    output = ["Traceback (most recent call last):"] +    for frame in stack_summary: +        if frame.filename == INTERNAL_EVAL_FRAMENAME: +            line = code[frame.lineno - 1].lstrip() + +            if frame.name == EVAL_WRAPPER_FUNCTION_FRAMENAME: +                name = INTERNAL_EVAL_FRAMENAME +            else: +                name = frame.name +        else: +            line = frame.line +            name = frame.name + +        output.append( +            f'  File "{frame.filename}", line {frame.lineno}, in {name}\n' +            f"    {line}" +        ) + +    output.extend(traceback.format_exception_only(exc_type, exc_value)) +    return "\n".join(output) + + +class EvalContext: +    """ +    Represents the current `internal eval` context. + +    The context remembers names set during earlier runs of `internal eval`. To +    clear the context, use the `.internal clear` command. +    """ + +    def __init__(self, context_vars: Namespace, local_vars: Namespace): +        self._locals = dict(local_vars) +        self.context_vars = dict(context_vars) + +        self.stdout = io.StringIO() +        self._value_last_expression = None +        self.exc_info = None +        self.code = "" +        self.function = None +        self.eval_tree = None + +    @property +    def dependencies(self) -> dict[str, Any]: +        """ +        Return a mapping of the dependencies for the wrapper function. + +        By using a property descriptor, the mapping can't be accidentally +        mutated during evaluation. This ensures the dependencies are always +        available. +        """ +        return { +            "print": functools.partial(print, file=self.stdout), +            "contextlib": contextlib, +            "inspect": inspect, +            "sys": sys, +            "_eval_context": self, +            "_": self._value_last_expression, +        } + +    @property +    def locals(self) -> dict[str, Any]: +        """Return a mapping of names->values needed for evaluation.""" +        return {**collections.ChainMap(self.dependencies, self.context_vars, self._locals)} + +    @locals.setter +    def locals(self, locals_: dict[str, Any]) -> None: +        """Update the contextual mapping of names to values.""" +        log.trace(f"Updating {self._locals} with {locals_}") +        self._locals.update(locals_) + +    def prepare_eval(self, code: str) -> Optional[str]: +        """Prepare an evaluation by processing the code and setting up the context.""" +        self.code = code + +        if not self.code: +            log.debug("No code was attached to the evaluation command") +            return "[No code detected]" + +        try: +            code_tree = ast.parse(code, filename=INTERNAL_EVAL_FRAMENAME) +        except SyntaxError: +            log.debug("Got a SyntaxError while parsing the eval code") +            return "".join(traceback.format_exception(*sys.exc_info(), limit=0)) + +        log.trace("Parsing the AST to see if there's a trailing expression we need to capture") +        code_tree = CaptureLastExpression(code_tree).capture() + +        log.trace("Wrapping the AST in the AST of the wrapper coroutine") +        eval_tree = WrapEvalCodeTree(code_tree).wrap() + +        self.eval_tree = eval_tree +        return None + +    async def run_eval(self) -> Namespace: +        """Run the evaluation and return the updated locals.""" +        log.trace("Compiling the AST to bytecode using `exec` mode") +        compiled_code = compile(self.eval_tree, filename=INTERNAL_EVAL_FRAMENAME, mode="exec") + +        log.trace("Executing the compiled code with the desired namespace environment") +        exec(compiled_code, self.locals)  # noqa: B102,S102 + +        log.trace("Awaiting the created evaluation wrapper coroutine.") +        await self.function() + +        log.trace("Returning the updated captured locals.") +        return self._locals + +    def format_output(self) -> str: +        """Format the output of the most recent evaluation.""" +        output = [] + +        log.trace(f"Getting output from stdout `{id(self.stdout)}`") +        stdout_text = self.stdout.getvalue() +        if stdout_text: +            log.trace("Appending output captured from stdout/print") +            output.append(stdout_text) + +        if self._value_last_expression is not None: +            log.trace("Appending the output of a captured trialing expression") +            output.append(f"[Captured] {self._value_last_expression!r}") + +        if self.exc_info: +            log.trace("Appending exception information") +            output.append(format_internal_eval_exception(self.exc_info, self.code)) + +        log.trace(f"Generated output: {output!r}") +        return "\n".join(output) or "[No output]" + + +class WrapEvalCodeTree(ast.NodeTransformer): +    """Wraps the AST of eval code with the wrapper function.""" + +    def __init__(self, eval_code_tree: ast.AST, *args, **kwargs): +        super().__init__(*args, **kwargs) +        self.eval_code_tree = eval_code_tree + +        # To avoid mutable aliasing, parse the WRAPPER_FUNC for each wrapping +        self.wrapper = ast.parse(EVAL_WRAPPER, filename=INTERNAL_EVAL_FRAMENAME) + +    def wrap(self) -> ast.AST: +        """Wrap the tree of the code by the tree of the wrapper function.""" +        new_tree = self.visit(self.wrapper) +        return ast.fix_missing_locations(new_tree) + +    def visit_Pass(self, node: ast.Pass) -> list[ast.AST]:  # noqa: N802 +        """ +        Replace the `_ast.Pass` node in the wrapper function by the eval AST. + +        This method works on the assumption that there's a single `pass` +        statement in the wrapper function. +        """ +        return list(ast.iter_child_nodes(self.eval_code_tree)) + + +class CaptureLastExpression(ast.NodeTransformer): +    """Captures the return value from a loose expression.""" + +    def __init__(self, tree: ast.AST, *args, **kwargs): +        super().__init__(*args, **kwargs) +        self.tree = tree +        self.last_node = list(ast.iter_child_nodes(tree))[-1] + +    def visit_Expr(self, node: ast.Expr) -> Union[ast.Expr, ast.Assign]:  # noqa: N802 +        """ +        Replace the Expr node that is last child node of Module with an assignment. + +        We use an assignment to capture the value of the last node, if it's a loose +        Expr node. Normally, the value of an Expr node is lost, meaning we don't get +        the output of such a last "loose" expression. By assigning it a name, we can +        retrieve it for our output. +        """ +        if node is not self.last_node: +            return node + +        log.trace("Found a trailing last expression in the evaluation code") + +        log.trace("Creating assignment statement with trailing expression as the right-hand side") +        right_hand_side = list(ast.iter_child_nodes(node))[0] + +        assignment = ast.Assign( +            targets=[ast.Name(id='_value_last_expression', ctx=ast.Store())], +            value=right_hand_side, +            lineno=node.lineno, +            col_offset=0, +        ) +        ast.fix_missing_locations(assignment) +        return assignment + +    def capture(self) -> ast.AST: +        """Capture the value of the last expression with an assignment.""" +        if not isinstance(self.last_node, ast.Expr): +            # We only have to replace a node if the very last node is an Expr node +            return self.tree + +        new_tree = self.visit(self.tree) +        return ast.fix_missing_locations(new_tree) diff --git a/bot/exts/core/internal_eval/_internal_eval.py b/bot/exts/core/internal_eval/_internal_eval.py new file mode 100644 index 00000000..4f6b4321 --- /dev/null +++ b/bot/exts/core/internal_eval/_internal_eval.py @@ -0,0 +1,179 @@ +import logging +import re +import textwrap +from typing import Optional + +import discord +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Client, Roles +from bot.utils.decorators import with_role +from bot.utils.extensions import invoke_help_command +from ._helpers import EvalContext + +__all__ = ["InternalEval"] + +log = logging.getLogger(__name__) + +FORMATTED_CODE_REGEX = re.compile( +    r"(?P<delim>(?P<block>```)|``?)"        # code delimiter: 1-3 backticks; (?P=block) only matches if it's a block +    r"(?(block)(?:(?P<lang>[a-z]+)\n)?)"    # if we're in a block, match optional language (only letters plus newline) +    r"(?:[ \t]*\n)*"                        # any blank (empty or tabs/spaces only) lines before the code +    r"(?P<code>.*?)"                        # extract all code inside the markup +    r"\s*"                                  # any more whitespace before the end of the code markup +    r"(?P=delim)",                          # match the exact same delimiter from the start again +    re.DOTALL | re.IGNORECASE               # "." also matches newlines, case insensitive +) + +RAW_CODE_REGEX = re.compile( +    r"^(?:[ \t]*\n)*"                       # any blank (empty or tabs/spaces only) lines before the code +    r"(?P<code>.*?)"                        # extract all the rest as code +    r"\s*$",                                # any trailing whitespace until the end of the string +    re.DOTALL                               # "." also matches newlines +) + + +class InternalEval(commands.Cog): +    """Top secret code evaluation for admins and owners.""" + +    def __init__(self, bot: Bot): +        self.bot = bot +        self.locals = {} + +        if Client.debug: +            self.internal_group.add_check(commands.is_owner().predicate) + +    @staticmethod +    def shorten_output( +            output: str, +            max_length: int = 1900, +            placeholder: str = "\n[output truncated]" +    ) -> str: +        """ +        Shorten the `output` so it's shorter than `max_length`. + +        There are three tactics for this, tried in the following order: +        - Shorten the output on a line-by-line basis +        - Shorten the output on any whitespace character +        - Shorten the output solely on character count +        """ +        max_length = max_length - len(placeholder) + +        shortened_output = [] +        char_count = 0 +        for line in output.split("\n"): +            if char_count + len(line) > max_length: +                break +            shortened_output.append(line) +            char_count += len(line) + 1  # account for (possible) line ending + +        if shortened_output: +            shortened_output.append(placeholder) +            return "\n".join(shortened_output) + +        shortened_output = textwrap.shorten(output, width=max_length, placeholder=placeholder) + +        if shortened_output.strip() == placeholder.strip(): +            # `textwrap` was unable to find whitespace to shorten on, so it has +            # reduced the output to just the placeholder. Let's shorten based on +            # characters instead. +            shortened_output = output[:max_length] + placeholder + +        return shortened_output + +    async def _upload_output(self, output: str) -> Optional[str]: +        """Upload `internal eval` output to our pastebin and return the url.""" +        try: +            async with self.bot.http_session.post( +                "https://paste.pythondiscord.com/documents", data=output, raise_for_status=True +            ) as resp: +                data = await resp.json() + +            if "key" in data: +                return f"https://paste.pythondiscord.com/{data['key']}" +        except Exception: +            # 400 (Bad Request) means there are too many characters +            log.exception("Failed to upload `internal eval` output to paste service!") + +    async def _send_output(self, ctx: commands.Context, output: str) -> None: +        """Send the `internal eval` output to the command invocation context.""" +        upload_message = "" +        if len(output) >= 1980: +            # The output is too long, let's truncate it for in-channel output and +            # upload the complete output to the paste service. +            url = await self._upload_output(output) + +            if url: +                upload_message = f"\nFull output here: {url}" +            else: +                upload_message = "\n:warning: Failed to upload full output!" + +            output = self.shorten_output(output) + +        await ctx.send(f"```py\n{output}\n```{upload_message}") + +    async def _eval(self, ctx: commands.Context, code: str) -> None: +        """Evaluate the `code` in the current evaluation context.""" +        context_vars = { +            "message": ctx.message, +            "author": ctx.author, +            "channel": ctx.channel, +            "guild": ctx.guild, +            "ctx": ctx, +            "self": self, +            "bot": self.bot, +            "discord": discord, +        } + +        eval_context = EvalContext(context_vars, self.locals) + +        log.trace("Preparing the evaluation by parsing the AST of the code") +        error = eval_context.prepare_eval(code) + +        if error: +            log.trace("The code can't be evaluated due to an error") +            await ctx.send(f"```py\n{error}\n```") +            return + +        log.trace("Evaluate the AST we've generated for the evaluation") +        new_locals = await eval_context.run_eval() + +        log.trace("Updating locals with those set during evaluation") +        self.locals.update(new_locals) + +        log.trace("Sending the formatted output back to the context") +        await self._send_output(ctx, eval_context.format_output()) + +    @commands.group(name="internal", aliases=("int",)) +    @with_role(Roles.admin) +    async def internal_group(self, ctx: commands.Context) -> None: +        """Internal commands. Top secret!""" +        if not ctx.invoked_subcommand: +            await invoke_help_command(ctx) + +    @internal_group.command(name="eval", aliases=("e",)) +    @with_role(Roles.admin) +    async def eval(self, ctx: commands.Context, *, code: str) -> None: +        """Run eval in a REPL-like format.""" +        if match := list(FORMATTED_CODE_REGEX.finditer(code)): +            blocks = [block for block in match if block.group("block")] + +            if len(blocks) > 1: +                code = "\n".join(block.group("code") for block in blocks) +            else: +                match = match[0] if len(blocks) == 0 else blocks[0] +                code, block, lang, delim = match.group("code", "block", "lang", "delim") + +        else: +            code = RAW_CODE_REGEX.fullmatch(code).group("code") + +        code = textwrap.dedent(code) +        await self._eval(ctx, code) + +    @internal_group.command(name="reset", aliases=("clear", "exit", "r", "c")) +    @with_role(Roles.admin) +    async def reset(self, ctx: commands.Context) -> None: +        """Reset the context and locals of the eval session.""" +        self.locals = {} +        await ctx.send("The evaluation context was reset.") diff --git a/bot/exts/core/ping.py b/bot/exts/core/ping.py new file mode 100644 index 00000000..6be78117 --- /dev/null +++ b/bot/exts/core/ping.py @@ -0,0 +1,45 @@ +import arrow +from dateutil.relativedelta import relativedelta +from discord import Embed +from discord.ext import commands + +from bot import start_time +from bot.bot import Bot +from bot.constants import Colours + + +class Ping(commands.Cog): +    """Get info about the bot's ping and uptime.""" + +    def __init__(self, bot: Bot): +        self.bot = bot + +    @commands.command(name="ping") +    async def ping(self, ctx: commands.Context) -> None: +        """Ping the bot to see its latency and state.""" +        embed = Embed( +            title=":ping_pong: Pong!", +            colour=Colours.bright_green, +            description=f"Gateway Latency: {round(self.bot.latency * 1000)}ms", +        ) + +        await ctx.send(embed=embed) + +    # Originally made in 70d2170a0a6594561d59c7d080c4280f1ebcd70b by lemon & gdude2002 +    @commands.command(name="uptime") +    async def uptime(self, ctx: commands.Context) -> None: +        """Get the current uptime of the bot.""" +        difference = relativedelta(start_time - arrow.utcnow()) +        uptime_string = start_time.shift( +            seconds=-difference.seconds, +            minutes=-difference.minutes, +            hours=-difference.hours, +            days=-difference.days +        ).humanize() + +        await ctx.send(f"I started up {uptime_string}.") + + +def setup(bot: Bot) -> None: +    """Load the Ping cog.""" +    bot.add_cog(Ping(bot)) diff --git a/bot/exts/core/source.py b/bot/exts/core/source.py new file mode 100644 index 00000000..7572ce51 --- /dev/null +++ b/bot/exts/core/source.py @@ -0,0 +1,85 @@ +import inspect +from pathlib import Path +from typing import Optional + +from discord import Embed +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Source +from bot.utils.converters import SourceConverter, SourceType + + +class BotSource(commands.Cog): +    """Displays information about the bot's source code.""" + +    @commands.command(name="source", aliases=("src",)) +    async def source_command(self, ctx: commands.Context, *, source_item: SourceConverter = None) -> None: +        """Display information and a GitHub link to the source code of a command, tag, or cog.""" +        if not source_item: +            embed = Embed(title="Sir Lancebot's GitHub Repository") +            embed.add_field(name="Repository", value=f"[Go to GitHub]({Source.github})") +            embed.set_thumbnail(url=Source.github_avatar_url) +            await ctx.send(embed=embed) +            return + +        embed = await self.build_embed(source_item) +        await ctx.send(embed=embed) + +    def get_source_link(self, source_item: SourceType) -> tuple[str, str, Optional[int]]: +        """ +        Build GitHub link of source item, return this link, file location and first line number. + +        Raise BadArgument if `source_item` is a dynamically-created object (e.g. via internal eval). +        """ +        if isinstance(source_item, commands.Command): +            callback = inspect.unwrap(source_item.callback) +            src = callback.__code__ +            filename = src.co_filename +        else: +            src = type(source_item) +            try: +                filename = inspect.getsourcefile(src) +            except TypeError: +                raise commands.BadArgument("Cannot get source for a dynamically-created object.") + +        if not isinstance(source_item, str): +            try: +                lines, first_line_no = inspect.getsourcelines(src) +            except OSError: +                raise commands.BadArgument("Cannot get source for a dynamically-created object.") + +            lines_extension = f"#L{first_line_no}-L{first_line_no+len(lines)-1}" +        else: +            first_line_no = None +            lines_extension = "" + +        file_location = Path(filename).relative_to(Path.cwd()).as_posix() + +        url = f"{Source.github}/blob/main/{file_location}{lines_extension}" + +        return url, file_location, first_line_no or None + +    async def build_embed(self, source_object: SourceType) -> Optional[Embed]: +        """Build embed based on source object.""" +        url, location, first_line = self.get_source_link(source_object) + +        if isinstance(source_object, commands.Command): +            description = source_object.short_doc +            title = f"Command: {source_object.qualified_name}" +        else: +            title = f"Cog: {source_object.qualified_name}" +            description = source_object.description.splitlines()[0] + +        embed = Embed(title=title, description=description) +        embed.set_thumbnail(url=Source.github_avatar_url) +        embed.add_field(name="Source Code", value=f"[Go to GitHub]({url})") +        line_text = f":{first_line}" if first_line else "" +        embed.set_footer(text=f"{location}{line_text}") + +        return embed + + +def setup(bot: Bot) -> None: +    """Load the BotSource cog.""" +    bot.add_cog(BotSource()) | 
