aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--bot/exts/moderation/clean.py229
1 files changed, 133 insertions, 96 deletions
diff --git a/bot/exts/moderation/clean.py b/bot/exts/moderation/clean.py
index 5954672fe..455d28faa 100644
--- a/bot/exts/moderation/clean.py
+++ b/bot/exts/moderation/clean.py
@@ -2,7 +2,6 @@ import logging
import re
import time
from collections import defaultdict
-from itertools import chain
from typing import Any, Callable, DefaultDict, Iterable, List, Literal, Optional, TYPE_CHECKING, Tuple, Union
from discord import Colour, Embed, Message, NotFound, TextChannel, User, errors
@@ -59,28 +58,35 @@ class Clean(Cog):
"""Get currently loaded ModLog cog instance."""
return self.bot.get_cog("ModLog")
+ # region: Helper functions
+
@staticmethod
- def is_older_than_14d(message: Message) -> bool:
- """
- Precisely checks if message is older than 14 days, bulk deletion limit.
+ def _validate_input(
+ traverse: int,
+ channels: CleanChannels,
+ bots_only: bool,
+ user: User,
+ until_message: Message,
+ after_message: Message,
+ use_cache: bool
+ ) -> None:
+ """Raise errors if an argument value or a combination of values is invalid."""
+ # Is this an acceptable amount of messages to traverse?
+ if traverse > CleanMessages.message_limit:
+ raise BadArgument(f"You cannot traverse more than {CleanMessages.message_limit} messages.")
- Inspired by how purge works internally.
- Comparison on message age could possibly be less accurate which in turn would resort in problems
- with message deletion if said messages are very close to the 14d mark.
- """
- two_weeks_old_snowflake = int((time.time() - 14 * 24 * 60 * 60) * 1000.0 - 1420070400000) << 22
- return message.id < two_weeks_old_snowflake
+ if after_message:
+ # Ensure that until_message is specified.
+ if not until_message:
+ raise MissingRequiredArgument("`until_message` must be specified if `after_message` is specified.")
- async def _delete_messages_individually(self, messages: List[Message]) -> None:
- for message in messages:
- # Ensure that deletion was not canceled
- if not self.cleaning:
- return
- try:
- await message.delete()
- except NotFound:
- # Message doesn't exist or was already deleted
- continue
+ # Messages are not in same channel
+ if after_message.channel != until_message.channel:
+ raise BadArgument("You cannot do range clean across several channel.")
+
+ # Ensure that after_message is younger than until_message
+ if after_message.created_at >= until_message.created_at:
+ raise BadArgument("`after` message must be younger than `until` message")
def _get_messages_from_cache(self, traverse: int, to_delete: Predicate) -> Tuple[DefaultDict, List[int]]:
"""Helper function for getting messages from the cache."""
@@ -134,6 +140,107 @@ class Clean(Cog):
return message_mappings, message_ids
+ @staticmethod
+ def is_older_than_14d(message: Message) -> bool:
+ """
+ Precisely checks if message is older than 14 days, bulk deletion limit.
+
+ Inspired by how purge works internally.
+ Comparison on message age could possibly be less accurate which in turn would resort in problems
+ with message deletion if said messages are very close to the 14d mark.
+ """
+ two_weeks_old_snowflake = int((time.time() - 14 * 24 * 60 * 60) * 1000.0 - 1420070400000) << 22
+ return message.id < two_weeks_old_snowflake
+
+ async def _delete_messages_individually(self, messages: List[Message]) -> list[Message]:
+ """Delete each message in the list unless cleaning is cancelled. Return the deleted messages."""
+ deleted = []
+ for message in messages:
+ # Ensure that deletion was not canceled
+ if not self.cleaning:
+ return deleted
+ try:
+ await message.delete()
+ except NotFound:
+ # Message doesn't exist or was already deleted
+ continue
+ else:
+ deleted.append(message)
+ return deleted
+
+ async def _delete_found(self, message_mappings: dict[TextChannel, list[Message]]) -> list[Message]:
+ """
+ Delete the detected messages.
+
+ Deletion is made in bulk per channel for messages less than 14d old.
+ The function returns the deleted messages.
+ If cleaning was cancelled in the middle, return messages already deleted.
+ """
+ deleted = []
+ for channel, messages in message_mappings.items():
+ to_delete = []
+
+ for current_index, message in enumerate(messages):
+ if not self.cleaning:
+ # Means that the cleaning was canceled
+ return deleted
+
+ if self.is_older_than_14d(message):
+ # further messages are too old to be deleted in bulk
+ deleted_remaining = await self._delete_messages_individually(messages[current_index:])
+ deleted.extend(deleted_remaining)
+ if not self.cleaning:
+ # Means that deletion was canceled while deleting the individual messages
+ return deleted
+ break
+
+ to_delete.append(message)
+
+ if len(to_delete) == 100:
+ # we can only delete up to 100 messages in a bulk
+ await channel.delete_messages(to_delete)
+ deleted.extend(to_delete)
+ to_delete.clear()
+
+ if len(to_delete) > 0:
+ # deleting any leftover messages if there are any
+ await channel.delete_messages(to_delete)
+ deleted.extend(to_delete)
+
+ return deleted
+
+ async def _log_clean(self, messages: list[Message], channels: CleanChannels, invoker: User) -> None:
+ """Log the deleted messages to the modlog."""
+ if not messages:
+ # Can't build an embed, nothing to clean!
+ raise BadArgument("No matching messages could be found.")
+
+ # Reverse the list to have reverse chronological order
+ log_messages = reversed(messages)
+ log_url = await self.mod_log.upload_log(log_messages, invoker.id)
+
+ # Build the embed and send it
+ if channels == "*":
+ target_channels = "all channels"
+ else:
+ target_channels = ", ".join(channel.mention for channel in channels)
+
+ message = (
+ f"**{len(messages)}** messages deleted in {target_channels} by "
+ f"{invoker.mention}\n\n"
+ f"A log of the deleted messages can be found [here]({log_url})."
+ )
+
+ await self.mod_log.send_log_message(
+ icon_url=Icons.message_bulk_delete,
+ colour=Colour(Colours.soft_red),
+ title="Bulk message delete",
+ text=message,
+ channel_id=Channels.mod_log,
+ )
+
+ # endregion
+
async def _clean_messages(
self,
traverse: int,
@@ -183,22 +290,7 @@ class Clean(Cog):
"""Check if message is older than message provided in after_message but younger than until_message."""
return after_message.created_at <= message.created_at <= until_message.created_at
- # Is this an acceptable amount of messages to traverse?
- if traverse > CleanMessages.message_limit:
- raise BadArgument(f"You cannot traverse more than {CleanMessages.message_limit} messages.")
-
- if after_message:
- # Ensure that until_message is specified.
- if not until_message:
- raise MissingRequiredArgument("`until_message` must be specified if `after_message` is specified.")
-
- # Messages are not in same channel
- if after_message.channel != until_message.channel:
- raise BadArgument("You cannot do range clean across several channel.")
-
- # Ensure that after_message is younger than until_message
- if after_message.created_at >= until_message.created_at:
- raise BadArgument("`after` message must be younger than `until` message")
+ self._validate_input(traverse, channels, bots_only, user, until_message, after_message, use_cache)
# Are we already performing a clean?
if self.cleaning:
@@ -250,69 +342,12 @@ class Clean(Cog):
# Now let's delete the actual messages with purge.
self.mod_log.ignore(Event.message_delete, *message_ids)
-
- for channel, messages in message_mappings.items():
-
- to_delete = []
-
- for current_index, message in enumerate(messages):
-
- if not self.cleaning:
- # Means that the cleaning was canceled
- return
-
- if self.is_older_than_14d(message):
- # further messages are too old to be deleted in bulk
- await self._delete_messages_individually(messages[current_index:])
- if not self.cleaning:
- # Means that deletion was canceled while deleting the individual messages
- return
- break
-
- to_delete.append(message)
-
- if len(to_delete) == 100:
- # we can only delete up to 100 messages in a bulk
- await channel.delete_messages(to_delete)
- to_delete.clear()
-
- if len(to_delete) > 0:
- # deleting any leftover messages if there are any
- await channel.delete_messages(to_delete)
-
+ deleted_messages = await self._delete_found(message_mappings)
self.cleaning = False
- await self._log_clean(list(chain.from_iterable(message_mappings.values())), channels, ctx.author)
-
- async def _log_clean(self, messages: list[Message], channels: CleanChannels, invoker: User) -> None:
- """Log the deleted messages to the modlog."""
- if not messages:
- # Can't build an embed, nothing to clean!
- raise BadArgument("No matching messages could be found.")
-
- # Reverse the list to have reverse chronological order
- log_messages = reversed(messages)
- log_url = await self.mod_log.upload_log(log_messages, invoker.id)
+ await self._log_clean(deleted_messages, channels, ctx.author)
- # Build the embed and send it
- if channels == "*":
- target_channels = "all channels"
- else:
- target_channels = ", ".join(channel.mention for channel in channels)
-
- message = (
- f"**{len(messages)}** messages deleted in {target_channels} by "
- f"{invoker.mention}\n\n"
- f"A log of the deleted messages can be found [here]({log_url})."
- )
-
- await self.mod_log.send_log_message(
- icon_url=Icons.message_bulk_delete,
- colour=Colour(Colours.soft_red),
- title="Bulk message delete",
- text=message,
- channel_id=Channels.mod_log,
- )
+ # region: Commands
@group(invoke_without_command=True, name="clean", aliases=["clear", "purge"])
@has_any_role(*MODERATION_ROLES)
@@ -412,6 +447,8 @@ class Clean(Cog):
delete_after = None
await ctx.send(embed=embed, delete_after=delete_after)
+ # endregion
+
def setup(bot: Bot) -> None:
"""Load the Clean cog."""