diff options
Diffstat (limited to 'backend/routes')
| -rw-r--r-- | backend/routes/auth/authorize.py | 121 | ||||
| -rw-r--r-- | backend/routes/forms/form.py | 2 | ||||
| -rw-r--r-- | backend/routes/forms/submit.py | 48 | 
3 files changed, 145 insertions, 26 deletions
| diff --git a/backend/routes/auth/authorize.py b/backend/routes/auth/authorize.py index 975936a..d4587f0 100644 --- a/backend/routes/auth/authorize.py +++ b/backend/routes/auth/authorize.py @@ -2,26 +2,101 @@  Use a token received from the Discord OAuth2 system to fetch user information.  """ +import datetime +from typing import Union +  import httpx  import jwt  from pydantic.fields import Field  from pydantic.main import BaseModel  from spectree.response import Response +from starlette import responses +from starlette.authentication import requires  from starlette.requests import Request -from starlette.responses import JSONResponse +from backend import constants +from backend.authentication.user import User  from backend.constants import SECRET_KEY -from backend.route import Route  from backend.discord import fetch_bearer_token, fetch_user_details +from backend.route import Route  from backend.validation import ErrorMessage, api +AUTH_FAILURE = responses.JSONResponse({"error": "auth_failure"}, status_code=400) +  class AuthorizeRequest(BaseModel):      token: str = Field(description="The access token received from Discord.")  class AuthorizeResponse(BaseModel): -    token: str = Field(description="A JWT token containing the user information") +    username: str = Field("Discord display name.") +    expiry: str = Field("ISO formatted timestamp of expiry.") + + +async def process_token( +        bearer_token: dict, +        request: Request +) -> Union[AuthorizeResponse, AUTH_FAILURE]: +    """Post a bearer token to Discord, and return a JWT and username.""" +    interaction_start = datetime.datetime.now() + +    try: +        user_details = await fetch_user_details(bearer_token["access_token"]) +    except httpx.HTTPStatusError: +        AUTH_FAILURE.delete_cookie("token") +        return AUTH_FAILURE + +    max_age = datetime.timedelta(seconds=int(bearer_token["expires_in"])) +    token_expiry = interaction_start + max_age + +    data = { +        "token": bearer_token["access_token"], +        "refresh": bearer_token["refresh_token"], +        "user_details": user_details, +        "expiry": token_expiry.isoformat() +    } + +    token = jwt.encode(data, SECRET_KEY, algorithm="HS256") +    user = User(token, user_details) + +    response = responses.JSONResponse({ +        "username": user.display_name, +        "expiry": token_expiry.isoformat() +    }) + +    await set_response_token(response, request, token, bearer_token["expires_in"]) +    return response + + +async def set_response_token( +        response: responses.Response, +        request: Request, +        new_token: str, +        expiry: int +) -> None: +    """Helper that handles logic for updating a token in a set-cookie response.""" +    origin_url = request.headers.get("origin") + +    if origin_url == constants.PRODUCTION_URL: +        domain = request.url.netloc +        samesite = "strict" + +    elif not constants.PRODUCTION: +        domain = None +        samesite = "strict" + +    else: +        domain = request.url.netloc +        samesite = "None" + +    response.set_cookie( +        "token", f"JWT {new_token}", +        secure=constants.PRODUCTION, +        httponly=True, +        samesite=samesite, +        domain=domain, +        max_age=expiry +    )  class AuthorizeRoute(Route): @@ -37,22 +112,38 @@ class AuthorizeRoute(Route):          resp=Response(HTTP_200=AuthorizeResponse, HTTP_400=ErrorMessage),          tags=["auth"]      ) -    async def post(self, request: Request) -> JSONResponse: +    async def post(self, request: Request) -> responses.JSONResponse:          """Generate an authorization token."""          data = await request.json() -          try: -            bearer_token = await fetch_bearer_token(data["token"]) -            user_details = await fetch_user_details(bearer_token["access_token"]) +            url = request.headers.get("origin") +            bearer_token = await fetch_bearer_token(data["token"], url, refresh=False)          except httpx.HTTPStatusError: -            return JSONResponse({ -                "error": "auth_failure" -            }, status_code=400) +            return AUTH_FAILURE -        user_details["admin"] = await request.state.db.admins.find_one( -            {"_id": user_details["id"]} -        ) is not None +        return await process_token(bearer_token, request) + + +class TokenRefreshRoute(Route): +    """ +    Use the refresh code from a JWT to get a new token and generate a new JWT token. +    """ -        token = jwt.encode(user_details, SECRET_KEY, algorithm="HS256") +    name = "refresh" +    path = "/refresh" + +    @requires(["authenticated"]) +    @api.validate( +        resp=Response(HTTP_200=AuthorizeResponse, HTTP_400=ErrorMessage), +        tags=["auth"] +    ) +    async def post(self, request: Request) -> responses.JSONResponse: +        """Refresh an authorization token.""" +        try: +            token = request.user.decoded_token.get("refresh") +            url = request.headers.get("origin") +            bearer_token = await fetch_bearer_token(token, url, refresh=True) +        except httpx.HTTPStatusError: +            return AUTH_FAILURE -        return JSONResponse({"token": token}) +        return await process_token(bearer_token, request) diff --git a/backend/routes/forms/form.py b/backend/routes/forms/form.py index dd1c83f..1c6e44a 100644 --- a/backend/routes/forms/form.py +++ b/backend/routes/forms/form.py @@ -27,7 +27,7 @@ class SingleForm(Route):      @api.validate(resp=Response(HTTP_200=Form, HTTP_404=ErrorMessage), tags=["forms"])      async def get(self, request: Request) -> JSONResponse:          """Returns single form information by ID.""" -        admin = request.user.payload["admin"] if request.user.is_authenticated else False +        admin = request.user.admin if request.user.is_authenticated else False          filters = {              "_id": request.path_params["form_id"] diff --git a/backend/routes/forms/submit.py b/backend/routes/forms/submit.py index b3a6afd..2624c98 100644 --- a/backend/routes/forms/submit.py +++ b/backend/routes/forms/submit.py @@ -3,6 +3,7 @@ Submit a form.  """  import binascii +import datetime  import hashlib  import uuid  from typing import Any, Optional @@ -15,11 +16,13 @@ from starlette.background import BackgroundTask  from starlette.requests import Request  from starlette.responses import JSONResponse -from backend.constants import FRONTEND_URL, FormFeatures, HCAPTCHA_API_SECRET +from backend import constants +from backend.authentication.user import User  from backend.models import Form, FormResponse  from backend.route import Route +from backend.routes.auth.authorize import set_response_token  from backend.routes.forms.unittesting import execute_unittest -from backend.validation import AuthorizationHeaders, ErrorMessage, api +from backend.validation import ErrorMessage, api  HCAPTCHA_VERIFY_URL = "https://hcaptcha.com/siteverify"  HCAPTCHA_HEADERS = { @@ -52,13 +55,37 @@ class SubmitForm(Route):              HTTP_404=ErrorMessage,              HTTP_400=ErrorMessage          ), -        headers=AuthorizationHeaders,          tags=["forms", "responses"]      )      async def post(self, request: Request) -> JSONResponse:          """Submit a response to the form.""" -        data = await request.json() +        response = await self.submit(request) + +        # Silently try to update user data +        try: +            if hasattr(request.user, User.refresh_data.__name__): +                old = request.user.token +                await request.user.refresh_data() + +                if old != request.user.token: +                    try: +                        expiry = datetime.datetime.fromisoformat( +                            request.user.decoded_token.get("expiry") +                        ) +                    except ValueError: +                        expiry = None +                    expiry_seconds = (expiry - datetime.datetime.now()).seconds +                    await set_response_token(response, request, request.user.token, expiry_seconds) + +        except httpx.HTTPStatusError: +            pass + +        return response + +    async def submit(self, request: Request) -> JSONResponse: +        """Helper method for handling submission logic.""" +        data = await request.json()          data["timestamp"] = None          if form := await request.state.db.forms.find_one( @@ -69,7 +96,7 @@ class SubmitForm(Route):              response["id"] = str(uuid.uuid4())              response["form_id"] = form.id -            if FormFeatures.DISABLE_ANTISPAM.value not in form.features: +            if constants.FormFeatures.DISABLE_ANTISPAM.value not in form.features:                  ip_hash_ctx = hashlib.md5()                  ip_hash_ctx.update(request.client.host.encode())                  ip_hash = binascii.hexlify(ip_hash_ctx.digest()) @@ -79,7 +106,7 @@ class SubmitForm(Route):                  async with httpx.AsyncClient() as client:                      query_params = { -                        "secret": HCAPTCHA_API_SECRET, +                        "secret": constants.HCAPTCHA_API_SECRET,                          "response": data.get("captcha")                      }                      r = await client.post( @@ -96,12 +123,13 @@ class SubmitForm(Route):                      "captcha_pass": captcha_data["success"]                  } -            if FormFeatures.REQUIRES_LOGIN.value in form.features: +            if constants.FormFeatures.REQUIRES_LOGIN.value in form.features:                  if request.user.is_authenticated:                      response["user"] = request.user.payload +                    response["user"]["admin"] = request.user.admin                      if ( -                            FormFeatures.COLLECT_EMAIL.value in form.features +                            constants.FormFeatures.COLLECT_EMAIL.value in form.features                              and "email" not in response["user"]                      ):                          return JSONResponse({ @@ -153,7 +181,7 @@ class SubmitForm(Route):              )              send_webhook = None -            if FormFeatures.WEBHOOK_ENABLED.value in form.features: +            if constants.FormFeatures.WEBHOOK_ENABLED.value in form.features:                  send_webhook = BackgroundTask(                      self.send_submission_webhook,                      form=form, @@ -193,7 +221,7 @@ class SubmitForm(Route):          embed = {              "title": "New Form Response",              "description": f"{mention} submitted a response to `{form.name}`.", -            "url": f"{FRONTEND_URL}/path_to_view_form/{response.id}",  # TODO: Enter Form View URL +            "url": f"{constants.FRONTEND_URL}/path_to_view_form/{response.id}",  # noqa # TODO: Enter Form View URL              "timestamp": response.timestamp,              "color": 7506394,          } | 
