diff options
| -rw-r--r-- | bot/converters.py | 19 | ||||
| -rw-r--r-- | bot/exts/moderation/clean.py | 595 | ||||
| -rw-r--r-- | bot/exts/utils/clean.py | 274 | 
3 files changed, 614 insertions, 274 deletions
| diff --git a/bot/converters.py b/bot/converters.py index f50acb9c6..0984fa0a3 100644 --- a/bot/converters.py +++ b/bot/converters.py @@ -395,6 +395,24 @@ class Duration(DurationDelta):              raise BadArgument(f"`{duration}` results in a datetime outside the supported range.") +class Age(DurationDelta): +    """Convert duration strings into UTC datetime.datetime objects.""" + +    async def convert(self, ctx: Context, duration: str) -> datetime: +        """ +        Converts a `duration` string to a datetime object that's `duration` in the past. + +        The converter supports the same symbols for each unit of time as its parent class. +        """ +        delta = await super().convert(ctx, duration) +        now = datetime.now(timezone.utc) + +        try: +            return now - delta +        except (ValueError, OverflowError): +            raise BadArgument(f"`{duration}` results in a datetime outside the supported range.") + +  class OffTopicName(Converter):      """A converter that ensures an added off-topic name is valid.""" @@ -601,6 +619,7 @@ if t.TYPE_CHECKING:      SourceConverter = SourceType  # noqa: F811      DurationDelta = relativedelta  # noqa: F811      Duration = datetime  # noqa: F811 +    Age = datetime  # noqa: F811      OffTopicName = str  # noqa: F811      ISODateTime = datetime  # noqa: F811      HushDurationConverter = int  # noqa: F811 diff --git a/bot/exts/moderation/clean.py b/bot/exts/moderation/clean.py new file mode 100644 index 000000000..94494b983 --- /dev/null +++ b/bot/exts/moderation/clean.py @@ -0,0 +1,595 @@ +import contextlib +import logging +import re +import time +from collections import defaultdict +from contextlib import suppress +from datetime import datetime +from itertools import islice +from typing import Any, Callable, Iterable, Literal, Optional, TYPE_CHECKING, Union + +from discord import Colour, Message, NotFound, TextChannel, User, errors +from discord.ext.commands import Cog, Context, Converter, Greedy, group, has_any_role +from discord.ext.commands.converter import TextChannelConverter +from discord.ext.commands.errors import BadArgument + +from bot.bot import Bot +from bot.constants import Channels, CleanMessages, Colours, Emojis, Event, Icons, MODERATION_ROLES +from bot.converters import Age, ISODateTime +from bot.exts.moderation.modlog import ModLog +from bot.utils.channel import is_mod_channel + +log = logging.getLogger(__name__) + +# Default number of messages to look at in each channel. +DEFAULT_TRAVERSE = 10 +# Number of seconds before command invocations and responses are deleted in non-moderation channels. +MESSAGE_DELETE_DELAY = 5 + +# Type alias for checks for whether a message should be deleted. +Predicate = Callable[[Message], bool] +# Type alias for message lookup ranges. +CleanLimit = Union[Message, Age, ISODateTime] + + +class CleanChannels(Converter): +    """A converter that turns the given string to a list of channels to clean, or the literal `*` for all channels.""" + +    _channel_converter = TextChannelConverter() + +    async def convert(self, ctx: Context, argument: str) -> Union[Literal["*"], list[TextChannel]]: +        """Converts a string to a list of channels to clean, or the literal `*` for all channels.""" +        if argument == "*": +            return "*" +        return [await self._channel_converter.convert(ctx, channel) for channel in argument.split()] + + +class Regex(Converter): +    """A converter that takes a string in the form `.+` and returns the contents of the inline code compiled.""" + +    async def convert(self, ctx: Context, argument: str) -> re.Pattern: +        """Strips the backticks from the string and compiles it to a regex pattern.""" +        match = re.fullmatch(r"`(.+?)`", argument) +        if not match: +            raise BadArgument("Regex pattern missing wrapping backticks") +        try: +            return re.compile(match.group(1), re.IGNORECASE + re.DOTALL) +        except re.error as e: +            raise BadArgument(f"Regex error: {e.msg}") + + +if TYPE_CHECKING:  # Used to allow method resolution in IDEs like in converters.py. +    CleanChannels = Union[Literal["*"], list[TextChannel]]  # noqa: F811 +    Regex = re.Pattern  # noqa: F811 + + +class Clean(Cog): +    """ +    A cog that allows messages to be deleted in bulk while applying various filters. + +    You can delete messages sent by a specific user, messages sent by bots, all messages, or messages that match a +    specific regular expression. + +    The deleted messages are saved and uploaded to the database via an API endpoint, and a URL is returned which can be +    used to view the messages in the Discord dark theme style. +    """ + +    def __init__(self, bot: Bot): +        self.bot = bot +        self.cleaning = False + +    @property +    def mod_log(self) -> ModLog: +        """Get currently loaded ModLog cog instance.""" +        return self.bot.get_cog("ModLog") + +    # region: Helper functions + +    @staticmethod +    def _validate_input( +            traverse: int, +            channels: Optional[CleanChannels], +            bots_only: bool, +            users: Optional[list[User]], +            first_limit: Optional[CleanLimit], +            second_limit: Optional[CleanLimit], +    ) -> None: +        """Raise errors if an argument value or a combination of values is invalid.""" +        # Is this an acceptable amount of messages to traverse? +        if traverse > CleanMessages.message_limit: +            raise BadArgument(f"Cannot traverse more than {CleanMessages.message_limit} messages.") + +        if (isinstance(first_limit, Message) or isinstance(second_limit, Message)) and channels: +            raise BadArgument("Both a message limit and channels specified.") + +        if isinstance(first_limit, Message) and isinstance(second_limit, Message): +            # Messages are not in same channel. +            if first_limit.channel != second_limit.channel: +                raise BadArgument("Message limits are in different channels.") + +        if users and bots_only: +            raise BadArgument("Marked as bots only, but users were specified.") + +        # This is an implementation error rather than user error. +        if second_limit and not first_limit: +            raise ValueError("Second limit specified without the first.") + +    @staticmethod +    async def _send_expiring_message(ctx: Context, content: str) -> None: +        """Send `content` to the context channel. Automatically delete if it's not a mod channel.""" +        delete_after = None if is_mod_channel(ctx.channel) else MESSAGE_DELETE_DELAY +        await ctx.send(content, delete_after=delete_after) + +    @staticmethod +    def _build_predicate( +        bots_only: bool = False, +        users: Optional[list[User]] = None, +        regex: Optional[re.Pattern] = None, +        first_limit: Optional[datetime] = None, +        second_limit: Optional[datetime] = None, +    ) -> Predicate: +        """Return the predicate that decides whether to delete a given message.""" +        def predicate_bots_only(message: Message) -> bool: +            """Return True if the message was sent by a bot.""" +            return message.author.bot + +        def predicate_specific_users(message: Message) -> bool: +            """Return True if the message was sent by the user provided in the _clean_messages call.""" +            return message.author in users + +        def predicate_regex(message: Message) -> bool: +            """Check if the regex provided in _clean_messages matches the message content or any embed attributes.""" +            content = [message.content] + +            # Add the content for all embed attributes +            for embed in message.embeds: +                content.append(embed.title) +                content.append(embed.description) +                content.append(embed.footer.text) +                content.append(embed.author.name) +                for field in embed.fields: +                    content.append(field.name) +                    content.append(field.value) + +            # Get rid of empty attributes and turn it into a string +            content = "\n".join(attr for attr in content if attr) + +            # Now let's see if there's a regex match +            return bool(regex.search(content)) + +        def predicate_range(message: Message) -> bool: +            """Check if the message age is between the two limits.""" +            return first_limit <= message.created_at <= second_limit + +        def predicate_after(message: Message) -> bool: +            """Check if the message is older than the first limit.""" +            return message.created_at >= first_limit + +        predicates = [] +        # Set up the correct predicate +        if bots_only: +            predicates.append(predicate_bots_only)  # Delete messages from bots +        if users: +            predicates.append(predicate_specific_users)  # Delete messages from specific user +        if regex: +            predicates.append(predicate_regex)  # Delete messages that match regex +        # Add up to one of the following: +        if second_limit: +            predicates.append(predicate_range)  # Delete messages in the specified age range +        elif first_limit: +            predicates.append(predicate_after)  # Delete messages older than specific message + +        if not predicates: +            return lambda m: True +        if len(predicates) == 1: +            return predicates[0] +        return lambda m: all(pred(m) for pred in predicates) + +    async def _delete_invocation(self, ctx: Context) -> None: +        """Delete the command invocation if it's not in a mod channel.""" +        if not is_mod_channel(ctx.channel): +            self.mod_log.ignore(Event.message_delete, ctx.message.id) +            try: +                await ctx.message.delete() +            except errors.NotFound: +                # Invocation message has already been deleted +                log.info("Tried to delete invocation message, but it was already deleted.") + +    def _get_messages_from_cache(self, traverse: int, to_delete: Predicate) -> tuple[defaultdict[Any, list], list[int]]: +        """Helper function for getting messages from the cache.""" +        message_mappings = defaultdict(list) +        message_ids = [] +        for message in islice(self.bot.cached_messages, traverse): +            if not self.cleaning: +                # Cleaning was canceled +                return message_mappings, message_ids + +            if to_delete(message): +                message_mappings[message.channel].append(message) +                message_ids.append(message.id) + +        return message_mappings, message_ids + +    async def _get_messages_from_channels( +        self, +        traverse: int, +        channels: Iterable[TextChannel], +        to_delete: Predicate, +        before: Optional[datetime] = None, +        after: Optional[datetime] = None +    ) -> tuple[defaultdict[Any, list], list]: +        message_mappings = defaultdict(list) +        message_ids = [] + +        for channel in channels: +            async for message in channel.history(limit=traverse, before=before, after=after): + +                if not self.cleaning: +                    # Cleaning was canceled, return empty containers. +                    return defaultdict(list), [] + +                if to_delete(message): +                    message_mappings[message.channel].append(message) +                    message_ids.append(message.id) + +        return message_mappings, message_ids + +    @staticmethod +    def is_older_than_14d(message: Message) -> bool: +        """ +        Precisely checks if message is older than 14 days, bulk deletion limit. + +        Inspired by how purge works internally. +        Comparison on message age could possibly be less accurate which in turn would resort in problems +        with message deletion if said messages are very close to the 14d mark. +        """ +        two_weeks_old_snowflake = int((time.time() - 14 * 24 * 60 * 60) * 1000.0 - 1420070400000) << 22 +        return message.id < two_weeks_old_snowflake + +    async def _delete_messages_individually(self, messages: list[Message]) -> list[Message]: +        """Delete each message in the list unless cleaning is cancelled. Return the deleted messages.""" +        deleted = [] +        for message in messages: +            # Ensure that deletion was not canceled +            if not self.cleaning: +                return deleted +            with contextlib.suppress(NotFound):  # Message doesn't exist or was already deleted +                await message.delete() +                deleted.append(message) +        return deleted + +    async def _delete_found(self, message_mappings: dict[TextChannel, list[Message]]) -> list[Message]: +        """ +        Delete the detected messages. + +        Deletion is made in bulk per channel for messages less than 14d old. +        The function returns the deleted messages. +        If cleaning was cancelled in the middle, return messages already deleted. +        """ +        deleted = [] +        for channel, messages in message_mappings.items(): +            to_delete = [] + +            delete_old = False +            for current_index, message in enumerate(messages):  # noqa: B007 +                if not self.cleaning: +                    # Means that the cleaning was canceled +                    return deleted + +                if self.is_older_than_14d(message): +                    # Further messages are too old to be deleted in bulk +                    delete_old = True +                    break + +                to_delete.append(message) + +                if len(to_delete) == 100: +                    # Only up to 100 messages can be deleted in a bulk +                    await channel.delete_messages(to_delete) +                    deleted.extend(to_delete) +                    to_delete.clear() + +            if not self.cleaning: +                return deleted +            if len(to_delete) > 0: +                # Deleting any leftover messages if there are any +                await channel.delete_messages(to_delete) +                deleted.extend(to_delete) + +            if not self.cleaning: +                return deleted +            if delete_old: +                old_deleted = await self._delete_messages_individually(messages[current_index:]) +                deleted.extend(old_deleted) + +        return deleted + +    async def _modlog_cleaned_messages(self, messages: list[Message], channels: CleanChannels, ctx: Context) -> bool: +        """Log the deleted messages to the modlog. Return True if logging was successful.""" +        if not messages: +            # Can't build an embed, nothing to clean! +            await self._send_expiring_message(ctx, ":x: No matching messages could be found.") +            return False + +        # Reverse the list to have reverse chronological order +        log_messages = reversed(messages) +        log_url = await self.mod_log.upload_log(log_messages, ctx.author.id) + +        # Build the embed and send it +        if channels == "*": +            target_channels = "all channels" +        else: +            target_channels = ", ".join(channel.mention for channel in channels) + +        message = ( +            f"**{len(messages)}** messages deleted in {target_channels} by " +            f"{ctx.author.mention}\n\n" +            f"A log of the deleted messages can be found [here]({log_url})." +        ) + +        await self.mod_log.send_log_message( +            icon_url=Icons.message_bulk_delete, +            colour=Colour(Colours.soft_red), +            title="Bulk message delete", +            text=message, +            channel_id=Channels.mod_log, +        ) + +        return True + +    # endregion + +    async def _clean_messages( +        self, +        ctx: Context, +        traverse: int, +        channels: Optional[CleanChannels], +        bots_only: bool = False, +        users: Optional[list[User]] = None, +        regex: Optional[re.Pattern] = None, +        first_limit: Optional[CleanLimit] = None, +        second_limit: Optional[CleanLimit] = None, +        use_cache: Optional[bool] = True +    ) -> None: +        """A helper function that does the actual message cleaning.""" +        self._validate_input(traverse, channels, bots_only, users, first_limit, second_limit) + +        # Are we already performing a clean? +        if self.cleaning: +            await self._send_expiring_message( +                ctx, ":x: Please wait for the currently ongoing clean operation to complete." +            ) +            return +        self.cleaning = True + +        # Default to using the invoking context's channel or the channel of the message limit(s). +        if not channels: +            # Input was validated - if first_limit is a message, second_limit won't point at a different channel. +            if isinstance(first_limit, Message): +                channels = [first_limit.channel] +            elif isinstance(second_limit, Message): +                channels = [second_limit.channel] +            else: +                channels = [ctx.channel] + +        if isinstance(first_limit, Message): +            first_limit = first_limit.created_at +        if isinstance(second_limit, Message): +            second_limit = second_limit.created_at +        if first_limit and second_limit: +            first_limit, second_limit = sorted([first_limit, second_limit]) + +        # Needs to be called after standardizing the input. +        predicate = self._build_predicate(bots_only, users, regex, first_limit, second_limit) + +        # Delete the invocation first +        await self._delete_invocation(ctx) + +        if channels == "*" and use_cache: +            message_mappings, message_ids = self._get_messages_from_cache(traverse=traverse, to_delete=predicate) +        else: +            deletion_channels = channels +            if channels == "*": +                deletion_channels = [channel for channel in ctx.guild.channels if isinstance(channel, TextChannel)] +            message_mappings, message_ids = await self._get_messages_from_channels( +                traverse=traverse, +                channels=deletion_channels, +                to_delete=predicate, +                before=second_limit, +                after=first_limit  # Remember first is the earlier datetime. +            ) + +        if not self.cleaning: +            # Means that the cleaning was canceled +            return + +        # Now let's delete the actual messages with purge. +        self.mod_log.ignore(Event.message_delete, *message_ids) +        deleted_messages = await self._delete_found(message_mappings) +        self.cleaning = False + +        logged = await self._modlog_cleaned_messages(deleted_messages, channels, ctx) + +        if logged and is_mod_channel(ctx.channel): +            with suppress(NotFound):  # Can happen if the invoker deleted their own messages. +                await ctx.message.add_reaction(Emojis.check_mark) + +    # region: Commands + +    @group(invoke_without_command=True, name="clean", aliases=["clear", "purge"]) +    async def clean_group( +        self, +        ctx: Context, +        users: Greedy[User] = None, +        traverse: Optional[int] = None, +        first_limit: Optional[CleanLimit] = None, +        second_limit: Optional[CleanLimit] = None, +        use_cache: Optional[bool] = None, +        bots_only: Optional[bool] = False, +        regex: Optional[Regex] = None, +        *, +        channels: CleanChannels = None  # "Optional" with discord.py silently ignores incorrect input. +    ) -> None: +        """ +        Commands for cleaning messages in channels. + +        If arguments are provided, will act as a master command from which all subcommands can be derived. + +        \u2003• `users`: A series of user mentions, ID's, or names. +        \u2003• `traverse`: The number of messages to look at in each channel. If using the cache, will look at the +        first `traverse` messages in the cache. +        \u2003• `first_limit` and `second_limit`: A message, a duration delta, or an ISO datetime. +        If a message is provided, cleaning will happen in that channel, and channels cannot be provided. +        If a limit is provided, multiple channels cannot be provided. +        If only one of them is provided, acts as `clean until`. If both are provided, acts as `clean between`. +        \u2003• `use_cache`: Whether to use the message cache. +        If not provided, will default to False unless an asterisk is used for the channels. +        \u2003• `bots_only`: Whether to delete only bots. If specified, users cannot be specified. +        \u2003• `regex`: A regex pattern the message must contain to be deleted. +        The pattern must be provided enclosed in backticks. +        If the pattern contains spaces, it still needs to be enclosed in double quotes on top of that. +        \u2003• `channels`: A series of channels to delete in, or an asterisk to delete from all channels. +        """ +        if not any([traverse, users, first_limit, second_limit, regex, channels]): +            await ctx.send_help(ctx.command) +            return + +        if not traverse: +            if first_limit: +                traverse = CleanMessages.message_limit +            else: +                traverse = DEFAULT_TRAVERSE +        if use_cache is None: +            use_cache = channels == "*" + +        await self._clean_messages( +            ctx, traverse, channels, bots_only, users, regex, first_limit, second_limit, use_cache +        ) + +    @clean_group.command(name="user", aliases=["users"]) +    async def clean_user( +        self, +        ctx: Context, +        user: User, +        traverse: Optional[int] = DEFAULT_TRAVERSE, +        use_cache: Optional[bool] = True, +        *, +        channels: CleanChannels = None +    ) -> None: +        """Delete messages posted by the provided user, stop cleaning after traversing `traverse` messages.""" +        await self._clean_messages(ctx, traverse, users=[user], channels=channels, use_cache=use_cache) + +    @clean_group.command(name="all", aliases=["everything"]) +    async def clean_all( +        self, +        ctx: Context, +        traverse: Optional[int] = DEFAULT_TRAVERSE, +        use_cache: Optional[bool] = True, +        *, +        channels: CleanChannels = None +    ) -> None: +        """Delete all messages, regardless of poster, stop cleaning after traversing `traverse` messages.""" +        await self._clean_messages(ctx, traverse, channels=channels, use_cache=use_cache) + +    @clean_group.command(name="bots", aliases=["bot"]) +    async def clean_bots( +        self, +        ctx: Context, +        traverse: Optional[int] = DEFAULT_TRAVERSE, +        use_cache: Optional[bool] = True, +        *, +        channels: CleanChannels = None +    ) -> None: +        """Delete all messages posted by a bot, stop cleaning after traversing `traverse` messages.""" +        await self._clean_messages(ctx, traverse, bots_only=True, channels=channels, use_cache=use_cache) + +    @clean_group.command(name="regex", aliases=["word", "expression", "pattern"]) +    async def clean_regex( +        self, +        ctx: Context, +        regex: Regex, +        traverse: Optional[int] = DEFAULT_TRAVERSE, +        use_cache: Optional[bool] = True, +        *, +        channels: CleanChannels = None +    ) -> None: +        """ +        Delete all messages that match a certain regex, stop cleaning after traversing `traverse` messages. + +        The pattern must be provided enclosed in backticks. +        If the pattern contains spaces, it still needs to be enclosed in double quotes on top of that. +        For example: `[0-9]` +        """ +        await self._clean_messages(ctx, traverse, regex=regex, channels=channels, use_cache=use_cache) + +    @clean_group.command(name="until") +    async def clean_until( +        self, +        ctx: Context, +        until: CleanLimit, +        channel: TextChannel = None +    ) -> None: +        """ +        Delete all messages until a certain limit. + +        A limit can be either a message, and ISO date-time string, or a time delta. +        If a message is specified, `channel` cannot be specified. +        """ +        await self._clean_messages( +            ctx, +            CleanMessages.message_limit, +            channels=[channel] if channel else None, +            first_limit=until, +        ) + +    @clean_group.command(name="between", aliases=["after-until", "from-to"]) +    async def clean_between( +        self, +        ctx: Context, +        first_limit: CleanLimit, +        second_limit: CleanLimit, +        channel: TextChannel = None +    ) -> None: +        """ +        Delete all messages within range. + +        The range is specified through two limits. +        A limit can be either a message, and ISO date-time string, or a time delta. + +        If two messages are specified, they both must be in the same channel. +        If a message is specified, `channel` cannot be specified. +        """ +        await self._clean_messages( +            ctx, +            CleanMessages.message_limit, +            channels=[channel] if channel else None, +            first_limit=first_limit, +            second_limit=second_limit, +        ) + +    @clean_group.command(name="stop", aliases=["cancel", "abort"]) +    async def clean_cancel(self, ctx: Context) -> None: +        """If there is an ongoing cleaning process, attempt to immediately cancel it.""" +        if not self.cleaning: +            message = ":question: There's no cleaning going on." +        else: +            self.cleaning = False +            message = f"{Emojis.check_mark} Clean interrupted." + +        await self._send_expiring_message(ctx, message) +        await self._delete_invocation(ctx) + +    # endregion + +    async def cog_check(self, ctx: Context) -> bool: +        """Only allow moderators to invoke the commands in this cog.""" +        return await has_any_role(*MODERATION_ROLES).predicate(ctx) + +    async def cog_command_error(self, ctx: Context, error: Exception) -> None: +        """Safely end the cleaning operation on unexpected errors.""" +        self.cleaning = False + + +def setup(bot: Bot) -> None: +    """Load the Clean cog.""" +    bot.add_cog(Clean(bot)) diff --git a/bot/exts/utils/clean.py b/bot/exts/utils/clean.py deleted file mode 100644 index a2e2d3eed..000000000 --- a/bot/exts/utils/clean.py +++ /dev/null @@ -1,274 +0,0 @@ -import random -import re -from typing import Iterable, Optional - -from discord import Colour, Embed, Message, TextChannel, User, errors -from discord.ext import commands -from discord.ext.commands import Cog, Context, group, has_any_role - -from bot.bot import Bot -from bot.constants import Channels, CleanMessages, Colours, Event, Icons, MODERATION_ROLES, NEGATIVE_REPLIES -from bot.exts.moderation.modlog import ModLog -from bot.log import get_logger - -log = get_logger(__name__) - - -class Clean(Cog): -    """ -    A cog that allows messages to be deleted in bulk, while applying various filters. - -    You can delete messages sent by a specific user, messages sent by bots, all messages, or messages that match a -    specific regular expression. - -    The deleted messages are saved and uploaded to the database via an API endpoint, and a URL is returned which can be -    used to view the messages in the Discord dark theme style. -    """ - -    def __init__(self, bot: Bot): -        self.bot = bot -        self.cleaning = False - -    @property -    def mod_log(self) -> ModLog: -        """Get currently loaded ModLog cog instance.""" -        return self.bot.get_cog("ModLog") - -    async def _clean_messages( -        self, -        amount: int, -        ctx: Context, -        channels: Iterable[TextChannel], -        bots_only: bool = False, -        user: User = None, -        regex: Optional[str] = None, -        until_message: Optional[Message] = None, -    ) -> None: -        """A helper function that does the actual message cleaning.""" -        def predicate_bots_only(message: Message) -> bool: -            """Return True if the message was sent by a bot.""" -            return message.author.bot - -        def predicate_specific_user(message: Message) -> bool: -            """Return True if the message was sent by the user provided in the _clean_messages call.""" -            return message.author == user - -        def predicate_regex(message: Message) -> bool: -            """Check if the regex provided in _clean_messages matches the message content or any embed attributes.""" -            content = [message.content] - -            # Add the content for all embed attributes -            for embed in message.embeds: -                content.append(embed.title) -                content.append(embed.description) -                content.append(embed.footer.text) -                content.append(embed.author.name) -                for field in embed.fields: -                    content.append(field.name) -                    content.append(field.value) - -            # Get rid of empty attributes and turn it into a string -            content = [attr for attr in content if attr] -            content = "\n".join(content) - -            # Now let's see if there's a regex match -            if not content: -                return False -            else: -                return bool(re.search(regex.lower(), content.lower())) - -        # Is this an acceptable amount of messages to clean? -        if amount > CleanMessages.message_limit: -            embed = Embed( -                color=Colour(Colours.soft_red), -                title=random.choice(NEGATIVE_REPLIES), -                description=f"You cannot clean more than {CleanMessages.message_limit} messages." -            ) -            await ctx.send(embed=embed) -            return - -        # Are we already performing a clean? -        if self.cleaning: -            embed = Embed( -                color=Colour(Colours.soft_red), -                title=random.choice(NEGATIVE_REPLIES), -                description="Please wait for the currently ongoing clean operation to complete." -            ) -            await ctx.send(embed=embed) -            return - -        # Set up the correct predicate -        if bots_only: -            predicate = predicate_bots_only      # Delete messages from bots -        elif user: -            predicate = predicate_specific_user  # Delete messages from specific user -        elif regex: -            predicate = predicate_regex          # Delete messages that match regex -        else: -            predicate = lambda *_: True          # Delete all messages - -        # Default to using the invoking context's channel -        if not channels: -            channels = [ctx.channel] - -        # Delete the invocation first -        self.mod_log.ignore(Event.message_delete, ctx.message.id) -        try: -            await ctx.message.delete() -        except errors.NotFound: -            # Invocation message has already been deleted -            log.info("Tried to delete invocation message, but it was already deleted.") - -        messages = [] -        message_ids = [] -        self.cleaning = True - -        # Find the IDs of the messages to delete. IDs are needed in order to ignore mod log events. -        for channel in channels: -            async for message in channel.history(limit=amount): - -                # If at any point the cancel command is invoked, we should stop. -                if not self.cleaning: -                    return - -                # If we are looking for specific message. -                if until_message: - -                    # we could use ID's here however in case if the message we are looking for gets deleted, -                    # we won't have a way to figure that out thus checking for datetime should be more reliable -                    if message.created_at < until_message.created_at: -                        # means we have found the message until which we were supposed to be deleting. -                        break - -                    # Since we will be using `delete_messages` method of a TextChannel and we need message objects to -                    # use it as well as to send logs we will start appending messages here instead adding them from -                    # purge. -                    messages.append(message) - -                # If the message passes predicate, let's save it. -                if predicate is None or predicate(message): -                    message_ids.append(message.id) - -        self.cleaning = False - -        # Now let's delete the actual messages with purge. -        self.mod_log.ignore(Event.message_delete, *message_ids) -        for channel in channels: -            if until_message: -                for i in range(0, len(messages), 100): -                    # while purge automatically handles the amount of messages -                    # delete_messages only allows for up to 100 messages at once -                    # thus we need to paginate the amount to always be <= 100 -                    await channel.delete_messages(messages[i:i + 100]) -            else: -                messages += await channel.purge(limit=amount, check=predicate) - -        # Reverse the list to restore chronological order -        if messages: -            messages = reversed(messages) -            log_url = await self.mod_log.upload_log(messages, ctx.author.id) -        else: -            # Can't build an embed, nothing to clean! -            embed = Embed( -                color=Colour(Colours.soft_red), -                description="No matching messages could be found." -            ) -            await ctx.send(embed=embed, delete_after=10) -            return - -        # Build the embed and send it -        target_channels = ", ".join(channel.mention for channel in channels) - -        message = ( -            f"**{len(message_ids)}** messages deleted in {target_channels} by " -            f"{ctx.author.mention}\n\n" -            f"A log of the deleted messages can be found [here]({log_url})." -        ) - -        await self.mod_log.send_log_message( -            icon_url=Icons.message_bulk_delete, -            colour=Colour(Colours.soft_red), -            title="Bulk message delete", -            text=message, -            channel_id=Channels.mod_log, -        ) - -    @group(invoke_without_command=True, name="clean", aliases=["clear", "purge"]) -    @has_any_role(*MODERATION_ROLES) -    async def clean_group(self, ctx: Context) -> None: -        """Commands for cleaning messages in channels.""" -        await ctx.send_help(ctx.command) - -    @clean_group.command(name="user", aliases=["users"]) -    @has_any_role(*MODERATION_ROLES) -    async def clean_user( -        self, -        ctx: Context, -        user: User, -        amount: Optional[int] = 10, -        channels: commands.Greedy[TextChannel] = None -    ) -> None: -        """Delete messages posted by the provided user, stop cleaning after traversing `amount` messages.""" -        await self._clean_messages(amount, ctx, user=user, channels=channels) - -    @clean_group.command(name="all", aliases=["everything"]) -    @has_any_role(*MODERATION_ROLES) -    async def clean_all( -        self, -        ctx: Context, -        amount: Optional[int] = 10, -        channels: commands.Greedy[TextChannel] = None -    ) -> None: -        """Delete all messages, regardless of poster, stop cleaning after traversing `amount` messages.""" -        await self._clean_messages(amount, ctx, channels=channels) - -    @clean_group.command(name="bots", aliases=["bot"]) -    @has_any_role(*MODERATION_ROLES) -    async def clean_bots( -        self, -        ctx: Context, -        amount: Optional[int] = 10, -        channels: commands.Greedy[TextChannel] = None -    ) -> None: -        """Delete all messages posted by a bot, stop cleaning after traversing `amount` messages.""" -        await self._clean_messages(amount, ctx, bots_only=True, channels=channels) - -    @clean_group.command(name="regex", aliases=["word", "expression"]) -    @has_any_role(*MODERATION_ROLES) -    async def clean_regex( -        self, -        ctx: Context, -        regex: str, -        amount: Optional[int] = 10, -        channels: commands.Greedy[TextChannel] = None -    ) -> None: -        """Delete all messages that match a certain regex, stop cleaning after traversing `amount` messages.""" -        await self._clean_messages(amount, ctx, regex=regex, channels=channels) - -    @clean_group.command(name="message", aliases=["messages"]) -    @has_any_role(*MODERATION_ROLES) -    async def clean_message(self, ctx: Context, message: Message) -> None: -        """Delete all messages until certain message, stop cleaning after hitting the `message`.""" -        await self._clean_messages( -            CleanMessages.message_limit, -            ctx, -            channels=[message.channel], -            until_message=message -        ) - -    @clean_group.command(name="stop", aliases=["cancel", "abort"]) -    @has_any_role(*MODERATION_ROLES) -    async def clean_cancel(self, ctx: Context) -> None: -        """If there is an ongoing cleaning process, attempt to immediately cancel it.""" -        self.cleaning = False - -        embed = Embed( -            color=Colour.blurple(), -            description="Clean interrupted." -        ) -        await ctx.send(embed=embed, delete_after=10) - - -def setup(bot: Bot) -> None: -    """Load the Clean cog.""" -    bot.add_cog(Clean(bot)) | 
