aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar Joe Banks <[email protected]>2021-03-31 19:37:06 +0100
committerGravatar GitHub <[email protected]>2021-03-31 19:37:06 +0100
commitc2f664488f739f29ded5d309b250119aec3f3051 (patch)
treeb1f4411c47646253d68e1975fa0650e6dcc307f1
parentMerge pull request #1491 from python-discord/fix/dmrelay (diff)
parentBranding: log after successful fetch (diff)
Merge pull request #1463 from kwzrd/kwzrd/branding
-rw-r--r--Pipfile1
-rw-r--r--Pipfile.lock10
-rw-r--r--bot/decorators.py23
-rw-r--r--bot/errors.py6
-rw-r--r--bot/exts/backend/branding/__init__.py6
-rw-r--r--bot/exts/backend/branding/_cog.py887
-rw-r--r--bot/exts/backend/branding/_constants.py51
-rw-r--r--bot/exts/backend/branding/_decorators.py27
-rw-r--r--bot/exts/backend/branding/_errors.py2
-rw-r--r--bot/exts/backend/branding/_repository.py240
-rw-r--r--bot/exts/backend/branding/_seasons.py175
-rw-r--r--bot/exts/backend/error_handler.py7
12 files changed, 766 insertions, 669 deletions
diff --git a/Pipfile b/Pipfile
index 99e480278..7fab198f3 100644
--- a/Pipfile
+++ b/Pipfile
@@ -23,6 +23,7 @@ lxml = "~=4.4"
markdownify = "==0.5.3"
more_itertools = "~=8.2"
python-dateutil = "~=2.8"
+python-frontmatter = "~=1.0.0"
pyyaml = "~=5.1"
requests = "~=2.22"
sentry-sdk = "~=0.19"
diff --git a/Pipfile.lock b/Pipfile.lock
index d16cef2a8..cbec48ef0 100644
--- a/Pipfile.lock
+++ b/Pipfile.lock
@@ -1,7 +1,7 @@
{
"_meta": {
"hash": {
- "sha256": "e5b57ca7276af4709b345055d4b3705c4142c61c4669c796b79a73379ec37a9a"
+ "sha256": "91b5639198b35740611e7ac923cfc262e5897b8cbc3ca243dc98335705804ba7"
},
"pipfile-spec": 6,
"requires": {
@@ -613,6 +613,14 @@
"index": "pypi",
"version": "==2.8.1"
},
+ "python-frontmatter": {
+ "hashes": [
+ "sha256:766ae75f1b301ffc5fe3494339147e0fd80bc3deff3d7590a93991978b579b08",
+ "sha256:e98152e977225ddafea6f01f40b4b0f1de175766322004c826ca99842d19a7cd"
+ ],
+ "index": "pypi",
+ "version": "==1.0.0"
+ },
"pytz": {
"hashes": [
"sha256:83a4a90894bf38e243cf052c8b58f381bfe9a7a483f6a9cab140bc7f702ac4da",
diff --git a/bot/decorators.py b/bot/decorators.py
index 063c8f878..0b50cc365 100644
--- a/bot/decorators.py
+++ b/bot/decorators.py
@@ -1,4 +1,5 @@
import asyncio
+import functools
import logging
import typing as t
from contextlib import suppress
@@ -8,7 +9,7 @@ from discord import Member, NotFound
from discord.ext import commands
from discord.ext.commands import Cog, Context
-from bot.constants import Channels, RedirectOutput
+from bot.constants import Channels, DEBUG_MODE, RedirectOutput
from bot.utils import function
from bot.utils.checks import in_whitelist_check
@@ -153,3 +154,23 @@ def respect_role_hierarchy(member_arg: function.Argument) -> t.Callable:
await func(*args, **kwargs)
return wrapper
return decorator
+
+
+def mock_in_debug(return_value: t.Any) -> t.Callable:
+ """
+ Short-circuit function execution if in debug mode and return `return_value`.
+
+ The original function name, and the incoming args and kwargs are DEBUG level logged
+ upon each call. This is useful for expensive operations, i.e. media asset uploads
+ that are prone to rate-limits but need to be tested extensively.
+ """
+ def decorator(func: t.Callable) -> t.Callable:
+ @functools.wraps(func)
+ async def wrapped(*args, **kwargs) -> t.Any:
+ """Short-circuit and log if in debug mode."""
+ if DEBUG_MODE:
+ log.debug(f"Function {func.__name__} called with args: {args}, kwargs: {kwargs}")
+ return return_value
+ return await func(*args, **kwargs)
+ return wrapped
+ return decorator
diff --git a/bot/errors.py b/bot/errors.py
index ab0adcd42..3544c6320 100644
--- a/bot/errors.py
+++ b/bot/errors.py
@@ -35,3 +35,9 @@ class InvalidInfractedUser(Exception):
self.reason = reason
super().__init__(reason)
+
+
+class BrandingMisconfiguration(RuntimeError):
+ """Raised by the Branding cog when a misconfigured event is encountered."""
+
+ pass
diff --git a/bot/exts/backend/branding/__init__.py b/bot/exts/backend/branding/__init__.py
index 81ea3bf49..20a747b7f 100644
--- a/bot/exts/backend/branding/__init__.py
+++ b/bot/exts/backend/branding/__init__.py
@@ -1,7 +1,7 @@
from bot.bot import Bot
-from bot.exts.backend.branding._cog import BrandingManager
+from bot.exts.backend.branding._cog import Branding
def setup(bot: Bot) -> None:
- """Loads BrandingManager cog."""
- bot.add_cog(BrandingManager(bot))
+ """Load Branding cog."""
+ bot.add_cog(Branding(bot))
diff --git a/bot/exts/backend/branding/_cog.py b/bot/exts/backend/branding/_cog.py
index 20df83a89..0a4ddcc88 100644
--- a/bot/exts/backend/branding/_cog.py
+++ b/bot/exts/backend/branding/_cog.py
@@ -1,566 +1,647 @@
import asyncio
-import itertools
+import contextlib
import logging
import random
import typing as t
from datetime import datetime, time, timedelta
+from enum import Enum
+from operator import attrgetter
-import arrow
import async_timeout
import discord
from async_rediscache import RedisCache
-from discord.ext import commands
+from discord.ext import commands, tasks
from bot.bot import Bot
-from bot.constants import Branding, Colours, Emojis, Guild, MODERATION_ROLES
-from bot.exts.backend.branding import _constants, _decorators, _errors, _seasons
+from bot.constants import Branding as BrandingConfig, Channels, Colours, Guild, MODERATION_ROLES
+from bot.decorators import mock_in_debug
+from bot.exts.backend.branding._repository import BrandingRepository, Event, RemoteObject
log = logging.getLogger(__name__)
-class GitHubFile(t.NamedTuple):
+class AssetType(Enum):
"""
- Represents a remote file on GitHub.
+ Recognised Discord guild asset types.
- The `sha` hash is kept so that we can determine that a file has changed,
- despite its filename remaining unchanged.
+ The value of each member corresponds exactly to a kwarg that can be passed to `Guild.edit`.
"""
- download_url: str
- path: str
- sha: str
+ BANNER = "banner"
+ ICON = "icon"
-def pretty_files(files: t.Iterable[GitHubFile]) -> str:
- """Provide a human-friendly representation of `files`."""
- return "\n".join(file.path for file in files)
+def compound_hash(objects: t.Iterable[RemoteObject]) -> str:
+ """
+ Join SHA attributes of `objects` into a single string.
+
+ Compound hashes are cached to check for change in any of the member `objects`.
+ """
+ return "-".join(item.sha for item in objects)
+
+
+def make_embed(title: str, description: str, *, success: bool) -> discord.Embed:
+ """
+ Construct simple response embed.
+
+ If `success` is True, use green colour, otherwise red.
+
+ For both `title` and `description`, empty string are valid values ~ fields will be empty.
+ """
+ colour = Colours.soft_green if success else Colours.soft_red
+ return discord.Embed(title=title[:256], description=description[:2048], colour=colour)
-def time_until_midnight() -> timedelta:
+def extract_event_duration(event: Event) -> str:
"""
- Determine amount of time until the next-up UTC midnight.
+ Extract a human-readable, year-agnostic duration string from `event`.
- The exact `midnight` moment is actually delayed to 5 seconds after, in order
- to avoid potential problems due to imprecise sleep.
+ In the case that `event` is a fallback event, resolves to 'Fallback'.
"""
- now = datetime.utcnow()
- tomorrow = now + timedelta(days=1)
- midnight = datetime.combine(tomorrow, time(second=5))
+ if event.meta.is_fallback:
+ return "Fallback"
- return midnight - now
+ fmt = "%B %d" # Ex: August 23
+ start_date = event.meta.start_date.strftime(fmt)
+ end_date = event.meta.end_date.strftime(fmt)
+ return f"{start_date} - {end_date}"
-class BrandingManager(commands.Cog):
+
+def extract_event_name(event: Event) -> str:
"""
- Manages the guild's branding.
-
- The purpose of this cog is to help automate the synchronization of the branding
- repository with the guild. It is capable of discovering assets in the repository
- via GitHub's API, resolving download urls for them, and delegating
- to the `bot` instance to upload them to the guild.
-
- BrandingManager is designed to be entirely autonomous. Its `daemon` background task awakens
- once a day (see `time_until_midnight`) to detect new seasons, or to cycle icons within a single
- season. The daemon can be turned on and off via the `daemon` cmd group. The value set via
- its `start` and `stop` commands is persisted across sessions. If turned on, the daemon will
- automatically start on the next bot start-up. Otherwise, it will wait to be started manually.
-
- All supported operations, e.g. setting seasons, applying the branding, or cycling icons, can
- also be invoked manually, via the following API:
-
- branding list
- - Show all available seasons
-
- branding set <season_name>
- - Set the cog's internal state to represent `season_name`, if it exists
- - If no `season_name` is given, set chronologically current season
- - This will not automatically apply the season's branding to the guild,
- the cog's state can be detached from the guild
- - Seasons can therefore be 'previewed' using this command
-
- branding info
- - View detailed information about resolved assets for current season
-
- branding refresh
- - Refresh internal state, i.e. synchronize with branding repository
-
- branding apply
- - Apply the current internal state to the guild, i.e. upload the assets
-
- branding cycle
- - If there are multiple available icons for current season, randomly pick
- and apply the next one
-
- The daemon calls these methods autonomously as appropriate. The use of this cog
- is locked to moderation roles. As it performs media asset uploads, it is prone to
- rate-limits - the `apply` command should be used with caution. The `set` command can,
- however, be used freely to 'preview' seasonal branding and check whether paths have been
- resolved as appropriate.
-
- While the bot is in debug mode, it will 'mock' asset uploads by logging the passed
- download urls and pretending that the upload was successful. Make use of this
- to test this cog's behaviour.
+ Extract title-cased event name from the path of `event`.
+
+ An event with a path of 'events/black_history_month' will resolve to 'Black History Month'.
"""
+ name = event.path.split("/")[-1] # Inner-most directory name.
+ words = name.split("_") # Words from snake case.
- current_season: t.Type[_seasons.SeasonBase]
+ return " ".join(word.title() for word in words)
- banner: t.Optional[GitHubFile]
- available_icons: t.List[GitHubFile]
- remaining_icons: t.List[GitHubFile]
+class Branding(commands.Cog):
+ """
+ Guild branding management.
- days_since_cycle: t.Iterator
+ Extension responsible for automatic synchronisation of the guild's branding with the branding repository.
+ Event definitions and assets are automatically discovered and applied as appropriate.
- daemon: t.Optional[asyncio.Task]
+ All state is stored in Redis. The cog should therefore seamlessly transition across restarts and maintain
+ a consistent icon rotation schedule for events with multiple icon assets.
- # Branding configuration
- branding_configuration = RedisCache()
+ By caching hashes of banner & icon assets, we discover changes in currently applied assets and always keep
+ the latest version applied.
+
+ The command interface allows moderators+ to control the daemon or request asset synchronisation, while
+ regular users can see information about the current event and the overall event schedule.
+ """
+
+ # RedisCache[
+ # "daemon_active": bool | If True, daemon starts on start-up. Controlled via commands.
+ # "event_path": str | Current event's path in the branding repo.
+ # "event_description": str | Current event's Markdown description.
+ # "event_duration": str | Current event's human-readable date range.
+ # "banner_hash": str | SHA of the currently applied banner.
+ # "icons_hash": str | Compound SHA of all icons in current rotation.
+ # "last_rotation_timestamp": float | POSIX UTC timestamp.
+ # ]
+ cache_information = RedisCache()
+
+ # Icons in current rotation. Keys (str) are download URLs, values (int) track the amount of times each
+ # icon has been used in the current rotation.
+ cache_icons = RedisCache()
+
+ # All available event names & durations. Cached by the daemon nightly; read by the calendar command.
+ cache_events = RedisCache()
def __init__(self, bot: Bot) -> None:
+ """Instantiate repository abstraction & allow daemon to start."""
+ self.bot = bot
+ self.repository = BrandingRepository(bot)
+
+ self.bot.loop.create_task(self.maybe_start_daemon()) # Start depending on cache.
+
+ # region: Internal logic & state management
+
+ @mock_in_debug(return_value=True) # Mocked in development environment to prevent API spam.
+ async def apply_asset(self, asset_type: AssetType, download_url: str) -> bool:
"""
- Assign safe default values on init.
+ Download asset from `download_url` and apply it to PyDis as `asset_type`.
- At this point, we don't have information about currently available branding.
- Most of these attributes will be overwritten once the daemon connects, or once
- the `refresh` command is used.
+ Return a boolean indicating whether the application was successful.
"""
- self.bot = bot
- self.current_season = _seasons.get_current_season()
+ log.info(f"Applying '{asset_type.value}' asset to the guild.")
- self.banner = None
+ try:
+ file = await self.repository.fetch_file(download_url)
+ except Exception:
+ log.exception(f"Failed to fetch '{asset_type.value}' asset.")
+ return False
- self.available_icons = []
- self.remaining_icons = []
+ await self.bot.wait_until_guild_available()
+ pydis: discord.Guild = self.bot.get_guild(Guild.id)
- self.days_since_cycle = itertools.cycle([None])
+ timeout = 10 # Seconds.
+ try:
+ with async_timeout.timeout(timeout): # Raise after `timeout` seconds.
+ await pydis.edit(**{asset_type.value: file})
+ except discord.HTTPException:
+ log.exception("Asset upload to Discord failed.")
+ return False
+ except asyncio.TimeoutError:
+ log.error(f"Asset upload to Discord timed out after {timeout} seconds.")
+ return False
+ else:
+ log.trace("Asset uploaded successfully.")
+ return True
- self.daemon = None
- self._startup_task = self.bot.loop.create_task(self._initial_start_daemon())
+ async def apply_banner(self, banner: RemoteObject) -> bool:
+ """
+ Apply `banner` to the guild and cache its hash if successful.
+
+ Banners should always be applied via this method to ensure that the last hash is cached.
+
+ Return a boolean indicating whether the application was successful.
+ """
+ success = await self.apply_asset(AssetType.BANNER, banner.download_url)
- async def _initial_start_daemon(self) -> None:
- """Checks is daemon active and when is, start it at cog load."""
- if await self.branding_configuration.get("daemon_active"):
- self.daemon = self.bot.loop.create_task(self._daemon_func())
+ if success:
+ await self.cache_information.set("banner_hash", banner.sha)
- @property
- def _daemon_running(self) -> bool:
- """True if the daemon is currently active, False otherwise."""
- return self.daemon is not None and not self.daemon.done()
+ return success
- async def _daemon_func(self) -> None:
+ async def rotate_icons(self) -> bool:
"""
- Manage all automated behaviour of the BrandingManager cog.
+ Choose and apply the next-up icon in rotation.
+
+ We keep track of the amount of times each icon has been used. The values in `cache_icons` can be understood
+ to be iteration IDs. When an icon is chosen & applied, we bump its count, pushing it into the next iteration.
- Once a day, the daemon will perform the following tasks:
- - Update `current_season`
- - Poll GitHub API to see if the available branding for `current_season` has changed
- - Update assets if changes are detected (banner, guild icon, bot avatar, bot nickname)
- - Check whether it's time to cycle guild icons
+ Once the current iteration (lowest count in the cache) depletes, we move onto the next iteration.
- The internal loop runs once when activated, then periodically at the time
- given by `time_until_midnight`.
+ In the case that there is only 1 icon in the rotation and has already been applied, do nothing.
- All method calls in the internal loop are considered safe, i.e. no errors propagate
- to the daemon's loop. The daemon itself does not perform any error handling on its own.
+ Return a boolean indicating whether a new icon was applied successfully.
"""
- await self.bot.wait_until_guild_available()
+ log.debug("Rotating icons.")
- while True:
- self.current_season = _seasons.get_current_season()
- branding_changed = await self.refresh()
+ state = await self.cache_icons.to_dict()
+ log.trace(f"Total icons in rotation: {len(state)}.")
- if branding_changed:
- await self.apply()
+ if not state: # This would only happen if rotation not initiated, but we can handle gracefully.
+ log.warning("Attempted icon rotation with an empty icon cache. This indicates wrong logic.")
+ return False
- elif next(self.days_since_cycle) == Branding.cycle_frequency:
- await self.cycle()
+ if len(state) == 1 and 1 in state.values():
+ log.debug("Aborting icon rotation: only 1 icon is available and has already been applied.")
+ return False
- until_midnight = time_until_midnight()
- await asyncio.sleep(until_midnight.total_seconds())
+ current_iteration = min(state.values()) # Choose iteration to draw from.
+ options = [download_url for download_url, times_used in state.items() if times_used == current_iteration]
- async def _info_embed(self) -> discord.Embed:
- """Make an informative embed representing current season."""
- info_embed = discord.Embed(description=self.current_season.description, colour=self.current_season.colour)
+ log.trace(f"Choosing from {len(options)} icons in iteration {current_iteration}.")
+ next_icon = random.choice(options)
- # If we're in a non-evergreen season, also show active months
- if self.current_season is not _seasons.SeasonBase:
- title = f"{self.current_season.season_name} ({', '.join(str(m) for m in self.current_season.months)})"
- else:
- title = self.current_season.season_name
+ success = await self.apply_asset(AssetType.ICON, next_icon)
+
+ if success:
+ await self.cache_icons.increment(next_icon) # Push the icon into the next iteration.
+
+ timestamp = datetime.utcnow().timestamp()
+ await self.cache_information.set("last_rotation_timestamp", timestamp)
+
+ return success
- # Use the author field to show the season's name and avatar if available
- info_embed.set_author(name=title)
+ async def maybe_rotate_icons(self) -> None:
+ """
+ Call `rotate_icons` if the configured amount of time has passed since last rotation.
- banner = self.banner.path if self.banner is not None else "Unavailable"
- info_embed.add_field(name="Banner", value=banner, inline=False)
+ We offset the calculated time difference into the future to avoid off-by-a-little-bit errors. Because there
+ is work to be done before the timestamp is read and written, the next read will likely commence slightly
+ under 24 hours after the last write.
+ """
+ log.debug("Checking whether it's time for icons to rotate.")
- icons = pretty_files(self.available_icons) or "Unavailable"
- info_embed.add_field(name="Available icons", value=icons, inline=False)
+ last_rotation_timestamp = await self.cache_information.get("last_rotation_timestamp")
- # Only display cycle frequency if we're actually cycling
- if len(self.available_icons) > 1 and Branding.cycle_frequency:
- info_embed.set_footer(text=f"Icon cycle frequency: {Branding.cycle_frequency}")
+ if last_rotation_timestamp is None: # Maiden case ~ never rotated.
+ await self.rotate_icons()
+ return
- return info_embed
+ last_rotation = datetime.fromtimestamp(last_rotation_timestamp)
+ difference = (datetime.utcnow() - last_rotation) + timedelta(minutes=5)
- async def _reset_remaining_icons(self) -> None:
- """Set `remaining_icons` to a shuffled copy of `available_icons`."""
- self.remaining_icons = random.sample(self.available_icons, k=len(self.available_icons))
+ log.trace(f"Icons last rotated at {last_rotation} (difference: {difference}).")
- async def _reset_days_since_cycle(self) -> None:
+ if difference.days >= BrandingConfig.cycle_frequency:
+ await self.rotate_icons()
+
+ async def initiate_icon_rotation(self, available_icons: t.List[RemoteObject]) -> None:
"""
- Reset the `days_since_cycle` iterator based on configured frequency.
+ Set up a new icon rotation.
- If the current season only has 1 icon, or if `Branding.cycle_frequency` is falsey,
- the iterator will always yield None. This signals that the icon shouldn't be cycled.
+ This function should be called whenever available icons change. This is generally the case when we enter
+ a new event, but potentially also when the assets of an on-going event change. In such cases, a reset
+ of `cache_icons` is necessary, because it contains download URLs which may have gotten stale.
- Otherwise, it will yield ints in range [1, `Branding.cycle_frequency`] indefinitely.
- When the iterator yields a value equal to `Branding.cycle_frequency`, it is time to cycle.
+ This function does not upload a new icon!
"""
- if len(self.available_icons) > 1 and Branding.cycle_frequency:
- sequence = range(1, Branding.cycle_frequency + 1)
- else:
- sequence = [None]
+ log.debug("Initiating new icon rotation.")
- self.days_since_cycle = itertools.cycle(sequence)
+ await self.cache_icons.clear()
- async def _get_files(self, path: str, include_dirs: bool = False) -> t.Dict[str, GitHubFile]:
+ new_state = {icon.download_url: 0 for icon in available_icons}
+ await self.cache_icons.update(new_state)
+
+ log.trace(f"Icon rotation initiated for {len(new_state)} icons.")
+
+ await self.cache_information.set("icons_hash", compound_hash(available_icons))
+
+ async def send_info_embed(self, channel_id: int, *, is_notification: bool) -> None:
"""
- Get files at `path` in the branding repository.
+ Send the currently cached event description to `channel_id`.
- If `include_dirs` is False (default), only returns files at `path`.
- Otherwise, will return both files and directories. Never returns symlinks.
+ When `is_notification` holds, a short contextual message for the #changelog channel is added.
- Return dict mapping from filename to corresponding `GitHubFile` instance.
- This may return an empty dict if the response status is non-200,
- or if the target directory is empty.
+ We read event information from `cache_information`. The caller is therefore responsible for making
+ sure that the cache is up-to-date before calling this function.
"""
- url = f"{_constants.BRANDING_URL}/{path}"
- async with self.bot.http_session.get(
- url, headers=_constants.HEADERS, params=_constants.PARAMS
- ) as resp:
- # Short-circuit if we get non-200 response
- if resp.status != _constants.STATUS_OK:
- log.error(f"GitHub API returned non-200 response: {resp}")
- return {}
- directory = await resp.json() # Directory at `path`
+ log.debug(f"Sending event information event to channel: {channel_id} ({is_notification=}).")
- allowed_types = {"file", "dir"} if include_dirs else {"file"}
- return {
- file["name"]: GitHubFile(file["download_url"], file["path"], file["sha"])
- for file in directory
- if file["type"] in allowed_types
- }
+ await self.bot.wait_until_guild_available()
+ channel: t.Optional[discord.TextChannel] = self.bot.get_channel(channel_id)
- async def refresh(self) -> bool:
+ if channel is None:
+ log.warning(f"Cannot send event information: channel {channel_id} not found!")
+ return
+
+ log.trace(f"Destination channel: #{channel.name}.")
+
+ description = await self.cache_information.get("event_description")
+ duration = await self.cache_information.get("event_duration")
+
+ if None in (description, duration):
+ content = None
+ embed = make_embed("No event in cache", "Is the daemon enabled?", success=False)
+
+ else:
+ content = "Python Discord is entering a new event!" if is_notification else None
+ embed = discord.Embed(description=description[:2048], colour=discord.Colour.blurple())
+ embed.set_footer(text=duration[:2048])
+
+ await channel.send(content=content, embed=embed)
+
+ async def enter_event(self, event: Event) -> t.Tuple[bool, bool]:
"""
- Synchronize available assets with branding repository.
+ Apply `event` assets and update information cache.
- If the current season is not the evergreen, and lacks at least one asset,
- we use the evergreen seasonal dir as fallback for missing assets.
+ We cache `event` information to ensure that we:
+ * Remember which event we're currently in across restarts
+ * Provide an on-demand informational embed without re-querying the branding repository
- Finally, if neither the seasonal nor fallback branding directories contain
- an asset, it will simply be ignored.
+ An event change should always be handled via this function, as it ensures that the cache is populated.
- Return True if the branding has changed. This will be the case when we enter
- a new season, or when something changes in the current seasons's directory
- in the branding repository.
+ The #changelog notification is omitted when `event` is fallback, or already applied.
+
+ Return a 2-tuple indicating whether the banner, and the icon, were applied successfully.
"""
- old_branding = (self.banner, self.available_icons)
- seasonal_dir = await self._get_files(self.current_season.branding_path, include_dirs=True)
+ log.info(f"Entering event: '{event.path}'.")
- # Only make a call to the fallback directory if there is something to be gained
- branding_incomplete = any(
- asset not in seasonal_dir
- for asset in (_constants.FILE_BANNER, _constants.FILE_AVATAR, _constants.SERVER_ICONS)
- )
- if branding_incomplete and self.current_season is not _seasons.SeasonBase:
- fallback_dir = await self._get_files(
- _seasons.SeasonBase.branding_path, include_dirs=True
- )
- else:
- fallback_dir = {}
+ banner_success = await self.apply_banner(event.banner) # Only one asset ~ apply directly.
- # Resolve assets in this directory, None is a safe value
- self.banner = (
- seasonal_dir.get(_constants.FILE_BANNER)
- or fallback_dir.get(_constants.FILE_BANNER)
- )
+ await self.initiate_icon_rotation(event.icons) # Prepare a new rotation.
+ icon_success = await self.rotate_icons() # Apply an icon from the new rotation.
- # Now resolve server icons by making a call to the proper sub-directory
- if _constants.SERVER_ICONS in seasonal_dir:
- icons_dir = await self._get_files(
- f"{self.current_season.branding_path}/{_constants.SERVER_ICONS}"
- )
- self.available_icons = list(icons_dir.values())
+ # This will only be False in the case of a manual same-event re-synchronisation.
+ event_changed = event.path != await self.cache_information.get("event_path")
- elif _constants.SERVER_ICONS in fallback_dir:
- icons_dir = await self._get_files(
- f"{_seasons.SeasonBase.branding_path}/{_constants.SERVER_ICONS}"
- )
- self.available_icons = list(icons_dir.values())
+ # Cache event identity to avoid re-entry in case of restart.
+ await self.cache_information.set("event_path", event.path)
+ # Cache information shown in the 'about' embed.
+ await self.populate_cache_event_description(event)
+
+ # Notify guild of new event ~ this reads the information that we cached above.
+ if event_changed and not event.meta.is_fallback:
+ await self.send_info_embed(Channels.change_log, is_notification=True)
else:
- self.available_icons = [] # This should never be the case, but an empty list is a safe value
+ log.trace("Omitting #changelog notification. Event has not changed, or new event is fallback.")
- # GitHubFile instances carry a `sha` attr so this will pick up if a file changes
- branding_changed = old_branding != (self.banner, self.available_icons)
+ return banner_success, icon_success
- if branding_changed:
- log.info(f"New branding detected (season: {self.current_season.season_name})")
- await self._reset_remaining_icons()
- await self._reset_days_since_cycle()
+ async def synchronise(self) -> t.Tuple[bool, bool]:
+ """
+ Fetch the current event and delegate to `enter_event`.
- return branding_changed
+ This is a convenience function to force synchronisation via a command. It should generally only be used
+ in a recovery scenario. In the usual case, the daemon already has an `Event` instance and can pass it
+ to `enter_event` directly.
- async def cycle(self) -> bool:
+ Return a 2-tuple indicating whether the banner, and the icon, were applied successfully.
"""
- Apply the next-up server icon.
+ log.debug("Synchronise: fetching current event.")
- Returns True if an icon is available and successfully gets applied, False otherwise.
- """
- if not self.available_icons:
- log.info("Cannot cycle: no icons for this season")
- return False
+ current_event, available_events = await self.repository.get_current_event()
- if not self.remaining_icons:
- log.info("Reset & shuffle remaining icons")
- await self._reset_remaining_icons()
+ await self.populate_cache_events(available_events)
- next_up = self.remaining_icons.pop(0)
- success = await self.set_icon(next_up.download_url)
+ if current_event is None:
+ log.error("Failed to fetch event. Cannot synchronise!")
+ return False, False
- return success
+ return await self.enter_event(current_event)
- async def apply(self) -> t.List[str]:
+ async def populate_cache_events(self, events: t.List[Event]) -> None:
"""
- Apply current branding to the guild and bot.
-
- This delegates to the bot instance to do all the work. We only provide download urls
- for available assets. Assets unavailable in the branding repo will be ignored.
+ Clear `cache_events` and re-populate with names and durations of `events`.
- Returns a list of names of all failed assets. An asset is considered failed
- if it isn't found in the branding repo, or if something goes wrong while the
- bot is trying to apply it.
+ For each event, we store its name and duration string. This is the information presented to users in the
+ calendar command. If a format change is needed, it has to be done here.
- An empty list denotes that all assets have been applied successfully.
+ The cache does not store the fallback event, as it is not shown in the calendar.
"""
- report = {asset: False for asset in ("banner", "icon")}
+ log.debug("Populating events cache.")
- if self.banner is not None:
- report["banner"] = await self.set_banner(self.banner.download_url)
+ await self.cache_events.clear()
- report["icon"] = await self.cycle()
+ no_fallback = [event for event in events if not event.meta.is_fallback]
+ chronological_events = sorted(no_fallback, key=attrgetter("meta.start_date"))
- failed_assets = [asset for asset, succeeded in report.items() if not succeeded]
- return failed_assets
+ log.trace(f"Writing {len(chronological_events)} events (fallback omitted).")
- @commands.has_any_role(*MODERATION_ROLES)
- @commands.group(name="branding")
- async def branding_cmds(self, ctx: commands.Context) -> None:
- """Manual branding control."""
- if not ctx.invoked_subcommand:
- await ctx.send_help(ctx.command)
+ with contextlib.suppress(ValueError): # Cache raises when updated with an empty dict.
+ await self.cache_events.update({
+ extract_event_name(event): extract_event_duration(event)
+ for event in chronological_events
+ })
- @branding_cmds.command(name="list", aliases=["ls"])
- async def branding_list(self, ctx: commands.Context) -> None:
- """List all available seasons and branding sources."""
- embed = discord.Embed(title="Available seasons", colour=Colours.soft_green)
+ async def populate_cache_event_description(self, event: Event) -> None:
+ """
+ Cache `event` description & duration.
- for season in _seasons.get_all_seasons():
- if season is _seasons.SeasonBase:
- active_when = "always"
- else:
- active_when = f"in {', '.join(str(m) for m in season.months)}"
+ This should be called when entering a new event, and can be called periodically to ensure that the cache
+ holds fresh information in the case that the event remains the same, but its description changes.
- description = (
- f"Active {active_when}\n"
- f"Branding: {season.branding_path}"
- )
- embed.add_field(name=season.season_name, value=description, inline=False)
+ The duration is stored formatted for the frontend. It is not intended to be used programmatically.
+ """
+ log.debug("Caching event description & duration.")
- await ctx.send(embed=embed)
+ await self.cache_information.set("event_description", event.meta.description)
+ await self.cache_information.set("event_duration", extract_event_duration(event))
- @branding_cmds.command(name="set")
- async def branding_set(self, ctx: commands.Context, *, season_name: t.Optional[str] = None) -> None:
+ # endregion
+ # region: Daemon
+
+ async def maybe_start_daemon(self) -> None:
"""
- Manually set season, or reset to current if none given.
+ Start the daemon depending on cache state.
- Season search is a case-less comparison against both seasonal class name,
- and its `season_name` attr.
+ The daemon will only start if it has been explicitly enabled via a command.
+ """
+ log.debug("Checking whether daemon should start.")
- This only pre-loads the cog's internal state to the chosen season, but does not
- automatically apply the branding. As that is an expensive operation, the `apply`
- command must be called explicitly after this command finishes.
+ should_begin: t.Optional[bool] = await self.cache_information.get("daemon_active") # None if never set!
- This means that this command can be used to 'preview' a season gathering info
- about its available assets, without applying them to the guild.
+ if should_begin:
+ self.daemon_loop.start()
- If the daemon is running, it will automatically reset the season to current when
- it wakes up. The season set via this command can therefore remain 'detached' from
- what it should be - the daemon will make sure that it's set back properly.
+ def cog_unload(self) -> None:
"""
- if season_name is None:
- new_season = _seasons.get_current_season()
- else:
- new_season = _seasons.get_season(season_name)
- if new_season is None:
- raise _errors.BrandingError("No such season exists")
+ Cancel the daemon in case of cog unload.
- if self.current_season is new_season:
- raise _errors.BrandingError(f"Season {self.current_season.season_name} already active")
+ This is **not** done automatically! The daemon otherwise remains active in the background.
+ """
+ log.debug("Cog unload: cancelling daemon.")
- self.current_season = new_season
- await self.branding_refresh(ctx)
+ self.daemon_loop.cancel()
- @branding_cmds.command(name="info", aliases=["status"])
- async def branding_info(self, ctx: commands.Context) -> None:
+ async def daemon_main(self) -> None:
"""
- Show available assets for current season.
+ Synchronise guild & caches with branding repository.
- This can be used to confirm that assets have been resolved properly.
- When `apply` is used, it attempts to upload exactly the assets listed here.
+ Pull the currently active event from the branding repository and check whether it matches the currently
+ active event in the cache. If not, apply the new event.
+
+ However, it is also possible that an event's assets change as it's active. To account for such cases,
+ we check the banner & icons hashes against the currently cached values. If there is a mismatch, each
+ specific asset is re-applied.
"""
- await ctx.send(embed=await self._info_embed())
+ log.info("Daemon main: checking current event.")
- @branding_cmds.command(name="refresh")
- async def branding_refresh(self, ctx: commands.Context) -> None:
- """Sync currently available assets with branding repository."""
- async with ctx.typing():
- await self.refresh()
- await self.branding_info(ctx)
+ new_event, available_events = await self.repository.get_current_event()
+
+ await self.populate_cache_events(available_events)
+
+ if new_event is None:
+ log.warning("Daemon main: failed to get current event from branding repository, will do nothing.")
+ return
+
+ if new_event.path != await self.cache_information.get("event_path"):
+ log.debug("Daemon main: new event detected!")
+ await self.enter_event(new_event)
+ return
+
+ await self.populate_cache_event_description(new_event) # Cache fresh frontend info in case of change.
- @branding_cmds.command(name="apply")
- async def branding_apply(self, ctx: commands.Context) -> None:
+ log.trace("Daemon main: event has not changed, checking for change in assets.")
+
+ if new_event.banner.sha != await self.cache_information.get("banner_hash"):
+ log.debug("Daemon main: detected banner change.")
+ await self.apply_banner(new_event.banner)
+
+ if compound_hash(new_event.icons) != await self.cache_information.get("icons_hash"):
+ log.debug("Daemon main: detected icon change.")
+ await self.initiate_icon_rotation(new_event.icons)
+ await self.rotate_icons()
+ else:
+ await self.maybe_rotate_icons()
+
+ @tasks.loop(hours=24)
+ async def daemon_loop(self) -> None:
"""
- Apply current season's branding to the guild.
+ Call `daemon_main` every 24 hours.
- Use `info` to check which assets will be applied. Shows which assets have
- failed to be applied, if any.
+ The scheduler maintains an exact 24-hour frequency even if this coroutine takes time to complete. If the
+ coroutine is started at 00:01 and completes at 00:05, it will still be started at 00:01 the next day.
"""
- async with ctx.typing():
- failed_assets = await self.apply()
- if failed_assets:
- raise _errors.BrandingError(
- f"Failed to apply following assets: {', '.join(failed_assets)}"
- )
+ log.trace("Daemon loop: calling daemon main.")
- response = discord.Embed(description=f"All assets applied {Emojis.ok_hand}", colour=Colours.soft_green)
- await ctx.send(embed=response)
+ try:
+ await self.daemon_main()
+ except Exception:
+ log.exception("Daemon loop: failed with an unhandled exception!")
- @branding_cmds.command(name="cycle")
- async def branding_cycle(self, ctx: commands.Context) -> None:
+ @daemon_loop.before_loop
+ async def daemon_before(self) -> None:
"""
- Apply the next-up guild icon, if multiple are available.
+ Call `daemon_loop` immediately, then block the loop until the next-up UTC midnight.
- The order is random.
+ The first iteration is invoked directly such that synchronisation happens immediately after daemon start.
+ We then calculate the time until the next-up midnight and sleep before letting `daemon_loop` begin.
"""
- async with ctx.typing():
- success = await self.cycle()
- if not success:
- raise _errors.BrandingError("Failed to cycle icon")
+ log.trace("Daemon before: performing start-up iteration.")
+
+ await self.daemon_loop()
+
+ log.trace("Daemon before: calculating time to sleep before loop begins.")
+ now = datetime.utcnow()
- response = discord.Embed(description=f"Success {Emojis.ok_hand}", colour=Colours.soft_green)
- await ctx.send(embed=response)
+ # The actual midnight moment is offset into the future to prevent issues with imprecise sleep.
+ tomorrow = now + timedelta(days=1)
+ midnight = datetime.combine(tomorrow, time(minute=1))
- @branding_cmds.group(name="daemon", aliases=["d", "task"])
- async def daemon_group(self, ctx: commands.Context) -> None:
- """Control the background daemon."""
+ sleep_secs = (midnight - now).total_seconds()
+ log.trace(f"Daemon before: sleeping {sleep_secs} seconds before next-up midnight: {midnight}.")
+
+ await asyncio.sleep(sleep_secs)
+
+ # endregion
+ # region: Command interface (branding)
+
+ @commands.group(name="branding")
+ async def branding_group(self, ctx: commands.Context) -> None:
+ """Control the branding cog."""
if not ctx.invoked_subcommand:
await ctx.send_help(ctx.command)
- @daemon_group.command(name="status")
- async def daemon_status(self, ctx: commands.Context) -> None:
- """Check whether daemon is currently active."""
- if self._daemon_running:
- remaining_time = (arrow.utcnow() + time_until_midnight()).humanize()
- response = discord.Embed(description=f"Daemon running {Emojis.ok_hand}", colour=Colours.soft_green)
- response.set_footer(text=f"Next refresh {remaining_time}")
- else:
- response = discord.Embed(description="Daemon not running", colour=Colours.soft_red)
+ @branding_group.command(name="about", aliases=("current", "event"))
+ async def branding_about_cmd(self, ctx: commands.Context) -> None:
+ """Show the current event's description and duration."""
+ await self.send_info_embed(ctx.channel.id, is_notification=False)
- await ctx.send(embed=response)
+ @commands.has_any_role(*MODERATION_ROLES)
+ @branding_group.command(name="sync")
+ async def branding_sync_cmd(self, ctx: commands.Context) -> None:
+ """
+ Force branding synchronisation.
- @daemon_group.command(name="start")
- async def daemon_start(self, ctx: commands.Context) -> None:
- """If the daemon isn't running, start it."""
- if self._daemon_running:
- raise _errors.BrandingError("Daemon already running!")
+ Show which assets have failed to synchronise, if any.
+ """
+ async with ctx.typing():
+ banner_success, icon_success = await self.synchronise()
- self.daemon = self.bot.loop.create_task(self._daemon_func())
- await self.branding_configuration.set("daemon_active", True)
+ failed_assets = ", ".join(
+ name
+ for name, status in [("banner", banner_success), ("icon", icon_success)]
+ if status is False
+ )
- response = discord.Embed(description=f"Daemon started {Emojis.ok_hand}", colour=Colours.soft_green)
- await ctx.send(embed=response)
+ if failed_assets:
+ resp = make_embed("Synchronisation unsuccessful", f"Failed to apply: {failed_assets}.", success=False)
+ resp.set_footer(text="Check log for details.")
+ else:
+ resp = make_embed("Synchronisation successful", "Assets have been applied.", success=True)
- @daemon_group.command(name="stop")
- async def daemon_stop(self, ctx: commands.Context) -> None:
- """If the daemon is running, stop it."""
- if not self._daemon_running:
- raise _errors.BrandingError("Daemon not running!")
+ await ctx.send(embed=resp)
- self.daemon.cancel()
- await self.branding_configuration.set("daemon_active", False)
+ # endregion
+ # region: Command interface (branding calendar)
+
+ @branding_group.group(name="calendar", aliases=("schedule", "events"))
+ async def branding_calendar_group(self, ctx: commands.Context) -> None:
+ """
+ Show the current event calendar.
- response = discord.Embed(description=f"Daemon stopped {Emojis.ok_hand}", colour=Colours.soft_green)
- await ctx.send(embed=response)
+ We draw event information from `cache_events` and use each key-value pair to create a field in the response
+ embed. As such, we do not need to query the API to get event information. The cache is automatically
+ re-populated by the daemon whenever it makes a request. A moderator+ can also explicitly request a cache
+ refresh using the 'refresh' subcommand.
- async def _fetch_image(self, url: str) -> bytes:
- """Retrieve and read image from `url`."""
- log.debug(f"Getting image from: {url}")
- async with self.bot.http_session.get(url) as resp:
- return await resp.read()
+ Due to Discord limitations, we only show up to 25 events. This is entirely sufficient at the time of writing.
+ In the case that we find ourselves with more than 25 events, a warning log will alert core devs.
- async def _apply_asset(self, target: discord.Guild, asset: _constants.AssetType, url: str) -> bool:
+ In the future, we may be interested in a field-paginating solution.
"""
- Internal method for applying media assets to the guild.
+ if ctx.invoked_subcommand:
+ # If you're wondering why this works: when the 'refresh' subcommand eventually re-invokes
+ # this group, the attribute will be automatically set to None by the framework.
+ return
+
+ available_events = await self.cache_events.to_dict()
+ log.trace(f"Found {len(available_events)} cached events available for calendar view.")
+
+ if not available_events:
+ resp = make_embed("No events found!", "Cache may be empty, try `branding calendar refresh`.", success=False)
+ await ctx.send(embed=resp)
+ return
+
+ embed = discord.Embed(title="Current event calendar", colour=discord.Colour.blurple())
+
+ # Because Discord embeds can only contain up to 25 fields, we only show the first 25.
+ first_25 = list(available_events.items())[:25]
- This shouldn't be called directly. The purpose of this method is mainly generic
- error handling to reduce needless code repetition.
+ if len(first_25) != len(available_events): # Alert core devs that a paginating solution is now necessary.
+ log.warning(f"There are {len(available_events)} events, but the calendar view can only display 25.")
- Return True if upload was successful, False otherwise.
+ for name, duration in first_25:
+ embed.add_field(name=name[:256], value=duration[:1024])
+
+ embed.set_footer(text="Otherwise, the fallback season is used.")
+
+ await ctx.send(embed=embed)
+
+ @commands.has_any_role(*MODERATION_ROLES)
+ @branding_calendar_group.command(name="refresh")
+ async def branding_calendar_refresh_cmd(self, ctx: commands.Context) -> None:
"""
- log.info(f"Attempting to set {asset.name}: {url}")
+ Refresh event cache and show current event calendar.
- kwargs = {asset.value: await self._fetch_image(url)}
- try:
- async with async_timeout.timeout(5):
- await target.edit(**kwargs)
+ Supplementary subcommand allowing force-refreshing the event cache. Implemented as a subcommand because
+ unlike the supergroup, it requires moderator privileges.
+ """
+ log.info("Performing command-requested event cache refresh.")
- except asyncio.TimeoutError:
- log.info("Asset upload timed out")
- return False
+ async with ctx.typing():
+ available_events = await self.repository.get_events()
+ await self.populate_cache_events(available_events)
- except discord.HTTPException as discord_error:
- log.exception("Asset upload failed", exc_info=discord_error)
- return False
+ await ctx.invoke(self.branding_calendar_group)
+
+ # endregion
+ # region: Command interface (branding daemon)
+
+ @commands.has_any_role(*MODERATION_ROLES)
+ @branding_group.group(name="daemon", aliases=("d",))
+ async def branding_daemon_group(self, ctx: commands.Context) -> None:
+ """Control the branding cog's daemon."""
+ if not ctx.invoked_subcommand:
+ await ctx.send_help(ctx.command)
+
+ @branding_daemon_group.command(name="enable", aliases=("start", "on"))
+ async def branding_daemon_enable_cmd(self, ctx: commands.Context) -> None:
+ """Enable the branding daemon."""
+ await self.cache_information.set("daemon_active", True)
+ if self.daemon_loop.is_running():
+ resp = make_embed("Daemon is already enabled!", "", success=False)
else:
- log.info("Asset successfully applied")
- return True
+ self.daemon_loop.start()
+ resp = make_embed("Daemon enabled!", "It will now automatically awaken on start-up.", success=True)
- @_decorators.mock_in_debug(return_value=True)
- async def set_banner(self, url: str) -> bool:
- """Set the guild's banner to image at `url`."""
- guild = self.bot.get_guild(Guild.id)
- if guild is None:
- log.info("Failed to get guild instance, aborting asset upload")
- return False
+ await ctx.send(embed=resp)
- return await self._apply_asset(guild, _constants.AssetType.BANNER, url)
+ @branding_daemon_group.command(name="disable", aliases=("stop", "off"))
+ async def branding_daemon_disable_cmd(self, ctx: commands.Context) -> None:
+ """Disable the branding daemon."""
+ await self.cache_information.set("daemon_active", False)
- @_decorators.mock_in_debug(return_value=True)
- async def set_icon(self, url: str) -> bool:
- """Sets the guild's icon to image at `url`."""
- guild = self.bot.get_guild(Guild.id)
- if guild is None:
- log.info("Failed to get guild instance, aborting asset upload")
- return False
+ if self.daemon_loop.is_running():
+ self.daemon_loop.cancel()
+ resp = make_embed("Daemon disabled!", "It will not awaken on start-up.", success=True)
+ else:
+ resp = make_embed("Daemon is already disabled!", "", success=False)
- return await self._apply_asset(guild, _constants.AssetType.SERVER_ICON, url)
+ await ctx.send(embed=resp)
- def cog_unload(self) -> None:
- """Cancels startup and daemon task."""
- self._startup_task.cancel()
- if self.daemon is not None:
- self.daemon.cancel()
+ @branding_daemon_group.command(name="status")
+ async def branding_daemon_status_cmd(self, ctx: commands.Context) -> None:
+ """Check whether the daemon is currently enabled."""
+ if self.daemon_loop.is_running():
+ resp = make_embed("Daemon is enabled", "Use `branding daemon disable` to stop.", success=True)
+ else:
+ resp = make_embed("Daemon is disabled", "Use `branding daemon enable` to start.", success=False)
+
+ await ctx.send(embed=resp)
+
+ # endregion
diff --git a/bot/exts/backend/branding/_constants.py b/bot/exts/backend/branding/_constants.py
deleted file mode 100644
index ca8e8c5f5..000000000
--- a/bot/exts/backend/branding/_constants.py
+++ /dev/null
@@ -1,51 +0,0 @@
-from enum import Enum, IntEnum
-
-from bot.constants import Keys
-
-
-class Month(IntEnum):
- """All month constants for seasons."""
-
- JANUARY = 1
- FEBRUARY = 2
- MARCH = 3
- APRIL = 4
- MAY = 5
- JUNE = 6
- JULY = 7
- AUGUST = 8
- SEPTEMBER = 9
- OCTOBER = 10
- NOVEMBER = 11
- DECEMBER = 12
-
- def __str__(self) -> str:
- return self.name.title()
-
-
-class AssetType(Enum):
- """
- Discord media assets.
-
- The values match exactly the kwarg keys that can be passed to `Guild.edit`.
- """
-
- BANNER = "banner"
- SERVER_ICON = "icon"
-
-
-STATUS_OK = 200 # HTTP status code
-
-FILE_BANNER = "banner.png"
-FILE_AVATAR = "avatar.png"
-SERVER_ICONS = "server_icons"
-
-BRANDING_URL = "https://api.github.com/repos/python-discord/branding/contents"
-
-PARAMS = {"ref": "main"} # Target branch
-HEADERS = {"Accept": "application/vnd.github.v3+json"} # Ensure we use API v3
-
-# A GitHub token is not necessary for the cog to operate,
-# unauthorized requests are however limited to 60 per hour
-if Keys.github:
- HEADERS["Authorization"] = f"token {Keys.github}"
diff --git a/bot/exts/backend/branding/_decorators.py b/bot/exts/backend/branding/_decorators.py
deleted file mode 100644
index 6a1e7e869..000000000
--- a/bot/exts/backend/branding/_decorators.py
+++ /dev/null
@@ -1,27 +0,0 @@
-import functools
-import logging
-import typing as t
-
-from bot.constants import DEBUG_MODE
-
-log = logging.getLogger(__name__)
-
-
-def mock_in_debug(return_value: t.Any) -> t.Callable:
- """
- Short-circuit function execution if in debug mode and return `return_value`.
-
- The original function name, and the incoming args and kwargs are DEBUG level logged
- upon each call. This is useful for expensive operations, i.e. media asset uploads
- that are prone to rate-limits but need to be tested extensively.
- """
- def decorator(func: t.Callable) -> t.Callable:
- @functools.wraps(func)
- async def wrapped(*args, **kwargs) -> t.Any:
- """Short-circuit and log if in debug mode."""
- if DEBUG_MODE:
- log.debug(f"Function {func.__name__} called with args: {args}, kwargs: {kwargs}")
- return return_value
- return await func(*args, **kwargs)
- return wrapped
- return decorator
diff --git a/bot/exts/backend/branding/_errors.py b/bot/exts/backend/branding/_errors.py
deleted file mode 100644
index 7cd271af3..000000000
--- a/bot/exts/backend/branding/_errors.py
+++ /dev/null
@@ -1,2 +0,0 @@
-class BrandingError(Exception):
- """Exception raised by the BrandingManager cog."""
diff --git a/bot/exts/backend/branding/_repository.py b/bot/exts/backend/branding/_repository.py
new file mode 100644
index 000000000..7b09d4641
--- /dev/null
+++ b/bot/exts/backend/branding/_repository.py
@@ -0,0 +1,240 @@
+import logging
+import typing as t
+from datetime import date, datetime
+
+import frontmatter
+
+from bot.bot import Bot
+from bot.constants import Keys
+from bot.errors import BrandingMisconfiguration
+
+# Base URL for requests into the branding repository.
+BRANDING_URL = "https://api.github.com/repos/python-discord/branding/contents"
+
+PARAMS = {"ref": "main"} # Target branch.
+HEADERS = {"Accept": "application/vnd.github.v3+json"} # Ensure we use API v3.
+
+# A GitHub token is not necessary. However, unauthorized requests are limited to 60 per hour.
+if Keys.github:
+ HEADERS["Authorization"] = f"token {Keys.github}"
+
+# Since event periods are year-agnostic, we parse them into `datetime` objects with a manually inserted year.
+# Please note that this is intentionally a leap year to allow Feb 29 to be valid.
+ARBITRARY_YEAR = 2020
+
+# Format used to parse date strings after we inject `ARBITRARY_YEAR` at the end.
+DATE_FMT = "%B %d %Y" # Ex: July 10 2020
+
+log = logging.getLogger(__name__)
+
+
+class RemoteObject:
+ """
+ Remote file or directory on GitHub.
+
+ The annotations match keys in the response JSON that we're interested in.
+ """
+
+ sha: str # Hash helps us detect asset change.
+ name: str # Filename.
+ path: str # Path from repo root.
+ type: str # Either 'file' or 'dir'.
+ download_url: t.Optional[str] # If type is 'dir', this is None!
+
+ def __init__(self, dictionary: t.Dict[str, t.Any]) -> None:
+ """Initialize by grabbing annotated attributes from `dictionary`."""
+ missing_keys = self.__annotations__.keys() - dictionary.keys()
+ if missing_keys:
+ raise KeyError(f"Fetched object lacks expected keys: {missing_keys}")
+ for annotation in self.__annotations__:
+ setattr(self, annotation, dictionary[annotation])
+
+
+class MetaFile(t.NamedTuple):
+ """Attributes defined in a 'meta.md' file."""
+
+ is_fallback: bool
+ start_date: t.Optional[date]
+ end_date: t.Optional[date]
+ description: str # Markdown event description.
+
+
+class Event(t.NamedTuple):
+ """Event defined in the branding repository."""
+
+ path: str # Path from repo root where event lives. This is the event's identity.
+ meta: MetaFile
+ banner: RemoteObject
+ icons: t.List[RemoteObject]
+
+ def __str__(self) -> str:
+ return f"<Event at '{self.path}'>"
+
+
+class BrandingRepository:
+ """
+ Branding repository abstraction.
+
+ This class represents the branding repository's main branch and exposes available events and assets
+ as objects. It performs the necessary amount of validation to ensure that a misconfigured event
+ isn't returned. Such events are simply ignored, and will be substituted with the fallback event,
+ if available. Warning logs will inform core developers if a misconfigured event is encountered.
+
+ Colliding events cause no special behaviour. In such cases, the first found active event is returned.
+ We work with the assumption that the branding repository checks for such conflicts and prevents them
+ from reaching the main branch.
+
+ This class keeps no internal state. All `get_current_event` calls will result in GitHub API requests.
+ The caller is therefore responsible for being responsible and caching information to prevent API abuse.
+
+ Requests are made using the HTTP session looked up on the bot instance.
+ """
+
+ def __init__(self, bot: Bot) -> None:
+ self.bot = bot
+
+ async def fetch_directory(self, path: str, types: t.Container[str] = ("file", "dir")) -> t.Dict[str, RemoteObject]:
+ """
+ Fetch directory found at `path` in the branding repository.
+
+ Raise an exception if the request fails, or if the response lacks the expected keys.
+
+ Passing custom `types` allows getting only files or directories. By default, both are included.
+ """
+ full_url = f"{BRANDING_URL}/{path}"
+ log.debug(f"Fetching directory from branding repository: '{full_url}'.")
+
+ async with self.bot.http_session.get(full_url, params=PARAMS, headers=HEADERS) as response:
+ if response.status != 200:
+ raise RuntimeError(f"Failed to fetch directory due to status: {response.status}")
+
+ log.debug("Fetch successful, reading JSON response.")
+ json_directory = await response.json()
+
+ return {file["name"]: RemoteObject(file) for file in json_directory if file["type"] in types}
+
+ async def fetch_file(self, download_url: str) -> bytes:
+ """
+ Fetch file as bytes from `download_url`.
+
+ Raise an exception if the request does not succeed.
+ """
+ log.debug(f"Fetching file from branding repository: '{download_url}'.")
+
+ async with self.bot.http_session.get(download_url, params=PARAMS, headers=HEADERS) as response:
+ if response.status != 200:
+ raise RuntimeError(f"Failed to fetch file due to status: {response.status}")
+
+ log.debug("Fetch successful, reading payload.")
+ return await response.read()
+
+ def parse_meta_file(self, raw_file: bytes) -> MetaFile:
+ """
+ Parse a 'meta.md' file from raw bytes.
+
+ The caller is responsible for handling errors caused by misconfiguration.
+ """
+ attrs, description = frontmatter.parse(raw_file, encoding="UTF-8")
+
+ if not description:
+ raise BrandingMisconfiguration("No description found in 'meta.md'!")
+
+ if attrs.get("fallback", False):
+ return MetaFile(is_fallback=True, start_date=None, end_date=None, description=description)
+
+ start_date_raw = attrs.get("start_date")
+ end_date_raw = attrs.get("end_date")
+
+ if None in (start_date_raw, end_date_raw):
+ raise BrandingMisconfiguration("Non-fallback event doesn't have start and end dates defined!")
+
+ # We extend the configured month & day with an arbitrary leap year, allowing a datetime object to exist.
+ # This may raise errors if misconfigured. We let the caller handle such cases.
+ start_date = datetime.strptime(f"{start_date_raw} {ARBITRARY_YEAR}", DATE_FMT).date()
+ end_date = datetime.strptime(f"{end_date_raw} {ARBITRARY_YEAR}", DATE_FMT).date()
+
+ return MetaFile(is_fallback=False, start_date=start_date, end_date=end_date, description=description)
+
+ async def construct_event(self, directory: RemoteObject) -> Event:
+ """
+ Construct an `Event` instance from an event `directory`.
+
+ The caller is responsible for handling errors caused by misconfiguration.
+ """
+ contents = await self.fetch_directory(directory.path)
+
+ missing_assets = {"meta.md", "banner.png", "server_icons"} - contents.keys()
+
+ if missing_assets:
+ raise BrandingMisconfiguration(f"Directory is missing following assets: {missing_assets}")
+
+ server_icons = await self.fetch_directory(contents["server_icons"].path, types=("file",))
+
+ if len(server_icons) == 0:
+ raise BrandingMisconfiguration("Found no server icons!")
+
+ meta_bytes = await self.fetch_file(contents["meta.md"].download_url)
+
+ meta_file = self.parse_meta_file(meta_bytes)
+
+ return Event(directory.path, meta_file, contents["banner.png"], list(server_icons.values()))
+
+ async def get_events(self) -> t.List[Event]:
+ """
+ Discover available events in the branding repository.
+
+ Misconfigured events are skipped. May return an empty list in the catastrophic case.
+ """
+ log.debug("Discovering events in branding repository.")
+
+ try:
+ event_directories = await self.fetch_directory("events", types=("dir",)) # Skip files.
+ except Exception:
+ log.exception("Failed to fetch 'events' directory.")
+ return []
+
+ instances: t.List[Event] = []
+
+ for event_directory in event_directories.values():
+ log.trace(f"Attempting to construct event from directory: '{event_directory.path}'.")
+ try:
+ instance = await self.construct_event(event_directory)
+ except Exception as exc:
+ log.warning(f"Could not construct event '{event_directory.path}'.", exc_info=exc)
+ else:
+ instances.append(instance)
+
+ return instances
+
+ async def get_current_event(self) -> t.Tuple[t.Optional[Event], t.List[Event]]:
+ """
+ Get the currently active event, or the fallback event.
+
+ The second return value is a list of all available events. The caller may discard it, if not needed.
+ Returning all events alongside the current one prevents having to query the API twice in some cases.
+
+ The current event may be None in the case that no event is active, and no fallback event is found.
+ """
+ utc_now = datetime.utcnow()
+ log.debug(f"Finding active event for: {utc_now}.")
+
+ # Construct an object in the arbitrary year for the purpose of comparison.
+ lookup_now = date(year=ARBITRARY_YEAR, month=utc_now.month, day=utc_now.day)
+ log.trace(f"Lookup object in arbitrary year: {lookup_now}.")
+
+ available_events = await self.get_events()
+ log.trace(f"Found {len(available_events)} available events.")
+
+ for event in available_events:
+ meta = event.meta
+ if not meta.is_fallback and (meta.start_date <= lookup_now <= meta.end_date):
+ return event, available_events
+
+ log.trace("No active event found. Looking for fallback event.")
+
+ for event in available_events:
+ if event.meta.is_fallback:
+ return event, available_events
+
+ log.warning("No event is currently active and no fallback event was found!")
+ return None, available_events
diff --git a/bot/exts/backend/branding/_seasons.py b/bot/exts/backend/branding/_seasons.py
deleted file mode 100644
index 5f6256b30..000000000
--- a/bot/exts/backend/branding/_seasons.py
+++ /dev/null
@@ -1,175 +0,0 @@
-import logging
-import typing as t
-from datetime import datetime
-
-from bot.constants import Colours
-from bot.exts.backend.branding._constants import Month
-from bot.exts.backend.branding._errors import BrandingError
-
-log = logging.getLogger(__name__)
-
-
-class SeasonBase:
- """
- Base for Seasonal classes.
-
- This serves as the off-season fallback for when no specific
- seasons are active.
-
- Seasons are 'registered' simply by inheriting from `SeasonBase`.
- We discover them by calling `__subclasses__`.
- """
-
- season_name: str = "Evergreen"
-
- colour: str = Colours.soft_green
- description: str = "The default season!"
-
- branding_path: str = "seasonal/evergreen"
-
- months: t.Set[Month] = set(Month)
-
-
-class Christmas(SeasonBase):
- """Branding for December."""
-
- season_name = "Festive season"
-
- colour = Colours.soft_red
- description = (
- "The time is here to get into the festive spirit! No matter who you are, where you are, "
- "or what beliefs you may follow, we hope every one of you enjoy this festive season!"
- )
-
- branding_path = "seasonal/christmas"
-
- months = {Month.DECEMBER}
-
-
-class Easter(SeasonBase):
- """Branding for April."""
-
- season_name = "Easter"
-
- colour = Colours.bright_green
- description = (
- "Bunny here, bunny there, bunny everywhere! Here at Python Discord, we celebrate "
- "our version of Easter during the entire month of April."
- )
-
- branding_path = "seasonal/easter"
-
- months = {Month.APRIL}
-
-
-class Halloween(SeasonBase):
- """Branding for October."""
-
- season_name = "Halloween"
-
- colour = Colours.orange
- description = "Trick or treat?!"
-
- branding_path = "seasonal/halloween"
-
- months = {Month.OCTOBER}
-
-
-class Pride(SeasonBase):
- """Branding for June."""
-
- season_name = "Pride"
-
- colour = Colours.pink
- description = (
- "The month of June is a special month for us at Python Discord. It is very important to us "
- "that everyone feels welcome here, no matter their origin, identity or sexuality. During the "
- "month of June, while some of you are participating in Pride festivals across the world, "
- "we will be celebrating individuality and commemorating the history and challenges "
- "of the LGBTQ+ community with a Pride event of our own!"
- )
-
- branding_path = "seasonal/pride"
-
- months = {Month.JUNE}
-
-
-class Valentines(SeasonBase):
- """Branding for February."""
-
- season_name = "Valentines"
-
- colour = Colours.pink
- description = "Love is in the air!"
-
- branding_path = "seasonal/valentines"
-
- months = {Month.FEBRUARY}
-
-
-class Wildcard(SeasonBase):
- """Branding for August."""
-
- season_name = "Wildcard"
-
- colour = Colours.purple
- description = "A season full of surprises!"
-
- months = {Month.AUGUST}
-
-
-def get_all_seasons() -> t.List[t.Type[SeasonBase]]:
- """Give all available season classes."""
- return [SeasonBase] + SeasonBase.__subclasses__()
-
-
-def get_current_season() -> t.Type[SeasonBase]:
- """Give active season, based on current UTC month."""
- current_month = Month(datetime.utcnow().month)
-
- active_seasons = tuple(
- season
- for season in SeasonBase.__subclasses__()
- if current_month in season.months
- )
-
- if not active_seasons:
- return SeasonBase
-
- return active_seasons[0]
-
-
-def get_season(name: str) -> t.Optional[t.Type[SeasonBase]]:
- """
- Give season such that its class name or its `season_name` attr match `name` (caseless).
-
- If no such season exists, return None.
- """
- name = name.casefold()
-
- for season in get_all_seasons():
- matches = (season.__name__.casefold(), season.season_name.casefold())
-
- if name in matches:
- return season
-
-
-def _validate_season_overlap() -> None:
- """
- Raise BrandingError if there are any colliding seasons.
-
- This serves as a local test to ensure that seasons haven't been misconfigured.
- """
- month_to_season = {}
-
- for season in SeasonBase.__subclasses__():
- for month in season.months:
- colliding_season = month_to_season.get(month)
-
- if colliding_season:
- raise BrandingError(f"Season {season} collides with {colliding_season} in {month.name}")
- else:
- month_to_season[month] = season
-
-
-_validate_season_overlap()
diff --git a/bot/exts/backend/error_handler.py b/bot/exts/backend/error_handler.py
index 9cb54cdab..76ab7dfc2 100644
--- a/bot/exts/backend/error_handler.py
+++ b/bot/exts/backend/error_handler.py
@@ -1,7 +1,6 @@
import contextlib
import difflib
import logging
-import random
import typing as t
from discord import Embed
@@ -10,10 +9,9 @@ from sentry_sdk import push_scope
from bot.api import ResponseCodeError
from bot.bot import Bot
-from bot.constants import Colours, ERROR_REPLIES, Icons, MODERATION_ROLES
+from bot.constants import Colours, Icons, MODERATION_ROLES
from bot.converters import TagNameConverter
from bot.errors import InvalidInfractedUser, LockedResourceError
-from bot.exts.backend.branding._errors import BrandingError
from bot.utils.checks import InWhitelistCheckFailure
log = logging.getLogger(__name__)
@@ -79,9 +77,6 @@ class ErrorHandler(Cog):
await self.handle_api_error(ctx, e.original)
elif isinstance(e.original, LockedResourceError):
await ctx.send(f"{e.original} Please wait for it to finish and try again later.")
- elif isinstance(e.original, BrandingError):
- await ctx.send(embed=self._get_error_embed(random.choice(ERROR_REPLIES), str(e.original)))
- return
elif isinstance(e.original, InvalidInfractedUser):
await ctx.send(f"Cannot infract that user. {e.original.reason}")
else: