diff options
| -rw-r--r-- | Pipfile | 6 | ||||
| -rw-r--r-- | Pipfile.lock | 93 | ||||
| -rw-r--r-- | pydis_site/apps/api/migrations/0047_active_infractions_migration.py | 105 | ||||
| -rw-r--r-- | pydis_site/apps/api/migrations/0048_add_infractions_unique_constraints_active.py | 17 | ||||
| -rw-r--r-- | pydis_site/apps/api/models/bot/infraction.py | 7 | ||||
| -rw-r--r-- | pydis_site/apps/api/models/bot/user.py | 2 | ||||
| -rw-r--r-- | pydis_site/apps/api/serializers.py | 13 | ||||
| -rw-r--r-- | pydis_site/apps/api/tests/migrations/__init__.py | 1 | ||||
| -rw-r--r-- | pydis_site/apps/api/tests/migrations/base.py | 102 | ||||
| -rw-r--r-- | pydis_site/apps/api/tests/migrations/test_active_infraction_migration.py | 496 | ||||
| -rw-r--r-- | pydis_site/apps/api/tests/migrations/test_base.py | 135 | ||||
| -rw-r--r-- | pydis_site/apps/api/tests/test_infractions.py | 195 | 
12 files changed, 1123 insertions, 49 deletions
| @@ -4,7 +4,7 @@ url = "https://pypi.org/simple"  verify_ssl = true  [packages] -django = "~=2.2" +django = "~=2.2.0"  django-crispy-forms = "~=1.7.2"  django-environ = "~=0.4.5"  django-filter = "~=2.1.0" @@ -26,8 +26,8 @@ django-allauth = "~=0.40"  [dev-packages]  coverage = "~=4.5.3"  flake8 = "~=3.7" -flake8-annotations = "~=1.0" -flake8-bandit = "==1.0.2" +flake8-annotations = "~=1.1" +flake8-bandit = "~=2.1"  flake8-bugbear = "~=19.8"  flake8-docstrings = "~=1.4"  flake8-import-order = "~=0.18" diff --git a/Pipfile.lock b/Pipfile.lock index 9a36c179..06b49ce7 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@  {      "_meta": {          "hash": { -            "sha256": "d43e1ad137078dd4d79576380de6560c2935a38094b9c3d174fc232f0a50f4d4" +            "sha256": "a4bfc709fcdecf5a3bd28326d51625c83a8a7661367cc69dbda20e3a55f1d9d3"          },          "pipfile-spec": 6,          "requires": { @@ -171,6 +171,7 @@          },          "libsass": {              "hashes": [ +                "sha256:175355d74bd040893d539154016153247ea9775d1655a36441c97a453887a0c0",                  "sha256:3113ef32eaf3662c162c250db6883d7a5f177856bfd8bb632a147cb0a95e4fee",                  "sha256:312d135e6bd1a137927fed781dab497c05930305265e3d3b1da3b3d916cd97a6",                  "sha256:32f8322aad9b6b864b826adb5e193d704d5fb2c816f85a5cc5bf775730e5d024", @@ -179,9 +180,11 @@                  "sha256:607ce32c3b31542e0bf1bc2409627dd7247a3849ba720ec34d23426b96346199",                  "sha256:6124594e72ba216b00131795ad5ea5de1e0cf8784e63a01e0c6a4e4c13fc7914",                  "sha256:6129063002fc8337b734f5963ac3eb01ead51e9c88c6d27e73ddc9236cb15b2e", +                "sha256:6d392ecd6e4de2ccfa3b1953f2da8461a2b7c8c8c17c24e1c335ab3040671c1a",                  "sha256:75b38c236be6ca03e3dd3789f3044180fc0836b7c9e4991fcc52a8570f47dc91",                  "sha256:9c711d4e4d003fec7f98fe87bb1faf7d88e6d648356413d8b8d9d76bd1844089",                  "sha256:b15a0e61bd54764e658bc6931015453fa34d954f87c3b6fd35624e13bcacf69d", +                "sha256:bc0c80a4e233b6b791a7f6f99415ab877e8a4d3a45085b68981c97d74dbfc8bf",                  "sha256:c22cdc37121b730e5fb87bc8d3eee8c4b1fe219a04d198a535fbd22895c99e27",                  "sha256:c5ba74babfb3a6976611312e0026c4668913cdf05e009921e1f54146ccdc02a4"              ], @@ -234,37 +237,39 @@          },          "psycopg2-binary": {              "hashes": [ -                "sha256:080c72714784989474f97be9ab0ddf7b2ad2984527e77f2909fcd04d4df53809", -                "sha256:110457be80b63ff4915febb06faa7be002b93a76e5ba19bf3f27636a2ef58598", -                "sha256:171352a03b22fc099f15103959b52ee77d9a27e028895d7e5fde127aa8e3bac5", -                "sha256:19d013e7b0817087517a4b3cab39c084d78898369e5c46258aab7be4f233d6a1", -                "sha256:249b6b21ae4eb0f7b8423b330aa80fab5f821b9ffc3f7561a5e2fd6bb142cf5d", -                "sha256:2ac0731d2d84b05c7bb39e85b7e123c3a0acd4cda631d8d542802c88deb9e87e", -                "sha256:2b6d561193f0dc3f50acfb22dd52ea8c8dfbc64bcafe3938b5f209cc17cb6f00", -                "sha256:2bd23e242e954214944481124755cbefe7c2cf563b1a54cd8d196d502f2578bf", -                "sha256:3e1239242ca60b3725e65ab2f13765fc199b03af9eaf1b5572f0e97bdcee5b43", -                "sha256:3eb70bb697abbe86b1d2b1316370c02ba320bfd1e9e35cf3b9566a855ea8e4e5", -                "sha256:51a2fc7e94b98bd1bb5d4570936f24fc2b0541b63eccadf8fdea266db8ad2f70", -                "sha256:52f1bdafdc764b7447e393ed39bb263eccb12bfda25a4ac06d82e3a9056251f6", -                "sha256:5b3581319a3951f1e866f4f6c5e42023db0fae0284273b82e97dfd32c51985cd", -                "sha256:63c1b66e3b2a3a336288e4bcec499e0dc310cd1dceaed1c46fa7419764c68877", -                "sha256:8123a99f24ecee469e5c1339427bcdb2a33920a18bb5c0d58b7c13f3b0298ba3", -                "sha256:85e699fcabe7f817c0f0a412d4e7c6627e00c412b418da7666ff353f38e30f67", -                "sha256:8dbff4557bbef963697583366400822387cccf794ccb001f1f2307ed21854c68", -                "sha256:908d21d08d6b81f1b7e056bbf40b2f77f8c499ab29e64ec5113052819ef1c89b", -                "sha256:af39d0237b17d0a5a5f638e9dffb34013ce2b1d41441fd30283e42b22d16858a", -                "sha256:af51bb9f055a3f4af0187149a8f60c9d516cf7d5565b3dac53358796a8fb2a5b", -                "sha256:b2ecac57eb49e461e86c092761e6b8e1fd9654dbaaddf71a076dcc869f7014e2", -                "sha256:cd37cc170678a4609becb26b53a2bc1edea65177be70c48dd7b39a1149cabd6e", -                "sha256:d17e3054b17e1a6cb8c1140f76310f6ede811e75b7a9d461922d2c72973f583e", -                "sha256:d305313c5a9695f40c46294d4315ed3a07c7d2b55e48a9010dad7db7a66c8b7f", -                "sha256:dd0ef0eb1f7dd18a3f4187226e226a7284bda6af5671937a221766e6ef1ee88f", -                "sha256:e1adff53b56db9905db48a972fb89370ad5736e0450b96f91bcf99cadd96cfd7", -                "sha256:f0d43828003c82dbc9269de87aa449e9896077a71954fbbb10a614c017e65737", -                "sha256:f78e8b487de4d92640105c1389e5b90be3496b1d75c90a666edd8737cc2dbab7" +                "sha256:040234f8a4a8dfd692662a8308d78f63f31a97e1c42d2480e5e6810c48966a29", +                "sha256:086f7e89ec85a6704db51f68f0dcae432eff9300809723a6e8782c41c2f48e03", +                "sha256:18ca813fdb17bc1db73fe61b196b05dd1ca2165b884dd5ec5568877cabf9b039", +                "sha256:19dc39616850342a2a6db70559af55b22955f86667b5f652f40c0e99253d9881", +                "sha256:2166e770cb98f02ed5ee2b0b569d40db26788e0bf2ec3ae1a0d864ea6f1d8309", +                "sha256:3a2522b1d9178575acee4adf8fd9f979f9c0449b00b4164bb63c3475ea6528ed", +                "sha256:3aa773580f85a28ffdf6f862e59cb5a3cc7ef6885121f2de3fca8d6ada4dbf3b", +                "sha256:3b5deaa3ee7180585a296af33e14c9b18c218d148e735c7accf78130765a47e3", +                "sha256:407af6d7e46593415f216c7f56ba087a9a42bd6dc2ecb86028760aa45b802bd7", +                "sha256:4c3c09fb674401f630626310bcaf6cd6285daf0d5e4c26d6e55ca26a2734e39b", +                "sha256:4c6717962247445b4f9e21c962ea61d2e884fc17df5ddf5e35863b016f8a1f03", +                "sha256:50446fae5681fc99f87e505d4e77c9407e683ab60c555ec302f9ac9bffa61103", +                "sha256:5057669b6a66aa9ca118a2a860159f0ee3acf837eda937bdd2a64f3431361a2d", +                "sha256:5dd90c5438b4f935c9d01fcbad3620253da89d19c1f5fca9158646407ed7df35", +                "sha256:659c815b5b8e2a55193ede2795c1e2349b8011497310bb936da7d4745652823b", +                "sha256:69b13fdf12878b10dc6003acc8d0abf3ad93e79813fd5f3812497c1c9fb9be49", +                "sha256:7a1cb80e35e1ccea3e11a48afe65d38744a0e0bde88795cc56a4d05b6e4f9d70", +                "sha256:7e6e3c52e6732c219c07bd97fff6c088f8df4dae3b79752ee3a817e6f32e177e", +                "sha256:7f42a8490c4fe854325504ce7a6e4796b207960dabb2cbafe3c3959cb00d1d7e", +                "sha256:84156313f258eafff716b2961644a4483a9be44a5d43551d554844d15d4d224e", +                "sha256:8578d6b8192e4c805e85f187bc530d0f52ba86c39172e61cd51f68fddd648103", +                "sha256:890167d5091279a27e2505ff0e1fb273f8c48c41d35c5b92adbf4af80e6b2ed6", +                "sha256:9aadff9032e967865f9778485571e93908d27dab21d0fdfdec0ca779bb6f8ad9", +                "sha256:9f24f383a298a0c0f9b3113b982e21751a8ecde6615494a3f1470eb4a9d70e9e", +                "sha256:a73021b44813b5c84eda4a3af5826dd72356a900bac9bd9dd1f0f81ee1c22c2f", +                "sha256:afd96845e12638d2c44d213d4810a08f4dc4a563f9a98204b7428e567014b1cd", +                "sha256:b73ddf033d8cd4cc9dfed6324b1ad2a89ba52c410ef6877998422fcb9c23e3a8", +                "sha256:dbc5cd56fff1a6152ca59445178652756f4e509f672e49ccdf3d79c1043113a4", +                "sha256:eac8a3499754790187bb00574ab980df13e754777d346f85e0ff6df929bcd964", +                "sha256:eaed1c65f461a959284649e37b5051224f4db6ebdc84e40b5e65f2986f101a08"              ],              "index": "pypi", -            "version": "==2.8.3" +            "version": "==2.8.4"          },          "pygments": {              "hashes": [ @@ -283,10 +288,10 @@          },          "pytz": {              "hashes": [ -                "sha256:26c0b32e437e54a18161324a2fca3c4b9846b74a8dccddd843113109e1116b32", -                "sha256:c894d57500a4cd2d5c71114aaab77dbab5eabd9022308ce5ac9bb93a60a6f0c7" +                "sha256:1c557d7d0e871de1f5ccd5833f60fb2550652da6be2693c1e02300743d21500d", +                "sha256:b02c06db6cf09c12dd25137e563b31700d3b80fcc4ad23abb7a315f2789819be"              ], -            "version": "==2019.2" +            "version": "==2019.3"          },          "pyuwsgi": {              "hashes": [ @@ -414,10 +419,10 @@          },          "attrs": {              "hashes": [ -                "sha256:ec20e7a4825331c1b5ebf261d111e16fa9612c1f7a5e1f884f12bd53a664dfd2", -                "sha256:f913492e1663d3c36f502e5e9ba6cd13cf19d7fab50aa13239e420fef95e1396" +                "sha256:08a96c641c3a74e44eb59afb61a24f2cb9f4d7188748e76ba4bb5edfa3cb7d1c", +                "sha256:f7b7ce16570fe9965acd6d30101a28f62fb4a7f9e926b3bbc9b61f8b04247e72"              ], -            "version": "==19.2.0" +            "version": "==19.3.0"          },          "bandit": {              "hashes": [ @@ -496,11 +501,10 @@          },          "flake8-bandit": {              "hashes": [ -                "sha256:a66c7b42af9530d5e988851ccee02958a51a85d46f1f4609ecc3546948f809b8", -                "sha256:f7c3421fd9aebc63689c0693511e16dcad678fd4a0ce624b78ca91ae713eacdc" +                "sha256:687fc8da2e4a239b206af2e54a90093572a60d0954f3054e23690739b0b0de3b"              ],              "index": "pypi", -            "version": "==1.0.2" +            "version": "==2.1.2"          },          "flake8-bugbear": {              "hashes": [ @@ -705,20 +709,25 @@          },          "typed-ast": {              "hashes": [ +                "sha256:1170afa46a3799e18b4c977777ce137bb53c7485379d9706af8a59f2ea1aa161",                  "sha256:18511a0b3e7922276346bcb47e2ef9f38fb90fd31cb9223eed42c85d1312344e",                  "sha256:262c247a82d005e43b5b7f69aff746370538e176131c32dda9cb0f324d27141e",                  "sha256:2b907eb046d049bcd9892e3076c7a6456c93a25bebfe554e931620c90e6a25b0",                  "sha256:354c16e5babd09f5cb0ee000d54cfa38401d8b8891eefa878ac772f827181a3c", +                "sha256:48e5b1e71f25cfdef98b013263a88d7145879fbb2d5185f2a0c79fa7ebbeae47",                  "sha256:4e0b70c6fc4d010f8107726af5fd37921b666f5b31d9331f0bd24ad9a088e631",                  "sha256:630968c5cdee51a11c05a30453f8cd65e0cc1d2ad0d9192819df9978984529f4",                  "sha256:66480f95b8167c9c5c5c87f32cf437d585937970f3fc24386f313a4c97b44e34",                  "sha256:71211d26ffd12d63a83e079ff258ac9d56a1376a25bc80b1cdcdf601b855b90b", +                "sha256:7954560051331d003b4e2b3eb822d9dd2e376fa4f6d98fee32f452f52dd6ebb2", +                "sha256:838997f4310012cf2e1ad3803bce2f3402e9ffb71ded61b5ee22617b3a7f6b6e",                  "sha256:95bd11af7eafc16e829af2d3df510cecfd4387f6453355188342c3e79a2ec87a",                  "sha256:bc6c7d3fa1325a0c6613512a093bc2a2a15aeec350451cbdf9e1d4bffe3e3233",                  "sha256:cc34a6f5b426748a507dd5d1de4c1978f2eb5626d51326e43280941206c209e1",                  "sha256:d755f03c1e4a51e9b24d899561fec4ccaf51f210d52abdf8c07ee2849b212a36",                  "sha256:d7c45933b1bdfaf9f36c579671fec15d25b06c8398f113dab64c18ed1adda01d",                  "sha256:d896919306dd0aa22d0132f62a1b78d11aaf4c9fc5b3410d3c666b818191630a", +                "sha256:fdc1c9bbf79510b76408840e009ed65958feba92a88833cdceecff93ae8fff66",                  "sha256:ffde2fbfad571af120fcbfbbc61c72469e72f550d676c3342492a9dfdefb8f12"              ],              "version": "==1.4.0" @@ -733,10 +742,10 @@          },          "virtualenv": {              "hashes": [ -                "sha256:680af46846662bb38c5504b78bad9ed9e4f3ba2d54f54ba42494fdf94337fe30", -                "sha256:f78d81b62d3147396ac33fc9d77579ddc42cc2a98dd9ea38886f616b33bc7fb2" +                "sha256:3e3597e89c73df9313f5566e8fc582bd7037938d15b05329c232ec57a11a7ad5", +                "sha256:5d370508bf32e522d79096e8cbea3499d47e624ac7e11e9089f9397a0b3318df"              ], -            "version": "==16.7.5" +            "version": "==16.7.6"          },          "zipp": {              "hashes": [ diff --git a/pydis_site/apps/api/migrations/0047_active_infractions_migration.py b/pydis_site/apps/api/migrations/0047_active_infractions_migration.py new file mode 100644 index 00000000..9ac791dc --- /dev/null +++ b/pydis_site/apps/api/migrations/0047_active_infractions_migration.py @@ -0,0 +1,105 @@ +# Generated by Django 2.2.6 on 2019-10-07 15:59 + +from django.db import migrations +from django.db.models import Count, Prefetch, QuerySet + + +class ExpirationWrapper: +    """Wraps an expiration date to properly compare permanent and temporary infractions.""" + +    def __init__(self, infraction): +        self.expiration_date = infraction.expires_at + +    def __lt__(self, other): +        """An `expiration_date` is considered smaller when it comes earlier than the `other`.""" +        if self.expiration_date is None: +            # A permanent infraction can never end sooner than another infraction +            return False +        elif other.expiration_date is None: +            # If `self` is temporary, but `other` is permanent, `self` is smaller +            return True +        else: +            return self.expiration_date < other.expiration_date + +    def __eq__(self, other): +        """If both expiration dates are permanent they're equal, otherwise compare dates.""" +        if self.expiration_date is None and other.expiration_date is None: +            return True +        elif self.expiration_date is None or other.expiration_date is None: +            return False +        else: +            return self.expiration_date == other.expiration_date + + +def migrate_inactive_types_to_inactive(apps, schema_editor): +    """Migrates infractions of non-active types to inactive.""" +    infraction_model = apps.get_model('api', 'Infraction') +    infraction_model.objects.filter(type__in=('note', 'warning', 'kick')).update(active=False) + + +def get_query(user_model, infraction_model, infr_type: str) -> QuerySet: +    """ +    Creates QuerySet to fetch users with multiple active infractions of the given `type`. + +    The QuerySet will prefetch the infractions and attach them as an `.infractions` attribute to the +    `User` instances. +    """ +    active_infractions = infraction_model.objects.filter(type=infr_type, active=True) + +    # Build an SQL query by chaining methods together + +    # Get users with active infraction(s) of the provided `infr_type` +    query = user_model.objects.filter( +        infractions_received__type=infr_type, infractions_received__active=True +    ) + +    # Prefetch their active received infractions of `infr_type` and attach `.infractions` attribute +    query = query.prefetch_related( +        Prefetch('infractions_received', queryset=active_infractions, to_attr='infractions') +    ) + +    # Count and only include them if they have at least 2 active infractions of the `type` +    query = query.annotate(num_infractions=Count('infractions_received')) +    query = query.filter(num_infractions__gte=2) + +    # Make sure we return each individual only once +    query = query.distinct() + +    return query + + +def migrate_multiple_active_infractions_per_user_to_one(apps, schema_editor): +    """ +    Make sure a user only has one active infraction of a given "active" infraction type. + +    If a user has multiple active infraction, we keep the one with longest expiration date active +    and migrate the others to inactive. +    """ +    infraction_model = apps.get_model('api', 'Infraction') +    user_model = apps.get_model('api', 'User') + +    for infraction_type in ('ban', 'mute', 'superstar', 'watch'): +        query = get_query(user_model, infraction_model, infraction_type) +        for user in query: +            infractions = sorted(user.infractions, key=ExpirationWrapper, reverse=True) +            for infraction in infractions[1:]: +                infraction.active = False +                infraction.save() + + +def reverse_migration(apps, schema_editor): +    """There's no need to do anything special to reverse these migrations.""" +    return + + +class Migration(migrations.Migration): +    """Data migration to get the database consistent with the new infraction validation rules.""" + +    dependencies = [ +        ('api', '0046_reminder_jump_url'), +    ] + +    operations = [ +        migrations.RunPython(migrate_inactive_types_to_inactive, reverse_migration), +        migrations.RunPython(migrate_multiple_active_infractions_per_user_to_one, reverse_migration) +    ] diff --git a/pydis_site/apps/api/migrations/0048_add_infractions_unique_constraints_active.py b/pydis_site/apps/api/migrations/0048_add_infractions_unique_constraints_active.py new file mode 100644 index 00000000..4ea1fb90 --- /dev/null +++ b/pydis_site/apps/api/migrations/0048_add_infractions_unique_constraints_active.py @@ -0,0 +1,17 @@ +# Generated by Django 2.2.6 on 2019-10-07 18:27 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + +    dependencies = [ +        ('api', '0047_active_infractions_migration'), +    ] + +    operations = [ +        migrations.AddConstraint( +            model_name='infraction', +            constraint=models.UniqueConstraint(condition=models.Q(active=True), fields=('user', 'type'), name='unique_active_infraction_per_type_per_user'), +        ), +    ] diff --git a/pydis_site/apps/api/models/bot/infraction.py b/pydis_site/apps/api/models/bot/infraction.py index dfb32a97..108fd3a2 100644 --- a/pydis_site/apps/api/models/bot/infraction.py +++ b/pydis_site/apps/api/models/bot/infraction.py @@ -71,3 +71,10 @@ class Infraction(ModelReprMixin, models.Model):          """Defines the meta options for the infraction model."""          ordering = ['-inserted_at'] +        constraints = ( +            models.UniqueConstraint( +                fields=["user", "type"], +                condition=models.Q(active=True), +                name="unique_active_infraction_per_type_per_user" +            ), +        ) diff --git a/pydis_site/apps/api/models/bot/user.py b/pydis_site/apps/api/models/bot/user.py index 21617dc4..5140d2bf 100644 --- a/pydis_site/apps/api/models/bot/user.py +++ b/pydis_site/apps/api/models/bot/user.py @@ -50,7 +50,7 @@ class User(ModelReprMixin, models.Model):      def __str__(self):          """Returns the name and discriminator for the current user, for display purposes.""" -        return f"{self.name}#{self.discriminator}" +        return f"{self.name}#{self.discriminator:0>4}"      @property      def top_role(self) -> Role: diff --git a/pydis_site/apps/api/serializers.py b/pydis_site/apps/api/serializers.py index 8a605612..4e7cd863 100644 --- a/pydis_site/apps/api/serializers.py +++ b/pydis_site/apps/api/serializers.py @@ -1,6 +1,6 @@  """Converters from Django models to data interchange formats and back.""" -  from rest_framework.serializers import ModelSerializer, PrimaryKeyRelatedField, ValidationError +from rest_framework.validators import UniqueTogetherValidator  from rest_framework_bulk import BulkSerializerMixin  from .models import ( @@ -105,11 +105,22 @@ class InfractionSerializer(ModelSerializer):          fields = (              'id', 'inserted_at', 'expires_at', 'active', 'user', 'actor', 'type', 'reason', 'hidden'          ) +        validators = [ +            UniqueTogetherValidator( +                queryset=Infraction.objects.filter(active=True), +                fields=['user', 'type'], +                message='This user already has an active infraction of this type.', +            ) +        ]      def validate(self, attrs: dict) -> dict:          """Validate data constraints for the given data and abort if it is invalid."""          infr_type = attrs.get('type') +        active = attrs.get('active') +        if active and infr_type in ('note', 'warning', 'kick'): +            raise ValidationError({'active': [f'{infr_type} infractions cannot be active.']}) +          expires_at = attrs.get('expires_at')          if expires_at and infr_type in ('kick', 'warning'):              raise ValidationError({'expires_at': [f'{infr_type} infractions cannot expire.']}) diff --git a/pydis_site/apps/api/tests/migrations/__init__.py b/pydis_site/apps/api/tests/migrations/__init__.py new file mode 100644 index 00000000..38e42ffc --- /dev/null +++ b/pydis_site/apps/api/tests/migrations/__init__.py @@ -0,0 +1 @@ +"""This submodule contains tests for functions used in data migrations.""" diff --git a/pydis_site/apps/api/tests/migrations/base.py b/pydis_site/apps/api/tests/migrations/base.py new file mode 100644 index 00000000..0c0a5bd0 --- /dev/null +++ b/pydis_site/apps/api/tests/migrations/base.py @@ -0,0 +1,102 @@ +"""Includes utilities for testing migrations.""" +from django.db import connection +from django.db.migrations.executor import MigrationExecutor +from django.test import TestCase + + +class MigrationsTestCase(TestCase): +    """ +    A `TestCase` subclass to test migration files. + +    To be able to properly test a migration, we will need to inject data into the test database +    before the migrations we want to test are applied, but after the older migrations have been +    applied. This makes sure that we are testing "as if" we were actually applying this migration +    to a database in the state it was in before introducing the new migration. + +    To set up a MigrationsTestCase, create a subclass of this class and set the following +    class-level attributes: + +    - app: The name of the app that contains the migrations (e.g., `'api'`) +    - migration_prior: The name* of the last migration file before the migrations you want to test +    - migration_target: The name* of the last migration file we want to test + +    *) Specify the file names without a path or the `.py` file extension. + +    Additionally, overwrite the `setUpMigrationData` in the subclass to inject data into the +    database before the migrations we want to test are applied. Please read the docstring of the +    method for more information. An optional hook, `setUpPostMigrationData` is also provided. +    """ + +    # These class-level attributes should be set in classes that inherit from this base class. +    app = None +    migration_prior = None +    migration_target = None + +    @classmethod +    def setUpTestData(cls): +        """ +        Injects data into the test database prior to the migration we're trying to test. + +        This class methods reverts the test database back to the state of the last migration file +        prior to the migrations we want to test. It will then allow the user to inject data into the +        test database by calling the `setUpMigrationData` hook. After the data has been injected, it +        will apply the migrations we want to test and call the `setUpPostMigrationData` hook. The +        user can now test if the migration correctly migrated the injected test data. +        """ +        if not cls.app: +            raise ValueError("The `app` attribute was not set.") + +        if not cls.migration_prior or not cls.migration_target: +            raise ValueError("Both ` migration_prior` and `migration_target` need to be set.") + +        cls.migrate_from = [(cls.app, cls.migration_prior)] +        cls.migrate_to = [(cls.app, cls.migration_target)] + +        # Reverse to database state prior to the migrations we want to test +        executor = MigrationExecutor(connection) +        executor.migrate(cls.migrate_from) + +        # Call the data injection hook with the current state of the project +        old_apps = executor.loader.project_state(cls.migrate_from).apps +        cls.setUpMigrationData(old_apps) + +        # Run the migrations we want to test +        executor = MigrationExecutor(connection) +        executor.loader.build_graph() +        executor.migrate(cls.migrate_to) + +        # Save the project state so we're able to work with the correct model states +        cls.apps = executor.loader.project_state(cls.migrate_to).apps + +        # Call `setUpPostMigrationData` to potentially set up post migration data used in testing +        cls.setUpPostMigrationData(cls.apps) + +    @classmethod +    def setUpMigrationData(cls, apps): +        """ +        Override this method to inject data into the test database before the migration is applied. + +        This method will be called after setting up the database according to the migrations that +        come before the migration(s) we are trying to test, but before the to-be-tested migration(s) +        are applied. This allows us to simulate a database state just prior to the migrations we are +        trying to test. + +        To make sure we're creating objects according to the state the models were in at this point +        in the migration history, use `apps.get_model(app_name: str, model_name: str)` to get the +        appropriate model, e.g.: + +        >>> Infraction = apps.get_model('api', 'Infraction') +        """ +        pass + +    @classmethod +    def setUpPostMigrationData(cls, apps): +        """ +        Set up additional test data after the target migration has been applied. + +        Use `apps.get_model(app_name: str, model_name: str)` to get the correct instances of the +        model classes: + +        >>> Infraction = apps.get_model('api', 'Infraction') +        """ +        pass diff --git a/pydis_site/apps/api/tests/migrations/test_active_infraction_migration.py b/pydis_site/apps/api/tests/migrations/test_active_infraction_migration.py new file mode 100644 index 00000000..8dc29b34 --- /dev/null +++ b/pydis_site/apps/api/tests/migrations/test_active_infraction_migration.py @@ -0,0 +1,496 @@ +"""Tests for the data migration in `filename`.""" +import logging +from collections import ChainMap, namedtuple +from datetime import timedelta +from itertools import count +from typing import Dict, Iterable, Type, Union + +from django.db.models import Q +from django.forms.models import model_to_dict +from django.utils import timezone + +from pydis_site.apps.api.models import Infraction, User +from .base import MigrationsTestCase + +log = logging.getLogger(__name__) +log.setLevel(logging.DEBUG) + + +InfractionHistory = namedtuple('InfractionHistory', ("user_id", "infraction_history")) + + +class InfractionFactory: +    """Factory that creates infractions for a User instance.""" + +    infraction_id = count(1) +    user_id = count(1) +    default_values = { +        'active': True, +        'expires_at': None, +        'hidden': False, +    } + +    @classmethod +    def create( +        cls, +        actor: User, +        infractions: Iterable[Dict[str, Union[str, int, bool]]], +        infraction_model: Type[Infraction] = Infraction, +        user_model: Type[User] = User, +    ) -> InfractionHistory: +        """ +        Creates `infractions` for the `user` with the given `actor`. + +        The `infractions` dictionary can contain the following fields: +         - `type` (required) +         - `active` (default: True) +         - `expires_at` (default: None; i.e, permanent) +         - `hidden` (default: False). + +        The parameters `infraction_model` and `user_model` can be used to pass in an instance of +        both model classes from a different migration/project state. +        """ +        user_id = next(cls.user_id) +        user = user_model.objects.create( +            id=user_id, +            name=f"Infracted user {user_id}", +            discriminator=user_id, +            avatar_hash=None, +        ) +        infraction_history = [] + +        for infraction in infractions: +            infraction = dict(infraction) +            infraction["id"] = next(cls.infraction_id) +            infraction = ChainMap(infraction, cls.default_values) +            new_infraction = infraction_model.objects.create( +                user=user, +                actor=actor, +                type=infraction["type"], +                reason=f"`{infraction['type']}` infraction (ID: {infraction['id']} of {user}", +                active=infraction['active'], +                hidden=infraction['hidden'], +                expires_at=infraction['expires_at'], +            ) +            infraction_history.append(new_infraction) + +        return InfractionHistory(user_id=user_id, infraction_history=infraction_history) + + +class InfractionFactoryTests(MigrationsTestCase): +    """Tests for the InfractionFactory.""" + +    app = "api" +    migration_prior = "0046_reminder_jump_url" +    migration_target = "0046_reminder_jump_url" + +    @classmethod +    def setUpPostMigrationData(cls, apps): +        """Create a default actor for all infractions.""" +        cls.infraction_model = apps.get_model('api', 'Infraction') +        cls.user_model = apps.get_model('api', 'User') + +        cls.actor = cls.user_model.objects.create( +            id=9999, +            name="Unknown Moderator", +            discriminator=1040, +            avatar_hash=None, +        ) + +    def test_infraction_factory_total_count(self): +        """Does the test database hold as many infractions as we tried to create?""" +        InfractionFactory.create( +            actor=self.actor, +            infractions=( +                {'type': 'kick', 'active': False, 'hidden': False}, +                {'type': 'ban', 'active': True, 'hidden': False}, +                {'type': 'note', 'active': False, 'hidden': True}, +            ), +            infraction_model=self.infraction_model, +            user_model=self.user_model, +        ) +        database_count = Infraction.objects.all().count() +        self.assertEqual(3, database_count) + +    def test_infraction_factory_multiple_users(self): +        """Does the test database hold as many infractions as we tried to create?""" +        for _user in range(5): +            InfractionFactory.create( +                actor=self.actor, +                infractions=( +                    {'type': 'kick', 'active': False, 'hidden': True}, +                    {'type': 'ban', 'active': True, 'hidden': False}, +                ), +                infraction_model=self.infraction_model, +                user_model=self.user_model, +            ) + +        # Check if infractions and users are recorded properly in the database +        database_count = Infraction.objects.all().count() +        self.assertEqual(database_count, 10) + +        user_count = User.objects.all().count() +        self.assertEqual(user_count, 5 + 1) + +    def test_infraction_factory_sets_correct_fields(self): +        """Does the InfractionFactory set the correct attributes?""" +        infractions = ( +            { +                'type': 'note', +                'active': False, +                'hidden': True, +                'expires_at': timezone.now() +            }, +            {'type': 'warning', 'active': False, 'hidden': False, 'expires_at': None}, +            {'type': 'watch', 'active': False, 'hidden': True, 'expires_at': None}, +            {'type': 'mute', 'active': True, 'hidden': False, 'expires_at': None}, +            {'type': 'kick', 'active': True, 'hidden': True, 'expires_at': None}, +            {'type': 'ban', 'active': True, 'hidden': False, 'expires_at': None}, +            { +                'type': 'superstar', +                'active': True, +                'hidden': True, +                'expires_at': timezone.now() +            }, +        ) + +        InfractionFactory.create( +            actor=self.actor, +            infractions=infractions, +            infraction_model=self.infraction_model, +            user_model=self.user_model, +        ) + +        for infraction in infractions: +            with self.subTest(**infraction): +                self.assertTrue(Infraction.objects.filter(**infraction).exists()) + + +class ActiveInfractionMigrationTests(MigrationsTestCase): +    """ +    Tests the active infraction data migration. + +    The active infraction data migration should do the following things: + +    1.  migrates all active notes, warnings, and kicks to an inactive status; +    2.  migrates all users with multiple active infractions of a single type to have only one active +        infraction of that type. The infraction with the longest duration stays active. +    """ + +    app = "api" +    migration_prior = "0046_reminder_jump_url" +    migration_target = "0047_active_infractions_migration" + +    @classmethod +    def setUpMigrationData(cls, apps): +        """Sets up an initial database state that contains the relevant test cases.""" +        # Fetch the Infraction and User model in the current migration state +        cls.infraction_model = apps.get_model('api', 'Infraction') +        cls.user_model = apps.get_model('api', 'User') + +        cls.created_infractions = {} + +        # Moderator that serves as actor for all infractions +        cls.user_moderator = cls.user_model.objects.create( +            id=9999, +            name="Olivier de Vienne", +            discriminator=1040, +            avatar_hash=None, +        ) + +        # User #1: clean user with no infractions +        cls.created_infractions["no infractions"] = InfractionFactory.create( +            actor=cls.user_moderator, +            infractions=[], +            infraction_model=cls.infraction_model, +            user_model=cls.user_model, +        ) + +        # User #2: One inactive note infraction +        cls.created_infractions["one inactive note"] = InfractionFactory.create( +            actor=cls.user_moderator, +            infractions=( +                {'type': 'note', 'active': False, 'hidden': True}, +            ), +            infraction_model=cls.infraction_model, +            user_model=cls.user_model, +        ) + +        # User #3: One active note infraction +        cls.created_infractions["one active note"] = InfractionFactory.create( +            actor=cls.user_moderator, +            infractions=( +                {'type': 'note', 'active': True, 'hidden': True}, +            ), +            infraction_model=cls.infraction_model, +            user_model=cls.user_model, +        ) + +        # User #4: One active and one inactive note infraction +        cls.created_infractions["one active and one inactive note"] = InfractionFactory.create( +            actor=cls.user_moderator, +            infractions=( +                {'type': 'note', 'active': False, 'hidden': True}, +                {'type': 'note', 'active': True, 'hidden': True}, +            ), +            infraction_model=cls.infraction_model, +            user_model=cls.user_model, +        ) + +        # User #5: Once active note, one active kick, once active warning +        cls.created_infractions["active note, kick, warning"] = InfractionFactory.create( +            actor=cls.user_moderator, +            infractions=( +                {'type': 'note', 'active': True, 'hidden': True}, +                {'type': 'kick', 'active': True, 'hidden': True}, +                {'type': 'warning', 'active': True, 'hidden': True}, +            ), +            infraction_model=cls.infraction_model, +            user_model=cls.user_model, +        ) + +        # User #6: One inactive ban and one active ban +        cls.created_infractions["one inactive and one active ban"] = InfractionFactory.create( +            actor=cls.user_moderator, +            infractions=( +                {'type': 'ban', 'active': False, 'hidden': True}, +                {'type': 'ban', 'active': True, 'hidden': True}, +            ), +            infraction_model=cls.infraction_model, +            user_model=cls.user_model, +        ) + +        # User #7: Two active permanent bans +        cls.created_infractions["two active perm bans"] = InfractionFactory.create( +            actor=cls.user_moderator, +            infractions=( +                {'type': 'ban', 'active': True, 'hidden': True}, +                {'type': 'ban', 'active': True, 'hidden': True}, +            ), +            infraction_model=cls.infraction_model, +            user_model=cls.user_model, +        ) + +        # User #8: Multiple active temporary bans +        cls.created_infractions["multiple active temp bans"] = InfractionFactory.create( +            actor=cls.user_moderator, +            infractions=( +                { +                    'type': 'ban', +                    'active': True, +                    'hidden': True, +                    'expires_at': timezone.now() + timedelta(days=1) +                }, +                { +                    'type': 'ban', +                    'active': True, +                    'hidden': True, +                    'expires_at': timezone.now() + timedelta(days=10) +                }, +                { +                    'type': 'ban', +                    'active': True, +                    'hidden': True, +                    'expires_at': timezone.now() + timedelta(days=20) +                }, +                { +                    'type': 'ban', +                    'active': True, +                    'hidden': True, +                    'expires_at': timezone.now() + timedelta(days=5) +                }, +            ), +            infraction_model=cls.infraction_model, +            user_model=cls.user_model, +        ) + +        # User #9: One active permanent ban, two active temporary bans +        cls.created_infractions["active perm, two active temp bans"] = InfractionFactory.create( +            actor=cls.user_moderator, +            infractions=( +                { +                    'type': 'ban', +                    'active': True, +                    'hidden': True, +                    'expires_at': timezone.now() + timedelta(days=10) +                }, +                { +                    'type': 'ban', +                    'active': True, +                    'hidden': True, +                    'expires_at': None, +                }, +                { +                    'type': 'ban', +                    'active': True, +                    'hidden': True, +                    'expires_at': timezone.now() + timedelta(days=7) +                }, +            ), +            infraction_model=cls.infraction_model, +            user_model=cls.user_model, +        ) + +        # User #10: One inactive permanent ban, two active temporary bans +        cls.created_infractions["one inactive perm ban, two active temp bans"] = ( +            InfractionFactory.create( +                actor=cls.user_moderator, +                infractions=( +                    { +                        'type': 'ban', +                        'active': True, +                        'hidden': True, +                        'expires_at': timezone.now() + timedelta(days=10) +                    }, +                    { +                        'type': 'ban', +                        'active': False, +                        'hidden': True, +                        'expires_at': None, +                    }, +                    { +                        'type': 'ban', +                        'active': True, +                        'hidden': True, +                        'expires_at': timezone.now() + timedelta(days=7) +                    }, +                ), +                infraction_model=cls.infraction_model, +                user_model=cls.user_model, +            ) +        ) + +        # User #11: Active ban, active mute, active superstar +        cls.created_infractions["active ban, mute, and superstar"] = InfractionFactory.create( +            actor=cls.user_moderator, +            infractions=( +                {'type': 'ban', 'active': True, 'hidden': True}, +                {'type': 'mute', 'active': True, 'hidden': True}, +                {'type': 'superstar', 'active': True, 'hidden': True}, +                {'type': 'watch', 'active': True, 'hidden': True}, +            ), +            infraction_model=cls.infraction_model, +            user_model=cls.user_model, +        ) + +        # User #12: Multiple active bans, active mutes, active superstars +        cls.created_infractions["multiple active bans, mutes, stars"] = InfractionFactory.create( +            actor=cls.user_moderator, +            infractions=( +                {'type': 'ban', 'active': True, 'hidden': True}, +                {'type': 'ban', 'active': True, 'hidden': True}, +                {'type': 'ban', 'active': True, 'hidden': True}, +                {'type': 'mute', 'active': True, 'hidden': True}, +                {'type': 'mute', 'active': True, 'hidden': True}, +                {'type': 'mute', 'active': True, 'hidden': True}, +                {'type': 'superstar', 'active': True, 'hidden': True}, +                {'type': 'superstar', 'active': True, 'hidden': True}, +                {'type': 'superstar', 'active': True, 'hidden': True}, +                {'type': 'watch', 'active': True, 'hidden': True}, +                {'type': 'watch', 'active': True, 'hidden': True}, +                {'type': 'watch', 'active': True, 'hidden': True}, +            ), +            infraction_model=cls.infraction_model, +            user_model=cls.user_model, +        ) + +    def test_all_never_active_types_became_inactive(self): +        """Are all infractions of a non-active type inactive after the migration?""" +        inactive_type_query = Q(type="note") | Q(type="warning") | Q(type="kick") +        self.assertFalse( +            self.infraction_model.objects.filter(inactive_type_query, active=True).exists() +        ) + +    def test_migration_left_clean_user_without_infractions(self): +        """Do users without infractions have no infractions after the migration?""" +        user_id, infraction_history = self.created_infractions["no infractions"] +        self.assertFalse( +            self.infraction_model.objects.filter(user__id=user_id).exists() +        ) + +    def test_migration_left_user_with_inactive_note_untouched(self): +        """Did the migration leave users with only an inactive note untouched?""" +        user_id, infraction_history = self.created_infractions["one inactive note"] +        inactive_note = infraction_history[0] +        self.assertTrue( +            self.infraction_model.objects.filter(**model_to_dict(inactive_note)).exists() +        ) + +    def test_migration_only_touched_active_field_of_active_note(self): +        """Does the migration only change the `active` field?""" +        user_id, infraction_history = self.created_infractions["one active note"] +        note = model_to_dict(infraction_history[0]) +        note['active'] = False +        self.assertTrue( +            self.infraction_model.objects.filter(**note).exists() +        ) + +    def test_migration_only_touched_active_field_of_active_note_left_inactive_untouched(self): +        """Does the migration only change the `active` field of active notes?""" +        user_id, infraction_history = self.created_infractions["one active and one inactive note"] +        for note in infraction_history: +            with self.subTest(active=note.active): +                note = model_to_dict(note) +                note['active'] = False +                self.assertTrue( +                    self.infraction_model.objects.filter(**note).exists() +                ) + +    def test_migration_migrates_all_nonactive_types_to_inactive(self): +        """Do we set the `active` field of all non-active infractions to `False`?""" +        user_id, infraction_history = self.created_infractions["active note, kick, warning"] +        self.assertFalse( +            self.infraction_model.objects.filter(user__id=user_id, active=True).exists() +        ) + +    def test_migration_leaves_user_with_one_active_ban_untouched(self): +        """Do we leave a user with one active and one inactive ban untouched?""" +        user_id, infraction_history = self.created_infractions["one inactive and one active ban"] +        for infraction in infraction_history: +            with self.subTest(active=infraction.active): +                self.assertTrue( +                    self.infraction_model.objects.filter(**model_to_dict(infraction)).exists() +                ) + +    def test_migration_turns_double_active_perm_ban_into_single_active_perm_ban(self): +        """Does the migration turn two active permanent bans into one active permanent ban?""" +        user_id, infraction_history = self.created_infractions["two active perm bans"] +        active_count = self.infraction_model.objects.filter(user__id=user_id, active=True).count() +        self.assertEqual(active_count, 1) + +    def test_migration_leaves_temporary_ban_with_longest_duration_active(self): +        """Does the migration turn two active permanent bans into one active permanent ban?""" +        user_id, infraction_history = self.created_infractions["multiple active temp bans"] +        active_ban = self.infraction_model.objects.get(user__id=user_id, active=True) +        self.assertEqual(active_ban.expires_at, infraction_history[2].expires_at) + +    def test_migration_leaves_permanent_ban_active(self): +        """Does the migration leave the permanent ban active?""" +        user_id, infraction_history = self.created_infractions["active perm, two active temp bans"] +        active_ban = self.infraction_model.objects.get(user__id=user_id, active=True) +        self.assertIsNone(active_ban.expires_at) + +    def test_migration_leaves_longest_temp_ban_active_with_inactive_permanent_ban(self): +        """Does the longest temp ban stay active, even with an inactive perm ban present?""" +        user_id, infraction_history = self.created_infractions[ +            "one inactive perm ban, two active temp bans" +        ] +        active_ban = self.infraction_model.objects.get(user__id=user_id, active=True) +        self.assertEqual(active_ban.expires_at, infraction_history[0].expires_at) + +    def test_migration_leaves_all_active_types_active_if_one_of_each_exists(self): +        """Do all active infractions stay active if only one of each is present?""" +        user_id, infraction_history = self.created_infractions["active ban, mute, and superstar"] +        active_count = self.infraction_model.objects.filter(user__id=user_id, active=True).count() +        self.assertEqual(active_count, 4) + +    def test_migration_reduces_all_active_types_to_a_single_active_infraction(self): +        """Do we reduce all of the infraction types to one active infraction?""" +        user_id, infraction_history = self.created_infractions["multiple active bans, mutes, stars"] +        active_infractions = self.infraction_model.objects.filter(user__id=user_id, active=True) +        self.assertEqual(len(active_infractions), 4) +        types_observed = [infraction.type for infraction in active_infractions] + +        for infraction_type in ('ban', 'mute', 'superstar', 'watch'): +            with self.subTest(type=infraction_type): +                self.assertIn(infraction_type, types_observed) diff --git a/pydis_site/apps/api/tests/migrations/test_base.py b/pydis_site/apps/api/tests/migrations/test_base.py new file mode 100644 index 00000000..f69bc92c --- /dev/null +++ b/pydis_site/apps/api/tests/migrations/test_base.py @@ -0,0 +1,135 @@ +import logging +from unittest.mock import call, patch + +from django.db.migrations.loader import MigrationLoader +from django.test import TestCase + +from .base import MigrationsTestCase, connection + +log = logging.getLogger(__name__) + + +class SpanishInquisition(MigrationsTestCase): +    app = "api" +    migration_prior = "scragly" +    migration_target = "kosa" + + +@patch("pydis_site.apps.api.tests.migrations.base.MigrationExecutor") +class MigrationsTestCaseNoSideEffectsTests(TestCase): +    """Tests the MigrationTestCase class with actual migration side effects disabled.""" + +    def setUp(self): +        """Set up an instance of MigrationsTestCase for use in tests.""" +        self.test_case = SpanishInquisition() + +    def test_missing_app_class_raises_value_error(self, _migration_executor): +        """A MigrationsTestCase subclass should set the class-attribute `app`.""" +        class Spam(MigrationsTestCase): +            pass + +        spam = Spam() +        with self.assertRaises(ValueError, msg="The `app` attribute was not set."): +            spam.setUpTestData() + +    def test_missing_migration_class_attributes_raise_value_error(self, _migration_executor): +        """A MigrationsTestCase subclass should set both `migration_prior` and `migration_target`""" +        class Eggs(MigrationsTestCase): +            app = "api" +            migration_target = "lemon" + +        class Bacon(MigrationsTestCase): +            app = "api" +            migration_prior = "mark" + +        instances = (Eggs(), Bacon()) + +        exception_message = "Both ` migration_prior` and `migration_target` need to be set." +        for instance in instances: +            with self.subTest( +                    migration_prior=instance.migration_prior, +                    migration_target=instance.migration_target, +            ): +                with self.assertRaises(ValueError, msg=exception_message): +                    instance.setUpTestData() + +    @patch(f"{__name__}.SpanishInquisition.setUpMigrationData") +    @patch(f"{__name__}.SpanishInquisition.setUpPostMigrationData") +    def test_migration_data_hooks_are_called_once(self, pre_hook, post_hook, _migration_executor): +        """The `setUpMigrationData` and `setUpPostMigrationData` hooks should be called once.""" +        self.test_case.setUpTestData() +        for hook in (pre_hook, post_hook): +            with self.subTest(hook=repr(hook)): +                hook.assert_called_once() + +    def test_migration_executor_is_instantiated_twice(self, migration_executor): +        """The `MigrationExecutor` should be instantiated with the database connection twice.""" +        self.test_case.setUpTestData() + +        expected_args = [call(connection), call(connection)] +        self.assertEqual(migration_executor.call_args_list, expected_args) + +    def test_project_state_is_loaded_for_correct_migration_files_twice(self, migration_executor): +        """The `project_state` should first be loaded with `migrate_from`, then `migrate_to`.""" +        self.test_case.setUpTestData() + +        expected_args = [call(self.test_case.migrate_from), call(self.test_case.migrate_to)] +        self.assertEqual(migration_executor().loader.project_state.call_args_list, expected_args) + +    def test_loader_build_graph_gets_called_once(self, migration_executor): +        """We should rebuild the migration graph before applying the second set of migrations.""" +        self.test_case.setUpTestData() + +        migration_executor().loader.build_graph.assert_called_once() + +    def test_migration_executor_migrate_method_is_called_correctly_twice(self, migration_executor): +        """The migrate method of the executor should be called twice with the correct arguments.""" +        self.test_case.setUpTestData() + +        self.assertEqual(migration_executor().migrate.call_count, 2) +        calls = [call([('api', 'scragly')]), call([('api', 'kosa')])] +        migration_executor().migrate.assert_has_calls(calls) + + +class LifeOfBrian(MigrationsTestCase): +    app = "api" +    migration_prior = "0046_reminder_jump_url" +    migration_target = "0048_add_infractions_unique_constraints_active" + +    @classmethod +    def log_last_migration(cls): +        """Parses the applied migrations dictionary to log the last applied migration.""" +        loader = MigrationLoader(connection) +        api_migrations = [ +            migration for app, migration in loader.applied_migrations if app == cls.app +        ] +        last_migration = max(api_migrations, key=lambda name: int(name[:4])) +        log.info(f"The last applied migration: {last_migration}") + +    @classmethod +    def setUpMigrationData(cls, apps): +        """Method that logs the last applied migration at this point.""" +        cls.log_last_migration() + +    @classmethod +    def setUpPostMigrationData(cls, apps): +        """Method that logs the last applied migration at this point.""" +        cls.log_last_migration() + + +class MigrationsTestCaseMigrationTest(TestCase): +    """Tests if `MigrationsTestCase` travels to the right points in the migration history.""" + +    def test_migrations_test_case_travels_to_correct_migrations_in_history(self): +        """The test case should first revert to `migration_prior`, then go to `migration_target`.""" +        brian = LifeOfBrian() + +        with self.assertLogs(log, level=logging.INFO) as logs: +            brian.setUpTestData() + +            self.assertEqual(len(logs.records), 2) + +            for time_point, record in zip(("migration_prior", "migration_target"), logs.records): +                with self.subTest(time_point=time_point): +                    message = f"The last applied migration: {getattr(brian, time_point)}" +                    self.assertEqual(record.getMessage(), message) diff --git a/pydis_site/apps/api/tests/test_infractions.py b/pydis_site/apps/api/tests/test_infractions.py index c58c32e2..7a54640e 100644 --- a/pydis_site/apps/api/tests/test_infractions.py +++ b/pydis_site/apps/api/tests/test_infractions.py @@ -1,6 +1,8 @@  from datetime import datetime as dt, timedelta, timezone +from unittest.mock import patch  from urllib.parse import quote +from django.db.utils import IntegrityError  from django_hosts.resolvers import reverse  from .base import APISubdomainTestCase @@ -167,6 +169,12 @@ class CreationTests(APISubdomainTestCase):              discriminator=1,              avatar_hash=None          ) +        cls.second_user = User.objects.create( +            id=6, +            name='carl', +            discriminator=2, +            avatar_hash=None +        )      def test_accepts_valid_data(self):          url = reverse('bot:infraction-list', host='api') @@ -305,6 +313,187 @@ class CreationTests(APISubdomainTestCase):              'hidden': [f'{data["type"]} infractions must be hidden.']          }) +    def test_returns_400_for_active_infraction_of_type_that_cannot_be_active(self): +        """Test if the API rejects active infractions for types that cannot be active.""" +        url = reverse('bot:infraction-list', host='api') +        restricted_types = ( +            ('note', True), +            ('warning', False), +            ('kick', False), +        ) + +        for infraction_type, hidden in restricted_types: +            with self.subTest(infraction_type=infraction_type): +                invalid_infraction = { +                    'user': self.user.id, +                    'actor': self.user.id, +                    'type': infraction_type, +                    'reason': 'Take me on!', +                    'hidden': hidden, +                    'active': True, +                    'expires_at': None, +                } +                response = self.client.post(url, data=invalid_infraction) +                self.assertEqual(response.status_code, 400) +                self.assertEqual( +                    response.json(), +                    {'active': [f'{infraction_type} infractions cannot be active.']} +                ) + +    def test_returns_400_for_second_active_infraction_of_the_same_type(self): +        """Test if the API rejects a second active infraction of the same type for a given user.""" +        url = reverse('bot:infraction-list', host='api') +        active_infraction_types = ('mute', 'ban', 'superstar') + +        for infraction_type in active_infraction_types: +            with self.subTest(infraction_type=infraction_type): +                first_active_infraction = { +                    'user': self.user.id, +                    'actor': self.user.id, +                    'type': infraction_type, +                    'reason': 'Take me on!', +                    'active': True, +                    'expires_at': '2019-10-04T12:52:00+00:00' +                } + +                # Post the first active infraction of a type and confirm it's accepted. +                first_response = self.client.post(url, data=first_active_infraction) +                self.assertEqual(first_response.status_code, 201) + +                second_active_infraction = { +                    'user': self.user.id, +                    'actor': self.user.id, +                    'type': infraction_type, +                    'reason': 'Take on me!', +                    'active': True, +                    'expires_at': '2019-10-04T12:52:00+00:00' +                } +                second_response = self.client.post(url, data=second_active_infraction) +                self.assertEqual(second_response.status_code, 400) +                self.assertEqual( +                    second_response.json(), +                    { +                        'non_field_errors': [ +                            'This user already has an active infraction of this type.' +                        ] +                    } +                ) + +    def test_returns_201_for_second_active_infraction_of_different_type(self): +        """Test if the API accepts a second active infraction of a different type than the first.""" +        url = reverse('bot:infraction-list', host='api') +        first_active_infraction = { +            'user': self.user.id, +            'actor': self.user.id, +            'type': 'mute', +            'reason': 'Be silent!', +            'hidden': True, +            'active': True, +            'expires_at': '2019-10-04T12:52:00+00:00' +        } +        second_active_infraction = { +            'user': self.user.id, +            'actor': self.user.id, +            'type': 'ban', +            'reason': 'Be gone!', +            'hidden': True, +            'active': True, +            'expires_at': '2019-10-05T12:52:00+00:00' +        } +        # Post the first active infraction of a type and confirm it's accepted. +        first_response = self.client.post(url, data=first_active_infraction) +        self.assertEqual(first_response.status_code, 201) + +        # Post the first active infraction of a type and confirm it's accepted. +        second_response = self.client.post(url, data=second_active_infraction) +        self.assertEqual(second_response.status_code, 201) + +    def test_unique_constraint_raises_integrity_error_on_second_active_of_same_type(self): +        """Do we raise `IntegrityError` for the second active infraction of a type for a user?""" +        Infraction.objects.create( +            user=self.user, +            actor=self.user, +            type="ban", +            active=True, +            reason="The first active ban" +        ) +        with self.assertRaises(IntegrityError): +            Infraction.objects.create( +                user=self.user, +                actor=self.user, +                type="ban", +                active=True, +                reason="The second active ban" +            ) + +    def test_unique_constraint_accepts_active_infraction_after_inactive_infraction(self): +        """Do we accept an active infraction if the others of the same type are inactive?""" +        try: +            Infraction.objects.create( +                user=self.user, +                actor=self.user, +                type="ban", +                active=False, +                reason="The first inactive ban" +            ) +            Infraction.objects.create( +                user=self.user, +                actor=self.user, +                type="ban", +                active=False, +                reason="The second inactive ban" +            ) +            Infraction.objects.create( +                user=self.user, +                actor=self.user, +                type="ban", +                active=True, +                reason="The first active ban" +            ) +        except IntegrityError: +            self.fail("An unexpected IntegrityError was raised.") + +    @patch(f"{__name__}.Infraction") +    def test_if_accepts_active_infraction_test_catches_integrity_error(self, infraction_patch): +        """Does the test properly catch the IntegrityError and raise an AssertionError?""" +        infraction_patch.objects.create.side_effect = IntegrityError +        with self.assertRaises(AssertionError, msg="An unexpected IntegrityError was raised."): +            self.test_unique_constraint_accepts_active_infraction_after_inactive_infraction() + +    def test_unique_constraint_accepts_second_active_of_different_type(self): +        """Do we accept a second active infraction of a different type for a given user?""" +        Infraction.objects.create( +            user=self.user, +            actor=self.user, +            type="ban", +            active=True, +            reason="The first active ban" +        ) +        Infraction.objects.create( +            user=self.user, +            actor=self.user, +            type="mute", +            active=True, +            reason="The first active mute" +        ) + +    def test_unique_constraint_accepts_active_infractions_for_different_users(self): +        """Do we accept two active infractions of the same type for two different users?""" +        Infraction.objects.create( +            user=self.user, +            actor=self.user, +            type="ban", +            active=True, +            reason="An active ban for the first user" +        ) +        Infraction.objects.create( +            user=self.second_user, +            actor=self.second_user, +            type="ban", +            active=False, +            reason="An active ban for the second user" +        ) +  class ExpandedTests(APISubdomainTestCase):      @classmethod @@ -318,12 +507,14 @@ class ExpandedTests(APISubdomainTestCase):          cls.kick = Infraction.objects.create(              user_id=cls.user.id,              actor_id=cls.user.id, -            type='kick' +            type='kick', +            active=False          )          cls.warning = Infraction.objects.create(              user_id=cls.user.id,              actor_id=cls.user.id, -            type='warning' +            type='warning', +            active=False,          )      def check_expanded_fields(self, infraction): | 
