diff options
author | 2018-05-30 22:38:54 +0100 | |
---|---|---|
committer | 2018-05-30 22:38:54 +0100 | |
commit | aaa387a5b3bdb9bc416690dccef66196c76d373e (patch) | |
tree | 270900787f1f8d76a97279f277fb281d45f37859 | |
parent | Add FAQ about LPTHW (diff) |
RabbitMQ mixin, powered by Kombu (#84)
* [RMQ] Add Kombi an an RMQMixin, as well as some constants
* [RMQ] Fix example in mixin docstring
* Update Pipfile.lock - for some reason, pipenv didn't lock kombu
-rw-r--r-- | Pipfile | 2 | ||||
-rw-r--r-- | Pipfile.lock | 44 | ||||
-rw-r--r-- | gunicorn_config.py | 12 | ||||
-rw-r--r-- | pysite/constants.py | 21 | ||||
-rw-r--r-- | pysite/mixins.py | 95 | ||||
-rw-r--r-- | pysite/queues.py | 5 |
6 files changed, 178 insertions, 1 deletions
@@ -19,6 +19,8 @@ flask-wtf = "*" docutils = "*" pygments = "*" gunicorn = "*" +kombu = "*" +librabbitmq = "*" [dev-packages] "flake8" = "*" diff --git a/Pipfile.lock b/Pipfile.lock index bd5ec96a..609caccc 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "9eaada0263515aec9e1aff323b79b628edc12c4018c992d8003e93349c0ecd99" + "sha256": "6089e578ded814ee025e4ce8422f25d14a2b8e71c6438fb8a8f9222a79f09bbc" }, "pipfile-spec": 6, "requires": { @@ -16,6 +16,13 @@ ] }, "default": { + "amqp": { + "hashes": [ + "sha256:073dd02fdd73041bffc913b767866015147b61f2a9bc104daef172fc1a0066eb", + "sha256:eed41946890cd43e8dee44a316b85cf6fee5a1a34bb4a562b660a358eb529e1b" + ], + "version": "==2.3.2" + }, "certifi": { "hashes": [ "sha256:13e698f54293db9f89122b0581843a782ad0934a4fe0172d2a980ba77fc61bb7", @@ -172,12 +179,32 @@ ], "version": "==2.10" }, + "kombu": { + "hashes": [ + "sha256:86adec6c60f63124e2082ea8481bbe4ebe04fde8ebed32c177c7f0cd2c1c9082", + "sha256:b274db3a4eacc4789aeb24e1de3e460586db7c4fc8610f7adcc7a3a1709a60af" + ], + "index": "pypi", + "version": "==4.2.1" + }, "lazy": { "hashes": [ "sha256:c80a77bf7106ba7b27378759900cfefef38271088dc63b014bcfe610c8e68e3d" ], "version": "==1.3" }, + "librabbitmq": { + "hashes": [ + "sha256:3116e40c02d4285b8dd69834e4cbcb1a89ea534ca9147e865f11d44e7cc56eea", + "sha256:5cdfb473573396d43d54cef9e9b4c74fa3d1516da51d04a7b261f6ef4e0bd8be", + "sha256:98e355f486964dadae7e8b51c9a60e9aa0653bbe27f6b14542687f305c4c3652", + "sha256:c2a8113d3c831808d1d940fdf43e4882636a1efe2864df7ab3bb709a45016b37", + "sha256:cd9cc09343b193d7cf2cff6c6a578061863bd986a4bdf38f922e9dc32e15d944", + "sha256:ffa2363a860ab5dcc3ce3703247e05e940c73d776c03a3f3f9deaf3cf43bb96c" + ], + "index": "pypi", + "version": "==2.0.0" + }, "logmatic-python": { "hashes": [ "sha256:0c15ac9f5faa6a60059b28910db642c3dc7722948c3cc940923f8c9039604342" @@ -270,6 +297,13 @@ ], "version": "==2.4.3" }, + "vine": { + "hashes": [ + "sha256:52116d59bc45392af9fdd3b75ed98ae48a93e822cee21e5fda249105c59a7a72", + "sha256:6849544be74ec3638e84d90bc1cf2e1e9224cc10d96cd4383ec3f69e9bce077b" + ], + "version": "==1.1.4" + }, "werkzeug": { "hashes": [ "sha256:c3fd7a7d41976d9f44db327260e263132466836cef6f91512889ed60ad26557c", @@ -334,6 +368,14 @@ ], "version": "==6.7" }, + "colorama": { + "hashes": [ + "sha256:463f8483208e921368c9f306094eb6f725c6ca42b0f97e313cb5d5512459feda", + "sha256:48eb22f4f8461b1df5734a074b57042430fb06e1d61bd1e11b078c0fe6d7a1f1" + ], + "markers": "sys_platform == 'win32'", + "version": "==0.3.9" + }, "coverage": { "hashes": [ "sha256:00d464797a236f654337181af72b4baea3d35d056ca480e45e9163bb5df496b8", diff --git a/gunicorn_config.py b/gunicorn_config.py index 37a91367..6c9cf04a 100644 --- a/gunicorn_config.py +++ b/gunicorn_config.py @@ -1,6 +1,10 @@ import re +from kombu import Connection + +from pysite.constants import RMQ_HOST, RMQ_PASSWORD, RMQ_PORT, RMQ_USERNAME from pysite.migrations.runner import run_migrations +from pysite.queues import QUEUES STRIP_REGEX = re.compile(r"<[^<]+?>") WIKI_TABLE = "wiki" @@ -34,3 +38,11 @@ def _when_ready(server=None, output_func=None): output(f"Created the following tables: {tables}") run_migrations(db, output=output) + + output("Declaring RabbitMQ queues...") + + with Connection(hostname=RMQ_HOST, userid=RMQ_USERNAME, password=RMQ_PASSWORD, port=RMQ_PORT) as c: + with c.channel() as channel: + for name, queue in QUEUES.items(): + queue.declare(channel=channel) + output(f"Queue declared: {name}") diff --git a/pysite/constants.py b/pysite/constants.py index f95e076f..a9040751 100644 --- a/pysite/constants.py +++ b/pysite/constants.py @@ -17,6 +17,16 @@ class ValidationTypes(Enum): params = "params" +class BotEventTypes(Enum): + mod_log = "mod_log" + + send_message = "send_message" + send_embed = "send_embed" + + add_role = "ensure_role" + remove_role = "remove_role" + + DEBUG_MODE = "FLASK_DEBUG" in environ # All snowflakes should be strings as RethinkDB rounds them as ints @@ -106,3 +116,14 @@ WIKI_AUDIT_WEBHOOK = environ.get("WIKI_AUDIT_WEBHOOK") or None # Bot key BOT_API_KEY = environ.get("BOT_API_KEY") or None + +# RabbitMQ settings +BOT_EVENT_QUEUE = "bot_events" + +RMQ_USERNAME = environ.get("RABBITMQ_DEFAULT_USER") or "guest" +RMQ_PASSWORD = environ.get("RABBITMQ_DEFAULT_PASS") or "guest" +RMQ_HOST = "pdrmq" if not DEBUG_MODE else "localhost" +RMQ_PORT = 5672 + +# Channels +CHANNEL_MOD_LOG = 282638479504965634 diff --git a/pysite/mixins.py b/pysite/mixins.py index d0e822bf..6b5f7187 100644 --- a/pysite/mixins.py +++ b/pysite/mixins.py @@ -1,8 +1,14 @@ +from typing import Any, Dict from weakref import ref from flask import Blueprint +from kombu import Connection from rethinkdb.ast import Table +from pysite.constants import ( + BOT_EVENT_QUEUE, BotEventTypes, + RMQ_HOST, RMQ_PASSWORD, RMQ_PORT, RMQ_USERNAME +) from pysite.database import RethinkDB from pysite.oauth import OAuthBackend @@ -58,6 +64,95 @@ class DBMixin: return self._db() +class RMQMixin: + """ + Mixin for classes that make use of RabbitMQ. It allows routes to send JSON-encoded messages to specific RabbitMQ + queues. + + This class is intended to be mixed in alongside one of the other view classes. For example: + + >>> class MyView(APIView, RMQMixin): + ... name = "my_view" # Flask internal name for this route + ... path = "/my_view" # Actual URL path to reach this route + ... queue_name = "my_queue" # Name of the RabbitMQ queue to send on + + Note that the queue name is optional if all you want to do is send bot events. + + This class will also work with Websockets: + + >>> class MyWebsocket(WS, RMQMixin): + ... name = "my_websocket" + ... path = "/my_websocket" + ... queue_name = "my_queue" + """ + + queue_name = "" + + @classmethod + def setup(cls: "RMQMixin", manager: "pysite.route_manager.RouteManager", blueprint: Blueprint): + """ + Set up the view by calling `super().setup()` as appropriate. + + :param manager: Instance of the current RouteManager (used to get a handle for the database object) + :param blueprint: Current Flask blueprint + """ + + if hasattr(super(), "setup"): + super().setup(manager, blueprint) # pragma: no cover + + @property + def rmq_connection(self) -> Connection: + """ + Get a Kombu AMQP connection object - use this in a context manager so that it gets closed after you're done + + If you're just trying to send a message, check out `rmq_send` and `rmq_bot_event` instead. + """ + + return Connection(hostname=RMQ_HOST, userid=RMQ_USERNAME, password=RMQ_PASSWORD, port=RMQ_PORT) + + def rmq_send(self, data: Dict[str, Any], routing_key: str = None): + """ + Send some data to the RabbitMQ queue + + >>> self.rmq_send({ + ... "text": "My hovercraft is full of eels!", + ... "source": "Dirty Hungarian Phrasebook" + ... }) + ... + + This will be delivered to the queue immediately. + """ + + if routing_key is None: + routing_key = self.queue_name + + with self.rmq_connection as c: + producer = c.Producer() + producer.publish(data, routing_key=routing_key) + + def rmq_bot_event(self, event_type: BotEventTypes, data: Dict[str, Any]): + """ + Send an event to the queue responsible for delivering events to the bot + + >>> self.rmq_bot_event(BotEventTypes.send_message, { + ... "channel": CHANNEL_MOD_LOG, + ... "message": "This is a plain-text message for @everyone, from the site!" + ... }) + ... + + This will be delivered to the bot and actioned immediately, or when the bot comes online if it isn't already + connected. + """ + + if not isinstance(event_type, BotEventTypes): + raise ValueError("`event_type` must be a member of the the `pysite.constants.BotEventTypes` enum") + + return self.rmq_send( + {"event": event_type.value, "data": data}, + routing_key=BOT_EVENT_QUEUE, + ) + + class OAuthMixin: """ Mixin for the classes that need access to a logged in user's information. This class should be used diff --git a/pysite/queues.py b/pysite/queues.py new file mode 100644 index 00000000..7a200208 --- /dev/null +++ b/pysite/queues.py @@ -0,0 +1,5 @@ +from kombu import Queue + +QUEUES = { # RabbitMQ Queue definitions, they'll be declared at gunicorn start time + "bot_events": Queue("bot_events", durable=True) +} |