diff options
| -rw-r--r-- | Pipfile | 2 | ||||
| -rw-r--r-- | logs.py | 10 | ||||
| -rw-r--r-- | rmq.py | 94 | ||||
| -rw-r--r-- | rmq/__init__.py | 0 | ||||
| -rw-r--r-- | rmq/consumer.py | 42 | ||||
| -rw-r--r-- | rmq/publisher.py | 35 | ||||
| -rw-r--r-- | snekbox.py | 28 | ||||
| -rw-r--r-- | snekweb.py | 61 | ||||
| -rw-r--r-- | templates/index.html | 6 | 
9 files changed, 146 insertions, 132 deletions
| @@ -21,7 +21,7 @@ python_version = "3.6"  [scripts]  lint = "flake8"  snekbox = "python snekbox.py" -snekweb = "gunicorn -w 1 -b 0.0.0.0:5000 -k geventwebsocket.gunicorn.workers.GeventWebSocketWorker snekweb:app" +snekweb = "gunicorn -w 2 -b 0.0.0.0:5000 --log-level debug -k geventwebsocket.gunicorn.workers.GeventWebSocketWorker snekweb:app"  buildbox = "docker build -t pythondiscord/snekbox:latest -f docker/Dockerfile ."  buildweb = "docker build -t pythondiscord/snekboxweb:latest -f docker/Dockerfile.webapp ."  pushbox = "docker push pythondiscord/snekbox:latest" @@ -0,0 +1,10 @@ +import logging +import sys + +logformat = logging.Formatter(fmt='[%(asctime)s] [%(process)s] [%(levelname)s] %(message)s', +                              datefmt='%Y-%m-%d %H:%M:%S %z') +log = logging.getLogger(__name__) +log.setLevel(logging.DEBUG) +console = logging.StreamHandler(sys.stdout) +console.setFormatter(logformat) +log.addHandler(console) @@ -0,0 +1,94 @@ +import pika +import time +import traceback +import logging +import sys + +from pika.exceptions import ConnectionClosed + +from config import USERNAME +from config import PASSWORD +from config import HOST +from config import PORT +from config import EXCHANGE_TYPE +from config import QUEUE +from config import ROUTING_KEY +from config import EXCHANGE + +from logs import log + + +class Rmq(object): + +    def __init__(self, +                 username=USERNAME, +                 password=PASSWORD, +                 host=HOST, +                 port=PORT, +                 exchange_type=EXCHANGE_TYPE): + +        self.username = USERNAME +        self.password = PASSWORD +        self.host = HOST +        self.port = PORT +        self.exchange_type = EXCHANGE_TYPE +        self.credentials = pika.PlainCredentials(self.username, self.password) +        self.con_params = pika.ConnectionParameters(self.host, self.port, '/', self.credentials) +        self.properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1) + +    def consume(self, queue=QUEUE, callback=None, thread_ws=None): + +        while True: +            try: +                connection = pika.BlockingConnection(self.con_params) + +                try: +                    channel = connection.channel() +                    channel.queue_declare(queue=queue, durable=False) +                    channel.basic_qos(prefetch_count=1) +                    channel.basic_consume( +                        lambda ch, method, properties, body: callback(ch, method, properties, body, thread_ws=thread_ws), +                        queue=queue) + +                    log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}") + +                    channel.start_consuming() + +                except Exception: +                    exc = traceback.format_exc() +                    log.info(exc) + +                finally: +                    connection.close() + +            except ConnectionClosed: +                log.info(f"Connection lost, reconnecting to {self.host}") +                pass + +            time.sleep(2) + +    def publish(self, +                message, +                queue=QUEUE, +                routingkey=ROUTING_KEY, +                exchange=EXCHANGE): + +        connection = pika.BlockingConnection(pika.ConnectionParameters(self.host, self.port, '/', self.credentials)) +        channel = connection.channel() +        channel.queue_declare(queue=queue, durable=False) +        channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type) +        channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey) + +        result = channel.basic_publish( +            exchange=exchange, +            routing_key=routingkey, +            body=message, +            properties=self.properties +        ) + +        if result: +            log.info(f"Connecting to host: {self.host} port: {self.port} exchange: {exchange} queue: {queue}") +        else: +            log.info("not delivered") + +        connection.close() diff --git a/rmq/__init__.py b/rmq/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/rmq/__init__.py +++ /dev/null diff --git a/rmq/consumer.py b/rmq/consumer.py deleted file mode 100644 index 2c41e26..0000000 --- a/rmq/consumer.py +++ /dev/null @@ -1,42 +0,0 @@ -import time -import traceback -import pika -from pika.exceptions import ConnectionClosed - - -def consume(username='guest', -            password='guest', -            host='localhost', -            port=5672, -            queue='', -            callback=None): - -    while True: -        credentials = pika.PlainCredentials(username, password) -        con_params = pika.ConnectionParameters(host, port, '/', credentials) - -        try: -            connection = pika.BlockingConnection(con_params) - -            try: -                channel = connection.channel() -                channel.queue_declare(queue=queue, durable=False) -                channel.basic_qos(prefetch_count=1) -                channel.basic_consume(callback, queue=queue) - -                print(f"""Connected to host: {host} port: {port} queue: {queue}""", flush=True) - -                channel.start_consuming() - -            except Exception: -                exc = traceback.format_exc() -                print(exc, flush=True) - -            finally: -                connection.close() - -        except ConnectionClosed: -            print(f"Connection lost, reconnecting to {host}", flush=True) -            pass - -        time.sleep(2) diff --git a/rmq/publisher.py b/rmq/publisher.py deleted file mode 100644 index 1a9a5cf..0000000 --- a/rmq/publisher.py +++ /dev/null @@ -1,35 +0,0 @@ -import pika - - -def publish(message, -            username='guest', -            password='guest', -            host='localhost', -            port=5672, -            queue='', -            routingkey='', -            exchange='', -            exchange_type=''): - -    credentials = pika.PlainCredentials(username, password) -    connection = pika.BlockingConnection(pika.ConnectionParameters(host, port, '/', credentials)) -    properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1) - -    channel = connection.channel() -    channel.queue_declare(queue=queue, durable=False) -    channel.exchange_declare(exchange=exchange, exchange_type=exchange_type) -    channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey) - -    result = channel.basic_publish( -        exchange=exchange, -        routing_key=routingkey, -        body=message, -        properties=properties -    ) - -    if result: -        print(f"Connecting to host: {host} port: {port} exchange: {exchange} queue: {queue}", flush=True) -    else: -        print("not delivered") - -    connection.close() @@ -1,13 +1,12 @@  import sys  import io  import json +import logging -from rmq.consumer import consume -from rmq.publisher import publish +from logs import log +from rmq import Rmq -from config import HOST -from config import EXCHANGE_TYPE -from config import QUEUE +rmq = Rmq()  def execute(snippet): @@ -26,24 +25,21 @@ def execute(snippet):      return redirected_output.getvalue().strip() -def message_handler(ch, method, properties, body): +def message_handler(ch, method, properties, body, thread_ws=None):      msg = body.decode('utf-8') -    print(f"incoming: {msg}", flush=True) +    log.info(f"incoming: {msg}")      snek_msg = json.loads(msg)      for snekid, snekcode in snek_msg.items():          result = execute(snekcode) -        print(f"outgoing: {result}", flush=True) -        publish(result, -                host=HOST, -                queue=snekid, -                routingkey=snekid, -                exchange=snekid, -                exchange_type=EXCHANGE_TYPE) - +        log.info(f"outgoing: {result}") +        rmq.publish(result, +                    queue=snekid, +                    routingkey=snekid, +                    exchange=snekid)      ch.basic_ack(delivery_tag=method.delivery_tag)  if __name__ == '__main__': -    consume(host=HOST, queue=QUEUE, callback=message_handler) +    rmq.consume(callback=message_handler) @@ -3,24 +3,27 @@ import queue  import threading  import time  import json - -from rmq.publisher import publish -from rmq.consumer import consume +import logging +import geventwebsocket  from flask import Flask  from flask import render_template  from flask_sockets import Sockets -from config import HOST -from config import EXCHANGE -from config import EXCHANGE_TYPE -from config import QUEUE -from config import ROUTING_KEY +from rmq import Rmq + +# Load app  app = Flask(__name__)  app.jinja_env.auto_reload = True  sockets = Sockets(app) +# Logging +gunicorn_logger = logging.getLogger('gunicorn.error') +app.logger.handlers = gunicorn_logger.handlers +app.logger.setLevel(gunicorn_logger.level) +log = app.logger +  @app.route('/')  def index(): @@ -29,48 +32,34 @@ def index():  @sockets.route('/ws/<snekboxid>')  def websocket_route(ws, snekboxid): -    RMQ_queue = queue.Queue(maxsize=0) +    localdata = threading.local() +    localdata.thread_ws = ws -    def message_handler(ch, method, properties, body): +    rmq = Rmq() + +    def message_handler(ch, method, properties, body, thread_ws):          msg = body.decode('utf-8') -        RMQ_queue.put(msg) +        log.debug(f"message_handler: {msg}") +        thread_ws.send(msg)          ch.basic_ack(delivery_tag=method.delivery_tag) -    consumer_parameters = {'host': HOST, 'queue': snekboxid, 'callback': message_handler} -    consumer = threading.Thread(target=consume, kwargs=consumer_parameters) +    consumer_parameters = {'queue': snekboxid, +                           'callback': message_handler, +                           'thread_ws': localdata.thread_ws} +    consumer = threading.Thread(target=rmq.consume, kwargs=consumer_parameters)      consumer.daemon = True      consumer.start() -    def relay_to_ws(ws): -        global client_list -        while True: -            try: -                msg = RMQ_queue.get(False) -                if msg: -                    ws.send(msg) -            except queue.Empty: -                time.sleep(0.1) -                pass - -    relay = threading.Thread(target=relay_to_ws, args=(ws,)) -    relay.daemon = True -    relay.start() -      try:          while not ws.closed:              message = ws.receive()              if message:                  snek_msg = json.dumps({snekboxid: message}) -                print(f"forwarding {snek_msg} to rabbitmq") -                publish(snek_msg, -                        host=HOST, -                        queue=QUEUE, -                        routingkey=ROUTING_KEY, -                        exchange=EXCHANGE, -                        exchange_type=EXCHANGE_TYPE) +                log.info(f"forwarding {snek_msg} to rabbitmq") +                rmq.publish(snek_msg)      except Exception: -        print(traceback.format_exc()) +        log.info(traceback.format_exc())      finally:          if not ws.closed: diff --git a/templates/index.html b/templates/index.html index ef53d74..9fd6350 100644 --- a/templates/index.html +++ b/templates/index.html @@ -91,8 +91,10 @@ function generate_id(){      return Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15);  } - - +window.onbeforeunload = function() { +    sendMessage("disconnect") +    websocket.close() +};  window.addEventListener("load", init, false); | 
