aboutsummaryrefslogtreecommitdiffstats
path: root/backend
diff options
context:
space:
mode:
Diffstat (limited to 'backend')
-rw-r--r--backend/constants.py2
-rw-r--r--backend/models/__init__.py5
-rw-r--r--backend/models/antispam.py10
-rw-r--r--backend/models/discord_user.py24
-rw-r--r--backend/models/form_response.py19
-rw-r--r--backend/routes/forms/new.py5
-rw-r--r--backend/routes/forms/submit.py95
7 files changed, 135 insertions, 25 deletions
diff --git a/backend/constants.py b/backend/constants.py
index 3b8bec8..61519ed 100644
--- a/backend/constants.py
+++ b/backend/constants.py
@@ -18,6 +18,8 @@ OAUTH2_REDIRECT_URI = os.getenv(
SECRET_KEY = os.getenv("SECRET_KEY", binascii.hexlify(os.urandom(30)).decode())
+HCAPTCHA_API_SECRET = os.getenv("HCAPTCHA_API_SECRET")
+
QUESTION_TYPES = [
"radio",
"checkbox",
diff --git a/backend/models/__init__.py b/backend/models/__init__.py
index 80abf6f..98fa619 100644
--- a/backend/models/__init__.py
+++ b/backend/models/__init__.py
@@ -1,4 +1,7 @@
+from .antispam import AntiSpam
+from .discord_user import DiscordUser
from .form import Form
+from .form_response import FormResponse
from .question import Question
-__all__ = ["Form", "Question"]
+__all__ = ["AntiSpam", "DiscordUser", "Form", "FormResponse", "Question"]
diff --git a/backend/models/antispam.py b/backend/models/antispam.py
new file mode 100644
index 0000000..b16f686
--- /dev/null
+++ b/backend/models/antispam.py
@@ -0,0 +1,10 @@
+from pydantic import BaseModel
+
+
+class AntiSpam(BaseModel):
+ """Schema model for form response antispam field."""
+
+ ip_hash: str
+ user_agent_hash: str
+ captcha_pass: bool
+ dns_blacklisted: bool
diff --git a/backend/models/discord_user.py b/backend/models/discord_user.py
new file mode 100644
index 0000000..e835176
--- /dev/null
+++ b/backend/models/discord_user.py
@@ -0,0 +1,24 @@
+import typing as t
+
+from pydantic import BaseModel
+
+
+class DiscordUser(BaseModel):
+ """Schema model of Discord user for form response."""
+
+ # Discord default fields
+ id: int # This is actually snowflake, but we simplify it here
+ username: str
+ discriminator: str
+ avatar: t.Optional[str]
+ bot: t.Optional[bool]
+ system: t.Optional[bool]
+ locale: t.Optional[str]
+ verified: t.Optional[bool]
+ email: t.Optional[str]
+ flags: t.Optional[int]
+ premium_type: t.Optional[int]
+ public_flags: t.Optional[int]
+
+ # Custom fields
+ admin: bool
diff --git a/backend/models/form_response.py b/backend/models/form_response.py
new file mode 100644
index 0000000..bea070f
--- /dev/null
+++ b/backend/models/form_response.py
@@ -0,0 +1,19 @@
+import typing as t
+
+from pydantic import BaseModel, Field
+
+from .antispam import AntiSpam
+from .discord_user import DiscordUser
+
+
+class FormResponse(BaseModel):
+ """Schema model for form response."""
+
+ id: str = Field(alias="_id")
+ user: t.Optional[DiscordUser]
+ antispam: t.Optional[AntiSpam]
+ response: t.Dict[str, t.Any]
+ form_id: str
+
+ class Config:
+ allow_population_by_field_name = True
diff --git a/backend/routes/forms/new.py b/backend/routes/forms/new.py
index ff39f12..6437a4a 100644
--- a/backend/routes/forms/new.py
+++ b/backend/routes/forms/new.py
@@ -26,5 +26,10 @@ class FormCreate(Route):
except ValidationError as e:
return JSONResponse(e.errors())
+ if await request.state.db.forms.find_one({"_id": form.id}):
+ return JSONResponse({
+ "error": "Form with same ID already exists."
+ }, status_code=400)
+
await request.state.db.forms.insert_one(form.dict(by_alias=True))
return JSONResponse(form.dict())
diff --git a/backend/routes/forms/submit.py b/backend/routes/forms/submit.py
index a94a1c9..3ecbda0 100644
--- a/backend/routes/forms/submit.py
+++ b/backend/routes/forms/submit.py
@@ -4,15 +4,24 @@ Submit a form.
import binascii
import hashlib
+import uuid
-import jwt
+import httpx
+import pydnsbl
+from pydantic import ValidationError
from starlette.requests import Request
from starlette.responses import JSONResponse
-from backend.constants import SECRET_KEY
+from backend.constants import HCAPTCHA_API_SECRET, FormFeatures
+from backend.models import Form, FormResponse
from backend.route import Route
+HCAPTCHA_VERIFY_URL = "https://hcaptcha.com/siteverify"
+HCAPTCHA_HEADERS = {
+ "Content-Type": "application/x-www-form-urlencoded"
+}
+
class SubmitForm(Route):
"""
@@ -28,40 +37,78 @@ class SubmitForm(Route):
if form := await request.state.db.forms.find_one(
{"_id": request.path_params["form_id"], "features": "OPEN"}
):
- response_obj = {}
+ form = Form(**form)
+ response = data.copy()
+ response["id"] = str(uuid.uuid4())
+ response["form_id"] = form.id
- if "DISABLE_ANTISPAM" not in form["features"]:
+ if FormFeatures.DISABLE_ANTISPAM.value not in form.features:
ip_hash_ctx = hashlib.md5()
ip_hash_ctx.update(request.client.host.encode())
ip_hash = binascii.hexlify(ip_hash_ctx.digest())
+ user_agent_hash_ctx = hashlib.md5()
+ user_agent_hash_ctx.update(request.headers["User-Agent"].encode())
+ user_agent_hash = binascii.hexlify(user_agent_hash_ctx.digest())
- response_obj["antispam"] = {
- "ip": ip_hash.decode()
- }
+ dsn_checker = pydnsbl.DNSBLIpChecker()
+ dsn_blacklist = await dsn_checker.check_async(request.client.host)
- if "REQUIRES_LOGIN" in form["features"]:
- if token := data.get("token"):
- data = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
- response_obj["user"] = {
- "user": f"{data['username']}#{data['discriminator']}",
- "id": data["id"]
+ async with httpx.AsyncClient() as client:
+ query_params = {
+ "secret": HCAPTCHA_API_SECRET,
+ "response": data.get("captcha")
}
+ r = await client.post(
+ HCAPTCHA_VERIFY_URL,
+ params=query_params,
+ headers=HCAPTCHA_HEADERS
+ )
+ r.raise_for_status()
+ captcha_data = r.json()
+
+ response["antispam"] = {
+ "ip_hash": ip_hash.decode(),
+ "user_agent_hash": user_agent_hash.decode(),
+ "captcha_pass": captcha_data["success"],
+ "dns_blacklisted": dsn_blacklist.blacklisted,
+ }
+
+ if FormFeatures.REQUIRES_LOGIN.value in form.features:
+ if request.user.is_authenticated:
+ response["user"] = request.user.payload
- if "COLLECT_EMAIL" in form["features"]:
- if data.get("email"):
- response_obj["user"]["email"] = data["email"]
- else:
- return JSONResponse({
- "error": "User data did not include email information"
- })
+ if FormFeatures.COLLECT_EMAIL.value in form.features and "email" not in response["user"]: # noqa
+ return JSONResponse({
+ "error": "email_required"
+ }, status_code=400)
else:
return JSONResponse({
- "error": "Missing Discord user data"
- })
+ "error": "missing_discord_data"
+ }, status_code=400)
+
+ missing_fields = []
+ for question in form.questions:
+ if question.id not in response["response"]:
+ missing_fields.append(question.id)
+
+ if missing_fields:
+ return JSONResponse({
+ "error": "missing_fields",
+ "fields": missing_fields
+ }, status_code=400)
+
+ try:
+ response_obj = FormResponse(**response)
+ except ValidationError as e:
+ return JSONResponse(e.errors())
+
+ await request.state.db.responses.insert_one(
+ response_obj.dict(by_alias=True)
+ )
return JSONResponse({
- "form": form,
- "response": response_obj
+ "form": form.dict(),
+ "response": response_obj.dict()
})
else:
return JSONResponse({