diff options
Diffstat (limited to 'bot/seasons/christmas/adventofcode.py')
-rw-r--r-- | bot/seasons/christmas/adventofcode.py | 622 |
1 files changed, 622 insertions, 0 deletions
diff --git a/bot/seasons/christmas/adventofcode.py b/bot/seasons/christmas/adventofcode.py new file mode 100644 index 00000000..4c766703 --- /dev/null +++ b/bot/seasons/christmas/adventofcode.py @@ -0,0 +1,622 @@ +import asyncio +import json +import logging +import re +from datetime import datetime, timedelta +from pathlib import Path +from typing import List + +import aiohttp +import discord +from bs4 import BeautifulSoup +from discord.ext import commands +from pytz import timezone + +from bot.constants import AdventOfCode as AocConfig +from bot.constants import Colours, Emojis, Tokens + +log = logging.getLogger(__name__) + +AOC_REQUEST_HEADER = {"user-agent": "PythonDiscord AoC Event Bot"} +AOC_SESSION_COOKIE = {"session": Tokens.aoc_session_cookie} + +EST = timezone("EST") + + +def is_in_advent() -> bool: + """ + Utility function to check if we are between December 1st + and December 25th. + """ + return datetime.now(EST).day in range(1, 26) and datetime.now(EST).month == 12 + + +def time_left_to_aoc_midnight() -> timedelta: + """ + This calculates the amount of time left until midnight in + UTC-5 (Advent of Code maintainer timezone). + """ + # Change all time properties back to 00:00 + todays_midnight = datetime.now(EST).replace(microsecond=0, + second=0, + minute=0, + hour=0) + + # We want tomorrow so add a day on + tomorrow = todays_midnight + timedelta(days=1) + + # Calculate the timedelta between the current time and midnight + return tomorrow, tomorrow - datetime.now(EST) + + +async def countdown_status(bot: commands.Bot): + """ + Every 2 minutes set the playing status of the bot to + the number of minutes & hours left until the next day + release. + """ + while is_in_advent(): + _, time_left = time_left_to_aoc_midnight() + + hours, minutes = time_left.seconds // 3600, time_left.seconds // 60 % 60 + + if hours == 0: + game = discord.Game(f"in {minutes} minutes") + else: + game = discord.Game(f"in {hours} hours and {minutes} minutes") + + # Status will look like "Playing in 5 hours and 30 minutes" + await bot.change_presence(activity=game) + + # Sleep 2 minutes + await asyncio.sleep(120) + + +async def day_countdown(bot: commands.Bot): + """ + Calculate the number of seconds left until the next day of advent. Once + we have calculated this we should then sleep that number and when the time + is reached ping the advent of code role notifying them that the new task is + ready. + """ + while is_in_advent(): + tomorrow, time_left = time_left_to_aoc_midnight() + + await asyncio.sleep(time_left.seconds) + + channel = bot.get_channel(AocConfig.channel_id) + + if not channel: + log.error("Could not find the AoC channel to send notification in") + break + + await channel.send(f"<@&{AocConfig.role_id}> Good morning! Day {tomorrow.day} is ready to be attempted. " + f"View it online now at https://adventofcode.com/{AocConfig.year}/day/{tomorrow.day}" + f" (this link could take a few minutes to start working). Good luck!") + + # Wait a couple minutes so that if our sleep didn't sleep enough + # time we don't end up announcing twice. + await asyncio.sleep(120) + + +class AdventOfCode: + def __init__(self, bot: commands.Bot): + self.bot = bot + + self._base_url = f"https://adventofcode.com/{AocConfig.year}" + self.global_leaderboard_url = f"https://adventofcode.com/{AocConfig.year}/leaderboard" + self.private_leaderboard_url = f"{self._base_url}/leaderboard/private/view/{AocConfig.leaderboard_id}" + + self.about_aoc_filepath = Path("./bot/resources/advent_of_code/about.json") + self.cached_about_aoc = self._build_about_embed() + + self.cached_global_leaderboard = None + self.cached_private_leaderboard = None + + self.countdown_task = None + self.status_task = None + + countdown_coro = day_countdown(self.bot) + self.countdown_task = asyncio.ensure_future(self.bot.loop.create_task(countdown_coro)) + + status_coro = countdown_status(self.bot) + self.status_task = asyncio.ensure_future(self.bot.loop.create_task(status_coro)) + + @commands.group(name="adventofcode", aliases=("aoc",), invoke_without_command=True) + async def adventofcode_group(self, ctx: commands.Context): + """ + Advent of Code festivities! Ho Ho Ho! + """ + + await ctx.invoke(self.bot.get_command("help"), "adventofcode") + + @adventofcode_group.command(name="notifications", aliases=("notify", "notifs"), brief="Notifications for new days") + async def aoc_notifications(self, ctx: commands.Context): + """ + Assign the role for notifications about new days being ready. + + Call the same command again to end notifications and remove the role. + """ + role = ctx.guild.get_role(AocConfig.role_id) + + if role in ctx.author.roles: + await ctx.author.remove_roles(role) + await ctx.send("Okay! You have been unsubscribed from notifications. If in future you want to" + " resubscribe just run this command again.") + else: + await ctx.author.add_roles(role) + await ctx.send("Okay! You have been subscribed to notifications about new Advent of Code tasks." + " To unsubscribe in future run the same command again.") + + @adventofcode_group.command(name="countdown", aliases=("count", "c"), brief="Return time left until next day") + async def aoc_countdown(self, ctx: commands.Context): + """ + Return time left until next day + """ + tomorrow, time_left = time_left_to_aoc_midnight() + + hours, minutes = time_left.seconds // 3600, time_left.seconds // 60 % 60 + + await ctx.send(f"There are {hours} hours and {minutes} minutes left until day {tomorrow.day}.") + + @adventofcode_group.command(name="about", aliases=("ab", "info"), brief="Learn about Advent of Code") + async def about_aoc(self, ctx: commands.Context): + """ + Respond with an explanation of all things Advent of Code + """ + + await ctx.send("", embed=self.cached_about_aoc) + + @adventofcode_group.command(name="join", aliases=("j",), brief="Learn how to join PyDis' private AoC leaderboard") + async def join_leaderboard(self, ctx: commands.Context): + """ + Retrieve the link to join the PyDis AoC private leaderboard + """ + + info_str = ( + "Head over to https://adventofcode.com/leaderboard/private " + f"with code `{AocConfig.leaderboard_join_code}` to join the PyDis private leaderboard!" + ) + await ctx.send(info_str) + + @adventofcode_group.command( + name="leaderboard", + aliases=("board", "stats", "lb"), + brief="Get a snapshot of the PyDis private AoC leaderboard", + ) + async def aoc_leaderboard(self, ctx: commands.Context, number_of_people_to_display: int = 10): + """ + Pull the top number_of_people_to_display members from the PyDis leaderboard and post an embed + + For readability, number_of_people_to_display defaults to 10. A maximum value is configured in the + Advent of Code section of the bot constants. number_of_people_to_display values greater than this + limit will default to this maximum and provide feedback to the user. + """ + + async with ctx.typing(): + await self._check_leaderboard_cache(ctx) + + if not self.cached_private_leaderboard: + # Feedback on issues with leaderboard caching are sent by _check_leaderboard_cache() + # Short circuit here if there's an issue + return + + number_of_people_to_display = await self._check_n_entries(ctx, number_of_people_to_display) + + # Generate leaderboard table for embed + members_to_print = self.cached_private_leaderboard.top_n(number_of_people_to_display) + table = AocPrivateLeaderboard.build_leaderboard_embed(members_to_print) + + # Build embed + aoc_embed = discord.Embed( + description=f"Total members: {len(self.cached_private_leaderboard.members)}", + colour=Colours.soft_green, + timestamp=self.cached_private_leaderboard.last_updated + ) + aoc_embed.set_author(name="Advent of Code", url=self.private_leaderboard_url) + aoc_embed.set_footer(text="Last Updated") + + await ctx.send( + content=f"Here's the current Top {number_of_people_to_display}! {Emojis.christmas_tree*3}\n\n{table}", + embed=aoc_embed, + ) + + @adventofcode_group.command( + name="global", + aliases=("globalstats", "globalboard", "gb"), + brief="Get a snapshot of the global AoC leaderboard", + ) + async def global_leaderboard(self, ctx: commands.Context, number_of_people_to_display: int = 10): + """ + Pull the top number_of_people_to_display members from the global AoC leaderboard and post an embed + + For readability, number_of_people_to_display defaults to 10. A maximum value is configured in the + Advent of Code section of the bot constants. number_of_people_to_display values greater than this + limit will default to this maximum and provide feedback to the user. + """ + + async with ctx.typing(): + await self._check_leaderboard_cache(ctx, global_board=True) + + if not self.cached_global_leaderboard: + # Feedback on issues with leaderboard caching are sent by _check_leaderboard_cache() + # Short circuit here if there's an issue + return + + number_of_people_to_display = await self._check_n_entries(ctx, number_of_people_to_display) + + # Generate leaderboard table for embed + members_to_print = self.cached_global_leaderboard.top_n(number_of_people_to_display) + table = AocGlobalLeaderboard.build_leaderboard_embed(members_to_print) + + # Build embed + aoc_embed = discord.Embed(colour=Colours.soft_green, timestamp=self.cached_global_leaderboard.last_updated) + aoc_embed.set_author(name="Advent of Code", url=self._base_url) + aoc_embed.set_footer(text="Last Updated") + + await ctx.send( + content=f"Here's the current global Top {number_of_people_to_display}! {Emojis.christmas_tree*3}\n\n{table}", # noqa + embed=aoc_embed, + ) + + async def _check_leaderboard_cache(self, ctx, global_board: bool = False): + """ + Check age of current leaderboard & pull a new one if the board is too old + + global_board is a boolean to toggle between the global board and the Pydis private board + """ + + # Toggle between global & private leaderboards + if global_board: + log.debug("Checking global leaderboard cache") + leaderboard_str = "cached_global_leaderboard" + _shortstr = "global" + else: + log.debug("Checking private leaderboard cache") + leaderboard_str = "cached_private_leaderboard" + _shortstr = "private" + + leaderboard = getattr(self, leaderboard_str) + if not leaderboard: + log.debug(f"No cached {_shortstr} leaderboard found") + await self._boardgetter(global_board) + else: + leaderboard_age = datetime.utcnow() - leaderboard.last_updated + age_seconds = leaderboard_age.total_seconds() + if age_seconds < AocConfig.leaderboard_cache_age_threshold_seconds: + log.debug(f"Cached {_shortstr} leaderboard age less than threshold ({age_seconds} seconds old)") + else: + log.debug(f"Cached {_shortstr} leaderboard age greater than threshold ({age_seconds} seconds old)") + await self._boardgetter(global_board) + + leaderboard = getattr(self, leaderboard_str) + if not leaderboard: + await ctx.send( + "", + embed=_error_embed_helper( + title=f"Something's gone wrong and there's no cached {_shortstr} leaderboard!", + description="Please check in with a staff member.", + ), + ) + + async def _check_n_entries(self, ctx: commands.Context, number_of_people_to_display: int) -> int: + # Check for n > max_entries and n <= 0 + max_entries = AocConfig.leaderboard_max_displayed_members + author = ctx.message.author + if not 0 <= number_of_people_to_display <= max_entries: + log.debug( + f"{author.name} ({author.id}) attempted to fetch an invalid number " + f" of entries from the AoC leaderboard ({number_of_people_to_display})" + ) + await ctx.send( + f":x: {author.mention}, number of entries to display must be a positive " + f"integer less than or equal to {max_entries}\n\n" + f"Head to {self.private_leaderboard_url} to view the entire leaderboard" + ) + number_of_people_to_display = max_entries + + return number_of_people_to_display + + def _build_about_embed(self) -> discord.Embed: + """ + Build and return the informational "About AoC" embed from the resources file + """ + + with self.about_aoc_filepath.open("r") as f: + embed_fields = json.load(f) + + about_embed = discord.Embed(title=self._base_url, colour=Colours.soft_green, url=self._base_url) + about_embed.set_author(name="Advent of Code", url=self._base_url) + for field in embed_fields: + about_embed.add_field(**field) + + about_embed.set_footer(text=f"Last Updated (UTC): {datetime.utcnow()}") + + return about_embed + + async def _boardgetter(self, global_board: bool): + """ + Invoke the proper leaderboard getter based on the global_board boolean + """ + if global_board: + self.cached_global_leaderboard = await AocGlobalLeaderboard.from_url() + else: + self.cached_private_leaderboard = await AocPrivateLeaderboard.from_url() + + +class AocMember: + def __init__(self, name: str, aoc_id: int, stars: int, starboard: list, local_score: int, global_score: int): + self.name = name + self.aoc_id = aoc_id + self.stars = stars + self.starboard = starboard + self.local_score = local_score + self.global_score = global_score + self.completions = self._completions_from_starboard(self.starboard) + + def __repr__(self): + return f"<{self.name} ({self.aoc_id}): {self.local_score}>" + + @classmethod + def member_from_json(cls, injson: dict) -> "AocMember": + """ + Generate an AocMember from AoC's private leaderboard API JSON + + injson is expected to be the dict contained in: + + AoC_APIjson['members'][<member id>:str] + + Returns an AocMember object + """ + + return cls( + name=injson["name"] if injson["name"] else "Anonymous User", + aoc_id=int(injson["id"]), + stars=injson["stars"], + starboard=cls._starboard_from_json(injson["completion_day_level"]), + local_score=injson["local_score"], + global_score=injson["global_score"], + ) + + @staticmethod + def _starboard_from_json(injson: dict) -> list: + """ + Generate starboard from AoC's private leaderboard API JSON + + injson is expected to be the dict contained in: + + AoC_APIjson['members'][<member id>:str]['completion_day_level'] + + Returns a list of 25 lists, where each nested list contains a pair of booleans representing + the code challenge completion status for that day + """ + + # Basic input validation + if not isinstance(injson, dict): + raise ValueError + + # Initialize starboard + starboard = [] + for _i in range(25): + starboard.append([False, False]) + + # Iterate over days, which are the keys of injson (as str) + for day in injson: + idx = int(day) - 1 + # If there is a second star, the first star must be completed + if "2" in injson[day].keys(): + starboard[idx] = [True, True] + # If the day exists in injson, then at least the first star is completed + else: + starboard[idx] = [True, False] + + return starboard + + @staticmethod + def _completions_from_starboard(starboard: list) -> tuple: + """ + Return days completed, as a (1 star, 2 star) tuple, from starboard + """ + + completions = [0, 0] + for day in starboard: + if day[0]: + completions[0] += 1 + if day[1]: + completions[1] += 1 + + return tuple(completions) + + +class AocPrivateLeaderboard: + def __init__(self, members: list, owner_id: int, event_year: int): + self.members = members + self._owner_id = owner_id + self._event_year = event_year + self.last_updated = datetime.utcnow() + + def top_n(self, n: int = 10) -> dict: + """ + Return the top n participants on the leaderboard. + + If n is not specified, default to the top 10 + """ + + return self.members[:n] + + @staticmethod + async def json_from_url( + leaderboard_id: int = AocConfig.leaderboard_id, year: int = AocConfig.year + ) -> "AocPrivateLeaderboard": + """ + Request the API JSON from Advent of Code for leaderboard_id for the specified year's event + + If no year is input, year defaults to the current year + """ + + api_url = f"https://adventofcode.com/{year}/leaderboard/private/view/{leaderboard_id}.json" + + log.debug("Querying Advent of Code Private Leaderboard API") + async with aiohttp.ClientSession(cookies=AOC_SESSION_COOKIE, headers=AOC_REQUEST_HEADER) as session: + async with session.get(api_url) as resp: + if resp.status == 200: + raw_dict = await resp.json() + else: + log.warning(f"Bad response received from AoC ({resp.status}), check session cookie") + resp.raise_for_status() + + return raw_dict + + @classmethod + def from_json(cls, injson: dict) -> "AocPrivateLeaderboard": + """ + Generate an AocPrivateLeaderboard object from AoC's private leaderboard API JSON + """ + + return cls( + members=cls._sorted_members(injson["members"]), owner_id=injson["owner_id"], event_year=injson["event"] + ) + + @classmethod + async def from_url(cls) -> "AocPrivateLeaderboard": + """ + Helper wrapping of AocPrivateLeaderboard.json_from_url and AocPrivateLeaderboard.from_json + """ + + api_json = await cls.json_from_url() + return cls.from_json(api_json) + + @staticmethod + def _sorted_members(injson: dict) -> list: + """ + Generate a sorted list of AocMember objects from AoC's private leaderboard API JSON + + Output list is sorted based on the AocMember.local_score + """ + + members = [AocMember.member_from_json(injson[member]) for member in injson] + members.sort(key=lambda x: x.local_score, reverse=True) + + return members + + @staticmethod + def build_leaderboard_embed(members_to_print: List[AocMember]) -> str: + """ + Build a text table from members_to_print, a list of AocMember objects + + Returns a string to be used as the content of the bot's leaderboard response + """ + + stargroup = f"{Emojis.star}, {Emojis.star*2}" + header = f"{' '*3}{'Score'} {'Name':^25} {stargroup:^7}\n{'-'*44}" + table = "" + for i, member in enumerate(members_to_print): + if member.name == "Anonymous User": + name = f"{member.name} #{member.aoc_id}" + else: + name = member.name + + table += ( + f"{i+1:2}) {member.local_score:4} {name:25.25} " + f"({member.completions[0]:2}, {member.completions[1]:2})\n" + ) + else: + table = f"```{header}\n{table}```" + + return table + + +class AocGlobalLeaderboard: + def __init__(self, members: List[tuple]): + self.members = members + self.last_updated = datetime.utcnow() + + def top_n(self, n: int = 10) -> dict: + """ + Return the top n participants on the leaderboard. + + If n is not specified, default to the top 10 + """ + + return self.members[:n] + + @classmethod + async def from_url(cls) -> "AocGlobalLeaderboard": + """ + Generate an list of tuples for the entries on AoC's global leaderboard + + Because there is no API for this, web scraping needs to be used + """ + + aoc_url = f"https://adventofcode.com/{AocConfig.year}/leaderboard" + + async with aiohttp.ClientSession(headers=AOC_REQUEST_HEADER) as session: + async with session.get(aoc_url) as resp: + if resp.status == 200: + raw_html = await resp.text() + else: + log.warning(f"Bad response received from AoC ({resp.status}), check session cookie") + resp.raise_for_status() + + soup = BeautifulSoup(raw_html, "html.parser") + ele = soup.find_all("div", class_="leaderboard-entry") + + exp = r"(?:[ ]{,2}(\d+)\))?[ ]+(\d+)\s+([\w\(\)#\-\d ]+)" + + lb_list = [] + for entry in ele: + # Strip off the AoC++ decorator + raw_str = entry.text.replace("(AoC++)", "").rstrip() + + # Use a regex to extract the info from the string to unify formatting + # Group 1: Rank + # Group 2: Global Score + # Group 3: Member string + r = re.match(exp, raw_str) + + rank = int(r.group(1)) if r.group(1) else None + global_score = int(r.group(2)) + + member = r.group(3) + if member.lower().startswith("(anonymous"): + # Normalize anonymous user string by stripping () and title casing + member = re.sub(r"[\(\)]", "", member).title() + + lb_list.append((rank, global_score, member)) + + return cls(lb_list) + + @staticmethod + def build_leaderboard_embed(members_to_print: List[tuple]) -> str: + """ + Build a text table from members_to_print, a list of tuples + + Returns a string to be used as the content of the bot's leaderboard response + """ + + header = f"{' '*4}{'Score'} {'Name':^25}\n{'-'*36}" + table = "" + for member in members_to_print: + # In the event of a tie, rank is None + if member[0]: + rank = f"{member[0]:3})" + else: + rank = f"{' ':4}" + table += f"{rank} {member[1]:4} {member[2]:25.25}\n" + else: + table = f"```{header}\n{table}```" + + return table + + +def _error_embed_helper(title: str, description: str) -> discord.Embed: + """ + Return a red-colored Embed with the given title and description + """ + + return discord.Embed(title=title, description=description, colour=discord.Colour.red()) + + +def setup(bot: commands.Bot) -> None: + bot.add_cog(AdventOfCode(bot)) + log.info("Cog loaded: adventofcode") |