diff options
Diffstat (limited to 'rmq')
| -rw-r--r-- | rmq/__init__.py | 0 | ||||
| -rw-r--r-- | rmq/consumer.py | 35 | ||||
| -rw-r--r-- | rmq/publisher.py | 26 | 
3 files changed, 61 insertions, 0 deletions
| diff --git a/rmq/__init__.py b/rmq/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/rmq/__init__.py diff --git a/rmq/consumer.py b/rmq/consumer.py new file mode 100644 index 0000000..8e1ce2b --- /dev/null +++ b/rmq/consumer.py @@ -0,0 +1,35 @@ +import time +import traceback +import pika +from pika.exceptions import ConnectionClosed + +def consume(username='guest', password='guest', host='localhost', port=5672, queue='', callback=None): +    while True: +        credentials = pika.PlainCredentials(username, password) +        con_params = pika.ConnectionParameters(host, port, '/', credentials) + +        try: +            connection = pika.BlockingConnection(con_params) + +            try: +                channel = connection.channel() +                channel.queue_declare(queue=queue, durable=False) +                channel.basic_qos(prefetch_count=1) +                channel.basic_consume(callback, queue=queue) + +                print(f"""Connected to host: {host} port: {port} queue: {queue}""", flush=True) + +                channel.start_consuming() + +            except: +                exc = traceback.format_exc() +                print(exc, flush=True) + +            finally: +                connection.close() + +        except ConnectionClosed: +            print(f"Connection lost, reconnecting to {host}", flush=True) +            pass + +        time.sleep(2) diff --git a/rmq/publisher.py b/rmq/publisher.py new file mode 100644 index 0000000..4ba9db9 --- /dev/null +++ b/rmq/publisher.py @@ -0,0 +1,26 @@ +import pika + +def publish(message, username='guest', password='guest', host='localhost', port=5672, queue='', routingkey='', exchange='', exchange_type=''): +    credentials = pika.PlainCredentials(username, password) +    connection = pika.BlockingConnection(pika.ConnectionParameters(host, port, '/', credentials)) +    properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1) + +    channel = connection.channel() +    channel.queue_declare(queue=queue, durable=False) +    channel.exchange_declare(exchange=exchange, exchange_type=exchange_type) +    channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey) + +    result = channel.basic_publish( +                exchange=exchange, +                routing_key=routingkey, +                body=message, +                properties=properties +    ) + +    if result: +        print(f"Connecting to host: {host} port: {port} exchange: {exchange} queue: {queue}", flush=True) +    else: +        print("not delivered") + +    connection.close() + | 
