diff options
| author | 2018-05-26 12:54:49 +0200 | |
|---|---|---|
| committer | 2018-05-26 12:54:49 +0200 | |
| commit | ea2141dc6fd7284e6f9fa04ee638460286e3b09c (patch) | |
| tree | 218d332be80bd2f4e177b0d9e4725dd27dec2034 /snekweb.py | |
| parent | lint (diff) | |
simplify threads and use local thread variables to manage user websocket connections
Diffstat (limited to 'snekweb.py')
| -rw-r--r-- | snekweb.py | 61 | 
1 files changed, 25 insertions, 36 deletions
| @@ -3,24 +3,27 @@ import queue  import threading  import time  import json - -from rmq.publisher import publish -from rmq.consumer import consume +import logging +import geventwebsocket  from flask import Flask  from flask import render_template  from flask_sockets import Sockets -from config import HOST -from config import EXCHANGE -from config import EXCHANGE_TYPE -from config import QUEUE -from config import ROUTING_KEY +from rmq import Rmq + +# Load app  app = Flask(__name__)  app.jinja_env.auto_reload = True  sockets = Sockets(app) +# Logging +gunicorn_logger = logging.getLogger('gunicorn.error') +app.logger.handlers = gunicorn_logger.handlers +app.logger.setLevel(gunicorn_logger.level) +log = app.logger +  @app.route('/')  def index(): @@ -29,48 +32,34 @@ def index():  @sockets.route('/ws/<snekboxid>')  def websocket_route(ws, snekboxid): -    RMQ_queue = queue.Queue(maxsize=0) +    localdata = threading.local() +    localdata.thread_ws = ws -    def message_handler(ch, method, properties, body): +    rmq = Rmq() + +    def message_handler(ch, method, properties, body, thread_ws):          msg = body.decode('utf-8') -        RMQ_queue.put(msg) +        log.debug(f"message_handler: {msg}") +        thread_ws.send(msg)          ch.basic_ack(delivery_tag=method.delivery_tag) -    consumer_parameters = {'host': HOST, 'queue': snekboxid, 'callback': message_handler} -    consumer = threading.Thread(target=consume, kwargs=consumer_parameters) +    consumer_parameters = {'queue': snekboxid, +                           'callback': message_handler, +                           'thread_ws': localdata.thread_ws} +    consumer = threading.Thread(target=rmq.consume, kwargs=consumer_parameters)      consumer.daemon = True      consumer.start() -    def relay_to_ws(ws): -        global client_list -        while True: -            try: -                msg = RMQ_queue.get(False) -                if msg: -                    ws.send(msg) -            except queue.Empty: -                time.sleep(0.1) -                pass - -    relay = threading.Thread(target=relay_to_ws, args=(ws,)) -    relay.daemon = True -    relay.start() -      try:          while not ws.closed:              message = ws.receive()              if message:                  snek_msg = json.dumps({snekboxid: message}) -                print(f"forwarding {snek_msg} to rabbitmq") -                publish(snek_msg, -                        host=HOST, -                        queue=QUEUE, -                        routingkey=ROUTING_KEY, -                        exchange=EXCHANGE, -                        exchange_type=EXCHANGE_TYPE) +                log.info(f"forwarding {snek_msg} to rabbitmq") +                rmq.publish(snek_msg)      except Exception: -        print(traceback.format_exc()) +        log.info(traceback.format_exc())      finally:          if not ws.closed: | 
