diff options
Diffstat (limited to '')
| -rw-r--r-- | Pipfile | 1 | ||||
| -rw-r--r-- | Pipfile.lock | 10 | ||||
| -rw-r--r-- | bot/decorators.py | 23 | ||||
| -rw-r--r-- | bot/errors.py | 6 | ||||
| -rw-r--r-- | bot/exts/backend/branding/__init__.py | 6 | ||||
| -rw-r--r-- | bot/exts/backend/branding/_cog.py | 887 | ||||
| -rw-r--r-- | bot/exts/backend/branding/_constants.py | 51 | ||||
| -rw-r--r-- | bot/exts/backend/branding/_decorators.py | 27 | ||||
| -rw-r--r-- | bot/exts/backend/branding/_errors.py | 2 | ||||
| -rw-r--r-- | bot/exts/backend/branding/_repository.py | 240 | ||||
| -rw-r--r-- | bot/exts/backend/branding/_seasons.py | 175 | ||||
| -rw-r--r-- | bot/exts/backend/error_handler.py | 7 | 
12 files changed, 766 insertions, 669 deletions
| @@ -23,6 +23,7 @@ lxml = "~=4.4"  markdownify = "==0.5.3"  more_itertools = "~=8.2"  python-dateutil = "~=2.8" +python-frontmatter = "~=1.0.0"  pyyaml = "~=5.1"  requests = "~=2.22"  sentry-sdk = "~=0.19" diff --git a/Pipfile.lock b/Pipfile.lock index d16cef2a8..cbec48ef0 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@  {      "_meta": {          "hash": { -            "sha256": "e5b57ca7276af4709b345055d4b3705c4142c61c4669c796b79a73379ec37a9a" +            "sha256": "91b5639198b35740611e7ac923cfc262e5897b8cbc3ca243dc98335705804ba7"          },          "pipfile-spec": 6,          "requires": { @@ -613,6 +613,14 @@              "index": "pypi",              "version": "==2.8.1"          }, +        "python-frontmatter": { +            "hashes": [ +                "sha256:766ae75f1b301ffc5fe3494339147e0fd80bc3deff3d7590a93991978b579b08", +                "sha256:e98152e977225ddafea6f01f40b4b0f1de175766322004c826ca99842d19a7cd" +            ], +            "index": "pypi", +            "version": "==1.0.0" +        },          "pytz": {              "hashes": [                  "sha256:83a4a90894bf38e243cf052c8b58f381bfe9a7a483f6a9cab140bc7f702ac4da", diff --git a/bot/decorators.py b/bot/decorators.py index 063c8f878..0b50cc365 100644 --- a/bot/decorators.py +++ b/bot/decorators.py @@ -1,4 +1,5 @@  import asyncio +import functools  import logging  import typing as t  from contextlib import suppress @@ -8,7 +9,7 @@ from discord import Member, NotFound  from discord.ext import commands  from discord.ext.commands import Cog, Context -from bot.constants import Channels, RedirectOutput +from bot.constants import Channels, DEBUG_MODE, RedirectOutput  from bot.utils import function  from bot.utils.checks import in_whitelist_check @@ -153,3 +154,23 @@ def respect_role_hierarchy(member_arg: function.Argument) -> t.Callable:                  await func(*args, **kwargs)          return wrapper      return decorator + + +def mock_in_debug(return_value: t.Any) -> t.Callable: +    """ +    Short-circuit function execution if in debug mode and return `return_value`. + +    The original function name, and the incoming args and kwargs are DEBUG level logged +    upon each call. This is useful for expensive operations, i.e. media asset uploads +    that are prone to rate-limits but need to be tested extensively. +    """ +    def decorator(func: t.Callable) -> t.Callable: +        @functools.wraps(func) +        async def wrapped(*args, **kwargs) -> t.Any: +            """Short-circuit and log if in debug mode.""" +            if DEBUG_MODE: +                log.debug(f"Function {func.__name__} called with args: {args}, kwargs: {kwargs}") +                return return_value +            return await func(*args, **kwargs) +        return wrapped +    return decorator diff --git a/bot/errors.py b/bot/errors.py index ab0adcd42..3544c6320 100644 --- a/bot/errors.py +++ b/bot/errors.py @@ -35,3 +35,9 @@ class InvalidInfractedUser(Exception):          self.reason = reason          super().__init__(reason) + + +class BrandingMisconfiguration(RuntimeError): +    """Raised by the Branding cog when a misconfigured event is encountered.""" + +    pass diff --git a/bot/exts/backend/branding/__init__.py b/bot/exts/backend/branding/__init__.py index 81ea3bf49..20a747b7f 100644 --- a/bot/exts/backend/branding/__init__.py +++ b/bot/exts/backend/branding/__init__.py @@ -1,7 +1,7 @@  from bot.bot import Bot -from bot.exts.backend.branding._cog import BrandingManager +from bot.exts.backend.branding._cog import Branding  def setup(bot: Bot) -> None: -    """Loads BrandingManager cog.""" -    bot.add_cog(BrandingManager(bot)) +    """Load Branding cog.""" +    bot.add_cog(Branding(bot)) diff --git a/bot/exts/backend/branding/_cog.py b/bot/exts/backend/branding/_cog.py index 20df83a89..0a4ddcc88 100644 --- a/bot/exts/backend/branding/_cog.py +++ b/bot/exts/backend/branding/_cog.py @@ -1,566 +1,647 @@  import asyncio -import itertools +import contextlib  import logging  import random  import typing as t  from datetime import datetime, time, timedelta +from enum import Enum +from operator import attrgetter -import arrow  import async_timeout  import discord  from async_rediscache import RedisCache -from discord.ext import commands +from discord.ext import commands, tasks  from bot.bot import Bot -from bot.constants import Branding, Colours, Emojis, Guild, MODERATION_ROLES -from bot.exts.backend.branding import _constants, _decorators, _errors, _seasons +from bot.constants import Branding as BrandingConfig, Channels, Colours, Guild, MODERATION_ROLES +from bot.decorators import mock_in_debug +from bot.exts.backend.branding._repository import BrandingRepository, Event, RemoteObject  log = logging.getLogger(__name__) -class GitHubFile(t.NamedTuple): +class AssetType(Enum):      """ -    Represents a remote file on GitHub. +    Recognised Discord guild asset types. -    The `sha` hash is kept so that we can determine that a file has changed, -    despite its filename remaining unchanged. +    The value of each member corresponds exactly to a kwarg that can be passed to `Guild.edit`.      """ -    download_url: str -    path: str -    sha: str +    BANNER = "banner" +    ICON = "icon" -def pretty_files(files: t.Iterable[GitHubFile]) -> str: -    """Provide a human-friendly representation of `files`.""" -    return "\n".join(file.path for file in files) +def compound_hash(objects: t.Iterable[RemoteObject]) -> str: +    """ +    Join SHA attributes of `objects` into a single string. + +    Compound hashes are cached to check for change in any of the member `objects`. +    """ +    return "-".join(item.sha for item in objects) + + +def make_embed(title: str, description: str, *, success: bool) -> discord.Embed: +    """ +    Construct simple response embed. + +    If `success` is True, use green colour, otherwise red. + +    For both `title` and `description`, empty string are valid values ~ fields will be empty. +    """ +    colour = Colours.soft_green if success else Colours.soft_red +    return discord.Embed(title=title[:256], description=description[:2048], colour=colour) -def time_until_midnight() -> timedelta: +def extract_event_duration(event: Event) -> str:      """ -    Determine amount of time until the next-up UTC midnight. +    Extract a human-readable, year-agnostic duration string from `event`. -    The exact `midnight` moment is actually delayed to 5 seconds after, in order -    to avoid potential problems due to imprecise sleep. +    In the case that `event` is a fallback event, resolves to 'Fallback'.      """ -    now = datetime.utcnow() -    tomorrow = now + timedelta(days=1) -    midnight = datetime.combine(tomorrow, time(second=5)) +    if event.meta.is_fallback: +        return "Fallback" -    return midnight - now +    fmt = "%B %d"  # Ex: August 23 +    start_date = event.meta.start_date.strftime(fmt) +    end_date = event.meta.end_date.strftime(fmt) +    return f"{start_date} - {end_date}" -class BrandingManager(commands.Cog): + +def extract_event_name(event: Event) -> str:      """ -    Manages the guild's branding. - -    The purpose of this cog is to help automate the synchronization of the branding -    repository with the guild. It is capable of discovering assets in the repository -    via GitHub's API, resolving download urls for them, and delegating -    to the `bot` instance to upload them to the guild. - -    BrandingManager is designed to be entirely autonomous. Its `daemon` background task awakens -    once a day (see `time_until_midnight`) to detect new seasons, or to cycle icons within a single -    season. The daemon can be turned on and off via the `daemon` cmd group. The value set via -    its `start` and `stop` commands is persisted across sessions. If turned on, the daemon will -    automatically start on the next bot start-up. Otherwise, it will wait to be started manually. - -    All supported operations, e.g. setting seasons, applying the branding, or cycling icons, can -    also be invoked manually, via the following API: - -        branding list -            - Show all available seasons - -        branding set <season_name> -            - Set the cog's internal state to represent `season_name`, if it exists -            - If no `season_name` is given, set chronologically current season -            - This will not automatically apply the season's branding to the guild, -              the cog's state can be detached from the guild -            - Seasons can therefore be 'previewed' using this command - -        branding info -            - View detailed information about resolved assets for current season - -        branding refresh -            - Refresh internal state, i.e. synchronize with branding repository - -        branding apply -            - Apply the current internal state to the guild, i.e. upload the assets - -        branding cycle -            - If there are multiple available icons for current season, randomly pick -              and apply the next one - -    The daemon calls these methods autonomously as appropriate. The use of this cog -    is locked to moderation roles. As it performs media asset uploads, it is prone to -    rate-limits - the `apply` command should be used with caution. The `set` command can, -    however, be used freely to 'preview' seasonal branding and check whether paths have been -    resolved as appropriate. - -    While the bot is in debug mode, it will 'mock' asset uploads by logging the passed -    download urls and pretending that the upload was successful. Make use of this -    to test this cog's behaviour. +    Extract title-cased event name from the path of `event`. + +    An event with a path of 'events/black_history_month' will resolve to 'Black History Month'.      """ +    name = event.path.split("/")[-1]  # Inner-most directory name. +    words = name.split("_")  # Words from snake case. -    current_season: t.Type[_seasons.SeasonBase] +    return " ".join(word.title() for word in words) -    banner: t.Optional[GitHubFile] -    available_icons: t.List[GitHubFile] -    remaining_icons: t.List[GitHubFile] +class Branding(commands.Cog): +    """ +    Guild branding management. -    days_since_cycle: t.Iterator +    Extension responsible for automatic synchronisation of the guild's branding with the branding repository. +    Event definitions and assets are automatically discovered and applied as appropriate. -    daemon: t.Optional[asyncio.Task] +    All state is stored in Redis. The cog should therefore seamlessly transition across restarts and maintain +    a consistent icon rotation schedule for events with multiple icon assets. -    # Branding configuration -    branding_configuration = RedisCache() +    By caching hashes of banner & icon assets, we discover changes in currently applied assets and always keep +    the latest version applied. + +    The command interface allows moderators+ to control the daemon or request asset synchronisation, while +    regular users can see information about the current event and the overall event schedule. +    """ + +    # RedisCache[ +    #     "daemon_active": bool            | If True, daemon starts on start-up. Controlled via commands. +    #     "event_path": str                | Current event's path in the branding repo. +    #     "event_description": str         | Current event's Markdown description. +    #     "event_duration": str            | Current event's human-readable date range. +    #     "banner_hash": str               | SHA of the currently applied banner. +    #     "icons_hash": str                | Compound SHA of all icons in current rotation. +    #     "last_rotation_timestamp": float | POSIX UTC timestamp. +    # ] +    cache_information = RedisCache() + +    # Icons in current rotation. Keys (str) are download URLs, values (int) track the amount of times each +    # icon has been used in the current rotation. +    cache_icons = RedisCache() + +    # All available event names & durations. Cached by the daemon nightly; read by the calendar command. +    cache_events = RedisCache()      def __init__(self, bot: Bot) -> None: +        """Instantiate repository abstraction & allow daemon to start.""" +        self.bot = bot +        self.repository = BrandingRepository(bot) + +        self.bot.loop.create_task(self.maybe_start_daemon())  # Start depending on cache. + +    # region: Internal logic & state management + +    @mock_in_debug(return_value=True)  # Mocked in development environment to prevent API spam. +    async def apply_asset(self, asset_type: AssetType, download_url: str) -> bool:          """ -        Assign safe default values on init. +        Download asset from `download_url` and apply it to PyDis as `asset_type`. -        At this point, we don't have information about currently available branding. -        Most of these attributes will be overwritten once the daemon connects, or once -        the `refresh` command is used. +        Return a boolean indicating whether the application was successful.          """ -        self.bot = bot -        self.current_season = _seasons.get_current_season() +        log.info(f"Applying '{asset_type.value}' asset to the guild.") -        self.banner = None +        try: +            file = await self.repository.fetch_file(download_url) +        except Exception: +            log.exception(f"Failed to fetch '{asset_type.value}' asset.") +            return False -        self.available_icons = [] -        self.remaining_icons = [] +        await self.bot.wait_until_guild_available() +        pydis: discord.Guild = self.bot.get_guild(Guild.id) -        self.days_since_cycle = itertools.cycle([None]) +        timeout = 10  # Seconds. +        try: +            with async_timeout.timeout(timeout):  # Raise after `timeout` seconds. +                await pydis.edit(**{asset_type.value: file}) +        except discord.HTTPException: +            log.exception("Asset upload to Discord failed.") +            return False +        except asyncio.TimeoutError: +            log.error(f"Asset upload to Discord timed out after {timeout} seconds.") +            return False +        else: +            log.trace("Asset uploaded successfully.") +            return True -        self.daemon = None -        self._startup_task = self.bot.loop.create_task(self._initial_start_daemon()) +    async def apply_banner(self, banner: RemoteObject) -> bool: +        """ +        Apply `banner` to the guild and cache its hash if successful. + +        Banners should always be applied via this method to ensure that the last hash is cached. + +        Return a boolean indicating whether the application was successful. +        """ +        success = await self.apply_asset(AssetType.BANNER, banner.download_url) -    async def _initial_start_daemon(self) -> None: -        """Checks is daemon active and when is, start it at cog load.""" -        if await self.branding_configuration.get("daemon_active"): -            self.daemon = self.bot.loop.create_task(self._daemon_func()) +        if success: +            await self.cache_information.set("banner_hash", banner.sha) -    @property -    def _daemon_running(self) -> bool: -        """True if the daemon is currently active, False otherwise.""" -        return self.daemon is not None and not self.daemon.done() +        return success -    async def _daemon_func(self) -> None: +    async def rotate_icons(self) -> bool:          """ -        Manage all automated behaviour of the BrandingManager cog. +        Choose and apply the next-up icon in rotation. + +        We keep track of the amount of times each icon has been used. The values in `cache_icons` can be understood +        to be iteration IDs. When an icon is chosen & applied, we bump its count, pushing it into the next iteration. -        Once a day, the daemon will perform the following tasks: -            - Update `current_season` -            - Poll GitHub API to see if the available branding for `current_season` has changed -            - Update assets if changes are detected (banner, guild icon, bot avatar, bot nickname) -            - Check whether it's time to cycle guild icons +        Once the current iteration (lowest count in the cache) depletes, we move onto the next iteration. -        The internal loop runs once when activated, then periodically at the time -        given by `time_until_midnight`. +        In the case that there is only 1 icon in the rotation and has already been applied, do nothing. -        All method calls in the internal loop are considered safe, i.e. no errors propagate -        to the daemon's loop. The daemon itself does not perform any error handling on its own. +        Return a boolean indicating whether a new icon was applied successfully.          """ -        await self.bot.wait_until_guild_available() +        log.debug("Rotating icons.") -        while True: -            self.current_season = _seasons.get_current_season() -            branding_changed = await self.refresh() +        state = await self.cache_icons.to_dict() +        log.trace(f"Total icons in rotation: {len(state)}.") -            if branding_changed: -                await self.apply() +        if not state:  # This would only happen if rotation not initiated, but we can handle gracefully. +            log.warning("Attempted icon rotation with an empty icon cache. This indicates wrong logic.") +            return False -            elif next(self.days_since_cycle) == Branding.cycle_frequency: -                await self.cycle() +        if len(state) == 1 and 1 in state.values(): +            log.debug("Aborting icon rotation: only 1 icon is available and has already been applied.") +            return False -            until_midnight = time_until_midnight() -            await asyncio.sleep(until_midnight.total_seconds()) +        current_iteration = min(state.values())  # Choose iteration to draw from. +        options = [download_url for download_url, times_used in state.items() if times_used == current_iteration] -    async def _info_embed(self) -> discord.Embed: -        """Make an informative embed representing current season.""" -        info_embed = discord.Embed(description=self.current_season.description, colour=self.current_season.colour) +        log.trace(f"Choosing from {len(options)} icons in iteration {current_iteration}.") +        next_icon = random.choice(options) -        # If we're in a non-evergreen season, also show active months -        if self.current_season is not _seasons.SeasonBase: -            title = f"{self.current_season.season_name} ({', '.join(str(m) for m in self.current_season.months)})" -        else: -            title = self.current_season.season_name +        success = await self.apply_asset(AssetType.ICON, next_icon) + +        if success: +            await self.cache_icons.increment(next_icon)  # Push the icon into the next iteration. + +            timestamp = datetime.utcnow().timestamp() +            await self.cache_information.set("last_rotation_timestamp", timestamp) + +        return success -        # Use the author field to show the season's name and avatar if available -        info_embed.set_author(name=title) +    async def maybe_rotate_icons(self) -> None: +        """ +        Call `rotate_icons` if the configured amount of time has passed since last rotation. -        banner = self.banner.path if self.banner is not None else "Unavailable" -        info_embed.add_field(name="Banner", value=banner, inline=False) +        We offset the calculated time difference into the future to avoid off-by-a-little-bit errors. Because there +        is work to be done before the timestamp is read and written, the next read will likely commence slightly +        under 24 hours after the last write. +        """ +        log.debug("Checking whether it's time for icons to rotate.") -        icons = pretty_files(self.available_icons) or "Unavailable" -        info_embed.add_field(name="Available icons", value=icons, inline=False) +        last_rotation_timestamp = await self.cache_information.get("last_rotation_timestamp") -        # Only display cycle frequency if we're actually cycling -        if len(self.available_icons) > 1 and Branding.cycle_frequency: -            info_embed.set_footer(text=f"Icon cycle frequency: {Branding.cycle_frequency}") +        if last_rotation_timestamp is None:  # Maiden case ~ never rotated. +            await self.rotate_icons() +            return -        return info_embed +        last_rotation = datetime.fromtimestamp(last_rotation_timestamp) +        difference = (datetime.utcnow() - last_rotation) + timedelta(minutes=5) -    async def _reset_remaining_icons(self) -> None: -        """Set `remaining_icons` to a shuffled copy of `available_icons`.""" -        self.remaining_icons = random.sample(self.available_icons, k=len(self.available_icons)) +        log.trace(f"Icons last rotated at {last_rotation} (difference: {difference}).") -    async def _reset_days_since_cycle(self) -> None: +        if difference.days >= BrandingConfig.cycle_frequency: +            await self.rotate_icons() + +    async def initiate_icon_rotation(self, available_icons: t.List[RemoteObject]) -> None:          """ -        Reset the `days_since_cycle` iterator based on configured frequency. +        Set up a new icon rotation. -        If the current season only has 1 icon, or if `Branding.cycle_frequency` is falsey, -        the iterator will always yield None. This signals that the icon shouldn't be cycled. +        This function should be called whenever available icons change. This is generally the case when we enter +        a new event, but potentially also when the assets of an on-going event change. In such cases, a reset +        of `cache_icons` is necessary, because it contains download URLs which may have gotten stale. -        Otherwise, it will yield ints in range [1, `Branding.cycle_frequency`] indefinitely. -        When the iterator yields a value equal to `Branding.cycle_frequency`, it is time to cycle. +        This function does not upload a new icon!          """ -        if len(self.available_icons) > 1 and Branding.cycle_frequency: -            sequence = range(1, Branding.cycle_frequency + 1) -        else: -            sequence = [None] +        log.debug("Initiating new icon rotation.") -        self.days_since_cycle = itertools.cycle(sequence) +        await self.cache_icons.clear() -    async def _get_files(self, path: str, include_dirs: bool = False) -> t.Dict[str, GitHubFile]: +        new_state = {icon.download_url: 0 for icon in available_icons} +        await self.cache_icons.update(new_state) + +        log.trace(f"Icon rotation initiated for {len(new_state)} icons.") + +        await self.cache_information.set("icons_hash", compound_hash(available_icons)) + +    async def send_info_embed(self, channel_id: int, *, is_notification: bool) -> None:          """ -        Get files at `path` in the branding repository. +        Send the currently cached event description to `channel_id`. -        If `include_dirs` is False (default), only returns files at `path`. -        Otherwise, will return both files and directories. Never returns symlinks. +        When `is_notification` holds, a short contextual message for the #changelog channel is added. -        Return dict mapping from filename to corresponding `GitHubFile` instance. -        This may return an empty dict if the response status is non-200, -        or if the target directory is empty. +        We read event information from `cache_information`. The caller is therefore responsible for making +        sure that the cache is up-to-date before calling this function.          """ -        url = f"{_constants.BRANDING_URL}/{path}" -        async with self.bot.http_session.get( -            url, headers=_constants.HEADERS, params=_constants.PARAMS -        ) as resp: -            # Short-circuit if we get non-200 response -            if resp.status != _constants.STATUS_OK: -                log.error(f"GitHub API returned non-200 response: {resp}") -                return {} -            directory = await resp.json()  # Directory at `path` +        log.debug(f"Sending event information event to channel: {channel_id} ({is_notification=}).") -        allowed_types = {"file", "dir"} if include_dirs else {"file"} -        return { -            file["name"]: GitHubFile(file["download_url"], file["path"], file["sha"]) -            for file in directory -            if file["type"] in allowed_types -        } +        await self.bot.wait_until_guild_available() +        channel: t.Optional[discord.TextChannel] = self.bot.get_channel(channel_id) -    async def refresh(self) -> bool: +        if channel is None: +            log.warning(f"Cannot send event information: channel {channel_id} not found!") +            return + +        log.trace(f"Destination channel: #{channel.name}.") + +        description = await self.cache_information.get("event_description") +        duration = await self.cache_information.get("event_duration") + +        if None in (description, duration): +            content = None +            embed = make_embed("No event in cache", "Is the daemon enabled?", success=False) + +        else: +            content = "Python Discord is entering a new event!" if is_notification else None +            embed = discord.Embed(description=description[:2048], colour=discord.Colour.blurple()) +            embed.set_footer(text=duration[:2048]) + +        await channel.send(content=content, embed=embed) + +    async def enter_event(self, event: Event) -> t.Tuple[bool, bool]:          """ -        Synchronize available assets with branding repository. +        Apply `event` assets and update information cache. -        If the current season is not the evergreen, and lacks at least one asset, -        we use the evergreen seasonal dir as fallback for missing assets. +        We cache `event` information to ensure that we: +        * Remember which event we're currently in across restarts +        * Provide an on-demand informational embed without re-querying the branding repository -        Finally, if neither the seasonal nor fallback branding directories contain -        an asset, it will simply be ignored. +        An event change should always be handled via this function, as it ensures that the cache is populated. -        Return True if the branding has changed. This will be the case when we enter -        a new season, or when something changes in the current seasons's directory -        in the branding repository. +        The #changelog notification is omitted when `event` is fallback, or already applied. + +        Return a 2-tuple indicating whether the banner, and the icon, were applied successfully.          """ -        old_branding = (self.banner, self.available_icons) -        seasonal_dir = await self._get_files(self.current_season.branding_path, include_dirs=True) +        log.info(f"Entering event: '{event.path}'.") -        # Only make a call to the fallback directory if there is something to be gained -        branding_incomplete = any( -            asset not in seasonal_dir -            for asset in (_constants.FILE_BANNER, _constants.FILE_AVATAR, _constants.SERVER_ICONS) -        ) -        if branding_incomplete and self.current_season is not _seasons.SeasonBase: -            fallback_dir = await self._get_files( -                _seasons.SeasonBase.branding_path, include_dirs=True -            ) -        else: -            fallback_dir = {} +        banner_success = await self.apply_banner(event.banner)  # Only one asset ~ apply directly. -        # Resolve assets in this directory, None is a safe value -        self.banner = ( -            seasonal_dir.get(_constants.FILE_BANNER) -            or fallback_dir.get(_constants.FILE_BANNER) -        ) +        await self.initiate_icon_rotation(event.icons)  # Prepare a new rotation. +        icon_success = await self.rotate_icons()  # Apply an icon from the new rotation. -        # Now resolve server icons by making a call to the proper sub-directory -        if _constants.SERVER_ICONS in seasonal_dir: -            icons_dir = await self._get_files( -                f"{self.current_season.branding_path}/{_constants.SERVER_ICONS}" -            ) -            self.available_icons = list(icons_dir.values()) +        # This will only be False in the case of a manual same-event re-synchronisation. +        event_changed = event.path != await self.cache_information.get("event_path") -        elif _constants.SERVER_ICONS in fallback_dir: -            icons_dir = await self._get_files( -                f"{_seasons.SeasonBase.branding_path}/{_constants.SERVER_ICONS}" -            ) -            self.available_icons = list(icons_dir.values()) +        # Cache event identity to avoid re-entry in case of restart. +        await self.cache_information.set("event_path", event.path) +        # Cache information shown in the 'about' embed. +        await self.populate_cache_event_description(event) + +        # Notify guild of new event ~ this reads the information that we cached above. +        if event_changed and not event.meta.is_fallback: +            await self.send_info_embed(Channels.change_log, is_notification=True)          else: -            self.available_icons = []  # This should never be the case, but an empty list is a safe value +            log.trace("Omitting #changelog notification. Event has not changed, or new event is fallback.") -        # GitHubFile instances carry a `sha` attr so this will pick up if a file changes -        branding_changed = old_branding != (self.banner, self.available_icons) +        return banner_success, icon_success -        if branding_changed: -            log.info(f"New branding detected (season: {self.current_season.season_name})") -            await self._reset_remaining_icons() -            await self._reset_days_since_cycle() +    async def synchronise(self) -> t.Tuple[bool, bool]: +        """ +        Fetch the current event and delegate to `enter_event`. -        return branding_changed +        This is a convenience function to force synchronisation via a command. It should generally only be used +        in a recovery scenario. In the usual case, the daemon already has an `Event` instance and can pass it +        to `enter_event` directly. -    async def cycle(self) -> bool: +        Return a 2-tuple indicating whether the banner, and the icon, were applied successfully.          """ -        Apply the next-up server icon. +        log.debug("Synchronise: fetching current event.") -        Returns True if an icon is available and successfully gets applied, False otherwise. -        """ -        if not self.available_icons: -            log.info("Cannot cycle: no icons for this season") -            return False +        current_event, available_events = await self.repository.get_current_event() -        if not self.remaining_icons: -            log.info("Reset & shuffle remaining icons") -            await self._reset_remaining_icons() +        await self.populate_cache_events(available_events) -        next_up = self.remaining_icons.pop(0) -        success = await self.set_icon(next_up.download_url) +        if current_event is None: +            log.error("Failed to fetch event. Cannot synchronise!") +            return False, False -        return success +        return await self.enter_event(current_event) -    async def apply(self) -> t.List[str]: +    async def populate_cache_events(self, events: t.List[Event]) -> None:          """ -        Apply current branding to the guild and bot. - -        This delegates to the bot instance to do all the work. We only provide download urls -        for available assets. Assets unavailable in the branding repo will be ignored. +        Clear `cache_events` and re-populate with names and durations of `events`. -        Returns a list of names of all failed assets. An asset is considered failed -        if it isn't found in the branding repo, or if something goes wrong while the -        bot is trying to apply it. +        For each event, we store its name and duration string. This is the information presented to users in the +        calendar command. If a format change is needed, it has to be done here. -        An empty list denotes that all assets have been applied successfully. +        The cache does not store the fallback event, as it is not shown in the calendar.          """ -        report = {asset: False for asset in ("banner", "icon")} +        log.debug("Populating events cache.") -        if self.banner is not None: -            report["banner"] = await self.set_banner(self.banner.download_url) +        await self.cache_events.clear() -        report["icon"] = await self.cycle() +        no_fallback = [event for event in events if not event.meta.is_fallback] +        chronological_events = sorted(no_fallback, key=attrgetter("meta.start_date")) -        failed_assets = [asset for asset, succeeded in report.items() if not succeeded] -        return failed_assets +        log.trace(f"Writing {len(chronological_events)} events (fallback omitted).") -    @commands.has_any_role(*MODERATION_ROLES) -    @commands.group(name="branding") -    async def branding_cmds(self, ctx: commands.Context) -> None: -        """Manual branding control.""" -        if not ctx.invoked_subcommand: -            await ctx.send_help(ctx.command) +        with contextlib.suppress(ValueError):  # Cache raises when updated with an empty dict. +            await self.cache_events.update({ +                extract_event_name(event): extract_event_duration(event) +                for event in chronological_events +            }) -    @branding_cmds.command(name="list", aliases=["ls"]) -    async def branding_list(self, ctx: commands.Context) -> None: -        """List all available seasons and branding sources.""" -        embed = discord.Embed(title="Available seasons", colour=Colours.soft_green) +    async def populate_cache_event_description(self, event: Event) -> None: +        """ +        Cache `event` description & duration. -        for season in _seasons.get_all_seasons(): -            if season is _seasons.SeasonBase: -                active_when = "always" -            else: -                active_when = f"in {', '.join(str(m) for m in season.months)}" +        This should be called when entering a new event, and can be called periodically to ensure that the cache +        holds fresh information in the case that the event remains the same, but its description changes. -            description = ( -                f"Active {active_when}\n" -                f"Branding: {season.branding_path}" -            ) -            embed.add_field(name=season.season_name, value=description, inline=False) +        The duration is stored formatted for the frontend. It is not intended to be used programmatically. +        """ +        log.debug("Caching event description & duration.") -        await ctx.send(embed=embed) +        await self.cache_information.set("event_description", event.meta.description) +        await self.cache_information.set("event_duration", extract_event_duration(event)) -    @branding_cmds.command(name="set") -    async def branding_set(self, ctx: commands.Context, *, season_name: t.Optional[str] = None) -> None: +    # endregion +    # region: Daemon + +    async def maybe_start_daemon(self) -> None:          """ -        Manually set season, or reset to current if none given. +        Start the daemon depending on cache state. -        Season search is a case-less comparison against both seasonal class name, -        and its `season_name` attr. +        The daemon will only start if it has been explicitly enabled via a command. +        """ +        log.debug("Checking whether daemon should start.") -        This only pre-loads the cog's internal state to the chosen season, but does not -        automatically apply the branding. As that is an expensive operation, the `apply` -        command must be called explicitly after this command finishes. +        should_begin: t.Optional[bool] = await self.cache_information.get("daemon_active")  # None if never set! -        This means that this command can be used to 'preview' a season gathering info -        about its available assets, without applying them to the guild. +        if should_begin: +            self.daemon_loop.start() -        If the daemon is running, it will automatically reset the season to current when -        it wakes up. The season set via this command can therefore remain 'detached' from -        what it should be - the daemon will make sure that it's set back properly. +    def cog_unload(self) -> None:          """ -        if season_name is None: -            new_season = _seasons.get_current_season() -        else: -            new_season = _seasons.get_season(season_name) -            if new_season is None: -                raise _errors.BrandingError("No such season exists") +        Cancel the daemon in case of cog unload. -        if self.current_season is new_season: -            raise _errors.BrandingError(f"Season {self.current_season.season_name} already active") +        This is **not** done automatically! The daemon otherwise remains active in the background. +        """ +        log.debug("Cog unload: cancelling daemon.") -        self.current_season = new_season -        await self.branding_refresh(ctx) +        self.daemon_loop.cancel() -    @branding_cmds.command(name="info", aliases=["status"]) -    async def branding_info(self, ctx: commands.Context) -> None: +    async def daemon_main(self) -> None:          """ -        Show available assets for current season. +        Synchronise guild & caches with branding repository. -        This can be used to confirm that assets have been resolved properly. -        When `apply` is used, it attempts to upload exactly the assets listed here. +        Pull the currently active event from the branding repository and check whether it matches the currently +        active event in the cache. If not, apply the new event. + +        However, it is also possible that an event's assets change as it's active. To account for such cases, +        we check the banner & icons hashes against the currently cached values. If there is a mismatch, each +        specific asset is re-applied.          """ -        await ctx.send(embed=await self._info_embed()) +        log.info("Daemon main: checking current event.") -    @branding_cmds.command(name="refresh") -    async def branding_refresh(self, ctx: commands.Context) -> None: -        """Sync currently available assets with branding repository.""" -        async with ctx.typing(): -            await self.refresh() -            await self.branding_info(ctx) +        new_event, available_events = await self.repository.get_current_event() + +        await self.populate_cache_events(available_events) + +        if new_event is None: +            log.warning("Daemon main: failed to get current event from branding repository, will do nothing.") +            return + +        if new_event.path != await self.cache_information.get("event_path"): +            log.debug("Daemon main: new event detected!") +            await self.enter_event(new_event) +            return + +        await self.populate_cache_event_description(new_event)  # Cache fresh frontend info in case of change. -    @branding_cmds.command(name="apply") -    async def branding_apply(self, ctx: commands.Context) -> None: +        log.trace("Daemon main: event has not changed, checking for change in assets.") + +        if new_event.banner.sha != await self.cache_information.get("banner_hash"): +            log.debug("Daemon main: detected banner change.") +            await self.apply_banner(new_event.banner) + +        if compound_hash(new_event.icons) != await self.cache_information.get("icons_hash"): +            log.debug("Daemon main: detected icon change.") +            await self.initiate_icon_rotation(new_event.icons) +            await self.rotate_icons() +        else: +            await self.maybe_rotate_icons() + +    @tasks.loop(hours=24) +    async def daemon_loop(self) -> None:          """ -        Apply current season's branding to the guild. +        Call `daemon_main` every 24 hours. -        Use `info` to check which assets will be applied. Shows which assets have -        failed to be applied, if any. +        The scheduler maintains an exact 24-hour frequency even if this coroutine takes time to complete. If the +        coroutine is started at 00:01 and completes at 00:05, it will still be started at 00:01 the next day.          """ -        async with ctx.typing(): -            failed_assets = await self.apply() -            if failed_assets: -                raise _errors.BrandingError( -                    f"Failed to apply following assets: {', '.join(failed_assets)}" -                ) +        log.trace("Daemon loop: calling daemon main.") -            response = discord.Embed(description=f"All assets applied {Emojis.ok_hand}", colour=Colours.soft_green) -            await ctx.send(embed=response) +        try: +            await self.daemon_main() +        except Exception: +            log.exception("Daemon loop: failed with an unhandled exception!") -    @branding_cmds.command(name="cycle") -    async def branding_cycle(self, ctx: commands.Context) -> None: +    @daemon_loop.before_loop +    async def daemon_before(self) -> None:          """ -        Apply the next-up guild icon, if multiple are available. +        Call `daemon_loop` immediately, then block the loop until the next-up UTC midnight. -        The order is random. +        The first iteration is invoked directly such that synchronisation happens immediately after daemon start. +        We then calculate the time until the next-up midnight and sleep before letting `daemon_loop` begin.          """ -        async with ctx.typing(): -            success = await self.cycle() -            if not success: -                raise _errors.BrandingError("Failed to cycle icon") +        log.trace("Daemon before: performing start-up iteration.") + +        await self.daemon_loop() + +        log.trace("Daemon before: calculating time to sleep before loop begins.") +        now = datetime.utcnow() -            response = discord.Embed(description=f"Success {Emojis.ok_hand}", colour=Colours.soft_green) -            await ctx.send(embed=response) +        # The actual midnight moment is offset into the future to prevent issues with imprecise sleep. +        tomorrow = now + timedelta(days=1) +        midnight = datetime.combine(tomorrow, time(minute=1)) -    @branding_cmds.group(name="daemon", aliases=["d", "task"]) -    async def daemon_group(self, ctx: commands.Context) -> None: -        """Control the background daemon.""" +        sleep_secs = (midnight - now).total_seconds() +        log.trace(f"Daemon before: sleeping {sleep_secs} seconds before next-up midnight: {midnight}.") + +        await asyncio.sleep(sleep_secs) + +    # endregion +    # region: Command interface (branding) + +    @commands.group(name="branding") +    async def branding_group(self, ctx: commands.Context) -> None: +        """Control the branding cog."""          if not ctx.invoked_subcommand:              await ctx.send_help(ctx.command) -    @daemon_group.command(name="status") -    async def daemon_status(self, ctx: commands.Context) -> None: -        """Check whether daemon is currently active.""" -        if self._daemon_running: -            remaining_time = (arrow.utcnow() + time_until_midnight()).humanize() -            response = discord.Embed(description=f"Daemon running {Emojis.ok_hand}", colour=Colours.soft_green) -            response.set_footer(text=f"Next refresh {remaining_time}") -        else: -            response = discord.Embed(description="Daemon not running", colour=Colours.soft_red) +    @branding_group.command(name="about", aliases=("current", "event")) +    async def branding_about_cmd(self, ctx: commands.Context) -> None: +        """Show the current event's description and duration.""" +        await self.send_info_embed(ctx.channel.id, is_notification=False) -        await ctx.send(embed=response) +    @commands.has_any_role(*MODERATION_ROLES) +    @branding_group.command(name="sync") +    async def branding_sync_cmd(self, ctx: commands.Context) -> None: +        """ +        Force branding synchronisation. -    @daemon_group.command(name="start") -    async def daemon_start(self, ctx: commands.Context) -> None: -        """If the daemon isn't running, start it.""" -        if self._daemon_running: -            raise _errors.BrandingError("Daemon already running!") +        Show which assets have failed to synchronise, if any. +        """ +        async with ctx.typing(): +            banner_success, icon_success = await self.synchronise() -        self.daemon = self.bot.loop.create_task(self._daemon_func()) -        await self.branding_configuration.set("daemon_active", True) +        failed_assets = ", ".join( +            name +            for name, status in [("banner", banner_success), ("icon", icon_success)] +            if status is False +        ) -        response = discord.Embed(description=f"Daemon started {Emojis.ok_hand}", colour=Colours.soft_green) -        await ctx.send(embed=response) +        if failed_assets: +            resp = make_embed("Synchronisation unsuccessful", f"Failed to apply: {failed_assets}.", success=False) +            resp.set_footer(text="Check log for details.") +        else: +            resp = make_embed("Synchronisation successful", "Assets have been applied.", success=True) -    @daemon_group.command(name="stop") -    async def daemon_stop(self, ctx: commands.Context) -> None: -        """If the daemon is running, stop it.""" -        if not self._daemon_running: -            raise _errors.BrandingError("Daemon not running!") +        await ctx.send(embed=resp) -        self.daemon.cancel() -        await self.branding_configuration.set("daemon_active", False) +    # endregion +    # region: Command interface (branding calendar) + +    @branding_group.group(name="calendar", aliases=("schedule", "events")) +    async def branding_calendar_group(self, ctx: commands.Context) -> None: +        """ +        Show the current event calendar. -        response = discord.Embed(description=f"Daemon stopped {Emojis.ok_hand}", colour=Colours.soft_green) -        await ctx.send(embed=response) +        We draw event information from `cache_events` and use each key-value pair to create a field in the response +        embed. As such, we do not need to query the API to get event information. The cache is automatically +        re-populated by the daemon whenever it makes a request. A moderator+ can also explicitly request a cache +        refresh using the 'refresh' subcommand. -    async def _fetch_image(self, url: str) -> bytes: -        """Retrieve and read image from `url`.""" -        log.debug(f"Getting image from: {url}") -        async with self.bot.http_session.get(url) as resp: -            return await resp.read() +        Due to Discord limitations, we only show up to 25 events. This is entirely sufficient at the time of writing. +        In the case that we find ourselves with more than 25 events, a warning log will alert core devs. -    async def _apply_asset(self, target: discord.Guild, asset: _constants.AssetType, url: str) -> bool: +        In the future, we may be interested in a field-paginating solution.          """ -        Internal method for applying media assets to the guild. +        if ctx.invoked_subcommand: +            # If you're wondering why this works: when the 'refresh' subcommand eventually re-invokes +            # this group, the attribute will be automatically set to None by the framework. +            return + +        available_events = await self.cache_events.to_dict() +        log.trace(f"Found {len(available_events)} cached events available for calendar view.") + +        if not available_events: +            resp = make_embed("No events found!", "Cache may be empty, try `branding calendar refresh`.", success=False) +            await ctx.send(embed=resp) +            return + +        embed = discord.Embed(title="Current event calendar", colour=discord.Colour.blurple()) + +        # Because Discord embeds can only contain up to 25 fields, we only show the first 25. +        first_25 = list(available_events.items())[:25] -        This shouldn't be called directly. The purpose of this method is mainly generic -        error handling to reduce needless code repetition. +        if len(first_25) != len(available_events):  # Alert core devs that a paginating solution is now necessary. +            log.warning(f"There are {len(available_events)} events, but the calendar view can only display 25.") -        Return True if upload was successful, False otherwise. +        for name, duration in first_25: +            embed.add_field(name=name[:256], value=duration[:1024]) + +        embed.set_footer(text="Otherwise, the fallback season is used.") + +        await ctx.send(embed=embed) + +    @commands.has_any_role(*MODERATION_ROLES) +    @branding_calendar_group.command(name="refresh") +    async def branding_calendar_refresh_cmd(self, ctx: commands.Context) -> None:          """ -        log.info(f"Attempting to set {asset.name}: {url}") +        Refresh event cache and show current event calendar. -        kwargs = {asset.value: await self._fetch_image(url)} -        try: -            async with async_timeout.timeout(5): -                await target.edit(**kwargs) +        Supplementary subcommand allowing force-refreshing the event cache. Implemented as a subcommand because +        unlike the supergroup, it requires moderator privileges. +        """ +        log.info("Performing command-requested event cache refresh.") -        except asyncio.TimeoutError: -            log.info("Asset upload timed out") -            return False +        async with ctx.typing(): +            available_events = await self.repository.get_events() +            await self.populate_cache_events(available_events) -        except discord.HTTPException as discord_error: -            log.exception("Asset upload failed", exc_info=discord_error) -            return False +        await ctx.invoke(self.branding_calendar_group) + +    # endregion +    # region: Command interface (branding daemon) + +    @commands.has_any_role(*MODERATION_ROLES) +    @branding_group.group(name="daemon", aliases=("d",)) +    async def branding_daemon_group(self, ctx: commands.Context) -> None: +        """Control the branding cog's daemon.""" +        if not ctx.invoked_subcommand: +            await ctx.send_help(ctx.command) + +    @branding_daemon_group.command(name="enable", aliases=("start", "on")) +    async def branding_daemon_enable_cmd(self, ctx: commands.Context) -> None: +        """Enable the branding daemon.""" +        await self.cache_information.set("daemon_active", True) +        if self.daemon_loop.is_running(): +            resp = make_embed("Daemon is already enabled!", "", success=False)          else: -            log.info("Asset successfully applied") -            return True +            self.daemon_loop.start() +            resp = make_embed("Daemon enabled!", "It will now automatically awaken on start-up.", success=True) -    @_decorators.mock_in_debug(return_value=True) -    async def set_banner(self, url: str) -> bool: -        """Set the guild's banner to image at `url`.""" -        guild = self.bot.get_guild(Guild.id) -        if guild is None: -            log.info("Failed to get guild instance, aborting asset upload") -            return False +        await ctx.send(embed=resp) -        return await self._apply_asset(guild, _constants.AssetType.BANNER, url) +    @branding_daemon_group.command(name="disable", aliases=("stop", "off")) +    async def branding_daemon_disable_cmd(self, ctx: commands.Context) -> None: +        """Disable the branding daemon.""" +        await self.cache_information.set("daemon_active", False) -    @_decorators.mock_in_debug(return_value=True) -    async def set_icon(self, url: str) -> bool: -        """Sets the guild's icon to image at `url`.""" -        guild = self.bot.get_guild(Guild.id) -        if guild is None: -            log.info("Failed to get guild instance, aborting asset upload") -            return False +        if self.daemon_loop.is_running(): +            self.daemon_loop.cancel() +            resp = make_embed("Daemon disabled!", "It will not awaken on start-up.", success=True) +        else: +            resp = make_embed("Daemon is already disabled!", "", success=False) -        return await self._apply_asset(guild, _constants.AssetType.SERVER_ICON, url) +        await ctx.send(embed=resp) -    def cog_unload(self) -> None: -        """Cancels startup and daemon task.""" -        self._startup_task.cancel() -        if self.daemon is not None: -            self.daemon.cancel() +    @branding_daemon_group.command(name="status") +    async def branding_daemon_status_cmd(self, ctx: commands.Context) -> None: +        """Check whether the daemon is currently enabled.""" +        if self.daemon_loop.is_running(): +            resp = make_embed("Daemon is enabled", "Use `branding daemon disable` to stop.", success=True) +        else: +            resp = make_embed("Daemon is disabled", "Use `branding daemon enable` to start.", success=False) + +        await ctx.send(embed=resp) + +    # endregion diff --git a/bot/exts/backend/branding/_constants.py b/bot/exts/backend/branding/_constants.py deleted file mode 100644 index ca8e8c5f5..000000000 --- a/bot/exts/backend/branding/_constants.py +++ /dev/null @@ -1,51 +0,0 @@ -from enum import Enum, IntEnum - -from bot.constants import Keys - - -class Month(IntEnum): -    """All month constants for seasons.""" - -    JANUARY = 1 -    FEBRUARY = 2 -    MARCH = 3 -    APRIL = 4 -    MAY = 5 -    JUNE = 6 -    JULY = 7 -    AUGUST = 8 -    SEPTEMBER = 9 -    OCTOBER = 10 -    NOVEMBER = 11 -    DECEMBER = 12 - -    def __str__(self) -> str: -        return self.name.title() - - -class AssetType(Enum): -    """ -    Discord media assets. - -    The values match exactly the kwarg keys that can be passed to `Guild.edit`. -    """ - -    BANNER = "banner" -    SERVER_ICON = "icon" - - -STATUS_OK = 200  # HTTP status code - -FILE_BANNER = "banner.png" -FILE_AVATAR = "avatar.png" -SERVER_ICONS = "server_icons" - -BRANDING_URL = "https://api.github.com/repos/python-discord/branding/contents" - -PARAMS = {"ref": "main"}  # Target branch -HEADERS = {"Accept": "application/vnd.github.v3+json"}  # Ensure we use API v3 - -# A GitHub token is not necessary for the cog to operate, -# unauthorized requests are however limited to 60 per hour -if Keys.github: -    HEADERS["Authorization"] = f"token {Keys.github}" diff --git a/bot/exts/backend/branding/_decorators.py b/bot/exts/backend/branding/_decorators.py deleted file mode 100644 index 6a1e7e869..000000000 --- a/bot/exts/backend/branding/_decorators.py +++ /dev/null @@ -1,27 +0,0 @@ -import functools -import logging -import typing as t - -from bot.constants import DEBUG_MODE - -log = logging.getLogger(__name__) - - -def mock_in_debug(return_value: t.Any) -> t.Callable: -    """ -    Short-circuit function execution if in debug mode and return `return_value`. - -    The original function name, and the incoming args and kwargs are DEBUG level logged -    upon each call. This is useful for expensive operations, i.e. media asset uploads -    that are prone to rate-limits but need to be tested extensively. -    """ -    def decorator(func: t.Callable) -> t.Callable: -        @functools.wraps(func) -        async def wrapped(*args, **kwargs) -> t.Any: -            """Short-circuit and log if in debug mode.""" -            if DEBUG_MODE: -                log.debug(f"Function {func.__name__} called with args: {args}, kwargs: {kwargs}") -                return return_value -            return await func(*args, **kwargs) -        return wrapped -    return decorator diff --git a/bot/exts/backend/branding/_errors.py b/bot/exts/backend/branding/_errors.py deleted file mode 100644 index 7cd271af3..000000000 --- a/bot/exts/backend/branding/_errors.py +++ /dev/null @@ -1,2 +0,0 @@ -class BrandingError(Exception): -    """Exception raised by the BrandingManager cog.""" diff --git a/bot/exts/backend/branding/_repository.py b/bot/exts/backend/branding/_repository.py new file mode 100644 index 000000000..7b09d4641 --- /dev/null +++ b/bot/exts/backend/branding/_repository.py @@ -0,0 +1,240 @@ +import logging +import typing as t +from datetime import date, datetime + +import frontmatter + +from bot.bot import Bot +from bot.constants import Keys +from bot.errors import BrandingMisconfiguration + +# Base URL for requests into the branding repository. +BRANDING_URL = "https://api.github.com/repos/python-discord/branding/contents" + +PARAMS = {"ref": "main"}  # Target branch. +HEADERS = {"Accept": "application/vnd.github.v3+json"}  # Ensure we use API v3. + +# A GitHub token is not necessary. However, unauthorized requests are limited to 60 per hour. +if Keys.github: +    HEADERS["Authorization"] = f"token {Keys.github}" + +# Since event periods are year-agnostic, we parse them into `datetime` objects with a manually inserted year. +# Please note that this is intentionally a leap year to allow Feb 29 to be valid. +ARBITRARY_YEAR = 2020 + +# Format used to parse date strings after we inject `ARBITRARY_YEAR` at the end. +DATE_FMT = "%B %d %Y"  # Ex: July 10 2020 + +log = logging.getLogger(__name__) + + +class RemoteObject: +    """ +    Remote file or directory on GitHub. + +    The annotations match keys in the response JSON that we're interested in. +    """ + +    sha: str  # Hash helps us detect asset change. +    name: str  # Filename. +    path: str  # Path from repo root. +    type: str  # Either 'file' or 'dir'. +    download_url: t.Optional[str]  # If type is 'dir', this is None! + +    def __init__(self, dictionary: t.Dict[str, t.Any]) -> None: +        """Initialize by grabbing annotated attributes from `dictionary`.""" +        missing_keys = self.__annotations__.keys() - dictionary.keys() +        if missing_keys: +            raise KeyError(f"Fetched object lacks expected keys: {missing_keys}") +        for annotation in self.__annotations__: +            setattr(self, annotation, dictionary[annotation]) + + +class MetaFile(t.NamedTuple): +    """Attributes defined in a 'meta.md' file.""" + +    is_fallback: bool +    start_date: t.Optional[date] +    end_date: t.Optional[date] +    description: str  # Markdown event description. + + +class Event(t.NamedTuple): +    """Event defined in the branding repository.""" + +    path: str  # Path from repo root where event lives. This is the event's identity. +    meta: MetaFile +    banner: RemoteObject +    icons: t.List[RemoteObject] + +    def __str__(self) -> str: +        return f"<Event at '{self.path}'>" + + +class BrandingRepository: +    """ +    Branding repository abstraction. + +    This class represents the branding repository's main branch and exposes available events and assets +    as objects. It performs the necessary amount of validation to ensure that a misconfigured event +    isn't returned. Such events are simply ignored, and will be substituted with the fallback event, +    if available. Warning logs will inform core developers if a misconfigured event is encountered. + +    Colliding events cause no special behaviour. In such cases, the first found active event is returned. +    We work with the assumption that the branding repository checks for such conflicts and prevents them +    from reaching the main branch. + +    This class keeps no internal state. All `get_current_event` calls will result in GitHub API requests. +    The caller is therefore responsible for being responsible and caching information to prevent API abuse. + +    Requests are made using the HTTP session looked up on the bot instance. +    """ + +    def __init__(self, bot: Bot) -> None: +        self.bot = bot + +    async def fetch_directory(self, path: str, types: t.Container[str] = ("file", "dir")) -> t.Dict[str, RemoteObject]: +        """ +        Fetch directory found at `path` in the branding repository. + +        Raise an exception if the request fails, or if the response lacks the expected keys. + +        Passing custom `types` allows getting only files or directories. By default, both are included. +        """ +        full_url = f"{BRANDING_URL}/{path}" +        log.debug(f"Fetching directory from branding repository: '{full_url}'.") + +        async with self.bot.http_session.get(full_url, params=PARAMS, headers=HEADERS) as response: +            if response.status != 200: +                raise RuntimeError(f"Failed to fetch directory due to status: {response.status}") + +            log.debug("Fetch successful, reading JSON response.") +            json_directory = await response.json() + +        return {file["name"]: RemoteObject(file) for file in json_directory if file["type"] in types} + +    async def fetch_file(self, download_url: str) -> bytes: +        """ +        Fetch file as bytes from `download_url`. + +        Raise an exception if the request does not succeed. +        """ +        log.debug(f"Fetching file from branding repository: '{download_url}'.") + +        async with self.bot.http_session.get(download_url, params=PARAMS, headers=HEADERS) as response: +            if response.status != 200: +                raise RuntimeError(f"Failed to fetch file due to status: {response.status}") + +            log.debug("Fetch successful, reading payload.") +            return await response.read() + +    def parse_meta_file(self, raw_file: bytes) -> MetaFile: +        """ +        Parse a 'meta.md' file from raw bytes. + +        The caller is responsible for handling errors caused by misconfiguration. +        """ +        attrs, description = frontmatter.parse(raw_file, encoding="UTF-8") + +        if not description: +            raise BrandingMisconfiguration("No description found in 'meta.md'!") + +        if attrs.get("fallback", False): +            return MetaFile(is_fallback=True, start_date=None, end_date=None, description=description) + +        start_date_raw = attrs.get("start_date") +        end_date_raw = attrs.get("end_date") + +        if None in (start_date_raw, end_date_raw): +            raise BrandingMisconfiguration("Non-fallback event doesn't have start and end dates defined!") + +        # We extend the configured month & day with an arbitrary leap year, allowing a datetime object to exist. +        # This may raise errors if misconfigured. We let the caller handle such cases. +        start_date = datetime.strptime(f"{start_date_raw} {ARBITRARY_YEAR}", DATE_FMT).date() +        end_date = datetime.strptime(f"{end_date_raw} {ARBITRARY_YEAR}", DATE_FMT).date() + +        return MetaFile(is_fallback=False, start_date=start_date, end_date=end_date, description=description) + +    async def construct_event(self, directory: RemoteObject) -> Event: +        """ +        Construct an `Event` instance from an event `directory`. + +        The caller is responsible for handling errors caused by misconfiguration. +        """ +        contents = await self.fetch_directory(directory.path) + +        missing_assets = {"meta.md", "banner.png", "server_icons"} - contents.keys() + +        if missing_assets: +            raise BrandingMisconfiguration(f"Directory is missing following assets: {missing_assets}") + +        server_icons = await self.fetch_directory(contents["server_icons"].path, types=("file",)) + +        if len(server_icons) == 0: +            raise BrandingMisconfiguration("Found no server icons!") + +        meta_bytes = await self.fetch_file(contents["meta.md"].download_url) + +        meta_file = self.parse_meta_file(meta_bytes) + +        return Event(directory.path, meta_file, contents["banner.png"], list(server_icons.values())) + +    async def get_events(self) -> t.List[Event]: +        """ +        Discover available events in the branding repository. + +        Misconfigured events are skipped. May return an empty list in the catastrophic case. +        """ +        log.debug("Discovering events in branding repository.") + +        try: +            event_directories = await self.fetch_directory("events", types=("dir",))  # Skip files. +        except Exception: +            log.exception("Failed to fetch 'events' directory.") +            return [] + +        instances: t.List[Event] = [] + +        for event_directory in event_directories.values(): +            log.trace(f"Attempting to construct event from directory: '{event_directory.path}'.") +            try: +                instance = await self.construct_event(event_directory) +            except Exception as exc: +                log.warning(f"Could not construct event '{event_directory.path}'.", exc_info=exc) +            else: +                instances.append(instance) + +        return instances + +    async def get_current_event(self) -> t.Tuple[t.Optional[Event], t.List[Event]]: +        """ +        Get the currently active event, or the fallback event. + +        The second return value is a list of all available events. The caller may discard it, if not needed. +        Returning all events alongside the current one prevents having to query the API twice in some cases. + +        The current event may be None in the case that no event is active, and no fallback event is found. +        """ +        utc_now = datetime.utcnow() +        log.debug(f"Finding active event for: {utc_now}.") + +        # Construct an object in the arbitrary year for the purpose of comparison. +        lookup_now = date(year=ARBITRARY_YEAR, month=utc_now.month, day=utc_now.day) +        log.trace(f"Lookup object in arbitrary year: {lookup_now}.") + +        available_events = await self.get_events() +        log.trace(f"Found {len(available_events)} available events.") + +        for event in available_events: +            meta = event.meta +            if not meta.is_fallback and (meta.start_date <= lookup_now <= meta.end_date): +                return event, available_events + +        log.trace("No active event found. Looking for fallback event.") + +        for event in available_events: +            if event.meta.is_fallback: +                return event, available_events + +        log.warning("No event is currently active and no fallback event was found!") +        return None, available_events diff --git a/bot/exts/backend/branding/_seasons.py b/bot/exts/backend/branding/_seasons.py deleted file mode 100644 index 5f6256b30..000000000 --- a/bot/exts/backend/branding/_seasons.py +++ /dev/null @@ -1,175 +0,0 @@ -import logging -import typing as t -from datetime import datetime - -from bot.constants import Colours -from bot.exts.backend.branding._constants import Month -from bot.exts.backend.branding._errors import BrandingError - -log = logging.getLogger(__name__) - - -class SeasonBase: -    """ -    Base for Seasonal classes. - -    This serves as the off-season fallback for when no specific -    seasons are active. - -    Seasons are 'registered' simply by inheriting from `SeasonBase`. -    We discover them by calling `__subclasses__`. -    """ - -    season_name: str = "Evergreen" - -    colour: str = Colours.soft_green -    description: str = "The default season!" - -    branding_path: str = "seasonal/evergreen" - -    months: t.Set[Month] = set(Month) - - -class Christmas(SeasonBase): -    """Branding for December.""" - -    season_name = "Festive season" - -    colour = Colours.soft_red -    description = ( -        "The time is here to get into the festive spirit! No matter who you are, where you are, " -        "or what beliefs you may follow, we hope every one of you enjoy this festive season!" -    ) - -    branding_path = "seasonal/christmas" - -    months = {Month.DECEMBER} - - -class Easter(SeasonBase): -    """Branding for April.""" - -    season_name = "Easter" - -    colour = Colours.bright_green -    description = ( -        "Bunny here, bunny there, bunny everywhere! Here at Python Discord, we celebrate " -        "our version of Easter during the entire month of April." -    ) - -    branding_path = "seasonal/easter" - -    months = {Month.APRIL} - - -class Halloween(SeasonBase): -    """Branding for October.""" - -    season_name = "Halloween" - -    colour = Colours.orange -    description = "Trick or treat?!" - -    branding_path = "seasonal/halloween" - -    months = {Month.OCTOBER} - - -class Pride(SeasonBase): -    """Branding for June.""" - -    season_name = "Pride" - -    colour = Colours.pink -    description = ( -        "The month of June is a special month for us at Python Discord. It is very important to us " -        "that everyone feels welcome here, no matter their origin, identity or sexuality. During the " -        "month of June, while some of you are participating in Pride festivals across the world, " -        "we will be celebrating individuality and commemorating the history and challenges " -        "of the LGBTQ+ community with a Pride event of our own!" -    ) - -    branding_path = "seasonal/pride" - -    months = {Month.JUNE} - - -class Valentines(SeasonBase): -    """Branding for February.""" - -    season_name = "Valentines" - -    colour = Colours.pink -    description = "Love is in the air!" - -    branding_path = "seasonal/valentines" - -    months = {Month.FEBRUARY} - - -class Wildcard(SeasonBase): -    """Branding for August.""" - -    season_name = "Wildcard" - -    colour = Colours.purple -    description = "A season full of surprises!" - -    months = {Month.AUGUST} - - -def get_all_seasons() -> t.List[t.Type[SeasonBase]]: -    """Give all available season classes.""" -    return [SeasonBase] + SeasonBase.__subclasses__() - - -def get_current_season() -> t.Type[SeasonBase]: -    """Give active season, based on current UTC month.""" -    current_month = Month(datetime.utcnow().month) - -    active_seasons = tuple( -        season -        for season in SeasonBase.__subclasses__() -        if current_month in season.months -    ) - -    if not active_seasons: -        return SeasonBase - -    return active_seasons[0] - - -def get_season(name: str) -> t.Optional[t.Type[SeasonBase]]: -    """ -    Give season such that its class name or its `season_name` attr match `name` (caseless). - -    If no such season exists, return None. -    """ -    name = name.casefold() - -    for season in get_all_seasons(): -        matches = (season.__name__.casefold(), season.season_name.casefold()) - -        if name in matches: -            return season - - -def _validate_season_overlap() -> None: -    """ -    Raise BrandingError if there are any colliding seasons. - -    This serves as a local test to ensure that seasons haven't been misconfigured. -    """ -    month_to_season = {} - -    for season in SeasonBase.__subclasses__(): -        for month in season.months: -            colliding_season = month_to_season.get(month) - -            if colliding_season: -                raise BrandingError(f"Season {season} collides with {colliding_season} in {month.name}") -            else: -                month_to_season[month] = season - - -_validate_season_overlap() diff --git a/bot/exts/backend/error_handler.py b/bot/exts/backend/error_handler.py index 9cb54cdab..76ab7dfc2 100644 --- a/bot/exts/backend/error_handler.py +++ b/bot/exts/backend/error_handler.py @@ -1,7 +1,6 @@  import contextlib  import difflib  import logging -import random  import typing as t  from discord import Embed @@ -10,10 +9,9 @@ from sentry_sdk import push_scope  from bot.api import ResponseCodeError  from bot.bot import Bot -from bot.constants import Colours, ERROR_REPLIES, Icons, MODERATION_ROLES +from bot.constants import Colours, Icons, MODERATION_ROLES  from bot.converters import TagNameConverter  from bot.errors import InvalidInfractedUser, LockedResourceError -from bot.exts.backend.branding._errors import BrandingError  from bot.utils.checks import InWhitelistCheckFailure  log = logging.getLogger(__name__) @@ -79,9 +77,6 @@ class ErrorHandler(Cog):                  await self.handle_api_error(ctx, e.original)              elif isinstance(e.original, LockedResourceError):                  await ctx.send(f"{e.original} Please wait for it to finish and try again later.") -            elif isinstance(e.original, BrandingError): -                await ctx.send(embed=self._get_error_embed(random.choice(ERROR_REPLIES), str(e.original))) -                return              elif isinstance(e.original, InvalidInfractedUser):                  await ctx.send(f"Cannot infract that user. {e.original.reason}")              else: | 
