diff options
author | 2021-03-31 19:37:06 +0100 | |
---|---|---|
committer | 2021-03-31 19:37:06 +0100 | |
commit | c2f664488f739f29ded5d309b250119aec3f3051 (patch) | |
tree | b1f4411c47646253d68e1975fa0650e6dcc307f1 | |
parent | Merge pull request #1491 from python-discord/fix/dmrelay (diff) | |
parent | Branding: log after successful fetch (diff) |
Merge pull request #1463 from kwzrd/kwzrd/branding
-rw-r--r-- | Pipfile | 1 | ||||
-rw-r--r-- | Pipfile.lock | 10 | ||||
-rw-r--r-- | bot/decorators.py | 23 | ||||
-rw-r--r-- | bot/errors.py | 6 | ||||
-rw-r--r-- | bot/exts/backend/branding/__init__.py | 6 | ||||
-rw-r--r-- | bot/exts/backend/branding/_cog.py | 887 | ||||
-rw-r--r-- | bot/exts/backend/branding/_constants.py | 51 | ||||
-rw-r--r-- | bot/exts/backend/branding/_decorators.py | 27 | ||||
-rw-r--r-- | bot/exts/backend/branding/_errors.py | 2 | ||||
-rw-r--r-- | bot/exts/backend/branding/_repository.py | 240 | ||||
-rw-r--r-- | bot/exts/backend/branding/_seasons.py | 175 | ||||
-rw-r--r-- | bot/exts/backend/error_handler.py | 7 |
12 files changed, 766 insertions, 669 deletions
@@ -23,6 +23,7 @@ lxml = "~=4.4" markdownify = "==0.5.3" more_itertools = "~=8.2" python-dateutil = "~=2.8" +python-frontmatter = "~=1.0.0" pyyaml = "~=5.1" requests = "~=2.22" sentry-sdk = "~=0.19" diff --git a/Pipfile.lock b/Pipfile.lock index d16cef2a8..cbec48ef0 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "e5b57ca7276af4709b345055d4b3705c4142c61c4669c796b79a73379ec37a9a" + "sha256": "91b5639198b35740611e7ac923cfc262e5897b8cbc3ca243dc98335705804ba7" }, "pipfile-spec": 6, "requires": { @@ -613,6 +613,14 @@ "index": "pypi", "version": "==2.8.1" }, + "python-frontmatter": { + "hashes": [ + "sha256:766ae75f1b301ffc5fe3494339147e0fd80bc3deff3d7590a93991978b579b08", + "sha256:e98152e977225ddafea6f01f40b4b0f1de175766322004c826ca99842d19a7cd" + ], + "index": "pypi", + "version": "==1.0.0" + }, "pytz": { "hashes": [ "sha256:83a4a90894bf38e243cf052c8b58f381bfe9a7a483f6a9cab140bc7f702ac4da", diff --git a/bot/decorators.py b/bot/decorators.py index 063c8f878..0b50cc365 100644 --- a/bot/decorators.py +++ b/bot/decorators.py @@ -1,4 +1,5 @@ import asyncio +import functools import logging import typing as t from contextlib import suppress @@ -8,7 +9,7 @@ from discord import Member, NotFound from discord.ext import commands from discord.ext.commands import Cog, Context -from bot.constants import Channels, RedirectOutput +from bot.constants import Channels, DEBUG_MODE, RedirectOutput from bot.utils import function from bot.utils.checks import in_whitelist_check @@ -153,3 +154,23 @@ def respect_role_hierarchy(member_arg: function.Argument) -> t.Callable: await func(*args, **kwargs) return wrapper return decorator + + +def mock_in_debug(return_value: t.Any) -> t.Callable: + """ + Short-circuit function execution if in debug mode and return `return_value`. + + The original function name, and the incoming args and kwargs are DEBUG level logged + upon each call. This is useful for expensive operations, i.e. media asset uploads + that are prone to rate-limits but need to be tested extensively. + """ + def decorator(func: t.Callable) -> t.Callable: + @functools.wraps(func) + async def wrapped(*args, **kwargs) -> t.Any: + """Short-circuit and log if in debug mode.""" + if DEBUG_MODE: + log.debug(f"Function {func.__name__} called with args: {args}, kwargs: {kwargs}") + return return_value + return await func(*args, **kwargs) + return wrapped + return decorator diff --git a/bot/errors.py b/bot/errors.py index ab0adcd42..3544c6320 100644 --- a/bot/errors.py +++ b/bot/errors.py @@ -35,3 +35,9 @@ class InvalidInfractedUser(Exception): self.reason = reason super().__init__(reason) + + +class BrandingMisconfiguration(RuntimeError): + """Raised by the Branding cog when a misconfigured event is encountered.""" + + pass diff --git a/bot/exts/backend/branding/__init__.py b/bot/exts/backend/branding/__init__.py index 81ea3bf49..20a747b7f 100644 --- a/bot/exts/backend/branding/__init__.py +++ b/bot/exts/backend/branding/__init__.py @@ -1,7 +1,7 @@ from bot.bot import Bot -from bot.exts.backend.branding._cog import BrandingManager +from bot.exts.backend.branding._cog import Branding def setup(bot: Bot) -> None: - """Loads BrandingManager cog.""" - bot.add_cog(BrandingManager(bot)) + """Load Branding cog.""" + bot.add_cog(Branding(bot)) diff --git a/bot/exts/backend/branding/_cog.py b/bot/exts/backend/branding/_cog.py index 20df83a89..0a4ddcc88 100644 --- a/bot/exts/backend/branding/_cog.py +++ b/bot/exts/backend/branding/_cog.py @@ -1,566 +1,647 @@ import asyncio -import itertools +import contextlib import logging import random import typing as t from datetime import datetime, time, timedelta +from enum import Enum +from operator import attrgetter -import arrow import async_timeout import discord from async_rediscache import RedisCache -from discord.ext import commands +from discord.ext import commands, tasks from bot.bot import Bot -from bot.constants import Branding, Colours, Emojis, Guild, MODERATION_ROLES -from bot.exts.backend.branding import _constants, _decorators, _errors, _seasons +from bot.constants import Branding as BrandingConfig, Channels, Colours, Guild, MODERATION_ROLES +from bot.decorators import mock_in_debug +from bot.exts.backend.branding._repository import BrandingRepository, Event, RemoteObject log = logging.getLogger(__name__) -class GitHubFile(t.NamedTuple): +class AssetType(Enum): """ - Represents a remote file on GitHub. + Recognised Discord guild asset types. - The `sha` hash is kept so that we can determine that a file has changed, - despite its filename remaining unchanged. + The value of each member corresponds exactly to a kwarg that can be passed to `Guild.edit`. """ - download_url: str - path: str - sha: str + BANNER = "banner" + ICON = "icon" -def pretty_files(files: t.Iterable[GitHubFile]) -> str: - """Provide a human-friendly representation of `files`.""" - return "\n".join(file.path for file in files) +def compound_hash(objects: t.Iterable[RemoteObject]) -> str: + """ + Join SHA attributes of `objects` into a single string. + + Compound hashes are cached to check for change in any of the member `objects`. + """ + return "-".join(item.sha for item in objects) + + +def make_embed(title: str, description: str, *, success: bool) -> discord.Embed: + """ + Construct simple response embed. + + If `success` is True, use green colour, otherwise red. + + For both `title` and `description`, empty string are valid values ~ fields will be empty. + """ + colour = Colours.soft_green if success else Colours.soft_red + return discord.Embed(title=title[:256], description=description[:2048], colour=colour) -def time_until_midnight() -> timedelta: +def extract_event_duration(event: Event) -> str: """ - Determine amount of time until the next-up UTC midnight. + Extract a human-readable, year-agnostic duration string from `event`. - The exact `midnight` moment is actually delayed to 5 seconds after, in order - to avoid potential problems due to imprecise sleep. + In the case that `event` is a fallback event, resolves to 'Fallback'. """ - now = datetime.utcnow() - tomorrow = now + timedelta(days=1) - midnight = datetime.combine(tomorrow, time(second=5)) + if event.meta.is_fallback: + return "Fallback" - return midnight - now + fmt = "%B %d" # Ex: August 23 + start_date = event.meta.start_date.strftime(fmt) + end_date = event.meta.end_date.strftime(fmt) + return f"{start_date} - {end_date}" -class BrandingManager(commands.Cog): + +def extract_event_name(event: Event) -> str: """ - Manages the guild's branding. - - The purpose of this cog is to help automate the synchronization of the branding - repository with the guild. It is capable of discovering assets in the repository - via GitHub's API, resolving download urls for them, and delegating - to the `bot` instance to upload them to the guild. - - BrandingManager is designed to be entirely autonomous. Its `daemon` background task awakens - once a day (see `time_until_midnight`) to detect new seasons, or to cycle icons within a single - season. The daemon can be turned on and off via the `daemon` cmd group. The value set via - its `start` and `stop` commands is persisted across sessions. If turned on, the daemon will - automatically start on the next bot start-up. Otherwise, it will wait to be started manually. - - All supported operations, e.g. setting seasons, applying the branding, or cycling icons, can - also be invoked manually, via the following API: - - branding list - - Show all available seasons - - branding set <season_name> - - Set the cog's internal state to represent `season_name`, if it exists - - If no `season_name` is given, set chronologically current season - - This will not automatically apply the season's branding to the guild, - the cog's state can be detached from the guild - - Seasons can therefore be 'previewed' using this command - - branding info - - View detailed information about resolved assets for current season - - branding refresh - - Refresh internal state, i.e. synchronize with branding repository - - branding apply - - Apply the current internal state to the guild, i.e. upload the assets - - branding cycle - - If there are multiple available icons for current season, randomly pick - and apply the next one - - The daemon calls these methods autonomously as appropriate. The use of this cog - is locked to moderation roles. As it performs media asset uploads, it is prone to - rate-limits - the `apply` command should be used with caution. The `set` command can, - however, be used freely to 'preview' seasonal branding and check whether paths have been - resolved as appropriate. - - While the bot is in debug mode, it will 'mock' asset uploads by logging the passed - download urls and pretending that the upload was successful. Make use of this - to test this cog's behaviour. + Extract title-cased event name from the path of `event`. + + An event with a path of 'events/black_history_month' will resolve to 'Black History Month'. """ + name = event.path.split("/")[-1] # Inner-most directory name. + words = name.split("_") # Words from snake case. - current_season: t.Type[_seasons.SeasonBase] + return " ".join(word.title() for word in words) - banner: t.Optional[GitHubFile] - available_icons: t.List[GitHubFile] - remaining_icons: t.List[GitHubFile] +class Branding(commands.Cog): + """ + Guild branding management. - days_since_cycle: t.Iterator + Extension responsible for automatic synchronisation of the guild's branding with the branding repository. + Event definitions and assets are automatically discovered and applied as appropriate. - daemon: t.Optional[asyncio.Task] + All state is stored in Redis. The cog should therefore seamlessly transition across restarts and maintain + a consistent icon rotation schedule for events with multiple icon assets. - # Branding configuration - branding_configuration = RedisCache() + By caching hashes of banner & icon assets, we discover changes in currently applied assets and always keep + the latest version applied. + + The command interface allows moderators+ to control the daemon or request asset synchronisation, while + regular users can see information about the current event and the overall event schedule. + """ + + # RedisCache[ + # "daemon_active": bool | If True, daemon starts on start-up. Controlled via commands. + # "event_path": str | Current event's path in the branding repo. + # "event_description": str | Current event's Markdown description. + # "event_duration": str | Current event's human-readable date range. + # "banner_hash": str | SHA of the currently applied banner. + # "icons_hash": str | Compound SHA of all icons in current rotation. + # "last_rotation_timestamp": float | POSIX UTC timestamp. + # ] + cache_information = RedisCache() + + # Icons in current rotation. Keys (str) are download URLs, values (int) track the amount of times each + # icon has been used in the current rotation. + cache_icons = RedisCache() + + # All available event names & durations. Cached by the daemon nightly; read by the calendar command. + cache_events = RedisCache() def __init__(self, bot: Bot) -> None: + """Instantiate repository abstraction & allow daemon to start.""" + self.bot = bot + self.repository = BrandingRepository(bot) + + self.bot.loop.create_task(self.maybe_start_daemon()) # Start depending on cache. + + # region: Internal logic & state management + + @mock_in_debug(return_value=True) # Mocked in development environment to prevent API spam. + async def apply_asset(self, asset_type: AssetType, download_url: str) -> bool: """ - Assign safe default values on init. + Download asset from `download_url` and apply it to PyDis as `asset_type`. - At this point, we don't have information about currently available branding. - Most of these attributes will be overwritten once the daemon connects, or once - the `refresh` command is used. + Return a boolean indicating whether the application was successful. """ - self.bot = bot - self.current_season = _seasons.get_current_season() + log.info(f"Applying '{asset_type.value}' asset to the guild.") - self.banner = None + try: + file = await self.repository.fetch_file(download_url) + except Exception: + log.exception(f"Failed to fetch '{asset_type.value}' asset.") + return False - self.available_icons = [] - self.remaining_icons = [] + await self.bot.wait_until_guild_available() + pydis: discord.Guild = self.bot.get_guild(Guild.id) - self.days_since_cycle = itertools.cycle([None]) + timeout = 10 # Seconds. + try: + with async_timeout.timeout(timeout): # Raise after `timeout` seconds. + await pydis.edit(**{asset_type.value: file}) + except discord.HTTPException: + log.exception("Asset upload to Discord failed.") + return False + except asyncio.TimeoutError: + log.error(f"Asset upload to Discord timed out after {timeout} seconds.") + return False + else: + log.trace("Asset uploaded successfully.") + return True - self.daemon = None - self._startup_task = self.bot.loop.create_task(self._initial_start_daemon()) + async def apply_banner(self, banner: RemoteObject) -> bool: + """ + Apply `banner` to the guild and cache its hash if successful. + + Banners should always be applied via this method to ensure that the last hash is cached. + + Return a boolean indicating whether the application was successful. + """ + success = await self.apply_asset(AssetType.BANNER, banner.download_url) - async def _initial_start_daemon(self) -> None: - """Checks is daemon active and when is, start it at cog load.""" - if await self.branding_configuration.get("daemon_active"): - self.daemon = self.bot.loop.create_task(self._daemon_func()) + if success: + await self.cache_information.set("banner_hash", banner.sha) - @property - def _daemon_running(self) -> bool: - """True if the daemon is currently active, False otherwise.""" - return self.daemon is not None and not self.daemon.done() + return success - async def _daemon_func(self) -> None: + async def rotate_icons(self) -> bool: """ - Manage all automated behaviour of the BrandingManager cog. + Choose and apply the next-up icon in rotation. + + We keep track of the amount of times each icon has been used. The values in `cache_icons` can be understood + to be iteration IDs. When an icon is chosen & applied, we bump its count, pushing it into the next iteration. - Once a day, the daemon will perform the following tasks: - - Update `current_season` - - Poll GitHub API to see if the available branding for `current_season` has changed - - Update assets if changes are detected (banner, guild icon, bot avatar, bot nickname) - - Check whether it's time to cycle guild icons + Once the current iteration (lowest count in the cache) depletes, we move onto the next iteration. - The internal loop runs once when activated, then periodically at the time - given by `time_until_midnight`. + In the case that there is only 1 icon in the rotation and has already been applied, do nothing. - All method calls in the internal loop are considered safe, i.e. no errors propagate - to the daemon's loop. The daemon itself does not perform any error handling on its own. + Return a boolean indicating whether a new icon was applied successfully. """ - await self.bot.wait_until_guild_available() + log.debug("Rotating icons.") - while True: - self.current_season = _seasons.get_current_season() - branding_changed = await self.refresh() + state = await self.cache_icons.to_dict() + log.trace(f"Total icons in rotation: {len(state)}.") - if branding_changed: - await self.apply() + if not state: # This would only happen if rotation not initiated, but we can handle gracefully. + log.warning("Attempted icon rotation with an empty icon cache. This indicates wrong logic.") + return False - elif next(self.days_since_cycle) == Branding.cycle_frequency: - await self.cycle() + if len(state) == 1 and 1 in state.values(): + log.debug("Aborting icon rotation: only 1 icon is available and has already been applied.") + return False - until_midnight = time_until_midnight() - await asyncio.sleep(until_midnight.total_seconds()) + current_iteration = min(state.values()) # Choose iteration to draw from. + options = [download_url for download_url, times_used in state.items() if times_used == current_iteration] - async def _info_embed(self) -> discord.Embed: - """Make an informative embed representing current season.""" - info_embed = discord.Embed(description=self.current_season.description, colour=self.current_season.colour) + log.trace(f"Choosing from {len(options)} icons in iteration {current_iteration}.") + next_icon = random.choice(options) - # If we're in a non-evergreen season, also show active months - if self.current_season is not _seasons.SeasonBase: - title = f"{self.current_season.season_name} ({', '.join(str(m) for m in self.current_season.months)})" - else: - title = self.current_season.season_name + success = await self.apply_asset(AssetType.ICON, next_icon) + + if success: + await self.cache_icons.increment(next_icon) # Push the icon into the next iteration. + + timestamp = datetime.utcnow().timestamp() + await self.cache_information.set("last_rotation_timestamp", timestamp) + + return success - # Use the author field to show the season's name and avatar if available - info_embed.set_author(name=title) + async def maybe_rotate_icons(self) -> None: + """ + Call `rotate_icons` if the configured amount of time has passed since last rotation. - banner = self.banner.path if self.banner is not None else "Unavailable" - info_embed.add_field(name="Banner", value=banner, inline=False) + We offset the calculated time difference into the future to avoid off-by-a-little-bit errors. Because there + is work to be done before the timestamp is read and written, the next read will likely commence slightly + under 24 hours after the last write. + """ + log.debug("Checking whether it's time for icons to rotate.") - icons = pretty_files(self.available_icons) or "Unavailable" - info_embed.add_field(name="Available icons", value=icons, inline=False) + last_rotation_timestamp = await self.cache_information.get("last_rotation_timestamp") - # Only display cycle frequency if we're actually cycling - if len(self.available_icons) > 1 and Branding.cycle_frequency: - info_embed.set_footer(text=f"Icon cycle frequency: {Branding.cycle_frequency}") + if last_rotation_timestamp is None: # Maiden case ~ never rotated. + await self.rotate_icons() + return - return info_embed + last_rotation = datetime.fromtimestamp(last_rotation_timestamp) + difference = (datetime.utcnow() - last_rotation) + timedelta(minutes=5) - async def _reset_remaining_icons(self) -> None: - """Set `remaining_icons` to a shuffled copy of `available_icons`.""" - self.remaining_icons = random.sample(self.available_icons, k=len(self.available_icons)) + log.trace(f"Icons last rotated at {last_rotation} (difference: {difference}).") - async def _reset_days_since_cycle(self) -> None: + if difference.days >= BrandingConfig.cycle_frequency: + await self.rotate_icons() + + async def initiate_icon_rotation(self, available_icons: t.List[RemoteObject]) -> None: """ - Reset the `days_since_cycle` iterator based on configured frequency. + Set up a new icon rotation. - If the current season only has 1 icon, or if `Branding.cycle_frequency` is falsey, - the iterator will always yield None. This signals that the icon shouldn't be cycled. + This function should be called whenever available icons change. This is generally the case when we enter + a new event, but potentially also when the assets of an on-going event change. In such cases, a reset + of `cache_icons` is necessary, because it contains download URLs which may have gotten stale. - Otherwise, it will yield ints in range [1, `Branding.cycle_frequency`] indefinitely. - When the iterator yields a value equal to `Branding.cycle_frequency`, it is time to cycle. + This function does not upload a new icon! """ - if len(self.available_icons) > 1 and Branding.cycle_frequency: - sequence = range(1, Branding.cycle_frequency + 1) - else: - sequence = [None] + log.debug("Initiating new icon rotation.") - self.days_since_cycle = itertools.cycle(sequence) + await self.cache_icons.clear() - async def _get_files(self, path: str, include_dirs: bool = False) -> t.Dict[str, GitHubFile]: + new_state = {icon.download_url: 0 for icon in available_icons} + await self.cache_icons.update(new_state) + + log.trace(f"Icon rotation initiated for {len(new_state)} icons.") + + await self.cache_information.set("icons_hash", compound_hash(available_icons)) + + async def send_info_embed(self, channel_id: int, *, is_notification: bool) -> None: """ - Get files at `path` in the branding repository. + Send the currently cached event description to `channel_id`. - If `include_dirs` is False (default), only returns files at `path`. - Otherwise, will return both files and directories. Never returns symlinks. + When `is_notification` holds, a short contextual message for the #changelog channel is added. - Return dict mapping from filename to corresponding `GitHubFile` instance. - This may return an empty dict if the response status is non-200, - or if the target directory is empty. + We read event information from `cache_information`. The caller is therefore responsible for making + sure that the cache is up-to-date before calling this function. """ - url = f"{_constants.BRANDING_URL}/{path}" - async with self.bot.http_session.get( - url, headers=_constants.HEADERS, params=_constants.PARAMS - ) as resp: - # Short-circuit if we get non-200 response - if resp.status != _constants.STATUS_OK: - log.error(f"GitHub API returned non-200 response: {resp}") - return {} - directory = await resp.json() # Directory at `path` + log.debug(f"Sending event information event to channel: {channel_id} ({is_notification=}).") - allowed_types = {"file", "dir"} if include_dirs else {"file"} - return { - file["name"]: GitHubFile(file["download_url"], file["path"], file["sha"]) - for file in directory - if file["type"] in allowed_types - } + await self.bot.wait_until_guild_available() + channel: t.Optional[discord.TextChannel] = self.bot.get_channel(channel_id) - async def refresh(self) -> bool: + if channel is None: + log.warning(f"Cannot send event information: channel {channel_id} not found!") + return + + log.trace(f"Destination channel: #{channel.name}.") + + description = await self.cache_information.get("event_description") + duration = await self.cache_information.get("event_duration") + + if None in (description, duration): + content = None + embed = make_embed("No event in cache", "Is the daemon enabled?", success=False) + + else: + content = "Python Discord is entering a new event!" if is_notification else None + embed = discord.Embed(description=description[:2048], colour=discord.Colour.blurple()) + embed.set_footer(text=duration[:2048]) + + await channel.send(content=content, embed=embed) + + async def enter_event(self, event: Event) -> t.Tuple[bool, bool]: """ - Synchronize available assets with branding repository. + Apply `event` assets and update information cache. - If the current season is not the evergreen, and lacks at least one asset, - we use the evergreen seasonal dir as fallback for missing assets. + We cache `event` information to ensure that we: + * Remember which event we're currently in across restarts + * Provide an on-demand informational embed without re-querying the branding repository - Finally, if neither the seasonal nor fallback branding directories contain - an asset, it will simply be ignored. + An event change should always be handled via this function, as it ensures that the cache is populated. - Return True if the branding has changed. This will be the case when we enter - a new season, or when something changes in the current seasons's directory - in the branding repository. + The #changelog notification is omitted when `event` is fallback, or already applied. + + Return a 2-tuple indicating whether the banner, and the icon, were applied successfully. """ - old_branding = (self.banner, self.available_icons) - seasonal_dir = await self._get_files(self.current_season.branding_path, include_dirs=True) + log.info(f"Entering event: '{event.path}'.") - # Only make a call to the fallback directory if there is something to be gained - branding_incomplete = any( - asset not in seasonal_dir - for asset in (_constants.FILE_BANNER, _constants.FILE_AVATAR, _constants.SERVER_ICONS) - ) - if branding_incomplete and self.current_season is not _seasons.SeasonBase: - fallback_dir = await self._get_files( - _seasons.SeasonBase.branding_path, include_dirs=True - ) - else: - fallback_dir = {} + banner_success = await self.apply_banner(event.banner) # Only one asset ~ apply directly. - # Resolve assets in this directory, None is a safe value - self.banner = ( - seasonal_dir.get(_constants.FILE_BANNER) - or fallback_dir.get(_constants.FILE_BANNER) - ) + await self.initiate_icon_rotation(event.icons) # Prepare a new rotation. + icon_success = await self.rotate_icons() # Apply an icon from the new rotation. - # Now resolve server icons by making a call to the proper sub-directory - if _constants.SERVER_ICONS in seasonal_dir: - icons_dir = await self._get_files( - f"{self.current_season.branding_path}/{_constants.SERVER_ICONS}" - ) - self.available_icons = list(icons_dir.values()) + # This will only be False in the case of a manual same-event re-synchronisation. + event_changed = event.path != await self.cache_information.get("event_path") - elif _constants.SERVER_ICONS in fallback_dir: - icons_dir = await self._get_files( - f"{_seasons.SeasonBase.branding_path}/{_constants.SERVER_ICONS}" - ) - self.available_icons = list(icons_dir.values()) + # Cache event identity to avoid re-entry in case of restart. + await self.cache_information.set("event_path", event.path) + # Cache information shown in the 'about' embed. + await self.populate_cache_event_description(event) + + # Notify guild of new event ~ this reads the information that we cached above. + if event_changed and not event.meta.is_fallback: + await self.send_info_embed(Channels.change_log, is_notification=True) else: - self.available_icons = [] # This should never be the case, but an empty list is a safe value + log.trace("Omitting #changelog notification. Event has not changed, or new event is fallback.") - # GitHubFile instances carry a `sha` attr so this will pick up if a file changes - branding_changed = old_branding != (self.banner, self.available_icons) + return banner_success, icon_success - if branding_changed: - log.info(f"New branding detected (season: {self.current_season.season_name})") - await self._reset_remaining_icons() - await self._reset_days_since_cycle() + async def synchronise(self) -> t.Tuple[bool, bool]: + """ + Fetch the current event and delegate to `enter_event`. - return branding_changed + This is a convenience function to force synchronisation via a command. It should generally only be used + in a recovery scenario. In the usual case, the daemon already has an `Event` instance and can pass it + to `enter_event` directly. - async def cycle(self) -> bool: + Return a 2-tuple indicating whether the banner, and the icon, were applied successfully. """ - Apply the next-up server icon. + log.debug("Synchronise: fetching current event.") - Returns True if an icon is available and successfully gets applied, False otherwise. - """ - if not self.available_icons: - log.info("Cannot cycle: no icons for this season") - return False + current_event, available_events = await self.repository.get_current_event() - if not self.remaining_icons: - log.info("Reset & shuffle remaining icons") - await self._reset_remaining_icons() + await self.populate_cache_events(available_events) - next_up = self.remaining_icons.pop(0) - success = await self.set_icon(next_up.download_url) + if current_event is None: + log.error("Failed to fetch event. Cannot synchronise!") + return False, False - return success + return await self.enter_event(current_event) - async def apply(self) -> t.List[str]: + async def populate_cache_events(self, events: t.List[Event]) -> None: """ - Apply current branding to the guild and bot. - - This delegates to the bot instance to do all the work. We only provide download urls - for available assets. Assets unavailable in the branding repo will be ignored. + Clear `cache_events` and re-populate with names and durations of `events`. - Returns a list of names of all failed assets. An asset is considered failed - if it isn't found in the branding repo, or if something goes wrong while the - bot is trying to apply it. + For each event, we store its name and duration string. This is the information presented to users in the + calendar command. If a format change is needed, it has to be done here. - An empty list denotes that all assets have been applied successfully. + The cache does not store the fallback event, as it is not shown in the calendar. """ - report = {asset: False for asset in ("banner", "icon")} + log.debug("Populating events cache.") - if self.banner is not None: - report["banner"] = await self.set_banner(self.banner.download_url) + await self.cache_events.clear() - report["icon"] = await self.cycle() + no_fallback = [event for event in events if not event.meta.is_fallback] + chronological_events = sorted(no_fallback, key=attrgetter("meta.start_date")) - failed_assets = [asset for asset, succeeded in report.items() if not succeeded] - return failed_assets + log.trace(f"Writing {len(chronological_events)} events (fallback omitted).") - @commands.has_any_role(*MODERATION_ROLES) - @commands.group(name="branding") - async def branding_cmds(self, ctx: commands.Context) -> None: - """Manual branding control.""" - if not ctx.invoked_subcommand: - await ctx.send_help(ctx.command) + with contextlib.suppress(ValueError): # Cache raises when updated with an empty dict. + await self.cache_events.update({ + extract_event_name(event): extract_event_duration(event) + for event in chronological_events + }) - @branding_cmds.command(name="list", aliases=["ls"]) - async def branding_list(self, ctx: commands.Context) -> None: - """List all available seasons and branding sources.""" - embed = discord.Embed(title="Available seasons", colour=Colours.soft_green) + async def populate_cache_event_description(self, event: Event) -> None: + """ + Cache `event` description & duration. - for season in _seasons.get_all_seasons(): - if season is _seasons.SeasonBase: - active_when = "always" - else: - active_when = f"in {', '.join(str(m) for m in season.months)}" + This should be called when entering a new event, and can be called periodically to ensure that the cache + holds fresh information in the case that the event remains the same, but its description changes. - description = ( - f"Active {active_when}\n" - f"Branding: {season.branding_path}" - ) - embed.add_field(name=season.season_name, value=description, inline=False) + The duration is stored formatted for the frontend. It is not intended to be used programmatically. + """ + log.debug("Caching event description & duration.") - await ctx.send(embed=embed) + await self.cache_information.set("event_description", event.meta.description) + await self.cache_information.set("event_duration", extract_event_duration(event)) - @branding_cmds.command(name="set") - async def branding_set(self, ctx: commands.Context, *, season_name: t.Optional[str] = None) -> None: + # endregion + # region: Daemon + + async def maybe_start_daemon(self) -> None: """ - Manually set season, or reset to current if none given. + Start the daemon depending on cache state. - Season search is a case-less comparison against both seasonal class name, - and its `season_name` attr. + The daemon will only start if it has been explicitly enabled via a command. + """ + log.debug("Checking whether daemon should start.") - This only pre-loads the cog's internal state to the chosen season, but does not - automatically apply the branding. As that is an expensive operation, the `apply` - command must be called explicitly after this command finishes. + should_begin: t.Optional[bool] = await self.cache_information.get("daemon_active") # None if never set! - This means that this command can be used to 'preview' a season gathering info - about its available assets, without applying them to the guild. + if should_begin: + self.daemon_loop.start() - If the daemon is running, it will automatically reset the season to current when - it wakes up. The season set via this command can therefore remain 'detached' from - what it should be - the daemon will make sure that it's set back properly. + def cog_unload(self) -> None: """ - if season_name is None: - new_season = _seasons.get_current_season() - else: - new_season = _seasons.get_season(season_name) - if new_season is None: - raise _errors.BrandingError("No such season exists") + Cancel the daemon in case of cog unload. - if self.current_season is new_season: - raise _errors.BrandingError(f"Season {self.current_season.season_name} already active") + This is **not** done automatically! The daemon otherwise remains active in the background. + """ + log.debug("Cog unload: cancelling daemon.") - self.current_season = new_season - await self.branding_refresh(ctx) + self.daemon_loop.cancel() - @branding_cmds.command(name="info", aliases=["status"]) - async def branding_info(self, ctx: commands.Context) -> None: + async def daemon_main(self) -> None: """ - Show available assets for current season. + Synchronise guild & caches with branding repository. - This can be used to confirm that assets have been resolved properly. - When `apply` is used, it attempts to upload exactly the assets listed here. + Pull the currently active event from the branding repository and check whether it matches the currently + active event in the cache. If not, apply the new event. + + However, it is also possible that an event's assets change as it's active. To account for such cases, + we check the banner & icons hashes against the currently cached values. If there is a mismatch, each + specific asset is re-applied. """ - await ctx.send(embed=await self._info_embed()) + log.info("Daemon main: checking current event.") - @branding_cmds.command(name="refresh") - async def branding_refresh(self, ctx: commands.Context) -> None: - """Sync currently available assets with branding repository.""" - async with ctx.typing(): - await self.refresh() - await self.branding_info(ctx) + new_event, available_events = await self.repository.get_current_event() + + await self.populate_cache_events(available_events) + + if new_event is None: + log.warning("Daemon main: failed to get current event from branding repository, will do nothing.") + return + + if new_event.path != await self.cache_information.get("event_path"): + log.debug("Daemon main: new event detected!") + await self.enter_event(new_event) + return + + await self.populate_cache_event_description(new_event) # Cache fresh frontend info in case of change. - @branding_cmds.command(name="apply") - async def branding_apply(self, ctx: commands.Context) -> None: + log.trace("Daemon main: event has not changed, checking for change in assets.") + + if new_event.banner.sha != await self.cache_information.get("banner_hash"): + log.debug("Daemon main: detected banner change.") + await self.apply_banner(new_event.banner) + + if compound_hash(new_event.icons) != await self.cache_information.get("icons_hash"): + log.debug("Daemon main: detected icon change.") + await self.initiate_icon_rotation(new_event.icons) + await self.rotate_icons() + else: + await self.maybe_rotate_icons() + + @tasks.loop(hours=24) + async def daemon_loop(self) -> None: """ - Apply current season's branding to the guild. + Call `daemon_main` every 24 hours. - Use `info` to check which assets will be applied. Shows which assets have - failed to be applied, if any. + The scheduler maintains an exact 24-hour frequency even if this coroutine takes time to complete. If the + coroutine is started at 00:01 and completes at 00:05, it will still be started at 00:01 the next day. """ - async with ctx.typing(): - failed_assets = await self.apply() - if failed_assets: - raise _errors.BrandingError( - f"Failed to apply following assets: {', '.join(failed_assets)}" - ) + log.trace("Daemon loop: calling daemon main.") - response = discord.Embed(description=f"All assets applied {Emojis.ok_hand}", colour=Colours.soft_green) - await ctx.send(embed=response) + try: + await self.daemon_main() + except Exception: + log.exception("Daemon loop: failed with an unhandled exception!") - @branding_cmds.command(name="cycle") - async def branding_cycle(self, ctx: commands.Context) -> None: + @daemon_loop.before_loop + async def daemon_before(self) -> None: """ - Apply the next-up guild icon, if multiple are available. + Call `daemon_loop` immediately, then block the loop until the next-up UTC midnight. - The order is random. + The first iteration is invoked directly such that synchronisation happens immediately after daemon start. + We then calculate the time until the next-up midnight and sleep before letting `daemon_loop` begin. """ - async with ctx.typing(): - success = await self.cycle() - if not success: - raise _errors.BrandingError("Failed to cycle icon") + log.trace("Daemon before: performing start-up iteration.") + + await self.daemon_loop() + + log.trace("Daemon before: calculating time to sleep before loop begins.") + now = datetime.utcnow() - response = discord.Embed(description=f"Success {Emojis.ok_hand}", colour=Colours.soft_green) - await ctx.send(embed=response) + # The actual midnight moment is offset into the future to prevent issues with imprecise sleep. + tomorrow = now + timedelta(days=1) + midnight = datetime.combine(tomorrow, time(minute=1)) - @branding_cmds.group(name="daemon", aliases=["d", "task"]) - async def daemon_group(self, ctx: commands.Context) -> None: - """Control the background daemon.""" + sleep_secs = (midnight - now).total_seconds() + log.trace(f"Daemon before: sleeping {sleep_secs} seconds before next-up midnight: {midnight}.") + + await asyncio.sleep(sleep_secs) + + # endregion + # region: Command interface (branding) + + @commands.group(name="branding") + async def branding_group(self, ctx: commands.Context) -> None: + """Control the branding cog.""" if not ctx.invoked_subcommand: await ctx.send_help(ctx.command) - @daemon_group.command(name="status") - async def daemon_status(self, ctx: commands.Context) -> None: - """Check whether daemon is currently active.""" - if self._daemon_running: - remaining_time = (arrow.utcnow() + time_until_midnight()).humanize() - response = discord.Embed(description=f"Daemon running {Emojis.ok_hand}", colour=Colours.soft_green) - response.set_footer(text=f"Next refresh {remaining_time}") - else: - response = discord.Embed(description="Daemon not running", colour=Colours.soft_red) + @branding_group.command(name="about", aliases=("current", "event")) + async def branding_about_cmd(self, ctx: commands.Context) -> None: + """Show the current event's description and duration.""" + await self.send_info_embed(ctx.channel.id, is_notification=False) - await ctx.send(embed=response) + @commands.has_any_role(*MODERATION_ROLES) + @branding_group.command(name="sync") + async def branding_sync_cmd(self, ctx: commands.Context) -> None: + """ + Force branding synchronisation. - @daemon_group.command(name="start") - async def daemon_start(self, ctx: commands.Context) -> None: - """If the daemon isn't running, start it.""" - if self._daemon_running: - raise _errors.BrandingError("Daemon already running!") + Show which assets have failed to synchronise, if any. + """ + async with ctx.typing(): + banner_success, icon_success = await self.synchronise() - self.daemon = self.bot.loop.create_task(self._daemon_func()) - await self.branding_configuration.set("daemon_active", True) + failed_assets = ", ".join( + name + for name, status in [("banner", banner_success), ("icon", icon_success)] + if status is False + ) - response = discord.Embed(description=f"Daemon started {Emojis.ok_hand}", colour=Colours.soft_green) - await ctx.send(embed=response) + if failed_assets: + resp = make_embed("Synchronisation unsuccessful", f"Failed to apply: {failed_assets}.", success=False) + resp.set_footer(text="Check log for details.") + else: + resp = make_embed("Synchronisation successful", "Assets have been applied.", success=True) - @daemon_group.command(name="stop") - async def daemon_stop(self, ctx: commands.Context) -> None: - """If the daemon is running, stop it.""" - if not self._daemon_running: - raise _errors.BrandingError("Daemon not running!") + await ctx.send(embed=resp) - self.daemon.cancel() - await self.branding_configuration.set("daemon_active", False) + # endregion + # region: Command interface (branding calendar) + + @branding_group.group(name="calendar", aliases=("schedule", "events")) + async def branding_calendar_group(self, ctx: commands.Context) -> None: + """ + Show the current event calendar. - response = discord.Embed(description=f"Daemon stopped {Emojis.ok_hand}", colour=Colours.soft_green) - await ctx.send(embed=response) + We draw event information from `cache_events` and use each key-value pair to create a field in the response + embed. As such, we do not need to query the API to get event information. The cache is automatically + re-populated by the daemon whenever it makes a request. A moderator+ can also explicitly request a cache + refresh using the 'refresh' subcommand. - async def _fetch_image(self, url: str) -> bytes: - """Retrieve and read image from `url`.""" - log.debug(f"Getting image from: {url}") - async with self.bot.http_session.get(url) as resp: - return await resp.read() + Due to Discord limitations, we only show up to 25 events. This is entirely sufficient at the time of writing. + In the case that we find ourselves with more than 25 events, a warning log will alert core devs. - async def _apply_asset(self, target: discord.Guild, asset: _constants.AssetType, url: str) -> bool: + In the future, we may be interested in a field-paginating solution. """ - Internal method for applying media assets to the guild. + if ctx.invoked_subcommand: + # If you're wondering why this works: when the 'refresh' subcommand eventually re-invokes + # this group, the attribute will be automatically set to None by the framework. + return + + available_events = await self.cache_events.to_dict() + log.trace(f"Found {len(available_events)} cached events available for calendar view.") + + if not available_events: + resp = make_embed("No events found!", "Cache may be empty, try `branding calendar refresh`.", success=False) + await ctx.send(embed=resp) + return + + embed = discord.Embed(title="Current event calendar", colour=discord.Colour.blurple()) + + # Because Discord embeds can only contain up to 25 fields, we only show the first 25. + first_25 = list(available_events.items())[:25] - This shouldn't be called directly. The purpose of this method is mainly generic - error handling to reduce needless code repetition. + if len(first_25) != len(available_events): # Alert core devs that a paginating solution is now necessary. + log.warning(f"There are {len(available_events)} events, but the calendar view can only display 25.") - Return True if upload was successful, False otherwise. + for name, duration in first_25: + embed.add_field(name=name[:256], value=duration[:1024]) + + embed.set_footer(text="Otherwise, the fallback season is used.") + + await ctx.send(embed=embed) + + @commands.has_any_role(*MODERATION_ROLES) + @branding_calendar_group.command(name="refresh") + async def branding_calendar_refresh_cmd(self, ctx: commands.Context) -> None: """ - log.info(f"Attempting to set {asset.name}: {url}") + Refresh event cache and show current event calendar. - kwargs = {asset.value: await self._fetch_image(url)} - try: - async with async_timeout.timeout(5): - await target.edit(**kwargs) + Supplementary subcommand allowing force-refreshing the event cache. Implemented as a subcommand because + unlike the supergroup, it requires moderator privileges. + """ + log.info("Performing command-requested event cache refresh.") - except asyncio.TimeoutError: - log.info("Asset upload timed out") - return False + async with ctx.typing(): + available_events = await self.repository.get_events() + await self.populate_cache_events(available_events) - except discord.HTTPException as discord_error: - log.exception("Asset upload failed", exc_info=discord_error) - return False + await ctx.invoke(self.branding_calendar_group) + + # endregion + # region: Command interface (branding daemon) + + @commands.has_any_role(*MODERATION_ROLES) + @branding_group.group(name="daemon", aliases=("d",)) + async def branding_daemon_group(self, ctx: commands.Context) -> None: + """Control the branding cog's daemon.""" + if not ctx.invoked_subcommand: + await ctx.send_help(ctx.command) + + @branding_daemon_group.command(name="enable", aliases=("start", "on")) + async def branding_daemon_enable_cmd(self, ctx: commands.Context) -> None: + """Enable the branding daemon.""" + await self.cache_information.set("daemon_active", True) + if self.daemon_loop.is_running(): + resp = make_embed("Daemon is already enabled!", "", success=False) else: - log.info("Asset successfully applied") - return True + self.daemon_loop.start() + resp = make_embed("Daemon enabled!", "It will now automatically awaken on start-up.", success=True) - @_decorators.mock_in_debug(return_value=True) - async def set_banner(self, url: str) -> bool: - """Set the guild's banner to image at `url`.""" - guild = self.bot.get_guild(Guild.id) - if guild is None: - log.info("Failed to get guild instance, aborting asset upload") - return False + await ctx.send(embed=resp) - return await self._apply_asset(guild, _constants.AssetType.BANNER, url) + @branding_daemon_group.command(name="disable", aliases=("stop", "off")) + async def branding_daemon_disable_cmd(self, ctx: commands.Context) -> None: + """Disable the branding daemon.""" + await self.cache_information.set("daemon_active", False) - @_decorators.mock_in_debug(return_value=True) - async def set_icon(self, url: str) -> bool: - """Sets the guild's icon to image at `url`.""" - guild = self.bot.get_guild(Guild.id) - if guild is None: - log.info("Failed to get guild instance, aborting asset upload") - return False + if self.daemon_loop.is_running(): + self.daemon_loop.cancel() + resp = make_embed("Daemon disabled!", "It will not awaken on start-up.", success=True) + else: + resp = make_embed("Daemon is already disabled!", "", success=False) - return await self._apply_asset(guild, _constants.AssetType.SERVER_ICON, url) + await ctx.send(embed=resp) - def cog_unload(self) -> None: - """Cancels startup and daemon task.""" - self._startup_task.cancel() - if self.daemon is not None: - self.daemon.cancel() + @branding_daemon_group.command(name="status") + async def branding_daemon_status_cmd(self, ctx: commands.Context) -> None: + """Check whether the daemon is currently enabled.""" + if self.daemon_loop.is_running(): + resp = make_embed("Daemon is enabled", "Use `branding daemon disable` to stop.", success=True) + else: + resp = make_embed("Daemon is disabled", "Use `branding daemon enable` to start.", success=False) + + await ctx.send(embed=resp) + + # endregion diff --git a/bot/exts/backend/branding/_constants.py b/bot/exts/backend/branding/_constants.py deleted file mode 100644 index ca8e8c5f5..000000000 --- a/bot/exts/backend/branding/_constants.py +++ /dev/null @@ -1,51 +0,0 @@ -from enum import Enum, IntEnum - -from bot.constants import Keys - - -class Month(IntEnum): - """All month constants for seasons.""" - - JANUARY = 1 - FEBRUARY = 2 - MARCH = 3 - APRIL = 4 - MAY = 5 - JUNE = 6 - JULY = 7 - AUGUST = 8 - SEPTEMBER = 9 - OCTOBER = 10 - NOVEMBER = 11 - DECEMBER = 12 - - def __str__(self) -> str: - return self.name.title() - - -class AssetType(Enum): - """ - Discord media assets. - - The values match exactly the kwarg keys that can be passed to `Guild.edit`. - """ - - BANNER = "banner" - SERVER_ICON = "icon" - - -STATUS_OK = 200 # HTTP status code - -FILE_BANNER = "banner.png" -FILE_AVATAR = "avatar.png" -SERVER_ICONS = "server_icons" - -BRANDING_URL = "https://api.github.com/repos/python-discord/branding/contents" - -PARAMS = {"ref": "main"} # Target branch -HEADERS = {"Accept": "application/vnd.github.v3+json"} # Ensure we use API v3 - -# A GitHub token is not necessary for the cog to operate, -# unauthorized requests are however limited to 60 per hour -if Keys.github: - HEADERS["Authorization"] = f"token {Keys.github}" diff --git a/bot/exts/backend/branding/_decorators.py b/bot/exts/backend/branding/_decorators.py deleted file mode 100644 index 6a1e7e869..000000000 --- a/bot/exts/backend/branding/_decorators.py +++ /dev/null @@ -1,27 +0,0 @@ -import functools -import logging -import typing as t - -from bot.constants import DEBUG_MODE - -log = logging.getLogger(__name__) - - -def mock_in_debug(return_value: t.Any) -> t.Callable: - """ - Short-circuit function execution if in debug mode and return `return_value`. - - The original function name, and the incoming args and kwargs are DEBUG level logged - upon each call. This is useful for expensive operations, i.e. media asset uploads - that are prone to rate-limits but need to be tested extensively. - """ - def decorator(func: t.Callable) -> t.Callable: - @functools.wraps(func) - async def wrapped(*args, **kwargs) -> t.Any: - """Short-circuit and log if in debug mode.""" - if DEBUG_MODE: - log.debug(f"Function {func.__name__} called with args: {args}, kwargs: {kwargs}") - return return_value - return await func(*args, **kwargs) - return wrapped - return decorator diff --git a/bot/exts/backend/branding/_errors.py b/bot/exts/backend/branding/_errors.py deleted file mode 100644 index 7cd271af3..000000000 --- a/bot/exts/backend/branding/_errors.py +++ /dev/null @@ -1,2 +0,0 @@ -class BrandingError(Exception): - """Exception raised by the BrandingManager cog.""" diff --git a/bot/exts/backend/branding/_repository.py b/bot/exts/backend/branding/_repository.py new file mode 100644 index 000000000..7b09d4641 --- /dev/null +++ b/bot/exts/backend/branding/_repository.py @@ -0,0 +1,240 @@ +import logging +import typing as t +from datetime import date, datetime + +import frontmatter + +from bot.bot import Bot +from bot.constants import Keys +from bot.errors import BrandingMisconfiguration + +# Base URL for requests into the branding repository. +BRANDING_URL = "https://api.github.com/repos/python-discord/branding/contents" + +PARAMS = {"ref": "main"} # Target branch. +HEADERS = {"Accept": "application/vnd.github.v3+json"} # Ensure we use API v3. + +# A GitHub token is not necessary. However, unauthorized requests are limited to 60 per hour. +if Keys.github: + HEADERS["Authorization"] = f"token {Keys.github}" + +# Since event periods are year-agnostic, we parse them into `datetime` objects with a manually inserted year. +# Please note that this is intentionally a leap year to allow Feb 29 to be valid. +ARBITRARY_YEAR = 2020 + +# Format used to parse date strings after we inject `ARBITRARY_YEAR` at the end. +DATE_FMT = "%B %d %Y" # Ex: July 10 2020 + +log = logging.getLogger(__name__) + + +class RemoteObject: + """ + Remote file or directory on GitHub. + + The annotations match keys in the response JSON that we're interested in. + """ + + sha: str # Hash helps us detect asset change. + name: str # Filename. + path: str # Path from repo root. + type: str # Either 'file' or 'dir'. + download_url: t.Optional[str] # If type is 'dir', this is None! + + def __init__(self, dictionary: t.Dict[str, t.Any]) -> None: + """Initialize by grabbing annotated attributes from `dictionary`.""" + missing_keys = self.__annotations__.keys() - dictionary.keys() + if missing_keys: + raise KeyError(f"Fetched object lacks expected keys: {missing_keys}") + for annotation in self.__annotations__: + setattr(self, annotation, dictionary[annotation]) + + +class MetaFile(t.NamedTuple): + """Attributes defined in a 'meta.md' file.""" + + is_fallback: bool + start_date: t.Optional[date] + end_date: t.Optional[date] + description: str # Markdown event description. + + +class Event(t.NamedTuple): + """Event defined in the branding repository.""" + + path: str # Path from repo root where event lives. This is the event's identity. + meta: MetaFile + banner: RemoteObject + icons: t.List[RemoteObject] + + def __str__(self) -> str: + return f"<Event at '{self.path}'>" + + +class BrandingRepository: + """ + Branding repository abstraction. + + This class represents the branding repository's main branch and exposes available events and assets + as objects. It performs the necessary amount of validation to ensure that a misconfigured event + isn't returned. Such events are simply ignored, and will be substituted with the fallback event, + if available. Warning logs will inform core developers if a misconfigured event is encountered. + + Colliding events cause no special behaviour. In such cases, the first found active event is returned. + We work with the assumption that the branding repository checks for such conflicts and prevents them + from reaching the main branch. + + This class keeps no internal state. All `get_current_event` calls will result in GitHub API requests. + The caller is therefore responsible for being responsible and caching information to prevent API abuse. + + Requests are made using the HTTP session looked up on the bot instance. + """ + + def __init__(self, bot: Bot) -> None: + self.bot = bot + + async def fetch_directory(self, path: str, types: t.Container[str] = ("file", "dir")) -> t.Dict[str, RemoteObject]: + """ + Fetch directory found at `path` in the branding repository. + + Raise an exception if the request fails, or if the response lacks the expected keys. + + Passing custom `types` allows getting only files or directories. By default, both are included. + """ + full_url = f"{BRANDING_URL}/{path}" + log.debug(f"Fetching directory from branding repository: '{full_url}'.") + + async with self.bot.http_session.get(full_url, params=PARAMS, headers=HEADERS) as response: + if response.status != 200: + raise RuntimeError(f"Failed to fetch directory due to status: {response.status}") + + log.debug("Fetch successful, reading JSON response.") + json_directory = await response.json() + + return {file["name"]: RemoteObject(file) for file in json_directory if file["type"] in types} + + async def fetch_file(self, download_url: str) -> bytes: + """ + Fetch file as bytes from `download_url`. + + Raise an exception if the request does not succeed. + """ + log.debug(f"Fetching file from branding repository: '{download_url}'.") + + async with self.bot.http_session.get(download_url, params=PARAMS, headers=HEADERS) as response: + if response.status != 200: + raise RuntimeError(f"Failed to fetch file due to status: {response.status}") + + log.debug("Fetch successful, reading payload.") + return await response.read() + + def parse_meta_file(self, raw_file: bytes) -> MetaFile: + """ + Parse a 'meta.md' file from raw bytes. + + The caller is responsible for handling errors caused by misconfiguration. + """ + attrs, description = frontmatter.parse(raw_file, encoding="UTF-8") + + if not description: + raise BrandingMisconfiguration("No description found in 'meta.md'!") + + if attrs.get("fallback", False): + return MetaFile(is_fallback=True, start_date=None, end_date=None, description=description) + + start_date_raw = attrs.get("start_date") + end_date_raw = attrs.get("end_date") + + if None in (start_date_raw, end_date_raw): + raise BrandingMisconfiguration("Non-fallback event doesn't have start and end dates defined!") + + # We extend the configured month & day with an arbitrary leap year, allowing a datetime object to exist. + # This may raise errors if misconfigured. We let the caller handle such cases. + start_date = datetime.strptime(f"{start_date_raw} {ARBITRARY_YEAR}", DATE_FMT).date() + end_date = datetime.strptime(f"{end_date_raw} {ARBITRARY_YEAR}", DATE_FMT).date() + + return MetaFile(is_fallback=False, start_date=start_date, end_date=end_date, description=description) + + async def construct_event(self, directory: RemoteObject) -> Event: + """ + Construct an `Event` instance from an event `directory`. + + The caller is responsible for handling errors caused by misconfiguration. + """ + contents = await self.fetch_directory(directory.path) + + missing_assets = {"meta.md", "banner.png", "server_icons"} - contents.keys() + + if missing_assets: + raise BrandingMisconfiguration(f"Directory is missing following assets: {missing_assets}") + + server_icons = await self.fetch_directory(contents["server_icons"].path, types=("file",)) + + if len(server_icons) == 0: + raise BrandingMisconfiguration("Found no server icons!") + + meta_bytes = await self.fetch_file(contents["meta.md"].download_url) + + meta_file = self.parse_meta_file(meta_bytes) + + return Event(directory.path, meta_file, contents["banner.png"], list(server_icons.values())) + + async def get_events(self) -> t.List[Event]: + """ + Discover available events in the branding repository. + + Misconfigured events are skipped. May return an empty list in the catastrophic case. + """ + log.debug("Discovering events in branding repository.") + + try: + event_directories = await self.fetch_directory("events", types=("dir",)) # Skip files. + except Exception: + log.exception("Failed to fetch 'events' directory.") + return [] + + instances: t.List[Event] = [] + + for event_directory in event_directories.values(): + log.trace(f"Attempting to construct event from directory: '{event_directory.path}'.") + try: + instance = await self.construct_event(event_directory) + except Exception as exc: + log.warning(f"Could not construct event '{event_directory.path}'.", exc_info=exc) + else: + instances.append(instance) + + return instances + + async def get_current_event(self) -> t.Tuple[t.Optional[Event], t.List[Event]]: + """ + Get the currently active event, or the fallback event. + + The second return value is a list of all available events. The caller may discard it, if not needed. + Returning all events alongside the current one prevents having to query the API twice in some cases. + + The current event may be None in the case that no event is active, and no fallback event is found. + """ + utc_now = datetime.utcnow() + log.debug(f"Finding active event for: {utc_now}.") + + # Construct an object in the arbitrary year for the purpose of comparison. + lookup_now = date(year=ARBITRARY_YEAR, month=utc_now.month, day=utc_now.day) + log.trace(f"Lookup object in arbitrary year: {lookup_now}.") + + available_events = await self.get_events() + log.trace(f"Found {len(available_events)} available events.") + + for event in available_events: + meta = event.meta + if not meta.is_fallback and (meta.start_date <= lookup_now <= meta.end_date): + return event, available_events + + log.trace("No active event found. Looking for fallback event.") + + for event in available_events: + if event.meta.is_fallback: + return event, available_events + + log.warning("No event is currently active and no fallback event was found!") + return None, available_events diff --git a/bot/exts/backend/branding/_seasons.py b/bot/exts/backend/branding/_seasons.py deleted file mode 100644 index 5f6256b30..000000000 --- a/bot/exts/backend/branding/_seasons.py +++ /dev/null @@ -1,175 +0,0 @@ -import logging -import typing as t -from datetime import datetime - -from bot.constants import Colours -from bot.exts.backend.branding._constants import Month -from bot.exts.backend.branding._errors import BrandingError - -log = logging.getLogger(__name__) - - -class SeasonBase: - """ - Base for Seasonal classes. - - This serves as the off-season fallback for when no specific - seasons are active. - - Seasons are 'registered' simply by inheriting from `SeasonBase`. - We discover them by calling `__subclasses__`. - """ - - season_name: str = "Evergreen" - - colour: str = Colours.soft_green - description: str = "The default season!" - - branding_path: str = "seasonal/evergreen" - - months: t.Set[Month] = set(Month) - - -class Christmas(SeasonBase): - """Branding for December.""" - - season_name = "Festive season" - - colour = Colours.soft_red - description = ( - "The time is here to get into the festive spirit! No matter who you are, where you are, " - "or what beliefs you may follow, we hope every one of you enjoy this festive season!" - ) - - branding_path = "seasonal/christmas" - - months = {Month.DECEMBER} - - -class Easter(SeasonBase): - """Branding for April.""" - - season_name = "Easter" - - colour = Colours.bright_green - description = ( - "Bunny here, bunny there, bunny everywhere! Here at Python Discord, we celebrate " - "our version of Easter during the entire month of April." - ) - - branding_path = "seasonal/easter" - - months = {Month.APRIL} - - -class Halloween(SeasonBase): - """Branding for October.""" - - season_name = "Halloween" - - colour = Colours.orange - description = "Trick or treat?!" - - branding_path = "seasonal/halloween" - - months = {Month.OCTOBER} - - -class Pride(SeasonBase): - """Branding for June.""" - - season_name = "Pride" - - colour = Colours.pink - description = ( - "The month of June is a special month for us at Python Discord. It is very important to us " - "that everyone feels welcome here, no matter their origin, identity or sexuality. During the " - "month of June, while some of you are participating in Pride festivals across the world, " - "we will be celebrating individuality and commemorating the history and challenges " - "of the LGBTQ+ community with a Pride event of our own!" - ) - - branding_path = "seasonal/pride" - - months = {Month.JUNE} - - -class Valentines(SeasonBase): - """Branding for February.""" - - season_name = "Valentines" - - colour = Colours.pink - description = "Love is in the air!" - - branding_path = "seasonal/valentines" - - months = {Month.FEBRUARY} - - -class Wildcard(SeasonBase): - """Branding for August.""" - - season_name = "Wildcard" - - colour = Colours.purple - description = "A season full of surprises!" - - months = {Month.AUGUST} - - -def get_all_seasons() -> t.List[t.Type[SeasonBase]]: - """Give all available season classes.""" - return [SeasonBase] + SeasonBase.__subclasses__() - - -def get_current_season() -> t.Type[SeasonBase]: - """Give active season, based on current UTC month.""" - current_month = Month(datetime.utcnow().month) - - active_seasons = tuple( - season - for season in SeasonBase.__subclasses__() - if current_month in season.months - ) - - if not active_seasons: - return SeasonBase - - return active_seasons[0] - - -def get_season(name: str) -> t.Optional[t.Type[SeasonBase]]: - """ - Give season such that its class name or its `season_name` attr match `name` (caseless). - - If no such season exists, return None. - """ - name = name.casefold() - - for season in get_all_seasons(): - matches = (season.__name__.casefold(), season.season_name.casefold()) - - if name in matches: - return season - - -def _validate_season_overlap() -> None: - """ - Raise BrandingError if there are any colliding seasons. - - This serves as a local test to ensure that seasons haven't been misconfigured. - """ - month_to_season = {} - - for season in SeasonBase.__subclasses__(): - for month in season.months: - colliding_season = month_to_season.get(month) - - if colliding_season: - raise BrandingError(f"Season {season} collides with {colliding_season} in {month.name}") - else: - month_to_season[month] = season - - -_validate_season_overlap() diff --git a/bot/exts/backend/error_handler.py b/bot/exts/backend/error_handler.py index 9cb54cdab..76ab7dfc2 100644 --- a/bot/exts/backend/error_handler.py +++ b/bot/exts/backend/error_handler.py @@ -1,7 +1,6 @@ import contextlib import difflib import logging -import random import typing as t from discord import Embed @@ -10,10 +9,9 @@ from sentry_sdk import push_scope from bot.api import ResponseCodeError from bot.bot import Bot -from bot.constants import Colours, ERROR_REPLIES, Icons, MODERATION_ROLES +from bot.constants import Colours, Icons, MODERATION_ROLES from bot.converters import TagNameConverter from bot.errors import InvalidInfractedUser, LockedResourceError -from bot.exts.backend.branding._errors import BrandingError from bot.utils.checks import InWhitelistCheckFailure log = logging.getLogger(__name__) @@ -79,9 +77,6 @@ class ErrorHandler(Cog): await self.handle_api_error(ctx, e.original) elif isinstance(e.original, LockedResourceError): await ctx.send(f"{e.original} Please wait for it to finish and try again later.") - elif isinstance(e.original, BrandingError): - await ctx.send(embed=self._get_error_embed(random.choice(ERROR_REPLIES), str(e.original))) - return elif isinstance(e.original, InvalidInfractedUser): await ctx.send(f"Cannot infract that user. {e.original.reason}") else: |