aboutsummaryrefslogtreecommitdiffstats
path: root/snekweb.py
diff options
context:
space:
mode:
Diffstat (limited to 'snekweb.py')
-rw-r--r--snekweb.py61
1 files changed, 25 insertions, 36 deletions
diff --git a/snekweb.py b/snekweb.py
index 1c6ea34..8929438 100644
--- a/snekweb.py
+++ b/snekweb.py
@@ -3,24 +3,27 @@ import queue
import threading
import time
import json
-
-from rmq.publisher import publish
-from rmq.consumer import consume
+import logging
+import geventwebsocket
from flask import Flask
from flask import render_template
from flask_sockets import Sockets
-from config import HOST
-from config import EXCHANGE
-from config import EXCHANGE_TYPE
-from config import QUEUE
-from config import ROUTING_KEY
+from rmq import Rmq
+
+# Load app
app = Flask(__name__)
app.jinja_env.auto_reload = True
sockets = Sockets(app)
+# Logging
+gunicorn_logger = logging.getLogger('gunicorn.error')
+app.logger.handlers = gunicorn_logger.handlers
+app.logger.setLevel(gunicorn_logger.level)
+log = app.logger
+
@app.route('/')
def index():
@@ -29,48 +32,34 @@ def index():
@sockets.route('/ws/<snekboxid>')
def websocket_route(ws, snekboxid):
- RMQ_queue = queue.Queue(maxsize=0)
+ localdata = threading.local()
+ localdata.thread_ws = ws
- def message_handler(ch, method, properties, body):
+ rmq = Rmq()
+
+ def message_handler(ch, method, properties, body, thread_ws):
msg = body.decode('utf-8')
- RMQ_queue.put(msg)
+ log.debug(f"message_handler: {msg}")
+ thread_ws.send(msg)
ch.basic_ack(delivery_tag=method.delivery_tag)
- consumer_parameters = {'host': HOST, 'queue': snekboxid, 'callback': message_handler}
- consumer = threading.Thread(target=consume, kwargs=consumer_parameters)
+ consumer_parameters = {'queue': snekboxid,
+ 'callback': message_handler,
+ 'thread_ws': localdata.thread_ws}
+ consumer = threading.Thread(target=rmq.consume, kwargs=consumer_parameters)
consumer.daemon = True
consumer.start()
- def relay_to_ws(ws):
- global client_list
- while True:
- try:
- msg = RMQ_queue.get(False)
- if msg:
- ws.send(msg)
- except queue.Empty:
- time.sleep(0.1)
- pass
-
- relay = threading.Thread(target=relay_to_ws, args=(ws,))
- relay.daemon = True
- relay.start()
-
try:
while not ws.closed:
message = ws.receive()
if message:
snek_msg = json.dumps({snekboxid: message})
- print(f"forwarding {snek_msg} to rabbitmq")
- publish(snek_msg,
- host=HOST,
- queue=QUEUE,
- routingkey=ROUTING_KEY,
- exchange=EXCHANGE,
- exchange_type=EXCHANGE_TYPE)
+ log.info(f"forwarding {snek_msg} to rabbitmq")
+ rmq.publish(snek_msg)
except Exception:
- print(traceback.format_exc())
+ log.info(traceback.format_exc())
finally:
if not ws.closed: