diff options
| -rw-r--r-- | tests/bot/exts/moderation/test_silence.py | 212 | 
1 files changed, 211 insertions, 1 deletions
diff --git a/tests/bot/exts/moderation/test_silence.py b/tests/bot/exts/moderation/test_silence.py index 577725071..1d05ee357 100644 --- a/tests/bot/exts/moderation/test_silence.py +++ b/tests/bot/exts/moderation/test_silence.py @@ -9,7 +9,7 @@ from discord import PermissionOverwrite  from bot.constants import Channels, Emojis, Guild, Roles  from bot.exts.moderation import silence -from tests.helpers import MockBot, MockContext, MockTextChannel, MockVoiceChannel, autospec +from tests.helpers import MockBot, MockContext, MockGuild, MockMember, MockTextChannel, MockVoiceChannel, autospec  redis_session = None  redis_loop = asyncio.get_event_loop() @@ -237,6 +237,7 @@ class SilenceCogTests(unittest.IsolatedAsyncioTestCase):              text_channel_2.send.assert_called_once_with(f"This should show up as {voice_channel.mention}")      async def test_get_related_text_channel(self): +        """Tests the helper function that connects voice to text channels."""          voice_channel = MockVoiceChannel()          tests = ( @@ -255,6 +256,72 @@ class SilenceCogTests(unittest.IsolatedAsyncioTestCase):                  result_id = await self.cog._get_related_text_channel(voice_channel)                  self.assertEqual(result_id, channel_id) +    async def test_force_voice_sync(self): +        """Tests the _force_voice_sync helper function.""" +        await self.cog._async_init() + +        afk_channel = MockVoiceChannel() +        channel = MockVoiceChannel(guild=MockGuild(afk_channel=afk_channel)) + +        members = [] +        for _ in range(10): +            members.append(MockMember()) + +        channel.members = members +        test_cases = ( +            (members[0], False, "Muting member."), +            (members[0], True, "Kicking voice channel member."), +            (None, False, "Muting member."), +            (None, True, "Kicking voice channel member."), +        ) + +        for member, kick, reason in test_cases: +            with self.subTest(members=member, kick=kick, reason=reason): +                await self.cog._force_voice_sync(channel, member, kick) + +                for single_member in channel.members if member is None else [member]: +                    if kick: +                        single_member.move_to.assert_called_once_with(None, reason=reason) +                    else: +                        self.assertEqual(single_member.move_to.call_count, 2) +                        single_member.move_to.assert_has_calls([ +                            mock.call(afk_channel, reason=reason), +                            mock.call(channel, reason=reason) +                        ], any_order=False) + +                    single_member.reset_mock() + +    async def test_force_voice_sync_staff(self): +        """Tests to ensure _force_voice_sync does not kick staff members.""" +        await self.cog._async_init() +        member = MockMember(roles=[self.cog._helper_role]) + +        await self.cog._force_voice_sync(MockVoiceChannel(), member) +        member.move_to.assert_not_called() + +    async def test_force_voice_sync_no_channel(self): +        """Test to ensure _force_voice_sync can create its own voice channel if one is not available.""" +        await self.cog._async_init() + +        member = MockMember() +        channel = MockVoiceChannel(guild=MockGuild(afk_channel=None)) + +        new_channel = MockVoiceChannel(delete=Mock()) +        channel.guild.create_voice_channel.return_value = new_channel + +        with mock.patch.object(self.cog.scheduler, "schedule_later") as scheduler: +            await self.cog._force_voice_sync(channel, member) + +            # Check channel creation +            overwrites = { +                channel.guild.default_role: PermissionOverwrite(speak=False, connect=False, view_channel=False) +            } +            channel.guild.create_voice_channel.assert_called_once_with("mute-temp", overwrites=overwrites) + +            # Check bot queued deletion +            new_channel.delete.assert_called_once_with(reason="Deleting temp mute channel.") +            scheduler.assert_called_once_with(30, new_channel.id, new_channel.delete()) +  @autospec(silence.Silence, "previous_overwrites", "unsilence_timestamps", pass_mocks=False)  class RescheduleTests(unittest.IsolatedAsyncioTestCase): @@ -366,6 +433,31 @@ class SilenceTests(unittest.IsolatedAsyncioTestCase):                      await self.cog.silence.callback(self.cog, ctx, duration)                      ctx.channel.send.assert_called_once_with(message) +    async def test_sent_to_correct_channel(self): +        """Test function sends messages to the correct channels.""" +        text_channel = MockTextChannel() +        ctx = MockContext() + +        test_cases = ( +            (None, silence.MSG_SILENCE_SUCCESS.format(duration=10)), +            (text_channel, silence.MSG_SILENCE_SUCCESS.format(duration=10)), +            (ctx.channel, silence.MSG_SILENCE_SUCCESS.format(duration=10)), +        ) + +        for target, message in test_cases: +            with mock.patch.object(self.cog, "_set_silence_overwrites", return_value=True): +                with self.subTest(target_channel=target, message=message): +                    await self.cog.silence.callback(self.cog, ctx, 10, False, channel=target) +                    if ctx.channel == target or target is None: +                        ctx.channel.send.assert_called_once_with(message) +                    else: +                        ctx.channel.send.assert_called_once_with(message.replace("current", text_channel.mention)) +                        target.send.assert_called_once_with(message) + +            ctx.channel.send.reset_mock() +            if target is not None: +                target.send.reset_mock() +      async def test_skipped_already_silenced(self):          """Permissions were not set and `False` was returned for an already silenced channel."""          subtests = ( @@ -509,6 +601,40 @@ class UnsilenceTests(unittest.IsolatedAsyncioTestCase):                      await self.cog.unsilence.callback(self.cog, ctx)                      ctx.channel.send.assert_called_once_with(message) +    async def test_sent_to_correct_channel(self): +        """Test function sends messages to the correct channels.""" +        unsilenced_overwrite = PermissionOverwrite(send_messages=True, add_reactions=True) +        text_channel = MockTextChannel() +        ctx = MockContext() + +        test_cases = ( +            (None, silence.MSG_UNSILENCE_SUCCESS.format(duration=10)), +            (text_channel, silence.MSG_UNSILENCE_SUCCESS.format(duration=10)), +            (ctx.channel, silence.MSG_UNSILENCE_SUCCESS.format(duration=10)), +        ) + +        for target, message in test_cases: +            with self.subTest(target_channel=target, message=message): +                with mock.patch.object(self.cog, "_unsilence", return_value=True): +                    # Assign Return +                    if ctx.channel == target or target is None: +                        ctx.channel.overwrites_for.return_value = unsilenced_overwrite +                    else: +                        target.overwrites_for.return_value = unsilenced_overwrite + +                    await self.cog.unsilence.callback(self.cog, ctx, channel=target) + +                    # Check Messages +                    if ctx.channel == target or target is None: +                        ctx.channel.send.assert_called_once_with(message) +                    else: +                        ctx.channel.send.assert_called_once_with(message.replace("current", text_channel.mention)) +                        target.send.assert_called_once_with(message) + +            ctx.channel.send.reset_mock() +            if target is not None: +                target.send.reset_mock() +      async def test_skipped_already_unsilenced(self):          """Permissions were not set and `False` was returned for an already unsilenced channel."""          self.cog.scheduler.__contains__.return_value = False @@ -549,6 +675,10 @@ class UnsilenceTests(unittest.IsolatedAsyncioTestCase):          await self.cog._unsilence(self.channel)          self.cog._mod_alerts_channel.send.assert_awaited_once() +        self.cog._mod_alerts_channel.send.reset_mock() + +        await self.cog._unsilence(MockVoiceChannel()) +        self.cog._mod_alerts_channel.send.assert_awaited_once()      async def test_removed_notifier(self):          """Channel was removed from `notifier`.""" @@ -587,3 +717,83 @@ class UnsilenceTests(unittest.IsolatedAsyncioTestCase):                  del new_overwrite_dict['add_reactions']                  self.assertDictEqual(prev_overwrite_dict, new_overwrite_dict) + +    async def test_unsilence_helper_fail(self): +        """Tests unsilence_wrapper when `_unsilence` fails.""" +        ctx = MockContext() + +        text_channel = MockTextChannel() +        text_role = self.cog.bot.get_guild(Guild.id).get_role(Roles.verified) + +        voice_channel = MockVoiceChannel() +        voice_role = self.cog.bot.get_guild(Guild.id).get_role(Roles.voice_verified) + +        test_cases = ( +            (ctx, text_channel, text_role, True, silence.MSG_UNSILENCE_FAIL), +            (ctx, text_channel, text_role, False, silence.MSG_UNSILENCE_MANUAL), +            (ctx, voice_channel, voice_role, True, silence.MSG_UNSILENCE_FAIL), +            (ctx, voice_channel, voice_role, False, silence.MSG_UNSILENCE_MANUAL), +        ) + +        class PermClass: +            """Class to Mock return permissions""" +            def __init__(self, value: bool): +                self.val = value + +            def __getattr__(self, item): +                return self.val + +        for context, channel, role, permission, message in test_cases: +            with self.subTest(channel=channel, message=message): +                with mock.patch.object(channel, "overwrites_for", return_value=PermClass(permission)) as overwrites: +                    with mock.patch.object(self.cog, "send_message") as send_message: +                        with mock.patch.object(self.cog, "_unsilence", return_value=False): +                            await self.cog._unsilence_wrapper(channel, context) + +                            overwrites.assert_called_once_with(role) +                            send_message.assert_called_once_with(message, ctx.channel, channel) + +    async def test_correct_overwrites(self): +        """Tests the overwrites returned by the _unsilence_wrapper are correct for voice and text channels.""" +        ctx = MockContext() + +        text_channel = MockTextChannel() +        text_role = self.cog.bot.get_guild(Guild.id).get_role(Roles.verified) + +        voice_channel = MockVoiceChannel() +        voice_role = self.cog.bot.get_guild(Guild.id).get_role(Roles.voice_verified) + +        async def reset(): +            await text_channel.set_permissions(text_role, PermissionOverwrite(send_messages=False, add_reactions=False)) +            await voice_channel.set_permissions(voice_role, PermissionOverwrite(speak=False, connect=False)) + +            text_channel.reset_mock() +            voice_channel.reset_mock() +        await reset() + +        default_text_overwrites = text_channel.overwrites_for(text_role) +        default_voice_overwrites = voice_channel.overwrites_for(voice_role) + +        test_cases = ( +            (ctx, text_channel, text_role, default_text_overwrites, silence.MSG_UNSILENCE_SUCCESS), +            (ctx, voice_channel, voice_role, default_voice_overwrites, silence.MSG_UNSILENCE_SUCCESS), +            (ctx, ctx.channel, text_role, ctx.channel.overwrites_for(text_role), silence.MSG_UNSILENCE_SUCCESS), +            (None, text_channel, text_role, default_text_overwrites, silence.MSG_UNSILENCE_SUCCESS), +        ) + +        for context, channel, role, overwrites, message in test_cases: +            with self.subTest(ctx=context, channel=channel): +                with mock.patch.object(self.cog, "send_message") as send_message: +                    with mock.patch.object(self.cog, "_force_voice_sync"): +                        await self.cog._unsilence_wrapper(channel, context) + +                        if context is None: +                            send_message.assert_called_once_with(message, channel, channel, True) +                        else: +                            send_message.assert_called_once_with(message, context.channel, channel, True) + +                        channel.set_permissions.assert_called_once_with(role, overwrite=overwrites) +                        if channel != ctx.channel: +                            ctx.channel.assert_not_called() + +            await reset()  |