1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
import pika
import time
import traceback
import logging
import sys
from pika.exceptions import ConnectionClosed
from config import USERNAME
from config import PASSWORD
from config import HOST
from config import PORT
from config import EXCHANGE_TYPE
from config import QUEUE
from config import ROUTING_KEY
from config import EXCHANGE
from logs import log
class Rmq(object):
def __init__(self,
username=USERNAME,
password=PASSWORD,
host=HOST,
port=PORT,
exchange_type=EXCHANGE_TYPE):
self.username = USERNAME
self.password = PASSWORD
self.host = HOST
self.port = PORT
self.exchange_type = EXCHANGE_TYPE
self.credentials = pika.PlainCredentials(self.username, self.password)
self.con_params = pika.ConnectionParameters(self.host, self.port, '/', self.credentials)
self.properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1)
def consume(self, queue=QUEUE, callback=None, thread_ws=None):
while True:
try:
connection = pika.BlockingConnection(self.con_params)
try:
channel = connection.channel()
channel.queue_declare(queue=queue, durable=False)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
lambda ch, method, properties, body: callback(ch, method, properties, body, thread_ws=thread_ws),
queue=queue)
log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}")
channel.start_consuming()
except Exception:
exc = traceback.format_exc()
log.info(exc)
finally:
connection.close()
except ConnectionClosed:
log.info(f"Connection lost, reconnecting to {self.host}")
time.sleep(2)
def publish(self,
message,
queue=QUEUE,
routingkey=ROUTING_KEY,
exchange=EXCHANGE):
connection = pika.BlockingConnection(pika.ConnectionParameters(self.host, self.port, '/', self.credentials))
channel = connection.channel()
channel.queue_declare(queue=queue, durable=False)
channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type)
channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey)
result = channel.basic_publish(
exchange=exchange,
routing_key=routingkey,
body=message,
properties=self.properties
)
if result:
log.info(f"Connecting to host: {self.host} port: {self.port} exchange: {exchange} queue: {queue}")
else:
log.info("not delivered")
connection.close()
|