diff options
-rw-r--r-- | bot/constants.py | 2 | ||||
-rw-r--r-- | bot/converters.py | 40 | ||||
-rw-r--r-- | bot/exts/moderation/silence.py | 166 | ||||
-rw-r--r-- | tests/bot/exts/moderation/test_silence.py | 12 |
4 files changed, 172 insertions, 48 deletions
diff --git a/bot/constants.py b/bot/constants.py index 6bb6aacd2..f3870c2ed 100644 --- a/bot/constants.py +++ b/bot/constants.py @@ -398,6 +398,8 @@ class Channels(metaclass=YAMLGetter): change_log: int code_help_voice: int code_help_voice_2: int + admins_voice: int + staff_voice: int cooldown: int defcon: int dev_contrib: int diff --git a/bot/converters.py b/bot/converters.py index 2e118d476..613be73eb 100644 --- a/bot/converters.py +++ b/bot/converters.py @@ -536,6 +536,46 @@ class FetchedUser(UserConverter): raise BadArgument(f"User `{arg}` does not exist") +class AnyChannelConverter(Converter): + """ + Converts to a `discord.Channel` or, raises an error. + + Unlike the default Channel Converter, this converter can handle channels given + in string, id, or mention formatting for both `TextChannel`s and `VoiceChannel`s. + Always returns 1 or fewer channels, errors if more than one match exists. + + It is able to handle the following formats (caveats noted below:) + 1. Convert from ID - Example: 267631170882240512 + 2. Convert from Explicit Mention - Example: #welcome + 3. Convert from ID Mention - Example: <#267631170882240512> + 4. Convert from Unmentioned Name: - Example: welcome + + All the previous conversions are valid for both text and voice channels, but explicit + raw names (#4) do not work for non-unique channels, instead opting for an error. + Explicit mentions (#2) do not work for non-unique voice channels either. + """ + + async def convert(self, ctx: Context, arg: str) -> t.Union[discord.TextChannel, discord.VoiceChannel]: + """Convert the `arg` to a `TextChannel` or `VoiceChannel`.""" + stripped = arg.strip().lstrip("<").lstrip("#").rstrip(">").lower() + + # Filter channels by name and ID + channels = [channel for channel in ctx.guild.channels if stripped in (channel.name.lower(), str(channel.id))] + + if len(channels) == 0: + # Couldn't find a matching channel + log.debug(f"Could not convert `{arg}` to channel, no matches found.") + raise BadArgument(f"{arg} returned no matches.") + + elif len(channels) > 1: + # Couldn't discern the desired channel + log.debug(f"Could not convert `{arg}` to channel, {len(channels)} matches found.") + raise BadArgument(f"The provided argument returned too many matches ({len(channels)}).") + + else: + return channels[0] + + def _snowflake_from_regex(pattern: t.Pattern, arg: str) -> int: """ Extract the snowflake from `arg` using a regex `pattern` and return it as an int. diff --git a/bot/exts/moderation/silence.py b/bot/exts/moderation/silence.py index e6712b3b6..314aa946e 100644 --- a/bot/exts/moderation/silence.py +++ b/bot/exts/moderation/silence.py @@ -3,16 +3,16 @@ import logging from contextlib import suppress from datetime import datetime, timedelta, timezone from operator import attrgetter -from typing import Optional +from typing import Optional, Union from async_rediscache import RedisCache -from discord import TextChannel +from discord import TextChannel, VoiceChannel from discord.ext import commands, tasks from discord.ext.commands import Context from bot.bot import Bot from bot.constants import Channels, Emojis, Guild, MODERATION_ROLES, Roles -from bot.converters import HushDurationConverter +from bot.converters import AnyChannelConverter, HushDurationConverter from bot.utils.lock import LockedResourceError, lock_arg from bot.utils.scheduling import Scheduler @@ -41,7 +41,7 @@ class SilenceNotifier(tasks.Loop): self._silenced_channels = {} self._alert_channel = alert_channel - def add_channel(self, channel: TextChannel) -> None: + def add_channel(self, channel: Union[TextChannel, VoiceChannel]) -> None: """Add channel to `_silenced_channels` and start loop if not launched.""" if not self._silenced_channels: self.start() @@ -93,14 +93,54 @@ class Silence(commands.Cog): await self.bot.wait_until_guild_available() guild = self.bot.get_guild(Guild.id) - self._verified_role = guild.get_role(Roles.verified) + self._verified_msg_role = guild.get_role(Roles.verified) + self._verified_voice_role = guild.get_role(Roles.voice_verified) self._mod_alerts_channel = self.bot.get_channel(Channels.mod_alerts) self.notifier = SilenceNotifier(self.bot.get_channel(Channels.mod_log)) await self._reschedule() + async def send_message(self, message: str, source_channel: TextChannel, + target_channel: Union[TextChannel, VoiceChannel], + alert_target: bool = False, duration: HushDurationConverter = 0) -> None: + """Helper function to send message confirmation to `source_channel`, and notification to `target_channel`.""" + if isinstance(source_channel, TextChannel): + await source_channel.send( + message.replace("current", target_channel.mention if source_channel != target_channel else "current") + .replace("{duration}", str(duration)) + ) + + voice_chat = None + if isinstance(target_channel, VoiceChannel): + # Send to relevant channel + # TODO: Figure out a non-hardcoded way of doing this + if "offtopic" in target_channel.name.lower(): + voice_chat = self.bot.get_channel(Channels.voice_chat) + elif "code-help" in target_channel.name.lower(): + if "1" in target_channel.name.lower(): + voice_chat = self.bot.get_channel(Channels.code_help_voice) + else: + voice_chat = self.bot.get_channel(Channels.code_help_voice_2) + elif "admin" in target_channel.name.lower(): + voice_chat = self.bot.get_channel(Channels.admins_voice) + elif "staff" in target_channel.name.lower(): + voice_chat = self.bot.get_channel(Channels.staff_voice) + + if alert_target and source_channel != target_channel: + if isinstance(target_channel, VoiceChannel): + if voice_chat is None or voice_chat == source_channel: + return + + await voice_chat.send( + message.replace("{duration}", str(duration)).replace("current", voice_chat.mention) + ) + + else: + await target_channel.send(message.replace("{duration}", str(duration))) + @commands.command(aliases=("hush",)) @lock_arg(LOCK_NAMESPACE, "ctx", attrgetter("channel"), raise_error=True) - async def silence(self, ctx: Context, duration: HushDurationConverter = 10) -> None: + async def silence(self, ctx: Context, duration: HushDurationConverter = 10, + *, channel: AnyChannelConverter = None) -> None: """ Silence the current channel for `duration` minutes or `forever`. @@ -108,72 +148,100 @@ class Silence(commands.Cog): Indefinitely silenced channels get added to a notifier which posts notices every 15 minutes from the start. """ await self._init_task - - channel_info = f"#{ctx.channel} ({ctx.channel.id})" + if channel is None: + channel = ctx.channel + channel_info = f"#{channel} ({channel.id})" log.debug(f"{ctx.author} is silencing channel {channel_info}.") - if not await self._set_silence_overwrites(ctx.channel): + if not await self._set_silence_overwrites(channel): log.info(f"Tried to silence channel {channel_info} but the channel was already silenced.") - await ctx.send(MSG_SILENCE_FAIL) + await self.send_message(MSG_SILENCE_FAIL, ctx.channel, channel) return - await self._schedule_unsilence(ctx, duration) + await self._schedule_unsilence(ctx, channel, duration) if duration is None: - self.notifier.add_channel(ctx.channel) + self.notifier.add_channel(channel) log.info(f"Silenced {channel_info} indefinitely.") - await ctx.send(MSG_SILENCE_PERMANENT) + await self.send_message(MSG_SILENCE_PERMANENT, ctx.channel, channel, True) + else: log.info(f"Silenced {channel_info} for {duration} minute(s).") - await ctx.send(MSG_SILENCE_SUCCESS.format(duration=duration)) + await self.send_message(MSG_SILENCE_SUCCESS, ctx.channel, channel, True, duration) @commands.command(aliases=("unhush",)) - async def unsilence(self, ctx: Context) -> None: + async def unsilence(self, ctx: Context, *, channel: AnyChannelConverter = None) -> None: """ - Unsilence the current channel. + Unsilence the given channel if given, else the current one. If the channel was silenced indefinitely, notifications for the channel will stop. """ await self._init_task - log.debug(f"Unsilencing channel #{ctx.channel} from {ctx.author}'s command.") - await self._unsilence_wrapper(ctx.channel) + if channel is None: + channel = ctx.channel + log.debug(f"Unsilencing channel #{channel} from {ctx.author}'s command.") + await self._unsilence_wrapper(channel, ctx) @lock_arg(LOCK_NAMESPACE, "channel", raise_error=True) - async def _unsilence_wrapper(self, channel: TextChannel) -> None: + async def _unsilence_wrapper(self, channel: Union[TextChannel, VoiceChannel], + ctx: Optional[Context] = None) -> None: """Unsilence `channel` and send a success/failure message.""" + msg_channel = channel + if ctx is not None: + msg_channel = ctx.channel + if not await self._unsilence(channel): - overwrite = channel.overwrites_for(self._verified_role) - if overwrite.send_messages is False or overwrite.add_reactions is False: - await channel.send(MSG_UNSILENCE_MANUAL) + if isinstance(channel, VoiceChannel): + overwrite = channel.overwrites_for(self._verified_voice_role) + manual = overwrite.speak is False + else: + overwrite = channel.overwrites_for(self._verified_msg_role) + manual = overwrite.send_messages is False or overwrite.add_reactions is False + + # Send fail message to muted channel or voice chat channel, and invocation channel + if manual: + await self.send_message(MSG_UNSILENCE_MANUAL, msg_channel, channel) else: - await channel.send(MSG_UNSILENCE_FAIL) + await self.send_message(MSG_UNSILENCE_FAIL, msg_channel, channel) + else: - await channel.send(MSG_UNSILENCE_SUCCESS) + # Send success message to muted channel or voice chat channel, and invocation channel + await self.send_message(MSG_UNSILENCE_SUCCESS, msg_channel, channel, True) - async def _set_silence_overwrites(self, channel: TextChannel) -> bool: + async def _set_silence_overwrites(self, channel: Union[TextChannel, VoiceChannel]) -> bool: """Set silence permission overwrites for `channel` and return True if successful.""" - overwrite = channel.overwrites_for(self._verified_role) - prev_overwrites = dict(send_messages=overwrite.send_messages, add_reactions=overwrite.add_reactions) + if isinstance(channel, TextChannel): + overwrite = channel.overwrites_for(self._verified_msg_role) + prev_overwrites = dict(send_messages=overwrite.send_messages, add_reactions=overwrite.add_reactions) + else: + overwrite = channel.overwrites_for(self._verified_voice_role) + prev_overwrites = dict(speak=overwrite.speak) if channel.id in self.scheduler or all(val is False for val in prev_overwrites.values()): return False - overwrite.update(send_messages=False, add_reactions=False) - await channel.set_permissions(self._verified_role, overwrite=overwrite) + if isinstance(channel, TextChannel): + overwrite.update(send_messages=False, add_reactions=False) + await channel.set_permissions(self._verified_msg_role, overwrite=overwrite) + else: + overwrite.update(speak=False) + await channel.set_permissions(self._verified_voice_role, overwrite=overwrite) + await self.previous_overwrites.set(channel.id, json.dumps(prev_overwrites)) return True - async def _schedule_unsilence(self, ctx: Context, duration: Optional[int]) -> None: + async def _schedule_unsilence(self, ctx: Context, channel: typing.Union[TextChannel, VoiceChannel], + duration: Optional[int]) -> None: """Schedule `ctx.channel` to be unsilenced if `duration` is not None.""" if duration is None: - await self.unsilence_timestamps.set(ctx.channel.id, -1) + await self.unsilence_timestamps.set(channel.id, -1) else: - self.scheduler.schedule_later(duration * 60, ctx.channel.id, ctx.invoke(self.unsilence)) + self.scheduler.schedule_later(duration * 60, channel.id, ctx.invoke(self.unsilence, channel=channel)) unsilence_time = datetime.now(tz=timezone.utc) + timedelta(minutes=duration) - await self.unsilence_timestamps.set(ctx.channel.id, unsilence_time.timestamp()) + await self.unsilence_timestamps.set(channel.id, unsilence_time.timestamp()) - async def _unsilence(self, channel: TextChannel) -> bool: + async def _unsilence(self, channel: Union[TextChannel, VoiceChannel]) -> bool: """ Unsilence `channel`. @@ -188,14 +256,21 @@ class Silence(commands.Cog): log.info(f"Tried to unsilence channel #{channel} ({channel.id}) but the channel was not silenced.") return False - overwrite = channel.overwrites_for(self._verified_role) + if isinstance(channel, TextChannel): + overwrite = channel.overwrites_for(self._verified_msg_role) + else: + overwrite = channel.overwrites_for(self._verified_voice_role) + if prev_overwrites is None: log.info(f"Missing previous overwrites for #{channel} ({channel.id}); defaulting to None.") - overwrite.update(send_messages=None, add_reactions=None) + overwrite.update(send_messages=None, add_reactions=None, speak=None) else: overwrite.update(**json.loads(prev_overwrites)) - await channel.set_permissions(self._verified_role, overwrite=overwrite) + if isinstance(channel, TextChannel): + await channel.set_permissions(self._verified_msg_role, overwrite=overwrite) + else: + await channel.set_permissions(self._verified_voice_role, overwrite=overwrite) log.info(f"Unsilenced channel #{channel} ({channel.id}).") self.scheduler.cancel(channel.id) @@ -204,11 +279,18 @@ class Silence(commands.Cog): await self.unsilence_timestamps.delete(channel.id) if prev_overwrites is None: - await self._mod_alerts_channel.send( - f"<@&{Roles.admins}> Restored overwrites with default values after unsilencing " - f"{channel.mention}. Please check that the `Send Messages` and `Add Reactions` " - f"overwrites for {self._verified_role.mention} are at their desired values." - ) + if isinstance(channel, TextChannel): + await self._mod_alerts_channel.send( + f"<@&{Roles.admins}> Restored overwrites with default values after unsilencing " + f"{channel.mention}. Please check that the `Send Messages` and `Add Reactions` " + f"overwrites for {self._verified_msg_role.mention} are at their desired values." + ) + else: + await self._mod_alerts_channel.send( + f"<@&{Roles.admins}> Restored overwrites with default values after unsilencing " + f"{channel.mention}. Please check that the `Speak` " + f"overwrites for {self._verified_voice_role.mention} are at their desired values." + ) return True diff --git a/tests/bot/exts/moderation/test_silence.py b/tests/bot/exts/moderation/test_silence.py index 104293d8e..bac933115 100644 --- a/tests/bot/exts/moderation/test_silence.py +++ b/tests/bot/exts/moderation/test_silence.py @@ -123,7 +123,7 @@ class SilenceCogTests(unittest.IsolatedAsyncioTestCase): guild.get_role.side_effect = lambda id_: Mock(id=id_) await self.cog._async_init() - self.assertEqual(self.cog._verified_role.id, Roles.verified) + self.assertEqual(self.cog._verified_msg_role.id, Roles.verified) @autospec(silence, "SilenceNotifier", pass_mocks=False) async def test_async_init_got_channels(self): @@ -277,7 +277,7 @@ class SilenceTests(unittest.IsolatedAsyncioTestCase): with mock.patch.object(self.cog, "_set_silence_overwrites", return_value=was_silenced): with self.subTest(was_silenced=was_silenced, message=message, duration=duration): await self.cog.silence.callback(self.cog, ctx, duration) - ctx.send.assert_called_once_with(message) + ctx.channel.send.assert_called_once_with(message) async def test_skipped_already_silenced(self): """Permissions were not set and `False` was returned for an already silenced channel.""" @@ -302,7 +302,7 @@ class SilenceTests(unittest.IsolatedAsyncioTestCase): self.assertFalse(self.overwrite.send_messages) self.assertFalse(self.overwrite.add_reactions) self.channel.set_permissions.assert_awaited_once_with( - self.cog._verified_role, + self.cog._verified_msg_role, overwrite=self.overwrite ) @@ -372,7 +372,7 @@ class SilenceTests(unittest.IsolatedAsyncioTestCase): args = (300, ctx.channel.id, ctx.invoke.return_value) self.cog.scheduler.schedule_later.assert_called_once_with(*args) - ctx.invoke.assert_called_once_with(self.cog.unsilence) + ctx.invoke.assert_called_once_with(self.cog.unsilence, channel=ctx.channel) async def test_permanent_not_scheduled(self): """A task was not scheduled for a permanent silence.""" @@ -435,7 +435,7 @@ class UnsilenceTests(unittest.IsolatedAsyncioTestCase): """Channel's `send_message` and `add_reactions` overwrites were restored.""" await self.cog._unsilence(self.channel) self.channel.set_permissions.assert_awaited_once_with( - self.cog._verified_role, + self.cog._verified_msg_role, overwrite=self.overwrite, ) @@ -449,7 +449,7 @@ class UnsilenceTests(unittest.IsolatedAsyncioTestCase): await self.cog._unsilence(self.channel) self.channel.set_permissions.assert_awaited_once_with( - self.cog._verified_role, + self.cog._verified_msg_role, overwrite=self.overwrite, ) |