aboutsummaryrefslogtreecommitdiffstats
path: root/snekweb.py
diff options
context:
space:
mode:
authorGravatar Christopher Baklid <[email protected]>2018-05-24 21:45:25 +0200
committerGravatar Christopher Baklid <[email protected]>2018-05-24 21:45:25 +0200
commit10a205e97d399d1736fd9c806e1e309bb31ff28d (patch)
tree14ed25194db1f274e76a07fe0073db0e7d464a27 /snekweb.py
parentupdate readme (diff)
generate new queue based on session id to lock user to their own event chain
Diffstat (limited to 'snekweb.py')
-rw-r--r--snekweb.py76
1 files changed, 38 insertions, 38 deletions
diff --git a/snekweb.py b/snekweb.py
index 02aca20..ff9f8b4 100644
--- a/snekweb.py
+++ b/snekweb.py
@@ -2,6 +2,7 @@ import traceback
import queue
import threading
import time
+import json
from rmq.publisher import publish
from rmq.consumer import consume
@@ -19,50 +20,49 @@ from config import RETURN_QUEUE
from config import ROUTING_KEY
app = Flask(__name__)
+app.jinja_env.auto_reload = True
sockets = Sockets(app)
-RMQ_queue = queue.Queue(maxsize=0)
-def message_handler(ch, method, properties, body):
- msg = body.decode('utf-8')
- print(f"incoming: {msg} from rabbitmq", flush=True)
- RMQ_queue.put(msg)
- ch.basic_ack(delivery_tag = method.delivery_tag)
-
-def relay_to_ws(ws):
- while not ws.closed:
- try:
- msg = RMQ_queue.get(False)
- if msg:
- print(f"sending {msg} to user", flush=True)
- ws.send(msg)
- except queue.Empty:
- time.sleep(0.1)
- pass
-
-t1 = threading.Thread(target=consume, kwargs={'host':HOST, 'queue':RETURN_QUEUE, 'callback':message_handler})
-t1.daemon = True
-t1.start()
@app.route('/')
def index():
return render_template('index.html')
-def websocket_route(ws):
- t2 = threading.Thread(target=relay_to_ws, args=(ws, ))
- t2.daemon = True
- t2.start()
[email protected]('/ws/<snekboxid>')
+def websocket_route(ws, snekboxid):
+ RMQ_queue = queue.Queue(maxsize=0)
+
+ def message_handler(ch, method, properties, body):
+ msg = body.decode('utf-8')
+ RMQ_queue.put(msg)
+ ch.basic_ack(delivery_tag = method.delivery_tag)
+
+ consumer = threading.Thread(target=consume, kwargs={'host':HOST, 'queue':snekboxid, 'callback':message_handler})
+ consumer.daemon = True
+ consumer.start()
+
+ def relay_to_ws(ws):
+ global client_list
+ while True:
+ try:
+ msg = RMQ_queue.get(False)
+ if msg:
+ ws.send(msg)
+ except queue.Empty:
+ time.sleep(0.1)
+ pass
+
+ relay = threading.Thread(target=relay_to_ws, args=(ws,))
+ relay.daemon = True
+ relay.start()
try:
while not ws.closed:
-
message = ws.receive()
-
- if not message:
- continue
- print(f"forwarding '{message}' to rabbitmq")
-
- publish(message, host=HOST, queue=QUEUE, routingkey=ROUTING_KEY, exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE)
+ if message:
+ snek_msg = json.dumps({snekboxid:message})
+ print(f"forwarding {snek_msg} to rabbitmq")
+ publish(snek_msg, host=HOST, queue=QUEUE, routingkey=ROUTING_KEY, exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE)
except:
print(traceback.format_exc())
@@ -71,8 +71,8 @@ def websocket_route(ws):
if not ws.closed:
ws.close()
-if __name__ == '__main__':
- from gevent import pywsgi
- from geventwebsocket.handler import WebSocketHandler
- server = pywsgi.WSGIServer(('0.0.0.0', 5000), app, handler_class=WebSocketHandler)
- server.serve_forever()
+# if __name__ == '__main__':
+# from gevent import pywsgi
+# from geventwebsocket.handler import WebSocketHandler
+# server = pywsgi.WSGIServer(('0.0.0.0', 5000), app, handler_class=WebSocketHandler)
+# server.serve_forever()