aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar kwzrd <[email protected]>2021-03-14 00:32:30 +0100
committerGravatar kwzrd <[email protected]>2021-03-14 09:55:09 +0100
commitd7bd0c348d6dd8be18174bb67ecf210362070b20 (patch)
tree908a37d7919a4e67dca34a6fc54ac71409684f45
parentBranding: do not call 'rotate_icons' from rotation init (diff)
Branding: propagate success-indicating boolean from 'apply_asset'
The sync command will now be able to use present this information to the invoking user. This commit also prevents the cached banner & icon hash from being overwritten in the case of asset upload failure. As a result, the daemon will attempt to re-apply the assets the following day.
-rw-r--r--bot/exts/backend/branding/_cog.py62
1 files changed, 43 insertions, 19 deletions
diff --git a/bot/exts/backend/branding/_cog.py b/bot/exts/backend/branding/_cog.py
index cd645fba4..dd19832af 100644
--- a/bot/exts/backend/branding/_cog.py
+++ b/bot/exts/backend/branding/_cog.py
@@ -106,13 +106,15 @@ class Branding(commands.Cog):
# region: Internal utility
- @mock_in_debug(return_value=None)
- async def apply_asset(self, asset_type: AssetType, download_url: str) -> None:
+ @mock_in_debug(return_value=True)
+ async def apply_asset(self, asset_type: AssetType, download_url: str) -> bool:
"""
Download asset from `download_url` and apply it to PyDis as `asset_type`.
This function is mocked in the development environment in order to prevent API spam during testing.
Decorator should be temporarily removed in order to test internal methodology.
+
+ Returns a boolean indicating whether the application was successful.
"""
log.info(f"Applying {asset_type.value} asset to the guild")
@@ -120,7 +122,7 @@ class Branding(commands.Cog):
if file is None:
log.error(f"Failed to download {asset_type.value} from branding repository!")
- return
+ return False
await self.bot.wait_until_guild_available()
pydis: discord.Guild = self.bot.get_guild(Guild.id)
@@ -131,21 +133,30 @@ class Branding(commands.Cog):
await pydis.edit(**{asset_type.value: file})
except discord.HTTPException as http_exc:
log.error(f"Asset upload to Discord failed: {http_exc}")
+ return False
except asyncio.TimeoutError:
log.error(f"Asset upload to Discord timed out after {timeout} seconds!")
+ return False
else:
log.debug("Asset uploaded successfully!")
+ return True
- async def apply_banner(self, banner: RemoteObject) -> None:
+ async def apply_banner(self, banner: RemoteObject) -> bool:
"""
- Apply `banner` to the guild and cache its hash.
+ Apply `banner` to the guild and cache its hash if successful.
Banners should always be applied via this method in order to ensure that the last hash is cached.
+
+ Returns a boolean indicating whether the application was successful.
"""
- await self.apply_asset(AssetType.BANNER, banner.download_url)
- await self.cache_information.set("banner_hash", banner.sha)
+ success = await self.apply_asset(AssetType.BANNER, banner.download_url)
+
+ if success:
+ await self.cache_information.set("banner_hash", banner.sha)
- async def rotate_icons(self) -> None:
+ return success
+
+ async def rotate_icons(self) -> bool:
"""
Choose and apply the next-up icon in rotation.
@@ -155,6 +166,8 @@ class Branding(commands.Cog):
Once the current iteration (lowest count in the cache) depletes, we move onto the next iteration.
In the case that there is only 1 icon in the rotation and has already been applied, do nothing.
+
+ Returns a boolean indicating whether a new icon was applied successfully.
"""
log.debug("Rotating icons")
@@ -163,7 +176,7 @@ class Branding(commands.Cog):
if len(state) == 1 and 1 in state.values():
log.debug("Aborting icon rotation: only 1 icon is available and has already been applied")
- return
+ return False
current_iteration = min(state.values()) # Choose iteration to draw from
options = [download_url for download_url, times_used in state.items() if times_used == current_iteration]
@@ -171,11 +184,15 @@ class Branding(commands.Cog):
log.trace(f"Choosing from {len(options)} icons in iteration {current_iteration}")
next_icon = random.choice(options)
- await self.apply_asset(AssetType.ICON, next_icon)
- await self.cache_icons.increment(next_icon) # Push the icon into the next iteration
+ success = await self.apply_asset(AssetType.ICON, next_icon)
- timestamp = datetime.utcnow().timestamp()
- await self.cache_information.set("last_rotation_timestamp", timestamp)
+ if success:
+ await self.cache_icons.increment(next_icon) # Push the icon into the next iteration
+
+ timestamp = datetime.utcnow().timestamp()
+ await self.cache_information.set("last_rotation_timestamp", timestamp)
+
+ return success
async def maybe_rotate_icons(self) -> None:
"""
@@ -251,7 +268,7 @@ class Branding(commands.Cog):
await channel.send(embed=embed)
- async def enter_event(self, event: Event) -> None:
+ async def enter_event(self, event: Event) -> t.Tuple[bool, bool]:
"""
Enter `event` and update information cache.
@@ -263,13 +280,15 @@ class Branding(commands.Cog):
* Provide an on-demand information embed without re-querying the branding repository
An event change should always be handled via this function, as it ensures that the cache is populated.
+
+ Returns a 2-tuple indicating whether the banner, and the icon, were applied successfully.
"""
log.debug(f"Entering new event: {event.path}")
- await self.apply_banner(event.banner) # Only one asset ~ apply directly
+ banner_success = await self.apply_banner(event.banner) # Only one asset ~ apply directly
await self.initiate_icon_rotation(event.icons) # Prepare a new rotation
- await self.rotate_icons() # Apply an icon from the new rotation
+ icon_success = await self.rotate_icons() # Apply an icon from the new rotation
# Cache event identity to avoid re-entry in case of restart
await self.cache_information.set("event_path", event.path)
@@ -281,13 +300,17 @@ class Branding(commands.Cog):
# Notify guild of new event ~ this reads the information that we cached above!
await self.send_info_embed(Channels.change_log)
- async def synchronise(self) -> None:
+ return banner_success, icon_success
+
+ async def synchronise(self) -> t.Tuple[bool, bool]:
"""
Fetch the current event and delegate to `enter_event`.
This is a convenience wrapper to force synchronisation either via a command, or when the daemon starts
with an empty cache. It is generally only used in a recovery scenario. In the usual case, the daemon
already has an `Event` instance and can pass it to `enter_event` directly.
+
+ Returns a 2-tuple indicating whether the banner, and the icon, were applied successfully.
"""
log.debug("Synchronise: fetching current event")
@@ -297,8 +320,9 @@ class Branding(commands.Cog):
if current_event is None:
log.error("Failed to fetch event ~ cannot synchronise!")
- else:
- await self.enter_event(current_event)
+ return False, False
+
+ return await self.enter_event(current_event)
async def populate_cache_events(self, events: t.List[Event]) -> None:
"""