diff options
author | 2018-05-26 14:15:11 +0200 | |
---|---|---|
committer | 2018-05-26 14:15:11 +0200 | |
commit | 3314b933240958f3d01e7c8ee5764dd3b690d789 (patch) | |
tree | 064d2aa19595e9e0222ff4ea5acd9dd466de033d /rmq.py | |
parent | beforeunload was never run, removed (diff) |
automatically clean up queues and message to reduce rabbitmq memory footprint
Diffstat (limited to 'rmq.py')
-rw-r--r-- | rmq.py | 75 |
1 files changed, 49 insertions, 26 deletions
@@ -1,8 +1,6 @@ import pika import time import traceback -import logging -import sys from pika.exceptions import ConnectionClosed @@ -44,25 +42,35 @@ class Rmq(object): try: channel = connection.channel() - channel.queue_declare(queue=queue, durable=False) + channel.queue_declare(queue=queue, + durable=False, # Do not commit to disk + arguments={'x-message-ttl': 5000}, # a message is automatically deleted after x milliseconds if unacknowledged # NOQA + auto_delete=True # Delete queue when all connection are closed + ) channel.basic_qos(prefetch_count=1) channel.basic_consume( - lambda ch, method, properties, body: callback(ch, method, properties, body, thread_ws=thread_ws), + lambda ch, method, properties, body: + callback(ch, method, properties, body, thread_ws=thread_ws), queue=queue) log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}") - + if thread_ws: + thread_ws.send('{"service": "connected"}') channel.start_consuming() except Exception: exc = traceback.format_exc() - log.info(exc) + log.error(exc) finally: connection.close() except ConnectionClosed: - log.info(f"Connection lost, reconnecting to {self.host}") + if thread_ws: + log.error(f"Connection to {self.host} could not be established") + thread_ws.send('{"service": "disconnected"}') + exit(1) + log.error(f"Connection lost, reconnecting to {self.host}") time.sleep(2) @@ -72,22 +80,37 @@ class Rmq(object): routingkey=ROUTING_KEY, exchange=EXCHANGE): - connection = pika.BlockingConnection(pika.ConnectionParameters(self.host, self.port, '/', self.credentials)) - channel = connection.channel() - channel.queue_declare(queue=queue, durable=False) - channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type) - channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey) - - result = channel.basic_publish( - exchange=exchange, - routing_key=routingkey, - body=message, - properties=self.properties - ) - - if result: - log.info(f"Connecting to host: {self.host} port: {self.port} exchange: {exchange} queue: {queue}") - else: - log.info("not delivered") - - connection.close() + try: + connection = pika.BlockingConnection(pika.ConnectionParameters(self.host, self.port, '/', self.credentials)) + + try: + channel = connection.channel() + channel.queue_declare(queue=queue, + durable=False, # Do not commit to disk + arguments={'x-message-ttl': 5000}, # a message is automatically deleted after x milliseconds if unacknowledged # NOQA + auto_delete=True # Delete queue when all connection are closed + ) + channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type) + channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey) + + result = channel.basic_publish( + exchange=exchange, + routing_key=routingkey, + body=message, + properties=self.properties + ) + + if result: + log.info(f"published message: {self.host} port: {self.port} exchange: {exchange} queue: {queue} message: {message}") # NOQA + else: + log.error(f"Messge '{message}' not delivered") + + except ConnectionClosed: + log.error(f"Could not send message, connection to {self.host} was lost") + exit(1) + + finally: + connection.close() + + except ConnectionClosed: + log.error(f"Could not connect to {self.host}") |