aboutsummaryrefslogtreecommitdiffstats
path: root/rmq.py
diff options
context:
space:
mode:
Diffstat (limited to 'rmq.py')
-rw-r--r--rmq.py132
1 files changed, 0 insertions, 132 deletions
diff --git a/rmq.py b/rmq.py
deleted file mode 100644
index 493a38f..0000000
--- a/rmq.py
+++ /dev/null
@@ -1,132 +0,0 @@
-import pika
-import time
-import traceback
-
-from pika.exceptions import ConnectionClosed
-
-from config import USERNAME
-from config import PASSWORD
-from config import HOST
-from config import PORT
-from config import EXCHANGE_TYPE
-from config import QUEUE
-from config import ROUTING_KEY
-from config import EXCHANGE
-
-from logs import log
-
-
-class Rmq(object):
-
- def __init__(self,
- username=USERNAME,
- password=PASSWORD,
- host=HOST,
- port=PORT,
- exchange_type=EXCHANGE_TYPE):
-
- self.username = USERNAME
- self.password = PASSWORD
- self.host = HOST
- self.port = PORT
- self.exchange_type = EXCHANGE_TYPE
- self.credentials = pika.PlainCredentials(self.username, self.password)
- self.con_params = pika.ConnectionParameters(self.host, self.port, '/', self.credentials)
- self.properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1)
-
- def _declare(self, channel, queue):
- channel.queue_declare(
- queue=queue,
- durable=False, # Do not commit messages to disk
- arguments={'x-message-ttl': 5000}, # Delete message automatically after x milliseconds
- auto_delete=True) # Delete queue when all connection are closed
-
- def consume(self, queue=QUEUE, callback=None, thread_ws=None, run_once=False):
- while True:
- try:
- connection = pika.BlockingConnection(self.con_params)
-
- try:
- channel = connection.channel()
- self._declare(channel, queue)
- channel.basic_qos(prefetch_count=1)
-
- if not run_once:
- channel.basic_consume(
- lambda ch, method, properties, body:
- callback(ch, method, properties, body, thread_ws=thread_ws),
- queue=queue)
-
- log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}")
-
- if thread_ws:
- if not thread_ws.closed:
- thread_ws.send('{"service": "connected"}')
-
- if run_once:
- return channel.basic_get(queue=queue)
-
- channel.start_consuming()
-
- except Exception:
- exc = traceback.format_exc()
- log.error(exc)
-
- finally:
- connection.close()
-
- except ConnectionClosed:
- if thread_ws:
- if not thread_ws.closed:
- log.error(f"Connection to {self.host} could not be established")
- thread_ws.send('{"service": "disconnected"}')
- exit(1)
-
- log.error(f"Connection lost, reconnecting to {self.host}")
-
- time.sleep(2)
-
- def publish(self,
- message,
- queue=QUEUE,
- routingkey=ROUTING_KEY,
- exchange=EXCHANGE):
-
- try:
- connection = pika.BlockingConnection(self.con_params)
-
- try:
- channel = connection.channel()
-
- self._declare(channel, queue)
-
- channel.exchange_declare(
- exchange=exchange,
- exchange_type=self.exchange_type)
-
- channel.queue_bind(
- exchange=exchange,
- queue=queue,
- routing_key=routingkey)
-
- result = channel.basic_publish(
- exchange=exchange,
- routing_key=routingkey,
- body=message,
- properties=self.properties)
-
- if result:
- return result
-
- else:
- log.error(f"Message '{message}' not delivered")
-
- except ConnectionClosed:
- log.error(f"Could not send message, connection to {self.host} was lost")
- exit(1)
-
- finally:
- connection.close()
-
- except ConnectionClosed:
- log.error(f"Could not connect to {self.host}")