diff options
| -rw-r--r-- | README.md | 4 | ||||
| -rw-r--r-- | config.py | 24 | ||||
| -rw-r--r-- | docker-compose.yml | 13 | ||||
| -rw-r--r-- | rmq.py | 54 | ||||
| -rw-r--r-- | snekweb.py | 7 | 
5 files changed, 65 insertions, 37 deletions
| @@ -36,12 +36,14 @@ pipenv sync --dev  Start a rabbitmq instance and get the container IP  ```bash -docker run --name rmq -d rabbitmq:3.7.5-alpine +docker run -d --name rmq -p 15672:15672 -e RABBITMQ_DEFAULT_USER=rabbits -e RABBITMQ_DEFAULT_PASS=rabbits rabbitmq:3.7.5-management-alpine  docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' rmq  # expected output with default setting: 172.17.0.2  # If not, change the config.py file to match  ``` +rabbitmq webinterface: `http://localhost:15672` +  start the webserver  ```bash @@ -1,27 +1,25 @@  import os -def attempt_automatically_finding_the_ip_of_rmq(): +def autodiscover(): +    container_names = ["rmq", "pdrmq", "snekbox_pdrmq_1"]      try:          import docker          client = docker.from_env() -        containers = client.containers.get('snekbox_pdrmq_1') -        HOST = list(containers.attrs.get('NetworkSettings').get('Networks').values())[0]['IPAddress'] -        return HOST +        for name in container_names: +            container = client.containers.get(name) +            if container.status == "running": +                host = list(container.attrs.get('NetworkSettings').get('Networks').values())[0]['IPAddress'] +                return host      except Exception:          return '172.17.0.2' -USERNAME = 'guest' -PASSWORD = 'guest' -HOST = os.environ.get('RMQ_HOST', attempt_automatically_finding_the_ip_of_rmq()) +USERNAME = os.environ.get('RMQ_USERNAME', 'rabbits') +PASSWORD = os.environ.get('RMQ_PASSWORD', 'rabbits') +HOST = os.environ.get('RMQ_HOST', autodiscover())  PORT = 5672 -EXCHANGE_TYPE = 'direct' -  QUEUE = 'input'  EXCHANGE = QUEUE  ROUTING_KEY = QUEUE - -RETURN_QUEUE = 'return' -RETURN_EXCHANGE = RETURN_QUEUE -RETURN_ROUTING_KEY = RETURN_QUEUE +EXCHANGE_TYPE = 'direct' diff --git a/docker-compose.yml b/docker-compose.yml index 667a700..67dbdde 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,9 +2,16 @@ version: '3'  services:    pdrmq:      hostname: "pdrmq" -    image: rabbitmq:3.7.5-alpine +    image: rabbitmq:3.7.5-management-alpine +    expose: +      - "15672" +    ports: +       - "15672:15672"      networks:        - sneknet +    environment: +      RABBITMQ_DEFAULT_USER: rabbits +      RABBITMQ_DEFAULT_PASS: rabbits    pdsnekbox:      hostname: "pdsnekbox" @@ -13,6 +20,8 @@ services:        - sneknet      environment:        RMQ_HOST: pdrmq +      RMQ_USERNAME: rabbits +      RMQ_PASSWORD: rabbits    pdsnekboxweb:      hostname: "pdsnekboxweb" @@ -25,6 +34,8 @@ services:        - "5000"      environment:        RMQ_HOST: pdrmq +      RMQ_USERNAME: rabbits +      RMQ_PASSWORD: rabbits  networks: @@ -34,6 +34,13 @@ class Rmq(object):          self.con_params = pika.ConnectionParameters(self.host, self.port, '/', self.credentials)          self.properties = pika.BasicProperties(content_type='text/plain', delivery_mode=1) +    def _declare(self, channel, queue): +        channel.queue_declare( +            queue=queue, +            durable=False,                      # Do not commit messages to disk +            arguments={'x-message-ttl': 5000},  # Delete message automatically after x milliseconds +            auto_delete=True)                   # Delete queue when all connection are closed +      def consume(self, queue=QUEUE, callback=None, thread_ws=None):          while True: @@ -42,11 +49,7 @@ class Rmq(object):                  try:                      channel = connection.channel() -                    channel.queue_declare(queue=queue, -                                          durable=False,  # Do not commit to disk -                                          arguments={'x-message-ttl': 5000},  # a message is automatically deleted after x milliseconds if unacknowledged  # NOQA -                                          auto_delete=True  # Delete queue when all connection are closed -                                          ) +                    self._declare(channel, queue)                      channel.basic_qos(prefetch_count=1)                      channel.basic_consume(                          lambda ch, method, properties, body: @@ -54,8 +57,10 @@ class Rmq(object):                          queue=queue)                      log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}") +                      if thread_ws: -                        thread_ws.send('{"service": "connected"}') +                        if not thread_ws.closed: +                            thread_ws.send('{"service": "connected"}')                      channel.start_consuming()                  except Exception: @@ -67,9 +72,11 @@ class Rmq(object):              except ConnectionClosed:                  if thread_ws: -                    log.error(f"Connection to {self.host} could not be established") -                    thread_ws.send('{"service": "disconnected"}') -                    exit(1) +                    if not thread_ws.closed: +                        log.error(f"Connection to {self.host} could not be established") +                        thread_ws.send('{"service": "disconnected"}') +                        exit(1) +                  log.error(f"Connection lost, reconnecting to {self.host}")              time.sleep(2) @@ -81,27 +88,34 @@ class Rmq(object):                  exchange=EXCHANGE):          try: -            connection = pika.BlockingConnection(pika.ConnectionParameters(self.host, self.port, '/', self.credentials)) +            connection = pika.BlockingConnection(self.con_params)              try:                  channel = connection.channel() -                channel.queue_declare(queue=queue, -                                      durable=False,  # Do not commit to disk -                                      arguments={'x-message-ttl': 5000},  # a message is automatically deleted after x milliseconds if unacknowledged # NOQA -                                      auto_delete=True  # Delete queue when all connection are closed -                                      ) -                channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type) -                channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey) + +                self._declare(channel, queue) + +                channel.exchange_declare( +                    exchange=exchange, +                    exchange_type=self.exchange_type) + +                channel.queue_bind( +                    exchange=exchange, +                    queue=queue, +                    routing_key=routingkey)                  result = channel.basic_publish(                      exchange=exchange,                      routing_key=routingkey,                      body=message, -                    properties=self.properties -                ) +                    properties=self.properties)                  if result: -                    log.info(f"published message: {self.host} port: {self.port} exchange: {exchange} queue: {queue} message: {message}")  # NOQA +                    log.info((f"published message: {self.host} " +                              f"port: {self.port} " +                              f"exchange: {exchange} " +                              f"queue: {queue} " +                              f"message: {message}"))                  else:                      log.error(f"Messge '{message}' not delivered") @@ -36,14 +36,17 @@ def websocket_route(ws, snekboxid):      def message_handler(ch, method, properties, body, thread_ws):          msg = body.decode('utf-8') -        log.debug(f"message_handler: {msg}")          thread_ws.send(msg)          ch.basic_ack(delivery_tag=method.delivery_tag)      consumer_parameters = {'queue': snekboxid,                             'callback': message_handler,                             'thread_ws': localdata.thread_ws} -    consumer = threading.Thread(target=rmq.consume, kwargs=consumer_parameters) + +    consumer = threading.Thread( +        target=rmq.consume, +        kwargs=consumer_parameters) +      consumer.daemon = True      consumer.start() | 
