aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar Hassan Abouelela <[email protected]>2020-11-28 20:14:48 +0300
committerGravatar Hassan Abouelela <[email protected]>2020-11-28 20:14:48 +0300
commita6c8a9aca63f7d40d6c5701a08626da198c1d54a (patch)
tree236fa95603e1e81d0d962e1a14bab3af91c132c9
parentClarifies Constants Use in Silence (diff)
Refractors Voice Sync Helper
Refractors the voice sync helper function into two different functions, one for each purpose. Moves the afk_channel get/creation code to its own function. Updates tests. Signed-off-by: Hassan Abouelela <[email protected]>
-rw-r--r--bot/exts/moderation/silence.py88
-rw-r--r--tests/bot/exts/moderation/test_silence.py124
2 files changed, 121 insertions, 91 deletions
diff --git a/bot/exts/moderation/silence.py b/bot/exts/moderation/silence.py
index d1db0da9b..9b3725326 100644
--- a/bot/exts/moderation/silence.py
+++ b/bot/exts/moderation/silence.py
@@ -174,7 +174,10 @@ class Silence(commands.Cog):
return
if isinstance(channel, VoiceChannel):
- await self._force_voice_sync(channel, kick=kick)
+ if kick:
+ await self._kick_voice_members(channel)
+ else:
+ await self._force_voice_sync(channel)
await self._schedule_unsilence(ctx, channel, duration)
@@ -252,56 +255,57 @@ class Silence(commands.Cog):
return True
- async def _force_voice_sync(
- self, channel: VoiceChannel, member: Optional[Member] = None, kick: bool = False
- ) -> None:
- """
- Move all non-staff members from `channel` to a temporary channel and back to force toggle role mute.
-
- If `member` is passed, the mute only occurs to that member.
- Permission modification has to happen before this function.
-
- If `kick_all` is True, members will not be added back to the voice channel.
- """
- # Handle member picking logic
- if member is not None:
- members = [member]
- else:
- members = channel.members
-
- # Handle kick logic
- if kick:
- for member in members:
- await member.move_to(None, reason="Kicking voice channel member.")
+ @staticmethod
+ async def _get_afk_channel(guild: Guild) -> VoiceChannel:
+ """Get a guild's AFK channel, or create one if it does not exist."""
+ afk_channel = guild.afk_channel
- log.debug(f"Kicked all members from #{channel.name} ({channel.id}).")
- return
-
- # Obtain temporary channel
- afk_channel = channel.guild.afk_channel
if afk_channel is None:
overwrites = {
- channel.guild.default_role: PermissionOverwrite(speak=False, connect=False, view_channel=False)
+ guild.default_role: PermissionOverwrite(speak=False, connect=False, view_channel=False)
}
- afk_channel = await channel.guild.create_voice_channel("mute-temp", overwrites=overwrites)
+ afk_channel = await guild.create_voice_channel("mute-temp", overwrites=overwrites)
log.info(f"Failed to get afk-channel, created temporary channel #{afk_channel} ({afk_channel.id})")
- # Schedule channel deletion in case function errors out
- self.scheduler.schedule_later(
- 30, afk_channel.id, afk_channel.delete(reason="Deleting temp mute channel.")
- )
+ return afk_channel
- # Move all members to temporary channel and back
- for member in members:
- # Skip staff
- if self._helper_role in member.roles:
- continue
+ async def _kick_voice_members(self, channel: VoiceChannel) -> None:
+ """Remove all non-staff members from a voice channel."""
+ log.debug(f"Removing all non staff members from #{channel.name} ({channel.id}).")
+
+ for member in channel.members:
+ if self._helper_role not in member.roles:
+ await member.move_to(None, reason="Kicking member from voice channel.")
- await member.move_to(afk_channel, reason="Muting member.")
- log.debug(f"Moved {member.name} to afk channel.")
+ log.debug("Removed all members.")
- await member.move_to(channel, reason="Muting member.")
- log.debug(f"Moved {member.name} to original voice channel.")
+ async def _force_voice_sync(self, channel: VoiceChannel) -> None:
+ """
+ Move all non-staff members from `channel` to a temporary channel and back to force toggle role mute.
+
+ Permission modification has to happen before this function.
+ """
+ # Obtain temporary channel
+ delete_channel = channel.guild.afk_channel is None
+ afk_channel = await self._get_afk_channel(channel.guild)
+
+ try:
+ # Move all members to temporary channel and back
+ for member in channel.members:
+ # Skip staff
+ if self._helper_role in member.roles:
+ continue
+
+ await member.move_to(afk_channel, reason="Muting VC member.")
+ log.debug(f"Moved {member.name} to afk channel.")
+
+ await member.move_to(channel, reason="Muting VC member.")
+ log.debug(f"Moved {member.name} to original voice channel.")
+
+ finally:
+ # Delete VC channel if it was created.
+ if delete_channel:
+ await afk_channel.delete(reason="Deleting temp mute channel.")
async def _schedule_unsilence(
self, ctx: Context, channel: Union[TextChannel, VoiceChannel], duration: Optional[int]
diff --git a/tests/bot/exts/moderation/test_silence.py b/tests/bot/exts/moderation/test_silence.py
index bff2888b9..9fb3e404a 100644
--- a/tests/bot/exts/moderation/test_silence.py
+++ b/tests/bot/exts/moderation/test_silence.py
@@ -2,7 +2,7 @@ import asyncio
import unittest
from datetime import datetime, timezone
from unittest import mock
-from unittest.mock import Mock
+from unittest.mock import AsyncMock, Mock
from async_rediscache import RedisSession
from discord import PermissionOverwrite
@@ -266,67 +266,69 @@ class SilenceCogTests(unittest.IsolatedAsyncioTestCase):
"""Tests the _force_voice_sync helper function."""
await self.cog._async_init()
- afk_channel = MockVoiceChannel()
- channel = MockVoiceChannel(guild=MockGuild(afk_channel=afk_channel))
-
members = []
for _ in range(10):
members.append(MockMember())
- channel.members = members
- test_cases = (
- (members[0], False, "Muting member."),
- (members[0], True, "Kicking voice channel member."),
- (None, False, "Muting member."),
- (None, True, "Kicking voice channel member."),
- )
-
- for member, kick, reason in test_cases:
- with self.subTest(members=member, kick=kick, reason=reason):
- await self.cog._force_voice_sync(channel, member, kick)
-
- for single_member in channel.members if member is None else [member]:
- if kick:
- single_member.move_to.assert_called_once_with(None, reason=reason)
- else:
- self.assertEqual(single_member.move_to.call_count, 2)
- single_member.move_to.assert_has_calls([
- mock.call(afk_channel, reason=reason),
- mock.call(channel, reason=reason)
- ], any_order=False)
+ afk_channel = MockVoiceChannel()
+ channel = MockVoiceChannel(guild=MockGuild(afk_channel=afk_channel), members=members)
- single_member.reset_mock()
+ await self.cog._force_voice_sync(channel)
+ for member in members:
+ self.assertEqual(member.move_to.call_count, 2)
+ member.move_to.assert_has_calls([
+ mock.call(afk_channel, reason="Muting VC member."),
+ mock.call(channel, reason="Muting VC member.")
+ ], any_order=False)
async def test_force_voice_sync_staff(self):
"""Tests to ensure _force_voice_sync does not kick staff members."""
await self.cog._async_init()
member = MockMember(roles=[self.cog._helper_role])
- await self.cog._force_voice_sync(MockVoiceChannel(), member)
+ await self.cog._force_voice_sync(MockVoiceChannel(members=[member]))
member.move_to.assert_not_called()
async def test_force_voice_sync_no_channel(self):
"""Test to ensure _force_voice_sync can create its own voice channel if one is not available."""
await self.cog._async_init()
- member = MockMember()
channel = MockVoiceChannel(guild=MockGuild(afk_channel=None))
-
- new_channel = MockVoiceChannel(delete=Mock())
+ new_channel = MockVoiceChannel(delete=AsyncMock())
channel.guild.create_voice_channel.return_value = new_channel
- with mock.patch.object(self.cog.scheduler, "schedule_later") as scheduler:
- await self.cog._force_voice_sync(channel, member)
+ await self.cog._force_voice_sync(channel)
+
+ # Check channel creation
+ overwrites = {
+ channel.guild.default_role: PermissionOverwrite(speak=False, connect=False, view_channel=False)
+ }
+ channel.guild.create_voice_channel.assert_called_once_with("mute-temp", overwrites=overwrites)
+
+ # Check bot deleted channel
+ new_channel.delete.assert_called_once_with(reason="Deleting temp mute channel.")
+
+ async def test_voice_kick(self):
+ """Test to ensure kick function can remove all members from a voice channel."""
+ await self.cog._async_init()
+
+ members = []
+ for _ in range(10):
+ members.append(MockMember())
+
+ channel = MockVoiceChannel(members=members)
+ await self.cog._kick_voice_members(channel)
+
+ for member in members:
+ member.move_to.assert_called_once_with(None, reason="Kicking member from voice channel.")
- # Check channel creation
- overwrites = {
- channel.guild.default_role: PermissionOverwrite(speak=False, connect=False, view_channel=False)
- }
- channel.guild.create_voice_channel.assert_called_once_with("mute-temp", overwrites=overwrites)
+ async def test_voice_kick_staff(self):
+ """Test to ensure voice kick skips staff members."""
+ await self.cog._async_init()
+ member = MockMember(roles=[self.cog._helper_role])
- # Check bot queued deletion
- new_channel.delete.assert_called_once_with(reason="Deleting temp mute channel.")
- scheduler.assert_called_once_with(30, new_channel.id, new_channel.delete())
+ await self.cog._kick_voice_members(MockVoiceChannel(members=[member]))
+ member.move_to.assert_not_called()
@autospec(silence.Silence, "previous_overwrites", "unsilence_timestamps", pass_mocks=False)
@@ -457,21 +459,45 @@ class SilenceTests(unittest.IsolatedAsyncioTestCase):
)
for target, message in test_cases:
- with mock.patch.object(self.cog, "_set_silence_overwrites", return_value=True):
- with self.subTest(target_channel=target, message=message):
- await self.cog.silence.callback(self.cog, ctx, 10, False, channel=target)
- if ctx.channel == target or target is None:
- ctx.channel.send.assert_called_once_with(message)
+ with mock.patch.object(self.cog, "_force_voice_sync") as voice_sync:
+ with mock.patch.object(self.cog, "_set_silence_overwrites", return_value=True):
+ with self.subTest(target_channel=target, message=message):
+ await self.cog.silence.callback(self.cog, ctx, 10, False, channel=target)
+ if ctx.channel == target or target is None:
+ ctx.channel.send.assert_called_once_with(message)
- else:
- ctx.channel.send.assert_called_once_with(message.replace("current channel", target.mention))
- if isinstance(target, MockTextChannel):
- target.send.assert_called_once_with(message)
+ else:
+ ctx.channel.send.assert_called_once_with(message.replace("current channel", target.mention))
+ if isinstance(target, MockTextChannel):
+ target.send.assert_called_once_with(message)
+ else:
+ voice_sync.assert_called_once_with(target)
ctx.channel.send.reset_mock()
if target is not None and isinstance(target, MockTextChannel):
target.send.reset_mock()
+ @mock.patch.object(silence.Silence, "_kick_voice_members")
+ @mock.patch.object(silence.Silence, "_force_voice_sync")
+ async def test_sync_or_kick_called(self, sync, kick):
+ """Tests if silence command calls kick or sync on voice channels when appropriate."""
+ channel = MockVoiceChannel()
+ ctx = MockContext()
+
+ with mock.patch.object(self.cog, "_set_silence_overwrites", return_value=True):
+ with self.subTest("Test calls kick"):
+ await self.cog.silence.callback(self.cog, ctx, 10, kick=True, channel=channel)
+ kick.assert_called_once_with(channel)
+ sync.assert_not_called()
+
+ kick.reset_mock()
+ sync.reset_mock()
+
+ with self.subTest("Test calls sync"):
+ await self.cog.silence.callback(self.cog, ctx, 10, kick=False, channel=channel)
+ sync.assert_called_once_with(channel)
+ kick.assert_not_called()
+
async def test_skipped_already_silenced(self):
"""Permissions were not set and `False` was returned for an already silenced channel."""
subtests = (