diff options
author | 2018-05-26 16:37:56 +0200 | |
---|---|---|
committer | 2018-05-26 16:37:56 +0200 | |
commit | 1f4b827b439e236a24a811404dcb5c2e278f206d (patch) | |
tree | 4fa1251fe338a38dc3f955fa7783cac497b563ca /rmq.py | |
parent | automatically clean up queues and message to reduce rabbitmq memory footprint (diff) |
add rabbitmq mgmt webinterface, handle container autodiscovery better, minor code optimisations, update readme
Diffstat (limited to 'rmq.py')
-rw-r--r-- | rmq.py | 54 |
1 files changed, 34 insertions, 20 deletions
@@ -34,6 +34,13 @@ class Rmq(object): self.con_params = pika.ConnectionParameters(self.host, self.port, '/', self.credentials) self.properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1) + def _declare(self, channel, queue): + channel.queue_declare( + queue=queue, + durable=False, # Do not commit messages to disk + arguments={'x-message-ttl': 5000}, # Delete message automatically after x milliseconds + auto_delete=True) # Delete queue when all connection are closed + def consume(self, queue=QUEUE, callback=None, thread_ws=None): while True: @@ -42,11 +49,7 @@ class Rmq(object): try: channel = connection.channel() - channel.queue_declare(queue=queue, - durable=False, # Do not commit to disk - arguments={'x-message-ttl': 5000}, # a message is automatically deleted after x milliseconds if unacknowledged # NOQA - auto_delete=True # Delete queue when all connection are closed - ) + self._declare(channel, queue) channel.basic_qos(prefetch_count=1) channel.basic_consume( lambda ch, method, properties, body: @@ -54,8 +57,10 @@ class Rmq(object): queue=queue) log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}") + if thread_ws: - thread_ws.send('{"service": "connected"}') + if not thread_ws.closed: + thread_ws.send('{"service": "connected"}') channel.start_consuming() except Exception: @@ -67,9 +72,11 @@ class Rmq(object): except ConnectionClosed: if thread_ws: - log.error(f"Connection to {self.host} could not be established") - thread_ws.send('{"service": "disconnected"}') - exit(1) + if not thread_ws.closed: + log.error(f"Connection to {self.host} could not be established") + thread_ws.send('{"service": "disconnected"}') + exit(1) + log.error(f"Connection lost, reconnecting to {self.host}") time.sleep(2) @@ -81,27 +88,34 @@ class Rmq(object): exchange=EXCHANGE): try: - connection = pika.BlockingConnection(pika.ConnectionParameters(self.host, self.port, '/', self.credentials)) + connection = pika.BlockingConnection(self.con_params) try: channel = connection.channel() - channel.queue_declare(queue=queue, - durable=False, # Do not commit to disk - arguments={'x-message-ttl': 5000}, # a message is automatically deleted after x milliseconds if unacknowledged # NOQA - auto_delete=True # Delete queue when all connection are closed - ) - channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type) - channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey) + + self._declare(channel, queue) + + channel.exchange_declare( + exchange=exchange, + exchange_type=self.exchange_type) + + channel.queue_bind( + exchange=exchange, + queue=queue, + routing_key=routingkey) result = channel.basic_publish( exchange=exchange, routing_key=routingkey, body=message, - properties=self.properties - ) + properties=self.properties) if result: - log.info(f"published message: {self.host} port: {self.port} exchange: {exchange} queue: {queue} message: {message}") # NOQA + log.info((f"published message: {self.host} " + f"port: {self.port} " + f"exchange: {exchange} " + f"queue: {queue} " + f"message: {message}")) else: log.error(f"Messge '{message}' not delivered") |