| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
 | import pika
import time
import traceback
from pika.exceptions import ConnectionClosed
from config import USERNAME
from config import PASSWORD
from config import HOST
from config import PORT
from config import EXCHANGE_TYPE
from config import QUEUE
from config import ROUTING_KEY
from config import EXCHANGE
from logs import log
class Rmq(object):
    """Small helper around pika's ``BlockingConnection`` for one broker.

    An instance holds the connection parameters and the message properties
    used for publishing. Every call to :meth:`consume` or :meth:`publish`
    opens its own connection and closes it again when done.
    """

    def __init__(self,
                 username=USERNAME,
                 password=PASSWORD,
                 host=HOST,
                 port=PORT,
                 exchange_type=EXCHANGE_TYPE):
        """Build the connection parameters; no connection is opened here.

        Args:
            username: Broker user name (defaults to ``config.USERNAME``).
            password: Broker password (defaults to ``config.PASSWORD``).
            host: Broker host (defaults to ``config.HOST``).
            port: Broker port (defaults to ``config.PORT``).
            exchange_type: Exchange type declared by :meth:`publish`
                (defaults to ``config.EXCHANGE_TYPE``).
        """
        # Store the arguments rather than the config constants, so explicit
        # overrides passed by the caller actually take effect.
        self.username = username
        self.password = password
        self.host = host
        self.port = port
        self.exchange_type = exchange_type
        self.credentials = pika.PlainCredentials(self.username, self.password)
        # Third positional argument '/' is the virtual host.
        self.con_params = pika.ConnectionParameters(self.host, self.port, '/', self.credentials)
        # delivery_mode=1 marks published messages as non-persistent.
        self.properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1)

    def _declare(self, channel, queue):
        """Declare ``queue`` on ``channel`` with this class's fixed settings.

        Consumer and publisher both go through this method so that they
        declare the queue with identical arguments.
        """
        channel.queue_declare(
            queue=queue,
            durable=False,                      # Do not commit messages to disk
            arguments={'x-message-ttl': 5000},  # Delete message automatically after x milliseconds
            auto_delete=True)                   # Delete queue when all connection are closed

    def consume(self, queue=QUEUE, callback=None, thread_ws=None):
        """Consume messages from ``queue`` forever, reconnecting on failure.

        This method only leaves its loop through ``exit(1)`` (see below) or
        an exception that is not handled here.

        Args:
            queue: Name of the queue to declare and consume from.
            callback: Called for each delivery as
                ``callback(ch, method, properties, body, thread_ws=thread_ws)``.
                With the default ``None`` the call itself fails on the first
                delivery; presumably that error surfaces from
                ``start_consuming()`` and is logged like any other consumer
                error.
            thread_ws: Optional object exposing ``closed`` and ``send(str)``.
                When given and not closed it receives
                ``{"service": "connected"}`` once the consumer is set up and
                ``{"service": "disconnected"}`` before the process exits.

        Raises:
            SystemExit: With code 1 when ``ConnectionClosed`` reaches the
                outer handler while ``thread_ws`` is set and still open.
        """
        while True:
            try:
                connection = pika.BlockingConnection(self.con_params)
                try:
                    channel = connection.channel()
                    self._declare(channel, queue)
                    # Hand out one unacknowledged message at a time.
                    channel.basic_qos(prefetch_count=1)
                    channel.basic_consume(
                        lambda ch, method, properties, body:
                        callback(ch, method, properties, body, thread_ws=thread_ws),
                        queue=queue)
                    log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}")
                    if thread_ws and not thread_ws.closed:
                        thread_ws.send('{"service": "connected"}')
                    channel.start_consuming()
                except Exception:
                    # Any failure while consuming (including one raised by
                    # the callback) is logged; the loop then reconnects.
                    exc = traceback.format_exc()
                    log.error(exc)
                finally:
                    connection.close()
            except ConnectionClosed:
                # Only opening the connection or closing it in the
                # ``finally`` above can reach this handler, because the inner
                # ``except Exception`` absorbs everything else.
                if thread_ws and not thread_ws.closed:
                    log.error(f"Connection to {self.host} could not be established")
                    thread_ws.send('{"service": "disconnected"}')
                    exit(1)
                log.error(f"Connection lost, reconnecting to {self.host}")
            # Pause before the next connection attempt.
            time.sleep(2)

    def publish(self,
                message,
                queue=QUEUE,
                routingkey=ROUTING_KEY,
                exchange=EXCHANGE):
        """Publish one ``message`` over a freshly opened connection.

        Declares ``queue`` and ``exchange``, binds them with ``routingkey``,
        publishes the message with ``self.properties`` and closes the
        connection. The outcome is only logged; nothing is returned.

        Args:
            message: Message body to publish.
            queue: Queue to declare and bind.
            routingkey: Routing key used for both the binding and the publish.
            exchange: Exchange to declare and publish to.

        Raises:
            SystemExit: With code 1 when ``ConnectionClosed`` is raised while
                declaring, binding or publishing, unless closing the
                connection afterwards raises as well.
        """
        try:
            connection = pika.BlockingConnection(self.con_params)
            try:
                channel = connection.channel()
                self._declare(channel, queue)
                channel.exchange_declare(
                    exchange=exchange,
                    exchange_type=self.exchange_type)
                channel.queue_bind(
                    exchange=exchange,
                    queue=queue,
                    routing_key=routingkey)
                result = channel.basic_publish(
                    exchange=exchange,
                    routing_key=routingkey,
                    body=message,
                    properties=self.properties)
                if result:
                    log.info((f"published: {self.host} "
                              f"queue: {queue} "
                              f"message: {message}"))
                else:
                    log.error(f"Message '{message}' not delivered")
            except ConnectionClosed:
                log.error(f"Could not send message, connection to {self.host} was lost")
                exit(1)
            finally:
                connection.close()
        except ConnectionClosed:
            # Reached when opening the connection fails, or when closing it
            # in the ``finally`` above raises ConnectionClosed.
            log.error(f"Could not connect to {self.host}")
 |