diff options
author | 2018-05-31 16:36:03 +0200 | |
---|---|---|
committer | 2018-05-31 16:36:03 +0200 | |
commit | ab2bb78ec8c5979816a9536e96396b2f024c3415 (patch) | |
tree | e15f7dedb9a377591269bbf7ba18ab8242cbe1a4 /rmq.py | |
parent | travis is annoying (diff) |
more testing
Diffstat (limited to 'rmq.py')
-rw-r--r-- | rmq.py | 24 |
1 files changed, 16 insertions, 8 deletions
@@ -37,12 +37,11 @@ class Rmq(object): def _declare(self, channel, queue): channel.queue_declare( queue=queue, - durable=False, # Do not commit messages to disk + durable=False, # Do not commit messages to disk arguments={'x-message-ttl': 5000}, # Delete message automatically after x milliseconds - auto_delete=True) # Delete queue when all connection are closed - - def consume(self, queue=QUEUE, callback=None, thread_ws=None): + auto_delete=True) # Delete queue when all connection are closed + def consume(self, queue=QUEUE, callback=None, thread_ws=None, run_once=False): while True: try: connection = pika.BlockingConnection(self.con_params) @@ -51,16 +50,22 @@ class Rmq(object): channel = connection.channel() self._declare(channel, queue) channel.basic_qos(prefetch_count=1) - channel.basic_consume( - lambda ch, method, properties, body: - callback(ch, method, properties, body, thread_ws=thread_ws), - queue=queue) + + if not run_once: + channel.basic_consume( + lambda ch, method, properties, body: + callback(ch, method, properties, body, thread_ws=thread_ws), + queue=queue) log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}") if thread_ws: if not thread_ws.closed: thread_ws.send('{"service": "connected"}') + + if run_once: + return channel.basic_get(queue=queue) + channel.start_consuming() except Exception: @@ -114,6 +119,9 @@ class Rmq(object): log.info((f"published: {self.host} " f"queue: {queue} " f"message: {message}")) + + return result + else: log.error(f"Message '{message}' not delivered") |