diff options
| author | 2018-05-23 22:38:20 +0200 | |
|---|---|---|
| committer | 2018-05-23 22:38:20 +0200 | |
| commit | 309a6f93f878fc96951902fc47d45a30ef5f8d71 (patch) | |
| tree | f788b43a892a93d0f97da73f459a55b43e1ea1a0 | |
| parent | update readme (diff) | |
POC completed
| -rw-r--r-- | .dockerignore | 4 | ||||
| -rw-r--r-- | Pipfile | 12 | ||||
| -rw-r--r-- | README.md | 15 | ||||
| -rw-r--r-- | config.py | 25 | ||||
| -rw-r--r-- | docker/Dockerfile | 4 | ||||
| -rw-r--r-- | docker/Dockerfile.webapp | 4 | ||||
| -rw-r--r-- | rmq/__init__.py | 0 | ||||
| -rw-r--r-- | rmq/consumer.py | 35 | ||||
| -rw-r--r-- | rmq/publisher.py | 26 | ||||
| -rw-r--r-- | snekbox.py | 49 | ||||
| -rw-r--r-- | snekbox/config.py | 10 | ||||
| -rw-r--r-- | snekbox/consume.py | 75 | ||||
| -rw-r--r-- | snekbox/publish.py | 46 | ||||
| -rw-r--r-- | snekweb.py | 78 | ||||
| -rw-r--r-- | templates/index.html (renamed from webapp/templates/index.html) | 48 | ||||
| -rw-r--r-- | webapp/rmq.py | 48 | ||||
| -rw-r--r-- | webapp/webapp.py | 39 | 
17 files changed, 276 insertions, 242 deletions
| diff --git a/.dockerignore b/.dockerignore index 210f85f..65555c9 100644 --- a/.dockerignore +++ b/.dockerignore @@ -13,3 +13,7 @@ Vagrantfile  .gitignore  .travis.yml  docker +docker-compose.yml +LICENSE +README.md +tox.ini @@ -17,9 +17,9 @@ gevent-websocket = "*"  python_version = "3.6"  [scripts] -consume = "python consume.py" -publish = "python publish.py" -webapp = "python webapp.py" -build = "docker build -t pythondiscord/snekbox:latest -f docker/Dockerfile ." -buildwebapp = "docker build -t pythondiscord/snekboxweb:latest -f docker/Dockerfile.webapp ." -compose = "docker-compose up" +snekbox = "python snekbox.py" +snekweb = "python snekweb.py" +buildbox = "docker build -t pythondiscord/snekbox:latest -f docker/Dockerfile ." +buildweb = "docker build -t pythondiscord/snekboxweb:latest -f docker/Dockerfile.webapp ." +pushbox = "docker push pythondiscord/snekbox:latest" +pushweb = "docker push pythondiscord/snekboxweb:latest" @@ -43,20 +43,27 @@ docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' rmq  # If not, change the snekbox/config.py file to match  ``` +start the webserver + +```bash +docker run --name snekboxweb --network=host -d pythondiscord/snekboxweb:latest +netstat -plnt +# tcp    0.0.0.0:5000    LISTEN +``` +  ## Test the code  use two terminals!  ```bash  #terminal 1 -pipenv run python snekbox/consume.py +pipenv run python snekbox.py  #terminal 2 -pipenv run python snekbox/publish.py +pipenv run python snekweb.py  ``` -The publish will put a message on the message queue -and the consumer will pick it up and do stuff +`http://localhost:5000`  ## Build and run the consumer in a container diff --git a/config.py b/config.py new file mode 100644 index 0000000..cb428fe --- /dev/null +++ b/config.py @@ -0,0 +1,25 @@ +import os + +def attempt_automatically_finding_the_ip_of_rmq(): +    try: +        import docker +        client = docker.from_env() +        containers = client.containers.get('snekbox_pdrmq_1') +        HOST = list(containers.attrs.get('NetworkSettings').get('Networks').values())[0]['IPAddress'] +        return HOST +    except: +        return '172.17.0.2' + +USERNAME = 'guest' +PASSWORD = 'guest' +HOST = os.environ.get('RMQ_HOST', attempt_automatically_finding_the_ip_of_rmq()) +PORT = 5672 +EXCHANGE_TYPE = 'direct' + +QUEUE = 'input' +EXCHANGE = QUEUE +ROUTING_KEY = QUEUE + +RETURN_QUEUE = 'return' +RETURN_EXCHANGE = RETURN_QUEUE +RETURN_ROUTING_KEY = RETURN_QUEUE diff --git a/docker/Dockerfile b/docker/Dockerfile index 47be8b1..88d1919 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -14,10 +14,10 @@ RUN pip install pipenv  RUN mkdir -p /snekbox  COPY Pipfile /snekbox  COPY Pipfile.lock /snekbox -COPY snekbox /snekbox +COPY . /snekbox  WORKDIR /snekbox  RUN pipenv sync  ENTRYPOINT ["/sbin/tini", "--"] -CMD ["pipenv", "run", "consume"] +CMD ["pipenv", "run", "snekbox"] diff --git a/docker/Dockerfile.webapp b/docker/Dockerfile.webapp index ea06f20..55c326d 100644 --- a/docker/Dockerfile.webapp +++ b/docker/Dockerfile.webapp @@ -14,7 +14,7 @@ RUN pip install pipenv  RUN mkdir -p /webapp  COPY Pipfile /webapp  COPY Pipfile.lock /webapp -COPY webapp /webapp +COPY . /webapp  WORKDIR /webapp  RUN pipenv sync --dev @@ -22,4 +22,4 @@ RUN pipenv sync --dev  EXPOSE 5000  ENTRYPOINT ["/sbin/tini", "--"] -CMD ["pipenv", "run", "webapp"] +CMD ["pipenv", "run", "snekweb"] diff --git a/rmq/__init__.py b/rmq/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/rmq/__init__.py diff --git a/rmq/consumer.py b/rmq/consumer.py new file mode 100644 index 0000000..8e1ce2b --- /dev/null +++ b/rmq/consumer.py @@ -0,0 +1,35 @@ +import time +import traceback +import pika +from pika.exceptions import ConnectionClosed + +def consume(username='guest', password='guest', host='localhost', port=5672, queue='', callback=None): +    while True: +        credentials = pika.PlainCredentials(username, password) +        con_params = pika.ConnectionParameters(host, port, '/', credentials) + +        try: +            connection = pika.BlockingConnection(con_params) + +            try: +                channel = connection.channel() +                channel.queue_declare(queue=queue, durable=False) +                channel.basic_qos(prefetch_count=1) +                channel.basic_consume(callback, queue=queue) + +                print(f"""Connected to host: {host} port: {port} queue: {queue}""", flush=True) + +                channel.start_consuming() + +            except: +                exc = traceback.format_exc() +                print(exc, flush=True) + +            finally: +                connection.close() + +        except ConnectionClosed: +            print(f"Connection lost, reconnecting to {host}", flush=True) +            pass + +        time.sleep(2) diff --git a/rmq/publisher.py b/rmq/publisher.py new file mode 100644 index 0000000..4ba9db9 --- /dev/null +++ b/rmq/publisher.py @@ -0,0 +1,26 @@ +import pika + +def publish(message, username='guest', password='guest', host='localhost', port=5672, queue='', routingkey='', exchange='', exchange_type=''): +    credentials = pika.PlainCredentials(username, password) +    connection = pika.BlockingConnection(pika.ConnectionParameters(host, port, '/', credentials)) +    properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1) + +    channel = connection.channel() +    channel.queue_declare(queue=queue, durable=False) +    channel.exchange_declare(exchange=exchange, exchange_type=exchange_type) +    channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey) + +    result = channel.basic_publish( +                exchange=exchange, +                routing_key=routingkey, +                body=message, +                properties=properties +    ) + +    if result: +        print(f"Connecting to host: {host} port: {port} exchange: {exchange} queue: {queue}", flush=True) +    else: +        print("not delivered") + +    connection.close() + diff --git a/snekbox.py b/snekbox.py new file mode 100644 index 0000000..b58447e --- /dev/null +++ b/snekbox.py @@ -0,0 +1,49 @@ +import traceback +import sys +import time +import pika +import io + +from rmq.consumer import consume +from rmq.publisher import publish + +from config import USERNAME +from config import PASSWORD +from config import HOST +from config import PORT +from config import EXCHANGE +from config import EXCHANGE_TYPE +from config import QUEUE +from config import RETURN_QUEUE +from config import RETURN_EXCHANGE +from config import RETURN_ROUTING_KEY + +def execute(snippet): +    old_stdout = sys.stdout +    redirected_output = sys.stdout = io.StringIO() +    failed = False +    try: +        exec(snippet) +    except Exception as e: +        failed = str(e) +    finally: +        sys.stdout = old_stdout + +    if failed: +        return failed.strip() +    return redirected_output.getvalue().strip() + + +def message_handler(ch, method, properties, body): +    msg = body.decode('utf-8') + +    # Execute code snippets here +    print(f"incoming: {msg}", flush=True) +    result = execute(msg) +    print(f"outgoing: {result}", flush=True) +    publish(result, host=HOST, queue=RETURN_QUEUE, routingkey=RETURN_ROUTING_KEY, exchange=RETURN_EXCHANGE, exchange_type=EXCHANGE_TYPE) + +    ch.basic_ack(delivery_tag = method.delivery_tag) + +if __name__ == '__main__': +    consume(host=HOST, queue=QUEUE, callback=message_handler) diff --git a/snekbox/config.py b/snekbox/config.py deleted file mode 100644 index d0bd4f6..0000000 --- a/snekbox/config.py +++ /dev/null @@ -1,10 +0,0 @@ -import os - -USERNAME      = 'guest' -PASSWORD      = 'guest' -HOST          = os.environ.get('RMQ_HOST', '172.17.0.2') -PORT          = 5672 -EXCHANGE      = 'exchange' -EXCHANGE_TYPE = 'direct' -QUEUE         = 'text' -ROUTING_KEY   = 'bacon' diff --git a/snekbox/consume.py b/snekbox/consume.py deleted file mode 100644 index d44a018..0000000 --- a/snekbox/consume.py +++ /dev/null @@ -1,75 +0,0 @@ -import traceback -import sys -import time -import pika -import io - -from pika.exceptions import ConnectionClosed - -from config import USERNAME -from config import PASSWORD -from config import HOST -from config import PORT -from config import EXCHANGE -from config import QUEUE - -def execute(snippet): -    old_stdout = sys.stdout -    redirected_output = sys.stdout = io.StringIO() -    failed = False -    try: -        exec(snippet) -    except Exception as e: -        failed = e -    finally: -        sys.stdout = old_stdout - -    if failed: -        return failed -    return redirected_output.getvalue() - - -def message_handler(ch, method, properties, body): -    msg = body.decode('utf-8') - -    # Execute code snippets here -    print(f"incoming: {msg}", flush=True) -    result = execute(msg) -    print(result, flush=True) - -    ch.basic_ack(delivery_tag = method.delivery_tag) - -def rabbitmq_consume(): - -    while True: -        credentials = pika.PlainCredentials(USERNAME, PASSWORD) -        con_params = pika.ConnectionParameters(HOST, PORT, '/', credentials) - -        try: -            connection = pika.BlockingConnection(con_params) - -            try: -                channel = connection.channel() -                channel.queue_declare(queue=QUEUE, durable=False) -                channel.basic_qos(prefetch_count=1) -                channel.basic_consume(message_handler, queue=QUEUE) - -                print(f"""Connected to \nhost:     {HOST}\nport:     {PORT}\nexchange: {EXCHANGE}\nqueue:    {QUEUE}""", flush=True) - -                channel.start_consuming() - -            except: -                exc = traceback.format_exc() -                print(exc, flush=True) - -            finally: -                connection.close() - -        except ConnectionClosed: -            print(f"Connection lost, reconnecting to {HOST}", flush=True) -            pass - -        time.sleep(2) - -if __name__ == '__main__': -    rabbitmq_consume() diff --git a/snekbox/publish.py b/snekbox/publish.py deleted file mode 100644 index 0598976..0000000 --- a/snekbox/publish.py +++ /dev/null @@ -1,46 +0,0 @@ -import pika -from config import USERNAME -from config import PASSWORD -from config import HOST -from config import PORT -from config import EXCHANGE -from config import EXCHANGE_TYPE -from config import QUEUE -from config import ROUTING_KEY - -try: -    import docker -    client = docker.from_env() -    containers = client.containers.get('snekbox_pdrmq_1') -    print("Attempting to get rabbitmq host automatically") -    HOST = list(containers.attrs.get('NetworkSettings').get('Networks').values())[0]['IPAddress'] -    print(f"found {HOST}") -except: -    pass - -def send(message): -    credentials = pika.PlainCredentials(USERNAME, PASSWORD) -    connection = pika.BlockingConnection(pika.ConnectionParameters(HOST, PORT, '/', credentials)) -    properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1) - -    channel = connection.channel() -    channel.queue_declare(queue=QUEUE, durable=False) -    channel.exchange_declare(exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE) -    channel.queue_bind(exchange=EXCHANGE, queue=QUEUE, routing_key=ROUTING_KEY) - -    result = channel.basic_publish( -                exchange=EXCHANGE, -                routing_key=ROUTING_KEY, -                body=message, -                properties=properties -    ) - -    if result: -        print(f"""Connecting to\nhost:     {HOST}\nport:     {PORT}\nexchange: {EXCHANGE}\nqueue:    {QUEUE}""", flush=True) -        print(f"Sent: '{message}'") -    else: -        print("not delivered") - -    connection.close() - -#send('print("bacon is delicious")') diff --git a/snekweb.py b/snekweb.py new file mode 100644 index 0000000..02aca20 --- /dev/null +++ b/snekweb.py @@ -0,0 +1,78 @@ +import traceback +import queue +import threading +import time + +from rmq.publisher import publish +from rmq.consumer import consume + +from flask import Flask +from flask import render_template +from flask_sockets import Sockets + +from config import HOST +from config import PORT +from config import EXCHANGE +from config import EXCHANGE_TYPE +from config import QUEUE +from config import RETURN_QUEUE +from config import ROUTING_KEY + +app = Flask(__name__) +sockets = Sockets(app) +RMQ_queue = queue.Queue(maxsize=0) + +def message_handler(ch, method, properties, body): +    msg = body.decode('utf-8') +    print(f"incoming: {msg} from rabbitmq", flush=True) +    RMQ_queue.put(msg) +    ch.basic_ack(delivery_tag = method.delivery_tag) + +def relay_to_ws(ws): +    while not ws.closed: +        try: +            msg = RMQ_queue.get(False) +            if msg: +                print(f"sending {msg} to user", flush=True) +                ws.send(msg) +        except queue.Empty: +            time.sleep(0.1) +            pass + +t1 = threading.Thread(target=consume, kwargs={'host':HOST, 'queue':RETURN_QUEUE, 'callback':message_handler}) +t1.daemon = True +t1.start() + [email protected]('/') +def index(): +    return render_template('index.html') + [email protected]('/ws') +def websocket_route(ws): +    t2 = threading.Thread(target=relay_to_ws, args=(ws, )) +    t2.daemon = True +    t2.start() + +    try: +        while not ws.closed: + +            message = ws.receive() + +            if not message: +                continue +            print(f"forwarding '{message}' to rabbitmq") + +            publish(message, host=HOST, queue=QUEUE, routingkey=ROUTING_KEY, exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE) + +    except: +        print(traceback.format_exc()) + +    finally: +        if not ws.closed: +            ws.close() + +if __name__ == '__main__': +    from gevent import pywsgi +    from geventwebsocket.handler import WebSocketHandler +    server = pywsgi.WSGIServer(('0.0.0.0', 5000), app, handler_class=WebSocketHandler) +    server.serve_forever() diff --git a/webapp/templates/index.html b/templates/index.html index f2213ff..d0b0630 100644 --- a/webapp/templates/index.html +++ b/templates/index.html @@ -1,16 +1,17 @@  <!DOCTYPE html>  <meta charset="utf-8" /> -<title>WebSocket Test</title> +<title>snekboxweb</title>  <script language="javascript" type="text/javascript"> +let _ready = false  var output;  function init(){      output = document.getElementById("output"); -    testWebSocket(); +    websocketHandler();  } -function testWebSocket(){ +function websocketHandler(){      var here = window.location.host;      var wsUri = `ws://${here}/ws`;      websocket = new WebSocket(wsUri); @@ -21,25 +22,47 @@ function testWebSocket(){  }  function onOpen(evt){ -    writeToScreen("CONNECTED"); +    _ready = true +    console.log("CONNECTED");  }  function onClose(evt){ -    writeToScreen("DISCONNECTED"); +    _ready = false +    console.log("DISCONNECTED");  }  function onMessage(evt){      writeToScreen('<span style="color: blue;">RESPONSE: ' + evt.data+'</span>'); +} + +function exit(){      websocket.close();  }  function onError(evt){ +    _ready = false      writeToScreen('<span style="color: red;">ERROR:</span> ' + evt.data);  } -function sendMessage(message){ -    writeToScreen("SENT: " + message); -    websocket.send(message); +function sendMessage(msg){ +    waitForSocketConnection(function(){ +        websocket.send(msg); +    }); +    console.log("sent message "+msg) +} + +function waitForSocketConnection(callback){ +    setTimeout( +        function () { +            if (_ready === true) { +                if(callback != null){ +                    callback();} +                return; +            } +            else { +                waitForSocketConnection(callback);} + +        }, 500); // milliseconds  }  function writeToScreen(message){ @@ -58,7 +81,12 @@ window.addEventListener("load", init, false);  </script> -<input type="text" id="field1" value="print('fsdf')"><br> +<textarea rows="4" cols="50" type="text" id="field1"> +def sum(a,b): +    return a+b +print( sum(1,2) ) +</textarea> +<br>  <button onclick="sendFromInput()">Send</button> - +<button onclick="exit()">disconnect from websocket</button>  <div id="output"></div> diff --git a/webapp/rmq.py b/webapp/rmq.py deleted file mode 100644 index 80d418a..0000000 --- a/webapp/rmq.py +++ /dev/null @@ -1,48 +0,0 @@ -import os -import pika - -USERNAME      = 'guest' -PASSWORD      = 'guest' -HOST          = os.environ.get('RMQ_HOST', '172.17.0.2') -PORT          = 5672 -EXCHANGE      = 'exchange' -EXCHANGE_TYPE = 'direct' -QUEUE         = 'text' -ROUTING_KEY   = 'bacon' - -try: -    import docker -    client = docker.from_env() -    containers = client.containers.get('snekbox_pdrmq_1') -    print("Attempting to get rabbitmq host automatically") -    HOST = list(containers.attrs.get('NetworkSettings').get('Networks').values())[0]['IPAddress'] -    print(f"found {HOST}") -except: -    pass - -def send(message): -    credentials = pika.PlainCredentials(USERNAME, PASSWORD) -    connection = pika.BlockingConnection(pika.ConnectionParameters(HOST, PORT, '/', credentials)) -    properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1) - -    channel = connection.channel() -    channel.queue_declare(queue=QUEUE, durable=False) -    channel.exchange_declare(exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE) -    channel.queue_bind(exchange=EXCHANGE, queue=QUEUE, routing_key=ROUTING_KEY) - -    result = channel.basic_publish( -                exchange=EXCHANGE, -                routing_key=ROUTING_KEY, -                body=message, -                properties=properties -    ) - -    if result: -        print(f"""Connecting to\nhost:     {HOST}\nport:     {PORT}\nexchange: {EXCHANGE}\nqueue:    {QUEUE}""", flush=True) -        print(f"Sent: '{message}'") -    else: -        print("not delivered") - -    connection.close() - -#send('print("bacon is delicious")') diff --git a/webapp/webapp.py b/webapp/webapp.py deleted file mode 100644 index 676fc27..0000000 --- a/webapp/webapp.py +++ /dev/null @@ -1,39 +0,0 @@ -import traceback - -from rmq import send as rmq_send -from flask import Flask -from flask import render_template -from flask_sockets import Sockets - -app = Flask(__name__) -sockets = Sockets(app) - [email protected]('/') -def index(): -    return render_template('index.html') - [email protected]('/ws') -def websocket_route(ws): -    try: -        while not ws.closed: -            message = ws.receive() - -            if not message: -                continue -            print(f"received '{message}'") - -            rmq_send(message) - -    except: -        print(traceback.format_exec()) - -    finally: -        if not ws.closed: -            ws.close() - -if __name__ == '__main__': -    #app.run(host='0.0.0.0', port=5000, debug=True) -    from gevent import pywsgi -    from geventwebsocket.handler import WebSocketHandler -    server = pywsgi.WSGIServer(('0.0.0.0', 5000), app, handler_class=WebSocketHandler) -    server.serve_forever() | 
