diff options
author | 2021-02-28 14:47:11 +0300 | |
---|---|---|
committer | 2021-02-28 14:49:42 +0300 | |
commit | d1c229940248b7fd3a82713a125337d300729966 (patch) | |
tree | 395131748f039524ef34affc16045cbc12fe70e6 /backend/routes/forms/unittesting.py | |
parent | Sets Sentry SDK Environment (diff) | |
parent | Merge pull request #62 from python-discord/dependabot/pip/uvicorn-0.13.4 (diff) |
Merge branch 'main' into dependabot/pip/sentry-sdk-0.20.3
Signed-off-by: Hassan Abouelela <[email protected]>
# Conflicts:
# poetry.lock
Diffstat (limited to 'backend/routes/forms/unittesting.py')
-rw-r--r-- | backend/routes/forms/unittesting.py | 127 |
1 files changed, 127 insertions, 0 deletions
diff --git a/backend/routes/forms/unittesting.py b/backend/routes/forms/unittesting.py new file mode 100644 index 0000000..3854314 --- /dev/null +++ b/backend/routes/forms/unittesting.py @@ -0,0 +1,127 @@ +import base64 +from collections import namedtuple +from itertools import count +from textwrap import indent + +import httpx +from httpx import HTTPStatusError + +from backend.constants import SNEKBOX_URL +from backend.models import FormResponse, Form + +with open("resources/unittest_template.py") as file: + TEST_TEMPLATE = file.read() + + +UnittestResult = namedtuple("UnittestResult", "question_id return_code passed result") + + +def filter_unittests(form: Form) -> Form: + """ + Replace the unittest data section of code questions with the number of test cases. + + This is used to redact the exact tests when sending the form back to the frontend. + """ + for question in form.questions: + if question.type == "code" and "unittests" in question.data: + question.data["unittests"] = len(question.data["unittests"]) + + return form + + +def _make_unit_code(units: dict[str, str]) -> str: + """Compose a dict mapping unit names to their code into an actual class body.""" + result = "" + + for unit_name, unit_code in units.items(): + result += ( + f"\ndef test_{unit_name.lstrip('#')}(unit):" # Function definition + f"\n{indent(unit_code, ' ')}" # Unit code + ) + + return indent(result, " ") + + +def _make_user_code(code: str) -> str: + """Compose the user code into an actual base64-encoded string variable.""" + code = base64.b64encode(code.encode("utf8")).decode("utf8") + return f'USER_CODE = b"{code}"' + + +async def _post_eval(code: str) -> dict[str, str]: + """Post the eval to snekbox and return the response.""" + async with httpx.AsyncClient() as client: + data = {"input": code} + response = await client.post(SNEKBOX_URL, json=data, timeout=10) + + response.raise_for_status() + return response.json() + + +async def execute_unittest(form_response: FormResponse, form: Form) -> list[UnittestResult]: + """Execute all the unittests in this form and return the results.""" + unittest_results = [] + + for question in form.questions: + if question.type == "code" and "unittests" in question.data: + passed = False + + # Tests starting with an hashtag should have censored names. + hidden_test_counter = count(1) + hidden_tests = { + test.lstrip("#").lstrip("test_"): next(hidden_test_counter) + for test in question.data["unittests"].keys() + if test.startswith("#") + } + + # Compose runner code + unit_code = _make_unit_code(question.data["unittests"]) + user_code = _make_user_code(form_response.response[question.id]) + + code = TEST_TEMPLATE.replace("### USER CODE", user_code) + code = code.replace("### UNIT CODE", unit_code) + + try: + response = await _post_eval(code) + except HTTPStatusError: + return_code = 99 + result = "Unable to contact code runner." + else: + return_code = int(response["returncode"]) + + # Parse the stdout if the tests ran successfully + if return_code == 0: + stdout = response["stdout"] + passed = bool(int(stdout[0])) + + # If the test failed, we have to populate the result string. + if not passed: + failed_tests = stdout[1:].strip().split(";") + + # Redact failed hidden tests + for i, failed_test in enumerate(failed_tests.copy()): + if failed_test in hidden_tests: + failed_tests[i] = f"hidden_test_{hidden_tests[failed_test]}" + + result = ";".join(failed_tests) + else: + result = "" + elif return_code in (5, 6, 99): + result = response["stdout"] + # Killed by NsJail + elif return_code == 137: + return_code = 7 + result = "Timed out or ran out of memory." + # Another code has been returned by CPython because of another failure. + else: + return_code = 99 + result = "Internal error." + + unittest_results.append(UnittestResult( + question_id=question.id, + return_code=return_code, + passed=passed, + result=result + )) + + return unittest_results |