diff options
| author | 2019-06-27 21:40:47 +0200 | |
|---|---|---|
| committer | 2019-06-27 22:03:59 +0200 | |
| commit | 59a75181e4f6c3a0994b0cc0603441ad0b1225e6 (patch) | |
| tree | 6261351b64e3f2e3c2cb3a86d6004317eb1034ef | |
| parent | Cleaning up debug functions and unnecessary imports (diff) | |
Adding proxy channel + improved methods to watchchannel
| -rw-r--r-- | bot/cogs/watchchannels/watchchannel.py | 117 |
1 files changed, 69 insertions, 48 deletions
diff --git a/bot/cogs/watchchannels/watchchannel.py b/bot/cogs/watchchannels/watchchannel.py index 856211139..5f9d8d1dd 100644 --- a/bot/cogs/watchchannels/watchchannel.py +++ b/bot/cogs/watchchannels/watchchannel.py @@ -8,12 +8,10 @@ from typing import Optional import aiohttp import discord -from discord import Color, Embed, Message, errors -from discord.ext.commands import Bot, Context +from discord import Color, Embed, Message, Object, errors +from discord.ext.commands import BadArgument, Bot, Context -from bot.constants import ( - BigBrother as BigBrotherConfig, Guild as GuildConfig -) +from bot.constants import BigBrother as BigBrotherConfig, Guild as GuildConfig from bot.pagination import LinePaginator from bot.utils import messages from bot.utils.time import time_since @@ -23,6 +21,19 @@ log = logging.getLogger(__name__) URL_RE = re.compile(r"(https?://[^\s]+)") +def proxy_user(user_id: str) -> Object: + try: + user_id = int(user_id) + except ValueError: + raise BadArgument + user = Object(user_id) + user.mention = user.id + user.display_name = f"<@{user.id}>" + user.avatar_url_as = lambda static_format: None + user.bot = False + return user + + class WatchChannel(ABC): """ Base class for WatchChannels @@ -41,12 +52,11 @@ class WatchChannel(ABC): __init__ of the child after the super().__init__(*args, **kwargs) call. """ - self.bot = bot # These attributes need to be overwritten in the child class self.destination = None # Channels.big_brother_logs - self.webhook_id = None # Some t.b.d. constant + self.webhook_id = None # Webhooks.big_brother self.api_endpoint = None # 'bot/infractions' self.api_default_params = None # {'active': 'true', 'type': 'watch'} @@ -54,7 +64,7 @@ class WatchChannel(ABC): self._consume_task = None self.watched_users = defaultdict(dict) self.message_queue = defaultdict(lambda: defaultdict(deque)) - self.consumption_queue = None + self.consumption_queue = {} self.retries = 5 self.retry_delay = 10 self.channel = None @@ -66,14 +76,13 @@ class WatchChannel(ABC): @property def consuming_messages(self) -> bool: """Checks if a consumption task is currently running.""" - if self._consume_task is None: return False if self._consume_task.done(): exc = self._consume_task.exception() if exc: - log.exception( + self.log.exception( f"{self.__class__.__name__} consume task has failed with:", exc_info=exc ) @@ -81,16 +90,14 @@ class WatchChannel(ABC): return True - # @Cog.listener() async def start_watchchannel(self) -> None: """Retrieves watched users from the API.""" - await self.bot.wait_until_ready() if await self.initialize_channel() and await self.fetch_user_cache(): - log.trace(f"Started the {self.__class__.__name__} WatchChannel") + self.log.trace(f"Started the {self.__class__.__name__} WatchChannel") else: - log.error(f"Failed to start the {self.__class__.__name__} WatchChannel") + self.log.error(f"Failed to start the {self.__class__.__name__} WatchChannel") async def initialize_channel(self) -> bool: """ @@ -101,33 +108,30 @@ class WatchChannel(ABC): channel and webhook were initialized successfully. this function will return `True`. """ - if self.channel is None: for attempt in range(1, self.retries + 1): self.channel = self.bot.get_channel(self.destination) if self.channel is None: - log.error(f"Failed to get the {self.__class__.__name__} channel; cannot watch users") + self.log.error(f"Failed to get the {self.__class__.__name__} channel; cannot watch users") if attempt < self.initialization_retries: - log.error( - f"Attempt {attempt}/{self.retries}; " - f"Retrying in {self.retry_delay} seconds...") + self.log.error(f"Attempt {attempt}/{self.retries}; Retrying in {self.retry_delay} seconds...") await asyncio.sleep(self.retry_delay) else: - log.trace(f"Retrieved the TextChannel for {self.__class__.__name__}") + self.log.trace(f"Retrieved the TextChannel for {self.__class__.__name__}") break else: - log.error(f"Cannot find channel with id `{self.destination}`; cannot watch users") + self.log.error(f"Cannot get channel with id `{self.destination}`; cannot watch users") return False if self.webhook is None: self.webhook = await self.bot.get_webhook_info(self.webhook_id) # This is `fetch_webhook` in current if self.webhook is None: - log.error(f"Cannot find webhook with id `{self.webhook_id}`; cannot watch users") + self.log.error(f"Cannot get webhook with id `{self.webhook_id}`; cannot watch users") return False - log.trace(f"Retrieved the webhook for {self.__class__.__name__}") + self.log.trace(f"Retrieved the webhook for {self.__class__.__name__}") - log.trace(f"WatchChannel for {self.__class__.__name__} is fully initialized") + self.log.trace(f"WatchChannel for {self.__class__.__name__} is fully initialized") return True async def fetch_user_cache(self) -> bool: @@ -136,14 +140,13 @@ class WatchChannel(ABC): This function returns `True` if the update succeeded. """ - try: data = await self.bot.api_client.get( self.api_endpoint, params=self.api_default_params ) except aiohttp.ClientResponseError as e: - log.exception( + self.log.exception( f"Failed to fetch {self.__class__.__name__} watched users from API", exc_info=e ) @@ -157,25 +160,22 @@ class WatchChannel(ABC): return True - # @Cog.listener() async def on_message(self, msg: Message): """Queues up messages sent by watched users.""" - if msg.author.id in self.watched_users: if not self.consuming_messages: self._consume_task = self.bot.loop.create_task(self.consume_messages()) - log.trace(f"Received message: {msg.content} ({len(msg.attachments)} attachments)") + self.log.trace(f"Received message: {msg.content} ({len(msg.attachments)} attachments)") self.message_queue[msg.author.id][msg.channel.id].append(msg) async def consume_messages(self, delay_consumption: bool = True): """Consumes the message queues to log watched users' messages.""" - if delay_consumption: - log.trace(f"Sleeping {BigBrotherConfig.log_delay} seconds before consuming message queue") + self.log.trace(f"Sleeping {BigBrotherConfig.log_delay} seconds before consuming message queue") await asyncio.sleep(1) - log.trace(f"{self.__class__.__name__} started consuming the message queue") + self.log.trace(f"{self.__class__.__name__} started consuming the message queue") # Prevent losing a partly processed consumption queue after Task failure if not self.consumption_queue: @@ -187,36 +187,34 @@ class WatchChannel(ABC): while channel_queue: msg = channel_queue.popleft() - log.trace(f"Consuming message {msg.id} ({len(msg.attachments)} attachments)") + self.log.trace(f"Consuming message {msg.id} ({len(msg.attachments)} attachments)") await self.relay_message(msg) self.consumption_queue.clear() if self.message_queue: - log.trace("Channel queue not empty: Continuing consuming queues") + self.log.trace("Channel queue not empty: Continuing consuming queues") self._consume_task = self.bot.loop.create_task( self.consume_messages(delay_consumption=False) ) else: - log.trace("Done consuming messages.") + self.log.trace("Done consuming messages.") async def webhook_send( self, content: Optional[str] = None, username: Optional[str] = None, avatar_url: Optional[str] = None, embed: Optional[Embed] = None, ): """Sends a message to the webhook with the specified kwargs.""" - try: await self.webhook.send(content=content, username=username, avatar_url=avatar_url, embed=embed) except discord.HTTPException as exc: - log.exception( + self.log.exception( f"Failed to send message to {self.__class__.__name__} webhook", exc_info=exc ) async def relay_message(self, msg: Message) -> None: """Relays the message to the relevant WatchChannel""" - last_author, last_channel, count = self.message_history limit = BigBrotherConfig.header_message_limit @@ -252,7 +250,7 @@ class WatchChannel(ABC): avatar_url=msg.author.avatar_url ) except discord.HTTPException as exc: - log.exception( + self.log.exception( f"Failed to send an attachment to {self.__class__.__name__} webhook", exc_info=exc ) @@ -261,7 +259,6 @@ class WatchChannel(ABC): async def send_header(self, msg): """Sends an header embed to the WatchChannel""" - user_id = msg.author.id guild = self.bot.get_guild(GuildConfig.id) @@ -304,16 +301,13 @@ class WatchChannel(ABC): description=f":x: **Failed to update {self.__class__.__name__} user cache**", color=Color.red() ) - ctx.send(embed=e) + await ctx.send(embed=e) + return lines = [] for user_id, user_data in self.watched_users.items(): inserted_at = user_data['inserted_at'] - date_time = datetime.datetime.strptime( - inserted_at, - "%Y-%m-%dT%H:%M:%S.%fZ" - ).replace(tzinfo=None) - time_delta = time_since(date_time, precision="minutes", max_units=1) + time_delta = self._get_time_delta(inserted_at) lines.append(f"• <@{user_id}> (added {time_delta})") await LinePaginator.paginate( @@ -326,16 +320,43 @@ class WatchChannel(ABC): empty=False ) + @staticmethod + def _get_time_delta(time_string: str) -> str: + """Returns the time in human-readable time delta format""" + + date_time = datetime.datetime.strptime( + time_string, + "%Y-%m-%dT%H:%M:%S.%fZ" + ).replace(tzinfo=None) + time_delta = time_since(date_time, precision="minutes", max_units=1) + + return time_delta + + @staticmethod + def _get_human_readable(time_string: str, output_format: str = "%Y-%m-%d %H:%M:%S") -> str: + date_time = datetime.datetime.strptime( + time_string, + "%Y-%m-%dT%H:%M:%S.%fZ" + ).replace(tzinfo=None) + return date_time.strftime(output_format) + + def _remove_user(self, user_id: int) -> None: + """Removes user from the WatchChannel""" + + self.watched_users.pop(user_id, None) + self.message_queue.pop(user_id, None) + self.consumption_queue.pop(user_id, None) + def cog_unload(self): """Takes care of unloading the cog and cancelling the consumption task.""" - log.trace(f"Unloading {self.__class__._name__} cog") + self.log.trace(f"Unloading {self.__class__._name__} cog") if not self._consume_task.done(): self._consume_task.cancel() try: self._consume_task.result() except asyncio.CancelledError as e: - log.exception( + self.log.exception( f"The {self.__class__._name__} consume task was cancelled. Messages may be lost.", exc_info=e ) |