diff options
-rw-r--r-- | .dockerignore | 4 | ||||
-rw-r--r-- | Pipfile | 12 | ||||
-rw-r--r-- | README.md | 15 | ||||
-rw-r--r-- | config.py | 25 | ||||
-rw-r--r-- | docker/Dockerfile | 4 | ||||
-rw-r--r-- | docker/Dockerfile.webapp | 4 | ||||
-rw-r--r-- | rmq/__init__.py | 0 | ||||
-rw-r--r-- | rmq/consumer.py | 35 | ||||
-rw-r--r-- | rmq/publisher.py | 26 | ||||
-rw-r--r-- | snekbox.py | 49 | ||||
-rw-r--r-- | snekbox/config.py | 10 | ||||
-rw-r--r-- | snekbox/consume.py | 75 | ||||
-rw-r--r-- | snekbox/publish.py | 46 | ||||
-rw-r--r-- | snekweb.py | 78 | ||||
-rw-r--r-- | templates/index.html (renamed from webapp/templates/index.html) | 48 | ||||
-rw-r--r-- | webapp/rmq.py | 48 | ||||
-rw-r--r-- | webapp/webapp.py | 39 |
17 files changed, 276 insertions, 242 deletions
diff --git a/.dockerignore b/.dockerignore index 210f85f..65555c9 100644 --- a/.dockerignore +++ b/.dockerignore @@ -13,3 +13,7 @@ Vagrantfile .gitignore .travis.yml docker +docker-compose.yml +LICENSE +README.md +tox.ini @@ -17,9 +17,9 @@ gevent-websocket = "*" python_version = "3.6" [scripts] -consume = "python consume.py" -publish = "python publish.py" -webapp = "python webapp.py" -build = "docker build -t pythondiscord/snekbox:latest -f docker/Dockerfile ." -buildwebapp = "docker build -t pythondiscord/snekboxweb:latest -f docker/Dockerfile.webapp ." -compose = "docker-compose up" +snekbox = "python snekbox.py" +snekweb = "python snekweb.py" +buildbox = "docker build -t pythondiscord/snekbox:latest -f docker/Dockerfile ." +buildweb = "docker build -t pythondiscord/snekboxweb:latest -f docker/Dockerfile.webapp ." +pushbox = "docker push pythondiscord/snekbox:latest" +pushweb = "docker push pythondiscord/snekboxweb:latest" @@ -43,20 +43,27 @@ docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' rmq # If not, change the snekbox/config.py file to match ``` +start the webserver + +```bash +docker run --name snekboxweb --network=host -d pythondiscord/snekboxweb:latest +netstat -plnt +# tcp 0.0.0.0:5000 LISTEN +``` + ## Test the code use two terminals! ```bash #terminal 1 -pipenv run python snekbox/consume.py +pipenv run python snekbox.py #terminal 2 -pipenv run python snekbox/publish.py +pipenv run python snekweb.py ``` -The publish will put a message on the message queue -and the consumer will pick it up and do stuff +`http://localhost:5000` ## Build and run the consumer in a container diff --git a/config.py b/config.py new file mode 100644 index 0000000..cb428fe --- /dev/null +++ b/config.py @@ -0,0 +1,25 @@ +import os + +def attempt_automatically_finding_the_ip_of_rmq(): + try: + import docker + client = docker.from_env() + containers = client.containers.get('snekbox_pdrmq_1') + HOST = list(containers.attrs.get('NetworkSettings').get('Networks').values())[0]['IPAddress'] + return HOST + except: + return '172.17.0.2' + +USERNAME = 'guest' +PASSWORD = 'guest' +HOST = os.environ.get('RMQ_HOST', attempt_automatically_finding_the_ip_of_rmq()) +PORT = 5672 +EXCHANGE_TYPE = 'direct' + +QUEUE = 'input' +EXCHANGE = QUEUE +ROUTING_KEY = QUEUE + +RETURN_QUEUE = 'return' +RETURN_EXCHANGE = RETURN_QUEUE +RETURN_ROUTING_KEY = RETURN_QUEUE diff --git a/docker/Dockerfile b/docker/Dockerfile index 47be8b1..88d1919 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -14,10 +14,10 @@ RUN pip install pipenv RUN mkdir -p /snekbox COPY Pipfile /snekbox COPY Pipfile.lock /snekbox -COPY snekbox /snekbox +COPY . /snekbox WORKDIR /snekbox RUN pipenv sync ENTRYPOINT ["/sbin/tini", "--"] -CMD ["pipenv", "run", "consume"] +CMD ["pipenv", "run", "snekbox"] diff --git a/docker/Dockerfile.webapp b/docker/Dockerfile.webapp index ea06f20..55c326d 100644 --- a/docker/Dockerfile.webapp +++ b/docker/Dockerfile.webapp @@ -14,7 +14,7 @@ RUN pip install pipenv RUN mkdir -p /webapp COPY Pipfile /webapp COPY Pipfile.lock /webapp -COPY webapp /webapp +COPY . /webapp WORKDIR /webapp RUN pipenv sync --dev @@ -22,4 +22,4 @@ RUN pipenv sync --dev EXPOSE 5000 ENTRYPOINT ["/sbin/tini", "--"] -CMD ["pipenv", "run", "webapp"] +CMD ["pipenv", "run", "snekweb"] diff --git a/rmq/__init__.py b/rmq/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/rmq/__init__.py diff --git a/rmq/consumer.py b/rmq/consumer.py new file mode 100644 index 0000000..8e1ce2b --- /dev/null +++ b/rmq/consumer.py @@ -0,0 +1,35 @@ +import time +import traceback +import pika +from pika.exceptions import ConnectionClosed + +def consume(username='guest', password='guest', host='localhost', port=5672, queue='', callback=None): + while True: + credentials = pika.PlainCredentials(username, password) + con_params = pika.ConnectionParameters(host, port, '/', credentials) + + try: + connection = pika.BlockingConnection(con_params) + + try: + channel = connection.channel() + channel.queue_declare(queue=queue, durable=False) + channel.basic_qos(prefetch_count=1) + channel.basic_consume(callback, queue=queue) + + print(f"""Connected to host: {host} port: {port} queue: {queue}""", flush=True) + + channel.start_consuming() + + except: + exc = traceback.format_exc() + print(exc, flush=True) + + finally: + connection.close() + + except ConnectionClosed: + print(f"Connection lost, reconnecting to {host}", flush=True) + pass + + time.sleep(2) diff --git a/rmq/publisher.py b/rmq/publisher.py new file mode 100644 index 0000000..4ba9db9 --- /dev/null +++ b/rmq/publisher.py @@ -0,0 +1,26 @@ +import pika + +def publish(message, username='guest', password='guest', host='localhost', port=5672, queue='', routingkey='', exchange='', exchange_type=''): + credentials = pika.PlainCredentials(username, password) + connection = pika.BlockingConnection(pika.ConnectionParameters(host, port, '/', credentials)) + properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1) + + channel = connection.channel() + channel.queue_declare(queue=queue, durable=False) + channel.exchange_declare(exchange=exchange, exchange_type=exchange_type) + channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey) + + result = channel.basic_publish( + exchange=exchange, + routing_key=routingkey, + body=message, + properties=properties + ) + + if result: + print(f"Connecting to host: {host} port: {port} exchange: {exchange} queue: {queue}", flush=True) + else: + print("not delivered") + + connection.close() + diff --git a/snekbox.py b/snekbox.py new file mode 100644 index 0000000..b58447e --- /dev/null +++ b/snekbox.py @@ -0,0 +1,49 @@ +import traceback +import sys +import time +import pika +import io + +from rmq.consumer import consume +from rmq.publisher import publish + +from config import USERNAME +from config import PASSWORD +from config import HOST +from config import PORT +from config import EXCHANGE +from config import EXCHANGE_TYPE +from config import QUEUE +from config import RETURN_QUEUE +from config import RETURN_EXCHANGE +from config import RETURN_ROUTING_KEY + +def execute(snippet): + old_stdout = sys.stdout + redirected_output = sys.stdout = io.StringIO() + failed = False + try: + exec(snippet) + except Exception as e: + failed = str(e) + finally: + sys.stdout = old_stdout + + if failed: + return failed.strip() + return redirected_output.getvalue().strip() + + +def message_handler(ch, method, properties, body): + msg = body.decode('utf-8') + + # Execute code snippets here + print(f"incoming: {msg}", flush=True) + result = execute(msg) + print(f"outgoing: {result}", flush=True) + publish(result, host=HOST, queue=RETURN_QUEUE, routingkey=RETURN_ROUTING_KEY, exchange=RETURN_EXCHANGE, exchange_type=EXCHANGE_TYPE) + + ch.basic_ack(delivery_tag = method.delivery_tag) + +if __name__ == '__main__': + consume(host=HOST, queue=QUEUE, callback=message_handler) diff --git a/snekbox/config.py b/snekbox/config.py deleted file mode 100644 index d0bd4f6..0000000 --- a/snekbox/config.py +++ /dev/null @@ -1,10 +0,0 @@ -import os - -USERNAME = 'guest' -PASSWORD = 'guest' -HOST = os.environ.get('RMQ_HOST', '172.17.0.2') -PORT = 5672 -EXCHANGE = 'exchange' -EXCHANGE_TYPE = 'direct' -QUEUE = 'text' -ROUTING_KEY = 'bacon' diff --git a/snekbox/consume.py b/snekbox/consume.py deleted file mode 100644 index d44a018..0000000 --- a/snekbox/consume.py +++ /dev/null @@ -1,75 +0,0 @@ -import traceback -import sys -import time -import pika -import io - -from pika.exceptions import ConnectionClosed - -from config import USERNAME -from config import PASSWORD -from config import HOST -from config import PORT -from config import EXCHANGE -from config import QUEUE - -def execute(snippet): - old_stdout = sys.stdout - redirected_output = sys.stdout = io.StringIO() - failed = False - try: - exec(snippet) - except Exception as e: - failed = e - finally: - sys.stdout = old_stdout - - if failed: - return failed - return redirected_output.getvalue() - - -def message_handler(ch, method, properties, body): - msg = body.decode('utf-8') - - # Execute code snippets here - print(f"incoming: {msg}", flush=True) - result = execute(msg) - print(result, flush=True) - - ch.basic_ack(delivery_tag = method.delivery_tag) - -def rabbitmq_consume(): - - while True: - credentials = pika.PlainCredentials(USERNAME, PASSWORD) - con_params = pika.ConnectionParameters(HOST, PORT, '/', credentials) - - try: - connection = pika.BlockingConnection(con_params) - - try: - channel = connection.channel() - channel.queue_declare(queue=QUEUE, durable=False) - channel.basic_qos(prefetch_count=1) - channel.basic_consume(message_handler, queue=QUEUE) - - print(f"""Connected to \nhost: {HOST}\nport: {PORT}\nexchange: {EXCHANGE}\nqueue: {QUEUE}""", flush=True) - - channel.start_consuming() - - except: - exc = traceback.format_exc() - print(exc, flush=True) - - finally: - connection.close() - - except ConnectionClosed: - print(f"Connection lost, reconnecting to {HOST}", flush=True) - pass - - time.sleep(2) - -if __name__ == '__main__': - rabbitmq_consume() diff --git a/snekbox/publish.py b/snekbox/publish.py deleted file mode 100644 index 0598976..0000000 --- a/snekbox/publish.py +++ /dev/null @@ -1,46 +0,0 @@ -import pika -from config import USERNAME -from config import PASSWORD -from config import HOST -from config import PORT -from config import EXCHANGE -from config import EXCHANGE_TYPE -from config import QUEUE -from config import ROUTING_KEY - -try: - import docker - client = docker.from_env() - containers = client.containers.get('snekbox_pdrmq_1') - print("Attempting to get rabbitmq host automatically") - HOST = list(containers.attrs.get('NetworkSettings').get('Networks').values())[0]['IPAddress'] - print(f"found {HOST}") -except: - pass - -def send(message): - credentials = pika.PlainCredentials(USERNAME, PASSWORD) - connection = pika.BlockingConnection(pika.ConnectionParameters(HOST, PORT, '/', credentials)) - properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1) - - channel = connection.channel() - channel.queue_declare(queue=QUEUE, durable=False) - channel.exchange_declare(exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE) - channel.queue_bind(exchange=EXCHANGE, queue=QUEUE, routing_key=ROUTING_KEY) - - result = channel.basic_publish( - exchange=EXCHANGE, - routing_key=ROUTING_KEY, - body=message, - properties=properties - ) - - if result: - print(f"""Connecting to\nhost: {HOST}\nport: {PORT}\nexchange: {EXCHANGE}\nqueue: {QUEUE}""", flush=True) - print(f"Sent: '{message}'") - else: - print("not delivered") - - connection.close() - -#send('print("bacon is delicious")') diff --git a/snekweb.py b/snekweb.py new file mode 100644 index 0000000..02aca20 --- /dev/null +++ b/snekweb.py @@ -0,0 +1,78 @@ +import traceback +import queue +import threading +import time + +from rmq.publisher import publish +from rmq.consumer import consume + +from flask import Flask +from flask import render_template +from flask_sockets import Sockets + +from config import HOST +from config import PORT +from config import EXCHANGE +from config import EXCHANGE_TYPE +from config import QUEUE +from config import RETURN_QUEUE +from config import ROUTING_KEY + +app = Flask(__name__) +sockets = Sockets(app) +RMQ_queue = queue.Queue(maxsize=0) + +def message_handler(ch, method, properties, body): + msg = body.decode('utf-8') + print(f"incoming: {msg} from rabbitmq", flush=True) + RMQ_queue.put(msg) + ch.basic_ack(delivery_tag = method.delivery_tag) + +def relay_to_ws(ws): + while not ws.closed: + try: + msg = RMQ_queue.get(False) + if msg: + print(f"sending {msg} to user", flush=True) + ws.send(msg) + except queue.Empty: + time.sleep(0.1) + pass + +t1 = threading.Thread(target=consume, kwargs={'host':HOST, 'queue':RETURN_QUEUE, 'callback':message_handler}) +t1.daemon = True +t1.start() + [email protected]('/') +def index(): + return render_template('index.html') + [email protected]('/ws') +def websocket_route(ws): + t2 = threading.Thread(target=relay_to_ws, args=(ws, )) + t2.daemon = True + t2.start() + + try: + while not ws.closed: + + message = ws.receive() + + if not message: + continue + print(f"forwarding '{message}' to rabbitmq") + + publish(message, host=HOST, queue=QUEUE, routingkey=ROUTING_KEY, exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE) + + except: + print(traceback.format_exc()) + + finally: + if not ws.closed: + ws.close() + +if __name__ == '__main__': + from gevent import pywsgi + from geventwebsocket.handler import WebSocketHandler + server = pywsgi.WSGIServer(('0.0.0.0', 5000), app, handler_class=WebSocketHandler) + server.serve_forever() diff --git a/webapp/templates/index.html b/templates/index.html index f2213ff..d0b0630 100644 --- a/webapp/templates/index.html +++ b/templates/index.html @@ -1,16 +1,17 @@ <!DOCTYPE html> <meta charset="utf-8" /> -<title>WebSocket Test</title> +<title>snekboxweb</title> <script language="javascript" type="text/javascript"> +let _ready = false var output; function init(){ output = document.getElementById("output"); - testWebSocket(); + websocketHandler(); } -function testWebSocket(){ +function websocketHandler(){ var here = window.location.host; var wsUri = `ws://${here}/ws`; websocket = new WebSocket(wsUri); @@ -21,25 +22,47 @@ function testWebSocket(){ } function onOpen(evt){ - writeToScreen("CONNECTED"); + _ready = true + console.log("CONNECTED"); } function onClose(evt){ - writeToScreen("DISCONNECTED"); + _ready = false + console.log("DISCONNECTED"); } function onMessage(evt){ writeToScreen('<span style="color: blue;">RESPONSE: ' + evt.data+'</span>'); +} + +function exit(){ websocket.close(); } function onError(evt){ + _ready = false writeToScreen('<span style="color: red;">ERROR:</span> ' + evt.data); } -function sendMessage(message){ - writeToScreen("SENT: " + message); - websocket.send(message); +function sendMessage(msg){ + waitForSocketConnection(function(){ + websocket.send(msg); + }); + console.log("sent message "+msg) +} + +function waitForSocketConnection(callback){ + setTimeout( + function () { + if (_ready === true) { + if(callback != null){ + callback();} + return; + } + else { + waitForSocketConnection(callback);} + + }, 500); // milliseconds } function writeToScreen(message){ @@ -58,7 +81,12 @@ window.addEventListener("load", init, false); </script> -<input type="text" id="field1" value="print('fsdf')"><br> +<textarea rows="4" cols="50" type="text" id="field1"> +def sum(a,b): + return a+b +print( sum(1,2) ) +</textarea> +<br> <button onclick="sendFromInput()">Send</button> - +<button onclick="exit()">disconnect from websocket</button> <div id="output"></div> diff --git a/webapp/rmq.py b/webapp/rmq.py deleted file mode 100644 index 80d418a..0000000 --- a/webapp/rmq.py +++ /dev/null @@ -1,48 +0,0 @@ -import os -import pika - -USERNAME = 'guest' -PASSWORD = 'guest' -HOST = os.environ.get('RMQ_HOST', '172.17.0.2') -PORT = 5672 -EXCHANGE = 'exchange' -EXCHANGE_TYPE = 'direct' -QUEUE = 'text' -ROUTING_KEY = 'bacon' - -try: - import docker - client = docker.from_env() - containers = client.containers.get('snekbox_pdrmq_1') - print("Attempting to get rabbitmq host automatically") - HOST = list(containers.attrs.get('NetworkSettings').get('Networks').values())[0]['IPAddress'] - print(f"found {HOST}") -except: - pass - -def send(message): - credentials = pika.PlainCredentials(USERNAME, PASSWORD) - connection = pika.BlockingConnection(pika.ConnectionParameters(HOST, PORT, '/', credentials)) - properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1) - - channel = connection.channel() - channel.queue_declare(queue=QUEUE, durable=False) - channel.exchange_declare(exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE) - channel.queue_bind(exchange=EXCHANGE, queue=QUEUE, routing_key=ROUTING_KEY) - - result = channel.basic_publish( - exchange=EXCHANGE, - routing_key=ROUTING_KEY, - body=message, - properties=properties - ) - - if result: - print(f"""Connecting to\nhost: {HOST}\nport: {PORT}\nexchange: {EXCHANGE}\nqueue: {QUEUE}""", flush=True) - print(f"Sent: '{message}'") - else: - print("not delivered") - - connection.close() - -#send('print("bacon is delicious")') diff --git a/webapp/webapp.py b/webapp/webapp.py deleted file mode 100644 index 676fc27..0000000 --- a/webapp/webapp.py +++ /dev/null @@ -1,39 +0,0 @@ -import traceback - -from rmq import send as rmq_send -from flask import Flask -from flask import render_template -from flask_sockets import Sockets - -app = Flask(__name__) -sockets = Sockets(app) - [email protected]('/') -def index(): - return render_template('index.html') - [email protected]('/ws') -def websocket_route(ws): - try: - while not ws.closed: - message = ws.receive() - - if not message: - continue - print(f"received '{message}'") - - rmq_send(message) - - except: - print(traceback.format_exec()) - - finally: - if not ws.closed: - ws.close() - -if __name__ == '__main__': - #app.run(host='0.0.0.0', port=5000, debug=True) - from gevent import pywsgi - from geventwebsocket.handler import WebSocketHandler - server = pywsgi.WSGIServer(('0.0.0.0', 5000), app, handler_class=WebSocketHandler) - server.serve_forever() |