aboutsummaryrefslogtreecommitdiffstats
path: root/backend/routes
diff options
context:
space:
mode:
authorGravatar Joe Banks <[email protected]>2021-03-06 19:54:33 +0000
committerGravatar GitHub <[email protected]>2021-03-06 19:54:33 +0000
commit0f363c26271594de42ee05bb59a99e99c6e12de1 (patch)
tree2e409cac18da2f976b5593065a185cec01e90c85 /backend/routes
parentMerge pull request #62 from python-discord/dependabot/pip/uvicorn-0.13.4 (diff)
parentMerge branch 'main' into token-expiry (diff)
Merge pull request #58 from python-discord/token-expiry
Diffstat (limited to 'backend/routes')
-rw-r--r--backend/routes/auth/authorize.py87
-rw-r--r--backend/routes/forms/form.py2
-rw-r--r--backend/routes/forms/submit.py50
3 files changed, 115 insertions, 24 deletions
diff --git a/backend/routes/auth/authorize.py b/backend/routes/auth/authorize.py
index 975936a..26d8622 100644
--- a/backend/routes/auth/authorize.py
+++ b/backend/routes/auth/authorize.py
@@ -2,26 +2,71 @@
Use a token received from the Discord OAuth2 system to fetch user information.
"""
+import datetime
+from typing import Union
+
import httpx
import jwt
from pydantic.fields import Field
from pydantic.main import BaseModel
from spectree.response import Response
+from starlette.authentication import requires
from starlette.requests import Request
from starlette.responses import JSONResponse
+from backend import constants
+from backend.authentication.user import User
from backend.constants import SECRET_KEY
-from backend.route import Route
from backend.discord import fetch_bearer_token, fetch_user_details
+from backend.route import Route
from backend.validation import ErrorMessage, api
+AUTH_FAILURE = JSONResponse({"error": "auth_failure"}, status_code=400)
+
class AuthorizeRequest(BaseModel):
token: str = Field(description="The access token received from Discord.")
class AuthorizeResponse(BaseModel):
- token: str = Field(description="A JWT token containing the user information")
+ username: str = Field("Discord display name.")
+ expiry: str = Field("ISO formatted timestamp of expiry.")
+
+
+async def process_token(bearer_token: dict) -> Union[AuthorizeResponse, AUTH_FAILURE]:
+ """Post a bearer token to Discord, and return a JWT and username."""
+ interaction_start = datetime.datetime.now()
+
+ try:
+ user_details = await fetch_user_details(bearer_token["access_token"])
+ except httpx.HTTPStatusError:
+ AUTH_FAILURE.delete_cookie("token")
+ return AUTH_FAILURE
+
+ max_age = datetime.timedelta(seconds=int(bearer_token["expires_in"]))
+ token_expiry = interaction_start + max_age
+
+ data = {
+ "token": bearer_token["access_token"],
+ "refresh": bearer_token["refresh_token"],
+ "user_details": user_details,
+ "expiry": token_expiry.isoformat()
+ }
+
+ token = jwt.encode(data, SECRET_KEY, algorithm="HS256")
+ user = User(token, user_details)
+
+ response = JSONResponse({
+ "username": user.display_name,
+ "expiry": token_expiry.isoformat()
+ })
+
+ response.set_cookie(
+ "token", f"JWT {token}",
+ secure=constants.PRODUCTION, httponly=True, samesite="strict",
+ max_age=bearer_token["expires_in"]
+ )
+ return response
class AuthorizeRoute(Route):
@@ -40,19 +85,35 @@ class AuthorizeRoute(Route):
async def post(self, request: Request) -> JSONResponse:
"""Generate an authorization token."""
data = await request.json()
-
try:
- bearer_token = await fetch_bearer_token(data["token"])
- user_details = await fetch_user_details(bearer_token["access_token"])
+ url = request.headers.get("origin")
+ bearer_token = await fetch_bearer_token(data["token"], url, refresh=False)
except httpx.HTTPStatusError:
- return JSONResponse({
- "error": "auth_failure"
- }, status_code=400)
+ return AUTH_FAILURE
+
+ return await process_token(bearer_token)
- user_details["admin"] = await request.state.db.admins.find_one(
- {"_id": user_details["id"]}
- ) is not None
- token = jwt.encode(user_details, SECRET_KEY, algorithm="HS256")
+class TokenRefreshRoute(Route):
+ """
+ Use the refresh code from a JWT to get a new token and generate a new JWT token.
+ """
+
+ name = "refresh"
+ path = "/refresh"
+
+ @requires(["authenticated"])
+ @api.validate(
+ resp=Response(HTTP_200=AuthorizeResponse, HTTP_400=ErrorMessage),
+ tags=["auth"]
+ )
+ async def post(self, request: Request) -> JSONResponse:
+ """Refresh an authorization token."""
+ try:
+ token = request.user.decoded_token.get("refresh")
+ url = request.headers.get("origin")
+ bearer_token = await fetch_bearer_token(token, url, refresh=True)
+ except httpx.HTTPStatusError:
+ return AUTH_FAILURE
- return JSONResponse({"token": token})
+ return await process_token(bearer_token)
diff --git a/backend/routes/forms/form.py b/backend/routes/forms/form.py
index dd1c83f..1c6e44a 100644
--- a/backend/routes/forms/form.py
+++ b/backend/routes/forms/form.py
@@ -27,7 +27,7 @@ class SingleForm(Route):
@api.validate(resp=Response(HTTP_200=Form, HTTP_404=ErrorMessage), tags=["forms"])
async def get(self, request: Request) -> JSONResponse:
"""Returns single form information by ID."""
- admin = request.user.payload["admin"] if request.user.is_authenticated else False
+ admin = request.user.admin if request.user.is_authenticated else False
filters = {
"_id": request.path_params["form_id"]
diff --git a/backend/routes/forms/submit.py b/backend/routes/forms/submit.py
index b3a6afd..8680b2d 100644
--- a/backend/routes/forms/submit.py
+++ b/backend/routes/forms/submit.py
@@ -3,6 +3,7 @@ Submit a form.
"""
import binascii
+import datetime
import hashlib
import uuid
from typing import Any, Optional
@@ -15,11 +16,12 @@ from starlette.background import BackgroundTask
from starlette.requests import Request
from starlette.responses import JSONResponse
-from backend.constants import FRONTEND_URL, FormFeatures, HCAPTCHA_API_SECRET
+from backend import constants
+from backend.authentication.user import User
from backend.models import Form, FormResponse
from backend.route import Route
from backend.routes.forms.unittesting import execute_unittest
-from backend.validation import AuthorizationHeaders, ErrorMessage, api
+from backend.validation import ErrorMessage, api
HCAPTCHA_VERIFY_URL = "https://hcaptcha.com/siteverify"
HCAPTCHA_HEADERS = {
@@ -52,13 +54,40 @@ class SubmitForm(Route):
HTTP_404=ErrorMessage,
HTTP_400=ErrorMessage
),
- headers=AuthorizationHeaders,
tags=["forms", "responses"]
)
async def post(self, request: Request) -> JSONResponse:
"""Submit a response to the form."""
- data = await request.json()
+ response = await self.submit(request)
+
+ # Silently try to update user data
+ try:
+ if hasattr(request.user, User.refresh_data.__name__):
+ old = request.user.token
+ await request.user.refresh_data()
+
+ if old != request.user.token:
+ try:
+ expiry = datetime.datetime.fromisoformat(
+ request.user.decoded_token.get("expiry")
+ )
+ except ValueError:
+ expiry = None
+
+ response.set_cookie(
+ "token", f"JWT {request.user.token}",
+ secure=constants.PRODUCTION, httponly=True, samesite="strict",
+ max_age=(expiry - datetime.datetime.now()).seconds
+ )
+ except httpx.HTTPStatusError:
+ pass
+
+ return response
+
+ async def submit(self, request: Request) -> JSONResponse:
+ """Helper method for handling submission logic."""
+ data = await request.json()
data["timestamp"] = None
if form := await request.state.db.forms.find_one(
@@ -69,7 +98,7 @@ class SubmitForm(Route):
response["id"] = str(uuid.uuid4())
response["form_id"] = form.id
- if FormFeatures.DISABLE_ANTISPAM.value not in form.features:
+ if constants.FormFeatures.DISABLE_ANTISPAM.value not in form.features:
ip_hash_ctx = hashlib.md5()
ip_hash_ctx.update(request.client.host.encode())
ip_hash = binascii.hexlify(ip_hash_ctx.digest())
@@ -79,7 +108,7 @@ class SubmitForm(Route):
async with httpx.AsyncClient() as client:
query_params = {
- "secret": HCAPTCHA_API_SECRET,
+ "secret": constants.HCAPTCHA_API_SECRET,
"response": data.get("captcha")
}
r = await client.post(
@@ -96,12 +125,13 @@ class SubmitForm(Route):
"captcha_pass": captcha_data["success"]
}
- if FormFeatures.REQUIRES_LOGIN.value in form.features:
+ if constants.FormFeatures.REQUIRES_LOGIN.value in form.features:
if request.user.is_authenticated:
response["user"] = request.user.payload
+ response["user"]["admin"] = request.user.admin
if (
- FormFeatures.COLLECT_EMAIL.value in form.features
+ constants.FormFeatures.COLLECT_EMAIL.value in form.features
and "email" not in response["user"]
):
return JSONResponse({
@@ -153,7 +183,7 @@ class SubmitForm(Route):
)
send_webhook = None
- if FormFeatures.WEBHOOK_ENABLED.value in form.features:
+ if constants.FormFeatures.WEBHOOK_ENABLED.value in form.features:
send_webhook = BackgroundTask(
self.send_submission_webhook,
form=form,
@@ -193,7 +223,7 @@ class SubmitForm(Route):
embed = {
"title": "New Form Response",
"description": f"{mention} submitted a response to `{form.name}`.",
- "url": f"{FRONTEND_URL}/path_to_view_form/{response.id}", # TODO: Enter Form View URL
+ "url": f"{constants.FRONTEND_URL}/path_to_view_form/{response.id}", # noqa # TODO: Enter Form View URL
"timestamp": response.timestamp,
"color": 7506394,
}