aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar Johannes Christ <[email protected]>2021-10-30 11:38:14 +0200
committerGravatar GitHub <[email protected]>2021-10-30 11:38:14 +0200
commitaf8fe4a29f38bd0048e2e7d87ad4623f445d2ae1 (patch)
tree3de9c3b0b32a35b6a0da7667386f87b6512c8cf6
parentMerge pull request #1909 from python-discord/pin-consistency (diff)
parentMerge branch 'main' into cleanrework (diff)
Merge pull request #1793 from python-discord/cleanrework
Clean Cog Rework
-rw-r--r--bot/converters.py19
-rw-r--r--bot/exts/moderation/clean.py595
-rw-r--r--bot/exts/utils/clean.py274
3 files changed, 614 insertions, 274 deletions
diff --git a/bot/converters.py b/bot/converters.py
index f50acb9c6..0984fa0a3 100644
--- a/bot/converters.py
+++ b/bot/converters.py
@@ -395,6 +395,24 @@ class Duration(DurationDelta):
raise BadArgument(f"`{duration}` results in a datetime outside the supported range.")
+class Age(DurationDelta):
+ """Convert duration strings into UTC datetime.datetime objects."""
+
+ async def convert(self, ctx: Context, duration: str) -> datetime:
+ """
+ Converts a `duration` string to a datetime object that's `duration` in the past.
+
+ The converter supports the same symbols for each unit of time as its parent class.
+ """
+ delta = await super().convert(ctx, duration)
+ now = datetime.now(timezone.utc)
+
+ try:
+ return now - delta
+ except (ValueError, OverflowError):
+ raise BadArgument(f"`{duration}` results in a datetime outside the supported range.")
+
+
class OffTopicName(Converter):
"""A converter that ensures an added off-topic name is valid."""
@@ -601,6 +619,7 @@ if t.TYPE_CHECKING:
SourceConverter = SourceType # noqa: F811
DurationDelta = relativedelta # noqa: F811
Duration = datetime # noqa: F811
+ Age = datetime # noqa: F811
OffTopicName = str # noqa: F811
ISODateTime = datetime # noqa: F811
HushDurationConverter = int # noqa: F811
diff --git a/bot/exts/moderation/clean.py b/bot/exts/moderation/clean.py
new file mode 100644
index 000000000..94494b983
--- /dev/null
+++ b/bot/exts/moderation/clean.py
@@ -0,0 +1,595 @@
+import contextlib
+import logging
+import re
+import time
+from collections import defaultdict
+from contextlib import suppress
+from datetime import datetime
+from itertools import islice
+from typing import Any, Callable, Iterable, Literal, Optional, TYPE_CHECKING, Union
+
+from discord import Colour, Message, NotFound, TextChannel, User, errors
+from discord.ext.commands import Cog, Context, Converter, Greedy, group, has_any_role
+from discord.ext.commands.converter import TextChannelConverter
+from discord.ext.commands.errors import BadArgument
+
+from bot.bot import Bot
+from bot.constants import Channels, CleanMessages, Colours, Emojis, Event, Icons, MODERATION_ROLES
+from bot.converters import Age, ISODateTime
+from bot.exts.moderation.modlog import ModLog
+from bot.utils.channel import is_mod_channel
+
+log = logging.getLogger(__name__)
+
+# Default number of messages to look at in each channel.
+DEFAULT_TRAVERSE = 10
+# Number of seconds before command invocations and responses are deleted in non-moderation channels.
+MESSAGE_DELETE_DELAY = 5
+
+# Type alias for checks for whether a message should be deleted.
+Predicate = Callable[[Message], bool]
+# Type alias for message lookup ranges.
+CleanLimit = Union[Message, Age, ISODateTime]
+
+
+class CleanChannels(Converter):
+ """A converter that turns the given string to a list of channels to clean, or the literal `*` for all channels."""
+
+ _channel_converter = TextChannelConverter()
+
+ async def convert(self, ctx: Context, argument: str) -> Union[Literal["*"], list[TextChannel]]:
+ """Converts a string to a list of channels to clean, or the literal `*` for all channels."""
+ if argument == "*":
+ return "*"
+ return [await self._channel_converter.convert(ctx, channel) for channel in argument.split()]
+
+
+class Regex(Converter):
+ """A converter that takes a string in the form `.+` and returns the contents of the inline code compiled."""
+
+ async def convert(self, ctx: Context, argument: str) -> re.Pattern:
+ """Strips the backticks from the string and compiles it to a regex pattern."""
+ match = re.fullmatch(r"`(.+?)`", argument)
+ if not match:
+ raise BadArgument("Regex pattern missing wrapping backticks")
+ try:
+ return re.compile(match.group(1), re.IGNORECASE + re.DOTALL)
+ except re.error as e:
+ raise BadArgument(f"Regex error: {e.msg}")
+
+
+if TYPE_CHECKING: # Used to allow method resolution in IDEs like in converters.py.
+ CleanChannels = Union[Literal["*"], list[TextChannel]] # noqa: F811
+ Regex = re.Pattern # noqa: F811
+
+
+class Clean(Cog):
+ """
+ A cog that allows messages to be deleted in bulk while applying various filters.
+
+ You can delete messages sent by a specific user, messages sent by bots, all messages, or messages that match a
+ specific regular expression.
+
+ The deleted messages are saved and uploaded to the database via an API endpoint, and a URL is returned which can be
+ used to view the messages in the Discord dark theme style.
+ """
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+ self.cleaning = False
+
+ @property
+ def mod_log(self) -> ModLog:
+ """Get currently loaded ModLog cog instance."""
+ return self.bot.get_cog("ModLog")
+
+ # region: Helper functions
+
+ @staticmethod
+ def _validate_input(
+ traverse: int,
+ channels: Optional[CleanChannels],
+ bots_only: bool,
+ users: Optional[list[User]],
+ first_limit: Optional[CleanLimit],
+ second_limit: Optional[CleanLimit],
+ ) -> None:
+ """Raise errors if an argument value or a combination of values is invalid."""
+ # Is this an acceptable amount of messages to traverse?
+ if traverse > CleanMessages.message_limit:
+ raise BadArgument(f"Cannot traverse more than {CleanMessages.message_limit} messages.")
+
+ if (isinstance(first_limit, Message) or isinstance(second_limit, Message)) and channels:
+ raise BadArgument("Both a message limit and channels specified.")
+
+ if isinstance(first_limit, Message) and isinstance(second_limit, Message):
+ # Messages are not in same channel.
+ if first_limit.channel != second_limit.channel:
+ raise BadArgument("Message limits are in different channels.")
+
+ if users and bots_only:
+ raise BadArgument("Marked as bots only, but users were specified.")
+
+ # This is an implementation error rather than user error.
+ if second_limit and not first_limit:
+ raise ValueError("Second limit specified without the first.")
+
+ @staticmethod
+ async def _send_expiring_message(ctx: Context, content: str) -> None:
+ """Send `content` to the context channel. Automatically delete if it's not a mod channel."""
+ delete_after = None if is_mod_channel(ctx.channel) else MESSAGE_DELETE_DELAY
+ await ctx.send(content, delete_after=delete_after)
+
+ @staticmethod
+ def _build_predicate(
+ bots_only: bool = False,
+ users: Optional[list[User]] = None,
+ regex: Optional[re.Pattern] = None,
+ first_limit: Optional[datetime] = None,
+ second_limit: Optional[datetime] = None,
+ ) -> Predicate:
+ """Return the predicate that decides whether to delete a given message."""
+ def predicate_bots_only(message: Message) -> bool:
+ """Return True if the message was sent by a bot."""
+ return message.author.bot
+
+ def predicate_specific_users(message: Message) -> bool:
+ """Return True if the message was sent by the user provided in the _clean_messages call."""
+ return message.author in users
+
+ def predicate_regex(message: Message) -> bool:
+ """Check if the regex provided in _clean_messages matches the message content or any embed attributes."""
+ content = [message.content]
+
+ # Add the content for all embed attributes
+ for embed in message.embeds:
+ content.append(embed.title)
+ content.append(embed.description)
+ content.append(embed.footer.text)
+ content.append(embed.author.name)
+ for field in embed.fields:
+ content.append(field.name)
+ content.append(field.value)
+
+ # Get rid of empty attributes and turn it into a string
+ content = "\n".join(attr for attr in content if attr)
+
+ # Now let's see if there's a regex match
+ return bool(regex.search(content))
+
+ def predicate_range(message: Message) -> bool:
+ """Check if the message age is between the two limits."""
+ return first_limit <= message.created_at <= second_limit
+
+ def predicate_after(message: Message) -> bool:
+ """Check if the message is older than the first limit."""
+ return message.created_at >= first_limit
+
+ predicates = []
+ # Set up the correct predicate
+ if bots_only:
+ predicates.append(predicate_bots_only) # Delete messages from bots
+ if users:
+ predicates.append(predicate_specific_users) # Delete messages from specific user
+ if regex:
+ predicates.append(predicate_regex) # Delete messages that match regex
+ # Add up to one of the following:
+ if second_limit:
+ predicates.append(predicate_range) # Delete messages in the specified age range
+ elif first_limit:
+ predicates.append(predicate_after) # Delete messages older than specific message
+
+ if not predicates:
+ return lambda m: True
+ if len(predicates) == 1:
+ return predicates[0]
+ return lambda m: all(pred(m) for pred in predicates)
+
+ async def _delete_invocation(self, ctx: Context) -> None:
+ """Delete the command invocation if it's not in a mod channel."""
+ if not is_mod_channel(ctx.channel):
+ self.mod_log.ignore(Event.message_delete, ctx.message.id)
+ try:
+ await ctx.message.delete()
+ except errors.NotFound:
+ # Invocation message has already been deleted
+ log.info("Tried to delete invocation message, but it was already deleted.")
+
+ def _get_messages_from_cache(self, traverse: int, to_delete: Predicate) -> tuple[defaultdict[Any, list], list[int]]:
+ """Helper function for getting messages from the cache."""
+ message_mappings = defaultdict(list)
+ message_ids = []
+ for message in islice(self.bot.cached_messages, traverse):
+ if not self.cleaning:
+ # Cleaning was canceled
+ return message_mappings, message_ids
+
+ if to_delete(message):
+ message_mappings[message.channel].append(message)
+ message_ids.append(message.id)
+
+ return message_mappings, message_ids
+
+ async def _get_messages_from_channels(
+ self,
+ traverse: int,
+ channels: Iterable[TextChannel],
+ to_delete: Predicate,
+ before: Optional[datetime] = None,
+ after: Optional[datetime] = None
+ ) -> tuple[defaultdict[Any, list], list]:
+ message_mappings = defaultdict(list)
+ message_ids = []
+
+ for channel in channels:
+ async for message in channel.history(limit=traverse, before=before, after=after):
+
+ if not self.cleaning:
+ # Cleaning was canceled, return empty containers.
+ return defaultdict(list), []
+
+ if to_delete(message):
+ message_mappings[message.channel].append(message)
+ message_ids.append(message.id)
+
+ return message_mappings, message_ids
+
+ @staticmethod
+ def is_older_than_14d(message: Message) -> bool:
+ """
+ Precisely checks if message is older than 14 days, bulk deletion limit.
+
+ Inspired by how purge works internally.
+ Comparison on message age could possibly be less accurate which in turn would resort in problems
+ with message deletion if said messages are very close to the 14d mark.
+ """
+ two_weeks_old_snowflake = int((time.time() - 14 * 24 * 60 * 60) * 1000.0 - 1420070400000) << 22
+ return message.id < two_weeks_old_snowflake
+
+ async def _delete_messages_individually(self, messages: list[Message]) -> list[Message]:
+ """Delete each message in the list unless cleaning is cancelled. Return the deleted messages."""
+ deleted = []
+ for message in messages:
+ # Ensure that deletion was not canceled
+ if not self.cleaning:
+ return deleted
+ with contextlib.suppress(NotFound): # Message doesn't exist or was already deleted
+ await message.delete()
+ deleted.append(message)
+ return deleted
+
+ async def _delete_found(self, message_mappings: dict[TextChannel, list[Message]]) -> list[Message]:
+ """
+ Delete the detected messages.
+
+ Deletion is made in bulk per channel for messages less than 14d old.
+ The function returns the deleted messages.
+ If cleaning was cancelled in the middle, return messages already deleted.
+ """
+ deleted = []
+ for channel, messages in message_mappings.items():
+ to_delete = []
+
+ delete_old = False
+ for current_index, message in enumerate(messages): # noqa: B007
+ if not self.cleaning:
+ # Means that the cleaning was canceled
+ return deleted
+
+ if self.is_older_than_14d(message):
+ # Further messages are too old to be deleted in bulk
+ delete_old = True
+ break
+
+ to_delete.append(message)
+
+ if len(to_delete) == 100:
+ # Only up to 100 messages can be deleted in a bulk
+ await channel.delete_messages(to_delete)
+ deleted.extend(to_delete)
+ to_delete.clear()
+
+ if not self.cleaning:
+ return deleted
+ if len(to_delete) > 0:
+ # Deleting any leftover messages if there are any
+ await channel.delete_messages(to_delete)
+ deleted.extend(to_delete)
+
+ if not self.cleaning:
+ return deleted
+ if delete_old:
+ old_deleted = await self._delete_messages_individually(messages[current_index:])
+ deleted.extend(old_deleted)
+
+ return deleted
+
+ async def _modlog_cleaned_messages(self, messages: list[Message], channels: CleanChannels, ctx: Context) -> bool:
+ """Log the deleted messages to the modlog. Return True if logging was successful."""
+ if not messages:
+ # Can't build an embed, nothing to clean!
+ await self._send_expiring_message(ctx, ":x: No matching messages could be found.")
+ return False
+
+ # Reverse the list to have reverse chronological order
+ log_messages = reversed(messages)
+ log_url = await self.mod_log.upload_log(log_messages, ctx.author.id)
+
+ # Build the embed and send it
+ if channels == "*":
+ target_channels = "all channels"
+ else:
+ target_channels = ", ".join(channel.mention for channel in channels)
+
+ message = (
+ f"**{len(messages)}** messages deleted in {target_channels} by "
+ f"{ctx.author.mention}\n\n"
+ f"A log of the deleted messages can be found [here]({log_url})."
+ )
+
+ await self.mod_log.send_log_message(
+ icon_url=Icons.message_bulk_delete,
+ colour=Colour(Colours.soft_red),
+ title="Bulk message delete",
+ text=message,
+ channel_id=Channels.mod_log,
+ )
+
+ return True
+
+ # endregion
+
+ async def _clean_messages(
+ self,
+ ctx: Context,
+ traverse: int,
+ channels: Optional[CleanChannels],
+ bots_only: bool = False,
+ users: Optional[list[User]] = None,
+ regex: Optional[re.Pattern] = None,
+ first_limit: Optional[CleanLimit] = None,
+ second_limit: Optional[CleanLimit] = None,
+ use_cache: Optional[bool] = True
+ ) -> None:
+ """A helper function that does the actual message cleaning."""
+ self._validate_input(traverse, channels, bots_only, users, first_limit, second_limit)
+
+ # Are we already performing a clean?
+ if self.cleaning:
+ await self._send_expiring_message(
+ ctx, ":x: Please wait for the currently ongoing clean operation to complete."
+ )
+ return
+ self.cleaning = True
+
+ # Default to using the invoking context's channel or the channel of the message limit(s).
+ if not channels:
+ # Input was validated - if first_limit is a message, second_limit won't point at a different channel.
+ if isinstance(first_limit, Message):
+ channels = [first_limit.channel]
+ elif isinstance(second_limit, Message):
+ channels = [second_limit.channel]
+ else:
+ channels = [ctx.channel]
+
+ if isinstance(first_limit, Message):
+ first_limit = first_limit.created_at
+ if isinstance(second_limit, Message):
+ second_limit = second_limit.created_at
+ if first_limit and second_limit:
+ first_limit, second_limit = sorted([first_limit, second_limit])
+
+ # Needs to be called after standardizing the input.
+ predicate = self._build_predicate(bots_only, users, regex, first_limit, second_limit)
+
+ # Delete the invocation first
+ await self._delete_invocation(ctx)
+
+ if channels == "*" and use_cache:
+ message_mappings, message_ids = self._get_messages_from_cache(traverse=traverse, to_delete=predicate)
+ else:
+ deletion_channels = channels
+ if channels == "*":
+ deletion_channels = [channel for channel in ctx.guild.channels if isinstance(channel, TextChannel)]
+ message_mappings, message_ids = await self._get_messages_from_channels(
+ traverse=traverse,
+ channels=deletion_channels,
+ to_delete=predicate,
+ before=second_limit,
+ after=first_limit # Remember first is the earlier datetime.
+ )
+
+ if not self.cleaning:
+ # Means that the cleaning was canceled
+ return
+
+ # Now let's delete the actual messages with purge.
+ self.mod_log.ignore(Event.message_delete, *message_ids)
+ deleted_messages = await self._delete_found(message_mappings)
+ self.cleaning = False
+
+ logged = await self._modlog_cleaned_messages(deleted_messages, channels, ctx)
+
+ if logged and is_mod_channel(ctx.channel):
+ with suppress(NotFound): # Can happen if the invoker deleted their own messages.
+ await ctx.message.add_reaction(Emojis.check_mark)
+
+ # region: Commands
+
+ @group(invoke_without_command=True, name="clean", aliases=["clear", "purge"])
+ async def clean_group(
+ self,
+ ctx: Context,
+ users: Greedy[User] = None,
+ traverse: Optional[int] = None,
+ first_limit: Optional[CleanLimit] = None,
+ second_limit: Optional[CleanLimit] = None,
+ use_cache: Optional[bool] = None,
+ bots_only: Optional[bool] = False,
+ regex: Optional[Regex] = None,
+ *,
+ channels: CleanChannels = None # "Optional" with discord.py silently ignores incorrect input.
+ ) -> None:
+ """
+ Commands for cleaning messages in channels.
+
+ If arguments are provided, will act as a master command from which all subcommands can be derived.
+
+ \u2003• `users`: A series of user mentions, ID's, or names.
+ \u2003• `traverse`: The number of messages to look at in each channel. If using the cache, will look at the
+ first `traverse` messages in the cache.
+ \u2003• `first_limit` and `second_limit`: A message, a duration delta, or an ISO datetime.
+ If a message is provided, cleaning will happen in that channel, and channels cannot be provided.
+ If a limit is provided, multiple channels cannot be provided.
+ If only one of them is provided, acts as `clean until`. If both are provided, acts as `clean between`.
+ \u2003• `use_cache`: Whether to use the message cache.
+ If not provided, will default to False unless an asterisk is used for the channels.
+ \u2003• `bots_only`: Whether to delete only bots. If specified, users cannot be specified.
+ \u2003• `regex`: A regex pattern the message must contain to be deleted.
+ The pattern must be provided enclosed in backticks.
+ If the pattern contains spaces, it still needs to be enclosed in double quotes on top of that.
+ \u2003• `channels`: A series of channels to delete in, or an asterisk to delete from all channels.
+ """
+ if not any([traverse, users, first_limit, second_limit, regex, channels]):
+ await ctx.send_help(ctx.command)
+ return
+
+ if not traverse:
+ if first_limit:
+ traverse = CleanMessages.message_limit
+ else:
+ traverse = DEFAULT_TRAVERSE
+ if use_cache is None:
+ use_cache = channels == "*"
+
+ await self._clean_messages(
+ ctx, traverse, channels, bots_only, users, regex, first_limit, second_limit, use_cache
+ )
+
+ @clean_group.command(name="user", aliases=["users"])
+ async def clean_user(
+ self,
+ ctx: Context,
+ user: User,
+ traverse: Optional[int] = DEFAULT_TRAVERSE,
+ use_cache: Optional[bool] = True,
+ *,
+ channels: CleanChannels = None
+ ) -> None:
+ """Delete messages posted by the provided user, stop cleaning after traversing `traverse` messages."""
+ await self._clean_messages(ctx, traverse, users=[user], channels=channels, use_cache=use_cache)
+
+ @clean_group.command(name="all", aliases=["everything"])
+ async def clean_all(
+ self,
+ ctx: Context,
+ traverse: Optional[int] = DEFAULT_TRAVERSE,
+ use_cache: Optional[bool] = True,
+ *,
+ channels: CleanChannels = None
+ ) -> None:
+ """Delete all messages, regardless of poster, stop cleaning after traversing `traverse` messages."""
+ await self._clean_messages(ctx, traverse, channels=channels, use_cache=use_cache)
+
+ @clean_group.command(name="bots", aliases=["bot"])
+ async def clean_bots(
+ self,
+ ctx: Context,
+ traverse: Optional[int] = DEFAULT_TRAVERSE,
+ use_cache: Optional[bool] = True,
+ *,
+ channels: CleanChannels = None
+ ) -> None:
+ """Delete all messages posted by a bot, stop cleaning after traversing `traverse` messages."""
+ await self._clean_messages(ctx, traverse, bots_only=True, channels=channels, use_cache=use_cache)
+
+ @clean_group.command(name="regex", aliases=["word", "expression", "pattern"])
+ async def clean_regex(
+ self,
+ ctx: Context,
+ regex: Regex,
+ traverse: Optional[int] = DEFAULT_TRAVERSE,
+ use_cache: Optional[bool] = True,
+ *,
+ channels: CleanChannels = None
+ ) -> None:
+ """
+ Delete all messages that match a certain regex, stop cleaning after traversing `traverse` messages.
+
+ The pattern must be provided enclosed in backticks.
+ If the pattern contains spaces, it still needs to be enclosed in double quotes on top of that.
+ For example: `[0-9]`
+ """
+ await self._clean_messages(ctx, traverse, regex=regex, channels=channels, use_cache=use_cache)
+
+ @clean_group.command(name="until")
+ async def clean_until(
+ self,
+ ctx: Context,
+ until: CleanLimit,
+ channel: TextChannel = None
+ ) -> None:
+ """
+ Delete all messages until a certain limit.
+
+ A limit can be either a message, and ISO date-time string, or a time delta.
+ If a message is specified, `channel` cannot be specified.
+ """
+ await self._clean_messages(
+ ctx,
+ CleanMessages.message_limit,
+ channels=[channel] if channel else None,
+ first_limit=until,
+ )
+
+ @clean_group.command(name="between", aliases=["after-until", "from-to"])
+ async def clean_between(
+ self,
+ ctx: Context,
+ first_limit: CleanLimit,
+ second_limit: CleanLimit,
+ channel: TextChannel = None
+ ) -> None:
+ """
+ Delete all messages within range.
+
+ The range is specified through two limits.
+ A limit can be either a message, and ISO date-time string, or a time delta.
+
+ If two messages are specified, they both must be in the same channel.
+ If a message is specified, `channel` cannot be specified.
+ """
+ await self._clean_messages(
+ ctx,
+ CleanMessages.message_limit,
+ channels=[channel] if channel else None,
+ first_limit=first_limit,
+ second_limit=second_limit,
+ )
+
+ @clean_group.command(name="stop", aliases=["cancel", "abort"])
+ async def clean_cancel(self, ctx: Context) -> None:
+ """If there is an ongoing cleaning process, attempt to immediately cancel it."""
+ if not self.cleaning:
+ message = ":question: There's no cleaning going on."
+ else:
+ self.cleaning = False
+ message = f"{Emojis.check_mark} Clean interrupted."
+
+ await self._send_expiring_message(ctx, message)
+ await self._delete_invocation(ctx)
+
+ # endregion
+
+ async def cog_check(self, ctx: Context) -> bool:
+ """Only allow moderators to invoke the commands in this cog."""
+ return await has_any_role(*MODERATION_ROLES).predicate(ctx)
+
+ async def cog_command_error(self, ctx: Context, error: Exception) -> None:
+ """Safely end the cleaning operation on unexpected errors."""
+ self.cleaning = False
+
+
+def setup(bot: Bot) -> None:
+ """Load the Clean cog."""
+ bot.add_cog(Clean(bot))
diff --git a/bot/exts/utils/clean.py b/bot/exts/utils/clean.py
deleted file mode 100644
index a2e2d3eed..000000000
--- a/bot/exts/utils/clean.py
+++ /dev/null
@@ -1,274 +0,0 @@
-import random
-import re
-from typing import Iterable, Optional
-
-from discord import Colour, Embed, Message, TextChannel, User, errors
-from discord.ext import commands
-from discord.ext.commands import Cog, Context, group, has_any_role
-
-from bot.bot import Bot
-from bot.constants import Channels, CleanMessages, Colours, Event, Icons, MODERATION_ROLES, NEGATIVE_REPLIES
-from bot.exts.moderation.modlog import ModLog
-from bot.log import get_logger
-
-log = get_logger(__name__)
-
-
-class Clean(Cog):
- """
- A cog that allows messages to be deleted in bulk, while applying various filters.
-
- You can delete messages sent by a specific user, messages sent by bots, all messages, or messages that match a
- specific regular expression.
-
- The deleted messages are saved and uploaded to the database via an API endpoint, and a URL is returned which can be
- used to view the messages in the Discord dark theme style.
- """
-
- def __init__(self, bot: Bot):
- self.bot = bot
- self.cleaning = False
-
- @property
- def mod_log(self) -> ModLog:
- """Get currently loaded ModLog cog instance."""
- return self.bot.get_cog("ModLog")
-
- async def _clean_messages(
- self,
- amount: int,
- ctx: Context,
- channels: Iterable[TextChannel],
- bots_only: bool = False,
- user: User = None,
- regex: Optional[str] = None,
- until_message: Optional[Message] = None,
- ) -> None:
- """A helper function that does the actual message cleaning."""
- def predicate_bots_only(message: Message) -> bool:
- """Return True if the message was sent by a bot."""
- return message.author.bot
-
- def predicate_specific_user(message: Message) -> bool:
- """Return True if the message was sent by the user provided in the _clean_messages call."""
- return message.author == user
-
- def predicate_regex(message: Message) -> bool:
- """Check if the regex provided in _clean_messages matches the message content or any embed attributes."""
- content = [message.content]
-
- # Add the content for all embed attributes
- for embed in message.embeds:
- content.append(embed.title)
- content.append(embed.description)
- content.append(embed.footer.text)
- content.append(embed.author.name)
- for field in embed.fields:
- content.append(field.name)
- content.append(field.value)
-
- # Get rid of empty attributes and turn it into a string
- content = [attr for attr in content if attr]
- content = "\n".join(content)
-
- # Now let's see if there's a regex match
- if not content:
- return False
- else:
- return bool(re.search(regex.lower(), content.lower()))
-
- # Is this an acceptable amount of messages to clean?
- if amount > CleanMessages.message_limit:
- embed = Embed(
- color=Colour(Colours.soft_red),
- title=random.choice(NEGATIVE_REPLIES),
- description=f"You cannot clean more than {CleanMessages.message_limit} messages."
- )
- await ctx.send(embed=embed)
- return
-
- # Are we already performing a clean?
- if self.cleaning:
- embed = Embed(
- color=Colour(Colours.soft_red),
- title=random.choice(NEGATIVE_REPLIES),
- description="Please wait for the currently ongoing clean operation to complete."
- )
- await ctx.send(embed=embed)
- return
-
- # Set up the correct predicate
- if bots_only:
- predicate = predicate_bots_only # Delete messages from bots
- elif user:
- predicate = predicate_specific_user # Delete messages from specific user
- elif regex:
- predicate = predicate_regex # Delete messages that match regex
- else:
- predicate = lambda *_: True # Delete all messages
-
- # Default to using the invoking context's channel
- if not channels:
- channels = [ctx.channel]
-
- # Delete the invocation first
- self.mod_log.ignore(Event.message_delete, ctx.message.id)
- try:
- await ctx.message.delete()
- except errors.NotFound:
- # Invocation message has already been deleted
- log.info("Tried to delete invocation message, but it was already deleted.")
-
- messages = []
- message_ids = []
- self.cleaning = True
-
- # Find the IDs of the messages to delete. IDs are needed in order to ignore mod log events.
- for channel in channels:
- async for message in channel.history(limit=amount):
-
- # If at any point the cancel command is invoked, we should stop.
- if not self.cleaning:
- return
-
- # If we are looking for specific message.
- if until_message:
-
- # we could use ID's here however in case if the message we are looking for gets deleted,
- # we won't have a way to figure that out thus checking for datetime should be more reliable
- if message.created_at < until_message.created_at:
- # means we have found the message until which we were supposed to be deleting.
- break
-
- # Since we will be using `delete_messages` method of a TextChannel and we need message objects to
- # use it as well as to send logs we will start appending messages here instead adding them from
- # purge.
- messages.append(message)
-
- # If the message passes predicate, let's save it.
- if predicate is None or predicate(message):
- message_ids.append(message.id)
-
- self.cleaning = False
-
- # Now let's delete the actual messages with purge.
- self.mod_log.ignore(Event.message_delete, *message_ids)
- for channel in channels:
- if until_message:
- for i in range(0, len(messages), 100):
- # while purge automatically handles the amount of messages
- # delete_messages only allows for up to 100 messages at once
- # thus we need to paginate the amount to always be <= 100
- await channel.delete_messages(messages[i:i + 100])
- else:
- messages += await channel.purge(limit=amount, check=predicate)
-
- # Reverse the list to restore chronological order
- if messages:
- messages = reversed(messages)
- log_url = await self.mod_log.upload_log(messages, ctx.author.id)
- else:
- # Can't build an embed, nothing to clean!
- embed = Embed(
- color=Colour(Colours.soft_red),
- description="No matching messages could be found."
- )
- await ctx.send(embed=embed, delete_after=10)
- return
-
- # Build the embed and send it
- target_channels = ", ".join(channel.mention for channel in channels)
-
- message = (
- f"**{len(message_ids)}** messages deleted in {target_channels} by "
- f"{ctx.author.mention}\n\n"
- f"A log of the deleted messages can be found [here]({log_url})."
- )
-
- await self.mod_log.send_log_message(
- icon_url=Icons.message_bulk_delete,
- colour=Colour(Colours.soft_red),
- title="Bulk message delete",
- text=message,
- channel_id=Channels.mod_log,
- )
-
- @group(invoke_without_command=True, name="clean", aliases=["clear", "purge"])
- @has_any_role(*MODERATION_ROLES)
- async def clean_group(self, ctx: Context) -> None:
- """Commands for cleaning messages in channels."""
- await ctx.send_help(ctx.command)
-
- @clean_group.command(name="user", aliases=["users"])
- @has_any_role(*MODERATION_ROLES)
- async def clean_user(
- self,
- ctx: Context,
- user: User,
- amount: Optional[int] = 10,
- channels: commands.Greedy[TextChannel] = None
- ) -> None:
- """Delete messages posted by the provided user, stop cleaning after traversing `amount` messages."""
- await self._clean_messages(amount, ctx, user=user, channels=channels)
-
- @clean_group.command(name="all", aliases=["everything"])
- @has_any_role(*MODERATION_ROLES)
- async def clean_all(
- self,
- ctx: Context,
- amount: Optional[int] = 10,
- channels: commands.Greedy[TextChannel] = None
- ) -> None:
- """Delete all messages, regardless of poster, stop cleaning after traversing `amount` messages."""
- await self._clean_messages(amount, ctx, channels=channels)
-
- @clean_group.command(name="bots", aliases=["bot"])
- @has_any_role(*MODERATION_ROLES)
- async def clean_bots(
- self,
- ctx: Context,
- amount: Optional[int] = 10,
- channels: commands.Greedy[TextChannel] = None
- ) -> None:
- """Delete all messages posted by a bot, stop cleaning after traversing `amount` messages."""
- await self._clean_messages(amount, ctx, bots_only=True, channels=channels)
-
- @clean_group.command(name="regex", aliases=["word", "expression"])
- @has_any_role(*MODERATION_ROLES)
- async def clean_regex(
- self,
- ctx: Context,
- regex: str,
- amount: Optional[int] = 10,
- channels: commands.Greedy[TextChannel] = None
- ) -> None:
- """Delete all messages that match a certain regex, stop cleaning after traversing `amount` messages."""
- await self._clean_messages(amount, ctx, regex=regex, channels=channels)
-
- @clean_group.command(name="message", aliases=["messages"])
- @has_any_role(*MODERATION_ROLES)
- async def clean_message(self, ctx: Context, message: Message) -> None:
- """Delete all messages until certain message, stop cleaning after hitting the `message`."""
- await self._clean_messages(
- CleanMessages.message_limit,
- ctx,
- channels=[message.channel],
- until_message=message
- )
-
- @clean_group.command(name="stop", aliases=["cancel", "abort"])
- @has_any_role(*MODERATION_ROLES)
- async def clean_cancel(self, ctx: Context) -> None:
- """If there is an ongoing cleaning process, attempt to immediately cancel it."""
- self.cleaning = False
-
- embed = Embed(
- color=Colour.blurple(),
- description="Clean interrupted."
- )
- await ctx.send(embed=embed, delete_after=10)
-
-
-def setup(bot: Bot) -> None:
- """Load the Clean cog."""
- bot.add_cog(Clean(bot))