From ab2bb78ec8c5979816a9536e96396b2f024c3415 Mon Sep 17 00:00:00 2001 From: Christopher Baklid Date: Thu, 31 May 2018 16:36:03 +0200 Subject: more testing --- rmq.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) (limited to 'rmq.py') diff --git a/rmq.py b/rmq.py index c93e787..ef5e8c2 100644 --- a/rmq.py +++ b/rmq.py @@ -37,12 +37,11 @@ class Rmq(object): def _declare(self, channel, queue): channel.queue_declare( queue=queue, - durable=False, # Do not commit messages to disk + durable=False, # Do not commit messages to disk arguments={'x-message-ttl': 5000}, # Delete message automatically after x milliseconds - auto_delete=True) # Delete queue when all connection are closed - - def consume(self, queue=QUEUE, callback=None, thread_ws=None): + auto_delete=True) # Delete queue when all connection are closed + def consume(self, queue=QUEUE, callback=None, thread_ws=None, run_once=False): while True: try: connection = pika.BlockingConnection(self.con_params) @@ -51,16 +50,22 @@ class Rmq(object): channel = connection.channel() self._declare(channel, queue) channel.basic_qos(prefetch_count=1) - channel.basic_consume( - lambda ch, method, properties, body: - callback(ch, method, properties, body, thread_ws=thread_ws), - queue=queue) + + if not run_once: + channel.basic_consume( + lambda ch, method, properties, body: + callback(ch, method, properties, body, thread_ws=thread_ws), + queue=queue) log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}") if thread_ws: if not thread_ws.closed: thread_ws.send('{"service": "connected"}') + + if run_once: + return channel.basic_get(queue=queue) + channel.start_consuming() except Exception: @@ -114,6 +119,9 @@ class Rmq(object): log.info((f"published: {self.host} " f"queue: {queue} " f"message: {message}")) + + return result + else: log.error(f"Message '{message}' not delivered") -- cgit v1.2.3