diff options
| -rw-r--r-- | bot/exts/moderation/clean.py | 229 |
1 files changed, 133 insertions, 96 deletions
diff --git a/bot/exts/moderation/clean.py b/bot/exts/moderation/clean.py index 5954672fe..455d28faa 100644 --- a/bot/exts/moderation/clean.py +++ b/bot/exts/moderation/clean.py @@ -2,7 +2,6 @@ import logging import re import time from collections import defaultdict -from itertools import chain from typing import Any, Callable, DefaultDict, Iterable, List, Literal, Optional, TYPE_CHECKING, Tuple, Union from discord import Colour, Embed, Message, NotFound, TextChannel, User, errors @@ -59,28 +58,35 @@ class Clean(Cog): """Get currently loaded ModLog cog instance.""" return self.bot.get_cog("ModLog") + # region: Helper functions + @staticmethod - def is_older_than_14d(message: Message) -> bool: - """ - Precisely checks if message is older than 14 days, bulk deletion limit. + def _validate_input( + traverse: int, + channels: CleanChannels, + bots_only: bool, + user: User, + until_message: Message, + after_message: Message, + use_cache: bool + ) -> None: + """Raise errors if an argument value or a combination of values is invalid.""" + # Is this an acceptable amount of messages to traverse? + if traverse > CleanMessages.message_limit: + raise BadArgument(f"You cannot traverse more than {CleanMessages.message_limit} messages.") - Inspired by how purge works internally. - Comparison on message age could possibly be less accurate which in turn would resort in problems - with message deletion if said messages are very close to the 14d mark. - """ - two_weeks_old_snowflake = int((time.time() - 14 * 24 * 60 * 60) * 1000.0 - 1420070400000) << 22 - return message.id < two_weeks_old_snowflake + if after_message: + # Ensure that until_message is specified. + if not until_message: + raise MissingRequiredArgument("`until_message` must be specified if `after_message` is specified.") - async def _delete_messages_individually(self, messages: List[Message]) -> None: - for message in messages: - # Ensure that deletion was not canceled - if not self.cleaning: - return - try: - await message.delete() - except NotFound: - # Message doesn't exist or was already deleted - continue + # Messages are not in same channel + if after_message.channel != until_message.channel: + raise BadArgument("You cannot do range clean across several channel.") + + # Ensure that after_message is younger than until_message + if after_message.created_at >= until_message.created_at: + raise BadArgument("`after` message must be younger than `until` message") def _get_messages_from_cache(self, traverse: int, to_delete: Predicate) -> Tuple[DefaultDict, List[int]]: """Helper function for getting messages from the cache.""" @@ -134,6 +140,107 @@ class Clean(Cog): return message_mappings, message_ids + @staticmethod + def is_older_than_14d(message: Message) -> bool: + """ + Precisely checks if message is older than 14 days, bulk deletion limit. + + Inspired by how purge works internally. + Comparison on message age could possibly be less accurate which in turn would resort in problems + with message deletion if said messages are very close to the 14d mark. + """ + two_weeks_old_snowflake = int((time.time() - 14 * 24 * 60 * 60) * 1000.0 - 1420070400000) << 22 + return message.id < two_weeks_old_snowflake + + async def _delete_messages_individually(self, messages: List[Message]) -> list[Message]: + """Delete each message in the list unless cleaning is cancelled. Return the deleted messages.""" + deleted = [] + for message in messages: + # Ensure that deletion was not canceled + if not self.cleaning: + return deleted + try: + await message.delete() + except NotFound: + # Message doesn't exist or was already deleted + continue + else: + deleted.append(message) + return deleted + + async def _delete_found(self, message_mappings: dict[TextChannel, list[Message]]) -> list[Message]: + """ + Delete the detected messages. + + Deletion is made in bulk per channel for messages less than 14d old. + The function returns the deleted messages. + If cleaning was cancelled in the middle, return messages already deleted. + """ + deleted = [] + for channel, messages in message_mappings.items(): + to_delete = [] + + for current_index, message in enumerate(messages): + if not self.cleaning: + # Means that the cleaning was canceled + return deleted + + if self.is_older_than_14d(message): + # further messages are too old to be deleted in bulk + deleted_remaining = await self._delete_messages_individually(messages[current_index:]) + deleted.extend(deleted_remaining) + if not self.cleaning: + # Means that deletion was canceled while deleting the individual messages + return deleted + break + + to_delete.append(message) + + if len(to_delete) == 100: + # we can only delete up to 100 messages in a bulk + await channel.delete_messages(to_delete) + deleted.extend(to_delete) + to_delete.clear() + + if len(to_delete) > 0: + # deleting any leftover messages if there are any + await channel.delete_messages(to_delete) + deleted.extend(to_delete) + + return deleted + + async def _log_clean(self, messages: list[Message], channels: CleanChannels, invoker: User) -> None: + """Log the deleted messages to the modlog.""" + if not messages: + # Can't build an embed, nothing to clean! + raise BadArgument("No matching messages could be found.") + + # Reverse the list to have reverse chronological order + log_messages = reversed(messages) + log_url = await self.mod_log.upload_log(log_messages, invoker.id) + + # Build the embed and send it + if channels == "*": + target_channels = "all channels" + else: + target_channels = ", ".join(channel.mention for channel in channels) + + message = ( + f"**{len(messages)}** messages deleted in {target_channels} by " + f"{invoker.mention}\n\n" + f"A log of the deleted messages can be found [here]({log_url})." + ) + + await self.mod_log.send_log_message( + icon_url=Icons.message_bulk_delete, + colour=Colour(Colours.soft_red), + title="Bulk message delete", + text=message, + channel_id=Channels.mod_log, + ) + + # endregion + async def _clean_messages( self, traverse: int, @@ -183,22 +290,7 @@ class Clean(Cog): """Check if message is older than message provided in after_message but younger than until_message.""" return after_message.created_at <= message.created_at <= until_message.created_at - # Is this an acceptable amount of messages to traverse? - if traverse > CleanMessages.message_limit: - raise BadArgument(f"You cannot traverse more than {CleanMessages.message_limit} messages.") - - if after_message: - # Ensure that until_message is specified. - if not until_message: - raise MissingRequiredArgument("`until_message` must be specified if `after_message` is specified.") - - # Messages are not in same channel - if after_message.channel != until_message.channel: - raise BadArgument("You cannot do range clean across several channel.") - - # Ensure that after_message is younger than until_message - if after_message.created_at >= until_message.created_at: - raise BadArgument("`after` message must be younger than `until` message") + self._validate_input(traverse, channels, bots_only, user, until_message, after_message, use_cache) # Are we already performing a clean? if self.cleaning: @@ -250,69 +342,12 @@ class Clean(Cog): # Now let's delete the actual messages with purge. self.mod_log.ignore(Event.message_delete, *message_ids) - - for channel, messages in message_mappings.items(): - - to_delete = [] - - for current_index, message in enumerate(messages): - - if not self.cleaning: - # Means that the cleaning was canceled - return - - if self.is_older_than_14d(message): - # further messages are too old to be deleted in bulk - await self._delete_messages_individually(messages[current_index:]) - if not self.cleaning: - # Means that deletion was canceled while deleting the individual messages - return - break - - to_delete.append(message) - - if len(to_delete) == 100: - # we can only delete up to 100 messages in a bulk - await channel.delete_messages(to_delete) - to_delete.clear() - - if len(to_delete) > 0: - # deleting any leftover messages if there are any - await channel.delete_messages(to_delete) - + deleted_messages = await self._delete_found(message_mappings) self.cleaning = False - await self._log_clean(list(chain.from_iterable(message_mappings.values())), channels, ctx.author) - - async def _log_clean(self, messages: list[Message], channels: CleanChannels, invoker: User) -> None: - """Log the deleted messages to the modlog.""" - if not messages: - # Can't build an embed, nothing to clean! - raise BadArgument("No matching messages could be found.") - - # Reverse the list to have reverse chronological order - log_messages = reversed(messages) - log_url = await self.mod_log.upload_log(log_messages, invoker.id) + await self._log_clean(deleted_messages, channels, ctx.author) - # Build the embed and send it - if channels == "*": - target_channels = "all channels" - else: - target_channels = ", ".join(channel.mention for channel in channels) - - message = ( - f"**{len(messages)}** messages deleted in {target_channels} by " - f"{invoker.mention}\n\n" - f"A log of the deleted messages can be found [here]({log_url})." - ) - - await self.mod_log.send_log_message( - icon_url=Icons.message_bulk_delete, - colour=Colour(Colours.soft_red), - title="Bulk message delete", - text=message, - channel_id=Channels.mod_log, - ) + # region: Commands @group(invoke_without_command=True, name="clean", aliases=["clear", "purge"]) @has_any_role(*MODERATION_ROLES) @@ -412,6 +447,8 @@ class Clean(Cog): delete_after = None await ctx.send(embed=embed, delete_after=delete_after) + # endregion + def setup(bot: Bot) -> None: """Load the Clean cog.""" |