aboutsummaryrefslogtreecommitdiffstats
path: root/rmq.py
diff options
context:
space:
mode:
Diffstat (limited to 'rmq.py')
-rw-r--r--rmq.py54
1 files changed, 18 insertions, 36 deletions
diff --git a/rmq.py b/rmq.py
index 72e0e9a..919ef19 100644
--- a/rmq.py
+++ b/rmq.py
@@ -4,43 +4,28 @@ import traceback
import pika
from pika.exceptions import ConnectionClosed
-from config import EXCHANGE
-from config import EXCHANGE_TYPE
-from config import HOST
-from config import PASSWORD
-from config import PORT
-from config import QUEUE
-from config import ROUTING_KEY
-from config import USERNAME
+from config import EXCHANGE, EXCHANGE_TYPE, HOST, PASSWORD, PORT, QUEUE, ROUTING_KEY, USERNAME
from logs import log
-class Rmq(object):
+class Rmq:
+ """Rabbit MQ (RMQ) implementation used for communication with the bot."""
- def __init__(self,
- username=USERNAME,
- password=PASSWORD,
- host=HOST,
- port=PORT,
- exchange_type=EXCHANGE_TYPE):
-
- self.username = USERNAME
- self.password = PASSWORD
- self.host = HOST
- self.port = PORT
- self.exchange_type = EXCHANGE_TYPE
- self.credentials = pika.PlainCredentials(self.username, self.password)
- self.con_params = pika.ConnectionParameters(self.host, self.port, '/', self.credentials)
+ def __init__(self):
+ self.credentials = pika.PlainCredentials(USERNAME, PASSWORD)
+ self.con_params = pika.ConnectionParameters(HOST, PORT, '/', self.credentials)
self.properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1)
def _declare(self, channel, queue):
channel.queue_declare(
queue=queue,
- durable=False, # Do not commit messages to disk
+ durable=False, # Do not commit messages to disk
arguments={'x-message-ttl': 5000}, # Delete message automatically after x milliseconds
- auto_delete=True) # Delete queue when all connection are closed
+ auto_delete=True) # Delete queue when all connection are closed
def consume(self, queue=QUEUE, callback=None, thread_ws=None, run_once=False):
+ """Subscribe to read from a RMQ channel."""
+
while True:
try:
connection = pika.BlockingConnection(self.con_params)
@@ -56,7 +41,7 @@ class Rmq(object):
callback(ch, method, properties, body, thread_ws=thread_ws),
queue=queue)
- log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}")
+ log.info(f"Connected to host: {HOST} port: {PORT} queue: {queue}")
if thread_ws:
if not thread_ws.closed:
@@ -77,19 +62,16 @@ class Rmq(object):
except ConnectionClosed:
if thread_ws:
if not thread_ws.closed:
- log.error(f"Connection to {self.host} could not be established")
+ log.error(f"Connection to {HOST} could not be established")
thread_ws.send('{"service": "disconnected"}')
exit(1)
- log.error(f"Connection lost, reconnecting to {self.host}")
+ log.error(f"Connection lost, reconnecting to {HOST}")
time.sleep(2)
- def publish(self,
- message,
- queue=QUEUE,
- routingkey=ROUTING_KEY,
- exchange=EXCHANGE):
+ def publish(self, message, queue=QUEUE, routingkey=ROUTING_KEY, exchange=EXCHANGE):
+ """Open a connection to publish (write) to a RMQ channel."""
try:
connection = pika.BlockingConnection(self.con_params)
@@ -101,7 +83,7 @@ class Rmq(object):
channel.exchange_declare(
exchange=exchange,
- exchange_type=self.exchange_type)
+ exchange_type=EXCHANGE_TYPE)
channel.queue_bind(
exchange=exchange,
@@ -121,11 +103,11 @@ class Rmq(object):
log.error(f"Message '{message}' not delivered")
except ConnectionClosed:
- log.error(f"Could not send message, connection to {self.host} was lost")
+ log.error(f"Could not send message, connection to {HOST} was lost")
exit(1)
finally:
connection.close()
except ConnectionClosed:
- log.error(f"Could not connect to {self.host}")
+ log.error(f"Could not connect to {HOST}")