diff options
Diffstat (limited to 'rmq.py')
| -rw-r--r-- | rmq.py | 54 | 
1 files changed, 18 insertions, 36 deletions
| @@ -4,43 +4,28 @@ import traceback  import pika  from pika.exceptions import ConnectionClosed -from config import EXCHANGE -from config import EXCHANGE_TYPE -from config import HOST -from config import PASSWORD -from config import PORT -from config import QUEUE -from config import ROUTING_KEY -from config import USERNAME +from config import EXCHANGE, EXCHANGE_TYPE, HOST, PASSWORD, PORT, QUEUE, ROUTING_KEY, USERNAME  from logs import log -class Rmq(object): +class Rmq: +    """Rabbit MQ (RMQ) implementation used for communication with the bot.""" -    def __init__(self, -                 username=USERNAME, -                 password=PASSWORD, -                 host=HOST, -                 port=PORT, -                 exchange_type=EXCHANGE_TYPE): - -        self.username = USERNAME -        self.password = PASSWORD -        self.host = HOST -        self.port = PORT -        self.exchange_type = EXCHANGE_TYPE -        self.credentials = pika.PlainCredentials(self.username, self.password) -        self.con_params = pika.ConnectionParameters(self.host, self.port, '/', self.credentials) +    def __init__(self): +        self.credentials = pika.PlainCredentials(USERNAME, PASSWORD) +        self.con_params = pika.ConnectionParameters(HOST, PORT, '/', self.credentials)          self.properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1)      def _declare(self, channel, queue):          channel.queue_declare(              queue=queue, -            durable=False,                       # Do not commit messages to disk +            durable=False,  # Do not commit messages to disk              arguments={'x-message-ttl': 5000},  # Delete message automatically after x milliseconds -            auto_delete=True)                    # Delete queue when all connection are closed +            auto_delete=True)  # Delete queue when all connection are closed      def consume(self, queue=QUEUE, callback=None, thread_ws=None, run_once=False): +        """Subscribe to read from a RMQ channel.""" +          while True:              try:                  connection = pika.BlockingConnection(self.con_params) @@ -56,7 +41,7 @@ class Rmq(object):                              callback(ch, method, properties, body, thread_ws=thread_ws),                              queue=queue) -                    log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}") +                    log.info(f"Connected to host: {HOST} port: {PORT} queue: {queue}")                      if thread_ws:                          if not thread_ws.closed: @@ -77,19 +62,16 @@ class Rmq(object):              except ConnectionClosed:                  if thread_ws:                      if not thread_ws.closed: -                        log.error(f"Connection to {self.host} could not be established") +                        log.error(f"Connection to {HOST} could not be established")                          thread_ws.send('{"service": "disconnected"}')                          exit(1) -                log.error(f"Connection lost, reconnecting to {self.host}") +                log.error(f"Connection lost, reconnecting to {HOST}")              time.sleep(2) -    def publish(self, -                message, -                queue=QUEUE, -                routingkey=ROUTING_KEY, -                exchange=EXCHANGE): +    def publish(self, message, queue=QUEUE, routingkey=ROUTING_KEY, exchange=EXCHANGE): +        """Open a connection to publish (write) to a RMQ channel."""          try:              connection = pika.BlockingConnection(self.con_params) @@ -101,7 +83,7 @@ class Rmq(object):                  channel.exchange_declare(                      exchange=exchange, -                    exchange_type=self.exchange_type) +                    exchange_type=EXCHANGE_TYPE)                  channel.queue_bind(                      exchange=exchange, @@ -121,11 +103,11 @@ class Rmq(object):                      log.error(f"Message '{message}' not delivered")              except ConnectionClosed: -                log.error(f"Could not send message, connection to {self.host} was lost") +                log.error(f"Could not send message, connection to {HOST} was lost")                  exit(1)              finally:                  connection.close()          except ConnectionClosed: -            log.error(f"Could not connect to {self.host}") +            log.error(f"Could not connect to {HOST}") | 
