diff options
author | 2021-12-22 03:44:56 +0530 | |
---|---|---|
committer | 2021-12-22 03:44:56 +0530 | |
commit | 90840b6f65ce9f8a2f0993371e90fbc9d2864c9b (patch) | |
tree | 8fdfd266eb9cb5ca6d79f916b6338e85b78f29fb | |
parent | Add RedisMessageRelay class to relay messages across services. (diff) |
Modify function signatures and use string instead of json for pubsub message.redis_relay
-rw-r--r-- | botcore/redis_message_relay.py | 42 |
1 files changed, 15 insertions, 27 deletions
diff --git a/botcore/redis_message_relay.py b/botcore/redis_message_relay.py index f90e4dab..6f5e2e22 100644 --- a/botcore/redis_message_relay.py +++ b/botcore/redis_message_relay.py @@ -1,7 +1,7 @@ import asyncio import inspect import json -from typing import Callable, Union +from typing import Callable, Optional import aioredis @@ -12,13 +12,14 @@ class RedisMessageRelay: def __init__( self, name: str, - redis_channel: Union[aioredis.Channel, str], - redis_list: str, redis_pool: aioredis.Redis, + *, + redis_channel: Optional[aioredis.Channel, str] = None, + redis_list: Optional[str] = None, ) -> None: self.name = name - self.channel = redis_channel - self.redis_list = redis_list + self.channel = redis_channel or name + self.redis_list = redis_list or name self.redis = redis_pool @@ -27,16 +28,6 @@ class RedisMessageRelay: class RedisMessageProducer(RedisMessageRelay): - - def __init__( - self, - name: str, - redis_channel: Union[aioredis.Channel, str], - redis_list: str, - redis_pool: aioredis.Redis, - ) -> None: - super().__init__(name, redis_channel, redis_list, redis_pool) - async def relay(self, data: dict) -> int: """ Push message and notify consumer. @@ -49,9 +40,9 @@ class RedisMessageProducer(RedisMessageRelay): await self.redis.rpush(self.redis_list, serialised) # Notify consumer about new message. - subs: int = await self.redis.publish_json( + subs: int = await self.redis.publish( self.channel, - {"pushed": True} + "pushed" ) return subs @@ -60,17 +51,15 @@ class RedisMessageConsumer(RedisMessageRelay): def __init__( self, - name: str, - redis_channel: Union[aioredis.Channel, str], - redis_list: str, - redis_pool: aioredis.Redis, + *args, callback: Callable, - loop: asyncio.AbstractEventLoop + loop: Optional[asyncio.AbstractEventLoop] = None, + **kwargs ) -> None: - super().__init__(name, redis_channel, redis_list, redis_pool) + super().__init__(*args, **kwargs) self.callback = callback - self.loop = loop + self.loop = loop or asyncio.get_event_loop() async def listen(self) -> None: res = await self.redis.subscribe(self.channel) @@ -103,7 +92,6 @@ class RedisMessageConsumer(RedisMessageRelay): await self.pre_callback(item) while await self.channel.wait_message(): - data = await self.channel.get_json() - - if data["pushed"]: + data = await self.channel.get() + if data == "pushed": await self.read_redis_list() |