diff options
| author | 2021-03-14 00:32:30 +0100 | |
|---|---|---|
| committer | 2021-03-14 09:55:09 +0100 | |
| commit | d7bd0c348d6dd8be18174bb67ecf210362070b20 (patch) | |
| tree | 908a37d7919a4e67dca34a6fc54ac71409684f45 | |
| parent | Branding: do not call 'rotate_icons' from rotation init (diff) | |
Branding: propagate success-indicating boolean from 'apply_asset'
The sync command will now be able to use present this information
to the invoking user.
This commit also prevents the cached banner & icon hash from being
overwritten in the case of asset upload failure. As a result, the
daemon will attempt to re-apply the assets the following day.
| -rw-r--r-- | bot/exts/backend/branding/_cog.py | 62 |
1 files changed, 43 insertions, 19 deletions
diff --git a/bot/exts/backend/branding/_cog.py b/bot/exts/backend/branding/_cog.py index cd645fba4..dd19832af 100644 --- a/bot/exts/backend/branding/_cog.py +++ b/bot/exts/backend/branding/_cog.py @@ -106,13 +106,15 @@ class Branding(commands.Cog): # region: Internal utility - @mock_in_debug(return_value=None) - async def apply_asset(self, asset_type: AssetType, download_url: str) -> None: + @mock_in_debug(return_value=True) + async def apply_asset(self, asset_type: AssetType, download_url: str) -> bool: """ Download asset from `download_url` and apply it to PyDis as `asset_type`. This function is mocked in the development environment in order to prevent API spam during testing. Decorator should be temporarily removed in order to test internal methodology. + + Returns a boolean indicating whether the application was successful. """ log.info(f"Applying {asset_type.value} asset to the guild") @@ -120,7 +122,7 @@ class Branding(commands.Cog): if file is None: log.error(f"Failed to download {asset_type.value} from branding repository!") - return + return False await self.bot.wait_until_guild_available() pydis: discord.Guild = self.bot.get_guild(Guild.id) @@ -131,21 +133,30 @@ class Branding(commands.Cog): await pydis.edit(**{asset_type.value: file}) except discord.HTTPException as http_exc: log.error(f"Asset upload to Discord failed: {http_exc}") + return False except asyncio.TimeoutError: log.error(f"Asset upload to Discord timed out after {timeout} seconds!") + return False else: log.debug("Asset uploaded successfully!") + return True - async def apply_banner(self, banner: RemoteObject) -> None: + async def apply_banner(self, banner: RemoteObject) -> bool: """ - Apply `banner` to the guild and cache its hash. + Apply `banner` to the guild and cache its hash if successful. Banners should always be applied via this method in order to ensure that the last hash is cached. + + Returns a boolean indicating whether the application was successful. """ - await self.apply_asset(AssetType.BANNER, banner.download_url) - await self.cache_information.set("banner_hash", banner.sha) + success = await self.apply_asset(AssetType.BANNER, banner.download_url) + + if success: + await self.cache_information.set("banner_hash", banner.sha) - async def rotate_icons(self) -> None: + return success + + async def rotate_icons(self) -> bool: """ Choose and apply the next-up icon in rotation. @@ -155,6 +166,8 @@ class Branding(commands.Cog): Once the current iteration (lowest count in the cache) depletes, we move onto the next iteration. In the case that there is only 1 icon in the rotation and has already been applied, do nothing. + + Returns a boolean indicating whether a new icon was applied successfully. """ log.debug("Rotating icons") @@ -163,7 +176,7 @@ class Branding(commands.Cog): if len(state) == 1 and 1 in state.values(): log.debug("Aborting icon rotation: only 1 icon is available and has already been applied") - return + return False current_iteration = min(state.values()) # Choose iteration to draw from options = [download_url for download_url, times_used in state.items() if times_used == current_iteration] @@ -171,11 +184,15 @@ class Branding(commands.Cog): log.trace(f"Choosing from {len(options)} icons in iteration {current_iteration}") next_icon = random.choice(options) - await self.apply_asset(AssetType.ICON, next_icon) - await self.cache_icons.increment(next_icon) # Push the icon into the next iteration + success = await self.apply_asset(AssetType.ICON, next_icon) - timestamp = datetime.utcnow().timestamp() - await self.cache_information.set("last_rotation_timestamp", timestamp) + if success: + await self.cache_icons.increment(next_icon) # Push the icon into the next iteration + + timestamp = datetime.utcnow().timestamp() + await self.cache_information.set("last_rotation_timestamp", timestamp) + + return success async def maybe_rotate_icons(self) -> None: """ @@ -251,7 +268,7 @@ class Branding(commands.Cog): await channel.send(embed=embed) - async def enter_event(self, event: Event) -> None: + async def enter_event(self, event: Event) -> t.Tuple[bool, bool]: """ Enter `event` and update information cache. @@ -263,13 +280,15 @@ class Branding(commands.Cog): * Provide an on-demand information embed without re-querying the branding repository An event change should always be handled via this function, as it ensures that the cache is populated. + + Returns a 2-tuple indicating whether the banner, and the icon, were applied successfully. """ log.debug(f"Entering new event: {event.path}") - await self.apply_banner(event.banner) # Only one asset ~ apply directly + banner_success = await self.apply_banner(event.banner) # Only one asset ~ apply directly await self.initiate_icon_rotation(event.icons) # Prepare a new rotation - await self.rotate_icons() # Apply an icon from the new rotation + icon_success = await self.rotate_icons() # Apply an icon from the new rotation # Cache event identity to avoid re-entry in case of restart await self.cache_information.set("event_path", event.path) @@ -281,13 +300,17 @@ class Branding(commands.Cog): # Notify guild of new event ~ this reads the information that we cached above! await self.send_info_embed(Channels.change_log) - async def synchronise(self) -> None: + return banner_success, icon_success + + async def synchronise(self) -> t.Tuple[bool, bool]: """ Fetch the current event and delegate to `enter_event`. This is a convenience wrapper to force synchronisation either via a command, or when the daemon starts with an empty cache. It is generally only used in a recovery scenario. In the usual case, the daemon already has an `Event` instance and can pass it to `enter_event` directly. + + Returns a 2-tuple indicating whether the banner, and the icon, were applied successfully. """ log.debug("Synchronise: fetching current event") @@ -297,8 +320,9 @@ class Branding(commands.Cog): if current_event is None: log.error("Failed to fetch event ~ cannot synchronise!") - else: - await self.enter_event(current_event) + return False, False + + return await self.enter_event(current_event) async def populate_cache_events(self, events: t.List[Event]) -> None: """ |