aboutsummaryrefslogtreecommitdiffstats
path: root/rmq.py
blob: 29fc4488744a802fafa1b55266341ea07e6c0cff (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import time
import traceback

import pika
from pika.exceptions import ConnectionClosed

from config import EXCHANGE, EXCHANGE_TYPE, HOST, PASSWORD, PORT, QUEUE, ROUTING_KEY, USERNAME
from logs import log


class Rmq:
    """RabbitMQ (RMQ) client used for communication with the bot.

    The instance only stores connection settings. Every call to
    :meth:`consume` or :meth:`publish` opens its own blocking connection and
    closes it again in a ``finally`` clause; no connection is kept around.
    """

    def __init__(self):
        """Build the credentials, connection parameters and message properties."""
        self.credentials = pika.PlainCredentials(USERNAME, PASSWORD)
        self.con_params = pika.ConnectionParameters(HOST, PORT, '/', self.credentials)
        # delivery_mode=1 marks published messages as non-persistent (transient).
        self.properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1)

    def _declare(self, channel, queue):
        """Declare ``queue`` on ``channel`` as a non-durable, auto-deleting queue.

        Both :meth:`consume` and :meth:`publish` go through this helper so the
        queue is always declared with identical arguments.
        """
        channel.queue_declare(
            queue=queue,
            durable=False,  # Do not commit messages to disk
            arguments={'x-message-ttl': 5000},  # Delete message automatically after x milliseconds
            auto_delete=True)  # Delete queue when all connections are closed

    def consume(self, queue=QUEUE, callback=None, thread_ws=None, run_once=False):
        """Subscribe to read from a RMQ channel.

        Args:
            queue: Name of the queue to declare and read from.
            callback: Called as ``callback(ch, method, properties, body,
                thread_ws=thread_ws)`` for every delivered message. Required
                unless ``run_once`` is true.
            thread_ws: Optional object exposing a ``closed`` attribute and a
                ``send`` method (presumably a websocket -- confirm with the
                caller). It is sent a JSON status string on connect and when
                the connection cannot be established.
            run_once: When true, do not subscribe; fetch a single result with
                ``basic_get`` and return it.

        Returns:
            The result of ``channel.basic_get`` when ``run_once`` is true.
            Otherwise the method keeps reconnecting and consuming and does
            not return normally.

        Raises:
            ValueError: If ``run_once`` is false and no ``callback`` is given.
            SystemExit: With status 1 when a ``ConnectionClosed`` reaches the
                outer handler while ``thread_ws`` is set and not closed.
        """
        if not run_once and callback is None:
            # Without this guard every delivered message would raise TypeError
            # inside the consumer, be swallowed below, and loop forever.
            raise ValueError("consume() requires a callback unless run_once=True")

        def on_message(ch, method, properties, body):
            # Forward the delivery to the user callback together with thread_ws.
            return callback(ch, method, properties, body, thread_ws=thread_ws)

        while True:
            try:
                connection = pika.BlockingConnection(self.con_params)

                try:
                    channel = connection.channel()
                    self._declare(channel, queue)
                    channel.basic_qos(prefetch_count=1)

                    if not run_once:
                        channel.basic_consume(on_message, queue=queue)

                    log.info(f"Connected to host: {HOST} port: {PORT} queue: {queue}")

                    if thread_ws and not thread_ws.closed:
                        thread_ws.send('{"service": "connected"}')

                    if run_once:
                        # The finally clause below still closes the connection.
                        return channel.basic_get(queue=queue)

                    channel.start_consuming()

                except Exception:
                    # Any error raised while setting up or consuming (including
                    # errors from the callback) is logged; the loop then retries.
                    log.error(traceback.format_exc())

                finally:
                    connection.close()

            except ConnectionClosed:
                if thread_ws and not thread_ws.closed:
                    log.error(f"Connection to {HOST} could not be established")
                    thread_ws.send('{"service": "disconnected"}')
                    # SystemExit is what the interactive-only exit() helper
                    # raises, without depending on the site module.
                    raise SystemExit(1)

                log.error(f"Connection lost, reconnecting to {HOST}")

            # Back off briefly before the next connection attempt.
            time.sleep(2)

    def publish(self, message, queue=QUEUE, routingkey=ROUTING_KEY, exchange=EXCHANGE):
        """Open a connection to publish (write) to a RMQ channel.

        The queue and exchange are declared and bound with ``routingkey``
        before the message is published.

        Args:
            message: Body of the message to publish.
            queue: Name of the queue to declare and bind.
            routingkey: Routing key used for both the binding and the publish.
            exchange: Name of the exchange to declare and publish to.

        Returns:
            The result of ``basic_publish`` when it is truthy; ``None`` when
            the message was not delivered or the connection could not be
            opened (both cases are logged).

        Raises:
            SystemExit: With status 1 when the connection is lost while
                declaring, binding or publishing.
        """
        try:
            connection = pika.BlockingConnection(self.con_params)

            try:
                channel = connection.channel()

                self._declare(channel, queue)

                channel.exchange_declare(
                    exchange=exchange,
                    exchange_type=EXCHANGE_TYPE)

                channel.queue_bind(
                    exchange=exchange,
                    queue=queue,
                    routing_key=routingkey)

                result = channel.basic_publish(
                    exchange=exchange,
                    routing_key=routingkey,
                    body=message,
                    properties=self.properties)

                if result:
                    return result

                log.error(f"Message '{message}' not delivered")

            except ConnectionClosed:
                log.error(f"Could not send message, connection to {HOST} was lost")
                # SystemExit is what the interactive-only exit() helper
                # raises, without depending on the site module.
                raise SystemExit(1)

            finally:
                connection.close()

        except ConnectionClosed:
            log.error(f"Could not connect to {HOST}")

        return None