diff options
-rw-r--r-- | backend/routes/forms/submit.py | 90 |
1 files changed, 68 insertions, 22 deletions
diff --git a/backend/routes/forms/submit.py b/backend/routes/forms/submit.py index a94a1c9..054aecb 100644 --- a/backend/routes/forms/submit.py +++ b/backend/routes/forms/submit.py @@ -4,15 +4,24 @@ Submit a form. import binascii import hashlib +import uuid -import jwt +import httpx +import pydnsbl +from pydantic import ValidationError from starlette.requests import Request from starlette.responses import JSONResponse -from backend.constants import SECRET_KEY +from backend.constants import HCAPTCHA_API_SECRET, FormFeatures +from backend.models import Form, FormResponse from backend.route import Route +HCAPTCHA_VERIFY_URL = "https://hcaptcha.com/siteverify" +HCAPTCHA_HEADERS = { + "Content-Type": "application/x-www-form-urlencoded" +} + class SubmitForm(Route): """ @@ -28,40 +37,77 @@ class SubmitForm(Route): if form := await request.state.db.forms.find_one( {"_id": request.path_params["form_id"], "features": "OPEN"} ): - response_obj = {} + form = Form(**form) + response = data.copy() + response["id"] = str(uuid.uuid4()) + response["form_id"] = form.id - if "DISABLE_ANTISPAM" not in form["features"]: + if FormFeatures.DISABLE_ANTISPAM.value not in form.features: ip_hash_ctx = hashlib.md5() ip_hash_ctx.update(request.client.host.encode()) ip_hash = binascii.hexlify(ip_hash_ctx.digest()) + user_agent_hash_ctx = hashlib.md5() + user_agent_hash_ctx.update(request.headers["User-Agent"].encode()) + user_agent_hash = binascii.hexlify(user_agent_hash_ctx.digest()) - response_obj["antispam"] = { - "ip": ip_hash.decode() - } + dsn_checker = pydnsbl.DNSBLIpChecker() + dsn_blacklist = await dsn_checker.check_async(request.client.host) - if "REQUIRES_LOGIN" in form["features"]: - if token := data.get("token"): - data = jwt.decode(token, SECRET_KEY, algorithms=['HS256']) - response_obj["user"] = { - "user": f"{data['username']}#{data['discriminator']}", - "id": data["id"] + async with httpx.AsyncClient() as client: + query_params = { + "secret": HCAPTCHA_API_SECRET, + "response": data.get("captcha") } + r = await client.post( + HCAPTCHA_VERIFY_URL, + params=query_params, + headers=HCAPTCHA_HEADERS + ) + r.raise_for_status() + captcha_data = r.json() + + response["antispam"] = { + "ip_hash": ip_hash.decode(), + "user_agent_hash": user_agent_hash.decode(), + "captcha_pass": captcha_data["success"], + "dns_blacklisted": dsn_blacklist.blacklisted, + } + + if FormFeatures.REQUIRES_LOGIN.value in form.features: + if request.user.is_authenticated: + response["user"] = request.user.payload - if "COLLECT_EMAIL" in form["features"]: - if data.get("email"): - response_obj["user"]["email"] = data["email"] - else: - return JSONResponse({ - "error": "User data did not include email information" - }) + if FormFeatures.COLLECT_EMAIL.value in form.features and "email" not in response["user"]: # noqa + return JSONResponse({ + "error": "User data doesn't include email." + }) else: return JSONResponse({ "error": "Missing Discord user data" }) + missing_fields = [] + for question in form.questions: + if question.id not in response["response"]: + missing_fields.append(question.id) + + if missing_fields: + return JSONResponse({ + "error": f"Following missing fields: {', '.join(missing_fields)}." + }) + + try: + response_obj = FormResponse(**response) + except ValidationError as e: + return JSONResponse(e.errors()) + + await request.state.db.responses.insert_one( + response_obj.dict(by_alias=True) + ) + return JSONResponse({ - "form": form, - "response": response_obj + "form": form.dict(), + "response": response_obj.dict() }) else: return JSONResponse({ |