aboutsummaryrefslogtreecommitdiffstats
path: root/bot/utils
diff options
context:
space:
mode:
authorGravatar Eivind Teig <[email protected]>2020-04-02 14:39:24 +0200
committerGravatar GitHub <[email protected]>2020-04-02 14:39:24 +0200
commitd77a2bbc50305d05371197f4cfe3354cfca4c627 (patch)
treebe1eed54972d9843f66114311f93b68b579046ac /bot/utils
parentMerge pull request #382 from ks129/game-fuzzy (diff)
parentMerge master: adjust `Space` cog location (diff)
Merge pull request #329 from python-discord/seasonal-purge
Deseasonify: Make all cogs available year-round, and manage only branding by season.
Diffstat (limited to 'bot/utils')
-rw-r--r--bot/utils/__init__.py24
-rw-r--r--bot/utils/decorators.py320
-rw-r--r--bot/utils/exceptions.py4
-rw-r--r--bot/utils/pagination.py430
-rw-r--r--bot/utils/persist.py4
5 files changed, 778 insertions, 4 deletions
diff --git a/bot/utils/__init__.py b/bot/utils/__init__.py
index 25fd4b96..35ef0a7b 100644
--- a/bot/utils/__init__.py
+++ b/bot/utils/__init__.py
@@ -2,12 +2,32 @@ import asyncio
import contextlib
import re
import string
-from typing import List
+from datetime import datetime
+from typing import Iterable, List
import discord
from discord.ext.commands import BadArgument, Context
-from bot.pagination import LinePaginator
+from bot.constants import Client, Month
+from bot.utils.pagination import LinePaginator
+
+
+def human_months(months: Iterable[Month]) -> str:
+ """Build a comma separated list of `months`."""
+ return ", ".join(str(m) for m in months)
+
+
+def resolve_current_month() -> Month:
+ """
+ Determine current month w.r.t. `Client.month_override` env var.
+
+ If the env variable was set, current month always resolves to the configured value.
+ Otherwise, the current UTC month is given.
+ """
+ if Client.month_override is not None:
+ return Month(Client.month_override)
+ else:
+ return Month(datetime.utcnow().month)
async def disambiguate(
diff --git a/bot/utils/decorators.py b/bot/utils/decorators.py
new file mode 100644
index 00000000..519e61a9
--- /dev/null
+++ b/bot/utils/decorators.py
@@ -0,0 +1,320 @@
+import asyncio
+import functools
+import logging
+import random
+import typing as t
+from asyncio import Lock
+from functools import wraps
+from weakref import WeakValueDictionary
+
+from discord import Colour, Embed
+from discord.ext import commands
+from discord.ext.commands import CheckFailure, Command, Context
+
+from bot.constants import Client, ERROR_REPLIES, Month
+from bot.utils import human_months, resolve_current_month
+
+ONE_DAY = 24 * 60 * 60
+
+log = logging.getLogger(__name__)
+
+
+class InChannelCheckFailure(CheckFailure):
+ """Check failure when the user runs a command in a non-whitelisted channel."""
+
+ pass
+
+
+class InMonthCheckFailure(CheckFailure):
+ """Check failure for when a command is invoked outside of its allowed month."""
+
+ pass
+
+
+def seasonal_task(*allowed_months: Month, sleep_time: t.Union[float, int] = ONE_DAY) -> t.Callable:
+ """
+ Perform the decorated method periodically in `allowed_months`.
+
+ This provides a convenience wrapper to avoid code repetition where some task shall
+ perform an operation repeatedly in a constant interval, but only in specific months.
+
+ The decorated function will be called once every `sleep_time` seconds while
+ the current UTC month is in `allowed_months`. Sleep time defaults to 24 hours.
+
+ The wrapped task is responsible for waiting for the bot to be ready, if necessary.
+ """
+ def decorator(task_body: t.Callable) -> t.Callable:
+ @functools.wraps(task_body)
+ async def decorated_task(*args, **kwargs) -> None:
+ """Call `task_body` once every `sleep_time` seconds in `allowed_months`."""
+ log.info(f"Starting seasonal task {task_body.__qualname__} ({human_months(allowed_months)})")
+
+ while True:
+ current_month = resolve_current_month()
+
+ if current_month in allowed_months:
+ await task_body(*args, **kwargs)
+ else:
+ log.debug(f"Seasonal task {task_body.__qualname__} sleeps in {current_month!s}")
+
+ await asyncio.sleep(sleep_time)
+ return decorated_task
+ return decorator
+
+
+def in_month_listener(*allowed_months: Month) -> t.Callable:
+ """
+ Shield a listener from being invoked outside of `allowed_months`.
+
+ The check is performed against current UTC month.
+ """
+ def decorator(listener: t.Callable) -> t.Callable:
+ @functools.wraps(listener)
+ async def guarded_listener(*args, **kwargs) -> None:
+ """Wrapped listener will abort if not in allowed month."""
+ current_month = resolve_current_month()
+
+ if current_month in allowed_months:
+ # Propagate return value although it should always be None
+ return await listener(*args, **kwargs)
+ else:
+ log.debug(f"Guarded {listener.__qualname__} from invoking in {current_month!s}")
+ return guarded_listener
+ return decorator
+
+
+def in_month_command(*allowed_months: Month) -> t.Callable:
+ """
+ Check whether the command was invoked in one of `enabled_months`.
+
+ Uses the current UTC month at the time of running the predicate.
+ """
+ async def predicate(ctx: Context) -> bool:
+ current_month = resolve_current_month()
+ can_run = current_month in allowed_months
+
+ log.debug(
+ f"Command '{ctx.command}' is locked to months {human_months(allowed_months)}. "
+ f"Invoking it in month {current_month!s} is {'allowed' if can_run else 'disallowed'}."
+ )
+ if can_run:
+ return True
+ else:
+ raise InMonthCheckFailure(f"Command can only be used in {human_months(allowed_months)}")
+
+ return commands.check(predicate)
+
+
+def in_month(*allowed_months: Month) -> t.Callable:
+ """
+ Universal decorator for season-locking commands and listeners alike.
+
+ This only serves to determine whether the decorated callable is a command,
+ a listener, or neither. It then delegates to either `in_month_command`,
+ or `in_month_listener`, or raises TypeError, respectively.
+
+ Please note that in order for this decorator to correctly determine whether
+ the decorated callable is a cmd or listener, it **has** to first be turned
+ into one. This means that this decorator should always be placed **above**
+ the d.py one that registers it as either.
+
+ This will decorate groups as well, as those subclass Command. In order to lock
+ all subcommands of a group, its `invoke_without_command` param must **not** be
+ manually set to True - this causes a circumvention of the group's callback
+ and the seasonal check applied to it.
+ """
+ def decorator(callable_: t.Callable) -> t.Callable:
+ # Functions decorated as commands are turned into instances of `Command`
+ if isinstance(callable_, Command):
+ logging.debug(f"Command {callable_.qualified_name} will be locked to {human_months(allowed_months)}")
+ actual_deco = in_month_command(*allowed_months)
+
+ # D.py will assign this attribute when `callable_` is registered as a listener
+ elif hasattr(callable_, "__cog_listener__"):
+ logging.debug(f"Listener {callable_.__qualname__} will be locked to {human_months(allowed_months)}")
+ actual_deco = in_month_listener(*allowed_months)
+
+ # Otherwise we're unsure exactly what has been decorated
+ # This happens before the bot starts, so let's just raise
+ else:
+ raise TypeError(f"Decorated object {callable_} is neither a command nor a listener")
+
+ return actual_deco(callable_)
+ return decorator
+
+
+def with_role(*role_ids: int) -> t.Callable:
+ """Check to see whether the invoking user has any of the roles specified in role_ids."""
+ async def predicate(ctx: Context) -> bool:
+ if not ctx.guild: # Return False in a DM
+ log.debug(
+ f"{ctx.author} tried to use the '{ctx.command.name}'command from a DM. "
+ "This command is restricted by the with_role decorator. Rejecting request."
+ )
+ return False
+
+ for role in ctx.author.roles:
+ if role.id in role_ids:
+ log.debug(f"{ctx.author} has the '{role.name}' role, and passes the check.")
+ return True
+
+ log.debug(
+ f"{ctx.author} does not have the required role to use "
+ f"the '{ctx.command.name}' command, so the request is rejected."
+ )
+ return False
+ return commands.check(predicate)
+
+
+def without_role(*role_ids: int) -> t.Callable:
+ """Check whether the invoking user does not have all of the roles specified in role_ids."""
+ async def predicate(ctx: Context) -> bool:
+ if not ctx.guild: # Return False in a DM
+ log.debug(
+ f"{ctx.author} tried to use the '{ctx.command.name}' command from a DM. "
+ "This command is restricted by the without_role decorator. Rejecting request."
+ )
+ return False
+
+ author_roles = [role.id for role in ctx.author.roles]
+ check = all(role not in author_roles for role in role_ids)
+ log.debug(
+ f"{ctx.author} tried to call the '{ctx.command.name}' command. "
+ f"The result of the without_role check was {check}."
+ )
+ return check
+ return commands.check(predicate)
+
+
+def in_channel_check(*channels: int, bypass_roles: t.Container[int] = None) -> t.Callable[[Context], bool]:
+ """
+ Checks that the message is in a whitelisted channel or optionally has a bypass role.
+
+ If `in_channel_override` is present, check if it contains channels
+ and use them in place of the global whitelist.
+ """
+ def predicate(ctx: Context) -> bool:
+ if not ctx.guild:
+ log.debug(f"{ctx.author} tried to use the '{ctx.command.name}' command from a DM.")
+ return True
+ if ctx.channel.id in channels:
+ log.debug(
+ f"{ctx.author} tried to call the '{ctx.command.name}' command "
+ f"and the command was used in a whitelisted channel."
+ )
+ return True
+
+ if bypass_roles and any(r.id in bypass_roles for r in ctx.author.roles):
+ log.debug(
+ f"{ctx.author} called the '{ctx.command.name}' command and "
+ f"had a role to bypass the in_channel check."
+ )
+ return True
+
+ if hasattr(ctx.command.callback, "in_channel_override"):
+ override = ctx.command.callback.in_channel_override
+ if override is None:
+ log.debug(
+ f"{ctx.author} called the '{ctx.command.name}' command "
+ f"and the command was whitelisted to bypass the in_channel check."
+ )
+ return True
+ else:
+ if ctx.channel.id in override:
+ log.debug(
+ f"{ctx.author} tried to call the '{ctx.command.name}' command "
+ f"and the command was used in an overridden whitelisted channel."
+ )
+ return True
+
+ log.debug(
+ f"{ctx.author} tried to call the '{ctx.command.name}' command. "
+ f"The overridden in_channel check failed."
+ )
+ channels_str = ', '.join(f"<#{c_id}>" for c_id in override)
+ raise InChannelCheckFailure(
+ f"Sorry, but you may only use this command within {channels_str}."
+ )
+
+ log.debug(
+ f"{ctx.author} tried to call the '{ctx.command.name}' command. "
+ f"The in_channel check failed."
+ )
+
+ channels_str = ', '.join(f"<#{c_id}>" for c_id in channels)
+ raise InChannelCheckFailure(
+ f"Sorry, but you may only use this command within {channels_str}."
+ )
+
+ return predicate
+
+
+in_channel = commands.check(in_channel_check)
+
+
+def override_in_channel(channels: t.Tuple[int] = None) -> t.Callable:
+ """
+ Set command callback attribute for detection in `in_channel_check`.
+
+ Override global whitelist if channels are specified.
+
+ This decorator has to go before (below) below the `command` decorator.
+ """
+ def inner(func: t.Callable) -> t.Callable:
+ func.in_channel_override = channels
+ return func
+
+ return inner
+
+
+def locked() -> t.Union[t.Callable, None]:
+ """
+ Allows the user to only run one instance of the decorated command at a time.
+
+ Subsequent calls to the command from the same author are ignored until the command has completed invocation.
+
+ This decorator has to go before (below) the `command` decorator.
+ """
+ def wrap(func: t.Callable) -> t.Union[t.Callable, None]:
+ func.__locks = WeakValueDictionary()
+
+ @wraps(func)
+ async def inner(self: t.Callable, ctx: Context, *args, **kwargs) -> t.Union[t.Callable, None]:
+ lock = func.__locks.setdefault(ctx.author.id, Lock())
+ if lock.locked():
+ embed = Embed()
+ embed.colour = Colour.red()
+
+ log.debug(f"User tried to invoke a locked command.")
+ embed.description = (
+ "You're already using this command. Please wait until "
+ "it is done before you use it again."
+ )
+ embed.title = random.choice(ERROR_REPLIES)
+ await ctx.send(embed=embed)
+ return
+
+ async with func.__locks.setdefault(ctx.author.id, Lock()):
+ return await func(self, ctx, *args, **kwargs)
+ return inner
+ return wrap
+
+
+def mock_in_debug(return_value: t.Any) -> t.Callable:
+ """
+ Short-circuit function execution if in debug mode and return `return_value`.
+
+ The original function name, and the incoming args and kwargs are DEBUG level logged
+ upon each call. This is useful for expensive operations, i.e. media asset uploads
+ that are prone to rate-limits but need to be tested extensively.
+ """
+ def decorator(func: t.Callable) -> t.Callable:
+ @functools.wraps(func)
+ async def wrapped(*args, **kwargs) -> t.Any:
+ """Short-circuit and log if in debug mode."""
+ if Client.debug:
+ log.debug(f"Function {func.__name__} called with args: {args}, kwargs: {kwargs}")
+ return return_value
+ return await func(*args, **kwargs)
+ return wrapped
+ return decorator
diff --git a/bot/utils/exceptions.py b/bot/utils/exceptions.py
new file mode 100644
index 00000000..70c20e12
--- /dev/null
+++ b/bot/utils/exceptions.py
@@ -0,0 +1,4 @@
+class BrandingError(Exception):
+ """Exception raised by the BrandingManager cog."""
+
+ pass
diff --git a/bot/utils/pagination.py b/bot/utils/pagination.py
new file mode 100644
index 00000000..9a7a0382
--- /dev/null
+++ b/bot/utils/pagination.py
@@ -0,0 +1,430 @@
+import asyncio
+import logging
+from typing import Iterable, List, Optional, Tuple
+
+from discord import Embed, Member, Reaction
+from discord.abc import User
+from discord.ext.commands import Context, Paginator
+
+from bot.constants import Emojis
+
+FIRST_EMOJI = "\u23EE" # [:track_previous:]
+LEFT_EMOJI = "\u2B05" # [:arrow_left:]
+RIGHT_EMOJI = "\u27A1" # [:arrow_right:]
+LAST_EMOJI = "\u23ED" # [:track_next:]
+DELETE_EMOJI = Emojis.trashcan # [:trashcan:]
+
+PAGINATION_EMOJI = (FIRST_EMOJI, LEFT_EMOJI, RIGHT_EMOJI, LAST_EMOJI, DELETE_EMOJI)
+
+log = logging.getLogger(__name__)
+
+
+class EmptyPaginatorEmbed(Exception):
+ """Base Exception class for an empty paginator embed."""
+
+
+class LinePaginator(Paginator):
+ """A class that aids in paginating code blocks for Discord messages."""
+
+ def __init__(self, prefix: str = '```', suffix: str = '```', max_size: int = 2000, max_lines: int = None):
+ """
+ Overrides the Paginator.__init__ from inside discord.ext.commands.
+
+ `prefix` and `suffix` will be prepended and appended respectively to every page.
+
+ `max_size` and `max_lines` denote the maximum amount of codepoints and lines
+ allowed per page.
+ """
+ self.prefix = prefix
+ self.suffix = suffix
+ self.max_size = max_size - len(suffix)
+ self.max_lines = max_lines
+ self._current_page = [prefix]
+ self._linecount = 0
+ self._count = len(prefix) + 1 # prefix + newline
+ self._pages = []
+
+ def add_line(self, line: str = '', *, empty: bool = False) -> None:
+ """
+ Adds a line to the current page.
+
+ If the line exceeds the `max_size` then a RuntimeError is raised.
+
+ Overrides the Paginator.add_line from inside discord.ext.commands in order to allow
+ configuration of the maximum number of lines per page.
+
+ If `empty` is True, an empty line will be placed after the a given `line`.
+ """
+ if len(line) > self.max_size - len(self.prefix) - 2:
+ raise RuntimeError('Line exceeds maximum page size %s' % (self.max_size - len(self.prefix) - 2))
+
+ if self.max_lines is not None:
+ if self._linecount >= self.max_lines:
+ self._linecount = 0
+ self.close_page()
+
+ self._linecount += 1
+ if self._count + len(line) + 1 > self.max_size:
+ self.close_page()
+
+ self._count += len(line) + 1
+ self._current_page.append(line)
+
+ if empty:
+ self._current_page.append('')
+ self._count += 1
+
+ @classmethod
+ async def paginate(cls, lines: Iterable[str], ctx: Context, embed: Embed,
+ prefix: str = "", suffix: str = "", max_lines: Optional[int] = None,
+ max_size: int = 500, empty: bool = True, restrict_to_user: User = None,
+ timeout: int = 300, footer_text: str = None, url: str = None,
+ exception_on_empty_embed: bool = False):
+ """
+ Use a paginator and set of reactions to provide pagination over a set of lines.
+
+ The reactions are used to switch page, or to finish with pagination.
+
+ When used, this will send a message using `ctx.send()` and apply a set of reactions to it.
+ These reactions may be used to change page, or to remove pagination from the message.
+
+ Pagination will also be removed automatically if no reaction is added for `timeout` seconds,
+ defaulting to five minutes (300 seconds).
+
+ If `empty` is True, an empty line will be placed between each given line.
+
+ >>> embed = Embed()
+ >>> embed.set_author(name="Some Operation", url=url, icon_url=icon)
+ >>> await LinePaginator.paginate(
+ ... (line for line in lines),
+ ... ctx, embed
+ ... )
+ """
+ def event_check(reaction_: Reaction, user_: Member) -> bool:
+ """Make sure that this reaction is what we want to operate on."""
+ no_restrictions = (
+ # Pagination is not restricted
+ not restrict_to_user
+ # The reaction was by a whitelisted user
+ or user_.id == restrict_to_user.id
+ )
+
+ return (
+ # Conditions for a successful pagination:
+ all((
+ # Reaction is on this message
+ reaction_.message.id == message.id,
+ # Reaction is one of the pagination emotes
+ str(reaction_.emoji) in PAGINATION_EMOJI, # Note: DELETE_EMOJI is a string and not unicode
+ # Reaction was not made by the Bot
+ user_.id != ctx.bot.user.id,
+ # There were no restrictions
+ no_restrictions
+ ))
+ )
+
+ paginator = cls(prefix=prefix, suffix=suffix, max_size=max_size, max_lines=max_lines)
+ current_page = 0
+
+ if not lines:
+ if exception_on_empty_embed:
+ log.exception(f"Pagination asked for empty lines iterable")
+ raise EmptyPaginatorEmbed("No lines to paginate")
+
+ log.debug("No lines to add to paginator, adding '(nothing to display)' message")
+ lines.append("(nothing to display)")
+
+ for line in lines:
+ try:
+ paginator.add_line(line, empty=empty)
+ except Exception:
+ log.exception(f"Failed to add line to paginator: '{line}'")
+ raise # Should propagate
+ else:
+ log.trace(f"Added line to paginator: '{line}'")
+
+ log.debug(f"Paginator created with {len(paginator.pages)} pages")
+
+ embed.description = paginator.pages[current_page]
+
+ if len(paginator.pages) <= 1:
+ if footer_text:
+ embed.set_footer(text=footer_text)
+ log.trace(f"Setting embed footer to '{footer_text}'")
+
+ if url:
+ embed.url = url
+ log.trace(f"Setting embed url to '{url}'")
+
+ log.debug("There's less than two pages, so we won't paginate - sending single page on its own")
+ return await ctx.send(embed=embed)
+ else:
+ if footer_text:
+ embed.set_footer(text=f"{footer_text} (Page {current_page + 1}/{len(paginator.pages)})")
+ else:
+ embed.set_footer(text=f"Page {current_page + 1}/{len(paginator.pages)}")
+ log.trace(f"Setting embed footer to '{embed.footer.text}'")
+
+ if url:
+ embed.url = url
+ log.trace(f"Setting embed url to '{url}'")
+
+ log.debug("Sending first page to channel...")
+ message = await ctx.send(embed=embed)
+
+ log.debug("Adding emoji reactions to message...")
+
+ for emoji in PAGINATION_EMOJI:
+ # Add all the applicable emoji to the message
+ log.trace(f"Adding reaction: {repr(emoji)}")
+ await message.add_reaction(emoji)
+
+ while True:
+ try:
+ reaction, user = await ctx.bot.wait_for("reaction_add", timeout=timeout, check=event_check)
+ log.trace(f"Got reaction: {reaction}")
+ except asyncio.TimeoutError:
+ log.debug("Timed out waiting for a reaction")
+ break # We're done, no reactions for the last 5 minutes
+
+ if str(reaction.emoji) == DELETE_EMOJI: # Note: DELETE_EMOJI is a string and not unicode
+ log.debug("Got delete reaction")
+ return await message.delete()
+
+ if reaction.emoji == FIRST_EMOJI:
+ await message.remove_reaction(reaction.emoji, user)
+ current_page = 0
+
+ log.debug(f"Got first page reaction - changing to page 1/{len(paginator.pages)}")
+
+ embed.description = ""
+ await message.edit(embed=embed)
+ embed.description = paginator.pages[current_page]
+ if footer_text:
+ embed.set_footer(text=f"{footer_text} (Page {current_page + 1}/{len(paginator.pages)})")
+ else:
+ embed.set_footer(text=f"Page {current_page + 1}/{len(paginator.pages)}")
+ await message.edit(embed=embed)
+
+ if reaction.emoji == LAST_EMOJI:
+ await message.remove_reaction(reaction.emoji, user)
+ current_page = len(paginator.pages) - 1
+
+ log.debug(f"Got last page reaction - changing to page {current_page + 1}/{len(paginator.pages)}")
+
+ embed.description = ""
+ await message.edit(embed=embed)
+ embed.description = paginator.pages[current_page]
+ if footer_text:
+ embed.set_footer(text=f"{footer_text} (Page {current_page + 1}/{len(paginator.pages)})")
+ else:
+ embed.set_footer(text=f"Page {current_page + 1}/{len(paginator.pages)}")
+ await message.edit(embed=embed)
+
+ if reaction.emoji == LEFT_EMOJI:
+ await message.remove_reaction(reaction.emoji, user)
+
+ if current_page <= 0:
+ log.debug("Got previous page reaction, but we're on the first page - ignoring")
+ continue
+
+ current_page -= 1
+ log.debug(f"Got previous page reaction - changing to page {current_page + 1}/{len(paginator.pages)}")
+
+ embed.description = ""
+ await message.edit(embed=embed)
+ embed.description = paginator.pages[current_page]
+
+ if footer_text:
+ embed.set_footer(text=f"{footer_text} (Page {current_page + 1}/{len(paginator.pages)})")
+ else:
+ embed.set_footer(text=f"Page {current_page + 1}/{len(paginator.pages)}")
+
+ await message.edit(embed=embed)
+
+ if reaction.emoji == RIGHT_EMOJI:
+ await message.remove_reaction(reaction.emoji, user)
+
+ if current_page >= len(paginator.pages) - 1:
+ log.debug("Got next page reaction, but we're on the last page - ignoring")
+ continue
+
+ current_page += 1
+ log.debug(f"Got next page reaction - changing to page {current_page + 1}/{len(paginator.pages)}")
+
+ embed.description = ""
+ await message.edit(embed=embed)
+ embed.description = paginator.pages[current_page]
+
+ if footer_text:
+ embed.set_footer(text=f"{footer_text} (Page {current_page + 1}/{len(paginator.pages)})")
+ else:
+ embed.set_footer(text=f"Page {current_page + 1}/{len(paginator.pages)}")
+
+ await message.edit(embed=embed)
+
+ log.debug("Ending pagination and clearing reactions...")
+ await message.clear_reactions()
+
+
+class ImagePaginator(Paginator):
+ """
+ Helper class that paginates images for embeds in messages.
+
+ Close resemblance to LinePaginator, except focuses on images over text.
+
+ Refer to ImagePaginator.paginate for documentation on how to use.
+ """
+
+ def __init__(self, prefix: str = "", suffix: str = ""):
+ super().__init__(prefix, suffix)
+ self._current_page = [prefix]
+ self.images = []
+ self._pages = []
+
+ def add_line(self, line: str = '', *, empty: bool = False) -> None:
+ """
+ Adds a line to each page, usually just 1 line in this context.
+
+ If `empty` is True, an empty line will be placed after a given `line`.
+ """
+ if line:
+ self._count = len(line)
+ else:
+ self._count = 0
+ self._current_page.append(line)
+ self.close_page()
+
+ def add_image(self, image: str = None) -> None:
+ """Adds an image to a page given the url."""
+ self.images.append(image)
+
+ @classmethod
+ async def paginate(cls, pages: List[Tuple[str, str]], ctx: Context, embed: Embed,
+ prefix: str = "", suffix: str = "", timeout: int = 300,
+ exception_on_empty_embed: bool = False):
+ """
+ Use a paginator and set of reactions to provide pagination over a set of title/image pairs.
+
+ `pages` is a list of tuples of page title/image url pairs.
+ `prefix` and `suffix` will be prepended and appended respectively to the message.
+
+ When used, this will send a message using `ctx.send()` and apply a set of reactions to it.
+ These reactions may be used to change page, or to remove pagination from the message.
+
+ Note: Pagination will be removed automatically if no reaction is added for `timeout` seconds,
+ defaulting to five minutes (300 seconds).
+
+ >>> embed = Embed()
+ >>> embed.set_author(name="Some Operation", url=url, icon_url=icon)
+ >>> await ImagePaginator.paginate(pages, ctx, embed)
+ """
+ def check_event(reaction_: Reaction, member: Member) -> bool:
+ """Checks each reaction added, if it matches our conditions pass the wait_for."""
+ return all((
+ # Reaction is on the same message sent
+ reaction_.message.id == message.id,
+ # The reaction is part of the navigation menu
+ str(reaction_.emoji) in PAGINATION_EMOJI, # Note: DELETE_EMOJI is a string and not unicode
+ # The reactor is not a bot
+ not member.bot
+ ))
+
+ paginator = cls(prefix=prefix, suffix=suffix)
+ current_page = 0
+
+ if not pages:
+ if exception_on_empty_embed:
+ log.exception(f"Pagination asked for empty image list")
+ raise EmptyPaginatorEmbed("No images to paginate")
+
+ log.debug("No images to add to paginator, adding '(no images to display)' message")
+ pages.append(("(no images to display)", ""))
+
+ for text, image_url in pages:
+ paginator.add_line(text)
+ paginator.add_image(image_url)
+
+ embed.description = paginator.pages[current_page]
+ image = paginator.images[current_page]
+
+ if image:
+ embed.set_image(url=image)
+
+ if len(paginator.pages) <= 1:
+ return await ctx.send(embed=embed)
+
+ embed.set_footer(text=f"Page {current_page + 1}/{len(paginator.pages)}")
+ message = await ctx.send(embed=embed)
+
+ for emoji in PAGINATION_EMOJI:
+ await message.add_reaction(emoji)
+
+ while True:
+ # Start waiting for reactions
+ try:
+ reaction, user = await ctx.bot.wait_for("reaction_add", timeout=timeout, check=check_event)
+ except asyncio.TimeoutError:
+ log.debug("Timed out waiting for a reaction")
+ break # We're done, no reactions for the last 5 minutes
+
+ # Deletes the users reaction
+ await message.remove_reaction(reaction.emoji, user)
+
+ # Delete reaction press - [:trashcan:]
+ if str(reaction.emoji) == DELETE_EMOJI: # Note: DELETE_EMOJI is a string and not unicode
+ log.debug("Got delete reaction")
+ return await message.delete()
+
+ # First reaction press - [:track_previous:]
+ if reaction.emoji == FIRST_EMOJI:
+ if current_page == 0:
+ log.debug("Got first page reaction, but we're on the first page - ignoring")
+ continue
+
+ current_page = 0
+ reaction_type = "first"
+
+ # Last reaction press - [:track_next:]
+ if reaction.emoji == LAST_EMOJI:
+ if current_page >= len(paginator.pages) - 1:
+ log.debug("Got last page reaction, but we're on the last page - ignoring")
+ continue
+
+ current_page = len(paginator.pages) - 1
+ reaction_type = "last"
+
+ # Previous reaction press - [:arrow_left: ]
+ if reaction.emoji == LEFT_EMOJI:
+ if current_page <= 0:
+ log.debug("Got previous page reaction, but we're on the first page - ignoring")
+ continue
+
+ current_page -= 1
+ reaction_type = "previous"
+
+ # Next reaction press - [:arrow_right:]
+ if reaction.emoji == RIGHT_EMOJI:
+ if current_page >= len(paginator.pages) - 1:
+ log.debug("Got next page reaction, but we're on the last page - ignoring")
+ continue
+
+ current_page += 1
+ reaction_type = "next"
+
+ # Magic happens here, after page and reaction_type is set
+ embed.description = ""
+ await message.edit(embed=embed)
+ embed.description = paginator.pages[current_page]
+
+ image = paginator.images[current_page]
+ if image:
+ embed.set_image(url=image)
+
+ embed.set_footer(text=f"Page {current_page + 1}/{len(paginator.pages)}")
+ log.debug(f"Got {reaction_type} page reaction - changing to page {current_page + 1}/{len(paginator.pages)}")
+
+ await message.edit(embed=embed)
+
+ log.debug("Ending pagination and clearing reactions...")
+ await message.clear_reactions()
diff --git a/bot/utils/persist.py b/bot/utils/persist.py
index a60a1219..d78e5420 100644
--- a/bot/utils/persist.py
+++ b/bot/utils/persist.py
@@ -2,7 +2,7 @@ import sqlite3
from pathlib import Path
from shutil import copyfile
-from bot.seasons.season import get_seasons
+from bot.exts import get_package_names
DIRECTORY = Path("data") # directory that has a persistent volume mapped to it
@@ -41,7 +41,7 @@ def make_persistent(file_path: Path) -> Path:
raise OSError(f"File not found at {file_path}.")
# detect season in datafile path for assigning to subdirectory
- season = next((s for s in get_seasons() if s in file_path.parts), None)
+ season = next((s for s in get_package_names() if s in file_path.parts), None)
if season:
# make sure subdirectory exists first