aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar Christopher Baklid <[email protected]>2018-05-27 10:49:48 +0200
committerGravatar Christopher Baklid <[email protected]>2018-05-27 10:49:48 +0200
commitba23eba9a3acb271d9597bf6fb780d86d50bc2d1 (patch)
treed472a337c8d60efe481594ab899ae760c3959438
parentretag rabbitmq docker image (diff)
kick off jobs in their own processes
-rw-r--r--snekbox.py44
1 files changed, 28 insertions, 16 deletions
diff --git a/snekbox.py b/snekbox.py
index 8c2bbaf..d047996 100644
--- a/snekbox.py
+++ b/snekbox.py
@@ -1,6 +1,7 @@
import sys
import io
import json
+import multiprocessing
from logs import log
from rmq import Rmq
@@ -8,35 +9,46 @@ from rmq import Rmq
rmq = Rmq()
-def execute(snippet):
+def execute(body):
+ msg = body.decode('utf-8')
+ log.info(f"incoming: {msg}")
+
+ failed = False
+
old_stdout = sys.stdout
redirected_output = sys.stdout = io.StringIO()
- failed = False
+
try:
- exec(snippet)
+
+ snek_msg = json.loads(msg)
+ for snekid, snekcode in snek_msg.items():
+ exec(snekcode)
+
except Exception as e:
failed = str(e)
+
finally:
sys.stdout = old_stdout
if failed:
- return failed.strip()
- return redirected_output.getvalue().strip()
+ result = failed.strip()
+
+ result = redirected_output.getvalue().strip()
+
+ log.info(f"outgoing: {result}")
+
+ rmq.publish(result,
+ queue=snekid,
+ routingkey=snekid,
+ exchange=snekid)
+ exit(0)
def message_handler(ch, method, properties, body, thread_ws=None):
- msg = body.decode('utf-8')
+ p = multiprocessing.Process(target=execute, args=(body,))
+ p.daemon = True
+ p.start()
- log.info(f"incoming: {msg}")
- snek_msg = json.loads(msg)
-
- for snekid, snekcode in snek_msg.items():
- result = execute(snekcode)
- log.info(f"outgoing: {result}")
- rmq.publish(result,
- queue=snekid,
- routingkey=snekid,
- exchange=snekid)
ch.basic_ack(delivery_tag=method.delivery_tag)