diff options
Diffstat (limited to 'backend/authentication')
| -rw-r--r-- | backend/authentication/backend.py | 42 | 
1 files changed, 31 insertions, 11 deletions
| diff --git a/backend/authentication/backend.py b/backend/authentication/backend.py index f1d2ece..abe7313 100644 --- a/backend/authentication/backend.py +++ b/backend/authentication/backend.py @@ -1,6 +1,6 @@ -import jwt  import typing as t +import jwt  from starlette import authentication  from starlette.requests import Request @@ -13,18 +13,18 @@ class JWTAuthenticationBackend(authentication.AuthenticationBackend):      """Custom Starlette authentication backend for JWT."""      @staticmethod -    def get_token_from_header(header: str) -> str: -        """Parse JWT token from header value.""" +    def get_token_from_cookie(cookie: str) -> str: +        """Parse JWT token from cookie."""          try: -            prefix, token = header.split() +            prefix, token = cookie.split()          except ValueError:              raise authentication.AuthenticationError( -                "Unable to split prefix and token from Authorization header." +                "Unable to split prefix and token from authorization cookie."              )          if prefix.upper() != "JWT":              raise authentication.AuthenticationError( -                f"Invalid Authorization header prefix '{prefix}'." +                f"Invalid authorization cookie prefix '{prefix}'."              )          return token @@ -33,11 +33,11 @@ class JWTAuthenticationBackend(authentication.AuthenticationBackend):          self, request: Request      ) -> t.Optional[tuple[authentication.AuthCredentials, authentication.BaseUser]]:          """Handles JWT authentication process.""" -        if "Authorization" not in request.headers: +        cookie = request.cookies.get("BackendToken") +        if not cookie:              return None -        auth = request.headers["Authorization"] -        token = self.get_token_from_header(auth) +        token = self.get_token_from_cookie(cookie)          try:              payload = jwt.decode(token, constants.SECRET_KEY, algorithms=["HS256"]) @@ -46,7 +46,27 @@ class JWTAuthenticationBackend(authentication.AuthenticationBackend):          scopes = ["authenticated"] -        if payload.get("admin") is True: +        if not payload.get("token"): +            raise authentication.AuthenticationError("Token is missing from JWT.") +        if not payload.get("refresh"): +            raise authentication.AuthenticationError( +                "Refresh token is missing from JWT." +            ) + +        try: +            user_details = payload.get("user_details") +            if not user_details or not user_details.get("id"): +                raise authentication.AuthenticationError("Improper user details.") +        except Exception: +            raise authentication.AuthenticationError("Could not parse user details.") + +        admin = await request.state.db.admins.find_one( +            {"_id": user_details["id"]} +        ) is not None + +        if admin:              scopes.append("admin") -        return authentication.AuthCredentials(scopes), User(token, payload) +        user = User(token, user_details) + +        return authentication.AuthCredentials(scopes), user | 
