diff options
Diffstat (limited to '')
| -rw-r--r-- | bot/exts/help_channels/__init__.py | 40 | ||||
| -rw-r--r-- | bot/exts/help_channels/_channel.py | 281 | ||||
| -rw-r--r-- | bot/exts/help_channels/_cog.py | 674 | ||||
| -rw-r--r-- | bot/exts/help_channels/_message.py | 311 | ||||
| -rw-r--r-- | bot/exts/help_channels/_name.py | 69 | ||||
| -rw-r--r-- | bot/exts/help_channels/_stats.py | 50 | 
6 files changed, 263 insertions, 1162 deletions
| diff --git a/bot/exts/help_channels/__init__.py b/bot/exts/help_channels/__init__.py index b9c940183..00b4a735b 100644 --- a/bot/exts/help_channels/__init__.py +++ b/bot/exts/help_channels/__init__.py @@ -1,40 +1,8 @@ -from bot import constants -from bot.bot import Bot -from bot.exts.help_channels._channel import MAX_CHANNELS_PER_CATEGORY -from bot.log import get_logger - -log = get_logger(__name__) - - -def validate_config() -> None: -    """Raise a ValueError if the cog's config is invalid.""" -    log.trace("Validating config.") -    total = constants.HelpChannels.max_total_channels -    available = constants.HelpChannels.max_available -    if total == 0 or available == 0: -        raise ValueError("max_total_channels and max_available and must be greater than 0.") - -    if total < available: -        raise ValueError( -            f"max_total_channels ({total}) must be greater than or equal to max_available " -            f"({available})." -        ) - -    if total > MAX_CHANNELS_PER_CATEGORY: -        raise ValueError( -            f"max_total_channels ({total}) must be less than or equal to " -            f"{MAX_CHANNELS_PER_CATEGORY} due to Discord's limit on channels per category." -        ) +from bot.bot import Bot +from bot.exts.help_channels._cog import HelpForum  async def setup(bot: Bot) -> None: -    """Load the HelpChannels cog.""" -    # Defer import to reduce side effects from importing the help_channels package. -    from bot.exts.help_channels._cog import HelpChannels -    try: -        validate_config() -    except ValueError as e: -        log.error(f"HelpChannels cog will not be loaded due to misconfiguration: {e}") -    else: -        await bot.add_cog(HelpChannels(bot)) +    """Load the HelpForum cog.""" +    await bot.add_cog(HelpForum(bot)) diff --git a/bot/exts/help_channels/_channel.py b/bot/exts/help_channels/_channel.py index cfe774f4c..38725ddfd 100644 --- a/bot/exts/help_channels/_channel.py +++ b/bot/exts/help_channels/_channel.py @@ -1,195 +1,144 @@ -import re -import typing as t -from datetime import timedelta -from enum import Enum +"""Contains all logic to handle changes to posts in the help forum.""" + +import textwrap -import arrow  import discord -from arrow import Arrow +from botcore.utils import members  import bot  from bot import constants -from bot.exts.help_channels import _caches, _message +from bot.exts.help_channels import _stats  from bot.log import get_logger -from bot.utils.channel import get_or_fetch_channel  log = get_logger(__name__) -MAX_CHANNELS_PER_CATEGORY = 50 -EXCLUDED_CHANNELS = (constants.Channels.cooldown,) -CLAIMED_BY_RE = re.compile(r"Channel claimed by <@!?(?P<user_id>\d{17,20})>\.$") +ASKING_GUIDE_URL = "https://pythondiscord.com/pages/asking-good-questions/" + +POST_TITLE = "Python help channel" +NEW_POST_MSG = f""" +**Remember to:** +• **Ask** your Python question, not if you can ask or if there's an expert who can help. +• **Show** a code sample as text (rather than a screenshot) and the error message, if you got one. +• **Explain** what you expect to happen and what actually happens. +For more tips, check out our guide on [asking good questions]({ASKING_GUIDE_URL}). +""" +POST_FOOTER = f"Closes after a period of inactivity, or when you send {constants.Bot.prefix}close." -class ClosingReason(Enum): -    """All possible closing reasons for help channels.""" +DORMANT_MSG = f""" +This help channel has been marked as **dormant** and locked. \ +It is no longer possible to send messages in this channel. -    COMMAND = "command" -    LATEST_MESSAGE = "auto.latest_message" -    CLAIMANT_TIMEOUT = "auto.claimant_timeout" -    OTHER_TIMEOUT = "auto.other_timeout" -    DELETED = "auto.deleted" -    CLEANUP = "auto.cleanup" +If your question wasn't answered yet, you can create a new post in <#{constants.Channels.help_system_forum}>. \ +Consider rephrasing the question to maximize your chance of getting a good answer. \ +If you're not sure how, have a look through our guide for **[asking a good question]({ASKING_GUIDE_URL})**. +""" -def get_category_channels(category: discord.CategoryChannel) -> t.Iterable[discord.TextChannel]: -    """Yield the text channels of the `category` in an unsorted manner.""" -    log.trace(f"Getting text channels in the category '{category}' ({category.id}).") +def is_help_forum_post(channel: discord.abc.GuildChannel) -> bool: +    """Return True if `channel` is a post in the help forum.""" +    log.trace(f"Checking if #{channel} is a help channel.") +    return getattr(channel, "parent_id", None) == constants.Channels.help_system_forum -    # This is faster than using category.channels because the latter sorts them. -    for channel in category.guild.channels: -        if channel.category_id == category.id and not is_excluded_channel(channel): -            yield channel +async def _close_help_thread(closed_thread: discord.Thread, closed_on: _stats.ClosingReason) -> None: +    """Close the help thread and record stats.""" +    embed = discord.Embed(description=DORMANT_MSG) +    await closed_thread.send(embed=embed) +    await closed_thread.edit(archived=True, locked=True, reason="Locked a dormant help channel") -async def get_closing_time(channel: discord.TextChannel, init_done: bool) -> t.Tuple[Arrow, ClosingReason]: -    """ -    Return the time at which the given help `channel` should be closed along with the reason. +    _stats.report_post_count() +    await _stats.report_complete_session(closed_thread, closed_on) -    `init_done` is True if the cog has finished loading and False otherwise. +    poster = closed_thread.owner +    cooldown_role = closed_thread.guild.get_role(constants.Roles.help_cooldown) +    await members.handle_role_change(poster, poster.remove_roles, cooldown_role) -    The time is calculated as follows: -    * If `init_done` is True or the cached time for the claimant's last message is unavailable, -      add the configured `idle_minutes_claimant` to the time the most recent message was sent. -    * If the help session is empty (see `is_empty`), do the above but with `deleted_idle_minutes`. -    * If either of the above is attempted but the channel is completely empty, close the channel -      immediately. -    * Otherwise, retrieve the times of the claimant's and non-claimant's last messages from the -      cache. Add the configured `idle_minutes_claimant` and idle_minutes_others`, respectively, and -      choose the time which is furthest in the future. -    """ -    log.trace(f"Getting the closing time for #{channel} ({channel.id}).") - -    is_empty = await _message.is_empty(channel) -    if is_empty: -        idle_minutes_claimant = constants.HelpChannels.deleted_idle_minutes -    else: -        idle_minutes_claimant = constants.HelpChannels.idle_minutes_claimant - -    claimant_time = await _caches.claimant_last_message_times.get(channel.id) - -    # The current session lacks messages, the cog is still starting, or the cache is empty. -    if is_empty or not init_done or claimant_time is None: -        msg = await _message.get_last_message(channel) -        if not msg: -            log.debug(f"No idle time available; #{channel} ({channel.id}) has no messages, closing now.") -            return Arrow.min, ClosingReason.DELETED - -        # Use the greatest offset to avoid the possibility of prematurely closing the channel. -        time = Arrow.fromdatetime(msg.created_at) + timedelta(minutes=idle_minutes_claimant) -        reason = ClosingReason.DELETED if is_empty else ClosingReason.LATEST_MESSAGE -        return time, reason - -    claimant_time = Arrow.utcfromtimestamp(claimant_time) -    others_time = await _caches.non_claimant_last_message_times.get(channel.id) - -    if others_time: -        others_time = Arrow.utcfromtimestamp(others_time) -    else: -        # The help session hasn't received any answers (messages from non-claimants) yet. -        # Set to min value so it isn't considered when calculating the closing time. -        others_time = Arrow.min - -    # Offset the cached times by the configured values. -    others_time += timedelta(minutes=constants.HelpChannels.idle_minutes_others) -    claimant_time += timedelta(minutes=idle_minutes_claimant) - -    # Use the time which is the furthest into the future. -    if claimant_time >= others_time: -        closing_time = claimant_time -        reason = ClosingReason.CLAIMANT_TIMEOUT -    else: -        closing_time = others_time -        reason = ClosingReason.OTHER_TIMEOUT - -    log.trace(f"#{channel} ({channel.id}) should be closed at {closing_time} due to {reason}.") -    return closing_time, reason - - -async def get_in_use_time(channel_id: int) -> t.Optional[timedelta]: -    """Return the duration `channel_id` has been in use. Return None if it's not in use.""" -    log.trace(f"Calculating in use time for channel {channel_id}.") - -    claimed_timestamp = await _caches.claim_times.get(channel_id) -    if claimed_timestamp: -        claimed = Arrow.utcfromtimestamp(claimed_timestamp) -        return arrow.utcnow() - claimed - - -def is_excluded_channel(channel: discord.abc.GuildChannel) -> bool: -    """Check if a channel should be excluded from the help channel system.""" -    return not isinstance(channel, discord.TextChannel) or channel.id in EXCLUDED_CHANNELS - - -async def move_to_bottom(channel: discord.TextChannel, category_id: int, **options) -> None: -    """ -    Move the `channel` to the bottom position of `category` and edit channel attributes. - -    To ensure "stable sorting", we use the `bulk_channel_update` endpoint and provide the current -    positions of the other channels in the category as-is. This should make sure that the channel -    really ends up at the bottom of the category. - -    If `options` are provided, the channel will be edited after the move is completed. This is the -    same order of operations that `discord.TextChannel.edit` uses. For information on available -    options, see the documentation on `discord.TextChannel.edit`. While possible, position-related -    options should be avoided, as it may interfere with the category move we perform. -    """ -    # Get a fresh copy of the category from the bot to avoid the cache mismatch issue we had. -    category = await get_or_fetch_channel(category_id) - -    payload = [{"id": c.id, "position": c.position} for c in category.channels] - -    # Calculate the bottom position based on the current highest position in the category. If the -    # category is currently empty, we simply use the current position of the channel to avoid making -    # unnecessary changes to positions in the guild. -    bottom_position = payload[-1]["position"] + 1 if payload else channel.position - -    payload.append( -        { -            "id": channel.id, -            "position": bottom_position, -            "parent_id": category.id, -            "lock_permissions": True, -        } +async def send_opened_post_message(thread: discord.Thread) -> None: +    """Send the opener message in the new help post.""" +    embed = discord.Embed( +        color=constants.Colours.bright_green, +        description=NEW_POST_MSG, +    ) +    embed.set_author(name=POST_TITLE) +    embed.set_footer(text=POST_FOOTER) +    await thread.send(embed=embed) + + +async def send_opened_post_dm(thread: discord.Thread) -> None: +    """Send the opener a DM message with a jump link to their new post.""" +    embed = discord.Embed( +        title="Help channel opened", +        description=f"You opened {thread.mention}.", +        colour=constants.Colours.bright_green, +        timestamp=thread.created_at, +    ) +    embed.set_thumbnail(url=constants.Icons.green_questionmark) +    message = thread.starter_message +    if not message: +        try: +            message = await thread.fetch_message(thread.id) +        except discord.HTTPException: +            log.warning(f"Could not fetch message for thread {thread.name} ({thread.id})") +            return + +    formatted_message = textwrap.shorten(message.content, width=100, placeholder="...") +    embed.add_field(name="Your message", value=formatted_message, inline=False) +    embed.add_field( +        name="Conversation", +        value=f"[Jump to message!]({message.jump_url})", +        inline=False,      ) -    # We use d.py's method to ensure our request is processed by d.py's rate limit manager -    await bot.instance.http.bulk_channel_update(category.guild.id, payload) +    try: +        await thread.owner.send(embed=embed) +        log.trace(f"Sent DM to {thread.owner} ({thread.owner_id}) after posting in help forum.") +    except discord.errors.Forbidden: +        log.trace( +            f"Ignoring to send DM to {thread.owner} ({thread.owner_id}) after posting in help forum: DMs disabled.", +        ) -    # Now that the channel is moved, we can edit the other attributes -    if options: -        await channel.edit(**options) +async def help_thread_opened(opened_thread: discord.Thread, *, reopen: bool = False) -> None: +    """Apply new post logic to a new help forum post.""" +    _stats.report_post_count() -async def ensure_cached_claimant(channel: discord.TextChannel) -> None: -    """ -    Ensure there is a claimant cached for each help channel. +    if not isinstance(opened_thread.owner, discord.Member): +        log.debug(f"{opened_thread.owner_id} isn't a member. Closing post.") +        await _close_help_thread(opened_thread, _stats.ClosingReason.CLEANUP) +        return -    Check the redis cache first, return early if there is already a claimant cached. -    If there isn't an entry in redis, search for the "Claimed by X." embed in channel history. -        Stopping early if we discover a dormant message first. +    await send_opened_post_message(opened_thread) +    await send_opened_post_dm(opened_thread) + +    cooldown_role = opened_thread.guild.get_role(constants.Roles.help_cooldown) +    await members.handle_role_change(opened_thread.owner, opened_thread.owner.add_roles, cooldown_role) -    If a claimant could not be found, send a warning to #helpers and set the claimant to the bot. -    """ -    if await _caches.claimants.get(channel.id): -        return -    async for message in channel.history(limit=1000): -        if message.author.id != bot.instance.user.id: -            # We only care about bot messages +async def help_thread_closed(closed_thread: discord.Thread) -> None: +    """Apply archive logic to a manually closed help forum post.""" +    await _close_help_thread(closed_thread, _stats.ClosingReason.COMMAND) + + +async def help_thread_archived(archived_thread: discord.Thread) -> None: +    """Apply archive logic to an archived help forum post.""" +    async for thread_update in archived_thread.guild.audit_logs(limit=50, action=discord.AuditLogAction.thread_update): +        if thread_update.target.id != archived_thread.id:              continue -        if message.embeds: -            if _message._match_bot_embed(message, _message.DORMANT_MSG): -                log.info("Hit the dormant message embed before finding a claimant in %s (%d).", channel, channel.id) -                break -            # Only set the claimant if the first embed matches the claimed channel embed regex -            description = message.embeds[0].description -            if (description is not None) and (match := CLAIMED_BY_RE.match(description)): -                await _caches.claimants.set(channel.id, int(match.group("user_id"))) -                return - -    await bot.instance.get_channel(constants.Channels.helpers).send( -        f"I couldn't find a claimant for {channel.mention} in that last 1000 messages. " -        "Please use your helper powers to close the channel if/when appropriate." -    ) -    await _caches.claimants.set(channel.id, bot.instance.user.id) + +        # Don't apply close logic if the post was archived by the bot, as it +        # would have been done so via _close_help_thread. +        if thread_update.user.id == bot.instance.user.id: +            return + +    await _close_help_thread(archived_thread, _stats.ClosingReason.INACTIVE) + + +async def help_thread_deleted(deleted_thread_event: discord.RawThreadDeleteEvent) -> None: +    """Record appropriate stats when a help thread is deleted.""" +    _stats.report_post_count() +    cached_thread = deleted_thread_event.thread +    if cached_thread and not cached_thread.archived: +        # If the thread is in the bot's cache, and it was not archived before deleting, report a complete session. +        await _stats.report_complete_session(cached_thread, _stats.ClosingReason.DELETED) diff --git a/bot/exts/help_channels/_cog.py b/bot/exts/help_channels/_cog.py index 31a33f8af..6423f6f2f 100644 --- a/bot/exts/help_channels/_cog.py +++ b/bot/exts/help_channels/_cog.py @@ -1,656 +1,74 @@ -import asyncio -import random -import typing as t -from datetime import timedelta -from operator import attrgetter +"""Contains the Cog that receives discord.py events and defers most actions to other files in the module.""" -import arrow  import discord -import discord.abc -from botcore.utils import members, scheduling  from discord.ext import commands  from bot import constants  from bot.bot import Bot -from bot.constants import Channels, RedirectOutput -from bot.exts.help_channels import _caches, _channel, _message, _name, _stats +from bot.exts.help_channels import _caches, _channel, _message  from bot.log import get_logger -from bot.utils import channel as channel_utils, lock  log = get_logger(__name__) -NAMESPACE = "help" -HELP_CHANNEL_TOPIC = """ -This is a Python help channel. You can claim your own help channel in the Python Help: Available category. -""" -AVAILABLE_HELP_CHANNELS = "**Currently available help channel(s):** {available}" - -class HelpChannels(commands.Cog): +class HelpForum(commands.Cog):      """ -    Manage the help channel system of the guild. - -    The system is based on a 3-category system: - -    Available Category - -    * Contains channels which are ready to be occupied by someone who needs help -    * Will always contain `constants.HelpChannels.max_available` channels; refilled automatically -      from the pool of dormant channels -        * Prioritise using the channels which have been dormant for the longest amount of time -        * If there are no more dormant channels, the bot will automatically create a new one -        * If there are no dormant channels to move, helpers will be notified (see `notify()`) -    * When a channel becomes available, the dormant embed will be edited to show `AVAILABLE_MSG` -    * User can only claim a channel at an interval `constants.HelpChannels.claim_minutes` -        * To keep track of cooldowns, user which claimed a channel will have a temporary role - -    In Use Category - -    * Contains all channels which are occupied by someone needing help -    * Channel moves to dormant category after -        - `constants.HelpChannels.idle_minutes_other` minutes since the last user message, or -        - `constants.HelpChannels.idle_minutes_claimant` minutes since the last claimant message. -    * Command can prematurely mark a channel as dormant -        * Channel claimant is allowed to use the command -        * Allowed roles for the command are configurable with `constants.HelpChannels.cmd_whitelist` -    * When a channel becomes dormant, an embed with `DORMANT_MSG` will be sent +    Manage the help channel forum of the guild. -    Dormant Category +    This system uses Discord's native forum channel feature to handle most of the logic. -    * Contains channels which aren't in use -    * Channels are used to refill the Available category - -    Help channels are named after the foods in `bot/resources/foods.json`. +    The purpose of this cog is to add additional features, such as stats collection, old post locking +    and helpful automated messages.      """      def __init__(self, bot: Bot):          self.bot = bot -        self.scheduler = scheduling.Scheduler(self.__class__.__name__) - -        self.guild: discord.Guild = None -        self.cooldown_role: discord.Role = None - -        # Categories -        self.available_category: discord.CategoryChannel = None -        self.in_use_category: discord.CategoryChannel = None -        self.dormant_category: discord.CategoryChannel = None - -        # Queues -        self.channel_queue: asyncio.Queue[discord.TextChannel] = None -        self.name_queue: t.Deque[str] = None - -        # Notifications -        # Using a very old date so that we don't have to use Optional typing. -        self.last_none_remaining_notification = arrow.get('1815-12-10T18:00:00.00000+00:00') -        self.last_running_low_notification = arrow.get('1815-12-10T18:00:00.00000+00:00') - -        self.dynamic_message: t.Optional[int] = None -        self.available_help_channels: t.Set[discord.TextChannel] = set() - -        # Asyncio stuff -        self.queue_tasks: t.List[asyncio.Task] = [] -        self.init_done = False - -    async def cog_unload(self) -> None: -        """Cancel the init task and scheduled tasks when the cog unloads.""" -        log.trace("Cog unload: cancelling the init_cog task") - -        log.trace("Cog unload: cancelling the channel queue tasks") -        for task in self.queue_tasks: -            task.cancel() - -        self.scheduler.cancel_all() - -    @lock.lock_arg(NAMESPACE, "message", attrgetter("channel.id")) -    @lock.lock_arg(NAMESPACE, "message", attrgetter("author.id")) -    @lock.lock_arg(f"{NAMESPACE}.unclaim", "message", attrgetter("author.id"), wait=True) -    async def claim_channel(self, message: discord.Message) -> None: -        """ -        Claim the channel in which the question `message` was sent. - -        Send an embed stating the claimant, move the channel to the In Use category, and pin the `message`. -        Add a cooldown to the claimant to prevent them from asking another question. -        Lastly, make a new channel available. -        """ -        log.info(f"Channel #{message.channel} was claimed by `{message.author.id}`.") - -        try: -            await self.move_to_in_use(message.channel) -        except discord.DiscordServerError: -            try: -                await message.channel.send( -                    "The bot encountered a Discord API error while trying to move this channel, please try again later." -                ) -            except Exception as e: -                log.warning("Error occurred while sending fail claim message:", exc_info=e) -            log.info( -                "500 error from Discord when moving #%s (%d) to in-use for %s (%d). Cancelling claim.", -                message.channel.name, -                message.channel.id, -                message.author.name, -                message.author.id, -            ) -            self.bot.stats.incr("help.failed_claims.500_on_move") -            return - -        embed = discord.Embed( -            description=f"Channel claimed by {message.author.mention}.", -            color=constants.Colours.bright_green, -        ) -        await message.channel.send(embed=embed) - -        # Handle odd edge case of `message.author` not being a `discord.Member` (see bot#1839) -        if not isinstance(message.author, discord.Member): -            log.debug(f"{message.author} ({message.author.id}) isn't a member. Not giving cooldown role or sending DM.") -        else: -            await members.handle_role_change(message.author, message.author.add_roles, self.cooldown_role) - -            try: -                await _message.dm_on_open(message) -            except Exception as e: -                log.warning("Error occurred while sending DM:", exc_info=e) - -        await _message.pin(message) - -        # Add user with channel for dormant check. -        await _caches.claimants.set(message.channel.id, message.author.id) - -        self.bot.stats.incr("help.claimed") - -        # datetime.timestamp() would assume it's local, despite d.py giving a (naïve) UTC time. -        timestamp = arrow.Arrow.fromdatetime(message.created_at).timestamp() - -        await _caches.claim_times.set(message.channel.id, timestamp) -        await _caches.claimant_last_message_times.set(message.channel.id, timestamp) -        # Delete to indicate that the help session has yet to receive an answer. -        await _caches.non_claimant_last_message_times.delete(message.channel.id) - -        # Removing the help channel from the dynamic message, and editing/sending that message. -        self.available_help_channels.remove(message.channel) - -        # Not awaited because it may indefinitely hold the lock while waiting for a channel. -        scheduling.create_task(self.move_to_available(), name=f"help_claim_{message.id}") - -    def create_channel_queue(self) -> asyncio.Queue: -        """ -        Return a queue of dormant channels to use for getting the next available channel. - -        The channels are added to the queue in a random order. -        """ -        log.trace("Creating the channel queue.") - -        channels = list(_channel.get_category_channels(self.dormant_category)) -        random.shuffle(channels) - -        log.trace("Populating the channel queue with channels.") -        queue = asyncio.Queue() -        for channel in channels: -            queue.put_nowait(channel) - -        return queue - -    async def create_dormant(self) -> t.Optional[discord.TextChannel]: -        """ -        Create and return a new channel in the Dormant category. - -        The new channel will sync its permission overwrites with the category. - -        Return None if no more channel names are available. -        """ -        log.trace("Getting a name for a new dormant channel.") - -        try: -            name = self.name_queue.popleft() -        except IndexError: -            log.debug("No more names available for new dormant channels.") -            return None - -        log.debug(f"Creating a new dormant channel named {name}.") -        return await self.dormant_category.create_text_channel(name, topic=HELP_CHANNEL_TOPIC) +        self.help_forum_channel_id = constants.Channels.help_system_forum      async def close_check(self, ctx: commands.Context) -> bool: -        """Return True if the channel is in use and the user is the claimant or has a whitelisted role.""" -        if ctx.channel.category != self.in_use_category: -            log.debug(f"{ctx.author} invoked command 'close' outside an in-use help channel") +        """Return True if the channel is a help post, and the user is the claimant or has a whitelisted role.""" +        if not _channel.is_help_forum_post(ctx.channel):              return False -        if await _caches.claimants.get(ctx.channel.id) == ctx.author.id: +        if ctx.author.id == ctx.channel.owner_id:              log.trace(f"{ctx.author} is the help channel claimant, passing the check for dormant.")              self.bot.stats.incr("help.dormant_invoke.claimant")              return True          log.trace(f"{ctx.author} is not the help channel claimant, checking roles.")          has_role = await commands.has_any_role(*constants.HelpChannels.cmd_whitelist).predicate(ctx) -          if has_role:              self.bot.stats.incr("help.dormant_invoke.staff") -          return has_role -    @commands.command(name="close", aliases=["dormant", "solved"], enabled=False) +    @commands.group(name="help-forum") +    async def help_forum_group(self,  ctx: commands.Context) -> None: +        """A group of commands that help manage our help forum system.""" +        if not ctx.invoked_subcommand: +            await ctx.send_help(ctx.command) + +    @help_forum_group.command(name="close", root_aliases=("close", "dormant", "solved"))      async def close_command(self, ctx: commands.Context) -> None:          """ -        Make the current in-use help channel dormant. +        Make the help post this command was called in dormant.          May only be invoked by the channel's claimant or by staff.          """          # Don't use a discord.py check because the check needs to fail silently.          if await self.close_check(ctx):              log.info(f"Close command invoked by {ctx.author} in #{ctx.channel}.") -            await self.unclaim_channel(ctx.channel, closed_on=_channel.ClosingReason.COMMAND) - -    async def get_available_candidate(self) -> discord.TextChannel: -        """ -        Return a dormant channel to turn into an available channel. - -        If no channel is available, wait indefinitely until one becomes available. -        """ -        log.trace("Getting an available channel candidate.") - -        try: -            channel = self.channel_queue.get_nowait() -        except asyncio.QueueEmpty: -            log.info("No candidate channels in the queue; creating a new channel.") -            channel = await self.create_dormant() - -            if not channel: -                log.info("Couldn't create a candidate channel; waiting to get one from the queue.") -                last_notification = await _message.notify_none_remaining(self.last_none_remaining_notification) - -                if last_notification: -                    self.last_none_remaining_notification = last_notification - -                channel = await self.wait_for_dormant_channel()  # Blocks until a new channel is available - -        else: -            last_notification = await _message.notify_running_low( -                self.channel_queue.qsize(), -                self.last_running_low_notification -            ) - -            if last_notification: -                self.last_running_low_notification = last_notification - -        return channel - -    async def init_available(self) -> None: -        """Initialise the Available category with channels.""" -        log.trace("Initialising the Available category with channels.") - -        channels = list(_channel.get_category_channels(self.available_category)) -        missing = constants.HelpChannels.max_available - len(channels) - -        # If we've got less than `max_available` channel available, we should add some. -        if missing > 0: -            log.trace(f"Moving {missing} missing channels to the Available category.") -            for _ in range(missing): -                await self.move_to_available() - -        # If for some reason we have more than `max_available` channels available, -        # we should move the superfluous ones over to dormant. -        elif missing < 0: -            log.trace(f"Moving {abs(missing)} superfluous available channels over to the Dormant category.") -            for channel in channels[:abs(missing)]: -                await self.unclaim_channel(channel, closed_on=_channel.ClosingReason.CLEANUP) - -        self.available_help_channels = set(_channel.get_category_channels(self.available_category)) - -        # Getting channels that need to be included in the dynamic message. -        await self.update_available_help_channels() -        log.trace("Dynamic available help message updated.") - -    async def init_categories(self) -> None: -        """Get the help category objects. Remove the cog if retrieval fails.""" -        log.trace("Getting the CategoryChannel objects for the help categories.") - -        try: -            self.available_category = await channel_utils.get_or_fetch_channel( -                constants.Categories.help_available -            ) -            self.in_use_category = await channel_utils.get_or_fetch_channel( -                constants.Categories.help_in_use -            ) -            self.dormant_category = await channel_utils.get_or_fetch_channel( -                constants.Categories.help_dormant -            ) -        except discord.HTTPException: -            log.exception("Failed to get a category; cog will be removed") -            await self.bot.remove_cog(self.qualified_name) - -    async def cog_load(self) -> None: -        """Initialise the help channel system.""" -        log.trace("Waiting for the guild to be available before initialisation.") -        await self.bot.wait_until_guild_available() - -        log.trace("Initialising the cog.") -        self.guild = self.bot.get_guild(constants.Guild.id) -        self.cooldown_role = self.guild.get_role(constants.Roles.help_cooldown) - -        await self.init_categories() - -        self.channel_queue = self.create_channel_queue() -        self.name_queue = _name.create_name_queue( -            self.available_category, -            self.in_use_category, -            self.dormant_category, -        ) - -        log.trace("Moving or rescheduling in-use channels.") -        for channel in _channel.get_category_channels(self.in_use_category): -            await _channel.ensure_cached_claimant(channel) -            await self.move_idle_channel(channel, has_task=False) - -        # Prevent the command from being used until ready. -        # The ready event wasn't used because channels could change categories between the time -        # the command is invoked and the cog is ready (e.g. if move_idle_channel wasn't called yet). -        # This may confuse users. So would potentially long delays for the cog to become ready. -        self.close_command.enabled = True - -        # Acquiring the dynamic message ID, if it exists within the cache. -        log.trace("Attempting to fetch How-to-get-help dynamic message ID.") -        self.dynamic_message = await _caches.dynamic_message.get("message_id") - -        await self.init_available() -        _stats.report_counts() - -        self.init_done = True -        log.info("Cog is ready!") - -    async def move_idle_channel(self, channel: discord.TextChannel, has_task: bool = True) -> None: -        """ -        Make the `channel` dormant if idle or schedule the move if still active. - -        If `has_task` is True and rescheduling is required, the extant task to make the channel -        dormant will first be cancelled. -        """ -        log.trace(f"Handling in-use channel #{channel} ({channel.id}).") - -        closing_time, closed_on = await _channel.get_closing_time(channel, self.init_done) - -        # Closing time is in the past. -        # Add 1 second due to POSIX timestamps being lower resolution than datetime objects. -        if closing_time < (arrow.utcnow() + timedelta(seconds=1)): -            log.info( -                f"#{channel} ({channel.id}) is idle past {closing_time} " -                f"and will be made dormant. Reason: {closed_on.value}" -            ) - -            await self.unclaim_channel(channel, closed_on=closed_on) -        else: -            # Cancel the existing task, if any. -            if has_task: -                self.scheduler.cancel(channel.id) +            await _channel.help_thread_closed(ctx.channel) -            delay = (closing_time - arrow.utcnow()).seconds -            log.info( -                f"#{channel} ({channel.id}) is still active; " -                f"scheduling it to be moved after {delay} seconds." -            ) - -            self.scheduler.schedule_later(delay, channel.id, self.move_idle_channel(channel)) - -    async def move_to_available(self) -> None: -        """Make a channel available.""" -        log.trace("Making a channel available.") - -        channel = await self.get_available_candidate() -        channel_str = f"#{channel} ({channel.id})" -        log.info(f"Making {channel_str} available.") - -        await _message.send_available_message(channel) - -        log.trace(f"Moving {channel_str} to the Available category.") - -        # Unpin any previously stuck pins -        log.trace(f"Looking for pins stuck in {channel_str}.") -        if stuck_pins := await _message.unpin_all(channel): -            log.debug(f"Removed {stuck_pins} stuck pins from {channel_str}.") - -        await _channel.move_to_bottom( -            channel=channel, -            category_id=constants.Categories.help_available, -        ) - -        # Adding the help channel to the dynamic message, and editing/sending that message. -        self.available_help_channels.add(channel) -        await self.update_available_help_channels() - -        _stats.report_counts() - -    async def move_to_dormant(self, channel: discord.TextChannel) -> None: -        """Make the `channel` dormant.""" -        log.info(f"Moving #{channel} ({channel.id}) to the Dormant category.") -        await _channel.move_to_bottom( -            channel=channel, -            category_id=constants.Categories.help_dormant, -        ) - -        log.trace(f"Sending dormant message for #{channel} ({channel.id}).") -        embed = discord.Embed( -            description=_message.DORMANT_MSG.format( -                dormant=self.dormant_category.name, -                available=self.available_category.name, -            ) -        ) -        await channel.send(embed=embed) - -        log.trace(f"Pushing #{channel} ({channel.id}) into the channel queue.") -        self.channel_queue.put_nowait(channel) - -        _stats.report_counts() - -    @lock.lock_arg(f"{NAMESPACE}.unclaim", "channel") -    async def unclaim_channel(self, channel: discord.TextChannel, *, closed_on: _channel.ClosingReason) -> None: -        """ -        Unclaim an in-use help `channel` to make it dormant. - -        Unpin the claimant's question message and move the channel to the Dormant category. -        Remove the cooldown role from the channel claimant if they have no other channels claimed. -        Cancel the scheduled cooldown role removal task. - -        `closed_on` is the reason that the channel was closed. See _channel.ClosingReason for possible values. -        """ -        claimant_id = await _caches.claimants.get(channel.id) -        _unclaim_channel = self._unclaim_channel - -        # It could be possible that there is no claimant cached. In such case, it'd be useless and -        # possibly incorrect to lock on None. Therefore, the lock is applied conditionally. -        if claimant_id is not None: -            decorator = lock.lock_arg(f"{NAMESPACE}.unclaim", "claimant_id", wait=True) -            _unclaim_channel = decorator(_unclaim_channel) - -        return await _unclaim_channel(channel, claimant_id, closed_on) - -    async def _unclaim_channel( -        self, -        channel: discord.TextChannel, -        claimant_id: t.Optional[int], -        closed_on: _channel.ClosingReason -    ) -> None: -        """Actual implementation of `unclaim_channel`. See that for full documentation.""" -        await _caches.claimants.delete(channel.id) -        await _caches.session_participants.delete(channel.id) - -        if not claimant_id: -            log.info("No claimant given when un-claiming %s (%d). Skipping role removal.", channel, channel.id) -        else: -            claimant = await members.get_or_fetch_member(self.guild, claimant_id) -            if claimant is None: -                log.info(f"{claimant_id} left the guild during their help session; the cooldown role won't be removed") -            else: -                await members.handle_role_change(claimant, claimant.remove_roles, self.cooldown_role) - -        await _message.unpin_all(channel) -        await _stats.report_complete_session(channel.id, closed_on) -        await self.move_to_dormant(channel) - -        # Cancel the task that makes the channel dormant only if called by the close command. -        # In other cases, the task is either already done or not-existent. -        if closed_on == _channel.ClosingReason.COMMAND: -            self.scheduler.cancel(channel.id) - -    async def move_to_in_use(self, channel: discord.TextChannel) -> None: -        """Make a channel in-use and schedule it to be made dormant.""" -        log.info(f"Moving #{channel} ({channel.id}) to the In Use category.") - -        await _channel.move_to_bottom( -            channel=channel, -            category_id=constants.Categories.help_in_use, -        ) - -        timeout = constants.HelpChannels.idle_minutes_claimant * 60 - -        log.trace(f"Scheduling #{channel} ({channel.id}) to become dormant in {timeout} sec.") -        self.scheduler.schedule_later(timeout, channel.id, self.move_idle_channel(channel)) -        _stats.report_counts() - -    @commands.Cog.listener() -    async def on_message(self, message: discord.Message) -> None: -        """Move an available channel to the In Use category and replace it with a dormant one.""" -        if message.author.bot: -            return  # Ignore messages sent by bots. - -        if channel_utils.is_in_category(message.channel, constants.Categories.help_available): -            if not _channel.is_excluded_channel(message.channel): -                await self.claim_channel(message) - -        elif channel_utils.is_in_category(message.channel, constants.Categories.help_in_use): -            await self.notify_session_participants(message) -            await _message.update_message_caches(message) - -    @commands.Cog.listener() -    async def on_message_delete(self, msg: discord.Message) -> None: -        """ -        Reschedule an in-use channel to become dormant sooner if the channel is empty. - -        The new time for the dormant task is configured with `HelpChannels.deleted_idle_minutes`. -        """ -        if not channel_utils.is_in_category(msg.channel, constants.Categories.help_in_use): -            return - -        if not await _message.is_empty(msg.channel): -            return - -        log.info(f"Claimant of #{msg.channel} ({msg.author}) deleted message, channel is empty now. Rescheduling task.") - -        # Cancel existing dormant task before scheduling new. -        self.scheduler.cancel(msg.channel.id) - -        delay = constants.HelpChannels.deleted_idle_minutes * 60 -        self.scheduler.schedule_later(delay, msg.channel.id, self.move_idle_channel(msg.channel)) - -    async def wait_for_dormant_channel(self) -> discord.TextChannel: -        """Wait for a dormant channel to become available in the queue and return it.""" -        log.trace("Waiting for a dormant channel.") - -        task = scheduling.create_task(self.channel_queue.get()) -        self.queue_tasks.append(task) -        channel = await task - -        log.trace(f"Channel #{channel} ({channel.id}) finally retrieved from the queue.") -        self.queue_tasks.remove(task) - -        return channel - -    async def update_available_help_channels(self) -> None: -        """Updates the dynamic message within #how-to-get-help for available help channels.""" -        available_channels = AVAILABLE_HELP_CHANNELS.format( -            available=", ".join( -                c.mention for c in sorted(self.available_help_channels, key=attrgetter("position")) -            ) or None -        ) - -        if self.dynamic_message is not None: -            try: -                log.trace("Help channels have changed, dynamic message has been edited.") -                await discord.PartialMessage( -                    channel=self.bot.get_channel(constants.Channels.how_to_get_help), -                    id=self.dynamic_message, -                ).edit(content=available_channels) -            except discord.NotFound: -                pass -            else: -                return - -        log.trace("Dynamic message could not be edited or found. Creating a new one.") -        new_dynamic_message = await self.bot.get_channel(constants.Channels.how_to_get_help).send(available_channels) -        self.dynamic_message = new_dynamic_message.id -        await _caches.dynamic_message.set("message_id", self.dynamic_message) - -    @staticmethod -    def _serialise_session_participants(participants: set[int]) -> str: -        """Convert a set to a comma separated string.""" -        return ','.join(str(p) for p in participants) - -    @staticmethod -    def _deserialise_session_participants(s: str) -> set[int]: -        """Convert a comma separated string into a set.""" -        return set(int(user_id) for user_id in s.split(",") if user_id != "") - -    @lock.lock_arg(NAMESPACE, "message", attrgetter("channel.id")) -    @lock.lock_arg(NAMESPACE, "message", attrgetter("author.id")) -    async def notify_session_participants(self, message: discord.Message) -> None: -        """ -        Check if the message author meets the requirements to be notified. - -        If they meet the requirements they are notified. -        """ -        if await _caches.claimants.get(message.channel.id) == message.author.id: -            return  # Ignore messages sent by claimants - -        if not await _caches.help_dm.get(message.author.id): -            return  # Ignore message if user is opted out of help dms - -        if (await self.bot.get_context(message)).command == self.close_command: -            return  # Ignore messages that are closing the channel - -        session_participants = self._deserialise_session_participants( -            await _caches.session_participants.get(message.channel.id) or "" -        ) - -        if message.author.id not in session_participants: -            session_participants.add(message.author.id) - -            embed = discord.Embed( -                title="Currently Helping", -                description=f"You're currently helping in {message.channel.mention}", -                color=constants.Colours.bright_green, -                timestamp=message.created_at -            ) -            embed.add_field(name="Conversation", value=f"[Jump to message]({message.jump_url})") - -            try: -                await message.author.send(embed=embed) -            except discord.Forbidden: -                log.trace( -                    f"Failed to send helpdm message to {message.author.id}. DMs Closed/Blocked. " -                    "Removing user from helpdm." -                ) -                bot_commands_channel = self.bot.get_channel(Channels.bot_commands) -                await _caches.help_dm.delete(message.author.id) -                await bot_commands_channel.send( -                    f"{message.author.mention} {constants.Emojis.cross_mark} " -                    "To receive updates on help channels you're active in, enable your DMs.", -                    delete_after=RedirectOutput.delete_delay -                ) -                return - -            await _caches.session_participants.set( -                message.channel.id, -                self._serialise_session_participants(session_participants) -            ) - -    @commands.command(name="helpdm") -    async def helpdm_command( +    @help_forum_group.command(name="dm", root_aliases=("helpdm",)) +    async def help_dm_command(          self,          ctx: commands.Context, -        state_bool: bool +        state_bool: bool,      ) -> None:          """ -        Allows user to toggle "Helping" dms. +        Allows user to toggle "Helping" DMs.          If this is set to on the user will receive a dm for the channel they are participating in. -          If this is set to off the user will not receive a dm for channel that they are participating in.          """          state_str = "ON" if state_bool else "OFF" @@ -664,3 +82,47 @@ class HelpChannels(commands.Cog):          else:              await _caches.help_dm.delete(ctx.author.id)          await ctx.send(f"{constants.Emojis.ok_hand} {ctx.author.mention} Help DMs {state_str}!") + +    @help_forum_group.command(name="title", root_aliases=("title",)) +    async def rename_help_post(self, ctx: commands.Context, *, title: str) -> None: +        """Rename the help post to the provided title.""" +        if not _channel.is_help_forum_post(ctx.channel): +            # Silently fail in channels other than help posts +            return + +        if not await commands.has_any_role(constants.Roles.helpers).predicate(ctx): +            # Silently fail for non-helpers +            return + +        await ctx.channel.edit(name=title) + +    @commands.Cog.listener() +    async def on_thread_create(self, thread: discord.Thread) -> None: +        """Defer application of new post logic for posts the help forum to the _channel helper.""" +        if thread.parent_id == self.help_forum_channel_id: +            await _channel.help_thread_opened(thread) + +    @commands.Cog.listener() +    async def on_thread_update(self, before: discord.Thread, after: discord.Thread) -> None: +        """Defer application archive logic for posts in the help forum to the _channel helper.""" +        if after.parent_id != self.help_forum_channel_id: +            return +        if not before.archived and after.archived: +            await _channel.help_thread_archived(after) + +    @commands.Cog.listener() +    async def on_raw_thread_delete(self, deleted_thread_event: discord.RawThreadDeleteEvent) -> None: +        """Defer application of new post logic for posts the help forum to the _channel helper.""" +        if deleted_thread_event.parent_id == self.help_forum_channel_id: +            await _channel.help_thread_deleted(deleted_thread_event) + +    @commands.Cog.listener() +    async def on_message(self, message: discord.Message) -> None: +        """Defer application of new message logic for messages in the help forum to the _message helper.""" +        if not _channel.is_help_forum_post(message.channel): +            return None + +        await _message.notify_session_participants(message) + +        if message.author.id != message.channel.owner_id: +            await _caches.posts_with_non_claimant_messages.set(message.channel.id, "sentinel") diff --git a/bot/exts/help_channels/_message.py b/bot/exts/help_channels/_message.py index 00d57ea40..98bfe59b8 100644 --- a/bot/exts/help_channels/_message.py +++ b/bot/exts/help_channels/_message.py @@ -1,286 +1,73 @@ -import textwrap -import typing as t +from operator import attrgetter -import arrow  import discord -from arrow import Arrow  import bot  from bot import constants  from bot.exts.help_channels import _caches  from bot.log import get_logger +from bot.utils import lock  log = get_logger(__name__) +NAMESPACE = "help" -ASKING_GUIDE_URL = "https://pythondiscord.com/pages/asking-good-questions/" -AVAILABLE_MSG = f""" -Send your question here to claim the channel. +def _serialise_session_participants(participants: set[int]) -> str: +    """Convert a set to a comma separated string.""" +    return ','.join(str(p) for p in participants) -**Remember to:** -• **Ask** your Python question, not if you can ask or if there's an expert who can help. -• **Show** a code sample as text (rather than a screenshot) and the error message, if you got one. -• **Explain** what you expect to happen and what actually happens. -For more tips, check out our guide on [asking good questions]({ASKING_GUIDE_URL}). -""" +def _deserialise_session_participants(s: str) -> set[int]: +    """Convert a comma separated string into a set.""" +    return set(int(user_id) for user_id in s.split(",") if user_id != "") -AVAILABLE_TITLE = "Available help channel" -AVAILABLE_FOOTER = f"Closes after a period of inactivity, or when you send {constants.Bot.prefix}close." - -DORMANT_MSG = f""" -This help channel has been marked as **dormant**, and has been moved into the **{{dormant}}** \ -category at the bottom of the channel list. It is no longer possible to send messages in this \ -channel until it becomes available again. - -If your question wasn't answered yet, you can claim a new help channel from the \ -**{{available}}** category by simply asking your question again. Consider rephrasing the \ -question to maximize your chance of getting a good answer. If you're not sure how, have a look \ -through our guide for **[asking a good question]({ASKING_GUIDE_URL})**. -""" - - -async def update_message_caches(message: discord.Message) -> None: -    """Checks the source of new content in a help channel and updates the appropriate cache.""" -    channel = message.channel - -    log.trace(f"Checking if #{channel} ({channel.id}) has had a reply.") - -    claimant_id = await _caches.claimants.get(channel.id) -    if not claimant_id: -        # The mapping for this channel doesn't exist, we can't do anything. -        return - -    # datetime.timestamp() would assume it's local, despite d.py giving a (naïve) UTC time. -    timestamp = Arrow.fromdatetime(message.created_at).timestamp() - -    # Overwrite the appropriate last message cache depending on the author of the message -    if message.author.id == claimant_id: -        await _caches.claimant_last_message_times.set(channel.id, timestamp) -    else: -        await _caches.non_claimant_last_message_times.set(channel.id, timestamp) - - -async def get_last_message(channel: discord.TextChannel) -> t.Optional[discord.Message]: -    """Return the last message sent in the channel or None if no messages exist.""" -    log.trace(f"Getting the last message in #{channel} ({channel.id}).") - -    async for message in channel.history(limit=1): -        return message - -    log.debug(f"No last message available; #{channel} ({channel.id}) has no messages.") -    return None - - -async def is_empty(channel: discord.TextChannel) -> bool: -    """Return True if there's an AVAILABLE_MSG and the messages leading up are bot messages.""" -    log.trace(f"Checking if #{channel} ({channel.id}) is empty.") - -    # A limit of 100 results in a single API call. -    # If AVAILABLE_MSG isn't found within 100 messages, then assume the channel is not empty. -    # Not gonna do an extensive search for it cause it's too expensive. -    async for msg in channel.history(limit=100): -        if not msg.author.bot: -            log.trace(f"#{channel} ({channel.id}) has a non-bot message.") -            return False - -        if _match_bot_embed(msg, AVAILABLE_MSG): -            log.trace(f"#{channel} ({channel.id}) has the available message embed.") -            return True - -    return False - - -async def dm_on_open(message: discord.Message) -> None: -    """ -    DM claimant with a link to the claimed channel's first message, with a 100 letter preview of the message. - -    Does nothing if the user has DMs disabled. -    """ -    embed = discord.Embed( -        title="Help channel opened", -        description=f"You claimed {message.channel.mention}.", -        colour=bot.constants.Colours.bright_green, -        timestamp=message.created_at, -    ) - -    embed.set_thumbnail(url=constants.Icons.green_questionmark) -    formatted_message = textwrap.shorten(message.content, width=100, placeholder="...") -    if formatted_message: -        embed.add_field(name="Your message", value=formatted_message, inline=False) -    embed.add_field( -        name="Conversation", -        value=f"[Jump to message!]({message.jump_url})", -        inline=False, -    ) - -    try: -        await message.author.send(embed=embed) -        log.trace(f"Sent DM to {message.author.id} after claiming help channel.") -    except discord.errors.Forbidden: -        log.trace( -            f"Ignoring to send DM to {message.author.id} after claiming help channel: DMs disabled." -        ) - - -async def notify_none_remaining(last_notification: Arrow) -> t.Optional[Arrow]: -    """ -    Send a pinging message in `channel` notifying about there being no dormant channels remaining. - -    If a notification was sent, return the time at which the message was sent. -    Otherwise, return None. - -    Configuration: -        * `HelpChannels.notify_minutes`              - minimum interval between notifications -        * `HelpChannels.notify_none_remaining`       - toggle none_remaining notifications -        * `HelpChannels.notify_none_remaining_roles` - roles mentioned in notifications -    """ -    if not constants.HelpChannels.notify_none_remaining: -        return None - -    if (arrow.utcnow() - last_notification).total_seconds() < (constants.HelpChannels.notify_minutes * 60): -        log.trace("Did not send none_remaining notification as it hasn't been enough time since the last one.") -        return None - -    log.trace("Notifying about lack of channels.") - -    mentions = " ".join(f"<@&{role}>" for role in constants.HelpChannels.notify_none_remaining_roles) -    allowed_roles = [discord.Object(id_) for id_ in constants.HelpChannels.notify_none_remaining_roles] - -    channel = bot.instance.get_channel(constants.HelpChannels.notify_channel) -    if channel is None: -        log.trace("Did not send none_remaining notification as the notification channel couldn't be gathered.") - -    try: -        await channel.send( -            f"{mentions} A new available help channel is needed but there " -            "are no more dormant ones. Consider freeing up some in-use channels manually by " -            f"using the `{constants.Bot.prefix}dormant` command within the channels.", -            allowed_mentions=discord.AllowedMentions(everyone=False, roles=allowed_roles) -        ) -    except Exception: -        # Handle it here cause this feature isn't critical for the functionality of the system. -        log.exception("Failed to send notification about lack of dormant channels!") -    else: -        bot.instance.stats.incr("help.out_of_channel_alerts") -        return arrow.utcnow() - - -async def notify_running_low(number_of_channels_left: int, last_notification: Arrow) -> t.Optional[Arrow]: [email protected]_arg(NAMESPACE, "message", attrgetter("channel.id")) [email protected]_arg(NAMESPACE, "message", attrgetter("author.id")) +async def notify_session_participants(message: discord.Message) -> None:      """ -    Send a non-pinging message in `channel` notifying about there being a low amount of dormant channels. - -    This will include the number of dormant channels left `number_of_channels_left` +    Check if the message author meets the requirements to be notified. -    If a notification was sent, return the time at which the message was sent. -    Otherwise, return None. - -    Configuration: -        * `HelpChannels.notify_minutes`               - minimum interval between notifications -        * `HelpChannels.notify_running_low`           - toggle running_low notifications -        * `HelpChannels.notify_running_low_threshold` - minimum amount of channels to trigger running_low notifications +    If they meet the requirements they are notified.      """ -    if not constants.HelpChannels.notify_running_low: -        return None - -    if number_of_channels_left > constants.HelpChannels.notify_running_low_threshold: -        log.trace("Did not send notify_running_low notification as the threshold was not met.") -        return None - -    if (arrow.utcnow() - last_notification).total_seconds() < (constants.HelpChannels.notify_minutes * 60): -        log.trace("Did not send notify_running_low notification as it hasn't been enough time since the last one.") -        return None - -    log.trace("Notifying about getting close to no dormant channels.") - -    channel = bot.instance.get_channel(constants.HelpChannels.notify_channel) -    if channel is None: -        log.trace("Did not send notify_running notification as the notification channel couldn't be gathered.") - -    try: -        if number_of_channels_left == 1: -            message = f"There is only {number_of_channels_left} dormant channel left. " -        else: -            message = f"There are only {number_of_channels_left} dormant channels left. " -        message += "Consider participating in some help channels so that we don't run out." -        await channel.send(message) -    except Exception: -        # Handle it here cause this feature isn't critical for the functionality of the system. -        log.exception("Failed to send notification about running low of dormant channels!") -    else: -        bot.instance.stats.incr("help.running_low_alerts") -        return arrow.utcnow() - - -async def pin(message: discord.Message) -> None: -    """Pin an initial question `message`.""" -    await _pin_wrapper(message, pin=True) +    if message.channel.owner_id == message.author.id: +        return  # Ignore messages sent by claimants +    if not await _caches.help_dm.get(message.author.id): +        return  # Ignore message if user is opted out of help dms -async def send_available_message(channel: discord.TextChannel) -> None: -    """Send the available message by editing a dormant message or sending a new message.""" -    channel_info = f"#{channel} ({channel.id})" -    log.trace(f"Sending available message in {channel_info}.") - -    embed = discord.Embed( -        color=constants.Colours.bright_green, -        description=AVAILABLE_MSG, +    session_participants = _deserialise_session_participants( +        await _caches.session_participants.get(message.channel.id) or "",      ) -    embed.set_author(name=AVAILABLE_TITLE, icon_url=constants.Icons.green_checkmark) -    embed.set_footer(text=AVAILABLE_FOOTER) - -    msg = await get_last_message(channel) -    if _match_bot_embed(msg, DORMANT_MSG): -        log.trace(f"Found dormant message {msg.id} in {channel_info}; editing it.") -        await msg.edit(embed=embed) -    else: -        log.trace(f"Dormant message not found in {channel_info}; sending a new message.") -        await channel.send(embed=embed) - - -async def unpin_all(channel: discord.TextChannel) -> int: -    """Unpin all pinned messages in `channel` and return the amount of unpinned messages.""" -    count = 0 -    for message in await channel.pins(): -        if await _pin_wrapper(message, pin=False): -            count += 1 - -    return count - - -def _match_bot_embed(message: t.Optional[discord.Message], description: str) -> bool: -    """Return `True` if the bot's `message`'s embed description matches `description`.""" -    if not message or not message.embeds: -        return False - -    bot_msg_desc = message.embeds[0].description -    if bot_msg_desc is None: -        log.trace("Last message was a bot embed but it was empty.") -        return False -    return message.author == bot.instance.user and bot_msg_desc.strip() == description.strip() +    if message.author.id not in session_participants: +        session_participants.add(message.author.id) -async def _pin_wrapper(message: discord.Message, *, pin: bool) -> bool: -    """ -    Pin `message` if `pin` is True or unpin if it's False. - -    Return True if successful and False otherwise. -    """ -    channel_str = f"#{message.channel} ({message.channel.id})" -    func = message.pin if pin else message.unpin - -    try: -        await func() -    except discord.HTTPException as e: -        if e.code == 10008: -            log.debug(f"Message {message.id} in {channel_str} doesn't exist; can't {func.__name__}.") -        else: -            log.exception( -                f"Error {func.__name__}ning message {message.id} in {channel_str}: " -                f"{e.status} ({e.code})" +        embed = discord.Embed( +            title="Currently Helping", +            description=f"You're currently helping in {message.channel.mention}", +            color=constants.Colours.bright_green, +            timestamp=message.created_at, +        ) +        embed.add_field(name="Conversation", value=f"[Jump to message]({message.jump_url})") + +        try: +            await message.author.send(embed=embed) +        except discord.Forbidden: +            log.trace( +                f"Failed to send help dm message to {message.author.id}. DMs Closed/Blocked. " +                "Removing user from help dm." +            ) +            await _caches.help_dm.delete(message.author.id) +            bot_commands_channel = bot.instance.get_channel(constants.Channels.bot_commands) +            await bot_commands_channel.send( +                f"{message.author.mention} {constants.Emojis.cross_mark} " +                "To receive updates on help channels you're active in, enable your DMs.", +                delete_after=constants.RedirectOutput.delete_delay,              ) -        return False -    else: -        log.trace(f"{func.__name__.capitalize()}ned message {message.id} in {channel_str}.") -        return True +            return + +        await _caches.session_participants.set( +            message.channel.id, +            _serialise_session_participants(session_participants), +        ) diff --git a/bot/exts/help_channels/_name.py b/bot/exts/help_channels/_name.py deleted file mode 100644 index a9d9b2df1..000000000 --- a/bot/exts/help_channels/_name.py +++ /dev/null @@ -1,69 +0,0 @@ -import json -import typing as t -from collections import deque -from pathlib import Path - -import discord - -from bot import constants -from bot.exts.help_channels._channel import MAX_CHANNELS_PER_CATEGORY, get_category_channels -from bot.log import get_logger - -log = get_logger(__name__) - - -def create_name_queue(*categories: discord.CategoryChannel) -> deque: -    """ -    Return a queue of food names to use for creating new channels. - -    Skip names that are already in use by channels in `categories`. -    """ -    log.trace("Creating the food name queue.") - -    used_names = _get_used_names(*categories) - -    log.trace("Determining the available names.") -    available_names = (name for name in _get_names() if name not in used_names) - -    log.trace("Populating the name queue with names.") -    return deque(available_names) - - -def _get_names() -> t.List[str]: -    """ -    Return a truncated list of prefixed food names. - -    The amount of names is configured with `HelpChannels.max_total_channels`. -    The prefix is configured with `HelpChannels.name_prefix`. -    """ -    count = constants.HelpChannels.max_total_channels -    prefix = constants.HelpChannels.name_prefix - -    log.trace(f"Getting the first {count} food names from JSON.") - -    with Path("bot/resources/foods.json").open(encoding="utf-8") as foods_file: -        all_names = json.load(foods_file) - -    if prefix: -        return [prefix + name for name in all_names[:count]] -    else: -        return all_names[:count] - - -def _get_used_names(*categories: discord.CategoryChannel) -> t.Set[str]: -    """Return names which are already being used by channels in `categories`.""" -    log.trace("Getting channel names which are already being used.") - -    names = set() -    for cat in categories: -        for channel in get_category_channels(cat): -            names.add(channel.name) - -    if len(names) > MAX_CHANNELS_PER_CATEGORY: -        log.warning( -            f"Too many help channels ({len(names)}) already exist! " -            f"Discord only supports {MAX_CHANNELS_PER_CATEGORY} in a category." -        ) - -    log.trace(f"Got {len(names)} used names: {names}") -    return names diff --git a/bot/exts/help_channels/_stats.py b/bot/exts/help_channels/_stats.py index 4698c26de..6c05b4701 100644 --- a/bot/exts/help_channels/_stats.py +++ b/bot/exts/help_channels/_stats.py @@ -1,40 +1,44 @@ -from more_itertools import ilen +from enum import Enum + +import arrow +import discord  import bot  from bot import constants -from bot.exts.help_channels import _caches, _channel  from bot.log import get_logger  log = get_logger(__name__) -def report_counts() -> None: -    """Report channel count stats of each help category.""" -    for name in ("in_use", "available", "dormant"): -        id_ = getattr(constants.Categories, f"help_{name}") -        category = bot.instance.get_channel(id_) +class ClosingReason(Enum): +    """All possible closing reasons for help channels.""" + +    COMMAND = "command" +    INACTIVE = "auto.inactive" +    DELETED = "auto.deleted" +    CLEANUP = "auto.cleanup" + -        if category: -            total = ilen(_channel.get_category_channels(category)) -            bot.instance.stats.gauge(f"help.total.{name}", total) -        else: -            log.warning(f"Couldn't find category {name!r} to track channel count stats.") +def report_post_count() -> None: +    """Report post count stats of the help forum.""" +    help_forum = bot.instance.get_channel(constants.Channels.help_system_forum) +    bot.instance.stats.gauge("help_forum.total.in_use", len(help_forum.threads)) -async def report_complete_session(channel_id: int, closed_on: _channel.ClosingReason) -> None: +async def report_complete_session(help_session_post: discord.Thread, closed_on: ClosingReason) -> None:      """ -    Report stats for a completed help session channel `channel_id`. +    Report stats for a completed help session post `help_session_post`. -    `closed_on` is the reason why the channel was closed. See `_channel.ClosingReason` for possible reasons. +    `closed_on` is the reason why the post was closed. See `ClosingReason` for possible reasons.      """ -    bot.instance.stats.incr(f"help.dormant_calls.{closed_on.value}") +    bot.instance.stats.incr(f"help_forum.dormant_calls.{closed_on.value}") -    in_use_time = await _channel.get_in_use_time(channel_id) -    if in_use_time: -        bot.instance.stats.timing("help.in_use_time", in_use_time) +    open_time = discord.utils.snowflake_time(help_session_post.id) +    in_use_time = arrow.utcnow() - open_time +    bot.instance.stats.timing("help_forum.in_use_time", in_use_time) -    non_claimant_last_message_time = await _caches.non_claimant_last_message_times.get(channel_id) -    if non_claimant_last_message_time is None: -        bot.instance.stats.incr("help.sessions.unanswered") +    if set(help_session_post.members)-{help_session_post.owner_id} == set(): +        # Can't use len(help_session_post.members) as the claimant (owner) may have left the thread. +        bot.instance.stats.incr("help_forum.sessions.unanswered")      else: -        bot.instance.stats.incr("help.sessions.answered") +        bot.instance.stats.incr("help_forum.sessions.answered") | 
