diff options
Diffstat (limited to 'backend/routes')
| -rw-r--r-- | backend/routes/forms/submit.py | 23 | ||||
| -rw-r--r-- | backend/routes/forms/unittesting.py | 94 | 
2 files changed, 80 insertions, 37 deletions
diff --git a/backend/routes/forms/submit.py b/backend/routes/forms/submit.py index 5c500b5..db5f4e4 100644 --- a/backend/routes/forms/submit.py +++ b/backend/routes/forms/submit.py @@ -10,6 +10,8 @@ import uuid  from typing import Any, Optional  import httpx +import pymongo.database +import sentry_sdk  from pydantic import ValidationError  from pydantic.main import BaseModel  from spectree import Response @@ -23,7 +25,7 @@ from backend.models import Form, FormResponse  from backend.route import Route  from backend.routes.auth.authorize import set_response_token  from backend.routes.forms.discover import AUTH_FORM -from backend.routes.forms.unittesting import execute_unittest +from backend.routes.forms.unittesting import BypassDetectedError, execute_unittest  from backend.validation import ErrorMessage, api  HCAPTCHA_VERIFY_URL = "https://hcaptcha.com/siteverify" @@ -193,7 +195,22 @@ class SubmitForm(Route):              # Run unittests if needed              if any("unittests" in question.data for question in form.questions): -                unittest_results = await execute_unittest(response_obj, form) +                unittest_results, errors = await execute_unittest(response_obj, form) + +                if len(errors): +                    username = getattr(request.user, "user_id", "Unknown") +                    sentry_sdk.capture_exception(BypassDetectedError( +                        f"Detected unittest bypass attempt on form {form.id} by {username}. " +                        f"Submission has been written to reporting database ({response_obj.id})." +                    )) +                    database: pymongo.database.Database = request.state.db +                    await database.get_collection("violations").insert_one({ +                        "user": username, +                        "bypasses": [error.args[0] for error in errors], +                        "submission": response_obj.dict(by_alias=True), +                        "timestamp": response_obj.timestamp, +                        "id": response_obj.id, +                    })                  failures = []                  status_code = 422 @@ -210,6 +227,8 @@ class SubmitForm(Route):                          failure_names = ["Could not parse user code."]                      elif test.return_code == 6:                          failure_names = ["Could not load user code."] +                    elif test.return_code == 10: +                        failure_names = ["Bypass detected."]                      else:                          failure_names = ["Internal error."] diff --git a/backend/routes/forms/unittesting.py b/backend/routes/forms/unittesting.py index 2bfeaa9..1b042e4 100644 --- a/backend/routes/forms/unittesting.py +++ b/backend/routes/forms/unittesting.py @@ -13,6 +13,10 @@ with open("resources/unittest_template.py") as file:      TEST_TEMPLATE = file.read() +class BypassDetectedError(Exception): +    """Detected an attempt at bypassing the unittests.""" + +  UnittestResult = namedtuple(      "UnittestResult", "question_id question_index return_code passed result"  ) @@ -65,9 +69,12 @@ async def _post_eval(code: str) -> dict[str, str]:          return response.json() -async def execute_unittest(form_response: FormResponse, form: Form) -> list[UnittestResult]: +async def execute_unittest( +        form_response: FormResponse, form: Form +) -> tuple[list[UnittestResult], list[BypassDetectedError]]:      """Execute all the unittests in this form and return the results."""      unittest_results = [] +    errors = []      for index, question in enumerate(form.questions):          if question.type == "code": @@ -101,40 +108,57 @@ async def execute_unittest(form_response: FormResponse, form: Form) -> list[Unit              code = code.replace("### UNIT CODE", unit_code)              try: -                response = await _post_eval(code) -            except HTTPStatusError: -                return_code = 99 -                result = "Unable to contact code runner." -            else: -                return_code = int(response["returncode"]) - -                # Parse the stdout if the tests ran successfully -                if return_code == 0: -                    stdout = response["stdout"] -                    passed = bool(int(stdout[0])) - -                    # If the test failed, we have to populate the result string. -                    if not passed: -                        failed_tests = stdout[1:].strip().split(";") - -                        # Redact failed hidden tests -                        for i, failed_test in enumerate(failed_tests.copy()): -                            if failed_test in hidden_tests: -                                failed_tests[i] = f"hidden_test_{hidden_tests[failed_test]}" - -                        result = ";".join(failed_tests) -                    else: -                        result = "" -                elif return_code in (5, 6, 99): -                    result = response["stdout"] -                # Killed by NsJail -                elif return_code == 137: -                    return_code = 7 -                    result = "Timed out or ran out of memory." -                # Another code has been returned by CPython because of another failure. -                else: +                try: +                    response = await _post_eval(code) +                except HTTPStatusError:                      return_code = 99 -                    result = "Internal error." +                    result = "Unable to contact code runner." +                else: +                    return_code = int(response["returncode"]) + +                    # Parse the stdout if the tests ran successfully +                    if return_code == 0: +                        stdout = response["stdout"] +                        try: +                            passed = bool(int(stdout[0])) +                        except ValueError: +                            raise BypassDetectedError("Detected a bypass when reading result code.") + +                        if passed and stdout.strip() != "1": +                            # Most likely a bypass attempt +                            # A 1 was written to stdout to indicate success, +                            # followed by the actual output +                            raise BypassDetectedError( +                                "Detected improper value for stdout in unittest." +                            ) + +                        # If the test failed, we have to populate the result string. +                        elif not passed: +                            failed_tests = stdout[1:].strip().split(";") + +                            # Redact failed hidden tests +                            for i, failed_test in enumerate(failed_tests.copy()): +                                if failed_test in hidden_tests: +                                    failed_tests[i] = f"hidden_test_{hidden_tests[failed_test]}" + +                            result = ";".join(failed_tests) +                        else: +                            result = "" +                    elif return_code in (5, 6, 99): +                        result = response["stdout"] +                    # Killed by NsJail +                    elif return_code == 137: +                        return_code = 7 +                        result = "Timed out or ran out of memory." +                    # Another code has been returned by CPython because of another failure. +                    else: +                        return_code = 99 +                        result = "Internal error." +            except BypassDetectedError as error: +                return_code = 10 +                result = "Bypass attempt detected, aborting." +                errors.append(error) +                passed = False              unittest_results.append(UnittestResult(                  question_id=question.id, @@ -144,4 +168,4 @@ async def execute_unittest(form_response: FormResponse, form: Form) -> list[Unit                  result=result              )) -    return unittest_results +    return unittest_results, errors  |