diff options
| author | 2018-05-26 16:37:56 +0200 | |
|---|---|---|
| committer | 2018-05-26 16:37:56 +0200 | |
| commit | 1f4b827b439e236a24a811404dcb5c2e278f206d (patch) | |
| tree | 4fa1251fe338a38dc3f955fa7783cac497b563ca /rmq.py | |
| parent | automatically clean up queues and message to reduce rabbitmq memory footprint (diff) | |
add rabbitmq mgmt webinterface, handle container autodiscovery better, minor code optimisations, update readme
Diffstat (limited to 'rmq.py')
| -rw-r--r-- | rmq.py | 54 | 
1 files changed, 34 insertions, 20 deletions
| @@ -34,6 +34,13 @@ class Rmq(object):          self.con_params = pika.ConnectionParameters(self.host, self.port, '/', self.credentials)          self.properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1) +    def _declare(self, channel, queue): +        channel.queue_declare( +            queue=queue, +            durable=False,                      # Do not commit messages to disk +            arguments={'x-message-ttl': 5000},  # Delete message automatically after x milliseconds +            auto_delete=True)                   # Delete queue when all connection are closed +      def consume(self, queue=QUEUE, callback=None, thread_ws=None):          while True: @@ -42,11 +49,7 @@ class Rmq(object):                  try:                      channel = connection.channel() -                    channel.queue_declare(queue=queue, -                                          durable=False,  # Do not commit to disk -                                          arguments={'x-message-ttl': 5000},  # a message is automatically deleted after x milliseconds if unacknowledged  # NOQA -                                          auto_delete=True  # Delete queue when all connection are closed -                                          ) +                    self._declare(channel, queue)                      channel.basic_qos(prefetch_count=1)                      channel.basic_consume(                          lambda ch, method, properties, body: @@ -54,8 +57,10 @@ class Rmq(object):                          queue=queue)                      log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}") +                      if thread_ws: -                        thread_ws.send('{"service": "connected"}') +                        if not thread_ws.closed: +                            thread_ws.send('{"service": "connected"}')                      channel.start_consuming()                  except Exception: @@ -67,9 +72,11 @@ class Rmq(object):              except ConnectionClosed:                  if thread_ws: -                    log.error(f"Connection to {self.host} could not be established") -                    thread_ws.send('{"service": "disconnected"}') -                    exit(1) +                    if not thread_ws.closed: +                        log.error(f"Connection to {self.host} could not be established") +                        thread_ws.send('{"service": "disconnected"}') +                        exit(1) +                  log.error(f"Connection lost, reconnecting to {self.host}")              time.sleep(2) @@ -81,27 +88,34 @@ class Rmq(object):                  exchange=EXCHANGE):          try: -            connection = pika.BlockingConnection(pika.ConnectionParameters(self.host, self.port, '/', self.credentials)) +            connection = pika.BlockingConnection(self.con_params)              try:                  channel = connection.channel() -                channel.queue_declare(queue=queue, -                                      durable=False,  # Do not commit to disk -                                      arguments={'x-message-ttl': 5000},  # a message is automatically deleted after x milliseconds if unacknowledged # NOQA -                                      auto_delete=True  # Delete queue when all connection are closed -                                      ) -                channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type) -                channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey) + +                self._declare(channel, queue) + +                channel.exchange_declare( +                    exchange=exchange, +                    exchange_type=self.exchange_type) + +                channel.queue_bind( +                    exchange=exchange, +                    queue=queue, +                    routing_key=routingkey)                  result = channel.basic_publish(                      exchange=exchange,                      routing_key=routingkey,                      body=message, -                    properties=self.properties -                ) +                    properties=self.properties)                  if result: -                    log.info(f"published message: {self.host} port: {self.port} exchange: {exchange} queue: {queue} message: {message}")  # NOQA +                    log.info((f"published message: {self.host} " +                              f"port: {self.port} " +                              f"exchange: {exchange} " +                              f"queue: {queue} " +                              f"message: {message}"))                  else:                      log.error(f"Messge '{message}' not delivered") | 
