diff options
| author | 2020-04-02 14:39:24 +0200 | |
|---|---|---|
| committer | 2020-04-02 14:39:24 +0200 | |
| commit | d77a2bbc50305d05371197f4cfe3354cfca4c627 (patch) | |
| tree | be1eed54972d9843f66114311f93b68b579046ac /bot/utils | |
| parent | Merge pull request #382 from ks129/game-fuzzy (diff) | |
| parent | Merge master: adjust `Space` cog location (diff) | |
Merge pull request #329 from python-discord/seasonal-purge
Deseasonify: Make all cogs available year-round, and manage only branding by season.
Diffstat (limited to 'bot/utils')
| -rw-r--r-- | bot/utils/__init__.py | 24 | ||||
| -rw-r--r-- | bot/utils/decorators.py | 320 | ||||
| -rw-r--r-- | bot/utils/exceptions.py | 4 | ||||
| -rw-r--r-- | bot/utils/pagination.py | 430 | ||||
| -rw-r--r-- | bot/utils/persist.py | 4 | 
5 files changed, 778 insertions, 4 deletions
diff --git a/bot/utils/__init__.py b/bot/utils/__init__.py index 25fd4b96..35ef0a7b 100644 --- a/bot/utils/__init__.py +++ b/bot/utils/__init__.py @@ -2,12 +2,32 @@ import asyncio  import contextlib  import re  import string -from typing import List +from datetime import datetime +from typing import Iterable, List  import discord  from discord.ext.commands import BadArgument, Context -from bot.pagination import LinePaginator +from bot.constants import Client, Month +from bot.utils.pagination import LinePaginator + + +def human_months(months: Iterable[Month]) -> str: +    """Build a comma separated list of `months`.""" +    return ", ".join(str(m) for m in months) + + +def resolve_current_month() -> Month: +    """ +    Determine current month w.r.t. `Client.month_override` env var. + +    If the env variable was set, current month always resolves to the configured value. +    Otherwise, the current UTC month is given. +    """ +    if Client.month_override is not None: +        return Month(Client.month_override) +    else: +        return Month(datetime.utcnow().month)  async def disambiguate( diff --git a/bot/utils/decorators.py b/bot/utils/decorators.py new file mode 100644 index 00000000..519e61a9 --- /dev/null +++ b/bot/utils/decorators.py @@ -0,0 +1,320 @@ +import asyncio +import functools +import logging +import random +import typing as t +from asyncio import Lock +from functools import wraps +from weakref import WeakValueDictionary + +from discord import Colour, Embed +from discord.ext import commands +from discord.ext.commands import CheckFailure, Command, Context + +from bot.constants import Client, ERROR_REPLIES, Month +from bot.utils import human_months, resolve_current_month + +ONE_DAY = 24 * 60 * 60 + +log = logging.getLogger(__name__) + + +class InChannelCheckFailure(CheckFailure): +    """Check failure when the user runs a command in a non-whitelisted channel.""" + +    pass + + +class InMonthCheckFailure(CheckFailure): +    """Check failure for when a command is invoked outside of its allowed month.""" + +    pass + + +def seasonal_task(*allowed_months: Month, sleep_time: t.Union[float, int] = ONE_DAY) -> t.Callable: +    """ +    Perform the decorated method periodically in `allowed_months`. + +    This provides a convenience wrapper to avoid code repetition where some task shall +    perform an operation repeatedly in a constant interval, but only in specific months. + +    The decorated function will be called once every `sleep_time` seconds while +    the current UTC month is in `allowed_months`. Sleep time defaults to 24 hours. + +    The wrapped task is responsible for waiting for the bot to be ready, if necessary. +    """ +    def decorator(task_body: t.Callable) -> t.Callable: +        @functools.wraps(task_body) +        async def decorated_task(*args, **kwargs) -> None: +            """Call `task_body` once every `sleep_time` seconds in `allowed_months`.""" +            log.info(f"Starting seasonal task {task_body.__qualname__} ({human_months(allowed_months)})") + +            while True: +                current_month = resolve_current_month() + +                if current_month in allowed_months: +                    await task_body(*args, **kwargs) +                else: +                    log.debug(f"Seasonal task {task_body.__qualname__} sleeps in {current_month!s}") + +                await asyncio.sleep(sleep_time) +        return decorated_task +    return decorator + + +def in_month_listener(*allowed_months: Month) -> t.Callable: +    """ +    Shield a listener from being invoked outside of `allowed_months`. + +    The check is performed against current UTC month. +    """ +    def decorator(listener: t.Callable) -> t.Callable: +        @functools.wraps(listener) +        async def guarded_listener(*args, **kwargs) -> None: +            """Wrapped listener will abort if not in allowed month.""" +            current_month = resolve_current_month() + +            if current_month in allowed_months: +                # Propagate return value although it should always be None +                return await listener(*args, **kwargs) +            else: +                log.debug(f"Guarded {listener.__qualname__} from invoking in {current_month!s}") +        return guarded_listener +    return decorator + + +def in_month_command(*allowed_months: Month) -> t.Callable: +    """ +    Check whether the command was invoked in one of `enabled_months`. + +    Uses the current UTC month at the time of running the predicate. +    """ +    async def predicate(ctx: Context) -> bool: +        current_month = resolve_current_month() +        can_run = current_month in allowed_months + +        log.debug( +            f"Command '{ctx.command}' is locked to months {human_months(allowed_months)}. " +            f"Invoking it in month {current_month!s} is {'allowed' if can_run else 'disallowed'}." +        ) +        if can_run: +            return True +        else: +            raise InMonthCheckFailure(f"Command can only be used in {human_months(allowed_months)}") + +    return commands.check(predicate) + + +def in_month(*allowed_months: Month) -> t.Callable: +    """ +    Universal decorator for season-locking commands and listeners alike. + +    This only serves to determine whether the decorated callable is a command, +    a listener, or neither. It then delegates to either `in_month_command`, +    or `in_month_listener`, or raises TypeError, respectively. + +    Please note that in order for this decorator to correctly determine whether +    the decorated callable is a cmd or listener, it **has** to first be turned +    into one. This means that this decorator should always be placed **above** +    the d.py one that registers it as either. + +    This will decorate groups as well, as those subclass Command. In order to lock +    all subcommands of a group, its `invoke_without_command` param must **not** be +    manually set to True - this causes a circumvention of the group's callback +    and the seasonal check applied to it. +    """ +    def decorator(callable_: t.Callable) -> t.Callable: +        # Functions decorated as commands are turned into instances of `Command` +        if isinstance(callable_, Command): +            logging.debug(f"Command {callable_.qualified_name} will be locked to {human_months(allowed_months)}") +            actual_deco = in_month_command(*allowed_months) + +        # D.py will assign this attribute when `callable_` is registered as a listener +        elif hasattr(callable_, "__cog_listener__"): +            logging.debug(f"Listener {callable_.__qualname__} will be locked to {human_months(allowed_months)}") +            actual_deco = in_month_listener(*allowed_months) + +        # Otherwise we're unsure exactly what has been decorated +        # This happens before the bot starts, so let's just raise +        else: +            raise TypeError(f"Decorated object {callable_} is neither a command nor a listener") + +        return actual_deco(callable_) +    return decorator + + +def with_role(*role_ids: int) -> t.Callable: +    """Check to see whether the invoking user has any of the roles specified in role_ids.""" +    async def predicate(ctx: Context) -> bool: +        if not ctx.guild:  # Return False in a DM +            log.debug( +                f"{ctx.author} tried to use the '{ctx.command.name}'command from a DM. " +                "This command is restricted by the with_role decorator. Rejecting request." +            ) +            return False + +        for role in ctx.author.roles: +            if role.id in role_ids: +                log.debug(f"{ctx.author} has the '{role.name}' role, and passes the check.") +                return True + +        log.debug( +            f"{ctx.author} does not have the required role to use " +            f"the '{ctx.command.name}' command, so the request is rejected." +        ) +        return False +    return commands.check(predicate) + + +def without_role(*role_ids: int) -> t.Callable: +    """Check whether the invoking user does not have all of the roles specified in role_ids.""" +    async def predicate(ctx: Context) -> bool: +        if not ctx.guild:  # Return False in a DM +            log.debug( +                f"{ctx.author} tried to use the '{ctx.command.name}' command from a DM. " +                "This command is restricted by the without_role decorator. Rejecting request." +            ) +            return False + +        author_roles = [role.id for role in ctx.author.roles] +        check = all(role not in author_roles for role in role_ids) +        log.debug( +            f"{ctx.author} tried to call the '{ctx.command.name}' command. " +            f"The result of the without_role check was {check}." +        ) +        return check +    return commands.check(predicate) + + +def in_channel_check(*channels: int, bypass_roles: t.Container[int] = None) -> t.Callable[[Context], bool]: +    """ +    Checks that the message is in a whitelisted channel or optionally has a bypass role. + +    If `in_channel_override` is present, check if it contains channels +    and use them in place of the global whitelist. +    """ +    def predicate(ctx: Context) -> bool: +        if not ctx.guild: +            log.debug(f"{ctx.author} tried to use the '{ctx.command.name}' command from a DM.") +            return True +        if ctx.channel.id in channels: +            log.debug( +                f"{ctx.author} tried to call the '{ctx.command.name}' command " +                f"and the command was used in a whitelisted channel." +            ) +            return True + +        if bypass_roles and any(r.id in bypass_roles for r in ctx.author.roles): +            log.debug( +                f"{ctx.author} called the '{ctx.command.name}' command and " +                f"had a role to bypass the in_channel check." +            ) +            return True + +        if hasattr(ctx.command.callback, "in_channel_override"): +            override = ctx.command.callback.in_channel_override +            if override is None: +                log.debug( +                    f"{ctx.author} called the '{ctx.command.name}' command " +                    f"and the command was whitelisted to bypass the in_channel check." +                ) +                return True +            else: +                if ctx.channel.id in override: +                    log.debug( +                        f"{ctx.author} tried to call the '{ctx.command.name}' command " +                        f"and the command was used in an overridden whitelisted channel." +                    ) +                    return True + +                log.debug( +                    f"{ctx.author} tried to call the '{ctx.command.name}' command. " +                    f"The overridden in_channel check failed." +                ) +                channels_str = ', '.join(f"<#{c_id}>" for c_id in override) +                raise InChannelCheckFailure( +                    f"Sorry, but you may only use this command within {channels_str}." +                ) + +        log.debug( +            f"{ctx.author} tried to call the '{ctx.command.name}' command. " +            f"The in_channel check failed." +        ) + +        channels_str = ', '.join(f"<#{c_id}>" for c_id in channels) +        raise InChannelCheckFailure( +            f"Sorry, but you may only use this command within {channels_str}." +        ) + +    return predicate + + +in_channel = commands.check(in_channel_check) + + +def override_in_channel(channels: t.Tuple[int] = None) -> t.Callable: +    """ +    Set command callback attribute for detection in `in_channel_check`. + +    Override global whitelist if channels are specified. + +    This decorator has to go before (below) below the `command` decorator. +    """ +    def inner(func: t.Callable) -> t.Callable: +        func.in_channel_override = channels +        return func + +    return inner + + +def locked() -> t.Union[t.Callable, None]: +    """ +    Allows the user to only run one instance of the decorated command at a time. + +    Subsequent calls to the command from the same author are ignored until the command has completed invocation. + +    This decorator has to go before (below) the `command` decorator. +    """ +    def wrap(func: t.Callable) -> t.Union[t.Callable, None]: +        func.__locks = WeakValueDictionary() + +        @wraps(func) +        async def inner(self: t.Callable, ctx: Context, *args, **kwargs) -> t.Union[t.Callable, None]: +            lock = func.__locks.setdefault(ctx.author.id, Lock()) +            if lock.locked(): +                embed = Embed() +                embed.colour = Colour.red() + +                log.debug(f"User tried to invoke a locked command.") +                embed.description = ( +                    "You're already using this command. Please wait until " +                    "it is done before you use it again." +                ) +                embed.title = random.choice(ERROR_REPLIES) +                await ctx.send(embed=embed) +                return + +            async with func.__locks.setdefault(ctx.author.id, Lock()): +                return await func(self, ctx, *args, **kwargs) +        return inner +    return wrap + + +def mock_in_debug(return_value: t.Any) -> t.Callable: +    """ +    Short-circuit function execution if in debug mode and return `return_value`. + +    The original function name, and the incoming args and kwargs are DEBUG level logged +    upon each call. This is useful for expensive operations, i.e. media asset uploads +    that are prone to rate-limits but need to be tested extensively. +    """ +    def decorator(func: t.Callable) -> t.Callable: +        @functools.wraps(func) +        async def wrapped(*args, **kwargs) -> t.Any: +            """Short-circuit and log if in debug mode.""" +            if Client.debug: +                log.debug(f"Function {func.__name__} called with args: {args}, kwargs: {kwargs}") +                return return_value +            return await func(*args, **kwargs) +        return wrapped +    return decorator diff --git a/bot/utils/exceptions.py b/bot/utils/exceptions.py new file mode 100644 index 00000000..70c20e12 --- /dev/null +++ b/bot/utils/exceptions.py @@ -0,0 +1,4 @@ +class BrandingError(Exception): +    """Exception raised by the BrandingManager cog.""" + +    pass diff --git a/bot/utils/pagination.py b/bot/utils/pagination.py new file mode 100644 index 00000000..9a7a0382 --- /dev/null +++ b/bot/utils/pagination.py @@ -0,0 +1,430 @@ +import asyncio +import logging +from typing import Iterable, List, Optional, Tuple + +from discord import Embed, Member, Reaction +from discord.abc import User +from discord.ext.commands import Context, Paginator + +from bot.constants import Emojis + +FIRST_EMOJI = "\u23EE"   # [:track_previous:] +LEFT_EMOJI = "\u2B05"    # [:arrow_left:] +RIGHT_EMOJI = "\u27A1"   # [:arrow_right:] +LAST_EMOJI = "\u23ED"    # [:track_next:] +DELETE_EMOJI = Emojis.trashcan  # [:trashcan:] + +PAGINATION_EMOJI = (FIRST_EMOJI, LEFT_EMOJI, RIGHT_EMOJI, LAST_EMOJI, DELETE_EMOJI) + +log = logging.getLogger(__name__) + + +class EmptyPaginatorEmbed(Exception): +    """Base Exception class for an empty paginator embed.""" + + +class LinePaginator(Paginator): +    """A class that aids in paginating code blocks for Discord messages.""" + +    def __init__(self, prefix: str = '```', suffix: str = '```', max_size: int = 2000, max_lines: int = None): +        """ +        Overrides the Paginator.__init__ from inside discord.ext.commands. + +        `prefix` and `suffix` will be prepended and appended respectively to every page. + +        `max_size` and `max_lines` denote the maximum amount of codepoints and lines +        allowed per page. +        """ +        self.prefix = prefix +        self.suffix = suffix +        self.max_size = max_size - len(suffix) +        self.max_lines = max_lines +        self._current_page = [prefix] +        self._linecount = 0 +        self._count = len(prefix) + 1  # prefix + newline +        self._pages = [] + +    def add_line(self, line: str = '', *, empty: bool = False) -> None: +        """ +        Adds a line to the current page. + +        If the line exceeds the `max_size` then a RuntimeError is raised. + +        Overrides the Paginator.add_line from inside discord.ext.commands in order to allow +        configuration of the maximum number of lines per page. + +        If `empty` is True, an empty line will be placed after the a given `line`. +        """ +        if len(line) > self.max_size - len(self.prefix) - 2: +            raise RuntimeError('Line exceeds maximum page size %s' % (self.max_size - len(self.prefix) - 2)) + +        if self.max_lines is not None: +            if self._linecount >= self.max_lines: +                self._linecount = 0 +                self.close_page() + +            self._linecount += 1 +        if self._count + len(line) + 1 > self.max_size: +            self.close_page() + +        self._count += len(line) + 1 +        self._current_page.append(line) + +        if empty: +            self._current_page.append('') +            self._count += 1 + +    @classmethod +    async def paginate(cls, lines: Iterable[str], ctx: Context, embed: Embed, +                       prefix: str = "", suffix: str = "", max_lines: Optional[int] = None, +                       max_size: int = 500, empty: bool = True, restrict_to_user: User = None, +                       timeout: int = 300, footer_text: str = None, url: str = None, +                       exception_on_empty_embed: bool = False): +        """ +        Use a paginator and set of reactions to provide pagination over a set of lines. + +        The reactions are used to switch page, or to finish with pagination. + +        When used, this will send a message using `ctx.send()` and apply a set of reactions to it. +        These reactions may be used to change page, or to remove pagination from the message. + +        Pagination will also be removed automatically if no reaction is added for `timeout` seconds, +        defaulting to five minutes (300 seconds). + +        If `empty` is True, an empty line will be placed between each given line. + +        >>> embed = Embed() +        >>> embed.set_author(name="Some Operation", url=url, icon_url=icon) +        >>> await LinePaginator.paginate( +        ...     (line for line in lines), +        ...     ctx, embed +        ... ) +        """ +        def event_check(reaction_: Reaction, user_: Member) -> bool: +            """Make sure that this reaction is what we want to operate on.""" +            no_restrictions = ( +                # Pagination is not restricted +                not restrict_to_user +                # The reaction was by a whitelisted user +                or user_.id == restrict_to_user.id +            ) + +            return ( +                # Conditions for a successful pagination: +                all(( +                    # Reaction is on this message +                    reaction_.message.id == message.id, +                    # Reaction is one of the pagination emotes +                    str(reaction_.emoji) in PAGINATION_EMOJI,  # Note: DELETE_EMOJI is a string and not unicode +                    # Reaction was not made by the Bot +                    user_.id != ctx.bot.user.id, +                    # There were no restrictions +                    no_restrictions +                )) +            ) + +        paginator = cls(prefix=prefix, suffix=suffix, max_size=max_size, max_lines=max_lines) +        current_page = 0 + +        if not lines: +            if exception_on_empty_embed: +                log.exception(f"Pagination asked for empty lines iterable") +                raise EmptyPaginatorEmbed("No lines to paginate") + +            log.debug("No lines to add to paginator, adding '(nothing to display)' message") +            lines.append("(nothing to display)") + +        for line in lines: +            try: +                paginator.add_line(line, empty=empty) +            except Exception: +                log.exception(f"Failed to add line to paginator: '{line}'") +                raise  # Should propagate +            else: +                log.trace(f"Added line to paginator: '{line}'") + +        log.debug(f"Paginator created with {len(paginator.pages)} pages") + +        embed.description = paginator.pages[current_page] + +        if len(paginator.pages) <= 1: +            if footer_text: +                embed.set_footer(text=footer_text) +                log.trace(f"Setting embed footer to '{footer_text}'") + +            if url: +                embed.url = url +                log.trace(f"Setting embed url to '{url}'") + +            log.debug("There's less than two pages, so we won't paginate - sending single page on its own") +            return await ctx.send(embed=embed) +        else: +            if footer_text: +                embed.set_footer(text=f"{footer_text} (Page {current_page + 1}/{len(paginator.pages)})") +            else: +                embed.set_footer(text=f"Page {current_page + 1}/{len(paginator.pages)}") +            log.trace(f"Setting embed footer to '{embed.footer.text}'") + +            if url: +                embed.url = url +                log.trace(f"Setting embed url to '{url}'") + +            log.debug("Sending first page to channel...") +            message = await ctx.send(embed=embed) + +        log.debug("Adding emoji reactions to message...") + +        for emoji in PAGINATION_EMOJI: +            # Add all the applicable emoji to the message +            log.trace(f"Adding reaction: {repr(emoji)}") +            await message.add_reaction(emoji) + +        while True: +            try: +                reaction, user = await ctx.bot.wait_for("reaction_add", timeout=timeout, check=event_check) +                log.trace(f"Got reaction: {reaction}") +            except asyncio.TimeoutError: +                log.debug("Timed out waiting for a reaction") +                break  # We're done, no reactions for the last 5 minutes + +            if str(reaction.emoji) == DELETE_EMOJI:  # Note: DELETE_EMOJI is a string and not unicode +                log.debug("Got delete reaction") +                return await message.delete() + +            if reaction.emoji == FIRST_EMOJI: +                await message.remove_reaction(reaction.emoji, user) +                current_page = 0 + +                log.debug(f"Got first page reaction - changing to page 1/{len(paginator.pages)}") + +                embed.description = "" +                await message.edit(embed=embed) +                embed.description = paginator.pages[current_page] +                if footer_text: +                    embed.set_footer(text=f"{footer_text} (Page {current_page + 1}/{len(paginator.pages)})") +                else: +                    embed.set_footer(text=f"Page {current_page + 1}/{len(paginator.pages)}") +                await message.edit(embed=embed) + +            if reaction.emoji == LAST_EMOJI: +                await message.remove_reaction(reaction.emoji, user) +                current_page = len(paginator.pages) - 1 + +                log.debug(f"Got last page reaction - changing to page {current_page + 1}/{len(paginator.pages)}") + +                embed.description = "" +                await message.edit(embed=embed) +                embed.description = paginator.pages[current_page] +                if footer_text: +                    embed.set_footer(text=f"{footer_text} (Page {current_page + 1}/{len(paginator.pages)})") +                else: +                    embed.set_footer(text=f"Page {current_page + 1}/{len(paginator.pages)}") +                await message.edit(embed=embed) + +            if reaction.emoji == LEFT_EMOJI: +                await message.remove_reaction(reaction.emoji, user) + +                if current_page <= 0: +                    log.debug("Got previous page reaction, but we're on the first page - ignoring") +                    continue + +                current_page -= 1 +                log.debug(f"Got previous page reaction - changing to page {current_page + 1}/{len(paginator.pages)}") + +                embed.description = "" +                await message.edit(embed=embed) +                embed.description = paginator.pages[current_page] + +                if footer_text: +                    embed.set_footer(text=f"{footer_text} (Page {current_page + 1}/{len(paginator.pages)})") +                else: +                    embed.set_footer(text=f"Page {current_page + 1}/{len(paginator.pages)}") + +                await message.edit(embed=embed) + +            if reaction.emoji == RIGHT_EMOJI: +                await message.remove_reaction(reaction.emoji, user) + +                if current_page >= len(paginator.pages) - 1: +                    log.debug("Got next page reaction, but we're on the last page - ignoring") +                    continue + +                current_page += 1 +                log.debug(f"Got next page reaction - changing to page {current_page + 1}/{len(paginator.pages)}") + +                embed.description = "" +                await message.edit(embed=embed) +                embed.description = paginator.pages[current_page] + +                if footer_text: +                    embed.set_footer(text=f"{footer_text} (Page {current_page + 1}/{len(paginator.pages)})") +                else: +                    embed.set_footer(text=f"Page {current_page + 1}/{len(paginator.pages)}") + +                await message.edit(embed=embed) + +        log.debug("Ending pagination and clearing reactions...") +        await message.clear_reactions() + + +class ImagePaginator(Paginator): +    """ +    Helper class that paginates images for embeds in messages. + +    Close resemblance to LinePaginator, except focuses on images over text. + +    Refer to ImagePaginator.paginate for documentation on how to use. +    """ + +    def __init__(self, prefix: str = "", suffix: str = ""): +        super().__init__(prefix, suffix) +        self._current_page = [prefix] +        self.images = [] +        self._pages = [] + +    def add_line(self, line: str = '', *, empty: bool = False) -> None: +        """ +        Adds a line to each page, usually just 1 line in this context. + +        If `empty` is True, an empty line will be placed after a given `line`. +        """ +        if line: +            self._count = len(line) +        else: +            self._count = 0 +        self._current_page.append(line) +        self.close_page() + +    def add_image(self, image: str = None) -> None: +        """Adds an image to a page given the url.""" +        self.images.append(image) + +    @classmethod +    async def paginate(cls, pages: List[Tuple[str, str]], ctx: Context, embed: Embed, +                       prefix: str = "", suffix: str = "", timeout: int = 300, +                       exception_on_empty_embed: bool = False): +        """ +        Use a paginator and set of reactions to provide pagination over a set of title/image pairs. + +        `pages` is a list of tuples of page title/image url pairs. +        `prefix` and `suffix` will be prepended and appended respectively to the message. + +        When used, this will send a message using `ctx.send()` and apply a set of reactions to it. +        These reactions may be used to change page, or to remove pagination from the message. + +        Note: Pagination will be removed automatically if no reaction is added for `timeout` seconds, +              defaulting to five minutes (300 seconds). + +        >>> embed = Embed() +        >>> embed.set_author(name="Some Operation", url=url, icon_url=icon) +        >>> await ImagePaginator.paginate(pages, ctx, embed) +        """ +        def check_event(reaction_: Reaction, member: Member) -> bool: +            """Checks each reaction added, if it matches our conditions pass the wait_for.""" +            return all(( +                # Reaction is on the same message sent +                reaction_.message.id == message.id, +                # The reaction is part of the navigation menu +                str(reaction_.emoji) in PAGINATION_EMOJI,  # Note: DELETE_EMOJI is a string and not unicode +                # The reactor is not a bot +                not member.bot +            )) + +        paginator = cls(prefix=prefix, suffix=suffix) +        current_page = 0 + +        if not pages: +            if exception_on_empty_embed: +                log.exception(f"Pagination asked for empty image list") +                raise EmptyPaginatorEmbed("No images to paginate") + +            log.debug("No images to add to paginator, adding '(no images to display)' message") +            pages.append(("(no images to display)", "")) + +        for text, image_url in pages: +            paginator.add_line(text) +            paginator.add_image(image_url) + +        embed.description = paginator.pages[current_page] +        image = paginator.images[current_page] + +        if image: +            embed.set_image(url=image) + +        if len(paginator.pages) <= 1: +            return await ctx.send(embed=embed) + +        embed.set_footer(text=f"Page {current_page + 1}/{len(paginator.pages)}") +        message = await ctx.send(embed=embed) + +        for emoji in PAGINATION_EMOJI: +            await message.add_reaction(emoji) + +        while True: +            # Start waiting for reactions +            try: +                reaction, user = await ctx.bot.wait_for("reaction_add", timeout=timeout, check=check_event) +            except asyncio.TimeoutError: +                log.debug("Timed out waiting for a reaction") +                break  # We're done, no reactions for the last 5 minutes + +            # Deletes the users reaction +            await message.remove_reaction(reaction.emoji, user) + +            # Delete reaction press - [:trashcan:] +            if str(reaction.emoji) == DELETE_EMOJI:  # Note: DELETE_EMOJI is a string and not unicode +                log.debug("Got delete reaction") +                return await message.delete() + +            # First reaction press - [:track_previous:] +            if reaction.emoji == FIRST_EMOJI: +                if current_page == 0: +                    log.debug("Got first page reaction, but we're on the first page - ignoring") +                    continue + +                current_page = 0 +                reaction_type = "first" + +            # Last reaction press - [:track_next:] +            if reaction.emoji == LAST_EMOJI: +                if current_page >= len(paginator.pages) - 1: +                    log.debug("Got last page reaction, but we're on the last page - ignoring") +                    continue + +                current_page = len(paginator.pages) - 1 +                reaction_type = "last" + +            # Previous reaction press - [:arrow_left: ] +            if reaction.emoji == LEFT_EMOJI: +                if current_page <= 0: +                    log.debug("Got previous page reaction, but we're on the first page - ignoring") +                    continue + +                current_page -= 1 +                reaction_type = "previous" + +            # Next reaction press - [:arrow_right:] +            if reaction.emoji == RIGHT_EMOJI: +                if current_page >= len(paginator.pages) - 1: +                    log.debug("Got next page reaction, but we're on the last page - ignoring") +                    continue + +                current_page += 1 +                reaction_type = "next" + +            # Magic happens here, after page and reaction_type is set +            embed.description = "" +            await message.edit(embed=embed) +            embed.description = paginator.pages[current_page] + +            image = paginator.images[current_page] +            if image: +                embed.set_image(url=image) + +            embed.set_footer(text=f"Page {current_page + 1}/{len(paginator.pages)}") +            log.debug(f"Got {reaction_type} page reaction - changing to page {current_page + 1}/{len(paginator.pages)}") + +            await message.edit(embed=embed) + +        log.debug("Ending pagination and clearing reactions...") +        await message.clear_reactions() diff --git a/bot/utils/persist.py b/bot/utils/persist.py index a60a1219..d78e5420 100644 --- a/bot/utils/persist.py +++ b/bot/utils/persist.py @@ -2,7 +2,7 @@ import sqlite3  from pathlib import Path  from shutil import copyfile -from bot.seasons.season import get_seasons +from bot.exts import get_package_names  DIRECTORY = Path("data")  # directory that has a persistent volume mapped to it @@ -41,7 +41,7 @@ def make_persistent(file_path: Path) -> Path:          raise OSError(f"File not found at {file_path}.")      # detect season in datafile path for assigning to subdirectory -    season = next((s for s in get_seasons() if s in file_path.parts), None) +    season = next((s for s in get_package_names() if s in file_path.parts), None)      if season:          # make sure subdirectory exists first  |