aboutsummaryrefslogtreecommitdiffstats
path: root/rmq.py
diff options
context:
space:
mode:
authorGravatar Christopher Baklid <[email protected]>2018-05-31 16:36:03 +0200
committerGravatar Christopher Baklid <[email protected]>2018-05-31 16:36:03 +0200
commitab2bb78ec8c5979816a9536e96396b2f024c3415 (patch)
treee15f7dedb9a377591269bbf7ba18ab8242cbe1a4 /rmq.py
parenttravis is annoying (diff)
more testing
Diffstat (limited to 'rmq.py')
-rw-r--r--rmq.py24
1 files changed, 16 insertions, 8 deletions
diff --git a/rmq.py b/rmq.py
index c93e787..ef5e8c2 100644
--- a/rmq.py
+++ b/rmq.py
@@ -37,12 +37,11 @@ class Rmq(object):
def _declare(self, channel, queue):
channel.queue_declare(
queue=queue,
- durable=False, # Do not commit messages to disk
+ durable=False, # Do not commit messages to disk
arguments={'x-message-ttl': 5000}, # Delete message automatically after x milliseconds
- auto_delete=True) # Delete queue when all connection are closed
-
- def consume(self, queue=QUEUE, callback=None, thread_ws=None):
+ auto_delete=True) # Delete queue when all connection are closed
+ def consume(self, queue=QUEUE, callback=None, thread_ws=None, run_once=False):
while True:
try:
connection = pika.BlockingConnection(self.con_params)
@@ -51,16 +50,22 @@ class Rmq(object):
channel = connection.channel()
self._declare(channel, queue)
channel.basic_qos(prefetch_count=1)
- channel.basic_consume(
- lambda ch, method, properties, body:
- callback(ch, method, properties, body, thread_ws=thread_ws),
- queue=queue)
+
+ if not run_once:
+ channel.basic_consume(
+ lambda ch, method, properties, body:
+ callback(ch, method, properties, body, thread_ws=thread_ws),
+ queue=queue)
log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}")
if thread_ws:
if not thread_ws.closed:
thread_ws.send('{"service": "connected"}')
+
+ if run_once:
+ return channel.basic_get(queue=queue)
+
channel.start_consuming()
except Exception:
@@ -114,6 +119,9 @@ class Rmq(object):
log.info((f"published: {self.host} "
f"queue: {queue} "
f"message: {message}"))
+
+ return result
+
else:
log.error(f"Message '{message}' not delivered")