diff options
Diffstat (limited to '')
| -rw-r--r-- | bot/__main__.py | 1 | ||||
| -rw-r--r-- | bot/cogs/alias.py | 2 | ||||
| -rw-r--r-- | bot/cogs/bot.py | 1 | ||||
| -rw-r--r-- | bot/cogs/error_handler.py | 35 | ||||
| -rw-r--r-- | bot/cogs/moderation/__init__.py | 4 | ||||
| -rw-r--r-- | bot/cogs/moderation/scheduler.py | 2 | ||||
| -rw-r--r-- | bot/cogs/moderation/silence.py | 159 | ||||
| -rw-r--r-- | bot/cogs/moderation/superstarify.py | 4 | ||||
| -rw-r--r-- | bot/cogs/moderation/utils.py | 2 | ||||
| -rw-r--r-- | bot/cogs/snekbox.py | 2 | ||||
| -rw-r--r-- | bot/cogs/sync/syncers.py | 2 | ||||
| -rw-r--r-- | bot/cogs/utils.py | 21 | ||||
| -rw-r--r-- | bot/cogs/webhook_remover.py | 72 | ||||
| -rw-r--r-- | bot/converters.py | 30 | ||||
| -rw-r--r-- | bot/utils/messages.py | 2 | ||||
| -rw-r--r-- | tests/bot/cogs/moderation/__init__.py | 0 | ||||
| -rw-r--r-- | tests/bot/cogs/moderation/test_silence.py | 251 | ||||
| -rw-r--r-- | tests/bot/test_converters.py | 30 |
18 files changed, 606 insertions, 14 deletions
diff --git a/bot/__main__.py b/bot/__main__.py index 3df477a6d..8c3ae02e3 100644 --- a/bot/__main__.py +++ b/bot/__main__.py @@ -63,6 +63,7 @@ bot.load_extension("bot.cogs.tags") bot.load_extension("bot.cogs.token_remover") bot.load_extension("bot.cogs.utils") bot.load_extension("bot.cogs.watchchannels") +bot.load_extension("bot.cogs.webhook_remover") bot.load_extension("bot.cogs.wolfram") # Apply `message_edited_at` patch if discord.py did not yet release a bug fix. diff --git a/bot/cogs/alias.py b/bot/cogs/alias.py index 9001e18f0..55c7efe65 100644 --- a/bot/cogs/alias.py +++ b/bot/cogs/alias.py @@ -29,7 +29,7 @@ class Alias (Cog): return log.info(f'Did not find command "{cmd_name}" to invoke.') elif not await cmd.can_run(ctx): return log.info( - f'{str(ctx.author)} tried to run the command "{cmd_name}"' + f'{str(ctx.author)} tried to run the command "{cmd_name}" but lacks permission.' ) await ctx.invoke(cmd, *args, **kwargs) diff --git a/bot/cogs/bot.py b/bot/cogs/bot.py index f17135877..e897b30ff 100644 --- a/bot/cogs/bot.py +++ b/bot/cogs/bot.py @@ -67,7 +67,6 @@ class BotCog(Cog, name="Bot"): icon_url=URLs.bot_avatar ) - log.info(f"{ctx.author} called !about. Returning information about the bot.") await ctx.send(embed=embed) @command(name='echo', aliases=('print',)) diff --git a/bot/cogs/error_handler.py b/bot/cogs/error_handler.py index 261769efc..6a622d2ce 100644 --- a/bot/cogs/error_handler.py +++ b/bot/cogs/error_handler.py @@ -31,7 +31,9 @@ class ErrorHandler(Cog): Error handling emits a single error message in the invoking context `ctx` and a log message, prioritised as follows: - 1. If the name fails to match a command but matches a tag, the tag is invoked + 1. If the name fails to match a command: + * If it matches shh+ or unshh+, the channel is silenced or unsilenced respectively. + Otherwise if it matches a tag, the tag is invoked * If CommandNotFound is raised when invoking the tag (determined by the presence of the `invoked_from_error_handler` attribute), this error is treated as being unexpected and therefore sends an error message @@ -48,9 +50,11 @@ class ErrorHandler(Cog): log.trace(f"Command {command} had its error already handled locally; ignoring.") return - # Try to look for a tag with the command's name if the command isn't found. if isinstance(e, errors.CommandNotFound) and not hasattr(ctx, "invoked_from_error_handler"): + if await self.try_silence(ctx): + return if ctx.channel.id != Channels.verification: + # Try to look for a tag with the command's name await self.try_get_tag(ctx) return # Exit early to avoid logging. elif isinstance(e, errors.UserInputError): @@ -89,6 +93,33 @@ class ErrorHandler(Cog): else: return self.bot.get_command("help") + async def try_silence(self, ctx: Context) -> bool: + """ + Attempt to invoke the silence or unsilence command if invoke with matches a pattern. + + Respecting the checks if: + * invoked with `shh+` silence channel for amount of h's*2 with max of 15. + * invoked with `unshh+` unsilence channel + Return bool depending on success of command. + """ + command = ctx.invoked_with.lower() + silence_command = self.bot.get_command("silence") + ctx.invoked_from_error_handler = True + try: + if not await silence_command.can_run(ctx): + log.debug("Cancelling attempt to invoke silence/unsilence due to failed checks.") + return False + except errors.CommandError: + log.debug("Cancelling attempt to invoke silence/unsilence due to failed checks.") + return False + if command.startswith("shh"): + await ctx.invoke(silence_command, duration=min(command.count("h")*2, 15)) + return True + elif command.startswith("unshh"): + await ctx.invoke(self.bot.get_command("unsilence")) + return True + return False + async def try_get_tag(self, ctx: Context) -> None: """ Attempt to display a tag by interpreting the command name as a tag name. diff --git a/bot/cogs/moderation/__init__.py b/bot/cogs/moderation/__init__.py index 5243cb92d..6880ca1bd 100644 --- a/bot/cogs/moderation/__init__.py +++ b/bot/cogs/moderation/__init__.py @@ -2,12 +2,14 @@ from bot.bot import Bot from .infractions import Infractions from .management import ModManagement from .modlog import ModLog +from .silence import Silence from .superstarify import Superstarify def setup(bot: Bot) -> None: - """Load the Infractions, ModManagement, ModLog, and Superstarify cogs.""" + """Load the Infractions, ModManagement, ModLog, Silence, and Superstarify cogs.""" bot.add_cog(Infractions(bot)) bot.add_cog(ModLog(bot)) bot.add_cog(ModManagement(bot)) + bot.add_cog(Silence(bot)) bot.add_cog(Superstarify(bot)) diff --git a/bot/cogs/moderation/scheduler.py b/bot/cogs/moderation/scheduler.py index f0b6b2c48..917697be9 100644 --- a/bot/cogs/moderation/scheduler.py +++ b/bot/cogs/moderation/scheduler.py @@ -222,7 +222,7 @@ class InfractionScheduler(Scheduler): # If multiple active infractions were found, mark them as inactive in the database # and cancel their expiration tasks. if len(response) > 1: - log.warning( + log.info( f"Found more than one active {infr_type} infraction for user {user.id}; " "deactivating the extra active infractions too." ) diff --git a/bot/cogs/moderation/silence.py b/bot/cogs/moderation/silence.py new file mode 100644 index 000000000..1ef3967a9 --- /dev/null +++ b/bot/cogs/moderation/silence.py @@ -0,0 +1,159 @@ +import asyncio +import logging +from contextlib import suppress +from typing import Optional + +from discord import TextChannel +from discord.ext import commands, tasks +from discord.ext.commands import Context + +from bot.bot import Bot +from bot.constants import Channels, Emojis, Guild, MODERATION_ROLES, Roles +from bot.converters import HushDurationConverter +from bot.utils.checks import with_role_check + +log = logging.getLogger(__name__) + + +class SilenceNotifier(tasks.Loop): + """Loop notifier for posting notices to `alert_channel` containing added channels.""" + + def __init__(self, alert_channel: TextChannel): + super().__init__(self._notifier, seconds=1, minutes=0, hours=0, count=None, reconnect=True, loop=None) + self._silenced_channels = {} + self._alert_channel = alert_channel + + def add_channel(self, channel: TextChannel) -> None: + """Add channel to `_silenced_channels` and start loop if not launched.""" + if not self._silenced_channels: + self.start() + log.info("Starting notifier loop.") + self._silenced_channels[channel] = self._current_loop + + def remove_channel(self, channel: TextChannel) -> None: + """Remove channel from `_silenced_channels` and stop loop if no channels remain.""" + with suppress(KeyError): + del self._silenced_channels[channel] + if not self._silenced_channels: + self.stop() + log.info("Stopping notifier loop.") + + async def _notifier(self) -> None: + """Post notice of `_silenced_channels` with their silenced duration to `_alert_channel` periodically.""" + # Wait for 15 minutes between notices with pause at start of loop. + if self._current_loop and not self._current_loop/60 % 15: + log.debug( + f"Sending notice with channels: " + f"{', '.join(f'#{channel} ({channel.id})' for channel in self._silenced_channels)}." + ) + channels_text = ', '.join( + f"{channel.mention} for {(self._current_loop-start)//60} min" + for channel, start in self._silenced_channels.items() + ) + await self._alert_channel.send(f"<@&{Roles.moderators}> currently silenced channels: {channels_text}") + + +class Silence(commands.Cog): + """Commands for stopping channel messages for `verified` role in a channel.""" + + def __init__(self, bot: Bot): + self.bot = bot + self.muted_channels = set() + self._get_instance_vars_task = self.bot.loop.create_task(self._get_instance_vars()) + self._get_instance_vars_event = asyncio.Event() + + async def _get_instance_vars(self) -> None: + """Get instance variables after they're available to get from the guild.""" + await self.bot.wait_until_guild_available() + guild = self.bot.get_guild(Guild.id) + self._verified_role = guild.get_role(Roles.verified) + self._mod_alerts_channel = self.bot.get_channel(Channels.mod_alerts) + self._mod_log_channel = self.bot.get_channel(Channels.mod_log) + self.notifier = SilenceNotifier(self._mod_log_channel) + self._get_instance_vars_event.set() + + @commands.command(aliases=("hush",)) + async def silence(self, ctx: Context, duration: HushDurationConverter = 10) -> None: + """ + Silence the current channel for `duration` minutes or `forever`. + + Duration is capped at 15 minutes, passing forever makes the silence indefinite. + Indefinitely silenced channels get added to a notifier which posts notices every 15 minutes from the start. + """ + await self._get_instance_vars_event.wait() + log.debug(f"{ctx.author} is silencing channel #{ctx.channel}.") + if not await self._silence(ctx.channel, persistent=(duration is None), duration=duration): + await ctx.send(f"{Emojis.cross_mark} current channel is already silenced.") + return + if duration is None: + await ctx.send(f"{Emojis.check_mark} silenced current channel indefinitely.") + return + + await ctx.send(f"{Emojis.check_mark} silenced current channel for {duration} minute(s).") + await asyncio.sleep(duration*60) + log.info(f"Unsilencing channel after set delay.") + await ctx.invoke(self.unsilence) + + @commands.command(aliases=("unhush",)) + async def unsilence(self, ctx: Context) -> None: + """ + Unsilence the current channel. + + If the channel was silenced indefinitely, notifications for the channel will stop. + """ + await self._get_instance_vars_event.wait() + log.debug(f"Unsilencing channel #{ctx.channel} from {ctx.author}'s command.") + if await self._unsilence(ctx.channel): + await ctx.send(f"{Emojis.check_mark} unsilenced current channel.") + + async def _silence(self, channel: TextChannel, persistent: bool, duration: Optional[int]) -> bool: + """ + Silence `channel` for `self._verified_role`. + + If `persistent` is `True` add `channel` to notifier. + `duration` is only used for logging; if None is passed `persistent` should be True to not log None. + Return `True` if channel permissions were changed, `False` otherwise. + """ + current_overwrite = channel.overwrites_for(self._verified_role) + if current_overwrite.send_messages is False: + log.info(f"Tried to silence channel #{channel} ({channel.id}) but the channel was already silenced.") + return False + await channel.set_permissions(self._verified_role, **dict(current_overwrite, send_messages=False)) + self.muted_channels.add(channel) + if persistent: + log.info(f"Silenced #{channel} ({channel.id}) indefinitely.") + self.notifier.add_channel(channel) + return True + + log.info(f"Silenced #{channel} ({channel.id}) for {duration} minute(s).") + return True + + async def _unsilence(self, channel: TextChannel) -> bool: + """ + Unsilence `channel`. + + Check if `channel` is silenced through a `PermissionOverwrite`, + if it is unsilence it and remove it from the notifier. + Return `True` if channel permissions were changed, `False` otherwise. + """ + current_overwrite = channel.overwrites_for(self._verified_role) + if current_overwrite.send_messages is False: + await channel.set_permissions(self._verified_role, **dict(current_overwrite, send_messages=None)) + log.info(f"Unsilenced channel #{channel} ({channel.id}).") + self.notifier.remove_channel(channel) + self.muted_channels.discard(channel) + return True + log.info(f"Tried to unsilence channel #{channel} ({channel.id}) but the channel was not silenced.") + return False + + def cog_unload(self) -> None: + """Send alert with silenced channels on unload.""" + if self.muted_channels: + channels_string = ''.join(channel.mention for channel in self.muted_channels) + message = f"<@&{Roles.moderators}> channels left silenced on cog unload: {channels_string}" + asyncio.create_task(self._mod_alerts_channel.send(message)) + + # This cannot be static (must have a __func__ attribute). + def cog_check(self, ctx: Context) -> bool: + """Only allow moderators to invoke the commands in this cog.""" + return with_role_check(ctx, *MODERATION_ROLES) diff --git a/bot/cogs/moderation/superstarify.py b/bot/cogs/moderation/superstarify.py index 893cb7f13..ca3dc4202 100644 --- a/bot/cogs/moderation/superstarify.py +++ b/bot/cogs/moderation/superstarify.py @@ -59,7 +59,7 @@ class Superstarify(InfractionScheduler, Cog): return # Nick change was triggered by this event. Ignore. log.info( - f"{after.display_name} is currently in superstar-prison. " + f"{after.display_name} ({after.id}) tried to escape superstar prison. " f"Changing the nick back to {before.display_name}." ) await after.edit( @@ -80,7 +80,7 @@ class Superstarify(InfractionScheduler, Cog): ) if not notified: - log.warning("Failed to DM user about why they cannot change their nickname.") + log.info("Failed to DM user about why they cannot change their nickname.") @Cog.listener() async def on_member_join(self, member: Member) -> None: diff --git a/bot/cogs/moderation/utils.py b/bot/cogs/moderation/utils.py index 5052b9048..3598f3b1f 100644 --- a/bot/cogs/moderation/utils.py +++ b/bot/cogs/moderation/utils.py @@ -38,7 +38,7 @@ async def post_user(ctx: Context, user: UserSnowflake) -> t.Optional[dict]: log.trace(f"Attempting to add user {user.id} to the database.") if not isinstance(user, (discord.Member, discord.User)): - log.warning("The user being added to the DB is not a Member or User object.") + log.debug("The user being added to the DB is not a Member or User object.") payload = { 'avatar_hash': getattr(user, 'avatar', 0), diff --git a/bot/cogs/snekbox.py b/bot/cogs/snekbox.py index 454836921..315383b12 100644 --- a/bot/cogs/snekbox.py +++ b/bot/cogs/snekbox.py @@ -301,7 +301,7 @@ class Snekbox(Cog): code = await self.continue_eval(ctx, response) if not code: break - log.info(f"Re-evaluating message {ctx.message.id}") + log.info(f"Re-evaluating code from message {ctx.message.id}:\n{code}") def predicate_eval_message_edit(ctx: Context, old_msg: Message, new_msg: Message) -> bool: diff --git a/bot/cogs/sync/syncers.py b/bot/cogs/sync/syncers.py index c7ce54d65..003bf3727 100644 --- a/bot/cogs/sync/syncers.py +++ b/bot/cogs/sync/syncers.py @@ -131,7 +131,7 @@ class Syncer(abc.ABC): await message.edit(content=f':ok_hand: {mention}{self.name} sync will proceed.') return True else: - log.warning(f"The {self.name} syncer was aborted or timed out!") + log.info(f"The {self.name} syncer was aborted or timed out!") await message.edit( content=f':warning: {mention}{self.name} sync aborted or timed out!' ) diff --git a/bot/cogs/utils.py b/bot/cogs/utils.py index 0619296ad..3ed471bbf 100644 --- a/bot/cogs/utils.py +++ b/bot/cogs/utils.py @@ -40,6 +40,8 @@ If the implementation is easy to explain, it may be a good idea. Namespaces are one honking great idea -- let's do more of those! """ +ICON_URL = "https://www.python.org/static/opengraph-icon-200x200.png" + class Utils(Cog): """A selection of utilities which don't have a clear category.""" @@ -59,6 +61,10 @@ class Utils(Cog): await ctx.invoke(self.bot.get_command("help"), "pep") return + # Handle PEP 0 directly because it's not in .rst or .txt so it can't be accessed like other PEPs. + if pep_number == 0: + return await self.send_pep_zero(ctx) + possible_extensions = ['.txt', '.rst'] found_pep = False for extension in possible_extensions: @@ -82,7 +88,7 @@ class Utils(Cog): description=f"[Link]({self.base_pep_url}{pep_number:04})", ) - pep_embed.set_thumbnail(url="https://www.python.org/static/opengraph-icon-200x200.png") + pep_embed.set_thumbnail(url=ICON_URL) # Add the interesting information fields_to_check = ("Status", "Python-Version", "Created", "Type") @@ -288,6 +294,19 @@ class Utils(Cog): for reaction in options: await message.add_reaction(reaction) + async def send_pep_zero(self, ctx: Context) -> None: + """Send information about PEP 0.""" + pep_embed = Embed( + title=f"**PEP 0 - Index of Python Enhancement Proposals (PEPs)**", + description=f"[Link](https://www.python.org/dev/peps/)" + ) + pep_embed.set_thumbnail(url=ICON_URL) + pep_embed.add_field(name="Status", value="Active") + pep_embed.add_field(name="Created", value="13-Jul-2000") + pep_embed.add_field(name="Type", value="Informational") + + await ctx.send(embed=pep_embed) + def setup(bot: Bot) -> None: """Load the Utils cog.""" diff --git a/bot/cogs/webhook_remover.py b/bot/cogs/webhook_remover.py new file mode 100644 index 000000000..49692113d --- /dev/null +++ b/bot/cogs/webhook_remover.py @@ -0,0 +1,72 @@ +import logging +import re + +from discord import Colour, Message +from discord.ext.commands import Cog + +from bot.bot import Bot +from bot.cogs.moderation.modlog import ModLog +from bot.constants import Channels, Colours, Event, Icons + +WEBHOOK_URL_RE = re.compile(r"((?:https?://)?discordapp\.com/api/webhooks/\d+/)\S+/?", re.I) + +ALERT_MESSAGE_TEMPLATE = ( + "{user}, looks like you posted a Discord webhook URL. Therefore, your " + "message has been removed. Your webhook may have been **compromised** so " + "please re-create the webhook **immediately**. If you believe this was " + "mistake, please let us know." +) + +log = logging.getLogger(__name__) + + +class WebhookRemover(Cog): + """Scan messages to detect Discord webhooks links.""" + + def __init__(self, bot: Bot): + self.bot = bot + + @property + def mod_log(self) -> ModLog: + """Get current instance of `ModLog`.""" + return self.bot.get_cog("ModLog") + + async def delete_and_respond(self, msg: Message, redacted_url: str) -> None: + """Delete `msg` and send a warning that it contained the Discord webhook `redacted_url`.""" + # Don't log this, due internal delete, not by user. Will make different entry. + self.mod_log.ignore(Event.message_delete, msg.id) + await msg.delete() + await msg.channel.send(ALERT_MESSAGE_TEMPLATE.format(user=msg.author.mention)) + + message = ( + f"{msg.author} (`{msg.author.id}`) posted a Discord webhook URL " + f"to #{msg.channel}. Webhook URL was `{redacted_url}`" + ) + log.debug(message) + + # Send entry to moderation alerts. + await self.mod_log.send_log_message( + icon_url=Icons.token_removed, + colour=Colour(Colours.soft_red), + title="Discord webhook URL removed!", + text=message, + thumbnail=msg.author.avatar_url_as(static_format="png"), + channel_id=Channels.mod_alerts + ) + + @Cog.listener() + async def on_message(self, msg: Message) -> None: + """Check if a Discord webhook URL is in `message`.""" + matches = WEBHOOK_URL_RE.search(msg.content) + if matches: + await self.delete_and_respond(msg, matches[1] + "xxx") + + @Cog.listener() + async def on_message_edit(self, before: Message, after: Message) -> None: + """Check if a Discord webhook URL is in the edited message `after`.""" + await self.on_message(after) + + +def setup(bot: Bot) -> None: + """Load `WebhookRemover` cog.""" + bot.add_cog(WebhookRemover(bot)) diff --git a/bot/converters.py b/bot/converters.py index 1945e1da3..72c46fdf0 100644 --- a/bot/converters.py +++ b/bot/converters.py @@ -262,6 +262,34 @@ class ISODateTime(Converter): return dt +class HushDurationConverter(Converter): + """Convert passed duration to `int` minutes or `None`.""" + + MINUTES_RE = re.compile(r"(\d+)(?:M|m|$)") + + async def convert(self, ctx: Context, argument: str) -> t.Optional[int]: + """ + Convert `argument` to a duration that's max 15 minutes or None. + + If `"forever"` is passed, None is returned; otherwise an int of the extracted time. + Accepted formats are: + * <duration>, + * <duration>m, + * <duration>M, + * forever. + """ + if argument == "forever": + return None + match = self.MINUTES_RE.match(argument) + if not match: + raise BadArgument(f"{argument} is not a valid minutes duration.") + + duration = int(match.group(1)) + if duration > 15: + raise BadArgument("Duration must be at most 15 minutes.") + return duration + + def proxy_user(user_id: str) -> discord.Object: """ Create a proxy user object from the given id. @@ -323,7 +351,7 @@ class FetchedUser(UserConverter): except discord.HTTPException as e: # If the Discord error isn't `Unknown user`, return a proxy instead if e.code != 10013: - log.warning(f"Failed to fetch user, returning a proxy instead: status {e.status}") + log.info(f"Failed to fetch user, returning a proxy instead: status {e.status}") return proxy_user(arg) log.debug(f"Failed to fetch user {arg}: user does not exist.") diff --git a/bot/utils/messages.py b/bot/utils/messages.py index a36edc774..e969ee590 100644 --- a/bot/utils/messages.py +++ b/bot/utils/messages.py @@ -92,7 +92,7 @@ async def send_attachments( elif link_large: large.append(attachment) else: - log.warning(f"{failure_msg} because it's too large.") + log.info(f"{failure_msg} because it's too large.") except HTTPException as e: if link_large and e.status == 413: large.append(attachment) diff --git a/tests/bot/cogs/moderation/__init__.py b/tests/bot/cogs/moderation/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/tests/bot/cogs/moderation/__init__.py diff --git a/tests/bot/cogs/moderation/test_silence.py b/tests/bot/cogs/moderation/test_silence.py new file mode 100644 index 000000000..3fd149f04 --- /dev/null +++ b/tests/bot/cogs/moderation/test_silence.py @@ -0,0 +1,251 @@ +import unittest +from unittest import mock +from unittest.mock import MagicMock, Mock + +from discord import PermissionOverwrite + +from bot.cogs.moderation.silence import Silence, SilenceNotifier +from bot.constants import Channels, Emojis, Guild, Roles +from tests.helpers import MockBot, MockContext, MockTextChannel + + +class SilenceNotifierTests(unittest.IsolatedAsyncioTestCase): + def setUp(self) -> None: + self.alert_channel = MockTextChannel() + self.notifier = SilenceNotifier(self.alert_channel) + self.notifier.stop = self.notifier_stop_mock = Mock() + self.notifier.start = self.notifier_start_mock = Mock() + + def test_add_channel_adds_channel(self): + """Channel in FirstHash with current loop is added to internal set.""" + channel = Mock() + with mock.patch.object(self.notifier, "_silenced_channels") as silenced_channels: + self.notifier.add_channel(channel) + silenced_channels.__setitem__.assert_called_with(channel, self.notifier._current_loop) + + def test_add_channel_starts_loop(self): + """Loop is started if `_silenced_channels` was empty.""" + self.notifier.add_channel(Mock()) + self.notifier_start_mock.assert_called_once() + + def test_add_channel_skips_start_with_channels(self): + """Loop start is not called when `_silenced_channels` is not empty.""" + with mock.patch.object(self.notifier, "_silenced_channels"): + self.notifier.add_channel(Mock()) + self.notifier_start_mock.assert_not_called() + + def test_remove_channel_removes_channel(self): + """Channel in FirstHash is removed from `_silenced_channels`.""" + channel = Mock() + with mock.patch.object(self.notifier, "_silenced_channels") as silenced_channels: + self.notifier.remove_channel(channel) + silenced_channels.__delitem__.assert_called_with(channel) + + def test_remove_channel_stops_loop(self): + """Notifier loop is stopped if `_silenced_channels` is empty after remove.""" + with mock.patch.object(self.notifier, "_silenced_channels", __bool__=lambda _: False): + self.notifier.remove_channel(Mock()) + self.notifier_stop_mock.assert_called_once() + + def test_remove_channel_skips_stop_with_channels(self): + """Notifier loop is not stopped if `_silenced_channels` is not empty after remove.""" + self.notifier.remove_channel(Mock()) + self.notifier_stop_mock.assert_not_called() + + async def test_notifier_private_sends_alert(self): + """Alert is sent on 15 min intervals.""" + test_cases = (900, 1800, 2700) + for current_loop in test_cases: + with self.subTest(current_loop=current_loop): + with mock.patch.object(self.notifier, "_current_loop", new=current_loop): + await self.notifier._notifier() + self.alert_channel.send.assert_called_once_with(f"<@&{Roles.moderators}> currently silenced channels: ") + self.alert_channel.send.reset_mock() + + async def test_notifier_skips_alert(self): + """Alert is skipped on first loop or not an increment of 900.""" + test_cases = (0, 15, 5000) + for current_loop in test_cases: + with self.subTest(current_loop=current_loop): + with mock.patch.object(self.notifier, "_current_loop", new=current_loop): + await self.notifier._notifier() + self.alert_channel.send.assert_not_called() + + +class SilenceTests(unittest.IsolatedAsyncioTestCase): + def setUp(self) -> None: + self.bot = MockBot() + self.cog = Silence(self.bot) + self.ctx = MockContext() + self.cog._verified_role = None + # Set event so command callbacks can continue. + self.cog._get_instance_vars_event.set() + + async def test_instance_vars_got_guild(self): + """Bot got guild after it became available.""" + await self.cog._get_instance_vars() + self.bot.wait_until_guild_available.assert_called_once() + self.bot.get_guild.assert_called_once_with(Guild.id) + + async def test_instance_vars_got_role(self): + """Got `Roles.verified` role from guild.""" + await self.cog._get_instance_vars() + guild = self.bot.get_guild() + guild.get_role.assert_called_once_with(Roles.verified) + + async def test_instance_vars_got_channels(self): + """Got channels from bot.""" + await self.cog._get_instance_vars() + self.bot.get_channel.called_once_with(Channels.mod_alerts) + self.bot.get_channel.called_once_with(Channels.mod_log) + + @mock.patch("bot.cogs.moderation.silence.SilenceNotifier") + async def test_instance_vars_got_notifier(self, notifier): + """Notifier was started with channel.""" + mod_log = MockTextChannel() + self.bot.get_channel.side_effect = (None, mod_log) + await self.cog._get_instance_vars() + notifier.assert_called_once_with(mod_log) + self.bot.get_channel.side_effect = None + + async def test_silence_sent_correct_discord_message(self): + """Check if proper message was sent when called with duration in channel with previous state.""" + test_cases = ( + (0.0001, f"{Emojis.check_mark} silenced current channel for 0.0001 minute(s).", True,), + (None, f"{Emojis.check_mark} silenced current channel indefinitely.", True,), + (5, f"{Emojis.cross_mark} current channel is already silenced.", False,), + ) + for duration, result_message, _silence_patch_return in test_cases: + with self.subTest( + silence_duration=duration, + result_message=result_message, + starting_unsilenced_state=_silence_patch_return + ): + with mock.patch.object(self.cog, "_silence", return_value=_silence_patch_return): + await self.cog.silence.callback(self.cog, self.ctx, duration) + self.ctx.send.assert_called_once_with(result_message) + self.ctx.reset_mock() + + async def test_unsilence_sent_correct_discord_message(self): + """Proper reply after a successful unsilence.""" + with mock.patch.object(self.cog, "_unsilence", return_value=True): + await self.cog.unsilence.callback(self.cog, self.ctx) + self.ctx.send.assert_called_once_with(f"{Emojis.check_mark} unsilenced current channel.") + + async def test_silence_private_for_false(self): + """Permissions are not set and `False` is returned in an already silenced channel.""" + perm_overwrite = Mock(send_messages=False) + channel = Mock(overwrites_for=Mock(return_value=perm_overwrite)) + + self.assertFalse(await self.cog._silence(channel, True, None)) + channel.set_permissions.assert_not_called() + + async def test_silence_private_silenced_channel(self): + """Channel had `send_message` permissions revoked.""" + channel = MockTextChannel() + self.assertTrue(await self.cog._silence(channel, False, None)) + channel.set_permissions.assert_called_once() + self.assertFalse(channel.set_permissions.call_args.kwargs['send_messages']) + + async def test_silence_private_preserves_permissions(self): + """Previous permissions were preserved when channel was silenced.""" + channel = MockTextChannel() + # Set up mock channel permission state. + mock_permissions = PermissionOverwrite() + mock_permissions_dict = dict(mock_permissions) + channel.overwrites_for.return_value = mock_permissions + await self.cog._silence(channel, False, None) + new_permissions = channel.set_permissions.call_args.kwargs + # Remove 'send_messages' key because it got changed in the method. + del new_permissions['send_messages'] + del mock_permissions_dict['send_messages'] + self.assertDictEqual(mock_permissions_dict, new_permissions) + + async def test_silence_private_notifier(self): + """Channel should be added to notifier with `persistent` set to `True`, and the other way around.""" + channel = MockTextChannel() + with mock.patch.object(self.cog, "notifier", create=True): + with self.subTest(persistent=True): + await self.cog._silence(channel, True, None) + self.cog.notifier.add_channel.assert_called_once() + + with mock.patch.object(self.cog, "notifier", create=True): + with self.subTest(persistent=False): + await self.cog._silence(channel, False, None) + self.cog.notifier.add_channel.assert_not_called() + + async def test_silence_private_added_muted_channel(self): + """Channel was added to `muted_channels` on silence.""" + channel = MockTextChannel() + with mock.patch.object(self.cog, "muted_channels") as muted_channels: + await self.cog._silence(channel, False, None) + muted_channels.add.assert_called_once_with(channel) + + async def test_unsilence_private_for_false(self): + """Permissions are not set and `False` is returned in an unsilenced channel.""" + channel = Mock() + self.assertFalse(await self.cog._unsilence(channel)) + channel.set_permissions.assert_not_called() + + @mock.patch.object(Silence, "notifier", create=True) + async def test_unsilence_private_unsilenced_channel(self, _): + """Channel had `send_message` permissions restored""" + perm_overwrite = MagicMock(send_messages=False) + channel = MockTextChannel(overwrites_for=Mock(return_value=perm_overwrite)) + self.assertTrue(await self.cog._unsilence(channel)) + channel.set_permissions.assert_called_once() + self.assertIsNone(channel.set_permissions.call_args.kwargs['send_messages']) + + @mock.patch.object(Silence, "notifier", create=True) + async def test_unsilence_private_removed_notifier(self, notifier): + """Channel was removed from `notifier` on unsilence.""" + perm_overwrite = MagicMock(send_messages=False) + channel = MockTextChannel(overwrites_for=Mock(return_value=perm_overwrite)) + await self.cog._unsilence(channel) + notifier.remove_channel.assert_called_once_with(channel) + + @mock.patch.object(Silence, "notifier", create=True) + async def test_unsilence_private_removed_muted_channel(self, _): + """Channel was removed from `muted_channels` on unsilence.""" + perm_overwrite = MagicMock(send_messages=False) + channel = MockTextChannel(overwrites_for=Mock(return_value=perm_overwrite)) + with mock.patch.object(self.cog, "muted_channels") as muted_channels: + await self.cog._unsilence(channel) + muted_channels.discard.assert_called_once_with(channel) + + @mock.patch.object(Silence, "notifier", create=True) + async def test_unsilence_private_preserves_permissions(self, _): + """Previous permissions were preserved when channel was unsilenced.""" + channel = MockTextChannel() + # Set up mock channel permission state. + mock_permissions = PermissionOverwrite(send_messages=False) + mock_permissions_dict = dict(mock_permissions) + channel.overwrites_for.return_value = mock_permissions + await self.cog._unsilence(channel) + new_permissions = channel.set_permissions.call_args.kwargs + # Remove 'send_messages' key because it got changed in the method. + del new_permissions['send_messages'] + del mock_permissions_dict['send_messages'] + self.assertDictEqual(mock_permissions_dict, new_permissions) + + @mock.patch("bot.cogs.moderation.silence.asyncio") + @mock.patch.object(Silence, "_mod_alerts_channel", create=True) + def test_cog_unload_starts_task(self, alert_channel, asyncio_mock): + """Task for sending an alert was created with present `muted_channels`.""" + with mock.patch.object(self.cog, "muted_channels"): + self.cog.cog_unload() + alert_channel.send.assert_called_once_with(f"<@&{Roles.moderators}> channels left silenced on cog unload: ") + asyncio_mock.create_task.assert_called_once_with(alert_channel.send()) + + @mock.patch("bot.cogs.moderation.silence.asyncio") + def test_cog_unload_skips_task_start(self, asyncio_mock): + """No task created with no channels.""" + self.cog.cog_unload() + asyncio_mock.create_task.assert_not_called() + + @mock.patch("bot.cogs.moderation.silence.with_role_check") + @mock.patch("bot.cogs.moderation.silence.MODERATION_ROLES", new=(1, 2, 3)) + def test_cog_check(self, role_check): + """Role check is called with `MODERATION_ROLES`""" + self.cog.cog_check(self.ctx) + role_check.assert_called_once_with(self.ctx, *(1, 2, 3)) diff --git a/tests/bot/test_converters.py b/tests/bot/test_converters.py index 1e5ca62ae..ca8cb6825 100644 --- a/tests/bot/test_converters.py +++ b/tests/bot/test_converters.py @@ -8,6 +8,7 @@ from discord.ext.commands import BadArgument from bot.converters import ( Duration, + HushDurationConverter, ISODateTime, TagContentConverter, TagNameConverter, @@ -271,3 +272,32 @@ class ConverterTests(unittest.TestCase): exception_message = f"`{datetime_string}` is not a valid ISO-8601 datetime string" with self.assertRaises(BadArgument, msg=exception_message): asyncio.run(converter.convert(self.context, datetime_string)) + + def test_hush_duration_converter_for_valid(self): + """HushDurationConverter returns correct value for minutes duration or `"forever"` strings.""" + test_values = ( + ("0", 0), + ("15", 15), + ("10", 10), + ("5m", 5), + ("5M", 5), + ("forever", None), + ) + converter = HushDurationConverter() + for minutes_string, expected_minutes in test_values: + with self.subTest(minutes_string=minutes_string, expected_minutes=expected_minutes): + converted = asyncio.run(converter.convert(self.context, minutes_string)) + self.assertEqual(expected_minutes, converted) + + def test_hush_duration_converter_for_invalid(self): + """HushDurationConverter raises correct exception for invalid minutes duration strings.""" + test_values = ( + ("16", "Duration must be at most 15 minutes."), + ("10d", "10d is not a valid minutes duration."), + ("-1", "-1 is not a valid minutes duration."), + ) + converter = HushDurationConverter() + for invalid_minutes_string, exception_message in test_values: + with self.subTest(invalid_minutes_string=invalid_minutes_string, exception_message=exception_message): + with self.assertRaisesRegex(BadArgument, exception_message): + asyncio.run(converter.convert(self.context, invalid_minutes_string)) |