aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar Christopher Baklid <[email protected]>2018-05-26 16:37:56 +0200
committerGravatar Christopher Baklid <[email protected]>2018-05-26 16:37:56 +0200
commit1f4b827b439e236a24a811404dcb5c2e278f206d (patch)
tree4fa1251fe338a38dc3f955fa7783cac497b563ca
parentautomatically clean up queues and message to reduce rabbitmq memory footprint (diff)
add rabbitmq mgmt webinterface, handle container autodiscovery better, minor code optimisations, update readme
-rw-r--r--README.md4
-rw-r--r--config.py24
-rw-r--r--docker-compose.yml13
-rw-r--r--rmq.py54
-rw-r--r--snekweb.py7
5 files changed, 65 insertions, 37 deletions
diff --git a/README.md b/README.md
index 031e817..52ab8ef 100644
--- a/README.md
+++ b/README.md
@@ -36,12 +36,14 @@ pipenv sync --dev
Start a rabbitmq instance and get the container IP
```bash
-docker run --name rmq -d rabbitmq:3.7.5-alpine
+docker run -d --name rmq -p 15672:15672 -e RABBITMQ_DEFAULT_USER=rabbits -e RABBITMQ_DEFAULT_PASS=rabbits rabbitmq:3.7.5-management-alpine
docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' rmq
# expected output with default setting: 172.17.0.2
# If not, change the config.py file to match
```
+rabbitmq webinterface: `http://localhost:15672`
+
start the webserver
```bash
diff --git a/config.py b/config.py
index f6ae0f7..beb435e 100644
--- a/config.py
+++ b/config.py
@@ -1,27 +1,25 @@
import os
-def attempt_automatically_finding_the_ip_of_rmq():
+def autodiscover():
+ container_names = ["rmq", "pdrmq", "snekbox_pdrmq_1"]
try:
import docker
client = docker.from_env()
- containers = client.containers.get('snekbox_pdrmq_1')
- HOST = list(containers.attrs.get('NetworkSettings').get('Networks').values())[0]['IPAddress']
- return HOST
+ for name in container_names:
+ container = client.containers.get(name)
+ if container.status == "running":
+ host = list(container.attrs.get('NetworkSettings').get('Networks').values())[0]['IPAddress']
+ return host
except Exception:
return '172.17.0.2'
-USERNAME = 'guest'
-PASSWORD = 'guest'
-HOST = os.environ.get('RMQ_HOST', attempt_automatically_finding_the_ip_of_rmq())
+USERNAME = os.environ.get('RMQ_USERNAME', 'rabbits')
+PASSWORD = os.environ.get('RMQ_PASSWORD', 'rabbits')
+HOST = os.environ.get('RMQ_HOST', autodiscover())
PORT = 5672
-EXCHANGE_TYPE = 'direct'
-
QUEUE = 'input'
EXCHANGE = QUEUE
ROUTING_KEY = QUEUE
-
-RETURN_QUEUE = 'return'
-RETURN_EXCHANGE = RETURN_QUEUE
-RETURN_ROUTING_KEY = RETURN_QUEUE
+EXCHANGE_TYPE = 'direct'
diff --git a/docker-compose.yml b/docker-compose.yml
index 667a700..67dbdde 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -2,9 +2,16 @@ version: '3'
services:
pdrmq:
hostname: "pdrmq"
- image: rabbitmq:3.7.5-alpine
+ image: rabbitmq:3.7.5-management-alpine
+ expose:
+ - "15672"
+ ports:
+ - "15672:15672"
networks:
- sneknet
+ environment:
+ RABBITMQ_DEFAULT_USER: rabbits
+ RABBITMQ_DEFAULT_PASS: rabbits
pdsnekbox:
hostname: "pdsnekbox"
@@ -13,6 +20,8 @@ services:
- sneknet
environment:
RMQ_HOST: pdrmq
+ RMQ_USERNAME: rabbits
+ RMQ_PASSWORD: rabbits
pdsnekboxweb:
hostname: "pdsnekboxweb"
@@ -25,6 +34,8 @@ services:
- "5000"
environment:
RMQ_HOST: pdrmq
+ RMQ_USERNAME: rabbits
+ RMQ_PASSWORD: rabbits
networks:
diff --git a/rmq.py b/rmq.py
index e52e649..94f2217 100644
--- a/rmq.py
+++ b/rmq.py
@@ -34,6 +34,13 @@ class Rmq(object):
self.con_params = pika.ConnectionParameters(self.host, self.port, '/', self.credentials)
self.properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1)
+ def _declare(self, channel, queue):
+ channel.queue_declare(
+ queue=queue,
+ durable=False, # Do not commit messages to disk
+ arguments={'x-message-ttl': 5000}, # Delete message automatically after x milliseconds
+ auto_delete=True) # Delete queue when all connection are closed
+
def consume(self, queue=QUEUE, callback=None, thread_ws=None):
while True:
@@ -42,11 +49,7 @@ class Rmq(object):
try:
channel = connection.channel()
- channel.queue_declare(queue=queue,
- durable=False, # Do not commit to disk
- arguments={'x-message-ttl': 5000}, # a message is automatically deleted after x milliseconds if unacknowledged # NOQA
- auto_delete=True # Delete queue when all connection are closed
- )
+ self._declare(channel, queue)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
lambda ch, method, properties, body:
@@ -54,8 +57,10 @@ class Rmq(object):
queue=queue)
log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}")
+
if thread_ws:
- thread_ws.send('{"service": "connected"}')
+ if not thread_ws.closed:
+ thread_ws.send('{"service": "connected"}')
channel.start_consuming()
except Exception:
@@ -67,9 +72,11 @@ class Rmq(object):
except ConnectionClosed:
if thread_ws:
- log.error(f"Connection to {self.host} could not be established")
- thread_ws.send('{"service": "disconnected"}')
- exit(1)
+ if not thread_ws.closed:
+ log.error(f"Connection to {self.host} could not be established")
+ thread_ws.send('{"service": "disconnected"}')
+ exit(1)
+
log.error(f"Connection lost, reconnecting to {self.host}")
time.sleep(2)
@@ -81,27 +88,34 @@ class Rmq(object):
exchange=EXCHANGE):
try:
- connection = pika.BlockingConnection(pika.ConnectionParameters(self.host, self.port, '/', self.credentials))
+ connection = pika.BlockingConnection(self.con_params)
try:
channel = connection.channel()
- channel.queue_declare(queue=queue,
- durable=False, # Do not commit to disk
- arguments={'x-message-ttl': 5000}, # a message is automatically deleted after x milliseconds if unacknowledged # NOQA
- auto_delete=True # Delete queue when all connection are closed
- )
- channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type)
- channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey)
+
+ self._declare(channel, queue)
+
+ channel.exchange_declare(
+ exchange=exchange,
+ exchange_type=self.exchange_type)
+
+ channel.queue_bind(
+ exchange=exchange,
+ queue=queue,
+ routing_key=routingkey)
result = channel.basic_publish(
exchange=exchange,
routing_key=routingkey,
body=message,
- properties=self.properties
- )
+ properties=self.properties)
if result:
- log.info(f"published message: {self.host} port: {self.port} exchange: {exchange} queue: {queue} message: {message}") # NOQA
+ log.info((f"published message: {self.host} "
+ f"port: {self.port} "
+ f"exchange: {exchange} "
+ f"queue: {queue} "
+ f"message: {message}"))
else:
log.error(f"Messge '{message}' not delivered")
diff --git a/snekweb.py b/snekweb.py
index d7ed0d2..2c0bb91 100644
--- a/snekweb.py
+++ b/snekweb.py
@@ -36,14 +36,17 @@ def websocket_route(ws, snekboxid):
def message_handler(ch, method, properties, body, thread_ws):
msg = body.decode('utf-8')
- log.debug(f"message_handler: {msg}")
thread_ws.send(msg)
ch.basic_ack(delivery_tag=method.delivery_tag)
consumer_parameters = {'queue': snekboxid,
'callback': message_handler,
'thread_ws': localdata.thread_ws}
- consumer = threading.Thread(target=rmq.consume, kwargs=consumer_parameters)
+
+ consumer = threading.Thread(
+ target=rmq.consume,
+ kwargs=consumer_parameters)
+
consumer.daemon = True
consumer.start()