diff options
Diffstat (limited to 'bot/exts/christmas/adventofcode.py')
-rw-r--r-- | bot/exts/christmas/adventofcode.py | 255 |
1 files changed, 197 insertions, 58 deletions
diff --git a/bot/exts/christmas/adventofcode.py b/bot/exts/christmas/adventofcode.py index be1c733a..9b7780ae 100644 --- a/bot/exts/christmas/adventofcode.py +++ b/bot/exts/christmas/adventofcode.py @@ -136,21 +136,24 @@ class AdventOfCode(commands.Cog): public_leaderboard_members = RedisCache() # We don't want that users join to multiple leaderboards, so return only 1 code to user. - # User ID -> AoC leaderboard ID + # User ID -> Join code user_join_codes = RedisCache() + # We must keep track when user got (and what) stars, because we have multiple leaderboards. + # Format: User ID -> AoCCachedMember (pickle) + public_user_data = RedisCache() + def __init__(self, bot: Bot): self.bot = bot self._base_url = f"https://adventofcode.com/{AocConfig.year}" self.global_leaderboard_url = f"https://adventofcode.com/{AocConfig.year}/leaderboard" - self.private_leaderboard_url = f"{self._base_url}/leaderboard/private/view/{AocConfig.leaderboard_staff_id}" self.about_aoc_filepath = Path("./bot/resources/advent_of_code/about.json") self.cached_about_aoc = self._build_about_embed() self.cached_global_leaderboard = None - self.cached_private_leaderboard = None + self.cached_staff_leaderboard = None self.countdown_task = None self.status_task = None @@ -173,6 +176,11 @@ class AdventOfCode(commands.Cog): ) } + self.last_updated = None + self.staff_last_updated = None + self.refresh_lock = asyncio.Lock() + self.staff_refresh_lock = asyncio.Lock() + @seasonal_task(Month.DECEMBER, sleep_time=60 * 30) async def leaderboard_members_updater(self) -> None: """Updates public leaderboards cached member amounts in every 30 minutes.""" @@ -183,11 +191,129 @@ class AdventOfCode(commands.Cog): # Update every leaderboard for what we have session cookie for aoc_id, cookie in self.leaderboard_cookies.items(): leaderboard = await AocPrivateLeaderboard.from_url(aoc_id, cookie) - log.info(leaderboard.members) # Update only when API return any members if len(leaderboard.members) > 0: await self.public_leaderboard_members.set(aoc_id, len(leaderboard.members)) + async def refresh_leaderboard(self) -> None: + """Updates public PyDis leaderboard scores based on dates.""" + self.last_updated = datetime.utcnow() + leaderboard_users = {} + leaderboards = [ + await AocPrivateLeaderboard.json_from_url(aoc_id, cookie) + for aoc_id, cookie in self.leaderboard_cookies.items() + ] + + for leaderboard in leaderboards: + for member_id, data in leaderboard["members"].items(): + leaderboard_users[int(member_id)] = { + "name": data.get("name", "Anonymous User"), + "aoc_id": int(member_id), + "days": { + day: { + "star_one": "1" in stars, + "star_two": "2" in stars, + "star_one_earned": int(stars["1"]["get_star_ts"]) if "1" in stars else None, + "star_two_earned": int(stars["2"]["get_star_ts"]) if "2" in stars else None, + } for day, stars in data.get("completion_day_level", {}).items() + } + } + + # Iterate over every advent day + for day in range(1, 26): + day = str(day) + star_one_users = [] + star_two_users = [] + + for user, user_data in leaderboard_users.items(): + if day in user_data["days"]: + if user_data["days"][day]["star_one"]: + star_one_users.append({ + "id": user, + "earned": datetime.fromtimestamp(user_data["days"][day]["star_one_earned"]), + }) + + if user_data["days"][day]["star_two"]: + star_two_users.append({ + "id": user, + "earned": datetime.fromtimestamp(user_data["days"][day]["star_two_earned"]), + }) + + # Sort these lists based on user star earning time + star_one_users = sorted(star_one_users, key=lambda k: k["earned"])[:100] + star_two_users = sorted(star_two_users, key=lambda k: k["earned"])[:100] + + points = 100 + for star_user_one in star_one_users: + if "score" in leaderboard_users[star_user_one["id"]]: + leaderboard_users[star_user_one["id"]]["score"] += points + else: + leaderboard_users[star_user_one["id"]]["score"] = points + points -= 1 + + points = 100 + for star_user_two in star_two_users: + if "score" in leaderboard_users[star_user_two["id"]]: + leaderboard_users[star_user_two["id"]]["score"] += points + else: + leaderboard_users[star_user_two["id"]]["score"] = points + points -= 1 + + # Put completions also in to make building easier later. + for user, user_data in leaderboard_users.items(): + completions_star_one = sum([1 for day in user_data["days"].values() if day["star_one"]]) + completions_star_two = sum([1 for day in user_data["days"].values() if day["star_two"]]) + + leaderboard_users[user]["star_one_completions"] = completions_star_one + leaderboard_users[user]["star_two_completions"] = completions_star_two + + # Finally clear old cache and persist everything to Redis + await self.public_user_data.clear() + [await self.public_user_data.set(user, json.dumps(user_data)) for user, user_data in leaderboard_users.items()] + + async def check_leaderboard(self) -> None: + """Checks should be public leaderboard refreshed and refresh when required.""" + async with self.refresh_lock: + secs = AocConfig.leaderboard_cache_age_threshold_seconds + if self.last_updated is None or self.last_updated < datetime.utcnow() - timedelta(seconds=secs): + await self.refresh_leaderboard() + + async def check_staff_leaderboard(self) -> None: + """Checks should be staff leaderboard refreshed and refresh when required.""" + async with self.staff_refresh_lock: + secs = AocConfig.leaderboard_cache_age_threshold_seconds + if self.staff_last_updated is None or self.staff_last_updated < datetime.utcnow() - timedelta(seconds=secs): + self.staff_last_updated = datetime.utcnow() + self.cached_staff_leaderboard = await AocPrivateLeaderboard.from_url( + AocConfig.leaderboard_staff_id, + Tokens.aoc_staff_session_cookie + ) + + async def get_leaderboard(self, members_amount: int) -> str: + """Generates leaderboard based on Redis data.""" + await self.check_leaderboard() + leaderboard_members = sorted( + [json.loads(data) for user, data in await self.public_user_data.items()], key=lambda k: k["score"] + )[:members_amount] + + stargroup = f"{Emojis.star}, {Emojis.star * 2}" + header = f"{' ' * 3}{'Score'} {'Name':^25} {stargroup:^7}\n{'-' * 44}" + table = "" + for i, member in enumerate(leaderboard_members): + if member["name"] == "Anonymous User": + name = f"{member['name']} #{member['aoc_id']}" + else: + name = member["name"] + + table += ( + f"{i + 1:2}) {member['score']:4} {name:25.25} " + f"({member['star_one_completions']:2}, {member['star_two_completions']:2})\n" + ) + else: + table = f"```{header}\n{table}```" + + return table + @in_month(Month.DECEMBER) @commands.group(name="adventofcode", aliases=("aoc",)) @override_in_channel(AOC_WHITELIST) @@ -316,26 +442,32 @@ class AdventOfCode(commands.Cog): limit will default to this maximum and provide feedback to the user. """ async with ctx.typing(): - await self._check_leaderboard_cache(ctx) - - if not self.cached_private_leaderboard: - # Feedback on issues with leaderboard caching are sent by _check_leaderboard_cache() - # Short circuit here if there's an issue - return - + staff = ctx.channel.id == Channels.advent_of_code_staff number_of_people_to_display = await self._check_n_entries(ctx, number_of_people_to_display) - # Generate leaderboard table for embed - members_to_print = self.cached_private_leaderboard.top_n(number_of_people_to_display) - table = AocPrivateLeaderboard.build_leaderboard_embed(members_to_print) + if staff: + await self.check_staff_leaderboard() + members_to_print = self.cached_staff_leaderboard.top_n(number_of_people_to_display) + table = AocPrivateLeaderboard.build_leaderboard_embed(members_to_print) + else: + table = await self.get_leaderboard(number_of_people_to_display) # Build embed aoc_embed = discord.Embed( - description=f"Total members: {len(self.cached_private_leaderboard.members)}", + description=( + "Total members: " + f"{len(self.cached_staff_leaderboard.members) if staff else await self.public_user_data.length()}" + ), colour=Colours.soft_green, - timestamp=self.cached_private_leaderboard.last_updated + timestamp=self.staff_last_updated if staff else self.last_updated ) - aoc_embed.set_author(name="Advent of Code", url=self.private_leaderboard_url) + if ctx.channel.id == Channels.advent_of_code_staff: + aoc_embed.set_author( + name="Advent of Code", + url=f"{self._base_url}/leaderboard/private/view/{AocConfig.leaderboard_staff_id}" + ) + else: + aoc_embed.set_author(name="Advent of Code") aoc_embed.set_footer(text="Last Updated") await ctx.send( @@ -356,29 +488,54 @@ class AdventOfCode(commands.Cog): Embed will display the total members and the number of users who have completed each day's puzzle """ async with ctx.typing(): - await self._check_leaderboard_cache(ctx) - - if not self.cached_private_leaderboard: - # Feedback on issues with leaderboard caching are sent by _check_leaderboard_cache() - # Short circuit here if there's an issue - return + is_staff = ctx.channel.id == Channels.advent_of_code_staff + if is_staff: + await self.check_staff_leaderboard() + else: + await self.check_leaderboard() # Build ASCII table - total_members = len(self.cached_private_leaderboard.members) + if is_staff: + total_members = len(self.cached_staff_leaderboard.members) + else: + total_members = await self.public_user_data.length() + _star = Emojis.star header = f"{'Day':4}{_star:^8}{_star*2:^4}{'% ' + _star:^8}{'% ' + _star*2:^4}\n{'='*35}" table = "" - for day, completions in enumerate(self.cached_private_leaderboard.daily_completion_summary): - per_one_star = f"{(completions[0]/total_members)*100:.2f}" - per_two_star = f"{(completions[1]/total_members)*100:.2f}" + if is_staff: + for day, completions in enumerate(self.cached_staff_leaderboard.daily_completion_summary): + per_one_star = f"{(completions[0]/total_members)*100:.2f}" + per_two_star = f"{(completions[1]/total_members)*100:.2f}" - table += f"{day+1:3}){completions[0]:^8}{completions[1]:^6}{per_one_star:^10}{per_two_star:^6}\n" + table += f"{day+1:3}){completions[0]:^8}{completions[1]:^6}{per_one_star:^10}{per_two_star:^6}\n" + else: + completions = {} + # Build data for completion rates + for _, user_data in await self.public_user_data.items(): + user_data = json.loads(user_data) + for day, stars in user_data["days"].items(): + day = int(day) + if day not in completions: + completions[day] = [0, 0] + + if stars["star_one"]: + completions[day][0] += 1 + if stars["star_two"]: + completions[day][1] += 1 + + for day, completion in completions.items(): + per_one_star = f"{(completion[0]/total_members)*100:.2f}" + per_two_star = f"{(completion[1] / total_members) * 100:.2f}" + + table += f"{day:3}){completion[0]:^8}{completion[1]:^6}{per_one_star:^10}{per_two_star:^6}\n" table = f"```\n{header}\n{table}```" # Build embed daily_stats_embed = discord.Embed( - colour=Colours.soft_green, timestamp=self.cached_private_leaderboard.last_updated + colour=Colours.soft_green, + timestamp=self.staff_last_updated if is_staff else self.last_updated ) daily_stats_embed.set_author(name="Advent of Code", url=self._base_url) daily_stats_embed.set_footer(text="Last Updated") @@ -402,7 +559,7 @@ class AdventOfCode(commands.Cog): limit will default to this maximum and provide feedback to the user. """ async with ctx.typing(): - await self._check_leaderboard_cache(ctx, global_board=True) + await self._check_leaderboard_cache(ctx) if not self.cached_global_leaderboard: # Feedback on issues with leaderboard caching are sent by _check_leaderboard_cache() @@ -425,41 +582,31 @@ class AdventOfCode(commands.Cog): embed=aoc_embed, ) - async def _check_leaderboard_cache(self, ctx: commands.Context, global_board: bool = False) -> None: + async def _check_leaderboard_cache(self, ctx: commands.Context) -> None: """ Check age of current leaderboard & pull a new one if the board is too old. global_board is a boolean to toggle between the global board and the Pydis private board """ - # Toggle between global & private leaderboards - if global_board: - log.debug("Checking global leaderboard cache") - leaderboard_str = "cached_global_leaderboard" - _shortstr = "global" - else: - log.debug("Checking private leaderboard cache") - leaderboard_str = "cached_private_leaderboard" - _shortstr = "private" - - leaderboard = getattr(self, leaderboard_str) + leaderboard = self.cached_global_leaderboard if not leaderboard: - log.debug(f"No cached {_shortstr} leaderboard found") - await self._boardgetter(global_board) + log.debug("No cached global leaderboard found") + self.cached_global_leaderboard = await AocGlobalLeaderboard.from_url() else: leaderboard_age = datetime.utcnow() - leaderboard.last_updated age_seconds = leaderboard_age.total_seconds() if age_seconds < AocConfig.leaderboard_cache_age_threshold_seconds: - log.debug(f"Cached {_shortstr} leaderboard age less than threshold ({age_seconds} seconds old)") + log.debug(f"Cached global leaderboard age less than threshold ({age_seconds} seconds old)") else: - log.debug(f"Cached {_shortstr} leaderboard age greater than threshold ({age_seconds} seconds old)") - await self._boardgetter(global_board) + log.debug(f"Cached global leaderboard age greater than threshold ({age_seconds} seconds old)") + self.cached_global_leaderboard = await AocGlobalLeaderboard.from_url() - leaderboard = getattr(self, leaderboard_str) + leaderboard = self.cached_global_leaderboard if not leaderboard: await ctx.send( "", embed=_error_embed_helper( - title=f"Something's gone wrong and there's no cached {_shortstr} leaderboard!", + title="Something's gone wrong and there's no cached global leaderboard!", description="Please check in with a staff member.", ), ) @@ -475,8 +622,7 @@ class AdventOfCode(commands.Cog): ) await ctx.send( f":x: {author.mention}, number of entries to display must be a positive " - f"integer less than or equal to {max_entries}\n\n" - f"Head to {self.private_leaderboard_url} to view the entire leaderboard" + f"integer less than or equal to {max_entries}" ) number_of_people_to_display = max_entries @@ -496,13 +642,6 @@ class AdventOfCode(commands.Cog): return about_embed - async def _boardgetter(self, global_board: bool) -> None: - """Invoke the proper leaderboard getter based on the global_board boolean.""" - if global_board: - self.cached_global_leaderboard = await AocGlobalLeaderboard.from_url() - else: - self.cached_private_leaderboard = await AocPrivateLeaderboard.from_url() - def cog_unload(self) -> None: """Cancel season-related tasks on cog unload.""" log.debug("Unloading the cog and canceling the background task.") |