diff options
40 files changed, 1951 insertions, 1948 deletions
diff --git a/.gitignore b/.gitignore index 261fa179f..a191523b6 100644 --- a/.gitignore +++ b/.gitignore @@ -116,3 +116,6 @@ config.yml # JUnit XML reports from pytest junit.xml + +# Mac OS .DS_Store, which is a file that stores custom attributes of its containing folder +.DS_Store diff --git a/bot/__main__.py b/bot/__main__.py index f25693734..19a7e5ec6 100644 --- a/bot/__main__.py +++ b/bot/__main__.py @@ -36,14 +36,13 @@ log.addHandler(APILoggingHandler(bot.api_client)) bot.load_extension("bot.cogs.error_handler") bot.load_extension("bot.cogs.filtering") bot.load_extension("bot.cogs.logging") -bot.load_extension("bot.cogs.modlog") bot.load_extension("bot.cogs.security") # Commands, etc bot.load_extension("bot.cogs.antispam") bot.load_extension("bot.cogs.bot") bot.load_extension("bot.cogs.clean") -bot.load_extension("bot.cogs.cogs") +bot.load_extension("bot.cogs.extensions") bot.load_extension("bot.cogs.help") # Only load this in production @@ -64,7 +63,6 @@ bot.load_extension("bot.cogs.reddit") bot.load_extension("bot.cogs.reminders") bot.load_extension("bot.cogs.site") bot.load_extension("bot.cogs.snekbox") -bot.load_extension("bot.cogs.superstarify") bot.load_extension("bot.cogs.sync") bot.load_extension("bot.cogs.tags") bot.load_extension("bot.cogs.token_remover") diff --git a/bot/cogs/alias.py b/bot/cogs/alias.py index 0f49a400c..6648805e9 100644 --- a/bot/cogs/alias.py +++ b/bot/cogs/alias.py @@ -5,6 +5,7 @@ from typing import Union from discord import Colour, Embed, Member, User from discord.ext.commands import Bot, Cog, Command, Context, clean_content, command, group +from bot.cogs.extensions import Extension from bot.cogs.watchchannels.watchchannel import proxy_user from bot.converters import TagNameConverter from bot.pagination import LinePaginator @@ -84,9 +85,9 @@ class Alias (Cog): await self.invoke(ctx, "site rules") @command(name="reload", hidden=True) - async def cogs_reload_alias(self, ctx: Context, *, cog_name: str) -> None: - """Alias for invoking <prefix>cogs reload [cog_name].""" - await self.invoke(ctx, "cogs reload", cog_name) + async def extensions_reload_alias(self, ctx: Context, *extensions: Extension) -> None: + """Alias for invoking <prefix>extensions reload [extensions...].""" + await self.invoke(ctx, "extensions reload", *extensions) @command(name="defon", hidden=True) async def defcon_enable_alias(self, ctx: Context) -> None: diff --git a/bot/cogs/antispam.py b/bot/cogs/antispam.py index 8dfa0ad05..1b394048a 100644 --- a/bot/cogs/antispam.py +++ b/bot/cogs/antispam.py @@ -10,7 +10,7 @@ from discord import Colour, Member, Message, NotFound, Object, TextChannel from discord.ext.commands import Bot, Cog from bot import rules -from bot.cogs.modlog import ModLog +from bot.cogs.moderation import ModLog from bot.constants import ( AntiSpam as AntiSpamConfig, Channels, Colours, DEBUG_MODE, Event, Filter, @@ -107,14 +107,16 @@ class AntiSpam(Cog): self.message_deletion_queue = dict() self.queue_consumption_tasks = dict() + self.bot.loop.create_task(self.alert_on_validation_error()) + @property def mod_log(self) -> ModLog: """Allows for easy access of the ModLog cog.""" return self.bot.get_cog("ModLog") - @Cog.listener() - async def on_ready(self) -> None: + async def alert_on_validation_error(self) -> None: """Unloads the cog and alerts admins if configuration validation failed.""" + await self.bot.wait_until_ready() if self.validation_errors: body = "**The following errors were encountered:**\n" body += "\n".join(f"- {error}" for error in self.validation_errors.values()) @@ -207,8 +209,10 @@ class AntiSpam(Cog): if not any(role.id == self.muted_role.id for role in member.roles): remove_role_after = AntiSpamConfig.punishment['remove_after'] - # We need context, let's get it + # Get context and make sure the bot becomes the actor of infraction by patching the `author` attributes context = await self.bot.get_context(msg) + context.author = self.bot.user + context.message.author = self.bot.user # Since we're going to invoke the tempmute command directly, we need to manually call the converter. dt_remove_role_after = await self.expiration_date_converter.convert(context, f"{remove_role_after}S") diff --git a/bot/cogs/clean.py b/bot/cogs/clean.py index 1c0c9a7a8..dca411d01 100644 --- a/bot/cogs/clean.py +++ b/bot/cogs/clean.py @@ -6,7 +6,7 @@ from typing import Optional from discord import Colour, Embed, Message, User from discord.ext.commands import Bot, Cog, Context, group -from bot.cogs.modlog import ModLog +from bot.cogs.moderation import ModLog from bot.constants import ( Channels, CleanMessages, Colours, Event, Icons, MODERATION_ROLES, NEGATIVE_REPLIES diff --git a/bot/cogs/cogs.py b/bot/cogs/cogs.py deleted file mode 100644 index 1f6ccd09c..000000000 --- a/bot/cogs/cogs.py +++ /dev/null @@ -1,298 +0,0 @@ -import logging -import os - -from discord import Colour, Embed -from discord.ext.commands import Bot, Cog, Context, group - -from bot.constants import ( - Emojis, MODERATION_ROLES, Roles, URLs -) -from bot.decorators import with_role -from bot.pagination import LinePaginator - -log = logging.getLogger(__name__) - -KEEP_LOADED = ["bot.cogs.cogs", "bot.cogs.modlog"] - - -class Cogs(Cog): - """Cog management commands.""" - - def __init__(self, bot: Bot): - self.bot = bot - self.cogs = {} - - # Load up the cog names - log.info("Initializing cog names...") - for filename in os.listdir("bot/cogs"): - if filename.endswith(".py") and "_" not in filename: - if os.path.isfile(f"bot/cogs/{filename}"): - cog = filename[:-3] - - self.cogs[cog] = f"bot.cogs.{cog}" - - # Allow reverse lookups by reversing the pairs - self.cogs.update({v: k for k, v in self.cogs.items()}) - - @group(name='cogs', aliases=('c',), invoke_without_command=True) - @with_role(*MODERATION_ROLES, Roles.core_developer) - async def cogs_group(self, ctx: Context) -> None: - """Load, unload, reload, and list active cogs.""" - await ctx.invoke(self.bot.get_command("help"), "cogs") - - @cogs_group.command(name='load', aliases=('l',)) - @with_role(*MODERATION_ROLES, Roles.core_developer) - async def load_command(self, ctx: Context, cog: str) -> None: - """ - Load up an unloaded cog, given the module containing it. - - You can specify the cog name for any cogs that are placed directly within `!cogs`, or specify the - entire module directly. - """ - cog = cog.lower() - - embed = Embed() - embed.colour = Colour.red() - - embed.set_author( - name="Python Bot (Cogs)", - url=URLs.github_bot_repo, - icon_url=URLs.bot_avatar - ) - - if cog in self.cogs: - full_cog = self.cogs[cog] - elif "." in cog: - full_cog = cog - else: - full_cog = None - log.warning(f"{ctx.author} requested we load the '{cog}' cog, but that cog doesn't exist.") - embed.description = f"Unknown cog: {cog}" - - if full_cog: - if full_cog not in self.bot.extensions: - try: - self.bot.load_extension(full_cog) - except ImportError: - log.exception(f"{ctx.author} requested we load the '{cog}' cog, " - f"but the cog module {full_cog} could not be found!") - embed.description = f"Invalid cog: {cog}\n\nCould not find cog module {full_cog}" - except Exception as e: - log.exception(f"{ctx.author} requested we load the '{cog}' cog, " - "but the loading failed") - embed.description = f"Failed to load cog: {cog}\n\n{e.__class__.__name__}: {e}" - else: - log.debug(f"{ctx.author} requested we load the '{cog}' cog. Cog loaded!") - embed.description = f"Cog loaded: {cog}" - embed.colour = Colour.green() - else: - log.warning(f"{ctx.author} requested we load the '{cog}' cog, but the cog was already loaded!") - embed.description = f"Cog {cog} is already loaded" - - await ctx.send(embed=embed) - - @cogs_group.command(name='unload', aliases=('ul',)) - @with_role(*MODERATION_ROLES, Roles.core_developer) - async def unload_command(self, ctx: Context, cog: str) -> None: - """ - Unload an already-loaded cog, given the module containing it. - - You can specify the cog name for any cogs that are placed directly within `!cogs`, or specify the - entire module directly. - """ - cog = cog.lower() - - embed = Embed() - embed.colour = Colour.red() - - embed.set_author( - name="Python Bot (Cogs)", - url=URLs.github_bot_repo, - icon_url=URLs.bot_avatar - ) - - if cog in self.cogs: - full_cog = self.cogs[cog] - elif "." in cog: - full_cog = cog - else: - full_cog = None - log.warning(f"{ctx.author} requested we unload the '{cog}' cog, but that cog doesn't exist.") - embed.description = f"Unknown cog: {cog}" - - if full_cog: - if full_cog in KEEP_LOADED: - log.warning(f"{ctx.author} requested we unload `{full_cog}`, that sneaky pete. We said no.") - embed.description = f"You may not unload `{full_cog}`!" - elif full_cog in self.bot.extensions: - try: - self.bot.unload_extension(full_cog) - except Exception as e: - log.exception(f"{ctx.author} requested we unload the '{cog}' cog, " - "but the unloading failed") - embed.description = f"Failed to unload cog: {cog}\n\n```{e}```" - else: - log.debug(f"{ctx.author} requested we unload the '{cog}' cog. Cog unloaded!") - embed.description = f"Cog unloaded: {cog}" - embed.colour = Colour.green() - else: - log.warning(f"{ctx.author} requested we unload the '{cog}' cog, but the cog wasn't loaded!") - embed.description = f"Cog {cog} is not loaded" - - await ctx.send(embed=embed) - - @cogs_group.command(name='reload', aliases=('r',)) - @with_role(*MODERATION_ROLES, Roles.core_developer) - async def reload_command(self, ctx: Context, cog: str) -> None: - """ - Reload an unloaded cog, given the module containing it. - - You can specify the cog name for any cogs that are placed directly within `!cogs`, or specify the - entire module directly. - - If you specify "*" as the cog, every cog currently loaded will be unloaded, and then every cog present in the - bot/cogs directory will be loaded. - """ - cog = cog.lower() - - embed = Embed() - embed.colour = Colour.red() - - embed.set_author( - name="Python Bot (Cogs)", - url=URLs.github_bot_repo, - icon_url=URLs.bot_avatar - ) - - if cog == "*": - full_cog = cog - elif cog in self.cogs: - full_cog = self.cogs[cog] - elif "." in cog: - full_cog = cog - else: - full_cog = None - log.warning(f"{ctx.author} requested we reload the '{cog}' cog, but that cog doesn't exist.") - embed.description = f"Unknown cog: {cog}" - - if full_cog: - if full_cog == "*": - all_cogs = [ - f"bot.cogs.{fn[:-3]}" for fn in os.listdir("bot/cogs") - if os.path.isfile(f"bot/cogs/{fn}") and fn.endswith(".py") and "_" not in fn - ] - - failed_unloads = {} - failed_loads = {} - - unloaded = 0 - loaded = 0 - - for loaded_cog in self.bot.extensions.copy().keys(): - try: - self.bot.unload_extension(loaded_cog) - except Exception as e: - failed_unloads[loaded_cog] = f"{e.__class__.__name__}: {e}" - else: - unloaded += 1 - - for unloaded_cog in all_cogs: - try: - self.bot.load_extension(unloaded_cog) - except Exception as e: - failed_loads[unloaded_cog] = f"{e.__class__.__name__}: {e}" - else: - loaded += 1 - - lines = [ - "**All cogs reloaded**", - f"**Unloaded**: {unloaded} / **Loaded**: {loaded}" - ] - - if failed_unloads: - lines.append("\n**Unload failures**") - - for cog, error in failed_unloads: - lines.append(f"{Emojis.status_dnd} **{cog}:** `{error}`") - - if failed_loads: - lines.append("\n**Load failures**") - - for cog, error in failed_loads.items(): - lines.append(f"{Emojis.status_dnd} **{cog}:** `{error}`") - - log.debug(f"{ctx.author} requested we reload all cogs. Here are the results: \n" - f"{lines}") - - await LinePaginator.paginate(lines, ctx, embed, empty=False) - return - - elif full_cog in self.bot.extensions: - try: - self.bot.unload_extension(full_cog) - self.bot.load_extension(full_cog) - except Exception as e: - log.exception(f"{ctx.author} requested we reload the '{cog}' cog, " - "but the unloading failed") - embed.description = f"Failed to reload cog: {cog}\n\n```{e}```" - else: - log.debug(f"{ctx.author} requested we reload the '{cog}' cog. Cog reloaded!") - embed.description = f"Cog reload: {cog}" - embed.colour = Colour.green() - else: - log.warning(f"{ctx.author} requested we reload the '{cog}' cog, but the cog wasn't loaded!") - embed.description = f"Cog {cog} is not loaded" - - await ctx.send(embed=embed) - - @cogs_group.command(name='list', aliases=('all',)) - @with_role(*MODERATION_ROLES, Roles.core_developer) - async def list_command(self, ctx: Context) -> None: - """ - Get a list of all cogs, including their loaded status. - - Gray indicates that the cog is unloaded. Green indicates that the cog is currently loaded. - """ - embed = Embed() - lines = [] - cogs = {} - - embed.colour = Colour.blurple() - embed.set_author( - name="Python Bot (Cogs)", - url=URLs.github_bot_repo, - icon_url=URLs.bot_avatar - ) - - for key, _value in self.cogs.items(): - if "." not in key: - continue - - if key in self.bot.extensions: - cogs[key] = True - else: - cogs[key] = False - - for key in self.bot.extensions.keys(): - if key not in self.cogs: - cogs[key] = True - - for cog, loaded in sorted(cogs.items(), key=lambda x: x[0]): - if cog in self.cogs: - cog = self.cogs[cog] - - if loaded: - status = Emojis.status_online - else: - status = Emojis.status_offline - - lines.append(f"{status} {cog}") - - log.debug(f"{ctx.author} requested a list of all cogs. Returning a paginated list.") - await LinePaginator.paginate(lines, ctx, embed, max_size=300, empty=False) - - -def setup(bot: Bot) -> None: - """Cogs cog load.""" - bot.add_cog(Cogs(bot)) - log.info("Cog loaded: Cogs") diff --git a/bot/cogs/defcon.py b/bot/cogs/defcon.py index 048d8a683..70e101baa 100644 --- a/bot/cogs/defcon.py +++ b/bot/cogs/defcon.py @@ -4,7 +4,7 @@ from datetime import datetime, timedelta from discord import Colour, Embed, Member from discord.ext.commands import Bot, Cog, Context, group -from bot.cogs.modlog import ModLog +from bot.cogs.moderation import ModLog from bot.constants import Channels, Colours, Emojis, Event, Icons, Roles from bot.decorators import with_role @@ -35,14 +35,16 @@ class Defcon(Cog): self.channel = None self.days = timedelta(days=0) + self.bot.loop.create_task(self.sync_settings()) + @property def mod_log(self) -> ModLog: """Get currently loaded ModLog cog instance.""" return self.bot.get_cog("ModLog") - @Cog.listener() - async def on_ready(self) -> None: + async def sync_settings(self) -> None: """On cog load, try to synchronize DEFCON settings to the API.""" + await self.bot.wait_until_ready() self.channel = await self.bot.fetch_channel(Channels.defcon) try: response = await self.bot.api_client.get('bot/bot-settings/defcon') diff --git a/bot/cogs/doc.py b/bot/cogs/doc.py index c9e6b3b91..a13464bff 100644 --- a/bot/cogs/doc.py +++ b/bot/cogs/doc.py @@ -126,9 +126,11 @@ class Doc(commands.Cog): self.bot = bot self.inventories = {} - @commands.Cog.listener() - async def on_ready(self) -> None: - """Refresh documentation inventory.""" + self.bot.loop.create_task(self.init_refresh_inventory()) + + async def init_refresh_inventory(self) -> None: + """Refresh documentation inventory on cog initialization.""" + await self.bot.wait_until_ready() await self.refresh_inventory() async def update_single( @@ -207,6 +209,9 @@ class Doc(commands.Cog): symbol_heading = soup.find(id=symbol_id) signature_buffer = [] + if symbol_heading is None: + return None + # Traverse the tags of the signature header and ignore any # unwanted symbols from it. Add all of it to a temporary buffer. for tag in symbol_heading.strings: diff --git a/bot/cogs/extensions.py b/bot/cogs/extensions.py new file mode 100644 index 000000000..bb66e0b8e --- /dev/null +++ b/bot/cogs/extensions.py @@ -0,0 +1,236 @@ +import functools +import logging +import typing as t +from enum import Enum +from pkgutil import iter_modules + +from discord import Colour, Embed +from discord.ext import commands +from discord.ext.commands import Bot, Context, group + +from bot.constants import Emojis, MODERATION_ROLES, Roles, URLs +from bot.pagination import LinePaginator +from bot.utils.checks import with_role_check + +log = logging.getLogger(__name__) + +UNLOAD_BLACKLIST = {"bot.cogs.extensions", "bot.cogs.modlog"} +EXTENSIONS = frozenset( + ext.name + for ext in iter_modules(("bot/cogs",), "bot.cogs.") + if ext.name[-1] != "_" +) + + +class Action(Enum): + """Represents an action to perform on an extension.""" + + # Need to be partial otherwise they are considered to be function definitions. + LOAD = functools.partial(Bot.load_extension) + UNLOAD = functools.partial(Bot.unload_extension) + RELOAD = functools.partial(Bot.reload_extension) + + +class Extension(commands.Converter): + """ + Fully qualify the name of an extension and ensure it exists. + + The * and ** values bypass this when used with the reload command. + """ + + async def convert(self, ctx: Context, argument: str) -> str: + """Fully qualify the name of an extension and ensure it exists.""" + # Special values to reload all extensions + if argument == "*" or argument == "**": + return argument + + argument = argument.lower() + + if "." not in argument: + argument = f"bot.cogs.{argument}" + + if argument in EXTENSIONS: + return argument + else: + raise commands.BadArgument(f":x: Could not find the extension `{argument}`.") + + +class Extensions(commands.Cog): + """Extension management commands.""" + + def __init__(self, bot: Bot): + self.bot = bot + + @group(name="extensions", aliases=("ext", "exts", "c", "cogs"), invoke_without_command=True) + async def extensions_group(self, ctx: Context) -> None: + """Load, unload, reload, and list loaded extensions.""" + await ctx.invoke(self.bot.get_command("help"), "extensions") + + @extensions_group.command(name="load", aliases=("l",)) + async def load_command(self, ctx: Context, *extensions: Extension) -> None: + """ + Load extensions given their fully qualified or unqualified names. + + If '\*' or '\*\*' is given as the name, all unloaded extensions will be loaded. + """ # noqa: W605 + if not extensions: + await ctx.invoke(self.bot.get_command("help"), "extensions load") + return + + if "*" in extensions or "**" in extensions: + extensions = set(EXTENSIONS) - set(self.bot.extensions.keys()) + + msg = self.batch_manage(Action.LOAD, *extensions) + await ctx.send(msg) + + @extensions_group.command(name="unload", aliases=("ul",)) + async def unload_command(self, ctx: Context, *extensions: Extension) -> None: + """ + Unload currently loaded extensions given their fully qualified or unqualified names. + + If '\*' or '\*\*' is given as the name, all loaded extensions will be unloaded. + """ # noqa: W605 + if not extensions: + await ctx.invoke(self.bot.get_command("help"), "extensions unload") + return + + blacklisted = "\n".join(UNLOAD_BLACKLIST & set(extensions)) + + if blacklisted: + msg = f":x: The following extension(s) may not be unloaded:```{blacklisted}```" + else: + if "*" in extensions or "**" in extensions: + extensions = set(self.bot.extensions.keys()) - UNLOAD_BLACKLIST + + msg = self.batch_manage(Action.UNLOAD, *extensions) + + await ctx.send(msg) + + @extensions_group.command(name="reload", aliases=("r",)) + async def reload_command(self, ctx: Context, *extensions: Extension) -> None: + """ + Reload extensions given their fully qualified or unqualified names. + + If an extension fails to be reloaded, it will be rolled-back to the prior working state. + + If '\*' is given as the name, all currently loaded extensions will be reloaded. + If '\*\*' is given as the name, all extensions, including unloaded ones, will be reloaded. + """ # noqa: W605 + if not extensions: + await ctx.invoke(self.bot.get_command("help"), "extensions reload") + return + + if "**" in extensions: + extensions = EXTENSIONS + elif "*" in extensions: + extensions = set(self.bot.extensions.keys()) | set(extensions) + extensions.remove("*") + + msg = self.batch_manage(Action.RELOAD, *extensions) + + await ctx.send(msg) + + @extensions_group.command(name="list", aliases=("all",)) + async def list_command(self, ctx: Context) -> None: + """ + Get a list of all extensions, including their loaded status. + + Grey indicates that the extension is unloaded. + Green indicates that the extension is currently loaded. + """ + embed = Embed() + lines = [] + + embed.colour = Colour.blurple() + embed.set_author( + name="Extensions List", + url=URLs.github_bot_repo, + icon_url=URLs.bot_avatar + ) + + for ext in sorted(list(EXTENSIONS)): + if ext in self.bot.extensions: + status = Emojis.status_online + else: + status = Emojis.status_offline + + ext = ext.rsplit(".", 1)[1] + lines.append(f"{status} {ext}") + + log.debug(f"{ctx.author} requested a list of all cogs. Returning a paginated list.") + await LinePaginator.paginate(lines, ctx, embed, max_size=300, empty=False) + + def batch_manage(self, action: Action, *extensions: str) -> str: + """ + Apply an action to multiple extensions and return a message with the results. + + If only one extension is given, it is deferred to `manage()`. + """ + if len(extensions) == 1: + msg, _ = self.manage(action, extensions[0]) + return msg + + verb = action.name.lower() + failures = {} + + for extension in extensions: + _, error = self.manage(action, extension) + if error: + failures[extension] = error + + emoji = ":x:" if failures else ":ok_hand:" + msg = f"{emoji} {len(extensions) - len(failures)} / {len(extensions)} extensions {verb}ed." + + if failures: + failures = "\n".join(f"{ext}\n {err}" for ext, err in failures.items()) + msg += f"\nFailures:```{failures}```" + + log.debug(f"Batch {verb}ed extensions.") + + return msg + + def manage(self, action: Action, ext: str) -> t.Tuple[str, t.Optional[str]]: + """Apply an action to an extension and return the status message and any error message.""" + verb = action.name.lower() + error_msg = None + + try: + action.value(self.bot, ext) + except (commands.ExtensionAlreadyLoaded, commands.ExtensionNotLoaded): + if action is Action.RELOAD: + # When reloading, just load the extension if it was not loaded. + return self.manage(Action.LOAD, ext) + + msg = f":x: Extension `{ext}` is already {verb}ed." + log.debug(msg[4:]) + except Exception as e: + if hasattr(e, "original"): + e = e.original + + log.exception(f"Extension '{ext}' failed to {verb}.") + + error_msg = f"{e.__class__.__name__}: {e}" + msg = f":x: Failed to {verb} extension `{ext}`:\n```{error_msg}```" + else: + msg = f":ok_hand: Extension successfully {verb}ed: `{ext}`." + log.debug(msg[10:]) + + return msg, error_msg + + # This cannot be static (must have a __func__ attribute). + def cog_check(self, ctx: Context) -> bool: + """Only allow moderators and core developers to invoke the commands in this cog.""" + return with_role_check(ctx, *MODERATION_ROLES, Roles.core_developer) + + # This cannot be static (must have a __func__ attribute). + async def cog_command_error(self, ctx: Context, error: Exception) -> None: + """Handle BadArgument errors locally to prevent the help command from showing.""" + if isinstance(error, commands.BadArgument): + await ctx.send(str(error)) + error.handled = True + + +def setup(bot: Bot) -> None: + """Load the Extensions cog.""" + bot.add_cog(Extensions(bot)) + log.info("Cog loaded: Extensions") diff --git a/bot/cogs/filtering.py b/bot/cogs/filtering.py index bd8c6ed67..265ae5160 100644 --- a/bot/cogs/filtering.py +++ b/bot/cogs/filtering.py @@ -7,7 +7,7 @@ from dateutil.relativedelta import relativedelta from discord import Colour, DMChannel, Member, Message, TextChannel from discord.ext.commands import Bot, Cog -from bot.cogs.modlog import ModLog +from bot.cogs.moderation import ModLog from bot.constants import ( Channels, Colours, DEBUG_MODE, Filter, Icons, URLs diff --git a/bot/cogs/help.py b/bot/cogs/help.py index 37d12b2d5..9607dbd8d 100644 --- a/bot/cogs/help.py +++ b/bot/cogs/help.py @@ -1,5 +1,4 @@ import asyncio -import inspect import itertools from collections import namedtuple from contextlib import suppress @@ -61,6 +60,12 @@ class HelpSession: The message object that's showing the help contents. * destination: `discord.abc.Messageable` Where the help message is to be sent to. + + Cogs can be grouped into custom categories. All cogs with the same category will be displayed + under a single category name in the help output. Custom categories are defined inside the cogs + as a class attribute named `category`. A description can also be specified with the attribute + `category_description`. If a description is not found in at least one cog, the default will be + the regular description (class docstring) of the first cog found in the category. """ def __init__( @@ -107,12 +112,31 @@ class HelpSession: if command: return command - cog = self._bot.cogs.get(query) - if cog: + # Find all cog categories that match. + cog_matches = [] + description = None + for cog in self._bot.cogs.values(): + if hasattr(cog, "category") and cog.category == query: + cog_matches.append(cog) + if hasattr(cog, "category_description"): + description = cog.category_description + + # Try to search by cog name if no categories match. + if not cog_matches: + cog = self._bot.cogs.get(query) + + # Don't consider it a match if the cog has a category. + if cog and not hasattr(cog, "category"): + cog_matches = [cog] + + if cog_matches: + cog = cog_matches[0] + cmds = (cog.get_commands() for cog in cog_matches) # Commands of all cogs + return Cog( - name=cog.qualified_name, - description=inspect.getdoc(cog), - commands=[c for c in self._bot.commands if c.cog is cog] + name=cog.category if hasattr(cog, "category") else cog.qualified_name, + description=description or cog.description, + commands=tuple(itertools.chain.from_iterable(cmds)) # Flatten the list ) self._handle_not_found(query) @@ -207,8 +231,16 @@ class HelpSession: A zero width space is used as a prefix for results with no cogs to force them last in ordering. """ - cog = cmd.cog_name - return f'**{cog}**' if cog else f'**\u200bNo Category:**' + if cmd.cog: + try: + if cmd.cog.category: + return f'**{cmd.cog.category}**' + except AttributeError: + pass + + return f'**{cmd.cog_name}**' + else: + return "**\u200bNo Category:**" def _get_command_params(self, cmd: Command) -> str: """ diff --git a/bot/cogs/information.py b/bot/cogs/information.py index 1afb37103..3a7ba0444 100644 --- a/bot/cogs/information.py +++ b/bot/cogs/information.py @@ -1,14 +1,18 @@ import colorsys import logging +import pprint import textwrap import typing +from typing import Any, Mapping, Optional +import discord from discord import CategoryChannel, Colour, Embed, Member, Role, TextChannel, VoiceChannel, utils -from discord.ext.commands import Bot, Cog, Context, command +from discord.ext import commands +from discord.ext.commands import Bot, BucketType, Cog, Context, command, group from bot.constants import Channels, Emojis, MODERATION_ROLES, STAFF_ROLES -from bot.decorators import InChannelCheckFailure, with_role -from bot.utils.checks import with_role_check +from bot.decorators import InChannelCheckFailure, in_channel, with_role +from bot.utils.checks import cooldown_with_role_bypass, with_role_check from bot.utils.time import time_since log = logging.getLogger(__name__) @@ -229,6 +233,82 @@ class Information(Cog): await ctx.send(embed=embed) + def format_fields(self, mapping: Mapping[str, Any], field_width: Optional[int] = None) -> str: + """Format a mapping to be readable to a human.""" + # sorting is technically superfluous but nice if you want to look for a specific field + fields = sorted(mapping.items(), key=lambda item: item[0]) + + if field_width is None: + field_width = len(max(mapping.keys(), key=len)) + + out = '' + + for key, val in fields: + if isinstance(val, dict): + # if we have dicts inside dicts we want to apply the same treatment to the inner dictionaries + inner_width = int(field_width * 1.6) + val = '\n' + self.format_fields(val, field_width=inner_width) + + elif isinstance(val, str): + # split up text since it might be long + text = textwrap.fill(val, width=100, replace_whitespace=False) + + # indent it, I guess you could do this with `wrap` and `join` but this is nicer + val = textwrap.indent(text, ' ' * (field_width + len(': '))) + + # the first line is already indented so we `str.lstrip` it + val = val.lstrip() + + if key == 'color': + # makes the base 10 representation of a hex number readable to humans + val = hex(val) + + out += '{0:>{width}}: {1}\n'.format(key, val, width=field_width) + + # remove trailing whitespace + return out.rstrip() + + @cooldown_with_role_bypass(2, 60 * 3, BucketType.member, bypass_roles=STAFF_ROLES) + @group(invoke_without_command=True) + @in_channel(Channels.bot, bypass_roles=STAFF_ROLES) + async def raw(self, ctx: Context, *, message: discord.Message, json: bool = False) -> None: + """Shows information about the raw API response.""" + # I *guess* it could be deleted right as the command is invoked but I felt like it wasn't worth handling + # doing this extra request is also much easier than trying to convert everything back into a dictionary again + raw_data = await ctx.bot.http.get_message(message.channel.id, message.id) + + paginator = commands.Paginator() + + def add_content(title: str, content: str) -> None: + paginator.add_line(f'== {title} ==\n') + # replace backticks as it breaks out of code blocks. Spaces seemed to be the most reasonable solution. + # we hope it's not close to 2000 + paginator.add_line(content.replace('```', '`` `')) + paginator.close_page() + + if message.content: + add_content('Raw message', message.content) + + transformer = pprint.pformat if json else self.format_fields + for field_name in ('embeds', 'attachments'): + data = raw_data[field_name] + + if not data: + continue + + total = len(data) + for current, item in enumerate(data, start=1): + title = f'Raw {field_name} ({current}/{total})' + add_content(title, transformer(item)) + + for page in paginator.pages: + await ctx.send(page) + + @raw.command() + async def json(self, ctx: Context, message: discord.Message) -> None: + """Shows information about the raw API response in a copy-pasteable Python format.""" + await ctx.invoke(self.raw, message=message, json=True) + def setup(bot: Bot) -> None: """Information cog load.""" diff --git a/bot/cogs/logging.py b/bot/cogs/logging.py index 8e47bcc36..c92b619ff 100644 --- a/bot/cogs/logging.py +++ b/bot/cogs/logging.py @@ -15,9 +15,11 @@ class Logging(Cog): def __init__(self, bot: Bot): self.bot = bot - @Cog.listener() - async def on_ready(self) -> None: + self.bot.loop.create_task(self.startup_greeting()) + + async def startup_greeting(self) -> None: """Announce our presence to the configured devlog channel.""" + await self.bot.wait_until_ready() log.info("Bot connected!") embed = Embed(description="Connected!") diff --git a/bot/cogs/moderation.py b/bot/cogs/moderation.py deleted file mode 100644 index 5aa873a47..000000000 --- a/bot/cogs/moderation.py +++ /dev/null @@ -1,1172 +0,0 @@ -import asyncio -import logging -import textwrap -from datetime import datetime -from typing import Dict, Union - -from discord import ( - Colour, Embed, Forbidden, Guild, HTTPException, Member, NotFound, Object, User -) -from discord.ext.commands import ( - BadArgument, BadUnionArgument, Bot, Cog, Context, command, group -) - -from bot import constants -from bot.cogs.modlog import ModLog -from bot.constants import Colours, Event, Icons, MODERATION_ROLES -from bot.converters import Duration, InfractionSearchQuery -from bot.decorators import with_role -from bot.pagination import LinePaginator -from bot.utils.moderation import already_has_active_infraction, post_infraction -from bot.utils.scheduling import Scheduler, create_task -from bot.utils.time import INFRACTION_FORMAT, format_infraction, wait_until - -log = logging.getLogger(__name__) - -INFRACTION_ICONS = { - "Mute": Icons.user_mute, - "Kick": Icons.sign_out, - "Ban": Icons.user_ban -} -RULES_URL = "https://pythondiscord.com/pages/rules" -APPEALABLE_INFRACTIONS = ("Ban", "Mute") - - -def proxy_user(user_id: str) -> Object: - """Create a proxy user for the provided user_id for situations where a Member or User object cannot be resolved.""" - try: - user_id = int(user_id) - except ValueError: - raise BadArgument - user = Object(user_id) - user.mention = user.id - user.avatar_url_as = lambda static_format: None - return user - - -def permanent_duration(expires_at: str) -> str: - """Only allow an expiration to be 'permanent' if it is a string.""" - expires_at = expires_at.lower() - if expires_at != "permanent": - raise BadArgument - else: - return expires_at - - -UserTypes = Union[Member, User, proxy_user] - - -class Moderation(Scheduler, Cog): - """Server moderation tools.""" - - def __init__(self, bot: Bot): - self.bot = bot - self._muted_role = Object(constants.Roles.muted) - super().__init__() - - @property - def mod_log(self) -> ModLog: - """Get currently loaded ModLog cog instance.""" - return self.bot.get_cog("ModLog") - - @Cog.listener() - async def on_ready(self) -> None: - """Schedule expiration for previous infractions.""" - # Schedule expiration for previous infractions - infractions = await self.bot.api_client.get( - 'bot/infractions', params={'active': 'true'} - ) - for infraction in infractions: - if infraction["expires_at"] is not None: - self.schedule_task(self.bot.loop, infraction["id"], infraction) - - @Cog.listener() - async def on_member_join(self, member: Member) -> None: - """Reapply active mute infractions for returning members.""" - active_mutes = await self.bot.api_client.get( - 'bot/infractions', - params={'user__id': str(member.id), 'type': 'mute', 'active': 'true'} - ) - if not active_mutes: - return - - # assume a single mute because of restrictions elsewhere - mute = active_mutes[0] - - # transform expiration to delay in seconds - expiration_datetime = datetime.fromisoformat(mute["expires_at"][:-1]) - delay = expiration_datetime - datetime.utcnow() - delay_seconds = delay.total_seconds() - - # if under a minute or in the past - if delay_seconds < 60: - log.debug(f"Marking infraction {mute['id']} as inactive (expired).") - await self._deactivate_infraction(mute) - self.cancel_task(mute["id"]) - - # Notify the user that they've been unmuted. - await self.notify_pardon( - user=member, - title="You have been unmuted.", - content="You may now send messages in the server.", - icon_url=Icons.user_unmute - ) - return - - # allowing modlog since this is a passive action that should be logged - await member.add_roles(self._muted_role, reason=f"Re-applying active mute: {mute['id']}") - log.debug(f"User {member.id} has been re-muted on rejoin.") - - # region: Permanent infractions - - @with_role(*MODERATION_ROLES) - @command() - async def warn(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None: - """Create a warning infraction in the database for a user.""" - infraction = await post_infraction(ctx, user, type="warning", reason=reason) - if infraction is None: - return - - notified = await self.notify_infraction(user=user, infr_type="Warning", reason=reason) - - dm_result = ":incoming_envelope: " if notified else "" - action = f"{dm_result}:ok_hand: warned {user.mention}" - await ctx.send(f"{action}.") - - if notified: - dm_status = "Sent" - log_content = None - else: - dm_status = "**Failed**" - log_content = ctx.author.mention - - await self.mod_log.send_log_message( - icon_url=Icons.user_warn, - colour=Colour(Colours.soft_red), - title="Member warned", - thumbnail=user.avatar_url_as(static_format="png"), - text=textwrap.dedent(f""" - Member: {user.mention} (`{user.id}`) - Actor: {ctx.author} - DM: {dm_status} - Reason: {reason} - """), - content=log_content, - footer=f"ID {infraction['id']}" - ) - - @with_role(*MODERATION_ROLES) - @command() - async def kick(self, ctx: Context, user: Member, *, reason: str = None) -> None: - """Kicks a user with the provided reason.""" - if not await self.respect_role_hierarchy(ctx, user, 'kick'): - # Ensure ctx author has a higher top role than the target user - # Warning is sent to ctx by the helper method - return - - infraction = await post_infraction(ctx, user, type="kick", reason=reason) - if infraction is None: - return - - notified = await self.notify_infraction(user=user, infr_type="Kick", reason=reason) - - self.mod_log.ignore(Event.member_remove, user.id) - - try: - await user.kick(reason=reason) - action_result = True - except Forbidden: - action_result = False - - dm_result = ":incoming_envelope: " if notified else "" - action = f"{dm_result}:ok_hand: kicked {user.mention}" - await ctx.send(f"{action}.") - - dm_status = "Sent" if notified else "**Failed**" - title = "Member kicked" if action_result else "Member kicked (Failed)" - log_content = None if all((notified, action_result)) else ctx.author.mention - - await self.mod_log.send_log_message( - icon_url=Icons.sign_out, - colour=Colour(Colours.soft_red), - title=title, - thumbnail=user.avatar_url_as(static_format="png"), - text=textwrap.dedent(f""" - Member: {user.mention} (`{user.id}`) - Actor: {ctx.message.author} - DM: {dm_status} - Reason: {reason} - """), - content=log_content, - footer=f"ID {infraction['id']}" - ) - - @with_role(*MODERATION_ROLES) - @command() - async def ban(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None: - """Create a permanent ban infraction for a user with the provided reason.""" - if not await self.respect_role_hierarchy(ctx, user, 'ban'): - # Ensure ctx author has a higher top role than the target user - # Warning is sent to ctx by the helper method - return - - if await already_has_active_infraction(ctx=ctx, user=user, type="ban"): - return - - infraction = await post_infraction(ctx, user, type="ban", reason=reason) - if infraction is None: - return - - notified = await self.notify_infraction( - user=user, - infr_type="Ban", - reason=reason - ) - - self.mod_log.ignore(Event.member_ban, user.id) - self.mod_log.ignore(Event.member_remove, user.id) - - try: - await ctx.guild.ban(user, reason=reason, delete_message_days=0) - action_result = True - except Forbidden: - action_result = False - - dm_result = ":incoming_envelope: " if notified else "" - action = f"{dm_result}:ok_hand: permanently banned {user.mention}" - await ctx.send(f"{action}.") - - dm_status = "Sent" if notified else "**Failed**" - log_content = None if all((notified, action_result)) else ctx.author.mention - title = "Member permanently banned" - if not action_result: - title += " (Failed)" - - await self.mod_log.send_log_message( - icon_url=Icons.user_ban, - colour=Colour(Colours.soft_red), - title=title, - thumbnail=user.avatar_url_as(static_format="png"), - text=textwrap.dedent(f""" - Member: {user.mention} (`{user.id}`) - Actor: {ctx.message.author} - DM: {dm_status} - Reason: {reason} - """), - content=log_content, - footer=f"ID {infraction['id']}" - ) - - # endregion - # region: Temporary infractions - - @with_role(*MODERATION_ROLES) - @command(aliases=('mute',)) - async def tempmute(self, ctx: Context, user: Member, duration: Duration, *, reason: str = None) -> None: - """ - Create a temporary mute infraction for a user with the provided expiration and reason. - - Duration strings are parsed per: http://strftime.org/ - """ - expiration = duration - - if await already_has_active_infraction(ctx=ctx, user=user, type="mute"): - return - - infraction = await post_infraction(ctx, user, type="mute", reason=reason, expires_at=expiration) - if infraction is None: - return - - self.mod_log.ignore(Event.member_update, user.id) - await user.add_roles(self._muted_role, reason=reason) - - notified = await self.notify_infraction( - user=user, - infr_type="Mute", - expires_at=expiration, - reason=reason - ) - - infraction_expiration = format_infraction(infraction["expires_at"]) - - self.schedule_task(ctx.bot.loop, infraction["id"], infraction) - - dm_result = ":incoming_envelope: " if notified else "" - action = f"{dm_result}:ok_hand: muted {user.mention} until {infraction_expiration}" - await ctx.send(f"{action}.") - - if notified: - dm_status = "Sent" - log_content = None - else: - dm_status = "**Failed**" - log_content = ctx.author.mention - - await self.mod_log.send_log_message( - icon_url=Icons.user_mute, - colour=Colour(Colours.soft_red), - title="Member temporarily muted", - thumbnail=user.avatar_url_as(static_format="png"), - text=textwrap.dedent(f""" - Member: {user.mention} (`{user.id}`) - Actor: {ctx.message.author} - DM: {dm_status} - Reason: {reason} - Expires: {infraction_expiration} - """), - content=log_content, - footer=f"ID {infraction['id']}" - ) - - @with_role(*MODERATION_ROLES) - @command() - async def tempban(self, ctx: Context, user: UserTypes, duration: Duration, *, reason: str = None) -> None: - """ - Create a temporary ban infraction for a user with the provided expiration and reason. - - Duration strings are parsed per: http://strftime.org/ - """ - expiration = duration - - if not await self.respect_role_hierarchy(ctx, user, 'tempban'): - # Ensure ctx author has a higher top role than the target user - # Warning is sent to ctx by the helper method - return - - if await already_has_active_infraction(ctx=ctx, user=user, type="ban"): - return - - infraction = await post_infraction(ctx, user, type="ban", reason=reason, expires_at=expiration) - if infraction is None: - return - - notified = await self.notify_infraction( - user=user, - infr_type="Ban", - expires_at=expiration, - reason=reason - ) - - self.mod_log.ignore(Event.member_ban, user.id) - self.mod_log.ignore(Event.member_remove, user.id) - - try: - await ctx.guild.ban(user, reason=reason, delete_message_days=0) - action_result = True - except Forbidden: - action_result = False - - infraction_expiration = format_infraction(infraction["expires_at"]) - - self.schedule_task(ctx.bot.loop, infraction["id"], infraction) - - dm_result = ":incoming_envelope: " if notified else "" - action = f"{dm_result}:ok_hand: banned {user.mention} until {infraction_expiration}" - await ctx.send(f"{action}.") - - dm_status = "Sent" if notified else "**Failed**" - log_content = None if all((notified, action_result)) else ctx.author.mention - title = "Member temporarily banned" - if not action_result: - title += " (Failed)" - - await self.mod_log.send_log_message( - icon_url=Icons.user_ban, - colour=Colour(Colours.soft_red), - thumbnail=user.avatar_url_as(static_format="png"), - title=title, - text=textwrap.dedent(f""" - Member: {user.mention} (`{user.id}`) - Actor: {ctx.message.author} - DM: {dm_status} - Reason: {reason} - Expires: {infraction_expiration} - """), - content=log_content, - footer=f"ID {infraction['id']}" - ) - - # endregion - # region: Permanent shadow infractions - - @with_role(*MODERATION_ROLES) - @command(hidden=True) - async def note(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None: - """ - Create a private infraction note in the database for a user with the provided reason. - - This does not send the user a notification - """ - infraction = await post_infraction(ctx, user, type="note", reason=reason, hidden=True) - if infraction is None: - return - - await ctx.send(f":ok_hand: note added for {user.mention}.") - - await self.mod_log.send_log_message( - icon_url=Icons.user_warn, - colour=Colour(Colours.soft_red), - title="Member note added", - thumbnail=user.avatar_url_as(static_format="png"), - text=textwrap.dedent(f""" - Member: {user.mention} (`{user.id}`) - Actor: {ctx.message.author} - Reason: {reason} - """), - footer=f"ID {infraction['id']}" - ) - - @with_role(*MODERATION_ROLES) - @command(hidden=True, aliases=['shadowkick', 'skick']) - async def shadow_kick(self, ctx: Context, user: Member, *, reason: str = None) -> None: - """ - Kick a user for the provided reason. - - This does not send the user a notification. - """ - if not await self.respect_role_hierarchy(ctx, user, 'shadowkick'): - # Ensure ctx author has a higher top role than the target user - # Warning is sent to ctx by the helper method - return - - infraction = await post_infraction(ctx, user, type="kick", reason=reason, hidden=True) - if infraction is None: - return - - self.mod_log.ignore(Event.member_remove, user.id) - - try: - await user.kick(reason=reason) - action_result = True - except Forbidden: - action_result = False - - await ctx.send(f":ok_hand: kicked {user.mention}.") - - title = "Member shadow kicked" - if action_result: - log_content = None - else: - log_content = ctx.author.mention - title += " (Failed)" - - await self.mod_log.send_log_message( - icon_url=Icons.sign_out, - colour=Colour(Colours.soft_red), - title=title, - thumbnail=user.avatar_url_as(static_format="png"), - text=textwrap.dedent(f""" - Member: {user.mention} (`{user.id}`) - Actor: {ctx.message.author} - Reason: {reason} - """), - content=log_content, - footer=f"ID {infraction['id']}" - ) - - @with_role(*MODERATION_ROLES) - @command(hidden=True, aliases=['shadowban', 'sban']) - async def shadow_ban(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None: - """ - Create a permanent ban infraction for a user with the provided reason. - - This does not send the user a notification. - """ - if not await self.respect_role_hierarchy(ctx, user, 'shadowban'): - # Ensure ctx author has a higher top role than the target user - # Warning is sent to ctx by the helper method - return - - if await already_has_active_infraction(ctx=ctx, user=user, type="ban"): - return - - infraction = await post_infraction(ctx, user, type="ban", reason=reason, hidden=True) - if infraction is None: - return - - self.mod_log.ignore(Event.member_ban, user.id) - self.mod_log.ignore(Event.member_remove, user.id) - - try: - await ctx.guild.ban(user, reason=reason, delete_message_days=0) - action_result = True - except Forbidden: - action_result = False - - await ctx.send(f":ok_hand: permanently banned {user.mention}.") - - title = "Member permanently banned" - if action_result: - log_content = None - else: - log_content = ctx.author.mention - title += " (Failed)" - - await self.mod_log.send_log_message( - icon_url=Icons.user_ban, - colour=Colour(Colours.soft_red), - title=title, - thumbnail=user.avatar_url_as(static_format="png"), - text=textwrap.dedent(f""" - Member: {user.mention} (`{user.id}`) - Actor: {ctx.message.author} - Reason: {reason} - """), - content=log_content, - footer=f"ID {infraction['id']}" - ) - - # endregion - # region: Temporary shadow infractions - - @with_role(*MODERATION_ROLES) - @command(hidden=True, aliases=["shadowtempmute, stempmute", "shadowmute", "smute"]) - async def shadow_tempmute( - self, ctx: Context, user: Member, duration: Duration, *, reason: str = None - ) -> None: - """ - Create a temporary mute infraction for a user with the provided reason. - - Duration strings are parsed per: http://strftime.org/ - - This does not send the user a notification. - """ - expiration = duration - - if await already_has_active_infraction(ctx=ctx, user=user, type="mute"): - return - - infraction = await post_infraction(ctx, user, type="mute", reason=reason, expires_at=expiration, hidden=True) - if infraction is None: - return - - self.mod_log.ignore(Event.member_update, user.id) - await user.add_roles(self._muted_role, reason=reason) - - infraction_expiration = format_infraction(infraction["expires_at"]) - self.schedule_task(ctx.bot.loop, infraction["id"], infraction) - await ctx.send(f":ok_hand: muted {user.mention} until {infraction_expiration}.") - - await self.mod_log.send_log_message( - icon_url=Icons.user_mute, - colour=Colour(Colours.soft_red), - title="Member temporarily muted", - thumbnail=user.avatar_url_as(static_format="png"), - text=textwrap.dedent(f""" - Member: {user.mention} (`{user.id}`) - Actor: {ctx.message.author} - Reason: {reason} - Expires: {infraction_expiration} - """), - footer=f"ID {infraction['id']}" - ) - - @with_role(*MODERATION_ROLES) - @command(hidden=True, aliases=["shadowtempban, stempban"]) - async def shadow_tempban( - self, ctx: Context, user: UserTypes, duration: Duration, *, reason: str = None - ) -> None: - """ - Create a temporary ban infraction for a user with the provided reason. - - Duration strings are parsed per: http://strftime.org/ - - This does not send the user a notification. - """ - expiration = duration - - if not await self.respect_role_hierarchy(ctx, user, 'shadowtempban'): - # Ensure ctx author has a higher top role than the target user - # Warning is sent to ctx by the helper method - return - - if await already_has_active_infraction(ctx=ctx, user=user, type="ban"): - return - - infraction = await post_infraction(ctx, user, type="ban", reason=reason, expires_at=expiration, hidden=True) - if infraction is None: - return - - self.mod_log.ignore(Event.member_ban, user.id) - self.mod_log.ignore(Event.member_remove, user.id) - - try: - await ctx.guild.ban(user, reason=reason, delete_message_days=0) - action_result = True - except Forbidden: - action_result = False - - infraction_expiration = format_infraction(infraction["expires_at"]) - self.schedule_task(ctx.bot.loop, infraction["id"], infraction) - await ctx.send(f":ok_hand: banned {user.mention} until {infraction_expiration}.") - - title = "Member temporarily banned" - if action_result: - log_content = None - else: - log_content = ctx.author.mention - title += " (Failed)" - - # Send a log message to the mod log - await self.mod_log.send_log_message( - icon_url=Icons.user_ban, - colour=Colour(Colours.soft_red), - thumbnail=user.avatar_url_as(static_format="png"), - title=title, - text=textwrap.dedent(f""" - Member: {user.mention} (`{user.id}`) - Actor: {ctx.message.author} - Reason: {reason} - Expires: {infraction_expiration} - """), - content=log_content, - footer=f"ID {infraction['id']}" - ) - - # endregion - # region: Remove infractions (un- commands) - - @with_role(*MODERATION_ROLES) - @command() - async def unmute(self, ctx: Context, user: UserTypes) -> None: - """Deactivates the active mute infraction for a user.""" - try: - # check the current active infraction - response = await self.bot.api_client.get( - 'bot/infractions', - params={ - 'active': 'true', - 'type': 'mute', - 'user__id': user.id - } - ) - if len(response) > 1: - log.warning("Found more than one active mute infraction for user `%d`", user.id) - - if not response: - # no active infraction - await ctx.send( - f":x: There is no active mute infraction for user {user.mention}." - ) - return - - for infraction in response: - await self._deactivate_infraction(infraction) - if infraction["expires_at"] is not None: - self.cancel_expiration(infraction["id"]) - - notified = await self.notify_pardon( - user=user, - title="You have been unmuted.", - content="You may now send messages in the server.", - icon_url=Icons.user_unmute - ) - - if notified: - dm_status = "Sent" - dm_emoji = ":incoming_envelope: " - log_content = None - else: - dm_status = "**Failed**" - dm_emoji = "" - log_content = ctx.author.mention - - await ctx.send(f"{dm_emoji}:ok_hand: Un-muted {user.mention}.") - - embed_text = textwrap.dedent( - f""" - Member: {user.mention} (`{user.id}`) - Actor: {ctx.message.author} - DM: {dm_status} - """ - ) - - if len(response) > 1: - footer = f"Infraction IDs: {', '.join(str(infr['id']) for infr in response)}" - title = "Member unmuted" - embed_text += "Note: User had multiple **active** mute infractions in the database." - else: - infraction = response[0] - footer = f"Infraction ID: {infraction['id']}" - title = "Member unmuted" - - # Send a log message to the mod log - await self.mod_log.send_log_message( - icon_url=Icons.user_unmute, - colour=Colour(Colours.soft_green), - title=title, - thumbnail=user.avatar_url_as(static_format="png"), - text=embed_text, - footer=footer, - content=log_content - ) - except Exception: - log.exception("There was an error removing an infraction.") - await ctx.send(":x: There was an error removing the infraction.") - - @with_role(*MODERATION_ROLES) - @command() - async def unban(self, ctx: Context, user: UserTypes) -> None: - """Deactivates the active ban infraction for a user.""" - try: - # check the current active infraction - response = await self.bot.api_client.get( - 'bot/infractions', - params={ - 'active': 'true', - 'type': 'ban', - 'user__id': str(user.id) - } - ) - if len(response) > 1: - log.warning( - "More than one active ban infraction found for user `%d`.", - user.id - ) - - if not response: - # no active infraction - await ctx.send( - f":x: There is no active ban infraction for user {user.mention}." - ) - return - - for infraction in response: - await self._deactivate_infraction(infraction) - if infraction["expires_at"] is not None: - self.cancel_expiration(infraction["id"]) - - embed_text = textwrap.dedent( - f""" - Member: {user.mention} (`{user.id}`) - Actor: {ctx.message.author} - """ - ) - - if len(response) > 1: - footer = f"Infraction IDs: {', '.join(str(infr['id']) for infr in response)}" - embed_text += "Note: User had multiple **active** ban infractions in the database." - else: - infraction = response[0] - footer = f"Infraction ID: {infraction['id']}" - - await ctx.send(f":ok_hand: Un-banned {user.mention}.") - - # Send a log message to the mod log - await self.mod_log.send_log_message( - icon_url=Icons.user_unban, - colour=Colour(Colours.soft_green), - title="Member unbanned", - thumbnail=user.avatar_url_as(static_format="png"), - text=embed_text, - footer=footer, - ) - except Exception: - log.exception("There was an error removing an infraction.") - await ctx.send(":x: There was an error removing the infraction.") - - # endregion - # region: Edit infraction commands - - @with_role(*MODERATION_ROLES) - @group(name='infraction', aliases=('infr', 'infractions', 'inf'), invoke_without_command=True) - async def infraction_group(self, ctx: Context) -> None: - """Infraction manipulation commands.""" - await ctx.invoke(self.bot.get_command("help"), "infraction") - - @with_role(*MODERATION_ROLES) - @infraction_group.command(name='edit') - async def infraction_edit( - self, - ctx: Context, - infraction_id: int, - expires_at: Union[Duration, permanent_duration, None], - *, - reason: str = None - ) -> None: - """ - Edit the duration and/or the reason of an infraction. - - Durations are relative to the time of updating. - Use "permanent" to mark the infraction as permanent. - """ - if expires_at is None and reason is None: - # Unlike UserInputError, the error handler will show a specified message for BadArgument - raise BadArgument("Neither a new expiry nor a new reason was specified.") - - # Retrieve the previous infraction for its information. - old_infraction = await self.bot.api_client.get(f'bot/infractions/{infraction_id}') - - request_data = {} - confirm_messages = [] - log_text = "" - - if expires_at == "permanent": - request_data['expires_at'] = None - confirm_messages.append("marked as permanent") - elif expires_at is not None: - request_data['expires_at'] = expires_at.isoformat() - confirm_messages.append(f"set to expire on {expires_at.strftime(INFRACTION_FORMAT)}") - else: - confirm_messages.append("expiry unchanged") - - if reason: - request_data['reason'] = reason - confirm_messages.append("set a new reason") - log_text += f""" - Previous reason: {old_infraction['reason']} - New reason: {reason} - """.rstrip() - else: - confirm_messages.append("reason unchanged") - - # Update the infraction - new_infraction = await self.bot.api_client.patch( - f'bot/infractions/{infraction_id}', - json=request_data, - ) - - # Re-schedule infraction if the expiration has been updated - if 'expires_at' in request_data: - self.cancel_task(new_infraction['id']) - loop = asyncio.get_event_loop() - self.schedule_task(loop, new_infraction['id'], new_infraction) - - log_text += f""" - Previous expiry: {old_infraction['expires_at'] or "Permanent"} - New expiry: {new_infraction['expires_at'] or "Permanent"} - """.rstrip() - - await ctx.send(f":ok_hand: Updated infraction: {' & '.join(confirm_messages)}") - - # Get information about the infraction's user - user_id = new_infraction['user'] - user = ctx.guild.get_member(user_id) - - if user: - user_text = f"{user.mention} (`{user.id}`)" - thumbnail = user.avatar_url_as(static_format="png") - else: - user_text = f"`{user_id}`" - thumbnail = None - - # The infraction's actor - actor_id = new_infraction['actor'] - actor = ctx.guild.get_member(actor_id) or f"`{actor_id}`" - - await self.mod_log.send_log_message( - icon_url=Icons.pencil, - colour=Colour.blurple(), - title="Infraction edited", - thumbnail=thumbnail, - text=textwrap.dedent(f""" - Member: {user_text} - Actor: {actor} - Edited by: {ctx.message.author}{log_text} - """) - ) - - # endregion - # region: Search infractions - - @with_role(*MODERATION_ROLES) - @infraction_group.group(name="search", invoke_without_command=True) - async def infraction_search_group(self, ctx: Context, query: InfractionSearchQuery) -> None: - """Searches for infractions in the database.""" - if isinstance(query, User): - await ctx.invoke(self.search_user, query) - - else: - await ctx.invoke(self.search_reason, query) - - @with_role(*MODERATION_ROLES) - @infraction_search_group.command(name="user", aliases=("member", "id")) - async def search_user(self, ctx: Context, user: Union[User, proxy_user]) -> None: - """Search for infractions by member.""" - infraction_list = await self.bot.api_client.get( - 'bot/infractions', - params={'user__id': str(user.id)} - ) - embed = Embed( - title=f"Infractions for {user} ({len(infraction_list)} total)", - colour=Colour.orange() - ) - await self.send_infraction_list(ctx, embed, infraction_list) - - @with_role(*MODERATION_ROLES) - @infraction_search_group.command(name="reason", aliases=("match", "regex", "re")) - async def search_reason(self, ctx: Context, reason: str) -> None: - """Search for infractions by their reason. Use Re2 for matching.""" - infraction_list = await self.bot.api_client.get( - 'bot/infractions', params={'search': reason} - ) - embed = Embed( - title=f"Infractions matching `{reason}` ({len(infraction_list)} total)", - colour=Colour.orange() - ) - await self.send_infraction_list(ctx, embed, infraction_list) - - # endregion - # region: Utility functions - - async def send_infraction_list(self, ctx: Context, embed: Embed, infractions: list) -> None: - """Send a paginated embed of infractions for the specified user.""" - if not infractions: - await ctx.send(f":warning: No infractions could be found for that query.") - return - - lines = tuple( - self._infraction_to_string(infraction) - for infraction in infractions - ) - - await LinePaginator.paginate( - lines, - ctx=ctx, - embed=embed, - empty=True, - max_lines=3, - max_size=1000 - ) - - # endregion - # region: Utility functions - - def schedule_expiration( - self, loop: asyncio.AbstractEventLoop, infraction_object: Dict[str, Union[str, int, bool]] - ) -> None: - """Schedules a task to expire a temporary infraction.""" - infraction_id = infraction_object["id"] - if infraction_id in self.scheduled_tasks: - return - - task: asyncio.Task = create_task(loop, self._scheduled_expiration(infraction_object)) - - self.scheduled_tasks[infraction_id] = task - - def cancel_expiration(self, infraction_id: str) -> None: - """Un-schedules a task set to expire a temporary infraction.""" - task = self.scheduled_tasks.get(infraction_id) - if task is None: - log.warning(f"Failed to unschedule {infraction_id}: no task found.") - return - task.cancel() - log.debug(f"Unscheduled {infraction_id}.") - del self.scheduled_tasks[infraction_id] - - async def _scheduled_task(self, infraction_object: Dict[str, Union[str, int, bool]]) -> None: - """ - Marks an infraction expired after the delay from time of scheduling to time of expiration. - - At the time of expiration, the infraction is marked as inactive on the website, and the - expiration task is cancelled. The user is then notified via DM. - """ - infraction_id = infraction_object["id"] - - # transform expiration to delay in seconds - expiration_datetime = datetime.fromisoformat(infraction_object["expires_at"][:-1]) - await wait_until(expiration_datetime) - - log.debug(f"Marking infraction {infraction_id} as inactive (expired).") - await self._deactivate_infraction(infraction_object) - - self.cancel_task(infraction_object["id"]) - - # Notify the user that they've been unmuted. - user_id = infraction_object["user"] - guild = self.bot.get_guild(constants.Guild.id) - await self.notify_pardon( - user=guild.get_member(user_id), - title="You have been unmuted.", - content="You may now send messages in the server.", - icon_url=Icons.user_unmute - ) - - async def _deactivate_infraction(self, infraction_object: Dict[str, Union[str, int, bool]]) -> None: - """ - A co-routine which marks an infraction as inactive on the website. - - This co-routine does not cancel or un-schedule an expiration task. - """ - guild: Guild = self.bot.get_guild(constants.Guild.id) - user_id = infraction_object["user"] - infraction_type = infraction_object["type"] - - await self.bot.api_client.patch( - 'bot/infractions/' + str(infraction_object['id']), - json={"active": False} - ) - - if infraction_type == "mute": - member: Member = guild.get_member(user_id) - if member: - # remove the mute role - self.mod_log.ignore(Event.member_update, member.id) - await member.remove_roles(self._muted_role) - else: - log.warning(f"Failed to un-mute user: {user_id} (not found)") - elif infraction_type == "ban": - user: Object = Object(user_id) - try: - await guild.unban(user) - except NotFound: - log.info(f"Tried to unban user `{user_id}`, but Discord does not have an active ban registered.") - - def _infraction_to_string(self, infraction_object: Dict[str, Union[str, int, bool]]) -> str: - """Convert the infraction object to a string representation.""" - actor_id = infraction_object["actor"] - guild: Guild = self.bot.get_guild(constants.Guild.id) - actor = guild.get_member(actor_id) - active = infraction_object["active"] - user_id = infraction_object["user"] - hidden = infraction_object["hidden"] - created = format_infraction(infraction_object["inserted_at"]) - if infraction_object["expires_at"] is None: - expires = "*Permanent*" - else: - expires = format_infraction(infraction_object["expires_at"]) - - lines = textwrap.dedent(f""" - {"**===============**" if active else "==============="} - Status: {"__**Active**__" if active else "Inactive"} - User: {self.bot.get_user(user_id)} (`{user_id}`) - Type: **{infraction_object["type"]}** - Shadow: {hidden} - Reason: {infraction_object["reason"] or "*None*"} - Created: {created} - Expires: {expires} - Actor: {actor.mention if actor else actor_id} - ID: `{infraction_object["id"]}` - {"**===============**" if active else "==============="} - """) - - return lines.strip() - - async def notify_infraction( - self, - user: Union[User, Member], - infr_type: str, - expires_at: Union[datetime, str] = 'N/A', - reason: str = "No reason provided." - ) -> bool: - """ - Attempt to notify a user, via DM, of their fresh infraction. - - Returns a boolean indicator of whether the DM was successful. - """ - if isinstance(expires_at, datetime): - expires_at = expires_at.strftime(INFRACTION_FORMAT) - - embed = Embed( - description=textwrap.dedent(f""" - **Type:** {infr_type} - **Expires:** {expires_at} - **Reason:** {reason} - """), - colour=Colour(Colours.soft_red) - ) - - icon_url = INFRACTION_ICONS.get(infr_type, Icons.token_removed) - embed.set_author(name="Infraction Information", icon_url=icon_url, url=RULES_URL) - embed.title = f"Please review our rules over at {RULES_URL}" - embed.url = RULES_URL - - if infr_type in APPEALABLE_INFRACTIONS: - embed.set_footer(text="To appeal this infraction, send an e-mail to [email protected]") - - return await self.send_private_embed(user, embed) - - async def notify_pardon( - self, - user: Union[User, Member], - title: str, - content: str, - icon_url: str = Icons.user_verified - ) -> bool: - """ - Attempt to notify a user, via DM, of their expired infraction. - - Optionally returns a boolean indicator of whether the DM was successful. - """ - embed = Embed( - description=content, - colour=Colour(Colours.soft_green) - ) - - embed.set_author(name=title, icon_url=icon_url) - - return await self.send_private_embed(user, embed) - - async def send_private_embed(self, user: Union[User, Member], embed: Embed) -> bool: - """ - A helper method for sending an embed to a user's DMs. - - Returns a boolean indicator of DM success. - """ - # sometimes `user` is a `discord.Object`, so let's make it a proper user. - user = await self.bot.fetch_user(user.id) - - try: - await user.send(embed=embed) - return True - except (HTTPException, Forbidden): - log.debug( - f"Infraction-related information could not be sent to user {user} ({user.id}). " - "They've probably just disabled private messages." - ) - return False - - async def log_notify_failure(self, target: str, actor: Member, infraction_type: str) -> None: - """Send a mod log entry if an attempt to DM the target user has failed.""" - await self.mod_log.send_log_message( - icon_url=Icons.token_removed, - content=actor.mention, - colour=Colour(Colours.soft_red), - title="Notification Failed", - text=( - f"Direct message was unable to be sent.\nUser: {target.mention}\n" - f"Type: {infraction_type}" - ) - ) - - # endregion - - # This cannot be static (must have a __func__ attribute). - async def cog_command_error(self, ctx: Context, error: Exception) -> None: - """Send a notification to the invoking context on a Union failure.""" - if isinstance(error, BadUnionArgument): - if User in error.converters: - await ctx.send(str(error.errors[0])) - error.handled = True - - @staticmethod - async def respect_role_hierarchy(ctx: Context, target: UserTypes, infr_type: str) -> bool: - """ - Check if the highest role of the invoking member is greater than that of the target member. - - If this check fails, a warning is sent to the invoking ctx. - - Returns True always if target is not a discord.Member instance. - """ - if not isinstance(target, Member): - return True - - actor = ctx.author - target_is_lower = target.top_role < actor.top_role - if not target_is_lower: - log.info( - f"{actor} ({actor.id}) attempted to {infr_type} " - f"{target} ({target.id}), who has an equal or higher top role." - ) - await ctx.send( - f":x: {actor.mention}, you may not {infr_type} " - "someone with an equal or higher top role." - ) - - return target_is_lower - - -def setup(bot: Bot) -> None: - """Moderation cog load.""" - bot.add_cog(Moderation(bot)) - log.info("Cog loaded: Moderation") diff --git a/bot/cogs/moderation/__init__.py b/bot/cogs/moderation/__init__.py new file mode 100644 index 000000000..7383ed44e --- /dev/null +++ b/bot/cogs/moderation/__init__.py @@ -0,0 +1,25 @@ +import logging + +from discord.ext.commands import Bot + +from .infractions import Infractions +from .management import ModManagement +from .modlog import ModLog +from .superstarify import Superstarify + +log = logging.getLogger(__name__) + + +def setup(bot: Bot) -> None: + """Load the moderation extension (Infractions, ModManagement, ModLog, & Superstarify cogs).""" + bot.add_cog(Infractions(bot)) + log.info("Cog loaded: Infractions") + + bot.add_cog(ModLog(bot)) + log.info("Cog loaded: ModLog") + + bot.add_cog(ModManagement(bot)) + log.info("Cog loaded: ModManagement") + + bot.add_cog(Superstarify(bot)) + log.info("Cog loaded: Superstarify") diff --git a/bot/cogs/moderation/infractions.py b/bot/cogs/moderation/infractions.py new file mode 100644 index 000000000..592ead60f --- /dev/null +++ b/bot/cogs/moderation/infractions.py @@ -0,0 +1,607 @@ +import logging +import textwrap +import typing as t +from datetime import datetime + +import dateutil.parser +import discord +from discord import Member +from discord.ext import commands +from discord.ext.commands import Context, command + +from bot import constants +from bot.api import ResponseCodeError +from bot.constants import Colours, Event +from bot.decorators import respect_role_hierarchy +from bot.utils import time +from bot.utils.checks import with_role_check +from bot.utils.scheduling import Scheduler +from . import utils +from .modlog import ModLog +from .utils import MemberObject + +log = logging.getLogger(__name__) + +MemberConverter = t.Union[utils.UserTypes, utils.proxy_user] + + +class Infractions(Scheduler, commands.Cog): + """Apply and pardon infractions on users for moderation purposes.""" + + category = "Moderation" + category_description = "Server moderation tools." + + def __init__(self, bot: commands.Bot): + super().__init__() + + self.bot = bot + self.category = "Moderation" + self._muted_role = discord.Object(constants.Roles.muted) + + self.bot.loop.create_task(self.reschedule_infractions()) + + @property + def mod_log(self) -> ModLog: + """Get currently loaded ModLog cog instance.""" + return self.bot.get_cog("ModLog") + + async def reschedule_infractions(self) -> None: + """Schedule expiration for previous infractions.""" + await self.bot.wait_until_ready() + + infractions = await self.bot.api_client.get( + 'bot/infractions', + params={'active': 'true'} + ) + for infraction in infractions: + if infraction["expires_at"] is not None: + self.schedule_task(self.bot.loop, infraction["id"], infraction) + + @commands.Cog.listener() + async def on_member_join(self, member: Member) -> None: + """Reapply active mute infractions for returning members.""" + active_mutes = await self.bot.api_client.get( + 'bot/infractions', + params={ + 'user__id': str(member.id), + 'type': 'mute', + 'active': 'true' + } + ) + if not active_mutes: + return + + # Assume a single mute because of restrictions elsewhere. + mute = active_mutes[0] + + # Calculate the time remaining, in seconds, for the mute. + expiry = dateutil.parser.isoparse(mute["expires_at"]).replace(tzinfo=None) + delta = (expiry - datetime.utcnow()).total_seconds() + + # Mark as inactive if less than a minute remains. + if delta < 60: + await self.deactivate_infraction(mute) + return + + # Allowing mod log since this is a passive action that should be logged. + await member.add_roles(self._muted_role, reason=f"Re-applying active mute: {mute['id']}") + log.debug(f"User {member.id} has been re-muted on rejoin.") + + # region: Permanent infractions + + @command() + async def warn(self, ctx: Context, user: Member, *, reason: str = None) -> None: + """Warn a user for the given reason.""" + infraction = await utils.post_infraction(ctx, user, "warning", reason, active=False) + if infraction is None: + return + + await self.apply_infraction(ctx, infraction, user) + + @command() + async def kick(self, ctx: Context, user: Member, *, reason: str = None) -> None: + """Kick a user for the given reason.""" + await self.apply_kick(ctx, user, reason, active=False) + + @command() + async def ban(self, ctx: Context, user: MemberConverter, *, reason: str = None) -> None: + """Permanently ban a user for the given reason.""" + await self.apply_ban(ctx, user, reason) + + # endregion + # region: Temporary infractions + + @command(aliases=["mute"]) + async def tempmute(self, ctx: Context, user: Member, duration: utils.Expiry, *, reason: str = None) -> None: + """ + Temporarily mute a user for the given reason and duration. + + A unit of time should be appended to the duration. + Units (∗case-sensitive): + \u2003`y` - years + \u2003`m` - months∗ + \u2003`w` - weeks + \u2003`d` - days + \u2003`h` - hours + \u2003`M` - minutes∗ + \u2003`s` - seconds + + Alternatively, an ISO 8601 timestamp can be provided for the duration. + """ + await self.apply_mute(ctx, user, reason, expires_at=duration) + + @command() + async def tempban(self, ctx: Context, user: MemberConverter, duration: utils.Expiry, *, reason: str = None) -> None: + """ + Temporarily ban a user for the given reason and duration. + + A unit of time should be appended to the duration. + Units (∗case-sensitive): + \u2003`y` - years + \u2003`m` - months∗ + \u2003`w` - weeks + \u2003`d` - days + \u2003`h` - hours + \u2003`M` - minutes∗ + \u2003`s` - seconds + + Alternatively, an ISO 8601 timestamp can be provided for the duration. + """ + await self.apply_ban(ctx, user, reason, expires_at=duration) + + # endregion + # region: Permanent shadow infractions + + @command(hidden=True) + async def note(self, ctx: Context, user: MemberConverter, *, reason: str = None) -> None: + """Create a private note for a user with the given reason without notifying the user.""" + infraction = await utils.post_infraction(ctx, user, "note", reason, hidden=True, active=False) + if infraction is None: + return + + await self.apply_infraction(ctx, infraction, user) + + @command(hidden=True, aliases=['shadowkick', 'skick']) + async def shadow_kick(self, ctx: Context, user: Member, *, reason: str = None) -> None: + """Kick a user for the given reason without notifying the user.""" + await self.apply_kick(ctx, user, reason, hidden=True, active=False) + + @command(hidden=True, aliases=['shadowban', 'sban']) + async def shadow_ban(self, ctx: Context, user: MemberConverter, *, reason: str = None) -> None: + """Permanently ban a user for the given reason without notifying the user.""" + await self.apply_ban(ctx, user, reason, hidden=True) + + # endregion + # region: Temporary shadow infractions + + @command(hidden=True, aliases=["shadowtempmute, stempmute", "shadowmute", "smute"]) + async def shadow_tempmute(self, ctx: Context, user: Member, duration: utils.Expiry, *, reason: str = None) -> None: + """ + Temporarily mute a user for the given reason and duration without notifying the user. + + A unit of time should be appended to the duration. + Units (∗case-sensitive): + \u2003`y` - years + \u2003`m` - months∗ + \u2003`w` - weeks + \u2003`d` - days + \u2003`h` - hours + \u2003`M` - minutes∗ + \u2003`s` - seconds + + Alternatively, an ISO 8601 timestamp can be provided for the duration. + """ + await self.apply_mute(ctx, user, reason, expires_at=duration, hidden=True) + + @command(hidden=True, aliases=["shadowtempban, stempban"]) + async def shadow_tempban( + self, + ctx: Context, + user: MemberConverter, + duration: utils.Expiry, + *, + reason: str = None + ) -> None: + """ + Temporarily ban a user for the given reason and duration without notifying the user. + + A unit of time should be appended to the duration. + Units (∗case-sensitive): + \u2003`y` - years + \u2003`m` - months∗ + \u2003`w` - weeks + \u2003`d` - days + \u2003`h` - hours + \u2003`M` - minutes∗ + \u2003`s` - seconds + + Alternatively, an ISO 8601 timestamp can be provided for the duration. + """ + await self.apply_ban(ctx, user, reason, expires_at=duration, hidden=True) + + # endregion + # region: Remove infractions (un- commands) + + @command() + async def unmute(self, ctx: Context, user: MemberConverter) -> None: + """Prematurely end the active mute infraction for the user.""" + await self.pardon_infraction(ctx, "mute", user) + + @command() + async def unban(self, ctx: Context, user: MemberConverter) -> None: + """Prematurely end the active ban infraction for the user.""" + await self.pardon_infraction(ctx, "ban", user) + + # endregion + # region: Base infraction functions + + async def apply_mute(self, ctx: Context, user: Member, reason: str, **kwargs) -> None: + """Apply a mute infraction with kwargs passed to `post_infraction`.""" + if await utils.has_active_infraction(ctx, user, "mute"): + return + + infraction = await utils.post_infraction(ctx, user, "mute", reason, **kwargs) + if infraction is None: + return + + self.mod_log.ignore(Event.member_update, user.id) + + action = user.add_roles(self._muted_role, reason=reason) + await self.apply_infraction(ctx, infraction, user, action) + + @respect_role_hierarchy() + async def apply_kick(self, ctx: Context, user: Member, reason: str, **kwargs) -> None: + """Apply a kick infraction with kwargs passed to `post_infraction`.""" + infraction = await utils.post_infraction(ctx, user, "kick", reason, **kwargs) + if infraction is None: + return + + self.mod_log.ignore(Event.member_remove, user.id) + + action = user.kick(reason=reason) + await self.apply_infraction(ctx, infraction, user, action) + + @respect_role_hierarchy() + async def apply_ban(self, ctx: Context, user: MemberObject, reason: str, **kwargs) -> None: + """Apply a ban infraction with kwargs passed to `post_infraction`.""" + if await utils.has_active_infraction(ctx, user, "ban"): + return + + infraction = await utils.post_infraction(ctx, user, "ban", reason, **kwargs) + if infraction is None: + return + + self.mod_log.ignore(Event.member_remove, user.id) + + action = ctx.guild.ban(user, reason=reason, delete_message_days=0) + await self.apply_infraction(ctx, infraction, user, action) + + # endregion + # region: Utility functions + + async def _scheduled_task(self, infraction: utils.Infraction) -> None: + """ + Marks an infraction expired after the delay from time of scheduling to time of expiration. + + At the time of expiration, the infraction is marked as inactive on the website and the + expiration task is cancelled. + """ + _id = infraction["id"] + + expiry = dateutil.parser.isoparse(infraction["expires_at"]).replace(tzinfo=None) + await time.wait_until(expiry) + + log.debug(f"Marking infraction {_id} as inactive (expired).") + await self.deactivate_infraction(infraction) + + async def deactivate_infraction( + self, + infraction: utils.Infraction, + send_log: bool = True + ) -> t.Dict[str, str]: + """ + Deactivate an active infraction and return a dictionary of lines to send in a mod log. + + The infraction is removed from Discord, marked as inactive in the database, and has its + expiration task cancelled. If `send_log` is True, a mod log is sent for the + deactivation of the infraction. + + Supported infraction types are mute and ban. Other types will raise a ValueError. + """ + guild = self.bot.get_guild(constants.Guild.id) + mod_role = guild.get_role(constants.Roles.moderator) + user_id = infraction["user"] + _type = infraction["type"] + _id = infraction["id"] + reason = f"Infraction #{_id} expired or was pardoned." + + log.debug(f"Marking infraction #{_id} as inactive (expired).") + + log_content = None + log_text = { + "Member": str(user_id), + "Actor": str(self.bot.user), + "Reason": infraction["reason"] + } + + try: + if _type == "mute": + user = guild.get_member(user_id) + if user: + # Remove the muted role. + self.mod_log.ignore(Event.member_update, user.id) + await user.remove_roles(self._muted_role, reason=reason) + + # DM the user about the expiration. + notified = await utils.notify_pardon( + user=user, + title="You have been unmuted.", + content="You may now send messages in the server.", + icon_url=utils.INFRACTION_ICONS["mute"][1] + ) + + log_text["Member"] = f"{user.mention}(`{user.id}`)" + log_text["DM"] = "Sent" if notified else "**Failed**" + else: + log.info(f"Failed to unmute user {user_id}: user not found") + log_text["Failure"] = "User was not found in the guild." + elif _type == "ban": + user = discord.Object(user_id) + self.mod_log.ignore(Event.member_unban, user_id) + try: + await guild.unban(user, reason=reason) + except discord.NotFound: + log.info(f"Failed to unban user {user_id}: no active ban found on Discord") + log_text["Note"] = "No active ban found on Discord." + else: + raise ValueError( + f"Attempted to deactivate an unsupported infraction #{_id} ({_type})!" + ) + except discord.Forbidden: + log.warning(f"Failed to deactivate infraction #{_id} ({_type}): bot lacks permissions") + log_text["Failure"] = f"The bot lacks permissions to do this (role hierarchy?)" + log_content = mod_role.mention + except discord.HTTPException as e: + log.exception(f"Failed to deactivate infraction #{_id} ({_type})") + log_text["Failure"] = f"HTTPException with code {e.code}." + log_content = mod_role.mention + + # Check if the user is currently being watched by Big Brother. + try: + active_watch = await self.bot.api_client.get( + "bot/infractions", + params={ + "active": "true", + "type": "watch", + "user__id": user_id + } + ) + + log_text["Watching"] = "Yes" if active_watch else "No" + except ResponseCodeError: + log.exception(f"Failed to fetch watch status for user {user_id}") + log_text["Watching"] = "Unknown - failed to fetch watch status." + + try: + # Mark infraction as inactive in the database. + await self.bot.api_client.patch( + f"bot/infractions/{_id}", + json={"active": False} + ) + except ResponseCodeError as e: + log.exception(f"Failed to deactivate infraction #{_id} ({_type})") + log_line = f"API request failed with code {e.status}." + log_content = mod_role.mention + + # Append to an existing failure message if possible + if "Failure" in log_text: + log_text["Failure"] += f" {log_line}" + else: + log_text["Failure"] = log_line + + # Cancel the expiration task. + if infraction["expires_at"] is not None: + self.cancel_task(infraction["id"]) + + # Send a log message to the mod log. + if send_log: + log_title = f"expiration failed" if "Failure" in log_text else "expired" + + await self.mod_log.send_log_message( + icon_url=utils.INFRACTION_ICONS[_type][1], + colour=Colours.soft_green, + title=f"Infraction {log_title}: {_type}", + text="\n".join(f"{k}: {v}" for k, v in log_text.items()), + footer=f"ID: {_id}", + content=log_content, + ) + + return log_text + + async def apply_infraction( + self, + ctx: Context, + infraction: utils.Infraction, + user: MemberObject, + action_coro: t.Optional[t.Awaitable] = None + ) -> None: + """Apply an infraction to the user, log the infraction, and optionally notify the user.""" + infr_type = infraction["type"] + icon = utils.INFRACTION_ICONS[infr_type][0] + reason = infraction["reason"] + expiry = infraction["expires_at"] + + if expiry: + expiry = time.format_infraction(expiry) + + # Default values for the confirmation message and mod log. + confirm_msg = f":ok_hand: applied" + expiry_msg = f" until {expiry}" if expiry else " permanently" + dm_result = "" + dm_log_text = "" + expiry_log_text = f"Expires: {expiry}" if expiry else "" + log_title = "applied" + log_content = None + + # DM the user about the infraction if it's not a shadow/hidden infraction. + if not infraction["hidden"]: + # Sometimes user is a discord.Object; make it a proper user. + await self.bot.fetch_user(user.id) + + # Accordingly display whether the user was successfully notified via DM. + if await utils.notify_infraction(user, infr_type, expiry, reason, icon): + dm_result = ":incoming_envelope: " + dm_log_text = "\nDM: Sent" + else: + dm_log_text = "\nDM: **Failed**" + log_content = ctx.author.mention + + if infraction["actor"] == self.bot.user.id: + end_msg = f" (reason: {infraction['reason']})" + else: + infractions = await self.bot.api_client.get( + "bot/infractions", + params={"user__id": str(user.id)} + ) + end_msg = f" ({len(infractions)} infractions total)" + + # Execute the necessary actions to apply the infraction on Discord. + if action_coro: + try: + await action_coro + if expiry: + # Schedule the expiration of the infraction. + self.schedule_task(ctx.bot.loop, infraction["id"], infraction) + except discord.Forbidden: + # Accordingly display that applying the infraction failed. + confirm_msg = f":x: failed to apply" + expiry_msg = "" + log_content = ctx.author.mention + log_title = "failed to apply" + + # Send a confirmation message to the invoking context. + await ctx.send( + f"{dm_result}{confirm_msg} **{infr_type}** to {user.mention}{expiry_msg}{end_msg}." + ) + + # Send a log message to the mod log. + await self.mod_log.send_log_message( + icon_url=icon, + colour=Colours.soft_red, + title=f"Infraction {log_title}: {infr_type}", + thumbnail=user.avatar_url_as(static_format="png"), + text=textwrap.dedent(f""" + Member: {user.mention} (`{user.id}`) + Actor: {ctx.message.author}{dm_log_text} + Reason: {reason} + {expiry_log_text} + """), + content=log_content, + footer=f"ID {infraction['id']}" + ) + + async def pardon_infraction(self, ctx: Context, infr_type: str, user: MemberObject) -> None: + """Prematurely end an infraction for a user and log the action in the mod log.""" + # Check the current active infraction + response = await self.bot.api_client.get( + 'bot/infractions', + params={ + 'active': 'true', + 'type': infr_type, + 'user__id': user.id + } + ) + + if not response: + await ctx.send(f":x: There's no active {infr_type} infraction for user {user.mention}.") + return + + # Deactivate the infraction and cancel its scheduled expiration task. + log_text = await self.deactivate_infraction(response[0], send_log=False) + + log_text["Member"] = f"{user.mention}(`{user.id}`)" + log_text["Actor"] = str(ctx.message.author) + log_content = None + footer = f"ID: {response[0]['id']}" + + # If multiple active infractions were found, mark them as inactive in the database + # and cancel their expiration tasks. + if len(response) > 1: + log.warning(f"Found more than one active {infr_type} infraction for user {user.id}") + + footer = f"Infraction IDs: {', '.join(str(infr['id']) for infr in response)}" + + log_note = f"Found multiple **active** {infr_type} infractions in the database." + if "Note" in log_text: + log_text["Note"] = f" {log_note}" + else: + log_text["Note"] = log_note + + # deactivate_infraction() is not called again because: + # 1. Discord cannot store multiple active bans or assign multiples of the same role + # 2. It would send a pardon DM for each active infraction, which is redundant + for infraction in response[1:]: + _id = infraction['id'] + try: + # Mark infraction as inactive in the database. + await self.bot.api_client.patch( + f"bot/infractions/{_id}", + json={"active": False} + ) + except ResponseCodeError: + log.exception(f"Failed to deactivate infraction #{_id} ({infr_type})") + # This is simpler and cleaner than trying to concatenate all the errors. + log_text["Failure"] = "See bot's logs for details." + + # Cancel pending expiration task. + if infraction["expires_at"] is not None: + self.cancel_task(infraction["id"]) + + # Accordingly display whether the user was successfully notified via DM. + dm_emoji = "" + if log_text.get("DM") == "Sent": + dm_emoji = ":incoming_envelope: " + elif "DM" in log_text: + # Mention the actor because the DM failed to send. + log_content = ctx.author.mention + + # Accordingly display whether the pardon failed. + if "Failure" in log_text: + confirm_msg = ":x: failed to pardon" + log_title = "pardon failed" + log_content = ctx.author.mention + else: + confirm_msg = f":ok_hand: pardoned" + log_title = "pardoned" + + # Send a confirmation message to the invoking context. + await ctx.send( + f"{dm_emoji}{confirm_msg} infraction **{infr_type}** for {user.mention}. " + f"{log_text.get('Failure', '')}" + ) + + # Send a log message to the mod log. + await self.mod_log.send_log_message( + icon_url=utils.INFRACTION_ICONS[infr_type][1], + colour=Colours.soft_green, + title=f"Infraction {log_title}: {infr_type}", + thumbnail=user.avatar_url_as(static_format="png"), + text="\n".join(f"{k}: {v}" for k, v in log_text.items()), + footer=footer, + content=log_content, + ) + + # endregion + + # This cannot be static (must have a __func__ attribute). + def cog_check(self, ctx: Context) -> bool: + """Only allow moderators to invoke the commands in this cog.""" + return with_role_check(ctx, *constants.MODERATION_ROLES) + + # This cannot be static (must have a __func__ attribute). + async def cog_command_error(self, ctx: Context, error: Exception) -> None: + """Send a notification to the invoking context on a Union failure.""" + if isinstance(error, commands.BadUnionArgument): + if discord.User in error.converters: + await ctx.send(str(error.errors[0])) + error.handled = True diff --git a/bot/cogs/moderation/management.py b/bot/cogs/moderation/management.py new file mode 100644 index 000000000..491f6d400 --- /dev/null +++ b/bot/cogs/moderation/management.py @@ -0,0 +1,268 @@ +import asyncio +import logging +import textwrap +import typing as t + +import discord +from discord.ext import commands +from discord.ext.commands import Context + +from bot import constants +from bot.converters import InfractionSearchQuery +from bot.pagination import LinePaginator +from bot.utils import time +from bot.utils.checks import with_role_check +from . import utils +from .infractions import Infractions +from .modlog import ModLog + +log = logging.getLogger(__name__) + +UserConverter = t.Union[discord.User, utils.proxy_user] + + +def permanent_duration(expires_at: str) -> str: + """Only allow an expiration to be 'permanent' if it is a string.""" + expires_at = expires_at.lower() + if expires_at != "permanent": + raise commands.BadArgument + else: + return expires_at + + +class ModManagement(commands.Cog): + """Management of infractions.""" + + category = "Moderation" + + def __init__(self, bot: commands.Bot): + self.bot = bot + + @property + def mod_log(self) -> ModLog: + """Get currently loaded ModLog cog instance.""" + return self.bot.get_cog("ModLog") + + @property + def infractions_cog(self) -> Infractions: + """Get currently loaded Infractions cog instance.""" + return self.bot.get_cog("Infractions") + + # region: Edit infraction commands + + @commands.group(name='infraction', aliases=('infr', 'infractions', 'inf'), invoke_without_command=True) + async def infraction_group(self, ctx: Context) -> None: + """Infraction manipulation commands.""" + await ctx.invoke(self.bot.get_command("help"), "infraction") + + @infraction_group.command(name='edit') + async def infraction_edit( + self, + ctx: Context, + infraction_id: int, + duration: t.Union[utils.Expiry, permanent_duration, None], + *, + reason: str = None + ) -> None: + """ + Edit the duration and/or the reason of an infraction. + + Durations are relative to the time of updating and should be appended with a unit of time. + Units (∗case-sensitive): + \u2003`y` - years + \u2003`m` - months∗ + \u2003`w` - weeks + \u2003`d` - days + \u2003`h` - hours + \u2003`M` - minutes∗ + \u2003`s` - seconds + + Use "permanent" to mark the infraction as permanent. Alternatively, an ISO 8601 timestamp + can be provided for the duration. + """ + if duration is None and reason is None: + # Unlike UserInputError, the error handler will show a specified message for BadArgument + raise commands.BadArgument("Neither a new expiry nor a new reason was specified.") + + # Retrieve the previous infraction for its information. + old_infraction = await self.bot.api_client.get(f'bot/infractions/{infraction_id}') + + request_data = {} + confirm_messages = [] + log_text = "" + + if duration == "permanent": + request_data['expires_at'] = None + confirm_messages.append("marked as permanent") + elif duration is not None: + request_data['expires_at'] = duration.isoformat() + expiry = duration.strftime(time.INFRACTION_FORMAT) + confirm_messages.append(f"set to expire on {expiry}") + else: + confirm_messages.append("expiry unchanged") + + if reason: + request_data['reason'] = reason + confirm_messages.append("set a new reason") + log_text += f""" + Previous reason: {old_infraction['reason']} + New reason: {reason} + """.rstrip() + else: + confirm_messages.append("reason unchanged") + + # Update the infraction + new_infraction = await self.bot.api_client.patch( + f'bot/infractions/{infraction_id}', + json=request_data, + ) + + # Re-schedule infraction if the expiration has been updated + if 'expires_at' in request_data: + self.infractions_cog.cancel_task(new_infraction['id']) + loop = asyncio.get_event_loop() + self.infractions_cog.schedule_task(loop, new_infraction['id'], new_infraction) + + log_text += f""" + Previous expiry: {old_infraction['expires_at'] or "Permanent"} + New expiry: {new_infraction['expires_at'] or "Permanent"} + """.rstrip() + + await ctx.send(f":ok_hand: Updated infraction: {' & '.join(confirm_messages)}") + + # Get information about the infraction's user + user_id = new_infraction['user'] + user = ctx.guild.get_member(user_id) + + if user: + user_text = f"{user.mention} (`{user.id}`)" + thumbnail = user.avatar_url_as(static_format="png") + else: + user_text = f"`{user_id}`" + thumbnail = None + + # The infraction's actor + actor_id = new_infraction['actor'] + actor = ctx.guild.get_member(actor_id) or f"`{actor_id}`" + + await self.mod_log.send_log_message( + icon_url=constants.Icons.pencil, + colour=discord.Colour.blurple(), + title="Infraction edited", + thumbnail=thumbnail, + text=textwrap.dedent(f""" + Member: {user_text} + Actor: {actor} + Edited by: {ctx.message.author}{log_text} + """) + ) + + # endregion + # region: Search infractions + + @infraction_group.group(name="search", invoke_without_command=True) + async def infraction_search_group(self, ctx: Context, query: InfractionSearchQuery) -> None: + """Searches for infractions in the database.""" + if isinstance(query, discord.User): + await ctx.invoke(self.search_user, query) + else: + await ctx.invoke(self.search_reason, query) + + @infraction_search_group.command(name="user", aliases=("member", "id")) + async def search_user(self, ctx: Context, user: UserConverter) -> None: + """Search for infractions by member.""" + infraction_list = await self.bot.api_client.get( + 'bot/infractions', + params={'user__id': str(user.id)} + ) + embed = discord.Embed( + title=f"Infractions for {user} ({len(infraction_list)} total)", + colour=discord.Colour.orange() + ) + await self.send_infraction_list(ctx, embed, infraction_list) + + @infraction_search_group.command(name="reason", aliases=("match", "regex", "re")) + async def search_reason(self, ctx: Context, reason: str) -> None: + """Search for infractions by their reason. Use Re2 for matching.""" + infraction_list = await self.bot.api_client.get( + 'bot/infractions', + params={'search': reason} + ) + embed = discord.Embed( + title=f"Infractions matching `{reason}` ({len(infraction_list)} total)", + colour=discord.Colour.orange() + ) + await self.send_infraction_list(ctx, embed, infraction_list) + + # endregion + # region: Utility functions + + async def send_infraction_list( + self, + ctx: Context, + embed: discord.Embed, + infractions: t.Iterable[utils.Infraction] + ) -> None: + """Send a paginated embed of infractions for the specified user.""" + if not infractions: + await ctx.send(f":warning: No infractions could be found for that query.") + return + + lines = tuple( + self.infraction_to_string(infraction) + for infraction in infractions + ) + + await LinePaginator.paginate( + lines, + ctx=ctx, + embed=embed, + empty=True, + max_lines=3, + max_size=1000 + ) + + def infraction_to_string(self, infraction: utils.Infraction) -> str: + """Convert the infraction object to a string representation.""" + actor_id = infraction["actor"] + guild = self.bot.get_guild(constants.Guild.id) + actor = guild.get_member(actor_id) + active = infraction["active"] + user_id = infraction["user"] + hidden = infraction["hidden"] + created = time.format_infraction(infraction["inserted_at"]) + if infraction["expires_at"] is None: + expires = "*Permanent*" + else: + expires = time.format_infraction(infraction["expires_at"]) + + lines = textwrap.dedent(f""" + {"**===============**" if active else "==============="} + Status: {"__**Active**__" if active else "Inactive"} + User: {self.bot.get_user(user_id)} (`{user_id}`) + Type: **{infraction["type"]}** + Shadow: {hidden} + Reason: {infraction["reason"] or "*None*"} + Created: {created} + Expires: {expires} + Actor: {actor.mention if actor else actor_id} + ID: `{infraction["id"]}` + {"**===============**" if active else "==============="} + """) + + return lines.strip() + + # endregion + + # This cannot be static (must have a __func__ attribute). + def cog_check(self, ctx: Context) -> bool: + """Only allow moderators to invoke the commands in this cog.""" + return with_role_check(ctx, *constants.MODERATION_ROLES) + + # This cannot be static (must have a __func__ attribute). + async def cog_command_error(self, ctx: Context, error: Exception) -> None: + """Send a notification to the invoking context on a Union failure.""" + if isinstance(error, commands.BadUnionArgument): + if discord.User in error.converters: + await ctx.send(str(error.errors[0])) + error.handled = True diff --git a/bot/cogs/modlog.py b/bot/cogs/moderation/modlog.py index 68424d268..118503517 100644 --- a/bot/cogs/modlog.py +++ b/bot/cogs/moderation/modlog.py @@ -1,30 +1,26 @@ import asyncio import logging +import typing as t from datetime import datetime -from typing import List, Optional, Union +import discord from dateutil.relativedelta import relativedelta from deepdiff import DeepDiff -from discord import ( - CategoryChannel, Colour, Embed, File, Guild, - Member, Message, NotFound, RawMessageDeleteEvent, - RawMessageUpdateEvent, Role, TextChannel, User, VoiceChannel -) +from discord import Colour from discord.abc import GuildChannel from discord.ext.commands import Bot, Cog, Context -from bot.constants import ( - Channels, Colours, Emojis, Event, Guild as GuildConstant, Icons, URLs -) +from bot.constants import Channels, Colours, Emojis, Event, Guild as GuildConstant, Icons, URLs from bot.utils.time import humanize_delta +from .utils import UserTypes log = logging.getLogger(__name__) -GUILD_CHANNEL = Union[CategoryChannel, TextChannel, VoiceChannel] +GUILD_CHANNEL = t.Union[discord.CategoryChannel, discord.TextChannel, discord.VoiceChannel] CHANNEL_CHANGES_UNSUPPORTED = ("permissions",) CHANNEL_CHANGES_SUPPRESSED = ("_overwrites", "position") -MEMBER_CHANGES_SUPPRESSED = ("status", "activities", "_client_status") +MEMBER_CHANGES_SUPPRESSED = ("status", "activities", "_client_status", "nick") ROLE_CHANGES_UNSUPPORTED = ("colour", "permissions") @@ -38,7 +34,7 @@ class ModLog(Cog, name="ModLog"): self._cached_deletes = [] self._cached_edits = [] - async def upload_log(self, messages: List[Message], actor_id: int) -> str: + async def upload_log(self, messages: t.List[discord.Message], actor_id: int) -> str: """ Uploads the log data to the database via an API endpoint for uploading logs. @@ -73,23 +69,23 @@ class ModLog(Cog, name="ModLog"): self._ignored[event].append(item) async def send_log_message( - self, - icon_url: Optional[str], - colour: Colour, - title: Optional[str], - text: str, - thumbnail: Optional[str] = None, - channel_id: int = Channels.modlog, - ping_everyone: bool = False, - files: Optional[List[File]] = None, - content: Optional[str] = None, - additional_embeds: Optional[List[Embed]] = None, - additional_embeds_msg: Optional[str] = None, - timestamp_override: Optional[datetime] = None, - footer: Optional[str] = None, + self, + icon_url: t.Optional[str], + colour: t.Union[discord.Colour, int], + title: t.Optional[str], + text: str, + thumbnail: t.Optional[t.Union[str, discord.Asset]] = None, + channel_id: int = Channels.modlog, + ping_everyone: bool = False, + files: t.Optional[t.List[discord.File]] = None, + content: t.Optional[str] = None, + additional_embeds: t.Optional[t.List[discord.Embed]] = None, + additional_embeds_msg: t.Optional[str] = None, + timestamp_override: t.Optional[datetime] = None, + footer: t.Optional[str] = None, ) -> Context: """Generate log embed and send to logging channel.""" - embed = Embed(description=text) + embed = discord.Embed(description=text) if title and icon_url: embed.set_author(name=title, icon_url=icon_url) @@ -126,10 +122,10 @@ class ModLog(Cog, name="ModLog"): if channel.guild.id != GuildConstant.id: return - if isinstance(channel, CategoryChannel): + if isinstance(channel, discord.CategoryChannel): title = "Category created" message = f"{channel.name} (`{channel.id}`)" - elif isinstance(channel, VoiceChannel): + elif isinstance(channel, discord.VoiceChannel): title = "Voice channel created" if channel.category: @@ -144,7 +140,7 @@ class ModLog(Cog, name="ModLog"): else: message = f"{channel.name} (`{channel.id}`)" - await self.send_log_message(Icons.hash_green, Colour(Colours.soft_green), title, message) + await self.send_log_message(Icons.hash_green, Colours.soft_green, title, message) @Cog.listener() async def on_guild_channel_delete(self, channel: GUILD_CHANNEL) -> None: @@ -152,20 +148,20 @@ class ModLog(Cog, name="ModLog"): if channel.guild.id != GuildConstant.id: return - if isinstance(channel, CategoryChannel): + if isinstance(channel, discord.CategoryChannel): title = "Category deleted" - elif isinstance(channel, VoiceChannel): + elif isinstance(channel, discord.VoiceChannel): title = "Voice channel deleted" else: title = "Text channel deleted" - if channel.category and not isinstance(channel, CategoryChannel): + if channel.category and not isinstance(channel, discord.CategoryChannel): message = f"{channel.category}/{channel.name} (`{channel.id}`)" else: message = f"{channel.name} (`{channel.id}`)" await self.send_log_message( - Icons.hash_red, Colour(Colours.soft_red), + Icons.hash_red, Colours.soft_red, title, message ) @@ -230,29 +226,29 @@ class ModLog(Cog, name="ModLog"): ) @Cog.listener() - async def on_guild_role_create(self, role: Role) -> None: + async def on_guild_role_create(self, role: discord.Role) -> None: """Log role create event to mod log.""" if role.guild.id != GuildConstant.id: return await self.send_log_message( - Icons.crown_green, Colour(Colours.soft_green), + Icons.crown_green, Colours.soft_green, "Role created", f"`{role.id}`" ) @Cog.listener() - async def on_guild_role_delete(self, role: Role) -> None: + async def on_guild_role_delete(self, role: discord.Role) -> None: """Log role delete event to mod log.""" if role.guild.id != GuildConstant.id: return await self.send_log_message( - Icons.crown_red, Colour(Colours.soft_red), + Icons.crown_red, Colours.soft_red, "Role removed", f"{role.name} (`{role.id}`)" ) @Cog.listener() - async def on_guild_role_update(self, before: Role, after: Role) -> None: + async def on_guild_role_update(self, before: discord.Role, after: discord.Role) -> None: """Log role update event to mod log.""" if before.guild.id != GuildConstant.id: return @@ -305,7 +301,7 @@ class ModLog(Cog, name="ModLog"): ) @Cog.listener() - async def on_guild_update(self, before: Guild, after: Guild) -> None: + async def on_guild_update(self, before: discord.Guild, after: discord.Guild) -> None: """Log guild update event to mod log.""" if before.id != GuildConstant.id: return @@ -356,8 +352,8 @@ class ModLog(Cog, name="ModLog"): ) @Cog.listener() - async def on_member_ban(self, guild: Guild, member: Union[Member, User]) -> None: - """Log ban event to mod log.""" + async def on_member_ban(self, guild: discord.Guild, member: UserTypes) -> None: + """Log ban event to user log.""" if guild.id != GuildConstant.id: return @@ -366,14 +362,14 @@ class ModLog(Cog, name="ModLog"): return await self.send_log_message( - Icons.user_ban, Colour(Colours.soft_red), + Icons.user_ban, Colours.soft_red, "User banned", f"{member.name}#{member.discriminator} (`{member.id}`)", thumbnail=member.avatar_url_as(static_format="png"), - channel_id=Channels.modlog + channel_id=Channels.userlog ) @Cog.listener() - async def on_member_join(self, member: Member) -> None: + async def on_member_join(self, member: discord.Member) -> None: """Log member join event to user log.""" if member.guild.id != GuildConstant.id: return @@ -388,14 +384,14 @@ class ModLog(Cog, name="ModLog"): message = f"{Emojis.new} {message}" await self.send_log_message( - Icons.sign_in, Colour(Colours.soft_green), + Icons.sign_in, Colours.soft_green, "User joined", message, thumbnail=member.avatar_url_as(static_format="png"), channel_id=Channels.userlog ) @Cog.listener() - async def on_member_remove(self, member: Member) -> None: + async def on_member_remove(self, member: discord.Member) -> None: """Log member leave event to user log.""" if member.guild.id != GuildConstant.id: return @@ -405,14 +401,14 @@ class ModLog(Cog, name="ModLog"): return await self.send_log_message( - Icons.sign_out, Colour(Colours.soft_red), + Icons.sign_out, Colours.soft_red, "User left", f"{member.name}#{member.discriminator} (`{member.id}`)", thumbnail=member.avatar_url_as(static_format="png"), channel_id=Channels.userlog ) @Cog.listener() - async def on_member_unban(self, guild: Guild, member: User) -> None: + async def on_member_unban(self, guild: discord.Guild, member: discord.User) -> None: """Log member unban event to mod log.""" if guild.id != GuildConstant.id: return @@ -429,7 +425,7 @@ class ModLog(Cog, name="ModLog"): ) @Cog.listener() - async def on_member_update(self, before: Member, after: Member) -> None: + async def on_member_update(self, before: discord.Member, after: discord.Member) -> None: """Log member update event to user log.""" if before.guild.id != GuildConstant.id: return @@ -502,6 +498,11 @@ class ModLog(Cog, name="ModLog"): f"**Discriminator:** `{before.discriminator}` **->** `{after.discriminator}`" ) + if before.display_name != after.display_name: + changes.append( + f"**Display name:** `{before.display_name}` **->** `{after.display_name}`" + ) + if not changes: return @@ -520,7 +521,7 @@ class ModLog(Cog, name="ModLog"): ) @Cog.listener() - async def on_message_delete(self, message: Message) -> None: + async def on_message_delete(self, message: discord.Message) -> None: """Log message delete event to message change log.""" channel = message.channel author = message.author @@ -576,7 +577,7 @@ class ModLog(Cog, name="ModLog"): ) @Cog.listener() - async def on_raw_message_delete(self, event: RawMessageDeleteEvent) -> None: + async def on_raw_message_delete(self, event: discord.RawMessageDeleteEvent) -> None: """Log raw message delete event to message change log.""" if event.guild_id != GuildConstant.id or event.channel_id in GuildConstant.ignored: return @@ -610,14 +611,14 @@ class ModLog(Cog, name="ModLog"): ) await self.send_log_message( - Icons.message_delete, Colour(Colours.soft_red), + Icons.message_delete, Colours.soft_red, "Message deleted", response, channel_id=Channels.message_log ) @Cog.listener() - async def on_message_edit(self, before: Message, after: Message) -> None: + async def on_message_edit(self, before: discord.Message, after: discord.Message) -> None: """Log message edit event to message change log.""" if ( not before.guild @@ -692,12 +693,12 @@ class ModLog(Cog, name="ModLog"): ) @Cog.listener() - async def on_raw_message_edit(self, event: RawMessageUpdateEvent) -> None: + async def on_raw_message_edit(self, event: discord.RawMessageUpdateEvent) -> None: """Log raw message edit event to message change log.""" try: channel = self.bot.get_channel(int(event.data["channel_id"])) message = await channel.fetch_message(event.message_id) - except NotFound: # Was deleted before we got the event + except discord.NotFound: # Was deleted before we got the event return if ( @@ -760,9 +761,3 @@ class ModLog(Cog, name="ModLog"): Icons.message_edit, Colour.blurple(), "Message edited (After)", after_response, channel_id=Channels.message_log ) - - -def setup(bot: Bot) -> None: - """Mod log cog load.""" - bot.add_cog(ModLog(bot)) - log.info("Cog loaded: ModLog") diff --git a/bot/cogs/superstarify/__init__.py b/bot/cogs/moderation/superstarify.py index 87021eded..ccc6395d9 100644 --- a/bot/cogs/superstarify/__init__.py +++ b/bot/cogs/moderation/superstarify.py @@ -1,21 +1,23 @@ +import json import logging import random +from pathlib import Path from discord import Colour, Embed, Member from discord.errors import Forbidden from discord.ext.commands import Bot, Cog, Context, command -from bot.cogs.moderation import Moderation -from bot.cogs.modlog import ModLog -from bot.cogs.superstarify.stars import get_nick -from bot.constants import Icons, MODERATION_ROLES, POSITIVE_REPLIES -from bot.converters import Duration -from bot.decorators import with_role -from bot.utils.moderation import post_infraction +from bot import constants +from bot.utils.checks import with_role_check from bot.utils.time import format_infraction +from . import utils +from .modlog import ModLog log = logging.getLogger(__name__) -NICKNAME_POLICY_URL = "https://pythondiscord.com/pages/rules/#wiki-toc-nickname-policy" +NICKNAME_POLICY_URL = "https://pythondiscord.com/pages/rules/#nickname-policy" + +with Path("bot/resources/stars.json").open(encoding="utf-8") as stars_file: + STAR_NAMES = json.load(stars_file) class Superstarify(Cog): @@ -25,11 +27,6 @@ class Superstarify(Cog): self.bot = bot @property - def moderation(self) -> Moderation: - """Get currently loaded Moderation cog instance.""" - return self.bot.get_cog("Moderation") - - @property def modlog(self) -> ModLog: """Get currently loaded ModLog cog instance.""" return self.bot.get_cog("ModLog") @@ -62,7 +59,7 @@ class Superstarify(Cog): if active_superstarifies: [infraction] = active_superstarifies - forced_nick = get_nick(infraction['id'], before.id) + forced_nick = self.get_nick(infraction['id'], before.id) if after.display_name == forced_nick: return # Nick change was triggered by this event. Ignore. @@ -108,7 +105,7 @@ class Superstarify(Cog): if active_superstarifies: [infraction] = active_superstarifies - forced_nick = get_nick(infraction['id'], member.id) + forced_nick = self.get_nick(infraction['id'], member.id) await member.edit(nick=forced_nick) end_timestamp_human = format_infraction(infraction['expires_at']) @@ -138,7 +135,7 @@ class Superstarify(Cog): f"Superstardom ends: **{end_timestamp_human}**" ) await self.modlog.send_log_message( - icon_url=Icons.user_update, + icon_url=constants.Icons.user_update, colour=Colour.gold(), title="Superstar member rejoined server", text=mod_log_message, @@ -146,45 +143,39 @@ class Superstarify(Cog): ) @command(name='superstarify', aliases=('force_nick', 'star')) - @with_role(*MODERATION_ROLES) - async def superstarify( - self, ctx: Context, member: Member, expiration: Duration, reason: str = None - ) -> None: + async def superstarify(self, ctx: Context, member: Member, duration: utils.Expiry, reason: str = None) -> None: """ Force a random superstar name (like Taylor Swift) to be the user's nickname for a specified duration. - An optional reason can be provided. + A unit of time should be appended to the duration. + Units (∗case-sensitive): + \u2003`y` - years + \u2003`m` - months∗ + \u2003`w` - weeks + \u2003`d` - days + \u2003`h` - hours + \u2003`M` - minutes∗ + \u2003`s` - seconds + + Alternatively, an ISO 8601 timestamp can be provided for the duration. - If no reason is given, the original name will be shown in a generated reason. + An optional reason can be provided. If no reason is given, the original name will be shown + in a generated reason. """ - active_superstarifies = await self.bot.api_client.get( - 'bot/infractions', - params={ - 'active': 'true', - 'type': 'superstar', - 'user__id': str(member.id) - } - ) - if active_superstarifies: - await ctx.send( - ":x: According to my records, this user is already superstarified. " - f"See infraction **#{active_superstarifies[0]['id']}**." - ) + if await utils.has_active_infraction(ctx, member, "superstar"): return - infraction = await post_infraction( - ctx, member, - type='superstar', reason=reason or ('old nick: ' + member.display_name), - expires_at=expiration - ) - forced_nick = get_nick(infraction['id'], member.id) + reason = reason or ('old nick: ' + member.display_name) + infraction = await utils.post_infraction(ctx, member, 'superstar', reason, expires_at=duration) + forced_nick = self.get_nick(infraction['id'], member.id) + expiry_str = format_infraction(infraction["expires_at"]) embed = Embed() embed.title = "Congratulations!" embed.description = ( f"Your previous nickname, **{member.display_name}**, was so bad that we have decided to change it. " f"Your new nickname will be **{forced_nick}**.\n\n" - f"You will be unable to change your nickname until \n**{expiration}**.\n\n" + f"You will be unable to change your nickname until \n**{expiry_str}**.\n\n" "If you're confused by this, please read our " f"[official nickname policy]({NICKNAME_POLICY_URL})." ) @@ -196,20 +187,20 @@ class Superstarify(Cog): f"Superstarified by **{ctx.author.name}**\n" f"Old nickname: `{member.display_name}`\n" f"New nickname: `{forced_nick}`\n" - f"Superstardom ends: **{expiration}**" + f"Superstardom ends: **{expiry_str}**" ) await self.modlog.send_log_message( - icon_url=Icons.user_update, + icon_url=constants.Icons.user_update, colour=Colour.gold(), title="Member Achieved Superstardom", text=mod_log_message, thumbnail=member.avatar_url_as(static_format="png") ) - await self.moderation.notify_infraction( + await utils.notify_infraction( user=member, infr_type="Superstarify", - expires_at=expiration, + expires_at=expiry_str, reason=f"Your nickname didn't comply with our [nickname policy]({NICKNAME_POLICY_URL})." ) @@ -219,7 +210,6 @@ class Superstarify(Cog): await ctx.send(embed=embed) @command(name='unsuperstarify', aliases=('release_nick', 'unstar')) - @with_role(*MODERATION_ROLES) async def unsuperstarify(self, ctx: Context, member: Member) -> None: """Remove the superstarify entry from our database, allowing the user to change their nickname.""" log.debug(f"Attempting to unsuperstarify the following user: {member.display_name}") @@ -247,9 +237,9 @@ class Superstarify(Cog): embed = Embed() embed.description = "User has been released from superstar-prison." - embed.title = random.choice(POSITIVE_REPLIES) + embed.title = random.choice(constants.POSITIVE_REPLIES) - await self.moderation.notify_pardon( + await utils.notify_pardon( user=member, title="You are no longer superstarified.", content="You may now change your nickname on the server." @@ -257,8 +247,13 @@ class Superstarify(Cog): log.trace(f"{member.display_name} was successfully released from superstar-prison.") await ctx.send(embed=embed) + @staticmethod + def get_nick(infraction_id: int, member_id: int) -> str: + """Randomly select a nickname from the Superstarify nickname list.""" + rng = random.Random(str(infraction_id) + str(member_id)) + return rng.choice(STAR_NAMES) -def setup(bot: Bot) -> None: - """Superstarify cog load.""" - bot.add_cog(Superstarify(bot)) - log.info("Cog loaded: Superstarify") + # This cannot be static (must have a __func__ attribute). + def cog_check(self, ctx: Context) -> bool: + """Only allow moderators to invoke the commands in this cog.""" + return with_role_check(ctx, *constants.MODERATION_ROLES) diff --git a/bot/cogs/moderation/utils.py b/bot/cogs/moderation/utils.py new file mode 100644 index 000000000..788a40d40 --- /dev/null +++ b/bot/cogs/moderation/utils.py @@ -0,0 +1,172 @@ +import logging +import textwrap +import typing as t +from datetime import datetime + +import discord +from discord.ext import commands +from discord.ext.commands import Context + +from bot.api import ResponseCodeError +from bot.constants import Colours, Icons +from bot.converters import Duration, ISODateTime + +log = logging.getLogger(__name__) + +# apply icon, pardon icon +INFRACTION_ICONS = { + "mute": (Icons.user_mute, Icons.user_unmute), + "kick": (Icons.sign_out, None), + "ban": (Icons.user_ban, Icons.user_unban), + "warning": (Icons.user_warn, None), + "note": (Icons.user_warn, None), +} +RULES_URL = "https://pythondiscord.com/pages/rules" +APPEALABLE_INFRACTIONS = ("ban", "mute") + +UserTypes = t.Union[discord.Member, discord.User] +MemberObject = t.Union[UserTypes, discord.Object] +Infraction = t.Dict[str, t.Union[str, int, bool]] +Expiry = t.Union[Duration, ISODateTime] + + +def proxy_user(user_id: str) -> discord.Object: + """ + Create a proxy user object from the given id. + + Used when a Member or User object cannot be resolved. + """ + try: + user_id = int(user_id) + except ValueError: + raise commands.BadArgument + + user = discord.Object(user_id) + user.mention = user.id + user.avatar_url_as = lambda static_format: None + + return user + + +async def post_infraction( + ctx: Context, + user: MemberObject, + infr_type: str, + reason: str, + expires_at: datetime = None, + hidden: bool = False, + active: bool = True, +) -> t.Optional[dict]: + """Posts an infraction to the API.""" + payload = { + "actor": ctx.message.author.id, + "hidden": hidden, + "reason": reason, + "type": infr_type, + "user": user.id, + "active": active + } + if expires_at: + payload['expires_at'] = expires_at.isoformat() + + try: + response = await ctx.bot.api_client.post('bot/infractions', json=payload) + except ResponseCodeError as exp: + if exp.status == 400 and 'user' in exp.response_json: + log.info( + f"{ctx.author} tried to add a {infr_type} infraction to `{user.id}`, " + "but that user id was not found in the database." + ) + await ctx.send( + f":x: Cannot add infraction, the specified user is not known to the database." + ) + return + else: + log.exception("An unexpected ResponseCodeError occurred while adding an infraction:") + await ctx.send(":x: There was an error adding the infraction.") + return + + return response + + +async def has_active_infraction(ctx: Context, user: MemberObject, infr_type: str) -> bool: + """Checks if a user already has an active infraction of the given type.""" + active_infractions = await ctx.bot.api_client.get( + 'bot/infractions', + params={ + 'active': 'true', + 'type': infr_type, + 'user__id': str(user.id) + } + ) + if active_infractions: + await ctx.send( + f":x: According to my records, this user already has a {infr_type} infraction. " + f"See infraction **#{active_infractions[0]['id']}**." + ) + return True + else: + return False + + +async def notify_infraction( + user: UserTypes, + infr_type: str, + expires_at: t.Optional[str] = None, + reason: t.Optional[str] = None, + icon_url: str = Icons.token_removed +) -> bool: + """DM a user about their new infraction and return True if the DM is successful.""" + embed = discord.Embed( + description=textwrap.dedent(f""" + **Type:** {infr_type.capitalize()} + **Expires:** {expires_at or "N/A"} + **Reason:** {reason or "No reason provided."} + """), + colour=Colours.soft_red + ) + + embed.set_author(name="Infraction Information", icon_url=icon_url, url=RULES_URL) + embed.title = f"Please review our rules over at {RULES_URL}" + embed.url = RULES_URL + + if infr_type in APPEALABLE_INFRACTIONS: + embed.set_footer( + text="To appeal this infraction, send an e-mail to [email protected]" + ) + + return await send_private_embed(user, embed) + + +async def notify_pardon( + user: UserTypes, + title: str, + content: str, + icon_url: str = Icons.user_verified +) -> bool: + """DM a user about their pardoned infraction and return True if the DM is successful.""" + embed = discord.Embed( + description=content, + colour=Colours.soft_green + ) + + embed.set_author(name=title, icon_url=icon_url) + + return await send_private_embed(user, embed) + + +async def send_private_embed(user: UserTypes, embed: discord.Embed) -> bool: + """ + A helper method for sending an embed to a user's DMs. + + Returns a boolean indicator of DM success. + """ + try: + await user.send(embed=embed) + return True + except (discord.HTTPException, discord.Forbidden, discord.NotFound): + log.debug( + f"Infraction-related information could not be sent to user {user} ({user.id}). " + "The user either could not be retrieved or probably disabled their DMs." + ) + return False diff --git a/bot/cogs/off_topic_names.py b/bot/cogs/off_topic_names.py index 16717d523..2977e4ebb 100644 --- a/bot/cogs/off_topic_names.py +++ b/bot/cogs/off_topic_names.py @@ -75,14 +75,16 @@ class OffTopicNames(Cog): self.bot = bot self.updater_task = None + self.bot.loop.create_task(self.init_offtopic_updater()) + def cog_unload(self) -> None: """Cancel any running updater tasks on cog unload.""" if self.updater_task is not None: self.updater_task.cancel() - @Cog.listener() - async def on_ready(self) -> None: + async def init_offtopic_updater(self) -> None: """Start off-topic channel updating event loop if it hasn't already started.""" + await self.bot.wait_until_ready() if self.updater_task is None: coro = update_names(self.bot) self.updater_task = self.bot.loop.create_task(coro) diff --git a/bot/cogs/reddit.py b/bot/cogs/reddit.py index 6880aab85..0f575cece 100644 --- a/bot/cogs/reddit.py +++ b/bot/cogs/reddit.py @@ -34,6 +34,8 @@ class Reddit(Cog): self.new_posts_task = None self.top_weekly_posts_task = None + self.bot.loop.create_task(self.init_reddit_polling()) + async def fetch_posts(self, route: str, *, amount: int = 25, params: dict = None) -> List[dict]: """A helper method to fetch a certain amount of Reddit posts at a given route.""" # Reddit's JSON responses only provide 25 posts at most. @@ -262,9 +264,9 @@ class Reddit(Cog): max_lines=15 ) - @Cog.listener() - async def on_ready(self) -> None: + async def init_reddit_polling(self) -> None: """Initiate reddit post event loop.""" + await self.bot.wait_until_ready() self.reddit_channel = await self.bot.fetch_channel(Channels.reddit) if self.reddit_channel is not None: diff --git a/bot/cogs/reminders.py b/bot/cogs/reminders.py index 6e91d2c06..b54622306 100644 --- a/bot/cogs/reminders.py +++ b/bot/cogs/reminders.py @@ -30,9 +30,11 @@ class Reminders(Scheduler, Cog): self.bot = bot super().__init__() - @Cog.listener() - async def on_ready(self) -> None: + self.bot.loop.create_task(self.reschedule_reminders()) + + async def reschedule_reminders(self) -> None: """Get all current reminders from the API and reschedule them.""" + await self.bot.wait_until_ready() response = await self.bot.api_client.get( 'bot/reminders', params={'active': 'true'} diff --git a/bot/cogs/superstarify/stars.py b/bot/cogs/superstarify/stars.py deleted file mode 100644 index dbac86770..000000000 --- a/bot/cogs/superstarify/stars.py +++ /dev/null @@ -1,87 +0,0 @@ -import random - - -STAR_NAMES = ( - "Adele", - "Aerosmith", - "Aretha Franklin", - "Ayumi Hamasaki", - "B'z", - "Barbra Streisand", - "Barry Manilow", - "Barry White", - "Beyonce", - "Billy Joel", - "Bob Dylan", - "Bob Marley", - "Bob Seger", - "Bon Jovi", - "Britney Spears", - "Bruce Springsteen", - "Bruno Mars", - "Bryan Adams", - "Celine Dion", - "Cher", - "Christina Aguilera", - "David Bowie", - "Donna Summer", - "Drake", - "Ed Sheeran", - "Elton John", - "Elvis Presley", - "Eminem", - "Enya", - "Flo Rida", - "Frank Sinatra", - "Garth Brooks", - "George Michael", - "George Strait", - "James Taylor", - "Janet Jackson", - "Jay-Z", - "Johnny Cash", - "Johnny Hallyday", - "Julio Iglesias", - "Justin Bieber", - "Justin Timberlake", - "Kanye West", - "Katy Perry", - "Kenny G", - "Kenny Rogers", - "Lady Gaga", - "Lil Wayne", - "Linda Ronstadt", - "Lionel Richie", - "Madonna", - "Mariah Carey", - "Meat Loaf", - "Michael Jackson", - "Neil Diamond", - "Nicki Minaj", - "Olivia Newton-John", - "Paul McCartney", - "Phil Collins", - "Pink", - "Prince", - "Reba McEntire", - "Rihanna", - "Robbie Williams", - "Rod Stewart", - "Santana", - "Shania Twain", - "Stevie Wonder", - "Taylor Swift", - "Tim McGraw", - "Tina Turner", - "Tom Petty", - "Tupac Shakur", - "Usher", - "Van Halen", - "Whitney Houston", -) - - -def get_nick(infraction_id: int, member_id: int) -> str: - """Randomly select a nickname from the Superstarify nickname list.""" - rng = random.Random(str(infraction_id) + str(member_id)) - return rng.choice(STAR_NAMES) diff --git a/bot/cogs/sync/cog.py b/bot/cogs/sync/cog.py index b75fb26cd..aaa581f96 100644 --- a/bot/cogs/sync/cog.py +++ b/bot/cogs/sync/cog.py @@ -29,9 +29,11 @@ class Sync(Cog): def __init__(self, bot: Bot) -> None: self.bot = bot - @Cog.listener() - async def on_ready(self) -> None: + self.bot.loop.create_task(self.sync_guild()) + + async def sync_guild(self) -> None: """Syncs the roles/users of the guild with the database.""" + await self.bot.wait_until_ready() guild = self.bot.get_guild(self.SYNC_SERVER_ID) if guild is not None: for syncer in self.ON_READY_SYNCERS: diff --git a/bot/cogs/token_remover.py b/bot/cogs/token_remover.py index 7dd0afbbd..5a0d20e57 100644 --- a/bot/cogs/token_remover.py +++ b/bot/cogs/token_remover.py @@ -9,7 +9,7 @@ from discord import Colour, Message from discord.ext.commands import Bot, Cog from discord.utils import snowflake_time -from bot.cogs.modlog import ModLog +from bot.cogs.moderation import ModLog from bot.constants import Channels, Colours, Event, Icons log = logging.getLogger(__name__) @@ -26,11 +26,11 @@ DELETION_MESSAGE_TEMPLATE = ( DISCORD_EPOCH_TIMESTAMP = datetime(2017, 1, 1) TOKEN_EPOCH = 1_293_840_000 TOKEN_RE = re.compile( - r"[^\s\.]+" # Matches token part 1: The user ID string, encoded as base64 - r"\." # Matches a literal dot between the token parts - r"[^\s\.]+" # Matches token part 2: The creation timestamp, as an integer - r"\." # Matches a literal dot between the token parts - r"[^\s\.]+" # Matches token part 3: The HMAC, unused by us, but check that it isn't empty + r"[^\s\.()\"']+" # Matches token part 1: The user ID string, encoded as base64 + r"\." # Matches a literal dot between the token parts + r"[^\s\.()\"']+" # Matches token part 2: The creation timestamp, as an integer + r"\." # Matches a literal dot between the token parts + r"[^\s\.()\"']+" # Matches token part 3: The HMAC, unused by us, but check that it isn't empty ) diff --git a/bot/cogs/utils.py b/bot/cogs/utils.py index 9306c8986..793fe4c1a 100644 --- a/bot/cogs/utils.py +++ b/bot/cogs/utils.py @@ -35,56 +35,58 @@ class Utils(Cog): await ctx.invoke(self.bot.get_command("help"), "pep") return - # Newer PEPs are written in RST instead of txt - if pep_number > 542: - pep_url = f"{self.base_github_pep_url}{pep_number:04}.rst" - else: - pep_url = f"{self.base_github_pep_url}{pep_number:04}.txt" - - # Attempt to fetch the PEP - log.trace(f"Requesting PEP {pep_number} with {pep_url}") - response = await self.bot.http_session.get(pep_url) - - if response.status == 200: - log.trace("PEP found") - - pep_content = await response.text() - - # Taken from https://github.com/python/peps/blob/master/pep0/pep.py#L179 - pep_header = HeaderParser().parse(StringIO(pep_content)) - - # Assemble the embed - pep_embed = Embed( - title=f"**PEP {pep_number} - {pep_header['Title']}**", - description=f"[Link]({self.base_pep_url}{pep_number:04})", - ) - - pep_embed.set_thumbnail(url="https://www.python.org/static/opengraph-icon-200x200.png") - - # Add the interesting information - if "Status" in pep_header: - pep_embed.add_field(name="Status", value=pep_header["Status"]) - if "Python-Version" in pep_header: - pep_embed.add_field(name="Python-Version", value=pep_header["Python-Version"]) - if "Created" in pep_header: - pep_embed.add_field(name="Created", value=pep_header["Created"]) - if "Type" in pep_header: - pep_embed.add_field(name="Type", value=pep_header["Type"]) + possible_extensions = ['.txt', '.rst'] + found_pep = False + for extension in possible_extensions: + # Attempt to fetch the PEP + pep_url = f"{self.base_github_pep_url}{pep_number:04}{extension}" + log.trace(f"Requesting PEP {pep_number} with {pep_url}") + response = await self.bot.http_session.get(pep_url) + + if response.status == 200: + log.trace("PEP found") + found_pep = True + + pep_content = await response.text() + + # Taken from https://github.com/python/peps/blob/master/pep0/pep.py#L179 + pep_header = HeaderParser().parse(StringIO(pep_content)) + + # Assemble the embed + pep_embed = Embed( + title=f"**PEP {pep_number} - {pep_header['Title']}**", + description=f"[Link]({self.base_pep_url}{pep_number:04})", + ) - elif response.status == 404: + pep_embed.set_thumbnail(url="https://www.python.org/static/opengraph-icon-200x200.png") + + # Add the interesting information + if "Status" in pep_header: + pep_embed.add_field(name="Status", value=pep_header["Status"]) + if "Python-Version" in pep_header: + pep_embed.add_field(name="Python-Version", value=pep_header["Python-Version"]) + if "Created" in pep_header: + pep_embed.add_field(name="Created", value=pep_header["Created"]) + if "Type" in pep_header: + pep_embed.add_field(name="Type", value=pep_header["Type"]) + + elif response.status != 404: + # any response except 200 and 404 is expected + found_pep = True # actually not, but it's easier to display this way + log.trace(f"The user requested PEP {pep_number}, but the response had an unexpected status code: " + f"{response.status}.\n{response.text}") + + error_message = "Unexpected HTTP error during PEP search. Please let us know." + pep_embed = Embed(title="Unexpected error", description=error_message) + pep_embed.colour = Colour.red() + break + + if not found_pep: log.trace("PEP was not found") not_found = f"PEP {pep_number} does not exist." pep_embed = Embed(title="PEP not found", description=not_found) pep_embed.colour = Colour.red() - else: - log.trace(f"The user requested PEP {pep_number}, but the response had an unexpected status code: " - f"{response.status}.\n{response.text}") - - error_message = "Unexpected HTTP error during PEP search. Please let us know." - pep_embed = Embed(title="Unexpected error", description=error_message) - pep_embed.colour = Colour.red() - await ctx.message.channel.send(embed=pep_embed) @command() diff --git a/bot/cogs/verification.py b/bot/cogs/verification.py index f0a099f27..5b115deaa 100644 --- a/bot/cogs/verification.py +++ b/bot/cogs/verification.py @@ -1,10 +1,12 @@ import logging +from datetime import datetime from discord import Message, NotFound, Object +from discord.ext import tasks from discord.ext.commands import Bot, Cog, Context, command -from bot.cogs.modlog import ModLog -from bot.constants import Channels, Event, Roles +from bot.cogs.moderation import ModLog +from bot.constants import Bot as BotConfig, Channels, Event, Roles from bot.decorators import InChannelCheckFailure, in_channel, without_role log = logging.getLogger(__name__) @@ -27,12 +29,18 @@ from time to time, you can send `!subscribe` to <#{Channels.bot}> at any time to If you'd like to unsubscribe from the announcement notifications, simply send `!unsubscribe` to <#{Channels.bot}>. """ +PERIODIC_PING = ( + f"@everyone To verify that you have read our rules, please type `{BotConfig.prefix}accept`." + f" Ping <@&{Roles.admin}> if you encounter any problems during the verification process." +) + class Verification(Cog): """User verification and role self-management.""" def __init__(self, bot: Bot): self.bot = bot + self.periodic_ping.start() @property def mod_log(self) -> ModLog: @@ -155,6 +163,34 @@ class Verification(Cog): else: return True + @tasks.loop(hours=12) + async def periodic_ping(self) -> None: + """Every week, mention @everyone to remind them to verify.""" + messages = self.bot.get_channel(Channels.verification).history(limit=10) + need_to_post = True # True if a new message needs to be sent. + + async for message in messages: + if message.author == self.bot.user and message.content == PERIODIC_PING: + delta = datetime.utcnow() - message.created_at # Time since last message. + if delta.days >= 7: # Message is older than a week. + await message.delete() + else: + need_to_post = False + + break + + if need_to_post: + await self.bot.get_channel(Channels.verification).send(PERIODIC_PING) + + @periodic_ping.before_loop + async def before_ping(self) -> None: + """Only start the loop when the bot is ready.""" + await self.bot.wait_until_ready() + + def cog_unload(self) -> None: + """Cancel the periodic ping task when the cog is unloaded.""" + self.periodic_ping.cancel() + def setup(bot: Bot) -> None: """Verification cog load.""" diff --git a/bot/cogs/watchchannels/bigbrother.py b/bot/cogs/watchchannels/bigbrother.py index 3eba9862f..c516508ca 100644 --- a/bot/cogs/watchchannels/bigbrother.py +++ b/bot/cogs/watchchannels/bigbrother.py @@ -5,9 +5,9 @@ from typing import Union from discord import User from discord.ext.commands import Bot, Cog, Context, group +from bot.cogs.moderation.utils import post_infraction from bot.constants import Channels, Roles, Webhooks from bot.decorators import with_role -from bot.utils.moderation import post_infraction from .watchchannel import WatchChannel, proxy_user log = logging.getLogger(__name__) @@ -64,9 +64,7 @@ class BigBrother(WatchChannel, Cog, name="Big Brother"): await ctx.send(":x: The specified user is already being watched.") return - response = await post_infraction( - ctx, user, type='watch', reason=reason, hidden=True - ) + response = await post_infraction(ctx, user, 'watch', reason, hidden=True) if response is not None: self.watched_users[user.id] = response @@ -111,7 +109,7 @@ class BigBrother(WatchChannel, Cog, name="Big Brother"): json={'active': False} ) - await post_infraction(ctx, user, type='watch', reason=f"Unwatched: {reason}", hidden=True, active=False) + await post_infraction(ctx, user, 'watch', f"Unwatched: {reason}", hidden=True, active=False) await ctx.send(f":white_check_mark: Messages sent by {user} will no longer be relayed.") diff --git a/bot/cogs/watchchannels/watchchannel.py b/bot/cogs/watchchannels/watchchannel.py index 760e012eb..0bf75a924 100644 --- a/bot/cogs/watchchannels/watchchannel.py +++ b/bot/cogs/watchchannels/watchchannel.py @@ -13,7 +13,7 @@ from discord import Color, Embed, HTTPException, Message, Object, errors from discord.ext.commands import BadArgument, Bot, Cog, Context from bot.api import ResponseCodeError -from bot.cogs.modlog import ModLog +from bot.cogs.moderation import ModLog from bot.constants import BigBrother as BigBrotherConfig, Guild as GuildConfig, Icons from bot.pagination import LinePaginator from bot.utils import CogABCMeta, messages diff --git a/bot/decorators.py b/bot/decorators.py index 33a6bcadd..935df4af0 100644 --- a/bot/decorators.py +++ b/bot/decorators.py @@ -3,13 +3,13 @@ import random from asyncio import Lock, sleep from contextlib import suppress from functools import wraps -from typing import Any, Callable, Container, Optional +from typing import Callable, Container, Union from weakref import WeakValueDictionary -from discord import Colour, Embed +from discord import Colour, Embed, Member from discord.errors import NotFound from discord.ext import commands -from discord.ext.commands import CheckFailure, Context +from discord.ext.commands import CheckFailure, Cog, Context from bot.constants import ERROR_REPLIES, RedirectOutput from bot.utils.checks import with_role_check, without_role_check @@ -72,13 +72,13 @@ def locked() -> Callable: Subsequent calls to the command from the same author are ignored until the command has completed invocation. - This decorator has to go before (below) the `command` decorator. + This decorator must go before (below) the `command` decorator. """ def wrap(func: Callable) -> Callable: func.__locks = WeakValueDictionary() @wraps(func) - async def inner(self: Callable, ctx: Context, *args, **kwargs) -> Optional[Any]: + async def inner(self: Cog, ctx: Context, *args, **kwargs) -> None: lock = func.__locks.setdefault(ctx.author.id, Lock()) if lock.locked(): embed = Embed() @@ -93,7 +93,7 @@ def locked() -> Callable: return async with func.__locks.setdefault(ctx.author.id, Lock()): - return await func(self, ctx, *args, **kwargs) + await func(self, ctx, *args, **kwargs) return inner return wrap @@ -103,17 +103,21 @@ def redirect_output(destination_channel: int, bypass_roles: Container[int] = Non Changes the channel in the context of the command to redirect the output to a certain channel. Redirect is bypassed if the author has a role to bypass redirection. + + This decorator must go before (below) the `command` decorator. """ def wrap(func: Callable) -> Callable: @wraps(func) - async def inner(self: Callable, ctx: Context, *args, **kwargs) -> Any: + async def inner(self: Cog, ctx: Context, *args, **kwargs) -> None: if ctx.channel.id == destination_channel: log.trace(f"Command {ctx.command.name} was invoked in destination_channel, not redirecting") - return await func(self, ctx, *args, **kwargs) + await func(self, ctx, *args, **kwargs) + return if bypass_roles and any(role.id in bypass_roles for role in ctx.author.roles): log.trace(f"{ctx.author} has role to bypass output redirection") - return await func(self, ctx, *args, **kwargs) + await func(self, ctx, *args, **kwargs) + return redirect_channel = ctx.guild.get_channel(destination_channel) old_channel = ctx.channel @@ -140,3 +144,50 @@ def redirect_output(destination_channel: int, bypass_roles: Container[int] = Non log.trace("Redirect output: Deleted invocation message") return inner return wrap + + +def respect_role_hierarchy(target_arg: Union[int, str] = 0) -> Callable: + """ + Ensure the highest role of the invoking member is greater than that of the target member. + + If the condition fails, a warning is sent to the invoking context. A target which is not an + instance of discord.Member will always pass. + + A value of 0 (i.e. position 0) for `target_arg` corresponds to the argument which comes after + `ctx`. If the target argument is a kwarg, its name can instead be given. + + This decorator must go before (below) the `command` decorator. + """ + def wrap(func: Callable) -> Callable: + @wraps(func) + async def inner(self: Cog, ctx: Context, *args, **kwargs) -> None: + try: + target = kwargs[target_arg] + except KeyError: + try: + target = args[target_arg] + except IndexError: + raise ValueError(f"Could not find target argument at position {target_arg}") + except TypeError: + raise ValueError(f"Could not find target kwarg with key {target_arg!r}") + + if not isinstance(target, Member): + log.trace("The target is not a discord.Member; skipping role hierarchy check.") + await func(self, ctx, *args, **kwargs) + return + + cmd = ctx.command.name + actor = ctx.author + if target.top_role >= actor.top_role: + log.info( + f"{actor} ({actor.id}) attempted to {cmd} " + f"{target} ({target.id}), who has an equal or higher top role." + ) + await ctx.send( + f":x: {actor.mention}, you may not {cmd} " + "someone with an equal or higher top role." + ) + else: + await func(self, ctx, *args, **kwargs) + return inner + return wrap diff --git a/bot/resources/stars.json b/bot/resources/stars.json index 8071b9626..c0b253120 100644 --- a/bot/resources/stars.json +++ b/bot/resources/stars.json @@ -1,82 +1,78 @@ -{ - "Adele": "https://upload.wikimedia.org/wikipedia/commons/thumb/7/7c/Adele_2016.jpg/220px-Adele_2016.jpg", - "Steven Tyler": "https://upload.wikimedia.org/wikipedia/commons/thumb/a/a8/Steven_Tyler_by_Gage_Skidmore_3.jpg/220px-Steven_Tyler_by_Gage_Skidmore_3.jpg", - "Alex Van Halen": "https://upload.wikimedia.org/wikipedia/commons/thumb/b/b3/Alex_Van_Halen_-_Van_Halen_Live.jpg/220px-Alex_Van_Halen_-_Van_Halen_Live.jpg", - "Aretha Franklin": "https://upload.wikimedia.org/wikipedia/commons/thumb/c/c6/Aretha_Franklin_1968.jpg/220px-Aretha_Franklin_1968.jpg", - "Ayumi Hamasaki": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/50/Ayumi_Hamasaki_2007.jpg/220px-Ayumi_Hamasaki_2007.jpg", - "Koshi Inaba": "https://upload.wikimedia.org/wikipedia/commons/thumb/a/af/B%27Z_at_Best_Buy_Theater_NYC_-_9-30-12_-_18.jpg/220px-B%27Z_at_Best_Buy_Theater_NYC_-_9-30-12_-_18.jpg", - "Barbra Streisand": "https://upload.wikimedia.org/wikipedia/en/thumb/a/a3/Barbra_Streisand_-_1966.jpg/220px-Barbra_Streisand_-_1966.jpg", - "Barry Manilow": "https://upload.wikimedia.org/wikipedia/commons/thumb/2/2b/BarryManilow.jpg/220px-BarryManilow.jpg", - "Barry White": "https://upload.wikimedia.org/wikipedia/commons/thumb/b/b7/Barry_White%2C_Bestanddeelnr_927-0099.jpg/220px-Barry_White%2C_Bestanddeelnr_927-0099.jpg", - "Beyonce": "https://upload.wikimedia.org/wikipedia/commons/thumb/f/f2/Beyonce_-_The_Formation_World_Tour%2C_at_Wembley_Stadium_in_London%2C_England.jpg/220px-Beyonce_-_The_Formation_World_Tour%2C_at_Wembley_Stadium_in_London%2C_England.jpg", - "Billy Joel": "https://upload.wikimedia.org/wikipedia/commons/thumb/1/19/Billy_Joel_Shankbone_NYC_2009.jpg/220px-Billy_Joel_Shankbone_NYC_2009.jpg", - "Bob Dylan": "https://upload.wikimedia.org/wikipedia/commons/thumb/0/02/Bob_Dylan_-_Azkena_Rock_Festival_2010_2.jpg/220px-Bob_Dylan_-_Azkena_Rock_Festival_2010_2.jpg", - "Bob Marley": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/5e/Bob-Marley.jpg/220px-Bob-Marley.jpg", - "Bob Seger": "https://upload.wikimedia.org/wikipedia/commons/thumb/1/16/Bob_Seger_2013.jpg/220px-Bob_Seger_2013.jpg", - "Jon Bon Jovi": "https://upload.wikimedia.org/wikipedia/commons/thumb/4/49/Jon_Bon_Jovi_at_the_2009_Tribeca_Film_Festival_3.jpg/220px-Jon_Bon_Jovi_at_the_2009_Tribeca_Film_Festival_3.jpg", - "Britney Spears": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/da/Britney_Spears_2013_%28Straighten_Crop%29.jpg/200px-Britney_Spears_2013_%28Straighten_Crop%29.jpg", - "Bruce Springsteen": "https://upload.wikimedia.org/wikipedia/commons/thumb/3/3b/Bruce_Springsteen_-_Roskilde_Festival_2012.jpg/210px-Bruce_Springsteen_-_Roskilde_Festival_2012.jpg", - "Bruno Mars": "https://upload.wikimedia.org/wikipedia/commons/thumb/b/b0/BrunoMars24KMagicWorldTourLive_%28cropped%29.jpg/220px-BrunoMars24KMagicWorldTourLive_%28cropped%29.jpg", - "Bryan Adams": "https://upload.wikimedia.org/wikipedia/commons/thumb/7/7e/Bryan_Adams_Hamburg_MG_0631_flickr.jpg/300px-Bryan_Adams_Hamburg_MG_0631_flickr.jpg", - "Celine Dion": "https://upload.wikimedia.org/wikipedia/commons/thumb/4/42/Celine_Dion_Concert_Singing_Taking_Chances_2008.jpg/220px-Celine_Dion_Concert_Singing_Taking_Chances_2008.jpg", - "Cher": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Cher_-_Casablanca.jpg/220px-Cher_-_Casablanca.jpg", - "Christina Aguilera": "https://upload.wikimedia.org/wikipedia/commons/thumb/e/e7/Christina_Aguilera_in_2016.jpg/220px-Christina_Aguilera_in_2016.jpg", - "David Bowie": "https://upload.wikimedia.org/wikipedia/commons/thumb/e/e8/David-Bowie_Chicago_2002-08-08_photoby_Adam-Bielawski-cropped.jpg/220px-David-Bowie_Chicago_2002-08-08_photoby_Adam-Bielawski-cropped.jpg", - "David Lee Roth": "https://upload.wikimedia.org/wikipedia/commons/thumb/f/fb/David_Lee_Roth_-_Van_Halen.jpg/220px-David_Lee_Roth_-_Van_Halen.jpg", - "Donna Summer": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Nobel_Peace_Price_Concert_2009_Donna_Summer3.jpg/220px-Nobel_Peace_Price_Concert_2009_Donna_Summer3.jpg", - "Drake": "https://upload.wikimedia.org/wikipedia/commons/thumb/8/81/Drake_at_the_Velvet_Underground_-_2017_%2835986086223%29_%28cropped%29.jpg/220px-Drake_at_the_Velvet_Underground_-_2017_%2835986086223%29_%28cropped%29.jpg", - "Ed Sheeran": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/55/Ed_Sheeran_2013.jpg/220px-Ed_Sheeran_2013.jpg", - "Eddie Van Halen": "https://upload.wikimedia.org/wikipedia/commons/thumb/a/a7/Eddie_Van_Halen.jpg/300px-Eddie_Van_Halen.jpg", - "Elton John": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/d1/Elton_John_2011_Shankbone_2.JPG/220px-Elton_John_2011_Shankbone_2.JPG", - "Elvis Presley": "https://upload.wikimedia.org/wikipedia/commons/thumb/9/99/Elvis_Presley_promoting_Jailhouse_Rock.jpg/220px-Elvis_Presley_promoting_Jailhouse_Rock.jpg", - "Eminem": "https://upload.wikimedia.org/wikipedia/commons/thumb/4/4a/Eminem_-_Concert_for_Valor_in_Washington%2C_D.C._Nov._11%2C_2014_%282%29_%28Cropped%29.jpg/220px-Eminem_-_Concert_for_Valor_in_Washington%2C_D.C._Nov._11%2C_2014_%282%29_%28Cropped%29.jpg", - "Enya": "https://enya.com/wp-content/themes/enya%20full%20site/images/enya-about.jpg", - "Flo Rida": "https://upload.wikimedia.org/wikipedia/commons/thumb/8/8b/Flo_Rida_%286924266548%29.jpg/220px-Flo_Rida_%286924266548%29.jpg", - "Frank Sinatra": "https://upload.wikimedia.org/wikipedia/commons/thumb/a/af/Frank_Sinatra_%2757.jpg/220px-Frank_Sinatra_%2757.jpg", - "Garth Brooks": "https://upload.wikimedia.org/wikipedia/commons/thumb/b/bc/Garth_Brooks_on_World_Tour_%28crop%29.png/220px-Garth_Brooks_on_World_Tour_%28crop%29.png", - "George Michael": "https://upload.wikimedia.org/wikipedia/commons/thumb/f/f2/George_Michael.jpeg/220px-George_Michael.jpeg", - "George Strait": "https://upload.wikimedia.org/wikipedia/commons/thumb/0/0c/George_Strait_2014_1.jpg/220px-George_Strait_2014_1.jpg", - "James Taylor": "https://upload.wikimedia.org/wikipedia/commons/thumb/c/cf/James_Taylor_-_Columbia.jpg/220px-James_Taylor_-_Columbia.jpg", - "Janet Jackson": "https://upload.wikimedia.org/wikipedia/commons/thumb/0/02/JanetJacksonUnbreakableTourSanFran2015.jpg/220px-JanetJacksonUnbreakableTourSanFran2015.jpg", - "Jay-Z": "https://upload.wikimedia.org/wikipedia/commons/thumb/b/b6/Jay-Z.png/220px-Jay-Z.png", - "Johnny Cash": "https://upload.wikimedia.org/wikipedia/commons/thumb/f/f2/JohnnyCash1969.jpg/220px-JohnnyCash1969.jpg", - "Johnny Hallyday": "https://upload.wikimedia.org/wikipedia/commons/thumb/a/a1/Johnny_Hallyday_Cannes.jpg/220px-Johnny_Hallyday_Cannes.jpg", - "Julio Iglesias": "https://upload.wikimedia.org/wikipedia/commons/thumb/e/ef/Julio_Iglesias09.jpg/220px-Julio_Iglesias09.jpg", - "Justin Bieber": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/da/Justin_Bieber_in_2015.jpg/220px-Justin_Bieber_in_2015.jpg", - "Justin Timberlake": "https://upload.wikimedia.org/wikipedia/commons/thumb/e/ed/Justin_Timberlake_by_Gage_Skidmore_2.jpg/220px-Justin_Timberlake_by_Gage_Skidmore_2.jpg", - "Kanye West": "https://upload.wikimedia.org/wikipedia/commons/thumb/1/11/Kanye_West_at_the_2009_Tribeca_Film_Festival.jpg/220px-Kanye_West_at_the_2009_Tribeca_Film_Festival.jpg", - "Katy Perry": "https://upload.wikimedia.org/wikipedia/commons/thumb/8/8a/Katy_Perry_at_Madison_Square_Garden_%2837436531092%29_%28cropped%29.jpg/220px-Katy_Perry_at_Madison_Square_Garden_%2837436531092%29_%28cropped%29.jpg", - "Kenny G": "https://upload.wikimedia.org/wikipedia/commons/thumb/f/f4/KennyGHWOFMay2013.jpg/220px-KennyGHWOFMay2013.jpg", - "Kenny Rogers": "https://upload.wikimedia.org/wikipedia/commons/thumb/8/8c/KennyRogers.jpg/220px-KennyRogers.jpg", - "Lady Gaga": "https://upload.wikimedia.org/wikipedia/commons/thumb/2/2c/Lady_Gaga_interview_2016.jpg/220px-Lady_Gaga_interview_2016.jpg", - "Lil Wayne": "https://upload.wikimedia.org/wikipedia/commons/thumb/a/a6/Lil_Wayne_%2823513397583%29.jpg/220px-Lil_Wayne_%2823513397583%29.jpg", - "Linda Ronstadt": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/50/LindaRonstadtPerforming.jpg/220px-LindaRonstadtPerforming.jpg", - "Lionel Richie": "https://upload.wikimedia.org/wikipedia/commons/thumb/c/cd/Lionel_Richie_2017.jpg/220px-Lionel_Richie_2017.jpg", - "Madonna": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/d1/Madonna_Rebel_Heart_Tour_2015_-_Stockholm_%2823051472299%29_%28cropped_2%29.jpg/220px-Madonna_Rebel_Heart_Tour_2015_-_Stockholm_%2823051472299%29_%28cropped_2%29.jpg", - "Mariah Carey": "https://upload.wikimedia.org/wikipedia/commons/thumb/f/f2/Mariah_Carey_WBLS_2018_Interview_4.jpg/220px-Mariah_Carey_WBLS_2018_Interview_4.jpg", - "Meat Loaf": "https://upload.wikimedia.org/wikipedia/commons/thumb/e/e7/Meat_Loaf.jpg/220px-Meat_Loaf.jpg", - "Michael Jackson": "https://upload.wikimedia.org/wikipedia/commons/thumb/3/31/Michael_Jackson_in_1988.jpg/220px-Michael_Jackson_in_1988.jpg", - "Neil Diamond": "https://upload.wikimedia.org/wikipedia/commons/thumb/f/f4/Neil_Diamond_HWOF_Aug_2012_other_%28levels_adjusted_and_cropped%29.jpg/220px-Neil_Diamond_HWOF_Aug_2012_other_%28levels_adjusted_and_cropped%29.jpg", - "Nicki Minaj": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/54/Nicki_Minaj_MTV_VMAs_4.jpg/250px-Nicki_Minaj_MTV_VMAs_4.jpg", - "Olivia Newton-John": "https://upload.wikimedia.org/wikipedia/commons/thumb/c/c7/Olivia_Newton-John_2.jpg/220px-Olivia_Newton-John_2.jpg", - "Paul McCartney": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/5d/Paul_McCartney_-_Out_There_Concert_-_140420-5941-jikatu_%2813950091384%29.jpg/220px-Paul_McCartney_-_Out_There_Concert_-_140420-5941-jikatu_%2813950091384%29.jpg", - "Phil Collins": "https://upload.wikimedia.org/wikipedia/commons/thumb/f/f3/1_collins.jpg/220px-1_collins.jpg", - "Pink": "https://upload.wikimedia.org/wikipedia/commons/thumb/1/1a/P%21nk_Live_2013.jpg/220px-P%21nk_Live_2013.jpg", - "Prince": "https://upload.wikimedia.org/wikipedia/commons/thumb/b/b2/Prince_1983_1st_Avenue.jpg/220px-Prince_1983_1st_Avenue.jpg", - "Reba McEntire": "https://upload.wikimedia.org/wikipedia/commons/thumb/e/e0/Reba_McEntire_by_Gage_Skidmore.jpg/220px-Reba_McEntire_by_Gage_Skidmore.jpg", - "Rihanna": "https://upload.wikimedia.org/wikipedia/commons/thumb/4/47/Rihanna_concert_in_Washington_DC_%282%29.jpg/250px-Rihanna_concert_in_Washington_DC_%282%29.jpg", - "Robbie Williams": "https://upload.wikimedia.org/wikipedia/commons/thumb/2/21/Robbie_Williams.jpg/220px-Robbie_Williams.jpg", - "Rod Stewart": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/57/Rod_stewart_05111976_12_400.jpg/220px-Rod_stewart_05111976_12_400.jpg", - "Carlos Santana": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/54/Santana_2010.jpg/220px-Santana_2010.jpg", - "Shania Twain": "https://upload.wikimedia.org/wikipedia/commons/thumb/e/ee/ShaniaTwainJunoAwardsMar2011.jpg/220px-ShaniaTwainJunoAwardsMar2011.jpg", - "Stevie Wonder": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/54/Stevie_Wonder_1973.JPG/220px-Stevie_Wonder_1973.JPG", - "Tak Matsumoto": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/da/B%27Z_at_Best_Buy_Theater_NYC_-_9-30-12_-_22.jpg/220px-B%27Z_at_Best_Buy_Theater_NYC_-_9-30-12_-_22.jpg", - "Taylor Swift": "https://upload.wikimedia.org/wikipedia/commons/thumb/2/25/Taylor_Swift_112_%2818119055110%29_%28cropped%29.jpg/220px-Taylor_Swift_112_%2818119055110%29_%28cropped%29.jpg", - "Tim McGraw": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/5f/Tim_McGraw_October_24_2015.jpg/220px-Tim_McGraw_October_24_2015.jpg", - "Tina Turner": "https://upload.wikimedia.org/wikipedia/commons/thumb/1/10/Tina_turner_21021985_01_350.jpg/250px-Tina_turner_21021985_01_350.jpg", - "Tom Petty": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/5a/Tom_Petty_Live_in_Horsens_%28cropped2%29.jpg/220px-Tom_Petty_Live_in_Horsens_%28cropped2%29.jpg", - "Tupac Shakur": "https://upload.wikimedia.org/wikipedia/en/thumb/b/b5/Tupac_Amaru_Shakur2.jpg/220px-Tupac_Amaru_Shakur2.jpg", - "Usher": "https://upload.wikimedia.org/wikipedia/commons/thumb/f/fa/Usher_Cannes_2016_retusche.jpg/220px-Usher_Cannes_2016_retusche.jpg", - "Whitney Houston": "https://upload.wikimedia.org/wikipedia/commons/thumb/a/a7/Whitney_Houston_Welcome_Home_Heroes_1_cropped.jpg/220px-Whitney_Houston_Welcome_Home_Heroes_1_cropped.jpg", - "Wolfgang Van Halen": "https://upload.wikimedia.org/wikipedia/commons/thumb/0/0c/Wolfgang_Van_Halen_Different_Kind_of_Truth_2012.jpg/220px-Wolfgang_Van_Halen_Different_Kind_of_Truth_2012.jpg" -} +[ + "Adele", + "Aerosmith", + "Aretha Franklin", + "Ayumi Hamasaki", + "B'z", + "Barbra Streisand", + "Barry Manilow", + "Barry White", + "Beyonce", + "Billy Joel", + "Bob Dylan", + "Bob Marley", + "Bob Seger", + "Bon Jovi", + "Britney Spears", + "Bruce Springsteen", + "Bruno Mars", + "Bryan Adams", + "Celine Dion", + "Cher", + "Christina Aguilera", + "David Bowie", + "Donna Summer", + "Drake", + "Ed Sheeran", + "Elton John", + "Elvis Presley", + "Eminem", + "Enya", + "Flo Rida", + "Frank Sinatra", + "Garth Brooks", + "George Michael", + "George Strait", + "James Taylor", + "Janet Jackson", + "Jay-Z", + "Johnny Cash", + "Johnny Hallyday", + "Julio Iglesias", + "Justin Bieber", + "Justin Timberlake", + "Kanye West", + "Katy Perry", + "Kenny G", + "Kenny Rogers", + "Lady Gaga", + "Lil Wayne", + "Linda Ronstadt", + "Lionel Richie", + "Madonna", + "Mariah Carey", + "Meat Loaf", + "Michael Jackson", + "Neil Diamond", + "Nicki Minaj", + "Olivia Newton-John", + "Paul McCartney", + "Phil Collins", + "Pink", + "Prince", + "Reba McEntire", + "Rihanna", + "Robbie Williams", + "Rod Stewart", + "Santana", + "Shania Twain", + "Stevie Wonder", + "Taylor Swift", + "Tim McGraw", + "Tina Turner", + "Tom Petty", + "Tupac Shakur", + "Usher", + "Van Halen", + "Whitney Houston" +] diff --git a/bot/utils/checks.py b/bot/utils/checks.py index 19f64ff9f..ad892e512 100644 --- a/bot/utils/checks.py +++ b/bot/utils/checks.py @@ -1,6 +1,8 @@ +import datetime import logging +from typing import Callable, Iterable -from discord.ext.commands import Context +from discord.ext.commands import BucketType, Cog, Command, CommandOnCooldown, Context, Cooldown, CooldownMapping log = logging.getLogger(__name__) @@ -42,3 +44,47 @@ def in_channel_check(ctx: Context, channel_id: int) -> bool: log.trace(f"{ctx.author} tried to call the '{ctx.command.name}' command. " f"The result of the in_channel check was {check}.") return check + + +def cooldown_with_role_bypass(rate: int, per: float, type: BucketType = BucketType.default, *, + bypass_roles: Iterable[int]) -> Callable: + """ + Applies a cooldown to a command, but allows members with certain roles to be ignored. + + NOTE: this replaces the `Command.before_invoke` callback, which *might* introduce problems in the future. + """ + # make it a set so lookup is hash based + bypass = set(bypass_roles) + + # this handles the actual cooldown logic + buckets = CooldownMapping(Cooldown(rate, per, type)) + + # will be called after the command has been parse but before it has been invoked, ensures that + # the cooldown won't be updated if the user screws up their input to the command + async def predicate(cog: Cog, ctx: Context) -> None: + nonlocal bypass, buckets + + if any(role.id in bypass for role in ctx.author.roles): + return + + # cooldown logic, taken from discord.py internals + current = ctx.message.created_at.replace(tzinfo=datetime.timezone.utc).timestamp() + bucket = buckets.get_bucket(ctx.message) + retry_after = bucket.update_rate_limit(current) + if retry_after: + raise CommandOnCooldown(bucket, retry_after) + + def wrapper(command: Command) -> Command: + # NOTE: this could be changed if a subclass of Command were to be used. I didn't see the need for it + # so I just made it raise an error when the decorator is applied before the actual command object exists. + # + # if the `before_invoke` detail is ever a problem then I can quickly just swap over. + if not isinstance(command, Command): + raise TypeError('Decorator `cooldown_with_role_bypass` must be applied after the command decorator. ' + 'This means it has to be above the command decorator in the code.') + + command._before_invoke = predicate + + return command + + return wrapper diff --git a/bot/utils/moderation.py b/bot/utils/moderation.py deleted file mode 100644 index 7860f14a1..000000000 --- a/bot/utils/moderation.py +++ /dev/null @@ -1,72 +0,0 @@ -import logging -from datetime import datetime -from typing import Optional, Union - -from discord import Member, Object, User -from discord.ext.commands import Context - -from bot.api import ResponseCodeError -from bot.constants import Keys - -log = logging.getLogger(__name__) - -HEADERS = {"X-API-KEY": Keys.site_api} - - -async def post_infraction( - ctx: Context, - user: Union[Member, Object, User], - type: str, - reason: str, - expires_at: datetime = None, - hidden: bool = False, - active: bool = True, -) -> Optional[dict]: - """Posts an infraction to the API.""" - payload = { - "actor": ctx.message.author.id, - "hidden": hidden, - "reason": reason, - "type": type, - "user": user.id, - "active": active - } - if expires_at: - payload['expires_at'] = expires_at.isoformat() - - try: - response = await ctx.bot.api_client.post('bot/infractions', json=payload) - except ResponseCodeError as exp: - if exp.status == 400 and 'user' in exp.response_json: - log.info( - f"{ctx.author} tried to add a {type} infraction to `{user.id}`, " - "but that user id was not found in the database." - ) - await ctx.send(f":x: Cannot add infraction, the specified user is not known to the database.") - return - else: - log.exception("An unexpected ResponseCodeError occurred while adding an infraction:") - await ctx.send(":x: There was an error adding the infraction.") - return - - return response - - -async def already_has_active_infraction(ctx: Context, user: Union[Member, Object, User], type: str) -> bool: - """Checks if a user already has an active infraction of the given type.""" - active_infractions = await ctx.bot.api_client.get( - 'bot/infractions', - params={ - 'active': 'true', - 'type': type, - 'user__id': str(user.id) - } - ) - if active_infractions: - await ctx.send( - f":x: According to my records, this user already has a {type} infraction. " - f"See infraction **#{active_infractions[0]['id']}**." - ) - return True - else: - return False diff --git a/bot/utils/time.py b/bot/utils/time.py index da28f2c76..2aea2c099 100644 --- a/bot/utils/time.py +++ b/bot/utils/time.py @@ -1,5 +1,6 @@ import asyncio import datetime +from typing import Optional import dateutil.parser from dateutil.relativedelta import relativedelta @@ -34,6 +35,9 @@ def humanize_delta(delta: relativedelta, precision: str = "seconds", max_units: precision specifies the smallest unit of time to include (e.g. "seconds", "minutes"). max_units specifies the maximum number of units of time to include (e.g. 1 may include days but not hours). """ + if max_units <= 0: + raise ValueError("max_units must be positive") + units = ( ("years", delta.years), ("months", delta.months), @@ -83,15 +87,20 @@ def time_since(past_datetime: datetime.datetime, precision: str = "seconds", max return f"{humanized} ago" -def parse_rfc1123(time_str: str) -> datetime.datetime: +def parse_rfc1123(stamp: str) -> datetime.datetime: """Parse RFC1123 time string into datetime.""" - return datetime.datetime.strptime(time_str, RFC1123_FORMAT).replace(tzinfo=datetime.timezone.utc) + return datetime.datetime.strptime(stamp, RFC1123_FORMAT).replace(tzinfo=datetime.timezone.utc) # Hey, this could actually be used in the off_topic_names and reddit cogs :) -async def wait_until(time: datetime.datetime) -> None: - """Wait until a given time.""" - delay = time - datetime.datetime.utcnow() +async def wait_until(time: datetime.datetime, start: Optional[datetime.datetime] = None) -> None: + """ + Wait until a given time. + + :param time: A datetime.datetime object to wait until. + :param start: The start from which to calculate the waiting duration. Defaults to UTC time. + """ + delay = time - (start or datetime.datetime.utcnow()) delay_seconds = delay.total_seconds() # Incorporate a small delay so we don't rapid-fire the event due to time precision errors diff --git a/config-default.yml b/config-default.yml index 827ae4619..ca405337e 100644 --- a/config-default.yml +++ b/config-default.yml @@ -282,7 +282,7 @@ anti_spam: rules: attachments: interval: 10 - max: 3 + max: 9 burst: interval: 10 diff --git a/docker-compose.yml b/docker-compose.yml index 9684a3c62..f79fdba58 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -6,7 +6,7 @@ version: "3.7" services: postgres: - image: postgres:11-alpine + image: postgres:12-alpine environment: POSTGRES_DB: pysite POSTGRES_PASSWORD: pysite diff --git a/tests/helpers.py b/tests/helpers.py index 2908294f7..25059fa3a 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -7,6 +7,10 @@ __all__ = ('AsyncMock', 'async_test') # TODO: Remove me on 3.8 +# Allows you to mock a coroutine. Since the default `__call__` of `MagicMock` +# is not a coroutine, trying to mock a coroutine with it will result in errors +# as the default `__call__` is not awaitable. Use this class for monkeypatching +# coroutines instead. class AsyncMock(MagicMock): async def __call__(self, *args, **kwargs): return super(AsyncMock, self).__call__(*args, **kwargs) diff --git a/tests/test_resources.py b/tests/test_resources.py index 2b17aea64..bcf124f05 100644 --- a/tests/test_resources.py +++ b/tests/test_resources.py @@ -1,18 +1,13 @@ import json -import mimetypes from pathlib import Path -from urllib.parse import urlparse def test_stars_valid(): - """Validates that `bot/resources/stars.json` contains valid images.""" + """Validates that `bot/resources/stars.json` contains a list of strings.""" path = Path('bot', 'resources', 'stars.json') content = path.read_text() data = json.loads(content) - for url in data.values(): - assert urlparse(url).scheme == 'https' - - mimetype, _ = mimetypes.guess_type(url) - assert mimetype in ('image/jpeg', 'image/png') + for name in data: + assert type(name) is str diff --git a/tests/utils/test_time.py b/tests/utils/test_time.py new file mode 100644 index 000000000..4baa6395c --- /dev/null +++ b/tests/utils/test_time.py @@ -0,0 +1,62 @@ +import asyncio +from datetime import datetime, timezone +from unittest.mock import patch + +import pytest +from dateutil.relativedelta import relativedelta + +from bot.utils import time +from tests.helpers import AsyncMock + + + ('delta', 'precision', 'max_units', 'expected'), + ( + (relativedelta(days=2), 'seconds', 1, '2 days'), + (relativedelta(days=2, hours=2), 'seconds', 2, '2 days and 2 hours'), + (relativedelta(days=2, hours=2), 'seconds', 1, '2 days'), + (relativedelta(days=2, hours=2), 'days', 2, '2 days'), + + # Does not abort for unknown units, as the unit name is checked + # against the attribute of the relativedelta instance. + (relativedelta(days=2, hours=2), 'elephants', 2, '2 days and 2 hours'), + + # Very high maximum units, but it only ever iterates over + # each value the relativedelta might have. + (relativedelta(days=2, hours=2), 'hours', 20, '2 days and 2 hours'), + ) +) +def test_humanize_delta( + delta: relativedelta, + precision: str, + max_units: int, + expected: str +): + assert time.humanize_delta(delta, precision, max_units) == expected + + [email protected]('max_units', (-1, 0)) +def test_humanize_delta_raises_for_invalid_max_units(max_units: int): + with pytest.raises(ValueError, match='max_units must be positive'): + time.humanize_delta(relativedelta(days=2, hours=2), 'hours', max_units) + + + ('stamp', 'expected'), + ( + ('Sun, 15 Sep 2019 12:00:00 GMT', datetime(2019, 9, 15, 12, 0, 0, tzinfo=timezone.utc)), + ) +) +def test_parse_rfc1123(stamp: str, expected: str): + assert time.parse_rfc1123(stamp) == expected + + +@patch('asyncio.sleep', new_callable=AsyncMock) +def test_wait_until(sleep_patch): + start = datetime(2019, 1, 1, 0, 0) + then = datetime(2019, 1, 1, 0, 10) + + # No return value + assert asyncio.run(time.wait_until(then, start)) is None + + sleep_patch.assert_called_once_with(10 * 60) |