1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
import traceback
import queue
import threading
import time
from rmq.publisher import publish
from rmq.consumer import consume
from flask import Flask
from flask import render_template
from flask_sockets import Sockets
from config import HOST
from config import PORT
from config import EXCHANGE
from config import EXCHANGE_TYPE
from config import QUEUE
from config import RETURN_QUEUE
from config import ROUTING_KEY
app = Flask(__name__)
sockets = Sockets(app)
RMQ_queue = queue.Queue(maxsize=0)
def message_handler(ch, method, properties, body):
msg = body.decode('utf-8')
print(f"incoming: {msg} from rabbitmq", flush=True)
RMQ_queue.put(msg)
ch.basic_ack(delivery_tag = method.delivery_tag)
def relay_to_ws(ws):
while not ws.closed:
try:
msg = RMQ_queue.get(False)
if msg:
print(f"sending {msg} to user", flush=True)
ws.send(msg)
except queue.Empty:
time.sleep(0.1)
pass
t1 = threading.Thread(target=consume, kwargs={'host':HOST, 'queue':RETURN_QUEUE, 'callback':message_handler})
t1.daemon = True
t1.start()
@app.route('/')
def index():
return render_template('index.html')
@sockets.route('/ws')
def websocket_route(ws):
t2 = threading.Thread(target=relay_to_ws, args=(ws, ))
t2.daemon = True
t2.start()
try:
while not ws.closed:
message = ws.receive()
if not message:
continue
print(f"forwarding '{message}' to rabbitmq")
publish(message, host=HOST, queue=QUEUE, routingkey=ROUTING_KEY, exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE)
except:
print(traceback.format_exc())
finally:
if not ws.closed:
ws.close()
if __name__ == '__main__':
from gevent import pywsgi
from geventwebsocket.handler import WebSocketHandler
server = pywsgi.WSGIServer(('0.0.0.0', 5000), app, handler_class=WebSocketHandler)
server.serve_forever()
|