1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
import pika
import time
import traceback
from pika.exceptions import ConnectionClosed
from config import USERNAME
from config import PASSWORD
from config import HOST
from config import PORT
from config import EXCHANGE_TYPE
from config import QUEUE
from config import ROUTING_KEY
from config import EXCHANGE
from logs import log
class Rmq:
    """Small RabbitMQ helper built on pika's ``BlockingConnection``.

    The connection parameters and message properties are built once in the
    constructor; ``consume`` and ``publish`` each open their own connection.
    """

    def __init__(self,
                 username=USERNAME,
                 password=PASSWORD,
                 host=HOST,
                 port=PORT,
                 exchange_type=EXCHANGE_TYPE):
        """Store the broker settings and pre-build the pika objects.

        Every argument defaults to the constant of the same name imported
        from ``config``.
        """
        # Assign the arguments (not the config constants) so that values
        # passed by the caller actually take effect.
        self.username = username
        self.password = password
        self.host = host
        self.port = port
        self.exchange_type = exchange_type
        self.credentials = pika.PlainCredentials(self.username, self.password)
        self.con_params = pika.ConnectionParameters(
            self.host, self.port, '/', self.credentials)
        self.properties = pika.BasicProperties(
            content_type='text/plain', delivery_mode=1)

    def _declare(self, channel, queue):
        """Declare ``queue`` on ``channel`` with this class's queue settings."""
        channel.queue_declare(
            queue=queue,
            durable=False,  # Do not commit messages to disk
            arguments={'x-message-ttl': 5000},  # Delete messages automatically after x milliseconds
            auto_delete=True)  # Delete the queue when all connections are closed

    def consume(self, queue=QUEUE, callback=None, thread_ws=None):
        """Consume ``queue`` in an endless loop, reconnecting on failure.

        Each message is handed to ``callback`` as
        ``callback(ch, method, properties, body, thread_ws=thread_ws)``.

        ``thread_ws`` is optional; when given it must provide a ``closed``
        attribute and a ``send()`` method (presumably a websocket -- verify
        against the caller). It receives a JSON status string once the
        consumer is connected and again when the connection is lost.

        The method never returns normally. It ends by raising
        ``SystemExit(1)`` when the connection is closed while an open
        ``thread_ws`` is attached.
        """
        def on_message(ch, method, properties, body):
            # Forward the delivery and pass the websocket through as a keyword.
            callback(ch, method, properties, body, thread_ws=thread_ws)

        while True:
            try:
                connection = pika.BlockingConnection(self.con_params)
                try:
                    channel = connection.channel()
                    self._declare(channel, queue)
                    channel.basic_qos(prefetch_count=1)
                    channel.basic_consume(on_message, queue=queue)
                    log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}")
                    if thread_ws and not thread_ws.closed:
                        thread_ws.send('{"service": "connected"}')
                    channel.start_consuming()
                except Exception:
                    # Log the failure and let the loop open a new connection.
                    log.error(traceback.format_exc())
                finally:
                    connection.close()
            except ConnectionClosed:
                # NOTE(review): the nesting of this handler was reconstructed
                # from a source whose indentation was lost; confirm that the
                # exit is meant only for the open-websocket case.
                if thread_ws and not thread_ws.closed:
                    log.error(f"Connection to {self.host} could not be established")
                    thread_ws.send('{"service": "disconnected"}')
                    # SystemExit is what exit() raises; raising it directly
                    # does not depend on the ``site`` module being loaded.
                    raise SystemExit(1)
                log.error(f"Connection lost, reconnecting to {self.host}")
                time.sleep(2)

    def publish(self,
                message,
                queue=QUEUE,
                routingkey=ROUTING_KEY,
                exchange=EXCHANGE):
        """Publish ``message`` to ``exchange`` using ``routingkey``.

        A new connection is opened for the call. The queue and the exchange
        are declared and bound before publishing, and the connection is
        closed afterwards. The outcome is only logged; nothing is returned.

        Raises ``SystemExit(1)`` if the connection is closed while the
        message is being sent. A ``ConnectionClosed`` raised while opening
        (or closing) the connection is logged and swallowed.
        """
        try:
            connection = pika.BlockingConnection(self.con_params)
            try:
                channel = connection.channel()
                self._declare(channel, queue)
                channel.exchange_declare(
                    exchange=exchange,
                    exchange_type=self.exchange_type)
                channel.queue_bind(
                    exchange=exchange,
                    queue=queue,
                    routing_key=routingkey)
                delivered = channel.basic_publish(
                    exchange=exchange,
                    routing_key=routingkey,
                    body=message,
                    properties=self.properties)
                if delivered:
                    log.info((f"published: {self.host} "
                              f"queue: {queue} "
                              f"message: {message}"))
                else:
                    log.error(f"Message '{message}' not delivered")
            except ConnectionClosed:
                log.error(f"Could not send message, connection to {self.host} was lost")
                # Same exception exit() raises, without relying on ``site``.
                raise SystemExit(1)
            finally:
                connection.close()
        except ConnectionClosed:
            log.error(f"Could not connect to {self.host}")
|