diff options
author | 2018-05-26 14:15:11 +0200 | |
---|---|---|
committer | 2018-05-26 14:15:11 +0200 | |
commit | 3314b933240958f3d01e7c8ee5764dd3b690d789 (patch) | |
tree | 064d2aa19595e9e0222ff4ea5acd9dd466de033d | |
parent | beforeunload was never run, removed (diff) |
automatically clean up queues and message to reduce rabbitmq memory footprint
-rw-r--r-- | rmq.py | 75 | ||||
-rw-r--r-- | snekbox.py | 1 | ||||
-rw-r--r-- | snekweb.py | 7 |
3 files changed, 51 insertions, 32 deletions
@@ -1,8 +1,6 @@ import pika import time import traceback -import logging -import sys from pika.exceptions import ConnectionClosed @@ -44,25 +42,35 @@ class Rmq(object): try: channel = connection.channel() - channel.queue_declare(queue=queue, durable=False) + channel.queue_declare(queue=queue, + durable=False, # Do not commit to disk + arguments={'x-message-ttl': 5000}, # a message is automatically deleted after x milliseconds if unacknowledged # NOQA + auto_delete=True # Delete queue when all connection are closed + ) channel.basic_qos(prefetch_count=1) channel.basic_consume( - lambda ch, method, properties, body: callback(ch, method, properties, body, thread_ws=thread_ws), + lambda ch, method, properties, body: + callback(ch, method, properties, body, thread_ws=thread_ws), queue=queue) log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}") - + if thread_ws: + thread_ws.send('{"service": "connected"}') channel.start_consuming() except Exception: exc = traceback.format_exc() - log.info(exc) + log.error(exc) finally: connection.close() except ConnectionClosed: - log.info(f"Connection lost, reconnecting to {self.host}") + if thread_ws: + log.error(f"Connection to {self.host} could not be established") + thread_ws.send('{"service": "disconnected"}') + exit(1) + log.error(f"Connection lost, reconnecting to {self.host}") time.sleep(2) @@ -72,22 +80,37 @@ class Rmq(object): routingkey=ROUTING_KEY, exchange=EXCHANGE): - connection = pika.BlockingConnection(pika.ConnectionParameters(self.host, self.port, '/', self.credentials)) - channel = connection.channel() - channel.queue_declare(queue=queue, durable=False) - channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type) - channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey) - - result = channel.basic_publish( - exchange=exchange, - routing_key=routingkey, - body=message, - properties=self.properties - ) - - if result: - log.info(f"Connecting to host: {self.host} port: {self.port} exchange: {exchange} queue: {queue}") - else: - log.info("not delivered") - - connection.close() + try: + connection = pika.BlockingConnection(pika.ConnectionParameters(self.host, self.port, '/', self.credentials)) + + try: + channel = connection.channel() + channel.queue_declare(queue=queue, + durable=False, # Do not commit to disk + arguments={'x-message-ttl': 5000}, # a message is automatically deleted after x milliseconds if unacknowledged # NOQA + auto_delete=True # Delete queue when all connection are closed + ) + channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type) + channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey) + + result = channel.basic_publish( + exchange=exchange, + routing_key=routingkey, + body=message, + properties=self.properties + ) + + if result: + log.info(f"published message: {self.host} port: {self.port} exchange: {exchange} queue: {queue} message: {message}") # NOQA + else: + log.error(f"Messge '{message}' not delivered") + + except ConnectionClosed: + log.error(f"Could not send message, connection to {self.host} was lost") + exit(1) + + finally: + connection.close() + + except ConnectionClosed: + log.error(f"Could not connect to {self.host}") @@ -1,7 +1,6 @@ import sys import io import json -import logging from logs import log from rmq import Rmq @@ -1,10 +1,7 @@ import traceback -import queue import threading -import time -import json import logging -import geventwebsocket +import json from flask import Flask from flask import render_template @@ -55,7 +52,7 @@ def websocket_route(ws, snekboxid): message = ws.receive() if message: snek_msg = json.dumps({snekboxid: message}) - log.info(f"forwarding {snek_msg} to rabbitmq") + log.info(f"User {snekboxid} sends message\n{message.strip()}") rmq.publish(snek_msg) except Exception: |