diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/bot/exts/moderation/test_silence.py | 379 | ||||
-rw-r--r-- | tests/helpers.py | 25 |
2 files changed, 374 insertions, 30 deletions
diff --git a/tests/bot/exts/moderation/test_silence.py b/tests/bot/exts/moderation/test_silence.py index fa5fc9e81..a365b2aae 100644 --- a/tests/bot/exts/moderation/test_silence.py +++ b/tests/bot/exts/moderation/test_silence.py @@ -2,14 +2,14 @@ import asyncio import unittest from datetime import datetime, timezone from unittest import mock -from unittest.mock import Mock +from unittest.mock import AsyncMock, Mock from async_rediscache import RedisSession from discord import PermissionOverwrite from bot.constants import Channels, Guild, Roles from bot.exts.moderation import silence -from tests.helpers import MockBot, MockContext, MockTextChannel, autospec +from tests.helpers import MockBot, MockContext, MockGuild, MockMember, MockTextChannel, MockVoiceChannel, autospec redis_session = None redis_loop = asyncio.get_event_loop() @@ -149,7 +149,7 @@ class SilenceCogTests(unittest.IsolatedAsyncioTestCase): self.assertTrue(self.cog._init_task.cancelled()) @autospec("discord.ext.commands", "has_any_role") - @mock.patch.object(silence, "MODERATION_ROLES", new=(1, 2, 3)) + @mock.patch.object(silence.constants, "MODERATION_ROLES", new=(1, 2, 3)) async def test_cog_check(self, role_check): """Role check was called with `MODERATION_ROLES`""" ctx = MockContext() @@ -159,6 +159,62 @@ class SilenceCogTests(unittest.IsolatedAsyncioTestCase): role_check.assert_called_once_with(*(1, 2, 3)) role_check.return_value.predicate.assert_awaited_once_with(ctx) + async def test_force_voice_sync(self): + """Tests the _force_voice_sync helper function.""" + await self.cog._async_init() + + members = [MockMember() for _ in range(10)] + members.extend([MockMember(roles=[self.cog._helper_role]) for _ in range(3)]) + + channel = MockVoiceChannel(members=members) + + await self.cog._force_voice_sync(channel) + for member in members: + if self.cog._helper_role in member.roles: + member.move_to.assert_not_called() + else: + self.assertEqual(member.move_to.call_count, 2) + calls = member.move_to.call_args_list + + # Tests that the member was moved to the afk channel, and back. + self.assertEqual((channel.guild.afk_channel,), calls[0].args) + self.assertEqual((channel,), calls[1].args) + + async def test_force_voice_sync_no_channel(self): + """Test to ensure _force_voice_sync can create its own voice channel if one is not available.""" + await self.cog._async_init() + + channel = MockVoiceChannel(guild=MockGuild(afk_channel=None)) + new_channel = MockVoiceChannel(delete=AsyncMock()) + channel.guild.create_voice_channel.return_value = new_channel + + await self.cog._force_voice_sync(channel) + + # Check channel creation + overwrites = { + channel.guild.default_role: PermissionOverwrite(speak=False, connect=False, view_channel=False) + } + channel.guild.create_voice_channel.assert_called_once_with("mute-temp", overwrites=overwrites) + + # Check bot deleted channel + new_channel.delete.assert_called_once() + + async def test_voice_kick(self): + """Test to ensure kick function can remove all members from a voice channel.""" + await self.cog._async_init() + + members = [MockMember() for _ in range(10)] + members.extend([MockMember(roles=[self.cog._helper_role]) for _ in range(3)]) + + channel = MockVoiceChannel(members=members) + await self.cog._kick_voice_members(channel) + + for member in members: + if self.cog._helper_role in member.roles: + member.move_to.assert_not_called() + else: + self.assertEqual((None,), member.move_to.call_args_list[0].args) + @autospec(silence.Silence, "previous_overwrites", "unsilence_timestamps", pass_mocks=False) class RescheduleTests(unittest.IsolatedAsyncioTestCase): @@ -235,6 +291,17 @@ class RescheduleTests(unittest.IsolatedAsyncioTestCase): self.cog.notifier.add_channel.assert_not_called() +def voice_sync_helper(function): + """Helper wrapper to test the sync and kick functions for voice channels.""" + @autospec(silence.Silence, "_force_voice_sync", "_kick_voice_members", "_set_silence_overwrites") + async def inner(self, sync, kick, overwrites): + overwrites.return_value = True + await function(self, MockContext(), + sync, kick) + + return inner + + @autospec(silence.Silence, "previous_overwrites", "unsilence_timestamps", pass_mocks=False) class SilenceTests(unittest.IsolatedAsyncioTestCase): """Tests for the silence command and its related helper methods.""" @@ -242,7 +309,7 @@ class SilenceTests(unittest.IsolatedAsyncioTestCase): @autospec(silence.Silence, "_reschedule", pass_mocks=False) @autospec(silence, "Scheduler", "SilenceNotifier", pass_mocks=False) def setUp(self) -> None: - self.bot = MockBot() + self.bot = MockBot(get_channel=lambda _: MockTextChannel()) self.cog = silence.Silence(self.bot) self.cog._init_task = asyncio.Future() self.cog._init_task.set_result(None) @@ -252,9 +319,13 @@ class SilenceTests(unittest.IsolatedAsyncioTestCase): asyncio.run(self.cog._async_init()) # Populate instance attributes. - self.channel = MockTextChannel() - self.overwrite = PermissionOverwrite(stream=True, send_messages=True, add_reactions=False) - self.channel.overwrites_for.return_value = self.overwrite + self.text_channel = MockTextChannel() + self.text_overwrite = PermissionOverwrite(stream=True, send_messages=True, add_reactions=False) + self.text_channel.overwrites_for.return_value = self.text_overwrite + + self.voice_channel = MockVoiceChannel() + self.voice_overwrite = PermissionOverwrite(speak=True) + self.voice_channel.overwrites_for.return_value = self.voice_overwrite async def test_sent_correct_message(self): """Appropriate failure/success message was sent by the command.""" @@ -268,7 +339,76 @@ class SilenceTests(unittest.IsolatedAsyncioTestCase): with mock.patch.object(self.cog, "_set_silence_overwrites", return_value=was_silenced): with self.subTest(was_silenced=was_silenced, message=message, duration=duration): await self.cog.silence.callback(self.cog, ctx, duration) - ctx.send.assert_called_once_with(message) + ctx.channel.send.assert_called_once_with(message) + + @mock.patch.object(silence.Silence, "_set_silence_overwrites", return_value=True) + @mock.patch.object(silence.Silence, "_force_voice_sync") + async def test_sent_to_correct_channel(self, voice_sync, _): + """Test function sends messages to the correct channels.""" + text_channel = MockTextChannel() + voice_channel = MockVoiceChannel() + ctx = MockContext() + + test_cases = ( + (None, silence.MSG_SILENCE_SUCCESS.format(duration=10)), + (text_channel, silence.MSG_SILENCE_SUCCESS.format(duration=10)), + (voice_channel, silence.MSG_SILENCE_SUCCESS.format(duration=10)), + (ctx.channel, silence.MSG_SILENCE_SUCCESS.format(duration=10)), + ) + + for target, message in test_cases: + with self.subTest(target_channel=target, message=message): + await self.cog.silence.callback(self.cog, ctx, 10, target, kick=False) + + if ctx.channel == target or target is None: + ctx.channel.send.assert_called_once_with(message) + + else: + ctx.channel.send.assert_called_once_with(message.replace("current channel", target.mention)) + if isinstance(target, MockTextChannel): + target.send.assert_called_once_with(message) + else: + voice_sync.assert_called_once_with(target) + + ctx.channel.send.reset_mock() + if target is not None and isinstance(target, MockTextChannel): + target.send.reset_mock() + + @voice_sync_helper + async def test_sync_called(self, ctx, sync, kick): + """Tests if silence command calls sync on a voice channel.""" + channel = MockVoiceChannel() + await self.cog.silence.callback(self.cog, ctx, 10, channel, kick=False) + + sync.assert_called_once_with(self.cog, channel) + kick.assert_not_called() + + @voice_sync_helper + async def test_kick_called(self, ctx, sync, kick): + """Tests if silence command calls kick on a voice channel.""" + channel = MockVoiceChannel() + await self.cog.silence.callback(self.cog, ctx, 10, channel, kick=True) + + kick.assert_called_once_with(self.cog, channel) + sync.assert_not_called() + + @voice_sync_helper + async def test_sync_not_called(self, ctx, sync, kick): + """Tests that silence command does not call sync on a text channel.""" + channel = MockTextChannel() + await self.cog.silence.callback(self.cog, ctx, 10, channel, kick=False) + + sync.assert_not_called() + kick.assert_not_called() + + @voice_sync_helper + async def test_kick_not_called(self, ctx, sync, kick): + """Tests that silence command does not call kick on a text channel.""" + channel = MockTextChannel() + await self.cog.silence.callback(self.cog, ctx, 10, channel, kick=True) + + sync.assert_not_called() + kick.assert_not_called() async def test_skipped_already_silenced(self): """Permissions were not set and `False` was returned for an already silenced channel.""" @@ -289,19 +429,19 @@ class SilenceTests(unittest.IsolatedAsyncioTestCase): async def test_silenced_channel(self): """Channel had `send_message` and `add_reactions` permissions revoked for verified role.""" - self.assertTrue(await self.cog._set_silence_overwrites(self.channel)) - self.assertFalse(self.overwrite.send_messages) - self.assertFalse(self.overwrite.add_reactions) - self.channel.set_permissions.assert_awaited_once_with( + self.assertTrue(await self.cog._set_silence_overwrites(self.text_channel)) + self.assertFalse(self.text_overwrite.send_messages) + self.assertFalse(self.text_overwrite.add_reactions) + self.text_channel.set_permissions.assert_awaited_once_with( self.cog._everyone_role, - overwrite=self.overwrite + overwrite=self.text_overwrite ) async def test_preserved_other_overwrites(self): """Channel's other unrelated overwrites were not changed.""" - prev_overwrite_dict = dict(self.overwrite) - await self.cog._set_silence_overwrites(self.channel) - new_overwrite_dict = dict(self.overwrite) + prev_overwrite_dict = dict(self.text_overwrite) + await self.cog._set_silence_overwrites(self.text_channel) + new_overwrite_dict = dict(self.text_overwrite) # Remove 'send_messages' & 'add_reactions' keys because they were changed by the method. del prev_overwrite_dict['send_messages'] @@ -332,8 +472,8 @@ class SilenceTests(unittest.IsolatedAsyncioTestCase): async def test_cached_previous_overwrites(self): """Channel's previous overwrites were cached.""" overwrite_json = '{"send_messages": true, "add_reactions": false}' - await self.cog._set_silence_overwrites(self.channel) - self.cog.previous_overwrites.set.assert_called_once_with(self.channel.id, overwrite_json) + await self.cog._set_silence_overwrites(self.text_channel) + self.cog.previous_overwrites.set.assert_called_once_with(self.text_channel.id, overwrite_json) @autospec(silence, "datetime") async def test_cached_unsilence_time(self, datetime_mock): @@ -343,7 +483,7 @@ class SilenceTests(unittest.IsolatedAsyncioTestCase): timestamp = now_timestamp + duration * 60 datetime_mock.now.return_value = datetime.fromtimestamp(now_timestamp, tz=timezone.utc) - ctx = MockContext(channel=self.channel) + ctx = MockContext(channel=self.text_channel) await self.cog.silence.callback(self.cog, ctx, duration) self.cog.unsilence_timestamps.set.assert_awaited_once_with(ctx.channel.id, timestamp) @@ -351,26 +491,34 @@ class SilenceTests(unittest.IsolatedAsyncioTestCase): async def test_cached_indefinite_time(self): """A value of -1 was cached for a permanent silence.""" - ctx = MockContext(channel=self.channel) + ctx = MockContext(channel=self.text_channel) await self.cog.silence.callback(self.cog, ctx, None) self.cog.unsilence_timestamps.set.assert_awaited_once_with(ctx.channel.id, -1) async def test_scheduled_task(self): """An unsilence task was scheduled.""" - ctx = MockContext(channel=self.channel, invoke=mock.MagicMock()) + ctx = MockContext(channel=self.text_channel, invoke=mock.MagicMock()) await self.cog.silence.callback(self.cog, ctx, 5) args = (300, ctx.channel.id, ctx.invoke.return_value) self.cog.scheduler.schedule_later.assert_called_once_with(*args) - ctx.invoke.assert_called_once_with(self.cog.unsilence) + ctx.invoke.assert_called_once_with(self.cog.unsilence, channel=ctx.channel) async def test_permanent_not_scheduled(self): """A task was not scheduled for a permanent silence.""" - ctx = MockContext(channel=self.channel) + ctx = MockContext(channel=self.text_channel) await self.cog.silence.callback(self.cog, ctx, None) self.cog.scheduler.schedule_later.assert_not_called() + async def test_correct_permission_updates(self): + """Tests if _set_silence_overwrites can correctly get and update permissions.""" + self.assertTrue(await self.cog._set_silence_overwrites(self.voice_channel)) + self.assertFalse(self.voice_overwrite.speak) + + self.assertTrue(await self.cog._set_silence_overwrites(self.voice_channel, kick=True)) + self.assertFalse(self.voice_overwrite.speak or self.voice_overwrite.connect) + @autospec(silence.Silence, "unsilence_timestamps", pass_mocks=False) class UnsilenceTests(unittest.IsolatedAsyncioTestCase): @@ -405,13 +553,22 @@ class UnsilenceTests(unittest.IsolatedAsyncioTestCase): (False, silence.MSG_UNSILENCE_MANUAL, PermissionOverwrite(send_messages=False)), (False, silence.MSG_UNSILENCE_MANUAL, PermissionOverwrite(add_reactions=False)), ) + for was_unsilenced, message, overwrite in test_cases: ctx = MockContext() - with self.subTest(was_unsilenced=was_unsilenced, message=message, overwrite=overwrite): + + for target in [None, MockTextChannel()]: + ctx.channel.overwrites_for.return_value = overwrite + if target: + target.overwrites_for.return_value = overwrite + with mock.patch.object(self.cog, "_unsilence", return_value=was_unsilenced): - ctx.channel.overwrites_for.return_value = overwrite - await self.cog.unsilence.callback(self.cog, ctx) - ctx.channel.send.assert_called_once_with(message) + with mock.patch.object(self.cog, "send_message") as send_message: + with self.subTest(was_unsilenced=was_unsilenced, overwrite=overwrite, target=target): + await self.cog.unsilence.callback(self.cog, ctx, channel=target) + + call_args = (message, ctx.channel, target or ctx.channel) + send_message.assert_called_once_with(*call_args, alert_target=was_unsilenced) async def test_skipped_already_unsilenced(self): """Permissions were not set and `False` was returned for an already unsilenced channel.""" @@ -453,6 +610,10 @@ class UnsilenceTests(unittest.IsolatedAsyncioTestCase): await self.cog._unsilence(self.channel) self.cog._mod_alerts_channel.send.assert_awaited_once() + self.cog._mod_alerts_channel.send.reset_mock() + + await self.cog._unsilence(MockVoiceChannel()) + self.cog._mod_alerts_channel.send.assert_awaited_once() async def test_removed_notifier(self): """Channel was removed from `notifier`.""" @@ -491,3 +652,167 @@ class UnsilenceTests(unittest.IsolatedAsyncioTestCase): del new_overwrite_dict['add_reactions'] self.assertDictEqual(prev_overwrite_dict, new_overwrite_dict) + + @mock.patch.object(silence.Silence, "_unsilence", return_value=False) + @mock.patch.object(silence.Silence, "send_message") + async def test_unsilence_helper_fail(self, send_message, _): + """Tests unsilence_wrapper when `_unsilence` fails.""" + ctx = MockContext() + + text_channel = MockTextChannel() + text_role = self.cog.bot.get_guild(Guild.id).default_role + + voice_channel = MockVoiceChannel() + voice_role = self.cog.bot.get_guild(Guild.id).get_role(Roles.voice_verified) + + test_cases = ( + (ctx, text_channel, text_role, True, silence.MSG_UNSILENCE_FAIL), + (ctx, text_channel, text_role, False, silence.MSG_UNSILENCE_MANUAL), + (ctx, voice_channel, voice_role, True, silence.MSG_UNSILENCE_FAIL), + (ctx, voice_channel, voice_role, False, silence.MSG_UNSILENCE_MANUAL), + ) + + class PermClass: + """Class to Mock return permissions""" + def __init__(self, value: bool): + self.val = value + + def __getattr__(self, item): + return self.val + + for context, channel, role, permission, message in test_cases: + with mock.patch.object(channel, "overwrites_for", return_value=PermClass(permission)) as overwrites: + with self.subTest(channel=channel, message=message): + await self.cog._unsilence_wrapper(channel, context) + + overwrites.assert_called_once_with(role) + send_message.assert_called_once_with(message, ctx.channel, channel, alert_target=False) + + send_message.reset_mock() + + @mock.patch.object(silence.Silence, "_force_voice_sync") + @mock.patch.object(silence.Silence, "send_message") + async def test_correct_overwrites(self, send_message, _): + """Tests the overwrites returned by the _unsilence_wrapper are correct for voice and text channels.""" + ctx = MockContext() + + text_channel = MockTextChannel() + text_role = self.cog.bot.get_guild(Guild.id).default_role + + voice_channel = MockVoiceChannel() + voice_role = self.cog.bot.get_guild(Guild.id).get_role(Roles.voice_verified) + + async def reset(): + await text_channel.set_permissions(text_role, PermissionOverwrite(send_messages=False, add_reactions=False)) + await voice_channel.set_permissions(voice_role, PermissionOverwrite(speak=False, connect=False)) + + text_channel.reset_mock() + voice_channel.reset_mock() + send_message.reset_mock() + await reset() + + default_text_overwrites = text_channel.overwrites_for(text_role) + default_voice_overwrites = voice_channel.overwrites_for(voice_role) + + test_cases = ( + (ctx, text_channel, text_role, default_text_overwrites, silence.MSG_UNSILENCE_SUCCESS), + (ctx, voice_channel, voice_role, default_voice_overwrites, silence.MSG_UNSILENCE_SUCCESS), + (ctx, ctx.channel, text_role, ctx.channel.overwrites_for(text_role), silence.MSG_UNSILENCE_SUCCESS), + (None, text_channel, text_role, default_text_overwrites, silence.MSG_UNSILENCE_SUCCESS), + ) + + for context, channel, role, overwrites, message in test_cases: + with self.subTest(ctx=context, channel=channel): + await self.cog._unsilence_wrapper(channel, context) + + if context is None: + send_message.assert_called_once_with(message, channel, channel, alert_target=True) + else: + send_message.assert_called_once_with(message, context.channel, channel, alert_target=True) + + channel.set_permissions.assert_called_once_with(role, overwrite=overwrites) + if channel != ctx.channel: + ctx.channel.send.assert_not_called() + + await reset() + + +class SendMessageTests(unittest.IsolatedAsyncioTestCase): + """Unittests for the send message helper function.""" + + def setUp(self) -> None: + self.bot = MockBot() + self.cog = silence.Silence(self.bot) + + self.text_channels = [MockTextChannel() for _ in range(2)] + self.bot.get_channel.return_value = self.text_channels[1] + + self.voice_channel = MockVoiceChannel() + + async def test_send_to_channel(self): + """Tests a basic case for the send function.""" + message = "Test basic message." + await self.cog.send_message(message, *self.text_channels, alert_target=False) + + self.text_channels[0].send.assert_called_once_with(message) + self.text_channels[1].send.assert_not_called() + + async def test_send_to_multiple_channels(self): + """Tests sending messages to two channels.""" + message = "Test basic message." + await self.cog.send_message(message, *self.text_channels, alert_target=True) + + self.text_channels[0].send.assert_called_once_with(message) + self.text_channels[1].send.assert_called_once_with(message) + + async def test_duration_replacement(self): + """Tests that the channel name was set correctly for one target channel.""" + message = "Current. The following should be replaced: current channel." + await self.cog.send_message(message, *self.text_channels, alert_target=False) + + updated_message = message.replace("current channel", self.text_channels[0].mention) + self.text_channels[0].send.assert_called_once_with(updated_message) + self.text_channels[1].send.assert_not_called() + + async def test_name_replacement_multiple_channels(self): + """Tests that the channel name was set correctly for two channels.""" + message = "Current. The following should be replaced: current channel." + await self.cog.send_message(message, *self.text_channels, alert_target=True) + + updated_message = message.replace("current channel", self.text_channels[0].mention) + self.text_channels[0].send.assert_called_once_with(updated_message) + self.text_channels[1].send.assert_called_once_with(message) + + async def test_silence_voice(self): + """Tests that the correct message was sent when a voice channel is muted without alerting.""" + message = "This should show up just here." + await self.cog.send_message(message, self.text_channels[0], self.voice_channel, alert_target=False) + self.text_channels[0].send.assert_called_once_with(message) + + async def test_silence_voice_alert(self): + """Tests that the correct message was sent when a voice channel is muted with alerts.""" + with unittest.mock.patch.object(silence, "VOICE_CHANNELS") as mock_voice_channels: + mock_voice_channels.get.return_value = self.text_channels[1].id + + message = "This should show up as current channel." + await self.cog.send_message(message, self.text_channels[0], self.voice_channel, alert_target=True) + + updated_message = message.replace("current channel", self.voice_channel.mention) + self.text_channels[0].send.assert_called_once_with(updated_message) + self.text_channels[1].send.assert_called_once_with(updated_message) + + mock_voice_channels.get.assert_called_once_with(self.voice_channel.id) + + async def test_silence_voice_sibling_channel(self): + """Tests silencing a voice channel from the related text channel.""" + with unittest.mock.patch.object(silence, "VOICE_CHANNELS") as mock_voice_channels: + mock_voice_channels.get.return_value = self.text_channels[1].id + + message = "This should show up as current channel." + await self.cog.send_message(message, self.text_channels[1], self.voice_channel, alert_target=True) + + updated_message = message.replace("current channel", self.voice_channel.mention) + self.text_channels[1].send.assert_called_once_with(updated_message) + + mock_voice_channels.get.assert_called_once_with(self.voice_channel.id) + self.bot.get_channel.assert_called_once_with(self.text_channels[1].id) diff --git a/tests/helpers.py b/tests/helpers.py index 496363ae3..529664e67 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -16,7 +16,6 @@ from bot.async_stats import AsyncStatsClient from bot.bot import Bot from tests._autospec import autospec # noqa: F401 other modules import it via this module - for logger in logging.Logger.manager.loggerDict.values(): # Set all loggers to CRITICAL by default to prevent screen clutter during testing @@ -320,7 +319,10 @@ channel_data = { } state = unittest.mock.MagicMock() guild = unittest.mock.MagicMock() -channel_instance = discord.TextChannel(state=state, guild=guild, data=channel_data) +text_channel_instance = discord.TextChannel(state=state, guild=guild, data=channel_data) + +channel_data["type"] = "VoiceChannel" +voice_channel_instance = discord.VoiceChannel(state=state, guild=guild, data=channel_data) class MockTextChannel(CustomMockMixin, unittest.mock.Mock, HashableMixin): @@ -330,7 +332,24 @@ class MockTextChannel(CustomMockMixin, unittest.mock.Mock, HashableMixin): Instances of this class will follow the specifications of `discord.TextChannel` instances. For more information, see the `MockGuild` docstring. """ - spec_set = channel_instance + spec_set = text_channel_instance + + def __init__(self, **kwargs) -> None: + default_kwargs = {'id': next(self.discord_id), 'name': 'channel', 'guild': MockGuild()} + super().__init__(**collections.ChainMap(kwargs, default_kwargs)) + + if 'mention' not in kwargs: + self.mention = f"#{self.name}" + + +class MockVoiceChannel(CustomMockMixin, unittest.mock.Mock, HashableMixin): + """ + A MagicMock subclass to mock VoiceChannel objects. + + Instances of this class will follow the specifications of `discord.VoiceChannel` instances. For + more information, see the `MockGuild` docstring. + """ + spec_set = voice_channel_instance def __init__(self, **kwargs) -> None: default_kwargs = {'id': next(self.discord_id), 'name': 'channel', 'guild': MockGuild()} |