diff options
| author | 2018-05-26 12:54:49 +0200 | |
|---|---|---|
| committer | 2018-05-26 12:54:49 +0200 | |
| commit | ea2141dc6fd7284e6f9fa04ee638460286e3b09c (patch) | |
| tree | 218d332be80bd2f4e177b0d9e4725dd27dec2034 /rmq.py | |
| parent | lint (diff) | |
simplify threads and use local thread variables to manage user websocket connections
Diffstat (limited to 'rmq.py')
| -rw-r--r-- | rmq.py | 94 | 
1 files changed, 94 insertions, 0 deletions
| @@ -0,0 +1,94 @@ +import pika +import time +import traceback +import logging +import sys + +from pika.exceptions import ConnectionClosed + +from config import USERNAME +from config import PASSWORD +from config import HOST +from config import PORT +from config import EXCHANGE_TYPE +from config import QUEUE +from config import ROUTING_KEY +from config import EXCHANGE + +from logs import log + + +class Rmq(object): + +    def __init__(self, +                 username=USERNAME, +                 password=PASSWORD, +                 host=HOST, +                 port=PORT, +                 exchange_type=EXCHANGE_TYPE): + +        self.username = USERNAME +        self.password = PASSWORD +        self.host = HOST +        self.port = PORT +        self.exchange_type = EXCHANGE_TYPE +        self.credentials = pika.PlainCredentials(self.username, self.password) +        self.con_params = pika.ConnectionParameters(self.host, self.port, '/', self.credentials) +        self.properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1) + +    def consume(self, queue=QUEUE, callback=None, thread_ws=None): + +        while True: +            try: +                connection = pika.BlockingConnection(self.con_params) + +                try: +                    channel = connection.channel() +                    channel.queue_declare(queue=queue, durable=False) +                    channel.basic_qos(prefetch_count=1) +                    channel.basic_consume( +                        lambda ch, method, properties, body: callback(ch, method, properties, body, thread_ws=thread_ws), +                        queue=queue) + +                    log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}") + +                    channel.start_consuming() + +                except Exception: +                    exc = traceback.format_exc() +                    log.info(exc) + +                finally: +                    connection.close() + +            except ConnectionClosed: +                log.info(f"Connection lost, reconnecting to {self.host}") +                pass + +            time.sleep(2) + +    def publish(self, +                message, +                queue=QUEUE, +                routingkey=ROUTING_KEY, +                exchange=EXCHANGE): + +        connection = pika.BlockingConnection(pika.ConnectionParameters(self.host, self.port, '/', self.credentials)) +        channel = connection.channel() +        channel.queue_declare(queue=queue, durable=False) +        channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type) +        channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey) + +        result = channel.basic_publish( +            exchange=exchange, +            routing_key=routingkey, +            body=message, +            properties=self.properties +        ) + +        if result: +            log.info(f"Connecting to host: {self.host} port: {self.port} exchange: {exchange} queue: {queue}") +        else: +            log.info("not delivered") + +        connection.close() | 
