diff options
Diffstat (limited to 'tests')
37 files changed, 2806 insertions, 1945 deletions
diff --git a/tests/_autospec.py b/tests/_autospec.py new file mode 100644 index 000000000..ee2fc1973 --- /dev/null +++ b/tests/_autospec.py @@ -0,0 +1,64 @@ +import contextlib +import functools +import unittest.mock +from typing import Callable + + [email protected](unittest.mock._patch.decoration_helper) +def _decoration_helper(self, patched, args, keywargs): + """Skips adding patchings as args if their `dont_pass` attribute is True.""" + # Don't ask what this does. It's just a copy from stdlib, but with the dont_pass check added. + extra_args = [] + with contextlib.ExitStack() as exit_stack: + for patching in patched.patchings: + arg = exit_stack.enter_context(patching) + if not getattr(patching, "dont_pass", False): + # Only add the patching as an arg if dont_pass is False. + if patching.attribute_name is not None: + keywargs.update(arg) + elif patching.new is unittest.mock.DEFAULT: + extra_args.append(arg) + + args += tuple(extra_args) + yield args, keywargs + + [email protected](unittest.mock._patch.copy) +def _copy(self): + """Copy the `dont_pass` attribute along with the standard copy operation.""" + patcher_copy = _copy.original(self) + patcher_copy.dont_pass = getattr(self, "dont_pass", False) + return patcher_copy + + +# Monkey-patch the patcher class :) +_copy.original = unittest.mock._patch.copy +unittest.mock._patch.copy = _copy +unittest.mock._patch.decoration_helper = _decoration_helper + + +def autospec(target, *attributes: str, pass_mocks: bool = True, **patch_kwargs) -> Callable: + """ + Patch multiple `attributes` of a `target` with autospecced mocks and `spec_set` as True. + + If `pass_mocks` is True, pass the autospecced mocks as arguments to the decorated object. + """ + # Caller's kwargs should take priority and overwrite the defaults. + kwargs = dict(spec_set=True, autospec=True) + kwargs.update(patch_kwargs) + + # Import the target if it's a string. + # This is to support both object and string targets like patch.multiple. + if type(target) is str: + target = unittest.mock._importer(target) + + def decorator(func): + for attribute in attributes: + patcher = unittest.mock.patch.object(target, attribute, **kwargs) + if not pass_mocks: + # A custom attribute to keep track of which patchings should be skipped. + patcher.dont_pass = True + func = patcher(func) + return func + return decorator diff --git a/tests/bot/cogs/moderation/test_infractions.py b/tests/bot/cogs/moderation/test_infractions.py deleted file mode 100644 index da4e92ccc..000000000 --- a/tests/bot/cogs/moderation/test_infractions.py +++ /dev/null @@ -1,55 +0,0 @@ -import textwrap -import unittest -from unittest.mock import AsyncMock, Mock, patch - -from bot.cogs.moderation.infractions import Infractions -from tests.helpers import MockBot, MockContext, MockGuild, MockMember, MockRole - - -class TruncationTests(unittest.IsolatedAsyncioTestCase): - """Tests for ban and kick command reason truncation.""" - - def setUp(self): - self.bot = MockBot() - self.cog = Infractions(self.bot) - self.user = MockMember(id=1234, top_role=MockRole(id=3577, position=10)) - self.target = MockMember(id=1265, top_role=MockRole(id=9876, position=0)) - self.guild = MockGuild(id=4567) - self.ctx = MockContext(bot=self.bot, author=self.user, guild=self.guild) - - @patch("bot.cogs.moderation.utils.get_active_infraction") - @patch("bot.cogs.moderation.utils.post_infraction") - async def test_apply_ban_reason_truncation(self, post_infraction_mock, get_active_mock): - """Should truncate reason for `ctx.guild.ban`.""" - get_active_mock.return_value = None - post_infraction_mock.return_value = {"foo": "bar"} - - self.cog.apply_infraction = AsyncMock() - self.bot.get_cog.return_value = AsyncMock() - self.cog.mod_log.ignore = Mock() - self.ctx.guild.ban = Mock() - - await self.cog.apply_ban(self.ctx, self.target, "foo bar" * 3000) - self.ctx.guild.ban.assert_called_once_with( - self.target, - reason=textwrap.shorten("foo bar" * 3000, 512, placeholder="..."), - delete_message_days=0 - ) - self.cog.apply_infraction.assert_awaited_once_with( - self.ctx, {"foo": "bar"}, self.target, self.ctx.guild.ban.return_value - ) - - @patch("bot.cogs.moderation.utils.post_infraction") - async def test_apply_kick_reason_truncation(self, post_infraction_mock): - """Should truncate reason for `Member.kick`.""" - post_infraction_mock.return_value = {"foo": "bar"} - - self.cog.apply_infraction = AsyncMock() - self.cog.mod_log.ignore = Mock() - self.target.kick = Mock() - - await self.cog.apply_kick(self.ctx, self.target, "foo bar" * 3000) - self.target.kick.assert_called_once_with(reason=textwrap.shorten("foo bar" * 3000, 512, placeholder="...")) - self.cog.apply_infraction.assert_awaited_once_with( - self.ctx, {"foo": "bar"}, self.target, self.target.kick.return_value - ) diff --git a/tests/bot/cogs/moderation/test_silence.py b/tests/bot/cogs/moderation/test_silence.py deleted file mode 100644 index ab3d0742a..000000000 --- a/tests/bot/cogs/moderation/test_silence.py +++ /dev/null @@ -1,261 +0,0 @@ -import unittest -from unittest import mock -from unittest.mock import MagicMock, Mock - -from discord import PermissionOverwrite - -from bot.cogs.moderation.silence import Silence, SilenceNotifier -from bot.constants import Channels, Emojis, Guild, Roles -from tests.helpers import MockBot, MockContext, MockTextChannel - - -class SilenceNotifierTests(unittest.IsolatedAsyncioTestCase): - def setUp(self) -> None: - self.alert_channel = MockTextChannel() - self.notifier = SilenceNotifier(self.alert_channel) - self.notifier.stop = self.notifier_stop_mock = Mock() - self.notifier.start = self.notifier_start_mock = Mock() - - def test_add_channel_adds_channel(self): - """Channel in FirstHash with current loop is added to internal set.""" - channel = Mock() - with mock.patch.object(self.notifier, "_silenced_channels") as silenced_channels: - self.notifier.add_channel(channel) - silenced_channels.__setitem__.assert_called_with(channel, self.notifier._current_loop) - - def test_add_channel_starts_loop(self): - """Loop is started if `_silenced_channels` was empty.""" - self.notifier.add_channel(Mock()) - self.notifier_start_mock.assert_called_once() - - def test_add_channel_skips_start_with_channels(self): - """Loop start is not called when `_silenced_channels` is not empty.""" - with mock.patch.object(self.notifier, "_silenced_channels"): - self.notifier.add_channel(Mock()) - self.notifier_start_mock.assert_not_called() - - def test_remove_channel_removes_channel(self): - """Channel in FirstHash is removed from `_silenced_channels`.""" - channel = Mock() - with mock.patch.object(self.notifier, "_silenced_channels") as silenced_channels: - self.notifier.remove_channel(channel) - silenced_channels.__delitem__.assert_called_with(channel) - - def test_remove_channel_stops_loop(self): - """Notifier loop is stopped if `_silenced_channels` is empty after remove.""" - with mock.patch.object(self.notifier, "_silenced_channels", __bool__=lambda _: False): - self.notifier.remove_channel(Mock()) - self.notifier_stop_mock.assert_called_once() - - def test_remove_channel_skips_stop_with_channels(self): - """Notifier loop is not stopped if `_silenced_channels` is not empty after remove.""" - self.notifier.remove_channel(Mock()) - self.notifier_stop_mock.assert_not_called() - - async def test_notifier_private_sends_alert(self): - """Alert is sent on 15 min intervals.""" - test_cases = (900, 1800, 2700) - for current_loop in test_cases: - with self.subTest(current_loop=current_loop): - with mock.patch.object(self.notifier, "_current_loop", new=current_loop): - await self.notifier._notifier() - self.alert_channel.send.assert_called_once_with(f"<@&{Roles.moderators}> currently silenced channels: ") - self.alert_channel.send.reset_mock() - - async def test_notifier_skips_alert(self): - """Alert is skipped on first loop or not an increment of 900.""" - test_cases = (0, 15, 5000) - for current_loop in test_cases: - with self.subTest(current_loop=current_loop): - with mock.patch.object(self.notifier, "_current_loop", new=current_loop): - await self.notifier._notifier() - self.alert_channel.send.assert_not_called() - - -class SilenceTests(unittest.IsolatedAsyncioTestCase): - def setUp(self) -> None: - self.bot = MockBot() - self.cog = Silence(self.bot) - self.ctx = MockContext() - self.cog._verified_role = None - # Set event so command callbacks can continue. - self.cog._get_instance_vars_event.set() - - async def test_instance_vars_got_guild(self): - """Bot got guild after it became available.""" - await self.cog._get_instance_vars() - self.bot.wait_until_guild_available.assert_called_once() - self.bot.get_guild.assert_called_once_with(Guild.id) - - async def test_instance_vars_got_role(self): - """Got `Roles.verified` role from guild.""" - await self.cog._get_instance_vars() - guild = self.bot.get_guild() - guild.get_role.assert_called_once_with(Roles.verified) - - async def test_instance_vars_got_channels(self): - """Got channels from bot.""" - await self.cog._get_instance_vars() - self.bot.get_channel.called_once_with(Channels.mod_alerts) - self.bot.get_channel.called_once_with(Channels.mod_log) - - @mock.patch("bot.cogs.moderation.silence.SilenceNotifier") - async def test_instance_vars_got_notifier(self, notifier): - """Notifier was started with channel.""" - mod_log = MockTextChannel() - self.bot.get_channel.side_effect = (None, mod_log) - await self.cog._get_instance_vars() - notifier.assert_called_once_with(mod_log) - self.bot.get_channel.side_effect = None - - async def test_silence_sent_correct_discord_message(self): - """Check if proper message was sent when called with duration in channel with previous state.""" - test_cases = ( - (0.0001, f"{Emojis.check_mark} silenced current channel for 0.0001 minute(s).", True,), - (None, f"{Emojis.check_mark} silenced current channel indefinitely.", True,), - (5, f"{Emojis.cross_mark} current channel is already silenced.", False,), - ) - for duration, result_message, _silence_patch_return in test_cases: - with self.subTest( - silence_duration=duration, - result_message=result_message, - starting_unsilenced_state=_silence_patch_return - ): - with mock.patch.object(self.cog, "_silence", return_value=_silence_patch_return): - await self.cog.silence.callback(self.cog, self.ctx, duration) - self.ctx.send.assert_called_once_with(result_message) - self.ctx.reset_mock() - - async def test_unsilence_sent_correct_discord_message(self): - """Check if proper message was sent when unsilencing channel.""" - test_cases = ( - (True, f"{Emojis.check_mark} unsilenced current channel."), - (False, f"{Emojis.cross_mark} current channel was not silenced.") - ) - for _unsilence_patch_return, result_message in test_cases: - with self.subTest( - starting_silenced_state=_unsilence_patch_return, - result_message=result_message - ): - with mock.patch.object(self.cog, "_unsilence", return_value=_unsilence_patch_return): - await self.cog.unsilence.callback(self.cog, self.ctx) - self.ctx.send.assert_called_once_with(result_message) - self.ctx.reset_mock() - - async def test_silence_private_for_false(self): - """Permissions are not set and `False` is returned in an already silenced channel.""" - perm_overwrite = Mock(send_messages=False) - channel = Mock(overwrites_for=Mock(return_value=perm_overwrite)) - - self.assertFalse(await self.cog._silence(channel, True, None)) - channel.set_permissions.assert_not_called() - - async def test_silence_private_silenced_channel(self): - """Channel had `send_message` permissions revoked.""" - channel = MockTextChannel() - self.assertTrue(await self.cog._silence(channel, False, None)) - channel.set_permissions.assert_called_once() - self.assertFalse(channel.set_permissions.call_args.kwargs['send_messages']) - - async def test_silence_private_preserves_permissions(self): - """Previous permissions were preserved when channel was silenced.""" - channel = MockTextChannel() - # Set up mock channel permission state. - mock_permissions = PermissionOverwrite() - mock_permissions_dict = dict(mock_permissions) - channel.overwrites_for.return_value = mock_permissions - await self.cog._silence(channel, False, None) - new_permissions = channel.set_permissions.call_args.kwargs - # Remove 'send_messages' key because it got changed in the method. - del new_permissions['send_messages'] - del mock_permissions_dict['send_messages'] - self.assertDictEqual(mock_permissions_dict, new_permissions) - - async def test_silence_private_notifier(self): - """Channel should be added to notifier with `persistent` set to `True`, and the other way around.""" - channel = MockTextChannel() - with mock.patch.object(self.cog, "notifier", create=True): - with self.subTest(persistent=True): - await self.cog._silence(channel, True, None) - self.cog.notifier.add_channel.assert_called_once() - - with mock.patch.object(self.cog, "notifier", create=True): - with self.subTest(persistent=False): - await self.cog._silence(channel, False, None) - self.cog.notifier.add_channel.assert_not_called() - - async def test_silence_private_added_muted_channel(self): - """Channel was added to `muted_channels` on silence.""" - channel = MockTextChannel() - with mock.patch.object(self.cog, "muted_channels") as muted_channels: - await self.cog._silence(channel, False, None) - muted_channels.add.assert_called_once_with(channel) - - async def test_unsilence_private_for_false(self): - """Permissions are not set and `False` is returned in an unsilenced channel.""" - channel = Mock() - self.assertFalse(await self.cog._unsilence(channel)) - channel.set_permissions.assert_not_called() - - @mock.patch.object(Silence, "notifier", create=True) - async def test_unsilence_private_unsilenced_channel(self, _): - """Channel had `send_message` permissions restored""" - perm_overwrite = MagicMock(send_messages=False) - channel = MockTextChannel(overwrites_for=Mock(return_value=perm_overwrite)) - self.assertTrue(await self.cog._unsilence(channel)) - channel.set_permissions.assert_called_once() - self.assertIsNone(channel.set_permissions.call_args.kwargs['send_messages']) - - @mock.patch.object(Silence, "notifier", create=True) - async def test_unsilence_private_removed_notifier(self, notifier): - """Channel was removed from `notifier` on unsilence.""" - perm_overwrite = MagicMock(send_messages=False) - channel = MockTextChannel(overwrites_for=Mock(return_value=perm_overwrite)) - await self.cog._unsilence(channel) - notifier.remove_channel.assert_called_once_with(channel) - - @mock.patch.object(Silence, "notifier", create=True) - async def test_unsilence_private_removed_muted_channel(self, _): - """Channel was removed from `muted_channels` on unsilence.""" - perm_overwrite = MagicMock(send_messages=False) - channel = MockTextChannel(overwrites_for=Mock(return_value=perm_overwrite)) - with mock.patch.object(self.cog, "muted_channels") as muted_channels: - await self.cog._unsilence(channel) - muted_channels.discard.assert_called_once_with(channel) - - @mock.patch.object(Silence, "notifier", create=True) - async def test_unsilence_private_preserves_permissions(self, _): - """Previous permissions were preserved when channel was unsilenced.""" - channel = MockTextChannel() - # Set up mock channel permission state. - mock_permissions = PermissionOverwrite(send_messages=False) - mock_permissions_dict = dict(mock_permissions) - channel.overwrites_for.return_value = mock_permissions - await self.cog._unsilence(channel) - new_permissions = channel.set_permissions.call_args.kwargs - # Remove 'send_messages' key because it got changed in the method. - del new_permissions['send_messages'] - del mock_permissions_dict['send_messages'] - self.assertDictEqual(mock_permissions_dict, new_permissions) - - @mock.patch("bot.cogs.moderation.silence.asyncio") - @mock.patch.object(Silence, "_mod_alerts_channel", create=True) - def test_cog_unload_starts_task(self, alert_channel, asyncio_mock): - """Task for sending an alert was created with present `muted_channels`.""" - with mock.patch.object(self.cog, "muted_channels"): - self.cog.cog_unload() - alert_channel.send.assert_called_once_with(f"<@&{Roles.moderators}> channels left silenced on cog unload: ") - asyncio_mock.create_task.assert_called_once_with(alert_channel.send()) - - @mock.patch("bot.cogs.moderation.silence.asyncio") - def test_cog_unload_skips_task_start(self, asyncio_mock): - """No task created with no channels.""" - self.cog.cog_unload() - asyncio_mock.create_task.assert_not_called() - - @mock.patch("bot.cogs.moderation.silence.with_role_check") - @mock.patch("bot.cogs.moderation.silence.MODERATION_ROLES", new=(1, 2, 3)) - def test_cog_check(self, role_check): - """Role check is called with `MODERATION_ROLES`""" - self.cog.cog_check(self.ctx) - role_check.assert_called_once_with(self.ctx, *(1, 2, 3)) diff --git a/tests/bot/cogs/sync/test_base.py b/tests/bot/cogs/sync/test_base.py deleted file mode 100644 index 70aea2bab..000000000 --- a/tests/bot/cogs/sync/test_base.py +++ /dev/null @@ -1,404 +0,0 @@ -import asyncio -import unittest -from unittest import mock - -import discord - -from bot import constants -from bot.api import ResponseCodeError -from bot.cogs.sync.syncers import Syncer, _Diff -from tests import helpers - - -class TestSyncer(Syncer): - """Syncer subclass with mocks for abstract methods for testing purposes.""" - - name = "test" - _get_diff = mock.AsyncMock() - _sync = mock.AsyncMock() - - -class SyncerBaseTests(unittest.TestCase): - """Tests for the syncer base class.""" - - def setUp(self): - self.bot = helpers.MockBot() - - def test_instantiation_fails_without_abstract_methods(self): - """The class must have abstract methods implemented.""" - with self.assertRaisesRegex(TypeError, "Can't instantiate abstract class"): - Syncer(self.bot) - - -class SyncerSendPromptTests(unittest.IsolatedAsyncioTestCase): - """Tests for sending the sync confirmation prompt.""" - - def setUp(self): - self.bot = helpers.MockBot() - self.syncer = TestSyncer(self.bot) - - def mock_get_channel(self): - """Fixture to return a mock channel and message for when `get_channel` is used.""" - self.bot.reset_mock() - - mock_channel = helpers.MockTextChannel() - mock_message = helpers.MockMessage() - - mock_channel.send.return_value = mock_message - self.bot.get_channel.return_value = mock_channel - - return mock_channel, mock_message - - def mock_fetch_channel(self): - """Fixture to return a mock channel and message for when `fetch_channel` is used.""" - self.bot.reset_mock() - - mock_channel = helpers.MockTextChannel() - mock_message = helpers.MockMessage() - - self.bot.get_channel.return_value = None - mock_channel.send.return_value = mock_message - self.bot.fetch_channel.return_value = mock_channel - - return mock_channel, mock_message - - async def test_send_prompt_edits_and_returns_message(self): - """The given message should be edited to display the prompt and then should be returned.""" - msg = helpers.MockMessage() - ret_val = await self.syncer._send_prompt(msg) - - msg.edit.assert_called_once() - self.assertIn("content", msg.edit.call_args[1]) - self.assertEqual(ret_val, msg) - - async def test_send_prompt_gets_dev_core_channel(self): - """The dev-core channel should be retrieved if an extant message isn't given.""" - subtests = ( - (self.bot.get_channel, self.mock_get_channel), - (self.bot.fetch_channel, self.mock_fetch_channel), - ) - - for method, mock_ in subtests: - with self.subTest(method=method, msg=mock_.__name__): - mock_() - await self.syncer._send_prompt() - - method.assert_called_once_with(constants.Channels.dev_core) - - async def test_send_prompt_returns_none_if_channel_fetch_fails(self): - """None should be returned if there's an HTTPException when fetching the channel.""" - self.bot.get_channel.return_value = None - self.bot.fetch_channel.side_effect = discord.HTTPException(mock.MagicMock(), "test error!") - - ret_val = await self.syncer._send_prompt() - - self.assertIsNone(ret_val) - - async def test_send_prompt_sends_and_returns_new_message_if_not_given(self): - """A new message mentioning core devs should be sent and returned if message isn't given.""" - for mock_ in (self.mock_get_channel, self.mock_fetch_channel): - with self.subTest(msg=mock_.__name__): - mock_channel, mock_message = mock_() - ret_val = await self.syncer._send_prompt() - - mock_channel.send.assert_called_once() - self.assertIn(self.syncer._CORE_DEV_MENTION, mock_channel.send.call_args[0][0]) - self.assertEqual(ret_val, mock_message) - - async def test_send_prompt_adds_reactions(self): - """The message should have reactions for confirmation added.""" - extant_message = helpers.MockMessage() - subtests = ( - (extant_message, lambda: (None, extant_message)), - (None, self.mock_get_channel), - (None, self.mock_fetch_channel), - ) - - for message_arg, mock_ in subtests: - subtest_msg = "Extant message" if mock_.__name__ == "<lambda>" else mock_.__name__ - - with self.subTest(msg=subtest_msg): - _, mock_message = mock_() - await self.syncer._send_prompt(message_arg) - - calls = [mock.call(emoji) for emoji in self.syncer._REACTION_EMOJIS] - mock_message.add_reaction.assert_has_calls(calls) - - -class SyncerConfirmationTests(unittest.IsolatedAsyncioTestCase): - """Tests for waiting for a sync confirmation reaction on the prompt.""" - - def setUp(self): - self.bot = helpers.MockBot() - self.syncer = TestSyncer(self.bot) - self.core_dev_role = helpers.MockRole(id=constants.Roles.core_developers) - - @staticmethod - def get_message_reaction(emoji): - """Fixture to return a mock message an reaction from the given `emoji`.""" - message = helpers.MockMessage() - reaction = helpers.MockReaction(emoji=emoji, message=message) - - return message, reaction - - def test_reaction_check_for_valid_emoji_and_authors(self): - """Should return True if authors are identical or are a bot and a core dev, respectively.""" - user_subtests = ( - ( - helpers.MockMember(id=77), - helpers.MockMember(id=77), - "identical users", - ), - ( - helpers.MockMember(id=77, bot=True), - helpers.MockMember(id=43, roles=[self.core_dev_role]), - "bot author and core-dev reactor", - ), - ) - - for emoji in self.syncer._REACTION_EMOJIS: - for author, user, msg in user_subtests: - with self.subTest(author=author, user=user, emoji=emoji, msg=msg): - message, reaction = self.get_message_reaction(emoji) - ret_val = self.syncer._reaction_check(author, message, reaction, user) - - self.assertTrue(ret_val) - - def test_reaction_check_for_invalid_reactions(self): - """Should return False for invalid reaction events.""" - valid_emoji = self.syncer._REACTION_EMOJIS[0] - subtests = ( - ( - helpers.MockMember(id=77), - *self.get_message_reaction(valid_emoji), - helpers.MockMember(id=43, roles=[self.core_dev_role]), - "users are not identical", - ), - ( - helpers.MockMember(id=77, bot=True), - *self.get_message_reaction(valid_emoji), - helpers.MockMember(id=43), - "reactor lacks the core-dev role", - ), - ( - helpers.MockMember(id=77, bot=True, roles=[self.core_dev_role]), - *self.get_message_reaction(valid_emoji), - helpers.MockMember(id=77, bot=True, roles=[self.core_dev_role]), - "reactor is a bot", - ), - ( - helpers.MockMember(id=77), - helpers.MockMessage(id=95), - helpers.MockReaction(emoji=valid_emoji, message=helpers.MockMessage(id=26)), - helpers.MockMember(id=77), - "messages are not identical", - ), - ( - helpers.MockMember(id=77), - *self.get_message_reaction("InVaLiD"), - helpers.MockMember(id=77), - "emoji is invalid", - ), - ) - - for *args, msg in subtests: - kwargs = dict(zip(("author", "message", "reaction", "user"), args)) - with self.subTest(**kwargs, msg=msg): - ret_val = self.syncer._reaction_check(*args) - self.assertFalse(ret_val) - - async def test_wait_for_confirmation(self): - """The message should always be edited and only return True if the emoji is a check mark.""" - subtests = ( - (constants.Emojis.check_mark, True, None), - ("InVaLiD", False, None), - (None, False, asyncio.TimeoutError), - ) - - for emoji, ret_val, side_effect in subtests: - for bot in (True, False): - with self.subTest(emoji=emoji, ret_val=ret_val, side_effect=side_effect, bot=bot): - # Set up mocks - message = helpers.MockMessage() - member = helpers.MockMember(bot=bot) - - self.bot.wait_for.reset_mock() - self.bot.wait_for.return_value = (helpers.MockReaction(emoji=emoji), None) - self.bot.wait_for.side_effect = side_effect - - # Call the function - actual_return = await self.syncer._wait_for_confirmation(member, message) - - # Perform assertions - self.bot.wait_for.assert_called_once() - self.assertIn("reaction_add", self.bot.wait_for.call_args[0]) - - message.edit.assert_called_once() - kwargs = message.edit.call_args[1] - self.assertIn("content", kwargs) - - # Core devs should only be mentioned if the author is a bot. - if bot: - self.assertIn(self.syncer._CORE_DEV_MENTION, kwargs["content"]) - else: - self.assertNotIn(self.syncer._CORE_DEV_MENTION, kwargs["content"]) - - self.assertIs(actual_return, ret_val) - - -class SyncerSyncTests(unittest.IsolatedAsyncioTestCase): - """Tests for main function orchestrating the sync.""" - - def setUp(self): - self.bot = helpers.MockBot(user=helpers.MockMember(bot=True)) - self.syncer = TestSyncer(self.bot) - - async def test_sync_respects_confirmation_result(self): - """The sync should abort if confirmation fails and continue if confirmed.""" - mock_message = helpers.MockMessage() - subtests = ( - (True, mock_message), - (False, None), - ) - - for confirmed, message in subtests: - with self.subTest(confirmed=confirmed): - self.syncer._sync.reset_mock() - self.syncer._get_diff.reset_mock() - - diff = _Diff({1, 2, 3}, {4, 5}, None) - self.syncer._get_diff.return_value = diff - self.syncer._get_confirmation_result = mock.AsyncMock( - return_value=(confirmed, message) - ) - - guild = helpers.MockGuild() - await self.syncer.sync(guild) - - self.syncer._get_diff.assert_called_once_with(guild) - self.syncer._get_confirmation_result.assert_called_once() - - if confirmed: - self.syncer._sync.assert_called_once_with(diff) - else: - self.syncer._sync.assert_not_called() - - async def test_sync_diff_size(self): - """The diff size should be correctly calculated.""" - subtests = ( - (6, _Diff({1, 2}, {3, 4}, {5, 6})), - (5, _Diff({1, 2, 3}, None, {4, 5})), - (0, _Diff(None, None, None)), - (0, _Diff(set(), set(), set())), - ) - - for size, diff in subtests: - with self.subTest(size=size, diff=diff): - self.syncer._get_diff.reset_mock() - self.syncer._get_diff.return_value = diff - self.syncer._get_confirmation_result = mock.AsyncMock(return_value=(False, None)) - - guild = helpers.MockGuild() - await self.syncer.sync(guild) - - self.syncer._get_diff.assert_called_once_with(guild) - self.syncer._get_confirmation_result.assert_called_once() - self.assertEqual(self.syncer._get_confirmation_result.call_args[0][0], size) - - async def test_sync_message_edited(self): - """The message should be edited if one was sent, even if the sync has an API error.""" - subtests = ( - (None, None, False), - (helpers.MockMessage(), None, True), - (helpers.MockMessage(), ResponseCodeError(mock.MagicMock()), True), - ) - - for message, side_effect, should_edit in subtests: - with self.subTest(message=message, side_effect=side_effect, should_edit=should_edit): - self.syncer._sync.side_effect = side_effect - self.syncer._get_confirmation_result = mock.AsyncMock( - return_value=(True, message) - ) - - guild = helpers.MockGuild() - await self.syncer.sync(guild) - - if should_edit: - message.edit.assert_called_once() - self.assertIn("content", message.edit.call_args[1]) - - async def test_sync_confirmation_context_redirect(self): - """If ctx is given, a new message should be sent and author should be ctx's author.""" - mock_member = helpers.MockMember() - subtests = ( - (None, self.bot.user, None), - (helpers.MockContext(author=mock_member), mock_member, helpers.MockMessage()), - ) - - for ctx, author, message in subtests: - with self.subTest(ctx=ctx, author=author, message=message): - if ctx is not None: - ctx.send.return_value = message - - # Make sure `_get_diff` returns a MagicMock, not an AsyncMock - self.syncer._get_diff.return_value = mock.MagicMock() - - self.syncer._get_confirmation_result = mock.AsyncMock(return_value=(False, None)) - - guild = helpers.MockGuild() - await self.syncer.sync(guild, ctx) - - if ctx is not None: - ctx.send.assert_called_once() - - self.syncer._get_confirmation_result.assert_called_once() - self.assertEqual(self.syncer._get_confirmation_result.call_args[0][1], author) - self.assertEqual(self.syncer._get_confirmation_result.call_args[0][2], message) - - @mock.patch.object(constants.Sync, "max_diff", new=3) - async def test_confirmation_result_small_diff(self): - """Should always return True and the given message if the diff size is too small.""" - author = helpers.MockMember() - expected_message = helpers.MockMessage() - - for size in (3, 2): # pragma: no cover - with self.subTest(size=size): - self.syncer._send_prompt = mock.AsyncMock() - self.syncer._wait_for_confirmation = mock.AsyncMock() - - coro = self.syncer._get_confirmation_result(size, author, expected_message) - result, actual_message = await coro - - self.assertTrue(result) - self.assertEqual(actual_message, expected_message) - self.syncer._send_prompt.assert_not_called() - self.syncer._wait_for_confirmation.assert_not_called() - - @mock.patch.object(constants.Sync, "max_diff", new=3) - async def test_confirmation_result_large_diff(self): - """Should return True if confirmed and False if _send_prompt fails or aborted.""" - author = helpers.MockMember() - mock_message = helpers.MockMessage() - - subtests = ( - (True, mock_message, True, "confirmed"), - (False, None, False, "_send_prompt failed"), - (False, mock_message, False, "aborted"), - ) - - for expected_result, expected_message, confirmed, msg in subtests: # pragma: no cover - with self.subTest(msg=msg): - self.syncer._send_prompt = mock.AsyncMock(return_value=expected_message) - self.syncer._wait_for_confirmation = mock.AsyncMock(return_value=confirmed) - - coro = self.syncer._get_confirmation_result(4, author) - actual_result, actual_message = await coro - - self.syncer._send_prompt.assert_called_once_with(None) # message defaults to None - self.assertIs(actual_result, expected_result) - self.assertEqual(actual_message, expected_message) - - if expected_message: - self.syncer._wait_for_confirmation.assert_called_once_with( - author, expected_message - ) diff --git a/tests/bot/cogs/test_duck_pond.py b/tests/bot/cogs/test_duck_pond.py deleted file mode 100644 index a8c0107c6..000000000 --- a/tests/bot/cogs/test_duck_pond.py +++ /dev/null @@ -1,575 +0,0 @@ -import asyncio -import logging -import typing -import unittest -from unittest.mock import AsyncMock, MagicMock, patch - -import discord - -from bot import constants -from bot.cogs import duck_pond -from tests import base -from tests import helpers - -MODULE_PATH = "bot.cogs.duck_pond" - - -class DuckPondTests(base.LoggingTestsMixin, unittest.IsolatedAsyncioTestCase): - """Tests for DuckPond functionality.""" - - @classmethod - def setUpClass(cls): - """Sets up the objects that only have to be initialized once.""" - cls.nonstaff_member = helpers.MockMember(name="Non-staffer") - - cls.staff_role = helpers.MockRole(name="Staff role", id=constants.STAFF_ROLES[0]) - cls.staff_member = helpers.MockMember(name="staffer", roles=[cls.staff_role]) - - cls.checkmark_emoji = "\N{White Heavy Check Mark}" - cls.thumbs_up_emoji = "\N{Thumbs Up Sign}" - cls.unicode_duck_emoji = "\N{Duck}" - cls.duck_pond_emoji = helpers.MockPartialEmoji(id=constants.DuckPond.custom_emojis[0]) - cls.non_duck_custom_emoji = helpers.MockPartialEmoji(id=123) - - def setUp(self): - """Sets up the objects that need to be refreshed before each test.""" - self.bot = helpers.MockBot(user=helpers.MockMember(id=46692)) - self.cog = duck_pond.DuckPond(bot=self.bot) - - def test_duck_pond_correctly_initializes(self): - """`__init__ should set `bot` and `webhook_id` attributes and schedule `fetch_webhook`.""" - bot = helpers.MockBot() - cog = MagicMock() - - duck_pond.DuckPond.__init__(cog, bot) - - self.assertEqual(cog.bot, bot) - self.assertEqual(cog.webhook_id, constants.Webhooks.duck_pond) - bot.loop.create_task.assert_called_once_with(cog.fetch_webhook()) - - def test_fetch_webhook_succeeds_without_connectivity_issues(self): - """The `fetch_webhook` method waits until `READY` event and sets the `webhook` attribute.""" - self.bot.fetch_webhook.return_value = "dummy webhook" - self.cog.webhook_id = 1 - - asyncio.run(self.cog.fetch_webhook()) - - self.bot.wait_until_guild_available.assert_called_once() - self.bot.fetch_webhook.assert_called_once_with(1) - self.assertEqual(self.cog.webhook, "dummy webhook") - - def test_fetch_webhook_logs_when_unable_to_fetch_webhook(self): - """The `fetch_webhook` method should log an exception when it fails to fetch the webhook.""" - self.bot.fetch_webhook.side_effect = discord.HTTPException(response=MagicMock(), message="Not found.") - self.cog.webhook_id = 1 - - log = logging.getLogger('bot.cogs.duck_pond') - with self.assertLogs(logger=log, level=logging.ERROR) as log_watcher: - asyncio.run(self.cog.fetch_webhook()) - - self.bot.wait_until_guild_available.assert_called_once() - self.bot.fetch_webhook.assert_called_once_with(1) - - self.assertEqual(len(log_watcher.records), 1) - - record = log_watcher.records[0] - self.assertEqual(record.levelno, logging.ERROR) - - def test_is_staff_returns_correct_values_based_on_instance_passed(self): - """The `is_staff` method should return correct values based on the instance passed.""" - test_cases = ( - (helpers.MockUser(name="User instance"), False), - (helpers.MockMember(name="Member instance without staff role"), False), - (helpers.MockMember(name="Member instance with staff role", roles=[self.staff_role]), True) - ) - - for user, expected_return in test_cases: - actual_return = self.cog.is_staff(user) - with self.subTest(user_type=user.name, expected_return=expected_return, actual_return=actual_return): - self.assertEqual(expected_return, actual_return) - - async def test_has_green_checkmark_correctly_detects_presence_of_green_checkmark_emoji(self): - """The `has_green_checkmark` method should only return `True` if one is present.""" - test_cases = ( - ( - "No reactions", helpers.MockMessage(), False - ), - ( - "No green check mark reactions", - helpers.MockMessage(reactions=[ - helpers.MockReaction(emoji=self.unicode_duck_emoji, users=[self.bot.user]), - helpers.MockReaction(emoji=self.thumbs_up_emoji, users=[self.bot.user]) - ]), - False - ), - ( - "Green check mark reaction, but not from our bot", - helpers.MockMessage(reactions=[ - helpers.MockReaction(emoji=self.unicode_duck_emoji, users=[self.bot.user]), - helpers.MockReaction(emoji=self.checkmark_emoji, users=[self.staff_member]) - ]), - False - ), - ( - "Green check mark reaction, with one from the bot", - helpers.MockMessage(reactions=[ - helpers.MockReaction(emoji=self.unicode_duck_emoji, users=[self.bot.user]), - helpers.MockReaction(emoji=self.checkmark_emoji, users=[self.staff_member, self.bot.user]) - ]), - True - ) - ) - - for description, message, expected_return in test_cases: - actual_return = await self.cog.has_green_checkmark(message) - with self.subTest( - test_case=description, - expected_return=expected_return, - actual_return=actual_return - ): - self.assertEqual(expected_return, actual_return) - - def test_send_webhook_correctly_passes_on_arguments(self): - """The `send_webhook` method should pass the arguments to the webhook correctly.""" - self.cog.webhook = helpers.MockAsyncWebhook() - - content = "fake content" - username = "fake username" - avatar_url = "fake avatar_url" - embed = "fake embed" - - asyncio.run(self.cog.send_webhook(content, username, avatar_url, embed)) - - self.cog.webhook.send.assert_called_once_with( - content=content, - username=username, - avatar_url=avatar_url, - embed=embed - ) - - def test_send_webhook_logs_when_sending_message_fails(self): - """The `send_webhook` method should catch a `discord.HTTPException` and log accordingly.""" - self.cog.webhook = helpers.MockAsyncWebhook() - self.cog.webhook.send.side_effect = discord.HTTPException(response=MagicMock(), message="Something failed.") - - log = logging.getLogger('bot.cogs.duck_pond') - with self.assertLogs(logger=log, level=logging.ERROR) as log_watcher: - asyncio.run(self.cog.send_webhook()) - - self.assertEqual(len(log_watcher.records), 1) - - record = log_watcher.records[0] - self.assertEqual(record.levelno, logging.ERROR) - - def _get_reaction( - self, - emoji: typing.Union[str, helpers.MockEmoji], - staff: int = 0, - nonstaff: int = 0 - ) -> helpers.MockReaction: - staffers = [helpers.MockMember(roles=[self.staff_role]) for _ in range(staff)] - nonstaffers = [helpers.MockMember() for _ in range(nonstaff)] - return helpers.MockReaction(emoji=emoji, users=staffers + nonstaffers) - - async def test_count_ducks_correctly_counts_the_number_of_eligible_duck_emojis(self): - """The `count_ducks` method should return the number of unique staffers who gave a duck.""" - test_cases = ( - # Simple test cases - # A message without reactions should return 0 - ( - "No reactions", - helpers.MockMessage(), - 0 - ), - # A message with a non-duck reaction from a non-staffer should return 0 - ( - "Non-duck reaction from non-staffer", - helpers.MockMessage(reactions=[self._get_reaction(emoji=self.thumbs_up_emoji, nonstaff=1)]), - 0 - ), - # A message with a non-duck reaction from a staffer should return 0 - ( - "Non-duck reaction from staffer", - helpers.MockMessage(reactions=[self._get_reaction(emoji=self.non_duck_custom_emoji, staff=1)]), - 0 - ), - # A message with a non-duck reaction from a non-staffer and staffer should return 0 - ( - "Non-duck reaction from staffer + non-staffer", - helpers.MockMessage(reactions=[self._get_reaction(emoji=self.thumbs_up_emoji, staff=1, nonstaff=1)]), - 0 - ), - # A message with a unicode duck reaction from a non-staffer should return 0 - ( - "Unicode Duck Reaction from non-staffer", - helpers.MockMessage(reactions=[self._get_reaction(emoji=self.unicode_duck_emoji, nonstaff=1)]), - 0 - ), - # A message with a unicode duck reaction from a staffer should return 1 - ( - "Unicode Duck Reaction from staffer", - helpers.MockMessage(reactions=[self._get_reaction(emoji=self.unicode_duck_emoji, staff=1)]), - 1 - ), - # A message with a unicode duck reaction from a non-staffer and staffer should return 1 - ( - "Unicode Duck Reaction from staffer + non-staffer", - helpers.MockMessage(reactions=[self._get_reaction(emoji=self.unicode_duck_emoji, staff=1, nonstaff=1)]), - 1 - ), - # A message with a duckpond duck reaction from a non-staffer should return 0 - ( - "Duckpond Duck Reaction from non-staffer", - helpers.MockMessage(reactions=[self._get_reaction(emoji=self.duck_pond_emoji, nonstaff=1)]), - 0 - ), - # A message with a duckpond duck reaction from a staffer should return 1 - ( - "Duckpond Duck Reaction from staffer", - helpers.MockMessage(reactions=[self._get_reaction(emoji=self.duck_pond_emoji, staff=1)]), - 1 - ), - # A message with a duckpond duck reaction from a non-staffer and staffer should return 1 - ( - "Duckpond Duck Reaction from staffer + non-staffer", - helpers.MockMessage(reactions=[self._get_reaction(emoji=self.duck_pond_emoji, staff=1, nonstaff=1)]), - 1 - ), - - # Complex test cases - # A message with duckpond duck reactions from 3 staffers and 2 non-staffers returns 3 - ( - "Duckpond Duck Reaction from 3 staffers + 2 non-staffers", - helpers.MockMessage(reactions=[self._get_reaction(emoji=self.duck_pond_emoji, staff=3, nonstaff=2)]), - 3 - ), - # A staffer with multiple duck reactions only counts once - ( - "Two different duck reactions from the same staffer", - helpers.MockMessage( - reactions=[ - helpers.MockReaction(emoji=self.duck_pond_emoji, users=[self.staff_member]), - helpers.MockReaction(emoji=self.unicode_duck_emoji, users=[self.staff_member]), - ] - ), - 1 - ), - # A non-string emoji does not count (to test the `isinstance(reaction.emoji, str)` elif) - ( - "Reaction with non-Emoji/str emoij from 3 staffers + 2 non-staffers", - helpers.MockMessage(reactions=[self._get_reaction(emoji=100, staff=3, nonstaff=2)]), - 0 - ), - # We correctly sum when multiple reactions are provided. - ( - "Duckpond Duck Reaction from 3 staffers + 2 non-staffers", - helpers.MockMessage( - reactions=[ - self._get_reaction(emoji=self.duck_pond_emoji, staff=3, nonstaff=2), - self._get_reaction(emoji=self.unicode_duck_emoji, staff=4, nonstaff=9), - ] - ), - 3 + 4 - ), - ) - - for description, message, expected_count in test_cases: - actual_count = await self.cog.count_ducks(message) - with self.subTest(test_case=description, expected_count=expected_count, actual_count=actual_count): - self.assertEqual(expected_count, actual_count) - - async def test_relay_message_correctly_relays_content_and_attachments(self): - """The `relay_message` method should correctly relay message content and attachments.""" - send_webhook_path = f"{MODULE_PATH}.DuckPond.send_webhook" - send_attachments_path = f"{MODULE_PATH}.send_attachments" - - self.cog.webhook = helpers.MockAsyncWebhook() - - test_values = ( - (helpers.MockMessage(clean_content="", attachments=[]), False, False), - (helpers.MockMessage(clean_content="message", attachments=[]), True, False), - (helpers.MockMessage(clean_content="", attachments=["attachment"]), False, True), - (helpers.MockMessage(clean_content="message", attachments=["attachment"]), True, True), - ) - - for message, expect_webhook_call, expect_attachment_call in test_values: - with patch(send_webhook_path, new_callable=AsyncMock) as send_webhook: - with patch(send_attachments_path, new_callable=AsyncMock) as send_attachments: - with self.subTest(clean_content=message.clean_content, attachments=message.attachments): - await self.cog.relay_message(message) - - self.assertEqual(expect_webhook_call, send_webhook.called) - self.assertEqual(expect_attachment_call, send_attachments.called) - - message.add_reaction.assert_called_once_with(self.checkmark_emoji) - - @patch(f"{MODULE_PATH}.send_attachments", new_callable=AsyncMock) - async def test_relay_message_handles_irretrievable_attachment_exceptions(self, send_attachments): - """The `relay_message` method should handle irretrievable attachments.""" - message = helpers.MockMessage(clean_content="message", attachments=["attachment"]) - side_effects = (discord.errors.Forbidden(MagicMock(), ""), discord.errors.NotFound(MagicMock(), "")) - - self.cog.webhook = helpers.MockAsyncWebhook() - log = logging.getLogger("bot.cogs.duck_pond") - - for side_effect in side_effects: # pragma: no cover - send_attachments.side_effect = side_effect - with patch(f"{MODULE_PATH}.DuckPond.send_webhook", new_callable=AsyncMock) as send_webhook: - with self.subTest(side_effect=type(side_effect).__name__): - with self.assertNotLogs(logger=log, level=logging.ERROR): - await self.cog.relay_message(message) - - self.assertEqual(send_webhook.call_count, 2) - - @patch(f"{MODULE_PATH}.DuckPond.send_webhook", new_callable=AsyncMock) - @patch(f"{MODULE_PATH}.send_attachments", new_callable=AsyncMock) - async def test_relay_message_handles_attachment_http_error(self, send_attachments, send_webhook): - """The `relay_message` method should handle irretrievable attachments.""" - message = helpers.MockMessage(clean_content="message", attachments=["attachment"]) - - self.cog.webhook = helpers.MockAsyncWebhook() - log = logging.getLogger("bot.cogs.duck_pond") - - side_effect = discord.HTTPException(MagicMock(), "") - send_attachments.side_effect = side_effect - with self.subTest(side_effect=type(side_effect).__name__): - with self.assertLogs(logger=log, level=logging.ERROR) as log_watcher: - await self.cog.relay_message(message) - - send_webhook.assert_called_once_with( - content=message.clean_content, - username=message.author.display_name, - avatar_url=message.author.avatar_url - ) - - self.assertEqual(len(log_watcher.records), 1) - - record = log_watcher.records[0] - self.assertEqual(record.levelno, logging.ERROR) - - def _mock_payload(self, label: str, is_custom_emoji: bool, id_: int, emoji_name: str): - """Creates a mock `on_raw_reaction_add` payload with the specified emoji data.""" - payload = MagicMock(name=label) - payload.emoji.is_custom_emoji.return_value = is_custom_emoji - payload.emoji.id = id_ - payload.emoji.name = emoji_name - return payload - - async def test_payload_has_duckpond_emoji_correctly_detects_relevant_emojis(self): - """The `on_raw_reaction_add` event handler should ignore irrelevant emojis.""" - test_values = ( - # Custom Emojis - ( - self._mock_payload( - label="Custom Duckpond Emoji", - is_custom_emoji=True, - id_=constants.DuckPond.custom_emojis[0], - emoji_name="" - ), - True - ), - ( - self._mock_payload( - label="Custom Non-Duckpond Emoji", - is_custom_emoji=True, - id_=123, - emoji_name="" - ), - False - ), - # Unicode Emojis - ( - self._mock_payload( - label="Unicode Duck Emoji", - is_custom_emoji=False, - id_=1, - emoji_name=self.unicode_duck_emoji - ), - True - ), - ( - self._mock_payload( - label="Unicode Non-Duck Emoji", - is_custom_emoji=False, - id_=1, - emoji_name=self.thumbs_up_emoji - ), - False - ), - ) - - for payload, expected_return in test_values: - actual_return = self.cog._payload_has_duckpond_emoji(payload) - with self.subTest(case=payload._mock_name, expected_return=expected_return, actual_return=actual_return): - self.assertEqual(expected_return, actual_return) - - @patch(f"{MODULE_PATH}.discord.utils.get") - @patch(f"{MODULE_PATH}.DuckPond._payload_has_duckpond_emoji", new=MagicMock(return_value=False)) - def test_on_raw_reaction_add_returns_early_with_payload_without_duck_emoji(self, utils_get): - """The `on_raw_reaction_add` method should return early if the payload does not contain a duck emoji.""" - self.assertIsNone(asyncio.run(self.cog.on_raw_reaction_add(payload=MagicMock()))) - - # Ensure we've returned before making an unnecessary API call in the lines of code after the emoji check - utils_get.assert_not_called() - - def _raw_reaction_mocks(self, channel_id, message_id, user_id): - """Sets up mocks for tests of the `on_raw_reaction_add` event listener.""" - channel = helpers.MockTextChannel(id=channel_id) - self.bot.get_all_channels.return_value = (channel,) - - message = helpers.MockMessage(id=message_id) - - channel.fetch_message.return_value = message - - member = helpers.MockMember(id=user_id, roles=[self.staff_role]) - message.guild.members = (member,) - - payload = MagicMock(channel_id=channel_id, message_id=message_id, user_id=user_id) - - return channel, message, member, payload - - async def test_on_raw_reaction_add_returns_for_bot_and_non_staff_members(self): - """The `on_raw_reaction_add` event handler should return for bot users or non-staff members.""" - channel_id = 1234 - message_id = 2345 - user_id = 3456 - - channel, message, _, payload = self._raw_reaction_mocks(channel_id, message_id, user_id) - - test_cases = ( - ("non-staff member", helpers.MockMember(id=user_id)), - ("bot staff member", helpers.MockMember(id=user_id, roles=[self.staff_role], bot=True)), - ) - - payload.emoji = self.duck_pond_emoji - - for description, member in test_cases: - message.guild.members = (member, ) - with self.subTest(test_case=description), patch(f"{MODULE_PATH}.DuckPond.has_green_checkmark") as checkmark: - checkmark.side_effect = AssertionError( - "Expected method to return before calling `self.has_green_checkmark`." - ) - self.assertIsNone(await self.cog.on_raw_reaction_add(payload)) - - # Check that we did make it past the payload checks - channel.fetch_message.assert_called_once() - channel.fetch_message.reset_mock() - - @patch(f"{MODULE_PATH}.DuckPond.is_staff") - @patch(f"{MODULE_PATH}.DuckPond.count_ducks", new_callable=AsyncMock) - def test_on_raw_reaction_add_returns_on_message_with_green_checkmark_placed_by_bot(self, count_ducks, is_staff): - """The `on_raw_reaction_add` event should return when the message has a green check mark placed by the bot.""" - channel_id = 31415926535 - message_id = 27182818284 - user_id = 16180339887 - - channel, message, member, payload = self._raw_reaction_mocks(channel_id, message_id, user_id) - - payload.emoji = helpers.MockPartialEmoji(name=self.unicode_duck_emoji) - payload.emoji.is_custom_emoji.return_value = False - - message.reactions = [helpers.MockReaction(emoji=self.checkmark_emoji, users=[self.bot.user])] - - is_staff.return_value = True - count_ducks.side_effect = AssertionError("Expected method to return before calling `self.count_ducks`") - - self.assertIsNone(asyncio.run(self.cog.on_raw_reaction_add(payload))) - - # Assert that we've made it past `self.is_staff` - is_staff.assert_called_once() - - async def test_on_raw_reaction_add_does_not_relay_below_duck_threshold(self): - """The `on_raw_reaction_add` listener should not relay messages or attachments below the duck threshold.""" - test_cases = ( - (constants.DuckPond.threshold - 1, False), - (constants.DuckPond.threshold, True), - (constants.DuckPond.threshold + 1, True), - ) - - channel, message, member, payload = self._raw_reaction_mocks(channel_id=3, message_id=4, user_id=5) - - payload.emoji = self.duck_pond_emoji - - for duck_count, should_relay in test_cases: - with patch(f"{MODULE_PATH}.DuckPond.relay_message", new_callable=AsyncMock) as relay_message: - with patch(f"{MODULE_PATH}.DuckPond.count_ducks", new_callable=AsyncMock) as count_ducks: - count_ducks.return_value = duck_count - with self.subTest(duck_count=duck_count, should_relay=should_relay): - await self.cog.on_raw_reaction_add(payload) - - # Confirm that we've made it past counting - count_ducks.assert_called_once() - - # Did we relay a message? - has_relayed = relay_message.called - self.assertEqual(has_relayed, should_relay) - - if should_relay: - relay_message.assert_called_once_with(message) - - async def test_on_raw_reaction_remove_prevents_removal_of_green_checkmark_depending_on_the_duck_count(self): - """The `on_raw_reaction_remove` listener prevents removal of the check mark on messages with enough ducks.""" - checkmark = helpers.MockPartialEmoji(name=self.checkmark_emoji) - - message = helpers.MockMessage(id=1234) - - channel = helpers.MockTextChannel(id=98765) - channel.fetch_message.return_value = message - - self.bot.get_all_channels.return_value = (channel, ) - - payload = MagicMock(channel_id=channel.id, message_id=message.id, emoji=checkmark) - - test_cases = ( - (constants.DuckPond.threshold - 1, False), - (constants.DuckPond.threshold, True), - (constants.DuckPond.threshold + 1, True), - ) - for duck_count, should_re_add_checkmark in test_cases: - with patch(f"{MODULE_PATH}.DuckPond.count_ducks", new_callable=AsyncMock) as count_ducks: - count_ducks.return_value = duck_count - with self.subTest(duck_count=duck_count, should_re_add_checkmark=should_re_add_checkmark): - await self.cog.on_raw_reaction_remove(payload) - - # Check if we fetched the message - channel.fetch_message.assert_called_once_with(message.id) - - # Check if we actually counted the number of ducks - count_ducks.assert_called_once_with(message) - - has_re_added_checkmark = message.add_reaction.called - self.assertEqual(should_re_add_checkmark, has_re_added_checkmark) - - if should_re_add_checkmark: - message.add_reaction.assert_called_once_with(self.checkmark_emoji) - message.add_reaction.reset_mock() - - # reset mocks - channel.fetch_message.reset_mock() - message.reset_mock() - - def test_on_raw_reaction_remove_ignores_removal_of_non_checkmark_reactions(self): - """The `on_raw_reaction_remove` listener should ignore the removal of non-check mark emojis.""" - channel = helpers.MockTextChannel(id=98765) - - channel.fetch_message.side_effect = AssertionError( - "Expected method to return before calling `channel.fetch_message`" - ) - - self.bot.get_all_channels.return_value = (channel, ) - - payload = MagicMock(emoji=helpers.MockPartialEmoji(name=self.thumbs_up_emoji), channel_id=channel.id) - - self.assertIsNone(asyncio.run(self.cog.on_raw_reaction_remove(payload))) - - channel.fetch_message.assert_not_called() - - -class DuckPondSetupTests(unittest.TestCase): - """Tests setup of the `DuckPond` cog.""" - - def test_setup(self): - """Setup of the extension should call add_cog.""" - bot = helpers.MockBot() - duck_pond.setup(bot) - bot.add_cog.assert_called_once() diff --git a/tests/bot/cogs/__init__.py b/tests/bot/exts/__init__.py index e69de29bb..e69de29bb 100644 --- a/tests/bot/cogs/__init__.py +++ b/tests/bot/exts/__init__.py diff --git a/tests/bot/cogs/moderation/__init__.py b/tests/bot/exts/backend/__init__.py index e69de29bb..e69de29bb 100644 --- a/tests/bot/cogs/moderation/__init__.py +++ b/tests/bot/exts/backend/__init__.py diff --git a/tests/bot/cogs/sync/__init__.py b/tests/bot/exts/backend/sync/__init__.py index e69de29bb..e69de29bb 100644 --- a/tests/bot/cogs/sync/__init__.py +++ b/tests/bot/exts/backend/sync/__init__.py diff --git a/tests/bot/exts/backend/sync/test_base.py b/tests/bot/exts/backend/sync/test_base.py new file mode 100644 index 000000000..4953550f9 --- /dev/null +++ b/tests/bot/exts/backend/sync/test_base.py @@ -0,0 +1,73 @@ +import unittest +from unittest import mock + + +from bot.api import ResponseCodeError +from bot.exts.backend.sync._syncers import Syncer +from tests import helpers + + +class TestSyncer(Syncer): + """Syncer subclass with mocks for abstract methods for testing purposes.""" + + name = "test" + _get_diff = mock.AsyncMock() + _sync = mock.AsyncMock() + + +class SyncerBaseTests(unittest.TestCase): + """Tests for the syncer base class.""" + + def setUp(self): + self.bot = helpers.MockBot() + + def test_instantiation_fails_without_abstract_methods(self): + """The class must have abstract methods implemented.""" + with self.assertRaisesRegex(TypeError, "Can't instantiate abstract class"): + Syncer(self.bot) + + +class SyncerSyncTests(unittest.IsolatedAsyncioTestCase): + """Tests for main function orchestrating the sync.""" + + def setUp(self): + self.bot = helpers.MockBot(user=helpers.MockMember(bot=True)) + self.syncer = TestSyncer(self.bot) + self.guild = helpers.MockGuild() + + # Make sure `_get_diff` returns a MagicMock, not an AsyncMock + self.syncer._get_diff.return_value = mock.MagicMock() + + async def test_sync_message_edited(self): + """The message should be edited if one was sent, even if the sync has an API error.""" + subtests = ( + (None, None, False), + (helpers.MockMessage(), None, True), + (helpers.MockMessage(), ResponseCodeError(mock.MagicMock()), True), + ) + + for message, side_effect, should_edit in subtests: + with self.subTest(message=message, side_effect=side_effect, should_edit=should_edit): + self.syncer._sync.side_effect = side_effect + ctx = helpers.MockContext() + ctx.send.return_value = message + + await self.syncer.sync(self.guild, ctx) + + if should_edit: + message.edit.assert_called_once() + self.assertIn("content", message.edit.call_args[1]) + + async def test_sync_message_sent(self): + """If ctx is given, a new message should be sent.""" + subtests = ( + (None, None), + (helpers.MockContext(), helpers.MockMessage()), + ) + + for ctx, message in subtests: + with self.subTest(ctx=ctx, message=message): + await self.syncer.sync(self.guild, ctx) + + if ctx is not None: + ctx.send.assert_called_once() diff --git a/tests/bot/cogs/sync/test_cog.py b/tests/bot/exts/backend/sync/test_cog.py index 120bc991d..063a82754 100644 --- a/tests/bot/cogs/sync/test_cog.py +++ b/tests/bot/exts/backend/sync/test_cog.py @@ -5,8 +5,9 @@ import discord from bot import constants from bot.api import ResponseCodeError -from bot.cogs import sync -from bot.cogs.sync.syncers import Syncer +from bot.exts.backend import sync +from bot.exts.backend.sync._cog import Sync +from bot.exts.backend.sync._syncers import Syncer from tests import helpers from tests.base import CommandTestCase @@ -29,19 +30,19 @@ class SyncCogTestCase(unittest.IsolatedAsyncioTestCase): self.bot = helpers.MockBot() self.role_syncer_patcher = mock.patch( - "bot.cogs.sync.syncers.RoleSyncer", + "bot.exts.backend.sync._syncers.RoleSyncer", autospec=Syncer, spec_set=True ) self.user_syncer_patcher = mock.patch( - "bot.cogs.sync.syncers.UserSyncer", + "bot.exts.backend.sync._syncers.UserSyncer", autospec=Syncer, spec_set=True ) self.RoleSyncer = self.role_syncer_patcher.start() self.UserSyncer = self.user_syncer_patcher.start() - self.cog = sync.Sync(self.bot) + self.cog = Sync(self.bot) def tearDown(self): self.role_syncer_patcher.stop() @@ -59,7 +60,7 @@ class SyncCogTestCase(unittest.IsolatedAsyncioTestCase): class SyncCogTests(SyncCogTestCase): """Tests for the Sync cog.""" - @mock.patch.object(sync.Sync, "sync_guild", new_callable=mock.MagicMock) + @mock.patch.object(Sync, "sync_guild", new_callable=mock.MagicMock) def test_sync_cog_init(self, sync_guild): """Should instantiate syncers and run a sync for the guild.""" # Reset because a Sync cog was already instantiated in setUp. @@ -70,7 +71,7 @@ class SyncCogTests(SyncCogTestCase): mock_sync_guild_coro = mock.MagicMock() sync_guild.return_value = mock_sync_guild_coro - sync.Sync(self.bot) + Sync(self.bot) self.RoleSyncer.assert_called_once_with(self.bot) self.UserSyncer.assert_called_once_with(self.bot) @@ -131,7 +132,7 @@ class SyncCogListenerTests(SyncCogTestCase): super().setUp() self.cog.patch_user = mock.AsyncMock(spec_set=self.cog.patch_user) - self.guild_id_patcher = mock.patch("bot.cogs.sync.cog.constants.Guild.id", 5) + self.guild_id_patcher = mock.patch("bot.exts.backend.sync._cog.constants.Guild.id", 5) self.guild_id = self.guild_id_patcher.start() self.guild = helpers.MockGuild(id=self.guild_id) @@ -391,14 +392,14 @@ class SyncCogCommandTests(SyncCogTestCase, CommandTestCase): async def test_sync_roles_command(self): """sync() should be called on the RoleSyncer.""" ctx = helpers.MockContext() - await self.cog.sync_roles_command.callback(self.cog, ctx) + await self.cog.sync_roles_command(self.cog, ctx) self.cog.role_syncer.sync.assert_called_once_with(ctx.guild, ctx) async def test_sync_users_command(self): """sync() should be called on the UserSyncer.""" ctx = helpers.MockContext() - await self.cog.sync_users_command.callback(self.cog, ctx) + await self.cog.sync_users_command(self.cog, ctx) self.cog.user_syncer.sync.assert_called_once_with(ctx.guild, ctx) diff --git a/tests/bot/cogs/sync/test_roles.py b/tests/bot/exts/backend/sync/test_roles.py index 79eee98f4..7b9f40cad 100644 --- a/tests/bot/cogs/sync/test_roles.py +++ b/tests/bot/exts/backend/sync/test_roles.py @@ -3,7 +3,7 @@ from unittest import mock import discord -from bot.cogs.sync.syncers import RoleSyncer, _Diff, _Role +from bot.exts.backend.sync._syncers import RoleSyncer, _Diff, _Role from tests import helpers diff --git a/tests/bot/cogs/sync/test_users.py b/tests/bot/exts/backend/sync/test_users.py index 002a947ad..9f380a15d 100644 --- a/tests/bot/cogs/sync/test_users.py +++ b/tests/bot/exts/backend/sync/test_users.py @@ -1,7 +1,6 @@ import unittest -from unittest import mock -from bot.cogs.sync.syncers import UserSyncer, _Diff, _User +from bot.exts.backend.sync._syncers import UserSyncer, _Diff from tests import helpers @@ -10,7 +9,7 @@ def fake_user(**kwargs): kwargs.setdefault("id", 43) kwargs.setdefault("name", "bob the test man") kwargs.setdefault("discriminator", 1337) - kwargs.setdefault("roles", (666,)) + kwargs.setdefault("roles", [666]) kwargs.setdefault("in_guild", True) return kwargs @@ -40,22 +39,42 @@ class UserSyncerDiffTests(unittest.IsolatedAsyncioTestCase): return guild + @staticmethod + def get_mock_member(member: dict): + member = member.copy() + del member["in_guild"] + mock_member = helpers.MockMember(**member) + mock_member.roles = [helpers.MockRole(id=role_id) for role_id in member["roles"]] + return mock_member + async def test_empty_diff_for_no_users(self): """When no users are given, an empty diff should be returned.""" + self.bot.api_client.get.return_value = { + "count": 3, + "next_page_no": None, + "previous_page_no": None, + "results": [] + } guild = self.get_guild() actual_diff = await self.syncer._get_diff(guild) - expected_diff = (set(), set(), None) + expected_diff = ([], [], None) self.assertEqual(actual_diff, expected_diff) async def test_empty_diff_for_identical_users(self): """No differences should be found if the users in the guild and DB are identical.""" - self.bot.api_client.get.return_value = [fake_user()] + self.bot.api_client.get.return_value = { + "count": 3, + "next_page_no": None, + "previous_page_no": None, + "results": [fake_user()] + } guild = self.get_guild(fake_user()) + guild.get_member.return_value = self.get_mock_member(fake_user()) actual_diff = await self.syncer._get_diff(guild) - expected_diff = (set(), set(), None) + expected_diff = ([], [], None) self.assertEqual(actual_diff, expected_diff) @@ -63,59 +82,102 @@ class UserSyncerDiffTests(unittest.IsolatedAsyncioTestCase): """Only updated users should be added to the 'updated' set of the diff.""" updated_user = fake_user(id=99, name="new") - self.bot.api_client.get.return_value = [fake_user(id=99, name="old"), fake_user()] + self.bot.api_client.get.return_value = { + "count": 3, + "next_page_no": None, + "previous_page_no": None, + "results": [fake_user(id=99, name="old"), fake_user()] + } guild = self.get_guild(updated_user, fake_user()) + guild.get_member.side_effect = [ + self.get_mock_member(updated_user), + self.get_mock_member(fake_user()) + ] actual_diff = await self.syncer._get_diff(guild) - expected_diff = (set(), {_User(**updated_user)}, None) + expected_diff = ([], [{"id": 99, "name": "new"}], None) self.assertEqual(actual_diff, expected_diff) async def test_diff_for_new_users(self): - """Only new users should be added to the 'created' set of the diff.""" + """Only new users should be added to the 'created' list of the diff.""" new_user = fake_user(id=99, name="new") - self.bot.api_client.get.return_value = [fake_user()] + self.bot.api_client.get.return_value = { + "count": 3, + "next_page_no": None, + "previous_page_no": None, + "results": [fake_user()] + } guild = self.get_guild(fake_user(), new_user) - + guild.get_member.side_effect = [ + self.get_mock_member(fake_user()), + self.get_mock_member(new_user) + ] actual_diff = await self.syncer._get_diff(guild) - expected_diff = ({_User(**new_user)}, set(), None) + expected_diff = ([new_user], [], None) self.assertEqual(actual_diff, expected_diff) async def test_diff_sets_in_guild_false_for_leaving_users(self): """When a user leaves the guild, the `in_guild` flag is updated to `False`.""" - leaving_user = fake_user(id=63, in_guild=False) - - self.bot.api_client.get.return_value = [fake_user(), fake_user(id=63)] + self.bot.api_client.get.return_value = { + "count": 3, + "next_page_no": None, + "previous_page_no": None, + "results": [fake_user(), fake_user(id=63)] + } guild = self.get_guild(fake_user()) + guild.get_member.side_effect = [ + self.get_mock_member(fake_user()), + None + ] actual_diff = await self.syncer._get_diff(guild) - expected_diff = (set(), {_User(**leaving_user)}, None) + expected_diff = ([], [{"id": 63, "in_guild": False}], None) self.assertEqual(actual_diff, expected_diff) async def test_diff_for_new_updated_and_leaving_users(self): """When users are added, updated, and removed, all of them are returned properly.""" new_user = fake_user(id=99, name="new") + updated_user = fake_user(id=55, name="updated") - leaving_user = fake_user(id=63, in_guild=False) - self.bot.api_client.get.return_value = [fake_user(), fake_user(id=55), fake_user(id=63)] + self.bot.api_client.get.return_value = { + "count": 3, + "next_page_no": None, + "previous_page_no": None, + "results": [fake_user(), fake_user(id=55), fake_user(id=63)] + } guild = self.get_guild(fake_user(), new_user, updated_user) + guild.get_member.side_effect = [ + self.get_mock_member(fake_user()), + self.get_mock_member(updated_user), + None + ] actual_diff = await self.syncer._get_diff(guild) - expected_diff = ({_User(**new_user)}, {_User(**updated_user), _User(**leaving_user)}, None) + expected_diff = ([new_user], [{"id": 55, "name": "updated"}, {"id": 63, "in_guild": False}], None) self.assertEqual(actual_diff, expected_diff) async def test_empty_diff_for_db_users_not_in_guild(self): - """When the DB knows a user the guild doesn't, no difference is found.""" - self.bot.api_client.get.return_value = [fake_user(), fake_user(id=63, in_guild=False)] + """When the DB knows a user, but the guild doesn't, no difference is found.""" + self.bot.api_client.get.return_value = { + "count": 3, + "next_page_no": None, + "previous_page_no": None, + "results": [fake_user(), fake_user(id=63, in_guild=False)] + } guild = self.get_guild(fake_user()) + guild.get_member.side_effect = [ + self.get_mock_member(fake_user()), + None + ] actual_diff = await self.syncer._get_diff(guild) - expected_diff = (set(), set(), None) + expected_diff = ([], [], None) self.assertEqual(actual_diff, expected_diff) @@ -131,13 +193,10 @@ class UserSyncerSyncTests(unittest.IsolatedAsyncioTestCase): """Only POST requests should be made with the correct payload.""" users = [fake_user(id=111), fake_user(id=222)] - user_tuples = {_User(**user) for user in users} - diff = _Diff(user_tuples, set(), None) + diff = _Diff(users, [], None) await self.syncer._sync(diff) - calls = [mock.call("bot/users", json=user) for user in users] - self.bot.api_client.post.assert_has_calls(calls, any_order=True) - self.assertEqual(self.bot.api_client.post.call_count, len(users)) + self.bot.api_client.post.assert_called_once_with("bot/users", json=diff.created) self.bot.api_client.put.assert_not_called() self.bot.api_client.delete.assert_not_called() @@ -146,13 +205,10 @@ class UserSyncerSyncTests(unittest.IsolatedAsyncioTestCase): """Only PUT requests should be made with the correct payload.""" users = [fake_user(id=111), fake_user(id=222)] - user_tuples = {_User(**user) for user in users} - diff = _Diff(set(), user_tuples, None) + diff = _Diff([], users, None) await self.syncer._sync(diff) - calls = [mock.call(f"bot/users/{user['id']}", json=user) for user in users] - self.bot.api_client.put.assert_has_calls(calls, any_order=True) - self.assertEqual(self.bot.api_client.put.call_count, len(users)) + self.bot.api_client.patch.assert_called_once_with("bot/users/bulk_patch", json=diff.updated) self.bot.api_client.post.assert_not_called() self.bot.api_client.delete.assert_not_called() diff --git a/tests/bot/cogs/test_logging.py b/tests/bot/exts/backend/test_logging.py index 8a18fdcd6..466f207d9 100644 --- a/tests/bot/cogs/test_logging.py +++ b/tests/bot/exts/backend/test_logging.py @@ -2,7 +2,7 @@ import unittest from unittest.mock import patch from bot import constants -from bot.cogs.logging import Logging +from bot.exts.backend.logging import Logging from tests.helpers import MockBot, MockTextChannel @@ -14,7 +14,7 @@ class LoggingTests(unittest.IsolatedAsyncioTestCase): self.cog = Logging(self.bot) self.dev_log = MockTextChannel(id=1234, name="dev-log") - @patch("bot.cogs.logging.DEBUG_MODE", False) + @patch("bot.exts.backend.logging.DEBUG_MODE", False) async def test_debug_mode_false(self): """Should send connected message to dev-log.""" self.bot.get_channel.return_value = self.dev_log @@ -24,7 +24,7 @@ class LoggingTests(unittest.IsolatedAsyncioTestCase): self.bot.get_channel.assert_called_once_with(constants.Channels.dev_log) self.dev_log.send.assert_awaited_once() - @patch("bot.cogs.logging.DEBUG_MODE", True) + @patch("bot.exts.backend.logging.DEBUG_MODE", True) async def test_debug_mode_true(self): """Should not send anything to dev-log.""" await self.cog.startup_greeting() diff --git a/tests/bot/patches/__init__.py b/tests/bot/exts/filters/__init__.py index e69de29bb..e69de29bb 100644 --- a/tests/bot/patches/__init__.py +++ b/tests/bot/exts/filters/__init__.py diff --git a/tests/bot/cogs/test_antimalware.py b/tests/bot/exts/filters/test_antimalware.py index f219fc1ba..3393c6cdc 100644 --- a/tests/bot/cogs/test_antimalware.py +++ b/tests/bot/exts/filters/test_antimalware.py @@ -1,28 +1,35 @@ import unittest -from unittest.mock import AsyncMock, Mock, patch +from unittest.mock import AsyncMock, Mock from discord import NotFound -from bot.cogs import antimalware -from bot.constants import AntiMalware as AntiMalwareConfig, Channels, STAFF_ROLES +from bot.constants import Channels, STAFF_ROLES +from bot.exts.filters import antimalware from tests.helpers import MockAttachment, MockBot, MockMessage, MockRole -MODULE = "bot.cogs.antimalware" - -@patch(f"{MODULE}.AntiMalwareConfig.whitelist", new=[".first", ".second", ".third"]) class AntiMalwareCogTests(unittest.IsolatedAsyncioTestCase): """Test the AntiMalware cog.""" def setUp(self): """Sets up fresh objects for each test.""" self.bot = MockBot() + self.bot.filter_list_cache = { + "FILE_FORMAT.True": { + ".first": {}, + ".second": {}, + ".third": {}, + } + } self.cog = antimalware.AntiMalware(self.bot) self.message = MockMessage() + self.message.webhook_id = None + self.message.author.bot = None + self.whitelist = [".first", ".second", ".third"] async def test_message_with_allowed_attachment(self): """Messages with allowed extensions should not be deleted""" - attachment = MockAttachment(filename=f"python{AntiMalwareConfig.whitelist[0]}") + attachment = MockAttachment(filename="python.first") self.message.attachments = [attachment] await self.cog.on_message(self.message) @@ -43,6 +50,26 @@ class AntiMalwareCogTests(unittest.IsolatedAsyncioTestCase): self.message.delete.assert_not_called() + async def test_webhook_message_with_illegal_extension(self): + """A webhook message containing an illegal extension should be ignored.""" + attachment = MockAttachment(filename="python.disallowed") + self.message.webhook_id = 697140105563078727 + self.message.attachments = [attachment] + + await self.cog.on_message(self.message) + + self.message.delete.assert_not_called() + + async def test_bot_message_with_illegal_extension(self): + """A bot message containing an illegal extension should be ignored.""" + attachment = MockAttachment(filename="python.disallowed") + self.message.author.bot = 409107086526644234 + self.message.attachments = [attachment] + + await self.cog.on_message(self.message) + + self.message.delete.assert_not_called() + async def test_message_with_illegal_extension_gets_deleted(self): """A message containing an illegal extension should send an embed.""" attachment = MockAttachment(filename="python.disallowed") @@ -93,7 +120,7 @@ class AntiMalwareCogTests(unittest.IsolatedAsyncioTestCase): self.assertEqual(embed.description, antimalware.TXT_EMBED_DESCRIPTION.format.return_value) antimalware.TXT_EMBED_DESCRIPTION.format.assert_called_with(cmd_channel_mention=cmd_channel.mention) - async def test_other_disallowed_extention_embed_description(self): + async def test_other_disallowed_extension_embed_description(self): """Test the description for a non .py/.txt disallowed extension.""" attachment = MockAttachment(filename="python.disallowed") self.message.attachments = [attachment] @@ -109,6 +136,7 @@ class AntiMalwareCogTests(unittest.IsolatedAsyncioTestCase): self.assertEqual(embed.description, antimalware.DISALLOWED_EMBED_DESCRIPTION.format.return_value) antimalware.DISALLOWED_EMBED_DESCRIPTION.format.assert_called_with( + joined_whitelist=", ".join(self.whitelist), blocked_extensions_str=".disallowed", meta_channel_mention=meta_channel.mention ) @@ -135,7 +163,7 @@ class AntiMalwareCogTests(unittest.IsolatedAsyncioTestCase): """The return value should include all non-whitelisted extensions.""" test_values = ( ([], []), - (AntiMalwareConfig.whitelist, []), + (self.whitelist, []), ([".first"], []), ([".first", ".disallowed"], [".disallowed"]), ([".disallowed"], [".disallowed"]), @@ -145,7 +173,7 @@ class AntiMalwareCogTests(unittest.IsolatedAsyncioTestCase): for extensions, expected_disallowed_extensions in test_values: with self.subTest(extensions=extensions, expected_disallowed_extensions=expected_disallowed_extensions): self.message.attachments = [MockAttachment(filename=f"filename{extension}") for extension in extensions] - disallowed_extensions = self.cog.get_disallowed_extensions(self.message) + disallowed_extensions = self.cog._get_disallowed_extensions(self.message) self.assertCountEqual(disallowed_extensions, expected_disallowed_extensions) diff --git a/tests/bot/cogs/test_antispam.py b/tests/bot/exts/filters/test_antispam.py index ce5472c71..6a0e4fded 100644 --- a/tests/bot/cogs/test_antispam.py +++ b/tests/bot/exts/filters/test_antispam.py @@ -1,6 +1,6 @@ import unittest -from bot.cogs import antispam +from bot.exts.filters import antispam class AntispamConfigurationValidationTests(unittest.TestCase): diff --git a/tests/bot/cogs/test_security.py b/tests/bot/exts/filters/test_security.py index 9d1a62f7e..c0c3baa42 100644 --- a/tests/bot/cogs/test_security.py +++ b/tests/bot/exts/filters/test_security.py @@ -3,7 +3,7 @@ from unittest.mock import MagicMock from discord.ext.commands import NoPrivateMessage -from bot.cogs import security +from bot.exts.filters import security from tests.helpers import MockBot, MockContext diff --git a/tests/bot/cogs/test_token_remover.py b/tests/bot/exts/filters/test_token_remover.py index 3349caa73..f99cc3370 100644 --- a/tests/bot/cogs/test_token_remover.py +++ b/tests/bot/exts/filters/test_token_remover.py @@ -6,9 +6,10 @@ from unittest.mock import MagicMock from discord import Colour, NotFound from bot import constants -from bot.cogs import token_remover -from bot.cogs.moderation import ModLog -from bot.cogs.token_remover import Token, TokenRemover +from bot.exts.filters import token_remover +from bot.exts.filters.token_remover import Token, TokenRemover +from bot.exts.moderation.modlog import ModLog +from bot.utils.messages import format_user from tests.helpers import MockBot, MockMessage, autospec @@ -22,23 +23,25 @@ class TokenRemoverTests(unittest.IsolatedAsyncioTestCase): self.msg = MockMessage(id=555, content="hello world") self.msg.channel.mention = "#lemonade-stand" + self.msg.guild.get_member.return_value.bot = False + self.msg.guild.get_member.return_value.__str__.return_value = "Woody" self.msg.author.__str__ = MagicMock(return_value=self.msg.author.name) self.msg.author.avatar_url_as.return_value = "picture-lemon.png" - def test_is_valid_user_id_valid(self): - """Should consider user IDs valid if they decode entirely to ASCII digits.""" - ids = ( - "NDcyMjY1OTQzMDYyNDEzMzMy", - "NDc1MDczNjI5Mzk5NTQ3OTA0", - "NDY3MjIzMjMwNjUwNzc3NjQx", + def test_extract_user_id_valid(self): + """Should consider user IDs valid if they decode into an integer ID.""" + id_pairs = ( + ("NDcyMjY1OTQzMDYyNDEzMzMy", 472265943062413332), + ("NDc1MDczNjI5Mzk5NTQ3OTA0", 475073629399547904), + ("NDY3MjIzMjMwNjUwNzc3NjQx", 467223230650777641), ) - for user_id in ids: - with self.subTest(user_id=user_id): - result = TokenRemover.is_valid_user_id(user_id) - self.assertTrue(result) + for token_id, user_id in id_pairs: + with self.subTest(token_id=token_id): + result = TokenRemover.extract_user_id(token_id) + self.assertEqual(result, user_id) - def test_is_valid_user_id_invalid(self): + def test_extract_user_id_invalid(self): """Should consider non-digit and non-ASCII IDs invalid.""" ids = ( ("SGVsbG8gd29ybGQ", "non-digit ASCII"), @@ -52,8 +55,8 @@ class TokenRemoverTests(unittest.IsolatedAsyncioTestCase): for user_id, msg in ids: with self.subTest(msg=msg): - result = TokenRemover.is_valid_user_id(user_id) - self.assertFalse(result) + result = TokenRemover.extract_user_id(user_id) + self.assertIsNone(result) def test_is_valid_timestamp_valid(self): """Should consider timestamps valid if they're greater than the Discord epoch.""" @@ -85,6 +88,34 @@ class TokenRemoverTests(unittest.IsolatedAsyncioTestCase): result = TokenRemover.is_valid_timestamp(timestamp) self.assertFalse(result) + def test_is_valid_hmac_valid(self): + """Should consider an HMAC valid if it has at least 3 unique characters.""" + valid_hmacs = ( + "VXmErH7j511turNpfURmb0rVNm8", + "Ysnu2wacjaKs7qnoo46S8Dm2us8", + "sJf6omBPORBPju3WJEIAcwW9Zds", + "s45jqDV_Iisn-symw0yDRrk_jf4", + ) + + for hmac in valid_hmacs: + with self.subTest(msg=hmac): + result = TokenRemover.is_maybe_valid_hmac(hmac) + self.assertTrue(result) + + def test_is_invalid_hmac_invalid(self): + """Should consider an HMAC invalid if has fewer than 3 unique characters.""" + invalid_hmacs = ( + ("xxxxxxxxxxxxxxxxxx", "Single character"), + ("XxXxXxXxXxXxXxXxXx", "Single character alternating case"), + ("ASFasfASFasfASFASsf", "Three characters alternating-case"), + ("asdasdasdasdasdasdasd", "Three characters one case"), + ) + + for hmac, msg in invalid_hmacs: + with self.subTest(msg=msg): + result = TokenRemover.is_maybe_valid_hmac(hmac) + self.assertFalse(result) + def test_mod_log_property(self): """The `mod_log` property should ask the bot to return the `ModLog` cog.""" self.bot.get_cog.return_value = 'lemon' @@ -132,7 +163,7 @@ class TokenRemoverTests(unittest.IsolatedAsyncioTestCase): await cog.on_message(msg) find_token_in_message.assert_not_called() - @autospec("bot.cogs.token_remover", "TOKEN_RE") + @autospec("bot.exts.filters.token_remover", "TOKEN_RE") def test_find_token_no_matches(self, token_re): """None should be returned if the regex matches no tokens in a message.""" token_re.finditer.return_value = () @@ -142,11 +173,18 @@ class TokenRemoverTests(unittest.IsolatedAsyncioTestCase): self.assertIsNone(return_value) token_re.finditer.assert_called_once_with(self.msg.content) - @autospec(TokenRemover, "is_valid_user_id", "is_valid_timestamp") - @autospec("bot.cogs.token_remover", "Token") - @autospec("bot.cogs.token_remover", "TOKEN_RE") - def test_find_token_valid_match(self, token_re, token_cls, is_valid_id, is_valid_timestamp): - """The first match with a valid user ID and timestamp should be returned as a `Token`.""" + @autospec(TokenRemover, "extract_user_id", "is_valid_timestamp", "is_maybe_valid_hmac") + @autospec("bot.exts.filters.token_remover", "Token") + @autospec("bot.exts.filters.token_remover", "TOKEN_RE") + def test_find_token_valid_match( + self, + token_re, + token_cls, + extract_user_id, + is_valid_timestamp, + is_maybe_valid_hmac, + ): + """The first match with a valid user ID, timestamp, and HMAC should be returned as a `Token`.""" matches = [ mock.create_autospec(Match, spec_set=True, instance=True), mock.create_autospec(Match, spec_set=True, instance=True), @@ -158,23 +196,32 @@ class TokenRemoverTests(unittest.IsolatedAsyncioTestCase): token_re.finditer.return_value = matches token_cls.side_effect = tokens - is_valid_id.side_effect = (False, True) # The 1st match will be invalid, 2nd one valid. + extract_user_id.side_effect = (None, True) # The 1st match will be invalid, 2nd one valid. is_valid_timestamp.return_value = True + is_maybe_valid_hmac.return_value = True return_value = TokenRemover.find_token_in_message(self.msg) self.assertEqual(tokens[1], return_value) token_re.finditer.assert_called_once_with(self.msg.content) - @autospec(TokenRemover, "is_valid_user_id", "is_valid_timestamp") - @autospec("bot.cogs.token_remover", "Token") - @autospec("bot.cogs.token_remover", "TOKEN_RE") - def test_find_token_invalid_matches(self, token_re, token_cls, is_valid_id, is_valid_timestamp): - """None should be returned if no matches have valid user IDs or timestamps.""" + @autospec(TokenRemover, "extract_user_id", "is_valid_timestamp", "is_maybe_valid_hmac") + @autospec("bot.exts.filters.token_remover", "Token") + @autospec("bot.exts.filters.token_remover", "TOKEN_RE") + def test_find_token_invalid_matches( + self, + token_re, + token_cls, + extract_user_id, + is_valid_timestamp, + is_maybe_valid_hmac, + ): + """None should be returned if no matches have valid user IDs, HMACs, and timestamps.""" token_re.finditer.return_value = [mock.create_autospec(Match, spec_set=True, instance=True)] token_cls.return_value = mock.create_autospec(Token, spec_set=True, instance=True) - is_valid_id.return_value = False + extract_user_id.return_value = None is_valid_timestamp.return_value = False + is_maybe_valid_hmac.return_value = False return_value = TokenRemover.find_token_in_message(self.msg) @@ -230,36 +277,85 @@ class TokenRemoverTests(unittest.IsolatedAsyncioTestCase): results = [match[0] for match in results] self.assertCountEqual((token_1, token_2), results) - @autospec("bot.cogs.token_remover", "LOG_MESSAGE") + @autospec("bot.exts.filters.token_remover", "LOG_MESSAGE") def test_format_log_message(self, log_message): """Should correctly format the log message with info from the message and token.""" - token = Token("NDY3MjIzMjMwNjUwNzc3NjQx", "XsySD_", "s45jqDV_Iisn-symw0yDRrk_jf4") + token = Token("NDcyMjY1OTQzMDYyNDEzMzMy", "XsySD_", "s45jqDV_Iisn-symw0yDRrk_jf4") log_message.format.return_value = "Howdy" return_value = TokenRemover.format_log_message(self.msg, token) self.assertEqual(return_value, log_message.format.return_value) log_message.format.assert_called_once_with( - author=self.msg.author, - author_id=self.msg.author.id, + author=format_user(self.msg.author), channel=self.msg.channel.mention, user_id=token.user_id, timestamp=token.timestamp, hmac="x" * len(token.hmac), ) + @autospec("bot.exts.filters.token_remover", "UNKNOWN_USER_LOG_MESSAGE") + def test_format_userid_log_message_unknown(self, unknown_user_log_message): + """Should correctly format the user ID portion when the actual user it belongs to is unknown.""" + token = Token("NDcyMjY1OTQzMDYyNDEzMzMy", "XsySD_", "s45jqDV_Iisn-symw0yDRrk_jf4") + unknown_user_log_message.format.return_value = " Partner" + msg = MockMessage(id=555, content="hello world") + msg.guild.get_member.return_value = None + + return_value = TokenRemover.format_userid_log_message(msg, token) + + self.assertEqual(return_value, (unknown_user_log_message.format.return_value, False)) + unknown_user_log_message.format.assert_called_once_with(user_id=472265943062413332) + + @autospec("bot.exts.filters.token_remover", "KNOWN_USER_LOG_MESSAGE") + def test_format_userid_log_message_bot(self, known_user_log_message): + """Should correctly format the user ID portion when the ID belongs to a known bot.""" + token = Token("NDcyMjY1OTQzMDYyNDEzMzMy", "XsySD_", "s45jqDV_Iisn-symw0yDRrk_jf4") + known_user_log_message.format.return_value = " Partner" + msg = MockMessage(id=555, content="hello world") + msg.guild.get_member.return_value.__str__.return_value = "Sam" + msg.guild.get_member.return_value.bot = True + + return_value = TokenRemover.format_userid_log_message(msg, token) + + self.assertEqual(return_value, (known_user_log_message.format.return_value, False)) + + known_user_log_message.format.assert_called_once_with( + user_id=472265943062413332, + user_name="Sam", + kind="BOT", + ) + + @autospec("bot.exts.filters.token_remover", "KNOWN_USER_LOG_MESSAGE") + def test_format_log_message_user_token_user(self, user_token_message): + """Should correctly format the user ID portion when the ID belongs to a known user.""" + token = Token("NDY3MjIzMjMwNjUwNzc3NjQx", "XsySD_", "s45jqDV_Iisn-symw0yDRrk_jf4") + user_token_message.format.return_value = "Partner" + + return_value = TokenRemover.format_userid_log_message(self.msg, token) + + self.assertEqual(return_value, (user_token_message.format.return_value, True)) + user_token_message.format.assert_called_once_with( + user_id=467223230650777641, + user_name="Woody", + kind="USER", + ) + @mock.patch.object(TokenRemover, "mod_log", new_callable=mock.PropertyMock) - @autospec("bot.cogs.token_remover", "log") - @autospec(TokenRemover, "format_log_message") - async def test_take_action(self, format_log_message, logger, mod_log_property): + @autospec("bot.exts.filters.token_remover", "log") + @autospec(TokenRemover, "format_log_message", "format_userid_log_message") + async def test_take_action(self, format_log_message, format_userid_log_message, logger, mod_log_property): """Should delete the message and send a mod log.""" cog = TokenRemover(self.bot) mod_log = mock.create_autospec(ModLog, spec_set=True, instance=True) token = mock.create_autospec(Token, spec_set=True, instance=True) + token.user_id = "no-id" log_msg = "testing123" + userid_log_message = "userid-log-message" mod_log_property.return_value = mod_log format_log_message.return_value = log_msg + format_userid_log_message.return_value = (userid_log_message, True) await cog.take_action(self.msg, token) @@ -269,6 +365,7 @@ class TokenRemoverTests(unittest.IsolatedAsyncioTestCase): ) format_log_message.assert_called_once_with(self.msg, token) + format_userid_log_message.assert_called_once_with(self.msg, token) logger.debug.assert_called_with(log_msg) self.bot.stats.incr.assert_called_once_with("tokens.removed_tokens") @@ -277,9 +374,10 @@ class TokenRemoverTests(unittest.IsolatedAsyncioTestCase): icon_url=constants.Icons.token_removed, colour=Colour(constants.Colours.soft_red), title="Token removed!", - text=log_msg, + text=log_msg + "\n" + userid_log_message, thumbnail=self.msg.author.avatar_url_as.return_value, - channel_id=constants.Channels.mod_alerts + channel_id=constants.Channels.mod_alerts, + ping_everyone=True, ) @mock.patch.object(TokenRemover, "mod_log", new_callable=mock.PropertyMock) @@ -299,7 +397,7 @@ class TokenRemoverTests(unittest.IsolatedAsyncioTestCase): class TokenRemoverExtensionTests(unittest.TestCase): """Tests for the token_remover extension.""" - @autospec("bot.cogs.token_remover", "TokenRemover") + @autospec("bot.exts.filters.token_remover", "TokenRemover") def test_extension_setup(self, cog): """The TokenRemover cog should be added.""" bot = MockBot() diff --git a/tests/bot/exts/info/__init__.py b/tests/bot/exts/info/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/tests/bot/exts/info/__init__.py diff --git a/tests/bot/cogs/test_information.py b/tests/bot/exts/info/test_information.py index 79c0e0ad3..daede54c5 100644 --- a/tests/bot/cogs/test_information.py +++ b/tests/bot/exts/info/test_information.py @@ -1,4 +1,3 @@ -import asyncio import textwrap import unittest import unittest.mock @@ -6,14 +5,14 @@ import unittest.mock import discord from bot import constants -from bot.cogs import information +from bot.exts.info import information from bot.utils.checks import InWhitelistCheckFailure from tests import helpers -COG_PATH = "bot.cogs.information.Information" +COG_PATH = "bot.exts.info.information.Information" -class InformationCogTests(unittest.TestCase): +class InformationCogTests(unittest.IsolatedAsyncioTestCase): """Tests the Information cog.""" @classmethod @@ -29,16 +28,14 @@ class InformationCogTests(unittest.TestCase): self.ctx = helpers.MockContext() self.ctx.author.roles.append(self.moderator_role) - def test_roles_command_command(self): + async def test_roles_command_command(self): """Test if the `role_info` command correctly returns the `moderator_role`.""" self.ctx.guild.roles.append(self.moderator_role) self.cog.roles_info.can_run = unittest.mock.AsyncMock() self.cog.roles_info.can_run.return_value = True - coroutine = self.cog.roles_info.callback(self.cog, self.ctx) - - self.assertIsNone(asyncio.run(coroutine)) + self.assertIsNone(await self.cog.roles_info(self.cog, self.ctx)) self.ctx.send.assert_called_once() _, kwargs = self.ctx.send.call_args @@ -48,7 +45,7 @@ class InformationCogTests(unittest.TestCase): self.assertEqual(embed.colour, discord.Colour.blurple()) self.assertEqual(embed.description, f"\n`{self.moderator_role.id}` - {self.moderator_role.mention}\n") - def test_role_info_command(self): + async def test_role_info_command(self): """Tests the `role info` command.""" dummy_role = helpers.MockRole( name="Dummy", @@ -73,9 +70,7 @@ class InformationCogTests(unittest.TestCase): self.cog.role_info.can_run = unittest.mock.AsyncMock() self.cog.role_info.can_run.return_value = True - coroutine = self.cog.role_info.callback(self.cog, self.ctx, dummy_role, admin_role) - - self.assertIsNone(asyncio.run(coroutine)) + self.assertIsNone(await self.cog.role_info(self.cog, self.ctx, dummy_role, admin_role)) self.assertEqual(self.ctx.send.call_count, 2) @@ -97,80 +92,8 @@ class InformationCogTests(unittest.TestCase): self.assertEqual(admin_embed.title, "Admins info") self.assertEqual(admin_embed.colour, discord.Colour.red()) - @unittest.mock.patch('bot.cogs.information.time_since') - def test_server_info_command(self, time_since_patch): - time_since_patch.return_value = '2 days ago' - - self.ctx.guild = helpers.MockGuild( - features=('lemons', 'apples'), - region="The Moon", - roles=[self.moderator_role], - channels=[ - discord.TextChannel( - state={}, - guild=self.ctx.guild, - data={'id': 42, 'name': 'lemons-offering', 'position': 22, 'type': 'text'} - ), - discord.CategoryChannel( - state={}, - guild=self.ctx.guild, - data={'id': 5125, 'name': 'the-lemon-collection', 'position': 22, 'type': 'category'} - ), - discord.VoiceChannel( - state={}, - guild=self.ctx.guild, - data={'id': 15290, 'name': 'listen-to-lemon', 'position': 22, 'type': 'voice'} - ) - ], - members=[ - *(helpers.MockMember(status=discord.Status.online) for _ in range(2)), - *(helpers.MockMember(status=discord.Status.idle) for _ in range(1)), - *(helpers.MockMember(status=discord.Status.dnd) for _ in range(4)), - *(helpers.MockMember(status=discord.Status.offline) for _ in range(3)), - ], - member_count=1_234, - icon_url='a-lemon.jpg', - ) - - coroutine = self.cog.server_info.callback(self.cog, self.ctx) - self.assertIsNone(asyncio.run(coroutine)) - - time_since_patch.assert_called_once_with(self.ctx.guild.created_at, precision='days') - _, kwargs = self.ctx.send.call_args - embed = kwargs.pop('embed') - self.assertEqual(embed.colour, discord.Colour.blurple()) - self.assertEqual( - embed.description, - textwrap.dedent( - f""" - **Server information** - Created: {time_since_patch.return_value} - Voice region: {self.ctx.guild.region} - Features: {', '.join(self.ctx.guild.features)} - - **Channel counts** - Category channels: 1 - Text channels: 1 - Voice channels: 1 - Staff channels: 0 - - **Member counts** - Members: {self.ctx.guild.member_count:,} - Staff members: 0 - Roles: {len(self.ctx.guild.roles)} - - **Member statuses** - {constants.Emojis.status_online} 2 - {constants.Emojis.status_idle} 1 - {constants.Emojis.status_dnd} 4 - {constants.Emojis.status_offline} 3 - """ - ) - ) - self.assertEqual(embed.thumbnail.url, 'a-lemon.jpg') - -class UserInfractionHelperMethodTests(unittest.TestCase): +class UserInfractionHelperMethodTests(unittest.IsolatedAsyncioTestCase): """Tests for the helper methods of the `!user` command.""" def setUp(self): @@ -180,7 +103,7 @@ class UserInfractionHelperMethodTests(unittest.TestCase): self.cog = information.Information(self.bot) self.member = helpers.MockMember(id=1234) - def test_user_command_helper_method_get_requests(self): + async def test_user_command_helper_method_get_requests(self): """The helper methods should form the correct get requests.""" test_values = ( { @@ -202,11 +125,11 @@ class UserInfractionHelperMethodTests(unittest.TestCase): endpoint, params = test_value["expected_args"] with self.subTest(method=helper_method, endpoint=endpoint, params=params): - asyncio.run(helper_method(self.member)) + await helper_method(self.member) self.bot.api_client.get.assert_called_once_with(endpoint, params=params) self.bot.api_client.get.reset_mock() - def _method_subtests(self, method, test_values, default_header): + async def _method_subtests(self, method, test_values, default_header): """Helper method that runs the subtests for the different helper methods.""" for test_value in test_values: api_response = test_value["api response"] @@ -215,12 +138,12 @@ class UserInfractionHelperMethodTests(unittest.TestCase): with self.subTest(method=method, api_response=api_response, expected_lines=expected_lines): self.bot.api_client.get.return_value = api_response - expected_output = "\n".join(default_header + expected_lines) - actual_output = asyncio.run(method(self.member)) + expected_output = "\n".join(expected_lines) + actual_output = await method(self.member) - self.assertEqual(expected_output, actual_output) + self.assertEqual((default_header, expected_output), actual_output) - def test_basic_user_infraction_counts_returns_correct_strings(self): + async def test_basic_user_infraction_counts_returns_correct_strings(self): """The method should correctly list both the total and active number of non-hidden infractions.""" test_values = ( # No infractions means zero counts @@ -249,16 +172,16 @@ class UserInfractionHelperMethodTests(unittest.TestCase): }, ) - header = ["**Infractions**"] + header = "Infractions" - self._method_subtests(self.cog.basic_user_infraction_counts, test_values, header) + await self._method_subtests(self.cog.basic_user_infraction_counts, test_values, header) - def test_expanded_user_infraction_counts_returns_correct_strings(self): + async def test_expanded_user_infraction_counts_returns_correct_strings(self): """The method should correctly list the total and active number of all infractions split by infraction type.""" test_values = ( { "api response": [], - "expected_lines": ["This user has never received an infraction."], + "expected_lines": ["No infractions"], }, # Shows non-hidden inactive infraction as expected { @@ -304,24 +227,24 @@ class UserInfractionHelperMethodTests(unittest.TestCase): }, ) - header = ["**Infractions**"] + header = "Infractions" - self._method_subtests(self.cog.expanded_user_infraction_counts, test_values, header) + await self._method_subtests(self.cog.expanded_user_infraction_counts, test_values, header) - def test_user_nomination_counts_returns_correct_strings(self): + async def test_user_nomination_counts_returns_correct_strings(self): """The method should list the number of active and historical nominations for the user.""" test_values = ( { "api response": [], - "expected_lines": ["This user has never been nominated."], + "expected_lines": ["No nominations"], }, { "api response": [{'active': True}], - "expected_lines": ["This user is **currently** nominated (1 nomination in total)."], + "expected_lines": ["This user is **currently** nominated", "(1 nomination in total)"], }, { "api response": [{'active': True}, {'active': False}], - "expected_lines": ["This user is **currently** nominated (2 nominations in total)."], + "expected_lines": ["This user is **currently** nominated", "(2 nominations in total)"], }, { "api response": [{'active': False}], @@ -334,14 +257,14 @@ class UserInfractionHelperMethodTests(unittest.TestCase): ) - header = ["**Nominations**"] + header = "Nominations" - self._method_subtests(self.cog.user_nomination_counts, test_values, header) + await self._method_subtests(self.cog.user_nomination_counts, test_values, header) [email protected]("bot.cogs.information.time_since", new=unittest.mock.MagicMock(return_value="1 year ago")) [email protected]("bot.cogs.information.constants.MODERATION_CHANNELS", new=[50]) -class UserEmbedTests(unittest.TestCase): [email protected]("bot.exts.info.information.time_since", new=unittest.mock.MagicMock(return_value="1 year ago")) [email protected]("bot.exts.info.information.constants.MODERATION_CHANNELS", new=[50]) +class UserEmbedTests(unittest.IsolatedAsyncioTestCase): """Tests for the creation of the `!user` embed.""" def setUp(self): @@ -350,32 +273,41 @@ class UserEmbedTests(unittest.TestCase): self.bot.api_client.get = unittest.mock.AsyncMock() self.cog = information.Information(self.bot) - @unittest.mock.patch(f"{COG_PATH}.basic_user_infraction_counts", new=unittest.mock.AsyncMock(return_value="")) - def test_create_user_embed_uses_string_representation_of_user_in_title_if_nick_is_not_available(self): + @unittest.mock.patch( + f"{COG_PATH}.basic_user_infraction_counts", + new=unittest.mock.AsyncMock(return_value=("Infractions", "basic infractions")) + ) + async def test_create_user_embed_uses_string_representation_of_user_in_title_if_nick_is_not_available(self): """The embed should use the string representation of the user if they don't have a nick.""" ctx = helpers.MockContext(channel=helpers.MockTextChannel(id=1)) user = helpers.MockMember() user.nick = None user.__str__ = unittest.mock.Mock(return_value="Mr. Hemlock") - embed = asyncio.run(self.cog.create_user_embed(ctx, user)) + embed = await self.cog.create_user_embed(ctx, user) self.assertEqual(embed.title, "Mr. Hemlock") - @unittest.mock.patch(f"{COG_PATH}.basic_user_infraction_counts", new=unittest.mock.AsyncMock(return_value="")) - def test_create_user_embed_uses_nick_in_title_if_available(self): + @unittest.mock.patch( + f"{COG_PATH}.basic_user_infraction_counts", + new=unittest.mock.AsyncMock(return_value=("Infractions", "basic infractions")) + ) + async def test_create_user_embed_uses_nick_in_title_if_available(self): """The embed should use the nick if it's available.""" ctx = helpers.MockContext(channel=helpers.MockTextChannel(id=1)) user = helpers.MockMember() user.nick = "Cat lover" user.__str__ = unittest.mock.Mock(return_value="Mr. Hemlock") - embed = asyncio.run(self.cog.create_user_embed(ctx, user)) + embed = await self.cog.create_user_embed(ctx, user) self.assertEqual(embed.title, "Cat lover (Mr. Hemlock)") - @unittest.mock.patch(f"{COG_PATH}.basic_user_infraction_counts", new=unittest.mock.AsyncMock(return_value="")) - def test_create_user_embed_ignores_everyone_role(self): + @unittest.mock.patch( + f"{COG_PATH}.basic_user_infraction_counts", + new=unittest.mock.AsyncMock(return_value=("Infractions", "basic infractions")) + ) + async def test_create_user_embed_ignores_everyone_role(self): """Created `!user` embeds should not contain mention of the @everyone-role.""" ctx = helpers.MockContext(channel=helpers.MockTextChannel(id=1)) admins_role = helpers.MockRole(name='Admins') @@ -384,80 +316,92 @@ class UserEmbedTests(unittest.TestCase): # A `MockMember` has the @Everyone role by default; we add the Admins to that. user = helpers.MockMember(roles=[admins_role], top_role=admins_role) - embed = asyncio.run(self.cog.create_user_embed(ctx, user)) + embed = await self.cog.create_user_embed(ctx, user) - self.assertIn("&Admins", embed.description) - self.assertNotIn("&Everyone", embed.description) + self.assertIn("&Admins", embed.fields[1].value) + self.assertNotIn("&Everyone", embed.fields[1].value) @unittest.mock.patch(f"{COG_PATH}.expanded_user_infraction_counts", new_callable=unittest.mock.AsyncMock) @unittest.mock.patch(f"{COG_PATH}.user_nomination_counts", new_callable=unittest.mock.AsyncMock) - def test_create_user_embed_expanded_information_in_moderation_channels(self, nomination_counts, infraction_counts): + async def test_create_user_embed_expanded_information_in_moderation_channels( + self, + nomination_counts, + infraction_counts + ): """The embed should contain expanded infractions and nomination info in mod channels.""" ctx = helpers.MockContext(channel=helpers.MockTextChannel(id=50)) moderators_role = helpers.MockRole(name='Moderators') moderators_role.colour = 100 - infraction_counts.return_value = "expanded infractions info" - nomination_counts.return_value = "nomination info" + infraction_counts.return_value = ("Infractions", "expanded infractions info") + nomination_counts.return_value = ("Nominations", "nomination info") user = helpers.MockMember(id=314, roles=[moderators_role], top_role=moderators_role) - embed = asyncio.run(self.cog.create_user_embed(ctx, user)) + embed = await self.cog.create_user_embed(ctx, user) infraction_counts.assert_called_once_with(user) nomination_counts.assert_called_once_with(user) self.assertEqual( textwrap.dedent(f""" - **User Information** Created: {"1 year ago"} Profile: {user.mention} ID: {user.id} + """).strip(), + embed.fields[0].value + ) - **Member Information** + self.assertEqual( + textwrap.dedent(f""" Joined: {"1 year ago"} Roles: &Moderators - - expanded infractions info - - nomination info """).strip(), - embed.description + embed.fields[1].value ) @unittest.mock.patch(f"{COG_PATH}.basic_user_infraction_counts", new_callable=unittest.mock.AsyncMock) - def test_create_user_embed_basic_information_outside_of_moderation_channels(self, infraction_counts): + async def test_create_user_embed_basic_information_outside_of_moderation_channels(self, infraction_counts): """The embed should contain only basic infraction data outside of mod channels.""" ctx = helpers.MockContext(channel=helpers.MockTextChannel(id=100)) moderators_role = helpers.MockRole(name='Moderators') moderators_role.colour = 100 - infraction_counts.return_value = "basic infractions info" + infraction_counts.return_value = ("Infractions", "basic infractions info") user = helpers.MockMember(id=314, roles=[moderators_role], top_role=moderators_role) - embed = asyncio.run(self.cog.create_user_embed(ctx, user)) + embed = await self.cog.create_user_embed(ctx, user) infraction_counts.assert_called_once_with(user) self.assertEqual( textwrap.dedent(f""" - **User Information** Created: {"1 year ago"} Profile: {user.mention} ID: {user.id} + """).strip(), + embed.fields[0].value + ) - **Member Information** + self.assertEqual( + textwrap.dedent(f""" Joined: {"1 year ago"} Roles: &Moderators - - basic infractions info """).strip(), - embed.description + embed.fields[1].value + ) + + self.assertEqual( + "basic infractions info", + embed.fields[2].value ) - @unittest.mock.patch(f"{COG_PATH}.basic_user_infraction_counts", new=unittest.mock.AsyncMock(return_value="")) - def test_create_user_embed_uses_top_role_colour_when_user_has_roles(self): + @unittest.mock.patch( + f"{COG_PATH}.basic_user_infraction_counts", + new=unittest.mock.AsyncMock(return_value=("Infractions", "basic infractions")) + ) + async def test_create_user_embed_uses_top_role_colour_when_user_has_roles(self): """The embed should be created with the colour of the top role, if a top role is available.""" ctx = helpers.MockContext() @@ -465,35 +409,41 @@ class UserEmbedTests(unittest.TestCase): moderators_role.colour = 100 user = helpers.MockMember(id=314, roles=[moderators_role], top_role=moderators_role) - embed = asyncio.run(self.cog.create_user_embed(ctx, user)) + embed = await self.cog.create_user_embed(ctx, user) self.assertEqual(embed.colour, discord.Colour(moderators_role.colour)) - @unittest.mock.patch(f"{COG_PATH}.basic_user_infraction_counts", new=unittest.mock.AsyncMock(return_value="")) - def test_create_user_embed_uses_blurple_colour_when_user_has_no_roles(self): + @unittest.mock.patch( + f"{COG_PATH}.basic_user_infraction_counts", + new=unittest.mock.AsyncMock(return_value=("Infractions", "basic infractions")) + ) + async def test_create_user_embed_uses_blurple_colour_when_user_has_no_roles(self): """The embed should be created with a blurple colour if the user has no assigned roles.""" ctx = helpers.MockContext() user = helpers.MockMember(id=217) - embed = asyncio.run(self.cog.create_user_embed(ctx, user)) + embed = await self.cog.create_user_embed(ctx, user) self.assertEqual(embed.colour, discord.Colour.blurple()) - @unittest.mock.patch(f"{COG_PATH}.basic_user_infraction_counts", new=unittest.mock.AsyncMock(return_value="")) - def test_create_user_embed_uses_png_format_of_user_avatar_as_thumbnail(self): + @unittest.mock.patch( + f"{COG_PATH}.basic_user_infraction_counts", + new=unittest.mock.AsyncMock(return_value=("Infractions", "basic infractions")) + ) + async def test_create_user_embed_uses_png_format_of_user_avatar_as_thumbnail(self): """The embed thumbnail should be set to the user's avatar in `png` format.""" ctx = helpers.MockContext() user = helpers.MockMember(id=217) user.avatar_url_as.return_value = "avatar url" - embed = asyncio.run(self.cog.create_user_embed(ctx, user)) + embed = await self.cog.create_user_embed(ctx, user) user.avatar_url_as.assert_called_once_with(static_format="png") self.assertEqual(embed.thumbnail.url, "avatar url") [email protected]("bot.cogs.information.constants") -class UserCommandTests(unittest.TestCase): [email protected]("bot.exts.info.information.constants") +class UserCommandTests(unittest.IsolatedAsyncioTestCase): """Tests for the `!user` command.""" def setUp(self): @@ -509,76 +459,70 @@ class UserCommandTests(unittest.TestCase): self.moderator = helpers.MockMember(id=2, name="riffautae", roles=[self.moderator_role]) self.target = helpers.MockMember(id=3, name="__fluzz__") - def test_regular_member_cannot_target_another_member(self, constants): + # There's no way to mock the channel constant without deferring imports. The constant is + # used as a default value for a parameter, which gets defined upon import. + self.bot_command_channel = helpers.MockTextChannel(id=constants.Channels.bot_commands) + + async def test_regular_member_cannot_target_another_member(self, constants): """A regular user should not be able to use `!user` targeting another user.""" constants.MODERATION_ROLES = [self.moderator_role.id] - ctx = helpers.MockContext(author=self.author) - asyncio.run(self.cog.user_info.callback(self.cog, ctx, self.target)) + await self.cog.user_info(self.cog, ctx, self.target) ctx.send.assert_called_once_with("You may not use this command on users other than yourself.") - def test_regular_member_cannot_use_command_outside_of_bot_commands(self, constants): + async def test_regular_member_cannot_use_command_outside_of_bot_commands(self, constants): """A regular user should not be able to use this command outside of bot-commands.""" constants.MODERATION_ROLES = [self.moderator_role.id] constants.STAFF_ROLES = [self.moderator_role.id] - constants.Channels.bot_commands = 50 - ctx = helpers.MockContext(author=self.author, channel=helpers.MockTextChannel(id=100)) msg = "Sorry, but you may only use this command within <#50>." with self.assertRaises(InWhitelistCheckFailure, msg=msg): - asyncio.run(self.cog.user_info.callback(self.cog, ctx)) + await self.cog.user_info(self.cog, ctx) - @unittest.mock.patch("bot.cogs.information.Information.create_user_embed", new_callable=unittest.mock.AsyncMock) - def test_regular_user_may_use_command_in_bot_commands_channel(self, create_embed, constants): + @unittest.mock.patch("bot.exts.info.information.Information.create_user_embed") + async def test_regular_user_may_use_command_in_bot_commands_channel(self, create_embed, constants): """A regular user should be allowed to use `!user` targeting themselves in bot-commands.""" constants.STAFF_ROLES = [self.moderator_role.id] - constants.Channels.bot_commands = 50 + ctx = helpers.MockContext(author=self.author, channel=self.bot_command_channel) - ctx = helpers.MockContext(author=self.author, channel=helpers.MockTextChannel(id=50)) - - asyncio.run(self.cog.user_info.callback(self.cog, ctx)) + await self.cog.user_info(self.cog, ctx) create_embed.assert_called_once_with(ctx, self.author) ctx.send.assert_called_once() - @unittest.mock.patch("bot.cogs.information.Information.create_user_embed", new_callable=unittest.mock.AsyncMock) - def test_regular_user_can_explicitly_target_themselves(self, create_embed, constants): + @unittest.mock.patch("bot.exts.info.information.Information.create_user_embed") + async def test_regular_user_can_explicitly_target_themselves(self, create_embed, _): """A user should target itself with `!user` when a `user` argument was not provided.""" constants.STAFF_ROLES = [self.moderator_role.id] - constants.Channels.bot_commands = 50 - - ctx = helpers.MockContext(author=self.author, channel=helpers.MockTextChannel(id=50)) + ctx = helpers.MockContext(author=self.author, channel=self.bot_command_channel) - asyncio.run(self.cog.user_info.callback(self.cog, ctx, self.author)) + await self.cog.user_info(self.cog, ctx, self.author) create_embed.assert_called_once_with(ctx, self.author) ctx.send.assert_called_once() - @unittest.mock.patch("bot.cogs.information.Information.create_user_embed", new_callable=unittest.mock.AsyncMock) - def test_staff_members_can_bypass_channel_restriction(self, create_embed, constants): + @unittest.mock.patch("bot.exts.info.information.Information.create_user_embed") + async def test_staff_members_can_bypass_channel_restriction(self, create_embed, constants): """Staff members should be able to bypass the bot-commands channel restriction.""" constants.STAFF_ROLES = [self.moderator_role.id] - constants.Channels.bot_commands = 50 - ctx = helpers.MockContext(author=self.moderator, channel=helpers.MockTextChannel(id=200)) - asyncio.run(self.cog.user_info.callback(self.cog, ctx)) + await self.cog.user_info(self.cog, ctx) create_embed.assert_called_once_with(ctx, self.moderator) ctx.send.assert_called_once() - @unittest.mock.patch("bot.cogs.information.Information.create_user_embed", new_callable=unittest.mock.AsyncMock) - def test_moderators_can_target_another_member(self, create_embed, constants): + @unittest.mock.patch("bot.exts.info.information.Information.create_user_embed") + async def test_moderators_can_target_another_member(self, create_embed, constants): """A moderator should be able to use `!user` targeting another user.""" constants.MODERATION_ROLES = [self.moderator_role.id] constants.STAFF_ROLES = [self.moderator_role.id] - ctx = helpers.MockContext(author=self.moderator, channel=helpers.MockTextChannel(id=50)) - asyncio.run(self.cog.user_info.callback(self.cog, ctx, self.target)) + await self.cog.user_info(self.cog, ctx, self.target) create_embed.assert_called_once_with(ctx, self.target) ctx.send.assert_called_once() diff --git a/tests/bot/exts/moderation/__init__.py b/tests/bot/exts/moderation/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/tests/bot/exts/moderation/__init__.py diff --git a/tests/bot/exts/moderation/infraction/__init__.py b/tests/bot/exts/moderation/infraction/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/tests/bot/exts/moderation/infraction/__init__.py diff --git a/tests/bot/exts/moderation/infraction/test_infractions.py b/tests/bot/exts/moderation/infraction/test_infractions.py new file mode 100644 index 000000000..bf557a484 --- /dev/null +++ b/tests/bot/exts/moderation/infraction/test_infractions.py @@ -0,0 +1,201 @@ +import textwrap +import unittest +from unittest.mock import AsyncMock, MagicMock, Mock, patch + +from bot.constants import Event +from bot.exts.moderation.infraction.infractions import Infractions +from tests.helpers import MockBot, MockContext, MockGuild, MockMember, MockRole + + +class TruncationTests(unittest.IsolatedAsyncioTestCase): + """Tests for ban and kick command reason truncation.""" + + def setUp(self): + self.bot = MockBot() + self.cog = Infractions(self.bot) + self.user = MockMember(id=1234, top_role=MockRole(id=3577, position=10)) + self.target = MockMember(id=1265, top_role=MockRole(id=9876, position=0)) + self.guild = MockGuild(id=4567) + self.ctx = MockContext(bot=self.bot, author=self.user, guild=self.guild) + + @patch("bot.exts.moderation.infraction._utils.get_active_infraction") + @patch("bot.exts.moderation.infraction._utils.post_infraction") + async def test_apply_ban_reason_truncation(self, post_infraction_mock, get_active_mock): + """Should truncate reason for `ctx.guild.ban`.""" + get_active_mock.return_value = None + post_infraction_mock.return_value = {"foo": "bar"} + + self.cog.apply_infraction = AsyncMock() + self.bot.get_cog.return_value = AsyncMock() + self.cog.mod_log.ignore = Mock() + self.ctx.guild.ban = Mock() + + await self.cog.apply_ban(self.ctx, self.target, "foo bar" * 3000) + self.ctx.guild.ban.assert_called_once_with( + self.target, + reason=textwrap.shorten("foo bar" * 3000, 512, placeholder="..."), + delete_message_days=0 + ) + self.cog.apply_infraction.assert_awaited_once_with( + self.ctx, {"foo": "bar"}, self.target, self.ctx.guild.ban.return_value + ) + + @patch("bot.exts.moderation.infraction._utils.post_infraction") + async def test_apply_kick_reason_truncation(self, post_infraction_mock): + """Should truncate reason for `Member.kick`.""" + post_infraction_mock.return_value = {"foo": "bar"} + + self.cog.apply_infraction = AsyncMock() + self.cog.mod_log.ignore = Mock() + self.target.kick = Mock() + + await self.cog.apply_kick(self.ctx, self.target, "foo bar" * 3000) + self.target.kick.assert_called_once_with(reason=textwrap.shorten("foo bar" * 3000, 512, placeholder="...")) + self.cog.apply_infraction.assert_awaited_once_with( + self.ctx, {"foo": "bar"}, self.target, self.target.kick.return_value + ) + + +@patch("bot.exts.moderation.infraction.infractions.constants.Roles.voice_verified", new=123456) +class VoiceBanTests(unittest.IsolatedAsyncioTestCase): + """Tests for voice ban related functions and commands.""" + + def setUp(self): + self.bot = MockBot() + self.mod = MockMember(top_role=10) + self.user = MockMember(top_role=1, roles=[MockRole(id=123456)]) + self.guild = MockGuild() + self.ctx = MockContext(bot=self.bot, author=self.mod) + self.cog = Infractions(self.bot) + + async def test_permanent_voice_ban(self): + """Should call voice ban applying function without expiry.""" + self.cog.apply_voice_ban = AsyncMock() + self.assertIsNone(await self.cog.voiceban(self.cog, self.ctx, self.user, reason="foobar")) + self.cog.apply_voice_ban.assert_awaited_once_with(self.ctx, self.user, "foobar") + + async def test_temporary_voice_ban(self): + """Should call voice ban applying function with expiry.""" + self.cog.apply_voice_ban = AsyncMock() + self.assertIsNone(await self.cog.tempvoiceban(self.cog, self.ctx, self.user, "baz", reason="foobar")) + self.cog.apply_voice_ban.assert_awaited_once_with(self.ctx, self.user, "foobar", expires_at="baz") + + async def test_voice_unban(self): + """Should call infraction pardoning function.""" + self.cog.pardon_infraction = AsyncMock() + self.assertIsNone(await self.cog.unvoiceban(self.cog, self.ctx, self.user)) + self.cog.pardon_infraction.assert_awaited_once_with(self.ctx, "voice_ban", self.user) + + @patch("bot.exts.moderation.infraction.infractions._utils.post_infraction") + @patch("bot.exts.moderation.infraction.infractions._utils.get_active_infraction") + async def test_voice_ban_user_have_active_infraction(self, get_active_infraction, post_infraction_mock): + """Should return early when user already have Voice Ban infraction.""" + get_active_infraction.return_value = {"foo": "bar"} + self.assertIsNone(await self.cog.apply_voice_ban(self.ctx, self.user, "foobar")) + get_active_infraction.assert_awaited_once_with(self.ctx, self.user, "voice_ban") + post_infraction_mock.assert_not_awaited() + + @patch("bot.exts.moderation.infraction.infractions._utils.post_infraction") + @patch("bot.exts.moderation.infraction.infractions._utils.get_active_infraction") + async def test_voice_ban_infraction_post_failed(self, get_active_infraction, post_infraction_mock): + """Should return early when posting infraction fails.""" + self.cog.mod_log.ignore = MagicMock() + get_active_infraction.return_value = None + post_infraction_mock.return_value = None + self.assertIsNone(await self.cog.apply_voice_ban(self.ctx, self.user, "foobar")) + post_infraction_mock.assert_awaited_once() + self.cog.mod_log.ignore.assert_not_called() + + @patch("bot.exts.moderation.infraction.infractions._utils.post_infraction") + @patch("bot.exts.moderation.infraction.infractions._utils.get_active_infraction") + async def test_voice_ban_infraction_post_add_kwargs(self, get_active_infraction, post_infraction_mock): + """Should pass all kwargs passed to apply_voice_ban to post_infraction.""" + get_active_infraction.return_value = None + # We don't want that this continue yet + post_infraction_mock.return_value = None + self.assertIsNone(await self.cog.apply_voice_ban(self.ctx, self.user, "foobar", my_kwarg=23)) + post_infraction_mock.assert_awaited_once_with( + self.ctx, self.user, "voice_ban", "foobar", active=True, my_kwarg=23 + ) + + @patch("bot.exts.moderation.infraction.infractions._utils.post_infraction") + @patch("bot.exts.moderation.infraction.infractions._utils.get_active_infraction") + async def test_voice_ban_mod_log_ignore(self, get_active_infraction, post_infraction_mock): + """Should ignore Voice Verified role removing.""" + self.cog.mod_log.ignore = MagicMock() + self.cog.apply_infraction = AsyncMock() + self.user.remove_roles = MagicMock(return_value="my_return_value") + + get_active_infraction.return_value = None + post_infraction_mock.return_value = {"foo": "bar"} + + self.assertIsNone(await self.cog.apply_voice_ban(self.ctx, self.user, "foobar")) + self.cog.mod_log.ignore.assert_called_once_with(Event.member_update, self.user.id) + + @patch("bot.exts.moderation.infraction.infractions._utils.post_infraction") + @patch("bot.exts.moderation.infraction.infractions._utils.get_active_infraction") + async def test_voice_ban_apply_infraction(self, get_active_infraction, post_infraction_mock): + """Should ignore Voice Verified role removing.""" + self.cog.mod_log.ignore = MagicMock() + self.cog.apply_infraction = AsyncMock() + self.user.remove_roles = MagicMock(return_value="my_return_value") + + get_active_infraction.return_value = None + post_infraction_mock.return_value = {"foo": "bar"} + + self.assertIsNone(await self.cog.apply_voice_ban(self.ctx, self.user, "foobar")) + self.user.remove_roles.assert_called_once_with(self.cog._voice_verified_role, reason="foobar") + self.cog.apply_infraction.assert_awaited_once_with(self.ctx, {"foo": "bar"}, self.user, "my_return_value") + + @patch("bot.exts.moderation.infraction.infractions._utils.post_infraction") + @patch("bot.exts.moderation.infraction.infractions._utils.get_active_infraction") + async def test_voice_ban_truncate_reason(self, get_active_infraction, post_infraction_mock): + """Should truncate reason for voice ban.""" + self.cog.mod_log.ignore = MagicMock() + self.cog.apply_infraction = AsyncMock() + self.user.remove_roles = MagicMock(return_value="my_return_value") + + get_active_infraction.return_value = None + post_infraction_mock.return_value = {"foo": "bar"} + + self.assertIsNone(await self.cog.apply_voice_ban(self.ctx, self.user, "foobar" * 3000)) + self.user.remove_roles.assert_called_once_with( + self.cog._voice_verified_role, reason=textwrap.shorten("foobar" * 3000, 512, placeholder="...") + ) + self.cog.apply_infraction.assert_awaited_once_with(self.ctx, {"foo": "bar"}, self.user, "my_return_value") + + async def test_voice_unban_user_not_found(self): + """Should include info to return dict when user was not found from guild.""" + self.guild.get_member.return_value = None + result = await self.cog.pardon_voice_ban(self.user.id, self.guild, "foobar") + self.assertEqual(result, {"Info": "User was not found in the guild."}) + + @patch("bot.exts.moderation.infraction.infractions._utils.notify_pardon") + @patch("bot.exts.moderation.infraction.infractions.format_user") + async def test_voice_unban_user_found(self, format_user_mock, notify_pardon_mock): + """Should add role back with ignoring, notify user and return log dictionary..""" + self.guild.get_member.return_value = self.user + notify_pardon_mock.return_value = True + format_user_mock.return_value = "my-user" + + result = await self.cog.pardon_voice_ban(self.user.id, self.guild, "foobar") + self.assertEqual(result, { + "Member": "my-user", + "DM": "Sent" + }) + notify_pardon_mock.assert_awaited_once() + + @patch("bot.exts.moderation.infraction.infractions._utils.notify_pardon") + @patch("bot.exts.moderation.infraction.infractions.format_user") + async def test_voice_unban_dm_fail(self, format_user_mock, notify_pardon_mock): + """Should add role back with ignoring, notify user and return log dictionary..""" + self.guild.get_member.return_value = self.user + notify_pardon_mock.return_value = False + format_user_mock.return_value = "my-user" + + result = await self.cog.pardon_voice_ban(self.user.id, self.guild, "foobar") + self.assertEqual(result, { + "Member": "my-user", + "DM": "**Failed**" + }) + notify_pardon_mock.assert_awaited_once() diff --git a/tests/bot/exts/moderation/infraction/test_utils.py b/tests/bot/exts/moderation/infraction/test_utils.py new file mode 100644 index 000000000..5b62463e0 --- /dev/null +++ b/tests/bot/exts/moderation/infraction/test_utils.py @@ -0,0 +1,359 @@ +import unittest +from collections import namedtuple +from datetime import datetime +from unittest.mock import AsyncMock, MagicMock, call, patch + +from discord import Embed, Forbidden, HTTPException, NotFound + +from bot.api import ResponseCodeError +from bot.constants import Colours, Icons +from bot.exts.moderation.infraction import _utils as utils +from tests.helpers import MockBot, MockContext, MockMember, MockUser + + +class ModerationUtilsTests(unittest.IsolatedAsyncioTestCase): + """Tests Moderation utils.""" + + def setUp(self): + self.bot = MockBot() + self.member = MockMember(id=1234) + self.user = MockUser(id=1234) + self.ctx = MockContext(bot=self.bot, author=self.member) + + async def test_post_user(self): + """Should POST a new user and return the response if successful or otherwise send an error message.""" + user = MockUser(discriminator=5678, id=1234, name="Test user") + not_user = MagicMock(discriminator=3333, id=5678, name="Wrong user") + test_cases = [ + { + "user": user, + "post_result": "bar", + "raise_error": None, + "payload": { + "discriminator": 5678, + "id": self.user.id, + "in_guild": False, + "name": "Test user", + "roles": [] + } + }, + { + "user": self.member, + "post_result": "foo", + "raise_error": ResponseCodeError(MagicMock(status=400), "foo"), + "payload": { + "discriminator": 0, + "id": self.member.id, + "in_guild": False, + "name": "Name unknown", + "roles": [] + } + }, + { + "user": not_user, + "post_result": "bar", + "raise_error": None, + "payload": { + "discriminator": not_user.discriminator, + "id": not_user.id, + "in_guild": False, + "name": not_user.name, + "roles": [] + } + } + ] + + for case in test_cases: + user = case["user"] + post_result = case["post_result"] + raise_error = case["raise_error"] + payload = case["payload"] + + with self.subTest(user=user, post_result=post_result, raise_error=raise_error, payload=payload): + self.bot.api_client.post.reset_mock(side_effect=True) + self.ctx.bot.api_client.post.return_value = post_result + + self.ctx.bot.api_client.post.side_effect = raise_error + + result = await utils.post_user(self.ctx, user) + + if raise_error: + self.assertIsNone(result) + self.ctx.send.assert_awaited_once() + self.assertIn(str(raise_error.status), self.ctx.send.call_args[0][0]) + else: + self.assertEqual(result, post_result) + self.bot.api_client.post.assert_awaited_once_with("bot/users", json=payload) + + async def test_get_active_infraction(self): + """ + Should request the API for active infractions and return infraction if the user has one or `None` otherwise. + + A message should be sent to the context indicating a user already has an infraction, if that's the case. + """ + test_case = namedtuple("test_case", ["get_return_value", "expected_output", "infraction_nr", "send_msg"]) + test_cases = [ + test_case([], None, None, True), + test_case([{"id": 123987}], {"id": 123987}, "123987", False), + test_case([{"id": 123987}], {"id": 123987}, "123987", True) + ] + + for case in test_cases: + with self.subTest(return_value=case.get_return_value, expected=case.expected_output): + self.bot.api_client.get.reset_mock() + self.ctx.send.reset_mock() + + params = { + "active": "true", + "type": "ban", + "user__id": str(self.member.id) + } + + self.bot.api_client.get.return_value = case.get_return_value + + result = await utils.get_active_infraction(self.ctx, self.member, "ban", send_msg=case.send_msg) + self.assertEqual(result, case.expected_output) + self.bot.api_client.get.assert_awaited_once_with("bot/infractions", params=params) + + if case.send_msg and case.get_return_value: + self.ctx.send.assert_awaited_once() + sent_message = self.ctx.send.call_args[0][0] + self.assertIn(case.infraction_nr, sent_message) + self.assertIn("ban", sent_message) + else: + self.ctx.send.assert_not_awaited() + + @patch("bot.exts.moderation.infraction._utils.send_private_embed") + async def test_notify_infraction(self, send_private_embed_mock): + """ + Should send an embed of a certain format as a DM and return `True` if DM successful. + + Appealable infractions should have the appeal message in the embed's footer. + """ + test_cases = [ + { + "args": (self.user, "ban", "2020-02-26 09:20 (23 hours and 59 minutes)"), + "expected_output": Embed( + title=utils.INFRACTION_TITLE, + description=utils.INFRACTION_DESCRIPTION_TEMPLATE.format( + type="Ban", + expires="2020-02-26 09:20 (23 hours and 59 minutes)", + reason="No reason provided." + ), + colour=Colours.soft_red, + url=utils.RULES_URL + ).set_author( + name=utils.INFRACTION_AUTHOR_NAME, + url=utils.RULES_URL, + icon_url=Icons.token_removed + ).set_footer(text=utils.INFRACTION_APPEAL_FOOTER), + "send_result": True + }, + { + "args": (self.user, "warning", None, "Test reason."), + "expected_output": Embed( + title=utils.INFRACTION_TITLE, + description=utils.INFRACTION_DESCRIPTION_TEMPLATE.format( + type="Warning", + expires="N/A", + reason="Test reason." + ), + colour=Colours.soft_red, + url=utils.RULES_URL + ).set_author( + name=utils.INFRACTION_AUTHOR_NAME, + url=utils.RULES_URL, + icon_url=Icons.token_removed + ), + "send_result": False + }, + { + "args": (self.user, "note", None, None, Icons.defcon_denied), + "expected_output": Embed( + title=utils.INFRACTION_TITLE, + description=utils.INFRACTION_DESCRIPTION_TEMPLATE.format( + type="Note", + expires="N/A", + reason="No reason provided." + ), + colour=Colours.soft_red, + url=utils.RULES_URL + ).set_author( + name=utils.INFRACTION_AUTHOR_NAME, + url=utils.RULES_URL, + icon_url=Icons.defcon_denied + ), + "send_result": False + }, + { + "args": (self.user, "mute", "2020-02-26 09:20 (23 hours and 59 minutes)", "Test", Icons.defcon_denied), + "expected_output": Embed( + title=utils.INFRACTION_TITLE, + description=utils.INFRACTION_DESCRIPTION_TEMPLATE.format( + type="Mute", + expires="2020-02-26 09:20 (23 hours and 59 minutes)", + reason="Test" + ), + colour=Colours.soft_red, + url=utils.RULES_URL + ).set_author( + name=utils.INFRACTION_AUTHOR_NAME, + url=utils.RULES_URL, + icon_url=Icons.defcon_denied + ).set_footer(text=utils.INFRACTION_APPEAL_FOOTER), + "send_result": False + }, + { + "args": (self.user, "mute", None, "foo bar" * 4000, Icons.defcon_denied), + "expected_output": Embed( + title=utils.INFRACTION_TITLE, + description=utils.INFRACTION_DESCRIPTION_TEMPLATE.format( + type="Mute", + expires="N/A", + reason="foo bar" * 4000 + )[:2045] + "...", + colour=Colours.soft_red, + url=utils.RULES_URL + ).set_author( + name=utils.INFRACTION_AUTHOR_NAME, + url=utils.RULES_URL, + icon_url=Icons.defcon_denied + ).set_footer(text=utils.INFRACTION_APPEAL_FOOTER), + "send_result": True + } + ] + + for case in test_cases: + with self.subTest(args=case["args"], expected=case["expected_output"], send=case["send_result"]): + send_private_embed_mock.reset_mock() + + send_private_embed_mock.return_value = case["send_result"] + result = await utils.notify_infraction(*case["args"]) + + self.assertEqual(case["send_result"], result) + + embed = send_private_embed_mock.call_args[0][1] + + self.assertEqual(embed.to_dict(), case["expected_output"].to_dict()) + + send_private_embed_mock.assert_awaited_once_with(case["args"][0], embed) + + @patch("bot.exts.moderation.infraction._utils.send_private_embed") + async def test_notify_pardon(self, send_private_embed_mock): + """Should send an embed of a certain format as a DM and return `True` if DM successful.""" + test_case = namedtuple("test_case", ["args", "icon", "send_result"]) + test_cases = [ + test_case((self.user, "Test title", "Example content"), Icons.user_verified, True), + test_case((self.user, "Test title", "Example content", Icons.user_update), Icons.user_update, False) + ] + + for case in test_cases: + expected = Embed( + description="Example content", + colour=Colours.soft_green + ).set_author( + name="Test title", + icon_url=case.icon + ) + + with self.subTest(args=case.args, expected=expected): + send_private_embed_mock.reset_mock() + + send_private_embed_mock.return_value = case.send_result + + result = await utils.notify_pardon(*case.args) + self.assertEqual(case.send_result, result) + + embed = send_private_embed_mock.call_args[0][1] + self.assertEqual(embed.to_dict(), expected.to_dict()) + + send_private_embed_mock.assert_awaited_once_with(case.args[0], embed) + + async def test_send_private_embed(self): + """Should DM the user and return `True` on success or `False` on failure.""" + embed = Embed(title="Test", description="Test val") + + test_case = namedtuple("test_case", ["expected_output", "raised_exception"]) + test_cases = [ + test_case(True, None), + test_case(False, HTTPException(AsyncMock(), AsyncMock())), + test_case(False, Forbidden(AsyncMock(), AsyncMock())), + test_case(False, NotFound(AsyncMock(), AsyncMock())) + ] + + for case in test_cases: + with self.subTest(expected=case.expected_output, raised=case.raised_exception): + self.user.send.reset_mock(side_effect=True) + self.user.send.side_effect = case.raised_exception + + result = await utils.send_private_embed(self.user, embed) + + self.assertEqual(result, case.expected_output) + self.user.send.assert_awaited_once_with(embed=embed) + + +class TestPostInfraction(unittest.IsolatedAsyncioTestCase): + """Tests for the `post_infraction` function.""" + + def setUp(self): + self.bot = MockBot() + self.member = MockMember(id=1234) + self.user = MockUser(id=1234) + self.ctx = MockContext(bot=self.bot, author=self.member) + + async def test_normal_post_infraction(self): + """Should return response from POST request if there are no errors.""" + now = datetime.now() + payload = { + "actor": self.ctx.author.id, + "hidden": True, + "reason": "Test reason", + "type": "ban", + "user": self.member.id, + "active": False, + "expires_at": now.isoformat() + } + + self.ctx.bot.api_client.post.return_value = "foo" + actual = await utils.post_infraction(self.ctx, self.member, "ban", "Test reason", now, True, False) + + self.assertEqual(actual, "foo") + self.ctx.bot.api_client.post.assert_awaited_once_with("bot/infractions", json=payload) + + async def test_unknown_error_post_infraction(self): + """Should send an error message to chat when a non-400 error occurs.""" + self.ctx.bot.api_client.post.side_effect = ResponseCodeError(AsyncMock(), AsyncMock()) + self.ctx.bot.api_client.post.side_effect.status = 500 + + actual = await utils.post_infraction(self.ctx, self.user, "ban", "Test reason") + self.assertIsNone(actual) + + self.assertTrue("500" in self.ctx.send.call_args[0][0]) + + @patch("bot.exts.moderation.infraction._utils.post_user", return_value=None) + async def test_user_not_found_none_post_infraction(self, post_user_mock): + """Should abort and return `None` when a new user fails to be posted.""" + self.bot.api_client.post.side_effect = ResponseCodeError(MagicMock(status=400), {"user": "foo"}) + + actual = await utils.post_infraction(self.ctx, self.user, "mute", "Test reason") + self.assertIsNone(actual) + post_user_mock.assert_awaited_once_with(self.ctx, self.user) + + @patch("bot.exts.moderation.infraction._utils.post_user", return_value="bar") + async def test_first_fail_second_success_user_post_infraction(self, post_user_mock): + """Should post the user if they don't exist, POST infraction again, and return the response if successful.""" + payload = { + "actor": self.ctx.author.id, + "hidden": False, + "reason": "Test reason", + "type": "mute", + "user": self.user.id, + "active": True + } + + self.bot.api_client.post.side_effect = [ResponseCodeError(MagicMock(status=400), {"user": "foo"}), "foo"] + + actual = await utils.post_infraction(self.ctx, self.user, "mute", "Test reason") + self.assertEqual(actual, "foo") + self.bot.api_client.post.assert_has_awaits([call("bot/infractions", json=payload)] * 2) + post_user_mock.assert_awaited_once_with(self.ctx, self.user) diff --git a/tests/bot/exts/moderation/test_incidents.py b/tests/bot/exts/moderation/test_incidents.py new file mode 100644 index 000000000..cbf7f7bcf --- /dev/null +++ b/tests/bot/exts/moderation/test_incidents.py @@ -0,0 +1,770 @@ +import asyncio +import enum +import logging +import typing as t +import unittest +from unittest.mock import AsyncMock, MagicMock, call, patch + +import aiohttp +import discord + +from bot.constants import Colours +from bot.exts.moderation import incidents +from tests.helpers import ( + MockAsyncWebhook, + MockAttachment, + MockBot, + MockMember, + MockMessage, + MockReaction, + MockRole, + MockTextChannel, + MockUser, +) + + +class MockAsyncIterable: + """ + Helper for mocking asynchronous for loops. + + It does not appear that the `unittest` library currently provides anything that would + allow us to simply mock an async iterator, such as `discord.TextChannel.history`. + + We therefore write our own helper to wrap a regular synchronous iterable, and feed + its values via `__anext__` rather than `__next__`. + + This class was written for the purposes of testing the `Incidents` cog - it may not + be generic enough to be placed in the `tests.helpers` module. + """ + + def __init__(self, messages: t.Iterable): + """Take a sync iterable to be wrapped.""" + self.iter_messages = iter(messages) + + def __aiter__(self): + """Return `self` as we provide the `__anext__` method.""" + return self + + async def __anext__(self): + """ + Feed the next item, or raise `StopAsyncIteration`. + + Since we're wrapping a sync iterator, it will communicate that it has been depleted + by raising a `StopIteration`. The `async for` construct does not expect it, and we + therefore need to substitute it for the appropriate exception type. + """ + try: + return next(self.iter_messages) + except StopIteration: + raise StopAsyncIteration + + +class MockSignal(enum.Enum): + A = "A" + B = "B" + + +mock_404 = discord.NotFound( + response=MagicMock(aiohttp.ClientResponse), # Mock the erroneous response + message="Not found", +) + + +class TestDownloadFile(unittest.IsolatedAsyncioTestCase): + """Collection of tests for the `download_file` helper function.""" + + async def test_download_file_success(self): + """If `to_file` succeeds, function returns the acquired `discord.File`.""" + file = MagicMock(discord.File, filename="bigbadlemon.jpg") + attachment = MockAttachment(to_file=AsyncMock(return_value=file)) + + acquired_file = await incidents.download_file(attachment) + self.assertIs(file, acquired_file) + + async def test_download_file_404(self): + """If `to_file` encounters a 404, function handles the exception & returns None.""" + attachment = MockAttachment(to_file=AsyncMock(side_effect=mock_404)) + + acquired_file = await incidents.download_file(attachment) + self.assertIsNone(acquired_file) + + async def test_download_file_fail(self): + """If `to_file` fails on a non-404 error, function logs the exception & returns None.""" + arbitrary_error = discord.HTTPException(MagicMock(aiohttp.ClientResponse), "Arbitrary API error") + attachment = MockAttachment(to_file=AsyncMock(side_effect=arbitrary_error)) + + with self.assertLogs(logger=incidents.log, level=logging.ERROR): + acquired_file = await incidents.download_file(attachment) + + self.assertIsNone(acquired_file) + + +class TestMakeEmbed(unittest.IsolatedAsyncioTestCase): + """Collection of tests for the `make_embed` helper function.""" + + async def test_make_embed_actioned(self): + """Embed is coloured green and footer contains 'Actioned' when `outcome=Signal.ACTIONED`.""" + embed, file = await incidents.make_embed(MockMessage(), incidents.Signal.ACTIONED, MockMember()) + + self.assertEqual(embed.colour.value, Colours.soft_green) + self.assertIn("Actioned", embed.footer.text) + + async def test_make_embed_not_actioned(self): + """Embed is coloured red and footer contains 'Rejected' when `outcome=Signal.NOT_ACTIONED`.""" + embed, file = await incidents.make_embed(MockMessage(), incidents.Signal.NOT_ACTIONED, MockMember()) + + self.assertEqual(embed.colour.value, Colours.soft_red) + self.assertIn("Rejected", embed.footer.text) + + async def test_make_embed_content(self): + """Incident content appears as embed description.""" + incident = MockMessage(content="this is an incident") + embed, file = await incidents.make_embed(incident, incidents.Signal.ACTIONED, MockMember()) + + self.assertEqual(incident.content, embed.description) + + async def test_make_embed_with_attachment_succeeds(self): + """Incident's attachment is downloaded and displayed in the embed's image field.""" + file = MagicMock(discord.File, filename="bigbadjoe.jpg") + attachment = MockAttachment(filename="bigbadjoe.jpg") + incident = MockMessage(content="this is an incident", attachments=[attachment]) + + # Patch `download_file` to return our `file` + with patch("bot.exts.moderation.incidents.download_file", AsyncMock(return_value=file)): + embed, returned_file = await incidents.make_embed(incident, incidents.Signal.ACTIONED, MockMember()) + + self.assertIs(file, returned_file) + self.assertEqual("attachment://bigbadjoe.jpg", embed.image.url) + + async def test_make_embed_with_attachment_fails(self): + """Incident's attachment fails to download, proxy url is linked instead.""" + attachment = MockAttachment(proxy_url="discord.com/bigbadjoe.jpg") + incident = MockMessage(content="this is an incident", attachments=[attachment]) + + # Patch `download_file` to return None as if the download failed + with patch("bot.exts.moderation.incidents.download_file", AsyncMock(return_value=None)): + embed, returned_file = await incidents.make_embed(incident, incidents.Signal.ACTIONED, MockMember()) + + self.assertIsNone(returned_file) + + # The author name field is simply expected to have something in it, we do not assert the message + self.assertGreater(len(embed.author.name), 0) + self.assertEqual(embed.author.url, "discord.com/bigbadjoe.jpg") # However, it should link the exact url + + +@patch("bot.constants.Channels.incidents", 123) +class TestIsIncident(unittest.TestCase): + """ + Collection of tests for the `is_incident` helper function. + + In `setUp`, we will create a mock message which should qualify as an incident. Each + test case will then mutate this instance to make it **not** qualify, in various ways. + + Notice that we patch the #incidents channel id globally for this class. + """ + + def setUp(self) -> None: + """Prepare a mock message which should qualify as an incident.""" + self.incident = MockMessage( + channel=MockTextChannel(id=123), + content="this is an incident", + author=MockUser(bot=False), + pinned=False, + ) + + def test_is_incident_true(self): + """Message qualifies as an incident if unchanged.""" + self.assertTrue(incidents.is_incident(self.incident)) + + def check_false(self): + """Assert that `self.incident` does **not** qualify as an incident.""" + self.assertFalse(incidents.is_incident(self.incident)) + + def test_is_incident_false_channel(self): + """Message doesn't qualify if sent outside of #incidents.""" + self.incident.channel = MockTextChannel(id=456) + self.check_false() + + def test_is_incident_false_content(self): + """Message doesn't qualify if content begins with hash symbol.""" + self.incident.content = "# this is a comment message" + self.check_false() + + def test_is_incident_false_author(self): + """Message doesn't qualify if author is a bot.""" + self.incident.author = MockUser(bot=True) + self.check_false() + + def test_is_incident_false_pinned(self): + """Message doesn't qualify if it is pinned.""" + self.incident.pinned = True + self.check_false() + + +class TestOwnReactions(unittest.TestCase): + """Assertions for the `own_reactions` function.""" + + def test_own_reactions(self): + """Only bot's own emoji are extracted from the input incident.""" + reactions = ( + MockReaction(emoji="A", me=True), + MockReaction(emoji="B", me=True), + MockReaction(emoji="C", me=False), + ) + message = MockMessage(reactions=reactions) + self.assertSetEqual(incidents.own_reactions(message), {"A", "B"}) + + +@patch("bot.exts.moderation.incidents.ALL_SIGNALS", {"A", "B"}) +class TestHasSignals(unittest.TestCase): + """ + Assertions for the `has_signals` function. + + We patch `ALL_SIGNALS` globally. Each test function then patches `own_reactions` + as appropriate. + """ + + def test_has_signals_true(self): + """True when `own_reactions` returns all emoji in `ALL_SIGNALS`.""" + message = MockMessage() + own_reactions = MagicMock(return_value={"A", "B"}) + + with patch("bot.exts.moderation.incidents.own_reactions", own_reactions): + self.assertTrue(incidents.has_signals(message)) + + def test_has_signals_false(self): + """False when `own_reactions` does not return all emoji in `ALL_SIGNALS`.""" + message = MockMessage() + own_reactions = MagicMock(return_value={"A", "C"}) + + with patch("bot.exts.moderation.incidents.own_reactions", own_reactions): + self.assertFalse(incidents.has_signals(message)) + + +@patch("bot.exts.moderation.incidents.Signal", MockSignal) +class TestAddSignals(unittest.IsolatedAsyncioTestCase): + """ + Assertions for the `add_signals` coroutine. + + These are all fairly similar and could go into a single test function, but I found the + patching & sub-testing fairly awkward in that case and decided to split them up + to avoid unnecessary syntax noise. + """ + + def setUp(self): + """Prepare a mock incident message for tests to use.""" + self.incident = MockMessage() + + @patch("bot.exts.moderation.incidents.own_reactions", MagicMock(return_value=set())) + async def test_add_signals_missing(self): + """All emoji are added when none are present.""" + await incidents.add_signals(self.incident) + self.incident.add_reaction.assert_has_calls([call("A"), call("B")]) + + @patch("bot.exts.moderation.incidents.own_reactions", MagicMock(return_value={"A"})) + async def test_add_signals_partial(self): + """Only missing emoji are added when some are present.""" + await incidents.add_signals(self.incident) + self.incident.add_reaction.assert_has_calls([call("B")]) + + @patch("bot.exts.moderation.incidents.own_reactions", MagicMock(return_value={"A", "B"})) + async def test_add_signals_present(self): + """No emoji are added when all are present.""" + await incidents.add_signals(self.incident) + self.incident.add_reaction.assert_not_called() + + +class TestIncidents(unittest.IsolatedAsyncioTestCase): + """ + Tests for bound methods of the `Incidents` cog. + + Use this as a base class for `Incidents` tests - it will prepare a fresh instance + for each test function, but not make any assertions on its own. Tests can mutate + the instance as they wish. + """ + + def setUp(self): + """ + Prepare a fresh `Incidents` instance for each test. + + Note that this will not schedule `crawl_incidents` in the background, as everything + is being mocked. The `crawl_task` attribute will end up being None. + """ + self.cog_instance = incidents.Incidents(MockBot()) + + +@patch("asyncio.sleep", AsyncMock()) # Prevent the coro from sleeping to speed up the test +class TestCrawlIncidents(TestIncidents): + """ + Tests for the `Incidents.crawl_incidents` coroutine. + + Apart from `test_crawl_incidents_waits_until_cache_ready`, all tests in this class + will patch the return values of `is_incident` and `has_signal` and then observe + whether the `AsyncMock` for `add_signals` was awaited or not. + + The `add_signals` mock is added by each test separately to ensure it is clean (has not + been awaited by another test yet). The mock can be reset, but this appears to be the + cleaner way. + + For each test, we inject a mock channel with a history of 1 message only (see: `setUp`). + """ + + def setUp(self): + """For each test, ensure `bot.get_channel` returns a channel with 1 arbitrary message.""" + super().setUp() # First ensure we get `cog_instance` from parent + + incidents_history = MagicMock(return_value=MockAsyncIterable([MockMessage()])) + self.cog_instance.bot.get_channel = MagicMock(return_value=MockTextChannel(history=incidents_history)) + + async def test_crawl_incidents_waits_until_cache_ready(self): + """ + The coroutine will await the `wait_until_guild_available` event. + + Since this task is schedule in the `__init__`, it is critical that it waits for the + cache to be ready, so that it can safely get the #incidents channel. + """ + await self.cog_instance.crawl_incidents() + self.cog_instance.bot.wait_until_guild_available.assert_awaited() + + @patch("bot.exts.moderation.incidents.add_signals", AsyncMock()) + @patch("bot.exts.moderation.incidents.is_incident", MagicMock(return_value=False)) # Message doesn't qualify + @patch("bot.exts.moderation.incidents.has_signals", MagicMock(return_value=False)) + async def test_crawl_incidents_noop_if_is_not_incident(self): + """Signals are not added for a non-incident message.""" + await self.cog_instance.crawl_incidents() + incidents.add_signals.assert_not_awaited() + + @patch("bot.exts.moderation.incidents.add_signals", AsyncMock()) + @patch("bot.exts.moderation.incidents.is_incident", MagicMock(return_value=True)) # Message qualifies + @patch("bot.exts.moderation.incidents.has_signals", MagicMock(return_value=True)) # But already has signals + async def test_crawl_incidents_noop_if_message_already_has_signals(self): + """Signals are not added for messages which already have them.""" + await self.cog_instance.crawl_incidents() + incidents.add_signals.assert_not_awaited() + + @patch("bot.exts.moderation.incidents.add_signals", AsyncMock()) + @patch("bot.exts.moderation.incidents.is_incident", MagicMock(return_value=True)) # Message qualifies + @patch("bot.exts.moderation.incidents.has_signals", MagicMock(return_value=False)) # And doesn't have signals + async def test_crawl_incidents_add_signals_called(self): + """Message has signals added as it does not have them yet and qualifies as an incident.""" + await self.cog_instance.crawl_incidents() + incidents.add_signals.assert_awaited_once() + + +class TestArchive(TestIncidents): + """Tests for the `Incidents.archive` coroutine.""" + + async def test_archive_webhook_not_found(self): + """ + Method recovers and returns False when the webhook is not found. + + Implicitly, this also tests that the error is handled internally and doesn't + propagate out of the method, which is just as important. + """ + self.cog_instance.bot.fetch_webhook = AsyncMock(side_effect=mock_404) + self.assertFalse( + await self.cog_instance.archive(incident=MockMessage(), outcome=MagicMock(), actioned_by=MockMember()) + ) + + async def test_archive_relays_incident(self): + """ + If webhook is found, method relays `incident` properly. + + This test will assert that the fetched webhook's `send` method is fed the correct arguments, + and that the `archive` method returns True. + """ + webhook = MockAsyncWebhook() + self.cog_instance.bot.fetch_webhook = AsyncMock(return_value=webhook) # Patch in our webhook + + # Define our own `incident` to be archived + incident = MockMessage( + content="this is an incident", + author=MockUser(name="author_name", avatar_url="author_avatar"), + id=123, + ) + built_embed = MagicMock(discord.Embed, id=123) # We patch `make_embed` to return this + + with patch("bot.exts.moderation.incidents.make_embed", AsyncMock(return_value=(built_embed, None))): + archive_return = await self.cog_instance.archive(incident, MagicMock(value="A"), MockMember()) + + # Now we check that the webhook was given the correct args, and that `archive` returned True + webhook.send.assert_called_once_with( + embed=built_embed, + username="author_name", + avatar_url="author_avatar", + file=None, + ) + self.assertTrue(archive_return) + + async def test_archive_clyde_username(self): + """ + The archive webhook username is cleansed using `sub_clyde`. + + Discord will reject any webhook with "clyde" in the username field, as it impersonates + the official Clyde bot. Since we do not control what the username will be (the incident + author name is used), we must ensure the name is cleansed, otherwise the relay may fail. + + This test assumes the username is passed as a kwarg. If this test fails, please review + whether the passed argument is being retrieved correctly. + """ + webhook = MockAsyncWebhook() + self.cog_instance.bot.fetch_webhook = AsyncMock(return_value=webhook) + + message_from_clyde = MockMessage(author=MockUser(name="clyde the great")) + await self.cog_instance.archive(message_from_clyde, MagicMock(incidents.Signal), MockMember()) + + self.assertNotIn("clyde", webhook.send.call_args.kwargs["username"]) + + +class TestMakeConfirmationTask(TestIncidents): + """ + Tests for the `Incidents.make_confirmation_task` method. + + Writing tests for this method is difficult, as it mostly just delegates the provided + information elsewhere. There is very little internal logic. Whether our approach + works conceptually is difficult to prove using unit tests. + """ + + def test_make_confirmation_task_check(self): + """ + The internal check will recognize the passed incident. + + This is a little tricky - we first pass a message with a specific `id` in, and then + retrieve the built check from the `call_args` of the `wait_for` method. This relies + on the check being passed as a kwarg. + + Once the check is retrieved, we assert that it gives True for our incident's `id`, + and False for any other. + + If this function begins to fail, first check that `created_check` is being retrieved + correctly. It should be the function that is built locally in the tested method. + """ + self.cog_instance.make_confirmation_task(MockMessage(id=123)) + + self.cog_instance.bot.wait_for.assert_called_once() + created_check = self.cog_instance.bot.wait_for.call_args.kwargs["check"] + + # The `message_id` matches the `id` of our incident + self.assertTrue(created_check(payload=MagicMock(message_id=123))) + + # This `message_id` does not match + self.assertFalse(created_check(payload=MagicMock(message_id=0))) + + +@patch("bot.exts.moderation.incidents.ALLOWED_ROLES", {1, 2}) +@patch("bot.exts.moderation.incidents.Incidents.make_confirmation_task", AsyncMock()) # Generic awaitable +class TestProcessEvent(TestIncidents): + """Tests for the `Incidents.process_event` coroutine.""" + + async def test_process_event_bad_role(self): + """The reaction is removed when the author lacks all allowed roles.""" + incident = MockMessage() + member = MockMember(roles=[MockRole(id=0)]) # Must have role 1 or 2 + + await self.cog_instance.process_event("reaction", incident, member) + incident.remove_reaction.assert_called_once_with("reaction", member) + + async def test_process_event_bad_emoji(self): + """ + The reaction is removed when an invalid emoji is used. + + This requires that we pass in a `member` with valid roles, as we need the role check + to succeed. + """ + incident = MockMessage() + member = MockMember(roles=[MockRole(id=1)]) # Member has allowed role + + await self.cog_instance.process_event("invalid_signal", incident, member) + incident.remove_reaction.assert_called_once_with("invalid_signal", member) + + async def test_process_event_no_archive_on_investigating(self): + """Message is not archived on `Signal.INVESTIGATING`.""" + with patch("bot.exts.moderation.incidents.Incidents.archive", AsyncMock()) as mocked_archive: + await self.cog_instance.process_event( + reaction=incidents.Signal.INVESTIGATING.value, + incident=MockMessage(), + member=MockMember(roles=[MockRole(id=1)]), + ) + + mocked_archive.assert_not_called() + + async def test_process_event_no_delete_if_archive_fails(self): + """ + Original message is not deleted when `Incidents.archive` returns False. + + This is the way of signaling that the relay failed, and we should not remove the original, + as that would result in losing the incident record. + """ + incident = MockMessage() + + with patch("bot.exts.moderation.incidents.Incidents.archive", AsyncMock(return_value=False)): + await self.cog_instance.process_event( + reaction=incidents.Signal.ACTIONED.value, + incident=incident, + member=MockMember(roles=[MockRole(id=1)]) + ) + + incident.delete.assert_not_called() + + async def test_process_event_confirmation_task_is_awaited(self): + """Task given by `Incidents.make_confirmation_task` is awaited before method exits.""" + mock_task = AsyncMock() + + with patch("bot.exts.moderation.incidents.Incidents.make_confirmation_task", mock_task): + await self.cog_instance.process_event( + reaction=incidents.Signal.ACTIONED.value, + incident=MockMessage(), + member=MockMember(roles=[MockRole(id=1)]) + ) + + mock_task.assert_awaited() + + async def test_process_event_confirmation_task_timeout_is_handled(self): + """ + Confirmation task `asyncio.TimeoutError` is handled gracefully. + + We have `make_confirmation_task` return a mock with a side effect, and then catch the + exception should it propagate out of `process_event`. This is so that we can then manually + fail the test with a more informative message than just the plain traceback. + """ + mock_task = AsyncMock(side_effect=asyncio.TimeoutError()) + + try: + with patch("bot.exts.moderation.incidents.Incidents.make_confirmation_task", mock_task): + await self.cog_instance.process_event( + reaction=incidents.Signal.ACTIONED.value, + incident=MockMessage(), + member=MockMember(roles=[MockRole(id=1)]) + ) + except asyncio.TimeoutError: + self.fail("TimeoutError was not handled gracefully, and propagated out of `process_event`!") + + +class TestResolveMessage(TestIncidents): + """Tests for the `Incidents.resolve_message` coroutine.""" + + async def test_resolve_message_pass_message_id(self): + """Method will call `_get_message` with the passed `message_id`.""" + await self.cog_instance.resolve_message(123) + self.cog_instance.bot._connection._get_message.assert_called_once_with(123) + + async def test_resolve_message_in_cache(self): + """ + No API call is made if the queried message exists in the cache. + + We mock the `_get_message` return value regardless of input. Whether it finds the message + internally is considered d.py's responsibility, not ours. + """ + cached_message = MockMessage(id=123) + self.cog_instance.bot._connection._get_message = MagicMock(return_value=cached_message) + + return_value = await self.cog_instance.resolve_message(123) + + self.assertIs(return_value, cached_message) + self.cog_instance.bot.get_channel.assert_not_called() # The `fetch_message` line was never hit + + async def test_resolve_message_not_in_cache(self): + """ + The message is retrieved from the API if it isn't cached. + + This is desired behaviour for messages which exist, but were sent before the bot's + current session. + """ + self.cog_instance.bot._connection._get_message = MagicMock(return_value=None) # Cache returns None + + # API returns our message + uncached_message = MockMessage() + fetch_message = AsyncMock(return_value=uncached_message) + self.cog_instance.bot.get_channel = MagicMock(return_value=MockTextChannel(fetch_message=fetch_message)) + + retrieved_message = await self.cog_instance.resolve_message(123) + self.assertIs(retrieved_message, uncached_message) + + async def test_resolve_message_doesnt_exist(self): + """ + If the API returns a 404, the function handles it gracefully and returns None. + + This is an edge-case happening with racing events - event A will relay the message + to the archive and delete the original. Once event B acquires the `event_lock`, + it will not find the message in the cache, and will ask the API. + """ + self.cog_instance.bot._connection._get_message = MagicMock(return_value=None) # Cache returns None + + fetch_message = AsyncMock(side_effect=mock_404) + self.cog_instance.bot.get_channel = MagicMock(return_value=MockTextChannel(fetch_message=fetch_message)) + + self.assertIsNone(await self.cog_instance.resolve_message(123)) + + async def test_resolve_message_fetch_fails(self): + """ + Non-404 errors are handled, logged & None is returned. + + In contrast with a 404, this should make an error-level log. We assert that at least + one such log was made - we do not make any assertions about the log's message. + """ + self.cog_instance.bot._connection._get_message = MagicMock(return_value=None) # Cache returns None + + arbitrary_error = discord.HTTPException( + response=MagicMock(aiohttp.ClientResponse), + message="Arbitrary error", + ) + fetch_message = AsyncMock(side_effect=arbitrary_error) + self.cog_instance.bot.get_channel = MagicMock(return_value=MockTextChannel(fetch_message=fetch_message)) + + with self.assertLogs(logger=incidents.log, level=logging.ERROR): + self.assertIsNone(await self.cog_instance.resolve_message(123)) + + +@patch("bot.constants.Channels.incidents", 123) +class TestOnRawReactionAdd(TestIncidents): + """ + Tests for the `Incidents.on_raw_reaction_add` listener. + + Writing tests for this listener comes with additional complexity due to the listener + awaiting the `crawl_task` task. See `asyncSetUp` for further details, which attempts + to make unit testing this function possible. + """ + + def setUp(self): + """ + Prepare & assign `payload` attribute. + + This attribute represents an *ideal* payload which will not be rejected by the + listener. As each test will receive a fresh instance, it can be mutated to + observe how the listener's behaviour changes with different attributes on + the passed payload. + """ + super().setUp() # Ensure `cog_instance` is assigned + + self.payload = MagicMock( + discord.RawReactionActionEvent, + channel_id=123, # Patched at class level + message_id=456, + member=MockMember(bot=False), + emoji="reaction", + ) + + async def asyncSetUp(self): # noqa: N802 + """ + Prepare an empty task and assign it as `crawl_task`. + + It appears that the `unittest` framework does not provide anything for mocking + asyncio tasks. An `AsyncMock` instance can be called and then awaited, however, + it does not provide the `done` method or any other parts of the `asyncio.Task` + interface. + + Although we do not need to make any assertions about the task itself while + testing the listener, the code will still await it and call the `done` method, + and so we must inject something that will not fail on either action. + + Note that this is done in an `asyncSetUp`, which runs after `setUp`. + The justification is that creating an actual task requires the event + loop to be ready, which is not the case in the `setUp`. + """ + mock_task = asyncio.create_task(AsyncMock()()) # Mock async func, then a coro + self.cog_instance.crawl_task = mock_task + + async def test_on_raw_reaction_add_wrong_channel(self): + """ + Events outside of #incidents will be ignored. + + We check this by asserting that `resolve_message` was never queried. + """ + self.payload.channel_id = 0 + self.cog_instance.resolve_message = AsyncMock() + + await self.cog_instance.on_raw_reaction_add(self.payload) + self.cog_instance.resolve_message.assert_not_called() + + async def test_on_raw_reaction_add_user_is_bot(self): + """ + Events dispatched by bot accounts will be ignored. + + We check this by asserting that `resolve_message` was never queried. + """ + self.payload.member = MockMember(bot=True) + self.cog_instance.resolve_message = AsyncMock() + + await self.cog_instance.on_raw_reaction_add(self.payload) + self.cog_instance.resolve_message.assert_not_called() + + async def test_on_raw_reaction_add_message_doesnt_exist(self): + """ + Listener gracefully handles the case where `resolve_message` gives None. + + We check this by asserting that `process_event` was never called. + """ + self.cog_instance.process_event = AsyncMock() + self.cog_instance.resolve_message = AsyncMock(return_value=None) + + await self.cog_instance.on_raw_reaction_add(self.payload) + self.cog_instance.process_event.assert_not_called() + + async def test_on_raw_reaction_add_message_is_not_an_incident(self): + """ + The event won't be processed if the related message is not an incident. + + This is an edge-case that can happen if someone manually leaves a reaction + on a pinned message, or a comment. + + We check this by asserting that `process_event` was never called. + """ + self.cog_instance.process_event = AsyncMock() + self.cog_instance.resolve_message = AsyncMock(return_value=MockMessage()) + + with patch("bot.exts.moderation.incidents.is_incident", MagicMock(return_value=False)): + await self.cog_instance.on_raw_reaction_add(self.payload) + + self.cog_instance.process_event.assert_not_called() + + async def test_on_raw_reaction_add_valid_event_is_processed(self): + """ + If the reaction event is valid, it is passed to `process_event`. + + This is the case when everything goes right: + * The reaction was placed in #incidents, and not by a bot + * The message was found successfully + * The message qualifies as an incident + + Additionally, we check that all arguments were passed as expected. + """ + incident = MockMessage(id=1) + + self.cog_instance.process_event = AsyncMock() + self.cog_instance.resolve_message = AsyncMock(return_value=incident) + + with patch("bot.exts.moderation.incidents.is_incident", MagicMock(return_value=True)): + await self.cog_instance.on_raw_reaction_add(self.payload) + + self.cog_instance.process_event.assert_called_with( + "reaction", # Defined in `self.payload` + incident, + self.payload.member, + ) + + +class TestOnMessage(TestIncidents): + """ + Tests for the `Incidents.on_message` listener. + + Notice the decorators mocking the `is_incident` return value. The `is_incidents` + function is tested in `TestIsIncident` - here we do not worry about it. + """ + + @patch("bot.exts.moderation.incidents.is_incident", MagicMock(return_value=True)) + async def test_on_message_incident(self): + """Messages qualifying as incidents are passed to `add_signals`.""" + incident = MockMessage() + + with patch("bot.exts.moderation.incidents.add_signals", AsyncMock()) as mock_add_signals: + await self.cog_instance.on_message(incident) + + mock_add_signals.assert_called_once_with(incident) + + @patch("bot.exts.moderation.incidents.is_incident", MagicMock(return_value=False)) + async def test_on_message_non_incident(self): + """Messages not qualifying as incidents are ignored.""" + with patch("bot.exts.moderation.incidents.add_signals", AsyncMock()) as mock_add_signals: + await self.cog_instance.on_message(MockMessage()) + + mock_add_signals.assert_not_called() diff --git a/tests/bot/cogs/moderation/test_modlog.py b/tests/bot/exts/moderation/test_modlog.py index f2809f40a..f8f142484 100644 --- a/tests/bot/cogs/moderation/test_modlog.py +++ b/tests/bot/exts/moderation/test_modlog.py @@ -2,7 +2,7 @@ import unittest import discord -from bot.cogs.moderation.modlog import ModLog +from bot.exts.moderation.modlog import ModLog from tests.helpers import MockBot, MockTextChannel diff --git a/tests/bot/exts/moderation/test_silence.py b/tests/bot/exts/moderation/test_silence.py new file mode 100644 index 000000000..104293d8e --- /dev/null +++ b/tests/bot/exts/moderation/test_silence.py @@ -0,0 +1,502 @@ +import asyncio +import unittest +from datetime import datetime, timezone +from unittest import mock +from unittest.mock import Mock + +from async_rediscache import RedisSession +from discord import PermissionOverwrite + +from bot.constants import Channels, Guild, Roles +from bot.exts.moderation import silence +from tests.helpers import MockBot, MockContext, MockTextChannel, autospec + +redis_session = None +redis_loop = asyncio.get_event_loop() + + +def setUpModule(): # noqa: N802 + """Create and connect to the fakeredis session.""" + global redis_session + redis_session = RedisSession(use_fakeredis=True) + redis_loop.run_until_complete(redis_session.connect()) + + +def tearDownModule(): # noqa: N802 + """Close the fakeredis session.""" + if redis_session: + redis_loop.run_until_complete(redis_session.close()) + + +# Have to subclass it because builtins can't be patched. +class PatchedDatetime(datetime): + """A datetime object with a mocked now() function.""" + + now = mock.create_autospec(datetime, "now") + + +class SilenceNotifierTests(unittest.IsolatedAsyncioTestCase): + def setUp(self) -> None: + self.alert_channel = MockTextChannel() + self.notifier = silence.SilenceNotifier(self.alert_channel) + self.notifier.stop = self.notifier_stop_mock = Mock() + self.notifier.start = self.notifier_start_mock = Mock() + + def test_add_channel_adds_channel(self): + """Channel is added to `_silenced_channels` with the current loop.""" + channel = Mock() + with mock.patch.object(self.notifier, "_silenced_channels") as silenced_channels: + self.notifier.add_channel(channel) + silenced_channels.__setitem__.assert_called_with(channel, self.notifier._current_loop) + + def test_add_channel_starts_loop(self): + """Loop is started if `_silenced_channels` was empty.""" + self.notifier.add_channel(Mock()) + self.notifier_start_mock.assert_called_once() + + def test_add_channel_skips_start_with_channels(self): + """Loop start is not called when `_silenced_channels` is not empty.""" + with mock.patch.object(self.notifier, "_silenced_channels"): + self.notifier.add_channel(Mock()) + self.notifier_start_mock.assert_not_called() + + def test_remove_channel_removes_channel(self): + """Channel is removed from `_silenced_channels`.""" + channel = Mock() + with mock.patch.object(self.notifier, "_silenced_channels") as silenced_channels: + self.notifier.remove_channel(channel) + silenced_channels.__delitem__.assert_called_with(channel) + + def test_remove_channel_stops_loop(self): + """Notifier loop is stopped if `_silenced_channels` is empty after remove.""" + with mock.patch.object(self.notifier, "_silenced_channels", __bool__=lambda _: False): + self.notifier.remove_channel(Mock()) + self.notifier_stop_mock.assert_called_once() + + def test_remove_channel_skips_stop_with_channels(self): + """Notifier loop is not stopped if `_silenced_channels` is not empty after remove.""" + self.notifier.remove_channel(Mock()) + self.notifier_stop_mock.assert_not_called() + + async def test_notifier_private_sends_alert(self): + """Alert is sent on 15 min intervals.""" + test_cases = (900, 1800, 2700) + for current_loop in test_cases: + with self.subTest(current_loop=current_loop): + with mock.patch.object(self.notifier, "_current_loop", new=current_loop): + await self.notifier._notifier() + self.alert_channel.send.assert_called_once_with( + f"<@&{Roles.moderators}> currently silenced channels: " + ) + self.alert_channel.send.reset_mock() + + async def test_notifier_skips_alert(self): + """Alert is skipped on first loop or not an increment of 900.""" + test_cases = (0, 15, 5000) + for current_loop in test_cases: + with self.subTest(current_loop=current_loop): + with mock.patch.object(self.notifier, "_current_loop", new=current_loop): + await self.notifier._notifier() + self.alert_channel.send.assert_not_called() + + +@autospec(silence.Silence, "previous_overwrites", "unsilence_timestamps", pass_mocks=False) +class SilenceCogTests(unittest.IsolatedAsyncioTestCase): + """Tests for the general functionality of the Silence cog.""" + + @autospec(silence, "Scheduler", pass_mocks=False) + def setUp(self) -> None: + self.bot = MockBot() + self.cog = silence.Silence(self.bot) + + @autospec(silence, "SilenceNotifier", pass_mocks=False) + async def test_async_init_got_guild(self): + """Bot got guild after it became available.""" + await self.cog._async_init() + self.bot.wait_until_guild_available.assert_awaited_once() + self.bot.get_guild.assert_called_once_with(Guild.id) + + @autospec(silence, "SilenceNotifier", pass_mocks=False) + async def test_async_init_got_role(self): + """Got `Roles.verified` role from guild.""" + guild = self.bot.get_guild() + guild.get_role.side_effect = lambda id_: Mock(id=id_) + + await self.cog._async_init() + self.assertEqual(self.cog._verified_role.id, Roles.verified) + + @autospec(silence, "SilenceNotifier", pass_mocks=False) + async def test_async_init_got_channels(self): + """Got channels from bot.""" + self.bot.get_channel.side_effect = lambda id_: MockTextChannel(id=id_) + + await self.cog._async_init() + self.assertEqual(self.cog._mod_alerts_channel.id, Channels.mod_alerts) + + @autospec(silence, "SilenceNotifier") + async def test_async_init_got_notifier(self, notifier): + """Notifier was started with channel.""" + self.bot.get_channel.side_effect = lambda id_: MockTextChannel(id=id_) + + await self.cog._async_init() + notifier.assert_called_once_with(MockTextChannel(id=Channels.mod_log)) + self.assertEqual(self.cog.notifier, notifier.return_value) + + @autospec(silence, "SilenceNotifier", pass_mocks=False) + async def test_async_init_rescheduled(self): + """`_reschedule_` coroutine was awaited.""" + self.cog._reschedule = mock.create_autospec(self.cog._reschedule) + await self.cog._async_init() + self.cog._reschedule.assert_awaited_once_with() + + def test_cog_unload_cancelled_tasks(self): + """The init task was cancelled.""" + self.cog._init_task = asyncio.Future() + self.cog.cog_unload() + + # It's too annoying to test cancel_all since it's a done callback and wrapped in a lambda. + self.assertTrue(self.cog._init_task.cancelled()) + + @autospec("discord.ext.commands", "has_any_role") + @mock.patch.object(silence, "MODERATION_ROLES", new=(1, 2, 3)) + async def test_cog_check(self, role_check): + """Role check was called with `MODERATION_ROLES`""" + ctx = MockContext() + role_check.return_value.predicate = mock.AsyncMock() + + await self.cog.cog_check(ctx) + role_check.assert_called_once_with(*(1, 2, 3)) + role_check.return_value.predicate.assert_awaited_once_with(ctx) + + +@autospec(silence.Silence, "previous_overwrites", "unsilence_timestamps", pass_mocks=False) +class RescheduleTests(unittest.IsolatedAsyncioTestCase): + """Tests for the rescheduling of cached unsilences.""" + + @autospec(silence, "Scheduler", "SilenceNotifier", pass_mocks=False) + def setUp(self): + self.bot = MockBot() + self.cog = silence.Silence(self.bot) + self.cog._unsilence_wrapper = mock.create_autospec(self.cog._unsilence_wrapper) + + with mock.patch.object(self.cog, "_reschedule", autospec=True): + asyncio.run(self.cog._async_init()) # Populate instance attributes. + + async def test_skipped_missing_channel(self): + """Did nothing because the channel couldn't be retrieved.""" + self.cog.unsilence_timestamps.items.return_value = [(123, -1), (123, 1), (123, 10000000000)] + self.bot.get_channel.return_value = None + + await self.cog._reschedule() + + self.cog.notifier.add_channel.assert_not_called() + self.cog._unsilence_wrapper.assert_not_called() + self.cog.scheduler.schedule_later.assert_not_called() + + async def test_added_permanent_to_notifier(self): + """Permanently silenced channels were added to the notifier.""" + channels = [MockTextChannel(id=123), MockTextChannel(id=456)] + self.bot.get_channel.side_effect = channels + self.cog.unsilence_timestamps.items.return_value = [(123, -1), (456, -1)] + + await self.cog._reschedule() + + self.cog.notifier.add_channel.assert_any_call(channels[0]) + self.cog.notifier.add_channel.assert_any_call(channels[1]) + + self.cog._unsilence_wrapper.assert_not_called() + self.cog.scheduler.schedule_later.assert_not_called() + + async def test_unsilenced_expired(self): + """Unsilenced expired silences.""" + channels = [MockTextChannel(id=123), MockTextChannel(id=456)] + self.bot.get_channel.side_effect = channels + self.cog.unsilence_timestamps.items.return_value = [(123, 100), (456, 200)] + + await self.cog._reschedule() + + self.cog._unsilence_wrapper.assert_any_call(channels[0]) + self.cog._unsilence_wrapper.assert_any_call(channels[1]) + + self.cog.notifier.add_channel.assert_not_called() + self.cog.scheduler.schedule_later.assert_not_called() + + @mock.patch.object(silence, "datetime", new=PatchedDatetime) + async def test_rescheduled_active(self): + """Rescheduled active silences.""" + channels = [MockTextChannel(id=123), MockTextChannel(id=456)] + self.bot.get_channel.side_effect = channels + self.cog.unsilence_timestamps.items.return_value = [(123, 2000), (456, 3000)] + silence.datetime.now.return_value = datetime.fromtimestamp(1000, tz=timezone.utc) + + self.cog._unsilence_wrapper = mock.MagicMock() + unsilence_return = self.cog._unsilence_wrapper.return_value + + await self.cog._reschedule() + + # Yuck. + calls = [mock.call(1000, 123, unsilence_return), mock.call(2000, 456, unsilence_return)] + self.cog.scheduler.schedule_later.assert_has_calls(calls) + + unsilence_calls = [mock.call(channel) for channel in channels] + self.cog._unsilence_wrapper.assert_has_calls(unsilence_calls) + + self.cog.notifier.add_channel.assert_not_called() + + +@autospec(silence.Silence, "previous_overwrites", "unsilence_timestamps", pass_mocks=False) +class SilenceTests(unittest.IsolatedAsyncioTestCase): + """Tests for the silence command and its related helper methods.""" + + @autospec(silence.Silence, "_reschedule", pass_mocks=False) + @autospec(silence, "Scheduler", "SilenceNotifier", pass_mocks=False) + def setUp(self) -> None: + self.bot = MockBot() + self.cog = silence.Silence(self.bot) + self.cog._init_task = asyncio.Future() + self.cog._init_task.set_result(None) + + # Avoid unawaited coroutine warnings. + self.cog.scheduler.schedule_later.side_effect = lambda delay, task_id, coro: coro.close() + + asyncio.run(self.cog._async_init()) # Populate instance attributes. + + self.channel = MockTextChannel() + self.overwrite = PermissionOverwrite(stream=True, send_messages=True, add_reactions=False) + self.channel.overwrites_for.return_value = self.overwrite + + async def test_sent_correct_message(self): + """Appropriate failure/success message was sent by the command.""" + test_cases = ( + (0.0001, silence.MSG_SILENCE_SUCCESS.format(duration=0.0001), True,), + (None, silence.MSG_SILENCE_PERMANENT, True,), + (5, silence.MSG_SILENCE_FAIL, False,), + ) + for duration, message, was_silenced in test_cases: + ctx = MockContext() + with mock.patch.object(self.cog, "_set_silence_overwrites", return_value=was_silenced): + with self.subTest(was_silenced=was_silenced, message=message, duration=duration): + await self.cog.silence.callback(self.cog, ctx, duration) + ctx.send.assert_called_once_with(message) + + async def test_skipped_already_silenced(self): + """Permissions were not set and `False` was returned for an already silenced channel.""" + subtests = ( + (False, PermissionOverwrite(send_messages=False, add_reactions=False)), + (True, PermissionOverwrite(send_messages=True, add_reactions=True)), + (True, PermissionOverwrite(send_messages=False, add_reactions=False)), + ) + + for contains, overwrite in subtests: + with self.subTest(contains=contains, overwrite=overwrite): + self.cog.scheduler.__contains__.return_value = contains + channel = MockTextChannel() + channel.overwrites_for.return_value = overwrite + + self.assertFalse(await self.cog._set_silence_overwrites(channel)) + channel.set_permissions.assert_not_called() + + async def test_silenced_channel(self): + """Channel had `send_message` and `add_reactions` permissions revoked for verified role.""" + self.assertTrue(await self.cog._set_silence_overwrites(self.channel)) + self.assertFalse(self.overwrite.send_messages) + self.assertFalse(self.overwrite.add_reactions) + self.channel.set_permissions.assert_awaited_once_with( + self.cog._verified_role, + overwrite=self.overwrite + ) + + async def test_preserved_other_overwrites(self): + """Channel's other unrelated overwrites were not changed.""" + prev_overwrite_dict = dict(self.overwrite) + await self.cog._set_silence_overwrites(self.channel) + new_overwrite_dict = dict(self.overwrite) + + # Remove 'send_messages' & 'add_reactions' keys because they were changed by the method. + del prev_overwrite_dict['send_messages'] + del prev_overwrite_dict['add_reactions'] + del new_overwrite_dict['send_messages'] + del new_overwrite_dict['add_reactions'] + + self.assertDictEqual(prev_overwrite_dict, new_overwrite_dict) + + async def test_temp_not_added_to_notifier(self): + """Channel was not added to notifier if a duration was set for the silence.""" + with mock.patch.object(self.cog, "_set_silence_overwrites", return_value=True): + await self.cog.silence.callback(self.cog, MockContext(), 15) + self.cog.notifier.add_channel.assert_not_called() + + async def test_indefinite_added_to_notifier(self): + """Channel was added to notifier if a duration was not set for the silence.""" + with mock.patch.object(self.cog, "_set_silence_overwrites", return_value=True): + await self.cog.silence.callback(self.cog, MockContext(), None) + self.cog.notifier.add_channel.assert_called_once() + + async def test_silenced_not_added_to_notifier(self): + """Channel was not added to the notifier if it was already silenced.""" + with mock.patch.object(self.cog, "_set_silence_overwrites", return_value=False): + await self.cog.silence.callback(self.cog, MockContext(), 15) + self.cog.notifier.add_channel.assert_not_called() + + async def test_cached_previous_overwrites(self): + """Channel's previous overwrites were cached.""" + overwrite_json = '{"send_messages": true, "add_reactions": false}' + await self.cog._set_silence_overwrites(self.channel) + self.cog.previous_overwrites.set.assert_called_once_with(self.channel.id, overwrite_json) + + @autospec(silence, "datetime") + async def test_cached_unsilence_time(self, datetime_mock): + """The UTC POSIX timestamp for the unsilence was cached.""" + now_timestamp = 100 + duration = 15 + timestamp = now_timestamp + duration * 60 + datetime_mock.now.return_value = datetime.fromtimestamp(now_timestamp, tz=timezone.utc) + + ctx = MockContext(channel=self.channel) + await self.cog.silence.callback(self.cog, ctx, duration) + + self.cog.unsilence_timestamps.set.assert_awaited_once_with(ctx.channel.id, timestamp) + datetime_mock.now.assert_called_once_with(tz=timezone.utc) # Ensure it's using an aware dt. + + async def test_cached_indefinite_time(self): + """A value of -1 was cached for a permanent silence.""" + ctx = MockContext(channel=self.channel) + await self.cog.silence.callback(self.cog, ctx, None) + self.cog.unsilence_timestamps.set.assert_awaited_once_with(ctx.channel.id, -1) + + async def test_scheduled_task(self): + """An unsilence task was scheduled.""" + ctx = MockContext(channel=self.channel, invoke=mock.MagicMock()) + + await self.cog.silence.callback(self.cog, ctx, 5) + + args = (300, ctx.channel.id, ctx.invoke.return_value) + self.cog.scheduler.schedule_later.assert_called_once_with(*args) + ctx.invoke.assert_called_once_with(self.cog.unsilence) + + async def test_permanent_not_scheduled(self): + """A task was not scheduled for a permanent silence.""" + ctx = MockContext(channel=self.channel) + await self.cog.silence.callback(self.cog, ctx, None) + self.cog.scheduler.schedule_later.assert_not_called() + + +@autospec(silence.Silence, "unsilence_timestamps", pass_mocks=False) +class UnsilenceTests(unittest.IsolatedAsyncioTestCase): + """Tests for the unsilence command and its related helper methods.""" + + @autospec(silence.Silence, "_reschedule", pass_mocks=False) + @autospec(silence, "Scheduler", "SilenceNotifier", pass_mocks=False) + def setUp(self) -> None: + self.bot = MockBot(get_channel=lambda _: MockTextChannel()) + self.cog = silence.Silence(self.bot) + self.cog._init_task = asyncio.Future() + self.cog._init_task.set_result(None) + + overwrites_cache = mock.create_autospec(self.cog.previous_overwrites, spec_set=True) + self.cog.previous_overwrites = overwrites_cache + + asyncio.run(self.cog._async_init()) # Populate instance attributes. + + self.cog.scheduler.__contains__.return_value = True + overwrites_cache.get.return_value = '{"send_messages": true, "add_reactions": false}' + self.channel = MockTextChannel() + self.overwrite = PermissionOverwrite(stream=True, send_messages=False, add_reactions=False) + self.channel.overwrites_for.return_value = self.overwrite + + async def test_sent_correct_message(self): + """Appropriate failure/success message was sent by the command.""" + unsilenced_overwrite = PermissionOverwrite(send_messages=True, add_reactions=True) + test_cases = ( + (True, silence.MSG_UNSILENCE_SUCCESS, unsilenced_overwrite), + (False, silence.MSG_UNSILENCE_FAIL, unsilenced_overwrite), + (False, silence.MSG_UNSILENCE_MANUAL, self.overwrite), + (False, silence.MSG_UNSILENCE_MANUAL, PermissionOverwrite(send_messages=False)), + (False, silence.MSG_UNSILENCE_MANUAL, PermissionOverwrite(add_reactions=False)), + ) + for was_unsilenced, message, overwrite in test_cases: + ctx = MockContext() + with self.subTest(was_unsilenced=was_unsilenced, message=message, overwrite=overwrite): + with mock.patch.object(self.cog, "_unsilence", return_value=was_unsilenced): + ctx.channel.overwrites_for.return_value = overwrite + await self.cog.unsilence.callback(self.cog, ctx) + ctx.channel.send.assert_called_once_with(message) + + async def test_skipped_already_unsilenced(self): + """Permissions were not set and `False` was returned for an already unsilenced channel.""" + self.cog.scheduler.__contains__.return_value = False + self.cog.previous_overwrites.get.return_value = None + channel = MockTextChannel() + + self.assertFalse(await self.cog._unsilence(channel)) + channel.set_permissions.assert_not_called() + + async def test_restored_overwrites(self): + """Channel's `send_message` and `add_reactions` overwrites were restored.""" + await self.cog._unsilence(self.channel) + self.channel.set_permissions.assert_awaited_once_with( + self.cog._verified_role, + overwrite=self.overwrite, + ) + + # Recall that these values are determined by the fixture. + self.assertTrue(self.overwrite.send_messages) + self.assertFalse(self.overwrite.add_reactions) + + async def test_cache_miss_used_default_overwrites(self): + """Both overwrites were set to None due previous values not being found in the cache.""" + self.cog.previous_overwrites.get.return_value = None + + await self.cog._unsilence(self.channel) + self.channel.set_permissions.assert_awaited_once_with( + self.cog._verified_role, + overwrite=self.overwrite, + ) + + self.assertIsNone(self.overwrite.send_messages) + self.assertIsNone(self.overwrite.add_reactions) + + async def test_cache_miss_sent_mod_alert(self): + """A message was sent to the mod alerts channel.""" + self.cog.previous_overwrites.get.return_value = None + + await self.cog._unsilence(self.channel) + self.cog._mod_alerts_channel.send.assert_awaited_once() + + async def test_removed_notifier(self): + """Channel was removed from `notifier`.""" + await self.cog._unsilence(self.channel) + self.cog.notifier.remove_channel.assert_called_once_with(self.channel) + + async def test_deleted_cached_overwrite(self): + """Channel was deleted from the overwrites cache.""" + await self.cog._unsilence(self.channel) + self.cog.previous_overwrites.delete.assert_awaited_once_with(self.channel.id) + + async def test_deleted_cached_time(self): + """Channel was deleted from the timestamp cache.""" + await self.cog._unsilence(self.channel) + self.cog.unsilence_timestamps.delete.assert_awaited_once_with(self.channel.id) + + async def test_cancelled_task(self): + """The scheduled unsilence task should be cancelled.""" + await self.cog._unsilence(self.channel) + self.cog.scheduler.cancel.assert_called_once_with(self.channel.id) + + async def test_preserved_other_overwrites(self): + """Channel's other unrelated overwrites were not changed, including cache misses.""" + for overwrite_json in ('{"send_messages": true, "add_reactions": null}', None): + with self.subTest(overwrite_json=overwrite_json): + self.cog.previous_overwrites.get.return_value = overwrite_json + + prev_overwrite_dict = dict(self.overwrite) + await self.cog._unsilence(self.channel) + new_overwrite_dict = dict(self.overwrite) + + # Remove these keys because they were modified by the unsilence. + del prev_overwrite_dict['send_messages'] + del prev_overwrite_dict['add_reactions'] + del new_overwrite_dict['send_messages'] + del new_overwrite_dict['add_reactions'] + + self.assertDictEqual(prev_overwrite_dict, new_overwrite_dict) diff --git a/tests/bot/exts/moderation/test_slowmode.py b/tests/bot/exts/moderation/test_slowmode.py new file mode 100644 index 000000000..dad751e0d --- /dev/null +++ b/tests/bot/exts/moderation/test_slowmode.py @@ -0,0 +1,113 @@ +import unittest +from unittest import mock + +from dateutil.relativedelta import relativedelta + +from bot.constants import Emojis +from bot.exts.moderation.slowmode import Slowmode +from tests.helpers import MockBot, MockContext, MockTextChannel + + +class SlowmodeTests(unittest.IsolatedAsyncioTestCase): + + def setUp(self) -> None: + self.bot = MockBot() + self.cog = Slowmode(self.bot) + self.ctx = MockContext() + + async def test_get_slowmode_no_channel(self) -> None: + """Get slowmode without a given channel.""" + self.ctx.channel = MockTextChannel(name='python-general', slowmode_delay=5) + + await self.cog.get_slowmode(self.cog, self.ctx, None) + self.ctx.send.assert_called_once_with("The slowmode delay for #python-general is 5 seconds.") + + async def test_get_slowmode_with_channel(self) -> None: + """Get slowmode with a given channel.""" + text_channel = MockTextChannel(name='python-language', slowmode_delay=2) + + await self.cog.get_slowmode(self.cog, self.ctx, text_channel) + self.ctx.send.assert_called_once_with('The slowmode delay for #python-language is 2 seconds.') + + async def test_set_slowmode_no_channel(self) -> None: + """Set slowmode without a given channel.""" + test_cases = ( + ('helpers', 23, True, f'{Emojis.check_mark} The slowmode delay for #helpers is now 23 seconds.'), + ('mods', 76526, False, f'{Emojis.cross_mark} The slowmode delay must be between 0 and 6 hours.'), + ('admins', 97, True, f'{Emojis.check_mark} The slowmode delay for #admins is now 1 minute and 37 seconds.') + ) + + for channel_name, seconds, edited, result_msg in test_cases: + with self.subTest( + channel_mention=channel_name, + seconds=seconds, + edited=edited, + result_msg=result_msg + ): + self.ctx.channel = MockTextChannel(name=channel_name) + + await self.cog.set_slowmode(self.cog, self.ctx, None, relativedelta(seconds=seconds)) + + if edited: + self.ctx.channel.edit.assert_awaited_once_with(slowmode_delay=float(seconds)) + else: + self.ctx.channel.edit.assert_not_called() + + self.ctx.send.assert_called_once_with(result_msg) + + self.ctx.reset_mock() + + async def test_set_slowmode_with_channel(self) -> None: + """Set slowmode with a given channel.""" + test_cases = ( + ('bot-commands', 12, True, f'{Emojis.check_mark} The slowmode delay for #bot-commands is now 12 seconds.'), + ('mod-spam', 21, True, f'{Emojis.check_mark} The slowmode delay for #mod-spam is now 21 seconds.'), + ('admin-spam', 4323598, False, f'{Emojis.cross_mark} The slowmode delay must be between 0 and 6 hours.') + ) + + for channel_name, seconds, edited, result_msg in test_cases: + with self.subTest( + channel_mention=channel_name, + seconds=seconds, + edited=edited, + result_msg=result_msg + ): + text_channel = MockTextChannel(name=channel_name) + + await self.cog.set_slowmode(self.cog, self.ctx, text_channel, relativedelta(seconds=seconds)) + + if edited: + text_channel.edit.assert_awaited_once_with(slowmode_delay=float(seconds)) + else: + text_channel.edit.assert_not_called() + + self.ctx.send.assert_called_once_with(result_msg) + + self.ctx.reset_mock() + + async def test_reset_slowmode_no_channel(self) -> None: + """Reset slowmode without a given channel.""" + self.ctx.channel = MockTextChannel(name='careers', slowmode_delay=6) + + await self.cog.reset_slowmode(self.cog, self.ctx, None) + self.ctx.send.assert_called_once_with( + f'{Emojis.check_mark} The slowmode delay for #careers has been reset to 0 seconds.' + ) + + async def test_reset_slowmode_with_channel(self) -> None: + """Reset slowmode with a given channel.""" + text_channel = MockTextChannel(name='meta', slowmode_delay=1) + + await self.cog.reset_slowmode(self.cog, self.ctx, text_channel) + self.ctx.send.assert_called_once_with( + f'{Emojis.check_mark} The slowmode delay for #meta has been reset to 0 seconds.' + ) + + @mock.patch("bot.exts.moderation.slowmode.has_any_role") + @mock.patch("bot.exts.moderation.slowmode.MODERATION_ROLES", new=(1, 2, 3)) + async def test_cog_check(self, role_check): + """Role check is called with `MODERATION_ROLES`""" + role_check.return_value.predicate = mock.AsyncMock() + await self.cog.cog_check(self.ctx) + role_check.assert_called_once_with(*(1, 2, 3)) + role_check.return_value.predicate.assert_awaited_once_with(self.ctx) diff --git a/tests/bot/cogs/test_cogs.py b/tests/bot/exts/test_cogs.py index fdda59a8f..f8e120262 100644 --- a/tests/bot/cogs/test_cogs.py +++ b/tests/bot/exts/test_cogs.py @@ -10,7 +10,7 @@ from unittest import mock from discord.ext import commands -from bot import cogs +from bot import exts class CommandNameTests(unittest.TestCase): @@ -29,13 +29,14 @@ class CommandNameTests(unittest.TestCase): @staticmethod def walk_modules() -> t.Iterator[ModuleType]: - """Yield imported modules from the bot.cogs subpackage.""" + """Yield imported modules from the bot.exts subpackage.""" def on_error(name: str) -> t.NoReturn: raise ImportError(name=name) # pragma: no cover # The mock prevents asyncio.get_event_loop() from being called. with mock.patch("discord.ext.tasks.loop"): - for module in pkgutil.walk_packages(cogs.__path__, "bot.cogs.", onerror=on_error): + prefix = f"{exts.__name__}." + for module in pkgutil.walk_packages(exts.__path__, prefix, onerror=on_error): if not module.ispkg: yield importlib.import_module(module.name) @@ -53,6 +54,7 @@ class CommandNameTests(unittest.TestCase): """Return a list of all qualified names, including aliases, for the `command`.""" names = [f"{command.full_parent_name} {alias}".strip() for alias in command.aliases] names.append(command.qualified_name) + names += getattr(command, "root_aliases", []) return names diff --git a/tests/bot/exts/utils/__init__.py b/tests/bot/exts/utils/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/tests/bot/exts/utils/__init__.py diff --git a/tests/bot/exts/utils/test_jams.py b/tests/bot/exts/utils/test_jams.py new file mode 100644 index 000000000..45e7b5b51 --- /dev/null +++ b/tests/bot/exts/utils/test_jams.py @@ -0,0 +1,173 @@ +import unittest +from unittest.mock import AsyncMock, MagicMock, create_autospec + +from discord import CategoryChannel + +from bot.constants import Roles +from bot.exts.utils import jams +from tests.helpers import MockBot, MockContext, MockGuild, MockMember, MockRole, MockTextChannel + + +def get_mock_category(channel_count: int, name: str) -> CategoryChannel: + """Return a mocked code jam category.""" + category = create_autospec(CategoryChannel, spec_set=True, instance=True) + category.name = name + category.channels = [MockTextChannel() for _ in range(channel_count)] + + return category + + +class JamCreateTeamTests(unittest.IsolatedAsyncioTestCase): + """Tests for `createteam` command.""" + + def setUp(self): + self.bot = MockBot() + self.admin_role = MockRole(name="Admins", id=Roles.admins) + self.command_user = MockMember([self.admin_role]) + self.guild = MockGuild([self.admin_role]) + self.ctx = MockContext(bot=self.bot, author=self.command_user, guild=self.guild) + self.cog = jams.CodeJams(self.bot) + + async def test_too_small_amount_of_team_members_passed(self): + """Should `ctx.send` and exit early when too small amount of members.""" + for case in (1, 2): + with self.subTest(amount_of_members=case): + self.cog.create_channels = AsyncMock() + self.cog.add_roles = AsyncMock() + + self.ctx.reset_mock() + members = (MockMember() for _ in range(case)) + await self.cog.createteam(self.cog, self.ctx, "foo", members) + + self.ctx.send.assert_awaited_once() + self.cog.create_channels.assert_not_awaited() + self.cog.add_roles.assert_not_awaited() + + async def test_duplicate_members_provided(self): + """Should `ctx.send` and exit early because duplicate members provided and total there is only 1 member.""" + self.cog.create_channels = AsyncMock() + self.cog.add_roles = AsyncMock() + + member = MockMember() + await self.cog.createteam(self.cog, self.ctx, "foo", (member for _ in range(5))) + + self.ctx.send.assert_awaited_once() + self.cog.create_channels.assert_not_awaited() + self.cog.add_roles.assert_not_awaited() + + async def test_result_sending(self): + """Should call `ctx.send` when everything goes right.""" + self.cog.create_channels = AsyncMock() + self.cog.add_roles = AsyncMock() + + members = [MockMember() for _ in range(5)] + await self.cog.createteam(self.cog, self.ctx, "foo", members) + + self.cog.create_channels.assert_awaited_once() + self.cog.add_roles.assert_awaited_once() + self.ctx.send.assert_awaited_once() + + async def test_category_doesnt_exist(self): + """Should create a new code jam category.""" + subtests = ( + [], + [get_mock_category(jams.MAX_CHANNELS - 1, jams.CATEGORY_NAME)], + [get_mock_category(jams.MAX_CHANNELS - 2, "other")], + ) + + for categories in subtests: + self.guild.reset_mock() + self.guild.categories = categories + + with self.subTest(categories=categories): + actual_category = await self.cog.get_category(self.guild) + + self.guild.create_category_channel.assert_awaited_once() + category_overwrites = self.guild.create_category_channel.call_args[1]["overwrites"] + + self.assertFalse(category_overwrites[self.guild.default_role].read_messages) + self.assertTrue(category_overwrites[self.guild.me].read_messages) + self.assertEqual(self.guild.create_category_channel.return_value, actual_category) + + async def test_category_channel_exist(self): + """Should not try to create category channel.""" + expected_category = get_mock_category(jams.MAX_CHANNELS - 2, jams.CATEGORY_NAME) + self.guild.categories = [ + get_mock_category(jams.MAX_CHANNELS - 2, "other"), + expected_category, + get_mock_category(0, jams.CATEGORY_NAME), + ] + + actual_category = await self.cog.get_category(self.guild) + self.assertEqual(expected_category, actual_category) + + async def test_channel_overwrites(self): + """Should have correct permission overwrites for users and roles.""" + leader = MockMember() + members = [leader] + [MockMember() for _ in range(4)] + overwrites = self.cog.get_overwrites(members, self.guild) + + # Leader permission overwrites + self.assertTrue(overwrites[leader].manage_messages) + self.assertTrue(overwrites[leader].read_messages) + self.assertTrue(overwrites[leader].manage_webhooks) + self.assertTrue(overwrites[leader].connect) + + # Other members permission overwrites + for member in members[1:]: + self.assertTrue(overwrites[member].read_messages) + self.assertTrue(overwrites[member].connect) + + # Everyone and verified role overwrite + self.assertFalse(overwrites[self.guild.default_role].read_messages) + self.assertFalse(overwrites[self.guild.default_role].connect) + self.assertFalse(overwrites[self.guild.get_role(Roles.verified)].read_messages) + self.assertFalse(overwrites[self.guild.get_role(Roles.verified)].connect) + + async def test_team_channels_creation(self): + """Should create new voice and text channel for team.""" + members = [MockMember() for _ in range(5)] + + self.cog.get_overwrites = MagicMock() + self.cog.get_category = AsyncMock() + self.ctx.guild.create_text_channel.return_value = MockTextChannel(mention="foobar-channel") + actual = await self.cog.create_channels(self.guild, "my-team", members) + + self.assertEqual("foobar-channel", actual) + self.cog.get_overwrites.assert_called_once_with(members, self.guild) + self.cog.get_category.assert_awaited_once_with(self.guild) + + self.guild.create_text_channel.assert_awaited_once_with( + "my-team", + overwrites=self.cog.get_overwrites.return_value, + category=self.cog.get_category.return_value + ) + self.guild.create_voice_channel.assert_awaited_once_with( + "My Team", + overwrites=self.cog.get_overwrites.return_value, + category=self.cog.get_category.return_value + ) + + async def test_jam_roles_adding(self): + """Should add team leader role to leader and jam role to every team member.""" + leader_role = MockRole(name="Team Leader") + jam_role = MockRole(name="Jammer") + self.guild.get_role.side_effect = [leader_role, jam_role] + + leader = MockMember() + members = [leader] + [MockMember() for _ in range(4)] + await self.cog.add_roles(self.guild, members) + + leader.add_roles.assert_any_await(leader_role) + for member in members: + member.add_roles.assert_any_await(jam_role) + + +class CodeJamSetup(unittest.TestCase): + """Test for `setup` function of `CodeJam` cog.""" + + def test_setup(self): + """Should call `bot.add_cog`.""" + bot = MockBot() + jams.setup(bot) + bot.add_cog.assert_called_once() diff --git a/tests/bot/cogs/test_snekbox.py b/tests/bot/exts/utils/test_snekbox.py index cf9adbee0..9a42d0610 100644 --- a/tests/bot/cogs/test_snekbox.py +++ b/tests/bot/exts/utils/test_snekbox.py @@ -1,13 +1,12 @@ import asyncio -import logging import unittest from unittest.mock import AsyncMock, MagicMock, Mock, call, create_autospec, patch from discord.ext import commands from bot import constants -from bot.cogs import snekbox -from bot.cogs.snekbox import Snekbox +from bot.exts.utils import snekbox +from bot.exts.utils.snekbox import Snekbox from tests.helpers import MockBot, MockContext, MockMessage, MockReaction, MockUser @@ -39,49 +38,27 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase): result = await self.cog.upload_output("-" * (snekbox.MAX_PASTE_LEN + 1)) self.assertEqual(result, "too long to upload") - async def test_upload_output(self): + @patch("bot.exts.utils.snekbox.send_to_paste_service") + async def test_upload_output(self, mock_paste_util): """Upload the eval output to the URLs.paste_service.format(key="documents") endpoint.""" - key = "MarkDiamond" - resp = MagicMock() - resp.json = AsyncMock(return_value={"key": key}) - - context_manager = MagicMock() - context_manager.__aenter__.return_value = resp - self.bot.http_session.post.return_value = context_manager - - self.assertEqual( - await self.cog.upload_output("My awesome output"), - constants.URLs.paste_service.format(key=key) - ) - self.bot.http_session.post.assert_called_with( - constants.URLs.paste_service.format(key="documents"), - data="My awesome output", - raise_for_status=True + await self.cog.upload_output("Test output.") + mock_paste_util.assert_called_once_with( + self.bot.http_session, "Test output.", extension="txt" ) - async def test_upload_output_gracefully_fallback_if_exception_during_request(self): - """Output upload gracefully fallback if the upload fail.""" - resp = MagicMock() - resp.json = AsyncMock(side_effect=Exception) - - context_manager = MagicMock() - context_manager.__aenter__.return_value = resp - self.bot.http_session.post.return_value = context_manager - - log = logging.getLogger("bot.cogs.snekbox") - with self.assertLogs(logger=log, level='ERROR'): - await self.cog.upload_output('My awesome output!') - - async def test_upload_output_gracefully_fallback_if_no_key_in_response(self): - """Output upload gracefully fallback if there is no key entry in the response body.""" - self.assertEqual((await self.cog.upload_output('My awesome output!')), None) - def test_prepare_input(self): cases = ( ('print("Hello world!")', 'print("Hello world!")', 'non-formatted'), ('`print("Hello world!")`', 'print("Hello world!")', 'one line code block'), ('```\nprint("Hello world!")```', 'print("Hello world!")', 'multiline code block'), ('```py\nprint("Hello world!")```', 'print("Hello world!")', 'multiline python code block'), + ('text```print("Hello world!")```text', 'print("Hello world!")', 'code block surrounded by text'), + ('```print("Hello world!")```\ntext\n```py\nprint("Hello world!")```', + 'print("Hello world!")\nprint("Hello world!")', 'two code blocks with text in-between'), + ('`print("Hello world!")`\ntext\n```print("How\'s it going?")```', + 'print("How\'s it going?")', 'code block preceded by inline code'), + ('`print("Hello world!")`\ntext\n`print("Hello world!")`', + 'print("Hello world!")', 'one inline code block of two') ) for case, expected, testname in cases: with self.subTest(msg=f'Extract code from {testname}.'): @@ -99,14 +76,14 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase): actual = self.cog.get_results_message({'stdout': stdout, 'returncode': returncode}) self.assertEqual(actual, expected) - @patch('bot.cogs.snekbox.Signals', side_effect=ValueError) + @patch('bot.exts.utils.snekbox.Signals', side_effect=ValueError) def test_get_results_message_invalid_signal(self, mock_signals: Mock): self.assertEqual( self.cog.get_results_message({'stdout': '', 'returncode': 127}), ('Your eval job has completed with return code 127', '') ) - @patch('bot.cogs.snekbox.Signals') + @patch('bot.exts.utils.snekbox.Signals') def test_get_results_message_valid_signal(self, mock_signals: Mock): mock_signals.return_value.name = 'SIGTEST' self.assertEqual( @@ -147,12 +124,12 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase): ('<!@', ("<!@\u200B", None), r'Convert <!@ to <!@\u200B'), ( '\u202E\u202E\u202E', - ('Code block escape attempt detected; will not output result', None), + ('Code block escape attempt detected; will not output result', 'https://testificate.com/'), 'Detect RIGHT-TO-LEFT OVERRIDE' ), ( '\u200B\u200B\u200B', - ('Code block escape attempt detected; will not output result', None), + ('Code block escape attempt detected; will not output result', 'https://testificate.com/'), 'Detect ZERO WIDTH SPACE' ), ('long\nbeard', ('001 | long\n002 | beard', None), 'Two line output'), @@ -184,7 +161,7 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase): self.cog.send_eval = AsyncMock(return_value=response) self.cog.continue_eval = AsyncMock(return_value=None) - await self.cog.eval_command.callback(self.cog, ctx=ctx, code='MyAwesomeCode') + await self.cog.eval_command(self.cog, ctx=ctx, code='MyAwesomeCode') self.cog.prepare_input.assert_called_once_with('MyAwesomeCode') self.cog.send_eval.assert_called_once_with(ctx, 'MyAwesomeFormattedCode') self.cog.continue_eval.assert_called_once_with(ctx, response) @@ -198,7 +175,7 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase): self.cog.continue_eval = AsyncMock() self.cog.continue_eval.side_effect = ('MyAwesomeCode-2', None) - await self.cog.eval_command.callback(self.cog, ctx=ctx, code='MyAwesomeCode') + await self.cog.eval_command(self.cog, ctx=ctx, code='MyAwesomeCode') self.cog.prepare_input.has_calls(call('MyAwesomeCode'), call('MyAwesomeCode-2')) self.cog.send_eval.assert_called_with(ctx, 'MyAwesomeFormattedCode') self.cog.continue_eval.assert_called_with(ctx, response) @@ -210,7 +187,7 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase): ctx.author.mention = '@LemonLemonishBeard#0042' ctx.send = AsyncMock() self.cog.jobs = (42,) - await self.cog.eval_command.callback(self.cog, ctx=ctx, code='MyAwesomeCode') + await self.cog.eval_command(self.cog, ctx=ctx, code='MyAwesomeCode') ctx.send.assert_called_once_with( "@LemonLemonishBeard#0042 You've already got a job running - please wait for it to finish!" ) @@ -218,8 +195,8 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase): async def test_eval_command_call_help(self): """Test if the eval command call the help command if no code is provided.""" ctx = MockContext(command="sentinel") - await self.cog.eval_command.callback(self.cog, ctx=ctx, code='') - ctx.send_help.assert_called_once_with("sentinel") + await self.cog.eval_command(self.cog, ctx=ctx, code='') + ctx.send_help.assert_called_once_with(ctx.command) async def test_send_eval(self): """Test the send_eval function.""" @@ -233,9 +210,13 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase): self.cog.get_status_emoji = MagicMock(return_value=':yay!:') self.cog.format_output = AsyncMock(return_value=('[No output]', None)) + mocked_filter_cog = MagicMock() + mocked_filter_cog.filter_eval = AsyncMock(return_value=False) + self.bot.get_cog.return_value = mocked_filter_cog + await self.cog.send_eval(ctx, 'MyAwesomeCode') ctx.send.assert_called_once_with( - '@LemonLemonishBeard#0042 :yay!: Return code 0.\n\n```py\n[No output]\n```' + '@LemonLemonishBeard#0042 :yay!: Return code 0.\n\n```\n[No output]\n```' ) self.cog.post_eval.assert_called_once_with('MyAwesomeCode') self.cog.get_status_emoji.assert_called_once_with({'stdout': '', 'returncode': 0}) @@ -254,10 +235,14 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase): self.cog.get_status_emoji = MagicMock(return_value=':yay!:') self.cog.format_output = AsyncMock(return_value=('Way too long beard', 'lookatmybeard.com')) + mocked_filter_cog = MagicMock() + mocked_filter_cog.filter_eval = AsyncMock(return_value=False) + self.bot.get_cog.return_value = mocked_filter_cog + await self.cog.send_eval(ctx, 'MyAwesomeCode') ctx.send.assert_called_once_with( '@LemonLemonishBeard#0042 :yay!: Return code 0.' - '\n\n```py\nWay too long beard\n```\nFull output: lookatmybeard.com' + '\n\n```\nWay too long beard\n```\nFull output: lookatmybeard.com' ) self.cog.post_eval.assert_called_once_with('MyAwesomeCode') self.cog.get_status_emoji.assert_called_once_with({'stdout': 'Way too long beard', 'returncode': 0}) @@ -275,16 +260,20 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase): self.cog.get_status_emoji = MagicMock(return_value=':nope!:') self.cog.format_output = AsyncMock() # This function isn't called + mocked_filter_cog = MagicMock() + mocked_filter_cog.filter_eval = AsyncMock(return_value=False) + self.bot.get_cog.return_value = mocked_filter_cog + await self.cog.send_eval(ctx, 'MyAwesomeCode') ctx.send.assert_called_once_with( - '@LemonLemonishBeard#0042 :nope!: Return code 127.\n\n```py\nBeard got stuck in the eval\n```' + '@LemonLemonishBeard#0042 :nope!: Return code 127.\n\n```\nBeard got stuck in the eval\n```' ) self.cog.post_eval.assert_called_once_with('MyAwesomeCode') self.cog.get_status_emoji.assert_called_once_with({'stdout': 'ERROR', 'returncode': 127}) self.cog.get_results_message.assert_called_once_with({'stdout': 'ERROR', 'returncode': 127}) self.cog.format_output.assert_not_called() - @patch("bot.cogs.snekbox.partial") + @patch("bot.exts.utils.snekbox.partial") async def test_continue_eval_does_continue(self, partial_mock): """Test that the continue_eval function does continue if required conditions are met.""" ctx = MockContext(message=MockMessage(add_reaction=AsyncMock(), clear_reactions=AsyncMock())) @@ -308,7 +297,7 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase): ) ) ctx.message.add_reaction.assert_called_once_with(snekbox.REEVAL_EMOJI) - ctx.message.clear_reactions.assert_called_once() + ctx.message.clear_reaction.assert_called_once_with(snekbox.REEVAL_EMOJI) response.delete.assert_called_once() async def test_continue_eval_does_not_continue(self): @@ -317,7 +306,7 @@ class SnekboxTests(unittest.IsolatedAsyncioTestCase): actual = await self.cog.continue_eval(ctx, MockMessage()) self.assertEqual(actual, None) - ctx.message.clear_reactions.assert_called_once() + ctx.message.clear_reaction.assert_called_once_with(snekbox.REEVAL_EMOJI) async def test_get_code(self): """Should return 1st arg (or None) if eval cmd in message, otherwise return full content.""" diff --git a/tests/bot/test_pagination.py b/tests/bot/test_pagination.py index ce880d457..630f2516d 100644 --- a/tests/bot/test_pagination.py +++ b/tests/bot/test_pagination.py @@ -44,18 +44,3 @@ class LinePaginatorTests(TestCase): self.paginator.add_line('x' * (self.paginator.scale_to_size + 1)) # Note: item at index 1 is the truncated line, index 0 is prefix self.assertEqual(self.paginator._current_page[1], 'x' * self.paginator.scale_to_size) - - -class ImagePaginatorTests(TestCase): - """Tests functionality of the `ImagePaginator`.""" - - def setUp(self): - """Create a paginator for the test method.""" - self.paginator = pagination.ImagePaginator() - - def test_add_image_appends_image(self): - """`add_image` appends the image to the image list.""" - image = 'lemon' - self.paginator.add_image(image) - - assert self.paginator.images == [image] diff --git a/tests/bot/utils/test_checks.py b/tests/bot/utils/test_checks.py index de72e5748..883465e0b 100644 --- a/tests/bot/utils/test_checks.py +++ b/tests/bot/utils/test_checks.py @@ -1,48 +1,50 @@ import unittest from unittest.mock import MagicMock +from discord import DMChannel + from bot.utils import checks from bot.utils.checks import InWhitelistCheckFailure from tests.helpers import MockContext, MockRole -class ChecksTests(unittest.TestCase): +class ChecksTests(unittest.IsolatedAsyncioTestCase): """Tests the check functions defined in `bot.checks`.""" def setUp(self): self.ctx = MockContext() - def test_with_role_check_without_guild(self): - """`with_role_check` returns `False` if `Context.guild` is None.""" - self.ctx.guild = None - self.assertFalse(checks.with_role_check(self.ctx)) + async def test_has_any_role_check_without_guild(self): + """`has_any_role_check` returns `False` for non-guild channels.""" + self.ctx.channel = MagicMock(DMChannel) + self.assertFalse(await checks.has_any_role_check(self.ctx)) - def test_with_role_check_without_required_roles(self): - """`with_role_check` returns `False` if `Context.author` lacks the required role.""" + async def test_has_any_role_check_without_required_roles(self): + """`has_any_role_check` returns `False` if `Context.author` lacks the required role.""" self.ctx.author.roles = [] - self.assertFalse(checks.with_role_check(self.ctx)) + self.assertFalse(await checks.has_any_role_check(self.ctx)) - def test_with_role_check_with_guild_and_required_role(self): - """`with_role_check` returns `True` if `Context.author` has the required role.""" + async def test_has_any_role_check_with_guild_and_required_role(self): + """`has_any_role_check` returns `True` if `Context.author` has the required role.""" self.ctx.author.roles.append(MockRole(id=10)) - self.assertTrue(checks.with_role_check(self.ctx, 10)) + self.assertTrue(await checks.has_any_role_check(self.ctx, 10)) - def test_without_role_check_without_guild(self): - """`without_role_check` should return `False` when `Context.guild` is None.""" - self.ctx.guild = None - self.assertFalse(checks.without_role_check(self.ctx)) + async def test_has_no_roles_check_without_guild(self): + """`has_no_roles_check` should return `False` when `Context.guild` is None.""" + self.ctx.channel = MagicMock(DMChannel) + self.assertFalse(await checks.has_no_roles_check(self.ctx)) - def test_without_role_check_returns_false_with_unwanted_role(self): - """`without_role_check` returns `False` if `Context.author` has unwanted role.""" + async def test_has_no_roles_check_returns_false_with_unwanted_role(self): + """`has_no_roles_check` returns `False` if `Context.author` has unwanted role.""" role_id = 42 self.ctx.author.roles.append(MockRole(id=role_id)) - self.assertFalse(checks.without_role_check(self.ctx, role_id)) + self.assertFalse(await checks.has_no_roles_check(self.ctx, role_id)) - def test_without_role_check_returns_true_without_unwanted_role(self): - """`without_role_check` returns `True` if `Context.author` does not have unwanted role.""" + async def test_has_no_roles_check_returns_true_without_unwanted_role(self): + """`has_no_roles_check` returns `True` if `Context.author` does not have unwanted role.""" role_id = 42 self.ctx.author.roles.append(MockRole(id=role_id)) - self.assertTrue(checks.without_role_check(self.ctx, role_id + 10)) + self.assertTrue(await checks.has_no_roles_check(self.ctx, role_id + 10)) def test_in_whitelist_check_correct_channel(self): """`in_whitelist_check` returns `True` if `Context.channel.id` is in the channel list.""" diff --git a/tests/bot/utils/test_redis_cache.py b/tests/bot/utils/test_redis_cache.py deleted file mode 100644 index a2f0fe55d..000000000 --- a/tests/bot/utils/test_redis_cache.py +++ /dev/null @@ -1,265 +0,0 @@ -import asyncio -import unittest - -import fakeredis.aioredis - -from bot.utils import RedisCache -from bot.utils.redis_cache import NoBotInstanceError, NoNamespaceError, NoParentInstanceError -from tests import helpers - - -class RedisCacheTests(unittest.IsolatedAsyncioTestCase): - """Tests the RedisCache class from utils.redis_dict.py.""" - - async def asyncSetUp(self): # noqa: N802 - """Sets up the objects that only have to be initialized once.""" - self.bot = helpers.MockBot() - self.bot.redis_session = await fakeredis.aioredis.create_redis_pool() - - # Okay, so this is necessary so that we can create a clean new - # class for every test method, and we want that because it will - # ensure we get a fresh loop, which is necessary for test_increment_lock - # to be able to pass. - class DummyCog: - """A dummy cog, for dummies.""" - - redis = RedisCache() - - def __init__(self, bot: helpers.MockBot): - self.bot = bot - - self.cog = DummyCog(self.bot) - - await self.cog.redis.clear() - - def test_class_attribute_namespace(self): - """Test that RedisDict creates a namespace automatically for class attributes.""" - self.assertEqual(self.cog.redis._namespace, "DummyCog.redis") - - async def test_class_attribute_required(self): - """Test that errors are raised when not assigned as a class attribute.""" - bad_cache = RedisCache() - self.assertIs(bad_cache._namespace, None) - - with self.assertRaises(RuntimeError): - await bad_cache.set("test", "me_up_deadman") - - async def test_set_get_item(self): - """Test that users can set and get items from the RedisDict.""" - test_cases = ( - ('favorite_fruit', 'melon'), - ('favorite_number', 86), - ('favorite_fraction', 86.54), - ('favorite_boolean', False), - ('other_boolean', True), - ) - - # Test that we can get and set different types. - for test in test_cases: - await self.cog.redis.set(*test) - self.assertEqual(await self.cog.redis.get(test[0]), test[1]) - - # Test that .get allows a default value - self.assertEqual(await self.cog.redis.get('favorite_nothing', "bearclaw"), "bearclaw") - - async def test_set_item_type(self): - """Test that .set rejects keys and values that are not permitted.""" - fruits = ["lemon", "melon", "apple"] - - with self.assertRaises(TypeError): - await self.cog.redis.set(fruits, "nice") - - with self.assertRaises(TypeError): - await self.cog.redis.set(4.23, "nice") - - async def test_delete_item(self): - """Test that .delete allows us to delete stuff from the RedisCache.""" - # Add an item and verify that it gets added - await self.cog.redis.set("internet", "firetruck") - self.assertEqual(await self.cog.redis.get("internet"), "firetruck") - - # Delete that item and verify that it gets deleted - await self.cog.redis.delete("internet") - self.assertIs(await self.cog.redis.get("internet"), None) - - async def test_contains(self): - """Test that we can check membership with .contains.""" - await self.cog.redis.set('favorite_country', "Burkina Faso") - - self.assertIs(await self.cog.redis.contains('favorite_country'), True) - self.assertIs(await self.cog.redis.contains('favorite_dentist'), False) - - async def test_items(self): - """Test that the RedisDict can be iterated.""" - # Set up our test cases in the Redis cache - test_cases = [ - ('favorite_turtle', 'Donatello'), - ('second_favorite_turtle', 'Leonardo'), - ('third_favorite_turtle', 'Raphael'), - ] - for key, value in test_cases: - await self.cog.redis.set(key, value) - - # Consume the AsyncIterator into a regular list, easier to compare that way. - redis_items = [item for item in await self.cog.redis.items()] - - # These sequences are probably in the same order now, but probably - # isn't good enough for tests. Let's not rely on .hgetall always - # returning things in sequence, and just sort both lists to be safe. - redis_items = sorted(redis_items) - test_cases = sorted(test_cases) - - # If these are equal now, everything works fine. - self.assertSequenceEqual(test_cases, redis_items) - - async def test_length(self): - """Test that we can get the correct .length from the RedisDict.""" - await self.cog.redis.set('one', 1) - await self.cog.redis.set('two', 2) - await self.cog.redis.set('three', 3) - self.assertEqual(await self.cog.redis.length(), 3) - - await self.cog.redis.set('four', 4) - self.assertEqual(await self.cog.redis.length(), 4) - - async def test_to_dict(self): - """Test that the .to_dict method returns a workable dictionary copy.""" - copy = await self.cog.redis.to_dict() - local_copy = {key: value for key, value in await self.cog.redis.items()} - self.assertIs(type(copy), dict) - self.assertDictEqual(copy, local_copy) - - async def test_clear(self): - """Test that the .clear method removes the entire hash.""" - await self.cog.redis.set('teddy', 'with me') - await self.cog.redis.set('in my dreams', 'you have a weird hat') - self.assertEqual(await self.cog.redis.length(), 2) - - await self.cog.redis.clear() - self.assertEqual(await self.cog.redis.length(), 0) - - async def test_pop(self): - """Test that we can .pop an item from the RedisDict.""" - await self.cog.redis.set('john', 'was afraid') - - self.assertEqual(await self.cog.redis.pop('john'), 'was afraid') - self.assertEqual(await self.cog.redis.pop('pete', 'breakneck'), 'breakneck') - self.assertEqual(await self.cog.redis.length(), 0) - - async def test_update(self): - """Test that we can .update the RedisDict with multiple items.""" - await self.cog.redis.set("reckfried", "lona") - await self.cog.redis.set("bel air", "prince") - await self.cog.redis.update({ - "reckfried": "jona", - "mega": "hungry, though", - }) - - result = { - "reckfried": "jona", - "bel air": "prince", - "mega": "hungry, though", - } - self.assertDictEqual(await self.cog.redis.to_dict(), result) - - def test_typestring_conversion(self): - """Test the typestring-related helper functions.""" - conversion_tests = ( - (12, "i|12"), - (12.4, "f|12.4"), - ("cowabunga", "s|cowabunga"), - ) - - # Test conversion to typestring - for _input, expected in conversion_tests: - self.assertEqual(self.cog.redis._value_to_typestring(_input), expected) - - # Test conversion from typestrings - for _input, expected in conversion_tests: - self.assertEqual(self.cog.redis._value_from_typestring(expected), _input) - - # Test that exceptions are raised on invalid input - with self.assertRaises(TypeError): - self.cog.redis._value_to_typestring(["internet"]) - self.cog.redis._value_from_typestring("o|firedog") - - async def test_increment_decrement(self): - """Test .increment and .decrement methods.""" - await self.cog.redis.set("entropic", 5) - await self.cog.redis.set("disentropic", 12.5) - - # Test default increment - await self.cog.redis.increment("entropic") - self.assertEqual(await self.cog.redis.get("entropic"), 6) - - # Test default decrement - await self.cog.redis.decrement("entropic") - self.assertEqual(await self.cog.redis.get("entropic"), 5) - - # Test float increment with float - await self.cog.redis.increment("disentropic", 2.0) - self.assertEqual(await self.cog.redis.get("disentropic"), 14.5) - - # Test float increment with int - await self.cog.redis.increment("disentropic", 2) - self.assertEqual(await self.cog.redis.get("disentropic"), 16.5) - - # Test negative increments, because why not. - await self.cog.redis.increment("entropic", -5) - self.assertEqual(await self.cog.redis.get("entropic"), 0) - - # Negative decrements? Sure. - await self.cog.redis.decrement("entropic", -5) - self.assertEqual(await self.cog.redis.get("entropic"), 5) - - # What about if we use a negative float to decrement an int? - # This should convert the type into a float. - await self.cog.redis.decrement("entropic", -2.5) - self.assertEqual(await self.cog.redis.get("entropic"), 7.5) - - # Let's test that they raise the right errors - with self.assertRaises(KeyError): - await self.cog.redis.increment("doesn't_exist!") - - await self.cog.redis.set("stringthing", "stringthing") - with self.assertRaises(TypeError): - await self.cog.redis.increment("stringthing") - - async def test_increment_lock(self): - """Test that we can't produce a race condition in .increment.""" - await self.cog.redis.set("test_key", 0) - tasks = [] - - # Increment this a lot in different tasks - for _ in range(100): - task = asyncio.create_task( - self.cog.redis.increment("test_key", 1) - ) - tasks.append(task) - await asyncio.gather(*tasks) - - # Confirm that the value has been incremented the exact right number of times. - value = await self.cog.redis.get("test_key") - self.assertEqual(value, 100) - - async def test_exceptions_raised(self): - """Testing that the various RuntimeErrors are reachable.""" - class MyCog: - cache = RedisCache() - - def __init__(self): - self.other_cache = RedisCache() - - cog = MyCog() - - # Raises "No Bot instance" - with self.assertRaises(NoBotInstanceError): - await cog.cache.get("john") - - # Raises "RedisCache has no namespace" - with self.assertRaises(NoNamespaceError): - await cog.other_cache.get("was") - - # Raises "You must access the RedisCache instance through the cog instance" - with self.assertRaises(NoParentInstanceError): - await MyCog.cache.get("afraid") diff --git a/tests/bot/utils/test_services.py b/tests/bot/utils/test_services.py new file mode 100644 index 000000000..5e0855704 --- /dev/null +++ b/tests/bot/utils/test_services.py @@ -0,0 +1,74 @@ +import logging +import unittest +from unittest.mock import AsyncMock, MagicMock, Mock, patch + +from aiohttp import ClientConnectorError + +from bot.utils.services import FAILED_REQUEST_ATTEMPTS, send_to_paste_service + + +class PasteTests(unittest.IsolatedAsyncioTestCase): + def setUp(self) -> None: + self.http_session = MagicMock() + + @patch("bot.utils.services.URLs.paste_service", "https://paste_service.com/{key}") + async def test_url_and_sent_contents(self): + """Correct url was used and post was called with expected data.""" + response = MagicMock( + json=AsyncMock(return_value={"key": ""}) + ) + self.http_session.post().__aenter__.return_value = response + self.http_session.post.reset_mock() + await send_to_paste_service(self.http_session, "Content") + self.http_session.post.assert_called_once_with("https://paste_service.com/documents", data="Content") + + @patch("bot.utils.services.URLs.paste_service", "https://paste_service.com/{key}") + async def test_paste_returns_correct_url_on_success(self): + """Url with specified extension is returned on successful requests.""" + key = "paste_key" + test_cases = ( + (f"https://paste_service.com/{key}.txt", "txt"), + (f"https://paste_service.com/{key}.py", "py"), + (f"https://paste_service.com/{key}", ""), + ) + response = MagicMock( + json=AsyncMock(return_value={"key": key}) + ) + self.http_session.post().__aenter__.return_value = response + + for expected_output, extension in test_cases: + with self.subTest(msg=f"Send contents with extension {repr(extension)}"): + self.assertEqual( + await send_to_paste_service(self.http_session, "", extension=extension), + expected_output + ) + + async def test_request_repeated_on_json_errors(self): + """Json with error message and invalid json are handled as errors and requests repeated.""" + test_cases = ({"message": "error"}, {"unexpected_key": None}, {}) + self.http_session.post().__aenter__.return_value = response = MagicMock() + self.http_session.post.reset_mock() + + for error_json in test_cases: + with self.subTest(error_json=error_json): + response.json = AsyncMock(return_value=error_json) + result = await send_to_paste_service(self.http_session, "") + self.assertEqual(self.http_session.post.call_count, FAILED_REQUEST_ATTEMPTS) + self.assertIsNone(result) + + self.http_session.post.reset_mock() + + async def test_request_repeated_on_connection_errors(self): + """Requests are repeated in the case of connection errors.""" + self.http_session.post = MagicMock(side_effect=ClientConnectorError(Mock(), Mock())) + result = await send_to_paste_service(self.http_session, "") + self.assertEqual(self.http_session.post.call_count, FAILED_REQUEST_ATTEMPTS) + self.assertIsNone(result) + + async def test_general_error_handled_and_request_repeated(self): + """All `Exception`s are handled, logged and request repeated.""" + self.http_session.post = MagicMock(side_effect=Exception) + result = await send_to_paste_service(self.http_session, "") + self.assertEqual(self.http_session.post.call_count, FAILED_REQUEST_ATTEMPTS) + self.assertLogs("bot.utils", logging.ERROR) + self.assertIsNone(result) diff --git a/tests/helpers.py b/tests/helpers.py index facc4e1af..870f66197 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -5,7 +5,7 @@ import itertools import logging import unittest.mock from asyncio import AbstractEventLoop -from typing import Callable, Iterable, Optional +from typing import Iterable, Optional import discord from aiohttp import ClientSession @@ -14,6 +14,7 @@ from discord.ext.commands import Context from bot.api import APIClient from bot.async_stats import AsyncStatsClient from bot.bot import Bot +from tests._autospec import autospec # noqa: F401 other modules import it via this module for logger in logging.Logger.manager.loggerDict.values(): @@ -26,24 +27,6 @@ for logger in logging.Logger.manager.loggerDict.values(): logger.setLevel(logging.CRITICAL) -def autospec(target, *attributes: str, **kwargs) -> Callable: - """Patch multiple `attributes` of a `target` with autospecced mocks and `spec_set` as True.""" - # Caller's kwargs should take priority and overwrite the defaults. - kwargs = {'spec_set': True, 'autospec': True, **kwargs} - - # Import the target if it's a string. - # This is to support both object and string targets like patch.multiple. - if type(target) is str: - target = unittest.mock._importer(target) - - def decorator(func): - for attribute in attributes: - patcher = unittest.mock.patch.object(target, attribute, **kwargs) - func = patcher(func) - return func - return decorator - - class HashableMixin(discord.mixins.EqualityComparable): """ Mixin that provides similar hashing and equality functionality as discord.py's `Hashable` mixin. @@ -308,7 +291,11 @@ class MockBot(CustomMockMixin, unittest.mock.MagicMock): Instances of this class will follow the specifications of `discord.ext.commands.Bot` instances. For more information, see the `MockGuild` docstring. """ - spec_set = Bot(command_prefix=unittest.mock.MagicMock(), loop=_get_mock_loop()) + spec_set = Bot( + command_prefix=unittest.mock.MagicMock(), + loop=_get_mock_loop(), + redis_session=unittest.mock.MagicMock(), + ) additional_spec_asyncs = ("wait_for", "redis_ready") def __init__(self, **kwargs) -> None: |