aboutsummaryrefslogtreecommitdiffstats
path: root/backend/routes/forms/submit.py
blob: 3ecbda0625b63ed4d928d9c03ff6743a3404540e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
"""
Submit a form.
"""

import binascii
import hashlib
import uuid

import httpx
import pydnsbl
from pydantic import ValidationError
from starlette.requests import Request

from starlette.responses import JSONResponse

from backend.constants import HCAPTCHA_API_SECRET, FormFeatures
from backend.models import Form, FormResponse
from backend.route import Route

HCAPTCHA_VERIFY_URL = "https://hcaptcha.com/siteverify"
HCAPTCHA_HEADERS = {
    "Content-Type": "application/x-www-form-urlencoded"
}


class SubmitForm(Route):
    """
    Submit a form with the provided form ID.
    """

    name = "submit_form"
    path = "/submit/{form_id:str}"

    async def post(self, request: Request) -> JSONResponse:
        data = await request.json()

        if form := await request.state.db.forms.find_one(
            {"_id": request.path_params["form_id"], "features": "OPEN"}
        ):
            form = Form(**form)
            response = data.copy()
            response["id"] = str(uuid.uuid4())
            response["form_id"] = form.id

            if FormFeatures.DISABLE_ANTISPAM.value not in form.features:
                ip_hash_ctx = hashlib.md5()
                ip_hash_ctx.update(request.client.host.encode())
                ip_hash = binascii.hexlify(ip_hash_ctx.digest())
                user_agent_hash_ctx = hashlib.md5()
                user_agent_hash_ctx.update(request.headers["User-Agent"].encode())
                user_agent_hash = binascii.hexlify(user_agent_hash_ctx.digest())

                dsn_checker = pydnsbl.DNSBLIpChecker()
                dsn_blacklist = await dsn_checker.check_async(request.client.host)

                async with httpx.AsyncClient() as client:
                    query_params = {
                        "secret": HCAPTCHA_API_SECRET,
                        "response": data.get("captcha")
                    }
                    r = await client.post(
                        HCAPTCHA_VERIFY_URL,
                        params=query_params,
                        headers=HCAPTCHA_HEADERS
                    )
                    r.raise_for_status()
                    captcha_data = r.json()

                response["antispam"] = {
                    "ip_hash": ip_hash.decode(),
                    "user_agent_hash": user_agent_hash.decode(),
                    "captcha_pass": captcha_data["success"],
                    "dns_blacklisted": dsn_blacklist.blacklisted,
                }

            if FormFeatures.REQUIRES_LOGIN.value in form.features:
                if request.user.is_authenticated:
                    response["user"] = request.user.payload

                    if FormFeatures.COLLECT_EMAIL.value in form.features and "email" not in response["user"]:  # noqa
                        return JSONResponse({
                            "error": "email_required"
                        }, status_code=400)
                else:
                    return JSONResponse({
                        "error": "missing_discord_data"
                    }, status_code=400)

            missing_fields = []
            for question in form.questions:
                if question.id not in response["response"]:
                    missing_fields.append(question.id)

            if missing_fields:
                return JSONResponse({
                    "error": "missing_fields",
                    "fields": missing_fields
                }, status_code=400)

            try:
                response_obj = FormResponse(**response)
            except ValidationError as e:
                return JSONResponse(e.errors())

            await request.state.db.responses.insert_one(
                response_obj.dict(by_alias=True)
            )

            return JSONResponse({
                "form": form.dict(),
                "response": response_obj.dict()
            })
        else:
            return JSONResponse({
                "error": "Open form not found"
            })