aboutsummaryrefslogtreecommitdiffstats
path: root/snekweb.py
diff options
context:
space:
mode:
Diffstat (limited to 'snekweb.py')
-rw-r--r--snekweb.py78
1 files changed, 78 insertions, 0 deletions
diff --git a/snekweb.py b/snekweb.py
new file mode 100644
index 0000000..02aca20
--- /dev/null
+++ b/snekweb.py
@@ -0,0 +1,78 @@
+import traceback
+import queue
+import threading
+import time
+
+from rmq.publisher import publish
+from rmq.consumer import consume
+
+from flask import Flask
+from flask import render_template
+from flask_sockets import Sockets
+
+from config import HOST
+from config import PORT
+from config import EXCHANGE
+from config import EXCHANGE_TYPE
+from config import QUEUE
+from config import RETURN_QUEUE
+from config import ROUTING_KEY
+
+app = Flask(__name__)
+sockets = Sockets(app)
+RMQ_queue = queue.Queue(maxsize=0)
+
+def message_handler(ch, method, properties, body):
+ msg = body.decode('utf-8')
+ print(f"incoming: {msg} from rabbitmq", flush=True)
+ RMQ_queue.put(msg)
+ ch.basic_ack(delivery_tag = method.delivery_tag)
+
+def relay_to_ws(ws):
+ while not ws.closed:
+ try:
+ msg = RMQ_queue.get(False)
+ if msg:
+ print(f"sending {msg} to user", flush=True)
+ ws.send(msg)
+ except queue.Empty:
+ time.sleep(0.1)
+ pass
+
+t1 = threading.Thread(target=consume, kwargs={'host':HOST, 'queue':RETURN_QUEUE, 'callback':message_handler})
+t1.daemon = True
+t1.start()
+
+def index():
+ return render_template('index.html')
+
+def websocket_route(ws):
+ t2 = threading.Thread(target=relay_to_ws, args=(ws, ))
+ t2.daemon = True
+ t2.start()
+
+ try:
+ while not ws.closed:
+
+ message = ws.receive()
+
+ if not message:
+ continue
+ print(f"forwarding '{message}' to rabbitmq")
+
+ publish(message, host=HOST, queue=QUEUE, routingkey=ROUTING_KEY, exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE)
+
+ except:
+ print(traceback.format_exc())
+
+ finally:
+ if not ws.closed:
+ ws.close()
+
+if __name__ == '__main__':
+ from gevent import pywsgi
+ from geventwebsocket.handler import WebSocketHandler
+ server = pywsgi.WSGIServer(('0.0.0.0', 5000), app, handler_class=WebSocketHandler)
+ server.serve_forever()