aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar Christopher Baklid <[email protected]>2018-05-26 14:15:11 +0200
committerGravatar Christopher Baklid <[email protected]>2018-05-26 14:15:11 +0200
commit3314b933240958f3d01e7c8ee5764dd3b690d789 (patch)
tree064d2aa19595e9e0222ff4ea5acd9dd466de033d
parentbeforeunload was never run, removed (diff)
automatically clean up queues and message to reduce rabbitmq memory footprint
-rw-r--r--rmq.py75
-rw-r--r--snekbox.py1
-rw-r--r--snekweb.py7
3 files changed, 51 insertions, 32 deletions
diff --git a/rmq.py b/rmq.py
index 1d570bf..e52e649 100644
--- a/rmq.py
+++ b/rmq.py
@@ -1,8 +1,6 @@
import pika
import time
import traceback
-import logging
-import sys
from pika.exceptions import ConnectionClosed
@@ -44,25 +42,35 @@ class Rmq(object):
try:
channel = connection.channel()
- channel.queue_declare(queue=queue, durable=False)
+ channel.queue_declare(queue=queue,
+ durable=False, # Do not commit to disk
+ arguments={'x-message-ttl': 5000}, # a message is automatically deleted after x milliseconds if unacknowledged # NOQA
+ auto_delete=True # Delete queue when all connection are closed
+ )
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
- lambda ch, method, properties, body: callback(ch, method, properties, body, thread_ws=thread_ws),
+ lambda ch, method, properties, body:
+ callback(ch, method, properties, body, thread_ws=thread_ws),
queue=queue)
log.info(f"Connected to host: {self.host} port: {self.port} queue: {queue}")
-
+ if thread_ws:
+ thread_ws.send('{"service": "connected"}')
channel.start_consuming()
except Exception:
exc = traceback.format_exc()
- log.info(exc)
+ log.error(exc)
finally:
connection.close()
except ConnectionClosed:
- log.info(f"Connection lost, reconnecting to {self.host}")
+ if thread_ws:
+ log.error(f"Connection to {self.host} could not be established")
+ thread_ws.send('{"service": "disconnected"}')
+ exit(1)
+ log.error(f"Connection lost, reconnecting to {self.host}")
time.sleep(2)
@@ -72,22 +80,37 @@ class Rmq(object):
routingkey=ROUTING_KEY,
exchange=EXCHANGE):
- connection = pika.BlockingConnection(pika.ConnectionParameters(self.host, self.port, '/', self.credentials))
- channel = connection.channel()
- channel.queue_declare(queue=queue, durable=False)
- channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type)
- channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey)
-
- result = channel.basic_publish(
- exchange=exchange,
- routing_key=routingkey,
- body=message,
- properties=self.properties
- )
-
- if result:
- log.info(f"Connecting to host: {self.host} port: {self.port} exchange: {exchange} queue: {queue}")
- else:
- log.info("not delivered")
-
- connection.close()
+ try:
+ connection = pika.BlockingConnection(pika.ConnectionParameters(self.host, self.port, '/', self.credentials))
+
+ try:
+ channel = connection.channel()
+ channel.queue_declare(queue=queue,
+ durable=False, # Do not commit to disk
+ arguments={'x-message-ttl': 5000}, # a message is automatically deleted after x milliseconds if unacknowledged # NOQA
+ auto_delete=True # Delete queue when all connection are closed
+ )
+ channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type)
+ channel.queue_bind(exchange=exchange, queue=queue, routing_key=routingkey)
+
+ result = channel.basic_publish(
+ exchange=exchange,
+ routing_key=routingkey,
+ body=message,
+ properties=self.properties
+ )
+
+ if result:
+ log.info(f"published message: {self.host} port: {self.port} exchange: {exchange} queue: {queue} message: {message}") # NOQA
+ else:
+ log.error(f"Messge '{message}' not delivered")
+
+ except ConnectionClosed:
+ log.error(f"Could not send message, connection to {self.host} was lost")
+ exit(1)
+
+ finally:
+ connection.close()
+
+ except ConnectionClosed:
+ log.error(f"Could not connect to {self.host}")
diff --git a/snekbox.py b/snekbox.py
index d6795e8..8c2bbaf 100644
--- a/snekbox.py
+++ b/snekbox.py
@@ -1,7 +1,6 @@
import sys
import io
import json
-import logging
from logs import log
from rmq import Rmq
diff --git a/snekweb.py b/snekweb.py
index 8929438..d7ed0d2 100644
--- a/snekweb.py
+++ b/snekweb.py
@@ -1,10 +1,7 @@
import traceback
-import queue
import threading
-import time
-import json
import logging
-import geventwebsocket
+import json
from flask import Flask
from flask import render_template
@@ -55,7 +52,7 @@ def websocket_route(ws, snekboxid):
message = ws.receive()
if message:
snek_msg = json.dumps({snekboxid: message})
- log.info(f"forwarding {snek_msg} to rabbitmq")
+ log.info(f"User {snekboxid} sends message\n{message.strip()}")
rmq.publish(snek_msg)
except Exception: