diff options
Diffstat (limited to 'bot/exts/events')
| -rw-r--r-- | bot/exts/events/advent_of_code/_cog.py | 342 | ||||
| -rw-r--r-- | bot/exts/events/advent_of_code/_helpers.py | 87 | ||||
| -rw-r--r-- | bot/exts/events/advent_of_code/views/dayandstarview.py | 82 | ||||
| -rw-r--r-- | bot/exts/events/hacktoberfest/hacktober-issue-finder.py | 11 | ||||
| -rw-r--r-- | bot/exts/events/trivianight/__init__.py | 0 | ||||
| -rw-r--r-- | bot/exts/events/trivianight/_game.py | 192 | ||||
| -rw-r--r-- | bot/exts/events/trivianight/_questions.py | 179 | ||||
| -rw-r--r-- | bot/exts/events/trivianight/_scoreboard.py | 186 | ||||
| -rw-r--r-- | bot/exts/events/trivianight/trivianight.py | 328 |
9 files changed, 1312 insertions, 95 deletions
diff --git a/bot/exts/events/advent_of_code/_cog.py b/bot/exts/events/advent_of_code/_cog.py index ca60e517..518841d4 100644 --- a/bot/exts/events/advent_of_code/_cog.py +++ b/bot/exts/events/advent_of_code/_cog.py @@ -2,17 +2,22 @@ import json import logging from datetime import datetime, timedelta from pathlib import Path +from typing import Optional import arrow import discord -from discord.ext import commands +from async_rediscache import RedisCache +from discord.ext import commands, tasks from bot.bot import Bot from bot.constants import ( - AdventOfCode as AocConfig, Channels, Colours, Emojis, Month, Roles, WHITELISTED_CHANNELS, + AdventOfCode as AocConfig, Channels, Client, Colours, Emojis, Month, PYTHON_PREFIX, Roles, WHITELISTED_CHANNELS ) from bot.exts.events.advent_of_code import _helpers +from bot.exts.events.advent_of_code.views.dayandstarview import AoCDropdownView +from bot.utils import members from bot.utils.decorators import InChannelCheckFailure, in_month, whitelist_override, with_role +from bot.utils.exceptions import MovedCommandError from bot.utils.extensions import invoke_help_command log = logging.getLogger(__name__) @@ -29,6 +34,14 @@ AOC_WHITELIST = AOC_WHITELIST_RESTRICTED + (Channels.advent_of_code,) class AdventOfCode(commands.Cog): """Advent of Code festivities! Ho Ho Ho!""" + # Redis Cache for linking Discord IDs to Advent of Code usernames + # RedisCache[member_id: aoc_username_string] + account_links = RedisCache() + + # A dict with keys of member_ids to block from getting the role + # RedisCache[member_id: None] + completionist_block_list = RedisCache() + def __init__(self, bot: Bot): self.bot = bot @@ -48,6 +61,62 @@ class AdventOfCode(commands.Cog): self.status_task.set_name("AoC Status Countdown") self.status_task.add_done_callback(_helpers.background_task_callback) + # Don't start task while event isn't running + # self.completionist_task.start() + + @tasks.loop(minutes=10.0) + async def completionist_task(self) -> None: + """ + Give members who have completed all 50 AoC stars the completionist role. + + Runs on a schedule, as defined in the task.loop decorator. + """ + await self.bot.wait_until_guild_available() + guild = self.bot.get_guild(Client.guild) + completionist_role = guild.get_role(Roles.aoc_completionist) + if completionist_role is None: + log.warning("Could not find the AoC completionist role; cancelling completionist task.") + self.completionist_task.cancel() + return + + aoc_name_to_member_id = { + aoc_name: member_id + for member_id, aoc_name in await self.account_links.items() + } + + try: + leaderboard = await _helpers.fetch_leaderboard() + except _helpers.FetchingLeaderboardFailedError: + await self.bot.send_log("Unable to fetch AoC leaderboard during role sync.") + return + + placement_leaderboard = json.loads(leaderboard["placement_leaderboard"]) + + for member_aoc_info in placement_leaderboard.values(): + if not member_aoc_info["stars"] == 50: + # Only give the role to people who have completed all 50 stars + continue + + aoc_name = member_aoc_info["name"] or f"Anonymous #{member_aoc_info['id']}" + + member_id = aoc_name_to_member_id.get(aoc_name) + if not member_id: + log.debug(f"Could not find member_id for {member_aoc_info['name']}, not giving role.") + continue + + member = await members.get_or_fetch_member(guild, member_id) + if member is None: + log.debug(f"Could not find {member_id}, not giving role.") + continue + + if completionist_role in member.roles: + log.debug(f"{member.name} ({member.mention}) already has the completionist role.") + continue + + if not await self.completionist_block_list.contains(member_id): + log.debug(f"Giving completionist role to {member.name} ({member.mention}).") + await members.handle_role_change(member, member.add_roles, completionist_role) + @commands.group(name="adventofcode", aliases=("aoc",)) @whitelist_override(channels=AOC_WHITELIST) async def adventofcode_group(self, ctx: commands.Context) -> None: @@ -55,77 +124,59 @@ class AdventOfCode(commands.Cog): if not ctx.invoked_subcommand: await invoke_help_command(ctx) + @with_role(Roles.admins) @adventofcode_group.command( - name="subscribe", - aliases=("sub", "notifications", "notify", "notifs"), - brief="Notifications for new days" + name="block", + brief="Block a user from getting the completionist role.", ) - @whitelist_override(channels=AOC_WHITELIST) - async def aoc_subscribe(self, ctx: commands.Context) -> None: - """Assign the role for notifications about new days being ready.""" - current_year = datetime.now().year - if current_year != AocConfig.year: - await ctx.send(f"You can't subscribe to {current_year}'s Advent of Code announcements yet!") - return + async def block_from_role(self, ctx: commands.Context, member: discord.Member) -> None: + """Block the given member from receiving the AoC completionist role, removing it from them if needed.""" + completionist_role = ctx.guild.get_role(Roles.aoc_completionist) + if completionist_role in member.roles: + await member.remove_roles(completionist_role) - role = ctx.guild.get_role(AocConfig.role_id) - unsubscribe_command = f"{ctx.prefix}{ctx.command.root_parent} unsubscribe" + await self.completionist_block_list.set(member.id, "sentinel") + await ctx.send(f":+1: Blocked {member.mention} from getting the AoC completionist role.") - if role not in ctx.author.roles: - await ctx.author.add_roles(role) - await ctx.send( - "Okay! You have been __subscribed__ to notifications about new Advent of Code tasks. " - f"You can run `{unsubscribe_command}` to disable them again for you." - ) - else: - await ctx.send( - "Hey, you already are receiving notifications about new Advent of Code tasks. " - f"If you don't want them any more, run `{unsubscribe_command}` instead." - ) - - @in_month(Month.DECEMBER) - @adventofcode_group.command(name="unsubscribe", aliases=("unsub",), brief="Notifications for new days") + @commands.guild_only() + @adventofcode_group.command( + name="subscribe", + aliases=("sub", "notifications", "notify", "notifs", "unsubscribe", "unsub"), + help=f"NOTE: This command has been moved to {PYTHON_PREFIX}subscribe", + ) @whitelist_override(channels=AOC_WHITELIST) - async def aoc_unsubscribe(self, ctx: commands.Context) -> None: - """Remove the role for notifications about new days being ready.""" - role = ctx.guild.get_role(AocConfig.role_id) + async def aoc_subscribe(self, ctx: commands.Context) -> None: + """ + Deprecated role command. - if role in ctx.author.roles: - await ctx.author.remove_roles(role) - await ctx.send("Okay! You have been __unsubscribed__ from notifications about new Advent of Code tasks.") - else: - await ctx.send("Hey, you don't even get any notifications about new Advent of Code tasks currently anyway.") + This command has been moved to bot, and will be removed in the future. + """ + raise MovedCommandError(f"{PYTHON_PREFIX}subscribe") @adventofcode_group.command(name="countdown", aliases=("count", "c"), brief="Return time left until next day") @whitelist_override(channels=AOC_WHITELIST) async def aoc_countdown(self, ctx: commands.Context) -> None: """Return time left until next day.""" - if not _helpers.is_in_advent(): - datetime_now = arrow.now(_helpers.EST) - - # Calculate the delta to this & next year's December 1st to see which one is closest and not in the past - this_year = arrow.get(datetime(datetime_now.year, 12, 1), _helpers.EST) - next_year = arrow.get(datetime(datetime_now.year + 1, 12, 1), _helpers.EST) - deltas = (dec_first - datetime_now for dec_first in (this_year, next_year)) - delta = min(delta for delta in deltas if delta >= timedelta()) # timedelta() gives 0 duration delta - - # Add a finer timedelta if there's less than a day left - if delta.days == 0: - delta_str = f"approximately {delta.seconds // 3600} hours" - else: - delta_str = f"{delta.days} days" + if _helpers.is_in_advent(): + tomorrow, _ = _helpers.time_left_to_est_midnight() + next_day_timestamp = int(tomorrow.timestamp()) - await ctx.send( - "The Advent of Code event is not currently running. " - f"The next event will start in {delta_str}." - ) + await ctx.send(f"Day {tomorrow.day} starts <t:{next_day_timestamp}:R>.") return - tomorrow, time_left = _helpers.time_left_to_est_midnight() + datetime_now = arrow.now(_helpers.EST) + # Calculate the delta to this & next year's December 1st to see which one is closest and not in the past + this_year = arrow.get(datetime(datetime_now.year, 12, 1), _helpers.EST) + next_year = arrow.get(datetime(datetime_now.year + 1, 12, 1), _helpers.EST) + deltas = (dec_first - datetime_now for dec_first in (this_year, next_year)) + delta = min(delta for delta in deltas if delta >= timedelta()) # timedelta() gives 0 duration delta - hours, minutes = time_left.seconds // 3600, time_left.seconds // 60 % 60 + next_aoc_timestamp = int((datetime_now + delta).timestamp()) - await ctx.send(f"There are {hours} hours and {minutes} minutes left until day {tomorrow.day}.") + await ctx.send( + "The Advent of Code event is not currently running. " + f"The next event will start <t:{next_aoc_timestamp}:R>." + ) @adventofcode_group.command(name="about", aliases=("ab", "info"), brief="Learn about Advent of Code") @whitelist_override(channels=AOC_WHITELIST) @@ -133,13 +184,19 @@ class AdventOfCode(commands.Cog): """Respond with an explanation of all things Advent of Code.""" await ctx.send(embed=self.cached_about_aoc) + @commands.guild_only() @adventofcode_group.command(name="join", aliases=("j",), brief="Learn how to join the leaderboard (via DM)") @whitelist_override(channels=AOC_WHITELIST) async def join_leaderboard(self, ctx: commands.Context) -> None: """DM the user the information for joining the Python Discord leaderboard.""" - current_year = datetime.now().year - if current_year != AocConfig.year: - await ctx.send(f"The Python Discord leaderboard for {current_year} is not yet available!") + current_date = datetime.now() + allowed_months = (Month.NOVEMBER.value, Month.DECEMBER.value) + if not ( + current_date.month in allowed_months and current_date.year == AocConfig.year or + current_date.month == Month.JANUARY.value and current_date.year == AocConfig.year + 1 + ): + # Only allow joining the leaderboard in the run up to AOC and the January following. + await ctx.send(f"The Python Discord leaderboard for {current_date.year} is not yet available!") return author = ctx.author @@ -150,7 +207,7 @@ class AdventOfCode(commands.Cog): else: try: join_code = await _helpers.get_public_join_code(author) - except _helpers.FetchingLeaderboardFailed: + except _helpers.FetchingLeaderboardFailedError: await ctx.send(":x: Failed to get join code! Notified maintainers.") return @@ -178,33 +235,163 @@ class AdventOfCode(commands.Cog): else: await ctx.message.add_reaction(Emojis.envelope) - @in_month(Month.DECEMBER) + @in_month(Month.NOVEMBER, Month.DECEMBER, Month.JANUARY) + @adventofcode_group.command( + name="link", + aliases=("connect",), + brief="Tie your Discord account with your Advent of Code name." + ) + @whitelist_override(channels=AOC_WHITELIST) + async def aoc_link_account(self, ctx: commands.Context, *, aoc_name: str = None) -> None: + """ + Link your Discord Account to your Advent of Code name. + + Stored in a Redis Cache with the format of `Discord ID: Advent of Code Name` + """ + cache_items = await self.account_links.items() + cache_aoc_names = [value for _, value in cache_items] + + if aoc_name: + # Let's check the current values in the cache to make sure it isn't already tied to a different account + if aoc_name == await self.account_links.get(ctx.author.id): + await ctx.reply(f"{aoc_name} is already tied to your account.") + return + elif aoc_name in cache_aoc_names: + log.info( + f"{ctx.author} ({ctx.author.id}) tried to connect their account to {aoc_name}," + " but it's already connected to another user." + ) + await ctx.reply( + f"{aoc_name} is already tied to another account." + " Please contact an admin if you believe this is an error." + ) + return + + # Update an existing link + if old_aoc_name := await self.account_links.get(ctx.author.id): + log.info(f"Changing link for {ctx.author} ({ctx.author.id}) from {old_aoc_name} to {aoc_name}.") + await self.account_links.set(ctx.author.id, aoc_name) + await ctx.reply(f"Your linked account has been changed to {aoc_name}.") + else: + # Create a new link + log.info(f"Linking {ctx.author} ({ctx.author.id}) to account {aoc_name}.") + await self.account_links.set(ctx.author.id, aoc_name) + await ctx.reply(f"You have linked your Discord ID to {aoc_name}.") + else: + # User has not supplied a name, let's check if they're in the cache or not + if cache_name := await self.account_links.get(ctx.author.id): + await ctx.reply(f"You have already linked an Advent of Code account: {cache_name}.") + else: + await ctx.reply( + "You have not linked an Advent of Code account." + " Please re-run the command with one specified." + ) + + @in_month(Month.NOVEMBER, Month.DECEMBER, Month.JANUARY) + @adventofcode_group.command( + name="unlink", + aliases=("disconnect",), + brief="Tie your Discord account with your Advent of Code name." + ) + @whitelist_override(channels=AOC_WHITELIST) + async def aoc_unlink_account(self, ctx: commands.Context) -> None: + """ + Unlink your Discord ID with your Advent of Code leaderboard name. + + Deletes the entry that was Stored in the Redis cache. + """ + if aoc_cache_name := await self.account_links.get(ctx.author.id): + log.info(f"Unlinking {ctx.author} ({ctx.author.id}) from Advent of Code account {aoc_cache_name}") + await self.account_links.delete(ctx.author.id) + await ctx.reply(f"We have removed the link between your Discord ID and {aoc_cache_name}.") + else: + log.info(f"Attempted to unlink {ctx.author} ({ctx.author.id}), but no link was found.") + await ctx.reply("You don't have an Advent of Code account linked.") + + @in_month(Month.DECEMBER, Month.JANUARY) + @adventofcode_group.command( + name="dayandstar", + aliases=("daynstar", "daystar"), + brief="Get a view that lets you filter the leaderboard by day and star", + ) + @whitelist_override(channels=AOC_WHITELIST_RESTRICTED) + async def aoc_day_and_star_leaderboard( + self, + ctx: commands.Context, + maximum_scorers_day_and_star: Optional[int] = 10 + ) -> None: + """Have the bot send a View that will let you filter the leaderboard by day and star.""" + if maximum_scorers_day_and_star > AocConfig.max_day_and_star_results or maximum_scorers_day_and_star <= 0: + raise commands.BadArgument( + f"The maximum number of results you can query is {AocConfig.max_day_and_star_results}" + ) + async with ctx.typing(): + try: + leaderboard = await _helpers.fetch_leaderboard() + except _helpers.FetchingLeaderboardFailedError: + await ctx.send(":x: Unable to fetch leaderboard!") + return + # This is a dictionary that contains solvers in respect of day, and star. + # e.g. 1-1 means the solvers of the first star of the first day and their completion time + per_day_and_star = json.loads(leaderboard['leaderboard_per_day_and_star']) + view = AoCDropdownView( + day_and_star_data=per_day_and_star, + maximum_scorers=maximum_scorers_day_and_star, + original_author=ctx.author + ) + message = await ctx.send( + content="Please select a day and a star to filter by!", + view=view + ) + await view.wait() + await message.edit(view=None) + + @in_month(Month.DECEMBER, Month.JANUARY) @adventofcode_group.command( name="leaderboard", aliases=("board", "lb"), brief="Get a snapshot of the PyDis private AoC leaderboard", ) @whitelist_override(channels=AOC_WHITELIST_RESTRICTED) - async def aoc_leaderboard(self, ctx: commands.Context) -> None: - """Get the current top scorers of the Python Discord Leaderboard.""" + async def aoc_leaderboard(self, ctx: commands.Context, *, aoc_name: Optional[str] = None) -> None: + """ + Get the current top scorers of the Python Discord Leaderboard. + + Additionally you can specify an `aoc_name` that will append the + specified profile's personal stats to the top of the leaderboard + """ + # Strip quotes from the AoC username if needed (e.g. "My Name" -> My Name) + # This is to keep compatibility with those already used to wrapping the AoC name in quotes + # Note: only strips one layer of quotes to allow names with quotes at the start and end + # e.g. ""My Name"" -> "My Name" + if aoc_name and aoc_name.startswith('"') and aoc_name.endswith('"'): + aoc_name = aoc_name[1:-1] + + # Check if an advent of code account is linked in the Redis Cache if aoc_name is not given + if (aoc_cache_name := await self.account_links.get(ctx.author.id)) and aoc_name is None: + aoc_name = aoc_cache_name + async with ctx.typing(): try: - leaderboard = await _helpers.fetch_leaderboard() - except _helpers.FetchingLeaderboardFailed: + leaderboard = await _helpers.fetch_leaderboard(self_placement_name=aoc_name) + except _helpers.FetchingLeaderboardFailedError: await ctx.send(":x: Unable to fetch leaderboard!") return - number_of_participants = leaderboard["number_of_participants"] + number_of_participants = leaderboard["number_of_participants"] - top_count = min(AocConfig.leaderboard_displayed_members, number_of_participants) - header = f"Here's our current top {top_count}! {Emojis.christmas_tree * 3}" - - table = f"```\n{leaderboard['top_leaderboard']}\n```" - info_embed = _helpers.get_summary_embed(leaderboard) + top_count = min(AocConfig.leaderboard_displayed_members, number_of_participants) + self_placement_header = " (and your personal stats compared to the top 10)" if aoc_name else "" + header = f"Here's our current top {top_count}{self_placement_header}! {Emojis.christmas_tree * 3}" + table = "```\n" \ + f"{leaderboard['placement_leaderboard'] if aoc_name else leaderboard['top_leaderboard']}" \ + "\n```" + info_embed = _helpers.get_summary_embed(leaderboard) - await ctx.send(content=f"{header}\n\n{table}", embed=info_embed) + await ctx.send(content=f"{header}\n\n{table}", embed=info_embed) + return - @in_month(Month.DECEMBER) + @in_month(Month.DECEMBER, Month.JANUARY) @adventofcode_group.command( name="global", aliases=("globalboard", "gb"), @@ -231,7 +418,7 @@ class AdventOfCode(commands.Cog): """Send an embed with daily completion statistics for the Python Discord leaderboard.""" try: leaderboard = await _helpers.fetch_leaderboard() - except _helpers.FetchingLeaderboardFailed: + except _helpers.FetchingLeaderboardFailedError: await ctx.send(":x: Can't fetch leaderboard for stats right now!") return @@ -251,7 +438,7 @@ class AdventOfCode(commands.Cog): info_embed = _helpers.get_summary_embed(leaderboard) await ctx.send(f"```\n{table}\n```", embed=info_embed) - @with_role(Roles.admin) + @with_role(Roles.admins) @adventofcode_group.command( name="refresh", aliases=("fetch",), @@ -267,7 +454,7 @@ class AdventOfCode(commands.Cog): async with ctx.typing(): try: await _helpers.fetch_leaderboard(invalidate_cache=True) - except _helpers.FetchingLeaderboardFailed: + except _helpers.FetchingLeaderboardFailedError: await ctx.send(":x: Something went wrong while trying to refresh the cache!") else: await ctx.send("\N{OK Hand Sign} Refreshed leaderboard cache!") @@ -277,6 +464,7 @@ class AdventOfCode(commands.Cog): log.debug("Unloading the cog and canceling the background task.") self.notification_task.cancel() self.status_task.cancel() + self.completionist_task.cancel() def _build_about_embed(self) -> discord.Embed: """Build and return the informational "About AoC" embed from the resources file.""" diff --git a/bot/exts/events/advent_of_code/_helpers.py b/bot/exts/events/advent_of_code/_helpers.py index 5fedb60f..6c004901 100644 --- a/bot/exts/events/advent_of_code/_helpers.py +++ b/bot/exts/events/advent_of_code/_helpers.py @@ -10,6 +10,7 @@ from typing import Any, Optional import aiohttp import arrow import discord +from discord.ext import commands from bot.bot import Bot from bot.constants import AdventOfCode, Channels, Colours @@ -70,6 +71,33 @@ class FetchingLeaderboardFailedError(Exception): """Raised when one or more leaderboards could not be fetched at all.""" +def _format_leaderboard_line(rank: int, data: dict[str, Any], *, is_author: bool) -> str: + """ + Build a string representing a line of the leaderboard. + + Parameters: + rank: + Rank in the leaderboard of this entry. + + data: + Mapping with entry information. + + Keyword arguments: + is_author: + Whether to address the name displayed in the returned line + personally. + + Returns: + A formatted line for the leaderboard. + """ + return AOC_TABLE_TEMPLATE.format( + rank=rank, + name=data['name'] if not is_author else f"(You) {data['name']}", + score=str(data['score']), + stars=f"({data['star_1']}, {data['star_2']})" + ) + + def leaderboard_sorting_function(entry: tuple[str, dict]) -> tuple[int, int]: """ Provide a sorting value for our leaderboard. @@ -105,6 +133,7 @@ def _parse_raw_leaderboard_data(raw_leaderboard_data: dict) -> dict: # The data we get from the AoC website is structured by member, not by day/star, # which means we need to iterate over the members to transpose the data to a per # star view. We need that per star view to compute rank scores per star. + per_day_star_stats = collections.defaultdict(list) for member in raw_leaderboard_data.values(): name = member["name"] if member["name"] else f"Anonymous #{member['id']}" member_id = member["id"] @@ -122,6 +151,11 @@ def _parse_raw_leaderboard_data(raw_leaderboard_data: dict) -> dict: star_results[(day, star)].append( StarResult(member_id=member_id, completion_time=completion_time) ) + per_day_star_stats[f"{day}-{star}"].append( + {'completion_time': int(data["get_star_ts"]), 'member_name': name} + ) + for key in per_day_star_stats: + per_day_star_stats[key] = sorted(per_day_star_stats[key], key=operator.itemgetter('completion_time')) # Now that we have a transposed dataset that holds the completion time of all # participants per star, we can compute the rank-based scores each participant @@ -151,13 +185,26 @@ def _parse_raw_leaderboard_data(raw_leaderboard_data: dict) -> dict: # this data to JSON in order to cache it in Redis. daily_stats[day] = {"star_one": star_one, "star_two": star_two} - return {"daily_stats": daily_stats, "leaderboard": sorted_leaderboard} + return {"daily_stats": daily_stats, "leaderboard": sorted_leaderboard, 'per_day_and_star': per_day_star_stats} -def _format_leaderboard(leaderboard: dict[str, dict]) -> str: +def _format_leaderboard(leaderboard: dict[str, dict], self_placement_name: str = None) -> str: """Format the leaderboard using the AOC_TABLE_TEMPLATE.""" leaderboard_lines = [HEADER] + self_placement_exists = False for rank, data in enumerate(leaderboard.values(), start=1): + if self_placement_name and data["name"].lower() == self_placement_name.lower(): + leaderboard_lines.insert( + 1, + AOC_TABLE_TEMPLATE.format( + rank=rank, + name=f"(You) {data['name']}", + score=str(data["score"]), + stars=f"({data['star_1']}, {data['star_2']})" + ) + ) + self_placement_exists = True + continue leaderboard_lines.append( AOC_TABLE_TEMPLATE.format( rank=rank, @@ -166,7 +213,13 @@ def _format_leaderboard(leaderboard: dict[str, dict]) -> str: stars=f"({data['star_1']}, {data['star_2']})" ) ) - + if self_placement_name and not self_placement_exists: + raise commands.BadArgument( + "Sorry, your profile does not exist in this leaderboard." + "\n\n" + "To join our leaderboard, run the command `.aoc join`." + " If you've joined recently, please wait up to 30 minutes for our leaderboard to refresh." + ) return "\n".join(leaderboard_lines) @@ -202,7 +255,7 @@ async def _fetch_leaderboard_data() -> dict[str, Any]: # Two attempts, one with the original session cookie and one with the fallback session for attempt in range(1, 3): - log.info(f"Attempting to fetch leaderboard `{leaderboard.id}` ({attempt}/2)") + log.debug(f"Attempting to fetch leaderboard `{leaderboard.id}` ({attempt}/2)") cookies = {"session": leaderboard.session} try: raw_data = await _leaderboard_request(leaderboard_url, leaderboard.id, cookies) @@ -254,7 +307,7 @@ def _get_top_leaderboard(full_leaderboard: str) -> str: @_caches.leaderboard_cache.atomic_transaction -async def fetch_leaderboard(invalidate_cache: bool = False) -> dict: +async def fetch_leaderboard(invalidate_cache: bool = False, self_placement_name: str = None) -> dict: """ Get the current Python Discord combined leaderboard. @@ -264,7 +317,6 @@ async def fetch_leaderboard(invalidate_cache: bool = False) -> dict: miss, this function is locked to one call at a time using a decorator. """ cached_leaderboard = await _caches.leaderboard_cache.to_dict() - # Check if the cached leaderboard contains everything we expect it to. If it # does not, this probably means the cache has not been created yet or has # expired in Redis. This check also accounts for a malformed cache. @@ -280,15 +332,17 @@ async def fetch_leaderboard(invalidate_cache: bool = False) -> dict: number_of_participants = len(leaderboard) formatted_leaderboard = _format_leaderboard(leaderboard) full_leaderboard_url = await _upload_leaderboard(formatted_leaderboard) - leaderboard_fetched_at = datetime.datetime.utcnow().isoformat() + leaderboard_fetched_at = datetime.datetime.now(datetime.timezone.utc).isoformat() cached_leaderboard = { + "placement_leaderboard": json.dumps(raw_leaderboard_data), "full_leaderboard": formatted_leaderboard, "top_leaderboard": _get_top_leaderboard(formatted_leaderboard), "full_leaderboard_url": full_leaderboard_url, "leaderboard_fetched_at": leaderboard_fetched_at, "number_of_participants": number_of_participants, "daily_stats": json.dumps(parsed_leaderboard_data["daily_stats"]), + "leaderboard_per_day_and_star": json.dumps(parsed_leaderboard_data["per_day_and_star"]) } # Store the new values in Redis @@ -300,7 +354,13 @@ async def fetch_leaderboard(invalidate_cache: bool = False) -> dict: _caches.leaderboard_cache.namespace, AdventOfCode.leaderboard_cache_expiry_seconds ) - + if self_placement_name: + formatted_placement_leaderboard = _parse_raw_leaderboard_data( + json.loads(cached_leaderboard["placement_leaderboard"]) + )["leaderboard"] + cached_leaderboard["placement_leaderboard"] = _get_top_leaderboard( + _format_leaderboard(formatted_placement_leaderboard, self_placement_name=self_placement_name) + ) return cached_leaderboard @@ -308,11 +368,13 @@ def get_summary_embed(leaderboard: dict) -> discord.Embed: """Get an embed with the current summary stats of the leaderboard.""" leaderboard_url = leaderboard["full_leaderboard_url"] refresh_minutes = AdventOfCode.leaderboard_cache_expiry_seconds // 60 + refreshed_unix = int(datetime.datetime.fromisoformat(leaderboard["leaderboard_fetched_at"]).timestamp()) + + aoc_embed = discord.Embed(colour=Colours.soft_green) - aoc_embed = discord.Embed( - colour=Colours.soft_green, - timestamp=datetime.datetime.fromisoformat(leaderboard["leaderboard_fetched_at"]), - description=f"*The leaderboard is refreshed every {refresh_minutes} minutes.*" + aoc_embed.description = ( + f"The leaderboard is refreshed every {refresh_minutes} minutes.\n" + f"Last Updated: <t:{refreshed_unix}:t>" ) aoc_embed.add_field( name="Number of Participants", @@ -326,7 +388,6 @@ def get_summary_embed(leaderboard: dict) -> discord.Embed: inline=True, ) aoc_embed.set_author(name="Advent of Code", url=leaderboard_url) - aoc_embed.set_footer(text="Last Updated") aoc_embed.set_thumbnail(url=AOC_EMBED_THUMBNAIL) return aoc_embed diff --git a/bot/exts/events/advent_of_code/views/dayandstarview.py b/bot/exts/events/advent_of_code/views/dayandstarview.py new file mode 100644 index 00000000..5529c12b --- /dev/null +++ b/bot/exts/events/advent_of_code/views/dayandstarview.py @@ -0,0 +1,82 @@ +from datetime import datetime + +import discord + +AOC_DAY_AND_STAR_TEMPLATE = "{rank: >4} | {name:25.25} | {completion_time: >10}" + + +class AoCDropdownView(discord.ui.View): + """Interactive view to filter AoC stats by Day and Star.""" + + def __init__(self, original_author: discord.Member, day_and_star_data: dict[str: dict], maximum_scorers: int): + super().__init__() + self.day = 0 + self.star = 0 + self.data = day_and_star_data + self.maximum_scorers = maximum_scorers + self.original_author = original_author + + def generate_output(self) -> str: + """ + Generates a formatted codeblock with AoC statistics based on the currently selected day and star. + + Optionally, when the requested day and star data does not exist yet it returns an error message. + """ + header = AOC_DAY_AND_STAR_TEMPLATE.format( + rank="Rank", + name="Name", completion_time="Completion time (UTC)" + ) + lines = [f"{header}\n{'-' * (len(header) + 2)}"] + if not (day_and_star_data := self.data.get(f"{self.day}-{self.star}")): + return ":x: The requested data for the specified day and star does not exist yet." + for rank, scorer in enumerate(day_and_star_data[:self.maximum_scorers]): + time_data = datetime.fromtimestamp(scorer['completion_time']).strftime("%I:%M:%S %p") + lines.append(AOC_DAY_AND_STAR_TEMPLATE.format( + datastamp="", + rank=rank + 1, + name=scorer['member_name'], + completion_time=time_data) + ) + joined_lines = "\n".join(lines) + return f"Statistics for Day: {self.day}, Star: {self.star}.\n ```\n{joined_lines}\n```" + + async def interaction_check(self, interaction: discord.Interaction) -> bool: + """Global check to ensure that the interacting user is the user who invoked the command originally.""" + if interaction.user != self.original_author: + await interaction.response.send_message( + ":x: You can't interact with someone else's response. Please run the command yourself!", + ephemeral=True + ) + return False + return True + + @discord.ui.select( + placeholder="Day", + options=[discord.SelectOption(label=str(i)) for i in range(1, 26)], + custom_id="day_select" + ) + async def day_select(self, select: discord.ui.Select, interaction: discord.Interaction) -> None: + """Dropdown to choose a Day of the AoC.""" + self.day = select.values[0] + + @discord.ui.select( + placeholder="Star", + options=[discord.SelectOption(label=str(i)) for i in range(1, 3)], + custom_id="star_select" + ) + async def star_select(self, select: discord.ui.Select, interaction: discord.Interaction) -> None: + """Dropdown to choose either the first or the second star.""" + self.star = select.values[0] + + @discord.ui.button(label="Fetch", style=discord.ButtonStyle.blurple) + async def fetch(self, button: discord.ui.Button, interaction: discord.Interaction) -> None: + """Button that fetches the statistics based on the dropdown values.""" + if self.day == 0 or self.star == 0: + await interaction.response.send_message( + "You have to select a value from both of the dropdowns!", + ephemeral=True + ) + else: + await interaction.response.edit_message(content=self.generate_output()) + self.day = 0 + self.star = 0 diff --git a/bot/exts/events/hacktoberfest/hacktober-issue-finder.py b/bot/exts/events/hacktoberfest/hacktober-issue-finder.py index e3053851..1774564b 100644 --- a/bot/exts/events/hacktoberfest/hacktober-issue-finder.py +++ b/bot/exts/events/hacktoberfest/hacktober-issue-finder.py @@ -52,10 +52,10 @@ class HacktoberIssues(commands.Cog): async def get_issues(self, ctx: commands.Context, option: str) -> Optional[dict]: """Get a list of the python issues with the label 'hacktoberfest' from the Github api.""" if option == "beginner": - if (ctx.message.created_at - self.cache_timer_beginner).seconds <= 60: + if (ctx.message.created_at.replace(tzinfo=None) - self.cache_timer_beginner).seconds <= 60: log.debug("using cache") return self.cache_beginner - elif (ctx.message.created_at - self.cache_timer_normal).seconds <= 60: + elif (ctx.message.created_at.replace(tzinfo=None) - self.cache_timer_normal).seconds <= 60: log.debug("using cache") return self.cache_normal @@ -88,10 +88,10 @@ class HacktoberIssues(commands.Cog): if option == "beginner": self.cache_beginner = data - self.cache_timer_beginner = ctx.message.created_at + self.cache_timer_beginner = ctx.message.created_at.replace(tzinfo=None) else: self.cache_normal = data - self.cache_timer_normal = ctx.message.created_at + self.cache_timer_normal = ctx.message.created_at.replace(tzinfo=None) return data @@ -100,7 +100,8 @@ class HacktoberIssues(commands.Cog): """Format the issue data into a embed.""" title = issue["title"] issue_url = issue["url"].replace("api.", "").replace("/repos/", "/") - body = issue["body"] + # issues can have empty bodies, which in that case GitHub doesn't include the key in the API response + body = issue.get("body", "") labels = [label["name"] for label in issue["labels"]] embed = discord.Embed(title=title) diff --git a/bot/exts/events/trivianight/__init__.py b/bot/exts/events/trivianight/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/bot/exts/events/trivianight/__init__.py diff --git a/bot/exts/events/trivianight/_game.py b/bot/exts/events/trivianight/_game.py new file mode 100644 index 00000000..8b012a17 --- /dev/null +++ b/bot/exts/events/trivianight/_game.py @@ -0,0 +1,192 @@ +import time +from random import randrange +from string import ascii_uppercase +from typing import Iterable, NamedTuple, Optional, TypedDict + +DEFAULT_QUESTION_POINTS = 10 +DEFAULT_QUESTION_TIME = 20 + + +class QuestionData(TypedDict): + """Representing the different 'keys' of the question taken from the JSON.""" + + number: str + description: str + answers: list[str] + correct: str + points: Optional[int] + time: Optional[int] + + +class UserGuess(NamedTuple): + """Represents the user's guess for a question.""" + + answer: str + editable: bool + elapsed: float + + +class QuestionClosed(RuntimeError): + """Exception raised when the question is not open for guesses anymore.""" + + +class AlreadyUpdated(RuntimeError): + """Exception raised when the user has already updated their guess once.""" + + +class AllQuestionsVisited(RuntimeError): + """Exception raised when all of the questions have been visited.""" + + +class Question: + """Interface for one question in a trivia night game.""" + + def __init__(self, data: QuestionData): + self._data = data + self._guesses: dict[int, UserGuess] = {} + self._started = None + + # These properties are mostly proxies to the underlying data: + + @property + def number(self) -> str: + """The number of the question.""" + return self._data["number"] + + @property + def description(self) -> str: + """The description of the question.""" + return self._data["description"] + + @property + def answers(self) -> list[tuple[str, str]]: + """ + The possible answers for this answer. + + This is a property that returns a list of letter, answer pairs. + """ + return [(ascii_uppercase[i], q) for (i, q) in enumerate(self._data["answers"])] + + @property + def correct(self) -> str: + """The correct answer for this question.""" + return self._data["correct"] + + @property + def max_points(self) -> int: + """The maximum points that can be awarded for this question.""" + return self._data.get("points") or DEFAULT_QUESTION_POINTS + + @property + def time(self) -> float: + """The time allowed to answer the question.""" + return self._data.get("time") or DEFAULT_QUESTION_TIME + + def start(self) -> float: + """Start the question and return the time it started.""" + self._started = time.perf_counter() + return self._started + + def _update_guess(self, user: int, answer: str) -> UserGuess: + """Update an already existing guess.""" + if self._started is None: + raise QuestionClosed("Question is not open for answers.") + + if self._guesses[user][1] is False: + raise AlreadyUpdated(f"User({user}) has already updated their guess once.") + + self._guesses[user] = (answer, False, time.perf_counter() - self._started) + return self._guesses[user] + + def guess(self, user: int, answer: str) -> UserGuess: + """Add a guess made by a user to the current question.""" + if user in self._guesses: + return self._update_guess(user, answer) + + if self._started is None: + raise QuestionClosed("Question is not open for answers.") + + self._guesses[user] = (answer, True, time.perf_counter() - self._started) + return self._guesses[user] + + def stop(self) -> dict[int, UserGuess]: + """Stop the question and return the guesses that were made.""" + guesses = self._guesses + + self._started = None + self._guesses = {} + + return guesses + + +class TriviaNightGame: + """Interface for managing a game of trivia night.""" + + def __init__(self, data: list[QuestionData]) -> None: + self._questions = [Question(q) for q in data] + # A copy of the questions to keep for `.trivianight list` + self._all_questions = list(self._questions) + self.current_question: Optional[Question] = None + self._points = {} + self._speed = {} + + def __iter__(self) -> Iterable[Question]: + return iter(self._questions) + + def next_question(self, number: str = None) -> Question: + """ + Consume one random question from the trivia night game. + + One question is randomly picked from the list of questions which is then removed and returned. + """ + if self.current_question is not None: + raise RuntimeError("Cannot call next_question() when there is a current question.") + + if number is not None: + try: + question = [q for q in self._all_questions if q.number == int(number)][0] + except IndexError: + raise ValueError(f"Question number {number} does not exist.") + elif len(self._questions) == 0: + raise AllQuestionsVisited("All of the questions have been visited.") + else: + question = self._questions.pop(randrange(len(self._questions))) + + self.current_question = question + return question + + def end_question(self) -> None: + """ + End the current question. + + This method should be called when the question has been answered, it must be called before + attempting to call `next_question()` again. + """ + if self.current_question is None: + raise RuntimeError("Cannot call end_question() when there is no current question.") + + self.current_question.stop() + self.current_question = None + + def list_questions(self) -> str: + """ + List all the questions. + + This method should be called when `.trivianight list` is called to display the following information: + - Question number + - Question description + - Visited/not visited + """ + question_list = [] + + visited = ":white_check_mark:" + not_visited = ":x:" + + for question in self._all_questions: + formatted_string = ( + f"**Q{question.number}** {not_visited if question in self._questions else visited}" + f"\n{question.description}\n\n" + ) + question_list.append(formatted_string.rstrip()) + + return question_list diff --git a/bot/exts/events/trivianight/_questions.py b/bot/exts/events/trivianight/_questions.py new file mode 100644 index 00000000..d6beced9 --- /dev/null +++ b/bot/exts/events/trivianight/_questions.py @@ -0,0 +1,179 @@ +from random import choice +from string import ascii_uppercase + +import discord +from discord import Embed, Interaction +from discord.ui import Button, View + +from bot.constants import Colours, NEGATIVE_REPLIES + +from ._game import AlreadyUpdated, Question, QuestionClosed +from ._scoreboard import Scoreboard + + +class AnswerButton(Button): + """Button subclass that's used to guess on a particular answer.""" + + def __init__(self, label: str, question: Question): + super().__init__(label=label, style=discord.ButtonStyle.green) + + self.question = question + + async def callback(self, interaction: Interaction) -> None: + """ + When a user interacts with the button, this will be called. + + Parameters: + - interaction: an instance of discord.Interaction representing the interaction between the user and the + button. + """ + try: + guess = self.question.guess(interaction.user.id, self.label) + except AlreadyUpdated: + await interaction.response.send_message( + embed=Embed( + title=choice(NEGATIVE_REPLIES), + description="You've already changed your answer more than once!", + color=Colours.soft_red + ), + ephemeral=True + ) + return + except QuestionClosed: + await interaction.response.send_message( + embed=Embed( + title=choice(NEGATIVE_REPLIES), + description="The question is no longer accepting guesses!", + color=Colours.soft_red + ), + ephemeral=True + ) + return + + if guess[1]: + await interaction.response.send_message( + embed=Embed( + title="Confirming that...", + description=f"You chose answer {self.label}.", + color=Colours.soft_green + ), + ephemeral=True + ) + else: + # guess[1] is False and they cannot change their answer again. Which + # indicates that they changed it this time around. + await interaction.response.send_message( + embed=Embed( + title="Confirming that...", + description=f"You changed your answer to answer {self.label}.", + color=Colours.soft_green + ), + ephemeral=True + ) + + +class QuestionView(View): + """View for one trivia night question.""" + + def __init__(self, question: Question) -> None: + super().__init__() + self.question = question + + for letter, _ in self.question.answers: + self.add_item(AnswerButton(letter, self.question)) + + @staticmethod + def unicodeify(text: str) -> str: + """ + Takes `text` and adds zero-width spaces to prevent copy and pasting the question. + + Parameters: + - text: A string that represents the question description to 'unicodeify' + """ + return "".join( + f"{letter}\u200b" if letter not in ('\n', '\t', '`', 'p', 'y') else letter + for idx, letter in enumerate(text) + ) + + def create_embed(self) -> Embed: + """Helper function to create the embed for the current question.""" + question_embed = Embed( + title=f"Question {self.question.number}", + description=self.unicodeify(self.question.description), + color=Colours.python_yellow + ) + + for label, answer in self.question.answers: + question_embed.add_field(name=f"Answer {label}", value=answer, inline=False) + + return question_embed + + def end_question(self, scoreboard: Scoreboard) -> Embed: + """ + Ends the question and displays the statistics on who got the question correct, awards points, etc. + + Returns: + An embed displaying the correct answers and the % of people that chose each answer. + """ + guesses = self.question.stop() + + labels = ascii_uppercase[:len(self.question.answers)] + + answer_embed = Embed( + title=f"The correct answer for Question {self.question.number} was...", + description=self.question.correct + ) + + if len(guesses) != 0: + answers_chosen = { + answer_choice: len( + tuple(filter(lambda x: x[0] == answer_choice, guesses.values())) + ) + for answer_choice in labels + } + + answers_chosen = dict( + sorted(list(answers_chosen.items()), key=lambda item: item[1], reverse=True) + ) + + for answer, people_answered in answers_chosen.items(): + is_correct_answer = dict(self.question.answers)[answer[0]] == self.question.correct + + # Setting the color of answer_embed to the % of people that got it correct via the mapping + if is_correct_answer: + # Maps the % of people who got it right to a color, from a range of red to green + percentage_to_color = [0xFC94A1, 0xFFCCCB, 0xCDFFCC, 0xB0F5AB, 0xB0F5AB] + answer_embed.color = percentage_to_color[round(people_answered / len(guesses) * 100) // 25] + + field_title = ( + (":white_check_mark: " if is_correct_answer else "") + + f"{people_answered} players ({people_answered / len(guesses) * 100:.1f}%) chose" + ) + + # The `ord` function is used here to change the letter to its corresponding position + answer_embed.add_field( + name=field_title, + value=self.question.answers[ord(answer) - 65][1], + inline=False + ) + + # Assign points to users + for user_id, answer in guesses.items(): + if dict(self.question.answers)[answer[0]] == self.question.correct: + scoreboard.assign_points( + int(user_id), + points=(1 - (answer[-1] / self.question.time) / 2) * self.question.max_points, + speed=answer[-1] + ) + elif answer[-1] <= 2: + scoreboard.assign_points( + int(user_id), + points=-(1 - (answer[-1] / self.question.time) / 2) * self.question.max_points + ) + else: + scoreboard.assign_points( + int(user_id), + points=0 + ) + + return answer_embed diff --git a/bot/exts/events/trivianight/_scoreboard.py b/bot/exts/events/trivianight/_scoreboard.py new file mode 100644 index 00000000..a5a5fcac --- /dev/null +++ b/bot/exts/events/trivianight/_scoreboard.py @@ -0,0 +1,186 @@ +from random import choice + +import discord.ui +from discord import ButtonStyle, Embed, Interaction, Member +from discord.ui import Button, View + +from bot.bot import Bot +from bot.constants import Colours, NEGATIVE_REPLIES + + +class ScoreboardView(View): + """View for the scoreboard.""" + + def __init__(self, bot: Bot): + super().__init__() + self.bot = bot + + @staticmethod + def _int_to_ordinal(number: int) -> str: + """ + Converts an integer into an ordinal number, i.e. 1 to 1st. + + Parameters: + - number: an integer representing the number to convert to an ordinal number. + """ + suffix = ["th", "st", "nd", "rd", "th"][min(number % 10, 4)] + if (number % 100) in {11, 12, 13}: + suffix = "th" + + return str(number) + suffix + + async def create_main_leaderboard(self) -> Embed: + """ + Helper function that iterates through `self.points` to generate the main leaderboard embed. + + The main leaderboard would be formatted like the following: + **1**. @mention of the user (# of points) + along with the 29 other users who made it onto the leaderboard. + """ + formatted_string = "" + + for current_placement, (user, points) in enumerate(self.points.items()): + if current_placement + 1 > 30: + break + + user = await self.bot.fetch_user(int(user)) + formatted_string += f"**{current_placement + 1}.** {user.mention} " + formatted_string += f"({points:.1f} pts)\n" + if (current_placement + 1) % 10 == 0: + formatted_string += "⎯⎯⎯⎯⎯⎯⎯⎯\n" + + main_embed = Embed( + title="Winners of the Trivia Night", + description=formatted_string, + color=Colours.python_blue, + ) + + return main_embed + + async def _create_speed_embed(self) -> Embed: + """ + Helper function that iterates through `self.speed` to generate a leaderboard embed. + + The speed leaderboard would be formatted like the following: + **1**. @mention of the user ([average speed as a float with the precision of one decimal point]s) + along with the 29 other users who made it onto the leaderboard. + """ + formatted_string = "" + + for current_placement, (user, time_taken) in enumerate(self.speed.items()): + if current_placement + 1 > 30: + break + + user = await self.bot.fetch_user(int(user)) + formatted_string += f"**{current_placement + 1}.** {user.mention} " + formatted_string += f"({(time_taken[-1] / time_taken[0]):.1f}s)\n" + if (current_placement + 1) % 10 == 0: + formatted_string += "⎯⎯⎯⎯⎯⎯⎯⎯\n" + + speed_embed = Embed( + title="Average time taken to answer a question", + description=formatted_string, + color=Colours.python_blue + ) + return speed_embed + + def _get_rank(self, member: Member) -> Embed: + """ + Gets the member's rank for the points leaderboard and speed leaderboard. + + Parameters: + - member: An instance of discord.Member representing the person who is trying to get their rank. + """ + rank_embed = Embed(title=f"Ranks for {member.display_name}", color=Colours.python_blue) + # These are stored as strings so that the last digit can be determined to choose the suffix + try: + points_rank = str(list(self.points).index(member.id) + 1) + speed_rank = str(list(self.speed).index(member.id) + 1) + except ValueError: + return Embed( + title=choice(NEGATIVE_REPLIES), + description="It looks like you didn't participate in the Trivia Night event!", + color=Colours.soft_red + ) + + rank_embed.add_field( + name="Total Points", + value=( + f"You got {self._int_to_ordinal(int(points_rank))} place" + f" with {self.points[member.id]:.1f} points." + ), + inline=False + ) + + rank_embed.add_field( + name="Average Speed", + value=( + f"You got {self._int_to_ordinal(int(speed_rank))} place" + f" with a time of {(self.speed[member.id][1] / self.speed[member.id][0]):.1f} seconds." + ), + inline=False + ) + return rank_embed + + @discord.ui.button(label="Scoreboard for Speed", style=ButtonStyle.green) + async def speed_leaderboard(self, button: Button, interaction: Interaction) -> None: + """ + Send an ephemeral message with the speed leaderboard embed. + + Parameters: + - button: The discord.ui.Button instance representing the `Speed Leaderboard` button. + - interaction: The discord.Interaction instance containing information on the interaction between the user + and the button. + """ + await interaction.response.send_message(embed=await self._create_speed_embed(), ephemeral=True) + + @discord.ui.button(label="What's my rank?", style=ButtonStyle.blurple) + async def rank_button(self, button: Button, interaction: Interaction) -> None: + """ + Send an ephemeral message with the user's rank for the overall points/average speed. + + Parameters: + - button: The discord.ui.Button instance representing the `What's my rank?` button. + - interaction: The discord.Interaction instance containing information on the interaction between the user + and the button. + """ + await interaction.response.send_message(embed=self._get_rank(interaction.user), ephemeral=True) + + +class Scoreboard: + """Class for the scoreboard for the Trivia Night event.""" + + def __init__(self, bot: Bot): + self._bot = bot + self._points = {} + self._speed = {} + + def assign_points(self, user_id: int, *, points: int = None, speed: float = None) -> None: + """ + Assign points or deduct points to/from a certain user. + + This method should be called once the question has finished and all answers have been registered. + """ + if points is not None and user_id not in self._points.keys(): + self._points[user_id] = points + elif points is not None: + self._points[user_id] += points + + if speed is not None and user_id not in self._speed.keys(): + self._speed[user_id] = [1, speed] + elif speed is not None: + self._speed[user_id] = [ + self._speed[user_id][0] + 1, self._speed[user_id][1] + speed + ] + + async def display(self, speed_leaderboard: bool = False) -> tuple[Embed, View]: + """Returns the embed of the main leaderboard along with the ScoreboardView.""" + view = ScoreboardView(self._bot) + + view.points = dict(sorted(self._points.items(), key=lambda item: item[-1], reverse=True)) + view.speed = dict(sorted(self._speed.items(), key=lambda item: item[-1][1] / item[-1][0])) + + return ( + await view.create_main_leaderboard(), + view if not speed_leaderboard else await view._create_speed_embed() + ) diff --git a/bot/exts/events/trivianight/trivianight.py b/bot/exts/events/trivianight/trivianight.py new file mode 100644 index 00000000..18d8327a --- /dev/null +++ b/bot/exts/events/trivianight/trivianight.py @@ -0,0 +1,328 @@ +import asyncio +from json import JSONDecodeError, loads +from random import choice +from typing import Optional + +from discord import Embed +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Colours, NEGATIVE_REPLIES, POSITIVE_REPLIES, Roles +from bot.utils.pagination import LinePaginator + +from ._game import AllQuestionsVisited, TriviaNightGame +from ._questions import QuestionView +from ._scoreboard import Scoreboard + +# The ID you see below are the Events Lead role ID and the Event Runner Role ID +TRIVIA_NIGHT_ROLES = (Roles.admins, 778361735739998228, 940911658799333408) + + +class TriviaNightCog(commands.Cog): + """Cog for the Python Trivia Night event.""" + + def __init__(self, bot: Bot): + self.bot = bot + self.game: Optional[TriviaNightGame] = None + self.scoreboard: Optional[Scoreboard] = None + self.question_closed: asyncio.Event = None + + @commands.group(aliases=["tn"], invoke_without_command=True) + async def trivianight(self, ctx: commands.Context) -> None: + """ + The command group for the Python Discord Trivia Night. + + If invoked without a subcommand (i.e. simply .trivianight), it will explain what the Trivia Night event is. + """ + cog_description = Embed( + title="What is .trivianight?", + description=( + "This 'cog' is for the Python Discord's TriviaNight (date tentative)! Compete against other" + " players in a trivia about Python!" + ), + color=Colours.soft_green + ) + await ctx.send(embed=cog_description) + + @trivianight.command() + @commands.has_any_role(*TRIVIA_NIGHT_ROLES) + async def load(self, ctx: commands.Context, *, to_load: Optional[str]) -> None: + """ + Loads a JSON file from the provided attachment or argument. + + The JSON provided is formatted where it is a list of dictionaries, each dictionary containing the keys below: + - number: int (represents the current question #) + - description: str (represents the question itself) + - answers: list[str] (represents the different answers possible, must be a length of 4) + - correct: str (represents the correct answer in terms of what the correct answer is in `answers` + - time: Optional[int] (represents the timer for the question and how long it should run, default is 10) + - points: Optional[int] (represents how many points are awarded for each question, default is 10) + + The load command accepts three different ways of loading in a JSON: + - an attachment of the JSON file + - a message link to the attachment/JSON + - reading the JSON itself via a codeblock or plain text + """ + if self.game is not None: + await ctx.send(embed=Embed( + title=choice(NEGATIVE_REPLIES), + description="There is already a trivia night running!", + color=Colours.soft_red + )) + return + + if ctx.message.attachments: + json_text = (await ctx.message.attachments[0].read()).decode("utf8") + elif not to_load: + raise commands.BadArgument("You didn't attach an attachment nor link a message!") + elif ( + to_load.startswith("https://discord.com/channels") + or to_load.startswith("https://discordapp.com/channels") + ): + channel_id, message_id = to_load.split("/")[-2:] + channel = await ctx.guild.fetch_channel(int(channel_id)) + message = await channel.fetch_message(int(message_id)) + if message.attachments: + json_text = (await message.attachments[0].read()).decode("utf8") + else: + json_text = message.content.replace("```", "").replace("json", "").replace("\n", "") + else: + json_text = to_load.replace("```", "").replace("json", "").replace("\n", "") + + try: + serialized_json = loads(json_text) + except JSONDecodeError as error: + raise commands.BadArgument(f"Looks like something went wrong:\n{str(error)}") + + self.game = TriviaNightGame(serialized_json) + self.question_closed = asyncio.Event() + + success_embed = Embed( + title=choice(POSITIVE_REPLIES), + description="The JSON was loaded successfully!", + color=Colours.soft_green + ) + + self.scoreboard = Scoreboard(self.bot) + + await ctx.send(embed=success_embed) + + @trivianight.command(aliases=('next',)) + @commands.has_any_role(*TRIVIA_NIGHT_ROLES) + async def question(self, ctx: commands.Context, question_number: str = None) -> None: + """ + Gets a random question from the unanswered question list and lets the user(s) choose the answer. + + This command will continuously count down until the time limit of the question is exhausted. + However, if `.trivianight stop` is invoked, the counting down is interrupted to show the final results. + """ + if self.game is None: + await ctx.send(embed=Embed( + title=choice(NEGATIVE_REPLIES), + description="There is no trivia night running!", + color=Colours.soft_red + )) + return + + if self.game.current_question is not None: + error_embed = Embed( + title=choice(NEGATIVE_REPLIES), + description="There is already an ongoing question!", + color=Colours.soft_red + ) + await ctx.send(embed=error_embed) + return + + try: + next_question = self.game.next_question(question_number) + except AllQuestionsVisited: + error_embed = Embed( + title=choice(NEGATIVE_REPLIES), + description="All of the questions have been used.", + color=Colours.soft_red + ) + await ctx.send(embed=error_embed) + return + + await ctx.send("Next question in 3 seconds! Get ready...") + await asyncio.sleep(3) + + question_view = QuestionView(next_question) + question_embed = question_view.create_embed() + + next_question.start() + message = await ctx.send(embed=question_embed, view=question_view) + + # Exponentially sleep less and less until the time limit is reached + percentage = 1 + while True: + percentage *= 0.5 + duration = next_question.time * percentage + + await asyncio.wait([self.question_closed.wait()], timeout=duration) + + if self.question_closed.is_set(): + await ctx.send(embed=question_view.end_question(self.scoreboard)) + await message.edit(embed=question_embed, view=None) + + self.game.end_question() + self.question_closed.clear() + return + + if int(duration) > 1: + # It is quite ugly to display decimals, the delay for requests to reach Discord + # cause sub-second accuracy to be quite pointless. + await ctx.send(f"{int(duration)}s remaining...") + else: + # Since each time we divide the percentage by 2 and sleep one half of the halves (then sleep a + # half, of that half) we must sleep both halves at the end. + await asyncio.wait([self.question_closed.wait()], timeout=duration) + if self.question_closed.is_set(): + await ctx.send(embed=question_view.end_question(self.scoreboard)) + await message.edit(embed=question_embed, view=None) + + self.game.end_question() + self.question_closed.clear() + return + break + + await ctx.send(embed=question_view.end_question(self.scoreboard)) + await message.edit(embed=question_embed, view=None) + + self.game.end_question() + + @trivianight.command() + @commands.has_any_role(*TRIVIA_NIGHT_ROLES) + async def list(self, ctx: commands.Context) -> None: + """ + Display all the questions left in the question bank. + + Questions are displayed in the following format: + Q(number): Question description | :white_check_mark: if the question was used otherwise :x:. + """ + if self.game is None: + await ctx.send(embed=Embed( + title=choice(NEGATIVE_REPLIES), + description="There is no trivia night running!", + color=Colours.soft_red + )) + return + + question_list = self.game.list_questions() + + list_embed = Embed(title="All Trivia Night Questions") + + if len(question_list) == 1: + list_embed.description = question_list[0] + await ctx.send(embed=list_embed) + else: + await LinePaginator.paginate( + question_list, + ctx, + list_embed + ) + + @trivianight.command() + @commands.has_any_role(*TRIVIA_NIGHT_ROLES) + async def stop(self, ctx: commands.Context) -> None: + """ + End the ongoing question to show the correct question. + + This command should be used if the question should be ended early or if the time limit fails + """ + if self.game is None: + await ctx.send(embed=Embed( + title=choice(NEGATIVE_REPLIES), + description="There is no trivia night running!", + color=Colours.soft_red + )) + return + + if self.game.current_question is None: + error_embed = Embed( + title=choice(NEGATIVE_REPLIES), + description="There is no ongoing question!", + color=Colours.soft_red + ) + await ctx.send(embed=error_embed) + return + + self.question_closed.set() + + @trivianight.command() + @commands.has_any_role(*TRIVIA_NIGHT_ROLES) + async def end(self, ctx: commands.Context) -> None: + """ + Displays the scoreboard view. + + The scoreboard view consists of the two scoreboards with the 30 players who got the highest points and the + 30 players who had the fastest average response time to a question where they got the question right. + + The scoreboard view also has a button where the user can see their own rank, points and average speed if they + didn't make it onto the leaderboard. + """ + if self.game is None: + await ctx.send(embed=Embed( + title=choice(NEGATIVE_REPLIES), + description="There is no trivia night running!", + color=Colours.soft_red + )) + return + + if self.game.current_question is not None: + error_embed = Embed( + title=choice(NEGATIVE_REPLIES), + description="You can't end the event while a question is ongoing!", + color=Colours.soft_red + ) + await ctx.send(embed=error_embed) + return + + scoreboard_embed, scoreboard_view = await self.scoreboard.display() + await ctx.send(embed=scoreboard_embed, view=scoreboard_view) + + @trivianight.command() + @commands.has_any_role(*TRIVIA_NIGHT_ROLES) + async def scoreboard(self, ctx: commands.Context) -> None: + """ + Displays the scoreboard. + + The scoreboard consists of the two scoreboards with the 30 players who got the highest points and the + 30 players who had the fastest average response time to a question where they got the question right. + """ + if self.game is None: + await ctx.send(embed=Embed( + title=choice(NEGATIVE_REPLIES), + description="There is no trivia night running!", + color=Colours.soft_red + )) + return + + if self.game.current_question is not None: + error_embed = Embed( + title=choice(NEGATIVE_REPLIES), + description="You can't end the event while a question is ongoing!", + color=Colours.soft_red + ) + await ctx.send(embed=error_embed) + return + + scoreboard_embed, speed_scoreboard = await self.scoreboard.display(speed_leaderboard=True) + await ctx.send(embeds=(scoreboard_embed, speed_scoreboard)) + + @trivianight.command() + @commands.has_any_role(*TRIVIA_NIGHT_ROLES) + async def end_game(self, ctx: commands.Context) -> None: + """Ends the ongoing game.""" + self.game = None + + await ctx.send(embed=Embed( + title=choice(POSITIVE_REPLIES), + description="The game has been stopped.", + color=Colours.soft_green + )) + + +def setup(bot: Bot) -> None: + """Load the TriviaNight cog.""" + bot.add_cog(TriviaNightCog(bot)) |