blob: 54385e20c0dc29052285355d8da9ee4ac7f69a6d (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
import typing as t
import jwt
from starlette import authentication
from starlette.requests import Request
from backend import constants
from backend import discord
# We must import user such way here to avoid circular imports
from .user import User
class JWTAuthenticationBackend(authentication.AuthenticationBackend):
"""Custom Starlette authentication backend for JWT."""
@staticmethod
def get_token_from_cookie(cookie: str) -> str:
"""Parse JWT token from cookie."""
try:
prefix, token = cookie.split()
except ValueError:
raise authentication.AuthenticationError(
"Unable to split prefix and token from authorization cookie."
)
if prefix.upper() != "JWT":
raise authentication.AuthenticationError(
f"Invalid authorization cookie prefix '{prefix}'."
)
return token
async def authenticate(
self, request: Request
) -> t.Optional[tuple[authentication.AuthCredentials, authentication.BaseUser]]:
"""Handles JWT authentication process."""
cookie = request.cookies.get("token")
if not cookie:
return None
token = self.get_token_from_cookie(cookie)
try:
payload = jwt.decode(token, constants.SECRET_KEY, algorithms=["HS256"])
except jwt.InvalidTokenError as e:
raise authentication.AuthenticationError(str(e))
scopes = ["authenticated"]
if not payload.get("token"):
raise authentication.AuthenticationError("Token is missing from JWT.")
if not payload.get("refresh"):
raise authentication.AuthenticationError(
"Refresh token is missing from JWT."
)
try:
user_details = payload.get("user_details")
if not user_details or not user_details.get("id"):
raise authentication.AuthenticationError("Improper user details.")
except Exception:
raise authentication.AuthenticationError("Could not parse user details.")
user = User(
token, user_details, await discord.get_member(request.state.db, user_details["id"])
)
if await user.fetch_admin_status(request.state.db):
scopes.append("admin")
scopes.extend(await user.get_user_roles(request.state.db))
return authentication.AuthCredentials(scopes), user
|