aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar Christopher Baklid <[email protected]>2018-05-26 16:37:56 +0200
committerGravatar Christopher Baklid <[email protected]>2018-05-26 16:37:56 +0200
commit1f4b827b439e236a24a811404dcb5c2e278f206d (patch)
tree4fa1251fe338a38dc3f955fa7783cac497b563ca
parentautomatically clean up queues and message to reduce rabbitmq memory footprint (diff)
add rabbitmq mgmt webinterface, handle container autodiscovery better, minor code optimisations, update readme
Diffstat (limited to '')
-rw-r--r--README.md4
-rw-r--r--config.py24
-rw-r--r--docker-compose.yml13
-rw-r--r--rmq.py54
-rw-r--r--snekweb.py7
5 files changed, 65 insertions, 37 deletions
diff --git a/README.md b/README.md
index 031e817..52ab8ef 100644
--- a/README.md
+++ b/README.md
@@ -36,12 +36,14 @@ pipenv sync --dev
Start a rabbitmq instance and get the container IP
```bash
-docker run --name rmq -d rabbitmq:3.7.5-alpine
+docker run -d --name rmq -p 15672:15672 -e RABBITMQ_DEFAULT_USER=rabbits -e RABBITMQ_DEFAULT_PASS=rabbits rabbitmq:3.7.5-management-alpine
docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' rmq
# expected output with default setting: 172.17.0.2
# If not, change the config.py file to match
```
+rabbitmq webinterface: `http://localhost:15672`
+
start the webserver
```bash
diff --git a/config.py b/config.py
index f6ae0f7..beb435e 100644
--- a/config.py
+++ b/config.py
@@ -1,27 +1,25 @@
import os
-def attempt_automatically_finding_the_ip_of_rmq():
+def autodiscover():
+ container_names = ["rmq", "pdrmq", "snekbox_pdrmq_1"]
try:
import docker
client = docker.from_env()
- containers = client.containers.get('snekbox_pdrmq_1')
- HOST = list(containers.attrs.get('NetworkSettings').get('Networks').values())[0]['IPAddress']
- return HOST
+ for name in container_names:
+ container = client.containers.get(name)
+ if container.status == "running":
+ host = list(container.attrs.get('NetworkSettings').get('Networks').values())[0]['IPAddress']
+ return host
except Exception:
return '172.17.0.2'
-USERNAME = 'guest'
-PASSWORD = 'guest'
-HOST = os.environ.get('RMQ_HOST', attempt_automatically_finding_the_ip_of_rmq())
+USERNAME = os.environ.get('RMQ_USERNAME', 'rabbits')
+PASSWORD = os.environ.get('RMQ_PASSWORD', 'rabbits')
+HOST = os.environ.get('RMQ_HOST', autodiscover())
PORT = 5672
-EXCHANGE_TYPE = 'direct'
-
QUEUE = 'input'
EXCHANGE = QUEUE
ROUTING_KEY = QUEUE
-
-RETURN_QUEUE = 'return'
-RETURN_EXCHANGE = RETURN_QUEUE
-RETURN_ROUTING_KEY = RETURN_QUEUE
+EXCHANGE_TYPE = 'direct'
diff --git a/docker-compose.yml b/docker-compose.yml
index 667a700..67dbdde 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -2,9 +2,16 @@ version: '3'
services:
pdrmq:
hostname: "pdrmq"
- image: rabbitmq:3.7.5-alpine
+ image: rabbitmq:3.7.5-management-alpine
+ expose:
+ - "15672"
+ ports:
+ - "15672:15672"
networks:
- sneknet
+ environment:
+ RABBITMQ_DEFAULT_USER: rabbits
+ RABBITMQ_DEFAULT_PASS: rabbits
pdsnekbox:
hostname: "pdsnekbox"
@@ -13,6 +20,8 @@ services:
- sneknet
environment:
RMQ_HOST: pdrmq
+ RMQ_USERNAME: rabbits
+ RMQ_PASSWORD: rabbits
pdsnekboxweb:
hostname: "pdsnekboxweb"
@@ -25,6 +34,8 @@ services:
- "5000"
environment:
RMQ_HOST: pdrmq
+ RMQ_USERNAME: rabbits
+ RMQ_PASSWORD: rabbits
networks:
diff --git a/rmq.py b/rmq.py
index e52e649..94f2217 100644
--- a/rmq.py
+++ b/rmq.py
@@ -34,6 +34,13 @@ class Rmq(object):
self.con_params = pika.ConnectionParameters(self.host, self.port, '/', self.credentials)
self.properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1)
+ def _declare(self, channel, queue):
+ channel.queue_declare(
+ queue=queue,
+ durable=False, # Do not commit messages to disk
+ arguments={'x-message-ttl': 5000}, # Delete message automatically after x milliseconds
+ auto_delete=True) # Delete queue when all connection are closed
+
def consume(self, queue=QUEUE, callback=None, thread_ws=None):
while True:
@@ -42,11 +49,7 @@ class Rmq(object):
try:
channel = connection.channel()
- channel.queue_declare(queue=queue,
- durable=False, # Do not commit to disk
- arguments={'x-message-ttl': 5000}, # a message is automatically deleted after x milliseconds if unacknowledged # NOQA
- auto_delete=True # Delete queue when all connection are closed
- )
+ self._declare(channel, queue)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
lambda ch, method, properties, body:
@@ -54,8 +57,10 @@ class Rmq(object):
queue=queue)
log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}")
+
if thread_ws:
- thread_ws.send('{"service": "connected"}')
+ if not thread_ws.closed:
+ thread_ws.send('{"service": "connected"}')
channel.start_consuming()
except Exception:
@@ -67,9 +72,11 @@ class Rmq(object):
except ConnectionClosed:
if thread_ws:
- log.error(f"Connection to {self.host} could not be established")
- thread_ws.send('{"service": "disconnected"}')
- exit(1)
+ if not thread_ws.closed:
+ log.error(f"Connection to {self.host} could not be established")
+ thread_ws.send('{"service": "disconnected"}')
+ exit(1)
+
log.error(f"Connection lost, reconnecting to {self.host}")
time.sleep(2)
@@ -81,27 +88,34 @@ class Rmq(object):
exchange=EXCHANGE):
try:
- connection = pika.BlockingConnection(pika.ConnectionParameters(self.host, self.port, '/', self.credentials))
+ connection = pika.BlockingConnection(self.con_params)
try:
channel = connection.channel()
- channel.queue_declare(queue=queue,
- durable=False, # Do not commit to disk
- arguments={'x-message-ttl': 5000}, # a message is automatically deleted after x milliseconds if unacknowledged # NOQA
- auto_delete=True # Delete queue when all connection are closed
- )
- channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type)
- channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey)
+
+ self._declare(channel, queue)
+
+ channel.exchange_declare(
+ exchange=exchange,
+ exchange_type=self.exchange_type)
+
+ channel.queue_bind(
+ exchange=exchange,
+ queue=queue,
+ routing_key=routingkey)
result = channel.basic_publish(
exchange=exchange,
routing_key=routingkey,
body=message,
- properties=self.properties
- )
+ properties=self.properties)
if result:
- log.info(f"published message: {self.host} port: {self.port} exchange: {exchange} queue: {queue} message: {message}") # NOQA
+ log.info((f"published message: {self.host} "
+ f"port: {self.port} "
+ f"exchange: {exchange} "
+ f"queue: {queue} "
+ f"message: {message}"))
else:
log.error(f"Messge '{message}' not delivered")
diff --git a/snekweb.py b/snekweb.py
index d7ed0d2..2c0bb91 100644
--- a/snekweb.py
+++ b/snekweb.py
@@ -36,14 +36,17 @@ def websocket_route(ws, snekboxid):
def message_handler(ch, method, properties, body, thread_ws):
msg = body.decode('utf-8')
- log.debug(f"message_handler: {msg}")
thread_ws.send(msg)
ch.basic_ack(delivery_tag=method.delivery_tag)
consumer_parameters = {'queue': snekboxid,
'callback': message_handler,
'thread_ws': localdata.thread_ws}
- consumer = threading.Thread(target=rmq.consume, kwargs=consumer_parameters)
+
+ consumer = threading.Thread(
+ target=rmq.consume,
+ kwargs=consumer_parameters)
+
consumer.daemon = True
consumer.start()