diff options
Diffstat (limited to 'backend')
| -rw-r--r-- | backend/discord.py | 38 | ||||
| -rw-r--r-- | backend/models/form.py | 12 | ||||
| -rw-r--r-- | backend/routes/forms/discover.py | 2 | ||||
| -rw-r--r-- | backend/routes/forms/response.py | 18 | ||||
| -rw-r--r-- | backend/routes/forms/responses.py | 17 | 
5 files changed, 73 insertions, 14 deletions
diff --git a/backend/discord.py b/backend/discord.py index 51de26a..4e35216 100644 --- a/backend/discord.py +++ b/backend/discord.py @@ -5,6 +5,7 @@ import json  import typing  import httpx +import starlette.requests  from pymongo.database import Database  from backend import constants, models @@ -150,3 +151,40 @@ async def get_member(          "inserted_at": datetime.datetime.now(tz=datetime.timezone.utc),      })      return member + + +class FormNotFoundError(Exception): +    """The requested form was not found.""" + + +async def _verify_access_helper( +    form_id: str, request: starlette.requests.Request, attribute: str +) -> bool: +    """A low level helper to validate access to a form resource based on the user's scopes.""" +    # Short circuit all resources for admins +    if "admin" in request.auth.scopes: +        return True + +    form = await request.state.db.forms.find_one({"id": form_id}) + +    if not form: +        raise FormNotFoundError() + +    form = models.Form(**form) + +    for role_id in getattr(form, attribute) or []: +        role = await request.state.db.roles.find_one({"id": role_id}) +        if not role: +            continue + +        role = models.DiscordRole(**json.loads(role["data"])) + +        if role.name in request.auth.scopes: +            return True + +    return False + + +async def verify_response_access(form_id: str, request: starlette.requests.Request) -> bool: +    """Ensure the user can access responses on the requested resource.""" +    return await _verify_access_helper(form_id, request, "response_readers") diff --git a/backend/models/form.py b/backend/models/form.py index f19ed85..45a7e0b 100644 --- a/backend/models/form.py +++ b/backend/models/form.py @@ -1,10 +1,10 @@  import typing as t  import httpx -from pydantic import constr, BaseModel, Field, root_validator, validator +from pydantic import BaseModel, Field, constr, root_validator, validator  from pydantic.error_wrappers import ErrorWrapper, ValidationError -from backend.constants import FormFeatures, WebHook +from backend.constants import DISCORD_GUILD, FormFeatures, WebHook  from .question import Question  PUBLIC_FIELDS = [ @@ -43,6 +43,7 @@ class Form(BaseModel):      submitted_text: t.Optional[str] = None      webhook: _WebHook = None      discord_role: t.Optional[str] +    response_readers: t.Optional[list[str]]      class Config:          allow_population_by_field_name = True @@ -67,6 +68,13 @@ class Form(BaseModel):          return value +    @validator("response_readers") +    def validate_role_scoping(cls, value: t.Optional[list[str]]): +        """Ensure special role based permissions aren't granted to the @everyone role.""" +        if value and str(DISCORD_GUILD) in value: +            raise ValueError("You can not add the everyone role as an access scope.") +        return value +      @root_validator      def validate_role(cls, values: dict[str, t.Any]) -> t.Optional[dict[str, t.Any]]:          """Validates does Discord role provided when flag provided.""" diff --git a/backend/routes/forms/discover.py b/backend/routes/forms/discover.py index d7351d5..b993075 100644 --- a/backend/routes/forms/discover.py +++ b/backend/routes/forms/discover.py @@ -29,7 +29,7 @@ EMPTY_FORM = Form(      features=__FEATURES,      questions=[__QUESTION],      name="Auth form", -    description="An empty form to help you get a token." +    description="An empty form to help you get a token.",  ) diff --git a/backend/routes/forms/response.py b/backend/routes/forms/response.py index d8d8d17..fbf8e99 100644 --- a/backend/routes/forms/response.py +++ b/backend/routes/forms/response.py @@ -1,11 +1,13 @@  """  Returns or deletes form response by ID.  """ +  from spectree import Response as RouteResponse  from starlette.authentication import requires  from starlette.requests import Request  from starlette.responses import JSONResponse +from backend import discord  from backend.models import FormResponse  from backend.route import Route  from backend.validation import ErrorMessage, OkayResponse, api @@ -17,23 +19,31 @@ class Response(Route):      name = "response"      path = "/{form_id:str}/responses/{response_id:str}" -    @requires(["authenticated", "admin"]) +    @requires(["authenticated"])      @api.validate( -        resp=RouteResponse(HTTP_200=FormResponse, HTTP_404=ErrorMessage), +        resp=RouteResponse(HTTP_200=FormResponse, HTTP_401=ErrorMessage, HTTP_404=ErrorMessage),          tags=["forms", "responses"]      )      async def get(self, request: Request) -> JSONResponse:          """Return a single form response by ID.""" +        form_id = request.path_params["form_id"] + +        try: +            if not await discord.verify_response_access(form_id, request): +                return JSONResponse({"error": "unauthorized"}, status_code=401) +        except discord.FormNotFoundError: +            return JSONResponse({"error": "form_not_found"}, status_code=404) +          if raw_response := await request.state.db.responses.find_one(              {                  "_id": request.path_params["response_id"], -                "form_id": request.path_params["form_id"] +                "form_id": form_id              }          ):              response = FormResponse(**raw_response)              return JSONResponse(response.dict())          else: -            return JSONResponse({"error": "not_found"}, status_code=404) +            return JSONResponse({"error": "response_not_found"}, status_code=404)      @requires(["authenticated", "admin"])      @api.validate( diff --git a/backend/routes/forms/responses.py b/backend/routes/forms/responses.py index f3c4cd7..1c8ebe3 100644 --- a/backend/routes/forms/responses.py +++ b/backend/routes/forms/responses.py @@ -7,9 +7,10 @@ from starlette.authentication import requires  from starlette.requests import Request  from starlette.responses import JSONResponse +from backend import discord  from backend.models import FormResponse, ResponseList  from backend.route import Route -from backend.validation import api, ErrorMessage, OkayResponse +from backend.validation import ErrorMessage, OkayResponse, api  class ResponseIdList(BaseModel): @@ -24,20 +25,22 @@ class Responses(Route):      name = "form_responses"      path = "/{form_id:str}/responses" -    @requires(["authenticated", "admin"]) +    @requires(["authenticated"])      @api.validate( -        resp=Response(HTTP_200=ResponseList, HTTP_404=ErrorMessage), +        resp=Response(HTTP_200=ResponseList, HTTP_401=ErrorMessage, HTTP_404=ErrorMessage),          tags=["forms", "responses"]      )      async def get(self, request: Request) -> JSONResponse:          """Returns all form responses by form ID.""" -        if not await request.state.db.forms.find_one( -            {"_id": request.path_params["form_id"]} -        ): +        form_id = request.path_params["form_id"] +        try: +            if not await discord.verify_response_access(form_id, request): +                return JSONResponse({"error": "unauthorized"}, 401) +        except discord.FormNotFoundError:              return JSONResponse({"error": "not_found"}, 404)          cursor = request.state.db.responses.find( -            {"form_id": request.path_params["form_id"]} +            {"form_id": form_id}          )          responses = [              FormResponse(**response) for response in await cursor.to_list(None)  |