diff options
| author | 2018-05-23 22:38:20 +0200 | |
|---|---|---|
| committer | 2018-05-23 22:38:20 +0200 | |
| commit | 309a6f93f878fc96951902fc47d45a30ef5f8d71 (patch) | |
| tree | f788b43a892a93d0f97da73f459a55b43e1ea1a0 /snekweb.py | |
| parent | update readme (diff) | |
POC completed
Diffstat (limited to 'snekweb.py')
| -rw-r--r-- | snekweb.py | 78 | 
1 files changed, 78 insertions, 0 deletions
| diff --git a/snekweb.py b/snekweb.py new file mode 100644 index 0000000..02aca20 --- /dev/null +++ b/snekweb.py @@ -0,0 +1,78 @@ +import traceback +import queue +import threading +import time + +from rmq.publisher import publish +from rmq.consumer import consume + +from flask import Flask +from flask import render_template +from flask_sockets import Sockets + +from config import HOST +from config import PORT +from config import EXCHANGE +from config import EXCHANGE_TYPE +from config import QUEUE +from config import RETURN_QUEUE +from config import ROUTING_KEY + +app = Flask(__name__) +sockets = Sockets(app) +RMQ_queue = queue.Queue(maxsize=0) + +def message_handler(ch, method, properties, body): +    msg = body.decode('utf-8') +    print(f"incoming: {msg} from rabbitmq", flush=True) +    RMQ_queue.put(msg) +    ch.basic_ack(delivery_tag = method.delivery_tag) + +def relay_to_ws(ws): +    while not ws.closed: +        try: +            msg = RMQ_queue.get(False) +            if msg: +                print(f"sending {msg} to user", flush=True) +                ws.send(msg) +        except queue.Empty: +            time.sleep(0.1) +            pass + +t1 = threading.Thread(target=consume, kwargs={'host':HOST, 'queue':RETURN_QUEUE, 'callback':message_handler}) +t1.daemon = True +t1.start() + [email protected]('/') +def index(): +    return render_template('index.html') + [email protected]('/ws') +def websocket_route(ws): +    t2 = threading.Thread(target=relay_to_ws, args=(ws, )) +    t2.daemon = True +    t2.start() + +    try: +        while not ws.closed: + +            message = ws.receive() + +            if not message: +                continue +            print(f"forwarding '{message}' to rabbitmq") + +            publish(message, host=HOST, queue=QUEUE, routingkey=ROUTING_KEY, exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE) + +    except: +        print(traceback.format_exc()) + +    finally: +        if not ws.closed: +            ws.close() + +if __name__ == '__main__': +    from gevent import pywsgi +    from geventwebsocket.handler import WebSocketHandler +    server = pywsgi.WSGIServer(('0.0.0.0', 5000), app, handler_class=WebSocketHandler) +    server.serve_forever() | 
