aboutsummaryrefslogtreecommitdiffstats
path: root/tests/gunicorn_utils.py
blob: f5dae7a604680d29945ba87523dabc367ede4b90 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import concurrent.futures
import contextlib
import multiprocessing
from typing import Iterator

from gunicorn.app.wsgiapp import WSGIApplication


class _StandaloneApplication(WSGIApplication):
    def __init__(self, config_path: str = None, **kwargs):
        self.config_path = config_path
        self.options = kwargs

        super().__init__()

    def init(self, parser, opts, args):
        """Patch `opts` to simulate the config path being passed as a command-line argument."""
        super().init(parser, opts, args)
        opts.config = self.config_path

    def load_config(self):
        """Set the option kwargs in the config, then load the config as usual."""
        for key, value in self.options.items():
            if key in self.cfg.settings and value is not None:
                self.cfg.set(key.lower(), value)

        super().load_config()


def _proc_target(config_path: str, event: multiprocessing.Event, **kwargs) -> None:
    """Run a Gunicorn app with the given config and set `event` when Gunicorn is ready."""
    def when_ready(_):
        event.set()

    app = _StandaloneApplication(config_path, when_ready=when_ready, **kwargs)

    import logging
    logging.disable(logging.INFO)

    app.run()


@contextlib.contextmanager
def run_gunicorn(config_path: str = "config/gunicorn.conf.py", **kwargs) -> Iterator[None]:
    """
    Run the Snekbox app through separate Gunicorn process. Use as a context manager.

    `config_path` is the path to the Gunicorn config to use.
    Additional kwargs are interpreted as Gunicorn settings.

    Raise RuntimeError if Gunicorn terminates before it is ready.
    Raise TimeoutError if Gunicorn isn't ready after 60 seconds.
    """
    event = multiprocessing.Event()
    proc = multiprocessing.Process(target=_proc_target, args=(config_path, event), kwargs=kwargs)

    try:
        proc.start()

        # Wait 60 seconds for Gunicorn to be ready, but exit early if Gunicorn fails.
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
        concurrent.futures.wait(
            [executor.submit(proc.join), executor.submit(event.wait)],
            timeout=60,
            return_when=concurrent.futures.FIRST_COMPLETED
        )
        # Can't use the context manager cause wait=False needs to be set.
        executor.shutdown(wait=False, cancel_futures=True)

        if proc.is_alive():
            if not event.is_set():
                raise TimeoutError("Timed out waiting for Gunicorn to be ready.")
        else:
            raise RuntimeError(f"Gunicorn terminated unexpectedly with code {proc.exitcode}.")

        yield
    finally:
        proc.terminate()