diff options
| -rw-r--r-- | bot/cogs/extensions.py | 267 | 
1 files changed, 69 insertions, 198 deletions
| diff --git a/bot/cogs/extensions.py b/bot/cogs/extensions.py index c3d6fae27..e24e95e6d 100644 --- a/bot/cogs/extensions.py +++ b/bot/cogs/extensions.py @@ -1,5 +1,5 @@  import logging -import os +import textwrap  import typing as t  from enum import Enum  from pkgutil import iter_modules @@ -13,7 +13,7 @@ from bot.utils.checks import with_role_check  log = logging.getLogger(__name__) -KEEP_LOADED = ["bot.cogs.extensions", "bot.cogs.modlog"] +UNLOAD_BLACKLIST = {"bot.cogs.extensions", "bot.cogs.modlog"}  EXTENSIONS = frozenset(      ext.name      for ext in iter_modules(("bot/cogs",), "bot.cogs.") @@ -59,214 +59,45 @@ class Extensions(Cog):      def __init__(self, bot: Bot):          self.bot = bot -    @group(name='extensions', aliases=('c', 'ext', 'exts'), invoke_without_command=True) +    @group(name="extensions", aliases=("ext", "exts", "c", "cogs"), invoke_without_command=True)      async def extensions_group(self, ctx: Context) -> None: -        """Load, unload, reload, and list active cogs.""" +        """Load, unload, reload, and list loaded extensions."""          await ctx.invoke(self.bot.get_command("help"), "extensions") -    @extensions_group.command(name='load', aliases=('l',)) -    async def load_command(self, ctx: Context, cog: str) -> None: -        """ -        Load up an unloaded cog, given the module containing it. - -        You can specify the cog name for any cogs that are placed directly within `!cogs`, or specify the -        entire module directly. -        """ -        cog = cog.lower() - -        embed = Embed() -        embed.colour = Colour.red() - -        embed.set_author( -            name="Python Bot (Cogs)", -            url=URLs.github_bot_repo, -            icon_url=URLs.bot_avatar -        ) - -        if cog in self.cogs: -            full_cog = self.cogs[cog] -        elif "." in cog: -            full_cog = cog +    @extensions_group.command(name="load", aliases=("l",)) +    async def load_command(self, ctx: Context, extension: Extension) -> None: +        """Load an extension given its fully qualified or unqualified name.""" +        msg, _ = self.manage(extension, Action.LOAD) +        await ctx.send(msg) + +    @extensions_group.command(name="unload", aliases=("ul",)) +    async def unload_command(self, ctx: Context, extension: Extension) -> None: +        """Unload a currently loaded extension given its fully qualified or unqualified name.""" +        if extension in UNLOAD_BLACKLIST: +            msg = f":x: The extension `{extension}` may not be unloaded."          else: -            full_cog = None -            log.warning(f"{ctx.author} requested we load the '{cog}' cog, but that cog doesn't exist.") -            embed.description = f"Unknown cog: {cog}" - -        if full_cog: -            if full_cog not in self.bot.extensions: -                try: -                    self.bot.load_extension(full_cog) -                except ImportError: -                    log.exception(f"{ctx.author} requested we load the '{cog}' cog, " -                                  f"but the cog module {full_cog} could not be found!") -                    embed.description = f"Invalid cog: {cog}\n\nCould not find cog module {full_cog}" -                except Exception as e: -                    log.exception(f"{ctx.author} requested we load the '{cog}' cog, " -                                  "but the loading failed") -                    embed.description = f"Failed to load cog: {cog}\n\n{e.__class__.__name__}: {e}" -                else: -                    log.debug(f"{ctx.author} requested we load the '{cog}' cog. Cog loaded!") -                    embed.description = f"Cog loaded: {cog}" -                    embed.colour = Colour.green() -            else: -                log.warning(f"{ctx.author} requested we load the '{cog}' cog, but the cog was already loaded!") -                embed.description = f"Cog {cog} is already loaded" +            msg, _ = self.manage(extension, Action.UNLOAD) -        await ctx.send(embed=embed) +        await ctx.send(msg) -    @extensions_group.command(name='unload', aliases=('ul',)) -    async def unload_command(self, ctx: Context, cog: str) -> None: +    @extensions_group.command(name="reload", aliases=("r",)) +    async def reload_command(self, ctx: Context, extension: Extension) -> None:          """ -        Unload an already-loaded cog, given the module containing it. +        Reload an extension given its fully qualified or unqualified name. -        You can specify the cog name for any cogs that are placed directly within `!cogs`, or specify the -        entire module directly. +        If `*` is given as the name, all currently loaded extensions will be reloaded. +        If `**` is given as the name, all extensions, including unloaded ones, will be reloaded.          """ -        cog = cog.lower() - -        embed = Embed() -        embed.colour = Colour.red() - -        embed.set_author( -            name="Python Bot (Cogs)", -            url=URLs.github_bot_repo, -            icon_url=URLs.bot_avatar -        ) - -        if cog in self.cogs: -            full_cog = self.cogs[cog] -        elif "." in cog: -            full_cog = cog +        if extension == "*": +            msg = await self.reload_all() +        elif extension == "**": +            msg = await self.reload_all(True)          else: -            full_cog = None -            log.warning(f"{ctx.author} requested we unload the '{cog}' cog, but that cog doesn't exist.") -            embed.description = f"Unknown cog: {cog}" - -        if full_cog: -            if full_cog in KEEP_LOADED: -                log.warning(f"{ctx.author} requested we unload `{full_cog}`, that sneaky pete. We said no.") -                embed.description = f"You may not unload `{full_cog}`!" -            elif full_cog in self.bot.extensions: -                try: -                    self.bot.unload_extension(full_cog) -                except Exception as e: -                    log.exception(f"{ctx.author} requested we unload the '{cog}' cog, " -                                  "but the unloading failed") -                    embed.description = f"Failed to unload cog: {cog}\n\n```{e}```" -                else: -                    log.debug(f"{ctx.author} requested we unload the '{cog}' cog. Cog unloaded!") -                    embed.description = f"Cog unloaded: {cog}" -                    embed.colour = Colour.green() -            else: -                log.warning(f"{ctx.author} requested we unload the '{cog}' cog, but the cog wasn't loaded!") -                embed.description = f"Cog {cog} is not loaded" - -        await ctx.send(embed=embed) +            msg, _ = self.manage(extension, Action.RELOAD) -    @extensions_group.command(name='reload', aliases=('r',)) -    async def reload_command(self, ctx: Context, cog: str) -> None: -        """ -        Reload an unloaded cog, given the module containing it. +        await ctx.send(msg) -        You can specify the cog name for any cogs that are placed directly within `!cogs`, or specify the -        entire module directly. - -        If you specify "*" as the cog, every cog currently loaded will be unloaded, and then every cog present in the -        bot/cogs directory will be loaded. -        """ -        cog = cog.lower() - -        embed = Embed() -        embed.colour = Colour.red() - -        embed.set_author( -            name="Python Bot (Cogs)", -            url=URLs.github_bot_repo, -            icon_url=URLs.bot_avatar -        ) - -        if cog == "*": -            full_cog = cog -        elif cog in self.cogs: -            full_cog = self.cogs[cog] -        elif "." in cog: -            full_cog = cog -        else: -            full_cog = None -            log.warning(f"{ctx.author} requested we reload the '{cog}' cog, but that cog doesn't exist.") -            embed.description = f"Unknown cog: {cog}" - -        if full_cog: -            if full_cog == "*": -                all_cogs = [ -                    f"bot.cogs.{fn[:-3]}" for fn in os.listdir("bot/cogs") -                    if os.path.isfile(f"bot/cogs/{fn}") and fn.endswith(".py") and "_" not in fn -                ] - -                failed_unloads = {} -                failed_loads = {} - -                unloaded = 0 -                loaded = 0 - -                for loaded_cog in self.bot.extensions.copy().keys(): -                    try: -                        self.bot.unload_extension(loaded_cog) -                    except Exception as e: -                        failed_unloads[loaded_cog] = f"{e.__class__.__name__}: {e}" -                    else: -                        unloaded += 1 - -                for unloaded_cog in all_cogs: -                    try: -                        self.bot.load_extension(unloaded_cog) -                    except Exception as e: -                        failed_loads[unloaded_cog] = f"{e.__class__.__name__}: {e}" -                    else: -                        loaded += 1 - -                lines = [ -                    "**All cogs reloaded**", -                    f"**Unloaded**: {unloaded} / **Loaded**: {loaded}" -                ] - -                if failed_unloads: -                    lines.append("\n**Unload failures**") - -                    for cog, error in failed_unloads: -                        lines.append(f"{Emojis.status_dnd} **{cog}:** `{error}`") - -                if failed_loads: -                    lines.append("\n**Load failures**") - -                    for cog, error in failed_loads.items(): -                        lines.append(f"{Emojis.status_dnd} **{cog}:** `{error}`") - -                log.debug(f"{ctx.author} requested we reload all cogs. Here are the results: \n" -                          f"{lines}") - -                await LinePaginator.paginate(lines, ctx, embed, empty=False) -                return - -            elif full_cog in self.bot.extensions: -                try: -                    self.bot.unload_extension(full_cog) -                    self.bot.load_extension(full_cog) -                except Exception as e: -                    log.exception(f"{ctx.author} requested we reload the '{cog}' cog, " -                                  "but the unloading failed") -                    embed.description = f"Failed to reload cog: {cog}\n\n```{e}```" -                else: -                    log.debug(f"{ctx.author} requested we reload the '{cog}' cog. Cog reloaded!") -                    embed.description = f"Cog reload: {cog}" -                    embed.colour = Colour.green() -            else: -                log.warning(f"{ctx.author} requested we reload the '{cog}' cog, but the cog wasn't loaded!") -                embed.description = f"Cog {cog} is not loaded" - -        await ctx.send(embed=embed) - -    @extensions_group.command(name='list', aliases=('all',)) +    @extensions_group.command(name="list", aliases=("all",))      async def list_command(self, ctx: Context) -> None:          """          Get a list of all cogs, including their loaded status. @@ -311,6 +142,46 @@ class Extensions(Cog):          log.debug(f"{ctx.author} requested a list of all cogs. Returning a paginated list.")          await LinePaginator.paginate(lines, ctx, embed, max_size=300, empty=False) +    async def reload_all(self, reload_unloaded: bool = False) -> str: +        """Reload all loaded (and optionally unloaded) extensions and return an output message.""" +        unloaded = [] +        unload_failures = {} +        load_failures = {} + +        to_unload = self.bot.extensions.copy().keys() +        for extension in to_unload: +            _, error = self.manage(extension, Action.UNLOAD) +            if error: +                unload_failures[extension] = error +            else: +                unloaded.append(extension) + +        if reload_unloaded: +            unloaded = EXTENSIONS + +        for extension in unloaded: +            _, error = self.manage(extension, Action.LOAD) +            if error: +                load_failures[extension] = error + +        msg = textwrap.dedent(f""" +            **All extensions reloaded** +            Unloaded: {len(to_unload) - len(unload_failures)} / {len(to_unload)} +            Loaded: {len(unloaded) - len(load_failures)} / {len(unloaded)} +        """).strip() + +        if unload_failures: +            failures = '\n'.join(f'{ext}\n    {err}' for ext, err in unload_failures) +            msg += f'\nUnload failures:```{failures}```' + +        if load_failures: +            failures = '\n'.join(f'{ext}\n    {err}' for ext, err in load_failures) +            msg += f'\nLoad failures:```{failures}```' + +        log.debug(f'Reloaded all extensions.') + +        return msg +      def manage(self, ext: str, action: Action) -> t.Tuple[str, t.Optional[str]]:          """Apply an action to an extension and return the status message and any error message."""          verb = action.name.lower() | 
