aboutsummaryrefslogtreecommitdiffstats
path: root/bot/exts
diff options
context:
space:
mode:
authorGravatar Izan <[email protected]>2022-02-16 23:52:55 +0000
committerGravatar Izan <[email protected]>2022-02-16 23:52:55 +0000
commita921f487833de7cedb690bfa2468eb98d2392596 (patch)
treeed2a283206b122f31b0ff6e8166a6852bc6e88cd /bot/exts
parentMerge remote-tracking branch 'origin/main' into main (diff)
parentAdd topics for `#programming-pedagogy` channel (diff)
Merge remote-tracking branch 'origin/main' into main
Diffstat (limited to 'bot/exts')
-rw-r--r--bot/exts/events/advent_of_code/_cog.py59
-rw-r--r--bot/exts/events/advent_of_code/_helpers.py15
-rw-r--r--bot/exts/events/advent_of_code/views/dayandstarview.py8
-rw-r--r--bot/exts/events/trivianight/__init__.py0
-rw-r--r--bot/exts/events/trivianight/_game.py192
-rw-r--r--bot/exts/events/trivianight/_questions.py179
-rw-r--r--bot/exts/events/trivianight/_scoreboard.py186
-rw-r--r--bot/exts/events/trivianight/trivianight.py328
-rw-r--r--bot/exts/fun/madlibs.py148
-rw-r--r--bot/exts/holidays/valentines/lovecalculator.py6
-rw-r--r--bot/exts/utilities/epoch.py138
-rw-r--r--bot/exts/utilities/githubinfo.py216
-rw-r--r--bot/exts/utilities/issues.py277
13 files changed, 1409 insertions, 343 deletions
diff --git a/bot/exts/events/advent_of_code/_cog.py b/bot/exts/events/advent_of_code/_cog.py
index a9625153..518841d4 100644
--- a/bot/exts/events/advent_of_code/_cog.py
+++ b/bot/exts/events/advent_of_code/_cog.py
@@ -11,12 +11,13 @@ from discord.ext import commands, tasks
from bot.bot import Bot
from bot.constants import (
- AdventOfCode as AocConfig, Channels, Client, Colours, Emojis, Month, Roles, WHITELISTED_CHANNELS
+ AdventOfCode as AocConfig, Channels, Client, Colours, Emojis, Month, PYTHON_PREFIX, Roles, WHITELISTED_CHANNELS
)
from bot.exts.events.advent_of_code import _helpers
from bot.exts.events.advent_of_code.views.dayandstarview import AoCDropdownView
from bot.utils import members
from bot.utils.decorators import InChannelCheckFailure, in_month, whitelist_override, with_role
+from bot.utils.exceptions import MovedCommandError
from bot.utils.extensions import invoke_help_command
log = logging.getLogger(__name__)
@@ -60,7 +61,8 @@ class AdventOfCode(commands.Cog):
self.status_task.set_name("AoC Status Countdown")
self.status_task.add_done_callback(_helpers.background_task_callback)
- self.completionist_task.start()
+ # Don't start task while event isn't running
+ # self.completionist_task.start()
@tasks.loop(minutes=10.0)
async def completionist_task(self) -> None:
@@ -95,7 +97,9 @@ class AdventOfCode(commands.Cog):
# Only give the role to people who have completed all 50 stars
continue
- member_id = aoc_name_to_member_id.get(member_aoc_info["name"], None)
+ aoc_name = member_aoc_info["name"] or f"Anonymous #{member_aoc_info['id']}"
+
+ member_id = aoc_name_to_member_id.get(aoc_name)
if not member_id:
log.debug(f"Could not find member_id for {member_aoc_info['name']}, not giving role.")
continue
@@ -137,45 +141,17 @@ class AdventOfCode(commands.Cog):
@commands.guild_only()
@adventofcode_group.command(
name="subscribe",
- aliases=("sub", "notifications", "notify", "notifs"),
- brief="Notifications for new days"
+ aliases=("sub", "notifications", "notify", "notifs", "unsubscribe", "unsub"),
+ help=f"NOTE: This command has been moved to {PYTHON_PREFIX}subscribe",
)
@whitelist_override(channels=AOC_WHITELIST)
async def aoc_subscribe(self, ctx: commands.Context) -> None:
- """Assign the role for notifications about new days being ready."""
- current_year = datetime.now().year
- if current_year != AocConfig.year:
- await ctx.send(f"You can't subscribe to {current_year}'s Advent of Code announcements yet!")
- return
-
- role = ctx.guild.get_role(AocConfig.role_id)
- unsubscribe_command = f"{ctx.prefix}{ctx.command.root_parent} unsubscribe"
-
- if role not in ctx.author.roles:
- await ctx.author.add_roles(role)
- await ctx.send(
- "Okay! You have been __subscribed__ to notifications about new Advent of Code tasks. "
- f"You can run `{unsubscribe_command}` to disable them again for you."
- )
- else:
- await ctx.send(
- "Hey, you already are receiving notifications about new Advent of Code tasks. "
- f"If you don't want them any more, run `{unsubscribe_command}` instead."
- )
-
- @in_month(Month.DECEMBER)
- @commands.guild_only()
- @adventofcode_group.command(name="unsubscribe", aliases=("unsub",), brief="Notifications for new days")
- @whitelist_override(channels=AOC_WHITELIST)
- async def aoc_unsubscribe(self, ctx: commands.Context) -> None:
- """Remove the role for notifications about new days being ready."""
- role = ctx.guild.get_role(AocConfig.role_id)
+ """
+ Deprecated role command.
- if role in ctx.author.roles:
- await ctx.author.remove_roles(role)
- await ctx.send("Okay! You have been __unsubscribed__ from notifications about new Advent of Code tasks.")
- else:
- await ctx.send("Hey, you don't even get any notifications about new Advent of Code tasks currently anyway.")
+ This command has been moved to bot, and will be removed in the future.
+ """
+ raise MovedCommandError(f"{PYTHON_PREFIX}subscribe")
@adventofcode_group.command(name="countdown", aliases=("count", "c"), brief="Return time left until next day")
@whitelist_override(channels=AOC_WHITELIST)
@@ -214,9 +190,10 @@ class AdventOfCode(commands.Cog):
async def join_leaderboard(self, ctx: commands.Context) -> None:
"""DM the user the information for joining the Python Discord leaderboard."""
current_date = datetime.now()
- if (
- current_date.month not in (Month.NOVEMBER, Month.DECEMBER) and current_date.year != AocConfig.year or
- current_date.month != Month.JANUARY and current_date.year != AocConfig.year + 1
+ allowed_months = (Month.NOVEMBER.value, Month.DECEMBER.value)
+ if not (
+ current_date.month in allowed_months and current_date.year == AocConfig.year or
+ current_date.month == Month.JANUARY.value and current_date.year == AocConfig.year + 1
):
# Only allow joining the leaderboard in the run up to AOC and the January following.
await ctx.send(f"The Python Discord leaderboard for {current_date.year} is not yet available!")
diff --git a/bot/exts/events/advent_of_code/_helpers.py b/bot/exts/events/advent_of_code/_helpers.py
index 807cc275..6c004901 100644
--- a/bot/exts/events/advent_of_code/_helpers.py
+++ b/bot/exts/events/advent_of_code/_helpers.py
@@ -255,7 +255,7 @@ async def _fetch_leaderboard_data() -> dict[str, Any]:
# Two attempts, one with the original session cookie and one with the fallback session
for attempt in range(1, 3):
- log.info(f"Attempting to fetch leaderboard `{leaderboard.id}` ({attempt}/2)")
+ log.debug(f"Attempting to fetch leaderboard `{leaderboard.id}` ({attempt}/2)")
cookies = {"session": leaderboard.session}
try:
raw_data = await _leaderboard_request(leaderboard_url, leaderboard.id, cookies)
@@ -332,7 +332,7 @@ async def fetch_leaderboard(invalidate_cache: bool = False, self_placement_name:
number_of_participants = len(leaderboard)
formatted_leaderboard = _format_leaderboard(leaderboard)
full_leaderboard_url = await _upload_leaderboard(formatted_leaderboard)
- leaderboard_fetched_at = datetime.datetime.utcnow().isoformat()
+ leaderboard_fetched_at = datetime.datetime.now(datetime.timezone.utc).isoformat()
cached_leaderboard = {
"placement_leaderboard": json.dumps(raw_leaderboard_data),
@@ -368,11 +368,13 @@ def get_summary_embed(leaderboard: dict) -> discord.Embed:
"""Get an embed with the current summary stats of the leaderboard."""
leaderboard_url = leaderboard["full_leaderboard_url"]
refresh_minutes = AdventOfCode.leaderboard_cache_expiry_seconds // 60
+ refreshed_unix = int(datetime.datetime.fromisoformat(leaderboard["leaderboard_fetched_at"]).timestamp())
- aoc_embed = discord.Embed(
- colour=Colours.soft_green,
- timestamp=datetime.datetime.fromisoformat(leaderboard["leaderboard_fetched_at"]),
- description=f"*The leaderboard is refreshed every {refresh_minutes} minutes.*"
+ aoc_embed = discord.Embed(colour=Colours.soft_green)
+
+ aoc_embed.description = (
+ f"The leaderboard is refreshed every {refresh_minutes} minutes.\n"
+ f"Last Updated: <t:{refreshed_unix}:t>"
)
aoc_embed.add_field(
name="Number of Participants",
@@ -386,7 +388,6 @@ def get_summary_embed(leaderboard: dict) -> discord.Embed:
inline=True,
)
aoc_embed.set_author(name="Advent of Code", url=leaderboard_url)
- aoc_embed.set_footer(text="Last Updated")
aoc_embed.set_thumbnail(url=AOC_EMBED_THUMBNAIL)
return aoc_embed
diff --git a/bot/exts/events/advent_of_code/views/dayandstarview.py b/bot/exts/events/advent_of_code/views/dayandstarview.py
index a0bfa316..5529c12b 100644
--- a/bot/exts/events/advent_of_code/views/dayandstarview.py
+++ b/bot/exts/events/advent_of_code/views/dayandstarview.py
@@ -42,7 +42,13 @@ class AoCDropdownView(discord.ui.View):
async def interaction_check(self, interaction: discord.Interaction) -> bool:
"""Global check to ensure that the interacting user is the user who invoked the command originally."""
- return interaction.user == self.original_author
+ if interaction.user != self.original_author:
+ await interaction.response.send_message(
+ ":x: You can't interact with someone else's response. Please run the command yourself!",
+ ephemeral=True
+ )
+ return False
+ return True
@discord.ui.select(
placeholder="Day",
diff --git a/bot/exts/events/trivianight/__init__.py b/bot/exts/events/trivianight/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/bot/exts/events/trivianight/__init__.py
diff --git a/bot/exts/events/trivianight/_game.py b/bot/exts/events/trivianight/_game.py
new file mode 100644
index 00000000..8b012a17
--- /dev/null
+++ b/bot/exts/events/trivianight/_game.py
@@ -0,0 +1,192 @@
+import time
+from random import randrange
+from string import ascii_uppercase
+from typing import Iterable, NamedTuple, Optional, TypedDict
+
+DEFAULT_QUESTION_POINTS = 10
+DEFAULT_QUESTION_TIME = 20
+
+
+class QuestionData(TypedDict):
+ """Representing the different 'keys' of the question taken from the JSON."""
+
+ number: str
+ description: str
+ answers: list[str]
+ correct: str
+ points: Optional[int]
+ time: Optional[int]
+
+
+class UserGuess(NamedTuple):
+ """Represents the user's guess for a question."""
+
+ answer: str
+ editable: bool
+ elapsed: float
+
+
+class QuestionClosed(RuntimeError):
+ """Exception raised when the question is not open for guesses anymore."""
+
+
+class AlreadyUpdated(RuntimeError):
+ """Exception raised when the user has already updated their guess once."""
+
+
+class AllQuestionsVisited(RuntimeError):
+ """Exception raised when all of the questions have been visited."""
+
+
+class Question:
+ """Interface for one question in a trivia night game."""
+
+ def __init__(self, data: QuestionData):
+ self._data = data
+ self._guesses: dict[int, UserGuess] = {}
+ self._started = None
+
+ # These properties are mostly proxies to the underlying data:
+
+ @property
+ def number(self) -> str:
+ """The number of the question."""
+ return self._data["number"]
+
+ @property
+ def description(self) -> str:
+ """The description of the question."""
+ return self._data["description"]
+
+ @property
+ def answers(self) -> list[tuple[str, str]]:
+ """
+ The possible answers for this answer.
+
+ This is a property that returns a list of letter, answer pairs.
+ """
+ return [(ascii_uppercase[i], q) for (i, q) in enumerate(self._data["answers"])]
+
+ @property
+ def correct(self) -> str:
+ """The correct answer for this question."""
+ return self._data["correct"]
+
+ @property
+ def max_points(self) -> int:
+ """The maximum points that can be awarded for this question."""
+ return self._data.get("points") or DEFAULT_QUESTION_POINTS
+
+ @property
+ def time(self) -> float:
+ """The time allowed to answer the question."""
+ return self._data.get("time") or DEFAULT_QUESTION_TIME
+
+ def start(self) -> float:
+ """Start the question and return the time it started."""
+ self._started = time.perf_counter()
+ return self._started
+
+ def _update_guess(self, user: int, answer: str) -> UserGuess:
+ """Update an already existing guess."""
+ if self._started is None:
+ raise QuestionClosed("Question is not open for answers.")
+
+ if self._guesses[user][1] is False:
+ raise AlreadyUpdated(f"User({user}) has already updated their guess once.")
+
+ self._guesses[user] = (answer, False, time.perf_counter() - self._started)
+ return self._guesses[user]
+
+ def guess(self, user: int, answer: str) -> UserGuess:
+ """Add a guess made by a user to the current question."""
+ if user in self._guesses:
+ return self._update_guess(user, answer)
+
+ if self._started is None:
+ raise QuestionClosed("Question is not open for answers.")
+
+ self._guesses[user] = (answer, True, time.perf_counter() - self._started)
+ return self._guesses[user]
+
+ def stop(self) -> dict[int, UserGuess]:
+ """Stop the question and return the guesses that were made."""
+ guesses = self._guesses
+
+ self._started = None
+ self._guesses = {}
+
+ return guesses
+
+
+class TriviaNightGame:
+ """Interface for managing a game of trivia night."""
+
+ def __init__(self, data: list[QuestionData]) -> None:
+ self._questions = [Question(q) for q in data]
+ # A copy of the questions to keep for `.trivianight list`
+ self._all_questions = list(self._questions)
+ self.current_question: Optional[Question] = None
+ self._points = {}
+ self._speed = {}
+
+ def __iter__(self) -> Iterable[Question]:
+ return iter(self._questions)
+
+ def next_question(self, number: str = None) -> Question:
+ """
+ Consume one random question from the trivia night game.
+
+ One question is randomly picked from the list of questions which is then removed and returned.
+ """
+ if self.current_question is not None:
+ raise RuntimeError("Cannot call next_question() when there is a current question.")
+
+ if number is not None:
+ try:
+ question = [q for q in self._all_questions if q.number == int(number)][0]
+ except IndexError:
+ raise ValueError(f"Question number {number} does not exist.")
+ elif len(self._questions) == 0:
+ raise AllQuestionsVisited("All of the questions have been visited.")
+ else:
+ question = self._questions.pop(randrange(len(self._questions)))
+
+ self.current_question = question
+ return question
+
+ def end_question(self) -> None:
+ """
+ End the current question.
+
+ This method should be called when the question has been answered, it must be called before
+ attempting to call `next_question()` again.
+ """
+ if self.current_question is None:
+ raise RuntimeError("Cannot call end_question() when there is no current question.")
+
+ self.current_question.stop()
+ self.current_question = None
+
+ def list_questions(self) -> str:
+ """
+ List all the questions.
+
+ This method should be called when `.trivianight list` is called to display the following information:
+ - Question number
+ - Question description
+ - Visited/not visited
+ """
+ question_list = []
+
+ visited = ":white_check_mark:"
+ not_visited = ":x:"
+
+ for question in self._all_questions:
+ formatted_string = (
+ f"**Q{question.number}** {not_visited if question in self._questions else visited}"
+ f"\n{question.description}\n\n"
+ )
+ question_list.append(formatted_string.rstrip())
+
+ return question_list
diff --git a/bot/exts/events/trivianight/_questions.py b/bot/exts/events/trivianight/_questions.py
new file mode 100644
index 00000000..d6beced9
--- /dev/null
+++ b/bot/exts/events/trivianight/_questions.py
@@ -0,0 +1,179 @@
+from random import choice
+from string import ascii_uppercase
+
+import discord
+from discord import Embed, Interaction
+from discord.ui import Button, View
+
+from bot.constants import Colours, NEGATIVE_REPLIES
+
+from ._game import AlreadyUpdated, Question, QuestionClosed
+from ._scoreboard import Scoreboard
+
+
+class AnswerButton(Button):
+ """Button subclass that's used to guess on a particular answer."""
+
+ def __init__(self, label: str, question: Question):
+ super().__init__(label=label, style=discord.ButtonStyle.green)
+
+ self.question = question
+
+ async def callback(self, interaction: Interaction) -> None:
+ """
+ When a user interacts with the button, this will be called.
+
+ Parameters:
+ - interaction: an instance of discord.Interaction representing the interaction between the user and the
+ button.
+ """
+ try:
+ guess = self.question.guess(interaction.user.id, self.label)
+ except AlreadyUpdated:
+ await interaction.response.send_message(
+ embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="You've already changed your answer more than once!",
+ color=Colours.soft_red
+ ),
+ ephemeral=True
+ )
+ return
+ except QuestionClosed:
+ await interaction.response.send_message(
+ embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="The question is no longer accepting guesses!",
+ color=Colours.soft_red
+ ),
+ ephemeral=True
+ )
+ return
+
+ if guess[1]:
+ await interaction.response.send_message(
+ embed=Embed(
+ title="Confirming that...",
+ description=f"You chose answer {self.label}.",
+ color=Colours.soft_green
+ ),
+ ephemeral=True
+ )
+ else:
+ # guess[1] is False and they cannot change their answer again. Which
+ # indicates that they changed it this time around.
+ await interaction.response.send_message(
+ embed=Embed(
+ title="Confirming that...",
+ description=f"You changed your answer to answer {self.label}.",
+ color=Colours.soft_green
+ ),
+ ephemeral=True
+ )
+
+
+class QuestionView(View):
+ """View for one trivia night question."""
+
+ def __init__(self, question: Question) -> None:
+ super().__init__()
+ self.question = question
+
+ for letter, _ in self.question.answers:
+ self.add_item(AnswerButton(letter, self.question))
+
+ @staticmethod
+ def unicodeify(text: str) -> str:
+ """
+ Takes `text` and adds zero-width spaces to prevent copy and pasting the question.
+
+ Parameters:
+ - text: A string that represents the question description to 'unicodeify'
+ """
+ return "".join(
+ f"{letter}\u200b" if letter not in ('\n', '\t', '`', 'p', 'y') else letter
+ for idx, letter in enumerate(text)
+ )
+
+ def create_embed(self) -> Embed:
+ """Helper function to create the embed for the current question."""
+ question_embed = Embed(
+ title=f"Question {self.question.number}",
+ description=self.unicodeify(self.question.description),
+ color=Colours.python_yellow
+ )
+
+ for label, answer in self.question.answers:
+ question_embed.add_field(name=f"Answer {label}", value=answer, inline=False)
+
+ return question_embed
+
+ def end_question(self, scoreboard: Scoreboard) -> Embed:
+ """
+ Ends the question and displays the statistics on who got the question correct, awards points, etc.
+
+ Returns:
+ An embed displaying the correct answers and the % of people that chose each answer.
+ """
+ guesses = self.question.stop()
+
+ labels = ascii_uppercase[:len(self.question.answers)]
+
+ answer_embed = Embed(
+ title=f"The correct answer for Question {self.question.number} was...",
+ description=self.question.correct
+ )
+
+ if len(guesses) != 0:
+ answers_chosen = {
+ answer_choice: len(
+ tuple(filter(lambda x: x[0] == answer_choice, guesses.values()))
+ )
+ for answer_choice in labels
+ }
+
+ answers_chosen = dict(
+ sorted(list(answers_chosen.items()), key=lambda item: item[1], reverse=True)
+ )
+
+ for answer, people_answered in answers_chosen.items():
+ is_correct_answer = dict(self.question.answers)[answer[0]] == self.question.correct
+
+ # Setting the color of answer_embed to the % of people that got it correct via the mapping
+ if is_correct_answer:
+ # Maps the % of people who got it right to a color, from a range of red to green
+ percentage_to_color = [0xFC94A1, 0xFFCCCB, 0xCDFFCC, 0xB0F5AB, 0xB0F5AB]
+ answer_embed.color = percentage_to_color[round(people_answered / len(guesses) * 100) // 25]
+
+ field_title = (
+ (":white_check_mark: " if is_correct_answer else "")
+ + f"{people_answered} players ({people_answered / len(guesses) * 100:.1f}%) chose"
+ )
+
+ # The `ord` function is used here to change the letter to its corresponding position
+ answer_embed.add_field(
+ name=field_title,
+ value=self.question.answers[ord(answer) - 65][1],
+ inline=False
+ )
+
+ # Assign points to users
+ for user_id, answer in guesses.items():
+ if dict(self.question.answers)[answer[0]] == self.question.correct:
+ scoreboard.assign_points(
+ int(user_id),
+ points=(1 - (answer[-1] / self.question.time) / 2) * self.question.max_points,
+ speed=answer[-1]
+ )
+ elif answer[-1] <= 2:
+ scoreboard.assign_points(
+ int(user_id),
+ points=-(1 - (answer[-1] / self.question.time) / 2) * self.question.max_points
+ )
+ else:
+ scoreboard.assign_points(
+ int(user_id),
+ points=0
+ )
+
+ return answer_embed
diff --git a/bot/exts/events/trivianight/_scoreboard.py b/bot/exts/events/trivianight/_scoreboard.py
new file mode 100644
index 00000000..a5a5fcac
--- /dev/null
+++ b/bot/exts/events/trivianight/_scoreboard.py
@@ -0,0 +1,186 @@
+from random import choice
+
+import discord.ui
+from discord import ButtonStyle, Embed, Interaction, Member
+from discord.ui import Button, View
+
+from bot.bot import Bot
+from bot.constants import Colours, NEGATIVE_REPLIES
+
+
+class ScoreboardView(View):
+ """View for the scoreboard."""
+
+ def __init__(self, bot: Bot):
+ super().__init__()
+ self.bot = bot
+
+ @staticmethod
+ def _int_to_ordinal(number: int) -> str:
+ """
+ Converts an integer into an ordinal number, i.e. 1 to 1st.
+
+ Parameters:
+ - number: an integer representing the number to convert to an ordinal number.
+ """
+ suffix = ["th", "st", "nd", "rd", "th"][min(number % 10, 4)]
+ if (number % 100) in {11, 12, 13}:
+ suffix = "th"
+
+ return str(number) + suffix
+
+ async def create_main_leaderboard(self) -> Embed:
+ """
+ Helper function that iterates through `self.points` to generate the main leaderboard embed.
+
+ The main leaderboard would be formatted like the following:
+ **1**. @mention of the user (# of points)
+ along with the 29 other users who made it onto the leaderboard.
+ """
+ formatted_string = ""
+
+ for current_placement, (user, points) in enumerate(self.points.items()):
+ if current_placement + 1 > 30:
+ break
+
+ user = await self.bot.fetch_user(int(user))
+ formatted_string += f"**{current_placement + 1}.** {user.mention} "
+ formatted_string += f"({points:.1f} pts)\n"
+ if (current_placement + 1) % 10 == 0:
+ formatted_string += "⎯⎯⎯⎯⎯⎯⎯⎯\n"
+
+ main_embed = Embed(
+ title="Winners of the Trivia Night",
+ description=formatted_string,
+ color=Colours.python_blue,
+ )
+
+ return main_embed
+
+ async def _create_speed_embed(self) -> Embed:
+ """
+ Helper function that iterates through `self.speed` to generate a leaderboard embed.
+
+ The speed leaderboard would be formatted like the following:
+ **1**. @mention of the user ([average speed as a float with the precision of one decimal point]s)
+ along with the 29 other users who made it onto the leaderboard.
+ """
+ formatted_string = ""
+
+ for current_placement, (user, time_taken) in enumerate(self.speed.items()):
+ if current_placement + 1 > 30:
+ break
+
+ user = await self.bot.fetch_user(int(user))
+ formatted_string += f"**{current_placement + 1}.** {user.mention} "
+ formatted_string += f"({(time_taken[-1] / time_taken[0]):.1f}s)\n"
+ if (current_placement + 1) % 10 == 0:
+ formatted_string += "⎯⎯⎯⎯⎯⎯⎯⎯\n"
+
+ speed_embed = Embed(
+ title="Average time taken to answer a question",
+ description=formatted_string,
+ color=Colours.python_blue
+ )
+ return speed_embed
+
+ def _get_rank(self, member: Member) -> Embed:
+ """
+ Gets the member's rank for the points leaderboard and speed leaderboard.
+
+ Parameters:
+ - member: An instance of discord.Member representing the person who is trying to get their rank.
+ """
+ rank_embed = Embed(title=f"Ranks for {member.display_name}", color=Colours.python_blue)
+ # These are stored as strings so that the last digit can be determined to choose the suffix
+ try:
+ points_rank = str(list(self.points).index(member.id) + 1)
+ speed_rank = str(list(self.speed).index(member.id) + 1)
+ except ValueError:
+ return Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="It looks like you didn't participate in the Trivia Night event!",
+ color=Colours.soft_red
+ )
+
+ rank_embed.add_field(
+ name="Total Points",
+ value=(
+ f"You got {self._int_to_ordinal(int(points_rank))} place"
+ f" with {self.points[member.id]:.1f} points."
+ ),
+ inline=False
+ )
+
+ rank_embed.add_field(
+ name="Average Speed",
+ value=(
+ f"You got {self._int_to_ordinal(int(speed_rank))} place"
+ f" with a time of {(self.speed[member.id][1] / self.speed[member.id][0]):.1f} seconds."
+ ),
+ inline=False
+ )
+ return rank_embed
+
+ @discord.ui.button(label="Scoreboard for Speed", style=ButtonStyle.green)
+ async def speed_leaderboard(self, button: Button, interaction: Interaction) -> None:
+ """
+ Send an ephemeral message with the speed leaderboard embed.
+
+ Parameters:
+ - button: The discord.ui.Button instance representing the `Speed Leaderboard` button.
+ - interaction: The discord.Interaction instance containing information on the interaction between the user
+ and the button.
+ """
+ await interaction.response.send_message(embed=await self._create_speed_embed(), ephemeral=True)
+
+ @discord.ui.button(label="What's my rank?", style=ButtonStyle.blurple)
+ async def rank_button(self, button: Button, interaction: Interaction) -> None:
+ """
+ Send an ephemeral message with the user's rank for the overall points/average speed.
+
+ Parameters:
+ - button: The discord.ui.Button instance representing the `What's my rank?` button.
+ - interaction: The discord.Interaction instance containing information on the interaction between the user
+ and the button.
+ """
+ await interaction.response.send_message(embed=self._get_rank(interaction.user), ephemeral=True)
+
+
+class Scoreboard:
+ """Class for the scoreboard for the Trivia Night event."""
+
+ def __init__(self, bot: Bot):
+ self._bot = bot
+ self._points = {}
+ self._speed = {}
+
+ def assign_points(self, user_id: int, *, points: int = None, speed: float = None) -> None:
+ """
+ Assign points or deduct points to/from a certain user.
+
+ This method should be called once the question has finished and all answers have been registered.
+ """
+ if points is not None and user_id not in self._points.keys():
+ self._points[user_id] = points
+ elif points is not None:
+ self._points[user_id] += points
+
+ if speed is not None and user_id not in self._speed.keys():
+ self._speed[user_id] = [1, speed]
+ elif speed is not None:
+ self._speed[user_id] = [
+ self._speed[user_id][0] + 1, self._speed[user_id][1] + speed
+ ]
+
+ async def display(self, speed_leaderboard: bool = False) -> tuple[Embed, View]:
+ """Returns the embed of the main leaderboard along with the ScoreboardView."""
+ view = ScoreboardView(self._bot)
+
+ view.points = dict(sorted(self._points.items(), key=lambda item: item[-1], reverse=True))
+ view.speed = dict(sorted(self._speed.items(), key=lambda item: item[-1][1] / item[-1][0]))
+
+ return (
+ await view.create_main_leaderboard(),
+ view if not speed_leaderboard else await view._create_speed_embed()
+ )
diff --git a/bot/exts/events/trivianight/trivianight.py b/bot/exts/events/trivianight/trivianight.py
new file mode 100644
index 00000000..18d8327a
--- /dev/null
+++ b/bot/exts/events/trivianight/trivianight.py
@@ -0,0 +1,328 @@
+import asyncio
+from json import JSONDecodeError, loads
+from random import choice
+from typing import Optional
+
+from discord import Embed
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.constants import Colours, NEGATIVE_REPLIES, POSITIVE_REPLIES, Roles
+from bot.utils.pagination import LinePaginator
+
+from ._game import AllQuestionsVisited, TriviaNightGame
+from ._questions import QuestionView
+from ._scoreboard import Scoreboard
+
+# The ID you see below are the Events Lead role ID and the Event Runner Role ID
+TRIVIA_NIGHT_ROLES = (Roles.admins, 778361735739998228, 940911658799333408)
+
+
+class TriviaNightCog(commands.Cog):
+ """Cog for the Python Trivia Night event."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+ self.game: Optional[TriviaNightGame] = None
+ self.scoreboard: Optional[Scoreboard] = None
+ self.question_closed: asyncio.Event = None
+
+ @commands.group(aliases=["tn"], invoke_without_command=True)
+ async def trivianight(self, ctx: commands.Context) -> None:
+ """
+ The command group for the Python Discord Trivia Night.
+
+ If invoked without a subcommand (i.e. simply .trivianight), it will explain what the Trivia Night event is.
+ """
+ cog_description = Embed(
+ title="What is .trivianight?",
+ description=(
+ "This 'cog' is for the Python Discord's TriviaNight (date tentative)! Compete against other"
+ " players in a trivia about Python!"
+ ),
+ color=Colours.soft_green
+ )
+ await ctx.send(embed=cog_description)
+
+ @trivianight.command()
+ @commands.has_any_role(*TRIVIA_NIGHT_ROLES)
+ async def load(self, ctx: commands.Context, *, to_load: Optional[str]) -> None:
+ """
+ Loads a JSON file from the provided attachment or argument.
+
+ The JSON provided is formatted where it is a list of dictionaries, each dictionary containing the keys below:
+ - number: int (represents the current question #)
+ - description: str (represents the question itself)
+ - answers: list[str] (represents the different answers possible, must be a length of 4)
+ - correct: str (represents the correct answer in terms of what the correct answer is in `answers`
+ - time: Optional[int] (represents the timer for the question and how long it should run, default is 10)
+ - points: Optional[int] (represents how many points are awarded for each question, default is 10)
+
+ The load command accepts three different ways of loading in a JSON:
+ - an attachment of the JSON file
+ - a message link to the attachment/JSON
+ - reading the JSON itself via a codeblock or plain text
+ """
+ if self.game is not None:
+ await ctx.send(embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is already a trivia night running!",
+ color=Colours.soft_red
+ ))
+ return
+
+ if ctx.message.attachments:
+ json_text = (await ctx.message.attachments[0].read()).decode("utf8")
+ elif not to_load:
+ raise commands.BadArgument("You didn't attach an attachment nor link a message!")
+ elif (
+ to_load.startswith("https://discord.com/channels")
+ or to_load.startswith("https://discordapp.com/channels")
+ ):
+ channel_id, message_id = to_load.split("/")[-2:]
+ channel = await ctx.guild.fetch_channel(int(channel_id))
+ message = await channel.fetch_message(int(message_id))
+ if message.attachments:
+ json_text = (await message.attachments[0].read()).decode("utf8")
+ else:
+ json_text = message.content.replace("```", "").replace("json", "").replace("\n", "")
+ else:
+ json_text = to_load.replace("```", "").replace("json", "").replace("\n", "")
+
+ try:
+ serialized_json = loads(json_text)
+ except JSONDecodeError as error:
+ raise commands.BadArgument(f"Looks like something went wrong:\n{str(error)}")
+
+ self.game = TriviaNightGame(serialized_json)
+ self.question_closed = asyncio.Event()
+
+ success_embed = Embed(
+ title=choice(POSITIVE_REPLIES),
+ description="The JSON was loaded successfully!",
+ color=Colours.soft_green
+ )
+
+ self.scoreboard = Scoreboard(self.bot)
+
+ await ctx.send(embed=success_embed)
+
+ @trivianight.command(aliases=('next',))
+ @commands.has_any_role(*TRIVIA_NIGHT_ROLES)
+ async def question(self, ctx: commands.Context, question_number: str = None) -> None:
+ """
+ Gets a random question from the unanswered question list and lets the user(s) choose the answer.
+
+ This command will continuously count down until the time limit of the question is exhausted.
+ However, if `.trivianight stop` is invoked, the counting down is interrupted to show the final results.
+ """
+ if self.game is None:
+ await ctx.send(embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is no trivia night running!",
+ color=Colours.soft_red
+ ))
+ return
+
+ if self.game.current_question is not None:
+ error_embed = Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is already an ongoing question!",
+ color=Colours.soft_red
+ )
+ await ctx.send(embed=error_embed)
+ return
+
+ try:
+ next_question = self.game.next_question(question_number)
+ except AllQuestionsVisited:
+ error_embed = Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="All of the questions have been used.",
+ color=Colours.soft_red
+ )
+ await ctx.send(embed=error_embed)
+ return
+
+ await ctx.send("Next question in 3 seconds! Get ready...")
+ await asyncio.sleep(3)
+
+ question_view = QuestionView(next_question)
+ question_embed = question_view.create_embed()
+
+ next_question.start()
+ message = await ctx.send(embed=question_embed, view=question_view)
+
+ # Exponentially sleep less and less until the time limit is reached
+ percentage = 1
+ while True:
+ percentage *= 0.5
+ duration = next_question.time * percentage
+
+ await asyncio.wait([self.question_closed.wait()], timeout=duration)
+
+ if self.question_closed.is_set():
+ await ctx.send(embed=question_view.end_question(self.scoreboard))
+ await message.edit(embed=question_embed, view=None)
+
+ self.game.end_question()
+ self.question_closed.clear()
+ return
+
+ if int(duration) > 1:
+ # It is quite ugly to display decimals, the delay for requests to reach Discord
+ # cause sub-second accuracy to be quite pointless.
+ await ctx.send(f"{int(duration)}s remaining...")
+ else:
+ # Since each time we divide the percentage by 2 and sleep one half of the halves (then sleep a
+ # half, of that half) we must sleep both halves at the end.
+ await asyncio.wait([self.question_closed.wait()], timeout=duration)
+ if self.question_closed.is_set():
+ await ctx.send(embed=question_view.end_question(self.scoreboard))
+ await message.edit(embed=question_embed, view=None)
+
+ self.game.end_question()
+ self.question_closed.clear()
+ return
+ break
+
+ await ctx.send(embed=question_view.end_question(self.scoreboard))
+ await message.edit(embed=question_embed, view=None)
+
+ self.game.end_question()
+
+ @trivianight.command()
+ @commands.has_any_role(*TRIVIA_NIGHT_ROLES)
+ async def list(self, ctx: commands.Context) -> None:
+ """
+ Display all the questions left in the question bank.
+
+ Questions are displayed in the following format:
+ Q(number): Question description | :white_check_mark: if the question was used otherwise :x:.
+ """
+ if self.game is None:
+ await ctx.send(embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is no trivia night running!",
+ color=Colours.soft_red
+ ))
+ return
+
+ question_list = self.game.list_questions()
+
+ list_embed = Embed(title="All Trivia Night Questions")
+
+ if len(question_list) == 1:
+ list_embed.description = question_list[0]
+ await ctx.send(embed=list_embed)
+ else:
+ await LinePaginator.paginate(
+ question_list,
+ ctx,
+ list_embed
+ )
+
+ @trivianight.command()
+ @commands.has_any_role(*TRIVIA_NIGHT_ROLES)
+ async def stop(self, ctx: commands.Context) -> None:
+ """
+ End the ongoing question to show the correct question.
+
+ This command should be used if the question should be ended early or if the time limit fails
+ """
+ if self.game is None:
+ await ctx.send(embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is no trivia night running!",
+ color=Colours.soft_red
+ ))
+ return
+
+ if self.game.current_question is None:
+ error_embed = Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is no ongoing question!",
+ color=Colours.soft_red
+ )
+ await ctx.send(embed=error_embed)
+ return
+
+ self.question_closed.set()
+
+ @trivianight.command()
+ @commands.has_any_role(*TRIVIA_NIGHT_ROLES)
+ async def end(self, ctx: commands.Context) -> None:
+ """
+ Displays the scoreboard view.
+
+ The scoreboard view consists of the two scoreboards with the 30 players who got the highest points and the
+ 30 players who had the fastest average response time to a question where they got the question right.
+
+ The scoreboard view also has a button where the user can see their own rank, points and average speed if they
+ didn't make it onto the leaderboard.
+ """
+ if self.game is None:
+ await ctx.send(embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is no trivia night running!",
+ color=Colours.soft_red
+ ))
+ return
+
+ if self.game.current_question is not None:
+ error_embed = Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="You can't end the event while a question is ongoing!",
+ color=Colours.soft_red
+ )
+ await ctx.send(embed=error_embed)
+ return
+
+ scoreboard_embed, scoreboard_view = await self.scoreboard.display()
+ await ctx.send(embed=scoreboard_embed, view=scoreboard_view)
+
+ @trivianight.command()
+ @commands.has_any_role(*TRIVIA_NIGHT_ROLES)
+ async def scoreboard(self, ctx: commands.Context) -> None:
+ """
+ Displays the scoreboard.
+
+ The scoreboard consists of the two scoreboards with the 30 players who got the highest points and the
+ 30 players who had the fastest average response time to a question where they got the question right.
+ """
+ if self.game is None:
+ await ctx.send(embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is no trivia night running!",
+ color=Colours.soft_red
+ ))
+ return
+
+ if self.game.current_question is not None:
+ error_embed = Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="You can't end the event while a question is ongoing!",
+ color=Colours.soft_red
+ )
+ await ctx.send(embed=error_embed)
+ return
+
+ scoreboard_embed, speed_scoreboard = await self.scoreboard.display(speed_leaderboard=True)
+ await ctx.send(embeds=(scoreboard_embed, speed_scoreboard))
+
+ @trivianight.command()
+ @commands.has_any_role(*TRIVIA_NIGHT_ROLES)
+ async def end_game(self, ctx: commands.Context) -> None:
+ """Ends the ongoing game."""
+ self.game = None
+
+ await ctx.send(embed=Embed(
+ title=choice(POSITIVE_REPLIES),
+ description="The game has been stopped.",
+ color=Colours.soft_green
+ ))
+
+
+def setup(bot: Bot) -> None:
+ """Load the TriviaNight cog."""
+ bot.add_cog(TriviaNightCog(bot))
diff --git a/bot/exts/fun/madlibs.py b/bot/exts/fun/madlibs.py
new file mode 100644
index 00000000..21708e53
--- /dev/null
+++ b/bot/exts/fun/madlibs.py
@@ -0,0 +1,148 @@
+import json
+from asyncio import TimeoutError
+from pathlib import Path
+from random import choice
+from typing import TypedDict
+
+import discord
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.constants import Colours, NEGATIVE_REPLIES
+
+TIMEOUT = 60.0
+
+
+class MadlibsTemplate(TypedDict):
+ """Structure of a template in the madlibs JSON file."""
+
+ title: str
+ blanks: list[str]
+ value: list[str]
+
+
+class Madlibs(commands.Cog):
+ """Cog for the Madlibs game."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+ self.templates = self._load_templates()
+ self.edited_content = {}
+ self.checks = set()
+
+ @staticmethod
+ def _load_templates() -> list[MadlibsTemplate]:
+ madlibs_stories = Path("bot/resources/fun/madlibs_templates.json")
+
+ with open(madlibs_stories) as file:
+ return json.load(file)
+
+ @staticmethod
+ def madlibs_embed(part_of_speech: str, number_of_inputs: int) -> discord.Embed:
+ """Method to generate an embed with the game information."""
+ madlibs_embed = discord.Embed(title="Madlibs", color=Colours.python_blue)
+
+ madlibs_embed.add_field(
+ name="Enter a word that fits the given part of speech!",
+ value=f"Part of speech: {part_of_speech}\n\nMake sure not to spam, or you may get auto-muted!"
+ )
+
+ madlibs_embed.set_footer(text=f"Inputs remaining: {number_of_inputs}")
+
+ return madlibs_embed
+
+ @commands.Cog.listener()
+ async def on_message_edit(self, _: discord.Message, after: discord.Message) -> None:
+ """A listener that checks for message edits from the user."""
+ for check in self.checks:
+ if check(after):
+ break
+ else:
+ return
+
+ self.edited_content[after.id] = after.content
+
+ @commands.command()
+ @commands.max_concurrency(1, per=commands.BucketType.user)
+ async def madlibs(self, ctx: commands.Context) -> None:
+ """
+ Play Madlibs with the bot!
+
+ Madlibs is a game where the player is asked to enter a word that
+ fits a random part of speech (e.g. noun, adjective, verb, plural noun, etc.)
+ a random amount of times, depending on the story chosen by the bot at the beginning.
+ """
+ random_template = choice(self.templates)
+
+ def author_check(message: discord.Message) -> bool:
+ return message.channel.id == ctx.channel.id and message.author.id == ctx.author.id
+
+ self.checks.add(author_check)
+
+ loading_embed = discord.Embed(
+ title="Madlibs", description="Loading your Madlibs game...", color=Colours.python_blue
+ )
+ original_message = await ctx.send(embed=loading_embed)
+
+ submitted_words = {}
+
+ for i, part_of_speech in enumerate(random_template["blanks"]):
+ inputs_left = len(random_template["blanks"]) - i
+
+ madlibs_embed = self.madlibs_embed(part_of_speech, inputs_left)
+ await original_message.edit(embed=madlibs_embed)
+
+ try:
+ message = await self.bot.wait_for(event="message", check=author_check, timeout=TIMEOUT)
+ except TimeoutError:
+ timeout_embed = discord.Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="Uh oh! You took too long to respond!",
+ color=Colours.soft_red
+ )
+
+ await ctx.send(ctx.author.mention, embed=timeout_embed)
+
+ for msg_id in submitted_words:
+ self.edited_content.pop(msg_id, submitted_words[msg_id])
+
+ self.checks.remove(author_check)
+
+ return
+
+ submitted_words[message.id] = message.content
+
+ blanks = [self.edited_content.pop(msg_id, submitted_words[msg_id]) for msg_id in submitted_words]
+
+ self.checks.remove(author_check)
+
+ story = []
+ for value, blank in zip(random_template["value"], blanks):
+ story.append(f"{value}__{blank}__")
+
+ # In each story template, there is always one more "value"
+ # (fragment from the story) than there are blanks (words that the player enters)
+ # so we need to compensate by appending the last line of the story again.
+ story.append(random_template["value"][-1])
+
+ story_embed = discord.Embed(
+ title=random_template["title"],
+ description="".join(story),
+ color=Colours.bright_green
+ )
+
+ story_embed.set_footer(text=f"Generated for {ctx.author}", icon_url=ctx.author.display_avatar.url)
+
+ await ctx.send(embed=story_embed)
+
+ @madlibs.error
+ async def handle_madlibs_error(self, ctx: commands.Context, error: commands.CommandError) -> None:
+ """Error handler for the Madlibs command."""
+ if isinstance(error, commands.MaxConcurrencyReached):
+ await ctx.send("You are already playing Madlibs!")
+ error.handled = True
+
+
+def setup(bot: Bot) -> None:
+ """Load the Madlibs cog."""
+ bot.add_cog(Madlibs(bot))
diff --git a/bot/exts/holidays/valentines/lovecalculator.py b/bot/exts/holidays/valentines/lovecalculator.py
index a53014e5..99fba150 100644
--- a/bot/exts/holidays/valentines/lovecalculator.py
+++ b/bot/exts/holidays/valentines/lovecalculator.py
@@ -12,7 +12,7 @@ from discord.ext import commands
from discord.ext.commands import BadArgument, Cog, clean_content
from bot.bot import Bot
-from bot.constants import Channels, Client, Lovefest, Month
+from bot.constants import Channels, Lovefest, Month, PYTHON_PREFIX
from bot.utils.decorators import in_month
log = logging.getLogger(__name__)
@@ -51,7 +51,7 @@ class LoveCalculator(Cog):
raise BadArgument(
"This command can only be ran against members with the lovefest role! "
"This role be can assigned by running "
- f"`{Client.prefix}lovefest sub` in <#{Channels.community_bot_commands}>."
+ f"`{PYTHON_PREFIX}subscribe` in <#{Channels.bot}>."
)
if whom is None:
@@ -90,7 +90,7 @@ class LoveCalculator(Cog):
name="A letter from Dr. Love:",
value=data["text"]
)
- embed.set_footer(text=f"You can unsubscribe from lovefest by using {Client.prefix}lovefest unsub")
+ embed.set_footer(text=f"You can unsubscribe from lovefest by using {PYTHON_PREFIX}subscribe.")
await ctx.send(embed=embed)
diff --git a/bot/exts/utilities/epoch.py b/bot/exts/utilities/epoch.py
new file mode 100644
index 00000000..42312dd1
--- /dev/null
+++ b/bot/exts/utilities/epoch.py
@@ -0,0 +1,138 @@
+from typing import Optional, Union
+
+import arrow
+import discord
+from dateutil import parser
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.utils.extensions import invoke_help_command
+
+# https://discord.com/developers/docs/reference#message-formatting-timestamp-styles
+STYLES = {
+ "Epoch": ("",),
+ "Short Time": ("t", "h:mm A",),
+ "Long Time": ("T", "h:mm:ss A"),
+ "Short Date": ("d", "MM/DD/YYYY"),
+ "Long Date": ("D", "MMMM D, YYYY"),
+ "Short Date/Time": ("f", "MMMM D, YYYY h:mm A"),
+ "Long Date/Time": ("F", "dddd, MMMM D, YYYY h:mm A"),
+ "Relative Time": ("R",)
+}
+DROPDOWN_TIMEOUT = 60
+
+
+class DateString(commands.Converter):
+ """Convert a relative or absolute date/time string to an arrow.Arrow object."""
+
+ async def convert(self, ctx: commands.Context, argument: str) -> Union[arrow.Arrow, Optional[tuple]]:
+ """
+ Convert a relative or absolute date/time string to an arrow.Arrow object.
+
+ Try to interpret the date string as a relative time. If conversion fails, try to interpret it as an absolute
+ time. Tokens that are not recognised are returned along with the part of the string that was successfully
+ converted to an arrow object. If the date string cannot be parsed, BadArgument is raised.
+ """
+ try:
+ return arrow.utcnow().dehumanize(argument)
+ except (ValueError, OverflowError):
+ try:
+ dt, ignored_tokens = parser.parse(argument, fuzzy_with_tokens=True)
+ except parser.ParserError:
+ raise commands.BadArgument(f"`{argument}` Could not be parsed to a relative or absolute date.")
+ except OverflowError:
+ raise commands.BadArgument(f"`{argument}` Results in a date outside of the supported range.")
+ return arrow.get(dt), ignored_tokens
+
+
+class Epoch(commands.Cog):
+ """Convert an entered time and date to a unix timestamp."""
+
+ @commands.command(name="epoch")
+ async def epoch(self, ctx: commands.Context, *, date_time: DateString = None) -> None:
+ """
+ Convert an entered date/time string to the equivalent epoch.
+
+ **Relative time**
+ Must begin with `in...` or end with `...ago`.
+ Accepted units: "seconds", "minutes", "hours", "days", "weeks", "months", "years".
+ eg `.epoch in a month 4 days and 2 hours`
+
+ **Absolute time**
+ eg `.epoch 2022/6/15 16:43 -04:00`
+ Absolute times must be entered in descending orders of magnitude.
+ If AM or PM is left unspecified, the 24-hour clock is assumed.
+ Timezones are optional, and will default to UTC. The following timezone formats are accepted:
+ Z (UTC)
+ ±HH:MM
+ ±HHMM
+ ±HH
+
+ Times in the dropdown are shown in UTC
+ """
+ if not date_time:
+ await invoke_help_command(ctx)
+ return
+
+ if isinstance(date_time, tuple):
+ # Remove empty strings. Strip extra whitespace from the remaining items
+ ignored_tokens = list(map(str.strip, filter(str.strip, date_time[1])))
+ date_time = date_time[0]
+ if ignored_tokens:
+ await ctx.send(f"Could not parse the following token(s): `{', '.join(ignored_tokens)}`")
+ await ctx.send(f"Date and time parsed as: `{date_time.format(arrow.FORMAT_RSS)}`")
+
+ epoch = int(date_time.timestamp())
+ view = TimestampMenuView(ctx, self._format_dates(date_time), epoch)
+ original = await ctx.send(f"`{epoch}`", view=view)
+ await view.wait() # wait until expiration before removing the dropdown
+ try:
+ await original.edit(view=None)
+ except discord.NotFound: # disregard the error message if the message is deleled
+ pass
+
+ @staticmethod
+ def _format_dates(date: arrow.Arrow) -> list[str]:
+ """
+ Return a list of date strings formatted according to the discord timestamp styles.
+
+ These are used in the description of each style in the dropdown
+ """
+ date = date.to('utc')
+ formatted = [str(int(date.timestamp()))]
+ formatted += [date.format(format[1]) for format in list(STYLES.values())[1:7]]
+ formatted.append(date.humanize())
+ return formatted
+
+
+class TimestampMenuView(discord.ui.View):
+ """View for the epoch command which contains a single `discord.ui.Select` dropdown component."""
+
+ def __init__(self, ctx: commands.Context, formatted_times: list[str], epoch: int):
+ super().__init__(timeout=DROPDOWN_TIMEOUT)
+ self.ctx = ctx
+ self.epoch = epoch
+ self.dropdown: discord.ui.Select = self.children[0]
+ for label, date_time in zip(STYLES.keys(), formatted_times):
+ self.dropdown.add_option(label=label, description=date_time)
+
+ @discord.ui.select(placeholder="Select the format of your timestamp")
+ async def select_format(self, _: discord.ui.Select, interaction: discord.Interaction) -> discord.Message:
+ """Drop down menu which contains a list of formats which discord timestamps can take."""
+ selected = interaction.data["values"][0]
+ if selected == "Epoch":
+ return await interaction.response.edit_message(content=f"`{self.epoch}`")
+ return await interaction.response.edit_message(content=f"`<t:{self.epoch}:{STYLES[selected][0]}>`")
+
+ async def interaction_check(self, interaction: discord.Interaction) -> bool:
+ """Check to ensure that the interacting user is the user who invoked the command."""
+ if interaction.user != self.ctx.author:
+ embed = discord.Embed(description="Sorry, but this dropdown menu can only be used by the original author.")
+ await interaction.response.send_message(embed=embed, ephemeral=True)
+ return False
+ return True
+
+
+def setup(bot: Bot) -> None:
+ """Load the Epoch cog."""
+ bot.add_cog(Epoch())
diff --git a/bot/exts/utilities/githubinfo.py b/bot/exts/utilities/githubinfo.py
index 539e388b..963f54e5 100644
--- a/bot/exts/utilities/githubinfo.py
+++ b/bot/exts/utilities/githubinfo.py
@@ -1,30 +1,165 @@
import logging
import random
+import re
+import typing as t
+from dataclasses import dataclass
from datetime import datetime
-from urllib.parse import quote, quote_plus
+from urllib.parse import quote
import discord
+from aiohttp import ClientResponse
from discord.ext import commands
from bot.bot import Bot
-from bot.constants import Colours, NEGATIVE_REPLIES
+from bot.constants import Colours, ERROR_REPLIES, Emojis, NEGATIVE_REPLIES, Tokens
from bot.exts.core.extensions import invoke_help_command
log = logging.getLogger(__name__)
GITHUB_API_URL = "https://api.github.com"
+REQUEST_HEADERS = {
+ "Accept": "application/vnd.github.v3+json"
+}
+
+REPOSITORY_ENDPOINT = "https://api.github.com/orgs/{org}/repos?per_page=100&type=public"
+ISSUE_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/issues/{number}"
+PR_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/pulls/{number}"
+
+if Tokens.github:
+ REQUEST_HEADERS["Authorization"] = f"token {Tokens.github}"
+
+CODE_BLOCK_RE = re.compile(
+ r"^`([^`\n]+)`" # Inline codeblock
+ r"|```(.+?)```", # Multiline codeblock
+ re.DOTALL | re.MULTILINE
+)
+
+# Maximum number of issues in one message
+MAXIMUM_ISSUES = 5
+
+# Regex used when looking for automatic linking in messages
+# regex101 of current regex https://regex101.com/r/V2ji8M/6
+AUTOMATIC_REGEX = re.compile(
+ r"((?P<org>[a-zA-Z0-9][a-zA-Z0-9\-]{1,39})\/)?(?P<repo>[\w\-\.]{1,100})#(?P<number>[0-9]+)"
+)
+
+
+@dataclass(eq=True, frozen=True)
+class FoundIssue:
+ """Dataclass representing an issue found by the regex."""
+
+ organisation: t.Optional[str]
+ repository: str
+ number: str
+
+
+@dataclass(eq=True, frozen=True)
+class FetchError:
+ """Dataclass representing an error while fetching an issue."""
+
+ return_code: int
+ message: str
+
+
+@dataclass(eq=True, frozen=True)
+class IssueState:
+ """Dataclass representing the state of an issue."""
+
+ repository: str
+ number: int
+ url: str
+ title: str
+ emoji: str
+
class GithubInfo(commands.Cog):
- """Fetches info from GitHub."""
+ """A Cog that fetches info from GitHub."""
def __init__(self, bot: Bot):
self.bot = bot
+ self.repos = []
+
+ @staticmethod
+ def remove_codeblocks(message: str) -> str:
+ """Remove any codeblock in a message."""
+ return CODE_BLOCK_RE.sub("", message)
+
+ async def fetch_issue(
+ self,
+ number: int,
+ repository: str,
+ user: str
+ ) -> t.Union[IssueState, FetchError]:
+ """
+ Retrieve an issue from a GitHub repository.
+
+ Returns IssueState on success, FetchError on failure.
+ """
+ url = ISSUE_ENDPOINT.format(user=user, repository=repository, number=number)
+ pulls_url = PR_ENDPOINT.format(user=user, repository=repository, number=number)
+
+ json_data, r = await self.fetch_data(url)
+
+ if r.status == 403:
+ if r.headers.get("X-RateLimit-Remaining") == "0":
+ log.info(f"Ratelimit reached while fetching {url}")
+ return FetchError(403, "Ratelimit reached, please retry in a few minutes.")
+ return FetchError(403, "Cannot access issue.")
+ elif r.status in (404, 410):
+ return FetchError(r.status, "Issue not found.")
+ elif r.status != 200:
+ return FetchError(r.status, "Error while fetching issue.")
+
+ # The initial API request is made to the issues API endpoint, which will return information
+ # if the issue or PR is present. However, the scope of information returned for PRs differs
+ # from issues: if the 'issues' key is present in the response then we can pull the data we
+ # need from the initial API call.
+ if "issues" in json_data["html_url"]:
+ if json_data.get("state") == "open":
+ emoji = Emojis.issue_open
+ else:
+ emoji = Emojis.issue_closed
+
+ # If the 'issues' key is not contained in the API response and there is no error code, then
+ # we know that a PR has been requested and a call to the pulls API endpoint is necessary
+ # to get the desired information for the PR.
+ else:
+ pull_data, _ = await self.fetch_data(pulls_url)
+ if pull_data["draft"]:
+ emoji = Emojis.pull_request_draft
+ elif pull_data["state"] == "open":
+ emoji = Emojis.pull_request_open
+ # When 'merged_at' is not None, this means that the state of the PR is merged
+ elif pull_data["merged_at"] is not None:
+ emoji = Emojis.pull_request_merged
+ else:
+ emoji = Emojis.pull_request_closed
+
+ issue_url = json_data.get("html_url")
- async def fetch_data(self, url: str) -> dict:
- """Retrieve data as a dictionary."""
- async with self.bot.http_session.get(url) as r:
- return await r.json()
+ return IssueState(repository, number, issue_url, json_data.get("title", ""), emoji)
+
+ @staticmethod
+ def format_embed(
+ results: t.List[t.Union[IssueState, FetchError]]
+ ) -> discord.Embed:
+ """Take a list of IssueState or FetchError and format a Discord embed for them."""
+ description_list = []
+
+ for result in results:
+ if isinstance(result, IssueState):
+ description_list.append(f"{result.emoji} [{result.title}]({result.url})")
+ elif isinstance(result, FetchError):
+ description_list.append(f":x: [{result.return_code}] {result.message}")
+
+ resp = discord.Embed(
+ colour=Colours.bright_green,
+ description="\n".join(description_list)
+ )
+
+ resp.set_author(name="GitHub")
+ return resp
@commands.group(name="github", aliases=("gh", "git"))
@commands.cooldown(1, 10, commands.BucketType.user)
@@ -33,11 +168,67 @@ class GithubInfo(commands.Cog):
if ctx.invoked_subcommand is None:
await invoke_help_command(ctx)
+ @commands.Cog.listener()
+ async def on_message(self, message: discord.Message) -> None:
+ """
+ Automatic issue linking.
+
+ Listener to retrieve issue(s) from a GitHub repository using automatic linking if matching <org>/<repo>#<issue>.
+ """
+ # Ignore bots
+ if message.author.bot:
+ return
+
+ issues = [
+ FoundIssue(*match.group("org", "repo", "number"))
+ for match in AUTOMATIC_REGEX.finditer(self.remove_codeblocks(message.content))
+ ]
+ links = []
+
+ if issues:
+ # Block this from working in DMs
+ if not message.guild:
+ return
+
+ log.trace(f"Found {issues = }")
+ # Remove duplicates
+ issues = set(issues)
+
+ if len(issues) > MAXIMUM_ISSUES:
+ embed = discord.Embed(
+ title=random.choice(ERROR_REPLIES),
+ color=Colours.soft_red,
+ description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})"
+ )
+ await message.channel.send(embed=embed, delete_after=5)
+ return
+
+ for repo_issue in issues:
+ result = await self.fetch_issue(
+ int(repo_issue.number),
+ repo_issue.repository,
+ repo_issue.organisation or "python-discord"
+ )
+ if isinstance(result, IssueState):
+ links.append(result)
+
+ if not links:
+ return
+
+ resp = self.format_embed(links)
+ await message.channel.send(embed=resp)
+
+ async def fetch_data(self, url: str) -> tuple[dict[str], ClientResponse]:
+ """Retrieve data as a dictionary and the response in a tuple."""
+ log.trace(f"Querying GH issues API: {url}")
+ async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r:
+ return await r.json(), r
+
@github_group.command(name="user", aliases=("userinfo",))
async def github_user_info(self, ctx: commands.Context, username: str) -> None:
"""Fetches a user's GitHub information."""
async with ctx.typing():
- user_data = await self.fetch_data(f"{GITHUB_API_URL}/users/{quote_plus(username)}")
+ user_data, _ = await self.fetch_data(f"{GITHUB_API_URL}/users/{username}")
# User_data will not have a message key if the user exists
if "message" in user_data:
@@ -50,7 +241,7 @@ class GithubInfo(commands.Cog):
await ctx.send(embed=embed)
return
- org_data = await self.fetch_data(user_data["organizations_url"])
+ org_data, _ = await self.fetch_data(user_data["organizations_url"])
orgs = [f"[{org['login']}](https://github.com/{org['login']})" for org in org_data]
orgs_to_add = " | ".join(orgs)
@@ -91,10 +282,7 @@ class GithubInfo(commands.Cog):
)
if user_data["type"] == "User":
- embed.add_field(
- name="Gists",
- value=f"[{gists}](https://gist.github.com/{quote_plus(username, safe='')})"
- )
+ embed.add_field(name="Gists", value=f"[{gists}](https://gist.github.com/{quote(username, safe='')})")
embed.add_field(
name=f"Organization{'s' if len(orgs)!=1 else ''}",
@@ -123,7 +311,7 @@ class GithubInfo(commands.Cog):
return
async with ctx.typing():
- repo_data = await self.fetch_data(f"{GITHUB_API_URL}/repos/{quote(repo)}")
+ repo_data, _ = await self.fetch_data(f"{GITHUB_API_URL}/repos/{quote(repo)}")
# There won't be a message key if this repo exists
if "message" in repo_data:
diff --git a/bot/exts/utilities/issues.py b/bot/exts/utilities/issues.py
deleted file mode 100644
index b6d5a43e..00000000
--- a/bot/exts/utilities/issues.py
+++ /dev/null
@@ -1,277 +0,0 @@
-import logging
-import random
-import re
-from dataclasses import dataclass
-from typing import Optional, Union
-
-import discord
-from discord.ext import commands
-
-from bot.bot import Bot
-from bot.constants import (
- Categories, Channels, Colours, ERROR_REPLIES, Emojis, NEGATIVE_REPLIES, Tokens, WHITELISTED_CHANNELS
-)
-from bot.utils.decorators import whitelist_override
-from bot.utils.extensions import invoke_help_command
-
-log = logging.getLogger(__name__)
-
-BAD_RESPONSE = {
- 404: "Issue/pull request not located! Please enter a valid number!",
- 403: "Rate limit has been hit! Please try again later!"
-}
-REQUEST_HEADERS = {
- "Accept": "application/vnd.github.v3+json"
-}
-
-REPOSITORY_ENDPOINT = "https://api.github.com/orgs/{org}/repos?per_page=100&type=public"
-ISSUE_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/issues/{number}"
-PR_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/pulls/{number}"
-
-if GITHUB_TOKEN := Tokens.github:
- REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}"
-
-WHITELISTED_CATEGORIES = (
- Categories.development, Categories.devprojects, Categories.media, Categories.staff
-)
-
-CODE_BLOCK_RE = re.compile(
- r"^`([^`\n]+)`" # Inline codeblock
- r"|```(.+?)```", # Multiline codeblock
- re.DOTALL | re.MULTILINE
-)
-
-# Maximum number of issues in one message
-MAXIMUM_ISSUES = 5
-
-# Regex used when looking for automatic linking in messages
-# regex101 of current regex https://regex101.com/r/V2ji8M/6
-AUTOMATIC_REGEX = re.compile(
- r"((?P<org>[a-zA-Z0-9][a-zA-Z0-9\-]{1,39})\/)?(?P<repo>[\w\-\.]{1,100})#(?P<number>[0-9]+)"
-)
-
-
-@dataclass
-class FoundIssue:
- """Dataclass representing an issue found by the regex."""
-
- organisation: Optional[str]
- repository: str
- number: str
-
- def __hash__(self) -> int:
- return hash((self.organisation, self.repository, self.number))
-
-
-@dataclass
-class FetchError:
- """Dataclass representing an error while fetching an issue."""
-
- return_code: int
- message: str
-
-
-@dataclass
-class IssueState:
- """Dataclass representing the state of an issue."""
-
- repository: str
- number: int
- url: str
- title: str
- emoji: str
-
-
-class Issues(commands.Cog):
- """Cog that allows users to retrieve issues from GitHub."""
-
- def __init__(self, bot: Bot):
- self.bot = bot
- self.repos = []
-
- @staticmethod
- def remove_codeblocks(message: str) -> str:
- """Remove any codeblock in a message."""
- return re.sub(CODE_BLOCK_RE, "", message)
-
- async def fetch_issues(
- self,
- number: int,
- repository: str,
- user: str
- ) -> Union[IssueState, FetchError]:
- """
- Retrieve an issue from a GitHub repository.
-
- Returns IssueState on success, FetchError on failure.
- """
- url = ISSUE_ENDPOINT.format(user=user, repository=repository, number=number)
- pulls_url = PR_ENDPOINT.format(user=user, repository=repository, number=number)
- log.trace(f"Querying GH issues API: {url}")
-
- async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r:
- json_data = await r.json()
-
- if r.status == 403:
- if r.headers.get("X-RateLimit-Remaining") == "0":
- log.info(f"Ratelimit reached while fetching {url}")
- return FetchError(403, "Ratelimit reached, please retry in a few minutes.")
- return FetchError(403, "Cannot access issue.")
- elif r.status in (404, 410):
- return FetchError(r.status, "Issue not found.")
- elif r.status != 200:
- return FetchError(r.status, "Error while fetching issue.")
-
- # The initial API request is made to the issues API endpoint, which will return information
- # if the issue or PR is present. However, the scope of information returned for PRs differs
- # from issues: if the 'issues' key is present in the response then we can pull the data we
- # need from the initial API call.
- if "issues" in json_data["html_url"]:
- if json_data.get("state") == "open":
- emoji = Emojis.issue_open
- else:
- emoji = Emojis.issue_closed
-
- # If the 'issues' key is not contained in the API response and there is no error code, then
- # we know that a PR has been requested and a call to the pulls API endpoint is necessary
- # to get the desired information for the PR.
- else:
- log.trace(f"PR provided, querying GH pulls API for additional information: {pulls_url}")
- async with self.bot.http_session.get(pulls_url) as p:
- pull_data = await p.json()
- if pull_data["draft"]:
- emoji = Emojis.pull_request_draft
- elif pull_data["state"] == "open":
- emoji = Emojis.pull_request_open
- # When 'merged_at' is not None, this means that the state of the PR is merged
- elif pull_data["merged_at"] is not None:
- emoji = Emojis.pull_request_merged
- else:
- emoji = Emojis.pull_request_closed
-
- issue_url = json_data.get("html_url")
-
- return IssueState(repository, number, issue_url, json_data.get("title", ""), emoji)
-
- @staticmethod
- def format_embed(
- results: list[Union[IssueState, FetchError]],
- user: str,
- repository: Optional[str] = None
- ) -> discord.Embed:
- """Take a list of IssueState or FetchError and format a Discord embed for them."""
- description_list = []
-
- for result in results:
- if isinstance(result, IssueState):
- description_list.append(f"{result.emoji} [{result.title}]({result.url})")
- elif isinstance(result, FetchError):
- description_list.append(f":x: [{result.return_code}] {result.message}")
-
- resp = discord.Embed(
- colour=Colours.bright_green,
- description="\n".join(description_list)
- )
-
- embed_url = f"https://github.com/{user}/{repository}" if repository else f"https://github.com/{user}"
- resp.set_author(name="GitHub", url=embed_url)
- return resp
-
- @whitelist_override(channels=WHITELISTED_CHANNELS, categories=WHITELISTED_CATEGORIES)
- @commands.command(aliases=("issues", "pr", "prs"))
- async def issue(
- self,
- ctx: commands.Context,
- numbers: commands.Greedy[int],
- repository: str = "sir-lancebot",
- user: str = "python-discord"
- ) -> None:
- """Command to retrieve issue(s) from a GitHub repository."""
- # Remove duplicates
- numbers = set(numbers)
-
- err_message = None
- if not numbers:
- err_message = "You must have at least one issue/PR!"
-
- elif len(numbers) > MAXIMUM_ISSUES:
- err_message = f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})"
-
- # If there's an error with command invocation then send an error embed
- if err_message is not None:
- err_embed = discord.Embed(
- title=random.choice(ERROR_REPLIES),
- color=Colours.soft_red,
- description=err_message
- )
- await ctx.send(embed=err_embed)
- await invoke_help_command(ctx)
- return
-
- results = [await self.fetch_issues(number, repository, user) for number in numbers]
- await ctx.send(embed=self.format_embed(results, user, repository))
-
- @commands.Cog.listener()
- async def on_message(self, message: discord.Message) -> None:
- """
- Automatic issue linking.
-
- Listener to retrieve issue(s) from a GitHub repository using automatic linking if matching <org>/<repo>#<issue>.
- """
- # Ignore bots
- if message.author.bot:
- return
-
- issues = [
- FoundIssue(*match.group("org", "repo", "number"))
- for match in AUTOMATIC_REGEX.finditer(self.remove_codeblocks(message.content))
- ]
- links = []
-
- if issues:
- # Block this from working in DMs
- if not message.guild:
- await message.channel.send(
- embed=discord.Embed(
- title=random.choice(NEGATIVE_REPLIES),
- description=(
- "You can't retrieve issues from DMs. "
- f"Try again in <#{Channels.community_bot_commands}>"
- ),
- colour=Colours.soft_red
- )
- )
- return
-
- log.trace(f"Found {issues = }")
- # Remove duplicates
- issues = set(issues)
-
- if len(issues) > MAXIMUM_ISSUES:
- embed = discord.Embed(
- title=random.choice(ERROR_REPLIES),
- color=Colours.soft_red,
- description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})"
- )
- await message.channel.send(embed=embed, delete_after=5)
- return
-
- for repo_issue in issues:
- result = await self.fetch_issues(
- int(repo_issue.number),
- repo_issue.repository,
- repo_issue.organisation or "python-discord"
- )
- if isinstance(result, IssueState):
- links.append(result)
-
- if not links:
- return
-
- resp = self.format_embed(links, "python-discord")
- await message.channel.send(embed=resp)
-
-
-def setup(bot: Bot) -> None:
- """Load the Issues cog."""
- bot.add_cog(Issues(bot))