diff options
| author | 2019-11-11 19:55:47 +1000 | |
|---|---|---|
| committer | 2019-11-11 19:55:47 +1000 | |
| commit | afc014eb52b1b9d05753c8722965b6646738639a (patch) | |
| tree | 7c23f42a1d6371e4b0069564bf132538909e64e2 /pydis_site/apps/api/tests/migrations | |
| parent | Merge branch 'master' into allauth-user-settings (diff) | |
| parent | Merge pull request #278 from python-discord/active-infractions-validation (diff) | |
Merge branch 'master' into allauth-user-settings
Diffstat (limited to 'pydis_site/apps/api/tests/migrations')
4 files changed, 734 insertions, 0 deletions
| diff --git a/pydis_site/apps/api/tests/migrations/__init__.py b/pydis_site/apps/api/tests/migrations/__init__.py new file mode 100644 index 00000000..38e42ffc --- /dev/null +++ b/pydis_site/apps/api/tests/migrations/__init__.py @@ -0,0 +1 @@ +"""This submodule contains tests for functions used in data migrations.""" diff --git a/pydis_site/apps/api/tests/migrations/base.py b/pydis_site/apps/api/tests/migrations/base.py new file mode 100644 index 00000000..0c0a5bd0 --- /dev/null +++ b/pydis_site/apps/api/tests/migrations/base.py @@ -0,0 +1,102 @@ +"""Includes utilities for testing migrations.""" +from django.db import connection +from django.db.migrations.executor import MigrationExecutor +from django.test import TestCase + + +class MigrationsTestCase(TestCase): +    """ +    A `TestCase` subclass to test migration files. + +    To be able to properly test a migration, we will need to inject data into the test database +    before the migrations we want to test are applied, but after the older migrations have been +    applied. This makes sure that we are testing "as if" we were actually applying this migration +    to a database in the state it was in before introducing the new migration. + +    To set up a MigrationsTestCase, create a subclass of this class and set the following +    class-level attributes: + +    - app: The name of the app that contains the migrations (e.g., `'api'`) +    - migration_prior: The name* of the last migration file before the migrations you want to test +    - migration_target: The name* of the last migration file we want to test + +    *) Specify the file names without a path or the `.py` file extension. + +    Additionally, overwrite the `setUpMigrationData` in the subclass to inject data into the +    database before the migrations we want to test are applied. Please read the docstring of the +    method for more information. An optional hook, `setUpPostMigrationData` is also provided. +    """ + +    # These class-level attributes should be set in classes that inherit from this base class. +    app = None +    migration_prior = None +    migration_target = None + +    @classmethod +    def setUpTestData(cls): +        """ +        Injects data into the test database prior to the migration we're trying to test. + +        This class methods reverts the test database back to the state of the last migration file +        prior to the migrations we want to test. It will then allow the user to inject data into the +        test database by calling the `setUpMigrationData` hook. After the data has been injected, it +        will apply the migrations we want to test and call the `setUpPostMigrationData` hook. The +        user can now test if the migration correctly migrated the injected test data. +        """ +        if not cls.app: +            raise ValueError("The `app` attribute was not set.") + +        if not cls.migration_prior or not cls.migration_target: +            raise ValueError("Both ` migration_prior` and `migration_target` need to be set.") + +        cls.migrate_from = [(cls.app, cls.migration_prior)] +        cls.migrate_to = [(cls.app, cls.migration_target)] + +        # Reverse to database state prior to the migrations we want to test +        executor = MigrationExecutor(connection) +        executor.migrate(cls.migrate_from) + +        # Call the data injection hook with the current state of the project +        old_apps = executor.loader.project_state(cls.migrate_from).apps +        cls.setUpMigrationData(old_apps) + +        # Run the migrations we want to test +        executor = MigrationExecutor(connection) +        executor.loader.build_graph() +        executor.migrate(cls.migrate_to) + +        # Save the project state so we're able to work with the correct model states +        cls.apps = executor.loader.project_state(cls.migrate_to).apps + +        # Call `setUpPostMigrationData` to potentially set up post migration data used in testing +        cls.setUpPostMigrationData(cls.apps) + +    @classmethod +    def setUpMigrationData(cls, apps): +        """ +        Override this method to inject data into the test database before the migration is applied. + +        This method will be called after setting up the database according to the migrations that +        come before the migration(s) we are trying to test, but before the to-be-tested migration(s) +        are applied. This allows us to simulate a database state just prior to the migrations we are +        trying to test. + +        To make sure we're creating objects according to the state the models were in at this point +        in the migration history, use `apps.get_model(app_name: str, model_name: str)` to get the +        appropriate model, e.g.: + +        >>> Infraction = apps.get_model('api', 'Infraction') +        """ +        pass + +    @classmethod +    def setUpPostMigrationData(cls, apps): +        """ +        Set up additional test data after the target migration has been applied. + +        Use `apps.get_model(app_name: str, model_name: str)` to get the correct instances of the +        model classes: + +        >>> Infraction = apps.get_model('api', 'Infraction') +        """ +        pass diff --git a/pydis_site/apps/api/tests/migrations/test_active_infraction_migration.py b/pydis_site/apps/api/tests/migrations/test_active_infraction_migration.py new file mode 100644 index 00000000..8dc29b34 --- /dev/null +++ b/pydis_site/apps/api/tests/migrations/test_active_infraction_migration.py @@ -0,0 +1,496 @@ +"""Tests for the data migration in `filename`.""" +import logging +from collections import ChainMap, namedtuple +from datetime import timedelta +from itertools import count +from typing import Dict, Iterable, Type, Union + +from django.db.models import Q +from django.forms.models import model_to_dict +from django.utils import timezone + +from pydis_site.apps.api.models import Infraction, User +from .base import MigrationsTestCase + +log = logging.getLogger(__name__) +log.setLevel(logging.DEBUG) + + +InfractionHistory = namedtuple('InfractionHistory', ("user_id", "infraction_history")) + + +class InfractionFactory: +    """Factory that creates infractions for a User instance.""" + +    infraction_id = count(1) +    user_id = count(1) +    default_values = { +        'active': True, +        'expires_at': None, +        'hidden': False, +    } + +    @classmethod +    def create( +        cls, +        actor: User, +        infractions: Iterable[Dict[str, Union[str, int, bool]]], +        infraction_model: Type[Infraction] = Infraction, +        user_model: Type[User] = User, +    ) -> InfractionHistory: +        """ +        Creates `infractions` for the `user` with the given `actor`. + +        The `infractions` dictionary can contain the following fields: +         - `type` (required) +         - `active` (default: True) +         - `expires_at` (default: None; i.e, permanent) +         - `hidden` (default: False). + +        The parameters `infraction_model` and `user_model` can be used to pass in an instance of +        both model classes from a different migration/project state. +        """ +        user_id = next(cls.user_id) +        user = user_model.objects.create( +            id=user_id, +            name=f"Infracted user {user_id}", +            discriminator=user_id, +            avatar_hash=None, +        ) +        infraction_history = [] + +        for infraction in infractions: +            infraction = dict(infraction) +            infraction["id"] = next(cls.infraction_id) +            infraction = ChainMap(infraction, cls.default_values) +            new_infraction = infraction_model.objects.create( +                user=user, +                actor=actor, +                type=infraction["type"], +                reason=f"`{infraction['type']}` infraction (ID: {infraction['id']} of {user}", +                active=infraction['active'], +                hidden=infraction['hidden'], +                expires_at=infraction['expires_at'], +            ) +            infraction_history.append(new_infraction) + +        return InfractionHistory(user_id=user_id, infraction_history=infraction_history) + + +class InfractionFactoryTests(MigrationsTestCase): +    """Tests for the InfractionFactory.""" + +    app = "api" +    migration_prior = "0046_reminder_jump_url" +    migration_target = "0046_reminder_jump_url" + +    @classmethod +    def setUpPostMigrationData(cls, apps): +        """Create a default actor for all infractions.""" +        cls.infraction_model = apps.get_model('api', 'Infraction') +        cls.user_model = apps.get_model('api', 'User') + +        cls.actor = cls.user_model.objects.create( +            id=9999, +            name="Unknown Moderator", +            discriminator=1040, +            avatar_hash=None, +        ) + +    def test_infraction_factory_total_count(self): +        """Does the test database hold as many infractions as we tried to create?""" +        InfractionFactory.create( +            actor=self.actor, +            infractions=( +                {'type': 'kick', 'active': False, 'hidden': False}, +                {'type': 'ban', 'active': True, 'hidden': False}, +                {'type': 'note', 'active': False, 'hidden': True}, +            ), +            infraction_model=self.infraction_model, +            user_model=self.user_model, +        ) +        database_count = Infraction.objects.all().count() +        self.assertEqual(3, database_count) + +    def test_infraction_factory_multiple_users(self): +        """Does the test database hold as many infractions as we tried to create?""" +        for _user in range(5): +            InfractionFactory.create( +                actor=self.actor, +                infractions=( +                    {'type': 'kick', 'active': False, 'hidden': True}, +                    {'type': 'ban', 'active': True, 'hidden': False}, +                ), +                infraction_model=self.infraction_model, +                user_model=self.user_model, +            ) + +        # Check if infractions and users are recorded properly in the database +        database_count = Infraction.objects.all().count() +        self.assertEqual(database_count, 10) + +        user_count = User.objects.all().count() +        self.assertEqual(user_count, 5 + 1) + +    def test_infraction_factory_sets_correct_fields(self): +        """Does the InfractionFactory set the correct attributes?""" +        infractions = ( +            { +                'type': 'note', +                'active': False, +                'hidden': True, +                'expires_at': timezone.now() +            }, +            {'type': 'warning', 'active': False, 'hidden': False, 'expires_at': None}, +            {'type': 'watch', 'active': False, 'hidden': True, 'expires_at': None}, +            {'type': 'mute', 'active': True, 'hidden': False, 'expires_at': None}, +            {'type': 'kick', 'active': True, 'hidden': True, 'expires_at': None}, +            {'type': 'ban', 'active': True, 'hidden': False, 'expires_at': None}, +            { +                'type': 'superstar', +                'active': True, +                'hidden': True, +                'expires_at': timezone.now() +            }, +        ) + +        InfractionFactory.create( +            actor=self.actor, +            infractions=infractions, +            infraction_model=self.infraction_model, +            user_model=self.user_model, +        ) + +        for infraction in infractions: +            with self.subTest(**infraction): +                self.assertTrue(Infraction.objects.filter(**infraction).exists()) + + +class ActiveInfractionMigrationTests(MigrationsTestCase): +    """ +    Tests the active infraction data migration. + +    The active infraction data migration should do the following things: + +    1.  migrates all active notes, warnings, and kicks to an inactive status; +    2.  migrates all users with multiple active infractions of a single type to have only one active +        infraction of that type. The infraction with the longest duration stays active. +    """ + +    app = "api" +    migration_prior = "0046_reminder_jump_url" +    migration_target = "0047_active_infractions_migration" + +    @classmethod +    def setUpMigrationData(cls, apps): +        """Sets up an initial database state that contains the relevant test cases.""" +        # Fetch the Infraction and User model in the current migration state +        cls.infraction_model = apps.get_model('api', 'Infraction') +        cls.user_model = apps.get_model('api', 'User') + +        cls.created_infractions = {} + +        # Moderator that serves as actor for all infractions +        cls.user_moderator = cls.user_model.objects.create( +            id=9999, +            name="Olivier de Vienne", +            discriminator=1040, +            avatar_hash=None, +        ) + +        # User #1: clean user with no infractions +        cls.created_infractions["no infractions"] = InfractionFactory.create( +            actor=cls.user_moderator, +            infractions=[], +            infraction_model=cls.infraction_model, +            user_model=cls.user_model, +        ) + +        # User #2: One inactive note infraction +        cls.created_infractions["one inactive note"] = InfractionFactory.create( +            actor=cls.user_moderator, +            infractions=( +                {'type': 'note', 'active': False, 'hidden': True}, +            ), +            infraction_model=cls.infraction_model, +            user_model=cls.user_model, +        ) + +        # User #3: One active note infraction +        cls.created_infractions["one active note"] = InfractionFactory.create( +            actor=cls.user_moderator, +            infractions=( +                {'type': 'note', 'active': True, 'hidden': True}, +            ), +            infraction_model=cls.infraction_model, +            user_model=cls.user_model, +        ) + +        # User #4: One active and one inactive note infraction +        cls.created_infractions["one active and one inactive note"] = InfractionFactory.create( +            actor=cls.user_moderator, +            infractions=( +                {'type': 'note', 'active': False, 'hidden': True}, +                {'type': 'note', 'active': True, 'hidden': True}, +            ), +            infraction_model=cls.infraction_model, +            user_model=cls.user_model, +        ) + +        # User #5: Once active note, one active kick, once active warning +        cls.created_infractions["active note, kick, warning"] = InfractionFactory.create( +            actor=cls.user_moderator, +            infractions=( +                {'type': 'note', 'active': True, 'hidden': True}, +                {'type': 'kick', 'active': True, 'hidden': True}, +                {'type': 'warning', 'active': True, 'hidden': True}, +            ), +            infraction_model=cls.infraction_model, +            user_model=cls.user_model, +        ) + +        # User #6: One inactive ban and one active ban +        cls.created_infractions["one inactive and one active ban"] = InfractionFactory.create( +            actor=cls.user_moderator, +            infractions=( +                {'type': 'ban', 'active': False, 'hidden': True}, +                {'type': 'ban', 'active': True, 'hidden': True}, +            ), +            infraction_model=cls.infraction_model, +            user_model=cls.user_model, +        ) + +        # User #7: Two active permanent bans +        cls.created_infractions["two active perm bans"] = InfractionFactory.create( +            actor=cls.user_moderator, +            infractions=( +                {'type': 'ban', 'active': True, 'hidden': True}, +                {'type': 'ban', 'active': True, 'hidden': True}, +            ), +            infraction_model=cls.infraction_model, +            user_model=cls.user_model, +        ) + +        # User #8: Multiple active temporary bans +        cls.created_infractions["multiple active temp bans"] = InfractionFactory.create( +            actor=cls.user_moderator, +            infractions=( +                { +                    'type': 'ban', +                    'active': True, +                    'hidden': True, +                    'expires_at': timezone.now() + timedelta(days=1) +                }, +                { +                    'type': 'ban', +                    'active': True, +                    'hidden': True, +                    'expires_at': timezone.now() + timedelta(days=10) +                }, +                { +                    'type': 'ban', +                    'active': True, +                    'hidden': True, +                    'expires_at': timezone.now() + timedelta(days=20) +                }, +                { +                    'type': 'ban', +                    'active': True, +                    'hidden': True, +                    'expires_at': timezone.now() + timedelta(days=5) +                }, +            ), +            infraction_model=cls.infraction_model, +            user_model=cls.user_model, +        ) + +        # User #9: One active permanent ban, two active temporary bans +        cls.created_infractions["active perm, two active temp bans"] = InfractionFactory.create( +            actor=cls.user_moderator, +            infractions=( +                { +                    'type': 'ban', +                    'active': True, +                    'hidden': True, +                    'expires_at': timezone.now() + timedelta(days=10) +                }, +                { +                    'type': 'ban', +                    'active': True, +                    'hidden': True, +                    'expires_at': None, +                }, +                { +                    'type': 'ban', +                    'active': True, +                    'hidden': True, +                    'expires_at': timezone.now() + timedelta(days=7) +                }, +            ), +            infraction_model=cls.infraction_model, +            user_model=cls.user_model, +        ) + +        # User #10: One inactive permanent ban, two active temporary bans +        cls.created_infractions["one inactive perm ban, two active temp bans"] = ( +            InfractionFactory.create( +                actor=cls.user_moderator, +                infractions=( +                    { +                        'type': 'ban', +                        'active': True, +                        'hidden': True, +                        'expires_at': timezone.now() + timedelta(days=10) +                    }, +                    { +                        'type': 'ban', +                        'active': False, +                        'hidden': True, +                        'expires_at': None, +                    }, +                    { +                        'type': 'ban', +                        'active': True, +                        'hidden': True, +                        'expires_at': timezone.now() + timedelta(days=7) +                    }, +                ), +                infraction_model=cls.infraction_model, +                user_model=cls.user_model, +            ) +        ) + +        # User #11: Active ban, active mute, active superstar +        cls.created_infractions["active ban, mute, and superstar"] = InfractionFactory.create( +            actor=cls.user_moderator, +            infractions=( +                {'type': 'ban', 'active': True, 'hidden': True}, +                {'type': 'mute', 'active': True, 'hidden': True}, +                {'type': 'superstar', 'active': True, 'hidden': True}, +                {'type': 'watch', 'active': True, 'hidden': True}, +            ), +            infraction_model=cls.infraction_model, +            user_model=cls.user_model, +        ) + +        # User #12: Multiple active bans, active mutes, active superstars +        cls.created_infractions["multiple active bans, mutes, stars"] = InfractionFactory.create( +            actor=cls.user_moderator, +            infractions=( +                {'type': 'ban', 'active': True, 'hidden': True}, +                {'type': 'ban', 'active': True, 'hidden': True}, +                {'type': 'ban', 'active': True, 'hidden': True}, +                {'type': 'mute', 'active': True, 'hidden': True}, +                {'type': 'mute', 'active': True, 'hidden': True}, +                {'type': 'mute', 'active': True, 'hidden': True}, +                {'type': 'superstar', 'active': True, 'hidden': True}, +                {'type': 'superstar', 'active': True, 'hidden': True}, +                {'type': 'superstar', 'active': True, 'hidden': True}, +                {'type': 'watch', 'active': True, 'hidden': True}, +                {'type': 'watch', 'active': True, 'hidden': True}, +                {'type': 'watch', 'active': True, 'hidden': True}, +            ), +            infraction_model=cls.infraction_model, +            user_model=cls.user_model, +        ) + +    def test_all_never_active_types_became_inactive(self): +        """Are all infractions of a non-active type inactive after the migration?""" +        inactive_type_query = Q(type="note") | Q(type="warning") | Q(type="kick") +        self.assertFalse( +            self.infraction_model.objects.filter(inactive_type_query, active=True).exists() +        ) + +    def test_migration_left_clean_user_without_infractions(self): +        """Do users without infractions have no infractions after the migration?""" +        user_id, infraction_history = self.created_infractions["no infractions"] +        self.assertFalse( +            self.infraction_model.objects.filter(user__id=user_id).exists() +        ) + +    def test_migration_left_user_with_inactive_note_untouched(self): +        """Did the migration leave users with only an inactive note untouched?""" +        user_id, infraction_history = self.created_infractions["one inactive note"] +        inactive_note = infraction_history[0] +        self.assertTrue( +            self.infraction_model.objects.filter(**model_to_dict(inactive_note)).exists() +        ) + +    def test_migration_only_touched_active_field_of_active_note(self): +        """Does the migration only change the `active` field?""" +        user_id, infraction_history = self.created_infractions["one active note"] +        note = model_to_dict(infraction_history[0]) +        note['active'] = False +        self.assertTrue( +            self.infraction_model.objects.filter(**note).exists() +        ) + +    def test_migration_only_touched_active_field_of_active_note_left_inactive_untouched(self): +        """Does the migration only change the `active` field of active notes?""" +        user_id, infraction_history = self.created_infractions["one active and one inactive note"] +        for note in infraction_history: +            with self.subTest(active=note.active): +                note = model_to_dict(note) +                note['active'] = False +                self.assertTrue( +                    self.infraction_model.objects.filter(**note).exists() +                ) + +    def test_migration_migrates_all_nonactive_types_to_inactive(self): +        """Do we set the `active` field of all non-active infractions to `False`?""" +        user_id, infraction_history = self.created_infractions["active note, kick, warning"] +        self.assertFalse( +            self.infraction_model.objects.filter(user__id=user_id, active=True).exists() +        ) + +    def test_migration_leaves_user_with_one_active_ban_untouched(self): +        """Do we leave a user with one active and one inactive ban untouched?""" +        user_id, infraction_history = self.created_infractions["one inactive and one active ban"] +        for infraction in infraction_history: +            with self.subTest(active=infraction.active): +                self.assertTrue( +                    self.infraction_model.objects.filter(**model_to_dict(infraction)).exists() +                ) + +    def test_migration_turns_double_active_perm_ban_into_single_active_perm_ban(self): +        """Does the migration turn two active permanent bans into one active permanent ban?""" +        user_id, infraction_history = self.created_infractions["two active perm bans"] +        active_count = self.infraction_model.objects.filter(user__id=user_id, active=True).count() +        self.assertEqual(active_count, 1) + +    def test_migration_leaves_temporary_ban_with_longest_duration_active(self): +        """Does the migration turn two active permanent bans into one active permanent ban?""" +        user_id, infraction_history = self.created_infractions["multiple active temp bans"] +        active_ban = self.infraction_model.objects.get(user__id=user_id, active=True) +        self.assertEqual(active_ban.expires_at, infraction_history[2].expires_at) + +    def test_migration_leaves_permanent_ban_active(self): +        """Does the migration leave the permanent ban active?""" +        user_id, infraction_history = self.created_infractions["active perm, two active temp bans"] +        active_ban = self.infraction_model.objects.get(user__id=user_id, active=True) +        self.assertIsNone(active_ban.expires_at) + +    def test_migration_leaves_longest_temp_ban_active_with_inactive_permanent_ban(self): +        """Does the longest temp ban stay active, even with an inactive perm ban present?""" +        user_id, infraction_history = self.created_infractions[ +            "one inactive perm ban, two active temp bans" +        ] +        active_ban = self.infraction_model.objects.get(user__id=user_id, active=True) +        self.assertEqual(active_ban.expires_at, infraction_history[0].expires_at) + +    def test_migration_leaves_all_active_types_active_if_one_of_each_exists(self): +        """Do all active infractions stay active if only one of each is present?""" +        user_id, infraction_history = self.created_infractions["active ban, mute, and superstar"] +        active_count = self.infraction_model.objects.filter(user__id=user_id, active=True).count() +        self.assertEqual(active_count, 4) + +    def test_migration_reduces_all_active_types_to_a_single_active_infraction(self): +        """Do we reduce all of the infraction types to one active infraction?""" +        user_id, infraction_history = self.created_infractions["multiple active bans, mutes, stars"] +        active_infractions = self.infraction_model.objects.filter(user__id=user_id, active=True) +        self.assertEqual(len(active_infractions), 4) +        types_observed = [infraction.type for infraction in active_infractions] + +        for infraction_type in ('ban', 'mute', 'superstar', 'watch'): +            with self.subTest(type=infraction_type): +                self.assertIn(infraction_type, types_observed) diff --git a/pydis_site/apps/api/tests/migrations/test_base.py b/pydis_site/apps/api/tests/migrations/test_base.py new file mode 100644 index 00000000..f69bc92c --- /dev/null +++ b/pydis_site/apps/api/tests/migrations/test_base.py @@ -0,0 +1,135 @@ +import logging +from unittest.mock import call, patch + +from django.db.migrations.loader import MigrationLoader +from django.test import TestCase + +from .base import MigrationsTestCase, connection + +log = logging.getLogger(__name__) + + +class SpanishInquisition(MigrationsTestCase): +    app = "api" +    migration_prior = "scragly" +    migration_target = "kosa" + + +@patch("pydis_site.apps.api.tests.migrations.base.MigrationExecutor") +class MigrationsTestCaseNoSideEffectsTests(TestCase): +    """Tests the MigrationTestCase class with actual migration side effects disabled.""" + +    def setUp(self): +        """Set up an instance of MigrationsTestCase for use in tests.""" +        self.test_case = SpanishInquisition() + +    def test_missing_app_class_raises_value_error(self, _migration_executor): +        """A MigrationsTestCase subclass should set the class-attribute `app`.""" +        class Spam(MigrationsTestCase): +            pass + +        spam = Spam() +        with self.assertRaises(ValueError, msg="The `app` attribute was not set."): +            spam.setUpTestData() + +    def test_missing_migration_class_attributes_raise_value_error(self, _migration_executor): +        """A MigrationsTestCase subclass should set both `migration_prior` and `migration_target`""" +        class Eggs(MigrationsTestCase): +            app = "api" +            migration_target = "lemon" + +        class Bacon(MigrationsTestCase): +            app = "api" +            migration_prior = "mark" + +        instances = (Eggs(), Bacon()) + +        exception_message = "Both ` migration_prior` and `migration_target` need to be set." +        for instance in instances: +            with self.subTest( +                    migration_prior=instance.migration_prior, +                    migration_target=instance.migration_target, +            ): +                with self.assertRaises(ValueError, msg=exception_message): +                    instance.setUpTestData() + +    @patch(f"{__name__}.SpanishInquisition.setUpMigrationData") +    @patch(f"{__name__}.SpanishInquisition.setUpPostMigrationData") +    def test_migration_data_hooks_are_called_once(self, pre_hook, post_hook, _migration_executor): +        """The `setUpMigrationData` and `setUpPostMigrationData` hooks should be called once.""" +        self.test_case.setUpTestData() +        for hook in (pre_hook, post_hook): +            with self.subTest(hook=repr(hook)): +                hook.assert_called_once() + +    def test_migration_executor_is_instantiated_twice(self, migration_executor): +        """The `MigrationExecutor` should be instantiated with the database connection twice.""" +        self.test_case.setUpTestData() + +        expected_args = [call(connection), call(connection)] +        self.assertEqual(migration_executor.call_args_list, expected_args) + +    def test_project_state_is_loaded_for_correct_migration_files_twice(self, migration_executor): +        """The `project_state` should first be loaded with `migrate_from`, then `migrate_to`.""" +        self.test_case.setUpTestData() + +        expected_args = [call(self.test_case.migrate_from), call(self.test_case.migrate_to)] +        self.assertEqual(migration_executor().loader.project_state.call_args_list, expected_args) + +    def test_loader_build_graph_gets_called_once(self, migration_executor): +        """We should rebuild the migration graph before applying the second set of migrations.""" +        self.test_case.setUpTestData() + +        migration_executor().loader.build_graph.assert_called_once() + +    def test_migration_executor_migrate_method_is_called_correctly_twice(self, migration_executor): +        """The migrate method of the executor should be called twice with the correct arguments.""" +        self.test_case.setUpTestData() + +        self.assertEqual(migration_executor().migrate.call_count, 2) +        calls = [call([('api', 'scragly')]), call([('api', 'kosa')])] +        migration_executor().migrate.assert_has_calls(calls) + + +class LifeOfBrian(MigrationsTestCase): +    app = "api" +    migration_prior = "0046_reminder_jump_url" +    migration_target = "0048_add_infractions_unique_constraints_active" + +    @classmethod +    def log_last_migration(cls): +        """Parses the applied migrations dictionary to log the last applied migration.""" +        loader = MigrationLoader(connection) +        api_migrations = [ +            migration for app, migration in loader.applied_migrations if app == cls.app +        ] +        last_migration = max(api_migrations, key=lambda name: int(name[:4])) +        log.info(f"The last applied migration: {last_migration}") + +    @classmethod +    def setUpMigrationData(cls, apps): +        """Method that logs the last applied migration at this point.""" +        cls.log_last_migration() + +    @classmethod +    def setUpPostMigrationData(cls, apps): +        """Method that logs the last applied migration at this point.""" +        cls.log_last_migration() + + +class MigrationsTestCaseMigrationTest(TestCase): +    """Tests if `MigrationsTestCase` travels to the right points in the migration history.""" + +    def test_migrations_test_case_travels_to_correct_migrations_in_history(self): +        """The test case should first revert to `migration_prior`, then go to `migration_target`.""" +        brian = LifeOfBrian() + +        with self.assertLogs(log, level=logging.INFO) as logs: +            brian.setUpTestData() + +            self.assertEqual(len(logs.records), 2) + +            for time_point, record in zip(("migration_prior", "migration_target"), logs.records): +                with self.subTest(time_point=time_point): +                    message = f"The last applied migration: {getattr(brian, time_point)}" +                    self.assertEqual(record.getMessage(), message) | 
