diff options
Diffstat (limited to 'rmq.py')
-rw-r--r-- | rmq.py | 132 |
1 files changed, 0 insertions, 132 deletions
@@ -1,132 +0,0 @@ -import pika -import time -import traceback - -from pika.exceptions import ConnectionClosed - -from config import USERNAME -from config import PASSWORD -from config import HOST -from config import PORT -from config import EXCHANGE_TYPE -from config import QUEUE -from config import ROUTING_KEY -from config import EXCHANGE - -from logs import log - - -class Rmq(object): - - def __init__(self, - username=USERNAME, - password=PASSWORD, - host=HOST, - port=PORT, - exchange_type=EXCHANGE_TYPE): - - self.username = USERNAME - self.password = PASSWORD - self.host = HOST - self.port = PORT - self.exchange_type = EXCHANGE_TYPE - self.credentials = pika.PlainCredentials(self.username, self.password) - self.con_params = pika.ConnectionParameters(self.host, self.port, '/', self.credentials) - self.properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1) - - def _declare(self, channel, queue): - channel.queue_declare( - queue=queue, - durable=False, # Do not commit messages to disk - arguments={'x-message-ttl': 5000}, # Delete message automatically after x milliseconds - auto_delete=True) # Delete queue when all connection are closed - - def consume(self, queue=QUEUE, callback=None, thread_ws=None, run_once=False): - while True: - try: - connection = pika.BlockingConnection(self.con_params) - - try: - channel = connection.channel() - self._declare(channel, queue) - channel.basic_qos(prefetch_count=1) - - if not run_once: - channel.basic_consume( - lambda ch, method, properties, body: - callback(ch, method, properties, body, thread_ws=thread_ws), - queue=queue) - - log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}") - - if thread_ws: - if not thread_ws.closed: - thread_ws.send('{"service": "connected"}') - - if run_once: - return channel.basic_get(queue=queue) - - channel.start_consuming() - - except Exception: - exc = traceback.format_exc() - log.error(exc) - - finally: - connection.close() - - except ConnectionClosed: - if thread_ws: - if not thread_ws.closed: - log.error(f"Connection to {self.host} could not be established") - thread_ws.send('{"service": "disconnected"}') - exit(1) - - log.error(f"Connection lost, reconnecting to {self.host}") - - time.sleep(2) - - def publish(self, - message, - queue=QUEUE, - routingkey=ROUTING_KEY, - exchange=EXCHANGE): - - try: - connection = pika.BlockingConnection(self.con_params) - - try: - channel = connection.channel() - - self._declare(channel, queue) - - channel.exchange_declare( - exchange=exchange, - exchange_type=self.exchange_type) - - channel.queue_bind( - exchange=exchange, - queue=queue, - routing_key=routingkey) - - result = channel.basic_publish( - exchange=exchange, - routing_key=routingkey, - body=message, - properties=self.properties) - - if result: - return result - - else: - log.error(f"Message '{message}' not delivered") - - except ConnectionClosed: - log.error(f"Could not send message, connection to {self.host} was lost") - exit(1) - - finally: - connection.close() - - except ConnectionClosed: - log.error(f"Could not connect to {self.host}") |