aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar Christopher Baklid <[email protected]>2018-05-26 12:54:49 +0200
committerGravatar Christopher Baklid <[email protected]>2018-05-26 12:54:49 +0200
commitea2141dc6fd7284e6f9fa04ee638460286e3b09c (patch)
tree218d332be80bd2f4e177b0d9e4725dd27dec2034
parentlint (diff)
simplify threads and use local thread variables to manage user websocket connections
-rw-r--r--Pipfile2
-rw-r--r--logs.py10
-rw-r--r--rmq.py94
-rw-r--r--rmq/__init__.py0
-rw-r--r--rmq/consumer.py42
-rw-r--r--rmq/publisher.py35
-rw-r--r--snekbox.py28
-rw-r--r--snekweb.py61
-rw-r--r--templates/index.html6
9 files changed, 146 insertions, 132 deletions
diff --git a/Pipfile b/Pipfile
index 5417ae3..c576744 100644
--- a/Pipfile
+++ b/Pipfile
@@ -21,7 +21,7 @@ python_version = "3.6"
[scripts]
lint = "flake8"
snekbox = "python snekbox.py"
-snekweb = "gunicorn -w 1 -b 0.0.0.0:5000 -k geventwebsocket.gunicorn.workers.GeventWebSocketWorker snekweb:app"
+snekweb = "gunicorn -w 2 -b 0.0.0.0:5000 --log-level debug -k geventwebsocket.gunicorn.workers.GeventWebSocketWorker snekweb:app"
buildbox = "docker build -t pythondiscord/snekbox:latest -f docker/Dockerfile ."
buildweb = "docker build -t pythondiscord/snekboxweb:latest -f docker/Dockerfile.webapp ."
pushbox = "docker push pythondiscord/snekbox:latest"
diff --git a/logs.py b/logs.py
new file mode 100644
index 0000000..fc6070e
--- /dev/null
+++ b/logs.py
@@ -0,0 +1,10 @@
+import logging
+import sys
+
+logformat = logging.Formatter(fmt='[%(asctime)s] [%(process)s] [%(levelname)s] %(message)s',
+ datefmt='%Y-%m-%d %H:%M:%S %z')
+log = logging.getLogger(__name__)
+log.setLevel(logging.DEBUG)
+console = logging.StreamHandler(sys.stdout)
+console.setFormatter(logformat)
+log.addHandler(console)
diff --git a/rmq.py b/rmq.py
new file mode 100644
index 0000000..c8547d0
--- /dev/null
+++ b/rmq.py
@@ -0,0 +1,94 @@
+import pika
+import time
+import traceback
+import logging
+import sys
+
+from pika.exceptions import ConnectionClosed
+
+from config import USERNAME
+from config import PASSWORD
+from config import HOST
+from config import PORT
+from config import EXCHANGE_TYPE
+from config import QUEUE
+from config import ROUTING_KEY
+from config import EXCHANGE
+
+from logs import log
+
+
+class Rmq(object):
+
+ def __init__(self,
+ username=USERNAME,
+ password=PASSWORD,
+ host=HOST,
+ port=PORT,
+ exchange_type=EXCHANGE_TYPE):
+
+ self.username = USERNAME
+ self.password = PASSWORD
+ self.host = HOST
+ self.port = PORT
+ self.exchange_type = EXCHANGE_TYPE
+ self.credentials = pika.PlainCredentials(self.username, self.password)
+ self.con_params = pika.ConnectionParameters(self.host, self.port, '/', self.credentials)
+ self.properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1)
+
+ def consume(self, queue=QUEUE, callback=None, thread_ws=None):
+
+ while True:
+ try:
+ connection = pika.BlockingConnection(self.con_params)
+
+ try:
+ channel = connection.channel()
+ channel.queue_declare(queue=queue, durable=False)
+ channel.basic_qos(prefetch_count=1)
+ channel.basic_consume(
+ lambda ch, method, properties, body: callback(ch, method, properties, body, thread_ws=thread_ws),
+ queue=queue)
+
+ log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}")
+
+ channel.start_consuming()
+
+ except Exception:
+ exc = traceback.format_exc()
+ log.info(exc)
+
+ finally:
+ connection.close()
+
+ except ConnectionClosed:
+ log.info(f"Connection lost, reconnecting to {self.host}")
+ pass
+
+ time.sleep(2)
+
+ def publish(self,
+ message,
+ queue=QUEUE,
+ routingkey=ROUTING_KEY,
+ exchange=EXCHANGE):
+
+ connection = pika.BlockingConnection(pika.ConnectionParameters(self.host, self.port, '/', self.credentials))
+ channel = connection.channel()
+ channel.queue_declare(queue=queue, durable=False)
+ channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type)
+ channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey)
+
+ result = channel.basic_publish(
+ exchange=exchange,
+ routing_key=routingkey,
+ body=message,
+ properties=self.properties
+ )
+
+ if result:
+ log.info(f"Connecting to host: {self.host} port: {self.port} exchange: {exchange} queue: {queue}")
+ else:
+ log.info("not delivered")
+
+ connection.close()
diff --git a/rmq/__init__.py b/rmq/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/rmq/__init__.py
+++ /dev/null
diff --git a/rmq/consumer.py b/rmq/consumer.py
deleted file mode 100644
index 2c41e26..0000000
--- a/rmq/consumer.py
+++ /dev/null
@@ -1,42 +0,0 @@
-import time
-import traceback
-import pika
-from pika.exceptions import ConnectionClosed
-
-
-def consume(username='guest',
- password='guest',
- host='localhost',
- port=5672,
- queue='',
- callback=None):
-
- while True:
- credentials = pika.PlainCredentials(username, password)
- con_params = pika.ConnectionParameters(host, port, '/', credentials)
-
- try:
- connection = pika.BlockingConnection(con_params)
-
- try:
- channel = connection.channel()
- channel.queue_declare(queue=queue, durable=False)
- channel.basic_qos(prefetch_count=1)
- channel.basic_consume(callback, queue=queue)
-
- print(f"""Connected to host: {host} port: {port} queue: {queue}""", flush=True)
-
- channel.start_consuming()
-
- except Exception:
- exc = traceback.format_exc()
- print(exc, flush=True)
-
- finally:
- connection.close()
-
- except ConnectionClosed:
- print(f"Connection lost, reconnecting to {host}", flush=True)
- pass
-
- time.sleep(2)
diff --git a/rmq/publisher.py b/rmq/publisher.py
deleted file mode 100644
index 1a9a5cf..0000000
--- a/rmq/publisher.py
+++ /dev/null
@@ -1,35 +0,0 @@
-import pika
-
-
-def publish(message,
- username='guest',
- password='guest',
- host='localhost',
- port=5672,
- queue='',
- routingkey='',
- exchange='',
- exchange_type=''):
-
- credentials = pika.PlainCredentials(username, password)
- connection = pika.BlockingConnection(pika.ConnectionParameters(host, port, '/', credentials))
- properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1)
-
- channel = connection.channel()
- channel.queue_declare(queue=queue, durable=False)
- channel.exchange_declare(exchange=exchange, exchange_type=exchange_type)
- channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey)
-
- result = channel.basic_publish(
- exchange=exchange,
- routing_key=routingkey,
- body=message,
- properties=properties
- )
-
- if result:
- print(f"Connecting to host: {host} port: {port} exchange: {exchange} queue: {queue}", flush=True)
- else:
- print("not delivered")
-
- connection.close()
diff --git a/snekbox.py b/snekbox.py
index ee82be8..d6795e8 100644
--- a/snekbox.py
+++ b/snekbox.py
@@ -1,13 +1,12 @@
import sys
import io
import json
+import logging
-from rmq.consumer import consume
-from rmq.publisher import publish
+from logs import log
+from rmq import Rmq
-from config import HOST
-from config import EXCHANGE_TYPE
-from config import QUEUE
+rmq = Rmq()
def execute(snippet):
@@ -26,24 +25,21 @@ def execute(snippet):
return redirected_output.getvalue().strip()
-def message_handler(ch, method, properties, body):
+def message_handler(ch, method, properties, body, thread_ws=None):
msg = body.decode('utf-8')
- print(f"incoming: {msg}", flush=True)
+ log.info(f"incoming: {msg}")
snek_msg = json.loads(msg)
for snekid, snekcode in snek_msg.items():
result = execute(snekcode)
- print(f"outgoing: {result}", flush=True)
- publish(result,
- host=HOST,
- queue=snekid,
- routingkey=snekid,
- exchange=snekid,
- exchange_type=EXCHANGE_TYPE)
-
+ log.info(f"outgoing: {result}")
+ rmq.publish(result,
+ queue=snekid,
+ routingkey=snekid,
+ exchange=snekid)
ch.basic_ack(delivery_tag=method.delivery_tag)
if __name__ == '__main__':
- consume(host=HOST, queue=QUEUE, callback=message_handler)
+ rmq.consume(callback=message_handler)
diff --git a/snekweb.py b/snekweb.py
index 1c6ea34..8929438 100644
--- a/snekweb.py
+++ b/snekweb.py
@@ -3,24 +3,27 @@ import queue
import threading
import time
import json
-
-from rmq.publisher import publish
-from rmq.consumer import consume
+import logging
+import geventwebsocket
from flask import Flask
from flask import render_template
from flask_sockets import Sockets
-from config import HOST
-from config import EXCHANGE
-from config import EXCHANGE_TYPE
-from config import QUEUE
-from config import ROUTING_KEY
+from rmq import Rmq
+
+# Load app
app = Flask(__name__)
app.jinja_env.auto_reload = True
sockets = Sockets(app)
+# Logging
+gunicorn_logger = logging.getLogger('gunicorn.error')
+app.logger.handlers = gunicorn_logger.handlers
+app.logger.setLevel(gunicorn_logger.level)
+log = app.logger
+
@app.route('/')
def index():
@@ -29,48 +32,34 @@ def index():
@sockets.route('/ws/<snekboxid>')
def websocket_route(ws, snekboxid):
- RMQ_queue = queue.Queue(maxsize=0)
+ localdata = threading.local()
+ localdata.thread_ws = ws
- def message_handler(ch, method, properties, body):
+ rmq = Rmq()
+
+ def message_handler(ch, method, properties, body, thread_ws):
msg = body.decode('utf-8')
- RMQ_queue.put(msg)
+ log.debug(f"message_handler: {msg}")
+ thread_ws.send(msg)
ch.basic_ack(delivery_tag=method.delivery_tag)
- consumer_parameters = {'host': HOST, 'queue': snekboxid, 'callback': message_handler}
- consumer = threading.Thread(target=consume, kwargs=consumer_parameters)
+ consumer_parameters = {'queue': snekboxid,
+ 'callback': message_handler,
+ 'thread_ws': localdata.thread_ws}
+ consumer = threading.Thread(target=rmq.consume, kwargs=consumer_parameters)
consumer.daemon = True
consumer.start()
- def relay_to_ws(ws):
- global client_list
- while True:
- try:
- msg = RMQ_queue.get(False)
- if msg:
- ws.send(msg)
- except queue.Empty:
- time.sleep(0.1)
- pass
-
- relay = threading.Thread(target=relay_to_ws, args=(ws,))
- relay.daemon = True
- relay.start()
-
try:
while not ws.closed:
message = ws.receive()
if message:
snek_msg = json.dumps({snekboxid: message})
- print(f"forwarding {snek_msg} to rabbitmq")
- publish(snek_msg,
- host=HOST,
- queue=QUEUE,
- routingkey=ROUTING_KEY,
- exchange=EXCHANGE,
- exchange_type=EXCHANGE_TYPE)
+ log.info(f"forwarding {snek_msg} to rabbitmq")
+ rmq.publish(snek_msg)
except Exception:
- print(traceback.format_exc())
+ log.info(traceback.format_exc())
finally:
if not ws.closed:
diff --git a/templates/index.html b/templates/index.html
index ef53d74..9fd6350 100644
--- a/templates/index.html
+++ b/templates/index.html
@@ -91,8 +91,10 @@ function generate_id(){
return Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15);
}
-
-
+window.onbeforeunload = function() {
+ sendMessage("disconnect")
+ websocket.close()
+};
window.addEventListener("load", init, false);