From ea2141dc6fd7284e6f9fa04ee638460286e3b09c Mon Sep 17 00:00:00 2001 From: Christopher Baklid Date: Sat, 26 May 2018 12:54:49 +0200 Subject: simplify threads and use local thread variables to manage user websocket connections --- Pipfile | 2 +- logs.py | 10 ++++++ rmq.py | 94 ++++++++++++++++++++++++++++++++++++++++++++++++++++ rmq/__init__.py | 0 rmq/consumer.py | 42 ----------------------- rmq/publisher.py | 35 ------------------- snekbox.py | 28 +++++++--------- snekweb.py | 61 ++++++++++++++-------------------- templates/index.html | 6 ++-- 9 files changed, 146 insertions(+), 132 deletions(-) create mode 100644 logs.py create mode 100644 rmq.py delete mode 100644 rmq/__init__.py delete mode 100644 rmq/consumer.py delete mode 100644 rmq/publisher.py diff --git a/Pipfile b/Pipfile index 5417ae3..c576744 100644 --- a/Pipfile +++ b/Pipfile @@ -21,7 +21,7 @@ python_version = "3.6" [scripts] lint = "flake8" snekbox = "python snekbox.py" -snekweb = "gunicorn -w 1 -b 0.0.0.0:5000 -k geventwebsocket.gunicorn.workers.GeventWebSocketWorker snekweb:app" +snekweb = "gunicorn -w 2 -b 0.0.0.0:5000 --log-level debug -k geventwebsocket.gunicorn.workers.GeventWebSocketWorker snekweb:app" buildbox = "docker build -t pythondiscord/snekbox:latest -f docker/Dockerfile ." buildweb = "docker build -t pythondiscord/snekboxweb:latest -f docker/Dockerfile.webapp ." pushbox = "docker push pythondiscord/snekbox:latest" diff --git a/logs.py b/logs.py new file mode 100644 index 0000000..fc6070e --- /dev/null +++ b/logs.py @@ -0,0 +1,10 @@ +import logging +import sys + +logformat = logging.Formatter(fmt='[%(asctime)s] [%(process)s] [%(levelname)s] %(message)s', + datefmt='%Y-%m-%d %H:%M:%S %z') +log = logging.getLogger(__name__) +log.setLevel(logging.DEBUG) +console = logging.StreamHandler(sys.stdout) +console.setFormatter(logformat) +log.addHandler(console) diff --git a/rmq.py b/rmq.py new file mode 100644 index 0000000..c8547d0 --- /dev/null +++ b/rmq.py @@ -0,0 +1,94 @@ +import pika +import time +import traceback +import logging +import sys + +from pika.exceptions import ConnectionClosed + +from config import USERNAME +from config import PASSWORD +from config import HOST +from config import PORT +from config import EXCHANGE_TYPE +from config import QUEUE +from config import ROUTING_KEY +from config import EXCHANGE + +from logs import log + + +class Rmq(object): + + def __init__(self, + username=USERNAME, + password=PASSWORD, + host=HOST, + port=PORT, + exchange_type=EXCHANGE_TYPE): + + self.username = USERNAME + self.password = PASSWORD + self.host = HOST + self.port = PORT + self.exchange_type = EXCHANGE_TYPE + self.credentials = pika.PlainCredentials(self.username, self.password) + self.con_params = pika.ConnectionParameters(self.host, self.port, '/', self.credentials) + self.properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1) + + def consume(self, queue=QUEUE, callback=None, thread_ws=None): + + while True: + try: + connection = pika.BlockingConnection(self.con_params) + + try: + channel = connection.channel() + channel.queue_declare(queue=queue, durable=False) + channel.basic_qos(prefetch_count=1) + channel.basic_consume( + lambda ch, method, properties, body: callback(ch, method, properties, body, thread_ws=thread_ws), + queue=queue) + + log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}") + + channel.start_consuming() + + except Exception: + exc = traceback.format_exc() + log.info(exc) + + finally: + connection.close() + + except ConnectionClosed: + log.info(f"Connection lost, reconnecting to {self.host}") + pass + + time.sleep(2) + + def publish(self, + message, + queue=QUEUE, + routingkey=ROUTING_KEY, + exchange=EXCHANGE): + + connection = pika.BlockingConnection(pika.ConnectionParameters(self.host, self.port, '/', self.credentials)) + channel = connection.channel() + channel.queue_declare(queue=queue, durable=False) + channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type) + channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey) + + result = channel.basic_publish( + exchange=exchange, + routing_key=routingkey, + body=message, + properties=self.properties + ) + + if result: + log.info(f"Connecting to host: {self.host} port: {self.port} exchange: {exchange} queue: {queue}") + else: + log.info("not delivered") + + connection.close() diff --git a/rmq/__init__.py b/rmq/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/rmq/consumer.py b/rmq/consumer.py deleted file mode 100644 index 2c41e26..0000000 --- a/rmq/consumer.py +++ /dev/null @@ -1,42 +0,0 @@ -import time -import traceback -import pika -from pika.exceptions import ConnectionClosed - - -def consume(username='guest', - password='guest', - host='localhost', - port=5672, - queue='', - callback=None): - - while True: - credentials = pika.PlainCredentials(username, password) - con_params = pika.ConnectionParameters(host, port, '/', credentials) - - try: - connection = pika.BlockingConnection(con_params) - - try: - channel = connection.channel() - channel.queue_declare(queue=queue, durable=False) - channel.basic_qos(prefetch_count=1) - channel.basic_consume(callback, queue=queue) - - print(f"""Connected to host: {host} port: {port} queue: {queue}""", flush=True) - - channel.start_consuming() - - except Exception: - exc = traceback.format_exc() - print(exc, flush=True) - - finally: - connection.close() - - except ConnectionClosed: - print(f"Connection lost, reconnecting to {host}", flush=True) - pass - - time.sleep(2) diff --git a/rmq/publisher.py b/rmq/publisher.py deleted file mode 100644 index 1a9a5cf..0000000 --- a/rmq/publisher.py +++ /dev/null @@ -1,35 +0,0 @@ -import pika - - -def publish(message, - username='guest', - password='guest', - host='localhost', - port=5672, - queue='', - routingkey='', - exchange='', - exchange_type=''): - - credentials = pika.PlainCredentials(username, password) - connection = pika.BlockingConnection(pika.ConnectionParameters(host, port, '/', credentials)) - properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1) - - channel = connection.channel() - channel.queue_declare(queue=queue, durable=False) - channel.exchange_declare(exchange=exchange, exchange_type=exchange_type) - channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey) - - result = channel.basic_publish( - exchange=exchange, - routing_key=routingkey, - body=message, - properties=properties - ) - - if result: - print(f"Connecting to host: {host} port: {port} exchange: {exchange} queue: {queue}", flush=True) - else: - print("not delivered") - - connection.close() diff --git a/snekbox.py b/snekbox.py index ee82be8..d6795e8 100644 --- a/snekbox.py +++ b/snekbox.py @@ -1,13 +1,12 @@ import sys import io import json +import logging -from rmq.consumer import consume -from rmq.publisher import publish +from logs import log +from rmq import Rmq -from config import HOST -from config import EXCHANGE_TYPE -from config import QUEUE +rmq = Rmq() def execute(snippet): @@ -26,24 +25,21 @@ def execute(snippet): return redirected_output.getvalue().strip() -def message_handler(ch, method, properties, body): +def message_handler(ch, method, properties, body, thread_ws=None): msg = body.decode('utf-8') - print(f"incoming: {msg}", flush=True) + log.info(f"incoming: {msg}") snek_msg = json.loads(msg) for snekid, snekcode in snek_msg.items(): result = execute(snekcode) - print(f"outgoing: {result}", flush=True) - publish(result, - host=HOST, - queue=snekid, - routingkey=snekid, - exchange=snekid, - exchange_type=EXCHANGE_TYPE) - + log.info(f"outgoing: {result}") + rmq.publish(result, + queue=snekid, + routingkey=snekid, + exchange=snekid) ch.basic_ack(delivery_tag=method.delivery_tag) if __name__ == '__main__': - consume(host=HOST, queue=QUEUE, callback=message_handler) + rmq.consume(callback=message_handler) diff --git a/snekweb.py b/snekweb.py index 1c6ea34..8929438 100644 --- a/snekweb.py +++ b/snekweb.py @@ -3,24 +3,27 @@ import queue import threading import time import json - -from rmq.publisher import publish -from rmq.consumer import consume +import logging +import geventwebsocket from flask import Flask from flask import render_template from flask_sockets import Sockets -from config import HOST -from config import EXCHANGE -from config import EXCHANGE_TYPE -from config import QUEUE -from config import ROUTING_KEY +from rmq import Rmq + +# Load app app = Flask(__name__) app.jinja_env.auto_reload = True sockets = Sockets(app) +# Logging +gunicorn_logger = logging.getLogger('gunicorn.error') +app.logger.handlers = gunicorn_logger.handlers +app.logger.setLevel(gunicorn_logger.level) +log = app.logger + @app.route('/') def index(): @@ -29,48 +32,34 @@ def index(): @sockets.route('/ws/') def websocket_route(ws, snekboxid): - RMQ_queue = queue.Queue(maxsize=0) + localdata = threading.local() + localdata.thread_ws = ws - def message_handler(ch, method, properties, body): + rmq = Rmq() + + def message_handler(ch, method, properties, body, thread_ws): msg = body.decode('utf-8') - RMQ_queue.put(msg) + log.debug(f"message_handler: {msg}") + thread_ws.send(msg) ch.basic_ack(delivery_tag=method.delivery_tag) - consumer_parameters = {'host': HOST, 'queue': snekboxid, 'callback': message_handler} - consumer = threading.Thread(target=consume, kwargs=consumer_parameters) + consumer_parameters = {'queue': snekboxid, + 'callback': message_handler, + 'thread_ws': localdata.thread_ws} + consumer = threading.Thread(target=rmq.consume, kwargs=consumer_parameters) consumer.daemon = True consumer.start() - def relay_to_ws(ws): - global client_list - while True: - try: - msg = RMQ_queue.get(False) - if msg: - ws.send(msg) - except queue.Empty: - time.sleep(0.1) - pass - - relay = threading.Thread(target=relay_to_ws, args=(ws,)) - relay.daemon = True - relay.start() - try: while not ws.closed: message = ws.receive() if message: snek_msg = json.dumps({snekboxid: message}) - print(f"forwarding {snek_msg} to rabbitmq") - publish(snek_msg, - host=HOST, - queue=QUEUE, - routingkey=ROUTING_KEY, - exchange=EXCHANGE, - exchange_type=EXCHANGE_TYPE) + log.info(f"forwarding {snek_msg} to rabbitmq") + rmq.publish(snek_msg) except Exception: - print(traceback.format_exc()) + log.info(traceback.format_exc()) finally: if not ws.closed: diff --git a/templates/index.html b/templates/index.html index ef53d74..9fd6350 100644 --- a/templates/index.html +++ b/templates/index.html @@ -91,8 +91,10 @@ function generate_id(){ return Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15); } - - +window.onbeforeunload = function() { + sendMessage("disconnect") + websocket.close() +}; window.addEventListener("load", init, false); -- cgit v1.2.3