1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
"""
Submit a form.
"""
import binascii
import hashlib
import uuid
import httpx
import pydnsbl
from pydantic import ValidationError
from starlette.requests import Request
from starlette.responses import JSONResponse
from backend.constants import HCAPTCHA_API_SECRET, FormFeatures
from backend.models import Form, FormResponse
from backend.route import Route
# Endpoint that the captcha token submitted with a form is verified against.
HCAPTCHA_VERIFY_URL = "https://hcaptcha.com/siteverify"

# Headers sent along with every verification request to the endpoint above.
HCAPTCHA_HEADERS = {"Content-Type": "application/x-www-form-urlencoded"}
class SubmitForm(Route):
    """
    Submit a form with the provided form ID.
    """

    name = "submit_form"
    path = "/submit/{form_id:str}"

    @staticmethod
    def _md5_hex(value: str) -> str:
        """Return the hexadecimal MD5 digest of ``value`` (UTF-8 encoded)."""
        # MD5 is kept on purpose: it yields the same hashes as the previous
        # hashlib + binascii.hexlify implementation, so stored values match.
        return hashlib.md5(value.encode()).hexdigest()

    async def post(self, request: Request) -> JSONResponse:
        """
        Validate a submission for the form in the URL and store it.

        The JSON request body is copied, given a fresh UUID ``id`` and the
        form's ``form_id``, optionally enriched with ``antispam`` and ``user``
        data depending on the form's features, validated through
        ``FormResponse`` and inserted into the ``responses`` collection.

        Returns a ``JSONResponse`` which is one of:

        * 200 with ``form`` and ``response`` when the submission is stored.
        * 404 with ``error: "Open form not found"`` when no form with this ID
          and the ``OPEN`` feature exists.
        * 400 with ``error: "missing_discord_data"`` when the form requires a
          login and the requesting user is not authenticated.
        * 400 with ``error: "email_required"`` when the form collects emails
          and the user payload has no ``email`` key.
        * 400 with ``error: "missing_fields"`` and the list of question IDs
          that have no entry in the submitted ``response`` mapping.
        * 422 with the pydantic error list when ``FormResponse`` validation
          fails.

        Raises ``httpx.HTTPStatusError`` if the hCaptcha verification request
        comes back with an error status.
        """
        data = await request.json()

        form_document = await request.state.db.forms.find_one(
            {"_id": request.path_params["form_id"], "features": "OPEN"}
        )
        if not form_document:
            # A missing (or closed) form is a client error, not a success.
            return JSONResponse({
                "error": "Open form not found"
            }, status_code=404)

        form = Form(**form_document)
        response = data.copy()
        response["id"] = str(uuid.uuid4())
        response["form_id"] = form.id

        if FormFeatures.DISABLE_ANTISPAM.value not in form.features:
            ip_hash = self._md5_hex(request.client.host)
            # The User-Agent header is optional; hash an empty string instead
            # of failing with a KeyError when the client did not send one.
            user_agent_hash = self._md5_hex(
                request.headers.get("User-Agent", "")
            )

            dsn_checker = pydnsbl.DNSBLIpChecker()
            dsn_blacklist = await dsn_checker.check_async(request.client.host)

            async with httpx.AsyncClient() as client:
                # NOTE(review): the secret and token are sent as URL query
                # parameters; hCaptcha's documentation describes a
                # form-encoded request body - confirm before changing.
                query_params = {
                    "secret": HCAPTCHA_API_SECRET,
                    "response": data.get("captcha")
                }
                r = await client.post(
                    HCAPTCHA_VERIFY_URL,
                    params=query_params,
                    headers=HCAPTCHA_HEADERS
                )
                r.raise_for_status()
                captcha_data = r.json()

            response["antispam"] = {
                "ip_hash": ip_hash,
                "user_agent_hash": user_agent_hash,
                "captcha_pass": captcha_data["success"],
                "dns_blacklisted": dsn_blacklist.blacklisted,
            }

        if FormFeatures.REQUIRES_LOGIN.value in form.features:
            if not request.user.is_authenticated:
                return JSONResponse({
                    "error": "missing_discord_data"
                }, status_code=400)

            response["user"] = request.user.payload

            if (
                FormFeatures.COLLECT_EMAIL.value in form.features
                and "email" not in response["user"]
            ):
                return JSONResponse({
                    "error": "email_required"
                }, status_code=400)

        # A body without a usable "response" mapping is treated as having
        # answered nothing, so the client gets a proper error instead of the
        # request crashing with a KeyError/TypeError.
        answers = response.get("response")
        if not isinstance(answers, dict):
            answers = {}

        missing_fields = [
            question.id
            for question in form.questions
            if question.id not in answers
        ]
        if missing_fields:
            return JSONResponse({
                "error": "missing_fields",
                "fields": missing_fields
            }, status_code=400)

        try:
            response_obj = FormResponse(**response)
        except ValidationError as e:
            # Validation failures must not be reported with a 200 status.
            return JSONResponse(e.errors(), status_code=422)

        await request.state.db.responses.insert_one(
            response_obj.dict(by_alias=True)
        )

        return JSONResponse({
            "form": form.dict(),
            "response": response_obj.dict()
        })
|