diff options
-rw-r--r-- | backend/discord.py | 38 | ||||
-rw-r--r-- | backend/models/form.py | 12 | ||||
-rw-r--r-- | backend/routes/forms/discover.py | 2 | ||||
-rw-r--r-- | backend/routes/forms/response.py | 18 | ||||
-rw-r--r-- | backend/routes/forms/responses.py | 17 |
5 files changed, 73 insertions, 14 deletions
diff --git a/backend/discord.py b/backend/discord.py index 51de26a..4e35216 100644 --- a/backend/discord.py +++ b/backend/discord.py @@ -5,6 +5,7 @@ import json import typing import httpx +import starlette.requests from pymongo.database import Database from backend import constants, models @@ -150,3 +151,40 @@ async def get_member( "inserted_at": datetime.datetime.now(tz=datetime.timezone.utc), }) return member + + +class FormNotFoundError(Exception): + """The requested form was not found.""" + + +async def _verify_access_helper( + form_id: str, request: starlette.requests.Request, attribute: str +) -> bool: + """A low level helper to validate access to a form resource based on the user's scopes.""" + # Short circuit all resources for admins + if "admin" in request.auth.scopes: + return True + + form = await request.state.db.forms.find_one({"id": form_id}) + + if not form: + raise FormNotFoundError() + + form = models.Form(**form) + + for role_id in getattr(form, attribute) or []: + role = await request.state.db.roles.find_one({"id": role_id}) + if not role: + continue + + role = models.DiscordRole(**json.loads(role["data"])) + + if role.name in request.auth.scopes: + return True + + return False + + +async def verify_response_access(form_id: str, request: starlette.requests.Request) -> bool: + """Ensure the user can access responses on the requested resource.""" + return await _verify_access_helper(form_id, request, "response_readers") diff --git a/backend/models/form.py b/backend/models/form.py index f19ed85..45a7e0b 100644 --- a/backend/models/form.py +++ b/backend/models/form.py @@ -1,10 +1,10 @@ import typing as t import httpx -from pydantic import constr, BaseModel, Field, root_validator, validator +from pydantic import BaseModel, Field, constr, root_validator, validator from pydantic.error_wrappers import ErrorWrapper, ValidationError -from backend.constants import FormFeatures, WebHook +from backend.constants import DISCORD_GUILD, FormFeatures, WebHook from .question import Question PUBLIC_FIELDS = [ @@ -43,6 +43,7 @@ class Form(BaseModel): submitted_text: t.Optional[str] = None webhook: _WebHook = None discord_role: t.Optional[str] + response_readers: t.Optional[list[str]] class Config: allow_population_by_field_name = True @@ -67,6 +68,13 @@ class Form(BaseModel): return value + @validator("response_readers") + def validate_role_scoping(cls, value: t.Optional[list[str]]): + """Ensure special role based permissions aren't granted to the @everyone role.""" + if value and str(DISCORD_GUILD) in value: + raise ValueError("You can not add the everyone role as an access scope.") + return value + @root_validator def validate_role(cls, values: dict[str, t.Any]) -> t.Optional[dict[str, t.Any]]: """Validates does Discord role provided when flag provided.""" diff --git a/backend/routes/forms/discover.py b/backend/routes/forms/discover.py index d7351d5..b993075 100644 --- a/backend/routes/forms/discover.py +++ b/backend/routes/forms/discover.py @@ -29,7 +29,7 @@ EMPTY_FORM = Form( features=__FEATURES, questions=[__QUESTION], name="Auth form", - description="An empty form to help you get a token." + description="An empty form to help you get a token.", ) diff --git a/backend/routes/forms/response.py b/backend/routes/forms/response.py index d8d8d17..fbf8e99 100644 --- a/backend/routes/forms/response.py +++ b/backend/routes/forms/response.py @@ -1,11 +1,13 @@ """ Returns or deletes form response by ID. """ + from spectree import Response as RouteResponse from starlette.authentication import requires from starlette.requests import Request from starlette.responses import JSONResponse +from backend import discord from backend.models import FormResponse from backend.route import Route from backend.validation import ErrorMessage, OkayResponse, api @@ -17,23 +19,31 @@ class Response(Route): name = "response" path = "/{form_id:str}/responses/{response_id:str}" - @requires(["authenticated", "admin"]) + @requires(["authenticated"]) @api.validate( - resp=RouteResponse(HTTP_200=FormResponse, HTTP_404=ErrorMessage), + resp=RouteResponse(HTTP_200=FormResponse, HTTP_401=ErrorMessage, HTTP_404=ErrorMessage), tags=["forms", "responses"] ) async def get(self, request: Request) -> JSONResponse: """Return a single form response by ID.""" + form_id = request.path_params["form_id"] + + try: + if not await discord.verify_response_access(form_id, request): + return JSONResponse({"error": "unauthorized"}, status_code=401) + except discord.FormNotFoundError: + return JSONResponse({"error": "form_not_found"}, status_code=404) + if raw_response := await request.state.db.responses.find_one( { "_id": request.path_params["response_id"], - "form_id": request.path_params["form_id"] + "form_id": form_id } ): response = FormResponse(**raw_response) return JSONResponse(response.dict()) else: - return JSONResponse({"error": "not_found"}, status_code=404) + return JSONResponse({"error": "response_not_found"}, status_code=404) @requires(["authenticated", "admin"]) @api.validate( diff --git a/backend/routes/forms/responses.py b/backend/routes/forms/responses.py index f3c4cd7..1c8ebe3 100644 --- a/backend/routes/forms/responses.py +++ b/backend/routes/forms/responses.py @@ -7,9 +7,10 @@ from starlette.authentication import requires from starlette.requests import Request from starlette.responses import JSONResponse +from backend import discord from backend.models import FormResponse, ResponseList from backend.route import Route -from backend.validation import api, ErrorMessage, OkayResponse +from backend.validation import ErrorMessage, OkayResponse, api class ResponseIdList(BaseModel): @@ -24,20 +25,22 @@ class Responses(Route): name = "form_responses" path = "/{form_id:str}/responses" - @requires(["authenticated", "admin"]) + @requires(["authenticated"]) @api.validate( - resp=Response(HTTP_200=ResponseList, HTTP_404=ErrorMessage), + resp=Response(HTTP_200=ResponseList, HTTP_401=ErrorMessage, HTTP_404=ErrorMessage), tags=["forms", "responses"] ) async def get(self, request: Request) -> JSONResponse: """Returns all form responses by form ID.""" - if not await request.state.db.forms.find_one( - {"_id": request.path_params["form_id"]} - ): + form_id = request.path_params["form_id"] + try: + if not await discord.verify_response_access(form_id, request): + return JSONResponse({"error": "unauthorized"}, 401) + except discord.FormNotFoundError: return JSONResponse({"error": "not_found"}, 404) cursor = request.state.db.responses.find( - {"form_id": request.path_params["form_id"]} + {"form_id": form_id} ) responses = [ FormResponse(**response) for response in await cursor.to_list(None) |