1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
|
import pika
import time
import traceback
from pika.exceptions import ConnectionClosed
from config import USERNAME
from config import PASSWORD
from config import HOST
from config import PORT
from config import EXCHANGE_TYPE
from config import QUEUE
from config import ROUTING_KEY
from config import EXCHANGE
from logs import log
class Rmq(object):
    """Thin convenience wrapper around pika's blocking RabbitMQ client.

    Each call to :meth:`consume` or :meth:`publish` opens its own
    ``pika.BlockingConnection`` and closes it again when done.
    """

    def __init__(self,
                 username=USERNAME,
                 password=PASSWORD,
                 host=HOST,
                 port=PORT,
                 exchange_type=EXCHANGE_TYPE):
        """Store the connection settings and pre-build the pika parameters.

        Args:
            username: Broker user name (defaults to ``config.USERNAME``).
            password: Broker password (defaults to ``config.PASSWORD``).
            host: Broker host (defaults to ``config.HOST``).
            port: Broker port (defaults to ``config.PORT``).
            exchange_type: Exchange type used when publishing
                (defaults to ``config.EXCHANGE_TYPE``).
        """
        # Use the arguments, not the module constants, so that callers can
        # actually override the configured defaults.
        self.username = username
        self.password = password
        self.host = host
        self.port = port
        self.exchange_type = exchange_type
        self.credentials = pika.PlainCredentials(self.username, self.password)
        self.con_params = pika.ConnectionParameters(
            self.host, self.port, '/', self.credentials)
        # delivery_mode=1 is the AMQP "non-persistent" delivery mode.
        self.properties = pika.BasicProperties(
            content_type='text/plain', delivery_mode=1)

    def _declare(self, channel, queue):
        """Declare ``queue`` on ``channel`` as a transient, self-cleaning queue."""
        channel.queue_declare(
            queue=queue,
            durable=False,  # Do not commit messages to disk
            arguments={'x-message-ttl': 5000},  # Delete message automatically after x milliseconds
            auto_delete=True)  # Delete queue when all connections are closed

    def consume(self, queue=QUEUE, callback=None, thread_ws=None, run_once=False):
        """Consume messages from ``queue``, reconnecting when the link drops.

        Args:
            queue: Name of the queue to read from.
            callback: Callable invoked for every delivery as
                ``callback(ch, method, properties, body, thread_ws=thread_ws)``.
                Only used when ``run_once`` is false.
            thread_ws: Optional websocket-like object exposing ``closed`` and
                ``send``; it is told when the service connects/disconnects.
            run_once: When true, fetch a single message with ``basic_get``
                and return instead of entering the blocking consume loop.

        Returns:
            The result of ``channel.basic_get(queue=queue)`` when ``run_once``
            is true. Otherwise the method does not return normally.

        Raises:
            SystemExit: With status 1 when the connection is closed while an
                open ``thread_ws`` is attached.
        """
        def _on_message(ch, method, properties, body):
            # Forward the delivery together with the websocket handle.
            callback(ch, method, properties, body, thread_ws=thread_ws)

        while True:
            try:
                connection = pika.BlockingConnection(self.con_params)
                try:
                    channel = connection.channel()
                    self._declare(channel, queue)
                    channel.basic_qos(prefetch_count=1)
                    if not run_once:
                        channel.basic_consume(_on_message, queue=queue)
                    log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}")
                    if thread_ws and not thread_ws.closed:
                        thread_ws.send('{"service": "connected"}')
                    if run_once:
                        return channel.basic_get(queue=queue)
                    channel.start_consuming()
                except Exception:
                    # Best effort: log the failure and let the outer loop
                    # open a fresh connection.
                    log.error(traceback.format_exc())
                finally:
                    # The broker may already have dropped the connection;
                    # only close it when it is still open so that cleanup
                    # cannot raise a secondary error.
                    if connection.is_open:
                        connection.close()
            except ConnectionClosed:
                if thread_ws and not thread_ws.closed:
                    log.error(f"Connection to {self.host} could not be established")
                    thread_ws.send('{"service": "disconnected"}')
                    # SystemExit instead of the site-provided exit() helper,
                    # which is not guaranteed to exist in every interpreter.
                    raise SystemExit(1)
                log.error(f"Connection lost, reconnecting to {self.host}")
                time.sleep(2)

    def publish(self,
                message,
                queue=QUEUE,
                routingkey=ROUTING_KEY,
                exchange=EXCHANGE):
        """Publish ``message`` to ``exchange`` using ``routingkey``.

        The queue and exchange are declared and bound before publishing.

        Args:
            message: Message body to send.
            queue: Queue to declare and bind to the exchange.
            routingkey: Routing key used for the binding and the publish.
            exchange: Exchange to declare and publish to.

        Returns:
            The truthy result of ``channel.basic_publish`` on success;
            ``None`` when the publish result is falsy or the initial
            connection could not be established (both cases are logged).

        Raises:
            SystemExit: With status 1 when the connection is lost after it
                was established.
        """
        try:
            connection = pika.BlockingConnection(self.con_params)
            try:
                channel = connection.channel()
                self._declare(channel, queue)
                channel.exchange_declare(
                    exchange=exchange,
                    exchange_type=self.exchange_type)
                channel.queue_bind(
                    exchange=exchange,
                    queue=queue,
                    routing_key=routingkey)
                result = channel.basic_publish(
                    exchange=exchange,
                    routing_key=routingkey,
                    body=message,
                    properties=self.properties)
                if result:
                    return result
                log.error(f"Message '{message}' not delivered")
            except ConnectionClosed:
                log.error(f"Could not send message, connection to {self.host} was lost")
                # SystemExit instead of the site-provided exit() helper.
                raise SystemExit(1)
            finally:
                # Skip close() on a connection the broker already dropped.
                if connection.is_open:
                    connection.close()
        except ConnectionClosed:
            log.error(f"Could not connect to {self.host}")
|