"""Flask app that relays messages between a browser websocket and RabbitMQ."""

import json
import queue
import threading
import time
import traceback

from flask import Flask
from flask import render_template
from flask_sockets import Sockets

from config import (
    EXCHANGE,
    EXCHANGE_TYPE,
    HOST,
    QUEUE,
    ROUTING_KEY,
)
from rmq.consumer import consume
from rmq.publisher import publish

# How long the relay thread blocks on the queue before re-checking whether
# the websocket has been closed (seconds).
_RELAY_POLL_SECONDS = 0.1

app = Flask(__name__)
app.jinja_env.auto_reload = True
sockets = Sockets(app)


@app.route('/')
def index():
    """Serve the single-page front end."""
    return render_template('index.html')


def _relay_to_ws(ws, pending):
    """Forward messages from *pending* to the websocket until it closes.

    Args:
        ws: websocket object exposing ``closed`` and ``send``.
        pending: ``queue.Queue`` of already-decoded text messages.
    """
    while not ws.closed:
        try:
            # Block briefly instead of busy-polling, but wake up regularly
            # so a closed socket is noticed and the thread can exit.
            msg = pending.get(timeout=_RELAY_POLL_SECONDS)
        except queue.Empty:
            continue
        if not msg:
            continue
        try:
            ws.send(msg)
        except Exception:
            # The socket may close between the ``closed`` check and the
            # send; report it and stop relaying rather than spin forever.
            print(traceback.format_exc())
            break


# The URL variable is required: flask_sockets passes matched URL values to
# the handler as keyword arguments, and the handler needs ``snekboxid``.
@sockets.route('/ws/<snekboxid>')
def websocket_route(ws, snekboxid):
    """Bridge one websocket connection to RabbitMQ.

    Messages consumed from the RabbitMQ queue named *snekboxid* are sent to
    the websocket; messages received from the websocket are wrapped as
    ``{snekboxid: message}`` JSON and published to the configured exchange.

    Args:
        ws: websocket connection supplied by flask_sockets.
        snekboxid: identifier taken from the URL; used as the name of the
            queue to consume from and as the key of the published envelope.
    """
    pending = queue.Queue()

    def message_handler(ch, method, properties, body):
        # Consumer callback: hand the decoded payload to the relay thread,
        # then acknowledge the delivery on the channel.
        pending.put(body.decode('utf-8'))
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # NOTE(review): nothing stops this consumer when the websocket closes,
    # so it presumably keeps consuming (and acking) for the process
    # lifetime - confirm against rmq.consumer.consume and add a shutdown
    # hook if it offers one.
    consumer = threading.Thread(
        target=consume,
        kwargs={
            'host': HOST,
            'queue': snekboxid,
            'callback': message_handler,
        },
    )
    consumer.daemon = True
    consumer.start()

    relay = threading.Thread(target=_relay_to_ws, args=(ws, pending))
    relay.daemon = True
    relay.start()

    try:
        while not ws.closed:
            message = ws.receive()
            if not message:
                continue
            snek_msg = json.dumps({snekboxid: message})
            print(f"forwarding {snek_msg} to rabbitmq")
            publish(snek_msg,
                    host=HOST,
                    queue=QUEUE,
                    routingkey=ROUTING_KEY,
                    exchange=EXCHANGE,
                    exchange_type=EXCHANGE_TYPE)
    except Exception:
        # Top-level boundary for this connection: report and fall through
        # to the cleanup below.
        print(traceback.format_exc())
    finally:
        if not ws.closed:
            ws.close()


if __name__ == '__main__':
    from gevent import pywsgi
    from geventwebsocket.handler import WebSocketHandler

    server = pywsgi.WSGIServer(('0.0.0.0', 5000), app,
                               handler_class=WebSocketHandler)
    server.serve_forever()