diff options
Diffstat (limited to 'rmq.py')
| -rw-r--r-- | rmq.py | 75 | 
1 files changed, 49 insertions, 26 deletions
| @@ -1,8 +1,6 @@  import pika  import time  import traceback -import logging -import sys  from pika.exceptions import ConnectionClosed @@ -44,25 +42,35 @@ class Rmq(object):                  try:                      channel = connection.channel() -                    channel.queue_declare(queue=queue, durable=False) +                    channel.queue_declare(queue=queue, +                                          durable=False,  # Do not commit to disk +                                          arguments={'x-message-ttl': 5000},  # a message is automatically deleted after x milliseconds if unacknowledged  # NOQA +                                          auto_delete=True  # Delete queue when all connection are closed +                                          )                      channel.basic_qos(prefetch_count=1)                      channel.basic_consume( -                        lambda ch, method, properties, body: callback(ch, method, properties, body, thread_ws=thread_ws), +                        lambda ch, method, properties, body: +                        callback(ch, method, properties, body, thread_ws=thread_ws),                          queue=queue)                      log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}") - +                    if thread_ws: +                        thread_ws.send('{"service": "connected"}')                      channel.start_consuming()                  except Exception:                      exc = traceback.format_exc() -                    log.info(exc) +                    log.error(exc)                  finally:                      connection.close()              except ConnectionClosed: -                log.info(f"Connection lost, reconnecting to {self.host}") +                if thread_ws: +                    log.error(f"Connection to {self.host} could not be established") +                    thread_ws.send('{"service": "disconnected"}') +                    exit(1) +                log.error(f"Connection lost, reconnecting to {self.host}")              time.sleep(2) @@ -72,22 +80,37 @@ class Rmq(object):                  routingkey=ROUTING_KEY,                  exchange=EXCHANGE): -        connection = pika.BlockingConnection(pika.ConnectionParameters(self.host, self.port, '/', self.credentials)) -        channel = connection.channel() -        channel.queue_declare(queue=queue, durable=False) -        channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type) -        channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey) - -        result = channel.basic_publish( -            exchange=exchange, -            routing_key=routingkey, -            body=message, -            properties=self.properties -        ) - -        if result: -            log.info(f"Connecting to host: {self.host} port: {self.port} exchange: {exchange} queue: {queue}") -        else: -            log.info("not delivered") - -        connection.close() +        try: +            connection = pika.BlockingConnection(pika.ConnectionParameters(self.host, self.port, '/', self.credentials)) + +            try: +                channel = connection.channel() +                channel.queue_declare(queue=queue, +                                      durable=False,  # Do not commit to disk +                                      arguments={'x-message-ttl': 5000},  # a message is automatically deleted after x milliseconds if unacknowledged # NOQA +                                      auto_delete=True  # Delete queue when all connection are closed +                                      ) +                channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type) +                channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey) + +                result = channel.basic_publish( +                    exchange=exchange, +                    routing_key=routingkey, +                    body=message, +                    properties=self.properties +                ) + +                if result: +                    log.info(f"published message: {self.host} port: {self.port} exchange: {exchange} queue: {queue} message: {message}")  # NOQA +                else: +                    log.error(f"Messge '{message}' not delivered") + +            except ConnectionClosed: +                log.error(f"Could not send message, connection to {self.host} was lost") +                exit(1) + +            finally: +                connection.close() + +        except ConnectionClosed: +            log.error(f"Could not connect to {self.host}") | 
