diff options
-rw-r--r-- | bot/cogs/antispam.py | 192 |
1 files changed, 143 insertions, 49 deletions
diff --git a/bot/cogs/antispam.py b/bot/cogs/antispam.py index 02d5d64ce..22f9794f3 100644 --- a/bot/cogs/antispam.py +++ b/bot/cogs/antispam.py @@ -1,6 +1,9 @@ +import asyncio import logging +from dataclasses import dataclass, field from datetime import datetime, timedelta -from typing import List +from operator import itemgetter +from typing import Dict, Iterable, List, Set from discord import Colour, Member, Message, Object, TextChannel from discord.ext.commands import Bot @@ -33,18 +36,102 @@ RULE_FUNCTION_MAPPING = { } +@dataclass +class DeletionContext: + """Represents a Deletion Context for a single spam event.""" + + channel: TextChannel + members: Dict[int, Member] = field(default_factory=dict) + rules: Set[str] = field(default_factory=set) + messages: Dict[int, Message] = field(default_factory=dict) + + def add(self, rule_name: str, members: Iterable[Member], messages: Iterable[Message]) -> None: + """Adds new rule violation events to the deletion context.""" + self.rules.add(rule_name) + + for member in members: + if member.id not in self.members: + self.members[member.id] = member + + for message in messages: + if message.id not in self.messages: + self.messages[message.id] = message + + async def upload_messages(self, actor_id: int, modlog: ModLog) -> None: + """Method that takes care of uploading the queue and posting modlog alert.""" + triggered_by_users = ", ".join(f"{m.display_name}#{m.discriminator} (`{m.id}`)" for m in self.members.values()) + + mod_alert_message = ( + f"**Triggered by:** {triggered_by_users}\n" + f"**Channel:** {self.channel.mention}\n" + f"**Rules:** {', '.join(rule for rule in self.rules)}\n" + ) + + # For multiple messages or those with excessive newlines, use the logs API + if len(self.messages) > 1 or 'newlines' in self.rules: + url = await modlog.upload_log(self.messages.values(), actor_id) + mod_alert_message += f"A complete log of the offending messages can be found [here]({url})" + else: + mod_alert_message += "Message:\n" + [message] = self.messages.values() + content = message[0].clean_content + remaining_chars = 2040 - len(mod_alert_message) + + if len(content) > remaining_chars: + content = content[:remaining_chars] + "..." + + mod_alert_message += f"{content}" + + *_, last_message = self.messages.values() + await modlog.send_log_message( + icon_url=Icons.filtering, + colour=Colour(Colours.soft_red), + title=f"Spam detected!", + text=mod_alert_message, + thumbnail=last_message.author.avatar_url_as(static_format="png"), + channel_id=Channels.mod_alerts, + ping_everyone=AntiSpamConfig.ping_everyone + ) + + class AntiSpam: - def __init__(self, bot: Bot): + """Cog that controls our anti-spam measures.""" + + def __init__(self, bot: Bot, validation_errors: bool) -> None: self.bot = bot + self.validation_errors = validation_errors role_id = AntiSpamConfig.punishment['role_id'] self.muted_role = Object(role_id) self.expiration_date_converter = ExpirationDate() + self.message_deletion_queue = dict() + self.queue_consumption_tasks = dict() + @property def mod_log(self) -> ModLog: + """Allows for easy access of the ModLog cog.""" return self.bot.get_cog("ModLog") - async def on_message(self, message: Message): + async def on_ready(self): + """Unloads the cog and alerts admins if configuration validation failed.""" + if self.validation_errors: + body = "**The following errors were encountered:**\n" + body += "\n".join(f"- {error}" for error in self.validation_errors.values()) + body += "\n\n**The cog has been unloaded.**" + + await self.mod_log.send_log_message( + title=f"Error: AntiSpam configuration validation failed!", + text=body, + ping_everyone=True, + icon_url=Icons.token_removed, + colour=Colour.red() + ) + + self.bot.remove_cog(self.__class__.__name__) + return + + async def on_message(self, message: Message) -> None: + """Applies the antispam rules to each received message.""" if ( not message.guild or message.guild.id != GuildConfig.id @@ -57,7 +144,7 @@ class AntiSpam: # Fetch the rule configuration with the highest rule interval. max_interval_config = max( AntiSpamConfig.rules.values(), - key=lambda config: config['interval'] + key=itemgetter('interval') ) max_interval = max_interval_config['interval'] @@ -65,6 +152,7 @@ class AntiSpam: earliest_relevant_at = datetime.utcnow() - timedelta(seconds=max_interval) relevant_messages = [ msg async for msg in message.channel.history(after=earliest_relevant_at, reverse=False) + if not msg.author.bot ] for rule_name in AntiSpamConfig.rules: @@ -85,59 +173,48 @@ class AntiSpam: if result is not None: reason, members, relevant_messages = result full_reason = f"`{rule_name}` rule: {reason}" + + # If there's no spam event going on for this channel, start a new Message Deletion Context + if message.channel.id not in self.message_deletion_queue: + log.trace(f"Creating queue for channel `{message.channel.id}`") + self.message_deletion_queue[message.channel.id] = DeletionContext(channel=message.channel) + self.queue_consumption_tasks = self.bot.loop.create_task( + self._process_deletion_context(message.channel.id) + ) + + # Add the relevant of this trigger to the Deletion Context + self.message_deletion_queue[message.channel.id].add( + rule_name=rule_name, + members=members, + messages=relevant_messages + ) + for member in members: # Fire it off as a background task to ensure # that the sleep doesn't block further tasks self.bot.loop.create_task( - self.punish(message, member, full_reason, relevant_messages, rule_name) + self.punish(message, member, full_reason) ) await self.maybe_delete_messages(message.channel, relevant_messages) break - async def punish(self, msg: Message, member: Member, reason: str, messages: List[Message], rule_name: str): - # Sanity check to ensure we're not lagging behind - if self.muted_role not in member.roles: + async def punish(self, msg: Message, member: Member, reason: str) -> None: + """Punishes the given member for triggering an antispam rule.""" + if not any(role.id == self.muted_role.id for role in member.roles): remove_role_after = AntiSpamConfig.punishment['remove_after'] - mod_alert_message = ( - f"**Triggered by:** {member.display_name}#{member.discriminator} (`{member.id}`)\n" - f"**Channel:** {msg.channel.mention}\n" - f"**Reason:** {reason}\n" - ) - - # For multiple messages or those with excessive newlines, use the logs API - if len(messages) > 1 or rule_name == 'newlines': - url = await self.mod_log.upload_log(messages, msg.guild.me.id) - mod_alert_message += f"A complete log of the offending messages can be found [here]({url})" - else: - mod_alert_message += "Message:\n" - content = messages[0].clean_content - remaining_chars = 2040 - len(mod_alert_message) - - if len(content) > remaining_chars: - content = content[:remaining_chars] + "..." - - mod_alert_message += f"{content}" - - # Return the mod log message Context that we can use to post the infraction - mod_log_ctx = await self.mod_log.send_log_message( - icon_url=Icons.filtering, - colour=Colour(Colours.soft_red), - title=f"Spam detected!", - text=mod_alert_message, - thumbnail=msg.author.avatar_url_as(static_format="png"), - channel_id=Channels.mod_alerts, - ping_everyone=AntiSpamConfig.ping_everyone - ) + # We need context, let's get it + context = await self.bot.get_context(msg) # Since we're going to invoke the tempmute command directly, we need to manually call the converter. - dt_remove_role_after = await self.expiration_date_converter.convert(mod_log_ctx, f"{remove_role_after}S") - await mod_log_ctx.invoke(Moderation.tempmute, member, dt_remove_role_after, reason=reason) + dt_remove_role_after = await self.expiration_date_converter.convert(context, f"{remove_role_after}S") + await context.invoke(Moderation.tempmute, member, dt_remove_role_after, reason=reason) + + async def maybe_delete_messages(self, channel: TextChannel, messages: List[Message]) -> None: + """Cleans the messages if cleaning is configured.""" - async def maybe_delete_messages(self, channel: TextChannel, messages: List[Message]): - # Is deletion of offending messages actually enabled? if AntiSpamConfig.clean_offending: # If we have more than one message, we can use bulk delete. @@ -152,24 +229,41 @@ class AntiSpam: self.mod_log.ignore(Event.message_delete, messages[0].id) await messages[0].delete() + async def _process_deletion_context(self, context_id: int) -> None: + """Processes the Deletion Context queue.""" + log.trace("Sleeping before processing message deletion queue.") + await asyncio.sleep(10) -def validate_config(): + if context_id not in self.message_deletion_queue: + log.error(f"Started processing deletion queue for context `{context_id}`, but it was not found!") + return + + deletion_context = self.message_deletion_queue.pop(context_id) + await deletion_context.upload_messages(self.bot.user.id, self.mod_log) + + +def validate_config() -> bool: + """Validates the antispam configs.""" + validation_errors = {} for name, config in AntiSpamConfig.rules.items(): if name not in RULE_FUNCTION_MAPPING: - raise ValueError( + log.error( f"Unrecognized antispam rule `{name}`. " f"Valid rules are: {', '.join(RULE_FUNCTION_MAPPING)}" ) - + validation_errors[name] = f"`{name}` is not recognized as an antispam rule." for required_key in ('interval', 'max'): if required_key not in config: - raise ValueError( + log.error( f"`{required_key}` is required but was not " f"set in rule `{name}`'s configuration." ) + validation_errors[name] = f"Key `{required_key}` is required but not set for rule `{name}`" + return validation_errors -def setup(bot: Bot): - validate_config() - bot.add_cog(AntiSpam(bot)) +def setup(bot: Bot) -> None: + """Setup for the cog.""" + validation_errors = validate_config() + bot.add_cog(AntiSpam(bot, validation_errors)) log.info("Cog loaded: AntiSpam") |