aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar RohanJnr <[email protected]>2021-12-22 03:44:56 +0530
committerGravatar RohanJnr <[email protected]>2021-12-22 03:44:56 +0530
commit90840b6f65ce9f8a2f0993371e90fbc9d2864c9b (patch)
tree8fdfd266eb9cb5ca6d79f916b6338e85b78f29fb
parentAdd RedisMessageRelay class to relay messages across services. (diff)
Modify function signatures and use string instead of json for pubsub message.redis_relay
-rw-r--r--botcore/redis_message_relay.py42
1 files changed, 15 insertions, 27 deletions
diff --git a/botcore/redis_message_relay.py b/botcore/redis_message_relay.py
index f90e4dab..6f5e2e22 100644
--- a/botcore/redis_message_relay.py
+++ b/botcore/redis_message_relay.py
@@ -1,7 +1,7 @@
import asyncio
import inspect
import json
-from typing import Callable, Union
+from typing import Callable, Optional
import aioredis
@@ -12,13 +12,14 @@ class RedisMessageRelay:
def __init__(
self,
name: str,
- redis_channel: Union[aioredis.Channel, str],
- redis_list: str,
redis_pool: aioredis.Redis,
+ *,
+ redis_channel: Optional[aioredis.Channel, str] = None,
+ redis_list: Optional[str] = None,
) -> None:
self.name = name
- self.channel = redis_channel
- self.redis_list = redis_list
+ self.channel = redis_channel or name
+ self.redis_list = redis_list or name
self.redis = redis_pool
@@ -27,16 +28,6 @@ class RedisMessageRelay:
class RedisMessageProducer(RedisMessageRelay):
-
- def __init__(
- self,
- name: str,
- redis_channel: Union[aioredis.Channel, str],
- redis_list: str,
- redis_pool: aioredis.Redis,
- ) -> None:
- super().__init__(name, redis_channel, redis_list, redis_pool)
-
async def relay(self, data: dict) -> int:
"""
Push message and notify consumer.
@@ -49,9 +40,9 @@ class RedisMessageProducer(RedisMessageRelay):
await self.redis.rpush(self.redis_list, serialised)
# Notify consumer about new message.
- subs: int = await self.redis.publish_json(
+ subs: int = await self.redis.publish(
self.channel,
- {"pushed": True}
+ "pushed"
)
return subs
@@ -60,17 +51,15 @@ class RedisMessageConsumer(RedisMessageRelay):
def __init__(
self,
- name: str,
- redis_channel: Union[aioredis.Channel, str],
- redis_list: str,
- redis_pool: aioredis.Redis,
+ *args,
callback: Callable,
- loop: asyncio.AbstractEventLoop
+ loop: Optional[asyncio.AbstractEventLoop] = None,
+ **kwargs
) -> None:
- super().__init__(name, redis_channel, redis_list, redis_pool)
+ super().__init__(*args, **kwargs)
self.callback = callback
- self.loop = loop
+ self.loop = loop or asyncio.get_event_loop()
async def listen(self) -> None:
res = await self.redis.subscribe(self.channel)
@@ -103,7 +92,6 @@ class RedisMessageConsumer(RedisMessageRelay):
await self.pre_callback(item)
while await self.channel.wait_message():
- data = await self.channel.get_json()
-
- if data["pushed"]:
+ data = await self.channel.get()
+ if data == "pushed":
await self.read_redis_list()