aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar Christopher Baklid <[email protected]>2018-05-23 22:38:20 +0200
committerGravatar Christopher Baklid <[email protected]>2018-05-23 22:38:20 +0200
commit309a6f93f878fc96951902fc47d45a30ef5f8d71 (patch)
treef788b43a892a93d0f97da73f459a55b43e1ea1a0
parentupdate readme (diff)
POC completed
-rw-r--r--.dockerignore4
-rw-r--r--Pipfile12
-rw-r--r--README.md15
-rw-r--r--config.py25
-rw-r--r--docker/Dockerfile4
-rw-r--r--docker/Dockerfile.webapp4
-rw-r--r--rmq/__init__.py0
-rw-r--r--rmq/consumer.py35
-rw-r--r--rmq/publisher.py26
-rw-r--r--snekbox.py49
-rw-r--r--snekbox/config.py10
-rw-r--r--snekbox/consume.py75
-rw-r--r--snekbox/publish.py46
-rw-r--r--snekweb.py78
-rw-r--r--templates/index.html (renamed from webapp/templates/index.html)48
-rw-r--r--webapp/rmq.py48
-rw-r--r--webapp/webapp.py39
17 files changed, 276 insertions, 242 deletions
diff --git a/.dockerignore b/.dockerignore
index 210f85f..65555c9 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -13,3 +13,7 @@ Vagrantfile
.gitignore
.travis.yml
docker
+docker-compose.yml
+LICENSE
+README.md
+tox.ini
diff --git a/Pipfile b/Pipfile
index f92847e..54f4847 100644
--- a/Pipfile
+++ b/Pipfile
@@ -17,9 +17,9 @@ gevent-websocket = "*"
python_version = "3.6"
[scripts]
-consume = "python consume.py"
-publish = "python publish.py"
-webapp = "python webapp.py"
-build = "docker build -t pythondiscord/snekbox:latest -f docker/Dockerfile ."
-buildwebapp = "docker build -t pythondiscord/snekboxweb:latest -f docker/Dockerfile.webapp ."
-compose = "docker-compose up"
+snekbox = "python snekbox.py"
+snekweb = "python snekweb.py"
+buildbox = "docker build -t pythondiscord/snekbox:latest -f docker/Dockerfile ."
+buildweb = "docker build -t pythondiscord/snekboxweb:latest -f docker/Dockerfile.webapp ."
+pushbox = "docker push pythondiscord/snekbox:latest"
+pushweb = "docker push pythondiscord/snekboxweb:latest"
diff --git a/README.md b/README.md
index 93d5f18..2bb1a35 100644
--- a/README.md
+++ b/README.md
@@ -43,20 +43,27 @@ docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' rmq
# If not, change the snekbox/config.py file to match
```
+start the webserver
+
+```bash
+docker run --name snekboxweb --network=host -d pythondiscord/snekboxweb:latest
+netstat -plnt
+# tcp 0.0.0.0:5000 LISTEN
+```
+
## Test the code
use two terminals!
```bash
#terminal 1
-pipenv run python snekbox/consume.py
+pipenv run python snekbox.py
#terminal 2
-pipenv run python snekbox/publish.py
+pipenv run python snekweb.py
```
-The publish will put a message on the message queue
-and the consumer will pick it up and do stuff
+`http://localhost:5000`
## Build and run the consumer in a container
diff --git a/config.py b/config.py
new file mode 100644
index 0000000..cb428fe
--- /dev/null
+++ b/config.py
@@ -0,0 +1,25 @@
+import os
+
+def attempt_automatically_finding_the_ip_of_rmq():
+ try:
+ import docker
+ client = docker.from_env()
+ containers = client.containers.get('snekbox_pdrmq_1')
+ HOST = list(containers.attrs.get('NetworkSettings').get('Networks').values())[0]['IPAddress']
+ return HOST
+ except:
+ return '172.17.0.2'
+
+USERNAME = 'guest'
+PASSWORD = 'guest'
+HOST = os.environ.get('RMQ_HOST', attempt_automatically_finding_the_ip_of_rmq())
+PORT = 5672
+EXCHANGE_TYPE = 'direct'
+
+QUEUE = 'input'
+EXCHANGE = QUEUE
+ROUTING_KEY = QUEUE
+
+RETURN_QUEUE = 'return'
+RETURN_EXCHANGE = RETURN_QUEUE
+RETURN_ROUTING_KEY = RETURN_QUEUE
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 47be8b1..88d1919 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -14,10 +14,10 @@ RUN pip install pipenv
RUN mkdir -p /snekbox
COPY Pipfile /snekbox
COPY Pipfile.lock /snekbox
-COPY snekbox /snekbox
+COPY . /snekbox
WORKDIR /snekbox
RUN pipenv sync
ENTRYPOINT ["/sbin/tini", "--"]
-CMD ["pipenv", "run", "consume"]
+CMD ["pipenv", "run", "snekbox"]
diff --git a/docker/Dockerfile.webapp b/docker/Dockerfile.webapp
index ea06f20..55c326d 100644
--- a/docker/Dockerfile.webapp
+++ b/docker/Dockerfile.webapp
@@ -14,7 +14,7 @@ RUN pip install pipenv
RUN mkdir -p /webapp
COPY Pipfile /webapp
COPY Pipfile.lock /webapp
-COPY webapp /webapp
+COPY . /webapp
WORKDIR /webapp
RUN pipenv sync --dev
@@ -22,4 +22,4 @@ RUN pipenv sync --dev
EXPOSE 5000
ENTRYPOINT ["/sbin/tini", "--"]
-CMD ["pipenv", "run", "webapp"]
+CMD ["pipenv", "run", "snekweb"]
diff --git a/rmq/__init__.py b/rmq/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/rmq/__init__.py
diff --git a/rmq/consumer.py b/rmq/consumer.py
new file mode 100644
index 0000000..8e1ce2b
--- /dev/null
+++ b/rmq/consumer.py
@@ -0,0 +1,35 @@
+import time
+import traceback
+import pika
+from pika.exceptions import ConnectionClosed
+
+def consume(username='guest', password='guest', host='localhost', port=5672, queue='', callback=None):
+ while True:
+ credentials = pika.PlainCredentials(username, password)
+ con_params = pika.ConnectionParameters(host, port, '/', credentials)
+
+ try:
+ connection = pika.BlockingConnection(con_params)
+
+ try:
+ channel = connection.channel()
+ channel.queue_declare(queue=queue, durable=False)
+ channel.basic_qos(prefetch_count=1)
+ channel.basic_consume(callback, queue=queue)
+
+ print(f"""Connected to host: {host} port: {port} queue: {queue}""", flush=True)
+
+ channel.start_consuming()
+
+ except:
+ exc = traceback.format_exc()
+ print(exc, flush=True)
+
+ finally:
+ connection.close()
+
+ except ConnectionClosed:
+ print(f"Connection lost, reconnecting to {host}", flush=True)
+ pass
+
+ time.sleep(2)
diff --git a/rmq/publisher.py b/rmq/publisher.py
new file mode 100644
index 0000000..4ba9db9
--- /dev/null
+++ b/rmq/publisher.py
@@ -0,0 +1,26 @@
+import pika
+
+def publish(message, username='guest', password='guest', host='localhost', port=5672, queue='', routingkey='', exchange='', exchange_type=''):
+ credentials = pika.PlainCredentials(username, password)
+ connection = pika.BlockingConnection(pika.ConnectionParameters(host, port, '/', credentials))
+ properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1)
+
+ channel = connection.channel()
+ channel.queue_declare(queue=queue, durable=False)
+ channel.exchange_declare(exchange=exchange, exchange_type=exchange_type)
+ channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey)
+
+ result = channel.basic_publish(
+ exchange=exchange,
+ routing_key=routingkey,
+ body=message,
+ properties=properties
+ )
+
+ if result:
+ print(f"Connecting to host: {host} port: {port} exchange: {exchange} queue: {queue}", flush=True)
+ else:
+ print("not delivered")
+
+ connection.close()
+
diff --git a/snekbox.py b/snekbox.py
new file mode 100644
index 0000000..b58447e
--- /dev/null
+++ b/snekbox.py
@@ -0,0 +1,49 @@
+import traceback
+import sys
+import time
+import pika
+import io
+
+from rmq.consumer import consume
+from rmq.publisher import publish
+
+from config import USERNAME
+from config import PASSWORD
+from config import HOST
+from config import PORT
+from config import EXCHANGE
+from config import EXCHANGE_TYPE
+from config import QUEUE
+from config import RETURN_QUEUE
+from config import RETURN_EXCHANGE
+from config import RETURN_ROUTING_KEY
+
+def execute(snippet):
+ old_stdout = sys.stdout
+ redirected_output = sys.stdout = io.StringIO()
+ failed = False
+ try:
+ exec(snippet)
+ except Exception as e:
+ failed = str(e)
+ finally:
+ sys.stdout = old_stdout
+
+ if failed:
+ return failed.strip()
+ return redirected_output.getvalue().strip()
+
+
+def message_handler(ch, method, properties, body):
+ msg = body.decode('utf-8')
+
+ # Execute code snippets here
+ print(f"incoming: {msg}", flush=True)
+ result = execute(msg)
+ print(f"outgoing: {result}", flush=True)
+ publish(result, host=HOST, queue=RETURN_QUEUE, routingkey=RETURN_ROUTING_KEY, exchange=RETURN_EXCHANGE, exchange_type=EXCHANGE_TYPE)
+
+ ch.basic_ack(delivery_tag = method.delivery_tag)
+
+if __name__ == '__main__':
+ consume(host=HOST, queue=QUEUE, callback=message_handler)
diff --git a/snekbox/config.py b/snekbox/config.py
deleted file mode 100644
index d0bd4f6..0000000
--- a/snekbox/config.py
+++ /dev/null
@@ -1,10 +0,0 @@
-import os
-
-USERNAME = 'guest'
-PASSWORD = 'guest'
-HOST = os.environ.get('RMQ_HOST', '172.17.0.2')
-PORT = 5672
-EXCHANGE = 'exchange'
-EXCHANGE_TYPE = 'direct'
-QUEUE = 'text'
-ROUTING_KEY = 'bacon'
diff --git a/snekbox/consume.py b/snekbox/consume.py
deleted file mode 100644
index d44a018..0000000
--- a/snekbox/consume.py
+++ /dev/null
@@ -1,75 +0,0 @@
-import traceback
-import sys
-import time
-import pika
-import io
-
-from pika.exceptions import ConnectionClosed
-
-from config import USERNAME
-from config import PASSWORD
-from config import HOST
-from config import PORT
-from config import EXCHANGE
-from config import QUEUE
-
-def execute(snippet):
- old_stdout = sys.stdout
- redirected_output = sys.stdout = io.StringIO()
- failed = False
- try:
- exec(snippet)
- except Exception as e:
- failed = e
- finally:
- sys.stdout = old_stdout
-
- if failed:
- return failed
- return redirected_output.getvalue()
-
-
-def message_handler(ch, method, properties, body):
- msg = body.decode('utf-8')
-
- # Execute code snippets here
- print(f"incoming: {msg}", flush=True)
- result = execute(msg)
- print(result, flush=True)
-
- ch.basic_ack(delivery_tag = method.delivery_tag)
-
-def rabbitmq_consume():
-
- while True:
- credentials = pika.PlainCredentials(USERNAME, PASSWORD)
- con_params = pika.ConnectionParameters(HOST, PORT, '/', credentials)
-
- try:
- connection = pika.BlockingConnection(con_params)
-
- try:
- channel = connection.channel()
- channel.queue_declare(queue=QUEUE, durable=False)
- channel.basic_qos(prefetch_count=1)
- channel.basic_consume(message_handler, queue=QUEUE)
-
- print(f"""Connected to \nhost: {HOST}\nport: {PORT}\nexchange: {EXCHANGE}\nqueue: {QUEUE}""", flush=True)
-
- channel.start_consuming()
-
- except:
- exc = traceback.format_exc()
- print(exc, flush=True)
-
- finally:
- connection.close()
-
- except ConnectionClosed:
- print(f"Connection lost, reconnecting to {HOST}", flush=True)
- pass
-
- time.sleep(2)
-
-if __name__ == '__main__':
- rabbitmq_consume()
diff --git a/snekbox/publish.py b/snekbox/publish.py
deleted file mode 100644
index 0598976..0000000
--- a/snekbox/publish.py
+++ /dev/null
@@ -1,46 +0,0 @@
-import pika
-from config import USERNAME
-from config import PASSWORD
-from config import HOST
-from config import PORT
-from config import EXCHANGE
-from config import EXCHANGE_TYPE
-from config import QUEUE
-from config import ROUTING_KEY
-
-try:
- import docker
- client = docker.from_env()
- containers = client.containers.get('snekbox_pdrmq_1')
- print("Attempting to get rabbitmq host automatically")
- HOST = list(containers.attrs.get('NetworkSettings').get('Networks').values())[0]['IPAddress']
- print(f"found {HOST}")
-except:
- pass
-
-def send(message):
- credentials = pika.PlainCredentials(USERNAME, PASSWORD)
- connection = pika.BlockingConnection(pika.ConnectionParameters(HOST, PORT, '/', credentials))
- properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1)
-
- channel = connection.channel()
- channel.queue_declare(queue=QUEUE, durable=False)
- channel.exchange_declare(exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE)
- channel.queue_bind(exchange=EXCHANGE, queue=QUEUE, routing_key=ROUTING_KEY)
-
- result = channel.basic_publish(
- exchange=EXCHANGE,
- routing_key=ROUTING_KEY,
- body=message,
- properties=properties
- )
-
- if result:
- print(f"""Connecting to\nhost: {HOST}\nport: {PORT}\nexchange: {EXCHANGE}\nqueue: {QUEUE}""", flush=True)
- print(f"Sent: '{message}'")
- else:
- print("not delivered")
-
- connection.close()
-
-#send('print("bacon is delicious")')
diff --git a/snekweb.py b/snekweb.py
new file mode 100644
index 0000000..02aca20
--- /dev/null
+++ b/snekweb.py
@@ -0,0 +1,78 @@
+import traceback
+import queue
+import threading
+import time
+
+from rmq.publisher import publish
+from rmq.consumer import consume
+
+from flask import Flask
+from flask import render_template
+from flask_sockets import Sockets
+
+from config import HOST
+from config import PORT
+from config import EXCHANGE
+from config import EXCHANGE_TYPE
+from config import QUEUE
+from config import RETURN_QUEUE
+from config import ROUTING_KEY
+
+app = Flask(__name__)
+sockets = Sockets(app)
+RMQ_queue = queue.Queue(maxsize=0)
+
+def message_handler(ch, method, properties, body):
+ msg = body.decode('utf-8')
+ print(f"incoming: {msg} from rabbitmq", flush=True)
+ RMQ_queue.put(msg)
+ ch.basic_ack(delivery_tag = method.delivery_tag)
+
+def relay_to_ws(ws):
+ while not ws.closed:
+ try:
+ msg = RMQ_queue.get(False)
+ if msg:
+ print(f"sending {msg} to user", flush=True)
+ ws.send(msg)
+ except queue.Empty:
+ time.sleep(0.1)
+ pass
+
+t1 = threading.Thread(target=consume, kwargs={'host':HOST, 'queue':RETURN_QUEUE, 'callback':message_handler})
+t1.daemon = True
+t1.start()
+
+def index():
+ return render_template('index.html')
+
+def websocket_route(ws):
+ t2 = threading.Thread(target=relay_to_ws, args=(ws, ))
+ t2.daemon = True
+ t2.start()
+
+ try:
+ while not ws.closed:
+
+ message = ws.receive()
+
+ if not message:
+ continue
+ print(f"forwarding '{message}' to rabbitmq")
+
+ publish(message, host=HOST, queue=QUEUE, routingkey=ROUTING_KEY, exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE)
+
+ except:
+ print(traceback.format_exc())
+
+ finally:
+ if not ws.closed:
+ ws.close()
+
+if __name__ == '__main__':
+ from gevent import pywsgi
+ from geventwebsocket.handler import WebSocketHandler
+ server = pywsgi.WSGIServer(('0.0.0.0', 5000), app, handler_class=WebSocketHandler)
+ server.serve_forever()
diff --git a/webapp/templates/index.html b/templates/index.html
index f2213ff..d0b0630 100644
--- a/webapp/templates/index.html
+++ b/templates/index.html
@@ -1,16 +1,17 @@
<!DOCTYPE html>
<meta charset="utf-8" />
-<title>WebSocket Test</title>
+<title>snekboxweb</title>
<script language="javascript" type="text/javascript">
+let _ready = false
var output;
function init(){
output = document.getElementById("output");
- testWebSocket();
+ websocketHandler();
}
-function testWebSocket(){
+function websocketHandler(){
var here = window.location.host;
var wsUri = `ws://${here}/ws`;
websocket = new WebSocket(wsUri);
@@ -21,25 +22,47 @@ function testWebSocket(){
}
function onOpen(evt){
- writeToScreen("CONNECTED");
+ _ready = true
+ console.log("CONNECTED");
}
function onClose(evt){
- writeToScreen("DISCONNECTED");
+ _ready = false
+ console.log("DISCONNECTED");
}
function onMessage(evt){
writeToScreen('<span style="color: blue;">RESPONSE: ' + evt.data+'</span>');
+}
+
+function exit(){
websocket.close();
}
function onError(evt){
+ _ready = false
writeToScreen('<span style="color: red;">ERROR:</span> ' + evt.data);
}
-function sendMessage(message){
- writeToScreen("SENT: " + message);
- websocket.send(message);
+function sendMessage(msg){
+ waitForSocketConnection(function(){
+ websocket.send(msg);
+ });
+ console.log("sent message "+msg)
+}
+
+function waitForSocketConnection(callback){
+ setTimeout(
+ function () {
+ if (_ready === true) {
+ if(callback != null){
+ callback();}
+ return;
+ }
+ else {
+ waitForSocketConnection(callback);}
+
+ }, 500); // milliseconds
}
function writeToScreen(message){
@@ -58,7 +81,12 @@ window.addEventListener("load", init, false);
</script>
-<input type="text" id="field1" value="print('fsdf')"><br>
+<textarea rows="4" cols="50" type="text" id="field1">
+def sum(a,b):
+ return a+b
+print( sum(1,2) )
+</textarea>
+<br>
<button onclick="sendFromInput()">Send</button>
-
+<button onclick="exit()">disconnect from websocket</button>
<div id="output"></div>
diff --git a/webapp/rmq.py b/webapp/rmq.py
deleted file mode 100644
index 80d418a..0000000
--- a/webapp/rmq.py
+++ /dev/null
@@ -1,48 +0,0 @@
-import os
-import pika
-
-USERNAME = 'guest'
-PASSWORD = 'guest'
-HOST = os.environ.get('RMQ_HOST', '172.17.0.2')
-PORT = 5672
-EXCHANGE = 'exchange'
-EXCHANGE_TYPE = 'direct'
-QUEUE = 'text'
-ROUTING_KEY = 'bacon'
-
-try:
- import docker
- client = docker.from_env()
- containers = client.containers.get('snekbox_pdrmq_1')
- print("Attempting to get rabbitmq host automatically")
- HOST = list(containers.attrs.get('NetworkSettings').get('Networks').values())[0]['IPAddress']
- print(f"found {HOST}")
-except:
- pass
-
-def send(message):
- credentials = pika.PlainCredentials(USERNAME, PASSWORD)
- connection = pika.BlockingConnection(pika.ConnectionParameters(HOST, PORT, '/', credentials))
- properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1)
-
- channel = connection.channel()
- channel.queue_declare(queue=QUEUE, durable=False)
- channel.exchange_declare(exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE)
- channel.queue_bind(exchange=EXCHANGE, queue=QUEUE, routing_key=ROUTING_KEY)
-
- result = channel.basic_publish(
- exchange=EXCHANGE,
- routing_key=ROUTING_KEY,
- body=message,
- properties=properties
- )
-
- if result:
- print(f"""Connecting to\nhost: {HOST}\nport: {PORT}\nexchange: {EXCHANGE}\nqueue: {QUEUE}""", flush=True)
- print(f"Sent: '{message}'")
- else:
- print("not delivered")
-
- connection.close()
-
-#send('print("bacon is delicious")')
diff --git a/webapp/webapp.py b/webapp/webapp.py
deleted file mode 100644
index 676fc27..0000000
--- a/webapp/webapp.py
+++ /dev/null
@@ -1,39 +0,0 @@
-import traceback
-
-from rmq import send as rmq_send
-from flask import Flask
-from flask import render_template
-from flask_sockets import Sockets
-
-app = Flask(__name__)
-sockets = Sockets(app)
-
-def index():
- return render_template('index.html')
-
-def websocket_route(ws):
- try:
- while not ws.closed:
- message = ws.receive()
-
- if not message:
- continue
- print(f"received '{message}'")
-
- rmq_send(message)
-
- except:
- print(traceback.format_exec())
-
- finally:
- if not ws.closed:
- ws.close()
-
-if __name__ == '__main__':
- #app.run(host='0.0.0.0', port=5000, debug=True)
- from gevent import pywsgi
- from geventwebsocket.handler import WebSocketHandler
- server = pywsgi.WSGIServer(('0.0.0.0', 5000), app, handler_class=WebSocketHandler)
- server.serve_forever()