diff options
author | 2023-12-14 20:28:17 +0800 | |
---|---|---|
committer | 2023-12-14 20:28:17 +0800 | |
commit | 449c08fd5459b2f804dbf825086ec1dd0f244d8a (patch) | |
tree | e4589cb227cdb2e611bcbf9b02ea481fe24cdb34 /pydis_site/apps/api/tests | |
parent | Resize theme switch (diff) | |
parent | Merge pull request #1173 from python-discord/dependabot/pip/sentry-sdk-1.39.0 (diff) |
Fix all conflicts
hopefully I dont have to do this again
Diffstat (limited to 'pydis_site/apps/api/tests')
22 files changed, 1175 insertions, 1244 deletions
diff --git a/pydis_site/apps/api/tests/base.py b/pydis_site/apps/api/tests/base.py index c9f3cb7e..704b22cf 100644 --- a/pydis_site/apps/api/tests/base.py +++ b/pydis_site/apps/api/tests/base.py @@ -61,6 +61,7 @@ class AuthenticatedAPITestCase(APITestCase): ... self.assertEqual(response.status_code, 200) """ - def setUp(self): + def setUp(self) -> None: + """Bootstrap the user and authenticate it.""" super().setUp() self.client.force_authenticate(test_user) diff --git a/pydis_site/apps/api/tests/migrations/__init__.py b/pydis_site/apps/api/tests/migrations/__init__.py deleted file mode 100644 index 38e42ffc..00000000 --- a/pydis_site/apps/api/tests/migrations/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""This submodule contains tests for functions used in data migrations.""" diff --git a/pydis_site/apps/api/tests/migrations/base.py b/pydis_site/apps/api/tests/migrations/base.py deleted file mode 100644 index 0c0a5bd0..00000000 --- a/pydis_site/apps/api/tests/migrations/base.py +++ /dev/null @@ -1,102 +0,0 @@ -"""Includes utilities for testing migrations.""" -from django.db import connection -from django.db.migrations.executor import MigrationExecutor -from django.test import TestCase - - -class MigrationsTestCase(TestCase): - """ - A `TestCase` subclass to test migration files. - - To be able to properly test a migration, we will need to inject data into the test database - before the migrations we want to test are applied, but after the older migrations have been - applied. This makes sure that we are testing "as if" we were actually applying this migration - to a database in the state it was in before introducing the new migration. - - To set up a MigrationsTestCase, create a subclass of this class and set the following - class-level attributes: - - - app: The name of the app that contains the migrations (e.g., `'api'`) - - migration_prior: The name* of the last migration file before the migrations you want to test - - migration_target: The name* of the last migration file we want to test - - *) Specify the file names without a path or the `.py` file extension. - - Additionally, overwrite the `setUpMigrationData` in the subclass to inject data into the - database before the migrations we want to test are applied. Please read the docstring of the - method for more information. An optional hook, `setUpPostMigrationData` is also provided. - """ - - # These class-level attributes should be set in classes that inherit from this base class. - app = None - migration_prior = None - migration_target = None - - @classmethod - def setUpTestData(cls): - """ - Injects data into the test database prior to the migration we're trying to test. - - This class methods reverts the test database back to the state of the last migration file - prior to the migrations we want to test. It will then allow the user to inject data into the - test database by calling the `setUpMigrationData` hook. After the data has been injected, it - will apply the migrations we want to test and call the `setUpPostMigrationData` hook. The - user can now test if the migration correctly migrated the injected test data. - """ - if not cls.app: - raise ValueError("The `app` attribute was not set.") - - if not cls.migration_prior or not cls.migration_target: - raise ValueError("Both ` migration_prior` and `migration_target` need to be set.") - - cls.migrate_from = [(cls.app, cls.migration_prior)] - cls.migrate_to = [(cls.app, cls.migration_target)] - - # Reverse to database state prior to the migrations we want to test - executor = MigrationExecutor(connection) - executor.migrate(cls.migrate_from) - - # Call the data injection hook with the current state of the project - old_apps = executor.loader.project_state(cls.migrate_from).apps - cls.setUpMigrationData(old_apps) - - # Run the migrations we want to test - executor = MigrationExecutor(connection) - executor.loader.build_graph() - executor.migrate(cls.migrate_to) - - # Save the project state so we're able to work with the correct model states - cls.apps = executor.loader.project_state(cls.migrate_to).apps - - # Call `setUpPostMigrationData` to potentially set up post migration data used in testing - cls.setUpPostMigrationData(cls.apps) - - @classmethod - def setUpMigrationData(cls, apps): - """ - Override this method to inject data into the test database before the migration is applied. - - This method will be called after setting up the database according to the migrations that - come before the migration(s) we are trying to test, but before the to-be-tested migration(s) - are applied. This allows us to simulate a database state just prior to the migrations we are - trying to test. - - To make sure we're creating objects according to the state the models were in at this point - in the migration history, use `apps.get_model(app_name: str, model_name: str)` to get the - appropriate model, e.g.: - - >>> Infraction = apps.get_model('api', 'Infraction') - """ - pass - - @classmethod - def setUpPostMigrationData(cls, apps): - """ - Set up additional test data after the target migration has been applied. - - Use `apps.get_model(app_name: str, model_name: str)` to get the correct instances of the - model classes: - - >>> Infraction = apps.get_model('api', 'Infraction') - """ - pass diff --git a/pydis_site/apps/api/tests/migrations/test_active_infraction_migration.py b/pydis_site/apps/api/tests/migrations/test_active_infraction_migration.py deleted file mode 100644 index 8dc29b34..00000000 --- a/pydis_site/apps/api/tests/migrations/test_active_infraction_migration.py +++ /dev/null @@ -1,496 +0,0 @@ -"""Tests for the data migration in `filename`.""" -import logging -from collections import ChainMap, namedtuple -from datetime import timedelta -from itertools import count -from typing import Dict, Iterable, Type, Union - -from django.db.models import Q -from django.forms.models import model_to_dict -from django.utils import timezone - -from pydis_site.apps.api.models import Infraction, User -from .base import MigrationsTestCase - -log = logging.getLogger(__name__) -log.setLevel(logging.DEBUG) - - -InfractionHistory = namedtuple('InfractionHistory', ("user_id", "infraction_history")) - - -class InfractionFactory: - """Factory that creates infractions for a User instance.""" - - infraction_id = count(1) - user_id = count(1) - default_values = { - 'active': True, - 'expires_at': None, - 'hidden': False, - } - - @classmethod - def create( - cls, - actor: User, - infractions: Iterable[Dict[str, Union[str, int, bool]]], - infraction_model: Type[Infraction] = Infraction, - user_model: Type[User] = User, - ) -> InfractionHistory: - """ - Creates `infractions` for the `user` with the given `actor`. - - The `infractions` dictionary can contain the following fields: - - `type` (required) - - `active` (default: True) - - `expires_at` (default: None; i.e, permanent) - - `hidden` (default: False). - - The parameters `infraction_model` and `user_model` can be used to pass in an instance of - both model classes from a different migration/project state. - """ - user_id = next(cls.user_id) - user = user_model.objects.create( - id=user_id, - name=f"Infracted user {user_id}", - discriminator=user_id, - avatar_hash=None, - ) - infraction_history = [] - - for infraction in infractions: - infraction = dict(infraction) - infraction["id"] = next(cls.infraction_id) - infraction = ChainMap(infraction, cls.default_values) - new_infraction = infraction_model.objects.create( - user=user, - actor=actor, - type=infraction["type"], - reason=f"`{infraction['type']}` infraction (ID: {infraction['id']} of {user}", - active=infraction['active'], - hidden=infraction['hidden'], - expires_at=infraction['expires_at'], - ) - infraction_history.append(new_infraction) - - return InfractionHistory(user_id=user_id, infraction_history=infraction_history) - - -class InfractionFactoryTests(MigrationsTestCase): - """Tests for the InfractionFactory.""" - - app = "api" - migration_prior = "0046_reminder_jump_url" - migration_target = "0046_reminder_jump_url" - - @classmethod - def setUpPostMigrationData(cls, apps): - """Create a default actor for all infractions.""" - cls.infraction_model = apps.get_model('api', 'Infraction') - cls.user_model = apps.get_model('api', 'User') - - cls.actor = cls.user_model.objects.create( - id=9999, - name="Unknown Moderator", - discriminator=1040, - avatar_hash=None, - ) - - def test_infraction_factory_total_count(self): - """Does the test database hold as many infractions as we tried to create?""" - InfractionFactory.create( - actor=self.actor, - infractions=( - {'type': 'kick', 'active': False, 'hidden': False}, - {'type': 'ban', 'active': True, 'hidden': False}, - {'type': 'note', 'active': False, 'hidden': True}, - ), - infraction_model=self.infraction_model, - user_model=self.user_model, - ) - database_count = Infraction.objects.all().count() - self.assertEqual(3, database_count) - - def test_infraction_factory_multiple_users(self): - """Does the test database hold as many infractions as we tried to create?""" - for _user in range(5): - InfractionFactory.create( - actor=self.actor, - infractions=( - {'type': 'kick', 'active': False, 'hidden': True}, - {'type': 'ban', 'active': True, 'hidden': False}, - ), - infraction_model=self.infraction_model, - user_model=self.user_model, - ) - - # Check if infractions and users are recorded properly in the database - database_count = Infraction.objects.all().count() - self.assertEqual(database_count, 10) - - user_count = User.objects.all().count() - self.assertEqual(user_count, 5 + 1) - - def test_infraction_factory_sets_correct_fields(self): - """Does the InfractionFactory set the correct attributes?""" - infractions = ( - { - 'type': 'note', - 'active': False, - 'hidden': True, - 'expires_at': timezone.now() - }, - {'type': 'warning', 'active': False, 'hidden': False, 'expires_at': None}, - {'type': 'watch', 'active': False, 'hidden': True, 'expires_at': None}, - {'type': 'mute', 'active': True, 'hidden': False, 'expires_at': None}, - {'type': 'kick', 'active': True, 'hidden': True, 'expires_at': None}, - {'type': 'ban', 'active': True, 'hidden': False, 'expires_at': None}, - { - 'type': 'superstar', - 'active': True, - 'hidden': True, - 'expires_at': timezone.now() - }, - ) - - InfractionFactory.create( - actor=self.actor, - infractions=infractions, - infraction_model=self.infraction_model, - user_model=self.user_model, - ) - - for infraction in infractions: - with self.subTest(**infraction): - self.assertTrue(Infraction.objects.filter(**infraction).exists()) - - -class ActiveInfractionMigrationTests(MigrationsTestCase): - """ - Tests the active infraction data migration. - - The active infraction data migration should do the following things: - - 1. migrates all active notes, warnings, and kicks to an inactive status; - 2. migrates all users with multiple active infractions of a single type to have only one active - infraction of that type. The infraction with the longest duration stays active. - """ - - app = "api" - migration_prior = "0046_reminder_jump_url" - migration_target = "0047_active_infractions_migration" - - @classmethod - def setUpMigrationData(cls, apps): - """Sets up an initial database state that contains the relevant test cases.""" - # Fetch the Infraction and User model in the current migration state - cls.infraction_model = apps.get_model('api', 'Infraction') - cls.user_model = apps.get_model('api', 'User') - - cls.created_infractions = {} - - # Moderator that serves as actor for all infractions - cls.user_moderator = cls.user_model.objects.create( - id=9999, - name="Olivier de Vienne", - discriminator=1040, - avatar_hash=None, - ) - - # User #1: clean user with no infractions - cls.created_infractions["no infractions"] = InfractionFactory.create( - actor=cls.user_moderator, - infractions=[], - infraction_model=cls.infraction_model, - user_model=cls.user_model, - ) - - # User #2: One inactive note infraction - cls.created_infractions["one inactive note"] = InfractionFactory.create( - actor=cls.user_moderator, - infractions=( - {'type': 'note', 'active': False, 'hidden': True}, - ), - infraction_model=cls.infraction_model, - user_model=cls.user_model, - ) - - # User #3: One active note infraction - cls.created_infractions["one active note"] = InfractionFactory.create( - actor=cls.user_moderator, - infractions=( - {'type': 'note', 'active': True, 'hidden': True}, - ), - infraction_model=cls.infraction_model, - user_model=cls.user_model, - ) - - # User #4: One active and one inactive note infraction - cls.created_infractions["one active and one inactive note"] = InfractionFactory.create( - actor=cls.user_moderator, - infractions=( - {'type': 'note', 'active': False, 'hidden': True}, - {'type': 'note', 'active': True, 'hidden': True}, - ), - infraction_model=cls.infraction_model, - user_model=cls.user_model, - ) - - # User #5: Once active note, one active kick, once active warning - cls.created_infractions["active note, kick, warning"] = InfractionFactory.create( - actor=cls.user_moderator, - infractions=( - {'type': 'note', 'active': True, 'hidden': True}, - {'type': 'kick', 'active': True, 'hidden': True}, - {'type': 'warning', 'active': True, 'hidden': True}, - ), - infraction_model=cls.infraction_model, - user_model=cls.user_model, - ) - - # User #6: One inactive ban and one active ban - cls.created_infractions["one inactive and one active ban"] = InfractionFactory.create( - actor=cls.user_moderator, - infractions=( - {'type': 'ban', 'active': False, 'hidden': True}, - {'type': 'ban', 'active': True, 'hidden': True}, - ), - infraction_model=cls.infraction_model, - user_model=cls.user_model, - ) - - # User #7: Two active permanent bans - cls.created_infractions["two active perm bans"] = InfractionFactory.create( - actor=cls.user_moderator, - infractions=( - {'type': 'ban', 'active': True, 'hidden': True}, - {'type': 'ban', 'active': True, 'hidden': True}, - ), - infraction_model=cls.infraction_model, - user_model=cls.user_model, - ) - - # User #8: Multiple active temporary bans - cls.created_infractions["multiple active temp bans"] = InfractionFactory.create( - actor=cls.user_moderator, - infractions=( - { - 'type': 'ban', - 'active': True, - 'hidden': True, - 'expires_at': timezone.now() + timedelta(days=1) - }, - { - 'type': 'ban', - 'active': True, - 'hidden': True, - 'expires_at': timezone.now() + timedelta(days=10) - }, - { - 'type': 'ban', - 'active': True, - 'hidden': True, - 'expires_at': timezone.now() + timedelta(days=20) - }, - { - 'type': 'ban', - 'active': True, - 'hidden': True, - 'expires_at': timezone.now() + timedelta(days=5) - }, - ), - infraction_model=cls.infraction_model, - user_model=cls.user_model, - ) - - # User #9: One active permanent ban, two active temporary bans - cls.created_infractions["active perm, two active temp bans"] = InfractionFactory.create( - actor=cls.user_moderator, - infractions=( - { - 'type': 'ban', - 'active': True, - 'hidden': True, - 'expires_at': timezone.now() + timedelta(days=10) - }, - { - 'type': 'ban', - 'active': True, - 'hidden': True, - 'expires_at': None, - }, - { - 'type': 'ban', - 'active': True, - 'hidden': True, - 'expires_at': timezone.now() + timedelta(days=7) - }, - ), - infraction_model=cls.infraction_model, - user_model=cls.user_model, - ) - - # User #10: One inactive permanent ban, two active temporary bans - cls.created_infractions["one inactive perm ban, two active temp bans"] = ( - InfractionFactory.create( - actor=cls.user_moderator, - infractions=( - { - 'type': 'ban', - 'active': True, - 'hidden': True, - 'expires_at': timezone.now() + timedelta(days=10) - }, - { - 'type': 'ban', - 'active': False, - 'hidden': True, - 'expires_at': None, - }, - { - 'type': 'ban', - 'active': True, - 'hidden': True, - 'expires_at': timezone.now() + timedelta(days=7) - }, - ), - infraction_model=cls.infraction_model, - user_model=cls.user_model, - ) - ) - - # User #11: Active ban, active mute, active superstar - cls.created_infractions["active ban, mute, and superstar"] = InfractionFactory.create( - actor=cls.user_moderator, - infractions=( - {'type': 'ban', 'active': True, 'hidden': True}, - {'type': 'mute', 'active': True, 'hidden': True}, - {'type': 'superstar', 'active': True, 'hidden': True}, - {'type': 'watch', 'active': True, 'hidden': True}, - ), - infraction_model=cls.infraction_model, - user_model=cls.user_model, - ) - - # User #12: Multiple active bans, active mutes, active superstars - cls.created_infractions["multiple active bans, mutes, stars"] = InfractionFactory.create( - actor=cls.user_moderator, - infractions=( - {'type': 'ban', 'active': True, 'hidden': True}, - {'type': 'ban', 'active': True, 'hidden': True}, - {'type': 'ban', 'active': True, 'hidden': True}, - {'type': 'mute', 'active': True, 'hidden': True}, - {'type': 'mute', 'active': True, 'hidden': True}, - {'type': 'mute', 'active': True, 'hidden': True}, - {'type': 'superstar', 'active': True, 'hidden': True}, - {'type': 'superstar', 'active': True, 'hidden': True}, - {'type': 'superstar', 'active': True, 'hidden': True}, - {'type': 'watch', 'active': True, 'hidden': True}, - {'type': 'watch', 'active': True, 'hidden': True}, - {'type': 'watch', 'active': True, 'hidden': True}, - ), - infraction_model=cls.infraction_model, - user_model=cls.user_model, - ) - - def test_all_never_active_types_became_inactive(self): - """Are all infractions of a non-active type inactive after the migration?""" - inactive_type_query = Q(type="note") | Q(type="warning") | Q(type="kick") - self.assertFalse( - self.infraction_model.objects.filter(inactive_type_query, active=True).exists() - ) - - def test_migration_left_clean_user_without_infractions(self): - """Do users without infractions have no infractions after the migration?""" - user_id, infraction_history = self.created_infractions["no infractions"] - self.assertFalse( - self.infraction_model.objects.filter(user__id=user_id).exists() - ) - - def test_migration_left_user_with_inactive_note_untouched(self): - """Did the migration leave users with only an inactive note untouched?""" - user_id, infraction_history = self.created_infractions["one inactive note"] - inactive_note = infraction_history[0] - self.assertTrue( - self.infraction_model.objects.filter(**model_to_dict(inactive_note)).exists() - ) - - def test_migration_only_touched_active_field_of_active_note(self): - """Does the migration only change the `active` field?""" - user_id, infraction_history = self.created_infractions["one active note"] - note = model_to_dict(infraction_history[0]) - note['active'] = False - self.assertTrue( - self.infraction_model.objects.filter(**note).exists() - ) - - def test_migration_only_touched_active_field_of_active_note_left_inactive_untouched(self): - """Does the migration only change the `active` field of active notes?""" - user_id, infraction_history = self.created_infractions["one active and one inactive note"] - for note in infraction_history: - with self.subTest(active=note.active): - note = model_to_dict(note) - note['active'] = False - self.assertTrue( - self.infraction_model.objects.filter(**note).exists() - ) - - def test_migration_migrates_all_nonactive_types_to_inactive(self): - """Do we set the `active` field of all non-active infractions to `False`?""" - user_id, infraction_history = self.created_infractions["active note, kick, warning"] - self.assertFalse( - self.infraction_model.objects.filter(user__id=user_id, active=True).exists() - ) - - def test_migration_leaves_user_with_one_active_ban_untouched(self): - """Do we leave a user with one active and one inactive ban untouched?""" - user_id, infraction_history = self.created_infractions["one inactive and one active ban"] - for infraction in infraction_history: - with self.subTest(active=infraction.active): - self.assertTrue( - self.infraction_model.objects.filter(**model_to_dict(infraction)).exists() - ) - - def test_migration_turns_double_active_perm_ban_into_single_active_perm_ban(self): - """Does the migration turn two active permanent bans into one active permanent ban?""" - user_id, infraction_history = self.created_infractions["two active perm bans"] - active_count = self.infraction_model.objects.filter(user__id=user_id, active=True).count() - self.assertEqual(active_count, 1) - - def test_migration_leaves_temporary_ban_with_longest_duration_active(self): - """Does the migration turn two active permanent bans into one active permanent ban?""" - user_id, infraction_history = self.created_infractions["multiple active temp bans"] - active_ban = self.infraction_model.objects.get(user__id=user_id, active=True) - self.assertEqual(active_ban.expires_at, infraction_history[2].expires_at) - - def test_migration_leaves_permanent_ban_active(self): - """Does the migration leave the permanent ban active?""" - user_id, infraction_history = self.created_infractions["active perm, two active temp bans"] - active_ban = self.infraction_model.objects.get(user__id=user_id, active=True) - self.assertIsNone(active_ban.expires_at) - - def test_migration_leaves_longest_temp_ban_active_with_inactive_permanent_ban(self): - """Does the longest temp ban stay active, even with an inactive perm ban present?""" - user_id, infraction_history = self.created_infractions[ - "one inactive perm ban, two active temp bans" - ] - active_ban = self.infraction_model.objects.get(user__id=user_id, active=True) - self.assertEqual(active_ban.expires_at, infraction_history[0].expires_at) - - def test_migration_leaves_all_active_types_active_if_one_of_each_exists(self): - """Do all active infractions stay active if only one of each is present?""" - user_id, infraction_history = self.created_infractions["active ban, mute, and superstar"] - active_count = self.infraction_model.objects.filter(user__id=user_id, active=True).count() - self.assertEqual(active_count, 4) - - def test_migration_reduces_all_active_types_to_a_single_active_infraction(self): - """Do we reduce all of the infraction types to one active infraction?""" - user_id, infraction_history = self.created_infractions["multiple active bans, mutes, stars"] - active_infractions = self.infraction_model.objects.filter(user__id=user_id, active=True) - self.assertEqual(len(active_infractions), 4) - types_observed = [infraction.type for infraction in active_infractions] - - for infraction_type in ('ban', 'mute', 'superstar', 'watch'): - with self.subTest(type=infraction_type): - self.assertIn(infraction_type, types_observed) diff --git a/pydis_site/apps/api/tests/migrations/test_base.py b/pydis_site/apps/api/tests/migrations/test_base.py deleted file mode 100644 index f69bc92c..00000000 --- a/pydis_site/apps/api/tests/migrations/test_base.py +++ /dev/null @@ -1,135 +0,0 @@ -import logging -from unittest.mock import call, patch - -from django.db.migrations.loader import MigrationLoader -from django.test import TestCase - -from .base import MigrationsTestCase, connection - -log = logging.getLogger(__name__) - - -class SpanishInquisition(MigrationsTestCase): - app = "api" - migration_prior = "scragly" - migration_target = "kosa" - - -@patch("pydis_site.apps.api.tests.migrations.base.MigrationExecutor") -class MigrationsTestCaseNoSideEffectsTests(TestCase): - """Tests the MigrationTestCase class with actual migration side effects disabled.""" - - def setUp(self): - """Set up an instance of MigrationsTestCase for use in tests.""" - self.test_case = SpanishInquisition() - - def test_missing_app_class_raises_value_error(self, _migration_executor): - """A MigrationsTestCase subclass should set the class-attribute `app`.""" - class Spam(MigrationsTestCase): - pass - - spam = Spam() - with self.assertRaises(ValueError, msg="The `app` attribute was not set."): - spam.setUpTestData() - - def test_missing_migration_class_attributes_raise_value_error(self, _migration_executor): - """A MigrationsTestCase subclass should set both `migration_prior` and `migration_target`""" - class Eggs(MigrationsTestCase): - app = "api" - migration_target = "lemon" - - class Bacon(MigrationsTestCase): - app = "api" - migration_prior = "mark" - - instances = (Eggs(), Bacon()) - - exception_message = "Both ` migration_prior` and `migration_target` need to be set." - for instance in instances: - with self.subTest( - migration_prior=instance.migration_prior, - migration_target=instance.migration_target, - ): - with self.assertRaises(ValueError, msg=exception_message): - instance.setUpTestData() - - @patch(f"{__name__}.SpanishInquisition.setUpMigrationData") - @patch(f"{__name__}.SpanishInquisition.setUpPostMigrationData") - def test_migration_data_hooks_are_called_once(self, pre_hook, post_hook, _migration_executor): - """The `setUpMigrationData` and `setUpPostMigrationData` hooks should be called once.""" - self.test_case.setUpTestData() - for hook in (pre_hook, post_hook): - with self.subTest(hook=repr(hook)): - hook.assert_called_once() - - def test_migration_executor_is_instantiated_twice(self, migration_executor): - """The `MigrationExecutor` should be instantiated with the database connection twice.""" - self.test_case.setUpTestData() - - expected_args = [call(connection), call(connection)] - self.assertEqual(migration_executor.call_args_list, expected_args) - - def test_project_state_is_loaded_for_correct_migration_files_twice(self, migration_executor): - """The `project_state` should first be loaded with `migrate_from`, then `migrate_to`.""" - self.test_case.setUpTestData() - - expected_args = [call(self.test_case.migrate_from), call(self.test_case.migrate_to)] - self.assertEqual(migration_executor().loader.project_state.call_args_list, expected_args) - - def test_loader_build_graph_gets_called_once(self, migration_executor): - """We should rebuild the migration graph before applying the second set of migrations.""" - self.test_case.setUpTestData() - - migration_executor().loader.build_graph.assert_called_once() - - def test_migration_executor_migrate_method_is_called_correctly_twice(self, migration_executor): - """The migrate method of the executor should be called twice with the correct arguments.""" - self.test_case.setUpTestData() - - self.assertEqual(migration_executor().migrate.call_count, 2) - calls = [call([('api', 'scragly')]), call([('api', 'kosa')])] - migration_executor().migrate.assert_has_calls(calls) - - -class LifeOfBrian(MigrationsTestCase): - app = "api" - migration_prior = "0046_reminder_jump_url" - migration_target = "0048_add_infractions_unique_constraints_active" - - @classmethod - def log_last_migration(cls): - """Parses the applied migrations dictionary to log the last applied migration.""" - loader = MigrationLoader(connection) - api_migrations = [ - migration for app, migration in loader.applied_migrations if app == cls.app - ] - last_migration = max(api_migrations, key=lambda name: int(name[:4])) - log.info(f"The last applied migration: {last_migration}") - - @classmethod - def setUpMigrationData(cls, apps): - """Method that logs the last applied migration at this point.""" - cls.log_last_migration() - - @classmethod - def setUpPostMigrationData(cls, apps): - """Method that logs the last applied migration at this point.""" - cls.log_last_migration() - - -class MigrationsTestCaseMigrationTest(TestCase): - """Tests if `MigrationsTestCase` travels to the right points in the migration history.""" - - def test_migrations_test_case_travels_to_correct_migrations_in_history(self): - """The test case should first revert to `migration_prior`, then go to `migration_target`.""" - brian = LifeOfBrian() - - with self.assertLogs(log, level=logging.INFO) as logs: - brian.setUpTestData() - - self.assertEqual(len(logs.records), 2) - - for time_point, record in zip(("migration_prior", "migration_target"), logs.records): - with self.subTest(time_point=time_point): - message = f"The last applied migration: {getattr(brian, time_point)}" - self.assertEqual(record.getMessage(), message) diff --git a/pydis_site/apps/api/tests/test_bumped_threads.py b/pydis_site/apps/api/tests/test_bumped_threads.py new file mode 100644 index 00000000..2e3892c7 --- /dev/null +++ b/pydis_site/apps/api/tests/test_bumped_threads.py @@ -0,0 +1,63 @@ +from django.urls import reverse + +from .base import AuthenticatedAPITestCase +from pydis_site.apps.api.models import BumpedThread + + +class UnauthedBumpedThreadAPITests(AuthenticatedAPITestCase): + def setUp(self): + super().setUp() + self.client.force_authenticate(user=None) + + def test_detail_lookup_returns_401(self): + url = reverse('api:bot:bumpedthread-detail', args=(1,)) + response = self.client.get(url) + + self.assertEqual(response.status_code, 401) + + def test_list_returns_401(self): + url = reverse('api:bot:bumpedthread-list') + response = self.client.get(url) + + self.assertEqual(response.status_code, 401) + + def test_create_returns_401(self): + url = reverse('api:bot:bumpedthread-list') + response = self.client.post(url, {"thread_id": 3}) + + self.assertEqual(response.status_code, 401) + + def test_delete_returns_401(self): + url = reverse('api:bot:bumpedthread-detail', args=(1,)) + response = self.client.delete(url) + + self.assertEqual(response.status_code, 401) + + +class BumpedThreadAPITests(AuthenticatedAPITestCase): + @classmethod + def setUpTestData(cls): + cls.thread1 = BumpedThread.objects.create( + thread_id=1234, + ) + + def test_returns_bumped_threads_as_flat_list(self): + url = reverse('api:bot:bumpedthread-list') + + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json(), [1234]) + + def test_returns_204_for_existing_data(self): + url = reverse('api:bot:bumpedthread-detail', args=(1234,)) + + response = self.client.get(url) + self.assertEqual(response.status_code, 204) + self.assertEqual(response.content, b"") + + def test_returns_404_for_non_existing_data(self): + url = reverse('api:bot:bumpedthread-detail', args=(42,)) + + response = self.client.get(url) + self.assertEqual(response.status_code, 404) + self.assertEqual(response.json(), {"detail": "Not found."}) diff --git a/pydis_site/apps/api/tests/test_deleted_messages.py b/pydis_site/apps/api/tests/test_deleted_messages.py index 1eb535d8..d5501202 100644 --- a/pydis_site/apps/api/tests/test_deleted_messages.py +++ b/pydis_site/apps/api/tests/test_deleted_messages.py @@ -1,10 +1,9 @@ -from datetime import datetime +from datetime import UTC, datetime from django.urls import reverse -from django.utils import timezone from .base import AuthenticatedAPITestCase -from ..models import MessageDeletionContext, User +from pydis_site.apps.api.models import MessageDeletionContext, User class DeletedMessagesWithoutActorTests(AuthenticatedAPITestCase): @@ -18,7 +17,7 @@ class DeletedMessagesWithoutActorTests(AuthenticatedAPITestCase): cls.data = { 'actor': None, - 'creation': datetime.utcnow().isoformat(), + 'creation': datetime.now(tz=UTC).isoformat(), 'deletedmessage_set': [ { 'author': cls.author.id, @@ -58,7 +57,7 @@ class DeletedMessagesWithActorTests(AuthenticatedAPITestCase): cls.data = { 'actor': cls.actor.id, - 'creation': datetime.utcnow().isoformat(), + 'creation': datetime.now(tz=UTC).isoformat(), 'deletedmessage_set': [ { 'author': cls.author.id, @@ -90,7 +89,7 @@ class DeletedMessagesLogURLTests(AuthenticatedAPITestCase): cls.deletion_context = MessageDeletionContext.objects.create( actor=cls.actor, - creation=timezone.now() + creation=datetime.now(tz=UTC), ) def test_valid_log_url(self): diff --git a/pydis_site/apps/api/tests/test_documentation_links.py b/pydis_site/apps/api/tests/test_documentation_links.py index 4e238cbb..f4a332cb 100644 --- a/pydis_site/apps/api/tests/test_documentation_links.py +++ b/pydis_site/apps/api/tests/test_documentation_links.py @@ -1,7 +1,7 @@ from django.urls import reverse from .base import AuthenticatedAPITestCase -from ..models import DocumentationLink +from pydis_site.apps.api.models import DocumentationLink class UnauthedDocumentationLinkAPITests(AuthenticatedAPITestCase): diff --git a/pydis_site/apps/api/tests/test_filterlists.py b/pydis_site/apps/api/tests/test_filterlists.py deleted file mode 100644 index 5a5bca60..00000000 --- a/pydis_site/apps/api/tests/test_filterlists.py +++ /dev/null @@ -1,122 +0,0 @@ -from django.urls import reverse - -from pydis_site.apps.api.models import FilterList -from pydis_site.apps.api.tests.base import AuthenticatedAPITestCase - -URL = reverse('api:bot:filterlist-list') -JPEG_ALLOWLIST = { - "type": 'FILE_FORMAT', - "allowed": True, - "content": ".jpeg", -} -PNG_ALLOWLIST = { - "type": 'FILE_FORMAT', - "allowed": True, - "content": ".png", -} - - -class UnauthenticatedTests(AuthenticatedAPITestCase): - def setUp(self): - super().setUp() - self.client.force_authenticate(user=None) - - def test_cannot_read_allowedlist_list(self): - response = self.client.get(URL) - - self.assertEqual(response.status_code, 401) - - -class EmptyDatabaseTests(AuthenticatedAPITestCase): - @classmethod - def setUpTestData(cls): - FilterList.objects.all().delete() - - def test_returns_empty_object(self): - response = self.client.get(URL) - - self.assertEqual(response.status_code, 200) - self.assertEqual(response.json(), []) - - -class FetchTests(AuthenticatedAPITestCase): - @classmethod - def setUpTestData(cls): - FilterList.objects.all().delete() - cls.jpeg_format = FilterList.objects.create(**JPEG_ALLOWLIST) - cls.png_format = FilterList.objects.create(**PNG_ALLOWLIST) - - def test_returns_name_in_list(self): - response = self.client.get(URL) - - self.assertEqual(response.status_code, 200) - self.assertEqual(response.json()[0]["content"], self.jpeg_format.content) - self.assertEqual(response.json()[1]["content"], self.png_format.content) - - def test_returns_single_item_by_id(self): - response = self.client.get(f'{URL}/{self.jpeg_format.id}') - - self.assertEqual(response.status_code, 200) - self.assertEqual(response.json().get("content"), self.jpeg_format.content) - - def test_returns_filter_list_types(self): - response = self.client.get(f'{URL}/get-types') - - self.assertEqual(response.status_code, 200) - for api_type, model_type in zip(response.json(), FilterList.FilterListType.choices): - self.assertEquals(api_type[0], model_type[0]) - self.assertEquals(api_type[1], model_type[1]) - - -class CreationTests(AuthenticatedAPITestCase): - @classmethod - def setUpTestData(cls): - FilterList.objects.all().delete() - - def test_returns_400_for_missing_params(self): - no_type_json = { - "allowed": True, - "content": ".jpeg" - } - no_allowed_json = { - "type": "FILE_FORMAT", - "content": ".jpeg" - } - no_content_json = { - "allowed": True, - "type": "FILE_FORMAT" - } - cases = [{}, no_type_json, no_allowed_json, no_content_json] - - for case in cases: - with self.subTest(case=case): - response = self.client.post(URL, data=case) - self.assertEqual(response.status_code, 400) - - def test_returns_201_for_successful_creation(self): - response = self.client.post(URL, data=JPEG_ALLOWLIST) - self.assertEqual(response.status_code, 201) - - def test_returns_400_for_duplicate_creation(self): - self.client.post(URL, data=JPEG_ALLOWLIST) - response = self.client.post(URL, data=JPEG_ALLOWLIST) - self.assertEqual(response.status_code, 400) - - -class DeletionTests(AuthenticatedAPITestCase): - @classmethod - def setUpTestData(cls): - FilterList.objects.all().delete() - cls.jpeg_format = FilterList.objects.create(**JPEG_ALLOWLIST) - cls.png_format = FilterList.objects.create(**PNG_ALLOWLIST) - - def test_deleting_unknown_id_returns_404(self): - response = self.client.delete(f"{URL}/200") - self.assertEqual(response.status_code, 404) - - def test_deleting_known_id_returns_204(self): - response = self.client.delete(f"{URL}/{self.jpeg_format.id}") - self.assertEqual(response.status_code, 204) - - response = self.client.get(f"{URL}/{self.jpeg_format.id}") - self.assertNotIn(self.png_format.content, response.json()) diff --git a/pydis_site/apps/api/tests/test_filters.py b/pydis_site/apps/api/tests/test_filters.py new file mode 100644 index 00000000..4cef1c8f --- /dev/null +++ b/pydis_site/apps/api/tests/test_filters.py @@ -0,0 +1,352 @@ +import contextlib +from dataclasses import dataclass +from datetime import timedelta +from typing import Any + +from django.db.models import Model +from django.urls import reverse + +from pydis_site.apps.api.models.bot.filters import Filter, FilterList +from pydis_site.apps.api.tests.base import AuthenticatedAPITestCase + + +@dataclass() +class TestSequence: + model: type[Model] + route: str + object: dict[str, Any] + ignored_fields: tuple[str, ...] = () + + def url(self, detail: bool = False) -> str: + return reverse(f'api:bot:{self.route}-{"detail" if detail else "list"}') + + +FK_FIELDS: dict[type[Model], tuple[str, ...]] = { + FilterList: (), + Filter: ("filter_list",), +} + + +def get_test_sequences() -> dict[str, TestSequence]: + filter_list1_deny_dict = { + "name": "testname", + "list_type": 0, + "guild_pings": [], + "filter_dm": True, + "dm_pings": [], + "remove_context": False, + "bypass_roles": [], + "enabled": True, + "dm_content": "", + "dm_embed": "", + "infraction_type": "NONE", + "infraction_reason": "", + "infraction_duration": timedelta(seconds=0), + "infraction_channel": 0, + "disabled_channels": [], + "disabled_categories": [], + "enabled_channels": [], + "enabled_categories": [], + "send_alert": True + } + filter_list1_allow_dict = filter_list1_deny_dict.copy() + filter_list1_allow_dict["list_type"] = 1 + filter_list1_allow = FilterList(**filter_list1_allow_dict) + + return { + "filter_list1": TestSequence( + FilterList, + "filterlist", + filter_list1_deny_dict, + ignored_fields=("filters", "created_at", "updated_at") + ), + "filter_list2": TestSequence( + FilterList, + "filterlist", + { + "name": "testname2", + "list_type": 1, + "guild_pings": ["Moderators"], + "filter_dm": False, + "dm_pings": ["here"], + "remove_context": True, + "bypass_roles": ["123456"], + "enabled": False, + "dm_content": "testing testing", + "dm_embed": "one two three", + "infraction_type": "TIMEOUT", + "infraction_reason": "stop testing", + "infraction_duration": timedelta(seconds=10.5), + "infraction_channel": 123, + "disabled_channels": ["python-general"], + "disabled_categories": ["CODE JAM"], + "enabled_channels": ["mighty-mice"], + "enabled_categories": ["Lobby"], + "send_alert": False + }, + ignored_fields=("filters", "created_at", "updated_at") + ), + "filter": TestSequence( + Filter, + "filter", + { + "content": "bad word", + "description": "This is a really bad word.", + "additional_settings": "{'hi': 'there'}", + "guild_pings": None, + "filter_dm": None, + "dm_pings": None, + "remove_context": None, + "bypass_roles": None, + "enabled": None, + "dm_content": None, + "dm_embed": None, + "infraction_type": None, + "infraction_reason": None, + "infraction_duration": None, + "infraction_channel": None, + "disabled_channels": None, + "disabled_categories": None, + "enabled_channels": None, + "enabled_categories": None, + "send_alert": None, + "filter_list": filter_list1_allow + }, + ignored_fields=("created_at", "updated_at") + ), + } + + +def save_nested_objects(object_: Model, save_root: bool = True) -> None: + for field in FK_FIELDS.get(object_.__class__, ()): + value = getattr(object_, field) + save_nested_objects(value) + + if save_root: + object_.save() + + +def clean_test_json(json: dict) -> dict: + for key, value in json.items(): + if isinstance(value, Model): + json[key] = value.id + elif isinstance(value, timedelta): + json[key] = str(value.total_seconds()) + + return json + + +def clean_api_json(json: dict, sequence: TestSequence) -> dict: + for field in sequence.ignored_fields + ("id",): + with contextlib.suppress(KeyError): + del json[field] + + return json + + +def flatten_settings(json: dict) -> dict: + settings = json.pop("settings", {}) + flattened_settings = {} + for entry, value in settings.items(): + if isinstance(value, dict): + flattened_settings.update(value) + else: + flattened_settings[entry] = value + + json.update(flattened_settings) + + return json + + +class GenericFilterTests(AuthenticatedAPITestCase): + + def test_cannot_read_unauthenticated(self) -> None: + for name, sequence in get_test_sequences().items(): + with self.subTest(name=name): + self.client.force_authenticate(user=None) + + response = self.client.get(sequence.url()) + self.assertEqual(response.status_code, 401) + + def test_empty_database(self) -> None: + for name, sequence in get_test_sequences().items(): + with self.subTest(name=name): + sequence.model.objects.all().delete() + + response = self.client.get(sequence.url()) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json(), []) + + def test_fetch(self) -> None: + for name, sequence in get_test_sequences().items(): + with self.subTest(name=name): + sequence.model.objects.all().delete() + + save_nested_objects(sequence.model(**sequence.object)) + + response = self.client.get(sequence.url()) + self.assertDictEqual( + clean_test_json(sequence.object), + clean_api_json(flatten_settings(response.json()[0]), sequence) + ) + + def test_fetch_by_id(self) -> None: + for name, sequence in get_test_sequences().items(): + with self.subTest(name=name): + sequence.model.objects.all().delete() + + saved = sequence.model(**sequence.object) + save_nested_objects(saved) + + response = self.client.get(f"{sequence.url()}/{saved.id}") + self.assertDictEqual( + clean_test_json(sequence.object), + clean_api_json(flatten_settings(response.json()), sequence) + ) + + def test_fetch_non_existing(self) -> None: + for name, sequence in get_test_sequences().items(): + with self.subTest(name=name): + sequence.model.objects.all().delete() + + response = self.client.get(f"{sequence.url()}/42") + self.assertEqual(response.status_code, 404) + self.assertDictEqual(response.json(), {'detail': 'Not found.'}) + + def test_creation(self) -> None: + for name, sequence in get_test_sequences().items(): + with self.subTest(name=name): + sequence.model.objects.all().delete() + + save_nested_objects(sequence.model(**sequence.object), False) + data = clean_test_json(sequence.object.copy()) + response = self.client.post(sequence.url(), data=data) + + self.assertEqual(response.status_code, 201) + self.assertDictEqual( + clean_api_json(flatten_settings(response.json()), sequence), + clean_test_json(sequence.object) + ) + + def test_creation_missing_field(self) -> None: + for name, sequence in get_test_sequences().items(): + ignored_fields = sequence.ignored_fields + ("id", "additional_settings") + with self.subTest(name=name): + saved = sequence.model(**sequence.object) + save_nested_objects(saved) + data = clean_test_json(sequence.object.copy()) + + for field in sequence.model._meta.get_fields(): + with self.subTest(field=field): + if field.null or field.name in ignored_fields: + continue + + test_data = data.copy() + del test_data[field.name] + + response = self.client.post(sequence.url(), data=test_data) + self.assertEqual(response.status_code, 400) + + def test_deletion(self) -> None: + for name, sequence in get_test_sequences().items(): + with self.subTest(name=name): + saved = sequence.model(**sequence.object) + save_nested_objects(saved) + + response = self.client.delete(f"{sequence.url()}/{saved.id}") + self.assertEqual(response.status_code, 204) + + def test_deletion_non_existing(self) -> None: + for name, sequence in get_test_sequences().items(): + with self.subTest(name=name): + sequence.model.objects.all().delete() + + response = self.client.delete(f"{sequence.url()}/42") + self.assertEqual(response.status_code, 404) + + +class FilterValidationTests(AuthenticatedAPITestCase): + + def test_filter_validation(self) -> None: + test_sequences = get_test_sequences() + base_filter = test_sequences["filter"] + base_filter_list = test_sequences["filter_list1"] + cases = ( + ({"infraction_reason": "hi"}, {}, 400), + ({"infraction_duration": timedelta(seconds=10)}, {}, 400), + ({"infraction_reason": "hi"}, {"infraction_type": "NOTE"}, 200), + ({"infraction_type": "TIMEOUT", "infraction_duration": timedelta(days=30)}, {}, 400), + ({"infraction_duration": timedelta(seconds=10)}, {"infraction_type": "TIMEOUT"}, 200), + ({"enabled_channels": ["admins"]}, {}, 200), + ({"disabled_channels": ["123"]}, {}, 200), + ({"enabled_categories": ["CODE JAM"]}, {}, 200), + ({"disabled_categories": ["CODE JAM"]}, {}, 200), + ({"enabled_channels": ["admins"], "disabled_channels": ["123", "admins"]}, {}, 400), + ({"enabled_categories": ["admins"], "disabled_categories": ["123", "admins"]}, {}, 400), + ({"enabled_channels": ["admins"]}, {"disabled_channels": ["123", "admins"]}, 400), + ({"enabled_categories": ["admins"]}, {"disabled_categories": ["123", "admins"]}, 400), + ) + + for filter_settings, filter_list_settings, response_code in cases: + with self.subTest( + f_settings=filter_settings, fl_settings=filter_list_settings, response=response_code + ): + base_filter.model.objects.all().delete() + base_filter_list.model.objects.all().delete() + + case_filter_dict = base_filter.object.copy() + case_fl_dict = base_filter_list.object.copy() + case_fl_dict.update(filter_list_settings) + + case_fl = base_filter_list.model(**case_fl_dict) + case_filter_dict["filter_list"] = case_fl + case_filter = base_filter.model(**case_filter_dict) + save_nested_objects(case_filter) + + filter_settings["filter_list"] = case_fl + response = self.client.patch( + f"{base_filter.url()}/{case_filter.id}", data=clean_test_json(filter_settings) + ) + self.assertEqual(response.status_code, response_code) + + def test_filter_list_validation(self) -> None: + test_sequences = get_test_sequences() + base_filter_list = test_sequences["filter_list1"] + cases = ( + ({"infraction_reason": "hi"}, 400), + ({"infraction_duration": timedelta(seconds=10)}, 400), + ({"infraction_type": "TIMEOUT", "infraction_duration": timedelta(days=30)}, 400), + ({"infraction_reason": "hi", "infraction_type": "NOTE"}, 200), + ({"infraction_duration": timedelta(seconds=10), "infraction_type": "TIMEOUT"}, 200), + ({"enabled_channels": ["admins"]}, 200), ({"disabled_channels": ["123"]}, 200), + ({"enabled_categories": ["CODE JAM"]}, 200), + ({"disabled_categories": ["CODE JAM"]}, 200), + ({"enabled_channels": ["admins"], "disabled_channels": ["123", "admins"]}, 400), + ({"enabled_categories": ["admins"], "disabled_categories": ["123", "admins"]}, 400), + ) + + for filter_list_settings, response_code in cases: + with self.subTest(fl_settings=filter_list_settings, response=response_code): + base_filter_list.model.objects.all().delete() + + case_fl_dict = base_filter_list.object.copy() + case_fl = base_filter_list.model(**case_fl_dict) + save_nested_objects(case_fl) + + response = self.client.patch( + f"{base_filter_list.url()}/{case_fl.id}", + data=clean_test_json(filter_list_settings) + ) + self.assertEqual(response.status_code, response_code) + + def test_filter_unique_constraint(self) -> None: + test_filter = get_test_sequences()["filter"] + test_filter.model.objects.all().delete() + test_filter_object = test_filter.model(**test_filter.object) + save_nested_objects(test_filter_object, False) + + response = self.client.post(test_filter.url(), data=clean_test_json(test_filter.object)) + self.assertEqual(response.status_code, 201) + + response = self.client.post(test_filter.url(), data=clean_test_json(test_filter.object)) + self.assertEqual(response.status_code, 400) diff --git a/pydis_site/apps/api/tests/test_github_utils.py b/pydis_site/apps/api/tests/test_github_utils.py new file mode 100644 index 00000000..d36111c9 --- /dev/null +++ b/pydis_site/apps/api/tests/test_github_utils.py @@ -0,0 +1,289 @@ +import dataclasses +import datetime +import typing +import unittest +from unittest import mock + +import django.test +import httpx +import jwt +import rest_framework.response +import rest_framework.test +from django.urls import reverse + +from pydis_site import settings +from pydis_site.apps.api import github_utils + + +class GeneralUtilityTests(unittest.TestCase): + """Test the utility methods which do not fit in another class.""" + + def test_token_generation(self): + """Test that the a valid JWT token is generated.""" + def encode(payload: dict, _: str, algorithm: str, *args, **kwargs) -> str: + """ + Intercept the encode method. + + The result is encoded with an algorithm which does not require a PEM key, as it may + not be available in testing environments. + """ + self.assertEqual("RS256", algorithm, "The GitHub App JWT must be signed using RS256.") + return original_encode( + payload, "secret-encoding-key", *args, algorithm="HS256", **kwargs + ) + + original_encode = jwt.encode + with mock.patch("jwt.encode", new=encode): + token = github_utils.generate_token() + decoded = jwt.decode(token, "secret-encoding-key", algorithms=["HS256"]) + + delta = datetime.timedelta(minutes=10) + self.assertAlmostEqual(decoded["exp"] - decoded["iat"], delta.total_seconds()) + then = datetime.datetime.now(tz=datetime.UTC) + delta + self.assertLess(decoded["exp"], then.timestamp()) + + +class CheckRunTests(unittest.TestCase): + """Tests the check_run_status utility.""" + + run_kwargs: typing.Mapping = { + "name": "run_name", + "head_sha": "sha", + "status": "completed", + "conclusion": "success", + "created_at": datetime.datetime.now(tz=datetime.UTC).strftime(settings.GITHUB_TIMESTAMP_FORMAT), + "artifacts_url": "url", + } + + def test_completed_run(self): + """Test that an already completed run returns the correct URL.""" + final_url = "some_url_string_1234" + + kwargs = dict(self.run_kwargs, artifacts_url=final_url) + result = github_utils.check_run_status(github_utils.WorkflowRun(**kwargs)) + self.assertEqual(final_url, result) + + def test_pending_run(self): + """Test that a pending run raises the proper exception.""" + kwargs = dict(self.run_kwargs, status="pending") + with self.assertRaises(github_utils.RunPendingError): + github_utils.check_run_status(github_utils.WorkflowRun(**kwargs)) + + def test_timeout_error(self): + """Test that a timeout is declared after a certain duration.""" + kwargs = dict(self.run_kwargs, status="pending") + # Set the creation time to well before the MAX_RUN_TIME + # to guarantee the right conclusion + kwargs["created_at"] = ( + datetime.datetime.now(tz=datetime.UTC) + - github_utils.MAX_RUN_TIME - datetime.timedelta(minutes=10) + ).strftime(settings.GITHUB_TIMESTAMP_FORMAT) + + with self.assertRaises(github_utils.RunTimeoutError): + github_utils.check_run_status(github_utils.WorkflowRun(**kwargs)) + + def test_failed_run(self): + """Test that a failed run raises the proper exception.""" + kwargs = dict(self.run_kwargs, conclusion="failed") + with self.assertRaises(github_utils.ActionFailedError): + github_utils.check_run_status(github_utils.WorkflowRun(**kwargs)) + + +def get_response_authorize(_: httpx.Client, request: httpx.Request, **__) -> httpx.Response: + """ + Helper method for the authorize tests. + + Requests are intercepted before being sent out, and the appropriate responses are returned. + """ + path = request.url.path + auth = request.headers.get("Authorization") + + if request.method == "GET": + if path == "/app/installations": + if auth == "bearer JWT initial token": + return httpx.Response(200, request=request, json=[{ + "account": {"login": "VALID_OWNER"}, + "access_tokens_url": "https://example.com/ACCESS_TOKEN_URL" + }]) + return httpx.Response( + 401, json={"error": "auth app/installations"}, request=request + ) + + elif path == "/installation/repositories": # noqa: RET505 + if auth == "bearer app access token": + return httpx.Response(200, request=request, json={ + "repositories": [{ + "name": "VALID_REPO" + }] + }) + return httpx.Response( # pragma: no cover + 401, json={"error": "auth installation/repositories"}, request=request + ) + + elif request.method == "POST": # noqa: RET505 + if path == "/ACCESS_TOKEN_URL": + if auth == "bearer JWT initial token": + return httpx.Response(200, request=request, json={"token": "app access token"}) + return httpx.Response(401, json={"error": "auth access_token"}, request=request) # pragma: no cover + + # Reaching this point means something has gone wrong + return httpx.Response(500, request=request) # pragma: no cover + + [email protected]("httpx.Client.send", new=get_response_authorize) [email protected](github_utils, "generate_token", new=mock.Mock(return_value="JWT initial token")) +class AuthorizeTests(unittest.TestCase): + """Test the authorize utility.""" + + def test_invalid_apps_auth(self): + """Test that an exception is raised if authorization was attempted with an invalid token.""" + with mock.patch.object(github_utils, "generate_token", return_value="Invalid token"): # noqa: SIM117 + with self.assertRaises(httpx.HTTPStatusError) as error: + github_utils.authorize("VALID_OWNER", "VALID_REPO") + + exception: httpx.HTTPStatusError = error.exception + self.assertEqual(401, exception.response.status_code) + self.assertEqual("auth app/installations", exception.response.json()["error"]) + + def test_missing_repo(self): + """Test that an exception is raised when the selected owner or repo are not available.""" + with self.assertRaises(github_utils.NotFoundError): + github_utils.authorize("INVALID_OWNER", "VALID_REPO") + with self.assertRaises(github_utils.NotFoundError): + github_utils.authorize("VALID_OWNER", "INVALID_REPO") + + def test_valid_authorization(self): + """Test that an accessible repository can be accessed.""" + client = github_utils.authorize("VALID_OWNER", "VALID_REPO") + self.assertEqual("bearer app access token", client.headers.get("Authorization")) + + +class ArtifactFetcherTests(unittest.TestCase): + """Test the get_artifact utility.""" + + @staticmethod + def get_response_get_artifact(request: httpx.Request, **_) -> httpx.Response: + """ + Helper method for the get_artifact tests. + + Requests are intercepted before being sent out, and the appropriate responses are returned. + """ + path = request.url.path + + if "force_error" in path: + return httpx.Response(404, request=request) + + if request.method == "GET": + if path == "/repos/owner/repo/actions/runs": + run = github_utils.WorkflowRun( + name="action_name", + head_sha="action_sha", + created_at=( + datetime.datetime + .now(tz=datetime.UTC) + .strftime(settings.GITHUB_TIMESTAMP_FORMAT) + ), + status="completed", + conclusion="success", + artifacts_url="artifacts_url" + ) + return httpx.Response( + 200, request=request, json={"workflow_runs": [dataclasses.asdict(run)]} + ) + elif path == "/artifact_url": # noqa: RET505 + return httpx.Response( + 200, request=request, json={"artifacts": [{ + "name": "artifact_name", + "archive_download_url": "artifact_download_url" + }]} + ) + elif path == "/artifact_download_url": + response = httpx.Response(302, request=request) + response.next_request = httpx.Request( + "GET", + httpx.URL("https://final_download.url") + ) + return response + + # Reaching this point means something has gone wrong + return httpx.Response(500, request=request) # pragma: no cover + + def setUp(self) -> None: + self.call_args = ["owner", "repo", "action_sha", "action_name", "artifact_name"] + self.client = httpx.Client(base_url="https://example.com") + + self.patchers = [ + mock.patch.object(self.client, "send", new=self.get_response_get_artifact), + mock.patch.object(github_utils, "authorize", return_value=self.client), + mock.patch.object(github_utils, "check_run_status", return_value="artifact_url"), + ] + + for patcher in self.patchers: + patcher.start() + + def tearDown(self) -> None: + for patcher in self.patchers: + patcher.stop() + + def test_client_closed_on_errors(self): + """Test that the client is terminated even if an error occurs at some point.""" + self.call_args[0] = "force_error" + with self.assertRaises(httpx.HTTPStatusError): + github_utils.get_artifact(*self.call_args) + self.assertTrue(self.client.is_closed) + + def test_missing(self): + """Test that an exception is raised if the requested artifact was not found.""" + cases = ( + "invalid sha", + "invalid action name", + "invalid artifact name", + ) + for i, name in enumerate(cases, 2): + with self.subTest(f"Test {name} raises an error"): + new_args = self.call_args.copy() + new_args[i] = name + + with self.assertRaises(github_utils.NotFoundError): + github_utils.get_artifact(*new_args) + + def test_valid(self): + """Test that the correct download URL is returned for valid requests.""" + url = github_utils.get_artifact(*self.call_args) + self.assertEqual("https://final_download.url", url) + self.assertTrue(self.client.is_closed) + + [email protected](github_utils, "get_artifact") +class GitHubArtifactViewTests(django.test.TestCase): + """Test the GitHub artifact fetch API view.""" + + def setUp(self): + self.kwargs = { + "owner": "test_owner", + "repo": "test_repo", + "sha": "test_sha", + "action_name": "test_action", + "artifact_name": "test_artifact", + } + self.url = reverse("api:github-artifacts", kwargs=self.kwargs) + + def test_correct_artifact(self, artifact_mock: mock.Mock): + """Test a proper response is returned with proper input.""" + artifact_mock.return_value = "final download url" + result = self.client.get(self.url) + + self.assertIsInstance(result, rest_framework.response.Response) + self.assertEqual({"url": artifact_mock.return_value}, result.data) + + def test_failed_fetch(self, artifact_mock: mock.Mock): + """Test that a proper error is returned when the request fails.""" + artifact_mock.side_effect = github_utils.NotFoundError("Test error message") + result = self.client.get(self.url) + + self.assertIsInstance(result, rest_framework.response.Response) + self.assertEqual({ + "error_type": github_utils.NotFoundError.__name__, + "error": "Test error message", + "requested_resource": "/".join(self.kwargs.values()) + }, result.data) diff --git a/pydis_site/apps/api/tests/test_github_webhook_filter.py b/pydis_site/apps/api/tests/test_github_webhook_filter.py new file mode 100644 index 00000000..8ca60511 --- /dev/null +++ b/pydis_site/apps/api/tests/test_github_webhook_filter.py @@ -0,0 +1,62 @@ +from unittest import mock +from urllib.error import HTTPError + +from django.urls import reverse +from rest_framework.test import APITestCase + +from pydis_site.apps.api.views import GitHubWebhookFilterView + +class GitHubWebhookFilterAPITests(APITestCase): + def test_ignores_bot_sender(self): + url = reverse('api:github-webhook-filter', args=('id', 'token')) + payload = {'sender': {'login': 'limette', 'type': 'bot'}} + headers = {'X-GitHub-Event': 'pull_request_review'} + response = self.client.post(url, data=payload, headers=headers) + self.assertEqual(response.status_code, 203) + + def test_accepts_interesting_events(self): + url = reverse('api:github-webhook-filter', args=('id', 'token')) + payload = { + 'ref': 'refs/heads/master', + 'pull_request': { + 'user': { + 'login': "lemon", + } + }, + 'review': { + 'state': 'commented', + 'body': "Amazing!!!" + }, + 'repository': { + 'name': 'black', + 'owner': { + 'login': 'psf', + } + } + } + headers = {'X-GitHub-Event': 'pull_request_review'} + + with mock.patch('urllib.request.urlopen') as urlopen: + urlopen.return_value = mock.MagicMock() + context_mock = urlopen.return_value.__enter__.return_value + context_mock.status = 299 + context_mock.getheaders.return_value = [('X-Clacks-Overhead', 'Joe Armstrong')] + context_mock.read.return_value = b'{"status": "ok"}' + + response = self.client.post(url, data=payload, headers=headers) + self.assertEqual(response.status_code, context_mock.status) + self.assertEqual(response.headers.get('X-Clacks-Overhead'), 'Joe Armstrong') + + def test_rate_limit_is_logged_to_sentry(self): + url = reverse('api:github-webhook-filter', args=('id', 'token')) + payload = {} + headers = {'X-GitHub-Event': 'pull_request_review'} + with ( + mock.patch('urllib.request.urlopen') as urlopen, + mock.patch.object(GitHubWebhookFilterView, "logger") as logger, + ): + urlopen.side_effect = HTTPError(None, 429, 'Too Many Requests', {}, None) + logger.warning = mock.PropertyMock() + self.client.post(url, data=payload, headers=headers) + + logger.warning.assert_called_once() diff --git a/pydis_site/apps/api/tests/test_infractions.py b/pydis_site/apps/api/tests/test_infractions.py index b3dd16ee..f1e54b1e 100644 --- a/pydis_site/apps/api/tests/test_infractions.py +++ b/pydis_site/apps/api/tests/test_infractions.py @@ -1,14 +1,15 @@ import datetime -from datetime import datetime as dt, timedelta, timezone +from datetime import UTC, datetime as dt, timedelta from unittest.mock import patch from urllib.parse import quote +from django.db import transaction from django.db.utils import IntegrityError from django.urls import reverse from .base import AuthenticatedAPITestCase -from ..models import Infraction, User -from ..serializers import InfractionSerializer +from pydis_site.apps.api.models import Infraction, User +from pydis_site.apps.api.serializers import InfractionSerializer class UnauthenticatedTests(AuthenticatedAPITestCase): @@ -55,23 +56,26 @@ class InfractionTests(AuthenticatedAPITestCase): type='ban', reason='He terk my jerb!', hidden=True, - expires_at=dt(5018, 11, 20, 15, 52, tzinfo=timezone.utc), - active=True + inserted_at=dt(2020, 10, 10, 0, 0, 0, tzinfo=UTC), + expires_at=dt(5018, 11, 20, 15, 52, tzinfo=UTC), + active=True, ) cls.ban_inactive = Infraction.objects.create( user_id=cls.user.id, actor_id=cls.user.id, type='ban', reason='James is an ass, and we won\'t be working with him again.', - active=False + active=False, + inserted_at=dt(2020, 10, 10, 0, 1, 0, tzinfo=UTC), ) - cls.mute_permanent = Infraction.objects.create( + cls.timeout_permanent = Infraction.objects.create( user_id=cls.user.id, actor_id=cls.user.id, - type='mute', + type='timeout', reason='He has a filthy mouth and I am his soap.', active=True, - expires_at=None + inserted_at=dt(2020, 10, 10, 0, 2, 0, tzinfo=UTC), + expires_at=None, ) cls.superstar_expires_soon = Infraction.objects.create( user_id=cls.user.id, @@ -79,7 +83,8 @@ class InfractionTests(AuthenticatedAPITestCase): type='superstar', reason='This one doesn\'t matter anymore.', active=True, - expires_at=datetime.datetime.utcnow() + datetime.timedelta(hours=5) + inserted_at=dt(2020, 10, 10, 0, 3, 0, tzinfo=UTC), + expires_at=dt.now(UTC) + datetime.timedelta(hours=5), ) cls.voiceban_expires_later = Infraction.objects.create( user_id=cls.user.id, @@ -87,7 +92,8 @@ class InfractionTests(AuthenticatedAPITestCase): type='voice_ban', reason='Jet engine mic', active=True, - expires_at=datetime.datetime.utcnow() + datetime.timedelta(days=5) + inserted_at=dt(2020, 10, 10, 0, 4, 0, tzinfo=UTC), + expires_at=dt.now(UTC) + datetime.timedelta(days=5), ) def test_list_all(self): @@ -101,7 +107,7 @@ class InfractionTests(AuthenticatedAPITestCase): self.assertEqual(len(infractions), 5) self.assertEqual(infractions[0]['id'], self.voiceban_expires_later.id) self.assertEqual(infractions[1]['id'], self.superstar_expires_soon.id) - self.assertEqual(infractions[2]['id'], self.mute_permanent.id) + self.assertEqual(infractions[2]['id'], self.timeout_permanent.id) self.assertEqual(infractions[3]['id'], self.ban_inactive.id) self.assertEqual(infractions[4]['id'], self.ban_hidden.id) @@ -128,7 +134,7 @@ class InfractionTests(AuthenticatedAPITestCase): def test_filter_permanent_false(self): url = reverse('api:bot:infraction-list') - response = self.client.get(f'{url}?type=mute&permanent=false') + response = self.client.get(f'{url}?type=timeout&permanent=false') self.assertEqual(response.status_code, 200) infractions = response.json() @@ -137,17 +143,17 @@ class InfractionTests(AuthenticatedAPITestCase): def test_filter_permanent_true(self): url = reverse('api:bot:infraction-list') - response = self.client.get(f'{url}?type=mute&permanent=true') + response = self.client.get(f'{url}?type=timeout&permanent=true') self.assertEqual(response.status_code, 200) infractions = response.json() - self.assertEqual(infractions[0]['id'], self.mute_permanent.id) + self.assertEqual(infractions[0]['id'], self.timeout_permanent.id) def test_filter_after(self): url = reverse('api:bot:infraction-list') - target_time = datetime.datetime.utcnow() + datetime.timedelta(hours=5) - response = self.client.get(f'{url}?type=superstar&expires_after={target_time.isoformat()}') + target_time = datetime.datetime.now(tz=UTC) + datetime.timedelta(hours=5) + response = self.client.get(url, {'type': 'superstar', 'expires_after': target_time.isoformat()}) self.assertEqual(response.status_code, 200) infractions = response.json() @@ -155,8 +161,8 @@ class InfractionTests(AuthenticatedAPITestCase): def test_filter_before(self): url = reverse('api:bot:infraction-list') - target_time = datetime.datetime.utcnow() + datetime.timedelta(hours=5) - response = self.client.get(f'{url}?type=superstar&expires_before={target_time.isoformat()}') + target_time = datetime.datetime.now(tz=UTC) + datetime.timedelta(hours=5) + response = self.client.get(url, {'type': 'superstar', 'expires_before': target_time.isoformat()}) self.assertEqual(response.status_code, 200) infractions = response.json() @@ -168,22 +174,23 @@ class InfractionTests(AuthenticatedAPITestCase): response = self.client.get(f'{url}?expires_after=gibberish') self.assertEqual(response.status_code, 400) - self.assertEqual(list(response.json())[0], "expires_after") + self.assertEqual(next(iter(response.json())), "expires_after") def test_filter_before_invalid(self): url = reverse('api:bot:infraction-list') response = self.client.get(f'{url}?expires_before=000000000') self.assertEqual(response.status_code, 400) - self.assertEqual(list(response.json())[0], "expires_before") + self.assertEqual(next(iter(response.json())), "expires_before") def test_after_before_before(self): url = reverse('api:bot:infraction-list') - target_time = datetime.datetime.utcnow() + datetime.timedelta(hours=4) - target_time_late = datetime.datetime.utcnow() + datetime.timedelta(hours=6) + target_time = datetime.datetime.now(tz=UTC) + datetime.timedelta(hours=4) + target_time_late = datetime.datetime.now(tz=UTC) + datetime.timedelta(hours=6) response = self.client.get( - f'{url}?expires_before={target_time_late.isoformat()}' - f'&expires_after={target_time.isoformat()}' + url, + {'expires_before': target_time_late.isoformat(), + 'expires_after': target_time.isoformat()}, ) self.assertEqual(response.status_code, 200) @@ -192,11 +199,12 @@ class InfractionTests(AuthenticatedAPITestCase): def test_after_after_before_invalid(self): url = reverse('api:bot:infraction-list') - target_time = datetime.datetime.utcnow() + datetime.timedelta(hours=5) - target_time_late = datetime.datetime.utcnow() + datetime.timedelta(hours=9) + target_time = datetime.datetime.now(tz=UTC) + datetime.timedelta(hours=5) + target_time_late = datetime.datetime.now(tz=UTC) + datetime.timedelta(hours=9) response = self.client.get( - f'{url}?expires_before={target_time.isoformat()}' - f'&expires_after={target_time_late.isoformat()}' + url, + {'expires_before': target_time.isoformat(), + 'expires_after': target_time_late.isoformat()}, ) self.assertEqual(response.status_code, 400) @@ -206,8 +214,11 @@ class InfractionTests(AuthenticatedAPITestCase): def test_permanent_after_invalid(self): url = reverse('api:bot:infraction-list') - target_time = datetime.datetime.utcnow() + datetime.timedelta(hours=5) - response = self.client.get(f'{url}?permanent=true&expires_after={target_time.isoformat()}') + target_time = datetime.datetime.now(tz=UTC) + datetime.timedelta(hours=5) + response = self.client.get( + url, + {'permanent': 'true', 'expires_after': target_time.isoformat()}, + ) self.assertEqual(response.status_code, 400) errors = list(response.json()) @@ -215,8 +226,11 @@ class InfractionTests(AuthenticatedAPITestCase): def test_permanent_before_invalid(self): url = reverse('api:bot:infraction-list') - target_time = datetime.datetime.utcnow() + datetime.timedelta(hours=5) - response = self.client.get(f'{url}?permanent=true&expires_before={target_time.isoformat()}') + target_time = datetime.datetime.now(tz=UTC) + datetime.timedelta(hours=5) + response = self.client.get( + url, + {'permanent': 'true', 'expires_before': target_time.isoformat()}, + ) self.assertEqual(response.status_code, 400) errors = list(response.json()) @@ -224,9 +238,10 @@ class InfractionTests(AuthenticatedAPITestCase): def test_nonpermanent_before(self): url = reverse('api:bot:infraction-list') - target_time = datetime.datetime.utcnow() + datetime.timedelta(hours=6) + target_time = datetime.datetime.now(tz=UTC) + datetime.timedelta(hours=6) response = self.client.get( - f'{url}?permanent=false&expires_before={target_time.isoformat()}' + url, + {'permanent': 'false', 'expires_before': target_time.isoformat()}, ) self.assertEqual(response.status_code, 200) @@ -235,7 +250,7 @@ class InfractionTests(AuthenticatedAPITestCase): def test_filter_manytypes(self): url = reverse('api:bot:infraction-list') - response = self.client.get(f'{url}?types=mute,ban') + response = self.client.get(f'{url}?types=timeout,ban') self.assertEqual(response.status_code, 200) infractions = response.json() @@ -243,7 +258,7 @@ class InfractionTests(AuthenticatedAPITestCase): def test_types_type_invalid(self): url = reverse('api:bot:infraction-list') - response = self.client.get(f'{url}?types=mute,ban&type=superstar') + response = self.client.get(f'{url}?types=timeout,ban&type=superstar') self.assertEqual(response.status_code, 400) errors = list(response.json()) @@ -355,7 +370,7 @@ class CreationTests(AuthenticatedAPITestCase): infraction = Infraction.objects.get(id=response.json()['id']) self.assertAlmostEqual( infraction.inserted_at, - dt.now(timezone.utc), + dt.now(UTC), delta=timedelta(seconds=2) ) self.assertEqual(infraction.expires_at.isoformat(), data['expires_at']) @@ -492,6 +507,7 @@ class CreationTests(AuthenticatedAPITestCase): ) for infraction_type, hidden in restricted_types: + # https://stackoverflow.com/a/23326971 with self.subTest(infraction_type=infraction_type): invalid_infraction = { 'user': self.user.id, @@ -512,10 +528,10 @@ class CreationTests(AuthenticatedAPITestCase): def test_returns_400_for_second_active_infraction_of_the_same_type(self): """Test if the API rejects a second active infraction of the same type for a given user.""" url = reverse('api:bot:infraction-list') - active_infraction_types = ('mute', 'ban', 'superstar') + active_infraction_types = ('timeout', 'ban', 'superstar') for infraction_type in active_infraction_types: - with self.subTest(infraction_type=infraction_type): + with self.subTest(infraction_type=infraction_type), transaction.atomic(): first_active_infraction = { 'user': self.user.id, 'actor': self.user.id, @@ -554,7 +570,7 @@ class CreationTests(AuthenticatedAPITestCase): first_active_infraction = { 'user': self.user.id, 'actor': self.user.id, - 'type': 'mute', + 'type': 'timeout', 'reason': 'Be silent!', 'hidden': True, 'active': True, @@ -641,9 +657,9 @@ class CreationTests(AuthenticatedAPITestCase): Infraction.objects.create( user=self.user, actor=self.user, - type="mute", + type="timeout", active=True, - reason="The first active mute" + reason="The first active timeout" ) def test_unique_constraint_accepts_active_infractions_for_different_users(self): @@ -798,7 +814,7 @@ class SerializerTests(AuthenticatedAPITestCase): actor_id=self.user.id, type=_type, reason='A reason.', - expires_at=dt(5018, 11, 20, 15, 52, tzinfo=timezone.utc), + expires_at=dt(5018, 11, 20, 15, 52, tzinfo=UTC), active=active ) @@ -811,22 +827,6 @@ class SerializerTests(AuthenticatedAPITestCase): self.assertTrue(serializer.is_valid(), msg=serializer.errors) - def test_validation_error_if_active_duplicate(self): - self.create_infraction('ban', active=True) - instance = self.create_infraction('ban', active=False) - - data = {'active': True} - serializer = InfractionSerializer(instance, data=data, partial=True) - - if not serializer.is_valid(): - self.assertIn('non_field_errors', serializer.errors) - - code = serializer.errors['non_field_errors'][0].code - msg = f'Expected failure on unique validator but got {serializer.errors}' - self.assertEqual(code, 'unique', msg=msg) - else: # pragma: no cover - self.fail('Validation unexpectedly succeeded.') - def test_is_valid_for_new_active_infraction(self): self.create_infraction('ban', active=False) diff --git a/pydis_site/apps/api/tests/test_models.py b/pydis_site/apps/api/tests/test_models.py index 5c9ddea4..456ac408 100644 --- a/pydis_site/apps/api/tests/test_models.py +++ b/pydis_site/apps/api/tests/test_models.py @@ -1,14 +1,14 @@ -from datetime import datetime as dt +from datetime import UTC, datetime as dt from django.core.exceptions import ValidationError from django.test import SimpleTestCase, TestCase -from django.utils import timezone from pydis_site.apps.api.models import ( DeletedMessage, DocumentationLink, + Filter, + FilterList, Infraction, - Message, MessageDeletionContext, Nomination, NominationEntry, @@ -41,7 +41,7 @@ class NitroMessageLengthTest(TestCase): self.context = MessageDeletionContext.objects.create( id=50, actor=self.user, - creation=dt.utcnow() + creation=dt.now(UTC) ) def test_create(self): @@ -99,17 +99,26 @@ class StringDunderMethodTests(SimpleTestCase): name='shawn', discriminator=555, ), - creation=dt.utcnow() + creation=dt.now(UTC) ), embeds=[] ), DocumentationLink( 'test', 'http://example.com', 'http://example.com' ), + FilterList( + name="forbidden_duckies", + list_type=0, + ), + Filter( + content="ducky_nsfw", + description="This ducky is totally inappropriate!", + additional_settings=None, + ), OffensiveMessage( id=602951077675139072, channel_id=291284109232308226, - delete_date=dt(3000, 1, 1) + delete_date=dt(3000, 1, 1, tzinfo=UTC) ), OffTopicChannelName(name='bob-the-builders-playground'), Role( @@ -117,24 +126,13 @@ class StringDunderMethodTests(SimpleTestCase): colour=0x5, permissions=0, position=10, ), - Message( - id=45, - author=User( - id=444, - name='bill', - discriminator=5, - ), - channel_id=666, - content="wooey", - embeds=[] - ), MessageDeletionContext( actor=User( id=5555, name='shawn', discriminator=555, ), - creation=dt.utcnow() + creation=dt.now(tz=UTC) ), User( id=5, @@ -153,7 +151,7 @@ class StringDunderMethodTests(SimpleTestCase): hidden=True, type='kick', reason='He terk my jerb!', - expires_at=dt(5018, 11, 20, 15, 52, tzinfo=timezone.utc) + expires_at=dt(5018, 11, 20, 15, 52, tzinfo=UTC) ), Reminder( author=User( @@ -167,7 +165,7 @@ class StringDunderMethodTests(SimpleTestCase): '267624335836053506/291284109232308226/463087129459949587' ), content="oh no", - expiration=dt(5018, 11, 20, 15, 52, tzinfo=timezone.utc) + expiration=dt(5018, 11, 20, 15, 52, tzinfo=UTC) ), NominationEntry( nomination_id=self.nomination.id, diff --git a/pydis_site/apps/api/tests/test_nominations.py b/pydis_site/apps/api/tests/test_nominations.py index 62b2314c..e4dfe36a 100644 --- a/pydis_site/apps/api/tests/test_nominations.py +++ b/pydis_site/apps/api/tests/test_nominations.py @@ -1,9 +1,9 @@ -from datetime import datetime as dt, timedelta, timezone +from datetime import UTC, datetime as dt, timedelta from django.urls import reverse from .base import AuthenticatedAPITestCase -from ..models import Nomination, NominationEntry, User +from pydis_site.apps.api.models import Nomination, NominationEntry, User class CreationTests(AuthenticatedAPITestCase): @@ -38,7 +38,7 @@ class CreationTests(AuthenticatedAPITestCase): ) self.assertAlmostEqual( nomination.inserted_at, - dt.now(timezone.utc), + dt.now(UTC), delta=timedelta(seconds=2) ) self.assertEqual(nomination.user.id, data['user']) @@ -254,7 +254,7 @@ class NominationTests(AuthenticatedAPITestCase): def test_returns_400_on_frozen_field_update(self): url = reverse('api:bot:nomination-detail', args=(self.active_nomination.id,)) data = { - 'user': "Theo Katzman" + 'user': 1234 } response = self.client.patch(url, data=data) @@ -319,7 +319,7 @@ class NominationTests(AuthenticatedAPITestCase): self.assertAlmostEqual( nomination.ended_at, - dt.now(timezone.utc), + dt.now(UTC), delta=timedelta(seconds=2) ) self.assertFalse(nomination.active) @@ -524,3 +524,35 @@ class NominationTests(AuthenticatedAPITestCase): self.assertEqual(response.json(), { 'actor': ["The actor doesn't exist or has not nominated the user."] }) + + def test_patch_nomination_set_thread_id_of_active_nomination(self): + url = reverse('api:bot:nomination-detail', args=(self.active_nomination.id,)) + data = {'thread_id': 9876543210} + response = self.client.patch(url, data=data) + self.assertEqual(response.status_code, 200) + + def test_patch_nomination_set_thread_id_and_reviewed_of_active_nomination(self): + url = reverse('api:bot:nomination-detail', args=(self.active_nomination.id,)) + data = {'thread_id': 9876543210, "reviewed": True} + response = self.client.patch(url, data=data) + self.assertEqual(response.status_code, 200) + + def test_modifying_thread_id_when_ending_nomination(self): + url = reverse('api:bot:nomination-detail', args=(self.active_nomination.id,)) + data = {'thread_id': 9876543210, 'active': False, 'end_reason': "What?"} + + response = self.client.patch(url, data=data) + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json(), { + 'thread_id': ['This field cannot be set when ending a nomination.'] + }) + + def test_patch_thread_id_for_inactive_nomination(self): + url = reverse('api:bot:nomination-detail', args=(self.inactive_nomination.id,)) + data = {'thread_id': 9876543210} + + response = self.client.patch(url, data=data) + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json(), { + 'thread_id': ['This field cannot be set if the nomination is inactive.'] + }) diff --git a/pydis_site/apps/api/tests/test_off_topic_channel_names.py b/pydis_site/apps/api/tests/test_off_topic_channel_names.py index 2d273756..315f707d 100644 --- a/pydis_site/apps/api/tests/test_off_topic_channel_names.py +++ b/pydis_site/apps/api/tests/test_off_topic_channel_names.py @@ -1,7 +1,7 @@ from django.urls import reverse from .base import AuthenticatedAPITestCase -from ..models import OffTopicChannelName +from pydis_site.apps.api.models import OffTopicChannelName class UnauthenticatedTests(AuthenticatedAPITestCase): @@ -74,6 +74,9 @@ class ListTests(AuthenticatedAPITestCase): cls.test_name_3 = OffTopicChannelName.objects.create( name="frozen-with-iceman", used=True, active=False ) + cls.test_name_4 = OffTopicChannelName.objects.create( + name="xith-is-cool", used=True, active=True + ) def test_returns_name_in_list(self): """Return all off-topic channel names.""" @@ -86,28 +89,46 @@ class ListTests(AuthenticatedAPITestCase): { self.test_name.name, self.test_name_2.name, - self.test_name_3.name + self.test_name_3.name, + self.test_name_4.name } ) - def test_returns_two_items_with_random_items_param_set_to_2(self): - """Return not-used name instead used.""" + def test_returns_two_active_items_with_random_items_param_set_to_2(self): + """Return not-used active names instead used.""" url = reverse('api:bot:offtopicchannelname-list') response = self.client.get(f'{url}?random_items=2') self.assertEqual(response.status_code, 200) self.assertEqual(len(response.json()), 2) - self.assertEqual(set(response.json()), {self.test_name.name, self.test_name_2.name}) + self.assertTrue( + all( + item in (self.test_name.name, self.test_name_2.name, self.test_name_4.name) + for item in response.json() + ) + ) + + def test_returns_three_active_items_with_random_items_param_set_to_3(self): + """Return not-used active names instead used.""" + url = reverse('api:bot:offtopicchannelname-list') + response = self.client.get(f'{url}?random_items=3') + + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.json()), 3) + self.assertEqual( + set(response.json()), + {self.test_name.name, self.test_name_2.name, self.test_name_4.name} + ) def test_running_out_of_names_with_random_parameter(self): - """Reset names `used` parameter to `False` when running out of names.""" + """Reset names `used` parameter to `False` when running out of active names.""" url = reverse('api:bot:offtopicchannelname-list') response = self.client.get(f'{url}?random_items=3') self.assertEqual(response.status_code, 200) self.assertEqual( set(response.json()), - {self.test_name.name, self.test_name_2.name, self.test_name_3.name} + {self.test_name.name, self.test_name_2.name, self.test_name_4.name} ) def test_returns_inactive_ot_names(self): @@ -129,7 +150,7 @@ class ListTests(AuthenticatedAPITestCase): self.assertEqual(response.status_code, 200) self.assertEqual( set(response.json()), - {self.test_name.name, self.test_name_2.name} + {self.test_name.name, self.test_name_2.name, self.test_name_4.name} ) diff --git a/pydis_site/apps/api/tests/test_offensive_message.py b/pydis_site/apps/api/tests/test_offensive_message.py index 3cf95b75..2dc60bc3 100644 --- a/pydis_site/apps/api/tests/test_offensive_message.py +++ b/pydis_site/apps/api/tests/test_offensive_message.py @@ -3,20 +3,31 @@ import datetime from django.urls import reverse from .base import AuthenticatedAPITestCase -from ..models import OffensiveMessage +from pydis_site.apps.api.models import OffensiveMessage + + +def create_offensive_message() -> OffensiveMessage: + """Creates and returns an `OffensiveMessgage` record for tests.""" + delete_at = datetime.datetime.now(tz=datetime.UTC) + datetime.timedelta(days=1) + + return OffensiveMessage.objects.create( + id=602951077675139072, + channel_id=291284109232308226, + delete_date=delete_at, + ) class CreationTests(AuthenticatedAPITestCase): def test_accept_valid_data(self): url = reverse('api:bot:offensivemessage-list') - delete_at = datetime.datetime.now() + datetime.timedelta(days=1) + delete_at = datetime.datetime.now() + datetime.timedelta(days=1) # noqa: DTZ005 data = { 'id': '602951077675139072', 'channel_id': '291284109232308226', 'delete_date': delete_at.isoformat()[:-1] } - aware_delete_at = delete_at.replace(tzinfo=datetime.timezone.utc) + aware_delete_at = delete_at.replace(tzinfo=datetime.UTC) response = self.client.post(url, data=data) self.assertEqual(response.status_code, 201) @@ -32,7 +43,7 @@ class CreationTests(AuthenticatedAPITestCase): def test_returns_400_on_non_future_date(self): url = reverse('api:bot:offensivemessage-list') - delete_at = datetime.datetime.now() - datetime.timedelta(days=1) + delete_at = datetime.datetime.now() - datetime.timedelta(days=1) # noqa: DTZ005 data = { 'id': '602951077675139072', 'channel_id': '291284109232308226', @@ -46,7 +57,7 @@ class CreationTests(AuthenticatedAPITestCase): def test_returns_400_on_negative_id_or_channel_id(self): url = reverse('api:bot:offensivemessage-list') - delete_at = datetime.datetime.now() + datetime.timedelta(days=1) + delete_at = datetime.datetime.now() + datetime.timedelta(days=1) # noqa: DTZ005 data = { 'id': '602951077675139072', 'channel_id': '291284109232308226', @@ -72,8 +83,8 @@ class CreationTests(AuthenticatedAPITestCase): class ListTests(AuthenticatedAPITestCase): @classmethod def setUpTestData(cls): - delete_at = datetime.datetime.now() + datetime.timedelta(days=1) - aware_delete_at = delete_at.replace(tzinfo=datetime.timezone.utc) + delete_at = datetime.datetime.now() + datetime.timedelta(days=1) # noqa: DTZ005 + aware_delete_at = delete_at.replace(tzinfo=datetime.UTC) cls.messages = [ { @@ -111,13 +122,7 @@ class ListTests(AuthenticatedAPITestCase): class DeletionTests(AuthenticatedAPITestCase): @classmethod def setUpTestData(cls): - delete_at = datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(days=1) - - cls.valid_offensive_message = OffensiveMessage.objects.create( - id=602951077675139072, - channel_id=291284109232308226, - delete_date=delete_at.isoformat() - ) + cls.valid_offensive_message = create_offensive_message() def test_delete_data(self): url = reverse( @@ -132,24 +137,53 @@ class DeletionTests(AuthenticatedAPITestCase): ) -class NotAllowedMethodsTests(AuthenticatedAPITestCase): +class UpdateOffensiveMessageTestCase(AuthenticatedAPITestCase): @classmethod def setUpTestData(cls): - delete_at = datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(days=1) + cls.message = create_offensive_message() + cls.in_one_week = datetime.datetime.now(tz=datetime.UTC) + datetime.timedelta(days=7) + + def test_updating_message(self): + url = reverse('api:bot:offensivemessage-detail', args=(self.message.id,)) + data = {'delete_date': self.in_one_week.isoformat()} + update_response = self.client.patch(url, data=data) + self.assertEqual(update_response.status_code, 200) - cls.valid_offensive_message = OffensiveMessage.objects.create( - id=602951077675139072, - channel_id=291284109232308226, - delete_date=delete_at.isoformat() + self.message.refresh_from_db() + self.assertAlmostEqual( + self.message.delete_date, + self.in_one_week, + delta=datetime.timedelta(seconds=1), ) - def test_returns_405_for_patch_and_put_requests(self): - url = reverse( - 'api:bot:offensivemessage-detail', args=(self.valid_offensive_message.id,) + def test_updating_write_once_fields(self): + """Fields such as the channel ID may not be updated.""" + url = reverse('api:bot:offensivemessage-detail', args=(self.message.id,)) + data = {'channel_id': self.message.channel_id + 1} + response = self.client.patch(url, data=data) + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json(), {'channel_id': ["This field cannot be updated."]}) + + def test_updating_nonexistent_message(self): + url = reverse('api:bot:offensivemessage-detail', args=(self.message.id + 1,)) + data = {'delete_date': self.in_one_week} + + response = self.client.patch(url, data=data) + self.assertEqual(response.status_code, 404) + self.message.refresh_from_db() + self.assertNotAlmostEqual( + self.message.delete_date, + self.in_one_week, + delta=datetime.timedelta(seconds=1), ) - not_allowed_methods = (self.client.patch, self.client.put) - for method in not_allowed_methods: - with self.subTest(method=method): - response = method(url, {}) - self.assertEqual(response.status_code, 405) + +class NotAllowedMethodsTests(AuthenticatedAPITestCase): + @classmethod + def setUpTestData(cls): + cls.message = create_offensive_message() + + def test_returns_405_for_get(self): + url = reverse('api:bot:offensivemessage-detail', args=(self.message.id,)) + response = self.client.get(url) + self.assertEqual(response.status_code, 405) diff --git a/pydis_site/apps/api/tests/test_reminders.py b/pydis_site/apps/api/tests/test_reminders.py index 709685bc..98e93bb7 100644 --- a/pydis_site/apps/api/tests/test_reminders.py +++ b/pydis_site/apps/api/tests/test_reminders.py @@ -1,10 +1,10 @@ -from datetime import datetime +from datetime import UTC, datetime from django.forms.models import model_to_dict from django.urls import reverse from .base import AuthenticatedAPITestCase -from ..models import Reminder, User +from pydis_site.apps.api.models import Reminder, User class UnauthedReminderAPITests(AuthenticatedAPITestCase): @@ -59,7 +59,7 @@ class ReminderCreationTests(AuthenticatedAPITestCase): data = { 'author': self.author.id, 'content': 'Remember to...wait what was it again?', - 'expiration': datetime.utcnow().isoformat(), + 'expiration': datetime.now(tz=UTC).isoformat(), 'jump_url': "https://www.google.com", 'channel_id': 123, 'mentions': [8888, 9999], @@ -91,7 +91,7 @@ class ReminderDeletionTests(AuthenticatedAPITestCase): cls.reminder = Reminder.objects.create( author=cls.author, content="Don't forget to set yourself a reminder", - expiration=datetime.utcnow().isoformat(), + expiration=datetime.now(UTC), jump_url="https://www.decliningmentalfaculties.com", channel_id=123 ) @@ -122,7 +122,7 @@ class ReminderListTests(AuthenticatedAPITestCase): cls.reminder_one = Reminder.objects.create( author=cls.author, content="We should take Bikini Bottom, and push it somewhere else!", - expiration=datetime.utcnow().isoformat(), + expiration=datetime.now(UTC), jump_url="https://www.icantseemyforehead.com", channel_id=123 ) @@ -130,16 +130,17 @@ class ReminderListTests(AuthenticatedAPITestCase): cls.reminder_two = Reminder.objects.create( author=cls.author, content="Gahhh-I love being purple!", - expiration=datetime.utcnow().isoformat(), + expiration=datetime.now(UTC), jump_url="https://www.goofygoobersicecreampartyboat.com", channel_id=123, active=False ) + drf_format = '%Y-%m-%dT%H:%M:%S.%fZ' cls.rem_dict_one = model_to_dict(cls.reminder_one) - cls.rem_dict_one['expiration'] += 'Z' # Massaging a quirk of the response time format + cls.rem_dict_one['expiration'] = cls.rem_dict_one['expiration'].strftime(drf_format) cls.rem_dict_two = model_to_dict(cls.reminder_two) - cls.rem_dict_two['expiration'] += 'Z' # Massaging a quirk of the response time format + cls.rem_dict_two['expiration'] = cls.rem_dict_two['expiration'].strftime(drf_format) def test_reminders_in_full_list(self): url = reverse('api:bot:reminder-list') @@ -175,7 +176,7 @@ class ReminderRetrieveTests(AuthenticatedAPITestCase): cls.reminder = Reminder.objects.create( author=cls.author, content="Reminder content", - expiration=datetime.utcnow().isoformat(), + expiration=datetime.now(UTC), jump_url="http://example.com/", channel_id=123 ) @@ -203,7 +204,7 @@ class ReminderUpdateTests(AuthenticatedAPITestCase): cls.reminder = Reminder.objects.create( author=cls.author, content="Squash those do-gooders", - expiration=datetime.utcnow().isoformat(), + expiration=datetime.now(UTC), jump_url="https://www.decliningmentalfaculties.com", channel_id=123 ) diff --git a/pydis_site/apps/api/tests/test_roles.py b/pydis_site/apps/api/tests/test_roles.py index 73c80c77..d3031990 100644 --- a/pydis_site/apps/api/tests/test_roles.py +++ b/pydis_site/apps/api/tests/test_roles.py @@ -1,7 +1,7 @@ from django.urls import reverse from .base import AuthenticatedAPITestCase -from ..models import Role, User +from pydis_site.apps.api.models import Role, User class CreationTests(AuthenticatedAPITestCase): diff --git a/pydis_site/apps/api/tests/test_rules.py b/pydis_site/apps/api/tests/test_rules.py index d08c5fae..14412b90 100644 --- a/pydis_site/apps/api/tests/test_rules.py +++ b/pydis_site/apps/api/tests/test_rules.py @@ -1,7 +1,11 @@ +import itertools +import re +from pathlib import Path + from django.urls import reverse from .base import AuthenticatedAPITestCase -from ..views import RulesView +from pydis_site.apps.api.views import RulesView class RuleAPITests(AuthenticatedAPITestCase): @@ -33,3 +37,37 @@ class RuleAPITests(AuthenticatedAPITestCase): url = reverse('api:rules') response = self.client.get(url + '?link_format=unknown') self.assertEqual(response.status_code, 400) + + +class RuleCorrectnessTests(AuthenticatedAPITestCase): + """Verifies that the rules from the API and by the static rules in the content app match.""" + + @classmethod + def setUpTestData(cls): + cls.markdown_rule_re = re.compile(r'^> \d+\. (.*)$') + + def test_rules_in_markdown_file_roughly_equal_api_rules(self) -> None: + url = reverse('api:rules') + api_response = self.client.get(url + '?link_format=md') + api_rules = tuple(rule for (rule, _tags) in api_response.json()) + + markdown_rules_path = ( + Path(__file__).parent.parent.parent / 'content' / 'resources' / 'rules.md' + ) + + markdown_rules = [] + for line in markdown_rules_path.read_text(encoding="utf8").splitlines(): + matches = self.markdown_rule_re.match(line) + if matches is not None: + markdown_rules.append(matches.group(1)) + + zipper = itertools.zip_longest(api_rules, markdown_rules) + for idx, (api_rule, markdown_rule) in enumerate(zipper): + with self.subTest(f"Rule {idx}"): + self.assertIsNotNone( + markdown_rule, f"The API has more rules than {markdown_rules_path}" + ) + self.assertIsNotNone( + api_rule, f"{markdown_rules_path} has more rules than the API endpoint" + ) + self.assertEqual(markdown_rule, api_rule) diff --git a/pydis_site/apps/api/tests/test_users.py b/pydis_site/apps/api/tests/test_users.py index e21bb32b..cff4a825 100644 --- a/pydis_site/apps/api/tests/test_users.py +++ b/pydis_site/apps/api/tests/test_users.py @@ -1,11 +1,12 @@ +import random from unittest.mock import Mock, patch from django.urls import reverse from .base import AuthenticatedAPITestCase -from ..models import Infraction, Role, User -from ..models.bot.metricity import NotFoundError -from ..viewsets.bot.user import UserListPagination +from pydis_site.apps.api.models import Infraction, Role, User +from pydis_site.apps.api.models.bot.metricity import NotFoundError +from pydis_site.apps.api.viewsets.bot.user import UserListPagination class UnauthedUserAPITests(AuthenticatedAPITestCase): @@ -468,18 +469,17 @@ class UserMetricityTests(AuthenticatedAPITestCase): with self.subTest( voice_infractions=case['voice_infractions'], voice_gate_blocked=case['voice_gate_blocked'] - ): - with patch("pydis_site.apps.api.viewsets.bot.user.Infraction.objects.filter") as p: - p.return_value = case['voice_infractions'] + ), patch("pydis_site.apps.api.viewsets.bot.user.Infraction.objects.filter") as p: + p.return_value = case['voice_infractions'] - url = reverse('api:bot:user-metricity-data', args=[0]) - response = self.client.get(url) + url = reverse('api:bot:user-metricity-data', args=[0]) + response = self.client.get(url) - self.assertEqual(response.status_code, 200) - self.assertEqual( - response.json()["voice_gate_blocked"], - case["voice_gate_blocked"] - ) + self.assertEqual(response.status_code, 200) + self.assertEqual( + response.json()["voice_gate_blocked"], + case["voice_gate_blocked"] + ) def test_metricity_review_data(self): # Given @@ -501,6 +501,90 @@ class UserMetricityTests(AuthenticatedAPITestCase): "total_messages": total_messages }) + def test_metricity_activity_data(self): + # Given + self.mock_no_metricity_user() # Other functions shouldn't be used. + self.metricity.total_messages_in_past_n_days.return_value = [(0, 10)] + + # When + url = reverse("api:bot:user-metricity-activity-data") + response = self.client.post( + url, + data=[0, 1], + QUERY_STRING="days=10", + ) + + # Then + self.assertEqual(response.status_code, 200) + self.metricity.total_messages_in_past_n_days.assert_called_once_with(["0", "1"], 10) + self.assertEqual(response.json(), {"0": 10, "1": 0}) + + def test_metricity_activity_data_invalid_days(self): + # Given + self.mock_no_metricity_user() # Other functions shouldn't be used. + + # When + url = reverse("api:bot:user-metricity-activity-data") + response = self.client.post( + url, + data=[0, 1], + QUERY_STRING="days=fifty", + ) + + # Then + self.assertEqual(response.status_code, 400) + self.metricity.total_messages_in_past_n_days.assert_not_called() + self.assertEqual(response.json(), {"days": ["This query parameter must be an integer."]}) + + def test_metricity_activity_data_no_days(self): + # Given + self.mock_no_metricity_user() # Other functions shouldn't be used. + + # When + url = reverse('api:bot:user-metricity-activity-data') + response = self.client.post( + url, + data=[0, 1], + ) + + # Then + self.assertEqual(response.status_code, 400) + self.metricity.total_messages_in_past_n_days.assert_not_called() + self.assertEqual(response.json(), {'days': ["This query parameter is required."]}) + + def test_metricity_activity_data_no_users(self): + # Given + self.mock_no_metricity_user() # Other functions shouldn't be used. + + # When + url = reverse('api:bot:user-metricity-activity-data') + response = self.client.post( + url, + QUERY_STRING="days=10", + ) + + # Then + self.assertEqual(response.status_code, 400) + self.metricity.total_messages_in_past_n_days.assert_not_called() + self.assertEqual(response.json(), ['Expected a list of items but got type "dict".']) + + def test_metricity_activity_data_invalid_users(self): + # Given + self.mock_no_metricity_user() # Other functions shouldn't be used. + + # When + url = reverse('api:bot:user-metricity-activity-data') + response = self.client.post( + url, + data=[123, 'username'], + QUERY_STRING="days=10", + ) + + # Then + self.assertEqual(response.status_code, 400) + self.metricity.total_messages_in_past_n_days.assert_not_called() + self.assertEqual(response.json(), {'1': ['A valid integer is required.']}) + def mock_metricity_user(self, joined_at, total_messages, total_blocks, top_channel_activity): patcher = patch("pydis_site.apps.api.viewsets.bot.user.Metricity") self.metricity = patcher.start() @@ -520,3 +604,45 @@ class UserMetricityTests(AuthenticatedAPITestCase): self.metricity.total_messages.side_effect = NotFoundError() self.metricity.total_message_blocks.side_effect = NotFoundError() self.metricity.top_channel_activity.side_effect = NotFoundError() + + +class UserViewSetTests(AuthenticatedAPITestCase): + @classmethod + def setUpTestData(cls): + cls.searched_user = User.objects.create( + id=12095219, + name=f"Test user {random.randint(100, 1000)}", + discriminator=random.randint(1, 9999), + in_guild=True, + ) + cls.other_user = User.objects.create( + id=18259125, + name=f"Test user {random.randint(100, 1000)}", + discriminator=random.randint(1, 9999), + in_guild=True, + ) + + def test_search_lookup_of_wanted_user(self) -> None: + """Searching a user by name and discriminator should return that user.""" + url = reverse('api:bot:user-list') + params = { + 'username': self.searched_user.name, + 'discriminator': self.searched_user.discriminator, + } + response = self.client.get(url, params) + result = response.json() + self.assertEqual(result['count'], 1) + [user] = result['results'] + self.assertEqual(user['id'], self.searched_user.id) + + def test_search_lookup_of_unknown_user(self) -> None: + """Searching an unknown user should return no results.""" + url = reverse('api:bot:user-list') + params = { + 'username': "f-string enjoyer", + 'discriminator': 1245, + } + response = self.client.get(url, params) + result = response.json() + self.assertEqual(result['count'], 0) + self.assertEqual(result['results'], []) diff --git a/pydis_site/apps/api/tests/test_validators.py b/pydis_site/apps/api/tests/test_validators.py index 551cc2aa..abff8f55 100644 --- a/pydis_site/apps/api/tests/test_validators.py +++ b/pydis_site/apps/api/tests/test_validators.py @@ -1,11 +1,10 @@ -from datetime import datetime, timezone +from datetime import UTC, datetime from django.core.exceptions import ValidationError from django.test import TestCase -from ..models.bot.bot_setting import validate_bot_setting_name -from ..models.bot.offensive_message import future_date_validator -from ..models.utils import validate_embed +from pydis_site.apps.api.models.bot.bot_setting import validate_bot_setting_name +from pydis_site.apps.api.models.bot.offensive_message import future_date_validator REQUIRED_KEYS = ( @@ -22,238 +21,10 @@ class BotSettingValidatorTests(TestCase): validate_bot_setting_name('bad name') -class TagEmbedValidatorTests(TestCase): - def test_rejects_non_mapping(self): - with self.assertRaises(ValidationError): - validate_embed('non-empty non-mapping') - - def test_rejects_missing_required_keys(self): - with self.assertRaises(ValidationError): - validate_embed({ - 'unknown': "key" - }) - - def test_rejects_one_correct_one_incorrect(self): - with self.assertRaises(ValidationError): - validate_embed({ - 'provider': "??", - 'title': "" - }) - - def test_rejects_empty_required_key(self): - with self.assertRaises(ValidationError): - validate_embed({ - 'title': '' - }) - - def test_rejects_list_as_embed(self): - with self.assertRaises(ValidationError): - validate_embed([]) - - def test_rejects_required_keys_and_unknown_keys(self): - with self.assertRaises(ValidationError): - validate_embed({ - 'title': "the duck walked up to the lemonade stand", - 'and': "he said to the man running the stand" - }) - - def test_rejects_too_long_title(self): - with self.assertRaises(ValidationError): - validate_embed({ - 'title': 'a' * 257 - }) - - def test_rejects_too_many_fields(self): - with self.assertRaises(ValidationError): - validate_embed({ - 'fields': [{} for _ in range(26)] - }) - - def test_rejects_too_long_description(self): - with self.assertRaises(ValidationError): - validate_embed({ - 'description': 'd' * 4097 - }) - - def test_allows_valid_embed(self): - validate_embed({ - 'title': "My embed", - 'description': "look at my embed, my embed is amazing" - }) - - def test_allows_unvalidated_fields(self): - validate_embed({ - 'title': "My embed", - 'provider': "what am I??" - }) - - def test_rejects_fields_as_list_of_non_mappings(self): - with self.assertRaises(ValidationError): - validate_embed({ - 'fields': ['abc'] - }) - - def test_rejects_fields_with_unknown_fields(self): - with self.assertRaises(ValidationError): - validate_embed({ - 'fields': [ - { - 'what': "is this field" - } - ] - }) - - def test_rejects_fields_with_too_long_name(self): - with self.assertRaises(ValidationError): - validate_embed({ - 'fields': [ - { - 'name': "a" * 257 - } - ] - }) - - def test_rejects_one_correct_one_incorrect_field(self): - with self.assertRaises(ValidationError): - validate_embed({ - 'fields': [ - { - 'name': "Totally valid", - 'value': "LOOK AT ME" - }, - { - 'name': "Totally valid", - 'value': "LOOK AT ME", - 'oh': "what is this key?" - } - ] - }) - - def test_rejects_missing_required_field_field(self): - with self.assertRaises(ValidationError): - validate_embed({ - 'fields': [ - { - 'name': "Totally valid", - 'inline': True, - } - ] - }) - - def test_rejects_invalid_inline_field_field(self): - with self.assertRaises(ValidationError): - validate_embed({ - 'fields': [ - { - 'name': "Totally valid", - 'value': "LOOK AT ME", - 'inline': "Totally not a boolean", - } - ] - }) - - def test_allows_valid_fields(self): - validate_embed({ - 'fields': [ - { - 'name': "valid", - 'value': "field", - }, - { - 'name': "valid", - 'value': "field", - 'inline': False, - }, - { - 'name': "valid", - 'value': "field", - 'inline': True, - }, - ] - }) - - def test_rejects_footer_as_non_mapping(self): - with self.assertRaises(ValidationError): - validate_embed({ - 'title': "whatever", - 'footer': [] - }) - - def test_rejects_footer_with_unknown_fields(self): - with self.assertRaises(ValidationError): - validate_embed({ - 'title': "whatever", - 'footer': { - 'duck': "quack" - } - }) - - def test_rejects_footer_with_empty_text(self): - with self.assertRaises(ValidationError): - validate_embed({ - 'title': "whatever", - 'footer': { - 'text': "" - } - }) - - def test_allows_footer_with_proper_values(self): - validate_embed({ - 'title': "whatever", - 'footer': { - 'text': "django good" - } - }) - - def test_rejects_author_as_non_mapping(self): - with self.assertRaises(ValidationError): - validate_embed({ - 'title': "whatever", - 'author': [] - }) - - def test_rejects_author_with_unknown_field(self): - with self.assertRaises(ValidationError): - validate_embed({ - 'title': "whatever", - 'author': { - 'field': "that is unknown" - } - }) - - def test_rejects_author_with_empty_name(self): - with self.assertRaises(ValidationError): - validate_embed({ - 'title': "whatever", - 'author': { - 'name': "" - } - }) - - def test_rejects_author_with_one_correct_one_incorrect(self): - with self.assertRaises(ValidationError): - validate_embed({ - 'title': "whatever", - 'author': { - # Relies on "dictionary insertion order remembering" (D.I.O.R.) behaviour - 'url': "bobswebsite.com", - 'name': "" - } - }) - - def test_allows_author_with_proper_values(self): - validate_embed({ - 'title': "whatever", - 'author': { - 'name': "Bob" - } - }) - - class OffensiveMessageValidatorsTests(TestCase): def test_accepts_future_date(self): - future_date_validator(datetime(3000, 1, 1, tzinfo=timezone.utc)) + future_date_validator(datetime(3000, 1, 1, tzinfo=UTC)) def test_rejects_non_future_date(self): with self.assertRaises(ValidationError): - future_date_validator(datetime(1000, 1, 1, tzinfo=timezone.utc)) + future_date_validator(datetime(1000, 1, 1, tzinfo=UTC)) |