aboutsummaryrefslogtreecommitdiffstats
path: root/runner/consume.py
diff options
context:
space:
mode:
Diffstat (limited to 'runner/consume.py')
-rw-r--r--runner/consume.py68
1 files changed, 68 insertions, 0 deletions
diff --git a/runner/consume.py b/runner/consume.py
new file mode 100644
index 0000000..0e4d79a
--- /dev/null
+++ b/runner/consume.py
@@ -0,0 +1,68 @@
+import pika
+import traceback
+import sys
+from io import StringIO
+
+from config import (
+ USERNAME,
+ PASSWORD,
+ HOST,
+ PORT,
+ EXCHANGE,
+ EXCHANGE_TYPE,
+ QUEUE,
+ ROUTING_KEY,
+)
+
+def execute(snippet):
+ old_stdout = sys.stdout
+ redirected_output = sys.stdout = StringIO()
+ failed = False
+ try:
+ exec(snippet)
+ except Exception as e:
+ failed = e
+ finally:
+ sys.stdout = old_stdout
+
+ if failed:
+ return failed
+ return redirected_output.getvalue()
+
+
+def message_handler(ch, method, properties, body):
+ msg = body.decode('utf-8')
+
+ # Execute code snippets here
+ print(f"incoming: {msg}", flush=True)
+ result = execute(msg)
+ print(result, flush=True)
+
+ ch.basic_ack(delivery_tag = method.delivery_tag)
+
+def rabbitmq_consume():
+ credentials = pika.PlainCredentials(USERNAME, PASSWORD)
+ connection = pika.BlockingConnection(pika.ConnectionParameters(HOST, PORT, '/', credentials))
+
+ channel = connection.channel()
+ channel.queue_declare(queue=QUEUE, durable=False)
+ channel.basic_qos(prefetch_count=1)
+ channel.basic_consume(message_handler, queue=QUEUE)
+
+ try:
+ print(f"""Connecting to
+ host: {HOST}
+ port: {PORT}
+ exchange: {EXCHANGE}
+ queue: {QUEUE}""", flush=True)
+
+ channel.start_consuming()
+
+ except Exception:
+ exc = traceback.format_exc()
+ print(exc, flush=True)
+
+ finally:
+ connection.close()
+
+rabbitmq_consume()