diff options
Diffstat (limited to 'backend')
-rw-r--r-- | backend/authentication/user.py | 4 | ||||
-rw-r--r-- | backend/routes/forms/submit.py | 23 | ||||
-rw-r--r-- | backend/routes/forms/unittesting.py | 94 |
3 files changed, 84 insertions, 37 deletions
diff --git a/backend/authentication/user.py b/backend/authentication/user.py index 6256cae..cd5a249 100644 --- a/backend/authentication/user.py +++ b/backend/authentication/user.py @@ -38,6 +38,10 @@ class User(BaseUser): return f"<@{self.payload['id']}>" @property + def user_id(self) -> str: + return str(self.payload["id"]) + + @property def decoded_token(self) -> dict[str, any]: return jwt.decode(self.token, SECRET_KEY, algorithms=["HS256"]) diff --git a/backend/routes/forms/submit.py b/backend/routes/forms/submit.py index 5c500b5..db5f4e4 100644 --- a/backend/routes/forms/submit.py +++ b/backend/routes/forms/submit.py @@ -10,6 +10,8 @@ import uuid from typing import Any, Optional import httpx +import pymongo.database +import sentry_sdk from pydantic import ValidationError from pydantic.main import BaseModel from spectree import Response @@ -23,7 +25,7 @@ from backend.models import Form, FormResponse from backend.route import Route from backend.routes.auth.authorize import set_response_token from backend.routes.forms.discover import AUTH_FORM -from backend.routes.forms.unittesting import execute_unittest +from backend.routes.forms.unittesting import BypassDetectedError, execute_unittest from backend.validation import ErrorMessage, api HCAPTCHA_VERIFY_URL = "https://hcaptcha.com/siteverify" @@ -193,7 +195,22 @@ class SubmitForm(Route): # Run unittests if needed if any("unittests" in question.data for question in form.questions): - unittest_results = await execute_unittest(response_obj, form) + unittest_results, errors = await execute_unittest(response_obj, form) + + if len(errors): + username = getattr(request.user, "user_id", "Unknown") + sentry_sdk.capture_exception(BypassDetectedError( + f"Detected unittest bypass attempt on form {form.id} by {username}. " + f"Submission has been written to reporting database ({response_obj.id})." + )) + database: pymongo.database.Database = request.state.db + await database.get_collection("violations").insert_one({ + "user": username, + "bypasses": [error.args[0] for error in errors], + "submission": response_obj.dict(by_alias=True), + "timestamp": response_obj.timestamp, + "id": response_obj.id, + }) failures = [] status_code = 422 @@ -210,6 +227,8 @@ class SubmitForm(Route): failure_names = ["Could not parse user code."] elif test.return_code == 6: failure_names = ["Could not load user code."] + elif test.return_code == 10: + failure_names = ["Bypass detected."] else: failure_names = ["Internal error."] diff --git a/backend/routes/forms/unittesting.py b/backend/routes/forms/unittesting.py index 2bfeaa9..1b042e4 100644 --- a/backend/routes/forms/unittesting.py +++ b/backend/routes/forms/unittesting.py @@ -13,6 +13,10 @@ with open("resources/unittest_template.py") as file: TEST_TEMPLATE = file.read() +class BypassDetectedError(Exception): + """Detected an attempt at bypassing the unittests.""" + + UnittestResult = namedtuple( "UnittestResult", "question_id question_index return_code passed result" ) @@ -65,9 +69,12 @@ async def _post_eval(code: str) -> dict[str, str]: return response.json() -async def execute_unittest(form_response: FormResponse, form: Form) -> list[UnittestResult]: +async def execute_unittest( + form_response: FormResponse, form: Form +) -> tuple[list[UnittestResult], list[BypassDetectedError]]: """Execute all the unittests in this form and return the results.""" unittest_results = [] + errors = [] for index, question in enumerate(form.questions): if question.type == "code": @@ -101,40 +108,57 @@ async def execute_unittest(form_response: FormResponse, form: Form) -> list[Unit code = code.replace("### UNIT CODE", unit_code) try: - response = await _post_eval(code) - except HTTPStatusError: - return_code = 99 - result = "Unable to contact code runner." - else: - return_code = int(response["returncode"]) - - # Parse the stdout if the tests ran successfully - if return_code == 0: - stdout = response["stdout"] - passed = bool(int(stdout[0])) - - # If the test failed, we have to populate the result string. - if not passed: - failed_tests = stdout[1:].strip().split(";") - - # Redact failed hidden tests - for i, failed_test in enumerate(failed_tests.copy()): - if failed_test in hidden_tests: - failed_tests[i] = f"hidden_test_{hidden_tests[failed_test]}" - - result = ";".join(failed_tests) - else: - result = "" - elif return_code in (5, 6, 99): - result = response["stdout"] - # Killed by NsJail - elif return_code == 137: - return_code = 7 - result = "Timed out or ran out of memory." - # Another code has been returned by CPython because of another failure. - else: + try: + response = await _post_eval(code) + except HTTPStatusError: return_code = 99 - result = "Internal error." + result = "Unable to contact code runner." + else: + return_code = int(response["returncode"]) + + # Parse the stdout if the tests ran successfully + if return_code == 0: + stdout = response["stdout"] + try: + passed = bool(int(stdout[0])) + except ValueError: + raise BypassDetectedError("Detected a bypass when reading result code.") + + if passed and stdout.strip() != "1": + # Most likely a bypass attempt + # A 1 was written to stdout to indicate success, + # followed by the actual output + raise BypassDetectedError( + "Detected improper value for stdout in unittest." + ) + + # If the test failed, we have to populate the result string. + elif not passed: + failed_tests = stdout[1:].strip().split(";") + + # Redact failed hidden tests + for i, failed_test in enumerate(failed_tests.copy()): + if failed_test in hidden_tests: + failed_tests[i] = f"hidden_test_{hidden_tests[failed_test]}" + + result = ";".join(failed_tests) + else: + result = "" + elif return_code in (5, 6, 99): + result = response["stdout"] + # Killed by NsJail + elif return_code == 137: + return_code = 7 + result = "Timed out or ran out of memory." + # Another code has been returned by CPython because of another failure. + else: + return_code = 99 + result = "Internal error." + except BypassDetectedError as error: + return_code = 10 + result = "Bypass attempt detected, aborting." + errors.append(error) + passed = False unittest_results.append(UnittestResult( question_id=question.id, @@ -144,4 +168,4 @@ async def execute_unittest(form_response: FormResponse, form: Form) -> list[Unit result=result )) - return unittest_results + return unittest_results, errors |