aboutsummaryrefslogtreecommitdiffstats
path: root/rmq.py
diff options
context:
space:
mode:
authorGravatar Christopher Baklid <[email protected]>2018-05-26 14:15:11 +0200
committerGravatar Christopher Baklid <[email protected]>2018-05-26 14:15:11 +0200
commit3314b933240958f3d01e7c8ee5764dd3b690d789 (patch)
tree064d2aa19595e9e0222ff4ea5acd9dd466de033d /rmq.py
parentbeforeunload was never run, removed (diff)
automatically clean up queues and message to reduce rabbitmq memory footprint
Diffstat (limited to 'rmq.py')
-rw-r--r--rmq.py75
1 files changed, 49 insertions, 26 deletions
diff --git a/rmq.py b/rmq.py
index 1d570bf..e52e649 100644
--- a/rmq.py
+++ b/rmq.py
@@ -1,8 +1,6 @@
import pika
import time
import traceback
-import logging
-import sys
from pika.exceptions import ConnectionClosed
@@ -44,25 +42,35 @@ class Rmq(object):
try:
channel = connection.channel()
- channel.queue_declare(queue=queue, durable=False)
+ channel.queue_declare(queue=queue,
+ durable=False, # Do not commit to disk
+ arguments={'x-message-ttl': 5000}, # a message is automatically deleted after x milliseconds if unacknowledged # NOQA
+ auto_delete=True # Delete queue when all connection are closed
+ )
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
- lambda ch, method, properties, body: callback(ch, method, properties, body, thread_ws=thread_ws),
+ lambda ch, method, properties, body:
+ callback(ch, method, properties, body, thread_ws=thread_ws),
queue=queue)
log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}")
-
+ if thread_ws:
+ thread_ws.send('{"service": "connected"}')
channel.start_consuming()
except Exception:
exc = traceback.format_exc()
- log.info(exc)
+ log.error(exc)
finally:
connection.close()
except ConnectionClosed:
- log.info(f"Connection lost, reconnecting to {self.host}")
+ if thread_ws:
+ log.error(f"Connection to {self.host} could not be established")
+ thread_ws.send('{"service": "disconnected"}')
+ exit(1)
+ log.error(f"Connection lost, reconnecting to {self.host}")
time.sleep(2)
@@ -72,22 +80,37 @@ class Rmq(object):
routingkey=ROUTING_KEY,
exchange=EXCHANGE):
- connection = pika.BlockingConnection(pika.ConnectionParameters(self.host, self.port, '/', self.credentials))
- channel = connection.channel()
- channel.queue_declare(queue=queue, durable=False)
- channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type)
- channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey)
-
- result = channel.basic_publish(
- exchange=exchange,
- routing_key=routingkey,
- body=message,
- properties=self.properties
- )
-
- if result:
- log.info(f"Connecting to host: {self.host} port: {self.port} exchange: {exchange} queue: {queue}")
- else:
- log.info("not delivered")
-
- connection.close()
+ try:
+ connection = pika.BlockingConnection(pika.ConnectionParameters(self.host, self.port, '/', self.credentials))
+
+ try:
+ channel = connection.channel()
+ channel.queue_declare(queue=queue,
+ durable=False, # Do not commit to disk
+ arguments={'x-message-ttl': 5000}, # a message is automatically deleted after x milliseconds if unacknowledged # NOQA
+ auto_delete=True # Delete queue when all connection are closed
+ )
+ channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type)
+ channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey)
+
+ result = channel.basic_publish(
+ exchange=exchange,
+ routing_key=routingkey,
+ body=message,
+ properties=self.properties
+ )
+
+ if result:
+ log.info(f"published message: {self.host} port: {self.port} exchange: {exchange} queue: {queue} message: {message}") # NOQA
+ else:
+ log.error(f"Messge '{message}' not delivered")
+
+ except ConnectionClosed:
+ log.error(f"Could not send message, connection to {self.host} was lost")
+ exit(1)
+
+ finally:
+ connection.close()
+
+ except ConnectionClosed:
+ log.error(f"Could not connect to {self.host}")