aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar Karlis S <[email protected]>2020-03-31 14:23:16 +0300
committerGravatar GitHub <[email protected]>2020-03-31 14:23:16 +0300
commitad63ac99d36e308ca84ea1d236c0969c319c2ce4 (patch)
tree706d4037b49dc9bab34b26997edd2dc10136fb41
parentMerge branch 'master' into zen-match-fix (diff)
parentMerge pull request #845 from python-discord/update-logging-levels (diff)
Merge branch 'master' into zen-match-fix
Diffstat (limited to '')
-rw-r--r--bot/__main__.py1
-rw-r--r--bot/cogs/alias.py2
-rw-r--r--bot/cogs/bot.py1
-rw-r--r--bot/cogs/error_handler.py35
-rw-r--r--bot/cogs/moderation/__init__.py4
-rw-r--r--bot/cogs/moderation/scheduler.py2
-rw-r--r--bot/cogs/moderation/silence.py159
-rw-r--r--bot/cogs/moderation/superstarify.py4
-rw-r--r--bot/cogs/moderation/utils.py2
-rw-r--r--bot/cogs/snekbox.py2
-rw-r--r--bot/cogs/sync/syncers.py2
-rw-r--r--bot/cogs/utils.py21
-rw-r--r--bot/cogs/webhook_remover.py72
-rw-r--r--bot/converters.py30
-rw-r--r--bot/utils/messages.py2
-rw-r--r--tests/bot/cogs/moderation/__init__.py0
-rw-r--r--tests/bot/cogs/moderation/test_silence.py251
-rw-r--r--tests/bot/test_converters.py30
18 files changed, 606 insertions, 14 deletions
diff --git a/bot/__main__.py b/bot/__main__.py
index 3df477a6d..8c3ae02e3 100644
--- a/bot/__main__.py
+++ b/bot/__main__.py
@@ -63,6 +63,7 @@ bot.load_extension("bot.cogs.tags")
bot.load_extension("bot.cogs.token_remover")
bot.load_extension("bot.cogs.utils")
bot.load_extension("bot.cogs.watchchannels")
+bot.load_extension("bot.cogs.webhook_remover")
bot.load_extension("bot.cogs.wolfram")
# Apply `message_edited_at` patch if discord.py did not yet release a bug fix.
diff --git a/bot/cogs/alias.py b/bot/cogs/alias.py
index 9001e18f0..55c7efe65 100644
--- a/bot/cogs/alias.py
+++ b/bot/cogs/alias.py
@@ -29,7 +29,7 @@ class Alias (Cog):
return log.info(f'Did not find command "{cmd_name}" to invoke.')
elif not await cmd.can_run(ctx):
return log.info(
- f'{str(ctx.author)} tried to run the command "{cmd_name}"'
+ f'{str(ctx.author)} tried to run the command "{cmd_name}" but lacks permission.'
)
await ctx.invoke(cmd, *args, **kwargs)
diff --git a/bot/cogs/bot.py b/bot/cogs/bot.py
index f17135877..e897b30ff 100644
--- a/bot/cogs/bot.py
+++ b/bot/cogs/bot.py
@@ -67,7 +67,6 @@ class BotCog(Cog, name="Bot"):
icon_url=URLs.bot_avatar
)
- log.info(f"{ctx.author} called !about. Returning information about the bot.")
await ctx.send(embed=embed)
@command(name='echo', aliases=('print',))
diff --git a/bot/cogs/error_handler.py b/bot/cogs/error_handler.py
index 261769efc..6a622d2ce 100644
--- a/bot/cogs/error_handler.py
+++ b/bot/cogs/error_handler.py
@@ -31,7 +31,9 @@ class ErrorHandler(Cog):
Error handling emits a single error message in the invoking context `ctx` and a log message,
prioritised as follows:
- 1. If the name fails to match a command but matches a tag, the tag is invoked
+ 1. If the name fails to match a command:
+ * If it matches shh+ or unshh+, the channel is silenced or unsilenced respectively.
+ Otherwise if it matches a tag, the tag is invoked
* If CommandNotFound is raised when invoking the tag (determined by the presence of the
`invoked_from_error_handler` attribute), this error is treated as being unexpected
and therefore sends an error message
@@ -48,9 +50,11 @@ class ErrorHandler(Cog):
log.trace(f"Command {command} had its error already handled locally; ignoring.")
return
- # Try to look for a tag with the command's name if the command isn't found.
if isinstance(e, errors.CommandNotFound) and not hasattr(ctx, "invoked_from_error_handler"):
+ if await self.try_silence(ctx):
+ return
if ctx.channel.id != Channels.verification:
+ # Try to look for a tag with the command's name
await self.try_get_tag(ctx)
return # Exit early to avoid logging.
elif isinstance(e, errors.UserInputError):
@@ -89,6 +93,33 @@ class ErrorHandler(Cog):
else:
return self.bot.get_command("help")
+ async def try_silence(self, ctx: Context) -> bool:
+ """
+ Attempt to invoke the silence or unsilence command if invoke with matches a pattern.
+
+ Respecting the checks if:
+ * invoked with `shh+` silence channel for amount of h's*2 with max of 15.
+ * invoked with `unshh+` unsilence channel
+ Return bool depending on success of command.
+ """
+ command = ctx.invoked_with.lower()
+ silence_command = self.bot.get_command("silence")
+ ctx.invoked_from_error_handler = True
+ try:
+ if not await silence_command.can_run(ctx):
+ log.debug("Cancelling attempt to invoke silence/unsilence due to failed checks.")
+ return False
+ except errors.CommandError:
+ log.debug("Cancelling attempt to invoke silence/unsilence due to failed checks.")
+ return False
+ if command.startswith("shh"):
+ await ctx.invoke(silence_command, duration=min(command.count("h")*2, 15))
+ return True
+ elif command.startswith("unshh"):
+ await ctx.invoke(self.bot.get_command("unsilence"))
+ return True
+ return False
+
async def try_get_tag(self, ctx: Context) -> None:
"""
Attempt to display a tag by interpreting the command name as a tag name.
diff --git a/bot/cogs/moderation/__init__.py b/bot/cogs/moderation/__init__.py
index 5243cb92d..6880ca1bd 100644
--- a/bot/cogs/moderation/__init__.py
+++ b/bot/cogs/moderation/__init__.py
@@ -2,12 +2,14 @@ from bot.bot import Bot
from .infractions import Infractions
from .management import ModManagement
from .modlog import ModLog
+from .silence import Silence
from .superstarify import Superstarify
def setup(bot: Bot) -> None:
- """Load the Infractions, ModManagement, ModLog, and Superstarify cogs."""
+ """Load the Infractions, ModManagement, ModLog, Silence, and Superstarify cogs."""
bot.add_cog(Infractions(bot))
bot.add_cog(ModLog(bot))
bot.add_cog(ModManagement(bot))
+ bot.add_cog(Silence(bot))
bot.add_cog(Superstarify(bot))
diff --git a/bot/cogs/moderation/scheduler.py b/bot/cogs/moderation/scheduler.py
index f0b6b2c48..917697be9 100644
--- a/bot/cogs/moderation/scheduler.py
+++ b/bot/cogs/moderation/scheduler.py
@@ -222,7 +222,7 @@ class InfractionScheduler(Scheduler):
# If multiple active infractions were found, mark them as inactive in the database
# and cancel their expiration tasks.
if len(response) > 1:
- log.warning(
+ log.info(
f"Found more than one active {infr_type} infraction for user {user.id}; "
"deactivating the extra active infractions too."
)
diff --git a/bot/cogs/moderation/silence.py b/bot/cogs/moderation/silence.py
new file mode 100644
index 000000000..1ef3967a9
--- /dev/null
+++ b/bot/cogs/moderation/silence.py
@@ -0,0 +1,159 @@
+import asyncio
+import logging
+from contextlib import suppress
+from typing import Optional
+
+from discord import TextChannel
+from discord.ext import commands, tasks
+from discord.ext.commands import Context
+
+from bot.bot import Bot
+from bot.constants import Channels, Emojis, Guild, MODERATION_ROLES, Roles
+from bot.converters import HushDurationConverter
+from bot.utils.checks import with_role_check
+
+log = logging.getLogger(__name__)
+
+
+class SilenceNotifier(tasks.Loop):
+ """Loop notifier for posting notices to `alert_channel` containing added channels."""
+
+ def __init__(self, alert_channel: TextChannel):
+ super().__init__(self._notifier, seconds=1, minutes=0, hours=0, count=None, reconnect=True, loop=None)
+ self._silenced_channels = {}
+ self._alert_channel = alert_channel
+
+ def add_channel(self, channel: TextChannel) -> None:
+ """Add channel to `_silenced_channels` and start loop if not launched."""
+ if not self._silenced_channels:
+ self.start()
+ log.info("Starting notifier loop.")
+ self._silenced_channels[channel] = self._current_loop
+
+ def remove_channel(self, channel: TextChannel) -> None:
+ """Remove channel from `_silenced_channels` and stop loop if no channels remain."""
+ with suppress(KeyError):
+ del self._silenced_channels[channel]
+ if not self._silenced_channels:
+ self.stop()
+ log.info("Stopping notifier loop.")
+
+ async def _notifier(self) -> None:
+ """Post notice of `_silenced_channels` with their silenced duration to `_alert_channel` periodically."""
+ # Wait for 15 minutes between notices with pause at start of loop.
+ if self._current_loop and not self._current_loop/60 % 15:
+ log.debug(
+ f"Sending notice with channels: "
+ f"{', '.join(f'#{channel} ({channel.id})' for channel in self._silenced_channels)}."
+ )
+ channels_text = ', '.join(
+ f"{channel.mention} for {(self._current_loop-start)//60} min"
+ for channel, start in self._silenced_channels.items()
+ )
+ await self._alert_channel.send(f"<@&{Roles.moderators}> currently silenced channels: {channels_text}")
+
+
+class Silence(commands.Cog):
+ """Commands for stopping channel messages for `verified` role in a channel."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+ self.muted_channels = set()
+ self._get_instance_vars_task = self.bot.loop.create_task(self._get_instance_vars())
+ self._get_instance_vars_event = asyncio.Event()
+
+ async def _get_instance_vars(self) -> None:
+ """Get instance variables after they're available to get from the guild."""
+ await self.bot.wait_until_guild_available()
+ guild = self.bot.get_guild(Guild.id)
+ self._verified_role = guild.get_role(Roles.verified)
+ self._mod_alerts_channel = self.bot.get_channel(Channels.mod_alerts)
+ self._mod_log_channel = self.bot.get_channel(Channels.mod_log)
+ self.notifier = SilenceNotifier(self._mod_log_channel)
+ self._get_instance_vars_event.set()
+
+ @commands.command(aliases=("hush",))
+ async def silence(self, ctx: Context, duration: HushDurationConverter = 10) -> None:
+ """
+ Silence the current channel for `duration` minutes or `forever`.
+
+ Duration is capped at 15 minutes, passing forever makes the silence indefinite.
+ Indefinitely silenced channels get added to a notifier which posts notices every 15 minutes from the start.
+ """
+ await self._get_instance_vars_event.wait()
+ log.debug(f"{ctx.author} is silencing channel #{ctx.channel}.")
+ if not await self._silence(ctx.channel, persistent=(duration is None), duration=duration):
+ await ctx.send(f"{Emojis.cross_mark} current channel is already silenced.")
+ return
+ if duration is None:
+ await ctx.send(f"{Emojis.check_mark} silenced current channel indefinitely.")
+ return
+
+ await ctx.send(f"{Emojis.check_mark} silenced current channel for {duration} minute(s).")
+ await asyncio.sleep(duration*60)
+ log.info(f"Unsilencing channel after set delay.")
+ await ctx.invoke(self.unsilence)
+
+ @commands.command(aliases=("unhush",))
+ async def unsilence(self, ctx: Context) -> None:
+ """
+ Unsilence the current channel.
+
+ If the channel was silenced indefinitely, notifications for the channel will stop.
+ """
+ await self._get_instance_vars_event.wait()
+ log.debug(f"Unsilencing channel #{ctx.channel} from {ctx.author}'s command.")
+ if await self._unsilence(ctx.channel):
+ await ctx.send(f"{Emojis.check_mark} unsilenced current channel.")
+
+ async def _silence(self, channel: TextChannel, persistent: bool, duration: Optional[int]) -> bool:
+ """
+ Silence `channel` for `self._verified_role`.
+
+ If `persistent` is `True` add `channel` to notifier.
+ `duration` is only used for logging; if None is passed `persistent` should be True to not log None.
+ Return `True` if channel permissions were changed, `False` otherwise.
+ """
+ current_overwrite = channel.overwrites_for(self._verified_role)
+ if current_overwrite.send_messages is False:
+ log.info(f"Tried to silence channel #{channel} ({channel.id}) but the channel was already silenced.")
+ return False
+ await channel.set_permissions(self._verified_role, **dict(current_overwrite, send_messages=False))
+ self.muted_channels.add(channel)
+ if persistent:
+ log.info(f"Silenced #{channel} ({channel.id}) indefinitely.")
+ self.notifier.add_channel(channel)
+ return True
+
+ log.info(f"Silenced #{channel} ({channel.id}) for {duration} minute(s).")
+ return True
+
+ async def _unsilence(self, channel: TextChannel) -> bool:
+ """
+ Unsilence `channel`.
+
+ Check if `channel` is silenced through a `PermissionOverwrite`,
+ if it is unsilence it and remove it from the notifier.
+ Return `True` if channel permissions were changed, `False` otherwise.
+ """
+ current_overwrite = channel.overwrites_for(self._verified_role)
+ if current_overwrite.send_messages is False:
+ await channel.set_permissions(self._verified_role, **dict(current_overwrite, send_messages=None))
+ log.info(f"Unsilenced channel #{channel} ({channel.id}).")
+ self.notifier.remove_channel(channel)
+ self.muted_channels.discard(channel)
+ return True
+ log.info(f"Tried to unsilence channel #{channel} ({channel.id}) but the channel was not silenced.")
+ return False
+
+ def cog_unload(self) -> None:
+ """Send alert with silenced channels on unload."""
+ if self.muted_channels:
+ channels_string = ''.join(channel.mention for channel in self.muted_channels)
+ message = f"<@&{Roles.moderators}> channels left silenced on cog unload: {channels_string}"
+ asyncio.create_task(self._mod_alerts_channel.send(message))
+
+ # This cannot be static (must have a __func__ attribute).
+ def cog_check(self, ctx: Context) -> bool:
+ """Only allow moderators to invoke the commands in this cog."""
+ return with_role_check(ctx, *MODERATION_ROLES)
diff --git a/bot/cogs/moderation/superstarify.py b/bot/cogs/moderation/superstarify.py
index 893cb7f13..ca3dc4202 100644
--- a/bot/cogs/moderation/superstarify.py
+++ b/bot/cogs/moderation/superstarify.py
@@ -59,7 +59,7 @@ class Superstarify(InfractionScheduler, Cog):
return # Nick change was triggered by this event. Ignore.
log.info(
- f"{after.display_name} is currently in superstar-prison. "
+ f"{after.display_name} ({after.id}) tried to escape superstar prison. "
f"Changing the nick back to {before.display_name}."
)
await after.edit(
@@ -80,7 +80,7 @@ class Superstarify(InfractionScheduler, Cog):
)
if not notified:
- log.warning("Failed to DM user about why they cannot change their nickname.")
+ log.info("Failed to DM user about why they cannot change their nickname.")
@Cog.listener()
async def on_member_join(self, member: Member) -> None:
diff --git a/bot/cogs/moderation/utils.py b/bot/cogs/moderation/utils.py
index 5052b9048..3598f3b1f 100644
--- a/bot/cogs/moderation/utils.py
+++ b/bot/cogs/moderation/utils.py
@@ -38,7 +38,7 @@ async def post_user(ctx: Context, user: UserSnowflake) -> t.Optional[dict]:
log.trace(f"Attempting to add user {user.id} to the database.")
if not isinstance(user, (discord.Member, discord.User)):
- log.warning("The user being added to the DB is not a Member or User object.")
+ log.debug("The user being added to the DB is not a Member or User object.")
payload = {
'avatar_hash': getattr(user, 'avatar', 0),
diff --git a/bot/cogs/snekbox.py b/bot/cogs/snekbox.py
index 454836921..315383b12 100644
--- a/bot/cogs/snekbox.py
+++ b/bot/cogs/snekbox.py
@@ -301,7 +301,7 @@ class Snekbox(Cog):
code = await self.continue_eval(ctx, response)
if not code:
break
- log.info(f"Re-evaluating message {ctx.message.id}")
+ log.info(f"Re-evaluating code from message {ctx.message.id}:\n{code}")
def predicate_eval_message_edit(ctx: Context, old_msg: Message, new_msg: Message) -> bool:
diff --git a/bot/cogs/sync/syncers.py b/bot/cogs/sync/syncers.py
index c7ce54d65..003bf3727 100644
--- a/bot/cogs/sync/syncers.py
+++ b/bot/cogs/sync/syncers.py
@@ -131,7 +131,7 @@ class Syncer(abc.ABC):
await message.edit(content=f':ok_hand: {mention}{self.name} sync will proceed.')
return True
else:
- log.warning(f"The {self.name} syncer was aborted or timed out!")
+ log.info(f"The {self.name} syncer was aborted or timed out!")
await message.edit(
content=f':warning: {mention}{self.name} sync aborted or timed out!'
)
diff --git a/bot/cogs/utils.py b/bot/cogs/utils.py
index 0619296ad..3ed471bbf 100644
--- a/bot/cogs/utils.py
+++ b/bot/cogs/utils.py
@@ -40,6 +40,8 @@ If the implementation is easy to explain, it may be a good idea.
Namespaces are one honking great idea -- let's do more of those!
"""
+ICON_URL = "https://www.python.org/static/opengraph-icon-200x200.png"
+
class Utils(Cog):
"""A selection of utilities which don't have a clear category."""
@@ -59,6 +61,10 @@ class Utils(Cog):
await ctx.invoke(self.bot.get_command("help"), "pep")
return
+ # Handle PEP 0 directly because it's not in .rst or .txt so it can't be accessed like other PEPs.
+ if pep_number == 0:
+ return await self.send_pep_zero(ctx)
+
possible_extensions = ['.txt', '.rst']
found_pep = False
for extension in possible_extensions:
@@ -82,7 +88,7 @@ class Utils(Cog):
description=f"[Link]({self.base_pep_url}{pep_number:04})",
)
- pep_embed.set_thumbnail(url="https://www.python.org/static/opengraph-icon-200x200.png")
+ pep_embed.set_thumbnail(url=ICON_URL)
# Add the interesting information
fields_to_check = ("Status", "Python-Version", "Created", "Type")
@@ -288,6 +294,19 @@ class Utils(Cog):
for reaction in options:
await message.add_reaction(reaction)
+ async def send_pep_zero(self, ctx: Context) -> None:
+ """Send information about PEP 0."""
+ pep_embed = Embed(
+ title=f"**PEP 0 - Index of Python Enhancement Proposals (PEPs)**",
+ description=f"[Link](https://www.python.org/dev/peps/)"
+ )
+ pep_embed.set_thumbnail(url=ICON_URL)
+ pep_embed.add_field(name="Status", value="Active")
+ pep_embed.add_field(name="Created", value="13-Jul-2000")
+ pep_embed.add_field(name="Type", value="Informational")
+
+ await ctx.send(embed=pep_embed)
+
def setup(bot: Bot) -> None:
"""Load the Utils cog."""
diff --git a/bot/cogs/webhook_remover.py b/bot/cogs/webhook_remover.py
new file mode 100644
index 000000000..49692113d
--- /dev/null
+++ b/bot/cogs/webhook_remover.py
@@ -0,0 +1,72 @@
+import logging
+import re
+
+from discord import Colour, Message
+from discord.ext.commands import Cog
+
+from bot.bot import Bot
+from bot.cogs.moderation.modlog import ModLog
+from bot.constants import Channels, Colours, Event, Icons
+
+WEBHOOK_URL_RE = re.compile(r"((?:https?://)?discordapp\.com/api/webhooks/\d+/)\S+/?", re.I)
+
+ALERT_MESSAGE_TEMPLATE = (
+ "{user}, looks like you posted a Discord webhook URL. Therefore, your "
+ "message has been removed. Your webhook may have been **compromised** so "
+ "please re-create the webhook **immediately**. If you believe this was "
+ "mistake, please let us know."
+)
+
+log = logging.getLogger(__name__)
+
+
+class WebhookRemover(Cog):
+ """Scan messages to detect Discord webhooks links."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ @property
+ def mod_log(self) -> ModLog:
+ """Get current instance of `ModLog`."""
+ return self.bot.get_cog("ModLog")
+
+ async def delete_and_respond(self, msg: Message, redacted_url: str) -> None:
+ """Delete `msg` and send a warning that it contained the Discord webhook `redacted_url`."""
+ # Don't log this, due internal delete, not by user. Will make different entry.
+ self.mod_log.ignore(Event.message_delete, msg.id)
+ await msg.delete()
+ await msg.channel.send(ALERT_MESSAGE_TEMPLATE.format(user=msg.author.mention))
+
+ message = (
+ f"{msg.author} (`{msg.author.id}`) posted a Discord webhook URL "
+ f"to #{msg.channel}. Webhook URL was `{redacted_url}`"
+ )
+ log.debug(message)
+
+ # Send entry to moderation alerts.
+ await self.mod_log.send_log_message(
+ icon_url=Icons.token_removed,
+ colour=Colour(Colours.soft_red),
+ title="Discord webhook URL removed!",
+ text=message,
+ thumbnail=msg.author.avatar_url_as(static_format="png"),
+ channel_id=Channels.mod_alerts
+ )
+
+ @Cog.listener()
+ async def on_message(self, msg: Message) -> None:
+ """Check if a Discord webhook URL is in `message`."""
+ matches = WEBHOOK_URL_RE.search(msg.content)
+ if matches:
+ await self.delete_and_respond(msg, matches[1] + "xxx")
+
+ @Cog.listener()
+ async def on_message_edit(self, before: Message, after: Message) -> None:
+ """Check if a Discord webhook URL is in the edited message `after`."""
+ await self.on_message(after)
+
+
+def setup(bot: Bot) -> None:
+ """Load `WebhookRemover` cog."""
+ bot.add_cog(WebhookRemover(bot))
diff --git a/bot/converters.py b/bot/converters.py
index 1945e1da3..72c46fdf0 100644
--- a/bot/converters.py
+++ b/bot/converters.py
@@ -262,6 +262,34 @@ class ISODateTime(Converter):
return dt
+class HushDurationConverter(Converter):
+ """Convert passed duration to `int` minutes or `None`."""
+
+ MINUTES_RE = re.compile(r"(\d+)(?:M|m|$)")
+
+ async def convert(self, ctx: Context, argument: str) -> t.Optional[int]:
+ """
+ Convert `argument` to a duration that's max 15 minutes or None.
+
+ If `"forever"` is passed, None is returned; otherwise an int of the extracted time.
+ Accepted formats are:
+ * <duration>,
+ * <duration>m,
+ * <duration>M,
+ * forever.
+ """
+ if argument == "forever":
+ return None
+ match = self.MINUTES_RE.match(argument)
+ if not match:
+ raise BadArgument(f"{argument} is not a valid minutes duration.")
+
+ duration = int(match.group(1))
+ if duration > 15:
+ raise BadArgument("Duration must be at most 15 minutes.")
+ return duration
+
+
def proxy_user(user_id: str) -> discord.Object:
"""
Create a proxy user object from the given id.
@@ -323,7 +351,7 @@ class FetchedUser(UserConverter):
except discord.HTTPException as e:
# If the Discord error isn't `Unknown user`, return a proxy instead
if e.code != 10013:
- log.warning(f"Failed to fetch user, returning a proxy instead: status {e.status}")
+ log.info(f"Failed to fetch user, returning a proxy instead: status {e.status}")
return proxy_user(arg)
log.debug(f"Failed to fetch user {arg}: user does not exist.")
diff --git a/bot/utils/messages.py b/bot/utils/messages.py
index a36edc774..e969ee590 100644
--- a/bot/utils/messages.py
+++ b/bot/utils/messages.py
@@ -92,7 +92,7 @@ async def send_attachments(
elif link_large:
large.append(attachment)
else:
- log.warning(f"{failure_msg} because it's too large.")
+ log.info(f"{failure_msg} because it's too large.")
except HTTPException as e:
if link_large and e.status == 413:
large.append(attachment)
diff --git a/tests/bot/cogs/moderation/__init__.py b/tests/bot/cogs/moderation/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/tests/bot/cogs/moderation/__init__.py
diff --git a/tests/bot/cogs/moderation/test_silence.py b/tests/bot/cogs/moderation/test_silence.py
new file mode 100644
index 000000000..3fd149f04
--- /dev/null
+++ b/tests/bot/cogs/moderation/test_silence.py
@@ -0,0 +1,251 @@
+import unittest
+from unittest import mock
+from unittest.mock import MagicMock, Mock
+
+from discord import PermissionOverwrite
+
+from bot.cogs.moderation.silence import Silence, SilenceNotifier
+from bot.constants import Channels, Emojis, Guild, Roles
+from tests.helpers import MockBot, MockContext, MockTextChannel
+
+
+class SilenceNotifierTests(unittest.IsolatedAsyncioTestCase):
+ def setUp(self) -> None:
+ self.alert_channel = MockTextChannel()
+ self.notifier = SilenceNotifier(self.alert_channel)
+ self.notifier.stop = self.notifier_stop_mock = Mock()
+ self.notifier.start = self.notifier_start_mock = Mock()
+
+ def test_add_channel_adds_channel(self):
+ """Channel in FirstHash with current loop is added to internal set."""
+ channel = Mock()
+ with mock.patch.object(self.notifier, "_silenced_channels") as silenced_channels:
+ self.notifier.add_channel(channel)
+ silenced_channels.__setitem__.assert_called_with(channel, self.notifier._current_loop)
+
+ def test_add_channel_starts_loop(self):
+ """Loop is started if `_silenced_channels` was empty."""
+ self.notifier.add_channel(Mock())
+ self.notifier_start_mock.assert_called_once()
+
+ def test_add_channel_skips_start_with_channels(self):
+ """Loop start is not called when `_silenced_channels` is not empty."""
+ with mock.patch.object(self.notifier, "_silenced_channels"):
+ self.notifier.add_channel(Mock())
+ self.notifier_start_mock.assert_not_called()
+
+ def test_remove_channel_removes_channel(self):
+ """Channel in FirstHash is removed from `_silenced_channels`."""
+ channel = Mock()
+ with mock.patch.object(self.notifier, "_silenced_channels") as silenced_channels:
+ self.notifier.remove_channel(channel)
+ silenced_channels.__delitem__.assert_called_with(channel)
+
+ def test_remove_channel_stops_loop(self):
+ """Notifier loop is stopped if `_silenced_channels` is empty after remove."""
+ with mock.patch.object(self.notifier, "_silenced_channels", __bool__=lambda _: False):
+ self.notifier.remove_channel(Mock())
+ self.notifier_stop_mock.assert_called_once()
+
+ def test_remove_channel_skips_stop_with_channels(self):
+ """Notifier loop is not stopped if `_silenced_channels` is not empty after remove."""
+ self.notifier.remove_channel(Mock())
+ self.notifier_stop_mock.assert_not_called()
+
+ async def test_notifier_private_sends_alert(self):
+ """Alert is sent on 15 min intervals."""
+ test_cases = (900, 1800, 2700)
+ for current_loop in test_cases:
+ with self.subTest(current_loop=current_loop):
+ with mock.patch.object(self.notifier, "_current_loop", new=current_loop):
+ await self.notifier._notifier()
+ self.alert_channel.send.assert_called_once_with(f"<@&{Roles.moderators}> currently silenced channels: ")
+ self.alert_channel.send.reset_mock()
+
+ async def test_notifier_skips_alert(self):
+ """Alert is skipped on first loop or not an increment of 900."""
+ test_cases = (0, 15, 5000)
+ for current_loop in test_cases:
+ with self.subTest(current_loop=current_loop):
+ with mock.patch.object(self.notifier, "_current_loop", new=current_loop):
+ await self.notifier._notifier()
+ self.alert_channel.send.assert_not_called()
+
+
+class SilenceTests(unittest.IsolatedAsyncioTestCase):
+ def setUp(self) -> None:
+ self.bot = MockBot()
+ self.cog = Silence(self.bot)
+ self.ctx = MockContext()
+ self.cog._verified_role = None
+ # Set event so command callbacks can continue.
+ self.cog._get_instance_vars_event.set()
+
+ async def test_instance_vars_got_guild(self):
+ """Bot got guild after it became available."""
+ await self.cog._get_instance_vars()
+ self.bot.wait_until_guild_available.assert_called_once()
+ self.bot.get_guild.assert_called_once_with(Guild.id)
+
+ async def test_instance_vars_got_role(self):
+ """Got `Roles.verified` role from guild."""
+ await self.cog._get_instance_vars()
+ guild = self.bot.get_guild()
+ guild.get_role.assert_called_once_with(Roles.verified)
+
+ async def test_instance_vars_got_channels(self):
+ """Got channels from bot."""
+ await self.cog._get_instance_vars()
+ self.bot.get_channel.called_once_with(Channels.mod_alerts)
+ self.bot.get_channel.called_once_with(Channels.mod_log)
+
+ @mock.patch("bot.cogs.moderation.silence.SilenceNotifier")
+ async def test_instance_vars_got_notifier(self, notifier):
+ """Notifier was started with channel."""
+ mod_log = MockTextChannel()
+ self.bot.get_channel.side_effect = (None, mod_log)
+ await self.cog._get_instance_vars()
+ notifier.assert_called_once_with(mod_log)
+ self.bot.get_channel.side_effect = None
+
+ async def test_silence_sent_correct_discord_message(self):
+ """Check if proper message was sent when called with duration in channel with previous state."""
+ test_cases = (
+ (0.0001, f"{Emojis.check_mark} silenced current channel for 0.0001 minute(s).", True,),
+ (None, f"{Emojis.check_mark} silenced current channel indefinitely.", True,),
+ (5, f"{Emojis.cross_mark} current channel is already silenced.", False,),
+ )
+ for duration, result_message, _silence_patch_return in test_cases:
+ with self.subTest(
+ silence_duration=duration,
+ result_message=result_message,
+ starting_unsilenced_state=_silence_patch_return
+ ):
+ with mock.patch.object(self.cog, "_silence", return_value=_silence_patch_return):
+ await self.cog.silence.callback(self.cog, self.ctx, duration)
+ self.ctx.send.assert_called_once_with(result_message)
+ self.ctx.reset_mock()
+
+ async def test_unsilence_sent_correct_discord_message(self):
+ """Proper reply after a successful unsilence."""
+ with mock.patch.object(self.cog, "_unsilence", return_value=True):
+ await self.cog.unsilence.callback(self.cog, self.ctx)
+ self.ctx.send.assert_called_once_with(f"{Emojis.check_mark} unsilenced current channel.")
+
+ async def test_silence_private_for_false(self):
+ """Permissions are not set and `False` is returned in an already silenced channel."""
+ perm_overwrite = Mock(send_messages=False)
+ channel = Mock(overwrites_for=Mock(return_value=perm_overwrite))
+
+ self.assertFalse(await self.cog._silence(channel, True, None))
+ channel.set_permissions.assert_not_called()
+
+ async def test_silence_private_silenced_channel(self):
+ """Channel had `send_message` permissions revoked."""
+ channel = MockTextChannel()
+ self.assertTrue(await self.cog._silence(channel, False, None))
+ channel.set_permissions.assert_called_once()
+ self.assertFalse(channel.set_permissions.call_args.kwargs['send_messages'])
+
+ async def test_silence_private_preserves_permissions(self):
+ """Previous permissions were preserved when channel was silenced."""
+ channel = MockTextChannel()
+ # Set up mock channel permission state.
+ mock_permissions = PermissionOverwrite()
+ mock_permissions_dict = dict(mock_permissions)
+ channel.overwrites_for.return_value = mock_permissions
+ await self.cog._silence(channel, False, None)
+ new_permissions = channel.set_permissions.call_args.kwargs
+ # Remove 'send_messages' key because it got changed in the method.
+ del new_permissions['send_messages']
+ del mock_permissions_dict['send_messages']
+ self.assertDictEqual(mock_permissions_dict, new_permissions)
+
+ async def test_silence_private_notifier(self):
+ """Channel should be added to notifier with `persistent` set to `True`, and the other way around."""
+ channel = MockTextChannel()
+ with mock.patch.object(self.cog, "notifier", create=True):
+ with self.subTest(persistent=True):
+ await self.cog._silence(channel, True, None)
+ self.cog.notifier.add_channel.assert_called_once()
+
+ with mock.patch.object(self.cog, "notifier", create=True):
+ with self.subTest(persistent=False):
+ await self.cog._silence(channel, False, None)
+ self.cog.notifier.add_channel.assert_not_called()
+
+ async def test_silence_private_added_muted_channel(self):
+ """Channel was added to `muted_channels` on silence."""
+ channel = MockTextChannel()
+ with mock.patch.object(self.cog, "muted_channels") as muted_channels:
+ await self.cog._silence(channel, False, None)
+ muted_channels.add.assert_called_once_with(channel)
+
+ async def test_unsilence_private_for_false(self):
+ """Permissions are not set and `False` is returned in an unsilenced channel."""
+ channel = Mock()
+ self.assertFalse(await self.cog._unsilence(channel))
+ channel.set_permissions.assert_not_called()
+
+ @mock.patch.object(Silence, "notifier", create=True)
+ async def test_unsilence_private_unsilenced_channel(self, _):
+ """Channel had `send_message` permissions restored"""
+ perm_overwrite = MagicMock(send_messages=False)
+ channel = MockTextChannel(overwrites_for=Mock(return_value=perm_overwrite))
+ self.assertTrue(await self.cog._unsilence(channel))
+ channel.set_permissions.assert_called_once()
+ self.assertIsNone(channel.set_permissions.call_args.kwargs['send_messages'])
+
+ @mock.patch.object(Silence, "notifier", create=True)
+ async def test_unsilence_private_removed_notifier(self, notifier):
+ """Channel was removed from `notifier` on unsilence."""
+ perm_overwrite = MagicMock(send_messages=False)
+ channel = MockTextChannel(overwrites_for=Mock(return_value=perm_overwrite))
+ await self.cog._unsilence(channel)
+ notifier.remove_channel.assert_called_once_with(channel)
+
+ @mock.patch.object(Silence, "notifier", create=True)
+ async def test_unsilence_private_removed_muted_channel(self, _):
+ """Channel was removed from `muted_channels` on unsilence."""
+ perm_overwrite = MagicMock(send_messages=False)
+ channel = MockTextChannel(overwrites_for=Mock(return_value=perm_overwrite))
+ with mock.patch.object(self.cog, "muted_channels") as muted_channels:
+ await self.cog._unsilence(channel)
+ muted_channels.discard.assert_called_once_with(channel)
+
+ @mock.patch.object(Silence, "notifier", create=True)
+ async def test_unsilence_private_preserves_permissions(self, _):
+ """Previous permissions were preserved when channel was unsilenced."""
+ channel = MockTextChannel()
+ # Set up mock channel permission state.
+ mock_permissions = PermissionOverwrite(send_messages=False)
+ mock_permissions_dict = dict(mock_permissions)
+ channel.overwrites_for.return_value = mock_permissions
+ await self.cog._unsilence(channel)
+ new_permissions = channel.set_permissions.call_args.kwargs
+ # Remove 'send_messages' key because it got changed in the method.
+ del new_permissions['send_messages']
+ del mock_permissions_dict['send_messages']
+ self.assertDictEqual(mock_permissions_dict, new_permissions)
+
+ @mock.patch("bot.cogs.moderation.silence.asyncio")
+ @mock.patch.object(Silence, "_mod_alerts_channel", create=True)
+ def test_cog_unload_starts_task(self, alert_channel, asyncio_mock):
+ """Task for sending an alert was created with present `muted_channels`."""
+ with mock.patch.object(self.cog, "muted_channels"):
+ self.cog.cog_unload()
+ alert_channel.send.assert_called_once_with(f"<@&{Roles.moderators}> channels left silenced on cog unload: ")
+ asyncio_mock.create_task.assert_called_once_with(alert_channel.send())
+
+ @mock.patch("bot.cogs.moderation.silence.asyncio")
+ def test_cog_unload_skips_task_start(self, asyncio_mock):
+ """No task created with no channels."""
+ self.cog.cog_unload()
+ asyncio_mock.create_task.assert_not_called()
+
+ @mock.patch("bot.cogs.moderation.silence.with_role_check")
+ @mock.patch("bot.cogs.moderation.silence.MODERATION_ROLES", new=(1, 2, 3))
+ def test_cog_check(self, role_check):
+ """Role check is called with `MODERATION_ROLES`"""
+ self.cog.cog_check(self.ctx)
+ role_check.assert_called_once_with(self.ctx, *(1, 2, 3))
diff --git a/tests/bot/test_converters.py b/tests/bot/test_converters.py
index 1e5ca62ae..ca8cb6825 100644
--- a/tests/bot/test_converters.py
+++ b/tests/bot/test_converters.py
@@ -8,6 +8,7 @@ from discord.ext.commands import BadArgument
from bot.converters import (
Duration,
+ HushDurationConverter,
ISODateTime,
TagContentConverter,
TagNameConverter,
@@ -271,3 +272,32 @@ class ConverterTests(unittest.TestCase):
exception_message = f"`{datetime_string}` is not a valid ISO-8601 datetime string"
with self.assertRaises(BadArgument, msg=exception_message):
asyncio.run(converter.convert(self.context, datetime_string))
+
+ def test_hush_duration_converter_for_valid(self):
+ """HushDurationConverter returns correct value for minutes duration or `"forever"` strings."""
+ test_values = (
+ ("0", 0),
+ ("15", 15),
+ ("10", 10),
+ ("5m", 5),
+ ("5M", 5),
+ ("forever", None),
+ )
+ converter = HushDurationConverter()
+ for minutes_string, expected_minutes in test_values:
+ with self.subTest(minutes_string=minutes_string, expected_minutes=expected_minutes):
+ converted = asyncio.run(converter.convert(self.context, minutes_string))
+ self.assertEqual(expected_minutes, converted)
+
+ def test_hush_duration_converter_for_invalid(self):
+ """HushDurationConverter raises correct exception for invalid minutes duration strings."""
+ test_values = (
+ ("16", "Duration must be at most 15 minutes."),
+ ("10d", "10d is not a valid minutes duration."),
+ ("-1", "-1 is not a valid minutes duration."),
+ )
+ converter = HushDurationConverter()
+ for invalid_minutes_string, exception_message in test_values:
+ with self.subTest(invalid_minutes_string=invalid_minutes_string, exception_message=exception_message):
+ with self.assertRaisesRegex(BadArgument, exception_message):
+ asyncio.run(converter.convert(self.context, invalid_minutes_string))