diff options
Diffstat (limited to 'bot/exts')
-rw-r--r-- | bot/exts/evergreen/branding.py | 540 | ||||
-rw-r--r-- | bot/exts/evergreen/error_handler.py | 6 | ||||
-rw-r--r-- | bot/exts/halloween/candy_collection.py | 60 | ||||
-rw-r--r-- | bot/exts/halloween/hacktoberstats.py | 79 |
4 files changed, 39 insertions, 646 deletions
diff --git a/bot/exts/evergreen/branding.py b/bot/exts/evergreen/branding.py deleted file mode 100644 index fa607270..00000000 --- a/bot/exts/evergreen/branding.py +++ /dev/null @@ -1,540 +0,0 @@ -import asyncio -import itertools -# import json -import logging -import random -import typing as t -from datetime import datetime, time, timedelta -from pathlib import Path - -import arrow -import discord -from discord.embeds import EmptyEmbed -from discord.ext import commands - -from bot.bot import SeasonalBot -from bot.constants import Branding, Colours, Emojis, MODERATION_ROLES, Tokens -from bot.seasons import SeasonBase, get_all_seasons, get_current_season, get_season -from bot.utils import human_months -from bot.utils.decorators import with_role -from bot.utils.exceptions import BrandingError -# TODO: Implement substitute for current volume persistence requirements # noqa: T000 -# from bot.utils.persist import make_persistent - -log = logging.getLogger(__name__) - -STATUS_OK = 200 # HTTP status code - -FILE_BANNER = "banner.png" -FILE_AVATAR = "avatar.png" -SERVER_ICONS = "server_icons" - -BRANDING_URL = "https://api.github.com/repos/python-discord/branding/contents" - -PARAMS = {"ref": "master"} # Target branch -HEADERS = {"Accept": "application/vnd.github.v3+json"} # Ensure we use API v3 - -# A GitHub token is not necessary for the cog to operate, -# unauthorized requests are however limited to 60 per hour -if Tokens.github: - HEADERS["Authorization"] = f"token {Tokens.github}" - - -class GitHubFile(t.NamedTuple): - """ - Represents a remote file on GitHub. - - The `sha` hash is kept so that we can determine that a file has changed, - despite its filename remaining unchanged. - """ - - download_url: str - path: str - sha: str - - -def pretty_files(files: t.Iterable[GitHubFile]) -> str: - """Provide a human-friendly representation of `files`.""" - return "\n".join(file.path for file in files) - - -def time_until_midnight() -> timedelta: - """ - Determine amount of time until the next-up UTC midnight. - - The exact `midnight` moment is actually delayed to 5 seconds after, in order - to avoid potential problems due to imprecise sleep. - """ - now = datetime.utcnow() - tomorrow = now + timedelta(days=1) - midnight = datetime.combine(tomorrow, time(second=5)) - - return midnight - now - - -class BrandingManager(commands.Cog): - """ - Manages the guild's branding. - - The purpose of this cog is to help automate the synchronization of the branding - repository with the guild. It is capable of discovering assets in the repository - via GitHub's API, resolving download urls for them, and delegating - to the `bot` instance to upload them to the guild. - - BrandingManager is designed to be entirely autonomous. Its `daemon` background task awakens - once a day (see `time_until_midnight`) to detect new seasons, or to cycle icons within a single - season. The daemon can be turned on and off via the `daemon` cmd group. The value set via - its `start` and `stop` commands is persisted across sessions. If turned on, the daemon will - automatically start on the next bot start-up. Otherwise, it will wait to be started manually. - - All supported operations, e.g. setting seasons, applying the branding, or cycling icons, can - also be invoked manually, via the following API: - - branding list - - Show all available seasons - - branding set <season_name> - - Set the cog's internal state to represent `season_name`, if it exists - - If no `season_name` is given, set chronologically current season - - This will not automatically apply the season's branding to the guild, - the cog's state can be detached from the guild - - Seasons can therefore be 'previewed' using this command - - branding info - - View detailed information about resolved assets for current season - - branding refresh - - Refresh internal state, i.e. synchronize with branding repository - - branding apply - - Apply the current internal state to the guild, i.e. upload the assets - - branding cycle - - If there are multiple available icons for current season, randomly pick - and apply the next one - - The daemon calls these methods autonomously as appropriate. The use of this cog - is locked to moderation roles. As it performs media asset uploads, it is prone to - rate-limits - the `apply` command should be used with caution. The `set` command can, - however, be used freely to 'preview' seasonal branding and check whether paths have been - resolved as appropriate. - - While the bot is in debug mode, it will 'mock' asset uploads by logging the passed - download urls and pretending that the upload was successful. Make use of this - to test this cog's behaviour. - """ - - current_season: t.Type[SeasonBase] - - banner: t.Optional[GitHubFile] - avatar: t.Optional[GitHubFile] - - available_icons: t.List[GitHubFile] - remaining_icons: t.List[GitHubFile] - - days_since_cycle: t.Iterator - - config_file: Path - - daemon: t.Optional[asyncio.Task] - - def __init__(self, bot: SeasonalBot) -> None: - """ - Assign safe default values on init. - - At this point, we don't have information about currently available branding. - Most of these attributes will be overwritten once the daemon connects, or once - the `refresh` command is used. - """ - self.bot = bot - self.current_season = get_current_season() - - self.banner = None - self.avatar = None - - self.available_icons = [] - self.remaining_icons = [] - - self.days_since_cycle = itertools.cycle([None]) - - # self.config_file = make_persistent(Path("bot", "resources", "evergreen", "branding.json")) - - # should_run = self._read_config()["daemon_active"] - - # if should_run: - # self.daemon = self.bot.loop.create_task(self._daemon_func()) - # else: - self.daemon = None - - @property - def _daemon_running(self) -> bool: - """True if the daemon is currently active, False otherwise.""" - return self.daemon is not None and not self.daemon.done() - - def _read_config(self) -> t.Dict[str, bool]: - """Read and return persistent config file.""" - raise NotImplementedError("read_config functionality requires mounting a persistent volume.") - - def _write_config(self, key: str, value: bool) -> None: - """Write a `key`, `value` pair to persistent config file.""" - raise NotImplementedError("write_config functionality requires mounting a persistent volume.") - - async def _daemon_func(self) -> None: - """ - Manage all automated behaviour of the BrandingManager cog. - - Once a day, the daemon will perform the following tasks: - - Update `current_season` - - Poll GitHub API to see if the available branding for `current_season` has changed - - Update assets if changes are detected (banner, guild icon, bot avatar, bot nickname) - - Check whether it's time to cycle guild icons - - The internal loop runs once when activated, then periodically at the time - given by `time_until_midnight`. - - All method calls in the internal loop are considered safe, i.e. no errors propagate - to the daemon's loop. The daemon itself does not perform any error handling on its own. - """ - await self.bot.wait_until_guild_available() - - while True: - self.current_season = get_current_season() - branding_changed = await self.refresh() - - if branding_changed: - await self.apply() - - elif next(self.days_since_cycle) == Branding.cycle_frequency: - await self.cycle() - - until_midnight = time_until_midnight() - await asyncio.sleep(until_midnight.total_seconds()) - - async def _info_embed(self) -> discord.Embed: - """Make an informative embed representing current season.""" - info_embed = discord.Embed(description=self.current_season.description, colour=self.current_season.colour) - - # If we're in a non-evergreen season, also show active months - if self.current_season is not SeasonBase: - title = f"{self.current_season.season_name} ({human_months(self.current_season.months)})" - else: - title = self.current_season.season_name - - # Use the author field to show the season's name and avatar if available - info_embed.set_author(name=title, icon_url=self.avatar.download_url if self.avatar else EmptyEmbed) - - banner = self.banner.path if self.banner is not None else "Unavailable" - info_embed.add_field(name="Banner", value=banner, inline=False) - - avatar = self.avatar.path if self.avatar is not None else "Unavailable" - info_embed.add_field(name="Avatar", value=avatar, inline=False) - - icons = pretty_files(self.available_icons) or "Unavailable" - info_embed.add_field(name="Available icons", value=icons, inline=False) - - # Only display cycle frequency if we're actually cycling - if len(self.available_icons) > 1 and Branding.cycle_frequency: - info_embed.set_footer(text=f"Icon cycle frequency: {Branding.cycle_frequency}") - - return info_embed - - async def _reset_remaining_icons(self) -> None: - """Set `remaining_icons` to a shuffled copy of `available_icons`.""" - self.remaining_icons = random.sample(self.available_icons, k=len(self.available_icons)) - - async def _reset_days_since_cycle(self) -> None: - """ - Reset the `days_since_cycle` iterator based on configured frequency. - - If the current season only has 1 icon, or if `Branding.cycle_frequency` is falsey, - the iterator will always yield None. This signals that the icon shouldn't be cycled. - - Otherwise, it will yield ints in range [1, `Branding.cycle_frequency`] indefinitely. - When the iterator yields a value equal to `Branding.cycle_frequency`, it is time to cycle. - """ - if len(self.available_icons) > 1 and Branding.cycle_frequency: - sequence = range(1, Branding.cycle_frequency + 1) - else: - sequence = [None] - - self.days_since_cycle = itertools.cycle(sequence) - - async def _get_files(self, path: str, include_dirs: bool = False) -> t.Dict[str, GitHubFile]: - """ - Get files at `path` in the branding repository. - - If `include_dirs` is False (default), only returns files at `path`. - Otherwise, will return both files and directories. Never returns symlinks. - - Return dict mapping from filename to corresponding `GitHubFile` instance. - This may return an empty dict if the response status is non-200, - or if the target directory is empty. - """ - url = f"{BRANDING_URL}/{path}" - async with self.bot.http_session.get(url, headers=HEADERS, params=PARAMS) as resp: - # Short-circuit if we get non-200 response - if resp.status != STATUS_OK: - log.error(f"GitHub API returned non-200 response: {resp}") - return {} - directory = await resp.json() # Directory at `path` - - allowed_types = {"file", "dir"} if include_dirs else {"file"} - return { - file["name"]: GitHubFile(file["download_url"], file["path"], file["sha"]) - for file in directory - if file["type"] in allowed_types - } - - async def refresh(self) -> bool: - """ - Synchronize available assets with branding repository. - - If the current season is not the evergreen, and lacks at least one asset, - we use the evergreen seasonal dir as fallback for missing assets. - - Finally, if neither the seasonal nor fallback branding directories contain - an asset, it will simply be ignored. - - Return True if the branding has changed. This will be the case when we enter - a new season, or when something changes in the current seasons's directory - in the branding repository. - """ - old_branding = (self.banner, self.avatar, self.available_icons) - seasonal_dir = await self._get_files(self.current_season.branding_path, include_dirs=True) - - # Only make a call to the fallback directory if there is something to be gained - branding_incomplete = any( - asset not in seasonal_dir - for asset in (FILE_BANNER, FILE_AVATAR, SERVER_ICONS) - ) - if branding_incomplete and self.current_season is not SeasonBase: - fallback_dir = await self._get_files(SeasonBase.branding_path, include_dirs=True) - else: - fallback_dir = {} - - # Resolve assets in this directory, None is a safe value - self.banner = seasonal_dir.get(FILE_BANNER) or fallback_dir.get(FILE_BANNER) - self.avatar = seasonal_dir.get(FILE_AVATAR) or fallback_dir.get(FILE_AVATAR) - - # Now resolve server icons by making a call to the proper sub-directory - if SERVER_ICONS in seasonal_dir: - icons_dir = await self._get_files(f"{self.current_season.branding_path}/{SERVER_ICONS}") - self.available_icons = list(icons_dir.values()) - - elif SERVER_ICONS in fallback_dir: - icons_dir = await self._get_files(f"{SeasonBase.branding_path}/{SERVER_ICONS}") - self.available_icons = list(icons_dir.values()) - - else: - self.available_icons = [] # This should never be the case, but an empty list is a safe value - - # GitHubFile instances carry a `sha` attr so this will pick up if a file changes - branding_changed = old_branding != (self.banner, self.avatar, self.available_icons) - - if branding_changed: - log.info(f"New branding detected (season: {self.current_season.season_name})") - await self._reset_remaining_icons() - await self._reset_days_since_cycle() - - return branding_changed - - async def cycle(self) -> bool: - """ - Apply the next-up server icon. - - Returns True if an icon is available and successfully gets applied, False otherwise. - """ - if not self.available_icons: - log.info("Cannot cycle: no icons for this season") - return False - - if not self.remaining_icons: - log.info("Reset & shuffle remaining icons") - await self._reset_remaining_icons() - - next_up = self.remaining_icons.pop(0) - success = await self.bot.set_icon(next_up.download_url) - - return success - - async def apply(self) -> t.List[str]: - """ - Apply current branding to the guild and bot. - - This delegates to the bot instance to do all the work. We only provide download urls - for available assets. Assets unavailable in the branding repo will be ignored. - - Returns a list of names of all failed assets. An asset is considered failed - if it isn't found in the branding repo, or if something goes wrong while the - bot is trying to apply it. - - An empty list denotes that all assets have been applied successfully. - """ - report = {asset: False for asset in ("banner", "avatar", "nickname", "icon")} - - if self.banner is not None: - report["banner"] = await self.bot.set_banner(self.banner.download_url) - - if self.avatar is not None: - report["avatar"] = await self.bot.set_avatar(self.avatar.download_url) - - if self.current_season.bot_name: - report["nickname"] = await self.bot.set_nickname(self.current_season.bot_name) - - report["icon"] = await self.cycle() - - failed_assets = [asset for asset, succeeded in report.items() if not succeeded] - return failed_assets - - @with_role(*MODERATION_ROLES) - @commands.group(name="branding") - async def branding_cmds(self, ctx: commands.Context) -> None: - """Manual branding control.""" - if not ctx.invoked_subcommand: - await ctx.send_help(ctx.command) - - @branding_cmds.command(name="list", aliases=["ls"]) - async def branding_list(self, ctx: commands.Context) -> None: - """List all available seasons and branding sources.""" - embed = discord.Embed(title="Available seasons", colour=Colours.soft_green) - - for season in get_all_seasons(): - if season is SeasonBase: - active_when = "always" - else: - active_when = f"in {human_months(season.months)}" - - description = ( - f"Active {active_when}\n" - f"Branding: {season.branding_path}" - ) - embed.add_field(name=season.season_name, value=description, inline=False) - - await ctx.send(embed=embed) - - @branding_cmds.command(name="set") - async def branding_set(self, ctx: commands.Context, *, season_name: t.Optional[str] = None) -> None: - """ - Manually set season, or reset to current if none given. - - Season search is a case-less comparison against both seasonal class name, - and its `season_name` attr. - - This only pre-loads the cog's internal state to the chosen season, but does not - automatically apply the branding. As that is an expensive operation, the `apply` - command must be called explicitly after this command finishes. - - This means that this command can be used to 'preview' a season gathering info - about its available assets, without applying them to the guild. - - If the daemon is running, it will automatically reset the season to current when - it wakes up. The season set via this command can therefore remain 'detached' from - what it should be - the daemon will make sure that it's set back properly. - """ - if season_name is None: - new_season = get_current_season() - else: - new_season = get_season(season_name) - if new_season is None: - raise BrandingError("No such season exists") - - if self.current_season is new_season: - raise BrandingError(f"Season {self.current_season.season_name} already active") - - self.current_season = new_season - await self.branding_refresh(ctx) - - @branding_cmds.command(name="info", aliases=["status"]) - async def branding_info(self, ctx: commands.Context) -> None: - """ - Show available assets for current season. - - This can be used to confirm that assets have been resolved properly. - When `apply` is used, it attempts to upload exactly the assets listed here. - """ - await ctx.send(embed=await self._info_embed()) - - @branding_cmds.command(name="refresh") - async def branding_refresh(self, ctx: commands.Context) -> None: - """Sync currently available assets with branding repository.""" - async with ctx.typing(): - await self.refresh() - await self.branding_info(ctx) - - @branding_cmds.command(name="apply") - async def branding_apply(self, ctx: commands.Context) -> None: - """ - Apply current season's branding to the guild. - - Use `info` to check which assets will be applied. Shows which assets have - failed to be applied, if any. - """ - async with ctx.typing(): - failed_assets = await self.apply() - if failed_assets: - raise BrandingError(f"Failed to apply following assets: {', '.join(failed_assets)}") - - response = discord.Embed(description=f"All assets applied {Emojis.ok_hand}", colour=Colours.soft_green) - await ctx.send(embed=response) - - @branding_cmds.command(name="cycle") - async def branding_cycle(self, ctx: commands.Context) -> None: - """ - Apply the next-up guild icon, if multiple are available. - - The order is random. - """ - async with ctx.typing(): - success = await self.cycle() - if not success: - raise BrandingError("Failed to cycle icon") - - response = discord.Embed(description=f"Success {Emojis.ok_hand}", colour=Colours.soft_green) - await ctx.send(embed=response) - - @branding_cmds.group(name="daemon", aliases=["d", "task"]) - async def daemon_group(self, ctx: commands.Context) -> None: - """Control the background daemon.""" - if not ctx.invoked_subcommand: - await ctx.send_help(ctx.command) - - @daemon_group.command(name="status") - async def daemon_status(self, ctx: commands.Context) -> None: - """Check whether daemon is currently active.""" - if self._daemon_running: - remaining_time = (arrow.utcnow() + time_until_midnight()).humanize() - response = discord.Embed(description=f"Daemon running {Emojis.ok_hand}", colour=Colours.soft_green) - response.set_footer(text=f"Next refresh {remaining_time}") - else: - response = discord.Embed(description="Daemon not running", colour=Colours.soft_red) - - await ctx.send(embed=response) - - @daemon_group.command(name="start", enabled=False) - async def daemon_start(self, ctx: commands.Context) -> None: - """If the daemon isn't running, start it.""" - if self._daemon_running: - raise BrandingError("Daemon already running!") - - self.daemon = self.bot.loop.create_task(self._daemon_func()) - self._write_config("daemon_active", True) - - response = discord.Embed(description=f"Daemon started {Emojis.ok_hand}", colour=Colours.soft_green) - await ctx.send(embed=response) - - @daemon_group.command(name="stop", enabled=False) - async def daemon_stop(self, ctx: commands.Context) -> None: - """If the daemon is running, stop it.""" - if not self._daemon_running: - raise BrandingError("Daemon not running!") - - self.daemon.cancel() - self._write_config("daemon_active", False) - - response = discord.Embed(description=f"Daemon stopped {Emojis.ok_hand}", colour=Colours.soft_green) - await ctx.send(embed=response) - - -def setup(bot: SeasonalBot) -> None: - """Load BrandingManager cog.""" - bot.add_cog(BrandingManager(bot)) diff --git a/bot/exts/evergreen/error_handler.py b/bot/exts/evergreen/error_handler.py index 459a2b2d..6e518435 100644 --- a/bot/exts/evergreen/error_handler.py +++ b/bot/exts/evergreen/error_handler.py @@ -9,7 +9,7 @@ from sentry_sdk import push_scope from bot.constants import Colours, ERROR_REPLIES, NEGATIVE_REPLIES from bot.utils.decorators import InChannelCheckFailure, InMonthCheckFailure -from bot.utils.exceptions import BrandingError, UserNotPlayingError +from bot.utils.exceptions import UserNotPlayingError log = logging.getLogger(__name__) @@ -57,10 +57,6 @@ class CommandErrorHandler(commands.Cog): if isinstance(error, commands.CommandNotFound): return - if isinstance(error, BrandingError): - await ctx.send(embed=self.error_embed(str(error))) - return - if isinstance(error, (InChannelCheckFailure, InMonthCheckFailure)): await ctx.send(embed=self.error_embed(str(error), NEGATIVE_REPLIES), delete_after=7.5) return diff --git a/bot/exts/halloween/candy_collection.py b/bot/exts/halloween/candy_collection.py index bd0b90cc..33e18b24 100644 --- a/bot/exts/halloween/candy_collection.py +++ b/bot/exts/halloween/candy_collection.py @@ -1,18 +1,14 @@ -import json import logging import random -# from pathlib import Path from typing import Union import discord +from async_rediscache import RedisCache from discord.ext import commands from bot.constants import Channels, Month from bot.utils.decorators import in_month -# TODO: Implement substitutes for volume-persistent methods. # noqa: T000 -# from bot.utils.persist import make_persistent - log = logging.getLogger(__name__) # chance is 1 in x range, so 1 in 20 range would give 5% chance (for add candy) @@ -37,18 +33,15 @@ EMOJIS = dict( class CandyCollection(commands.Cog): """Candy collection game Cog.""" - def __init__(self, bot: commands.Bot): - self.bot = bot - # self.json_file = make_persistent(Path("bot", "resources", "halloween", "candy_collection.json")) + # User candy amount records + candy_records = RedisCache() - with self.json_file.open() as fp: - candy_data = json.load(fp) + # Candy and skull messages mapping + candy_messages = RedisCache() + skull_messages = RedisCache() - self.candy_records = candy_data.get("records", dict()) - - # Message ID where bot added the candies/skulls - self.candy_messages = set() - self.skull_messages = set() + def __init__(self, bot: commands.Bot): + self.bot = bot @in_month(Month.OCTOBER) @commands.Cog.listener() @@ -63,11 +56,11 @@ class CandyCollection(commands.Cog): # do random check for skull first as it has the lower chance if random.randint(1, ADD_SKULL_REACTION_CHANCE) == 1: - self.skull_messages.add(message.id) + await self.skull_messages.set(message.id, "skull") return await message.add_reaction(EMOJIS['SKULL']) # check for the candy chance next if random.randint(1, ADD_CANDY_REACTION_CHANCE) == 1: - self.candy_messages.add(message.id) + await self.candy_messages.set(message.id, "candy") return await message.add_reaction(EMOJIS['CANDY']) @in_month(Month.OCTOBER) @@ -94,17 +87,19 @@ class CandyCollection(commands.Cog): await self.reacted_msg_chance(message) return - if message.id in self.candy_messages and str(reaction.emoji) == EMOJIS['CANDY']: - self.candy_messages.remove(message.id) - prev_record = self.candy_records.get(str(user.id), 0) - self.candy_records[str(user.id)] = prev_record + 1 + if await self.candy_messages.get(message.id) == "candy" and str(reaction.emoji) == EMOJIS['CANDY']: + await self.candy_messages.delete(message.id) + if await self.candy_records.contains(user.id): + await self.candy_records.increment(user.id) + else: + await self.candy_records.set(user.id, 1) - elif message.id in self.skull_messages and str(reaction.emoji) == EMOJIS['SKULL']: - self.skull_messages.remove(message.id) + elif await self.skull_messages.get(message.id) == "skull" and str(reaction.emoji) == EMOJIS['SKULL']: + await self.skull_messages.delete(message.id) - if prev_record := self.candy_records.get(str(user.id)): + if prev_record := await self.candy_records.get(user.id): lost = min(random.randint(1, 3), prev_record) - self.candy_records[str(user.id)] = prev_record - lost + await self.candy_records.decrement(user.id, lost) if lost == prev_record: await CandyCollection.send_spook_msg(user, message.channel, 'all of your') @@ -116,7 +111,6 @@ class CandyCollection(commands.Cog): return # Skip saving await reaction.clear() - await self.bot.loop.run_in_executor(None, self.save_to_json) async def reacted_msg_chance(self, message: discord.Message) -> None: """ @@ -126,11 +120,11 @@ class CandyCollection(commands.Cog): existing reaction. """ if random.randint(1, ADD_SKULL_EXISTING_REACTION_CHANCE) == 1: - self.skull_messages.add(message.id) + await self.skull_messages.set(message.id, "skull") return await message.add_reaction(EMOJIS['SKULL']) if random.randint(1, ADD_CANDY_EXISTING_REACTION_CHANCE) == 1: - self.candy_messages.add(message.id) + await self.candy_messages.set(message.id, "candy") return await message.add_reaction(EMOJIS['CANDY']) @property @@ -159,18 +153,15 @@ class CandyCollection(commands.Cog): "I tried to take your candies but you had none to begin with!") await channel.send(embed=embed) - def save_to_json(self) -> None: - """Save JSON to a local file.""" - with self.json_file.open('w') as fp: - json.dump(dict(records=self.candy_records), fp) - @in_month(Month.OCTOBER) @commands.command() async def candy(self, ctx: commands.Context) -> None: """Get the candy leaderboard and save to JSON.""" + records = await self.candy_records.items() + def generate_leaderboard() -> str: top_sorted = sorted( - ((user_id, score) for user_id, score in self.candy_records.items() if score > 0), + ((user_id, score) for user_id, score in records if score > 0), key=lambda x: x[1], reverse=True ) @@ -199,3 +190,4 @@ class CandyCollection(commands.Cog): def setup(bot: commands.Bot) -> None: """Candy Collection game Cog load.""" + bot.add_cog(CandyCollection(bot)) diff --git a/bot/exts/halloween/hacktoberstats.py b/bot/exts/halloween/hacktoberstats.py index 4fd5c324..26d75565 100644 --- a/bot/exts/halloween/hacktoberstats.py +++ b/bot/exts/halloween/hacktoberstats.py @@ -1,21 +1,17 @@ -import json import logging import re from collections import Counter from datetime import datetime, timedelta -# from pathlib import Path from typing import List, Tuple, Union import aiohttp import discord +from async_rediscache import RedisCache from discord.ext import commands from bot.constants import Channels, Month, Tokens, WHITELISTED_CHANNELS from bot.utils.decorators import in_month, override_in_channel -# TODO: Implement substitutes for volume-persistent methods. # noqa: T000 -# from bot.utils.persist import make_persistent - log = logging.getLogger(__name__) CURRENT_YEAR = datetime.now().year # Used to construct GH API query @@ -39,10 +35,11 @@ GITHUB_NONEXISTENT_USER_MESSAGE = ( class HacktoberStats(commands.Cog): """Hacktoberfest statistics Cog.""" + # Stores mapping of user IDs and GitHub usernames + linked_accounts = RedisCache() + def __init__(self, bot: commands.Bot): self.bot = bot - # self.link_json = make_persistent(Path("bot", "resources", "halloween", "github_links.json")) - self.linked_accounts = self.load_linked_users() @in_month(Month.SEPTEMBER, Month.OCTOBER, Month.NOVEMBER) @commands.group(name="hacktoberstats", aliases=("hackstats",), invoke_without_command=True) @@ -58,8 +55,8 @@ class HacktoberStats(commands.Cog): if not github_username: author_id, author_mention = self._author_mention_from_context(ctx) - if str(author_id) in self.linked_accounts.keys(): - github_username = self.linked_accounts[author_id]["github_username"] + if await self.linked_accounts.contains(author_id): + github_username = await self.linked_accounts.get(author_id) logging.info(f"Getting stats for {author_id} linked GitHub account '{github_username}'") else: msg = ( @@ -79,30 +76,19 @@ class HacktoberStats(commands.Cog): """ Link the invoking user's Github github_username to their Discord ID. - Linked users are stored as a nested dict: - { - Discord_ID: { - "github_username": str - "date_added": datetime - } - } + Linked users are stored in Redis: User ID => GitHub Username. """ author_id, author_mention = self._author_mention_from_context(ctx) if github_username: - if str(author_id) in self.linked_accounts.keys(): - old_username = self.linked_accounts[author_id]["github_username"] + if await self.linked_accounts.contains(author_id): + old_username = await self.linked_accounts.get(author_id) logging.info(f"{author_id} has changed their github link from '{old_username}' to '{github_username}'") await ctx.send(f"{author_mention}, your GitHub username has been updated to: '{github_username}'") else: logging.info(f"{author_id} has added a github link to '{github_username}'") await ctx.send(f"{author_mention}, your GitHub username has been added") - self.linked_accounts[author_id] = { - "github_username": github_username, - "date_added": datetime.now() - } - - self.save_linked_users() + await self.linked_accounts.set(author_id, github_username) else: logging.info(f"{author_id} tried to link a GitHub account but didn't provide a username") await ctx.send(f"{author_mention}, a GitHub username is required to link your account") @@ -114,7 +100,7 @@ class HacktoberStats(commands.Cog): """Remove the invoking user's account link from the log.""" author_id, author_mention = self._author_mention_from_context(ctx) - stored_user = self.linked_accounts.pop(author_id, None) + stored_user = await self.linked_accounts.pop(author_id, None) if stored_user: await ctx.send(f"{author_mention}, your GitHub profile has been unlinked") logging.info(f"{author_id} has unlinked their GitHub account") @@ -122,48 +108,6 @@ class HacktoberStats(commands.Cog): await ctx.send(f"{author_mention}, you do not currently have a linked GitHub account") logging.info(f"{author_id} tried to unlink their GitHub account but no account was linked") - self.save_linked_users() - - def load_linked_users(self) -> dict: - """ - Load list of linked users from local JSON file. - - Linked users are stored as a nested dict: - { - Discord_ID: { - "github_username": str - "date_added": datetime - } - } - """ - if self.link_json.exists(): - logging.info(f"Loading linked GitHub accounts from '{self.link_json}'") - with open(self.link_json, 'r', encoding="utf8") as file: - linked_accounts = json.load(file) - - logging.info(f"Loaded {len(linked_accounts)} linked GitHub accounts from '{self.link_json}'") - return linked_accounts - else: - logging.info(f"Linked account log: '{self.link_json}' does not exist") - return {} - - def save_linked_users(self) -> None: - """ - Save list of linked users to local JSON file. - - Linked users are stored as a nested dict: - { - Discord_ID: { - "github_username": str - "date_added": datetime - } - } - """ - logging.info(f"Saving linked_accounts to '{self.link_json}'") - with open(self.link_json, 'w', encoding="utf8") as file: - json.dump(self.linked_accounts, file, default=str) - logging.info(f"linked_accounts saved to '{self.link_json}'") - async def get_stats(self, ctx: commands.Context, github_username: str) -> None: """ Query GitHub's API for PRs created by a GitHub user during the month of October. @@ -491,3 +435,4 @@ class HacktoberStats(commands.Cog): def setup(bot: commands.Bot) -> None: """Hacktoberstats Cog load.""" + bot.add_cog(HacktoberStats(bot)) |