diff options
Diffstat (limited to 'rmq.py')
-rw-r--r-- | rmq.py | 54 |
1 files changed, 18 insertions, 36 deletions
@@ -4,43 +4,28 @@ import traceback import pika from pika.exceptions import ConnectionClosed -from config import EXCHANGE -from config import EXCHANGE_TYPE -from config import HOST -from config import PASSWORD -from config import PORT -from config import QUEUE -from config import ROUTING_KEY -from config import USERNAME +from config import EXCHANGE, EXCHANGE_TYPE, HOST, PASSWORD, PORT, QUEUE, ROUTING_KEY, USERNAME from logs import log -class Rmq(object): +class Rmq: + """Rabbit MQ (RMQ) implementation used for communication with the bot.""" - def __init__(self, - username=USERNAME, - password=PASSWORD, - host=HOST, - port=PORT, - exchange_type=EXCHANGE_TYPE): - - self.username = USERNAME - self.password = PASSWORD - self.host = HOST - self.port = PORT - self.exchange_type = EXCHANGE_TYPE - self.credentials = pika.PlainCredentials(self.username, self.password) - self.con_params = pika.ConnectionParameters(self.host, self.port, '/', self.credentials) + def __init__(self): + self.credentials = pika.PlainCredentials(USERNAME, PASSWORD) + self.con_params = pika.ConnectionParameters(HOST, PORT, '/', self.credentials) self.properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1) def _declare(self, channel, queue): channel.queue_declare( queue=queue, - durable=False, # Do not commit messages to disk + durable=False, # Do not commit messages to disk arguments={'x-message-ttl': 5000}, # Delete message automatically after x milliseconds - auto_delete=True) # Delete queue when all connection are closed + auto_delete=True) # Delete queue when all connection are closed def consume(self, queue=QUEUE, callback=None, thread_ws=None, run_once=False): + """Subscribe to read from a RMQ channel.""" + while True: try: connection = pika.BlockingConnection(self.con_params) @@ -56,7 +41,7 @@ class Rmq(object): callback(ch, method, properties, body, thread_ws=thread_ws), queue=queue) - log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}") + log.info(f"Connected to host: {HOST} port: {PORT} queue: {queue}") if thread_ws: if not thread_ws.closed: @@ -77,19 +62,16 @@ class Rmq(object): except ConnectionClosed: if thread_ws: if not thread_ws.closed: - log.error(f"Connection to {self.host} could not be established") + log.error(f"Connection to {HOST} could not be established") thread_ws.send('{"service": "disconnected"}') exit(1) - log.error(f"Connection lost, reconnecting to {self.host}") + log.error(f"Connection lost, reconnecting to {HOST}") time.sleep(2) - def publish(self, - message, - queue=QUEUE, - routingkey=ROUTING_KEY, - exchange=EXCHANGE): + def publish(self, message, queue=QUEUE, routingkey=ROUTING_KEY, exchange=EXCHANGE): + """Open a connection to publish (write) to a RMQ channel.""" try: connection = pika.BlockingConnection(self.con_params) @@ -101,7 +83,7 @@ class Rmq(object): channel.exchange_declare( exchange=exchange, - exchange_type=self.exchange_type) + exchange_type=EXCHANGE_TYPE) channel.queue_bind( exchange=exchange, @@ -121,11 +103,11 @@ class Rmq(object): log.error(f"Message '{message}' not delivered") except ConnectionClosed: - log.error(f"Could not send message, connection to {self.host} was lost") + log.error(f"Could not send message, connection to {HOST} was lost") exit(1) finally: connection.close() except ConnectionClosed: - log.error(f"Could not connect to {self.host}") + log.error(f"Could not connect to {HOST}") |