diff options
Diffstat (limited to 'bot')
-rw-r--r-- | bot/exts/christmas/advent_of_code/__init__.py | 10 | ||||
-rw-r--r-- | bot/exts/christmas/advent_of_code/_caches.py | 5 | ||||
-rw-r--r-- | bot/exts/christmas/advent_of_code/_cog.py | 353 | ||||
-rw-r--r-- | bot/exts/christmas/advent_of_code/_helpers.py | 312 |
4 files changed, 680 insertions, 0 deletions
diff --git a/bot/exts/christmas/advent_of_code/__init__.py b/bot/exts/christmas/advent_of_code/__init__.py new file mode 100644 index 00000000..20ac5ab9 --- /dev/null +++ b/bot/exts/christmas/advent_of_code/__init__.py @@ -0,0 +1,10 @@ +from bot.bot import Bot + + +def setup(bot: Bot) -> None: + """Advent of Code Cog load.""" + # Import the Cog at runtime to prevent side effects like defining + # RedisCache instances too early. + from ._cog import AdventOfCode + + bot.add_cog(AdventOfCode(bot)) diff --git a/bot/exts/christmas/advent_of_code/_caches.py b/bot/exts/christmas/advent_of_code/_caches.py new file mode 100644 index 00000000..32d5394f --- /dev/null +++ b/bot/exts/christmas/advent_of_code/_caches.py @@ -0,0 +1,5 @@ +import async_rediscache + +leaderboard_counts = async_rediscache.RedisCache(namespace="AOC_leaderboard_counts") +leaderboard_cache = async_rediscache.RedisCache(namespace="AOC_leaderboard_cache") +assigned_leaderboard = async_rediscache.RedisCache(namespace="AOC_assigned_leaderboard") diff --git a/bot/exts/christmas/advent_of_code/_cog.py b/bot/exts/christmas/advent_of_code/_cog.py new file mode 100644 index 00000000..0df645bd --- /dev/null +++ b/bot/exts/christmas/advent_of_code/_cog.py @@ -0,0 +1,353 @@ +import asyncio +import json +import logging +import math +from datetime import datetime, timedelta +from pathlib import Path +from typing import Tuple + +import discord +from discord.ext import commands +from pytz import timezone + +from bot.bot import Bot +from bot.constants import ( + AdventOfCode as AocConfig, Channels, Colours, Emojis, Month, Roles, WHITELISTED_CHANNELS, +) +from bot.exts.christmas.advent_of_code import _helpers +from bot.utils.decorators import in_month, override_in_channel, with_role + +log = logging.getLogger(__name__) + +AOC_REQUEST_HEADER = {"user-agent": "PythonDiscord AoC Event Bot"} + +EST = timezone("EST") +COUNTDOWN_STEP = 60 * 5 + +AOC_WHITELIST = WHITELISTED_CHANNELS + (Channels.advent_of_code, Channels.advent_of_code_staff) + + +def is_in_advent() -> bool: + """Utility function to check if we are between December 1st and December 25th.""" + # Run the code from the 1st to the 24th + return datetime.now(EST).day in range(1, 25) and datetime.now(EST).month == 12 + + +def time_left_to_aoc_midnight() -> Tuple[datetime, timedelta]: + """Calculates the amount of time left until midnight in UTC-5 (Advent of Code maintainer timezone).""" + # Change all time properties back to 00:00 + todays_midnight = datetime.now(EST).replace( + microsecond=0, + second=0, + minute=0, + hour=0 + ) + + # We want tomorrow so add a day on + tomorrow = todays_midnight + timedelta(days=1) + + # Calculate the timedelta between the current time and midnight + return tomorrow, tomorrow - datetime.now(EST) + + +async def countdown_status(bot: commands.Bot) -> None: + """Set the playing status of the bot to the minutes & hours left until the next day's challenge.""" + while is_in_advent(): + _, time_left = time_left_to_aoc_midnight() + + aligned_seconds = int(math.ceil(time_left.seconds / COUNTDOWN_STEP)) * COUNTDOWN_STEP + hours, minutes = aligned_seconds // 3600, aligned_seconds // 60 % 60 + + if aligned_seconds == 0: + playing = "right now!" + elif aligned_seconds == COUNTDOWN_STEP: + playing = f"in less than {minutes} minutes" + elif hours == 0: + playing = f"in {minutes} minutes" + elif hours == 23: + playing = f"since {60 - minutes} minutes ago" + else: + playing = f"in {hours} hours and {minutes} minutes" + + # Status will look like "Playing in 5 hours and 30 minutes" + await bot.change_presence(activity=discord.Game(playing)) + + # Sleep until next aligned time or a full step if already aligned + delay = time_left.seconds % COUNTDOWN_STEP or COUNTDOWN_STEP + await asyncio.sleep(delay) + + +async def day_countdown(bot: commands.Bot) -> None: + """ + Calculate the number of seconds left until the next day of Advent. + + Once we have calculated this we should then sleep that number and when the time is reached, ping + the Advent of Code role notifying them that the new challenge is ready. + """ + while is_in_advent(): + tomorrow, time_left = time_left_to_aoc_midnight() + + # Prevent bot from being slightly too early in trying to announce today's puzzle + await asyncio.sleep(time_left.seconds + 1) + + channel = bot.get_channel(Channels.advent_of_code) + + if not channel: + log.error("Could not find the AoC channel to send notification in") + break + + aoc_role = channel.guild.get_role(AocConfig.role_id) + if not aoc_role: + log.error("Could not find the AoC role to announce the daily puzzle") + break + + puzzle_url = f"https://adventofcode.com/{AocConfig.year}/day/{tomorrow.day}" + + # Check if the puzzle is already available to prevent our members from spamming + # the puzzle page before it's available by making a small HEAD request. + for retry in range(1, 5): + log.debug(f"Checking if the puzzle is already available (attempt {retry}/4)") + async with bot.http_session.head(puzzle_url, raise_for_status=False) as resp: + if resp.status == 200: + log.debug("Puzzle is available; let's send an announcement message.") + break + log.debug(f"The puzzle is not yet available (status={resp.status})") + await asyncio.sleep(10) + else: + log.error("The puzzle does does not appear to be available at this time, canceling announcement") + break + + await channel.send( + f"{aoc_role.mention} Good morning! Day {tomorrow.day} is ready to be attempted. " + f"View it online now at {puzzle_url}. Good luck!", + allowed_mentions=discord.AllowedMentions( + everyone=False, + users=False, + roles=[discord.Object(AocConfig.role_id)], + ) + ) + + # Wait a couple minutes so that if our sleep didn't sleep enough + # time we don't end up announcing twice. + await asyncio.sleep(120) + + +class AdventOfCode(commands.Cog): + """Advent of Code festivities! Ho Ho Ho!""" + + def __init__(self, bot: Bot) -> None: + self.bot = bot + + self._base_url = f"https://adventofcode.com/{AocConfig.year}" + self.global_leaderboard_url = f"https://adventofcode.com/{AocConfig.year}/leaderboard" + + self.about_aoc_filepath = Path("./bot/resources/advent_of_code/about.json") + self.cached_about_aoc = self._build_about_embed() + + self.countdown_task = None + self.status_task = None + + countdown_coro = day_countdown(self.bot) + self.countdown_task = self.bot.loop.create_task(countdown_coro) + + status_coro = countdown_status(self.bot) + self.status_task = self.bot.loop.create_task(status_coro) + + @in_month(Month.DECEMBER) + @commands.group(name="adventofcode", aliases=("aoc",)) + @override_in_channel(AOC_WHITELIST) + async def adventofcode_group(self, ctx: commands.Context) -> None: + """All of the Advent of Code commands.""" + if not ctx.invoked_subcommand: + await ctx.send_help(ctx.command) + + @adventofcode_group.command( + name="subscribe", + aliases=("sub", "notifications", "notify", "notifs"), + brief="Notifications for new days" + ) + @override_in_channel(AOC_WHITELIST) + async def aoc_subscribe(self, ctx: commands.Context) -> None: + """Assign the role for notifications about new days being ready.""" + role = ctx.guild.get_role(AocConfig.role_id) + unsubscribe_command = f"{ctx.prefix}{ctx.command.root_parent} unsubscribe" + + if role not in ctx.author.roles: + await ctx.author.add_roles(role) + await ctx.send("Okay! You have been __subscribed__ to notifications about new Advent of Code tasks. " + f"You can run `{unsubscribe_command}` to disable them again for you.") + else: + await ctx.send("Hey, you already are receiving notifications about new Advent of Code tasks. " + f"If you don't want them any more, run `{unsubscribe_command}` instead.") + + @adventofcode_group.command(name="unsubscribe", aliases=("unsub",), brief="Notifications for new days") + @override_in_channel(AOC_WHITELIST) + async def aoc_unsubscribe(self, ctx: commands.Context) -> None: + """Remove the role for notifications about new days being ready.""" + role = ctx.guild.get_role(AocConfig.role_id) + + if role in ctx.author.roles: + await ctx.author.remove_roles(role) + await ctx.send("Okay! You have been __unsubscribed__ from notifications about new Advent of Code tasks.") + else: + await ctx.send("Hey, you don't even get any notifications about new Advent of Code tasks currently anyway.") + + @adventofcode_group.command(name="countdown", aliases=("count", "c"), brief="Return time left until next day") + @override_in_channel(AOC_WHITELIST) + async def aoc_countdown(self, ctx: commands.Context) -> None: + """Return time left until next day.""" + if not is_in_advent(): + datetime_now = datetime.now(EST) + + # Calculate the delta to this & next year's December 1st to see which one is closest and not in the past + this_year = datetime(datetime_now.year, 12, 1, tzinfo=EST) + next_year = datetime(datetime_now.year + 1, 12, 1, tzinfo=EST) + deltas = (dec_first - datetime_now for dec_first in (this_year, next_year)) + delta = min(delta for delta in deltas if delta >= timedelta()) # timedelta() gives 0 duration delta + + # Add a finer timedelta if there's less than a day left + if delta.days == 0: + delta_str = f"approximately {delta.seconds // 3600} hours" + else: + delta_str = f"{delta.days} days" + + await ctx.send(f"The Advent of Code event is not currently running. " + f"The next event will start in {delta_str}.") + return + + tomorrow, time_left = time_left_to_aoc_midnight() + + hours, minutes = time_left.seconds // 3600, time_left.seconds // 60 % 60 + + await ctx.send(f"There are {hours} hours and {minutes} minutes left until day {tomorrow.day}.") + + @adventofcode_group.command(name="about", aliases=("ab", "info"), brief="Learn about Advent of Code") + @override_in_channel(AOC_WHITELIST) + async def about_aoc(self, ctx: commands.Context) -> None: + """Respond with an explanation of all things Advent of Code.""" + await ctx.send("", embed=self.cached_about_aoc) + + @adventofcode_group.command(name="join", aliases=("j",), brief="Learn how to join the leaderboard (via DM)") + @override_in_channel(AOC_WHITELIST) + async def join_leaderboard(self, ctx: commands.Context) -> None: + """DM the user the information for joining the PyDis AoC private leaderboard.""" + author = ctx.message.author + log.info(f"{author.name} ({author.id}) has requested a PyDis AoC leaderboard code") + + if AocConfig.staff_leaderboard_id and any(r.id == Roles.helpers for r in author.roles): + join_code = AocConfig.leaderboards[AocConfig.staff_leaderboard_id].join_code + else: + join_code = await _helpers.get_public_join_code(author) + + if not join_code: + log.error(f"Failed to get a join code for user {author} ({author.id})") + error_embed = _error_embed_helper( + title="Unable to get join code", + description="Failed to get a join code to one of our boards. Please notify staff." + ) + await ctx.send(embed=error_embed) + return + + info_str = ( + "Head over to https://adventofcode.com/leaderboard/private " + f"with code `{join_code}` to join the Python Discord leaderboard!" + ) + try: + await author.send(info_str) + except discord.errors.Forbidden: + log.debug(f"{author.name} ({author.id}) has disabled DMs from server members") + await ctx.send(f":x: {author.mention}, please (temporarily) enable DMs to receive the join code") + else: + await ctx.message.add_reaction(Emojis.envelope) + + @adventofcode_group.command( + name="leaderboard", + aliases=("board", "lb"), + brief="Get a snapshot of the PyDis private AoC leaderboard", + ) + @override_in_channel(AOC_WHITELIST) + async def aoc_leaderboard(self, ctx: commands.Context) -> None: + """Get the current top scorers of the Python Discord Leaderboard.""" + async with ctx.typing(): + leaderboard = await _helpers.fetch_leaderboard() + number_of_participants = leaderboard["number_of_participants"] + + top_count = min(AocConfig.leaderboard_displayed_members, number_of_participants) + header = f"Here's our current top {top_count}! {Emojis.christmas_tree * 3}" + + table = f"```\n{leaderboard['top_leaderboard']}\n```" + info_embed = _helpers.get_summary_embed(leaderboard) + + await ctx.send(content=f"{header}\n\n{table}", embed=info_embed) + + @adventofcode_group.command( + name="stats", + aliases=("dailystats", "ds"), + brief="Get daily statistics for the PyDis private leaderboard" + ) + @override_in_channel(AOC_WHITELIST) + async def private_leaderboard_daily_stats(self, ctx: commands.Context) -> None: + """Send an embed with daily completion statistics for the Python Discord leaderboard.""" + leaderboard = await _helpers.fetch_leaderboard() + + # The daily stats are serialized as JSON as they have to be cached in Redis + daily_stats = json.loads(leaderboard["daily_stats"]) + async with ctx.typing(): + lines = ["Day ⭐ ⭐⭐ | %⭐ %⭐⭐\n================================"] + for day, stars in daily_stats.items(): + star_one = stars["star_one"] + star_two = stars["star_two"] + p_star_one = star_one / leaderboard["number_of_participants"] + p_star_two = star_two / leaderboard["number_of_participants"] + lines.append( + f"{day:>2}) {star_one:>4} {star_two:>4} | {p_star_one:>7.2%} {p_star_two:>7.2%}" + ) + table = "\n".join(lines) + info_embed = _helpers.get_summary_embed(leaderboard) + await ctx.send(f"```\n{table}\n```", embed=info_embed) + + @with_role(Roles.admin, Roles.events_lead) + @adventofcode_group.command( + name="refresh", + aliases=("fetch",), + brief="Force a refresh of the leaderboard cache.", + ) + async def refresh_leaderboard(self, ctx: commands.Context) -> None: + """ + Force a refresh of the leaderboard cache. + + Note: This should be used sparingly, as we want to prevent sending too + many requests to the Advent of Code server. + """ + async with ctx.typing(): + await _helpers.fetch_leaderboard(invalidate_cache=True) + await ctx.send("\N{OK Hand Sign} Refreshed leaderboard cache!") + + def cog_unload(self) -> None: + """Cancel season-related tasks on cog unload.""" + log.debug("Unloading the cog and canceling the background task.") + self.countdown_task.cancel() + self.status_task.cancel() + + def _build_about_embed(self) -> discord.Embed: + """Build and return the informational "About AoC" embed from the resources file.""" + with self.about_aoc_filepath.open("r", encoding="utf8") as f: + embed_fields = json.load(f) + + about_embed = discord.Embed( + title=self._base_url, + colour=Colours.soft_green, + url=self._base_url, + timestamp=datetime.utcnow() + ) + about_embed.set_author(name="Advent of Code", url=self._base_url) + for field in embed_fields: + about_embed.add_field(**field) + + about_embed.set_footer(text="Last Updated") + return about_embed + + +def _error_embed_helper(title: str, description: str) -> discord.Embed: + """Return a red-colored Embed with the given title and description.""" + return discord.Embed(title=title, description=description, colour=discord.Colour.red()) diff --git a/bot/exts/christmas/advent_of_code/_helpers.py b/bot/exts/christmas/advent_of_code/_helpers.py new file mode 100644 index 00000000..8b85bf5d --- /dev/null +++ b/bot/exts/christmas/advent_of_code/_helpers.py @@ -0,0 +1,312 @@ +import collections +import datetime +import json +import logging +import operator +import typing + +import aiohttp +import discord + +from bot.constants import AdventOfCode, Colours +from bot.exts.christmas.advent_of_code import _caches + +log = logging.getLogger(__name__) + +PASTE_URL = "https://paste.pythondiscord.com/documents" +RAW_PASTE_URL_TEMPLATE = "https://paste.pythondiscord.com/raw/{key}" + +# Base API URL for Advent of Code Private Leaderboards +AOC_API_URL = "https://adventofcode.com/{year}/leaderboard/private/view/{leaderboard_id}.json" +AOC_REQUEST_HEADER = {"user-agent": "PythonDiscord AoC Event Bot"} + +# Leaderboard Line Template +AOC_TABLE_TEMPLATE = "{rank: >4} | {name:25.25} | {score: >5} | {stars}" +HEADER = AOC_TABLE_TEMPLATE.format(rank="", name="Name", score="Score", stars="⭐, ⭐⭐") +HEADER = f"{HEADER}\n{'-' * (len(HEADER) + 2)}" +HEADER_LINES = len(HEADER.splitlines()) +TOP_LEADERBOARD_LINES = HEADER_LINES + AdventOfCode.leaderboard_displayed_members + +# Keys that need to be set for a cached leaderboard +REQUIRED_CACHE_KEYS = ( + "full_leaderboard", + "top_leaderboard", + "full_leaderboard_url", + "leaderboard_fetched_at", + "number_of_participants", + "daily_stats", +) + +AOC_EMBED_THUMBNAIL = ( + "https://raw.githubusercontent.com/python-discord" + "/branding/master/seasonal/christmas/server_icons/festive_256.gif" +) + +# Create namedtuple that combines a participant's name and their completion +# time for a specific star. We're going to use this later to order the results +# for each star to compute the rank score. +_StarResult = collections.namedtuple("StarResult", "name completion_time") + + +def _parse_raw_leaderboard_data(raw_leaderboard_data: dict) -> dict: + """ + Parse the leaderboard data received from the AoC website. + + The data we receive from AoC is structured by member, not by day/star. This + means that we need to "transpose" the data to a per star structure in order + to calculate the rank scores each individual should get. + + As we need our data both "per participant" as well as "per day", we return + the parsed and analyzed data in both formats. + """ + # We need to get an aggregate of completion times for each star of each day, + # instead of per participant to compute the rank scores. This dictionary will + # provide such a transposed dataset. + star_results = collections.defaultdict(list) + + # As we're already iterating over the participants, we can record the number of + # first stars and second stars they've achieved right here and now. This means + # we won't have to iterate over the participants again later. + leaderboard = {} + + # The data we get from the AoC website is structured by member, not by day/star, + # which means we need to iterate over the members to transpose the data to a per + # star view. We need that per star view to compute rank scores per star. + for member in raw_leaderboard_data.values(): + name = member["name"] if member["name"] else f"Anonymous #{member['id']}" + leaderboard[name] = {"score": 0, "star_1_count": 0, "star_2_count": 0} + + # Iterate over all days for this participant + for day, stars in member["completion_day_level"].items(): + # Iterate over the complete stars for this day for this participant + for star, data in stars.items(): + # Record completion of this star for this individual + leaderboard[name][f"star_{star}_count"] += 1 + + # Record completion datetime for this participant for this day/star + completion_time = datetime.datetime.fromtimestamp(int(data['get_star_ts'])) + star_results[(day, star)].append( + _StarResult(name=name, completion_time=completion_time) + ) + + # Now that we have a transposed dataset that holds the completion time of all + # participants per star, we can compute the rank-based scores each participant + # should get for that star. + max_score = len(leaderboard) + for star in star_results.values(): + for rank, star_result in enumerate(sorted(star, key=operator.itemgetter(1))): + leaderboard[star_result.name]["score"] += max_score - rank + + # Since dictionaries now retain insertion order, let's use that + sorted_leaderboard = dict( + sorted(leaderboard.items(), key=lambda t: t[1]["score"], reverse=True) + ) + + daily_stats = {} + for day in range(1, 26): + star_one = len(star_results.get((day, 1), [])) + star_two = len(star_results.get((day, 1), [])) + daily_stats[day] = {"star_one": star_one, "star_two": star_two} + + return {"daily_stats": daily_stats, "leaderboard": sorted_leaderboard} + + +def _format_leaderboard(leaderboard: typing.Dict[str, int]) -> str: + """Format the leaderboard using the AOC_TABLE_TEMPLATE.""" + leaderboard_lines = [HEADER] + for rank, (name, results) in enumerate(leaderboard.items(), start=1): + leaderboard_lines.append( + AOC_TABLE_TEMPLATE.format( + rank=rank, + name=name, + score=str(results["score"]), + stars=f"({results['star_1_count']}, {results['star_2_count']})" + ) + ) + + return "\n".join(leaderboard_lines) + + +async def _fetch_leaderboard_data() -> typing.Dict[str, typing.Any]: + """Fetch data for all leaderboards and return a pooled result.""" + year = AdventOfCode.year + + # We'll make our requests one at a time to not flood the AoC website with + # up to six simultaneous requests. This may take a little longer, but it + # does avoid putting unnecessary stress on the Advent of Code website. + + # Container to store the raw data of each leaderboard + participants = {} + for leaderboard in AdventOfCode.leaderboards.values(): + leaderboard_url = AOC_API_URL.format(year=year, leaderboard_id=leaderboard.id) + cookies = {"session": leaderboard.session} + + # We don't need to create a session if we're going to throw it away after each request + async with aiohttp.request( + "GET", leaderboard_url, headers=AOC_REQUEST_HEADER, cookies=cookies + ) as resp: + if resp.status == 200: + raw_data = await resp.json() + + # Get the participants and store their current count + board_participants = raw_data["members"] + await _caches.leaderboard_counts.set(leaderboard.id, len(board_participants)) + participants.update(board_participants) + else: + log.warning(f"Fetching data failed for leaderboard `{leaderboard.id}`") + resp.raise_for_status() + + log.info(f"Fetched leaderboard information for {len(participants)} participants") + return participants + + +async def _upload_leaderboard(leaderboard: str) -> str: + """Upload the full leaderboard to our paste service and return the URL.""" + async with aiohttp.request("POST", PASTE_URL, data=leaderboard) as resp: + try: + resp_json = await resp.json() + except Exception: + log.exception("Failed to upload full leaderboard to paste service") + return "" + + if "key" in resp_json: + return RAW_PASTE_URL_TEMPLATE.format(key=resp_json["key"]) + + log.error(f"Unexpected response from paste service while uploading leaderboard {resp_json}") + return "" + + +def _get_top_leaderboard(full_leaderboard: str) -> str: + """Get the leaderboard up to the maximum specified entries.""" + return "\n".join(full_leaderboard.splitlines()[:TOP_LEADERBOARD_LINES]) + + +@_caches.leaderboard_cache.atomic_transaction +async def fetch_leaderboard(invalidate_cache: bool = False) -> dict: + """ + Get the current Python Discord combined leaderboard. + + The leaderboard is cached and only fetched from the API if the current data + is older than the lifetime set in the constants. To prevent multiple calls + to this function fetching new leaderboard information in case of a cache + miss, this function is locked to one call at a time using a decorator. + """ + cached_leaderboard = await _caches.leaderboard_cache.to_dict() + + # Check if the cached leaderboard contains everything we expect it to. If it + # does not, this probably means the cache has not been created yet or has + # expired in Redis. This check also accounts for a malformed cache. + if invalidate_cache or any(key not in cached_leaderboard for key in REQUIRED_CACHE_KEYS): + log.info("No leaderboard cache available, fetching leaderboards...") + # Fetch the raw data + raw_leaderboard_data = await _fetch_leaderboard_data() + + # Parse it to extract "per star, per day" data and participant scores + parsed_leaderboard_data = _parse_raw_leaderboard_data(raw_leaderboard_data) + + leaderboard = parsed_leaderboard_data["leaderboard"] + number_of_participants = len(leaderboard) + formatted_leaderboard = _format_leaderboard(leaderboard) + full_leaderboard_url = await _upload_leaderboard(formatted_leaderboard) + leaderboard_fetched_at = datetime.datetime.utcnow().isoformat() + + cached_leaderboard = { + "full_leaderboard": formatted_leaderboard, + "top_leaderboard": _get_top_leaderboard(formatted_leaderboard), + "full_leaderboard_url": full_leaderboard_url, + "leaderboard_fetched_at": leaderboard_fetched_at, + "number_of_participants": number_of_participants, + "daily_stats": json.dumps(parsed_leaderboard_data["daily_stats"]), + } + + # Store the new values in Redis + await _caches.leaderboard_cache.update(cached_leaderboard) + + # Set an expiry on the leaderboard RedisCache + with await _caches.leaderboard_cache._get_pool_connection() as connection: + await connection.expire( + _caches.leaderboard_cache.namespace, + AdventOfCode.leaderboard_cache_expiry_seconds + ) + + return cached_leaderboard + + +def get_summary_embed(leaderboard: dict) -> discord.Embed: + """Get an embed with the current summary stats of the leaderboard.""" + leaderboard_url = leaderboard['full_leaderboard_url'] + + aoc_embed = discord.Embed( + colour=Colours.soft_green, + timestamp=datetime.datetime.fromisoformat(leaderboard["leaderboard_fetched_at"]), + ) + aoc_embed.add_field( + name="Number of Participants", + value=leaderboard["number_of_participants"], + inline=True, + ) + if leaderboard_url: + aoc_embed.add_field( + name="Full Leaderboard", + value=f"[Python Discord Leaderboard]({leaderboard_url})", + inline=True, + ) + aoc_embed.set_author(name="Advent of Code", url=leaderboard_url) + aoc_embed.set_footer(text="Last Updated") + aoc_embed.set_thumbnail(url=AOC_EMBED_THUMBNAIL) + + return aoc_embed + + +async def get_public_join_code(author: discord.Member) -> typing.Optional[str]: + """ + Get the join code for one of the non-staff leaderboards. + + If a user has previously requested a join code and their assigned board + hasn't filled up yet, we'll return the same join code to prevent them from + getting join codes for multiple boards. + """ + # Make sure to fetch new leaderboard information if the cache is older than + # 30 minutes. While this still means that there could be a discrepancy + # between the current leaderboard state and the numbers we have here, this + # should work fairly well given the buffer of slots that we have. + await fetch_leaderboard() + previously_assigned_board = await _caches.assigned_leaderboard.get(author.id) + current_board_counts = await _caches.leaderboard_counts.to_dict() + + # Remove the staff board from the current board counts as it should be ignored. + current_board_counts.pop(AdventOfCode.staff_leaderboard_id, None) + + # If this user has already received a join code, we'll give them the + # exact same one to prevent them from joining multiple boards and taking + # up multiple slots. + if previously_assigned_board: + # Check if their previously assigned board still has room for them + if current_board_counts.get(previously_assigned_board, 0) < 200: + log.info(f"{author} ({author.id}) was already assigned to a board with open slots.") + return AdventOfCode.leaderboards[previously_assigned_board].join_code + + log.info( + f"User {author} ({author.id}) previously received the join code for " + f"board `{previously_assigned_board}`, but that board's now full. " + "Assigning another board to this user." + ) + + # If we don't have the current board counts cached, let's force fetching a new cache + if not current_board_counts: + log.warning("Leaderboard counts were missing from the cache unexpectedly!") + await fetch_leaderboard(invalidate_cache=True) + current_board_counts = await _caches.leaderboard_counts.to_dict() + + # Find the board with the current lowest participant count. As we can't + best_board, _count = min(current_board_counts.items(), key=operator.itemgetter(1)) + + if current_board_counts.get(best_board, 0) >= 200: + log.warning(f"User {author} `{author.id}` requested a join code, but all boards are full!") + return + + log.info(f"Assigning user {author} ({author.id}) to board `{best_board}`") + await _caches.assigned_leaderboard.set(author.id, best_board) + + # Return the join code for this board + return AdventOfCode.leaderboards[best_board].join_code |