aboutsummaryrefslogtreecommitdiffstats
path: root/backend/routes/forms/unittesting.py
diff options
context:
space:
mode:
authorGravatar Hassan Abouelela <[email protected]>2021-02-28 14:26:39 +0300
committerGravatar GitHub <[email protected]>2021-02-28 14:26:39 +0300
commitcf13799c32d4c9ef9098215fc103f4a144077581 (patch)
tree8538fe46d101fb2f953df7fb19da93c10643dc88 /backend/routes/forms/unittesting.py
parentMerge pull request #64 from python-discord/abouelela-codeowners (diff)
parentMerge branch 'main' into feat/9/unittest-validation (diff)
Merge pull request #63 from python-discord/feat/9/unittest-validation
Support code unit testing through snekbox
Diffstat (limited to 'backend/routes/forms/unittesting.py')
-rw-r--r--backend/routes/forms/unittesting.py127
1 files changed, 127 insertions, 0 deletions
diff --git a/backend/routes/forms/unittesting.py b/backend/routes/forms/unittesting.py
new file mode 100644
index 0000000..3854314
--- /dev/null
+++ b/backend/routes/forms/unittesting.py
@@ -0,0 +1,127 @@
+import base64
+from collections import namedtuple
+from itertools import count
+from textwrap import indent
+
+import httpx
+from httpx import HTTPStatusError
+
+from backend.constants import SNEKBOX_URL
+from backend.models import FormResponse, Form
+
+with open("resources/unittest_template.py") as file:
+ TEST_TEMPLATE = file.read()
+
+
+UnittestResult = namedtuple("UnittestResult", "question_id return_code passed result")
+
+
+def filter_unittests(form: Form) -> Form:
+ """
+ Replace the unittest data section of code questions with the number of test cases.
+
+ This is used to redact the exact tests when sending the form back to the frontend.
+ """
+ for question in form.questions:
+ if question.type == "code" and "unittests" in question.data:
+ question.data["unittests"] = len(question.data["unittests"])
+
+ return form
+
+
+def _make_unit_code(units: dict[str, str]) -> str:
+ """Compose a dict mapping unit names to their code into an actual class body."""
+ result = ""
+
+ for unit_name, unit_code in units.items():
+ result += (
+ f"\ndef test_{unit_name.lstrip('#')}(unit):" # Function definition
+ f"\n{indent(unit_code, ' ')}" # Unit code
+ )
+
+ return indent(result, " ")
+
+
+def _make_user_code(code: str) -> str:
+ """Compose the user code into an actual base64-encoded string variable."""
+ code = base64.b64encode(code.encode("utf8")).decode("utf8")
+ return f'USER_CODE = b"{code}"'
+
+
+async def _post_eval(code: str) -> dict[str, str]:
+ """Post the eval to snekbox and return the response."""
+ async with httpx.AsyncClient() as client:
+ data = {"input": code}
+ response = await client.post(SNEKBOX_URL, json=data, timeout=10)
+
+ response.raise_for_status()
+ return response.json()
+
+
+async def execute_unittest(form_response: FormResponse, form: Form) -> list[UnittestResult]:
+ """Execute all the unittests in this form and return the results."""
+ unittest_results = []
+
+ for question in form.questions:
+ if question.type == "code" and "unittests" in question.data:
+ passed = False
+
+ # Tests starting with an hashtag should have censored names.
+ hidden_test_counter = count(1)
+ hidden_tests = {
+ test.lstrip("#").lstrip("test_"): next(hidden_test_counter)
+ for test in question.data["unittests"].keys()
+ if test.startswith("#")
+ }
+
+ # Compose runner code
+ unit_code = _make_unit_code(question.data["unittests"])
+ user_code = _make_user_code(form_response.response[question.id])
+
+ code = TEST_TEMPLATE.replace("### USER CODE", user_code)
+ code = code.replace("### UNIT CODE", unit_code)
+
+ try:
+ response = await _post_eval(code)
+ except HTTPStatusError:
+ return_code = 99
+ result = "Unable to contact code runner."
+ else:
+ return_code = int(response["returncode"])
+
+ # Parse the stdout if the tests ran successfully
+ if return_code == 0:
+ stdout = response["stdout"]
+ passed = bool(int(stdout[0]))
+
+ # If the test failed, we have to populate the result string.
+ if not passed:
+ failed_tests = stdout[1:].strip().split(";")
+
+ # Redact failed hidden tests
+ for i, failed_test in enumerate(failed_tests.copy()):
+ if failed_test in hidden_tests:
+ failed_tests[i] = f"hidden_test_{hidden_tests[failed_test]}"
+
+ result = ";".join(failed_tests)
+ else:
+ result = ""
+ elif return_code in (5, 6, 99):
+ result = response["stdout"]
+ # Killed by NsJail
+ elif return_code == 137:
+ return_code = 7
+ result = "Timed out or ran out of memory."
+ # Another code has been returned by CPython because of another failure.
+ else:
+ return_code = 99
+ result = "Internal error."
+
+ unittest_results.append(UnittestResult(
+ question_id=question.id,
+ return_code=return_code,
+ passed=passed,
+ result=result
+ ))
+
+ return unittest_results