diff options
author | 2024-08-27 22:20:45 +0100 | |
---|---|---|
committer | 2024-08-27 22:24:43 +0100 | |
commit | 4ebc9b9e4928ec0a31426fd06279495cfd2162d9 (patch) | |
tree | 3afe05c42b6b6d6a04c93e3ff39c63696be86f03 | |
parent | Include the issued at date stamp in JWTs (diff) |
Update auth middleware to check for users that need to change their password
-rw-r--r-- | thallium-backend/src/auth.py | 33 |
1 files changed, 24 insertions, 9 deletions
diff --git a/thallium-backend/src/auth.py b/thallium-backend/src/auth.py index 14e126c..98dc540 100644 --- a/thallium-backend/src/auth.py +++ b/thallium-backend/src/auth.py @@ -52,22 +52,37 @@ class TokenAuth(HTTPBearer): status_code=403, detail="Invalid authentication credentials", ) - await self.attach_auth_info_to_state(jwt_data, request, db) + user_or_voucher = await self.verify_login(jwt_data, db) + await self.attach_auth_info_to_state(user_or_voucher, request) - async def attach_auth_info_to_state(self, jwt_data: dict, request: Request, db: DBSession) -> None: - """Attach the auth info of the requesting user to the state object.""" + async def verify_login(self, jwt_data: dict, db: DBSession) -> DBUser | DBVoucher: + """Verify that the JWT is linked to an active user and doesn't require a password reset.""" requester_id = jwt_data["sub"] table = DBUser if jwt_data["iss"] == "thallium:user" else DBVoucher stmt = select(table).where(table.id == requester_id) + if isinstance(table, DBUser): + stmt = stmt.where(DBUser.active) res = await db.scalar(stmt) - if isinstance(res, DBUser): - request.state.user = User.model_validate(res.__dict__) - elif isinstance(res, DBVoucher): - request.state.voucher = Voucher.model_validate(res.__dict__) - else: - raise HTTPException(403, "Your user no longer exists") + if not res: + raise HTTPException(403, "User no longer exists") + if isinstance(res, DBVoucher): + return res + + if res.require_password_change: + raise HTTPException(status_code=403, detail="You must reset your password.") + log.info(jwt_data["iat"]) + if datetime.fromtimestamp(jwt_data["iat"], tz=UTC) < res.password_set_at: + raise HTTPException(status_code=403, detail="Token invalid, password changed.") + return res + + async def attach_auth_info_to_state(self, user_or_voucher: DBUser | DBVoucher, request: Request) -> None: + """Attach the auth info of the requesting user to the state object.""" + if isinstance(user_or_voucher, DBUser): + request.state.user = User.model_validate(user_or_voucher.__dict__) + elif isinstance(user_or_voucher, DBVoucher): + request.state.voucher = Voucher.model_validate(user_or_voucher.__dict__) def build_jwt( |