-rw-r--r--  bot/cogs/antispam.py  192
1 file changed, 143 insertions(+), 49 deletions(-)
diff --git a/bot/cogs/antispam.py b/bot/cogs/antispam.py
index 02d5d64ce..22f9794f3 100644
--- a/bot/cogs/antispam.py
+++ b/bot/cogs/antispam.py
@@ -1,6 +1,9 @@
+import asyncio
import logging
+from dataclasses import dataclass, field
from datetime import datetime, timedelta
-from typing import List
+from operator import itemgetter
+from typing import Dict, Iterable, List, Set
from discord import Colour, Member, Message, Object, TextChannel
from discord.ext.commands import Bot
@@ -33,18 +36,102 @@ RULE_FUNCTION_MAPPING = {
}
+@dataclass
+class DeletionContext:
+ """Represents a Deletion Context for a single spam event."""
+
+ channel: TextChannel
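+ # Members and messages are keyed by ID so repeated rule triggers don't add duplicates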
+ members: Dict[int, Member] = field(default_factory=dict)
+ rules: Set[str] = field(default_factory=set)
+ messages: Dict[int, Message] = field(default_factory=dict)
+
+ def add(self, rule_name: str, members: Iterable[Member], messages: Iterable[Message]) -> None:
+ """Adds new rule violation events to the deletion context."""
+ self.rules.add(rule_name)
+
+ for member in members:
+ if member.id not in self.members:
+ self.members[member.id] = member
+
+ for message in messages:
+ if message.id not in self.messages:
+ self.messages[message.id] = message
+
+ async def upload_messages(self, actor_id: int, modlog: ModLog) -> None:
+ """Method that takes care of uploading the queue and posting modlog alert."""
+ triggered_by_users = ", ".join(f"{m.display_name}#{m.discriminator} (`{m.id}`)" for m in self.members.values())
+
+ mod_alert_message = (
+ f"**Triggered by:** {triggered_by_users}\n"
+ f"**Channel:** {self.channel.mention}\n"
+ f"**Rules:** {', '.join(rule for rule in self.rules)}\n"
+ )
+
+ # For multiple messages or those with excessive newlines, use the logs API
+ if len(self.messages) > 1 or 'newlines' in self.rules:
+ url = await modlog.upload_log(self.messages.values(), actor_id)
+ mod_alert_message += f"A complete log of the offending messages can be found [here]({url})"
+ else:
+ mod_alert_message += "Message:\n"
+ [message] = self.messages.values()
+ content = message.clean_content
+ remaining_chars = 2040 - len(mod_alert_message)
+
+ if len(content) > remaining_chars:
+ content = content[:remaining_chars] + "..."
+
+ mod_alert_message += f"{content}"
+
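+ # Use the author of the most recent offending message for the alert thumbnail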
+ *_, last_message = self.messages.values()
+ await modlog.send_log_message(
+ icon_url=Icons.filtering,
+ colour=Colour(Colours.soft_red),
+ title=f"Spam detected!",
+ text=mod_alert_message,
+ thumbnail=last_message.author.avatar_url_as(static_format="png"),
+ channel_id=Channels.mod_alerts,
+ ping_everyone=AntiSpamConfig.ping_everyone
+ )
+
+
class AntiSpam:
- def __init__(self, bot: Bot):
+ """Cog that controls our anti-spam measures."""
+
+ def __init__(self, bot: Bot, validation_errors: Dict[str, str]) -> None:
self.bot = bot
+ self.validation_errors = validation_errors
role_id = AntiSpamConfig.punishment['role_id']
self.muted_role = Object(role_id)
self.expiration_date_converter = ExpirationDate()
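+ # Queued deletion contexts and their consumption tasks, keyed by channel ID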
+ self.message_deletion_queue = dict()
+ self.queue_consumption_tasks = dict()
+
@property
def mod_log(self) -> ModLog:
+ """Allows for easy access of the ModLog cog."""
return self.bot.get_cog("ModLog")
- async def on_message(self, message: Message):
+ async def on_ready(self) -> None:
+ """Unloads the cog and alerts admins if configuration validation failed."""
+ if self.validation_errors:
+ body = "**The following errors were encountered:**\n"
+ body += "\n".join(f"- {error}" for error in self.validation_errors.values())
+ body += "\n\n**The cog has been unloaded.**"
+
+ await self.mod_log.send_log_message(
+ title=f"Error: AntiSpam configuration validation failed!",
+ text=body,
+ ping_everyone=True,
+ icon_url=Icons.token_removed,
+ colour=Colour.red()
+ )
+
+ self.bot.remove_cog(self.__class__.__name__)
+ return
+
+ async def on_message(self, message: Message) -> None:
+ """Applies the antispam rules to each received message."""
if (
not message.guild
or message.guild.id != GuildConfig.id
@@ -57,7 +144,7 @@ class AntiSpam:
# Fetch the rule configuration with the highest rule interval.
max_interval_config = max(
AntiSpamConfig.rules.values(),
- key=lambda config: config['interval']
+ key=itemgetter('interval')
)
max_interval = max_interval_config['interval']
@@ -65,6 +152,7 @@ class AntiSpam:
earliest_relevant_at = datetime.utcnow() - timedelta(seconds=max_interval)
relevant_messages = [
msg async for msg in message.channel.history(after=earliest_relevant_at, reverse=False)
+ if not msg.author.bot
]
for rule_name in AntiSpamConfig.rules:
@@ -85,59 +173,48 @@ class AntiSpam:
if result is not None:
reason, members, relevant_messages = result
full_reason = f"`{rule_name}` rule: {reason}"
+
+ # If there's no spam event going on for this channel, start a new Message Deletion Context
+ if message.channel.id not in self.message_deletion_queue:
+ log.trace(f"Creating queue for channel `{message.channel.id}`")
+ self.message_deletion_queue[message.channel.id] = DeletionContext(channel=message.channel)
+ self.queue_consumption_tasks[message.channel.id] = self.bot.loop.create_task(
+ self._process_deletion_context(message.channel.id)
+ )
+
+ # Add the members and messages relevant to this trigger to the Deletion Context
+ self.message_deletion_queue[message.channel.id].add(
+ rule_name=rule_name,
+ members=members,
+ messages=relevant_messages
+ )
+
for member in members:
# Fire it off as a background task to ensure
# that the sleep doesn't block further tasks
self.bot.loop.create_task(
- self.punish(message, member, full_reason, relevant_messages, rule_name)
+ self.punish(message, member, full_reason)
)
await self.maybe_delete_messages(message.channel, relevant_messages)
break
- async def punish(self, msg: Message, member: Member, reason: str, messages: List[Message], rule_name: str):
- # Sanity check to ensure we're not lagging behind
- if self.muted_role not in member.roles:
+ async def punish(self, msg: Message, member: Member, reason: str) -> None:
+ """Punishes the given member for triggering an antispam rule."""
+ if not any(role.id == self.muted_role.id for role in member.roles):
remove_role_after = AntiSpamConfig.punishment['remove_after']
- mod_alert_message = (
- f"**Triggered by:** {member.display_name}#{member.discriminator} (`{member.id}`)\n"
- f"**Channel:** {msg.channel.mention}\n"
- f"**Reason:** {reason}\n"
- )
-
- # For multiple messages or those with excessive newlines, use the logs API
- if len(messages) > 1 or rule_name == 'newlines':
- url = await self.mod_log.upload_log(messages, msg.guild.me.id)
- mod_alert_message += f"A complete log of the offending messages can be found [here]({url})"
- else:
- mod_alert_message += "Message:\n"
- content = messages[0].clean_content
- remaining_chars = 2040 - len(mod_alert_message)
-
- if len(content) > remaining_chars:
- content = content[:remaining_chars] + "..."
-
- mod_alert_message += f"{content}"
-
- # Return the mod log message Context that we can use to post the infraction
- mod_log_ctx = await self.mod_log.send_log_message(
- icon_url=Icons.filtering,
- colour=Colour(Colours.soft_red),
- title=f"Spam detected!",
- text=mod_alert_message,
- thumbnail=msg.author.avatar_url_as(static_format="png"),
- channel_id=Channels.mod_alerts,
- ping_everyone=AntiSpamConfig.ping_everyone
- )
+ # Get a command context so the tempmute command can be invoked directly
+ context = await self.bot.get_context(msg)
# Since we're going to invoke the tempmute command directly, we need to manually call the converter.
- dt_remove_role_after = await self.expiration_date_converter.convert(mod_log_ctx, f"{remove_role_after}S")
- await mod_log_ctx.invoke(Moderation.tempmute, member, dt_remove_role_after, reason=reason)
+ dt_remove_role_after = await self.expiration_date_converter.convert(context, f"{remove_role_after}S")
+ await context.invoke(Moderation.tempmute, member, dt_remove_role_after, reason=reason)
+
+ async def maybe_delete_messages(self, channel: TextChannel, messages: List[Message]) -> None:
+ """Cleans the messages if cleaning is configured."""
- async def maybe_delete_messages(self, channel: TextChannel, messages: List[Message]):
- # Is deletion of offending messages actually enabled?
if AntiSpamConfig.clean_offending:
# If we have more than one message, we can use bulk delete.
@@ -152,24 +229,41 @@ class AntiSpam:
self.mod_log.ignore(Event.message_delete, messages[0].id)
await messages[0].delete()
+ async def _process_deletion_context(self, context_id: int) -> None:
+ """Processes the Deletion Context queue."""
+ log.trace("Sleeping before processing message deletion queue.")
+ await asyncio.sleep(10)
-def validate_config():
+ if context_id not in self.message_deletion_queue:
+ log.error(f"Started processing deletion queue for context `{context_id}`, but it was not found!")
+ return
+
+ deletion_context = self.message_deletion_queue.pop(context_id)
+ await deletion_context.upload_messages(self.bot.user.id, self.mod_log)
+
+
+def validate_config() -> Dict[str, str]:
+ """Validates the antispam configs and returns a mapping of rule names to error messages."""
+ validation_errors = {}
for name, config in AntiSpamConfig.rules.items():
if name not in RULE_FUNCTION_MAPPING:
- raise ValueError(
+ log.error(
f"Unrecognized antispam rule `{name}`. "
f"Valid rules are: {', '.join(RULE_FUNCTION_MAPPING)}"
)
-
+ validation_errors[name] = f"`{name}` is not recognized as an antispam rule."
for required_key in ('interval', 'max'):
if required_key not in config:
- raise ValueError(
+ log.error(
f"`{required_key}` is required but was not "
f"set in rule `{name}`'s configuration."
)
+ validation_errors[name] = f"Key `{required_key}` is required but not set for rule `{name}`"
+ return validation_errors
-def setup(bot: Bot):
- validate_config()
- bot.add_cog(AntiSpam(bot))
+def setup(bot: Bot) -> None:
+ """Setup for the cog."""
+ validation_errors = validate_config()
+ bot.add_cog(AntiSpam(bot, validation_errors))
log.info("Cog loaded: AntiSpam")