diff options
Diffstat (limited to 'snekweb.py')
| -rw-r--r-- | snekweb.py | 76 | 
1 files changed, 38 insertions, 38 deletions
| @@ -2,6 +2,7 @@ import traceback  import queue  import threading  import time +import json  from rmq.publisher import publish  from rmq.consumer import consume @@ -19,50 +20,49 @@ from config import RETURN_QUEUE  from config import ROUTING_KEY  app = Flask(__name__) +app.jinja_env.auto_reload = True  sockets = Sockets(app) -RMQ_queue = queue.Queue(maxsize=0) -def message_handler(ch, method, properties, body): -    msg = body.decode('utf-8') -    print(f"incoming: {msg} from rabbitmq", flush=True) -    RMQ_queue.put(msg) -    ch.basic_ack(delivery_tag = method.delivery_tag) - -def relay_to_ws(ws): -    while not ws.closed: -        try: -            msg = RMQ_queue.get(False) -            if msg: -                print(f"sending {msg} to user", flush=True) -                ws.send(msg) -        except queue.Empty: -            time.sleep(0.1) -            pass - -t1 = threading.Thread(target=consume, kwargs={'host':HOST, 'queue':RETURN_QUEUE, 'callback':message_handler}) -t1.daemon = True -t1.start()  @app.route('/')  def index():      return render_template('index.html') [email protected]('/ws') -def websocket_route(ws): -    t2 = threading.Thread(target=relay_to_ws, args=(ws, )) -    t2.daemon = True -    t2.start() [email protected]('/ws/<snekboxid>') +def websocket_route(ws, snekboxid): +    RMQ_queue = queue.Queue(maxsize=0) + +    def message_handler(ch, method, properties, body): +        msg = body.decode('utf-8') +        RMQ_queue.put(msg) +        ch.basic_ack(delivery_tag = method.delivery_tag) + +    consumer = threading.Thread(target=consume, kwargs={'host':HOST, 'queue':snekboxid, 'callback':message_handler}) +    consumer.daemon = True +    consumer.start() + +    def relay_to_ws(ws): +        global client_list +        while True: +            try: +                msg = RMQ_queue.get(False) +                if msg: +                    ws.send(msg) +            except queue.Empty: +                time.sleep(0.1) +                pass + +    relay = threading.Thread(target=relay_to_ws, args=(ws,)) +    relay.daemon = True +    relay.start()      try:          while not ws.closed: -              message = ws.receive() - -            if not message: -                continue -            print(f"forwarding '{message}' to rabbitmq") - -            publish(message, host=HOST, queue=QUEUE, routingkey=ROUTING_KEY, exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE) +            if message: +                snek_msg = json.dumps({snekboxid:message}) +                print(f"forwarding {snek_msg} to rabbitmq") +                publish(snek_msg, host=HOST, queue=QUEUE, routingkey=ROUTING_KEY, exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE)      except:          print(traceback.format_exc()) @@ -71,8 +71,8 @@ def websocket_route(ws):          if not ws.closed:              ws.close() -if __name__ == '__main__': -    from gevent import pywsgi -    from geventwebsocket.handler import WebSocketHandler -    server = pywsgi.WSGIServer(('0.0.0.0', 5000), app, handler_class=WebSocketHandler) -    server.serve_forever() +# if __name__ == '__main__': +#     from gevent import pywsgi +#     from geventwebsocket.handler import WebSocketHandler +#     server = pywsgi.WSGIServer(('0.0.0.0', 5000), app, handler_class=WebSocketHandler) +#     server.serve_forever() | 
