aboutsummaryrefslogtreecommitdiffstats
path: root/rmq.py
blob: c8547d0e2f93b6d2d95b062b11e6894746d70773 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import pika
import time
import traceback
import logging
import sys

from pika.exceptions import ConnectionClosed

from config import USERNAME
from config import PASSWORD
from config import HOST
from config import PORT
from config import EXCHANGE_TYPE
from config import QUEUE
from config import ROUTING_KEY
from config import EXCHANGE

from logs import log


class Rmq(object):

    def __init__(self,
                 username=USERNAME,
                 password=PASSWORD,
                 host=HOST,
                 port=PORT,
                 exchange_type=EXCHANGE_TYPE):

        self.username = USERNAME
        self.password = PASSWORD
        self.host = HOST
        self.port = PORT
        self.exchange_type = EXCHANGE_TYPE
        self.credentials = pika.PlainCredentials(self.username, self.password)
        self.con_params = pika.ConnectionParameters(self.host, self.port, '/', self.credentials)
        self.properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1)

    def consume(self, queue=QUEUE, callback=None, thread_ws=None):

        while True:
            try:
                connection = pika.BlockingConnection(self.con_params)

                try:
                    channel = connection.channel()
                    channel.queue_declare(queue=queue, durable=False)
                    channel.basic_qos(prefetch_count=1)
                    channel.basic_consume(
                        lambda ch, method, properties, body: callback(ch, method, properties, body, thread_ws=thread_ws),
                        queue=queue)

                    log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}")

                    channel.start_consuming()

                except Exception:
                    exc = traceback.format_exc()
                    log.info(exc)

                finally:
                    connection.close()

            except ConnectionClosed:
                log.info(f"Connection lost, reconnecting to {self.host}")
                pass

            time.sleep(2)

    def publish(self,
                message,
                queue=QUEUE,
                routingkey=ROUTING_KEY,
                exchange=EXCHANGE):

        connection = pika.BlockingConnection(pika.ConnectionParameters(self.host, self.port, '/', self.credentials))
        channel = connection.channel()
        channel.queue_declare(queue=queue, durable=False)
        channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type)
        channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey)

        result = channel.basic_publish(
            exchange=exchange,
            routing_key=routingkey,
            body=message,
            properties=self.properties
        )

        if result:
            log.info(f"Connecting to host: {self.host} port: {self.port} exchange: {exchange} queue: {queue}")
        else:
            log.info("not delivered")

        connection.close()