aboutsummaryrefslogtreecommitdiffstats
path: root/rmq.py
diff options
context:
space:
mode:
authorGravatar Christopher Baklid <[email protected]>2018-05-26 16:37:56 +0200
committerGravatar Christopher Baklid <[email protected]>2018-05-26 16:37:56 +0200
commit1f4b827b439e236a24a811404dcb5c2e278f206d (patch)
tree4fa1251fe338a38dc3f955fa7783cac497b563ca /rmq.py
parentautomatically clean up queues and message to reduce rabbitmq memory footprint (diff)
add rabbitmq mgmt webinterface, handle container autodiscovery better, minor code optimisations, update readme
Diffstat (limited to 'rmq.py')
-rw-r--r--rmq.py54
1 files changed, 34 insertions, 20 deletions
diff --git a/rmq.py b/rmq.py
index e52e649..94f2217 100644
--- a/rmq.py
+++ b/rmq.py
@@ -34,6 +34,13 @@ class Rmq(object):
self.con_params = pika.ConnectionParameters(self.host, self.port, '/', self.credentials)
self.properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1)
+ def _declare(self, channel, queue):
+ channel.queue_declare(
+ queue=queue,
+ durable=False, # Do not commit messages to disk
+ arguments={'x-message-ttl': 5000}, # Delete message automatically after x milliseconds
+ auto_delete=True) # Delete queue when all connection are closed
+
def consume(self, queue=QUEUE, callback=None, thread_ws=None):
while True:
@@ -42,11 +49,7 @@ class Rmq(object):
try:
channel = connection.channel()
- channel.queue_declare(queue=queue,
- durable=False, # Do not commit to disk
- arguments={'x-message-ttl': 5000}, # a message is automatically deleted after x milliseconds if unacknowledged # NOQA
- auto_delete=True # Delete queue when all connection are closed
- )
+ self._declare(channel, queue)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
lambda ch, method, properties, body:
@@ -54,8 +57,10 @@ class Rmq(object):
queue=queue)
log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}")
+
if thread_ws:
- thread_ws.send('{"service": "connected"}')
+ if not thread_ws.closed:
+ thread_ws.send('{"service": "connected"}')
channel.start_consuming()
except Exception:
@@ -67,9 +72,11 @@ class Rmq(object):
except ConnectionClosed:
if thread_ws:
- log.error(f"Connection to {self.host} could not be established")
- thread_ws.send('{"service": "disconnected"}')
- exit(1)
+ if not thread_ws.closed:
+ log.error(f"Connection to {self.host} could not be established")
+ thread_ws.send('{"service": "disconnected"}')
+ exit(1)
+
log.error(f"Connection lost, reconnecting to {self.host}")
time.sleep(2)
@@ -81,27 +88,34 @@ class Rmq(object):
exchange=EXCHANGE):
try:
- connection = pika.BlockingConnection(pika.ConnectionParameters(self.host, self.port, '/', self.credentials))
+ connection = pika.BlockingConnection(self.con_params)
try:
channel = connection.channel()
- channel.queue_declare(queue=queue,
- durable=False, # Do not commit to disk
- arguments={'x-message-ttl': 5000}, # a message is automatically deleted after x milliseconds if unacknowledged # NOQA
- auto_delete=True # Delete queue when all connection are closed
- )
- channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type)
- channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey)
+
+ self._declare(channel, queue)
+
+ channel.exchange_declare(
+ exchange=exchange,
+ exchange_type=self.exchange_type)
+
+ channel.queue_bind(
+ exchange=exchange,
+ queue=queue,
+ routing_key=routingkey)
result = channel.basic_publish(
exchange=exchange,
routing_key=routingkey,
body=message,
- properties=self.properties
- )
+ properties=self.properties)
if result:
- log.info(f"published message: {self.host} port: {self.port} exchange: {exchange} queue: {queue} message: {message}") # NOQA
+ log.info((f"published message: {self.host} "
+ f"port: {self.port} "
+ f"exchange: {exchange} "
+ f"queue: {queue} "
+ f"message: {message}"))
else:
log.error(f"Messge '{message}' not delivered")