diff options
| -rw-r--r-- | bot/cogs/help.py | 722 | ||||
| -rw-r--r-- | bot/cogs/wolfram.py | 8 | ||||
| -rw-r--r-- | bot/decorators.py | 29 | 
3 files changed, 289 insertions, 470 deletions
| diff --git a/bot/cogs/help.py b/bot/cogs/help.py index 744722220..f020a1725 100644 --- a/bot/cogs/help.py +++ b/bot/cogs/help.py @@ -1,34 +1,48 @@ -import asyncio  import itertools +import logging +from asyncio import TimeoutError  from collections import namedtuple  from contextlib import suppress -from typing import Union +from typing import List -from discord import Colour, Embed, HTTPException, Message, Reaction, User -from discord.ext import commands -from discord.ext.commands import CheckFailure, Cog as DiscordCog, Command, Context +from discord import Colour, Embed, NotFound, Member, Message, Reaction, User +from discord.ext.commands import Bot, Cog, Command, Context, Group, HelpCommand  from fuzzywuzzy import fuzz, process  from bot import constants -from bot.bot import Bot  from bot.constants import Channels, Emojis, STAFF_ROLES  from bot.decorators import redirect_output -from bot.pagination import ( -    FIRST_EMOJI, LAST_EMOJI, -    LEFT_EMOJI, LinePaginator, RIGHT_EMOJI, -) +from bot.pagination import LinePaginator +log = logging.getLogger(__name__) + +COMMANDS_PER_PAGE = 8  DELETE_EMOJI = Emojis.trashcan +PREFIX = constants.Bot.prefix + +Category = namedtuple("Category", ["name", "description", "cogs"]) + + +async def help_cleanup(bot: Bot, author: Member, message: Message) -> None: +    """ +    Runs the cleanup for the help command. + +    Adds the :trashcan: reaction that, when clicked, will delete the help message. +    After a 300 second timeout, the reaction will be removed. +    """ +    def check(r: Reaction, u: User) -> bool: +        """Checks the reaction is :trashcan:, the author is original author and messages are the same.""" +        return str(r) == DELETE_EMOJI and u.id == author.id and r.message.id == message.id -REACTIONS = { -    FIRST_EMOJI: 'first', -    LEFT_EMOJI: 'back', -    RIGHT_EMOJI: 'next', -    LAST_EMOJI: 'end', -    DELETE_EMOJI: 'stop', -} +    await message.add_reaction(DELETE_EMOJI) -Cog = namedtuple('Cog', ['name', 'description', 'commands']) +    try: +        await bot.wait_for("reaction_add", check=check, timeout=300) +        await message.delete() +    except TimeoutError: +        await message.remove_reaction(DELETE_EMOJI, bot.user) +    except NotFound: +        pass  class HelpQueryNotFound(ValueError): @@ -46,22 +60,9 @@ class HelpQueryNotFound(ValueError):          self.possible_matches = possible_matches -class HelpSession: +class CustomHelpCommand(HelpCommand):      """ -    An interactive session for bot and command help output. - -    Expected attributes include: -        * title: str -            The title of the help message. -        * query: Union[discord.ext.commands.Bot, discord.ext.commands.Command] -        * description: str -            The description of the query. -        * pages: list[str] -            A list of the help content split into manageable pages. -        * message: `discord.Message` -            The message object that's showing the help contents. -        * destination: `discord.abc.Messageable` -            Where the help message is to be sent to. +    An interactive instance for the bot help command.      Cogs can be grouped into custom categories. All cogs with the same category will be displayed      under a single category name in the help output. Custom categories are defined inside the cogs @@ -70,499 +71,304 @@ class HelpSession:      the regular description (class docstring) of the first cog found in the category.      """ -    def __init__( -        self, -        ctx: Context, -        *command, -        cleanup: bool = False, -        only_can_run: bool = True, -        show_hidden: bool = False, -        max_lines: int = 15 -    ): -        """Creates an instance of the HelpSession class.""" -        self._ctx = ctx -        self._bot = ctx.bot -        self.title = "Command Help" - -        # set the query details for the session -        if command: -            query_str = ' '.join(command) -            self.query = self._get_query(query_str) -            self.description = self.query.description or self.query.help -        else: -            self.query = ctx.bot -            self.description = self.query.description -        self.author = ctx.author -        self.destination = ctx.channel - -        # set the config for the session -        self._cleanup = cleanup -        self._only_can_run = only_can_run -        self._show_hidden = show_hidden -        self._max_lines = max_lines - -        # init session states -        self._pages = None -        self._current_page = 0 -        self.message = None -        self._timeout_task = None -        self.reset_timeout() - -    def _get_query(self, query: str) -> Union[Command, Cog]: +    def __init__(self): +        super().__init__(command_attrs={"help": "Shows help for bot commands"}) + +    @redirect_output(destination_channel=Channels.bot_commands, bypass_roles=STAFF_ROLES) +    async def prepare_help_command(self, ctx: Context, command: str = None) -> None: +        """Adjust context to redirect to a new channel if required.""" +        self.context = ctx + +    async def command_callback(self, ctx: Context, *, command: str = None) -> None:          """Attempts to match the provided query with a valid command or cog.""" -        command = self._bot.get_command(query) -        if command: -            return command +        # the only reason we need to tamper with this is because d.py does not support "categories", +        # so we need to deal with them ourselves. + +        # handle any command redirection and adjust context channel accordingly. +        await self.prepare_help_command(ctx, command=command) +        bot = ctx.bot + +        if command is None: +            # quick and easy, send bot help if command is none +            mapping = self.get_bot_mapping() +            await self.send_bot_help(mapping) +            return -        # Find all cog categories that match.          cog_matches = []          description = None -        for cog in self._bot.cogs.values(): -            if hasattr(cog, "category") and cog.category == query: +        for cog in bot.cogs.values(): +            if hasattr(cog, "category") and cog.category == command:                  cog_matches.append(cog)                  if hasattr(cog, "category_description"):                      description = cog.category_description -        # Try to search by cog name if no categories match. -        if not cog_matches: -            cog = self._bot.cogs.get(query) - -            # Don't consider it a match if the cog has a category. -            if cog and not hasattr(cog, "category"): -                cog_matches = [cog] -          if cog_matches: -            cog = cog_matches[0] -            cmds = (cog.get_commands() for cog in cog_matches)  # Commands of all cogs - -            return Cog( -                name=cog.category if hasattr(cog, "category") else cog.qualified_name, -                description=description or cog.description, -                commands=tuple(itertools.chain.from_iterable(cmds))  # Flatten the list -            ) - -        self._handle_not_found(query) - -    def _handle_not_found(self, query: str) -> None: -        """ -        Handles when a query does not match a valid command or cog. - -        Will pass on possible close matches along with the `HelpQueryNotFound` exception. -        """ -        # Combine command and cog names -        choices = list(self._bot.all_commands) + list(self._bot.cogs) - -        result = process.extractBests(query, choices, scorer=fuzz.ratio, score_cutoff=90) - -        raise HelpQueryNotFound(f'Query "{query}" not found.', dict(result)) - -    async def timeout(self, seconds: int = 30) -> None: -        """Waits for a set number of seconds, then stops the help session.""" -        await asyncio.sleep(seconds) -        await self.stop() - -    def reset_timeout(self) -> None: -        """Cancels the original timeout task and sets it again from the start.""" -        # cancel original if it exists -        if self._timeout_task: -            if not self._timeout_task.cancelled(): -                self._timeout_task.cancel() - -        # recreate the timeout task -        self._timeout_task = self._bot.loop.create_task(self.timeout()) - -    async def on_reaction_add(self, reaction: Reaction, user: User) -> None: -        """Event handler for when reactions are added on the help message.""" -        # ensure it was the relevant session message -        if reaction.message.id != self.message.id: -            return - -        # ensure it was the session author who reacted -        if user.id != self.author.id: +            category = Category(name=command, description=description, cogs=cog_matches) +            await self.send_category_help(category)              return -        emoji = str(reaction.emoji) - -        # check if valid action -        if emoji not in REACTIONS: -            return - -        self.reset_timeout() - -        # Run relevant action method -        action = getattr(self, f'do_{REACTIONS[emoji]}', None) -        if action: -            await action() +        # it's either a cog, group, command or subcommand, let super deal with it +        await super().command_callback(ctx, command=command) -        # remove the added reaction to prep for re-use -        with suppress(HTTPException): -            await self.message.remove_reaction(reaction, user) +    async def get_all_help_choices(self) -> set: +        """ +        Get all the possible options for getting help in the bot. -    async def on_message_delete(self, message: Message) -> None: -        """Closes the help session when the help message is deleted.""" -        if message.id == self.message.id: -            await self.stop() +        This will only display commands the author has permission to run. -    async def prepare(self) -> None: -        """Sets up the help session pages, events, message and reactions.""" -        # create paginated content -        await self.build_pages() +        These include: +        - Category names +        - Cog names +        - Group command names (and aliases) +        - Command names (and aliases) +        - Subcommand names (with parent group and aliases for subcommand, but not including aliases for group) -        # setup listeners -        self._bot.add_listener(self.on_reaction_add) -        self._bot.add_listener(self.on_message_delete) +        Options and choices are case sensitive. +        """ +        # first get all commands including subcommands and full command name aliases +        choices = set() +        for c in await self.filter_commands(self.context.bot.walk_commands()): +            # the the command or group name +            choices.add(str(c)) + +            if isinstance(c, Command): +                # all aliases if it's just a command +                choices.update(c.aliases) +            else: +                # otherwise we need to add the parent name in +                choices.update(f"{c.full_parent_name} {a}" for a in c.aliases) -        # Send the help message -        await self.update_page() -        self.add_reactions() +        # all cog names +        choices.update(self.context.bot.cogs) -    def add_reactions(self) -> None: -        """Adds the relevant reactions to the help message based on if pagination is required.""" -        # if paginating -        if len(self._pages) > 1: -            for reaction in REACTIONS: -                self._bot.loop.create_task(self.message.add_reaction(reaction)) +        # all category names +        choices.update(n.category for n in self.context.bot.cogs.values() if hasattr(n, "category")) +        return choices -        # if single-page -        else: -            self._bot.loop.create_task(self.message.add_reaction(DELETE_EMOJI)) - -    def _category_key(self, cmd: Command) -> str: +    async def command_not_found(self, string: str) -> "HelpQueryNotFound":          """ -        Returns a cog name of a given command for use as a key for `sorted` and `groupby`. +        Handles when a query does not match a valid command, group, cog or category. -        A zero width space is used as a prefix for results with no cogs to force them last in ordering. +        Will return an instance of the `HelpQueryNotFound` exception with the error message and possible matches.          """ -        if cmd.cog: -            try: -                if cmd.cog.category: -                    return f'**{cmd.cog.category}**' -            except AttributeError: -                pass +        choices = await self.get_all_help_choices() +        result = process.extractBests(string, choices, scorer=fuzz.ratio, score_cutoff=80) -            return f'**{cmd.cog_name}**' -        else: -            return "**\u200bNo Category:**" +        return HelpQueryNotFound(f'Query "{string}" not found.', dict(result)) -    def _get_command_params(self, cmd: Command) -> str: +    async def subcommand_not_found(self, command: Command, string: str) -> "HelpQueryNotFound":          """ -        Returns the command usage signature. +        Redirects the error to `command_not_found`. -        This is a custom implementation of `command.signature` in order to format the command -        signature without aliases. +        `command_not_found` deals with searching and getting best choices for both commands and subcommands.          """ -        results = [] -        for name, param in cmd.clean_params.items(): - -            # if argument has a default value -            if param.default is not param.empty: - -                if isinstance(param.default, str): -                    show_default = param.default -                else: -                    show_default = param.default is not None - -                # if default is not an empty string or None -                if show_default: -                    results.append(f'[{name}={param.default}]') -                else: -                    results.append(f'[{name}]') - -            # if variable length argument -            elif param.kind == param.VAR_POSITIONAL: -                results.append(f'[{name}...]') - -            # if required -            else: -                results.append(f'<{name}>') - -        return f"{cmd.name} {' '.join(results)}" - -    async def build_pages(self) -> None: -        """Builds the list of content pages to be paginated through in the help message, as a list of str.""" -        # Use LinePaginator to restrict embed line height -        paginator = LinePaginator(prefix='', suffix='', max_lines=self._max_lines) - -        prefix = constants.Bot.prefix +        return await self.command_not_found(f"{command.qualified_name} {string}") -        # show signature if query is a command -        if isinstance(self.query, commands.Command): -            signature = self._get_command_params(self.query) -            parent = self.query.full_parent_name + ' ' if self.query.parent else '' -            paginator.add_line(f'**```{prefix}{parent}{signature}```**') +    async def send_error_message(self, error: HelpQueryNotFound) -> None: +        """Send the error message to the channel.""" +        embed = Embed(colour=Colour.red(), title=str(error)) -            # show command aliases -            aliases = ', '.join(f'`{a}`' for a in self.query.aliases) -            if aliases: -                paginator.add_line(f'**Can also use:** {aliases}\n') +        if getattr(error, "possible_matches", None): +            matches = "\n".join(f"`{n}`" for n in error.possible_matches) +            embed.description = f"**Did you mean:**\n{matches}" -            if not await self.query.can_run(self._ctx): -                paginator.add_line('***You cannot run this command.***\n') +        await self.context.send(embed=embed) -        # show name if query is a cog -        if isinstance(self.query, Cog): -            paginator.add_line(f'**{self.query.name}**') - -        if self.description: -            paginator.add_line(f'*{self.description}*') - -        # list all children commands of the queried object -        if isinstance(self.query, (commands.GroupMixin, Cog)): - -            # remove hidden commands if session is not wanting hiddens -            if not self._show_hidden: -                filtered = [c for c in self.query.commands if not c.hidden] -            else: -                filtered = self.query.commands - -            # if after filter there are no commands, finish up -            if not filtered: -                self._pages = paginator.pages -                return - -            # set category to Commands if cog -            if isinstance(self.query, Cog): -                grouped = (('**Commands:**', self.query.commands),) - -            # set category to Subcommands if command -            elif isinstance(self.query, commands.Command): -                grouped = (('**Subcommands:**', self.query.commands),) - -                # don't show prefix for subcommands -                prefix = '' - -            # otherwise sort and organise all commands into categories -            else: -                cat_sort = sorted(filtered, key=self._category_key) -                grouped = itertools.groupby(cat_sort, key=self._category_key) - -            # process each category -            for category, cmds in grouped: -                cmds = sorted(cmds, key=lambda c: c.name) - -                # if there are no commands, skip category -                if len(cmds) == 0: -                    continue - -                cat_cmds = [] - -                # format details for each child command -                for command in cmds: +    async def command_formatting(self, command: Command) -> Embed: +        """ +        Takes a command and turns it into an embed. -                    # skip if hidden and hide if session is set to -                    if command.hidden and not self._show_hidden: -                        continue +        It will add an author, command signature + help, aliases and a note if the user can't run the command. +        """ +        embed = Embed() +        embed.set_author(name="Command Help", icon_url=constants.Icons.questionmark) -                    # see if the user can run the command -                    strikeout = '' +        parent = command.full_parent_name -                    # Patch to make the !help command work outside of #bot-commands again -                    # This probably needs a proper rewrite, but this will make it work in -                    # the mean time. -                    try: -                        can_run = await command.can_run(self._ctx) -                    except CheckFailure: -                        can_run = False +        name = str(command) if not parent else f"{parent} {command.name}" +        fmt = f"**```{PREFIX}{name} {command.signature}```**\n" -                    if not can_run: -                        # skip if we don't show commands they can't run -                        if self._only_can_run: -                            continue -                        strikeout = '~~' +        # show command aliases +        aliases = ", ".join(f"`{a}`" if not parent else f"`{parent} {a}`" for a in command.aliases) +        if aliases: +            fmt += f"**Can also use:** {aliases}\n\n" -                    signature = self._get_command_params(command) -                    info = f"{strikeout}**`{prefix}{signature}`**{strikeout}" +        # check if the user is allowed to run this command +        if not await command.can_run(self.context): +            fmt += "***You cannot run this command.***\n\n" -                    # handle if the command has no docstring -                    if command.short_doc: -                        cat_cmds.append(f'{info}\n*{command.short_doc}*') -                    else: -                        cat_cmds.append(f'{info}\n*No details provided.*') +        fmt += f"*{command.help or 'No details provided.'}*\n" +        embed.description = fmt -                # state var for if the category should be added next -                print_cat = 1 -                new_page = True +        return embed -                for details in cat_cmds: +    async def send_command_help(self, command: Command) -> None: +        """Send help for a single command.""" +        embed = await self.command_formatting(command) +        message = await self.context.send(embed=embed) +        await help_cleanup(self.context.bot, self.context.author, message) + +    @staticmethod +    def get_commands_brief_details(commands_: List[Command]) -> str: +        """Formats the prefix, command name and signature, and short doc for an iterable of commands.""" +        details = "" +        for c in commands_: +            signature = f" {c.signature}" if c.signature else "" +            details += f"\n**`{PREFIX}{c.qualified_name}{signature}`**\n*{c.short_doc or 'No details provided'}*" + +        return details + +    async def send_group_help(self, group: Group) -> None: +        """Sends help for a group command.""" +        subcommands = group.commands + +        if len(subcommands) == 0: +            # no subcommands, just treat it like a regular command +            await self.send_command_help(group) +            return -                    # keep details together, paginating early if it won't fit -                    lines_adding = len(details.split('\n')) + print_cat -                    if paginator._linecount + lines_adding > self._max_lines: -                        paginator._linecount = 0 -                        new_page = True -                        paginator.close_page() +        # remove commands that the user can't run and are hidden, and sort by name +        commands_ = await self.filter_commands(subcommands, sort=True) -                        # new page so print category title again -                        print_cat = 1 +        embed = await self.command_formatting(group) -                    if print_cat: -                        if new_page: -                            paginator.add_line('') -                        paginator.add_line(category) -                        print_cat = 0 +        command_details = self.get_commands_brief_details(commands_) +        if command_details: +            embed.description += f"\n**Subcommands:**\n{command_details}" -                    paginator.add_line(details) +        message = await self.context.send(embed=embed) +        await help_cleanup(self.context.bot, self.context.author, message) -        # save organised pages to session -        self._pages = paginator.pages +    async def send_cog_help(self, cog: Cog) -> None: +        """Send help for a cog.""" +        # sort commands by name, and remove any the user cant run or are hidden. +        commands_ = await self.filter_commands(cog.get_commands(), sort=True) -    def embed_page(self, page_number: int = 0) -> Embed: -        """Returns an Embed with the requested page formatted within."""          embed = Embed() +        embed.set_author(name="Command Help", icon_url=constants.Icons.questionmark) +        embed.description = f"**{cog.qualified_name}**\n*{cog.description}*" -        # if command or cog, add query to title for pages other than first -        if isinstance(self.query, (commands.Command, Cog)) and page_number > 0: -            title = f'Command Help | "{self.query.name}"' -        else: -            title = self.title - -        embed.set_author(name=title, icon_url=constants.Icons.questionmark) -        embed.description = self._pages[page_number] +        command_details = self.get_commands_brief_details(commands_) +        if command_details: +            embed.description += f"\n\n**Commands:**\n{command_details}" -        # add page counter to footer if paginating -        page_count = len(self._pages) -        if page_count > 1: -            embed.set_footer(text=f'Page {self._current_page+1} / {page_count}') +        message = await self.context.send(embed=embed) +        await help_cleanup(self.context.bot, self.context.author, message) -        return embed - -    async def update_page(self, page_number: int = 0) -> None: -        """Sends the intial message, or changes the existing one to the given page number.""" -        self._current_page = page_number -        embed_page = self.embed_page(page_number) +    @staticmethod +    def _category_key(cmd: Command) -> str: +        """ +        Returns a cog name of a given command for use as a key for `sorted` and `groupby`. -        if not self.message: -            self.message = await self.destination.send(embed=embed_page) +        A zero width space is used as a prefix for results with no cogs to force them last in ordering. +        """ +        if cmd.cog: +            with suppress(AttributeError): +                if cmd.cog.category: +                    return f"**{cmd.cog.category}**" +            return f"**{cmd.cog_name}**"          else: -            await self.message.edit(embed=embed_page) +            return "**\u200bNo Category:**" -    @classmethod -    async def start(cls, ctx: Context, *command, **options) -> "HelpSession": +    async def send_category_help(self, category: Category) -> None:          """ -        Create and begin a help session based on the given command context. - -        Available options kwargs: -            * cleanup: Optional[bool] -                Set to `True` to have the message deleted on session end. Defaults to `False`. -            * only_can_run: Optional[bool] -                Set to `True` to hide commands the user can't run. Defaults to `False`. -            * show_hidden: Optional[bool] -                Set to `True` to include hidden commands. Defaults to `False`. -            * max_lines: Optional[int] -                Sets the max number of lines the paginator will add to a single page. Defaults to 20. +        Sends help for a bot category. + +        This sends a brief help for all commands in all cogs registered to the category.          """ -        session = cls(ctx, *command, **options) -        await session.prepare() +        embed = Embed() +        embed.set_author(name="Command Help", icon_url=constants.Icons.questionmark) -        return session +        all_commands = [] +        for c in category.cogs: +            all_commands.extend(c.get_commands()) -    async def stop(self) -> None: -        """Stops the help session, removes event listeners and attempts to delete the help message.""" -        self._bot.remove_listener(self.on_reaction_add) -        self._bot.remove_listener(self.on_message_delete) +        filtered_commands = await self.filter_commands(all_commands, sort=True) -        # ignore if permission issue, or the message doesn't exist -        with suppress(HTTPException, AttributeError): -            if self._cleanup: -                await self.message.delete() -            else: -                await self.message.clear_reactions() +        lines = [ +            f"`{PREFIX}{c.qualified_name}{f' {c.signature}' if c.signature else ''}`" +            f"\n*{c.short_doc or 'No details provided.'}*" for c in filtered_commands +        ] -    @property -    def is_first_page(self) -> bool: -        """Check if session is currently showing the first page.""" -        return self._current_page == 0 +        description = f"**{category.name}**\n*{category.description}*" -    @property -    def is_last_page(self) -> bool: -        """Check if the session is currently showing the last page.""" -        return self._current_page == (len(self._pages)-1) +        if lines: +            description += "\n\n**Commands:**" -    async def do_first(self) -> None: -        """Event that is called when the user requests the first page.""" -        if not self.is_first_page: -            await self.update_page(0) +        await LinePaginator.paginate( +            lines, +            self.context, +            embed, +            prefix=description, +            max_lines=COMMANDS_PER_PAGE, +            max_size=2040 +        ) -    async def do_back(self) -> None: -        """Event that is called when the user requests the previous page.""" -        if not self.is_first_page: -            await self.update_page(self._current_page-1) +    async def send_bot_help(self, mapping: dict) -> None: +        """Sends help for all bot commands and cogs.""" +        bot = self.context.bot -    async def do_next(self) -> None: -        """Event that is called when the user requests the next page.""" -        if not self.is_last_page: -            await self.update_page(self._current_page+1) +        embed = Embed() +        embed.set_author(name="Command Help", icon_url=constants.Icons.questionmark) + +        filter_commands = await self.filter_commands(bot.commands, sort=True, key=self._category_key) + +        cog_or_category_pages = [] + +        for cog_or_category, _commands in itertools.groupby(filter_commands, key=self._category_key): +            sorted_commands = sorted(_commands, key=lambda c: c.name) + +            if len(sorted_commands) == 0: +                continue + +            command_details = [ +                f"`{PREFIX}{c.qualified_name}{f' {c.signature}' if c.signature else ''}`" +                f"\n*{c.short_doc or 'No details provided.'}*" for c in sorted_commands +            ] + +            # Split cogs or categories which have too many commands to fit in one page. +            # The length of commands is included for later use when aggregating into pages for the paginator. +            for i in range(0, len(sorted_commands), COMMANDS_PER_PAGE): +                truncated_fmt = command_details[i:i + COMMANDS_PER_PAGE] +                joined_fmt = "\n".join(truncated_fmt) +                cog_or_category_pages.append((f"**{cog_or_category}**\n{joined_fmt}", len(truncated_fmt))) + +        pages = [] +        counter = 0 +        page = "" +        for fmt, length in cog_or_category_pages: +            counter += length +            if counter > COMMANDS_PER_PAGE: +                # force a new page on paginator even if it falls short of the max pages +                # since we still want to group categories/cogs. +                counter = length +                pages.append(page) +                page = f"{fmt}\n\n" +            else: +                page += f"{fmt}\n\n" -    async def do_end(self) -> None: -        """Event that is called when the user requests the last page.""" -        if not self.is_last_page: -            await self.update_page(len(self._pages)-1) +        if page: +            # add any remaining command help that didn't get added in the last iteration above. +            pages.append(page) -    async def do_stop(self) -> None: -        """Event that is called when the user requests to stop the help session.""" -        await self.message.delete() +        await LinePaginator.paginate(pages, self.context, embed=embed, max_lines=1, max_size=2040) -class Help(DiscordCog): +class Help(Cog):      """Custom Embed Pagination Help feature.""" -    @commands.command('help') -    @redirect_output(destination_channel=Channels.bot_commands, bypass_roles=STAFF_ROLES) -    async def new_help(self, ctx: Context, *commands) -> None: -        """Shows Command Help.""" -        try: -            await HelpSession.start(ctx, *commands) -        except HelpQueryNotFound as error: -            embed = Embed() -            embed.colour = Colour.red() -            embed.title = str(error) - -            if error.possible_matches: -                matches = '\n'.join(error.possible_matches.keys()) -                embed.description = f'**Did you mean:**\n`{matches}`' - -            await ctx.send(embed=embed) - +    def __init__(self, bot: Bot) -> None: +        self.bot = bot +        self.old_help_command = bot.help_command +        bot.help_command = CustomHelpCommand() +        bot.help_command.cog = self -def unload(bot: Bot) -> None: -    """ -    Reinstates the original help command. - -    This is run if the cog raises an exception on load, or if the extension is unloaded. -    """ -    bot.remove_command('help') -    bot.add_command(bot._old_help) +    def cog_unload(self) -> None: +        """Reset the help command when the cog is unloaded.""" +        self.bot.help_command = self.old_help_command  def setup(bot: Bot) -> None: -    """ -    The setup for the help extension. - -    This is called automatically on `bot.load_extension` being run. - -    Stores the original help command instance on the `bot._old_help` attribute for later -    reinstatement, before removing it from the command registry so the new help command can be -    loaded successfully. - -    If an exception is raised during the loading of the cog, `unload` will be called in order to -    reinstate the original help command. -    """ -    bot._old_help = bot.get_command('help') -    bot.remove_command('help') - -    try: -        bot.add_cog(Help()) -    except Exception: -        unload(bot) -        raise - - -def teardown(bot: Bot) -> None: -    """ -    The teardown for the help extension. - -    This is called automatically on `bot.unload_extension` being run. - -    Calls `unload` in order to reinstate the original help command. -    """ -    unload(bot) +    """Load the Help cog.""" +    bot.add_cog(Help(bot)) +    log.info("Cog loaded: Help") diff --git a/bot/cogs/wolfram.py b/bot/cogs/wolfram.py index 5d6b4630b..e6cae3bb8 100644 --- a/bot/cogs/wolfram.py +++ b/bot/cogs/wolfram.py @@ -60,6 +60,14 @@ def custom_cooldown(*ignore: List[int]) -> Callable:      A list of roles may be provided to ignore the per-user cooldown      """      async def predicate(ctx: Context) -> bool: +        if ctx.invoked_with == 'help': +            # if the invoked command is help we don't want to increase the ratelimits since it's not actually +            # invoking the command/making a request, so instead just check if the user/guild are on cooldown. +            guild_cooldown = not guildcd.get_bucket(ctx.message).get_tokens() == 0  # if guild is on cooldown +            if not any(r.id in ignore for r in ctx.author.roles):  # check user bucket if user is not ignored +                return guild_cooldown and not usercd.get_bucket(ctx.message).get_tokens() == 0 +            return guild_cooldown +          user_bucket = usercd.get_bucket(ctx.message)          if all(role.id not in ignore for role in ctx.author.roles): diff --git a/bot/decorators.py b/bot/decorators.py index 2d18eaa6a..185521552 100644 --- a/bot/decorators.py +++ b/bot/decorators.py @@ -6,7 +6,7 @@ from functools import wraps  from typing import Callable, Container, Union  from weakref import WeakValueDictionary -from discord import Colour, Embed, Member +from discord import Colour, Embed, Member, Message  from discord.errors import NotFound  from discord.ext import commands  from discord.ext.commands import CheckFailure, Cog, Context @@ -110,6 +110,20 @@ def locked() -> Callable:      return wrap +async def delete_invocation(ctx: Context, message: Message) -> None: +    """Task to delete the invocation and user redirection messages.""" +    if RedirectOutput.delete_invocation: +        await sleep(RedirectOutput.delete_delay) + +        with suppress(NotFound): +            await message.delete() +            log.trace("Redirect output: Deleted user redirection message") + +        with suppress(NotFound): +            await ctx.message.delete() +            log.trace("Redirect output: Deleted invocation message") + +  def redirect_output(destination_channel: int, bypass_roles: Container[int] = None) -> Callable:      """      Changes the channel in the context of the command to redirect the output to a certain channel. @@ -143,17 +157,8 @@ def redirect_output(destination_channel: int, bypass_roles: Container[int] = Non                  f"Hey, {ctx.author.mention}, you can find the output of your command here: "                  f"{redirect_channel.mention}"              ) - -            if RedirectOutput.delete_invocation: -                await sleep(RedirectOutput.delete_delay) - -                with suppress(NotFound): -                    await message.delete() -                    log.trace("Redirect output: Deleted user redirection message") - -                with suppress(NotFound): -                    await ctx.message.delete() -                    log.trace("Redirect output: Deleted invocation message") +            # we need to run it in a task for the help command - which gets held up if waiting for invocation deletion. +            ctx.bot.loop.create_task(delete_invocation(ctx, message))          return inner      return wrap | 
