diff options
Diffstat (limited to 'rmq.py')
-rw-r--r-- | rmq.py | 24 |
1 files changed, 16 insertions, 8 deletions
@@ -37,12 +37,11 @@ class Rmq(object): def _declare(self, channel, queue): channel.queue_declare( queue=queue, - durable=False, # Do not commit messages to disk + durable=False, # Do not commit messages to disk arguments={'x-message-ttl': 5000}, # Delete message automatically after x milliseconds - auto_delete=True) # Delete queue when all connection are closed - - def consume(self, queue=QUEUE, callback=None, thread_ws=None): + auto_delete=True) # Delete queue when all connection are closed + def consume(self, queue=QUEUE, callback=None, thread_ws=None, run_once=False): while True: try: connection = pika.BlockingConnection(self.con_params) @@ -51,16 +50,22 @@ class Rmq(object): channel = connection.channel() self._declare(channel, queue) channel.basic_qos(prefetch_count=1) - channel.basic_consume( - lambda ch, method, properties, body: - callback(ch, method, properties, body, thread_ws=thread_ws), - queue=queue) + + if not run_once: + channel.basic_consume( + lambda ch, method, properties, body: + callback(ch, method, properties, body, thread_ws=thread_ws), + queue=queue) log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}") if thread_ws: if not thread_ws.closed: thread_ws.send('{"service": "connected"}') + + if run_once: + return channel.basic_get(queue=queue) + channel.start_consuming() except Exception: @@ -114,6 +119,9 @@ class Rmq(object): log.info((f"published: {self.host} " f"queue: {queue} " f"message: {message}")) + + return result + else: log.error(f"Message '{message}' not delivered") |