diff options
| author | 2022-02-05 16:50:11 +0400 | |
|---|---|---|
| committer | 2022-02-05 17:20:40 +0400 | |
| commit | 134b2f70e4cf947744f1b061766bb37fe616ad65 (patch) | |
| tree | ef6b95bc5a78528d91ae969f3cfd00bc8e5be8ed /backend/authentication | |
| parent | Add Helper Functions For Managing Roles (diff) | |
Overhaul Scope System
Adds discord role support to the pre-existing scopes system to power
more complex access permissions.
Signed-off-by: Hassan Abouelela <[email protected]>
Diffstat (limited to 'backend/authentication')
| -rw-r--r-- | backend/authentication/backend.py | 9 | ||||
| -rw-r--r-- | backend/authentication/user.py | 45 | 
2 files changed, 45 insertions, 9 deletions
| diff --git a/backend/authentication/backend.py b/backend/authentication/backend.py index c7590e9..54385e2 100644 --- a/backend/authentication/backend.py +++ b/backend/authentication/backend.py @@ -5,6 +5,7 @@ from starlette import authentication  from starlette.requests import Request  from backend import constants +from backend import discord  # We must import user such way here to avoid circular imports  from .user import User @@ -60,8 +61,12 @@ class JWTAuthenticationBackend(authentication.AuthenticationBackend):          except Exception:              raise authentication.AuthenticationError("Could not parse user details.") -        user = User(token, user_details) -        if await user.fetch_admin_status(request): +        user = User( +            token, user_details, await discord.get_member(request.state.db, user_details["id"]) +        ) +        if await user.fetch_admin_status(request.state.db):              scopes.append("admin") +        scopes.extend(await user.get_user_roles(request.state.db)) +          return authentication.AuthCredentials(scopes), user diff --git a/backend/authentication/user.py b/backend/authentication/user.py index 857c2ed..0ec0188 100644 --- a/backend/authentication/user.py +++ b/backend/authentication/user.py @@ -1,20 +1,27 @@ +import typing  import typing as t  import jwt +from pymongo.database import Database  from starlette.authentication import BaseUser -from starlette.requests import Request +from backend import discord, models  from backend.constants import SECRET_KEY -from backend.discord import fetch_user_details  class User(BaseUser):      """Starlette BaseUser implementation for JWT authentication.""" -    def __init__(self, token: str, payload: dict[str, t.Any]) -> None: +    def __init__( +        self, +        token: str, +        payload: dict[str, t.Any], +        member: typing.Optional[models.DiscordMember], +    ) -> None:          self.token = token          self.payload = payload          self.admin = False +        self.member = member      @property      def is_authenticated(self) -> bool: @@ -34,16 +41,40 @@ class User(BaseUser):      def decoded_token(self) -> dict[str, any]:          return jwt.decode(self.token, SECRET_KEY, algorithms=["HS256"]) -    async def fetch_admin_status(self, request: Request) -> bool: -        self.admin = await request.state.db.admins.find_one( +    async def get_user_roles(self, database: Database) -> list[str]: +        """Get a list of the user's discord roles.""" +        if not self.member: +            return [] + +        server_roles = await discord.get_roles(database) +        roles = [] + +        for role in server_roles: +            if role.id in self.member.roles: +                roles.append(role.name) + +        if "admin" in roles: +            # Protect against collision with the forms admin role +            roles.remove("admin") +            roles.append("discord admin") + +        return roles + +    async def fetch_admin_status(self, database: Database) -> bool: +        self.admin = await database.admins.find_one(              {"_id": self.payload["id"]}          ) is not None          return self.admin -    async def refresh_data(self) -> None: +    async def refresh_data(self, database: Database) -> None:          """Fetches user data from discord, and updates the instance.""" -        self.payload = await fetch_user_details(self.decoded_token.get("token")) +        self.member = await discord.get_member(database, self.payload["id"]) + +        if self.member: +            self.payload = self.member.user.dict() +        else: +            self.payload = await discord.fetch_user_details(self.decoded_token.get("token"))          updated_info = self.decoded_token          updated_info["user_details"] = self.payload | 
