diff options
author | 2022-02-16 23:52:55 +0000 | |
---|---|---|
committer | 2022-02-16 23:52:55 +0000 | |
commit | a921f487833de7cedb690bfa2468eb98d2392596 (patch) | |
tree | ed2a283206b122f31b0ff6e8166a6852bc6e88cd /bot/exts | |
parent | Merge remote-tracking branch 'origin/main' into main (diff) | |
parent | Add topics for `#programming-pedagogy` channel (diff) |
Merge remote-tracking branch 'origin/main' into main
Diffstat (limited to 'bot/exts')
-rw-r--r-- | bot/exts/events/advent_of_code/_cog.py | 59 | ||||
-rw-r--r-- | bot/exts/events/advent_of_code/_helpers.py | 15 | ||||
-rw-r--r-- | bot/exts/events/advent_of_code/views/dayandstarview.py | 8 | ||||
-rw-r--r-- | bot/exts/events/trivianight/__init__.py | 0 | ||||
-rw-r--r-- | bot/exts/events/trivianight/_game.py | 192 | ||||
-rw-r--r-- | bot/exts/events/trivianight/_questions.py | 179 | ||||
-rw-r--r-- | bot/exts/events/trivianight/_scoreboard.py | 186 | ||||
-rw-r--r-- | bot/exts/events/trivianight/trivianight.py | 328 | ||||
-rw-r--r-- | bot/exts/fun/madlibs.py | 148 | ||||
-rw-r--r-- | bot/exts/holidays/valentines/lovecalculator.py | 6 | ||||
-rw-r--r-- | bot/exts/utilities/epoch.py | 138 | ||||
-rw-r--r-- | bot/exts/utilities/githubinfo.py | 216 | ||||
-rw-r--r-- | bot/exts/utilities/issues.py | 277 |
13 files changed, 1409 insertions, 343 deletions
diff --git a/bot/exts/events/advent_of_code/_cog.py b/bot/exts/events/advent_of_code/_cog.py index a9625153..518841d4 100644 --- a/bot/exts/events/advent_of_code/_cog.py +++ b/bot/exts/events/advent_of_code/_cog.py @@ -11,12 +11,13 @@ from discord.ext import commands, tasks from bot.bot import Bot from bot.constants import ( - AdventOfCode as AocConfig, Channels, Client, Colours, Emojis, Month, Roles, WHITELISTED_CHANNELS + AdventOfCode as AocConfig, Channels, Client, Colours, Emojis, Month, PYTHON_PREFIX, Roles, WHITELISTED_CHANNELS ) from bot.exts.events.advent_of_code import _helpers from bot.exts.events.advent_of_code.views.dayandstarview import AoCDropdownView from bot.utils import members from bot.utils.decorators import InChannelCheckFailure, in_month, whitelist_override, with_role +from bot.utils.exceptions import MovedCommandError from bot.utils.extensions import invoke_help_command log = logging.getLogger(__name__) @@ -60,7 +61,8 @@ class AdventOfCode(commands.Cog): self.status_task.set_name("AoC Status Countdown") self.status_task.add_done_callback(_helpers.background_task_callback) - self.completionist_task.start() + # Don't start task while event isn't running + # self.completionist_task.start() @tasks.loop(minutes=10.0) async def completionist_task(self) -> None: @@ -95,7 +97,9 @@ class AdventOfCode(commands.Cog): # Only give the role to people who have completed all 50 stars continue - member_id = aoc_name_to_member_id.get(member_aoc_info["name"], None) + aoc_name = member_aoc_info["name"] or f"Anonymous #{member_aoc_info['id']}" + + member_id = aoc_name_to_member_id.get(aoc_name) if not member_id: log.debug(f"Could not find member_id for {member_aoc_info['name']}, not giving role.") continue @@ -137,45 +141,17 @@ class AdventOfCode(commands.Cog): @commands.guild_only() @adventofcode_group.command( name="subscribe", - aliases=("sub", "notifications", "notify", "notifs"), - brief="Notifications for new days" + aliases=("sub", "notifications", "notify", "notifs", "unsubscribe", "unsub"), + help=f"NOTE: This command has been moved to {PYTHON_PREFIX}subscribe", ) @whitelist_override(channels=AOC_WHITELIST) async def aoc_subscribe(self, ctx: commands.Context) -> None: - """Assign the role for notifications about new days being ready.""" - current_year = datetime.now().year - if current_year != AocConfig.year: - await ctx.send(f"You can't subscribe to {current_year}'s Advent of Code announcements yet!") - return - - role = ctx.guild.get_role(AocConfig.role_id) - unsubscribe_command = f"{ctx.prefix}{ctx.command.root_parent} unsubscribe" - - if role not in ctx.author.roles: - await ctx.author.add_roles(role) - await ctx.send( - "Okay! You have been __subscribed__ to notifications about new Advent of Code tasks. " - f"You can run `{unsubscribe_command}` to disable them again for you." - ) - else: - await ctx.send( - "Hey, you already are receiving notifications about new Advent of Code tasks. " - f"If you don't want them any more, run `{unsubscribe_command}` instead." - ) - - @in_month(Month.DECEMBER) - @commands.guild_only() - @adventofcode_group.command(name="unsubscribe", aliases=("unsub",), brief="Notifications for new days") - @whitelist_override(channels=AOC_WHITELIST) - async def aoc_unsubscribe(self, ctx: commands.Context) -> None: - """Remove the role for notifications about new days being ready.""" - role = ctx.guild.get_role(AocConfig.role_id) + """ + Deprecated role command. - if role in ctx.author.roles: - await ctx.author.remove_roles(role) - await ctx.send("Okay! You have been __unsubscribed__ from notifications about new Advent of Code tasks.") - else: - await ctx.send("Hey, you don't even get any notifications about new Advent of Code tasks currently anyway.") + This command has been moved to bot, and will be removed in the future. + """ + raise MovedCommandError(f"{PYTHON_PREFIX}subscribe") @adventofcode_group.command(name="countdown", aliases=("count", "c"), brief="Return time left until next day") @whitelist_override(channels=AOC_WHITELIST) @@ -214,9 +190,10 @@ class AdventOfCode(commands.Cog): async def join_leaderboard(self, ctx: commands.Context) -> None: """DM the user the information for joining the Python Discord leaderboard.""" current_date = datetime.now() - if ( - current_date.month not in (Month.NOVEMBER, Month.DECEMBER) and current_date.year != AocConfig.year or - current_date.month != Month.JANUARY and current_date.year != AocConfig.year + 1 + allowed_months = (Month.NOVEMBER.value, Month.DECEMBER.value) + if not ( + current_date.month in allowed_months and current_date.year == AocConfig.year or + current_date.month == Month.JANUARY.value and current_date.year == AocConfig.year + 1 ): # Only allow joining the leaderboard in the run up to AOC and the January following. await ctx.send(f"The Python Discord leaderboard for {current_date.year} is not yet available!") diff --git a/bot/exts/events/advent_of_code/_helpers.py b/bot/exts/events/advent_of_code/_helpers.py index 807cc275..6c004901 100644 --- a/bot/exts/events/advent_of_code/_helpers.py +++ b/bot/exts/events/advent_of_code/_helpers.py @@ -255,7 +255,7 @@ async def _fetch_leaderboard_data() -> dict[str, Any]: # Two attempts, one with the original session cookie and one with the fallback session for attempt in range(1, 3): - log.info(f"Attempting to fetch leaderboard `{leaderboard.id}` ({attempt}/2)") + log.debug(f"Attempting to fetch leaderboard `{leaderboard.id}` ({attempt}/2)") cookies = {"session": leaderboard.session} try: raw_data = await _leaderboard_request(leaderboard_url, leaderboard.id, cookies) @@ -332,7 +332,7 @@ async def fetch_leaderboard(invalidate_cache: bool = False, self_placement_name: number_of_participants = len(leaderboard) formatted_leaderboard = _format_leaderboard(leaderboard) full_leaderboard_url = await _upload_leaderboard(formatted_leaderboard) - leaderboard_fetched_at = datetime.datetime.utcnow().isoformat() + leaderboard_fetched_at = datetime.datetime.now(datetime.timezone.utc).isoformat() cached_leaderboard = { "placement_leaderboard": json.dumps(raw_leaderboard_data), @@ -368,11 +368,13 @@ def get_summary_embed(leaderboard: dict) -> discord.Embed: """Get an embed with the current summary stats of the leaderboard.""" leaderboard_url = leaderboard["full_leaderboard_url"] refresh_minutes = AdventOfCode.leaderboard_cache_expiry_seconds // 60 + refreshed_unix = int(datetime.datetime.fromisoformat(leaderboard["leaderboard_fetched_at"]).timestamp()) - aoc_embed = discord.Embed( - colour=Colours.soft_green, - timestamp=datetime.datetime.fromisoformat(leaderboard["leaderboard_fetched_at"]), - description=f"*The leaderboard is refreshed every {refresh_minutes} minutes.*" + aoc_embed = discord.Embed(colour=Colours.soft_green) + + aoc_embed.description = ( + f"The leaderboard is refreshed every {refresh_minutes} minutes.\n" + f"Last Updated: <t:{refreshed_unix}:t>" ) aoc_embed.add_field( name="Number of Participants", @@ -386,7 +388,6 @@ def get_summary_embed(leaderboard: dict) -> discord.Embed: inline=True, ) aoc_embed.set_author(name="Advent of Code", url=leaderboard_url) - aoc_embed.set_footer(text="Last Updated") aoc_embed.set_thumbnail(url=AOC_EMBED_THUMBNAIL) return aoc_embed diff --git a/bot/exts/events/advent_of_code/views/dayandstarview.py b/bot/exts/events/advent_of_code/views/dayandstarview.py index a0bfa316..5529c12b 100644 --- a/bot/exts/events/advent_of_code/views/dayandstarview.py +++ b/bot/exts/events/advent_of_code/views/dayandstarview.py @@ -42,7 +42,13 @@ class AoCDropdownView(discord.ui.View): async def interaction_check(self, interaction: discord.Interaction) -> bool: """Global check to ensure that the interacting user is the user who invoked the command originally.""" - return interaction.user == self.original_author + if interaction.user != self.original_author: + await interaction.response.send_message( + ":x: You can't interact with someone else's response. Please run the command yourself!", + ephemeral=True + ) + return False + return True @discord.ui.select( placeholder="Day", diff --git a/bot/exts/events/trivianight/__init__.py b/bot/exts/events/trivianight/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/bot/exts/events/trivianight/__init__.py diff --git a/bot/exts/events/trivianight/_game.py b/bot/exts/events/trivianight/_game.py new file mode 100644 index 00000000..8b012a17 --- /dev/null +++ b/bot/exts/events/trivianight/_game.py @@ -0,0 +1,192 @@ +import time +from random import randrange +from string import ascii_uppercase +from typing import Iterable, NamedTuple, Optional, TypedDict + +DEFAULT_QUESTION_POINTS = 10 +DEFAULT_QUESTION_TIME = 20 + + +class QuestionData(TypedDict): + """Representing the different 'keys' of the question taken from the JSON.""" + + number: str + description: str + answers: list[str] + correct: str + points: Optional[int] + time: Optional[int] + + +class UserGuess(NamedTuple): + """Represents the user's guess for a question.""" + + answer: str + editable: bool + elapsed: float + + +class QuestionClosed(RuntimeError): + """Exception raised when the question is not open for guesses anymore.""" + + +class AlreadyUpdated(RuntimeError): + """Exception raised when the user has already updated their guess once.""" + + +class AllQuestionsVisited(RuntimeError): + """Exception raised when all of the questions have been visited.""" + + +class Question: + """Interface for one question in a trivia night game.""" + + def __init__(self, data: QuestionData): + self._data = data + self._guesses: dict[int, UserGuess] = {} + self._started = None + + # These properties are mostly proxies to the underlying data: + + @property + def number(self) -> str: + """The number of the question.""" + return self._data["number"] + + @property + def description(self) -> str: + """The description of the question.""" + return self._data["description"] + + @property + def answers(self) -> list[tuple[str, str]]: + """ + The possible answers for this answer. + + This is a property that returns a list of letter, answer pairs. + """ + return [(ascii_uppercase[i], q) for (i, q) in enumerate(self._data["answers"])] + + @property + def correct(self) -> str: + """The correct answer for this question.""" + return self._data["correct"] + + @property + def max_points(self) -> int: + """The maximum points that can be awarded for this question.""" + return self._data.get("points") or DEFAULT_QUESTION_POINTS + + @property + def time(self) -> float: + """The time allowed to answer the question.""" + return self._data.get("time") or DEFAULT_QUESTION_TIME + + def start(self) -> float: + """Start the question and return the time it started.""" + self._started = time.perf_counter() + return self._started + + def _update_guess(self, user: int, answer: str) -> UserGuess: + """Update an already existing guess.""" + if self._started is None: + raise QuestionClosed("Question is not open for answers.") + + if self._guesses[user][1] is False: + raise AlreadyUpdated(f"User({user}) has already updated their guess once.") + + self._guesses[user] = (answer, False, time.perf_counter() - self._started) + return self._guesses[user] + + def guess(self, user: int, answer: str) -> UserGuess: + """Add a guess made by a user to the current question.""" + if user in self._guesses: + return self._update_guess(user, answer) + + if self._started is None: + raise QuestionClosed("Question is not open for answers.") + + self._guesses[user] = (answer, True, time.perf_counter() - self._started) + return self._guesses[user] + + def stop(self) -> dict[int, UserGuess]: + """Stop the question and return the guesses that were made.""" + guesses = self._guesses + + self._started = None + self._guesses = {} + + return guesses + + +class TriviaNightGame: + """Interface for managing a game of trivia night.""" + + def __init__(self, data: list[QuestionData]) -> None: + self._questions = [Question(q) for q in data] + # A copy of the questions to keep for `.trivianight list` + self._all_questions = list(self._questions) + self.current_question: Optional[Question] = None + self._points = {} + self._speed = {} + + def __iter__(self) -> Iterable[Question]: + return iter(self._questions) + + def next_question(self, number: str = None) -> Question: + """ + Consume one random question from the trivia night game. + + One question is randomly picked from the list of questions which is then removed and returned. + """ + if self.current_question is not None: + raise RuntimeError("Cannot call next_question() when there is a current question.") + + if number is not None: + try: + question = [q for q in self._all_questions if q.number == int(number)][0] + except IndexError: + raise ValueError(f"Question number {number} does not exist.") + elif len(self._questions) == 0: + raise AllQuestionsVisited("All of the questions have been visited.") + else: + question = self._questions.pop(randrange(len(self._questions))) + + self.current_question = question + return question + + def end_question(self) -> None: + """ + End the current question. + + This method should be called when the question has been answered, it must be called before + attempting to call `next_question()` again. + """ + if self.current_question is None: + raise RuntimeError("Cannot call end_question() when there is no current question.") + + self.current_question.stop() + self.current_question = None + + def list_questions(self) -> str: + """ + List all the questions. + + This method should be called when `.trivianight list` is called to display the following information: + - Question number + - Question description + - Visited/not visited + """ + question_list = [] + + visited = ":white_check_mark:" + not_visited = ":x:" + + for question in self._all_questions: + formatted_string = ( + f"**Q{question.number}** {not_visited if question in self._questions else visited}" + f"\n{question.description}\n\n" + ) + question_list.append(formatted_string.rstrip()) + + return question_list diff --git a/bot/exts/events/trivianight/_questions.py b/bot/exts/events/trivianight/_questions.py new file mode 100644 index 00000000..d6beced9 --- /dev/null +++ b/bot/exts/events/trivianight/_questions.py @@ -0,0 +1,179 @@ +from random import choice +from string import ascii_uppercase + +import discord +from discord import Embed, Interaction +from discord.ui import Button, View + +from bot.constants import Colours, NEGATIVE_REPLIES + +from ._game import AlreadyUpdated, Question, QuestionClosed +from ._scoreboard import Scoreboard + + +class AnswerButton(Button): + """Button subclass that's used to guess on a particular answer.""" + + def __init__(self, label: str, question: Question): + super().__init__(label=label, style=discord.ButtonStyle.green) + + self.question = question + + async def callback(self, interaction: Interaction) -> None: + """ + When a user interacts with the button, this will be called. + + Parameters: + - interaction: an instance of discord.Interaction representing the interaction between the user and the + button. + """ + try: + guess = self.question.guess(interaction.user.id, self.label) + except AlreadyUpdated: + await interaction.response.send_message( + embed=Embed( + title=choice(NEGATIVE_REPLIES), + description="You've already changed your answer more than once!", + color=Colours.soft_red + ), + ephemeral=True + ) + return + except QuestionClosed: + await interaction.response.send_message( + embed=Embed( + title=choice(NEGATIVE_REPLIES), + description="The question is no longer accepting guesses!", + color=Colours.soft_red + ), + ephemeral=True + ) + return + + if guess[1]: + await interaction.response.send_message( + embed=Embed( + title="Confirming that...", + description=f"You chose answer {self.label}.", + color=Colours.soft_green + ), + ephemeral=True + ) + else: + # guess[1] is False and they cannot change their answer again. Which + # indicates that they changed it this time around. + await interaction.response.send_message( + embed=Embed( + title="Confirming that...", + description=f"You changed your answer to answer {self.label}.", + color=Colours.soft_green + ), + ephemeral=True + ) + + +class QuestionView(View): + """View for one trivia night question.""" + + def __init__(self, question: Question) -> None: + super().__init__() + self.question = question + + for letter, _ in self.question.answers: + self.add_item(AnswerButton(letter, self.question)) + + @staticmethod + def unicodeify(text: str) -> str: + """ + Takes `text` and adds zero-width spaces to prevent copy and pasting the question. + + Parameters: + - text: A string that represents the question description to 'unicodeify' + """ + return "".join( + f"{letter}\u200b" if letter not in ('\n', '\t', '`', 'p', 'y') else letter + for idx, letter in enumerate(text) + ) + + def create_embed(self) -> Embed: + """Helper function to create the embed for the current question.""" + question_embed = Embed( + title=f"Question {self.question.number}", + description=self.unicodeify(self.question.description), + color=Colours.python_yellow + ) + + for label, answer in self.question.answers: + question_embed.add_field(name=f"Answer {label}", value=answer, inline=False) + + return question_embed + + def end_question(self, scoreboard: Scoreboard) -> Embed: + """ + Ends the question and displays the statistics on who got the question correct, awards points, etc. + + Returns: + An embed displaying the correct answers and the % of people that chose each answer. + """ + guesses = self.question.stop() + + labels = ascii_uppercase[:len(self.question.answers)] + + answer_embed = Embed( + title=f"The correct answer for Question {self.question.number} was...", + description=self.question.correct + ) + + if len(guesses) != 0: + answers_chosen = { + answer_choice: len( + tuple(filter(lambda x: x[0] == answer_choice, guesses.values())) + ) + for answer_choice in labels + } + + answers_chosen = dict( + sorted(list(answers_chosen.items()), key=lambda item: item[1], reverse=True) + ) + + for answer, people_answered in answers_chosen.items(): + is_correct_answer = dict(self.question.answers)[answer[0]] == self.question.correct + + # Setting the color of answer_embed to the % of people that got it correct via the mapping + if is_correct_answer: + # Maps the % of people who got it right to a color, from a range of red to green + percentage_to_color = [0xFC94A1, 0xFFCCCB, 0xCDFFCC, 0xB0F5AB, 0xB0F5AB] + answer_embed.color = percentage_to_color[round(people_answered / len(guesses) * 100) // 25] + + field_title = ( + (":white_check_mark: " if is_correct_answer else "") + + f"{people_answered} players ({people_answered / len(guesses) * 100:.1f}%) chose" + ) + + # The `ord` function is used here to change the letter to its corresponding position + answer_embed.add_field( + name=field_title, + value=self.question.answers[ord(answer) - 65][1], + inline=False + ) + + # Assign points to users + for user_id, answer in guesses.items(): + if dict(self.question.answers)[answer[0]] == self.question.correct: + scoreboard.assign_points( + int(user_id), + points=(1 - (answer[-1] / self.question.time) / 2) * self.question.max_points, + speed=answer[-1] + ) + elif answer[-1] <= 2: + scoreboard.assign_points( + int(user_id), + points=-(1 - (answer[-1] / self.question.time) / 2) * self.question.max_points + ) + else: + scoreboard.assign_points( + int(user_id), + points=0 + ) + + return answer_embed diff --git a/bot/exts/events/trivianight/_scoreboard.py b/bot/exts/events/trivianight/_scoreboard.py new file mode 100644 index 00000000..a5a5fcac --- /dev/null +++ b/bot/exts/events/trivianight/_scoreboard.py @@ -0,0 +1,186 @@ +from random import choice + +import discord.ui +from discord import ButtonStyle, Embed, Interaction, Member +from discord.ui import Button, View + +from bot.bot import Bot +from bot.constants import Colours, NEGATIVE_REPLIES + + +class ScoreboardView(View): + """View for the scoreboard.""" + + def __init__(self, bot: Bot): + super().__init__() + self.bot = bot + + @staticmethod + def _int_to_ordinal(number: int) -> str: + """ + Converts an integer into an ordinal number, i.e. 1 to 1st. + + Parameters: + - number: an integer representing the number to convert to an ordinal number. + """ + suffix = ["th", "st", "nd", "rd", "th"][min(number % 10, 4)] + if (number % 100) in {11, 12, 13}: + suffix = "th" + + return str(number) + suffix + + async def create_main_leaderboard(self) -> Embed: + """ + Helper function that iterates through `self.points` to generate the main leaderboard embed. + + The main leaderboard would be formatted like the following: + **1**. @mention of the user (# of points) + along with the 29 other users who made it onto the leaderboard. + """ + formatted_string = "" + + for current_placement, (user, points) in enumerate(self.points.items()): + if current_placement + 1 > 30: + break + + user = await self.bot.fetch_user(int(user)) + formatted_string += f"**{current_placement + 1}.** {user.mention} " + formatted_string += f"({points:.1f} pts)\n" + if (current_placement + 1) % 10 == 0: + formatted_string += "⎯⎯⎯⎯⎯⎯⎯⎯\n" + + main_embed = Embed( + title="Winners of the Trivia Night", + description=formatted_string, + color=Colours.python_blue, + ) + + return main_embed + + async def _create_speed_embed(self) -> Embed: + """ + Helper function that iterates through `self.speed` to generate a leaderboard embed. + + The speed leaderboard would be formatted like the following: + **1**. @mention of the user ([average speed as a float with the precision of one decimal point]s) + along with the 29 other users who made it onto the leaderboard. + """ + formatted_string = "" + + for current_placement, (user, time_taken) in enumerate(self.speed.items()): + if current_placement + 1 > 30: + break + + user = await self.bot.fetch_user(int(user)) + formatted_string += f"**{current_placement + 1}.** {user.mention} " + formatted_string += f"({(time_taken[-1] / time_taken[0]):.1f}s)\n" + if (current_placement + 1) % 10 == 0: + formatted_string += "⎯⎯⎯⎯⎯⎯⎯⎯\n" + + speed_embed = Embed( + title="Average time taken to answer a question", + description=formatted_string, + color=Colours.python_blue + ) + return speed_embed + + def _get_rank(self, member: Member) -> Embed: + """ + Gets the member's rank for the points leaderboard and speed leaderboard. + + Parameters: + - member: An instance of discord.Member representing the person who is trying to get their rank. + """ + rank_embed = Embed(title=f"Ranks for {member.display_name}", color=Colours.python_blue) + # These are stored as strings so that the last digit can be determined to choose the suffix + try: + points_rank = str(list(self.points).index(member.id) + 1) + speed_rank = str(list(self.speed).index(member.id) + 1) + except ValueError: + return Embed( + title=choice(NEGATIVE_REPLIES), + description="It looks like you didn't participate in the Trivia Night event!", + color=Colours.soft_red + ) + + rank_embed.add_field( + name="Total Points", + value=( + f"You got {self._int_to_ordinal(int(points_rank))} place" + f" with {self.points[member.id]:.1f} points." + ), + inline=False + ) + + rank_embed.add_field( + name="Average Speed", + value=( + f"You got {self._int_to_ordinal(int(speed_rank))} place" + f" with a time of {(self.speed[member.id][1] / self.speed[member.id][0]):.1f} seconds." + ), + inline=False + ) + return rank_embed + + @discord.ui.button(label="Scoreboard for Speed", style=ButtonStyle.green) + async def speed_leaderboard(self, button: Button, interaction: Interaction) -> None: + """ + Send an ephemeral message with the speed leaderboard embed. + + Parameters: + - button: The discord.ui.Button instance representing the `Speed Leaderboard` button. + - interaction: The discord.Interaction instance containing information on the interaction between the user + and the button. + """ + await interaction.response.send_message(embed=await self._create_speed_embed(), ephemeral=True) + + @discord.ui.button(label="What's my rank?", style=ButtonStyle.blurple) + async def rank_button(self, button: Button, interaction: Interaction) -> None: + """ + Send an ephemeral message with the user's rank for the overall points/average speed. + + Parameters: + - button: The discord.ui.Button instance representing the `What's my rank?` button. + - interaction: The discord.Interaction instance containing information on the interaction between the user + and the button. + """ + await interaction.response.send_message(embed=self._get_rank(interaction.user), ephemeral=True) + + +class Scoreboard: + """Class for the scoreboard for the Trivia Night event.""" + + def __init__(self, bot: Bot): + self._bot = bot + self._points = {} + self._speed = {} + + def assign_points(self, user_id: int, *, points: int = None, speed: float = None) -> None: + """ + Assign points or deduct points to/from a certain user. + + This method should be called once the question has finished and all answers have been registered. + """ + if points is not None and user_id not in self._points.keys(): + self._points[user_id] = points + elif points is not None: + self._points[user_id] += points + + if speed is not None and user_id not in self._speed.keys(): + self._speed[user_id] = [1, speed] + elif speed is not None: + self._speed[user_id] = [ + self._speed[user_id][0] + 1, self._speed[user_id][1] + speed + ] + + async def display(self, speed_leaderboard: bool = False) -> tuple[Embed, View]: + """Returns the embed of the main leaderboard along with the ScoreboardView.""" + view = ScoreboardView(self._bot) + + view.points = dict(sorted(self._points.items(), key=lambda item: item[-1], reverse=True)) + view.speed = dict(sorted(self._speed.items(), key=lambda item: item[-1][1] / item[-1][0])) + + return ( + await view.create_main_leaderboard(), + view if not speed_leaderboard else await view._create_speed_embed() + ) diff --git a/bot/exts/events/trivianight/trivianight.py b/bot/exts/events/trivianight/trivianight.py new file mode 100644 index 00000000..18d8327a --- /dev/null +++ b/bot/exts/events/trivianight/trivianight.py @@ -0,0 +1,328 @@ +import asyncio +from json import JSONDecodeError, loads +from random import choice +from typing import Optional + +from discord import Embed +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Colours, NEGATIVE_REPLIES, POSITIVE_REPLIES, Roles +from bot.utils.pagination import LinePaginator + +from ._game import AllQuestionsVisited, TriviaNightGame +from ._questions import QuestionView +from ._scoreboard import Scoreboard + +# The ID you see below are the Events Lead role ID and the Event Runner Role ID +TRIVIA_NIGHT_ROLES = (Roles.admins, 778361735739998228, 940911658799333408) + + +class TriviaNightCog(commands.Cog): + """Cog for the Python Trivia Night event.""" + + def __init__(self, bot: Bot): + self.bot = bot + self.game: Optional[TriviaNightGame] = None + self.scoreboard: Optional[Scoreboard] = None + self.question_closed: asyncio.Event = None + + @commands.group(aliases=["tn"], invoke_without_command=True) + async def trivianight(self, ctx: commands.Context) -> None: + """ + The command group for the Python Discord Trivia Night. + + If invoked without a subcommand (i.e. simply .trivianight), it will explain what the Trivia Night event is. + """ + cog_description = Embed( + title="What is .trivianight?", + description=( + "This 'cog' is for the Python Discord's TriviaNight (date tentative)! Compete against other" + " players in a trivia about Python!" + ), + color=Colours.soft_green + ) + await ctx.send(embed=cog_description) + + @trivianight.command() + @commands.has_any_role(*TRIVIA_NIGHT_ROLES) + async def load(self, ctx: commands.Context, *, to_load: Optional[str]) -> None: + """ + Loads a JSON file from the provided attachment or argument. + + The JSON provided is formatted where it is a list of dictionaries, each dictionary containing the keys below: + - number: int (represents the current question #) + - description: str (represents the question itself) + - answers: list[str] (represents the different answers possible, must be a length of 4) + - correct: str (represents the correct answer in terms of what the correct answer is in `answers` + - time: Optional[int] (represents the timer for the question and how long it should run, default is 10) + - points: Optional[int] (represents how many points are awarded for each question, default is 10) + + The load command accepts three different ways of loading in a JSON: + - an attachment of the JSON file + - a message link to the attachment/JSON + - reading the JSON itself via a codeblock or plain text + """ + if self.game is not None: + await ctx.send(embed=Embed( + title=choice(NEGATIVE_REPLIES), + description="There is already a trivia night running!", + color=Colours.soft_red + )) + return + + if ctx.message.attachments: + json_text = (await ctx.message.attachments[0].read()).decode("utf8") + elif not to_load: + raise commands.BadArgument("You didn't attach an attachment nor link a message!") + elif ( + to_load.startswith("https://discord.com/channels") + or to_load.startswith("https://discordapp.com/channels") + ): + channel_id, message_id = to_load.split("/")[-2:] + channel = await ctx.guild.fetch_channel(int(channel_id)) + message = await channel.fetch_message(int(message_id)) + if message.attachments: + json_text = (await message.attachments[0].read()).decode("utf8") + else: + json_text = message.content.replace("```", "").replace("json", "").replace("\n", "") + else: + json_text = to_load.replace("```", "").replace("json", "").replace("\n", "") + + try: + serialized_json = loads(json_text) + except JSONDecodeError as error: + raise commands.BadArgument(f"Looks like something went wrong:\n{str(error)}") + + self.game = TriviaNightGame(serialized_json) + self.question_closed = asyncio.Event() + + success_embed = Embed( + title=choice(POSITIVE_REPLIES), + description="The JSON was loaded successfully!", + color=Colours.soft_green + ) + + self.scoreboard = Scoreboard(self.bot) + + await ctx.send(embed=success_embed) + + @trivianight.command(aliases=('next',)) + @commands.has_any_role(*TRIVIA_NIGHT_ROLES) + async def question(self, ctx: commands.Context, question_number: str = None) -> None: + """ + Gets a random question from the unanswered question list and lets the user(s) choose the answer. + + This command will continuously count down until the time limit of the question is exhausted. + However, if `.trivianight stop` is invoked, the counting down is interrupted to show the final results. + """ + if self.game is None: + await ctx.send(embed=Embed( + title=choice(NEGATIVE_REPLIES), + description="There is no trivia night running!", + color=Colours.soft_red + )) + return + + if self.game.current_question is not None: + error_embed = Embed( + title=choice(NEGATIVE_REPLIES), + description="There is already an ongoing question!", + color=Colours.soft_red + ) + await ctx.send(embed=error_embed) + return + + try: + next_question = self.game.next_question(question_number) + except AllQuestionsVisited: + error_embed = Embed( + title=choice(NEGATIVE_REPLIES), + description="All of the questions have been used.", + color=Colours.soft_red + ) + await ctx.send(embed=error_embed) + return + + await ctx.send("Next question in 3 seconds! Get ready...") + await asyncio.sleep(3) + + question_view = QuestionView(next_question) + question_embed = question_view.create_embed() + + next_question.start() + message = await ctx.send(embed=question_embed, view=question_view) + + # Exponentially sleep less and less until the time limit is reached + percentage = 1 + while True: + percentage *= 0.5 + duration = next_question.time * percentage + + await asyncio.wait([self.question_closed.wait()], timeout=duration) + + if self.question_closed.is_set(): + await ctx.send(embed=question_view.end_question(self.scoreboard)) + await message.edit(embed=question_embed, view=None) + + self.game.end_question() + self.question_closed.clear() + return + + if int(duration) > 1: + # It is quite ugly to display decimals, the delay for requests to reach Discord + # cause sub-second accuracy to be quite pointless. + await ctx.send(f"{int(duration)}s remaining...") + else: + # Since each time we divide the percentage by 2 and sleep one half of the halves (then sleep a + # half, of that half) we must sleep both halves at the end. + await asyncio.wait([self.question_closed.wait()], timeout=duration) + if self.question_closed.is_set(): + await ctx.send(embed=question_view.end_question(self.scoreboard)) + await message.edit(embed=question_embed, view=None) + + self.game.end_question() + self.question_closed.clear() + return + break + + await ctx.send(embed=question_view.end_question(self.scoreboard)) + await message.edit(embed=question_embed, view=None) + + self.game.end_question() + + @trivianight.command() + @commands.has_any_role(*TRIVIA_NIGHT_ROLES) + async def list(self, ctx: commands.Context) -> None: + """ + Display all the questions left in the question bank. + + Questions are displayed in the following format: + Q(number): Question description | :white_check_mark: if the question was used otherwise :x:. + """ + if self.game is None: + await ctx.send(embed=Embed( + title=choice(NEGATIVE_REPLIES), + description="There is no trivia night running!", + color=Colours.soft_red + )) + return + + question_list = self.game.list_questions() + + list_embed = Embed(title="All Trivia Night Questions") + + if len(question_list) == 1: + list_embed.description = question_list[0] + await ctx.send(embed=list_embed) + else: + await LinePaginator.paginate( + question_list, + ctx, + list_embed + ) + + @trivianight.command() + @commands.has_any_role(*TRIVIA_NIGHT_ROLES) + async def stop(self, ctx: commands.Context) -> None: + """ + End the ongoing question to show the correct question. + + This command should be used if the question should be ended early or if the time limit fails + """ + if self.game is None: + await ctx.send(embed=Embed( + title=choice(NEGATIVE_REPLIES), + description="There is no trivia night running!", + color=Colours.soft_red + )) + return + + if self.game.current_question is None: + error_embed = Embed( + title=choice(NEGATIVE_REPLIES), + description="There is no ongoing question!", + color=Colours.soft_red + ) + await ctx.send(embed=error_embed) + return + + self.question_closed.set() + + @trivianight.command() + @commands.has_any_role(*TRIVIA_NIGHT_ROLES) + async def end(self, ctx: commands.Context) -> None: + """ + Displays the scoreboard view. + + The scoreboard view consists of the two scoreboards with the 30 players who got the highest points and the + 30 players who had the fastest average response time to a question where they got the question right. + + The scoreboard view also has a button where the user can see their own rank, points and average speed if they + didn't make it onto the leaderboard. + """ + if self.game is None: + await ctx.send(embed=Embed( + title=choice(NEGATIVE_REPLIES), + description="There is no trivia night running!", + color=Colours.soft_red + )) + return + + if self.game.current_question is not None: + error_embed = Embed( + title=choice(NEGATIVE_REPLIES), + description="You can't end the event while a question is ongoing!", + color=Colours.soft_red + ) + await ctx.send(embed=error_embed) + return + + scoreboard_embed, scoreboard_view = await self.scoreboard.display() + await ctx.send(embed=scoreboard_embed, view=scoreboard_view) + + @trivianight.command() + @commands.has_any_role(*TRIVIA_NIGHT_ROLES) + async def scoreboard(self, ctx: commands.Context) -> None: + """ + Displays the scoreboard. + + The scoreboard consists of the two scoreboards with the 30 players who got the highest points and the + 30 players who had the fastest average response time to a question where they got the question right. + """ + if self.game is None: + await ctx.send(embed=Embed( + title=choice(NEGATIVE_REPLIES), + description="There is no trivia night running!", + color=Colours.soft_red + )) + return + + if self.game.current_question is not None: + error_embed = Embed( + title=choice(NEGATIVE_REPLIES), + description="You can't end the event while a question is ongoing!", + color=Colours.soft_red + ) + await ctx.send(embed=error_embed) + return + + scoreboard_embed, speed_scoreboard = await self.scoreboard.display(speed_leaderboard=True) + await ctx.send(embeds=(scoreboard_embed, speed_scoreboard)) + + @trivianight.command() + @commands.has_any_role(*TRIVIA_NIGHT_ROLES) + async def end_game(self, ctx: commands.Context) -> None: + """Ends the ongoing game.""" + self.game = None + + await ctx.send(embed=Embed( + title=choice(POSITIVE_REPLIES), + description="The game has been stopped.", + color=Colours.soft_green + )) + + +def setup(bot: Bot) -> None: + """Load the TriviaNight cog.""" + bot.add_cog(TriviaNightCog(bot)) diff --git a/bot/exts/fun/madlibs.py b/bot/exts/fun/madlibs.py new file mode 100644 index 00000000..21708e53 --- /dev/null +++ b/bot/exts/fun/madlibs.py @@ -0,0 +1,148 @@ +import json +from asyncio import TimeoutError +from pathlib import Path +from random import choice +from typing import TypedDict + +import discord +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Colours, NEGATIVE_REPLIES + +TIMEOUT = 60.0 + + +class MadlibsTemplate(TypedDict): + """Structure of a template in the madlibs JSON file.""" + + title: str + blanks: list[str] + value: list[str] + + +class Madlibs(commands.Cog): + """Cog for the Madlibs game.""" + + def __init__(self, bot: Bot): + self.bot = bot + self.templates = self._load_templates() + self.edited_content = {} + self.checks = set() + + @staticmethod + def _load_templates() -> list[MadlibsTemplate]: + madlibs_stories = Path("bot/resources/fun/madlibs_templates.json") + + with open(madlibs_stories) as file: + return json.load(file) + + @staticmethod + def madlibs_embed(part_of_speech: str, number_of_inputs: int) -> discord.Embed: + """Method to generate an embed with the game information.""" + madlibs_embed = discord.Embed(title="Madlibs", color=Colours.python_blue) + + madlibs_embed.add_field( + name="Enter a word that fits the given part of speech!", + value=f"Part of speech: {part_of_speech}\n\nMake sure not to spam, or you may get auto-muted!" + ) + + madlibs_embed.set_footer(text=f"Inputs remaining: {number_of_inputs}") + + return madlibs_embed + + @commands.Cog.listener() + async def on_message_edit(self, _: discord.Message, after: discord.Message) -> None: + """A listener that checks for message edits from the user.""" + for check in self.checks: + if check(after): + break + else: + return + + self.edited_content[after.id] = after.content + + @commands.command() + @commands.max_concurrency(1, per=commands.BucketType.user) + async def madlibs(self, ctx: commands.Context) -> None: + """ + Play Madlibs with the bot! + + Madlibs is a game where the player is asked to enter a word that + fits a random part of speech (e.g. noun, adjective, verb, plural noun, etc.) + a random amount of times, depending on the story chosen by the bot at the beginning. + """ + random_template = choice(self.templates) + + def author_check(message: discord.Message) -> bool: + return message.channel.id == ctx.channel.id and message.author.id == ctx.author.id + + self.checks.add(author_check) + + loading_embed = discord.Embed( + title="Madlibs", description="Loading your Madlibs game...", color=Colours.python_blue + ) + original_message = await ctx.send(embed=loading_embed) + + submitted_words = {} + + for i, part_of_speech in enumerate(random_template["blanks"]): + inputs_left = len(random_template["blanks"]) - i + + madlibs_embed = self.madlibs_embed(part_of_speech, inputs_left) + await original_message.edit(embed=madlibs_embed) + + try: + message = await self.bot.wait_for(event="message", check=author_check, timeout=TIMEOUT) + except TimeoutError: + timeout_embed = discord.Embed( + title=choice(NEGATIVE_REPLIES), + description="Uh oh! You took too long to respond!", + color=Colours.soft_red + ) + + await ctx.send(ctx.author.mention, embed=timeout_embed) + + for msg_id in submitted_words: + self.edited_content.pop(msg_id, submitted_words[msg_id]) + + self.checks.remove(author_check) + + return + + submitted_words[message.id] = message.content + + blanks = [self.edited_content.pop(msg_id, submitted_words[msg_id]) for msg_id in submitted_words] + + self.checks.remove(author_check) + + story = [] + for value, blank in zip(random_template["value"], blanks): + story.append(f"{value}__{blank}__") + + # In each story template, there is always one more "value" + # (fragment from the story) than there are blanks (words that the player enters) + # so we need to compensate by appending the last line of the story again. + story.append(random_template["value"][-1]) + + story_embed = discord.Embed( + title=random_template["title"], + description="".join(story), + color=Colours.bright_green + ) + + story_embed.set_footer(text=f"Generated for {ctx.author}", icon_url=ctx.author.display_avatar.url) + + await ctx.send(embed=story_embed) + + @madlibs.error + async def handle_madlibs_error(self, ctx: commands.Context, error: commands.CommandError) -> None: + """Error handler for the Madlibs command.""" + if isinstance(error, commands.MaxConcurrencyReached): + await ctx.send("You are already playing Madlibs!") + error.handled = True + + +def setup(bot: Bot) -> None: + """Load the Madlibs cog.""" + bot.add_cog(Madlibs(bot)) diff --git a/bot/exts/holidays/valentines/lovecalculator.py b/bot/exts/holidays/valentines/lovecalculator.py index a53014e5..99fba150 100644 --- a/bot/exts/holidays/valentines/lovecalculator.py +++ b/bot/exts/holidays/valentines/lovecalculator.py @@ -12,7 +12,7 @@ from discord.ext import commands from discord.ext.commands import BadArgument, Cog, clean_content from bot.bot import Bot -from bot.constants import Channels, Client, Lovefest, Month +from bot.constants import Channels, Lovefest, Month, PYTHON_PREFIX from bot.utils.decorators import in_month log = logging.getLogger(__name__) @@ -51,7 +51,7 @@ class LoveCalculator(Cog): raise BadArgument( "This command can only be ran against members with the lovefest role! " "This role be can assigned by running " - f"`{Client.prefix}lovefest sub` in <#{Channels.community_bot_commands}>." + f"`{PYTHON_PREFIX}subscribe` in <#{Channels.bot}>." ) if whom is None: @@ -90,7 +90,7 @@ class LoveCalculator(Cog): name="A letter from Dr. Love:", value=data["text"] ) - embed.set_footer(text=f"You can unsubscribe from lovefest by using {Client.prefix}lovefest unsub") + embed.set_footer(text=f"You can unsubscribe from lovefest by using {PYTHON_PREFIX}subscribe.") await ctx.send(embed=embed) diff --git a/bot/exts/utilities/epoch.py b/bot/exts/utilities/epoch.py new file mode 100644 index 00000000..42312dd1 --- /dev/null +++ b/bot/exts/utilities/epoch.py @@ -0,0 +1,138 @@ +from typing import Optional, Union + +import arrow +import discord +from dateutil import parser +from discord.ext import commands + +from bot.bot import Bot +from bot.utils.extensions import invoke_help_command + +# https://discord.com/developers/docs/reference#message-formatting-timestamp-styles +STYLES = { + "Epoch": ("",), + "Short Time": ("t", "h:mm A",), + "Long Time": ("T", "h:mm:ss A"), + "Short Date": ("d", "MM/DD/YYYY"), + "Long Date": ("D", "MMMM D, YYYY"), + "Short Date/Time": ("f", "MMMM D, YYYY h:mm A"), + "Long Date/Time": ("F", "dddd, MMMM D, YYYY h:mm A"), + "Relative Time": ("R",) +} +DROPDOWN_TIMEOUT = 60 + + +class DateString(commands.Converter): + """Convert a relative or absolute date/time string to an arrow.Arrow object.""" + + async def convert(self, ctx: commands.Context, argument: str) -> Union[arrow.Arrow, Optional[tuple]]: + """ + Convert a relative or absolute date/time string to an arrow.Arrow object. + + Try to interpret the date string as a relative time. If conversion fails, try to interpret it as an absolute + time. Tokens that are not recognised are returned along with the part of the string that was successfully + converted to an arrow object. If the date string cannot be parsed, BadArgument is raised. + """ + try: + return arrow.utcnow().dehumanize(argument) + except (ValueError, OverflowError): + try: + dt, ignored_tokens = parser.parse(argument, fuzzy_with_tokens=True) + except parser.ParserError: + raise commands.BadArgument(f"`{argument}` Could not be parsed to a relative or absolute date.") + except OverflowError: + raise commands.BadArgument(f"`{argument}` Results in a date outside of the supported range.") + return arrow.get(dt), ignored_tokens + + +class Epoch(commands.Cog): + """Convert an entered time and date to a unix timestamp.""" + + @commands.command(name="epoch") + async def epoch(self, ctx: commands.Context, *, date_time: DateString = None) -> None: + """ + Convert an entered date/time string to the equivalent epoch. + + **Relative time** + Must begin with `in...` or end with `...ago`. + Accepted units: "seconds", "minutes", "hours", "days", "weeks", "months", "years". + eg `.epoch in a month 4 days and 2 hours` + + **Absolute time** + eg `.epoch 2022/6/15 16:43 -04:00` + Absolute times must be entered in descending orders of magnitude. + If AM or PM is left unspecified, the 24-hour clock is assumed. + Timezones are optional, and will default to UTC. The following timezone formats are accepted: + Z (UTC) + ±HH:MM + ±HHMM + ±HH + + Times in the dropdown are shown in UTC + """ + if not date_time: + await invoke_help_command(ctx) + return + + if isinstance(date_time, tuple): + # Remove empty strings. Strip extra whitespace from the remaining items + ignored_tokens = list(map(str.strip, filter(str.strip, date_time[1]))) + date_time = date_time[0] + if ignored_tokens: + await ctx.send(f"Could not parse the following token(s): `{', '.join(ignored_tokens)}`") + await ctx.send(f"Date and time parsed as: `{date_time.format(arrow.FORMAT_RSS)}`") + + epoch = int(date_time.timestamp()) + view = TimestampMenuView(ctx, self._format_dates(date_time), epoch) + original = await ctx.send(f"`{epoch}`", view=view) + await view.wait() # wait until expiration before removing the dropdown + try: + await original.edit(view=None) + except discord.NotFound: # disregard the error message if the message is deleled + pass + + @staticmethod + def _format_dates(date: arrow.Arrow) -> list[str]: + """ + Return a list of date strings formatted according to the discord timestamp styles. + + These are used in the description of each style in the dropdown + """ + date = date.to('utc') + formatted = [str(int(date.timestamp()))] + formatted += [date.format(format[1]) for format in list(STYLES.values())[1:7]] + formatted.append(date.humanize()) + return formatted + + +class TimestampMenuView(discord.ui.View): + """View for the epoch command which contains a single `discord.ui.Select` dropdown component.""" + + def __init__(self, ctx: commands.Context, formatted_times: list[str], epoch: int): + super().__init__(timeout=DROPDOWN_TIMEOUT) + self.ctx = ctx + self.epoch = epoch + self.dropdown: discord.ui.Select = self.children[0] + for label, date_time in zip(STYLES.keys(), formatted_times): + self.dropdown.add_option(label=label, description=date_time) + + @discord.ui.select(placeholder="Select the format of your timestamp") + async def select_format(self, _: discord.ui.Select, interaction: discord.Interaction) -> discord.Message: + """Drop down menu which contains a list of formats which discord timestamps can take.""" + selected = interaction.data["values"][0] + if selected == "Epoch": + return await interaction.response.edit_message(content=f"`{self.epoch}`") + return await interaction.response.edit_message(content=f"`<t:{self.epoch}:{STYLES[selected][0]}>`") + + async def interaction_check(self, interaction: discord.Interaction) -> bool: + """Check to ensure that the interacting user is the user who invoked the command.""" + if interaction.user != self.ctx.author: + embed = discord.Embed(description="Sorry, but this dropdown menu can only be used by the original author.") + await interaction.response.send_message(embed=embed, ephemeral=True) + return False + return True + + +def setup(bot: Bot) -> None: + """Load the Epoch cog.""" + bot.add_cog(Epoch()) diff --git a/bot/exts/utilities/githubinfo.py b/bot/exts/utilities/githubinfo.py index 539e388b..963f54e5 100644 --- a/bot/exts/utilities/githubinfo.py +++ b/bot/exts/utilities/githubinfo.py @@ -1,30 +1,165 @@ import logging import random +import re +import typing as t +from dataclasses import dataclass from datetime import datetime -from urllib.parse import quote, quote_plus +from urllib.parse import quote import discord +from aiohttp import ClientResponse from discord.ext import commands from bot.bot import Bot -from bot.constants import Colours, NEGATIVE_REPLIES +from bot.constants import Colours, ERROR_REPLIES, Emojis, NEGATIVE_REPLIES, Tokens from bot.exts.core.extensions import invoke_help_command log = logging.getLogger(__name__) GITHUB_API_URL = "https://api.github.com" +REQUEST_HEADERS = { + "Accept": "application/vnd.github.v3+json" +} + +REPOSITORY_ENDPOINT = "https://api.github.com/orgs/{org}/repos?per_page=100&type=public" +ISSUE_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/issues/{number}" +PR_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/pulls/{number}" + +if Tokens.github: + REQUEST_HEADERS["Authorization"] = f"token {Tokens.github}" + +CODE_BLOCK_RE = re.compile( + r"^`([^`\n]+)`" # Inline codeblock + r"|```(.+?)```", # Multiline codeblock + re.DOTALL | re.MULTILINE +) + +# Maximum number of issues in one message +MAXIMUM_ISSUES = 5 + +# Regex used when looking for automatic linking in messages +# regex101 of current regex https://regex101.com/r/V2ji8M/6 +AUTOMATIC_REGEX = re.compile( + r"((?P<org>[a-zA-Z0-9][a-zA-Z0-9\-]{1,39})\/)?(?P<repo>[\w\-\.]{1,100})#(?P<number>[0-9]+)" +) + + +@dataclass(eq=True, frozen=True) +class FoundIssue: + """Dataclass representing an issue found by the regex.""" + + organisation: t.Optional[str] + repository: str + number: str + + +@dataclass(eq=True, frozen=True) +class FetchError: + """Dataclass representing an error while fetching an issue.""" + + return_code: int + message: str + + +@dataclass(eq=True, frozen=True) +class IssueState: + """Dataclass representing the state of an issue.""" + + repository: str + number: int + url: str + title: str + emoji: str + class GithubInfo(commands.Cog): - """Fetches info from GitHub.""" + """A Cog that fetches info from GitHub.""" def __init__(self, bot: Bot): self.bot = bot + self.repos = [] + + @staticmethod + def remove_codeblocks(message: str) -> str: + """Remove any codeblock in a message.""" + return CODE_BLOCK_RE.sub("", message) + + async def fetch_issue( + self, + number: int, + repository: str, + user: str + ) -> t.Union[IssueState, FetchError]: + """ + Retrieve an issue from a GitHub repository. + + Returns IssueState on success, FetchError on failure. + """ + url = ISSUE_ENDPOINT.format(user=user, repository=repository, number=number) + pulls_url = PR_ENDPOINT.format(user=user, repository=repository, number=number) + + json_data, r = await self.fetch_data(url) + + if r.status == 403: + if r.headers.get("X-RateLimit-Remaining") == "0": + log.info(f"Ratelimit reached while fetching {url}") + return FetchError(403, "Ratelimit reached, please retry in a few minutes.") + return FetchError(403, "Cannot access issue.") + elif r.status in (404, 410): + return FetchError(r.status, "Issue not found.") + elif r.status != 200: + return FetchError(r.status, "Error while fetching issue.") + + # The initial API request is made to the issues API endpoint, which will return information + # if the issue or PR is present. However, the scope of information returned for PRs differs + # from issues: if the 'issues' key is present in the response then we can pull the data we + # need from the initial API call. + if "issues" in json_data["html_url"]: + if json_data.get("state") == "open": + emoji = Emojis.issue_open + else: + emoji = Emojis.issue_closed + + # If the 'issues' key is not contained in the API response and there is no error code, then + # we know that a PR has been requested and a call to the pulls API endpoint is necessary + # to get the desired information for the PR. + else: + pull_data, _ = await self.fetch_data(pulls_url) + if pull_data["draft"]: + emoji = Emojis.pull_request_draft + elif pull_data["state"] == "open": + emoji = Emojis.pull_request_open + # When 'merged_at' is not None, this means that the state of the PR is merged + elif pull_data["merged_at"] is not None: + emoji = Emojis.pull_request_merged + else: + emoji = Emojis.pull_request_closed + + issue_url = json_data.get("html_url") - async def fetch_data(self, url: str) -> dict: - """Retrieve data as a dictionary.""" - async with self.bot.http_session.get(url) as r: - return await r.json() + return IssueState(repository, number, issue_url, json_data.get("title", ""), emoji) + + @staticmethod + def format_embed( + results: t.List[t.Union[IssueState, FetchError]] + ) -> discord.Embed: + """Take a list of IssueState or FetchError and format a Discord embed for them.""" + description_list = [] + + for result in results: + if isinstance(result, IssueState): + description_list.append(f"{result.emoji} [{result.title}]({result.url})") + elif isinstance(result, FetchError): + description_list.append(f":x: [{result.return_code}] {result.message}") + + resp = discord.Embed( + colour=Colours.bright_green, + description="\n".join(description_list) + ) + + resp.set_author(name="GitHub") + return resp @commands.group(name="github", aliases=("gh", "git")) @commands.cooldown(1, 10, commands.BucketType.user) @@ -33,11 +168,67 @@ class GithubInfo(commands.Cog): if ctx.invoked_subcommand is None: await invoke_help_command(ctx) + @commands.Cog.listener() + async def on_message(self, message: discord.Message) -> None: + """ + Automatic issue linking. + + Listener to retrieve issue(s) from a GitHub repository using automatic linking if matching <org>/<repo>#<issue>. + """ + # Ignore bots + if message.author.bot: + return + + issues = [ + FoundIssue(*match.group("org", "repo", "number")) + for match in AUTOMATIC_REGEX.finditer(self.remove_codeblocks(message.content)) + ] + links = [] + + if issues: + # Block this from working in DMs + if not message.guild: + return + + log.trace(f"Found {issues = }") + # Remove duplicates + issues = set(issues) + + if len(issues) > MAXIMUM_ISSUES: + embed = discord.Embed( + title=random.choice(ERROR_REPLIES), + color=Colours.soft_red, + description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})" + ) + await message.channel.send(embed=embed, delete_after=5) + return + + for repo_issue in issues: + result = await self.fetch_issue( + int(repo_issue.number), + repo_issue.repository, + repo_issue.organisation or "python-discord" + ) + if isinstance(result, IssueState): + links.append(result) + + if not links: + return + + resp = self.format_embed(links) + await message.channel.send(embed=resp) + + async def fetch_data(self, url: str) -> tuple[dict[str], ClientResponse]: + """Retrieve data as a dictionary and the response in a tuple.""" + log.trace(f"Querying GH issues API: {url}") + async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r: + return await r.json(), r + @github_group.command(name="user", aliases=("userinfo",)) async def github_user_info(self, ctx: commands.Context, username: str) -> None: """Fetches a user's GitHub information.""" async with ctx.typing(): - user_data = await self.fetch_data(f"{GITHUB_API_URL}/users/{quote_plus(username)}") + user_data, _ = await self.fetch_data(f"{GITHUB_API_URL}/users/{username}") # User_data will not have a message key if the user exists if "message" in user_data: @@ -50,7 +241,7 @@ class GithubInfo(commands.Cog): await ctx.send(embed=embed) return - org_data = await self.fetch_data(user_data["organizations_url"]) + org_data, _ = await self.fetch_data(user_data["organizations_url"]) orgs = [f"[{org['login']}](https://github.com/{org['login']})" for org in org_data] orgs_to_add = " | ".join(orgs) @@ -91,10 +282,7 @@ class GithubInfo(commands.Cog): ) if user_data["type"] == "User": - embed.add_field( - name="Gists", - value=f"[{gists}](https://gist.github.com/{quote_plus(username, safe='')})" - ) + embed.add_field(name="Gists", value=f"[{gists}](https://gist.github.com/{quote(username, safe='')})") embed.add_field( name=f"Organization{'s' if len(orgs)!=1 else ''}", @@ -123,7 +311,7 @@ class GithubInfo(commands.Cog): return async with ctx.typing(): - repo_data = await self.fetch_data(f"{GITHUB_API_URL}/repos/{quote(repo)}") + repo_data, _ = await self.fetch_data(f"{GITHUB_API_URL}/repos/{quote(repo)}") # There won't be a message key if this repo exists if "message" in repo_data: diff --git a/bot/exts/utilities/issues.py b/bot/exts/utilities/issues.py deleted file mode 100644 index b6d5a43e..00000000 --- a/bot/exts/utilities/issues.py +++ /dev/null @@ -1,277 +0,0 @@ -import logging -import random -import re -from dataclasses import dataclass -from typing import Optional, Union - -import discord -from discord.ext import commands - -from bot.bot import Bot -from bot.constants import ( - Categories, Channels, Colours, ERROR_REPLIES, Emojis, NEGATIVE_REPLIES, Tokens, WHITELISTED_CHANNELS -) -from bot.utils.decorators import whitelist_override -from bot.utils.extensions import invoke_help_command - -log = logging.getLogger(__name__) - -BAD_RESPONSE = { - 404: "Issue/pull request not located! Please enter a valid number!", - 403: "Rate limit has been hit! Please try again later!" -} -REQUEST_HEADERS = { - "Accept": "application/vnd.github.v3+json" -} - -REPOSITORY_ENDPOINT = "https://api.github.com/orgs/{org}/repos?per_page=100&type=public" -ISSUE_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/issues/{number}" -PR_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/pulls/{number}" - -if GITHUB_TOKEN := Tokens.github: - REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}" - -WHITELISTED_CATEGORIES = ( - Categories.development, Categories.devprojects, Categories.media, Categories.staff -) - -CODE_BLOCK_RE = re.compile( - r"^`([^`\n]+)`" # Inline codeblock - r"|```(.+?)```", # Multiline codeblock - re.DOTALL | re.MULTILINE -) - -# Maximum number of issues in one message -MAXIMUM_ISSUES = 5 - -# Regex used when looking for automatic linking in messages -# regex101 of current regex https://regex101.com/r/V2ji8M/6 -AUTOMATIC_REGEX = re.compile( - r"((?P<org>[a-zA-Z0-9][a-zA-Z0-9\-]{1,39})\/)?(?P<repo>[\w\-\.]{1,100})#(?P<number>[0-9]+)" -) - - -@dataclass -class FoundIssue: - """Dataclass representing an issue found by the regex.""" - - organisation: Optional[str] - repository: str - number: str - - def __hash__(self) -> int: - return hash((self.organisation, self.repository, self.number)) - - -@dataclass -class FetchError: - """Dataclass representing an error while fetching an issue.""" - - return_code: int - message: str - - -@dataclass -class IssueState: - """Dataclass representing the state of an issue.""" - - repository: str - number: int - url: str - title: str - emoji: str - - -class Issues(commands.Cog): - """Cog that allows users to retrieve issues from GitHub.""" - - def __init__(self, bot: Bot): - self.bot = bot - self.repos = [] - - @staticmethod - def remove_codeblocks(message: str) -> str: - """Remove any codeblock in a message.""" - return re.sub(CODE_BLOCK_RE, "", message) - - async def fetch_issues( - self, - number: int, - repository: str, - user: str - ) -> Union[IssueState, FetchError]: - """ - Retrieve an issue from a GitHub repository. - - Returns IssueState on success, FetchError on failure. - """ - url = ISSUE_ENDPOINT.format(user=user, repository=repository, number=number) - pulls_url = PR_ENDPOINT.format(user=user, repository=repository, number=number) - log.trace(f"Querying GH issues API: {url}") - - async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r: - json_data = await r.json() - - if r.status == 403: - if r.headers.get("X-RateLimit-Remaining") == "0": - log.info(f"Ratelimit reached while fetching {url}") - return FetchError(403, "Ratelimit reached, please retry in a few minutes.") - return FetchError(403, "Cannot access issue.") - elif r.status in (404, 410): - return FetchError(r.status, "Issue not found.") - elif r.status != 200: - return FetchError(r.status, "Error while fetching issue.") - - # The initial API request is made to the issues API endpoint, which will return information - # if the issue or PR is present. However, the scope of information returned for PRs differs - # from issues: if the 'issues' key is present in the response then we can pull the data we - # need from the initial API call. - if "issues" in json_data["html_url"]: - if json_data.get("state") == "open": - emoji = Emojis.issue_open - else: - emoji = Emojis.issue_closed - - # If the 'issues' key is not contained in the API response and there is no error code, then - # we know that a PR has been requested and a call to the pulls API endpoint is necessary - # to get the desired information for the PR. - else: - log.trace(f"PR provided, querying GH pulls API for additional information: {pulls_url}") - async with self.bot.http_session.get(pulls_url) as p: - pull_data = await p.json() - if pull_data["draft"]: - emoji = Emojis.pull_request_draft - elif pull_data["state"] == "open": - emoji = Emojis.pull_request_open - # When 'merged_at' is not None, this means that the state of the PR is merged - elif pull_data["merged_at"] is not None: - emoji = Emojis.pull_request_merged - else: - emoji = Emojis.pull_request_closed - - issue_url = json_data.get("html_url") - - return IssueState(repository, number, issue_url, json_data.get("title", ""), emoji) - - @staticmethod - def format_embed( - results: list[Union[IssueState, FetchError]], - user: str, - repository: Optional[str] = None - ) -> discord.Embed: - """Take a list of IssueState or FetchError and format a Discord embed for them.""" - description_list = [] - - for result in results: - if isinstance(result, IssueState): - description_list.append(f"{result.emoji} [{result.title}]({result.url})") - elif isinstance(result, FetchError): - description_list.append(f":x: [{result.return_code}] {result.message}") - - resp = discord.Embed( - colour=Colours.bright_green, - description="\n".join(description_list) - ) - - embed_url = f"https://github.com/{user}/{repository}" if repository else f"https://github.com/{user}" - resp.set_author(name="GitHub", url=embed_url) - return resp - - @whitelist_override(channels=WHITELISTED_CHANNELS, categories=WHITELISTED_CATEGORIES) - @commands.command(aliases=("issues", "pr", "prs")) - async def issue( - self, - ctx: commands.Context, - numbers: commands.Greedy[int], - repository: str = "sir-lancebot", - user: str = "python-discord" - ) -> None: - """Command to retrieve issue(s) from a GitHub repository.""" - # Remove duplicates - numbers = set(numbers) - - err_message = None - if not numbers: - err_message = "You must have at least one issue/PR!" - - elif len(numbers) > MAXIMUM_ISSUES: - err_message = f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})" - - # If there's an error with command invocation then send an error embed - if err_message is not None: - err_embed = discord.Embed( - title=random.choice(ERROR_REPLIES), - color=Colours.soft_red, - description=err_message - ) - await ctx.send(embed=err_embed) - await invoke_help_command(ctx) - return - - results = [await self.fetch_issues(number, repository, user) for number in numbers] - await ctx.send(embed=self.format_embed(results, user, repository)) - - @commands.Cog.listener() - async def on_message(self, message: discord.Message) -> None: - """ - Automatic issue linking. - - Listener to retrieve issue(s) from a GitHub repository using automatic linking if matching <org>/<repo>#<issue>. - """ - # Ignore bots - if message.author.bot: - return - - issues = [ - FoundIssue(*match.group("org", "repo", "number")) - for match in AUTOMATIC_REGEX.finditer(self.remove_codeblocks(message.content)) - ] - links = [] - - if issues: - # Block this from working in DMs - if not message.guild: - await message.channel.send( - embed=discord.Embed( - title=random.choice(NEGATIVE_REPLIES), - description=( - "You can't retrieve issues from DMs. " - f"Try again in <#{Channels.community_bot_commands}>" - ), - colour=Colours.soft_red - ) - ) - return - - log.trace(f"Found {issues = }") - # Remove duplicates - issues = set(issues) - - if len(issues) > MAXIMUM_ISSUES: - embed = discord.Embed( - title=random.choice(ERROR_REPLIES), - color=Colours.soft_red, - description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})" - ) - await message.channel.send(embed=embed, delete_after=5) - return - - for repo_issue in issues: - result = await self.fetch_issues( - int(repo_issue.number), - repo_issue.repository, - repo_issue.organisation or "python-discord" - ) - if isinstance(result, IssueState): - links.append(result) - - if not links: - return - - resp = self.format_embed(links, "python-discord") - await message.channel.send(embed=resp) - - -def setup(bot: Bot) -> None: - """Load the Issues cog.""" - bot.add_cog(Issues(bot)) |