blob: e15058087463aa6e9a5fcc729b1d268ffa55fe51 (
plain) (
blame)
| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
 | import jwt
from starlette import authentication
from starlette.requests import Request
from backend import constants, discord
# We must import user such way here to avoid circular imports
from .user import User
class JWTAuthenticationBackend(authentication.AuthenticationBackend):
    """Custom Starlette authentication backend for JWT."""
    @staticmethod
    def get_token_from_cookie(cookie: str) -> str:
        """Parse JWT token from cookie."""
        try:
            prefix, token = cookie.split()
        except ValueError:
            msg = "Unable to split prefix and token from authorization cookie."
            raise authentication.AuthenticationError(msg)
        if prefix.upper() != "JWT":
            msg = f"Invalid authorization cookie prefix '{prefix}'."
            raise authentication.AuthenticationError(msg)
        return token
    async def authenticate(
        self,
        request: Request,
    ) -> tuple[authentication.AuthCredentials, authentication.BaseUser] | None:
        """Handles JWT authentication process."""
        cookie = request.cookies.get("token")
        if not cookie:
            return None
        token = self.get_token_from_cookie(cookie)
        try:
            payload = jwt.decode(token, constants.SECRET_KEY, algorithms=["HS256"])
        except jwt.InvalidTokenError as e:
            raise authentication.AuthenticationError(str(e))
        scopes = ["authenticated"]
        if not payload.get("token"):
            msg = "Token is missing from JWT."
            raise authentication.AuthenticationError(msg)
        if not payload.get("refresh"):
            msg = "Refresh token is missing from JWT."
            raise authentication.AuthenticationError(msg)
        try:
            user_details = payload.get("user_details")
            if not user_details or not user_details.get("id"):
                msg = "Improper user details."
                raise authentication.AuthenticationError(msg)  # noqa: TRY301
        except Exception:  # noqa: BLE001
            msg = "Could not parse user details."
            raise authentication.AuthenticationError(msg)
        user = User(
            token,
            user_details,
            await discord.get_member(user_details["id"]),
        )
        if await user.fetch_admin_status(request.state.db):
            scopes.append("admin")
        scopes.extend(await user.get_user_roles())
        return authentication.AuthCredentials(scopes), user
 |