aboutsummaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
authorGravatar Andi Qu <[email protected]>2020-10-27 15:23:38 +0200
committerGravatar Andi Qu <[email protected]>2020-10-27 15:23:38 +0200
commit420e4fda56e4d8bb3335c36cb0af08f084e0957b (patch)
treec0697ac3beb344eeeeb7ff953c078c9ee49e80c9 /tests
parentMerge branch 'master' of https://github.com/dolphingarlic/bot (diff)
parentPR #1248: Set async-rediscache logging level to warning (diff)
Merge branch 'master' of https://github.com/python-discord/bot into python-discord-master
Diffstat (limited to 'tests')
-rw-r--r--tests/_autospec.py64
-rw-r--r--tests/bot/cogs/moderation/test_infractions.py55
-rw-r--r--tests/bot/cogs/moderation/test_silence.py261
-rw-r--r--tests/bot/cogs/sync/test_base.py404
-rw-r--r--tests/bot/cogs/test_duck_pond.py575
-rw-r--r--tests/bot/exts/__init__.py (renamed from tests/bot/cogs/__init__.py)0
-rw-r--r--tests/bot/exts/backend/__init__.py (renamed from tests/bot/cogs/moderation/__init__.py)0
-rw-r--r--tests/bot/exts/backend/sync/__init__.py (renamed from tests/bot/cogs/sync/__init__.py)0
-rw-r--r--tests/bot/exts/backend/sync/test_base.py73
-rw-r--r--tests/bot/exts/backend/sync/test_cog.py (renamed from tests/bot/cogs/sync/test_cog.py)21
-rw-r--r--tests/bot/exts/backend/sync/test_roles.py (renamed from tests/bot/cogs/sync/test_roles.py)2
-rw-r--r--tests/bot/exts/backend/sync/test_users.py (renamed from tests/bot/cogs/sync/test_users.py)120
-rw-r--r--tests/bot/exts/backend/test_logging.py (renamed from tests/bot/cogs/test_logging.py)6
-rw-r--r--tests/bot/exts/filters/__init__.py (renamed from tests/bot/patches/__init__.py)0
-rw-r--r--tests/bot/exts/filters/test_antimalware.py (renamed from tests/bot/cogs/test_antimalware.py)48
-rw-r--r--tests/bot/exts/filters/test_antispam.py (renamed from tests/bot/cogs/test_antispam.py)2
-rw-r--r--tests/bot/exts/filters/test_security.py (renamed from tests/bot/cogs/test_security.py)2
-rw-r--r--tests/bot/exts/filters/test_token_remover.py (renamed from tests/bot/cogs/test_token_remover.py)176
-rw-r--r--tests/bot/exts/info/__init__.py0
-rw-r--r--tests/bot/exts/info/test_information.py (renamed from tests/bot/cogs/test_information.py)296
-rw-r--r--tests/bot/exts/moderation/__init__.py0
-rw-r--r--tests/bot/exts/moderation/infraction/__init__.py0
-rw-r--r--tests/bot/exts/moderation/infraction/test_infractions.py201
-rw-r--r--tests/bot/exts/moderation/infraction/test_utils.py359
-rw-r--r--tests/bot/exts/moderation/test_incidents.py770
-rw-r--r--tests/bot/exts/moderation/test_modlog.py (renamed from tests/bot/cogs/moderation/test_modlog.py)2
-rw-r--r--tests/bot/exts/moderation/test_silence.py502
-rw-r--r--tests/bot/exts/moderation/test_slowmode.py113
-rw-r--r--tests/bot/exts/test_cogs.py (renamed from tests/bot/cogs/test_cogs.py)8
-rw-r--r--tests/bot/exts/utils/__init__.py0
-rw-r--r--tests/bot/exts/utils/test_jams.py173
-rw-r--r--tests/bot/exts/utils/test_snekbox.py (renamed from tests/bot/cogs/test_snekbox.py)93
-rw-r--r--tests/bot/test_pagination.py15
-rw-r--r--tests/bot/utils/test_checks.py44
-rw-r--r--tests/bot/utils/test_redis_cache.py265
-rw-r--r--tests/bot/utils/test_services.py74
-rw-r--r--tests/helpers.py27
37 files changed, 2806 insertions, 1945 deletions
diff --git a/tests/_autospec.py b/tests/_autospec.py
new file mode 100644
index 000000000..ee2fc1973
--- /dev/null
+++ b/tests/_autospec.py
@@ -0,0 +1,64 @@
+import contextlib
+import functools
+import unittest.mock
+from typing import Callable
+
+
[email protected](unittest.mock._patch.decoration_helper)
+def _decoration_helper(self, patched, args, keywargs):
+ """Skips adding patchings as args if their `dont_pass` attribute is True."""
+ # Don't ask what this does. It's just a copy from stdlib, but with the dont_pass check added.
+ extra_args = []
+ with contextlib.ExitStack() as exit_stack:
+ for patching in patched.patchings:
+ arg = exit_stack.enter_context(patching)
+ if not getattr(patching, "dont_pass", False):
+ # Only add the patching as an arg if dont_pass is False.
+ if patching.attribute_name is not None:
+ keywargs.update(arg)
+ elif patching.new is unittest.mock.DEFAULT:
+ extra_args.append(arg)
+
+ args += tuple(extra_args)
+ yield args, keywargs
+
+
[email protected](unittest.mock._patch.copy)
+def _copy(self):
+ """Copy the `dont_pass` attribute along with the standard copy operation."""
+ patcher_copy = _copy.original(self)
+ patcher_copy.dont_pass = getattr(self, "dont_pass", False)
+ return patcher_copy
+
+
+# Monkey-patch the patcher class :)
+_copy.original = unittest.mock._patch.copy
+unittest.mock._patch.copy = _copy
+unittest.mock._patch.decoration_helper = _decoration_helper
+
+
+def autospec(target, *attributes: str, pass_mocks: bool = True, **patch_kwargs) -> Callable:
+ """
+ Patch multiple `attributes` of a `target` with autospecced mocks and `spec_set` as True.
+
+ If `pass_mocks` is True, pass the autospecced mocks as arguments to the decorated object.
+ """
+ # Caller's kwargs should take priority and overwrite the defaults.
+ kwargs = dict(spec_set=True, autospec=True)
+ kwargs.update(patch_kwargs)
+
+ # Import the target if it's a string.
+ # This is to support both object and string targets like patch.multiple.
+ if type(target) is str:
+ target = unittest.mock._importer(target)
+
+ def decorator(func):
+ for attribute in attributes:
+ patcher = unittest.mock.patch.object(target, attribute, **kwargs)
+ if not pass_mocks:
+ # A custom attribute to keep track of which patchings should be skipped.
+ patcher.dont_pass = True
+ func = patcher(func)
+ return func
+ return decorator
diff --git a/tests/bot/cogs/moderation/test_infractions.py b/tests/bot/cogs/moderation/test_infractions.py
deleted file mode 100644
index da4e92ccc..000000000
--- a/tests/bot/cogs/moderation/test_infractions.py
+++ /dev/null
@@ -1,55 +0,0 @@
-import textwrap
-import unittest
-from unittest.mock import AsyncMock, Mock, patch
-
-from bot.cogs.moderation.infractions import Infractions
-from tests.helpers import MockBot, MockContext, MockGuild, MockMember, MockRole
-
-
-class TruncationTests(unittest.IsolatedAsyncioTestCase):
- """Tests for ban and kick command reason truncation."""
-
- def setUp(self):
- self.bot = MockBot()
- self.cog = Infractions(self.bot)
- self.user = MockMember(id=1234, top_role=MockRole(id=3577, position=10))
- self.target = MockMember(id=1265, top_role=MockRole(id=9876, position=0))
- self.guild = MockGuild(id=4567)
- self.ctx = MockContext(bot=self.bot, author=self.user, guild=self.guild)
-
- @patch("bot.cogs.moderation.utils.get_active_infraction")
- @patch("bot.cogs.moderation.utils.post_infraction")
- async def test_apply_ban_reason_truncation(self, post_infraction_mock, get_active_mock):
- """Should truncate reason for `ctx.guild.ban`."""
- get_active_mock.return_value = None
- post_infraction_mock.return_value = {"foo": "bar"}
-
- self.cog.apply_infraction = AsyncMock()
- self.bot.get_cog.return_value = AsyncMock()
- self.cog.mod_log.ignore = Mock()
- self.ctx.guild.ban = Mock()
-
- await self.cog.apply_ban(self.ctx, self.target, "foo bar" * 3000)
- self.ctx.guild.ban.assert_called_once_with(
- self.target,
- reason=textwrap.shorten("foo bar" * 3000, 512, placeholder="..."),
- delete_message_days=0
- )
- self.cog.apply_infraction.assert_awaited_once_with(
- self.ctx, {"foo": "bar"}, self.target, self.ctx.guild.ban.return_value
- )
-
- @patch("bot.cogs.moderation.utils.post_infraction")
- async def test_apply_kick_reason_truncation(self, post_infraction_mock):
- """Should truncate reason for `Member.kick`."""
- post_infraction_mock.return_value = {"foo": "bar"}
-
- self.cog.apply_infraction = AsyncMock()
- self.cog.mod_log.ignore = Mock()
- self.target.kick = Mock()
-
- await self.cog.apply_kick(self.ctx, self.target, "foo bar" * 3000)
- self.target.kick.assert_called_once_with(reason=textwrap.shorten("foo bar" * 3000, 512, placeholder="..."))
- self.cog.apply_infraction.assert_awaited_once_with(
- self.ctx, {"foo": "bar"}, self.target, self.target.kick.return_value
- )
diff --git a/tests/bot/cogs/moderation/test_silence.py b/tests/bot/cogs/moderation/test_silence.py
deleted file mode 100644
index ab3d0742a..000000000
--- a/tests/bot/cogs/moderation/test_silence.py
+++ /dev/null
@@ -1,261 +0,0 @@
-import unittest
-from unittest import mock
-from unittest.mock import MagicMock, Mock
-
-from discord import PermissionOverwrite
-
-from bot.cogs.moderation.silence import Silence, SilenceNotifier
-from bot.constants import Channels, Emojis, Guild, Roles
-from tests.helpers import MockBot, MockContext, MockTextChannel
-
-
-class SilenceNotifierTests(unittest.IsolatedAsyncioTestCase):
- def setUp(self) -> None:
- self.alert_channel = MockTextChannel()
- self.notifier = SilenceNotifier(self.alert_channel)
- self.notifier.stop = self.notifier_stop_mock = Mock()
- self.notifier.start = self.notifier_start_mock = Mock()
-
- def test_add_channel_adds_channel(self):
- """Channel in FirstHash with current loop is added to internal set."""
- channel = Mock()
- with mock.patch.object(self.notifier, "_silenced_channels") as silenced_channels:
- self.notifier.add_channel(channel)
- silenced_channels.__setitem__.assert_called_with(channel, self.notifier._current_loop)
-
- def test_add_channel_starts_loop(self):
- """Loop is started if `_silenced_channels` was empty."""
- self.notifier.add_channel(Mock())
- self.notifier_start_mock.assert_called_once()
-
- def test_add_channel_skips_start_with_channels(self):
- """Loop start is not called when `_silenced_channels` is not empty."""
- with mock.patch.object(self.notifier, "_silenced_channels"):
- self.notifier.add_channel(Mock())
- self.notifier_start_mock.assert_not_called()
-
- def test_remove_channel_removes_channel(self):
- """Channel in FirstHash is removed from `_silenced_channels`."""
- channel = Mock()
- with mock.patch.object(self.notifier, "_silenced_channels") as silenced_channels:
- self.notifier.remove_channel(channel)
- silenced_channels.__delitem__.assert_called_with(channel)
-
- def test_remove_channel_stops_loop(self):
- """Notifier loop is stopped if `_silenced_channels` is empty after remove."""
- with mock.patch.object(self.notifier, "_silenced_channels", __bool__=lambda _: False):
- self.notifier.remove_channel(Mock())
- self.notifier_stop_mock.assert_called_once()
-
- def test_remove_channel_skips_stop_with_channels(self):
- """Notifier loop is not stopped if `_silenced_channels` is not empty after remove."""
- self.notifier.remove_channel(Mock())
- self.notifier_stop_mock.assert_not_called()
-
- async def test_notifier_private_sends_alert(self):
- """Alert is sent on 15 min intervals."""
- test_cases = (900, 1800, 2700)
- for current_loop in test_cases:
- with self.subTest(current_loop=current_loop):
- with mock.patch.object(self.notifier, "_current_loop", new=current_loop):
- await self.notifier._notifier()
- self.alert_channel.send.assert_called_once_with(f"<@&{Roles.moderators}> currently silenced channels: ")
- self.alert_channel.send.reset_mock()
-
- async def test_notifier_skips_alert(self):
- """Alert is skipped on first loop or not an increment of 900."""
- test_cases = (0, 15, 5000)
- for current_loop in test_cases:
- with self.subTest(current_loop=current_loop):
- with mock.patch.object(self.notifier, "_current_loop", new=current_loop):
- await self.notifier._notifier()
- self.alert_channel.send.assert_not_called()
-
-
-class SilenceTests(unittest.IsolatedAsyncioTestCase):
- def setUp(self) -> None:
- self.bot = MockBot()
- self.cog = Silence(self.bot)
- self.ctx = MockContext()
- self.cog._verified_role = None
- # Set event so command callbacks can continue.
- self.cog._get_instance_vars_event.set()
-
- async def test_instance_vars_got_guild(self):
- """Bot got guild after it became available."""
- await self.cog._get_instance_vars()
- self.bot.wait_until_guild_available.assert_called_once()
- self.bot.get_guild.assert_called_once_with(Guild.id)
-
- async def test_instance_vars_got_role(self):
- """Got `Roles.verified` role from guild."""
- await self.cog._get_instance_vars()
- guild = self.bot.get_guild()
- guild.get_role.assert_called_once_with(Roles.verified)
-
- async def test_instance_vars_got_channels(self):
- """Got channels from bot."""
- await self.cog._get_instance_vars()
- self.bot.get_channel.called_once_with(Channels.mod_alerts)
- self.bot.get_channel.called_once_with(Channels.mod_log)
-
- @mock.patch("bot.cogs.moderation.silence.SilenceNotifier")
- async def test_instance_vars_got_notifier(self, notifier):
- """Notifier was started with channel."""
- mod_log = MockTextChannel()
- self.bot.get_channel.side_effect = (None, mod_log)
- await self.cog._get_instance_vars()
- notifier.assert_called_once_with(mod_log)
- self.bot.get_channel.side_effect = None
-
- async def test_silence_sent_correct_discord_message(self):
- """Check if proper message was sent when called with duration in channel with previous state."""
- test_cases = (
- (0.0001, f"{Emojis.check_mark} silenced current channel for 0.0001 minute(s).", True,),
- (None, f"{Emojis.check_mark} silenced current channel indefinitely.", True,),
- (5, f"{Emojis.cross_mark} current channel is already silenced.", False,),
- )
- for duration, result_message, _silence_patch_return in test_cases:
- with self.subTest(
- silence_duration=duration,
- result_message=result_message,
- starting_unsilenced_state=_silence_patch_return
- ):
- with mock.patch.object(self.cog, "_silence", return_value=_silence_patch_return):
- await self.cog.silence.callback(self.cog, self.ctx, duration)
- self.ctx.send.assert_called_once_with(result_message)
- self.ctx.reset_mock()
-
- async def test_unsilence_sent_correct_discord_message(self):
- """Check if proper message was sent when unsilencing channel."""
- test_cases = (
- (True, f"{Emojis.check_mark} unsilenced current channel."),
- (False, f"{Emojis.cross_mark} current channel was not silenced.")
- )
- for _unsilence_patch_return, result_message in test_cases:
- with self.subTest(
- starting_silenced_state=_unsilence_patch_return,
- result_message=result_message
- ):
- with mock.patch.object(self.cog, "_unsilence", return_value=_unsilence_patch_return):
- await self.cog.unsilence.callback(self.cog, self.ctx)
- self.ctx.send.assert_called_once_with(result_message)
- self.ctx.reset_mock()
-
- async def test_silence_private_for_false(self):
- """Permissions are not set and `False` is returned in an already silenced channel."""
- perm_overwrite = Mock(send_messages=False)
- channel = Mock(overwrites_for=Mock(return_value=perm_overwrite))
-
- self.assertFalse(await self.cog._silence(channel, True, None))
- channel.set_permissions.assert_not_called()
-
- async def test_silence_private_silenced_channel(self):
- """Channel had `send_message` permissions revoked."""
- channel = MockTextChannel()
- self.assertTrue(await self.cog._silence(channel, False, None))
- channel.set_permissions.assert_called_once()
- self.assertFalse(channel.set_permissions.call_args.kwargs['send_messages'])
-
- async def test_silence_private_preserves_permissions(self):
- """Previous permissions were preserved when channel was silenced."""
- channel = MockTextChannel()
- # Set up mock channel permission state.
- mock_permissions = PermissionOverwrite()
- mock_permissions_dict = dict(mock_permissions)
- channel.overwrites_for.return_value = mock_permissions
- await self.cog._silence(channel, False, None)
- new_permissions = channel.set_permissions.call_args.kwargs
- # Remove 'send_messages' key because it got changed in the method.
- del new_permissions['send_messages']
- del mock_permissions_dict['send_messages']
- self.assertDictEqual(mock_permissions_dict, new_permissions)
-
- async def test_silence_private_notifier(self):
- """Channel should be added to notifier with `persistent` set to `True`, and the other way around."""
- channel = MockTextChannel()
- with mock.patch.object(self.cog, "notifier", create=True):
- with self.subTest(persistent=True):
- await self.cog._silence(channel, True, None)
- self.cog.notifier.add_channel.assert_called_once()
-
- with mock.patch.object(self.cog, "notifier", create=True):
- with self.subTest(persistent=False):
- await self.cog._silence(channel, False, None)
- self.cog.notifier.add_channel.assert_not_called()
-
- async def test_silence_private_added_muted_channel(self):
- """Channel was added to `muted_channels` on silence."""
- channel = MockTextChannel()
- with mock.patch.object(self.cog, "muted_channels") as muted_channels:
- await self.cog._silence(channel, False, None)
- muted_channels.add.assert_called_once_with(channel)
-
- async def test_unsilence_private_for_false(self):
- """Permissions are not set and `False` is returned in an unsilenced channel."""
- channel = Mock()
- self.assertFalse(await self.cog._unsilence(channel))
- channel.set_permissions.assert_not_called()
-
- @mock.patch.object(Silence, "notifier", create=True)
- async def test_unsilence_private_unsilenced_channel(self, _):
- """Channel had `send_message` permissions restored"""
- perm_overwrite = MagicMock(send_messages=False)
- channel = MockTextChannel(overwrites_for=Mock(return_value=perm_overwrite))
- self.assertTrue(await self.cog._unsilence(channel))
- channel.set_permissions.assert_called_once()
- self.assertIsNone(channel.set_permissions.call_args.kwargs['send_messages'])
-
- @mock.patch.object(Silence, "notifier", create=True)
- async def test_unsilence_private_removed_notifier(self, notifier):
- """Channel was removed from `notifier` on unsilence."""
- perm_overwrite = MagicMock(send_messages=False)
- channel = MockTextChannel(overwrites_for=Mock(return_value=perm_overwrite))
- await self.cog._unsilence(channel)
- notifier.remove_channel.assert_called_once_with(channel)
-
- @mock.patch.object(Silence, "notifier", create=True)
- async def test_unsilence_private_removed_muted_channel(self, _):
- """Channel was removed from `muted_channels` on unsilence."""
- perm_overwrite = MagicMock(send_messages=False)
- channel = MockTextChannel(overwrites_for=Mock(return_value=perm_overwrite))
- with mock.patch.object(self.cog, "muted_channels") as muted_channels:
- await self.cog._unsilence(channel)
- muted_channels.discard.assert_called_once_with(channel)
-
- @mock.patch.object(Silence, "notifier", create=True)
- async def test_unsilence_private_preserves_permissions(self, _):
- """Previous permissions were preserved when channel was unsilenced."""
- channel = MockTextChannel()
- # Set up mock channel permission state.
- mock_permissions = PermissionOverwrite(send_messages=False)
- mock_permissions_dict = dict(mock_permissions)
- channel.overwrites_for.return_value = mock_permissions
- await self.cog._unsilence(channel)
- new_permissions = channel.set_permissions.call_args.kwargs
- # Remove 'send_messages' key because it got changed in the method.
- del new_permissions['send_messages']
- del mock_permissions_dict['send_messages']
- self.assertDictEqual(mock_permissions_dict, new_permissions)
-
- @mock.patch("bot.cogs.moderation.silence.asyncio")
- @mock.patch.object(Silence, "_mod_alerts_channel", create=True)
- def test_cog_unload_starts_task(self, alert_channel, asyncio_mock):
- """Task for sending an alert was created with present `muted_channels`."""
- with mock.patch.object(self.cog, "muted_channels"):
- self.cog.cog_unload()
- alert_channel.send.assert_called_once_with(f"<@&{Roles.moderators}> channels left silenced on cog unload: ")
- asyncio_mock.create_task.assert_called_once_with(alert_channel.send())
-
- @mock.patch("bot.cogs.moderation.silence.asyncio")
- def test_cog_unload_skips_task_start(self, asyncio_mock):
- """No task created with no channels."""
- self.cog.cog_unload()
- asyncio_mock.create_task.assert_not_called()
-
- @mock.patch("bot.cogs.moderation.silence.with_role_check")
- @mock.patch("bot.cogs.moderation.silence.MODERATION_ROLES", new=(1, 2, 3))
- def test_cog_check(self, role_check):
- """Role check is called with `MODERATION_ROLES`"""
- self.cog.cog_check(self.ctx)
- role_check.assert_called_once_with(self.ctx, *(1, 2, 3))
diff --git a/tests/bot/cogs/sync/test_base.py b/tests/bot/cogs/sync/test_base.py
deleted file mode 100644
index 70aea2bab..000000000
--- a/tests/bot/cogs/sync/test_base.py
+++ /dev/null
@@ -1,404 +0,0 @@
-import asyncio
-import unittest
-from unittest import mock
-
-import discord
-
-from bot import constants
-from bot.api import ResponseCodeError
-from bot.cogs.sync.syncers import Syncer, _Diff
-from tests import helpers
-
-
-class TestSyncer(Syncer):
- """Syncer subclass with mocks for abstract methods for testing purposes."""
-
- name = "test"
- _get_diff = mock.AsyncMock()
- _sync = mock.AsyncMock()
-
-
-class SyncerBaseTests(unittest.TestCase):
- """Tests for the syncer base class."""
-
- def setUp(self):
- self.bot = helpers.MockBot()
-
- def test_instantiation_fails_without_abstract_methods(self):
- """The class must have abstract methods implemented."""
- with self.assertRaisesRegex(TypeError, "Can't instantiate abstract class"):
- Syncer(self.bot)
-
-
-class SyncerSendPromptTests(unittest.IsolatedAsyncioTestCase):
- """Tests for sending the sync confirmation prompt."""
-
- def setUp(self):
- self.bot = helpers.MockBot()
- self.syncer = TestSyncer(self.bot)
-
- def mock_get_channel(self):
- """Fixture to return a mock channel and message for when `get_channel` is used."""
- self.bot.reset_mock()
-
- mock_channel = helpers.MockTextChannel()
- mock_message = helpers.MockMessage()
-
- mock_channel.send.return_value = mock_message
- self.bot.get_channel.return_value = mock_channel
-
- return mock_channel, mock_message
-
- def mock_fetch_channel(self):
- """Fixture to return a mock channel and message for when `fetch_channel` is used."""
- self.bot.reset_mock()
-
- mock_channel = helpers.MockTextChannel()
- mock_message = helpers.MockMessage()
-
- self.bot.get_channel.return_value = None
- mock_channel.send.return_value = mock_message
- self.bot.fetch_channel.return_value = mock_channel
-
- return mock_channel, mock_message
-
- async def test_send_prompt_edits_and_returns_message(self):
- """The given message should be edited to display the prompt and then should be returned."""
- msg = helpers.MockMessage()
- ret_val = await self.syncer._send_prompt(msg)
-
- msg.edit.assert_called_once()
- self.assertIn("content", msg.edit.call_args[1])
- self.assertEqual(ret_val, msg)
-
- async def test_send_prompt_gets_dev_core_channel(self):
- """The dev-core channel should be retrieved if an extant message isn't given."""
- subtests = (
- (self.bot.get_channel, self.mock_get_channel),
- (self.bot.fetch_channel, self.mock_fetch_channel),
- )
-
- for method, mock_ in subtests:
- with self.subTest(method=method, msg=mock_.__name__):
- mock_()
- await self.syncer._send_prompt()
-
- method.assert_called_once_with(constants.Channels.dev_core)
-
- async def test_send_prompt_returns_none_if_channel_fetch_fails(self):
- """None should be returned if there's an HTTPException when fetching the channel."""
- self.bot.get_channel.return_value = None
- self.bot.fetch_channel.side_effect = discord.HTTPException(mock.MagicMock(), "test error!")
-
- ret_val = await self.syncer._send_prompt()
-
- self.assertIsNone(ret_val)
-
- async def test_send_prompt_sends_and_returns_new_message_if_not_given(self):
- """A new message mentioning core devs should be sent and returned if message isn't given."""
- for mock_ in (self.mock_get_channel, self.mock_fetch_channel):
- with self.subTest(msg=mock_.__name__):
- mock_channel, mock_message = mock_()
- ret_val = await self.syncer._send_prompt()
-
- mock_channel.send.assert_called_once()
- self.assertIn(self.syncer._CORE_DEV_MENTION, mock_channel.send.call_args[0][0])
- self.assertEqual(ret_val, mock_message)
-
- async def test_send_prompt_adds_reactions(self):
- """The message should have reactions for confirmation added."""
- extant_message = helpers.MockMessage()
- subtests = (
- (extant_message, lambda: (None, extant_message)),
- (None, self.mock_get_channel),
- (None, self.mock_fetch_channel),
- )
-
- for message_arg, mock_ in subtests:
- subtest_msg = "Extant message" if mock_.__name__ == "<lambda>" else mock_.__name__
-
- with self.subTest(msg=subtest_msg):
- _, mock_message = mock_()
- await self.syncer._send_prompt(message_arg)
-
- calls = [mock.call(emoji) for emoji in self.syncer._REACTION_EMOJIS]
- mock_message.add_reaction.assert_has_calls(calls)
-
-
-class SyncerConfirmationTests(unittest.IsolatedAsyncioTestCase):
- """Tests for waiting for a sync confirmation reaction on the prompt."""
-
- def setUp(self):
- self.bot = helpers.MockBot()
- self.syncer = TestSyncer(self.bot)
- self.core_dev_role = helpers.MockRole(id=constants.Roles.core_developers)
-
- @staticmethod
- def get_message_reaction(emoji):
- """Fixture to return a mock message an reaction from the given `emoji`."""
- message = helpers.MockMessage()
- reaction = helpers.MockReaction(emoji=emoji, message=message)
-
- return message, reaction
-
- def test_reaction_check_for_valid_emoji_and_authors(self):
- """Should return True if authors are identical or are a bot and a core dev, respectively."""
- user_subtests = (
- (
- helpers.MockMember(id=77),
- helpers.MockMember(id=77),
- "identical users",
- ),
- (
- helpers.MockMember(id=77, bot=True),
- helpers.MockMember(id=43, roles=[self.core_dev_role]),
- "bot author and core-dev reactor",
- ),
- )
-
- for emoji in self.syncer._REACTION_EMOJIS:
- for author, user, msg in user_subtests:
- with self.subTest(author=author, user=user, emoji=emoji, msg=msg):
- message, reaction = self.get_message_reaction(emoji)
- ret_val = self.syncer._reaction_check(author, message, reaction, user)
-
- self.assertTrue(ret_val)
-
- def test_reaction_check_for_invalid_reactions(self):
- """Should return False for invalid reaction events."""
- valid_emoji = self.syncer._REACTION_EMOJIS[0]
- subtests = (
- (
- helpers.MockMember(id=77),
- *self.get_message_reaction(valid_emoji),
- helpers.MockMember(id=43, roles=[self.core_dev_role]),
- "users are not identical",
- ),
- (
- helpers.MockMember(id=77, bot=True),
- *self.get_message_reaction(valid_emoji),
- helpers.MockMember(id=43),
- "reactor lacks the core-dev role",
- ),
- (
- helpers.MockMember(id=77, bot=True, roles=[self.core_dev_role]),
- *self.get_message_reaction(valid_emoji),
- helpers.MockMember(id=77, bot=True, roles=[self.core_dev_role]),
- "reactor is a bot",
- ),
- (
- helpers.MockMember(id=77),
- helpers.MockMessage(id=95),
- helpers.MockReaction(emoji=valid_emoji, message=helpers.MockMessage(id=26)),
- helpers.MockMember(id=77),
- "messages are not identical",
- ),
- (
- helpers.MockMember(id=77),
- *self.get_message_reaction("InVaLiD"),
- helpers.MockMember(id=77),
- "emoji is invalid",
- ),
- )
-
- for *args, msg in subtests:
- kwargs = dict(zip(("author", "message", "reaction", "user"), args))
- with self.subTest(**kwargs, msg=msg):
- ret_val = self.syncer._reaction_check(*args)
- self.assertFalse(ret_val)
-
- async def test_wait_for_confirmation(self):
- """The message should always be edited and only return True if the emoji is a check mark."""
- subtests = (
- (constants.Emojis.check_mark, True, None),
- ("InVaLiD", False, None),
- (None, False, asyncio.TimeoutError),
- )
-
- for emoji, ret_val, side_effect in subtests:
- for bot in (True, False):
- with self.subTest(emoji=emoji, ret_val=ret_val, side_effect=side_effect, bot=bot):
- # Set up mocks
- message = helpers.MockMessage()
- member = helpers.MockMember(bot=bot)
-
- self.bot.wait_for.reset_mock()
- self.bot.wait_for.return_value = (helpers.MockReaction(emoji=emoji), None)
- self.bot.wait_for.side_effect = side_effect
-
- # Call the function
- actual_return = await self.syncer._wait_for_confirmation(member, message)
-
- # Perform assertions
- self.bot.wait_for.assert_called_once()
- self.assertIn("reaction_add", self.bot.wait_for.call_args[0])
-
- message.edit.assert_called_once()
- kwargs = message.edit.call_args[1]
- self.assertIn("content", kwargs)
-
- # Core devs should only be mentioned if the author is a bot.
- if bot:
- self.assertIn(self.syncer._CORE_DEV_MENTION, kwargs["content"])
- else:
- self.assertNotIn(self.syncer._CORE_DEV_MENTION, kwargs["content"])
-
- self.assertIs(actual_return, ret_val)
-
-
-class SyncerSyncTests(unittest.IsolatedAsyncioTestCase):
- """Tests for main function orchestrating the sync."""
-
- def setUp(self):
- self.bot = helpers.MockBot(user=helpers.MockMember(bot=True))
- self.syncer = TestSyncer(self.bot)
-
- async def test_sync_respects_confirmation_result(self):
- """The sync should abort if confirmation fails and continue if confirmed."""
- mock_message = helpers.MockMessage()
- subtests = (
- (True, mock_message),
- (False, None),
- )
-
- for confirmed, message in subtests:
- with self.subTest(confirmed=confirmed):
- self.syncer._sync.reset_mock()
- self.syncer._get_diff.reset_mock()
-
- diff = _Diff({1, 2, 3}, {4, 5}, None)
- self.syncer._get_diff.return_value = diff
- self.syncer._get_confirmation_result = mock.AsyncMock(
- return_value=(confirmed, message)
- )
-
- guild = helpers.MockGuild()
- await self.syncer.sync(guild)
-
- self.syncer._get_diff.assert_called_once_with(guild)
- self.syncer._get_confirmation_result.assert_called_once()
-
- if confirmed:
- self.syncer._sync.assert_called_once_with(diff)
- else:
- self.syncer._sync.assert_not_called()
-
- async def test_sync_diff_size(self):
- """The diff size should be correctly calculated."""
- subtests = (
- (6, _Diff({1, 2}, {3, 4}, {5, 6})),
- (5, _Diff({1, 2, 3}, None, {4, 5})),
- (0, _Diff(None, None, None)),
- (0, _Diff(set(), set(), set())),
- )
-
- for size, diff in subtests:
- with self.subTest(size=size, diff=diff):
- self.syncer._get_diff.reset_mock()
- self.syncer._get_diff.return_value = diff
- self.syncer._get_confirmation_result = mock.AsyncMock(return_value=(False, None))
-
- guild = helpers.MockGuild()
- await self.syncer.sync(guild)
-
- self.syncer._get_diff.assert_called_once_with(guild)
- self.syncer._get_confirmation_result.assert_called_once()
- self.assertEqual(self.syncer._get_confirmation_result.call_args[0][0], size)
-
- async def test_sync_message_edited(self):
- """The message should be edited if one was sent, even if the sync has an API error."""
- subtests = (
- (None, None, False),
- (helpers.MockMessage(), None, True),
- (helpers.MockMessage(), ResponseCodeError(mock.MagicMock()), True),
- )
-
- for message, side_effect, should_edit in subtests:
- with self.subTest(message=message, side_effect=side_effect, should_edit=should_edit):
- self.syncer._sync.side_effect = side_effect
- self.syncer._get_confirmation_result = mock.AsyncMock(
- return_value=(True, message)
- )
-
- guild = helpers.MockGuild()
- await self.syncer.sync(guild)
-
- if should_edit:
- message.edit.assert_called_once()
- self.assertIn("content", message.edit.call_args[1])
-
- async def test_sync_confirmation_context_redirect(self):
- """If ctx is given, a new message should be sent and author should be ctx's author."""
- mock_member = helpers.MockMember()
- subtests = (
- (None, self.bot.user, None),
- (helpers.MockContext(author=mock_member), mock_member, helpers.MockMessage()),
- )
-
- for ctx, author, message in subtests:
- with self.subTest(ctx=ctx, author=author, message=message):
- if ctx is not None:
- ctx.send.return_value = message
-
- # Make sure `_get_diff` returns a MagicMock, not an AsyncMock
- self.syncer._get_diff.return_value = mock.MagicMock()
-
- self.syncer._get_confirmation_result = mock.AsyncMock(return_value=(False, None))
-
- guild = helpers.MockGuild()
- await self.syncer.sync(guild, ctx)
-
- if ctx is not None:
- ctx.send.assert_called_once()
-
- self.syncer._get_confirmation_result.assert_called_once()
- self.assertEqual(self.syncer._get_confirmation_result.call_args[0][1], author)
- self.assertEqual(self.syncer._get_confirmation_result.call_args[0][2], message)
-
- @mock.patch.object(constants.Sync, "max_diff", new=3)
- async def test_confirmation_result_small_diff(self):
- """Should always return True and the given message if the diff size is too small."""
- author = helpers.MockMember()
- expected_message = helpers.MockMessage()
-
- for size in (3, 2): # pragma: no cover
- with self.subTest(size=size):
- self.syncer._send_prompt = mock.AsyncMock()
- self.syncer._wait_for_confirmation = mock.AsyncMock()
-
- coro = self.syncer._get_confirmation_result(size, author, expected_message)
- result, actual_message = await coro
-
- self.assertTrue(result)
- self.assertEqual(actual_message, expected_message)
- self.syncer._send_prompt.assert_not_called()
- self.syncer._wait_for_confirmation.assert_not_called()
-
- @mock.patch.object(constants.Sync, "max_diff", new=3)
- async def test_confirmation_result_large_diff(self):
- """Should return True if confirmed and False if _send_prompt fails or aborted."""
- author = helpers.MockMember()
- mock_message = helpers.MockMessage()
-
- subtests = (
- (True, mock_message, True, "confirmed"),
- (False, None, False, "_send_prompt failed"),
- (False, mock_message, False, "aborted"),
- )
-
- for expected_result, expected_message, confirmed, msg in subtests: # pragma: no cover
- with self.subTest(msg=msg):
- self.syncer._send_prompt = mock.AsyncMock(return_value=expected_message)
- self.syncer._wait_for_confirmation = mock.AsyncMock(return_value=confirmed)
-
- coro = self.syncer._get_confirmation_result(4, author)
- actual_result, actual_message = await coro
-
- self.syncer._send_prompt.assert_called_once_with(None) # message defaults to None
- self.assertIs(actual_result, expected_result)
- self.assertEqual(actual_message, expected_message)
-
- if expected_message:
- self.syncer._wait_for_confirmation.assert_called_once_with(
- author, expected_message
- )
diff --git a/tests/bot/cogs/test_duck_pond.py b/tests/bot/cogs/test_duck_pond.py
deleted file mode 100644
index a8c0107c6..000000000
--- a/tests/bot/cogs/test_duck_pond.py
+++ /dev/null
@@ -1,575 +0,0 @@
-import asyncio
-import logging
-import typing
-import unittest
-from unittest.mock import AsyncMock, MagicMock, patch
-
-import discord
-
-from bot import constants
-from bot.cogs import duck_pond
-from tests import base
-from tests import helpers
-
-MODULE_PATH = "bot.cogs.duck_pond"
-
-
-class DuckPondTests(base.LoggingTestsMixin, unittest.IsolatedAsyncioTestCase):
- """Tests for DuckPond functionality."""
-
- @classmethod
- def setUpClass(cls):
- """Sets up the objects that only have to be initialized once."""
- cls.nonstaff_member = helpers.MockMember(name="Non-staffer")
-
- cls.staff_role = helpers.MockRole(name="Staff role", id=constants.STAFF_ROLES[0])
- cls.staff_member = helpers.MockMember(name="staffer", roles=[cls.staff_role])
-
- cls.checkmark_emoji = "\N{White Heavy Check Mark}"
- cls.thumbs_up_emoji = "\N{Thumbs Up Sign}"
- cls.unicode_duck_emoji = "\N{Duck}"
- cls.duck_pond_emoji = helpers.MockPartialEmoji(id=constants.DuckPond.custom_emojis[0])
- cls.non_duck_custom_emoji = helpers.MockPartialEmoji(id=123)
-
- def setUp(self):
- """Sets up the objects that need to be refreshed before each test."""
- self.bot = helpers.MockBot(user=helpers.MockMember(id=46692))
- self.cog = duck_pond.DuckPond(bot=self.bot)
-
- def test_duck_pond_correctly_initializes(self):
- """`__init__ should set `bot` and `webhook_id` attributes and schedule `fetch_webhook`."""
- bot = helpers.MockBot()
- cog = MagicMock()
-
- duck_pond.DuckPond.__init__(cog, bot)
-
- self.assertEqual(cog.bot, bot)
- self.assertEqual(cog.webhook_id, constants.Webhooks.duck_pond)
- bot.loop.create_task.assert_called_once_with(cog.fetch_webhook())
-
- def test_fetch_webhook_succeeds_without_connectivity_issues(self):
- """The `fetch_webhook` method waits until `READY` event and sets the `webhook` attribute."""
- self.bot.fetch_webhook.return_value = "dummy webhook"
- self.cog.webhook_id = 1
-
- asyncio.run(self.cog.fetch_webhook())
-
- self.bot.wait_until_guild_available.assert_called_once()
- self.bot.fetch_webhook.assert_called_once_with(1)
- self.assertEqual(self.cog.webhook, "dummy webhook")
-
- def test_fetch_webhook_logs_when_unable_to_fetch_webhook(self):
- """The `fetch_webhook` method should log an exception when it fails to fetch the webhook."""
- self.bot.fetch_webhook.side_effect = discord.HTTPException(response=MagicMock(), message="Not found.")
- self.cog.webhook_id = 1
-
- log = logging.getLogger('bot.cogs.duck_pond')
- with self.assertLogs(logger=log, level=logging.ERROR) as log_watcher:
- asyncio.run(self.cog.fetch_webhook())
-
- self.bot.wait_until_guild_available.assert_called_once()
- self.bot.fetch_webhook.assert_called_once_with(1)
-
- self.assertEqual(len(log_watcher.records), 1)
-
- record = log_watcher.records[0]
- self.assertEqual(record.levelno, logging.ERROR)
-
- def test_is_staff_returns_correct_values_based_on_instance_passed(self):
- """The `is_staff` method should return correct values based on the instance passed."""
- test_cases = (
- (helpers.MockUser(name="User instance"), False),
- (helpers.MockMember(name="Member instance without staff role"), False),
- (helpers.MockMember(name="Member instance with staff role", roles=[self.staff_role]), True)
- )
-
- for user, expected_return in test_cases:
- actual_return = self.cog.is_staff(user)
- with self.subTest(user_type=user.name, expected_return=expected_return, actual_return=actual_return):
- self.assertEqual(expected_return, actual_return)
-
- async def test_has_green_checkmark_correctly_detects_presence_of_green_checkmark_emoji(self):
- """The `has_green_checkmark` method should only return `True` if one is present."""
- test_cases = (
- (
- "No reactions", helpers.MockMessage(), False
- ),
- (
- "No green check mark reactions",
- helpers.MockMessage(reactions=[
- helpers.MockReaction(emoji=self.unicode_duck_emoji, users=[self.bot.user]),
- helpers.MockReaction(emoji=self.thumbs_up_emoji, users=[self.bot.user])
- ]),
- False
- ),
- (
- "Green check mark reaction, but not from our bot",
- helpers.MockMessage(reactions=[
- helpers.MockReaction(emoji=self.unicode_duck_emoji, users=[self.bot.user]),
- helpers.MockReaction(emoji=self.checkmark_emoji, users=[self.staff_member])
- ]),
- False
- ),
- (
- "Green check mark reaction, with one from the bot",
- helpers.MockMessage(reactions=[
- helpers.MockReaction(emoji=self.unicode_duck_emoji, users=[self.bot.user]),
- helpers.MockReaction(emoji=self.checkmark_emoji, users=[self.staff_member, self.bot.user])
- ]),
- True
- )
- )
-
- for description, message, expected_return in test_cases:
- actual_return = await self.cog.has_green_checkmark(message)
- with self.subTest(
- test_case=description,
- expected_return=expected_return,
- actual_return=actual_return
- ):
- self.assertEqual(expected_return, actual_return)
-
- def test_send_webhook_correctly_passes_on_arguments(self):
- """The `send_webhook` method should pass the arguments to the webhook correctly."""
- self.cog.webhook = helpers.MockAsyncWebhook()
-
- content = "fake content"
- username = "fake username"
- avatar_url = "fake avatar_url"
- embed = "fake embed"
-
- asyncio.run(self.cog.send_webhook(content, username, avatar_url, embed))
-
- self.cog.webhook.send.assert_called_once_with(
- content=content,
- username=username,
- avatar_url=avatar_url,
- embed=embed
- )
-
- def test_send_webhook_logs_when_sending_message_fails(self):
- """The `send_webhook` method should catch a `discord.HTTPException` and log accordingly."""
- self.cog.webhook = helpers.MockAsyncWebhook()
- self.cog.webhook.send.side_effect = discord.HTTPException(response=MagicMock(), message="Something failed.")
-
- log = logging.getLogger('bot.cogs.duck_pond')
- with self.assertLogs(logger=log, level=logging.ERROR) as log_watcher:
- asyncio.run(self.cog.send_webhook())
-
- self.assertEqual(len(log_watcher.records), 1)
-
- record = log_watcher.records[0]
- self.assertEqual(record.levelno, logging.ERROR)
-
- def _get_reaction(
- self,
- emoji: typing.Union[str, helpers.MockEmoji],
- staff: int = 0,
- nonstaff: int = 0
- ) -> helpers.MockReaction:
- staffers = [helpers.MockMember(roles=[self.staff_role]) for _ in range(staff)]
- nonstaffers = [helpers.MockMember() for _ in range(nonstaff)]
- return helpers.MockReaction(emoji=emoji, users=staffers + nonstaffers)
-
- async def test_count_ducks_correctly_counts_the_number_of_eligible_duck_emojis(self):
- """The `count_ducks` method should return the number of unique staffers who gave a duck."""
- test_cases = (
- # Simple test cases
- # A message without reactions should return 0
- (
- "No reactions",
- helpers.MockMessage(),
- 0
- ),
- # A message with a non-duck reaction from a non-staffer should return 0
- (
- "Non-duck reaction from non-staffer",
- helpers.MockMessage(reactions=[self._get_reaction(emoji=self.thumbs_up_emoji, nonstaff=1)]),
- 0
- ),
- # A message with a non-duck reaction from a staffer should return 0
- (
- "Non-duck reaction from staffer",
- helpers.MockMessage(reactions=[self._get_reaction(emoji=self.non_duck_custom_emoji, staff=1)]),
- 0
- ),
- # A message with a non-duck reaction from a non-staffer and staffer should return 0
- (
- "Non-duck reaction from staffer + non-staffer",
- helpers.MockMessage(reactions=[self._get_reaction(emoji=self.thumbs_up_emoji, staff=1, nonstaff=1)]),
- 0
- ),
- # A message with a unicode duck reaction from a non-staffer should return 0
- (
- "Unicode Duck Reaction from non-staffer",
- helpers.MockMessage(reactions=[self._get_reaction(emoji=self.unicode_duck_emoji, nonstaff=1)]),
- 0
- ),
- # A message with a unicode duck reaction from a staffer should return 1
- (
- "Unicode Duck Reaction from staffer",
- helpers.MockMessage(reactions=[self._get_reaction(emoji=self.unicode_duck_emoji, staff=1)]),
- 1
- ),
- # A message with a unicode duck reaction from a non-staffer and staffer should return 1
- (
- "Unicode Duck Reaction from staffer + non-staffer",
- helpers.MockMessage(reactions=[self._get_reaction(emoji=self.unicode_duck_emoji, staff=1, nonstaff=1)]),
- 1
- ),
- # A message with a duckpond duck reaction from a non-staffer should return 0
- (
- "Duckpond Duck Reaction from non-staffer",
- helpers.MockMessage(reactions=[self._get_reaction(emoji=self.duck_pond_emoji, nonstaff=1)]),
- 0
- ),
- # A message with a duckpond duck reaction from a staffer should return 1
- (
- "Duckpond Duck Reaction from staffer",
- helpers.MockMessage(reactions=[self._get_reaction(emoji=self.duck_pond_emoji, staff=1)]),
- 1
- ),
- # A message with a duckpond duck reaction from a non-staffer and staffer should return 1
- (
- "Duckpond Duck Reaction from staffer + non-staffer",
- helpers.MockMessage(reactions=[self._get_reaction(emoji=self.duck_pond_emoji, staff=1, nonstaff=1)]),
- 1
- ),
-
- # Complex test cases
- # A message with duckpond duck reactions from 3 staffers and 2 non-staffers returns 3
- (
- "Duckpond Duck Reaction from 3 staffers + 2 non-staffers",
- helpers.MockMessage(reactions=[self._get_reaction(emoji=self.duck_pond_emoji, staff=3, nonstaff=2)]),
- 3
- ),
- # A staffer with multiple duck reactions only counts once
- (
- "Two different duck reactions from the same staffer",
- helpers.MockMessage(
- reactions=[
- helpers.MockReaction(emoji=self.duck_pond_emoji, users=[self.staff_member]),
- helpers.MockReaction(emoji=self.unicode_duck_emoji, users=[self.staff_member]),
- ]
- ),
- 1
- ),
- # A non-string emoji does not count (to test the `isinstance(reaction.emoji, str)` elif)
- (
- "Reaction with non-Emoji/str emoij from 3 staffers + 2 non-staffers",
- helpers.MockMessage(reactions=[self._get_reaction(emoji=100, staff=3, nonstaff=2)]),
- 0
- ),
- # We correctly sum when multiple reactions are provided.
- (
- "Duckpond Duck Reaction from 3 staffers + 2 non-staffers",
- helpers.MockMessage(
- reactions=[
- self._get_reaction(emoji=self.duck_pond_emoji, staff=3, nonstaff=2),
- self._get_reaction(emoji=self.unicode_duck_emoji, staff=4, nonstaff=9),
- ]
- ),
- 3 + 4
- ),
- )
-
- for description, message, expected_count in test_cases:
- actual_count = await self.cog.count_ducks(message)
- with self.subTest(test_case=description, expected_count=expected_count, actual_count=actual_count):
- self.assertEqual(expected_count, actual_count)
-
- async def test_relay_message_correctly_relays_content_and_attachments(self):
- """The `relay_message` method should correctly relay message content and attachments."""
- send_webhook_path = f"{MODULE_PATH}.DuckPond.send_webhook"
- send_attachments_path = f"{MODULE_PATH}.send_attachments"
-
- self.cog.webhook = helpers.MockAsyncWebhook()
-
- test_values = (
- (helpers.MockMessage(clean_content="", attachments=[]), False, False),
- (helpers.MockMessage(clean_content="message", attachments=[]), True, False),
- (helpers.MockMessage(clean_content="", attachments=["attachment"]), False, True),
- (helpers.MockMessage(clean_content="message", attachments=["attachment"]), True, True),
- )
-
- for message, expect_webhook_call, expect_attachment_call in test_values:
- with patch(send_webhook_path, new_callable=AsyncMock) as send_webhook:
- with patch(send_attachments_path, new_callable=AsyncMock) as send_attachments:
- with self.subTest(clean_content=message.clean_content, attachments=message.attachments):
- await self.cog.relay_message(message)
-
- self.assertEqual(expect_webhook_call, send_webhook.called)
- self.assertEqual(expect_attachment_call, send_attachments.called)
-
- message.add_reaction.assert_called_once_with(self.checkmark_emoji)
-
- @patch(f"{MODULE_PATH}.send_attachments", new_callable=AsyncMock)
- async def test_relay_message_handles_irretrievable_attachment_exceptions(self, send_attachments):
- """The `relay_message` method should handle irretrievable attachments."""
- message = helpers.MockMessage(clean_content="message", attachments=["attachment"])
- side_effects = (discord.errors.Forbidden(MagicMock(), ""), discord.errors.NotFound(MagicMock(), ""))
-
- self.cog.webhook = helpers.MockAsyncWebhook()
- log = logging.getLogger("bot.cogs.duck_pond")
-
- for side_effect in side_effects: # pragma: no cover
- send_attachments.side_effect = side_effect
- with patch(f"{MODULE_PATH}.DuckPond.send_webhook", new_callable=AsyncMock) as send_webhook:
- with self.subTest(side_effect=type(side_effect).__name__):
- with self.assertNotLogs(logger=log, level=logging.ERROR):
- await self.cog.relay_message(message)
-
- self.assertEqual(send_webhook.call_count, 2)
-
- @patch(f"{MODULE_PATH}.DuckPond.send_webhook", new_callable=AsyncMock)
- @patch(f"{MODULE_PATH}.send_attachments", new_callable=AsyncMock)
- async def test_relay_message_handles_attachment_http_error(self, send_attachments, send_webhook):
- """The `relay_message` method should handle irretrievable attachments."""
- message = helpers.MockMessage(clean_content="message", attachments=["attachment"])
-
- self.cog.webhook = helpers.MockAsyncWebhook()
- log = logging.getLogger("bot.cogs.duck_pond")
-
- side_effect = discord.HTTPException(MagicMock(), "")
- send_attachments.side_effect = side_effect
- with self.subTest(side_effect=type(side_effect).__name__):
- with self.assertLogs(logger=log, level=logging.ERROR) as log_watcher:
- await self.cog.relay_message(message)
-
- send_webhook.assert_called_once_with(
- content=message.clean_content,
- username=message.author.display_name,
- avatar_url=message.author.avatar_url
- )
-
- self.assertEqual(len(log_watcher.records), 1)
-
- record = log_watcher.records[0]
- self.assertEqual(record.levelno, logging.ERROR)
-
- def _mock_payload(self, label: str, is_custom_emoji: bool, id_: int, emoji_name: str):
- """Creates a mock `on_raw_reaction_add` payload with the specified emoji data."""
- payload = MagicMock(name=label)
- payload.emoji.is_custom_emoji.return_value = is_custom_emoji
- payload.emoji.id = id_
- payload.emoji.name = emoji_name
- return payload
-
- async def test_payload_has_duckpond_emoji_correctly_detects_relevant_emojis(self):
- """The `on_raw_reaction_add` event handler should ignore irrelevant emojis."""
- test_values = (
- # Custom Emojis
- (
- self._mock_payload(
- label="Custom Duckpond Emoji",
- is_custom_emoji=True,
- id_=constants.DuckPond.custom_emojis[0],
- emoji_name=""
- ),
- True
- ),
- (
- self._mock_payload(
- label="Custom Non-Duckpond Emoji",
- is_custom_emoji=True,
- id_=123,
- emoji_name=""
- ),
- False
- ),
- # Unicode Emojis
- (
- self._mock_payload(
- label="Unicode Duck Emoji",
- is_custom_emoji=False,
- id_=1,
- emoji_name=self.unicode_duck_emoji
- ),
- True
- ),
- (
- self._mock_payload(
- label="Unicode Non-Duck Emoji",
- is_custom_emoji=False,
- id_=1,
- emoji_name=self.thumbs_up_emoji
- ),
- False
- ),
- )
-
- for payload, expected_return in test_values:
- actual_return = self.cog._payload_has_duckpond_emoji(payload)
- with self.subTest(case=payload._mock_name, expected_return=expected_return, actual_return=actual_return):
- self.assertEqual(expected_return, actual_return)
-
- @patch(f"{MODULE_PATH}.discord.utils.get")
- @patch(f"{MODULE_PATH}.DuckPond._payload_has_duckpond_emoji", new=MagicMock(return_value=False))
- def test_on_raw_reaction_add_returns_early_with_payload_without_duck_emoji(self, utils_get):
- """The `on_raw_reaction_add` method should return early if the payload does not contain a duck emoji."""
- self.assertIsNone(asyncio.run(self.cog.on_raw_reaction_add(payload=MagicMock())))
-
- # Ensure we've returned before making an unnecessary API call in the lines of code after the emoji check
- utils_get.assert_not_called()
-
- def _raw_reaction_mocks(self, channel_id, message_id, user_id):
- """Sets up mocks for tests of the `on_raw_reaction_add` event listener."""
- channel = helpers.MockTextChannel(id=channel_id)
- self.bot.get_all_channels.return_value = (channel,)
-
- message = helpers.MockMessage(id=message_id)
-
- channel.fetch_message.return_value = message
-
- member = helpers.MockMember(id=user_id, roles=[self.staff_role])
- message.guild.members = (member,)
-
- payload = MagicMock(channel_id=channel_id, message_id=message_id, user_id=user_id)
-
- return channel, message, member, payload
-
- async def test_on_raw_reaction_add_returns_for_bot_and_non_staff_members(self):
- """The `on_raw_reaction_add` event handler should return for bot users or non-staff members."""
- channel_id = 1234
- message_id = 2345
- user_id = 3456
-
- channel, message, _, payload = self._raw_reaction_mocks(channel_id, message_id, user_id)
-
- test_cases = (
- ("non-staff member", helpers.MockMember(id=user_id)),
- ("bot staff member", helpers.MockMember(id=user_id, roles=[self.staff_role], bot=True)),
- )
-
- payload.emoji = self.duck_pond_emoji
-
- for description, member in test_cases:
- message.guild.members = (member, )
- with self.subTest(test_case=description), patch(f"{MODULE_PATH}.DuckPond.has_green_checkmark") as checkmark:
- checkmark.side_effect = AssertionError(
- "Expected method to return before calling `self.has_green_checkmark`."
- )
- self.assertIsNone(await self.cog.on_raw_reaction_add(payload))
-
- # Check that we did make it past the payload checks
- channel.fetch_message.assert_called_once()
- channel.fetch_message.reset_mock()
-
- @patch(f"{MODULE_PATH}.DuckPond.is_staff")
- @patch(f"{MODULE_PATH}.DuckPond.count_ducks", new_callable=AsyncMock)
- def test_on_raw_reaction_add_returns_on_message_with_green_checkmark_placed_by_bot(self, count_ducks, is_staff):
- """The `on_raw_reaction_add` event should return when the message has a green check mark placed by the bot."""
- channel_id = 31415926535
- message_id = 27182818284
- user_id = 16180339887
-
- channel, message, member, payload = self._raw_reaction_mocks(channel_id, message_id, user_id)
-
- payload.emoji = helpers.MockPartialEmoji(name=self.unicode_duck_emoji)
- payload.emoji.is_custom_emoji.return_value = False
-
- message.reactions = [helpers.MockReaction(emoji=self.checkmark_emoji, users=[self.bot.user])]
-
- is_staff.return_value = True
- count_ducks.side_effect = AssertionError("Expected method to return before calling `self.count_ducks`")
-
- self.assertIsNone(asyncio.run(self.cog.on_raw_reaction_add(payload)))
-
- # Assert that we've made it past `self.is_staff`
- is_staff.assert_called_once()
-
- async def test_on_raw_reaction_add_does_not_relay_below_duck_threshold(self):
- """The `on_raw_reaction_add` listener should not relay messages or attachments below the duck threshold."""
- test_cases = (
- (constants.DuckPond.threshold - 1, False),
- (constants.DuckPond.threshold, True),
- (constants.DuckPond.threshold + 1, True),
- )
-
- channel, message, member, payload = self._raw_reaction_mocks(channel_id=3, message_id=4, user_id=5)
-
- payload.emoji = self.duck_pond_emoji
-
- for duck_count, should_relay in test_cases:
- with patch(f"{MODULE_PATH}.DuckPond.relay_message", new_callable=AsyncMock) as relay_message:
- with patch(f"{MODULE_PATH}.DuckPond.count_ducks", new_callable=AsyncMock) as count_ducks:
- count_ducks.return_value = duck_count
- with self.subTest(duck_count=duck_count, should_relay=should_relay):
- await self.cog.on_raw_reaction_add(payload)
-
- # Confirm that we've made it past counting
- count_ducks.assert_called_once()
-
- # Did we relay a message?
- has_relayed = relay_message.called
- self.assertEqual(has_relayed, should_relay)
-
- if should_relay:
- relay_message.assert_called_once_with(message)
-
- async def test_on_raw_reaction_remove_prevents_removal_of_green_checkmark_depending_on_the_duck_count(self):
- """The `on_raw_reaction_remove` listener prevents removal of the check mark on messages with enough ducks."""
- checkmark = helpers.MockPartialEmoji(name=self.checkmark_emoji)
-
- message = helpers.MockMessage(id=1234)
-
- channel = helpers.MockTextChannel(id=98765)
- channel.fetch_message.return_value = message
-
- self.bot.get_all_channels.return_value = (channel, )
-
- payload = MagicMock(channel_id=channel.id, message_id=message.id, emoji=checkmark)
-
- test_cases = (
- (constants.DuckPond.threshold - 1, False),
- (constants.DuckPond.threshold, True),
- (constants.DuckPond.threshold + 1, True),
- )
- for duck_count, should_re_add_checkmark in test_cases:
- with patch(f"{MODULE_PATH}.DuckPond.count_ducks", new_callable=AsyncMock) as count_ducks:
- count_ducks.return_value = duck_count
- with self.subTest(duck_count=duck_count, should_re_add_checkmark=should_re_add_checkmark):
- await self.cog.on_raw_reaction_remove(payload)
-
- # Check if we fetched the message
- channel.fetch_message.assert_called_once_with(message.id)
-
- # Check if we actually counted the number of ducks
- count_ducks.assert_called_once_with(message)
-
- has_re_added_checkmark = message.add_reaction.called
- self.assertEqual(should_re_add_checkmark, has_re_added_checkmark)
-
- if should_re_add_checkmark:
- message.add_reaction.assert_called_once_with(self.checkmark_emoji)
- message.add_reaction.reset_mock()
-
- # reset mocks
- channel.fetch_message.reset_mock()
- message.reset_mock()
-
- def test_on_raw_reaction_remove_ignores_removal_of_non_checkmark_reactions(self):
- """The `on_raw_reaction_remove` listener should ignore the removal of non-check mark emojis."""
- channel = helpers.MockTextChannel(id=98765)
-
- channel.fetch_message.side_effect = AssertionError(
- "Expected method to return before calling `channel.fetch_message`"
- )
-
- self.bot.get_all_channels.return_value = (channel, )
-
- payload = MagicMock(emoji=helpers.MockPartialEmoji(name=self.thumbs_up_emoji), channel_id=channel.id)
-
- self.assertIsNone(asyncio.run(self.cog.on_raw_reaction_remove(payload)))
-
- channel.fetch_message.assert_not_called()
-
-
-class DuckPondSetupTests(unittest.TestCase):
- """Tests setup of the `DuckPond` cog."""
-
- def test_setup(self):
- """Setup of the extension should call add_cog."""
- bot = helpers.MockBot()
- duck_pond.setup(bot)
- bot.add_cog.assert_called_once()
diff --git a/tests/bot/cogs/__init__.py b/tests/bot/exts/__init__.py
index e69de29bb..e69de29bb 100644
--- a/tests/bot/cogs/__init__.py
+++ b/tests/bot/exts/__init__.py
diff --git a/tests/bot/cogs/moderation/__init__.py b/tests/bot/exts/backend/__init__.py
index e69de29bb..e69de29bb 100644
--- a/tests/bot/cogs/moderation/__init__.py
+++ b/tests/bot/exts/backend/__init__.py
diff --git a/tests/bot/cogs/sync/__init__.py b/tests/bot/exts/backend/sync/__init__.py
index e69de29bb..e69de29bb 100644
--- a/tests/bot/cogs/sync/__init__.py
+++ b/tests/bot/exts/backend/sync/__init__.py
diff --git a/tests/bot/exts/backend/sync/test_base.py b/tests/bot/exts/backend/sync/test_base.py
new file mode 100644
index 000000000..4953550f9
--- /dev/null
+++ b/tests/bot/exts/backend/sync/test_base.py
@@ -0,0 +1,73 @@
+import unittest
+from unittest import mock
+
+
+from bot.api import ResponseCodeError
+from bot.exts.backend.sync._syncers import Syncer
+from tests import helpers
+
+
+class TestSyncer(Syncer):
+ """Syncer subclass with mocks for abstract methods for testing purposes."""
+
+ name = "test"
+ _get_diff = mock.AsyncMock()
+ _sync = mock.AsyncMock()
+
+
+class SyncerBaseTests(unittest.TestCase):
+ """Tests for the syncer base class."""
+
+ def setUp(self):
+ self.bot = helpers.MockBot()
+
+ def test_instantiation_fails_without_abstract_methods(self):
+ """The class must have abstract methods implemented."""
+ with self.assertRaisesRegex(TypeError, "Can't instantiate abstract class"):
+ Syncer(self.bot)
+
+
+class SyncerSyncTests(unittest.IsolatedAsyncioTestCase):
+ """Tests for main function orchestrating the sync."""
+
+ def setUp(self):
+ self.bot = helpers.MockBot(user=helpers.MockMember(bot=True))
+ self.syncer = TestSyncer(self.bot)
+ self.guild = helpers.MockGuild()
+
+ # Make sure `_get_diff` returns a MagicMock, not an AsyncMock
+ self.syncer._get_diff.return_value = mock.MagicMock()
+
+ async def test_sync_message_edited(self):
+ """The message should be edited if one was sent, even if the sync has an API error."""
+ subtests = (
+ (None, None, False),
+ (helpers.MockMessage(), None, True),
+ (helpers.MockMessage(), ResponseCodeError(mock.MagicMock()), True),
+ )
+
+ for message, side_effect, should_edit in subtests:
+ with self.subTest(message=message, side_effect=side_effect, should_edit=should_edit):
+ self.syncer._sync.side_effect = side_effect
+ ctx = helpers.MockContext()
+ ctx.send.return_value = message
+
+ await self.syncer.sync(self.guild, ctx)
+
+ if should_edit:
+ message.edit.assert_called_once()
+ self.assertIn("content", message.edit.call_args[1])
+
+ async def test_sync_message_sent(self):
+ """If ctx is given, a new message should be sent."""
+ subtests = (
+ (None, None),
+ (helpers.MockContext(), helpers.MockMessage()),
+ )
+
+ for ctx, message in subtests:
+ with self.subTest(ctx=ctx, message=message):
+ await self.syncer.sync(self.guild, ctx)
+
+ if ctx is not None:
+ ctx.send.assert_called_once()
diff --git a/tests/bot/cogs/sync/test_cog.py b/tests/bot/exts/backend/sync/test_cog.py
index 120bc991d..063a82754 100644
--- a/tests/bot/cogs/sync/test_cog.py
+++ b/tests/bot/exts/backend/sync/test_cog.py
@@ -5,8 +5,9 @@ import discord
from bot import constants
from bot.api import ResponseCodeError
-from bot.cogs import sync
-from bot.cogs.sync.syncers import Syncer
+from bot.exts.backend import sync
+from bot.exts.backend.sync._cog import Sync
+from bot.exts.backend.sync._syncers import Syncer
from tests import helpers
from tests.base import CommandTestCase
@@ -29,19 +30,19 @@ class SyncCogTestCase(unittest.IsolatedAsyncioTestCase):
self.bot = helpers.MockBot()
self.role_syncer_patcher = mock.patch(
- "bot.cogs.sync.syncers.RoleSyncer",
+ "bot.exts.backend.sync._syncers.RoleSyncer",
autospec=Syncer,
spec_set=True
)
self.user_syncer_patcher = mock.patch(
- "bot.cogs.sync.syncers.UserSyncer",
+ "bot.exts.backend.sync._syncers.UserSyncer",
autospec=Syncer,
spec_set=True
)
self.RoleSyncer = self.role_syncer_patcher.start()
self.UserSyncer = self.user_syncer_patcher.start()
- self.cog = sync.Sync(self.bot)
+ self.cog = Sync(self.bot)
def tearDown(self):
self.role_syncer_patcher.stop()
@@ -59,7 +60,7 @@ class SyncCogTestCase(unittest.IsolatedAsyncioTestCase):
class SyncCogTests(SyncCogTestCase):
"""Tests for the Sync cog."""
- @mock.patch.object(sync.Sync, "sync_guild", new_callable=mock.MagicMock)
+ @mock.patch.object(Sync, "sync_guild", new_callable=mock.MagicMock)
def test_sync_cog_init(self, sync_guild):
"""Should instantiate syncers and run a sync for the guild."""
# Reset because a Sync cog was already instantiated in setUp.
@@ -70,7 +71,7 @@ class SyncCogTests(SyncCogTestCase):
mock_sync_guild_coro = mock.MagicMock()
sync_guild.return_value = mock_sync_guild_coro
- sync.Sync(self.bot)
+ Sync(self.bot)
self.RoleSyncer.assert_called_once_with(self.bot)
self.UserSyncer.assert_called_once_with(self.bot)
@@ -131,7 +132,7 @@ class SyncCogListenerTests(SyncCogTestCase):
super().setUp()
self.cog.patch_user = mock.AsyncMock(spec_set=self.cog.patch_user)
- self.guild_id_patcher = mock.patch("bot.cogs.sync.cog.constants.Guild.id", 5)
+ self.guild_id_patcher = mock.patch("bot.exts.backend.sync._cog.constants.Guild.id", 5)
self.guild_id = self.guild_id_patcher.start()
self.guild = helpers.MockGuild(id=self.guild_id)
@@ -391,14 +392,14 @@ class SyncCogCommandTests(SyncCogTestCase, CommandTestCase):
async def test_sync_roles_command(self):
"""sync() should be called on the RoleSyncer."""
ctx = helpers.MockContext()
- await self.cog.sync_roles_command.callback(self.cog, ctx)
+ await self.cog.sync_roles_command(self.cog, ctx)
self.cog.role_syncer.sync.assert_called_once_with(ctx.guild, ctx)
async def test_sync_users_command(self):
"""sync() should be called on the UserSyncer."""
ctx = helpers.MockContext()
- await self.cog.sync_users_command.callback(self.cog, ctx)
+ await self.cog.sync_users_command(self.cog, ctx)
self.cog.user_syncer.sync.assert_called_once_with(ctx.guild, ctx)
diff --git a/tests/bot/cogs/sync/test_roles.py b/tests/bot/exts/backend/sync/test_roles.py
index 79eee98f4..7b9f40cad 100644
--- a/tests/bot/cogs/sync/test_roles.py
+++ b/tests/bot/exts/backend/sync/test_roles.py
@@ -3,7 +3,7 @@ from unittest import mock
import discord
-from bot.cogs.sync.syncers import RoleSyncer, _Diff, _Role
+from bot.exts.backend.sync._syncers import RoleSyncer, _Diff, _Role
from tests import helpers
diff --git a/tests/bot/cogs/sync/test_users.py b/tests/bot/exts/backend/sync/test_users.py
index 002a947ad..9f380a15d 100644
--- a/tests/bot/cogs/sync/test_users.py
+++ b/tests/bot/exts/backend/sync/test_users.py
@@ -1,7 +1,6 @@
import unittest
-from unittest import mock
-from bot.cogs.sync.syncers import UserSyncer, _Diff, _User
+from bot.exts.backend.sync._syncers import UserSyncer, _Diff
from tests import helpers
@@ -10,7 +9,7 @@ def fake_user(**kwargs):
kwargs.setdefault("id", 43)
kwargs.setdefault("name", "bob the test man")
kwargs.setdefault("discriminator", 1337)
- kwargs.setdefault("roles", (666,))
+ kwargs.setdefault("roles", [666])
kwargs.setdefault("in_guild", True)
return kwargs
@@ -40,22 +39,42 @@ class UserSyncerDiffTests(unittest.IsolatedAsyncioTestCase):
return guild
+ @staticmethod
+ def get_mock_member(member: dict):
+ member = member.copy()
+ del member["in_guild"]
+ mock_member = helpers.MockMember(**member)
+ mock_member.roles = [helpers.MockRole(id=role_id) for role_id in member["roles"]]
+ return mock_member
+
async def test_empty_diff_for_no_users(self):
"""When no users are given, an empty diff should be returned."""
+ self.bot.api_client.get.return_value = {
+ "count": 3,
+ "next_page_no": None,
+ "previous_page_no": None,
+ "results": []
+ }
guild = self.get_guild()
actual_diff = await self.syncer._get_diff(guild)
- expected_diff = (set(), set(), None)
+ expected_diff = ([], [], None)
self.assertEqual(actual_diff, expected_diff)
async def test_empty_diff_for_identical_users(self):
"""No differences should be found if the users in the guild and DB are identical."""
- self.bot.api_client.get.return_value = [fake_user()]
+ self.bot.api_client.get.return_value = {
+ "count": 3,
+ "next_page_no": None,
+ "previous_page_no": None,
+ "results": [fake_user()]
+ }
guild = self.get_guild(fake_user())
+ guild.get_member.return_value = self.get_mock_member(fake_user())
actual_diff = await self.syncer._get_diff(guild)
- expected_diff = (set(), set(), None)
+ expected_diff = ([], [], None)
self.assertEqual(actual_diff, expected_diff)
@@ -63,59 +82,102 @@ class UserSyncerDiffTests(unittest.IsolatedAsyncioTestCase):
"""Only updated users should be added to the 'updated' set of the diff."""
updated_user = fake_user(id=99, name="new")
- self.bot.api_client.get.return_value = [fake_user(id=99, name="old"), fake_user()]
+ self.bot.api_client.get.return_value = {
+ "count": 3,
+ "next_page_no": None,
+ "previous_page_no": None,
+ "results": [fake_user(id=99, name="old"), fake_user()]
+ }
guild = self.get_guild(updated_user, fake_user())
+ guild.get_member.side_effect = [
+ self.get_mock_member(updated_user),
+ self.get_mock_member(fake_user())
+ ]
actual_diff = await self.syncer._get_diff(guild)
- expected_diff = (set(), {_User(**updated_user)}, None)
+ expected_diff = ([], [{"id": 99, "name": "new"}], None)
self.assertEqual(actual_diff, expected_diff)
async def test_diff_for_new_users(self):
- """Only new users should be added to the 'created' set of the diff."""
+ """Only new users should be added to the 'created' list of the diff."""
new_user = fake_user(id=99, name="new")
- self.bot.api_client.get.return_value = [fake_user()]
+ self.bot.api_client.get.return_value = {
+ "count": 3,
+ "next_page_no": None,
+ "previous_page_no": None,
+ "results": [fake_user()]
+ }
guild = self.get_guild(fake_user(), new_user)
-
+ guild.get_member.side_effect = [
+ self.get_mock_member(fake_user()),
+ self.get_mock_member(new_user)
+ ]
actual_diff = await self.syncer._get_diff(guild)
- expected_diff = ({_User(**new_user)}, set(), None)
+ expected_diff = ([new_user], [], None)
self.assertEqual(actual_diff, expected_diff)
async def test_diff_sets_in_guild_false_for_leaving_users(self):
"""When a user leaves the guild, the `in_guild` flag is updated to `False`."""
- leaving_user = fake_user(id=63, in_guild=False)
-
- self.bot.api_client.get.return_value = [fake_user(), fake_user(id=63)]
+ self.bot.api_client.get.return_value = {
+ "count": 3,
+ "next_page_no": None,
+ "previous_page_no": None,
+ "results": [fake_user(), fake_user(id=63)]
+ }
guild = self.get_guild(fake_user())
+ guild.get_member.side_effect = [
+ self.get_mock_member(fake_user()),
+ None
+ ]
actual_diff = await self.syncer._get_diff(guild)
- expected_diff = (set(), {_User(**leaving_user)}, None)
+ expected_diff = ([], [{"id": 63, "in_guild": False}], None)
self.assertEqual(actual_diff, expected_diff)
async def test_diff_for_new_updated_and_leaving_users(self):
"""When users are added, updated, and removed, all of them are returned properly."""
new_user = fake_user(id=99, name="new")
+
updated_user = fake_user(id=55, name="updated")
- leaving_user = fake_user(id=63, in_guild=False)
- self.bot.api_client.get.return_value = [fake_user(), fake_user(id=55), fake_user(id=63)]
+ self.bot.api_client.get.return_value = {
+ "count": 3,
+ "next_page_no": None,
+ "previous_page_no": None,
+ "results": [fake_user(), fake_user(id=55), fake_user(id=63)]
+ }
guild = self.get_guild(fake_user(), new_user, updated_user)
+ guild.get_member.side_effect = [
+ self.get_mock_member(fake_user()),
+ self.get_mock_member(updated_user),
+ None
+ ]
actual_diff = await self.syncer._get_diff(guild)
- expected_diff = ({_User(**new_user)}, {_User(**updated_user), _User(**leaving_user)}, None)
+ expected_diff = ([new_user], [{"id": 55, "name": "updated"}, {"id": 63, "in_guild": False}], None)
self.assertEqual(actual_diff, expected_diff)
async def test_empty_diff_for_db_users_not_in_guild(self):
- """When the DB knows a user the guild doesn't, no difference is found."""
- self.bot.api_client.get.return_value = [fake_user(), fake_user(id=63, in_guild=False)]
+ """When the DB knows a user, but the guild doesn't, no difference is found."""
+ self.bot.api_client.get.return_value = {
+ "count": 3,
+ "next_page_no": None,
+ "previous_page_no": None,
+ "results": [fake_user(), fake_user(id=63, in_guild=False)]
+ }
guild = self.get_guild(fake_user())
+ guild.get_member.side_effect = [
+ self.get_mock_member(fake_user()),
+ None
+ ]
actual_diff = await self.syncer._get_diff(guild)
- expected_diff = (set(), set(), None)
+ expected_diff = ([], [], None)
self.assertEqual(actual_diff, expected_diff)
@@ -131,13 +193,10 @@ class UserSyncerSyncTests(unittest.IsolatedAsyncioTestCase):
"""Only POST requests should be made with the correct payload."""
users = [fake_user(id=111), fake_user(id=222)]
- user_tuples = {_User(**user) for user in users}
- diff = _Diff(user_tuples, set(), None)
+ diff = _Diff(users, [], None)
await self.syncer._sync(diff)
- calls = [mock.call("bot/users", json=user) for user in users]
- self.bot.api_client.post.assert_has_calls(calls, any_order=True)
- self.assertEqual(self.bot.api_client.post.call_count, len(users))
+ self.bot.api_client.post.assert_called_once_with("bot/users", json=diff.created)
self.bot.api_client.put.assert_not_called()
self.bot.api_client.delete.assert_not_called()
@@ -146,13 +205,10 @@ class UserSyncerSyncTests(unittest.IsolatedAsyncioTestCase):
"""Only PUT requests should be made with the correct payload."""
users = [fake_user(id=111), fake_user(id=222)]
- user_tuples = {_User(**user) for user in users}
- diff = _Diff(set(), user_tuples, None)
+ diff = _Diff([], users, None)
await self.syncer._sync(diff)
- calls = [mock.call(f"bot/users/{user['id']}", json=user) for user in users]
- self.bot.api_client.put.assert_has_calls(calls, any_order=True)
- self.assertEqual(self.bot.api_client.put.call_count, len(users))
+ self.bot.api_client.patch.assert_called_once_with("bot/users/bulk_patch", json=diff.updated)
self.bot.api_client.post.assert_not_called()
self.bot.api_client.delete.assert_not_called()
diff --git a/tests/bot/cogs/test_logging.py b/tests/bot/exts/backend/test_logging.py
index 8a18fdcd6..466f207d9 100644
--- a/tests/bot/cogs/test_logging.py
+++ b/tests/bot/exts/backend/test_logging.py
@@ -2,7 +2,7 @@ import unittest
from unittest.mock import patch
from bot import constants
-from bot.cogs.logging import Logging
+from bot.exts.backend.logging import Logging
from tests.helpers import MockBot, MockTextChannel
@@ -14,7 +14,7 @@ class LoggingTests(unittest.IsolatedAsyncioTestCase):
self.cog = Logging(self.bot)
self.dev_log = MockTextChannel(id=1234, name="dev-log")
- @patch("bot.cogs.logging.DEBUG_MODE", False)
+ @patch("bot.exts.backend.logging.DEBUG_MODE", False)
async def test_debug_mode_false(self):
"""Should send connected message to dev-log."""
self.bot.get_channel.return_value = self.dev_log
@@ -24,7 +24,7 @@ class LoggingTests(unittest.IsolatedAsyncioTestCase):
self.bot.get_channel.assert_called_once_with(constants.Channels.dev_log)
self.dev_log.send.assert_awaited_once()
- @patch("bot.cogs.logging.DEBUG_MODE", True)
+ @patch("bot.exts.backend.logging.DEBUG_MODE", True)
async def test_debug_mode_true(self):
"""Should not send anything to dev-log."""
await self.cog.startup_greeting()
diff --git a/tests/bot/patches/__init__.py b/tests/bot/exts/filters/__init__.py
index e69de29bb..e69de29bb 100644
--- a/tests/bot/patches/__init__.py
+++ b/tests/bot/exts/filters/__init__.py
diff --git a/tests/bot/cogs/test_antimalware.py b/tests/bot/exts/filters/test_antimalware.py
index f219fc1ba..3393c6cdc 100644
--- a/tests/bot/cogs/test_antimalware.py
+++ b/tests/bot/exts/filters/test_antimalware.py
@@ -1,28 +1,35 @@
import unittest
-from unittest.mock import AsyncMock, Mock, patch
+from unittest.mock import AsyncMock, Mock
from discord import NotFound
-from bot.cogs import antimalware
-from bot.constants import AntiMalware as AntiMalwareConfig, Channels, STAFF_ROLES
+from bot.constants import Channels, STAFF_ROLES
+from bot.exts.filters import antimalware
from tests.helpers import MockAttachment, MockBot, MockMessage, MockRole
-MODULE = "bot.cogs.antimalware"
-
-@patch(f"{MODULE}.AntiMalwareConfig.whitelist", new=[".first", ".second", ".third"])
class AntiMalwareCogTests(unittest.IsolatedAsyncioTestCase):
"""Test the AntiMalware cog."""
def setUp(self):
"""Sets up fresh objects for each test."""
self.bot = MockBot()
+ self.bot.filter_list_cache = {
+ "FILE_FORMAT.True": {
+ ".first": {},
+ ".second": {},
+ ".third": {},
+ }
+ }
self.cog = antimalware.AntiMalware(self.bot)
self.message = MockMessage()
+ self.message.webhook_id = None
+ self.message.author.bot = None
+ self.whitelist = [".first", ".second", ".third"]
async def test_message_with_allowed_attachment(self):
"""Messages with allowed extensions should not be deleted"""
- attachment = MockAttachment(filename=f"python{AntiMalwareConfig.whitelist[0]}")
+ attachment = MockAttachment(filename="python.first")
self.message.attachments = [attachment]
await self.cog.on_message(self.message)
@@ -43,6 +50,26 @@ class AntiMalwareCogTests(unittest.IsolatedAsyncioTestCase):
self.message.delete.assert_not_called()
+ async def test_webhook_message_with_illegal_extension(self):
+ """A webhook message containing an illegal extension should be ignored."""
+ attachment = MockAttachment(filename="python.disallowed")
+ self.message.webhook_id = 697140105563078727
+ self.message.attachments = [attachment]
+
+ await self.cog.on_message(self.message)
+
+ self.message.delete.assert_not_called()
+
+ async def test_bot_message_with_illegal_extension(self):
+ """A bot message containing an illegal extension should be ignored."""
+ attachment = MockAttachment(filename="python.disallowed")
+ self.message.author.bot = 409107086526644234
+ self.message.attachments = [attachment]
+
+ await self.cog.on_message(self.message)
+
+ self.message.delete.assert_not_called()
+
async def test_message_with_illegal_extension_gets_deleted(self):
"""A message containing an illegal extension should send an embed."""
attachment = MockAttachment(filename="python.disallowed")
@@ -93,7 +120,7 @@ class AntiMalwareCogTests(unittest.IsolatedAsyncioTestCase):
self.assertEqual(embed.description, antimalware.TXT_EMBED_DESCRIPTION.format.return_value)
antimalware.TXT_EMBED_DESCRIPTION.format.assert_called_with(cmd_channel_mention=cmd_channel.mention)
- async def test_other_disallowed_extention_embed_description(self):
+ async def test_other_disallowed_extension_embed_description(self):
"""Test the description for a non .py/.txt disallowed extension."""
attachment = MockAttachment(filename="python.disallowed")
self.message.attachments = [attachment]
@@ -109,6 +136,7 @@ class AntiMalwareCogTests(unittest.IsolatedAsyncioTestCase):
self.assertEqual(embed.description, antimalware.DISALLOWED_EMBED_DESCRIPTION.format.return_value)
antimalware.DISALLOWED_EMBED_DESCRIPTION.format.assert_called_with(
+ joined_whitelist=", ".join(self.whitelist),
blocked_extensions_str=".disallowed",
meta_channel_mention=meta_channel.mention
)
@@ -135,7 +163,7 @@ class AntiMalwareCogTests(unittest.IsolatedAsyncioTestCase):
"""The return value should include all non-whitelisted extensions."""
test_values = (
([], []),
- (AntiMalwareConfig.whitelist, []),
+ (self.whitelist, []),
([".first"], []),
([".first", ".disallowed"], [".disallowed"]),
([".disallowed"], [".disallowed"]),
@@ -145,7 +173,7 @@ class AntiMalwareCogTests(unittest.IsolatedAsyncioTestCase):
for extensions, expected_disallowed_extensions in test_values:
with self.subTest(extensions=extensions, expected_disallowed_extensions=expected_disallowed_extensions):
self.message.attachments = [MockAttachment(filename=f"filename{extension}") for extension in extensions]
- disallowed_extensions = self.cog.get_disallowed_extensions(self.message)
+ disallowed_extensions = self.cog._get_disallowed_extensions(self.message)
self.assertCountEqual(disallowed_extensions, expected_disallowed_extensions)
diff --git a/tests/bot/cogs/test_antispam.py b/tests/bot/exts/filters/test_antispam.py
index ce5472c71..6a0e4fded 100644
--- a/tests/bot/cogs/test_antispam.py
+++ b/tests/bot/exts/filters/test_antispam.py
@@ -1,6 +1,6 @@
import unittest
-from bot.cogs import antispam
+from bot.exts.filters import antispam
class AntispamConfigurationValidationTests(unittest.TestCase):
diff --git a/tests/bot/cogs/test_security.py b/tests/bot/exts/filters/test_security.py
index 9d1a62f7e..c0c3baa42 100644
--- a/tests/bot/cogs/test_security.py
+++ b/tests/bot/exts/filters/test_security.py
@@ -3,7 +3,7 @@ from unittest.mock import MagicMock
from discord.ext.commands import NoPrivateMessage
-from bot.cogs import security
+from bot.exts.filters import security
from tests.helpers import MockBot, MockContext
diff --git a/tests/bot/cogs/test_token_remover.py b/tests/bot/exts/filters/test_token_remover.py
index 3349caa73..f99cc3370 100644
--- a/tests/bot/cogs/test_token_remover.py
+++ b/tests/bot/exts/filters/test_token_remover.py
@@ -6,9 +6,10 @@ from unittest.mock import MagicMock
from discord import Colour, NotFound
from bot import constants
-from bot.cogs import token_remover
-from bot.cogs.moderation import ModLog
-from bot.cogs.token_remover import Token, TokenRemover
+from bot.exts.filters import token_remover
+from bot.exts.filters.token_remover import Token, TokenRemover
+from bot.exts.moderation.modlog import ModLog
+from bot.utils.messages import format_user
from tests.helpers import MockBot, MockMessage, autospec
@@ -22,23 +23,25 @@ class TokenRemoverTests(unittest.IsolatedAsyncioTestCase):
self.msg = MockMessage(id=555, content="hello world")
self.msg.channel.mention = "#lemonade-stand"
+ self.msg.guild.get_member.return_value.bot = False
+ self.msg.guild.get_member.return_value.__str__.return_value = "Woody"
self.msg.author.__str__ = MagicMock(return_value=self.msg.author.name)
self.msg.author.avatar_url_as.return_value = "picture-lemon.png"
- def test_is_valid_user_id_valid(self):
- """Should consider user IDs valid if they decode entirely to ASCII digits."""
- ids = (
- "NDcyMjY1OTQzMDYyNDEzMzMy",
- "NDc1MDczNjI5Mzk5NTQ3OTA0",
- "NDY3MjIzMjMwNjUwNzc3NjQx",
+ def test_extract_user_id_valid(self):
+ """Should consider user IDs valid if they decode into an integer ID."""
+ id_pairs = (
+ ("NDcyMjY1OTQzMDYyNDEzMzMy", 472265943062413332),
+ ("NDc1MDczNjI5Mzk5NTQ3OTA0", 475073629399547904),
+ ("NDY3MjIzMjMwNjUwNzc3NjQx", 467223230650777641),
)
- for user_id in ids:
- with self.subTest(user_id=user_id):
- result = TokenRemover.is_valid_user_id(user_id)
- self.assertTrue(result)
+ for token_id, user_id in id_pairs:
+ with self.subTest(token_id=token_id):
+ result = TokenRemover.extract_user_id(token_id)
+ self.assertEqual(result, user_id)
- def test_is_valid_user_id_invalid(self):
+ def test_extract_user_id_invalid(self):
"""Should consider non-digit and non-ASCII IDs invalid."""
ids = (
("SGVsbG8gd29ybGQ", "non-digit ASCII"),
@@ -52,8 +55,8 @@ class TokenRemoverTests(unittest.IsolatedAsyncioTestCase):
for user_id, msg in ids:
with self.subTest(msg=msg):
- result = TokenRemover.is_valid_user_id(user_id)
- self.assertFalse(result)
+ result = TokenRemover.extract_user_id(user_id)
+ self.assertIsNone(result)
def test_is_valid_timestamp_valid(self):
"""Should consider timestamps valid if they're greater than the Discord epoch."""
@@ -85,6 +88,34 @@ class TokenRemoverTests(unittest.IsolatedAsyncioTestCase):
result = TokenRemover.is_valid_timestamp(timestamp)
self.assertFalse(result)
+ def test_is_valid_hmac_valid(self):
+ """Should consider an HMAC valid if it has at least 3 unique characters."""
+ valid_hmacs = (
+ "VXmErH7j511turNpfURmb0rVNm8",
+ "Ysnu2wacjaKs7qnoo46S8Dm2us8",
+ "sJf6omBPORBPju3WJEIAcwW9Zds",
+ "s45jqDV_Iisn-symw0yDRrk_jf4",
+ )
+
+ for hmac in valid_hmacs:
+ with self.subTest(msg=hmac):
+ result = TokenRemover.is_maybe_valid_hmac(hmac)
+ self.assertTrue(result)
+
+ def test_is_invalid_hmac_invalid(self):
+ """Should consider an HMAC invalid if has fewer than 3 unique characters."""
+ invalid_hmacs = (
+ ("xxxxxxxxxxxxxxxxxx", "Single character"),
+ ("XxXxXxXxXxXxXxXxXx", "Single character alternating case"),
+ ("ASFasfASFasfASFASsf", "Three characters alternating-case"),
+ ("asdasdasdasdasdasdasd", "Three characters one case"),
+ )
+
+ for hmac, msg in invalid_hmacs:
+ with self.subTest(msg=msg):
+ result = TokenRemover.is_maybe_valid_hmac(hmac)
+ self.assertFalse(result)
+
def test_mod_log_property(self):
"""The `mod_log` property should ask the bot to return the `ModLog` cog."""
self.bot.get_cog.return_value = 'lemon'
@@ -132,7 +163,7 @@ class TokenRemoverTests(unittest.IsolatedAsyncioTestCase):
await cog.on_message(msg)
find_token_in_message.assert_not_called()
- @autospec("bot.cogs.token_remover", "TOKEN_RE")
+ @autospec("bot.exts.filters.token_remover", "TOKEN_RE")
def test_find_token_no_matches(self, token_re):
"""None should be returned if the regex matches no tokens in a message."""
token_re.finditer.return_value = ()
@@ -142,11 +173,18 @@ class TokenRemoverTests(unittest.IsolatedAsyncioTestCase):
self.assertIsNone(return_value)
token_re.finditer.assert_called_once_with(self.msg.content)
- @autospec(TokenRemover, "is_valid_user_id", "is_valid_timestamp")
- @autospec("bot.cogs.token_remover", "Token")
- @autospec("bot.cogs.token_remover", "TOKEN_RE")
- def test_find_token_valid_match(self, token_re, token_cls, is_valid_id, is_valid_timestamp):
- """The first match with a valid user ID and timestamp should be returned as a `Token`."""
+ @autospec(TokenRemover, "extract_user_id", "is_valid_timestamp", "is_maybe_valid_hmac")
+ @autospec("bot.exts.filters.token_remover", "Token")
+ @autospec("bot.exts.filters.token_remover", "TOKEN_RE")
+ def test_find_token_valid_match(
+ self,
+ token_re,
+ token_cls,
+ extract_user_id,
+ is_valid_timestamp,
+ is_maybe_valid_hmac,
+ ):
+ """The first match with a valid user ID, timestamp, and HMAC should be returned as a `Token`."""
matches = [
mock.create_autospec(Match, spec_set=True, instance=True),
mock.create_autospec(Match, spec_set=True, instance=True),
@@ -158,23 +196,32 @@ class TokenRemoverTests(unittest.IsolatedAsyncioTestCase):
token_re.finditer.return_value = matches
token_cls.side_effect = tokens
- is_valid_id.side_effect = (False, True) # The 1st match will be invalid, 2nd one valid.
+ extract_user_id.side_effect = (None, True) # The 1st match will be invalid, 2nd one valid.
is_valid_timestamp.return_value = True
+ is_maybe_valid_hmac.return_value = True
return_value = TokenRemover.find_token_in_message(self.msg)
self.assertEqual(tokens[1], return_value)
token_re.finditer.assert_called_once_with(self.msg.content)
- @autospec(TokenRemover, "is_valid_user_id", "is_valid_timestamp")
- @autospec("bot.cogs.token_remover", "Token")
- @autospec("bot.cogs.token_remover", "TOKEN_RE")
- def test_find_token_invalid_matches(self, token_re, token_cls, is_valid_id, is_valid_timestamp):
- """None should be returned if no matches have valid user IDs or timestamps."""
+ @autospec(TokenRemover, "extract_user_id", "is_valid_timestamp", "is_maybe_valid_hmac")
+ @autospec("bot.exts.filters.token_remover", "Token")
+ @autospec("bot.exts.filters.token_remover", "TOKEN_RE")
+ def test_find_token_invalid_matches(
+ self,
+ token_re,
+ token_cls,
+ extract_user_id,
+ is_valid_timestamp,
+ is_maybe_valid_hmac,
+ ):
+ """None should be returned if no matches have valid user IDs, HMACs, and timestamps."""
token_re.finditer.return_value = [mock.create_autospec(Match, spec_set=True, instance=True)]
token_cls.return_value = mock.create_autospec(Token, spec_set=True, instance=True)
- is_valid_id.return_value = False
+ extract_user_id.return_value = None
is_valid_timestamp.return_value = False
+ is_maybe_valid_hmac.return_value = False
return_value = TokenRemover.find_token_in_message(self.msg)
@@ -230,36 +277,85 @@ class TokenRemoverTests(unittest.IsolatedAsyncioTestCase):
results = [match[0] for match in results]
self.assertCountEqual((token_1, token_2), results)
- @autospec("bot.cogs.token_remover", "LOG_MESSAGE")
+ @autospec("bot.exts.filters.token_remover", "LOG_MESSAGE")
def test_format_log_message(self, log_message):
"""Should correctly format the log message with info from the message and token."""
- token = Token("NDY3MjIzMjMwNjUwNzc3NjQx", "XsySD_", "s45jqDV_Iisn-symw0yDRrk_jf4")
+ token = Token("NDcyMjY1OTQzMDYyNDEzMzMy", "XsySD_", "s45jqDV_Iisn-symw0yDRrk_jf4")
log_message.format.return_value = "Howdy"
return_value = TokenRemover.format_log_message(self.msg, token)
self.assertEqual(return_value, log_message.format.return_value)
log_message.format.assert_called_once_with(
- author=self.msg.author,
- author_id=self.msg.author.id,
+ author=format_user(self.msg.author),
channel=self.msg.channel.mention,
user_id=token.user_id,
timestamp=token.timestamp,
hmac="x" * len(token.hmac),
)
+ @autospec("bot.exts.filters.token_remover", "UNKNOWN_USER_LOG_MESSAGE")
+ def test_format_userid_log_message_unknown(self, unknown_user_log_message):
+ """Should correctly format the user ID portion when the actual user it belongs to is unknown."""
+ token = Token("NDcyMjY1OTQzMDYyNDEzMzMy", "XsySD_", "s45jqDV_Iisn-symw0yDRrk_jf4")
+ unknown_user_log_message.format.return_value = " Partner"
+ msg = MockMessage(id=555, content="hello world")
+ msg.guild.get_member.return_value = None
+
+ return_value = TokenRemover.format_userid_log_message(msg, token)
+
+ self.assertEqual(return_value, (unknown_user_log_message.format.return_value, False))
+ unknown_user_log_message.format.assert_called_once_with(user_id=472265943062413332)
+
+ @autospec("bot.exts.filters.token_remover", "KNOWN_USER_LOG_MESSAGE")
+ def test_format_userid_log_message_bot(self, known_user_log_message):
+ """Should correctly format the user ID portion when the ID belongs to a known bot."""
+ token = Token("NDcyMjY1OTQzMDYyNDEzMzMy", "XsySD_", "s45jqDV_Iisn-symw0yDRrk_jf4")
+ known_user_log_message.format.return_value = " Partner"
+ msg = MockMessage(id=555, content="hello world")
+ msg.guild.get_member.return_value.__str__.return_value = "Sam"
+ msg.guild.get_member.return_value.bot = True
+
+ return_value = TokenRemover.format_userid_log_message(msg, token)
+
+ self.assertEqual(return_value, (known_user_log_message.format.return_value, False))
+
+ known_user_log_message.format.assert_called_once_with(
+ user_id=472265943062413332,
+ user_name="Sam",
+ kind="BOT",
+ )
+
+ @autospec("bot.exts.filters.token_remover", "KNOWN_USER_LOG_MESSAGE")
+ def test_format_log_message_user_token_user(self, user_token_message):
+ """Should correctly format the user ID portion when the ID belongs to a known user."""
+ token = Token("NDY3MjIzMjMwNjUwNzc3NjQx", "XsySD_", "s45jqDV_Iisn-symw0yDRrk_jf4")
+ user_token_message.format.return_value = "Partner"
+
+ return_value = TokenRemover.format_userid_log_message(self.msg, token)
+
+ self.assertEqual(return_value, (user_token_message.format.return_value, True))
+ user_token_message.format.assert_called_once_with(
+ user_id=467223230650777641,
+ user_name="Woody",
+ kind="USER",
+ )
+
@mock.patch.object(TokenRemover, "mod_log", new_callable=mock.PropertyMock)
- @autospec("bot.cogs.token_remover", "log")
- @autospec(TokenRemover, "format_log_message")
- async def test_take_action(self, format_log_message, logger, mod_log_property):
+ @autospec("bot.exts.filters.token_remover", "log")
+ @autospec(TokenRemover, "format_log_message", "format_userid_log_message")
+ async def test_take_action(self, format_log_message, format_userid_log_message, logger, mod_log_property):
"""Should delete the message and send a mod log."""
cog = TokenRemover(self.bot)
mod_log = mock.create_autospec(ModLog, spec_set=True, instance=True)
token = mock.create_autospec(Token, spec_set=True, instance=True)
+ token.user_id = "no-id"
log_msg = "testing123"
+ userid_log_message = "userid-log-message"
mod_log_property.return_value = mod_log
format_log_message.return_value = log_msg
+ format_userid_log_message.return_value = (userid_log_message, True)
await cog.take_action(self.msg, token)
@@ -269,6 +365,7 @@ class TokenRemoverTests(unittest.IsolatedAsyncioTestCase):
)
format_log_message.assert_called_once_with(self.msg, token)
+ format_userid_log_message.assert_called_once_with(self.msg, token)
logger.debug.assert_called_with(log_msg)
self.bot.stats.incr.assert_called_once_with("tokens.removed_tokens")
@@ -277,9 +374,10 @@ class TokenRemoverTests(unittest.IsolatedAsyncioTestCase):
icon_url=constants.Icons.token_removed,
colour=Colour(constants.Colours.soft_red),
title="Token removed!",
- text=log_msg,
+ text=log_msg + "\n" + userid_log_message,
thumbnail=self.msg.author.avatar_url_as.return_value,
- channel_id=constants.Channels.mod_alerts
+ channel_id=constants.Channels.mod_alerts,
+ ping_everyone=True,
)
@mock.patch.object(TokenRemover, "mod_log", new_callable=mock.PropertyMock)
@@ -299,7 +397,7 @@ class TokenRemoverTests(unittest.IsolatedAsyncioTestCase):
class TokenRemoverExtensionTests(unittest.TestCase):
"""Tests for the token_remover extension."""
- @autospec("bot.cogs.token_remover", "TokenRemover")
+ @autospec("bot.exts.filters.token_remover", "TokenRemover")
def test_extension_setup(self, cog):
"""The TokenRemover cog should be added."""
bot = MockBot()
diff --git a/tests/bot/exts/info/__init__.py b/tests/bot/exts/info/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/tests/bot/exts/info/__init__.py
diff --git a/tests/bot/cogs/test_information.py b/tests/bot/exts/info/test_information.py
index 79c0e0ad3..daede54c5 100644
--- a/tests/bot/cogs/test_information.py
+++ b/tests/bot/exts/info/test_information.py
@@ -1,4 +1,3 @@
-import asyncio
import textwrap
import unittest
import unittest.mock
@@ -6,14 +5,14 @@ import unittest.mock
import discord
from bot import constants
-from bot.cogs import information
+from bot.exts.info import information
from bot.utils.checks import InWhitelistCheckFailure
from tests import helpers
-COG_PATH = "bot.cogs.information.Information"
+COG_PATH = "bot.exts.info.information.Information"
-class InformationCogTests(unittest.TestCase):
+class InformationCogTests(unittest.IsolatedAsyncioTestCase):
"""Tests the Information cog."""
@classmethod
@@ -29,16 +28,14 @@ class InformationCogTests(unittest.TestCase):
self.ctx = helpers.MockContext()
self.ctx.author.roles.append(self.moderator_role)
- def test_roles_command_command(self):
+ async def test_roles_command_command(self):
"""Test if the `role_info` command correctly returns the `moderator_role`."""
self.ctx.guild.roles.append(self.moderator_role)
self.cog.roles_info.can_run = unittest.mock.AsyncMock()
self.cog.roles_info.can_run.return_value = True
- coroutine = self.cog.roles_info.callback(self.cog, self.ctx)
-
- self.assertIsNone(asyncio.run(coroutine))
+ self.assertIsNone(await self.cog.roles_info(self.cog, self.ctx))
self.ctx.send.assert_called_once()
_, kwargs = self.ctx.send.call_args
@@ -48,7 +45,7 @@ class InformationCogTests(unittest.TestCase):
self.assertEqual(embed.colour, discord.Colour.blurple())
self.assertEqual(embed.description, f"\n`{self.moderator_role.id}` - {self.moderator_role.mention}\n")
- def test_role_info_command(self):
+ async def test_role_info_command(self):
"""Tests the `role info` command."""
dummy_role = helpers.MockRole(
name="Dummy",
@@ -73,9 +70,7 @@ class InformationCogTests(unittest.TestCase):
self.cog.role_info.can_run = unittest.mock.AsyncMock()
self.cog.role_info.can_run.return_value = True
- coroutine = self.cog.role_info.callback(self.cog, self.ctx, dummy_role, admin_role)
-
- self.assertIsNone(asyncio.run(coroutine))
+ self.assertIsNone(await self.cog.role_info(self.cog, self.ctx, dummy_role, admin_role))
self.assertEqual(self.ctx.send.call_count, 2)
@@ -97,80 +92,8 @@ class InformationCogTests(unittest.TestCase):
self.assertEqual(admin_embed.title, "Admins info")
self.assertEqual(admin_embed.colour, discord.Colour.red())
- @unittest.mock.patch('bot.cogs.information.time_since')
- def test_server_info_command(self, time_since_patch):
- time_since_patch.return_value = '2 days ago'
-
- self.ctx.guild = helpers.MockGuild(
- features=('lemons', 'apples'),
- region="The Moon",
- roles=[self.moderator_role],
- channels=[
- discord.TextChannel(
- state={},
- guild=self.ctx.guild,
- data={'id': 42, 'name': 'lemons-offering', 'position': 22, 'type': 'text'}
- ),
- discord.CategoryChannel(
- state={},
- guild=self.ctx.guild,
- data={'id': 5125, 'name': 'the-lemon-collection', 'position': 22, 'type': 'category'}
- ),
- discord.VoiceChannel(
- state={},
- guild=self.ctx.guild,
- data={'id': 15290, 'name': 'listen-to-lemon', 'position': 22, 'type': 'voice'}
- )
- ],
- members=[
- *(helpers.MockMember(status=discord.Status.online) for _ in range(2)),
- *(helpers.MockMember(status=discord.Status.idle) for _ in range(1)),
- *(helpers.MockMember(status=discord.Status.dnd) for _ in range(4)),
- *(helpers.MockMember(status=discord.Status.offline) for _ in range(3)),
- ],
- member_count=1_234,
- icon_url='a-lemon.jpg',
- )
-
- coroutine = self.cog.server_info.callback(self.cog, self.ctx)
- self.assertIsNone(asyncio.run(coroutine))
-
- time_since_patch.assert_called_once_with(self.ctx.guild.created_at, precision='days')
- _, kwargs = self.ctx.send.call_args
- embed = kwargs.pop('embed')
- self.assertEqual(embed.colour, discord.Colour.blurple())
- self.assertEqual(
- embed.description,
- textwrap.dedent(
- f"""
- **Server information**
- Created: {time_since_patch.return_value}
- Voice region: {self.ctx.guild.region}
- Features: {', '.join(self.ctx.guild.features)}
-
- **Channel counts**
- Category channels: 1
- Text channels: 1
- Voice channels: 1
- Staff channels: 0
-
- **Member counts**
- Members: {self.ctx.guild.member_count:,}
- Staff members: 0
- Roles: {len(self.ctx.guild.roles)}
-
- **Member statuses**
- {constants.Emojis.status_online} 2
- {constants.Emojis.status_idle} 1
- {constants.Emojis.status_dnd} 4
- {constants.Emojis.status_offline} 3
- """
- )
- )
- self.assertEqual(embed.thumbnail.url, 'a-lemon.jpg')
-
-class UserInfractionHelperMethodTests(unittest.TestCase):
+class UserInfractionHelperMethodTests(unittest.IsolatedAsyncioTestCase):
"""Tests for the helper methods of the `!user` command."""
def setUp(self):
@@ -180,7 +103,7 @@ class UserInfractionHelperMethodTests(unittest.TestCase):
self.cog = information.Information(self.bot)
self.member = helpers.MockMember(id=1234)
- def test_user_command_helper_method_get_requests(self):
+ async def test_user_command_helper_method_get_requests(self):
"""The helper methods should form the correct get requests."""
test_values = (
{
@@ -202,11 +125,11 @@ class UserInfractionHelperMethodTests(unittest.TestCase):
endpoint, params = test_value["expected_args"]
with self.subTest(method=helper_method, endpoint=endpoint, params=params):
- asyncio.run(helper_method(self.member))
+ await helper_method(self.member)
self.bot.api_client.get.assert_called_once_with(endpoint, params=params)
self.bot.api_client.get.reset_mock()
- def _method_subtests(self, method, test_values, default_header):
+ async def _method_subtests(self, method, test_values, default_header):
"""Helper method that runs the subtests for the different helper methods."""
for test_value in test_values:
api_response = test_value["api response"]
@@ -215,12 +138,12 @@ class UserInfractionHelperMethodTests(unittest.TestCase):
with self.subTest(method=method, api_response=api_response, expected_lines=expected_lines):
self.bot.api_client.get.return_value = api_response
- expected_output = "\n".join(default_header + expected_lines)
- actual_output = asyncio.run(method(self.member))
+ expected_output = "\n".join(expected_lines)
+ actual_output = await method(self.member)
- self.assertEqual(expected_output, actual_output)
+ self.assertEqual((default_header, expected_output), actual_output)
- def test_basic_user_infraction_counts_returns_correct_strings(self):
+ async def test_basic_user_infraction_counts_returns_correct_strings(self):
"""The method should correctly list both the total and active number of non-hidden infractions."""
test_values = (
# No infractions means zero counts
@@ -249,16 +172,16 @@ class UserInfractionHelperMethodTests(unittest.TestCase):
},
)
- header = ["**Infractions**"]
+ header = "Infractions"
- self._method_subtests(self.cog.basic_user_infraction_counts, test_values, header)
+ await self._method_subtests(self.cog.basic_user_infraction_counts, test_values, header)
- def test_expanded_user_infraction_counts_returns_correct_strings(self):
+ async def test_expanded_user_infraction_counts_returns_correct_strings(self):
"""The method should correctly list the total and active number of all infractions split by infraction type."""
test_values = (
{
"api response": [],
- "expected_lines": ["This user has never received an infraction."],
+ "expected_lines": ["No infractions"],
},
# Shows non-hidden inactive infraction as expected
{
@@ -304,24 +227,24 @@ class UserInfractionHelperMethodTests(unittest.TestCase):
},
)
- header = ["**Infractions**"]
+ header = "Infractions"
- self._method_subtests(self.cog.expanded_user_infraction_counts, test_values, header)
+ await self._method_subtests(self.cog.expanded_user_infraction_counts, test_values, header)
- def test_user_nomination_counts_returns_correct_strings(self):
+ async def test_user_nomination_counts_returns_correct_strings(self):
"""The method should list the number of active and historical nominations for the user."""
test_values = (
{
"api response": [],
- "expected_lines": ["This user has never been nominated."],
+ "expected_lines": ["No nominations"],
},
{
"api response": [{'active': True}],
- "expected_lines": ["This user is **currently** nominated (1 nomination in total)."],
+ "expected_lines": ["This user is **currently** nominated", "(1 nomination in total)"],
},
{
"api response": [{'active': True}, {'active': False}],
- "expected_lines": ["This user is **currently** nominated (2 nominations in total)."],
+ "expected_lines": ["This user is **currently** nominated", "(2 nominations in total)"],
},
{
"api response": [{'active': False}],
@@ -334,14 +257,14 @@ class UserInfractionHelperMethodTests(unittest.TestCase):
)
- header = ["**Nominations**"]
+ header = "Nominations"
- self._method_subtests(self.cog.user_nomination_counts, test_values, header)
+ await self._method_subtests(self.cog.user_nomination_counts, test_values, header)
[email protected]("bot.cogs.information.time_since", new=unittest.mock.MagicMock(return_value="1 year ago"))
[email protected]("bot.cogs.information.constants.MODERATION_CHANNELS", new=[50])
-class UserEmbedTests(unittest.TestCase):
[email protected]("bot.exts.info.information.time_since", new=unittest.mock.MagicMock(return_value="1 year ago"))
[email protected]("bot.exts.info.information.constants.MODERATION_CHANNELS", new=[50])
+class UserEmbedTests(unittest.IsolatedAsyncioTestCase):
"""Tests for the creation of the `!user` embed."""
def setUp(self):
@@ -350,32 +273,41 @@ class UserEmbedTests(unittest.TestCase):
self.bot.api_client.get = unittest.mock.AsyncMock()
self.cog = information.Information(self.bot)
- @unittest.mock.patch(f"{COG_PATH}.basic_user_infraction_counts", new=unittest.mock.AsyncMock(return_value=""))
- def test_create_user_embed_uses_string_representation_of_user_in_title_if_nick_is_not_available(self):
+ @unittest.mock.patch(
+ f"{COG_PATH}.basic_user_infraction_counts",
+ new=unittest.mock.AsyncMock(return_value=("Infractions", "basic infractions"))
+ )
+ async def test_create_user_embed_uses_string_representation_of_user_in_title_if_nick_is_not_available(self):
"""The embed should use the string representation of the user if they don't have a nick."""
ctx = helpers.MockContext(channel=helpers.MockTextChannel(id=1))
user = helpers.MockMember()
user.nick = None
user.__str__ = unittest.mock.Mock(return_value="Mr. Hemlock")
- embed = asyncio.run(self.cog.create_user_embed(ctx, user))
+ embed = await self.cog.create_user_embed(ctx, user)
self.assertEqual(embed.title, "Mr. Hemlock")
- @unittest.mock.patch(f"{COG_PATH}.basic_user_infraction_counts", new=unittest.mock.AsyncMock(return_value=""))
- def test_create_user_embed_uses_nick_in_title_if_available(self):
+ @unittest.mock.patch(
+ f"{COG_PATH}.basic_user_infraction_counts",
+ new=unittest.mock.AsyncMock(return_value=("Infractions", "basic infractions"))
+ )
+ async def test_create_user_embed_uses_nick_in_title_if_available(self):
"""The embed should use the nick if it's available."""
ctx = helpers.MockContext(channel=helpers.MockTextChannel(id=1))
user = helpers.MockMember()
user.nick = "Cat lover"
user.__str__ = unittest.mock.Mock(return_value="Mr. Hemlock")
- embed = asyncio.run(self.cog.create_user_embed(ctx, user))
+ embed = await self.cog.create_user_embed(ctx, user)
self.assertEqual(embed.title, "Cat lover (Mr. Hemlock)")
- @unittest.mock.patch(f"{COG_PATH}.basic_user_infraction_counts", new=unittest.mock.AsyncMock(return_value=""))
- def test_create_user_embed_ignores_everyone_role(self):
+ @unittest.mock.patch(
+ f"{COG_PATH}.basic_user_infraction_counts",
+ new=unittest.mock.AsyncMock(return_value=("Infractions", "basic infractions"))
+ )
+ async def test_create_user_embed_ignores_everyone_role(self):
"""Created `!user` embeds should not contain mention of the @everyone-role."""
ctx = helpers.MockContext(channel=helpers.MockTextChannel(id=1))
admins_role = helpers.MockRole(name='Admins')
@@ -384,80 +316,92 @@ class UserEmbedTests(unittest.TestCase):
# A `MockMember` has the @Everyone role by default; we add the Admins to that.
user = helpers.MockMember(roles=[admins_role], top_role=admins_role)
- embed = asyncio.run(self.cog.create_user_embed(ctx, user))
+ embed = await self.cog.create_user_embed(ctx, user)
- self.assertIn("&Admins", embed.description)
- self.assertNotIn("&Everyone", embed.description)
+ self.assertIn("&Admins", embed.fields[1].value)
+ self.assertNotIn("&Everyone", embed.fields[1].value)
@unittest.mock.patch(f"{COG_PATH}.expanded_user_infraction_counts", new_callable=unittest.mock.AsyncMock)
@unittest.mock.patch(f"{COG_PATH}.user_nomination_counts", new_callable=unittest.mock.AsyncMock)
- def test_create_user_embed_expanded_information_in_moderation_channels(self, nomination_counts, infraction_counts):
+ async def test_create_user_embed_expanded_information_in_moderation_channels(
+ self,
+ nomination_counts,
+ infraction_counts
+ ):
"""The embed should contain expanded infractions and nomination info in mod channels."""
ctx = helpers.MockContext(channel=helpers.MockTextChannel(id=50))
moderators_role = helpers.MockRole(name='Moderators')
moderators_role.colour = 100
- infraction_counts.return_value = "expanded infractions info"
- nomination_counts.return_value = "nomination info"
+ infraction_counts.return_value = ("Infractions", "expanded infractions info")
+ nomination_counts.return_value = ("Nominations", "nomination info")
user = helpers.MockMember(id=314, roles=[moderators_role], top_role=moderators_role)
- embed = asyncio.run(self.cog.create_user_embed(ctx, user))
+ embed = await self.cog.create_user_embed(ctx, user)
infraction_counts.assert_called_once_with(user)
nomination_counts.assert_called_once_with(user)
self.assertEqual(
textwrap.dedent(f"""
- **User Information**
Created: {"1 year ago"}
Profile: {user.mention}
ID: {user.id}
+ """).strip(),
+ embed.fields[0].value
+ )
- **Member Information**
+ self.assertEqual(
+ textwrap.dedent(f"""
Joined: {"1 year ago"}
Roles: &Moderators
-
- expanded infractions info
-
- nomination info
""").strip(),
- embed.description
+ embed.fields[1].value
)
@unittest.mock.patch(f"{COG_PATH}.basic_user_infraction_counts", new_callable=unittest.mock.AsyncMock)
- def test_create_user_embed_basic_information_outside_of_moderation_channels(self, infraction_counts):
+ async def test_create_user_embed_basic_information_outside_of_moderation_channels(self, infraction_counts):
"""The embed should contain only basic infraction data outside of mod channels."""
ctx = helpers.MockContext(channel=helpers.MockTextChannel(id=100))
moderators_role = helpers.MockRole(name='Moderators')
moderators_role.colour = 100
- infraction_counts.return_value = "basic infractions info"
+ infraction_counts.return_value = ("Infractions", "basic infractions info")
user = helpers.MockMember(id=314, roles=[moderators_role], top_role=moderators_role)
- embed = asyncio.run(self.cog.create_user_embed(ctx, user))
+ embed = await self.cog.create_user_embed(ctx, user)
infraction_counts.assert_called_once_with(user)
self.assertEqual(
textwrap.dedent(f"""
- **User Information**
Created: {"1 year ago"}
Profile: {user.mention}
ID: {user.id}
+ """).strip(),
+ embed.fields[0].value
+ )
- **Member Information**
+ self.assertEqual(
+ textwrap.dedent(f"""
Joined: {"1 year ago"}
Roles: &Moderators
-
- basic infractions info
""").strip(),
- embed.description
+ embed.fields[1].value
+ )
+
+ self.assertEqual(
+ "basic infractions info",
+ embed.fields[2].value
)
- @unittest.mock.patch(f"{COG_PATH}.basic_user_infraction_counts", new=unittest.mock.AsyncMock(return_value=""))
- def test_create_user_embed_uses_top_role_colour_when_user_has_roles(self):
+ @unittest.mock.patch(
+ f"{COG_PATH}.basic_user_infraction_counts",
+ new=unittest.mock.AsyncMock(return_value=("Infractions", "basic infractions"))
+ )
+ async def test_create_user_embed_uses_top_role_colour_when_user_has_roles(self):
"""The embed should be created with the colour of the top role, if a top role is available."""
ctx = helpers.MockContext()
@@ -465,35 +409,41 @@ class UserEmbedTests(unittest.TestCase):
moderators_role.colour = 100
user = helpers.MockMember(id=314, roles=[moderators_role], top_role=moderators_role)
- embed = asyncio.run(self.cog.create_user_embed(ctx, user))
+ embed = await self.cog.create_user_embed(ctx, user)
self.assertEqual(embed.colour, discord.Colour(moderators_role.colour))
- @unittest.mock.patch(f"{COG_PATH}.basic_user_infraction_counts", new=unittest.mock.AsyncMock(return_value=""))
- def test_create_user_embed_uses_blurple_colour_when_user_has_no_roles(self):
+ @unittest.mock.patch(
+ f"{COG_PATH}.basic_user_infraction_counts",
+ new=unittest.mock.AsyncMock(return_value=("Infractions", "basic infractions"))
+ )
+ async def test_create_user_embed_uses_blurple_colour_when_user_has_no_roles(self):
"""The embed should be created with a blurple colour if the user has no assigned roles."""
ctx = helpers.MockContext()
user = helpers.MockMember(id=217)
- embed = asyncio.run(self.cog.create_user_embed(ctx, user))
+ embed = await self.cog.create_user_embed(ctx, user)
self.assertEqual(embed.colour, discord.Colour.blurple())
- @unittest.mock.patch(f"{COG_PATH}.basic_user_infraction_counts", new=unittest.mock.AsyncMock(return_value=""))
- def test_create_user_embed_uses_png_format_of_user_avatar_as_thumbnail(self):
+ @unittest.mock.patch(
+ f"{COG_PATH}.basic_user_infraction_counts",
+ new=unittest.mock.AsyncMock(return_value=("Infractions", "basic infractions"))
+ )
+ async def test_create_user_embed_uses_png_format_of_user_avatar_as_thumbnail(self):
"""The embed thumbnail should be set to the user's avatar in `png` format."""
ctx = helpers.MockContext()
user = helpers.MockMember(id=217)
user.avatar_url_as.return_value = "avatar url"
- embed = asyncio.run(self.cog.create_user_embed(ctx, user))
+ embed = await self.cog.create_user_embed(ctx, user)
user.avatar_url_as.assert_called_once_with(static_format="png")
self.assertEqual(embed.thumbnail.url, "avatar url")
[email protected]("bot.cogs.information.constants")
-class UserCommandTests(unittest.TestCase):
[email protected]("bot.exts.info.information.constants")
+class UserCommandTests(unittest.IsolatedAsyncioTestCase):
"""Tests for the `!user` command."""
def setUp(self):
@@ -509,76 +459,70 @@ class UserCommandTests(unittest.TestCase):
self.moderator = helpers.MockMember(id=2, name="riffautae", roles=[self.moderator_role])
self.target = helpers.MockMember(id=3, name="__fluzz__")
- def test_regular_member_cannot_target_another_member(self, constants):
+ # There's no way to mock the channel constant without deferring imports. The constant is
+ # used as a default value for a parameter, which gets defined upon import.
+ self.bot_command_channel = helpers.MockTextChannel(id=constants.Channels.bot_commands)
+
+ async def test_regular_member_cannot_target_another_member(self, constants):
"""A regular user should not be able to use `!user` targeting another user."""
constants.MODERATION_ROLES = [self.moderator_role.id]
-
ctx = helpers.MockContext(author=self.author)
- asyncio.run(self.cog.user_info.callback(self.cog, ctx, self.target))
+ await self.cog.user_info(self.cog, ctx, self.target)
ctx.send.assert_called_once_with("You may not use this command on users other than yourself.")
- def test_regular_member_cannot_use_command_outside_of_bot_commands(self, constants):
+ async def test_regular_member_cannot_use_command_outside_of_bot_commands(self, constants):
"""A regular user should not be able to use this command outside of bot-commands."""
constants.MODERATION_ROLES = [self.moderator_role.id]
constants.STAFF_ROLES = [self.moderator_role.id]
- constants.Channels.bot_commands = 50
-
ctx = helpers.MockContext(author=self.author, channel=helpers.MockTextChannel(id=100))
msg = "Sorry, but you may only use this command within <#50>."
with self.assertRaises(InWhitelistCheckFailure, msg=msg):
- asyncio.run(self.cog.user_info.callback(self.cog, ctx))
+ await self.cog.user_info(self.cog, ctx)
- @unittest.mock.patch("bot.cogs.information.Information.create_user_embed", new_callable=unittest.mock.AsyncMock)
- def test_regular_user_may_use_command_in_bot_commands_channel(self, create_embed, constants):
+ @unittest.mock.patch("bot.exts.info.information.Information.create_user_embed")
+ async def test_regular_user_may_use_command_in_bot_commands_channel(self, create_embed, constants):
"""A regular user should be allowed to use `!user` targeting themselves in bot-commands."""
constants.STAFF_ROLES = [self.moderator_role.id]
- constants.Channels.bot_commands = 50
+ ctx = helpers.MockContext(author=self.author, channel=self.bot_command_channel)
- ctx = helpers.MockContext(author=self.author, channel=helpers.MockTextChannel(id=50))
-
- asyncio.run(self.cog.user_info.callback(self.cog, ctx))
+ await self.cog.user_info(self.cog, ctx)
create_embed.assert_called_once_with(ctx, self.author)
ctx.send.assert_called_once()
- @unittest.mock.patch("bot.cogs.information.Information.create_user_embed", new_callable=unittest.mock.AsyncMock)
- def test_regular_user_can_explicitly_target_themselves(self, create_embed, constants):
+ @unittest.mock.patch("bot.exts.info.information.Information.create_user_embed")
+ async def test_regular_user_can_explicitly_target_themselves(self, create_embed, _):
"""A user should target itself with `!user` when a `user` argument was not provided."""
constants.STAFF_ROLES = [self.moderator_role.id]
- constants.Channels.bot_commands = 50
-
- ctx = helpers.MockContext(author=self.author, channel=helpers.MockTextChannel(id=50))
+ ctx = helpers.MockContext(author=self.author, channel=self.bot_command_channel)
- asyncio.run(self.cog.user_info.callback(self.cog, ctx, self.author))
+ await self.cog.user_info(self.cog, ctx, self.author)
create_embed.assert_called_once_with(ctx, self.author)
ctx.send.assert_called_once()
- @unittest.mock.patch("bot.cogs.information.Information.create_user_embed", new_callable=unittest.mock.AsyncMock)
- def test_staff_members_can_bypass_channel_restriction(self, create_embed, constants):
+ @unittest.mock.patch("bot.exts.info.information.Information.create_user_embed")
+ async def test_staff_members_can_bypass_channel_restriction(self, create_embed, constants):
"""Staff members should be able to bypass the bot-commands channel restriction."""
constants.STAFF_ROLES = [self.moderator_role.id]
- constants.Channels.bot_commands = 50
-
ctx = helpers.MockContext(author=self.moderator, channel=helpers.MockTextChannel(id=200))
- asyncio.run(self.cog.user_info.callback(self.cog, ctx))
+ await self.cog.user_info(self.cog, ctx)
create_embed.assert_called_once_with(ctx, self.moderator)
ctx.send.assert_called_once()
- @unittest.mock.patch("bot.cogs.information.Information.create_user_embed", new_callable=unittest.mock.AsyncMock)
- def test_moderators_can_target_another_member(self, create_embed, constants):
+ @unittest.mock.patch("bot.exts.info.information.Information.create_user_embed")
+ async def test_moderators_can_target_another_member(self, create_embed, constants):
"""A moderator should be able to use `!user` targeting another user."""
constants.MODERATION_ROLES = [self.moderator_role.id]
constants.STAFF_ROLES = [self.moderator_role.id]
-
ctx = helpers.MockContext(author=self.moderator, channel=helpers.MockTextChannel(id=50))
- asyncio.run(self.cog.user_info.callback(self.cog, ctx, self.target))
+ await self.cog.user_info(self.cog, ctx, self.target)
create_embed.assert_called_once_with(ctx, self.target)
ctx.send.assert_called_once()
diff --git a/tests/bot/exts/moderation/__init__.py b/tests/bot/exts/moderation/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/tests/bot/exts/moderation/__init__.py
diff --git a/tests/bot/exts/moderation/infraction/__init__.py b/tests/bot/exts/moderation/infraction/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/tests/bot/exts/moderation/infraction/__init__.py
diff --git a/tests/bot/exts/moderation/infraction/test_infractions.py b/tests/bot/exts/moderation/infraction/test_infractions.py
new file mode 100644
index 000000000..bf557a484
--- /dev/null
+++ b/tests/bot/exts/moderation/infraction/test_infractions.py
@@ -0,0 +1,201 @@
+import textwrap
+import unittest
+from unittest.mock import AsyncMock, MagicMock, Mock, patch
+
+from bot.constants import Event
+from bot.exts.moderation.infraction.infractions import Infractions
+from tests.helpers import MockBot, MockContext, MockGuild, MockMember, MockRole
+
+
+class TruncationTests(unittest.IsolatedAsyncioTestCase):
+ """Tests for ban and kick command reason truncation."""
+
+ def setUp(self):
+ self.bot = MockBot()
+ self.cog = Infractions(self.bot)
+ self.user = MockMember(id=1234, top_role=MockRole(id=3577, position=10))
+ self.target = MockMember(id=1265, top_role=MockRole(id=9876, position=0))
+ self.guild = MockGuild(id=4567)
+ self.ctx = MockContext(bot=self.bot, author=self.user, guild=self.guild)
+
+ @patch("bot.exts.moderation.infraction._utils.get_active_infraction")
+ @patch("bot.exts.moderation.infraction._utils.post_infraction")
+ async def test_apply_ban_reason_truncation(self, post_infraction_mock, get_active_mock):
+ """Should truncate reason for `ctx.guild.ban`."""
+ get_active_mock.return_value = None
+ post_infraction_mock.return_value = {"foo": "bar"}
+
+ self.cog.apply_infraction = AsyncMock()
+ self.bot.get_cog.return_value = AsyncMock()
+ self.cog.mod_log.ignore = Mock()
+ self.ctx.guild.ban = Mock()
+
+ await self.cog.apply_ban(self.ctx, self.target, "foo bar" * 3000)
+ self.ctx.guild.ban.assert_called_once_with(
+ self.target,
+ reason=textwrap.shorten("foo bar" * 3000, 512, placeholder="..."),
+ delete_message_days=0
+ )
+ self.cog.apply_infraction.assert_awaited_once_with(
+ self.ctx, {"foo": "bar"}, self.target, self.ctx.guild.ban.return_value
+ )
+
+ @patch("bot.exts.moderation.infraction._utils.post_infraction")
+ async def test_apply_kick_reason_truncation(self, post_infraction_mock):
+ """Should truncate reason for `Member.kick`."""
+ post_infraction_mock.return_value = {"foo": "bar"}
+
+ self.cog.apply_infraction = AsyncMock()
+ self.cog.mod_log.ignore = Mock()
+ self.target.kick = Mock()
+
+ await self.cog.apply_kick(self.ctx, self.target, "foo bar" * 3000)
+ self.target.kick.assert_called_once_with(reason=textwrap.shorten("foo bar" * 3000, 512, placeholder="..."))
+ self.cog.apply_infraction.assert_awaited_once_with(
+ self.ctx, {"foo": "bar"}, self.target, self.target.kick.return_value
+ )
+
+
+@patch("bot.exts.moderation.infraction.infractions.constants.Roles.voice_verified", new=123456)
+class VoiceBanTests(unittest.IsolatedAsyncioTestCase):
+ """Tests for voice ban related functions and commands."""
+
+ def setUp(self):
+ self.bot = MockBot()
+ self.mod = MockMember(top_role=10)
+ self.user = MockMember(top_role=1, roles=[MockRole(id=123456)])
+ self.guild = MockGuild()
+ self.ctx = MockContext(bot=self.bot, author=self.mod)
+ self.cog = Infractions(self.bot)
+
+ async def test_permanent_voice_ban(self):
+ """Should call voice ban applying function without expiry."""
+ self.cog.apply_voice_ban = AsyncMock()
+ self.assertIsNone(await self.cog.voiceban(self.cog, self.ctx, self.user, reason="foobar"))
+ self.cog.apply_voice_ban.assert_awaited_once_with(self.ctx, self.user, "foobar")
+
+ async def test_temporary_voice_ban(self):
+ """Should call voice ban applying function with expiry."""
+ self.cog.apply_voice_ban = AsyncMock()
+ self.assertIsNone(await self.cog.tempvoiceban(self.cog, self.ctx, self.user, "baz", reason="foobar"))
+ self.cog.apply_voice_ban.assert_awaited_once_with(self.ctx, self.user, "foobar", expires_at="baz")
+
+ async def test_voice_unban(self):
+ """Should call infraction pardoning function."""
+ self.cog.pardon_infraction = AsyncMock()
+ self.assertIsNone(await self.cog.unvoiceban(self.cog, self.ctx, self.user))
+ self.cog.pardon_infraction.assert_awaited_once_with(self.ctx, "voice_ban", self.user)
+
+ @patch("bot.exts.moderation.infraction.infractions._utils.post_infraction")
+ @patch("bot.exts.moderation.infraction.infractions._utils.get_active_infraction")
+ async def test_voice_ban_user_have_active_infraction(self, get_active_infraction, post_infraction_mock):
+ """Should return early when user already have Voice Ban infraction."""
+ get_active_infraction.return_value = {"foo": "bar"}
+ self.assertIsNone(await self.cog.apply_voice_ban(self.ctx, self.user, "foobar"))
+ get_active_infraction.assert_awaited_once_with(self.ctx, self.user, "voice_ban")
+ post_infraction_mock.assert_not_awaited()
+
+ @patch("bot.exts.moderation.infraction.infractions._utils.post_infraction")
+ @patch("bot.exts.moderation.infraction.infractions._utils.get_active_infraction")
+ async def test_voice_ban_infraction_post_failed(self, get_active_infraction, post_infraction_mock):
+ """Should return early when posting infraction fails."""
+ self.cog.mod_log.ignore = MagicMock()
+ get_active_infraction.return_value = None
+ post_infraction_mock.return_value = None
+ self.assertIsNone(await self.cog.apply_voice_ban(self.ctx, self.user, "foobar"))
+ post_infraction_mock.assert_awaited_once()
+ self.cog.mod_log.ignore.assert_not_called()
+
+ @patch("bot.exts.moderation.infraction.infractions._utils.post_infraction")
+ @patch("bot.exts.moderation.infraction.infractions._utils.get_active_infraction")
+ async def test_voice_ban_infraction_post_add_kwargs(self, get_active_infraction, post_infraction_mock):
+ """Should pass all kwargs passed to apply_voice_ban to post_infraction."""
+ get_active_infraction.return_value = None
+ # We don't want that this continue yet
+ post_infraction_mock.return_value = None
+ self.assertIsNone(await self.cog.apply_voice_ban(self.ctx, self.user, "foobar", my_kwarg=23))
+ post_infraction_mock.assert_awaited_once_with(
+ self.ctx, self.user, "voice_ban", "foobar", active=True, my_kwarg=23
+ )
+
+ @patch("bot.exts.moderation.infraction.infractions._utils.post_infraction")
+ @patch("bot.exts.moderation.infraction.infractions._utils.get_active_infraction")
+ async def test_voice_ban_mod_log_ignore(self, get_active_infraction, post_infraction_mock):
+ """Should ignore Voice Verified role removing."""
+ self.cog.mod_log.ignore = MagicMock()
+ self.cog.apply_infraction = AsyncMock()
+ self.user.remove_roles = MagicMock(return_value="my_return_value")
+
+ get_active_infraction.return_value = None
+ post_infraction_mock.return_value = {"foo": "bar"}
+
+ self.assertIsNone(await self.cog.apply_voice_ban(self.ctx, self.user, "foobar"))
+ self.cog.mod_log.ignore.assert_called_once_with(Event.member_update, self.user.id)
+
+ @patch("bot.exts.moderation.infraction.infractions._utils.post_infraction")
+ @patch("bot.exts.moderation.infraction.infractions._utils.get_active_infraction")
+ async def test_voice_ban_apply_infraction(self, get_active_infraction, post_infraction_mock):
+ """Should ignore Voice Verified role removing."""
+ self.cog.mod_log.ignore = MagicMock()
+ self.cog.apply_infraction = AsyncMock()
+ self.user.remove_roles = MagicMock(return_value="my_return_value")
+
+ get_active_infraction.return_value = None
+ post_infraction_mock.return_value = {"foo": "bar"}
+
+ self.assertIsNone(await self.cog.apply_voice_ban(self.ctx, self.user, "foobar"))
+ self.user.remove_roles.assert_called_once_with(self.cog._voice_verified_role, reason="foobar")
+ self.cog.apply_infraction.assert_awaited_once_with(self.ctx, {"foo": "bar"}, self.user, "my_return_value")
+
+ @patch("bot.exts.moderation.infraction.infractions._utils.post_infraction")
+ @patch("bot.exts.moderation.infraction.infractions._utils.get_active_infraction")
+ async def test_voice_ban_truncate_reason(self, get_active_infraction, post_infraction_mock):
+ """Should truncate reason for voice ban."""
+ self.cog.mod_log.ignore = MagicMock()
+ self.cog.apply_infraction = AsyncMock()
+ self.user.remove_roles = MagicMock(return_value="my_return_value")
+
+ get_active_infraction.return_value = None
+ post_infraction_mock.return_value = {"foo": "bar"}
+
+ self.assertIsNone(await self.cog.apply_voice_ban(self.ctx, self.user, "foobar" * 3000))
+ self.user.remove_roles.assert_called_once_with(
+ self.cog._voice_verified_role, reason=textwrap.shorten("foobar" * 3000, 512, placeholder="...")
+ )
+ self.cog.apply_infraction.assert_awaited_once_with(self.ctx, {"foo": "bar"}, self.user, "my_return_value")
+
+ async def test_voice_unban_user_not_found(self):
+ """Should include info to return dict when user was not found from guild."""
+ self.guild.get_member.return_value = None
+ result = await self.cog.pardon_voice_ban(self.user.id, self.guild, "foobar")
+ self.assertEqual(result, {"Info": "User was not found in the guild."})
+
+ @patch("bot.exts.moderation.infraction.infractions._utils.notify_pardon")
+ @patch("bot.exts.moderation.infraction.infractions.format_user")
+ async def test_voice_unban_user_found(self, format_user_mock, notify_pardon_mock):
+ """Should add role back with ignoring, notify user and return log dictionary.."""
+ self.guild.get_member.return_value = self.user
+ notify_pardon_mock.return_value = True
+ format_user_mock.return_value = "my-user"
+
+ result = await self.cog.pardon_voice_ban(self.user.id, self.guild, "foobar")
+ self.assertEqual(result, {
+ "Member": "my-user",
+ "DM": "Sent"
+ })
+ notify_pardon_mock.assert_awaited_once()
+
+ @patch("bot.exts.moderation.infraction.infractions._utils.notify_pardon")
+ @patch("bot.exts.moderation.infraction.infractions.format_user")
+ async def test_voice_unban_dm_fail(self, format_user_mock, notify_pardon_mock):
+ """Should add role back with ignoring, notify user and return log dictionary.."""
+ self.guild.get_member.return_value = self.user
+ notify_pardon_mock.return_value = False
+ format_user_mock.return_value = "my-user"
+
+ result = await self.cog.pardon_voice_ban(self.user.id, self.guild, "foobar")
+ self.assertEqual(result, {
+ "Member": "my-user",
+ "DM": "**Failed**"
+ })
+ notify_pardon_mock.assert_awaited_once()
diff --git a/tests/bot/exts/moderation/infraction/test_utils.py b/tests/bot/exts/moderation/infraction/test_utils.py
new file mode 100644
index 000000000..5b62463e0
--- /dev/null
+++ b/tests/bot/exts/moderation/infraction/test_utils.py
@@ -0,0 +1,359 @@
+import unittest
+from collections import namedtuple
+from datetime import datetime
+from unittest.mock import AsyncMock, MagicMock, call, patch
+
+from discord import Embed, Forbidden, HTTPException, NotFound
+
+from bot.api import ResponseCodeError
+from bot.constants import Colours, Icons
+from bot.exts.moderation.infraction import _utils as utils
+from tests.helpers import MockBot, MockContext, MockMember, MockUser
+
+
+class ModerationUtilsTests(unittest.IsolatedAsyncioTestCase):
+ """Tests Moderation utils."""
+
+ def setUp(self):
+ self.bot = MockBot()
+ self.member = MockMember(id=1234)
+ self.user = MockUser(id=1234)
+ self.ctx = MockContext(bot=self.bot, author=self.member)
+
+ async def test_post_user(self):
+ """Should POST a new user and return the response if successful or otherwise send an error message."""
+ user = MockUser(discriminator=5678, id=1234, name="Test user")
+ not_user = MagicMock(discriminator=3333, id=5678, name="Wrong user")
+ test_cases = [
+ {
+ "user": user,
+ "post_result": "bar",
+ "raise_error": None,
+ "payload": {
+ "discriminator": 5678,
+ "id": self.user.id,
+ "in_guild": False,
+ "name": "Test user",
+ "roles": []
+ }
+ },
+ {
+ "user": self.member,
+ "post_result": "foo",
+ "raise_error": ResponseCodeError(MagicMock(status=400), "foo"),
+ "payload": {
+ "discriminator": 0,
+ "id": self.member.id,
+ "in_guild": False,
+ "name": "Name unknown",
+ "roles": []
+ }
+ },
+ {
+ "user": not_user,
+ "post_result": "bar",
+ "raise_error": None,
+ "payload": {
+ "discriminator": not_user.discriminator,
+ "id": not_user.id,
+ "in_guild": False,
+ "name": not_user.name,
+ "roles": []
+ }
+ }
+ ]
+
+ for case in test_cases:
+ user = case["user"]
+ post_result = case["post_result"]
+ raise_error = case["raise_error"]
+ payload = case["payload"]
+
+ with self.subTest(user=user, post_result=post_result, raise_error=raise_error, payload=payload):
+ self.bot.api_client.post.reset_mock(side_effect=True)
+ self.ctx.bot.api_client.post.return_value = post_result
+
+ self.ctx.bot.api_client.post.side_effect = raise_error
+
+ result = await utils.post_user(self.ctx, user)
+
+ if raise_error:
+ self.assertIsNone(result)
+ self.ctx.send.assert_awaited_once()
+ self.assertIn(str(raise_error.status), self.ctx.send.call_args[0][0])
+ else:
+ self.assertEqual(result, post_result)
+ self.bot.api_client.post.assert_awaited_once_with("bot/users", json=payload)
+
+ async def test_get_active_infraction(self):
+ """
+ Should request the API for active infractions and return infraction if the user has one or `None` otherwise.
+
+ A message should be sent to the context indicating a user already has an infraction, if that's the case.
+ """
+ test_case = namedtuple("test_case", ["get_return_value", "expected_output", "infraction_nr", "send_msg"])
+ test_cases = [
+ test_case([], None, None, True),
+ test_case([{"id": 123987}], {"id": 123987}, "123987", False),
+ test_case([{"id": 123987}], {"id": 123987}, "123987", True)
+ ]
+
+ for case in test_cases:
+ with self.subTest(return_value=case.get_return_value, expected=case.expected_output):
+ self.bot.api_client.get.reset_mock()
+ self.ctx.send.reset_mock()
+
+ params = {
+ "active": "true",
+ "type": "ban",
+ "user__id": str(self.member.id)
+ }
+
+ self.bot.api_client.get.return_value = case.get_return_value
+
+ result = await utils.get_active_infraction(self.ctx, self.member, "ban", send_msg=case.send_msg)
+ self.assertEqual(result, case.expected_output)
+ self.bot.api_client.get.assert_awaited_once_with("bot/infractions", params=params)
+
+ if case.send_msg and case.get_return_value:
+ self.ctx.send.assert_awaited_once()
+ sent_message = self.ctx.send.call_args[0][0]
+ self.assertIn(case.infraction_nr, sent_message)
+ self.assertIn("ban", sent_message)
+ else:
+ self.ctx.send.assert_not_awaited()
+
+ @patch("bot.exts.moderation.infraction._utils.send_private_embed")
+ async def test_notify_infraction(self, send_private_embed_mock):
+ """
+ Should send an embed of a certain format as a DM and return `True` if DM successful.
+
+ Appealable infractions should have the appeal message in the embed's footer.
+ """
+ test_cases = [
+ {
+ "args": (self.user, "ban", "2020-02-26 09:20 (23 hours and 59 minutes)"),
+ "expected_output": Embed(
+ title=utils.INFRACTION_TITLE,
+ description=utils.INFRACTION_DESCRIPTION_TEMPLATE.format(
+ type="Ban",
+ expires="2020-02-26 09:20 (23 hours and 59 minutes)",
+ reason="No reason provided."
+ ),
+ colour=Colours.soft_red,
+ url=utils.RULES_URL
+ ).set_author(
+ name=utils.INFRACTION_AUTHOR_NAME,
+ url=utils.RULES_URL,
+ icon_url=Icons.token_removed
+ ).set_footer(text=utils.INFRACTION_APPEAL_FOOTER),
+ "send_result": True
+ },
+ {
+ "args": (self.user, "warning", None, "Test reason."),
+ "expected_output": Embed(
+ title=utils.INFRACTION_TITLE,
+ description=utils.INFRACTION_DESCRIPTION_TEMPLATE.format(
+ type="Warning",
+ expires="N/A",
+ reason="Test reason."
+ ),
+ colour=Colours.soft_red,
+ url=utils.RULES_URL
+ ).set_author(
+ name=utils.INFRACTION_AUTHOR_NAME,
+ url=utils.RULES_URL,
+ icon_url=Icons.token_removed
+ ),
+ "send_result": False
+ },
+ {
+ "args": (self.user, "note", None, None, Icons.defcon_denied),
+ "expected_output": Embed(
+ title=utils.INFRACTION_TITLE,
+ description=utils.INFRACTION_DESCRIPTION_TEMPLATE.format(
+ type="Note",
+ expires="N/A",
+ reason="No reason provided."
+ ),
+ colour=Colours.soft_red,
+ url=utils.RULES_URL
+ ).set_author(
+ name=utils.INFRACTION_AUTHOR_NAME,
+ url=utils.RULES_URL,
+ icon_url=Icons.defcon_denied
+ ),
+ "send_result": False
+ },
+ {
+ "args": (self.user, "mute", "2020-02-26 09:20 (23 hours and 59 minutes)", "Test", Icons.defcon_denied),
+ "expected_output": Embed(
+ title=utils.INFRACTION_TITLE,
+ description=utils.INFRACTION_DESCRIPTION_TEMPLATE.format(
+ type="Mute",
+ expires="2020-02-26 09:20 (23 hours and 59 minutes)",
+ reason="Test"
+ ),
+ colour=Colours.soft_red,
+ url=utils.RULES_URL
+ ).set_author(
+ name=utils.INFRACTION_AUTHOR_NAME,
+ url=utils.RULES_URL,
+ icon_url=Icons.defcon_denied
+ ).set_footer(text=utils.INFRACTION_APPEAL_FOOTER),
+ "send_result": False
+ },
+ {
+ "args": (self.user, "mute", None, "foo bar" * 4000, Icons.defcon_denied),
+ "expected_output": Embed(
+ title=utils.INFRACTION_TITLE,
+ description=utils.INFRACTION_DESCRIPTION_TEMPLATE.format(
+ type="Mute",
+ expires="N/A",
+ reason="foo bar" * 4000
+ )[:2045] + "...",
+ colour=Colours.soft_red,
+ url=utils.RULES_URL
+ ).set_author(
+ name=utils.INFRACTION_AUTHOR_NAME,
+ url=utils.RULES_URL,
+ icon_url=Icons.defcon_denied
+ ).set_footer(text=utils.INFRACTION_APPEAL_FOOTER),
+ "send_result": True
+ }
+ ]
+
+ for case in test_cases:
+ with self.subTest(args=case["args"], expected=case["expected_output"], send=case["send_result"]):
+ send_private_embed_mock.reset_mock()
+
+ send_private_embed_mock.return_value = case["send_result"]
+ result = await utils.notify_infraction(*case["args"])
+
+ self.assertEqual(case["send_result"], result)
+
+ embed = send_private_embed_mock.call_args[0][1]
+
+ self.assertEqual(embed.to_dict(), case["expected_output"].to_dict())
+
+ send_private_embed_mock.assert_awaited_once_with(case["args"][0], embed)
+
+ @patch("bot.exts.moderation.infraction._utils.send_private_embed")
+ async def test_notify_pardon(self, send_private_embed_mock):
+ """Should send an embed of a certain format as a DM and return `True` if DM successful."""
+ test_case = namedtuple("test_case", ["args", "icon", "send_result"])
+ test_cases = [
+ test_case((self.user, "Test title", "Example content"), Icons.user_verified, True),
+ test_case((self.user, "Test title", "Example content", Icons.user_update), Icons.user_update, False)
+ ]
+
+ for case in test_cases:
+ expected = Embed(
+ description="Example content",
+ colour=Colours.soft_green
+ ).set_author(
+ name="Test title",
+ icon_url=case.icon
+ )
+
+ with self.subTest(args=case.args, expected=expected):
+ send_private_embed_mock.reset_mock()
+
+ send_private_embed_mock.return_value = case.send_result
+
+ result = await utils.notify_pardon(*case.args)
+ self.assertEqual(case.send_result, result)
+
+ embed = send_private_embed_mock.call_args[0][1]
+ self.assertEqual(embed.to_dict(), expected.to_dict())
+
+ send_private_embed_mock.assert_awaited_once_with(case.args[0], embed)
+
+ async def test_send_private_embed(self):
+ """Should DM the user and return `True` on success or `False` on failure."""
+ embed = Embed(title="Test", description="Test val")
+
+ test_case = namedtuple("test_case", ["expected_output", "raised_exception"])
+ test_cases = [
+ test_case(True, None),
+ test_case(False, HTTPException(AsyncMock(), AsyncMock())),
+ test_case(False, Forbidden(AsyncMock(), AsyncMock())),
+ test_case(False, NotFound(AsyncMock(), AsyncMock()))
+ ]
+
+ for case in test_cases:
+ with self.subTest(expected=case.expected_output, raised=case.raised_exception):
+ self.user.send.reset_mock(side_effect=True)
+ self.user.send.side_effect = case.raised_exception
+
+ result = await utils.send_private_embed(self.user, embed)
+
+ self.assertEqual(result, case.expected_output)
+ self.user.send.assert_awaited_once_with(embed=embed)
+
+
+class TestPostInfraction(unittest.IsolatedAsyncioTestCase):
+ """Tests for the `post_infraction` function."""
+
+ def setUp(self):
+ self.bot = MockBot()
+ self.member = MockMember(id=1234)
+ self.user = MockUser(id=1234)
+ self.ctx = MockContext(bot=self.bot, author=self.member)
+
+ async def test_normal_post_infraction(self):
+ """Should return response from POST request if there are no errors."""
+ now = datetime.now()
+ payload = {
+ "actor": self.ctx.author.id,
+ "hidden": True,
+ "reason": "Test reason",
+ "type": "ban",
+ "user": self.member.id,
+ "active": False,
+ "expires_at": now.isoformat()
+ }
+
+ self.ctx.bot.api_client.post.return_value = "foo"
+ actual = await utils.post_infraction(self.ctx, self.member, "ban", "Test reason", now, True, False)
+
+ self.assertEqual(actual, "foo")
+ self.ctx.bot.api_client.post.assert_awaited_once_with("bot/infractions", json=payload)
+
+ async def test_unknown_error_post_infraction(self):
+ """Should send an error message to chat when a non-400 error occurs."""
+ self.ctx.bot.api_client.post.side_effect = ResponseCodeError(AsyncMock(), AsyncMock())
+ self.ctx.bot.api_client.post.side_effect.status = 500
+
+ actual = await utils.post_infraction(self.ctx, self.user, "ban", "Test reason")
+ self.assertIsNone(actual)
+
+ self.assertTrue("500" in self.ctx.send.call_args[0][0])
+
+ @patch("bot.exts.moderation.infraction._utils.post_user", return_value=None)
+ async def test_user_not_found_none_post_infraction(self, post_user_mock):
+ """Should abort and return `None` when a new user fails to be posted."""
+ self.bot.api_client.post.side_effect = ResponseCodeError(MagicMock(status=400), {"user": "foo"})
+
+ actual = await utils.post_infraction(self.ctx, self.user, "mute", "Test reason")
+ self.assertIsNone(actual)
+ post_user_mock.assert_awaited_once_with(self.ctx, self.user)
+
+ @patch("bot.exts.moderation.infraction._utils.post_user", return_value="bar")
+ async def test_first_fail_second_success_user_post_infraction(self, post_user_mock):
+ """Should post the user if they don't exist, POST infraction again, and return the response if successful."""
+ payload = {
+ "actor": self.ctx.author.id,
+ "hidden": False,
+ "reason": "Test reason",
+ "type": "mute",
+ "user": self.user.id,
+ "active": True
+ }
+
+ self.bot.api_client.post.side_effect = [ResponseCodeError(MagicMock(status=400), {"user": "foo"}), "foo"]
+
+ actual = await utils.post_infraction(self.ctx, self.user, "mute", "Test reason")
+ self.assertEqual(actual, "foo")
+ self.bot.api_client.post.assert_has_awaits([call("bot/infractions", json=payload)] * 2)
+ post_user_mock.assert_awaited_once_with(self.ctx, self.user)
diff --git a/tests/bot/exts/moderation/test_incidents.py b/tests/bot/exts/moderation/test_incidents.py
new file mode 100644
index 000000000..cbf7f7bcf
--- /dev/null
+++ b/tests/bot/exts/moderation/test_incidents.py
@@ -0,0 +1,770 @@
+import asyncio
+import enum
+import logging
+import typing as t
+import unittest
+from unittest.mock import AsyncMock, MagicMock, call, patch
+
+import aiohttp
+import discord
+
+from bot.constants import Colours
+from bot.exts.moderation import incidents
+from tests.helpers import (
+ MockAsyncWebhook,
+ MockAttachment,
+ MockBot,
+ MockMember,
+ MockMessage,
+ MockReaction,
+ MockRole,
+ MockTextChannel,
+ MockUser,
+)
+
+
+class MockAsyncIterable:
+ """
+ Helper for mocking asynchronous for loops.
+
+ It does not appear that the `unittest` library currently provides anything that would
+ allow us to simply mock an async iterator, such as `discord.TextChannel.history`.
+
+ We therefore write our own helper to wrap a regular synchronous iterable, and feed
+ its values via `__anext__` rather than `__next__`.
+
+ This class was written for the purposes of testing the `Incidents` cog - it may not
+ be generic enough to be placed in the `tests.helpers` module.
+ """
+
+ def __init__(self, messages: t.Iterable):
+ """Take a sync iterable to be wrapped."""
+ self.iter_messages = iter(messages)
+
+ def __aiter__(self):
+ """Return `self` as we provide the `__anext__` method."""
+ return self
+
+ async def __anext__(self):
+ """
+ Feed the next item, or raise `StopAsyncIteration`.
+
+ Since we're wrapping a sync iterator, it will communicate that it has been depleted
+ by raising a `StopIteration`. The `async for` construct does not expect it, and we
+ therefore need to substitute it for the appropriate exception type.
+ """
+ try:
+ return next(self.iter_messages)
+ except StopIteration:
+ raise StopAsyncIteration
+
+
+class MockSignal(enum.Enum):
+ A = "A"
+ B = "B"
+
+
+mock_404 = discord.NotFound(
+ response=MagicMock(aiohttp.ClientResponse), # Mock the erroneous response
+ message="Not found",
+)
+
+
+class TestDownloadFile(unittest.IsolatedAsyncioTestCase):
+ """Collection of tests for the `download_file` helper function."""
+
+ async def test_download_file_success(self):
+ """If `to_file` succeeds, function returns the acquired `discord.File`."""
+ file = MagicMock(discord.File, filename="bigbadlemon.jpg")
+ attachment = MockAttachment(to_file=AsyncMock(return_value=file))
+
+ acquired_file = await incidents.download_file(attachment)
+ self.assertIs(file, acquired_file)
+
+ async def test_download_file_404(self):
+ """If `to_file` encounters a 404, function handles the exception & returns None."""
+ attachment = MockAttachment(to_file=AsyncMock(side_effect=mock_404))
+
+ acquired_file = await incidents.download_file(attachment)
+ self.assertIsNone(acquired_file)
+
+ async def test_download_file_fail(self):
+ """If `to_file` fails on a non-404 error, function logs the exception & returns None."""
+ arbitrary_error = discord.HTTPException(MagicMock(aiohttp.ClientResponse), "Arbitrary API error")
+ attachment = MockAttachment(to_file=AsyncMock(side_effect=arbitrary_error))
+
+ with self.assertLogs(logger=incidents.log, level=logging.ERROR):
+ acquired_file = await incidents.download_file(attachment)
+
+ self.assertIsNone(acquired_file)
+
+
+class TestMakeEmbed(unittest.IsolatedAsyncioTestCase):
+ """Collection of tests for the `make_embed` helper function."""
+
+ async def test_make_embed_actioned(self):
+ """Embed is coloured green and footer contains 'Actioned' when `outcome=Signal.ACTIONED`."""
+ embed, file = await incidents.make_embed(MockMessage(), incidents.Signal.ACTIONED, MockMember())
+
+ self.assertEqual(embed.colour.value, Colours.soft_green)
+ self.assertIn("Actioned", embed.footer.text)
+
+ async def test_make_embed_not_actioned(self):
+ """Embed is coloured red and footer contains 'Rejected' when `outcome=Signal.NOT_ACTIONED`."""
+ embed, file = await incidents.make_embed(MockMessage(), incidents.Signal.NOT_ACTIONED, MockMember())
+
+ self.assertEqual(embed.colour.value, Colours.soft_red)
+ self.assertIn("Rejected", embed.footer.text)
+
+ async def test_make_embed_content(self):
+ """Incident content appears as embed description."""
+ incident = MockMessage(content="this is an incident")
+ embed, file = await incidents.make_embed(incident, incidents.Signal.ACTIONED, MockMember())
+
+ self.assertEqual(incident.content, embed.description)
+
+ async def test_make_embed_with_attachment_succeeds(self):
+ """Incident's attachment is downloaded and displayed in the embed's image field."""
+ file = MagicMock(discord.File, filename="bigbadjoe.jpg")
+ attachment = MockAttachment(filename="bigbadjoe.jpg")
+ incident = MockMessage(content="this is an incident", attachments=[attachment])
+
+ # Patch `download_file` to return our `file`
+ with patch("bot.exts.moderation.incidents.download_file", AsyncMock(return_value=file)):
+ embed, returned_file = await incidents.make_embed(incident, incidents.Signal.ACTIONED, MockMember())
+
+ self.assertIs(file, returned_file)
+ self.assertEqual("attachment://bigbadjoe.jpg", embed.image.url)
+
+ async def test_make_embed_with_attachment_fails(self):
+ """Incident's attachment fails to download, proxy url is linked instead."""
+ attachment = MockAttachment(proxy_url="discord.com/bigbadjoe.jpg")
+ incident = MockMessage(content="this is an incident", attachments=[attachment])
+
+ # Patch `download_file` to return None as if the download failed
+ with patch("bot.exts.moderation.incidents.download_file", AsyncMock(return_value=None)):
+ embed, returned_file = await incidents.make_embed(incident, incidents.Signal.ACTIONED, MockMember())
+
+ self.assertIsNone(returned_file)
+
+ # The author name field is simply expected to have something in it, we do not assert the message
+ self.assertGreater(len(embed.author.name), 0)
+ self.assertEqual(embed.author.url, "discord.com/bigbadjoe.jpg") # However, it should link the exact url
+
+
+@patch("bot.constants.Channels.incidents", 123)
+class TestIsIncident(unittest.TestCase):
+ """
+ Collection of tests for the `is_incident` helper function.
+
+ In `setUp`, we will create a mock message which should qualify as an incident. Each
+ test case will then mutate this instance to make it **not** qualify, in various ways.
+
+ Notice that we patch the #incidents channel id globally for this class.
+ """
+
+ def setUp(self) -> None:
+ """Prepare a mock message which should qualify as an incident."""
+ self.incident = MockMessage(
+ channel=MockTextChannel(id=123),
+ content="this is an incident",
+ author=MockUser(bot=False),
+ pinned=False,
+ )
+
+ def test_is_incident_true(self):
+ """Message qualifies as an incident if unchanged."""
+ self.assertTrue(incidents.is_incident(self.incident))
+
+ def check_false(self):
+ """Assert that `self.incident` does **not** qualify as an incident."""
+ self.assertFalse(incidents.is_incident(self.incident))
+
+ def test_is_incident_false_channel(self):
+ """Message doesn't qualify if sent outside of #incidents."""
+ self.incident.channel = MockTextChannel(id=456)
+ self.check_false()
+
+ def test_is_incident_false_content(self):
+ """Message doesn't qualify if content begins with hash symbol."""
+ self.incident.content = "# this is a comment message"
+ self.check_false()
+
+ def test_is_incident_false_author(self):
+ """Message doesn't qualify if author is a bot."""
+ self.incident.author = MockUser(bot=True)
+ self.check_false()
+
+ def test_is_incident_false_pinned(self):
+ """Message doesn't qualify if it is pinned."""
+ self.incident.pinned = True
+ self.check_false()
+
+
+class TestOwnReactions(unittest.TestCase):
+ """Assertions for the `own_reactions` function."""
+
+ def test_own_reactions(self):
+ """Only bot's own emoji are extracted from the input incident."""
+ reactions = (
+ MockReaction(emoji="A", me=True),
+ MockReaction(emoji="B", me=True),
+ MockReaction(emoji="C", me=False),
+ )
+ message = MockMessage(reactions=reactions)
+ self.assertSetEqual(incidents.own_reactions(message), {"A", "B"})
+
+
+@patch("bot.exts.moderation.incidents.ALL_SIGNALS", {"A", "B"})
+class TestHasSignals(unittest.TestCase):
+ """
+ Assertions for the `has_signals` function.
+
+ We patch `ALL_SIGNALS` globally. Each test function then patches `own_reactions`
+ as appropriate.
+ """
+
+ def test_has_signals_true(self):
+ """True when `own_reactions` returns all emoji in `ALL_SIGNALS`."""
+ message = MockMessage()
+ own_reactions = MagicMock(return_value={"A", "B"})
+
+ with patch("bot.exts.moderation.incidents.own_reactions", own_reactions):
+ self.assertTrue(incidents.has_signals(message))
+
+ def test_has_signals_false(self):
+ """False when `own_reactions` does not return all emoji in `ALL_SIGNALS`."""
+ message = MockMessage()
+ own_reactions = MagicMock(return_value={"A", "C"})
+
+ with patch("bot.exts.moderation.incidents.own_reactions", own_reactions):
+ self.assertFalse(incidents.has_signals(message))
+
+
+@patch("bot.exts.moderation.incidents.Signal", MockSignal)
+class TestAddSignals(unittest.IsolatedAsyncioTestCase):
+ """
+ Assertions for the `add_signals` coroutine.
+
+ These are all fairly similar and could go into a single test function, but I found the
+ patching & sub-testing fairly awkward in that case and decided to split them up
+ to avoid unnecessary syntax noise.
+ """
+
+ def setUp(self):
+ """Prepare a mock incident message for tests to use."""
+ self.incident = MockMessage()
+
+ @patch("bot.exts.moderation.incidents.own_reactions", MagicMock(return_value=set()))
+ async def test_add_signals_missing(self):
+ """All emoji are added when none are present."""
+ await incidents.add_signals(self.incident)
+ self.incident.add_reaction.assert_has_calls([call("A"), call("B")])
+
+ @patch("bot.exts.moderation.incidents.own_reactions", MagicMock(return_value={"A"}))
+ async def test_add_signals_partial(self):
+ """Only missing emoji are added when some are present."""
+ await incidents.add_signals(self.incident)
+ self.incident.add_reaction.assert_has_calls([call("B")])
+
+ @patch("bot.exts.moderation.incidents.own_reactions", MagicMock(return_value={"A", "B"}))
+ async def test_add_signals_present(self):
+ """No emoji are added when all are present."""
+ await incidents.add_signals(self.incident)
+ self.incident.add_reaction.assert_not_called()
+
+
+class TestIncidents(unittest.IsolatedAsyncioTestCase):
+ """
+ Tests for bound methods of the `Incidents` cog.
+
+ Use this as a base class for `Incidents` tests - it will prepare a fresh instance
+ for each test function, but not make any assertions on its own. Tests can mutate
+ the instance as they wish.
+ """
+
+ def setUp(self):
+ """
+ Prepare a fresh `Incidents` instance for each test.
+
+ Note that this will not schedule `crawl_incidents` in the background, as everything
+ is being mocked. The `crawl_task` attribute will end up being None.
+ """
+ self.cog_instance = incidents.Incidents(MockBot())
+
+
+@patch("asyncio.sleep", AsyncMock()) # Prevent the coro from sleeping to speed up the test
+class TestCrawlIncidents(TestIncidents):
+ """
+ Tests for the `Incidents.crawl_incidents` coroutine.
+
+ Apart from `test_crawl_incidents_waits_until_cache_ready`, all tests in this class
+ will patch the return values of `is_incident` and `has_signal` and then observe
+ whether the `AsyncMock` for `add_signals` was awaited or not.
+
+ The `add_signals` mock is added by each test separately to ensure it is clean (has not
+ been awaited by another test yet). The mock can be reset, but this appears to be the
+ cleaner way.
+
+ For each test, we inject a mock channel with a history of 1 message only (see: `setUp`).
+ """
+
+ def setUp(self):
+ """For each test, ensure `bot.get_channel` returns a channel with 1 arbitrary message."""
+ super().setUp() # First ensure we get `cog_instance` from parent
+
+ incidents_history = MagicMock(return_value=MockAsyncIterable([MockMessage()]))
+ self.cog_instance.bot.get_channel = MagicMock(return_value=MockTextChannel(history=incidents_history))
+
+ async def test_crawl_incidents_waits_until_cache_ready(self):
+ """
+ The coroutine will await the `wait_until_guild_available` event.
+
+ Since this task is schedule in the `__init__`, it is critical that it waits for the
+ cache to be ready, so that it can safely get the #incidents channel.
+ """
+ await self.cog_instance.crawl_incidents()
+ self.cog_instance.bot.wait_until_guild_available.assert_awaited()
+
+ @patch("bot.exts.moderation.incidents.add_signals", AsyncMock())
+ @patch("bot.exts.moderation.incidents.is_incident", MagicMock(return_value=False)) # Message doesn't qualify
+ @patch("bot.exts.moderation.incidents.has_signals", MagicMock(return_value=False))
+ async def test_crawl_incidents_noop_if_is_not_incident(self):
+ """Signals are not added for a non-incident message."""
+ await self.cog_instance.crawl_incidents()
+ incidents.add_signals.assert_not_awaited()
+
+ @patch("bot.exts.moderation.incidents.add_signals", AsyncMock())
+ @patch("bot.exts.moderation.incidents.is_incident", MagicMock(return_value=True)) # Message qualifies
+ @patch("bot.exts.moderation.incidents.has_signals", MagicMock(return_value=True)) # But already has signals
+ async def test_crawl_incidents_noop_if_message_already_has_signals(self):
+ """Signals are not added for messages which already have them."""
+ await self.cog_instance.crawl_incidents()
+ incidents.add_signals.assert_not_awaited()
+
+ @patch("bot.exts.moderation.incidents.add_signals", AsyncMock())
+ @patch("bot.exts.moderation.incidents.is_incident", MagicMock(return_value=True)) # Message qualifies
+ @patch("bot.exts.moderation.incidents.has_signals", MagicMock(return_value=False)) # And doesn't have signals
+ async def test_crawl_incidents_add_signals_called(self):
+ """Message has signals added as it does not have them yet and qualifies as an incident."""
+ await self.cog_instance.crawl_incidents()
+ incidents.add_signals.assert_awaited_once()
+
+
+class TestArchive(TestIncidents):
+ """Tests for the `Incidents.archive` coroutine."""
+
+ async def test_archive_webhook_not_found(self):
+ """
+ Method recovers and returns False when the webhook is not found.
+
+ Implicitly, this also tests that the error is handled internally and doesn't
+ propagate out of the method, which is just as important.
+ """
+ self.cog_instance.bot.fetch_webhook = AsyncMock(side_effect=mock_404)
+ self.assertFalse(
+ await self.cog_instance.archive(incident=MockMessage(), outcome=MagicMock(), actioned_by=MockMember())
+ )
+
+ async def test_archive_relays_incident(self):
+ """
+ If webhook is found, method relays `incident` properly.
+
+ This test will assert that the fetched webhook's `send` method is fed the correct arguments,
+ and that the `archive` method returns True.
+ """
+ webhook = MockAsyncWebhook()
+ self.cog_instance.bot.fetch_webhook = AsyncMock(return_value=webhook) # Patch in our webhook
+
+ # Define our own `incident` to be archived
+ incident = MockMessage(
+ content="this is an incident",
+ author=MockUser(name="author_name", avatar_url="author_avatar"),
+ id=123,
+ )
+ built_embed = MagicMock(discord.Embed, id=123) # We patch `make_embed` to return this
+
+ with patch("bot.exts.moderation.incidents.make_embed", AsyncMock(return_value=(built_embed, None))):
+ archive_return = await self.cog_instance.archive(incident, MagicMock(value="A"), MockMember())
+
+ # Now we check that the webhook was given the correct args, and that `archive` returned True
+ webhook.send.assert_called_once_with(
+ embed=built_embed,
+ username="author_name",
+ avatar_url="author_avatar",
+ file=None,
+ )
+ self.assertTrue(archive_return)
+
+ async def test_archive_clyde_username(self):
+ """
+ The archive webhook username is cleansed using `sub_clyde`.
+
+ Discord will reject any webhook with "clyde" in the username field, as it impersonates
+ the official Clyde bot. Since we do not control what the username will be (the incident
+ author name is used), we must ensure the name is cleansed, otherwise the relay may fail.
+
+ This test assumes the username is passed as a kwarg. If this test fails, please review
+ whether the passed argument is being retrieved correctly.
+ """
+ webhook = MockAsyncWebhook()
+ self.cog_instance.bot.fetch_webhook = AsyncMock(return_value=webhook)
+
+ message_from_clyde = MockMessage(author=MockUser(name="clyde the great"))
+ await self.cog_instance.archive(message_from_clyde, MagicMock(incidents.Signal), MockMember())
+
+ self.assertNotIn("clyde", webhook.send.call_args.kwargs["username"])
+
+
+class TestMakeConfirmationTask(TestIncidents):
+ """
+ Tests for the `Incidents.make_confirmation_task` method.
+
+ Writing tests for this method is difficult, as it mostly just delegates the provided
+ information elsewhere. There is very little internal logic. Whether our approach
+ works conceptually is difficult to prove using unit tests.
+ """
+
+ def test_make_confirmation_task_check(self):
+ """
+ The internal check will recognize the passed incident.
+
+ This is a little tricky - we first pass a message with a specific `id` in, and then
+ retrieve the built check from the `call_args` of the `wait_for` method. This relies
+ on the check being passed as a kwarg.
+
+ Once the check is retrieved, we assert that it gives True for our incident's `id`,
+ and False for any other.
+
+ If this function begins to fail, first check that `created_check` is being retrieved
+ correctly. It should be the function that is built locally in the tested method.
+ """
+ self.cog_instance.make_confirmation_task(MockMessage(id=123))
+
+ self.cog_instance.bot.wait_for.assert_called_once()
+ created_check = self.cog_instance.bot.wait_for.call_args.kwargs["check"]
+
+ # The `message_id` matches the `id` of our incident
+ self.assertTrue(created_check(payload=MagicMock(message_id=123)))
+
+ # This `message_id` does not match
+ self.assertFalse(created_check(payload=MagicMock(message_id=0)))
+
+
+@patch("bot.exts.moderation.incidents.ALLOWED_ROLES", {1, 2})
+@patch("bot.exts.moderation.incidents.Incidents.make_confirmation_task", AsyncMock()) # Generic awaitable
+class TestProcessEvent(TestIncidents):
+ """Tests for the `Incidents.process_event` coroutine."""
+
+ async def test_process_event_bad_role(self):
+ """The reaction is removed when the author lacks all allowed roles."""
+ incident = MockMessage()
+ member = MockMember(roles=[MockRole(id=0)]) # Must have role 1 or 2
+
+ await self.cog_instance.process_event("reaction", incident, member)
+ incident.remove_reaction.assert_called_once_with("reaction", member)
+
+ async def test_process_event_bad_emoji(self):
+ """
+ The reaction is removed when an invalid emoji is used.
+
+ This requires that we pass in a `member` with valid roles, as we need the role check
+ to succeed.
+ """
+ incident = MockMessage()
+ member = MockMember(roles=[MockRole(id=1)]) # Member has allowed role
+
+ await self.cog_instance.process_event("invalid_signal", incident, member)
+ incident.remove_reaction.assert_called_once_with("invalid_signal", member)
+
+ async def test_process_event_no_archive_on_investigating(self):
+ """Message is not archived on `Signal.INVESTIGATING`."""
+ with patch("bot.exts.moderation.incidents.Incidents.archive", AsyncMock()) as mocked_archive:
+ await self.cog_instance.process_event(
+ reaction=incidents.Signal.INVESTIGATING.value,
+ incident=MockMessage(),
+ member=MockMember(roles=[MockRole(id=1)]),
+ )
+
+ mocked_archive.assert_not_called()
+
+ async def test_process_event_no_delete_if_archive_fails(self):
+ """
+ Original message is not deleted when `Incidents.archive` returns False.
+
+ This is the way of signaling that the relay failed, and we should not remove the original,
+ as that would result in losing the incident record.
+ """
+ incident = MockMessage()
+
+ with patch("bot.exts.moderation.incidents.Incidents.archive", AsyncMock(return_value=False)):
+ await self.cog_instance.process_event(
+ reaction=incidents.Signal.ACTIONED.value,
+ incident=incident,
+ member=MockMember(roles=[MockRole(id=1)])
+ )
+
+ incident.delete.assert_not_called()
+
+ async def test_process_event_confirmation_task_is_awaited(self):
+ """Task given by `Incidents.make_confirmation_task` is awaited before method exits."""
+ mock_task = AsyncMock()
+
+ with patch("bot.exts.moderation.incidents.Incidents.make_confirmation_task", mock_task):
+ await self.cog_instance.process_event(
+ reaction=incidents.Signal.ACTIONED.value,
+ incident=MockMessage(),
+ member=MockMember(roles=[MockRole(id=1)])
+ )
+
+ mock_task.assert_awaited()
+
+ async def test_process_event_confirmation_task_timeout_is_handled(self):
+ """
+ Confirmation task `asyncio.TimeoutError` is handled gracefully.
+
+ We have `make_confirmation_task` return a mock with a side effect, and then catch the
+ exception should it propagate out of `process_event`. This is so that we can then manually
+ fail the test with a more informative message than just the plain traceback.
+ """
+ mock_task = AsyncMock(side_effect=asyncio.TimeoutError())
+
+ try:
+ with patch("bot.exts.moderation.incidents.Incidents.make_confirmation_task", mock_task):
+ await self.cog_instance.process_event(
+ reaction=incidents.Signal.ACTIONED.value,
+ incident=MockMessage(),
+ member=MockMember(roles=[MockRole(id=1)])
+ )
+ except asyncio.TimeoutError:
+ self.fail("TimeoutError was not handled gracefully, and propagated out of `process_event`!")
+
+
+class TestResolveMessage(TestIncidents):
+ """Tests for the `Incidents.resolve_message` coroutine."""
+
+ async def test_resolve_message_pass_message_id(self):
+ """Method will call `_get_message` with the passed `message_id`."""
+ await self.cog_instance.resolve_message(123)
+ self.cog_instance.bot._connection._get_message.assert_called_once_with(123)
+
+ async def test_resolve_message_in_cache(self):
+ """
+ No API call is made if the queried message exists in the cache.
+
+ We mock the `_get_message` return value regardless of input. Whether it finds the message
+ internally is considered d.py's responsibility, not ours.
+ """
+ cached_message = MockMessage(id=123)
+ self.cog_instance.bot._connection._get_message = MagicMock(return_value=cached_message)
+
+ return_value = await self.cog_instance.resolve_message(123)
+
+ self.assertIs(return_value, cached_message)
+ self.cog_instance.bot.get_channel.assert_not_called() # The `fetch_message` line was never hit
+
+ async def test_resolve_message_not_in_cache(self):
+ """
+ The message is retrieved from the API if it isn't cached.
+
+ This is desired behaviour for messages which exist, but were sent before the bot's
+ current session.
+ """
+ self.cog_instance.bot._connection._get_message = MagicMock(return_value=None) # Cache returns None
+
+ # API returns our message
+ uncached_message = MockMessage()
+ fetch_message = AsyncMock(return_value=uncached_message)
+ self.cog_instance.bot.get_channel = MagicMock(return_value=MockTextChannel(fetch_message=fetch_message))
+
+ retrieved_message = await self.cog_instance.resolve_message(123)
+ self.assertIs(retrieved_message, uncached_message)
+
+ async def test_resolve_message_doesnt_exist(self):
+ """
+ If the API returns a 404, the function handles it gracefully and returns None.
+
+ This is an edge-case happening with racing events - event A will relay the message
+ to the archive and delete the original. Once event B acquires the `event_lock`,
+ it will not find the message in the cache, and will ask the API.
+ """
+ self.cog_instance.bot._connection._get_message = MagicMock(return_value=None) # Cache returns None
+
+ fetch_message = AsyncMock(side_effect=mock_404)
+ self.cog_instance.bot.get_channel = MagicMock(return_value=MockTextChannel(fetch_message=fetch_message))
+
+ self.assertIsNone(await self.cog_instance.resolve_message(123))
+
+ async def test_resolve_message_fetch_fails(self):
+ """
+ Non-404 errors are handled, logged & None is returned.
+
+ In contrast with a 404, this should make an error-level log. We assert that at least
+ one such log was made - we do not make any assertions about the log's message.
+ """
+ self.cog_instance.bot._connection._get_message = MagicMock(return_value=None) # Cache returns None
+
+ arbitrary_error = discord.HTTPException(
+ response=MagicMock(aiohttp.ClientResponse),
+ message="Arbitrary error",
+ )
+ fetch_message = AsyncMock(side_effect=arbitrary_error)
+ self.cog_instance.bot.get_channel = MagicMock(return_value=MockTextChannel(fetch_message=fetch_message))
+
+ with self.assertLogs(logger=incidents.log, level=logging.ERROR):
+ self.assertIsNone(await self.cog_instance.resolve_message(123))
+
+
+@patch("bot.constants.Channels.incidents", 123)
+class TestOnRawReactionAdd(TestIncidents):
+ """
+ Tests for the `Incidents.on_raw_reaction_add` listener.
+
+ Writing tests for this listener comes with additional complexity due to the listener
+ awaiting the `crawl_task` task. See `asyncSetUp` for further details, which attempts
+ to make unit testing this function possible.
+ """
+
+ def setUp(self):
+ """
+ Prepare & assign `payload` attribute.
+
+ This attribute represents an *ideal* payload which will not be rejected by the
+ listener. As each test will receive a fresh instance, it can be mutated to
+ observe how the listener's behaviour changes with different attributes on
+ the passed payload.
+ """
+ super().setUp() # Ensure `cog_instance` is assigned
+
+ self.payload = MagicMock(
+ discord.RawReactionActionEvent,
+ channel_id=123, # Patched at class level
+ message_id=456,
+ member=MockMember(bot=False),
+ emoji="reaction",
+ )
+
+ async def asyncSetUp(self): # noqa: N802
+ """
+ Prepare an empty task and assign it as `crawl_task`.
+
+ It appears that the `unittest` framework does not provide anything for mocking
+ asyncio tasks. An `AsyncMock` instance can be called and then awaited, however,
+ it does not provide the `done` method or any other parts of the `asyncio.Task`
+ interface.
+
+ Although we do not need to make any assertions about the task itself while
+ testing the listener, the code will still await it and call the `done` method,
+ and so we must inject something that will not fail on either action.
+
+ Note that this is done in an `asyncSetUp`, which runs after `setUp`.
+ The justification is that creating an actual task requires the event
+ loop to be ready, which is not the case in the `setUp`.
+ """
+ mock_task = asyncio.create_task(AsyncMock()()) # Mock async func, then a coro
+ self.cog_instance.crawl_task = mock_task
+
+ async def test_on_raw_reaction_add_wrong_channel(self):
+ """
+ Events outside of #incidents will be ignored.
+
+ We check this by asserting that `resolve_message` was never queried.
+ """
+ self.payload.channel_id = 0
+ self.cog_instance.resolve_message = AsyncMock()
+
+ await self.cog_instance.on_raw_reaction_add(self.payload)
+ self.cog_instance.resolve_message.assert_not_called()
+
+ async def test_on_raw_reaction_add_user_is_bot(self):
+ """
+ Events dispatched by bot accounts will be ignored.
+
+ We check this by asserting that `resolve_message` was never queried.
+ """
+ self.payload.member = MockMember(bot=True)
+ self.cog_instance.resolve_message = AsyncMock()
+
+ await self.cog_instance.on_raw_reaction_add(self.payload)
+ self.cog_instance.resolve_message.assert_not_called()
+
+ async def test_on_raw_reaction_add_message_doesnt_exist(self):
+ """
+ Listener gracefully handles the case where `resolve_message` gives None.
+
+ We check this by asserting that `process_event` was never called.
+ """
+ self.cog_instance.process_event = AsyncMock()
+ self.cog_instance.resolve_message = AsyncMock(return_value=None)
+
+ await self.cog_instance.on_raw_reaction_add(self.payload)
+ self.cog_instance.process_event.assert_not_called()
+
+ async def test_on_raw_reaction_add_message_is_not_an_incident(self):
+ """
+ The event won't be processed if the related message is not an incident.
+
+ This is an edge-case that can happen if someone manually leaves a reaction
+ on a pinned message, or a comment.
+
+ We check this by asserting that `process_event` was never called.
+ """
+ self.cog_instance.process_event = AsyncMock()
+ self.cog_instance.resolve_message = AsyncMock(return_value=MockMessage())
+
+ with patch("bot.exts.moderation.incidents.is_incident", MagicMock(return_value=False)):
+ await self.cog_instance.on_raw_reaction_add(self.payload)
+
+ self.cog_instance.process_event.assert_not_called()
+
+ async def test_on_raw_reaction_add_valid_event_is_processed(self):
+ """
+ If the reaction event is valid, it is passed to `process_event`.
+
+ This is the case when everything goes right:
+ * The reaction was placed in #incidents, and not by a bot
+ * The message was found successfully
+ * The message qualifies as an incident
+
+ Additionally, we check that all arguments were passed as expected.
+ """
+ incident = MockMessage(id=1)
+
+ self.cog_instance.process_event = AsyncMock()
+ self.cog_instance.resolve_message = AsyncMock(return_value=incident)
+
+ with patch("bot.exts.moderation.incidents.is_incident", MagicMock(return_value=True)):
+ await self.cog_instance.on_raw_reaction_add(self.payload)
+
+ self.cog_instance.process_event.assert_called_with(
+ "reaction", # Defined in `self.payload`
+ incident,
+ self.payload.member,
+ )
+
+
+class TestOnMessage(TestIncidents):
+ """
+ Tests for the `Incidents.on_message` listener.
+
+ Notice the decorators mocking the `is_incident` return value. The `is_incidents`
+ function is tested in `TestIsIncident` - here we do not worry about it.
+ """
+
+ @patch("bot.exts.moderation.incidents.is_incident", MagicMock(return_value=True))
+ async def test_on_message_incident(self):
+ """Messages qualifying as incidents are passed to `add_signals`."""
+ incident = MockMessage()
+
+ with patch("bot.exts.moderation.incidents.add_signals", AsyncMock()) as mock_add_signals:
+ await self.cog_instance.on_message(incident)
+
+ mock_add_signals.assert_called_once_with(incident)
+
+ @patch("bot.exts.moderation.incidents.is_incident", MagicMock(return_value=False))
+ async def test_on_message_non_incident(self):
+ """Messages not qualifying as incidents are ignored."""
+ with patch("bot.exts.moderation.incidents.add_signals", AsyncMock()) as mock_add_signals:
+ await self.cog_instance.on_message(MockMessage())
+
+ mock_add_signals.assert_not_called()
diff --git a/tests/bot/cogs/moderation/test_modlog.py b/tests/bot/exts/moderation/test_modlog.py
index f2809f40a..f8f142484 100644
--- a/tests/bot/cogs/moderation/test_modlog.py
+++ b/tests/bot/exts/moderation/test_modlog.py
@@ -2,7 +2,7 @@ import unittest
import discord
-from bot.cogs.moderation.modlog import ModLog
+from bot.exts.moderation.modlog import ModLog
from tests.helpers import MockBot, MockTextChannel
diff --git a/tests/bot/exts/moderation/test_silence.py b/tests/bot/exts/moderation/test_silence.py
new file mode 100644
index 000000000..104293d8e
--- /dev/null
+++ b/tests/bot/exts/moderation/test_silence.py
@@ -0,0 +1,502 @@
+import asyncio
+import unittest
+from datetime import datetime, timezone
+from unittest import mock
+from unittest.mock import Mock
+
+from async_rediscache import RedisSession
+from discord import PermissionOverwrite
+
+from bot.constants import Channels, Guild, Roles
+from bot.exts.moderation import silence
+from tests.helpers import MockBot, MockContext, MockTextChannel, autospec
+
+redis_session = None
+redis_loop = asyncio.get_event_loop()
+
+
+def setUpModule(): # noqa: N802
+ """Create and connect to the fakeredis session."""
+ global redis_session
+ redis_session = RedisSession(use_fakeredis=True)
+ redis_loop.run_until_complete(redis_session.connect())
+
+
+def tearDownModule(): # noqa: N802
+ """Close the fakeredis session."""
+ if redis_session:
+ redis_loop.run_until_complete(redis_session.close())
+
+
+# Have to subclass it because builtins can't be patched.
+class PatchedDatetime(datetime):
+ """A datetime object with a mocked now() function."""
+
+ now = mock.create_autospec(datetime, "now")
+
+
+class SilenceNotifierTests(unittest.IsolatedAsyncioTestCase):
+ def setUp(self) -> None:
+ self.alert_channel = MockTextChannel()
+ self.notifier = silence.SilenceNotifier(self.alert_channel)
+ self.notifier.stop = self.notifier_stop_mock = Mock()
+ self.notifier.start = self.notifier_start_mock = Mock()
+
+ def test_add_channel_adds_channel(self):
+ """Channel is added to `_silenced_channels` with the current loop."""
+ channel = Mock()
+ with mock.patch.object(self.notifier, "_silenced_channels") as silenced_channels:
+ self.notifier.add_channel(channel)
+ silenced_channels.__setitem__.assert_called_with(channel, self.notifier._current_loop)
+
+ def test_add_channel_starts_loop(self):
+ """Loop is started if `_silenced_channels` was empty."""
+ self.notifier.add_channel(Mock())
+ self.notifier_start_mock.assert_called_once()
+
+ def test_add_channel_skips_start_with_channels(self):
+ """Loop start is not called when `_silenced_channels` is not empty."""
+ with mock.patch.object(self.notifier, "_silenced_channels"):
+ self.notifier.add_channel(Mock())
+ self.notifier_start_mock.assert_not_called()
+
+ def test_remove_channel_removes_channel(self):
+ """Channel is removed from `_silenced_channels`."""
+ channel = Mock()
+ with mock.patch.object(self.notifier, "_silenced_channels") as silenced_channels:
+ self.notifier.remove_channel(channel)
+ silenced_channels.__delitem__.assert_called_with(channel)
+
+ def test_remove_channel_stops_loop(self):
+ """Notifier loop is stopped if `_silenced_channels` is empty after remove."""
+ with mock.patch.object(self.notifier, "_silenced_channels", __bool__=lambda _: False):
+ self.notifier.remove_channel(Mock())
+ self.notifier_stop_mock.assert_called_once()
+
+ def test_remove_channel_skips_stop_with_channels(self):
+ """Notifier loop is not stopped if `_silenced_channels` is not empty after remove."""
+ self.notifier.remove_channel(Mock())
+ self.notifier_stop_mock.assert_not_called()
+
+ async def test_notifier_private_sends_alert(self):
+ """Alert is sent on 15 min intervals."""
+ test_cases = (900, 1800, 2700)
+ for current_loop in test_cases:
+ with self.subTest(current_loop=current_loop):
+ with mock.patch.object(self.notifier, "_current_loop", new=current_loop):
+ await self.notifier._notifier()
+ self.alert_channel.send.assert_called_once_with(
+ f"<@&{Roles.moderators}> currently silenced channels: "
+ )
+ self.alert_channel.send.reset_mock()
+
+ async def test_notifier_skips_alert(self):
+ """Alert is skipped on first loop or not an increment of 900."""
+ test_cases = (0, 15, 5000)
+ for current_loop in test_cases:
+ with self.subTest(current_loop=current_loop):
+ with mock.patch.object(self.notifier, "_current_loop", new=current_loop):
+ await self.notifier._notifier()
+ self.alert_channel.send.assert_not_called()
+
+
+@autospec(silence.Silence, "previous_overwrites", "unsilence_timestamps", pass_mocks=False)
+class SilenceCogTests(unittest.IsolatedAsyncioTestCase):
+ """Tests for the general functionality of the Silence cog."""
+
+ @autospec(silence, "Scheduler", pass_mocks=False)
+ def setUp(self) -> None:
+ self.bot = MockBot()
+ self.cog = silence.Silence(self.bot)
+
+ @autospec(silence, "SilenceNotifier", pass_mocks=False)
+ async def test_async_init_got_guild(self):
+ """Bot got guild after it became available."""
+ await self.cog._async_init()
+ self.bot.wait_until_guild_available.assert_awaited_once()
+ self.bot.get_guild.assert_called_once_with(Guild.id)
+
+ @autospec(silence, "SilenceNotifier", pass_mocks=False)
+ async def test_async_init_got_role(self):
+ """Got `Roles.verified` role from guild."""
+ guild = self.bot.get_guild()
+ guild.get_role.side_effect = lambda id_: Mock(id=id_)
+
+ await self.cog._async_init()
+ self.assertEqual(self.cog._verified_role.id, Roles.verified)
+
+ @autospec(silence, "SilenceNotifier", pass_mocks=False)
+ async def test_async_init_got_channels(self):
+ """Got channels from bot."""
+ self.bot.get_channel.side_effect = lambda id_: MockTextChannel(id=id_)
+
+ await self.cog._async_init()
+ self.assertEqual(self.cog._mod_alerts_channel.id, Channels.mod_alerts)
+
+ @autospec(silence, "SilenceNotifier")
+ async def test_async_init_got_notifier(self, notifier):
+ """Notifier was started with channel."""
+ self.bot.get_channel.side_effect = lambda id_: MockTextChannel(id=id_)
+
+ await self.cog._async_init()
+ notifier.assert_called_once_with(MockTextChannel(id=Channels.mod_log))
+ self.assertEqual(self.cog.notifier, notifier.return_value)
+
+ @autospec(silence, "SilenceNotifier", pass_mocks=False)
+ async def test_async_init_rescheduled(self):
+ """`_reschedule_` coroutine was awaited."""
+ self.cog._reschedule = mock.create_autospec(self.cog._reschedule)
+ await self.cog._async_init()
+ self.cog._reschedule.assert_awaited_once_with()
+
+ def test_cog_unload_cancelled_tasks(self):
+ """The init task was cancelled."""
+ self.cog._init_task = asyncio.Future()
+ self.cog.cog_unload()
+
+ # It's too annoying to test cancel_all since it's a done callback and wrapped in a lambda.
+ self.assertTrue(self.cog._init_task.cancelled())
+
+ @autospec("discord.ext.commands", "has_any_role")
+ @mock.patch.object(silence, "MODERATION_ROLES", new=(1, 2, 3))
+ async def test_cog_check(self, role_check):
+ """Role check was called with `MODERATION_ROLES`"""
+ ctx = MockContext()
+ role_check.return_value.predicate = mock.AsyncMock()
+
+ await self.cog.cog_check(ctx)
+ role_check.assert_called_once_with(*(1, 2, 3))
+ role_check.return_value.predicate.assert_awaited_once_with(ctx)
+
+
+@autospec(silence.Silence, "previous_overwrites", "unsilence_timestamps", pass_mocks=False)
+class RescheduleTests(unittest.IsolatedAsyncioTestCase):
+ """Tests for the rescheduling of cached unsilences."""
+
+ @autospec(silence, "Scheduler", "SilenceNotifier", pass_mocks=False)
+ def setUp(self):
+ self.bot = MockBot()
+ self.cog = silence.Silence(self.bot)
+ self.cog._unsilence_wrapper = mock.create_autospec(self.cog._unsilence_wrapper)
+
+ with mock.patch.object(self.cog, "_reschedule", autospec=True):
+ asyncio.run(self.cog._async_init()) # Populate instance attributes.
+
+ async def test_skipped_missing_channel(self):
+ """Did nothing because the channel couldn't be retrieved."""
+ self.cog.unsilence_timestamps.items.return_value = [(123, -1), (123, 1), (123, 10000000000)]
+ self.bot.get_channel.return_value = None
+
+ await self.cog._reschedule()
+
+ self.cog.notifier.add_channel.assert_not_called()
+ self.cog._unsilence_wrapper.assert_not_called()
+ self.cog.scheduler.schedule_later.assert_not_called()
+
+ async def test_added_permanent_to_notifier(self):
+ """Permanently silenced channels were added to the notifier."""
+ channels = [MockTextChannel(id=123), MockTextChannel(id=456)]
+ self.bot.get_channel.side_effect = channels
+ self.cog.unsilence_timestamps.items.return_value = [(123, -1), (456, -1)]
+
+ await self.cog._reschedule()
+
+ self.cog.notifier.add_channel.assert_any_call(channels[0])
+ self.cog.notifier.add_channel.assert_any_call(channels[1])
+
+ self.cog._unsilence_wrapper.assert_not_called()
+ self.cog.scheduler.schedule_later.assert_not_called()
+
+ async def test_unsilenced_expired(self):
+ """Unsilenced expired silences."""
+ channels = [MockTextChannel(id=123), MockTextChannel(id=456)]
+ self.bot.get_channel.side_effect = channels
+ self.cog.unsilence_timestamps.items.return_value = [(123, 100), (456, 200)]
+
+ await self.cog._reschedule()
+
+ self.cog._unsilence_wrapper.assert_any_call(channels[0])
+ self.cog._unsilence_wrapper.assert_any_call(channels[1])
+
+ self.cog.notifier.add_channel.assert_not_called()
+ self.cog.scheduler.schedule_later.assert_not_called()
+
+ @mock.patch.object(silence, "datetime", new=PatchedDatetime)
+ async def test_rescheduled_active(self):
+ """Rescheduled active silences."""
+ channels = [MockTextChannel(id=123), MockTextChannel(id=456)]
+ self.bot.get_channel.side_effect = channels
+ self.cog.unsilence_timestamps.items.return_value = [(123, 2000), (456, 3000)]
+ silence.datetime.now.return_value = datetime.fromtimestamp(1000, tz=timezone.utc)
+
+ self.cog._unsilence_wrapper = mock.MagicMock()
+ unsilence_return = self.cog._unsilence_wrapper.return_value
+
+ await self.cog._reschedule()
+
+ # Yuck.
+ calls = [mock.call(1000, 123, unsilence_return), mock.call(2000, 456, unsilence_return)]
+ self.cog.scheduler.schedule_later.assert_has_calls(calls)
+
+ unsilence_calls = [mock.call(channel) for channel in channels]
+ self.cog._unsilence_wrapper.assert_has_calls(unsilence_calls)
+
+ self.cog.notifier.add_channel.assert_not_called()
+
+
+@autospec(silence.Silence, "previous_overwrites", "unsilence_timestamps", pass_mocks=False)
+class SilenceTests(unittest.IsolatedAsyncioTestCase):
+ """Tests for the silence command and its related helper methods."""
+
+ @autospec(silence.Silence, "_reschedule", pass_mocks=False)
+ @autospec(silence, "Scheduler", "SilenceNotifier", pass_mocks=False)
+ def setUp(self) -> None:
+ self.bot = MockBot()
+ self.cog = silence.Silence(self.bot)
+ self.cog._init_task = asyncio.Future()
+ self.cog._init_task.set_result(None)
+
+ # Avoid unawaited coroutine warnings.
+ self.cog.scheduler.schedule_later.side_effect = lambda delay, task_id, coro: coro.close()
+
+ asyncio.run(self.cog._async_init()) # Populate instance attributes.
+
+ self.channel = MockTextChannel()
+ self.overwrite = PermissionOverwrite(stream=True, send_messages=True, add_reactions=False)
+ self.channel.overwrites_for.return_value = self.overwrite
+
+ async def test_sent_correct_message(self):
+ """Appropriate failure/success message was sent by the command."""
+ test_cases = (
+ (0.0001, silence.MSG_SILENCE_SUCCESS.format(duration=0.0001), True,),
+ (None, silence.MSG_SILENCE_PERMANENT, True,),
+ (5, silence.MSG_SILENCE_FAIL, False,),
+ )
+ for duration, message, was_silenced in test_cases:
+ ctx = MockContext()
+ with mock.patch.object(self.cog, "_set_silence_overwrites", return_value=was_silenced):
+ with self.subTest(was_silenced=was_silenced, message=message, duration=duration):
+ await self.cog.silence.callback(self.cog, ctx, duration)
+ ctx.send.assert_called_once_with(message)
+
+ async def test_skipped_already_silenced(self):
+ """Permissions were not set and `False` was returned for an already silenced channel."""
+ subtests = (
+ (False, PermissionOverwrite(send_messages=False, add_reactions=False)),
+ (True, PermissionOverwrite(send_messages=True, add_reactions=True)),
+ (True, PermissionOverwrite(send_messages=False, add_reactions=False)),
+ )
+
+ for contains, overwrite in subtests:
+ with self.subTest(contains=contains, overwrite=overwrite):
+ self.cog.scheduler.__contains__.return_value = contains
+ channel = MockTextChannel()
+ channel.overwrites_for.return_value = overwrite
+
+ self.assertFalse(await self.cog._set_silence_overwrites(channel))
+ channel.set_permissions.assert_not_called()
+
+ async def test_silenced_channel(self):
+ """Channel had `send_message` and `add_reactions` permissions revoked for verified role."""
+ self.assertTrue(await self.cog._set_silence_overwrites(self.channel))
+ self.assertFalse(self.overwrite.send_messages)
+ self.assertFalse(self.overwrite.add_reactions)
+ self.channel.set_permissions.assert_awaited_once_with(
+ self.cog._verified_role,
+ overwrite=self.overwrite
+ )
+
+ async def test_preserved_other_overwrites(self):
+ """Channel's other unrelated overwrites were not changed."""
+ prev_overwrite_dict = dict(self.overwrite)
+ await self.cog._set_silence_overwrites(self.channel)
+ new_overwrite_dict = dict(self.overwrite)
+
+ # Remove 'send_messages' & 'add_reactions' keys because they were changed by the method.
+ del prev_overwrite_dict['send_messages']
+ del prev_overwrite_dict['add_reactions']
+ del new_overwrite_dict['send_messages']
+ del new_overwrite_dict['add_reactions']
+
+ self.assertDictEqual(prev_overwrite_dict, new_overwrite_dict)
+
+ async def test_temp_not_added_to_notifier(self):
+ """Channel was not added to notifier if a duration was set for the silence."""
+ with mock.patch.object(self.cog, "_set_silence_overwrites", return_value=True):
+ await self.cog.silence.callback(self.cog, MockContext(), 15)
+ self.cog.notifier.add_channel.assert_not_called()
+
+ async def test_indefinite_added_to_notifier(self):
+ """Channel was added to notifier if a duration was not set for the silence."""
+ with mock.patch.object(self.cog, "_set_silence_overwrites", return_value=True):
+ await self.cog.silence.callback(self.cog, MockContext(), None)
+ self.cog.notifier.add_channel.assert_called_once()
+
+ async def test_silenced_not_added_to_notifier(self):
+ """Channel was not added to the notifier if it was already silenced."""
+ with mock.patch.object(self.cog, "_set_silence_overwrites", return_value=False):
+ await self.cog.silence.callback(self.cog, MockContext(), 15)
+ self.cog.notifier.add_channel.assert_not_called()
+
+ async def test_cached_previous_overwrites(self):
+ """Channel's previous overwrites were cached."""
+ overwrite_json = '{"send_messages": true, "add_reactions": false}'
+ await self.cog._set_silence_overwrites(self.channel)
+ self.cog.previous_overwrites.set.assert_called_once_with(self.channel.id, overwrite_json)
+
+ @autospec(silence, "datetime")
+ async def test_cached_unsilence_time(self, datetime_mock):
+ """The UTC POSIX timestamp for the unsilence was cached."""
+ now_timestamp = 100
+ duration = 15
+ timestamp = now_timestamp + duration * 60
+ datetime_mock.now.return_value = datetime.fromtimestamp(now_timestamp, tz=timezone.utc)
+
+ ctx = MockContext(channel=self.channel)
+ await self.cog.silence.callback(self.cog, ctx, duration)
+
+ self.cog.unsilence_timestamps.set.assert_awaited_once_with(ctx.channel.id, timestamp)
+ datetime_mock.now.assert_called_once_with(tz=timezone.utc) # Ensure it's using an aware dt.
+
+ async def test_cached_indefinite_time(self):
+ """A value of -1 was cached for a permanent silence."""
+ ctx = MockContext(channel=self.channel)
+ await self.cog.silence.callback(self.cog, ctx, None)
+ self.cog.unsilence_timestamps.set.assert_awaited_once_with(ctx.channel.id, -1)
+
+ async def test_scheduled_task(self):
+ """An unsilence task was scheduled."""
+ ctx = MockContext(channel=self.channel, invoke=mock.MagicMock())
+
+ await self.cog.silence.callback(self.cog, ctx, 5)
+
+ args = (300, ctx.channel.id, ctx.invoke.return_value)
+ self.cog.scheduler.schedule_later.assert_called_once_with(*args)
+ ctx.invoke.assert_called_once_with(self.cog.unsilence)
+
+ async def test_permanent_not_scheduled(self):
+ """A task was not scheduled for a permanent silence."""
+ ctx = MockContext(channel=self.channel)
+ await self.cog.silence.callback(self.cog, ctx, None)
+ self.cog.scheduler.schedule_later.assert_not_called()
+
+
+@autospec(silence.Silence, "unsilence_timestamps", pass_mocks=False)
+class UnsilenceTests(unittest.IsolatedAsyncioTestCase):
+ """Tests for the unsilence command and its related helper methods."""
+
+ @autospec(silence.Silence, "_reschedule", pass_mocks=False)
+ @autospec(silence, "Scheduler", "SilenceNotifier", pass_mocks=False)
+ def setUp(self) -> None:
+ self.bot = MockBot(get_channel=lambda _: MockTextChannel())
+ self.cog = silence.Silence(self.bot)
+ self.cog._init_task = asyncio.Future()
+ self.cog._init_task.set_result(None)
+
+ overwrites_cache = mock.create_autospec(self.cog.previous_overwrites, spec_set=True)
+ self.cog.previous_overwrites = overwrites_cache
+
+ asyncio.run(self.cog._async_init()) # Populate instance attributes.
+
+ self.cog.scheduler.__contains__.return_value = True
+ overwrites_cache.get.return_value = '{"send_messages": true, "add_reactions": false}'
+ self.channel = MockTextChannel()
+ self.overwrite = PermissionOverwrite(stream=True, send_messages=False, add_reactions=False)
+ self.channel.overwrites_for.return_value = self.overwrite
+
+ async def test_sent_correct_message(self):
+ """Appropriate failure/success message was sent by the command."""
+ unsilenced_overwrite = PermissionOverwrite(send_messages=True, add_reactions=True)
+ test_cases = (
+ (True, silence.MSG_UNSILENCE_SUCCESS, unsilenced_overwrite),
+ (False, silence.MSG_UNSILENCE_FAIL, unsilenced_overwrite),
+ (False, silence.MSG_UNSILENCE_MANUAL, self.overwrite),
+ (False, silence.MSG_UNSILENCE_MANUAL, PermissionOverwrite(send_messages=False)),
+ (False, silence.MSG_UNSILENCE_MANUAL, PermissionOverwrite(add_reactions=False)),
+ )
+ for was_unsilenced, message, overwrite in test_cases:
+ ctx = MockContext()
+ with self.subTest(was_unsilenced=was_unsilenced, message=message, overwrite=overwrite):
+ with mock.patch.object(self.cog, "_unsilence", return_value=was_unsilenced):
+ ctx.channel.overwrites_for.return_value = overwrite
+ await self.cog.unsilence.callback(self.cog, ctx)
+ ctx.channel.send.assert_called_once_with(message)
+
+ async def test_skipped_already_unsilenced(self):
+ """Permissions were not set and `False` was returned for an already unsilenced channel."""
+ self.cog.scheduler.__contains__.return_value = False
+ self.cog.previous_overwrites.get.return_value = None
+ channel = MockTextChannel()
+
+ self.assertFalse(await self.cog._unsilence(channel))
+ channel.set_permissions.assert_not_called()
+
+ async def test_restored_overwrites(self):
+ """Channel's `send_message` and `add_reactions` overwrites were restored."""
+ await self.cog._unsilence(self.channel)
+ self.channel.set_permissions.assert_awaited_once_with(
+ self.cog._verified_role,
+ overwrite=self.overwrite,
+ )
+
+ # Recall that these values are determined by the fixture.
+ self.assertTrue(self.overwrite.send_messages)
+ self.assertFalse(self.overwrite.add_reactions)
+
+ async def test_cache_miss_used_default_overwrites(self):
+ """Both overwrites were set to None due previous values not being found in the cache."""
+ self.cog.previous_overwrites.get.return_value = None
+
+ await self.cog._unsilence(self.channel)
+ self.channel.set_permissions.assert_awaited_once_with(
+ self.cog._verified_role,
+ overwrite=self.overwrite,
+ )
+
+ self.assertIsNone(self.overwrite.send_messages)
+ self.assertIsNone(self.overwrite.add_reactions)
+
+ async def test_cache_miss_sent_mod_alert(self):
+ """A message was sent to the mod alerts channel."""
+ self.cog.previous_overwrites.get.return_value = None
+
+ await self.cog._unsilence(self.channel)
+ self.cog._mod_alerts_channel.send.assert_awaited_once()
+
+ async def test_removed_notifier(self):
+ """Channel was removed from `notifier`."""
+ await self.cog._unsilence(self.channel)
+ self.cog.notifier.remove_channel.assert_called_once_with(self.channel)
+
+ async def test_deleted_cached_overwrite(self):
+ """Channel was deleted from the overwrites cache."""
+ await self.cog._unsilence(self.channel)
+ self.cog.previous_overwrites.delete.assert_awaited_once_with(self.channel.id)
+
+ async def test_deleted_cached_time(self):
+ """Channel was deleted from the timestamp cache."""
+ await self.cog._unsilence(self.channel)
+ self.cog.unsilence_timestamps.delete.assert_awaited_once_with(self.channel.id)
+
+ async def test_cancelled_task(self):
+ """The scheduled unsilence task should be cancelled."""
+ await self.cog._unsilence(self.channel)
+ self.cog.scheduler.cancel.assert_called_once_with(self.channel.id)
+
+ async def test_preserved_other_overwrites(self):
+ """Channel's other unrelated overwrites were not changed, including cache misses."""
+ for overwrite_json in ('{"send_messages": true, "add_reactions": null}', None):
+ with self.subTest(overwrite_json=overwrite_json):
+ self.cog.previous_overwrites.get.return_value = overwrite_json
+
+ prev_overwrite_dict = dict(self.overwrite)
+ await self.cog._unsilence(self.channel)
+ new_overwrite_dict = dict(self.overwrite)
+
+ # Remove these keys because they were modified by the unsilence.
+ del prev_overwrite_dict['send_messages']
+ del prev_overwrite_dict['add_reactions']
+ del new_overwrite_dict['send_messages']
+ del new_overwrite_dict['add_reactions']
+
+ self.assertDictEqual(prev_overwrite_dict, new_overwrite_dict)
diff --git a/tests/bot/exts/moderation/test_slowmode.py b/tests/bot/exts/moderation/test_slowmode.py
new file mode 100644
index 000000000..dad751e0d
--- /dev/null
+++ b/tests/bot/exts/moderation/test_slowmode.py
@@ -0,0 +1,113 @@
+import unittest
+from unittest import mock
+
+from dateutil.relativedelta import relativedelta
+
+from bot.constants import Emojis
+from bot.exts.moderation.slowmode import Slowmode
+from tests.helpers import MockBot, MockContext, MockTextChannel
+
+
+class SlowmodeTests(unittest.IsolatedAsyncioTestCase):
+
+ def setUp(self) -> None:
+ self.bot = MockBot()
+ self.cog = Slowmode(self.bot)
+ self.ctx = MockContext()
+
+ async def test_get_slowmode_no_channel(self) -> None:
+ """Get slowmode without a given channel."""
+ self.ctx.channel = MockTextChannel(name='python-general', slowmode_delay=5)
+
+ await self.cog.get_slowmode(self.cog, self.ctx, None)
+ self.ctx.send.assert_called_once_with("The slowmode delay for #python-general is 5 seconds.")
+
+ async def test_get_slowmode_with_channel(self) -> None:
+ """Get slowmode with a given channel."""
+ text_channel = MockTextChannel(name='python-language', slowmode_delay=2)
+
+ await self.cog.get_slowmode(self.cog, self.ctx, text_channel)
+ self.ctx.send.assert_called_once_with('The slowmode delay for #python-language is 2 seconds.')
+
+ async def test_set_slowmode_no_channel(self) -> None:
+ """Set slowmode without a given channel."""
+ test_cases = (
+ ('helpers', 23, True, f'{Emojis.check_mark} The slowmode delay for #helpers is now 23 seconds.'),
+ ('mods', 76526, False, f'{Emojis.cross_mark} The slowmode delay must be between 0 and 6 hours.'),
+ ('admins', 97, True, f'{Emojis.check_mark} The slowmode delay for #admins is now 1 minute and 37 seconds.')
+ )
+
+ for channel_name, seconds, edited, result_msg in test_cases:
+ with self.subTest(
+ channel_mention=channel_name,
+ seconds=seconds,
+ edited=edited,
+ result_msg=result_msg
+ ):
+ self.ctx.channel = MockTextChannel(name=channel_name)
+
+ await self.cog.set_slowmode(self.cog, self.ctx, None, relativedelta(seconds=seconds))
+
+ if edited:
+ self.ctx.channel.edit.assert_awaited_once_with(slowmode_delay=float(seconds))
+ else:
+ self.ctx.channel.edit.assert_not_called()
+
+ self.ctx.send.assert_called_once_with(result_msg)
+
+ self.ctx.reset_mock()
+
+ async def test_set_slowmode_with_channel(self) -> None:
+ """Set slowmode with a given channel."""
+ test_cases = (
+ ('bot-commands', 12, True, f'{Emojis.check_mark} The slowmode delay for #bot-commands is now 12 seconds.'),
+ ('mod-spam', 21, True, f'{Emojis.check_mark} The slowmode delay for #mod-spam is now 21 seconds.'),
+ ('admin-spam', 4323598, False, f'{Emojis.cross_mark} The slowmode delay must be between 0 and 6 hours.')
+ )
+
+ for channel_name, seconds, edited, result_msg in test_cases:
+ with self.subTest(
+ channel_mention=channel_name,
+ seconds=seconds,
+ edited=edited,
+ result_msg=result_msg
+ ):
+ text_channel = MockTextChannel(name=channel_name)
+
+ await self.cog.set_slowmode(self.cog, self.ctx, text_channel, relativedelta(seconds=seconds))
+
+ if edited:
+ text_channel.edit.assert_awaited_once_with(slowmode_delay=float(seconds))
+ else:
+ text_channel.edit.assert_not_called()
+
+ self.ctx.send.assert_called_once_with(result_msg)
+
+ self.ctx.reset_mock()
+
+ async def test_reset_slowmode_no_channel(self) -> None:
+ """Reset slowmode without a given channel."""
+ self.ctx.channel = MockTextChannel(name='careers', slowmode_delay=6)
+
+ await self.cog.reset_slowmode(self.cog, self.ctx, None)
+ self.ctx.send.assert_called_once_with(
+ f'{Emojis.check_mark} The slowmode delay for #careers has been reset to 0 seconds.'
+ )
+
+ async def test_reset_slowmode_with_channel(self) -> None:
+ """Reset slowmode with a given channel."""
+ text_channel = MockTextChannel(name='meta', slowmode_delay=1)
+
+ await self.cog.reset_slowmode(self.cog, self.ctx, text_channel)
+ self.ctx.send.assert_called_once_with(
+ f'{Emojis.check_mark} The slowmode delay for #meta has been reset to 0 seconds.'
+ )
+
+ @mock.patch("bot.exts.moderation.slowmode.has_any_role")
+ @mock.patch("bot.exts.moderation.slowmode.MODERATION_ROLES", new=(1, 2, 3))
+ async def test_cog_check(self, role_check):
+ """Role check is called with `MODERATION_ROLES`"""
+ role_check.return_value.predicate = mock.AsyncMock()
+ await self.cog.cog_check(self.ctx)
+ role_check.assert_called_once_with(*(1, 2, 3))
+ role_check.return_value.predicate.assert_awaited_once_with(self.ctx)
diff --git a/tests/bot/cogs/test_cogs.py b/tests/bot/exts/test_cogs.py
index fdda59a8f..f8e120262 100644
--- a/tests/bot/cogs/test_cogs.py
+++ b/tests/bot/exts/test_cogs.py
@@ -10,7 +10,7 @@ from unittest import mock
from discord.ext import commands
-from bot import cogs
+from bot import exts
class CommandNameTests(unittest.TestCase):
@@ -29,13 +29,14 @@ class CommandNameTests(unittest.TestCase):
@staticmethod
def walk_modules() -> t.Iterator[ModuleType]:
- """Yield imported modules from the bot.cogs subpackage."""
+ """Yield imported modules from the bot.exts subpackage."""
def on_error(name: str) -> t.NoReturn:
raise ImportError(name=name) # pragma: no cover
# The mock prevents asyncio.get_event_loop() from being called.
with mock.patch("discord.ext.tasks.loop"):
- for module in pkgutil.walk_packages(cogs.__path__, "bot.cogs.", onerror=on_error):
+ prefix = f"{exts.__name__}."
+ for module in pkgutil.walk_packages(exts.__path__, prefix, onerror=on_error):
if not module.ispkg:
yield importlib.import_module(module.name)
@@ -53,6 +54,7 @@ class CommandNameTests(unittest.TestCase):
"""Return a list of all qualified names, including aliases, for the `command`."""
names = [f"{command.full_parent_name} {alias}".strip() for alias in command.aliases]
names.append(command.qualified_name)
+ names += getattr(command, "root_aliases", [])
return names
diff --git a/tests/bot/exts/utils/__init__.py b/tests/bot/exts/utils/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/tests/bot/exts/utils/__init__.py
diff --git a/tests/bot/exts/utils/test_jams.py b/tests/bot/exts/utils/test_jams.py
new file mode 100644
index 000000000..45e7b5b51
--- /dev/null
+++ b/tests/bot/exts/utils/test_jams.py
@@ -0,0 +1,173 @@
+import unittest
+from unittest.mock import AsyncMock, MagicMock, create_autospec
+
+from discord import CategoryChannel
+
+from bot.constants import Roles
+from bot.exts.utils import jams
+from tests.helpers import MockBot, MockContext, MockGuild, MockMember, MockRole, MockTextChannel
+
+
+def get_mock_category(channel_count: int, name: str) -> CategoryChannel:
+ """Return a mocked code jam category."""
+ category = create_autospec(CategoryChannel, spec_set=True, instance=True)
+ category.name = name
+ category.channels = [MockTextChannel() for _ in range(channel_count)]
+
+ return category
+
+
+class JamCreateTeamTests(unittest.IsolatedAsyncioTestCase):
+ """Tests for `createteam` command."""
+
+ def setUp(self):
+ self.bot = MockBot()
+ self.admin_role = MockRole(name="Admins", id=Roles.admins)
+ self.command_user = MockMember([self.admin_role])
+ self.guild = MockGuild([self.admin_role])
+ self.ctx = MockContext(bot=self.bot, author=self.command_user, guild=self.guild)
+ self.cog = jams.CodeJams(self.bot)
+
+ async def test_too_small_amount_of_team_members_passed(self):
+ """Should `ctx.send` and exit early when too small amount of members."""
+ for case in (1, 2):
+ with self.subTest(amount_of_members=case):
+ self.cog.create_channels = AsyncMock()
+ self.cog.add_roles = AsyncMock()
+
+ self.ctx.reset_mock()
+ members = (MockMember() for _ in range(case))
+ await self.cog.createteam(self.cog, self.ctx, "foo", members)
+
+ self.ctx.send.assert_awaited_once()
+ self.cog.create_channels.assert_not_awaited()
+ self.cog.add_roles.assert_not_awaited()
+
+ async def test_duplicate_members_provided(self):
+ """Should `ctx.send` and exit early because duplicate members provided and total there is only 1 member."""
+ self.cog.create_channels = AsyncMock()
+ self.cog.add_roles = AsyncMock()
+
+ member = MockMember()
+ await self.cog.createteam(self.cog, self.ctx, "foo", (member for _ in range(5)))
+
+ self.ctx.send.assert_awaited_once()
+ self.cog.create_channels.assert_not_awaited()
+ self.cog.add_roles.assert_not_awaited()
+
+ async def test_result_sending(self):
+ """Should call `ctx.send` when everything goes right."""
+ self.cog.create_channels = AsyncMock()
+ self.cog.add_roles = AsyncMock()
+
+ members = [MockMember() for _ in range(5)]
+ await self.cog.createteam(self.cog, self.ctx, "foo", members)
+
+ self.cog.create_channels.assert_awaited_once()
+ self.cog.add_roles.assert_awaited_once()
+ self.ctx.send.assert_awaited_once()
+
+ async def test_category_doesnt_exist(self):
+ """Should create a new code jam category."""
+ subtests = (
+ [],
+ [get_mock_category(jams.MAX_CHANNELS - 1, jams.CATEGORY_NAME)],
+ [get_mock_category(jams.MAX_CHANNELS - 2, "other")],
+ )
+
+ for categories in subtests:
+ self.guild.reset_mock()
+ self.guild.categories = categories
+
+ with self.subTest(categories=categories):
+ actual_category = await self.cog.get_category(self.guild)
+
+ self.guild.create_category_channel.assert_awaited_once()
+ category_overwrites = self.guild.create_category_channel.call_args[1]["overwrites"]
+
+ self.assertFalse(category_overwrites[self.guild.default_role].read_messages)
+ self.assertTrue(category_overwrites[self.guild.me].read_messages)
+ self.assertEqual(self.guild.create_category_channel.return_value, actual_category)
+
+ async def test_category_channel_exist(self):
+ """Should not try to create category channel."""
+ expected_category = get_mock_category(jams.MAX_CHANNELS - 2, jams.CATEGORY_NAME)
+ self.guild.categories = [
+ get_mock_category(jams.MAX_CHANNELS - 2, "other"),
+ expected_category,
+ get_mock_category(0, jams.CATEGORY_NAME),
+ ]
+
+ actual_category = await self.cog.get_category(self.guild)
+ self.assertEqual(expected_category, actual_category)
+
+ async def test_channel_overwrites(self):
+ """Should have correct permission overwrites for users and roles."""
+ leader = MockMember()
+ members = [leader] + [MockMember() for _ in range(4)]
+ overwrites = self.cog.get_overwrites(members, self.guild)
+
+ # Leader permission overwrites
+ self.assertTrue(overwrites[leader].manage_messages)
+ self.assertTrue(overwrites[leader].read_messages)
+ self.assertTrue(overwrites[leader].manage_webhooks)
+ self.assertTrue(overwrites[leader].connect)
+
+ # Other members permission overwrites
+ for member in members[1:]:
+ self.assertTrue(overwrites[member].read_messages)
+ self.assertTrue(overwrites[member].connect)
+
+ # Everyone and verified role overwrite
+ self.assertFalse(overwrites[self.guild.default_role].read_messages)
+ self.assertFalse(overwrites[self.guild.default_role].connect)
+ self.assertFalse(overwrites[self.guild.get_role(Roles.verified)].read_messages)
+ self.assertFalse(overwrites[self.guild.get_role(Roles.verified)].connect)
+
+ async def test_team_channels_creation(self):
+ """Should create new voice and text channel for team."""
+ members = [MockMember() for _ in range(5)]
+
+ self.cog.get_overwrites = MagicMock()
+ self.cog.get_category = AsyncMock()
+ self.ctx.guild.create_text_channel.return_value = MockTextChannel(mention="foobar-channel")
+ actual = await self.cog.create_channels(self.guild, "my-team", members)
+
+ self.assertEqual("foobar-channel", actual)
+ self.cog.get_overwrites.assert_called_once_with(members, self.guild)
+ self.cog.get_category.assert_awaited_once_with(self.guild)
+
+ self.guild.create_text_channel.assert_awaited_once_with(
+ "my-team",
+ overwrites=self.cog.get_overwrites.return_value,
+ category=self.cog.get_category.return_value
+ )
+ self.guild.create_voice_channel.assert_awaited_once_with(
+ "My Team",
+ overwrites=self.cog.get_overwrites.return_value,
+ category=self.cog.get_category.return_value
+ )
+
+ async def test_jam_roles_adding(self):
+ """Should add team leader role to leader and jam role to every team member."""
+ leader_role = MockRole(name="Team Leader")
+ jam_role = MockRole(name="Jammer")
+ self.guild.get_role.side_effect = [leader_role, jam_role]
+
+ leader = MockMember()
+ members = [leader] + [MockMember() for _ in range(4)]
+ await self.cog.add_roles(self.guild, members)
+
+ leader.add_roles.assert_any_await(leader_role)
+ for member in members:
+ member.add_roles.assert_any_await(jam_role)
+
+
+class CodeJamSetup(unittest.TestCase):
+ """Test for `setup` function of `CodeJam` cog."""
+
+ def test_setup(self):
+ """Should call `bot.add_cog`."""
+ bot = MockBot()
+ jams.setup(bot)
+ bot.add_cog.assert_called_once()
diff --git a/tests/bot/cogs/test_snekbox.py b/tests/bot/exts/utils/test_snekbox.py
index cf9adbee0..9a42d0610 100644
--- a/tests/bot/cogs/test_snekbox.py
+++ b/tests/bot/exts/utils/test_snekbox.py
@@ -1,13 +1,12 @@
import asyncio
-import logging
import unittest
from unittest.mock import AsyncMock, MagicMock, Mock, call, create_autospec, patch
from discord.ext import commands
from bot import constants
-from bot.cogs import snekbox
-from bot.cogs.snekbox import Snekbox
+from bot.exts.utils import snekbox
+from bot.exts.utils.snekbox import Snekbox
from tests.helpers import MockBot, MockContext, MockMessage, MockReaction, MockUser
@@ -39,49 +38,27 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase):
result = await self.cog.upload_output("-" * (snekbox.MAX_PASTE_LEN + 1))
self.assertEqual(result, "too long to upload")
- async def test_upload_output(self):
+ @patch("bot.exts.utils.snekbox.send_to_paste_service")
+ async def test_upload_output(self, mock_paste_util):
"""Upload the eval output to the URLs.paste_service.format(key="documents") endpoint."""
- key = "MarkDiamond"
- resp = MagicMock()
- resp.json = AsyncMock(return_value={"key": key})
-
- context_manager = MagicMock()
- context_manager.__aenter__.return_value = resp
- self.bot.http_session.post.return_value = context_manager
-
- self.assertEqual(
- await self.cog.upload_output("My awesome output"),
- constants.URLs.paste_service.format(key=key)
- )
- self.bot.http_session.post.assert_called_with(
- constants.URLs.paste_service.format(key="documents"),
- data="My awesome output",
- raise_for_status=True
+ await self.cog.upload_output("Test output.")
+ mock_paste_util.assert_called_once_with(
+ self.bot.http_session, "Test output.", extension="txt"
)
- async def test_upload_output_gracefully_fallback_if_exception_during_request(self):
- """Output upload gracefully fallback if the upload fail."""
- resp = MagicMock()
- resp.json = AsyncMock(side_effect=Exception)
-
- context_manager = MagicMock()
- context_manager.__aenter__.return_value = resp
- self.bot.http_session.post.return_value = context_manager
-
- log = logging.getLogger("bot.cogs.snekbox")
- with self.assertLogs(logger=log, level='ERROR'):
- await self.cog.upload_output('My awesome output!')
-
- async def test_upload_output_gracefully_fallback_if_no_key_in_response(self):
- """Output upload gracefully fallback if there is no key entry in the response body."""
- self.assertEqual((await self.cog.upload_output('My awesome output!')), None)
-
def test_prepare_input(self):
cases = (
('print("Hello world!")', 'print("Hello world!")', 'non-formatted'),
('`print("Hello world!")`', 'print("Hello world!")', 'one line code block'),
('```\nprint("Hello world!")```', 'print("Hello world!")', 'multiline code block'),
('```py\nprint("Hello world!")```', 'print("Hello world!")', 'multiline python code block'),
+ ('text```print("Hello world!")```text', 'print("Hello world!")', 'code block surrounded by text'),
+ ('```print("Hello world!")```\ntext\n```py\nprint("Hello world!")```',
+ 'print("Hello world!")\nprint("Hello world!")', 'two code blocks with text in-between'),
+ ('`print("Hello world!")`\ntext\n```print("How\'s it going?")```',
+ 'print("How\'s it going?")', 'code block preceded by inline code'),
+ ('`print("Hello world!")`\ntext\n`print("Hello world!")`',
+ 'print("Hello world!")', 'one inline code block of two')
)
for case, expected, testname in cases:
with self.subTest(msg=f'Extract code from {testname}.'):
@@ -99,14 +76,14 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase):
actual = self.cog.get_results_message({'stdout': stdout, 'returncode': returncode})
self.assertEqual(actual, expected)
- @patch('bot.cogs.snekbox.Signals', side_effect=ValueError)
+ @patch('bot.exts.utils.snekbox.Signals', side_effect=ValueError)
def test_get_results_message_invalid_signal(self, mock_signals: Mock):
self.assertEqual(
self.cog.get_results_message({'stdout': '', 'returncode': 127}),
('Your eval job has completed with return code 127', '')
)
- @patch('bot.cogs.snekbox.Signals')
+ @patch('bot.exts.utils.snekbox.Signals')
def test_get_results_message_valid_signal(self, mock_signals: Mock):
mock_signals.return_value.name = 'SIGTEST'
self.assertEqual(
@@ -147,12 +124,12 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase):
('<!@', ("<!@\u200B", None), r'Convert <!@ to <!@\u200B'),
(
'\u202E\u202E\u202E',
- ('Code block escape attempt detected; will not output result', None),
+ ('Code block escape attempt detected; will not output result', 'https://testificate.com/'),
'Detect RIGHT-TO-LEFT OVERRIDE'
),
(
'\u200B\u200B\u200B',
- ('Code block escape attempt detected; will not output result', None),
+ ('Code block escape attempt detected; will not output result', 'https://testificate.com/'),
'Detect ZERO WIDTH SPACE'
),
('long\nbeard', ('001 | long\n002 | beard', None), 'Two line output'),
@@ -184,7 +161,7 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase):
self.cog.send_eval = AsyncMock(return_value=response)
self.cog.continue_eval = AsyncMock(return_value=None)
- await self.cog.eval_command.callback(self.cog, ctx=ctx, code='MyAwesomeCode')
+ await self.cog.eval_command(self.cog, ctx=ctx, code='MyAwesomeCode')
self.cog.prepare_input.assert_called_once_with('MyAwesomeCode')
self.cog.send_eval.assert_called_once_with(ctx, 'MyAwesomeFormattedCode')
self.cog.continue_eval.assert_called_once_with(ctx, response)
@@ -198,7 +175,7 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase):
self.cog.continue_eval = AsyncMock()
self.cog.continue_eval.side_effect = ('MyAwesomeCode-2', None)
- await self.cog.eval_command.callback(self.cog, ctx=ctx, code='MyAwesomeCode')
+ await self.cog.eval_command(self.cog, ctx=ctx, code='MyAwesomeCode')
self.cog.prepare_input.has_calls(call('MyAwesomeCode'), call('MyAwesomeCode-2'))
self.cog.send_eval.assert_called_with(ctx, 'MyAwesomeFormattedCode')
self.cog.continue_eval.assert_called_with(ctx, response)
@@ -210,7 +187,7 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase):
ctx.author.mention = '@LemonLemonishBeard#0042'
ctx.send = AsyncMock()
self.cog.jobs = (42,)
- await self.cog.eval_command.callback(self.cog, ctx=ctx, code='MyAwesomeCode')
+ await self.cog.eval_command(self.cog, ctx=ctx, code='MyAwesomeCode')
ctx.send.assert_called_once_with(
"@LemonLemonishBeard#0042 You've already got a job running - please wait for it to finish!"
)
@@ -218,8 +195,8 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase):
async def test_eval_command_call_help(self):
"""Test if the eval command call the help command if no code is provided."""
ctx = MockContext(command="sentinel")
- await self.cog.eval_command.callback(self.cog, ctx=ctx, code='')
- ctx.send_help.assert_called_once_with("sentinel")
+ await self.cog.eval_command(self.cog, ctx=ctx, code='')
+ ctx.send_help.assert_called_once_with(ctx.command)
async def test_send_eval(self):
"""Test the send_eval function."""
@@ -233,9 +210,13 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase):
self.cog.get_status_emoji = MagicMock(return_value=':yay!:')
self.cog.format_output = AsyncMock(return_value=('[No output]', None))
+ mocked_filter_cog = MagicMock()
+ mocked_filter_cog.filter_eval = AsyncMock(return_value=False)
+ self.bot.get_cog.return_value = mocked_filter_cog
+
await self.cog.send_eval(ctx, 'MyAwesomeCode')
ctx.send.assert_called_once_with(
- '@LemonLemonishBeard#0042 :yay!: Return code 0.\n\n```py\n[No output]\n```'
+ '@LemonLemonishBeard#0042 :yay!: Return code 0.\n\n```\n[No output]\n```'
)
self.cog.post_eval.assert_called_once_with('MyAwesomeCode')
self.cog.get_status_emoji.assert_called_once_with({'stdout': '', 'returncode': 0})
@@ -254,10 +235,14 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase):
self.cog.get_status_emoji = MagicMock(return_value=':yay!:')
self.cog.format_output = AsyncMock(return_value=('Way too long beard', 'lookatmybeard.com'))
+ mocked_filter_cog = MagicMock()
+ mocked_filter_cog.filter_eval = AsyncMock(return_value=False)
+ self.bot.get_cog.return_value = mocked_filter_cog
+
await self.cog.send_eval(ctx, 'MyAwesomeCode')
ctx.send.assert_called_once_with(
'@LemonLemonishBeard#0042 :yay!: Return code 0.'
- '\n\n```py\nWay too long beard\n```\nFull output: lookatmybeard.com'
+ '\n\n```\nWay too long beard\n```\nFull output: lookatmybeard.com'
)
self.cog.post_eval.assert_called_once_with('MyAwesomeCode')
self.cog.get_status_emoji.assert_called_once_with({'stdout': 'Way too long beard', 'returncode': 0})
@@ -275,16 +260,20 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase):
self.cog.get_status_emoji = MagicMock(return_value=':nope!:')
self.cog.format_output = AsyncMock() # This function isn't called
+ mocked_filter_cog = MagicMock()
+ mocked_filter_cog.filter_eval = AsyncMock(return_value=False)
+ self.bot.get_cog.return_value = mocked_filter_cog
+
await self.cog.send_eval(ctx, 'MyAwesomeCode')
ctx.send.assert_called_once_with(
- '@LemonLemonishBeard#0042 :nope!: Return code 127.\n\n```py\nBeard got stuck in the eval\n```'
+ '@LemonLemonishBeard#0042 :nope!: Return code 127.\n\n```\nBeard got stuck in the eval\n```'
)
self.cog.post_eval.assert_called_once_with('MyAwesomeCode')
self.cog.get_status_emoji.assert_called_once_with({'stdout': 'ERROR', 'returncode': 127})
self.cog.get_results_message.assert_called_once_with({'stdout': 'ERROR', 'returncode': 127})
self.cog.format_output.assert_not_called()
- @patch("bot.cogs.snekbox.partial")
+ @patch("bot.exts.utils.snekbox.partial")
async def test_continue_eval_does_continue(self, partial_mock):
"""Test that the continue_eval function does continue if required conditions are met."""
ctx = MockContext(message=MockMessage(add_reaction=AsyncMock(), clear_reactions=AsyncMock()))
@@ -308,7 +297,7 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase):
)
)
ctx.message.add_reaction.assert_called_once_with(snekbox.REEVAL_EMOJI)
- ctx.message.clear_reactions.assert_called_once()
+ ctx.message.clear_reaction.assert_called_once_with(snekbox.REEVAL_EMOJI)
response.delete.assert_called_once()
async def test_continue_eval_does_not_continue(self):
@@ -317,7 +306,7 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase):
actual = await self.cog.continue_eval(ctx, MockMessage())
self.assertEqual(actual, None)
- ctx.message.clear_reactions.assert_called_once()
+ ctx.message.clear_reaction.assert_called_once_with(snekbox.REEVAL_EMOJI)
async def test_get_code(self):
"""Should return 1st arg (or None) if eval cmd in message, otherwise return full content."""
diff --git a/tests/bot/test_pagination.py b/tests/bot/test_pagination.py
index ce880d457..630f2516d 100644
--- a/tests/bot/test_pagination.py
+++ b/tests/bot/test_pagination.py
@@ -44,18 +44,3 @@ class LinePaginatorTests(TestCase):
self.paginator.add_line('x' * (self.paginator.scale_to_size + 1))
# Note: item at index 1 is the truncated line, index 0 is prefix
self.assertEqual(self.paginator._current_page[1], 'x' * self.paginator.scale_to_size)
-
-
-class ImagePaginatorTests(TestCase):
- """Tests functionality of the `ImagePaginator`."""
-
- def setUp(self):
- """Create a paginator for the test method."""
- self.paginator = pagination.ImagePaginator()
-
- def test_add_image_appends_image(self):
- """`add_image` appends the image to the image list."""
- image = 'lemon'
- self.paginator.add_image(image)
-
- assert self.paginator.images == [image]
diff --git a/tests/bot/utils/test_checks.py b/tests/bot/utils/test_checks.py
index de72e5748..883465e0b 100644
--- a/tests/bot/utils/test_checks.py
+++ b/tests/bot/utils/test_checks.py
@@ -1,48 +1,50 @@
import unittest
from unittest.mock import MagicMock
+from discord import DMChannel
+
from bot.utils import checks
from bot.utils.checks import InWhitelistCheckFailure
from tests.helpers import MockContext, MockRole
-class ChecksTests(unittest.TestCase):
+class ChecksTests(unittest.IsolatedAsyncioTestCase):
"""Tests the check functions defined in `bot.checks`."""
def setUp(self):
self.ctx = MockContext()
- def test_with_role_check_without_guild(self):
- """`with_role_check` returns `False` if `Context.guild` is None."""
- self.ctx.guild = None
- self.assertFalse(checks.with_role_check(self.ctx))
+ async def test_has_any_role_check_without_guild(self):
+ """`has_any_role_check` returns `False` for non-guild channels."""
+ self.ctx.channel = MagicMock(DMChannel)
+ self.assertFalse(await checks.has_any_role_check(self.ctx))
- def test_with_role_check_without_required_roles(self):
- """`with_role_check` returns `False` if `Context.author` lacks the required role."""
+ async def test_has_any_role_check_without_required_roles(self):
+ """`has_any_role_check` returns `False` if `Context.author` lacks the required role."""
self.ctx.author.roles = []
- self.assertFalse(checks.with_role_check(self.ctx))
+ self.assertFalse(await checks.has_any_role_check(self.ctx))
- def test_with_role_check_with_guild_and_required_role(self):
- """`with_role_check` returns `True` if `Context.author` has the required role."""
+ async def test_has_any_role_check_with_guild_and_required_role(self):
+ """`has_any_role_check` returns `True` if `Context.author` has the required role."""
self.ctx.author.roles.append(MockRole(id=10))
- self.assertTrue(checks.with_role_check(self.ctx, 10))
+ self.assertTrue(await checks.has_any_role_check(self.ctx, 10))
- def test_without_role_check_without_guild(self):
- """`without_role_check` should return `False` when `Context.guild` is None."""
- self.ctx.guild = None
- self.assertFalse(checks.without_role_check(self.ctx))
+ async def test_has_no_roles_check_without_guild(self):
+ """`has_no_roles_check` should return `False` when `Context.guild` is None."""
+ self.ctx.channel = MagicMock(DMChannel)
+ self.assertFalse(await checks.has_no_roles_check(self.ctx))
- def test_without_role_check_returns_false_with_unwanted_role(self):
- """`without_role_check` returns `False` if `Context.author` has unwanted role."""
+ async def test_has_no_roles_check_returns_false_with_unwanted_role(self):
+ """`has_no_roles_check` returns `False` if `Context.author` has unwanted role."""
role_id = 42
self.ctx.author.roles.append(MockRole(id=role_id))
- self.assertFalse(checks.without_role_check(self.ctx, role_id))
+ self.assertFalse(await checks.has_no_roles_check(self.ctx, role_id))
- def test_without_role_check_returns_true_without_unwanted_role(self):
- """`without_role_check` returns `True` if `Context.author` does not have unwanted role."""
+ async def test_has_no_roles_check_returns_true_without_unwanted_role(self):
+ """`has_no_roles_check` returns `True` if `Context.author` does not have unwanted role."""
role_id = 42
self.ctx.author.roles.append(MockRole(id=role_id))
- self.assertTrue(checks.without_role_check(self.ctx, role_id + 10))
+ self.assertTrue(await checks.has_no_roles_check(self.ctx, role_id + 10))
def test_in_whitelist_check_correct_channel(self):
"""`in_whitelist_check` returns `True` if `Context.channel.id` is in the channel list."""
diff --git a/tests/bot/utils/test_redis_cache.py b/tests/bot/utils/test_redis_cache.py
deleted file mode 100644
index a2f0fe55d..000000000
--- a/tests/bot/utils/test_redis_cache.py
+++ /dev/null
@@ -1,265 +0,0 @@
-import asyncio
-import unittest
-
-import fakeredis.aioredis
-
-from bot.utils import RedisCache
-from bot.utils.redis_cache import NoBotInstanceError, NoNamespaceError, NoParentInstanceError
-from tests import helpers
-
-
-class RedisCacheTests(unittest.IsolatedAsyncioTestCase):
- """Tests the RedisCache class from utils.redis_dict.py."""
-
- async def asyncSetUp(self): # noqa: N802
- """Sets up the objects that only have to be initialized once."""
- self.bot = helpers.MockBot()
- self.bot.redis_session = await fakeredis.aioredis.create_redis_pool()
-
- # Okay, so this is necessary so that we can create a clean new
- # class for every test method, and we want that because it will
- # ensure we get a fresh loop, which is necessary for test_increment_lock
- # to be able to pass.
- class DummyCog:
- """A dummy cog, for dummies."""
-
- redis = RedisCache()
-
- def __init__(self, bot: helpers.MockBot):
- self.bot = bot
-
- self.cog = DummyCog(self.bot)
-
- await self.cog.redis.clear()
-
- def test_class_attribute_namespace(self):
- """Test that RedisDict creates a namespace automatically for class attributes."""
- self.assertEqual(self.cog.redis._namespace, "DummyCog.redis")
-
- async def test_class_attribute_required(self):
- """Test that errors are raised when not assigned as a class attribute."""
- bad_cache = RedisCache()
- self.assertIs(bad_cache._namespace, None)
-
- with self.assertRaises(RuntimeError):
- await bad_cache.set("test", "me_up_deadman")
-
- async def test_set_get_item(self):
- """Test that users can set and get items from the RedisDict."""
- test_cases = (
- ('favorite_fruit', 'melon'),
- ('favorite_number', 86),
- ('favorite_fraction', 86.54),
- ('favorite_boolean', False),
- ('other_boolean', True),
- )
-
- # Test that we can get and set different types.
- for test in test_cases:
- await self.cog.redis.set(*test)
- self.assertEqual(await self.cog.redis.get(test[0]), test[1])
-
- # Test that .get allows a default value
- self.assertEqual(await self.cog.redis.get('favorite_nothing', "bearclaw"), "bearclaw")
-
- async def test_set_item_type(self):
- """Test that .set rejects keys and values that are not permitted."""
- fruits = ["lemon", "melon", "apple"]
-
- with self.assertRaises(TypeError):
- await self.cog.redis.set(fruits, "nice")
-
- with self.assertRaises(TypeError):
- await self.cog.redis.set(4.23, "nice")
-
- async def test_delete_item(self):
- """Test that .delete allows us to delete stuff from the RedisCache."""
- # Add an item and verify that it gets added
- await self.cog.redis.set("internet", "firetruck")
- self.assertEqual(await self.cog.redis.get("internet"), "firetruck")
-
- # Delete that item and verify that it gets deleted
- await self.cog.redis.delete("internet")
- self.assertIs(await self.cog.redis.get("internet"), None)
-
- async def test_contains(self):
- """Test that we can check membership with .contains."""
- await self.cog.redis.set('favorite_country', "Burkina Faso")
-
- self.assertIs(await self.cog.redis.contains('favorite_country'), True)
- self.assertIs(await self.cog.redis.contains('favorite_dentist'), False)
-
- async def test_items(self):
- """Test that the RedisDict can be iterated."""
- # Set up our test cases in the Redis cache
- test_cases = [
- ('favorite_turtle', 'Donatello'),
- ('second_favorite_turtle', 'Leonardo'),
- ('third_favorite_turtle', 'Raphael'),
- ]
- for key, value in test_cases:
- await self.cog.redis.set(key, value)
-
- # Consume the AsyncIterator into a regular list, easier to compare that way.
- redis_items = [item for item in await self.cog.redis.items()]
-
- # These sequences are probably in the same order now, but probably
- # isn't good enough for tests. Let's not rely on .hgetall always
- # returning things in sequence, and just sort both lists to be safe.
- redis_items = sorted(redis_items)
- test_cases = sorted(test_cases)
-
- # If these are equal now, everything works fine.
- self.assertSequenceEqual(test_cases, redis_items)
-
- async def test_length(self):
- """Test that we can get the correct .length from the RedisDict."""
- await self.cog.redis.set('one', 1)
- await self.cog.redis.set('two', 2)
- await self.cog.redis.set('three', 3)
- self.assertEqual(await self.cog.redis.length(), 3)
-
- await self.cog.redis.set('four', 4)
- self.assertEqual(await self.cog.redis.length(), 4)
-
- async def test_to_dict(self):
- """Test that the .to_dict method returns a workable dictionary copy."""
- copy = await self.cog.redis.to_dict()
- local_copy = {key: value for key, value in await self.cog.redis.items()}
- self.assertIs(type(copy), dict)
- self.assertDictEqual(copy, local_copy)
-
- async def test_clear(self):
- """Test that the .clear method removes the entire hash."""
- await self.cog.redis.set('teddy', 'with me')
- await self.cog.redis.set('in my dreams', 'you have a weird hat')
- self.assertEqual(await self.cog.redis.length(), 2)
-
- await self.cog.redis.clear()
- self.assertEqual(await self.cog.redis.length(), 0)
-
- async def test_pop(self):
- """Test that we can .pop an item from the RedisDict."""
- await self.cog.redis.set('john', 'was afraid')
-
- self.assertEqual(await self.cog.redis.pop('john'), 'was afraid')
- self.assertEqual(await self.cog.redis.pop('pete', 'breakneck'), 'breakneck')
- self.assertEqual(await self.cog.redis.length(), 0)
-
- async def test_update(self):
- """Test that we can .update the RedisDict with multiple items."""
- await self.cog.redis.set("reckfried", "lona")
- await self.cog.redis.set("bel air", "prince")
- await self.cog.redis.update({
- "reckfried": "jona",
- "mega": "hungry, though",
- })
-
- result = {
- "reckfried": "jona",
- "bel air": "prince",
- "mega": "hungry, though",
- }
- self.assertDictEqual(await self.cog.redis.to_dict(), result)
-
- def test_typestring_conversion(self):
- """Test the typestring-related helper functions."""
- conversion_tests = (
- (12, "i|12"),
- (12.4, "f|12.4"),
- ("cowabunga", "s|cowabunga"),
- )
-
- # Test conversion to typestring
- for _input, expected in conversion_tests:
- self.assertEqual(self.cog.redis._value_to_typestring(_input), expected)
-
- # Test conversion from typestrings
- for _input, expected in conversion_tests:
- self.assertEqual(self.cog.redis._value_from_typestring(expected), _input)
-
- # Test that exceptions are raised on invalid input
- with self.assertRaises(TypeError):
- self.cog.redis._value_to_typestring(["internet"])
- self.cog.redis._value_from_typestring("o|firedog")
-
- async def test_increment_decrement(self):
- """Test .increment and .decrement methods."""
- await self.cog.redis.set("entropic", 5)
- await self.cog.redis.set("disentropic", 12.5)
-
- # Test default increment
- await self.cog.redis.increment("entropic")
- self.assertEqual(await self.cog.redis.get("entropic"), 6)
-
- # Test default decrement
- await self.cog.redis.decrement("entropic")
- self.assertEqual(await self.cog.redis.get("entropic"), 5)
-
- # Test float increment with float
- await self.cog.redis.increment("disentropic", 2.0)
- self.assertEqual(await self.cog.redis.get("disentropic"), 14.5)
-
- # Test float increment with int
- await self.cog.redis.increment("disentropic", 2)
- self.assertEqual(await self.cog.redis.get("disentropic"), 16.5)
-
- # Test negative increments, because why not.
- await self.cog.redis.increment("entropic", -5)
- self.assertEqual(await self.cog.redis.get("entropic"), 0)
-
- # Negative decrements? Sure.
- await self.cog.redis.decrement("entropic", -5)
- self.assertEqual(await self.cog.redis.get("entropic"), 5)
-
- # What about if we use a negative float to decrement an int?
- # This should convert the type into a float.
- await self.cog.redis.decrement("entropic", -2.5)
- self.assertEqual(await self.cog.redis.get("entropic"), 7.5)
-
- # Let's test that they raise the right errors
- with self.assertRaises(KeyError):
- await self.cog.redis.increment("doesn't_exist!")
-
- await self.cog.redis.set("stringthing", "stringthing")
- with self.assertRaises(TypeError):
- await self.cog.redis.increment("stringthing")
-
- async def test_increment_lock(self):
- """Test that we can't produce a race condition in .increment."""
- await self.cog.redis.set("test_key", 0)
- tasks = []
-
- # Increment this a lot in different tasks
- for _ in range(100):
- task = asyncio.create_task(
- self.cog.redis.increment("test_key", 1)
- )
- tasks.append(task)
- await asyncio.gather(*tasks)
-
- # Confirm that the value has been incremented the exact right number of times.
- value = await self.cog.redis.get("test_key")
- self.assertEqual(value, 100)
-
- async def test_exceptions_raised(self):
- """Testing that the various RuntimeErrors are reachable."""
- class MyCog:
- cache = RedisCache()
-
- def __init__(self):
- self.other_cache = RedisCache()
-
- cog = MyCog()
-
- # Raises "No Bot instance"
- with self.assertRaises(NoBotInstanceError):
- await cog.cache.get("john")
-
- # Raises "RedisCache has no namespace"
- with self.assertRaises(NoNamespaceError):
- await cog.other_cache.get("was")
-
- # Raises "You must access the RedisCache instance through the cog instance"
- with self.assertRaises(NoParentInstanceError):
- await MyCog.cache.get("afraid")
diff --git a/tests/bot/utils/test_services.py b/tests/bot/utils/test_services.py
new file mode 100644
index 000000000..5e0855704
--- /dev/null
+++ b/tests/bot/utils/test_services.py
@@ -0,0 +1,74 @@
+import logging
+import unittest
+from unittest.mock import AsyncMock, MagicMock, Mock, patch
+
+from aiohttp import ClientConnectorError
+
+from bot.utils.services import FAILED_REQUEST_ATTEMPTS, send_to_paste_service
+
+
+class PasteTests(unittest.IsolatedAsyncioTestCase):
+ def setUp(self) -> None:
+ self.http_session = MagicMock()
+
+ @patch("bot.utils.services.URLs.paste_service", "https://paste_service.com/{key}")
+ async def test_url_and_sent_contents(self):
+ """Correct url was used and post was called with expected data."""
+ response = MagicMock(
+ json=AsyncMock(return_value={"key": ""})
+ )
+ self.http_session.post().__aenter__.return_value = response
+ self.http_session.post.reset_mock()
+ await send_to_paste_service(self.http_session, "Content")
+ self.http_session.post.assert_called_once_with("https://paste_service.com/documents", data="Content")
+
+ @patch("bot.utils.services.URLs.paste_service", "https://paste_service.com/{key}")
+ async def test_paste_returns_correct_url_on_success(self):
+ """Url with specified extension is returned on successful requests."""
+ key = "paste_key"
+ test_cases = (
+ (f"https://paste_service.com/{key}.txt", "txt"),
+ (f"https://paste_service.com/{key}.py", "py"),
+ (f"https://paste_service.com/{key}", ""),
+ )
+ response = MagicMock(
+ json=AsyncMock(return_value={"key": key})
+ )
+ self.http_session.post().__aenter__.return_value = response
+
+ for expected_output, extension in test_cases:
+ with self.subTest(msg=f"Send contents with extension {repr(extension)}"):
+ self.assertEqual(
+ await send_to_paste_service(self.http_session, "", extension=extension),
+ expected_output
+ )
+
+ async def test_request_repeated_on_json_errors(self):
+ """Json with error message and invalid json are handled as errors and requests repeated."""
+ test_cases = ({"message": "error"}, {"unexpected_key": None}, {})
+ self.http_session.post().__aenter__.return_value = response = MagicMock()
+ self.http_session.post.reset_mock()
+
+ for error_json in test_cases:
+ with self.subTest(error_json=error_json):
+ response.json = AsyncMock(return_value=error_json)
+ result = await send_to_paste_service(self.http_session, "")
+ self.assertEqual(self.http_session.post.call_count, FAILED_REQUEST_ATTEMPTS)
+ self.assertIsNone(result)
+
+ self.http_session.post.reset_mock()
+
+ async def test_request_repeated_on_connection_errors(self):
+ """Requests are repeated in the case of connection errors."""
+ self.http_session.post = MagicMock(side_effect=ClientConnectorError(Mock(), Mock()))
+ result = await send_to_paste_service(self.http_session, "")
+ self.assertEqual(self.http_session.post.call_count, FAILED_REQUEST_ATTEMPTS)
+ self.assertIsNone(result)
+
+ async def test_general_error_handled_and_request_repeated(self):
+ """All `Exception`s are handled, logged and request repeated."""
+ self.http_session.post = MagicMock(side_effect=Exception)
+ result = await send_to_paste_service(self.http_session, "")
+ self.assertEqual(self.http_session.post.call_count, FAILED_REQUEST_ATTEMPTS)
+ self.assertLogs("bot.utils", logging.ERROR)
+ self.assertIsNone(result)
diff --git a/tests/helpers.py b/tests/helpers.py
index facc4e1af..870f66197 100644
--- a/tests/helpers.py
+++ b/tests/helpers.py
@@ -5,7 +5,7 @@ import itertools
import logging
import unittest.mock
from asyncio import AbstractEventLoop
-from typing import Callable, Iterable, Optional
+from typing import Iterable, Optional
import discord
from aiohttp import ClientSession
@@ -14,6 +14,7 @@ from discord.ext.commands import Context
from bot.api import APIClient
from bot.async_stats import AsyncStatsClient
from bot.bot import Bot
+from tests._autospec import autospec # noqa: F401 other modules import it via this module
for logger in logging.Logger.manager.loggerDict.values():
@@ -26,24 +27,6 @@ for logger in logging.Logger.manager.loggerDict.values():
logger.setLevel(logging.CRITICAL)
-def autospec(target, *attributes: str, **kwargs) -> Callable:
- """Patch multiple `attributes` of a `target` with autospecced mocks and `spec_set` as True."""
- # Caller's kwargs should take priority and overwrite the defaults.
- kwargs = {'spec_set': True, 'autospec': True, **kwargs}
-
- # Import the target if it's a string.
- # This is to support both object and string targets like patch.multiple.
- if type(target) is str:
- target = unittest.mock._importer(target)
-
- def decorator(func):
- for attribute in attributes:
- patcher = unittest.mock.patch.object(target, attribute, **kwargs)
- func = patcher(func)
- return func
- return decorator
-
-
class HashableMixin(discord.mixins.EqualityComparable):
"""
Mixin that provides similar hashing and equality functionality as discord.py's `Hashable` mixin.
@@ -308,7 +291,11 @@ class MockBot(CustomMockMixin, unittest.mock.MagicMock):
Instances of this class will follow the specifications of `discord.ext.commands.Bot` instances.
For more information, see the `MockGuild` docstring.
"""
- spec_set = Bot(command_prefix=unittest.mock.MagicMock(), loop=_get_mock_loop())
+ spec_set = Bot(
+ command_prefix=unittest.mock.MagicMock(),
+ loop=_get_mock_loop(),
+ redis_session=unittest.mock.MagicMock(),
+ )
additional_spec_asyncs = ("wait_for", "redis_ready")
def __init__(self, **kwargs) -> None: