diff options
Diffstat (limited to 'snekweb.py')
-rw-r--r-- | snekweb.py | 78 |
1 files changed, 78 insertions, 0 deletions
diff --git a/snekweb.py b/snekweb.py new file mode 100644 index 0000000..02aca20 --- /dev/null +++ b/snekweb.py @@ -0,0 +1,78 @@ +import traceback +import queue +import threading +import time + +from rmq.publisher import publish +from rmq.consumer import consume + +from flask import Flask +from flask import render_template +from flask_sockets import Sockets + +from config import HOST +from config import PORT +from config import EXCHANGE +from config import EXCHANGE_TYPE +from config import QUEUE +from config import RETURN_QUEUE +from config import ROUTING_KEY + +app = Flask(__name__) +sockets = Sockets(app) +RMQ_queue = queue.Queue(maxsize=0) + +def message_handler(ch, method, properties, body): + msg = body.decode('utf-8') + print(f"incoming: {msg} from rabbitmq", flush=True) + RMQ_queue.put(msg) + ch.basic_ack(delivery_tag = method.delivery_tag) + +def relay_to_ws(ws): + while not ws.closed: + try: + msg = RMQ_queue.get(False) + if msg: + print(f"sending {msg} to user", flush=True) + ws.send(msg) + except queue.Empty: + time.sleep(0.1) + pass + +t1 = threading.Thread(target=consume, kwargs={'host':HOST, 'queue':RETURN_QUEUE, 'callback':message_handler}) +t1.daemon = True +t1.start() + [email protected]('/') +def index(): + return render_template('index.html') + [email protected]('/ws') +def websocket_route(ws): + t2 = threading.Thread(target=relay_to_ws, args=(ws, )) + t2.daemon = True + t2.start() + + try: + while not ws.closed: + + message = ws.receive() + + if not message: + continue + print(f"forwarding '{message}' to rabbitmq") + + publish(message, host=HOST, queue=QUEUE, routingkey=ROUTING_KEY, exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE) + + except: + print(traceback.format_exc()) + + finally: + if not ws.closed: + ws.close() + +if __name__ == '__main__': + from gevent import pywsgi + from geventwebsocket.handler import WebSocketHandler + server = pywsgi.WSGIServer(('0.0.0.0', 5000), app, handler_class=WebSocketHandler) + server.serve_forever() |