diff options
40 files changed, 1951 insertions, 1948 deletions
| diff --git a/.gitignore b/.gitignore index 261fa179f..a191523b6 100644 --- a/.gitignore +++ b/.gitignore @@ -116,3 +116,6 @@ config.yml  # JUnit XML reports from pytest  junit.xml + +# Mac OS .DS_Store, which is a file that stores custom attributes of its containing folder +.DS_Store diff --git a/bot/__main__.py b/bot/__main__.py index f25693734..19a7e5ec6 100644 --- a/bot/__main__.py +++ b/bot/__main__.py @@ -36,14 +36,13 @@ log.addHandler(APILoggingHandler(bot.api_client))  bot.load_extension("bot.cogs.error_handler")  bot.load_extension("bot.cogs.filtering")  bot.load_extension("bot.cogs.logging") -bot.load_extension("bot.cogs.modlog")  bot.load_extension("bot.cogs.security")  # Commands, etc  bot.load_extension("bot.cogs.antispam")  bot.load_extension("bot.cogs.bot")  bot.load_extension("bot.cogs.clean") -bot.load_extension("bot.cogs.cogs") +bot.load_extension("bot.cogs.extensions")  bot.load_extension("bot.cogs.help")  # Only load this in production @@ -64,7 +63,6 @@ bot.load_extension("bot.cogs.reddit")  bot.load_extension("bot.cogs.reminders")  bot.load_extension("bot.cogs.site")  bot.load_extension("bot.cogs.snekbox") -bot.load_extension("bot.cogs.superstarify")  bot.load_extension("bot.cogs.sync")  bot.load_extension("bot.cogs.tags")  bot.load_extension("bot.cogs.token_remover") diff --git a/bot/cogs/alias.py b/bot/cogs/alias.py index 0f49a400c..6648805e9 100644 --- a/bot/cogs/alias.py +++ b/bot/cogs/alias.py @@ -5,6 +5,7 @@ from typing import Union  from discord import Colour, Embed, Member, User  from discord.ext.commands import Bot, Cog, Command, Context, clean_content, command, group +from bot.cogs.extensions import Extension  from bot.cogs.watchchannels.watchchannel import proxy_user  from bot.converters import TagNameConverter  from bot.pagination import LinePaginator @@ -84,9 +85,9 @@ class Alias (Cog):          await self.invoke(ctx, "site rules")      @command(name="reload", hidden=True) -    async def cogs_reload_alias(self, ctx: Context, *, cog_name: str) -> None: -        """Alias for invoking <prefix>cogs reload [cog_name].""" -        await self.invoke(ctx, "cogs reload", cog_name) +    async def extensions_reload_alias(self, ctx: Context, *extensions: Extension) -> None: +        """Alias for invoking <prefix>extensions reload [extensions...].""" +        await self.invoke(ctx, "extensions reload", *extensions)      @command(name="defon", hidden=True)      async def defcon_enable_alias(self, ctx: Context) -> None: diff --git a/bot/cogs/antispam.py b/bot/cogs/antispam.py index 8dfa0ad05..1b394048a 100644 --- a/bot/cogs/antispam.py +++ b/bot/cogs/antispam.py @@ -10,7 +10,7 @@ from discord import Colour, Member, Message, NotFound, Object, TextChannel  from discord.ext.commands import Bot, Cog  from bot import rules -from bot.cogs.modlog import ModLog +from bot.cogs.moderation import ModLog  from bot.constants import (      AntiSpam as AntiSpamConfig, Channels,      Colours, DEBUG_MODE, Event, Filter, @@ -107,14 +107,16 @@ class AntiSpam(Cog):          self.message_deletion_queue = dict()          self.queue_consumption_tasks = dict() +        self.bot.loop.create_task(self.alert_on_validation_error()) +      @property      def mod_log(self) -> ModLog:          """Allows for easy access of the ModLog cog."""          return self.bot.get_cog("ModLog") -    @Cog.listener() -    async def on_ready(self) -> None: +    async def alert_on_validation_error(self) -> None:          """Unloads the cog and alerts admins if configuration validation failed.""" +        await self.bot.wait_until_ready()          if self.validation_errors:              body = "**The following errors were encountered:**\n"              body += "\n".join(f"- {error}" for error in self.validation_errors.values()) @@ -207,8 +209,10 @@ class AntiSpam(Cog):          if not any(role.id == self.muted_role.id for role in member.roles):              remove_role_after = AntiSpamConfig.punishment['remove_after'] -            # We need context, let's get it +            # Get context and make sure the bot becomes the actor of infraction by patching the `author` attributes              context = await self.bot.get_context(msg) +            context.author = self.bot.user +            context.message.author = self.bot.user              # Since we're going to invoke the tempmute command directly, we need to manually call the converter.              dt_remove_role_after = await self.expiration_date_converter.convert(context, f"{remove_role_after}S") diff --git a/bot/cogs/clean.py b/bot/cogs/clean.py index 1c0c9a7a8..dca411d01 100644 --- a/bot/cogs/clean.py +++ b/bot/cogs/clean.py @@ -6,7 +6,7 @@ from typing import Optional  from discord import Colour, Embed, Message, User  from discord.ext.commands import Bot, Cog, Context, group -from bot.cogs.modlog import ModLog +from bot.cogs.moderation import ModLog  from bot.constants import (      Channels, CleanMessages, Colours, Event,      Icons, MODERATION_ROLES, NEGATIVE_REPLIES diff --git a/bot/cogs/cogs.py b/bot/cogs/cogs.py deleted file mode 100644 index 1f6ccd09c..000000000 --- a/bot/cogs/cogs.py +++ /dev/null @@ -1,298 +0,0 @@ -import logging -import os - -from discord import Colour, Embed -from discord.ext.commands import Bot, Cog, Context, group - -from bot.constants import ( -    Emojis, MODERATION_ROLES, Roles, URLs -) -from bot.decorators import with_role -from bot.pagination import LinePaginator - -log = logging.getLogger(__name__) - -KEEP_LOADED = ["bot.cogs.cogs", "bot.cogs.modlog"] - - -class Cogs(Cog): -    """Cog management commands.""" - -    def __init__(self, bot: Bot): -        self.bot = bot -        self.cogs = {} - -        # Load up the cog names -        log.info("Initializing cog names...") -        for filename in os.listdir("bot/cogs"): -            if filename.endswith(".py") and "_" not in filename: -                if os.path.isfile(f"bot/cogs/{filename}"): -                    cog = filename[:-3] - -                    self.cogs[cog] = f"bot.cogs.{cog}" - -        # Allow reverse lookups by reversing the pairs -        self.cogs.update({v: k for k, v in self.cogs.items()}) - -    @group(name='cogs', aliases=('c',), invoke_without_command=True) -    @with_role(*MODERATION_ROLES, Roles.core_developer) -    async def cogs_group(self, ctx: Context) -> None: -        """Load, unload, reload, and list active cogs.""" -        await ctx.invoke(self.bot.get_command("help"), "cogs") - -    @cogs_group.command(name='load', aliases=('l',)) -    @with_role(*MODERATION_ROLES, Roles.core_developer) -    async def load_command(self, ctx: Context, cog: str) -> None: -        """ -        Load up an unloaded cog, given the module containing it. - -        You can specify the cog name for any cogs that are placed directly within `!cogs`, or specify the -        entire module directly. -        """ -        cog = cog.lower() - -        embed = Embed() -        embed.colour = Colour.red() - -        embed.set_author( -            name="Python Bot (Cogs)", -            url=URLs.github_bot_repo, -            icon_url=URLs.bot_avatar -        ) - -        if cog in self.cogs: -            full_cog = self.cogs[cog] -        elif "." in cog: -            full_cog = cog -        else: -            full_cog = None -            log.warning(f"{ctx.author} requested we load the '{cog}' cog, but that cog doesn't exist.") -            embed.description = f"Unknown cog: {cog}" - -        if full_cog: -            if full_cog not in self.bot.extensions: -                try: -                    self.bot.load_extension(full_cog) -                except ImportError: -                    log.exception(f"{ctx.author} requested we load the '{cog}' cog, " -                                  f"but the cog module {full_cog} could not be found!") -                    embed.description = f"Invalid cog: {cog}\n\nCould not find cog module {full_cog}" -                except Exception as e: -                    log.exception(f"{ctx.author} requested we load the '{cog}' cog, " -                                  "but the loading failed") -                    embed.description = f"Failed to load cog: {cog}\n\n{e.__class__.__name__}: {e}" -                else: -                    log.debug(f"{ctx.author} requested we load the '{cog}' cog. Cog loaded!") -                    embed.description = f"Cog loaded: {cog}" -                    embed.colour = Colour.green() -            else: -                log.warning(f"{ctx.author} requested we load the '{cog}' cog, but the cog was already loaded!") -                embed.description = f"Cog {cog} is already loaded" - -        await ctx.send(embed=embed) - -    @cogs_group.command(name='unload', aliases=('ul',)) -    @with_role(*MODERATION_ROLES, Roles.core_developer) -    async def unload_command(self, ctx: Context, cog: str) -> None: -        """ -        Unload an already-loaded cog, given the module containing it. - -        You can specify the cog name for any cogs that are placed directly within `!cogs`, or specify the -        entire module directly. -        """ -        cog = cog.lower() - -        embed = Embed() -        embed.colour = Colour.red() - -        embed.set_author( -            name="Python Bot (Cogs)", -            url=URLs.github_bot_repo, -            icon_url=URLs.bot_avatar -        ) - -        if cog in self.cogs: -            full_cog = self.cogs[cog] -        elif "." in cog: -            full_cog = cog -        else: -            full_cog = None -            log.warning(f"{ctx.author} requested we unload the '{cog}' cog, but that cog doesn't exist.") -            embed.description = f"Unknown cog: {cog}" - -        if full_cog: -            if full_cog in KEEP_LOADED: -                log.warning(f"{ctx.author} requested we unload `{full_cog}`, that sneaky pete. We said no.") -                embed.description = f"You may not unload `{full_cog}`!" -            elif full_cog in self.bot.extensions: -                try: -                    self.bot.unload_extension(full_cog) -                except Exception as e: -                    log.exception(f"{ctx.author} requested we unload the '{cog}' cog, " -                                  "but the unloading failed") -                    embed.description = f"Failed to unload cog: {cog}\n\n```{e}```" -                else: -                    log.debug(f"{ctx.author} requested we unload the '{cog}' cog. Cog unloaded!") -                    embed.description = f"Cog unloaded: {cog}" -                    embed.colour = Colour.green() -            else: -                log.warning(f"{ctx.author} requested we unload the '{cog}' cog, but the cog wasn't loaded!") -                embed.description = f"Cog {cog} is not loaded" - -        await ctx.send(embed=embed) - -    @cogs_group.command(name='reload', aliases=('r',)) -    @with_role(*MODERATION_ROLES, Roles.core_developer) -    async def reload_command(self, ctx: Context, cog: str) -> None: -        """ -        Reload an unloaded cog, given the module containing it. - -        You can specify the cog name for any cogs that are placed directly within `!cogs`, or specify the -        entire module directly. - -        If you specify "*" as the cog, every cog currently loaded will be unloaded, and then every cog present in the -        bot/cogs directory will be loaded. -        """ -        cog = cog.lower() - -        embed = Embed() -        embed.colour = Colour.red() - -        embed.set_author( -            name="Python Bot (Cogs)", -            url=URLs.github_bot_repo, -            icon_url=URLs.bot_avatar -        ) - -        if cog == "*": -            full_cog = cog -        elif cog in self.cogs: -            full_cog = self.cogs[cog] -        elif "." in cog: -            full_cog = cog -        else: -            full_cog = None -            log.warning(f"{ctx.author} requested we reload the '{cog}' cog, but that cog doesn't exist.") -            embed.description = f"Unknown cog: {cog}" - -        if full_cog: -            if full_cog == "*": -                all_cogs = [ -                    f"bot.cogs.{fn[:-3]}" for fn in os.listdir("bot/cogs") -                    if os.path.isfile(f"bot/cogs/{fn}") and fn.endswith(".py") and "_" not in fn -                ] - -                failed_unloads = {} -                failed_loads = {} - -                unloaded = 0 -                loaded = 0 - -                for loaded_cog in self.bot.extensions.copy().keys(): -                    try: -                        self.bot.unload_extension(loaded_cog) -                    except Exception as e: -                        failed_unloads[loaded_cog] = f"{e.__class__.__name__}: {e}" -                    else: -                        unloaded += 1 - -                for unloaded_cog in all_cogs: -                    try: -                        self.bot.load_extension(unloaded_cog) -                    except Exception as e: -                        failed_loads[unloaded_cog] = f"{e.__class__.__name__}: {e}" -                    else: -                        loaded += 1 - -                lines = [ -                    "**All cogs reloaded**", -                    f"**Unloaded**: {unloaded} / **Loaded**: {loaded}" -                ] - -                if failed_unloads: -                    lines.append("\n**Unload failures**") - -                    for cog, error in failed_unloads: -                        lines.append(f"{Emojis.status_dnd} **{cog}:** `{error}`") - -                if failed_loads: -                    lines.append("\n**Load failures**") - -                    for cog, error in failed_loads.items(): -                        lines.append(f"{Emojis.status_dnd} **{cog}:** `{error}`") - -                log.debug(f"{ctx.author} requested we reload all cogs. Here are the results: \n" -                          f"{lines}") - -                await LinePaginator.paginate(lines, ctx, embed, empty=False) -                return - -            elif full_cog in self.bot.extensions: -                try: -                    self.bot.unload_extension(full_cog) -                    self.bot.load_extension(full_cog) -                except Exception as e: -                    log.exception(f"{ctx.author} requested we reload the '{cog}' cog, " -                                  "but the unloading failed") -                    embed.description = f"Failed to reload cog: {cog}\n\n```{e}```" -                else: -                    log.debug(f"{ctx.author} requested we reload the '{cog}' cog. Cog reloaded!") -                    embed.description = f"Cog reload: {cog}" -                    embed.colour = Colour.green() -            else: -                log.warning(f"{ctx.author} requested we reload the '{cog}' cog, but the cog wasn't loaded!") -                embed.description = f"Cog {cog} is not loaded" - -        await ctx.send(embed=embed) - -    @cogs_group.command(name='list', aliases=('all',)) -    @with_role(*MODERATION_ROLES, Roles.core_developer) -    async def list_command(self, ctx: Context) -> None: -        """ -        Get a list of all cogs, including their loaded status. - -        Gray indicates that the cog is unloaded. Green indicates that the cog is currently loaded. -        """ -        embed = Embed() -        lines = [] -        cogs = {} - -        embed.colour = Colour.blurple() -        embed.set_author( -            name="Python Bot (Cogs)", -            url=URLs.github_bot_repo, -            icon_url=URLs.bot_avatar -        ) - -        for key, _value in self.cogs.items(): -            if "." not in key: -                continue - -            if key in self.bot.extensions: -                cogs[key] = True -            else: -                cogs[key] = False - -        for key in self.bot.extensions.keys(): -            if key not in self.cogs: -                cogs[key] = True - -        for cog, loaded in sorted(cogs.items(), key=lambda x: x[0]): -            if cog in self.cogs: -                cog = self.cogs[cog] - -            if loaded: -                status = Emojis.status_online -            else: -                status = Emojis.status_offline - -            lines.append(f"{status}  {cog}") - -        log.debug(f"{ctx.author} requested a list of all cogs. Returning a paginated list.") -        await LinePaginator.paginate(lines, ctx, embed, max_size=300, empty=False) - - -def setup(bot: Bot) -> None: -    """Cogs cog load.""" -    bot.add_cog(Cogs(bot)) -    log.info("Cog loaded: Cogs") diff --git a/bot/cogs/defcon.py b/bot/cogs/defcon.py index 048d8a683..70e101baa 100644 --- a/bot/cogs/defcon.py +++ b/bot/cogs/defcon.py @@ -4,7 +4,7 @@ from datetime import datetime, timedelta  from discord import Colour, Embed, Member  from discord.ext.commands import Bot, Cog, Context, group -from bot.cogs.modlog import ModLog +from bot.cogs.moderation import ModLog  from bot.constants import Channels, Colours, Emojis, Event, Icons, Roles  from bot.decorators import with_role @@ -35,14 +35,16 @@ class Defcon(Cog):          self.channel = None          self.days = timedelta(days=0) +        self.bot.loop.create_task(self.sync_settings()) +      @property      def mod_log(self) -> ModLog:          """Get currently loaded ModLog cog instance."""          return self.bot.get_cog("ModLog") -    @Cog.listener() -    async def on_ready(self) -> None: +    async def sync_settings(self) -> None:          """On cog load, try to synchronize DEFCON settings to the API.""" +        await self.bot.wait_until_ready()          self.channel = await self.bot.fetch_channel(Channels.defcon)          try:              response = await self.bot.api_client.get('bot/bot-settings/defcon') diff --git a/bot/cogs/doc.py b/bot/cogs/doc.py index c9e6b3b91..a13464bff 100644 --- a/bot/cogs/doc.py +++ b/bot/cogs/doc.py @@ -126,9 +126,11 @@ class Doc(commands.Cog):          self.bot = bot          self.inventories = {} -    @commands.Cog.listener() -    async def on_ready(self) -> None: -        """Refresh documentation inventory.""" +        self.bot.loop.create_task(self.init_refresh_inventory()) + +    async def init_refresh_inventory(self) -> None: +        """Refresh documentation inventory on cog initialization.""" +        await self.bot.wait_until_ready()          await self.refresh_inventory()      async def update_single( @@ -207,6 +209,9 @@ class Doc(commands.Cog):          symbol_heading = soup.find(id=symbol_id)          signature_buffer = [] +        if symbol_heading is None: +            return None +          # Traverse the tags of the signature header and ignore any          # unwanted symbols from it. Add all of it to a temporary buffer.          for tag in symbol_heading.strings: diff --git a/bot/cogs/extensions.py b/bot/cogs/extensions.py new file mode 100644 index 000000000..bb66e0b8e --- /dev/null +++ b/bot/cogs/extensions.py @@ -0,0 +1,236 @@ +import functools +import logging +import typing as t +from enum import Enum +from pkgutil import iter_modules + +from discord import Colour, Embed +from discord.ext import commands +from discord.ext.commands import Bot, Context, group + +from bot.constants import Emojis, MODERATION_ROLES, Roles, URLs +from bot.pagination import LinePaginator +from bot.utils.checks import with_role_check + +log = logging.getLogger(__name__) + +UNLOAD_BLACKLIST = {"bot.cogs.extensions", "bot.cogs.modlog"} +EXTENSIONS = frozenset( +    ext.name +    for ext in iter_modules(("bot/cogs",), "bot.cogs.") +    if ext.name[-1] != "_" +) + + +class Action(Enum): +    """Represents an action to perform on an extension.""" + +    # Need to be partial otherwise they are considered to be function definitions. +    LOAD = functools.partial(Bot.load_extension) +    UNLOAD = functools.partial(Bot.unload_extension) +    RELOAD = functools.partial(Bot.reload_extension) + + +class Extension(commands.Converter): +    """ +    Fully qualify the name of an extension and ensure it exists. + +    The * and ** values bypass this when used with the reload command. +    """ + +    async def convert(self, ctx: Context, argument: str) -> str: +        """Fully qualify the name of an extension and ensure it exists.""" +        # Special values to reload all extensions +        if argument == "*" or argument == "**": +            return argument + +        argument = argument.lower() + +        if "." not in argument: +            argument = f"bot.cogs.{argument}" + +        if argument in EXTENSIONS: +            return argument +        else: +            raise commands.BadArgument(f":x: Could not find the extension `{argument}`.") + + +class Extensions(commands.Cog): +    """Extension management commands.""" + +    def __init__(self, bot: Bot): +        self.bot = bot + +    @group(name="extensions", aliases=("ext", "exts", "c", "cogs"), invoke_without_command=True) +    async def extensions_group(self, ctx: Context) -> None: +        """Load, unload, reload, and list loaded extensions.""" +        await ctx.invoke(self.bot.get_command("help"), "extensions") + +    @extensions_group.command(name="load", aliases=("l",)) +    async def load_command(self, ctx: Context, *extensions: Extension) -> None: +        """ +        Load extensions given their fully qualified or unqualified names. + +        If '\*' or '\*\*' is given as the name, all unloaded extensions will be loaded. +        """  # noqa: W605 +        if not extensions: +            await ctx.invoke(self.bot.get_command("help"), "extensions load") +            return + +        if "*" in extensions or "**" in extensions: +            extensions = set(EXTENSIONS) - set(self.bot.extensions.keys()) + +        msg = self.batch_manage(Action.LOAD, *extensions) +        await ctx.send(msg) + +    @extensions_group.command(name="unload", aliases=("ul",)) +    async def unload_command(self, ctx: Context, *extensions: Extension) -> None: +        """ +        Unload currently loaded extensions given their fully qualified or unqualified names. + +        If '\*' or '\*\*' is given as the name, all loaded extensions will be unloaded. +        """  # noqa: W605 +        if not extensions: +            await ctx.invoke(self.bot.get_command("help"), "extensions unload") +            return + +        blacklisted = "\n".join(UNLOAD_BLACKLIST & set(extensions)) + +        if blacklisted: +            msg = f":x: The following extension(s) may not be unloaded:```{blacklisted}```" +        else: +            if "*" in extensions or "**" in extensions: +                extensions = set(self.bot.extensions.keys()) - UNLOAD_BLACKLIST + +            msg = self.batch_manage(Action.UNLOAD, *extensions) + +        await ctx.send(msg) + +    @extensions_group.command(name="reload", aliases=("r",)) +    async def reload_command(self, ctx: Context, *extensions: Extension) -> None: +        """ +        Reload extensions given their fully qualified or unqualified names. + +        If an extension fails to be reloaded, it will be rolled-back to the prior working state. + +        If '\*' is given as the name, all currently loaded extensions will be reloaded. +        If '\*\*' is given as the name, all extensions, including unloaded ones, will be reloaded. +        """  # noqa: W605 +        if not extensions: +            await ctx.invoke(self.bot.get_command("help"), "extensions reload") +            return + +        if "**" in extensions: +            extensions = EXTENSIONS +        elif "*" in extensions: +            extensions = set(self.bot.extensions.keys()) | set(extensions) +            extensions.remove("*") + +        msg = self.batch_manage(Action.RELOAD, *extensions) + +        await ctx.send(msg) + +    @extensions_group.command(name="list", aliases=("all",)) +    async def list_command(self, ctx: Context) -> None: +        """ +        Get a list of all extensions, including their loaded status. + +        Grey indicates that the extension is unloaded. +        Green indicates that the extension is currently loaded. +        """ +        embed = Embed() +        lines = [] + +        embed.colour = Colour.blurple() +        embed.set_author( +            name="Extensions List", +            url=URLs.github_bot_repo, +            icon_url=URLs.bot_avatar +        ) + +        for ext in sorted(list(EXTENSIONS)): +            if ext in self.bot.extensions: +                status = Emojis.status_online +            else: +                status = Emojis.status_offline + +            ext = ext.rsplit(".", 1)[1] +            lines.append(f"{status}  {ext}") + +        log.debug(f"{ctx.author} requested a list of all cogs. Returning a paginated list.") +        await LinePaginator.paginate(lines, ctx, embed, max_size=300, empty=False) + +    def batch_manage(self, action: Action, *extensions: str) -> str: +        """ +        Apply an action to multiple extensions and return a message with the results. + +        If only one extension is given, it is deferred to `manage()`. +        """ +        if len(extensions) == 1: +            msg, _ = self.manage(action, extensions[0]) +            return msg + +        verb = action.name.lower() +        failures = {} + +        for extension in extensions: +            _, error = self.manage(action, extension) +            if error: +                failures[extension] = error + +        emoji = ":x:" if failures else ":ok_hand:" +        msg = f"{emoji} {len(extensions) - len(failures)} / {len(extensions)} extensions {verb}ed." + +        if failures: +            failures = "\n".join(f"{ext}\n    {err}" for ext, err in failures.items()) +            msg += f"\nFailures:```{failures}```" + +        log.debug(f"Batch {verb}ed extensions.") + +        return msg + +    def manage(self, action: Action, ext: str) -> t.Tuple[str, t.Optional[str]]: +        """Apply an action to an extension and return the status message and any error message.""" +        verb = action.name.lower() +        error_msg = None + +        try: +            action.value(self.bot, ext) +        except (commands.ExtensionAlreadyLoaded, commands.ExtensionNotLoaded): +            if action is Action.RELOAD: +                # When reloading, just load the extension if it was not loaded. +                return self.manage(Action.LOAD, ext) + +            msg = f":x: Extension `{ext}` is already {verb}ed." +            log.debug(msg[4:]) +        except Exception as e: +            if hasattr(e, "original"): +                e = e.original + +            log.exception(f"Extension '{ext}' failed to {verb}.") + +            error_msg = f"{e.__class__.__name__}: {e}" +            msg = f":x: Failed to {verb} extension `{ext}`:\n```{error_msg}```" +        else: +            msg = f":ok_hand: Extension successfully {verb}ed: `{ext}`." +            log.debug(msg[10:]) + +        return msg, error_msg + +    # This cannot be static (must have a __func__ attribute). +    def cog_check(self, ctx: Context) -> bool: +        """Only allow moderators and core developers to invoke the commands in this cog.""" +        return with_role_check(ctx, *MODERATION_ROLES, Roles.core_developer) + +    # This cannot be static (must have a __func__ attribute). +    async def cog_command_error(self, ctx: Context, error: Exception) -> None: +        """Handle BadArgument errors locally to prevent the help command from showing.""" +        if isinstance(error, commands.BadArgument): +            await ctx.send(str(error)) +            error.handled = True + + +def setup(bot: Bot) -> None: +    """Load the Extensions cog.""" +    bot.add_cog(Extensions(bot)) +    log.info("Cog loaded: Extensions") diff --git a/bot/cogs/filtering.py b/bot/cogs/filtering.py index bd8c6ed67..265ae5160 100644 --- a/bot/cogs/filtering.py +++ b/bot/cogs/filtering.py @@ -7,7 +7,7 @@ from dateutil.relativedelta import relativedelta  from discord import Colour, DMChannel, Member, Message, TextChannel  from discord.ext.commands import Bot, Cog -from bot.cogs.modlog import ModLog +from bot.cogs.moderation import ModLog  from bot.constants import (      Channels, Colours, DEBUG_MODE,      Filter, Icons, URLs diff --git a/bot/cogs/help.py b/bot/cogs/help.py index 37d12b2d5..9607dbd8d 100644 --- a/bot/cogs/help.py +++ b/bot/cogs/help.py @@ -1,5 +1,4 @@  import asyncio -import inspect  import itertools  from collections import namedtuple  from contextlib import suppress @@ -61,6 +60,12 @@ class HelpSession:              The message object that's showing the help contents.          * destination: `discord.abc.Messageable`              Where the help message is to be sent to. + +    Cogs can be grouped into custom categories. All cogs with the same category will be displayed +    under a single category name in the help output. Custom categories are defined inside the cogs +    as a class attribute named `category`. A description can also be specified with the attribute +    `category_description`. If a description is not found in at least one cog, the default will be +    the regular description (class docstring) of the first cog found in the category.      """      def __init__( @@ -107,12 +112,31 @@ class HelpSession:          if command:              return command -        cog = self._bot.cogs.get(query) -        if cog: +        # Find all cog categories that match. +        cog_matches = [] +        description = None +        for cog in self._bot.cogs.values(): +            if hasattr(cog, "category") and cog.category == query: +                cog_matches.append(cog) +                if hasattr(cog, "category_description"): +                    description = cog.category_description + +        # Try to search by cog name if no categories match. +        if not cog_matches: +            cog = self._bot.cogs.get(query) + +            # Don't consider it a match if the cog has a category. +            if cog and not hasattr(cog, "category"): +                cog_matches = [cog] + +        if cog_matches: +            cog = cog_matches[0] +            cmds = (cog.get_commands() for cog in cog_matches)  # Commands of all cogs +              return Cog( -                name=cog.qualified_name, -                description=inspect.getdoc(cog), -                commands=[c for c in self._bot.commands if c.cog is cog] +                name=cog.category if hasattr(cog, "category") else cog.qualified_name, +                description=description or cog.description, +                commands=tuple(itertools.chain.from_iterable(cmds))  # Flatten the list              )          self._handle_not_found(query) @@ -207,8 +231,16 @@ class HelpSession:          A zero width space is used as a prefix for results with no cogs to force them last in ordering.          """ -        cog = cmd.cog_name -        return f'**{cog}**' if cog else f'**\u200bNo Category:**' +        if cmd.cog: +            try: +                if cmd.cog.category: +                    return f'**{cmd.cog.category}**' +            except AttributeError: +                pass + +            return f'**{cmd.cog_name}**' +        else: +            return "**\u200bNo Category:**"      def _get_command_params(self, cmd: Command) -> str:          """ diff --git a/bot/cogs/information.py b/bot/cogs/information.py index 1afb37103..3a7ba0444 100644 --- a/bot/cogs/information.py +++ b/bot/cogs/information.py @@ -1,14 +1,18 @@  import colorsys  import logging +import pprint  import textwrap  import typing +from typing import Any, Mapping, Optional +import discord  from discord import CategoryChannel, Colour, Embed, Member, Role, TextChannel, VoiceChannel, utils -from discord.ext.commands import Bot, Cog, Context, command +from discord.ext import commands +from discord.ext.commands import Bot, BucketType, Cog, Context, command, group  from bot.constants import Channels, Emojis, MODERATION_ROLES, STAFF_ROLES -from bot.decorators import InChannelCheckFailure, with_role -from bot.utils.checks import with_role_check +from bot.decorators import InChannelCheckFailure, in_channel, with_role +from bot.utils.checks import cooldown_with_role_bypass, with_role_check  from bot.utils.time import time_since  log = logging.getLogger(__name__) @@ -229,6 +233,82 @@ class Information(Cog):          await ctx.send(embed=embed) +    def format_fields(self, mapping: Mapping[str, Any], field_width: Optional[int] = None) -> str: +        """Format a mapping to be readable to a human.""" +        # sorting is technically superfluous but nice if you want to look for a specific field +        fields = sorted(mapping.items(), key=lambda item: item[0]) + +        if field_width is None: +            field_width = len(max(mapping.keys(), key=len)) + +        out = '' + +        for key, val in fields: +            if isinstance(val, dict): +                # if we have dicts inside dicts we want to apply the same treatment to the inner dictionaries +                inner_width = int(field_width * 1.6) +                val = '\n' + self.format_fields(val, field_width=inner_width) + +            elif isinstance(val, str): +                # split up text since it might be long +                text = textwrap.fill(val, width=100, replace_whitespace=False) + +                # indent it, I guess you could do this with `wrap` and `join` but this is nicer +                val = textwrap.indent(text, ' ' * (field_width + len(': '))) + +                # the first line is already indented so we `str.lstrip` it +                val = val.lstrip() + +            if key == 'color': +                # makes the base 10 representation of a hex number readable to humans +                val = hex(val) + +            out += '{0:>{width}}: {1}\n'.format(key, val, width=field_width) + +        # remove trailing whitespace +        return out.rstrip() + +    @cooldown_with_role_bypass(2, 60 * 3, BucketType.member, bypass_roles=STAFF_ROLES) +    @group(invoke_without_command=True) +    @in_channel(Channels.bot, bypass_roles=STAFF_ROLES) +    async def raw(self, ctx: Context, *, message: discord.Message, json: bool = False) -> None: +        """Shows information about the raw API response.""" +        # I *guess* it could be deleted right as the command is invoked but I felt like it wasn't worth handling +        # doing this extra request is also much easier than trying to convert everything back into a dictionary again +        raw_data = await ctx.bot.http.get_message(message.channel.id, message.id) + +        paginator = commands.Paginator() + +        def add_content(title: str, content: str) -> None: +            paginator.add_line(f'== {title} ==\n') +            # replace backticks as it breaks out of code blocks. Spaces seemed to be the most reasonable solution. +            # we hope it's not close to 2000 +            paginator.add_line(content.replace('```', '`` `')) +            paginator.close_page() + +        if message.content: +            add_content('Raw message', message.content) + +        transformer = pprint.pformat if json else self.format_fields +        for field_name in ('embeds', 'attachments'): +            data = raw_data[field_name] + +            if not data: +                continue + +            total = len(data) +            for current, item in enumerate(data, start=1): +                title = f'Raw {field_name} ({current}/{total})' +                add_content(title, transformer(item)) + +        for page in paginator.pages: +            await ctx.send(page) + +    @raw.command() +    async def json(self, ctx: Context, message: discord.Message) -> None: +        """Shows information about the raw API response in a copy-pasteable Python format.""" +        await ctx.invoke(self.raw, message=message, json=True) +  def setup(bot: Bot) -> None:      """Information cog load.""" diff --git a/bot/cogs/logging.py b/bot/cogs/logging.py index 8e47bcc36..c92b619ff 100644 --- a/bot/cogs/logging.py +++ b/bot/cogs/logging.py @@ -15,9 +15,11 @@ class Logging(Cog):      def __init__(self, bot: Bot):          self.bot = bot -    @Cog.listener() -    async def on_ready(self) -> None: +        self.bot.loop.create_task(self.startup_greeting()) + +    async def startup_greeting(self) -> None:          """Announce our presence to the configured devlog channel.""" +        await self.bot.wait_until_ready()          log.info("Bot connected!")          embed = Embed(description="Connected!") diff --git a/bot/cogs/moderation.py b/bot/cogs/moderation.py deleted file mode 100644 index 5aa873a47..000000000 --- a/bot/cogs/moderation.py +++ /dev/null @@ -1,1172 +0,0 @@ -import asyncio -import logging -import textwrap -from datetime import datetime -from typing import Dict, Union - -from discord import ( -    Colour, Embed, Forbidden, Guild, HTTPException, Member, NotFound, Object, User -) -from discord.ext.commands import ( -    BadArgument, BadUnionArgument, Bot, Cog, Context, command, group -) - -from bot import constants -from bot.cogs.modlog import ModLog -from bot.constants import Colours, Event, Icons, MODERATION_ROLES -from bot.converters import Duration, InfractionSearchQuery -from bot.decorators import with_role -from bot.pagination import LinePaginator -from bot.utils.moderation import already_has_active_infraction, post_infraction -from bot.utils.scheduling import Scheduler, create_task -from bot.utils.time import INFRACTION_FORMAT, format_infraction, wait_until - -log = logging.getLogger(__name__) - -INFRACTION_ICONS = { -    "Mute": Icons.user_mute, -    "Kick": Icons.sign_out, -    "Ban": Icons.user_ban -} -RULES_URL = "https://pythondiscord.com/pages/rules" -APPEALABLE_INFRACTIONS = ("Ban", "Mute") - - -def proxy_user(user_id: str) -> Object: -    """Create a proxy user for the provided user_id for situations where a Member or User object cannot be resolved.""" -    try: -        user_id = int(user_id) -    except ValueError: -        raise BadArgument -    user = Object(user_id) -    user.mention = user.id -    user.avatar_url_as = lambda static_format: None -    return user - - -def permanent_duration(expires_at: str) -> str: -    """Only allow an expiration to be 'permanent' if it is a string.""" -    expires_at = expires_at.lower() -    if expires_at != "permanent": -        raise BadArgument -    else: -        return expires_at - - -UserTypes = Union[Member, User, proxy_user] - - -class Moderation(Scheduler, Cog): -    """Server moderation tools.""" - -    def __init__(self, bot: Bot): -        self.bot = bot -        self._muted_role = Object(constants.Roles.muted) -        super().__init__() - -    @property -    def mod_log(self) -> ModLog: -        """Get currently loaded ModLog cog instance.""" -        return self.bot.get_cog("ModLog") - -    @Cog.listener() -    async def on_ready(self) -> None: -        """Schedule expiration for previous infractions.""" -        # Schedule expiration for previous infractions -        infractions = await self.bot.api_client.get( -            'bot/infractions', params={'active': 'true'} -        ) -        for infraction in infractions: -            if infraction["expires_at"] is not None: -                self.schedule_task(self.bot.loop, infraction["id"], infraction) - -    @Cog.listener() -    async def on_member_join(self, member: Member) -> None: -        """Reapply active mute infractions for returning members.""" -        active_mutes = await self.bot.api_client.get( -            'bot/infractions', -            params={'user__id': str(member.id), 'type': 'mute', 'active': 'true'} -        ) -        if not active_mutes: -            return - -        # assume a single mute because of restrictions elsewhere -        mute = active_mutes[0] - -        # transform expiration to delay in seconds -        expiration_datetime = datetime.fromisoformat(mute["expires_at"][:-1]) -        delay = expiration_datetime - datetime.utcnow() -        delay_seconds = delay.total_seconds() - -        # if under a minute or in the past -        if delay_seconds < 60: -            log.debug(f"Marking infraction {mute['id']} as inactive (expired).") -            await self._deactivate_infraction(mute) -            self.cancel_task(mute["id"]) - -            # Notify the user that they've been unmuted. -            await self.notify_pardon( -                user=member, -                title="You have been unmuted.", -                content="You may now send messages in the server.", -                icon_url=Icons.user_unmute -            ) -            return - -        # allowing modlog since this is a passive action that should be logged -        await member.add_roles(self._muted_role, reason=f"Re-applying active mute: {mute['id']}") -        log.debug(f"User {member.id} has been re-muted on rejoin.") - -    # region: Permanent infractions - -    @with_role(*MODERATION_ROLES) -    @command() -    async def warn(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None: -        """Create a warning infraction in the database for a user.""" -        infraction = await post_infraction(ctx, user, type="warning", reason=reason) -        if infraction is None: -            return - -        notified = await self.notify_infraction(user=user, infr_type="Warning", reason=reason) - -        dm_result = ":incoming_envelope: " if notified else "" -        action = f"{dm_result}:ok_hand: warned {user.mention}" -        await ctx.send(f"{action}.") - -        if notified: -            dm_status = "Sent" -            log_content = None -        else: -            dm_status = "**Failed**" -            log_content = ctx.author.mention - -        await self.mod_log.send_log_message( -            icon_url=Icons.user_warn, -            colour=Colour(Colours.soft_red), -            title="Member warned", -            thumbnail=user.avatar_url_as(static_format="png"), -            text=textwrap.dedent(f""" -                Member: {user.mention} (`{user.id}`) -                Actor: {ctx.author} -                DM: {dm_status} -                Reason: {reason} -            """), -            content=log_content, -            footer=f"ID {infraction['id']}" -        ) - -    @with_role(*MODERATION_ROLES) -    @command() -    async def kick(self, ctx: Context, user: Member, *, reason: str = None) -> None: -        """Kicks a user with the provided reason.""" -        if not await self.respect_role_hierarchy(ctx, user, 'kick'): -            # Ensure ctx author has a higher top role than the target user -            # Warning is sent to ctx by the helper method -            return - -        infraction = await post_infraction(ctx, user, type="kick", reason=reason) -        if infraction is None: -            return - -        notified = await self.notify_infraction(user=user, infr_type="Kick", reason=reason) - -        self.mod_log.ignore(Event.member_remove, user.id) - -        try: -            await user.kick(reason=reason) -            action_result = True -        except Forbidden: -            action_result = False - -        dm_result = ":incoming_envelope: " if notified else "" -        action = f"{dm_result}:ok_hand: kicked {user.mention}" -        await ctx.send(f"{action}.") - -        dm_status = "Sent" if notified else "**Failed**" -        title = "Member kicked" if action_result else "Member kicked (Failed)" -        log_content = None if all((notified, action_result)) else ctx.author.mention - -        await self.mod_log.send_log_message( -            icon_url=Icons.sign_out, -            colour=Colour(Colours.soft_red), -            title=title, -            thumbnail=user.avatar_url_as(static_format="png"), -            text=textwrap.dedent(f""" -                Member: {user.mention} (`{user.id}`) -                Actor: {ctx.message.author} -                DM: {dm_status} -                Reason: {reason} -            """), -            content=log_content, -            footer=f"ID {infraction['id']}" -        ) - -    @with_role(*MODERATION_ROLES) -    @command() -    async def ban(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None: -        """Create a permanent ban infraction for a user with the provided reason.""" -        if not await self.respect_role_hierarchy(ctx, user, 'ban'): -            # Ensure ctx author has a higher top role than the target user -            # Warning is sent to ctx by the helper method -            return - -        if await already_has_active_infraction(ctx=ctx, user=user, type="ban"): -            return - -        infraction = await post_infraction(ctx, user, type="ban", reason=reason) -        if infraction is None: -            return - -        notified = await self.notify_infraction( -            user=user, -            infr_type="Ban", -            reason=reason -        ) - -        self.mod_log.ignore(Event.member_ban, user.id) -        self.mod_log.ignore(Event.member_remove, user.id) - -        try: -            await ctx.guild.ban(user, reason=reason, delete_message_days=0) -            action_result = True -        except Forbidden: -            action_result = False - -        dm_result = ":incoming_envelope: " if notified else "" -        action = f"{dm_result}:ok_hand: permanently banned {user.mention}" -        await ctx.send(f"{action}.") - -        dm_status = "Sent" if notified else "**Failed**" -        log_content = None if all((notified, action_result)) else ctx.author.mention -        title = "Member permanently banned" -        if not action_result: -            title += " (Failed)" - -        await self.mod_log.send_log_message( -            icon_url=Icons.user_ban, -            colour=Colour(Colours.soft_red), -            title=title, -            thumbnail=user.avatar_url_as(static_format="png"), -            text=textwrap.dedent(f""" -                Member: {user.mention} (`{user.id}`) -                Actor: {ctx.message.author} -                DM: {dm_status} -                Reason: {reason} -            """), -            content=log_content, -            footer=f"ID {infraction['id']}" -        ) - -    # endregion -    # region: Temporary infractions - -    @with_role(*MODERATION_ROLES) -    @command(aliases=('mute',)) -    async def tempmute(self, ctx: Context, user: Member, duration: Duration, *, reason: str = None) -> None: -        """ -        Create a temporary mute infraction for a user with the provided expiration and reason. - -        Duration strings are parsed per: http://strftime.org/ -        """ -        expiration = duration - -        if await already_has_active_infraction(ctx=ctx, user=user, type="mute"): -            return - -        infraction = await post_infraction(ctx, user, type="mute", reason=reason, expires_at=expiration) -        if infraction is None: -            return - -        self.mod_log.ignore(Event.member_update, user.id) -        await user.add_roles(self._muted_role, reason=reason) - -        notified = await self.notify_infraction( -            user=user, -            infr_type="Mute", -            expires_at=expiration, -            reason=reason -        ) - -        infraction_expiration = format_infraction(infraction["expires_at"]) - -        self.schedule_task(ctx.bot.loop, infraction["id"], infraction) - -        dm_result = ":incoming_envelope: " if notified else "" -        action = f"{dm_result}:ok_hand: muted {user.mention} until {infraction_expiration}" -        await ctx.send(f"{action}.") - -        if notified: -            dm_status = "Sent" -            log_content = None -        else: -            dm_status = "**Failed**" -            log_content = ctx.author.mention - -        await self.mod_log.send_log_message( -            icon_url=Icons.user_mute, -            colour=Colour(Colours.soft_red), -            title="Member temporarily muted", -            thumbnail=user.avatar_url_as(static_format="png"), -            text=textwrap.dedent(f""" -                Member: {user.mention} (`{user.id}`) -                Actor: {ctx.message.author} -                DM: {dm_status} -                Reason: {reason} -                Expires: {infraction_expiration} -            """), -            content=log_content, -            footer=f"ID {infraction['id']}" -        ) - -    @with_role(*MODERATION_ROLES) -    @command() -    async def tempban(self, ctx: Context, user: UserTypes, duration: Duration, *, reason: str = None) -> None: -        """ -        Create a temporary ban infraction for a user with the provided expiration and reason. - -        Duration strings are parsed per: http://strftime.org/ -        """ -        expiration = duration - -        if not await self.respect_role_hierarchy(ctx, user, 'tempban'): -            # Ensure ctx author has a higher top role than the target user -            # Warning is sent to ctx by the helper method -            return - -        if await already_has_active_infraction(ctx=ctx, user=user, type="ban"): -            return - -        infraction = await post_infraction(ctx, user, type="ban", reason=reason, expires_at=expiration) -        if infraction is None: -            return - -        notified = await self.notify_infraction( -            user=user, -            infr_type="Ban", -            expires_at=expiration, -            reason=reason -        ) - -        self.mod_log.ignore(Event.member_ban, user.id) -        self.mod_log.ignore(Event.member_remove, user.id) - -        try: -            await ctx.guild.ban(user, reason=reason, delete_message_days=0) -            action_result = True -        except Forbidden: -            action_result = False - -        infraction_expiration = format_infraction(infraction["expires_at"]) - -        self.schedule_task(ctx.bot.loop, infraction["id"], infraction) - -        dm_result = ":incoming_envelope: " if notified else "" -        action = f"{dm_result}:ok_hand: banned {user.mention} until {infraction_expiration}" -        await ctx.send(f"{action}.") - -        dm_status = "Sent" if notified else "**Failed**" -        log_content = None if all((notified, action_result)) else ctx.author.mention -        title = "Member temporarily banned" -        if not action_result: -            title += " (Failed)" - -        await self.mod_log.send_log_message( -            icon_url=Icons.user_ban, -            colour=Colour(Colours.soft_red), -            thumbnail=user.avatar_url_as(static_format="png"), -            title=title, -            text=textwrap.dedent(f""" -                Member: {user.mention} (`{user.id}`) -                Actor: {ctx.message.author} -                DM: {dm_status} -                Reason: {reason} -                Expires: {infraction_expiration} -            """), -            content=log_content, -            footer=f"ID {infraction['id']}" -        ) - -    # endregion -    # region: Permanent shadow infractions - -    @with_role(*MODERATION_ROLES) -    @command(hidden=True) -    async def note(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None: -        """ -        Create a private infraction note in the database for a user with the provided reason. - -        This does not send the user a notification -        """ -        infraction = await post_infraction(ctx, user, type="note", reason=reason, hidden=True) -        if infraction is None: -            return - -        await ctx.send(f":ok_hand: note added for {user.mention}.") - -        await self.mod_log.send_log_message( -            icon_url=Icons.user_warn, -            colour=Colour(Colours.soft_red), -            title="Member note added", -            thumbnail=user.avatar_url_as(static_format="png"), -            text=textwrap.dedent(f""" -                Member: {user.mention} (`{user.id}`) -                Actor: {ctx.message.author} -                Reason: {reason} -            """), -            footer=f"ID {infraction['id']}" -        ) - -    @with_role(*MODERATION_ROLES) -    @command(hidden=True, aliases=['shadowkick', 'skick']) -    async def shadow_kick(self, ctx: Context, user: Member, *, reason: str = None) -> None: -        """ -        Kick a user for the provided reason. - -        This does not send the user a notification. -        """ -        if not await self.respect_role_hierarchy(ctx, user, 'shadowkick'): -            # Ensure ctx author has a higher top role than the target user -            # Warning is sent to ctx by the helper method -            return - -        infraction = await post_infraction(ctx, user, type="kick", reason=reason, hidden=True) -        if infraction is None: -            return - -        self.mod_log.ignore(Event.member_remove, user.id) - -        try: -            await user.kick(reason=reason) -            action_result = True -        except Forbidden: -            action_result = False - -        await ctx.send(f":ok_hand: kicked {user.mention}.") - -        title = "Member shadow kicked" -        if action_result: -            log_content = None -        else: -            log_content = ctx.author.mention -            title += " (Failed)" - -        await self.mod_log.send_log_message( -            icon_url=Icons.sign_out, -            colour=Colour(Colours.soft_red), -            title=title, -            thumbnail=user.avatar_url_as(static_format="png"), -            text=textwrap.dedent(f""" -                Member: {user.mention} (`{user.id}`) -                Actor: {ctx.message.author} -                Reason: {reason} -            """), -            content=log_content, -            footer=f"ID {infraction['id']}" -        ) - -    @with_role(*MODERATION_ROLES) -    @command(hidden=True, aliases=['shadowban', 'sban']) -    async def shadow_ban(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None: -        """ -        Create a permanent ban infraction for a user with the provided reason. - -        This does not send the user a notification. -        """ -        if not await self.respect_role_hierarchy(ctx, user, 'shadowban'): -            # Ensure ctx author has a higher top role than the target user -            # Warning is sent to ctx by the helper method -            return - -        if await already_has_active_infraction(ctx=ctx, user=user, type="ban"): -            return - -        infraction = await post_infraction(ctx, user, type="ban", reason=reason, hidden=True) -        if infraction is None: -            return - -        self.mod_log.ignore(Event.member_ban, user.id) -        self.mod_log.ignore(Event.member_remove, user.id) - -        try: -            await ctx.guild.ban(user, reason=reason, delete_message_days=0) -            action_result = True -        except Forbidden: -            action_result = False - -        await ctx.send(f":ok_hand: permanently banned {user.mention}.") - -        title = "Member permanently banned" -        if action_result: -            log_content = None -        else: -            log_content = ctx.author.mention -            title += " (Failed)" - -        await self.mod_log.send_log_message( -            icon_url=Icons.user_ban, -            colour=Colour(Colours.soft_red), -            title=title, -            thumbnail=user.avatar_url_as(static_format="png"), -            text=textwrap.dedent(f""" -                Member: {user.mention} (`{user.id}`) -                Actor: {ctx.message.author} -                Reason: {reason} -            """), -            content=log_content, -            footer=f"ID {infraction['id']}" -        ) - -    # endregion -    # region: Temporary shadow infractions - -    @with_role(*MODERATION_ROLES) -    @command(hidden=True, aliases=["shadowtempmute, stempmute", "shadowmute", "smute"]) -    async def shadow_tempmute( -        self, ctx: Context, user: Member, duration: Duration, *, reason: str = None -    ) -> None: -        """ -        Create a temporary mute infraction for a user with the provided reason. - -        Duration strings are parsed per: http://strftime.org/ - -        This does not send the user a notification. -        """ -        expiration = duration - -        if await already_has_active_infraction(ctx=ctx, user=user, type="mute"): -            return - -        infraction = await post_infraction(ctx, user, type="mute", reason=reason, expires_at=expiration, hidden=True) -        if infraction is None: -            return - -        self.mod_log.ignore(Event.member_update, user.id) -        await user.add_roles(self._muted_role, reason=reason) - -        infraction_expiration = format_infraction(infraction["expires_at"]) -        self.schedule_task(ctx.bot.loop, infraction["id"], infraction) -        await ctx.send(f":ok_hand: muted {user.mention} until {infraction_expiration}.") - -        await self.mod_log.send_log_message( -            icon_url=Icons.user_mute, -            colour=Colour(Colours.soft_red), -            title="Member temporarily muted", -            thumbnail=user.avatar_url_as(static_format="png"), -            text=textwrap.dedent(f""" -                Member: {user.mention} (`{user.id}`) -                Actor: {ctx.message.author} -                Reason: {reason} -                Expires: {infraction_expiration} -            """), -            footer=f"ID {infraction['id']}" -        ) - -    @with_role(*MODERATION_ROLES) -    @command(hidden=True, aliases=["shadowtempban, stempban"]) -    async def shadow_tempban( -        self, ctx: Context, user: UserTypes, duration: Duration, *, reason: str = None -    ) -> None: -        """ -        Create a temporary ban infraction for a user with the provided reason. - -        Duration strings are parsed per: http://strftime.org/ - -        This does not send the user a notification. -        """ -        expiration = duration - -        if not await self.respect_role_hierarchy(ctx, user, 'shadowtempban'): -            # Ensure ctx author has a higher top role than the target user -            # Warning is sent to ctx by the helper method -            return - -        if await already_has_active_infraction(ctx=ctx, user=user, type="ban"): -            return - -        infraction = await post_infraction(ctx, user, type="ban", reason=reason, expires_at=expiration, hidden=True) -        if infraction is None: -            return - -        self.mod_log.ignore(Event.member_ban, user.id) -        self.mod_log.ignore(Event.member_remove, user.id) - -        try: -            await ctx.guild.ban(user, reason=reason, delete_message_days=0) -            action_result = True -        except Forbidden: -            action_result = False - -        infraction_expiration = format_infraction(infraction["expires_at"]) -        self.schedule_task(ctx.bot.loop, infraction["id"], infraction) -        await ctx.send(f":ok_hand: banned {user.mention} until {infraction_expiration}.") - -        title = "Member temporarily banned" -        if action_result: -            log_content = None -        else: -            log_content = ctx.author.mention -            title += " (Failed)" - -        # Send a log message to the mod log -        await self.mod_log.send_log_message( -            icon_url=Icons.user_ban, -            colour=Colour(Colours.soft_red), -            thumbnail=user.avatar_url_as(static_format="png"), -            title=title, -            text=textwrap.dedent(f""" -                Member: {user.mention} (`{user.id}`) -                Actor: {ctx.message.author} -                Reason: {reason} -                Expires: {infraction_expiration} -            """), -            content=log_content, -            footer=f"ID {infraction['id']}" -        ) - -    # endregion -    # region: Remove infractions (un- commands) - -    @with_role(*MODERATION_ROLES) -    @command() -    async def unmute(self, ctx: Context, user: UserTypes) -> None: -        """Deactivates the active mute infraction for a user.""" -        try: -            # check the current active infraction -            response = await self.bot.api_client.get( -                'bot/infractions', -                params={ -                    'active': 'true', -                    'type': 'mute', -                    'user__id': user.id -                } -            ) -            if len(response) > 1: -                log.warning("Found more than one active mute infraction for user `%d`", user.id) - -            if not response: -                # no active infraction -                await ctx.send( -                    f":x: There is no active mute infraction for user {user.mention}." -                ) -                return - -            for infraction in response: -                await self._deactivate_infraction(infraction) -                if infraction["expires_at"] is not None: -                    self.cancel_expiration(infraction["id"]) - -            notified = await self.notify_pardon( -                user=user, -                title="You have been unmuted.", -                content="You may now send messages in the server.", -                icon_url=Icons.user_unmute -            ) - -            if notified: -                dm_status = "Sent" -                dm_emoji = ":incoming_envelope: " -                log_content = None -            else: -                dm_status = "**Failed**" -                dm_emoji = "" -                log_content = ctx.author.mention - -            await ctx.send(f"{dm_emoji}:ok_hand: Un-muted {user.mention}.") - -            embed_text = textwrap.dedent( -                f""" -                    Member: {user.mention} (`{user.id}`) -                    Actor: {ctx.message.author} -                    DM: {dm_status} -                """ -            ) - -            if len(response) > 1: -                footer = f"Infraction IDs: {', '.join(str(infr['id']) for infr in response)}" -                title = "Member unmuted" -                embed_text += "Note: User had multiple **active** mute infractions in the database." -            else: -                infraction = response[0] -                footer = f"Infraction ID: {infraction['id']}" -                title = "Member unmuted" - -            # Send a log message to the mod log -            await self.mod_log.send_log_message( -                icon_url=Icons.user_unmute, -                colour=Colour(Colours.soft_green), -                title=title, -                thumbnail=user.avatar_url_as(static_format="png"), -                text=embed_text, -                footer=footer, -                content=log_content -            ) -        except Exception: -            log.exception("There was an error removing an infraction.") -            await ctx.send(":x: There was an error removing the infraction.") - -    @with_role(*MODERATION_ROLES) -    @command() -    async def unban(self, ctx: Context, user: UserTypes) -> None: -        """Deactivates the active ban infraction for a user.""" -        try: -            # check the current active infraction -            response = await self.bot.api_client.get( -                'bot/infractions', -                params={ -                    'active': 'true', -                    'type': 'ban', -                    'user__id': str(user.id) -                } -            ) -            if len(response) > 1: -                log.warning( -                    "More than one active ban infraction found for user `%d`.", -                    user.id -                ) - -            if not response: -                # no active infraction -                await ctx.send( -                    f":x: There is no active ban infraction for user {user.mention}." -                ) -                return - -            for infraction in response: -                await self._deactivate_infraction(infraction) -                if infraction["expires_at"] is not None: -                    self.cancel_expiration(infraction["id"]) - -            embed_text = textwrap.dedent( -                f""" -                    Member: {user.mention} (`{user.id}`) -                    Actor: {ctx.message.author} -                """ -            ) - -            if len(response) > 1: -                footer = f"Infraction IDs: {', '.join(str(infr['id']) for infr in response)}" -                embed_text += "Note: User had multiple **active** ban infractions in the database." -            else: -                infraction = response[0] -                footer = f"Infraction ID: {infraction['id']}" - -            await ctx.send(f":ok_hand: Un-banned {user.mention}.") - -            # Send a log message to the mod log -            await self.mod_log.send_log_message( -                icon_url=Icons.user_unban, -                colour=Colour(Colours.soft_green), -                title="Member unbanned", -                thumbnail=user.avatar_url_as(static_format="png"), -                text=embed_text, -                footer=footer, -            ) -        except Exception: -            log.exception("There was an error removing an infraction.") -            await ctx.send(":x: There was an error removing the infraction.") - -    # endregion -    # region: Edit infraction commands - -    @with_role(*MODERATION_ROLES) -    @group(name='infraction', aliases=('infr', 'infractions', 'inf'), invoke_without_command=True) -    async def infraction_group(self, ctx: Context) -> None: -        """Infraction manipulation commands.""" -        await ctx.invoke(self.bot.get_command("help"), "infraction") - -    @with_role(*MODERATION_ROLES) -    @infraction_group.command(name='edit') -    async def infraction_edit( -        self, -        ctx: Context, -        infraction_id: int, -        expires_at: Union[Duration, permanent_duration, None], -        *, -        reason: str = None -    ) -> None: -        """ -        Edit the duration and/or the reason of an infraction. - -        Durations are relative to the time of updating. -        Use "permanent" to mark the infraction as permanent. -        """ -        if expires_at is None and reason is None: -            # Unlike UserInputError, the error handler will show a specified message for BadArgument -            raise BadArgument("Neither a new expiry nor a new reason was specified.") - -        # Retrieve the previous infraction for its information. -        old_infraction = await self.bot.api_client.get(f'bot/infractions/{infraction_id}') - -        request_data = {} -        confirm_messages = [] -        log_text = "" - -        if expires_at == "permanent": -            request_data['expires_at'] = None -            confirm_messages.append("marked as permanent") -        elif expires_at is not None: -            request_data['expires_at'] = expires_at.isoformat() -            confirm_messages.append(f"set to expire on {expires_at.strftime(INFRACTION_FORMAT)}") -        else: -            confirm_messages.append("expiry unchanged") - -        if reason: -            request_data['reason'] = reason -            confirm_messages.append("set a new reason") -            log_text += f""" -                Previous reason: {old_infraction['reason']} -                New reason: {reason} -            """.rstrip() -        else: -            confirm_messages.append("reason unchanged") - -        # Update the infraction -        new_infraction = await self.bot.api_client.patch( -            f'bot/infractions/{infraction_id}', -            json=request_data, -        ) - -        # Re-schedule infraction if the expiration has been updated -        if 'expires_at' in request_data: -            self.cancel_task(new_infraction['id']) -            loop = asyncio.get_event_loop() -            self.schedule_task(loop, new_infraction['id'], new_infraction) - -            log_text += f""" -                Previous expiry: {old_infraction['expires_at'] or "Permanent"} -                New expiry: {new_infraction['expires_at'] or "Permanent"} -            """.rstrip() - -        await ctx.send(f":ok_hand: Updated infraction: {' & '.join(confirm_messages)}") - -        # Get information about the infraction's user -        user_id = new_infraction['user'] -        user = ctx.guild.get_member(user_id) - -        if user: -            user_text = f"{user.mention} (`{user.id}`)" -            thumbnail = user.avatar_url_as(static_format="png") -        else: -            user_text = f"`{user_id}`" -            thumbnail = None - -        # The infraction's actor -        actor_id = new_infraction['actor'] -        actor = ctx.guild.get_member(actor_id) or f"`{actor_id}`" - -        await self.mod_log.send_log_message( -            icon_url=Icons.pencil, -            colour=Colour.blurple(), -            title="Infraction edited", -            thumbnail=thumbnail, -            text=textwrap.dedent(f""" -                Member: {user_text} -                Actor: {actor} -                Edited by: {ctx.message.author}{log_text} -            """) -        ) - -    # endregion -    # region: Search infractions - -    @with_role(*MODERATION_ROLES) -    @infraction_group.group(name="search", invoke_without_command=True) -    async def infraction_search_group(self, ctx: Context, query: InfractionSearchQuery) -> None: -        """Searches for infractions in the database.""" -        if isinstance(query, User): -            await ctx.invoke(self.search_user, query) - -        else: -            await ctx.invoke(self.search_reason, query) - -    @with_role(*MODERATION_ROLES) -    @infraction_search_group.command(name="user", aliases=("member", "id")) -    async def search_user(self, ctx: Context, user: Union[User, proxy_user]) -> None: -        """Search for infractions by member.""" -        infraction_list = await self.bot.api_client.get( -            'bot/infractions', -            params={'user__id': str(user.id)} -        ) -        embed = Embed( -            title=f"Infractions for {user} ({len(infraction_list)} total)", -            colour=Colour.orange() -        ) -        await self.send_infraction_list(ctx, embed, infraction_list) - -    @with_role(*MODERATION_ROLES) -    @infraction_search_group.command(name="reason", aliases=("match", "regex", "re")) -    async def search_reason(self, ctx: Context, reason: str) -> None: -        """Search for infractions by their reason. Use Re2 for matching.""" -        infraction_list = await self.bot.api_client.get( -            'bot/infractions', params={'search': reason} -        ) -        embed = Embed( -            title=f"Infractions matching `{reason}` ({len(infraction_list)} total)", -            colour=Colour.orange() -        ) -        await self.send_infraction_list(ctx, embed, infraction_list) - -    # endregion -    # region: Utility functions - -    async def send_infraction_list(self, ctx: Context, embed: Embed, infractions: list) -> None: -        """Send a paginated embed of infractions for the specified user.""" -        if not infractions: -            await ctx.send(f":warning: No infractions could be found for that query.") -            return - -        lines = tuple( -            self._infraction_to_string(infraction) -            for infraction in infractions -        ) - -        await LinePaginator.paginate( -            lines, -            ctx=ctx, -            embed=embed, -            empty=True, -            max_lines=3, -            max_size=1000 -        ) - -    # endregion -    # region: Utility functions - -    def schedule_expiration( -        self, loop: asyncio.AbstractEventLoop, infraction_object: Dict[str, Union[str, int, bool]] -    ) -> None: -        """Schedules a task to expire a temporary infraction.""" -        infraction_id = infraction_object["id"] -        if infraction_id in self.scheduled_tasks: -            return - -        task: asyncio.Task = create_task(loop, self._scheduled_expiration(infraction_object)) - -        self.scheduled_tasks[infraction_id] = task - -    def cancel_expiration(self, infraction_id: str) -> None: -        """Un-schedules a task set to expire a temporary infraction.""" -        task = self.scheduled_tasks.get(infraction_id) -        if task is None: -            log.warning(f"Failed to unschedule {infraction_id}: no task found.") -            return -        task.cancel() -        log.debug(f"Unscheduled {infraction_id}.") -        del self.scheduled_tasks[infraction_id] - -    async def _scheduled_task(self, infraction_object: Dict[str, Union[str, int, bool]]) -> None: -        """ -        Marks an infraction expired after the delay from time of scheduling to time of expiration. - -        At the time of expiration, the infraction is marked as inactive on the website, and the -        expiration task is cancelled. The user is then notified via DM. -        """ -        infraction_id = infraction_object["id"] - -        # transform expiration to delay in seconds -        expiration_datetime = datetime.fromisoformat(infraction_object["expires_at"][:-1]) -        await wait_until(expiration_datetime) - -        log.debug(f"Marking infraction {infraction_id} as inactive (expired).") -        await self._deactivate_infraction(infraction_object) - -        self.cancel_task(infraction_object["id"]) - -        # Notify the user that they've been unmuted. -        user_id = infraction_object["user"] -        guild = self.bot.get_guild(constants.Guild.id) -        await self.notify_pardon( -            user=guild.get_member(user_id), -            title="You have been unmuted.", -            content="You may now send messages in the server.", -            icon_url=Icons.user_unmute -        ) - -    async def _deactivate_infraction(self, infraction_object: Dict[str, Union[str, int, bool]]) -> None: -        """ -        A co-routine which marks an infraction as inactive on the website. - -        This co-routine does not cancel or un-schedule an expiration task. -        """ -        guild: Guild = self.bot.get_guild(constants.Guild.id) -        user_id = infraction_object["user"] -        infraction_type = infraction_object["type"] - -        await self.bot.api_client.patch( -            'bot/infractions/' + str(infraction_object['id']), -            json={"active": False} -        ) - -        if infraction_type == "mute": -            member: Member = guild.get_member(user_id) -            if member: -                # remove the mute role -                self.mod_log.ignore(Event.member_update, member.id) -                await member.remove_roles(self._muted_role) -            else: -                log.warning(f"Failed to un-mute user: {user_id} (not found)") -        elif infraction_type == "ban": -            user: Object = Object(user_id) -            try: -                await guild.unban(user) -            except NotFound: -                log.info(f"Tried to unban user `{user_id}`, but Discord does not have an active ban registered.") - -    def _infraction_to_string(self, infraction_object: Dict[str, Union[str, int, bool]]) -> str: -        """Convert the infraction object to a string representation.""" -        actor_id = infraction_object["actor"] -        guild: Guild = self.bot.get_guild(constants.Guild.id) -        actor = guild.get_member(actor_id) -        active = infraction_object["active"] -        user_id = infraction_object["user"] -        hidden = infraction_object["hidden"] -        created = format_infraction(infraction_object["inserted_at"]) -        if infraction_object["expires_at"] is None: -            expires = "*Permanent*" -        else: -            expires = format_infraction(infraction_object["expires_at"]) - -        lines = textwrap.dedent(f""" -            {"**===============**" if active else "==============="} -            Status: {"__**Active**__" if active else "Inactive"} -            User: {self.bot.get_user(user_id)} (`{user_id}`) -            Type: **{infraction_object["type"]}** -            Shadow: {hidden} -            Reason: {infraction_object["reason"] or "*None*"} -            Created: {created} -            Expires: {expires} -            Actor: {actor.mention if actor else actor_id} -            ID: `{infraction_object["id"]}` -            {"**===============**" if active else "==============="} -        """) - -        return lines.strip() - -    async def notify_infraction( -            self, -            user: Union[User, Member], -            infr_type: str, -            expires_at: Union[datetime, str] = 'N/A', -            reason: str = "No reason provided." -    ) -> bool: -        """ -        Attempt to notify a user, via DM, of their fresh infraction. - -        Returns a boolean indicator of whether the DM was successful. -        """ -        if isinstance(expires_at, datetime): -            expires_at = expires_at.strftime(INFRACTION_FORMAT) - -        embed = Embed( -            description=textwrap.dedent(f""" -                **Type:** {infr_type} -                **Expires:** {expires_at} -                **Reason:** {reason} -                """), -            colour=Colour(Colours.soft_red) -        ) - -        icon_url = INFRACTION_ICONS.get(infr_type, Icons.token_removed) -        embed.set_author(name="Infraction Information", icon_url=icon_url, url=RULES_URL) -        embed.title = f"Please review our rules over at {RULES_URL}" -        embed.url = RULES_URL - -        if infr_type in APPEALABLE_INFRACTIONS: -            embed.set_footer(text="To appeal this infraction, send an e-mail to [email protected]") - -        return await self.send_private_embed(user, embed) - -    async def notify_pardon( -        self, -        user: Union[User, Member], -        title: str, -        content: str, -        icon_url: str = Icons.user_verified -    ) -> bool: -        """ -        Attempt to notify a user, via DM, of their expired infraction. - -        Optionally returns a boolean indicator of whether the DM was successful. -        """ -        embed = Embed( -            description=content, -            colour=Colour(Colours.soft_green) -        ) - -        embed.set_author(name=title, icon_url=icon_url) - -        return await self.send_private_embed(user, embed) - -    async def send_private_embed(self, user: Union[User, Member], embed: Embed) -> bool: -        """ -        A helper method for sending an embed to a user's DMs. - -        Returns a boolean indicator of DM success. -        """ -        # sometimes `user` is a `discord.Object`, so let's make it a proper user. -        user = await self.bot.fetch_user(user.id) - -        try: -            await user.send(embed=embed) -            return True -        except (HTTPException, Forbidden): -            log.debug( -                f"Infraction-related information could not be sent to user {user} ({user.id}). " -                "They've probably just disabled private messages." -            ) -            return False - -    async def log_notify_failure(self, target: str, actor: Member, infraction_type: str) -> None: -        """Send a mod log entry if an attempt to DM the target user has failed.""" -        await self.mod_log.send_log_message( -            icon_url=Icons.token_removed, -            content=actor.mention, -            colour=Colour(Colours.soft_red), -            title="Notification Failed", -            text=( -                f"Direct message was unable to be sent.\nUser: {target.mention}\n" -                f"Type: {infraction_type}" -            ) -        ) - -    # endregion - -    # This cannot be static (must have a __func__ attribute). -    async def cog_command_error(self, ctx: Context, error: Exception) -> None: -        """Send a notification to the invoking context on a Union failure.""" -        if isinstance(error, BadUnionArgument): -            if User in error.converters: -                await ctx.send(str(error.errors[0])) -                error.handled = True - -    @staticmethod -    async def respect_role_hierarchy(ctx: Context, target: UserTypes, infr_type: str) -> bool: -        """ -        Check if the highest role of the invoking member is greater than that of the target member. - -        If this check fails, a warning is sent to the invoking ctx. - -        Returns True always if target is not a discord.Member instance. -        """ -        if not isinstance(target, Member): -            return True - -        actor = ctx.author -        target_is_lower = target.top_role < actor.top_role -        if not target_is_lower: -            log.info( -                f"{actor} ({actor.id}) attempted to {infr_type} " -                f"{target} ({target.id}), who has an equal or higher top role." -            ) -            await ctx.send( -                f":x: {actor.mention}, you may not {infr_type} " -                "someone with an equal or higher top role." -            ) - -        return target_is_lower - - -def setup(bot: Bot) -> None: -    """Moderation cog load.""" -    bot.add_cog(Moderation(bot)) -    log.info("Cog loaded: Moderation") diff --git a/bot/cogs/moderation/__init__.py b/bot/cogs/moderation/__init__.py new file mode 100644 index 000000000..7383ed44e --- /dev/null +++ b/bot/cogs/moderation/__init__.py @@ -0,0 +1,25 @@ +import logging + +from discord.ext.commands import Bot + +from .infractions import Infractions +from .management import ModManagement +from .modlog import ModLog +from .superstarify import Superstarify + +log = logging.getLogger(__name__) + + +def setup(bot: Bot) -> None: +    """Load the moderation extension (Infractions, ModManagement, ModLog, & Superstarify cogs).""" +    bot.add_cog(Infractions(bot)) +    log.info("Cog loaded: Infractions") + +    bot.add_cog(ModLog(bot)) +    log.info("Cog loaded: ModLog") + +    bot.add_cog(ModManagement(bot)) +    log.info("Cog loaded: ModManagement") + +    bot.add_cog(Superstarify(bot)) +    log.info("Cog loaded: Superstarify") diff --git a/bot/cogs/moderation/infractions.py b/bot/cogs/moderation/infractions.py new file mode 100644 index 000000000..592ead60f --- /dev/null +++ b/bot/cogs/moderation/infractions.py @@ -0,0 +1,607 @@ +import logging +import textwrap +import typing as t +from datetime import datetime + +import dateutil.parser +import discord +from discord import Member +from discord.ext import commands +from discord.ext.commands import Context, command + +from bot import constants +from bot.api import ResponseCodeError +from bot.constants import Colours, Event +from bot.decorators import respect_role_hierarchy +from bot.utils import time +from bot.utils.checks import with_role_check +from bot.utils.scheduling import Scheduler +from . import utils +from .modlog import ModLog +from .utils import MemberObject + +log = logging.getLogger(__name__) + +MemberConverter = t.Union[utils.UserTypes, utils.proxy_user] + + +class Infractions(Scheduler, commands.Cog): +    """Apply and pardon infractions on users for moderation purposes.""" + +    category = "Moderation" +    category_description = "Server moderation tools." + +    def __init__(self, bot: commands.Bot): +        super().__init__() + +        self.bot = bot +        self.category = "Moderation" +        self._muted_role = discord.Object(constants.Roles.muted) + +        self.bot.loop.create_task(self.reschedule_infractions()) + +    @property +    def mod_log(self) -> ModLog: +        """Get currently loaded ModLog cog instance.""" +        return self.bot.get_cog("ModLog") + +    async def reschedule_infractions(self) -> None: +        """Schedule expiration for previous infractions.""" +        await self.bot.wait_until_ready() + +        infractions = await self.bot.api_client.get( +            'bot/infractions', +            params={'active': 'true'} +        ) +        for infraction in infractions: +            if infraction["expires_at"] is not None: +                self.schedule_task(self.bot.loop, infraction["id"], infraction) + +    @commands.Cog.listener() +    async def on_member_join(self, member: Member) -> None: +        """Reapply active mute infractions for returning members.""" +        active_mutes = await self.bot.api_client.get( +            'bot/infractions', +            params={ +                'user__id': str(member.id), +                'type': 'mute', +                'active': 'true' +            } +        ) +        if not active_mutes: +            return + +        # Assume a single mute because of restrictions elsewhere. +        mute = active_mutes[0] + +        # Calculate the time remaining, in seconds, for the mute. +        expiry = dateutil.parser.isoparse(mute["expires_at"]).replace(tzinfo=None) +        delta = (expiry - datetime.utcnow()).total_seconds() + +        # Mark as inactive if less than a minute remains. +        if delta < 60: +            await self.deactivate_infraction(mute) +            return + +        # Allowing mod log since this is a passive action that should be logged. +        await member.add_roles(self._muted_role, reason=f"Re-applying active mute: {mute['id']}") +        log.debug(f"User {member.id} has been re-muted on rejoin.") + +    # region: Permanent infractions + +    @command() +    async def warn(self, ctx: Context, user: Member, *, reason: str = None) -> None: +        """Warn a user for the given reason.""" +        infraction = await utils.post_infraction(ctx, user, "warning", reason, active=False) +        if infraction is None: +            return + +        await self.apply_infraction(ctx, infraction, user) + +    @command() +    async def kick(self, ctx: Context, user: Member, *, reason: str = None) -> None: +        """Kick a user for the given reason.""" +        await self.apply_kick(ctx, user, reason, active=False) + +    @command() +    async def ban(self, ctx: Context, user: MemberConverter, *, reason: str = None) -> None: +        """Permanently ban a user for the given reason.""" +        await self.apply_ban(ctx, user, reason) + +    # endregion +    # region: Temporary infractions + +    @command(aliases=["mute"]) +    async def tempmute(self, ctx: Context, user: Member, duration: utils.Expiry, *, reason: str = None) -> None: +        """ +        Temporarily mute a user for the given reason and duration. + +        A unit of time should be appended to the duration. +        Units (∗case-sensitive): +        \u2003`y` - years +        \u2003`m` - months∗ +        \u2003`w` - weeks +        \u2003`d` - days +        \u2003`h` - hours +        \u2003`M` - minutes∗ +        \u2003`s` - seconds + +        Alternatively, an ISO 8601 timestamp can be provided for the duration. +        """ +        await self.apply_mute(ctx, user, reason, expires_at=duration) + +    @command() +    async def tempban(self, ctx: Context, user: MemberConverter, duration: utils.Expiry, *, reason: str = None) -> None: +        """ +        Temporarily ban a user for the given reason and duration. + +        A unit of time should be appended to the duration. +        Units (∗case-sensitive): +        \u2003`y` - years +        \u2003`m` - months∗ +        \u2003`w` - weeks +        \u2003`d` - days +        \u2003`h` - hours +        \u2003`M` - minutes∗ +        \u2003`s` - seconds + +        Alternatively, an ISO 8601 timestamp can be provided for the duration. +        """ +        await self.apply_ban(ctx, user, reason, expires_at=duration) + +    # endregion +    # region: Permanent shadow infractions + +    @command(hidden=True) +    async def note(self, ctx: Context, user: MemberConverter, *, reason: str = None) -> None: +        """Create a private note for a user with the given reason without notifying the user.""" +        infraction = await utils.post_infraction(ctx, user, "note", reason, hidden=True, active=False) +        if infraction is None: +            return + +        await self.apply_infraction(ctx, infraction, user) + +    @command(hidden=True, aliases=['shadowkick', 'skick']) +    async def shadow_kick(self, ctx: Context, user: Member, *, reason: str = None) -> None: +        """Kick a user for the given reason without notifying the user.""" +        await self.apply_kick(ctx, user, reason, hidden=True, active=False) + +    @command(hidden=True, aliases=['shadowban', 'sban']) +    async def shadow_ban(self, ctx: Context, user: MemberConverter, *, reason: str = None) -> None: +        """Permanently ban a user for the given reason without notifying the user.""" +        await self.apply_ban(ctx, user, reason, hidden=True) + +    # endregion +    # region: Temporary shadow infractions + +    @command(hidden=True, aliases=["shadowtempmute, stempmute", "shadowmute", "smute"]) +    async def shadow_tempmute(self, ctx: Context, user: Member, duration: utils.Expiry, *, reason: str = None) -> None: +        """ +        Temporarily mute a user for the given reason and duration without notifying the user. + +        A unit of time should be appended to the duration. +        Units (∗case-sensitive): +        \u2003`y` - years +        \u2003`m` - months∗ +        \u2003`w` - weeks +        \u2003`d` - days +        \u2003`h` - hours +        \u2003`M` - minutes∗ +        \u2003`s` - seconds + +        Alternatively, an ISO 8601 timestamp can be provided for the duration. +        """ +        await self.apply_mute(ctx, user, reason, expires_at=duration, hidden=True) + +    @command(hidden=True, aliases=["shadowtempban, stempban"]) +    async def shadow_tempban( +        self, +        ctx: Context, +        user: MemberConverter, +        duration: utils.Expiry, +        *, +        reason: str = None +    ) -> None: +        """ +        Temporarily ban a user for the given reason and duration without notifying the user. + +        A unit of time should be appended to the duration. +        Units (∗case-sensitive): +        \u2003`y` - years +        \u2003`m` - months∗ +        \u2003`w` - weeks +        \u2003`d` - days +        \u2003`h` - hours +        \u2003`M` - minutes∗ +        \u2003`s` - seconds + +        Alternatively, an ISO 8601 timestamp can be provided for the duration. +        """ +        await self.apply_ban(ctx, user, reason, expires_at=duration, hidden=True) + +    # endregion +    # region: Remove infractions (un- commands) + +    @command() +    async def unmute(self, ctx: Context, user: MemberConverter) -> None: +        """Prematurely end the active mute infraction for the user.""" +        await self.pardon_infraction(ctx, "mute", user) + +    @command() +    async def unban(self, ctx: Context, user: MemberConverter) -> None: +        """Prematurely end the active ban infraction for the user.""" +        await self.pardon_infraction(ctx, "ban", user) + +    # endregion +    # region: Base infraction functions + +    async def apply_mute(self, ctx: Context, user: Member, reason: str, **kwargs) -> None: +        """Apply a mute infraction with kwargs passed to `post_infraction`.""" +        if await utils.has_active_infraction(ctx, user, "mute"): +            return + +        infraction = await utils.post_infraction(ctx, user, "mute", reason, **kwargs) +        if infraction is None: +            return + +        self.mod_log.ignore(Event.member_update, user.id) + +        action = user.add_roles(self._muted_role, reason=reason) +        await self.apply_infraction(ctx, infraction, user, action) + +    @respect_role_hierarchy() +    async def apply_kick(self, ctx: Context, user: Member, reason: str, **kwargs) -> None: +        """Apply a kick infraction with kwargs passed to `post_infraction`.""" +        infraction = await utils.post_infraction(ctx, user, "kick", reason, **kwargs) +        if infraction is None: +            return + +        self.mod_log.ignore(Event.member_remove, user.id) + +        action = user.kick(reason=reason) +        await self.apply_infraction(ctx, infraction, user, action) + +    @respect_role_hierarchy() +    async def apply_ban(self, ctx: Context, user: MemberObject, reason: str, **kwargs) -> None: +        """Apply a ban infraction with kwargs passed to `post_infraction`.""" +        if await utils.has_active_infraction(ctx, user, "ban"): +            return + +        infraction = await utils.post_infraction(ctx, user, "ban", reason, **kwargs) +        if infraction is None: +            return + +        self.mod_log.ignore(Event.member_remove, user.id) + +        action = ctx.guild.ban(user, reason=reason, delete_message_days=0) +        await self.apply_infraction(ctx, infraction, user, action) + +    # endregion +    # region: Utility functions + +    async def _scheduled_task(self, infraction: utils.Infraction) -> None: +        """ +        Marks an infraction expired after the delay from time of scheduling to time of expiration. + +        At the time of expiration, the infraction is marked as inactive on the website and the +        expiration task is cancelled. +        """ +        _id = infraction["id"] + +        expiry = dateutil.parser.isoparse(infraction["expires_at"]).replace(tzinfo=None) +        await time.wait_until(expiry) + +        log.debug(f"Marking infraction {_id} as inactive (expired).") +        await self.deactivate_infraction(infraction) + +    async def deactivate_infraction( +        self, +        infraction: utils.Infraction, +        send_log: bool = True +    ) -> t.Dict[str, str]: +        """ +        Deactivate an active infraction and return a dictionary of lines to send in a mod log. + +        The infraction is removed from Discord, marked as inactive in the database, and has its +        expiration task cancelled. If `send_log` is True, a mod log is sent for the +        deactivation of the infraction. + +        Supported infraction types are mute and ban. Other types will raise a ValueError. +        """ +        guild = self.bot.get_guild(constants.Guild.id) +        mod_role = guild.get_role(constants.Roles.moderator) +        user_id = infraction["user"] +        _type = infraction["type"] +        _id = infraction["id"] +        reason = f"Infraction #{_id} expired or was pardoned." + +        log.debug(f"Marking infraction #{_id} as inactive (expired).") + +        log_content = None +        log_text = { +            "Member": str(user_id), +            "Actor": str(self.bot.user), +            "Reason": infraction["reason"] +        } + +        try: +            if _type == "mute": +                user = guild.get_member(user_id) +                if user: +                    # Remove the muted role. +                    self.mod_log.ignore(Event.member_update, user.id) +                    await user.remove_roles(self._muted_role, reason=reason) + +                    # DM the user about the expiration. +                    notified = await utils.notify_pardon( +                        user=user, +                        title="You have been unmuted.", +                        content="You may now send messages in the server.", +                        icon_url=utils.INFRACTION_ICONS["mute"][1] +                    ) + +                    log_text["Member"] = f"{user.mention}(`{user.id}`)" +                    log_text["DM"] = "Sent" if notified else "**Failed**" +                else: +                    log.info(f"Failed to unmute user {user_id}: user not found") +                    log_text["Failure"] = "User was not found in the guild." +            elif _type == "ban": +                user = discord.Object(user_id) +                self.mod_log.ignore(Event.member_unban, user_id) +                try: +                    await guild.unban(user, reason=reason) +                except discord.NotFound: +                    log.info(f"Failed to unban user {user_id}: no active ban found on Discord") +                    log_text["Note"] = "No active ban found on Discord." +            else: +                raise ValueError( +                    f"Attempted to deactivate an unsupported infraction #{_id} ({_type})!" +                ) +        except discord.Forbidden: +            log.warning(f"Failed to deactivate infraction #{_id} ({_type}): bot lacks permissions") +            log_text["Failure"] = f"The bot lacks permissions to do this (role hierarchy?)" +            log_content = mod_role.mention +        except discord.HTTPException as e: +            log.exception(f"Failed to deactivate infraction #{_id} ({_type})") +            log_text["Failure"] = f"HTTPException with code {e.code}." +            log_content = mod_role.mention + +        # Check if the user is currently being watched by Big Brother. +        try: +            active_watch = await self.bot.api_client.get( +                "bot/infractions", +                params={ +                    "active": "true", +                    "type": "watch", +                    "user__id": user_id +                } +            ) + +            log_text["Watching"] = "Yes" if active_watch else "No" +        except ResponseCodeError: +            log.exception(f"Failed to fetch watch status for user {user_id}") +            log_text["Watching"] = "Unknown - failed to fetch watch status." + +        try: +            # Mark infraction as inactive in the database. +            await self.bot.api_client.patch( +                f"bot/infractions/{_id}", +                json={"active": False} +            ) +        except ResponseCodeError as e: +            log.exception(f"Failed to deactivate infraction #{_id} ({_type})") +            log_line = f"API request failed with code {e.status}." +            log_content = mod_role.mention + +            # Append to an existing failure message if possible +            if "Failure" in log_text: +                log_text["Failure"] += f" {log_line}" +            else: +                log_text["Failure"] = log_line + +        # Cancel the expiration task. +        if infraction["expires_at"] is not None: +            self.cancel_task(infraction["id"]) + +        # Send a log message to the mod log. +        if send_log: +            log_title = f"expiration failed" if "Failure" in log_text else "expired" + +            await self.mod_log.send_log_message( +                icon_url=utils.INFRACTION_ICONS[_type][1], +                colour=Colours.soft_green, +                title=f"Infraction {log_title}: {_type}", +                text="\n".join(f"{k}: {v}" for k, v in log_text.items()), +                footer=f"ID: {_id}", +                content=log_content, +            ) + +        return log_text + +    async def apply_infraction( +        self, +        ctx: Context, +        infraction: utils.Infraction, +        user: MemberObject, +        action_coro: t.Optional[t.Awaitable] = None +    ) -> None: +        """Apply an infraction to the user, log the infraction, and optionally notify the user.""" +        infr_type = infraction["type"] +        icon = utils.INFRACTION_ICONS[infr_type][0] +        reason = infraction["reason"] +        expiry = infraction["expires_at"] + +        if expiry: +            expiry = time.format_infraction(expiry) + +        # Default values for the confirmation message and mod log. +        confirm_msg = f":ok_hand: applied" +        expiry_msg = f" until {expiry}" if expiry else " permanently" +        dm_result = "" +        dm_log_text = "" +        expiry_log_text = f"Expires: {expiry}" if expiry else "" +        log_title = "applied" +        log_content = None + +        # DM the user about the infraction if it's not a shadow/hidden infraction. +        if not infraction["hidden"]: +            # Sometimes user is a discord.Object; make it a proper user. +            await self.bot.fetch_user(user.id) + +            # Accordingly display whether the user was successfully notified via DM. +            if await utils.notify_infraction(user, infr_type, expiry, reason, icon): +                dm_result = ":incoming_envelope: " +                dm_log_text = "\nDM: Sent" +            else: +                dm_log_text = "\nDM: **Failed**" +                log_content = ctx.author.mention + +        if infraction["actor"] == self.bot.user.id: +            end_msg = f" (reason: {infraction['reason']})" +        else: +            infractions = await self.bot.api_client.get( +                "bot/infractions", +                params={"user__id": str(user.id)} +            ) +            end_msg = f" ({len(infractions)} infractions total)" + +        # Execute the necessary actions to apply the infraction on Discord. +        if action_coro: +            try: +                await action_coro +                if expiry: +                    # Schedule the expiration of the infraction. +                    self.schedule_task(ctx.bot.loop, infraction["id"], infraction) +            except discord.Forbidden: +                # Accordingly display that applying the infraction failed. +                confirm_msg = f":x: failed to apply" +                expiry_msg = "" +                log_content = ctx.author.mention +                log_title = "failed to apply" + +        # Send a confirmation message to the invoking context. +        await ctx.send( +            f"{dm_result}{confirm_msg} **{infr_type}** to {user.mention}{expiry_msg}{end_msg}." +        ) + +        # Send a log message to the mod log. +        await self.mod_log.send_log_message( +            icon_url=icon, +            colour=Colours.soft_red, +            title=f"Infraction {log_title}: {infr_type}", +            thumbnail=user.avatar_url_as(static_format="png"), +            text=textwrap.dedent(f""" +                Member: {user.mention} (`{user.id}`) +                Actor: {ctx.message.author}{dm_log_text} +                Reason: {reason} +                {expiry_log_text} +            """), +            content=log_content, +            footer=f"ID {infraction['id']}" +        ) + +    async def pardon_infraction(self, ctx: Context, infr_type: str, user: MemberObject) -> None: +        """Prematurely end an infraction for a user and log the action in the mod log.""" +        # Check the current active infraction +        response = await self.bot.api_client.get( +            'bot/infractions', +            params={ +                'active': 'true', +                'type': infr_type, +                'user__id': user.id +            } +        ) + +        if not response: +            await ctx.send(f":x: There's no active {infr_type} infraction for user {user.mention}.") +            return + +        # Deactivate the infraction and cancel its scheduled expiration task. +        log_text = await self.deactivate_infraction(response[0], send_log=False) + +        log_text["Member"] = f"{user.mention}(`{user.id}`)" +        log_text["Actor"] = str(ctx.message.author) +        log_content = None +        footer = f"ID: {response[0]['id']}" + +        # If multiple active infractions were found, mark them as inactive in the database +        # and cancel their expiration tasks. +        if len(response) > 1: +            log.warning(f"Found more than one active {infr_type} infraction for user {user.id}") + +            footer = f"Infraction IDs: {', '.join(str(infr['id']) for infr in response)}" + +            log_note = f"Found multiple **active** {infr_type} infractions in the database." +            if "Note" in log_text: +                log_text["Note"] = f" {log_note}" +            else: +                log_text["Note"] = log_note + +            # deactivate_infraction() is not called again because: +            #     1. Discord cannot store multiple active bans or assign multiples of the same role +            #     2. It would send a pardon DM for each active infraction, which is redundant +            for infraction in response[1:]: +                _id = infraction['id'] +                try: +                    # Mark infraction as inactive in the database. +                    await self.bot.api_client.patch( +                        f"bot/infractions/{_id}", +                        json={"active": False} +                    ) +                except ResponseCodeError: +                    log.exception(f"Failed to deactivate infraction #{_id} ({infr_type})") +                    # This is simpler and cleaner than trying to concatenate all the errors. +                    log_text["Failure"] = "See bot's logs for details." + +                # Cancel pending expiration task. +                if infraction["expires_at"] is not None: +                    self.cancel_task(infraction["id"]) + +        # Accordingly display whether the user was successfully notified via DM. +        dm_emoji = "" +        if log_text.get("DM") == "Sent": +            dm_emoji = ":incoming_envelope: " +        elif "DM" in log_text: +            # Mention the actor because the DM failed to send. +            log_content = ctx.author.mention + +        # Accordingly display whether the pardon failed. +        if "Failure" in log_text: +            confirm_msg = ":x: failed to pardon" +            log_title = "pardon failed" +            log_content = ctx.author.mention +        else: +            confirm_msg = f":ok_hand: pardoned" +            log_title = "pardoned" + +        # Send a confirmation message to the invoking context. +        await ctx.send( +            f"{dm_emoji}{confirm_msg} infraction **{infr_type}** for {user.mention}. " +            f"{log_text.get('Failure', '')}" +        ) + +        # Send a log message to the mod log. +        await self.mod_log.send_log_message( +            icon_url=utils.INFRACTION_ICONS[infr_type][1], +            colour=Colours.soft_green, +            title=f"Infraction {log_title}: {infr_type}", +            thumbnail=user.avatar_url_as(static_format="png"), +            text="\n".join(f"{k}: {v}" for k, v in log_text.items()), +            footer=footer, +            content=log_content, +        ) + +    # endregion + +    # This cannot be static (must have a __func__ attribute). +    def cog_check(self, ctx: Context) -> bool: +        """Only allow moderators to invoke the commands in this cog.""" +        return with_role_check(ctx, *constants.MODERATION_ROLES) + +    # This cannot be static (must have a __func__ attribute). +    async def cog_command_error(self, ctx: Context, error: Exception) -> None: +        """Send a notification to the invoking context on a Union failure.""" +        if isinstance(error, commands.BadUnionArgument): +            if discord.User in error.converters: +                await ctx.send(str(error.errors[0])) +                error.handled = True diff --git a/bot/cogs/moderation/management.py b/bot/cogs/moderation/management.py new file mode 100644 index 000000000..491f6d400 --- /dev/null +++ b/bot/cogs/moderation/management.py @@ -0,0 +1,268 @@ +import asyncio +import logging +import textwrap +import typing as t + +import discord +from discord.ext import commands +from discord.ext.commands import Context + +from bot import constants +from bot.converters import InfractionSearchQuery +from bot.pagination import LinePaginator +from bot.utils import time +from bot.utils.checks import with_role_check +from . import utils +from .infractions import Infractions +from .modlog import ModLog + +log = logging.getLogger(__name__) + +UserConverter = t.Union[discord.User, utils.proxy_user] + + +def permanent_duration(expires_at: str) -> str: +    """Only allow an expiration to be 'permanent' if it is a string.""" +    expires_at = expires_at.lower() +    if expires_at != "permanent": +        raise commands.BadArgument +    else: +        return expires_at + + +class ModManagement(commands.Cog): +    """Management of infractions.""" + +    category = "Moderation" + +    def __init__(self, bot: commands.Bot): +        self.bot = bot + +    @property +    def mod_log(self) -> ModLog: +        """Get currently loaded ModLog cog instance.""" +        return self.bot.get_cog("ModLog") + +    @property +    def infractions_cog(self) -> Infractions: +        """Get currently loaded Infractions cog instance.""" +        return self.bot.get_cog("Infractions") + +    # region: Edit infraction commands + +    @commands.group(name='infraction', aliases=('infr', 'infractions', 'inf'), invoke_without_command=True) +    async def infraction_group(self, ctx: Context) -> None: +        """Infraction manipulation commands.""" +        await ctx.invoke(self.bot.get_command("help"), "infraction") + +    @infraction_group.command(name='edit') +    async def infraction_edit( +        self, +        ctx: Context, +        infraction_id: int, +        duration: t.Union[utils.Expiry, permanent_duration, None], +        *, +        reason: str = None +    ) -> None: +        """ +        Edit the duration and/or the reason of an infraction. + +        Durations are relative to the time of updating and should be appended with a unit of time. +        Units (∗case-sensitive): +        \u2003`y` - years +        \u2003`m` - months∗ +        \u2003`w` - weeks +        \u2003`d` - days +        \u2003`h` - hours +        \u2003`M` - minutes∗ +        \u2003`s` - seconds + +        Use "permanent" to mark the infraction as permanent. Alternatively, an ISO 8601 timestamp +        can be provided for the duration. +        """ +        if duration is None and reason is None: +            # Unlike UserInputError, the error handler will show a specified message for BadArgument +            raise commands.BadArgument("Neither a new expiry nor a new reason was specified.") + +        # Retrieve the previous infraction for its information. +        old_infraction = await self.bot.api_client.get(f'bot/infractions/{infraction_id}') + +        request_data = {} +        confirm_messages = [] +        log_text = "" + +        if duration == "permanent": +            request_data['expires_at'] = None +            confirm_messages.append("marked as permanent") +        elif duration is not None: +            request_data['expires_at'] = duration.isoformat() +            expiry = duration.strftime(time.INFRACTION_FORMAT) +            confirm_messages.append(f"set to expire on {expiry}") +        else: +            confirm_messages.append("expiry unchanged") + +        if reason: +            request_data['reason'] = reason +            confirm_messages.append("set a new reason") +            log_text += f""" +                Previous reason: {old_infraction['reason']} +                New reason: {reason} +            """.rstrip() +        else: +            confirm_messages.append("reason unchanged") + +        # Update the infraction +        new_infraction = await self.bot.api_client.patch( +            f'bot/infractions/{infraction_id}', +            json=request_data, +        ) + +        # Re-schedule infraction if the expiration has been updated +        if 'expires_at' in request_data: +            self.infractions_cog.cancel_task(new_infraction['id']) +            loop = asyncio.get_event_loop() +            self.infractions_cog.schedule_task(loop, new_infraction['id'], new_infraction) + +            log_text += f""" +                Previous expiry: {old_infraction['expires_at'] or "Permanent"} +                New expiry: {new_infraction['expires_at'] or "Permanent"} +            """.rstrip() + +        await ctx.send(f":ok_hand: Updated infraction: {' & '.join(confirm_messages)}") + +        # Get information about the infraction's user +        user_id = new_infraction['user'] +        user = ctx.guild.get_member(user_id) + +        if user: +            user_text = f"{user.mention} (`{user.id}`)" +            thumbnail = user.avatar_url_as(static_format="png") +        else: +            user_text = f"`{user_id}`" +            thumbnail = None + +        # The infraction's actor +        actor_id = new_infraction['actor'] +        actor = ctx.guild.get_member(actor_id) or f"`{actor_id}`" + +        await self.mod_log.send_log_message( +            icon_url=constants.Icons.pencil, +            colour=discord.Colour.blurple(), +            title="Infraction edited", +            thumbnail=thumbnail, +            text=textwrap.dedent(f""" +                Member: {user_text} +                Actor: {actor} +                Edited by: {ctx.message.author}{log_text} +            """) +        ) + +    # endregion +    # region: Search infractions + +    @infraction_group.group(name="search", invoke_without_command=True) +    async def infraction_search_group(self, ctx: Context, query: InfractionSearchQuery) -> None: +        """Searches for infractions in the database.""" +        if isinstance(query, discord.User): +            await ctx.invoke(self.search_user, query) +        else: +            await ctx.invoke(self.search_reason, query) + +    @infraction_search_group.command(name="user", aliases=("member", "id")) +    async def search_user(self, ctx: Context, user: UserConverter) -> None: +        """Search for infractions by member.""" +        infraction_list = await self.bot.api_client.get( +            'bot/infractions', +            params={'user__id': str(user.id)} +        ) +        embed = discord.Embed( +            title=f"Infractions for {user} ({len(infraction_list)} total)", +            colour=discord.Colour.orange() +        ) +        await self.send_infraction_list(ctx, embed, infraction_list) + +    @infraction_search_group.command(name="reason", aliases=("match", "regex", "re")) +    async def search_reason(self, ctx: Context, reason: str) -> None: +        """Search for infractions by their reason. Use Re2 for matching.""" +        infraction_list = await self.bot.api_client.get( +            'bot/infractions', +            params={'search': reason} +        ) +        embed = discord.Embed( +            title=f"Infractions matching `{reason}` ({len(infraction_list)} total)", +            colour=discord.Colour.orange() +        ) +        await self.send_infraction_list(ctx, embed, infraction_list) + +    # endregion +    # region: Utility functions + +    async def send_infraction_list( +        self, +        ctx: Context, +        embed: discord.Embed, +        infractions: t.Iterable[utils.Infraction] +    ) -> None: +        """Send a paginated embed of infractions for the specified user.""" +        if not infractions: +            await ctx.send(f":warning: No infractions could be found for that query.") +            return + +        lines = tuple( +            self.infraction_to_string(infraction) +            for infraction in infractions +        ) + +        await LinePaginator.paginate( +            lines, +            ctx=ctx, +            embed=embed, +            empty=True, +            max_lines=3, +            max_size=1000 +        ) + +    def infraction_to_string(self, infraction: utils.Infraction) -> str: +        """Convert the infraction object to a string representation.""" +        actor_id = infraction["actor"] +        guild = self.bot.get_guild(constants.Guild.id) +        actor = guild.get_member(actor_id) +        active = infraction["active"] +        user_id = infraction["user"] +        hidden = infraction["hidden"] +        created = time.format_infraction(infraction["inserted_at"]) +        if infraction["expires_at"] is None: +            expires = "*Permanent*" +        else: +            expires = time.format_infraction(infraction["expires_at"]) + +        lines = textwrap.dedent(f""" +            {"**===============**" if active else "==============="} +            Status: {"__**Active**__" if active else "Inactive"} +            User: {self.bot.get_user(user_id)} (`{user_id}`) +            Type: **{infraction["type"]}** +            Shadow: {hidden} +            Reason: {infraction["reason"] or "*None*"} +            Created: {created} +            Expires: {expires} +            Actor: {actor.mention if actor else actor_id} +            ID: `{infraction["id"]}` +            {"**===============**" if active else "==============="} +        """) + +        return lines.strip() + +    # endregion + +    # This cannot be static (must have a __func__ attribute). +    def cog_check(self, ctx: Context) -> bool: +        """Only allow moderators to invoke the commands in this cog.""" +        return with_role_check(ctx, *constants.MODERATION_ROLES) + +    # This cannot be static (must have a __func__ attribute). +    async def cog_command_error(self, ctx: Context, error: Exception) -> None: +        """Send a notification to the invoking context on a Union failure.""" +        if isinstance(error, commands.BadUnionArgument): +            if discord.User in error.converters: +                await ctx.send(str(error.errors[0])) +                error.handled = True diff --git a/bot/cogs/modlog.py b/bot/cogs/moderation/modlog.py index 68424d268..118503517 100644 --- a/bot/cogs/modlog.py +++ b/bot/cogs/moderation/modlog.py @@ -1,30 +1,26 @@  import asyncio  import logging +import typing as t  from datetime import datetime -from typing import List, Optional, Union +import discord  from dateutil.relativedelta import relativedelta  from deepdiff import DeepDiff -from discord import ( -    CategoryChannel, Colour, Embed, File, Guild, -    Member, Message, NotFound, RawMessageDeleteEvent, -    RawMessageUpdateEvent, Role, TextChannel, User, VoiceChannel -) +from discord import Colour  from discord.abc import GuildChannel  from discord.ext.commands import Bot, Cog, Context -from bot.constants import ( -    Channels, Colours, Emojis, Event, Guild as GuildConstant, Icons, URLs -) +from bot.constants import Channels, Colours, Emojis, Event, Guild as GuildConstant, Icons, URLs  from bot.utils.time import humanize_delta +from .utils import UserTypes  log = logging.getLogger(__name__) -GUILD_CHANNEL = Union[CategoryChannel, TextChannel, VoiceChannel] +GUILD_CHANNEL = t.Union[discord.CategoryChannel, discord.TextChannel, discord.VoiceChannel]  CHANNEL_CHANGES_UNSUPPORTED = ("permissions",)  CHANNEL_CHANGES_SUPPRESSED = ("_overwrites", "position") -MEMBER_CHANGES_SUPPRESSED = ("status", "activities", "_client_status") +MEMBER_CHANGES_SUPPRESSED = ("status", "activities", "_client_status", "nick")  ROLE_CHANGES_UNSUPPORTED = ("colour", "permissions") @@ -38,7 +34,7 @@ class ModLog(Cog, name="ModLog"):          self._cached_deletes = []          self._cached_edits = [] -    async def upload_log(self, messages: List[Message], actor_id: int) -> str: +    async def upload_log(self, messages: t.List[discord.Message], actor_id: int) -> str:          """          Uploads the log data to the database via an API endpoint for uploading logs. @@ -73,23 +69,23 @@ class ModLog(Cog, name="ModLog"):                  self._ignored[event].append(item)      async def send_log_message( -            self, -            icon_url: Optional[str], -            colour: Colour, -            title: Optional[str], -            text: str, -            thumbnail: Optional[str] = None, -            channel_id: int = Channels.modlog, -            ping_everyone: bool = False, -            files: Optional[List[File]] = None, -            content: Optional[str] = None, -            additional_embeds: Optional[List[Embed]] = None, -            additional_embeds_msg: Optional[str] = None, -            timestamp_override: Optional[datetime] = None, -            footer: Optional[str] = None, +        self, +        icon_url: t.Optional[str], +        colour: t.Union[discord.Colour, int], +        title: t.Optional[str], +        text: str, +        thumbnail: t.Optional[t.Union[str, discord.Asset]] = None, +        channel_id: int = Channels.modlog, +        ping_everyone: bool = False, +        files: t.Optional[t.List[discord.File]] = None, +        content: t.Optional[str] = None, +        additional_embeds: t.Optional[t.List[discord.Embed]] = None, +        additional_embeds_msg: t.Optional[str] = None, +        timestamp_override: t.Optional[datetime] = None, +        footer: t.Optional[str] = None,      ) -> Context:          """Generate log embed and send to logging channel.""" -        embed = Embed(description=text) +        embed = discord.Embed(description=text)          if title and icon_url:              embed.set_author(name=title, icon_url=icon_url) @@ -126,10 +122,10 @@ class ModLog(Cog, name="ModLog"):          if channel.guild.id != GuildConstant.id:              return -        if isinstance(channel, CategoryChannel): +        if isinstance(channel, discord.CategoryChannel):              title = "Category created"              message = f"{channel.name} (`{channel.id}`)" -        elif isinstance(channel, VoiceChannel): +        elif isinstance(channel, discord.VoiceChannel):              title = "Voice channel created"              if channel.category: @@ -144,7 +140,7 @@ class ModLog(Cog, name="ModLog"):              else:                  message = f"{channel.name} (`{channel.id}`)" -        await self.send_log_message(Icons.hash_green, Colour(Colours.soft_green), title, message) +        await self.send_log_message(Icons.hash_green, Colours.soft_green, title, message)      @Cog.listener()      async def on_guild_channel_delete(self, channel: GUILD_CHANNEL) -> None: @@ -152,20 +148,20 @@ class ModLog(Cog, name="ModLog"):          if channel.guild.id != GuildConstant.id:              return -        if isinstance(channel, CategoryChannel): +        if isinstance(channel, discord.CategoryChannel):              title = "Category deleted" -        elif isinstance(channel, VoiceChannel): +        elif isinstance(channel, discord.VoiceChannel):              title = "Voice channel deleted"          else:              title = "Text channel deleted" -        if channel.category and not isinstance(channel, CategoryChannel): +        if channel.category and not isinstance(channel, discord.CategoryChannel):              message = f"{channel.category}/{channel.name} (`{channel.id}`)"          else:              message = f"{channel.name} (`{channel.id}`)"          await self.send_log_message( -            Icons.hash_red, Colour(Colours.soft_red), +            Icons.hash_red, Colours.soft_red,              title, message          ) @@ -230,29 +226,29 @@ class ModLog(Cog, name="ModLog"):          )      @Cog.listener() -    async def on_guild_role_create(self, role: Role) -> None: +    async def on_guild_role_create(self, role: discord.Role) -> None:          """Log role create event to mod log."""          if role.guild.id != GuildConstant.id:              return          await self.send_log_message( -            Icons.crown_green, Colour(Colours.soft_green), +            Icons.crown_green, Colours.soft_green,              "Role created", f"`{role.id}`"          )      @Cog.listener() -    async def on_guild_role_delete(self, role: Role) -> None: +    async def on_guild_role_delete(self, role: discord.Role) -> None:          """Log role delete event to mod log."""          if role.guild.id != GuildConstant.id:              return          await self.send_log_message( -            Icons.crown_red, Colour(Colours.soft_red), +            Icons.crown_red, Colours.soft_red,              "Role removed", f"{role.name} (`{role.id}`)"          )      @Cog.listener() -    async def on_guild_role_update(self, before: Role, after: Role) -> None: +    async def on_guild_role_update(self, before: discord.Role, after: discord.Role) -> None:          """Log role update event to mod log."""          if before.guild.id != GuildConstant.id:              return @@ -305,7 +301,7 @@ class ModLog(Cog, name="ModLog"):          )      @Cog.listener() -    async def on_guild_update(self, before: Guild, after: Guild) -> None: +    async def on_guild_update(self, before: discord.Guild, after: discord.Guild) -> None:          """Log guild update event to mod log."""          if before.id != GuildConstant.id:              return @@ -356,8 +352,8 @@ class ModLog(Cog, name="ModLog"):          )      @Cog.listener() -    async def on_member_ban(self, guild: Guild, member: Union[Member, User]) -> None: -        """Log ban event to mod log.""" +    async def on_member_ban(self, guild: discord.Guild, member: UserTypes) -> None: +        """Log ban event to user log."""          if guild.id != GuildConstant.id:              return @@ -366,14 +362,14 @@ class ModLog(Cog, name="ModLog"):              return          await self.send_log_message( -            Icons.user_ban, Colour(Colours.soft_red), +            Icons.user_ban, Colours.soft_red,              "User banned", f"{member.name}#{member.discriminator} (`{member.id}`)",              thumbnail=member.avatar_url_as(static_format="png"), -            channel_id=Channels.modlog +            channel_id=Channels.userlog          )      @Cog.listener() -    async def on_member_join(self, member: Member) -> None: +    async def on_member_join(self, member: discord.Member) -> None:          """Log member join event to user log."""          if member.guild.id != GuildConstant.id:              return @@ -388,14 +384,14 @@ class ModLog(Cog, name="ModLog"):              message = f"{Emojis.new} {message}"          await self.send_log_message( -            Icons.sign_in, Colour(Colours.soft_green), +            Icons.sign_in, Colours.soft_green,              "User joined", message,              thumbnail=member.avatar_url_as(static_format="png"),              channel_id=Channels.userlog          )      @Cog.listener() -    async def on_member_remove(self, member: Member) -> None: +    async def on_member_remove(self, member: discord.Member) -> None:          """Log member leave event to user log."""          if member.guild.id != GuildConstant.id:              return @@ -405,14 +401,14 @@ class ModLog(Cog, name="ModLog"):              return          await self.send_log_message( -            Icons.sign_out, Colour(Colours.soft_red), +            Icons.sign_out, Colours.soft_red,              "User left", f"{member.name}#{member.discriminator} (`{member.id}`)",              thumbnail=member.avatar_url_as(static_format="png"),              channel_id=Channels.userlog          )      @Cog.listener() -    async def on_member_unban(self, guild: Guild, member: User) -> None: +    async def on_member_unban(self, guild: discord.Guild, member: discord.User) -> None:          """Log member unban event to mod log."""          if guild.id != GuildConstant.id:              return @@ -429,7 +425,7 @@ class ModLog(Cog, name="ModLog"):          )      @Cog.listener() -    async def on_member_update(self, before: Member, after: Member) -> None: +    async def on_member_update(self, before: discord.Member, after: discord.Member) -> None:          """Log member update event to user log."""          if before.guild.id != GuildConstant.id:              return @@ -502,6 +498,11 @@ class ModLog(Cog, name="ModLog"):                  f"**Discriminator:** `{before.discriminator}` **->** `{after.discriminator}`"              ) +        if before.display_name != after.display_name: +            changes.append( +                f"**Display name:** `{before.display_name}` **->** `{after.display_name}`" +            ) +          if not changes:              return @@ -520,7 +521,7 @@ class ModLog(Cog, name="ModLog"):          )      @Cog.listener() -    async def on_message_delete(self, message: Message) -> None: +    async def on_message_delete(self, message: discord.Message) -> None:          """Log message delete event to message change log."""          channel = message.channel          author = message.author @@ -576,7 +577,7 @@ class ModLog(Cog, name="ModLog"):          )      @Cog.listener() -    async def on_raw_message_delete(self, event: RawMessageDeleteEvent) -> None: +    async def on_raw_message_delete(self, event: discord.RawMessageDeleteEvent) -> None:          """Log raw message delete event to message change log."""          if event.guild_id != GuildConstant.id or event.channel_id in GuildConstant.ignored:              return @@ -610,14 +611,14 @@ class ModLog(Cog, name="ModLog"):              )          await self.send_log_message( -            Icons.message_delete, Colour(Colours.soft_red), +            Icons.message_delete, Colours.soft_red,              "Message deleted",              response,              channel_id=Channels.message_log          )      @Cog.listener() -    async def on_message_edit(self, before: Message, after: Message) -> None: +    async def on_message_edit(self, before: discord.Message, after: discord.Message) -> None:          """Log message edit event to message change log."""          if (              not before.guild @@ -692,12 +693,12 @@ class ModLog(Cog, name="ModLog"):          )      @Cog.listener() -    async def on_raw_message_edit(self, event: RawMessageUpdateEvent) -> None: +    async def on_raw_message_edit(self, event: discord.RawMessageUpdateEvent) -> None:          """Log raw message edit event to message change log."""          try:              channel = self.bot.get_channel(int(event.data["channel_id"]))              message = await channel.fetch_message(event.message_id) -        except NotFound:  # Was deleted before we got the event +        except discord.NotFound:  # Was deleted before we got the event              return          if ( @@ -760,9 +761,3 @@ class ModLog(Cog, name="ModLog"):              Icons.message_edit, Colour.blurple(), "Message edited (After)",              after_response, channel_id=Channels.message_log          ) - - -def setup(bot: Bot) -> None: -    """Mod log cog load.""" -    bot.add_cog(ModLog(bot)) -    log.info("Cog loaded: ModLog") diff --git a/bot/cogs/superstarify/__init__.py b/bot/cogs/moderation/superstarify.py index 87021eded..ccc6395d9 100644 --- a/bot/cogs/superstarify/__init__.py +++ b/bot/cogs/moderation/superstarify.py @@ -1,21 +1,23 @@ +import json  import logging  import random +from pathlib import Path  from discord import Colour, Embed, Member  from discord.errors import Forbidden  from discord.ext.commands import Bot, Cog, Context, command -from bot.cogs.moderation import Moderation -from bot.cogs.modlog import ModLog -from bot.cogs.superstarify.stars import get_nick -from bot.constants import Icons, MODERATION_ROLES, POSITIVE_REPLIES -from bot.converters import Duration -from bot.decorators import with_role -from bot.utils.moderation import post_infraction +from bot import constants +from bot.utils.checks import with_role_check  from bot.utils.time import format_infraction +from . import utils +from .modlog import ModLog  log = logging.getLogger(__name__) -NICKNAME_POLICY_URL = "https://pythondiscord.com/pages/rules/#wiki-toc-nickname-policy" +NICKNAME_POLICY_URL = "https://pythondiscord.com/pages/rules/#nickname-policy" + +with Path("bot/resources/stars.json").open(encoding="utf-8") as stars_file: +    STAR_NAMES = json.load(stars_file)  class Superstarify(Cog): @@ -25,11 +27,6 @@ class Superstarify(Cog):          self.bot = bot      @property -    def moderation(self) -> Moderation: -        """Get currently loaded Moderation cog instance.""" -        return self.bot.get_cog("Moderation") - -    @property      def modlog(self) -> ModLog:          """Get currently loaded ModLog cog instance."""          return self.bot.get_cog("ModLog") @@ -62,7 +59,7 @@ class Superstarify(Cog):          if active_superstarifies:              [infraction] = active_superstarifies -            forced_nick = get_nick(infraction['id'], before.id) +            forced_nick = self.get_nick(infraction['id'], before.id)              if after.display_name == forced_nick:                  return  # Nick change was triggered by this event. Ignore. @@ -108,7 +105,7 @@ class Superstarify(Cog):          if active_superstarifies:              [infraction] = active_superstarifies -            forced_nick = get_nick(infraction['id'], member.id) +            forced_nick = self.get_nick(infraction['id'], member.id)              await member.edit(nick=forced_nick)              end_timestamp_human = format_infraction(infraction['expires_at']) @@ -138,7 +135,7 @@ class Superstarify(Cog):                  f"Superstardom ends: **{end_timestamp_human}**"              )              await self.modlog.send_log_message( -                icon_url=Icons.user_update, +                icon_url=constants.Icons.user_update,                  colour=Colour.gold(),                  title="Superstar member rejoined server",                  text=mod_log_message, @@ -146,45 +143,39 @@ class Superstarify(Cog):              )      @command(name='superstarify', aliases=('force_nick', 'star')) -    @with_role(*MODERATION_ROLES) -    async def superstarify( -        self, ctx: Context, member: Member, expiration: Duration, reason: str = None -    ) -> None: +    async def superstarify(self, ctx: Context, member: Member, duration: utils.Expiry, reason: str = None) -> None:          """          Force a random superstar name (like Taylor Swift) to be the user's nickname for a specified duration. -        An optional reason can be provided. +        A unit of time should be appended to the duration. +        Units (∗case-sensitive): +        \u2003`y` - years +        \u2003`m` - months∗ +        \u2003`w` - weeks +        \u2003`d` - days +        \u2003`h` - hours +        \u2003`M` - minutes∗ +        \u2003`s` - seconds + +        Alternatively, an ISO 8601 timestamp can be provided for the duration. -        If no reason is given, the original name will be shown in a generated reason. +        An optional reason can be provided. If no reason is given, the original name will be shown +        in a generated reason.          """ -        active_superstarifies = await self.bot.api_client.get( -            'bot/infractions', -            params={ -                'active': 'true', -                'type': 'superstar', -                'user__id': str(member.id) -            } -        ) -        if active_superstarifies: -            await ctx.send( -                ":x: According to my records, this user is already superstarified. " -                f"See infraction **#{active_superstarifies[0]['id']}**." -            ) +        if await utils.has_active_infraction(ctx, member, "superstar"):              return -        infraction = await post_infraction( -            ctx, member, -            type='superstar', reason=reason or ('old nick: ' + member.display_name), -            expires_at=expiration -        ) -        forced_nick = get_nick(infraction['id'], member.id) +        reason = reason or ('old nick: ' + member.display_name) +        infraction = await utils.post_infraction(ctx, member, 'superstar', reason, expires_at=duration) +        forced_nick = self.get_nick(infraction['id'], member.id) +        expiry_str = format_infraction(infraction["expires_at"])          embed = Embed()          embed.title = "Congratulations!"          embed.description = (              f"Your previous nickname, **{member.display_name}**, was so bad that we have decided to change it. "              f"Your new nickname will be **{forced_nick}**.\n\n" -            f"You will be unable to change your nickname until \n**{expiration}**.\n\n" +            f"You will be unable to change your nickname until \n**{expiry_str}**.\n\n"              "If you're confused by this, please read our "              f"[official nickname policy]({NICKNAME_POLICY_URL})."          ) @@ -196,20 +187,20 @@ class Superstarify(Cog):              f"Superstarified by **{ctx.author.name}**\n"              f"Old nickname: `{member.display_name}`\n"              f"New nickname: `{forced_nick}`\n" -            f"Superstardom ends: **{expiration}**" +            f"Superstardom ends: **{expiry_str}**"          )          await self.modlog.send_log_message( -            icon_url=Icons.user_update, +            icon_url=constants.Icons.user_update,              colour=Colour.gold(),              title="Member Achieved Superstardom",              text=mod_log_message,              thumbnail=member.avatar_url_as(static_format="png")          ) -        await self.moderation.notify_infraction( +        await utils.notify_infraction(              user=member,              infr_type="Superstarify", -            expires_at=expiration, +            expires_at=expiry_str,              reason=f"Your nickname didn't comply with our [nickname policy]({NICKNAME_POLICY_URL})."          ) @@ -219,7 +210,6 @@ class Superstarify(Cog):          await ctx.send(embed=embed)      @command(name='unsuperstarify', aliases=('release_nick', 'unstar')) -    @with_role(*MODERATION_ROLES)      async def unsuperstarify(self, ctx: Context, member: Member) -> None:          """Remove the superstarify entry from our database, allowing the user to change their nickname."""          log.debug(f"Attempting to unsuperstarify the following user: {member.display_name}") @@ -247,9 +237,9 @@ class Superstarify(Cog):          embed = Embed()          embed.description = "User has been released from superstar-prison." -        embed.title = random.choice(POSITIVE_REPLIES) +        embed.title = random.choice(constants.POSITIVE_REPLIES) -        await self.moderation.notify_pardon( +        await utils.notify_pardon(              user=member,              title="You are no longer superstarified.",              content="You may now change your nickname on the server." @@ -257,8 +247,13 @@ class Superstarify(Cog):          log.trace(f"{member.display_name} was successfully released from superstar-prison.")          await ctx.send(embed=embed) +    @staticmethod +    def get_nick(infraction_id: int, member_id: int) -> str: +        """Randomly select a nickname from the Superstarify nickname list.""" +        rng = random.Random(str(infraction_id) + str(member_id)) +        return rng.choice(STAR_NAMES) -def setup(bot: Bot) -> None: -    """Superstarify cog load.""" -    bot.add_cog(Superstarify(bot)) -    log.info("Cog loaded: Superstarify") +    # This cannot be static (must have a __func__ attribute). +    def cog_check(self, ctx: Context) -> bool: +        """Only allow moderators to invoke the commands in this cog.""" +        return with_role_check(ctx, *constants.MODERATION_ROLES) diff --git a/bot/cogs/moderation/utils.py b/bot/cogs/moderation/utils.py new file mode 100644 index 000000000..788a40d40 --- /dev/null +++ b/bot/cogs/moderation/utils.py @@ -0,0 +1,172 @@ +import logging +import textwrap +import typing as t +from datetime import datetime + +import discord +from discord.ext import commands +from discord.ext.commands import Context + +from bot.api import ResponseCodeError +from bot.constants import Colours, Icons +from bot.converters import Duration, ISODateTime + +log = logging.getLogger(__name__) + +# apply icon, pardon icon +INFRACTION_ICONS = { +    "mute": (Icons.user_mute, Icons.user_unmute), +    "kick": (Icons.sign_out, None), +    "ban": (Icons.user_ban, Icons.user_unban), +    "warning": (Icons.user_warn, None), +    "note": (Icons.user_warn, None), +} +RULES_URL = "https://pythondiscord.com/pages/rules" +APPEALABLE_INFRACTIONS = ("ban", "mute") + +UserTypes = t.Union[discord.Member, discord.User] +MemberObject = t.Union[UserTypes, discord.Object] +Infraction = t.Dict[str, t.Union[str, int, bool]] +Expiry = t.Union[Duration, ISODateTime] + + +def proxy_user(user_id: str) -> discord.Object: +    """ +    Create a proxy user object from the given id. + +    Used when a Member or User object cannot be resolved. +    """ +    try: +        user_id = int(user_id) +    except ValueError: +        raise commands.BadArgument + +    user = discord.Object(user_id) +    user.mention = user.id +    user.avatar_url_as = lambda static_format: None + +    return user + + +async def post_infraction( +    ctx: Context, +    user: MemberObject, +    infr_type: str, +    reason: str, +    expires_at: datetime = None, +    hidden: bool = False, +    active: bool = True, +) -> t.Optional[dict]: +    """Posts an infraction to the API.""" +    payload = { +        "actor": ctx.message.author.id, +        "hidden": hidden, +        "reason": reason, +        "type": infr_type, +        "user": user.id, +        "active": active +    } +    if expires_at: +        payload['expires_at'] = expires_at.isoformat() + +    try: +        response = await ctx.bot.api_client.post('bot/infractions', json=payload) +    except ResponseCodeError as exp: +        if exp.status == 400 and 'user' in exp.response_json: +            log.info( +                f"{ctx.author} tried to add a {infr_type} infraction to `{user.id}`, " +                "but that user id was not found in the database." +            ) +            await ctx.send( +                f":x: Cannot add infraction, the specified user is not known to the database." +            ) +            return +        else: +            log.exception("An unexpected ResponseCodeError occurred while adding an infraction:") +            await ctx.send(":x: There was an error adding the infraction.") +            return + +    return response + + +async def has_active_infraction(ctx: Context, user: MemberObject, infr_type: str) -> bool: +    """Checks if a user already has an active infraction of the given type.""" +    active_infractions = await ctx.bot.api_client.get( +        'bot/infractions', +        params={ +            'active': 'true', +            'type': infr_type, +            'user__id': str(user.id) +        } +    ) +    if active_infractions: +        await ctx.send( +            f":x: According to my records, this user already has a {infr_type} infraction. " +            f"See infraction **#{active_infractions[0]['id']}**." +        ) +        return True +    else: +        return False + + +async def notify_infraction( +    user: UserTypes, +    infr_type: str, +    expires_at: t.Optional[str] = None, +    reason: t.Optional[str] = None, +    icon_url: str = Icons.token_removed +) -> bool: +    """DM a user about their new infraction and return True if the DM is successful.""" +    embed = discord.Embed( +        description=textwrap.dedent(f""" +            **Type:** {infr_type.capitalize()} +            **Expires:** {expires_at or "N/A"} +            **Reason:** {reason or "No reason provided."} +            """), +        colour=Colours.soft_red +    ) + +    embed.set_author(name="Infraction Information", icon_url=icon_url, url=RULES_URL) +    embed.title = f"Please review our rules over at {RULES_URL}" +    embed.url = RULES_URL + +    if infr_type in APPEALABLE_INFRACTIONS: +        embed.set_footer( +            text="To appeal this infraction, send an e-mail to [email protected]" +        ) + +    return await send_private_embed(user, embed) + + +async def notify_pardon( +    user: UserTypes, +    title: str, +    content: str, +    icon_url: str = Icons.user_verified +) -> bool: +    """DM a user about their pardoned infraction and return True if the DM is successful.""" +    embed = discord.Embed( +        description=content, +        colour=Colours.soft_green +    ) + +    embed.set_author(name=title, icon_url=icon_url) + +    return await send_private_embed(user, embed) + + +async def send_private_embed(user: UserTypes, embed: discord.Embed) -> bool: +    """ +    A helper method for sending an embed to a user's DMs. + +    Returns a boolean indicator of DM success. +    """ +    try: +        await user.send(embed=embed) +        return True +    except (discord.HTTPException, discord.Forbidden, discord.NotFound): +        log.debug( +            f"Infraction-related information could not be sent to user {user} ({user.id}). " +            "The user either could not be retrieved or probably disabled their DMs." +        ) +        return False diff --git a/bot/cogs/off_topic_names.py b/bot/cogs/off_topic_names.py index 16717d523..2977e4ebb 100644 --- a/bot/cogs/off_topic_names.py +++ b/bot/cogs/off_topic_names.py @@ -75,14 +75,16 @@ class OffTopicNames(Cog):          self.bot = bot          self.updater_task = None +        self.bot.loop.create_task(self.init_offtopic_updater()) +      def cog_unload(self) -> None:          """Cancel any running updater tasks on cog unload."""          if self.updater_task is not None:              self.updater_task.cancel() -    @Cog.listener() -    async def on_ready(self) -> None: +    async def init_offtopic_updater(self) -> None:          """Start off-topic channel updating event loop if it hasn't already started.""" +        await self.bot.wait_until_ready()          if self.updater_task is None:              coro = update_names(self.bot)              self.updater_task = self.bot.loop.create_task(coro) diff --git a/bot/cogs/reddit.py b/bot/cogs/reddit.py index 6880aab85..0f575cece 100644 --- a/bot/cogs/reddit.py +++ b/bot/cogs/reddit.py @@ -34,6 +34,8 @@ class Reddit(Cog):          self.new_posts_task = None          self.top_weekly_posts_task = None +        self.bot.loop.create_task(self.init_reddit_polling()) +      async def fetch_posts(self, route: str, *, amount: int = 25, params: dict = None) -> List[dict]:          """A helper method to fetch a certain amount of Reddit posts at a given route."""          # Reddit's JSON responses only provide 25 posts at most. @@ -262,9 +264,9 @@ class Reddit(Cog):              max_lines=15          ) -    @Cog.listener() -    async def on_ready(self) -> None: +    async def init_reddit_polling(self) -> None:          """Initiate reddit post event loop.""" +        await self.bot.wait_until_ready()          self.reddit_channel = await self.bot.fetch_channel(Channels.reddit)          if self.reddit_channel is not None: diff --git a/bot/cogs/reminders.py b/bot/cogs/reminders.py index 6e91d2c06..b54622306 100644 --- a/bot/cogs/reminders.py +++ b/bot/cogs/reminders.py @@ -30,9 +30,11 @@ class Reminders(Scheduler, Cog):          self.bot = bot          super().__init__() -    @Cog.listener() -    async def on_ready(self) -> None: +        self.bot.loop.create_task(self.reschedule_reminders()) + +    async def reschedule_reminders(self) -> None:          """Get all current reminders from the API and reschedule them.""" +        await self.bot.wait_until_ready()          response = await self.bot.api_client.get(              'bot/reminders',              params={'active': 'true'} diff --git a/bot/cogs/superstarify/stars.py b/bot/cogs/superstarify/stars.py deleted file mode 100644 index dbac86770..000000000 --- a/bot/cogs/superstarify/stars.py +++ /dev/null @@ -1,87 +0,0 @@ -import random - - -STAR_NAMES = ( -    "Adele", -    "Aerosmith", -    "Aretha Franklin", -    "Ayumi Hamasaki", -    "B'z", -    "Barbra Streisand", -    "Barry Manilow", -    "Barry White", -    "Beyonce", -    "Billy Joel", -    "Bob Dylan", -    "Bob Marley", -    "Bob Seger", -    "Bon Jovi", -    "Britney Spears", -    "Bruce Springsteen", -    "Bruno Mars", -    "Bryan Adams", -    "Celine Dion", -    "Cher", -    "Christina Aguilera", -    "David Bowie", -    "Donna Summer", -    "Drake", -    "Ed Sheeran", -    "Elton John", -    "Elvis Presley", -    "Eminem", -    "Enya", -    "Flo Rida", -    "Frank Sinatra", -    "Garth Brooks", -    "George Michael", -    "George Strait", -    "James Taylor", -    "Janet Jackson", -    "Jay-Z", -    "Johnny Cash", -    "Johnny Hallyday", -    "Julio Iglesias", -    "Justin Bieber", -    "Justin Timberlake", -    "Kanye West", -    "Katy Perry", -    "Kenny G", -    "Kenny Rogers", -    "Lady Gaga", -    "Lil Wayne", -    "Linda Ronstadt", -    "Lionel Richie", -    "Madonna", -    "Mariah Carey", -    "Meat Loaf", -    "Michael Jackson", -    "Neil Diamond", -    "Nicki Minaj", -    "Olivia Newton-John", -    "Paul McCartney", -    "Phil Collins", -    "Pink", -    "Prince", -    "Reba McEntire", -    "Rihanna", -    "Robbie Williams", -    "Rod Stewart", -    "Santana", -    "Shania Twain", -    "Stevie Wonder", -    "Taylor Swift", -    "Tim McGraw", -    "Tina Turner", -    "Tom Petty", -    "Tupac Shakur", -    "Usher", -    "Van Halen", -    "Whitney Houston", -) - - -def get_nick(infraction_id: int, member_id: int) -> str: -    """Randomly select a nickname from the Superstarify nickname list.""" -    rng = random.Random(str(infraction_id) + str(member_id)) -    return rng.choice(STAR_NAMES) diff --git a/bot/cogs/sync/cog.py b/bot/cogs/sync/cog.py index b75fb26cd..aaa581f96 100644 --- a/bot/cogs/sync/cog.py +++ b/bot/cogs/sync/cog.py @@ -29,9 +29,11 @@ class Sync(Cog):      def __init__(self, bot: Bot) -> None:          self.bot = bot -    @Cog.listener() -    async def on_ready(self) -> None: +        self.bot.loop.create_task(self.sync_guild()) + +    async def sync_guild(self) -> None:          """Syncs the roles/users of the guild with the database.""" +        await self.bot.wait_until_ready()          guild = self.bot.get_guild(self.SYNC_SERVER_ID)          if guild is not None:              for syncer in self.ON_READY_SYNCERS: diff --git a/bot/cogs/token_remover.py b/bot/cogs/token_remover.py index 7dd0afbbd..5a0d20e57 100644 --- a/bot/cogs/token_remover.py +++ b/bot/cogs/token_remover.py @@ -9,7 +9,7 @@ from discord import Colour, Message  from discord.ext.commands import Bot, Cog  from discord.utils import snowflake_time -from bot.cogs.modlog import ModLog +from bot.cogs.moderation import ModLog  from bot.constants import Channels, Colours, Event, Icons  log = logging.getLogger(__name__) @@ -26,11 +26,11 @@ DELETION_MESSAGE_TEMPLATE = (  DISCORD_EPOCH_TIMESTAMP = datetime(2017, 1, 1)  TOKEN_EPOCH = 1_293_840_000  TOKEN_RE = re.compile( -    r"[^\s\.]+"     # Matches token part 1: The user ID string, encoded as base64 -    r"\."           # Matches a literal dot between the token parts -    r"[^\s\.]+"     # Matches token part 2: The creation timestamp, as an integer -    r"\."           # Matches a literal dot between the token parts -    r"[^\s\.]+"     # Matches token part 3: The HMAC, unused by us, but check that it isn't empty +    r"[^\s\.()\"']+"  # Matches token part 1: The user ID string, encoded as base64 +    r"\."             # Matches a literal dot between the token parts +    r"[^\s\.()\"']+"  # Matches token part 2: The creation timestamp, as an integer +    r"\."             # Matches a literal dot between the token parts +    r"[^\s\.()\"']+"  # Matches token part 3: The HMAC, unused by us, but check that it isn't empty  ) diff --git a/bot/cogs/utils.py b/bot/cogs/utils.py index 9306c8986..793fe4c1a 100644 --- a/bot/cogs/utils.py +++ b/bot/cogs/utils.py @@ -35,56 +35,58 @@ class Utils(Cog):              await ctx.invoke(self.bot.get_command("help"), "pep")              return -        # Newer PEPs are written in RST instead of txt -        if pep_number > 542: -            pep_url = f"{self.base_github_pep_url}{pep_number:04}.rst" -        else: -            pep_url = f"{self.base_github_pep_url}{pep_number:04}.txt" - -        # Attempt to fetch the PEP -        log.trace(f"Requesting PEP {pep_number} with {pep_url}") -        response = await self.bot.http_session.get(pep_url) - -        if response.status == 200: -            log.trace("PEP found") - -            pep_content = await response.text() - -            # Taken from https://github.com/python/peps/blob/master/pep0/pep.py#L179 -            pep_header = HeaderParser().parse(StringIO(pep_content)) - -            # Assemble the embed -            pep_embed = Embed( -                title=f"**PEP {pep_number} - {pep_header['Title']}**", -                description=f"[Link]({self.base_pep_url}{pep_number:04})", -            ) - -            pep_embed.set_thumbnail(url="https://www.python.org/static/opengraph-icon-200x200.png") - -            # Add the interesting information -            if "Status" in pep_header: -                pep_embed.add_field(name="Status", value=pep_header["Status"]) -            if "Python-Version" in pep_header: -                pep_embed.add_field(name="Python-Version", value=pep_header["Python-Version"]) -            if "Created" in pep_header: -                pep_embed.add_field(name="Created", value=pep_header["Created"]) -            if "Type" in pep_header: -                pep_embed.add_field(name="Type", value=pep_header["Type"]) +        possible_extensions = ['.txt', '.rst'] +        found_pep = False +        for extension in possible_extensions: +            # Attempt to fetch the PEP +            pep_url = f"{self.base_github_pep_url}{pep_number:04}{extension}" +            log.trace(f"Requesting PEP {pep_number} with {pep_url}") +            response = await self.bot.http_session.get(pep_url) + +            if response.status == 200: +                log.trace("PEP found") +                found_pep = True + +                pep_content = await response.text() + +                # Taken from https://github.com/python/peps/blob/master/pep0/pep.py#L179 +                pep_header = HeaderParser().parse(StringIO(pep_content)) + +                # Assemble the embed +                pep_embed = Embed( +                    title=f"**PEP {pep_number} - {pep_header['Title']}**", +                    description=f"[Link]({self.base_pep_url}{pep_number:04})", +                ) -        elif response.status == 404: +                pep_embed.set_thumbnail(url="https://www.python.org/static/opengraph-icon-200x200.png") + +                # Add the interesting information +                if "Status" in pep_header: +                    pep_embed.add_field(name="Status", value=pep_header["Status"]) +                if "Python-Version" in pep_header: +                    pep_embed.add_field(name="Python-Version", value=pep_header["Python-Version"]) +                if "Created" in pep_header: +                    pep_embed.add_field(name="Created", value=pep_header["Created"]) +                if "Type" in pep_header: +                    pep_embed.add_field(name="Type", value=pep_header["Type"]) + +            elif response.status != 404: +                # any response except 200 and 404 is expected +                found_pep = True  # actually not, but it's easier to display this way +                log.trace(f"The user requested PEP {pep_number}, but the response had an unexpected status code: " +                          f"{response.status}.\n{response.text}") + +                error_message = "Unexpected HTTP error during PEP search. Please let us know." +                pep_embed = Embed(title="Unexpected error", description=error_message) +                pep_embed.colour = Colour.red() +                break + +        if not found_pep:              log.trace("PEP was not found")              not_found = f"PEP {pep_number} does not exist."              pep_embed = Embed(title="PEP not found", description=not_found)              pep_embed.colour = Colour.red() -        else: -            log.trace(f"The user requested PEP {pep_number}, but the response had an unexpected status code: " -                      f"{response.status}.\n{response.text}") - -            error_message = "Unexpected HTTP error during PEP search. Please let us know." -            pep_embed = Embed(title="Unexpected error", description=error_message) -            pep_embed.colour = Colour.red() -          await ctx.message.channel.send(embed=pep_embed)      @command() diff --git a/bot/cogs/verification.py b/bot/cogs/verification.py index f0a099f27..5b115deaa 100644 --- a/bot/cogs/verification.py +++ b/bot/cogs/verification.py @@ -1,10 +1,12 @@  import logging +from datetime import datetime  from discord import Message, NotFound, Object +from discord.ext import tasks  from discord.ext.commands import Bot, Cog, Context, command -from bot.cogs.modlog import ModLog -from bot.constants import Channels, Event, Roles +from bot.cogs.moderation import ModLog +from bot.constants import Bot as BotConfig, Channels, Event, Roles  from bot.decorators import InChannelCheckFailure, in_channel, without_role  log = logging.getLogger(__name__) @@ -27,12 +29,18 @@ from time to time, you can send `!subscribe` to <#{Channels.bot}> at any time to  If you'd like to unsubscribe from the announcement notifications, simply send `!unsubscribe` to <#{Channels.bot}>.  """ +PERIODIC_PING = ( +    f"@everyone To verify that you have read our rules, please type `{BotConfig.prefix}accept`." +    f" Ping <@&{Roles.admin}> if you encounter any problems during the verification process." +) +  class Verification(Cog):      """User verification and role self-management."""      def __init__(self, bot: Bot):          self.bot = bot +        self.periodic_ping.start()      @property      def mod_log(self) -> ModLog: @@ -155,6 +163,34 @@ class Verification(Cog):          else:              return True +    @tasks.loop(hours=12) +    async def periodic_ping(self) -> None: +        """Every week, mention @everyone to remind them to verify.""" +        messages = self.bot.get_channel(Channels.verification).history(limit=10) +        need_to_post = True  # True if a new message needs to be sent. + +        async for message in messages: +            if message.author == self.bot.user and message.content == PERIODIC_PING: +                delta = datetime.utcnow() - message.created_at  # Time since last message. +                if delta.days >= 7:  # Message is older than a week. +                    await message.delete() +                else: +                    need_to_post = False + +                break + +        if need_to_post: +            await self.bot.get_channel(Channels.verification).send(PERIODIC_PING) + +    @periodic_ping.before_loop +    async def before_ping(self) -> None: +        """Only start the loop when the bot is ready.""" +        await self.bot.wait_until_ready() + +    def cog_unload(self) -> None: +        """Cancel the periodic ping task when the cog is unloaded.""" +        self.periodic_ping.cancel() +  def setup(bot: Bot) -> None:      """Verification cog load.""" diff --git a/bot/cogs/watchchannels/bigbrother.py b/bot/cogs/watchchannels/bigbrother.py index 3eba9862f..c516508ca 100644 --- a/bot/cogs/watchchannels/bigbrother.py +++ b/bot/cogs/watchchannels/bigbrother.py @@ -5,9 +5,9 @@ from typing import Union  from discord import User  from discord.ext.commands import Bot, Cog, Context, group +from bot.cogs.moderation.utils import post_infraction  from bot.constants import Channels, Roles, Webhooks  from bot.decorators import with_role -from bot.utils.moderation import post_infraction  from .watchchannel import WatchChannel, proxy_user  log = logging.getLogger(__name__) @@ -64,9 +64,7 @@ class BigBrother(WatchChannel, Cog, name="Big Brother"):              await ctx.send(":x: The specified user is already being watched.")              return -        response = await post_infraction( -            ctx, user, type='watch', reason=reason, hidden=True -        ) +        response = await post_infraction(ctx, user, 'watch', reason, hidden=True)          if response is not None:              self.watched_users[user.id] = response @@ -111,7 +109,7 @@ class BigBrother(WatchChannel, Cog, name="Big Brother"):                  json={'active': False}              ) -            await post_infraction(ctx, user, type='watch', reason=f"Unwatched: {reason}", hidden=True, active=False) +            await post_infraction(ctx, user, 'watch', f"Unwatched: {reason}", hidden=True, active=False)              await ctx.send(f":white_check_mark: Messages sent by {user} will no longer be relayed.") diff --git a/bot/cogs/watchchannels/watchchannel.py b/bot/cogs/watchchannels/watchchannel.py index 760e012eb..0bf75a924 100644 --- a/bot/cogs/watchchannels/watchchannel.py +++ b/bot/cogs/watchchannels/watchchannel.py @@ -13,7 +13,7 @@ from discord import Color, Embed, HTTPException, Message, Object, errors  from discord.ext.commands import BadArgument, Bot, Cog, Context  from bot.api import ResponseCodeError -from bot.cogs.modlog import ModLog +from bot.cogs.moderation import ModLog  from bot.constants import BigBrother as BigBrotherConfig, Guild as GuildConfig, Icons  from bot.pagination import LinePaginator  from bot.utils import CogABCMeta, messages diff --git a/bot/decorators.py b/bot/decorators.py index 33a6bcadd..935df4af0 100644 --- a/bot/decorators.py +++ b/bot/decorators.py @@ -3,13 +3,13 @@ import random  from asyncio import Lock, sleep  from contextlib import suppress  from functools import wraps -from typing import Any, Callable, Container, Optional +from typing import Callable, Container, Union  from weakref import WeakValueDictionary -from discord import Colour, Embed +from discord import Colour, Embed, Member  from discord.errors import NotFound  from discord.ext import commands -from discord.ext.commands import CheckFailure, Context +from discord.ext.commands import CheckFailure, Cog, Context  from bot.constants import ERROR_REPLIES, RedirectOutput  from bot.utils.checks import with_role_check, without_role_check @@ -72,13 +72,13 @@ def locked() -> Callable:      Subsequent calls to the command from the same author are ignored until the command has completed invocation. -    This decorator has to go before (below) the `command` decorator. +    This decorator must go before (below) the `command` decorator.      """      def wrap(func: Callable) -> Callable:          func.__locks = WeakValueDictionary()          @wraps(func) -        async def inner(self: Callable, ctx: Context, *args, **kwargs) -> Optional[Any]: +        async def inner(self: Cog, ctx: Context, *args, **kwargs) -> None:              lock = func.__locks.setdefault(ctx.author.id, Lock())              if lock.locked():                  embed = Embed() @@ -93,7 +93,7 @@ def locked() -> Callable:                  return              async with func.__locks.setdefault(ctx.author.id, Lock()): -                return await func(self, ctx, *args, **kwargs) +                await func(self, ctx, *args, **kwargs)          return inner      return wrap @@ -103,17 +103,21 @@ def redirect_output(destination_channel: int, bypass_roles: Container[int] = Non      Changes the channel in the context of the command to redirect the output to a certain channel.      Redirect is bypassed if the author has a role to bypass redirection. + +    This decorator must go before (below) the `command` decorator.      """      def wrap(func: Callable) -> Callable:          @wraps(func) -        async def inner(self: Callable, ctx: Context, *args, **kwargs) -> Any: +        async def inner(self: Cog, ctx: Context, *args, **kwargs) -> None:              if ctx.channel.id == destination_channel:                  log.trace(f"Command {ctx.command.name} was invoked in destination_channel, not redirecting") -                return await func(self, ctx, *args, **kwargs) +                await func(self, ctx, *args, **kwargs) +                return              if bypass_roles and any(role.id in bypass_roles for role in ctx.author.roles):                  log.trace(f"{ctx.author} has role to bypass output redirection") -                return await func(self, ctx, *args, **kwargs) +                await func(self, ctx, *args, **kwargs) +                return              redirect_channel = ctx.guild.get_channel(destination_channel)              old_channel = ctx.channel @@ -140,3 +144,50 @@ def redirect_output(destination_channel: int, bypass_roles: Container[int] = Non                      log.trace("Redirect output: Deleted invocation message")          return inner      return wrap + + +def respect_role_hierarchy(target_arg: Union[int, str] = 0) -> Callable: +    """ +    Ensure the highest role of the invoking member is greater than that of the target member. + +    If the condition fails, a warning is sent to the invoking context. A target which is not an +    instance of discord.Member will always pass. + +    A value of 0 (i.e. position 0) for `target_arg` corresponds to the argument which comes after +    `ctx`. If the target argument is a kwarg, its name can instead be given. + +    This decorator must go before (below) the `command` decorator. +    """ +    def wrap(func: Callable) -> Callable: +        @wraps(func) +        async def inner(self: Cog, ctx: Context, *args, **kwargs) -> None: +            try: +                target = kwargs[target_arg] +            except KeyError: +                try: +                    target = args[target_arg] +                except IndexError: +                    raise ValueError(f"Could not find target argument at position {target_arg}") +                except TypeError: +                    raise ValueError(f"Could not find target kwarg with key {target_arg!r}") + +            if not isinstance(target, Member): +                log.trace("The target is not a discord.Member; skipping role hierarchy check.") +                await func(self, ctx, *args, **kwargs) +                return + +            cmd = ctx.command.name +            actor = ctx.author +            if target.top_role >= actor.top_role: +                log.info( +                    f"{actor} ({actor.id}) attempted to {cmd} " +                    f"{target} ({target.id}), who has an equal or higher top role." +                ) +                await ctx.send( +                    f":x: {actor.mention}, you may not {cmd} " +                    "someone with an equal or higher top role." +                ) +            else: +                await func(self, ctx, *args, **kwargs) +        return inner +    return wrap diff --git a/bot/resources/stars.json b/bot/resources/stars.json index 8071b9626..c0b253120 100644 --- a/bot/resources/stars.json +++ b/bot/resources/stars.json @@ -1,82 +1,78 @@ -{ -  "Adele": "https://upload.wikimedia.org/wikipedia/commons/thumb/7/7c/Adele_2016.jpg/220px-Adele_2016.jpg", -  "Steven Tyler": "https://upload.wikimedia.org/wikipedia/commons/thumb/a/a8/Steven_Tyler_by_Gage_Skidmore_3.jpg/220px-Steven_Tyler_by_Gage_Skidmore_3.jpg", -  "Alex Van Halen": "https://upload.wikimedia.org/wikipedia/commons/thumb/b/b3/Alex_Van_Halen_-_Van_Halen_Live.jpg/220px-Alex_Van_Halen_-_Van_Halen_Live.jpg", -  "Aretha Franklin": "https://upload.wikimedia.org/wikipedia/commons/thumb/c/c6/Aretha_Franklin_1968.jpg/220px-Aretha_Franklin_1968.jpg", -  "Ayumi Hamasaki": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/50/Ayumi_Hamasaki_2007.jpg/220px-Ayumi_Hamasaki_2007.jpg", -  "Koshi Inaba": "https://upload.wikimedia.org/wikipedia/commons/thumb/a/af/B%27Z_at_Best_Buy_Theater_NYC_-_9-30-12_-_18.jpg/220px-B%27Z_at_Best_Buy_Theater_NYC_-_9-30-12_-_18.jpg", -  "Barbra Streisand": "https://upload.wikimedia.org/wikipedia/en/thumb/a/a3/Barbra_Streisand_-_1966.jpg/220px-Barbra_Streisand_-_1966.jpg", -  "Barry Manilow": "https://upload.wikimedia.org/wikipedia/commons/thumb/2/2b/BarryManilow.jpg/220px-BarryManilow.jpg", -  "Barry White": "https://upload.wikimedia.org/wikipedia/commons/thumb/b/b7/Barry_White%2C_Bestanddeelnr_927-0099.jpg/220px-Barry_White%2C_Bestanddeelnr_927-0099.jpg", -  "Beyonce": "https://upload.wikimedia.org/wikipedia/commons/thumb/f/f2/Beyonce_-_The_Formation_World_Tour%2C_at_Wembley_Stadium_in_London%2C_England.jpg/220px-Beyonce_-_The_Formation_World_Tour%2C_at_Wembley_Stadium_in_London%2C_England.jpg", -  "Billy Joel": "https://upload.wikimedia.org/wikipedia/commons/thumb/1/19/Billy_Joel_Shankbone_NYC_2009.jpg/220px-Billy_Joel_Shankbone_NYC_2009.jpg", -  "Bob Dylan": "https://upload.wikimedia.org/wikipedia/commons/thumb/0/02/Bob_Dylan_-_Azkena_Rock_Festival_2010_2.jpg/220px-Bob_Dylan_-_Azkena_Rock_Festival_2010_2.jpg", -  "Bob Marley": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/5e/Bob-Marley.jpg/220px-Bob-Marley.jpg", -  "Bob Seger": "https://upload.wikimedia.org/wikipedia/commons/thumb/1/16/Bob_Seger_2013.jpg/220px-Bob_Seger_2013.jpg", -  "Jon Bon Jovi": "https://upload.wikimedia.org/wikipedia/commons/thumb/4/49/Jon_Bon_Jovi_at_the_2009_Tribeca_Film_Festival_3.jpg/220px-Jon_Bon_Jovi_at_the_2009_Tribeca_Film_Festival_3.jpg", -  "Britney Spears": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/da/Britney_Spears_2013_%28Straighten_Crop%29.jpg/200px-Britney_Spears_2013_%28Straighten_Crop%29.jpg", -  "Bruce Springsteen": "https://upload.wikimedia.org/wikipedia/commons/thumb/3/3b/Bruce_Springsteen_-_Roskilde_Festival_2012.jpg/210px-Bruce_Springsteen_-_Roskilde_Festival_2012.jpg", -  "Bruno Mars": "https://upload.wikimedia.org/wikipedia/commons/thumb/b/b0/BrunoMars24KMagicWorldTourLive_%28cropped%29.jpg/220px-BrunoMars24KMagicWorldTourLive_%28cropped%29.jpg", -  "Bryan Adams": "https://upload.wikimedia.org/wikipedia/commons/thumb/7/7e/Bryan_Adams_Hamburg_MG_0631_flickr.jpg/300px-Bryan_Adams_Hamburg_MG_0631_flickr.jpg", -  "Celine Dion": "https://upload.wikimedia.org/wikipedia/commons/thumb/4/42/Celine_Dion_Concert_Singing_Taking_Chances_2008.jpg/220px-Celine_Dion_Concert_Singing_Taking_Chances_2008.jpg", -  "Cher": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Cher_-_Casablanca.jpg/220px-Cher_-_Casablanca.jpg", -  "Christina Aguilera": "https://upload.wikimedia.org/wikipedia/commons/thumb/e/e7/Christina_Aguilera_in_2016.jpg/220px-Christina_Aguilera_in_2016.jpg", -  "David Bowie": "https://upload.wikimedia.org/wikipedia/commons/thumb/e/e8/David-Bowie_Chicago_2002-08-08_photoby_Adam-Bielawski-cropped.jpg/220px-David-Bowie_Chicago_2002-08-08_photoby_Adam-Bielawski-cropped.jpg", -  "David Lee Roth": "https://upload.wikimedia.org/wikipedia/commons/thumb/f/fb/David_Lee_Roth_-_Van_Halen.jpg/220px-David_Lee_Roth_-_Van_Halen.jpg", -  "Donna Summer": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Nobel_Peace_Price_Concert_2009_Donna_Summer3.jpg/220px-Nobel_Peace_Price_Concert_2009_Donna_Summer3.jpg", -  "Drake": "https://upload.wikimedia.org/wikipedia/commons/thumb/8/81/Drake_at_the_Velvet_Underground_-_2017_%2835986086223%29_%28cropped%29.jpg/220px-Drake_at_the_Velvet_Underground_-_2017_%2835986086223%29_%28cropped%29.jpg", -  "Ed Sheeran": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/55/Ed_Sheeran_2013.jpg/220px-Ed_Sheeran_2013.jpg", -  "Eddie Van Halen": "https://upload.wikimedia.org/wikipedia/commons/thumb/a/a7/Eddie_Van_Halen.jpg/300px-Eddie_Van_Halen.jpg", -  "Elton John": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/d1/Elton_John_2011_Shankbone_2.JPG/220px-Elton_John_2011_Shankbone_2.JPG", -  "Elvis Presley": "https://upload.wikimedia.org/wikipedia/commons/thumb/9/99/Elvis_Presley_promoting_Jailhouse_Rock.jpg/220px-Elvis_Presley_promoting_Jailhouse_Rock.jpg", -  "Eminem": "https://upload.wikimedia.org/wikipedia/commons/thumb/4/4a/Eminem_-_Concert_for_Valor_in_Washington%2C_D.C._Nov._11%2C_2014_%282%29_%28Cropped%29.jpg/220px-Eminem_-_Concert_for_Valor_in_Washington%2C_D.C._Nov._11%2C_2014_%282%29_%28Cropped%29.jpg", -  "Enya": "https://enya.com/wp-content/themes/enya%20full%20site/images/enya-about.jpg", -  "Flo Rida": "https://upload.wikimedia.org/wikipedia/commons/thumb/8/8b/Flo_Rida_%286924266548%29.jpg/220px-Flo_Rida_%286924266548%29.jpg", -  "Frank Sinatra": "https://upload.wikimedia.org/wikipedia/commons/thumb/a/af/Frank_Sinatra_%2757.jpg/220px-Frank_Sinatra_%2757.jpg", -  "Garth Brooks": "https://upload.wikimedia.org/wikipedia/commons/thumb/b/bc/Garth_Brooks_on_World_Tour_%28crop%29.png/220px-Garth_Brooks_on_World_Tour_%28crop%29.png", -  "George Michael": "https://upload.wikimedia.org/wikipedia/commons/thumb/f/f2/George_Michael.jpeg/220px-George_Michael.jpeg", -  "George Strait": "https://upload.wikimedia.org/wikipedia/commons/thumb/0/0c/George_Strait_2014_1.jpg/220px-George_Strait_2014_1.jpg", -  "James Taylor": "https://upload.wikimedia.org/wikipedia/commons/thumb/c/cf/James_Taylor_-_Columbia.jpg/220px-James_Taylor_-_Columbia.jpg", -  "Janet Jackson": "https://upload.wikimedia.org/wikipedia/commons/thumb/0/02/JanetJacksonUnbreakableTourSanFran2015.jpg/220px-JanetJacksonUnbreakableTourSanFran2015.jpg", -  "Jay-Z": "https://upload.wikimedia.org/wikipedia/commons/thumb/b/b6/Jay-Z.png/220px-Jay-Z.png", -  "Johnny Cash": "https://upload.wikimedia.org/wikipedia/commons/thumb/f/f2/JohnnyCash1969.jpg/220px-JohnnyCash1969.jpg", -  "Johnny Hallyday": "https://upload.wikimedia.org/wikipedia/commons/thumb/a/a1/Johnny_Hallyday_Cannes.jpg/220px-Johnny_Hallyday_Cannes.jpg", -  "Julio Iglesias": "https://upload.wikimedia.org/wikipedia/commons/thumb/e/ef/Julio_Iglesias09.jpg/220px-Julio_Iglesias09.jpg", -  "Justin Bieber": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/da/Justin_Bieber_in_2015.jpg/220px-Justin_Bieber_in_2015.jpg", -  "Justin Timberlake": "https://upload.wikimedia.org/wikipedia/commons/thumb/e/ed/Justin_Timberlake_by_Gage_Skidmore_2.jpg/220px-Justin_Timberlake_by_Gage_Skidmore_2.jpg", -  "Kanye West": "https://upload.wikimedia.org/wikipedia/commons/thumb/1/11/Kanye_West_at_the_2009_Tribeca_Film_Festival.jpg/220px-Kanye_West_at_the_2009_Tribeca_Film_Festival.jpg", -  "Katy Perry": "https://upload.wikimedia.org/wikipedia/commons/thumb/8/8a/Katy_Perry_at_Madison_Square_Garden_%2837436531092%29_%28cropped%29.jpg/220px-Katy_Perry_at_Madison_Square_Garden_%2837436531092%29_%28cropped%29.jpg", -  "Kenny G": "https://upload.wikimedia.org/wikipedia/commons/thumb/f/f4/KennyGHWOFMay2013.jpg/220px-KennyGHWOFMay2013.jpg", -  "Kenny Rogers": "https://upload.wikimedia.org/wikipedia/commons/thumb/8/8c/KennyRogers.jpg/220px-KennyRogers.jpg", -  "Lady Gaga": "https://upload.wikimedia.org/wikipedia/commons/thumb/2/2c/Lady_Gaga_interview_2016.jpg/220px-Lady_Gaga_interview_2016.jpg", -  "Lil Wayne": "https://upload.wikimedia.org/wikipedia/commons/thumb/a/a6/Lil_Wayne_%2823513397583%29.jpg/220px-Lil_Wayne_%2823513397583%29.jpg", -  "Linda Ronstadt": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/50/LindaRonstadtPerforming.jpg/220px-LindaRonstadtPerforming.jpg", -  "Lionel Richie": "https://upload.wikimedia.org/wikipedia/commons/thumb/c/cd/Lionel_Richie_2017.jpg/220px-Lionel_Richie_2017.jpg", -  "Madonna": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/d1/Madonna_Rebel_Heart_Tour_2015_-_Stockholm_%2823051472299%29_%28cropped_2%29.jpg/220px-Madonna_Rebel_Heart_Tour_2015_-_Stockholm_%2823051472299%29_%28cropped_2%29.jpg", -  "Mariah Carey": "https://upload.wikimedia.org/wikipedia/commons/thumb/f/f2/Mariah_Carey_WBLS_2018_Interview_4.jpg/220px-Mariah_Carey_WBLS_2018_Interview_4.jpg", -  "Meat Loaf": "https://upload.wikimedia.org/wikipedia/commons/thumb/e/e7/Meat_Loaf.jpg/220px-Meat_Loaf.jpg", -  "Michael Jackson": "https://upload.wikimedia.org/wikipedia/commons/thumb/3/31/Michael_Jackson_in_1988.jpg/220px-Michael_Jackson_in_1988.jpg", -  "Neil Diamond": "https://upload.wikimedia.org/wikipedia/commons/thumb/f/f4/Neil_Diamond_HWOF_Aug_2012_other_%28levels_adjusted_and_cropped%29.jpg/220px-Neil_Diamond_HWOF_Aug_2012_other_%28levels_adjusted_and_cropped%29.jpg", -  "Nicki Minaj": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/54/Nicki_Minaj_MTV_VMAs_4.jpg/250px-Nicki_Minaj_MTV_VMAs_4.jpg", -  "Olivia Newton-John": "https://upload.wikimedia.org/wikipedia/commons/thumb/c/c7/Olivia_Newton-John_2.jpg/220px-Olivia_Newton-John_2.jpg", -  "Paul McCartney": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/5d/Paul_McCartney_-_Out_There_Concert_-_140420-5941-jikatu_%2813950091384%29.jpg/220px-Paul_McCartney_-_Out_There_Concert_-_140420-5941-jikatu_%2813950091384%29.jpg", -  "Phil Collins": "https://upload.wikimedia.org/wikipedia/commons/thumb/f/f3/1_collins.jpg/220px-1_collins.jpg", -  "Pink": "https://upload.wikimedia.org/wikipedia/commons/thumb/1/1a/P%21nk_Live_2013.jpg/220px-P%21nk_Live_2013.jpg", -  "Prince": "https://upload.wikimedia.org/wikipedia/commons/thumb/b/b2/Prince_1983_1st_Avenue.jpg/220px-Prince_1983_1st_Avenue.jpg", -  "Reba McEntire": "https://upload.wikimedia.org/wikipedia/commons/thumb/e/e0/Reba_McEntire_by_Gage_Skidmore.jpg/220px-Reba_McEntire_by_Gage_Skidmore.jpg", -  "Rihanna": "https://upload.wikimedia.org/wikipedia/commons/thumb/4/47/Rihanna_concert_in_Washington_DC_%282%29.jpg/250px-Rihanna_concert_in_Washington_DC_%282%29.jpg", -  "Robbie Williams": "https://upload.wikimedia.org/wikipedia/commons/thumb/2/21/Robbie_Williams.jpg/220px-Robbie_Williams.jpg", -  "Rod Stewart": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/57/Rod_stewart_05111976_12_400.jpg/220px-Rod_stewart_05111976_12_400.jpg", -  "Carlos Santana": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/54/Santana_2010.jpg/220px-Santana_2010.jpg", -  "Shania Twain": "https://upload.wikimedia.org/wikipedia/commons/thumb/e/ee/ShaniaTwainJunoAwardsMar2011.jpg/220px-ShaniaTwainJunoAwardsMar2011.jpg", -  "Stevie Wonder": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/54/Stevie_Wonder_1973.JPG/220px-Stevie_Wonder_1973.JPG", -  "Tak Matsumoto": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/da/B%27Z_at_Best_Buy_Theater_NYC_-_9-30-12_-_22.jpg/220px-B%27Z_at_Best_Buy_Theater_NYC_-_9-30-12_-_22.jpg", -  "Taylor Swift": "https://upload.wikimedia.org/wikipedia/commons/thumb/2/25/Taylor_Swift_112_%2818119055110%29_%28cropped%29.jpg/220px-Taylor_Swift_112_%2818119055110%29_%28cropped%29.jpg", -  "Tim McGraw": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/5f/Tim_McGraw_October_24_2015.jpg/220px-Tim_McGraw_October_24_2015.jpg", -  "Tina Turner": "https://upload.wikimedia.org/wikipedia/commons/thumb/1/10/Tina_turner_21021985_01_350.jpg/250px-Tina_turner_21021985_01_350.jpg", -  "Tom Petty": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/5a/Tom_Petty_Live_in_Horsens_%28cropped2%29.jpg/220px-Tom_Petty_Live_in_Horsens_%28cropped2%29.jpg", -  "Tupac Shakur": "https://upload.wikimedia.org/wikipedia/en/thumb/b/b5/Tupac_Amaru_Shakur2.jpg/220px-Tupac_Amaru_Shakur2.jpg", -  "Usher": "https://upload.wikimedia.org/wikipedia/commons/thumb/f/fa/Usher_Cannes_2016_retusche.jpg/220px-Usher_Cannes_2016_retusche.jpg", -  "Whitney Houston": "https://upload.wikimedia.org/wikipedia/commons/thumb/a/a7/Whitney_Houston_Welcome_Home_Heroes_1_cropped.jpg/220px-Whitney_Houston_Welcome_Home_Heroes_1_cropped.jpg", -  "Wolfgang Van Halen": "https://upload.wikimedia.org/wikipedia/commons/thumb/0/0c/Wolfgang_Van_Halen_Different_Kind_of_Truth_2012.jpg/220px-Wolfgang_Van_Halen_Different_Kind_of_Truth_2012.jpg" -} +[ +  "Adele", +  "Aerosmith", +  "Aretha Franklin", +  "Ayumi Hamasaki", +  "B'z", +  "Barbra Streisand", +  "Barry Manilow", +  "Barry White", +  "Beyonce", +  "Billy Joel", +  "Bob Dylan", +  "Bob Marley", +  "Bob Seger", +  "Bon Jovi", +  "Britney Spears", +  "Bruce Springsteen", +  "Bruno Mars", +  "Bryan Adams", +  "Celine Dion", +  "Cher", +  "Christina Aguilera", +  "David Bowie", +  "Donna Summer", +  "Drake", +  "Ed Sheeran", +  "Elton John", +  "Elvis Presley", +  "Eminem", +  "Enya", +  "Flo Rida", +  "Frank Sinatra", +  "Garth Brooks", +  "George Michael", +  "George Strait", +  "James Taylor", +  "Janet Jackson", +  "Jay-Z", +  "Johnny Cash", +  "Johnny Hallyday", +  "Julio Iglesias", +  "Justin Bieber", +  "Justin Timberlake", +  "Kanye West", +  "Katy Perry", +  "Kenny G", +  "Kenny Rogers", +  "Lady Gaga", +  "Lil Wayne", +  "Linda Ronstadt", +  "Lionel Richie", +  "Madonna", +  "Mariah Carey", +  "Meat Loaf", +  "Michael Jackson", +  "Neil Diamond", +  "Nicki Minaj", +  "Olivia Newton-John", +  "Paul McCartney", +  "Phil Collins", +  "Pink", +  "Prince", +  "Reba McEntire", +  "Rihanna", +  "Robbie Williams", +  "Rod Stewart", +  "Santana", +  "Shania Twain", +  "Stevie Wonder", +  "Taylor Swift", +  "Tim McGraw", +  "Tina Turner", +  "Tom Petty", +  "Tupac Shakur", +  "Usher", +  "Van Halen", +  "Whitney Houston" +] diff --git a/bot/utils/checks.py b/bot/utils/checks.py index 19f64ff9f..ad892e512 100644 --- a/bot/utils/checks.py +++ b/bot/utils/checks.py @@ -1,6 +1,8 @@ +import datetime  import logging +from typing import Callable, Iterable -from discord.ext.commands import Context +from discord.ext.commands import BucketType, Cog, Command, CommandOnCooldown, Context, Cooldown, CooldownMapping  log = logging.getLogger(__name__) @@ -42,3 +44,47 @@ def in_channel_check(ctx: Context, channel_id: int) -> bool:      log.trace(f"{ctx.author} tried to call the '{ctx.command.name}' command. "                f"The result of the in_channel check was {check}.")      return check + + +def cooldown_with_role_bypass(rate: int, per: float, type: BucketType = BucketType.default, *, +                              bypass_roles: Iterable[int]) -> Callable: +    """ +    Applies a cooldown to a command, but allows members with certain roles to be ignored. + +    NOTE: this replaces the `Command.before_invoke` callback, which *might* introduce problems in the future. +    """ +    # make it a set so lookup is hash based +    bypass = set(bypass_roles) + +    # this handles the actual cooldown logic +    buckets = CooldownMapping(Cooldown(rate, per, type)) + +    # will be called after the command has been parse but before it has been invoked, ensures that +    # the cooldown won't be updated if the user screws up their input to the command +    async def predicate(cog: Cog, ctx: Context) -> None: +        nonlocal bypass, buckets + +        if any(role.id in bypass for role in ctx.author.roles): +            return + +        # cooldown logic, taken from discord.py internals +        current = ctx.message.created_at.replace(tzinfo=datetime.timezone.utc).timestamp() +        bucket = buckets.get_bucket(ctx.message) +        retry_after = bucket.update_rate_limit(current) +        if retry_after: +            raise CommandOnCooldown(bucket, retry_after) + +    def wrapper(command: Command) -> Command: +        # NOTE: this could be changed if a subclass of Command were to be used. I didn't see the need for it +        # so I just made it raise an error when the decorator is applied before the actual command object exists. +        # +        # if the `before_invoke` detail is ever a problem then I can quickly just swap over. +        if not isinstance(command, Command): +            raise TypeError('Decorator `cooldown_with_role_bypass` must be applied after the command decorator. ' +                            'This means it has to be above the command decorator in the code.') + +        command._before_invoke = predicate + +        return command + +    return wrapper diff --git a/bot/utils/moderation.py b/bot/utils/moderation.py deleted file mode 100644 index 7860f14a1..000000000 --- a/bot/utils/moderation.py +++ /dev/null @@ -1,72 +0,0 @@ -import logging -from datetime import datetime -from typing import Optional, Union - -from discord import Member, Object, User -from discord.ext.commands import Context - -from bot.api import ResponseCodeError -from bot.constants import Keys - -log = logging.getLogger(__name__) - -HEADERS = {"X-API-KEY": Keys.site_api} - - -async def post_infraction( -    ctx: Context, -    user: Union[Member, Object, User], -    type: str, -    reason: str, -    expires_at: datetime = None, -    hidden: bool = False, -    active: bool = True, -) -> Optional[dict]: -    """Posts an infraction to the API.""" -    payload = { -        "actor": ctx.message.author.id, -        "hidden": hidden, -        "reason": reason, -        "type": type, -        "user": user.id, -        "active": active -    } -    if expires_at: -        payload['expires_at'] = expires_at.isoformat() - -    try: -        response = await ctx.bot.api_client.post('bot/infractions', json=payload) -    except ResponseCodeError as exp: -        if exp.status == 400 and 'user' in exp.response_json: -            log.info( -                f"{ctx.author} tried to add a {type} infraction to `{user.id}`, " -                "but that user id was not found in the database." -            ) -            await ctx.send(f":x: Cannot add infraction, the specified user is not known to the database.") -            return -        else: -            log.exception("An unexpected ResponseCodeError occurred while adding an infraction:") -            await ctx.send(":x: There was an error adding the infraction.") -            return - -    return response - - -async def already_has_active_infraction(ctx: Context, user: Union[Member, Object, User], type: str) -> bool: -    """Checks if a user already has an active infraction of the given type.""" -    active_infractions = await ctx.bot.api_client.get( -        'bot/infractions', -        params={ -            'active': 'true', -            'type': type, -            'user__id': str(user.id) -        } -    ) -    if active_infractions: -        await ctx.send( -            f":x: According to my records, this user already has a {type} infraction. " -            f"See infraction **#{active_infractions[0]['id']}**." -        ) -        return True -    else: -        return False diff --git a/bot/utils/time.py b/bot/utils/time.py index da28f2c76..2aea2c099 100644 --- a/bot/utils/time.py +++ b/bot/utils/time.py @@ -1,5 +1,6 @@  import asyncio  import datetime +from typing import Optional  import dateutil.parser  from dateutil.relativedelta import relativedelta @@ -34,6 +35,9 @@ def humanize_delta(delta: relativedelta, precision: str = "seconds", max_units:      precision specifies the smallest unit of time to include (e.g. "seconds", "minutes").      max_units specifies the maximum number of units of time to include (e.g. 1 may include days but not hours).      """ +    if max_units <= 0: +        raise ValueError("max_units must be positive") +      units = (          ("years", delta.years),          ("months", delta.months), @@ -83,15 +87,20 @@ def time_since(past_datetime: datetime.datetime, precision: str = "seconds", max      return f"{humanized} ago" -def parse_rfc1123(time_str: str) -> datetime.datetime: +def parse_rfc1123(stamp: str) -> datetime.datetime:      """Parse RFC1123 time string into datetime.""" -    return datetime.datetime.strptime(time_str, RFC1123_FORMAT).replace(tzinfo=datetime.timezone.utc) +    return datetime.datetime.strptime(stamp, RFC1123_FORMAT).replace(tzinfo=datetime.timezone.utc)  # Hey, this could actually be used in the off_topic_names and reddit cogs :) -async def wait_until(time: datetime.datetime) -> None: -    """Wait until a given time.""" -    delay = time - datetime.datetime.utcnow() +async def wait_until(time: datetime.datetime, start: Optional[datetime.datetime] = None) -> None: +    """ +    Wait until a given time. + +    :param time: A datetime.datetime object to wait until. +    :param start: The start from which to calculate the waiting duration. Defaults to UTC time. +    """ +    delay = time - (start or datetime.datetime.utcnow())      delay_seconds = delay.total_seconds()      # Incorporate a small delay so we don't rapid-fire the event due to time precision errors diff --git a/config-default.yml b/config-default.yml index 827ae4619..ca405337e 100644 --- a/config-default.yml +++ b/config-default.yml @@ -282,7 +282,7 @@ anti_spam:      rules:          attachments:              interval: 10 -            max: 3 +            max: 9          burst:              interval: 10 diff --git a/docker-compose.yml b/docker-compose.yml index 9684a3c62..f79fdba58 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -6,7 +6,7 @@ version: "3.7"  services:    postgres: -    image: postgres:11-alpine +    image: postgres:12-alpine      environment:        POSTGRES_DB: pysite        POSTGRES_PASSWORD: pysite diff --git a/tests/helpers.py b/tests/helpers.py index 2908294f7..25059fa3a 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -7,6 +7,10 @@ __all__ = ('AsyncMock', 'async_test')  # TODO: Remove me on 3.8 +# Allows you to mock a coroutine. Since the default `__call__` of `MagicMock` +# is not a coroutine, trying to mock a coroutine with it will result in errors +# as the default `__call__` is not awaitable. Use this class for monkeypatching +# coroutines instead.  class AsyncMock(MagicMock):      async def __call__(self, *args, **kwargs):          return super(AsyncMock, self).__call__(*args, **kwargs) diff --git a/tests/test_resources.py b/tests/test_resources.py index 2b17aea64..bcf124f05 100644 --- a/tests/test_resources.py +++ b/tests/test_resources.py @@ -1,18 +1,13 @@  import json -import mimetypes  from pathlib import Path -from urllib.parse import urlparse  def test_stars_valid(): -    """Validates that `bot/resources/stars.json` contains valid images.""" +    """Validates that `bot/resources/stars.json` contains a list of strings."""      path = Path('bot', 'resources', 'stars.json')      content = path.read_text()      data = json.loads(content) -    for url in data.values(): -        assert urlparse(url).scheme == 'https' - -        mimetype, _ = mimetypes.guess_type(url) -        assert mimetype in ('image/jpeg', 'image/png') +    for name in data: +        assert type(name) is str diff --git a/tests/utils/test_time.py b/tests/utils/test_time.py new file mode 100644 index 000000000..4baa6395c --- /dev/null +++ b/tests/utils/test_time.py @@ -0,0 +1,62 @@ +import asyncio +from datetime import datetime, timezone +from unittest.mock import patch + +import pytest +from dateutil.relativedelta import relativedelta + +from bot.utils import time +from tests.helpers import AsyncMock + + +    ('delta', 'precision', 'max_units', 'expected'), +    ( +        (relativedelta(days=2), 'seconds', 1, '2 days'), +        (relativedelta(days=2, hours=2), 'seconds', 2, '2 days and 2 hours'), +        (relativedelta(days=2, hours=2), 'seconds', 1, '2 days'), +        (relativedelta(days=2, hours=2), 'days', 2, '2 days'), + +        # Does not abort for unknown units, as the unit name is checked +        # against the attribute of the relativedelta instance. +        (relativedelta(days=2, hours=2), 'elephants', 2, '2 days and 2 hours'), + +        # Very high maximum units, but it only ever iterates over +        # each value the relativedelta might have. +        (relativedelta(days=2, hours=2), 'hours', 20, '2 days and 2 hours'), +    ) +) +def test_humanize_delta( +        delta: relativedelta, +        precision: str, +        max_units: int, +        expected: str +): +    assert time.humanize_delta(delta, precision, max_units) == expected + + [email protected]('max_units', (-1, 0)) +def test_humanize_delta_raises_for_invalid_max_units(max_units: int): +    with pytest.raises(ValueError, match='max_units must be positive'): +        time.humanize_delta(relativedelta(days=2, hours=2), 'hours', max_units) + + +    ('stamp', 'expected'), +    ( +        ('Sun, 15 Sep 2019 12:00:00 GMT', datetime(2019, 9, 15, 12, 0, 0, tzinfo=timezone.utc)), +    ) +) +def test_parse_rfc1123(stamp: str, expected: str): +    assert time.parse_rfc1123(stamp) == expected + + +@patch('asyncio.sleep', new_callable=AsyncMock) +def test_wait_until(sleep_patch): +    start = datetime(2019, 1, 1, 0, 0) +    then = datetime(2019, 1, 1, 0, 10) + +    # No return value +    assert asyncio.run(time.wait_until(then, start)) is None + +    sleep_patch.assert_called_once_with(10 * 60) | 
