aboutsummaryrefslogtreecommitdiffstats
path: root/backend/models/dtos
diff options
context:
space:
mode:
Diffstat (limited to 'backend/models/dtos')
-rw-r--r--backend/models/dtos/__init__.py19
-rw-r--r--backend/models/dtos/antispam.py9
-rw-r--r--backend/models/dtos/discord_role.py38
-rw-r--r--backend/models/dtos/discord_user.py54
-rw-r--r--backend/models/dtos/form.py164
-rw-r--r--backend/models/dtos/form_response.py37
-rw-r--r--backend/models/dtos/question.py83
7 files changed, 404 insertions, 0 deletions
diff --git a/backend/models/dtos/__init__.py b/backend/models/dtos/__init__.py
new file mode 100644
index 0000000..336e28b
--- /dev/null
+++ b/backend/models/dtos/__init__.py
@@ -0,0 +1,19 @@
+from .antispam import AntiSpam
+from .discord_role import DiscordRole
+from .discord_user import DiscordMember, DiscordUser
+from .form import Form, FormList
+from .form_response import FormResponse, ResponseList
+from .question import CodeQuestion, Question
+
+__all__ = [
+ "AntiSpam",
+ "CodeQuestion",
+ "DiscordMember",
+ "DiscordRole",
+ "DiscordUser",
+ "Form",
+ "FormList",
+ "FormResponse",
+ "Question",
+ "ResponseList",
+]
diff --git a/backend/models/dtos/antispam.py b/backend/models/dtos/antispam.py
new file mode 100644
index 0000000..b596d4d
--- /dev/null
+++ b/backend/models/dtos/antispam.py
@@ -0,0 +1,9 @@
+from pydantic import BaseModel
+
+
+class AntiSpam(BaseModel):
+ """Schema model for form response antispam field."""
+
+ ip_hash: str
+ user_agent_hash: str
+ captcha_pass: bool
diff --git a/backend/models/dtos/discord_role.py b/backend/models/dtos/discord_role.py
new file mode 100644
index 0000000..195f557
--- /dev/null
+++ b/backend/models/dtos/discord_role.py
@@ -0,0 +1,38 @@
+from pydantic import BaseModel
+
+
+class RoleTags(BaseModel):
+ """Meta information about a discord role."""
+
+ bot_id: str | None
+ integration_id: str | None
+ premium_subscriber: bool
+
+ def __init__(self, **data) -> None:
+ """
+ Handle the terrible discord API.
+
+ Discord only returns the premium_subscriber field if it's true,
+ meaning the typical validation process wouldn't work.
+
+ We manually parse the raw data to determine if the field exists, and give it a useful
+ bool value.
+ """
+ data["premium_subscriber"] = "premium_subscriber" in data
+ super().__init__(**data)
+
+
+class DiscordRole(BaseModel):
+ """Schema model of Discord guild roles."""
+
+ id: str
+ name: str
+ color: int
+ hoist: bool
+ icon: str | None
+ unicode_emoji: str | None
+ position: int
+ permissions: str
+ managed: bool
+ mentionable: bool
+ tags: RoleTags | None
diff --git a/backend/models/dtos/discord_user.py b/backend/models/dtos/discord_user.py
new file mode 100644
index 0000000..be10672
--- /dev/null
+++ b/backend/models/dtos/discord_user.py
@@ -0,0 +1,54 @@
+import datetime
+import typing as t
+
+from pydantic import BaseModel
+
+
+class _User(BaseModel):
+ """Base for discord users and members."""
+
+ # Discord default fields.
+ username: str
+ id: str
+ discriminator: str
+ avatar: str | None
+ bot: bool | None
+ system: bool | None
+ locale: str | None
+ verified: bool | None
+ email: str | None
+ flags: int | None
+ premium_type: int | None
+ public_flags: int | None
+
+
+class DiscordUser(_User):
+ """Schema model of Discord user for form response."""
+
+ # Custom fields
+ admin: bool
+
+
+class DiscordMember(BaseModel):
+ """A discord guild member."""
+
+ user: _User
+ nick: str | None
+ avatar: str | None
+ roles: list[str]
+ joined_at: datetime.datetime
+ premium_since: datetime.datetime | None
+ deaf: bool
+ mute: bool
+ pending: bool | None
+ permissions: str | None
+ communication_disabled_until: datetime.datetime | None
+
+ def dict(self, *args, **kwargs) -> dict[str, t.Any]:
+ """Convert the model to a python dict, and encode timestamps in a serializable format."""
+ data = super().dict(*args, **kwargs)
+ for field, value in data.items():
+ if isinstance(value, datetime.datetime):
+ data[field] = value.isoformat()
+
+ return data
diff --git a/backend/models/dtos/form.py b/backend/models/dtos/form.py
new file mode 100644
index 0000000..739464e
--- /dev/null
+++ b/backend/models/dtos/form.py
@@ -0,0 +1,164 @@
+import typing as t
+
+import httpx
+from pydantic import BaseModel, Field, constr, root_validator, validator
+from pydantic.error_wrappers import ErrorWrapper, ValidationError
+
+from backend.constants import DISCORD_GUILD, FormFeatures, WebHook
+
+from .question import Question
+
+PUBLIC_FIELDS = [
+ "id",
+ "features",
+ "questions",
+ "name",
+ "description",
+ "submitted_text",
+ "discord_role",
+]
+
+
+class _WebHook(BaseModel):
+ """Schema model of discord webhooks."""
+
+ url: str
+ message: str | None
+
+ @validator("url")
+ def validate_url(cls, url: str) -> str:
+ """Validates URL parameter."""
+ if "discord.com/api/webhooks/" not in url:
+ msg = "URL must be a discord webhook."
+ raise ValueError(msg)
+
+ return url
+
+
+class Form(BaseModel):
+ """Schema model for form."""
+
+ id: constr(to_lower=True) = Field(alias="_id")
+ features: list[str]
+ questions: list[Question]
+ name: str
+ description: str
+ submitted_text: str | None = None
+ webhook: _WebHook = None
+ discord_role: str | None
+ response_readers: list[str] | None
+ editors: list[str] | None
+
+ class Config:
+ allow_population_by_field_name = True
+
+ @validator("features")
+ def validate_features(cls, value: list[str]) -> list[str]:
+ """Validates is all features in allowed list."""
+ # Uppercase everything to avoid mixed case in DB
+ value = [v.upper() for v in value]
+ allowed_values = [v.value for v in FormFeatures.__members__.values()]
+ if any(v not in allowed_values for v in value):
+ msg = "Form features list contains one or more invalid values."
+ raise ValueError(msg)
+
+ if FormFeatures.REQUIRES_LOGIN.value not in value:
+ if FormFeatures.COLLECT_EMAIL.value in value:
+ msg = "COLLECT_EMAIL feature require REQUIRES_LOGIN feature."
+ raise ValueError(msg)
+
+ if FormFeatures.ASSIGN_ROLE.value in value:
+ msg = "ASSIGN_ROLE feature require REQUIRES_LOGIN feature."
+ raise ValueError(msg)
+
+ return value
+
+ @validator("response_readers", "editors")
+ def validate_role_scoping(cls, value: list[str] | None) -> list[str]:
+ """Ensure special role based permissions aren't granted to the @everyone role."""
+ if value and DISCORD_GUILD in value:
+ msg = "You can not add the everyone role as an access scope."
+ raise ValueError(msg)
+ return value
+
+ @root_validator
+ def validate_role(cls, values: dict[str, t.Any]) -> dict[str, t.Any]:
+ """Validates does Discord role provided when flag provided."""
+ is_role_assigner = FormFeatures.ASSIGN_ROLE.value in values.get("features", [])
+ if is_role_assigner and not values.get("discord_role"):
+ msg = "discord_role field is required when ASSIGN_ROLE flag is provided."
+ raise ValueError(msg)
+
+ return values
+
+ def dict(self, admin: bool = True, **kwargs) -> dict[str, t.Any]: # noqa: FBT001, FBT002
+ """Wrapper for original function to exclude private data for public access."""
+ data = super().dict(**kwargs)
+ if admin:
+ return data
+
+ returned_data = {}
+
+ for field in PUBLIC_FIELDS:
+ fetch_field = "_id" if field == "id" and kwargs.get("by_alias") else field
+ returned_data[field] = data[fetch_field]
+
+ # Replace the unittest data section of code questions with the number of test cases.
+ for question in returned_data["questions"]:
+ if question["type"] == "code" and question["data"]["unittests"] is not None:
+ question["data"]["unittests"]["tests"] = len(question["data"]["unittests"]["tests"])
+ return returned_data
+
+
+class FormList(BaseModel):
+ __root__: list[Form]
+
+
+async def validate_hook_url(url: str) -> ValidationError | None:
+ """Validator for discord webhook urls."""
+
+ async def validate() -> str | None:
+ if not isinstance(url, str):
+ msg = "Webhook URL must be a string."
+ raise TypeError(msg)
+
+ if "discord.com/api/webhooks/" not in url:
+ msg = "URL must be a discord webhook."
+ raise ValueError(msg)
+
+ try:
+ async with httpx.AsyncClient() as client:
+ response = await client.get(url)
+ response.raise_for_status()
+
+ except httpx.RequestError as error:
+ # Catch exceptions in request format
+ msg = f"Encountered error while trying to connect to url: `{error}`"
+ raise ValueError(msg)
+
+ except httpx.HTTPStatusError as error:
+ # Catch exceptions in response
+ status = error.response.status_code
+
+ if status == 401:
+ msg = "Could not authenticate with target. Please check the webhook url."
+ raise ValueError(msg)
+ if status == 404:
+ msg = "Target could not find webhook url. Please check the webhook url."
+ raise ValueError(msg)
+
+ msg = f"Unknown error ({status}) while connecting to target: {error}"
+ raise ValueError(msg)
+
+ return url
+
+ # Validate, and return errors, if any
+ try:
+ await validate()
+ except Exception as e: # noqa: BLE001
+ loc = (
+ WebHook.__name__.lower(),
+ WebHook.URL.value,
+ )
+
+ return ValidationError([ErrorWrapper(e, loc=loc)], _WebHook)
diff --git a/backend/models/dtos/form_response.py b/backend/models/dtos/form_response.py
new file mode 100644
index 0000000..3c8297b
--- /dev/null
+++ b/backend/models/dtos/form_response.py
@@ -0,0 +1,37 @@
+import datetime
+import typing as t
+
+from pydantic import BaseModel, Field, validator
+
+from .antispam import AntiSpam
+from .discord_user import DiscordUser
+
+
+class FormResponse(BaseModel):
+ """Schema model for form response."""
+
+ id: str = Field(alias="_id")
+ user: DiscordUser | None
+ antispam: AntiSpam | None
+ response: dict[str, t.Any]
+ form_id: str
+ timestamp: str
+
+ @validator("timestamp", pre=True)
+ def set_timestamp(cls, iso_string: str | None) -> str:
+ if iso_string is None:
+ return datetime.datetime.now(tz=datetime.UTC).isoformat()
+
+ if not isinstance(iso_string, str):
+ msg = "Submission timestamp must be a string."
+ raise TypeError(msg)
+
+ # Convert to datetime and back to ensure string is valid
+ return datetime.datetime.fromisoformat(iso_string).isoformat()
+
+ class Config:
+ allow_population_by_field_name = True
+
+
+class ResponseList(BaseModel):
+ __root__: list[FormResponse]
diff --git a/backend/models/dtos/question.py b/backend/models/dtos/question.py
new file mode 100644
index 0000000..a13ce93
--- /dev/null
+++ b/backend/models/dtos/question.py
@@ -0,0 +1,83 @@
+import typing as t
+
+from pydantic import BaseModel, Field, root_validator, validator
+
+from backend.constants import QUESTION_TYPES, REQUIRED_QUESTION_TYPE_DATA
+
+_TESTS_TYPE = dict[str, str] | int
+
+
+class Unittests(BaseModel):
+ """Schema model for unittest suites in code questions."""
+
+ allow_failure: bool = False
+ tests: _TESTS_TYPE
+
+ @validator("tests")
+ def validate_tests(cls, value: _TESTS_TYPE) -> _TESTS_TYPE:
+ """Confirm that at least one test exists in a test suite."""
+ if isinstance(value, dict):
+ keys = len(value.keys()) - (1 if "setUp" in value else 0)
+ if keys == 0:
+ msg = "Must have at least one test in a test suite."
+ raise ValueError(msg)
+
+ return value
+
+
+class CodeQuestion(BaseModel):
+ """Schema model for questions of type `code`."""
+
+ language: str
+ unittests: Unittests | None
+
+
+class Question(BaseModel):
+ """Schema model for form question."""
+
+ id: str = Field(alias="_id")
+ name: str
+ type: str
+ data: dict[str, t.Any]
+ required: bool
+
+ class Config:
+ allow_population_by_field_name = True
+
+ @validator("type", pre=True)
+ def validate_question_type(cls, value: str) -> str:
+ """Checks if question type in currently allowed types list."""
+ value = value.lower()
+ if value not in QUESTION_TYPES:
+ msg = f"{value} is not valid question type. Allowed question types: {QUESTION_TYPES}."
+ raise ValueError(msg)
+
+ return value
+
+ @root_validator
+ def validate_question_data(
+ cls,
+ value: dict[str, t.Any],
+ ) -> dict[str, t.Any]:
+ """Check does required data exists for question type and remove other data."""
+ # When question type don't need data, don't add anything to keep DB clean.
+ if value.get("type") not in REQUIRED_QUESTION_TYPE_DATA:
+ return value
+
+ for key, data_type in REQUIRED_QUESTION_TYPE_DATA[value["type"]].items():
+ if key not in value.get("data", {}):
+ msg = f"Required question data key '{key}' not provided."
+ raise ValueError(msg)
+
+ if not isinstance(value["data"][key], data_type):
+ msg = (
+ f"Question data key '{key}' expects {data_type.__name__}, "
+ f"got {type(value["data"][key]).__name__} instead."
+ )
+ raise TypeError(msg)
+
+ # Validate unittest options
+ if value.get("type").lower() == "code":
+ value["data"] = CodeQuestion(**value.get("data")).dict()
+
+ return value