aboutsummaryrefslogtreecommitdiffstats
path: root/pydis_site/apps/api/tests
diff options
context:
space:
mode:
authorGravatar hedy <[email protected]>2023-12-14 20:28:17 +0800
committerGravatar hedy <[email protected]>2023-12-14 20:28:17 +0800
commit449c08fd5459b2f804dbf825086ec1dd0f244d8a (patch)
treee4589cb227cdb2e611bcbf9b02ea481fe24cdb34 /pydis_site/apps/api/tests
parentResize theme switch (diff)
parentMerge pull request #1173 from python-discord/dependabot/pip/sentry-sdk-1.39.0 (diff)
Fix all conflicts
hopefully I dont have to do this again
Diffstat (limited to 'pydis_site/apps/api/tests')
-rw-r--r--pydis_site/apps/api/tests/base.py3
-rw-r--r--pydis_site/apps/api/tests/migrations/__init__.py1
-rw-r--r--pydis_site/apps/api/tests/migrations/base.py102
-rw-r--r--pydis_site/apps/api/tests/migrations/test_active_infraction_migration.py496
-rw-r--r--pydis_site/apps/api/tests/migrations/test_base.py135
-rw-r--r--pydis_site/apps/api/tests/test_bumped_threads.py63
-rw-r--r--pydis_site/apps/api/tests/test_deleted_messages.py11
-rw-r--r--pydis_site/apps/api/tests/test_documentation_links.py2
-rw-r--r--pydis_site/apps/api/tests/test_filterlists.py122
-rw-r--r--pydis_site/apps/api/tests/test_filters.py352
-rw-r--r--pydis_site/apps/api/tests/test_github_utils.py289
-rw-r--r--pydis_site/apps/api/tests/test_github_webhook_filter.py62
-rw-r--r--pydis_site/apps/api/tests/test_infractions.py120
-rw-r--r--pydis_site/apps/api/tests/test_models.py38
-rw-r--r--pydis_site/apps/api/tests/test_nominations.py42
-rw-r--r--pydis_site/apps/api/tests/test_off_topic_channel_names.py37
-rw-r--r--pydis_site/apps/api/tests/test_offensive_message.py90
-rw-r--r--pydis_site/apps/api/tests/test_reminders.py21
-rw-r--r--pydis_site/apps/api/tests/test_roles.py2
-rw-r--r--pydis_site/apps/api/tests/test_rules.py40
-rw-r--r--pydis_site/apps/api/tests/test_users.py152
-rw-r--r--pydis_site/apps/api/tests/test_validators.py239
22 files changed, 1175 insertions, 1244 deletions
diff --git a/pydis_site/apps/api/tests/base.py b/pydis_site/apps/api/tests/base.py
index c9f3cb7e..704b22cf 100644
--- a/pydis_site/apps/api/tests/base.py
+++ b/pydis_site/apps/api/tests/base.py
@@ -61,6 +61,7 @@ class AuthenticatedAPITestCase(APITestCase):
... self.assertEqual(response.status_code, 200)
"""
- def setUp(self):
+ def setUp(self) -> None:
+ """Bootstrap the user and authenticate it."""
super().setUp()
self.client.force_authenticate(test_user)
diff --git a/pydis_site/apps/api/tests/migrations/__init__.py b/pydis_site/apps/api/tests/migrations/__init__.py
deleted file mode 100644
index 38e42ffc..00000000
--- a/pydis_site/apps/api/tests/migrations/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-"""This submodule contains tests for functions used in data migrations."""
diff --git a/pydis_site/apps/api/tests/migrations/base.py b/pydis_site/apps/api/tests/migrations/base.py
deleted file mode 100644
index 0c0a5bd0..00000000
--- a/pydis_site/apps/api/tests/migrations/base.py
+++ /dev/null
@@ -1,102 +0,0 @@
-"""Includes utilities for testing migrations."""
-from django.db import connection
-from django.db.migrations.executor import MigrationExecutor
-from django.test import TestCase
-
-
-class MigrationsTestCase(TestCase):
- """
- A `TestCase` subclass to test migration files.
-
- To be able to properly test a migration, we will need to inject data into the test database
- before the migrations we want to test are applied, but after the older migrations have been
- applied. This makes sure that we are testing "as if" we were actually applying this migration
- to a database in the state it was in before introducing the new migration.
-
- To set up a MigrationsTestCase, create a subclass of this class and set the following
- class-level attributes:
-
- - app: The name of the app that contains the migrations (e.g., `'api'`)
- - migration_prior: The name* of the last migration file before the migrations you want to test
- - migration_target: The name* of the last migration file we want to test
-
- *) Specify the file names without a path or the `.py` file extension.
-
- Additionally, overwrite the `setUpMigrationData` in the subclass to inject data into the
- database before the migrations we want to test are applied. Please read the docstring of the
- method for more information. An optional hook, `setUpPostMigrationData` is also provided.
- """
-
- # These class-level attributes should be set in classes that inherit from this base class.
- app = None
- migration_prior = None
- migration_target = None
-
- @classmethod
- def setUpTestData(cls):
- """
- Injects data into the test database prior to the migration we're trying to test.
-
- This class methods reverts the test database back to the state of the last migration file
- prior to the migrations we want to test. It will then allow the user to inject data into the
- test database by calling the `setUpMigrationData` hook. After the data has been injected, it
- will apply the migrations we want to test and call the `setUpPostMigrationData` hook. The
- user can now test if the migration correctly migrated the injected test data.
- """
- if not cls.app:
- raise ValueError("The `app` attribute was not set.")
-
- if not cls.migration_prior or not cls.migration_target:
- raise ValueError("Both ` migration_prior` and `migration_target` need to be set.")
-
- cls.migrate_from = [(cls.app, cls.migration_prior)]
- cls.migrate_to = [(cls.app, cls.migration_target)]
-
- # Reverse to database state prior to the migrations we want to test
- executor = MigrationExecutor(connection)
- executor.migrate(cls.migrate_from)
-
- # Call the data injection hook with the current state of the project
- old_apps = executor.loader.project_state(cls.migrate_from).apps
- cls.setUpMigrationData(old_apps)
-
- # Run the migrations we want to test
- executor = MigrationExecutor(connection)
- executor.loader.build_graph()
- executor.migrate(cls.migrate_to)
-
- # Save the project state so we're able to work with the correct model states
- cls.apps = executor.loader.project_state(cls.migrate_to).apps
-
- # Call `setUpPostMigrationData` to potentially set up post migration data used in testing
- cls.setUpPostMigrationData(cls.apps)
-
- @classmethod
- def setUpMigrationData(cls, apps):
- """
- Override this method to inject data into the test database before the migration is applied.
-
- This method will be called after setting up the database according to the migrations that
- come before the migration(s) we are trying to test, but before the to-be-tested migration(s)
- are applied. This allows us to simulate a database state just prior to the migrations we are
- trying to test.
-
- To make sure we're creating objects according to the state the models were in at this point
- in the migration history, use `apps.get_model(app_name: str, model_name: str)` to get the
- appropriate model, e.g.:
-
- >>> Infraction = apps.get_model('api', 'Infraction')
- """
- pass
-
- @classmethod
- def setUpPostMigrationData(cls, apps):
- """
- Set up additional test data after the target migration has been applied.
-
- Use `apps.get_model(app_name: str, model_name: str)` to get the correct instances of the
- model classes:
-
- >>> Infraction = apps.get_model('api', 'Infraction')
- """
- pass
diff --git a/pydis_site/apps/api/tests/migrations/test_active_infraction_migration.py b/pydis_site/apps/api/tests/migrations/test_active_infraction_migration.py
deleted file mode 100644
index 8dc29b34..00000000
--- a/pydis_site/apps/api/tests/migrations/test_active_infraction_migration.py
+++ /dev/null
@@ -1,496 +0,0 @@
-"""Tests for the data migration in `filename`."""
-import logging
-from collections import ChainMap, namedtuple
-from datetime import timedelta
-from itertools import count
-from typing import Dict, Iterable, Type, Union
-
-from django.db.models import Q
-from django.forms.models import model_to_dict
-from django.utils import timezone
-
-from pydis_site.apps.api.models import Infraction, User
-from .base import MigrationsTestCase
-
-log = logging.getLogger(__name__)
-log.setLevel(logging.DEBUG)
-
-
-InfractionHistory = namedtuple('InfractionHistory', ("user_id", "infraction_history"))
-
-
-class InfractionFactory:
- """Factory that creates infractions for a User instance."""
-
- infraction_id = count(1)
- user_id = count(1)
- default_values = {
- 'active': True,
- 'expires_at': None,
- 'hidden': False,
- }
-
- @classmethod
- def create(
- cls,
- actor: User,
- infractions: Iterable[Dict[str, Union[str, int, bool]]],
- infraction_model: Type[Infraction] = Infraction,
- user_model: Type[User] = User,
- ) -> InfractionHistory:
- """
- Creates `infractions` for the `user` with the given `actor`.
-
- The `infractions` dictionary can contain the following fields:
- - `type` (required)
- - `active` (default: True)
- - `expires_at` (default: None; i.e, permanent)
- - `hidden` (default: False).
-
- The parameters `infraction_model` and `user_model` can be used to pass in an instance of
- both model classes from a different migration/project state.
- """
- user_id = next(cls.user_id)
- user = user_model.objects.create(
- id=user_id,
- name=f"Infracted user {user_id}",
- discriminator=user_id,
- avatar_hash=None,
- )
- infraction_history = []
-
- for infraction in infractions:
- infraction = dict(infraction)
- infraction["id"] = next(cls.infraction_id)
- infraction = ChainMap(infraction, cls.default_values)
- new_infraction = infraction_model.objects.create(
- user=user,
- actor=actor,
- type=infraction["type"],
- reason=f"`{infraction['type']}` infraction (ID: {infraction['id']} of {user}",
- active=infraction['active'],
- hidden=infraction['hidden'],
- expires_at=infraction['expires_at'],
- )
- infraction_history.append(new_infraction)
-
- return InfractionHistory(user_id=user_id, infraction_history=infraction_history)
-
-
-class InfractionFactoryTests(MigrationsTestCase):
- """Tests for the InfractionFactory."""
-
- app = "api"
- migration_prior = "0046_reminder_jump_url"
- migration_target = "0046_reminder_jump_url"
-
- @classmethod
- def setUpPostMigrationData(cls, apps):
- """Create a default actor for all infractions."""
- cls.infraction_model = apps.get_model('api', 'Infraction')
- cls.user_model = apps.get_model('api', 'User')
-
- cls.actor = cls.user_model.objects.create(
- id=9999,
- name="Unknown Moderator",
- discriminator=1040,
- avatar_hash=None,
- )
-
- def test_infraction_factory_total_count(self):
- """Does the test database hold as many infractions as we tried to create?"""
- InfractionFactory.create(
- actor=self.actor,
- infractions=(
- {'type': 'kick', 'active': False, 'hidden': False},
- {'type': 'ban', 'active': True, 'hidden': False},
- {'type': 'note', 'active': False, 'hidden': True},
- ),
- infraction_model=self.infraction_model,
- user_model=self.user_model,
- )
- database_count = Infraction.objects.all().count()
- self.assertEqual(3, database_count)
-
- def test_infraction_factory_multiple_users(self):
- """Does the test database hold as many infractions as we tried to create?"""
- for _user in range(5):
- InfractionFactory.create(
- actor=self.actor,
- infractions=(
- {'type': 'kick', 'active': False, 'hidden': True},
- {'type': 'ban', 'active': True, 'hidden': False},
- ),
- infraction_model=self.infraction_model,
- user_model=self.user_model,
- )
-
- # Check if infractions and users are recorded properly in the database
- database_count = Infraction.objects.all().count()
- self.assertEqual(database_count, 10)
-
- user_count = User.objects.all().count()
- self.assertEqual(user_count, 5 + 1)
-
- def test_infraction_factory_sets_correct_fields(self):
- """Does the InfractionFactory set the correct attributes?"""
- infractions = (
- {
- 'type': 'note',
- 'active': False,
- 'hidden': True,
- 'expires_at': timezone.now()
- },
- {'type': 'warning', 'active': False, 'hidden': False, 'expires_at': None},
- {'type': 'watch', 'active': False, 'hidden': True, 'expires_at': None},
- {'type': 'mute', 'active': True, 'hidden': False, 'expires_at': None},
- {'type': 'kick', 'active': True, 'hidden': True, 'expires_at': None},
- {'type': 'ban', 'active': True, 'hidden': False, 'expires_at': None},
- {
- 'type': 'superstar',
- 'active': True,
- 'hidden': True,
- 'expires_at': timezone.now()
- },
- )
-
- InfractionFactory.create(
- actor=self.actor,
- infractions=infractions,
- infraction_model=self.infraction_model,
- user_model=self.user_model,
- )
-
- for infraction in infractions:
- with self.subTest(**infraction):
- self.assertTrue(Infraction.objects.filter(**infraction).exists())
-
-
-class ActiveInfractionMigrationTests(MigrationsTestCase):
- """
- Tests the active infraction data migration.
-
- The active infraction data migration should do the following things:
-
- 1. migrates all active notes, warnings, and kicks to an inactive status;
- 2. migrates all users with multiple active infractions of a single type to have only one active
- infraction of that type. The infraction with the longest duration stays active.
- """
-
- app = "api"
- migration_prior = "0046_reminder_jump_url"
- migration_target = "0047_active_infractions_migration"
-
- @classmethod
- def setUpMigrationData(cls, apps):
- """Sets up an initial database state that contains the relevant test cases."""
- # Fetch the Infraction and User model in the current migration state
- cls.infraction_model = apps.get_model('api', 'Infraction')
- cls.user_model = apps.get_model('api', 'User')
-
- cls.created_infractions = {}
-
- # Moderator that serves as actor for all infractions
- cls.user_moderator = cls.user_model.objects.create(
- id=9999,
- name="Olivier de Vienne",
- discriminator=1040,
- avatar_hash=None,
- )
-
- # User #1: clean user with no infractions
- cls.created_infractions["no infractions"] = InfractionFactory.create(
- actor=cls.user_moderator,
- infractions=[],
- infraction_model=cls.infraction_model,
- user_model=cls.user_model,
- )
-
- # User #2: One inactive note infraction
- cls.created_infractions["one inactive note"] = InfractionFactory.create(
- actor=cls.user_moderator,
- infractions=(
- {'type': 'note', 'active': False, 'hidden': True},
- ),
- infraction_model=cls.infraction_model,
- user_model=cls.user_model,
- )
-
- # User #3: One active note infraction
- cls.created_infractions["one active note"] = InfractionFactory.create(
- actor=cls.user_moderator,
- infractions=(
- {'type': 'note', 'active': True, 'hidden': True},
- ),
- infraction_model=cls.infraction_model,
- user_model=cls.user_model,
- )
-
- # User #4: One active and one inactive note infraction
- cls.created_infractions["one active and one inactive note"] = InfractionFactory.create(
- actor=cls.user_moderator,
- infractions=(
- {'type': 'note', 'active': False, 'hidden': True},
- {'type': 'note', 'active': True, 'hidden': True},
- ),
- infraction_model=cls.infraction_model,
- user_model=cls.user_model,
- )
-
- # User #5: Once active note, one active kick, once active warning
- cls.created_infractions["active note, kick, warning"] = InfractionFactory.create(
- actor=cls.user_moderator,
- infractions=(
- {'type': 'note', 'active': True, 'hidden': True},
- {'type': 'kick', 'active': True, 'hidden': True},
- {'type': 'warning', 'active': True, 'hidden': True},
- ),
- infraction_model=cls.infraction_model,
- user_model=cls.user_model,
- )
-
- # User #6: One inactive ban and one active ban
- cls.created_infractions["one inactive and one active ban"] = InfractionFactory.create(
- actor=cls.user_moderator,
- infractions=(
- {'type': 'ban', 'active': False, 'hidden': True},
- {'type': 'ban', 'active': True, 'hidden': True},
- ),
- infraction_model=cls.infraction_model,
- user_model=cls.user_model,
- )
-
- # User #7: Two active permanent bans
- cls.created_infractions["two active perm bans"] = InfractionFactory.create(
- actor=cls.user_moderator,
- infractions=(
- {'type': 'ban', 'active': True, 'hidden': True},
- {'type': 'ban', 'active': True, 'hidden': True},
- ),
- infraction_model=cls.infraction_model,
- user_model=cls.user_model,
- )
-
- # User #8: Multiple active temporary bans
- cls.created_infractions["multiple active temp bans"] = InfractionFactory.create(
- actor=cls.user_moderator,
- infractions=(
- {
- 'type': 'ban',
- 'active': True,
- 'hidden': True,
- 'expires_at': timezone.now() + timedelta(days=1)
- },
- {
- 'type': 'ban',
- 'active': True,
- 'hidden': True,
- 'expires_at': timezone.now() + timedelta(days=10)
- },
- {
- 'type': 'ban',
- 'active': True,
- 'hidden': True,
- 'expires_at': timezone.now() + timedelta(days=20)
- },
- {
- 'type': 'ban',
- 'active': True,
- 'hidden': True,
- 'expires_at': timezone.now() + timedelta(days=5)
- },
- ),
- infraction_model=cls.infraction_model,
- user_model=cls.user_model,
- )
-
- # User #9: One active permanent ban, two active temporary bans
- cls.created_infractions["active perm, two active temp bans"] = InfractionFactory.create(
- actor=cls.user_moderator,
- infractions=(
- {
- 'type': 'ban',
- 'active': True,
- 'hidden': True,
- 'expires_at': timezone.now() + timedelta(days=10)
- },
- {
- 'type': 'ban',
- 'active': True,
- 'hidden': True,
- 'expires_at': None,
- },
- {
- 'type': 'ban',
- 'active': True,
- 'hidden': True,
- 'expires_at': timezone.now() + timedelta(days=7)
- },
- ),
- infraction_model=cls.infraction_model,
- user_model=cls.user_model,
- )
-
- # User #10: One inactive permanent ban, two active temporary bans
- cls.created_infractions["one inactive perm ban, two active temp bans"] = (
- InfractionFactory.create(
- actor=cls.user_moderator,
- infractions=(
- {
- 'type': 'ban',
- 'active': True,
- 'hidden': True,
- 'expires_at': timezone.now() + timedelta(days=10)
- },
- {
- 'type': 'ban',
- 'active': False,
- 'hidden': True,
- 'expires_at': None,
- },
- {
- 'type': 'ban',
- 'active': True,
- 'hidden': True,
- 'expires_at': timezone.now() + timedelta(days=7)
- },
- ),
- infraction_model=cls.infraction_model,
- user_model=cls.user_model,
- )
- )
-
- # User #11: Active ban, active mute, active superstar
- cls.created_infractions["active ban, mute, and superstar"] = InfractionFactory.create(
- actor=cls.user_moderator,
- infractions=(
- {'type': 'ban', 'active': True, 'hidden': True},
- {'type': 'mute', 'active': True, 'hidden': True},
- {'type': 'superstar', 'active': True, 'hidden': True},
- {'type': 'watch', 'active': True, 'hidden': True},
- ),
- infraction_model=cls.infraction_model,
- user_model=cls.user_model,
- )
-
- # User #12: Multiple active bans, active mutes, active superstars
- cls.created_infractions["multiple active bans, mutes, stars"] = InfractionFactory.create(
- actor=cls.user_moderator,
- infractions=(
- {'type': 'ban', 'active': True, 'hidden': True},
- {'type': 'ban', 'active': True, 'hidden': True},
- {'type': 'ban', 'active': True, 'hidden': True},
- {'type': 'mute', 'active': True, 'hidden': True},
- {'type': 'mute', 'active': True, 'hidden': True},
- {'type': 'mute', 'active': True, 'hidden': True},
- {'type': 'superstar', 'active': True, 'hidden': True},
- {'type': 'superstar', 'active': True, 'hidden': True},
- {'type': 'superstar', 'active': True, 'hidden': True},
- {'type': 'watch', 'active': True, 'hidden': True},
- {'type': 'watch', 'active': True, 'hidden': True},
- {'type': 'watch', 'active': True, 'hidden': True},
- ),
- infraction_model=cls.infraction_model,
- user_model=cls.user_model,
- )
-
- def test_all_never_active_types_became_inactive(self):
- """Are all infractions of a non-active type inactive after the migration?"""
- inactive_type_query = Q(type="note") | Q(type="warning") | Q(type="kick")
- self.assertFalse(
- self.infraction_model.objects.filter(inactive_type_query, active=True).exists()
- )
-
- def test_migration_left_clean_user_without_infractions(self):
- """Do users without infractions have no infractions after the migration?"""
- user_id, infraction_history = self.created_infractions["no infractions"]
- self.assertFalse(
- self.infraction_model.objects.filter(user__id=user_id).exists()
- )
-
- def test_migration_left_user_with_inactive_note_untouched(self):
- """Did the migration leave users with only an inactive note untouched?"""
- user_id, infraction_history = self.created_infractions["one inactive note"]
- inactive_note = infraction_history[0]
- self.assertTrue(
- self.infraction_model.objects.filter(**model_to_dict(inactive_note)).exists()
- )
-
- def test_migration_only_touched_active_field_of_active_note(self):
- """Does the migration only change the `active` field?"""
- user_id, infraction_history = self.created_infractions["one active note"]
- note = model_to_dict(infraction_history[0])
- note['active'] = False
- self.assertTrue(
- self.infraction_model.objects.filter(**note).exists()
- )
-
- def test_migration_only_touched_active_field_of_active_note_left_inactive_untouched(self):
- """Does the migration only change the `active` field of active notes?"""
- user_id, infraction_history = self.created_infractions["one active and one inactive note"]
- for note in infraction_history:
- with self.subTest(active=note.active):
- note = model_to_dict(note)
- note['active'] = False
- self.assertTrue(
- self.infraction_model.objects.filter(**note).exists()
- )
-
- def test_migration_migrates_all_nonactive_types_to_inactive(self):
- """Do we set the `active` field of all non-active infractions to `False`?"""
- user_id, infraction_history = self.created_infractions["active note, kick, warning"]
- self.assertFalse(
- self.infraction_model.objects.filter(user__id=user_id, active=True).exists()
- )
-
- def test_migration_leaves_user_with_one_active_ban_untouched(self):
- """Do we leave a user with one active and one inactive ban untouched?"""
- user_id, infraction_history = self.created_infractions["one inactive and one active ban"]
- for infraction in infraction_history:
- with self.subTest(active=infraction.active):
- self.assertTrue(
- self.infraction_model.objects.filter(**model_to_dict(infraction)).exists()
- )
-
- def test_migration_turns_double_active_perm_ban_into_single_active_perm_ban(self):
- """Does the migration turn two active permanent bans into one active permanent ban?"""
- user_id, infraction_history = self.created_infractions["two active perm bans"]
- active_count = self.infraction_model.objects.filter(user__id=user_id, active=True).count()
- self.assertEqual(active_count, 1)
-
- def test_migration_leaves_temporary_ban_with_longest_duration_active(self):
- """Does the migration turn two active permanent bans into one active permanent ban?"""
- user_id, infraction_history = self.created_infractions["multiple active temp bans"]
- active_ban = self.infraction_model.objects.get(user__id=user_id, active=True)
- self.assertEqual(active_ban.expires_at, infraction_history[2].expires_at)
-
- def test_migration_leaves_permanent_ban_active(self):
- """Does the migration leave the permanent ban active?"""
- user_id, infraction_history = self.created_infractions["active perm, two active temp bans"]
- active_ban = self.infraction_model.objects.get(user__id=user_id, active=True)
- self.assertIsNone(active_ban.expires_at)
-
- def test_migration_leaves_longest_temp_ban_active_with_inactive_permanent_ban(self):
- """Does the longest temp ban stay active, even with an inactive perm ban present?"""
- user_id, infraction_history = self.created_infractions[
- "one inactive perm ban, two active temp bans"
- ]
- active_ban = self.infraction_model.objects.get(user__id=user_id, active=True)
- self.assertEqual(active_ban.expires_at, infraction_history[0].expires_at)
-
- def test_migration_leaves_all_active_types_active_if_one_of_each_exists(self):
- """Do all active infractions stay active if only one of each is present?"""
- user_id, infraction_history = self.created_infractions["active ban, mute, and superstar"]
- active_count = self.infraction_model.objects.filter(user__id=user_id, active=True).count()
- self.assertEqual(active_count, 4)
-
- def test_migration_reduces_all_active_types_to_a_single_active_infraction(self):
- """Do we reduce all of the infraction types to one active infraction?"""
- user_id, infraction_history = self.created_infractions["multiple active bans, mutes, stars"]
- active_infractions = self.infraction_model.objects.filter(user__id=user_id, active=True)
- self.assertEqual(len(active_infractions), 4)
- types_observed = [infraction.type for infraction in active_infractions]
-
- for infraction_type in ('ban', 'mute', 'superstar', 'watch'):
- with self.subTest(type=infraction_type):
- self.assertIn(infraction_type, types_observed)
diff --git a/pydis_site/apps/api/tests/migrations/test_base.py b/pydis_site/apps/api/tests/migrations/test_base.py
deleted file mode 100644
index f69bc92c..00000000
--- a/pydis_site/apps/api/tests/migrations/test_base.py
+++ /dev/null
@@ -1,135 +0,0 @@
-import logging
-from unittest.mock import call, patch
-
-from django.db.migrations.loader import MigrationLoader
-from django.test import TestCase
-
-from .base import MigrationsTestCase, connection
-
-log = logging.getLogger(__name__)
-
-
-class SpanishInquisition(MigrationsTestCase):
- app = "api"
- migration_prior = "scragly"
- migration_target = "kosa"
-
-
-@patch("pydis_site.apps.api.tests.migrations.base.MigrationExecutor")
-class MigrationsTestCaseNoSideEffectsTests(TestCase):
- """Tests the MigrationTestCase class with actual migration side effects disabled."""
-
- def setUp(self):
- """Set up an instance of MigrationsTestCase for use in tests."""
- self.test_case = SpanishInquisition()
-
- def test_missing_app_class_raises_value_error(self, _migration_executor):
- """A MigrationsTestCase subclass should set the class-attribute `app`."""
- class Spam(MigrationsTestCase):
- pass
-
- spam = Spam()
- with self.assertRaises(ValueError, msg="The `app` attribute was not set."):
- spam.setUpTestData()
-
- def test_missing_migration_class_attributes_raise_value_error(self, _migration_executor):
- """A MigrationsTestCase subclass should set both `migration_prior` and `migration_target`"""
- class Eggs(MigrationsTestCase):
- app = "api"
- migration_target = "lemon"
-
- class Bacon(MigrationsTestCase):
- app = "api"
- migration_prior = "mark"
-
- instances = (Eggs(), Bacon())
-
- exception_message = "Both ` migration_prior` and `migration_target` need to be set."
- for instance in instances:
- with self.subTest(
- migration_prior=instance.migration_prior,
- migration_target=instance.migration_target,
- ):
- with self.assertRaises(ValueError, msg=exception_message):
- instance.setUpTestData()
-
- @patch(f"{__name__}.SpanishInquisition.setUpMigrationData")
- @patch(f"{__name__}.SpanishInquisition.setUpPostMigrationData")
- def test_migration_data_hooks_are_called_once(self, pre_hook, post_hook, _migration_executor):
- """The `setUpMigrationData` and `setUpPostMigrationData` hooks should be called once."""
- self.test_case.setUpTestData()
- for hook in (pre_hook, post_hook):
- with self.subTest(hook=repr(hook)):
- hook.assert_called_once()
-
- def test_migration_executor_is_instantiated_twice(self, migration_executor):
- """The `MigrationExecutor` should be instantiated with the database connection twice."""
- self.test_case.setUpTestData()
-
- expected_args = [call(connection), call(connection)]
- self.assertEqual(migration_executor.call_args_list, expected_args)
-
- def test_project_state_is_loaded_for_correct_migration_files_twice(self, migration_executor):
- """The `project_state` should first be loaded with `migrate_from`, then `migrate_to`."""
- self.test_case.setUpTestData()
-
- expected_args = [call(self.test_case.migrate_from), call(self.test_case.migrate_to)]
- self.assertEqual(migration_executor().loader.project_state.call_args_list, expected_args)
-
- def test_loader_build_graph_gets_called_once(self, migration_executor):
- """We should rebuild the migration graph before applying the second set of migrations."""
- self.test_case.setUpTestData()
-
- migration_executor().loader.build_graph.assert_called_once()
-
- def test_migration_executor_migrate_method_is_called_correctly_twice(self, migration_executor):
- """The migrate method of the executor should be called twice with the correct arguments."""
- self.test_case.setUpTestData()
-
- self.assertEqual(migration_executor().migrate.call_count, 2)
- calls = [call([('api', 'scragly')]), call([('api', 'kosa')])]
- migration_executor().migrate.assert_has_calls(calls)
-
-
-class LifeOfBrian(MigrationsTestCase):
- app = "api"
- migration_prior = "0046_reminder_jump_url"
- migration_target = "0048_add_infractions_unique_constraints_active"
-
- @classmethod
- def log_last_migration(cls):
- """Parses the applied migrations dictionary to log the last applied migration."""
- loader = MigrationLoader(connection)
- api_migrations = [
- migration for app, migration in loader.applied_migrations if app == cls.app
- ]
- last_migration = max(api_migrations, key=lambda name: int(name[:4]))
- log.info(f"The last applied migration: {last_migration}")
-
- @classmethod
- def setUpMigrationData(cls, apps):
- """Method that logs the last applied migration at this point."""
- cls.log_last_migration()
-
- @classmethod
- def setUpPostMigrationData(cls, apps):
- """Method that logs the last applied migration at this point."""
- cls.log_last_migration()
-
-
-class MigrationsTestCaseMigrationTest(TestCase):
- """Tests if `MigrationsTestCase` travels to the right points in the migration history."""
-
- def test_migrations_test_case_travels_to_correct_migrations_in_history(self):
- """The test case should first revert to `migration_prior`, then go to `migration_target`."""
- brian = LifeOfBrian()
-
- with self.assertLogs(log, level=logging.INFO) as logs:
- brian.setUpTestData()
-
- self.assertEqual(len(logs.records), 2)
-
- for time_point, record in zip(("migration_prior", "migration_target"), logs.records):
- with self.subTest(time_point=time_point):
- message = f"The last applied migration: {getattr(brian, time_point)}"
- self.assertEqual(record.getMessage(), message)
diff --git a/pydis_site/apps/api/tests/test_bumped_threads.py b/pydis_site/apps/api/tests/test_bumped_threads.py
new file mode 100644
index 00000000..2e3892c7
--- /dev/null
+++ b/pydis_site/apps/api/tests/test_bumped_threads.py
@@ -0,0 +1,63 @@
+from django.urls import reverse
+
+from .base import AuthenticatedAPITestCase
+from pydis_site.apps.api.models import BumpedThread
+
+
+class UnauthedBumpedThreadAPITests(AuthenticatedAPITestCase):
+ def setUp(self):
+ super().setUp()
+ self.client.force_authenticate(user=None)
+
+ def test_detail_lookup_returns_401(self):
+ url = reverse('api:bot:bumpedthread-detail', args=(1,))
+ response = self.client.get(url)
+
+ self.assertEqual(response.status_code, 401)
+
+ def test_list_returns_401(self):
+ url = reverse('api:bot:bumpedthread-list')
+ response = self.client.get(url)
+
+ self.assertEqual(response.status_code, 401)
+
+ def test_create_returns_401(self):
+ url = reverse('api:bot:bumpedthread-list')
+ response = self.client.post(url, {"thread_id": 3})
+
+ self.assertEqual(response.status_code, 401)
+
+ def test_delete_returns_401(self):
+ url = reverse('api:bot:bumpedthread-detail', args=(1,))
+ response = self.client.delete(url)
+
+ self.assertEqual(response.status_code, 401)
+
+
+class BumpedThreadAPITests(AuthenticatedAPITestCase):
+ @classmethod
+ def setUpTestData(cls):
+ cls.thread1 = BumpedThread.objects.create(
+ thread_id=1234,
+ )
+
+ def test_returns_bumped_threads_as_flat_list(self):
+ url = reverse('api:bot:bumpedthread-list')
+
+ response = self.client.get(url)
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.json(), [1234])
+
+ def test_returns_204_for_existing_data(self):
+ url = reverse('api:bot:bumpedthread-detail', args=(1234,))
+
+ response = self.client.get(url)
+ self.assertEqual(response.status_code, 204)
+ self.assertEqual(response.content, b"")
+
+ def test_returns_404_for_non_existing_data(self):
+ url = reverse('api:bot:bumpedthread-detail', args=(42,))
+
+ response = self.client.get(url)
+ self.assertEqual(response.status_code, 404)
+ self.assertEqual(response.json(), {"detail": "Not found."})
diff --git a/pydis_site/apps/api/tests/test_deleted_messages.py b/pydis_site/apps/api/tests/test_deleted_messages.py
index 1eb535d8..d5501202 100644
--- a/pydis_site/apps/api/tests/test_deleted_messages.py
+++ b/pydis_site/apps/api/tests/test_deleted_messages.py
@@ -1,10 +1,9 @@
-from datetime import datetime
+from datetime import UTC, datetime
from django.urls import reverse
-from django.utils import timezone
from .base import AuthenticatedAPITestCase
-from ..models import MessageDeletionContext, User
+from pydis_site.apps.api.models import MessageDeletionContext, User
class DeletedMessagesWithoutActorTests(AuthenticatedAPITestCase):
@@ -18,7 +17,7 @@ class DeletedMessagesWithoutActorTests(AuthenticatedAPITestCase):
cls.data = {
'actor': None,
- 'creation': datetime.utcnow().isoformat(),
+ 'creation': datetime.now(tz=UTC).isoformat(),
'deletedmessage_set': [
{
'author': cls.author.id,
@@ -58,7 +57,7 @@ class DeletedMessagesWithActorTests(AuthenticatedAPITestCase):
cls.data = {
'actor': cls.actor.id,
- 'creation': datetime.utcnow().isoformat(),
+ 'creation': datetime.now(tz=UTC).isoformat(),
'deletedmessage_set': [
{
'author': cls.author.id,
@@ -90,7 +89,7 @@ class DeletedMessagesLogURLTests(AuthenticatedAPITestCase):
cls.deletion_context = MessageDeletionContext.objects.create(
actor=cls.actor,
- creation=timezone.now()
+ creation=datetime.now(tz=UTC),
)
def test_valid_log_url(self):
diff --git a/pydis_site/apps/api/tests/test_documentation_links.py b/pydis_site/apps/api/tests/test_documentation_links.py
index 4e238cbb..f4a332cb 100644
--- a/pydis_site/apps/api/tests/test_documentation_links.py
+++ b/pydis_site/apps/api/tests/test_documentation_links.py
@@ -1,7 +1,7 @@
from django.urls import reverse
from .base import AuthenticatedAPITestCase
-from ..models import DocumentationLink
+from pydis_site.apps.api.models import DocumentationLink
class UnauthedDocumentationLinkAPITests(AuthenticatedAPITestCase):
diff --git a/pydis_site/apps/api/tests/test_filterlists.py b/pydis_site/apps/api/tests/test_filterlists.py
deleted file mode 100644
index 5a5bca60..00000000
--- a/pydis_site/apps/api/tests/test_filterlists.py
+++ /dev/null
@@ -1,122 +0,0 @@
-from django.urls import reverse
-
-from pydis_site.apps.api.models import FilterList
-from pydis_site.apps.api.tests.base import AuthenticatedAPITestCase
-
-URL = reverse('api:bot:filterlist-list')
-JPEG_ALLOWLIST = {
- "type": 'FILE_FORMAT',
- "allowed": True,
- "content": ".jpeg",
-}
-PNG_ALLOWLIST = {
- "type": 'FILE_FORMAT',
- "allowed": True,
- "content": ".png",
-}
-
-
-class UnauthenticatedTests(AuthenticatedAPITestCase):
- def setUp(self):
- super().setUp()
- self.client.force_authenticate(user=None)
-
- def test_cannot_read_allowedlist_list(self):
- response = self.client.get(URL)
-
- self.assertEqual(response.status_code, 401)
-
-
-class EmptyDatabaseTests(AuthenticatedAPITestCase):
- @classmethod
- def setUpTestData(cls):
- FilterList.objects.all().delete()
-
- def test_returns_empty_object(self):
- response = self.client.get(URL)
-
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.json(), [])
-
-
-class FetchTests(AuthenticatedAPITestCase):
- @classmethod
- def setUpTestData(cls):
- FilterList.objects.all().delete()
- cls.jpeg_format = FilterList.objects.create(**JPEG_ALLOWLIST)
- cls.png_format = FilterList.objects.create(**PNG_ALLOWLIST)
-
- def test_returns_name_in_list(self):
- response = self.client.get(URL)
-
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.json()[0]["content"], self.jpeg_format.content)
- self.assertEqual(response.json()[1]["content"], self.png_format.content)
-
- def test_returns_single_item_by_id(self):
- response = self.client.get(f'{URL}/{self.jpeg_format.id}')
-
- self.assertEqual(response.status_code, 200)
- self.assertEqual(response.json().get("content"), self.jpeg_format.content)
-
- def test_returns_filter_list_types(self):
- response = self.client.get(f'{URL}/get-types')
-
- self.assertEqual(response.status_code, 200)
- for api_type, model_type in zip(response.json(), FilterList.FilterListType.choices):
- self.assertEquals(api_type[0], model_type[0])
- self.assertEquals(api_type[1], model_type[1])
-
-
-class CreationTests(AuthenticatedAPITestCase):
- @classmethod
- def setUpTestData(cls):
- FilterList.objects.all().delete()
-
- def test_returns_400_for_missing_params(self):
- no_type_json = {
- "allowed": True,
- "content": ".jpeg"
- }
- no_allowed_json = {
- "type": "FILE_FORMAT",
- "content": ".jpeg"
- }
- no_content_json = {
- "allowed": True,
- "type": "FILE_FORMAT"
- }
- cases = [{}, no_type_json, no_allowed_json, no_content_json]
-
- for case in cases:
- with self.subTest(case=case):
- response = self.client.post(URL, data=case)
- self.assertEqual(response.status_code, 400)
-
- def test_returns_201_for_successful_creation(self):
- response = self.client.post(URL, data=JPEG_ALLOWLIST)
- self.assertEqual(response.status_code, 201)
-
- def test_returns_400_for_duplicate_creation(self):
- self.client.post(URL, data=JPEG_ALLOWLIST)
- response = self.client.post(URL, data=JPEG_ALLOWLIST)
- self.assertEqual(response.status_code, 400)
-
-
-class DeletionTests(AuthenticatedAPITestCase):
- @classmethod
- def setUpTestData(cls):
- FilterList.objects.all().delete()
- cls.jpeg_format = FilterList.objects.create(**JPEG_ALLOWLIST)
- cls.png_format = FilterList.objects.create(**PNG_ALLOWLIST)
-
- def test_deleting_unknown_id_returns_404(self):
- response = self.client.delete(f"{URL}/200")
- self.assertEqual(response.status_code, 404)
-
- def test_deleting_known_id_returns_204(self):
- response = self.client.delete(f"{URL}/{self.jpeg_format.id}")
- self.assertEqual(response.status_code, 204)
-
- response = self.client.get(f"{URL}/{self.jpeg_format.id}")
- self.assertNotIn(self.png_format.content, response.json())
diff --git a/pydis_site/apps/api/tests/test_filters.py b/pydis_site/apps/api/tests/test_filters.py
new file mode 100644
index 00000000..4cef1c8f
--- /dev/null
+++ b/pydis_site/apps/api/tests/test_filters.py
@@ -0,0 +1,352 @@
+import contextlib
+from dataclasses import dataclass
+from datetime import timedelta
+from typing import Any
+
+from django.db.models import Model
+from django.urls import reverse
+
+from pydis_site.apps.api.models.bot.filters import Filter, FilterList
+from pydis_site.apps.api.tests.base import AuthenticatedAPITestCase
+
+
+@dataclass()
+class TestSequence:
+ model: type[Model]
+ route: str
+ object: dict[str, Any]
+ ignored_fields: tuple[str, ...] = ()
+
+ def url(self, detail: bool = False) -> str:
+ return reverse(f'api:bot:{self.route}-{"detail" if detail else "list"}')
+
+
+FK_FIELDS: dict[type[Model], tuple[str, ...]] = {
+ FilterList: (),
+ Filter: ("filter_list",),
+}
+
+
+def get_test_sequences() -> dict[str, TestSequence]:
+ filter_list1_deny_dict = {
+ "name": "testname",
+ "list_type": 0,
+ "guild_pings": [],
+ "filter_dm": True,
+ "dm_pings": [],
+ "remove_context": False,
+ "bypass_roles": [],
+ "enabled": True,
+ "dm_content": "",
+ "dm_embed": "",
+ "infraction_type": "NONE",
+ "infraction_reason": "",
+ "infraction_duration": timedelta(seconds=0),
+ "infraction_channel": 0,
+ "disabled_channels": [],
+ "disabled_categories": [],
+ "enabled_channels": [],
+ "enabled_categories": [],
+ "send_alert": True
+ }
+ filter_list1_allow_dict = filter_list1_deny_dict.copy()
+ filter_list1_allow_dict["list_type"] = 1
+ filter_list1_allow = FilterList(**filter_list1_allow_dict)
+
+ return {
+ "filter_list1": TestSequence(
+ FilterList,
+ "filterlist",
+ filter_list1_deny_dict,
+ ignored_fields=("filters", "created_at", "updated_at")
+ ),
+ "filter_list2": TestSequence(
+ FilterList,
+ "filterlist",
+ {
+ "name": "testname2",
+ "list_type": 1,
+ "guild_pings": ["Moderators"],
+ "filter_dm": False,
+ "dm_pings": ["here"],
+ "remove_context": True,
+ "bypass_roles": ["123456"],
+ "enabled": False,
+ "dm_content": "testing testing",
+ "dm_embed": "one two three",
+ "infraction_type": "TIMEOUT",
+ "infraction_reason": "stop testing",
+ "infraction_duration": timedelta(seconds=10.5),
+ "infraction_channel": 123,
+ "disabled_channels": ["python-general"],
+ "disabled_categories": ["CODE JAM"],
+ "enabled_channels": ["mighty-mice"],
+ "enabled_categories": ["Lobby"],
+ "send_alert": False
+ },
+ ignored_fields=("filters", "created_at", "updated_at")
+ ),
+ "filter": TestSequence(
+ Filter,
+ "filter",
+ {
+ "content": "bad word",
+ "description": "This is a really bad word.",
+ "additional_settings": "{'hi': 'there'}",
+ "guild_pings": None,
+ "filter_dm": None,
+ "dm_pings": None,
+ "remove_context": None,
+ "bypass_roles": None,
+ "enabled": None,
+ "dm_content": None,
+ "dm_embed": None,
+ "infraction_type": None,
+ "infraction_reason": None,
+ "infraction_duration": None,
+ "infraction_channel": None,
+ "disabled_channels": None,
+ "disabled_categories": None,
+ "enabled_channels": None,
+ "enabled_categories": None,
+ "send_alert": None,
+ "filter_list": filter_list1_allow
+ },
+ ignored_fields=("created_at", "updated_at")
+ ),
+ }
+
+
+def save_nested_objects(object_: Model, save_root: bool = True) -> None:
+ for field in FK_FIELDS.get(object_.__class__, ()):
+ value = getattr(object_, field)
+ save_nested_objects(value)
+
+ if save_root:
+ object_.save()
+
+
+def clean_test_json(json: dict) -> dict:
+ for key, value in json.items():
+ if isinstance(value, Model):
+ json[key] = value.id
+ elif isinstance(value, timedelta):
+ json[key] = str(value.total_seconds())
+
+ return json
+
+
+def clean_api_json(json: dict, sequence: TestSequence) -> dict:
+ for field in sequence.ignored_fields + ("id",):
+ with contextlib.suppress(KeyError):
+ del json[field]
+
+ return json
+
+
+def flatten_settings(json: dict) -> dict:
+ settings = json.pop("settings", {})
+ flattened_settings = {}
+ for entry, value in settings.items():
+ if isinstance(value, dict):
+ flattened_settings.update(value)
+ else:
+ flattened_settings[entry] = value
+
+ json.update(flattened_settings)
+
+ return json
+
+
+class GenericFilterTests(AuthenticatedAPITestCase):
+
+ def test_cannot_read_unauthenticated(self) -> None:
+ for name, sequence in get_test_sequences().items():
+ with self.subTest(name=name):
+ self.client.force_authenticate(user=None)
+
+ response = self.client.get(sequence.url())
+ self.assertEqual(response.status_code, 401)
+
+ def test_empty_database(self) -> None:
+ for name, sequence in get_test_sequences().items():
+ with self.subTest(name=name):
+ sequence.model.objects.all().delete()
+
+ response = self.client.get(sequence.url())
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(response.json(), [])
+
+ def test_fetch(self) -> None:
+ for name, sequence in get_test_sequences().items():
+ with self.subTest(name=name):
+ sequence.model.objects.all().delete()
+
+ save_nested_objects(sequence.model(**sequence.object))
+
+ response = self.client.get(sequence.url())
+ self.assertDictEqual(
+ clean_test_json(sequence.object),
+ clean_api_json(flatten_settings(response.json()[0]), sequence)
+ )
+
+ def test_fetch_by_id(self) -> None:
+ for name, sequence in get_test_sequences().items():
+ with self.subTest(name=name):
+ sequence.model.objects.all().delete()
+
+ saved = sequence.model(**sequence.object)
+ save_nested_objects(saved)
+
+ response = self.client.get(f"{sequence.url()}/{saved.id}")
+ self.assertDictEqual(
+ clean_test_json(sequence.object),
+ clean_api_json(flatten_settings(response.json()), sequence)
+ )
+
+ def test_fetch_non_existing(self) -> None:
+ for name, sequence in get_test_sequences().items():
+ with self.subTest(name=name):
+ sequence.model.objects.all().delete()
+
+ response = self.client.get(f"{sequence.url()}/42")
+ self.assertEqual(response.status_code, 404)
+ self.assertDictEqual(response.json(), {'detail': 'Not found.'})
+
+ def test_creation(self) -> None:
+ for name, sequence in get_test_sequences().items():
+ with self.subTest(name=name):
+ sequence.model.objects.all().delete()
+
+ save_nested_objects(sequence.model(**sequence.object), False)
+ data = clean_test_json(sequence.object.copy())
+ response = self.client.post(sequence.url(), data=data)
+
+ self.assertEqual(response.status_code, 201)
+ self.assertDictEqual(
+ clean_api_json(flatten_settings(response.json()), sequence),
+ clean_test_json(sequence.object)
+ )
+
+ def test_creation_missing_field(self) -> None:
+ for name, sequence in get_test_sequences().items():
+ ignored_fields = sequence.ignored_fields + ("id", "additional_settings")
+ with self.subTest(name=name):
+ saved = sequence.model(**sequence.object)
+ save_nested_objects(saved)
+ data = clean_test_json(sequence.object.copy())
+
+ for field in sequence.model._meta.get_fields():
+ with self.subTest(field=field):
+ if field.null or field.name in ignored_fields:
+ continue
+
+ test_data = data.copy()
+ del test_data[field.name]
+
+ response = self.client.post(sequence.url(), data=test_data)
+ self.assertEqual(response.status_code, 400)
+
+ def test_deletion(self) -> None:
+ for name, sequence in get_test_sequences().items():
+ with self.subTest(name=name):
+ saved = sequence.model(**sequence.object)
+ save_nested_objects(saved)
+
+ response = self.client.delete(f"{sequence.url()}/{saved.id}")
+ self.assertEqual(response.status_code, 204)
+
+ def test_deletion_non_existing(self) -> None:
+ for name, sequence in get_test_sequences().items():
+ with self.subTest(name=name):
+ sequence.model.objects.all().delete()
+
+ response = self.client.delete(f"{sequence.url()}/42")
+ self.assertEqual(response.status_code, 404)
+
+
+class FilterValidationTests(AuthenticatedAPITestCase):
+
+ def test_filter_validation(self) -> None:
+ test_sequences = get_test_sequences()
+ base_filter = test_sequences["filter"]
+ base_filter_list = test_sequences["filter_list1"]
+ cases = (
+ ({"infraction_reason": "hi"}, {}, 400),
+ ({"infraction_duration": timedelta(seconds=10)}, {}, 400),
+ ({"infraction_reason": "hi"}, {"infraction_type": "NOTE"}, 200),
+ ({"infraction_type": "TIMEOUT", "infraction_duration": timedelta(days=30)}, {}, 400),
+ ({"infraction_duration": timedelta(seconds=10)}, {"infraction_type": "TIMEOUT"}, 200),
+ ({"enabled_channels": ["admins"]}, {}, 200),
+ ({"disabled_channels": ["123"]}, {}, 200),
+ ({"enabled_categories": ["CODE JAM"]}, {}, 200),
+ ({"disabled_categories": ["CODE JAM"]}, {}, 200),
+ ({"enabled_channels": ["admins"], "disabled_channels": ["123", "admins"]}, {}, 400),
+ ({"enabled_categories": ["admins"], "disabled_categories": ["123", "admins"]}, {}, 400),
+ ({"enabled_channels": ["admins"]}, {"disabled_channels": ["123", "admins"]}, 400),
+ ({"enabled_categories": ["admins"]}, {"disabled_categories": ["123", "admins"]}, 400),
+ )
+
+ for filter_settings, filter_list_settings, response_code in cases:
+ with self.subTest(
+ f_settings=filter_settings, fl_settings=filter_list_settings, response=response_code
+ ):
+ base_filter.model.objects.all().delete()
+ base_filter_list.model.objects.all().delete()
+
+ case_filter_dict = base_filter.object.copy()
+ case_fl_dict = base_filter_list.object.copy()
+ case_fl_dict.update(filter_list_settings)
+
+ case_fl = base_filter_list.model(**case_fl_dict)
+ case_filter_dict["filter_list"] = case_fl
+ case_filter = base_filter.model(**case_filter_dict)
+ save_nested_objects(case_filter)
+
+ filter_settings["filter_list"] = case_fl
+ response = self.client.patch(
+ f"{base_filter.url()}/{case_filter.id}", data=clean_test_json(filter_settings)
+ )
+ self.assertEqual(response.status_code, response_code)
+
+ def test_filter_list_validation(self) -> None:
+ test_sequences = get_test_sequences()
+ base_filter_list = test_sequences["filter_list1"]
+ cases = (
+ ({"infraction_reason": "hi"}, 400),
+ ({"infraction_duration": timedelta(seconds=10)}, 400),
+ ({"infraction_type": "TIMEOUT", "infraction_duration": timedelta(days=30)}, 400),
+ ({"infraction_reason": "hi", "infraction_type": "NOTE"}, 200),
+ ({"infraction_duration": timedelta(seconds=10), "infraction_type": "TIMEOUT"}, 200),
+ ({"enabled_channels": ["admins"]}, 200), ({"disabled_channels": ["123"]}, 200),
+ ({"enabled_categories": ["CODE JAM"]}, 200),
+ ({"disabled_categories": ["CODE JAM"]}, 200),
+ ({"enabled_channels": ["admins"], "disabled_channels": ["123", "admins"]}, 400),
+ ({"enabled_categories": ["admins"], "disabled_categories": ["123", "admins"]}, 400),
+ )
+
+ for filter_list_settings, response_code in cases:
+ with self.subTest(fl_settings=filter_list_settings, response=response_code):
+ base_filter_list.model.objects.all().delete()
+
+ case_fl_dict = base_filter_list.object.copy()
+ case_fl = base_filter_list.model(**case_fl_dict)
+ save_nested_objects(case_fl)
+
+ response = self.client.patch(
+ f"{base_filter_list.url()}/{case_fl.id}",
+ data=clean_test_json(filter_list_settings)
+ )
+ self.assertEqual(response.status_code, response_code)
+
+ def test_filter_unique_constraint(self) -> None:
+ test_filter = get_test_sequences()["filter"]
+ test_filter.model.objects.all().delete()
+ test_filter_object = test_filter.model(**test_filter.object)
+ save_nested_objects(test_filter_object, False)
+
+ response = self.client.post(test_filter.url(), data=clean_test_json(test_filter.object))
+ self.assertEqual(response.status_code, 201)
+
+ response = self.client.post(test_filter.url(), data=clean_test_json(test_filter.object))
+ self.assertEqual(response.status_code, 400)
diff --git a/pydis_site/apps/api/tests/test_github_utils.py b/pydis_site/apps/api/tests/test_github_utils.py
new file mode 100644
index 00000000..d36111c9
--- /dev/null
+++ b/pydis_site/apps/api/tests/test_github_utils.py
@@ -0,0 +1,289 @@
+import dataclasses
+import datetime
+import typing
+import unittest
+from unittest import mock
+
+import django.test
+import httpx
+import jwt
+import rest_framework.response
+import rest_framework.test
+from django.urls import reverse
+
+from pydis_site import settings
+from pydis_site.apps.api import github_utils
+
+
+class GeneralUtilityTests(unittest.TestCase):
+ """Test the utility methods which do not fit in another class."""
+
+ def test_token_generation(self):
+ """Test that the a valid JWT token is generated."""
+ def encode(payload: dict, _: str, algorithm: str, *args, **kwargs) -> str:
+ """
+ Intercept the encode method.
+
+ The result is encoded with an algorithm which does not require a PEM key, as it may
+ not be available in testing environments.
+ """
+ self.assertEqual("RS256", algorithm, "The GitHub App JWT must be signed using RS256.")
+ return original_encode(
+ payload, "secret-encoding-key", *args, algorithm="HS256", **kwargs
+ )
+
+ original_encode = jwt.encode
+ with mock.patch("jwt.encode", new=encode):
+ token = github_utils.generate_token()
+ decoded = jwt.decode(token, "secret-encoding-key", algorithms=["HS256"])
+
+ delta = datetime.timedelta(minutes=10)
+ self.assertAlmostEqual(decoded["exp"] - decoded["iat"], delta.total_seconds())
+ then = datetime.datetime.now(tz=datetime.UTC) + delta
+ self.assertLess(decoded["exp"], then.timestamp())
+
+
+class CheckRunTests(unittest.TestCase):
+ """Tests the check_run_status utility."""
+
+ run_kwargs: typing.Mapping = {
+ "name": "run_name",
+ "head_sha": "sha",
+ "status": "completed",
+ "conclusion": "success",
+ "created_at": datetime.datetime.now(tz=datetime.UTC).strftime(settings.GITHUB_TIMESTAMP_FORMAT),
+ "artifacts_url": "url",
+ }
+
+ def test_completed_run(self):
+ """Test that an already completed run returns the correct URL."""
+ final_url = "some_url_string_1234"
+
+ kwargs = dict(self.run_kwargs, artifacts_url=final_url)
+ result = github_utils.check_run_status(github_utils.WorkflowRun(**kwargs))
+ self.assertEqual(final_url, result)
+
+ def test_pending_run(self):
+ """Test that a pending run raises the proper exception."""
+ kwargs = dict(self.run_kwargs, status="pending")
+ with self.assertRaises(github_utils.RunPendingError):
+ github_utils.check_run_status(github_utils.WorkflowRun(**kwargs))
+
+ def test_timeout_error(self):
+ """Test that a timeout is declared after a certain duration."""
+ kwargs = dict(self.run_kwargs, status="pending")
+ # Set the creation time to well before the MAX_RUN_TIME
+ # to guarantee the right conclusion
+ kwargs["created_at"] = (
+ datetime.datetime.now(tz=datetime.UTC)
+ - github_utils.MAX_RUN_TIME - datetime.timedelta(minutes=10)
+ ).strftime(settings.GITHUB_TIMESTAMP_FORMAT)
+
+ with self.assertRaises(github_utils.RunTimeoutError):
+ github_utils.check_run_status(github_utils.WorkflowRun(**kwargs))
+
+ def test_failed_run(self):
+ """Test that a failed run raises the proper exception."""
+ kwargs = dict(self.run_kwargs, conclusion="failed")
+ with self.assertRaises(github_utils.ActionFailedError):
+ github_utils.check_run_status(github_utils.WorkflowRun(**kwargs))
+
+
+def get_response_authorize(_: httpx.Client, request: httpx.Request, **__) -> httpx.Response:
+ """
+ Helper method for the authorize tests.
+
+ Requests are intercepted before being sent out, and the appropriate responses are returned.
+ """
+ path = request.url.path
+ auth = request.headers.get("Authorization")
+
+ if request.method == "GET":
+ if path == "/app/installations":
+ if auth == "bearer JWT initial token":
+ return httpx.Response(200, request=request, json=[{
+ "account": {"login": "VALID_OWNER"},
+ "access_tokens_url": "https://example.com/ACCESS_TOKEN_URL"
+ }])
+ return httpx.Response(
+ 401, json={"error": "auth app/installations"}, request=request
+ )
+
+ elif path == "/installation/repositories": # noqa: RET505
+ if auth == "bearer app access token":
+ return httpx.Response(200, request=request, json={
+ "repositories": [{
+ "name": "VALID_REPO"
+ }]
+ })
+ return httpx.Response( # pragma: no cover
+ 401, json={"error": "auth installation/repositories"}, request=request
+ )
+
+ elif request.method == "POST": # noqa: RET505
+ if path == "/ACCESS_TOKEN_URL":
+ if auth == "bearer JWT initial token":
+ return httpx.Response(200, request=request, json={"token": "app access token"})
+ return httpx.Response(401, json={"error": "auth access_token"}, request=request) # pragma: no cover
+
+ # Reaching this point means something has gone wrong
+ return httpx.Response(500, request=request) # pragma: no cover
+
+
[email protected]("httpx.Client.send", new=get_response_authorize)
[email protected](github_utils, "generate_token", new=mock.Mock(return_value="JWT initial token"))
+class AuthorizeTests(unittest.TestCase):
+ """Test the authorize utility."""
+
+ def test_invalid_apps_auth(self):
+ """Test that an exception is raised if authorization was attempted with an invalid token."""
+ with mock.patch.object(github_utils, "generate_token", return_value="Invalid token"): # noqa: SIM117
+ with self.assertRaises(httpx.HTTPStatusError) as error:
+ github_utils.authorize("VALID_OWNER", "VALID_REPO")
+
+ exception: httpx.HTTPStatusError = error.exception
+ self.assertEqual(401, exception.response.status_code)
+ self.assertEqual("auth app/installations", exception.response.json()["error"])
+
+ def test_missing_repo(self):
+ """Test that an exception is raised when the selected owner or repo are not available."""
+ with self.assertRaises(github_utils.NotFoundError):
+ github_utils.authorize("INVALID_OWNER", "VALID_REPO")
+ with self.assertRaises(github_utils.NotFoundError):
+ github_utils.authorize("VALID_OWNER", "INVALID_REPO")
+
+ def test_valid_authorization(self):
+ """Test that an accessible repository can be accessed."""
+ client = github_utils.authorize("VALID_OWNER", "VALID_REPO")
+ self.assertEqual("bearer app access token", client.headers.get("Authorization"))
+
+
+class ArtifactFetcherTests(unittest.TestCase):
+ """Test the get_artifact utility."""
+
+ @staticmethod
+ def get_response_get_artifact(request: httpx.Request, **_) -> httpx.Response:
+ """
+ Helper method for the get_artifact tests.
+
+ Requests are intercepted before being sent out, and the appropriate responses are returned.
+ """
+ path = request.url.path
+
+ if "force_error" in path:
+ return httpx.Response(404, request=request)
+
+ if request.method == "GET":
+ if path == "/repos/owner/repo/actions/runs":
+ run = github_utils.WorkflowRun(
+ name="action_name",
+ head_sha="action_sha",
+ created_at=(
+ datetime.datetime
+ .now(tz=datetime.UTC)
+ .strftime(settings.GITHUB_TIMESTAMP_FORMAT)
+ ),
+ status="completed",
+ conclusion="success",
+ artifacts_url="artifacts_url"
+ )
+ return httpx.Response(
+ 200, request=request, json={"workflow_runs": [dataclasses.asdict(run)]}
+ )
+ elif path == "/artifact_url": # noqa: RET505
+ return httpx.Response(
+ 200, request=request, json={"artifacts": [{
+ "name": "artifact_name",
+ "archive_download_url": "artifact_download_url"
+ }]}
+ )
+ elif path == "/artifact_download_url":
+ response = httpx.Response(302, request=request)
+ response.next_request = httpx.Request(
+ "GET",
+ httpx.URL("https://final_download.url")
+ )
+ return response
+
+ # Reaching this point means something has gone wrong
+ return httpx.Response(500, request=request) # pragma: no cover
+
+ def setUp(self) -> None:
+ self.call_args = ["owner", "repo", "action_sha", "action_name", "artifact_name"]
+ self.client = httpx.Client(base_url="https://example.com")
+
+ self.patchers = [
+ mock.patch.object(self.client, "send", new=self.get_response_get_artifact),
+ mock.patch.object(github_utils, "authorize", return_value=self.client),
+ mock.patch.object(github_utils, "check_run_status", return_value="artifact_url"),
+ ]
+
+ for patcher in self.patchers:
+ patcher.start()
+
+ def tearDown(self) -> None:
+ for patcher in self.patchers:
+ patcher.stop()
+
+ def test_client_closed_on_errors(self):
+ """Test that the client is terminated even if an error occurs at some point."""
+ self.call_args[0] = "force_error"
+ with self.assertRaises(httpx.HTTPStatusError):
+ github_utils.get_artifact(*self.call_args)
+ self.assertTrue(self.client.is_closed)
+
+ def test_missing(self):
+ """Test that an exception is raised if the requested artifact was not found."""
+ cases = (
+ "invalid sha",
+ "invalid action name",
+ "invalid artifact name",
+ )
+ for i, name in enumerate(cases, 2):
+ with self.subTest(f"Test {name} raises an error"):
+ new_args = self.call_args.copy()
+ new_args[i] = name
+
+ with self.assertRaises(github_utils.NotFoundError):
+ github_utils.get_artifact(*new_args)
+
+ def test_valid(self):
+ """Test that the correct download URL is returned for valid requests."""
+ url = github_utils.get_artifact(*self.call_args)
+ self.assertEqual("https://final_download.url", url)
+ self.assertTrue(self.client.is_closed)
+
+
[email protected](github_utils, "get_artifact")
+class GitHubArtifactViewTests(django.test.TestCase):
+ """Test the GitHub artifact fetch API view."""
+
+ def setUp(self):
+ self.kwargs = {
+ "owner": "test_owner",
+ "repo": "test_repo",
+ "sha": "test_sha",
+ "action_name": "test_action",
+ "artifact_name": "test_artifact",
+ }
+ self.url = reverse("api:github-artifacts", kwargs=self.kwargs)
+
+ def test_correct_artifact(self, artifact_mock: mock.Mock):
+ """Test a proper response is returned with proper input."""
+ artifact_mock.return_value = "final download url"
+ result = self.client.get(self.url)
+
+ self.assertIsInstance(result, rest_framework.response.Response)
+ self.assertEqual({"url": artifact_mock.return_value}, result.data)
+
+ def test_failed_fetch(self, artifact_mock: mock.Mock):
+ """Test that a proper error is returned when the request fails."""
+ artifact_mock.side_effect = github_utils.NotFoundError("Test error message")
+ result = self.client.get(self.url)
+
+ self.assertIsInstance(result, rest_framework.response.Response)
+ self.assertEqual({
+ "error_type": github_utils.NotFoundError.__name__,
+ "error": "Test error message",
+ "requested_resource": "/".join(self.kwargs.values())
+ }, result.data)
diff --git a/pydis_site/apps/api/tests/test_github_webhook_filter.py b/pydis_site/apps/api/tests/test_github_webhook_filter.py
new file mode 100644
index 00000000..8ca60511
--- /dev/null
+++ b/pydis_site/apps/api/tests/test_github_webhook_filter.py
@@ -0,0 +1,62 @@
+from unittest import mock
+from urllib.error import HTTPError
+
+from django.urls import reverse
+from rest_framework.test import APITestCase
+
+from pydis_site.apps.api.views import GitHubWebhookFilterView
+
+class GitHubWebhookFilterAPITests(APITestCase):
+ def test_ignores_bot_sender(self):
+ url = reverse('api:github-webhook-filter', args=('id', 'token'))
+ payload = {'sender': {'login': 'limette', 'type': 'bot'}}
+ headers = {'X-GitHub-Event': 'pull_request_review'}
+ response = self.client.post(url, data=payload, headers=headers)
+ self.assertEqual(response.status_code, 203)
+
+ def test_accepts_interesting_events(self):
+ url = reverse('api:github-webhook-filter', args=('id', 'token'))
+ payload = {
+ 'ref': 'refs/heads/master',
+ 'pull_request': {
+ 'user': {
+ 'login': "lemon",
+ }
+ },
+ 'review': {
+ 'state': 'commented',
+ 'body': "Amazing!!!"
+ },
+ 'repository': {
+ 'name': 'black',
+ 'owner': {
+ 'login': 'psf',
+ }
+ }
+ }
+ headers = {'X-GitHub-Event': 'pull_request_review'}
+
+ with mock.patch('urllib.request.urlopen') as urlopen:
+ urlopen.return_value = mock.MagicMock()
+ context_mock = urlopen.return_value.__enter__.return_value
+ context_mock.status = 299
+ context_mock.getheaders.return_value = [('X-Clacks-Overhead', 'Joe Armstrong')]
+ context_mock.read.return_value = b'{"status": "ok"}'
+
+ response = self.client.post(url, data=payload, headers=headers)
+ self.assertEqual(response.status_code, context_mock.status)
+ self.assertEqual(response.headers.get('X-Clacks-Overhead'), 'Joe Armstrong')
+
+ def test_rate_limit_is_logged_to_sentry(self):
+ url = reverse('api:github-webhook-filter', args=('id', 'token'))
+ payload = {}
+ headers = {'X-GitHub-Event': 'pull_request_review'}
+ with (
+ mock.patch('urllib.request.urlopen') as urlopen,
+ mock.patch.object(GitHubWebhookFilterView, "logger") as logger,
+ ):
+ urlopen.side_effect = HTTPError(None, 429, 'Too Many Requests', {}, None)
+ logger.warning = mock.PropertyMock()
+ self.client.post(url, data=payload, headers=headers)
+
+ logger.warning.assert_called_once()
diff --git a/pydis_site/apps/api/tests/test_infractions.py b/pydis_site/apps/api/tests/test_infractions.py
index b3dd16ee..f1e54b1e 100644
--- a/pydis_site/apps/api/tests/test_infractions.py
+++ b/pydis_site/apps/api/tests/test_infractions.py
@@ -1,14 +1,15 @@
import datetime
-from datetime import datetime as dt, timedelta, timezone
+from datetime import UTC, datetime as dt, timedelta
from unittest.mock import patch
from urllib.parse import quote
+from django.db import transaction
from django.db.utils import IntegrityError
from django.urls import reverse
from .base import AuthenticatedAPITestCase
-from ..models import Infraction, User
-from ..serializers import InfractionSerializer
+from pydis_site.apps.api.models import Infraction, User
+from pydis_site.apps.api.serializers import InfractionSerializer
class UnauthenticatedTests(AuthenticatedAPITestCase):
@@ -55,23 +56,26 @@ class InfractionTests(AuthenticatedAPITestCase):
type='ban',
reason='He terk my jerb!',
hidden=True,
- expires_at=dt(5018, 11, 20, 15, 52, tzinfo=timezone.utc),
- active=True
+ inserted_at=dt(2020, 10, 10, 0, 0, 0, tzinfo=UTC),
+ expires_at=dt(5018, 11, 20, 15, 52, tzinfo=UTC),
+ active=True,
)
cls.ban_inactive = Infraction.objects.create(
user_id=cls.user.id,
actor_id=cls.user.id,
type='ban',
reason='James is an ass, and we won\'t be working with him again.',
- active=False
+ active=False,
+ inserted_at=dt(2020, 10, 10, 0, 1, 0, tzinfo=UTC),
)
- cls.mute_permanent = Infraction.objects.create(
+ cls.timeout_permanent = Infraction.objects.create(
user_id=cls.user.id,
actor_id=cls.user.id,
- type='mute',
+ type='timeout',
reason='He has a filthy mouth and I am his soap.',
active=True,
- expires_at=None
+ inserted_at=dt(2020, 10, 10, 0, 2, 0, tzinfo=UTC),
+ expires_at=None,
)
cls.superstar_expires_soon = Infraction.objects.create(
user_id=cls.user.id,
@@ -79,7 +83,8 @@ class InfractionTests(AuthenticatedAPITestCase):
type='superstar',
reason='This one doesn\'t matter anymore.',
active=True,
- expires_at=datetime.datetime.utcnow() + datetime.timedelta(hours=5)
+ inserted_at=dt(2020, 10, 10, 0, 3, 0, tzinfo=UTC),
+ expires_at=dt.now(UTC) + datetime.timedelta(hours=5),
)
cls.voiceban_expires_later = Infraction.objects.create(
user_id=cls.user.id,
@@ -87,7 +92,8 @@ class InfractionTests(AuthenticatedAPITestCase):
type='voice_ban',
reason='Jet engine mic',
active=True,
- expires_at=datetime.datetime.utcnow() + datetime.timedelta(days=5)
+ inserted_at=dt(2020, 10, 10, 0, 4, 0, tzinfo=UTC),
+ expires_at=dt.now(UTC) + datetime.timedelta(days=5),
)
def test_list_all(self):
@@ -101,7 +107,7 @@ class InfractionTests(AuthenticatedAPITestCase):
self.assertEqual(len(infractions), 5)
self.assertEqual(infractions[0]['id'], self.voiceban_expires_later.id)
self.assertEqual(infractions[1]['id'], self.superstar_expires_soon.id)
- self.assertEqual(infractions[2]['id'], self.mute_permanent.id)
+ self.assertEqual(infractions[2]['id'], self.timeout_permanent.id)
self.assertEqual(infractions[3]['id'], self.ban_inactive.id)
self.assertEqual(infractions[4]['id'], self.ban_hidden.id)
@@ -128,7 +134,7 @@ class InfractionTests(AuthenticatedAPITestCase):
def test_filter_permanent_false(self):
url = reverse('api:bot:infraction-list')
- response = self.client.get(f'{url}?type=mute&permanent=false')
+ response = self.client.get(f'{url}?type=timeout&permanent=false')
self.assertEqual(response.status_code, 200)
infractions = response.json()
@@ -137,17 +143,17 @@ class InfractionTests(AuthenticatedAPITestCase):
def test_filter_permanent_true(self):
url = reverse('api:bot:infraction-list')
- response = self.client.get(f'{url}?type=mute&permanent=true')
+ response = self.client.get(f'{url}?type=timeout&permanent=true')
self.assertEqual(response.status_code, 200)
infractions = response.json()
- self.assertEqual(infractions[0]['id'], self.mute_permanent.id)
+ self.assertEqual(infractions[0]['id'], self.timeout_permanent.id)
def test_filter_after(self):
url = reverse('api:bot:infraction-list')
- target_time = datetime.datetime.utcnow() + datetime.timedelta(hours=5)
- response = self.client.get(f'{url}?type=superstar&expires_after={target_time.isoformat()}')
+ target_time = datetime.datetime.now(tz=UTC) + datetime.timedelta(hours=5)
+ response = self.client.get(url, {'type': 'superstar', 'expires_after': target_time.isoformat()})
self.assertEqual(response.status_code, 200)
infractions = response.json()
@@ -155,8 +161,8 @@ class InfractionTests(AuthenticatedAPITestCase):
def test_filter_before(self):
url = reverse('api:bot:infraction-list')
- target_time = datetime.datetime.utcnow() + datetime.timedelta(hours=5)
- response = self.client.get(f'{url}?type=superstar&expires_before={target_time.isoformat()}')
+ target_time = datetime.datetime.now(tz=UTC) + datetime.timedelta(hours=5)
+ response = self.client.get(url, {'type': 'superstar', 'expires_before': target_time.isoformat()})
self.assertEqual(response.status_code, 200)
infractions = response.json()
@@ -168,22 +174,23 @@ class InfractionTests(AuthenticatedAPITestCase):
response = self.client.get(f'{url}?expires_after=gibberish')
self.assertEqual(response.status_code, 400)
- self.assertEqual(list(response.json())[0], "expires_after")
+ self.assertEqual(next(iter(response.json())), "expires_after")
def test_filter_before_invalid(self):
url = reverse('api:bot:infraction-list')
response = self.client.get(f'{url}?expires_before=000000000')
self.assertEqual(response.status_code, 400)
- self.assertEqual(list(response.json())[0], "expires_before")
+ self.assertEqual(next(iter(response.json())), "expires_before")
def test_after_before_before(self):
url = reverse('api:bot:infraction-list')
- target_time = datetime.datetime.utcnow() + datetime.timedelta(hours=4)
- target_time_late = datetime.datetime.utcnow() + datetime.timedelta(hours=6)
+ target_time = datetime.datetime.now(tz=UTC) + datetime.timedelta(hours=4)
+ target_time_late = datetime.datetime.now(tz=UTC) + datetime.timedelta(hours=6)
response = self.client.get(
- f'{url}?expires_before={target_time_late.isoformat()}'
- f'&expires_after={target_time.isoformat()}'
+ url,
+ {'expires_before': target_time_late.isoformat(),
+ 'expires_after': target_time.isoformat()},
)
self.assertEqual(response.status_code, 200)
@@ -192,11 +199,12 @@ class InfractionTests(AuthenticatedAPITestCase):
def test_after_after_before_invalid(self):
url = reverse('api:bot:infraction-list')
- target_time = datetime.datetime.utcnow() + datetime.timedelta(hours=5)
- target_time_late = datetime.datetime.utcnow() + datetime.timedelta(hours=9)
+ target_time = datetime.datetime.now(tz=UTC) + datetime.timedelta(hours=5)
+ target_time_late = datetime.datetime.now(tz=UTC) + datetime.timedelta(hours=9)
response = self.client.get(
- f'{url}?expires_before={target_time.isoformat()}'
- f'&expires_after={target_time_late.isoformat()}'
+ url,
+ {'expires_before': target_time.isoformat(),
+ 'expires_after': target_time_late.isoformat()},
)
self.assertEqual(response.status_code, 400)
@@ -206,8 +214,11 @@ class InfractionTests(AuthenticatedAPITestCase):
def test_permanent_after_invalid(self):
url = reverse('api:bot:infraction-list')
- target_time = datetime.datetime.utcnow() + datetime.timedelta(hours=5)
- response = self.client.get(f'{url}?permanent=true&expires_after={target_time.isoformat()}')
+ target_time = datetime.datetime.now(tz=UTC) + datetime.timedelta(hours=5)
+ response = self.client.get(
+ url,
+ {'permanent': 'true', 'expires_after': target_time.isoformat()},
+ )
self.assertEqual(response.status_code, 400)
errors = list(response.json())
@@ -215,8 +226,11 @@ class InfractionTests(AuthenticatedAPITestCase):
def test_permanent_before_invalid(self):
url = reverse('api:bot:infraction-list')
- target_time = datetime.datetime.utcnow() + datetime.timedelta(hours=5)
- response = self.client.get(f'{url}?permanent=true&expires_before={target_time.isoformat()}')
+ target_time = datetime.datetime.now(tz=UTC) + datetime.timedelta(hours=5)
+ response = self.client.get(
+ url,
+ {'permanent': 'true', 'expires_before': target_time.isoformat()},
+ )
self.assertEqual(response.status_code, 400)
errors = list(response.json())
@@ -224,9 +238,10 @@ class InfractionTests(AuthenticatedAPITestCase):
def test_nonpermanent_before(self):
url = reverse('api:bot:infraction-list')
- target_time = datetime.datetime.utcnow() + datetime.timedelta(hours=6)
+ target_time = datetime.datetime.now(tz=UTC) + datetime.timedelta(hours=6)
response = self.client.get(
- f'{url}?permanent=false&expires_before={target_time.isoformat()}'
+ url,
+ {'permanent': 'false', 'expires_before': target_time.isoformat()},
)
self.assertEqual(response.status_code, 200)
@@ -235,7 +250,7 @@ class InfractionTests(AuthenticatedAPITestCase):
def test_filter_manytypes(self):
url = reverse('api:bot:infraction-list')
- response = self.client.get(f'{url}?types=mute,ban')
+ response = self.client.get(f'{url}?types=timeout,ban')
self.assertEqual(response.status_code, 200)
infractions = response.json()
@@ -243,7 +258,7 @@ class InfractionTests(AuthenticatedAPITestCase):
def test_types_type_invalid(self):
url = reverse('api:bot:infraction-list')
- response = self.client.get(f'{url}?types=mute,ban&type=superstar')
+ response = self.client.get(f'{url}?types=timeout,ban&type=superstar')
self.assertEqual(response.status_code, 400)
errors = list(response.json())
@@ -355,7 +370,7 @@ class CreationTests(AuthenticatedAPITestCase):
infraction = Infraction.objects.get(id=response.json()['id'])
self.assertAlmostEqual(
infraction.inserted_at,
- dt.now(timezone.utc),
+ dt.now(UTC),
delta=timedelta(seconds=2)
)
self.assertEqual(infraction.expires_at.isoformat(), data['expires_at'])
@@ -492,6 +507,7 @@ class CreationTests(AuthenticatedAPITestCase):
)
for infraction_type, hidden in restricted_types:
+ # https://stackoverflow.com/a/23326971
with self.subTest(infraction_type=infraction_type):
invalid_infraction = {
'user': self.user.id,
@@ -512,10 +528,10 @@ class CreationTests(AuthenticatedAPITestCase):
def test_returns_400_for_second_active_infraction_of_the_same_type(self):
"""Test if the API rejects a second active infraction of the same type for a given user."""
url = reverse('api:bot:infraction-list')
- active_infraction_types = ('mute', 'ban', 'superstar')
+ active_infraction_types = ('timeout', 'ban', 'superstar')
for infraction_type in active_infraction_types:
- with self.subTest(infraction_type=infraction_type):
+ with self.subTest(infraction_type=infraction_type), transaction.atomic():
first_active_infraction = {
'user': self.user.id,
'actor': self.user.id,
@@ -554,7 +570,7 @@ class CreationTests(AuthenticatedAPITestCase):
first_active_infraction = {
'user': self.user.id,
'actor': self.user.id,
- 'type': 'mute',
+ 'type': 'timeout',
'reason': 'Be silent!',
'hidden': True,
'active': True,
@@ -641,9 +657,9 @@ class CreationTests(AuthenticatedAPITestCase):
Infraction.objects.create(
user=self.user,
actor=self.user,
- type="mute",
+ type="timeout",
active=True,
- reason="The first active mute"
+ reason="The first active timeout"
)
def test_unique_constraint_accepts_active_infractions_for_different_users(self):
@@ -798,7 +814,7 @@ class SerializerTests(AuthenticatedAPITestCase):
actor_id=self.user.id,
type=_type,
reason='A reason.',
- expires_at=dt(5018, 11, 20, 15, 52, tzinfo=timezone.utc),
+ expires_at=dt(5018, 11, 20, 15, 52, tzinfo=UTC),
active=active
)
@@ -811,22 +827,6 @@ class SerializerTests(AuthenticatedAPITestCase):
self.assertTrue(serializer.is_valid(), msg=serializer.errors)
- def test_validation_error_if_active_duplicate(self):
- self.create_infraction('ban', active=True)
- instance = self.create_infraction('ban', active=False)
-
- data = {'active': True}
- serializer = InfractionSerializer(instance, data=data, partial=True)
-
- if not serializer.is_valid():
- self.assertIn('non_field_errors', serializer.errors)
-
- code = serializer.errors['non_field_errors'][0].code
- msg = f'Expected failure on unique validator but got {serializer.errors}'
- self.assertEqual(code, 'unique', msg=msg)
- else: # pragma: no cover
- self.fail('Validation unexpectedly succeeded.')
-
def test_is_valid_for_new_active_infraction(self):
self.create_infraction('ban', active=False)
diff --git a/pydis_site/apps/api/tests/test_models.py b/pydis_site/apps/api/tests/test_models.py
index 5c9ddea4..456ac408 100644
--- a/pydis_site/apps/api/tests/test_models.py
+++ b/pydis_site/apps/api/tests/test_models.py
@@ -1,14 +1,14 @@
-from datetime import datetime as dt
+from datetime import UTC, datetime as dt
from django.core.exceptions import ValidationError
from django.test import SimpleTestCase, TestCase
-from django.utils import timezone
from pydis_site.apps.api.models import (
DeletedMessage,
DocumentationLink,
+ Filter,
+ FilterList,
Infraction,
- Message,
MessageDeletionContext,
Nomination,
NominationEntry,
@@ -41,7 +41,7 @@ class NitroMessageLengthTest(TestCase):
self.context = MessageDeletionContext.objects.create(
id=50,
actor=self.user,
- creation=dt.utcnow()
+ creation=dt.now(UTC)
)
def test_create(self):
@@ -99,17 +99,26 @@ class StringDunderMethodTests(SimpleTestCase):
name='shawn',
discriminator=555,
),
- creation=dt.utcnow()
+ creation=dt.now(UTC)
),
embeds=[]
),
DocumentationLink(
'test', 'http://example.com', 'http://example.com'
),
+ FilterList(
+ name="forbidden_duckies",
+ list_type=0,
+ ),
+ Filter(
+ content="ducky_nsfw",
+ description="This ducky is totally inappropriate!",
+ additional_settings=None,
+ ),
OffensiveMessage(
id=602951077675139072,
channel_id=291284109232308226,
- delete_date=dt(3000, 1, 1)
+ delete_date=dt(3000, 1, 1, tzinfo=UTC)
),
OffTopicChannelName(name='bob-the-builders-playground'),
Role(
@@ -117,24 +126,13 @@ class StringDunderMethodTests(SimpleTestCase):
colour=0x5, permissions=0,
position=10,
),
- Message(
- id=45,
- author=User(
- id=444,
- name='bill',
- discriminator=5,
- ),
- channel_id=666,
- content="wooey",
- embeds=[]
- ),
MessageDeletionContext(
actor=User(
id=5555,
name='shawn',
discriminator=555,
),
- creation=dt.utcnow()
+ creation=dt.now(tz=UTC)
),
User(
id=5,
@@ -153,7 +151,7 @@ class StringDunderMethodTests(SimpleTestCase):
hidden=True,
type='kick',
reason='He terk my jerb!',
- expires_at=dt(5018, 11, 20, 15, 52, tzinfo=timezone.utc)
+ expires_at=dt(5018, 11, 20, 15, 52, tzinfo=UTC)
),
Reminder(
author=User(
@@ -167,7 +165,7 @@ class StringDunderMethodTests(SimpleTestCase):
'267624335836053506/291284109232308226/463087129459949587'
),
content="oh no",
- expiration=dt(5018, 11, 20, 15, 52, tzinfo=timezone.utc)
+ expiration=dt(5018, 11, 20, 15, 52, tzinfo=UTC)
),
NominationEntry(
nomination_id=self.nomination.id,
diff --git a/pydis_site/apps/api/tests/test_nominations.py b/pydis_site/apps/api/tests/test_nominations.py
index 62b2314c..e4dfe36a 100644
--- a/pydis_site/apps/api/tests/test_nominations.py
+++ b/pydis_site/apps/api/tests/test_nominations.py
@@ -1,9 +1,9 @@
-from datetime import datetime as dt, timedelta, timezone
+from datetime import UTC, datetime as dt, timedelta
from django.urls import reverse
from .base import AuthenticatedAPITestCase
-from ..models import Nomination, NominationEntry, User
+from pydis_site.apps.api.models import Nomination, NominationEntry, User
class CreationTests(AuthenticatedAPITestCase):
@@ -38,7 +38,7 @@ class CreationTests(AuthenticatedAPITestCase):
)
self.assertAlmostEqual(
nomination.inserted_at,
- dt.now(timezone.utc),
+ dt.now(UTC),
delta=timedelta(seconds=2)
)
self.assertEqual(nomination.user.id, data['user'])
@@ -254,7 +254,7 @@ class NominationTests(AuthenticatedAPITestCase):
def test_returns_400_on_frozen_field_update(self):
url = reverse('api:bot:nomination-detail', args=(self.active_nomination.id,))
data = {
- 'user': "Theo Katzman"
+ 'user': 1234
}
response = self.client.patch(url, data=data)
@@ -319,7 +319,7 @@ class NominationTests(AuthenticatedAPITestCase):
self.assertAlmostEqual(
nomination.ended_at,
- dt.now(timezone.utc),
+ dt.now(UTC),
delta=timedelta(seconds=2)
)
self.assertFalse(nomination.active)
@@ -524,3 +524,35 @@ class NominationTests(AuthenticatedAPITestCase):
self.assertEqual(response.json(), {
'actor': ["The actor doesn't exist or has not nominated the user."]
})
+
+ def test_patch_nomination_set_thread_id_of_active_nomination(self):
+ url = reverse('api:bot:nomination-detail', args=(self.active_nomination.id,))
+ data = {'thread_id': 9876543210}
+ response = self.client.patch(url, data=data)
+ self.assertEqual(response.status_code, 200)
+
+ def test_patch_nomination_set_thread_id_and_reviewed_of_active_nomination(self):
+ url = reverse('api:bot:nomination-detail', args=(self.active_nomination.id,))
+ data = {'thread_id': 9876543210, "reviewed": True}
+ response = self.client.patch(url, data=data)
+ self.assertEqual(response.status_code, 200)
+
+ def test_modifying_thread_id_when_ending_nomination(self):
+ url = reverse('api:bot:nomination-detail', args=(self.active_nomination.id,))
+ data = {'thread_id': 9876543210, 'active': False, 'end_reason': "What?"}
+
+ response = self.client.patch(url, data=data)
+ self.assertEqual(response.status_code, 400)
+ self.assertEqual(response.json(), {
+ 'thread_id': ['This field cannot be set when ending a nomination.']
+ })
+
+ def test_patch_thread_id_for_inactive_nomination(self):
+ url = reverse('api:bot:nomination-detail', args=(self.inactive_nomination.id,))
+ data = {'thread_id': 9876543210}
+
+ response = self.client.patch(url, data=data)
+ self.assertEqual(response.status_code, 400)
+ self.assertEqual(response.json(), {
+ 'thread_id': ['This field cannot be set if the nomination is inactive.']
+ })
diff --git a/pydis_site/apps/api/tests/test_off_topic_channel_names.py b/pydis_site/apps/api/tests/test_off_topic_channel_names.py
index 2d273756..315f707d 100644
--- a/pydis_site/apps/api/tests/test_off_topic_channel_names.py
+++ b/pydis_site/apps/api/tests/test_off_topic_channel_names.py
@@ -1,7 +1,7 @@
from django.urls import reverse
from .base import AuthenticatedAPITestCase
-from ..models import OffTopicChannelName
+from pydis_site.apps.api.models import OffTopicChannelName
class UnauthenticatedTests(AuthenticatedAPITestCase):
@@ -74,6 +74,9 @@ class ListTests(AuthenticatedAPITestCase):
cls.test_name_3 = OffTopicChannelName.objects.create(
name="frozen-with-iceman", used=True, active=False
)
+ cls.test_name_4 = OffTopicChannelName.objects.create(
+ name="xith-is-cool", used=True, active=True
+ )
def test_returns_name_in_list(self):
"""Return all off-topic channel names."""
@@ -86,28 +89,46 @@ class ListTests(AuthenticatedAPITestCase):
{
self.test_name.name,
self.test_name_2.name,
- self.test_name_3.name
+ self.test_name_3.name,
+ self.test_name_4.name
}
)
- def test_returns_two_items_with_random_items_param_set_to_2(self):
- """Return not-used name instead used."""
+ def test_returns_two_active_items_with_random_items_param_set_to_2(self):
+ """Return not-used active names instead used."""
url = reverse('api:bot:offtopicchannelname-list')
response = self.client.get(f'{url}?random_items=2')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.json()), 2)
- self.assertEqual(set(response.json()), {self.test_name.name, self.test_name_2.name})
+ self.assertTrue(
+ all(
+ item in (self.test_name.name, self.test_name_2.name, self.test_name_4.name)
+ for item in response.json()
+ )
+ )
+
+ def test_returns_three_active_items_with_random_items_param_set_to_3(self):
+ """Return not-used active names instead used."""
+ url = reverse('api:bot:offtopicchannelname-list')
+ response = self.client.get(f'{url}?random_items=3')
+
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(len(response.json()), 3)
+ self.assertEqual(
+ set(response.json()),
+ {self.test_name.name, self.test_name_2.name, self.test_name_4.name}
+ )
def test_running_out_of_names_with_random_parameter(self):
- """Reset names `used` parameter to `False` when running out of names."""
+ """Reset names `used` parameter to `False` when running out of active names."""
url = reverse('api:bot:offtopicchannelname-list')
response = self.client.get(f'{url}?random_items=3')
self.assertEqual(response.status_code, 200)
self.assertEqual(
set(response.json()),
- {self.test_name.name, self.test_name_2.name, self.test_name_3.name}
+ {self.test_name.name, self.test_name_2.name, self.test_name_4.name}
)
def test_returns_inactive_ot_names(self):
@@ -129,7 +150,7 @@ class ListTests(AuthenticatedAPITestCase):
self.assertEqual(response.status_code, 200)
self.assertEqual(
set(response.json()),
- {self.test_name.name, self.test_name_2.name}
+ {self.test_name.name, self.test_name_2.name, self.test_name_4.name}
)
diff --git a/pydis_site/apps/api/tests/test_offensive_message.py b/pydis_site/apps/api/tests/test_offensive_message.py
index 3cf95b75..2dc60bc3 100644
--- a/pydis_site/apps/api/tests/test_offensive_message.py
+++ b/pydis_site/apps/api/tests/test_offensive_message.py
@@ -3,20 +3,31 @@ import datetime
from django.urls import reverse
from .base import AuthenticatedAPITestCase
-from ..models import OffensiveMessage
+from pydis_site.apps.api.models import OffensiveMessage
+
+
+def create_offensive_message() -> OffensiveMessage:
+ """Creates and returns an `OffensiveMessgage` record for tests."""
+ delete_at = datetime.datetime.now(tz=datetime.UTC) + datetime.timedelta(days=1)
+
+ return OffensiveMessage.objects.create(
+ id=602951077675139072,
+ channel_id=291284109232308226,
+ delete_date=delete_at,
+ )
class CreationTests(AuthenticatedAPITestCase):
def test_accept_valid_data(self):
url = reverse('api:bot:offensivemessage-list')
- delete_at = datetime.datetime.now() + datetime.timedelta(days=1)
+ delete_at = datetime.datetime.now() + datetime.timedelta(days=1) # noqa: DTZ005
data = {
'id': '602951077675139072',
'channel_id': '291284109232308226',
'delete_date': delete_at.isoformat()[:-1]
}
- aware_delete_at = delete_at.replace(tzinfo=datetime.timezone.utc)
+ aware_delete_at = delete_at.replace(tzinfo=datetime.UTC)
response = self.client.post(url, data=data)
self.assertEqual(response.status_code, 201)
@@ -32,7 +43,7 @@ class CreationTests(AuthenticatedAPITestCase):
def test_returns_400_on_non_future_date(self):
url = reverse('api:bot:offensivemessage-list')
- delete_at = datetime.datetime.now() - datetime.timedelta(days=1)
+ delete_at = datetime.datetime.now() - datetime.timedelta(days=1) # noqa: DTZ005
data = {
'id': '602951077675139072',
'channel_id': '291284109232308226',
@@ -46,7 +57,7 @@ class CreationTests(AuthenticatedAPITestCase):
def test_returns_400_on_negative_id_or_channel_id(self):
url = reverse('api:bot:offensivemessage-list')
- delete_at = datetime.datetime.now() + datetime.timedelta(days=1)
+ delete_at = datetime.datetime.now() + datetime.timedelta(days=1) # noqa: DTZ005
data = {
'id': '602951077675139072',
'channel_id': '291284109232308226',
@@ -72,8 +83,8 @@ class CreationTests(AuthenticatedAPITestCase):
class ListTests(AuthenticatedAPITestCase):
@classmethod
def setUpTestData(cls):
- delete_at = datetime.datetime.now() + datetime.timedelta(days=1)
- aware_delete_at = delete_at.replace(tzinfo=datetime.timezone.utc)
+ delete_at = datetime.datetime.now() + datetime.timedelta(days=1) # noqa: DTZ005
+ aware_delete_at = delete_at.replace(tzinfo=datetime.UTC)
cls.messages = [
{
@@ -111,13 +122,7 @@ class ListTests(AuthenticatedAPITestCase):
class DeletionTests(AuthenticatedAPITestCase):
@classmethod
def setUpTestData(cls):
- delete_at = datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(days=1)
-
- cls.valid_offensive_message = OffensiveMessage.objects.create(
- id=602951077675139072,
- channel_id=291284109232308226,
- delete_date=delete_at.isoformat()
- )
+ cls.valid_offensive_message = create_offensive_message()
def test_delete_data(self):
url = reverse(
@@ -132,24 +137,53 @@ class DeletionTests(AuthenticatedAPITestCase):
)
-class NotAllowedMethodsTests(AuthenticatedAPITestCase):
+class UpdateOffensiveMessageTestCase(AuthenticatedAPITestCase):
@classmethod
def setUpTestData(cls):
- delete_at = datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(days=1)
+ cls.message = create_offensive_message()
+ cls.in_one_week = datetime.datetime.now(tz=datetime.UTC) + datetime.timedelta(days=7)
+
+ def test_updating_message(self):
+ url = reverse('api:bot:offensivemessage-detail', args=(self.message.id,))
+ data = {'delete_date': self.in_one_week.isoformat()}
+ update_response = self.client.patch(url, data=data)
+ self.assertEqual(update_response.status_code, 200)
- cls.valid_offensive_message = OffensiveMessage.objects.create(
- id=602951077675139072,
- channel_id=291284109232308226,
- delete_date=delete_at.isoformat()
+ self.message.refresh_from_db()
+ self.assertAlmostEqual(
+ self.message.delete_date,
+ self.in_one_week,
+ delta=datetime.timedelta(seconds=1),
)
- def test_returns_405_for_patch_and_put_requests(self):
- url = reverse(
- 'api:bot:offensivemessage-detail', args=(self.valid_offensive_message.id,)
+ def test_updating_write_once_fields(self):
+ """Fields such as the channel ID may not be updated."""
+ url = reverse('api:bot:offensivemessage-detail', args=(self.message.id,))
+ data = {'channel_id': self.message.channel_id + 1}
+ response = self.client.patch(url, data=data)
+ self.assertEqual(response.status_code, 400)
+ self.assertEqual(response.json(), {'channel_id': ["This field cannot be updated."]})
+
+ def test_updating_nonexistent_message(self):
+ url = reverse('api:bot:offensivemessage-detail', args=(self.message.id + 1,))
+ data = {'delete_date': self.in_one_week}
+
+ response = self.client.patch(url, data=data)
+ self.assertEqual(response.status_code, 404)
+ self.message.refresh_from_db()
+ self.assertNotAlmostEqual(
+ self.message.delete_date,
+ self.in_one_week,
+ delta=datetime.timedelta(seconds=1),
)
- not_allowed_methods = (self.client.patch, self.client.put)
- for method in not_allowed_methods:
- with self.subTest(method=method):
- response = method(url, {})
- self.assertEqual(response.status_code, 405)
+
+class NotAllowedMethodsTests(AuthenticatedAPITestCase):
+ @classmethod
+ def setUpTestData(cls):
+ cls.message = create_offensive_message()
+
+ def test_returns_405_for_get(self):
+ url = reverse('api:bot:offensivemessage-detail', args=(self.message.id,))
+ response = self.client.get(url)
+ self.assertEqual(response.status_code, 405)
diff --git a/pydis_site/apps/api/tests/test_reminders.py b/pydis_site/apps/api/tests/test_reminders.py
index 709685bc..98e93bb7 100644
--- a/pydis_site/apps/api/tests/test_reminders.py
+++ b/pydis_site/apps/api/tests/test_reminders.py
@@ -1,10 +1,10 @@
-from datetime import datetime
+from datetime import UTC, datetime
from django.forms.models import model_to_dict
from django.urls import reverse
from .base import AuthenticatedAPITestCase
-from ..models import Reminder, User
+from pydis_site.apps.api.models import Reminder, User
class UnauthedReminderAPITests(AuthenticatedAPITestCase):
@@ -59,7 +59,7 @@ class ReminderCreationTests(AuthenticatedAPITestCase):
data = {
'author': self.author.id,
'content': 'Remember to...wait what was it again?',
- 'expiration': datetime.utcnow().isoformat(),
+ 'expiration': datetime.now(tz=UTC).isoformat(),
'jump_url': "https://www.google.com",
'channel_id': 123,
'mentions': [8888, 9999],
@@ -91,7 +91,7 @@ class ReminderDeletionTests(AuthenticatedAPITestCase):
cls.reminder = Reminder.objects.create(
author=cls.author,
content="Don't forget to set yourself a reminder",
- expiration=datetime.utcnow().isoformat(),
+ expiration=datetime.now(UTC),
jump_url="https://www.decliningmentalfaculties.com",
channel_id=123
)
@@ -122,7 +122,7 @@ class ReminderListTests(AuthenticatedAPITestCase):
cls.reminder_one = Reminder.objects.create(
author=cls.author,
content="We should take Bikini Bottom, and push it somewhere else!",
- expiration=datetime.utcnow().isoformat(),
+ expiration=datetime.now(UTC),
jump_url="https://www.icantseemyforehead.com",
channel_id=123
)
@@ -130,16 +130,17 @@ class ReminderListTests(AuthenticatedAPITestCase):
cls.reminder_two = Reminder.objects.create(
author=cls.author,
content="Gahhh-I love being purple!",
- expiration=datetime.utcnow().isoformat(),
+ expiration=datetime.now(UTC),
jump_url="https://www.goofygoobersicecreampartyboat.com",
channel_id=123,
active=False
)
+ drf_format = '%Y-%m-%dT%H:%M:%S.%fZ'
cls.rem_dict_one = model_to_dict(cls.reminder_one)
- cls.rem_dict_one['expiration'] += 'Z' # Massaging a quirk of the response time format
+ cls.rem_dict_one['expiration'] = cls.rem_dict_one['expiration'].strftime(drf_format)
cls.rem_dict_two = model_to_dict(cls.reminder_two)
- cls.rem_dict_two['expiration'] += 'Z' # Massaging a quirk of the response time format
+ cls.rem_dict_two['expiration'] = cls.rem_dict_two['expiration'].strftime(drf_format)
def test_reminders_in_full_list(self):
url = reverse('api:bot:reminder-list')
@@ -175,7 +176,7 @@ class ReminderRetrieveTests(AuthenticatedAPITestCase):
cls.reminder = Reminder.objects.create(
author=cls.author,
content="Reminder content",
- expiration=datetime.utcnow().isoformat(),
+ expiration=datetime.now(UTC),
jump_url="http://example.com/",
channel_id=123
)
@@ -203,7 +204,7 @@ class ReminderUpdateTests(AuthenticatedAPITestCase):
cls.reminder = Reminder.objects.create(
author=cls.author,
content="Squash those do-gooders",
- expiration=datetime.utcnow().isoformat(),
+ expiration=datetime.now(UTC),
jump_url="https://www.decliningmentalfaculties.com",
channel_id=123
)
diff --git a/pydis_site/apps/api/tests/test_roles.py b/pydis_site/apps/api/tests/test_roles.py
index 73c80c77..d3031990 100644
--- a/pydis_site/apps/api/tests/test_roles.py
+++ b/pydis_site/apps/api/tests/test_roles.py
@@ -1,7 +1,7 @@
from django.urls import reverse
from .base import AuthenticatedAPITestCase
-from ..models import Role, User
+from pydis_site.apps.api.models import Role, User
class CreationTests(AuthenticatedAPITestCase):
diff --git a/pydis_site/apps/api/tests/test_rules.py b/pydis_site/apps/api/tests/test_rules.py
index d08c5fae..14412b90 100644
--- a/pydis_site/apps/api/tests/test_rules.py
+++ b/pydis_site/apps/api/tests/test_rules.py
@@ -1,7 +1,11 @@
+import itertools
+import re
+from pathlib import Path
+
from django.urls import reverse
from .base import AuthenticatedAPITestCase
-from ..views import RulesView
+from pydis_site.apps.api.views import RulesView
class RuleAPITests(AuthenticatedAPITestCase):
@@ -33,3 +37,37 @@ class RuleAPITests(AuthenticatedAPITestCase):
url = reverse('api:rules')
response = self.client.get(url + '?link_format=unknown')
self.assertEqual(response.status_code, 400)
+
+
+class RuleCorrectnessTests(AuthenticatedAPITestCase):
+ """Verifies that the rules from the API and by the static rules in the content app match."""
+
+ @classmethod
+ def setUpTestData(cls):
+ cls.markdown_rule_re = re.compile(r'^> \d+\. (.*)$')
+
+ def test_rules_in_markdown_file_roughly_equal_api_rules(self) -> None:
+ url = reverse('api:rules')
+ api_response = self.client.get(url + '?link_format=md')
+ api_rules = tuple(rule for (rule, _tags) in api_response.json())
+
+ markdown_rules_path = (
+ Path(__file__).parent.parent.parent / 'content' / 'resources' / 'rules.md'
+ )
+
+ markdown_rules = []
+ for line in markdown_rules_path.read_text(encoding="utf8").splitlines():
+ matches = self.markdown_rule_re.match(line)
+ if matches is not None:
+ markdown_rules.append(matches.group(1))
+
+ zipper = itertools.zip_longest(api_rules, markdown_rules)
+ for idx, (api_rule, markdown_rule) in enumerate(zipper):
+ with self.subTest(f"Rule {idx}"):
+ self.assertIsNotNone(
+ markdown_rule, f"The API has more rules than {markdown_rules_path}"
+ )
+ self.assertIsNotNone(
+ api_rule, f"{markdown_rules_path} has more rules than the API endpoint"
+ )
+ self.assertEqual(markdown_rule, api_rule)
diff --git a/pydis_site/apps/api/tests/test_users.py b/pydis_site/apps/api/tests/test_users.py
index e21bb32b..cff4a825 100644
--- a/pydis_site/apps/api/tests/test_users.py
+++ b/pydis_site/apps/api/tests/test_users.py
@@ -1,11 +1,12 @@
+import random
from unittest.mock import Mock, patch
from django.urls import reverse
from .base import AuthenticatedAPITestCase
-from ..models import Infraction, Role, User
-from ..models.bot.metricity import NotFoundError
-from ..viewsets.bot.user import UserListPagination
+from pydis_site.apps.api.models import Infraction, Role, User
+from pydis_site.apps.api.models.bot.metricity import NotFoundError
+from pydis_site.apps.api.viewsets.bot.user import UserListPagination
class UnauthedUserAPITests(AuthenticatedAPITestCase):
@@ -468,18 +469,17 @@ class UserMetricityTests(AuthenticatedAPITestCase):
with self.subTest(
voice_infractions=case['voice_infractions'],
voice_gate_blocked=case['voice_gate_blocked']
- ):
- with patch("pydis_site.apps.api.viewsets.bot.user.Infraction.objects.filter") as p:
- p.return_value = case['voice_infractions']
+ ), patch("pydis_site.apps.api.viewsets.bot.user.Infraction.objects.filter") as p:
+ p.return_value = case['voice_infractions']
- url = reverse('api:bot:user-metricity-data', args=[0])
- response = self.client.get(url)
+ url = reverse('api:bot:user-metricity-data', args=[0])
+ response = self.client.get(url)
- self.assertEqual(response.status_code, 200)
- self.assertEqual(
- response.json()["voice_gate_blocked"],
- case["voice_gate_blocked"]
- )
+ self.assertEqual(response.status_code, 200)
+ self.assertEqual(
+ response.json()["voice_gate_blocked"],
+ case["voice_gate_blocked"]
+ )
def test_metricity_review_data(self):
# Given
@@ -501,6 +501,90 @@ class UserMetricityTests(AuthenticatedAPITestCase):
"total_messages": total_messages
})
+ def test_metricity_activity_data(self):
+ # Given
+ self.mock_no_metricity_user() # Other functions shouldn't be used.
+ self.metricity.total_messages_in_past_n_days.return_value = [(0, 10)]
+
+ # When
+ url = reverse("api:bot:user-metricity-activity-data")
+ response = self.client.post(
+ url,
+ data=[0, 1],
+ QUERY_STRING="days=10",
+ )
+
+ # Then
+ self.assertEqual(response.status_code, 200)
+ self.metricity.total_messages_in_past_n_days.assert_called_once_with(["0", "1"], 10)
+ self.assertEqual(response.json(), {"0": 10, "1": 0})
+
+ def test_metricity_activity_data_invalid_days(self):
+ # Given
+ self.mock_no_metricity_user() # Other functions shouldn't be used.
+
+ # When
+ url = reverse("api:bot:user-metricity-activity-data")
+ response = self.client.post(
+ url,
+ data=[0, 1],
+ QUERY_STRING="days=fifty",
+ )
+
+ # Then
+ self.assertEqual(response.status_code, 400)
+ self.metricity.total_messages_in_past_n_days.assert_not_called()
+ self.assertEqual(response.json(), {"days": ["This query parameter must be an integer."]})
+
+ def test_metricity_activity_data_no_days(self):
+ # Given
+ self.mock_no_metricity_user() # Other functions shouldn't be used.
+
+ # When
+ url = reverse('api:bot:user-metricity-activity-data')
+ response = self.client.post(
+ url,
+ data=[0, 1],
+ )
+
+ # Then
+ self.assertEqual(response.status_code, 400)
+ self.metricity.total_messages_in_past_n_days.assert_not_called()
+ self.assertEqual(response.json(), {'days': ["This query parameter is required."]})
+
+ def test_metricity_activity_data_no_users(self):
+ # Given
+ self.mock_no_metricity_user() # Other functions shouldn't be used.
+
+ # When
+ url = reverse('api:bot:user-metricity-activity-data')
+ response = self.client.post(
+ url,
+ QUERY_STRING="days=10",
+ )
+
+ # Then
+ self.assertEqual(response.status_code, 400)
+ self.metricity.total_messages_in_past_n_days.assert_not_called()
+ self.assertEqual(response.json(), ['Expected a list of items but got type "dict".'])
+
+ def test_metricity_activity_data_invalid_users(self):
+ # Given
+ self.mock_no_metricity_user() # Other functions shouldn't be used.
+
+ # When
+ url = reverse('api:bot:user-metricity-activity-data')
+ response = self.client.post(
+ url,
+ data=[123, 'username'],
+ QUERY_STRING="days=10",
+ )
+
+ # Then
+ self.assertEqual(response.status_code, 400)
+ self.metricity.total_messages_in_past_n_days.assert_not_called()
+ self.assertEqual(response.json(), {'1': ['A valid integer is required.']})
+
def mock_metricity_user(self, joined_at, total_messages, total_blocks, top_channel_activity):
patcher = patch("pydis_site.apps.api.viewsets.bot.user.Metricity")
self.metricity = patcher.start()
@@ -520,3 +604,45 @@ class UserMetricityTests(AuthenticatedAPITestCase):
self.metricity.total_messages.side_effect = NotFoundError()
self.metricity.total_message_blocks.side_effect = NotFoundError()
self.metricity.top_channel_activity.side_effect = NotFoundError()
+
+
+class UserViewSetTests(AuthenticatedAPITestCase):
+ @classmethod
+ def setUpTestData(cls):
+ cls.searched_user = User.objects.create(
+ id=12095219,
+ name=f"Test user {random.randint(100, 1000)}",
+ discriminator=random.randint(1, 9999),
+ in_guild=True,
+ )
+ cls.other_user = User.objects.create(
+ id=18259125,
+ name=f"Test user {random.randint(100, 1000)}",
+ discriminator=random.randint(1, 9999),
+ in_guild=True,
+ )
+
+ def test_search_lookup_of_wanted_user(self) -> None:
+ """Searching a user by name and discriminator should return that user."""
+ url = reverse('api:bot:user-list')
+ params = {
+ 'username': self.searched_user.name,
+ 'discriminator': self.searched_user.discriminator,
+ }
+ response = self.client.get(url, params)
+ result = response.json()
+ self.assertEqual(result['count'], 1)
+ [user] = result['results']
+ self.assertEqual(user['id'], self.searched_user.id)
+
+ def test_search_lookup_of_unknown_user(self) -> None:
+ """Searching an unknown user should return no results."""
+ url = reverse('api:bot:user-list')
+ params = {
+ 'username': "f-string enjoyer",
+ 'discriminator': 1245,
+ }
+ response = self.client.get(url, params)
+ result = response.json()
+ self.assertEqual(result['count'], 0)
+ self.assertEqual(result['results'], [])
diff --git a/pydis_site/apps/api/tests/test_validators.py b/pydis_site/apps/api/tests/test_validators.py
index 551cc2aa..abff8f55 100644
--- a/pydis_site/apps/api/tests/test_validators.py
+++ b/pydis_site/apps/api/tests/test_validators.py
@@ -1,11 +1,10 @@
-from datetime import datetime, timezone
+from datetime import UTC, datetime
from django.core.exceptions import ValidationError
from django.test import TestCase
-from ..models.bot.bot_setting import validate_bot_setting_name
-from ..models.bot.offensive_message import future_date_validator
-from ..models.utils import validate_embed
+from pydis_site.apps.api.models.bot.bot_setting import validate_bot_setting_name
+from pydis_site.apps.api.models.bot.offensive_message import future_date_validator
REQUIRED_KEYS = (
@@ -22,238 +21,10 @@ class BotSettingValidatorTests(TestCase):
validate_bot_setting_name('bad name')
-class TagEmbedValidatorTests(TestCase):
- def test_rejects_non_mapping(self):
- with self.assertRaises(ValidationError):
- validate_embed('non-empty non-mapping')
-
- def test_rejects_missing_required_keys(self):
- with self.assertRaises(ValidationError):
- validate_embed({
- 'unknown': "key"
- })
-
- def test_rejects_one_correct_one_incorrect(self):
- with self.assertRaises(ValidationError):
- validate_embed({
- 'provider': "??",
- 'title': ""
- })
-
- def test_rejects_empty_required_key(self):
- with self.assertRaises(ValidationError):
- validate_embed({
- 'title': ''
- })
-
- def test_rejects_list_as_embed(self):
- with self.assertRaises(ValidationError):
- validate_embed([])
-
- def test_rejects_required_keys_and_unknown_keys(self):
- with self.assertRaises(ValidationError):
- validate_embed({
- 'title': "the duck walked up to the lemonade stand",
- 'and': "he said to the man running the stand"
- })
-
- def test_rejects_too_long_title(self):
- with self.assertRaises(ValidationError):
- validate_embed({
- 'title': 'a' * 257
- })
-
- def test_rejects_too_many_fields(self):
- with self.assertRaises(ValidationError):
- validate_embed({
- 'fields': [{} for _ in range(26)]
- })
-
- def test_rejects_too_long_description(self):
- with self.assertRaises(ValidationError):
- validate_embed({
- 'description': 'd' * 4097
- })
-
- def test_allows_valid_embed(self):
- validate_embed({
- 'title': "My embed",
- 'description': "look at my embed, my embed is amazing"
- })
-
- def test_allows_unvalidated_fields(self):
- validate_embed({
- 'title': "My embed",
- 'provider': "what am I??"
- })
-
- def test_rejects_fields_as_list_of_non_mappings(self):
- with self.assertRaises(ValidationError):
- validate_embed({
- 'fields': ['abc']
- })
-
- def test_rejects_fields_with_unknown_fields(self):
- with self.assertRaises(ValidationError):
- validate_embed({
- 'fields': [
- {
- 'what': "is this field"
- }
- ]
- })
-
- def test_rejects_fields_with_too_long_name(self):
- with self.assertRaises(ValidationError):
- validate_embed({
- 'fields': [
- {
- 'name': "a" * 257
- }
- ]
- })
-
- def test_rejects_one_correct_one_incorrect_field(self):
- with self.assertRaises(ValidationError):
- validate_embed({
- 'fields': [
- {
- 'name': "Totally valid",
- 'value': "LOOK AT ME"
- },
- {
- 'name': "Totally valid",
- 'value': "LOOK AT ME",
- 'oh': "what is this key?"
- }
- ]
- })
-
- def test_rejects_missing_required_field_field(self):
- with self.assertRaises(ValidationError):
- validate_embed({
- 'fields': [
- {
- 'name': "Totally valid",
- 'inline': True,
- }
- ]
- })
-
- def test_rejects_invalid_inline_field_field(self):
- with self.assertRaises(ValidationError):
- validate_embed({
- 'fields': [
- {
- 'name': "Totally valid",
- 'value': "LOOK AT ME",
- 'inline': "Totally not a boolean",
- }
- ]
- })
-
- def test_allows_valid_fields(self):
- validate_embed({
- 'fields': [
- {
- 'name': "valid",
- 'value': "field",
- },
- {
- 'name': "valid",
- 'value': "field",
- 'inline': False,
- },
- {
- 'name': "valid",
- 'value': "field",
- 'inline': True,
- },
- ]
- })
-
- def test_rejects_footer_as_non_mapping(self):
- with self.assertRaises(ValidationError):
- validate_embed({
- 'title': "whatever",
- 'footer': []
- })
-
- def test_rejects_footer_with_unknown_fields(self):
- with self.assertRaises(ValidationError):
- validate_embed({
- 'title': "whatever",
- 'footer': {
- 'duck': "quack"
- }
- })
-
- def test_rejects_footer_with_empty_text(self):
- with self.assertRaises(ValidationError):
- validate_embed({
- 'title': "whatever",
- 'footer': {
- 'text': ""
- }
- })
-
- def test_allows_footer_with_proper_values(self):
- validate_embed({
- 'title': "whatever",
- 'footer': {
- 'text': "django good"
- }
- })
-
- def test_rejects_author_as_non_mapping(self):
- with self.assertRaises(ValidationError):
- validate_embed({
- 'title': "whatever",
- 'author': []
- })
-
- def test_rejects_author_with_unknown_field(self):
- with self.assertRaises(ValidationError):
- validate_embed({
- 'title': "whatever",
- 'author': {
- 'field': "that is unknown"
- }
- })
-
- def test_rejects_author_with_empty_name(self):
- with self.assertRaises(ValidationError):
- validate_embed({
- 'title': "whatever",
- 'author': {
- 'name': ""
- }
- })
-
- def test_rejects_author_with_one_correct_one_incorrect(self):
- with self.assertRaises(ValidationError):
- validate_embed({
- 'title': "whatever",
- 'author': {
- # Relies on "dictionary insertion order remembering" (D.I.O.R.) behaviour
- 'url': "bobswebsite.com",
- 'name': ""
- }
- })
-
- def test_allows_author_with_proper_values(self):
- validate_embed({
- 'title': "whatever",
- 'author': {
- 'name': "Bob"
- }
- })
-
-
class OffensiveMessageValidatorsTests(TestCase):
def test_accepts_future_date(self):
- future_date_validator(datetime(3000, 1, 1, tzinfo=timezone.utc))
+ future_date_validator(datetime(3000, 1, 1, tzinfo=UTC))
def test_rejects_non_future_date(self):
with self.assertRaises(ValidationError):
- future_date_validator(datetime(1000, 1, 1, tzinfo=timezone.utc))
+ future_date_validator(datetime(1000, 1, 1, tzinfo=UTC))