diff options
author | 2018-05-26 16:37:56 +0200 | |
---|---|---|
committer | 2018-05-26 16:37:56 +0200 | |
commit | 1f4b827b439e236a24a811404dcb5c2e278f206d (patch) | |
tree | 4fa1251fe338a38dc3f955fa7783cac497b563ca | |
parent | automatically clean up queues and message to reduce rabbitmq memory footprint (diff) |
add rabbitmq mgmt webinterface, handle container autodiscovery better, minor code optimisations, update readme
-rw-r--r-- | README.md | 4 | ||||
-rw-r--r-- | config.py | 24 | ||||
-rw-r--r-- | docker-compose.yml | 13 | ||||
-rw-r--r-- | rmq.py | 54 | ||||
-rw-r--r-- | snekweb.py | 7 |
5 files changed, 65 insertions, 37 deletions
@@ -36,12 +36,14 @@ pipenv sync --dev Start a rabbitmq instance and get the container IP ```bash -docker run --name rmq -d rabbitmq:3.7.5-alpine +docker run -d --name rmq -p 15672:15672 -e RABBITMQ_DEFAULT_USER=rabbits -e RABBITMQ_DEFAULT_PASS=rabbits rabbitmq:3.7.5-management-alpine docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' rmq # expected output with default setting: 172.17.0.2 # If not, change the config.py file to match ``` +rabbitmq webinterface: `http://localhost:15672` + start the webserver ```bash @@ -1,27 +1,25 @@ import os -def attempt_automatically_finding_the_ip_of_rmq(): +def autodiscover(): + container_names = ["rmq", "pdrmq", "snekbox_pdrmq_1"] try: import docker client = docker.from_env() - containers = client.containers.get('snekbox_pdrmq_1') - HOST = list(containers.attrs.get('NetworkSettings').get('Networks').values())[0]['IPAddress'] - return HOST + for name in container_names: + container = client.containers.get(name) + if container.status == "running": + host = list(container.attrs.get('NetworkSettings').get('Networks').values())[0]['IPAddress'] + return host except Exception: return '172.17.0.2' -USERNAME = 'guest' -PASSWORD = 'guest' -HOST = os.environ.get('RMQ_HOST', attempt_automatically_finding_the_ip_of_rmq()) +USERNAME = os.environ.get('RMQ_USERNAME', 'rabbits') +PASSWORD = os.environ.get('RMQ_PASSWORD', 'rabbits') +HOST = os.environ.get('RMQ_HOST', autodiscover()) PORT = 5672 -EXCHANGE_TYPE = 'direct' - QUEUE = 'input' EXCHANGE = QUEUE ROUTING_KEY = QUEUE - -RETURN_QUEUE = 'return' -RETURN_EXCHANGE = RETURN_QUEUE -RETURN_ROUTING_KEY = RETURN_QUEUE +EXCHANGE_TYPE = 'direct' diff --git a/docker-compose.yml b/docker-compose.yml index 667a700..67dbdde 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,9 +2,16 @@ version: '3' services: pdrmq: hostname: "pdrmq" - image: rabbitmq:3.7.5-alpine + image: rabbitmq:3.7.5-management-alpine + expose: + - "15672" + ports: + - "15672:15672" networks: - sneknet + environment: + RABBITMQ_DEFAULT_USER: rabbits + RABBITMQ_DEFAULT_PASS: rabbits pdsnekbox: hostname: "pdsnekbox" @@ -13,6 +20,8 @@ services: - sneknet environment: RMQ_HOST: pdrmq + RMQ_USERNAME: rabbits + RMQ_PASSWORD: rabbits pdsnekboxweb: hostname: "pdsnekboxweb" @@ -25,6 +34,8 @@ services: - "5000" environment: RMQ_HOST: pdrmq + RMQ_USERNAME: rabbits + RMQ_PASSWORD: rabbits networks: @@ -34,6 +34,13 @@ class Rmq(object): self.con_params = pika.ConnectionParameters(self.host, self.port, '/', self.credentials) self.properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1) + def _declare(self, channel, queue): + channel.queue_declare( + queue=queue, + durable=False, # Do not commit messages to disk + arguments={'x-message-ttl': 5000}, # Delete message automatically after x milliseconds + auto_delete=True) # Delete queue when all connection are closed + def consume(self, queue=QUEUE, callback=None, thread_ws=None): while True: @@ -42,11 +49,7 @@ class Rmq(object): try: channel = connection.channel() - channel.queue_declare(queue=queue, - durable=False, # Do not commit to disk - arguments={'x-message-ttl': 5000}, # a message is automatically deleted after x milliseconds if unacknowledged # NOQA - auto_delete=True # Delete queue when all connection are closed - ) + self._declare(channel, queue) channel.basic_qos(prefetch_count=1) channel.basic_consume( lambda ch, method, properties, body: @@ -54,8 +57,10 @@ class Rmq(object): queue=queue) log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}") + if thread_ws: - thread_ws.send('{"service": "connected"}') + if not thread_ws.closed: + thread_ws.send('{"service": "connected"}') channel.start_consuming() except Exception: @@ -67,9 +72,11 @@ class Rmq(object): except ConnectionClosed: if thread_ws: - log.error(f"Connection to {self.host} could not be established") - thread_ws.send('{"service": "disconnected"}') - exit(1) + if not thread_ws.closed: + log.error(f"Connection to {self.host} could not be established") + thread_ws.send('{"service": "disconnected"}') + exit(1) + log.error(f"Connection lost, reconnecting to {self.host}") time.sleep(2) @@ -81,27 +88,34 @@ class Rmq(object): exchange=EXCHANGE): try: - connection = pika.BlockingConnection(pika.ConnectionParameters(self.host, self.port, '/', self.credentials)) + connection = pika.BlockingConnection(self.con_params) try: channel = connection.channel() - channel.queue_declare(queue=queue, - durable=False, # Do not commit to disk - arguments={'x-message-ttl': 5000}, # a message is automatically deleted after x milliseconds if unacknowledged # NOQA - auto_delete=True # Delete queue when all connection are closed - ) - channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type) - channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey) + + self._declare(channel, queue) + + channel.exchange_declare( + exchange=exchange, + exchange_type=self.exchange_type) + + channel.queue_bind( + exchange=exchange, + queue=queue, + routing_key=routingkey) result = channel.basic_publish( exchange=exchange, routing_key=routingkey, body=message, - properties=self.properties - ) + properties=self.properties) if result: - log.info(f"published message: {self.host} port: {self.port} exchange: {exchange} queue: {queue} message: {message}") # NOQA + log.info((f"published message: {self.host} " + f"port: {self.port} " + f"exchange: {exchange} " + f"queue: {queue} " + f"message: {message}")) else: log.error(f"Messge '{message}' not delivered") @@ -36,14 +36,17 @@ def websocket_route(ws, snekboxid): def message_handler(ch, method, properties, body, thread_ws): msg = body.decode('utf-8') - log.debug(f"message_handler: {msg}") thread_ws.send(msg) ch.basic_ack(delivery_tag=method.delivery_tag) consumer_parameters = {'queue': snekboxid, 'callback': message_handler, 'thread_ws': localdata.thread_ws} - consumer = threading.Thread(target=rmq.consume, kwargs=consumer_parameters) + + consumer = threading.Thread( + target=rmq.consume, + kwargs=consumer_parameters) + consumer.daemon = True consumer.start() |