aboutsummaryrefslogtreecommitdiffstats
path: root/pydis_site/apps/api/tests
diff options
context:
space:
mode:
authorGravatar Akarys42 <[email protected]>2019-11-07 19:08:23 +0100
committerGravatar Akarys42 <[email protected]>2019-11-07 19:08:23 +0100
commit8e689ac24c8965f8285d1a4fdadbc0d593fd7c0c (patch)
tree8a3adc7a67e38cfbcbe7056f0277d6c882764e8a /pydis_site/apps/api/tests
parentBlock PATCH and PUT methods (diff)
parentMerge pull request #278 from python-discord/active-infractions-validation (diff)
Merge branch 'master' into #222-offensive-msg-autodeletion
Diffstat (limited to 'pydis_site/apps/api/tests')
-rw-r--r--pydis_site/apps/api/tests/migrations/__init__.py1
-rw-r--r--pydis_site/apps/api/tests/migrations/base.py102
-rw-r--r--pydis_site/apps/api/tests/migrations/test_active_infraction_migration.py496
-rw-r--r--pydis_site/apps/api/tests/migrations/test_base.py135
-rw-r--r--pydis_site/apps/api/tests/test_infractions.py195
5 files changed, 927 insertions, 2 deletions
diff --git a/pydis_site/apps/api/tests/migrations/__init__.py b/pydis_site/apps/api/tests/migrations/__init__.py
new file mode 100644
index 00000000..38e42ffc
--- /dev/null
+++ b/pydis_site/apps/api/tests/migrations/__init__.py
@@ -0,0 +1 @@
+"""This submodule contains tests for functions used in data migrations."""
diff --git a/pydis_site/apps/api/tests/migrations/base.py b/pydis_site/apps/api/tests/migrations/base.py
new file mode 100644
index 00000000..0c0a5bd0
--- /dev/null
+++ b/pydis_site/apps/api/tests/migrations/base.py
@@ -0,0 +1,102 @@
+"""Includes utilities for testing migrations."""
+from django.db import connection
+from django.db.migrations.executor import MigrationExecutor
+from django.test import TestCase
+
+
+class MigrationsTestCase(TestCase):
+ """
+ A `TestCase` subclass to test migration files.
+
+ To be able to properly test a migration, we will need to inject data into the test database
+ before the migrations we want to test are applied, but after the older migrations have been
+ applied. This makes sure that we are testing "as if" we were actually applying this migration
+ to a database in the state it was in before introducing the new migration.
+
+ To set up a MigrationsTestCase, create a subclass of this class and set the following
+ class-level attributes:
+
+ - app: The name of the app that contains the migrations (e.g., `'api'`)
+ - migration_prior: The name* of the last migration file before the migrations you want to test
+ - migration_target: The name* of the last migration file we want to test
+
+ *) Specify the file names without a path or the `.py` file extension.
+
+ Additionally, overwrite the `setUpMigrationData` in the subclass to inject data into the
+ database before the migrations we want to test are applied. Please read the docstring of the
+ method for more information. An optional hook, `setUpPostMigrationData` is also provided.
+ """
+
+ # These class-level attributes should be set in classes that inherit from this base class.
+ app = None
+ migration_prior = None
+ migration_target = None
+
+ @classmethod
+ def setUpTestData(cls):
+ """
+ Injects data into the test database prior to the migration we're trying to test.
+
+ This class methods reverts the test database back to the state of the last migration file
+ prior to the migrations we want to test. It will then allow the user to inject data into the
+ test database by calling the `setUpMigrationData` hook. After the data has been injected, it
+ will apply the migrations we want to test and call the `setUpPostMigrationData` hook. The
+ user can now test if the migration correctly migrated the injected test data.
+ """
+ if not cls.app:
+ raise ValueError("The `app` attribute was not set.")
+
+ if not cls.migration_prior or not cls.migration_target:
+ raise ValueError("Both ` migration_prior` and `migration_target` need to be set.")
+
+ cls.migrate_from = [(cls.app, cls.migration_prior)]
+ cls.migrate_to = [(cls.app, cls.migration_target)]
+
+ # Reverse to database state prior to the migrations we want to test
+ executor = MigrationExecutor(connection)
+ executor.migrate(cls.migrate_from)
+
+ # Call the data injection hook with the current state of the project
+ old_apps = executor.loader.project_state(cls.migrate_from).apps
+ cls.setUpMigrationData(old_apps)
+
+ # Run the migrations we want to test
+ executor = MigrationExecutor(connection)
+ executor.loader.build_graph()
+ executor.migrate(cls.migrate_to)
+
+ # Save the project state so we're able to work with the correct model states
+ cls.apps = executor.loader.project_state(cls.migrate_to).apps
+
+ # Call `setUpPostMigrationData` to potentially set up post migration data used in testing
+ cls.setUpPostMigrationData(cls.apps)
+
+ @classmethod
+ def setUpMigrationData(cls, apps):
+ """
+ Override this method to inject data into the test database before the migration is applied.
+
+ This method will be called after setting up the database according to the migrations that
+ come before the migration(s) we are trying to test, but before the to-be-tested migration(s)
+ are applied. This allows us to simulate a database state just prior to the migrations we are
+ trying to test.
+
+ To make sure we're creating objects according to the state the models were in at this point
+ in the migration history, use `apps.get_model(app_name: str, model_name: str)` to get the
+ appropriate model, e.g.:
+
+ >>> Infraction = apps.get_model('api', 'Infraction')
+ """
+ pass
+
+ @classmethod
+ def setUpPostMigrationData(cls, apps):
+ """
+ Set up additional test data after the target migration has been applied.
+
+ Use `apps.get_model(app_name: str, model_name: str)` to get the correct instances of the
+ model classes:
+
+ >>> Infraction = apps.get_model('api', 'Infraction')
+ """
+ pass
diff --git a/pydis_site/apps/api/tests/migrations/test_active_infraction_migration.py b/pydis_site/apps/api/tests/migrations/test_active_infraction_migration.py
new file mode 100644
index 00000000..8dc29b34
--- /dev/null
+++ b/pydis_site/apps/api/tests/migrations/test_active_infraction_migration.py
@@ -0,0 +1,496 @@
+"""Tests for the data migration in `filename`."""
+import logging
+from collections import ChainMap, namedtuple
+from datetime import timedelta
+from itertools import count
+from typing import Dict, Iterable, Type, Union
+
+from django.db.models import Q
+from django.forms.models import model_to_dict
+from django.utils import timezone
+
+from pydis_site.apps.api.models import Infraction, User
+from .base import MigrationsTestCase
+
+log = logging.getLogger(__name__)
+log.setLevel(logging.DEBUG)
+
+
+InfractionHistory = namedtuple('InfractionHistory', ("user_id", "infraction_history"))
+
+
+class InfractionFactory:
+ """Factory that creates infractions for a User instance."""
+
+ infraction_id = count(1)
+ user_id = count(1)
+ default_values = {
+ 'active': True,
+ 'expires_at': None,
+ 'hidden': False,
+ }
+
+ @classmethod
+ def create(
+ cls,
+ actor: User,
+ infractions: Iterable[Dict[str, Union[str, int, bool]]],
+ infraction_model: Type[Infraction] = Infraction,
+ user_model: Type[User] = User,
+ ) -> InfractionHistory:
+ """
+ Creates `infractions` for the `user` with the given `actor`.
+
+ The `infractions` dictionary can contain the following fields:
+ - `type` (required)
+ - `active` (default: True)
+ - `expires_at` (default: None; i.e, permanent)
+ - `hidden` (default: False).
+
+ The parameters `infraction_model` and `user_model` can be used to pass in an instance of
+ both model classes from a different migration/project state.
+ """
+ user_id = next(cls.user_id)
+ user = user_model.objects.create(
+ id=user_id,
+ name=f"Infracted user {user_id}",
+ discriminator=user_id,
+ avatar_hash=None,
+ )
+ infraction_history = []
+
+ for infraction in infractions:
+ infraction = dict(infraction)
+ infraction["id"] = next(cls.infraction_id)
+ infraction = ChainMap(infraction, cls.default_values)
+ new_infraction = infraction_model.objects.create(
+ user=user,
+ actor=actor,
+ type=infraction["type"],
+ reason=f"`{infraction['type']}` infraction (ID: {infraction['id']} of {user}",
+ active=infraction['active'],
+ hidden=infraction['hidden'],
+ expires_at=infraction['expires_at'],
+ )
+ infraction_history.append(new_infraction)
+
+ return InfractionHistory(user_id=user_id, infraction_history=infraction_history)
+
+
+class InfractionFactoryTests(MigrationsTestCase):
+ """Tests for the InfractionFactory."""
+
+ app = "api"
+ migration_prior = "0046_reminder_jump_url"
+ migration_target = "0046_reminder_jump_url"
+
+ @classmethod
+ def setUpPostMigrationData(cls, apps):
+ """Create a default actor for all infractions."""
+ cls.infraction_model = apps.get_model('api', 'Infraction')
+ cls.user_model = apps.get_model('api', 'User')
+
+ cls.actor = cls.user_model.objects.create(
+ id=9999,
+ name="Unknown Moderator",
+ discriminator=1040,
+ avatar_hash=None,
+ )
+
+ def test_infraction_factory_total_count(self):
+ """Does the test database hold as many infractions as we tried to create?"""
+ InfractionFactory.create(
+ actor=self.actor,
+ infractions=(
+ {'type': 'kick', 'active': False, 'hidden': False},
+ {'type': 'ban', 'active': True, 'hidden': False},
+ {'type': 'note', 'active': False, 'hidden': True},
+ ),
+ infraction_model=self.infraction_model,
+ user_model=self.user_model,
+ )
+ database_count = Infraction.objects.all().count()
+ self.assertEqual(3, database_count)
+
+ def test_infraction_factory_multiple_users(self):
+ """Does the test database hold as many infractions as we tried to create?"""
+ for _user in range(5):
+ InfractionFactory.create(
+ actor=self.actor,
+ infractions=(
+ {'type': 'kick', 'active': False, 'hidden': True},
+ {'type': 'ban', 'active': True, 'hidden': False},
+ ),
+ infraction_model=self.infraction_model,
+ user_model=self.user_model,
+ )
+
+ # Check if infractions and users are recorded properly in the database
+ database_count = Infraction.objects.all().count()
+ self.assertEqual(database_count, 10)
+
+ user_count = User.objects.all().count()
+ self.assertEqual(user_count, 5 + 1)
+
+ def test_infraction_factory_sets_correct_fields(self):
+ """Does the InfractionFactory set the correct attributes?"""
+ infractions = (
+ {
+ 'type': 'note',
+ 'active': False,
+ 'hidden': True,
+ 'expires_at': timezone.now()
+ },
+ {'type': 'warning', 'active': False, 'hidden': False, 'expires_at': None},
+ {'type': 'watch', 'active': False, 'hidden': True, 'expires_at': None},
+ {'type': 'mute', 'active': True, 'hidden': False, 'expires_at': None},
+ {'type': 'kick', 'active': True, 'hidden': True, 'expires_at': None},
+ {'type': 'ban', 'active': True, 'hidden': False, 'expires_at': None},
+ {
+ 'type': 'superstar',
+ 'active': True,
+ 'hidden': True,
+ 'expires_at': timezone.now()
+ },
+ )
+
+ InfractionFactory.create(
+ actor=self.actor,
+ infractions=infractions,
+ infraction_model=self.infraction_model,
+ user_model=self.user_model,
+ )
+
+ for infraction in infractions:
+ with self.subTest(**infraction):
+ self.assertTrue(Infraction.objects.filter(**infraction).exists())
+
+
+class ActiveInfractionMigrationTests(MigrationsTestCase):
+ """
+ Tests the active infraction data migration.
+
+ The active infraction data migration should do the following things:
+
+ 1. migrates all active notes, warnings, and kicks to an inactive status;
+ 2. migrates all users with multiple active infractions of a single type to have only one active
+ infraction of that type. The infraction with the longest duration stays active.
+ """
+
+ app = "api"
+ migration_prior = "0046_reminder_jump_url"
+ migration_target = "0047_active_infractions_migration"
+
+ @classmethod
+ def setUpMigrationData(cls, apps):
+ """Sets up an initial database state that contains the relevant test cases."""
+ # Fetch the Infraction and User model in the current migration state
+ cls.infraction_model = apps.get_model('api', 'Infraction')
+ cls.user_model = apps.get_model('api', 'User')
+
+ cls.created_infractions = {}
+
+ # Moderator that serves as actor for all infractions
+ cls.user_moderator = cls.user_model.objects.create(
+ id=9999,
+ name="Olivier de Vienne",
+ discriminator=1040,
+ avatar_hash=None,
+ )
+
+ # User #1: clean user with no infractions
+ cls.created_infractions["no infractions"] = InfractionFactory.create(
+ actor=cls.user_moderator,
+ infractions=[],
+ infraction_model=cls.infraction_model,
+ user_model=cls.user_model,
+ )
+
+ # User #2: One inactive note infraction
+ cls.created_infractions["one inactive note"] = InfractionFactory.create(
+ actor=cls.user_moderator,
+ infractions=(
+ {'type': 'note', 'active': False, 'hidden': True},
+ ),
+ infraction_model=cls.infraction_model,
+ user_model=cls.user_model,
+ )
+
+ # User #3: One active note infraction
+ cls.created_infractions["one active note"] = InfractionFactory.create(
+ actor=cls.user_moderator,
+ infractions=(
+ {'type': 'note', 'active': True, 'hidden': True},
+ ),
+ infraction_model=cls.infraction_model,
+ user_model=cls.user_model,
+ )
+
+ # User #4: One active and one inactive note infraction
+ cls.created_infractions["one active and one inactive note"] = InfractionFactory.create(
+ actor=cls.user_moderator,
+ infractions=(
+ {'type': 'note', 'active': False, 'hidden': True},
+ {'type': 'note', 'active': True, 'hidden': True},
+ ),
+ infraction_model=cls.infraction_model,
+ user_model=cls.user_model,
+ )
+
+ # User #5: Once active note, one active kick, once active warning
+ cls.created_infractions["active note, kick, warning"] = InfractionFactory.create(
+ actor=cls.user_moderator,
+ infractions=(
+ {'type': 'note', 'active': True, 'hidden': True},
+ {'type': 'kick', 'active': True, 'hidden': True},
+ {'type': 'warning', 'active': True, 'hidden': True},
+ ),
+ infraction_model=cls.infraction_model,
+ user_model=cls.user_model,
+ )
+
+ # User #6: One inactive ban and one active ban
+ cls.created_infractions["one inactive and one active ban"] = InfractionFactory.create(
+ actor=cls.user_moderator,
+ infractions=(
+ {'type': 'ban', 'active': False, 'hidden': True},
+ {'type': 'ban', 'active': True, 'hidden': True},
+ ),
+ infraction_model=cls.infraction_model,
+ user_model=cls.user_model,
+ )
+
+ # User #7: Two active permanent bans
+ cls.created_infractions["two active perm bans"] = InfractionFactory.create(
+ actor=cls.user_moderator,
+ infractions=(
+ {'type': 'ban', 'active': True, 'hidden': True},
+ {'type': 'ban', 'active': True, 'hidden': True},
+ ),
+ infraction_model=cls.infraction_model,
+ user_model=cls.user_model,
+ )
+
+ # User #8: Multiple active temporary bans
+ cls.created_infractions["multiple active temp bans"] = InfractionFactory.create(
+ actor=cls.user_moderator,
+ infractions=(
+ {
+ 'type': 'ban',
+ 'active': True,
+ 'hidden': True,
+ 'expires_at': timezone.now() + timedelta(days=1)
+ },
+ {
+ 'type': 'ban',
+ 'active': True,
+ 'hidden': True,
+ 'expires_at': timezone.now() + timedelta(days=10)
+ },
+ {
+ 'type': 'ban',
+ 'active': True,
+ 'hidden': True,
+ 'expires_at': timezone.now() + timedelta(days=20)
+ },
+ {
+ 'type': 'ban',
+ 'active': True,
+ 'hidden': True,
+ 'expires_at': timezone.now() + timedelta(days=5)
+ },
+ ),
+ infraction_model=cls.infraction_model,
+ user_model=cls.user_model,
+ )
+
+ # User #9: One active permanent ban, two active temporary bans
+ cls.created_infractions["active perm, two active temp bans"] = InfractionFactory.create(
+ actor=cls.user_moderator,
+ infractions=(
+ {
+ 'type': 'ban',
+ 'active': True,
+ 'hidden': True,
+ 'expires_at': timezone.now() + timedelta(days=10)
+ },
+ {
+ 'type': 'ban',
+ 'active': True,
+ 'hidden': True,
+ 'expires_at': None,
+ },
+ {
+ 'type': 'ban',
+ 'active': True,
+ 'hidden': True,
+ 'expires_at': timezone.now() + timedelta(days=7)
+ },
+ ),
+ infraction_model=cls.infraction_model,
+ user_model=cls.user_model,
+ )
+
+ # User #10: One inactive permanent ban, two active temporary bans
+ cls.created_infractions["one inactive perm ban, two active temp bans"] = (
+ InfractionFactory.create(
+ actor=cls.user_moderator,
+ infractions=(
+ {
+ 'type': 'ban',
+ 'active': True,
+ 'hidden': True,
+ 'expires_at': timezone.now() + timedelta(days=10)
+ },
+ {
+ 'type': 'ban',
+ 'active': False,
+ 'hidden': True,
+ 'expires_at': None,
+ },
+ {
+ 'type': 'ban',
+ 'active': True,
+ 'hidden': True,
+ 'expires_at': timezone.now() + timedelta(days=7)
+ },
+ ),
+ infraction_model=cls.infraction_model,
+ user_model=cls.user_model,
+ )
+ )
+
+ # User #11: Active ban, active mute, active superstar
+ cls.created_infractions["active ban, mute, and superstar"] = InfractionFactory.create(
+ actor=cls.user_moderator,
+ infractions=(
+ {'type': 'ban', 'active': True, 'hidden': True},
+ {'type': 'mute', 'active': True, 'hidden': True},
+ {'type': 'superstar', 'active': True, 'hidden': True},
+ {'type': 'watch', 'active': True, 'hidden': True},
+ ),
+ infraction_model=cls.infraction_model,
+ user_model=cls.user_model,
+ )
+
+ # User #12: Multiple active bans, active mutes, active superstars
+ cls.created_infractions["multiple active bans, mutes, stars"] = InfractionFactory.create(
+ actor=cls.user_moderator,
+ infractions=(
+ {'type': 'ban', 'active': True, 'hidden': True},
+ {'type': 'ban', 'active': True, 'hidden': True},
+ {'type': 'ban', 'active': True, 'hidden': True},
+ {'type': 'mute', 'active': True, 'hidden': True},
+ {'type': 'mute', 'active': True, 'hidden': True},
+ {'type': 'mute', 'active': True, 'hidden': True},
+ {'type': 'superstar', 'active': True, 'hidden': True},
+ {'type': 'superstar', 'active': True, 'hidden': True},
+ {'type': 'superstar', 'active': True, 'hidden': True},
+ {'type': 'watch', 'active': True, 'hidden': True},
+ {'type': 'watch', 'active': True, 'hidden': True},
+ {'type': 'watch', 'active': True, 'hidden': True},
+ ),
+ infraction_model=cls.infraction_model,
+ user_model=cls.user_model,
+ )
+
+ def test_all_never_active_types_became_inactive(self):
+ """Are all infractions of a non-active type inactive after the migration?"""
+ inactive_type_query = Q(type="note") | Q(type="warning") | Q(type="kick")
+ self.assertFalse(
+ self.infraction_model.objects.filter(inactive_type_query, active=True).exists()
+ )
+
+ def test_migration_left_clean_user_without_infractions(self):
+ """Do users without infractions have no infractions after the migration?"""
+ user_id, infraction_history = self.created_infractions["no infractions"]
+ self.assertFalse(
+ self.infraction_model.objects.filter(user__id=user_id).exists()
+ )
+
+ def test_migration_left_user_with_inactive_note_untouched(self):
+ """Did the migration leave users with only an inactive note untouched?"""
+ user_id, infraction_history = self.created_infractions["one inactive note"]
+ inactive_note = infraction_history[0]
+ self.assertTrue(
+ self.infraction_model.objects.filter(**model_to_dict(inactive_note)).exists()
+ )
+
+ def test_migration_only_touched_active_field_of_active_note(self):
+ """Does the migration only change the `active` field?"""
+ user_id, infraction_history = self.created_infractions["one active note"]
+ note = model_to_dict(infraction_history[0])
+ note['active'] = False
+ self.assertTrue(
+ self.infraction_model.objects.filter(**note).exists()
+ )
+
+ def test_migration_only_touched_active_field_of_active_note_left_inactive_untouched(self):
+ """Does the migration only change the `active` field of active notes?"""
+ user_id, infraction_history = self.created_infractions["one active and one inactive note"]
+ for note in infraction_history:
+ with self.subTest(active=note.active):
+ note = model_to_dict(note)
+ note['active'] = False
+ self.assertTrue(
+ self.infraction_model.objects.filter(**note).exists()
+ )
+
+ def test_migration_migrates_all_nonactive_types_to_inactive(self):
+ """Do we set the `active` field of all non-active infractions to `False`?"""
+ user_id, infraction_history = self.created_infractions["active note, kick, warning"]
+ self.assertFalse(
+ self.infraction_model.objects.filter(user__id=user_id, active=True).exists()
+ )
+
+ def test_migration_leaves_user_with_one_active_ban_untouched(self):
+ """Do we leave a user with one active and one inactive ban untouched?"""
+ user_id, infraction_history = self.created_infractions["one inactive and one active ban"]
+ for infraction in infraction_history:
+ with self.subTest(active=infraction.active):
+ self.assertTrue(
+ self.infraction_model.objects.filter(**model_to_dict(infraction)).exists()
+ )
+
+ def test_migration_turns_double_active_perm_ban_into_single_active_perm_ban(self):
+ """Does the migration turn two active permanent bans into one active permanent ban?"""
+ user_id, infraction_history = self.created_infractions["two active perm bans"]
+ active_count = self.infraction_model.objects.filter(user__id=user_id, active=True).count()
+ self.assertEqual(active_count, 1)
+
+ def test_migration_leaves_temporary_ban_with_longest_duration_active(self):
+ """Does the migration turn two active permanent bans into one active permanent ban?"""
+ user_id, infraction_history = self.created_infractions["multiple active temp bans"]
+ active_ban = self.infraction_model.objects.get(user__id=user_id, active=True)
+ self.assertEqual(active_ban.expires_at, infraction_history[2].expires_at)
+
+ def test_migration_leaves_permanent_ban_active(self):
+ """Does the migration leave the permanent ban active?"""
+ user_id, infraction_history = self.created_infractions["active perm, two active temp bans"]
+ active_ban = self.infraction_model.objects.get(user__id=user_id, active=True)
+ self.assertIsNone(active_ban.expires_at)
+
+ def test_migration_leaves_longest_temp_ban_active_with_inactive_permanent_ban(self):
+ """Does the longest temp ban stay active, even with an inactive perm ban present?"""
+ user_id, infraction_history = self.created_infractions[
+ "one inactive perm ban, two active temp bans"
+ ]
+ active_ban = self.infraction_model.objects.get(user__id=user_id, active=True)
+ self.assertEqual(active_ban.expires_at, infraction_history[0].expires_at)
+
+ def test_migration_leaves_all_active_types_active_if_one_of_each_exists(self):
+ """Do all active infractions stay active if only one of each is present?"""
+ user_id, infraction_history = self.created_infractions["active ban, mute, and superstar"]
+ active_count = self.infraction_model.objects.filter(user__id=user_id, active=True).count()
+ self.assertEqual(active_count, 4)
+
+ def test_migration_reduces_all_active_types_to_a_single_active_infraction(self):
+ """Do we reduce all of the infraction types to one active infraction?"""
+ user_id, infraction_history = self.created_infractions["multiple active bans, mutes, stars"]
+ active_infractions = self.infraction_model.objects.filter(user__id=user_id, active=True)
+ self.assertEqual(len(active_infractions), 4)
+ types_observed = [infraction.type for infraction in active_infractions]
+
+ for infraction_type in ('ban', 'mute', 'superstar', 'watch'):
+ with self.subTest(type=infraction_type):
+ self.assertIn(infraction_type, types_observed)
diff --git a/pydis_site/apps/api/tests/migrations/test_base.py b/pydis_site/apps/api/tests/migrations/test_base.py
new file mode 100644
index 00000000..f69bc92c
--- /dev/null
+++ b/pydis_site/apps/api/tests/migrations/test_base.py
@@ -0,0 +1,135 @@
+import logging
+from unittest.mock import call, patch
+
+from django.db.migrations.loader import MigrationLoader
+from django.test import TestCase
+
+from .base import MigrationsTestCase, connection
+
+log = logging.getLogger(__name__)
+
+
+class SpanishInquisition(MigrationsTestCase):
+ app = "api"
+ migration_prior = "scragly"
+ migration_target = "kosa"
+
+
+@patch("pydis_site.apps.api.tests.migrations.base.MigrationExecutor")
+class MigrationsTestCaseNoSideEffectsTests(TestCase):
+ """Tests the MigrationTestCase class with actual migration side effects disabled."""
+
+ def setUp(self):
+ """Set up an instance of MigrationsTestCase for use in tests."""
+ self.test_case = SpanishInquisition()
+
+ def test_missing_app_class_raises_value_error(self, _migration_executor):
+ """A MigrationsTestCase subclass should set the class-attribute `app`."""
+ class Spam(MigrationsTestCase):
+ pass
+
+ spam = Spam()
+ with self.assertRaises(ValueError, msg="The `app` attribute was not set."):
+ spam.setUpTestData()
+
+ def test_missing_migration_class_attributes_raise_value_error(self, _migration_executor):
+ """A MigrationsTestCase subclass should set both `migration_prior` and `migration_target`"""
+ class Eggs(MigrationsTestCase):
+ app = "api"
+ migration_target = "lemon"
+
+ class Bacon(MigrationsTestCase):
+ app = "api"
+ migration_prior = "mark"
+
+ instances = (Eggs(), Bacon())
+
+ exception_message = "Both ` migration_prior` and `migration_target` need to be set."
+ for instance in instances:
+ with self.subTest(
+ migration_prior=instance.migration_prior,
+ migration_target=instance.migration_target,
+ ):
+ with self.assertRaises(ValueError, msg=exception_message):
+ instance.setUpTestData()
+
+ @patch(f"{__name__}.SpanishInquisition.setUpMigrationData")
+ @patch(f"{__name__}.SpanishInquisition.setUpPostMigrationData")
+ def test_migration_data_hooks_are_called_once(self, pre_hook, post_hook, _migration_executor):
+ """The `setUpMigrationData` and `setUpPostMigrationData` hooks should be called once."""
+ self.test_case.setUpTestData()
+ for hook in (pre_hook, post_hook):
+ with self.subTest(hook=repr(hook)):
+ hook.assert_called_once()
+
+ def test_migration_executor_is_instantiated_twice(self, migration_executor):
+ """The `MigrationExecutor` should be instantiated with the database connection twice."""
+ self.test_case.setUpTestData()
+
+ expected_args = [call(connection), call(connection)]
+ self.assertEqual(migration_executor.call_args_list, expected_args)
+
+ def test_project_state_is_loaded_for_correct_migration_files_twice(self, migration_executor):
+ """The `project_state` should first be loaded with `migrate_from`, then `migrate_to`."""
+ self.test_case.setUpTestData()
+
+ expected_args = [call(self.test_case.migrate_from), call(self.test_case.migrate_to)]
+ self.assertEqual(migration_executor().loader.project_state.call_args_list, expected_args)
+
+ def test_loader_build_graph_gets_called_once(self, migration_executor):
+ """We should rebuild the migration graph before applying the second set of migrations."""
+ self.test_case.setUpTestData()
+
+ migration_executor().loader.build_graph.assert_called_once()
+
+ def test_migration_executor_migrate_method_is_called_correctly_twice(self, migration_executor):
+ """The migrate method of the executor should be called twice with the correct arguments."""
+ self.test_case.setUpTestData()
+
+ self.assertEqual(migration_executor().migrate.call_count, 2)
+ calls = [call([('api', 'scragly')]), call([('api', 'kosa')])]
+ migration_executor().migrate.assert_has_calls(calls)
+
+
+class LifeOfBrian(MigrationsTestCase):
+ app = "api"
+ migration_prior = "0046_reminder_jump_url"
+ migration_target = "0048_add_infractions_unique_constraints_active"
+
+ @classmethod
+ def log_last_migration(cls):
+ """Parses the applied migrations dictionary to log the last applied migration."""
+ loader = MigrationLoader(connection)
+ api_migrations = [
+ migration for app, migration in loader.applied_migrations if app == cls.app
+ ]
+ last_migration = max(api_migrations, key=lambda name: int(name[:4]))
+ log.info(f"The last applied migration: {last_migration}")
+
+ @classmethod
+ def setUpMigrationData(cls, apps):
+ """Method that logs the last applied migration at this point."""
+ cls.log_last_migration()
+
+ @classmethod
+ def setUpPostMigrationData(cls, apps):
+ """Method that logs the last applied migration at this point."""
+ cls.log_last_migration()
+
+
+class MigrationsTestCaseMigrationTest(TestCase):
+ """Tests if `MigrationsTestCase` travels to the right points in the migration history."""
+
+ def test_migrations_test_case_travels_to_correct_migrations_in_history(self):
+ """The test case should first revert to `migration_prior`, then go to `migration_target`."""
+ brian = LifeOfBrian()
+
+ with self.assertLogs(log, level=logging.INFO) as logs:
+ brian.setUpTestData()
+
+ self.assertEqual(len(logs.records), 2)
+
+ for time_point, record in zip(("migration_prior", "migration_target"), logs.records):
+ with self.subTest(time_point=time_point):
+ message = f"The last applied migration: {getattr(brian, time_point)}"
+ self.assertEqual(record.getMessage(), message)
diff --git a/pydis_site/apps/api/tests/test_infractions.py b/pydis_site/apps/api/tests/test_infractions.py
index c58c32e2..7a54640e 100644
--- a/pydis_site/apps/api/tests/test_infractions.py
+++ b/pydis_site/apps/api/tests/test_infractions.py
@@ -1,6 +1,8 @@
from datetime import datetime as dt, timedelta, timezone
+from unittest.mock import patch
from urllib.parse import quote
+from django.db.utils import IntegrityError
from django_hosts.resolvers import reverse
from .base import APISubdomainTestCase
@@ -167,6 +169,12 @@ class CreationTests(APISubdomainTestCase):
discriminator=1,
avatar_hash=None
)
+ cls.second_user = User.objects.create(
+ id=6,
+ name='carl',
+ discriminator=2,
+ avatar_hash=None
+ )
def test_accepts_valid_data(self):
url = reverse('bot:infraction-list', host='api')
@@ -305,6 +313,187 @@ class CreationTests(APISubdomainTestCase):
'hidden': [f'{data["type"]} infractions must be hidden.']
})
+ def test_returns_400_for_active_infraction_of_type_that_cannot_be_active(self):
+ """Test if the API rejects active infractions for types that cannot be active."""
+ url = reverse('bot:infraction-list', host='api')
+ restricted_types = (
+ ('note', True),
+ ('warning', False),
+ ('kick', False),
+ )
+
+ for infraction_type, hidden in restricted_types:
+ with self.subTest(infraction_type=infraction_type):
+ invalid_infraction = {
+ 'user': self.user.id,
+ 'actor': self.user.id,
+ 'type': infraction_type,
+ 'reason': 'Take me on!',
+ 'hidden': hidden,
+ 'active': True,
+ 'expires_at': None,
+ }
+ response = self.client.post(url, data=invalid_infraction)
+ self.assertEqual(response.status_code, 400)
+ self.assertEqual(
+ response.json(),
+ {'active': [f'{infraction_type} infractions cannot be active.']}
+ )
+
+ def test_returns_400_for_second_active_infraction_of_the_same_type(self):
+ """Test if the API rejects a second active infraction of the same type for a given user."""
+ url = reverse('bot:infraction-list', host='api')
+ active_infraction_types = ('mute', 'ban', 'superstar')
+
+ for infraction_type in active_infraction_types:
+ with self.subTest(infraction_type=infraction_type):
+ first_active_infraction = {
+ 'user': self.user.id,
+ 'actor': self.user.id,
+ 'type': infraction_type,
+ 'reason': 'Take me on!',
+ 'active': True,
+ 'expires_at': '2019-10-04T12:52:00+00:00'
+ }
+
+ # Post the first active infraction of a type and confirm it's accepted.
+ first_response = self.client.post(url, data=first_active_infraction)
+ self.assertEqual(first_response.status_code, 201)
+
+ second_active_infraction = {
+ 'user': self.user.id,
+ 'actor': self.user.id,
+ 'type': infraction_type,
+ 'reason': 'Take on me!',
+ 'active': True,
+ 'expires_at': '2019-10-04T12:52:00+00:00'
+ }
+ second_response = self.client.post(url, data=second_active_infraction)
+ self.assertEqual(second_response.status_code, 400)
+ self.assertEqual(
+ second_response.json(),
+ {
+ 'non_field_errors': [
+ 'This user already has an active infraction of this type.'
+ ]
+ }
+ )
+
+ def test_returns_201_for_second_active_infraction_of_different_type(self):
+ """Test if the API accepts a second active infraction of a different type than the first."""
+ url = reverse('bot:infraction-list', host='api')
+ first_active_infraction = {
+ 'user': self.user.id,
+ 'actor': self.user.id,
+ 'type': 'mute',
+ 'reason': 'Be silent!',
+ 'hidden': True,
+ 'active': True,
+ 'expires_at': '2019-10-04T12:52:00+00:00'
+ }
+ second_active_infraction = {
+ 'user': self.user.id,
+ 'actor': self.user.id,
+ 'type': 'ban',
+ 'reason': 'Be gone!',
+ 'hidden': True,
+ 'active': True,
+ 'expires_at': '2019-10-05T12:52:00+00:00'
+ }
+ # Post the first active infraction of a type and confirm it's accepted.
+ first_response = self.client.post(url, data=first_active_infraction)
+ self.assertEqual(first_response.status_code, 201)
+
+ # Post the first active infraction of a type and confirm it's accepted.
+ second_response = self.client.post(url, data=second_active_infraction)
+ self.assertEqual(second_response.status_code, 201)
+
+ def test_unique_constraint_raises_integrity_error_on_second_active_of_same_type(self):
+ """Do we raise `IntegrityError` for the second active infraction of a type for a user?"""
+ Infraction.objects.create(
+ user=self.user,
+ actor=self.user,
+ type="ban",
+ active=True,
+ reason="The first active ban"
+ )
+ with self.assertRaises(IntegrityError):
+ Infraction.objects.create(
+ user=self.user,
+ actor=self.user,
+ type="ban",
+ active=True,
+ reason="The second active ban"
+ )
+
+ def test_unique_constraint_accepts_active_infraction_after_inactive_infraction(self):
+ """Do we accept an active infraction if the others of the same type are inactive?"""
+ try:
+ Infraction.objects.create(
+ user=self.user,
+ actor=self.user,
+ type="ban",
+ active=False,
+ reason="The first inactive ban"
+ )
+ Infraction.objects.create(
+ user=self.user,
+ actor=self.user,
+ type="ban",
+ active=False,
+ reason="The second inactive ban"
+ )
+ Infraction.objects.create(
+ user=self.user,
+ actor=self.user,
+ type="ban",
+ active=True,
+ reason="The first active ban"
+ )
+ except IntegrityError:
+ self.fail("An unexpected IntegrityError was raised.")
+
+ @patch(f"{__name__}.Infraction")
+ def test_if_accepts_active_infraction_test_catches_integrity_error(self, infraction_patch):
+ """Does the test properly catch the IntegrityError and raise an AssertionError?"""
+ infraction_patch.objects.create.side_effect = IntegrityError
+ with self.assertRaises(AssertionError, msg="An unexpected IntegrityError was raised."):
+ self.test_unique_constraint_accepts_active_infraction_after_inactive_infraction()
+
+ def test_unique_constraint_accepts_second_active_of_different_type(self):
+ """Do we accept a second active infraction of a different type for a given user?"""
+ Infraction.objects.create(
+ user=self.user,
+ actor=self.user,
+ type="ban",
+ active=True,
+ reason="The first active ban"
+ )
+ Infraction.objects.create(
+ user=self.user,
+ actor=self.user,
+ type="mute",
+ active=True,
+ reason="The first active mute"
+ )
+
+ def test_unique_constraint_accepts_active_infractions_for_different_users(self):
+ """Do we accept two active infractions of the same type for two different users?"""
+ Infraction.objects.create(
+ user=self.user,
+ actor=self.user,
+ type="ban",
+ active=True,
+ reason="An active ban for the first user"
+ )
+ Infraction.objects.create(
+ user=self.second_user,
+ actor=self.second_user,
+ type="ban",
+ active=False,
+ reason="An active ban for the second user"
+ )
+
class ExpandedTests(APISubdomainTestCase):
@classmethod
@@ -318,12 +507,14 @@ class ExpandedTests(APISubdomainTestCase):
cls.kick = Infraction.objects.create(
user_id=cls.user.id,
actor_id=cls.user.id,
- type='kick'
+ type='kick',
+ active=False
)
cls.warning = Infraction.objects.create(
user_id=cls.user.id,
actor_id=cls.user.id,
- type='warning'
+ type='warning',
+ active=False,
)
def check_expanded_fields(self, infraction):