diff options
author | 2018-05-26 12:54:49 +0200 | |
---|---|---|
committer | 2018-05-26 12:54:49 +0200 | |
commit | ea2141dc6fd7284e6f9fa04ee638460286e3b09c (patch) | |
tree | 218d332be80bd2f4e177b0d9e4725dd27dec2034 /snekweb.py | |
parent | lint (diff) |
simplify threads and use local thread variables to manage user websocket connections
Diffstat (limited to 'snekweb.py')
-rw-r--r-- | snekweb.py | 61 |
1 files changed, 25 insertions, 36 deletions
@@ -3,24 +3,27 @@ import queue import threading import time import json - -from rmq.publisher import publish -from rmq.consumer import consume +import logging +import geventwebsocket from flask import Flask from flask import render_template from flask_sockets import Sockets -from config import HOST -from config import EXCHANGE -from config import EXCHANGE_TYPE -from config import QUEUE -from config import ROUTING_KEY +from rmq import Rmq + +# Load app app = Flask(__name__) app.jinja_env.auto_reload = True sockets = Sockets(app) +# Logging +gunicorn_logger = logging.getLogger('gunicorn.error') +app.logger.handlers = gunicorn_logger.handlers +app.logger.setLevel(gunicorn_logger.level) +log = app.logger + @app.route('/') def index(): @@ -29,48 +32,34 @@ def index(): @sockets.route('/ws/<snekboxid>') def websocket_route(ws, snekboxid): - RMQ_queue = queue.Queue(maxsize=0) + localdata = threading.local() + localdata.thread_ws = ws - def message_handler(ch, method, properties, body): + rmq = Rmq() + + def message_handler(ch, method, properties, body, thread_ws): msg = body.decode('utf-8') - RMQ_queue.put(msg) + log.debug(f"message_handler: {msg}") + thread_ws.send(msg) ch.basic_ack(delivery_tag=method.delivery_tag) - consumer_parameters = {'host': HOST, 'queue': snekboxid, 'callback': message_handler} - consumer = threading.Thread(target=consume, kwargs=consumer_parameters) + consumer_parameters = {'queue': snekboxid, + 'callback': message_handler, + 'thread_ws': localdata.thread_ws} + consumer = threading.Thread(target=rmq.consume, kwargs=consumer_parameters) consumer.daemon = True consumer.start() - def relay_to_ws(ws): - global client_list - while True: - try: - msg = RMQ_queue.get(False) - if msg: - ws.send(msg) - except queue.Empty: - time.sleep(0.1) - pass - - relay = threading.Thread(target=relay_to_ws, args=(ws,)) - relay.daemon = True - relay.start() - try: while not ws.closed: message = ws.receive() if message: snek_msg = json.dumps({snekboxid: message}) - print(f"forwarding {snek_msg} to rabbitmq") - publish(snek_msg, - host=HOST, - queue=QUEUE, - routingkey=ROUTING_KEY, - exchange=EXCHANGE, - exchange_type=EXCHANGE_TYPE) + log.info(f"forwarding {snek_msg} to rabbitmq") + rmq.publish(snek_msg) except Exception: - print(traceback.format_exc()) + log.info(traceback.format_exc()) finally: if not ws.closed: |