aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.gitignore3
-rw-r--r--bot/__main__.py4
-rw-r--r--bot/cogs/alias.py7
-rw-r--r--bot/cogs/antispam.py12
-rw-r--r--bot/cogs/clean.py2
-rw-r--r--bot/cogs/cogs.py298
-rw-r--r--bot/cogs/defcon.py8
-rw-r--r--bot/cogs/doc.py11
-rw-r--r--bot/cogs/extensions.py236
-rw-r--r--bot/cogs/filtering.py2
-rw-r--r--bot/cogs/help.py48
-rw-r--r--bot/cogs/information.py86
-rw-r--r--bot/cogs/logging.py6
-rw-r--r--bot/cogs/moderation.py1172
-rw-r--r--bot/cogs/moderation/__init__.py25
-rw-r--r--bot/cogs/moderation/infractions.py607
-rw-r--r--bot/cogs/moderation/management.py268
-rw-r--r--bot/cogs/moderation/modlog.py (renamed from bot/cogs/modlog.py)119
-rw-r--r--bot/cogs/moderation/superstarify.py (renamed from bot/cogs/superstarify/__init__.py)101
-rw-r--r--bot/cogs/moderation/utils.py172
-rw-r--r--bot/cogs/off_topic_names.py6
-rw-r--r--bot/cogs/reddit.py6
-rw-r--r--bot/cogs/reminders.py6
-rw-r--r--bot/cogs/superstarify/stars.py87
-rw-r--r--bot/cogs/sync/cog.py6
-rw-r--r--bot/cogs/token_remover.py12
-rw-r--r--bot/cogs/utils.py90
-rw-r--r--bot/cogs/verification.py40
-rw-r--r--bot/cogs/watchchannels/bigbrother.py8
-rw-r--r--bot/cogs/watchchannels/watchchannel.py2
-rw-r--r--bot/decorators.py69
-rw-r--r--bot/resources/stars.json160
-rw-r--r--bot/utils/checks.py48
-rw-r--r--bot/utils/moderation.py72
-rw-r--r--bot/utils/time.py19
-rw-r--r--config-default.yml2
-rw-r--r--docker-compose.yml2
-rw-r--r--tests/helpers.py4
-rw-r--r--tests/test_resources.py11
-rw-r--r--tests/utils/test_time.py62
40 files changed, 1951 insertions, 1948 deletions
diff --git a/.gitignore b/.gitignore
index 261fa179f..a191523b6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -116,3 +116,6 @@ config.yml
# JUnit XML reports from pytest
junit.xml
+
+# Mac OS .DS_Store, which is a file that stores custom attributes of its containing folder
+.DS_Store
diff --git a/bot/__main__.py b/bot/__main__.py
index f25693734..19a7e5ec6 100644
--- a/bot/__main__.py
+++ b/bot/__main__.py
@@ -36,14 +36,13 @@ log.addHandler(APILoggingHandler(bot.api_client))
bot.load_extension("bot.cogs.error_handler")
bot.load_extension("bot.cogs.filtering")
bot.load_extension("bot.cogs.logging")
-bot.load_extension("bot.cogs.modlog")
bot.load_extension("bot.cogs.security")
# Commands, etc
bot.load_extension("bot.cogs.antispam")
bot.load_extension("bot.cogs.bot")
bot.load_extension("bot.cogs.clean")
-bot.load_extension("bot.cogs.cogs")
+bot.load_extension("bot.cogs.extensions")
bot.load_extension("bot.cogs.help")
# Only load this in production
@@ -64,7 +63,6 @@ bot.load_extension("bot.cogs.reddit")
bot.load_extension("bot.cogs.reminders")
bot.load_extension("bot.cogs.site")
bot.load_extension("bot.cogs.snekbox")
-bot.load_extension("bot.cogs.superstarify")
bot.load_extension("bot.cogs.sync")
bot.load_extension("bot.cogs.tags")
bot.load_extension("bot.cogs.token_remover")
diff --git a/bot/cogs/alias.py b/bot/cogs/alias.py
index 0f49a400c..6648805e9 100644
--- a/bot/cogs/alias.py
+++ b/bot/cogs/alias.py
@@ -5,6 +5,7 @@ from typing import Union
from discord import Colour, Embed, Member, User
from discord.ext.commands import Bot, Cog, Command, Context, clean_content, command, group
+from bot.cogs.extensions import Extension
from bot.cogs.watchchannels.watchchannel import proxy_user
from bot.converters import TagNameConverter
from bot.pagination import LinePaginator
@@ -84,9 +85,9 @@ class Alias (Cog):
await self.invoke(ctx, "site rules")
@command(name="reload", hidden=True)
- async def cogs_reload_alias(self, ctx: Context, *, cog_name: str) -> None:
- """Alias for invoking <prefix>cogs reload [cog_name]."""
- await self.invoke(ctx, "cogs reload", cog_name)
+ async def extensions_reload_alias(self, ctx: Context, *extensions: Extension) -> None:
+ """Alias for invoking <prefix>extensions reload [extensions...]."""
+ await self.invoke(ctx, "extensions reload", *extensions)
@command(name="defon", hidden=True)
async def defcon_enable_alias(self, ctx: Context) -> None:
diff --git a/bot/cogs/antispam.py b/bot/cogs/antispam.py
index 8dfa0ad05..1b394048a 100644
--- a/bot/cogs/antispam.py
+++ b/bot/cogs/antispam.py
@@ -10,7 +10,7 @@ from discord import Colour, Member, Message, NotFound, Object, TextChannel
from discord.ext.commands import Bot, Cog
from bot import rules
-from bot.cogs.modlog import ModLog
+from bot.cogs.moderation import ModLog
from bot.constants import (
AntiSpam as AntiSpamConfig, Channels,
Colours, DEBUG_MODE, Event, Filter,
@@ -107,14 +107,16 @@ class AntiSpam(Cog):
self.message_deletion_queue = dict()
self.queue_consumption_tasks = dict()
+ self.bot.loop.create_task(self.alert_on_validation_error())
+
@property
def mod_log(self) -> ModLog:
"""Allows for easy access of the ModLog cog."""
return self.bot.get_cog("ModLog")
- @Cog.listener()
- async def on_ready(self) -> None:
+ async def alert_on_validation_error(self) -> None:
"""Unloads the cog and alerts admins if configuration validation failed."""
+ await self.bot.wait_until_ready()
if self.validation_errors:
body = "**The following errors were encountered:**\n"
body += "\n".join(f"- {error}" for error in self.validation_errors.values())
@@ -207,8 +209,10 @@ class AntiSpam(Cog):
if not any(role.id == self.muted_role.id for role in member.roles):
remove_role_after = AntiSpamConfig.punishment['remove_after']
- # We need context, let's get it
+ # Get context and make sure the bot becomes the actor of infraction by patching the `author` attributes
context = await self.bot.get_context(msg)
+ context.author = self.bot.user
+ context.message.author = self.bot.user
# Since we're going to invoke the tempmute command directly, we need to manually call the converter.
dt_remove_role_after = await self.expiration_date_converter.convert(context, f"{remove_role_after}S")
diff --git a/bot/cogs/clean.py b/bot/cogs/clean.py
index 1c0c9a7a8..dca411d01 100644
--- a/bot/cogs/clean.py
+++ b/bot/cogs/clean.py
@@ -6,7 +6,7 @@ from typing import Optional
from discord import Colour, Embed, Message, User
from discord.ext.commands import Bot, Cog, Context, group
-from bot.cogs.modlog import ModLog
+from bot.cogs.moderation import ModLog
from bot.constants import (
Channels, CleanMessages, Colours, Event,
Icons, MODERATION_ROLES, NEGATIVE_REPLIES
diff --git a/bot/cogs/cogs.py b/bot/cogs/cogs.py
deleted file mode 100644
index 1f6ccd09c..000000000
--- a/bot/cogs/cogs.py
+++ /dev/null
@@ -1,298 +0,0 @@
-import logging
-import os
-
-from discord import Colour, Embed
-from discord.ext.commands import Bot, Cog, Context, group
-
-from bot.constants import (
- Emojis, MODERATION_ROLES, Roles, URLs
-)
-from bot.decorators import with_role
-from bot.pagination import LinePaginator
-
-log = logging.getLogger(__name__)
-
-KEEP_LOADED = ["bot.cogs.cogs", "bot.cogs.modlog"]
-
-
-class Cogs(Cog):
- """Cog management commands."""
-
- def __init__(self, bot: Bot):
- self.bot = bot
- self.cogs = {}
-
- # Load up the cog names
- log.info("Initializing cog names...")
- for filename in os.listdir("bot/cogs"):
- if filename.endswith(".py") and "_" not in filename:
- if os.path.isfile(f"bot/cogs/{filename}"):
- cog = filename[:-3]
-
- self.cogs[cog] = f"bot.cogs.{cog}"
-
- # Allow reverse lookups by reversing the pairs
- self.cogs.update({v: k for k, v in self.cogs.items()})
-
- @group(name='cogs', aliases=('c',), invoke_without_command=True)
- @with_role(*MODERATION_ROLES, Roles.core_developer)
- async def cogs_group(self, ctx: Context) -> None:
- """Load, unload, reload, and list active cogs."""
- await ctx.invoke(self.bot.get_command("help"), "cogs")
-
- @cogs_group.command(name='load', aliases=('l',))
- @with_role(*MODERATION_ROLES, Roles.core_developer)
- async def load_command(self, ctx: Context, cog: str) -> None:
- """
- Load up an unloaded cog, given the module containing it.
-
- You can specify the cog name for any cogs that are placed directly within `!cogs`, or specify the
- entire module directly.
- """
- cog = cog.lower()
-
- embed = Embed()
- embed.colour = Colour.red()
-
- embed.set_author(
- name="Python Bot (Cogs)",
- url=URLs.github_bot_repo,
- icon_url=URLs.bot_avatar
- )
-
- if cog in self.cogs:
- full_cog = self.cogs[cog]
- elif "." in cog:
- full_cog = cog
- else:
- full_cog = None
- log.warning(f"{ctx.author} requested we load the '{cog}' cog, but that cog doesn't exist.")
- embed.description = f"Unknown cog: {cog}"
-
- if full_cog:
- if full_cog not in self.bot.extensions:
- try:
- self.bot.load_extension(full_cog)
- except ImportError:
- log.exception(f"{ctx.author} requested we load the '{cog}' cog, "
- f"but the cog module {full_cog} could not be found!")
- embed.description = f"Invalid cog: {cog}\n\nCould not find cog module {full_cog}"
- except Exception as e:
- log.exception(f"{ctx.author} requested we load the '{cog}' cog, "
- "but the loading failed")
- embed.description = f"Failed to load cog: {cog}\n\n{e.__class__.__name__}: {e}"
- else:
- log.debug(f"{ctx.author} requested we load the '{cog}' cog. Cog loaded!")
- embed.description = f"Cog loaded: {cog}"
- embed.colour = Colour.green()
- else:
- log.warning(f"{ctx.author} requested we load the '{cog}' cog, but the cog was already loaded!")
- embed.description = f"Cog {cog} is already loaded"
-
- await ctx.send(embed=embed)
-
- @cogs_group.command(name='unload', aliases=('ul',))
- @with_role(*MODERATION_ROLES, Roles.core_developer)
- async def unload_command(self, ctx: Context, cog: str) -> None:
- """
- Unload an already-loaded cog, given the module containing it.
-
- You can specify the cog name for any cogs that are placed directly within `!cogs`, or specify the
- entire module directly.
- """
- cog = cog.lower()
-
- embed = Embed()
- embed.colour = Colour.red()
-
- embed.set_author(
- name="Python Bot (Cogs)",
- url=URLs.github_bot_repo,
- icon_url=URLs.bot_avatar
- )
-
- if cog in self.cogs:
- full_cog = self.cogs[cog]
- elif "." in cog:
- full_cog = cog
- else:
- full_cog = None
- log.warning(f"{ctx.author} requested we unload the '{cog}' cog, but that cog doesn't exist.")
- embed.description = f"Unknown cog: {cog}"
-
- if full_cog:
- if full_cog in KEEP_LOADED:
- log.warning(f"{ctx.author} requested we unload `{full_cog}`, that sneaky pete. We said no.")
- embed.description = f"You may not unload `{full_cog}`!"
- elif full_cog in self.bot.extensions:
- try:
- self.bot.unload_extension(full_cog)
- except Exception as e:
- log.exception(f"{ctx.author} requested we unload the '{cog}' cog, "
- "but the unloading failed")
- embed.description = f"Failed to unload cog: {cog}\n\n```{e}```"
- else:
- log.debug(f"{ctx.author} requested we unload the '{cog}' cog. Cog unloaded!")
- embed.description = f"Cog unloaded: {cog}"
- embed.colour = Colour.green()
- else:
- log.warning(f"{ctx.author} requested we unload the '{cog}' cog, but the cog wasn't loaded!")
- embed.description = f"Cog {cog} is not loaded"
-
- await ctx.send(embed=embed)
-
- @cogs_group.command(name='reload', aliases=('r',))
- @with_role(*MODERATION_ROLES, Roles.core_developer)
- async def reload_command(self, ctx: Context, cog: str) -> None:
- """
- Reload an unloaded cog, given the module containing it.
-
- You can specify the cog name for any cogs that are placed directly within `!cogs`, or specify the
- entire module directly.
-
- If you specify "*" as the cog, every cog currently loaded will be unloaded, and then every cog present in the
- bot/cogs directory will be loaded.
- """
- cog = cog.lower()
-
- embed = Embed()
- embed.colour = Colour.red()
-
- embed.set_author(
- name="Python Bot (Cogs)",
- url=URLs.github_bot_repo,
- icon_url=URLs.bot_avatar
- )
-
- if cog == "*":
- full_cog = cog
- elif cog in self.cogs:
- full_cog = self.cogs[cog]
- elif "." in cog:
- full_cog = cog
- else:
- full_cog = None
- log.warning(f"{ctx.author} requested we reload the '{cog}' cog, but that cog doesn't exist.")
- embed.description = f"Unknown cog: {cog}"
-
- if full_cog:
- if full_cog == "*":
- all_cogs = [
- f"bot.cogs.{fn[:-3]}" for fn in os.listdir("bot/cogs")
- if os.path.isfile(f"bot/cogs/{fn}") and fn.endswith(".py") and "_" not in fn
- ]
-
- failed_unloads = {}
- failed_loads = {}
-
- unloaded = 0
- loaded = 0
-
- for loaded_cog in self.bot.extensions.copy().keys():
- try:
- self.bot.unload_extension(loaded_cog)
- except Exception as e:
- failed_unloads[loaded_cog] = f"{e.__class__.__name__}: {e}"
- else:
- unloaded += 1
-
- for unloaded_cog in all_cogs:
- try:
- self.bot.load_extension(unloaded_cog)
- except Exception as e:
- failed_loads[unloaded_cog] = f"{e.__class__.__name__}: {e}"
- else:
- loaded += 1
-
- lines = [
- "**All cogs reloaded**",
- f"**Unloaded**: {unloaded} / **Loaded**: {loaded}"
- ]
-
- if failed_unloads:
- lines.append("\n**Unload failures**")
-
- for cog, error in failed_unloads:
- lines.append(f"{Emojis.status_dnd} **{cog}:** `{error}`")
-
- if failed_loads:
- lines.append("\n**Load failures**")
-
- for cog, error in failed_loads.items():
- lines.append(f"{Emojis.status_dnd} **{cog}:** `{error}`")
-
- log.debug(f"{ctx.author} requested we reload all cogs. Here are the results: \n"
- f"{lines}")
-
- await LinePaginator.paginate(lines, ctx, embed, empty=False)
- return
-
- elif full_cog in self.bot.extensions:
- try:
- self.bot.unload_extension(full_cog)
- self.bot.load_extension(full_cog)
- except Exception as e:
- log.exception(f"{ctx.author} requested we reload the '{cog}' cog, "
- "but the unloading failed")
- embed.description = f"Failed to reload cog: {cog}\n\n```{e}```"
- else:
- log.debug(f"{ctx.author} requested we reload the '{cog}' cog. Cog reloaded!")
- embed.description = f"Cog reload: {cog}"
- embed.colour = Colour.green()
- else:
- log.warning(f"{ctx.author} requested we reload the '{cog}' cog, but the cog wasn't loaded!")
- embed.description = f"Cog {cog} is not loaded"
-
- await ctx.send(embed=embed)
-
- @cogs_group.command(name='list', aliases=('all',))
- @with_role(*MODERATION_ROLES, Roles.core_developer)
- async def list_command(self, ctx: Context) -> None:
- """
- Get a list of all cogs, including their loaded status.
-
- Gray indicates that the cog is unloaded. Green indicates that the cog is currently loaded.
- """
- embed = Embed()
- lines = []
- cogs = {}
-
- embed.colour = Colour.blurple()
- embed.set_author(
- name="Python Bot (Cogs)",
- url=URLs.github_bot_repo,
- icon_url=URLs.bot_avatar
- )
-
- for key, _value in self.cogs.items():
- if "." not in key:
- continue
-
- if key in self.bot.extensions:
- cogs[key] = True
- else:
- cogs[key] = False
-
- for key in self.bot.extensions.keys():
- if key not in self.cogs:
- cogs[key] = True
-
- for cog, loaded in sorted(cogs.items(), key=lambda x: x[0]):
- if cog in self.cogs:
- cog = self.cogs[cog]
-
- if loaded:
- status = Emojis.status_online
- else:
- status = Emojis.status_offline
-
- lines.append(f"{status} {cog}")
-
- log.debug(f"{ctx.author} requested a list of all cogs. Returning a paginated list.")
- await LinePaginator.paginate(lines, ctx, embed, max_size=300, empty=False)
-
-
-def setup(bot: Bot) -> None:
- """Cogs cog load."""
- bot.add_cog(Cogs(bot))
- log.info("Cog loaded: Cogs")
diff --git a/bot/cogs/defcon.py b/bot/cogs/defcon.py
index 048d8a683..70e101baa 100644
--- a/bot/cogs/defcon.py
+++ b/bot/cogs/defcon.py
@@ -4,7 +4,7 @@ from datetime import datetime, timedelta
from discord import Colour, Embed, Member
from discord.ext.commands import Bot, Cog, Context, group
-from bot.cogs.modlog import ModLog
+from bot.cogs.moderation import ModLog
from bot.constants import Channels, Colours, Emojis, Event, Icons, Roles
from bot.decorators import with_role
@@ -35,14 +35,16 @@ class Defcon(Cog):
self.channel = None
self.days = timedelta(days=0)
+ self.bot.loop.create_task(self.sync_settings())
+
@property
def mod_log(self) -> ModLog:
"""Get currently loaded ModLog cog instance."""
return self.bot.get_cog("ModLog")
- @Cog.listener()
- async def on_ready(self) -> None:
+ async def sync_settings(self) -> None:
"""On cog load, try to synchronize DEFCON settings to the API."""
+ await self.bot.wait_until_ready()
self.channel = await self.bot.fetch_channel(Channels.defcon)
try:
response = await self.bot.api_client.get('bot/bot-settings/defcon')
diff --git a/bot/cogs/doc.py b/bot/cogs/doc.py
index c9e6b3b91..a13464bff 100644
--- a/bot/cogs/doc.py
+++ b/bot/cogs/doc.py
@@ -126,9 +126,11 @@ class Doc(commands.Cog):
self.bot = bot
self.inventories = {}
- @commands.Cog.listener()
- async def on_ready(self) -> None:
- """Refresh documentation inventory."""
+ self.bot.loop.create_task(self.init_refresh_inventory())
+
+ async def init_refresh_inventory(self) -> None:
+ """Refresh documentation inventory on cog initialization."""
+ await self.bot.wait_until_ready()
await self.refresh_inventory()
async def update_single(
@@ -207,6 +209,9 @@ class Doc(commands.Cog):
symbol_heading = soup.find(id=symbol_id)
signature_buffer = []
+ if symbol_heading is None:
+ return None
+
# Traverse the tags of the signature header and ignore any
# unwanted symbols from it. Add all of it to a temporary buffer.
for tag in symbol_heading.strings:
diff --git a/bot/cogs/extensions.py b/bot/cogs/extensions.py
new file mode 100644
index 000000000..bb66e0b8e
--- /dev/null
+++ b/bot/cogs/extensions.py
@@ -0,0 +1,236 @@
+import functools
+import logging
+import typing as t
+from enum import Enum
+from pkgutil import iter_modules
+
+from discord import Colour, Embed
+from discord.ext import commands
+from discord.ext.commands import Bot, Context, group
+
+from bot.constants import Emojis, MODERATION_ROLES, Roles, URLs
+from bot.pagination import LinePaginator
+from bot.utils.checks import with_role_check
+
+log = logging.getLogger(__name__)
+
+UNLOAD_BLACKLIST = {"bot.cogs.extensions", "bot.cogs.modlog"}
+EXTENSIONS = frozenset(
+ ext.name
+ for ext in iter_modules(("bot/cogs",), "bot.cogs.")
+ if ext.name[-1] != "_"
+)
+
+
+class Action(Enum):
+ """Represents an action to perform on an extension."""
+
+ # Need to be partial otherwise they are considered to be function definitions.
+ LOAD = functools.partial(Bot.load_extension)
+ UNLOAD = functools.partial(Bot.unload_extension)
+ RELOAD = functools.partial(Bot.reload_extension)
+
+
+class Extension(commands.Converter):
+ """
+ Fully qualify the name of an extension and ensure it exists.
+
+ The * and ** values bypass this when used with the reload command.
+ """
+
+ async def convert(self, ctx: Context, argument: str) -> str:
+ """Fully qualify the name of an extension and ensure it exists."""
+ # Special values to reload all extensions
+ if argument == "*" or argument == "**":
+ return argument
+
+ argument = argument.lower()
+
+ if "." not in argument:
+ argument = f"bot.cogs.{argument}"
+
+ if argument in EXTENSIONS:
+ return argument
+ else:
+ raise commands.BadArgument(f":x: Could not find the extension `{argument}`.")
+
+
+class Extensions(commands.Cog):
+ """Extension management commands."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ @group(name="extensions", aliases=("ext", "exts", "c", "cogs"), invoke_without_command=True)
+ async def extensions_group(self, ctx: Context) -> None:
+ """Load, unload, reload, and list loaded extensions."""
+ await ctx.invoke(self.bot.get_command("help"), "extensions")
+
+ @extensions_group.command(name="load", aliases=("l",))
+ async def load_command(self, ctx: Context, *extensions: Extension) -> None:
+ """
+ Load extensions given their fully qualified or unqualified names.
+
+ If '\*' or '\*\*' is given as the name, all unloaded extensions will be loaded.
+ """ # noqa: W605
+ if not extensions:
+ await ctx.invoke(self.bot.get_command("help"), "extensions load")
+ return
+
+ if "*" in extensions or "**" in extensions:
+ extensions = set(EXTENSIONS) - set(self.bot.extensions.keys())
+
+ msg = self.batch_manage(Action.LOAD, *extensions)
+ await ctx.send(msg)
+
+ @extensions_group.command(name="unload", aliases=("ul",))
+ async def unload_command(self, ctx: Context, *extensions: Extension) -> None:
+ """
+ Unload currently loaded extensions given their fully qualified or unqualified names.
+
+ If '\*' or '\*\*' is given as the name, all loaded extensions will be unloaded.
+ """ # noqa: W605
+ if not extensions:
+ await ctx.invoke(self.bot.get_command("help"), "extensions unload")
+ return
+
+ blacklisted = "\n".join(UNLOAD_BLACKLIST & set(extensions))
+
+ if blacklisted:
+ msg = f":x: The following extension(s) may not be unloaded:```{blacklisted}```"
+ else:
+ if "*" in extensions or "**" in extensions:
+ extensions = set(self.bot.extensions.keys()) - UNLOAD_BLACKLIST
+
+ msg = self.batch_manage(Action.UNLOAD, *extensions)
+
+ await ctx.send(msg)
+
+ @extensions_group.command(name="reload", aliases=("r",))
+ async def reload_command(self, ctx: Context, *extensions: Extension) -> None:
+ """
+ Reload extensions given their fully qualified or unqualified names.
+
+ If an extension fails to be reloaded, it will be rolled-back to the prior working state.
+
+ If '\*' is given as the name, all currently loaded extensions will be reloaded.
+ If '\*\*' is given as the name, all extensions, including unloaded ones, will be reloaded.
+ """ # noqa: W605
+ if not extensions:
+ await ctx.invoke(self.bot.get_command("help"), "extensions reload")
+ return
+
+ if "**" in extensions:
+ extensions = EXTENSIONS
+ elif "*" in extensions:
+ extensions = set(self.bot.extensions.keys()) | set(extensions)
+ extensions.remove("*")
+
+ msg = self.batch_manage(Action.RELOAD, *extensions)
+
+ await ctx.send(msg)
+
+ @extensions_group.command(name="list", aliases=("all",))
+ async def list_command(self, ctx: Context) -> None:
+ """
+ Get a list of all extensions, including their loaded status.
+
+ Grey indicates that the extension is unloaded.
+ Green indicates that the extension is currently loaded.
+ """
+ embed = Embed()
+ lines = []
+
+ embed.colour = Colour.blurple()
+ embed.set_author(
+ name="Extensions List",
+ url=URLs.github_bot_repo,
+ icon_url=URLs.bot_avatar
+ )
+
+ for ext in sorted(list(EXTENSIONS)):
+ if ext in self.bot.extensions:
+ status = Emojis.status_online
+ else:
+ status = Emojis.status_offline
+
+ ext = ext.rsplit(".", 1)[1]
+ lines.append(f"{status} {ext}")
+
+ log.debug(f"{ctx.author} requested a list of all cogs. Returning a paginated list.")
+ await LinePaginator.paginate(lines, ctx, embed, max_size=300, empty=False)
+
+ def batch_manage(self, action: Action, *extensions: str) -> str:
+ """
+ Apply an action to multiple extensions and return a message with the results.
+
+ If only one extension is given, it is deferred to `manage()`.
+ """
+ if len(extensions) == 1:
+ msg, _ = self.manage(action, extensions[0])
+ return msg
+
+ verb = action.name.lower()
+ failures = {}
+
+ for extension in extensions:
+ _, error = self.manage(action, extension)
+ if error:
+ failures[extension] = error
+
+ emoji = ":x:" if failures else ":ok_hand:"
+ msg = f"{emoji} {len(extensions) - len(failures)} / {len(extensions)} extensions {verb}ed."
+
+ if failures:
+ failures = "\n".join(f"{ext}\n {err}" for ext, err in failures.items())
+ msg += f"\nFailures:```{failures}```"
+
+ log.debug(f"Batch {verb}ed extensions.")
+
+ return msg
+
+ def manage(self, action: Action, ext: str) -> t.Tuple[str, t.Optional[str]]:
+ """Apply an action to an extension and return the status message and any error message."""
+ verb = action.name.lower()
+ error_msg = None
+
+ try:
+ action.value(self.bot, ext)
+ except (commands.ExtensionAlreadyLoaded, commands.ExtensionNotLoaded):
+ if action is Action.RELOAD:
+ # When reloading, just load the extension if it was not loaded.
+ return self.manage(Action.LOAD, ext)
+
+ msg = f":x: Extension `{ext}` is already {verb}ed."
+ log.debug(msg[4:])
+ except Exception as e:
+ if hasattr(e, "original"):
+ e = e.original
+
+ log.exception(f"Extension '{ext}' failed to {verb}.")
+
+ error_msg = f"{e.__class__.__name__}: {e}"
+ msg = f":x: Failed to {verb} extension `{ext}`:\n```{error_msg}```"
+ else:
+ msg = f":ok_hand: Extension successfully {verb}ed: `{ext}`."
+ log.debug(msg[10:])
+
+ return msg, error_msg
+
+ # This cannot be static (must have a __func__ attribute).
+ def cog_check(self, ctx: Context) -> bool:
+ """Only allow moderators and core developers to invoke the commands in this cog."""
+ return with_role_check(ctx, *MODERATION_ROLES, Roles.core_developer)
+
+ # This cannot be static (must have a __func__ attribute).
+ async def cog_command_error(self, ctx: Context, error: Exception) -> None:
+ """Handle BadArgument errors locally to prevent the help command from showing."""
+ if isinstance(error, commands.BadArgument):
+ await ctx.send(str(error))
+ error.handled = True
+
+
+def setup(bot: Bot) -> None:
+ """Load the Extensions cog."""
+ bot.add_cog(Extensions(bot))
+ log.info("Cog loaded: Extensions")
diff --git a/bot/cogs/filtering.py b/bot/cogs/filtering.py
index bd8c6ed67..265ae5160 100644
--- a/bot/cogs/filtering.py
+++ b/bot/cogs/filtering.py
@@ -7,7 +7,7 @@ from dateutil.relativedelta import relativedelta
from discord import Colour, DMChannel, Member, Message, TextChannel
from discord.ext.commands import Bot, Cog
-from bot.cogs.modlog import ModLog
+from bot.cogs.moderation import ModLog
from bot.constants import (
Channels, Colours, DEBUG_MODE,
Filter, Icons, URLs
diff --git a/bot/cogs/help.py b/bot/cogs/help.py
index 37d12b2d5..9607dbd8d 100644
--- a/bot/cogs/help.py
+++ b/bot/cogs/help.py
@@ -1,5 +1,4 @@
import asyncio
-import inspect
import itertools
from collections import namedtuple
from contextlib import suppress
@@ -61,6 +60,12 @@ class HelpSession:
The message object that's showing the help contents.
* destination: `discord.abc.Messageable`
Where the help message is to be sent to.
+
+ Cogs can be grouped into custom categories. All cogs with the same category will be displayed
+ under a single category name in the help output. Custom categories are defined inside the cogs
+ as a class attribute named `category`. A description can also be specified with the attribute
+ `category_description`. If a description is not found in at least one cog, the default will be
+ the regular description (class docstring) of the first cog found in the category.
"""
def __init__(
@@ -107,12 +112,31 @@ class HelpSession:
if command:
return command
- cog = self._bot.cogs.get(query)
- if cog:
+ # Find all cog categories that match.
+ cog_matches = []
+ description = None
+ for cog in self._bot.cogs.values():
+ if hasattr(cog, "category") and cog.category == query:
+ cog_matches.append(cog)
+ if hasattr(cog, "category_description"):
+ description = cog.category_description
+
+ # Try to search by cog name if no categories match.
+ if not cog_matches:
+ cog = self._bot.cogs.get(query)
+
+ # Don't consider it a match if the cog has a category.
+ if cog and not hasattr(cog, "category"):
+ cog_matches = [cog]
+
+ if cog_matches:
+ cog = cog_matches[0]
+ cmds = (cog.get_commands() for cog in cog_matches) # Commands of all cogs
+
return Cog(
- name=cog.qualified_name,
- description=inspect.getdoc(cog),
- commands=[c for c in self._bot.commands if c.cog is cog]
+ name=cog.category if hasattr(cog, "category") else cog.qualified_name,
+ description=description or cog.description,
+ commands=tuple(itertools.chain.from_iterable(cmds)) # Flatten the list
)
self._handle_not_found(query)
@@ -207,8 +231,16 @@ class HelpSession:
A zero width space is used as a prefix for results with no cogs to force them last in ordering.
"""
- cog = cmd.cog_name
- return f'**{cog}**' if cog else f'**\u200bNo Category:**'
+ if cmd.cog:
+ try:
+ if cmd.cog.category:
+ return f'**{cmd.cog.category}**'
+ except AttributeError:
+ pass
+
+ return f'**{cmd.cog_name}**'
+ else:
+ return "**\u200bNo Category:**"
def _get_command_params(self, cmd: Command) -> str:
"""
diff --git a/bot/cogs/information.py b/bot/cogs/information.py
index 1afb37103..3a7ba0444 100644
--- a/bot/cogs/information.py
+++ b/bot/cogs/information.py
@@ -1,14 +1,18 @@
import colorsys
import logging
+import pprint
import textwrap
import typing
+from typing import Any, Mapping, Optional
+import discord
from discord import CategoryChannel, Colour, Embed, Member, Role, TextChannel, VoiceChannel, utils
-from discord.ext.commands import Bot, Cog, Context, command
+from discord.ext import commands
+from discord.ext.commands import Bot, BucketType, Cog, Context, command, group
from bot.constants import Channels, Emojis, MODERATION_ROLES, STAFF_ROLES
-from bot.decorators import InChannelCheckFailure, with_role
-from bot.utils.checks import with_role_check
+from bot.decorators import InChannelCheckFailure, in_channel, with_role
+from bot.utils.checks import cooldown_with_role_bypass, with_role_check
from bot.utils.time import time_since
log = logging.getLogger(__name__)
@@ -229,6 +233,82 @@ class Information(Cog):
await ctx.send(embed=embed)
+ def format_fields(self, mapping: Mapping[str, Any], field_width: Optional[int] = None) -> str:
+ """Format a mapping to be readable to a human."""
+ # sorting is technically superfluous but nice if you want to look for a specific field
+ fields = sorted(mapping.items(), key=lambda item: item[0])
+
+ if field_width is None:
+ field_width = len(max(mapping.keys(), key=len))
+
+ out = ''
+
+ for key, val in fields:
+ if isinstance(val, dict):
+ # if we have dicts inside dicts we want to apply the same treatment to the inner dictionaries
+ inner_width = int(field_width * 1.6)
+ val = '\n' + self.format_fields(val, field_width=inner_width)
+
+ elif isinstance(val, str):
+ # split up text since it might be long
+ text = textwrap.fill(val, width=100, replace_whitespace=False)
+
+ # indent it, I guess you could do this with `wrap` and `join` but this is nicer
+ val = textwrap.indent(text, ' ' * (field_width + len(': ')))
+
+ # the first line is already indented so we `str.lstrip` it
+ val = val.lstrip()
+
+ if key == 'color':
+ # makes the base 10 representation of a hex number readable to humans
+ val = hex(val)
+
+ out += '{0:>{width}}: {1}\n'.format(key, val, width=field_width)
+
+ # remove trailing whitespace
+ return out.rstrip()
+
+ @cooldown_with_role_bypass(2, 60 * 3, BucketType.member, bypass_roles=STAFF_ROLES)
+ @group(invoke_without_command=True)
+ @in_channel(Channels.bot, bypass_roles=STAFF_ROLES)
+ async def raw(self, ctx: Context, *, message: discord.Message, json: bool = False) -> None:
+ """Shows information about the raw API response."""
+ # I *guess* it could be deleted right as the command is invoked but I felt like it wasn't worth handling
+ # doing this extra request is also much easier than trying to convert everything back into a dictionary again
+ raw_data = await ctx.bot.http.get_message(message.channel.id, message.id)
+
+ paginator = commands.Paginator()
+
+ def add_content(title: str, content: str) -> None:
+ paginator.add_line(f'== {title} ==\n')
+ # replace backticks as it breaks out of code blocks. Spaces seemed to be the most reasonable solution.
+ # we hope it's not close to 2000
+ paginator.add_line(content.replace('```', '`` `'))
+ paginator.close_page()
+
+ if message.content:
+ add_content('Raw message', message.content)
+
+ transformer = pprint.pformat if json else self.format_fields
+ for field_name in ('embeds', 'attachments'):
+ data = raw_data[field_name]
+
+ if not data:
+ continue
+
+ total = len(data)
+ for current, item in enumerate(data, start=1):
+ title = f'Raw {field_name} ({current}/{total})'
+ add_content(title, transformer(item))
+
+ for page in paginator.pages:
+ await ctx.send(page)
+
+ @raw.command()
+ async def json(self, ctx: Context, message: discord.Message) -> None:
+ """Shows information about the raw API response in a copy-pasteable Python format."""
+ await ctx.invoke(self.raw, message=message, json=True)
+
def setup(bot: Bot) -> None:
"""Information cog load."""
diff --git a/bot/cogs/logging.py b/bot/cogs/logging.py
index 8e47bcc36..c92b619ff 100644
--- a/bot/cogs/logging.py
+++ b/bot/cogs/logging.py
@@ -15,9 +15,11 @@ class Logging(Cog):
def __init__(self, bot: Bot):
self.bot = bot
- @Cog.listener()
- async def on_ready(self) -> None:
+ self.bot.loop.create_task(self.startup_greeting())
+
+ async def startup_greeting(self) -> None:
"""Announce our presence to the configured devlog channel."""
+ await self.bot.wait_until_ready()
log.info("Bot connected!")
embed = Embed(description="Connected!")
diff --git a/bot/cogs/moderation.py b/bot/cogs/moderation.py
deleted file mode 100644
index 5aa873a47..000000000
--- a/bot/cogs/moderation.py
+++ /dev/null
@@ -1,1172 +0,0 @@
-import asyncio
-import logging
-import textwrap
-from datetime import datetime
-from typing import Dict, Union
-
-from discord import (
- Colour, Embed, Forbidden, Guild, HTTPException, Member, NotFound, Object, User
-)
-from discord.ext.commands import (
- BadArgument, BadUnionArgument, Bot, Cog, Context, command, group
-)
-
-from bot import constants
-from bot.cogs.modlog import ModLog
-from bot.constants import Colours, Event, Icons, MODERATION_ROLES
-from bot.converters import Duration, InfractionSearchQuery
-from bot.decorators import with_role
-from bot.pagination import LinePaginator
-from bot.utils.moderation import already_has_active_infraction, post_infraction
-from bot.utils.scheduling import Scheduler, create_task
-from bot.utils.time import INFRACTION_FORMAT, format_infraction, wait_until
-
-log = logging.getLogger(__name__)
-
-INFRACTION_ICONS = {
- "Mute": Icons.user_mute,
- "Kick": Icons.sign_out,
- "Ban": Icons.user_ban
-}
-RULES_URL = "https://pythondiscord.com/pages/rules"
-APPEALABLE_INFRACTIONS = ("Ban", "Mute")
-
-
-def proxy_user(user_id: str) -> Object:
- """Create a proxy user for the provided user_id for situations where a Member or User object cannot be resolved."""
- try:
- user_id = int(user_id)
- except ValueError:
- raise BadArgument
- user = Object(user_id)
- user.mention = user.id
- user.avatar_url_as = lambda static_format: None
- return user
-
-
-def permanent_duration(expires_at: str) -> str:
- """Only allow an expiration to be 'permanent' if it is a string."""
- expires_at = expires_at.lower()
- if expires_at != "permanent":
- raise BadArgument
- else:
- return expires_at
-
-
-UserTypes = Union[Member, User, proxy_user]
-
-
-class Moderation(Scheduler, Cog):
- """Server moderation tools."""
-
- def __init__(self, bot: Bot):
- self.bot = bot
- self._muted_role = Object(constants.Roles.muted)
- super().__init__()
-
- @property
- def mod_log(self) -> ModLog:
- """Get currently loaded ModLog cog instance."""
- return self.bot.get_cog("ModLog")
-
- @Cog.listener()
- async def on_ready(self) -> None:
- """Schedule expiration for previous infractions."""
- # Schedule expiration for previous infractions
- infractions = await self.bot.api_client.get(
- 'bot/infractions', params={'active': 'true'}
- )
- for infraction in infractions:
- if infraction["expires_at"] is not None:
- self.schedule_task(self.bot.loop, infraction["id"], infraction)
-
- @Cog.listener()
- async def on_member_join(self, member: Member) -> None:
- """Reapply active mute infractions for returning members."""
- active_mutes = await self.bot.api_client.get(
- 'bot/infractions',
- params={'user__id': str(member.id), 'type': 'mute', 'active': 'true'}
- )
- if not active_mutes:
- return
-
- # assume a single mute because of restrictions elsewhere
- mute = active_mutes[0]
-
- # transform expiration to delay in seconds
- expiration_datetime = datetime.fromisoformat(mute["expires_at"][:-1])
- delay = expiration_datetime - datetime.utcnow()
- delay_seconds = delay.total_seconds()
-
- # if under a minute or in the past
- if delay_seconds < 60:
- log.debug(f"Marking infraction {mute['id']} as inactive (expired).")
- await self._deactivate_infraction(mute)
- self.cancel_task(mute["id"])
-
- # Notify the user that they've been unmuted.
- await self.notify_pardon(
- user=member,
- title="You have been unmuted.",
- content="You may now send messages in the server.",
- icon_url=Icons.user_unmute
- )
- return
-
- # allowing modlog since this is a passive action that should be logged
- await member.add_roles(self._muted_role, reason=f"Re-applying active mute: {mute['id']}")
- log.debug(f"User {member.id} has been re-muted on rejoin.")
-
- # region: Permanent infractions
-
- @with_role(*MODERATION_ROLES)
- @command()
- async def warn(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None:
- """Create a warning infraction in the database for a user."""
- infraction = await post_infraction(ctx, user, type="warning", reason=reason)
- if infraction is None:
- return
-
- notified = await self.notify_infraction(user=user, infr_type="Warning", reason=reason)
-
- dm_result = ":incoming_envelope: " if notified else ""
- action = f"{dm_result}:ok_hand: warned {user.mention}"
- await ctx.send(f"{action}.")
-
- if notified:
- dm_status = "Sent"
- log_content = None
- else:
- dm_status = "**Failed**"
- log_content = ctx.author.mention
-
- await self.mod_log.send_log_message(
- icon_url=Icons.user_warn,
- colour=Colour(Colours.soft_red),
- title="Member warned",
- thumbnail=user.avatar_url_as(static_format="png"),
- text=textwrap.dedent(f"""
- Member: {user.mention} (`{user.id}`)
- Actor: {ctx.author}
- DM: {dm_status}
- Reason: {reason}
- """),
- content=log_content,
- footer=f"ID {infraction['id']}"
- )
-
- @with_role(*MODERATION_ROLES)
- @command()
- async def kick(self, ctx: Context, user: Member, *, reason: str = None) -> None:
- """Kicks a user with the provided reason."""
- if not await self.respect_role_hierarchy(ctx, user, 'kick'):
- # Ensure ctx author has a higher top role than the target user
- # Warning is sent to ctx by the helper method
- return
-
- infraction = await post_infraction(ctx, user, type="kick", reason=reason)
- if infraction is None:
- return
-
- notified = await self.notify_infraction(user=user, infr_type="Kick", reason=reason)
-
- self.mod_log.ignore(Event.member_remove, user.id)
-
- try:
- await user.kick(reason=reason)
- action_result = True
- except Forbidden:
- action_result = False
-
- dm_result = ":incoming_envelope: " if notified else ""
- action = f"{dm_result}:ok_hand: kicked {user.mention}"
- await ctx.send(f"{action}.")
-
- dm_status = "Sent" if notified else "**Failed**"
- title = "Member kicked" if action_result else "Member kicked (Failed)"
- log_content = None if all((notified, action_result)) else ctx.author.mention
-
- await self.mod_log.send_log_message(
- icon_url=Icons.sign_out,
- colour=Colour(Colours.soft_red),
- title=title,
- thumbnail=user.avatar_url_as(static_format="png"),
- text=textwrap.dedent(f"""
- Member: {user.mention} (`{user.id}`)
- Actor: {ctx.message.author}
- DM: {dm_status}
- Reason: {reason}
- """),
- content=log_content,
- footer=f"ID {infraction['id']}"
- )
-
- @with_role(*MODERATION_ROLES)
- @command()
- async def ban(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None:
- """Create a permanent ban infraction for a user with the provided reason."""
- if not await self.respect_role_hierarchy(ctx, user, 'ban'):
- # Ensure ctx author has a higher top role than the target user
- # Warning is sent to ctx by the helper method
- return
-
- if await already_has_active_infraction(ctx=ctx, user=user, type="ban"):
- return
-
- infraction = await post_infraction(ctx, user, type="ban", reason=reason)
- if infraction is None:
- return
-
- notified = await self.notify_infraction(
- user=user,
- infr_type="Ban",
- reason=reason
- )
-
- self.mod_log.ignore(Event.member_ban, user.id)
- self.mod_log.ignore(Event.member_remove, user.id)
-
- try:
- await ctx.guild.ban(user, reason=reason, delete_message_days=0)
- action_result = True
- except Forbidden:
- action_result = False
-
- dm_result = ":incoming_envelope: " if notified else ""
- action = f"{dm_result}:ok_hand: permanently banned {user.mention}"
- await ctx.send(f"{action}.")
-
- dm_status = "Sent" if notified else "**Failed**"
- log_content = None if all((notified, action_result)) else ctx.author.mention
- title = "Member permanently banned"
- if not action_result:
- title += " (Failed)"
-
- await self.mod_log.send_log_message(
- icon_url=Icons.user_ban,
- colour=Colour(Colours.soft_red),
- title=title,
- thumbnail=user.avatar_url_as(static_format="png"),
- text=textwrap.dedent(f"""
- Member: {user.mention} (`{user.id}`)
- Actor: {ctx.message.author}
- DM: {dm_status}
- Reason: {reason}
- """),
- content=log_content,
- footer=f"ID {infraction['id']}"
- )
-
- # endregion
- # region: Temporary infractions
-
- @with_role(*MODERATION_ROLES)
- @command(aliases=('mute',))
- async def tempmute(self, ctx: Context, user: Member, duration: Duration, *, reason: str = None) -> None:
- """
- Create a temporary mute infraction for a user with the provided expiration and reason.
-
- Duration strings are parsed per: http://strftime.org/
- """
- expiration = duration
-
- if await already_has_active_infraction(ctx=ctx, user=user, type="mute"):
- return
-
- infraction = await post_infraction(ctx, user, type="mute", reason=reason, expires_at=expiration)
- if infraction is None:
- return
-
- self.mod_log.ignore(Event.member_update, user.id)
- await user.add_roles(self._muted_role, reason=reason)
-
- notified = await self.notify_infraction(
- user=user,
- infr_type="Mute",
- expires_at=expiration,
- reason=reason
- )
-
- infraction_expiration = format_infraction(infraction["expires_at"])
-
- self.schedule_task(ctx.bot.loop, infraction["id"], infraction)
-
- dm_result = ":incoming_envelope: " if notified else ""
- action = f"{dm_result}:ok_hand: muted {user.mention} until {infraction_expiration}"
- await ctx.send(f"{action}.")
-
- if notified:
- dm_status = "Sent"
- log_content = None
- else:
- dm_status = "**Failed**"
- log_content = ctx.author.mention
-
- await self.mod_log.send_log_message(
- icon_url=Icons.user_mute,
- colour=Colour(Colours.soft_red),
- title="Member temporarily muted",
- thumbnail=user.avatar_url_as(static_format="png"),
- text=textwrap.dedent(f"""
- Member: {user.mention} (`{user.id}`)
- Actor: {ctx.message.author}
- DM: {dm_status}
- Reason: {reason}
- Expires: {infraction_expiration}
- """),
- content=log_content,
- footer=f"ID {infraction['id']}"
- )
-
- @with_role(*MODERATION_ROLES)
- @command()
- async def tempban(self, ctx: Context, user: UserTypes, duration: Duration, *, reason: str = None) -> None:
- """
- Create a temporary ban infraction for a user with the provided expiration and reason.
-
- Duration strings are parsed per: http://strftime.org/
- """
- expiration = duration
-
- if not await self.respect_role_hierarchy(ctx, user, 'tempban'):
- # Ensure ctx author has a higher top role than the target user
- # Warning is sent to ctx by the helper method
- return
-
- if await already_has_active_infraction(ctx=ctx, user=user, type="ban"):
- return
-
- infraction = await post_infraction(ctx, user, type="ban", reason=reason, expires_at=expiration)
- if infraction is None:
- return
-
- notified = await self.notify_infraction(
- user=user,
- infr_type="Ban",
- expires_at=expiration,
- reason=reason
- )
-
- self.mod_log.ignore(Event.member_ban, user.id)
- self.mod_log.ignore(Event.member_remove, user.id)
-
- try:
- await ctx.guild.ban(user, reason=reason, delete_message_days=0)
- action_result = True
- except Forbidden:
- action_result = False
-
- infraction_expiration = format_infraction(infraction["expires_at"])
-
- self.schedule_task(ctx.bot.loop, infraction["id"], infraction)
-
- dm_result = ":incoming_envelope: " if notified else ""
- action = f"{dm_result}:ok_hand: banned {user.mention} until {infraction_expiration}"
- await ctx.send(f"{action}.")
-
- dm_status = "Sent" if notified else "**Failed**"
- log_content = None if all((notified, action_result)) else ctx.author.mention
- title = "Member temporarily banned"
- if not action_result:
- title += " (Failed)"
-
- await self.mod_log.send_log_message(
- icon_url=Icons.user_ban,
- colour=Colour(Colours.soft_red),
- thumbnail=user.avatar_url_as(static_format="png"),
- title=title,
- text=textwrap.dedent(f"""
- Member: {user.mention} (`{user.id}`)
- Actor: {ctx.message.author}
- DM: {dm_status}
- Reason: {reason}
- Expires: {infraction_expiration}
- """),
- content=log_content,
- footer=f"ID {infraction['id']}"
- )
-
- # endregion
- # region: Permanent shadow infractions
-
- @with_role(*MODERATION_ROLES)
- @command(hidden=True)
- async def note(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None:
- """
- Create a private infraction note in the database for a user with the provided reason.
-
- This does not send the user a notification
- """
- infraction = await post_infraction(ctx, user, type="note", reason=reason, hidden=True)
- if infraction is None:
- return
-
- await ctx.send(f":ok_hand: note added for {user.mention}.")
-
- await self.mod_log.send_log_message(
- icon_url=Icons.user_warn,
- colour=Colour(Colours.soft_red),
- title="Member note added",
- thumbnail=user.avatar_url_as(static_format="png"),
- text=textwrap.dedent(f"""
- Member: {user.mention} (`{user.id}`)
- Actor: {ctx.message.author}
- Reason: {reason}
- """),
- footer=f"ID {infraction['id']}"
- )
-
- @with_role(*MODERATION_ROLES)
- @command(hidden=True, aliases=['shadowkick', 'skick'])
- async def shadow_kick(self, ctx: Context, user: Member, *, reason: str = None) -> None:
- """
- Kick a user for the provided reason.
-
- This does not send the user a notification.
- """
- if not await self.respect_role_hierarchy(ctx, user, 'shadowkick'):
- # Ensure ctx author has a higher top role than the target user
- # Warning is sent to ctx by the helper method
- return
-
- infraction = await post_infraction(ctx, user, type="kick", reason=reason, hidden=True)
- if infraction is None:
- return
-
- self.mod_log.ignore(Event.member_remove, user.id)
-
- try:
- await user.kick(reason=reason)
- action_result = True
- except Forbidden:
- action_result = False
-
- await ctx.send(f":ok_hand: kicked {user.mention}.")
-
- title = "Member shadow kicked"
- if action_result:
- log_content = None
- else:
- log_content = ctx.author.mention
- title += " (Failed)"
-
- await self.mod_log.send_log_message(
- icon_url=Icons.sign_out,
- colour=Colour(Colours.soft_red),
- title=title,
- thumbnail=user.avatar_url_as(static_format="png"),
- text=textwrap.dedent(f"""
- Member: {user.mention} (`{user.id}`)
- Actor: {ctx.message.author}
- Reason: {reason}
- """),
- content=log_content,
- footer=f"ID {infraction['id']}"
- )
-
- @with_role(*MODERATION_ROLES)
- @command(hidden=True, aliases=['shadowban', 'sban'])
- async def shadow_ban(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None:
- """
- Create a permanent ban infraction for a user with the provided reason.
-
- This does not send the user a notification.
- """
- if not await self.respect_role_hierarchy(ctx, user, 'shadowban'):
- # Ensure ctx author has a higher top role than the target user
- # Warning is sent to ctx by the helper method
- return
-
- if await already_has_active_infraction(ctx=ctx, user=user, type="ban"):
- return
-
- infraction = await post_infraction(ctx, user, type="ban", reason=reason, hidden=True)
- if infraction is None:
- return
-
- self.mod_log.ignore(Event.member_ban, user.id)
- self.mod_log.ignore(Event.member_remove, user.id)
-
- try:
- await ctx.guild.ban(user, reason=reason, delete_message_days=0)
- action_result = True
- except Forbidden:
- action_result = False
-
- await ctx.send(f":ok_hand: permanently banned {user.mention}.")
-
- title = "Member permanently banned"
- if action_result:
- log_content = None
- else:
- log_content = ctx.author.mention
- title += " (Failed)"
-
- await self.mod_log.send_log_message(
- icon_url=Icons.user_ban,
- colour=Colour(Colours.soft_red),
- title=title,
- thumbnail=user.avatar_url_as(static_format="png"),
- text=textwrap.dedent(f"""
- Member: {user.mention} (`{user.id}`)
- Actor: {ctx.message.author}
- Reason: {reason}
- """),
- content=log_content,
- footer=f"ID {infraction['id']}"
- )
-
- # endregion
- # region: Temporary shadow infractions
-
- @with_role(*MODERATION_ROLES)
- @command(hidden=True, aliases=["shadowtempmute, stempmute", "shadowmute", "smute"])
- async def shadow_tempmute(
- self, ctx: Context, user: Member, duration: Duration, *, reason: str = None
- ) -> None:
- """
- Create a temporary mute infraction for a user with the provided reason.
-
- Duration strings are parsed per: http://strftime.org/
-
- This does not send the user a notification.
- """
- expiration = duration
-
- if await already_has_active_infraction(ctx=ctx, user=user, type="mute"):
- return
-
- infraction = await post_infraction(ctx, user, type="mute", reason=reason, expires_at=expiration, hidden=True)
- if infraction is None:
- return
-
- self.mod_log.ignore(Event.member_update, user.id)
- await user.add_roles(self._muted_role, reason=reason)
-
- infraction_expiration = format_infraction(infraction["expires_at"])
- self.schedule_task(ctx.bot.loop, infraction["id"], infraction)
- await ctx.send(f":ok_hand: muted {user.mention} until {infraction_expiration}.")
-
- await self.mod_log.send_log_message(
- icon_url=Icons.user_mute,
- colour=Colour(Colours.soft_red),
- title="Member temporarily muted",
- thumbnail=user.avatar_url_as(static_format="png"),
- text=textwrap.dedent(f"""
- Member: {user.mention} (`{user.id}`)
- Actor: {ctx.message.author}
- Reason: {reason}
- Expires: {infraction_expiration}
- """),
- footer=f"ID {infraction['id']}"
- )
-
- @with_role(*MODERATION_ROLES)
- @command(hidden=True, aliases=["shadowtempban, stempban"])
- async def shadow_tempban(
- self, ctx: Context, user: UserTypes, duration: Duration, *, reason: str = None
- ) -> None:
- """
- Create a temporary ban infraction for a user with the provided reason.
-
- Duration strings are parsed per: http://strftime.org/
-
- This does not send the user a notification.
- """
- expiration = duration
-
- if not await self.respect_role_hierarchy(ctx, user, 'shadowtempban'):
- # Ensure ctx author has a higher top role than the target user
- # Warning is sent to ctx by the helper method
- return
-
- if await already_has_active_infraction(ctx=ctx, user=user, type="ban"):
- return
-
- infraction = await post_infraction(ctx, user, type="ban", reason=reason, expires_at=expiration, hidden=True)
- if infraction is None:
- return
-
- self.mod_log.ignore(Event.member_ban, user.id)
- self.mod_log.ignore(Event.member_remove, user.id)
-
- try:
- await ctx.guild.ban(user, reason=reason, delete_message_days=0)
- action_result = True
- except Forbidden:
- action_result = False
-
- infraction_expiration = format_infraction(infraction["expires_at"])
- self.schedule_task(ctx.bot.loop, infraction["id"], infraction)
- await ctx.send(f":ok_hand: banned {user.mention} until {infraction_expiration}.")
-
- title = "Member temporarily banned"
- if action_result:
- log_content = None
- else:
- log_content = ctx.author.mention
- title += " (Failed)"
-
- # Send a log message to the mod log
- await self.mod_log.send_log_message(
- icon_url=Icons.user_ban,
- colour=Colour(Colours.soft_red),
- thumbnail=user.avatar_url_as(static_format="png"),
- title=title,
- text=textwrap.dedent(f"""
- Member: {user.mention} (`{user.id}`)
- Actor: {ctx.message.author}
- Reason: {reason}
- Expires: {infraction_expiration}
- """),
- content=log_content,
- footer=f"ID {infraction['id']}"
- )
-
- # endregion
- # region: Remove infractions (un- commands)
-
- @with_role(*MODERATION_ROLES)
- @command()
- async def unmute(self, ctx: Context, user: UserTypes) -> None:
- """Deactivates the active mute infraction for a user."""
- try:
- # check the current active infraction
- response = await self.bot.api_client.get(
- 'bot/infractions',
- params={
- 'active': 'true',
- 'type': 'mute',
- 'user__id': user.id
- }
- )
- if len(response) > 1:
- log.warning("Found more than one active mute infraction for user `%d`", user.id)
-
- if not response:
- # no active infraction
- await ctx.send(
- f":x: There is no active mute infraction for user {user.mention}."
- )
- return
-
- for infraction in response:
- await self._deactivate_infraction(infraction)
- if infraction["expires_at"] is not None:
- self.cancel_expiration(infraction["id"])
-
- notified = await self.notify_pardon(
- user=user,
- title="You have been unmuted.",
- content="You may now send messages in the server.",
- icon_url=Icons.user_unmute
- )
-
- if notified:
- dm_status = "Sent"
- dm_emoji = ":incoming_envelope: "
- log_content = None
- else:
- dm_status = "**Failed**"
- dm_emoji = ""
- log_content = ctx.author.mention
-
- await ctx.send(f"{dm_emoji}:ok_hand: Un-muted {user.mention}.")
-
- embed_text = textwrap.dedent(
- f"""
- Member: {user.mention} (`{user.id}`)
- Actor: {ctx.message.author}
- DM: {dm_status}
- """
- )
-
- if len(response) > 1:
- footer = f"Infraction IDs: {', '.join(str(infr['id']) for infr in response)}"
- title = "Member unmuted"
- embed_text += "Note: User had multiple **active** mute infractions in the database."
- else:
- infraction = response[0]
- footer = f"Infraction ID: {infraction['id']}"
- title = "Member unmuted"
-
- # Send a log message to the mod log
- await self.mod_log.send_log_message(
- icon_url=Icons.user_unmute,
- colour=Colour(Colours.soft_green),
- title=title,
- thumbnail=user.avatar_url_as(static_format="png"),
- text=embed_text,
- footer=footer,
- content=log_content
- )
- except Exception:
- log.exception("There was an error removing an infraction.")
- await ctx.send(":x: There was an error removing the infraction.")
-
- @with_role(*MODERATION_ROLES)
- @command()
- async def unban(self, ctx: Context, user: UserTypes) -> None:
- """Deactivates the active ban infraction for a user."""
- try:
- # check the current active infraction
- response = await self.bot.api_client.get(
- 'bot/infractions',
- params={
- 'active': 'true',
- 'type': 'ban',
- 'user__id': str(user.id)
- }
- )
- if len(response) > 1:
- log.warning(
- "More than one active ban infraction found for user `%d`.",
- user.id
- )
-
- if not response:
- # no active infraction
- await ctx.send(
- f":x: There is no active ban infraction for user {user.mention}."
- )
- return
-
- for infraction in response:
- await self._deactivate_infraction(infraction)
- if infraction["expires_at"] is not None:
- self.cancel_expiration(infraction["id"])
-
- embed_text = textwrap.dedent(
- f"""
- Member: {user.mention} (`{user.id}`)
- Actor: {ctx.message.author}
- """
- )
-
- if len(response) > 1:
- footer = f"Infraction IDs: {', '.join(str(infr['id']) for infr in response)}"
- embed_text += "Note: User had multiple **active** ban infractions in the database."
- else:
- infraction = response[0]
- footer = f"Infraction ID: {infraction['id']}"
-
- await ctx.send(f":ok_hand: Un-banned {user.mention}.")
-
- # Send a log message to the mod log
- await self.mod_log.send_log_message(
- icon_url=Icons.user_unban,
- colour=Colour(Colours.soft_green),
- title="Member unbanned",
- thumbnail=user.avatar_url_as(static_format="png"),
- text=embed_text,
- footer=footer,
- )
- except Exception:
- log.exception("There was an error removing an infraction.")
- await ctx.send(":x: There was an error removing the infraction.")
-
- # endregion
- # region: Edit infraction commands
-
- @with_role(*MODERATION_ROLES)
- @group(name='infraction', aliases=('infr', 'infractions', 'inf'), invoke_without_command=True)
- async def infraction_group(self, ctx: Context) -> None:
- """Infraction manipulation commands."""
- await ctx.invoke(self.bot.get_command("help"), "infraction")
-
- @with_role(*MODERATION_ROLES)
- @infraction_group.command(name='edit')
- async def infraction_edit(
- self,
- ctx: Context,
- infraction_id: int,
- expires_at: Union[Duration, permanent_duration, None],
- *,
- reason: str = None
- ) -> None:
- """
- Edit the duration and/or the reason of an infraction.
-
- Durations are relative to the time of updating.
- Use "permanent" to mark the infraction as permanent.
- """
- if expires_at is None and reason is None:
- # Unlike UserInputError, the error handler will show a specified message for BadArgument
- raise BadArgument("Neither a new expiry nor a new reason was specified.")
-
- # Retrieve the previous infraction for its information.
- old_infraction = await self.bot.api_client.get(f'bot/infractions/{infraction_id}')
-
- request_data = {}
- confirm_messages = []
- log_text = ""
-
- if expires_at == "permanent":
- request_data['expires_at'] = None
- confirm_messages.append("marked as permanent")
- elif expires_at is not None:
- request_data['expires_at'] = expires_at.isoformat()
- confirm_messages.append(f"set to expire on {expires_at.strftime(INFRACTION_FORMAT)}")
- else:
- confirm_messages.append("expiry unchanged")
-
- if reason:
- request_data['reason'] = reason
- confirm_messages.append("set a new reason")
- log_text += f"""
- Previous reason: {old_infraction['reason']}
- New reason: {reason}
- """.rstrip()
- else:
- confirm_messages.append("reason unchanged")
-
- # Update the infraction
- new_infraction = await self.bot.api_client.patch(
- f'bot/infractions/{infraction_id}',
- json=request_data,
- )
-
- # Re-schedule infraction if the expiration has been updated
- if 'expires_at' in request_data:
- self.cancel_task(new_infraction['id'])
- loop = asyncio.get_event_loop()
- self.schedule_task(loop, new_infraction['id'], new_infraction)
-
- log_text += f"""
- Previous expiry: {old_infraction['expires_at'] or "Permanent"}
- New expiry: {new_infraction['expires_at'] or "Permanent"}
- """.rstrip()
-
- await ctx.send(f":ok_hand: Updated infraction: {' & '.join(confirm_messages)}")
-
- # Get information about the infraction's user
- user_id = new_infraction['user']
- user = ctx.guild.get_member(user_id)
-
- if user:
- user_text = f"{user.mention} (`{user.id}`)"
- thumbnail = user.avatar_url_as(static_format="png")
- else:
- user_text = f"`{user_id}`"
- thumbnail = None
-
- # The infraction's actor
- actor_id = new_infraction['actor']
- actor = ctx.guild.get_member(actor_id) or f"`{actor_id}`"
-
- await self.mod_log.send_log_message(
- icon_url=Icons.pencil,
- colour=Colour.blurple(),
- title="Infraction edited",
- thumbnail=thumbnail,
- text=textwrap.dedent(f"""
- Member: {user_text}
- Actor: {actor}
- Edited by: {ctx.message.author}{log_text}
- """)
- )
-
- # endregion
- # region: Search infractions
-
- @with_role(*MODERATION_ROLES)
- @infraction_group.group(name="search", invoke_without_command=True)
- async def infraction_search_group(self, ctx: Context, query: InfractionSearchQuery) -> None:
- """Searches for infractions in the database."""
- if isinstance(query, User):
- await ctx.invoke(self.search_user, query)
-
- else:
- await ctx.invoke(self.search_reason, query)
-
- @with_role(*MODERATION_ROLES)
- @infraction_search_group.command(name="user", aliases=("member", "id"))
- async def search_user(self, ctx: Context, user: Union[User, proxy_user]) -> None:
- """Search for infractions by member."""
- infraction_list = await self.bot.api_client.get(
- 'bot/infractions',
- params={'user__id': str(user.id)}
- )
- embed = Embed(
- title=f"Infractions for {user} ({len(infraction_list)} total)",
- colour=Colour.orange()
- )
- await self.send_infraction_list(ctx, embed, infraction_list)
-
- @with_role(*MODERATION_ROLES)
- @infraction_search_group.command(name="reason", aliases=("match", "regex", "re"))
- async def search_reason(self, ctx: Context, reason: str) -> None:
- """Search for infractions by their reason. Use Re2 for matching."""
- infraction_list = await self.bot.api_client.get(
- 'bot/infractions', params={'search': reason}
- )
- embed = Embed(
- title=f"Infractions matching `{reason}` ({len(infraction_list)} total)",
- colour=Colour.orange()
- )
- await self.send_infraction_list(ctx, embed, infraction_list)
-
- # endregion
- # region: Utility functions
-
- async def send_infraction_list(self, ctx: Context, embed: Embed, infractions: list) -> None:
- """Send a paginated embed of infractions for the specified user."""
- if not infractions:
- await ctx.send(f":warning: No infractions could be found for that query.")
- return
-
- lines = tuple(
- self._infraction_to_string(infraction)
- for infraction in infractions
- )
-
- await LinePaginator.paginate(
- lines,
- ctx=ctx,
- embed=embed,
- empty=True,
- max_lines=3,
- max_size=1000
- )
-
- # endregion
- # region: Utility functions
-
- def schedule_expiration(
- self, loop: asyncio.AbstractEventLoop, infraction_object: Dict[str, Union[str, int, bool]]
- ) -> None:
- """Schedules a task to expire a temporary infraction."""
- infraction_id = infraction_object["id"]
- if infraction_id in self.scheduled_tasks:
- return
-
- task: asyncio.Task = create_task(loop, self._scheduled_expiration(infraction_object))
-
- self.scheduled_tasks[infraction_id] = task
-
- def cancel_expiration(self, infraction_id: str) -> None:
- """Un-schedules a task set to expire a temporary infraction."""
- task = self.scheduled_tasks.get(infraction_id)
- if task is None:
- log.warning(f"Failed to unschedule {infraction_id}: no task found.")
- return
- task.cancel()
- log.debug(f"Unscheduled {infraction_id}.")
- del self.scheduled_tasks[infraction_id]
-
- async def _scheduled_task(self, infraction_object: Dict[str, Union[str, int, bool]]) -> None:
- """
- Marks an infraction expired after the delay from time of scheduling to time of expiration.
-
- At the time of expiration, the infraction is marked as inactive on the website, and the
- expiration task is cancelled. The user is then notified via DM.
- """
- infraction_id = infraction_object["id"]
-
- # transform expiration to delay in seconds
- expiration_datetime = datetime.fromisoformat(infraction_object["expires_at"][:-1])
- await wait_until(expiration_datetime)
-
- log.debug(f"Marking infraction {infraction_id} as inactive (expired).")
- await self._deactivate_infraction(infraction_object)
-
- self.cancel_task(infraction_object["id"])
-
- # Notify the user that they've been unmuted.
- user_id = infraction_object["user"]
- guild = self.bot.get_guild(constants.Guild.id)
- await self.notify_pardon(
- user=guild.get_member(user_id),
- title="You have been unmuted.",
- content="You may now send messages in the server.",
- icon_url=Icons.user_unmute
- )
-
- async def _deactivate_infraction(self, infraction_object: Dict[str, Union[str, int, bool]]) -> None:
- """
- A co-routine which marks an infraction as inactive on the website.
-
- This co-routine does not cancel or un-schedule an expiration task.
- """
- guild: Guild = self.bot.get_guild(constants.Guild.id)
- user_id = infraction_object["user"]
- infraction_type = infraction_object["type"]
-
- await self.bot.api_client.patch(
- 'bot/infractions/' + str(infraction_object['id']),
- json={"active": False}
- )
-
- if infraction_type == "mute":
- member: Member = guild.get_member(user_id)
- if member:
- # remove the mute role
- self.mod_log.ignore(Event.member_update, member.id)
- await member.remove_roles(self._muted_role)
- else:
- log.warning(f"Failed to un-mute user: {user_id} (not found)")
- elif infraction_type == "ban":
- user: Object = Object(user_id)
- try:
- await guild.unban(user)
- except NotFound:
- log.info(f"Tried to unban user `{user_id}`, but Discord does not have an active ban registered.")
-
- def _infraction_to_string(self, infraction_object: Dict[str, Union[str, int, bool]]) -> str:
- """Convert the infraction object to a string representation."""
- actor_id = infraction_object["actor"]
- guild: Guild = self.bot.get_guild(constants.Guild.id)
- actor = guild.get_member(actor_id)
- active = infraction_object["active"]
- user_id = infraction_object["user"]
- hidden = infraction_object["hidden"]
- created = format_infraction(infraction_object["inserted_at"])
- if infraction_object["expires_at"] is None:
- expires = "*Permanent*"
- else:
- expires = format_infraction(infraction_object["expires_at"])
-
- lines = textwrap.dedent(f"""
- {"**===============**" if active else "==============="}
- Status: {"__**Active**__" if active else "Inactive"}
- User: {self.bot.get_user(user_id)} (`{user_id}`)
- Type: **{infraction_object["type"]}**
- Shadow: {hidden}
- Reason: {infraction_object["reason"] or "*None*"}
- Created: {created}
- Expires: {expires}
- Actor: {actor.mention if actor else actor_id}
- ID: `{infraction_object["id"]}`
- {"**===============**" if active else "==============="}
- """)
-
- return lines.strip()
-
- async def notify_infraction(
- self,
- user: Union[User, Member],
- infr_type: str,
- expires_at: Union[datetime, str] = 'N/A',
- reason: str = "No reason provided."
- ) -> bool:
- """
- Attempt to notify a user, via DM, of their fresh infraction.
-
- Returns a boolean indicator of whether the DM was successful.
- """
- if isinstance(expires_at, datetime):
- expires_at = expires_at.strftime(INFRACTION_FORMAT)
-
- embed = Embed(
- description=textwrap.dedent(f"""
- **Type:** {infr_type}
- **Expires:** {expires_at}
- **Reason:** {reason}
- """),
- colour=Colour(Colours.soft_red)
- )
-
- icon_url = INFRACTION_ICONS.get(infr_type, Icons.token_removed)
- embed.set_author(name="Infraction Information", icon_url=icon_url, url=RULES_URL)
- embed.title = f"Please review our rules over at {RULES_URL}"
- embed.url = RULES_URL
-
- if infr_type in APPEALABLE_INFRACTIONS:
- embed.set_footer(text="To appeal this infraction, send an e-mail to [email protected]")
-
- return await self.send_private_embed(user, embed)
-
- async def notify_pardon(
- self,
- user: Union[User, Member],
- title: str,
- content: str,
- icon_url: str = Icons.user_verified
- ) -> bool:
- """
- Attempt to notify a user, via DM, of their expired infraction.
-
- Optionally returns a boolean indicator of whether the DM was successful.
- """
- embed = Embed(
- description=content,
- colour=Colour(Colours.soft_green)
- )
-
- embed.set_author(name=title, icon_url=icon_url)
-
- return await self.send_private_embed(user, embed)
-
- async def send_private_embed(self, user: Union[User, Member], embed: Embed) -> bool:
- """
- A helper method for sending an embed to a user's DMs.
-
- Returns a boolean indicator of DM success.
- """
- # sometimes `user` is a `discord.Object`, so let's make it a proper user.
- user = await self.bot.fetch_user(user.id)
-
- try:
- await user.send(embed=embed)
- return True
- except (HTTPException, Forbidden):
- log.debug(
- f"Infraction-related information could not be sent to user {user} ({user.id}). "
- "They've probably just disabled private messages."
- )
- return False
-
- async def log_notify_failure(self, target: str, actor: Member, infraction_type: str) -> None:
- """Send a mod log entry if an attempt to DM the target user has failed."""
- await self.mod_log.send_log_message(
- icon_url=Icons.token_removed,
- content=actor.mention,
- colour=Colour(Colours.soft_red),
- title="Notification Failed",
- text=(
- f"Direct message was unable to be sent.\nUser: {target.mention}\n"
- f"Type: {infraction_type}"
- )
- )
-
- # endregion
-
- # This cannot be static (must have a __func__ attribute).
- async def cog_command_error(self, ctx: Context, error: Exception) -> None:
- """Send a notification to the invoking context on a Union failure."""
- if isinstance(error, BadUnionArgument):
- if User in error.converters:
- await ctx.send(str(error.errors[0]))
- error.handled = True
-
- @staticmethod
- async def respect_role_hierarchy(ctx: Context, target: UserTypes, infr_type: str) -> bool:
- """
- Check if the highest role of the invoking member is greater than that of the target member.
-
- If this check fails, a warning is sent to the invoking ctx.
-
- Returns True always if target is not a discord.Member instance.
- """
- if not isinstance(target, Member):
- return True
-
- actor = ctx.author
- target_is_lower = target.top_role < actor.top_role
- if not target_is_lower:
- log.info(
- f"{actor} ({actor.id}) attempted to {infr_type} "
- f"{target} ({target.id}), who has an equal or higher top role."
- )
- await ctx.send(
- f":x: {actor.mention}, you may not {infr_type} "
- "someone with an equal or higher top role."
- )
-
- return target_is_lower
-
-
-def setup(bot: Bot) -> None:
- """Moderation cog load."""
- bot.add_cog(Moderation(bot))
- log.info("Cog loaded: Moderation")
diff --git a/bot/cogs/moderation/__init__.py b/bot/cogs/moderation/__init__.py
new file mode 100644
index 000000000..7383ed44e
--- /dev/null
+++ b/bot/cogs/moderation/__init__.py
@@ -0,0 +1,25 @@
+import logging
+
+from discord.ext.commands import Bot
+
+from .infractions import Infractions
+from .management import ModManagement
+from .modlog import ModLog
+from .superstarify import Superstarify
+
+log = logging.getLogger(__name__)
+
+
+def setup(bot: Bot) -> None:
+ """Load the moderation extension (Infractions, ModManagement, ModLog, & Superstarify cogs)."""
+ bot.add_cog(Infractions(bot))
+ log.info("Cog loaded: Infractions")
+
+ bot.add_cog(ModLog(bot))
+ log.info("Cog loaded: ModLog")
+
+ bot.add_cog(ModManagement(bot))
+ log.info("Cog loaded: ModManagement")
+
+ bot.add_cog(Superstarify(bot))
+ log.info("Cog loaded: Superstarify")
diff --git a/bot/cogs/moderation/infractions.py b/bot/cogs/moderation/infractions.py
new file mode 100644
index 000000000..592ead60f
--- /dev/null
+++ b/bot/cogs/moderation/infractions.py
@@ -0,0 +1,607 @@
+import logging
+import textwrap
+import typing as t
+from datetime import datetime
+
+import dateutil.parser
+import discord
+from discord import Member
+from discord.ext import commands
+from discord.ext.commands import Context, command
+
+from bot import constants
+from bot.api import ResponseCodeError
+from bot.constants import Colours, Event
+from bot.decorators import respect_role_hierarchy
+from bot.utils import time
+from bot.utils.checks import with_role_check
+from bot.utils.scheduling import Scheduler
+from . import utils
+from .modlog import ModLog
+from .utils import MemberObject
+
+log = logging.getLogger(__name__)
+
+MemberConverter = t.Union[utils.UserTypes, utils.proxy_user]
+
+
+class Infractions(Scheduler, commands.Cog):
+ """Apply and pardon infractions on users for moderation purposes."""
+
+ category = "Moderation"
+ category_description = "Server moderation tools."
+
+ def __init__(self, bot: commands.Bot):
+ super().__init__()
+
+ self.bot = bot
+ self.category = "Moderation"
+ self._muted_role = discord.Object(constants.Roles.muted)
+
+ self.bot.loop.create_task(self.reschedule_infractions())
+
+ @property
+ def mod_log(self) -> ModLog:
+ """Get currently loaded ModLog cog instance."""
+ return self.bot.get_cog("ModLog")
+
+ async def reschedule_infractions(self) -> None:
+ """Schedule expiration for previous infractions."""
+ await self.bot.wait_until_ready()
+
+ infractions = await self.bot.api_client.get(
+ 'bot/infractions',
+ params={'active': 'true'}
+ )
+ for infraction in infractions:
+ if infraction["expires_at"] is not None:
+ self.schedule_task(self.bot.loop, infraction["id"], infraction)
+
+ @commands.Cog.listener()
+ async def on_member_join(self, member: Member) -> None:
+ """Reapply active mute infractions for returning members."""
+ active_mutes = await self.bot.api_client.get(
+ 'bot/infractions',
+ params={
+ 'user__id': str(member.id),
+ 'type': 'mute',
+ 'active': 'true'
+ }
+ )
+ if not active_mutes:
+ return
+
+ # Assume a single mute because of restrictions elsewhere.
+ mute = active_mutes[0]
+
+ # Calculate the time remaining, in seconds, for the mute.
+ expiry = dateutil.parser.isoparse(mute["expires_at"]).replace(tzinfo=None)
+ delta = (expiry - datetime.utcnow()).total_seconds()
+
+ # Mark as inactive if less than a minute remains.
+ if delta < 60:
+ await self.deactivate_infraction(mute)
+ return
+
+ # Allowing mod log since this is a passive action that should be logged.
+ await member.add_roles(self._muted_role, reason=f"Re-applying active mute: {mute['id']}")
+ log.debug(f"User {member.id} has been re-muted on rejoin.")
+
+ # region: Permanent infractions
+
+ @command()
+ async def warn(self, ctx: Context, user: Member, *, reason: str = None) -> None:
+ """Warn a user for the given reason."""
+ infraction = await utils.post_infraction(ctx, user, "warning", reason, active=False)
+ if infraction is None:
+ return
+
+ await self.apply_infraction(ctx, infraction, user)
+
+ @command()
+ async def kick(self, ctx: Context, user: Member, *, reason: str = None) -> None:
+ """Kick a user for the given reason."""
+ await self.apply_kick(ctx, user, reason, active=False)
+
+ @command()
+ async def ban(self, ctx: Context, user: MemberConverter, *, reason: str = None) -> None:
+ """Permanently ban a user for the given reason."""
+ await self.apply_ban(ctx, user, reason)
+
+ # endregion
+ # region: Temporary infractions
+
+ @command(aliases=["mute"])
+ async def tempmute(self, ctx: Context, user: Member, duration: utils.Expiry, *, reason: str = None) -> None:
+ """
+ Temporarily mute a user for the given reason and duration.
+
+ A unit of time should be appended to the duration.
+ Units (∗case-sensitive):
+ \u2003`y` - years
+ \u2003`m` - months∗
+ \u2003`w` - weeks
+ \u2003`d` - days
+ \u2003`h` - hours
+ \u2003`M` - minutes∗
+ \u2003`s` - seconds
+
+ Alternatively, an ISO 8601 timestamp can be provided for the duration.
+ """
+ await self.apply_mute(ctx, user, reason, expires_at=duration)
+
+ @command()
+ async def tempban(self, ctx: Context, user: MemberConverter, duration: utils.Expiry, *, reason: str = None) -> None:
+ """
+ Temporarily ban a user for the given reason and duration.
+
+ A unit of time should be appended to the duration.
+ Units (∗case-sensitive):
+ \u2003`y` - years
+ \u2003`m` - months∗
+ \u2003`w` - weeks
+ \u2003`d` - days
+ \u2003`h` - hours
+ \u2003`M` - minutes∗
+ \u2003`s` - seconds
+
+ Alternatively, an ISO 8601 timestamp can be provided for the duration.
+ """
+ await self.apply_ban(ctx, user, reason, expires_at=duration)
+
+ # endregion
+ # region: Permanent shadow infractions
+
+ @command(hidden=True)
+ async def note(self, ctx: Context, user: MemberConverter, *, reason: str = None) -> None:
+ """Create a private note for a user with the given reason without notifying the user."""
+ infraction = await utils.post_infraction(ctx, user, "note", reason, hidden=True, active=False)
+ if infraction is None:
+ return
+
+ await self.apply_infraction(ctx, infraction, user)
+
+ @command(hidden=True, aliases=['shadowkick', 'skick'])
+ async def shadow_kick(self, ctx: Context, user: Member, *, reason: str = None) -> None:
+ """Kick a user for the given reason without notifying the user."""
+ await self.apply_kick(ctx, user, reason, hidden=True, active=False)
+
+ @command(hidden=True, aliases=['shadowban', 'sban'])
+ async def shadow_ban(self, ctx: Context, user: MemberConverter, *, reason: str = None) -> None:
+ """Permanently ban a user for the given reason without notifying the user."""
+ await self.apply_ban(ctx, user, reason, hidden=True)
+
+ # endregion
+ # region: Temporary shadow infractions
+
+ @command(hidden=True, aliases=["shadowtempmute, stempmute", "shadowmute", "smute"])
+ async def shadow_tempmute(self, ctx: Context, user: Member, duration: utils.Expiry, *, reason: str = None) -> None:
+ """
+ Temporarily mute a user for the given reason and duration without notifying the user.
+
+ A unit of time should be appended to the duration.
+ Units (∗case-sensitive):
+ \u2003`y` - years
+ \u2003`m` - months∗
+ \u2003`w` - weeks
+ \u2003`d` - days
+ \u2003`h` - hours
+ \u2003`M` - minutes∗
+ \u2003`s` - seconds
+
+ Alternatively, an ISO 8601 timestamp can be provided for the duration.
+ """
+ await self.apply_mute(ctx, user, reason, expires_at=duration, hidden=True)
+
+ @command(hidden=True, aliases=["shadowtempban, stempban"])
+ async def shadow_tempban(
+ self,
+ ctx: Context,
+ user: MemberConverter,
+ duration: utils.Expiry,
+ *,
+ reason: str = None
+ ) -> None:
+ """
+ Temporarily ban a user for the given reason and duration without notifying the user.
+
+ A unit of time should be appended to the duration.
+ Units (∗case-sensitive):
+ \u2003`y` - years
+ \u2003`m` - months∗
+ \u2003`w` - weeks
+ \u2003`d` - days
+ \u2003`h` - hours
+ \u2003`M` - minutes∗
+ \u2003`s` - seconds
+
+ Alternatively, an ISO 8601 timestamp can be provided for the duration.
+ """
+ await self.apply_ban(ctx, user, reason, expires_at=duration, hidden=True)
+
+ # endregion
+ # region: Remove infractions (un- commands)
+
+ @command()
+ async def unmute(self, ctx: Context, user: MemberConverter) -> None:
+ """Prematurely end the active mute infraction for the user."""
+ await self.pardon_infraction(ctx, "mute", user)
+
+ @command()
+ async def unban(self, ctx: Context, user: MemberConverter) -> None:
+ """Prematurely end the active ban infraction for the user."""
+ await self.pardon_infraction(ctx, "ban", user)
+
+ # endregion
+ # region: Base infraction functions
+
+ async def apply_mute(self, ctx: Context, user: Member, reason: str, **kwargs) -> None:
+ """Apply a mute infraction with kwargs passed to `post_infraction`."""
+ if await utils.has_active_infraction(ctx, user, "mute"):
+ return
+
+ infraction = await utils.post_infraction(ctx, user, "mute", reason, **kwargs)
+ if infraction is None:
+ return
+
+ self.mod_log.ignore(Event.member_update, user.id)
+
+ action = user.add_roles(self._muted_role, reason=reason)
+ await self.apply_infraction(ctx, infraction, user, action)
+
+ @respect_role_hierarchy()
+ async def apply_kick(self, ctx: Context, user: Member, reason: str, **kwargs) -> None:
+ """Apply a kick infraction with kwargs passed to `post_infraction`."""
+ infraction = await utils.post_infraction(ctx, user, "kick", reason, **kwargs)
+ if infraction is None:
+ return
+
+ self.mod_log.ignore(Event.member_remove, user.id)
+
+ action = user.kick(reason=reason)
+ await self.apply_infraction(ctx, infraction, user, action)
+
+ @respect_role_hierarchy()
+ async def apply_ban(self, ctx: Context, user: MemberObject, reason: str, **kwargs) -> None:
+ """Apply a ban infraction with kwargs passed to `post_infraction`."""
+ if await utils.has_active_infraction(ctx, user, "ban"):
+ return
+
+ infraction = await utils.post_infraction(ctx, user, "ban", reason, **kwargs)
+ if infraction is None:
+ return
+
+ self.mod_log.ignore(Event.member_remove, user.id)
+
+ action = ctx.guild.ban(user, reason=reason, delete_message_days=0)
+ await self.apply_infraction(ctx, infraction, user, action)
+
+ # endregion
+ # region: Utility functions
+
+ async def _scheduled_task(self, infraction: utils.Infraction) -> None:
+ """
+ Marks an infraction expired after the delay from time of scheduling to time of expiration.
+
+ At the time of expiration, the infraction is marked as inactive on the website and the
+ expiration task is cancelled.
+ """
+ _id = infraction["id"]
+
+ expiry = dateutil.parser.isoparse(infraction["expires_at"]).replace(tzinfo=None)
+ await time.wait_until(expiry)
+
+ log.debug(f"Marking infraction {_id} as inactive (expired).")
+ await self.deactivate_infraction(infraction)
+
+ async def deactivate_infraction(
+ self,
+ infraction: utils.Infraction,
+ send_log: bool = True
+ ) -> t.Dict[str, str]:
+ """
+ Deactivate an active infraction and return a dictionary of lines to send in a mod log.
+
+ The infraction is removed from Discord, marked as inactive in the database, and has its
+ expiration task cancelled. If `send_log` is True, a mod log is sent for the
+ deactivation of the infraction.
+
+ Supported infraction types are mute and ban. Other types will raise a ValueError.
+ """
+ guild = self.bot.get_guild(constants.Guild.id)
+ mod_role = guild.get_role(constants.Roles.moderator)
+ user_id = infraction["user"]
+ _type = infraction["type"]
+ _id = infraction["id"]
+ reason = f"Infraction #{_id} expired or was pardoned."
+
+ log.debug(f"Marking infraction #{_id} as inactive (expired).")
+
+ log_content = None
+ log_text = {
+ "Member": str(user_id),
+ "Actor": str(self.bot.user),
+ "Reason": infraction["reason"]
+ }
+
+ try:
+ if _type == "mute":
+ user = guild.get_member(user_id)
+ if user:
+ # Remove the muted role.
+ self.mod_log.ignore(Event.member_update, user.id)
+ await user.remove_roles(self._muted_role, reason=reason)
+
+ # DM the user about the expiration.
+ notified = await utils.notify_pardon(
+ user=user,
+ title="You have been unmuted.",
+ content="You may now send messages in the server.",
+ icon_url=utils.INFRACTION_ICONS["mute"][1]
+ )
+
+ log_text["Member"] = f"{user.mention}(`{user.id}`)"
+ log_text["DM"] = "Sent" if notified else "**Failed**"
+ else:
+ log.info(f"Failed to unmute user {user_id}: user not found")
+ log_text["Failure"] = "User was not found in the guild."
+ elif _type == "ban":
+ user = discord.Object(user_id)
+ self.mod_log.ignore(Event.member_unban, user_id)
+ try:
+ await guild.unban(user, reason=reason)
+ except discord.NotFound:
+ log.info(f"Failed to unban user {user_id}: no active ban found on Discord")
+ log_text["Note"] = "No active ban found on Discord."
+ else:
+ raise ValueError(
+ f"Attempted to deactivate an unsupported infraction #{_id} ({_type})!"
+ )
+ except discord.Forbidden:
+ log.warning(f"Failed to deactivate infraction #{_id} ({_type}): bot lacks permissions")
+ log_text["Failure"] = f"The bot lacks permissions to do this (role hierarchy?)"
+ log_content = mod_role.mention
+ except discord.HTTPException as e:
+ log.exception(f"Failed to deactivate infraction #{_id} ({_type})")
+ log_text["Failure"] = f"HTTPException with code {e.code}."
+ log_content = mod_role.mention
+
+ # Check if the user is currently being watched by Big Brother.
+ try:
+ active_watch = await self.bot.api_client.get(
+ "bot/infractions",
+ params={
+ "active": "true",
+ "type": "watch",
+ "user__id": user_id
+ }
+ )
+
+ log_text["Watching"] = "Yes" if active_watch else "No"
+ except ResponseCodeError:
+ log.exception(f"Failed to fetch watch status for user {user_id}")
+ log_text["Watching"] = "Unknown - failed to fetch watch status."
+
+ try:
+ # Mark infraction as inactive in the database.
+ await self.bot.api_client.patch(
+ f"bot/infractions/{_id}",
+ json={"active": False}
+ )
+ except ResponseCodeError as e:
+ log.exception(f"Failed to deactivate infraction #{_id} ({_type})")
+ log_line = f"API request failed with code {e.status}."
+ log_content = mod_role.mention
+
+ # Append to an existing failure message if possible
+ if "Failure" in log_text:
+ log_text["Failure"] += f" {log_line}"
+ else:
+ log_text["Failure"] = log_line
+
+ # Cancel the expiration task.
+ if infraction["expires_at"] is not None:
+ self.cancel_task(infraction["id"])
+
+ # Send a log message to the mod log.
+ if send_log:
+ log_title = f"expiration failed" if "Failure" in log_text else "expired"
+
+ await self.mod_log.send_log_message(
+ icon_url=utils.INFRACTION_ICONS[_type][1],
+ colour=Colours.soft_green,
+ title=f"Infraction {log_title}: {_type}",
+ text="\n".join(f"{k}: {v}" for k, v in log_text.items()),
+ footer=f"ID: {_id}",
+ content=log_content,
+ )
+
+ return log_text
+
+ async def apply_infraction(
+ self,
+ ctx: Context,
+ infraction: utils.Infraction,
+ user: MemberObject,
+ action_coro: t.Optional[t.Awaitable] = None
+ ) -> None:
+ """Apply an infraction to the user, log the infraction, and optionally notify the user."""
+ infr_type = infraction["type"]
+ icon = utils.INFRACTION_ICONS[infr_type][0]
+ reason = infraction["reason"]
+ expiry = infraction["expires_at"]
+
+ if expiry:
+ expiry = time.format_infraction(expiry)
+
+ # Default values for the confirmation message and mod log.
+ confirm_msg = f":ok_hand: applied"
+ expiry_msg = f" until {expiry}" if expiry else " permanently"
+ dm_result = ""
+ dm_log_text = ""
+ expiry_log_text = f"Expires: {expiry}" if expiry else ""
+ log_title = "applied"
+ log_content = None
+
+ # DM the user about the infraction if it's not a shadow/hidden infraction.
+ if not infraction["hidden"]:
+ # Sometimes user is a discord.Object; make it a proper user.
+ await self.bot.fetch_user(user.id)
+
+ # Accordingly display whether the user was successfully notified via DM.
+ if await utils.notify_infraction(user, infr_type, expiry, reason, icon):
+ dm_result = ":incoming_envelope: "
+ dm_log_text = "\nDM: Sent"
+ else:
+ dm_log_text = "\nDM: **Failed**"
+ log_content = ctx.author.mention
+
+ if infraction["actor"] == self.bot.user.id:
+ end_msg = f" (reason: {infraction['reason']})"
+ else:
+ infractions = await self.bot.api_client.get(
+ "bot/infractions",
+ params={"user__id": str(user.id)}
+ )
+ end_msg = f" ({len(infractions)} infractions total)"
+
+ # Execute the necessary actions to apply the infraction on Discord.
+ if action_coro:
+ try:
+ await action_coro
+ if expiry:
+ # Schedule the expiration of the infraction.
+ self.schedule_task(ctx.bot.loop, infraction["id"], infraction)
+ except discord.Forbidden:
+ # Accordingly display that applying the infraction failed.
+ confirm_msg = f":x: failed to apply"
+ expiry_msg = ""
+ log_content = ctx.author.mention
+ log_title = "failed to apply"
+
+ # Send a confirmation message to the invoking context.
+ await ctx.send(
+ f"{dm_result}{confirm_msg} **{infr_type}** to {user.mention}{expiry_msg}{end_msg}."
+ )
+
+ # Send a log message to the mod log.
+ await self.mod_log.send_log_message(
+ icon_url=icon,
+ colour=Colours.soft_red,
+ title=f"Infraction {log_title}: {infr_type}",
+ thumbnail=user.avatar_url_as(static_format="png"),
+ text=textwrap.dedent(f"""
+ Member: {user.mention} (`{user.id}`)
+ Actor: {ctx.message.author}{dm_log_text}
+ Reason: {reason}
+ {expiry_log_text}
+ """),
+ content=log_content,
+ footer=f"ID {infraction['id']}"
+ )
+
+ async def pardon_infraction(self, ctx: Context, infr_type: str, user: MemberObject) -> None:
+ """Prematurely end an infraction for a user and log the action in the mod log."""
+ # Check the current active infraction
+ response = await self.bot.api_client.get(
+ 'bot/infractions',
+ params={
+ 'active': 'true',
+ 'type': infr_type,
+ 'user__id': user.id
+ }
+ )
+
+ if not response:
+ await ctx.send(f":x: There's no active {infr_type} infraction for user {user.mention}.")
+ return
+
+ # Deactivate the infraction and cancel its scheduled expiration task.
+ log_text = await self.deactivate_infraction(response[0], send_log=False)
+
+ log_text["Member"] = f"{user.mention}(`{user.id}`)"
+ log_text["Actor"] = str(ctx.message.author)
+ log_content = None
+ footer = f"ID: {response[0]['id']}"
+
+ # If multiple active infractions were found, mark them as inactive in the database
+ # and cancel their expiration tasks.
+ if len(response) > 1:
+ log.warning(f"Found more than one active {infr_type} infraction for user {user.id}")
+
+ footer = f"Infraction IDs: {', '.join(str(infr['id']) for infr in response)}"
+
+ log_note = f"Found multiple **active** {infr_type} infractions in the database."
+ if "Note" in log_text:
+ log_text["Note"] = f" {log_note}"
+ else:
+ log_text["Note"] = log_note
+
+ # deactivate_infraction() is not called again because:
+ # 1. Discord cannot store multiple active bans or assign multiples of the same role
+ # 2. It would send a pardon DM for each active infraction, which is redundant
+ for infraction in response[1:]:
+ _id = infraction['id']
+ try:
+ # Mark infraction as inactive in the database.
+ await self.bot.api_client.patch(
+ f"bot/infractions/{_id}",
+ json={"active": False}
+ )
+ except ResponseCodeError:
+ log.exception(f"Failed to deactivate infraction #{_id} ({infr_type})")
+ # This is simpler and cleaner than trying to concatenate all the errors.
+ log_text["Failure"] = "See bot's logs for details."
+
+ # Cancel pending expiration task.
+ if infraction["expires_at"] is not None:
+ self.cancel_task(infraction["id"])
+
+ # Accordingly display whether the user was successfully notified via DM.
+ dm_emoji = ""
+ if log_text.get("DM") == "Sent":
+ dm_emoji = ":incoming_envelope: "
+ elif "DM" in log_text:
+ # Mention the actor because the DM failed to send.
+ log_content = ctx.author.mention
+
+ # Accordingly display whether the pardon failed.
+ if "Failure" in log_text:
+ confirm_msg = ":x: failed to pardon"
+ log_title = "pardon failed"
+ log_content = ctx.author.mention
+ else:
+ confirm_msg = f":ok_hand: pardoned"
+ log_title = "pardoned"
+
+ # Send a confirmation message to the invoking context.
+ await ctx.send(
+ f"{dm_emoji}{confirm_msg} infraction **{infr_type}** for {user.mention}. "
+ f"{log_text.get('Failure', '')}"
+ )
+
+ # Send a log message to the mod log.
+ await self.mod_log.send_log_message(
+ icon_url=utils.INFRACTION_ICONS[infr_type][1],
+ colour=Colours.soft_green,
+ title=f"Infraction {log_title}: {infr_type}",
+ thumbnail=user.avatar_url_as(static_format="png"),
+ text="\n".join(f"{k}: {v}" for k, v in log_text.items()),
+ footer=footer,
+ content=log_content,
+ )
+
+ # endregion
+
+ # This cannot be static (must have a __func__ attribute).
+ def cog_check(self, ctx: Context) -> bool:
+ """Only allow moderators to invoke the commands in this cog."""
+ return with_role_check(ctx, *constants.MODERATION_ROLES)
+
+ # This cannot be static (must have a __func__ attribute).
+ async def cog_command_error(self, ctx: Context, error: Exception) -> None:
+ """Send a notification to the invoking context on a Union failure."""
+ if isinstance(error, commands.BadUnionArgument):
+ if discord.User in error.converters:
+ await ctx.send(str(error.errors[0]))
+ error.handled = True
diff --git a/bot/cogs/moderation/management.py b/bot/cogs/moderation/management.py
new file mode 100644
index 000000000..491f6d400
--- /dev/null
+++ b/bot/cogs/moderation/management.py
@@ -0,0 +1,268 @@
+import asyncio
+import logging
+import textwrap
+import typing as t
+
+import discord
+from discord.ext import commands
+from discord.ext.commands import Context
+
+from bot import constants
+from bot.converters import InfractionSearchQuery
+from bot.pagination import LinePaginator
+from bot.utils import time
+from bot.utils.checks import with_role_check
+from . import utils
+from .infractions import Infractions
+from .modlog import ModLog
+
+log = logging.getLogger(__name__)
+
+UserConverter = t.Union[discord.User, utils.proxy_user]
+
+
+def permanent_duration(expires_at: str) -> str:
+ """Only allow an expiration to be 'permanent' if it is a string."""
+ expires_at = expires_at.lower()
+ if expires_at != "permanent":
+ raise commands.BadArgument
+ else:
+ return expires_at
+
+
+class ModManagement(commands.Cog):
+ """Management of infractions."""
+
+ category = "Moderation"
+
+ def __init__(self, bot: commands.Bot):
+ self.bot = bot
+
+ @property
+ def mod_log(self) -> ModLog:
+ """Get currently loaded ModLog cog instance."""
+ return self.bot.get_cog("ModLog")
+
+ @property
+ def infractions_cog(self) -> Infractions:
+ """Get currently loaded Infractions cog instance."""
+ return self.bot.get_cog("Infractions")
+
+ # region: Edit infraction commands
+
+ @commands.group(name='infraction', aliases=('infr', 'infractions', 'inf'), invoke_without_command=True)
+ async def infraction_group(self, ctx: Context) -> None:
+ """Infraction manipulation commands."""
+ await ctx.invoke(self.bot.get_command("help"), "infraction")
+
+ @infraction_group.command(name='edit')
+ async def infraction_edit(
+ self,
+ ctx: Context,
+ infraction_id: int,
+ duration: t.Union[utils.Expiry, permanent_duration, None],
+ *,
+ reason: str = None
+ ) -> None:
+ """
+ Edit the duration and/or the reason of an infraction.
+
+ Durations are relative to the time of updating and should be appended with a unit of time.
+ Units (∗case-sensitive):
+ \u2003`y` - years
+ \u2003`m` - months∗
+ \u2003`w` - weeks
+ \u2003`d` - days
+ \u2003`h` - hours
+ \u2003`M` - minutes∗
+ \u2003`s` - seconds
+
+ Use "permanent" to mark the infraction as permanent. Alternatively, an ISO 8601 timestamp
+ can be provided for the duration.
+ """
+ if duration is None and reason is None:
+ # Unlike UserInputError, the error handler will show a specified message for BadArgument
+ raise commands.BadArgument("Neither a new expiry nor a new reason was specified.")
+
+ # Retrieve the previous infraction for its information.
+ old_infraction = await self.bot.api_client.get(f'bot/infractions/{infraction_id}')
+
+ request_data = {}
+ confirm_messages = []
+ log_text = ""
+
+ if duration == "permanent":
+ request_data['expires_at'] = None
+ confirm_messages.append("marked as permanent")
+ elif duration is not None:
+ request_data['expires_at'] = duration.isoformat()
+ expiry = duration.strftime(time.INFRACTION_FORMAT)
+ confirm_messages.append(f"set to expire on {expiry}")
+ else:
+ confirm_messages.append("expiry unchanged")
+
+ if reason:
+ request_data['reason'] = reason
+ confirm_messages.append("set a new reason")
+ log_text += f"""
+ Previous reason: {old_infraction['reason']}
+ New reason: {reason}
+ """.rstrip()
+ else:
+ confirm_messages.append("reason unchanged")
+
+ # Update the infraction
+ new_infraction = await self.bot.api_client.patch(
+ f'bot/infractions/{infraction_id}',
+ json=request_data,
+ )
+
+ # Re-schedule infraction if the expiration has been updated
+ if 'expires_at' in request_data:
+ self.infractions_cog.cancel_task(new_infraction['id'])
+ loop = asyncio.get_event_loop()
+ self.infractions_cog.schedule_task(loop, new_infraction['id'], new_infraction)
+
+ log_text += f"""
+ Previous expiry: {old_infraction['expires_at'] or "Permanent"}
+ New expiry: {new_infraction['expires_at'] or "Permanent"}
+ """.rstrip()
+
+ await ctx.send(f":ok_hand: Updated infraction: {' & '.join(confirm_messages)}")
+
+ # Get information about the infraction's user
+ user_id = new_infraction['user']
+ user = ctx.guild.get_member(user_id)
+
+ if user:
+ user_text = f"{user.mention} (`{user.id}`)"
+ thumbnail = user.avatar_url_as(static_format="png")
+ else:
+ user_text = f"`{user_id}`"
+ thumbnail = None
+
+ # The infraction's actor
+ actor_id = new_infraction['actor']
+ actor = ctx.guild.get_member(actor_id) or f"`{actor_id}`"
+
+ await self.mod_log.send_log_message(
+ icon_url=constants.Icons.pencil,
+ colour=discord.Colour.blurple(),
+ title="Infraction edited",
+ thumbnail=thumbnail,
+ text=textwrap.dedent(f"""
+ Member: {user_text}
+ Actor: {actor}
+ Edited by: {ctx.message.author}{log_text}
+ """)
+ )
+
+ # endregion
+ # region: Search infractions
+
+ @infraction_group.group(name="search", invoke_without_command=True)
+ async def infraction_search_group(self, ctx: Context, query: InfractionSearchQuery) -> None:
+ """Searches for infractions in the database."""
+ if isinstance(query, discord.User):
+ await ctx.invoke(self.search_user, query)
+ else:
+ await ctx.invoke(self.search_reason, query)
+
+ @infraction_search_group.command(name="user", aliases=("member", "id"))
+ async def search_user(self, ctx: Context, user: UserConverter) -> None:
+ """Search for infractions by member."""
+ infraction_list = await self.bot.api_client.get(
+ 'bot/infractions',
+ params={'user__id': str(user.id)}
+ )
+ embed = discord.Embed(
+ title=f"Infractions for {user} ({len(infraction_list)} total)",
+ colour=discord.Colour.orange()
+ )
+ await self.send_infraction_list(ctx, embed, infraction_list)
+
+ @infraction_search_group.command(name="reason", aliases=("match", "regex", "re"))
+ async def search_reason(self, ctx: Context, reason: str) -> None:
+ """Search for infractions by their reason. Use Re2 for matching."""
+ infraction_list = await self.bot.api_client.get(
+ 'bot/infractions',
+ params={'search': reason}
+ )
+ embed = discord.Embed(
+ title=f"Infractions matching `{reason}` ({len(infraction_list)} total)",
+ colour=discord.Colour.orange()
+ )
+ await self.send_infraction_list(ctx, embed, infraction_list)
+
+ # endregion
+ # region: Utility functions
+
+ async def send_infraction_list(
+ self,
+ ctx: Context,
+ embed: discord.Embed,
+ infractions: t.Iterable[utils.Infraction]
+ ) -> None:
+ """Send a paginated embed of infractions for the specified user."""
+ if not infractions:
+ await ctx.send(f":warning: No infractions could be found for that query.")
+ return
+
+ lines = tuple(
+ self.infraction_to_string(infraction)
+ for infraction in infractions
+ )
+
+ await LinePaginator.paginate(
+ lines,
+ ctx=ctx,
+ embed=embed,
+ empty=True,
+ max_lines=3,
+ max_size=1000
+ )
+
+ def infraction_to_string(self, infraction: utils.Infraction) -> str:
+ """Convert the infraction object to a string representation."""
+ actor_id = infraction["actor"]
+ guild = self.bot.get_guild(constants.Guild.id)
+ actor = guild.get_member(actor_id)
+ active = infraction["active"]
+ user_id = infraction["user"]
+ hidden = infraction["hidden"]
+ created = time.format_infraction(infraction["inserted_at"])
+ if infraction["expires_at"] is None:
+ expires = "*Permanent*"
+ else:
+ expires = time.format_infraction(infraction["expires_at"])
+
+ lines = textwrap.dedent(f"""
+ {"**===============**" if active else "==============="}
+ Status: {"__**Active**__" if active else "Inactive"}
+ User: {self.bot.get_user(user_id)} (`{user_id}`)
+ Type: **{infraction["type"]}**
+ Shadow: {hidden}
+ Reason: {infraction["reason"] or "*None*"}
+ Created: {created}
+ Expires: {expires}
+ Actor: {actor.mention if actor else actor_id}
+ ID: `{infraction["id"]}`
+ {"**===============**" if active else "==============="}
+ """)
+
+ return lines.strip()
+
+ # endregion
+
+ # This cannot be static (must have a __func__ attribute).
+ def cog_check(self, ctx: Context) -> bool:
+ """Only allow moderators to invoke the commands in this cog."""
+ return with_role_check(ctx, *constants.MODERATION_ROLES)
+
+ # This cannot be static (must have a __func__ attribute).
+ async def cog_command_error(self, ctx: Context, error: Exception) -> None:
+ """Send a notification to the invoking context on a Union failure."""
+ if isinstance(error, commands.BadUnionArgument):
+ if discord.User in error.converters:
+ await ctx.send(str(error.errors[0]))
+ error.handled = True
diff --git a/bot/cogs/modlog.py b/bot/cogs/moderation/modlog.py
index 68424d268..118503517 100644
--- a/bot/cogs/modlog.py
+++ b/bot/cogs/moderation/modlog.py
@@ -1,30 +1,26 @@
import asyncio
import logging
+import typing as t
from datetime import datetime
-from typing import List, Optional, Union
+import discord
from dateutil.relativedelta import relativedelta
from deepdiff import DeepDiff
-from discord import (
- CategoryChannel, Colour, Embed, File, Guild,
- Member, Message, NotFound, RawMessageDeleteEvent,
- RawMessageUpdateEvent, Role, TextChannel, User, VoiceChannel
-)
+from discord import Colour
from discord.abc import GuildChannel
from discord.ext.commands import Bot, Cog, Context
-from bot.constants import (
- Channels, Colours, Emojis, Event, Guild as GuildConstant, Icons, URLs
-)
+from bot.constants import Channels, Colours, Emojis, Event, Guild as GuildConstant, Icons, URLs
from bot.utils.time import humanize_delta
+from .utils import UserTypes
log = logging.getLogger(__name__)
-GUILD_CHANNEL = Union[CategoryChannel, TextChannel, VoiceChannel]
+GUILD_CHANNEL = t.Union[discord.CategoryChannel, discord.TextChannel, discord.VoiceChannel]
CHANNEL_CHANGES_UNSUPPORTED = ("permissions",)
CHANNEL_CHANGES_SUPPRESSED = ("_overwrites", "position")
-MEMBER_CHANGES_SUPPRESSED = ("status", "activities", "_client_status")
+MEMBER_CHANGES_SUPPRESSED = ("status", "activities", "_client_status", "nick")
ROLE_CHANGES_UNSUPPORTED = ("colour", "permissions")
@@ -38,7 +34,7 @@ class ModLog(Cog, name="ModLog"):
self._cached_deletes = []
self._cached_edits = []
- async def upload_log(self, messages: List[Message], actor_id: int) -> str:
+ async def upload_log(self, messages: t.List[discord.Message], actor_id: int) -> str:
"""
Uploads the log data to the database via an API endpoint for uploading logs.
@@ -73,23 +69,23 @@ class ModLog(Cog, name="ModLog"):
self._ignored[event].append(item)
async def send_log_message(
- self,
- icon_url: Optional[str],
- colour: Colour,
- title: Optional[str],
- text: str,
- thumbnail: Optional[str] = None,
- channel_id: int = Channels.modlog,
- ping_everyone: bool = False,
- files: Optional[List[File]] = None,
- content: Optional[str] = None,
- additional_embeds: Optional[List[Embed]] = None,
- additional_embeds_msg: Optional[str] = None,
- timestamp_override: Optional[datetime] = None,
- footer: Optional[str] = None,
+ self,
+ icon_url: t.Optional[str],
+ colour: t.Union[discord.Colour, int],
+ title: t.Optional[str],
+ text: str,
+ thumbnail: t.Optional[t.Union[str, discord.Asset]] = None,
+ channel_id: int = Channels.modlog,
+ ping_everyone: bool = False,
+ files: t.Optional[t.List[discord.File]] = None,
+ content: t.Optional[str] = None,
+ additional_embeds: t.Optional[t.List[discord.Embed]] = None,
+ additional_embeds_msg: t.Optional[str] = None,
+ timestamp_override: t.Optional[datetime] = None,
+ footer: t.Optional[str] = None,
) -> Context:
"""Generate log embed and send to logging channel."""
- embed = Embed(description=text)
+ embed = discord.Embed(description=text)
if title and icon_url:
embed.set_author(name=title, icon_url=icon_url)
@@ -126,10 +122,10 @@ class ModLog(Cog, name="ModLog"):
if channel.guild.id != GuildConstant.id:
return
- if isinstance(channel, CategoryChannel):
+ if isinstance(channel, discord.CategoryChannel):
title = "Category created"
message = f"{channel.name} (`{channel.id}`)"
- elif isinstance(channel, VoiceChannel):
+ elif isinstance(channel, discord.VoiceChannel):
title = "Voice channel created"
if channel.category:
@@ -144,7 +140,7 @@ class ModLog(Cog, name="ModLog"):
else:
message = f"{channel.name} (`{channel.id}`)"
- await self.send_log_message(Icons.hash_green, Colour(Colours.soft_green), title, message)
+ await self.send_log_message(Icons.hash_green, Colours.soft_green, title, message)
@Cog.listener()
async def on_guild_channel_delete(self, channel: GUILD_CHANNEL) -> None:
@@ -152,20 +148,20 @@ class ModLog(Cog, name="ModLog"):
if channel.guild.id != GuildConstant.id:
return
- if isinstance(channel, CategoryChannel):
+ if isinstance(channel, discord.CategoryChannel):
title = "Category deleted"
- elif isinstance(channel, VoiceChannel):
+ elif isinstance(channel, discord.VoiceChannel):
title = "Voice channel deleted"
else:
title = "Text channel deleted"
- if channel.category and not isinstance(channel, CategoryChannel):
+ if channel.category and not isinstance(channel, discord.CategoryChannel):
message = f"{channel.category}/{channel.name} (`{channel.id}`)"
else:
message = f"{channel.name} (`{channel.id}`)"
await self.send_log_message(
- Icons.hash_red, Colour(Colours.soft_red),
+ Icons.hash_red, Colours.soft_red,
title, message
)
@@ -230,29 +226,29 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_guild_role_create(self, role: Role) -> None:
+ async def on_guild_role_create(self, role: discord.Role) -> None:
"""Log role create event to mod log."""
if role.guild.id != GuildConstant.id:
return
await self.send_log_message(
- Icons.crown_green, Colour(Colours.soft_green),
+ Icons.crown_green, Colours.soft_green,
"Role created", f"`{role.id}`"
)
@Cog.listener()
- async def on_guild_role_delete(self, role: Role) -> None:
+ async def on_guild_role_delete(self, role: discord.Role) -> None:
"""Log role delete event to mod log."""
if role.guild.id != GuildConstant.id:
return
await self.send_log_message(
- Icons.crown_red, Colour(Colours.soft_red),
+ Icons.crown_red, Colours.soft_red,
"Role removed", f"{role.name} (`{role.id}`)"
)
@Cog.listener()
- async def on_guild_role_update(self, before: Role, after: Role) -> None:
+ async def on_guild_role_update(self, before: discord.Role, after: discord.Role) -> None:
"""Log role update event to mod log."""
if before.guild.id != GuildConstant.id:
return
@@ -305,7 +301,7 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_guild_update(self, before: Guild, after: Guild) -> None:
+ async def on_guild_update(self, before: discord.Guild, after: discord.Guild) -> None:
"""Log guild update event to mod log."""
if before.id != GuildConstant.id:
return
@@ -356,8 +352,8 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_member_ban(self, guild: Guild, member: Union[Member, User]) -> None:
- """Log ban event to mod log."""
+ async def on_member_ban(self, guild: discord.Guild, member: UserTypes) -> None:
+ """Log ban event to user log."""
if guild.id != GuildConstant.id:
return
@@ -366,14 +362,14 @@ class ModLog(Cog, name="ModLog"):
return
await self.send_log_message(
- Icons.user_ban, Colour(Colours.soft_red),
+ Icons.user_ban, Colours.soft_red,
"User banned", f"{member.name}#{member.discriminator} (`{member.id}`)",
thumbnail=member.avatar_url_as(static_format="png"),
- channel_id=Channels.modlog
+ channel_id=Channels.userlog
)
@Cog.listener()
- async def on_member_join(self, member: Member) -> None:
+ async def on_member_join(self, member: discord.Member) -> None:
"""Log member join event to user log."""
if member.guild.id != GuildConstant.id:
return
@@ -388,14 +384,14 @@ class ModLog(Cog, name="ModLog"):
message = f"{Emojis.new} {message}"
await self.send_log_message(
- Icons.sign_in, Colour(Colours.soft_green),
+ Icons.sign_in, Colours.soft_green,
"User joined", message,
thumbnail=member.avatar_url_as(static_format="png"),
channel_id=Channels.userlog
)
@Cog.listener()
- async def on_member_remove(self, member: Member) -> None:
+ async def on_member_remove(self, member: discord.Member) -> None:
"""Log member leave event to user log."""
if member.guild.id != GuildConstant.id:
return
@@ -405,14 +401,14 @@ class ModLog(Cog, name="ModLog"):
return
await self.send_log_message(
- Icons.sign_out, Colour(Colours.soft_red),
+ Icons.sign_out, Colours.soft_red,
"User left", f"{member.name}#{member.discriminator} (`{member.id}`)",
thumbnail=member.avatar_url_as(static_format="png"),
channel_id=Channels.userlog
)
@Cog.listener()
- async def on_member_unban(self, guild: Guild, member: User) -> None:
+ async def on_member_unban(self, guild: discord.Guild, member: discord.User) -> None:
"""Log member unban event to mod log."""
if guild.id != GuildConstant.id:
return
@@ -429,7 +425,7 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_member_update(self, before: Member, after: Member) -> None:
+ async def on_member_update(self, before: discord.Member, after: discord.Member) -> None:
"""Log member update event to user log."""
if before.guild.id != GuildConstant.id:
return
@@ -502,6 +498,11 @@ class ModLog(Cog, name="ModLog"):
f"**Discriminator:** `{before.discriminator}` **->** `{after.discriminator}`"
)
+ if before.display_name != after.display_name:
+ changes.append(
+ f"**Display name:** `{before.display_name}` **->** `{after.display_name}`"
+ )
+
if not changes:
return
@@ -520,7 +521,7 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_message_delete(self, message: Message) -> None:
+ async def on_message_delete(self, message: discord.Message) -> None:
"""Log message delete event to message change log."""
channel = message.channel
author = message.author
@@ -576,7 +577,7 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_raw_message_delete(self, event: RawMessageDeleteEvent) -> None:
+ async def on_raw_message_delete(self, event: discord.RawMessageDeleteEvent) -> None:
"""Log raw message delete event to message change log."""
if event.guild_id != GuildConstant.id or event.channel_id in GuildConstant.ignored:
return
@@ -610,14 +611,14 @@ class ModLog(Cog, name="ModLog"):
)
await self.send_log_message(
- Icons.message_delete, Colour(Colours.soft_red),
+ Icons.message_delete, Colours.soft_red,
"Message deleted",
response,
channel_id=Channels.message_log
)
@Cog.listener()
- async def on_message_edit(self, before: Message, after: Message) -> None:
+ async def on_message_edit(self, before: discord.Message, after: discord.Message) -> None:
"""Log message edit event to message change log."""
if (
not before.guild
@@ -692,12 +693,12 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_raw_message_edit(self, event: RawMessageUpdateEvent) -> None:
+ async def on_raw_message_edit(self, event: discord.RawMessageUpdateEvent) -> None:
"""Log raw message edit event to message change log."""
try:
channel = self.bot.get_channel(int(event.data["channel_id"]))
message = await channel.fetch_message(event.message_id)
- except NotFound: # Was deleted before we got the event
+ except discord.NotFound: # Was deleted before we got the event
return
if (
@@ -760,9 +761,3 @@ class ModLog(Cog, name="ModLog"):
Icons.message_edit, Colour.blurple(), "Message edited (After)",
after_response, channel_id=Channels.message_log
)
-
-
-def setup(bot: Bot) -> None:
- """Mod log cog load."""
- bot.add_cog(ModLog(bot))
- log.info("Cog loaded: ModLog")
diff --git a/bot/cogs/superstarify/__init__.py b/bot/cogs/moderation/superstarify.py
index 87021eded..ccc6395d9 100644
--- a/bot/cogs/superstarify/__init__.py
+++ b/bot/cogs/moderation/superstarify.py
@@ -1,21 +1,23 @@
+import json
import logging
import random
+from pathlib import Path
from discord import Colour, Embed, Member
from discord.errors import Forbidden
from discord.ext.commands import Bot, Cog, Context, command
-from bot.cogs.moderation import Moderation
-from bot.cogs.modlog import ModLog
-from bot.cogs.superstarify.stars import get_nick
-from bot.constants import Icons, MODERATION_ROLES, POSITIVE_REPLIES
-from bot.converters import Duration
-from bot.decorators import with_role
-from bot.utils.moderation import post_infraction
+from bot import constants
+from bot.utils.checks import with_role_check
from bot.utils.time import format_infraction
+from . import utils
+from .modlog import ModLog
log = logging.getLogger(__name__)
-NICKNAME_POLICY_URL = "https://pythondiscord.com/pages/rules/#wiki-toc-nickname-policy"
+NICKNAME_POLICY_URL = "https://pythondiscord.com/pages/rules/#nickname-policy"
+
+with Path("bot/resources/stars.json").open(encoding="utf-8") as stars_file:
+ STAR_NAMES = json.load(stars_file)
class Superstarify(Cog):
@@ -25,11 +27,6 @@ class Superstarify(Cog):
self.bot = bot
@property
- def moderation(self) -> Moderation:
- """Get currently loaded Moderation cog instance."""
- return self.bot.get_cog("Moderation")
-
- @property
def modlog(self) -> ModLog:
"""Get currently loaded ModLog cog instance."""
return self.bot.get_cog("ModLog")
@@ -62,7 +59,7 @@ class Superstarify(Cog):
if active_superstarifies:
[infraction] = active_superstarifies
- forced_nick = get_nick(infraction['id'], before.id)
+ forced_nick = self.get_nick(infraction['id'], before.id)
if after.display_name == forced_nick:
return # Nick change was triggered by this event. Ignore.
@@ -108,7 +105,7 @@ class Superstarify(Cog):
if active_superstarifies:
[infraction] = active_superstarifies
- forced_nick = get_nick(infraction['id'], member.id)
+ forced_nick = self.get_nick(infraction['id'], member.id)
await member.edit(nick=forced_nick)
end_timestamp_human = format_infraction(infraction['expires_at'])
@@ -138,7 +135,7 @@ class Superstarify(Cog):
f"Superstardom ends: **{end_timestamp_human}**"
)
await self.modlog.send_log_message(
- icon_url=Icons.user_update,
+ icon_url=constants.Icons.user_update,
colour=Colour.gold(),
title="Superstar member rejoined server",
text=mod_log_message,
@@ -146,45 +143,39 @@ class Superstarify(Cog):
)
@command(name='superstarify', aliases=('force_nick', 'star'))
- @with_role(*MODERATION_ROLES)
- async def superstarify(
- self, ctx: Context, member: Member, expiration: Duration, reason: str = None
- ) -> None:
+ async def superstarify(self, ctx: Context, member: Member, duration: utils.Expiry, reason: str = None) -> None:
"""
Force a random superstar name (like Taylor Swift) to be the user's nickname for a specified duration.
- An optional reason can be provided.
+ A unit of time should be appended to the duration.
+ Units (∗case-sensitive):
+ \u2003`y` - years
+ \u2003`m` - months∗
+ \u2003`w` - weeks
+ \u2003`d` - days
+ \u2003`h` - hours
+ \u2003`M` - minutes∗
+ \u2003`s` - seconds
+
+ Alternatively, an ISO 8601 timestamp can be provided for the duration.
- If no reason is given, the original name will be shown in a generated reason.
+ An optional reason can be provided. If no reason is given, the original name will be shown
+ in a generated reason.
"""
- active_superstarifies = await self.bot.api_client.get(
- 'bot/infractions',
- params={
- 'active': 'true',
- 'type': 'superstar',
- 'user__id': str(member.id)
- }
- )
- if active_superstarifies:
- await ctx.send(
- ":x: According to my records, this user is already superstarified. "
- f"See infraction **#{active_superstarifies[0]['id']}**."
- )
+ if await utils.has_active_infraction(ctx, member, "superstar"):
return
- infraction = await post_infraction(
- ctx, member,
- type='superstar', reason=reason or ('old nick: ' + member.display_name),
- expires_at=expiration
- )
- forced_nick = get_nick(infraction['id'], member.id)
+ reason = reason or ('old nick: ' + member.display_name)
+ infraction = await utils.post_infraction(ctx, member, 'superstar', reason, expires_at=duration)
+ forced_nick = self.get_nick(infraction['id'], member.id)
+ expiry_str = format_infraction(infraction["expires_at"])
embed = Embed()
embed.title = "Congratulations!"
embed.description = (
f"Your previous nickname, **{member.display_name}**, was so bad that we have decided to change it. "
f"Your new nickname will be **{forced_nick}**.\n\n"
- f"You will be unable to change your nickname until \n**{expiration}**.\n\n"
+ f"You will be unable to change your nickname until \n**{expiry_str}**.\n\n"
"If you're confused by this, please read our "
f"[official nickname policy]({NICKNAME_POLICY_URL})."
)
@@ -196,20 +187,20 @@ class Superstarify(Cog):
f"Superstarified by **{ctx.author.name}**\n"
f"Old nickname: `{member.display_name}`\n"
f"New nickname: `{forced_nick}`\n"
- f"Superstardom ends: **{expiration}**"
+ f"Superstardom ends: **{expiry_str}**"
)
await self.modlog.send_log_message(
- icon_url=Icons.user_update,
+ icon_url=constants.Icons.user_update,
colour=Colour.gold(),
title="Member Achieved Superstardom",
text=mod_log_message,
thumbnail=member.avatar_url_as(static_format="png")
)
- await self.moderation.notify_infraction(
+ await utils.notify_infraction(
user=member,
infr_type="Superstarify",
- expires_at=expiration,
+ expires_at=expiry_str,
reason=f"Your nickname didn't comply with our [nickname policy]({NICKNAME_POLICY_URL})."
)
@@ -219,7 +210,6 @@ class Superstarify(Cog):
await ctx.send(embed=embed)
@command(name='unsuperstarify', aliases=('release_nick', 'unstar'))
- @with_role(*MODERATION_ROLES)
async def unsuperstarify(self, ctx: Context, member: Member) -> None:
"""Remove the superstarify entry from our database, allowing the user to change their nickname."""
log.debug(f"Attempting to unsuperstarify the following user: {member.display_name}")
@@ -247,9 +237,9 @@ class Superstarify(Cog):
embed = Embed()
embed.description = "User has been released from superstar-prison."
- embed.title = random.choice(POSITIVE_REPLIES)
+ embed.title = random.choice(constants.POSITIVE_REPLIES)
- await self.moderation.notify_pardon(
+ await utils.notify_pardon(
user=member,
title="You are no longer superstarified.",
content="You may now change your nickname on the server."
@@ -257,8 +247,13 @@ class Superstarify(Cog):
log.trace(f"{member.display_name} was successfully released from superstar-prison.")
await ctx.send(embed=embed)
+ @staticmethod
+ def get_nick(infraction_id: int, member_id: int) -> str:
+ """Randomly select a nickname from the Superstarify nickname list."""
+ rng = random.Random(str(infraction_id) + str(member_id))
+ return rng.choice(STAR_NAMES)
-def setup(bot: Bot) -> None:
- """Superstarify cog load."""
- bot.add_cog(Superstarify(bot))
- log.info("Cog loaded: Superstarify")
+ # This cannot be static (must have a __func__ attribute).
+ def cog_check(self, ctx: Context) -> bool:
+ """Only allow moderators to invoke the commands in this cog."""
+ return with_role_check(ctx, *constants.MODERATION_ROLES)
diff --git a/bot/cogs/moderation/utils.py b/bot/cogs/moderation/utils.py
new file mode 100644
index 000000000..788a40d40
--- /dev/null
+++ b/bot/cogs/moderation/utils.py
@@ -0,0 +1,172 @@
+import logging
+import textwrap
+import typing as t
+from datetime import datetime
+
+import discord
+from discord.ext import commands
+from discord.ext.commands import Context
+
+from bot.api import ResponseCodeError
+from bot.constants import Colours, Icons
+from bot.converters import Duration, ISODateTime
+
+log = logging.getLogger(__name__)
+
+# apply icon, pardon icon
+INFRACTION_ICONS = {
+ "mute": (Icons.user_mute, Icons.user_unmute),
+ "kick": (Icons.sign_out, None),
+ "ban": (Icons.user_ban, Icons.user_unban),
+ "warning": (Icons.user_warn, None),
+ "note": (Icons.user_warn, None),
+}
+RULES_URL = "https://pythondiscord.com/pages/rules"
+APPEALABLE_INFRACTIONS = ("ban", "mute")
+
+UserTypes = t.Union[discord.Member, discord.User]
+MemberObject = t.Union[UserTypes, discord.Object]
+Infraction = t.Dict[str, t.Union[str, int, bool]]
+Expiry = t.Union[Duration, ISODateTime]
+
+
+def proxy_user(user_id: str) -> discord.Object:
+ """
+ Create a proxy user object from the given id.
+
+ Used when a Member or User object cannot be resolved.
+ """
+ try:
+ user_id = int(user_id)
+ except ValueError:
+ raise commands.BadArgument
+
+ user = discord.Object(user_id)
+ user.mention = user.id
+ user.avatar_url_as = lambda static_format: None
+
+ return user
+
+
+async def post_infraction(
+ ctx: Context,
+ user: MemberObject,
+ infr_type: str,
+ reason: str,
+ expires_at: datetime = None,
+ hidden: bool = False,
+ active: bool = True,
+) -> t.Optional[dict]:
+ """Posts an infraction to the API."""
+ payload = {
+ "actor": ctx.message.author.id,
+ "hidden": hidden,
+ "reason": reason,
+ "type": infr_type,
+ "user": user.id,
+ "active": active
+ }
+ if expires_at:
+ payload['expires_at'] = expires_at.isoformat()
+
+ try:
+ response = await ctx.bot.api_client.post('bot/infractions', json=payload)
+ except ResponseCodeError as exp:
+ if exp.status == 400 and 'user' in exp.response_json:
+ log.info(
+ f"{ctx.author} tried to add a {infr_type} infraction to `{user.id}`, "
+ "but that user id was not found in the database."
+ )
+ await ctx.send(
+ f":x: Cannot add infraction, the specified user is not known to the database."
+ )
+ return
+ else:
+ log.exception("An unexpected ResponseCodeError occurred while adding an infraction:")
+ await ctx.send(":x: There was an error adding the infraction.")
+ return
+
+ return response
+
+
+async def has_active_infraction(ctx: Context, user: MemberObject, infr_type: str) -> bool:
+ """Checks if a user already has an active infraction of the given type."""
+ active_infractions = await ctx.bot.api_client.get(
+ 'bot/infractions',
+ params={
+ 'active': 'true',
+ 'type': infr_type,
+ 'user__id': str(user.id)
+ }
+ )
+ if active_infractions:
+ await ctx.send(
+ f":x: According to my records, this user already has a {infr_type} infraction. "
+ f"See infraction **#{active_infractions[0]['id']}**."
+ )
+ return True
+ else:
+ return False
+
+
+async def notify_infraction(
+ user: UserTypes,
+ infr_type: str,
+ expires_at: t.Optional[str] = None,
+ reason: t.Optional[str] = None,
+ icon_url: str = Icons.token_removed
+) -> bool:
+ """DM a user about their new infraction and return True if the DM is successful."""
+ embed = discord.Embed(
+ description=textwrap.dedent(f"""
+ **Type:** {infr_type.capitalize()}
+ **Expires:** {expires_at or "N/A"}
+ **Reason:** {reason or "No reason provided."}
+ """),
+ colour=Colours.soft_red
+ )
+
+ embed.set_author(name="Infraction Information", icon_url=icon_url, url=RULES_URL)
+ embed.title = f"Please review our rules over at {RULES_URL}"
+ embed.url = RULES_URL
+
+ if infr_type in APPEALABLE_INFRACTIONS:
+ embed.set_footer(
+ text="To appeal this infraction, send an e-mail to [email protected]"
+ )
+
+ return await send_private_embed(user, embed)
+
+
+async def notify_pardon(
+ user: UserTypes,
+ title: str,
+ content: str,
+ icon_url: str = Icons.user_verified
+) -> bool:
+ """DM a user about their pardoned infraction and return True if the DM is successful."""
+ embed = discord.Embed(
+ description=content,
+ colour=Colours.soft_green
+ )
+
+ embed.set_author(name=title, icon_url=icon_url)
+
+ return await send_private_embed(user, embed)
+
+
+async def send_private_embed(user: UserTypes, embed: discord.Embed) -> bool:
+ """
+ A helper method for sending an embed to a user's DMs.
+
+ Returns a boolean indicator of DM success.
+ """
+ try:
+ await user.send(embed=embed)
+ return True
+ except (discord.HTTPException, discord.Forbidden, discord.NotFound):
+ log.debug(
+ f"Infraction-related information could not be sent to user {user} ({user.id}). "
+ "The user either could not be retrieved or probably disabled their DMs."
+ )
+ return False
diff --git a/bot/cogs/off_topic_names.py b/bot/cogs/off_topic_names.py
index 16717d523..2977e4ebb 100644
--- a/bot/cogs/off_topic_names.py
+++ b/bot/cogs/off_topic_names.py
@@ -75,14 +75,16 @@ class OffTopicNames(Cog):
self.bot = bot
self.updater_task = None
+ self.bot.loop.create_task(self.init_offtopic_updater())
+
def cog_unload(self) -> None:
"""Cancel any running updater tasks on cog unload."""
if self.updater_task is not None:
self.updater_task.cancel()
- @Cog.listener()
- async def on_ready(self) -> None:
+ async def init_offtopic_updater(self) -> None:
"""Start off-topic channel updating event loop if it hasn't already started."""
+ await self.bot.wait_until_ready()
if self.updater_task is None:
coro = update_names(self.bot)
self.updater_task = self.bot.loop.create_task(coro)
diff --git a/bot/cogs/reddit.py b/bot/cogs/reddit.py
index 6880aab85..0f575cece 100644
--- a/bot/cogs/reddit.py
+++ b/bot/cogs/reddit.py
@@ -34,6 +34,8 @@ class Reddit(Cog):
self.new_posts_task = None
self.top_weekly_posts_task = None
+ self.bot.loop.create_task(self.init_reddit_polling())
+
async def fetch_posts(self, route: str, *, amount: int = 25, params: dict = None) -> List[dict]:
"""A helper method to fetch a certain amount of Reddit posts at a given route."""
# Reddit's JSON responses only provide 25 posts at most.
@@ -262,9 +264,9 @@ class Reddit(Cog):
max_lines=15
)
- @Cog.listener()
- async def on_ready(self) -> None:
+ async def init_reddit_polling(self) -> None:
"""Initiate reddit post event loop."""
+ await self.bot.wait_until_ready()
self.reddit_channel = await self.bot.fetch_channel(Channels.reddit)
if self.reddit_channel is not None:
diff --git a/bot/cogs/reminders.py b/bot/cogs/reminders.py
index 6e91d2c06..b54622306 100644
--- a/bot/cogs/reminders.py
+++ b/bot/cogs/reminders.py
@@ -30,9 +30,11 @@ class Reminders(Scheduler, Cog):
self.bot = bot
super().__init__()
- @Cog.listener()
- async def on_ready(self) -> None:
+ self.bot.loop.create_task(self.reschedule_reminders())
+
+ async def reschedule_reminders(self) -> None:
"""Get all current reminders from the API and reschedule them."""
+ await self.bot.wait_until_ready()
response = await self.bot.api_client.get(
'bot/reminders',
params={'active': 'true'}
diff --git a/bot/cogs/superstarify/stars.py b/bot/cogs/superstarify/stars.py
deleted file mode 100644
index dbac86770..000000000
--- a/bot/cogs/superstarify/stars.py
+++ /dev/null
@@ -1,87 +0,0 @@
-import random
-
-
-STAR_NAMES = (
- "Adele",
- "Aerosmith",
- "Aretha Franklin",
- "Ayumi Hamasaki",
- "B'z",
- "Barbra Streisand",
- "Barry Manilow",
- "Barry White",
- "Beyonce",
- "Billy Joel",
- "Bob Dylan",
- "Bob Marley",
- "Bob Seger",
- "Bon Jovi",
- "Britney Spears",
- "Bruce Springsteen",
- "Bruno Mars",
- "Bryan Adams",
- "Celine Dion",
- "Cher",
- "Christina Aguilera",
- "David Bowie",
- "Donna Summer",
- "Drake",
- "Ed Sheeran",
- "Elton John",
- "Elvis Presley",
- "Eminem",
- "Enya",
- "Flo Rida",
- "Frank Sinatra",
- "Garth Brooks",
- "George Michael",
- "George Strait",
- "James Taylor",
- "Janet Jackson",
- "Jay-Z",
- "Johnny Cash",
- "Johnny Hallyday",
- "Julio Iglesias",
- "Justin Bieber",
- "Justin Timberlake",
- "Kanye West",
- "Katy Perry",
- "Kenny G",
- "Kenny Rogers",
- "Lady Gaga",
- "Lil Wayne",
- "Linda Ronstadt",
- "Lionel Richie",
- "Madonna",
- "Mariah Carey",
- "Meat Loaf",
- "Michael Jackson",
- "Neil Diamond",
- "Nicki Minaj",
- "Olivia Newton-John",
- "Paul McCartney",
- "Phil Collins",
- "Pink",
- "Prince",
- "Reba McEntire",
- "Rihanna",
- "Robbie Williams",
- "Rod Stewart",
- "Santana",
- "Shania Twain",
- "Stevie Wonder",
- "Taylor Swift",
- "Tim McGraw",
- "Tina Turner",
- "Tom Petty",
- "Tupac Shakur",
- "Usher",
- "Van Halen",
- "Whitney Houston",
-)
-
-
-def get_nick(infraction_id: int, member_id: int) -> str:
- """Randomly select a nickname from the Superstarify nickname list."""
- rng = random.Random(str(infraction_id) + str(member_id))
- return rng.choice(STAR_NAMES)
diff --git a/bot/cogs/sync/cog.py b/bot/cogs/sync/cog.py
index b75fb26cd..aaa581f96 100644
--- a/bot/cogs/sync/cog.py
+++ b/bot/cogs/sync/cog.py
@@ -29,9 +29,11 @@ class Sync(Cog):
def __init__(self, bot: Bot) -> None:
self.bot = bot
- @Cog.listener()
- async def on_ready(self) -> None:
+ self.bot.loop.create_task(self.sync_guild())
+
+ async def sync_guild(self) -> None:
"""Syncs the roles/users of the guild with the database."""
+ await self.bot.wait_until_ready()
guild = self.bot.get_guild(self.SYNC_SERVER_ID)
if guild is not None:
for syncer in self.ON_READY_SYNCERS:
diff --git a/bot/cogs/token_remover.py b/bot/cogs/token_remover.py
index 7dd0afbbd..5a0d20e57 100644
--- a/bot/cogs/token_remover.py
+++ b/bot/cogs/token_remover.py
@@ -9,7 +9,7 @@ from discord import Colour, Message
from discord.ext.commands import Bot, Cog
from discord.utils import snowflake_time
-from bot.cogs.modlog import ModLog
+from bot.cogs.moderation import ModLog
from bot.constants import Channels, Colours, Event, Icons
log = logging.getLogger(__name__)
@@ -26,11 +26,11 @@ DELETION_MESSAGE_TEMPLATE = (
DISCORD_EPOCH_TIMESTAMP = datetime(2017, 1, 1)
TOKEN_EPOCH = 1_293_840_000
TOKEN_RE = re.compile(
- r"[^\s\.]+" # Matches token part 1: The user ID string, encoded as base64
- r"\." # Matches a literal dot between the token parts
- r"[^\s\.]+" # Matches token part 2: The creation timestamp, as an integer
- r"\." # Matches a literal dot between the token parts
- r"[^\s\.]+" # Matches token part 3: The HMAC, unused by us, but check that it isn't empty
+ r"[^\s\.()\"']+" # Matches token part 1: The user ID string, encoded as base64
+ r"\." # Matches a literal dot between the token parts
+ r"[^\s\.()\"']+" # Matches token part 2: The creation timestamp, as an integer
+ r"\." # Matches a literal dot between the token parts
+ r"[^\s\.()\"']+" # Matches token part 3: The HMAC, unused by us, but check that it isn't empty
)
diff --git a/bot/cogs/utils.py b/bot/cogs/utils.py
index 9306c8986..793fe4c1a 100644
--- a/bot/cogs/utils.py
+++ b/bot/cogs/utils.py
@@ -35,56 +35,58 @@ class Utils(Cog):
await ctx.invoke(self.bot.get_command("help"), "pep")
return
- # Newer PEPs are written in RST instead of txt
- if pep_number > 542:
- pep_url = f"{self.base_github_pep_url}{pep_number:04}.rst"
- else:
- pep_url = f"{self.base_github_pep_url}{pep_number:04}.txt"
-
- # Attempt to fetch the PEP
- log.trace(f"Requesting PEP {pep_number} with {pep_url}")
- response = await self.bot.http_session.get(pep_url)
-
- if response.status == 200:
- log.trace("PEP found")
-
- pep_content = await response.text()
-
- # Taken from https://github.com/python/peps/blob/master/pep0/pep.py#L179
- pep_header = HeaderParser().parse(StringIO(pep_content))
-
- # Assemble the embed
- pep_embed = Embed(
- title=f"**PEP {pep_number} - {pep_header['Title']}**",
- description=f"[Link]({self.base_pep_url}{pep_number:04})",
- )
-
- pep_embed.set_thumbnail(url="https://www.python.org/static/opengraph-icon-200x200.png")
-
- # Add the interesting information
- if "Status" in pep_header:
- pep_embed.add_field(name="Status", value=pep_header["Status"])
- if "Python-Version" in pep_header:
- pep_embed.add_field(name="Python-Version", value=pep_header["Python-Version"])
- if "Created" in pep_header:
- pep_embed.add_field(name="Created", value=pep_header["Created"])
- if "Type" in pep_header:
- pep_embed.add_field(name="Type", value=pep_header["Type"])
+ possible_extensions = ['.txt', '.rst']
+ found_pep = False
+ for extension in possible_extensions:
+ # Attempt to fetch the PEP
+ pep_url = f"{self.base_github_pep_url}{pep_number:04}{extension}"
+ log.trace(f"Requesting PEP {pep_number} with {pep_url}")
+ response = await self.bot.http_session.get(pep_url)
+
+ if response.status == 200:
+ log.trace("PEP found")
+ found_pep = True
+
+ pep_content = await response.text()
+
+ # Taken from https://github.com/python/peps/blob/master/pep0/pep.py#L179
+ pep_header = HeaderParser().parse(StringIO(pep_content))
+
+ # Assemble the embed
+ pep_embed = Embed(
+ title=f"**PEP {pep_number} - {pep_header['Title']}**",
+ description=f"[Link]({self.base_pep_url}{pep_number:04})",
+ )
- elif response.status == 404:
+ pep_embed.set_thumbnail(url="https://www.python.org/static/opengraph-icon-200x200.png")
+
+ # Add the interesting information
+ if "Status" in pep_header:
+ pep_embed.add_field(name="Status", value=pep_header["Status"])
+ if "Python-Version" in pep_header:
+ pep_embed.add_field(name="Python-Version", value=pep_header["Python-Version"])
+ if "Created" in pep_header:
+ pep_embed.add_field(name="Created", value=pep_header["Created"])
+ if "Type" in pep_header:
+ pep_embed.add_field(name="Type", value=pep_header["Type"])
+
+ elif response.status != 404:
+ # any response except 200 and 404 is expected
+ found_pep = True # actually not, but it's easier to display this way
+ log.trace(f"The user requested PEP {pep_number}, but the response had an unexpected status code: "
+ f"{response.status}.\n{response.text}")
+
+ error_message = "Unexpected HTTP error during PEP search. Please let us know."
+ pep_embed = Embed(title="Unexpected error", description=error_message)
+ pep_embed.colour = Colour.red()
+ break
+
+ if not found_pep:
log.trace("PEP was not found")
not_found = f"PEP {pep_number} does not exist."
pep_embed = Embed(title="PEP not found", description=not_found)
pep_embed.colour = Colour.red()
- else:
- log.trace(f"The user requested PEP {pep_number}, but the response had an unexpected status code: "
- f"{response.status}.\n{response.text}")
-
- error_message = "Unexpected HTTP error during PEP search. Please let us know."
- pep_embed = Embed(title="Unexpected error", description=error_message)
- pep_embed.colour = Colour.red()
-
await ctx.message.channel.send(embed=pep_embed)
@command()
diff --git a/bot/cogs/verification.py b/bot/cogs/verification.py
index f0a099f27..5b115deaa 100644
--- a/bot/cogs/verification.py
+++ b/bot/cogs/verification.py
@@ -1,10 +1,12 @@
import logging
+from datetime import datetime
from discord import Message, NotFound, Object
+from discord.ext import tasks
from discord.ext.commands import Bot, Cog, Context, command
-from bot.cogs.modlog import ModLog
-from bot.constants import Channels, Event, Roles
+from bot.cogs.moderation import ModLog
+from bot.constants import Bot as BotConfig, Channels, Event, Roles
from bot.decorators import InChannelCheckFailure, in_channel, without_role
log = logging.getLogger(__name__)
@@ -27,12 +29,18 @@ from time to time, you can send `!subscribe` to <#{Channels.bot}> at any time to
If you'd like to unsubscribe from the announcement notifications, simply send `!unsubscribe` to <#{Channels.bot}>.
"""
+PERIODIC_PING = (
+ f"@everyone To verify that you have read our rules, please type `{BotConfig.prefix}accept`."
+ f" Ping <@&{Roles.admin}> if you encounter any problems during the verification process."
+)
+
class Verification(Cog):
"""User verification and role self-management."""
def __init__(self, bot: Bot):
self.bot = bot
+ self.periodic_ping.start()
@property
def mod_log(self) -> ModLog:
@@ -155,6 +163,34 @@ class Verification(Cog):
else:
return True
+ @tasks.loop(hours=12)
+ async def periodic_ping(self) -> None:
+ """Every week, mention @everyone to remind them to verify."""
+ messages = self.bot.get_channel(Channels.verification).history(limit=10)
+ need_to_post = True # True if a new message needs to be sent.
+
+ async for message in messages:
+ if message.author == self.bot.user and message.content == PERIODIC_PING:
+ delta = datetime.utcnow() - message.created_at # Time since last message.
+ if delta.days >= 7: # Message is older than a week.
+ await message.delete()
+ else:
+ need_to_post = False
+
+ break
+
+ if need_to_post:
+ await self.bot.get_channel(Channels.verification).send(PERIODIC_PING)
+
+ @periodic_ping.before_loop
+ async def before_ping(self) -> None:
+ """Only start the loop when the bot is ready."""
+ await self.bot.wait_until_ready()
+
+ def cog_unload(self) -> None:
+ """Cancel the periodic ping task when the cog is unloaded."""
+ self.periodic_ping.cancel()
+
def setup(bot: Bot) -> None:
"""Verification cog load."""
diff --git a/bot/cogs/watchchannels/bigbrother.py b/bot/cogs/watchchannels/bigbrother.py
index 3eba9862f..c516508ca 100644
--- a/bot/cogs/watchchannels/bigbrother.py
+++ b/bot/cogs/watchchannels/bigbrother.py
@@ -5,9 +5,9 @@ from typing import Union
from discord import User
from discord.ext.commands import Bot, Cog, Context, group
+from bot.cogs.moderation.utils import post_infraction
from bot.constants import Channels, Roles, Webhooks
from bot.decorators import with_role
-from bot.utils.moderation import post_infraction
from .watchchannel import WatchChannel, proxy_user
log = logging.getLogger(__name__)
@@ -64,9 +64,7 @@ class BigBrother(WatchChannel, Cog, name="Big Brother"):
await ctx.send(":x: The specified user is already being watched.")
return
- response = await post_infraction(
- ctx, user, type='watch', reason=reason, hidden=True
- )
+ response = await post_infraction(ctx, user, 'watch', reason, hidden=True)
if response is not None:
self.watched_users[user.id] = response
@@ -111,7 +109,7 @@ class BigBrother(WatchChannel, Cog, name="Big Brother"):
json={'active': False}
)
- await post_infraction(ctx, user, type='watch', reason=f"Unwatched: {reason}", hidden=True, active=False)
+ await post_infraction(ctx, user, 'watch', f"Unwatched: {reason}", hidden=True, active=False)
await ctx.send(f":white_check_mark: Messages sent by {user} will no longer be relayed.")
diff --git a/bot/cogs/watchchannels/watchchannel.py b/bot/cogs/watchchannels/watchchannel.py
index 760e012eb..0bf75a924 100644
--- a/bot/cogs/watchchannels/watchchannel.py
+++ b/bot/cogs/watchchannels/watchchannel.py
@@ -13,7 +13,7 @@ from discord import Color, Embed, HTTPException, Message, Object, errors
from discord.ext.commands import BadArgument, Bot, Cog, Context
from bot.api import ResponseCodeError
-from bot.cogs.modlog import ModLog
+from bot.cogs.moderation import ModLog
from bot.constants import BigBrother as BigBrotherConfig, Guild as GuildConfig, Icons
from bot.pagination import LinePaginator
from bot.utils import CogABCMeta, messages
diff --git a/bot/decorators.py b/bot/decorators.py
index 33a6bcadd..935df4af0 100644
--- a/bot/decorators.py
+++ b/bot/decorators.py
@@ -3,13 +3,13 @@ import random
from asyncio import Lock, sleep
from contextlib import suppress
from functools import wraps
-from typing import Any, Callable, Container, Optional
+from typing import Callable, Container, Union
from weakref import WeakValueDictionary
-from discord import Colour, Embed
+from discord import Colour, Embed, Member
from discord.errors import NotFound
from discord.ext import commands
-from discord.ext.commands import CheckFailure, Context
+from discord.ext.commands import CheckFailure, Cog, Context
from bot.constants import ERROR_REPLIES, RedirectOutput
from bot.utils.checks import with_role_check, without_role_check
@@ -72,13 +72,13 @@ def locked() -> Callable:
Subsequent calls to the command from the same author are ignored until the command has completed invocation.
- This decorator has to go before (below) the `command` decorator.
+ This decorator must go before (below) the `command` decorator.
"""
def wrap(func: Callable) -> Callable:
func.__locks = WeakValueDictionary()
@wraps(func)
- async def inner(self: Callable, ctx: Context, *args, **kwargs) -> Optional[Any]:
+ async def inner(self: Cog, ctx: Context, *args, **kwargs) -> None:
lock = func.__locks.setdefault(ctx.author.id, Lock())
if lock.locked():
embed = Embed()
@@ -93,7 +93,7 @@ def locked() -> Callable:
return
async with func.__locks.setdefault(ctx.author.id, Lock()):
- return await func(self, ctx, *args, **kwargs)
+ await func(self, ctx, *args, **kwargs)
return inner
return wrap
@@ -103,17 +103,21 @@ def redirect_output(destination_channel: int, bypass_roles: Container[int] = Non
Changes the channel in the context of the command to redirect the output to a certain channel.
Redirect is bypassed if the author has a role to bypass redirection.
+
+ This decorator must go before (below) the `command` decorator.
"""
def wrap(func: Callable) -> Callable:
@wraps(func)
- async def inner(self: Callable, ctx: Context, *args, **kwargs) -> Any:
+ async def inner(self: Cog, ctx: Context, *args, **kwargs) -> None:
if ctx.channel.id == destination_channel:
log.trace(f"Command {ctx.command.name} was invoked in destination_channel, not redirecting")
- return await func(self, ctx, *args, **kwargs)
+ await func(self, ctx, *args, **kwargs)
+ return
if bypass_roles and any(role.id in bypass_roles for role in ctx.author.roles):
log.trace(f"{ctx.author} has role to bypass output redirection")
- return await func(self, ctx, *args, **kwargs)
+ await func(self, ctx, *args, **kwargs)
+ return
redirect_channel = ctx.guild.get_channel(destination_channel)
old_channel = ctx.channel
@@ -140,3 +144,50 @@ def redirect_output(destination_channel: int, bypass_roles: Container[int] = Non
log.trace("Redirect output: Deleted invocation message")
return inner
return wrap
+
+
+def respect_role_hierarchy(target_arg: Union[int, str] = 0) -> Callable:
+ """
+ Ensure the highest role of the invoking member is greater than that of the target member.
+
+ If the condition fails, a warning is sent to the invoking context. A target which is not an
+ instance of discord.Member will always pass.
+
+ A value of 0 (i.e. position 0) for `target_arg` corresponds to the argument which comes after
+ `ctx`. If the target argument is a kwarg, its name can instead be given.
+
+ This decorator must go before (below) the `command` decorator.
+ """
+ def wrap(func: Callable) -> Callable:
+ @wraps(func)
+ async def inner(self: Cog, ctx: Context, *args, **kwargs) -> None:
+ try:
+ target = kwargs[target_arg]
+ except KeyError:
+ try:
+ target = args[target_arg]
+ except IndexError:
+ raise ValueError(f"Could not find target argument at position {target_arg}")
+ except TypeError:
+ raise ValueError(f"Could not find target kwarg with key {target_arg!r}")
+
+ if not isinstance(target, Member):
+ log.trace("The target is not a discord.Member; skipping role hierarchy check.")
+ await func(self, ctx, *args, **kwargs)
+ return
+
+ cmd = ctx.command.name
+ actor = ctx.author
+ if target.top_role >= actor.top_role:
+ log.info(
+ f"{actor} ({actor.id}) attempted to {cmd} "
+ f"{target} ({target.id}), who has an equal or higher top role."
+ )
+ await ctx.send(
+ f":x: {actor.mention}, you may not {cmd} "
+ "someone with an equal or higher top role."
+ )
+ else:
+ await func(self, ctx, *args, **kwargs)
+ return inner
+ return wrap
diff --git a/bot/resources/stars.json b/bot/resources/stars.json
index 8071b9626..c0b253120 100644
--- a/bot/resources/stars.json
+++ b/bot/resources/stars.json
@@ -1,82 +1,78 @@
-{
- "Adele": "https://upload.wikimedia.org/wikipedia/commons/thumb/7/7c/Adele_2016.jpg/220px-Adele_2016.jpg",
- "Steven Tyler": "https://upload.wikimedia.org/wikipedia/commons/thumb/a/a8/Steven_Tyler_by_Gage_Skidmore_3.jpg/220px-Steven_Tyler_by_Gage_Skidmore_3.jpg",
- "Alex Van Halen": "https://upload.wikimedia.org/wikipedia/commons/thumb/b/b3/Alex_Van_Halen_-_Van_Halen_Live.jpg/220px-Alex_Van_Halen_-_Van_Halen_Live.jpg",
- "Aretha Franklin": "https://upload.wikimedia.org/wikipedia/commons/thumb/c/c6/Aretha_Franklin_1968.jpg/220px-Aretha_Franklin_1968.jpg",
- "Ayumi Hamasaki": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/50/Ayumi_Hamasaki_2007.jpg/220px-Ayumi_Hamasaki_2007.jpg",
- "Koshi Inaba": "https://upload.wikimedia.org/wikipedia/commons/thumb/a/af/B%27Z_at_Best_Buy_Theater_NYC_-_9-30-12_-_18.jpg/220px-B%27Z_at_Best_Buy_Theater_NYC_-_9-30-12_-_18.jpg",
- "Barbra Streisand": "https://upload.wikimedia.org/wikipedia/en/thumb/a/a3/Barbra_Streisand_-_1966.jpg/220px-Barbra_Streisand_-_1966.jpg",
- "Barry Manilow": "https://upload.wikimedia.org/wikipedia/commons/thumb/2/2b/BarryManilow.jpg/220px-BarryManilow.jpg",
- "Barry White": "https://upload.wikimedia.org/wikipedia/commons/thumb/b/b7/Barry_White%2C_Bestanddeelnr_927-0099.jpg/220px-Barry_White%2C_Bestanddeelnr_927-0099.jpg",
- "Beyonce": "https://upload.wikimedia.org/wikipedia/commons/thumb/f/f2/Beyonce_-_The_Formation_World_Tour%2C_at_Wembley_Stadium_in_London%2C_England.jpg/220px-Beyonce_-_The_Formation_World_Tour%2C_at_Wembley_Stadium_in_London%2C_England.jpg",
- "Billy Joel": "https://upload.wikimedia.org/wikipedia/commons/thumb/1/19/Billy_Joel_Shankbone_NYC_2009.jpg/220px-Billy_Joel_Shankbone_NYC_2009.jpg",
- "Bob Dylan": "https://upload.wikimedia.org/wikipedia/commons/thumb/0/02/Bob_Dylan_-_Azkena_Rock_Festival_2010_2.jpg/220px-Bob_Dylan_-_Azkena_Rock_Festival_2010_2.jpg",
- "Bob Marley": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/5e/Bob-Marley.jpg/220px-Bob-Marley.jpg",
- "Bob Seger": "https://upload.wikimedia.org/wikipedia/commons/thumb/1/16/Bob_Seger_2013.jpg/220px-Bob_Seger_2013.jpg",
- "Jon Bon Jovi": "https://upload.wikimedia.org/wikipedia/commons/thumb/4/49/Jon_Bon_Jovi_at_the_2009_Tribeca_Film_Festival_3.jpg/220px-Jon_Bon_Jovi_at_the_2009_Tribeca_Film_Festival_3.jpg",
- "Britney Spears": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/da/Britney_Spears_2013_%28Straighten_Crop%29.jpg/200px-Britney_Spears_2013_%28Straighten_Crop%29.jpg",
- "Bruce Springsteen": "https://upload.wikimedia.org/wikipedia/commons/thumb/3/3b/Bruce_Springsteen_-_Roskilde_Festival_2012.jpg/210px-Bruce_Springsteen_-_Roskilde_Festival_2012.jpg",
- "Bruno Mars": "https://upload.wikimedia.org/wikipedia/commons/thumb/b/b0/BrunoMars24KMagicWorldTourLive_%28cropped%29.jpg/220px-BrunoMars24KMagicWorldTourLive_%28cropped%29.jpg",
- "Bryan Adams": "https://upload.wikimedia.org/wikipedia/commons/thumb/7/7e/Bryan_Adams_Hamburg_MG_0631_flickr.jpg/300px-Bryan_Adams_Hamburg_MG_0631_flickr.jpg",
- "Celine Dion": "https://upload.wikimedia.org/wikipedia/commons/thumb/4/42/Celine_Dion_Concert_Singing_Taking_Chances_2008.jpg/220px-Celine_Dion_Concert_Singing_Taking_Chances_2008.jpg",
- "Cher": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Cher_-_Casablanca.jpg/220px-Cher_-_Casablanca.jpg",
- "Christina Aguilera": "https://upload.wikimedia.org/wikipedia/commons/thumb/e/e7/Christina_Aguilera_in_2016.jpg/220px-Christina_Aguilera_in_2016.jpg",
- "David Bowie": "https://upload.wikimedia.org/wikipedia/commons/thumb/e/e8/David-Bowie_Chicago_2002-08-08_photoby_Adam-Bielawski-cropped.jpg/220px-David-Bowie_Chicago_2002-08-08_photoby_Adam-Bielawski-cropped.jpg",
- "David Lee Roth": "https://upload.wikimedia.org/wikipedia/commons/thumb/f/fb/David_Lee_Roth_-_Van_Halen.jpg/220px-David_Lee_Roth_-_Van_Halen.jpg",
- "Donna Summer": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Nobel_Peace_Price_Concert_2009_Donna_Summer3.jpg/220px-Nobel_Peace_Price_Concert_2009_Donna_Summer3.jpg",
- "Drake": "https://upload.wikimedia.org/wikipedia/commons/thumb/8/81/Drake_at_the_Velvet_Underground_-_2017_%2835986086223%29_%28cropped%29.jpg/220px-Drake_at_the_Velvet_Underground_-_2017_%2835986086223%29_%28cropped%29.jpg",
- "Ed Sheeran": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/55/Ed_Sheeran_2013.jpg/220px-Ed_Sheeran_2013.jpg",
- "Eddie Van Halen": "https://upload.wikimedia.org/wikipedia/commons/thumb/a/a7/Eddie_Van_Halen.jpg/300px-Eddie_Van_Halen.jpg",
- "Elton John": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/d1/Elton_John_2011_Shankbone_2.JPG/220px-Elton_John_2011_Shankbone_2.JPG",
- "Elvis Presley": "https://upload.wikimedia.org/wikipedia/commons/thumb/9/99/Elvis_Presley_promoting_Jailhouse_Rock.jpg/220px-Elvis_Presley_promoting_Jailhouse_Rock.jpg",
- "Eminem": "https://upload.wikimedia.org/wikipedia/commons/thumb/4/4a/Eminem_-_Concert_for_Valor_in_Washington%2C_D.C._Nov._11%2C_2014_%282%29_%28Cropped%29.jpg/220px-Eminem_-_Concert_for_Valor_in_Washington%2C_D.C._Nov._11%2C_2014_%282%29_%28Cropped%29.jpg",
- "Enya": "https://enya.com/wp-content/themes/enya%20full%20site/images/enya-about.jpg",
- "Flo Rida": "https://upload.wikimedia.org/wikipedia/commons/thumb/8/8b/Flo_Rida_%286924266548%29.jpg/220px-Flo_Rida_%286924266548%29.jpg",
- "Frank Sinatra": "https://upload.wikimedia.org/wikipedia/commons/thumb/a/af/Frank_Sinatra_%2757.jpg/220px-Frank_Sinatra_%2757.jpg",
- "Garth Brooks": "https://upload.wikimedia.org/wikipedia/commons/thumb/b/bc/Garth_Brooks_on_World_Tour_%28crop%29.png/220px-Garth_Brooks_on_World_Tour_%28crop%29.png",
- "George Michael": "https://upload.wikimedia.org/wikipedia/commons/thumb/f/f2/George_Michael.jpeg/220px-George_Michael.jpeg",
- "George Strait": "https://upload.wikimedia.org/wikipedia/commons/thumb/0/0c/George_Strait_2014_1.jpg/220px-George_Strait_2014_1.jpg",
- "James Taylor": "https://upload.wikimedia.org/wikipedia/commons/thumb/c/cf/James_Taylor_-_Columbia.jpg/220px-James_Taylor_-_Columbia.jpg",
- "Janet Jackson": "https://upload.wikimedia.org/wikipedia/commons/thumb/0/02/JanetJacksonUnbreakableTourSanFran2015.jpg/220px-JanetJacksonUnbreakableTourSanFran2015.jpg",
- "Jay-Z": "https://upload.wikimedia.org/wikipedia/commons/thumb/b/b6/Jay-Z.png/220px-Jay-Z.png",
- "Johnny Cash": "https://upload.wikimedia.org/wikipedia/commons/thumb/f/f2/JohnnyCash1969.jpg/220px-JohnnyCash1969.jpg",
- "Johnny Hallyday": "https://upload.wikimedia.org/wikipedia/commons/thumb/a/a1/Johnny_Hallyday_Cannes.jpg/220px-Johnny_Hallyday_Cannes.jpg",
- "Julio Iglesias": "https://upload.wikimedia.org/wikipedia/commons/thumb/e/ef/Julio_Iglesias09.jpg/220px-Julio_Iglesias09.jpg",
- "Justin Bieber": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/da/Justin_Bieber_in_2015.jpg/220px-Justin_Bieber_in_2015.jpg",
- "Justin Timberlake": "https://upload.wikimedia.org/wikipedia/commons/thumb/e/ed/Justin_Timberlake_by_Gage_Skidmore_2.jpg/220px-Justin_Timberlake_by_Gage_Skidmore_2.jpg",
- "Kanye West": "https://upload.wikimedia.org/wikipedia/commons/thumb/1/11/Kanye_West_at_the_2009_Tribeca_Film_Festival.jpg/220px-Kanye_West_at_the_2009_Tribeca_Film_Festival.jpg",
- "Katy Perry": "https://upload.wikimedia.org/wikipedia/commons/thumb/8/8a/Katy_Perry_at_Madison_Square_Garden_%2837436531092%29_%28cropped%29.jpg/220px-Katy_Perry_at_Madison_Square_Garden_%2837436531092%29_%28cropped%29.jpg",
- "Kenny G": "https://upload.wikimedia.org/wikipedia/commons/thumb/f/f4/KennyGHWOFMay2013.jpg/220px-KennyGHWOFMay2013.jpg",
- "Kenny Rogers": "https://upload.wikimedia.org/wikipedia/commons/thumb/8/8c/KennyRogers.jpg/220px-KennyRogers.jpg",
- "Lady Gaga": "https://upload.wikimedia.org/wikipedia/commons/thumb/2/2c/Lady_Gaga_interview_2016.jpg/220px-Lady_Gaga_interview_2016.jpg",
- "Lil Wayne": "https://upload.wikimedia.org/wikipedia/commons/thumb/a/a6/Lil_Wayne_%2823513397583%29.jpg/220px-Lil_Wayne_%2823513397583%29.jpg",
- "Linda Ronstadt": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/50/LindaRonstadtPerforming.jpg/220px-LindaRonstadtPerforming.jpg",
- "Lionel Richie": "https://upload.wikimedia.org/wikipedia/commons/thumb/c/cd/Lionel_Richie_2017.jpg/220px-Lionel_Richie_2017.jpg",
- "Madonna": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/d1/Madonna_Rebel_Heart_Tour_2015_-_Stockholm_%2823051472299%29_%28cropped_2%29.jpg/220px-Madonna_Rebel_Heart_Tour_2015_-_Stockholm_%2823051472299%29_%28cropped_2%29.jpg",
- "Mariah Carey": "https://upload.wikimedia.org/wikipedia/commons/thumb/f/f2/Mariah_Carey_WBLS_2018_Interview_4.jpg/220px-Mariah_Carey_WBLS_2018_Interview_4.jpg",
- "Meat Loaf": "https://upload.wikimedia.org/wikipedia/commons/thumb/e/e7/Meat_Loaf.jpg/220px-Meat_Loaf.jpg",
- "Michael Jackson": "https://upload.wikimedia.org/wikipedia/commons/thumb/3/31/Michael_Jackson_in_1988.jpg/220px-Michael_Jackson_in_1988.jpg",
- "Neil Diamond": "https://upload.wikimedia.org/wikipedia/commons/thumb/f/f4/Neil_Diamond_HWOF_Aug_2012_other_%28levels_adjusted_and_cropped%29.jpg/220px-Neil_Diamond_HWOF_Aug_2012_other_%28levels_adjusted_and_cropped%29.jpg",
- "Nicki Minaj": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/54/Nicki_Minaj_MTV_VMAs_4.jpg/250px-Nicki_Minaj_MTV_VMAs_4.jpg",
- "Olivia Newton-John": "https://upload.wikimedia.org/wikipedia/commons/thumb/c/c7/Olivia_Newton-John_2.jpg/220px-Olivia_Newton-John_2.jpg",
- "Paul McCartney": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/5d/Paul_McCartney_-_Out_There_Concert_-_140420-5941-jikatu_%2813950091384%29.jpg/220px-Paul_McCartney_-_Out_There_Concert_-_140420-5941-jikatu_%2813950091384%29.jpg",
- "Phil Collins": "https://upload.wikimedia.org/wikipedia/commons/thumb/f/f3/1_collins.jpg/220px-1_collins.jpg",
- "Pink": "https://upload.wikimedia.org/wikipedia/commons/thumb/1/1a/P%21nk_Live_2013.jpg/220px-P%21nk_Live_2013.jpg",
- "Prince": "https://upload.wikimedia.org/wikipedia/commons/thumb/b/b2/Prince_1983_1st_Avenue.jpg/220px-Prince_1983_1st_Avenue.jpg",
- "Reba McEntire": "https://upload.wikimedia.org/wikipedia/commons/thumb/e/e0/Reba_McEntire_by_Gage_Skidmore.jpg/220px-Reba_McEntire_by_Gage_Skidmore.jpg",
- "Rihanna": "https://upload.wikimedia.org/wikipedia/commons/thumb/4/47/Rihanna_concert_in_Washington_DC_%282%29.jpg/250px-Rihanna_concert_in_Washington_DC_%282%29.jpg",
- "Robbie Williams": "https://upload.wikimedia.org/wikipedia/commons/thumb/2/21/Robbie_Williams.jpg/220px-Robbie_Williams.jpg",
- "Rod Stewart": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/57/Rod_stewart_05111976_12_400.jpg/220px-Rod_stewart_05111976_12_400.jpg",
- "Carlos Santana": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/54/Santana_2010.jpg/220px-Santana_2010.jpg",
- "Shania Twain": "https://upload.wikimedia.org/wikipedia/commons/thumb/e/ee/ShaniaTwainJunoAwardsMar2011.jpg/220px-ShaniaTwainJunoAwardsMar2011.jpg",
- "Stevie Wonder": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/54/Stevie_Wonder_1973.JPG/220px-Stevie_Wonder_1973.JPG",
- "Tak Matsumoto": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/da/B%27Z_at_Best_Buy_Theater_NYC_-_9-30-12_-_22.jpg/220px-B%27Z_at_Best_Buy_Theater_NYC_-_9-30-12_-_22.jpg",
- "Taylor Swift": "https://upload.wikimedia.org/wikipedia/commons/thumb/2/25/Taylor_Swift_112_%2818119055110%29_%28cropped%29.jpg/220px-Taylor_Swift_112_%2818119055110%29_%28cropped%29.jpg",
- "Tim McGraw": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/5f/Tim_McGraw_October_24_2015.jpg/220px-Tim_McGraw_October_24_2015.jpg",
- "Tina Turner": "https://upload.wikimedia.org/wikipedia/commons/thumb/1/10/Tina_turner_21021985_01_350.jpg/250px-Tina_turner_21021985_01_350.jpg",
- "Tom Petty": "https://upload.wikimedia.org/wikipedia/commons/thumb/5/5a/Tom_Petty_Live_in_Horsens_%28cropped2%29.jpg/220px-Tom_Petty_Live_in_Horsens_%28cropped2%29.jpg",
- "Tupac Shakur": "https://upload.wikimedia.org/wikipedia/en/thumb/b/b5/Tupac_Amaru_Shakur2.jpg/220px-Tupac_Amaru_Shakur2.jpg",
- "Usher": "https://upload.wikimedia.org/wikipedia/commons/thumb/f/fa/Usher_Cannes_2016_retusche.jpg/220px-Usher_Cannes_2016_retusche.jpg",
- "Whitney Houston": "https://upload.wikimedia.org/wikipedia/commons/thumb/a/a7/Whitney_Houston_Welcome_Home_Heroes_1_cropped.jpg/220px-Whitney_Houston_Welcome_Home_Heroes_1_cropped.jpg",
- "Wolfgang Van Halen": "https://upload.wikimedia.org/wikipedia/commons/thumb/0/0c/Wolfgang_Van_Halen_Different_Kind_of_Truth_2012.jpg/220px-Wolfgang_Van_Halen_Different_Kind_of_Truth_2012.jpg"
-}
+[
+ "Adele",
+ "Aerosmith",
+ "Aretha Franklin",
+ "Ayumi Hamasaki",
+ "B'z",
+ "Barbra Streisand",
+ "Barry Manilow",
+ "Barry White",
+ "Beyonce",
+ "Billy Joel",
+ "Bob Dylan",
+ "Bob Marley",
+ "Bob Seger",
+ "Bon Jovi",
+ "Britney Spears",
+ "Bruce Springsteen",
+ "Bruno Mars",
+ "Bryan Adams",
+ "Celine Dion",
+ "Cher",
+ "Christina Aguilera",
+ "David Bowie",
+ "Donna Summer",
+ "Drake",
+ "Ed Sheeran",
+ "Elton John",
+ "Elvis Presley",
+ "Eminem",
+ "Enya",
+ "Flo Rida",
+ "Frank Sinatra",
+ "Garth Brooks",
+ "George Michael",
+ "George Strait",
+ "James Taylor",
+ "Janet Jackson",
+ "Jay-Z",
+ "Johnny Cash",
+ "Johnny Hallyday",
+ "Julio Iglesias",
+ "Justin Bieber",
+ "Justin Timberlake",
+ "Kanye West",
+ "Katy Perry",
+ "Kenny G",
+ "Kenny Rogers",
+ "Lady Gaga",
+ "Lil Wayne",
+ "Linda Ronstadt",
+ "Lionel Richie",
+ "Madonna",
+ "Mariah Carey",
+ "Meat Loaf",
+ "Michael Jackson",
+ "Neil Diamond",
+ "Nicki Minaj",
+ "Olivia Newton-John",
+ "Paul McCartney",
+ "Phil Collins",
+ "Pink",
+ "Prince",
+ "Reba McEntire",
+ "Rihanna",
+ "Robbie Williams",
+ "Rod Stewart",
+ "Santana",
+ "Shania Twain",
+ "Stevie Wonder",
+ "Taylor Swift",
+ "Tim McGraw",
+ "Tina Turner",
+ "Tom Petty",
+ "Tupac Shakur",
+ "Usher",
+ "Van Halen",
+ "Whitney Houston"
+]
diff --git a/bot/utils/checks.py b/bot/utils/checks.py
index 19f64ff9f..ad892e512 100644
--- a/bot/utils/checks.py
+++ b/bot/utils/checks.py
@@ -1,6 +1,8 @@
+import datetime
import logging
+from typing import Callable, Iterable
-from discord.ext.commands import Context
+from discord.ext.commands import BucketType, Cog, Command, CommandOnCooldown, Context, Cooldown, CooldownMapping
log = logging.getLogger(__name__)
@@ -42,3 +44,47 @@ def in_channel_check(ctx: Context, channel_id: int) -> bool:
log.trace(f"{ctx.author} tried to call the '{ctx.command.name}' command. "
f"The result of the in_channel check was {check}.")
return check
+
+
+def cooldown_with_role_bypass(rate: int, per: float, type: BucketType = BucketType.default, *,
+ bypass_roles: Iterable[int]) -> Callable:
+ """
+ Applies a cooldown to a command, but allows members with certain roles to be ignored.
+
+ NOTE: this replaces the `Command.before_invoke` callback, which *might* introduce problems in the future.
+ """
+ # make it a set so lookup is hash based
+ bypass = set(bypass_roles)
+
+ # this handles the actual cooldown logic
+ buckets = CooldownMapping(Cooldown(rate, per, type))
+
+ # will be called after the command has been parse but before it has been invoked, ensures that
+ # the cooldown won't be updated if the user screws up their input to the command
+ async def predicate(cog: Cog, ctx: Context) -> None:
+ nonlocal bypass, buckets
+
+ if any(role.id in bypass for role in ctx.author.roles):
+ return
+
+ # cooldown logic, taken from discord.py internals
+ current = ctx.message.created_at.replace(tzinfo=datetime.timezone.utc).timestamp()
+ bucket = buckets.get_bucket(ctx.message)
+ retry_after = bucket.update_rate_limit(current)
+ if retry_after:
+ raise CommandOnCooldown(bucket, retry_after)
+
+ def wrapper(command: Command) -> Command:
+ # NOTE: this could be changed if a subclass of Command were to be used. I didn't see the need for it
+ # so I just made it raise an error when the decorator is applied before the actual command object exists.
+ #
+ # if the `before_invoke` detail is ever a problem then I can quickly just swap over.
+ if not isinstance(command, Command):
+ raise TypeError('Decorator `cooldown_with_role_bypass` must be applied after the command decorator. '
+ 'This means it has to be above the command decorator in the code.')
+
+ command._before_invoke = predicate
+
+ return command
+
+ return wrapper
diff --git a/bot/utils/moderation.py b/bot/utils/moderation.py
deleted file mode 100644
index 7860f14a1..000000000
--- a/bot/utils/moderation.py
+++ /dev/null
@@ -1,72 +0,0 @@
-import logging
-from datetime import datetime
-from typing import Optional, Union
-
-from discord import Member, Object, User
-from discord.ext.commands import Context
-
-from bot.api import ResponseCodeError
-from bot.constants import Keys
-
-log = logging.getLogger(__name__)
-
-HEADERS = {"X-API-KEY": Keys.site_api}
-
-
-async def post_infraction(
- ctx: Context,
- user: Union[Member, Object, User],
- type: str,
- reason: str,
- expires_at: datetime = None,
- hidden: bool = False,
- active: bool = True,
-) -> Optional[dict]:
- """Posts an infraction to the API."""
- payload = {
- "actor": ctx.message.author.id,
- "hidden": hidden,
- "reason": reason,
- "type": type,
- "user": user.id,
- "active": active
- }
- if expires_at:
- payload['expires_at'] = expires_at.isoformat()
-
- try:
- response = await ctx.bot.api_client.post('bot/infractions', json=payload)
- except ResponseCodeError as exp:
- if exp.status == 400 and 'user' in exp.response_json:
- log.info(
- f"{ctx.author} tried to add a {type} infraction to `{user.id}`, "
- "but that user id was not found in the database."
- )
- await ctx.send(f":x: Cannot add infraction, the specified user is not known to the database.")
- return
- else:
- log.exception("An unexpected ResponseCodeError occurred while adding an infraction:")
- await ctx.send(":x: There was an error adding the infraction.")
- return
-
- return response
-
-
-async def already_has_active_infraction(ctx: Context, user: Union[Member, Object, User], type: str) -> bool:
- """Checks if a user already has an active infraction of the given type."""
- active_infractions = await ctx.bot.api_client.get(
- 'bot/infractions',
- params={
- 'active': 'true',
- 'type': type,
- 'user__id': str(user.id)
- }
- )
- if active_infractions:
- await ctx.send(
- f":x: According to my records, this user already has a {type} infraction. "
- f"See infraction **#{active_infractions[0]['id']}**."
- )
- return True
- else:
- return False
diff --git a/bot/utils/time.py b/bot/utils/time.py
index da28f2c76..2aea2c099 100644
--- a/bot/utils/time.py
+++ b/bot/utils/time.py
@@ -1,5 +1,6 @@
import asyncio
import datetime
+from typing import Optional
import dateutil.parser
from dateutil.relativedelta import relativedelta
@@ -34,6 +35,9 @@ def humanize_delta(delta: relativedelta, precision: str = "seconds", max_units:
precision specifies the smallest unit of time to include (e.g. "seconds", "minutes").
max_units specifies the maximum number of units of time to include (e.g. 1 may include days but not hours).
"""
+ if max_units <= 0:
+ raise ValueError("max_units must be positive")
+
units = (
("years", delta.years),
("months", delta.months),
@@ -83,15 +87,20 @@ def time_since(past_datetime: datetime.datetime, precision: str = "seconds", max
return f"{humanized} ago"
-def parse_rfc1123(time_str: str) -> datetime.datetime:
+def parse_rfc1123(stamp: str) -> datetime.datetime:
"""Parse RFC1123 time string into datetime."""
- return datetime.datetime.strptime(time_str, RFC1123_FORMAT).replace(tzinfo=datetime.timezone.utc)
+ return datetime.datetime.strptime(stamp, RFC1123_FORMAT).replace(tzinfo=datetime.timezone.utc)
# Hey, this could actually be used in the off_topic_names and reddit cogs :)
-async def wait_until(time: datetime.datetime) -> None:
- """Wait until a given time."""
- delay = time - datetime.datetime.utcnow()
+async def wait_until(time: datetime.datetime, start: Optional[datetime.datetime] = None) -> None:
+ """
+ Wait until a given time.
+
+ :param time: A datetime.datetime object to wait until.
+ :param start: The start from which to calculate the waiting duration. Defaults to UTC time.
+ """
+ delay = time - (start or datetime.datetime.utcnow())
delay_seconds = delay.total_seconds()
# Incorporate a small delay so we don't rapid-fire the event due to time precision errors
diff --git a/config-default.yml b/config-default.yml
index 827ae4619..ca405337e 100644
--- a/config-default.yml
+++ b/config-default.yml
@@ -282,7 +282,7 @@ anti_spam:
rules:
attachments:
interval: 10
- max: 3
+ max: 9
burst:
interval: 10
diff --git a/docker-compose.yml b/docker-compose.yml
index 9684a3c62..f79fdba58 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -6,7 +6,7 @@ version: "3.7"
services:
postgres:
- image: postgres:11-alpine
+ image: postgres:12-alpine
environment:
POSTGRES_DB: pysite
POSTGRES_PASSWORD: pysite
diff --git a/tests/helpers.py b/tests/helpers.py
index 2908294f7..25059fa3a 100644
--- a/tests/helpers.py
+++ b/tests/helpers.py
@@ -7,6 +7,10 @@ __all__ = ('AsyncMock', 'async_test')
# TODO: Remove me on 3.8
+# Allows you to mock a coroutine. Since the default `__call__` of `MagicMock`
+# is not a coroutine, trying to mock a coroutine with it will result in errors
+# as the default `__call__` is not awaitable. Use this class for monkeypatching
+# coroutines instead.
class AsyncMock(MagicMock):
async def __call__(self, *args, **kwargs):
return super(AsyncMock, self).__call__(*args, **kwargs)
diff --git a/tests/test_resources.py b/tests/test_resources.py
index 2b17aea64..bcf124f05 100644
--- a/tests/test_resources.py
+++ b/tests/test_resources.py
@@ -1,18 +1,13 @@
import json
-import mimetypes
from pathlib import Path
-from urllib.parse import urlparse
def test_stars_valid():
- """Validates that `bot/resources/stars.json` contains valid images."""
+ """Validates that `bot/resources/stars.json` contains a list of strings."""
path = Path('bot', 'resources', 'stars.json')
content = path.read_text()
data = json.loads(content)
- for url in data.values():
- assert urlparse(url).scheme == 'https'
-
- mimetype, _ = mimetypes.guess_type(url)
- assert mimetype in ('image/jpeg', 'image/png')
+ for name in data:
+ assert type(name) is str
diff --git a/tests/utils/test_time.py b/tests/utils/test_time.py
new file mode 100644
index 000000000..4baa6395c
--- /dev/null
+++ b/tests/utils/test_time.py
@@ -0,0 +1,62 @@
+import asyncio
+from datetime import datetime, timezone
+from unittest.mock import patch
+
+import pytest
+from dateutil.relativedelta import relativedelta
+
+from bot.utils import time
+from tests.helpers import AsyncMock
+
+
+ ('delta', 'precision', 'max_units', 'expected'),
+ (
+ (relativedelta(days=2), 'seconds', 1, '2 days'),
+ (relativedelta(days=2, hours=2), 'seconds', 2, '2 days and 2 hours'),
+ (relativedelta(days=2, hours=2), 'seconds', 1, '2 days'),
+ (relativedelta(days=2, hours=2), 'days', 2, '2 days'),
+
+ # Does not abort for unknown units, as the unit name is checked
+ # against the attribute of the relativedelta instance.
+ (relativedelta(days=2, hours=2), 'elephants', 2, '2 days and 2 hours'),
+
+ # Very high maximum units, but it only ever iterates over
+ # each value the relativedelta might have.
+ (relativedelta(days=2, hours=2), 'hours', 20, '2 days and 2 hours'),
+ )
+)
+def test_humanize_delta(
+ delta: relativedelta,
+ precision: str,
+ max_units: int,
+ expected: str
+):
+ assert time.humanize_delta(delta, precision, max_units) == expected
+
+
[email protected]('max_units', (-1, 0))
+def test_humanize_delta_raises_for_invalid_max_units(max_units: int):
+ with pytest.raises(ValueError, match='max_units must be positive'):
+ time.humanize_delta(relativedelta(days=2, hours=2), 'hours', max_units)
+
+
+ ('stamp', 'expected'),
+ (
+ ('Sun, 15 Sep 2019 12:00:00 GMT', datetime(2019, 9, 15, 12, 0, 0, tzinfo=timezone.utc)),
+ )
+)
+def test_parse_rfc1123(stamp: str, expected: str):
+ assert time.parse_rfc1123(stamp) == expected
+
+
+@patch('asyncio.sleep', new_callable=AsyncMock)
+def test_wait_until(sleep_patch):
+ start = datetime(2019, 1, 1, 0, 0)
+ then = datetime(2019, 1, 1, 0, 10)
+
+ # No return value
+ assert asyncio.run(time.wait_until(then, start)) is None
+
+ sleep_patch.assert_called_once_with(10 * 60)