aboutsummaryrefslogtreecommitdiffstats
path: root/bot/exts
diff options
context:
space:
mode:
authorGravatar Senjan21 <[email protected]>2022-05-09 18:21:21 +0200
committerGravatar GitHub <[email protected]>2022-05-09 18:21:21 +0200
commit019983c3785191a0c7182c62394cec2bac123d51 (patch)
tree54c296bd04a13a0097cbd7249b78a6716556d735 /bot/exts
parentDoublefixed indentation and removed unused import. (diff)
parentBump pillow from 9.0.0 to 9.0.1 (#1045) (diff)
Merge branch 'main' into uwu
Diffstat (limited to 'bot/exts')
-rw-r--r--bot/exts/avatar_modification/avatar_modify.py4
-rw-r--r--bot/exts/core/error_handler.py13
-rw-r--r--bot/exts/core/extensions.py4
-rw-r--r--bot/exts/core/help.py5
-rw-r--r--bot/exts/core/internal_eval/_internal_eval.py12
-rw-r--r--bot/exts/core/source.py4
-rw-r--r--bot/exts/events/advent_of_code/_cog.py342
-rw-r--r--bot/exts/events/advent_of_code/_helpers.py87
-rw-r--r--bot/exts/events/advent_of_code/views/dayandstarview.py82
-rw-r--r--bot/exts/events/hacktoberfest/hacktober-issue-finder.py11
-rw-r--r--bot/exts/events/trivianight/__init__.py0
-rw-r--r--bot/exts/events/trivianight/_game.py192
-rw-r--r--bot/exts/events/trivianight/_questions.py179
-rw-r--r--bot/exts/events/trivianight/_scoreboard.py186
-rw-r--r--bot/exts/events/trivianight/trivianight.py328
-rw-r--r--bot/exts/fun/anagram.py109
-rw-r--r--bot/exts/fun/battleship.py1
-rw-r--r--bot/exts/fun/connect_four.py3
-rw-r--r--bot/exts/fun/duck_game.py42
-rw-r--r--bot/exts/fun/game.py32
-rw-r--r--bot/exts/fun/latex.py130
-rw-r--r--bot/exts/fun/madlibs.py148
-rw-r--r--bot/exts/fun/quack.py75
-rw-r--r--bot/exts/fun/snakes/_utils.py25
-rw-r--r--bot/exts/fun/tic_tac_toe.py19
-rw-r--r--bot/exts/fun/trivia_quiz.py6
-rw-r--r--bot/exts/holidays/easter/earth_photos.py3
-rw-r--r--bot/exts/holidays/easter/egg_facts.py2
-rw-r--r--bot/exts/holidays/halloween/candy_collection.py24
-rw-r--r--bot/exts/holidays/halloween/scarymovie.py1
-rw-r--r--bot/exts/holidays/halloween/spookynamerate.py14
-rw-r--r--bot/exts/holidays/halloween/spookyreact.py8
-rw-r--r--bot/exts/holidays/hanukkah/hanukkah_embed.py84
-rw-r--r--bot/exts/holidays/pride/pride_facts.py2
-rw-r--r--bot/exts/holidays/pride/pride_leader.py2
-rw-r--r--bot/exts/holidays/valentines/be_my_valentine.py42
-rw-r--r--bot/exts/holidays/valentines/lovecalculator.py11
-rw-r--r--bot/exts/utilities/bookmark.py13
-rw-r--r--bot/exts/utilities/challenges.py341
-rw-r--r--bot/exts/utilities/colour.py266
-rw-r--r--bot/exts/utilities/conversationstarters.py91
-rw-r--r--bot/exts/utilities/emoji.py6
-rw-r--r--bot/exts/utilities/epoch.py138
-rw-r--r--bot/exts/utilities/githubinfo.py220
-rw-r--r--bot/exts/utilities/issues.py275
-rw-r--r--bot/exts/utilities/latex.py101
-rw-r--r--bot/exts/utilities/realpython.py16
-rw-r--r--bot/exts/utilities/reddit.py10
-rw-r--r--bot/exts/utilities/twemoji.py150
-rw-r--r--bot/exts/utilities/wikipedia.py6
-rw-r--r--bot/exts/utilities/wtf_python.py138
51 files changed, 3316 insertions, 687 deletions
diff --git a/bot/exts/avatar_modification/avatar_modify.py b/bot/exts/avatar_modification/avatar_modify.py
index 87eb05e6..3ee70cfd 100644
--- a/bot/exts/avatar_modification/avatar_modify.py
+++ b/bot/exts/avatar_modification/avatar_modify.py
@@ -239,7 +239,7 @@ class AvatarModify(commands.Cog):
description=f"Here is your lovely avatar, surrounded by\n a beautiful {option} flag. Enjoy :D"
)
embed.set_image(url=f"attachment://{file_name}")
- embed.set_footer(text=f"Made by {ctx.author.display_name}.", icon_url=ctx.author.avatar.url)
+ embed.set_footer(text=f"Made by {ctx.author.display_name}.", icon_url=ctx.author.display_avatar.url)
await ctx.send(file=file, embed=embed)
@avatar_modify.group(
@@ -286,7 +286,7 @@ class AvatarModify(commands.Cog):
@avatar_modify.command(
aliases=("savatar", "spookify"),
root_aliases=("spookyavatar", "spookify", "savatar"),
- brief="Spookify an user's avatar."
+ brief="Spookify a user's avatar."
)
async def spookyavatar(self, ctx: commands.Context) -> None:
"""This "spookifies" the user's avatar, with a random *spooky* effect."""
diff --git a/bot/exts/core/error_handler.py b/bot/exts/core/error_handler.py
index fd2123e7..983632ba 100644
--- a/bot/exts/core/error_handler.py
+++ b/bot/exts/core/error_handler.py
@@ -12,7 +12,7 @@ from sentry_sdk import push_scope
from bot.bot import Bot
from bot.constants import Channels, Colours, ERROR_REPLIES, NEGATIVE_REPLIES, RedirectOutput
from bot.utils.decorators import InChannelCheckFailure, InMonthCheckFailure
-from bot.utils.exceptions import APIError, UserNotPlayingError
+from bot.utils.exceptions import APIError, MovedCommandError, UserNotPlayingError
log = logging.getLogger(__name__)
@@ -98,7 +98,8 @@ class CommandErrorHandler(commands.Cog):
if isinstance(error, commands.NoPrivateMessage):
await ctx.send(
embed=self.error_embed(
- f"This command can only be used in the server. Go to <#{Channels.community_bot_commands}> instead!",
+ "This command can only be used in the server. "
+ f"Go to <#{Channels.sir_lancebot_playground}> instead!",
NEGATIVE_REPLIES
)
)
@@ -130,6 +131,14 @@ class CommandErrorHandler(commands.Cog):
)
return
+ if isinstance(error, MovedCommandError):
+ description = (
+ f"This command, `{ctx.prefix}{ctx.command.qualified_name}` has moved to `{error.new_command_name}`.\n"
+ f"Please use `{error.new_command_name}` instead."
+ )
+ await ctx.send(embed=self.error_embed(description, NEGATIVE_REPLIES))
+ return
+
with push_scope() as scope:
scope.user = {
"id": ctx.author.id,
diff --git a/bot/exts/core/extensions.py b/bot/exts/core/extensions.py
index 424bacac..d809d2b9 100644
--- a/bot/exts/core/extensions.py
+++ b/bot/exts/core/extensions.py
@@ -18,7 +18,7 @@ from bot.utils.pagination import LinePaginator
log = logging.getLogger(__name__)
-UNLOAD_BLACKLIST = {f"{exts.__name__}.utils.extensions"}
+UNLOAD_BLACKLIST = {f"{exts.__name__}.core.extensions"}
BASE_PATH_LEN = len(exts.__name__.split("."))
@@ -152,7 +152,7 @@ class Extensions(commands.Cog):
Grey indicates that the extension is unloaded.
Green indicates that the extension is currently loaded.
"""
- embed = Embed(colour=Colour.blurple())
+ embed = Embed(colour=Colour.og_blurple())
embed.set_author(
name="Extensions List",
url=Client.github_bot_repo,
diff --git a/bot/exts/core/help.py b/bot/exts/core/help.py
index 4b766b50..db3c2aa6 100644
--- a/bot/exts/core/help.py
+++ b/bot/exts/core/help.py
@@ -13,10 +13,7 @@ from rapidfuzz import process
from bot import constants
from bot.bot import Bot
from bot.constants import Emojis
-from bot.utils.pagination import (
- FIRST_EMOJI, LAST_EMOJI,
- LEFT_EMOJI, LinePaginator, RIGHT_EMOJI,
-)
+from bot.utils.pagination import FIRST_EMOJI, LAST_EMOJI, LEFT_EMOJI, LinePaginator, RIGHT_EMOJI
DELETE_EMOJI = Emojis.trashcan
diff --git a/bot/exts/core/internal_eval/_internal_eval.py b/bot/exts/core/internal_eval/_internal_eval.py
index 4f6b4321..190a15ec 100644
--- a/bot/exts/core/internal_eval/_internal_eval.py
+++ b/bot/exts/core/internal_eval/_internal_eval.py
@@ -10,6 +10,7 @@ from bot.bot import Bot
from bot.constants import Client, Roles
from bot.utils.decorators import with_role
from bot.utils.extensions import invoke_help_command
+
from ._helpers import EvalContext
__all__ = ["InternalEval"]
@@ -33,6 +34,8 @@ RAW_CODE_REGEX = re.compile(
re.DOTALL # "." also matches newlines
)
+MAX_LENGTH = 99980
+
class InternalEval(commands.Cog):
"""Top secret code evaluation for admins and owners."""
@@ -84,9 +87,10 @@ class InternalEval(commands.Cog):
async def _upload_output(self, output: str) -> Optional[str]:
"""Upload `internal eval` output to our pastebin and return the url."""
+ data = self.shorten_output(output, max_length=MAX_LENGTH)
try:
async with self.bot.http_session.post(
- "https://paste.pythondiscord.com/documents", data=output, raise_for_status=True
+ "https://paste.pythondiscord.com/documents", data=data, raise_for_status=True
) as resp:
data = await resp.json()
@@ -146,14 +150,14 @@ class InternalEval(commands.Cog):
await self._send_output(ctx, eval_context.format_output())
@commands.group(name="internal", aliases=("int",))
- @with_role(Roles.admin)
+ @with_role(Roles.admins)
async def internal_group(self, ctx: commands.Context) -> None:
"""Internal commands. Top secret!"""
if not ctx.invoked_subcommand:
await invoke_help_command(ctx)
@internal_group.command(name="eval", aliases=("e",))
- @with_role(Roles.admin)
+ @with_role(Roles.admins)
async def eval(self, ctx: commands.Context, *, code: str) -> None:
"""Run eval in a REPL-like format."""
if match := list(FORMATTED_CODE_REGEX.finditer(code)):
@@ -172,7 +176,7 @@ class InternalEval(commands.Cog):
await self._eval(ctx, code)
@internal_group.command(name="reset", aliases=("clear", "exit", "r", "c"))
- @with_role(Roles.admin)
+ @with_role(Roles.admins)
async def reset(self, ctx: commands.Context) -> None:
"""Reset the context and locals of the eval session."""
self.locals = {}
diff --git a/bot/exts/core/source.py b/bot/exts/core/source.py
index 7572ce51..2801be0f 100644
--- a/bot/exts/core/source.py
+++ b/bot/exts/core/source.py
@@ -6,14 +6,16 @@ from discord import Embed
from discord.ext import commands
from bot.bot import Bot
-from bot.constants import Source
+from bot.constants import Channels, Source, WHITELISTED_CHANNELS
from bot.utils.converters import SourceConverter, SourceType
+from bot.utils.decorators import whitelist_override
class BotSource(commands.Cog):
"""Displays information about the bot's source code."""
@commands.command(name="source", aliases=("src",))
+ @whitelist_override(channels=WHITELISTED_CHANNELS+(Channels.community_meta, Channels.dev_contrib))
async def source_command(self, ctx: commands.Context, *, source_item: SourceConverter = None) -> None:
"""Display information and a GitHub link to the source code of a command, tag, or cog."""
if not source_item:
diff --git a/bot/exts/events/advent_of_code/_cog.py b/bot/exts/events/advent_of_code/_cog.py
index ca60e517..518841d4 100644
--- a/bot/exts/events/advent_of_code/_cog.py
+++ b/bot/exts/events/advent_of_code/_cog.py
@@ -2,17 +2,22 @@ import json
import logging
from datetime import datetime, timedelta
from pathlib import Path
+from typing import Optional
import arrow
import discord
-from discord.ext import commands
+from async_rediscache import RedisCache
+from discord.ext import commands, tasks
from bot.bot import Bot
from bot.constants import (
- AdventOfCode as AocConfig, Channels, Colours, Emojis, Month, Roles, WHITELISTED_CHANNELS,
+ AdventOfCode as AocConfig, Channels, Client, Colours, Emojis, Month, PYTHON_PREFIX, Roles, WHITELISTED_CHANNELS
)
from bot.exts.events.advent_of_code import _helpers
+from bot.exts.events.advent_of_code.views.dayandstarview import AoCDropdownView
+from bot.utils import members
from bot.utils.decorators import InChannelCheckFailure, in_month, whitelist_override, with_role
+from bot.utils.exceptions import MovedCommandError
from bot.utils.extensions import invoke_help_command
log = logging.getLogger(__name__)
@@ -29,6 +34,14 @@ AOC_WHITELIST = AOC_WHITELIST_RESTRICTED + (Channels.advent_of_code,)
class AdventOfCode(commands.Cog):
"""Advent of Code festivities! Ho Ho Ho!"""
+ # Redis Cache for linking Discord IDs to Advent of Code usernames
+ # RedisCache[member_id: aoc_username_string]
+ account_links = RedisCache()
+
+ # A dict with keys of member_ids to block from getting the role
+ # RedisCache[member_id: None]
+ completionist_block_list = RedisCache()
+
def __init__(self, bot: Bot):
self.bot = bot
@@ -48,6 +61,62 @@ class AdventOfCode(commands.Cog):
self.status_task.set_name("AoC Status Countdown")
self.status_task.add_done_callback(_helpers.background_task_callback)
+ # Don't start task while event isn't running
+ # self.completionist_task.start()
+
+ @tasks.loop(minutes=10.0)
+ async def completionist_task(self) -> None:
+ """
+ Give members who have completed all 50 AoC stars the completionist role.
+
+ Runs on a schedule, as defined in the task.loop decorator.
+ """
+ await self.bot.wait_until_guild_available()
+ guild = self.bot.get_guild(Client.guild)
+ completionist_role = guild.get_role(Roles.aoc_completionist)
+ if completionist_role is None:
+ log.warning("Could not find the AoC completionist role; cancelling completionist task.")
+ self.completionist_task.cancel()
+ return
+
+ aoc_name_to_member_id = {
+ aoc_name: member_id
+ for member_id, aoc_name in await self.account_links.items()
+ }
+
+ try:
+ leaderboard = await _helpers.fetch_leaderboard()
+ except _helpers.FetchingLeaderboardFailedError:
+ await self.bot.send_log("Unable to fetch AoC leaderboard during role sync.")
+ return
+
+ placement_leaderboard = json.loads(leaderboard["placement_leaderboard"])
+
+ for member_aoc_info in placement_leaderboard.values():
+ if not member_aoc_info["stars"] == 50:
+ # Only give the role to people who have completed all 50 stars
+ continue
+
+ aoc_name = member_aoc_info["name"] or f"Anonymous #{member_aoc_info['id']}"
+
+ member_id = aoc_name_to_member_id.get(aoc_name)
+ if not member_id:
+ log.debug(f"Could not find member_id for {member_aoc_info['name']}, not giving role.")
+ continue
+
+ member = await members.get_or_fetch_member(guild, member_id)
+ if member is None:
+ log.debug(f"Could not find {member_id}, not giving role.")
+ continue
+
+ if completionist_role in member.roles:
+ log.debug(f"{member.name} ({member.mention}) already has the completionist role.")
+ continue
+
+ if not await self.completionist_block_list.contains(member_id):
+ log.debug(f"Giving completionist role to {member.name} ({member.mention}).")
+ await members.handle_role_change(member, member.add_roles, completionist_role)
+
@commands.group(name="adventofcode", aliases=("aoc",))
@whitelist_override(channels=AOC_WHITELIST)
async def adventofcode_group(self, ctx: commands.Context) -> None:
@@ -55,77 +124,59 @@ class AdventOfCode(commands.Cog):
if not ctx.invoked_subcommand:
await invoke_help_command(ctx)
+ @with_role(Roles.admins)
@adventofcode_group.command(
- name="subscribe",
- aliases=("sub", "notifications", "notify", "notifs"),
- brief="Notifications for new days"
+ name="block",
+ brief="Block a user from getting the completionist role.",
)
- @whitelist_override(channels=AOC_WHITELIST)
- async def aoc_subscribe(self, ctx: commands.Context) -> None:
- """Assign the role for notifications about new days being ready."""
- current_year = datetime.now().year
- if current_year != AocConfig.year:
- await ctx.send(f"You can't subscribe to {current_year}'s Advent of Code announcements yet!")
- return
+ async def block_from_role(self, ctx: commands.Context, member: discord.Member) -> None:
+ """Block the given member from receiving the AoC completionist role, removing it from them if needed."""
+ completionist_role = ctx.guild.get_role(Roles.aoc_completionist)
+ if completionist_role in member.roles:
+ await member.remove_roles(completionist_role)
- role = ctx.guild.get_role(AocConfig.role_id)
- unsubscribe_command = f"{ctx.prefix}{ctx.command.root_parent} unsubscribe"
+ await self.completionist_block_list.set(member.id, "sentinel")
+ await ctx.send(f":+1: Blocked {member.mention} from getting the AoC completionist role.")
- if role not in ctx.author.roles:
- await ctx.author.add_roles(role)
- await ctx.send(
- "Okay! You have been __subscribed__ to notifications about new Advent of Code tasks. "
- f"You can run `{unsubscribe_command}` to disable them again for you."
- )
- else:
- await ctx.send(
- "Hey, you already are receiving notifications about new Advent of Code tasks. "
- f"If you don't want them any more, run `{unsubscribe_command}` instead."
- )
-
- @in_month(Month.DECEMBER)
- @adventofcode_group.command(name="unsubscribe", aliases=("unsub",), brief="Notifications for new days")
+ @commands.guild_only()
+ @adventofcode_group.command(
+ name="subscribe",
+ aliases=("sub", "notifications", "notify", "notifs", "unsubscribe", "unsub"),
+ help=f"NOTE: This command has been moved to {PYTHON_PREFIX}subscribe",
+ )
@whitelist_override(channels=AOC_WHITELIST)
- async def aoc_unsubscribe(self, ctx: commands.Context) -> None:
- """Remove the role for notifications about new days being ready."""
- role = ctx.guild.get_role(AocConfig.role_id)
+ async def aoc_subscribe(self, ctx: commands.Context) -> None:
+ """
+ Deprecated role command.
- if role in ctx.author.roles:
- await ctx.author.remove_roles(role)
- await ctx.send("Okay! You have been __unsubscribed__ from notifications about new Advent of Code tasks.")
- else:
- await ctx.send("Hey, you don't even get any notifications about new Advent of Code tasks currently anyway.")
+ This command has been moved to bot, and will be removed in the future.
+ """
+ raise MovedCommandError(f"{PYTHON_PREFIX}subscribe")
@adventofcode_group.command(name="countdown", aliases=("count", "c"), brief="Return time left until next day")
@whitelist_override(channels=AOC_WHITELIST)
async def aoc_countdown(self, ctx: commands.Context) -> None:
"""Return time left until next day."""
- if not _helpers.is_in_advent():
- datetime_now = arrow.now(_helpers.EST)
-
- # Calculate the delta to this & next year's December 1st to see which one is closest and not in the past
- this_year = arrow.get(datetime(datetime_now.year, 12, 1), _helpers.EST)
- next_year = arrow.get(datetime(datetime_now.year + 1, 12, 1), _helpers.EST)
- deltas = (dec_first - datetime_now for dec_first in (this_year, next_year))
- delta = min(delta for delta in deltas if delta >= timedelta()) # timedelta() gives 0 duration delta
-
- # Add a finer timedelta if there's less than a day left
- if delta.days == 0:
- delta_str = f"approximately {delta.seconds // 3600} hours"
- else:
- delta_str = f"{delta.days} days"
+ if _helpers.is_in_advent():
+ tomorrow, _ = _helpers.time_left_to_est_midnight()
+ next_day_timestamp = int(tomorrow.timestamp())
- await ctx.send(
- "The Advent of Code event is not currently running. "
- f"The next event will start in {delta_str}."
- )
+ await ctx.send(f"Day {tomorrow.day} starts <t:{next_day_timestamp}:R>.")
return
- tomorrow, time_left = _helpers.time_left_to_est_midnight()
+ datetime_now = arrow.now(_helpers.EST)
+ # Calculate the delta to this & next year's December 1st to see which one is closest and not in the past
+ this_year = arrow.get(datetime(datetime_now.year, 12, 1), _helpers.EST)
+ next_year = arrow.get(datetime(datetime_now.year + 1, 12, 1), _helpers.EST)
+ deltas = (dec_first - datetime_now for dec_first in (this_year, next_year))
+ delta = min(delta for delta in deltas if delta >= timedelta()) # timedelta() gives 0 duration delta
- hours, minutes = time_left.seconds // 3600, time_left.seconds // 60 % 60
+ next_aoc_timestamp = int((datetime_now + delta).timestamp())
- await ctx.send(f"There are {hours} hours and {minutes} minutes left until day {tomorrow.day}.")
+ await ctx.send(
+ "The Advent of Code event is not currently running. "
+ f"The next event will start <t:{next_aoc_timestamp}:R>."
+ )
@adventofcode_group.command(name="about", aliases=("ab", "info"), brief="Learn about Advent of Code")
@whitelist_override(channels=AOC_WHITELIST)
@@ -133,13 +184,19 @@ class AdventOfCode(commands.Cog):
"""Respond with an explanation of all things Advent of Code."""
await ctx.send(embed=self.cached_about_aoc)
+ @commands.guild_only()
@adventofcode_group.command(name="join", aliases=("j",), brief="Learn how to join the leaderboard (via DM)")
@whitelist_override(channels=AOC_WHITELIST)
async def join_leaderboard(self, ctx: commands.Context) -> None:
"""DM the user the information for joining the Python Discord leaderboard."""
- current_year = datetime.now().year
- if current_year != AocConfig.year:
- await ctx.send(f"The Python Discord leaderboard for {current_year} is not yet available!")
+ current_date = datetime.now()
+ allowed_months = (Month.NOVEMBER.value, Month.DECEMBER.value)
+ if not (
+ current_date.month in allowed_months and current_date.year == AocConfig.year or
+ current_date.month == Month.JANUARY.value and current_date.year == AocConfig.year + 1
+ ):
+ # Only allow joining the leaderboard in the run up to AOC and the January following.
+ await ctx.send(f"The Python Discord leaderboard for {current_date.year} is not yet available!")
return
author = ctx.author
@@ -150,7 +207,7 @@ class AdventOfCode(commands.Cog):
else:
try:
join_code = await _helpers.get_public_join_code(author)
- except _helpers.FetchingLeaderboardFailed:
+ except _helpers.FetchingLeaderboardFailedError:
await ctx.send(":x: Failed to get join code! Notified maintainers.")
return
@@ -178,33 +235,163 @@ class AdventOfCode(commands.Cog):
else:
await ctx.message.add_reaction(Emojis.envelope)
- @in_month(Month.DECEMBER)
+ @in_month(Month.NOVEMBER, Month.DECEMBER, Month.JANUARY)
+ @adventofcode_group.command(
+ name="link",
+ aliases=("connect",),
+ brief="Tie your Discord account with your Advent of Code name."
+ )
+ @whitelist_override(channels=AOC_WHITELIST)
+ async def aoc_link_account(self, ctx: commands.Context, *, aoc_name: str = None) -> None:
+ """
+ Link your Discord Account to your Advent of Code name.
+
+ Stored in a Redis Cache with the format of `Discord ID: Advent of Code Name`
+ """
+ cache_items = await self.account_links.items()
+ cache_aoc_names = [value for _, value in cache_items]
+
+ if aoc_name:
+ # Let's check the current values in the cache to make sure it isn't already tied to a different account
+ if aoc_name == await self.account_links.get(ctx.author.id):
+ await ctx.reply(f"{aoc_name} is already tied to your account.")
+ return
+ elif aoc_name in cache_aoc_names:
+ log.info(
+ f"{ctx.author} ({ctx.author.id}) tried to connect their account to {aoc_name},"
+ " but it's already connected to another user."
+ )
+ await ctx.reply(
+ f"{aoc_name} is already tied to another account."
+ " Please contact an admin if you believe this is an error."
+ )
+ return
+
+ # Update an existing link
+ if old_aoc_name := await self.account_links.get(ctx.author.id):
+ log.info(f"Changing link for {ctx.author} ({ctx.author.id}) from {old_aoc_name} to {aoc_name}.")
+ await self.account_links.set(ctx.author.id, aoc_name)
+ await ctx.reply(f"Your linked account has been changed to {aoc_name}.")
+ else:
+ # Create a new link
+ log.info(f"Linking {ctx.author} ({ctx.author.id}) to account {aoc_name}.")
+ await self.account_links.set(ctx.author.id, aoc_name)
+ await ctx.reply(f"You have linked your Discord ID to {aoc_name}.")
+ else:
+ # User has not supplied a name, let's check if they're in the cache or not
+ if cache_name := await self.account_links.get(ctx.author.id):
+ await ctx.reply(f"You have already linked an Advent of Code account: {cache_name}.")
+ else:
+ await ctx.reply(
+ "You have not linked an Advent of Code account."
+ " Please re-run the command with one specified."
+ )
+
+ @in_month(Month.NOVEMBER, Month.DECEMBER, Month.JANUARY)
+ @adventofcode_group.command(
+ name="unlink",
+ aliases=("disconnect",),
+ brief="Tie your Discord account with your Advent of Code name."
+ )
+ @whitelist_override(channels=AOC_WHITELIST)
+ async def aoc_unlink_account(self, ctx: commands.Context) -> None:
+ """
+ Unlink your Discord ID with your Advent of Code leaderboard name.
+
+ Deletes the entry that was Stored in the Redis cache.
+ """
+ if aoc_cache_name := await self.account_links.get(ctx.author.id):
+ log.info(f"Unlinking {ctx.author} ({ctx.author.id}) from Advent of Code account {aoc_cache_name}")
+ await self.account_links.delete(ctx.author.id)
+ await ctx.reply(f"We have removed the link between your Discord ID and {aoc_cache_name}.")
+ else:
+ log.info(f"Attempted to unlink {ctx.author} ({ctx.author.id}), but no link was found.")
+ await ctx.reply("You don't have an Advent of Code account linked.")
+
+ @in_month(Month.DECEMBER, Month.JANUARY)
+ @adventofcode_group.command(
+ name="dayandstar",
+ aliases=("daynstar", "daystar"),
+ brief="Get a view that lets you filter the leaderboard by day and star",
+ )
+ @whitelist_override(channels=AOC_WHITELIST_RESTRICTED)
+ async def aoc_day_and_star_leaderboard(
+ self,
+ ctx: commands.Context,
+ maximum_scorers_day_and_star: Optional[int] = 10
+ ) -> None:
+ """Have the bot send a View that will let you filter the leaderboard by day and star."""
+ if maximum_scorers_day_and_star > AocConfig.max_day_and_star_results or maximum_scorers_day_and_star <= 0:
+ raise commands.BadArgument(
+ f"The maximum number of results you can query is {AocConfig.max_day_and_star_results}"
+ )
+ async with ctx.typing():
+ try:
+ leaderboard = await _helpers.fetch_leaderboard()
+ except _helpers.FetchingLeaderboardFailedError:
+ await ctx.send(":x: Unable to fetch leaderboard!")
+ return
+ # This is a dictionary that contains solvers in respect of day, and star.
+ # e.g. 1-1 means the solvers of the first star of the first day and their completion time
+ per_day_and_star = json.loads(leaderboard['leaderboard_per_day_and_star'])
+ view = AoCDropdownView(
+ day_and_star_data=per_day_and_star,
+ maximum_scorers=maximum_scorers_day_and_star,
+ original_author=ctx.author
+ )
+ message = await ctx.send(
+ content="Please select a day and a star to filter by!",
+ view=view
+ )
+ await view.wait()
+ await message.edit(view=None)
+
+ @in_month(Month.DECEMBER, Month.JANUARY)
@adventofcode_group.command(
name="leaderboard",
aliases=("board", "lb"),
brief="Get a snapshot of the PyDis private AoC leaderboard",
)
@whitelist_override(channels=AOC_WHITELIST_RESTRICTED)
- async def aoc_leaderboard(self, ctx: commands.Context) -> None:
- """Get the current top scorers of the Python Discord Leaderboard."""
+ async def aoc_leaderboard(self, ctx: commands.Context, *, aoc_name: Optional[str] = None) -> None:
+ """
+ Get the current top scorers of the Python Discord Leaderboard.
+
+ Additionally you can specify an `aoc_name` that will append the
+ specified profile's personal stats to the top of the leaderboard
+ """
+ # Strip quotes from the AoC username if needed (e.g. "My Name" -> My Name)
+ # This is to keep compatibility with those already used to wrapping the AoC name in quotes
+ # Note: only strips one layer of quotes to allow names with quotes at the start and end
+ # e.g. ""My Name"" -> "My Name"
+ if aoc_name and aoc_name.startswith('"') and aoc_name.endswith('"'):
+ aoc_name = aoc_name[1:-1]
+
+ # Check if an advent of code account is linked in the Redis Cache if aoc_name is not given
+ if (aoc_cache_name := await self.account_links.get(ctx.author.id)) and aoc_name is None:
+ aoc_name = aoc_cache_name
+
async with ctx.typing():
try:
- leaderboard = await _helpers.fetch_leaderboard()
- except _helpers.FetchingLeaderboardFailed:
+ leaderboard = await _helpers.fetch_leaderboard(self_placement_name=aoc_name)
+ except _helpers.FetchingLeaderboardFailedError:
await ctx.send(":x: Unable to fetch leaderboard!")
return
- number_of_participants = leaderboard["number_of_participants"]
+ number_of_participants = leaderboard["number_of_participants"]
- top_count = min(AocConfig.leaderboard_displayed_members, number_of_participants)
- header = f"Here's our current top {top_count}! {Emojis.christmas_tree * 3}"
-
- table = f"```\n{leaderboard['top_leaderboard']}\n```"
- info_embed = _helpers.get_summary_embed(leaderboard)
+ top_count = min(AocConfig.leaderboard_displayed_members, number_of_participants)
+ self_placement_header = " (and your personal stats compared to the top 10)" if aoc_name else ""
+ header = f"Here's our current top {top_count}{self_placement_header}! {Emojis.christmas_tree * 3}"
+ table = "```\n" \
+ f"{leaderboard['placement_leaderboard'] if aoc_name else leaderboard['top_leaderboard']}" \
+ "\n```"
+ info_embed = _helpers.get_summary_embed(leaderboard)
- await ctx.send(content=f"{header}\n\n{table}", embed=info_embed)
+ await ctx.send(content=f"{header}\n\n{table}", embed=info_embed)
+ return
- @in_month(Month.DECEMBER)
+ @in_month(Month.DECEMBER, Month.JANUARY)
@adventofcode_group.command(
name="global",
aliases=("globalboard", "gb"),
@@ -231,7 +418,7 @@ class AdventOfCode(commands.Cog):
"""Send an embed with daily completion statistics for the Python Discord leaderboard."""
try:
leaderboard = await _helpers.fetch_leaderboard()
- except _helpers.FetchingLeaderboardFailed:
+ except _helpers.FetchingLeaderboardFailedError:
await ctx.send(":x: Can't fetch leaderboard for stats right now!")
return
@@ -251,7 +438,7 @@ class AdventOfCode(commands.Cog):
info_embed = _helpers.get_summary_embed(leaderboard)
await ctx.send(f"```\n{table}\n```", embed=info_embed)
- @with_role(Roles.admin)
+ @with_role(Roles.admins)
@adventofcode_group.command(
name="refresh",
aliases=("fetch",),
@@ -267,7 +454,7 @@ class AdventOfCode(commands.Cog):
async with ctx.typing():
try:
await _helpers.fetch_leaderboard(invalidate_cache=True)
- except _helpers.FetchingLeaderboardFailed:
+ except _helpers.FetchingLeaderboardFailedError:
await ctx.send(":x: Something went wrong while trying to refresh the cache!")
else:
await ctx.send("\N{OK Hand Sign} Refreshed leaderboard cache!")
@@ -277,6 +464,7 @@ class AdventOfCode(commands.Cog):
log.debug("Unloading the cog and canceling the background task.")
self.notification_task.cancel()
self.status_task.cancel()
+ self.completionist_task.cancel()
def _build_about_embed(self) -> discord.Embed:
"""Build and return the informational "About AoC" embed from the resources file."""
diff --git a/bot/exts/events/advent_of_code/_helpers.py b/bot/exts/events/advent_of_code/_helpers.py
index 5fedb60f..6c004901 100644
--- a/bot/exts/events/advent_of_code/_helpers.py
+++ b/bot/exts/events/advent_of_code/_helpers.py
@@ -10,6 +10,7 @@ from typing import Any, Optional
import aiohttp
import arrow
import discord
+from discord.ext import commands
from bot.bot import Bot
from bot.constants import AdventOfCode, Channels, Colours
@@ -70,6 +71,33 @@ class FetchingLeaderboardFailedError(Exception):
"""Raised when one or more leaderboards could not be fetched at all."""
+def _format_leaderboard_line(rank: int, data: dict[str, Any], *, is_author: bool) -> str:
+ """
+ Build a string representing a line of the leaderboard.
+
+ Parameters:
+ rank:
+ Rank in the leaderboard of this entry.
+
+ data:
+ Mapping with entry information.
+
+ Keyword arguments:
+ is_author:
+ Whether to address the name displayed in the returned line
+ personally.
+
+ Returns:
+ A formatted line for the leaderboard.
+ """
+ return AOC_TABLE_TEMPLATE.format(
+ rank=rank,
+ name=data['name'] if not is_author else f"(You) {data['name']}",
+ score=str(data['score']),
+ stars=f"({data['star_1']}, {data['star_2']})"
+ )
+
+
def leaderboard_sorting_function(entry: tuple[str, dict]) -> tuple[int, int]:
"""
Provide a sorting value for our leaderboard.
@@ -105,6 +133,7 @@ def _parse_raw_leaderboard_data(raw_leaderboard_data: dict) -> dict:
# The data we get from the AoC website is structured by member, not by day/star,
# which means we need to iterate over the members to transpose the data to a per
# star view. We need that per star view to compute rank scores per star.
+ per_day_star_stats = collections.defaultdict(list)
for member in raw_leaderboard_data.values():
name = member["name"] if member["name"] else f"Anonymous #{member['id']}"
member_id = member["id"]
@@ -122,6 +151,11 @@ def _parse_raw_leaderboard_data(raw_leaderboard_data: dict) -> dict:
star_results[(day, star)].append(
StarResult(member_id=member_id, completion_time=completion_time)
)
+ per_day_star_stats[f"{day}-{star}"].append(
+ {'completion_time': int(data["get_star_ts"]), 'member_name': name}
+ )
+ for key in per_day_star_stats:
+ per_day_star_stats[key] = sorted(per_day_star_stats[key], key=operator.itemgetter('completion_time'))
# Now that we have a transposed dataset that holds the completion time of all
# participants per star, we can compute the rank-based scores each participant
@@ -151,13 +185,26 @@ def _parse_raw_leaderboard_data(raw_leaderboard_data: dict) -> dict:
# this data to JSON in order to cache it in Redis.
daily_stats[day] = {"star_one": star_one, "star_two": star_two}
- return {"daily_stats": daily_stats, "leaderboard": sorted_leaderboard}
+ return {"daily_stats": daily_stats, "leaderboard": sorted_leaderboard, 'per_day_and_star': per_day_star_stats}
-def _format_leaderboard(leaderboard: dict[str, dict]) -> str:
+def _format_leaderboard(leaderboard: dict[str, dict], self_placement_name: str = None) -> str:
"""Format the leaderboard using the AOC_TABLE_TEMPLATE."""
leaderboard_lines = [HEADER]
+ self_placement_exists = False
for rank, data in enumerate(leaderboard.values(), start=1):
+ if self_placement_name and data["name"].lower() == self_placement_name.lower():
+ leaderboard_lines.insert(
+ 1,
+ AOC_TABLE_TEMPLATE.format(
+ rank=rank,
+ name=f"(You) {data['name']}",
+ score=str(data["score"]),
+ stars=f"({data['star_1']}, {data['star_2']})"
+ )
+ )
+ self_placement_exists = True
+ continue
leaderboard_lines.append(
AOC_TABLE_TEMPLATE.format(
rank=rank,
@@ -166,7 +213,13 @@ def _format_leaderboard(leaderboard: dict[str, dict]) -> str:
stars=f"({data['star_1']}, {data['star_2']})"
)
)
-
+ if self_placement_name and not self_placement_exists:
+ raise commands.BadArgument(
+ "Sorry, your profile does not exist in this leaderboard."
+ "\n\n"
+ "To join our leaderboard, run the command `.aoc join`."
+ " If you've joined recently, please wait up to 30 minutes for our leaderboard to refresh."
+ )
return "\n".join(leaderboard_lines)
@@ -202,7 +255,7 @@ async def _fetch_leaderboard_data() -> dict[str, Any]:
# Two attempts, one with the original session cookie and one with the fallback session
for attempt in range(1, 3):
- log.info(f"Attempting to fetch leaderboard `{leaderboard.id}` ({attempt}/2)")
+ log.debug(f"Attempting to fetch leaderboard `{leaderboard.id}` ({attempt}/2)")
cookies = {"session": leaderboard.session}
try:
raw_data = await _leaderboard_request(leaderboard_url, leaderboard.id, cookies)
@@ -254,7 +307,7 @@ def _get_top_leaderboard(full_leaderboard: str) -> str:
@_caches.leaderboard_cache.atomic_transaction
-async def fetch_leaderboard(invalidate_cache: bool = False) -> dict:
+async def fetch_leaderboard(invalidate_cache: bool = False, self_placement_name: str = None) -> dict:
"""
Get the current Python Discord combined leaderboard.
@@ -264,7 +317,6 @@ async def fetch_leaderboard(invalidate_cache: bool = False) -> dict:
miss, this function is locked to one call at a time using a decorator.
"""
cached_leaderboard = await _caches.leaderboard_cache.to_dict()
-
# Check if the cached leaderboard contains everything we expect it to. If it
# does not, this probably means the cache has not been created yet or has
# expired in Redis. This check also accounts for a malformed cache.
@@ -280,15 +332,17 @@ async def fetch_leaderboard(invalidate_cache: bool = False) -> dict:
number_of_participants = len(leaderboard)
formatted_leaderboard = _format_leaderboard(leaderboard)
full_leaderboard_url = await _upload_leaderboard(formatted_leaderboard)
- leaderboard_fetched_at = datetime.datetime.utcnow().isoformat()
+ leaderboard_fetched_at = datetime.datetime.now(datetime.timezone.utc).isoformat()
cached_leaderboard = {
+ "placement_leaderboard": json.dumps(raw_leaderboard_data),
"full_leaderboard": formatted_leaderboard,
"top_leaderboard": _get_top_leaderboard(formatted_leaderboard),
"full_leaderboard_url": full_leaderboard_url,
"leaderboard_fetched_at": leaderboard_fetched_at,
"number_of_participants": number_of_participants,
"daily_stats": json.dumps(parsed_leaderboard_data["daily_stats"]),
+ "leaderboard_per_day_and_star": json.dumps(parsed_leaderboard_data["per_day_and_star"])
}
# Store the new values in Redis
@@ -300,7 +354,13 @@ async def fetch_leaderboard(invalidate_cache: bool = False) -> dict:
_caches.leaderboard_cache.namespace,
AdventOfCode.leaderboard_cache_expiry_seconds
)
-
+ if self_placement_name:
+ formatted_placement_leaderboard = _parse_raw_leaderboard_data(
+ json.loads(cached_leaderboard["placement_leaderboard"])
+ )["leaderboard"]
+ cached_leaderboard["placement_leaderboard"] = _get_top_leaderboard(
+ _format_leaderboard(formatted_placement_leaderboard, self_placement_name=self_placement_name)
+ )
return cached_leaderboard
@@ -308,11 +368,13 @@ def get_summary_embed(leaderboard: dict) -> discord.Embed:
"""Get an embed with the current summary stats of the leaderboard."""
leaderboard_url = leaderboard["full_leaderboard_url"]
refresh_minutes = AdventOfCode.leaderboard_cache_expiry_seconds // 60
+ refreshed_unix = int(datetime.datetime.fromisoformat(leaderboard["leaderboard_fetched_at"]).timestamp())
+
+ aoc_embed = discord.Embed(colour=Colours.soft_green)
- aoc_embed = discord.Embed(
- colour=Colours.soft_green,
- timestamp=datetime.datetime.fromisoformat(leaderboard["leaderboard_fetched_at"]),
- description=f"*The leaderboard is refreshed every {refresh_minutes} minutes.*"
+ aoc_embed.description = (
+ f"The leaderboard is refreshed every {refresh_minutes} minutes.\n"
+ f"Last Updated: <t:{refreshed_unix}:t>"
)
aoc_embed.add_field(
name="Number of Participants",
@@ -326,7 +388,6 @@ def get_summary_embed(leaderboard: dict) -> discord.Embed:
inline=True,
)
aoc_embed.set_author(name="Advent of Code", url=leaderboard_url)
- aoc_embed.set_footer(text="Last Updated")
aoc_embed.set_thumbnail(url=AOC_EMBED_THUMBNAIL)
return aoc_embed
diff --git a/bot/exts/events/advent_of_code/views/dayandstarview.py b/bot/exts/events/advent_of_code/views/dayandstarview.py
new file mode 100644
index 00000000..5529c12b
--- /dev/null
+++ b/bot/exts/events/advent_of_code/views/dayandstarview.py
@@ -0,0 +1,82 @@
+from datetime import datetime
+
+import discord
+
+AOC_DAY_AND_STAR_TEMPLATE = "{rank: >4} | {name:25.25} | {completion_time: >10}"
+
+
+class AoCDropdownView(discord.ui.View):
+ """Interactive view to filter AoC stats by Day and Star."""
+
+ def __init__(self, original_author: discord.Member, day_and_star_data: dict[str: dict], maximum_scorers: int):
+ super().__init__()
+ self.day = 0
+ self.star = 0
+ self.data = day_and_star_data
+ self.maximum_scorers = maximum_scorers
+ self.original_author = original_author
+
+ def generate_output(self) -> str:
+ """
+ Generates a formatted codeblock with AoC statistics based on the currently selected day and star.
+
+ Optionally, when the requested day and star data does not exist yet it returns an error message.
+ """
+ header = AOC_DAY_AND_STAR_TEMPLATE.format(
+ rank="Rank",
+ name="Name", completion_time="Completion time (UTC)"
+ )
+ lines = [f"{header}\n{'-' * (len(header) + 2)}"]
+ if not (day_and_star_data := self.data.get(f"{self.day}-{self.star}")):
+ return ":x: The requested data for the specified day and star does not exist yet."
+ for rank, scorer in enumerate(day_and_star_data[:self.maximum_scorers]):
+ time_data = datetime.fromtimestamp(scorer['completion_time']).strftime("%I:%M:%S %p")
+ lines.append(AOC_DAY_AND_STAR_TEMPLATE.format(
+ datastamp="",
+ rank=rank + 1,
+ name=scorer['member_name'],
+ completion_time=time_data)
+ )
+ joined_lines = "\n".join(lines)
+ return f"Statistics for Day: {self.day}, Star: {self.star}.\n ```\n{joined_lines}\n```"
+
+ async def interaction_check(self, interaction: discord.Interaction) -> bool:
+ """Global check to ensure that the interacting user is the user who invoked the command originally."""
+ if interaction.user != self.original_author:
+ await interaction.response.send_message(
+ ":x: You can't interact with someone else's response. Please run the command yourself!",
+ ephemeral=True
+ )
+ return False
+ return True
+
+ @discord.ui.select(
+ placeholder="Day",
+ options=[discord.SelectOption(label=str(i)) for i in range(1, 26)],
+ custom_id="day_select"
+ )
+ async def day_select(self, select: discord.ui.Select, interaction: discord.Interaction) -> None:
+ """Dropdown to choose a Day of the AoC."""
+ self.day = select.values[0]
+
+ @discord.ui.select(
+ placeholder="Star",
+ options=[discord.SelectOption(label=str(i)) for i in range(1, 3)],
+ custom_id="star_select"
+ )
+ async def star_select(self, select: discord.ui.Select, interaction: discord.Interaction) -> None:
+ """Dropdown to choose either the first or the second star."""
+ self.star = select.values[0]
+
+ @discord.ui.button(label="Fetch", style=discord.ButtonStyle.blurple)
+ async def fetch(self, button: discord.ui.Button, interaction: discord.Interaction) -> None:
+ """Button that fetches the statistics based on the dropdown values."""
+ if self.day == 0 or self.star == 0:
+ await interaction.response.send_message(
+ "You have to select a value from both of the dropdowns!",
+ ephemeral=True
+ )
+ else:
+ await interaction.response.edit_message(content=self.generate_output())
+ self.day = 0
+ self.star = 0
diff --git a/bot/exts/events/hacktoberfest/hacktober-issue-finder.py b/bot/exts/events/hacktoberfest/hacktober-issue-finder.py
index e3053851..1774564b 100644
--- a/bot/exts/events/hacktoberfest/hacktober-issue-finder.py
+++ b/bot/exts/events/hacktoberfest/hacktober-issue-finder.py
@@ -52,10 +52,10 @@ class HacktoberIssues(commands.Cog):
async def get_issues(self, ctx: commands.Context, option: str) -> Optional[dict]:
"""Get a list of the python issues with the label 'hacktoberfest' from the Github api."""
if option == "beginner":
- if (ctx.message.created_at - self.cache_timer_beginner).seconds <= 60:
+ if (ctx.message.created_at.replace(tzinfo=None) - self.cache_timer_beginner).seconds <= 60:
log.debug("using cache")
return self.cache_beginner
- elif (ctx.message.created_at - self.cache_timer_normal).seconds <= 60:
+ elif (ctx.message.created_at.replace(tzinfo=None) - self.cache_timer_normal).seconds <= 60:
log.debug("using cache")
return self.cache_normal
@@ -88,10 +88,10 @@ class HacktoberIssues(commands.Cog):
if option == "beginner":
self.cache_beginner = data
- self.cache_timer_beginner = ctx.message.created_at
+ self.cache_timer_beginner = ctx.message.created_at.replace(tzinfo=None)
else:
self.cache_normal = data
- self.cache_timer_normal = ctx.message.created_at
+ self.cache_timer_normal = ctx.message.created_at.replace(tzinfo=None)
return data
@@ -100,7 +100,8 @@ class HacktoberIssues(commands.Cog):
"""Format the issue data into a embed."""
title = issue["title"]
issue_url = issue["url"].replace("api.", "").replace("/repos/", "/")
- body = issue["body"]
+ # issues can have empty bodies, which in that case GitHub doesn't include the key in the API response
+ body = issue.get("body", "")
labels = [label["name"] for label in issue["labels"]]
embed = discord.Embed(title=title)
diff --git a/bot/exts/events/trivianight/__init__.py b/bot/exts/events/trivianight/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/bot/exts/events/trivianight/__init__.py
diff --git a/bot/exts/events/trivianight/_game.py b/bot/exts/events/trivianight/_game.py
new file mode 100644
index 00000000..8b012a17
--- /dev/null
+++ b/bot/exts/events/trivianight/_game.py
@@ -0,0 +1,192 @@
+import time
+from random import randrange
+from string import ascii_uppercase
+from typing import Iterable, NamedTuple, Optional, TypedDict
+
+DEFAULT_QUESTION_POINTS = 10
+DEFAULT_QUESTION_TIME = 20
+
+
+class QuestionData(TypedDict):
+ """Representing the different 'keys' of the question taken from the JSON."""
+
+ number: str
+ description: str
+ answers: list[str]
+ correct: str
+ points: Optional[int]
+ time: Optional[int]
+
+
+class UserGuess(NamedTuple):
+ """Represents the user's guess for a question."""
+
+ answer: str
+ editable: bool
+ elapsed: float
+
+
+class QuestionClosed(RuntimeError):
+ """Exception raised when the question is not open for guesses anymore."""
+
+
+class AlreadyUpdated(RuntimeError):
+ """Exception raised when the user has already updated their guess once."""
+
+
+class AllQuestionsVisited(RuntimeError):
+ """Exception raised when all of the questions have been visited."""
+
+
+class Question:
+ """Interface for one question in a trivia night game."""
+
+ def __init__(self, data: QuestionData):
+ self._data = data
+ self._guesses: dict[int, UserGuess] = {}
+ self._started = None
+
+ # These properties are mostly proxies to the underlying data:
+
+ @property
+ def number(self) -> str:
+ """The number of the question."""
+ return self._data["number"]
+
+ @property
+ def description(self) -> str:
+ """The description of the question."""
+ return self._data["description"]
+
+ @property
+ def answers(self) -> list[tuple[str, str]]:
+ """
+ The possible answers for this answer.
+
+ This is a property that returns a list of letter, answer pairs.
+ """
+ return [(ascii_uppercase[i], q) for (i, q) in enumerate(self._data["answers"])]
+
+ @property
+ def correct(self) -> str:
+ """The correct answer for this question."""
+ return self._data["correct"]
+
+ @property
+ def max_points(self) -> int:
+ """The maximum points that can be awarded for this question."""
+ return self._data.get("points") or DEFAULT_QUESTION_POINTS
+
+ @property
+ def time(self) -> float:
+ """The time allowed to answer the question."""
+ return self._data.get("time") or DEFAULT_QUESTION_TIME
+
+ def start(self) -> float:
+ """Start the question and return the time it started."""
+ self._started = time.perf_counter()
+ return self._started
+
+ def _update_guess(self, user: int, answer: str) -> UserGuess:
+ """Update an already existing guess."""
+ if self._started is None:
+ raise QuestionClosed("Question is not open for answers.")
+
+ if self._guesses[user][1] is False:
+ raise AlreadyUpdated(f"User({user}) has already updated their guess once.")
+
+ self._guesses[user] = (answer, False, time.perf_counter() - self._started)
+ return self._guesses[user]
+
+ def guess(self, user: int, answer: str) -> UserGuess:
+ """Add a guess made by a user to the current question."""
+ if user in self._guesses:
+ return self._update_guess(user, answer)
+
+ if self._started is None:
+ raise QuestionClosed("Question is not open for answers.")
+
+ self._guesses[user] = (answer, True, time.perf_counter() - self._started)
+ return self._guesses[user]
+
+ def stop(self) -> dict[int, UserGuess]:
+ """Stop the question and return the guesses that were made."""
+ guesses = self._guesses
+
+ self._started = None
+ self._guesses = {}
+
+ return guesses
+
+
+class TriviaNightGame:
+ """Interface for managing a game of trivia night."""
+
+ def __init__(self, data: list[QuestionData]) -> None:
+ self._questions = [Question(q) for q in data]
+ # A copy of the questions to keep for `.trivianight list`
+ self._all_questions = list(self._questions)
+ self.current_question: Optional[Question] = None
+ self._points = {}
+ self._speed = {}
+
+ def __iter__(self) -> Iterable[Question]:
+ return iter(self._questions)
+
+ def next_question(self, number: str = None) -> Question:
+ """
+ Consume one random question from the trivia night game.
+
+ One question is randomly picked from the list of questions which is then removed and returned.
+ """
+ if self.current_question is not None:
+ raise RuntimeError("Cannot call next_question() when there is a current question.")
+
+ if number is not None:
+ try:
+ question = [q for q in self._all_questions if q.number == int(number)][0]
+ except IndexError:
+ raise ValueError(f"Question number {number} does not exist.")
+ elif len(self._questions) == 0:
+ raise AllQuestionsVisited("All of the questions have been visited.")
+ else:
+ question = self._questions.pop(randrange(len(self._questions)))
+
+ self.current_question = question
+ return question
+
+ def end_question(self) -> None:
+ """
+ End the current question.
+
+ This method should be called when the question has been answered, it must be called before
+ attempting to call `next_question()` again.
+ """
+ if self.current_question is None:
+ raise RuntimeError("Cannot call end_question() when there is no current question.")
+
+ self.current_question.stop()
+ self.current_question = None
+
+ def list_questions(self) -> str:
+ """
+ List all the questions.
+
+ This method should be called when `.trivianight list` is called to display the following information:
+ - Question number
+ - Question description
+ - Visited/not visited
+ """
+ question_list = []
+
+ visited = ":white_check_mark:"
+ not_visited = ":x:"
+
+ for question in self._all_questions:
+ formatted_string = (
+ f"**Q{question.number}** {not_visited if question in self._questions else visited}"
+ f"\n{question.description}\n\n"
+ )
+ question_list.append(formatted_string.rstrip())
+
+ return question_list
diff --git a/bot/exts/events/trivianight/_questions.py b/bot/exts/events/trivianight/_questions.py
new file mode 100644
index 00000000..d6beced9
--- /dev/null
+++ b/bot/exts/events/trivianight/_questions.py
@@ -0,0 +1,179 @@
+from random import choice
+from string import ascii_uppercase
+
+import discord
+from discord import Embed, Interaction
+from discord.ui import Button, View
+
+from bot.constants import Colours, NEGATIVE_REPLIES
+
+from ._game import AlreadyUpdated, Question, QuestionClosed
+from ._scoreboard import Scoreboard
+
+
+class AnswerButton(Button):
+ """Button subclass that's used to guess on a particular answer."""
+
+ def __init__(self, label: str, question: Question):
+ super().__init__(label=label, style=discord.ButtonStyle.green)
+
+ self.question = question
+
+ async def callback(self, interaction: Interaction) -> None:
+ """
+ When a user interacts with the button, this will be called.
+
+ Parameters:
+ - interaction: an instance of discord.Interaction representing the interaction between the user and the
+ button.
+ """
+ try:
+ guess = self.question.guess(interaction.user.id, self.label)
+ except AlreadyUpdated:
+ await interaction.response.send_message(
+ embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="You've already changed your answer more than once!",
+ color=Colours.soft_red
+ ),
+ ephemeral=True
+ )
+ return
+ except QuestionClosed:
+ await interaction.response.send_message(
+ embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="The question is no longer accepting guesses!",
+ color=Colours.soft_red
+ ),
+ ephemeral=True
+ )
+ return
+
+ if guess[1]:
+ await interaction.response.send_message(
+ embed=Embed(
+ title="Confirming that...",
+ description=f"You chose answer {self.label}.",
+ color=Colours.soft_green
+ ),
+ ephemeral=True
+ )
+ else:
+ # guess[1] is False and they cannot change their answer again. Which
+ # indicates that they changed it this time around.
+ await interaction.response.send_message(
+ embed=Embed(
+ title="Confirming that...",
+ description=f"You changed your answer to answer {self.label}.",
+ color=Colours.soft_green
+ ),
+ ephemeral=True
+ )
+
+
+class QuestionView(View):
+ """View for one trivia night question."""
+
+ def __init__(self, question: Question) -> None:
+ super().__init__()
+ self.question = question
+
+ for letter, _ in self.question.answers:
+ self.add_item(AnswerButton(letter, self.question))
+
+ @staticmethod
+ def unicodeify(text: str) -> str:
+ """
+ Takes `text` and adds zero-width spaces to prevent copy and pasting the question.
+
+ Parameters:
+ - text: A string that represents the question description to 'unicodeify'
+ """
+ return "".join(
+ f"{letter}\u200b" if letter not in ('\n', '\t', '`', 'p', 'y') else letter
+ for idx, letter in enumerate(text)
+ )
+
+ def create_embed(self) -> Embed:
+ """Helper function to create the embed for the current question."""
+ question_embed = Embed(
+ title=f"Question {self.question.number}",
+ description=self.unicodeify(self.question.description),
+ color=Colours.python_yellow
+ )
+
+ for label, answer in self.question.answers:
+ question_embed.add_field(name=f"Answer {label}", value=answer, inline=False)
+
+ return question_embed
+
+ def end_question(self, scoreboard: Scoreboard) -> Embed:
+ """
+ Ends the question and displays the statistics on who got the question correct, awards points, etc.
+
+ Returns:
+ An embed displaying the correct answers and the % of people that chose each answer.
+ """
+ guesses = self.question.stop()
+
+ labels = ascii_uppercase[:len(self.question.answers)]
+
+ answer_embed = Embed(
+ title=f"The correct answer for Question {self.question.number} was...",
+ description=self.question.correct
+ )
+
+ if len(guesses) != 0:
+ answers_chosen = {
+ answer_choice: len(
+ tuple(filter(lambda x: x[0] == answer_choice, guesses.values()))
+ )
+ for answer_choice in labels
+ }
+
+ answers_chosen = dict(
+ sorted(list(answers_chosen.items()), key=lambda item: item[1], reverse=True)
+ )
+
+ for answer, people_answered in answers_chosen.items():
+ is_correct_answer = dict(self.question.answers)[answer[0]] == self.question.correct
+
+ # Setting the color of answer_embed to the % of people that got it correct via the mapping
+ if is_correct_answer:
+ # Maps the % of people who got it right to a color, from a range of red to green
+ percentage_to_color = [0xFC94A1, 0xFFCCCB, 0xCDFFCC, 0xB0F5AB, 0xB0F5AB]
+ answer_embed.color = percentage_to_color[round(people_answered / len(guesses) * 100) // 25]
+
+ field_title = (
+ (":white_check_mark: " if is_correct_answer else "")
+ + f"{people_answered} players ({people_answered / len(guesses) * 100:.1f}%) chose"
+ )
+
+ # The `ord` function is used here to change the letter to its corresponding position
+ answer_embed.add_field(
+ name=field_title,
+ value=self.question.answers[ord(answer) - 65][1],
+ inline=False
+ )
+
+ # Assign points to users
+ for user_id, answer in guesses.items():
+ if dict(self.question.answers)[answer[0]] == self.question.correct:
+ scoreboard.assign_points(
+ int(user_id),
+ points=(1 - (answer[-1] / self.question.time) / 2) * self.question.max_points,
+ speed=answer[-1]
+ )
+ elif answer[-1] <= 2:
+ scoreboard.assign_points(
+ int(user_id),
+ points=-(1 - (answer[-1] / self.question.time) / 2) * self.question.max_points
+ )
+ else:
+ scoreboard.assign_points(
+ int(user_id),
+ points=0
+ )
+
+ return answer_embed
diff --git a/bot/exts/events/trivianight/_scoreboard.py b/bot/exts/events/trivianight/_scoreboard.py
new file mode 100644
index 00000000..a5a5fcac
--- /dev/null
+++ b/bot/exts/events/trivianight/_scoreboard.py
@@ -0,0 +1,186 @@
+from random import choice
+
+import discord.ui
+from discord import ButtonStyle, Embed, Interaction, Member
+from discord.ui import Button, View
+
+from bot.bot import Bot
+from bot.constants import Colours, NEGATIVE_REPLIES
+
+
+class ScoreboardView(View):
+ """View for the scoreboard."""
+
+ def __init__(self, bot: Bot):
+ super().__init__()
+ self.bot = bot
+
+ @staticmethod
+ def _int_to_ordinal(number: int) -> str:
+ """
+ Converts an integer into an ordinal number, i.e. 1 to 1st.
+
+ Parameters:
+ - number: an integer representing the number to convert to an ordinal number.
+ """
+ suffix = ["th", "st", "nd", "rd", "th"][min(number % 10, 4)]
+ if (number % 100) in {11, 12, 13}:
+ suffix = "th"
+
+ return str(number) + suffix
+
+ async def create_main_leaderboard(self) -> Embed:
+ """
+ Helper function that iterates through `self.points` to generate the main leaderboard embed.
+
+ The main leaderboard would be formatted like the following:
+ **1**. @mention of the user (# of points)
+ along with the 29 other users who made it onto the leaderboard.
+ """
+ formatted_string = ""
+
+ for current_placement, (user, points) in enumerate(self.points.items()):
+ if current_placement + 1 > 30:
+ break
+
+ user = await self.bot.fetch_user(int(user))
+ formatted_string += f"**{current_placement + 1}.** {user.mention} "
+ formatted_string += f"({points:.1f} pts)\n"
+ if (current_placement + 1) % 10 == 0:
+ formatted_string += "⎯⎯⎯⎯⎯⎯⎯⎯\n"
+
+ main_embed = Embed(
+ title="Winners of the Trivia Night",
+ description=formatted_string,
+ color=Colours.python_blue,
+ )
+
+ return main_embed
+
+ async def _create_speed_embed(self) -> Embed:
+ """
+ Helper function that iterates through `self.speed` to generate a leaderboard embed.
+
+ The speed leaderboard would be formatted like the following:
+ **1**. @mention of the user ([average speed as a float with the precision of one decimal point]s)
+ along with the 29 other users who made it onto the leaderboard.
+ """
+ formatted_string = ""
+
+ for current_placement, (user, time_taken) in enumerate(self.speed.items()):
+ if current_placement + 1 > 30:
+ break
+
+ user = await self.bot.fetch_user(int(user))
+ formatted_string += f"**{current_placement + 1}.** {user.mention} "
+ formatted_string += f"({(time_taken[-1] / time_taken[0]):.1f}s)\n"
+ if (current_placement + 1) % 10 == 0:
+ formatted_string += "⎯⎯⎯⎯⎯⎯⎯⎯\n"
+
+ speed_embed = Embed(
+ title="Average time taken to answer a question",
+ description=formatted_string,
+ color=Colours.python_blue
+ )
+ return speed_embed
+
+ def _get_rank(self, member: Member) -> Embed:
+ """
+ Gets the member's rank for the points leaderboard and speed leaderboard.
+
+ Parameters:
+ - member: An instance of discord.Member representing the person who is trying to get their rank.
+ """
+ rank_embed = Embed(title=f"Ranks for {member.display_name}", color=Colours.python_blue)
+ # These are stored as strings so that the last digit can be determined to choose the suffix
+ try:
+ points_rank = str(list(self.points).index(member.id) + 1)
+ speed_rank = str(list(self.speed).index(member.id) + 1)
+ except ValueError:
+ return Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="It looks like you didn't participate in the Trivia Night event!",
+ color=Colours.soft_red
+ )
+
+ rank_embed.add_field(
+ name="Total Points",
+ value=(
+ f"You got {self._int_to_ordinal(int(points_rank))} place"
+ f" with {self.points[member.id]:.1f} points."
+ ),
+ inline=False
+ )
+
+ rank_embed.add_field(
+ name="Average Speed",
+ value=(
+ f"You got {self._int_to_ordinal(int(speed_rank))} place"
+ f" with a time of {(self.speed[member.id][1] / self.speed[member.id][0]):.1f} seconds."
+ ),
+ inline=False
+ )
+ return rank_embed
+
+ @discord.ui.button(label="Scoreboard for Speed", style=ButtonStyle.green)
+ async def speed_leaderboard(self, button: Button, interaction: Interaction) -> None:
+ """
+ Send an ephemeral message with the speed leaderboard embed.
+
+ Parameters:
+ - button: The discord.ui.Button instance representing the `Speed Leaderboard` button.
+ - interaction: The discord.Interaction instance containing information on the interaction between the user
+ and the button.
+ """
+ await interaction.response.send_message(embed=await self._create_speed_embed(), ephemeral=True)
+
+ @discord.ui.button(label="What's my rank?", style=ButtonStyle.blurple)
+ async def rank_button(self, button: Button, interaction: Interaction) -> None:
+ """
+ Send an ephemeral message with the user's rank for the overall points/average speed.
+
+ Parameters:
+ - button: The discord.ui.Button instance representing the `What's my rank?` button.
+ - interaction: The discord.Interaction instance containing information on the interaction between the user
+ and the button.
+ """
+ await interaction.response.send_message(embed=self._get_rank(interaction.user), ephemeral=True)
+
+
+class Scoreboard:
+ """Class for the scoreboard for the Trivia Night event."""
+
+ def __init__(self, bot: Bot):
+ self._bot = bot
+ self._points = {}
+ self._speed = {}
+
+ def assign_points(self, user_id: int, *, points: int = None, speed: float = None) -> None:
+ """
+ Assign points or deduct points to/from a certain user.
+
+ This method should be called once the question has finished and all answers have been registered.
+ """
+ if points is not None and user_id not in self._points.keys():
+ self._points[user_id] = points
+ elif points is not None:
+ self._points[user_id] += points
+
+ if speed is not None and user_id not in self._speed.keys():
+ self._speed[user_id] = [1, speed]
+ elif speed is not None:
+ self._speed[user_id] = [
+ self._speed[user_id][0] + 1, self._speed[user_id][1] + speed
+ ]
+
+ async def display(self, speed_leaderboard: bool = False) -> tuple[Embed, View]:
+ """Returns the embed of the main leaderboard along with the ScoreboardView."""
+ view = ScoreboardView(self._bot)
+
+ view.points = dict(sorted(self._points.items(), key=lambda item: item[-1], reverse=True))
+ view.speed = dict(sorted(self._speed.items(), key=lambda item: item[-1][1] / item[-1][0]))
+
+ return (
+ await view.create_main_leaderboard(),
+ view if not speed_leaderboard else await view._create_speed_embed()
+ )
diff --git a/bot/exts/events/trivianight/trivianight.py b/bot/exts/events/trivianight/trivianight.py
new file mode 100644
index 00000000..18d8327a
--- /dev/null
+++ b/bot/exts/events/trivianight/trivianight.py
@@ -0,0 +1,328 @@
+import asyncio
+from json import JSONDecodeError, loads
+from random import choice
+from typing import Optional
+
+from discord import Embed
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.constants import Colours, NEGATIVE_REPLIES, POSITIVE_REPLIES, Roles
+from bot.utils.pagination import LinePaginator
+
+from ._game import AllQuestionsVisited, TriviaNightGame
+from ._questions import QuestionView
+from ._scoreboard import Scoreboard
+
+# The ID you see below are the Events Lead role ID and the Event Runner Role ID
+TRIVIA_NIGHT_ROLES = (Roles.admins, 778361735739998228, 940911658799333408)
+
+
+class TriviaNightCog(commands.Cog):
+ """Cog for the Python Trivia Night event."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+ self.game: Optional[TriviaNightGame] = None
+ self.scoreboard: Optional[Scoreboard] = None
+ self.question_closed: asyncio.Event = None
+
+ @commands.group(aliases=["tn"], invoke_without_command=True)
+ async def trivianight(self, ctx: commands.Context) -> None:
+ """
+ The command group for the Python Discord Trivia Night.
+
+ If invoked without a subcommand (i.e. simply .trivianight), it will explain what the Trivia Night event is.
+ """
+ cog_description = Embed(
+ title="What is .trivianight?",
+ description=(
+ "This 'cog' is for the Python Discord's TriviaNight (date tentative)! Compete against other"
+ " players in a trivia about Python!"
+ ),
+ color=Colours.soft_green
+ )
+ await ctx.send(embed=cog_description)
+
+ @trivianight.command()
+ @commands.has_any_role(*TRIVIA_NIGHT_ROLES)
+ async def load(self, ctx: commands.Context, *, to_load: Optional[str]) -> None:
+ """
+ Loads a JSON file from the provided attachment or argument.
+
+ The JSON provided is formatted where it is a list of dictionaries, each dictionary containing the keys below:
+ - number: int (represents the current question #)
+ - description: str (represents the question itself)
+ - answers: list[str] (represents the different answers possible, must be a length of 4)
+ - correct: str (represents the correct answer in terms of what the correct answer is in `answers`
+ - time: Optional[int] (represents the timer for the question and how long it should run, default is 10)
+ - points: Optional[int] (represents how many points are awarded for each question, default is 10)
+
+ The load command accepts three different ways of loading in a JSON:
+ - an attachment of the JSON file
+ - a message link to the attachment/JSON
+ - reading the JSON itself via a codeblock or plain text
+ """
+ if self.game is not None:
+ await ctx.send(embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is already a trivia night running!",
+ color=Colours.soft_red
+ ))
+ return
+
+ if ctx.message.attachments:
+ json_text = (await ctx.message.attachments[0].read()).decode("utf8")
+ elif not to_load:
+ raise commands.BadArgument("You didn't attach an attachment nor link a message!")
+ elif (
+ to_load.startswith("https://discord.com/channels")
+ or to_load.startswith("https://discordapp.com/channels")
+ ):
+ channel_id, message_id = to_load.split("/")[-2:]
+ channel = await ctx.guild.fetch_channel(int(channel_id))
+ message = await channel.fetch_message(int(message_id))
+ if message.attachments:
+ json_text = (await message.attachments[0].read()).decode("utf8")
+ else:
+ json_text = message.content.replace("```", "").replace("json", "").replace("\n", "")
+ else:
+ json_text = to_load.replace("```", "").replace("json", "").replace("\n", "")
+
+ try:
+ serialized_json = loads(json_text)
+ except JSONDecodeError as error:
+ raise commands.BadArgument(f"Looks like something went wrong:\n{str(error)}")
+
+ self.game = TriviaNightGame(serialized_json)
+ self.question_closed = asyncio.Event()
+
+ success_embed = Embed(
+ title=choice(POSITIVE_REPLIES),
+ description="The JSON was loaded successfully!",
+ color=Colours.soft_green
+ )
+
+ self.scoreboard = Scoreboard(self.bot)
+
+ await ctx.send(embed=success_embed)
+
+ @trivianight.command(aliases=('next',))
+ @commands.has_any_role(*TRIVIA_NIGHT_ROLES)
+ async def question(self, ctx: commands.Context, question_number: str = None) -> None:
+ """
+ Gets a random question from the unanswered question list and lets the user(s) choose the answer.
+
+ This command will continuously count down until the time limit of the question is exhausted.
+ However, if `.trivianight stop` is invoked, the counting down is interrupted to show the final results.
+ """
+ if self.game is None:
+ await ctx.send(embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is no trivia night running!",
+ color=Colours.soft_red
+ ))
+ return
+
+ if self.game.current_question is not None:
+ error_embed = Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is already an ongoing question!",
+ color=Colours.soft_red
+ )
+ await ctx.send(embed=error_embed)
+ return
+
+ try:
+ next_question = self.game.next_question(question_number)
+ except AllQuestionsVisited:
+ error_embed = Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="All of the questions have been used.",
+ color=Colours.soft_red
+ )
+ await ctx.send(embed=error_embed)
+ return
+
+ await ctx.send("Next question in 3 seconds! Get ready...")
+ await asyncio.sleep(3)
+
+ question_view = QuestionView(next_question)
+ question_embed = question_view.create_embed()
+
+ next_question.start()
+ message = await ctx.send(embed=question_embed, view=question_view)
+
+ # Exponentially sleep less and less until the time limit is reached
+ percentage = 1
+ while True:
+ percentage *= 0.5
+ duration = next_question.time * percentage
+
+ await asyncio.wait([self.question_closed.wait()], timeout=duration)
+
+ if self.question_closed.is_set():
+ await ctx.send(embed=question_view.end_question(self.scoreboard))
+ await message.edit(embed=question_embed, view=None)
+
+ self.game.end_question()
+ self.question_closed.clear()
+ return
+
+ if int(duration) > 1:
+ # It is quite ugly to display decimals, the delay for requests to reach Discord
+ # cause sub-second accuracy to be quite pointless.
+ await ctx.send(f"{int(duration)}s remaining...")
+ else:
+ # Since each time we divide the percentage by 2 and sleep one half of the halves (then sleep a
+ # half, of that half) we must sleep both halves at the end.
+ await asyncio.wait([self.question_closed.wait()], timeout=duration)
+ if self.question_closed.is_set():
+ await ctx.send(embed=question_view.end_question(self.scoreboard))
+ await message.edit(embed=question_embed, view=None)
+
+ self.game.end_question()
+ self.question_closed.clear()
+ return
+ break
+
+ await ctx.send(embed=question_view.end_question(self.scoreboard))
+ await message.edit(embed=question_embed, view=None)
+
+ self.game.end_question()
+
+ @trivianight.command()
+ @commands.has_any_role(*TRIVIA_NIGHT_ROLES)
+ async def list(self, ctx: commands.Context) -> None:
+ """
+ Display all the questions left in the question bank.
+
+ Questions are displayed in the following format:
+ Q(number): Question description | :white_check_mark: if the question was used otherwise :x:.
+ """
+ if self.game is None:
+ await ctx.send(embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is no trivia night running!",
+ color=Colours.soft_red
+ ))
+ return
+
+ question_list = self.game.list_questions()
+
+ list_embed = Embed(title="All Trivia Night Questions")
+
+ if len(question_list) == 1:
+ list_embed.description = question_list[0]
+ await ctx.send(embed=list_embed)
+ else:
+ await LinePaginator.paginate(
+ question_list,
+ ctx,
+ list_embed
+ )
+
+ @trivianight.command()
+ @commands.has_any_role(*TRIVIA_NIGHT_ROLES)
+ async def stop(self, ctx: commands.Context) -> None:
+ """
+ End the ongoing question to show the correct question.
+
+ This command should be used if the question should be ended early or if the time limit fails
+ """
+ if self.game is None:
+ await ctx.send(embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is no trivia night running!",
+ color=Colours.soft_red
+ ))
+ return
+
+ if self.game.current_question is None:
+ error_embed = Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is no ongoing question!",
+ color=Colours.soft_red
+ )
+ await ctx.send(embed=error_embed)
+ return
+
+ self.question_closed.set()
+
+ @trivianight.command()
+ @commands.has_any_role(*TRIVIA_NIGHT_ROLES)
+ async def end(self, ctx: commands.Context) -> None:
+ """
+ Displays the scoreboard view.
+
+ The scoreboard view consists of the two scoreboards with the 30 players who got the highest points and the
+ 30 players who had the fastest average response time to a question where they got the question right.
+
+ The scoreboard view also has a button where the user can see their own rank, points and average speed if they
+ didn't make it onto the leaderboard.
+ """
+ if self.game is None:
+ await ctx.send(embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is no trivia night running!",
+ color=Colours.soft_red
+ ))
+ return
+
+ if self.game.current_question is not None:
+ error_embed = Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="You can't end the event while a question is ongoing!",
+ color=Colours.soft_red
+ )
+ await ctx.send(embed=error_embed)
+ return
+
+ scoreboard_embed, scoreboard_view = await self.scoreboard.display()
+ await ctx.send(embed=scoreboard_embed, view=scoreboard_view)
+
+ @trivianight.command()
+ @commands.has_any_role(*TRIVIA_NIGHT_ROLES)
+ async def scoreboard(self, ctx: commands.Context) -> None:
+ """
+ Displays the scoreboard.
+
+ The scoreboard consists of the two scoreboards with the 30 players who got the highest points and the
+ 30 players who had the fastest average response time to a question where they got the question right.
+ """
+ if self.game is None:
+ await ctx.send(embed=Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="There is no trivia night running!",
+ color=Colours.soft_red
+ ))
+ return
+
+ if self.game.current_question is not None:
+ error_embed = Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="You can't end the event while a question is ongoing!",
+ color=Colours.soft_red
+ )
+ await ctx.send(embed=error_embed)
+ return
+
+ scoreboard_embed, speed_scoreboard = await self.scoreboard.display(speed_leaderboard=True)
+ await ctx.send(embeds=(scoreboard_embed, speed_scoreboard))
+
+ @trivianight.command()
+ @commands.has_any_role(*TRIVIA_NIGHT_ROLES)
+ async def end_game(self, ctx: commands.Context) -> None:
+ """Ends the ongoing game."""
+ self.game = None
+
+ await ctx.send(embed=Embed(
+ title=choice(POSITIVE_REPLIES),
+ description="The game has been stopped.",
+ color=Colours.soft_green
+ ))
+
+
+def setup(bot: Bot) -> None:
+ """Load the TriviaNight cog."""
+ bot.add_cog(TriviaNightCog(bot))
diff --git a/bot/exts/fun/anagram.py b/bot/exts/fun/anagram.py
new file mode 100644
index 00000000..79280fa9
--- /dev/null
+++ b/bot/exts/fun/anagram.py
@@ -0,0 +1,109 @@
+import asyncio
+import json
+import logging
+import random
+from pathlib import Path
+
+import discord
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.constants import Colours
+
+log = logging.getLogger(__name__)
+
+TIME_LIMIT = 60
+
+# anagram.json file contains all the anagrams
+with open(Path("bot/resources/fun/anagram.json"), "r") as f:
+ ANAGRAMS_ALL = json.load(f)
+
+
+class AnagramGame:
+ """
+ Used for creating instances of anagram games.
+
+ Once multiple games can be run at the same time, this class' instances
+ can be used for keeping track of each anagram game.
+ """
+
+ def __init__(self, scrambled: str, correct: list[str]) -> None:
+ self.scrambled = scrambled
+ self.correct = set(correct)
+
+ self.winners = set()
+
+ async def message_creation(self, message: discord.Message) -> None:
+ """Check if the message is a correct answer and remove it from the list of answers."""
+ if message.content.lower() in self.correct:
+ self.winners.add(message.author.mention)
+ self.correct.remove(message.content.lower())
+
+
+class Anagram(commands.Cog):
+ """Cog for the Anagram game command."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ self.games: dict[int, AnagramGame] = {}
+
+ @commands.command(name="anagram", aliases=("anag", "gram", "ag"))
+ async def anagram_command(self, ctx: commands.Context) -> None:
+ """
+ Given shuffled letters, rearrange them into anagrams.
+
+ Show an embed with scrambled letters which if rearranged can form words.
+ After a specific amount of time, list the correct answers and whether someone provided a
+ correct answer.
+ """
+ if self.games.get(ctx.channel.id):
+ await ctx.send("An anagram is already being solved in this channel!")
+ return
+
+ scrambled_letters, correct = random.choice(list(ANAGRAMS_ALL.items()))
+
+ game = AnagramGame(scrambled_letters, correct)
+ self.games[ctx.channel.id] = game
+
+ anagram_embed = discord.Embed(
+ title=f"Find anagrams from these letters: '{scrambled_letters.upper()}'",
+ description=f"You have {TIME_LIMIT} seconds to find correct words.",
+ colour=Colours.purple,
+ )
+
+ await ctx.send(embed=anagram_embed)
+ await asyncio.sleep(TIME_LIMIT)
+
+ if game.winners:
+ win_list = ", ".join(game.winners)
+ content = f"Well done {win_list} for getting it right!"
+ else:
+ content = "Nobody got it right."
+
+ answer_embed = discord.Embed(
+ title=f"The words were: `{'`, `'.join(ANAGRAMS_ALL[game.scrambled])}`!",
+ colour=Colours.pink,
+ )
+
+ await ctx.send(content, embed=answer_embed)
+
+ # Game is finished, let's remove it from the dict
+ self.games.pop(ctx.channel.id)
+
+ @commands.Cog.listener()
+ async def on_message(self, message: discord.Message) -> None:
+ """Check a message for an anagram attempt and pass to an ongoing game."""
+ if message.author.bot or not message.guild:
+ return
+
+ game = self.games.get(message.channel.id)
+ if not game:
+ return
+
+ await game.message_creation(message)
+
+
+def setup(bot: Bot) -> None:
+ """Load the Anagram cog."""
+ bot.add_cog(Anagram(bot))
diff --git a/bot/exts/fun/battleship.py b/bot/exts/fun/battleship.py
index f4351954..beff196f 100644
--- a/bot/exts/fun/battleship.py
+++ b/bot/exts/fun/battleship.py
@@ -369,7 +369,6 @@ class Battleship(commands.Cog):
return any(player in (game.p1.user, game.p2.user) for game in self.games)
@commands.group(invoke_without_command=True)
- @commands.guild_only()
async def battleship(self, ctx: commands.Context) -> None:
"""
Play a game of Battleship with someone else!
diff --git a/bot/exts/fun/connect_four.py b/bot/exts/fun/connect_four.py
index 647bb2b7..f53695d5 100644
--- a/bot/exts/fun/connect_four.py
+++ b/bot/exts/fun/connect_four.py
@@ -6,7 +6,6 @@ from typing import Optional, Union
import discord
import emojis
from discord.ext import commands
-from discord.ext.commands import guild_only
from bot.bot import Bot
from bot.constants import Emojis
@@ -361,7 +360,6 @@ class ConnectFour(commands.Cog):
self.games.remove(game)
raise
- @guild_only()
@commands.group(
invoke_without_command=True,
aliases=("4inarow", "connect4", "connectfour", "c4"),
@@ -426,7 +424,6 @@ class ConnectFour(commands.Cog):
await self._play_game(ctx, user, board_size, str(emoji1), str(emoji2))
- @guild_only()
@connect_four.command(aliases=("bot", "computer", "cpu"))
async def ai(
self,
diff --git a/bot/exts/fun/duck_game.py b/bot/exts/fun/duck_game.py
index 1ef7513f..10b03a49 100644
--- a/bot/exts/fun/duck_game.py
+++ b/bot/exts/fun/duck_game.py
@@ -11,7 +11,7 @@ from PIL import Image, ImageDraw, ImageFont
from discord.ext import commands
from bot.bot import Bot
-from bot.constants import Colours, MODERATION_ROLES
+from bot.constants import MODERATION_ROLES
from bot.utils.decorators import with_role
DECK = list(product(*[(0, 1, 2)]*4))
@@ -130,6 +130,9 @@ class DuckGame:
while len(self.solutions) < minimum_solutions:
self.board = random.sample(DECK, size)
+ self.board_msg = None
+ self.found_msg = None
+
@property
def board(self) -> list[tuple[int]]:
"""Accesses board property."""
@@ -181,7 +184,7 @@ class DuckGamesDirector(commands.Cog):
)
@commands.cooldown(rate=1, per=2, type=commands.BucketType.channel)
async def start_game(self, ctx: commands.Context) -> None:
- """Generate a board, send the game embed, and end the game after a time limit."""
+ """Start a new Duck Duck Duck Goose game."""
if ctx.channel.id in self.current_games:
await ctx.send("There's already a game running!")
return
@@ -191,8 +194,8 @@ class DuckGamesDirector(commands.Cog):
game.running = True
self.current_games[ctx.channel.id] = game
- game.msg_content = ""
- game.embed_msg = await self.send_board_embed(ctx, game)
+ game.board_msg = await self.send_board_embed(ctx, game)
+ game.found_msg = await self.send_found_embed(ctx)
await asyncio.sleep(GAME_DURATION)
# Checking for the channel ID in the currently running games is not sufficient.
@@ -245,13 +248,13 @@ class DuckGamesDirector(commands.Cog):
if answer in game.solutions:
game.claimed_answers[answer] = msg.author
game.scores[msg.author] += CORRECT_SOLN
- await self.display_claimed_answer(game, msg.author, answer)
+ await self.append_to_found_embed(game, f"{str(answer):12s} - {msg.author.display_name}")
else:
await msg.add_reaction(EMOJI_WRONG)
game.scores[msg.author] += INCORRECT_SOLN
async def send_board_embed(self, ctx: commands.Context, game: DuckGame) -> discord.Message:
- """Create and send the initial game embed. This will be edited as the game goes on."""
+ """Create and send an embed to display the board."""
image = assemble_board_image(game.board, game.rows, game.columns)
with BytesIO() as image_stream:
image.save(image_stream, format="png")
@@ -259,19 +262,27 @@ class DuckGamesDirector(commands.Cog):
file = discord.File(fp=image_stream, filename="board.png")
embed = discord.Embed(
title="Duck Duck Duck Goose!",
- color=Colours.bright_green,
+ color=discord.Color.dark_purple(),
)
embed.set_image(url="attachment://board.png")
return await ctx.send(embed=embed, file=file)
- async def display_claimed_answer(self, game: DuckGame, author: discord.Member, answer: tuple[int]) -> None:
- """Add a claimed answer to the game embed."""
+ async def send_found_embed(self, ctx: commands.Context) -> discord.Message:
+ """Create and send an embed to display claimed answers. This will be edited as the game goes on."""
+ # Can't be part of the board embed because of discord.py limitations with editing an embed with an image.
+ embed = discord.Embed(
+ title="Flights Found",
+ color=discord.Color.dark_purple(),
+ )
+ return await ctx.send(embed=embed)
+
+ async def append_to_found_embed(self, game: DuckGame, text: str) -> None:
+ """Append text to the claimed answers embed."""
async with game.editing_embed:
- # We specifically edit the message contents instead of the embed
- # Because we load in the image from the file, editing any portion of the embed
- # Does weird things to the image and this works around that weirdness
- game.msg_content = f"{game.msg_content}\n{str(answer):12s} - {author.display_name}"
- await game.embed_msg.edit(content=game.msg_content)
+ found_embed, = game.found_msg.embeds
+ old_desc = found_embed.description or ""
+ found_embed.description = f"{old_desc.rstrip()}\n{text}"
+ await game.found_msg.edit(embed=found_embed)
async def end_game(self, channel: discord.TextChannel, game: DuckGame, end_message: str) -> None:
"""Edit the game embed to reflect the end of the game and mark the game as not running."""
@@ -296,8 +307,7 @@ class DuckGamesDirector(commands.Cog):
missed_text = "Flights everyone missed:\n" + "\n".join(f"{ans}" for ans in missed)
else:
missed_text = "All the flights were found!"
-
- await game.embed_msg.edit(content=f"{missed_text}")
+ await self.append_to_found_embed(game, f"\n{missed_text}")
@start_game.command(name="help")
async def show_rules(self, ctx: commands.Context) -> None:
diff --git a/bot/exts/fun/game.py b/bot/exts/fun/game.py
index f9c150e6..5f56bef7 100644
--- a/bot/exts/fun/game.py
+++ b/bot/exts/fun/game.py
@@ -118,6 +118,7 @@ class GameStatus(IntEnum):
Offline = 5
Cancelled = 6
Rumored = 7
+ Delisted = 8
class AgeRatingCategories(IntEnum):
@@ -125,6 +126,11 @@ class AgeRatingCategories(IntEnum):
ESRB = 1
PEGI = 2
+ CERO = 3
+ USK = 4
+ GRAC = 5
+ CLASS_IND = 6
+ ACB = 7
class AgeRatings(IntEnum):
@@ -142,6 +148,32 @@ class AgeRatings(IntEnum):
T = 10
M = 11
AO = 12
+ CERO_A = 13
+ CERO_B = 14
+ CERO_C = 15
+ CERO_D = 16
+ CERO_Z = 17
+ USK_0 = 18
+ USK_6 = 19
+ USK_12 = 20
+ USK_18 = 21
+ GRAC_ALL = 22
+ GRAC_Twelve = 23
+ GRAC_Fifteen = 24
+ GRAC_Eighteen = 25
+ GRAC_TESTING = 26
+ CLASS_IND_L = 27
+ CLASS_IND_Ten = 28
+ CLASS_IND_Twelve = 29
+ CLASS_IND_Fourteen = 30
+ CLASS_IND_Sixteen = 31
+ CLASS_IND_Eighteen = 32
+ ACB_G = 33
+ ACB_PG = 34
+ ACB_M = 35
+ ACB_MA15 = 36
+ ACB_R18 = 37
+ ACB_RC = 38
class Games(Cog):
diff --git a/bot/exts/fun/latex.py b/bot/exts/fun/latex.py
new file mode 100644
index 00000000..d43ec8c4
--- /dev/null
+++ b/bot/exts/fun/latex.py
@@ -0,0 +1,130 @@
+import hashlib
+import re
+import string
+from io import BytesIO
+from pathlib import Path
+from typing import BinaryIO, Optional
+
+import discord
+from PIL import Image
+from discord.ext import commands
+
+from bot.bot import Bot
+
+FORMATTED_CODE_REGEX = re.compile(
+ r"(?P<delim>(?P<block>```)|``?)" # code delimiter: 1-3 backticks; (?P=block) only matches if it's a block
+ r"(?(block)(?:(?P<lang>[a-z]+)\n)?)" # if we're in a block, match optional language (only letters plus newline)
+ r"(?:[ \t]*\n)*" # any blank (empty or tabs/spaces only) lines before the code
+ r"(?P<code>.*?)" # extract all code inside the markup
+ r"\s*" # any more whitespace before the end of the code markup
+ r"(?P=delim)", # match the exact same delimiter from the start again
+ re.DOTALL | re.IGNORECASE, # "." also matches newlines, case insensitive
+)
+
+LATEX_API_URL = "https://rtex.probablyaweb.site/api/v2"
+PASTEBIN_URL = "https://paste.pythondiscord.com"
+
+THIS_DIR = Path(__file__).parent
+CACHE_DIRECTORY = THIS_DIR / "_latex_cache"
+CACHE_DIRECTORY.mkdir(exist_ok=True)
+TEMPLATE = string.Template(Path("bot/resources/fun/latex_template.txt").read_text())
+
+PAD = 10
+
+
+def _prepare_input(text: str) -> str:
+ """Extract latex from a codeblock, if it is in one."""
+ if match := FORMATTED_CODE_REGEX.match(text):
+ return match.group("code")
+ else:
+ return text
+
+
+def _process_image(data: bytes, out_file: BinaryIO) -> None:
+ """Read `data` as an image file, and paste it on a white background."""
+ image = Image.open(BytesIO(data)).convert("RGBA")
+ width, height = image.size
+ background = Image.new("RGBA", (width + 2 * PAD, height + 2 * PAD), "WHITE")
+
+ # paste the image on the background, using the same image as the mask
+ # when an RGBA image is passed as the mask, its alpha band is used.
+ # this has the effect of skipping pasting the pixels where the image is transparent.
+ background.paste(image, (PAD, PAD), image)
+ background.save(out_file)
+
+
+class InvalidLatexError(Exception):
+ """Represents an error caused by invalid latex."""
+
+ def __init__(self, logs: Optional[str]):
+ super().__init__(logs)
+ self.logs = logs
+
+
+class Latex(commands.Cog):
+ """Renders latex."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ async def _generate_image(self, query: str, out_file: BinaryIO) -> None:
+ """Make an API request and save the generated image to cache."""
+ payload = {"code": query, "format": "png"}
+ async with self.bot.http_session.post(LATEX_API_URL, data=payload, raise_for_status=True) as response:
+ response_json = await response.json()
+ if response_json["status"] != "success":
+ raise InvalidLatexError(logs=response_json.get("log"))
+ async with self.bot.http_session.get(
+ f"{LATEX_API_URL}/{response_json['filename']}",
+ raise_for_status=True
+ ) as response:
+ _process_image(await response.read(), out_file)
+
+ async def _upload_to_pastebin(self, text: str) -> Optional[str]:
+ """Uploads `text` to the paste service, returning the url if successful."""
+ try:
+ async with self.bot.http_session.post(
+ PASTEBIN_URL + "/documents",
+ data=text,
+ raise_for_status=True
+ ) as response:
+ response_json = await response.json()
+ if "key" in response_json:
+ return f"{PASTEBIN_URL}/{response_json['key']}.txt?noredirect"
+ except Exception:
+ # 400 (Bad Request) means there are too many characters
+ pass
+
+ @commands.command()
+ @commands.max_concurrency(1, commands.BucketType.guild, wait=True)
+ async def latex(self, ctx: commands.Context, *, query: str) -> None:
+ """Renders the text in latex and sends the image."""
+ query = _prepare_input(query)
+
+ # the hash of the query is used as the filename in the cache.
+ query_hash = hashlib.md5(query.encode()).hexdigest()
+ image_path = CACHE_DIRECTORY / f"{query_hash}.png"
+ async with ctx.typing():
+ if not image_path.exists():
+ try:
+ with open(image_path, "wb") as out_file:
+ await self._generate_image(TEMPLATE.substitute(text=query), out_file)
+ except InvalidLatexError as err:
+ embed = discord.Embed(title="Failed to render input.")
+ if err.logs is None:
+ embed.description = "No logs available."
+ else:
+ logs_paste_url = await self._upload_to_pastebin(err.logs)
+ if logs_paste_url:
+ embed.description = f"[View Logs]({logs_paste_url})"
+ else:
+ embed.description = "Couldn't upload logs."
+ await ctx.send(embed=embed)
+ image_path.unlink()
+ return
+ await ctx.send(file=discord.File(image_path, "latex.png"))
+
+
+def setup(bot: Bot) -> None:
+ """Load the Latex Cog."""
+ bot.add_cog(Latex(bot))
diff --git a/bot/exts/fun/madlibs.py b/bot/exts/fun/madlibs.py
new file mode 100644
index 00000000..21708e53
--- /dev/null
+++ b/bot/exts/fun/madlibs.py
@@ -0,0 +1,148 @@
+import json
+from asyncio import TimeoutError
+from pathlib import Path
+from random import choice
+from typing import TypedDict
+
+import discord
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.constants import Colours, NEGATIVE_REPLIES
+
+TIMEOUT = 60.0
+
+
+class MadlibsTemplate(TypedDict):
+ """Structure of a template in the madlibs JSON file."""
+
+ title: str
+ blanks: list[str]
+ value: list[str]
+
+
+class Madlibs(commands.Cog):
+ """Cog for the Madlibs game."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+ self.templates = self._load_templates()
+ self.edited_content = {}
+ self.checks = set()
+
+ @staticmethod
+ def _load_templates() -> list[MadlibsTemplate]:
+ madlibs_stories = Path("bot/resources/fun/madlibs_templates.json")
+
+ with open(madlibs_stories) as file:
+ return json.load(file)
+
+ @staticmethod
+ def madlibs_embed(part_of_speech: str, number_of_inputs: int) -> discord.Embed:
+ """Method to generate an embed with the game information."""
+ madlibs_embed = discord.Embed(title="Madlibs", color=Colours.python_blue)
+
+ madlibs_embed.add_field(
+ name="Enter a word that fits the given part of speech!",
+ value=f"Part of speech: {part_of_speech}\n\nMake sure not to spam, or you may get auto-muted!"
+ )
+
+ madlibs_embed.set_footer(text=f"Inputs remaining: {number_of_inputs}")
+
+ return madlibs_embed
+
+ @commands.Cog.listener()
+ async def on_message_edit(self, _: discord.Message, after: discord.Message) -> None:
+ """A listener that checks for message edits from the user."""
+ for check in self.checks:
+ if check(after):
+ break
+ else:
+ return
+
+ self.edited_content[after.id] = after.content
+
+ @commands.command()
+ @commands.max_concurrency(1, per=commands.BucketType.user)
+ async def madlibs(self, ctx: commands.Context) -> None:
+ """
+ Play Madlibs with the bot!
+
+ Madlibs is a game where the player is asked to enter a word that
+ fits a random part of speech (e.g. noun, adjective, verb, plural noun, etc.)
+ a random amount of times, depending on the story chosen by the bot at the beginning.
+ """
+ random_template = choice(self.templates)
+
+ def author_check(message: discord.Message) -> bool:
+ return message.channel.id == ctx.channel.id and message.author.id == ctx.author.id
+
+ self.checks.add(author_check)
+
+ loading_embed = discord.Embed(
+ title="Madlibs", description="Loading your Madlibs game...", color=Colours.python_blue
+ )
+ original_message = await ctx.send(embed=loading_embed)
+
+ submitted_words = {}
+
+ for i, part_of_speech in enumerate(random_template["blanks"]):
+ inputs_left = len(random_template["blanks"]) - i
+
+ madlibs_embed = self.madlibs_embed(part_of_speech, inputs_left)
+ await original_message.edit(embed=madlibs_embed)
+
+ try:
+ message = await self.bot.wait_for(event="message", check=author_check, timeout=TIMEOUT)
+ except TimeoutError:
+ timeout_embed = discord.Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="Uh oh! You took too long to respond!",
+ color=Colours.soft_red
+ )
+
+ await ctx.send(ctx.author.mention, embed=timeout_embed)
+
+ for msg_id in submitted_words:
+ self.edited_content.pop(msg_id, submitted_words[msg_id])
+
+ self.checks.remove(author_check)
+
+ return
+
+ submitted_words[message.id] = message.content
+
+ blanks = [self.edited_content.pop(msg_id, submitted_words[msg_id]) for msg_id in submitted_words]
+
+ self.checks.remove(author_check)
+
+ story = []
+ for value, blank in zip(random_template["value"], blanks):
+ story.append(f"{value}__{blank}__")
+
+ # In each story template, there is always one more "value"
+ # (fragment from the story) than there are blanks (words that the player enters)
+ # so we need to compensate by appending the last line of the story again.
+ story.append(random_template["value"][-1])
+
+ story_embed = discord.Embed(
+ title=random_template["title"],
+ description="".join(story),
+ color=Colours.bright_green
+ )
+
+ story_embed.set_footer(text=f"Generated for {ctx.author}", icon_url=ctx.author.display_avatar.url)
+
+ await ctx.send(embed=story_embed)
+
+ @madlibs.error
+ async def handle_madlibs_error(self, ctx: commands.Context, error: commands.CommandError) -> None:
+ """Error handler for the Madlibs command."""
+ if isinstance(error, commands.MaxConcurrencyReached):
+ await ctx.send("You are already playing Madlibs!")
+ error.handled = True
+
+
+def setup(bot: Bot) -> None:
+ """Load the Madlibs cog."""
+ bot.add_cog(Madlibs(bot))
diff --git a/bot/exts/fun/quack.py b/bot/exts/fun/quack.py
new file mode 100644
index 00000000..0c228aed
--- /dev/null
+++ b/bot/exts/fun/quack.py
@@ -0,0 +1,75 @@
+import logging
+import random
+from typing import Literal, Optional
+
+import discord
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.constants import Colours, NEGATIVE_REPLIES
+
+API_URL = 'https://quackstack.pythondiscord.com'
+
+log = logging.getLogger(__name__)
+
+
+class Quackstack(commands.Cog):
+ """Cog used for wrapping Quackstack."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ @commands.command()
+ async def quack(
+ self,
+ ctx: commands.Context,
+ ducktype: Literal["duck", "manduck"] = "duck",
+ *,
+ seed: Optional[str] = None
+ ) -> None:
+ """
+ Use the Quackstack API to generate a random duck.
+
+ If a seed is provided, a duck is generated based on the given seed.
+ Either "duck" or "manduck" can be provided to change the duck type generated.
+ """
+ ducktype = ducktype.lower()
+ quackstack_url = f"{API_URL}/{ducktype}"
+ params = {}
+ if seed is not None:
+ try:
+ seed = int(seed)
+ except ValueError:
+ # We just need to turn the string into an integer any way possible
+ seed = int.from_bytes(seed.encode(), "big")
+ params["seed"] = seed
+
+ async with self.bot.http_session.get(quackstack_url, params=params) as response:
+ error_embed = discord.Embed(
+ title=random.choice(NEGATIVE_REPLIES),
+ description="The request failed. Please try again later.",
+ color=Colours.soft_red,
+ )
+ if response.status != 200:
+ log.error(f"Response to Quackstack returned code {response.status}")
+ await ctx.send(embed=error_embed)
+ return
+
+ data = await response.json()
+ file = data["file"]
+
+ embed = discord.Embed(
+ title=f"Quack! Here's a {ducktype} for you.",
+ description=f"A {ducktype} from Quackstack.",
+ color=Colours.grass_green,
+ url=f"{API_URL}/docs"
+ )
+
+ embed.set_image(url=API_URL + file)
+
+ await ctx.send(embed=embed)
+
+
+def setup(bot: Bot) -> None:
+ """Loads the Quack cog."""
+ bot.add_cog(Quackstack(bot))
diff --git a/bot/exts/fun/snakes/_utils.py b/bot/exts/fun/snakes/_utils.py
index de51339d..182fa9d9 100644
--- a/bot/exts/fun/snakes/_utils.py
+++ b/bot/exts/fun/snakes/_utils.py
@@ -6,13 +6,14 @@ import math
import random
from itertools import product
from pathlib import Path
+from typing import Union
from PIL import Image
from PIL.ImageDraw import ImageDraw
-from discord import File, Member, Reaction
+from discord import File, Member, Reaction, User
from discord.ext.commands import Cog, Context
-from bot.constants import Roles
+from bot.constants import MODERATION_ROLES
SNAKE_RESOURCES = Path("bot/resources/fun/snakes").absolute()
@@ -395,7 +396,7 @@ class SnakeAndLaddersGame:
Listen for reactions until players have joined, and the game has been started.
"""
- def startup_event_check(reaction_: Reaction, user_: Member) -> bool:
+ def startup_event_check(reaction_: Reaction, user_: Union[User, Member]) -> bool:
"""Make sure that this reaction is what we want to operate on."""
return (
all((
@@ -460,7 +461,7 @@ class SnakeAndLaddersGame:
await self.cancel_game()
return # We're done, no reactions for the last 5 minutes
- async def _add_player(self, user: Member) -> None:
+ async def _add_player(self, user: Union[User, Member]) -> None:
"""Add player to game."""
self.players.append(user)
self.player_tiles[user.id] = 1
@@ -469,7 +470,7 @@ class SnakeAndLaddersGame:
im = Image.open(io.BytesIO(avatar_bytes)).resize((BOARD_PLAYER_SIZE, BOARD_PLAYER_SIZE))
self.avatar_images[user.id] = im
- async def player_join(self, user: Member) -> None:
+ async def player_join(self, user: Union[User, Member]) -> None:
"""
Handle players joining the game.
@@ -495,7 +496,7 @@ class SnakeAndLaddersGame:
delete_after=10
)
- async def player_leave(self, user: Member) -> bool:
+ async def player_leave(self, user: Union[User, Member]) -> bool:
"""
Handle players leaving the game.
@@ -530,7 +531,7 @@ class SnakeAndLaddersGame:
await self.channel.send("**Snakes and Ladders**: Game has been canceled.")
self._destruct()
- async def start_game(self, user: Member) -> None:
+ async def start_game(self, user: Union[User, Member]) -> None:
"""
Allow the game author to begin the game.
@@ -551,7 +552,7 @@ class SnakeAndLaddersGame:
async def start_round(self) -> None:
"""Begin the round."""
- def game_event_check(reaction_: Reaction, user_: Member) -> bool:
+ def game_event_check(reaction_: Reaction, user_: Union[User, Member]) -> bool:
"""Make sure that this reaction is what we want to operate on."""
return (
all((
@@ -644,7 +645,7 @@ class SnakeAndLaddersGame:
if not is_surrendered:
await self._complete_round()
- async def player_roll(self, user: Member) -> None:
+ async def player_roll(self, user: Union[User, Member]) -> None:
"""Handle the player's roll."""
if user.id not in self.player_tiles:
await self.channel.send(user.mention + " You are not in the match.", delete_after=10)
@@ -691,7 +692,7 @@ class SnakeAndLaddersGame:
await self.channel.send("**Snakes and Ladders**: " + winner.mention + " has won the game! :tada:")
self._destruct()
- def _check_winner(self) -> Member:
+ def _check_winner(self) -> Union[User, Member]:
"""Return a winning member if we're in the post-round state and there's a winner."""
if self.state != "post_round":
return None
@@ -716,6 +717,6 @@ class SnakeAndLaddersGame:
return x_level, y_level
@staticmethod
- def _is_moderator(user: Member) -> bool:
+ def _is_moderator(user: Union[User, Member]) -> bool:
"""Return True if the user is a Moderator."""
- return any(Roles.moderator == role.id for role in user.roles)
+ return any(role.id in MODERATION_ROLES for role in getattr(user, 'roles', []))
diff --git a/bot/exts/fun/tic_tac_toe.py b/bot/exts/fun/tic_tac_toe.py
index 5c4f8051..5dd38a81 100644
--- a/bot/exts/fun/tic_tac_toe.py
+++ b/bot/exts/fun/tic_tac_toe.py
@@ -3,7 +3,7 @@ import random
from typing import Callable, Optional, Union
import discord
-from discord.ext.commands import Cog, Context, check, group, guild_only
+from discord.ext.commands import Cog, Context, check, group
from bot.bot import Bot
from bot.constants import Emojis
@@ -72,10 +72,12 @@ class Player:
class AI:
"""Tic Tac Toe AI class for against computer gaming."""
- def __init__(self, symbol: str):
+ def __init__(self, bot_user: discord.Member, symbol: str):
+ self.user = bot_user
self.symbol = symbol
- async def get_move(self, board: dict[int, str], _: discord.Message) -> tuple[bool, int]:
+ @staticmethod
+ async def get_move(board: dict[int, str], _: discord.Message) -> tuple[bool, int]:
"""Get move from AI. AI use Minimax strategy."""
possible_moves = [i for i, emoji in board.items() if emoji in list(Emojis.number_emojis.values())]
@@ -97,8 +99,8 @@ class AI:
return False, random.choice(open_edges)
def __str__(self) -> str:
- """Return `AI` as user name."""
- return "AI"
+ """Return mention of @Sir Lancebot."""
+ return self.user.mention
class Game:
@@ -107,6 +109,7 @@ class Game:
def __init__(self, players: list[Union[Player, AI]], ctx: Context):
self.players = players
self.ctx = ctx
+ self.channel = ctx.channel
self.board = {
1: Emojis.number_emojis[1],
2: Emojis.number_emojis[2],
@@ -173,7 +176,8 @@ class Game:
self.canceled = True
return False, "User declined"
- async def add_reactions(self, msg: discord.Message) -> None:
+ @staticmethod
+ async def add_reactions(msg: discord.Message) -> None:
"""Add number emojis to message."""
for nr in Emojis.number_emojis.values():
await msg.add_reaction(nr)
@@ -249,7 +253,6 @@ class TicTacToe(Cog):
def __init__(self):
self.games: list[Game] = []
- @guild_only()
@is_channel_free()
@is_requester_free()
@group(name="tictactoe", aliases=("ttt", "tic"), invoke_without_command=True)
@@ -265,7 +268,7 @@ class TicTacToe(Cog):
return
if opponent is None:
game = Game(
- [Player(ctx.author, ctx, Emojis.x_square), AI(Emojis.o_square)],
+ [Player(ctx.author, ctx, Emojis.x_square), AI(ctx.me, Emojis.o_square)],
ctx
)
else:
diff --git a/bot/exts/fun/trivia_quiz.py b/bot/exts/fun/trivia_quiz.py
index 236586b0..4a1cec5b 100644
--- a/bot/exts/fun/trivia_quiz.py
+++ b/bot/exts/fun/trivia_quiz.py
@@ -16,7 +16,7 @@ from discord.ext import commands, tasks
from rapidfuzz import fuzz
from bot.bot import Bot
-from bot.constants import Colours, NEGATIVE_REPLIES, Roles
+from bot.constants import Client, Colours, MODERATION_ROLES, NEGATIVE_REPLIES
logger = logging.getLogger(__name__)
@@ -332,7 +332,7 @@ class TriviaQuiz(commands.Cog):
if self.game_status[ctx.channel.id]:
await ctx.send(
"Game is already running... "
- f"do `{self.bot.command_prefix}quiz stop`"
+ f"do `{Client.prefix}quiz stop`"
)
return
@@ -550,7 +550,7 @@ class TriviaQuiz(commands.Cog):
if self.game_status[ctx.channel.id]:
# Check if the author is the game starter or a moderator.
if ctx.author == self.game_owners[ctx.channel.id] or any(
- Roles.moderator == role.id for role in ctx.author.roles
+ role.id in MODERATION_ROLES for role in getattr(ctx.author, 'roles', [])
):
self.game_status[ctx.channel.id] = False
del self.game_owners[ctx.channel.id]
diff --git a/bot/exts/holidays/easter/earth_photos.py b/bot/exts/holidays/easter/earth_photos.py
index f65790af..27442f1c 100644
--- a/bot/exts/holidays/easter/earth_photos.py
+++ b/bot/exts/holidays/easter/earth_photos.py
@@ -4,8 +4,7 @@ import discord
from discord.ext import commands
from bot.bot import Bot
-from bot.constants import Colours
-from bot.constants import Tokens
+from bot.constants import Colours, Tokens
log = logging.getLogger(__name__)
diff --git a/bot/exts/holidays/easter/egg_facts.py b/bot/exts/holidays/easter/egg_facts.py
index 5f216e0d..152af6a4 100644
--- a/bot/exts/holidays/easter/egg_facts.py
+++ b/bot/exts/holidays/easter/egg_facts.py
@@ -31,7 +31,7 @@ class EasterFacts(commands.Cog):
"""A background task that sends an easter egg fact in the event channel everyday."""
await self.bot.wait_until_guild_available()
- channel = self.bot.get_channel(Channels.community_bot_commands)
+ channel = self.bot.get_channel(Channels.sir_lancebot_playground)
await channel.send(embed=self.make_embed())
@commands.command(name="eggfact", aliases=("fact",))
diff --git a/bot/exts/holidays/halloween/candy_collection.py b/bot/exts/holidays/halloween/candy_collection.py
index 4afd5913..220ba8e5 100644
--- a/bot/exts/holidays/halloween/candy_collection.py
+++ b/bot/exts/holidays/halloween/candy_collection.py
@@ -55,7 +55,7 @@ class CandyCollection(commands.Cog):
if message.author.bot:
return
# ensure it's hacktober channel
- if message.channel.id != Channels.community_bot_commands:
+ if message.channel.id != Channels.sir_lancebot_playground:
return
# do random check for skull first as it has the lower chance
@@ -77,12 +77,17 @@ class CandyCollection(commands.Cog):
return
# check to ensure it is in correct channel
- if message.channel.id != Channels.community_bot_commands:
+ if message.channel.id != Channels.sir_lancebot_playground:
return
# if its not a candy or skull, and it is one of 10 most recent messages,
# proceed to add a skull/candy with higher chance
if str(reaction.emoji) not in (EMOJIS["SKULL"], EMOJIS["CANDY"]):
+ # Ensure the reaction is not for a bot's message so users can't spam
+ # reaction buttons like in .help to get candies.
+ if message.author.bot:
+ return
+
recent_message_ids = map(
lambda m: m.id,
await self.hacktober_channel.history(limit=10).flatten()
@@ -134,7 +139,7 @@ class CandyCollection(commands.Cog):
@property
def hacktober_channel(self) -> discord.TextChannel:
"""Get #hacktoberbot channel from its ID."""
- return self.bot.get_channel(id=Channels.community_bot_commands)
+ return self.bot.get_channel(Channels.sir_lancebot_playground)
@staticmethod
async def send_spook_msg(
@@ -182,13 +187,24 @@ class CandyCollection(commands.Cog):
for index, record in enumerate(top_five)
) if top_five else "No Candies"
- e = discord.Embed(colour=discord.Colour.blurple())
+ def get_user_candy_score() -> str:
+ for user_id, score in records:
+ if user_id == ctx.author.id:
+ return f"{ctx.author.mention}: {score}"
+ return f"{ctx.author.mention}: 0"
+
+ e = discord.Embed(colour=discord.Colour.og_blurple())
e.add_field(
name="Top Candy Records",
value=generate_leaderboard(),
inline=False
)
e.add_field(
+ name="Your Candy Score",
+ value=get_user_candy_score(),
+ inline=False
+ )
+ e.add_field(
name="\u200b",
value="Candies will randomly appear on messages sent. "
"\nHit the candy when it appears as fast as possible to get the candy! "
diff --git a/bot/exts/holidays/halloween/scarymovie.py b/bot/exts/holidays/halloween/scarymovie.py
index 33659fd8..89310b97 100644
--- a/bot/exts/holidays/halloween/scarymovie.py
+++ b/bot/exts/holidays/halloween/scarymovie.py
@@ -6,6 +6,7 @@ from discord.ext import commands
from bot.bot import Bot
from bot.constants import Tokens
+
log = logging.getLogger(__name__)
diff --git a/bot/exts/holidays/halloween/spookynamerate.py b/bot/exts/holidays/halloween/spookynamerate.py
index 2e59d4a8..02fb71c3 100644
--- a/bot/exts/holidays/halloween/spookynamerate.py
+++ b/bot/exts/holidays/halloween/spookynamerate.py
@@ -143,7 +143,7 @@ class SpookyNameRate(Cog):
if data["author"] == ctx.author.id:
await ctx.send(
"But you have already added an entry! Type "
- f"`{self.bot.command_prefix}spookynamerate "
+ f"`{Client.prefix}spookynamerate "
"delete` to delete it, and then you can add it again"
)
return
@@ -185,7 +185,7 @@ class SpookyNameRate(Cog):
return
await ctx.send(
- f"But you don't have an entry... :eyes: Type `{self.bot.command_prefix}spookynamerate add your entry`"
+ f"But you don't have an entry... :eyes: Type `{Client.prefix}spookynamerate add your entry`"
)
@Cog.listener()
@@ -223,9 +223,9 @@ class SpookyNameRate(Cog):
if self.first_time:
await channel.send(
"Okkey... Welcome to the **Spooky Name Rate Game**! It's a relatively simple game.\n"
- f"Everyday, a random name will be sent in <#{Channels.community_bot_commands}> "
+ f"Everyday, a random name will be sent in <#{Channels.sir_lancebot_playground}> "
"and you need to try and spookify it!\nRegister your name using "
- f"`{self.bot.command_prefix}spookynamerate add spookified name`"
+ f"`{Client.prefix}spookynamerate add spookified name`"
)
await self.data.set("first_time", False)
@@ -359,10 +359,10 @@ class SpookyNameRate(Cog):
"""Gets the sir-lancebot-channel after waiting until ready."""
await self.bot.wait_until_ready()
channel = self.bot.get_channel(
- Channels.community_bot_commands
- ) or await self.bot.fetch_channel(Channels.community_bot_commands)
+ Channels.sir_lancebot_playground
+ ) or await self.bot.fetch_channel(Channels.sir_lancebot_playground)
if not channel:
- logger.warning("Bot is unable to get the #seasonalbot-commands channel. Please check the channel ID.")
+ logger.warning("Bot is unable to get the #sir-lancebot-playground channel. Please check the channel ID.")
return channel
@staticmethod
diff --git a/bot/exts/holidays/halloween/spookyreact.py b/bot/exts/holidays/halloween/spookyreact.py
index 25e783f4..e228b91d 100644
--- a/bot/exts/holidays/halloween/spookyreact.py
+++ b/bot/exts/holidays/halloween/spookyreact.py
@@ -47,12 +47,12 @@ class SpookyReact(Cog):
Short-circuit helper check.
Return True if:
- * author is the bot
+ * author is a bot
* prefix is not None
"""
- # Check for self reaction
- if message.author == self.bot.user:
- log.debug(f"Ignoring reactions on self message. Message ID: {message.id}")
+ # Check if message author is a bot
+ if message.author.bot:
+ log.debug(f"Ignoring reactions on bot message. Message ID: {message.id}")
return True
# Check for command invocation
diff --git a/bot/exts/holidays/hanukkah/hanukkah_embed.py b/bot/exts/holidays/hanukkah/hanukkah_embed.py
index ac3eab7b..5767f91e 100644
--- a/bot/exts/holidays/hanukkah/hanukkah_embed.py
+++ b/bot/exts/holidays/hanukkah/hanukkah_embed.py
@@ -21,45 +21,41 @@ class HanukkahEmbed(commands.Cog):
def __init__(self, bot: Bot):
self.bot = bot
- self.hanukkah_days = []
- self.hanukkah_months = []
- self.hanukkah_years = []
+ self.hanukkah_dates: list[datetime.date] = []
- async def get_hanukkah_dates(self) -> list[str]:
+ def _parse_time_to_datetime(self, date: list[str]) -> datetime.datetime:
+ """Format the times provided by the api to datetime forms."""
+ try:
+ return datetime.datetime.strptime(date, "%Y-%m-%dT%H:%M:%S%z")
+ except ValueError:
+ # there is a possibility of an event not having a time, just a day
+ # to catch this, we try again without time information
+ return datetime.datetime.strptime(date, "%Y-%m-%d")
+
+ async def fetch_hanukkah_dates(self) -> list[datetime.date]:
"""Gets the dates for hanukkah festival."""
- hanukkah_dates = []
+ # clear the datetime objects to prevent a memory link
+ self.hanukkah_dates = []
async with self.bot.http_session.get(HEBCAL_URL) as response:
json_data = await response.json()
festivals = json_data["items"]
for festival in festivals:
if festival["title"].startswith("Chanukah"):
date = festival["date"]
- hanukkah_dates.append(date)
- return hanukkah_dates
+ self.hanukkah_dates.append(self._parse_time_to_datetime(date).date())
+ return self.hanukkah_dates
@in_month(Month.NOVEMBER, Month.DECEMBER)
@commands.command(name="hanukkah", aliases=("chanukah",))
async def hanukkah_festival(self, ctx: commands.Context) -> None:
"""Tells you about the Hanukkah Festivaltime of festival, festival day, etc)."""
- hanukkah_dates = await self.get_hanukkah_dates()
- self.hanukkah_dates_split(hanukkah_dates)
- hanukkah_start_day = int(self.hanukkah_days[0])
- hanukkah_start_month = int(self.hanukkah_months[0])
- hanukkah_start_year = int(self.hanukkah_years[0])
- hanukkah_end_day = int(self.hanukkah_days[8])
- hanukkah_end_month = int(self.hanukkah_months[8])
- hanukkah_end_year = int(self.hanukkah_years[8])
-
- hanukkah_start = datetime.date(hanukkah_start_year, hanukkah_start_month, hanukkah_start_day)
- hanukkah_end = datetime.date(hanukkah_end_year, hanukkah_end_month, hanukkah_end_day)
+ hanukkah_dates = await self.fetch_hanukkah_dates()
+ start_day = hanukkah_dates[0]
+ end_day = hanukkah_dates[-1]
today = datetime.date.today()
- # today = datetime.date(2019, 12, 24) (for testing)
- day = str(today.day)
- month = str(today.month)
- year = str(today.year)
embed = Embed(title="Hanukkah", colour=Colours.blue)
- if day in self.hanukkah_days and month in self.hanukkah_months and year in self.hanukkah_years:
- if int(day) == hanukkah_start_day:
+ if start_day <= today <= end_day:
+ if start_day == today:
now = datetime.datetime.utcnow()
hours = now.hour + 4 # using only hours
hanukkah_start_hour = 18
@@ -77,35 +73,27 @@ class HanukkahEmbed(commands.Cog):
)
await ctx.send(embed=embed)
return
- festival_day = self.hanukkah_days.index(day)
+ festival_day = hanukkah_dates.index(today)
number_suffixes = ["st", "nd", "rd", "th"]
suffix = number_suffixes[festival_day - 1 if festival_day <= 3 else 3]
message = ":menorah:" * festival_day
- embed.description = f"It is the {festival_day}{suffix} day of Hanukkah!\n{message}"
- await ctx.send(embed=embed)
+ embed.description = (
+ f"It is the {festival_day}{suffix} day of Hanukkah!\n{message}"
+ )
+ elif today < start_day:
+ format_start = start_day.strftime("%d of %B")
+ embed.description = (
+ "Hanukkah has not started yet. "
+ f"Hanukkah will start at sundown on {format_start}."
+ )
else:
- if today < hanukkah_start:
- festival_starting_month = hanukkah_start.strftime("%B")
- embed.description = (
- f"Hanukkah has not started yet. "
- f"Hanukkah will start at sundown on {hanukkah_start_day}th "
- f"of {festival_starting_month}."
- )
- else:
- festival_end_month = hanukkah_end.strftime("%B")
- embed.description = (
- f"Looks like you missed Hanukkah!"
- f"Hanukkah ended on {hanukkah_end_day}th of {festival_end_month}."
- )
-
- await ctx.send(embed=embed)
+ format_end = end_day.strftime("%d of %B")
+ embed.description = (
+ "Looks like you missed Hanukkah! "
+ f"Hanukkah ended on {format_end}."
+ )
- def hanukkah_dates_split(self, hanukkah_dates: list[str]) -> None:
- """We are splitting the dates for hanukkah into days, months and years."""
- for date in hanukkah_dates:
- self.hanukkah_days.append(date[8:10])
- self.hanukkah_months.append(date[5:7])
- self.hanukkah_years.append(date[0:4])
+ await ctx.send(embed=embed)
def setup(bot: Bot) -> None:
diff --git a/bot/exts/holidays/pride/pride_facts.py b/bot/exts/holidays/pride/pride_facts.py
index e6ef7108..340f0b43 100644
--- a/bot/exts/holidays/pride/pride_facts.py
+++ b/bot/exts/holidays/pride/pride_facts.py
@@ -30,7 +30,7 @@ class PrideFacts(commands.Cog):
"""Background task to post the daily pride fact every day."""
await self.bot.wait_until_guild_available()
- channel = self.bot.get_channel(Channels.community_bot_commands)
+ channel = self.bot.get_channel(Channels.sir_lancebot_playground)
await self.send_select_fact(channel, datetime.utcnow())
async def send_random_fact(self, ctx: commands.Context) -> None:
diff --git a/bot/exts/holidays/pride/pride_leader.py b/bot/exts/holidays/pride/pride_leader.py
index 298c9328..adf01134 100644
--- a/bot/exts/holidays/pride/pride_leader.py
+++ b/bot/exts/holidays/pride/pride_leader.py
@@ -83,7 +83,7 @@ class PrideLeader(commands.Cog):
embed.add_field(
name="For More Information",
value=f"Do `{constants.Client.prefix}wiki {name}`"
- f" in <#{constants.Channels.community_bot_commands}>",
+ f" in <#{constants.Channels.sir_lancebot_playground}>",
inline=False
)
embed.set_thumbnail(url=pride_leader["url"])
diff --git a/bot/exts/holidays/valentines/be_my_valentine.py b/bot/exts/holidays/valentines/be_my_valentine.py
index 4d454c3a..cbb95157 100644
--- a/bot/exts/holidays/valentines/be_my_valentine.py
+++ b/bot/exts/holidays/valentines/be_my_valentine.py
@@ -7,14 +7,16 @@ import discord
from discord.ext import commands
from bot.bot import Bot
-from bot.constants import Channels, Colours, Lovefest, Month
+from bot.constants import Channels, Colours, Lovefest, Month, PYTHON_PREFIX
from bot.utils.decorators import in_month
-from bot.utils.extensions import invoke_help_command
+from bot.utils.exceptions import MovedCommandError
log = logging.getLogger(__name__)
HEART_EMOJIS = [":heart:", ":gift_heart:", ":revolving_hearts:", ":sparkling_heart:", ":two_hearts:"]
+MOVED_COMMAND = f"{PYTHON_PREFIX}subscribe"
+
class BeMyValentine(commands.Cog):
"""A cog that sends Valentines to other users!"""
@@ -30,40 +32,14 @@ class BeMyValentine(commands.Cog):
return loads(p.read_text("utf8"))
@in_month(Month.FEBRUARY)
- @commands.group(name="lovefest")
+ @commands.command(name="lovefest", help=f"NOTE: This command has been moved to {MOVED_COMMAND}")
async def lovefest_role(self, ctx: commands.Context) -> None:
"""
- Subscribe or unsubscribe from the lovefest role.
-
- The lovefest role makes you eligible to receive anonymous valentines from other users.
+ Deprecated lovefest role command.
- 1) use the command \".lovefest sub\" to get the lovefest role.
- 2) use the command \".lovefest unsub\" to get rid of the lovefest role.
+ This command has been moved to bot, and will be removed in the future.
"""
- if not ctx.invoked_subcommand:
- await invoke_help_command(ctx)
-
- @lovefest_role.command(name="sub")
- async def add_role(self, ctx: commands.Context) -> None:
- """Adds the lovefest role."""
- user = ctx.author
- role = ctx.guild.get_role(Lovefest.role_id)
- if role not in ctx.author.roles:
- await user.add_roles(role)
- await ctx.send("The Lovefest role has been added !")
- else:
- await ctx.send("You already have the role !")
-
- @lovefest_role.command(name="unsub")
- async def remove_role(self, ctx: commands.Context) -> None:
- """Removes the lovefest role."""
- user = ctx.author
- role = ctx.guild.get_role(Lovefest.role_id)
- if role not in ctx.author.roles:
- await ctx.send("You dont have the lovefest role.")
- else:
- await user.remove_roles(role)
- await ctx.send("The lovefest role has been successfully removed!")
+ raise MovedCommandError(MOVED_COMMAND)
@commands.cooldown(1, 1800, commands.BucketType.user)
@commands.group(name="bemyvalentine", invoke_without_command=True)
@@ -94,7 +70,7 @@ class BeMyValentine(commands.Cog):
raise commands.UserInputError("Come on, you can't send a valentine to yourself :expressionless:")
emoji_1, emoji_2 = self.random_emoji()
- channel = self.bot.get_channel(Channels.community_bot_commands)
+ channel = self.bot.get_channel(Channels.sir_lancebot_playground)
valentine, title = self.valentine_check(valentine_type)
embed = discord.Embed(
diff --git a/bot/exts/holidays/valentines/lovecalculator.py b/bot/exts/holidays/valentines/lovecalculator.py
index 3999db2b..10dea9df 100644
--- a/bot/exts/holidays/valentines/lovecalculator.py
+++ b/bot/exts/holidays/valentines/lovecalculator.py
@@ -12,7 +12,7 @@ from discord.ext import commands
from discord.ext.commands import BadArgument, Cog, clean_content
from bot.bot import Bot
-from bot.constants import Channels, Client, Lovefest, Month
+from bot.constants import Channels, Lovefest, Month, PYTHON_PREFIX
from bot.utils.decorators import in_month
log = logging.getLogger(__name__)
@@ -32,7 +32,7 @@ class LoveCalculator(Cog):
Tells you how much the two love each other.
This command requires at least one member as input, if two are given love will be calculated between
- those two users, if only one is given, the second member is asusmed to be the invoker.
+ those two users, if only one is given, the second member is assumed to be the invoker.
Members are converted from:
- User ID
- Mention
@@ -51,7 +51,7 @@ class LoveCalculator(Cog):
raise BadArgument(
"This command can only be ran against members with the lovefest role! "
"This role be can assigned by running "
- f"`{Client.prefix}lovefest sub` in <#{Channels.community_bot_commands}>."
+ f"`{PYTHON_PREFIX}subscribe` in <#{Channels.bot_commands}>."
)
if whom is None:
@@ -74,7 +74,8 @@ class LoveCalculator(Cog):
# We need the -1 due to how bisect returns the point
# see the documentation for further detail
# https://docs.python.org/3/library/bisect.html#bisect.bisect
- index = bisect.bisect(LOVE_DATA, (love_percent,)) - 1
+ love_threshold = [threshold for threshold, _ in LOVE_DATA]
+ index = bisect.bisect(love_threshold, love_percent) - 1
# We already have the nearest "fit" love level
# We only need the dict, so we can ditch the first element
_, data = LOVE_DATA[index]
@@ -89,7 +90,7 @@ class LoveCalculator(Cog):
name="A letter from Dr. Love:",
value=data["text"]
)
- embed.set_footer(text=f"You can unsubscribe from lovefest by using {Client.prefix}lovefest unsub")
+ embed.set_footer(text=f"You can unsubscribe from lovefest by using {PYTHON_PREFIX}subscribe.")
await ctx.send(embed=embed)
diff --git a/bot/exts/utilities/bookmark.py b/bot/exts/utilities/bookmark.py
index a91ef1c0..b50205a0 100644
--- a/bot/exts/utilities/bookmark.py
+++ b/bot/exts/utilities/bookmark.py
@@ -7,7 +7,7 @@ import discord
from discord.ext import commands
from bot.bot import Bot
-from bot.constants import Categories, Colours, ERROR_REPLIES, Icons, WHITELISTED_CHANNELS
+from bot.constants import Colours, ERROR_REPLIES, Icons, Roles
from bot.utils.converters import WrappedMessageConverter
from bot.utils.decorators import whitelist_override
@@ -16,7 +16,6 @@ log = logging.getLogger(__name__)
# Number of seconds to wait for other users to bookmark the same message
TIMEOUT = 120
BOOKMARK_EMOJI = "πŸ“Œ"
-WHITELISTED_CATEGORIES = (Categories.help_in_use,)
class Bookmark(commands.Cog):
@@ -87,8 +86,8 @@ class Bookmark(commands.Cog):
await message.add_reaction(BOOKMARK_EMOJI)
return message
- @whitelist_override(channels=WHITELISTED_CHANNELS, categories=WHITELISTED_CATEGORIES)
@commands.command(name="bookmark", aliases=("bm", "pin"))
+ @whitelist_override(roles=(Roles.everyone,))
async def bookmark(
self,
ctx: commands.Context,
@@ -99,7 +98,13 @@ class Bookmark(commands.Cog):
"""Send the author a link to `target_message` via DMs."""
if not target_message:
if not ctx.message.reference:
- raise commands.UserInputError("You must either provide a valid message to bookmark, or reply to one.")
+ raise commands.UserInputError(
+ "You must either provide a valid message to bookmark, or reply to one."
+ "\n\nThe lookup strategy for a message is as follows (in order):"
+ "\n1. Lookup by '{channel ID}-{message ID}' (retrieved by shift-clicking on 'Copy ID')"
+ "\n2. Lookup by message ID (the message **must** be in the context channel)"
+ "\n3. Lookup by message URL"
+ )
target_message = ctx.message.reference.resolved
# Prevent users from bookmarking a message in a channel they don't have access to
diff --git a/bot/exts/utilities/challenges.py b/bot/exts/utilities/challenges.py
new file mode 100644
index 00000000..ab7ae442
--- /dev/null
+++ b/bot/exts/utilities/challenges.py
@@ -0,0 +1,341 @@
+import logging
+from asyncio import to_thread
+from random import choice
+from typing import Union
+
+from bs4 import BeautifulSoup
+from discord import Embed, Interaction, SelectOption, ui
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.constants import Colours, Emojis, NEGATIVE_REPLIES
+
+log = logging.getLogger(__name__)
+API_ROOT = "https://www.codewars.com/api/v1/code-challenges/{kata_id}"
+
+# Map difficulty for the kata to color we want to display in the embed.
+# These colors are representative of the colors that each kyu's level represents on codewars.com
+MAPPING_OF_KYU = {
+ 8: 0xdddbda, 7: 0xdddbda, 6: 0xecb613, 5: 0xecb613,
+ 4: 0x3c7ebb, 3: 0x3c7ebb, 2: 0x866cc7, 1: 0x866cc7
+}
+
+# Supported languages for a kata on codewars.com
+SUPPORTED_LANGUAGES = {
+ "stable": [
+ "c", "c#", "c++", "clojure", "coffeescript", "coq", "crystal", "dart", "elixir",
+ "f#", "go", "groovy", "haskell", "java", "javascript", "kotlin", "lean", "lua", "nasm",
+ "php", "python", "racket", "ruby", "rust", "scala", "shell", "sql", "swift", "typescript"
+ ],
+ "beta": [
+ "agda", "bf", "cfml", "cobol", "commonlisp", "elm", "erlang", "factor",
+ "forth", "fortran", "haxe", "idris", "julia", "nim", "objective-c", "ocaml",
+ "pascal", "perl", "powershell", "prolog", "purescript", "r", "raku", "reason", "solidity", "vb.net"
+ ]
+}
+
+
+class InformationDropdown(ui.Select):
+ """A dropdown inheriting from ui.Select that allows finding out other information about the kata."""
+
+ def __init__(self, language_embed: Embed, tags_embed: Embed, other_info_embed: Embed, main_embed: Embed):
+ options = [
+ SelectOption(
+ label="Main Information",
+ description="See the kata's difficulty, description, etc.",
+ emoji="🌎"
+ ),
+ SelectOption(
+ label="Languages",
+ description="See what languages this kata supports!",
+ emoji=Emojis.reddit_post_text
+ ),
+ SelectOption(
+ label="Tags",
+ description="See what categories this kata falls under!",
+ emoji=Emojis.stackoverflow_tag
+ ),
+ SelectOption(
+ label="Other Information",
+ description="See how other people performed on this kata and more!",
+ emoji="β„Ή"
+ )
+ ]
+
+ # We map the option label to the embed instance so that it can be easily looked up later in O(1)
+ self.mapping_of_embeds = {
+ "Main Information": main_embed,
+ "Languages": language_embed,
+ "Tags": tags_embed,
+ "Other Information": other_info_embed,
+ }
+
+ super().__init__(
+ placeholder="See more information regarding this kata",
+ min_values=1,
+ max_values=1,
+ options=options
+ )
+
+ async def callback(self, interaction: Interaction) -> None:
+ """Callback for when someone clicks on a dropdown."""
+ # Edit the message to the embed selected in the option
+ # The `original_message` attribute is set just after the message is sent with the view.
+ # The attribute is not set during initialization.
+ result_embed = self.mapping_of_embeds[self.values[0]]
+ await self.original_message.edit(embed=result_embed)
+
+
+class Challenges(commands.Cog):
+ """
+ Cog for the challenge command.
+
+ The challenge command pulls a random kata from codewars.com.
+ A kata is the name for a challenge, specific to codewars.com.
+
+ The challenge command also has filters to customize the kata that is given.
+ You can specify the language the kata should be from, difficulty and topic of the kata.
+ """
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ async def kata_id(self, search_link: str, params: dict) -> Union[str, Embed]:
+ """
+ Uses bs4 to get the HTML code for the page of katas, where the page is the link of the formatted `search_link`.
+
+ This will webscrape the search page with `search_link` and then get the ID of a kata for the
+ codewars.com API to use.
+ """
+ async with self.bot.http_session.get(search_link, params=params) as response:
+ if response.status != 200:
+ error_embed = Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="We ran into an error when getting the kata from codewars.com, try again later.",
+ color=Colours.soft_red
+ )
+ log.error(f"Unexpected response from codewars.com, status code: {response.status}")
+ return error_embed
+
+ soup = BeautifulSoup(await response.text(), features="lxml")
+ first_kata_div = await to_thread(soup.find_all, "div", class_="item-title px-0")
+
+ if not first_kata_div:
+ raise commands.BadArgument("No katas could be found with the filters provided.")
+ elif len(first_kata_div) >= 3:
+ first_kata_div = choice(first_kata_div[:3])
+ elif "q=" not in search_link:
+ first_kata_div = choice(first_kata_div)
+ else:
+ first_kata_div = first_kata_div[0]
+
+ # There are numerous divs before arriving at the id of the kata, which can be used for the link.
+ first_kata_id = first_kata_div.a["href"].split("/")[-1]
+ return first_kata_id
+
+ async def kata_information(self, kata_id: str) -> Union[dict, Embed]:
+ """
+ Returns the information about the Kata.
+
+ Uses the codewars.com API to get information about the kata using `kata_id`.
+ """
+ async with self.bot.http_session.get(API_ROOT.format(kata_id=kata_id)) as response:
+ if response.status != 200:
+ error_embed = Embed(
+ title=choice(NEGATIVE_REPLIES),
+ description="We ran into an error when getting the kata information, try again later.",
+ color=Colours.soft_red
+ )
+ log.error(f"Unexpected response from codewars.com/api/v1, status code: {response.status}")
+ return error_embed
+
+ return await response.json()
+
+ @staticmethod
+ def main_embed(kata_information: dict) -> Embed:
+ """Creates the main embed which displays the name, difficulty and description of the kata."""
+ kata_description = kata_information["description"]
+ kata_url = f"https://codewars.com/kata/{kata_information['id']}"
+
+ # Ensuring it isn't over the length 1024
+ if len(kata_description) > 1024:
+ kata_description = "\n".join(kata_description[:1000].split("\n")[:-1]) + "..."
+ kata_description += f" [continue reading]({kata_url})"
+
+ if kata_information["rank"]["name"] is None:
+ embed_color = 8
+ kata_difficulty = "Unable to retrieve difficulty for beta languages."
+ else:
+ embed_color = int(kata_information["rank"]["name"].replace(" kyu", ""))
+ kata_difficulty = kata_information["rank"]["name"]
+
+ kata_embed = Embed(
+ title=kata_information["name"],
+ description=kata_description,
+ color=MAPPING_OF_KYU[embed_color],
+ url=kata_url
+ )
+ kata_embed.add_field(name="Difficulty", value=kata_difficulty, inline=False)
+ return kata_embed
+
+ @staticmethod
+ def language_embed(kata_information: dict) -> Embed:
+ """Creates the 'language embed' which displays all languages the kata supports."""
+ kata_url = f"https://codewars.com/kata/{kata_information['id']}"
+
+ languages = "\n".join(map(str.title, kata_information["languages"]))
+ language_embed = Embed(
+ title=kata_information["name"],
+ description=f"```yaml\nSupported Languages:\n{languages}\n```",
+ color=Colours.python_blue,
+ url=kata_url
+ )
+ return language_embed
+
+ @staticmethod
+ def tags_embed(kata_information: dict) -> Embed:
+ """
+ Creates the 'tags embed' which displays all the tags of the Kata.
+
+ Tags explain what the kata is about, this is what codewars.com calls categories.
+ """
+ kata_url = f"https://codewars.com/kata/{kata_information['id']}"
+
+ tags = "\n".join(kata_information["tags"])
+ tags_embed = Embed(
+ title=kata_information["name"],
+ description=f"```yaml\nTags:\n{tags}\n```",
+ color=Colours.grass_green,
+ url=kata_url
+ )
+ return tags_embed
+
+ @staticmethod
+ def miscellaneous_embed(kata_information: dict) -> Embed:
+ """
+ Creates the 'other information embed' which displays miscellaneous information about the kata.
+
+ This embed shows statistics such as the total number of people who completed the kata,
+ the total number of stars of the kata, etc.
+ """
+ kata_url = f"https://codewars.com/kata/{kata_information['id']}"
+
+ embed = Embed(
+ title=kata_information["name"],
+ description="```nim\nOther Information\n```",
+ color=Colours.grass_green,
+ url=kata_url
+ )
+ embed.add_field(
+ name="`Total Score`",
+ value=f"```css\n{kata_information['voteScore']}\n```",
+ inline=False
+ )
+ embed.add_field(
+ name="`Total Stars`",
+ value=f"```css\n{kata_information['totalStars']}\n```",
+ inline=False
+ )
+ embed.add_field(
+ name="`Total Completed`",
+ value=f"```css\n{kata_information['totalCompleted']}\n```",
+ inline=False
+ )
+ embed.add_field(
+ name="`Total Attempts`",
+ value=f"```css\n{kata_information['totalAttempts']}\n```",
+ inline=False
+ )
+ return embed
+
+ @staticmethod
+ def create_view(dropdown: InformationDropdown, link: str) -> ui.View:
+ """
+ Creates the discord.py View for the Discord message components (dropdowns and buttons).
+
+ The discord UI is implemented onto the embed, where the user can choose what information about the kata they
+ want, along with a link button to the kata itself.
+ """
+ view = ui.View()
+ view.add_item(dropdown)
+ view.add_item(ui.Button(label="View the Kata", url=link))
+ return view
+
+ @commands.command(aliases=["kata"])
+ @commands.cooldown(1, 5, commands.BucketType.user)
+ async def challenge(self, ctx: commands.Context, language: str = "python", *, query: str = None) -> None:
+ """
+ The challenge command pulls a random kata (challenge) from codewars.com.
+
+ The different ways to use this command are:
+ `.challenge <language>` - Pulls a random challenge within that language's scope.
+ `.challenge <language> <difficulty>` - The difficulty can be from 1-8,
+ 1 being the hardest, 8 being the easiest. This pulls a random challenge within that difficulty & language.
+ `.challenge <language> <query>` - Pulls a random challenge with the query provided under the language
+ `.challenge <language> <query>, <difficulty>` - Pulls a random challenge with the query provided,
+ under that difficulty within the language's scope.
+ """
+ language = language.lower()
+ if language not in SUPPORTED_LANGUAGES["stable"] + SUPPORTED_LANGUAGES["beta"]:
+ raise commands.BadArgument("This is not a recognized language on codewars.com!")
+
+ get_kata_link = f"https://codewars.com/kata/search/{language}"
+ params = {}
+
+ if query is not None:
+ if "," in query:
+ query_splitted = query.split("," if ", " not in query else ", ")
+
+ if len(query_splitted) > 2:
+ raise commands.BadArgument(
+ "There can only be one comma within the query, separating the difficulty and the query itself."
+ )
+
+ query, level = query_splitted
+ params["q"] = query
+ params["r[]"] = f"-{level}"
+ elif query.isnumeric():
+ params["r[]"] = f"-{query}"
+ else:
+ params["q"] = query
+
+ params["beta"] = str(language in SUPPORTED_LANGUAGES["beta"]).lower()
+
+ first_kata_id = await self.kata_id(get_kata_link, params)
+ if isinstance(first_kata_id, Embed):
+ # We ran into an error when retrieving the website link
+ await ctx.send(embed=first_kata_id)
+ return
+
+ kata_information = await self.kata_information(first_kata_id)
+ if isinstance(kata_information, Embed):
+ # Something went wrong when trying to fetch the kata information
+ await ctx.d(embed=kata_information)
+ return
+
+ kata_embed = self.main_embed(kata_information)
+ language_embed = self.language_embed(kata_information)
+ tags_embed = self.tags_embed(kata_information)
+ miscellaneous_embed = self.miscellaneous_embed(kata_information)
+
+ dropdown = InformationDropdown(
+ main_embed=kata_embed,
+ language_embed=language_embed,
+ tags_embed=tags_embed,
+ other_info_embed=miscellaneous_embed
+ )
+ kata_view = self.create_view(dropdown, f"https://codewars.com/kata/{first_kata_id}")
+ original_message = await ctx.send(
+ embed=kata_embed,
+ view=kata_view
+ )
+ dropdown.original_message = original_message
+
+ wait_for_kata = await kata_view.wait()
+ if wait_for_kata:
+ await original_message.edit(embed=kata_embed, view=None)
+
+
+def setup(bot: Bot) -> None:
+ """Load the Challenges cog."""
+ bot.add_cog(Challenges(bot))
diff --git a/bot/exts/utilities/colour.py b/bot/exts/utilities/colour.py
new file mode 100644
index 00000000..ee6bad93
--- /dev/null
+++ b/bot/exts/utilities/colour.py
@@ -0,0 +1,266 @@
+import colorsys
+import json
+import pathlib
+import random
+import string
+from io import BytesIO
+from typing import Optional
+
+import discord
+import rapidfuzz
+from PIL import Image, ImageColor
+from discord.ext import commands
+
+from bot import constants
+from bot.bot import Bot
+from bot.exts.core.extensions import invoke_help_command
+from bot.utils.decorators import whitelist_override
+
+THUMBNAIL_SIZE = (80, 80)
+
+
+class Colour(commands.Cog):
+ """Cog for the Colour command."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+ with open(pathlib.Path("bot/resources/utilities/ryanzec_colours.json")) as f:
+ self.colour_mapping = json.load(f)
+ del self.colour_mapping['_'] # Delete source credit entry
+
+ async def send_colour_response(self, ctx: commands.Context, rgb: tuple[int, int, int]) -> None:
+ """Create and send embed from user given colour information."""
+ name = self._rgb_to_name(rgb)
+ try:
+ colour_or_color = ctx.invoked_parents[0]
+ except IndexError:
+ colour_or_color = "colour"
+
+ colour_mode = ctx.invoked_with
+ if colour_mode == "random":
+ colour_mode = colour_or_color
+ input_colour = name
+ elif colour_mode in ("colour", "color"):
+ input_colour = ctx.kwargs["colour_input"]
+ elif colour_mode == "name":
+ input_colour = ctx.kwargs["user_colour_name"]
+ elif colour_mode == "hex":
+ input_colour = ctx.args[2:][0]
+ if len(input_colour) > 7:
+ input_colour = input_colour[0:-2]
+ else:
+ input_colour = tuple(ctx.args[2:])
+
+ if colour_mode not in ("name", "hex", "random", "color", "colour"):
+ colour_mode = colour_mode.upper()
+ else:
+ colour_mode = colour_mode.title()
+
+ colour_embed = discord.Embed(
+ title=f"{name or input_colour}",
+ description=f"{colour_or_color.title()} information for {colour_mode} `{input_colour or name}`.",
+ colour=discord.Color.from_rgb(*rgb)
+ )
+ colour_conversions = self.get_colour_conversions(rgb)
+ for colour_space, value in colour_conversions.items():
+ colour_embed.add_field(
+ name=colour_space,
+ value=f"`{value}`",
+ inline=True
+ )
+
+ thumbnail = Image.new("RGB", THUMBNAIL_SIZE, color=rgb)
+ buffer = BytesIO()
+ thumbnail.save(buffer, "PNG")
+ buffer.seek(0)
+ thumbnail_file = discord.File(buffer, filename="colour.png")
+
+ colour_embed.set_thumbnail(url="attachment://colour.png")
+
+ await ctx.send(file=thumbnail_file, embed=colour_embed)
+
+ @commands.group(aliases=("color",), invoke_without_command=True)
+ @whitelist_override(
+ channels=constants.WHITELISTED_CHANNELS,
+ roles=constants.STAFF_ROLES,
+ categories=[constants.Categories.development, constants.Categories.media]
+ )
+ async def colour(self, ctx: commands.Context, *, colour_input: Optional[str] = None) -> None:
+ """
+ Create an embed that displays colour information.
+
+ If no subcommand is called, a randomly selected colour will be shown.
+ """
+ if colour_input is None:
+ await self.random(ctx)
+ return
+
+ try:
+ extra_colour = ImageColor.getrgb(colour_input)
+ await self.send_colour_response(ctx, extra_colour)
+ except ValueError:
+ await invoke_help_command(ctx)
+
+ @colour.command()
+ async def rgb(self, ctx: commands.Context, red: int, green: int, blue: int) -> None:
+ """Create an embed from an RGB input."""
+ if any(c not in range(256) for c in (red, green, blue)):
+ raise commands.BadArgument(
+ message=f"RGB values can only be from 0 to 255. User input was: `{red, green, blue}`."
+ )
+ rgb_tuple = (red, green, blue)
+ await self.send_colour_response(ctx, rgb_tuple)
+
+ @colour.command()
+ async def hsv(self, ctx: commands.Context, hue: int, saturation: int, value: int) -> None:
+ """Create an embed from an HSV input."""
+ if (hue not in range(361)) or any(c not in range(101) for c in (saturation, value)):
+ raise commands.BadArgument(
+ message="Hue can only be from 0 to 360. Saturation and Value can only be from 0 to 100. "
+ f"User input was: `{hue, saturation, value}`."
+ )
+ hsv_tuple = ImageColor.getrgb(f"hsv({hue}, {saturation}%, {value}%)")
+ await self.send_colour_response(ctx, hsv_tuple)
+
+ @colour.command()
+ async def hsl(self, ctx: commands.Context, hue: int, saturation: int, lightness: int) -> None:
+ """Create an embed from an HSL input."""
+ if (hue not in range(361)) or any(c not in range(101) for c in (saturation, lightness)):
+ raise commands.BadArgument(
+ message="Hue can only be from 0 to 360. Saturation and Lightness can only be from 0 to 100. "
+ f"User input was: `{hue, saturation, lightness}`."
+ )
+ hsl_tuple = ImageColor.getrgb(f"hsl({hue}, {saturation}%, {lightness}%)")
+ await self.send_colour_response(ctx, hsl_tuple)
+
+ @colour.command()
+ async def cmyk(self, ctx: commands.Context, cyan: int, magenta: int, yellow: int, key: int) -> None:
+ """Create an embed from a CMYK input."""
+ if any(c not in range(101) for c in (cyan, magenta, yellow, key)):
+ raise commands.BadArgument(
+ message=f"CMYK values can only be from 0 to 100. User input was: `{cyan, magenta, yellow, key}`."
+ )
+ r = round(255 * (1 - (cyan / 100)) * (1 - (key / 100)))
+ g = round(255 * (1 - (magenta / 100)) * (1 - (key / 100)))
+ b = round(255 * (1 - (yellow / 100)) * (1 - (key / 100)))
+ await self.send_colour_response(ctx, (r, g, b))
+
+ @colour.command()
+ async def hex(self, ctx: commands.Context, hex_code: str) -> None:
+ """Create an embed from a HEX input."""
+ if hex_code[0] != "#":
+ hex_code = f"#{hex_code}"
+
+ if len(hex_code) not in (4, 5, 7, 9) or any(digit not in string.hexdigits for digit in hex_code[1:]):
+ raise commands.BadArgument(
+ message=f"Cannot convert `{hex_code}` to a recognizable Hex format. "
+ "Hex values must be hexadecimal and take the form *#RRGGBB* or *#RGB*."
+ )
+
+ hex_tuple = ImageColor.getrgb(hex_code)
+ if len(hex_tuple) == 4:
+ hex_tuple = hex_tuple[:-1] # Colour must be RGB. If RGBA, we remove the alpha value
+ await self.send_colour_response(ctx, hex_tuple)
+
+ @colour.command()
+ async def name(self, ctx: commands.Context, *, user_colour_name: str) -> None:
+ """Create an embed from a name input."""
+ hex_colour = self.match_colour_name(ctx, user_colour_name)
+ if hex_colour is None:
+ name_error_embed = discord.Embed(
+ title="No colour match found.",
+ description=f"No colour found for: `{user_colour_name}`",
+ colour=discord.Color.dark_red()
+ )
+ await ctx.send(embed=name_error_embed)
+ return
+ hex_tuple = ImageColor.getrgb(hex_colour)
+ await self.send_colour_response(ctx, hex_tuple)
+
+ @colour.command()
+ async def random(self, ctx: commands.Context) -> None:
+ """Create an embed from a randomly chosen colour."""
+ hex_colour = random.choice(list(self.colour_mapping.values()))
+ hex_tuple = ImageColor.getrgb(f"#{hex_colour}")
+ await self.send_colour_response(ctx, hex_tuple)
+
+ def get_colour_conversions(self, rgb: tuple[int, int, int]) -> dict[str, str]:
+ """Create a dictionary mapping of colour types and their values."""
+ colour_name = self._rgb_to_name(rgb)
+ if colour_name is None:
+ colour_name = "No match found"
+ return {
+ "RGB": rgb,
+ "HSV": self._rgb_to_hsv(rgb),
+ "HSL": self._rgb_to_hsl(rgb),
+ "CMYK": self._rgb_to_cmyk(rgb),
+ "Hex": self._rgb_to_hex(rgb),
+ "Name": colour_name
+ }
+
+ @staticmethod
+ def _rgb_to_hsv(rgb: tuple[int, int, int]) -> tuple[int, int, int]:
+ """Convert RGB values to HSV values."""
+ rgb_list = [val / 255 for val in rgb]
+ h, s, v = colorsys.rgb_to_hsv(*rgb_list)
+ hsv = (round(h * 360), round(s * 100), round(v * 100))
+ return hsv
+
+ @staticmethod
+ def _rgb_to_hsl(rgb: tuple[int, int, int]) -> tuple[int, int, int]:
+ """Convert RGB values to HSL values."""
+ rgb_list = [val / 255.0 for val in rgb]
+ h, l, s = colorsys.rgb_to_hls(*rgb_list)
+ hsl = (round(h * 360), round(s * 100), round(l * 100))
+ return hsl
+
+ @staticmethod
+ def _rgb_to_cmyk(rgb: tuple[int, int, int]) -> tuple[int, int, int, int]:
+ """Convert RGB values to CMYK values."""
+ rgb_list = [val / 255.0 for val in rgb]
+ if not any(rgb_list):
+ return 0, 0, 0, 100
+ k = 1 - max(rgb_list)
+ c = round((1 - rgb_list[0] - k) * 100 / (1 - k))
+ m = round((1 - rgb_list[1] - k) * 100 / (1 - k))
+ y = round((1 - rgb_list[2] - k) * 100 / (1 - k))
+ cmyk = (c, m, y, round(k * 100))
+ return cmyk
+
+ @staticmethod
+ def _rgb_to_hex(rgb: tuple[int, int, int]) -> str:
+ """Convert RGB values to HEX code."""
+ hex_ = "".join([hex(val)[2:].zfill(2) for val in rgb])
+ hex_code = f"#{hex_}".upper()
+ return hex_code
+
+ def _rgb_to_name(self, rgb: tuple[int, int, int]) -> Optional[str]:
+ """Convert RGB values to a fuzzy matched name."""
+ input_hex_colour = self._rgb_to_hex(rgb)
+ try:
+ match, certainty, _ = rapidfuzz.process.extractOne(
+ query=input_hex_colour,
+ choices=self.colour_mapping.values(),
+ score_cutoff=80
+ )
+ colour_name = [name for name, hex_code in self.colour_mapping.items() if hex_code == match][0]
+ except TypeError:
+ colour_name = None
+ return colour_name
+
+ def match_colour_name(self, ctx: commands.Context, input_colour_name: str) -> Optional[str]:
+ """Convert a colour name to HEX code."""
+ try:
+ match, certainty, _ = rapidfuzz.process.extractOne(
+ query=input_colour_name,
+ choices=self.colour_mapping.keys(),
+ score_cutoff=80
+ )
+ except (ValueError, TypeError):
+ return
+ return f"#{self.colour_mapping[match]}"
+
+
+def setup(bot: Bot) -> None:
+ """Load the Colour cog."""
+ bot.add_cog(Colour(bot))
diff --git a/bot/exts/utilities/conversationstarters.py b/bot/exts/utilities/conversationstarters.py
index dd537022..8bf2abfd 100644
--- a/bot/exts/utilities/conversationstarters.py
+++ b/bot/exts/utilities/conversationstarters.py
@@ -1,11 +1,15 @@
+import asyncio
+from contextlib import suppress
+from functools import partial
from pathlib import Path
+from typing import Union
+import discord
import yaml
-from discord import Color, Embed
from discord.ext import commands
from bot.bot import Bot
-from bot.constants import WHITELISTED_CHANNELS
+from bot.constants import MODERATION_ROLES, WHITELISTED_CHANNELS
from bot.utils.decorators import whitelist_override
from bot.utils.randomization import RandomCycle
@@ -35,35 +39,88 @@ TOPICS = {
class ConvoStarters(commands.Cog):
"""General conversation topics."""
- @commands.command()
- @whitelist_override(channels=ALL_ALLOWED_CHANNELS)
- async def topic(self, ctx: commands.Context) -> None:
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ @staticmethod
+ def _build_topic_embed(channel_id: int) -> discord.Embed:
"""
- Responds with a random topic to start a conversation.
+ Build an embed containing a conversation topic.
If in a Python channel, a python-related topic will be given.
-
Otherwise, a random conversation topic will be received by the user.
"""
# No matter what, the form will be shown.
- embed = Embed(description=f"Suggest more topics [here]({SUGGESTION_FORM})!", color=Color.blurple())
+ embed = discord.Embed(
+ description=f"Suggest more topics [here]({SUGGESTION_FORM})!",
+ color=discord.Colour.og_blurple()
+ )
try:
- # Fetching topics.
- channel_topics = TOPICS[ctx.channel.id]
-
- # If the channel isn't Python-related.
+ channel_topics = TOPICS[channel_id]
except KeyError:
+ # Channel doesn't have any topics.
embed.title = f"**{next(TOPICS['default'])}**"
-
- # If the channel ID doesn't have any topics.
else:
embed.title = f"**{next(channel_topics)}**"
+ return embed
+
+ @staticmethod
+ def _predicate(
+ command_invoker: Union[discord.User, discord.Member],
+ message: discord.Message,
+ reaction: discord.Reaction,
+ user: discord.User
+ ) -> bool:
+ user_is_moderator = any(role.id in MODERATION_ROLES for role in getattr(user, "roles", []))
+ user_is_invoker = user.id == command_invoker.id
+
+ is_right_reaction = all((
+ reaction.message.id == message.id,
+ str(reaction.emoji) == "πŸ”„",
+ user_is_moderator or user_is_invoker
+ ))
+ return is_right_reaction
+
+ async def _listen_for_refresh(
+ self,
+ command_invoker: Union[discord.User, discord.Member],
+ message: discord.Message
+ ) -> None:
+ await message.add_reaction("πŸ”„")
+ while True:
+ try:
+ reaction, user = await self.bot.wait_for(
+ "reaction_add",
+ check=partial(self._predicate, command_invoker, message),
+ timeout=60.0
+ )
+ except asyncio.TimeoutError:
+ with suppress(discord.NotFound):
+ await message.clear_reaction("πŸ”„")
+ break
+
+ try:
+ await message.edit(embed=self._build_topic_embed(message.channel.id))
+ except discord.NotFound:
+ break
+
+ with suppress(discord.NotFound):
+ await message.remove_reaction(reaction, user)
- finally:
- await ctx.send(embed=embed)
+ @commands.command()
+ @commands.cooldown(1, 60*2, commands.BucketType.channel)
+ @whitelist_override(channels=ALL_ALLOWED_CHANNELS)
+ async def topic(self, ctx: commands.Context) -> None:
+ """
+ Responds with a random topic to start a conversation.
+
+ Allows the refresh of a topic by pressing an emoji.
+ """
+ message = await ctx.send(embed=self._build_topic_embed(ctx.channel.id))
+ self.bot.loop.create_task(self._listen_for_refresh(ctx.author, message))
def setup(bot: Bot) -> None:
"""Load the ConvoStarters cog."""
- bot.add_cog(ConvoStarters())
+ bot.add_cog(ConvoStarters(bot))
diff --git a/bot/exts/utilities/emoji.py b/bot/exts/utilities/emoji.py
index 55d6b8e9..fa438d7f 100644
--- a/bot/exts/utilities/emoji.py
+++ b/bot/exts/utilities/emoji.py
@@ -107,11 +107,11 @@ class Emojis(commands.Cog):
title=f"Emoji Information: {emoji.name}",
description=textwrap.dedent(f"""
**Name:** {emoji.name}
- **Created:** {time_since(emoji.created_at, precision="hours")}
- **Date:** {datetime.strftime(emoji.created_at, "%d/%m/%Y")}
+ **Created:** {time_since(emoji.created_at.replace(tzinfo=None), precision="hours")}
+ **Date:** {datetime.strftime(emoji.created_at.replace(tzinfo=None), "%d/%m/%Y")}
**ID:** {emoji.id}
"""),
- color=Color.blurple(),
+ color=Color.og_blurple(),
url=str(emoji.url),
).set_thumbnail(url=emoji.url)
diff --git a/bot/exts/utilities/epoch.py b/bot/exts/utilities/epoch.py
new file mode 100644
index 00000000..42312dd1
--- /dev/null
+++ b/bot/exts/utilities/epoch.py
@@ -0,0 +1,138 @@
+from typing import Optional, Union
+
+import arrow
+import discord
+from dateutil import parser
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.utils.extensions import invoke_help_command
+
+# https://discord.com/developers/docs/reference#message-formatting-timestamp-styles
+STYLES = {
+ "Epoch": ("",),
+ "Short Time": ("t", "h:mm A",),
+ "Long Time": ("T", "h:mm:ss A"),
+ "Short Date": ("d", "MM/DD/YYYY"),
+ "Long Date": ("D", "MMMM D, YYYY"),
+ "Short Date/Time": ("f", "MMMM D, YYYY h:mm A"),
+ "Long Date/Time": ("F", "dddd, MMMM D, YYYY h:mm A"),
+ "Relative Time": ("R",)
+}
+DROPDOWN_TIMEOUT = 60
+
+
+class DateString(commands.Converter):
+ """Convert a relative or absolute date/time string to an arrow.Arrow object."""
+
+ async def convert(self, ctx: commands.Context, argument: str) -> Union[arrow.Arrow, Optional[tuple]]:
+ """
+ Convert a relative or absolute date/time string to an arrow.Arrow object.
+
+ Try to interpret the date string as a relative time. If conversion fails, try to interpret it as an absolute
+ time. Tokens that are not recognised are returned along with the part of the string that was successfully
+ converted to an arrow object. If the date string cannot be parsed, BadArgument is raised.
+ """
+ try:
+ return arrow.utcnow().dehumanize(argument)
+ except (ValueError, OverflowError):
+ try:
+ dt, ignored_tokens = parser.parse(argument, fuzzy_with_tokens=True)
+ except parser.ParserError:
+ raise commands.BadArgument(f"`{argument}` Could not be parsed to a relative or absolute date.")
+ except OverflowError:
+ raise commands.BadArgument(f"`{argument}` Results in a date outside of the supported range.")
+ return arrow.get(dt), ignored_tokens
+
+
+class Epoch(commands.Cog):
+ """Convert an entered time and date to a unix timestamp."""
+
+ @commands.command(name="epoch")
+ async def epoch(self, ctx: commands.Context, *, date_time: DateString = None) -> None:
+ """
+ Convert an entered date/time string to the equivalent epoch.
+
+ **Relative time**
+ Must begin with `in...` or end with `...ago`.
+ Accepted units: "seconds", "minutes", "hours", "days", "weeks", "months", "years".
+ eg `.epoch in a month 4 days and 2 hours`
+
+ **Absolute time**
+ eg `.epoch 2022/6/15 16:43 -04:00`
+ Absolute times must be entered in descending orders of magnitude.
+ If AM or PM is left unspecified, the 24-hour clock is assumed.
+ Timezones are optional, and will default to UTC. The following timezone formats are accepted:
+ Z (UTC)
+ Β±HH:MM
+ Β±HHMM
+ Β±HH
+
+ Times in the dropdown are shown in UTC
+ """
+ if not date_time:
+ await invoke_help_command(ctx)
+ return
+
+ if isinstance(date_time, tuple):
+ # Remove empty strings. Strip extra whitespace from the remaining items
+ ignored_tokens = list(map(str.strip, filter(str.strip, date_time[1])))
+ date_time = date_time[0]
+ if ignored_tokens:
+ await ctx.send(f"Could not parse the following token(s): `{', '.join(ignored_tokens)}`")
+ await ctx.send(f"Date and time parsed as: `{date_time.format(arrow.FORMAT_RSS)}`")
+
+ epoch = int(date_time.timestamp())
+ view = TimestampMenuView(ctx, self._format_dates(date_time), epoch)
+ original = await ctx.send(f"`{epoch}`", view=view)
+ await view.wait() # wait until expiration before removing the dropdown
+ try:
+ await original.edit(view=None)
+ except discord.NotFound: # disregard the error message if the message is deleled
+ pass
+
+ @staticmethod
+ def _format_dates(date: arrow.Arrow) -> list[str]:
+ """
+ Return a list of date strings formatted according to the discord timestamp styles.
+
+ These are used in the description of each style in the dropdown
+ """
+ date = date.to('utc')
+ formatted = [str(int(date.timestamp()))]
+ formatted += [date.format(format[1]) for format in list(STYLES.values())[1:7]]
+ formatted.append(date.humanize())
+ return formatted
+
+
+class TimestampMenuView(discord.ui.View):
+ """View for the epoch command which contains a single `discord.ui.Select` dropdown component."""
+
+ def __init__(self, ctx: commands.Context, formatted_times: list[str], epoch: int):
+ super().__init__(timeout=DROPDOWN_TIMEOUT)
+ self.ctx = ctx
+ self.epoch = epoch
+ self.dropdown: discord.ui.Select = self.children[0]
+ for label, date_time in zip(STYLES.keys(), formatted_times):
+ self.dropdown.add_option(label=label, description=date_time)
+
+ @discord.ui.select(placeholder="Select the format of your timestamp")
+ async def select_format(self, _: discord.ui.Select, interaction: discord.Interaction) -> discord.Message:
+ """Drop down menu which contains a list of formats which discord timestamps can take."""
+ selected = interaction.data["values"][0]
+ if selected == "Epoch":
+ return await interaction.response.edit_message(content=f"`{self.epoch}`")
+ return await interaction.response.edit_message(content=f"`<t:{self.epoch}:{STYLES[selected][0]}>`")
+
+ async def interaction_check(self, interaction: discord.Interaction) -> bool:
+ """Check to ensure that the interacting user is the user who invoked the command."""
+ if interaction.user != self.ctx.author:
+ embed = discord.Embed(description="Sorry, but this dropdown menu can only be used by the original author.")
+ await interaction.response.send_message(embed=embed, ephemeral=True)
+ return False
+ return True
+
+
+def setup(bot: Bot) -> None:
+ """Load the Epoch cog."""
+ bot.add_cog(Epoch())
diff --git a/bot/exts/utilities/githubinfo.py b/bot/exts/utilities/githubinfo.py
index d00b408d..963f54e5 100644
--- a/bot/exts/utilities/githubinfo.py
+++ b/bot/exts/utilities/githubinfo.py
@@ -1,30 +1,165 @@
import logging
import random
+import re
+import typing as t
+from dataclasses import dataclass
from datetime import datetime
-from urllib.parse import quote, quote_plus
+from urllib.parse import quote
import discord
+from aiohttp import ClientResponse
from discord.ext import commands
from bot.bot import Bot
-from bot.constants import Colours, NEGATIVE_REPLIES
+from bot.constants import Colours, ERROR_REPLIES, Emojis, NEGATIVE_REPLIES, Tokens
from bot.exts.core.extensions import invoke_help_command
log = logging.getLogger(__name__)
GITHUB_API_URL = "https://api.github.com"
+REQUEST_HEADERS = {
+ "Accept": "application/vnd.github.v3+json"
+}
+
+REPOSITORY_ENDPOINT = "https://api.github.com/orgs/{org}/repos?per_page=100&type=public"
+ISSUE_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/issues/{number}"
+PR_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/pulls/{number}"
+
+if Tokens.github:
+ REQUEST_HEADERS["Authorization"] = f"token {Tokens.github}"
+
+CODE_BLOCK_RE = re.compile(
+ r"^`([^`\n]+)`" # Inline codeblock
+ r"|```(.+?)```", # Multiline codeblock
+ re.DOTALL | re.MULTILINE
+)
+
+# Maximum number of issues in one message
+MAXIMUM_ISSUES = 5
+
+# Regex used when looking for automatic linking in messages
+# regex101 of current regex https://regex101.com/r/V2ji8M/6
+AUTOMATIC_REGEX = re.compile(
+ r"((?P<org>[a-zA-Z0-9][a-zA-Z0-9\-]{1,39})\/)?(?P<repo>[\w\-\.]{1,100})#(?P<number>[0-9]+)"
+)
+
+
+@dataclass(eq=True, frozen=True)
+class FoundIssue:
+ """Dataclass representing an issue found by the regex."""
+
+ organisation: t.Optional[str]
+ repository: str
+ number: str
+
+
+@dataclass(eq=True, frozen=True)
+class FetchError:
+ """Dataclass representing an error while fetching an issue."""
+
+ return_code: int
+ message: str
+
+
+@dataclass(eq=True, frozen=True)
+class IssueState:
+ """Dataclass representing the state of an issue."""
+
+ repository: str
+ number: int
+ url: str
+ title: str
+ emoji: str
+
class GithubInfo(commands.Cog):
- """Fetches info from GitHub."""
+ """A Cog that fetches info from GitHub."""
def __init__(self, bot: Bot):
self.bot = bot
+ self.repos = []
+
+ @staticmethod
+ def remove_codeblocks(message: str) -> str:
+ """Remove any codeblock in a message."""
+ return CODE_BLOCK_RE.sub("", message)
+
+ async def fetch_issue(
+ self,
+ number: int,
+ repository: str,
+ user: str
+ ) -> t.Union[IssueState, FetchError]:
+ """
+ Retrieve an issue from a GitHub repository.
+
+ Returns IssueState on success, FetchError on failure.
+ """
+ url = ISSUE_ENDPOINT.format(user=user, repository=repository, number=number)
+ pulls_url = PR_ENDPOINT.format(user=user, repository=repository, number=number)
+
+ json_data, r = await self.fetch_data(url)
+
+ if r.status == 403:
+ if r.headers.get("X-RateLimit-Remaining") == "0":
+ log.info(f"Ratelimit reached while fetching {url}")
+ return FetchError(403, "Ratelimit reached, please retry in a few minutes.")
+ return FetchError(403, "Cannot access issue.")
+ elif r.status in (404, 410):
+ return FetchError(r.status, "Issue not found.")
+ elif r.status != 200:
+ return FetchError(r.status, "Error while fetching issue.")
+
+ # The initial API request is made to the issues API endpoint, which will return information
+ # if the issue or PR is present. However, the scope of information returned for PRs differs
+ # from issues: if the 'issues' key is present in the response then we can pull the data we
+ # need from the initial API call.
+ if "issues" in json_data["html_url"]:
+ if json_data.get("state") == "open":
+ emoji = Emojis.issue_open
+ else:
+ emoji = Emojis.issue_closed
+
+ # If the 'issues' key is not contained in the API response and there is no error code, then
+ # we know that a PR has been requested and a call to the pulls API endpoint is necessary
+ # to get the desired information for the PR.
+ else:
+ pull_data, _ = await self.fetch_data(pulls_url)
+ if pull_data["draft"]:
+ emoji = Emojis.pull_request_draft
+ elif pull_data["state"] == "open":
+ emoji = Emojis.pull_request_open
+ # When 'merged_at' is not None, this means that the state of the PR is merged
+ elif pull_data["merged_at"] is not None:
+ emoji = Emojis.pull_request_merged
+ else:
+ emoji = Emojis.pull_request_closed
+
+ issue_url = json_data.get("html_url")
- async def fetch_data(self, url: str) -> dict:
- """Retrieve data as a dictionary."""
- async with self.bot.http_session.get(url) as r:
- return await r.json()
+ return IssueState(repository, number, issue_url, json_data.get("title", ""), emoji)
+
+ @staticmethod
+ def format_embed(
+ results: t.List[t.Union[IssueState, FetchError]]
+ ) -> discord.Embed:
+ """Take a list of IssueState or FetchError and format a Discord embed for them."""
+ description_list = []
+
+ for result in results:
+ if isinstance(result, IssueState):
+ description_list.append(f"{result.emoji} [{result.title}]({result.url})")
+ elif isinstance(result, FetchError):
+ description_list.append(f":x: [{result.return_code}] {result.message}")
+
+ resp = discord.Embed(
+ colour=Colours.bright_green,
+ description="\n".join(description_list)
+ )
+
+ resp.set_author(name="GitHub")
+ return resp
@commands.group(name="github", aliases=("gh", "git"))
@commands.cooldown(1, 10, commands.BucketType.user)
@@ -33,11 +168,67 @@ class GithubInfo(commands.Cog):
if ctx.invoked_subcommand is None:
await invoke_help_command(ctx)
+ @commands.Cog.listener()
+ async def on_message(self, message: discord.Message) -> None:
+ """
+ Automatic issue linking.
+
+ Listener to retrieve issue(s) from a GitHub repository using automatic linking if matching <org>/<repo>#<issue>.
+ """
+ # Ignore bots
+ if message.author.bot:
+ return
+
+ issues = [
+ FoundIssue(*match.group("org", "repo", "number"))
+ for match in AUTOMATIC_REGEX.finditer(self.remove_codeblocks(message.content))
+ ]
+ links = []
+
+ if issues:
+ # Block this from working in DMs
+ if not message.guild:
+ return
+
+ log.trace(f"Found {issues = }")
+ # Remove duplicates
+ issues = set(issues)
+
+ if len(issues) > MAXIMUM_ISSUES:
+ embed = discord.Embed(
+ title=random.choice(ERROR_REPLIES),
+ color=Colours.soft_red,
+ description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})"
+ )
+ await message.channel.send(embed=embed, delete_after=5)
+ return
+
+ for repo_issue in issues:
+ result = await self.fetch_issue(
+ int(repo_issue.number),
+ repo_issue.repository,
+ repo_issue.organisation or "python-discord"
+ )
+ if isinstance(result, IssueState):
+ links.append(result)
+
+ if not links:
+ return
+
+ resp = self.format_embed(links)
+ await message.channel.send(embed=resp)
+
+ async def fetch_data(self, url: str) -> tuple[dict[str], ClientResponse]:
+ """Retrieve data as a dictionary and the response in a tuple."""
+ log.trace(f"Querying GH issues API: {url}")
+ async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r:
+ return await r.json(), r
+
@github_group.command(name="user", aliases=("userinfo",))
async def github_user_info(self, ctx: commands.Context, username: str) -> None:
"""Fetches a user's GitHub information."""
async with ctx.typing():
- user_data = await self.fetch_data(f"{GITHUB_API_URL}/users/{quote_plus(username)}")
+ user_data, _ = await self.fetch_data(f"{GITHUB_API_URL}/users/{username}")
# User_data will not have a message key if the user exists
if "message" in user_data:
@@ -50,7 +241,7 @@ class GithubInfo(commands.Cog):
await ctx.send(embed=embed)
return
- org_data = await self.fetch_data(user_data["organizations_url"])
+ org_data, _ = await self.fetch_data(user_data["organizations_url"])
orgs = [f"[{org['login']}](https://github.com/{org['login']})" for org in org_data]
orgs_to_add = " | ".join(orgs)
@@ -67,7 +258,7 @@ class GithubInfo(commands.Cog):
embed = discord.Embed(
title=f"`{user_data['login']}`'s GitHub profile info",
description=f"```\n{user_data['bio']}\n```\n" if user_data["bio"] else "",
- colour=discord.Colour.blurple(),
+ colour=discord.Colour.og_blurple(),
url=user_data["html_url"],
timestamp=datetime.strptime(user_data["created_at"], "%Y-%m-%dT%H:%M:%SZ")
)
@@ -91,10 +282,7 @@ class GithubInfo(commands.Cog):
)
if user_data["type"] == "User":
- embed.add_field(
- name="Gists",
- value=f"[{gists}](https://gist.github.com/{quote_plus(username, safe='')})"
- )
+ embed.add_field(name="Gists", value=f"[{gists}](https://gist.github.com/{quote(username, safe='')})")
embed.add_field(
name=f"Organization{'s' if len(orgs)!=1 else ''}",
@@ -123,7 +311,7 @@ class GithubInfo(commands.Cog):
return
async with ctx.typing():
- repo_data = await self.fetch_data(f"{GITHUB_API_URL}/repos/{quote(repo)}")
+ repo_data, _ = await self.fetch_data(f"{GITHUB_API_URL}/repos/{quote(repo)}")
# There won't be a message key if this repo exists
if "message" in repo_data:
@@ -139,7 +327,7 @@ class GithubInfo(commands.Cog):
embed = discord.Embed(
title=repo_data["name"],
description=repo_data["description"],
- colour=discord.Colour.blurple(),
+ colour=discord.Colour.og_blurple(),
url=repo_data["html_url"]
)
diff --git a/bot/exts/utilities/issues.py b/bot/exts/utilities/issues.py
deleted file mode 100644
index 8a7ebed0..00000000
--- a/bot/exts/utilities/issues.py
+++ /dev/null
@@ -1,275 +0,0 @@
-import logging
-import random
-import re
-from dataclasses import dataclass
-from typing import Optional, Union
-
-import discord
-from discord.ext import commands
-
-from bot.bot import Bot
-from bot.constants import (
- Categories,
- Channels,
- Colours,
- ERROR_REPLIES,
- Emojis,
- NEGATIVE_REPLIES,
- Tokens,
- WHITELISTED_CHANNELS
-)
-from bot.utils.decorators import whitelist_override
-from bot.utils.extensions import invoke_help_command
-
-log = logging.getLogger(__name__)
-
-BAD_RESPONSE = {
- 404: "Issue/pull request not located! Please enter a valid number!",
- 403: "Rate limit has been hit! Please try again later!"
-}
-REQUEST_HEADERS = {
- "Accept": "application/vnd.github.v3+json"
-}
-
-REPOSITORY_ENDPOINT = "https://api.github.com/orgs/{org}/repos?per_page=100&type=public"
-ISSUE_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/issues/{number}"
-PR_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/pulls/{number}"
-
-if GITHUB_TOKEN := Tokens.github:
- REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}"
-
-WHITELISTED_CATEGORIES = (
- Categories.development, Categories.devprojects, Categories.media, Categories.staff
-)
-
-CODE_BLOCK_RE = re.compile(
- r"^`([^`\n]+)`" # Inline codeblock
- r"|```(.+?)```", # Multiline codeblock
- re.DOTALL | re.MULTILINE
-)
-
-# Maximum number of issues in one message
-MAXIMUM_ISSUES = 5
-
-# Regex used when looking for automatic linking in messages
-# regex101 of current regex https://regex101.com/r/V2ji8M/6
-AUTOMATIC_REGEX = re.compile(
- r"((?P<org>[a-zA-Z0-9][a-zA-Z0-9\-]{1,39})\/)?(?P<repo>[\w\-\.]{1,100})#(?P<number>[0-9]+)"
-)
-
-
-@dataclass
-class FoundIssue:
- """Dataclass representing an issue found by the regex."""
-
- organisation: Optional[str]
- repository: str
- number: str
-
- def __hash__(self) -> int:
- return hash((self.organisation, self.repository, self.number))
-
-
-@dataclass
-class FetchError:
- """Dataclass representing an error while fetching an issue."""
-
- return_code: int
- message: str
-
-
-@dataclass
-class IssueState:
- """Dataclass representing the state of an issue."""
-
- repository: str
- number: int
- url: str
- title: str
- emoji: str
-
-
-class Issues(commands.Cog):
- """Cog that allows users to retrieve issues from GitHub."""
-
- def __init__(self, bot: Bot):
- self.bot = bot
- self.repos = []
-
- @staticmethod
- def remove_codeblocks(message: str) -> str:
- """Remove any codeblock in a message."""
- return re.sub(CODE_BLOCK_RE, "", message)
-
- async def fetch_issues(
- self,
- number: int,
- repository: str,
- user: str
- ) -> Union[IssueState, FetchError]:
- """
- Retrieve an issue from a GitHub repository.
-
- Returns IssueState on success, FetchError on failure.
- """
- url = ISSUE_ENDPOINT.format(user=user, repository=repository, number=number)
- pulls_url = PR_ENDPOINT.format(user=user, repository=repository, number=number)
- log.trace(f"Querying GH issues API: {url}")
-
- async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r:
- json_data = await r.json()
-
- if r.status == 403:
- if r.headers.get("X-RateLimit-Remaining") == "0":
- log.info(f"Ratelimit reached while fetching {url}")
- return FetchError(403, "Ratelimit reached, please retry in a few minutes.")
- return FetchError(403, "Cannot access issue.")
- elif r.status in (404, 410):
- return FetchError(r.status, "Issue not found.")
- elif r.status != 200:
- return FetchError(r.status, "Error while fetching issue.")
-
- # The initial API request is made to the issues API endpoint, which will return information
- # if the issue or PR is present. However, the scope of information returned for PRs differs
- # from issues: if the 'issues' key is present in the response then we can pull the data we
- # need from the initial API call.
- if "issues" in json_data["html_url"]:
- if json_data.get("state") == "open":
- emoji = Emojis.issue_open
- else:
- emoji = Emojis.issue_closed
-
- # If the 'issues' key is not contained in the API response and there is no error code, then
- # we know that a PR has been requested and a call to the pulls API endpoint is necessary
- # to get the desired information for the PR.
- else:
- log.trace(f"PR provided, querying GH pulls API for additional information: {pulls_url}")
- async with self.bot.http_session.get(pulls_url) as p:
- pull_data = await p.json()
- if pull_data["draft"]:
- emoji = Emojis.pull_request_draft
- elif pull_data["state"] == "open":
- emoji = Emojis.pull_request_open
- # When 'merged_at' is not None, this means that the state of the PR is merged
- elif pull_data["merged_at"] is not None:
- emoji = Emojis.pull_request_merged
- else:
- emoji = Emojis.pull_request_closed
-
- issue_url = json_data.get("html_url")
-
- return IssueState(repository, number, issue_url, json_data.get("title", ""), emoji)
-
- @staticmethod
- def format_embed(
- results: list[Union[IssueState, FetchError]],
- user: str,
- repository: Optional[str] = None
- ) -> discord.Embed:
- """Take a list of IssueState or FetchError and format a Discord embed for them."""
- description_list = []
-
- for result in results:
- if isinstance(result, IssueState):
- description_list.append(f"{result.emoji} [{result.title}]({result.url})")
- elif isinstance(result, FetchError):
- description_list.append(f":x: [{result.return_code}] {result.message}")
-
- resp = discord.Embed(
- colour=Colours.bright_green,
- description="\n".join(description_list)
- )
-
- embed_url = f"https://github.com/{user}/{repository}" if repository else f"https://github.com/{user}"
- resp.set_author(name="GitHub", url=embed_url)
- return resp
-
- @whitelist_override(channels=WHITELISTED_CHANNELS, categories=WHITELISTED_CATEGORIES)
- @commands.command(aliases=("pr",))
- async def issue(
- self,
- ctx: commands.Context,
- numbers: commands.Greedy[int],
- repository: str = "sir-lancebot",
- user: str = "python-discord"
- ) -> None:
- """Command to retrieve issue(s) from a GitHub repository."""
- # Remove duplicates
- numbers = set(numbers)
-
- if len(numbers) > MAXIMUM_ISSUES:
- embed = discord.Embed(
- title=random.choice(ERROR_REPLIES),
- color=Colours.soft_red,
- description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})"
- )
- await ctx.send(embed=embed)
- await invoke_help_command(ctx)
-
- results = [await self.fetch_issues(number, repository, user) for number in numbers]
- await ctx.send(embed=self.format_embed(results, user, repository))
-
- @commands.Cog.listener()
- async def on_message(self, message: discord.Message) -> None:
- """
- Automatic issue linking.
-
- Listener to retrieve issue(s) from a GitHub repository using automatic linking if matching <org>/<repo>#<issue>.
- """
- # Ignore bots
- if message.author.bot:
- return
-
- issues = [
- FoundIssue(*match.group("org", "repo", "number"))
- for match in AUTOMATIC_REGEX.finditer(self.remove_codeblocks(message.content))
- ]
- links = []
-
- if issues:
- # Block this from working in DMs
- if not message.guild:
- await message.channel.send(
- embed=discord.Embed(
- title=random.choice(NEGATIVE_REPLIES),
- description=(
- "You can't retrieve issues from DMs. "
- f"Try again in <#{Channels.community_bot_commands}>"
- ),
- colour=Colours.soft_red
- )
- )
- return
-
- log.trace(f"Found {issues = }")
- # Remove duplicates
- issues = set(issues)
-
- if len(issues) > MAXIMUM_ISSUES:
- embed = discord.Embed(
- title=random.choice(ERROR_REPLIES),
- color=Colours.soft_red,
- description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})"
- )
- await message.channel.send(embed=embed, delete_after=5)
- return
-
- for repo_issue in issues:
- result = await self.fetch_issues(
- int(repo_issue.number),
- repo_issue.repository,
- repo_issue.organisation or "python-discord"
- )
- if isinstance(result, IssueState):
- links.append(result)
-
- if not links:
- return
-
- resp = self.format_embed(links, "python-discord")
- await message.channel.send(embed=resp)
-
-
-def setup(bot: Bot) -> None:
- """Load the Issues cog."""
- bot.add_cog(Issues(bot))
diff --git a/bot/exts/utilities/latex.py b/bot/exts/utilities/latex.py
deleted file mode 100644
index 36c7e0ab..00000000
--- a/bot/exts/utilities/latex.py
+++ /dev/null
@@ -1,101 +0,0 @@
-import asyncio
-import hashlib
-import pathlib
-import re
-from concurrent.futures import ThreadPoolExecutor
-from io import BytesIO
-
-import discord
-import matplotlib.pyplot as plt
-from discord.ext import commands
-
-from bot.bot import Bot
-
-# configure fonts and colors for matplotlib
-plt.rcParams.update(
- {
- "font.size": 16,
- "mathtext.fontset": "cm", # Computer Modern font set
- "mathtext.rm": "serif",
- "figure.facecolor": "36393F", # matches Discord's dark mode background color
- "text.color": "white",
- }
-)
-
-FORMATTED_CODE_REGEX = re.compile(
- r"(?P<delim>(?P<block>```)|``?)" # code delimiter: 1-3 backticks; (?P=block) only matches if it's a block
- r"(?(block)(?:(?P<lang>[a-z]+)\n)?)" # if we're in a block, match optional language (only letters plus newline)
- r"(?:[ \t]*\n)*" # any blank (empty or tabs/spaces only) lines before the code
- r"(?P<code>.*?)" # extract all code inside the markup
- r"\s*" # any more whitespace before the end of the code markup
- r"(?P=delim)", # match the exact same delimiter from the start again
- re.DOTALL | re.IGNORECASE, # "." also matches newlines, case insensitive
-)
-
-CACHE_DIRECTORY = pathlib.Path("_latex_cache")
-CACHE_DIRECTORY.mkdir(exist_ok=True)
-
-
-class Latex(commands.Cog):
- """Renders latex."""
-
- @staticmethod
- def _render(text: str, filepath: pathlib.Path) -> BytesIO:
- """
- Return the rendered image if latex compiles without errors, otherwise raise a BadArgument Exception.
-
- Saves rendered image to cache.
- """
- fig = plt.figure()
- rendered_image = BytesIO()
- fig.text(0, 1, text, horizontalalignment="left", verticalalignment="top")
-
- try:
- plt.savefig(rendered_image, bbox_inches="tight", dpi=600)
- except ValueError as e:
- raise commands.BadArgument(str(e))
-
- rendered_image.seek(0)
-
- with open(filepath, "wb") as f:
- f.write(rendered_image.getbuffer())
-
- return rendered_image
-
- @staticmethod
- def _prepare_input(text: str) -> str:
- text = text.replace(r"\\", "$\n$") # matplotlib uses \n for newlines, not \\
-
- if match := FORMATTED_CODE_REGEX.match(text):
- return match.group("code")
- else:
- return text
-
- @commands.command()
- @commands.max_concurrency(1, commands.BucketType.guild, wait=True)
- async def latex(self, ctx: commands.Context, *, text: str) -> None:
- """Renders the text in latex and sends the image."""
- text = self._prepare_input(text)
- query_hash = hashlib.md5(text.encode()).hexdigest()
- image_path = CACHE_DIRECTORY.joinpath(f"{query_hash}.png")
- async with ctx.typing():
- if image_path.exists():
- await ctx.send(file=discord.File(image_path))
- return
-
- with ThreadPoolExecutor() as pool:
- image = await asyncio.get_running_loop().run_in_executor(
- pool, self._render, text, image_path
- )
-
- await ctx.send(file=discord.File(image, "latex.png"))
-
-
-def setup(bot: Bot) -> None:
- """Load the Latex Cog."""
- # As we have resource issues on this cog,
- # we have it currently disabled while we fix it.
- import logging
- logging.info("Latex cog is currently disabled. It won't be loaded.")
- return
- bot.add_cog(Latex())
diff --git a/bot/exts/utilities/realpython.py b/bot/exts/utilities/realpython.py
index ef8b2638..bf8f1341 100644
--- a/bot/exts/utilities/realpython.py
+++ b/bot/exts/utilities/realpython.py
@@ -1,5 +1,6 @@
import logging
from html import unescape
+from typing import Optional
from urllib.parse import quote_plus
from discord import Embed
@@ -31,9 +32,18 @@ class RealPython(commands.Cog):
@commands.command(aliases=["rp"])
@commands.cooldown(1, 10, commands.cooldowns.BucketType.user)
- async def realpython(self, ctx: commands.Context, *, user_search: str) -> None:
- """Send 5 articles that match the user's search terms."""
- params = {"q": user_search, "limit": 5, "kind": "article"}
+ async def realpython(self, ctx: commands.Context, amount: Optional[int] = 5, *, user_search: str) -> None:
+ """
+ Send some articles from RealPython that match the search terms.
+
+ By default the top 5 matches are sent, this can be overwritten to
+ a number between 1 and 5 by specifying an amount before the search query.
+ """
+ if not 1 <= amount <= 5:
+ await ctx.send("`amount` must be between 1 and 5 (inclusive).")
+ return
+
+ params = {"q": user_search, "limit": amount, "kind": "article"}
async with self.bot.http_session.get(url=API_ROOT, params=params) as response:
if response.status != 200:
logger.error(
diff --git a/bot/exts/utilities/reddit.py b/bot/exts/utilities/reddit.py
index e6cb5337..782583d2 100644
--- a/bot/exts/utilities/reddit.py
+++ b/bot/exts/utilities/reddit.py
@@ -244,7 +244,7 @@ class Reddit(Cog):
# Use only starting summary page for #reddit channel posts.
embed.description = self.build_pagination_pages(posts, paginate=False)
- embed.colour = Colour.blurple()
+ embed.colour = Colour.og_blurple()
return embed
@loop()
@@ -312,7 +312,7 @@ class Reddit(Cog):
await ctx.send(f"Here are the top {subreddit} posts of all time!")
embed = Embed(
- color=Colour.blurple()
+ color=Colour.og_blurple()
)
await ImagePaginator.paginate(pages, ctx, embed)
@@ -325,7 +325,7 @@ class Reddit(Cog):
await ctx.send(f"Here are today's top {subreddit} posts!")
embed = Embed(
- color=Colour.blurple()
+ color=Colour.og_blurple()
)
await ImagePaginator.paginate(pages, ctx, embed)
@@ -338,7 +338,7 @@ class Reddit(Cog):
await ctx.send(f"Here are this week's top {subreddit} posts!")
embed = Embed(
- color=Colour.blurple()
+ color=Colour.og_blurple()
)
await ImagePaginator.paginate(pages, ctx, embed)
@@ -349,7 +349,7 @@ class Reddit(Cog):
"""Send a paginated embed of all the subreddits we're relaying."""
embed = Embed()
embed.title = "Relayed subreddits."
- embed.colour = Colour.blurple()
+ embed.colour = Colour.og_blurple()
await LinePaginator.paginate(
RedditConfig.subreddits,
diff --git a/bot/exts/utilities/twemoji.py b/bot/exts/utilities/twemoji.py
new file mode 100644
index 00000000..c915f05b
--- /dev/null
+++ b/bot/exts/utilities/twemoji.py
@@ -0,0 +1,150 @@
+import logging
+import re
+from typing import Literal, Optional
+
+import discord
+from discord.ext import commands
+from emoji import UNICODE_EMOJI_ENGLISH, is_emoji
+
+from bot.bot import Bot
+from bot.constants import Colours, Roles
+from bot.utils.decorators import whitelist_override
+from bot.utils.extensions import invoke_help_command
+
+log = logging.getLogger(__name__)
+BASE_URLS = {
+ "png": "https://raw.githubusercontent.com/twitter/twemoji/master/assets/72x72/",
+ "svg": "https://raw.githubusercontent.com/twitter/twemoji/master/assets/svg/",
+}
+CODEPOINT_REGEX = re.compile(r"[a-f1-9][a-f0-9]{3,5}$")
+
+
+class Twemoji(commands.Cog):
+ """Utilities for working with Twemojis."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ @staticmethod
+ def get_url(codepoint: str, format: Literal["png", "svg"]) -> str:
+ """Returns a source file URL for the specified Twemoji, in the corresponding format."""
+ return f"{BASE_URLS[format]}{codepoint}.{format}"
+
+ @staticmethod
+ def alias_to_name(alias: str) -> str:
+ """
+ Transform a unicode alias to an emoji name.
+
+ Example usages:
+ >>> alias_to_name(":falling_leaf:")
+ "Falling leaf"
+ >>> alias_to_name(":family_man_girl_boy:")
+ "Family man girl boy"
+ """
+ name = alias.strip(":").replace("_", " ")
+ return name.capitalize()
+
+ @staticmethod
+ def build_embed(codepoint: str) -> discord.Embed:
+ """Returns the main embed for the `twemoji` commmand."""
+ emoji = "".join(Twemoji.emoji(e) or "" for e in codepoint.split("-"))
+
+ embed = discord.Embed(
+ title=Twemoji.alias_to_name(UNICODE_EMOJI_ENGLISH[emoji]),
+ description=f"{codepoint.replace('-', ' ')}\n[Download svg]({Twemoji.get_url(codepoint, 'svg')})",
+ colour=Colours.twitter_blue,
+ )
+ embed.set_thumbnail(url=Twemoji.get_url(codepoint, "png"))
+ return embed
+
+ @staticmethod
+ def emoji(codepoint: Optional[str]) -> Optional[str]:
+ """
+ Returns the emoji corresponding to a given `codepoint`, or `None` if no emoji was found.
+
+ The return value is an emoji character, such as "πŸ‚". The `codepoint`
+ argument can be of any format, since it will be trimmed automatically.
+ """
+ if code := Twemoji.trim_code(codepoint):
+ return chr(int(code, 16))
+
+ @staticmethod
+ def codepoint(emoji: Optional[str]) -> Optional[str]:
+ """
+ Returns the codepoint, in a trimmed format, of a single emoji.
+
+ `emoji` should be an emoji character, such as "🐍" and "πŸ₯°", and
+ not a codepoint like "1f1f8". When working with combined emojis,
+ such as "πŸ‡ΈπŸ‡ͺ" and "πŸ‘¨β€πŸ‘©β€πŸ‘¦", send the component emojis through the method
+ one at a time.
+ """
+ if emoji is None:
+ return None
+ return hex(ord(emoji)).removeprefix("0x")
+
+ @staticmethod
+ def trim_code(codepoint: Optional[str]) -> Optional[str]:
+ """
+ Returns the meaningful information from the given `codepoint`.
+
+ If no codepoint is found, `None` is returned.
+
+ Example usages:
+ >>> trim_code("U+1f1f8")
+ "1f1f8"
+ >>> trim_code("\u0001f1f8")
+ "1f1f8"
+ >>> trim_code("1f466")
+ "1f466"
+ """
+ if code := CODEPOINT_REGEX.search(codepoint or ""):
+ return code.group()
+
+ @staticmethod
+ def codepoint_from_input(raw_emoji: tuple[str, ...]) -> str:
+ """
+ Returns the codepoint corresponding to the passed tuple, separated by "-".
+
+ The return format matches the format used in URLs for Twemoji source files.
+
+ Example usages:
+ >>> codepoint_from_input(("🐍",))
+ "1f40d"
+ >>> codepoint_from_input(("1f1f8", "1f1ea"))
+ "1f1f8-1f1ea"
+ >>> codepoint_from_input(("πŸ‘¨β€πŸ‘§β€πŸ‘¦",))
+ "1f468-200d-1f467-200d-1f466"
+ """
+ raw_emoji = [emoji.lower() for emoji in raw_emoji]
+ if is_emoji(raw_emoji[0]):
+ emojis = (Twemoji.codepoint(emoji) or "" for emoji in raw_emoji[0])
+ return "-".join(emojis)
+
+ emoji = "".join(
+ Twemoji.emoji(Twemoji.trim_code(code)) or "" for code in raw_emoji
+ )
+ if is_emoji(emoji):
+ return "-".join(Twemoji.codepoint(e) or "" for e in emoji)
+
+ raise ValueError("No codepoint could be obtained from the given input")
+
+ @commands.command(aliases=("tw",))
+ @whitelist_override(roles=(Roles.everyone,))
+ async def twemoji(self, ctx: commands.Context, *raw_emoji: str) -> None:
+ """Sends a preview of a given Twemoji, specified by codepoint or emoji."""
+ if len(raw_emoji) == 0:
+ await invoke_help_command(ctx)
+ return
+ try:
+ codepoint = self.codepoint_from_input(raw_emoji)
+ except ValueError:
+ raise commands.BadArgument(
+ "please include a valid emoji or emoji codepoint."
+ )
+
+ await ctx.send(embed=self.build_embed(codepoint))
+
+
+def setup(bot: Bot) -> None:
+ """Load the Twemoji cog."""
+ bot.add_cog(Twemoji(bot))
diff --git a/bot/exts/utilities/wikipedia.py b/bot/exts/utilities/wikipedia.py
index eccc1f8c..e5e8e289 100644
--- a/bot/exts/utilities/wikipedia.py
+++ b/bot/exts/utilities/wikipedia.py
@@ -82,13 +82,11 @@ class WikipediaSearch(commands.Cog):
if contents:
embed = Embed(
title="Wikipedia Search Results",
- colour=Color.blurple()
+ colour=Color.og_blurple()
)
embed.set_thumbnail(url=WIKI_THUMBNAIL)
embed.timestamp = datetime.utcnow()
- await LinePaginator.paginate(
- contents, ctx, embed
- )
+ await LinePaginator.paginate(contents, ctx, embed, restrict_to_user=ctx.author)
else:
await ctx.send(
"Sorry, we could not find a wikipedia article using that search term."
diff --git a/bot/exts/utilities/wtf_python.py b/bot/exts/utilities/wtf_python.py
new file mode 100644
index 00000000..980b3dba
--- /dev/null
+++ b/bot/exts/utilities/wtf_python.py
@@ -0,0 +1,138 @@
+import logging
+import random
+import re
+from typing import Optional
+
+import rapidfuzz
+from discord import Embed, File
+from discord.ext import commands, tasks
+
+from bot import constants
+from bot.bot import Bot
+
+log = logging.getLogger(__name__)
+
+WTF_PYTHON_RAW_URL = "http://raw.githubusercontent.com/satwikkansal/wtfpython/master/"
+BASE_URL = "https://github.com/satwikkansal/wtfpython"
+LOGO_PATH = "./bot/resources/utilities/wtf_python_logo.jpg"
+
+ERROR_MESSAGE = f"""
+Unknown WTF Python Query. Please try to reformulate your query.
+
+**Examples**:
+```md
+{constants.Client.prefix}wtf wild imports
+{constants.Client.prefix}wtf subclass
+{constants.Client.prefix}wtf del
+```
+If the problem persists send a message in <#{constants.Channels.dev_contrib}>
+"""
+
+MINIMUM_CERTAINTY = 55
+
+
+class WTFPython(commands.Cog):
+ """Cog that allows getting WTF Python entries from the WTF Python repository."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+ self.headers: dict[str, str] = {}
+ self.fetch_readme.start()
+
+ @tasks.loop(minutes=60)
+ async def fetch_readme(self) -> None:
+ """Gets the content of README.md from the WTF Python Repository."""
+ async with self.bot.http_session.get(f"{WTF_PYTHON_RAW_URL}README.md") as resp:
+ log.trace("Fetching the latest WTF Python README.md")
+ if resp.status == 200:
+ raw = await resp.text()
+ self.parse_readme(raw)
+
+ def parse_readme(self, data: str) -> None:
+ """
+ Parses the README.md into a dict.
+
+ It parses the readme into the `self.headers` dict,
+ where the key is the heading and the value is the
+ link to the heading.
+ """
+ # Match the start of examples, until the end of the table of contents (toc)
+ table_of_contents = re.search(
+ r"\[πŸ‘€ Examples\]\(#-examples\)\n([\w\W]*)<!-- tocstop -->", data
+ )[0].split("\n")
+
+ for header in list(map(str.strip, table_of_contents)):
+ match = re.search(r"\[β–Ά (.*)\]\((.*)\)", header)
+ if match:
+ hyper_link = match[0].split("(")[1].replace(")", "")
+ self.headers[match[0]] = f"{BASE_URL}/{hyper_link}"
+
+ def fuzzy_match_header(self, query: str) -> Optional[str]:
+ """
+ Returns the fuzzy match of a query if its ratio is above "MINIMUM_CERTAINTY" else returns None.
+
+ "MINIMUM_CERTAINTY" is the lowest score at which the fuzzy match will return a result.
+ The certainty returned by rapidfuzz.process.extractOne is a score between 0 and 100,
+ with 100 being a perfect match.
+ """
+ match, certainty, _ = rapidfuzz.process.extractOne(query, self.headers.keys())
+ return match if certainty > MINIMUM_CERTAINTY else None
+
+ @commands.command(aliases=("wtf", "WTF"))
+ async def wtf_python(self, ctx: commands.Context, *, query: Optional[str] = None) -> None:
+ """
+ Search WTF Python repository.
+
+ Gets the link of the fuzzy matched query from https://github.com/satwikkansal/wtfpython.
+ Usage:
+ --> .wtf wild imports
+ """
+ if query is None:
+ no_query_embed = Embed(
+ title="WTF Python?!",
+ colour=constants.Colours.dark_green,
+ description="A repository filled with suprising snippets that can make you say WTF?!\n\n"
+ f"[Go to the Repository]({BASE_URL})"
+ )
+ logo = File(LOGO_PATH, filename="wtf_logo.jpg")
+ no_query_embed.set_thumbnail(url="attachment://wtf_logo.jpg")
+ await ctx.send(embed=no_query_embed, file=logo)
+ return
+
+ if len(query) > 50:
+ embed = Embed(
+ title=random.choice(constants.ERROR_REPLIES),
+ description=ERROR_MESSAGE,
+ colour=constants.Colours.soft_red,
+ )
+ match = None
+ else:
+ match = self.fuzzy_match_header(query)
+
+ if not match:
+ embed = Embed(
+ title=random.choice(constants.ERROR_REPLIES),
+ description=ERROR_MESSAGE,
+ colour=constants.Colours.soft_red,
+ )
+ await ctx.send(embed=embed)
+ return
+
+ embed = Embed(
+ title="WTF Python?!",
+ colour=constants.Colours.dark_green,
+ description=f"""Search result for '{query}': {match.split("]")[0].replace("[", "")}
+ [Go to Repository Section]({self.headers[match]})""",
+ )
+ logo = File(LOGO_PATH, filename="wtf_logo.jpg")
+ embed.set_thumbnail(url="attachment://wtf_logo.jpg")
+ await ctx.send(embed=embed, file=logo)
+
+ def cog_unload(self) -> None:
+ """Unload the cog and cancel the task."""
+ self.fetch_readme.cancel()
+
+
+def setup(bot: Bot) -> None:
+ """Load the WTFPython Cog."""
+ bot.add_cog(WTFPython(bot))