From 290a2cbb762944cc3ebb00f88a71da09efb0f6c5 Mon Sep 17 00:00:00 2001 From: MarkKoz Date: Mon, 30 Sep 2019 17:58:44 -0700 Subject: Create a moderation sub-package for moderation-related cogs * Rename Infractions cog to ModManagement * Rename Moderation cog to Infractions * Rename infractions.py to management.py * Rename moderation.py to infractions.py * Move moderation utils to sub-package and rename to utils.py * Move Modlog, Infractions, and ModManagement to sub-package * Use sub-package as an extension that loads aforementioned cogs --- bot/__main__.py | 2 - bot/cogs/antispam.py | 2 +- bot/cogs/clean.py | 2 +- bot/cogs/defcon.py | 2 +- bot/cogs/filtering.py | 2 +- bot/cogs/infractions.py | 264 ------------ bot/cogs/moderation.py | 562 ------------------------ bot/cogs/moderation/__init__.py | 21 + bot/cogs/moderation/infractions.py | 562 ++++++++++++++++++++++++ bot/cogs/moderation/management.py | 263 +++++++++++ bot/cogs/moderation/modlog.py | 768 +++++++++++++++++++++++++++++++++ bot/cogs/moderation/utils.py | 87 ++++ bot/cogs/modlog.py | 768 --------------------------------- bot/cogs/superstarify/__init__.py | 15 +- bot/cogs/token_remover.py | 2 +- bot/cogs/verification.py | 2 +- bot/cogs/watchchannels/bigbrother.py | 2 +- bot/cogs/watchchannels/watchchannel.py | 2 +- bot/utils/moderation.py | 87 ---- 19 files changed, 1716 insertions(+), 1699 deletions(-) delete mode 100644 bot/cogs/infractions.py delete mode 100644 bot/cogs/moderation.py create mode 100644 bot/cogs/moderation/__init__.py create mode 100644 bot/cogs/moderation/infractions.py create mode 100644 bot/cogs/moderation/management.py create mode 100644 bot/cogs/moderation/modlog.py create mode 100644 bot/cogs/moderation/utils.py delete mode 100644 bot/cogs/modlog.py delete mode 100644 bot/utils/moderation.py diff --git a/bot/__main__.py b/bot/__main__.py index 019550a89..7d8cf6d6d 100644 --- a/bot/__main__.py +++ b/bot/__main__.py @@ -36,7 +36,6 @@ log.addHandler(APILoggingHandler(bot.api_client)) bot.load_extension("bot.cogs.error_handler") bot.load_extension("bot.cogs.filtering") bot.load_extension("bot.cogs.logging") -bot.load_extension("bot.cogs.modlog") bot.load_extension("bot.cogs.security") # Commands, etc @@ -57,7 +56,6 @@ bot.load_extension("bot.cogs.defcon") bot.load_extension("bot.cogs.eval") bot.load_extension("bot.cogs.free") bot.load_extension("bot.cogs.information") -bot.load_extension("bot.cogs.infractions") bot.load_extension("bot.cogs.jams") bot.load_extension("bot.cogs.moderation") bot.load_extension("bot.cogs.off_topic_names") diff --git a/bot/cogs/antispam.py b/bot/cogs/antispam.py index 8dfa0ad05..cd1940aaa 100644 --- a/bot/cogs/antispam.py +++ b/bot/cogs/antispam.py @@ -10,7 +10,7 @@ from discord import Colour, Member, Message, NotFound, Object, TextChannel from discord.ext.commands import Bot, Cog from bot import rules -from bot.cogs.modlog import ModLog +from bot.cogs.moderation import ModLog from bot.constants import ( AntiSpam as AntiSpamConfig, Channels, Colours, DEBUG_MODE, Event, Filter, diff --git a/bot/cogs/clean.py b/bot/cogs/clean.py index 1c0c9a7a8..dca411d01 100644 --- a/bot/cogs/clean.py +++ b/bot/cogs/clean.py @@ -6,7 +6,7 @@ from typing import Optional from discord import Colour, Embed, Message, User from discord.ext.commands import Bot, Cog, Context, group -from bot.cogs.modlog import ModLog +from bot.cogs.moderation import ModLog from bot.constants import ( Channels, CleanMessages, Colours, Event, Icons, MODERATION_ROLES, NEGATIVE_REPLIES diff --git a/bot/cogs/defcon.py b/bot/cogs/defcon.py index 048d8a683..ae0332688 100644 --- a/bot/cogs/defcon.py +++ b/bot/cogs/defcon.py @@ -4,7 +4,7 @@ from datetime import datetime, timedelta from discord import Colour, Embed, Member from discord.ext.commands import Bot, Cog, Context, group -from bot.cogs.modlog import ModLog +from bot.cogs.moderation import ModLog from bot.constants import Channels, Colours, Emojis, Event, Icons, Roles from bot.decorators import with_role diff --git a/bot/cogs/filtering.py b/bot/cogs/filtering.py index bd8c6ed67..265ae5160 100644 --- a/bot/cogs/filtering.py +++ b/bot/cogs/filtering.py @@ -7,7 +7,7 @@ from dateutil.relativedelta import relativedelta from discord import Colour, DMChannel, Member, Message, TextChannel from discord.ext.commands import Bot, Cog -from bot.cogs.modlog import ModLog +from bot.cogs.moderation import ModLog from bot.constants import ( Channels, Colours, DEBUG_MODE, Filter, Icons, URLs diff --git a/bot/cogs/infractions.py b/bot/cogs/infractions.py deleted file mode 100644 index 709a42b6c..000000000 --- a/bot/cogs/infractions.py +++ /dev/null @@ -1,264 +0,0 @@ -import asyncio -import logging -import textwrap -import typing as t - -import discord -from discord.ext import commands -from discord.ext.commands import Context - -from bot import constants -from bot.cogs.moderation import Moderation -from bot.cogs.modlog import ModLog -from bot.converters import Duration, InfractionSearchQuery -from bot.pagination import LinePaginator -from bot.utils import time -from bot.utils.checks import with_role_check -from bot.utils.moderation import Infraction, proxy_user - -log = logging.getLogger(__name__) - -UserConverter = t.Union[discord.User, proxy_user] - - -def permanent_duration(expires_at: str) -> str: - """Only allow an expiration to be 'permanent' if it is a string.""" - expires_at = expires_at.lower() - if expires_at != "permanent": - raise commands.BadArgument - else: - return expires_at - - -class Infractions(commands.Cog): - """Management of infractions.""" - - def __init__(self, bot: commands.Bot): - self.bot = bot - - @property - def mod_log(self) -> ModLog: - """Get currently loaded ModLog cog instance.""" - return self.bot.get_cog("ModLog") - - @property - def mod_cog(self) -> Moderation: - """Get currently loaded Moderation cog instance.""" - return self.bot.get_cog("Moderation") - - # region: Edit infraction commands - - @commands.group(name='infraction', aliases=('infr', 'infractions', 'inf'), invoke_without_command=True) - async def infraction_group(self, ctx: Context) -> None: - """Infraction manipulation commands.""" - await ctx.invoke(self.bot.get_command("help"), "infraction") - - @infraction_group.command(name='edit') - async def infraction_edit( - self, - ctx: Context, - infraction_id: int, - expires_at: t.Union[Duration, permanent_duration, None], - *, - reason: str = None - ) -> None: - """ - Edit the duration and/or the reason of an infraction. - - Durations are relative to the time of updating and should be appended with a unit of time: - y (years), m (months), w (weeks), d (days), h (hours), M (minutes), s (seconds) - - Use "permanent" to mark the infraction as permanent. - """ - if expires_at is None and reason is None: - # Unlike UserInputError, the error handler will show a specified message for BadArgument - raise commands.BadArgument("Neither a new expiry nor a new reason was specified.") - - # Retrieve the previous infraction for its information. - old_infraction = await self.bot.api_client.get(f'bot/infractions/{infraction_id}') - - request_data = {} - confirm_messages = [] - log_text = "" - - if expires_at == "permanent": - request_data['expires_at'] = None - confirm_messages.append("marked as permanent") - elif expires_at is not None: - request_data['expires_at'] = expires_at.isoformat() - expiry = expires_at.strftime(time.INFRACTION_FORMAT) - confirm_messages.append(f"set to expire on {expiry}") - else: - confirm_messages.append("expiry unchanged") - - if reason: - request_data['reason'] = reason - confirm_messages.append("set a new reason") - log_text += f""" - Previous reason: {old_infraction['reason']} - New reason: {reason} - """.rstrip() - else: - confirm_messages.append("reason unchanged") - - # Update the infraction - new_infraction = await self.bot.api_client.patch( - f'bot/infractions/{infraction_id}', - json=request_data, - ) - - # Re-schedule infraction if the expiration has been updated - if 'expires_at' in request_data: - self.mod_cog.cancel_task(new_infraction['id']) - loop = asyncio.get_event_loop() - self.mod_cog.schedule_task(loop, new_infraction['id'], new_infraction) - - log_text += f""" - Previous expiry: {old_infraction['expires_at'] or "Permanent"} - New expiry: {new_infraction['expires_at'] or "Permanent"} - """.rstrip() - - await ctx.send(f":ok_hand: Updated infraction: {' & '.join(confirm_messages)}") - - # Get information about the infraction's user - user_id = new_infraction['user'] - user = ctx.guild.get_member(user_id) - - if user: - user_text = f"{user.mention} (`{user.id}`)" - thumbnail = user.avatar_url_as(static_format="png") - else: - user_text = f"`{user_id}`" - thumbnail = None - - # The infraction's actor - actor_id = new_infraction['actor'] - actor = ctx.guild.get_member(actor_id) or f"`{actor_id}`" - - await self.mod_log.send_log_message( - icon_url=constants.Icons.pencil, - colour=discord.Colour.blurple(), - title="Infraction edited", - thumbnail=thumbnail, - text=textwrap.dedent(f""" - Member: {user_text} - Actor: {actor} - Edited by: {ctx.message.author}{log_text} - """) - ) - - # endregion - # region: Search infractions - - @infraction_group.group(name="search", invoke_without_command=True) - async def infraction_search_group(self, ctx: Context, query: InfractionSearchQuery) -> None: - """Searches for infractions in the database.""" - if isinstance(query, discord.User): - await ctx.invoke(self.search_user, query) - else: - await ctx.invoke(self.search_reason, query) - - @infraction_search_group.command(name="user", aliases=("member", "id")) - async def search_user(self, ctx: Context, user: UserConverter) -> None: - """Search for infractions by member.""" - infraction_list = await self.bot.api_client.get( - 'bot/infractions', - params={'user__id': str(user.id)} - ) - embed = discord.Embed( - title=f"Infractions for {user} ({len(infraction_list)} total)", - colour=discord.Colour.orange() - ) - await self.send_infraction_list(ctx, embed, infraction_list) - - @infraction_search_group.command(name="reason", aliases=("match", "regex", "re")) - async def search_reason(self, ctx: Context, reason: str) -> None: - """Search for infractions by their reason. Use Re2 for matching.""" - infraction_list = await self.bot.api_client.get( - 'bot/infractions', - params={'search': reason} - ) - embed = discord.Embed( - title=f"Infractions matching `{reason}` ({len(infraction_list)} total)", - colour=discord.Colour.orange() - ) - await self.send_infraction_list(ctx, embed, infraction_list) - - # endregion - # region: Utility functions - - async def send_infraction_list( - self, - ctx: Context, - embed: discord.Embed, - infractions: t.Iterable[Infraction] - ) -> None: - """Send a paginated embed of infractions for the specified user.""" - if not infractions: - await ctx.send(f":warning: No infractions could be found for that query.") - return - - lines = tuple( - self.infraction_to_string(infraction) - for infraction in infractions - ) - - await LinePaginator.paginate( - lines, - ctx=ctx, - embed=embed, - empty=True, - max_lines=3, - max_size=1000 - ) - - def infraction_to_string(self, infraction_object: Infraction) -> str: - """Convert the infraction object to a string representation.""" - actor_id = infraction_object["actor"] - guild = self.bot.get_guild(constants.Guild.id) - actor = guild.get_member(actor_id) - active = infraction_object["active"] - user_id = infraction_object["user"] - hidden = infraction_object["hidden"] - created = time.format_infraction(infraction_object["inserted_at"]) - if infraction_object["expires_at"] is None: - expires = "*Permanent*" - else: - expires = time.format_infraction(infraction_object["expires_at"]) - - lines = textwrap.dedent(f""" - {"**===============**" if active else "==============="} - Status: {"__**Active**__" if active else "Inactive"} - User: {self.bot.get_user(user_id)} (`{user_id}`) - Type: **{infraction_object["type"]}** - Shadow: {hidden} - Reason: {infraction_object["reason"] or "*None*"} - Created: {created} - Expires: {expires} - Actor: {actor.mention if actor else actor_id} - ID: `{infraction_object["id"]}` - {"**===============**" if active else "==============="} - """) - - return lines.strip() - - # endregion - - # This cannot be static (must have a __func__ attribute). - def cog_check(self, ctx: Context) -> bool: - """Only allow moderators to invoke the commands in this cog.""" - return with_role_check(ctx, *constants.MODERATION_ROLES) - - # This cannot be static (must have a __func__ attribute). - async def cog_command_error(self, ctx: Context, error: Exception) -> None: - """Send a notification to the invoking context on a Union failure.""" - if isinstance(error, commands.BadUnionArgument): - if discord.User in error.converters: - await ctx.send(str(error.errors[0])) - error.handled = True - - -def setup(bot: commands.Bot) -> None: - """Load the Infractions cog.""" - bot.add_cog(Infractions(bot)) - log.info("Cog loaded: Infractions") diff --git a/bot/cogs/moderation.py b/bot/cogs/moderation.py deleted file mode 100644 index 15eee397d..000000000 --- a/bot/cogs/moderation.py +++ /dev/null @@ -1,562 +0,0 @@ -import logging -import textwrap -from datetime import datetime -from typing import Awaitable, Optional, Union - -from discord import ( - Colour, Embed, Forbidden, Guild, HTTPException, Member, NotFound, Object, User -) -from discord.ext.commands import BadUnionArgument, Bot, Cog, Context, command - -from bot import constants -from bot.cogs.modlog import ModLog -from bot.constants import Colours, Event, Icons -from bot.converters import Duration -from bot.decorators import respect_role_hierarchy -from bot.utils.checks import with_role_check -from bot.utils.moderation import ( - Infraction, MemberObject, already_has_active_infraction, post_infraction, proxy_user -) -from bot.utils.scheduling import Scheduler -from bot.utils.time import format_infraction, wait_until - -log = logging.getLogger(__name__) - -INFRACTION_ICONS = { - "mute": Icons.user_mute, - "kick": Icons.sign_out, - "ban": Icons.user_ban, - "warning": Icons.user_warn, - "note": Icons.user_warn, -} -RULES_URL = "https://pythondiscord.com/pages/rules" -APPEALABLE_INFRACTIONS = ("ban", "mute") - - -MemberConverter = Union[Member, User, proxy_user] - - -class Moderation(Scheduler, Cog): - """Server moderation tools.""" - - def __init__(self, bot: Bot): - self.bot = bot - self._muted_role = Object(constants.Roles.muted) - super().__init__() - - @property - def mod_log(self) -> ModLog: - """Get currently loaded ModLog cog instance.""" - return self.bot.get_cog("ModLog") - - @Cog.listener() - async def on_ready(self) -> None: - """Schedule expiration for previous infractions.""" - # Schedule expiration for previous infractions - infractions = await self.bot.api_client.get( - 'bot/infractions', params={'active': 'true'} - ) - for infraction in infractions: - if infraction["expires_at"] is not None: - self.schedule_task(self.bot.loop, infraction["id"], infraction) - - # region: Permanent infractions - - @command() - async def warn(self, ctx: Context, user: MemberConverter, *, reason: str = None) -> None: - """Warn a user for the given reason.""" - infraction = await post_infraction(ctx, user, reason, "warning") - if infraction is None: - return - - await self.apply_infraction(ctx, infraction, user) - - @command() - async def kick(self, ctx: Context, user: Member, *, reason: str = None) -> None: - """Kick a user for the given reason.""" - await self.apply_kick(ctx, user, reason) - - @command() - async def ban(self, ctx: Context, user: MemberConverter, *, reason: str = None) -> None: - """Permanently ban a user for the given reason.""" - await self.apply_ban(ctx, user, reason) - - # endregion - # region: Temporary infractions - - @command(aliases=('mute',)) - async def tempmute(self, ctx: Context, user: Member, duration: Duration, *, reason: str = None) -> None: - """ - Temporarily mute a user for the given reason and duration. - - A unit of time should be appended to the duration: - y (years), m (months), w (weeks), d (days), h (hours), M (minutes), s (seconds) - """ - await self.apply_mute(ctx, user, reason, expires_at=duration) - - @command() - async def tempban(self, ctx: Context, user: MemberConverter, duration: Duration, *, reason: str = None) -> None: - """ - Temporarily ban a user for the given reason and duration. - - A unit of time should be appended to the duration: - y (years), m (months), w (weeks), d (days), h (hours), M (minutes), s (seconds) - """ - await self.apply_ban(ctx, user, reason, expires_at=duration) - - # endregion - # region: Permanent shadow infractions - - @command(hidden=True) - async def note(self, ctx: Context, user: MemberConverter, *, reason: str = None) -> None: - """Create a private note for a user with the given reason without notifying the user.""" - infraction = await post_infraction(ctx, user, reason, "note", hidden=True) - if infraction is None: - return - - await self.apply_infraction(ctx, infraction, user) - - @command(hidden=True, aliases=['shadowkick', 'skick']) - async def shadow_kick(self, ctx: Context, user: Member, *, reason: str = None) -> None: - """Kick a user for the given reason without notifying the user.""" - await self.apply_kick(ctx, user, reason, hidden=True) - - @command(hidden=True, aliases=['shadowban', 'sban']) - async def shadow_ban(self, ctx: Context, user: MemberConverter, *, reason: str = None) -> None: - """Permanently ban a user for the given reason without notifying the user.""" - await self.apply_ban(ctx, user, reason, hidden=True) - - # endregion - # region: Temporary shadow infractions - - @command(hidden=True, aliases=["shadowtempmute, stempmute", "shadowmute", "smute"]) - async def shadow_tempmute( - self, ctx: Context, user: Member, duration: Duration, *, reason: str = None - ) -> None: - """ - Temporarily mute a user for the given reason and duration without notifying the user. - - A unit of time should be appended to the duration: - y (years), m (months), w (weeks), d (days), h (hours), M (minutes), s (seconds) - """ - await self.apply_mute(ctx, user, reason, expires_at=duration, hidden=True) - - @command(hidden=True, aliases=["shadowtempban, stempban"]) - async def shadow_tempban( - self, ctx: Context, user: MemberConverter, duration: Duration, *, reason: str = None - ) -> None: - """ - Temporarily ban a user for the given reason and duration without notifying the user. - - A unit of time should be appended to the duration: - y (years), m (months), w (weeks), d (days), h (hours), M (minutes), s (seconds) - """ - await self.apply_ban(ctx, user, reason, expires_at=duration, hidden=True) - - # endregion - # region: Remove infractions (un- commands) - - @command() - async def unmute(self, ctx: Context, user: MemberConverter) -> None: - """Deactivates the active mute infraction for a user.""" - try: - # check the current active infraction - response = await self.bot.api_client.get( - 'bot/infractions', - params={ - 'active': 'true', - 'type': 'mute', - 'user__id': user.id - } - ) - if len(response) > 1: - log.warning("Found more than one active mute infraction for user `%d`", user.id) - - if not response: - # no active infraction - await ctx.send( - f":x: There is no active mute infraction for user {user.mention}." - ) - return - - for infraction in response: - await self._deactivate_infraction(infraction) - if infraction["expires_at"] is not None: - self.cancel_expiration(infraction["id"]) - - notified = await self.notify_pardon( - user=user, - title="You have been unmuted.", - content="You may now send messages in the server.", - icon_url=Icons.user_unmute - ) - - if notified: - dm_status = "Sent" - dm_emoji = ":incoming_envelope: " - log_content = None - else: - dm_status = "**Failed**" - dm_emoji = "" - log_content = ctx.author.mention - - await ctx.send(f"{dm_emoji}:ok_hand: Un-muted {user.mention}.") - - embed_text = textwrap.dedent( - f""" - Member: {user.mention} (`{user.id}`) - Actor: {ctx.message.author} - DM: {dm_status} - """ - ) - - if len(response) > 1: - footer = f"Infraction IDs: {', '.join(str(infr['id']) for infr in response)}" - title = "Member unmuted" - embed_text += "Note: User had multiple **active** mute infractions in the database." - else: - infraction = response[0] - footer = f"Infraction ID: {infraction['id']}" - title = "Member unmuted" - - # Send a log message to the mod log - await self.mod_log.send_log_message( - icon_url=Icons.user_unmute, - colour=Colour(Colours.soft_green), - title=title, - thumbnail=user.avatar_url_as(static_format="png"), - text=embed_text, - footer=footer, - content=log_content - ) - except Exception: - log.exception("There was an error removing an infraction.") - await ctx.send(":x: There was an error removing the infraction.") - - @command() - async def unban(self, ctx: Context, user: MemberConverter) -> None: - """Deactivates the active ban infraction for a user.""" - try: - # check the current active infraction - response = await self.bot.api_client.get( - 'bot/infractions', - params={ - 'active': 'true', - 'type': 'ban', - 'user__id': str(user.id) - } - ) - if len(response) > 1: - log.warning( - "More than one active ban infraction found for user `%d`.", - user.id - ) - - if not response: - # no active infraction - await ctx.send( - f":x: There is no active ban infraction for user {user.mention}." - ) - return - - for infraction in response: - await self._deactivate_infraction(infraction) - if infraction["expires_at"] is not None: - self.cancel_expiration(infraction["id"]) - - embed_text = textwrap.dedent( - f""" - Member: {user.mention} (`{user.id}`) - Actor: {ctx.message.author} - """ - ) - - if len(response) > 1: - footer = f"Infraction IDs: {', '.join(str(infr['id']) for infr in response)}" - embed_text += "Note: User had multiple **active** ban infractions in the database." - else: - infraction = response[0] - footer = f"Infraction ID: {infraction['id']}" - - await ctx.send(f":ok_hand: Un-banned {user.mention}.") - - # Send a log message to the mod log - await self.mod_log.send_log_message( - icon_url=Icons.user_unban, - colour=Colour(Colours.soft_green), - title="Member unbanned", - thumbnail=user.avatar_url_as(static_format="png"), - text=embed_text, - footer=footer, - ) - except Exception: - log.exception("There was an error removing an infraction.") - await ctx.send(":x: There was an error removing the infraction.") - - # endregion - # region: Base infraction functions - - async def apply_mute(self, ctx: Context, user: Member, reason: str, **kwargs) -> None: - """Apply a mute infraction with kwargs passed to `post_infraction`.""" - if await already_has_active_infraction(ctx, user, "mute"): - return - - infraction = await post_infraction(ctx, user, "mute", reason, **kwargs) - if infraction is None: - return - - self.mod_log.ignore(Event.member_update, user.id) - - action = user.add_roles(self._muted_role, reason=reason) - await self.apply_infraction(ctx, infraction, user, action) - - @respect_role_hierarchy() - async def apply_kick(self, ctx: Context, user: Member, reason: str, **kwargs) -> None: - """Apply a kick infraction with kwargs passed to `post_infraction`.""" - infraction = await post_infraction(ctx, user, type="kick", **kwargs) - if infraction is None: - return - - self.mod_log.ignore(Event.member_remove, user.id) - - action = user.kick(reason=reason) - await self.apply_infraction(ctx, infraction, user, action) - - @respect_role_hierarchy() - async def apply_ban(self, ctx: Context, user: MemberObject, reason: str, **kwargs) -> None: - """Apply a ban infraction with kwargs passed to `post_infraction`.""" - if await already_has_active_infraction(ctx, user, "ban"): - return - - infraction = await post_infraction(ctx, user, reason, "ban", **kwargs) - if infraction is None: - return - - self.mod_log.ignore(Event.member_ban, user.id) - self.mod_log.ignore(Event.member_remove, user.id) - - action = ctx.guild.ban(user, reason=reason, delete_message_days=0) - await self.apply_infraction(ctx, infraction, user, action) - - # endregion - # region: Utility functions - - def cancel_expiration(self, infraction_id: str) -> None: - """Un-schedules a task set to expire a temporary infraction.""" - task = self.scheduled_tasks.get(infraction_id) - if task is None: - log.warning(f"Failed to unschedule {infraction_id}: no task found.") - return - task.cancel() - log.debug(f"Unscheduled {infraction_id}.") - del self.scheduled_tasks[infraction_id] - - async def _scheduled_task(self, infraction_object: Infraction) -> None: - """ - Marks an infraction expired after the delay from time of scheduling to time of expiration. - - At the time of expiration, the infraction is marked as inactive on the website, and the - expiration task is cancelled. The user is then notified via DM. - """ - infraction_id = infraction_object["id"] - - # transform expiration to delay in seconds - expiration_datetime = datetime.fromisoformat(infraction_object["expires_at"][:-1]) - await wait_until(expiration_datetime) - - log.debug(f"Marking infraction {infraction_id} as inactive (expired).") - await self._deactivate_infraction(infraction_object) - - self.cancel_task(infraction_object["id"]) - - # Notify the user that they've been unmuted. - user_id = infraction_object["user"] - guild = self.bot.get_guild(constants.Guild.id) - await self.notify_pardon( - user=guild.get_member(user_id), - title="You have been unmuted.", - content="You may now send messages in the server.", - icon_url=Icons.user_unmute - ) - - async def _deactivate_infraction(self, infraction_object: Infraction) -> None: - """ - A co-routine which marks an infraction as inactive on the website. - - This co-routine does not cancel or un-schedule an expiration task. - """ - guild: Guild = self.bot.get_guild(constants.Guild.id) - user_id = infraction_object["user"] - infraction_type = infraction_object["type"] - - if infraction_type == "mute": - member: Member = guild.get_member(user_id) - if member: - # remove the mute role - self.mod_log.ignore(Event.member_update, member.id) - await member.remove_roles(self._muted_role) - else: - log.warning(f"Failed to un-mute user: {user_id} (not found)") - elif infraction_type == "ban": - user: Object = Object(user_id) - try: - await guild.unban(user) - except NotFound: - log.info(f"Tried to unban user `{user_id}`, but Discord does not have an active ban registered.") - - await self.bot.api_client.patch( - 'bot/infractions/' + str(infraction_object['id']), - json={"active": False} - ) - - async def notify_infraction( - self, - user: MemberObject, - infr_type: str, - expires_at: Optional[str] = None, - reason: Optional[str] = None - ) -> bool: - """ - Attempt to notify a user, via DM, of their fresh infraction. - - Returns a boolean indicator of whether the DM was successful. - """ - embed = Embed( - description=textwrap.dedent(f""" - **Type:** {infr_type.capitalize()} - **Expires:** {expires_at or "N/A"} - **Reason:** {reason or "No reason provided."} - """), - colour=Colour(Colours.soft_red) - ) - - icon_url = INFRACTION_ICONS.get(infr_type, Icons.token_removed) - embed.set_author(name="Infraction Information", icon_url=icon_url, url=RULES_URL) - embed.title = f"Please review our rules over at {RULES_URL}" - embed.url = RULES_URL - - if infr_type in APPEALABLE_INFRACTIONS: - embed.set_footer(text="To appeal this infraction, send an e-mail to appeals@pythondiscord.com") - - return await self.send_private_embed(user, embed) - - async def notify_pardon( - self, - user: MemberObject, - title: str, - content: str, - icon_url: str = Icons.user_verified - ) -> bool: - """ - Attempt to notify a user, via DM, of their expired infraction. - - Optionally returns a boolean indicator of whether the DM was successful. - """ - embed = Embed( - description=content, - colour=Colour(Colours.soft_green) - ) - - embed.set_author(name=title, icon_url=icon_url) - - return await self.send_private_embed(user, embed) - - async def send_private_embed(self, user: MemberObject, embed: Embed) -> bool: - """ - A helper method for sending an embed to a user's DMs. - - Returns a boolean indicator of DM success. - """ - try: - # sometimes `user` is a `discord.Object`, so let's make it a proper user. - user = await self.bot.fetch_user(user.id) - - await user.send(embed=embed) - return True - except (HTTPException, Forbidden, NotFound): - log.debug( - f"Infraction-related information could not be sent to user {user} ({user.id}). " - "The user either could not be retrieved or probably disabled their DMs." - ) - return False - - async def apply_infraction( - self, - ctx: Context, - infraction: Infraction, - user: MemberObject, - action_coro: Optional[Awaitable] = None - ) -> None: - """Apply an infraction to the user, log the infraction, and optionally notify the user.""" - infr_type = infraction["type"] - icon = INFRACTION_ICONS[infr_type] - reason = infraction["reason"] - expiry = infraction["expires_at"] - - if expiry: - expiry = format_infraction(expiry) - - confirm_msg = f":ok_hand: applied" - expiry_msg = f" until {expiry}" if expiry else " permanently" - dm_result = "" - dm_log_text = "" - expiry_log_text = f"Expires: {expiry}" if expiry else "" - log_title = "applied" - log_content = None - - if not infraction["hidden"]: - if await self.notify_infraction(user, infr_type, expiry, reason): - dm_result = ":incoming_envelope: " - dm_log_text = "\nDM: Sent" - else: - dm_log_text = "\nDM: **Failed**" - log_content = ctx.author.mention - - if action_coro: - try: - await action_coro - if expiry: - self.schedule_task(ctx.bot.loop, infraction["id"], infraction) - except Forbidden: - confirm_msg = f":x: failed to apply" - expiry_msg = "" - log_content = ctx.author.mention - log_title = "failed to apply" - - await ctx.send(f"{dm_result}{confirm_msg} **{infr_type}** to {user.mention}{expiry_msg}.") - - await self.mod_log.send_log_message( - icon_url=icon, - colour=Colour(Colours.soft_red), - title=f"Infraction {log_title}: {infr_type}", - thumbnail=user.avatar_url_as(static_format="png"), - text=textwrap.dedent(f""" - Member: {user.mention} (`{user.id}`) - Actor: {ctx.message.author}{dm_log_text} - Reason: {reason} - {expiry_log_text} - """), - content=log_content, - footer=f"ID {infraction['id']}" - ) - - # endregion - - # This cannot be static (must have a __func__ attribute). - def cog_check(self, ctx: Context) -> bool: - """Only allow moderators to invoke the commands in this cog.""" - return with_role_check(ctx, *constants.MODERATION_ROLES) - - # This cannot be static (must have a __func__ attribute). - async def cog_command_error(self, ctx: Context, error: Exception) -> None: - """Send a notification to the invoking context on a Union failure.""" - if isinstance(error, BadUnionArgument): - if User in error.converters: - await ctx.send(str(error.errors[0])) - error.handled = True - - -def setup(bot: Bot) -> None: - """Moderation cog load.""" - bot.add_cog(Moderation(bot)) - log.info("Cog loaded: Moderation") diff --git a/bot/cogs/moderation/__init__.py b/bot/cogs/moderation/__init__.py new file mode 100644 index 000000000..bf0a14c29 --- /dev/null +++ b/bot/cogs/moderation/__init__.py @@ -0,0 +1,21 @@ +import logging + +from discord.ext.commands import Bot + +from .infractions import Infractions +from .management import ModManagement +from .modlog import ModLog + +log = logging.getLogger(__name__) + + +def setup(bot: Bot) -> None: + """Load the moderation extension with the Infractions, ModManagement, and ModLog cogs.""" + bot.add_cog(Infractions(bot)) + log.info("Cog loaded: Infractions") + + bot.add_cog(ModLog(bot)) + log.info("Cog loaded: ModLog") + + bot.add_cog(ModManagement(bot)) + log.info("Cog loaded: ModManagement") diff --git a/bot/cogs/moderation/infractions.py b/bot/cogs/moderation/infractions.py new file mode 100644 index 000000000..d36f147f7 --- /dev/null +++ b/bot/cogs/moderation/infractions.py @@ -0,0 +1,562 @@ +import logging +import textwrap +from datetime import datetime +from typing import Awaitable, Optional, Union + +from discord import ( + Colour, Embed, Forbidden, Guild, HTTPException, Member, NotFound, Object, User +) +from discord.ext.commands import BadUnionArgument, Bot, Cog, Context, command + +from bot import constants +from bot.cogs.moderation import ModLog +from bot.cogs.moderation.utils import ( + Infraction, MemberObject, already_has_active_infraction, post_infraction, proxy_user +) +from bot.constants import Colours, Event, Icons +from bot.converters import Duration +from bot.decorators import respect_role_hierarchy +from bot.utils.checks import with_role_check +from bot.utils.scheduling import Scheduler +from bot.utils.time import format_infraction, wait_until + +log = logging.getLogger(__name__) + +INFRACTION_ICONS = { + "mute": Icons.user_mute, + "kick": Icons.sign_out, + "ban": Icons.user_ban, + "warning": Icons.user_warn, + "note": Icons.user_warn, +} +RULES_URL = "https://pythondiscord.com/pages/rules" +APPEALABLE_INFRACTIONS = ("ban", "mute") + + +MemberConverter = Union[Member, User, proxy_user] + + +class Infractions(Scheduler, Cog): + """Server moderation tools.""" + + def __init__(self, bot: Bot): + self.bot = bot + self._muted_role = Object(constants.Roles.muted) + super().__init__() + + @property + def mod_log(self) -> ModLog: + """Get currently loaded ModLog cog instance.""" + return self.bot.get_cog("ModLog") + + @Cog.listener() + async def on_ready(self) -> None: + """Schedule expiration for previous infractions.""" + # Schedule expiration for previous infractions + infractions = await self.bot.api_client.get( + 'bot/infractions', params={'active': 'true'} + ) + for infraction in infractions: + if infraction["expires_at"] is not None: + self.schedule_task(self.bot.loop, infraction["id"], infraction) + + # region: Permanent infractions + + @command() + async def warn(self, ctx: Context, user: MemberConverter, *, reason: str = None) -> None: + """Warn a user for the given reason.""" + infraction = await post_infraction(ctx, user, reason, "warning") + if infraction is None: + return + + await self.apply_infraction(ctx, infraction, user) + + @command() + async def kick(self, ctx: Context, user: Member, *, reason: str = None) -> None: + """Kick a user for the given reason.""" + await self.apply_kick(ctx, user, reason) + + @command() + async def ban(self, ctx: Context, user: MemberConverter, *, reason: str = None) -> None: + """Permanently ban a user for the given reason.""" + await self.apply_ban(ctx, user, reason) + + # endregion + # region: Temporary infractions + + @command(aliases=('mute',)) + async def tempmute(self, ctx: Context, user: Member, duration: Duration, *, reason: str = None) -> None: + """ + Temporarily mute a user for the given reason and duration. + + A unit of time should be appended to the duration: + y (years), m (months), w (weeks), d (days), h (hours), M (minutes), s (seconds) + """ + await self.apply_mute(ctx, user, reason, expires_at=duration) + + @command() + async def tempban(self, ctx: Context, user: MemberConverter, duration: Duration, *, reason: str = None) -> None: + """ + Temporarily ban a user for the given reason and duration. + + A unit of time should be appended to the duration: + y (years), m (months), w (weeks), d (days), h (hours), M (minutes), s (seconds) + """ + await self.apply_ban(ctx, user, reason, expires_at=duration) + + # endregion + # region: Permanent shadow infractions + + @command(hidden=True) + async def note(self, ctx: Context, user: MemberConverter, *, reason: str = None) -> None: + """Create a private note for a user with the given reason without notifying the user.""" + infraction = await post_infraction(ctx, user, reason, "note", hidden=True) + if infraction is None: + return + + await self.apply_infraction(ctx, infraction, user) + + @command(hidden=True, aliases=['shadowkick', 'skick']) + async def shadow_kick(self, ctx: Context, user: Member, *, reason: str = None) -> None: + """Kick a user for the given reason without notifying the user.""" + await self.apply_kick(ctx, user, reason, hidden=True) + + @command(hidden=True, aliases=['shadowban', 'sban']) + async def shadow_ban(self, ctx: Context, user: MemberConverter, *, reason: str = None) -> None: + """Permanently ban a user for the given reason without notifying the user.""" + await self.apply_ban(ctx, user, reason, hidden=True) + + # endregion + # region: Temporary shadow infractions + + @command(hidden=True, aliases=["shadowtempmute, stempmute", "shadowmute", "smute"]) + async def shadow_tempmute( + self, ctx: Context, user: Member, duration: Duration, *, reason: str = None + ) -> None: + """ + Temporarily mute a user for the given reason and duration without notifying the user. + + A unit of time should be appended to the duration: + y (years), m (months), w (weeks), d (days), h (hours), M (minutes), s (seconds) + """ + await self.apply_mute(ctx, user, reason, expires_at=duration, hidden=True) + + @command(hidden=True, aliases=["shadowtempban, stempban"]) + async def shadow_tempban( + self, ctx: Context, user: MemberConverter, duration: Duration, *, reason: str = None + ) -> None: + """ + Temporarily ban a user for the given reason and duration without notifying the user. + + A unit of time should be appended to the duration: + y (years), m (months), w (weeks), d (days), h (hours), M (minutes), s (seconds) + """ + await self.apply_ban(ctx, user, reason, expires_at=duration, hidden=True) + + # endregion + # region: Remove infractions (un- commands) + + @command() + async def unmute(self, ctx: Context, user: MemberConverter) -> None: + """Deactivates the active mute infraction for a user.""" + try: + # check the current active infraction + response = await self.bot.api_client.get( + 'bot/infractions', + params={ + 'active': 'true', + 'type': 'mute', + 'user__id': user.id + } + ) + if len(response) > 1: + log.warning("Found more than one active mute infraction for user `%d`", user.id) + + if not response: + # no active infraction + await ctx.send( + f":x: There is no active mute infraction for user {user.mention}." + ) + return + + for infraction in response: + await self._deactivate_infraction(infraction) + if infraction["expires_at"] is not None: + self.cancel_expiration(infraction["id"]) + + notified = await self.notify_pardon( + user=user, + title="You have been unmuted.", + content="You may now send messages in the server.", + icon_url=Icons.user_unmute + ) + + if notified: + dm_status = "Sent" + dm_emoji = ":incoming_envelope: " + log_content = None + else: + dm_status = "**Failed**" + dm_emoji = "" + log_content = ctx.author.mention + + await ctx.send(f"{dm_emoji}:ok_hand: Un-muted {user.mention}.") + + embed_text = textwrap.dedent( + f""" + Member: {user.mention} (`{user.id}`) + Actor: {ctx.message.author} + DM: {dm_status} + """ + ) + + if len(response) > 1: + footer = f"Infraction IDs: {', '.join(str(infr['id']) for infr in response)}" + title = "Member unmuted" + embed_text += "Note: User had multiple **active** mute infractions in the database." + else: + infraction = response[0] + footer = f"Infraction ID: {infraction['id']}" + title = "Member unmuted" + + # Send a log message to the mod log + await self.mod_log.send_log_message( + icon_url=Icons.user_unmute, + colour=Colour(Colours.soft_green), + title=title, + thumbnail=user.avatar_url_as(static_format="png"), + text=embed_text, + footer=footer, + content=log_content + ) + except Exception: + log.exception("There was an error removing an infraction.") + await ctx.send(":x: There was an error removing the infraction.") + + @command() + async def unban(self, ctx: Context, user: MemberConverter) -> None: + """Deactivates the active ban infraction for a user.""" + try: + # check the current active infraction + response = await self.bot.api_client.get( + 'bot/infractions', + params={ + 'active': 'true', + 'type': 'ban', + 'user__id': str(user.id) + } + ) + if len(response) > 1: + log.warning( + "More than one active ban infraction found for user `%d`.", + user.id + ) + + if not response: + # no active infraction + await ctx.send( + f":x: There is no active ban infraction for user {user.mention}." + ) + return + + for infraction in response: + await self._deactivate_infraction(infraction) + if infraction["expires_at"] is not None: + self.cancel_expiration(infraction["id"]) + + embed_text = textwrap.dedent( + f""" + Member: {user.mention} (`{user.id}`) + Actor: {ctx.message.author} + """ + ) + + if len(response) > 1: + footer = f"Infraction IDs: {', '.join(str(infr['id']) for infr in response)}" + embed_text += "Note: User had multiple **active** ban infractions in the database." + else: + infraction = response[0] + footer = f"Infraction ID: {infraction['id']}" + + await ctx.send(f":ok_hand: Un-banned {user.mention}.") + + # Send a log message to the mod log + await self.mod_log.send_log_message( + icon_url=Icons.user_unban, + colour=Colour(Colours.soft_green), + title="Member unbanned", + thumbnail=user.avatar_url_as(static_format="png"), + text=embed_text, + footer=footer, + ) + except Exception: + log.exception("There was an error removing an infraction.") + await ctx.send(":x: There was an error removing the infraction.") + + # endregion + # region: Base infraction functions + + async def apply_mute(self, ctx: Context, user: Member, reason: str, **kwargs) -> None: + """Apply a mute infraction with kwargs passed to `post_infraction`.""" + if await already_has_active_infraction(ctx, user, "mute"): + return + + infraction = await post_infraction(ctx, user, "mute", reason, **kwargs) + if infraction is None: + return + + self.mod_log.ignore(Event.member_update, user.id) + + action = user.add_roles(self._muted_role, reason=reason) + await self.apply_infraction(ctx, infraction, user, action) + + @respect_role_hierarchy() + async def apply_kick(self, ctx: Context, user: Member, reason: str, **kwargs) -> None: + """Apply a kick infraction with kwargs passed to `post_infraction`.""" + infraction = await post_infraction(ctx, user, type="kick", **kwargs) + if infraction is None: + return + + self.mod_log.ignore(Event.member_remove, user.id) + + action = user.kick(reason=reason) + await self.apply_infraction(ctx, infraction, user, action) + + @respect_role_hierarchy() + async def apply_ban(self, ctx: Context, user: MemberObject, reason: str, **kwargs) -> None: + """Apply a ban infraction with kwargs passed to `post_infraction`.""" + if await already_has_active_infraction(ctx, user, "ban"): + return + + infraction = await post_infraction(ctx, user, reason, "ban", **kwargs) + if infraction is None: + return + + self.mod_log.ignore(Event.member_ban, user.id) + self.mod_log.ignore(Event.member_remove, user.id) + + action = ctx.guild.ban(user, reason=reason, delete_message_days=0) + await self.apply_infraction(ctx, infraction, user, action) + + # endregion + # region: Utility functions + + def cancel_expiration(self, infraction_id: str) -> None: + """Un-schedules a task set to expire a temporary infraction.""" + task = self.scheduled_tasks.get(infraction_id) + if task is None: + log.warning(f"Failed to unschedule {infraction_id}: no task found.") + return + task.cancel() + log.debug(f"Unscheduled {infraction_id}.") + del self.scheduled_tasks[infraction_id] + + async def _scheduled_task(self, infraction_object: Infraction) -> None: + """ + Marks an infraction expired after the delay from time of scheduling to time of expiration. + + At the time of expiration, the infraction is marked as inactive on the website, and the + expiration task is cancelled. The user is then notified via DM. + """ + infraction_id = infraction_object["id"] + + # transform expiration to delay in seconds + expiration_datetime = datetime.fromisoformat(infraction_object["expires_at"][:-1]) + await wait_until(expiration_datetime) + + log.debug(f"Marking infraction {infraction_id} as inactive (expired).") + await self._deactivate_infraction(infraction_object) + + self.cancel_task(infraction_object["id"]) + + # Notify the user that they've been unmuted. + user_id = infraction_object["user"] + guild = self.bot.get_guild(constants.Guild.id) + await self.notify_pardon( + user=guild.get_member(user_id), + title="You have been unmuted.", + content="You may now send messages in the server.", + icon_url=Icons.user_unmute + ) + + async def _deactivate_infraction(self, infraction_object: Infraction) -> None: + """ + A co-routine which marks an infraction as inactive on the website. + + This co-routine does not cancel or un-schedule an expiration task. + """ + guild: Guild = self.bot.get_guild(constants.Guild.id) + user_id = infraction_object["user"] + infraction_type = infraction_object["type"] + + if infraction_type == "mute": + member: Member = guild.get_member(user_id) + if member: + # remove the mute role + self.mod_log.ignore(Event.member_update, member.id) + await member.remove_roles(self._muted_role) + else: + log.warning(f"Failed to un-mute user: {user_id} (not found)") + elif infraction_type == "ban": + user: Object = Object(user_id) + try: + await guild.unban(user) + except NotFound: + log.info(f"Tried to unban user `{user_id}`, but Discord does not have an active ban registered.") + + await self.bot.api_client.patch( + 'bot/infractions/' + str(infraction_object['id']), + json={"active": False} + ) + + async def notify_infraction( + self, + user: MemberObject, + infr_type: str, + expires_at: Optional[str] = None, + reason: Optional[str] = None + ) -> bool: + """ + Attempt to notify a user, via DM, of their fresh infraction. + + Returns a boolean indicator of whether the DM was successful. + """ + embed = Embed( + description=textwrap.dedent(f""" + **Type:** {infr_type.capitalize()} + **Expires:** {expires_at or "N/A"} + **Reason:** {reason or "No reason provided."} + """), + colour=Colour(Colours.soft_red) + ) + + icon_url = INFRACTION_ICONS.get(infr_type, Icons.token_removed) + embed.set_author(name="Infraction Information", icon_url=icon_url, url=RULES_URL) + embed.title = f"Please review our rules over at {RULES_URL}" + embed.url = RULES_URL + + if infr_type in APPEALABLE_INFRACTIONS: + embed.set_footer(text="To appeal this infraction, send an e-mail to appeals@pythondiscord.com") + + return await self.send_private_embed(user, embed) + + async def notify_pardon( + self, + user: MemberObject, + title: str, + content: str, + icon_url: str = Icons.user_verified + ) -> bool: + """ + Attempt to notify a user, via DM, of their expired infraction. + + Optionally returns a boolean indicator of whether the DM was successful. + """ + embed = Embed( + description=content, + colour=Colour(Colours.soft_green) + ) + + embed.set_author(name=title, icon_url=icon_url) + + return await self.send_private_embed(user, embed) + + async def send_private_embed(self, user: MemberObject, embed: Embed) -> bool: + """ + A helper method for sending an embed to a user's DMs. + + Returns a boolean indicator of DM success. + """ + try: + # sometimes `user` is a `discord.Object`, so let's make it a proper user. + user = await self.bot.fetch_user(user.id) + + await user.send(embed=embed) + return True + except (HTTPException, Forbidden, NotFound): + log.debug( + f"Infraction-related information could not be sent to user {user} ({user.id}). " + "The user either could not be retrieved or probably disabled their DMs." + ) + return False + + async def apply_infraction( + self, + ctx: Context, + infraction: Infraction, + user: MemberObject, + action_coro: Optional[Awaitable] = None + ) -> None: + """Apply an infraction to the user, log the infraction, and optionally notify the user.""" + infr_type = infraction["type"] + icon = INFRACTION_ICONS[infr_type] + reason = infraction["reason"] + expiry = infraction["expires_at"] + + if expiry: + expiry = format_infraction(expiry) + + confirm_msg = f":ok_hand: applied" + expiry_msg = f" until {expiry}" if expiry else " permanently" + dm_result = "" + dm_log_text = "" + expiry_log_text = f"Expires: {expiry}" if expiry else "" + log_title = "applied" + log_content = None + + if not infraction["hidden"]: + if await self.notify_infraction(user, infr_type, expiry, reason): + dm_result = ":incoming_envelope: " + dm_log_text = "\nDM: Sent" + else: + dm_log_text = "\nDM: **Failed**" + log_content = ctx.author.mention + + if action_coro: + try: + await action_coro + if expiry: + self.schedule_task(ctx.bot.loop, infraction["id"], infraction) + except Forbidden: + confirm_msg = f":x: failed to apply" + expiry_msg = "" + log_content = ctx.author.mention + log_title = "failed to apply" + + await ctx.send(f"{dm_result}{confirm_msg} **{infr_type}** to {user.mention}{expiry_msg}.") + + await self.mod_log.send_log_message( + icon_url=icon, + colour=Colour(Colours.soft_red), + title=f"Infraction {log_title}: {infr_type}", + thumbnail=user.avatar_url_as(static_format="png"), + text=textwrap.dedent(f""" + Member: {user.mention} (`{user.id}`) + Actor: {ctx.message.author}{dm_log_text} + Reason: {reason} + {expiry_log_text} + """), + content=log_content, + footer=f"ID {infraction['id']}" + ) + + # endregion + + # This cannot be static (must have a __func__ attribute). + def cog_check(self, ctx: Context) -> bool: + """Only allow moderators to invoke the commands in this cog.""" + return with_role_check(ctx, *constants.MODERATION_ROLES) + + # This cannot be static (must have a __func__ attribute). + async def cog_command_error(self, ctx: Context, error: Exception) -> None: + """Send a notification to the invoking context on a Union failure.""" + if isinstance(error, BadUnionArgument): + if User in error.converters: + await ctx.send(str(error.errors[0])) + error.handled = True + + +def setup(bot: Bot) -> None: + """Moderation cog load.""" + bot.add_cog(Infractions(bot)) + log.info("Cog loaded: Moderation") diff --git a/bot/cogs/moderation/management.py b/bot/cogs/moderation/management.py new file mode 100644 index 000000000..6bacab8ca --- /dev/null +++ b/bot/cogs/moderation/management.py @@ -0,0 +1,263 @@ +import asyncio +import logging +import textwrap +import typing as t + +import discord +from discord.ext import commands +from discord.ext.commands import Context + +from bot import constants +from bot.cogs.moderation import Infractions, ModLog +from bot.cogs.moderation.utils import Infraction, proxy_user +from bot.converters import Duration, InfractionSearchQuery +from bot.pagination import LinePaginator +from bot.utils import time +from bot.utils.checks import with_role_check + +log = logging.getLogger(__name__) + +UserConverter = t.Union[discord.User, proxy_user] + + +def permanent_duration(expires_at: str) -> str: + """Only allow an expiration to be 'permanent' if it is a string.""" + expires_at = expires_at.lower() + if expires_at != "permanent": + raise commands.BadArgument + else: + return expires_at + + +class ModManagement(commands.Cog): + """Management of infractions.""" + + def __init__(self, bot: commands.Bot): + self.bot = bot + + @property + def mod_log(self) -> ModLog: + """Get currently loaded ModLog cog instance.""" + return self.bot.get_cog("ModLog") + + @property + def infractions_cog(self) -> Infractions: + """Get currently loaded Infractions cog instance.""" + return self.bot.get_cog("Infractions") + + # region: Edit infraction commands + + @commands.group(name='infraction', aliases=('infr', 'infractions', 'inf'), invoke_without_command=True) + async def infraction_group(self, ctx: Context) -> None: + """Infraction manipulation commands.""" + await ctx.invoke(self.bot.get_command("help"), "infraction") + + @infraction_group.command(name='edit') + async def infraction_edit( + self, + ctx: Context, + infraction_id: int, + expires_at: t.Union[Duration, permanent_duration, None], + *, + reason: str = None + ) -> None: + """ + Edit the duration and/or the reason of an infraction. + + Durations are relative to the time of updating and should be appended with a unit of time: + y (years), m (months), w (weeks), d (days), h (hours), M (minutes), s (seconds) + + Use "permanent" to mark the infraction as permanent. + """ + if expires_at is None and reason is None: + # Unlike UserInputError, the error handler will show a specified message for BadArgument + raise commands.BadArgument("Neither a new expiry nor a new reason was specified.") + + # Retrieve the previous infraction for its information. + old_infraction = await self.bot.api_client.get(f'bot/infractions/{infraction_id}') + + request_data = {} + confirm_messages = [] + log_text = "" + + if expires_at == "permanent": + request_data['expires_at'] = None + confirm_messages.append("marked as permanent") + elif expires_at is not None: + request_data['expires_at'] = expires_at.isoformat() + expiry = expires_at.strftime(time.INFRACTION_FORMAT) + confirm_messages.append(f"set to expire on {expiry}") + else: + confirm_messages.append("expiry unchanged") + + if reason: + request_data['reason'] = reason + confirm_messages.append("set a new reason") + log_text += f""" + Previous reason: {old_infraction['reason']} + New reason: {reason} + """.rstrip() + else: + confirm_messages.append("reason unchanged") + + # Update the infraction + new_infraction = await self.bot.api_client.patch( + f'bot/infractions/{infraction_id}', + json=request_data, + ) + + # Re-schedule infraction if the expiration has been updated + if 'expires_at' in request_data: + self.infractions_cog.cancel_task(new_infraction['id']) + loop = asyncio.get_event_loop() + self.infractions_cog.schedule_task(loop, new_infraction['id'], new_infraction) + + log_text += f""" + Previous expiry: {old_infraction['expires_at'] or "Permanent"} + New expiry: {new_infraction['expires_at'] or "Permanent"} + """.rstrip() + + await ctx.send(f":ok_hand: Updated infraction: {' & '.join(confirm_messages)}") + + # Get information about the infraction's user + user_id = new_infraction['user'] + user = ctx.guild.get_member(user_id) + + if user: + user_text = f"{user.mention} (`{user.id}`)" + thumbnail = user.avatar_url_as(static_format="png") + else: + user_text = f"`{user_id}`" + thumbnail = None + + # The infraction's actor + actor_id = new_infraction['actor'] + actor = ctx.guild.get_member(actor_id) or f"`{actor_id}`" + + await self.mod_log.send_log_message( + icon_url=constants.Icons.pencil, + colour=discord.Colour.blurple(), + title="Infraction edited", + thumbnail=thumbnail, + text=textwrap.dedent(f""" + Member: {user_text} + Actor: {actor} + Edited by: {ctx.message.author}{log_text} + """) + ) + + # endregion + # region: Search infractions + + @infraction_group.group(name="search", invoke_without_command=True) + async def infraction_search_group(self, ctx: Context, query: InfractionSearchQuery) -> None: + """Searches for infractions in the database.""" + if isinstance(query, discord.User): + await ctx.invoke(self.search_user, query) + else: + await ctx.invoke(self.search_reason, query) + + @infraction_search_group.command(name="user", aliases=("member", "id")) + async def search_user(self, ctx: Context, user: UserConverter) -> None: + """Search for infractions by member.""" + infraction_list = await self.bot.api_client.get( + 'bot/infractions', + params={'user__id': str(user.id)} + ) + embed = discord.Embed( + title=f"Infractions for {user} ({len(infraction_list)} total)", + colour=discord.Colour.orange() + ) + await self.send_infraction_list(ctx, embed, infraction_list) + + @infraction_search_group.command(name="reason", aliases=("match", "regex", "re")) + async def search_reason(self, ctx: Context, reason: str) -> None: + """Search for infractions by their reason. Use Re2 for matching.""" + infraction_list = await self.bot.api_client.get( + 'bot/infractions', + params={'search': reason} + ) + embed = discord.Embed( + title=f"Infractions matching `{reason}` ({len(infraction_list)} total)", + colour=discord.Colour.orange() + ) + await self.send_infraction_list(ctx, embed, infraction_list) + + # endregion + # region: Utility functions + + async def send_infraction_list( + self, + ctx: Context, + embed: discord.Embed, + infractions: t.Iterable[Infraction] + ) -> None: + """Send a paginated embed of infractions for the specified user.""" + if not infractions: + await ctx.send(f":warning: No infractions could be found for that query.") + return + + lines = tuple( + self.infraction_to_string(infraction) + for infraction in infractions + ) + + await LinePaginator.paginate( + lines, + ctx=ctx, + embed=embed, + empty=True, + max_lines=3, + max_size=1000 + ) + + def infraction_to_string(self, infraction_object: Infraction) -> str: + """Convert the infraction object to a string representation.""" + actor_id = infraction_object["actor"] + guild = self.bot.get_guild(constants.Guild.id) + actor = guild.get_member(actor_id) + active = infraction_object["active"] + user_id = infraction_object["user"] + hidden = infraction_object["hidden"] + created = time.format_infraction(infraction_object["inserted_at"]) + if infraction_object["expires_at"] is None: + expires = "*Permanent*" + else: + expires = time.format_infraction(infraction_object["expires_at"]) + + lines = textwrap.dedent(f""" + {"**===============**" if active else "==============="} + Status: {"__**Active**__" if active else "Inactive"} + User: {self.bot.get_user(user_id)} (`{user_id}`) + Type: **{infraction_object["type"]}** + Shadow: {hidden} + Reason: {infraction_object["reason"] or "*None*"} + Created: {created} + Expires: {expires} + Actor: {actor.mention if actor else actor_id} + ID: `{infraction_object["id"]}` + {"**===============**" if active else "==============="} + """) + + return lines.strip() + + # endregion + + # This cannot be static (must have a __func__ attribute). + def cog_check(self, ctx: Context) -> bool: + """Only allow moderators to invoke the commands in this cog.""" + return with_role_check(ctx, *constants.MODERATION_ROLES) + + # This cannot be static (must have a __func__ attribute). + async def cog_command_error(self, ctx: Context, error: Exception) -> None: + """Send a notification to the invoking context on a Union failure.""" + if isinstance(error, commands.BadUnionArgument): + if discord.User in error.converters: + await ctx.send(str(error.errors[0])) + error.handled = True + + +def setup(bot: commands.Bot) -> None: + """Load the Infractions cog.""" + bot.add_cog(ModManagement(bot)) + log.info("Cog loaded: Infractions") diff --git a/bot/cogs/moderation/modlog.py b/bot/cogs/moderation/modlog.py new file mode 100644 index 000000000..50cb55e33 --- /dev/null +++ b/bot/cogs/moderation/modlog.py @@ -0,0 +1,768 @@ +import asyncio +import logging +from datetime import datetime +from typing import List, Optional, Union + +from dateutil.relativedelta import relativedelta +from deepdiff import DeepDiff +from discord import ( + Asset, CategoryChannel, Colour, Embed, File, Guild, + Member, Message, NotFound, RawMessageDeleteEvent, + RawMessageUpdateEvent, Role, TextChannel, User, VoiceChannel +) +from discord.abc import GuildChannel +from discord.ext.commands import Bot, Cog, Context + +from bot.constants import ( + Channels, Colours, Emojis, Event, Guild as GuildConstant, Icons, URLs +) +from bot.utils.time import humanize_delta + +log = logging.getLogger(__name__) + +GUILD_CHANNEL = Union[CategoryChannel, TextChannel, VoiceChannel] + +CHANNEL_CHANGES_UNSUPPORTED = ("permissions",) +CHANNEL_CHANGES_SUPPRESSED = ("_overwrites", "position") +MEMBER_CHANGES_SUPPRESSED = ("status", "activities", "_client_status") +ROLE_CHANGES_UNSUPPORTED = ("colour", "permissions") + + +class ModLog(Cog, name="ModLog"): + """Logging for server events and staff actions.""" + + def __init__(self, bot: Bot): + self.bot = bot + self._ignored = {event: [] for event in Event} + + self._cached_deletes = [] + self._cached_edits = [] + + async def upload_log(self, messages: List[Message], actor_id: int) -> str: + """ + Uploads the log data to the database via an API endpoint for uploading logs. + + Used in several mod log embeds. + + Returns a URL that can be used to view the log. + """ + response = await self.bot.api_client.post( + 'bot/deleted-messages', + json={ + 'actor': actor_id, + 'creation': datetime.utcnow().isoformat(), + 'deletedmessage_set': [ + { + 'id': message.id, + 'author': message.author.id, + 'channel_id': message.channel.id, + 'content': message.content, + 'embeds': [embed.to_dict() for embed in message.embeds] + } + for message in messages + ] + } + ) + + return f"{URLs.site_logs_view}/{response['id']}" + + def ignore(self, event: Event, *items: int) -> None: + """Add event to ignored events to suppress log emission.""" + for item in items: + if item not in self._ignored[event]: + self._ignored[event].append(item) + + async def send_log_message( + self, + icon_url: Optional[str], + colour: Colour, + title: Optional[str], + text: str, + thumbnail: Optional[Union[str, Asset]] = None, + channel_id: int = Channels.modlog, + ping_everyone: bool = False, + files: Optional[List[File]] = None, + content: Optional[str] = None, + additional_embeds: Optional[List[Embed]] = None, + additional_embeds_msg: Optional[str] = None, + timestamp_override: Optional[datetime] = None, + footer: Optional[str] = None, + ) -> Context: + """Generate log embed and send to logging channel.""" + embed = Embed(description=text) + + if title and icon_url: + embed.set_author(name=title, icon_url=icon_url) + + embed.colour = colour + embed.timestamp = timestamp_override or datetime.utcnow() + + if footer: + embed.set_footer(text=footer) + + if thumbnail: + embed.set_thumbnail(url=thumbnail) + + if ping_everyone: + if content: + content = f"@everyone\n{content}" + else: + content = "@everyone" + + channel = self.bot.get_channel(channel_id) + log_message = await channel.send(content=content, embed=embed, files=files) + + if additional_embeds: + if additional_embeds_msg: + await channel.send(additional_embeds_msg) + for additional_embed in additional_embeds: + await channel.send(embed=additional_embed) + + return await self.bot.get_context(log_message) # Optionally return for use with antispam + + @Cog.listener() + async def on_guild_channel_create(self, channel: GUILD_CHANNEL) -> None: + """Log channel create event to mod log.""" + if channel.guild.id != GuildConstant.id: + return + + if isinstance(channel, CategoryChannel): + title = "Category created" + message = f"{channel.name} (`{channel.id}`)" + elif isinstance(channel, VoiceChannel): + title = "Voice channel created" + + if channel.category: + message = f"{channel.category}/{channel.name} (`{channel.id}`)" + else: + message = f"{channel.name} (`{channel.id}`)" + else: + title = "Text channel created" + + if channel.category: + message = f"{channel.category}/{channel.name} (`{channel.id}`)" + else: + message = f"{channel.name} (`{channel.id}`)" + + await self.send_log_message(Icons.hash_green, Colour(Colours.soft_green), title, message) + + @Cog.listener() + async def on_guild_channel_delete(self, channel: GUILD_CHANNEL) -> None: + """Log channel delete event to mod log.""" + if channel.guild.id != GuildConstant.id: + return + + if isinstance(channel, CategoryChannel): + title = "Category deleted" + elif isinstance(channel, VoiceChannel): + title = "Voice channel deleted" + else: + title = "Text channel deleted" + + if channel.category and not isinstance(channel, CategoryChannel): + message = f"{channel.category}/{channel.name} (`{channel.id}`)" + else: + message = f"{channel.name} (`{channel.id}`)" + + await self.send_log_message( + Icons.hash_red, Colour(Colours.soft_red), + title, message + ) + + @Cog.listener() + async def on_guild_channel_update(self, before: GUILD_CHANNEL, after: GuildChannel) -> None: + """Log channel update event to mod log.""" + if before.guild.id != GuildConstant.id: + return + + if before.id in self._ignored[Event.guild_channel_update]: + self._ignored[Event.guild_channel_update].remove(before.id) + return + + diff = DeepDiff(before, after) + changes = [] + done = [] + + diff_values = diff.get("values_changed", {}) + diff_values.update(diff.get("type_changes", {})) + + for key, value in diff_values.items(): + if not key: # Not sure why, but it happens + continue + + key = key[5:] # Remove "root." prefix + + if "[" in key: + key = key.split("[", 1)[0] + + if "." in key: + key = key.split(".", 1)[0] + + if key in done or key in CHANNEL_CHANGES_SUPPRESSED: + continue + + if key in CHANNEL_CHANGES_UNSUPPORTED: + changes.append(f"**{key.title()}** updated") + else: + new = value["new_value"] + old = value["old_value"] + + changes.append(f"**{key.title()}:** `{old}` **->** `{new}`") + + done.append(key) + + if not changes: + return + + message = "" + + for item in sorted(changes): + message += f"{Emojis.bullet} {item}\n" + + if after.category: + message = f"**{after.category}/#{after.name} (`{after.id}`)**\n{message}" + else: + message = f"**#{after.name}** (`{after.id}`)\n{message}" + + await self.send_log_message( + Icons.hash_blurple, Colour.blurple(), + "Channel updated", message + ) + + @Cog.listener() + async def on_guild_role_create(self, role: Role) -> None: + """Log role create event to mod log.""" + if role.guild.id != GuildConstant.id: + return + + await self.send_log_message( + Icons.crown_green, Colour(Colours.soft_green), + "Role created", f"`{role.id}`" + ) + + @Cog.listener() + async def on_guild_role_delete(self, role: Role) -> None: + """Log role delete event to mod log.""" + if role.guild.id != GuildConstant.id: + return + + await self.send_log_message( + Icons.crown_red, Colour(Colours.soft_red), + "Role removed", f"{role.name} (`{role.id}`)" + ) + + @Cog.listener() + async def on_guild_role_update(self, before: Role, after: Role) -> None: + """Log role update event to mod log.""" + if before.guild.id != GuildConstant.id: + return + + diff = DeepDiff(before, after) + changes = [] + done = [] + + diff_values = diff.get("values_changed", {}) + diff_values.update(diff.get("type_changes", {})) + + for key, value in diff_values.items(): + if not key: # Not sure why, but it happens + continue + + key = key[5:] # Remove "root." prefix + + if "[" in key: + key = key.split("[", 1)[0] + + if "." in key: + key = key.split(".", 1)[0] + + if key in done or key == "color": + continue + + if key in ROLE_CHANGES_UNSUPPORTED: + changes.append(f"**{key.title()}** updated") + else: + new = value["new_value"] + old = value["old_value"] + + changes.append(f"**{key.title()}:** `{old}` **->** `{new}`") + + done.append(key) + + if not changes: + return + + message = "" + + for item in sorted(changes): + message += f"{Emojis.bullet} {item}\n" + + message = f"**{after.name}** (`{after.id}`)\n{message}" + + await self.send_log_message( + Icons.crown_blurple, Colour.blurple(), + "Role updated", message + ) + + @Cog.listener() + async def on_guild_update(self, before: Guild, after: Guild) -> None: + """Log guild update event to mod log.""" + if before.id != GuildConstant.id: + return + + diff = DeepDiff(before, after) + changes = [] + done = [] + + diff_values = diff.get("values_changed", {}) + diff_values.update(diff.get("type_changes", {})) + + for key, value in diff_values.items(): + if not key: # Not sure why, but it happens + continue + + key = key[5:] # Remove "root." prefix + + if "[" in key: + key = key.split("[", 1)[0] + + if "." in key: + key = key.split(".", 1)[0] + + if key in done: + continue + + new = value["new_value"] + old = value["old_value"] + + changes.append(f"**{key.title()}:** `{old}` **->** `{new}`") + + done.append(key) + + if not changes: + return + + message = "" + + for item in sorted(changes): + message += f"{Emojis.bullet} {item}\n" + + message = f"**{after.name}** (`{after.id}`)\n{message}" + + await self.send_log_message( + Icons.guild_update, Colour.blurple(), + "Guild updated", message, + thumbnail=after.icon_url_as(format="png") + ) + + @Cog.listener() + async def on_member_ban(self, guild: Guild, member: Union[Member, User]) -> None: + """Log ban event to mod log.""" + if guild.id != GuildConstant.id: + return + + if member.id in self._ignored[Event.member_ban]: + self._ignored[Event.member_ban].remove(member.id) + return + + await self.send_log_message( + Icons.user_ban, Colour(Colours.soft_red), + "User banned", f"{member.name}#{member.discriminator} (`{member.id}`)", + thumbnail=member.avatar_url_as(static_format="png"), + channel_id=Channels.modlog + ) + + @Cog.listener() + async def on_member_join(self, member: Member) -> None: + """Log member join event to user log.""" + if member.guild.id != GuildConstant.id: + return + + message = f"{member.name}#{member.discriminator} (`{member.id}`)" + now = datetime.utcnow() + difference = abs(relativedelta(now, member.created_at)) + + message += "\n\n**Account age:** " + humanize_delta(difference) + + if difference.days < 1 and difference.months < 1 and difference.years < 1: # New user account! + message = f"{Emojis.new} {message}" + + await self.send_log_message( + Icons.sign_in, Colour(Colours.soft_green), + "User joined", message, + thumbnail=member.avatar_url_as(static_format="png"), + channel_id=Channels.userlog + ) + + @Cog.listener() + async def on_member_remove(self, member: Member) -> None: + """Log member leave event to user log.""" + if member.guild.id != GuildConstant.id: + return + + if member.id in self._ignored[Event.member_remove]: + self._ignored[Event.member_remove].remove(member.id) + return + + await self.send_log_message( + Icons.sign_out, Colour(Colours.soft_red), + "User left", f"{member.name}#{member.discriminator} (`{member.id}`)", + thumbnail=member.avatar_url_as(static_format="png"), + channel_id=Channels.userlog + ) + + @Cog.listener() + async def on_member_unban(self, guild: Guild, member: User) -> None: + """Log member unban event to mod log.""" + if guild.id != GuildConstant.id: + return + + if member.id in self._ignored[Event.member_unban]: + self._ignored[Event.member_unban].remove(member.id) + return + + await self.send_log_message( + Icons.user_unban, Colour.blurple(), + "User unbanned", f"{member.name}#{member.discriminator} (`{member.id}`)", + thumbnail=member.avatar_url_as(static_format="png"), + channel_id=Channels.modlog + ) + + @Cog.listener() + async def on_member_update(self, before: Member, after: Member) -> None: + """Log member update event to user log.""" + if before.guild.id != GuildConstant.id: + return + + if before.id in self._ignored[Event.member_update]: + self._ignored[Event.member_update].remove(before.id) + return + + diff = DeepDiff(before, after) + changes = [] + done = [] + + diff_values = {} + + diff_values.update(diff.get("values_changed", {})) + diff_values.update(diff.get("type_changes", {})) + diff_values.update(diff.get("iterable_item_removed", {})) + diff_values.update(diff.get("iterable_item_added", {})) + + diff_user = DeepDiff(before._user, after._user) + + diff_values.update(diff_user.get("values_changed", {})) + diff_values.update(diff_user.get("type_changes", {})) + diff_values.update(diff_user.get("iterable_item_removed", {})) + diff_values.update(diff_user.get("iterable_item_added", {})) + + for key, value in diff_values.items(): + if not key: # Not sure why, but it happens + continue + + key = key[5:] # Remove "root." prefix + + if "[" in key: + key = key.split("[", 1)[0] + + if "." in key: + key = key.split(".", 1)[0] + + if key in done or key in MEMBER_CHANGES_SUPPRESSED: + continue + + if key == "_roles": + new_roles = after.roles + old_roles = before.roles + + for role in old_roles: + if role not in new_roles: + changes.append(f"**Role removed:** {role.name} (`{role.id}`)") + + for role in new_roles: + if role not in old_roles: + changes.append(f"**Role added:** {role.name} (`{role.id}`)") + + else: + new = value.get("new_value") + old = value.get("old_value") + + if new and old: + changes.append(f"**{key.title()}:** `{old}` **->** `{new}`") + + done.append(key) + + if before.name != after.name: + changes.append( + f"**Username:** `{before.name}` **->** `{after.name}`" + ) + + if before.discriminator != after.discriminator: + changes.append( + f"**Discriminator:** `{before.discriminator}` **->** `{after.discriminator}`" + ) + + if not changes: + return + + message = "" + + for item in sorted(changes): + message += f"{Emojis.bullet} {item}\n" + + message = f"**{after.name}#{after.discriminator}** (`{after.id}`)\n{message}" + + await self.send_log_message( + Icons.user_update, Colour.blurple(), + "Member updated", message, + thumbnail=after.avatar_url_as(static_format="png"), + channel_id=Channels.userlog + ) + + @Cog.listener() + async def on_message_delete(self, message: Message) -> None: + """Log message delete event to message change log.""" + channel = message.channel + author = message.author + + if message.guild.id != GuildConstant.id or channel.id in GuildConstant.ignored: + return + + self._cached_deletes.append(message.id) + + if message.id in self._ignored[Event.message_delete]: + self._ignored[Event.message_delete].remove(message.id) + return + + if author.bot: + return + + if channel.category: + response = ( + f"**Author:** {author.name}#{author.discriminator} (`{author.id}`)\n" + f"**Channel:** {channel.category}/#{channel.name} (`{channel.id}`)\n" + f"**Message ID:** `{message.id}`\n" + "\n" + ) + else: + response = ( + f"**Author:** {author.name}#{author.discriminator} (`{author.id}`)\n" + f"**Channel:** #{channel.name} (`{channel.id}`)\n" + f"**Message ID:** `{message.id}`\n" + "\n" + ) + + if message.attachments: + # Prepend the message metadata with the number of attachments + response = f"**Attachments:** {len(message.attachments)}\n" + response + + # Shorten the message content if necessary + content = message.clean_content + remaining_chars = 2040 - len(response) + + if len(content) > remaining_chars: + botlog_url = await self.upload_log(messages=[message], actor_id=message.author.id) + ending = f"\n\nMessage truncated, [full message here]({botlog_url})." + truncation_point = remaining_chars - len(ending) + content = f"{content[:truncation_point]}...{ending}" + + response += f"{content}" + + await self.send_log_message( + Icons.message_delete, Colours.soft_red, + "Message deleted", + response, + channel_id=Channels.message_log + ) + + @Cog.listener() + async def on_raw_message_delete(self, event: RawMessageDeleteEvent) -> None: + """Log raw message delete event to message change log.""" + if event.guild_id != GuildConstant.id or event.channel_id in GuildConstant.ignored: + return + + await asyncio.sleep(1) # Wait here in case the normal event was fired + + if event.message_id in self._cached_deletes: + # It was in the cache and the normal event was fired, so we can just ignore it + self._cached_deletes.remove(event.message_id) + return + + if event.message_id in self._ignored[Event.message_delete]: + self._ignored[Event.message_delete].remove(event.message_id) + return + + channel = self.bot.get_channel(event.channel_id) + + if channel.category: + response = ( + f"**Channel:** {channel.category}/#{channel.name} (`{channel.id}`)\n" + f"**Message ID:** `{event.message_id}`\n" + "\n" + "This message was not cached, so the message content cannot be displayed." + ) + else: + response = ( + f"**Channel:** #{channel.name} (`{channel.id}`)\n" + f"**Message ID:** `{event.message_id}`\n" + "\n" + "This message was not cached, so the message content cannot be displayed." + ) + + await self.send_log_message( + Icons.message_delete, Colour(Colours.soft_red), + "Message deleted", + response, + channel_id=Channels.message_log + ) + + @Cog.listener() + async def on_message_edit(self, before: Message, after: Message) -> None: + """Log message edit event to message change log.""" + if ( + not before.guild + or before.guild.id != GuildConstant.id + or before.channel.id in GuildConstant.ignored + or before.author.bot + ): + return + + self._cached_edits.append(before.id) + + if before.content == after.content: + return + + author = before.author + channel = before.channel + + if channel.category: + before_response = ( + f"**Author:** {author.name}#{author.discriminator} (`{author.id}`)\n" + f"**Channel:** {channel.category}/#{channel.name} (`{channel.id}`)\n" + f"**Message ID:** `{before.id}`\n" + "\n" + f"{before.clean_content}" + ) + + after_response = ( + f"**Author:** {author.name}#{author.discriminator} (`{author.id}`)\n" + f"**Channel:** {channel.category}/#{channel.name} (`{channel.id}`)\n" + f"**Message ID:** `{before.id}`\n" + "\n" + f"{after.clean_content}" + ) + else: + before_response = ( + f"**Author:** {author.name}#{author.discriminator} (`{author.id}`)\n" + f"**Channel:** #{channel.name} (`{channel.id}`)\n" + f"**Message ID:** `{before.id}`\n" + "\n" + f"{before.clean_content}" + ) + + after_response = ( + f"**Author:** {author.name}#{author.discriminator} (`{author.id}`)\n" + f"**Channel:** #{channel.name} (`{channel.id}`)\n" + f"**Message ID:** `{before.id}`\n" + "\n" + f"{after.clean_content}" + ) + + if before.edited_at: + # Message was previously edited, to assist with self-bot detection, use the edited_at + # datetime as the baseline and create a human-readable delta between this edit event + # and the last time the message was edited + timestamp = before.edited_at + delta = humanize_delta(relativedelta(after.edited_at, before.edited_at)) + footer = f"Last edited {delta} ago" + else: + # Message was not previously edited, use the created_at datetime as the baseline, no + # delta calculation needed + timestamp = before.created_at + footer = None + + await self.send_log_message( + Icons.message_edit, Colour.blurple(), "Message edited (Before)", before_response, + channel_id=Channels.message_log, timestamp_override=timestamp, footer=footer + ) + + await self.send_log_message( + Icons.message_edit, Colour.blurple(), "Message edited (After)", after_response, + channel_id=Channels.message_log, timestamp_override=after.edited_at + ) + + @Cog.listener() + async def on_raw_message_edit(self, event: RawMessageUpdateEvent) -> None: + """Log raw message edit event to message change log.""" + try: + channel = self.bot.get_channel(int(event.data["channel_id"])) + message = await channel.fetch_message(event.message_id) + except NotFound: # Was deleted before we got the event + return + + if ( + not message.guild + or message.guild.id != GuildConstant.id + or message.channel.id in GuildConstant.ignored + or message.author.bot + ): + return + + await asyncio.sleep(1) # Wait here in case the normal event was fired + + if event.message_id in self._cached_edits: + # It was in the cache and the normal event was fired, so we can just ignore it + self._cached_edits.remove(event.message_id) + return + + author = message.author + channel = message.channel + + if channel.category: + before_response = ( + f"**Author:** {author.name}#{author.discriminator} (`{author.id}`)\n" + f"**Channel:** {channel.category}/#{channel.name} (`{channel.id}`)\n" + f"**Message ID:** `{message.id}`\n" + "\n" + "This message was not cached, so the message content cannot be displayed." + ) + + after_response = ( + f"**Author:** {author.name}#{author.discriminator} (`{author.id}`)\n" + f"**Channel:** {channel.category}/#{channel.name} (`{channel.id}`)\n" + f"**Message ID:** `{message.id}`\n" + "\n" + f"{message.clean_content}" + ) + else: + before_response = ( + f"**Author:** {author.name}#{author.discriminator} (`{author.id}`)\n" + f"**Channel:** #{channel.name} (`{channel.id}`)\n" + f"**Message ID:** `{message.id}`\n" + "\n" + "This message was not cached, so the message content cannot be displayed." + ) + + after_response = ( + f"**Author:** {author.name}#{author.discriminator} (`{author.id}`)\n" + f"**Channel:** #{channel.name} (`{channel.id}`)\n" + f"**Message ID:** `{message.id}`\n" + "\n" + f"{message.clean_content}" + ) + + await self.send_log_message( + Icons.message_edit, Colour.blurple(), "Message edited (Before)", + before_response, channel_id=Channels.message_log + ) + + await self.send_log_message( + Icons.message_edit, Colour.blurple(), "Message edited (After)", + after_response, channel_id=Channels.message_log + ) + + +def setup(bot: Bot) -> None: + """Mod log cog load.""" + bot.add_cog(ModLog(bot)) + log.info("Cog loaded: ModLog") diff --git a/bot/cogs/moderation/utils.py b/bot/cogs/moderation/utils.py new file mode 100644 index 000000000..48ebe422c --- /dev/null +++ b/bot/cogs/moderation/utils.py @@ -0,0 +1,87 @@ +import logging +import typing as t +from datetime import datetime + +import discord +from discord.ext import commands +from discord.ext.commands import Context + +from bot.api import ResponseCodeError + +log = logging.getLogger(__name__) + +MemberObject = t.Union[discord.Member, discord.User, discord.Object] +Infraction = t.Dict[str, t.Union[str, int, bool]] + + +def proxy_user(user_id: str) -> discord.Object: + """Create a proxy user for the provided user_id for situations where a Member or User object cannot be resolved.""" + try: + user_id = int(user_id) + except ValueError: + raise commands.BadArgument + + user = discord.Object(user_id) + user.mention = user.id + user.avatar_url_as = lambda static_format: None + + return user + + +async def post_infraction( + ctx: Context, + user: MemberObject, + type: str, + reason: str, + expires_at: datetime = None, + hidden: bool = False, + active: bool = True, +) -> t.Optional[dict]: + """Posts an infraction to the API.""" + payload = { + "actor": ctx.message.author.id, + "hidden": hidden, + "reason": reason, + "type": type, + "user": user.id, + "active": active + } + if expires_at: + payload['expires_at'] = expires_at.isoformat() + + try: + response = await ctx.bot.api_client.post('bot/infractions', json=payload) + except ResponseCodeError as exp: + if exp.status == 400 and 'user' in exp.response_json: + log.info( + f"{ctx.author} tried to add a {type} infraction to `{user.id}`, " + "but that user id was not found in the database." + ) + await ctx.send(f":x: Cannot add infraction, the specified user is not known to the database.") + return + else: + log.exception("An unexpected ResponseCodeError occurred while adding an infraction:") + await ctx.send(":x: There was an error adding the infraction.") + return + + return response + + +async def already_has_active_infraction(ctx: Context, user: MemberObject, type: str) -> bool: + """Checks if a user already has an active infraction of the given type.""" + active_infractions = await ctx.bot.api_client.get( + 'bot/infractions', + params={ + 'active': 'true', + 'type': type, + 'user__id': str(user.id) + } + ) + if active_infractions: + await ctx.send( + f":x: According to my records, this user already has a {type} infraction. " + f"See infraction **#{active_infractions[0]['id']}**." + ) + return True + else: + return False diff --git a/bot/cogs/modlog.py b/bot/cogs/modlog.py deleted file mode 100644 index 50cb55e33..000000000 --- a/bot/cogs/modlog.py +++ /dev/null @@ -1,768 +0,0 @@ -import asyncio -import logging -from datetime import datetime -from typing import List, Optional, Union - -from dateutil.relativedelta import relativedelta -from deepdiff import DeepDiff -from discord import ( - Asset, CategoryChannel, Colour, Embed, File, Guild, - Member, Message, NotFound, RawMessageDeleteEvent, - RawMessageUpdateEvent, Role, TextChannel, User, VoiceChannel -) -from discord.abc import GuildChannel -from discord.ext.commands import Bot, Cog, Context - -from bot.constants import ( - Channels, Colours, Emojis, Event, Guild as GuildConstant, Icons, URLs -) -from bot.utils.time import humanize_delta - -log = logging.getLogger(__name__) - -GUILD_CHANNEL = Union[CategoryChannel, TextChannel, VoiceChannel] - -CHANNEL_CHANGES_UNSUPPORTED = ("permissions",) -CHANNEL_CHANGES_SUPPRESSED = ("_overwrites", "position") -MEMBER_CHANGES_SUPPRESSED = ("status", "activities", "_client_status") -ROLE_CHANGES_UNSUPPORTED = ("colour", "permissions") - - -class ModLog(Cog, name="ModLog"): - """Logging for server events and staff actions.""" - - def __init__(self, bot: Bot): - self.bot = bot - self._ignored = {event: [] for event in Event} - - self._cached_deletes = [] - self._cached_edits = [] - - async def upload_log(self, messages: List[Message], actor_id: int) -> str: - """ - Uploads the log data to the database via an API endpoint for uploading logs. - - Used in several mod log embeds. - - Returns a URL that can be used to view the log. - """ - response = await self.bot.api_client.post( - 'bot/deleted-messages', - json={ - 'actor': actor_id, - 'creation': datetime.utcnow().isoformat(), - 'deletedmessage_set': [ - { - 'id': message.id, - 'author': message.author.id, - 'channel_id': message.channel.id, - 'content': message.content, - 'embeds': [embed.to_dict() for embed in message.embeds] - } - for message in messages - ] - } - ) - - return f"{URLs.site_logs_view}/{response['id']}" - - def ignore(self, event: Event, *items: int) -> None: - """Add event to ignored events to suppress log emission.""" - for item in items: - if item not in self._ignored[event]: - self._ignored[event].append(item) - - async def send_log_message( - self, - icon_url: Optional[str], - colour: Colour, - title: Optional[str], - text: str, - thumbnail: Optional[Union[str, Asset]] = None, - channel_id: int = Channels.modlog, - ping_everyone: bool = False, - files: Optional[List[File]] = None, - content: Optional[str] = None, - additional_embeds: Optional[List[Embed]] = None, - additional_embeds_msg: Optional[str] = None, - timestamp_override: Optional[datetime] = None, - footer: Optional[str] = None, - ) -> Context: - """Generate log embed and send to logging channel.""" - embed = Embed(description=text) - - if title and icon_url: - embed.set_author(name=title, icon_url=icon_url) - - embed.colour = colour - embed.timestamp = timestamp_override or datetime.utcnow() - - if footer: - embed.set_footer(text=footer) - - if thumbnail: - embed.set_thumbnail(url=thumbnail) - - if ping_everyone: - if content: - content = f"@everyone\n{content}" - else: - content = "@everyone" - - channel = self.bot.get_channel(channel_id) - log_message = await channel.send(content=content, embed=embed, files=files) - - if additional_embeds: - if additional_embeds_msg: - await channel.send(additional_embeds_msg) - for additional_embed in additional_embeds: - await channel.send(embed=additional_embed) - - return await self.bot.get_context(log_message) # Optionally return for use with antispam - - @Cog.listener() - async def on_guild_channel_create(self, channel: GUILD_CHANNEL) -> None: - """Log channel create event to mod log.""" - if channel.guild.id != GuildConstant.id: - return - - if isinstance(channel, CategoryChannel): - title = "Category created" - message = f"{channel.name} (`{channel.id}`)" - elif isinstance(channel, VoiceChannel): - title = "Voice channel created" - - if channel.category: - message = f"{channel.category}/{channel.name} (`{channel.id}`)" - else: - message = f"{channel.name} (`{channel.id}`)" - else: - title = "Text channel created" - - if channel.category: - message = f"{channel.category}/{channel.name} (`{channel.id}`)" - else: - message = f"{channel.name} (`{channel.id}`)" - - await self.send_log_message(Icons.hash_green, Colour(Colours.soft_green), title, message) - - @Cog.listener() - async def on_guild_channel_delete(self, channel: GUILD_CHANNEL) -> None: - """Log channel delete event to mod log.""" - if channel.guild.id != GuildConstant.id: - return - - if isinstance(channel, CategoryChannel): - title = "Category deleted" - elif isinstance(channel, VoiceChannel): - title = "Voice channel deleted" - else: - title = "Text channel deleted" - - if channel.category and not isinstance(channel, CategoryChannel): - message = f"{channel.category}/{channel.name} (`{channel.id}`)" - else: - message = f"{channel.name} (`{channel.id}`)" - - await self.send_log_message( - Icons.hash_red, Colour(Colours.soft_red), - title, message - ) - - @Cog.listener() - async def on_guild_channel_update(self, before: GUILD_CHANNEL, after: GuildChannel) -> None: - """Log channel update event to mod log.""" - if before.guild.id != GuildConstant.id: - return - - if before.id in self._ignored[Event.guild_channel_update]: - self._ignored[Event.guild_channel_update].remove(before.id) - return - - diff = DeepDiff(before, after) - changes = [] - done = [] - - diff_values = diff.get("values_changed", {}) - diff_values.update(diff.get("type_changes", {})) - - for key, value in diff_values.items(): - if not key: # Not sure why, but it happens - continue - - key = key[5:] # Remove "root." prefix - - if "[" in key: - key = key.split("[", 1)[0] - - if "." in key: - key = key.split(".", 1)[0] - - if key in done or key in CHANNEL_CHANGES_SUPPRESSED: - continue - - if key in CHANNEL_CHANGES_UNSUPPORTED: - changes.append(f"**{key.title()}** updated") - else: - new = value["new_value"] - old = value["old_value"] - - changes.append(f"**{key.title()}:** `{old}` **->** `{new}`") - - done.append(key) - - if not changes: - return - - message = "" - - for item in sorted(changes): - message += f"{Emojis.bullet} {item}\n" - - if after.category: - message = f"**{after.category}/#{after.name} (`{after.id}`)**\n{message}" - else: - message = f"**#{after.name}** (`{after.id}`)\n{message}" - - await self.send_log_message( - Icons.hash_blurple, Colour.blurple(), - "Channel updated", message - ) - - @Cog.listener() - async def on_guild_role_create(self, role: Role) -> None: - """Log role create event to mod log.""" - if role.guild.id != GuildConstant.id: - return - - await self.send_log_message( - Icons.crown_green, Colour(Colours.soft_green), - "Role created", f"`{role.id}`" - ) - - @Cog.listener() - async def on_guild_role_delete(self, role: Role) -> None: - """Log role delete event to mod log.""" - if role.guild.id != GuildConstant.id: - return - - await self.send_log_message( - Icons.crown_red, Colour(Colours.soft_red), - "Role removed", f"{role.name} (`{role.id}`)" - ) - - @Cog.listener() - async def on_guild_role_update(self, before: Role, after: Role) -> None: - """Log role update event to mod log.""" - if before.guild.id != GuildConstant.id: - return - - diff = DeepDiff(before, after) - changes = [] - done = [] - - diff_values = diff.get("values_changed", {}) - diff_values.update(diff.get("type_changes", {})) - - for key, value in diff_values.items(): - if not key: # Not sure why, but it happens - continue - - key = key[5:] # Remove "root." prefix - - if "[" in key: - key = key.split("[", 1)[0] - - if "." in key: - key = key.split(".", 1)[0] - - if key in done or key == "color": - continue - - if key in ROLE_CHANGES_UNSUPPORTED: - changes.append(f"**{key.title()}** updated") - else: - new = value["new_value"] - old = value["old_value"] - - changes.append(f"**{key.title()}:** `{old}` **->** `{new}`") - - done.append(key) - - if not changes: - return - - message = "" - - for item in sorted(changes): - message += f"{Emojis.bullet} {item}\n" - - message = f"**{after.name}** (`{after.id}`)\n{message}" - - await self.send_log_message( - Icons.crown_blurple, Colour.blurple(), - "Role updated", message - ) - - @Cog.listener() - async def on_guild_update(self, before: Guild, after: Guild) -> None: - """Log guild update event to mod log.""" - if before.id != GuildConstant.id: - return - - diff = DeepDiff(before, after) - changes = [] - done = [] - - diff_values = diff.get("values_changed", {}) - diff_values.update(diff.get("type_changes", {})) - - for key, value in diff_values.items(): - if not key: # Not sure why, but it happens - continue - - key = key[5:] # Remove "root." prefix - - if "[" in key: - key = key.split("[", 1)[0] - - if "." in key: - key = key.split(".", 1)[0] - - if key in done: - continue - - new = value["new_value"] - old = value["old_value"] - - changes.append(f"**{key.title()}:** `{old}` **->** `{new}`") - - done.append(key) - - if not changes: - return - - message = "" - - for item in sorted(changes): - message += f"{Emojis.bullet} {item}\n" - - message = f"**{after.name}** (`{after.id}`)\n{message}" - - await self.send_log_message( - Icons.guild_update, Colour.blurple(), - "Guild updated", message, - thumbnail=after.icon_url_as(format="png") - ) - - @Cog.listener() - async def on_member_ban(self, guild: Guild, member: Union[Member, User]) -> None: - """Log ban event to mod log.""" - if guild.id != GuildConstant.id: - return - - if member.id in self._ignored[Event.member_ban]: - self._ignored[Event.member_ban].remove(member.id) - return - - await self.send_log_message( - Icons.user_ban, Colour(Colours.soft_red), - "User banned", f"{member.name}#{member.discriminator} (`{member.id}`)", - thumbnail=member.avatar_url_as(static_format="png"), - channel_id=Channels.modlog - ) - - @Cog.listener() - async def on_member_join(self, member: Member) -> None: - """Log member join event to user log.""" - if member.guild.id != GuildConstant.id: - return - - message = f"{member.name}#{member.discriminator} (`{member.id}`)" - now = datetime.utcnow() - difference = abs(relativedelta(now, member.created_at)) - - message += "\n\n**Account age:** " + humanize_delta(difference) - - if difference.days < 1 and difference.months < 1 and difference.years < 1: # New user account! - message = f"{Emojis.new} {message}" - - await self.send_log_message( - Icons.sign_in, Colour(Colours.soft_green), - "User joined", message, - thumbnail=member.avatar_url_as(static_format="png"), - channel_id=Channels.userlog - ) - - @Cog.listener() - async def on_member_remove(self, member: Member) -> None: - """Log member leave event to user log.""" - if member.guild.id != GuildConstant.id: - return - - if member.id in self._ignored[Event.member_remove]: - self._ignored[Event.member_remove].remove(member.id) - return - - await self.send_log_message( - Icons.sign_out, Colour(Colours.soft_red), - "User left", f"{member.name}#{member.discriminator} (`{member.id}`)", - thumbnail=member.avatar_url_as(static_format="png"), - channel_id=Channels.userlog - ) - - @Cog.listener() - async def on_member_unban(self, guild: Guild, member: User) -> None: - """Log member unban event to mod log.""" - if guild.id != GuildConstant.id: - return - - if member.id in self._ignored[Event.member_unban]: - self._ignored[Event.member_unban].remove(member.id) - return - - await self.send_log_message( - Icons.user_unban, Colour.blurple(), - "User unbanned", f"{member.name}#{member.discriminator} (`{member.id}`)", - thumbnail=member.avatar_url_as(static_format="png"), - channel_id=Channels.modlog - ) - - @Cog.listener() - async def on_member_update(self, before: Member, after: Member) -> None: - """Log member update event to user log.""" - if before.guild.id != GuildConstant.id: - return - - if before.id in self._ignored[Event.member_update]: - self._ignored[Event.member_update].remove(before.id) - return - - diff = DeepDiff(before, after) - changes = [] - done = [] - - diff_values = {} - - diff_values.update(diff.get("values_changed", {})) - diff_values.update(diff.get("type_changes", {})) - diff_values.update(diff.get("iterable_item_removed", {})) - diff_values.update(diff.get("iterable_item_added", {})) - - diff_user = DeepDiff(before._user, after._user) - - diff_values.update(diff_user.get("values_changed", {})) - diff_values.update(diff_user.get("type_changes", {})) - diff_values.update(diff_user.get("iterable_item_removed", {})) - diff_values.update(diff_user.get("iterable_item_added", {})) - - for key, value in diff_values.items(): - if not key: # Not sure why, but it happens - continue - - key = key[5:] # Remove "root." prefix - - if "[" in key: - key = key.split("[", 1)[0] - - if "." in key: - key = key.split(".", 1)[0] - - if key in done or key in MEMBER_CHANGES_SUPPRESSED: - continue - - if key == "_roles": - new_roles = after.roles - old_roles = before.roles - - for role in old_roles: - if role not in new_roles: - changes.append(f"**Role removed:** {role.name} (`{role.id}`)") - - for role in new_roles: - if role not in old_roles: - changes.append(f"**Role added:** {role.name} (`{role.id}`)") - - else: - new = value.get("new_value") - old = value.get("old_value") - - if new and old: - changes.append(f"**{key.title()}:** `{old}` **->** `{new}`") - - done.append(key) - - if before.name != after.name: - changes.append( - f"**Username:** `{before.name}` **->** `{after.name}`" - ) - - if before.discriminator != after.discriminator: - changes.append( - f"**Discriminator:** `{before.discriminator}` **->** `{after.discriminator}`" - ) - - if not changes: - return - - message = "" - - for item in sorted(changes): - message += f"{Emojis.bullet} {item}\n" - - message = f"**{after.name}#{after.discriminator}** (`{after.id}`)\n{message}" - - await self.send_log_message( - Icons.user_update, Colour.blurple(), - "Member updated", message, - thumbnail=after.avatar_url_as(static_format="png"), - channel_id=Channels.userlog - ) - - @Cog.listener() - async def on_message_delete(self, message: Message) -> None: - """Log message delete event to message change log.""" - channel = message.channel - author = message.author - - if message.guild.id != GuildConstant.id or channel.id in GuildConstant.ignored: - return - - self._cached_deletes.append(message.id) - - if message.id in self._ignored[Event.message_delete]: - self._ignored[Event.message_delete].remove(message.id) - return - - if author.bot: - return - - if channel.category: - response = ( - f"**Author:** {author.name}#{author.discriminator} (`{author.id}`)\n" - f"**Channel:** {channel.category}/#{channel.name} (`{channel.id}`)\n" - f"**Message ID:** `{message.id}`\n" - "\n" - ) - else: - response = ( - f"**Author:** {author.name}#{author.discriminator} (`{author.id}`)\n" - f"**Channel:** #{channel.name} (`{channel.id}`)\n" - f"**Message ID:** `{message.id}`\n" - "\n" - ) - - if message.attachments: - # Prepend the message metadata with the number of attachments - response = f"**Attachments:** {len(message.attachments)}\n" + response - - # Shorten the message content if necessary - content = message.clean_content - remaining_chars = 2040 - len(response) - - if len(content) > remaining_chars: - botlog_url = await self.upload_log(messages=[message], actor_id=message.author.id) - ending = f"\n\nMessage truncated, [full message here]({botlog_url})." - truncation_point = remaining_chars - len(ending) - content = f"{content[:truncation_point]}...{ending}" - - response += f"{content}" - - await self.send_log_message( - Icons.message_delete, Colours.soft_red, - "Message deleted", - response, - channel_id=Channels.message_log - ) - - @Cog.listener() - async def on_raw_message_delete(self, event: RawMessageDeleteEvent) -> None: - """Log raw message delete event to message change log.""" - if event.guild_id != GuildConstant.id or event.channel_id in GuildConstant.ignored: - return - - await asyncio.sleep(1) # Wait here in case the normal event was fired - - if event.message_id in self._cached_deletes: - # It was in the cache and the normal event was fired, so we can just ignore it - self._cached_deletes.remove(event.message_id) - return - - if event.message_id in self._ignored[Event.message_delete]: - self._ignored[Event.message_delete].remove(event.message_id) - return - - channel = self.bot.get_channel(event.channel_id) - - if channel.category: - response = ( - f"**Channel:** {channel.category}/#{channel.name} (`{channel.id}`)\n" - f"**Message ID:** `{event.message_id}`\n" - "\n" - "This message was not cached, so the message content cannot be displayed." - ) - else: - response = ( - f"**Channel:** #{channel.name} (`{channel.id}`)\n" - f"**Message ID:** `{event.message_id}`\n" - "\n" - "This message was not cached, so the message content cannot be displayed." - ) - - await self.send_log_message( - Icons.message_delete, Colour(Colours.soft_red), - "Message deleted", - response, - channel_id=Channels.message_log - ) - - @Cog.listener() - async def on_message_edit(self, before: Message, after: Message) -> None: - """Log message edit event to message change log.""" - if ( - not before.guild - or before.guild.id != GuildConstant.id - or before.channel.id in GuildConstant.ignored - or before.author.bot - ): - return - - self._cached_edits.append(before.id) - - if before.content == after.content: - return - - author = before.author - channel = before.channel - - if channel.category: - before_response = ( - f"**Author:** {author.name}#{author.discriminator} (`{author.id}`)\n" - f"**Channel:** {channel.category}/#{channel.name} (`{channel.id}`)\n" - f"**Message ID:** `{before.id}`\n" - "\n" - f"{before.clean_content}" - ) - - after_response = ( - f"**Author:** {author.name}#{author.discriminator} (`{author.id}`)\n" - f"**Channel:** {channel.category}/#{channel.name} (`{channel.id}`)\n" - f"**Message ID:** `{before.id}`\n" - "\n" - f"{after.clean_content}" - ) - else: - before_response = ( - f"**Author:** {author.name}#{author.discriminator} (`{author.id}`)\n" - f"**Channel:** #{channel.name} (`{channel.id}`)\n" - f"**Message ID:** `{before.id}`\n" - "\n" - f"{before.clean_content}" - ) - - after_response = ( - f"**Author:** {author.name}#{author.discriminator} (`{author.id}`)\n" - f"**Channel:** #{channel.name} (`{channel.id}`)\n" - f"**Message ID:** `{before.id}`\n" - "\n" - f"{after.clean_content}" - ) - - if before.edited_at: - # Message was previously edited, to assist with self-bot detection, use the edited_at - # datetime as the baseline and create a human-readable delta between this edit event - # and the last time the message was edited - timestamp = before.edited_at - delta = humanize_delta(relativedelta(after.edited_at, before.edited_at)) - footer = f"Last edited {delta} ago" - else: - # Message was not previously edited, use the created_at datetime as the baseline, no - # delta calculation needed - timestamp = before.created_at - footer = None - - await self.send_log_message( - Icons.message_edit, Colour.blurple(), "Message edited (Before)", before_response, - channel_id=Channels.message_log, timestamp_override=timestamp, footer=footer - ) - - await self.send_log_message( - Icons.message_edit, Colour.blurple(), "Message edited (After)", after_response, - channel_id=Channels.message_log, timestamp_override=after.edited_at - ) - - @Cog.listener() - async def on_raw_message_edit(self, event: RawMessageUpdateEvent) -> None: - """Log raw message edit event to message change log.""" - try: - channel = self.bot.get_channel(int(event.data["channel_id"])) - message = await channel.fetch_message(event.message_id) - except NotFound: # Was deleted before we got the event - return - - if ( - not message.guild - or message.guild.id != GuildConstant.id - or message.channel.id in GuildConstant.ignored - or message.author.bot - ): - return - - await asyncio.sleep(1) # Wait here in case the normal event was fired - - if event.message_id in self._cached_edits: - # It was in the cache and the normal event was fired, so we can just ignore it - self._cached_edits.remove(event.message_id) - return - - author = message.author - channel = message.channel - - if channel.category: - before_response = ( - f"**Author:** {author.name}#{author.discriminator} (`{author.id}`)\n" - f"**Channel:** {channel.category}/#{channel.name} (`{channel.id}`)\n" - f"**Message ID:** `{message.id}`\n" - "\n" - "This message was not cached, so the message content cannot be displayed." - ) - - after_response = ( - f"**Author:** {author.name}#{author.discriminator} (`{author.id}`)\n" - f"**Channel:** {channel.category}/#{channel.name} (`{channel.id}`)\n" - f"**Message ID:** `{message.id}`\n" - "\n" - f"{message.clean_content}" - ) - else: - before_response = ( - f"**Author:** {author.name}#{author.discriminator} (`{author.id}`)\n" - f"**Channel:** #{channel.name} (`{channel.id}`)\n" - f"**Message ID:** `{message.id}`\n" - "\n" - "This message was not cached, so the message content cannot be displayed." - ) - - after_response = ( - f"**Author:** {author.name}#{author.discriminator} (`{author.id}`)\n" - f"**Channel:** #{channel.name} (`{channel.id}`)\n" - f"**Message ID:** `{message.id}`\n" - "\n" - f"{message.clean_content}" - ) - - await self.send_log_message( - Icons.message_edit, Colour.blurple(), "Message edited (Before)", - before_response, channel_id=Channels.message_log - ) - - await self.send_log_message( - Icons.message_edit, Colour.blurple(), "Message edited (After)", - after_response, channel_id=Channels.message_log - ) - - -def setup(bot: Bot) -> None: - """Mod log cog load.""" - bot.add_cog(ModLog(bot)) - log.info("Cog loaded: ModLog") diff --git a/bot/cogs/superstarify/__init__.py b/bot/cogs/superstarify/__init__.py index 87021eded..576de2d31 100644 --- a/bot/cogs/superstarify/__init__.py +++ b/bot/cogs/superstarify/__init__.py @@ -5,13 +5,12 @@ from discord import Colour, Embed, Member from discord.errors import Forbidden from discord.ext.commands import Bot, Cog, Context, command -from bot.cogs.moderation import Moderation -from bot.cogs.modlog import ModLog +from bot.cogs.moderation import Infractions, ModLog +from bot.cogs.moderation.utils import post_infraction from bot.cogs.superstarify.stars import get_nick from bot.constants import Icons, MODERATION_ROLES, POSITIVE_REPLIES from bot.converters import Duration from bot.decorators import with_role -from bot.utils.moderation import post_infraction from bot.utils.time import format_infraction log = logging.getLogger(__name__) @@ -25,9 +24,9 @@ class Superstarify(Cog): self.bot = bot @property - def moderation(self) -> Moderation: - """Get currently loaded Moderation cog instance.""" - return self.bot.get_cog("Moderation") + def infractions_cog(self) -> Infractions: + """Get currently loaded Infractions cog instance.""" + return self.bot.get_cog("Infractions") @property def modlog(self) -> ModLog: @@ -206,7 +205,7 @@ class Superstarify(Cog): thumbnail=member.avatar_url_as(static_format="png") ) - await self.moderation.notify_infraction( + await self.infractions_cog.notify_infraction( user=member, infr_type="Superstarify", expires_at=expiration, @@ -249,7 +248,7 @@ class Superstarify(Cog): embed.description = "User has been released from superstar-prison." embed.title = random.choice(POSITIVE_REPLIES) - await self.moderation.notify_pardon( + await self.infractions_cog.notify_pardon( user=member, title="You are no longer superstarified.", content="You may now change your nickname on the server." diff --git a/bot/cogs/token_remover.py b/bot/cogs/token_remover.py index 7dd0afbbd..4a655d049 100644 --- a/bot/cogs/token_remover.py +++ b/bot/cogs/token_remover.py @@ -9,7 +9,7 @@ from discord import Colour, Message from discord.ext.commands import Bot, Cog from discord.utils import snowflake_time -from bot.cogs.modlog import ModLog +from bot.cogs.moderation import ModLog from bot.constants import Channels, Colours, Event, Icons log = logging.getLogger(__name__) diff --git a/bot/cogs/verification.py b/bot/cogs/verification.py index f0a099f27..acd7a7865 100644 --- a/bot/cogs/verification.py +++ b/bot/cogs/verification.py @@ -3,7 +3,7 @@ import logging from discord import Message, NotFound, Object from discord.ext.commands import Bot, Cog, Context, command -from bot.cogs.modlog import ModLog +from bot.cogs.moderation import ModLog from bot.constants import Channels, Event, Roles from bot.decorators import InChannelCheckFailure, in_channel, without_role diff --git a/bot/cogs/watchchannels/bigbrother.py b/bot/cogs/watchchannels/bigbrother.py index e191c2dbc..c332d80b9 100644 --- a/bot/cogs/watchchannels/bigbrother.py +++ b/bot/cogs/watchchannels/bigbrother.py @@ -5,9 +5,9 @@ from typing import Union from discord import User from discord.ext.commands import Bot, Cog, Context, group +from bot.cogs.moderation.utils import post_infraction from bot.constants import Channels, Roles, Webhooks from bot.decorators import with_role -from bot.utils.moderation import post_infraction from .watchchannel import WatchChannel, proxy_user log = logging.getLogger(__name__) diff --git a/bot/cogs/watchchannels/watchchannel.py b/bot/cogs/watchchannels/watchchannel.py index ce8014d69..e67f4674b 100644 --- a/bot/cogs/watchchannels/watchchannel.py +++ b/bot/cogs/watchchannels/watchchannel.py @@ -13,7 +13,7 @@ from discord import Color, Embed, HTTPException, Message, Object, errors from discord.ext.commands import BadArgument, Bot, Cog, Context from bot.api import ResponseCodeError -from bot.cogs.modlog import ModLog +from bot.cogs.moderation import ModLog from bot.constants import BigBrother as BigBrotherConfig, Guild as GuildConfig, Icons from bot.pagination import LinePaginator from bot.utils import CogABCMeta, messages diff --git a/bot/utils/moderation.py b/bot/utils/moderation.py deleted file mode 100644 index 48ebe422c..000000000 --- a/bot/utils/moderation.py +++ /dev/null @@ -1,87 +0,0 @@ -import logging -import typing as t -from datetime import datetime - -import discord -from discord.ext import commands -from discord.ext.commands import Context - -from bot.api import ResponseCodeError - -log = logging.getLogger(__name__) - -MemberObject = t.Union[discord.Member, discord.User, discord.Object] -Infraction = t.Dict[str, t.Union[str, int, bool]] - - -def proxy_user(user_id: str) -> discord.Object: - """Create a proxy user for the provided user_id for situations where a Member or User object cannot be resolved.""" - try: - user_id = int(user_id) - except ValueError: - raise commands.BadArgument - - user = discord.Object(user_id) - user.mention = user.id - user.avatar_url_as = lambda static_format: None - - return user - - -async def post_infraction( - ctx: Context, - user: MemberObject, - type: str, - reason: str, - expires_at: datetime = None, - hidden: bool = False, - active: bool = True, -) -> t.Optional[dict]: - """Posts an infraction to the API.""" - payload = { - "actor": ctx.message.author.id, - "hidden": hidden, - "reason": reason, - "type": type, - "user": user.id, - "active": active - } - if expires_at: - payload['expires_at'] = expires_at.isoformat() - - try: - response = await ctx.bot.api_client.post('bot/infractions', json=payload) - except ResponseCodeError as exp: - if exp.status == 400 and 'user' in exp.response_json: - log.info( - f"{ctx.author} tried to add a {type} infraction to `{user.id}`, " - "but that user id was not found in the database." - ) - await ctx.send(f":x: Cannot add infraction, the specified user is not known to the database.") - return - else: - log.exception("An unexpected ResponseCodeError occurred while adding an infraction:") - await ctx.send(":x: There was an error adding the infraction.") - return - - return response - - -async def already_has_active_infraction(ctx: Context, user: MemberObject, type: str) -> bool: - """Checks if a user already has an active infraction of the given type.""" - active_infractions = await ctx.bot.api_client.get( - 'bot/infractions', - params={ - 'active': 'true', - 'type': type, - 'user__id': str(user.id) - } - ) - if active_infractions: - await ctx.send( - f":x: According to my records, this user already has a {type} infraction. " - f"See infraction **#{active_infractions[0]['id']}**." - ) - return True - else: - return False -- cgit v1.2.3