diff options
Diffstat (limited to '')
| -rw-r--r-- | rmq.py | 94 | ||||
| -rw-r--r-- | rmq/__init__.py | 0 | ||||
| -rw-r--r-- | rmq/consumer.py | 42 | ||||
| -rw-r--r-- | rmq/publisher.py | 35 |
4 files changed, 94 insertions, 77 deletions
@@ -0,0 +1,94 @@ +import pika +import time +import traceback +import logging +import sys + +from pika.exceptions import ConnectionClosed + +from config import USERNAME +from config import PASSWORD +from config import HOST +from config import PORT +from config import EXCHANGE_TYPE +from config import QUEUE +from config import ROUTING_KEY +from config import EXCHANGE + +from logs import log + + +class Rmq(object): + + def __init__(self, + username=USERNAME, + password=PASSWORD, + host=HOST, + port=PORT, + exchange_type=EXCHANGE_TYPE): + + self.username = USERNAME + self.password = PASSWORD + self.host = HOST + self.port = PORT + self.exchange_type = EXCHANGE_TYPE + self.credentials = pika.PlainCredentials(self.username, self.password) + self.con_params = pika.ConnectionParameters(self.host, self.port, '/', self.credentials) + self.properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1) + + def consume(self, queue=QUEUE, callback=None, thread_ws=None): + + while True: + try: + connection = pika.BlockingConnection(self.con_params) + + try: + channel = connection.channel() + channel.queue_declare(queue=queue, durable=False) + channel.basic_qos(prefetch_count=1) + channel.basic_consume( + lambda ch, method, properties, body: callback(ch, method, properties, body, thread_ws=thread_ws), + queue=queue) + + log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}") + + channel.start_consuming() + + except Exception: + exc = traceback.format_exc() + log.info(exc) + + finally: + connection.close() + + except ConnectionClosed: + log.info(f"Connection lost, reconnecting to {self.host}") + pass + + time.sleep(2) + + def publish(self, + message, + queue=QUEUE, + routingkey=ROUTING_KEY, + exchange=EXCHANGE): + + connection = pika.BlockingConnection(pika.ConnectionParameters(self.host, self.port, '/', self.credentials)) + channel = connection.channel() + channel.queue_declare(queue=queue, durable=False) + channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type) + channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey) + + result = channel.basic_publish( + exchange=exchange, + routing_key=routingkey, + body=message, + properties=self.properties + ) + + if result: + log.info(f"Connecting to host: {self.host} port: {self.port} exchange: {exchange} queue: {queue}") + else: + log.info("not delivered") + + connection.close() diff --git a/rmq/__init__.py b/rmq/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/rmq/__init__.py +++ /dev/null diff --git a/rmq/consumer.py b/rmq/consumer.py deleted file mode 100644 index 2c41e26..0000000 --- a/rmq/consumer.py +++ /dev/null @@ -1,42 +0,0 @@ -import time -import traceback -import pika -from pika.exceptions import ConnectionClosed - - -def consume(username='guest', - password='guest', - host='localhost', - port=5672, - queue='', - callback=None): - - while True: - credentials = pika.PlainCredentials(username, password) - con_params = pika.ConnectionParameters(host, port, '/', credentials) - - try: - connection = pika.BlockingConnection(con_params) - - try: - channel = connection.channel() - channel.queue_declare(queue=queue, durable=False) - channel.basic_qos(prefetch_count=1) - channel.basic_consume(callback, queue=queue) - - print(f"""Connected to host: {host} port: {port} queue: {queue}""", flush=True) - - channel.start_consuming() - - except Exception: - exc = traceback.format_exc() - print(exc, flush=True) - - finally: - connection.close() - - except ConnectionClosed: - print(f"Connection lost, reconnecting to {host}", flush=True) - pass - - time.sleep(2) diff --git a/rmq/publisher.py b/rmq/publisher.py deleted file mode 100644 index 1a9a5cf..0000000 --- a/rmq/publisher.py +++ /dev/null @@ -1,35 +0,0 @@ -import pika - - -def publish(message, - username='guest', - password='guest', - host='localhost', - port=5672, - queue='', - routingkey='', - exchange='', - exchange_type=''): - - credentials = pika.PlainCredentials(username, password) - connection = pika.BlockingConnection(pika.ConnectionParameters(host, port, '/', credentials)) - properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1) - - channel = connection.channel() - channel.queue_declare(queue=queue, durable=False) - channel.exchange_declare(exchange=exchange, exchange_type=exchange_type) - channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey) - - result = channel.basic_publish( - exchange=exchange, - routing_key=routingkey, - body=message, - properties=properties - ) - - if result: - print(f"Connecting to host: {host} port: {port} exchange: {exchange} queue: {queue}", flush=True) - else: - print("not delivered") - - connection.close() |