aboutsummaryrefslogtreecommitdiffstats
path: root/rmq.py
blob: 493a38f9fc67399ce5a70abc84f2d1655d293866 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import pika
import time
import traceback

from pika.exceptions import ConnectionClosed

from config import USERNAME
from config import PASSWORD
from config import HOST
from config import PORT
from config import EXCHANGE_TYPE
from config import QUEUE
from config import ROUTING_KEY
from config import EXCHANGE

from logs import log


class Rmq(object):
    """Small helper around pika's blocking connection for consuming from and
    publishing to a queue.

    Connection settings default to the values imported from ``config`` but can
    be overridden per instance through the constructor arguments.
    """

    def __init__(self,
                 username=USERNAME,
                 password=PASSWORD,
                 host=HOST,
                 port=PORT,
                 exchange_type=EXCHANGE_TYPE):
        """Store the connection settings and pre-build the pika objects.

        :param username: user name passed to ``pika.PlainCredentials``
        :param password: password passed to ``pika.PlainCredentials``
        :param host: broker host passed to ``pika.ConnectionParameters``
        :param port: broker port passed to ``pika.ConnectionParameters``
        :param exchange_type: exchange type used by :meth:`publish` when
            declaring the exchange
        """
        # Use the caller-supplied arguments (which default to the config
        # constants) rather than the constants themselves, so overrides are
        # actually honoured.
        self.username = username
        self.password = password
        self.host = host
        self.port = port
        self.exchange_type = exchange_type
        self.credentials = pika.PlainCredentials(self.username, self.password)
        # '/' is the virtual host.
        self.con_params = pika.ConnectionParameters(self.host, self.port, '/', self.credentials)
        # Properties attached to every published message.
        self.properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1)

    def _declare(self, channel, queue):
        """Declare ``queue`` on ``channel`` with the settings shared by
        :meth:`consume` and :meth:`publish`."""
        channel.queue_declare(
            queue=queue,
            durable=False,                       # Do not commit messages to disk
            arguments={'x-message-ttl': 5000},  # Delete message automatically after x milliseconds
            auto_delete=True)                    # Delete queue when all connection are closed

    def consume(self, queue=QUEUE, callback=None, thread_ws=None, run_once=False):
        """Consume messages from ``queue``, reconnecting every 2 seconds on failure.

        :param queue: name of the queue to declare and read from
        :param callback: callable invoked for every message as
            ``callback(ch, method, properties, body, thread_ws=thread_ws)``;
            only used when ``run_once`` is false
        :param thread_ws: optional object exposing ``closed`` and ``send()``;
            when given and not closed it is sent a JSON status string on
            connect and on connection loss
        :param run_once: when true, no consumer is registered; a single
            ``basic_get`` is performed and its result is returned
        :return: the result of ``channel.basic_get`` when ``run_once`` is
            true; otherwise the method loops and does not return normally
        """
        while True:
            try:
                connection = pika.BlockingConnection(self.con_params)

                try:
                    channel = connection.channel()
                    self._declare(channel, queue)
                    channel.basic_qos(prefetch_count=1)

                    if not run_once:
                        # Wrap the callback so thread_ws is forwarded with
                        # every delivered message.
                        channel.basic_consume(
                            lambda ch, method, properties, body:
                            callback(ch, method, properties, body, thread_ws=thread_ws),
                            queue=queue)

                    log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}")

                    if thread_ws and not thread_ws.closed:
                        thread_ws.send('{"service": "connected"}')

                    if run_once:
                        # The finally clause below still closes the connection.
                        return channel.basic_get(queue=queue)

                    channel.start_consuming()

                except Exception:
                    # Log the failure and fall through to the retry below.
                    log.error(traceback.format_exc())

                finally:
                    connection.close()

            except ConnectionClosed:
                if thread_ws and not thread_ws.closed:
                    # A listener is attached: report the failure to it and
                    # terminate instead of retrying.
                    log.error(f"Connection to {self.host} could not be established")
                    thread_ws.send('{"service": "disconnected"}')
                    exit(1)

                log.error(f"Connection lost, reconnecting to {self.host}")

            # Pause before the next connection attempt.
            time.sleep(2)

    def publish(self,
                message,
                queue=QUEUE,
                routingkey=ROUTING_KEY,
                exchange=EXCHANGE):
        """Publish ``message`` to ``exchange`` using a fresh connection.

        The queue and exchange are declared and bound with ``routingkey``
        before the message is published.

        :param message: message body to publish
        :param queue: queue to declare and bind to the exchange
        :param routingkey: routing key used for the binding and the publish
        :param exchange: exchange to declare and publish to
        :return: the result of ``channel.basic_publish`` when it is truthy,
            otherwise ``None`` (after logging an error)
        """
        try:
            connection = pika.BlockingConnection(self.con_params)

            try:
                channel = connection.channel()

                self._declare(channel, queue)

                channel.exchange_declare(
                    exchange=exchange,
                    exchange_type=self.exchange_type)

                channel.queue_bind(
                    exchange=exchange,
                    queue=queue,
                    routing_key=routingkey)

                result = channel.basic_publish(
                    exchange=exchange,
                    routing_key=routingkey,
                    body=message,
                    properties=self.properties)

                if result:
                    return result

                log.error(f"Message '{message}' not delivered")

            except ConnectionClosed:
                log.error(f"Could not send message, connection to {self.host} was lost")
                exit(1)

            finally:
                # NOTE(review): if close() itself raises ConnectionClosed here,
                # the outer handler catches it and it replaces any exception
                # already in flight (including the SystemExit above) - confirm
                # this is intended.
                connection.close()

        except ConnectionClosed:
            log.error(f"Could not connect to {self.host}")