From 10a205e97d399d1736fd9c806e1e309bb31ff28d Mon Sep 17 00:00:00 2001 From: Christopher Baklid Date: Thu, 24 May 2018 21:45:25 +0200 Subject: generate new queue based on session id to lock user to their own event chain --- Pipfile | 3 ++- Pipfile.lock | 10 ++++++- snekbox.py | 9 ++++--- snekweb.py | 76 ++++++++++++++++++++++++++-------------------------- templates/index.html | 19 ++++++++++++- 5 files changed, 73 insertions(+), 44 deletions(-) diff --git a/Pipfile b/Pipfile index 54f4847..e8dae92 100644 --- a/Pipfile +++ b/Pipfile @@ -12,13 +12,14 @@ flask = "*" flask-sockets = "*" gevent = "==1.2.2" gevent-websocket = "*" +gunicorn = "*" [requires] python_version = "3.6" [scripts] snekbox = "python snekbox.py" -snekweb = "python snekweb.py" +snekweb = "gunicorn -w 1 -b 0.0.0.0:5000 -k geventwebsocket.gunicorn.workers.GeventWebSocketWorker snekweb:app" buildbox = "docker build -t pythondiscord/snekbox:latest -f docker/Dockerfile ." buildweb = "docker build -t pythondiscord/snekboxweb:latest -f docker/Dockerfile.webapp ." pushbox = "docker push pythondiscord/snekbox:latest" diff --git a/Pipfile.lock b/Pipfile.lock index 885f9c1..c580d4a 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "0a745b21e4db904bb3575c80d627ab80a3418bcb42e6e5c3695e914b88756114" + "sha256": "e371feaf8cfbc6df9050701afd2f1c9be77630841f833962845421542640e825" }, "pipfile-spec": 6, "requires": { @@ -143,6 +143,14 @@ ], "version": "==0.4.13" }, + "gunicorn": { + "hashes": [ + "sha256:7ef2b828b335ed58e3b64ffa84caceb0a7dd7c5ca12f217241350dec36a1d5dc", + "sha256:bc59005979efb6d2dd7d5ba72d99f8a8422862ad17ff3a16e900684630dd2a10" + ], + "index": "pypi", + "version": "==19.8.1" + }, "idna": { "hashes": [ "sha256:2c6a5de3089009e3da7c5dde64a141dbc8551d5b7f6cf4ed7c2568d0cc520a8f", diff --git a/snekbox.py b/snekbox.py index b58447e..4dfcc48 100644 --- a/snekbox.py +++ b/snekbox.py @@ -3,6 +3,7 @@ import sys import time import pika import io +import json from rmq.consumer import consume from rmq.publisher import publish @@ -39,9 +40,11 @@ def message_handler(ch, method, properties, body): # Execute code snippets here print(f"incoming: {msg}", flush=True) - result = execute(msg) - print(f"outgoing: {result}", flush=True) - publish(result, host=HOST, queue=RETURN_QUEUE, routingkey=RETURN_ROUTING_KEY, exchange=RETURN_EXCHANGE, exchange_type=EXCHANGE_TYPE) + snek_msg = json.loads(msg) + for snekid, snekcode in snek_msg.items(): + result = execute(snekcode) + print(f"outgoing: {result}", flush=True) + publish(result, host=HOST, queue=snekid, routingkey=snekid, exchange=snekid, exchange_type=EXCHANGE_TYPE) ch.basic_ack(delivery_tag = method.delivery_tag) diff --git a/snekweb.py b/snekweb.py index 02aca20..ff9f8b4 100644 --- a/snekweb.py +++ b/snekweb.py @@ -2,6 +2,7 @@ import traceback import queue import threading import time +import json from rmq.publisher import publish from rmq.consumer import consume @@ -19,50 +20,49 @@ from config import RETURN_QUEUE from config import ROUTING_KEY app = Flask(__name__) +app.jinja_env.auto_reload = True sockets = Sockets(app) -RMQ_queue = queue.Queue(maxsize=0) -def message_handler(ch, method, properties, body): - msg = body.decode('utf-8') - print(f"incoming: {msg} from rabbitmq", flush=True) - RMQ_queue.put(msg) - ch.basic_ack(delivery_tag = method.delivery_tag) - -def relay_to_ws(ws): - while not ws.closed: - try: - msg = RMQ_queue.get(False) - if msg: - print(f"sending {msg} to user", flush=True) - ws.send(msg) - except queue.Empty: - time.sleep(0.1) - pass - -t1 = threading.Thread(target=consume, kwargs={'host':HOST, 'queue':RETURN_QUEUE, 'callback':message_handler}) -t1.daemon = True -t1.start() @app.route('/') def index(): return render_template('index.html') -@sockets.route('/ws') -def websocket_route(ws): - t2 = threading.Thread(target=relay_to_ws, args=(ws, )) - t2.daemon = True - t2.start() +@sockets.route('/ws/') +def websocket_route(ws, snekboxid): + RMQ_queue = queue.Queue(maxsize=0) + + def message_handler(ch, method, properties, body): + msg = body.decode('utf-8') + RMQ_queue.put(msg) + ch.basic_ack(delivery_tag = method.delivery_tag) + + consumer = threading.Thread(target=consume, kwargs={'host':HOST, 'queue':snekboxid, 'callback':message_handler}) + consumer.daemon = True + consumer.start() + + def relay_to_ws(ws): + global client_list + while True: + try: + msg = RMQ_queue.get(False) + if msg: + ws.send(msg) + except queue.Empty: + time.sleep(0.1) + pass + + relay = threading.Thread(target=relay_to_ws, args=(ws,)) + relay.daemon = True + relay.start() try: while not ws.closed: - message = ws.receive() - - if not message: - continue - print(f"forwarding '{message}' to rabbitmq") - - publish(message, host=HOST, queue=QUEUE, routingkey=ROUTING_KEY, exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE) + if message: + snek_msg = json.dumps({snekboxid:message}) + print(f"forwarding {snek_msg} to rabbitmq") + publish(snek_msg, host=HOST, queue=QUEUE, routingkey=ROUTING_KEY, exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE) except: print(traceback.format_exc()) @@ -71,8 +71,8 @@ def websocket_route(ws): if not ws.closed: ws.close() -if __name__ == '__main__': - from gevent import pywsgi - from geventwebsocket.handler import WebSocketHandler - server = pywsgi.WSGIServer(('0.0.0.0', 5000), app, handler_class=WebSocketHandler) - server.serve_forever() +# if __name__ == '__main__': +# from gevent import pywsgi +# from geventwebsocket.handler import WebSocketHandler +# server = pywsgi.WSGIServer(('0.0.0.0', 5000), app, handler_class=WebSocketHandler) +# server.serve_forever() diff --git a/templates/index.html b/templates/index.html index d0b0630..ef53d74 100644 --- a/templates/index.html +++ b/templates/index.html @@ -3,9 +3,19 @@ snekboxweb -- cgit v1.2.3