aboutsummaryrefslogtreecommitdiffstats
path: root/bot/exts/events
diff options
context:
space:
mode:
Diffstat (limited to 'bot/exts/events')
-rw-r--r--bot/exts/events/advent_of_code/_cog.py342
-rw-r--r--bot/exts/events/advent_of_code/_helpers.py87
-rw-r--r--bot/exts/events/advent_of_code/views/dayandstarview.py82
-rw-r--r--bot/exts/events/hacktoberfest/hacktober-issue-finder.py11
-rw-r--r--bot/exts/events/trivianight/__init__.py0
-rw-r--r--bot/exts/events/trivianight/_game.py192
-rw-r--r--bot/exts/events/trivianight/_questions.py179
-rw-r--r--bot/exts/events/trivianight/_scoreboard.py186
-rw-r--r--bot/exts/events/trivianight/trivianight.py328
9 files changed, 1312 insertions, 95 deletions
diff --git a/bot/exts/events/advent_of_code/_cog.py b/bot/exts/events/advent_of_code/_cog.py
index ca60e517..518841d4 100644
--- a/bot/exts/events/advent_of_code/_cog.py
+++ b/bot/exts/events/advent_of_code/_cog.py
@@ -2,17 +2,22 @@ import json
import logging
from datetime import datetime, timedelta
from pathlib import Path
+from typing import Optional
import arrow
import discord
-from discord.ext import commands
+from async_rediscache import RedisCache
+from discord.ext import commands, tasks
from bot.bot import Bot
from bot.constants import (
- AdventOfCode as AocConfig, Channels, Colours, Emojis, Month, Roles, WHITELISTED_CHANNELS,
+ AdventOfCode as AocConfig, Channels, Client, Colours, Emojis, Month, PYTHON_PREFIX, Roles, WHITELISTED_CHANNELS
)
from bot.exts.events.advent_of_code import _helpers
+from bot.exts.events.advent_of_code.views.dayandstarview import AoCDropdownView
+from bot.utils import members
from bot.utils.decorators import InChannelCheckFailure, in_month, whitelist_override, with_role
+from bot.utils.exceptions import MovedCommandError
from bot.utils.extensions import invoke_help_command
log = logging.getLogger(__name__)
@@ -29,6 +34,14 @@ AOC_WHITELIST = AOC_WHITELIST_RESTRICTED + (Channels.advent_of_code,)
class AdventOfCode(commands.Cog):
"""Advent of Code festivities! Ho Ho Ho!"""
+ # Redis Cache for linking Discord IDs to Advent of Code usernames
+ # RedisCache[member_id: aoc_username_string]
+ account_links = RedisCache()
+
+ # A dict with keys of member_ids to block from getting the role
+ # RedisCache[member_id: None]
+ completionist_block_list = RedisCache()
+
def __init__(self, bot: Bot):
self.bot = bot
@@ -48,6 +61,62 @@ class AdventOfCode(commands.Cog):
self.status_task.set_name("AoC Status Countdown")
self.status_task.add_done_callback(_helpers.background_task_callback)
+ # Don't start task while event isn't running
+ # self.completionist_task.start()
+
+ @tasks.loop(minutes=10.0)
+ async def completionist_task(self) -> None:
+ """
+ Give members who have completed all 50 AoC stars the completionist role.
+
+ Runs on a schedule, as defined in the task.loop decorator.
+ """
+ await self.bot.wait_until_guild_available()
+ guild = self.bot.get_guild(Client.guild)
+ completionist_role = guild.get_role(Roles.aoc_completionist)
+ if completionist_role is None:
+ log.warning("Could not find the AoC completionist role; cancelling completionist task.")
+ self.completionist_task.cancel()
+ return
+
+ aoc_name_to_member_id = {
+ aoc_name: member_id
+ for member_id, aoc_name in await self.account_links.items()
+ }
+
+ try:
+ leaderboard = await _helpers.fetch_leaderboard()
+ except _helpers.FetchingLeaderboardFailedError:
+ await self.bot.send_log("Unable to fetch AoC leaderboard during role sync.")
+ return
+
+ placement_leaderboard = json.loads(leaderboard["placement_leaderboard"])
+
+ for member_aoc_info in placement_leaderboard.values():
+ if not member_aoc_info["stars"] == 50:
+ # Only give the role to people who have completed all 50 stars
+ continue
+
+ aoc_name = member_aoc_info["name"] or f"Anonymous #{member_aoc_info['id']}"
+
+ member_id = aoc_name_to_member_id.get(aoc_name)
+ if not member_id:
+ log.debug(f"Could not find member_id for {member_aoc_info['name']}, not giving role.")
+ continue
+
+ member = await members.get_or_fetch_member(guild, member_id)
+ if member is None:
+ log.debug(f"Could not find {member_id}, not giving role.")
+ continue
+
+ if completionist_role in member.roles:
+ log.debug(f"{member.name} ({member.mention}) already has the completionist role.")
+ continue
+
+ if not await self.completionist_block_list.contains(member_id):
+ log.debug(f"Giving completionist role to {member.name} ({member.mention}).")
+ await members.handle_role_change(member, member.add_roles, completionist_role)
+
@commands.group(name="adventofcode", aliases=("aoc",))
@whitelist_override(channels=AOC_WHITELIST)
async def adventofcode_group(self, ctx: commands.Context) -> None:
@@ -55,77 +124,59 @@ class AdventOfCode(commands.Cog):
if not ctx.invoked_subcommand:
await invoke_help_command(ctx)
+ @with_role(Roles.admins)
@adventofcode_group.command(
- name="subscribe",
- aliases=("sub", "notifications", "notify", "notifs"),
- brief="Notifications for new days"
+ name="block",
+ brief="Block a user from getting the completionist role.",
)
- @whitelist_override(channels=AOC_WHITELIST)
- async def aoc_subscribe(self, ctx: commands.Context) -> None:
- """Assign the role for notifications about new days being ready."""
- current_year = datetime.now().year
- if current_year != AocConfig.year:
- await ctx.send(f"You can't subscribe to {current_year}'s Advent of Code announcements yet!")
- return
+ async def block_from_role(self, ctx: commands.Context, member: discord.Member) -> None:
+ """Block the given member from receiving the AoC completionist role, removing it from them if needed."""
+ completionist_role = ctx.guild.get_role(Roles.aoc_completionist)
+ if completionist_role in member.roles:
+ await member.remove_roles(completionist_role)
- role = ctx.guild.get_role(AocConfig.role_id)
- unsubscribe_command = f"{ctx.prefix}{ctx.command.root_parent} unsubscribe"
+ await self.completionist_block_list.set(member.id, "sentinel")
+ await ctx.send(f":+1: Blocked {member.mention} from getting the AoC completionist role.")
- if role not in ctx.author.roles:
- await ctx.author.add_roles(role)
- await ctx.send(
- "Okay! You have been __subscribed__ to notifications about new Advent of Code tasks. "
- f"You can run `{unsubscribe_command}` to disable them again for you."
- )
- else:
- await ctx.send(
- "Hey, you already are receiving notifications about new Advent of Code tasks. "
- f"If you don't want them any more, run `{unsubscribe_command}` instead."
- )
-
- @in_month(Month.DECEMBER)
- @adventofcode_group.command(name="unsubscribe", aliases=("unsub",), brief="Notifications for new days")
+ @commands.guild_only()
+ @adventofcode_group.command(
+ name="subscribe",
+ aliases=("sub", "notifications", "notify", "notifs", "unsubscribe", "unsub"),
+ help=f"NOTE: This command has been moved to {PYTHON_PREFIX}subscribe",
+ )
@whitelist_override(channels=AOC_WHITELIST)
- async def aoc_unsubscribe(self, ctx: commands.Context) -> None:
- """Remove the role for notifications about new days being ready."""
- role = ctx.guild.get_role(AocConfig.role_id)
+ async def aoc_subscribe(self, ctx: commands.Context) -> None:
+ """
+ Deprecated role command.
- if role in ctx.author.roles:
- await ctx.author.remove_roles(role)
- await ctx.send("Okay! You have been __unsubscribed__ from notifications about new Advent of Code tasks.")
- else:
- await ctx.send("Hey, you don't even get any notifications about new Advent of Code tasks currently anyway.")
+ This command has been moved to bot, and will be removed in the future.
+ """
+ raise MovedCommandError(f"{PYTHON_PREFIX}subscribe")
@adventofcode_group.command(name="countdown", aliases=("count", "c"), brief="Return time left until next day")
@whitelist_override(channels=AOC_WHITELIST)
async def aoc_countdown(self, ctx: commands.Context) -> None:
"""Return time left until next day."""
- if not _helpers.is_in_advent():
- datetime_now = arrow.now(_helpers.EST)
-
- # Calculate the delta to this & next year's December 1st to see which one is closest and not in the past
- this_year = arrow.get(datetime(datetime_now.year, 12, 1), _helpers.EST)
- next_year = arrow.get(datetime(datetime_now.year + 1, 12, 1), _helpers.EST)
- deltas = (dec_first - datetime_now for dec_first in (this_year, next_year))
- delta = min(delta for delta in deltas if delta >= timedelta()) # timedelta() gives 0 duration delta
-
- # Add a finer timedelta if there's less than a day left
- if delta.days == 0:
- delta_str = f"approximately {delta.seconds // 3600} hours"
- else:
- delta_str = f"{delta.days} days"
+ if _helpers.is_in_advent():
+ tomorrow, _ = _helpers.time_left_to_est_midnight()
+ next_day_timestamp = int(tomorrow.timestamp())
- await ctx.send(
- "The Advent of Code event is not currently running. "
- f"The next event will start in {delta_str}."
- )
+ await ctx.send(f"Day {tomorrow.day} starts <t:{next_day_timestamp}:R>.")
return
- tomorrow, time_left = _helpers.time_left_to_est_midnight()
+ datetime_now = arrow.now(_helpers.EST)
+ # Calculate the delta to this & next year's December 1st to see which one is closest and not in the past
+ this_year = arrow.get(datetime(datetime_now.year, 12, 1), _helpers.EST)
+ next_year = arrow.get(datetime(datetime_now.year + 1, 12, 1), _helpers.EST)
+ deltas = (dec_first - datetime_now for dec_first in (this_year, next_year))
+ delta = min(delta for delta in deltas if delta >= timedelta()) # timedelta() gives 0 duration delta
- hours, minutes = time_left.seconds // 3600, time_left.seconds // 60 % 60
+ next_aoc_timestamp = int((datetime_now + delta).timestamp())
- await ctx.send(f"There are {hours} hours and {minutes} minutes left until day {tomorrow.day}.")
+ await ctx.send(
+ "The Advent of Code event is not currently running. "
+ f"The next event will start <t:{next_aoc_timestamp}:R>."
+ )
@adventofcode_group.command(name="about", aliases=("ab", "info"), brief="Learn about Advent of Code")
@whitelist_override(channels=AOC_WHITELIST)
@@ -133,13 +184,19 @@ class AdventOfCode(commands.Cog):
"""Respond with an explanation of all things Advent of Code."""
await ctx.send(embed=self.cached_about_aoc)
+ @commands.guild_only()
@adventofcode_group.command(name="join", aliases=("j",), brief="Learn how to join the leaderboard (via DM)")
@whitelist_override(channels=AOC_WHITELIST)
async def join_leaderboard(self, ctx: commands.Context) -> None:
"""DM the user the information for joining the Python Discord leaderboard."""
- current_year = datetime.now().year
- if current_year != AocConfig.year:
- await ctx.send(f"The Python Discord leaderboard for {current_year} is not yet available!")
+ current_date = datetime.now()
+ allowed_months = (Month.NOVEMBER.value, Month.DECEMBER.value)
+ if not (
+ current_date.month in allowed_months and current_date.year == AocConfig.year or
+ current_date.month == Month.JANUARY.value and current_date.year == AocConfig.year + 1
+ ):
+ # Only allow joining the leaderboard in the run up to AOC and the January following.
+ await ctx.send(f"The Python Discord leaderboard for {current_date.year} is not yet available!")
return
author = ctx.author
@@ -150,7 +207,7 @@ class AdventOfCode(commands.Cog):
else:
try:
join_code = await _helpers.get_public_join_code(author)
- except _helpers.FetchingLeaderboardFailed:
+ except _helpers.FetchingLeaderboardFailedError:
await ctx.send(":x: Failed to get join code! Notified maintainers.")
return
@@ -178,33 +235,163 @@ class AdventOfCode(commands.Cog):
else:
await ctx.message.add_reaction(Emojis.envelope)
- @in_month(Month.DECEMBER)
+ @in_month(Month.NOVEMBER, Month.DECEMBER, Month.JANUARY)
+ @adventofcode_group.command(
+ name="link",
+ aliases=("connect",),
+ brief="Tie your Discord account with your Advent of Code name."
+ )
+ @whitelist_override(channels=AOC_WHITELIST)
+ async def aoc_link_account(self, ctx: commands.Context, *, aoc_name: str = None) -> None:
+ """
+ Link your Discord Account to your Advent of Code name.
+
+ Stored in a Redis Cache with the format of `Discord ID: Advent of Code Name`
+ """
+ cache_items = await self.account_links.items()
+ cache_aoc_names = [value for _, value in cache_items]
+
+ if aoc_name:
+ # Let's check the current values in the cache to make sure it isn't already tied to a different account
+ if aoc_name == await self.account_links.get(ctx.author.id):
+ await ctx.reply(f"{aoc_name} is already tied to your account.")
+ return
+ elif aoc_name in cache_aoc_names:
+ log.info(
+ f"{ctx.author} ({ctx.author.id}) tried to connect their account to {aoc_name},"
+ " but it's already connected to another user."
+ )
+ await ctx.reply(
+ f"{aoc_name} is already tied to another account."
+ " Please contact an admin if you believe this is an error."
+ )
+ return
+
+ # Update an existing link
+ if old_aoc_name := await self.account_links.get(ctx.author.id):
+ log.info(f"Changing link for {ctx.author} ({ctx.author.id}) from {old_aoc_name} to {aoc_name}.")
+ await self.account_links.set(ctx.author.id, aoc_name)
+ await ctx.reply(f"Your linked account has been changed to {aoc_name}.")
+ else:
+ # Create a new link
+ log.info(f"Linking {ctx.author} ({ctx.author.id}) to account {aoc_name}.")
+ await self.account_links.set(ctx.author.id, aoc_name)
+ await ctx.reply(f"You have linked your Discord ID to {aoc_name}.")
+ else:
+ # User has not supplied a name, let's check if they're in the cache or not
+ if cache_name := await self.account_links.get(ctx.author.id):
+ await ctx.reply(f"You have already linked an Advent of Code account: {cache_name}.")
+ else:
+ await ctx.reply(
+ "You have not linked an Advent of Code account."
+ " Please re-run the command with one specified."
+ )
+
+ @in_month(Month.NOVEMBER, Month.DECEMBER, Month.JANUARY)
+ @adventofcode_group.command(
+ name="unlink",
+ aliases=("disconnect",),
+ brief="Tie your Discord account with your Advent of Code name."
+ )
+ @whitelist_override(channels=AOC_WHITELIST)
+ async def aoc_unlink_account(self, ctx: commands.Context) -> None:
+ """
+ Unlink your Discord ID with your Advent of Code leaderboard name.
+
+ Deletes the entry that was Stored in the Redis cache.
+ """
+ if aoc_cache_name := await self.account_links.get(ctx.author.id):
+ log.info(f"Unlinking {ctx.author} ({ctx.author.id}) from Advent of Code account {aoc_cache_name}")
+ await self.account_links.delete(ctx.author.id)
+ await ctx.reply(f"We have removed the link between your Discord ID and {aoc_cache_name}.")
+ else:
+ log.info(f"Attempted to unlink {ctx.author} ({ctx.author.id}), but no link was found.")
+ await ctx.reply("You don't have an Advent of Code account linked.")
+
+ @in_month(Month.DECEMBER, Month.JANUARY)
+ @adventofcode_group.command(
+ name="dayandstar",
+ aliases=("daynstar", "daystar"),
+ brief="Get a view that lets you filter the leaderboard by day and star",
+ )
+ @whitelist_override(channels=AOC_WHITELIST_RESTRICTED)
+ async def aoc_day_and_star_leaderboard(
+ self,
+ ctx: commands.Context,
+ maximum_scorers_day_and_star: Optional[int] = 10
+ ) -> None:
+ """Have the bot send a View that will let you filter the leaderboard by day and star."""
+ if maximum_scorers_day_and_star > AocConfig.max_day_and_star_results or maximum_scorers_day_and_star <= 0:
+ raise commands.BadArgument(
+ f"The maximum number of results you can query is {AocConfig.max_day_and_star_results}"
+ )
+ async with ctx.typing():
+ try:
+ leaderboard = await _helpers.fetch_leaderboard()
+ except _helpers.FetchingLeaderboardFailedError:
+ await ctx.send(":x: Unable to fetch leaderboard!")
+ return
+ # This is a dictionary that contains solvers in respect of day, and star.
+ # e.g. 1-1 means the solvers of the first star of the first day and their completion time
+ per_day_and_star = json.loads(leaderboard['leaderboard_per_day_and_star'])
+ view = AoCDropdownView(
+ day_and_star_data=per_day_and_star,
+ maximum_scorers=maximum_scorers_day_and_star,
+ original_author=ctx.author
+ )
+ message = await ctx.send(
+ content="Please select a day and a star to filter by!",
+ view=view
+ )
+ await view.wait()
+ await message.edit(view=None)
+
+ @in_month(Month.DECEMBER, Month.JANUARY)
@adventofcode_group.command(
name="leaderboard",
aliases=("board", "lb"),
brief="Get a snapshot of the PyDis private AoC leaderboard",
)
@whitelist_override(channels=AOC_WHITELIST_RESTRICTED)
- async def aoc_leaderboard(self, ctx: commands.Context) -> None:
- """Get the current top scorers of the Python Discord Leaderboard."""
+ async def aoc_leaderboard(self, ctx: commands.Context, *, aoc_name: Optional[str] = None) -> None:
+ """
+ Get the current top scorers of the Python Discord Leaderboard.
+
+ Additionally you can specify an `aoc_name` that will append the
+ specified profile's personal stats to the top of the leaderboard
+ """
+ # Strip quotes from the AoC username if needed (e.g. "My Name" -> My Name)
+ # This is to keep compatibility with those already used to wrapping the AoC name in quotes
+ # Note: only strips one layer of quotes to allow names with quotes at the start and end
+ # e.g. ""My Name"" -> "My Name"
+ if aoc_name and aoc_name.startswith('"') and aoc_name.endswith('"'):
+ aoc_name = aoc_name[1:-1]
+
+ # Check if an advent of code account is linked in the Redis Cache if aoc_name is not given
+ if (aoc_cache_name := await self.account_links.get(ctx.author.id)) and aoc_name is None:
+ aoc_name = aoc_cache_name
+
async with ctx.typing():
try:
- leaderboard = await _helpers.fetch_leaderboard()
- except _helpers.FetchingLeaderboardFailed:
+ leaderboard = await _helpers.fetch_leaderboard(self_placement_name=aoc_name)
+ except _helpers.FetchingLeaderboardFailedError:
await ctx.send(":x: Unable to fetch leaderboard!")
return
- number_of_participants = leaderboard["number_of_participants"]
+ number_of_participants = leaderboard["number_of_participants"]
- top_count = min(AocConfig.leaderboard_displayed_members, number_of_participants)
- header = f"Here's our current top {top_count}! {Emojis.christmas_tree * 3}"
-
- table = f"```\n{leaderboard['top_leaderboard']}\n```"
- info_embed = _helpers.get_summary_embed(leaderboard)
+ top_count = min(AocConfig.leaderboard_displayed_members, number_of_participants)
+ self_placement_header = " (and your personal stats compared to the top 10)" if aoc_name else ""
+ header = f"Here's our current top {top_count}{self_placement_header}! {Emojis.christmas_tree * 3}"
+ table = "```\n" \
+ f"{leaderboard['placement_leaderboard'] if aoc_name else leaderboard['top_leaderboard']}" \
+ "\n```"
+ info_embed = _helpers.get_summary_embed(leaderboard)
- await ctx.send(content=f"{header}\n\n{table}", embed=info_embed)
+ await ctx.send(content=f"{header}\n\n{table}", embed=info_embed)
+ return
- @in_month(Month.DECEMBER)
+ @in_month(Month.DECEMBER, Month.JANUARY)
@adventofcode_group.command(
name="global",
aliases=("globalboard", "gb"),
@@ -231,7 +418,7 @@ class AdventOfCode(commands.Cog):
"""Send an embed with daily completion statistics for the Python Discord leaderboard."""
try:
leaderboard = await _helpers.fetch_leaderboard()
- except _helpers.FetchingLeaderboardFailed:
+ except _helpers.FetchingLeaderboardFailedError:
await ctx.send(":x: Can't fetch leaderboard for stats right now!")
return
@@ -251,7 +438,7 @@ class AdventOfCode(commands.Cog):
info_embed = _helpers.get_summary_embed(leaderboard)
await ctx.send(f"```\n{table}\n```", embed=info_embed)
- @with_role(Roles.admin)
+ @with_role(Roles.admins)
@adventofcode_group.command(
name="refresh",
aliases=("fetch",),
@@ -267,7 +454,7 @@ class AdventOfCode(commands.Cog):
async with ctx.typing():
try:
await _helpers.fetch_leaderboard(invalidate_cache=True)
- except _helpers.FetchingLeaderboardFailed:
+ except _helpers.FetchingLeaderboardFailedError:
await ctx.send(":x: Something went wrong while trying to refresh the cache!")
else:
await ctx.send("\N{OK Hand Sign} Refreshed leaderboard cache!")
@@ -277,6 +464,7 @@ class AdventOfCode(commands.Cog):
log.debug("Unloading the cog and canceling the background task.")
self.notification_task.cancel()
self.status_task.cancel()
+ self.completionist_task.cancel()
def _build_about_embed(self) -> discord.Embed:
"""Build and return the informational "About AoC" embed from the resources file."""
diff --git a/bot/exts/events/advent_of_code/_helpers.py b/bot/exts/events/advent_of_code/_helpers.py
index 5fedb60f..6c004901 100644
--- a/bot/exts/events/advent_of_code/_helpers.py
+++ b/bot/exts/events/advent_of_code/_helpers.py
@@ -10,6 +10,7 @@ from typing import Any, Optional
import aiohttp
import arrow
import discord
+from discord.ext import commands
from bot.bot import Bot
from bot.constants import AdventOfCode, Channels, Colours
@@ -70,6 +71,33 @@ class FetchingLeaderboardFailedError(Exception):
"""Raised when one or more leaderboards could not be fetched at all."""
+def _format_leaderboard_line(rank: int, data: dict[str, Any], *, is_author: bool) -> str:
+ """
+ Build a string representing a line of the leaderboard.
+
+ Parameters:
+ rank:
+ Rank in the leaderboard of this entry.
+
+ data:
+ Mapping with entry information.
+
+ Keyword arguments:
+ is_author:
+ Whether to address the name displayed in the returned line
+ personally.
+
+ Returns:
+ A formatted line for the leaderboard.
+ """
+ return AOC_TABLE_TEMPLATE.format(
+ rank=rank,
+ name=data['name'] if not is_author else f"(You) {data['name']}",
+ score=str(data['score']),
+ stars=f"({data['star_1']}, {data['star_2']})"
+ )
+
+
def leaderboard_sorting_function(entry: tuple[str, dict]) -> tuple[int, int]:
"""
Provide a sorting value for our leaderboard.
@@ -105,6 +133,7 @@ def _parse_raw_leaderboard_data(raw_leaderboard_data: dict) -> dict:
# The data we get from the AoC website is structured by member, not by day/star,
# which means we need to iterate over the members to transpose the data to a per
# star view. We need that per star view to compute rank scores per star.
+ per_day_star_stats = collections.defaultdict(list)
for member in raw_leaderboard_data.values():
name = member["name"] if member["name"] else f"Anonymous #{member['id']}"
member_id = member["id"]
@@ -122,6 +151,11 @@ def _parse_raw_leaderboard_data(raw_leaderboard_data: dict) -> dict:
star_results[(day, star)].append(
StarResult(member_id=member_id, completion_time=completion_time)
)
+ per_day_star_stats[f"{day}-{star}"].append(
+ {'completion_time': int(data["get_star_ts"]), 'member_name': name}
+ )
+ for key in per_day_star_stats:
+ per_day_star_stats[key] = sorted(per_day_star_stats[key], key=operator.itemgetter('completion_time'))
# Now that we have a transposed dataset that holds the completion time of all
# participants per star, we can compute the rank-based scores each participant
@@ -151,13 +185,26 @@ def _parse_raw_leaderboard_data(raw_leaderboard_data: dict) -> dict:
# this data to JSON in order to cache it in Redis.
daily_stats[day] = {"star_one": star_one, "star_two": star_two}
- return {"daily_stats": daily_stats, "leaderboard": sorted_leaderboard}
+ return {"daily_stats": daily_stats, "leaderboard": sorted_leaderboard, 'per_day_and_star': per_day_star_stats}
-def _format_leaderboard(leaderboard: dict[str, dict]) -> str:
+def _format_leaderboard(leaderboard: dict[str, dict], self_placement_name: str = None) -> str:
"""Format the leaderboard using the AOC_TABLE_TEMPLATE."""
leaderboard_lines = [HEADER]
+ self_placement_exists = False
for rank, data in enumerate(leaderboard.values(), start=1):
+ if self_placement_name and data["name"].lower() == self_placement_name.lower():
+ leaderboard_lines.insert(
+ 1,
+ AOC_TABLE_TEMPLATE.format(
+ rank=rank,
+ name=f"(You) {data['name']}",
+ score=str(data["score"]),
+ stars=f"({data['star_1']}, {data['star_2']})"
+ )
+ )
+ self_placement_exists = True
+ continue
leaderboard_lines.append(
AOC_TABLE_TEMPLATE.format(
rank=rank,
@@ -166,7 +213,13 @@ def _format_leaderboard(leaderboard: dict[str, dict]) -> str:
stars=f"({data['star_1']}, {data['star_2']})"
)
)
-
+ if self_placement_name and not self_placement_exists:
+ raise commands.BadArgument(
+ "Sorry, your profile does not exist in this leaderboard."
+ "\n\n"
+ "To join our leaderboard, run the command `.aoc join`."
+ " If you've joined recently, please wait up to 30 minutes for our leaderboard to refresh."
+ )
return "\n".join(leaderboard_lines)
@@ -202,7 +255,7 @@ async def _fetch_leaderboard_data() -> dict[str, Any]:
# Two attempts, one with the original session cookie and one with the fallback session
for attempt in range(1, 3):
- log.info(f"Attempting to fetch leaderboard `{leaderboard.id}` ({attempt}/2)")
+ log.debug(f"Attempting to fetch leaderboard `{leaderboard.id}` ({attempt}/2)")
cookies = {"session": leaderboard.session}
try:
raw_data = await _leaderboard_request(leaderboard_url, leaderboard.id, cookies)
@@ -254,7 +307,7 @@ def _get_top_leaderboard(full_leaderboard: str) -> str:
@_caches.leaderboard_cache.atomic_transaction
-async def fetch_leaderboard(invalidate_cache: bool = False) -> dict:
+async def fetch_leaderboard(invalidate_cache: bool = False, self_placement_name: str = None) -> dict:
"""
Get the current Python Discord combined leaderboard.
@@ -264,7 +317,6 @@ async def fetch_leaderboard(invalidate_cache: bool = False) -> dict:
miss, this function is locked to one call at a time using a decorator.
"""
cached_leaderboard = await _caches.leaderboard_cache.to_dict()
-
# Check if the cached leaderboard contains everything we expect it to. If it
# does not, this probably means the cache has not been created yet or has
# expired in Redis. This check also accounts for a malformed cache.
@@ -280,15 +332,17 @@ async def fetch_leaderboard(invalidate_cache: bool = False) -> dict:
number_of_participants = len(leaderboard)
formatted_leaderboard = _format_leaderboard(leaderboard)
full_leaderboard_url = await _upload_leaderboard(formatted_leaderboard)
- leaderboard_fetched_at = datetime.datetime.utcnow().isoformat()
+ leaderboard_fetched_at = datetime.datetime.now(datetime.timezone.utc).isoformat()
cached_leaderboard = {
+ "placement_leaderboard": json.dumps(raw_leaderboard_data),
"full_leaderboard": formatted_leaderboard,
"top_leaderboard": _get_top_leaderboard(formatted_leaderboard),
"full_leaderboard_url": full_leaderboard_url,
"leaderboard_fetched_at": leaderboard_fetched_at,
"number_of_participants": number_of_participants,
"daily_stats": json.dumps(parsed_leaderboard_data["daily_stats"]),
+ "leaderboard_per_day_and_star": json.dumps(parsed_leaderboard_data["per_day_and_star"])
}
# Store the new values in Redis
@@ -300,7 +354,13 @@ async def fetch_leaderboard(invalidate_cache: bool = False) -> dict:
_caches.leaderboard_cache.namespace,
AdventOfCode.leaderboard_cache_expiry_seconds
)
-
+ if self_placement_name:
+ formatted_placement_leaderboard = _parse_raw_leaderboard_data(
+ json.loads(cached_leaderboard["placement_leaderboard"])
+ )["leaderboard"]
+ cached_leaderboard["placement_leaderboard"] = _get_top_leaderboard(
+ _format_leaderboard(formatted_placement_leaderboard, self_placement_name=self_placement_name)
+ )
return cached_leaderboard
@@ -308,11 +368,13 @@ def get_summary_embed(leaderboard: dict) -> discord.Embed:
"""Get an embed with the current summary stats of the leaderboard."""
leaderboard_url = leaderboard["full_leaderboard_url"]
refresh_minutes = AdventOfCode.leaderboard_cache_expiry_seconds // 60
+ refreshed_unix = int(datetime.datetime.fromisoformat(leaderboard["leaderboard_fetched_at"]).timestamp())
+
+ aoc_embed = discord.Embed(colour=Colours.soft_green)
- aoc_embed = discord.Embed(
- colour=Colours.soft_green,
- timestamp=datetime.datetime.fromisoformat(leaderboard["leaderboard_fetched_at"]),
- description=f"*The leaderboard is refreshed every {refresh_minutes} minutes.*"
+ aoc_embed.description = (
+ f"The leaderboard is refreshed every {refresh_minutes} minutes.\n"
+ f"Last Updated: <t:{refreshed_unix}:t>"
)
aoc_embed.add_field(
name="Number of Participants",
@@ -326,7 +388,6 @@ def get_summary_embed(leaderboard: dict) -> discord.Embed:
inline=True,
)
aoc_embed.set_author(name="Advent of Code", url=leaderboard_url)
- aoc_embed.set_footer(text="Last Updated")
aoc_embed.set_thumbnail(url=AOC_EMBED_THUMBNAIL)
return aoc_embed
diff --git a/bot/exts/events/advent_of_code/views/dayandstarview.py b/bot/exts/events/advent_of_code/views/dayandstarview.py
new file mode 100644
index 00000000..5529c12b
--- /dev/null
+++ b/bot/exts/events/advent_of_code/views/dayandstarview.py
@@ -0,0 +1,82 @@
+from datetime import datetime
+
+import discord
+
+AOC_DAY_AND_STAR_TEMPLATE = "{rank: >4} | {name:25.25} | {completion_time: >10}"
+
+
+class AoCDropdownView(discord.ui.View):
+ """Interactive view to filter AoC stats by Day and Star."""
+
+ def __init__(self, original_author: discord.Member, day_and_star_data: dict[str: dict], maximum_scorers: int):
+ super().__init__()
+ self.day = 0
+ self.star = 0
+ self.data = day_and_star_data
+ self.maximum_scorers = maximum_scorers
+ self.original_author = original_author
+
+ def generate_output(self) -> str:
+ """
+ Generates a formatted codeblock with AoC statistics based on the currently selected day and star.
+
+ Optionally, when the requested day and star data does not exist yet it returns an error message.
+ """
+ header = AOC_DAY_AND_STAR_TEMPLATE.format(
+ rank="Rank",
+ name="Name", completion_time="Completion time (UTC)"
+ )
+ lines = [f"{header}\n{'-' * (len(header) + 2)}"]
+ if not (day_and_star_data := self.data.get(f"{self.day}-{self.star}")):
+ return ":x: The requested data for the specified day and star does not exist yet."
+ for rank, scorer in enumerate(day_and_star_data[:self.maximum_scorers]):
+ time_data = datetime.fromtimestamp(scorer['completion_time']).strftime("%I:%M:%S %p")
+ lines.append(AOC_DAY_AND_STAR_TEMPLATE.format(
+ datastamp="",
+ rank=rank + 1,
+ name=scorer['member_name'],
+ completion_time=time_data)
+ )
+ joined_lines = "\n".join(lines)
+ return f"Statistics for Day: {self.day}, Star: {self.star}.\n ```\n{joined_lines}\n```"
+
+ async def interaction_check(self, interaction: discord.Interaction) -> bool:
+ """Global check to ensure that the interacting user is the user who invoked the command originally."""
+ if interaction.user != self.original_author:
+ await interaction.response.send_message(
+ ":x: You can't interact with someone else's response. Please run the command yourself!",
+ ephemeral=True
+ )
+ return False
+ return True
+
+ @discord.ui.select(
+ placeholder="Day",
+ options=[discord.SelectOption(label=str(i)) for i in range(1, 26)],
+ custom_id="day_select"
+ )
+ async def day_select(self, select: discord.ui.Select, interaction: discord.Interaction) -> None:
+ """Dropdown to choose a Day of the AoC."""
+ self.day = select.values[0]
+
+ @discord.ui.select(
+ placeholder="Star",
+ options=[discord.SelectOption(label=str(i)) for i in range(1, 3)],
+ custom_id="star_select"
+ )
+ async def star_select(self, select: discord.ui.Select, interaction: discord.Interaction) -> None:
+ """Dropdown to choose either the first or the second star."""
+ self.star = select.values[0]
+
+ @discord.ui.button(label="Fetch", style=discord.ButtonStyle.blurple)
+ async def fetch(self, button: discord.ui.Button, interaction: discord.Interaction) -> None:
+ """Button that fetches the statistics based on the dropdown values."""
+ if self.day == 0 or self.star == 0:
+ await interaction.response.send_message(
+ "You have to select a value from both of the dropdowns!",
+ ephemeral=True
+ )
+ else:
+ await interaction.response.edit_message(content=self.generate_output())
+ self.day = 0
+ self.star = 0
diff --git a/bot/exts/events/hacktoberfest/hacktober-issue-finder.py b/bot/exts/events/hacktoberfest/hacktober-issue-finder.py
index e3053851..1774564b 100644
--- a/bot/exts/events/hacktoberfest/hacktober-issue-finder.py
+++ b/bot/exts/events/hacktoberfest/hacktober-issue-finder.py
@@ -52,10 +52,10 @@ class HacktoberIssues(commands.Cog):
async def get_issues(self, ctx: commands.Context, option: str) -> Optional[dict]:
"""Get a list of the python issues with the label 'hacktoberfest' from the Github api."""
if option == "beginner":
- if (ctx.message.created_at - self.cache_timer_beginner).seconds <= 60:
+ if (ctx.message.created_at.replace(tzinfo=None) - self.cache_timer_beginner).seconds <= 60:
log.debug("using cache")
return self.cache_beginner
- elif (ctx.message.created_at - self.cache_timer_normal).seconds <= 60:
+ elif (ctx.message.created_at.replace(tzinfo=None) - self.cache_timer_normal).seconds <= 60:
log.debug("using cache")
return self.cache_normal
@@ -88,10 +88,10 @@ class HacktoberIssues(commands.Cog):
if option == "beginner":
self.cache_beginner = data
- self.cache_timer_beginner = ctx.message.created_at
+ self.cache_timer_beginner = ctx.message.created_at.replace(tzinfo=None)
else:
self.cache_normal = data
- self.cache_timer_normal = ctx.message.created_at
+ self.cache_timer_normal = ctx.message.created_at.replace(tzinfo=None)
return data
@@ -100,7 +100,8 @@ class HacktoberIssues(commands.Cog):
"""Format the issue data into a embed."""
title = issue["title"]
issue_url = issue["url"].replace("api.", "").replace("/repos/", "/")
- body = issue["body"]
+ # issues can have empty bodies, which in that case GitHub doesn't include the key in the API response
+ body = issue.get("body", "")
labels = [label["name"] for label in issue["labels"]]
embed = discord.Embed(title=title)
diff --git a/bot/exts/events/trivianight/__init__.py b/bot/exts/events/trivianight/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/bot/exts/events/trivianight/__init__.py
diff --git a/bot/exts/events/trivianight/_game.py b/bot/exts/events/trivianight/_game.py
new file mode 100644
index 00000000..8b012a17
--- /dev/null
+++ b/bot/exts/events/trivianight/_game.py
@@ -0,0 +1,192 @@
+import time
+from random import randrange
+from string import ascii_uppercase
+from typing import Iterable, NamedTuple, Optional, TypedDict
+
+DEFAULT_QUESTION_POINTS = 10
+DEFAULT_QUESTION_TIME = 20
+
+
+class QuestionData(TypedDict):
+ """Representing the different 'keys' of the question taken from the JSON."""
+
+ number: str
+ description: str
+ answers: list[str]
+ correct: str
+ points: Optional[int]
+ time: Optional[int]
+
+
+class UserGuess(NamedTuple):
+ """Represents the user's guess for a question."""
+
+ answer: str
+ editable: bool
+ elapsed: float
+
+
+class QuestionClosed(RuntimeError):
+ """Exception raised when the question is not open for guesses anymore."""
+
+
+class AlreadyUpdated(RuntimeError):
+ """Exception raised when the user has already updated their guess once."""
+
+
+class AllQuestionsVisited(RuntimeError):
+ """Exception raised when all of the questions have been visited."""
+
+
+class Question:
+ """Interface for one question in a trivia night game."""
+
+ def __init__(self, data: QuestionData):
+ self._data = data
+ self._guesses: dict[int, UserGuess] = {}
+ self._started = None
+
+ # These properties are mostly proxies to the underlying data:
+
+ @property
+ def number(self) -> str:
+ """The number of the question."""
+ return self._data["number"]
+
+ @property
+ def description(self) -> str:
+ """The description of the question."""
+ return self._data["description"]
+
+ @property
+ def answers(self) -> list[tuple[str, str]]:
+ """
+ The possible answers for this answer.
+
+ This is a property that returns a list of letter, answer pairs.
+ """
+ return [(ascii_uppercase[i], q) for (i, q) in enumerate(self._data["answers"])]
+
+ @property
+ def correct(self) -> str:
+ """The correct answer for this question."""
+ return self._data["correct"]
+
+ @property
+ def max_points(self) -> int:
+ """The maximum points that can be awarded for this question."""
+ return self._data.get("points") or DEFAULT_QUESTION_POINTS
+
+ @property
+ def time(self) -> float:
+ """The time allowed to answer the question."""
+ return self._data.get("time") or DEFAULT_QUESTION_TIME
+
+ def start(self) -> float:
+ """Start the question and return the time it started."""
+ self._started = time.perf_counter()
+ return self._started
+
+ def _update_guess(self, user: int, answer: str) -> UserGuess:
+ """Update an already existing guess."""
+ if self._started is None:
+ raise QuestionClosed("Question is not open for answers.")
+
+ if self._guesses[user][1] is False:
+ raise AlreadyUpdated(f"User({user}) has already updated their guess once.")
+
+ self._guesses[user] = (answer, False, time.perf_counter() - self._started)
+ return self._guesses[user]
+
+ def guess(self, user: int, answer: str) -> UserGuess:
+ """Add a guess made by a user to the current question."""
+ if user in self._guesses:
+ return self._update_guess(user, answer)
+
+ if self._started is None:
+ raise QuestionClosed("Question is not open for answers.")
+
+ self._guesses[user] = (answer, True, time.perf_counter() - self._started)
+ return self._guesses[user]
+
+ def stop(self) -> dict[int, UserGuess]:
+ """Stop the question and return the guesses that were made."""
+ guesses = self._guesses
+
+ self._started = None
+ self._guesses = {}
+
+ return guesses
+
+
+class TriviaNightGame:
+ """Interface for managing a game of trivia night."""
+
+ def __init__(self, data: list[QuestionData]) -> None:
+ self._questions = [Question(q) for q in data]
+ # A copy of the questions to keep for `.trivianight list`
+ self._all_questions = list(self._questions)
+ self.current_question: Optional[Question] = None
+ self._points = {}
+ self._speed = {}
+
+ def __iter__(self) -> Iterable[Question]:
+ return iter(self._questions)
+
+ def next_question(self, number: str = None) -> Question:
+ """
+ Consume one random question from the trivia night game.
+
+ One question is randomly picked from the list of questions which is then removed and returned.
+ """
+ if self.current_question is not None:
+ raise RuntimeError("Cannot call next_question() when there is a current question.")
+
+ if number is not None:
+ try:
+ question = [q for q in self._all_questions if q.number == int(number)][0]
+ except IndexError:
+ raise ValueError(f"Question number {number} does not exist.")
+ elif len(self._questions) == 0:
+ raise AllQuestionsVisited("All of the questions have been visited.")
+ else:
+ question = self._questions.pop(randrange(len(self._questions)))
+
+ self.current_question = question
+ return question
+
+ def end_question(self) -> None:
+ """
+ End the current question.
+
+ This method should be called when the question has been answered, it must be called before
+ attempting to call `next_question()` again.
+ """
+ if self.current_question is None:
+ raise RuntimeError("Cannot call end_question() when there is no current question.")
+
+ self.current_question.stop()
+ self.current_question = None
+
+ def list_questions(self) -> str:
+ """
+ List all the questions.
+
+ This method should be called when `.trivianight list` is called to display the following information:
+ - Question number
+ - Question description
+ - Visited/not visited
+ """
+ question_list = []
+
+ visited = ":white_check_mark:"
+ not_visited = ":x:"
+
+ for question in self._all_questions:
+ formatted_string = (
+ f"**Q{question.number}** {not_visited if question in self._questions else visited}"
+ f"\n{question.description}\n\n"
+ )
+ question_list.append(formatted_string.rstrip())
+
+ return question_list
diff --git a/bot/exts/events/trivianight/_questions.py b/bot/exts/events/trivianight/_questions.py
new file mode 100644
index 00000000..d6beced9
--- /dev/null
+++ b/bot/exts/events/trivianight/_questions.py
@@ -0,0 +1,179 @@
+from random import choice
+from string import ascii_uppercase
+
+import discord
+from discord import Embed, Interaction
+from discord.ui import Button, View
+
+from bot.constants import Colours, NEGATIVE_REPLIES
+
+from ._game import AlreadyUpdated, Question, QuestionClosed
+from ._scoreboard import Scoreboard
+
+
+class AnswerButton(Button):
+ """Button subclass that's used to guess on a particular answer."""
+
+ def __init__(self, label: str, question: Question):
+ super().__init__(label=label, style=discord.ButtonStyle.green)
+
+ self.question = question
+
+ async def callback(self, interaction: Interaction) -> None:
+ """
+ When a user interacts with the button, this will be called.
+
+ Parameters:
+ - interaction: an instance of discord.Interaction representing the interaction between the user and the
+ button.
+ """
+ try:
+ guess = self.question.guess(interaction.user.id, self.label)
+ except AlreadyUpdated:
+ await interaction.response.send_message(
+ embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="You've already changed your answer more than once!",
+ color=Colours.soft_red
+ ),
+ ephemeral=True
+ )
+ return
+ except QuestionClosed:
+ await interaction.response.send_message(
+ embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="The question is no longer accepting guesses!",
+ color=Colours.soft_red
+ ),
+ ephemeral=True
+ )
+ return
+
+ if guess[1]:
+ await interaction.response.send_message(
+ embed=Embed(
+ title="Confirming that...",
+ description=f"You chose answer {self.label}.",
+ color=Colours.soft_green
+ ),
+ ephemeral=True
+ )
+ else:
+ # guess[1] is False and they cannot change their answer again. Which
+ # indicates that they changed it this time around.
+ await interaction.response.send_message(
+ embed=Embed(
+ title="Confirming that...",
+ description=f"You changed your answer to answer {self.label}.",
+ color=Colours.soft_green
+ ),
+ ephemeral=True
+ )
+
+
+class QuestionView(View):
+ """View for one trivia night question."""
+
+ def __init__(self, question: Question) -> None:
+ super().__init__()
+ self.question = question
+
+ for letter, _ in self.question.answers:
+ self.add_item(AnswerButton(letter, self.question))
+
+ @staticmethod
+ def unicodeify(text: str) -> str:
+ """
+ Takes `text` and adds zero-width spaces to prevent copy and pasting the question.
+
+ Parameters:
+ - text: A string that represents the question description to 'unicodeify'
+ """
+ return "".join(
+ f"{letter}\u200b" if letter not in ('\n', '\t', '`', 'p', 'y') else letter
+ for idx, letter in enumerate(text)
+ )
+
+ def create_embed(self) -> Embed:
+ """Helper function to create the embed for the current question."""
+ question_embed = Embed(
+ title=f"Question {self.question.number}",
+ description=self.unicodeify(self.question.description),
+ color=Colours.python_yellow
+ )
+
+ for label, answer in self.question.answers:
+ question_embed.add_field(name=f"Answer {label}", value=answer, inline=False)
+
+ return question_embed
+
+ def end_question(self, scoreboard: Scoreboard) -> Embed:
+ """
+ Ends the question and displays the statistics on who got the question correct, awards points, etc.
+
+ Returns:
+ An embed displaying the correct answers and the % of people that chose each answer.
+ """
+ guesses = self.question.stop()
+
+ labels = ascii_uppercase[:len(self.question.answers)]
+
+ answer_embed = Embed(
+ title=f"The correct answer for Question {self.question.number} was...",
+ description=self.question.correct
+ )
+
+ if len(guesses) != 0:
+ answers_chosen = {
+ answer_choice: len(
+ tuple(filter(lambda x: x[0] == answer_choice, guesses.values()))
+ )
+ for answer_choice in labels
+ }
+
+ answers_chosen = dict(
+ sorted(list(answers_chosen.items()), key=lambda item: item[1], reverse=True)
+ )
+
+ for answer, people_answered in answers_chosen.items():
+ is_correct_answer = dict(self.question.answers)[answer[0]] == self.question.correct
+
+ # Setting the color of answer_embed to the % of people that got it correct via the mapping
+ if is_correct_answer:
+ # Maps the % of people who got it right to a color, from a range of red to green
+ percentage_to_color = [0xFC94A1, 0xFFCCCB, 0xCDFFCC, 0xB0F5AB, 0xB0F5AB]
+ answer_embed.color = percentage_to_color[round(people_answered / len(guesses) * 100) // 25]
+
+ field_title = (
+ (":white_check_mark: " if is_correct_answer else "")
+ + f"{people_answered} players ({people_answered / len(guesses) * 100:.1f}%) chose"
+ )
+
+ # The `ord` function is used here to change the letter to its corresponding position
+ answer_embed.add_field(
+ name=field_title,
+ value=self.question.answers[ord(answer) - 65][1],
+ inline=False
+ )
+
+ # Assign points to users
+ for user_id, answer in guesses.items():
+ if dict(self.question.answers)[answer[0]] == self.question.correct:
+ scoreboard.assign_points(
+ int(user_id),
+ points=(1 - (answer[-1] / self.question.time) / 2) * self.question.max_points,
+ speed=answer[-1]
+ )
+ elif answer[-1] <= 2:
+ scoreboard.assign_points(
+ int(user_id),
+ points=-(1 - (answer[-1] / self.question.time) / 2) * self.question.max_points
+ )
+ else:
+ scoreboard.assign_points(
+ int(user_id),
+ points=0
+ )
+
+ return answer_embed
diff --git a/bot/exts/events/trivianight/_scoreboard.py b/bot/exts/events/trivianight/_scoreboard.py
new file mode 100644
index 00000000..a5a5fcac
--- /dev/null
+++ b/bot/exts/events/trivianight/_scoreboard.py
@@ -0,0 +1,186 @@
+from random import choice
+
+import discord.ui
+from discord import ButtonStyle, Embed, Interaction, Member
+from discord.ui import Button, View
+
+from bot.bot import Bot
+from bot.constants import Colours, NEGATIVE_REPLIES
+
+
+class ScoreboardView(View):
+ """View for the scoreboard."""
+
+ def __init__(self, bot: Bot):
+ super().__init__()
+ self.bot = bot
+
+ @staticmethod
+ def _int_to_ordinal(number: int) -> str:
+ """
+ Converts an integer into an ordinal number, i.e. 1 to 1st.
+
+ Parameters:
+ - number: an integer representing the number to convert to an ordinal number.
+ """
+ suffix = ["th", "st", "nd", "rd", "th"][min(number % 10, 4)]
+ if (number % 100) in {11, 12, 13}:
+ suffix = "th"
+
+ return str(number) + suffix
+
+ async def create_main_leaderboard(self) -> Embed:
+ """
+ Helper function that iterates through `self.points` to generate the main leaderboard embed.
+
+ The main leaderboard would be formatted like the following:
+ **1**. @mention of the user (# of points)
+ along with the 29 other users who made it onto the leaderboard.
+ """
+ formatted_string = ""
+
+ for current_placement, (user, points) in enumerate(self.points.items()):
+ if current_placement + 1 > 30:
+ break
+
+ user = await self.bot.fetch_user(int(user))
+ formatted_string += f"**{current_placement + 1}.** {user.mention} "
+ formatted_string += f"({points:.1f} pts)\n"
+ if (current_placement + 1) % 10 == 0:
+ formatted_string += "⎯⎯⎯⎯⎯⎯⎯⎯\n"
+
+ main_embed = Embed(
+ title="Winners of the Trivia Night",
+ description=formatted_string,
+ color=Colours.python_blue,
+ )
+
+ return main_embed
+
+ async def _create_speed_embed(self) -> Embed:
+ """
+ Helper function that iterates through `self.speed` to generate a leaderboard embed.
+
+ The speed leaderboard would be formatted like the following:
+ **1**. @mention of the user ([average speed as a float with the precision of one decimal point]s)
+ along with the 29 other users who made it onto the leaderboard.
+ """
+ formatted_string = ""
+
+ for current_placement, (user, time_taken) in enumerate(self.speed.items()):
+ if current_placement + 1 > 30:
+ break
+
+ user = await self.bot.fetch_user(int(user))
+ formatted_string += f"**{current_placement + 1}.** {user.mention} "
+ formatted_string += f"({(time_taken[-1] / time_taken[0]):.1f}s)\n"
+ if (current_placement + 1) % 10 == 0:
+ formatted_string += "⎯⎯⎯⎯⎯⎯⎯⎯\n"
+
+ speed_embed = Embed(
+ title="Average time taken to answer a question",
+ description=formatted_string,
+ color=Colours.python_blue
+ )
+ return speed_embed
+
+ def _get_rank(self, member: Member) -> Embed:
+ """
+ Gets the member's rank for the points leaderboard and speed leaderboard.
+
+ Parameters:
+ - member: An instance of discord.Member representing the person who is trying to get their rank.
+ """
+ rank_embed = Embed(title=f"Ranks for {member.display_name}", color=Colours.python_blue)
+ # These are stored as strings so that the last digit can be determined to choose the suffix
+ try:
+ points_rank = str(list(self.points).index(member.id) + 1)
+ speed_rank = str(list(self.speed).index(member.id) + 1)
+ except ValueError:
+ return Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="It looks like you didn't participate in the Trivia Night event!",
+ color=Colours.soft_red
+ )
+
+ rank_embed.add_field(
+ name="Total Points",
+ value=(
+ f"You got {self._int_to_ordinal(int(points_rank))} place"
+ f" with {self.points[member.id]:.1f} points."
+ ),
+ inline=False
+ )
+
+ rank_embed.add_field(
+ name="Average Speed",
+ value=(
+ f"You got {self._int_to_ordinal(int(speed_rank))} place"
+ f" with a time of {(self.speed[member.id][1] / self.speed[member.id][0]):.1f} seconds."
+ ),
+ inline=False
+ )
+ return rank_embed
+
+ @discord.ui.button(label="Scoreboard for Speed", style=ButtonStyle.green)
+ async def speed_leaderboard(self, button: Button, interaction: Interaction) -> None:
+ """
+ Send an ephemeral message with the speed leaderboard embed.
+
+ Parameters:
+ - button: The discord.ui.Button instance representing the `Speed Leaderboard` button.
+ - interaction: The discord.Interaction instance containing information on the interaction between the user
+ and the button.
+ """
+ await interaction.response.send_message(embed=await self._create_speed_embed(), ephemeral=True)
+
+ @discord.ui.button(label="What's my rank?", style=ButtonStyle.blurple)
+ async def rank_button(self, button: Button, interaction: Interaction) -> None:
+ """
+ Send an ephemeral message with the user's rank for the overall points/average speed.
+
+ Parameters:
+ - button: The discord.ui.Button instance representing the `What's my rank?` button.
+ - interaction: The discord.Interaction instance containing information on the interaction between the user
+ and the button.
+ """
+ await interaction.response.send_message(embed=self._get_rank(interaction.user), ephemeral=True)
+
+
+class Scoreboard:
+ """Class for the scoreboard for the Trivia Night event."""
+
+ def __init__(self, bot: Bot):
+ self._bot = bot
+ self._points = {}
+ self._speed = {}
+
+ def assign_points(self, user_id: int, *, points: int = None, speed: float = None) -> None:
+ """
+ Assign points or deduct points to/from a certain user.
+
+ This method should be called once the question has finished and all answers have been registered.
+ """
+ if points is not None and user_id not in self._points.keys():
+ self._points[user_id] = points
+ elif points is not None:
+ self._points[user_id] += points
+
+ if speed is not None and user_id not in self._speed.keys():
+ self._speed[user_id] = [1, speed]
+ elif speed is not None:
+ self._speed[user_id] = [
+ self._speed[user_id][0] + 1, self._speed[user_id][1] + speed
+ ]
+
+ async def display(self, speed_leaderboard: bool = False) -> tuple[Embed, View]:
+ """Returns the embed of the main leaderboard along with the ScoreboardView."""
+ view = ScoreboardView(self._bot)
+
+ view.points = dict(sorted(self._points.items(), key=lambda item: item[-1], reverse=True))
+ view.speed = dict(sorted(self._speed.items(), key=lambda item: item[-1][1] / item[-1][0]))
+
+ return (
+ await view.create_main_leaderboard(),
+ view if not speed_leaderboard else await view._create_speed_embed()
+ )
diff --git a/bot/exts/events/trivianight/trivianight.py b/bot/exts/events/trivianight/trivianight.py
new file mode 100644
index 00000000..18d8327a
--- /dev/null
+++ b/bot/exts/events/trivianight/trivianight.py
@@ -0,0 +1,328 @@
+import asyncio
+from json import JSONDecodeError, loads
+from random import choice
+from typing import Optional
+
+from discord import Embed
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.constants import Colours, NEGATIVE_REPLIES, POSITIVE_REPLIES, Roles
+from bot.utils.pagination import LinePaginator
+
+from ._game import AllQuestionsVisited, TriviaNightGame
+from ._questions import QuestionView
+from ._scoreboard import Scoreboard
+
+# The ID you see below are the Events Lead role ID and the Event Runner Role ID
+TRIVIA_NIGHT_ROLES = (Roles.admins, 778361735739998228, 940911658799333408)
+
+
+class TriviaNightCog(commands.Cog):
+ """Cog for the Python Trivia Night event."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+ self.game: Optional[TriviaNightGame] = None
+ self.scoreboard: Optional[Scoreboard] = None
+ self.question_closed: asyncio.Event = None
+
+ @commands.group(aliases=["tn"], invoke_without_command=True)
+ async def trivianight(self, ctx: commands.Context) -> None:
+ """
+ The command group for the Python Discord Trivia Night.
+
+ If invoked without a subcommand (i.e. simply .trivianight), it will explain what the Trivia Night event is.
+ """
+ cog_description = Embed(
+ title="What is .trivianight?",
+ description=(
+ "This 'cog' is for the Python Discord's TriviaNight (date tentative)! Compete against other"
+ " players in a trivia about Python!"
+ ),
+ color=Colours.soft_green
+ )
+ await ctx.send(embed=cog_description)
+
+ @trivianight.command()
+ @commands.has_any_role(*TRIVIA_NIGHT_ROLES)
+ async def load(self, ctx: commands.Context, *, to_load: Optional[str]) -> None:
+ """
+ Loads a JSON file from the provided attachment or argument.
+
+ The JSON provided is formatted where it is a list of dictionaries, each dictionary containing the keys below:
+ - number: int (represents the current question #)
+ - description: str (represents the question itself)
+ - answers: list[str] (represents the different answers possible, must be a length of 4)
+ - correct: str (represents the correct answer in terms of what the correct answer is in `answers`
+ - time: Optional[int] (represents the timer for the question and how long it should run, default is 10)
+ - points: Optional[int] (represents how many points are awarded for each question, default is 10)
+
+ The load command accepts three different ways of loading in a JSON:
+ - an attachment of the JSON file
+ - a message link to the attachment/JSON
+ - reading the JSON itself via a codeblock or plain text
+ """
+ if self.game is not None:
+ await ctx.send(embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is already a trivia night running!",
+ color=Colours.soft_red
+ ))
+ return
+
+ if ctx.message.attachments:
+ json_text = (await ctx.message.attachments[0].read()).decode("utf8")
+ elif not to_load:
+ raise commands.BadArgument("You didn't attach an attachment nor link a message!")
+ elif (
+ to_load.startswith("https://discord.com/channels")
+ or to_load.startswith("https://discordapp.com/channels")
+ ):
+ channel_id, message_id = to_load.split("/")[-2:]
+ channel = await ctx.guild.fetch_channel(int(channel_id))
+ message = await channel.fetch_message(int(message_id))
+ if message.attachments:
+ json_text = (await message.attachments[0].read()).decode("utf8")
+ else:
+ json_text = message.content.replace("```", "").replace("json", "").replace("\n", "")
+ else:
+ json_text = to_load.replace("```", "").replace("json", "").replace("\n", "")
+
+ try:
+ serialized_json = loads(json_text)
+ except JSONDecodeError as error:
+ raise commands.BadArgument(f"Looks like something went wrong:\n{str(error)}")
+
+ self.game = TriviaNightGame(serialized_json)
+ self.question_closed = asyncio.Event()
+
+ success_embed = Embed(
+ title=choice(POSITIVE_REPLIES),
+ description="The JSON was loaded successfully!",
+ color=Colours.soft_green
+ )
+
+ self.scoreboard = Scoreboard(self.bot)
+
+ await ctx.send(embed=success_embed)
+
+ @trivianight.command(aliases=('next',))
+ @commands.has_any_role(*TRIVIA_NIGHT_ROLES)
+ async def question(self, ctx: commands.Context, question_number: str = None) -> None:
+ """
+ Gets a random question from the unanswered question list and lets the user(s) choose the answer.
+
+ This command will continuously count down until the time limit of the question is exhausted.
+ However, if `.trivianight stop` is invoked, the counting down is interrupted to show the final results.
+ """
+ if self.game is None:
+ await ctx.send(embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is no trivia night running!",
+ color=Colours.soft_red
+ ))
+ return
+
+ if self.game.current_question is not None:
+ error_embed = Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is already an ongoing question!",
+ color=Colours.soft_red
+ )
+ await ctx.send(embed=error_embed)
+ return
+
+ try:
+ next_question = self.game.next_question(question_number)
+ except AllQuestionsVisited:
+ error_embed = Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="All of the questions have been used.",
+ color=Colours.soft_red
+ )
+ await ctx.send(embed=error_embed)
+ return
+
+ await ctx.send("Next question in 3 seconds! Get ready...")
+ await asyncio.sleep(3)
+
+ question_view = QuestionView(next_question)
+ question_embed = question_view.create_embed()
+
+ next_question.start()
+ message = await ctx.send(embed=question_embed, view=question_view)
+
+ # Exponentially sleep less and less until the time limit is reached
+ percentage = 1
+ while True:
+ percentage *= 0.5
+ duration = next_question.time * percentage
+
+ await asyncio.wait([self.question_closed.wait()], timeout=duration)
+
+ if self.question_closed.is_set():
+ await ctx.send(embed=question_view.end_question(self.scoreboard))
+ await message.edit(embed=question_embed, view=None)
+
+ self.game.end_question()
+ self.question_closed.clear()
+ return
+
+ if int(duration) > 1:
+ # It is quite ugly to display decimals, the delay for requests to reach Discord
+ # cause sub-second accuracy to be quite pointless.
+ await ctx.send(f"{int(duration)}s remaining...")
+ else:
+ # Since each time we divide the percentage by 2 and sleep one half of the halves (then sleep a
+ # half, of that half) we must sleep both halves at the end.
+ await asyncio.wait([self.question_closed.wait()], timeout=duration)
+ if self.question_closed.is_set():
+ await ctx.send(embed=question_view.end_question(self.scoreboard))
+ await message.edit(embed=question_embed, view=None)
+
+ self.game.end_question()
+ self.question_closed.clear()
+ return
+ break
+
+ await ctx.send(embed=question_view.end_question(self.scoreboard))
+ await message.edit(embed=question_embed, view=None)
+
+ self.game.end_question()
+
+ @trivianight.command()
+ @commands.has_any_role(*TRIVIA_NIGHT_ROLES)
+ async def list(self, ctx: commands.Context) -> None:
+ """
+ Display all the questions left in the question bank.
+
+ Questions are displayed in the following format:
+ Q(number): Question description | :white_check_mark: if the question was used otherwise :x:.
+ """
+ if self.game is None:
+ await ctx.send(embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is no trivia night running!",
+ color=Colours.soft_red
+ ))
+ return
+
+ question_list = self.game.list_questions()
+
+ list_embed = Embed(title="All Trivia Night Questions")
+
+ if len(question_list) == 1:
+ list_embed.description = question_list[0]
+ await ctx.send(embed=list_embed)
+ else:
+ await LinePaginator.paginate(
+ question_list,
+ ctx,
+ list_embed
+ )
+
+ @trivianight.command()
+ @commands.has_any_role(*TRIVIA_NIGHT_ROLES)
+ async def stop(self, ctx: commands.Context) -> None:
+ """
+ End the ongoing question to show the correct question.
+
+ This command should be used if the question should be ended early or if the time limit fails
+ """
+ if self.game is None:
+ await ctx.send(embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is no trivia night running!",
+ color=Colours.soft_red
+ ))
+ return
+
+ if self.game.current_question is None:
+ error_embed = Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is no ongoing question!",
+ color=Colours.soft_red
+ )
+ await ctx.send(embed=error_embed)
+ return
+
+ self.question_closed.set()
+
+ @trivianight.command()
+ @commands.has_any_role(*TRIVIA_NIGHT_ROLES)
+ async def end(self, ctx: commands.Context) -> None:
+ """
+ Displays the scoreboard view.
+
+ The scoreboard view consists of the two scoreboards with the 30 players who got the highest points and the
+ 30 players who had the fastest average response time to a question where they got the question right.
+
+ The scoreboard view also has a button where the user can see their own rank, points and average speed if they
+ didn't make it onto the leaderboard.
+ """
+ if self.game is None:
+ await ctx.send(embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is no trivia night running!",
+ color=Colours.soft_red
+ ))
+ return
+
+ if self.game.current_question is not None:
+ error_embed = Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="You can't end the event while a question is ongoing!",
+ color=Colours.soft_red
+ )
+ await ctx.send(embed=error_embed)
+ return
+
+ scoreboard_embed, scoreboard_view = await self.scoreboard.display()
+ await ctx.send(embed=scoreboard_embed, view=scoreboard_view)
+
+ @trivianight.command()
+ @commands.has_any_role(*TRIVIA_NIGHT_ROLES)
+ async def scoreboard(self, ctx: commands.Context) -> None:
+ """
+ Displays the scoreboard.
+
+ The scoreboard consists of the two scoreboards with the 30 players who got the highest points and the
+ 30 players who had the fastest average response time to a question where they got the question right.
+ """
+ if self.game is None:
+ await ctx.send(embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is no trivia night running!",
+ color=Colours.soft_red
+ ))
+ return
+
+ if self.game.current_question is not None:
+ error_embed = Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="You can't end the event while a question is ongoing!",
+ color=Colours.soft_red
+ )
+ await ctx.send(embed=error_embed)
+ return
+
+ scoreboard_embed, speed_scoreboard = await self.scoreboard.display(speed_leaderboard=True)
+ await ctx.send(embeds=(scoreboard_embed, speed_scoreboard))
+
+ @trivianight.command()
+ @commands.has_any_role(*TRIVIA_NIGHT_ROLES)
+ async def end_game(self, ctx: commands.Context) -> None:
+ """Ends the ongoing game."""
+ self.game = None
+
+ await ctx.send(embed=Embed(
+ title=choice(POSITIVE_REPLIES),
+ description="The game has been stopped.",
+ color=Colours.soft_green
+ ))
+
+
+def setup(bot: Bot) -> None:
+ """Load the TriviaNight cog."""
+ bot.add_cog(TriviaNightCog(bot))