diff options
Diffstat (limited to '')
| -rw-r--r-- | backend/routes/forms/submit.py | 90 | 
1 files changed, 68 insertions, 22 deletions
| diff --git a/backend/routes/forms/submit.py b/backend/routes/forms/submit.py index a94a1c9..054aecb 100644 --- a/backend/routes/forms/submit.py +++ b/backend/routes/forms/submit.py @@ -4,15 +4,24 @@ Submit a form.  import binascii  import hashlib +import uuid -import jwt +import httpx +import pydnsbl +from pydantic import ValidationError  from starlette.requests import Request  from starlette.responses import JSONResponse -from backend.constants import SECRET_KEY +from backend.constants import HCAPTCHA_API_SECRET, FormFeatures +from backend.models import Form, FormResponse  from backend.route import Route +HCAPTCHA_VERIFY_URL = "https://hcaptcha.com/siteverify" +HCAPTCHA_HEADERS = { +    "Content-Type": "application/x-www-form-urlencoded" +} +  class SubmitForm(Route):      """ @@ -28,40 +37,77 @@ class SubmitForm(Route):          if form := await request.state.db.forms.find_one(              {"_id": request.path_params["form_id"], "features": "OPEN"}          ): -            response_obj = {} +            form = Form(**form) +            response = data.copy() +            response["id"] = str(uuid.uuid4()) +            response["form_id"] = form.id -            if "DISABLE_ANTISPAM" not in form["features"]: +            if FormFeatures.DISABLE_ANTISPAM.value not in form.features:                  ip_hash_ctx = hashlib.md5()                  ip_hash_ctx.update(request.client.host.encode())                  ip_hash = binascii.hexlify(ip_hash_ctx.digest()) +                user_agent_hash_ctx = hashlib.md5() +                user_agent_hash_ctx.update(request.headers["User-Agent"].encode()) +                user_agent_hash = binascii.hexlify(user_agent_hash_ctx.digest()) -                response_obj["antispam"] = { -                    "ip": ip_hash.decode() -                } +                dsn_checker = pydnsbl.DNSBLIpChecker() +                dsn_blacklist = await dsn_checker.check_async(request.client.host) -            if "REQUIRES_LOGIN" in form["features"]: -                if token := data.get("token"): -                    data = jwt.decode(token, SECRET_KEY, algorithms=['HS256']) -                    response_obj["user"] = { -                        "user": f"{data['username']}#{data['discriminator']}", -                        "id": data["id"] +                async with httpx.AsyncClient() as client: +                    query_params = { +                        "secret": HCAPTCHA_API_SECRET, +                        "response": data.get("captcha")                      } +                    r = await client.post( +                        HCAPTCHA_VERIFY_URL, +                        params=query_params, +                        headers=HCAPTCHA_HEADERS +                    ) +                    r.raise_for_status() +                    captcha_data = r.json() + +                response["antispam"] = { +                    "ip_hash": ip_hash.decode(), +                    "user_agent_hash": user_agent_hash.decode(), +                    "captcha_pass": captcha_data["success"], +                    "dns_blacklisted": dsn_blacklist.blacklisted, +                } + +            if FormFeatures.REQUIRES_LOGIN.value in form.features: +                if request.user.is_authenticated: +                    response["user"] = request.user.payload -                    if "COLLECT_EMAIL" in form["features"]: -                        if data.get("email"): -                            response_obj["user"]["email"] = data["email"] -                        else: -                            return JSONResponse({ -                                "error": "User data did not include email information" -                            }) +                    if FormFeatures.COLLECT_EMAIL.value in form.features and "email" not in response["user"]:  # noqa +                        return JSONResponse({ +                            "error": "User data doesn't include email." +                        })                  else:                      return JSONResponse({                          "error": "Missing Discord user data"                      }) +            missing_fields = [] +            for question in form.questions: +                if question.id not in response["response"]: +                    missing_fields.append(question.id) + +            if missing_fields: +                return JSONResponse({ +                    "error": f"Following missing fields: {', '.join(missing_fields)}." +                }) + +            try: +                response_obj = FormResponse(**response) +            except ValidationError as e: +                return JSONResponse(e.errors()) + +            await request.state.db.responses.insert_one( +                response_obj.dict(by_alias=True) +            ) +              return JSONResponse({ -                "form": form, -                "response": response_obj +                "form": form.dict(), +                "response": response_obj.dict()              })          else:              return JSONResponse({ | 
