aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--bot/constants.py1
-rw-r--r--bot/decorators.py7
-rw-r--r--bot/exts/moderation/clean.py42
-rw-r--r--bot/exts/moderation/infraction/infractions.py79
-rw-r--r--tests/bot/exts/moderation/infraction/test_infractions.py91
-rw-r--r--tests/bot/exts/moderation/test_clean.py104
6 files changed, 286 insertions, 38 deletions
diff --git a/bot/constants.py b/bot/constants.py
index 1b713a7e3..77c01bfa3 100644
--- a/bot/constants.py
+++ b/bot/constants.py
@@ -445,6 +445,7 @@ class Channels(metaclass=YAMLGetter):
incidents_archive: int
mod_alerts: int
mod_meta: int
+ mods: int
nominations: int
nomination_voting: int
organisation: int
diff --git a/bot/decorators.py b/bot/decorators.py
index 048a2a09a..f4331264f 100644
--- a/bot/decorators.py
+++ b/bot/decorators.py
@@ -188,7 +188,7 @@ def respect_role_hierarchy(member_arg: function.Argument) -> t.Callable:
"""
def decorator(func: types.FunctionType) -> types.FunctionType:
@command_wraps(func)
- async def wrapper(*args, **kwargs) -> None:
+ async def wrapper(*args, **kwargs) -> t.Any:
log.trace(f"{func.__name__}: respect role hierarchy decorator called")
bound_args = function.get_bound_args(func, args, kwargs)
@@ -196,8 +196,7 @@ def respect_role_hierarchy(member_arg: function.Argument) -> t.Callable:
if not isinstance(target, Member):
log.trace("The target is not a discord.Member; skipping role hierarchy check.")
- await func(*args, **kwargs)
- return
+ return await func(*args, **kwargs)
ctx = function.get_arg_value(1, bound_args)
cmd = ctx.command.name
@@ -214,7 +213,7 @@ def respect_role_hierarchy(member_arg: function.Argument) -> t.Callable:
)
else:
log.trace(f"{func.__name__}: {target.top_role=} < {actor.top_role=}; calling func")
- await func(*args, **kwargs)
+ return await func(*args, **kwargs)
return wrapper
return decorator
diff --git a/bot/exts/moderation/clean.py b/bot/exts/moderation/clean.py
index e61ef7880..cb6836258 100644
--- a/bot/exts/moderation/clean.py
+++ b/bot/exts/moderation/clean.py
@@ -331,12 +331,17 @@ class Clean(Cog):
return deleted
- async def _modlog_cleaned_messages(self, messages: list[Message], channels: CleanChannels, ctx: Context) -> bool:
- """Log the deleted messages to the modlog. Return True if logging was successful."""
+ async def _modlog_cleaned_messages(
+ self,
+ messages: list[Message],
+ channels: CleanChannels,
+ ctx: Context
+ ) -> Optional[str]:
+ """Log the deleted messages to the modlog, returning the log url if logging was successful."""
if not messages:
# Can't build an embed, nothing to clean!
await self._send_expiring_message(ctx, ":x: No matching messages could be found.")
- return False
+ return None
# Reverse the list to have reverse chronological order
log_messages = reversed(messages)
@@ -362,7 +367,7 @@ class Clean(Cog):
channel_id=Channels.mod_log,
)
- return True
+ return log_url
# endregion
@@ -375,8 +380,9 @@ class Clean(Cog):
regex: Optional[re.Pattern] = None,
first_limit: Optional[CleanLimit] = None,
second_limit: Optional[CleanLimit] = None,
- ) -> None:
- """A helper function that does the actual message cleaning."""
+ attempt_delete_invocation: bool = True,
+ ) -> Optional[str]:
+ """A helper function that does the actual message cleaning, returns the log url if logging was successful."""
self._validate_input(channels, bots_only, users, first_limit, second_limit)
# Are we already performing a clean?
@@ -384,7 +390,7 @@ class Clean(Cog):
await self._send_expiring_message(
ctx, ":x: Please wait for the currently ongoing clean operation to complete."
)
- return
+ return None
self.cleaning = True
deletion_channels = self._channels_set(channels, ctx, first_limit, second_limit)
@@ -399,8 +405,9 @@ class Clean(Cog):
# Needs to be called after standardizing the input.
predicate = self._build_predicate(first_limit, second_limit, bots_only, users, regex)
- # Delete the invocation first
- await self._delete_invocation(ctx)
+ if attempt_delete_invocation:
+ # Delete the invocation first
+ await self._delete_invocation(ctx)
if self._use_cache(first_limit):
log.trace(f"Messages for cleaning by {ctx.author.id} will be searched in the cache.")
@@ -418,7 +425,7 @@ class Clean(Cog):
if not self.cleaning:
# Means that the cleaning was canceled
- return
+ return None
# Now let's delete the actual messages with purge.
self.mod_log.ignore(Event.message_delete, *message_ids)
@@ -427,11 +434,18 @@ class Clean(Cog):
if not channels:
channels = deletion_channels
- logged = await self._modlog_cleaned_messages(deleted_messages, channels, ctx)
+ log_url = await self._modlog_cleaned_messages(deleted_messages, channels, ctx)
- if logged and is_mod_channel(ctx.channel):
- with suppress(NotFound): # Can happen if the invoker deleted their own messages.
- await ctx.message.add_reaction(Emojis.check_mark)
+ success_message = (
+ f"{Emojis.ok_hand} Deleted {len(deleted_messages)} messages. "
+ f"A log of the deleted messages can be found here {log_url}."
+ )
+ if log_url and is_mod_channel(ctx.channel):
+ await ctx.reply(success_message)
+ elif log_url:
+ if mods := self.bot.get_channel(Channels.mods):
+ await mods.send(f"{ctx.author.mention} {success_message}")
+ return log_url
# region: Commands
diff --git a/bot/exts/moderation/infraction/infractions.py b/bot/exts/moderation/infraction/infractions.py
index 7c0259b8e..09ee1a7b4 100644
--- a/bot/exts/moderation/infraction/infractions.py
+++ b/bot/exts/moderation/infraction/infractions.py
@@ -9,7 +9,7 @@ from discord.ext.commands import Context, command
from bot import constants
from bot.bot import Bot
from bot.constants import Event
-from bot.converters import Duration, Expiry, MemberOrUser, UnambiguousMemberOrUser
+from bot.converters import Age, Duration, Expiry, Infraction, MemberOrUser, UnambiguousMemberOrUser
from bot.decorators import respect_role_hierarchy
from bot.exts.moderation.infraction import _utils
from bot.exts.moderation.infraction._scheduler import InfractionScheduler
@@ -19,6 +19,11 @@ from bot.utils.messages import format_user
log = get_logger(__name__)
+if t.TYPE_CHECKING:
+ from bot.exts.moderation.clean import Clean
+ from bot.exts.moderation.infraction.management import ModManagement
+ from bot.exts.moderation.watchchannels.bigbrother import BigBrother
+
class Infractions(InfractionScheduler, commands.Cog):
"""Apply and pardon infractions on users for moderation purposes."""
@@ -91,8 +96,8 @@ class Infractions(InfractionScheduler, commands.Cog):
"""
await self.apply_ban(ctx, user, reason, expires_at=duration)
- @command(aliases=('pban',))
- async def purgeban(
+ @command(aliases=("cban", "purgeban", "pban"))
+ async def cleanban(
self,
ctx: Context,
user: UnambiguousMemberOrUser,
@@ -101,11 +106,49 @@ class Infractions(InfractionScheduler, commands.Cog):
reason: t.Optional[str] = None
) -> None:
"""
- Same as ban but removes all their messages of the last 24 hours.
+ Same as ban, but also cleans all their messages from the last hour.
If duration is specified, it temporarily bans that user for the given duration.
"""
- await self.apply_ban(ctx, user, reason, 1, expires_at=duration)
+ clean_cog: t.Optional[Clean] = self.bot.get_cog("Clean")
+ if clean_cog is None:
+ # If we can't get the clean cog, fall back to native purgeban.
+ await self.apply_ban(ctx, user, reason, purge_days=1, expires_at=duration)
+ return
+
+ infraction = await self.apply_ban(ctx, user, reason, expires_at=duration)
+ if not infraction or not infraction.get("id"):
+ # Ban was unsuccessful, quit early.
+ await ctx.send(":x: Failed to apply ban.")
+ log.error("Failed to apply ban to user %d", user.id)
+ return
+
+ # Calling commands directly skips Discord.py's convertors, so we need to convert args manually.
+ clean_time = await Age().convert(ctx, "1h")
+ infraction = await Infraction().convert(ctx, infraction["id"])
+
+ log_url = await clean_cog._clean_messages(
+ ctx,
+ users=[user],
+ channels="*",
+ first_limit=clean_time,
+ attempt_delete_invocation=False,
+ )
+ if not log_url:
+ # Cleaning failed, or there were no messages to clean, exit early.
+ return
+
+ infr_manage_cog: t.Optional[ModManagement] = self.bot.get_cog("ModManagement")
+ if infr_manage_cog is None:
+ # If we can't get the mod management cog, don't bother appending the log.
+ return
+
+ # Overwrite the context's send function so infraction append
+ # doesn't output the update infraction confirmation message.
+ async def send(*args, **kwargs) -> None:
+ pass
+ ctx.send = send
+ await infr_manage_cog.infraction_append(ctx, infraction, None, reason=f"[Clean log]({log_url})")
@command(aliases=("vban",))
async def voiceban(self, ctx: Context) -> None:
@@ -368,7 +411,7 @@ class Infractions(InfractionScheduler, commands.Cog):
reason: t.Optional[str],
purge_days: t.Optional[int] = 0,
**kwargs
- ) -> None:
+ ) -> t.Optional[dict]:
"""
Apply a ban infraction with kwargs passed to `post_infraction`.
@@ -376,7 +419,7 @@ class Infractions(InfractionScheduler, commands.Cog):
"""
if isinstance(user, Member) and user.top_role >= ctx.me.top_role:
await ctx.send(":x: I can't ban users above or equal to me in the role hierarchy.")
- return
+ return None
# In the case of a permanent ban, we don't need get_active_infractions to tell us if one is active
is_temporary = kwargs.get("expires_at") is not None
@@ -385,19 +428,19 @@ class Infractions(InfractionScheduler, commands.Cog):
if active_infraction:
if is_temporary:
log.trace("Tempban ignored as it cannot overwrite an active ban.")
- return
+ return None
if active_infraction.get('expires_at') is None:
log.trace("Permaban already exists, notify.")
await ctx.send(f":x: User is already permanently banned (#{active_infraction['id']}).")
- return
+ return None
log.trace("Old tempban is being replaced by new permaban.")
await self.pardon_infraction(ctx, "ban", user, send_msg=is_temporary)
infraction = await _utils.post_infraction(ctx, user, "ban", reason, active=True, **kwargs)
if infraction is None:
- return
+ return None
infraction["purge"] = "purge " if purge_days else ""
@@ -409,19 +452,17 @@ class Infractions(InfractionScheduler, commands.Cog):
action = ctx.guild.ban(user, reason=reason, delete_message_days=purge_days)
await self.apply_infraction(ctx, infraction, user, action)
+ bb_cog: t.Optional[BigBrother] = self.bot.get_cog("Big Brother")
if infraction.get('expires_at') is not None:
log.trace(f"Ban isn't permanent; user {user} won't be unwatched by Big Brother.")
- return
-
- bb_cog = self.bot.get_cog("Big Brother")
- if not bb_cog:
+ elif not bb_cog:
log.error(f"Big Brother cog not loaded; perma-banned user {user} won't be unwatched.")
- return
-
- log.trace(f"Big Brother cog loaded; attempting to unwatch perma-banned user {user}.")
+ else:
+ log.trace(f"Big Brother cog loaded; attempting to unwatch perma-banned user {user}.")
+ bb_reason = "User has been permanently banned from the server. Automatically removed."
+ await bb_cog.apply_unwatch(ctx, user, bb_reason, send_message=False)
- bb_reason = "User has been permanently banned from the server. Automatically removed."
- await bb_cog.apply_unwatch(ctx, user, bb_reason, send_message=False)
+ return infraction
@respect_role_hierarchy(member_arg=2)
async def apply_voice_mute(self, ctx: Context, user: MemberOrUser, reason: t.Optional[str], **kwargs) -> None:
diff --git a/tests/bot/exts/moderation/infraction/test_infractions.py b/tests/bot/exts/moderation/infraction/test_infractions.py
index f89465f84..8bed1e386 100644
--- a/tests/bot/exts/moderation/infraction/test_infractions.py
+++ b/tests/bot/exts/moderation/infraction/test_infractions.py
@@ -1,13 +1,15 @@
import inspect
import textwrap
import unittest
-from unittest.mock import ANY, AsyncMock, MagicMock, Mock, patch
+from unittest.mock import ANY, AsyncMock, DEFAULT, MagicMock, Mock, patch
from discord.errors import NotFound
from bot.constants import Event
+from bot.exts.moderation.clean import Clean
from bot.exts.moderation.infraction import _utils
from bot.exts.moderation.infraction.infractions import Infractions
+from bot.exts.moderation.infraction.management import ModManagement
from tests.helpers import MockBot, MockContext, MockGuild, MockMember, MockRole, MockUser, autospec
@@ -231,3 +233,90 @@ class VoiceMuteTests(unittest.IsolatedAsyncioTestCase):
"DM": "**Failed**"
})
notify_pardon_mock.assert_awaited_once()
+
+
+class CleanBanTests(unittest.IsolatedAsyncioTestCase):
+ """Tests for cleanban functionality."""
+
+ def setUp(self):
+ self.bot = MockBot()
+ self.mod = MockMember(roles=[MockRole(id=7890123, position=10)])
+ self.user = MockMember(roles=[MockRole(id=123456, position=1)])
+ self.guild = MockGuild()
+ self.ctx = MockContext(bot=self.bot, author=self.mod)
+ self.cog = Infractions(self.bot)
+ self.clean_cog = Clean(self.bot)
+ self.management_cog = ModManagement(self.bot)
+
+ self.cog.apply_ban = AsyncMock(return_value={"id": 42})
+ self.log_url = "https://www.youtube.com/watch?v=dQw4w9WgXcQ"
+ self.clean_cog._clean_messages = AsyncMock(return_value=self.log_url)
+
+ def mock_get_cog(self, enable_clean, enable_manage):
+ """Mock get cog factory that allows the user to specify whether clean and manage cogs are enabled."""
+ def inner(name):
+ if name == "ModManagement":
+ return self.management_cog if enable_manage else None
+ elif name == "Clean":
+ return self.clean_cog if enable_clean else None
+ else:
+ return DEFAULT
+ return inner
+
+ async def test_cleanban_falls_back_to_native_purge_without_clean_cog(self):
+ """Should fallback to native purge if the Clean cog is not available."""
+ self.bot.get_cog.side_effect = self.mock_get_cog(False, False)
+
+ self.assertIsNone(await self.cog.cleanban(self.cog, self.ctx, self.user, None, reason="FooBar"))
+ self.cog.apply_ban.assert_awaited_once_with(
+ self.ctx,
+ self.user,
+ "FooBar",
+ purge_days=1,
+ expires_at=None,
+ )
+
+ async def test_cleanban_doesnt_purge_messages_if_clean_cog_available(self):
+ """Cleanban command should use the native purge messages if the clean cog is available."""
+ self.bot.get_cog.side_effect = self.mock_get_cog(True, False)
+
+ self.assertIsNone(await self.cog.cleanban(self.cog, self.ctx, self.user, None, reason="FooBar"))
+ self.cog.apply_ban.assert_awaited_once_with(
+ self.ctx,
+ self.user,
+ "FooBar",
+ expires_at=None,
+ )
+
+ @patch("bot.exts.moderation.infraction.infractions.Age")
+ async def test_cleanban_uses_clean_cog_when_available(self, mocked_age_converter):
+ """Test cleanban uses the clean cog to clean messages if it's available."""
+ self.bot.api_client.patch = AsyncMock()
+ self.bot.get_cog.side_effect = self.mock_get_cog(True, False)
+
+ mocked_age_converter.return_value.convert = AsyncMock(return_value="81M")
+ self.assertIsNone(await self.cog.cleanban(self.cog, self.ctx, self.user, None, reason="FooBar"))
+
+ self.clean_cog._clean_messages.assert_awaited_once_with(
+ self.ctx,
+ users=[self.user],
+ channels="*",
+ first_limit="81M",
+ attempt_delete_invocation=False,
+ )
+
+ @patch("bot.exts.moderation.infraction.infractions.Infraction")
+ async def test_cleanban_edits_infraction_reason(self, mocked_infraction_converter):
+ """Ensure cleanban edits the ban reason with a link to the clean log."""
+ self.bot.get_cog.side_effect = self.mock_get_cog(True, True)
+
+ self.management_cog.infraction_append = AsyncMock()
+ mocked_infraction_converter.return_value.convert = AsyncMock(return_value=42)
+ self.assertIsNone(await self.cog.cleanban(self.cog, self.ctx, self.user, None, reason="FooBar"))
+
+ self.management_cog.infraction_append.assert_awaited_once_with(
+ self.ctx,
+ 42,
+ None,
+ reason=f"[Clean log]({self.log_url})"
+ )
diff --git a/tests/bot/exts/moderation/test_clean.py b/tests/bot/exts/moderation/test_clean.py
new file mode 100644
index 000000000..d7647fa48
--- /dev/null
+++ b/tests/bot/exts/moderation/test_clean.py
@@ -0,0 +1,104 @@
+import unittest
+from unittest.mock import AsyncMock, MagicMock, patch
+
+from bot.exts.moderation.clean import Clean
+from tests.helpers import MockBot, MockContext, MockGuild, MockMember, MockMessage, MockRole, MockTextChannel
+
+
+class CleanTests(unittest.IsolatedAsyncioTestCase):
+ """Tests for clean cog functionality."""
+
+ def setUp(self):
+ self.bot = MockBot()
+ self.mod = MockMember(roles=[MockRole(id=7890123, position=10)])
+ self.user = MockMember(roles=[MockRole(id=123456, position=1)])
+ self.guild = MockGuild()
+ self.ctx = MockContext(bot=self.bot, author=self.mod)
+ self.cog = Clean(self.bot)
+
+ self.log_url = "https://www.youtube.com/watch?v=dQw4w9WgXcQ"
+ self.cog._modlog_cleaned_messages = AsyncMock(return_value=self.log_url)
+
+ self.cog._use_cache = MagicMock(return_value=True)
+ self.cog._delete_found = AsyncMock(return_value=[42, 84])
+
+ @patch("bot.exts.moderation.clean.is_mod_channel")
+ async def test_clean_deletes_invocation_in_non_mod_channel(self, mod_channel_check):
+ """Clean command should delete the invocation message if ran in a non mod channel."""
+ mod_channel_check.return_value = False
+ self.ctx.message.delete = AsyncMock()
+
+ self.assertIsNone(await self.cog._delete_invocation(self.ctx))
+
+ self.ctx.message.delete.assert_awaited_once()
+
+ @patch("bot.exts.moderation.clean.is_mod_channel")
+ async def test_clean_doesnt_delete_invocation_in_mod_channel(self, mod_channel_check):
+ """Clean command should not delete the invocation message if ran in a mod channel."""
+ mod_channel_check.return_value = True
+ self.ctx.message.delete = AsyncMock()
+
+ self.assertIsNone(await self.cog._delete_invocation(self.ctx))
+
+ self.ctx.message.delete.assert_not_awaited()
+
+ async def test_clean_doesnt_attempt_deletion_when_attempt_delete_invocation_is_false(self):
+ """Clean command should not attempt to delete the invocation message if attempt_delete_invocation is false."""
+ self.cog._delete_invocation = AsyncMock()
+ self.bot.get_channel = MagicMock(return_value=False)
+
+ self.assertEqual(
+ await self.cog._clean_messages(
+ self.ctx,
+ None,
+ first_limit=MockMessage(),
+ attempt_delete_invocation=False,
+ ),
+ self.log_url,
+ )
+
+ self.cog._delete_invocation.assert_not_awaited()
+
+ @patch("bot.exts.moderation.clean.is_mod_channel")
+ async def test_clean_replies_with_success_message_when_ran_in_mod_channel(self, mod_channel_check):
+ """Clean command should reply to the message with a confirmation message if invoked in a mod channel."""
+ mod_channel_check.return_value = True
+ self.ctx.reply = AsyncMock()
+
+ self.assertEqual(
+ await self.cog._clean_messages(
+ self.ctx,
+ None,
+ first_limit=MockMessage(),
+ attempt_delete_invocation=False,
+ ),
+ self.log_url,
+ )
+
+ self.ctx.reply.assert_awaited_once()
+ sent_message = self.ctx.reply.await_args[0][0]
+ self.assertIn(self.log_url, sent_message)
+ self.assertIn("2 messages", sent_message)
+
+ @patch("bot.exts.moderation.clean.is_mod_channel")
+ async def test_clean_send_success_message_to_mods_when_ran_in_non_mod_channel(self, mod_channel_check):
+ """Clean command should send a confirmation message to #mods if invoked in a non-mod channel."""
+ mod_channel_check.return_value = False
+ mocked_mods = MockTextChannel(id=1234567)
+ mocked_mods.send = AsyncMock()
+ self.bot.get_channel = MagicMock(return_value=mocked_mods)
+
+ self.assertEqual(
+ await self.cog._clean_messages(
+ self.ctx,
+ None,
+ first_limit=MockMessage(),
+ attempt_delete_invocation=False,
+ ),
+ self.log_url,
+ )
+
+ mocked_mods.send.assert_awaited_once()
+ sent_message = mocked_mods.send.await_args[0][0]
+ self.assertIn(self.log_url, sent_message)
+ self.assertIn("2 messages", sent_message)