aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--bot/exts/moderation/silence.py88
-rw-r--r--tests/bot/exts/moderation/test_silence.py124
2 files changed, 121 insertions, 91 deletions
diff --git a/bot/exts/moderation/silence.py b/bot/exts/moderation/silence.py
index d1db0da9b..9b3725326 100644
--- a/bot/exts/moderation/silence.py
+++ b/bot/exts/moderation/silence.py
@@ -174,7 +174,10 @@ class Silence(commands.Cog):
return
if isinstance(channel, VoiceChannel):
- await self._force_voice_sync(channel, kick=kick)
+ if kick:
+ await self._kick_voice_members(channel)
+ else:
+ await self._force_voice_sync(channel)
await self._schedule_unsilence(ctx, channel, duration)
@@ -252,56 +255,57 @@ class Silence(commands.Cog):
return True
- async def _force_voice_sync(
- self, channel: VoiceChannel, member: Optional[Member] = None, kick: bool = False
- ) -> None:
- """
- Move all non-staff members from `channel` to a temporary channel and back to force toggle role mute.
-
- If `member` is passed, the mute only occurs to that member.
- Permission modification has to happen before this function.
-
- If `kick_all` is True, members will not be added back to the voice channel.
- """
- # Handle member picking logic
- if member is not None:
- members = [member]
- else:
- members = channel.members
-
- # Handle kick logic
- if kick:
- for member in members:
- await member.move_to(None, reason="Kicking voice channel member.")
+ @staticmethod
+ async def _get_afk_channel(guild: Guild) -> VoiceChannel:
+ """Get a guild's AFK channel, or create one if it does not exist."""
+ afk_channel = guild.afk_channel
- log.debug(f"Kicked all members from #{channel.name} ({channel.id}).")
- return
-
- # Obtain temporary channel
- afk_channel = channel.guild.afk_channel
if afk_channel is None:
overwrites = {
- channel.guild.default_role: PermissionOverwrite(speak=False, connect=False, view_channel=False)
+ guild.default_role: PermissionOverwrite(speak=False, connect=False, view_channel=False)
}
- afk_channel = await channel.guild.create_voice_channel("mute-temp", overwrites=overwrites)
+ afk_channel = await guild.create_voice_channel("mute-temp", overwrites=overwrites)
log.info(f"Failed to get afk-channel, created temporary channel #{afk_channel} ({afk_channel.id})")
- # Schedule channel deletion in case function errors out
- self.scheduler.schedule_later(
- 30, afk_channel.id, afk_channel.delete(reason="Deleting temp mute channel.")
- )
+ return afk_channel
- # Move all members to temporary channel and back
- for member in members:
- # Skip staff
- if self._helper_role in member.roles:
- continue
+ async def _kick_voice_members(self, channel: VoiceChannel) -> None:
+ """Remove all non-staff members from a voice channel."""
+ log.debug(f"Removing all non staff members from #{channel.name} ({channel.id}).")
+
+ for member in channel.members:
+ if self._helper_role not in member.roles:
+ await member.move_to(None, reason="Kicking member from voice channel.")
- await member.move_to(afk_channel, reason="Muting member.")
- log.debug(f"Moved {member.name} to afk channel.")
+ log.debug("Removed all members.")
- await member.move_to(channel, reason="Muting member.")
- log.debug(f"Moved {member.name} to original voice channel.")
+ async def _force_voice_sync(self, channel: VoiceChannel) -> None:
+ """
+ Move all non-staff members from `channel` to a temporary channel and back to force toggle role mute.
+
+ Permission modification has to happen before this function.
+ """
+ # Obtain temporary channel
+ delete_channel = channel.guild.afk_channel is None
+ afk_channel = await self._get_afk_channel(channel.guild)
+
+ try:
+ # Move all members to temporary channel and back
+ for member in channel.members:
+ # Skip staff
+ if self._helper_role in member.roles:
+ continue
+
+ await member.move_to(afk_channel, reason="Muting VC member.")
+ log.debug(f"Moved {member.name} to afk channel.")
+
+ await member.move_to(channel, reason="Muting VC member.")
+ log.debug(f"Moved {member.name} to original voice channel.")
+
+ finally:
+ # Delete VC channel if it was created.
+ if delete_channel:
+ await afk_channel.delete(reason="Deleting temp mute channel.")
async def _schedule_unsilence(
self, ctx: Context, channel: Union[TextChannel, VoiceChannel], duration: Optional[int]
diff --git a/tests/bot/exts/moderation/test_silence.py b/tests/bot/exts/moderation/test_silence.py
index bff2888b9..9fb3e404a 100644
--- a/tests/bot/exts/moderation/test_silence.py
+++ b/tests/bot/exts/moderation/test_silence.py
@@ -2,7 +2,7 @@ import asyncio
import unittest
from datetime import datetime, timezone
from unittest import mock
-from unittest.mock import Mock
+from unittest.mock import AsyncMock, Mock
from async_rediscache import RedisSession
from discord import PermissionOverwrite
@@ -266,67 +266,69 @@ class SilenceCogTests(unittest.IsolatedAsyncioTestCase):
"""Tests the _force_voice_sync helper function."""
await self.cog._async_init()
- afk_channel = MockVoiceChannel()
- channel = MockVoiceChannel(guild=MockGuild(afk_channel=afk_channel))
-
members = []
for _ in range(10):
members.append(MockMember())
- channel.members = members
- test_cases = (
- (members[0], False, "Muting member."),
- (members[0], True, "Kicking voice channel member."),
- (None, False, "Muting member."),
- (None, True, "Kicking voice channel member."),
- )
-
- for member, kick, reason in test_cases:
- with self.subTest(members=member, kick=kick, reason=reason):
- await self.cog._force_voice_sync(channel, member, kick)
-
- for single_member in channel.members if member is None else [member]:
- if kick:
- single_member.move_to.assert_called_once_with(None, reason=reason)
- else:
- self.assertEqual(single_member.move_to.call_count, 2)
- single_member.move_to.assert_has_calls([
- mock.call(afk_channel, reason=reason),
- mock.call(channel, reason=reason)
- ], any_order=False)
+ afk_channel = MockVoiceChannel()
+ channel = MockVoiceChannel(guild=MockGuild(afk_channel=afk_channel), members=members)
- single_member.reset_mock()
+ await self.cog._force_voice_sync(channel)
+ for member in members:
+ self.assertEqual(member.move_to.call_count, 2)
+ member.move_to.assert_has_calls([
+ mock.call(afk_channel, reason="Muting VC member."),
+ mock.call(channel, reason="Muting VC member.")
+ ], any_order=False)
async def test_force_voice_sync_staff(self):
"""Tests to ensure _force_voice_sync does not kick staff members."""
await self.cog._async_init()
member = MockMember(roles=[self.cog._helper_role])
- await self.cog._force_voice_sync(MockVoiceChannel(), member)
+ await self.cog._force_voice_sync(MockVoiceChannel(members=[member]))
member.move_to.assert_not_called()
async def test_force_voice_sync_no_channel(self):
"""Test to ensure _force_voice_sync can create its own voice channel if one is not available."""
await self.cog._async_init()
- member = MockMember()
channel = MockVoiceChannel(guild=MockGuild(afk_channel=None))
-
- new_channel = MockVoiceChannel(delete=Mock())
+ new_channel = MockVoiceChannel(delete=AsyncMock())
channel.guild.create_voice_channel.return_value = new_channel
- with mock.patch.object(self.cog.scheduler, "schedule_later") as scheduler:
- await self.cog._force_voice_sync(channel, member)
+ await self.cog._force_voice_sync(channel)
+
+ # Check channel creation
+ overwrites = {
+ channel.guild.default_role: PermissionOverwrite(speak=False, connect=False, view_channel=False)
+ }
+ channel.guild.create_voice_channel.assert_called_once_with("mute-temp", overwrites=overwrites)
+
+ # Check bot deleted channel
+ new_channel.delete.assert_called_once_with(reason="Deleting temp mute channel.")
+
+ async def test_voice_kick(self):
+ """Test to ensure kick function can remove all members from a voice channel."""
+ await self.cog._async_init()
+
+ members = []
+ for _ in range(10):
+ members.append(MockMember())
+
+ channel = MockVoiceChannel(members=members)
+ await self.cog._kick_voice_members(channel)
+
+ for member in members:
+ member.move_to.assert_called_once_with(None, reason="Kicking member from voice channel.")
- # Check channel creation
- overwrites = {
- channel.guild.default_role: PermissionOverwrite(speak=False, connect=False, view_channel=False)
- }
- channel.guild.create_voice_channel.assert_called_once_with("mute-temp", overwrites=overwrites)
+ async def test_voice_kick_staff(self):
+ """Test to ensure voice kick skips staff members."""
+ await self.cog._async_init()
+ member = MockMember(roles=[self.cog._helper_role])
- # Check bot queued deletion
- new_channel.delete.assert_called_once_with(reason="Deleting temp mute channel.")
- scheduler.assert_called_once_with(30, new_channel.id, new_channel.delete())
+ await self.cog._kick_voice_members(MockVoiceChannel(members=[member]))
+ member.move_to.assert_not_called()
@autospec(silence.Silence, "previous_overwrites", "unsilence_timestamps", pass_mocks=False)
@@ -457,21 +459,45 @@ class SilenceTests(unittest.IsolatedAsyncioTestCase):
)
for target, message in test_cases:
- with mock.patch.object(self.cog, "_set_silence_overwrites", return_value=True):
- with self.subTest(target_channel=target, message=message):
- await self.cog.silence.callback(self.cog, ctx, 10, False, channel=target)
- if ctx.channel == target or target is None:
- ctx.channel.send.assert_called_once_with(message)
+ with mock.patch.object(self.cog, "_force_voice_sync") as voice_sync:
+ with mock.patch.object(self.cog, "_set_silence_overwrites", return_value=True):
+ with self.subTest(target_channel=target, message=message):
+ await self.cog.silence.callback(self.cog, ctx, 10, False, channel=target)
+ if ctx.channel == target or target is None:
+ ctx.channel.send.assert_called_once_with(message)
- else:
- ctx.channel.send.assert_called_once_with(message.replace("current channel", target.mention))
- if isinstance(target, MockTextChannel):
- target.send.assert_called_once_with(message)
+ else:
+ ctx.channel.send.assert_called_once_with(message.replace("current channel", target.mention))
+ if isinstance(target, MockTextChannel):
+ target.send.assert_called_once_with(message)
+ else:
+ voice_sync.assert_called_once_with(target)
ctx.channel.send.reset_mock()
if target is not None and isinstance(target, MockTextChannel):
target.send.reset_mock()
+ @mock.patch.object(silence.Silence, "_kick_voice_members")
+ @mock.patch.object(silence.Silence, "_force_voice_sync")
+ async def test_sync_or_kick_called(self, sync, kick):
+ """Tests if silence command calls kick or sync on voice channels when appropriate."""
+ channel = MockVoiceChannel()
+ ctx = MockContext()
+
+ with mock.patch.object(self.cog, "_set_silence_overwrites", return_value=True):
+ with self.subTest("Test calls kick"):
+ await self.cog.silence.callback(self.cog, ctx, 10, kick=True, channel=channel)
+ kick.assert_called_once_with(channel)
+ sync.assert_not_called()
+
+ kick.reset_mock()
+ sync.reset_mock()
+
+ with self.subTest("Test calls sync"):
+ await self.cog.silence.callback(self.cog, ctx, 10, kick=False, channel=channel)
+ sync.assert_called_once_with(channel)
+ kick.assert_not_called()
+
async def test_skipped_already_silenced(self):
"""Permissions were not set and `False` was returned for an already silenced channel."""
subtests = (