diff options
Diffstat (limited to 'tests')
| -rw-r--r-- | tests/__init__.py | 1 | ||||
| -rw-r--r-- | tests/api/__init__.py | 23 | ||||
| -rw-r--r-- | tests/api/test_eval.py | 49 | ||||
| -rw-r--r-- | tests/test_nsjail.py | 124 | ||||
| -rw-r--r-- | tests/test_snekbox.py | 60 | 
5 files changed, 196 insertions, 61 deletions
diff --git a/tests/__init__.py b/tests/__init__.py index 792d600..e69de29 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1 +0,0 @@ -# diff --git a/tests/api/__init__.py b/tests/api/__init__.py new file mode 100644 index 0000000..dcee5b5 --- /dev/null +++ b/tests/api/__init__.py @@ -0,0 +1,23 @@ +from subprocess import CompletedProcess +from unittest import mock + +from falcon import testing + +from snekbox.api import SnekAPI + + +class SnekAPITestCase(testing.TestCase): +    def setUp(self): +        super().setUp() + +        self.patcher = mock.patch("snekbox.api.resources.eval.NsJail", autospec=True) +        self.mock_nsjail = self.patcher.start() +        self.mock_nsjail.return_value.python3.return_value = CompletedProcess( +            args=[], +            returncode=0, +            stdout="output", +            stderr="error" +        ) +        self.addCleanup(self.patcher.stop) + +        self.app = SnekAPI() diff --git a/tests/api/test_eval.py b/tests/api/test_eval.py new file mode 100644 index 0000000..3350763 --- /dev/null +++ b/tests/api/test_eval.py @@ -0,0 +1,49 @@ +from tests.api import SnekAPITestCase + + +class TestEvalResource(SnekAPITestCase): +    PATH = "/eval" + +    def test_post_valid_200(self): +        body = {"input": "foo"} +        result = self.simulate_post(self.PATH, json=body) + +        self.assertEqual(result.status_code, 200) +        self.assertEqual("output", result.json["stdout"]) +        self.assertEqual(0, result.json["returncode"]) + +    def test_post_invalid_schema_400(self): +        body = {"stuff": "foo"} +        result = self.simulate_post(self.PATH, json=body) + +        self.assertEqual(result.status_code, 400) + +        expected = { +            "title": "Request data failed validation", +            "description": "'input' is a required property" +        } + +        self.assertEqual(expected, result.json) + +    def test_post_invalid_content_type_415(self): +        body = "{'input': 'foo'}" +        headers = {"Content-Type": "application/xml"} +        result = self.simulate_post(self.PATH, body=body, headers=headers) + +        self.assertEqual(result.status_code, 415) + +        expected = { +            "title": "Unsupported media type", +            "description": "application/xml is an unsupported media type." +        } + +        self.assertEqual(expected, result.json) + +    def test_disallowed_method_405(self): +        result = self.simulate_get(self.PATH) +        self.assertEqual(result.status_code, 405) + +    def test_options_allow_post_only(self): +        result = self.simulate_options(self.PATH) +        self.assertEqual(result.status_code, 200) +        self.assertEqual(result.headers.get("Allow"), "POST") diff --git a/tests/test_nsjail.py b/tests/test_nsjail.py new file mode 100644 index 0000000..bb176d9 --- /dev/null +++ b/tests/test_nsjail.py @@ -0,0 +1,124 @@ +import logging +import unittest +from textwrap import dedent + +from snekbox.nsjail import MEM_MAX, NsJail + + +class NsJailTests(unittest.TestCase): +    def setUp(self): +        super().setUp() + +        self.nsjail = NsJail() +        self.nsjail.DEBUG = False +        self.logger = logging.getLogger("snekbox.nsjail") + +    def test_print_returns_0(self): +        result = self.nsjail.python3("print('test')") +        self.assertEqual(result.returncode, 0) +        self.assertEqual(result.stdout, "test\n") +        self.assertEqual(result.stderr, None) + +    def test_timeout_returns_137(self): +        code = dedent(""" +            while True: +                pass +        """).strip() + +        with self.assertLogs(self.logger) as log: +            result = self.nsjail.python3(code) + +        self.assertEqual(result.returncode, 137) +        self.assertEqual(result.stdout, "") +        self.assertEqual(result.stderr, None) +        self.assertIn("run time >= time limit", "\n".join(log.output)) + +    def test_memory_returns_137(self): +        # Add a kilobyte just to be safe. +        code = dedent(f""" +            x = ' ' * {MEM_MAX + 1000} +        """).strip() + +        result = self.nsjail.python3(code) +        self.assertEqual(result.returncode, 137) +        self.assertEqual(result.stdout, "") +        self.assertEqual(result.stderr, None) + +    def test_subprocess_resource_unavailable(self): +        code = dedent(""" +            import subprocess +            print(subprocess.check_output('kill -9 6', shell=True).decode()) +        """).strip() + +        result = self.nsjail.python3(code) +        self.assertEqual(result.returncode, 1) +        self.assertIn("Resource temporarily unavailable", result.stdout) +        self.assertEqual(result.stderr, None) + +    def test_read_only_file_system(self): +        code = dedent(""" +            open('hello', 'w').write('world') +        """).strip() + +        result = self.nsjail.python3(code) +        self.assertEqual(result.returncode, 1) +        self.assertIn("Read-only file system", result.stdout) +        self.assertEqual(result.stderr, None) + +    def test_forkbomb_resource_unavailable(self): +        code = dedent(""" +            import os +            while 1: +                os.fork() +        """).strip() + +        result = self.nsjail.python3(code) +        self.assertEqual(result.returncode, 1) +        self.assertIn("Resource temporarily unavailable", result.stdout) +        self.assertEqual(result.stderr, None) + +    def test_sigsegv_returns_139(self):  # In honour of Juan. +        code = dedent(""" +            import ctypes +            ctypes.string_at(0) +        """).strip() + +        result = self.nsjail.python3(code) +        self.assertEqual(result.returncode, 139) +        self.assertEqual(result.stdout, "") +        self.assertEqual(result.stderr, None) + +    def test_null_byte_value_error(self): +        result = self.nsjail.python3("\0") +        self.assertEqual(result.returncode, None) +        self.assertEqual(result.stdout, "ValueError: embedded null byte") +        self.assertEqual(result.stderr, None) + +    def test_log_parser(self): +        log_lines = ( +            "[D][2019-06-22T20:07:00+0000][16] void foo::bar()():100 This is a debug message.", +            "[I][2019-06-22T20:07:48+0000] pid=20 ([STANDALONE MODE]) " +            "exited with status: 2, (PIDs left: 0)", +            "[W][2019-06-22T20:06:04+0000][14] void cmdline::logParams(nsjconf_t*)():250 " +            "Process will be UID/EUID=0 in the global user namespace, and will have user " +            "root-level access to files", +            "[W][2019-06-22T20:07:00+0000][16] void foo::bar()():100 This is a warning!", +            "[E][2019-06-22T20:07:00+0000][16] bool " +            "cmdline::setupArgv(nsjconf_t*, int, char**, int)():316 No command-line provided", +            "[F][2019-06-22T20:07:00+0000][16] int main(int, char**)():204 " +            "Couldn't parse cmdline options", +            "Invalid Line" +        ) + +        with self.assertLogs(self.logger, logging.DEBUG) as log: +            self.nsjail._parse_log(log_lines) + +        self.assertIn("DEBUG:snekbox.nsjail:This is a debug message.", log.output) +        self.assertIn("ERROR:snekbox.nsjail:Couldn't parse cmdline options", log.output) +        self.assertIn("ERROR:snekbox.nsjail:No command-line provided", log.output) +        self.assertIn("WARNING:snekbox.nsjail:Failed to parse log line 'Invalid Line'", log.output) +        self.assertIn("WARNING:snekbox.nsjail:This is a warning!", log.output) +        self.assertIn( +            "INFO:snekbox.nsjail:pid=20 ([STANDALONE MODE]) exited with status: 2, (PIDs left: 0)", +            log.output +        ) diff --git a/tests/test_snekbox.py b/tests/test_snekbox.py deleted file mode 100644 index e2505d6..0000000 --- a/tests/test_snekbox.py +++ /dev/null @@ -1,60 +0,0 @@ -import unittest -import pytest -import os -import json - -from snekbox import Snekbox -from rmq import Rmq - -r = Rmq() - -snek = Snekbox() - - -class SnekTests(unittest.TestCase): -    def test_nsjail(self): -        result = snek.python3('print("test")') -        self.assertEquals(result.strip(), 'test') - -    # def test_memory_error(self): -    #     code = ('x = "*"\n' -    #             'while True:\n' -    #             '    x = x * 99\n') -    #     result = snek.python3(code) -    #     self.assertEquals(result.strip(), 'timed out or memory limit exceeded') - -    def test_timeout(self): -        code = ('x = "*"\n' -        'while True:\n' -        '    try:\n' -        '        x = x * 99\n' -        '    except:\n' -        '        continue\n') - -        result = snek.python3(code) -        self.assertEquals(result.strip(), 'timed out or memory limit exceeded') - -    def test_kill(self): -        code = ('import subprocess\n' -                'print(subprocess.check_output("kill -9 6", shell=True).decode())') -        result = snek.python3(code) -        if 'ModuleNotFoundError' in result.strip(): -            self.assertIn('ModuleNotFoundError', result.strip()) -        else: -            self.assertIn('(PIDs left: 0)', result.strip()) - -    def test_forkbomb(self): -        code = ('import os\n' -                'while 1:\n' -                '    os.fork()') -        result = snek.python3(code) -        self.assertIn('Resource temporarily unavailable', result.strip()) - -    def test_juan_golf(self):  # in honour of Juan -        code = ("func = lambda: None\n" -                "CodeType = type(func.__code__)\n" -                "bytecode = CodeType(0,1,0,0,0,b'',(),(),(),'','',1,b'')\n" -                "exec(bytecode)") - -        result = snek.python3(code) -        self.assertEquals('unknown error, code: 111', result.strip())  |