diff options
Diffstat (limited to 'backend/models/dtos')
-rw-r--r-- | backend/models/dtos/__init__.py | 19 | ||||
-rw-r--r-- | backend/models/dtos/antispam.py | 9 | ||||
-rw-r--r-- | backend/models/dtos/discord_role.py | 38 | ||||
-rw-r--r-- | backend/models/dtos/discord_user.py | 54 | ||||
-rw-r--r-- | backend/models/dtos/form.py | 164 | ||||
-rw-r--r-- | backend/models/dtos/form_response.py | 37 | ||||
-rw-r--r-- | backend/models/dtos/question.py | 83 |
7 files changed, 404 insertions, 0 deletions
diff --git a/backend/models/dtos/__init__.py b/backend/models/dtos/__init__.py new file mode 100644 index 0000000..336e28b --- /dev/null +++ b/backend/models/dtos/__init__.py @@ -0,0 +1,19 @@ +from .antispam import AntiSpam +from .discord_role import DiscordRole +from .discord_user import DiscordMember, DiscordUser +from .form import Form, FormList +from .form_response import FormResponse, ResponseList +from .question import CodeQuestion, Question + +__all__ = [ + "AntiSpam", + "CodeQuestion", + "DiscordMember", + "DiscordRole", + "DiscordUser", + "Form", + "FormList", + "FormResponse", + "Question", + "ResponseList", +] diff --git a/backend/models/dtos/antispam.py b/backend/models/dtos/antispam.py new file mode 100644 index 0000000..b596d4d --- /dev/null +++ b/backend/models/dtos/antispam.py @@ -0,0 +1,9 @@ +from pydantic import BaseModel + + +class AntiSpam(BaseModel): + """Schema model for form response antispam field.""" + + ip_hash: str + user_agent_hash: str + captcha_pass: bool diff --git a/backend/models/dtos/discord_role.py b/backend/models/dtos/discord_role.py new file mode 100644 index 0000000..195f557 --- /dev/null +++ b/backend/models/dtos/discord_role.py @@ -0,0 +1,38 @@ +from pydantic import BaseModel + + +class RoleTags(BaseModel): + """Meta information about a discord role.""" + + bot_id: str | None + integration_id: str | None + premium_subscriber: bool + + def __init__(self, **data) -> None: + """ + Handle the terrible discord API. + + Discord only returns the premium_subscriber field if it's true, + meaning the typical validation process wouldn't work. + + We manually parse the raw data to determine if the field exists, and give it a useful + bool value. + """ + data["premium_subscriber"] = "premium_subscriber" in data + super().__init__(**data) + + +class DiscordRole(BaseModel): + """Schema model of Discord guild roles.""" + + id: str + name: str + color: int + hoist: bool + icon: str | None + unicode_emoji: str | None + position: int + permissions: str + managed: bool + mentionable: bool + tags: RoleTags | None diff --git a/backend/models/dtos/discord_user.py b/backend/models/dtos/discord_user.py new file mode 100644 index 0000000..be10672 --- /dev/null +++ b/backend/models/dtos/discord_user.py @@ -0,0 +1,54 @@ +import datetime +import typing as t + +from pydantic import BaseModel + + +class _User(BaseModel): + """Base for discord users and members.""" + + # Discord default fields. + username: str + id: str + discriminator: str + avatar: str | None + bot: bool | None + system: bool | None + locale: str | None + verified: bool | None + email: str | None + flags: int | None + premium_type: int | None + public_flags: int | None + + +class DiscordUser(_User): + """Schema model of Discord user for form response.""" + + # Custom fields + admin: bool + + +class DiscordMember(BaseModel): + """A discord guild member.""" + + user: _User + nick: str | None + avatar: str | None + roles: list[str] + joined_at: datetime.datetime + premium_since: datetime.datetime | None + deaf: bool + mute: bool + pending: bool | None + permissions: str | None + communication_disabled_until: datetime.datetime | None + + def dict(self, *args, **kwargs) -> dict[str, t.Any]: + """Convert the model to a python dict, and encode timestamps in a serializable format.""" + data = super().dict(*args, **kwargs) + for field, value in data.items(): + if isinstance(value, datetime.datetime): + data[field] = value.isoformat() + + return data diff --git a/backend/models/dtos/form.py b/backend/models/dtos/form.py new file mode 100644 index 0000000..739464e --- /dev/null +++ b/backend/models/dtos/form.py @@ -0,0 +1,164 @@ +import typing as t + +import httpx +from pydantic import BaseModel, Field, constr, root_validator, validator +from pydantic.error_wrappers import ErrorWrapper, ValidationError + +from backend.constants import DISCORD_GUILD, FormFeatures, WebHook + +from .question import Question + +PUBLIC_FIELDS = [ + "id", + "features", + "questions", + "name", + "description", + "submitted_text", + "discord_role", +] + + +class _WebHook(BaseModel): + """Schema model of discord webhooks.""" + + url: str + message: str | None + + @validator("url") + def validate_url(cls, url: str) -> str: + """Validates URL parameter.""" + if "discord.com/api/webhooks/" not in url: + msg = "URL must be a discord webhook." + raise ValueError(msg) + + return url + + +class Form(BaseModel): + """Schema model for form.""" + + id: constr(to_lower=True) = Field(alias="_id") + features: list[str] + questions: list[Question] + name: str + description: str + submitted_text: str | None = None + webhook: _WebHook = None + discord_role: str | None + response_readers: list[str] | None + editors: list[str] | None + + class Config: + allow_population_by_field_name = True + + @validator("features") + def validate_features(cls, value: list[str]) -> list[str]: + """Validates is all features in allowed list.""" + # Uppercase everything to avoid mixed case in DB + value = [v.upper() for v in value] + allowed_values = [v.value for v in FormFeatures.__members__.values()] + if any(v not in allowed_values for v in value): + msg = "Form features list contains one or more invalid values." + raise ValueError(msg) + + if FormFeatures.REQUIRES_LOGIN.value not in value: + if FormFeatures.COLLECT_EMAIL.value in value: + msg = "COLLECT_EMAIL feature require REQUIRES_LOGIN feature." + raise ValueError(msg) + + if FormFeatures.ASSIGN_ROLE.value in value: + msg = "ASSIGN_ROLE feature require REQUIRES_LOGIN feature." + raise ValueError(msg) + + return value + + @validator("response_readers", "editors") + def validate_role_scoping(cls, value: list[str] | None) -> list[str]: + """Ensure special role based permissions aren't granted to the @everyone role.""" + if value and DISCORD_GUILD in value: + msg = "You can not add the everyone role as an access scope." + raise ValueError(msg) + return value + + @root_validator + def validate_role(cls, values: dict[str, t.Any]) -> dict[str, t.Any]: + """Validates does Discord role provided when flag provided.""" + is_role_assigner = FormFeatures.ASSIGN_ROLE.value in values.get("features", []) + if is_role_assigner and not values.get("discord_role"): + msg = "discord_role field is required when ASSIGN_ROLE flag is provided." + raise ValueError(msg) + + return values + + def dict(self, admin: bool = True, **kwargs) -> dict[str, t.Any]: # noqa: FBT001, FBT002 + """Wrapper for original function to exclude private data for public access.""" + data = super().dict(**kwargs) + if admin: + return data + + returned_data = {} + + for field in PUBLIC_FIELDS: + fetch_field = "_id" if field == "id" and kwargs.get("by_alias") else field + returned_data[field] = data[fetch_field] + + # Replace the unittest data section of code questions with the number of test cases. + for question in returned_data["questions"]: + if question["type"] == "code" and question["data"]["unittests"] is not None: + question["data"]["unittests"]["tests"] = len(question["data"]["unittests"]["tests"]) + return returned_data + + +class FormList(BaseModel): + __root__: list[Form] + + +async def validate_hook_url(url: str) -> ValidationError | None: + """Validator for discord webhook urls.""" + + async def validate() -> str | None: + if not isinstance(url, str): + msg = "Webhook URL must be a string." + raise TypeError(msg) + + if "discord.com/api/webhooks/" not in url: + msg = "URL must be a discord webhook." + raise ValueError(msg) + + try: + async with httpx.AsyncClient() as client: + response = await client.get(url) + response.raise_for_status() + + except httpx.RequestError as error: + # Catch exceptions in request format + msg = f"Encountered error while trying to connect to url: `{error}`" + raise ValueError(msg) + + except httpx.HTTPStatusError as error: + # Catch exceptions in response + status = error.response.status_code + + if status == 401: + msg = "Could not authenticate with target. Please check the webhook url." + raise ValueError(msg) + if status == 404: + msg = "Target could not find webhook url. Please check the webhook url." + raise ValueError(msg) + + msg = f"Unknown error ({status}) while connecting to target: {error}" + raise ValueError(msg) + + return url + + # Validate, and return errors, if any + try: + await validate() + except Exception as e: # noqa: BLE001 + loc = ( + WebHook.__name__.lower(), + WebHook.URL.value, + ) + + return ValidationError([ErrorWrapper(e, loc=loc)], _WebHook) diff --git a/backend/models/dtos/form_response.py b/backend/models/dtos/form_response.py new file mode 100644 index 0000000..3c8297b --- /dev/null +++ b/backend/models/dtos/form_response.py @@ -0,0 +1,37 @@ +import datetime +import typing as t + +from pydantic import BaseModel, Field, validator + +from .antispam import AntiSpam +from .discord_user import DiscordUser + + +class FormResponse(BaseModel): + """Schema model for form response.""" + + id: str = Field(alias="_id") + user: DiscordUser | None + antispam: AntiSpam | None + response: dict[str, t.Any] + form_id: str + timestamp: str + + @validator("timestamp", pre=True) + def set_timestamp(cls, iso_string: str | None) -> str: + if iso_string is None: + return datetime.datetime.now(tz=datetime.UTC).isoformat() + + if not isinstance(iso_string, str): + msg = "Submission timestamp must be a string." + raise TypeError(msg) + + # Convert to datetime and back to ensure string is valid + return datetime.datetime.fromisoformat(iso_string).isoformat() + + class Config: + allow_population_by_field_name = True + + +class ResponseList(BaseModel): + __root__: list[FormResponse] diff --git a/backend/models/dtos/question.py b/backend/models/dtos/question.py new file mode 100644 index 0000000..a13ce93 --- /dev/null +++ b/backend/models/dtos/question.py @@ -0,0 +1,83 @@ +import typing as t + +from pydantic import BaseModel, Field, root_validator, validator + +from backend.constants import QUESTION_TYPES, REQUIRED_QUESTION_TYPE_DATA + +_TESTS_TYPE = dict[str, str] | int + + +class Unittests(BaseModel): + """Schema model for unittest suites in code questions.""" + + allow_failure: bool = False + tests: _TESTS_TYPE + + @validator("tests") + def validate_tests(cls, value: _TESTS_TYPE) -> _TESTS_TYPE: + """Confirm that at least one test exists in a test suite.""" + if isinstance(value, dict): + keys = len(value.keys()) - (1 if "setUp" in value else 0) + if keys == 0: + msg = "Must have at least one test in a test suite." + raise ValueError(msg) + + return value + + +class CodeQuestion(BaseModel): + """Schema model for questions of type `code`.""" + + language: str + unittests: Unittests | None + + +class Question(BaseModel): + """Schema model for form question.""" + + id: str = Field(alias="_id") + name: str + type: str + data: dict[str, t.Any] + required: bool + + class Config: + allow_population_by_field_name = True + + @validator("type", pre=True) + def validate_question_type(cls, value: str) -> str: + """Checks if question type in currently allowed types list.""" + value = value.lower() + if value not in QUESTION_TYPES: + msg = f"{value} is not valid question type. Allowed question types: {QUESTION_TYPES}." + raise ValueError(msg) + + return value + + @root_validator + def validate_question_data( + cls, + value: dict[str, t.Any], + ) -> dict[str, t.Any]: + """Check does required data exists for question type and remove other data.""" + # When question type don't need data, don't add anything to keep DB clean. + if value.get("type") not in REQUIRED_QUESTION_TYPE_DATA: + return value + + for key, data_type in REQUIRED_QUESTION_TYPE_DATA[value["type"]].items(): + if key not in value.get("data", {}): + msg = f"Required question data key '{key}' not provided." + raise ValueError(msg) + + if not isinstance(value["data"][key], data_type): + msg = ( + f"Question data key '{key}' expects {data_type.__name__}, " + f"got {type(value["data"][key]).__name__} instead." + ) + raise TypeError(msg) + + # Validate unittest options + if value.get("type").lower() == "code": + value["data"] = CodeQuestion(**value.get("data")).dict() + + return value |