From 10a205e97d399d1736fd9c806e1e309bb31ff28d Mon Sep 17 00:00:00 2001 From: Christopher Baklid Date: Thu, 24 May 2018 21:45:25 +0200 Subject: generate new queue based on session id to lock user to their own event chain --- snekweb.py | 76 +++++++++++++++++++++++++++++++------------------------------- 1 file changed, 38 insertions(+), 38 deletions(-) (limited to 'snekweb.py') diff --git a/snekweb.py b/snekweb.py index 02aca20..ff9f8b4 100644 --- a/snekweb.py +++ b/snekweb.py @@ -2,6 +2,7 @@ import traceback import queue import threading import time +import json from rmq.publisher import publish from rmq.consumer import consume @@ -19,50 +20,49 @@ from config import RETURN_QUEUE from config import ROUTING_KEY app = Flask(__name__) +app.jinja_env.auto_reload = True sockets = Sockets(app) -RMQ_queue = queue.Queue(maxsize=0) -def message_handler(ch, method, properties, body): - msg = body.decode('utf-8') - print(f"incoming: {msg} from rabbitmq", flush=True) - RMQ_queue.put(msg) - ch.basic_ack(delivery_tag = method.delivery_tag) - -def relay_to_ws(ws): - while not ws.closed: - try: - msg = RMQ_queue.get(False) - if msg: - print(f"sending {msg} to user", flush=True) - ws.send(msg) - except queue.Empty: - time.sleep(0.1) - pass - -t1 = threading.Thread(target=consume, kwargs={'host':HOST, 'queue':RETURN_QUEUE, 'callback':message_handler}) -t1.daemon = True -t1.start() @app.route('/') def index(): return render_template('index.html') -@sockets.route('/ws') -def websocket_route(ws): - t2 = threading.Thread(target=relay_to_ws, args=(ws, )) - t2.daemon = True - t2.start() +@sockets.route('/ws/') +def websocket_route(ws, snekboxid): + RMQ_queue = queue.Queue(maxsize=0) + + def message_handler(ch, method, properties, body): + msg = body.decode('utf-8') + RMQ_queue.put(msg) + ch.basic_ack(delivery_tag = method.delivery_tag) + + consumer = threading.Thread(target=consume, kwargs={'host':HOST, 'queue':snekboxid, 'callback':message_handler}) + consumer.daemon = True + consumer.start() + + def relay_to_ws(ws): + global client_list + while True: + try: + msg = RMQ_queue.get(False) + if msg: + ws.send(msg) + except queue.Empty: + time.sleep(0.1) + pass + + relay = threading.Thread(target=relay_to_ws, args=(ws,)) + relay.daemon = True + relay.start() try: while not ws.closed: - message = ws.receive() - - if not message: - continue - print(f"forwarding '{message}' to rabbitmq") - - publish(message, host=HOST, queue=QUEUE, routingkey=ROUTING_KEY, exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE) + if message: + snek_msg = json.dumps({snekboxid:message}) + print(f"forwarding {snek_msg} to rabbitmq") + publish(snek_msg, host=HOST, queue=QUEUE, routingkey=ROUTING_KEY, exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE) except: print(traceback.format_exc()) @@ -71,8 +71,8 @@ def websocket_route(ws): if not ws.closed: ws.close() -if __name__ == '__main__': - from gevent import pywsgi - from geventwebsocket.handler import WebSocketHandler - server = pywsgi.WSGIServer(('0.0.0.0', 5000), app, handler_class=WebSocketHandler) - server.serve_forever() +# if __name__ == '__main__': +# from gevent import pywsgi +# from geventwebsocket.handler import WebSocketHandler +# server = pywsgi.WSGIServer(('0.0.0.0', 5000), app, handler_class=WebSocketHandler) +# server.serve_forever() -- cgit v1.2.3