diff options
-rw-r--r-- | Pipfile | 2 | ||||
-rw-r--r-- | logs.py | 10 | ||||
-rw-r--r-- | rmq.py | 94 | ||||
-rw-r--r-- | rmq/__init__.py | 0 | ||||
-rw-r--r-- | rmq/consumer.py | 42 | ||||
-rw-r--r-- | rmq/publisher.py | 35 | ||||
-rw-r--r-- | snekbox.py | 28 | ||||
-rw-r--r-- | snekweb.py | 61 | ||||
-rw-r--r-- | templates/index.html | 6 |
9 files changed, 146 insertions, 132 deletions
@@ -21,7 +21,7 @@ python_version = "3.6" [scripts] lint = "flake8" snekbox = "python snekbox.py" -snekweb = "gunicorn -w 1 -b 0.0.0.0:5000 -k geventwebsocket.gunicorn.workers.GeventWebSocketWorker snekweb:app" +snekweb = "gunicorn -w 2 -b 0.0.0.0:5000 --log-level debug -k geventwebsocket.gunicorn.workers.GeventWebSocketWorker snekweb:app" buildbox = "docker build -t pythondiscord/snekbox:latest -f docker/Dockerfile ." buildweb = "docker build -t pythondiscord/snekboxweb:latest -f docker/Dockerfile.webapp ." pushbox = "docker push pythondiscord/snekbox:latest" @@ -0,0 +1,10 @@ +import logging +import sys + +logformat = logging.Formatter(fmt='[%(asctime)s] [%(process)s] [%(levelname)s] %(message)s', + datefmt='%Y-%m-%d %H:%M:%S %z') +log = logging.getLogger(__name__) +log.setLevel(logging.DEBUG) +console = logging.StreamHandler(sys.stdout) +console.setFormatter(logformat) +log.addHandler(console) @@ -0,0 +1,94 @@ +import pika +import time +import traceback +import logging +import sys + +from pika.exceptions import ConnectionClosed + +from config import USERNAME +from config import PASSWORD +from config import HOST +from config import PORT +from config import EXCHANGE_TYPE +from config import QUEUE +from config import ROUTING_KEY +from config import EXCHANGE + +from logs import log + + +class Rmq(object): + + def __init__(self, + username=USERNAME, + password=PASSWORD, + host=HOST, + port=PORT, + exchange_type=EXCHANGE_TYPE): + + self.username = USERNAME + self.password = PASSWORD + self.host = HOST + self.port = PORT + self.exchange_type = EXCHANGE_TYPE + self.credentials = pika.PlainCredentials(self.username, self.password) + self.con_params = pika.ConnectionParameters(self.host, self.port, '/', self.credentials) + self.properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1) + + def consume(self, queue=QUEUE, callback=None, thread_ws=None): + + while True: + try: + connection = pika.BlockingConnection(self.con_params) + + try: + channel = connection.channel() + channel.queue_declare(queue=queue, durable=False) + channel.basic_qos(prefetch_count=1) + channel.basic_consume( + lambda ch, method, properties, body: callback(ch, method, properties, body, thread_ws=thread_ws), + queue=queue) + + log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}") + + channel.start_consuming() + + except Exception: + exc = traceback.format_exc() + log.info(exc) + + finally: + connection.close() + + except ConnectionClosed: + log.info(f"Connection lost, reconnecting to {self.host}") + pass + + time.sleep(2) + + def publish(self, + message, + queue=QUEUE, + routingkey=ROUTING_KEY, + exchange=EXCHANGE): + + connection = pika.BlockingConnection(pika.ConnectionParameters(self.host, self.port, '/', self.credentials)) + channel = connection.channel() + channel.queue_declare(queue=queue, durable=False) + channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type) + channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey) + + result = channel.basic_publish( + exchange=exchange, + routing_key=routingkey, + body=message, + properties=self.properties + ) + + if result: + log.info(f"Connecting to host: {self.host} port: {self.port} exchange: {exchange} queue: {queue}") + else: + log.info("not delivered") + + connection.close() diff --git a/rmq/__init__.py b/rmq/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/rmq/__init__.py +++ /dev/null diff --git a/rmq/consumer.py b/rmq/consumer.py deleted file mode 100644 index 2c41e26..0000000 --- a/rmq/consumer.py +++ /dev/null @@ -1,42 +0,0 @@ -import time -import traceback -import pika -from pika.exceptions import ConnectionClosed - - -def consume(username='guest', - password='guest', - host='localhost', - port=5672, - queue='', - callback=None): - - while True: - credentials = pika.PlainCredentials(username, password) - con_params = pika.ConnectionParameters(host, port, '/', credentials) - - try: - connection = pika.BlockingConnection(con_params) - - try: - channel = connection.channel() - channel.queue_declare(queue=queue, durable=False) - channel.basic_qos(prefetch_count=1) - channel.basic_consume(callback, queue=queue) - - print(f"""Connected to host: {host} port: {port} queue: {queue}""", flush=True) - - channel.start_consuming() - - except Exception: - exc = traceback.format_exc() - print(exc, flush=True) - - finally: - connection.close() - - except ConnectionClosed: - print(f"Connection lost, reconnecting to {host}", flush=True) - pass - - time.sleep(2) diff --git a/rmq/publisher.py b/rmq/publisher.py deleted file mode 100644 index 1a9a5cf..0000000 --- a/rmq/publisher.py +++ /dev/null @@ -1,35 +0,0 @@ -import pika - - -def publish(message, - username='guest', - password='guest', - host='localhost', - port=5672, - queue='', - routingkey='', - exchange='', - exchange_type=''): - - credentials = pika.PlainCredentials(username, password) - connection = pika.BlockingConnection(pika.ConnectionParameters(host, port, '/', credentials)) - properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1) - - channel = connection.channel() - channel.queue_declare(queue=queue, durable=False) - channel.exchange_declare(exchange=exchange, exchange_type=exchange_type) - channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey) - - result = channel.basic_publish( - exchange=exchange, - routing_key=routingkey, - body=message, - properties=properties - ) - - if result: - print(f"Connecting to host: {host} port: {port} exchange: {exchange} queue: {queue}", flush=True) - else: - print("not delivered") - - connection.close() @@ -1,13 +1,12 @@ import sys import io import json +import logging -from rmq.consumer import consume -from rmq.publisher import publish +from logs import log +from rmq import Rmq -from config import HOST -from config import EXCHANGE_TYPE -from config import QUEUE +rmq = Rmq() def execute(snippet): @@ -26,24 +25,21 @@ def execute(snippet): return redirected_output.getvalue().strip() -def message_handler(ch, method, properties, body): +def message_handler(ch, method, properties, body, thread_ws=None): msg = body.decode('utf-8') - print(f"incoming: {msg}", flush=True) + log.info(f"incoming: {msg}") snek_msg = json.loads(msg) for snekid, snekcode in snek_msg.items(): result = execute(snekcode) - print(f"outgoing: {result}", flush=True) - publish(result, - host=HOST, - queue=snekid, - routingkey=snekid, - exchange=snekid, - exchange_type=EXCHANGE_TYPE) - + log.info(f"outgoing: {result}") + rmq.publish(result, + queue=snekid, + routingkey=snekid, + exchange=snekid) ch.basic_ack(delivery_tag=method.delivery_tag) if __name__ == '__main__': - consume(host=HOST, queue=QUEUE, callback=message_handler) + rmq.consume(callback=message_handler) @@ -3,24 +3,27 @@ import queue import threading import time import json - -from rmq.publisher import publish -from rmq.consumer import consume +import logging +import geventwebsocket from flask import Flask from flask import render_template from flask_sockets import Sockets -from config import HOST -from config import EXCHANGE -from config import EXCHANGE_TYPE -from config import QUEUE -from config import ROUTING_KEY +from rmq import Rmq + +# Load app app = Flask(__name__) app.jinja_env.auto_reload = True sockets = Sockets(app) +# Logging +gunicorn_logger = logging.getLogger('gunicorn.error') +app.logger.handlers = gunicorn_logger.handlers +app.logger.setLevel(gunicorn_logger.level) +log = app.logger + @app.route('/') def index(): @@ -29,48 +32,34 @@ def index(): @sockets.route('/ws/<snekboxid>') def websocket_route(ws, snekboxid): - RMQ_queue = queue.Queue(maxsize=0) + localdata = threading.local() + localdata.thread_ws = ws - def message_handler(ch, method, properties, body): + rmq = Rmq() + + def message_handler(ch, method, properties, body, thread_ws): msg = body.decode('utf-8') - RMQ_queue.put(msg) + log.debug(f"message_handler: {msg}") + thread_ws.send(msg) ch.basic_ack(delivery_tag=method.delivery_tag) - consumer_parameters = {'host': HOST, 'queue': snekboxid, 'callback': message_handler} - consumer = threading.Thread(target=consume, kwargs=consumer_parameters) + consumer_parameters = {'queue': snekboxid, + 'callback': message_handler, + 'thread_ws': localdata.thread_ws} + consumer = threading.Thread(target=rmq.consume, kwargs=consumer_parameters) consumer.daemon = True consumer.start() - def relay_to_ws(ws): - global client_list - while True: - try: - msg = RMQ_queue.get(False) - if msg: - ws.send(msg) - except queue.Empty: - time.sleep(0.1) - pass - - relay = threading.Thread(target=relay_to_ws, args=(ws,)) - relay.daemon = True - relay.start() - try: while not ws.closed: message = ws.receive() if message: snek_msg = json.dumps({snekboxid: message}) - print(f"forwarding {snek_msg} to rabbitmq") - publish(snek_msg, - host=HOST, - queue=QUEUE, - routingkey=ROUTING_KEY, - exchange=EXCHANGE, - exchange_type=EXCHANGE_TYPE) + log.info(f"forwarding {snek_msg} to rabbitmq") + rmq.publish(snek_msg) except Exception: - print(traceback.format_exc()) + log.info(traceback.format_exc()) finally: if not ws.closed: diff --git a/templates/index.html b/templates/index.html index ef53d74..9fd6350 100644 --- a/templates/index.html +++ b/templates/index.html @@ -91,8 +91,10 @@ function generate_id(){ return Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15); } - - +window.onbeforeunload = function() { + sendMessage("disconnect") + websocket.close() +}; window.addEventListener("load", init, false); |