diff options
author | 2020-11-28 20:14:48 +0300 | |
---|---|---|
committer | 2020-11-28 20:14:48 +0300 | |
commit | a6c8a9aca63f7d40d6c5701a08626da198c1d54a (patch) | |
tree | 236fa95603e1e81d0d962e1a14bab3af91c132c9 | |
parent | Clarifies Constants Use in Silence (diff) |
Refractors Voice Sync Helper
Refractors the voice sync helper function into two different functions,
one for each purpose. Moves the afk_channel get/creation code to its own
function. Updates tests.
Signed-off-by: Hassan Abouelela <[email protected]>
-rw-r--r-- | bot/exts/moderation/silence.py | 88 | ||||
-rw-r--r-- | tests/bot/exts/moderation/test_silence.py | 124 |
2 files changed, 121 insertions, 91 deletions
diff --git a/bot/exts/moderation/silence.py b/bot/exts/moderation/silence.py index d1db0da9b..9b3725326 100644 --- a/bot/exts/moderation/silence.py +++ b/bot/exts/moderation/silence.py @@ -174,7 +174,10 @@ class Silence(commands.Cog): return if isinstance(channel, VoiceChannel): - await self._force_voice_sync(channel, kick=kick) + if kick: + await self._kick_voice_members(channel) + else: + await self._force_voice_sync(channel) await self._schedule_unsilence(ctx, channel, duration) @@ -252,56 +255,57 @@ class Silence(commands.Cog): return True - async def _force_voice_sync( - self, channel: VoiceChannel, member: Optional[Member] = None, kick: bool = False - ) -> None: - """ - Move all non-staff members from `channel` to a temporary channel and back to force toggle role mute. - - If `member` is passed, the mute only occurs to that member. - Permission modification has to happen before this function. - - If `kick_all` is True, members will not be added back to the voice channel. - """ - # Handle member picking logic - if member is not None: - members = [member] - else: - members = channel.members - - # Handle kick logic - if kick: - for member in members: - await member.move_to(None, reason="Kicking voice channel member.") + @staticmethod + async def _get_afk_channel(guild: Guild) -> VoiceChannel: + """Get a guild's AFK channel, or create one if it does not exist.""" + afk_channel = guild.afk_channel - log.debug(f"Kicked all members from #{channel.name} ({channel.id}).") - return - - # Obtain temporary channel - afk_channel = channel.guild.afk_channel if afk_channel is None: overwrites = { - channel.guild.default_role: PermissionOverwrite(speak=False, connect=False, view_channel=False) + guild.default_role: PermissionOverwrite(speak=False, connect=False, view_channel=False) } - afk_channel = await channel.guild.create_voice_channel("mute-temp", overwrites=overwrites) + afk_channel = await guild.create_voice_channel("mute-temp", overwrites=overwrites) log.info(f"Failed to get afk-channel, created temporary channel #{afk_channel} ({afk_channel.id})") - # Schedule channel deletion in case function errors out - self.scheduler.schedule_later( - 30, afk_channel.id, afk_channel.delete(reason="Deleting temp mute channel.") - ) + return afk_channel - # Move all members to temporary channel and back - for member in members: - # Skip staff - if self._helper_role in member.roles: - continue + async def _kick_voice_members(self, channel: VoiceChannel) -> None: + """Remove all non-staff members from a voice channel.""" + log.debug(f"Removing all non staff members from #{channel.name} ({channel.id}).") + + for member in channel.members: + if self._helper_role not in member.roles: + await member.move_to(None, reason="Kicking member from voice channel.") - await member.move_to(afk_channel, reason="Muting member.") - log.debug(f"Moved {member.name} to afk channel.") + log.debug("Removed all members.") - await member.move_to(channel, reason="Muting member.") - log.debug(f"Moved {member.name} to original voice channel.") + async def _force_voice_sync(self, channel: VoiceChannel) -> None: + """ + Move all non-staff members from `channel` to a temporary channel and back to force toggle role mute. + + Permission modification has to happen before this function. + """ + # Obtain temporary channel + delete_channel = channel.guild.afk_channel is None + afk_channel = await self._get_afk_channel(channel.guild) + + try: + # Move all members to temporary channel and back + for member in channel.members: + # Skip staff + if self._helper_role in member.roles: + continue + + await member.move_to(afk_channel, reason="Muting VC member.") + log.debug(f"Moved {member.name} to afk channel.") + + await member.move_to(channel, reason="Muting VC member.") + log.debug(f"Moved {member.name} to original voice channel.") + + finally: + # Delete VC channel if it was created. + if delete_channel: + await afk_channel.delete(reason="Deleting temp mute channel.") async def _schedule_unsilence( self, ctx: Context, channel: Union[TextChannel, VoiceChannel], duration: Optional[int] diff --git a/tests/bot/exts/moderation/test_silence.py b/tests/bot/exts/moderation/test_silence.py index bff2888b9..9fb3e404a 100644 --- a/tests/bot/exts/moderation/test_silence.py +++ b/tests/bot/exts/moderation/test_silence.py @@ -2,7 +2,7 @@ import asyncio import unittest from datetime import datetime, timezone from unittest import mock -from unittest.mock import Mock +from unittest.mock import AsyncMock, Mock from async_rediscache import RedisSession from discord import PermissionOverwrite @@ -266,67 +266,69 @@ class SilenceCogTests(unittest.IsolatedAsyncioTestCase): """Tests the _force_voice_sync helper function.""" await self.cog._async_init() - afk_channel = MockVoiceChannel() - channel = MockVoiceChannel(guild=MockGuild(afk_channel=afk_channel)) - members = [] for _ in range(10): members.append(MockMember()) - channel.members = members - test_cases = ( - (members[0], False, "Muting member."), - (members[0], True, "Kicking voice channel member."), - (None, False, "Muting member."), - (None, True, "Kicking voice channel member."), - ) - - for member, kick, reason in test_cases: - with self.subTest(members=member, kick=kick, reason=reason): - await self.cog._force_voice_sync(channel, member, kick) - - for single_member in channel.members if member is None else [member]: - if kick: - single_member.move_to.assert_called_once_with(None, reason=reason) - else: - self.assertEqual(single_member.move_to.call_count, 2) - single_member.move_to.assert_has_calls([ - mock.call(afk_channel, reason=reason), - mock.call(channel, reason=reason) - ], any_order=False) + afk_channel = MockVoiceChannel() + channel = MockVoiceChannel(guild=MockGuild(afk_channel=afk_channel), members=members) - single_member.reset_mock() + await self.cog._force_voice_sync(channel) + for member in members: + self.assertEqual(member.move_to.call_count, 2) + member.move_to.assert_has_calls([ + mock.call(afk_channel, reason="Muting VC member."), + mock.call(channel, reason="Muting VC member.") + ], any_order=False) async def test_force_voice_sync_staff(self): """Tests to ensure _force_voice_sync does not kick staff members.""" await self.cog._async_init() member = MockMember(roles=[self.cog._helper_role]) - await self.cog._force_voice_sync(MockVoiceChannel(), member) + await self.cog._force_voice_sync(MockVoiceChannel(members=[member])) member.move_to.assert_not_called() async def test_force_voice_sync_no_channel(self): """Test to ensure _force_voice_sync can create its own voice channel if one is not available.""" await self.cog._async_init() - member = MockMember() channel = MockVoiceChannel(guild=MockGuild(afk_channel=None)) - - new_channel = MockVoiceChannel(delete=Mock()) + new_channel = MockVoiceChannel(delete=AsyncMock()) channel.guild.create_voice_channel.return_value = new_channel - with mock.patch.object(self.cog.scheduler, "schedule_later") as scheduler: - await self.cog._force_voice_sync(channel, member) + await self.cog._force_voice_sync(channel) + + # Check channel creation + overwrites = { + channel.guild.default_role: PermissionOverwrite(speak=False, connect=False, view_channel=False) + } + channel.guild.create_voice_channel.assert_called_once_with("mute-temp", overwrites=overwrites) + + # Check bot deleted channel + new_channel.delete.assert_called_once_with(reason="Deleting temp mute channel.") + + async def test_voice_kick(self): + """Test to ensure kick function can remove all members from a voice channel.""" + await self.cog._async_init() + + members = [] + for _ in range(10): + members.append(MockMember()) + + channel = MockVoiceChannel(members=members) + await self.cog._kick_voice_members(channel) + + for member in members: + member.move_to.assert_called_once_with(None, reason="Kicking member from voice channel.") - # Check channel creation - overwrites = { - channel.guild.default_role: PermissionOverwrite(speak=False, connect=False, view_channel=False) - } - channel.guild.create_voice_channel.assert_called_once_with("mute-temp", overwrites=overwrites) + async def test_voice_kick_staff(self): + """Test to ensure voice kick skips staff members.""" + await self.cog._async_init() + member = MockMember(roles=[self.cog._helper_role]) - # Check bot queued deletion - new_channel.delete.assert_called_once_with(reason="Deleting temp mute channel.") - scheduler.assert_called_once_with(30, new_channel.id, new_channel.delete()) + await self.cog._kick_voice_members(MockVoiceChannel(members=[member])) + member.move_to.assert_not_called() @autospec(silence.Silence, "previous_overwrites", "unsilence_timestamps", pass_mocks=False) @@ -457,21 +459,45 @@ class SilenceTests(unittest.IsolatedAsyncioTestCase): ) for target, message in test_cases: - with mock.patch.object(self.cog, "_set_silence_overwrites", return_value=True): - with self.subTest(target_channel=target, message=message): - await self.cog.silence.callback(self.cog, ctx, 10, False, channel=target) - if ctx.channel == target or target is None: - ctx.channel.send.assert_called_once_with(message) + with mock.patch.object(self.cog, "_force_voice_sync") as voice_sync: + with mock.patch.object(self.cog, "_set_silence_overwrites", return_value=True): + with self.subTest(target_channel=target, message=message): + await self.cog.silence.callback(self.cog, ctx, 10, False, channel=target) + if ctx.channel == target or target is None: + ctx.channel.send.assert_called_once_with(message) - else: - ctx.channel.send.assert_called_once_with(message.replace("current channel", target.mention)) - if isinstance(target, MockTextChannel): - target.send.assert_called_once_with(message) + else: + ctx.channel.send.assert_called_once_with(message.replace("current channel", target.mention)) + if isinstance(target, MockTextChannel): + target.send.assert_called_once_with(message) + else: + voice_sync.assert_called_once_with(target) ctx.channel.send.reset_mock() if target is not None and isinstance(target, MockTextChannel): target.send.reset_mock() + @mock.patch.object(silence.Silence, "_kick_voice_members") + @mock.patch.object(silence.Silence, "_force_voice_sync") + async def test_sync_or_kick_called(self, sync, kick): + """Tests if silence command calls kick or sync on voice channels when appropriate.""" + channel = MockVoiceChannel() + ctx = MockContext() + + with mock.patch.object(self.cog, "_set_silence_overwrites", return_value=True): + with self.subTest("Test calls kick"): + await self.cog.silence.callback(self.cog, ctx, 10, kick=True, channel=channel) + kick.assert_called_once_with(channel) + sync.assert_not_called() + + kick.reset_mock() + sync.reset_mock() + + with self.subTest("Test calls sync"): + await self.cog.silence.callback(self.cog, ctx, 10, kick=False, channel=channel) + sync.assert_called_once_with(channel) + kick.assert_not_called() + async def test_skipped_already_silenced(self): """Permissions were not set and `False` was returned for an already silenced channel.""" subtests = ( |