diff options
author | 2021-12-11 20:39:16 +0530 | |
---|---|---|
committer | 2021-12-11 20:39:16 +0530 | |
commit | 64c1a21b7f69bd232460fe37234456f3ec903a84 (patch) | |
tree | aa016639b5daabd97e246a404a86d18f2e0b6a85 /botcore/redis_message_relay.py | |
parent | Install aioredis and async-rediscache. (diff) |
Add RedisMessageRelay class to relay messages across services.
Diffstat (limited to 'botcore/redis_message_relay.py')
-rw-r--r-- | botcore/redis_message_relay.py | 109 |
1 files changed, 109 insertions, 0 deletions
diff --git a/botcore/redis_message_relay.py b/botcore/redis_message_relay.py new file mode 100644 index 00000000..f90e4dab --- /dev/null +++ b/botcore/redis_message_relay.py @@ -0,0 +1,109 @@ +import asyncio +import inspect +import json +from typing import Callable, Union + +import aioredis + + +class RedisMessageRelay: + """A class for relaying messages across services using redis lists and pubsub.""" + + def __init__( + self, + name: str, + redis_channel: Union[aioredis.Channel, str], + redis_list: str, + redis_pool: aioredis.Redis, + ) -> None: + self.name = name + self.channel = redis_channel + self.redis_list = redis_list + + self.redis = redis_pool + + def __repr__(self) -> str: + return f"<{self.__class__.__name__} name={self.name} channel={self.channel} list={self.redis_list}>" + + +class RedisMessageProducer(RedisMessageRelay): + + def __init__( + self, + name: str, + redis_channel: Union[aioredis.Channel, str], + redis_list: str, + redis_pool: aioredis.Redis, + ) -> None: + super().__init__(name, redis_channel, redis_list, redis_pool) + + async def relay(self, data: dict) -> int: + """ + Push message and notify consumer. + + returns: + - Number of subscribers the notification was delivered to. + """ + serialised = json.dumps(data) + + await self.redis.rpush(self.redis_list, serialised) + + # Notify consumer about new message. + subs: int = await self.redis.publish_json( + self.channel, + {"pushed": True} + ) + return subs + + +class RedisMessageConsumer(RedisMessageRelay): + + def __init__( + self, + name: str, + redis_channel: Union[aioredis.Channel, str], + redis_list: str, + redis_pool: aioredis.Redis, + callback: Callable, + loop: asyncio.AbstractEventLoop + ) -> None: + super().__init__(name, redis_channel, redis_list, redis_pool) + + self.callback = callback + self.loop = loop + + async def listen(self) -> None: + res = await self.redis.subscribe(self.channel) + self.channel = res[0] + await self.wait_until_message() + + async def pre_callback(self, data: dict): + if inspect.iscoroutinefunction(self.callback): + await self.callback(data) + return + + self.callback(data) + + async def read_redis_list(self) -> None: + serialised = await self.redis.lpop(self.redis_list) + try: + data = json.loads(serialised) + except (json.JSONDecodeError, TypeError): + pass + else: + await self.pre_callback(data) + + async def wait_until_message(self) -> None: + """Receive message from a channel.""" + # Empty queue before receiving messages. + queue: list = await self.redis.lrange(self.redis_list, 0, -1) + await self.redis.delete(self.redis_list) + + for item in queue: + await self.pre_callback(item) + + while await self.channel.wait_message(): + data = await self.channel.get_json() + + if data["pushed"]: + await self.read_redis_list() |