aboutsummaryrefslogtreecommitdiffstats
path: root/bot
diff options
context:
space:
mode:
Diffstat (limited to 'bot')
-rw-r--r--bot/__main__.py6
-rw-r--r--bot/constants.py28
-rw-r--r--bot/exts/core/error_handler.py46
-rw-r--r--bot/exts/core/help.py49
-rw-r--r--bot/exts/core/internal_eval/_internal_eval.py5
-rw-r--r--bot/exts/core/source.py4
-rw-r--r--bot/exts/events/advent_of_code/_cog.py7
-rw-r--r--bot/exts/events/advent_of_code/_helpers.py15
-rw-r--r--bot/exts/events/advent_of_code/views/dayandstarview.py8
-rw-r--r--bot/exts/events/trivianight/__init__.py0
-rw-r--r--bot/exts/events/trivianight/_game.py192
-rw-r--r--bot/exts/events/trivianight/_questions.py179
-rw-r--r--bot/exts/events/trivianight/_scoreboard.py186
-rw-r--r--bot/exts/events/trivianight/trivianight.py328
-rw-r--r--bot/exts/fun/battleship.py4
-rw-r--r--bot/exts/fun/connect_four.py5
-rw-r--r--bot/exts/fun/fun.py116
-rw-r--r--bot/exts/fun/latex.py138
-rw-r--r--bot/exts/fun/uwu.py204
-rw-r--r--bot/exts/holidays/easter/egg_facts.py2
-rw-r--r--bot/exts/holidays/halloween/candy_collection.py6
-rw-r--r--bot/exts/holidays/halloween/spookynamerate.py8
-rw-r--r--bot/exts/holidays/pride/pride_facts.py2
-rw-r--r--bot/exts/holidays/pride/pride_leader.py2
-rw-r--r--bot/exts/holidays/valentines/be_my_valentine.py2
-rw-r--r--bot/exts/holidays/valentines/lovecalculator.py8
-rw-r--r--bot/exts/utilities/bookmark.py123
-rw-r--r--bot/exts/utilities/epoch.py7
-rw-r--r--bot/exts/utilities/githubinfo.py218
-rw-r--r--bot/exts/utilities/issues.py277
-rw-r--r--bot/exts/utilities/realpython.py16
-rw-r--r--bot/exts/utilities/twemoji.py150
-rw-r--r--bot/log.py8
-rw-r--r--bot/resources/fun/latex_template.txt13
-rw-r--r--bot/resources/utilities/py_topics.yaml46
-rw-r--r--bot/resources/utilities/starter.yaml3
-rw-r--r--bot/utils/checks.py2
-rw-r--r--bot/utils/commands.py11
-rw-r--r--bot/utils/decorators.py4
-rw-r--r--bot/utils/messages.py72
40 files changed, 1962 insertions, 538 deletions
diff --git a/bot/__main__.py b/bot/__main__.py
index 6889fe2b..bd6c70ee 100644
--- a/bot/__main__.py
+++ b/bot/__main__.py
@@ -12,4 +12,8 @@ bot.add_check(whitelist_check(channels=WHITELISTED_CHANNELS, roles=STAFF_ROLES))
for ext in walk_extensions():
bot.load_extension(ext)
-bot.run(Client.token)
+if not Client.in_ci:
+ # Manually enable the message content intent. This is required until the below PR is merged
+ # https://github.com/python-discord/sir-lancebot/pull/1092
+ bot._connection._intents.value += 1 << 15
+ bot.run(Client.token)
diff --git a/bot/constants.py b/bot/constants.py
index 3b426c47..eb9ee4b3 100644
--- a/bot/constants.py
+++ b/bot/constants.py
@@ -12,6 +12,7 @@ __all__ = (
"Channels",
"Categories",
"Client",
+ "Logging",
"Colours",
"Emojis",
"Icons",
@@ -23,7 +24,7 @@ __all__ = (
"Reddit",
"RedisConfig",
"RedirectOutput",
- "PYTHON_PREFIX"
+ "PYTHON_PREFIX",
"MODERATION_ROLES",
"STAFF_ROLES",
"WHITELISTED_CHANNELS",
@@ -37,6 +38,7 @@ log = logging.getLogger(__name__)
PYTHON_PREFIX = "!"
+
@dataclasses.dataclass
class AdventOfCodeLeaderboard:
id: str
@@ -53,7 +55,7 @@ class AdventOfCodeLeaderboard:
def session(self) -> str:
"""Return either the actual `session` cookie or the fallback cookie."""
if self.use_fallback_session:
- log.info(f"Returning fallback cookie for board `{self.id}`.")
+ log.trace(f"Returning fallback cookie for board `{self.id}`.")
return AdventOfCode.fallback_session
return self._session
@@ -107,8 +109,11 @@ class Cats:
class Channels(NamedTuple):
advent_of_code = int(environ.get("AOC_CHANNEL_ID", 897932085766004786))
advent_of_code_commands = int(environ.get("AOC_COMMANDS_CHANNEL_ID", 897932607545823342))
- bot = 267659945086812160
+ algos_and_data_structs = 650401909852864553
+ bot_commands = 267659945086812160
+ community_meta = 267659945086812160
organisation = 551789653284356126
+ data_science_and_ai = 366673247892275221
devlog = int(environ.get("CHANNEL_DEVLOG", 622895325144940554))
dev_contrib = 635950537262759947
mod_meta = 775412552795947058
@@ -116,7 +121,7 @@ class Channels(NamedTuple):
off_topic_0 = 291284109232308226
off_topic_1 = 463035241142026251
off_topic_2 = 463035268514185226
- community_bot_commands = int(environ.get("CHANNEL_COMMUNITY_BOT_COMMANDS", 607247579608121354))
+ sir_lancebot_playground = int(environ.get("CHANNEL_COMMUNITY_BOT_COMMANDS", 607247579608121354))
voice_chat_0 = 412357430186344448
voice_chat_1 = 799647045886541885
staff_voice = 541638762007101470
@@ -130,23 +135,31 @@ class Categories(NamedTuple):
media = 799054581991997460
staff = 364918151625965579
+
codejam_categories_name = "Code Jam" # Name of the codejam team categories
+
class Client(NamedTuple):
name = "Sir Lancebot"
guild = int(environ.get("BOT_GUILD", 267624335836053506))
prefix = environ.get("PREFIX", ".")
token = environ.get("BOT_TOKEN")
debug = environ.get("BOT_DEBUG", "true").lower() == "true"
- file_logs = environ.get("FILE_LOGS", "false").lower() == "true"
+ in_ci = environ.get("IN_CI", "false").lower() == "true"
github_bot_repo = "https://github.com/python-discord/sir-lancebot"
# Override seasonal locks: 1 (January) to 12 (December)
month_override = int(environ["MONTH_OVERRIDE"]) if "MONTH_OVERRIDE" in environ else None
+
+
+class Logging(NamedTuple):
+ debug = Client.debug
+ file_logs = environ.get("FILE_LOGS", "false").lower() == "true"
trace_loggers = environ.get("BOT_TRACE_LOGGERS")
class Colours:
blue = 0x0279FD
+ twitter_blue = 0x1DA1F2
bright_green = 0x01D277
dark_green = 0x1F8B4C
orange = 0xE67E22
@@ -231,7 +244,6 @@ class Emojis:
status_dnd = "<:status_dnd:470326272082313216>"
status_offline = "<:status_offline:470326266537705472>"
-
stackoverflow_tag = "<:stack_tag:870926975307501570>"
stackoverflow_views = "<:stack_eye:870926992692879371>"
@@ -343,8 +355,8 @@ STAFF_ROLES = {Roles.helpers, Roles.moderation_team, Roles.admins, Roles.owners}
# Whitelisted channels
WHITELISTED_CHANNELS = (
- Channels.bot,
- Channels.community_bot_commands,
+ Channels.bot_commands,
+ Channels.sir_lancebot_playground,
Channels.off_topic_0,
Channels.off_topic_1,
Channels.off_topic_2,
diff --git a/bot/exts/core/error_handler.py b/bot/exts/core/error_handler.py
index 676a1e70..4578f734 100644
--- a/bot/exts/core/error_handler.py
+++ b/bot/exts/core/error_handler.py
@@ -1,4 +1,3 @@
-import difflib
import logging
import math
import random
@@ -11,6 +10,7 @@ from sentry_sdk import push_scope
from bot.bot import Bot
from bot.constants import Channels, Colours, ERROR_REPLIES, NEGATIVE_REPLIES, RedirectOutput
+from bot.utils.commands import get_command_suggestions
from bot.utils.decorators import InChannelCheckFailure, InMonthCheckFailure
from bot.utils.exceptions import APIError, MovedCommandError, UserNotPlayingError
@@ -98,7 +98,8 @@ class CommandErrorHandler(commands.Cog):
if isinstance(error, commands.NoPrivateMessage):
await ctx.send(
embed=self.error_embed(
- f"This command can only be used in the server. Go to <#{Channels.community_bot_commands}> instead!",
+ "This command can only be used in the server. "
+ f"Go to <#{Channels.sir_lancebot_playground}> instead!",
NEGATIVE_REPLIES
)
)
@@ -157,31 +158,32 @@ class CommandErrorHandler(commands.Cog):
async def send_command_suggestion(self, ctx: commands.Context, command_name: str) -> None:
"""Sends user similar commands if any can be found."""
- raw_commands = []
- for cmd in self.bot.walk_commands():
- if not cmd.hidden:
- raw_commands += (cmd.name, *cmd.aliases)
- if similar_command_data := difflib.get_close_matches(command_name, raw_commands, 1):
- similar_command_name = similar_command_data[0]
- similar_command = self.bot.get_command(similar_command_name)
-
- if not similar_command:
- return
-
- log_msg = "Cancelling attempt to suggest a command due to failed checks."
- try:
- if not await similar_command.can_run(ctx):
+ command_suggestions = []
+ if similar_command_names := get_command_suggestions(list(self.bot.all_commands.keys()), command_name):
+ for similar_command_name in similar_command_names:
+ similar_command = self.bot.get_command(similar_command_name)
+
+ if not similar_command:
+ continue
+
+ log_msg = "Cancelling attempt to suggest a command due to failed checks."
+ try:
+ if not await similar_command.can_run(ctx):
+ log.debug(log_msg)
+ continue
+ except commands.errors.CommandError as cmd_error:
log.debug(log_msg)
- return
- except commands.errors.CommandError as cmd_error:
- log.debug(log_msg)
- await self.on_command_error(ctx, cmd_error)
- return
+ await self.on_command_error(ctx, cmd_error)
+ continue
+
+ command_suggestions.append(similar_command_name)
misspelled_content = ctx.message.content
e = Embed()
e.set_author(name="Did you mean:", icon_url=QUESTION_MARK_ICON)
- e.description = misspelled_content.replace(command_name, similar_command_name, 1)
+ e.description = "\n".join(
+ misspelled_content.replace(command_name, cmd, 1) for cmd in command_suggestions
+ )
await ctx.send(embed=e, delete_after=RedirectOutput.delete_delay)
diff --git a/bot/exts/core/help.py b/bot/exts/core/help.py
index db3c2aa6..eb7a9762 100644
--- a/bot/exts/core/help.py
+++ b/bot/exts/core/help.py
@@ -3,16 +3,17 @@ import asyncio
import itertools
import logging
from contextlib import suppress
-from typing import NamedTuple, Union
+from typing import NamedTuple, Optional, Union
from discord import Colour, Embed, HTTPException, Message, Reaction, User
from discord.ext import commands
from discord.ext.commands import CheckFailure, Cog as DiscordCog, Command, Context
-from rapidfuzz import process
from bot import constants
from bot.bot import Bot
from bot.constants import Emojis
+from bot.utils.commands import get_command_suggestions
+from bot.utils.decorators import whitelist_override
from bot.utils.pagination import FIRST_EMOJI, LAST_EMOJI, LEFT_EMOJI, LinePaginator, RIGHT_EMOJI
DELETE_EMOJI = Emojis.trashcan
@@ -41,14 +42,18 @@ class HelpQueryNotFound(ValueError):
"""
Raised when a HelpSession Query doesn't match a command or cog.
- Contains the custom attribute of ``possible_matches``.
- Instances of this object contain a dictionary of any command(s) that were close to matching the
- query, where keys are the possible matched command names and values are the likeness match scores.
+ Params:
+ possible_matches: list of similar command names.
+ parent_command: parent command of an invalid subcommand. Only available when an invalid subcommand
+ has been passed.
"""
- def __init__(self, arg: str, possible_matches: dict = None):
+ def __init__(
+ self, arg: str, possible_matches: Optional[list[str]] = None, *, parent_command: Optional[Command] = None
+ ) -> None:
super().__init__(arg)
self.possible_matches = possible_matches
+ self.parent_command = parent_command
class HelpSession:
@@ -153,12 +158,17 @@ class HelpSession:
Will pass on possible close matches along with the `HelpQueryNotFound` exception.
"""
- # Combine command and cog names
- choices = list(self._bot.all_commands) + list(self._bot.cogs)
+ # Check if parent command is valid in case subcommand is invalid.
+ if " " in query:
+ parent, *_ = query.split()
+ parent_command = self._bot.get_command(parent)
+
+ if parent_command:
+ raise HelpQueryNotFound('Invalid Subcommand.', parent_command=parent_command)
- result = process.extract(query, choices, score_cutoff=90)
+ similar_commands = get_command_suggestions(list(self._bot.all_commands.keys()), query)
- raise HelpQueryNotFound(f'Query "{query}" not found.', dict(result))
+ raise HelpQueryNotFound(f'Query "{query}" not found.', similar_commands)
async def timeout(self, seconds: int = 30) -> None:
"""Waits for a set number of seconds, then stops the help session."""
@@ -277,7 +287,7 @@ class HelpSession:
else:
results.append(f"<{name}>")
- return f"{cmd.name} {' '.join(results)}"
+ return f"{cmd.qualified_name} {' '.join(results)}"
async def build_pages(self) -> None:
"""Builds the list of content pages to be paginated through in the help message, as a list of str."""
@@ -304,9 +314,10 @@ class HelpSession:
prefix = constants.Client.prefix
signature = self._get_command_params(self.query)
+ paginator.add_line(f"**```\n{prefix}{signature}\n```**")
+
parent = self.query.full_parent_name + " " if self.query.parent else ""
- paginator.add_line(f"**```\n{prefix}{parent}{signature}\n```**")
- aliases = [f"`{alias}`" if not parent else f"`{parent} {alias}`" for alias in self.query.aliases]
+ aliases = [f"`{alias}`" if not parent else f"`{parent}{alias}`" for alias in self.query.aliases]
aliases += [f"`{alias}`" for alias in getattr(self.query, "root_aliases", ())]
aliases = ", ".join(sorted(aliases))
if aliases:
@@ -502,18 +513,26 @@ class Help(DiscordCog):
"""Custom Embed Pagination Help feature."""
@commands.command("help")
+ @whitelist_override(allow_dm=True)
async def new_help(self, ctx: Context, *commands) -> None:
"""Shows Command Help."""
try:
await HelpSession.start(ctx, *commands)
except HelpQueryNotFound as error:
+
+ # Send help message of parent command if subcommand is invalid.
+ if cmd := error.parent_command:
+ await ctx.send(str(error))
+ await self.new_help(ctx, cmd.qualified_name)
+ return
+
embed = Embed()
embed.colour = Colour.red()
embed.title = str(error)
if error.possible_matches:
- matches = "\n".join(error.possible_matches.keys())
- embed.description = f"**Did you mean:**\n`{matches}`"
+ matches = "\n".join(error.possible_matches)
+ embed.description = f"**Did you mean:**\n{matches}"
await ctx.send(embed=embed)
diff --git a/bot/exts/core/internal_eval/_internal_eval.py b/bot/exts/core/internal_eval/_internal_eval.py
index 5b5461f0..190a15ec 100644
--- a/bot/exts/core/internal_eval/_internal_eval.py
+++ b/bot/exts/core/internal_eval/_internal_eval.py
@@ -34,6 +34,8 @@ RAW_CODE_REGEX = re.compile(
re.DOTALL # "." also matches newlines
)
+MAX_LENGTH = 99980
+
class InternalEval(commands.Cog):
"""Top secret code evaluation for admins and owners."""
@@ -85,9 +87,10 @@ class InternalEval(commands.Cog):
async def _upload_output(self, output: str) -> Optional[str]:
"""Upload `internal eval` output to our pastebin and return the url."""
+ data = self.shorten_output(output, max_length=MAX_LENGTH)
try:
async with self.bot.http_session.post(
- "https://paste.pythondiscord.com/documents", data=output, raise_for_status=True
+ "https://paste.pythondiscord.com/documents", data=data, raise_for_status=True
) as resp:
data = await resp.json()
diff --git a/bot/exts/core/source.py b/bot/exts/core/source.py
index 7572ce51..2801be0f 100644
--- a/bot/exts/core/source.py
+++ b/bot/exts/core/source.py
@@ -6,14 +6,16 @@ from discord import Embed
from discord.ext import commands
from bot.bot import Bot
-from bot.constants import Source
+from bot.constants import Channels, Source, WHITELISTED_CHANNELS
from bot.utils.converters import SourceConverter, SourceType
+from bot.utils.decorators import whitelist_override
class BotSource(commands.Cog):
"""Displays information about the bot's source code."""
@commands.command(name="source", aliases=("src",))
+ @whitelist_override(channels=WHITELISTED_CHANNELS+(Channels.community_meta, Channels.dev_contrib))
async def source_command(self, ctx: commands.Context, *, source_item: SourceConverter = None) -> None:
"""Display information and a GitHub link to the source code of a command, tag, or cog."""
if not source_item:
diff --git a/bot/exts/events/advent_of_code/_cog.py b/bot/exts/events/advent_of_code/_cog.py
index 01161f26..518841d4 100644
--- a/bot/exts/events/advent_of_code/_cog.py
+++ b/bot/exts/events/advent_of_code/_cog.py
@@ -61,7 +61,8 @@ class AdventOfCode(commands.Cog):
self.status_task.set_name("AoC Status Countdown")
self.status_task.add_done_callback(_helpers.background_task_callback)
- self.completionist_task.start()
+ # Don't start task while event isn't running
+ # self.completionist_task.start()
@tasks.loop(minutes=10.0)
async def completionist_task(self) -> None:
@@ -96,7 +97,9 @@ class AdventOfCode(commands.Cog):
# Only give the role to people who have completed all 50 stars
continue
- member_id = aoc_name_to_member_id.get(member_aoc_info["name"], None)
+ aoc_name = member_aoc_info["name"] or f"Anonymous #{member_aoc_info['id']}"
+
+ member_id = aoc_name_to_member_id.get(aoc_name)
if not member_id:
log.debug(f"Could not find member_id for {member_aoc_info['name']}, not giving role.")
continue
diff --git a/bot/exts/events/advent_of_code/_helpers.py b/bot/exts/events/advent_of_code/_helpers.py
index 807cc275..6c004901 100644
--- a/bot/exts/events/advent_of_code/_helpers.py
+++ b/bot/exts/events/advent_of_code/_helpers.py
@@ -255,7 +255,7 @@ async def _fetch_leaderboard_data() -> dict[str, Any]:
# Two attempts, one with the original session cookie and one with the fallback session
for attempt in range(1, 3):
- log.info(f"Attempting to fetch leaderboard `{leaderboard.id}` ({attempt}/2)")
+ log.debug(f"Attempting to fetch leaderboard `{leaderboard.id}` ({attempt}/2)")
cookies = {"session": leaderboard.session}
try:
raw_data = await _leaderboard_request(leaderboard_url, leaderboard.id, cookies)
@@ -332,7 +332,7 @@ async def fetch_leaderboard(invalidate_cache: bool = False, self_placement_name:
number_of_participants = len(leaderboard)
formatted_leaderboard = _format_leaderboard(leaderboard)
full_leaderboard_url = await _upload_leaderboard(formatted_leaderboard)
- leaderboard_fetched_at = datetime.datetime.utcnow().isoformat()
+ leaderboard_fetched_at = datetime.datetime.now(datetime.timezone.utc).isoformat()
cached_leaderboard = {
"placement_leaderboard": json.dumps(raw_leaderboard_data),
@@ -368,11 +368,13 @@ def get_summary_embed(leaderboard: dict) -> discord.Embed:
"""Get an embed with the current summary stats of the leaderboard."""
leaderboard_url = leaderboard["full_leaderboard_url"]
refresh_minutes = AdventOfCode.leaderboard_cache_expiry_seconds // 60
+ refreshed_unix = int(datetime.datetime.fromisoformat(leaderboard["leaderboard_fetched_at"]).timestamp())
- aoc_embed = discord.Embed(
- colour=Colours.soft_green,
- timestamp=datetime.datetime.fromisoformat(leaderboard["leaderboard_fetched_at"]),
- description=f"*The leaderboard is refreshed every {refresh_minutes} minutes.*"
+ aoc_embed = discord.Embed(colour=Colours.soft_green)
+
+ aoc_embed.description = (
+ f"The leaderboard is refreshed every {refresh_minutes} minutes.\n"
+ f"Last Updated: <t:{refreshed_unix}:t>"
)
aoc_embed.add_field(
name="Number of Participants",
@@ -386,7 +388,6 @@ def get_summary_embed(leaderboard: dict) -> discord.Embed:
inline=True,
)
aoc_embed.set_author(name="Advent of Code", url=leaderboard_url)
- aoc_embed.set_footer(text="Last Updated")
aoc_embed.set_thumbnail(url=AOC_EMBED_THUMBNAIL)
return aoc_embed
diff --git a/bot/exts/events/advent_of_code/views/dayandstarview.py b/bot/exts/events/advent_of_code/views/dayandstarview.py
index a0bfa316..5529c12b 100644
--- a/bot/exts/events/advent_of_code/views/dayandstarview.py
+++ b/bot/exts/events/advent_of_code/views/dayandstarview.py
@@ -42,7 +42,13 @@ class AoCDropdownView(discord.ui.View):
async def interaction_check(self, interaction: discord.Interaction) -> bool:
"""Global check to ensure that the interacting user is the user who invoked the command originally."""
- return interaction.user == self.original_author
+ if interaction.user != self.original_author:
+ await interaction.response.send_message(
+ ":x: You can't interact with someone else's response. Please run the command yourself!",
+ ephemeral=True
+ )
+ return False
+ return True
@discord.ui.select(
placeholder="Day",
diff --git a/bot/exts/events/trivianight/__init__.py b/bot/exts/events/trivianight/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/bot/exts/events/trivianight/__init__.py
diff --git a/bot/exts/events/trivianight/_game.py b/bot/exts/events/trivianight/_game.py
new file mode 100644
index 00000000..8b012a17
--- /dev/null
+++ b/bot/exts/events/trivianight/_game.py
@@ -0,0 +1,192 @@
+import time
+from random import randrange
+from string import ascii_uppercase
+from typing import Iterable, NamedTuple, Optional, TypedDict
+
+DEFAULT_QUESTION_POINTS = 10
+DEFAULT_QUESTION_TIME = 20
+
+
+class QuestionData(TypedDict):
+ """Representing the different 'keys' of the question taken from the JSON."""
+
+ number: str
+ description: str
+ answers: list[str]
+ correct: str
+ points: Optional[int]
+ time: Optional[int]
+
+
+class UserGuess(NamedTuple):
+ """Represents the user's guess for a question."""
+
+ answer: str
+ editable: bool
+ elapsed: float
+
+
+class QuestionClosed(RuntimeError):
+ """Exception raised when the question is not open for guesses anymore."""
+
+
+class AlreadyUpdated(RuntimeError):
+ """Exception raised when the user has already updated their guess once."""
+
+
+class AllQuestionsVisited(RuntimeError):
+ """Exception raised when all of the questions have been visited."""
+
+
+class Question:
+ """Interface for one question in a trivia night game."""
+
+ def __init__(self, data: QuestionData):
+ self._data = data
+ self._guesses: dict[int, UserGuess] = {}
+ self._started = None
+
+ # These properties are mostly proxies to the underlying data:
+
+ @property
+ def number(self) -> str:
+ """The number of the question."""
+ return self._data["number"]
+
+ @property
+ def description(self) -> str:
+ """The description of the question."""
+ return self._data["description"]
+
+ @property
+ def answers(self) -> list[tuple[str, str]]:
+ """
+ The possible answers for this answer.
+
+ This is a property that returns a list of letter, answer pairs.
+ """
+ return [(ascii_uppercase[i], q) for (i, q) in enumerate(self._data["answers"])]
+
+ @property
+ def correct(self) -> str:
+ """The correct answer for this question."""
+ return self._data["correct"]
+
+ @property
+ def max_points(self) -> int:
+ """The maximum points that can be awarded for this question."""
+ return self._data.get("points") or DEFAULT_QUESTION_POINTS
+
+ @property
+ def time(self) -> float:
+ """The time allowed to answer the question."""
+ return self._data.get("time") or DEFAULT_QUESTION_TIME
+
+ def start(self) -> float:
+ """Start the question and return the time it started."""
+ self._started = time.perf_counter()
+ return self._started
+
+ def _update_guess(self, user: int, answer: str) -> UserGuess:
+ """Update an already existing guess."""
+ if self._started is None:
+ raise QuestionClosed("Question is not open for answers.")
+
+ if self._guesses[user][1] is False:
+ raise AlreadyUpdated(f"User({user}) has already updated their guess once.")
+
+ self._guesses[user] = (answer, False, time.perf_counter() - self._started)
+ return self._guesses[user]
+
+ def guess(self, user: int, answer: str) -> UserGuess:
+ """Add a guess made by a user to the current question."""
+ if user in self._guesses:
+ return self._update_guess(user, answer)
+
+ if self._started is None:
+ raise QuestionClosed("Question is not open for answers.")
+
+ self._guesses[user] = (answer, True, time.perf_counter() - self._started)
+ return self._guesses[user]
+
+ def stop(self) -> dict[int, UserGuess]:
+ """Stop the question and return the guesses that were made."""
+ guesses = self._guesses
+
+ self._started = None
+ self._guesses = {}
+
+ return guesses
+
+
+class TriviaNightGame:
+ """Interface for managing a game of trivia night."""
+
+ def __init__(self, data: list[QuestionData]) -> None:
+ self._questions = [Question(q) for q in data]
+ # A copy of the questions to keep for `.trivianight list`
+ self._all_questions = list(self._questions)
+ self.current_question: Optional[Question] = None
+ self._points = {}
+ self._speed = {}
+
+ def __iter__(self) -> Iterable[Question]:
+ return iter(self._questions)
+
+ def next_question(self, number: str = None) -> Question:
+ """
+ Consume one random question from the trivia night game.
+
+ One question is randomly picked from the list of questions which is then removed and returned.
+ """
+ if self.current_question is not None:
+ raise RuntimeError("Cannot call next_question() when there is a current question.")
+
+ if number is not None:
+ try:
+ question = [q for q in self._all_questions if q.number == int(number)][0]
+ except IndexError:
+ raise ValueError(f"Question number {number} does not exist.")
+ elif len(self._questions) == 0:
+ raise AllQuestionsVisited("All of the questions have been visited.")
+ else:
+ question = self._questions.pop(randrange(len(self._questions)))
+
+ self.current_question = question
+ return question
+
+ def end_question(self) -> None:
+ """
+ End the current question.
+
+ This method should be called when the question has been answered, it must be called before
+ attempting to call `next_question()` again.
+ """
+ if self.current_question is None:
+ raise RuntimeError("Cannot call end_question() when there is no current question.")
+
+ self.current_question.stop()
+ self.current_question = None
+
+ def list_questions(self) -> str:
+ """
+ List all the questions.
+
+ This method should be called when `.trivianight list` is called to display the following information:
+ - Question number
+ - Question description
+ - Visited/not visited
+ """
+ question_list = []
+
+ visited = ":white_check_mark:"
+ not_visited = ":x:"
+
+ for question in self._all_questions:
+ formatted_string = (
+ f"**Q{question.number}** {not_visited if question in self._questions else visited}"
+ f"\n{question.description}\n\n"
+ )
+ question_list.append(formatted_string.rstrip())
+
+ return question_list
diff --git a/bot/exts/events/trivianight/_questions.py b/bot/exts/events/trivianight/_questions.py
new file mode 100644
index 00000000..d6beced9
--- /dev/null
+++ b/bot/exts/events/trivianight/_questions.py
@@ -0,0 +1,179 @@
+from random import choice
+from string import ascii_uppercase
+
+import discord
+from discord import Embed, Interaction
+from discord.ui import Button, View
+
+from bot.constants import Colours, NEGATIVE_REPLIES
+
+from ._game import AlreadyUpdated, Question, QuestionClosed
+from ._scoreboard import Scoreboard
+
+
+class AnswerButton(Button):
+ """Button subclass that's used to guess on a particular answer."""
+
+ def __init__(self, label: str, question: Question):
+ super().__init__(label=label, style=discord.ButtonStyle.green)
+
+ self.question = question
+
+ async def callback(self, interaction: Interaction) -> None:
+ """
+ When a user interacts with the button, this will be called.
+
+ Parameters:
+ - interaction: an instance of discord.Interaction representing the interaction between the user and the
+ button.
+ """
+ try:
+ guess = self.question.guess(interaction.user.id, self.label)
+ except AlreadyUpdated:
+ await interaction.response.send_message(
+ embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="You've already changed your answer more than once!",
+ color=Colours.soft_red
+ ),
+ ephemeral=True
+ )
+ return
+ except QuestionClosed:
+ await interaction.response.send_message(
+ embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="The question is no longer accepting guesses!",
+ color=Colours.soft_red
+ ),
+ ephemeral=True
+ )
+ return
+
+ if guess[1]:
+ await interaction.response.send_message(
+ embed=Embed(
+ title="Confirming that...",
+ description=f"You chose answer {self.label}.",
+ color=Colours.soft_green
+ ),
+ ephemeral=True
+ )
+ else:
+ # guess[1] is False and they cannot change their answer again. Which
+ # indicates that they changed it this time around.
+ await interaction.response.send_message(
+ embed=Embed(
+ title="Confirming that...",
+ description=f"You changed your answer to answer {self.label}.",
+ color=Colours.soft_green
+ ),
+ ephemeral=True
+ )
+
+
+class QuestionView(View):
+ """View for one trivia night question."""
+
+ def __init__(self, question: Question) -> None:
+ super().__init__()
+ self.question = question
+
+ for letter, _ in self.question.answers:
+ self.add_item(AnswerButton(letter, self.question))
+
+ @staticmethod
+ def unicodeify(text: str) -> str:
+ """
+ Takes `text` and adds zero-width spaces to prevent copy and pasting the question.
+
+ Parameters:
+ - text: A string that represents the question description to 'unicodeify'
+ """
+ return "".join(
+ f"{letter}\u200b" if letter not in ('\n', '\t', '`', 'p', 'y') else letter
+ for idx, letter in enumerate(text)
+ )
+
+ def create_embed(self) -> Embed:
+ """Helper function to create the embed for the current question."""
+ question_embed = Embed(
+ title=f"Question {self.question.number}",
+ description=self.unicodeify(self.question.description),
+ color=Colours.python_yellow
+ )
+
+ for label, answer in self.question.answers:
+ question_embed.add_field(name=f"Answer {label}", value=answer, inline=False)
+
+ return question_embed
+
+ def end_question(self, scoreboard: Scoreboard) -> Embed:
+ """
+ Ends the question and displays the statistics on who got the question correct, awards points, etc.
+
+ Returns:
+ An embed displaying the correct answers and the % of people that chose each answer.
+ """
+ guesses = self.question.stop()
+
+ labels = ascii_uppercase[:len(self.question.answers)]
+
+ answer_embed = Embed(
+ title=f"The correct answer for Question {self.question.number} was...",
+ description=self.question.correct
+ )
+
+ if len(guesses) != 0:
+ answers_chosen = {
+ answer_choice: len(
+ tuple(filter(lambda x: x[0] == answer_choice, guesses.values()))
+ )
+ for answer_choice in labels
+ }
+
+ answers_chosen = dict(
+ sorted(list(answers_chosen.items()), key=lambda item: item[1], reverse=True)
+ )
+
+ for answer, people_answered in answers_chosen.items():
+ is_correct_answer = dict(self.question.answers)[answer[0]] == self.question.correct
+
+ # Setting the color of answer_embed to the % of people that got it correct via the mapping
+ if is_correct_answer:
+ # Maps the % of people who got it right to a color, from a range of red to green
+ percentage_to_color = [0xFC94A1, 0xFFCCCB, 0xCDFFCC, 0xB0F5AB, 0xB0F5AB]
+ answer_embed.color = percentage_to_color[round(people_answered / len(guesses) * 100) // 25]
+
+ field_title = (
+ (":white_check_mark: " if is_correct_answer else "")
+ + f"{people_answered} players ({people_answered / len(guesses) * 100:.1f}%) chose"
+ )
+
+ # The `ord` function is used here to change the letter to its corresponding position
+ answer_embed.add_field(
+ name=field_title,
+ value=self.question.answers[ord(answer) - 65][1],
+ inline=False
+ )
+
+ # Assign points to users
+ for user_id, answer in guesses.items():
+ if dict(self.question.answers)[answer[0]] == self.question.correct:
+ scoreboard.assign_points(
+ int(user_id),
+ points=(1 - (answer[-1] / self.question.time) / 2) * self.question.max_points,
+ speed=answer[-1]
+ )
+ elif answer[-1] <= 2:
+ scoreboard.assign_points(
+ int(user_id),
+ points=-(1 - (answer[-1] / self.question.time) / 2) * self.question.max_points
+ )
+ else:
+ scoreboard.assign_points(
+ int(user_id),
+ points=0
+ )
+
+ return answer_embed
diff --git a/bot/exts/events/trivianight/_scoreboard.py b/bot/exts/events/trivianight/_scoreboard.py
new file mode 100644
index 00000000..a5a5fcac
--- /dev/null
+++ b/bot/exts/events/trivianight/_scoreboard.py
@@ -0,0 +1,186 @@
+from random import choice
+
+import discord.ui
+from discord import ButtonStyle, Embed, Interaction, Member
+from discord.ui import Button, View
+
+from bot.bot import Bot
+from bot.constants import Colours, NEGATIVE_REPLIES
+
+
+class ScoreboardView(View):
+ """View for the scoreboard."""
+
+ def __init__(self, bot: Bot):
+ super().__init__()
+ self.bot = bot
+
+ @staticmethod
+ def _int_to_ordinal(number: int) -> str:
+ """
+ Converts an integer into an ordinal number, i.e. 1 to 1st.
+
+ Parameters:
+ - number: an integer representing the number to convert to an ordinal number.
+ """
+ suffix = ["th", "st", "nd", "rd", "th"][min(number % 10, 4)]
+ if (number % 100) in {11, 12, 13}:
+ suffix = "th"
+
+ return str(number) + suffix
+
+ async def create_main_leaderboard(self) -> Embed:
+ """
+ Helper function that iterates through `self.points` to generate the main leaderboard embed.
+
+ The main leaderboard would be formatted like the following:
+ **1**. @mention of the user (# of points)
+ along with the 29 other users who made it onto the leaderboard.
+ """
+ formatted_string = ""
+
+ for current_placement, (user, points) in enumerate(self.points.items()):
+ if current_placement + 1 > 30:
+ break
+
+ user = await self.bot.fetch_user(int(user))
+ formatted_string += f"**{current_placement + 1}.** {user.mention} "
+ formatted_string += f"({points:.1f} pts)\n"
+ if (current_placement + 1) % 10 == 0:
+ formatted_string += "⎯⎯⎯⎯⎯⎯⎯⎯\n"
+
+ main_embed = Embed(
+ title="Winners of the Trivia Night",
+ description=formatted_string,
+ color=Colours.python_blue,
+ )
+
+ return main_embed
+
+ async def _create_speed_embed(self) -> Embed:
+ """
+ Helper function that iterates through `self.speed` to generate a leaderboard embed.
+
+ The speed leaderboard would be formatted like the following:
+ **1**. @mention of the user ([average speed as a float with the precision of one decimal point]s)
+ along with the 29 other users who made it onto the leaderboard.
+ """
+ formatted_string = ""
+
+ for current_placement, (user, time_taken) in enumerate(self.speed.items()):
+ if current_placement + 1 > 30:
+ break
+
+ user = await self.bot.fetch_user(int(user))
+ formatted_string += f"**{current_placement + 1}.** {user.mention} "
+ formatted_string += f"({(time_taken[-1] / time_taken[0]):.1f}s)\n"
+ if (current_placement + 1) % 10 == 0:
+ formatted_string += "⎯⎯⎯⎯⎯⎯⎯⎯\n"
+
+ speed_embed = Embed(
+ title="Average time taken to answer a question",
+ description=formatted_string,
+ color=Colours.python_blue
+ )
+ return speed_embed
+
+ def _get_rank(self, member: Member) -> Embed:
+ """
+ Gets the member's rank for the points leaderboard and speed leaderboard.
+
+ Parameters:
+ - member: An instance of discord.Member representing the person who is trying to get their rank.
+ """
+ rank_embed = Embed(title=f"Ranks for {member.display_name}", color=Colours.python_blue)
+ # These are stored as strings so that the last digit can be determined to choose the suffix
+ try:
+ points_rank = str(list(self.points).index(member.id) + 1)
+ speed_rank = str(list(self.speed).index(member.id) + 1)
+ except ValueError:
+ return Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="It looks like you didn't participate in the Trivia Night event!",
+ color=Colours.soft_red
+ )
+
+ rank_embed.add_field(
+ name="Total Points",
+ value=(
+ f"You got {self._int_to_ordinal(int(points_rank))} place"
+ f" with {self.points[member.id]:.1f} points."
+ ),
+ inline=False
+ )
+
+ rank_embed.add_field(
+ name="Average Speed",
+ value=(
+ f"You got {self._int_to_ordinal(int(speed_rank))} place"
+ f" with a time of {(self.speed[member.id][1] / self.speed[member.id][0]):.1f} seconds."
+ ),
+ inline=False
+ )
+ return rank_embed
+
+ @discord.ui.button(label="Scoreboard for Speed", style=ButtonStyle.green)
+ async def speed_leaderboard(self, button: Button, interaction: Interaction) -> None:
+ """
+ Send an ephemeral message with the speed leaderboard embed.
+
+ Parameters:
+ - button: The discord.ui.Button instance representing the `Speed Leaderboard` button.
+ - interaction: The discord.Interaction instance containing information on the interaction between the user
+ and the button.
+ """
+ await interaction.response.send_message(embed=await self._create_speed_embed(), ephemeral=True)
+
+ @discord.ui.button(label="What's my rank?", style=ButtonStyle.blurple)
+ async def rank_button(self, button: Button, interaction: Interaction) -> None:
+ """
+ Send an ephemeral message with the user's rank for the overall points/average speed.
+
+ Parameters:
+ - button: The discord.ui.Button instance representing the `What's my rank?` button.
+ - interaction: The discord.Interaction instance containing information on the interaction between the user
+ and the button.
+ """
+ await interaction.response.send_message(embed=self._get_rank(interaction.user), ephemeral=True)
+
+
+class Scoreboard:
+ """Class for the scoreboard for the Trivia Night event."""
+
+ def __init__(self, bot: Bot):
+ self._bot = bot
+ self._points = {}
+ self._speed = {}
+
+ def assign_points(self, user_id: int, *, points: int = None, speed: float = None) -> None:
+ """
+ Assign points or deduct points to/from a certain user.
+
+ This method should be called once the question has finished and all answers have been registered.
+ """
+ if points is not None and user_id not in self._points.keys():
+ self._points[user_id] = points
+ elif points is not None:
+ self._points[user_id] += points
+
+ if speed is not None and user_id not in self._speed.keys():
+ self._speed[user_id] = [1, speed]
+ elif speed is not None:
+ self._speed[user_id] = [
+ self._speed[user_id][0] + 1, self._speed[user_id][1] + speed
+ ]
+
+ async def display(self, speed_leaderboard: bool = False) -> tuple[Embed, View]:
+ """Returns the embed of the main leaderboard along with the ScoreboardView."""
+ view = ScoreboardView(self._bot)
+
+ view.points = dict(sorted(self._points.items(), key=lambda item: item[-1], reverse=True))
+ view.speed = dict(sorted(self._speed.items(), key=lambda item: item[-1][1] / item[-1][0]))
+
+ return (
+ await view.create_main_leaderboard(),
+ view if not speed_leaderboard else await view._create_speed_embed()
+ )
diff --git a/bot/exts/events/trivianight/trivianight.py b/bot/exts/events/trivianight/trivianight.py
new file mode 100644
index 00000000..18d8327a
--- /dev/null
+++ b/bot/exts/events/trivianight/trivianight.py
@@ -0,0 +1,328 @@
+import asyncio
+from json import JSONDecodeError, loads
+from random import choice
+from typing import Optional
+
+from discord import Embed
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.constants import Colours, NEGATIVE_REPLIES, POSITIVE_REPLIES, Roles
+from bot.utils.pagination import LinePaginator
+
+from ._game import AllQuestionsVisited, TriviaNightGame
+from ._questions import QuestionView
+from ._scoreboard import Scoreboard
+
+# The ID you see below are the Events Lead role ID and the Event Runner Role ID
+TRIVIA_NIGHT_ROLES = (Roles.admins, 778361735739998228, 940911658799333408)
+
+
+class TriviaNightCog(commands.Cog):
+ """Cog for the Python Trivia Night event."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+ self.game: Optional[TriviaNightGame] = None
+ self.scoreboard: Optional[Scoreboard] = None
+ self.question_closed: asyncio.Event = None
+
+ @commands.group(aliases=["tn"], invoke_without_command=True)
+ async def trivianight(self, ctx: commands.Context) -> None:
+ """
+ The command group for the Python Discord Trivia Night.
+
+ If invoked without a subcommand (i.e. simply .trivianight), it will explain what the Trivia Night event is.
+ """
+ cog_description = Embed(
+ title="What is .trivianight?",
+ description=(
+ "This 'cog' is for the Python Discord's TriviaNight (date tentative)! Compete against other"
+ " players in a trivia about Python!"
+ ),
+ color=Colours.soft_green
+ )
+ await ctx.send(embed=cog_description)
+
+ @trivianight.command()
+ @commands.has_any_role(*TRIVIA_NIGHT_ROLES)
+ async def load(self, ctx: commands.Context, *, to_load: Optional[str]) -> None:
+ """
+ Loads a JSON file from the provided attachment or argument.
+
+ The JSON provided is formatted where it is a list of dictionaries, each dictionary containing the keys below:
+ - number: int (represents the current question #)
+ - description: str (represents the question itself)
+ - answers: list[str] (represents the different answers possible, must be a length of 4)
+ - correct: str (represents the correct answer in terms of what the correct answer is in `answers`
+ - time: Optional[int] (represents the timer for the question and how long it should run, default is 10)
+ - points: Optional[int] (represents how many points are awarded for each question, default is 10)
+
+ The load command accepts three different ways of loading in a JSON:
+ - an attachment of the JSON file
+ - a message link to the attachment/JSON
+ - reading the JSON itself via a codeblock or plain text
+ """
+ if self.game is not None:
+ await ctx.send(embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is already a trivia night running!",
+ color=Colours.soft_red
+ ))
+ return
+
+ if ctx.message.attachments:
+ json_text = (await ctx.message.attachments[0].read()).decode("utf8")
+ elif not to_load:
+ raise commands.BadArgument("You didn't attach an attachment nor link a message!")
+ elif (
+ to_load.startswith("https://discord.com/channels")
+ or to_load.startswith("https://discordapp.com/channels")
+ ):
+ channel_id, message_id = to_load.split("/")[-2:]
+ channel = await ctx.guild.fetch_channel(int(channel_id))
+ message = await channel.fetch_message(int(message_id))
+ if message.attachments:
+ json_text = (await message.attachments[0].read()).decode("utf8")
+ else:
+ json_text = message.content.replace("```", "").replace("json", "").replace("\n", "")
+ else:
+ json_text = to_load.replace("```", "").replace("json", "").replace("\n", "")
+
+ try:
+ serialized_json = loads(json_text)
+ except JSONDecodeError as error:
+ raise commands.BadArgument(f"Looks like something went wrong:\n{str(error)}")
+
+ self.game = TriviaNightGame(serialized_json)
+ self.question_closed = asyncio.Event()
+
+ success_embed = Embed(
+ title=choice(POSITIVE_REPLIES),
+ description="The JSON was loaded successfully!",
+ color=Colours.soft_green
+ )
+
+ self.scoreboard = Scoreboard(self.bot)
+
+ await ctx.send(embed=success_embed)
+
+ @trivianight.command(aliases=('next',))
+ @commands.has_any_role(*TRIVIA_NIGHT_ROLES)
+ async def question(self, ctx: commands.Context, question_number: str = None) -> None:
+ """
+ Gets a random question from the unanswered question list and lets the user(s) choose the answer.
+
+ This command will continuously count down until the time limit of the question is exhausted.
+ However, if `.trivianight stop` is invoked, the counting down is interrupted to show the final results.
+ """
+ if self.game is None:
+ await ctx.send(embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is no trivia night running!",
+ color=Colours.soft_red
+ ))
+ return
+
+ if self.game.current_question is not None:
+ error_embed = Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is already an ongoing question!",
+ color=Colours.soft_red
+ )
+ await ctx.send(embed=error_embed)
+ return
+
+ try:
+ next_question = self.game.next_question(question_number)
+ except AllQuestionsVisited:
+ error_embed = Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="All of the questions have been used.",
+ color=Colours.soft_red
+ )
+ await ctx.send(embed=error_embed)
+ return
+
+ await ctx.send("Next question in 3 seconds! Get ready...")
+ await asyncio.sleep(3)
+
+ question_view = QuestionView(next_question)
+ question_embed = question_view.create_embed()
+
+ next_question.start()
+ message = await ctx.send(embed=question_embed, view=question_view)
+
+ # Exponentially sleep less and less until the time limit is reached
+ percentage = 1
+ while True:
+ percentage *= 0.5
+ duration = next_question.time * percentage
+
+ await asyncio.wait([self.question_closed.wait()], timeout=duration)
+
+ if self.question_closed.is_set():
+ await ctx.send(embed=question_view.end_question(self.scoreboard))
+ await message.edit(embed=question_embed, view=None)
+
+ self.game.end_question()
+ self.question_closed.clear()
+ return
+
+ if int(duration) > 1:
+ # It is quite ugly to display decimals, the delay for requests to reach Discord
+ # cause sub-second accuracy to be quite pointless.
+ await ctx.send(f"{int(duration)}s remaining...")
+ else:
+ # Since each time we divide the percentage by 2 and sleep one half of the halves (then sleep a
+ # half, of that half) we must sleep both halves at the end.
+ await asyncio.wait([self.question_closed.wait()], timeout=duration)
+ if self.question_closed.is_set():
+ await ctx.send(embed=question_view.end_question(self.scoreboard))
+ await message.edit(embed=question_embed, view=None)
+
+ self.game.end_question()
+ self.question_closed.clear()
+ return
+ break
+
+ await ctx.send(embed=question_view.end_question(self.scoreboard))
+ await message.edit(embed=question_embed, view=None)
+
+ self.game.end_question()
+
+ @trivianight.command()
+ @commands.has_any_role(*TRIVIA_NIGHT_ROLES)
+ async def list(self, ctx: commands.Context) -> None:
+ """
+ Display all the questions left in the question bank.
+
+ Questions are displayed in the following format:
+ Q(number): Question description | :white_check_mark: if the question was used otherwise :x:.
+ """
+ if self.game is None:
+ await ctx.send(embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is no trivia night running!",
+ color=Colours.soft_red
+ ))
+ return
+
+ question_list = self.game.list_questions()
+
+ list_embed = Embed(title="All Trivia Night Questions")
+
+ if len(question_list) == 1:
+ list_embed.description = question_list[0]
+ await ctx.send(embed=list_embed)
+ else:
+ await LinePaginator.paginate(
+ question_list,
+ ctx,
+ list_embed
+ )
+
+ @trivianight.command()
+ @commands.has_any_role(*TRIVIA_NIGHT_ROLES)
+ async def stop(self, ctx: commands.Context) -> None:
+ """
+ End the ongoing question to show the correct question.
+
+ This command should be used if the question should be ended early or if the time limit fails
+ """
+ if self.game is None:
+ await ctx.send(embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is no trivia night running!",
+ color=Colours.soft_red
+ ))
+ return
+
+ if self.game.current_question is None:
+ error_embed = Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is no ongoing question!",
+ color=Colours.soft_red
+ )
+ await ctx.send(embed=error_embed)
+ return
+
+ self.question_closed.set()
+
+ @trivianight.command()
+ @commands.has_any_role(*TRIVIA_NIGHT_ROLES)
+ async def end(self, ctx: commands.Context) -> None:
+ """
+ Displays the scoreboard view.
+
+ The scoreboard view consists of the two scoreboards with the 30 players who got the highest points and the
+ 30 players who had the fastest average response time to a question where they got the question right.
+
+ The scoreboard view also has a button where the user can see their own rank, points and average speed if they
+ didn't make it onto the leaderboard.
+ """
+ if self.game is None:
+ await ctx.send(embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is no trivia night running!",
+ color=Colours.soft_red
+ ))
+ return
+
+ if self.game.current_question is not None:
+ error_embed = Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="You can't end the event while a question is ongoing!",
+ color=Colours.soft_red
+ )
+ await ctx.send(embed=error_embed)
+ return
+
+ scoreboard_embed, scoreboard_view = await self.scoreboard.display()
+ await ctx.send(embed=scoreboard_embed, view=scoreboard_view)
+
+ @trivianight.command()
+ @commands.has_any_role(*TRIVIA_NIGHT_ROLES)
+ async def scoreboard(self, ctx: commands.Context) -> None:
+ """
+ Displays the scoreboard.
+
+ The scoreboard consists of the two scoreboards with the 30 players who got the highest points and the
+ 30 players who had the fastest average response time to a question where they got the question right.
+ """
+ if self.game is None:
+ await ctx.send(embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is no trivia night running!",
+ color=Colours.soft_red
+ ))
+ return
+
+ if self.game.current_question is not None:
+ error_embed = Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="You can't end the event while a question is ongoing!",
+ color=Colours.soft_red
+ )
+ await ctx.send(embed=error_embed)
+ return
+
+ scoreboard_embed, speed_scoreboard = await self.scoreboard.display(speed_leaderboard=True)
+ await ctx.send(embeds=(scoreboard_embed, speed_scoreboard))
+
+ @trivianight.command()
+ @commands.has_any_role(*TRIVIA_NIGHT_ROLES)
+ async def end_game(self, ctx: commands.Context) -> None:
+ """Ends the ongoing game."""
+ self.game = None
+
+ await ctx.send(embed=Embed(
+ title=choice(POSITIVE_REPLIES),
+ description="The game has been stopped.",
+ color=Colours.soft_green
+ ))
+
+
+def setup(bot: Bot) -> None:
+ """Load the TriviaNight cog."""
+ bot.add_cog(TriviaNightCog(bot))
diff --git a/bot/exts/fun/battleship.py b/bot/exts/fun/battleship.py
index beff196f..77e38427 100644
--- a/bot/exts/fun/battleship.py
+++ b/bot/exts/fun/battleship.py
@@ -110,8 +110,8 @@ class Game:
self.gameover: bool = False
- self.turn: Optional[discord.Member] = None
- self.next: Optional[discord.Member] = None
+ self.turn: Optional[Player] = None
+ self.next: Optional[Player] = None
self.match: Optional[re.Match] = None
self.surrender: bool = False
diff --git a/bot/exts/fun/connect_four.py b/bot/exts/fun/connect_four.py
index f53695d5..1b88d065 100644
--- a/bot/exts/fun/connect_four.py
+++ b/bot/exts/fun/connect_four.py
@@ -5,6 +5,7 @@ from typing import Optional, Union
import discord
import emojis
+from discord import ClientUser, Member
from discord.ext import commands
from bot.bot import Bot
@@ -71,7 +72,9 @@ class Game:
await self.message.add_reaction(CROSS_EMOJI)
await self.message.edit(content=None, embed=embed)
- async def game_over(self, action: str, player1: discord.user, player2: discord.user) -> None:
+ async def game_over(
+ self, action: str, player1: Union[ClientUser, Member], player2: Union[ClientUser, Member]
+ ) -> None:
"""Announces to public chat."""
if action == "win":
await self.channel.send(f"Game Over! {player1.mention} won against {player2.mention}")
diff --git a/bot/exts/fun/fun.py b/bot/exts/fun/fun.py
index b148f1f3..e7337cb6 100644
--- a/bot/exts/fun/fun.py
+++ b/bot/exts/fun/fun.py
@@ -1,35 +1,21 @@
-import functools
import json
import logging
import random
from collections.abc import Iterable
from pathlib import Path
-from typing import Callable, Optional, Union
+from typing import Literal
-from discord import Embed, Message
+import pyjokes
+from discord import Embed
from discord.ext import commands
-from discord.ext.commands import BadArgument, Cog, Context, MessageConverter, clean_content
+from discord.ext.commands import BadArgument, Cog, Context, clean_content
-from bot import utils
from bot.bot import Bot
from bot.constants import Client, Colours, Emojis
-from bot.utils import helpers
+from bot.utils import helpers, messages
log = logging.getLogger(__name__)
-UWU_WORDS = {
- "fi": "fwi",
- "l": "w",
- "r": "w",
- "some": "sum",
- "th": "d",
- "thing": "fing",
- "tho": "fo",
- "you're": "yuw'we",
- "your": "yur",
- "you": "yuw",
-}
-
def caesar_cipher(text: str, offset: int) -> Iterable[str]:
"""
@@ -56,7 +42,6 @@ class Fun(Cog):
def __init__(self, bot: Bot):
self.bot = bot
-
self._caesar_cipher_embed = json.loads(Path("bot/resources/fun/caesar_info.json").read_text("UTF-8"))
@staticmethod
@@ -74,23 +59,6 @@ class Fun(Cog):
else:
raise BadArgument(f"`{Client.prefix}roll` only supports between 1 and 6 rolls.")
- @commands.command(name="uwu", aliases=("uwuwize", "uwuify",))
- async def uwu_command(self, ctx: Context, *, text: clean_content(fix_channel_mentions=True)) -> None:
- """Converts a given `text` into it's uwu equivalent."""
- conversion_func = functools.partial(
- utils.replace_many, replacements=UWU_WORDS, ignore_case=True, match_case=True
- )
- text, embed = await Fun._get_text_and_embed(ctx, text)
- # Convert embed if it exists
- if embed is not None:
- embed = Fun._convert_embed(conversion_func, embed)
- converted_text = conversion_func(text)
- converted_text = helpers.suppress_links(converted_text)
- # Don't put >>> if only embed present
- if converted_text:
- converted_text = f">>> {converted_text.lstrip('> ')}"
- await ctx.send(content=converted_text, embed=embed)
-
@commands.command(name="randomcase", aliases=("rcase", "randomcaps", "rcaps",))
async def randomcase_command(self, ctx: Context, *, text: clean_content(fix_channel_mentions=True)) -> None:
"""Randomly converts the casing of a given `text`."""
@@ -99,10 +67,10 @@ class Fun(Cog):
return "".join(
char.upper() if round(random.random()) else char.lower() for char in text
)
- text, embed = await Fun._get_text_and_embed(ctx, text)
+ text, embed = await messages.get_text_and_embed(ctx, text)
# Convert embed if it exists
if embed is not None:
- embed = Fun._convert_embed(conversion_func, embed)
+ embed = messages.convert_embed(conversion_func, embed)
converted_text = conversion_func(text)
converted_text = helpers.suppress_links(converted_text)
# Don't put >>> if only embed present
@@ -148,10 +116,10 @@ class Fun(Cog):
"""Encrypts the given string using the Caesar Cipher."""
return "".join(caesar_cipher(text, offset))
- text, embed = await Fun._get_text_and_embed(ctx, msg)
+ text, embed = await messages.get_text_and_embed(ctx, msg)
if embed is not None:
- embed = Fun._convert_embed(conversion_func, embed)
+ embed = messages.convert_embed(conversion_func, embed)
converted_text = conversion_func(text)
@@ -182,67 +150,11 @@ class Fun(Cog):
"""
await self._caesar_cipher(ctx, offset, msg, left_shift=True)
- @staticmethod
- async def _get_text_and_embed(ctx: Context, text: str) -> tuple[str, Optional[Embed]]:
- """
- Attempts to extract the text and embed from a possible link to a discord Message.
-
- Does not retrieve the text and embed from the Message if it is in a channel the user does
- not have read permissions in.
-
- Returns a tuple of:
- str: If `text` is a valid discord Message, the contents of the message, else `text`.
- Optional[Embed]: The embed if found in the valid Message, else None
- """
- embed = None
-
- msg = await Fun._get_discord_message(ctx, text)
- # Ensure the user has read permissions for the channel the message is in
- if isinstance(msg, Message):
- permissions = msg.channel.permissions_for(ctx.author)
- if permissions.read_messages:
- text = msg.clean_content
- # Take first embed because we can't send multiple embeds
- if msg.embeds:
- embed = msg.embeds[0]
-
- return (text, embed)
-
- @staticmethod
- async def _get_discord_message(ctx: Context, text: str) -> Union[Message, str]:
- """
- Attempts to convert a given `text` to a discord Message object and return it.
-
- Conversion will succeed if given a discord Message ID or link.
- Returns `text` if the conversion fails.
- """
- try:
- text = await MessageConverter().convert(ctx, text)
- except commands.BadArgument:
- log.debug(f"Input '{text:.20}...' is not a valid Discord Message")
- return text
-
- @staticmethod
- def _convert_embed(func: Callable[[str, ], str], embed: Embed) -> Embed:
- """
- Converts the text in an embed using a given conversion function, then return the embed.
-
- Only modifies the following fields: title, description, footer, fields
- """
- embed_dict = embed.to_dict()
-
- embed_dict["title"] = func(embed_dict.get("title", ""))
- embed_dict["description"] = func(embed_dict.get("description", ""))
-
- if "footer" in embed_dict:
- embed_dict["footer"]["text"] = func(embed_dict["footer"].get("text", ""))
-
- if "fields" in embed_dict:
- for field in embed_dict["fields"]:
- field["name"] = func(field.get("name", ""))
- field["value"] = func(field.get("value", ""))
-
- return Embed.from_dict(embed_dict)
+ @commands.command()
+ async def joke(self, ctx: commands.Context, category: Literal["neutral", "chuck", "all"] = "all") -> None:
+ """Retrieves a joke of the specified `category` from the pyjokes api."""
+ joke = pyjokes.get_joke(category=category)
+ await ctx.send(joke)
def setup(bot: Bot) -> None:
diff --git a/bot/exts/fun/latex.py b/bot/exts/fun/latex.py
new file mode 100644
index 00000000..aeabcd20
--- /dev/null
+++ b/bot/exts/fun/latex.py
@@ -0,0 +1,138 @@
+import hashlib
+import re
+import string
+from io import BytesIO
+from pathlib import Path
+from typing import BinaryIO, Optional
+
+import discord
+from PIL import Image
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.constants import Channels, WHITELISTED_CHANNELS
+from bot.utils.decorators import whitelist_override
+
+FORMATTED_CODE_REGEX = re.compile(
+ r"(?P<delim>(?P<block>```)|``?)" # code delimiter: 1-3 backticks; (?P=block) only matches if it's a block
+ r"(?(block)(?:(?P<lang>[a-z]+)\n)?)" # if we're in a block, match optional language (only letters plus newline)
+ r"(?:[ \t]*\n)*" # any blank (empty or tabs/spaces only) lines before the code
+ r"(?P<code>.*?)" # extract all code inside the markup
+ r"\s*" # any more whitespace before the end of the code markup
+ r"(?P=delim)", # match the exact same delimiter from the start again
+ re.DOTALL | re.IGNORECASE, # "." also matches newlines, case insensitive
+)
+
+LATEX_API_URL = "https://rtex.probablyaweb.site/api/v2"
+PASTEBIN_URL = "https://paste.pythondiscord.com"
+
+THIS_DIR = Path(__file__).parent
+CACHE_DIRECTORY = THIS_DIR / "_latex_cache"
+CACHE_DIRECTORY.mkdir(exist_ok=True)
+TEMPLATE = string.Template(Path("bot/resources/fun/latex_template.txt").read_text())
+
+PAD = 10
+
+LATEX_ALLOWED_CHANNNELS = WHITELISTED_CHANNELS + (
+ Channels.data_science_and_ai,
+ Channels.algos_and_data_structs,
+)
+
+
+def _prepare_input(text: str) -> str:
+ """Extract latex from a codeblock, if it is in one."""
+ if match := FORMATTED_CODE_REGEX.match(text):
+ return match.group("code")
+ else:
+ return text
+
+
+def _process_image(data: bytes, out_file: BinaryIO) -> None:
+ """Read `data` as an image file, and paste it on a white background."""
+ image = Image.open(BytesIO(data)).convert("RGBA")
+ width, height = image.size
+ background = Image.new("RGBA", (width + 2 * PAD, height + 2 * PAD), "WHITE")
+
+ # paste the image on the background, using the same image as the mask
+ # when an RGBA image is passed as the mask, its alpha band is used.
+ # this has the effect of skipping pasting the pixels where the image is transparent.
+ background.paste(image, (PAD, PAD), image)
+ background.save(out_file)
+
+
+class InvalidLatexError(Exception):
+ """Represents an error caused by invalid latex."""
+
+ def __init__(self, logs: Optional[str]):
+ super().__init__(logs)
+ self.logs = logs
+
+
+class Latex(commands.Cog):
+ """Renders latex."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ async def _generate_image(self, query: str, out_file: BinaryIO) -> None:
+ """Make an API request and save the generated image to cache."""
+ payload = {"code": query, "format": "png"}
+ async with self.bot.http_session.post(LATEX_API_URL, data=payload, raise_for_status=True) as response:
+ response_json = await response.json()
+ if response_json["status"] != "success":
+ raise InvalidLatexError(logs=response_json.get("log"))
+ async with self.bot.http_session.get(
+ f"{LATEX_API_URL}/{response_json['filename']}",
+ raise_for_status=True
+ ) as response:
+ _process_image(await response.read(), out_file)
+
+ async def _upload_to_pastebin(self, text: str) -> Optional[str]:
+ """Uploads `text` to the paste service, returning the url if successful."""
+ try:
+ async with self.bot.http_session.post(
+ PASTEBIN_URL + "/documents",
+ data=text,
+ raise_for_status=True
+ ) as response:
+ response_json = await response.json()
+ if "key" in response_json:
+ return f"{PASTEBIN_URL}/{response_json['key']}.txt?noredirect"
+ except Exception:
+ # 400 (Bad Request) means there are too many characters
+ pass
+
+ @commands.command()
+ @commands.max_concurrency(1, commands.BucketType.guild, wait=True)
+ @whitelist_override(channels=LATEX_ALLOWED_CHANNNELS)
+ async def latex(self, ctx: commands.Context, *, query: str) -> None:
+ """Renders the text in latex and sends the image."""
+ query = _prepare_input(query)
+
+ # the hash of the query is used as the filename in the cache.
+ query_hash = hashlib.md5(query.encode()).hexdigest()
+ image_path = CACHE_DIRECTORY / f"{query_hash}.png"
+ async with ctx.typing():
+ if not image_path.exists():
+ try:
+ with open(image_path, "wb") as out_file:
+ await self._generate_image(TEMPLATE.substitute(text=query), out_file)
+ except InvalidLatexError as err:
+ embed = discord.Embed(title="Failed to render input.")
+ if err.logs is None:
+ embed.description = "No logs available."
+ else:
+ logs_paste_url = await self._upload_to_pastebin(err.logs)
+ if logs_paste_url:
+ embed.description = f"[View Logs]({logs_paste_url})"
+ else:
+ embed.description = "Couldn't upload logs."
+ await ctx.send(embed=embed)
+ image_path.unlink()
+ return
+ await ctx.send(file=discord.File(image_path, "latex.png"))
+
+
+def setup(bot: Bot) -> None:
+ """Load the Latex Cog."""
+ bot.add_cog(Latex(bot))
diff --git a/bot/exts/fun/uwu.py b/bot/exts/fun/uwu.py
new file mode 100644
index 00000000..83497893
--- /dev/null
+++ b/bot/exts/fun/uwu.py
@@ -0,0 +1,204 @@
+import random
+import re
+import typing as t
+from dataclasses import dataclass
+from functools import partial
+
+import discord
+from discord.ext import commands
+from discord.ext.commands import Cog, Context, clean_content
+
+from bot.bot import Bot
+from bot.utils import helpers, messages
+
+WORD_REPLACE = {
+ "small": "smol",
+ "cute": "kawaii~",
+ "fluff": "floof",
+ "love": "luv",
+ "stupid": "baka",
+ "idiot": "baka",
+ "what": "nani",
+ "meow": "nya~",
+ "roar": "rawrr~",
+}
+
+EMOJIS = [
+ "rawr x3",
+ "OwO",
+ "UwU",
+ "o.O",
+ "-.-",
+ ">w<",
+ "σωσ",
+ "òωó",
+ "ʘwʘ",
+ ":3",
+ "XD",
+ "nyaa~~",
+ "mya",
+ ">_<",
+ "rawr",
+ "uwu",
+ "^^",
+ "^^;;",
+]
+
+REGEX_WORD_REPLACE = re.compile(r"(?<!w)[lr](?!w)")
+
+REGEX_PUNCTUATION = re.compile(r"[.!?\r\n\t]")
+
+REGEX_STUTTER = re.compile(r"(\s)([a-zA-Z])")
+SUBSTITUTE_STUTTER = r"\g<1>\g<2>-\g<2>"
+
+REGEX_NYA = re.compile(r"n([aeou][^aeiou])")
+SUBSTITUTE_NYA = r"ny\1"
+
+REGEX_EMOJI = re.compile(r"<(a)?:(\w+?):(\d{15,21}?)>", re.ASCII)
+
+
+@dataclass(frozen=True, eq=True)
+class Emoji:
+ """Data class for an Emoji."""
+
+ name: str
+ uid: int
+ animated: bool = False
+
+ def __str__(self):
+ anim_bit = "a" if self.animated else ""
+ return f"<{anim_bit}:{self.name}:{self.uid}>"
+
+ def can_display(self, bot: Bot) -> bool:
+ """Determines if a bot is in a server with the emoji."""
+ return bot.get_emoji(self.uid) is not None
+
+ @classmethod
+ def from_match(cls, match: tuple[str, str, str]) -> t.Optional['Emoji']:
+ """Creates an Emoji from a regex match tuple."""
+ if not match or len(match) != 3 or not match[2].isdecimal():
+ return None
+ return cls(match[1], int(match[2]), match[0] == "a")
+
+
+class Uwu(Cog):
+ """Cog for the uwu command."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ def _word_replace(self, input_string: str) -> str:
+ """Replaces words that are keys in the word replacement hash to the values specified."""
+ for word, replacement in WORD_REPLACE.items():
+ input_string = input_string.replace(word, replacement)
+ return input_string
+
+ def _char_replace(self, input_string: str) -> str:
+ """Replace certain characters with 'w'."""
+ return REGEX_WORD_REPLACE.sub("w", input_string)
+
+ def _stutter(self, strength: float, input_string: str) -> str:
+ """Adds stuttering to a string."""
+ return REGEX_STUTTER.sub(partial(self._stutter_replace, strength=strength), input_string, 0)
+
+ def _stutter_replace(self, match: re.Match, strength: float = 0.0) -> str:
+ """Replaces a single character with a stuttered character."""
+ match_string = match.group()
+ if random.random() < strength:
+ return f"{match_string}-{match_string[-1]}" # Stutter the last character
+ return match_string
+
+ def _nyaify(self, input_string: str) -> str:
+ """Nyaifies a string by adding a 'y' between an 'n' and a vowel."""
+ return REGEX_NYA.sub(SUBSTITUTE_NYA, input_string, 0)
+
+ def _emoji(self, strength: float, input_string: str) -> str:
+ """Replaces some punctuation with emoticons."""
+ return REGEX_PUNCTUATION.sub(partial(self._emoji_replace, strength=strength), input_string, 0)
+
+ def _emoji_replace(self, match: re.Match, strength: float = 0.0) -> str:
+ """Replaces a punctuation character with an emoticon."""
+ match_string = match.group()
+ if random.random() < strength:
+ return f" {random.choice(EMOJIS)} "
+ return match_string
+
+ def _ext_emoji_replace(self, input_string: str) -> str:
+ """Replaces any emoji the bot cannot send in input_text with a random emoticons."""
+ groups = REGEX_EMOJI.findall(input_string)
+ emojis = {Emoji.from_match(match) for match in groups}
+ # Replace with random emoticon if unable to display
+ emojis_map = {
+ re.escape(str(e)): random.choice(EMOJIS)
+ for e in emojis if e and not e.can_display(self.bot)
+ }
+ if emojis_map:
+ # Pattern for all emoji markdowns to be replaced
+ emojis_re = re.compile("|".join(emojis_map.keys()))
+ # Replace matches with random emoticon
+ return emojis_re.sub(
+ lambda m: emojis_map[re.escape(m.group())],
+ input_string
+ )
+ # Return original if no replacement
+ return input_string
+
+ def _uwuify(self, input_string: str, *, stutter_strength: float = 0.2, emoji_strength: float = 0.1) -> str:
+ """Takes a string and returns an uwuified version of it."""
+ input_string = input_string.lower()
+ input_string = self._word_replace(input_string)
+ input_string = self._nyaify(input_string)
+ input_string = self._char_replace(input_string)
+ input_string = self._stutter(stutter_strength, input_string)
+ input_string = self._emoji(emoji_strength, input_string)
+ input_string = self._ext_emoji_replace(input_string)
+ return input_string
+
+ @commands.command(name="uwu", aliases=("uwuwize", "uwuify",))
+ async def uwu_command(self, ctx: Context, *, text: t.Optional[str] = None) -> None:
+ """
+ Echo an uwuified version the passed text.
+
+ Example:
+ '.uwu Hello, my name is John' returns something like
+ 'hewwo, m-my name is j-john nyaa~'.
+ """
+ # If `text` isn't provided then we try to get message content of a replied message
+ text = text or getattr(ctx.message.reference, "resolved", None)
+ if isinstance(text, discord.Message):
+ embeds = text.embeds
+ text = text.content
+ else:
+ embeds = None
+
+ if text is None:
+ # If we weren't able to get the content of a replied message
+ raise commands.UserInputError("Your message must have content or you must reply to a message.")
+
+ await clean_content(fix_channel_mentions=True).convert(ctx, text)
+
+ # Grabs the text from the embed for uwuification
+ if embeds:
+ embed = messages.convert_embed(self._uwuify, embeds[0])
+ else:
+ # Parse potential message links in text
+ text, embed = await messages.get_text_and_embed(ctx, text)
+
+ # If an embed is found, grab and uwuify its text
+ if embed:
+ embed = messages.convert_embed(self._uwuify, embed)
+
+ # Adds the text harvested from an embed to be put into another quote block.
+ if text:
+ converted_text = self._uwuify(text)
+ converted_text = helpers.suppress_links(converted_text)
+ converted_text = f">>> {converted_text.lstrip('> ')}"
+ else:
+ converted_text = None
+
+ await ctx.send(content=converted_text, embed=embed)
+
+
+def setup(bot: Bot) -> None:
+ """Load the uwu cog."""
+ bot.add_cog(Uwu(bot))
diff --git a/bot/exts/holidays/easter/egg_facts.py b/bot/exts/holidays/easter/egg_facts.py
index 5f216e0d..152af6a4 100644
--- a/bot/exts/holidays/easter/egg_facts.py
+++ b/bot/exts/holidays/easter/egg_facts.py
@@ -31,7 +31,7 @@ class EasterFacts(commands.Cog):
"""A background task that sends an easter egg fact in the event channel everyday."""
await self.bot.wait_until_guild_available()
- channel = self.bot.get_channel(Channels.community_bot_commands)
+ channel = self.bot.get_channel(Channels.sir_lancebot_playground)
await channel.send(embed=self.make_embed())
@commands.command(name="eggfact", aliases=("fact",))
diff --git a/bot/exts/holidays/halloween/candy_collection.py b/bot/exts/holidays/halloween/candy_collection.py
index 729bbc97..220ba8e5 100644
--- a/bot/exts/holidays/halloween/candy_collection.py
+++ b/bot/exts/holidays/halloween/candy_collection.py
@@ -55,7 +55,7 @@ class CandyCollection(commands.Cog):
if message.author.bot:
return
# ensure it's hacktober channel
- if message.channel.id != Channels.community_bot_commands:
+ if message.channel.id != Channels.sir_lancebot_playground:
return
# do random check for skull first as it has the lower chance
@@ -77,7 +77,7 @@ class CandyCollection(commands.Cog):
return
# check to ensure it is in correct channel
- if message.channel.id != Channels.community_bot_commands:
+ if message.channel.id != Channels.sir_lancebot_playground:
return
# if its not a candy or skull, and it is one of 10 most recent messages,
@@ -139,7 +139,7 @@ class CandyCollection(commands.Cog):
@property
def hacktober_channel(self) -> discord.TextChannel:
"""Get #hacktoberbot channel from its ID."""
- return self.bot.get_channel(Channels.community_bot_commands)
+ return self.bot.get_channel(Channels.sir_lancebot_playground)
@staticmethod
async def send_spook_msg(
diff --git a/bot/exts/holidays/halloween/spookynamerate.py b/bot/exts/holidays/halloween/spookynamerate.py
index a3aa4f13..02fb71c3 100644
--- a/bot/exts/holidays/halloween/spookynamerate.py
+++ b/bot/exts/holidays/halloween/spookynamerate.py
@@ -223,7 +223,7 @@ class SpookyNameRate(Cog):
if self.first_time:
await channel.send(
"Okkey... Welcome to the **Spooky Name Rate Game**! It's a relatively simple game.\n"
- f"Everyday, a random name will be sent in <#{Channels.community_bot_commands}> "
+ f"Everyday, a random name will be sent in <#{Channels.sir_lancebot_playground}> "
"and you need to try and spookify it!\nRegister your name using "
f"`{Client.prefix}spookynamerate add spookified name`"
)
@@ -359,10 +359,10 @@ class SpookyNameRate(Cog):
"""Gets the sir-lancebot-channel after waiting until ready."""
await self.bot.wait_until_ready()
channel = self.bot.get_channel(
- Channels.community_bot_commands
- ) or await self.bot.fetch_channel(Channels.community_bot_commands)
+ Channels.sir_lancebot_playground
+ ) or await self.bot.fetch_channel(Channels.sir_lancebot_playground)
if not channel:
- logger.warning("Bot is unable to get the #seasonalbot-commands channel. Please check the channel ID.")
+ logger.warning("Bot is unable to get the #sir-lancebot-playground channel. Please check the channel ID.")
return channel
@staticmethod
diff --git a/bot/exts/holidays/pride/pride_facts.py b/bot/exts/holidays/pride/pride_facts.py
index e6ef7108..340f0b43 100644
--- a/bot/exts/holidays/pride/pride_facts.py
+++ b/bot/exts/holidays/pride/pride_facts.py
@@ -30,7 +30,7 @@ class PrideFacts(commands.Cog):
"""Background task to post the daily pride fact every day."""
await self.bot.wait_until_guild_available()
- channel = self.bot.get_channel(Channels.community_bot_commands)
+ channel = self.bot.get_channel(Channels.sir_lancebot_playground)
await self.send_select_fact(channel, datetime.utcnow())
async def send_random_fact(self, ctx: commands.Context) -> None:
diff --git a/bot/exts/holidays/pride/pride_leader.py b/bot/exts/holidays/pride/pride_leader.py
index 298c9328..adf01134 100644
--- a/bot/exts/holidays/pride/pride_leader.py
+++ b/bot/exts/holidays/pride/pride_leader.py
@@ -83,7 +83,7 @@ class PrideLeader(commands.Cog):
embed.add_field(
name="For More Information",
value=f"Do `{constants.Client.prefix}wiki {name}`"
- f" in <#{constants.Channels.community_bot_commands}>",
+ f" in <#{constants.Channels.sir_lancebot_playground}>",
inline=False
)
embed.set_thumbnail(url=pride_leader["url"])
diff --git a/bot/exts/holidays/valentines/be_my_valentine.py b/bot/exts/holidays/valentines/be_my_valentine.py
index 1572d474..cbb95157 100644
--- a/bot/exts/holidays/valentines/be_my_valentine.py
+++ b/bot/exts/holidays/valentines/be_my_valentine.py
@@ -70,7 +70,7 @@ class BeMyValentine(commands.Cog):
raise commands.UserInputError("Come on, you can't send a valentine to yourself :expressionless:")
emoji_1, emoji_2 = self.random_emoji()
- channel = self.bot.get_channel(Channels.community_bot_commands)
+ channel = self.bot.get_channel(Channels.sir_lancebot_playground)
valentine, title = self.valentine_check(valentine_type)
embed = discord.Embed(
diff --git a/bot/exts/holidays/valentines/lovecalculator.py b/bot/exts/holidays/valentines/lovecalculator.py
index a53014e5..10dea9df 100644
--- a/bot/exts/holidays/valentines/lovecalculator.py
+++ b/bot/exts/holidays/valentines/lovecalculator.py
@@ -12,7 +12,7 @@ from discord.ext import commands
from discord.ext.commands import BadArgument, Cog, clean_content
from bot.bot import Bot
-from bot.constants import Channels, Client, Lovefest, Month
+from bot.constants import Channels, Lovefest, Month, PYTHON_PREFIX
from bot.utils.decorators import in_month
log = logging.getLogger(__name__)
@@ -32,7 +32,7 @@ class LoveCalculator(Cog):
Tells you how much the two love each other.
This command requires at least one member as input, if two are given love will be calculated between
- those two users, if only one is given, the second member is asusmed to be the invoker.
+ those two users, if only one is given, the second member is assumed to be the invoker.
Members are converted from:
- User ID
- Mention
@@ -51,7 +51,7 @@ class LoveCalculator(Cog):
raise BadArgument(
"This command can only be ran against members with the lovefest role! "
"This role be can assigned by running "
- f"`{Client.prefix}lovefest sub` in <#{Channels.community_bot_commands}>."
+ f"`{PYTHON_PREFIX}subscribe` in <#{Channels.bot_commands}>."
)
if whom is None:
@@ -90,7 +90,7 @@ class LoveCalculator(Cog):
name="A letter from Dr. Love:",
value=data["text"]
)
- embed.set_footer(text=f"You can unsubscribe from lovefest by using {Client.prefix}lovefest unsub")
+ embed.set_footer(text=f"You can unsubscribe from lovefest by using {PYTHON_PREFIX}subscribe.")
await ctx.send(embed=embed)
diff --git a/bot/exts/utilities/bookmark.py b/bot/exts/utilities/bookmark.py
index b50205a0..2e3458d8 100644
--- a/bot/exts/utilities/bookmark.py
+++ b/bot/exts/utilities/bookmark.py
@@ -16,6 +16,13 @@ log = logging.getLogger(__name__)
# Number of seconds to wait for other users to bookmark the same message
TIMEOUT = 120
BOOKMARK_EMOJI = "πŸ“Œ"
+MESSAGE_NOT_FOUND_ERROR = (
+ "You must either provide a reference to a valid message, or reply to one."
+ "\n\nThe lookup strategy for a message is as follows (in order):"
+ "\n1. Lookup by '{channel ID}-{message ID}' (retrieved by shift-clicking on 'Copy ID')"
+ "\n2. Lookup by message ID (the message **must** be in the current channel)"
+ "\n3. Lookup by message URL"
+)
class Bookmark(commands.Cog):
@@ -42,51 +49,37 @@ class Bookmark(commands.Cog):
return embed
@staticmethod
- def build_error_embed(user: discord.Member) -> discord.Embed:
- """Builds an error embed for when a bookmark requester has DMs disabled."""
+ def build_error_embed(message: str) -> discord.Embed:
+ """Builds an error embed for a given message."""
return discord.Embed(
title=random.choice(ERROR_REPLIES),
- description=f"{user.mention}, please enable your DMs to receive the bookmark.",
+ description=message,
colour=Colours.soft_red
)
async def action_bookmark(
self,
channel: discord.TextChannel,
- user: discord.Member,
+ member: discord.Member,
target_message: discord.Message,
title: str
) -> None:
- """Sends the bookmark DM, or sends an error embed when a user bookmarks a message."""
+ """
+ Sends the given target_message as a bookmark to the member in DMs to the user.
+
+ Send an error embed instead if the member has DMs disabled.
+ """
+ embed = self.build_bookmark_dm(target_message, title)
try:
- embed = self.build_bookmark_dm(target_message, title)
- await user.send(embed=embed)
+ await member.send(embed=embed)
except discord.Forbidden:
- error_embed = self.build_error_embed(user)
+ error_embed = self.build_error_embed(f"{member.mention}, please enable your DMs to receive the bookmark.")
await channel.send(embed=error_embed)
else:
- log.info(f"{user} bookmarked {target_message.jump_url} with title '{title}'")
-
- @staticmethod
- async def send_reaction_embed(
- channel: discord.TextChannel,
- target_message: discord.Message
- ) -> discord.Message:
- """Sends an embed, with a reaction, so users can react to bookmark the message too."""
- message = await channel.send(
- embed=discord.Embed(
- description=(
- f"React with {BOOKMARK_EMOJI} to be sent your very own bookmark to "
- f"[this message]({target_message.jump_url})."
- ),
- colour=Colours.soft_green
- )
- )
-
- await message.add_reaction(BOOKMARK_EMOJI)
- return message
+ log.info(f"{member} bookmarked {target_message.jump_url} with title '{title}'")
- @commands.command(name="bookmark", aliases=("bm", "pin"))
+ @commands.group(name="bookmark", aliases=("bm", "pin"), invoke_without_command=True)
+ @commands.guild_only()
@whitelist_override(roles=(Roles.everyone,))
async def bookmark(
self,
@@ -95,30 +88,40 @@ class Bookmark(commands.Cog):
*,
title: str = "Bookmark"
) -> None:
- """Send the author a link to `target_message` via DMs."""
- if not target_message:
- if not ctx.message.reference:
- raise commands.UserInputError(
- "You must either provide a valid message to bookmark, or reply to one."
- "\n\nThe lookup strategy for a message is as follows (in order):"
- "\n1. Lookup by '{channel ID}-{message ID}' (retrieved by shift-clicking on 'Copy ID')"
- "\n2. Lookup by message ID (the message **must** be in the context channel)"
- "\n3. Lookup by message URL"
- )
- target_message = ctx.message.reference.resolved
+ """
+ Send the author a link to the specified message via DMs.
+
+ Members can either give a message as an argument, or reply to a message.
+
+ Bookmarks can subsequently be deleted by using the `bookmark delete` command in DMs.
+ """
+ target_message: Optional[discord.Message] = target_message or getattr(ctx.message.reference, "resolved", None)
+ if target_message is None:
+ raise commands.UserInputError(MESSAGE_NOT_FOUND_ERROR)
# Prevent users from bookmarking a message in a channel they don't have access to
permissions = target_message.channel.permissions_for(ctx.author)
if not permissions.read_messages:
log.info(f"{ctx.author} tried to bookmark a message in #{target_message.channel} but has no permissions.")
- embed = discord.Embed(
- title=random.choice(ERROR_REPLIES),
- color=Colours.soft_red,
- description="You don't have permission to view this channel."
- )
+ embed = self.build_error_embed(f"{ctx.author.mention} You don't have permission to view this channel.")
await ctx.send(embed=embed)
return
+ await self.action_bookmark(ctx.channel, ctx.author, target_message, title)
+
+ # Keep track of who has already bookmarked, so users can't spam reactions and cause loads of DMs
+ bookmarked_users = [ctx.author.id]
+
+ reaction_embed = discord.Embed(
+ description=(
+ f"React with {BOOKMARK_EMOJI} to be sent your very own bookmark to "
+ f"[this message]({ctx.message.jump_url})."
+ ),
+ colour=Colours.soft_green
+ )
+ reaction_message = await ctx.send(embed=reaction_embed)
+ await reaction_message.add_reaction(BOOKMARK_EMOJI)
+
def event_check(reaction: discord.Reaction, user: discord.Member) -> bool:
"""Make sure that this reaction is what we want to operate on."""
return (
@@ -134,11 +137,6 @@ class Bookmark(commands.Cog):
user.id != self.bot.user.id
))
)
- await self.action_bookmark(ctx.channel, ctx.author, target_message, title)
-
- # Keep track of who has already bookmarked, so users can't spam reactions and cause loads of DMs
- bookmarked_users = [ctx.author.id]
- reaction_message = await self.send_reaction_embed(ctx.channel, target_message)
while True:
try:
@@ -152,6 +150,31 @@ class Bookmark(commands.Cog):
await reaction_message.delete()
+ @bookmark.command(name="delete", aliases=("del", "rm"), root_aliases=("unbm", "unbookmark", "dmdelete", "dmdel"))
+ @whitelist_override(bypass_defaults=True, allow_dm=True)
+ async def delete_bookmark(
+ self,
+ ctx: commands.Context,
+ ) -> None:
+ """
+ Delete the Sir-Lancebot message that the command invocation is replying to.
+
+ This command allows deleting any message sent by Sir-Lancebot in the user's DM channel with the bot.
+ The command invocation must be a reply to the message that is to be deleted.
+ """
+ target_message: Optional[discord.Message] = getattr(ctx.message.reference, "resolved", None)
+ if target_message is None:
+ raise commands.UserInputError("You must reply to the message from Sir-Lancebot you wish to delete.")
+
+ if not isinstance(ctx.channel, discord.DMChannel):
+ raise commands.UserInputError("You can only run this command your own DMs!")
+ elif target_message.channel != ctx.channel:
+ raise commands.UserInputError("You can only delete messages in your own DMs!")
+ elif target_message.author != self.bot.user:
+ raise commands.UserInputError("You can only delete messages sent by Sir Lancebot!")
+
+ await target_message.delete()
+
def setup(bot: Bot) -> None:
"""Load the Bookmark cog."""
diff --git a/bot/exts/utilities/epoch.py b/bot/exts/utilities/epoch.py
index b9feed18..42312dd1 100644
--- a/bot/exts/utilities/epoch.py
+++ b/bot/exts/utilities/epoch.py
@@ -35,7 +35,7 @@ class DateString(commands.Converter):
"""
try:
return arrow.utcnow().dehumanize(argument)
- except ValueError:
+ except (ValueError, OverflowError):
try:
dt, ignored_tokens = parser.parse(argument, fuzzy_with_tokens=True)
except parser.ParserError:
@@ -86,7 +86,10 @@ class Epoch(commands.Cog):
view = TimestampMenuView(ctx, self._format_dates(date_time), epoch)
original = await ctx.send(f"`{epoch}`", view=view)
await view.wait() # wait until expiration before removing the dropdown
- await original.edit(view=None)
+ try:
+ await original.edit(view=None)
+ except discord.NotFound: # disregard the error message if the message is deleled
+ pass
@staticmethod
def _format_dates(date: arrow.Arrow) -> list[str]:
diff --git a/bot/exts/utilities/githubinfo.py b/bot/exts/utilities/githubinfo.py
index 539e388b..046f67df 100644
--- a/bot/exts/utilities/githubinfo.py
+++ b/bot/exts/utilities/githubinfo.py
@@ -1,30 +1,167 @@
import logging
import random
+import re
+import typing as t
+from dataclasses import dataclass
from datetime import datetime
-from urllib.parse import quote, quote_plus
+from urllib.parse import quote
import discord
+from aiohttp import ClientResponse
from discord.ext import commands
from bot.bot import Bot
-from bot.constants import Colours, NEGATIVE_REPLIES
+from bot.constants import Colours, ERROR_REPLIES, Emojis, NEGATIVE_REPLIES, Tokens
from bot.exts.core.extensions import invoke_help_command
log = logging.getLogger(__name__)
GITHUB_API_URL = "https://api.github.com"
+REQUEST_HEADERS = {
+ "Accept": "application/vnd.github.v3+json"
+}
+
+REPOSITORY_ENDPOINT = "https://api.github.com/orgs/{org}/repos?per_page=100&type=public"
+ISSUE_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/issues/{number}"
+PR_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/pulls/{number}"
+
+if Tokens.github:
+ REQUEST_HEADERS["Authorization"] = f"token {Tokens.github}"
+
+CODE_BLOCK_RE = re.compile(
+ r"^`([^`\n]+)`" # Inline codeblock
+ r"|```(.+?)```", # Multiline codeblock
+ re.DOTALL | re.MULTILINE
+)
+
+# Maximum number of issues in one message
+MAXIMUM_ISSUES = 5
+
+# Regex used when looking for automatic linking in messages
+# regex101 of current regex https://regex101.com/r/V2ji8M/6
+AUTOMATIC_REGEX = re.compile(
+ r"((?P<org>[a-zA-Z0-9][a-zA-Z0-9\-]{1,39})\/)?(?P<repo>[\w\-\.]{1,100})#(?P<number>[0-9]+)"
+)
+
+
+@dataclass(eq=True, frozen=True)
+class FoundIssue:
+ """Dataclass representing an issue found by the regex."""
+
+ organisation: t.Optional[str]
+ repository: str
+ number: str
+
+
+@dataclass(eq=True, frozen=True)
+class FetchError:
+ """Dataclass representing an error while fetching an issue."""
+
+ return_code: int
+ message: str
+
+
+@dataclass(eq=True, frozen=True)
+class IssueState:
+ """Dataclass representing the state of an issue."""
+
+ repository: str
+ number: int
+ url: str
+ title: str
+ emoji: str
+
class GithubInfo(commands.Cog):
- """Fetches info from GitHub."""
+ """A Cog that fetches info from GitHub."""
def __init__(self, bot: Bot):
self.bot = bot
+ self.repos = []
+
+ @staticmethod
+ def remove_codeblocks(message: str) -> str:
+ """Remove any codeblock in a message."""
+ return CODE_BLOCK_RE.sub("", message)
+
+ async def fetch_issue(
+ self,
+ number: int,
+ repository: str,
+ user: str
+ ) -> t.Union[IssueState, FetchError]:
+ """
+ Retrieve an issue from a GitHub repository.
+
+ Returns IssueState on success, FetchError on failure.
+ """
+ url = ISSUE_ENDPOINT.format(user=user, repository=repository, number=number)
+ pulls_url = PR_ENDPOINT.format(user=user, repository=repository, number=number)
+
+ json_data, r = await self.fetch_data(url)
+
+ if r.status == 403:
+ if r.headers.get("X-RateLimit-Remaining") == "0":
+ log.info(f"Ratelimit reached while fetching {url}")
+ return FetchError(403, "Ratelimit reached, please retry in a few minutes.")
+ return FetchError(403, "Cannot access issue.")
+ elif r.status in (404, 410):
+ return FetchError(r.status, "Issue not found.")
+ elif r.status != 200:
+ return FetchError(r.status, "Error while fetching issue.")
+
+ # The initial API request is made to the issues API endpoint, which will return information
+ # if the issue or PR is present. However, the scope of information returned for PRs differs
+ # from issues: if the 'issues' key is present in the response then we can pull the data we
+ # need from the initial API call.
+ if "issues" in json_data["html_url"]:
+ if json_data.get("state") == "open":
+ emoji = Emojis.issue_open
+ else:
+ emoji = Emojis.issue_closed
+
+ # If the 'issues' key is not contained in the API response and there is no error code, then
+ # we know that a PR has been requested and a call to the pulls API endpoint is necessary
+ # to get the desired information for the PR.
+ else:
+ pull_data, _ = await self.fetch_data(pulls_url)
+ if pull_data["draft"]:
+ emoji = Emojis.pull_request_draft
+ elif pull_data["state"] == "open":
+ emoji = Emojis.pull_request_open
+ # When 'merged_at' is not None, this means that the state of the PR is merged
+ elif pull_data["merged_at"] is not None:
+ emoji = Emojis.pull_request_merged
+ else:
+ emoji = Emojis.pull_request_closed
+
+ issue_url = json_data.get("html_url")
+
+ return IssueState(repository, number, issue_url, json_data.get("title", ""), emoji)
+
+ @staticmethod
+ def format_embed(
+ results: t.List[t.Union[IssueState, FetchError]]
+ ) -> discord.Embed:
+ """Take a list of IssueState or FetchError and format a Discord embed for them."""
+ description_list = []
+
+ for result in results:
+ if isinstance(result, IssueState):
+ description_list.append(
+ f"{result.emoji} [[{result.repository}] #{result.number} {result.title}]({result.url})"
+ )
+ elif isinstance(result, FetchError):
+ description_list.append(f":x: [{result.return_code}] {result.message}")
+
+ resp = discord.Embed(
+ colour=Colours.bright_green,
+ description="\n".join(description_list)
+ )
- async def fetch_data(self, url: str) -> dict:
- """Retrieve data as a dictionary."""
- async with self.bot.http_session.get(url) as r:
- return await r.json()
+ resp.set_author(name="GitHub")
+ return resp
@commands.group(name="github", aliases=("gh", "git"))
@commands.cooldown(1, 10, commands.BucketType.user)
@@ -33,11 +170,67 @@ class GithubInfo(commands.Cog):
if ctx.invoked_subcommand is None:
await invoke_help_command(ctx)
+ @commands.Cog.listener()
+ async def on_message(self, message: discord.Message) -> None:
+ """
+ Automatic issue linking.
+
+ Listener to retrieve issue(s) from a GitHub repository using automatic linking if matching <org>/<repo>#<issue>.
+ """
+ # Ignore bots
+ if message.author.bot:
+ return
+
+ issues = [
+ FoundIssue(*match.group("org", "repo", "number"))
+ for match in AUTOMATIC_REGEX.finditer(self.remove_codeblocks(message.content))
+ ]
+ links = []
+
+ if issues:
+ # Block this from working in DMs
+ if not message.guild:
+ return
+
+ log.trace(f"Found {issues = }")
+ # Remove duplicates
+ issues = list(dict.fromkeys(issues))
+
+ if len(issues) > MAXIMUM_ISSUES:
+ embed = discord.Embed(
+ title=random.choice(ERROR_REPLIES),
+ color=Colours.soft_red,
+ description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})"
+ )
+ await message.channel.send(embed=embed, delete_after=5)
+ return
+
+ for repo_issue in issues:
+ result = await self.fetch_issue(
+ int(repo_issue.number),
+ repo_issue.repository,
+ repo_issue.organisation or "python-discord"
+ )
+ if isinstance(result, IssueState):
+ links.append(result)
+
+ if not links:
+ return
+
+ resp = self.format_embed(links)
+ await message.channel.send(embed=resp)
+
+ async def fetch_data(self, url: str) -> tuple[dict[str], ClientResponse]:
+ """Retrieve data as a dictionary and the response in a tuple."""
+ log.trace(f"Querying GH issues API: {url}")
+ async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r:
+ return await r.json(), r
+
@github_group.command(name="user", aliases=("userinfo",))
async def github_user_info(self, ctx: commands.Context, username: str) -> None:
"""Fetches a user's GitHub information."""
async with ctx.typing():
- user_data = await self.fetch_data(f"{GITHUB_API_URL}/users/{quote_plus(username)}")
+ user_data, _ = await self.fetch_data(f"{GITHUB_API_URL}/users/{username}")
# User_data will not have a message key if the user exists
if "message" in user_data:
@@ -50,7 +243,7 @@ class GithubInfo(commands.Cog):
await ctx.send(embed=embed)
return
- org_data = await self.fetch_data(user_data["organizations_url"])
+ org_data, _ = await self.fetch_data(user_data["organizations_url"])
orgs = [f"[{org['login']}](https://github.com/{org['login']})" for org in org_data]
orgs_to_add = " | ".join(orgs)
@@ -91,10 +284,7 @@ class GithubInfo(commands.Cog):
)
if user_data["type"] == "User":
- embed.add_field(
- name="Gists",
- value=f"[{gists}](https://gist.github.com/{quote_plus(username, safe='')})"
- )
+ embed.add_field(name="Gists", value=f"[{gists}](https://gist.github.com/{quote(username, safe='')})")
embed.add_field(
name=f"Organization{'s' if len(orgs)!=1 else ''}",
@@ -123,7 +313,7 @@ class GithubInfo(commands.Cog):
return
async with ctx.typing():
- repo_data = await self.fetch_data(f"{GITHUB_API_URL}/repos/{quote(repo)}")
+ repo_data, _ = await self.fetch_data(f"{GITHUB_API_URL}/repos/{quote(repo)}")
# There won't be a message key if this repo exists
if "message" in repo_data:
diff --git a/bot/exts/utilities/issues.py b/bot/exts/utilities/issues.py
deleted file mode 100644
index b6d5a43e..00000000
--- a/bot/exts/utilities/issues.py
+++ /dev/null
@@ -1,277 +0,0 @@
-import logging
-import random
-import re
-from dataclasses import dataclass
-from typing import Optional, Union
-
-import discord
-from discord.ext import commands
-
-from bot.bot import Bot
-from bot.constants import (
- Categories, Channels, Colours, ERROR_REPLIES, Emojis, NEGATIVE_REPLIES, Tokens, WHITELISTED_CHANNELS
-)
-from bot.utils.decorators import whitelist_override
-from bot.utils.extensions import invoke_help_command
-
-log = logging.getLogger(__name__)
-
-BAD_RESPONSE = {
- 404: "Issue/pull request not located! Please enter a valid number!",
- 403: "Rate limit has been hit! Please try again later!"
-}
-REQUEST_HEADERS = {
- "Accept": "application/vnd.github.v3+json"
-}
-
-REPOSITORY_ENDPOINT = "https://api.github.com/orgs/{org}/repos?per_page=100&type=public"
-ISSUE_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/issues/{number}"
-PR_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/pulls/{number}"
-
-if GITHUB_TOKEN := Tokens.github:
- REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}"
-
-WHITELISTED_CATEGORIES = (
- Categories.development, Categories.devprojects, Categories.media, Categories.staff
-)
-
-CODE_BLOCK_RE = re.compile(
- r"^`([^`\n]+)`" # Inline codeblock
- r"|```(.+?)```", # Multiline codeblock
- re.DOTALL | re.MULTILINE
-)
-
-# Maximum number of issues in one message
-MAXIMUM_ISSUES = 5
-
-# Regex used when looking for automatic linking in messages
-# regex101 of current regex https://regex101.com/r/V2ji8M/6
-AUTOMATIC_REGEX = re.compile(
- r"((?P<org>[a-zA-Z0-9][a-zA-Z0-9\-]{1,39})\/)?(?P<repo>[\w\-\.]{1,100})#(?P<number>[0-9]+)"
-)
-
-
-@dataclass
-class FoundIssue:
- """Dataclass representing an issue found by the regex."""
-
- organisation: Optional[str]
- repository: str
- number: str
-
- def __hash__(self) -> int:
- return hash((self.organisation, self.repository, self.number))
-
-
-@dataclass
-class FetchError:
- """Dataclass representing an error while fetching an issue."""
-
- return_code: int
- message: str
-
-
-@dataclass
-class IssueState:
- """Dataclass representing the state of an issue."""
-
- repository: str
- number: int
- url: str
- title: str
- emoji: str
-
-
-class Issues(commands.Cog):
- """Cog that allows users to retrieve issues from GitHub."""
-
- def __init__(self, bot: Bot):
- self.bot = bot
- self.repos = []
-
- @staticmethod
- def remove_codeblocks(message: str) -> str:
- """Remove any codeblock in a message."""
- return re.sub(CODE_BLOCK_RE, "", message)
-
- async def fetch_issues(
- self,
- number: int,
- repository: str,
- user: str
- ) -> Union[IssueState, FetchError]:
- """
- Retrieve an issue from a GitHub repository.
-
- Returns IssueState on success, FetchError on failure.
- """
- url = ISSUE_ENDPOINT.format(user=user, repository=repository, number=number)
- pulls_url = PR_ENDPOINT.format(user=user, repository=repository, number=number)
- log.trace(f"Querying GH issues API: {url}")
-
- async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r:
- json_data = await r.json()
-
- if r.status == 403:
- if r.headers.get("X-RateLimit-Remaining") == "0":
- log.info(f"Ratelimit reached while fetching {url}")
- return FetchError(403, "Ratelimit reached, please retry in a few minutes.")
- return FetchError(403, "Cannot access issue.")
- elif r.status in (404, 410):
- return FetchError(r.status, "Issue not found.")
- elif r.status != 200:
- return FetchError(r.status, "Error while fetching issue.")
-
- # The initial API request is made to the issues API endpoint, which will return information
- # if the issue or PR is present. However, the scope of information returned for PRs differs
- # from issues: if the 'issues' key is present in the response then we can pull the data we
- # need from the initial API call.
- if "issues" in json_data["html_url"]:
- if json_data.get("state") == "open":
- emoji = Emojis.issue_open
- else:
- emoji = Emojis.issue_closed
-
- # If the 'issues' key is not contained in the API response and there is no error code, then
- # we know that a PR has been requested and a call to the pulls API endpoint is necessary
- # to get the desired information for the PR.
- else:
- log.trace(f"PR provided, querying GH pulls API for additional information: {pulls_url}")
- async with self.bot.http_session.get(pulls_url) as p:
- pull_data = await p.json()
- if pull_data["draft"]:
- emoji = Emojis.pull_request_draft
- elif pull_data["state"] == "open":
- emoji = Emojis.pull_request_open
- # When 'merged_at' is not None, this means that the state of the PR is merged
- elif pull_data["merged_at"] is not None:
- emoji = Emojis.pull_request_merged
- else:
- emoji = Emojis.pull_request_closed
-
- issue_url = json_data.get("html_url")
-
- return IssueState(repository, number, issue_url, json_data.get("title", ""), emoji)
-
- @staticmethod
- def format_embed(
- results: list[Union[IssueState, FetchError]],
- user: str,
- repository: Optional[str] = None
- ) -> discord.Embed:
- """Take a list of IssueState or FetchError and format a Discord embed for them."""
- description_list = []
-
- for result in results:
- if isinstance(result, IssueState):
- description_list.append(f"{result.emoji} [{result.title}]({result.url})")
- elif isinstance(result, FetchError):
- description_list.append(f":x: [{result.return_code}] {result.message}")
-
- resp = discord.Embed(
- colour=Colours.bright_green,
- description="\n".join(description_list)
- )
-
- embed_url = f"https://github.com/{user}/{repository}" if repository else f"https://github.com/{user}"
- resp.set_author(name="GitHub", url=embed_url)
- return resp
-
- @whitelist_override(channels=WHITELISTED_CHANNELS, categories=WHITELISTED_CATEGORIES)
- @commands.command(aliases=("issues", "pr", "prs"))
- async def issue(
- self,
- ctx: commands.Context,
- numbers: commands.Greedy[int],
- repository: str = "sir-lancebot",
- user: str = "python-discord"
- ) -> None:
- """Command to retrieve issue(s) from a GitHub repository."""
- # Remove duplicates
- numbers = set(numbers)
-
- err_message = None
- if not numbers:
- err_message = "You must have at least one issue/PR!"
-
- elif len(numbers) > MAXIMUM_ISSUES:
- err_message = f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})"
-
- # If there's an error with command invocation then send an error embed
- if err_message is not None:
- err_embed = discord.Embed(
- title=random.choice(ERROR_REPLIES),
- color=Colours.soft_red,
- description=err_message
- )
- await ctx.send(embed=err_embed)
- await invoke_help_command(ctx)
- return
-
- results = [await self.fetch_issues(number, repository, user) for number in numbers]
- await ctx.send(embed=self.format_embed(results, user, repository))
-
- @commands.Cog.listener()
- async def on_message(self, message: discord.Message) -> None:
- """
- Automatic issue linking.
-
- Listener to retrieve issue(s) from a GitHub repository using automatic linking if matching <org>/<repo>#<issue>.
- """
- # Ignore bots
- if message.author.bot:
- return
-
- issues = [
- FoundIssue(*match.group("org", "repo", "number"))
- for match in AUTOMATIC_REGEX.finditer(self.remove_codeblocks(message.content))
- ]
- links = []
-
- if issues:
- # Block this from working in DMs
- if not message.guild:
- await message.channel.send(
- embed=discord.Embed(
- title=random.choice(NEGATIVE_REPLIES),
- description=(
- "You can't retrieve issues from DMs. "
- f"Try again in <#{Channels.community_bot_commands}>"
- ),
- colour=Colours.soft_red
- )
- )
- return
-
- log.trace(f"Found {issues = }")
- # Remove duplicates
- issues = set(issues)
-
- if len(issues) > MAXIMUM_ISSUES:
- embed = discord.Embed(
- title=random.choice(ERROR_REPLIES),
- color=Colours.soft_red,
- description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})"
- )
- await message.channel.send(embed=embed, delete_after=5)
- return
-
- for repo_issue in issues:
- result = await self.fetch_issues(
- int(repo_issue.number),
- repo_issue.repository,
- repo_issue.organisation or "python-discord"
- )
- if isinstance(result, IssueState):
- links.append(result)
-
- if not links:
- return
-
- resp = self.format_embed(links, "python-discord")
- await message.channel.send(embed=resp)
-
-
-def setup(bot: Bot) -> None:
- """Load the Issues cog."""
- bot.add_cog(Issues(bot))
diff --git a/bot/exts/utilities/realpython.py b/bot/exts/utilities/realpython.py
index bf8f1341..5e9757d0 100644
--- a/bot/exts/utilities/realpython.py
+++ b/bot/exts/utilities/realpython.py
@@ -11,11 +11,10 @@ from bot.constants import Colours
logger = logging.getLogger(__name__)
-
API_ROOT = "https://realpython.com/search/api/v1/"
ARTICLE_URL = "https://realpython.com{article_url}"
SEARCH_URL = "https://realpython.com/search?q={user_search}"
-
+HOME_URL = "https://realpython.com/"
ERROR_EMBED = Embed(
title="Error while searching Real Python",
@@ -32,13 +31,22 @@ class RealPython(commands.Cog):
@commands.command(aliases=["rp"])
@commands.cooldown(1, 10, commands.cooldowns.BucketType.user)
- async def realpython(self, ctx: commands.Context, amount: Optional[int] = 5, *, user_search: str) -> None:
+ async def realpython(self, ctx: commands.Context, amount: Optional[int] = 5, *,
+ user_search: Optional[str] = None) -> None:
"""
Send some articles from RealPython that match the search terms.
- By default the top 5 matches are sent, this can be overwritten to
+ By default, the top 5 matches are sent. This can be overwritten to
a number between 1 and 5 by specifying an amount before the search query.
+ If no search query is specified by the user, the home page is sent.
"""
+ if user_search is None:
+ home_page_embed = Embed(title="Real Python Home Page", url=HOME_URL, colour=Colours.orange)
+
+ await ctx.send(embed=home_page_embed)
+
+ return
+
if not 1 <= amount <= 5:
await ctx.send("`amount` must be between 1 and 5 (inclusive).")
return
diff --git a/bot/exts/utilities/twemoji.py b/bot/exts/utilities/twemoji.py
new file mode 100644
index 00000000..c915f05b
--- /dev/null
+++ b/bot/exts/utilities/twemoji.py
@@ -0,0 +1,150 @@
+import logging
+import re
+from typing import Literal, Optional
+
+import discord
+from discord.ext import commands
+from emoji import UNICODE_EMOJI_ENGLISH, is_emoji
+
+from bot.bot import Bot
+from bot.constants import Colours, Roles
+from bot.utils.decorators import whitelist_override
+from bot.utils.extensions import invoke_help_command
+
+log = logging.getLogger(__name__)
+BASE_URLS = {
+ "png": "https://raw.githubusercontent.com/twitter/twemoji/master/assets/72x72/",
+ "svg": "https://raw.githubusercontent.com/twitter/twemoji/master/assets/svg/",
+}
+CODEPOINT_REGEX = re.compile(r"[a-f1-9][a-f0-9]{3,5}$")
+
+
+class Twemoji(commands.Cog):
+ """Utilities for working with Twemojis."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ @staticmethod
+ def get_url(codepoint: str, format: Literal["png", "svg"]) -> str:
+ """Returns a source file URL for the specified Twemoji, in the corresponding format."""
+ return f"{BASE_URLS[format]}{codepoint}.{format}"
+
+ @staticmethod
+ def alias_to_name(alias: str) -> str:
+ """
+ Transform a unicode alias to an emoji name.
+
+ Example usages:
+ >>> alias_to_name(":falling_leaf:")
+ "Falling leaf"
+ >>> alias_to_name(":family_man_girl_boy:")
+ "Family man girl boy"
+ """
+ name = alias.strip(":").replace("_", " ")
+ return name.capitalize()
+
+ @staticmethod
+ def build_embed(codepoint: str) -> discord.Embed:
+ """Returns the main embed for the `twemoji` commmand."""
+ emoji = "".join(Twemoji.emoji(e) or "" for e in codepoint.split("-"))
+
+ embed = discord.Embed(
+ title=Twemoji.alias_to_name(UNICODE_EMOJI_ENGLISH[emoji]),
+ description=f"{codepoint.replace('-', ' ')}\n[Download svg]({Twemoji.get_url(codepoint, 'svg')})",
+ colour=Colours.twitter_blue,
+ )
+ embed.set_thumbnail(url=Twemoji.get_url(codepoint, "png"))
+ return embed
+
+ @staticmethod
+ def emoji(codepoint: Optional[str]) -> Optional[str]:
+ """
+ Returns the emoji corresponding to a given `codepoint`, or `None` if no emoji was found.
+
+ The return value is an emoji character, such as "πŸ‚". The `codepoint`
+ argument can be of any format, since it will be trimmed automatically.
+ """
+ if code := Twemoji.trim_code(codepoint):
+ return chr(int(code, 16))
+
+ @staticmethod
+ def codepoint(emoji: Optional[str]) -> Optional[str]:
+ """
+ Returns the codepoint, in a trimmed format, of a single emoji.
+
+ `emoji` should be an emoji character, such as "🐍" and "πŸ₯°", and
+ not a codepoint like "1f1f8". When working with combined emojis,
+ such as "πŸ‡ΈπŸ‡ͺ" and "πŸ‘¨β€πŸ‘©β€πŸ‘¦", send the component emojis through the method
+ one at a time.
+ """
+ if emoji is None:
+ return None
+ return hex(ord(emoji)).removeprefix("0x")
+
+ @staticmethod
+ def trim_code(codepoint: Optional[str]) -> Optional[str]:
+ """
+ Returns the meaningful information from the given `codepoint`.
+
+ If no codepoint is found, `None` is returned.
+
+ Example usages:
+ >>> trim_code("U+1f1f8")
+ "1f1f8"
+ >>> trim_code("\u0001f1f8")
+ "1f1f8"
+ >>> trim_code("1f466")
+ "1f466"
+ """
+ if code := CODEPOINT_REGEX.search(codepoint or ""):
+ return code.group()
+
+ @staticmethod
+ def codepoint_from_input(raw_emoji: tuple[str, ...]) -> str:
+ """
+ Returns the codepoint corresponding to the passed tuple, separated by "-".
+
+ The return format matches the format used in URLs for Twemoji source files.
+
+ Example usages:
+ >>> codepoint_from_input(("🐍",))
+ "1f40d"
+ >>> codepoint_from_input(("1f1f8", "1f1ea"))
+ "1f1f8-1f1ea"
+ >>> codepoint_from_input(("πŸ‘¨β€πŸ‘§β€πŸ‘¦",))
+ "1f468-200d-1f467-200d-1f466"
+ """
+ raw_emoji = [emoji.lower() for emoji in raw_emoji]
+ if is_emoji(raw_emoji[0]):
+ emojis = (Twemoji.codepoint(emoji) or "" for emoji in raw_emoji[0])
+ return "-".join(emojis)
+
+ emoji = "".join(
+ Twemoji.emoji(Twemoji.trim_code(code)) or "" for code in raw_emoji
+ )
+ if is_emoji(emoji):
+ return "-".join(Twemoji.codepoint(e) or "" for e in emoji)
+
+ raise ValueError("No codepoint could be obtained from the given input")
+
+ @commands.command(aliases=("tw",))
+ @whitelist_override(roles=(Roles.everyone,))
+ async def twemoji(self, ctx: commands.Context, *raw_emoji: str) -> None:
+ """Sends a preview of a given Twemoji, specified by codepoint or emoji."""
+ if len(raw_emoji) == 0:
+ await invoke_help_command(ctx)
+ return
+ try:
+ codepoint = self.codepoint_from_input(raw_emoji)
+ except ValueError:
+ raise commands.BadArgument(
+ "please include a valid emoji or emoji codepoint."
+ )
+
+ await ctx.send(embed=self.build_embed(codepoint))
+
+
+def setup(bot: Bot) -> None:
+ """Load the Twemoji cog."""
+ bot.add_cog(Twemoji(bot))
diff --git a/bot/log.py b/bot/log.py
index 29e696e0..a87a836a 100644
--- a/bot/log.py
+++ b/bot/log.py
@@ -6,7 +6,7 @@ from pathlib import Path
import coloredlogs
-from bot.constants import Client
+from bot.constants import Logging
def setup() -> None:
@@ -20,7 +20,7 @@ def setup() -> None:
log_format = logging.Formatter(format_string)
root_logger = logging.getLogger()
- if Client.file_logs:
+ if Logging.file_logs:
# Set up file logging
log_file = Path("logs/sir-lancebot.log")
log_file.parent.mkdir(exist_ok=True)
@@ -45,7 +45,7 @@ def setup() -> None:
coloredlogs.install(level=logging.TRACE, stream=sys.stdout)
- root_logger.setLevel(logging.DEBUG if Client.debug else logging.INFO)
+ root_logger.setLevel(logging.DEBUG if Logging.debug else logging.INFO)
# Silence irrelevant loggers
logging.getLogger("discord").setLevel(logging.ERROR)
logging.getLogger("websockets").setLevel(logging.ERROR)
@@ -81,7 +81,7 @@ def _set_trace_loggers() -> None:
Otherwise if the env var begins with a "*",
the root logger is set to the trace level and other contents are ignored.
"""
- level_filter = Client.trace_loggers
+ level_filter = Logging.trace_loggers
if level_filter:
if level_filter.startswith("*"):
logging.getLogger().setLevel(logging.TRACE)
diff --git a/bot/resources/fun/latex_template.txt b/bot/resources/fun/latex_template.txt
new file mode 100644
index 00000000..6e67b810
--- /dev/null
+++ b/bot/resources/fun/latex_template.txt
@@ -0,0 +1,13 @@
+\documentclass{article}
+\usepackage{amsmath,amsthm,amssymb,amsfonts}
+\usepackage{bm} % nice bold symbols for matrices and vectors
+\usepackage{bbm} % bold and calligraphic numbers
+\usepackage[binary-units=true]{siunitx} % SI unit handling
+\usepackage{tikz} % from here on, to make nice diagrams with tikz
+\usepackage{ifthen}
+\usetikzlibrary{patterns}
+\usetikzlibrary{shapes, arrows, chains, fit, positioning, calc, decorations.pathreplacing}
+\begin{document}
+ \pagenumbering{gobble}
+ $text
+\end{document}
diff --git a/bot/resources/utilities/py_topics.yaml b/bot/resources/utilities/py_topics.yaml
index 1cd2c325..4527e8de 100644
--- a/bot/resources/utilities/py_topics.yaml
+++ b/bot/resources/utilities/py_topics.yaml
@@ -35,12 +35,31 @@
- Have you ever worked with a microcontroller or anything physical with Python before?
- Have you ever tried making your own programming language?
- Has a recently discovered Python module changed your general use of Python?
+ - What is your motivation for programming?
+ - What's your favorite Python related book?
+ - What's your favorite use of recursion in Python?
+ - If you could change one thing in Python, what would it be?
+ - What third-party library do you wish was in the Python standard library?
+ - Which package do you use the most and why?
+ - Which Python feature do you love the most?
+ - Do you have any plans for future projects?
+ - What modules/libraries do you want to see more projects using?
+ - What's the most ambitious thing you've done with Python so far?
+
+# programming-pedagogy
+934931964509691966:
+ - What is the best way to teach/learn OOP?
+ - What benefits are there to teaching programming to students who aren't training to become developers?
+ - What are some basic concepts that we need to know before teaching programming to others?
+ - What are the most common difficulties/misconceptions students encounter while learning to program?
+ - What makes a project a good learning experience for beginners?
+ - What can make difficult concepts more fun for students to learn?
# algos-and-data-structs
650401909852864553:
-
-# async
+# async-and-concurrency
630504881542791169:
- Are there any frameworks you wish were async?
- How have coroutines changed the way you write Python?
@@ -54,12 +73,13 @@
342318764227821568:
- Where do you get your best data?
- What is your preferred database and for what use?
+ - What is the least safe use of databases you've seen?
-# data-science
+# data-science-and-ai
366673247892275221:
-
-# discord.py
+# discord-bots
343944376055103488:
- What unique features does your bot contain, if any?
- What commands/features are you proud of making?
@@ -78,6 +98,8 @@
- What's a common part of programming we can make harder?
- What are the pros and cons of messing with __magic__()?
- What's your favorite Python hack?
+ - What's the weirdest language feature that Python doesn't have, and how can we change that?
+ - What is the most esoteric code you've written?
# game-development
660625198390837248:
@@ -87,6 +109,17 @@
- What books or tutorials would you recommend for game-development beginners?
- What made you start developing games?
+# media-processing
+971142229462777926:
+ - Where do you start with media processing? What is a good beginner project for first-timers in media processing?
+ - What are some ways you could manipulate media using Python?
+ - What is your favorite algorithm for manipulating media with Python?
+ - What is the most surprising result you have gotten after manipulating media with Python?
+ - What is the worst outcome you have gotten after manipulating media with Python?
+ - What is your most advanced media processing related achievement?
+ - Do you know any cool tricks or optimizations for manipulating media with Python?
+ - Can a computer truly generate and/or understand art?
+
# microcontrollers
545603026732318730:
- What is your favorite version of the Raspberry Pi?
@@ -110,6 +143,10 @@
- How often do you use GitHub Actions and workflows to automate your repositories?
- What's your favorite app on GitHub?
+# type-hinting
+891788761371906108:
+ -
+
# unit-testing
463035728335732738:
-
@@ -120,6 +157,7 @@
- What's your most used Bash command?
- How often do you update your Unix machine?
- How often do you upgrade on production?
+ - What is your least favorite thing about interoperability amongst *NIX operating systems and/or platforms?
# user-interfaces
338993628049571840:
@@ -128,6 +166,7 @@
- Do you perfer Command Line Interfaces (CLI) or Graphic User Interfaces (GUI)?
- What's your favorite CLI (Command Line Interface) or TUI (Terminal Line Interface)?
- What's your best GUI project?
+ - What the best-looking app you've used?
# web-development
366673702533988363:
@@ -136,3 +175,4 @@
- What is your favorite API library?
- What do you use for your frontend?
- What does your stack look like?
+ - What's the best-looking website you've visited?
diff --git a/bot/resources/utilities/starter.yaml b/bot/resources/utilities/starter.yaml
index 6b0de0ef..ce759e1a 100644
--- a/bot/resources/utilities/starter.yaml
+++ b/bot/resources/utilities/starter.yaml
@@ -32,8 +32,6 @@
- How many years have you spent coding?
- What book do you highly recommend everyone to read?
- What websites do you use daily to keep yourself up to date with the industry?
-- What made you want to join this Discord server?
-- How are you?
- What is the best advice you have ever gotten in regards to programming/software?
- What is the most satisfying thing you've done in your life?
- Who is your favorite music composer/producer/singer?
@@ -49,3 +47,4 @@
- What artistic talents do you have?
- What is the tallest building you've entered?
- What is the oldest computer you've ever used?
+- What animals do you like?
diff --git a/bot/utils/checks.py b/bot/utils/checks.py
index 8c426ed7..5433f436 100644
--- a/bot/utils/checks.py
+++ b/bot/utils/checks.py
@@ -33,7 +33,7 @@ def in_whitelist_check(
channels: Container[int] = (),
categories: Container[int] = (),
roles: Container[int] = (),
- redirect: Optional[int] = constants.Channels.community_bot_commands,
+ redirect: Optional[int] = constants.Channels.sir_lancebot_playground,
fail_silently: bool = False,
) -> bool:
"""
diff --git a/bot/utils/commands.py b/bot/utils/commands.py
new file mode 100644
index 00000000..7c04a25a
--- /dev/null
+++ b/bot/utils/commands.py
@@ -0,0 +1,11 @@
+from typing import Optional
+
+from rapidfuzz import process
+
+
+def get_command_suggestions(
+ all_commands: list[str], query: str, *, cutoff: int = 60, limit: int = 3
+) -> Optional[list]:
+ """Get similar command names."""
+ results = process.extract(query, all_commands, score_cutoff=cutoff, limit=limit)
+ return [result[0] for result in results]
diff --git a/bot/utils/decorators.py b/bot/utils/decorators.py
index 0061abd9..442eb841 100644
--- a/bot/utils/decorators.py
+++ b/bot/utils/decorators.py
@@ -272,10 +272,10 @@ def whitelist_check(**default_kwargs: Container[int]) -> Callable[[Context], boo
channels = set(kwargs.get("channels") or {})
categories = kwargs.get("categories")
- # Only output override channels + community_bot_commands
+ # Only output override channels + sir_lancebot_playground
if channels:
default_whitelist_channels = set(WHITELISTED_CHANNELS)
- default_whitelist_channels.discard(Channels.community_bot_commands)
+ default_whitelist_channels.discard(Channels.sir_lancebot_playground)
channels.difference_update(default_whitelist_channels)
# Add all whitelisted category channels, but skip if we're in DMs
diff --git a/bot/utils/messages.py b/bot/utils/messages.py
index a6c035f9..b0c95583 100644
--- a/bot/utils/messages.py
+++ b/bot/utils/messages.py
@@ -1,5 +1,12 @@
+import logging
import re
-from typing import Optional
+from typing import Callable, Optional, Union
+
+from discord import Embed, Message
+from discord.ext import commands
+from discord.ext.commands import Context, MessageConverter
+
+log = logging.getLogger(__name__)
def sub_clyde(username: Optional[str]) -> Optional[str]:
@@ -17,3 +24,66 @@ def sub_clyde(username: Optional[str]) -> Optional[str]:
return re.sub(r"(clyd)(e)", replace_e, username, flags=re.I)
else:
return username # Empty string or None
+
+
+async def get_discord_message(ctx: Context, text: str) -> Union[Message, str]:
+ """
+ Attempts to convert a given `text` to a discord Message object and return it.
+
+ Conversion will succeed if given a discord Message ID or link.
+ Returns `text` if the conversion fails.
+ """
+ try:
+ text = await MessageConverter().convert(ctx, text)
+ except commands.BadArgument:
+ pass
+
+ return text
+
+
+async def get_text_and_embed(ctx: Context, text: str) -> tuple[str, Optional[Embed]]:
+ """
+ Attempts to extract the text and embed from a possible link to a discord Message.
+
+ Does not retrieve the text and embed from the Message if it is in a channel the user does
+ not have read permissions in.
+
+ Returns a tuple of:
+ str: If `text` is a valid discord Message, the contents of the message, else `text`.
+ Optional[Embed]: The embed if found in the valid Message, else None
+ """
+ embed: Optional[Embed] = None
+
+ msg = await get_discord_message(ctx, text)
+ # Ensure the user has read permissions for the channel the message is in
+ if isinstance(msg, Message):
+ permissions = msg.channel.permissions_for(ctx.author)
+ if permissions.read_messages:
+ text = msg.clean_content
+ # Take first embed because we can't send multiple embeds
+ if msg.embeds:
+ embed = msg.embeds[0]
+
+ return text, embed
+
+
+def convert_embed(func: Callable[[str, ], str], embed: Embed) -> Embed:
+ """
+ Converts the text in an embed using a given conversion function, then return the embed.
+
+ Only modifies the following fields: title, description, footer, fields
+ """
+ embed_dict = embed.to_dict()
+
+ embed_dict["title"] = func(embed_dict.get("title", ""))
+ embed_dict["description"] = func(embed_dict.get("description", ""))
+
+ if "footer" in embed_dict:
+ embed_dict["footer"]["text"] = func(embed_dict["footer"].get("text", ""))
+
+ if "fields" in embed_dict:
+ for field in embed_dict["fields"]:
+ field["name"] = func(field.get("name", ""))
+ field["value"] = func(field.get("value", ""))
+
+ return Embed.from_dict(embed_dict)