diff options
| -rw-r--r-- | backend/routes/forms/condorcet.py | 99 | 
1 files changed, 99 insertions, 0 deletions
| diff --git a/backend/routes/forms/condorcet.py b/backend/routes/forms/condorcet.py new file mode 100644 index 0000000..1b216a1 --- /dev/null +++ b/backend/routes/forms/condorcet.py @@ -0,0 +1,99 @@ +"""Calculate the condorcet winner for a given question on a poll.""" + +from condorcet import CondorcetEvaluator +from pydantic import BaseModel +from spectree import Response +from starlette import exceptions +from starlette.authentication import requires +from starlette.requests import Request +from starlette.responses import JSONResponse + +from backend import discord +from backend.models import Form, FormResponse, Question +from backend.route import Route +from backend.validation import api + + +class CondorcetResponse(BaseModel): +    question: Question +    winners: list[str] +    rest_of_table: dict + + +class InvalidCondorcetRequest(exceptions.HTTPException): +    """The request for a condorcet calculation was invalid.""" + + +def reprocess_vote_object(vote: dict[str, int | None], number_options: int) -> dict[str, int]: +    """Reprocess votes so all no-preference votes are re-ranked as last (equivalent in Condorcet).""" +    vote_object = {} + +    for option, score in vote.items(): +        vote_object[option] = score or number_options + +    return vote_object + + +class Condorcet(Route): +    """Run a condorcet calculation on the given question on a form.""" + +    name = "form_condorcet" +    path = "/{form_id:str}/condorcet/{question_id:str}" + +    @requires(["authenticated"]) +    @api.validate( +        resp=Response(HTTP_200=CondorcetResponse), +        tags=["forms", "responses", "condorcet"], +    ) +    async def get(self, request: Request) -> JSONResponse: +        """ +        Run and return the condorcet winner for a poll. + +        Optionally takes a `?winners=` parameter specifying the number of winners to calculate. +        """ +        form_id = request.path_params["form_id"] +        question_id = request.path_params["question_id"] +        num_winners = request.query_params.get("winners", "1") + +        try: +            num_winners = int(num_winners) +        except ValueError: +            raise InvalidCondorcetRequest(detail="Invalid number of winners", status_code=400) + +        await discord.verify_response_access(form_id, request) + +        # We can assume we have a form now because verify_response_access +        # checks for form existence. +        form_data = Form(**(await request.state.db.forms.find_one({"_id": form_id}))) + +        questions = [question for question in form_data.questions if question.id == question_id] + +        if len(questions) != 1: +            raise InvalidCondorcetRequest(detail="Question not found", status_code=400) + +        question = questions[0] + +        if num_winners > len(question.data["options"]): +            raise InvalidCondorcetRequest( +                detail="Requested more winners than there are candidates", status_code=400 +            ) + +        cursor = request.state.db.responses.find( +            {"form_id": form_id}, +        ) +        responses = [FormResponse(**response) for response in await cursor.to_list(None)] + +        votes = [ +            reprocess_vote_object(response.response[question_id], len(question.data["options"])) +            for response in responses +        ] + +        evaluator = CondorcetEvaluator(candidates=question.data["options"], votes=votes) + +        winners, rest_of_table = evaluator.get_n_winners(num_winners) + +        return JSONResponse({ +            "question": question.dict(), +            "winners": winners, +            "rest_of_table": rest_of_table, +        }) | 
