diff options
Diffstat (limited to 'bot')
40 files changed, 1962 insertions, 538 deletions
diff --git a/bot/__main__.py b/bot/__main__.py index 6889fe2b..bd6c70ee 100644 --- a/bot/__main__.py +++ b/bot/__main__.py @@ -12,4 +12,8 @@ bot.add_check(whitelist_check(channels=WHITELISTED_CHANNELS, roles=STAFF_ROLES)) for ext in walk_extensions(): bot.load_extension(ext) -bot.run(Client.token) +if not Client.in_ci: + # Manually enable the message content intent. This is required until the below PR is merged + # https://github.com/python-discord/sir-lancebot/pull/1092 + bot._connection._intents.value += 1 << 15 + bot.run(Client.token) diff --git a/bot/constants.py b/bot/constants.py index 3b426c47..eb9ee4b3 100644 --- a/bot/constants.py +++ b/bot/constants.py @@ -12,6 +12,7 @@ __all__ = ( "Channels", "Categories", "Client", + "Logging", "Colours", "Emojis", "Icons", @@ -23,7 +24,7 @@ __all__ = ( "Reddit", "RedisConfig", "RedirectOutput", - "PYTHON_PREFIX" + "PYTHON_PREFIX", "MODERATION_ROLES", "STAFF_ROLES", "WHITELISTED_CHANNELS", @@ -37,6 +38,7 @@ log = logging.getLogger(__name__) PYTHON_PREFIX = "!" + @dataclasses.dataclass class AdventOfCodeLeaderboard: id: str @@ -53,7 +55,7 @@ class AdventOfCodeLeaderboard: def session(self) -> str: """Return either the actual `session` cookie or the fallback cookie.""" if self.use_fallback_session: - log.info(f"Returning fallback cookie for board `{self.id}`.") + log.trace(f"Returning fallback cookie for board `{self.id}`.") return AdventOfCode.fallback_session return self._session @@ -107,8 +109,11 @@ class Cats: class Channels(NamedTuple): advent_of_code = int(environ.get("AOC_CHANNEL_ID", 897932085766004786)) advent_of_code_commands = int(environ.get("AOC_COMMANDS_CHANNEL_ID", 897932607545823342)) - bot = 267659945086812160 + algos_and_data_structs = 650401909852864553 + bot_commands = 267659945086812160 + community_meta = 267659945086812160 organisation = 551789653284356126 + data_science_and_ai = 366673247892275221 devlog = int(environ.get("CHANNEL_DEVLOG", 622895325144940554)) dev_contrib = 635950537262759947 mod_meta = 775412552795947058 @@ -116,7 +121,7 @@ class Channels(NamedTuple): off_topic_0 = 291284109232308226 off_topic_1 = 463035241142026251 off_topic_2 = 463035268514185226 - community_bot_commands = int(environ.get("CHANNEL_COMMUNITY_BOT_COMMANDS", 607247579608121354)) + sir_lancebot_playground = int(environ.get("CHANNEL_COMMUNITY_BOT_COMMANDS", 607247579608121354)) voice_chat_0 = 412357430186344448 voice_chat_1 = 799647045886541885 staff_voice = 541638762007101470 @@ -130,23 +135,31 @@ class Categories(NamedTuple): media = 799054581991997460 staff = 364918151625965579 + codejam_categories_name = "Code Jam" # Name of the codejam team categories + class Client(NamedTuple): name = "Sir Lancebot" guild = int(environ.get("BOT_GUILD", 267624335836053506)) prefix = environ.get("PREFIX", ".") token = environ.get("BOT_TOKEN") debug = environ.get("BOT_DEBUG", "true").lower() == "true" - file_logs = environ.get("FILE_LOGS", "false").lower() == "true" + in_ci = environ.get("IN_CI", "false").lower() == "true" github_bot_repo = "https://github.com/python-discord/sir-lancebot" # Override seasonal locks: 1 (January) to 12 (December) month_override = int(environ["MONTH_OVERRIDE"]) if "MONTH_OVERRIDE" in environ else None + + +class Logging(NamedTuple): + debug = Client.debug + file_logs = environ.get("FILE_LOGS", "false").lower() == "true" trace_loggers = environ.get("BOT_TRACE_LOGGERS") class Colours: blue = 0x0279FD + twitter_blue = 0x1DA1F2 bright_green = 0x01D277 dark_green = 0x1F8B4C orange = 0xE67E22 @@ -231,7 +244,6 @@ class Emojis: status_dnd = "<:status_dnd:470326272082313216>" status_offline = "<:status_offline:470326266537705472>" - stackoverflow_tag = "<:stack_tag:870926975307501570>" stackoverflow_views = "<:stack_eye:870926992692879371>" @@ -343,8 +355,8 @@ STAFF_ROLES = {Roles.helpers, Roles.moderation_team, Roles.admins, Roles.owners} # Whitelisted channels WHITELISTED_CHANNELS = ( - Channels.bot, - Channels.community_bot_commands, + Channels.bot_commands, + Channels.sir_lancebot_playground, Channels.off_topic_0, Channels.off_topic_1, Channels.off_topic_2, diff --git a/bot/exts/core/error_handler.py b/bot/exts/core/error_handler.py index 676a1e70..4578f734 100644 --- a/bot/exts/core/error_handler.py +++ b/bot/exts/core/error_handler.py @@ -1,4 +1,3 @@ -import difflib import logging import math import random @@ -11,6 +10,7 @@ from sentry_sdk import push_scope from bot.bot import Bot from bot.constants import Channels, Colours, ERROR_REPLIES, NEGATIVE_REPLIES, RedirectOutput +from bot.utils.commands import get_command_suggestions from bot.utils.decorators import InChannelCheckFailure, InMonthCheckFailure from bot.utils.exceptions import APIError, MovedCommandError, UserNotPlayingError @@ -98,7 +98,8 @@ class CommandErrorHandler(commands.Cog): if isinstance(error, commands.NoPrivateMessage): await ctx.send( embed=self.error_embed( - f"This command can only be used in the server. Go to <#{Channels.community_bot_commands}> instead!", + "This command can only be used in the server. " + f"Go to <#{Channels.sir_lancebot_playground}> instead!", NEGATIVE_REPLIES ) ) @@ -157,31 +158,32 @@ class CommandErrorHandler(commands.Cog): async def send_command_suggestion(self, ctx: commands.Context, command_name: str) -> None: """Sends user similar commands if any can be found.""" - raw_commands = [] - for cmd in self.bot.walk_commands(): - if not cmd.hidden: - raw_commands += (cmd.name, *cmd.aliases) - if similar_command_data := difflib.get_close_matches(command_name, raw_commands, 1): - similar_command_name = similar_command_data[0] - similar_command = self.bot.get_command(similar_command_name) - - if not similar_command: - return - - log_msg = "Cancelling attempt to suggest a command due to failed checks." - try: - if not await similar_command.can_run(ctx): + command_suggestions = [] + if similar_command_names := get_command_suggestions(list(self.bot.all_commands.keys()), command_name): + for similar_command_name in similar_command_names: + similar_command = self.bot.get_command(similar_command_name) + + if not similar_command: + continue + + log_msg = "Cancelling attempt to suggest a command due to failed checks." + try: + if not await similar_command.can_run(ctx): + log.debug(log_msg) + continue + except commands.errors.CommandError as cmd_error: log.debug(log_msg) - return - except commands.errors.CommandError as cmd_error: - log.debug(log_msg) - await self.on_command_error(ctx, cmd_error) - return + await self.on_command_error(ctx, cmd_error) + continue + + command_suggestions.append(similar_command_name) misspelled_content = ctx.message.content e = Embed() e.set_author(name="Did you mean:", icon_url=QUESTION_MARK_ICON) - e.description = misspelled_content.replace(command_name, similar_command_name, 1) + e.description = "\n".join( + misspelled_content.replace(command_name, cmd, 1) for cmd in command_suggestions + ) await ctx.send(embed=e, delete_after=RedirectOutput.delete_delay) diff --git a/bot/exts/core/help.py b/bot/exts/core/help.py index db3c2aa6..eb7a9762 100644 --- a/bot/exts/core/help.py +++ b/bot/exts/core/help.py @@ -3,16 +3,17 @@ import asyncio import itertools import logging from contextlib import suppress -from typing import NamedTuple, Union +from typing import NamedTuple, Optional, Union from discord import Colour, Embed, HTTPException, Message, Reaction, User from discord.ext import commands from discord.ext.commands import CheckFailure, Cog as DiscordCog, Command, Context -from rapidfuzz import process from bot import constants from bot.bot import Bot from bot.constants import Emojis +from bot.utils.commands import get_command_suggestions +from bot.utils.decorators import whitelist_override from bot.utils.pagination import FIRST_EMOJI, LAST_EMOJI, LEFT_EMOJI, LinePaginator, RIGHT_EMOJI DELETE_EMOJI = Emojis.trashcan @@ -41,14 +42,18 @@ class HelpQueryNotFound(ValueError): """ Raised when a HelpSession Query doesn't match a command or cog. - Contains the custom attribute of ``possible_matches``. - Instances of this object contain a dictionary of any command(s) that were close to matching the - query, where keys are the possible matched command names and values are the likeness match scores. + Params: + possible_matches: list of similar command names. + parent_command: parent command of an invalid subcommand. Only available when an invalid subcommand + has been passed. """ - def __init__(self, arg: str, possible_matches: dict = None): + def __init__( + self, arg: str, possible_matches: Optional[list[str]] = None, *, parent_command: Optional[Command] = None + ) -> None: super().__init__(arg) self.possible_matches = possible_matches + self.parent_command = parent_command class HelpSession: @@ -153,12 +158,17 @@ class HelpSession: Will pass on possible close matches along with the `HelpQueryNotFound` exception. """ - # Combine command and cog names - choices = list(self._bot.all_commands) + list(self._bot.cogs) + # Check if parent command is valid in case subcommand is invalid. + if " " in query: + parent, *_ = query.split() + parent_command = self._bot.get_command(parent) + + if parent_command: + raise HelpQueryNotFound('Invalid Subcommand.', parent_command=parent_command) - result = process.extract(query, choices, score_cutoff=90) + similar_commands = get_command_suggestions(list(self._bot.all_commands.keys()), query) - raise HelpQueryNotFound(f'Query "{query}" not found.', dict(result)) + raise HelpQueryNotFound(f'Query "{query}" not found.', similar_commands) async def timeout(self, seconds: int = 30) -> None: """Waits for a set number of seconds, then stops the help session.""" @@ -277,7 +287,7 @@ class HelpSession: else: results.append(f"<{name}>") - return f"{cmd.name} {' '.join(results)}" + return f"{cmd.qualified_name} {' '.join(results)}" async def build_pages(self) -> None: """Builds the list of content pages to be paginated through in the help message, as a list of str.""" @@ -304,9 +314,10 @@ class HelpSession: prefix = constants.Client.prefix signature = self._get_command_params(self.query) + paginator.add_line(f"**```\n{prefix}{signature}\n```**") + parent = self.query.full_parent_name + " " if self.query.parent else "" - paginator.add_line(f"**```\n{prefix}{parent}{signature}\n```**") - aliases = [f"`{alias}`" if not parent else f"`{parent} {alias}`" for alias in self.query.aliases] + aliases = [f"`{alias}`" if not parent else f"`{parent}{alias}`" for alias in self.query.aliases] aliases += [f"`{alias}`" for alias in getattr(self.query, "root_aliases", ())] aliases = ", ".join(sorted(aliases)) if aliases: @@ -502,18 +513,26 @@ class Help(DiscordCog): """Custom Embed Pagination Help feature.""" @commands.command("help") + @whitelist_override(allow_dm=True) async def new_help(self, ctx: Context, *commands) -> None: """Shows Command Help.""" try: await HelpSession.start(ctx, *commands) except HelpQueryNotFound as error: + + # Send help message of parent command if subcommand is invalid. + if cmd := error.parent_command: + await ctx.send(str(error)) + await self.new_help(ctx, cmd.qualified_name) + return + embed = Embed() embed.colour = Colour.red() embed.title = str(error) if error.possible_matches: - matches = "\n".join(error.possible_matches.keys()) - embed.description = f"**Did you mean:**\n`{matches}`" + matches = "\n".join(error.possible_matches) + embed.description = f"**Did you mean:**\n{matches}" await ctx.send(embed=embed) diff --git a/bot/exts/core/internal_eval/_internal_eval.py b/bot/exts/core/internal_eval/_internal_eval.py index 5b5461f0..190a15ec 100644 --- a/bot/exts/core/internal_eval/_internal_eval.py +++ b/bot/exts/core/internal_eval/_internal_eval.py @@ -34,6 +34,8 @@ RAW_CODE_REGEX = re.compile( re.DOTALL # "." also matches newlines ) +MAX_LENGTH = 99980 + class InternalEval(commands.Cog): """Top secret code evaluation for admins and owners.""" @@ -85,9 +87,10 @@ class InternalEval(commands.Cog): async def _upload_output(self, output: str) -> Optional[str]: """Upload `internal eval` output to our pastebin and return the url.""" + data = self.shorten_output(output, max_length=MAX_LENGTH) try: async with self.bot.http_session.post( - "https://paste.pythondiscord.com/documents", data=output, raise_for_status=True + "https://paste.pythondiscord.com/documents", data=data, raise_for_status=True ) as resp: data = await resp.json() diff --git a/bot/exts/core/source.py b/bot/exts/core/source.py index 7572ce51..2801be0f 100644 --- a/bot/exts/core/source.py +++ b/bot/exts/core/source.py @@ -6,14 +6,16 @@ from discord import Embed from discord.ext import commands from bot.bot import Bot -from bot.constants import Source +from bot.constants import Channels, Source, WHITELISTED_CHANNELS from bot.utils.converters import SourceConverter, SourceType +from bot.utils.decorators import whitelist_override class BotSource(commands.Cog): """Displays information about the bot's source code.""" @commands.command(name="source", aliases=("src",)) + @whitelist_override(channels=WHITELISTED_CHANNELS+(Channels.community_meta, Channels.dev_contrib)) async def source_command(self, ctx: commands.Context, *, source_item: SourceConverter = None) -> None: """Display information and a GitHub link to the source code of a command, tag, or cog.""" if not source_item: diff --git a/bot/exts/events/advent_of_code/_cog.py b/bot/exts/events/advent_of_code/_cog.py index 01161f26..518841d4 100644 --- a/bot/exts/events/advent_of_code/_cog.py +++ b/bot/exts/events/advent_of_code/_cog.py @@ -61,7 +61,8 @@ class AdventOfCode(commands.Cog): self.status_task.set_name("AoC Status Countdown") self.status_task.add_done_callback(_helpers.background_task_callback) - self.completionist_task.start() + # Don't start task while event isn't running + # self.completionist_task.start() @tasks.loop(minutes=10.0) async def completionist_task(self) -> None: @@ -96,7 +97,9 @@ class AdventOfCode(commands.Cog): # Only give the role to people who have completed all 50 stars continue - member_id = aoc_name_to_member_id.get(member_aoc_info["name"], None) + aoc_name = member_aoc_info["name"] or f"Anonymous #{member_aoc_info['id']}" + + member_id = aoc_name_to_member_id.get(aoc_name) if not member_id: log.debug(f"Could not find member_id for {member_aoc_info['name']}, not giving role.") continue diff --git a/bot/exts/events/advent_of_code/_helpers.py b/bot/exts/events/advent_of_code/_helpers.py index 807cc275..6c004901 100644 --- a/bot/exts/events/advent_of_code/_helpers.py +++ b/bot/exts/events/advent_of_code/_helpers.py @@ -255,7 +255,7 @@ async def _fetch_leaderboard_data() -> dict[str, Any]: # Two attempts, one with the original session cookie and one with the fallback session for attempt in range(1, 3): - log.info(f"Attempting to fetch leaderboard `{leaderboard.id}` ({attempt}/2)") + log.debug(f"Attempting to fetch leaderboard `{leaderboard.id}` ({attempt}/2)") cookies = {"session": leaderboard.session} try: raw_data = await _leaderboard_request(leaderboard_url, leaderboard.id, cookies) @@ -332,7 +332,7 @@ async def fetch_leaderboard(invalidate_cache: bool = False, self_placement_name: number_of_participants = len(leaderboard) formatted_leaderboard = _format_leaderboard(leaderboard) full_leaderboard_url = await _upload_leaderboard(formatted_leaderboard) - leaderboard_fetched_at = datetime.datetime.utcnow().isoformat() + leaderboard_fetched_at = datetime.datetime.now(datetime.timezone.utc).isoformat() cached_leaderboard = { "placement_leaderboard": json.dumps(raw_leaderboard_data), @@ -368,11 +368,13 @@ def get_summary_embed(leaderboard: dict) -> discord.Embed: """Get an embed with the current summary stats of the leaderboard.""" leaderboard_url = leaderboard["full_leaderboard_url"] refresh_minutes = AdventOfCode.leaderboard_cache_expiry_seconds // 60 + refreshed_unix = int(datetime.datetime.fromisoformat(leaderboard["leaderboard_fetched_at"]).timestamp()) - aoc_embed = discord.Embed( - colour=Colours.soft_green, - timestamp=datetime.datetime.fromisoformat(leaderboard["leaderboard_fetched_at"]), - description=f"*The leaderboard is refreshed every {refresh_minutes} minutes.*" + aoc_embed = discord.Embed(colour=Colours.soft_green) + + aoc_embed.description = ( + f"The leaderboard is refreshed every {refresh_minutes} minutes.\n" + f"Last Updated: <t:{refreshed_unix}:t>" ) aoc_embed.add_field( name="Number of Participants", @@ -386,7 +388,6 @@ def get_summary_embed(leaderboard: dict) -> discord.Embed: inline=True, ) aoc_embed.set_author(name="Advent of Code", url=leaderboard_url) - aoc_embed.set_footer(text="Last Updated") aoc_embed.set_thumbnail(url=AOC_EMBED_THUMBNAIL) return aoc_embed diff --git a/bot/exts/events/advent_of_code/views/dayandstarview.py b/bot/exts/events/advent_of_code/views/dayandstarview.py index a0bfa316..5529c12b 100644 --- a/bot/exts/events/advent_of_code/views/dayandstarview.py +++ b/bot/exts/events/advent_of_code/views/dayandstarview.py @@ -42,7 +42,13 @@ class AoCDropdownView(discord.ui.View): async def interaction_check(self, interaction: discord.Interaction) -> bool: """Global check to ensure that the interacting user is the user who invoked the command originally.""" - return interaction.user == self.original_author + if interaction.user != self.original_author: + await interaction.response.send_message( + ":x: You can't interact with someone else's response. Please run the command yourself!", + ephemeral=True + ) + return False + return True @discord.ui.select( placeholder="Day", diff --git a/bot/exts/events/trivianight/__init__.py b/bot/exts/events/trivianight/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/bot/exts/events/trivianight/__init__.py diff --git a/bot/exts/events/trivianight/_game.py b/bot/exts/events/trivianight/_game.py new file mode 100644 index 00000000..8b012a17 --- /dev/null +++ b/bot/exts/events/trivianight/_game.py @@ -0,0 +1,192 @@ +import time +from random import randrange +from string import ascii_uppercase +from typing import Iterable, NamedTuple, Optional, TypedDict + +DEFAULT_QUESTION_POINTS = 10 +DEFAULT_QUESTION_TIME = 20 + + +class QuestionData(TypedDict): + """Representing the different 'keys' of the question taken from the JSON.""" + + number: str + description: str + answers: list[str] + correct: str + points: Optional[int] + time: Optional[int] + + +class UserGuess(NamedTuple): + """Represents the user's guess for a question.""" + + answer: str + editable: bool + elapsed: float + + +class QuestionClosed(RuntimeError): + """Exception raised when the question is not open for guesses anymore.""" + + +class AlreadyUpdated(RuntimeError): + """Exception raised when the user has already updated their guess once.""" + + +class AllQuestionsVisited(RuntimeError): + """Exception raised when all of the questions have been visited.""" + + +class Question: + """Interface for one question in a trivia night game.""" + + def __init__(self, data: QuestionData): + self._data = data + self._guesses: dict[int, UserGuess] = {} + self._started = None + + # These properties are mostly proxies to the underlying data: + + @property + def number(self) -> str: + """The number of the question.""" + return self._data["number"] + + @property + def description(self) -> str: + """The description of the question.""" + return self._data["description"] + + @property + def answers(self) -> list[tuple[str, str]]: + """ + The possible answers for this answer. + + This is a property that returns a list of letter, answer pairs. + """ + return [(ascii_uppercase[i], q) for (i, q) in enumerate(self._data["answers"])] + + @property + def correct(self) -> str: + """The correct answer for this question.""" + return self._data["correct"] + + @property + def max_points(self) -> int: + """The maximum points that can be awarded for this question.""" + return self._data.get("points") or DEFAULT_QUESTION_POINTS + + @property + def time(self) -> float: + """The time allowed to answer the question.""" + return self._data.get("time") or DEFAULT_QUESTION_TIME + + def start(self) -> float: + """Start the question and return the time it started.""" + self._started = time.perf_counter() + return self._started + + def _update_guess(self, user: int, answer: str) -> UserGuess: + """Update an already existing guess.""" + if self._started is None: + raise QuestionClosed("Question is not open for answers.") + + if self._guesses[user][1] is False: + raise AlreadyUpdated(f"User({user}) has already updated their guess once.") + + self._guesses[user] = (answer, False, time.perf_counter() - self._started) + return self._guesses[user] + + def guess(self, user: int, answer: str) -> UserGuess: + """Add a guess made by a user to the current question.""" + if user in self._guesses: + return self._update_guess(user, answer) + + if self._started is None: + raise QuestionClosed("Question is not open for answers.") + + self._guesses[user] = (answer, True, time.perf_counter() - self._started) + return self._guesses[user] + + def stop(self) -> dict[int, UserGuess]: + """Stop the question and return the guesses that were made.""" + guesses = self._guesses + + self._started = None + self._guesses = {} + + return guesses + + +class TriviaNightGame: + """Interface for managing a game of trivia night.""" + + def __init__(self, data: list[QuestionData]) -> None: + self._questions = [Question(q) for q in data] + # A copy of the questions to keep for `.trivianight list` + self._all_questions = list(self._questions) + self.current_question: Optional[Question] = None + self._points = {} + self._speed = {} + + def __iter__(self) -> Iterable[Question]: + return iter(self._questions) + + def next_question(self, number: str = None) -> Question: + """ + Consume one random question from the trivia night game. + + One question is randomly picked from the list of questions which is then removed and returned. + """ + if self.current_question is not None: + raise RuntimeError("Cannot call next_question() when there is a current question.") + + if number is not None: + try: + question = [q for q in self._all_questions if q.number == int(number)][0] + except IndexError: + raise ValueError(f"Question number {number} does not exist.") + elif len(self._questions) == 0: + raise AllQuestionsVisited("All of the questions have been visited.") + else: + question = self._questions.pop(randrange(len(self._questions))) + + self.current_question = question + return question + + def end_question(self) -> None: + """ + End the current question. + + This method should be called when the question has been answered, it must be called before + attempting to call `next_question()` again. + """ + if self.current_question is None: + raise RuntimeError("Cannot call end_question() when there is no current question.") + + self.current_question.stop() + self.current_question = None + + def list_questions(self) -> str: + """ + List all the questions. + + This method should be called when `.trivianight list` is called to display the following information: + - Question number + - Question description + - Visited/not visited + """ + question_list = [] + + visited = ":white_check_mark:" + not_visited = ":x:" + + for question in self._all_questions: + formatted_string = ( + f"**Q{question.number}** {not_visited if question in self._questions else visited}" + f"\n{question.description}\n\n" + ) + question_list.append(formatted_string.rstrip()) + + return question_list diff --git a/bot/exts/events/trivianight/_questions.py b/bot/exts/events/trivianight/_questions.py new file mode 100644 index 00000000..d6beced9 --- /dev/null +++ b/bot/exts/events/trivianight/_questions.py @@ -0,0 +1,179 @@ +from random import choice +from string import ascii_uppercase + +import discord +from discord import Embed, Interaction +from discord.ui import Button, View + +from bot.constants import Colours, NEGATIVE_REPLIES + +from ._game import AlreadyUpdated, Question, QuestionClosed +from ._scoreboard import Scoreboard + + +class AnswerButton(Button): + """Button subclass that's used to guess on a particular answer.""" + + def __init__(self, label: str, question: Question): + super().__init__(label=label, style=discord.ButtonStyle.green) + + self.question = question + + async def callback(self, interaction: Interaction) -> None: + """ + When a user interacts with the button, this will be called. + + Parameters: + - interaction: an instance of discord.Interaction representing the interaction between the user and the + button. + """ + try: + guess = self.question.guess(interaction.user.id, self.label) + except AlreadyUpdated: + await interaction.response.send_message( + embed=Embed( + title=choice(NEGATIVE_REPLIES), + description="You've already changed your answer more than once!", + color=Colours.soft_red + ), + ephemeral=True + ) + return + except QuestionClosed: + await interaction.response.send_message( + embed=Embed( + title=choice(NEGATIVE_REPLIES), + description="The question is no longer accepting guesses!", + color=Colours.soft_red + ), + ephemeral=True + ) + return + + if guess[1]: + await interaction.response.send_message( + embed=Embed( + title="Confirming that...", + description=f"You chose answer {self.label}.", + color=Colours.soft_green + ), + ephemeral=True + ) + else: + # guess[1] is False and they cannot change their answer again. Which + # indicates that they changed it this time around. + await interaction.response.send_message( + embed=Embed( + title="Confirming that...", + description=f"You changed your answer to answer {self.label}.", + color=Colours.soft_green + ), + ephemeral=True + ) + + +class QuestionView(View): + """View for one trivia night question.""" + + def __init__(self, question: Question) -> None: + super().__init__() + self.question = question + + for letter, _ in self.question.answers: + self.add_item(AnswerButton(letter, self.question)) + + @staticmethod + def unicodeify(text: str) -> str: + """ + Takes `text` and adds zero-width spaces to prevent copy and pasting the question. + + Parameters: + - text: A string that represents the question description to 'unicodeify' + """ + return "".join( + f"{letter}\u200b" if letter not in ('\n', '\t', '`', 'p', 'y') else letter + for idx, letter in enumerate(text) + ) + + def create_embed(self) -> Embed: + """Helper function to create the embed for the current question.""" + question_embed = Embed( + title=f"Question {self.question.number}", + description=self.unicodeify(self.question.description), + color=Colours.python_yellow + ) + + for label, answer in self.question.answers: + question_embed.add_field(name=f"Answer {label}", value=answer, inline=False) + + return question_embed + + def end_question(self, scoreboard: Scoreboard) -> Embed: + """ + Ends the question and displays the statistics on who got the question correct, awards points, etc. + + Returns: + An embed displaying the correct answers and the % of people that chose each answer. + """ + guesses = self.question.stop() + + labels = ascii_uppercase[:len(self.question.answers)] + + answer_embed = Embed( + title=f"The correct answer for Question {self.question.number} was...", + description=self.question.correct + ) + + if len(guesses) != 0: + answers_chosen = { + answer_choice: len( + tuple(filter(lambda x: x[0] == answer_choice, guesses.values())) + ) + for answer_choice in labels + } + + answers_chosen = dict( + sorted(list(answers_chosen.items()), key=lambda item: item[1], reverse=True) + ) + + for answer, people_answered in answers_chosen.items(): + is_correct_answer = dict(self.question.answers)[answer[0]] == self.question.correct + + # Setting the color of answer_embed to the % of people that got it correct via the mapping + if is_correct_answer: + # Maps the % of people who got it right to a color, from a range of red to green + percentage_to_color = [0xFC94A1, 0xFFCCCB, 0xCDFFCC, 0xB0F5AB, 0xB0F5AB] + answer_embed.color = percentage_to_color[round(people_answered / len(guesses) * 100) // 25] + + field_title = ( + (":white_check_mark: " if is_correct_answer else "") + + f"{people_answered} players ({people_answered / len(guesses) * 100:.1f}%) chose" + ) + + # The `ord` function is used here to change the letter to its corresponding position + answer_embed.add_field( + name=field_title, + value=self.question.answers[ord(answer) - 65][1], + inline=False + ) + + # Assign points to users + for user_id, answer in guesses.items(): + if dict(self.question.answers)[answer[0]] == self.question.correct: + scoreboard.assign_points( + int(user_id), + points=(1 - (answer[-1] / self.question.time) / 2) * self.question.max_points, + speed=answer[-1] + ) + elif answer[-1] <= 2: + scoreboard.assign_points( + int(user_id), + points=-(1 - (answer[-1] / self.question.time) / 2) * self.question.max_points + ) + else: + scoreboard.assign_points( + int(user_id), + points=0 + ) + + return answer_embed diff --git a/bot/exts/events/trivianight/_scoreboard.py b/bot/exts/events/trivianight/_scoreboard.py new file mode 100644 index 00000000..a5a5fcac --- /dev/null +++ b/bot/exts/events/trivianight/_scoreboard.py @@ -0,0 +1,186 @@ +from random import choice + +import discord.ui +from discord import ButtonStyle, Embed, Interaction, Member +from discord.ui import Button, View + +from bot.bot import Bot +from bot.constants import Colours, NEGATIVE_REPLIES + + +class ScoreboardView(View): + """View for the scoreboard.""" + + def __init__(self, bot: Bot): + super().__init__() + self.bot = bot + + @staticmethod + def _int_to_ordinal(number: int) -> str: + """ + Converts an integer into an ordinal number, i.e. 1 to 1st. + + Parameters: + - number: an integer representing the number to convert to an ordinal number. + """ + suffix = ["th", "st", "nd", "rd", "th"][min(number % 10, 4)] + if (number % 100) in {11, 12, 13}: + suffix = "th" + + return str(number) + suffix + + async def create_main_leaderboard(self) -> Embed: + """ + Helper function that iterates through `self.points` to generate the main leaderboard embed. + + The main leaderboard would be formatted like the following: + **1**. @mention of the user (# of points) + along with the 29 other users who made it onto the leaderboard. + """ + formatted_string = "" + + for current_placement, (user, points) in enumerate(self.points.items()): + if current_placement + 1 > 30: + break + + user = await self.bot.fetch_user(int(user)) + formatted_string += f"**{current_placement + 1}.** {user.mention} " + formatted_string += f"({points:.1f} pts)\n" + if (current_placement + 1) % 10 == 0: + formatted_string += "β―β―β―β―β―β―β―β―\n" + + main_embed = Embed( + title="Winners of the Trivia Night", + description=formatted_string, + color=Colours.python_blue, + ) + + return main_embed + + async def _create_speed_embed(self) -> Embed: + """ + Helper function that iterates through `self.speed` to generate a leaderboard embed. + + The speed leaderboard would be formatted like the following: + **1**. @mention of the user ([average speed as a float with the precision of one decimal point]s) + along with the 29 other users who made it onto the leaderboard. + """ + formatted_string = "" + + for current_placement, (user, time_taken) in enumerate(self.speed.items()): + if current_placement + 1 > 30: + break + + user = await self.bot.fetch_user(int(user)) + formatted_string += f"**{current_placement + 1}.** {user.mention} " + formatted_string += f"({(time_taken[-1] / time_taken[0]):.1f}s)\n" + if (current_placement + 1) % 10 == 0: + formatted_string += "β―β―β―β―β―β―β―β―\n" + + speed_embed = Embed( + title="Average time taken to answer a question", + description=formatted_string, + color=Colours.python_blue + ) + return speed_embed + + def _get_rank(self, member: Member) -> Embed: + """ + Gets the member's rank for the points leaderboard and speed leaderboard. + + Parameters: + - member: An instance of discord.Member representing the person who is trying to get their rank. + """ + rank_embed = Embed(title=f"Ranks for {member.display_name}", color=Colours.python_blue) + # These are stored as strings so that the last digit can be determined to choose the suffix + try: + points_rank = str(list(self.points).index(member.id) + 1) + speed_rank = str(list(self.speed).index(member.id) + 1) + except ValueError: + return Embed( + title=choice(NEGATIVE_REPLIES), + description="It looks like you didn't participate in the Trivia Night event!", + color=Colours.soft_red + ) + + rank_embed.add_field( + name="Total Points", + value=( + f"You got {self._int_to_ordinal(int(points_rank))} place" + f" with {self.points[member.id]:.1f} points." + ), + inline=False + ) + + rank_embed.add_field( + name="Average Speed", + value=( + f"You got {self._int_to_ordinal(int(speed_rank))} place" + f" with a time of {(self.speed[member.id][1] / self.speed[member.id][0]):.1f} seconds." + ), + inline=False + ) + return rank_embed + + @discord.ui.button(label="Scoreboard for Speed", style=ButtonStyle.green) + async def speed_leaderboard(self, button: Button, interaction: Interaction) -> None: + """ + Send an ephemeral message with the speed leaderboard embed. + + Parameters: + - button: The discord.ui.Button instance representing the `Speed Leaderboard` button. + - interaction: The discord.Interaction instance containing information on the interaction between the user + and the button. + """ + await interaction.response.send_message(embed=await self._create_speed_embed(), ephemeral=True) + + @discord.ui.button(label="What's my rank?", style=ButtonStyle.blurple) + async def rank_button(self, button: Button, interaction: Interaction) -> None: + """ + Send an ephemeral message with the user's rank for the overall points/average speed. + + Parameters: + - button: The discord.ui.Button instance representing the `What's my rank?` button. + - interaction: The discord.Interaction instance containing information on the interaction between the user + and the button. + """ + await interaction.response.send_message(embed=self._get_rank(interaction.user), ephemeral=True) + + +class Scoreboard: + """Class for the scoreboard for the Trivia Night event.""" + + def __init__(self, bot: Bot): + self._bot = bot + self._points = {} + self._speed = {} + + def assign_points(self, user_id: int, *, points: int = None, speed: float = None) -> None: + """ + Assign points or deduct points to/from a certain user. + + This method should be called once the question has finished and all answers have been registered. + """ + if points is not None and user_id not in self._points.keys(): + self._points[user_id] = points + elif points is not None: + self._points[user_id] += points + + if speed is not None and user_id not in self._speed.keys(): + self._speed[user_id] = [1, speed] + elif speed is not None: + self._speed[user_id] = [ + self._speed[user_id][0] + 1, self._speed[user_id][1] + speed + ] + + async def display(self, speed_leaderboard: bool = False) -> tuple[Embed, View]: + """Returns the embed of the main leaderboard along with the ScoreboardView.""" + view = ScoreboardView(self._bot) + + view.points = dict(sorted(self._points.items(), key=lambda item: item[-1], reverse=True)) + view.speed = dict(sorted(self._speed.items(), key=lambda item: item[-1][1] / item[-1][0])) + + return ( + await view.create_main_leaderboard(), + view if not speed_leaderboard else await view._create_speed_embed() + ) diff --git a/bot/exts/events/trivianight/trivianight.py b/bot/exts/events/trivianight/trivianight.py new file mode 100644 index 00000000..18d8327a --- /dev/null +++ b/bot/exts/events/trivianight/trivianight.py @@ -0,0 +1,328 @@ +import asyncio +from json import JSONDecodeError, loads +from random import choice +from typing import Optional + +from discord import Embed +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Colours, NEGATIVE_REPLIES, POSITIVE_REPLIES, Roles +from bot.utils.pagination import LinePaginator + +from ._game import AllQuestionsVisited, TriviaNightGame +from ._questions import QuestionView +from ._scoreboard import Scoreboard + +# The ID you see below are the Events Lead role ID and the Event Runner Role ID +TRIVIA_NIGHT_ROLES = (Roles.admins, 778361735739998228, 940911658799333408) + + +class TriviaNightCog(commands.Cog): + """Cog for the Python Trivia Night event.""" + + def __init__(self, bot: Bot): + self.bot = bot + self.game: Optional[TriviaNightGame] = None + self.scoreboard: Optional[Scoreboard] = None + self.question_closed: asyncio.Event = None + + @commands.group(aliases=["tn"], invoke_without_command=True) + async def trivianight(self, ctx: commands.Context) -> None: + """ + The command group for the Python Discord Trivia Night. + + If invoked without a subcommand (i.e. simply .trivianight), it will explain what the Trivia Night event is. + """ + cog_description = Embed( + title="What is .trivianight?", + description=( + "This 'cog' is for the Python Discord's TriviaNight (date tentative)! Compete against other" + " players in a trivia about Python!" + ), + color=Colours.soft_green + ) + await ctx.send(embed=cog_description) + + @trivianight.command() + @commands.has_any_role(*TRIVIA_NIGHT_ROLES) + async def load(self, ctx: commands.Context, *, to_load: Optional[str]) -> None: + """ + Loads a JSON file from the provided attachment or argument. + + The JSON provided is formatted where it is a list of dictionaries, each dictionary containing the keys below: + - number: int (represents the current question #) + - description: str (represents the question itself) + - answers: list[str] (represents the different answers possible, must be a length of 4) + - correct: str (represents the correct answer in terms of what the correct answer is in `answers` + - time: Optional[int] (represents the timer for the question and how long it should run, default is 10) + - points: Optional[int] (represents how many points are awarded for each question, default is 10) + + The load command accepts three different ways of loading in a JSON: + - an attachment of the JSON file + - a message link to the attachment/JSON + - reading the JSON itself via a codeblock or plain text + """ + if self.game is not None: + await ctx.send(embed=Embed( + title=choice(NEGATIVE_REPLIES), + description="There is already a trivia night running!", + color=Colours.soft_red + )) + return + + if ctx.message.attachments: + json_text = (await ctx.message.attachments[0].read()).decode("utf8") + elif not to_load: + raise commands.BadArgument("You didn't attach an attachment nor link a message!") + elif ( + to_load.startswith("https://discord.com/channels") + or to_load.startswith("https://discordapp.com/channels") + ): + channel_id, message_id = to_load.split("/")[-2:] + channel = await ctx.guild.fetch_channel(int(channel_id)) + message = await channel.fetch_message(int(message_id)) + if message.attachments: + json_text = (await message.attachments[0].read()).decode("utf8") + else: + json_text = message.content.replace("```", "").replace("json", "").replace("\n", "") + else: + json_text = to_load.replace("```", "").replace("json", "").replace("\n", "") + + try: + serialized_json = loads(json_text) + except JSONDecodeError as error: + raise commands.BadArgument(f"Looks like something went wrong:\n{str(error)}") + + self.game = TriviaNightGame(serialized_json) + self.question_closed = asyncio.Event() + + success_embed = Embed( + title=choice(POSITIVE_REPLIES), + description="The JSON was loaded successfully!", + color=Colours.soft_green + ) + + self.scoreboard = Scoreboard(self.bot) + + await ctx.send(embed=success_embed) + + @trivianight.command(aliases=('next',)) + @commands.has_any_role(*TRIVIA_NIGHT_ROLES) + async def question(self, ctx: commands.Context, question_number: str = None) -> None: + """ + Gets a random question from the unanswered question list and lets the user(s) choose the answer. + + This command will continuously count down until the time limit of the question is exhausted. + However, if `.trivianight stop` is invoked, the counting down is interrupted to show the final results. + """ + if self.game is None: + await ctx.send(embed=Embed( + title=choice(NEGATIVE_REPLIES), + description="There is no trivia night running!", + color=Colours.soft_red + )) + return + + if self.game.current_question is not None: + error_embed = Embed( + title=choice(NEGATIVE_REPLIES), + description="There is already an ongoing question!", + color=Colours.soft_red + ) + await ctx.send(embed=error_embed) + return + + try: + next_question = self.game.next_question(question_number) + except AllQuestionsVisited: + error_embed = Embed( + title=choice(NEGATIVE_REPLIES), + description="All of the questions have been used.", + color=Colours.soft_red + ) + await ctx.send(embed=error_embed) + return + + await ctx.send("Next question in 3 seconds! Get ready...") + await asyncio.sleep(3) + + question_view = QuestionView(next_question) + question_embed = question_view.create_embed() + + next_question.start() + message = await ctx.send(embed=question_embed, view=question_view) + + # Exponentially sleep less and less until the time limit is reached + percentage = 1 + while True: + percentage *= 0.5 + duration = next_question.time * percentage + + await asyncio.wait([self.question_closed.wait()], timeout=duration) + + if self.question_closed.is_set(): + await ctx.send(embed=question_view.end_question(self.scoreboard)) + await message.edit(embed=question_embed, view=None) + + self.game.end_question() + self.question_closed.clear() + return + + if int(duration) > 1: + # It is quite ugly to display decimals, the delay for requests to reach Discord + # cause sub-second accuracy to be quite pointless. + await ctx.send(f"{int(duration)}s remaining...") + else: + # Since each time we divide the percentage by 2 and sleep one half of the halves (then sleep a + # half, of that half) we must sleep both halves at the end. + await asyncio.wait([self.question_closed.wait()], timeout=duration) + if self.question_closed.is_set(): + await ctx.send(embed=question_view.end_question(self.scoreboard)) + await message.edit(embed=question_embed, view=None) + + self.game.end_question() + self.question_closed.clear() + return + break + + await ctx.send(embed=question_view.end_question(self.scoreboard)) + await message.edit(embed=question_embed, view=None) + + self.game.end_question() + + @trivianight.command() + @commands.has_any_role(*TRIVIA_NIGHT_ROLES) + async def list(self, ctx: commands.Context) -> None: + """ + Display all the questions left in the question bank. + + Questions are displayed in the following format: + Q(number): Question description | :white_check_mark: if the question was used otherwise :x:. + """ + if self.game is None: + await ctx.send(embed=Embed( + title=choice(NEGATIVE_REPLIES), + description="There is no trivia night running!", + color=Colours.soft_red + )) + return + + question_list = self.game.list_questions() + + list_embed = Embed(title="All Trivia Night Questions") + + if len(question_list) == 1: + list_embed.description = question_list[0] + await ctx.send(embed=list_embed) + else: + await LinePaginator.paginate( + question_list, + ctx, + list_embed + ) + + @trivianight.command() + @commands.has_any_role(*TRIVIA_NIGHT_ROLES) + async def stop(self, ctx: commands.Context) -> None: + """ + End the ongoing question to show the correct question. + + This command should be used if the question should be ended early or if the time limit fails + """ + if self.game is None: + await ctx.send(embed=Embed( + title=choice(NEGATIVE_REPLIES), + description="There is no trivia night running!", + color=Colours.soft_red + )) + return + + if self.game.current_question is None: + error_embed = Embed( + title=choice(NEGATIVE_REPLIES), + description="There is no ongoing question!", + color=Colours.soft_red + ) + await ctx.send(embed=error_embed) + return + + self.question_closed.set() + + @trivianight.command() + @commands.has_any_role(*TRIVIA_NIGHT_ROLES) + async def end(self, ctx: commands.Context) -> None: + """ + Displays the scoreboard view. + + The scoreboard view consists of the two scoreboards with the 30 players who got the highest points and the + 30 players who had the fastest average response time to a question where they got the question right. + + The scoreboard view also has a button where the user can see their own rank, points and average speed if they + didn't make it onto the leaderboard. + """ + if self.game is None: + await ctx.send(embed=Embed( + title=choice(NEGATIVE_REPLIES), + description="There is no trivia night running!", + color=Colours.soft_red + )) + return + + if self.game.current_question is not None: + error_embed = Embed( + title=choice(NEGATIVE_REPLIES), + description="You can't end the event while a question is ongoing!", + color=Colours.soft_red + ) + await ctx.send(embed=error_embed) + return + + scoreboard_embed, scoreboard_view = await self.scoreboard.display() + await ctx.send(embed=scoreboard_embed, view=scoreboard_view) + + @trivianight.command() + @commands.has_any_role(*TRIVIA_NIGHT_ROLES) + async def scoreboard(self, ctx: commands.Context) -> None: + """ + Displays the scoreboard. + + The scoreboard consists of the two scoreboards with the 30 players who got the highest points and the + 30 players who had the fastest average response time to a question where they got the question right. + """ + if self.game is None: + await ctx.send(embed=Embed( + title=choice(NEGATIVE_REPLIES), + description="There is no trivia night running!", + color=Colours.soft_red + )) + return + + if self.game.current_question is not None: + error_embed = Embed( + title=choice(NEGATIVE_REPLIES), + description="You can't end the event while a question is ongoing!", + color=Colours.soft_red + ) + await ctx.send(embed=error_embed) + return + + scoreboard_embed, speed_scoreboard = await self.scoreboard.display(speed_leaderboard=True) + await ctx.send(embeds=(scoreboard_embed, speed_scoreboard)) + + @trivianight.command() + @commands.has_any_role(*TRIVIA_NIGHT_ROLES) + async def end_game(self, ctx: commands.Context) -> None: + """Ends the ongoing game.""" + self.game = None + + await ctx.send(embed=Embed( + title=choice(POSITIVE_REPLIES), + description="The game has been stopped.", + color=Colours.soft_green + )) + + +def setup(bot: Bot) -> None: + """Load the TriviaNight cog.""" + bot.add_cog(TriviaNightCog(bot)) diff --git a/bot/exts/fun/battleship.py b/bot/exts/fun/battleship.py index beff196f..77e38427 100644 --- a/bot/exts/fun/battleship.py +++ b/bot/exts/fun/battleship.py @@ -110,8 +110,8 @@ class Game: self.gameover: bool = False - self.turn: Optional[discord.Member] = None - self.next: Optional[discord.Member] = None + self.turn: Optional[Player] = None + self.next: Optional[Player] = None self.match: Optional[re.Match] = None self.surrender: bool = False diff --git a/bot/exts/fun/connect_four.py b/bot/exts/fun/connect_four.py index f53695d5..1b88d065 100644 --- a/bot/exts/fun/connect_four.py +++ b/bot/exts/fun/connect_four.py @@ -5,6 +5,7 @@ from typing import Optional, Union import discord import emojis +from discord import ClientUser, Member from discord.ext import commands from bot.bot import Bot @@ -71,7 +72,9 @@ class Game: await self.message.add_reaction(CROSS_EMOJI) await self.message.edit(content=None, embed=embed) - async def game_over(self, action: str, player1: discord.user, player2: discord.user) -> None: + async def game_over( + self, action: str, player1: Union[ClientUser, Member], player2: Union[ClientUser, Member] + ) -> None: """Announces to public chat.""" if action == "win": await self.channel.send(f"Game Over! {player1.mention} won against {player2.mention}") diff --git a/bot/exts/fun/fun.py b/bot/exts/fun/fun.py index b148f1f3..e7337cb6 100644 --- a/bot/exts/fun/fun.py +++ b/bot/exts/fun/fun.py @@ -1,35 +1,21 @@ -import functools import json import logging import random from collections.abc import Iterable from pathlib import Path -from typing import Callable, Optional, Union +from typing import Literal -from discord import Embed, Message +import pyjokes +from discord import Embed from discord.ext import commands -from discord.ext.commands import BadArgument, Cog, Context, MessageConverter, clean_content +from discord.ext.commands import BadArgument, Cog, Context, clean_content -from bot import utils from bot.bot import Bot from bot.constants import Client, Colours, Emojis -from bot.utils import helpers +from bot.utils import helpers, messages log = logging.getLogger(__name__) -UWU_WORDS = { - "fi": "fwi", - "l": "w", - "r": "w", - "some": "sum", - "th": "d", - "thing": "fing", - "tho": "fo", - "you're": "yuw'we", - "your": "yur", - "you": "yuw", -} - def caesar_cipher(text: str, offset: int) -> Iterable[str]: """ @@ -56,7 +42,6 @@ class Fun(Cog): def __init__(self, bot: Bot): self.bot = bot - self._caesar_cipher_embed = json.loads(Path("bot/resources/fun/caesar_info.json").read_text("UTF-8")) @staticmethod @@ -74,23 +59,6 @@ class Fun(Cog): else: raise BadArgument(f"`{Client.prefix}roll` only supports between 1 and 6 rolls.") - @commands.command(name="uwu", aliases=("uwuwize", "uwuify",)) - async def uwu_command(self, ctx: Context, *, text: clean_content(fix_channel_mentions=True)) -> None: - """Converts a given `text` into it's uwu equivalent.""" - conversion_func = functools.partial( - utils.replace_many, replacements=UWU_WORDS, ignore_case=True, match_case=True - ) - text, embed = await Fun._get_text_and_embed(ctx, text) - # Convert embed if it exists - if embed is not None: - embed = Fun._convert_embed(conversion_func, embed) - converted_text = conversion_func(text) - converted_text = helpers.suppress_links(converted_text) - # Don't put >>> if only embed present - if converted_text: - converted_text = f">>> {converted_text.lstrip('> ')}" - await ctx.send(content=converted_text, embed=embed) - @commands.command(name="randomcase", aliases=("rcase", "randomcaps", "rcaps",)) async def randomcase_command(self, ctx: Context, *, text: clean_content(fix_channel_mentions=True)) -> None: """Randomly converts the casing of a given `text`.""" @@ -99,10 +67,10 @@ class Fun(Cog): return "".join( char.upper() if round(random.random()) else char.lower() for char in text ) - text, embed = await Fun._get_text_and_embed(ctx, text) + text, embed = await messages.get_text_and_embed(ctx, text) # Convert embed if it exists if embed is not None: - embed = Fun._convert_embed(conversion_func, embed) + embed = messages.convert_embed(conversion_func, embed) converted_text = conversion_func(text) converted_text = helpers.suppress_links(converted_text) # Don't put >>> if only embed present @@ -148,10 +116,10 @@ class Fun(Cog): """Encrypts the given string using the Caesar Cipher.""" return "".join(caesar_cipher(text, offset)) - text, embed = await Fun._get_text_and_embed(ctx, msg) + text, embed = await messages.get_text_and_embed(ctx, msg) if embed is not None: - embed = Fun._convert_embed(conversion_func, embed) + embed = messages.convert_embed(conversion_func, embed) converted_text = conversion_func(text) @@ -182,67 +150,11 @@ class Fun(Cog): """ await self._caesar_cipher(ctx, offset, msg, left_shift=True) - @staticmethod - async def _get_text_and_embed(ctx: Context, text: str) -> tuple[str, Optional[Embed]]: - """ - Attempts to extract the text and embed from a possible link to a discord Message. - - Does not retrieve the text and embed from the Message if it is in a channel the user does - not have read permissions in. - - Returns a tuple of: - str: If `text` is a valid discord Message, the contents of the message, else `text`. - Optional[Embed]: The embed if found in the valid Message, else None - """ - embed = None - - msg = await Fun._get_discord_message(ctx, text) - # Ensure the user has read permissions for the channel the message is in - if isinstance(msg, Message): - permissions = msg.channel.permissions_for(ctx.author) - if permissions.read_messages: - text = msg.clean_content - # Take first embed because we can't send multiple embeds - if msg.embeds: - embed = msg.embeds[0] - - return (text, embed) - - @staticmethod - async def _get_discord_message(ctx: Context, text: str) -> Union[Message, str]: - """ - Attempts to convert a given `text` to a discord Message object and return it. - - Conversion will succeed if given a discord Message ID or link. - Returns `text` if the conversion fails. - """ - try: - text = await MessageConverter().convert(ctx, text) - except commands.BadArgument: - log.debug(f"Input '{text:.20}...' is not a valid Discord Message") - return text - - @staticmethod - def _convert_embed(func: Callable[[str, ], str], embed: Embed) -> Embed: - """ - Converts the text in an embed using a given conversion function, then return the embed. - - Only modifies the following fields: title, description, footer, fields - """ - embed_dict = embed.to_dict() - - embed_dict["title"] = func(embed_dict.get("title", "")) - embed_dict["description"] = func(embed_dict.get("description", "")) - - if "footer" in embed_dict: - embed_dict["footer"]["text"] = func(embed_dict["footer"].get("text", "")) - - if "fields" in embed_dict: - for field in embed_dict["fields"]: - field["name"] = func(field.get("name", "")) - field["value"] = func(field.get("value", "")) - - return Embed.from_dict(embed_dict) + @commands.command() + async def joke(self, ctx: commands.Context, category: Literal["neutral", "chuck", "all"] = "all") -> None: + """Retrieves a joke of the specified `category` from the pyjokes api.""" + joke = pyjokes.get_joke(category=category) + await ctx.send(joke) def setup(bot: Bot) -> None: diff --git a/bot/exts/fun/latex.py b/bot/exts/fun/latex.py new file mode 100644 index 00000000..aeabcd20 --- /dev/null +++ b/bot/exts/fun/latex.py @@ -0,0 +1,138 @@ +import hashlib +import re +import string +from io import BytesIO +from pathlib import Path +from typing import BinaryIO, Optional + +import discord +from PIL import Image +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Channels, WHITELISTED_CHANNELS +from bot.utils.decorators import whitelist_override + +FORMATTED_CODE_REGEX = re.compile( + r"(?P<delim>(?P<block>```)|``?)" # code delimiter: 1-3 backticks; (?P=block) only matches if it's a block + r"(?(block)(?:(?P<lang>[a-z]+)\n)?)" # if we're in a block, match optional language (only letters plus newline) + r"(?:[ \t]*\n)*" # any blank (empty or tabs/spaces only) lines before the code + r"(?P<code>.*?)" # extract all code inside the markup + r"\s*" # any more whitespace before the end of the code markup + r"(?P=delim)", # match the exact same delimiter from the start again + re.DOTALL | re.IGNORECASE, # "." also matches newlines, case insensitive +) + +LATEX_API_URL = "https://rtex.probablyaweb.site/api/v2" +PASTEBIN_URL = "https://paste.pythondiscord.com" + +THIS_DIR = Path(__file__).parent +CACHE_DIRECTORY = THIS_DIR / "_latex_cache" +CACHE_DIRECTORY.mkdir(exist_ok=True) +TEMPLATE = string.Template(Path("bot/resources/fun/latex_template.txt").read_text()) + +PAD = 10 + +LATEX_ALLOWED_CHANNNELS = WHITELISTED_CHANNELS + ( + Channels.data_science_and_ai, + Channels.algos_and_data_structs, +) + + +def _prepare_input(text: str) -> str: + """Extract latex from a codeblock, if it is in one.""" + if match := FORMATTED_CODE_REGEX.match(text): + return match.group("code") + else: + return text + + +def _process_image(data: bytes, out_file: BinaryIO) -> None: + """Read `data` as an image file, and paste it on a white background.""" + image = Image.open(BytesIO(data)).convert("RGBA") + width, height = image.size + background = Image.new("RGBA", (width + 2 * PAD, height + 2 * PAD), "WHITE") + + # paste the image on the background, using the same image as the mask + # when an RGBA image is passed as the mask, its alpha band is used. + # this has the effect of skipping pasting the pixels where the image is transparent. + background.paste(image, (PAD, PAD), image) + background.save(out_file) + + +class InvalidLatexError(Exception): + """Represents an error caused by invalid latex.""" + + def __init__(self, logs: Optional[str]): + super().__init__(logs) + self.logs = logs + + +class Latex(commands.Cog): + """Renders latex.""" + + def __init__(self, bot: Bot): + self.bot = bot + + async def _generate_image(self, query: str, out_file: BinaryIO) -> None: + """Make an API request and save the generated image to cache.""" + payload = {"code": query, "format": "png"} + async with self.bot.http_session.post(LATEX_API_URL, data=payload, raise_for_status=True) as response: + response_json = await response.json() + if response_json["status"] != "success": + raise InvalidLatexError(logs=response_json.get("log")) + async with self.bot.http_session.get( + f"{LATEX_API_URL}/{response_json['filename']}", + raise_for_status=True + ) as response: + _process_image(await response.read(), out_file) + + async def _upload_to_pastebin(self, text: str) -> Optional[str]: + """Uploads `text` to the paste service, returning the url if successful.""" + try: + async with self.bot.http_session.post( + PASTEBIN_URL + "/documents", + data=text, + raise_for_status=True + ) as response: + response_json = await response.json() + if "key" in response_json: + return f"{PASTEBIN_URL}/{response_json['key']}.txt?noredirect" + except Exception: + # 400 (Bad Request) means there are too many characters + pass + + @commands.command() + @commands.max_concurrency(1, commands.BucketType.guild, wait=True) + @whitelist_override(channels=LATEX_ALLOWED_CHANNNELS) + async def latex(self, ctx: commands.Context, *, query: str) -> None: + """Renders the text in latex and sends the image.""" + query = _prepare_input(query) + + # the hash of the query is used as the filename in the cache. + query_hash = hashlib.md5(query.encode()).hexdigest() + image_path = CACHE_DIRECTORY / f"{query_hash}.png" + async with ctx.typing(): + if not image_path.exists(): + try: + with open(image_path, "wb") as out_file: + await self._generate_image(TEMPLATE.substitute(text=query), out_file) + except InvalidLatexError as err: + embed = discord.Embed(title="Failed to render input.") + if err.logs is None: + embed.description = "No logs available." + else: + logs_paste_url = await self._upload_to_pastebin(err.logs) + if logs_paste_url: + embed.description = f"[View Logs]({logs_paste_url})" + else: + embed.description = "Couldn't upload logs." + await ctx.send(embed=embed) + image_path.unlink() + return + await ctx.send(file=discord.File(image_path, "latex.png")) + + +def setup(bot: Bot) -> None: + """Load the Latex Cog.""" + bot.add_cog(Latex(bot)) diff --git a/bot/exts/fun/uwu.py b/bot/exts/fun/uwu.py new file mode 100644 index 00000000..83497893 --- /dev/null +++ b/bot/exts/fun/uwu.py @@ -0,0 +1,204 @@ +import random +import re +import typing as t +from dataclasses import dataclass +from functools import partial + +import discord +from discord.ext import commands +from discord.ext.commands import Cog, Context, clean_content + +from bot.bot import Bot +from bot.utils import helpers, messages + +WORD_REPLACE = { + "small": "smol", + "cute": "kawaii~", + "fluff": "floof", + "love": "luv", + "stupid": "baka", + "idiot": "baka", + "what": "nani", + "meow": "nya~", + "roar": "rawrr~", +} + +EMOJIS = [ + "rawr x3", + "OwO", + "UwU", + "o.O", + "-.-", + ">w<", + "ΟΟΟ", + "Γ²ΟΓ³", + "ΚwΚ", + ":3", + "XD", + "nyaa~~", + "mya", + ">_<", + "rawr", + "uwu", + "^^", + "^^;;", +] + +REGEX_WORD_REPLACE = re.compile(r"(?<!w)[lr](?!w)") + +REGEX_PUNCTUATION = re.compile(r"[.!?\r\n\t]") + +REGEX_STUTTER = re.compile(r"(\s)([a-zA-Z])") +SUBSTITUTE_STUTTER = r"\g<1>\g<2>-\g<2>" + +REGEX_NYA = re.compile(r"n([aeou][^aeiou])") +SUBSTITUTE_NYA = r"ny\1" + +REGEX_EMOJI = re.compile(r"<(a)?:(\w+?):(\d{15,21}?)>", re.ASCII) + + +@dataclass(frozen=True, eq=True) +class Emoji: + """Data class for an Emoji.""" + + name: str + uid: int + animated: bool = False + + def __str__(self): + anim_bit = "a" if self.animated else "" + return f"<{anim_bit}:{self.name}:{self.uid}>" + + def can_display(self, bot: Bot) -> bool: + """Determines if a bot is in a server with the emoji.""" + return bot.get_emoji(self.uid) is not None + + @classmethod + def from_match(cls, match: tuple[str, str, str]) -> t.Optional['Emoji']: + """Creates an Emoji from a regex match tuple.""" + if not match or len(match) != 3 or not match[2].isdecimal(): + return None + return cls(match[1], int(match[2]), match[0] == "a") + + +class Uwu(Cog): + """Cog for the uwu command.""" + + def __init__(self, bot: Bot): + self.bot = bot + + def _word_replace(self, input_string: str) -> str: + """Replaces words that are keys in the word replacement hash to the values specified.""" + for word, replacement in WORD_REPLACE.items(): + input_string = input_string.replace(word, replacement) + return input_string + + def _char_replace(self, input_string: str) -> str: + """Replace certain characters with 'w'.""" + return REGEX_WORD_REPLACE.sub("w", input_string) + + def _stutter(self, strength: float, input_string: str) -> str: + """Adds stuttering to a string.""" + return REGEX_STUTTER.sub(partial(self._stutter_replace, strength=strength), input_string, 0) + + def _stutter_replace(self, match: re.Match, strength: float = 0.0) -> str: + """Replaces a single character with a stuttered character.""" + match_string = match.group() + if random.random() < strength: + return f"{match_string}-{match_string[-1]}" # Stutter the last character + return match_string + + def _nyaify(self, input_string: str) -> str: + """Nyaifies a string by adding a 'y' between an 'n' and a vowel.""" + return REGEX_NYA.sub(SUBSTITUTE_NYA, input_string, 0) + + def _emoji(self, strength: float, input_string: str) -> str: + """Replaces some punctuation with emoticons.""" + return REGEX_PUNCTUATION.sub(partial(self._emoji_replace, strength=strength), input_string, 0) + + def _emoji_replace(self, match: re.Match, strength: float = 0.0) -> str: + """Replaces a punctuation character with an emoticon.""" + match_string = match.group() + if random.random() < strength: + return f" {random.choice(EMOJIS)} " + return match_string + + def _ext_emoji_replace(self, input_string: str) -> str: + """Replaces any emoji the bot cannot send in input_text with a random emoticons.""" + groups = REGEX_EMOJI.findall(input_string) + emojis = {Emoji.from_match(match) for match in groups} + # Replace with random emoticon if unable to display + emojis_map = { + re.escape(str(e)): random.choice(EMOJIS) + for e in emojis if e and not e.can_display(self.bot) + } + if emojis_map: + # Pattern for all emoji markdowns to be replaced + emojis_re = re.compile("|".join(emojis_map.keys())) + # Replace matches with random emoticon + return emojis_re.sub( + lambda m: emojis_map[re.escape(m.group())], + input_string + ) + # Return original if no replacement + return input_string + + def _uwuify(self, input_string: str, *, stutter_strength: float = 0.2, emoji_strength: float = 0.1) -> str: + """Takes a string and returns an uwuified version of it.""" + input_string = input_string.lower() + input_string = self._word_replace(input_string) + input_string = self._nyaify(input_string) + input_string = self._char_replace(input_string) + input_string = self._stutter(stutter_strength, input_string) + input_string = self._emoji(emoji_strength, input_string) + input_string = self._ext_emoji_replace(input_string) + return input_string + + @commands.command(name="uwu", aliases=("uwuwize", "uwuify",)) + async def uwu_command(self, ctx: Context, *, text: t.Optional[str] = None) -> None: + """ + Echo an uwuified version the passed text. + + Example: + '.uwu Hello, my name is John' returns something like + 'hewwo, m-my name is j-john nyaa~'. + """ + # If `text` isn't provided then we try to get message content of a replied message + text = text or getattr(ctx.message.reference, "resolved", None) + if isinstance(text, discord.Message): + embeds = text.embeds + text = text.content + else: + embeds = None + + if text is None: + # If we weren't able to get the content of a replied message + raise commands.UserInputError("Your message must have content or you must reply to a message.") + + await clean_content(fix_channel_mentions=True).convert(ctx, text) + + # Grabs the text from the embed for uwuification + if embeds: + embed = messages.convert_embed(self._uwuify, embeds[0]) + else: + # Parse potential message links in text + text, embed = await messages.get_text_and_embed(ctx, text) + + # If an embed is found, grab and uwuify its text + if embed: + embed = messages.convert_embed(self._uwuify, embed) + + # Adds the text harvested from an embed to be put into another quote block. + if text: + converted_text = self._uwuify(text) + converted_text = helpers.suppress_links(converted_text) + converted_text = f">>> {converted_text.lstrip('> ')}" + else: + converted_text = None + + await ctx.send(content=converted_text, embed=embed) + + +def setup(bot: Bot) -> None: + """Load the uwu cog.""" + bot.add_cog(Uwu(bot)) diff --git a/bot/exts/holidays/easter/egg_facts.py b/bot/exts/holidays/easter/egg_facts.py index 5f216e0d..152af6a4 100644 --- a/bot/exts/holidays/easter/egg_facts.py +++ b/bot/exts/holidays/easter/egg_facts.py @@ -31,7 +31,7 @@ class EasterFacts(commands.Cog): """A background task that sends an easter egg fact in the event channel everyday.""" await self.bot.wait_until_guild_available() - channel = self.bot.get_channel(Channels.community_bot_commands) + channel = self.bot.get_channel(Channels.sir_lancebot_playground) await channel.send(embed=self.make_embed()) @commands.command(name="eggfact", aliases=("fact",)) diff --git a/bot/exts/holidays/halloween/candy_collection.py b/bot/exts/holidays/halloween/candy_collection.py index 729bbc97..220ba8e5 100644 --- a/bot/exts/holidays/halloween/candy_collection.py +++ b/bot/exts/holidays/halloween/candy_collection.py @@ -55,7 +55,7 @@ class CandyCollection(commands.Cog): if message.author.bot: return # ensure it's hacktober channel - if message.channel.id != Channels.community_bot_commands: + if message.channel.id != Channels.sir_lancebot_playground: return # do random check for skull first as it has the lower chance @@ -77,7 +77,7 @@ class CandyCollection(commands.Cog): return # check to ensure it is in correct channel - if message.channel.id != Channels.community_bot_commands: + if message.channel.id != Channels.sir_lancebot_playground: return # if its not a candy or skull, and it is one of 10 most recent messages, @@ -139,7 +139,7 @@ class CandyCollection(commands.Cog): @property def hacktober_channel(self) -> discord.TextChannel: """Get #hacktoberbot channel from its ID.""" - return self.bot.get_channel(Channels.community_bot_commands) + return self.bot.get_channel(Channels.sir_lancebot_playground) @staticmethod async def send_spook_msg( diff --git a/bot/exts/holidays/halloween/spookynamerate.py b/bot/exts/holidays/halloween/spookynamerate.py index a3aa4f13..02fb71c3 100644 --- a/bot/exts/holidays/halloween/spookynamerate.py +++ b/bot/exts/holidays/halloween/spookynamerate.py @@ -223,7 +223,7 @@ class SpookyNameRate(Cog): if self.first_time: await channel.send( "Okkey... Welcome to the **Spooky Name Rate Game**! It's a relatively simple game.\n" - f"Everyday, a random name will be sent in <#{Channels.community_bot_commands}> " + f"Everyday, a random name will be sent in <#{Channels.sir_lancebot_playground}> " "and you need to try and spookify it!\nRegister your name using " f"`{Client.prefix}spookynamerate add spookified name`" ) @@ -359,10 +359,10 @@ class SpookyNameRate(Cog): """Gets the sir-lancebot-channel after waiting until ready.""" await self.bot.wait_until_ready() channel = self.bot.get_channel( - Channels.community_bot_commands - ) or await self.bot.fetch_channel(Channels.community_bot_commands) + Channels.sir_lancebot_playground + ) or await self.bot.fetch_channel(Channels.sir_lancebot_playground) if not channel: - logger.warning("Bot is unable to get the #seasonalbot-commands channel. Please check the channel ID.") + logger.warning("Bot is unable to get the #sir-lancebot-playground channel. Please check the channel ID.") return channel @staticmethod diff --git a/bot/exts/holidays/pride/pride_facts.py b/bot/exts/holidays/pride/pride_facts.py index e6ef7108..340f0b43 100644 --- a/bot/exts/holidays/pride/pride_facts.py +++ b/bot/exts/holidays/pride/pride_facts.py @@ -30,7 +30,7 @@ class PrideFacts(commands.Cog): """Background task to post the daily pride fact every day.""" await self.bot.wait_until_guild_available() - channel = self.bot.get_channel(Channels.community_bot_commands) + channel = self.bot.get_channel(Channels.sir_lancebot_playground) await self.send_select_fact(channel, datetime.utcnow()) async def send_random_fact(self, ctx: commands.Context) -> None: diff --git a/bot/exts/holidays/pride/pride_leader.py b/bot/exts/holidays/pride/pride_leader.py index 298c9328..adf01134 100644 --- a/bot/exts/holidays/pride/pride_leader.py +++ b/bot/exts/holidays/pride/pride_leader.py @@ -83,7 +83,7 @@ class PrideLeader(commands.Cog): embed.add_field( name="For More Information", value=f"Do `{constants.Client.prefix}wiki {name}`" - f" in <#{constants.Channels.community_bot_commands}>", + f" in <#{constants.Channels.sir_lancebot_playground}>", inline=False ) embed.set_thumbnail(url=pride_leader["url"]) diff --git a/bot/exts/holidays/valentines/be_my_valentine.py b/bot/exts/holidays/valentines/be_my_valentine.py index 1572d474..cbb95157 100644 --- a/bot/exts/holidays/valentines/be_my_valentine.py +++ b/bot/exts/holidays/valentines/be_my_valentine.py @@ -70,7 +70,7 @@ class BeMyValentine(commands.Cog): raise commands.UserInputError("Come on, you can't send a valentine to yourself :expressionless:") emoji_1, emoji_2 = self.random_emoji() - channel = self.bot.get_channel(Channels.community_bot_commands) + channel = self.bot.get_channel(Channels.sir_lancebot_playground) valentine, title = self.valentine_check(valentine_type) embed = discord.Embed( diff --git a/bot/exts/holidays/valentines/lovecalculator.py b/bot/exts/holidays/valentines/lovecalculator.py index a53014e5..10dea9df 100644 --- a/bot/exts/holidays/valentines/lovecalculator.py +++ b/bot/exts/holidays/valentines/lovecalculator.py @@ -12,7 +12,7 @@ from discord.ext import commands from discord.ext.commands import BadArgument, Cog, clean_content from bot.bot import Bot -from bot.constants import Channels, Client, Lovefest, Month +from bot.constants import Channels, Lovefest, Month, PYTHON_PREFIX from bot.utils.decorators import in_month log = logging.getLogger(__name__) @@ -32,7 +32,7 @@ class LoveCalculator(Cog): Tells you how much the two love each other. This command requires at least one member as input, if two are given love will be calculated between - those two users, if only one is given, the second member is asusmed to be the invoker. + those two users, if only one is given, the second member is assumed to be the invoker. Members are converted from: - User ID - Mention @@ -51,7 +51,7 @@ class LoveCalculator(Cog): raise BadArgument( "This command can only be ran against members with the lovefest role! " "This role be can assigned by running " - f"`{Client.prefix}lovefest sub` in <#{Channels.community_bot_commands}>." + f"`{PYTHON_PREFIX}subscribe` in <#{Channels.bot_commands}>." ) if whom is None: @@ -90,7 +90,7 @@ class LoveCalculator(Cog): name="A letter from Dr. Love:", value=data["text"] ) - embed.set_footer(text=f"You can unsubscribe from lovefest by using {Client.prefix}lovefest unsub") + embed.set_footer(text=f"You can unsubscribe from lovefest by using {PYTHON_PREFIX}subscribe.") await ctx.send(embed=embed) diff --git a/bot/exts/utilities/bookmark.py b/bot/exts/utilities/bookmark.py index b50205a0..2e3458d8 100644 --- a/bot/exts/utilities/bookmark.py +++ b/bot/exts/utilities/bookmark.py @@ -16,6 +16,13 @@ log = logging.getLogger(__name__) # Number of seconds to wait for other users to bookmark the same message TIMEOUT = 120 BOOKMARK_EMOJI = "π" +MESSAGE_NOT_FOUND_ERROR = ( + "You must either provide a reference to a valid message, or reply to one." + "\n\nThe lookup strategy for a message is as follows (in order):" + "\n1. Lookup by '{channel ID}-{message ID}' (retrieved by shift-clicking on 'Copy ID')" + "\n2. Lookup by message ID (the message **must** be in the current channel)" + "\n3. Lookup by message URL" +) class Bookmark(commands.Cog): @@ -42,51 +49,37 @@ class Bookmark(commands.Cog): return embed @staticmethod - def build_error_embed(user: discord.Member) -> discord.Embed: - """Builds an error embed for when a bookmark requester has DMs disabled.""" + def build_error_embed(message: str) -> discord.Embed: + """Builds an error embed for a given message.""" return discord.Embed( title=random.choice(ERROR_REPLIES), - description=f"{user.mention}, please enable your DMs to receive the bookmark.", + description=message, colour=Colours.soft_red ) async def action_bookmark( self, channel: discord.TextChannel, - user: discord.Member, + member: discord.Member, target_message: discord.Message, title: str ) -> None: - """Sends the bookmark DM, or sends an error embed when a user bookmarks a message.""" + """ + Sends the given target_message as a bookmark to the member in DMs to the user. + + Send an error embed instead if the member has DMs disabled. + """ + embed = self.build_bookmark_dm(target_message, title) try: - embed = self.build_bookmark_dm(target_message, title) - await user.send(embed=embed) + await member.send(embed=embed) except discord.Forbidden: - error_embed = self.build_error_embed(user) + error_embed = self.build_error_embed(f"{member.mention}, please enable your DMs to receive the bookmark.") await channel.send(embed=error_embed) else: - log.info(f"{user} bookmarked {target_message.jump_url} with title '{title}'") - - @staticmethod - async def send_reaction_embed( - channel: discord.TextChannel, - target_message: discord.Message - ) -> discord.Message: - """Sends an embed, with a reaction, so users can react to bookmark the message too.""" - message = await channel.send( - embed=discord.Embed( - description=( - f"React with {BOOKMARK_EMOJI} to be sent your very own bookmark to " - f"[this message]({target_message.jump_url})." - ), - colour=Colours.soft_green - ) - ) - - await message.add_reaction(BOOKMARK_EMOJI) - return message + log.info(f"{member} bookmarked {target_message.jump_url} with title '{title}'") - @commands.command(name="bookmark", aliases=("bm", "pin")) + @commands.group(name="bookmark", aliases=("bm", "pin"), invoke_without_command=True) + @commands.guild_only() @whitelist_override(roles=(Roles.everyone,)) async def bookmark( self, @@ -95,30 +88,40 @@ class Bookmark(commands.Cog): *, title: str = "Bookmark" ) -> None: - """Send the author a link to `target_message` via DMs.""" - if not target_message: - if not ctx.message.reference: - raise commands.UserInputError( - "You must either provide a valid message to bookmark, or reply to one." - "\n\nThe lookup strategy for a message is as follows (in order):" - "\n1. Lookup by '{channel ID}-{message ID}' (retrieved by shift-clicking on 'Copy ID')" - "\n2. Lookup by message ID (the message **must** be in the context channel)" - "\n3. Lookup by message URL" - ) - target_message = ctx.message.reference.resolved + """ + Send the author a link to the specified message via DMs. + + Members can either give a message as an argument, or reply to a message. + + Bookmarks can subsequently be deleted by using the `bookmark delete` command in DMs. + """ + target_message: Optional[discord.Message] = target_message or getattr(ctx.message.reference, "resolved", None) + if target_message is None: + raise commands.UserInputError(MESSAGE_NOT_FOUND_ERROR) # Prevent users from bookmarking a message in a channel they don't have access to permissions = target_message.channel.permissions_for(ctx.author) if not permissions.read_messages: log.info(f"{ctx.author} tried to bookmark a message in #{target_message.channel} but has no permissions.") - embed = discord.Embed( - title=random.choice(ERROR_REPLIES), - color=Colours.soft_red, - description="You don't have permission to view this channel." - ) + embed = self.build_error_embed(f"{ctx.author.mention} You don't have permission to view this channel.") await ctx.send(embed=embed) return + await self.action_bookmark(ctx.channel, ctx.author, target_message, title) + + # Keep track of who has already bookmarked, so users can't spam reactions and cause loads of DMs + bookmarked_users = [ctx.author.id] + + reaction_embed = discord.Embed( + description=( + f"React with {BOOKMARK_EMOJI} to be sent your very own bookmark to " + f"[this message]({ctx.message.jump_url})." + ), + colour=Colours.soft_green + ) + reaction_message = await ctx.send(embed=reaction_embed) + await reaction_message.add_reaction(BOOKMARK_EMOJI) + def event_check(reaction: discord.Reaction, user: discord.Member) -> bool: """Make sure that this reaction is what we want to operate on.""" return ( @@ -134,11 +137,6 @@ class Bookmark(commands.Cog): user.id != self.bot.user.id )) ) - await self.action_bookmark(ctx.channel, ctx.author, target_message, title) - - # Keep track of who has already bookmarked, so users can't spam reactions and cause loads of DMs - bookmarked_users = [ctx.author.id] - reaction_message = await self.send_reaction_embed(ctx.channel, target_message) while True: try: @@ -152,6 +150,31 @@ class Bookmark(commands.Cog): await reaction_message.delete() + @bookmark.command(name="delete", aliases=("del", "rm"), root_aliases=("unbm", "unbookmark", "dmdelete", "dmdel")) + @whitelist_override(bypass_defaults=True, allow_dm=True) + async def delete_bookmark( + self, + ctx: commands.Context, + ) -> None: + """ + Delete the Sir-Lancebot message that the command invocation is replying to. + + This command allows deleting any message sent by Sir-Lancebot in the user's DM channel with the bot. + The command invocation must be a reply to the message that is to be deleted. + """ + target_message: Optional[discord.Message] = getattr(ctx.message.reference, "resolved", None) + if target_message is None: + raise commands.UserInputError("You must reply to the message from Sir-Lancebot you wish to delete.") + + if not isinstance(ctx.channel, discord.DMChannel): + raise commands.UserInputError("You can only run this command your own DMs!") + elif target_message.channel != ctx.channel: + raise commands.UserInputError("You can only delete messages in your own DMs!") + elif target_message.author != self.bot.user: + raise commands.UserInputError("You can only delete messages sent by Sir Lancebot!") + + await target_message.delete() + def setup(bot: Bot) -> None: """Load the Bookmark cog.""" diff --git a/bot/exts/utilities/epoch.py b/bot/exts/utilities/epoch.py index b9feed18..42312dd1 100644 --- a/bot/exts/utilities/epoch.py +++ b/bot/exts/utilities/epoch.py @@ -35,7 +35,7 @@ class DateString(commands.Converter): """ try: return arrow.utcnow().dehumanize(argument) - except ValueError: + except (ValueError, OverflowError): try: dt, ignored_tokens = parser.parse(argument, fuzzy_with_tokens=True) except parser.ParserError: @@ -86,7 +86,10 @@ class Epoch(commands.Cog): view = TimestampMenuView(ctx, self._format_dates(date_time), epoch) original = await ctx.send(f"`{epoch}`", view=view) await view.wait() # wait until expiration before removing the dropdown - await original.edit(view=None) + try: + await original.edit(view=None) + except discord.NotFound: # disregard the error message if the message is deleled + pass @staticmethod def _format_dates(date: arrow.Arrow) -> list[str]: diff --git a/bot/exts/utilities/githubinfo.py b/bot/exts/utilities/githubinfo.py index 539e388b..046f67df 100644 --- a/bot/exts/utilities/githubinfo.py +++ b/bot/exts/utilities/githubinfo.py @@ -1,30 +1,167 @@ import logging import random +import re +import typing as t +from dataclasses import dataclass from datetime import datetime -from urllib.parse import quote, quote_plus +from urllib.parse import quote import discord +from aiohttp import ClientResponse from discord.ext import commands from bot.bot import Bot -from bot.constants import Colours, NEGATIVE_REPLIES +from bot.constants import Colours, ERROR_REPLIES, Emojis, NEGATIVE_REPLIES, Tokens from bot.exts.core.extensions import invoke_help_command log = logging.getLogger(__name__) GITHUB_API_URL = "https://api.github.com" +REQUEST_HEADERS = { + "Accept": "application/vnd.github.v3+json" +} + +REPOSITORY_ENDPOINT = "https://api.github.com/orgs/{org}/repos?per_page=100&type=public" +ISSUE_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/issues/{number}" +PR_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/pulls/{number}" + +if Tokens.github: + REQUEST_HEADERS["Authorization"] = f"token {Tokens.github}" + +CODE_BLOCK_RE = re.compile( + r"^`([^`\n]+)`" # Inline codeblock + r"|```(.+?)```", # Multiline codeblock + re.DOTALL | re.MULTILINE +) + +# Maximum number of issues in one message +MAXIMUM_ISSUES = 5 + +# Regex used when looking for automatic linking in messages +# regex101 of current regex https://regex101.com/r/V2ji8M/6 +AUTOMATIC_REGEX = re.compile( + r"((?P<org>[a-zA-Z0-9][a-zA-Z0-9\-]{1,39})\/)?(?P<repo>[\w\-\.]{1,100})#(?P<number>[0-9]+)" +) + + +@dataclass(eq=True, frozen=True) +class FoundIssue: + """Dataclass representing an issue found by the regex.""" + + organisation: t.Optional[str] + repository: str + number: str + + +@dataclass(eq=True, frozen=True) +class FetchError: + """Dataclass representing an error while fetching an issue.""" + + return_code: int + message: str + + +@dataclass(eq=True, frozen=True) +class IssueState: + """Dataclass representing the state of an issue.""" + + repository: str + number: int + url: str + title: str + emoji: str + class GithubInfo(commands.Cog): - """Fetches info from GitHub.""" + """A Cog that fetches info from GitHub.""" def __init__(self, bot: Bot): self.bot = bot + self.repos = [] + + @staticmethod + def remove_codeblocks(message: str) -> str: + """Remove any codeblock in a message.""" + return CODE_BLOCK_RE.sub("", message) + + async def fetch_issue( + self, + number: int, + repository: str, + user: str + ) -> t.Union[IssueState, FetchError]: + """ + Retrieve an issue from a GitHub repository. + + Returns IssueState on success, FetchError on failure. + """ + url = ISSUE_ENDPOINT.format(user=user, repository=repository, number=number) + pulls_url = PR_ENDPOINT.format(user=user, repository=repository, number=number) + + json_data, r = await self.fetch_data(url) + + if r.status == 403: + if r.headers.get("X-RateLimit-Remaining") == "0": + log.info(f"Ratelimit reached while fetching {url}") + return FetchError(403, "Ratelimit reached, please retry in a few minutes.") + return FetchError(403, "Cannot access issue.") + elif r.status in (404, 410): + return FetchError(r.status, "Issue not found.") + elif r.status != 200: + return FetchError(r.status, "Error while fetching issue.") + + # The initial API request is made to the issues API endpoint, which will return information + # if the issue or PR is present. However, the scope of information returned for PRs differs + # from issues: if the 'issues' key is present in the response then we can pull the data we + # need from the initial API call. + if "issues" in json_data["html_url"]: + if json_data.get("state") == "open": + emoji = Emojis.issue_open + else: + emoji = Emojis.issue_closed + + # If the 'issues' key is not contained in the API response and there is no error code, then + # we know that a PR has been requested and a call to the pulls API endpoint is necessary + # to get the desired information for the PR. + else: + pull_data, _ = await self.fetch_data(pulls_url) + if pull_data["draft"]: + emoji = Emojis.pull_request_draft + elif pull_data["state"] == "open": + emoji = Emojis.pull_request_open + # When 'merged_at' is not None, this means that the state of the PR is merged + elif pull_data["merged_at"] is not None: + emoji = Emojis.pull_request_merged + else: + emoji = Emojis.pull_request_closed + + issue_url = json_data.get("html_url") + + return IssueState(repository, number, issue_url, json_data.get("title", ""), emoji) + + @staticmethod + def format_embed( + results: t.List[t.Union[IssueState, FetchError]] + ) -> discord.Embed: + """Take a list of IssueState or FetchError and format a Discord embed for them.""" + description_list = [] + + for result in results: + if isinstance(result, IssueState): + description_list.append( + f"{result.emoji} [[{result.repository}] #{result.number} {result.title}]({result.url})" + ) + elif isinstance(result, FetchError): + description_list.append(f":x: [{result.return_code}] {result.message}") + + resp = discord.Embed( + colour=Colours.bright_green, + description="\n".join(description_list) + ) - async def fetch_data(self, url: str) -> dict: - """Retrieve data as a dictionary.""" - async with self.bot.http_session.get(url) as r: - return await r.json() + resp.set_author(name="GitHub") + return resp @commands.group(name="github", aliases=("gh", "git")) @commands.cooldown(1, 10, commands.BucketType.user) @@ -33,11 +170,67 @@ class GithubInfo(commands.Cog): if ctx.invoked_subcommand is None: await invoke_help_command(ctx) + @commands.Cog.listener() + async def on_message(self, message: discord.Message) -> None: + """ + Automatic issue linking. + + Listener to retrieve issue(s) from a GitHub repository using automatic linking if matching <org>/<repo>#<issue>. + """ + # Ignore bots + if message.author.bot: + return + + issues = [ + FoundIssue(*match.group("org", "repo", "number")) + for match in AUTOMATIC_REGEX.finditer(self.remove_codeblocks(message.content)) + ] + links = [] + + if issues: + # Block this from working in DMs + if not message.guild: + return + + log.trace(f"Found {issues = }") + # Remove duplicates + issues = list(dict.fromkeys(issues)) + + if len(issues) > MAXIMUM_ISSUES: + embed = discord.Embed( + title=random.choice(ERROR_REPLIES), + color=Colours.soft_red, + description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})" + ) + await message.channel.send(embed=embed, delete_after=5) + return + + for repo_issue in issues: + result = await self.fetch_issue( + int(repo_issue.number), + repo_issue.repository, + repo_issue.organisation or "python-discord" + ) + if isinstance(result, IssueState): + links.append(result) + + if not links: + return + + resp = self.format_embed(links) + await message.channel.send(embed=resp) + + async def fetch_data(self, url: str) -> tuple[dict[str], ClientResponse]: + """Retrieve data as a dictionary and the response in a tuple.""" + log.trace(f"Querying GH issues API: {url}") + async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r: + return await r.json(), r + @github_group.command(name="user", aliases=("userinfo",)) async def github_user_info(self, ctx: commands.Context, username: str) -> None: """Fetches a user's GitHub information.""" async with ctx.typing(): - user_data = await self.fetch_data(f"{GITHUB_API_URL}/users/{quote_plus(username)}") + user_data, _ = await self.fetch_data(f"{GITHUB_API_URL}/users/{username}") # User_data will not have a message key if the user exists if "message" in user_data: @@ -50,7 +243,7 @@ class GithubInfo(commands.Cog): await ctx.send(embed=embed) return - org_data = await self.fetch_data(user_data["organizations_url"]) + org_data, _ = await self.fetch_data(user_data["organizations_url"]) orgs = [f"[{org['login']}](https://github.com/{org['login']})" for org in org_data] orgs_to_add = " | ".join(orgs) @@ -91,10 +284,7 @@ class GithubInfo(commands.Cog): ) if user_data["type"] == "User": - embed.add_field( - name="Gists", - value=f"[{gists}](https://gist.github.com/{quote_plus(username, safe='')})" - ) + embed.add_field(name="Gists", value=f"[{gists}](https://gist.github.com/{quote(username, safe='')})") embed.add_field( name=f"Organization{'s' if len(orgs)!=1 else ''}", @@ -123,7 +313,7 @@ class GithubInfo(commands.Cog): return async with ctx.typing(): - repo_data = await self.fetch_data(f"{GITHUB_API_URL}/repos/{quote(repo)}") + repo_data, _ = await self.fetch_data(f"{GITHUB_API_URL}/repos/{quote(repo)}") # There won't be a message key if this repo exists if "message" in repo_data: diff --git a/bot/exts/utilities/issues.py b/bot/exts/utilities/issues.py deleted file mode 100644 index b6d5a43e..00000000 --- a/bot/exts/utilities/issues.py +++ /dev/null @@ -1,277 +0,0 @@ -import logging -import random -import re -from dataclasses import dataclass -from typing import Optional, Union - -import discord -from discord.ext import commands - -from bot.bot import Bot -from bot.constants import ( - Categories, Channels, Colours, ERROR_REPLIES, Emojis, NEGATIVE_REPLIES, Tokens, WHITELISTED_CHANNELS -) -from bot.utils.decorators import whitelist_override -from bot.utils.extensions import invoke_help_command - -log = logging.getLogger(__name__) - -BAD_RESPONSE = { - 404: "Issue/pull request not located! Please enter a valid number!", - 403: "Rate limit has been hit! Please try again later!" -} -REQUEST_HEADERS = { - "Accept": "application/vnd.github.v3+json" -} - -REPOSITORY_ENDPOINT = "https://api.github.com/orgs/{org}/repos?per_page=100&type=public" -ISSUE_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/issues/{number}" -PR_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/pulls/{number}" - -if GITHUB_TOKEN := Tokens.github: - REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}" - -WHITELISTED_CATEGORIES = ( - Categories.development, Categories.devprojects, Categories.media, Categories.staff -) - -CODE_BLOCK_RE = re.compile( - r"^`([^`\n]+)`" # Inline codeblock - r"|```(.+?)```", # Multiline codeblock - re.DOTALL | re.MULTILINE -) - -# Maximum number of issues in one message -MAXIMUM_ISSUES = 5 - -# Regex used when looking for automatic linking in messages -# regex101 of current regex https://regex101.com/r/V2ji8M/6 -AUTOMATIC_REGEX = re.compile( - r"((?P<org>[a-zA-Z0-9][a-zA-Z0-9\-]{1,39})\/)?(?P<repo>[\w\-\.]{1,100})#(?P<number>[0-9]+)" -) - - -@dataclass -class FoundIssue: - """Dataclass representing an issue found by the regex.""" - - organisation: Optional[str] - repository: str - number: str - - def __hash__(self) -> int: - return hash((self.organisation, self.repository, self.number)) - - -@dataclass -class FetchError: - """Dataclass representing an error while fetching an issue.""" - - return_code: int - message: str - - -@dataclass -class IssueState: - """Dataclass representing the state of an issue.""" - - repository: str - number: int - url: str - title: str - emoji: str - - -class Issues(commands.Cog): - """Cog that allows users to retrieve issues from GitHub.""" - - def __init__(self, bot: Bot): - self.bot = bot - self.repos = [] - - @staticmethod - def remove_codeblocks(message: str) -> str: - """Remove any codeblock in a message.""" - return re.sub(CODE_BLOCK_RE, "", message) - - async def fetch_issues( - self, - number: int, - repository: str, - user: str - ) -> Union[IssueState, FetchError]: - """ - Retrieve an issue from a GitHub repository. - - Returns IssueState on success, FetchError on failure. - """ - url = ISSUE_ENDPOINT.format(user=user, repository=repository, number=number) - pulls_url = PR_ENDPOINT.format(user=user, repository=repository, number=number) - log.trace(f"Querying GH issues API: {url}") - - async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r: - json_data = await r.json() - - if r.status == 403: - if r.headers.get("X-RateLimit-Remaining") == "0": - log.info(f"Ratelimit reached while fetching {url}") - return FetchError(403, "Ratelimit reached, please retry in a few minutes.") - return FetchError(403, "Cannot access issue.") - elif r.status in (404, 410): - return FetchError(r.status, "Issue not found.") - elif r.status != 200: - return FetchError(r.status, "Error while fetching issue.") - - # The initial API request is made to the issues API endpoint, which will return information - # if the issue or PR is present. However, the scope of information returned for PRs differs - # from issues: if the 'issues' key is present in the response then we can pull the data we - # need from the initial API call. - if "issues" in json_data["html_url"]: - if json_data.get("state") == "open": - emoji = Emojis.issue_open - else: - emoji = Emojis.issue_closed - - # If the 'issues' key is not contained in the API response and there is no error code, then - # we know that a PR has been requested and a call to the pulls API endpoint is necessary - # to get the desired information for the PR. - else: - log.trace(f"PR provided, querying GH pulls API for additional information: {pulls_url}") - async with self.bot.http_session.get(pulls_url) as p: - pull_data = await p.json() - if pull_data["draft"]: - emoji = Emojis.pull_request_draft - elif pull_data["state"] == "open": - emoji = Emojis.pull_request_open - # When 'merged_at' is not None, this means that the state of the PR is merged - elif pull_data["merged_at"] is not None: - emoji = Emojis.pull_request_merged - else: - emoji = Emojis.pull_request_closed - - issue_url = json_data.get("html_url") - - return IssueState(repository, number, issue_url, json_data.get("title", ""), emoji) - - @staticmethod - def format_embed( - results: list[Union[IssueState, FetchError]], - user: str, - repository: Optional[str] = None - ) -> discord.Embed: - """Take a list of IssueState or FetchError and format a Discord embed for them.""" - description_list = [] - - for result in results: - if isinstance(result, IssueState): - description_list.append(f"{result.emoji} [{result.title}]({result.url})") - elif isinstance(result, FetchError): - description_list.append(f":x: [{result.return_code}] {result.message}") - - resp = discord.Embed( - colour=Colours.bright_green, - description="\n".join(description_list) - ) - - embed_url = f"https://github.com/{user}/{repository}" if repository else f"https://github.com/{user}" - resp.set_author(name="GitHub", url=embed_url) - return resp - - @whitelist_override(channels=WHITELISTED_CHANNELS, categories=WHITELISTED_CATEGORIES) - @commands.command(aliases=("issues", "pr", "prs")) - async def issue( - self, - ctx: commands.Context, - numbers: commands.Greedy[int], - repository: str = "sir-lancebot", - user: str = "python-discord" - ) -> None: - """Command to retrieve issue(s) from a GitHub repository.""" - # Remove duplicates - numbers = set(numbers) - - err_message = None - if not numbers: - err_message = "You must have at least one issue/PR!" - - elif len(numbers) > MAXIMUM_ISSUES: - err_message = f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})" - - # If there's an error with command invocation then send an error embed - if err_message is not None: - err_embed = discord.Embed( - title=random.choice(ERROR_REPLIES), - color=Colours.soft_red, - description=err_message - ) - await ctx.send(embed=err_embed) - await invoke_help_command(ctx) - return - - results = [await self.fetch_issues(number, repository, user) for number in numbers] - await ctx.send(embed=self.format_embed(results, user, repository)) - - @commands.Cog.listener() - async def on_message(self, message: discord.Message) -> None: - """ - Automatic issue linking. - - Listener to retrieve issue(s) from a GitHub repository using automatic linking if matching <org>/<repo>#<issue>. - """ - # Ignore bots - if message.author.bot: - return - - issues = [ - FoundIssue(*match.group("org", "repo", "number")) - for match in AUTOMATIC_REGEX.finditer(self.remove_codeblocks(message.content)) - ] - links = [] - - if issues: - # Block this from working in DMs - if not message.guild: - await message.channel.send( - embed=discord.Embed( - title=random.choice(NEGATIVE_REPLIES), - description=( - "You can't retrieve issues from DMs. " - f"Try again in <#{Channels.community_bot_commands}>" - ), - colour=Colours.soft_red - ) - ) - return - - log.trace(f"Found {issues = }") - # Remove duplicates - issues = set(issues) - - if len(issues) > MAXIMUM_ISSUES: - embed = discord.Embed( - title=random.choice(ERROR_REPLIES), - color=Colours.soft_red, - description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})" - ) - await message.channel.send(embed=embed, delete_after=5) - return - - for repo_issue in issues: - result = await self.fetch_issues( - int(repo_issue.number), - repo_issue.repository, - repo_issue.organisation or "python-discord" - ) - if isinstance(result, IssueState): - links.append(result) - - if not links: - return - - resp = self.format_embed(links, "python-discord") - await message.channel.send(embed=resp) - - -def setup(bot: Bot) -> None: - """Load the Issues cog.""" - bot.add_cog(Issues(bot)) diff --git a/bot/exts/utilities/realpython.py b/bot/exts/utilities/realpython.py index bf8f1341..5e9757d0 100644 --- a/bot/exts/utilities/realpython.py +++ b/bot/exts/utilities/realpython.py @@ -11,11 +11,10 @@ from bot.constants import Colours logger = logging.getLogger(__name__) - API_ROOT = "https://realpython.com/search/api/v1/" ARTICLE_URL = "https://realpython.com{article_url}" SEARCH_URL = "https://realpython.com/search?q={user_search}" - +HOME_URL = "https://realpython.com/" ERROR_EMBED = Embed( title="Error while searching Real Python", @@ -32,13 +31,22 @@ class RealPython(commands.Cog): @commands.command(aliases=["rp"]) @commands.cooldown(1, 10, commands.cooldowns.BucketType.user) - async def realpython(self, ctx: commands.Context, amount: Optional[int] = 5, *, user_search: str) -> None: + async def realpython(self, ctx: commands.Context, amount: Optional[int] = 5, *, + user_search: Optional[str] = None) -> None: """ Send some articles from RealPython that match the search terms. - By default the top 5 matches are sent, this can be overwritten to + By default, the top 5 matches are sent. This can be overwritten to a number between 1 and 5 by specifying an amount before the search query. + If no search query is specified by the user, the home page is sent. """ + if user_search is None: + home_page_embed = Embed(title="Real Python Home Page", url=HOME_URL, colour=Colours.orange) + + await ctx.send(embed=home_page_embed) + + return + if not 1 <= amount <= 5: await ctx.send("`amount` must be between 1 and 5 (inclusive).") return diff --git a/bot/exts/utilities/twemoji.py b/bot/exts/utilities/twemoji.py new file mode 100644 index 00000000..c915f05b --- /dev/null +++ b/bot/exts/utilities/twemoji.py @@ -0,0 +1,150 @@ +import logging +import re +from typing import Literal, Optional + +import discord +from discord.ext import commands +from emoji import UNICODE_EMOJI_ENGLISH, is_emoji + +from bot.bot import Bot +from bot.constants import Colours, Roles +from bot.utils.decorators import whitelist_override +from bot.utils.extensions import invoke_help_command + +log = logging.getLogger(__name__) +BASE_URLS = { + "png": "https://raw.githubusercontent.com/twitter/twemoji/master/assets/72x72/", + "svg": "https://raw.githubusercontent.com/twitter/twemoji/master/assets/svg/", +} +CODEPOINT_REGEX = re.compile(r"[a-f1-9][a-f0-9]{3,5}$") + + +class Twemoji(commands.Cog): + """Utilities for working with Twemojis.""" + + def __init__(self, bot: Bot): + self.bot = bot + + @staticmethod + def get_url(codepoint: str, format: Literal["png", "svg"]) -> str: + """Returns a source file URL for the specified Twemoji, in the corresponding format.""" + return f"{BASE_URLS[format]}{codepoint}.{format}" + + @staticmethod + def alias_to_name(alias: str) -> str: + """ + Transform a unicode alias to an emoji name. + + Example usages: + >>> alias_to_name(":falling_leaf:") + "Falling leaf" + >>> alias_to_name(":family_man_girl_boy:") + "Family man girl boy" + """ + name = alias.strip(":").replace("_", " ") + return name.capitalize() + + @staticmethod + def build_embed(codepoint: str) -> discord.Embed: + """Returns the main embed for the `twemoji` commmand.""" + emoji = "".join(Twemoji.emoji(e) or "" for e in codepoint.split("-")) + + embed = discord.Embed( + title=Twemoji.alias_to_name(UNICODE_EMOJI_ENGLISH[emoji]), + description=f"{codepoint.replace('-', ' ')}\n[Download svg]({Twemoji.get_url(codepoint, 'svg')})", + colour=Colours.twitter_blue, + ) + embed.set_thumbnail(url=Twemoji.get_url(codepoint, "png")) + return embed + + @staticmethod + def emoji(codepoint: Optional[str]) -> Optional[str]: + """ + Returns the emoji corresponding to a given `codepoint`, or `None` if no emoji was found. + + The return value is an emoji character, such as "π". The `codepoint` + argument can be of any format, since it will be trimmed automatically. + """ + if code := Twemoji.trim_code(codepoint): + return chr(int(code, 16)) + + @staticmethod + def codepoint(emoji: Optional[str]) -> Optional[str]: + """ + Returns the codepoint, in a trimmed format, of a single emoji. + + `emoji` should be an emoji character, such as "π" and "π₯°", and + not a codepoint like "1f1f8". When working with combined emojis, + such as "πΈπͺ" and "π¨βπ©βπ¦", send the component emojis through the method + one at a time. + """ + if emoji is None: + return None + return hex(ord(emoji)).removeprefix("0x") + + @staticmethod + def trim_code(codepoint: Optional[str]) -> Optional[str]: + """ + Returns the meaningful information from the given `codepoint`. + + If no codepoint is found, `None` is returned. + + Example usages: + >>> trim_code("U+1f1f8") + "1f1f8" + >>> trim_code("\u0001f1f8") + "1f1f8" + >>> trim_code("1f466") + "1f466" + """ + if code := CODEPOINT_REGEX.search(codepoint or ""): + return code.group() + + @staticmethod + def codepoint_from_input(raw_emoji: tuple[str, ...]) -> str: + """ + Returns the codepoint corresponding to the passed tuple, separated by "-". + + The return format matches the format used in URLs for Twemoji source files. + + Example usages: + >>> codepoint_from_input(("π",)) + "1f40d" + >>> codepoint_from_input(("1f1f8", "1f1ea")) + "1f1f8-1f1ea" + >>> codepoint_from_input(("π¨βπ§βπ¦",)) + "1f468-200d-1f467-200d-1f466" + """ + raw_emoji = [emoji.lower() for emoji in raw_emoji] + if is_emoji(raw_emoji[0]): + emojis = (Twemoji.codepoint(emoji) or "" for emoji in raw_emoji[0]) + return "-".join(emojis) + + emoji = "".join( + Twemoji.emoji(Twemoji.trim_code(code)) or "" for code in raw_emoji + ) + if is_emoji(emoji): + return "-".join(Twemoji.codepoint(e) or "" for e in emoji) + + raise ValueError("No codepoint could be obtained from the given input") + + @commands.command(aliases=("tw",)) + @whitelist_override(roles=(Roles.everyone,)) + async def twemoji(self, ctx: commands.Context, *raw_emoji: str) -> None: + """Sends a preview of a given Twemoji, specified by codepoint or emoji.""" + if len(raw_emoji) == 0: + await invoke_help_command(ctx) + return + try: + codepoint = self.codepoint_from_input(raw_emoji) + except ValueError: + raise commands.BadArgument( + "please include a valid emoji or emoji codepoint." + ) + + await ctx.send(embed=self.build_embed(codepoint)) + + +def setup(bot: Bot) -> None: + """Load the Twemoji cog.""" + bot.add_cog(Twemoji(bot)) @@ -6,7 +6,7 @@ from pathlib import Path import coloredlogs -from bot.constants import Client +from bot.constants import Logging def setup() -> None: @@ -20,7 +20,7 @@ def setup() -> None: log_format = logging.Formatter(format_string) root_logger = logging.getLogger() - if Client.file_logs: + if Logging.file_logs: # Set up file logging log_file = Path("logs/sir-lancebot.log") log_file.parent.mkdir(exist_ok=True) @@ -45,7 +45,7 @@ def setup() -> None: coloredlogs.install(level=logging.TRACE, stream=sys.stdout) - root_logger.setLevel(logging.DEBUG if Client.debug else logging.INFO) + root_logger.setLevel(logging.DEBUG if Logging.debug else logging.INFO) # Silence irrelevant loggers logging.getLogger("discord").setLevel(logging.ERROR) logging.getLogger("websockets").setLevel(logging.ERROR) @@ -81,7 +81,7 @@ def _set_trace_loggers() -> None: Otherwise if the env var begins with a "*", the root logger is set to the trace level and other contents are ignored. """ - level_filter = Client.trace_loggers + level_filter = Logging.trace_loggers if level_filter: if level_filter.startswith("*"): logging.getLogger().setLevel(logging.TRACE) diff --git a/bot/resources/fun/latex_template.txt b/bot/resources/fun/latex_template.txt new file mode 100644 index 00000000..6e67b810 --- /dev/null +++ b/bot/resources/fun/latex_template.txt @@ -0,0 +1,13 @@ +\documentclass{article} +\usepackage{amsmath,amsthm,amssymb,amsfonts} +\usepackage{bm} % nice bold symbols for matrices and vectors +\usepackage{bbm} % bold and calligraphic numbers +\usepackage[binary-units=true]{siunitx} % SI unit handling +\usepackage{tikz} % from here on, to make nice diagrams with tikz +\usepackage{ifthen} +\usetikzlibrary{patterns} +\usetikzlibrary{shapes, arrows, chains, fit, positioning, calc, decorations.pathreplacing} +\begin{document} + \pagenumbering{gobble} + $text +\end{document} diff --git a/bot/resources/utilities/py_topics.yaml b/bot/resources/utilities/py_topics.yaml index 1cd2c325..4527e8de 100644 --- a/bot/resources/utilities/py_topics.yaml +++ b/bot/resources/utilities/py_topics.yaml @@ -35,12 +35,31 @@ - Have you ever worked with a microcontroller or anything physical with Python before? - Have you ever tried making your own programming language? - Has a recently discovered Python module changed your general use of Python? + - What is your motivation for programming? + - What's your favorite Python related book? + - What's your favorite use of recursion in Python? + - If you could change one thing in Python, what would it be? + - What third-party library do you wish was in the Python standard library? + - Which package do you use the most and why? + - Which Python feature do you love the most? + - Do you have any plans for future projects? + - What modules/libraries do you want to see more projects using? + - What's the most ambitious thing you've done with Python so far? + +# programming-pedagogy +934931964509691966: + - What is the best way to teach/learn OOP? + - What benefits are there to teaching programming to students who aren't training to become developers? + - What are some basic concepts that we need to know before teaching programming to others? + - What are the most common difficulties/misconceptions students encounter while learning to program? + - What makes a project a good learning experience for beginners? + - What can make difficult concepts more fun for students to learn? # algos-and-data-structs 650401909852864553: - -# async +# async-and-concurrency 630504881542791169: - Are there any frameworks you wish were async? - How have coroutines changed the way you write Python? @@ -54,12 +73,13 @@ 342318764227821568: - Where do you get your best data? - What is your preferred database and for what use? + - What is the least safe use of databases you've seen? -# data-science +# data-science-and-ai 366673247892275221: - -# discord.py +# discord-bots 343944376055103488: - What unique features does your bot contain, if any? - What commands/features are you proud of making? @@ -78,6 +98,8 @@ - What's a common part of programming we can make harder? - What are the pros and cons of messing with __magic__()? - What's your favorite Python hack? + - What's the weirdest language feature that Python doesn't have, and how can we change that? + - What is the most esoteric code you've written? # game-development 660625198390837248: @@ -87,6 +109,17 @@ - What books or tutorials would you recommend for game-development beginners? - What made you start developing games? +# media-processing +971142229462777926: + - Where do you start with media processing? What is a good beginner project for first-timers in media processing? + - What are some ways you could manipulate media using Python? + - What is your favorite algorithm for manipulating media with Python? + - What is the most surprising result you have gotten after manipulating media with Python? + - What is the worst outcome you have gotten after manipulating media with Python? + - What is your most advanced media processing related achievement? + - Do you know any cool tricks or optimizations for manipulating media with Python? + - Can a computer truly generate and/or understand art? + # microcontrollers 545603026732318730: - What is your favorite version of the Raspberry Pi? @@ -110,6 +143,10 @@ - How often do you use GitHub Actions and workflows to automate your repositories? - What's your favorite app on GitHub? +# type-hinting +891788761371906108: + - + # unit-testing 463035728335732738: - @@ -120,6 +157,7 @@ - What's your most used Bash command? - How often do you update your Unix machine? - How often do you upgrade on production? + - What is your least favorite thing about interoperability amongst *NIX operating systems and/or platforms? # user-interfaces 338993628049571840: @@ -128,6 +166,7 @@ - Do you perfer Command Line Interfaces (CLI) or Graphic User Interfaces (GUI)? - What's your favorite CLI (Command Line Interface) or TUI (Terminal Line Interface)? - What's your best GUI project? + - What the best-looking app you've used? # web-development 366673702533988363: @@ -136,3 +175,4 @@ - What is your favorite API library? - What do you use for your frontend? - What does your stack look like? + - What's the best-looking website you've visited? diff --git a/bot/resources/utilities/starter.yaml b/bot/resources/utilities/starter.yaml index 6b0de0ef..ce759e1a 100644 --- a/bot/resources/utilities/starter.yaml +++ b/bot/resources/utilities/starter.yaml @@ -32,8 +32,6 @@ - How many years have you spent coding? - What book do you highly recommend everyone to read? - What websites do you use daily to keep yourself up to date with the industry? -- What made you want to join this Discord server? -- How are you? - What is the best advice you have ever gotten in regards to programming/software? - What is the most satisfying thing you've done in your life? - Who is your favorite music composer/producer/singer? @@ -49,3 +47,4 @@ - What artistic talents do you have? - What is the tallest building you've entered? - What is the oldest computer you've ever used? +- What animals do you like? diff --git a/bot/utils/checks.py b/bot/utils/checks.py index 8c426ed7..5433f436 100644 --- a/bot/utils/checks.py +++ b/bot/utils/checks.py @@ -33,7 +33,7 @@ def in_whitelist_check( channels: Container[int] = (), categories: Container[int] = (), roles: Container[int] = (), - redirect: Optional[int] = constants.Channels.community_bot_commands, + redirect: Optional[int] = constants.Channels.sir_lancebot_playground, fail_silently: bool = False, ) -> bool: """ diff --git a/bot/utils/commands.py b/bot/utils/commands.py new file mode 100644 index 00000000..7c04a25a --- /dev/null +++ b/bot/utils/commands.py @@ -0,0 +1,11 @@ +from typing import Optional + +from rapidfuzz import process + + +def get_command_suggestions( + all_commands: list[str], query: str, *, cutoff: int = 60, limit: int = 3 +) -> Optional[list]: + """Get similar command names.""" + results = process.extract(query, all_commands, score_cutoff=cutoff, limit=limit) + return [result[0] for result in results] diff --git a/bot/utils/decorators.py b/bot/utils/decorators.py index 0061abd9..442eb841 100644 --- a/bot/utils/decorators.py +++ b/bot/utils/decorators.py @@ -272,10 +272,10 @@ def whitelist_check(**default_kwargs: Container[int]) -> Callable[[Context], boo channels = set(kwargs.get("channels") or {}) categories = kwargs.get("categories") - # Only output override channels + community_bot_commands + # Only output override channels + sir_lancebot_playground if channels: default_whitelist_channels = set(WHITELISTED_CHANNELS) - default_whitelist_channels.discard(Channels.community_bot_commands) + default_whitelist_channels.discard(Channels.sir_lancebot_playground) channels.difference_update(default_whitelist_channels) # Add all whitelisted category channels, but skip if we're in DMs diff --git a/bot/utils/messages.py b/bot/utils/messages.py index a6c035f9..b0c95583 100644 --- a/bot/utils/messages.py +++ b/bot/utils/messages.py @@ -1,5 +1,12 @@ +import logging import re -from typing import Optional +from typing import Callable, Optional, Union + +from discord import Embed, Message +from discord.ext import commands +from discord.ext.commands import Context, MessageConverter + +log = logging.getLogger(__name__) def sub_clyde(username: Optional[str]) -> Optional[str]: @@ -17,3 +24,66 @@ def sub_clyde(username: Optional[str]) -> Optional[str]: return re.sub(r"(clyd)(e)", replace_e, username, flags=re.I) else: return username # Empty string or None + + +async def get_discord_message(ctx: Context, text: str) -> Union[Message, str]: + """ + Attempts to convert a given `text` to a discord Message object and return it. + + Conversion will succeed if given a discord Message ID or link. + Returns `text` if the conversion fails. + """ + try: + text = await MessageConverter().convert(ctx, text) + except commands.BadArgument: + pass + + return text + + +async def get_text_and_embed(ctx: Context, text: str) -> tuple[str, Optional[Embed]]: + """ + Attempts to extract the text and embed from a possible link to a discord Message. + + Does not retrieve the text and embed from the Message if it is in a channel the user does + not have read permissions in. + + Returns a tuple of: + str: If `text` is a valid discord Message, the contents of the message, else `text`. + Optional[Embed]: The embed if found in the valid Message, else None + """ + embed: Optional[Embed] = None + + msg = await get_discord_message(ctx, text) + # Ensure the user has read permissions for the channel the message is in + if isinstance(msg, Message): + permissions = msg.channel.permissions_for(ctx.author) + if permissions.read_messages: + text = msg.clean_content + # Take first embed because we can't send multiple embeds + if msg.embeds: + embed = msg.embeds[0] + + return text, embed + + +def convert_embed(func: Callable[[str, ], str], embed: Embed) -> Embed: + """ + Converts the text in an embed using a given conversion function, then return the embed. + + Only modifies the following fields: title, description, footer, fields + """ + embed_dict = embed.to_dict() + + embed_dict["title"] = func(embed_dict.get("title", "")) + embed_dict["description"] = func(embed_dict.get("description", "")) + + if "footer" in embed_dict: + embed_dict["footer"]["text"] = func(embed_dict["footer"].get("text", "")) + + if "fields" in embed_dict: + for field in embed_dict["fields"]: + field["name"] = func(field.get("name", "")) + field["value"] = func(field.get("value", "")) + + return Embed.from_dict(embed_dict) |