| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
 | import pika
import time
import traceback
from pika.exceptions import ConnectionClosed
from config import USERNAME
from config import PASSWORD
from config import HOST
from config import PORT
from config import EXCHANGE_TYPE
from config import QUEUE
from config import ROUTING_KEY
from config import EXCHANGE
from logs import log
class Rmq(object):
    """Small RabbitMQ helper built on pika's ``BlockingConnection``.

    An instance stores the broker credentials/address and offers two
    operations:

    * ``consume`` -- declare a queue and block forever dispatching its
      messages to a callback, reconnecting after failures.
    * ``publish`` -- open a short-lived connection, declare/bind the queue
      and exchange, and publish a single message.

    Both operations declare the queue as non-durable, auto-delete, with a
    5000 ms ``x-message-ttl``.
    """

    def __init__(self,
                 username=USERNAME,
                 password=PASSWORD,
                 host=HOST,
                 port=PORT,
                 exchange_type=EXCHANGE_TYPE):
        """Store connection settings and pre-build the pika parameter objects.

        Args:
            username: Broker user name (defaults to ``config.USERNAME``).
            password: Broker password (defaults to ``config.PASSWORD``).
            host: Broker host (defaults to ``config.HOST``).
            port: Broker port (defaults to ``config.PORT``).
            exchange_type: Exchange type passed to ``exchange_declare`` in
                ``publish`` (defaults to ``config.EXCHANGE_TYPE``).
        """
        # Use the arguments themselves: assigning the config constants here
        # would silently discard any value the caller passed in.
        self.username = username
        self.password = password
        self.host = host
        self.port = port
        self.exchange_type = exchange_type
        self.credentials = pika.PlainCredentials(self.username, self.password)
        # '/' is the virtual host.
        self.con_params = pika.ConnectionParameters(self.host, self.port, '/', self.credentials)
        self.properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1)

    def consume(self, queue=QUEUE, callback=None, thread_ws=None):
        """Consume messages from ``queue`` forever, reconnecting on failure.

        Each pass of the loop opens a new connection, declares the queue,
        registers the consumer and blocks in ``start_consuming``. Any
        exception raised while the connection is open is logged with its
        traceback, the connection is closed, and after a 2 second pause the
        loop starts over.

        Args:
            queue: Name of the queue to declare and consume from.
            callback: Callable invoked for every delivery as
                ``callback(ch, method, properties, body, thread_ws=thread_ws)``.
            thread_ws: Optional object with a ``send(str)`` method. When
                given, it receives ``'{"service": "connected"}'`` once the
                consumer is set up and ``'{"service": "disconnected"}'`` when
                ``ConnectionClosed`` is raised while opening or closing the
                connection.

        Raises:
            SystemExit: With code 1 when ``ConnectionClosed`` reaches the
                outer handler and ``thread_ws`` is set. Without
                ``thread_ws`` the method keeps retrying instead.
        """
        while True:
            try:
                connection = pika.BlockingConnection(self.con_params)
                try:
                    channel = connection.channel()
                    channel.queue_declare(queue=queue,
                                          durable=False,  # Do not commit to disk
                                          arguments={'x-message-ttl': 5000},  # a message is automatically deleted after x milliseconds if unacknowledged  # NOQA
                                          auto_delete=True  # Delete queue when all connection are closed
                                          )
                    # Hand this consumer at most one unacknowledged message at a time.
                    channel.basic_qos(prefetch_count=1)
                    # The lambda adapts pika's 4-argument callback signature so
                    # that thread_ws is forwarded to the user callback as well.
                    channel.basic_consume(
                        lambda ch, method, properties, body:
                        callback(ch, method, properties, body, thread_ws=thread_ws),
                        queue=queue)
                    log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}")
                    if thread_ws:
                        thread_ws.send('{"service": "connected"}')
                    channel.start_consuming()
                except Exception:
                    # Top-level boundary of the consumer loop: log the full
                    # traceback and fall through to the reconnect below.
                    log.error(traceback.format_exc())
                finally:
                    connection.close()
            except ConnectionClosed:
                # Reached when opening the connection (or closing it in the
                # ``finally`` above) raises ConnectionClosed.
                if thread_ws:
                    log.error(f"Connection to {self.host} could not be established")
                    thread_ws.send('{"service": "disconnected"}')
                    # Same effect as the site-provided ``exit(1)``, but does not
                    # depend on the ``site`` module having been loaded.
                    raise SystemExit(1)
                log.error(f"Connection lost, reconnecting to {self.host}")
            # Pause before the next connection attempt.
            time.sleep(2)

    def publish(self,
                message,
                queue=QUEUE,
                routingkey=ROUTING_KEY,
                exchange=EXCHANGE):
        """Publish a single ``message`` over a short-lived connection.

        Declares ``queue`` and ``exchange``, binds them with ``routingkey``,
        publishes the message using ``self.properties`` and logs whether
        ``basic_publish`` returned a truthy result. The connection is always
        closed afterwards.

        Args:
            message: Message body handed to ``basic_publish``.
            queue: Queue to declare and bind.
            routingkey: Routing key used for both the binding and the publish.
            exchange: Exchange to declare and publish to.

        Raises:
            SystemExit: With code 1 when ``ConnectionClosed`` is raised while
                the channel is being set up or the message is being sent.
        """
        try:
            connection = pika.BlockingConnection(pika.ConnectionParameters(self.host, self.port, '/', self.credentials))
            try:
                channel = connection.channel()
                channel.queue_declare(queue=queue,
                                      durable=False,  # Do not commit to disk
                                      arguments={'x-message-ttl': 5000},  # a message is automatically deleted after x milliseconds if unacknowledged # NOQA
                                      auto_delete=True  # Delete queue when all connection are closed
                                      )
                channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type)
                channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey)
                result = channel.basic_publish(
                    exchange=exchange,
                    routing_key=routingkey,
                    body=message,
                    properties=self.properties
                )
                if result:
                    log.info(f"published message: {self.host} port: {self.port} exchange: {exchange} queue: {queue} message: {message}")  # NOQA
                else:
                    log.error(f"Messge '{message}' not delivered")
            except ConnectionClosed:
                log.error(f"Could not send message, connection to {self.host} was lost")
                # Same effect as the site-provided ``exit(1)``, but does not
                # depend on the ``site`` module having been loaded.
                raise SystemExit(1)
            finally:
                connection.close()
        except ConnectionClosed:
            # Raised when the connection cannot be opened (or when closing it
            # in the ``finally`` above fails with ConnectionClosed).
            log.error(f"Could not connect to {self.host}")
 |