diff options
| author | 2021-02-28 14:41:47 +0300 | |
|---|---|---|
| committer | 2021-02-28 14:41:47 +0300 | |
| commit | f15dc347713147fe7d69ac1e866a178f63df3a07 (patch) | |
| tree | 20fabe2c7f2d8e915211ef56b75bec78bc55d010 /backend/routes | |
| parent | Bump uvicorn from 0.13.3 to 0.13.4 (diff) | |
| parent | Merge pull request #63 from python-discord/feat/9/unittest-validation (diff) | |
Merge branch 'main' into dependabot/pip/uvicorn-0.13.4
Diffstat (limited to 'backend/routes')
| -rw-r--r-- | backend/routes/forms/form.py | 6 | ||||
| -rw-r--r-- | backend/routes/forms/submit.py | 25 | ||||
| -rw-r--r-- | backend/routes/forms/unittesting.py | 127 | 
3 files changed, 155 insertions, 3 deletions
| diff --git a/backend/routes/forms/form.py b/backend/routes/forms/form.py index b6b722e..dd1c83f 100644 --- a/backend/routes/forms/form.py +++ b/backend/routes/forms/form.py @@ -10,6 +10,7 @@ from starlette.responses import JSONResponse  from backend.models import Form  from backend.route import Route +from backend.routes.forms.unittesting import filter_unittests  from backend.validation import ErrorMessage, OkayResponse, api @@ -26,7 +27,7 @@ class SingleForm(Route):      @api.validate(resp=Response(HTTP_200=Form, HTTP_404=ErrorMessage), tags=["forms"])      async def get(self, request: Request) -> JSONResponse:          """Returns single form information by ID.""" -        admin = request.user.payload["admin"] if request.user.is_authenticated else False  # noqa +        admin = request.user.payload["admin"] if request.user.is_authenticated else False          filters = {              "_id": request.path_params["form_id"] @@ -37,6 +38,9 @@ class SingleForm(Route):          if raw_form := await request.state.db.forms.find_one(filters):              form = Form(**raw_form) +            if not admin: +                form = filter_unittests(form) +              return JSONResponse(form.dict(admin=admin))          return JSONResponse({"error": "not_found"}, status_code=404) diff --git a/backend/routes/forms/submit.py b/backend/routes/forms/submit.py index d8e6d35..b3a6afd 100644 --- a/backend/routes/forms/submit.py +++ b/backend/routes/forms/submit.py @@ -18,6 +18,7 @@ from starlette.responses import JSONResponse  from backend.constants import FRONTEND_URL, FormFeatures, HCAPTCHA_API_SECRET  from backend.models import Form, FormResponse  from backend.route import Route +from backend.routes.forms.unittesting import execute_unittest  from backend.validation import AuthorizationHeaders, ErrorMessage, api  HCAPTCHA_VERIFY_URL = "https://hcaptcha.com/siteverify" @@ -99,7 +100,10 @@ class SubmitForm(Route):                  if request.user.is_authenticated:                      response["user"] = request.user.payload -                    if FormFeatures.COLLECT_EMAIL.value in form.features and "email" not in response["user"]:  # noqa +                    if ( +                            FormFeatures.COLLECT_EMAIL.value in form.features +                            and "email" not in response["user"] +                    ):                          return JSONResponse({                              "error": "email_required"                          }, status_code=400) @@ -127,6 +131,23 @@ class SubmitForm(Route):              except ValidationError as e:                  return JSONResponse(e.errors(), status_code=422) +            # Run unittests if needed +            if any("unittests" in question.data for question in form.questions): +                unittest_results = await execute_unittest(response_obj, form) + +                if not all(test.passed for test in unittest_results): +                    # Return 500 if we encountered an internal error (code 99). +                    status_code = 500 if any( +                        test.return_code == 99 for test in unittest_results +                    ) else 403 + +                    return JSONResponse({ +                        "error": "failed_tests", +                        "test_results": [ +                            test._asdict() for test in unittest_results if not test.passed +                        ] +                    }, status_code=status_code) +              await request.state.db.responses.insert_one(                  response_obj.dict(by_alias=True)              ) @@ -172,7 +193,7 @@ class SubmitForm(Route):          embed = {              "title": "New Form Response",              "description": f"{mention} submitted a response to `{form.name}`.", -            "url": f"{FRONTEND_URL}/path_to_view_form/{response.id}",  # noqa # TODO: Enter Form View URL +            "url": f"{FRONTEND_URL}/path_to_view_form/{response.id}",  # TODO: Enter Form View URL              "timestamp": response.timestamp,              "color": 7506394,          } diff --git a/backend/routes/forms/unittesting.py b/backend/routes/forms/unittesting.py new file mode 100644 index 0000000..3854314 --- /dev/null +++ b/backend/routes/forms/unittesting.py @@ -0,0 +1,127 @@ +import base64 +from collections import namedtuple +from itertools import count +from textwrap import indent + +import httpx +from httpx import HTTPStatusError + +from backend.constants import SNEKBOX_URL +from backend.models import FormResponse, Form + +with open("resources/unittest_template.py") as file: +    TEST_TEMPLATE = file.read() + + +UnittestResult = namedtuple("UnittestResult", "question_id return_code passed result") + + +def filter_unittests(form: Form) -> Form: +    """ +    Replace the unittest data section of code questions with the number of test cases. + +    This is used to redact the exact tests when sending the form back to the frontend. +    """ +    for question in form.questions: +        if question.type == "code" and "unittests" in question.data: +            question.data["unittests"] = len(question.data["unittests"]) + +    return form + + +def _make_unit_code(units: dict[str, str]) -> str: +    """Compose a dict mapping unit names to their code into an actual class body.""" +    result = "" + +    for unit_name, unit_code in units.items(): +        result += ( +            f"\ndef test_{unit_name.lstrip('#')}(unit):"  # Function definition +            f"\n{indent(unit_code, '    ')}"  # Unit code +        ) + +    return indent(result, "    ") + + +def _make_user_code(code: str) -> str: +    """Compose the user code into an actual base64-encoded string variable.""" +    code = base64.b64encode(code.encode("utf8")).decode("utf8") +    return f'USER_CODE = b"{code}"' + + +async def _post_eval(code: str) -> dict[str, str]: +    """Post the eval to snekbox and return the response.""" +    async with httpx.AsyncClient() as client: +        data = {"input": code} +        response = await client.post(SNEKBOX_URL, json=data, timeout=10) + +        response.raise_for_status() +        return response.json() + + +async def execute_unittest(form_response: FormResponse, form: Form) -> list[UnittestResult]: +    """Execute all the unittests in this form and return the results.""" +    unittest_results = [] + +    for question in form.questions: +        if question.type == "code" and "unittests" in question.data: +            passed = False + +            # Tests starting with an hashtag should have censored names. +            hidden_test_counter = count(1) +            hidden_tests = { +                test.lstrip("#").lstrip("test_"): next(hidden_test_counter) +                for test in question.data["unittests"].keys() +                if test.startswith("#") +            } + +            # Compose runner code +            unit_code = _make_unit_code(question.data["unittests"]) +            user_code = _make_user_code(form_response.response[question.id]) + +            code = TEST_TEMPLATE.replace("### USER CODE", user_code) +            code = code.replace("### UNIT CODE", unit_code) + +            try: +                response = await _post_eval(code) +            except HTTPStatusError: +                return_code = 99 +                result = "Unable to contact code runner." +            else: +                return_code = int(response["returncode"]) + +                # Parse the stdout if the tests ran successfully +                if return_code == 0: +                    stdout = response["stdout"] +                    passed = bool(int(stdout[0])) + +                    # If the test failed, we have to populate the result string. +                    if not passed: +                        failed_tests = stdout[1:].strip().split(";") + +                        # Redact failed hidden tests +                        for i, failed_test in enumerate(failed_tests.copy()): +                            if failed_test in hidden_tests: +                                failed_tests[i] = f"hidden_test_{hidden_tests[failed_test]}" + +                        result = ";".join(failed_tests) +                    else: +                        result = "" +                elif return_code in (5, 6, 99): +                    result = response["stdout"] +                # Killed by NsJail +                elif return_code == 137: +                    return_code = 7 +                    result = "Timed out or ran out of memory." +                # Another code has been returned by CPython because of another failure. +                else: +                    return_code = 99 +                    result = "Internal error." + +            unittest_results.append(UnittestResult( +                question_id=question.id, +                return_code=return_code, +                passed=passed, +                result=result +            )) + +    return unittest_results | 
