aboutsummaryrefslogtreecommitdiffstats
path: root/bot/exts
diff options
context:
space:
mode:
Diffstat (limited to 'bot/exts')
-rw-r--r--bot/exts/avatar_modification/avatar_modify.py2
-rw-r--r--bot/exts/core/error_handler.py10
-rw-r--r--bot/exts/core/help.py5
-rw-r--r--bot/exts/core/internal_eval/_internal_eval.py7
-rw-r--r--bot/exts/events/advent_of_code/_cog.py276
-rw-r--r--bot/exts/events/advent_of_code/_helpers.py63
-rw-r--r--bot/exts/events/advent_of_code/views/dayandstarview.py11
-rw-r--r--bot/exts/fun/snakes/_utils.py25
-rw-r--r--bot/exts/fun/trivia_quiz.py4
-rw-r--r--bot/exts/holidays/easter/earth_photos.py3
-rw-r--r--bot/exts/holidays/halloween/candy_collection.py16
-rw-r--r--bot/exts/holidays/halloween/scarymovie.py1
-rw-r--r--bot/exts/holidays/hanukkah/hanukkah_embed.py84
-rw-r--r--bot/exts/holidays/valentines/be_my_valentine.py40
-rw-r--r--bot/exts/utilities/bookmark.py2
-rw-r--r--bot/exts/utilities/challenges.py44
-rw-r--r--bot/exts/utilities/colour.py266
-rw-r--r--bot/exts/utilities/issues.py9
-rw-r--r--bot/exts/utilities/latex.py101
-rw-r--r--bot/exts/utilities/realpython.py16
-rw-r--r--bot/exts/utilities/wikipedia.py4
-rw-r--r--bot/exts/utilities/wtf_python.py138
22 files changed, 828 insertions, 299 deletions
diff --git a/bot/exts/avatar_modification/avatar_modify.py b/bot/exts/avatar_modification/avatar_modify.py
index fbee96dc..3ee70cfd 100644
--- a/bot/exts/avatar_modification/avatar_modify.py
+++ b/bot/exts/avatar_modification/avatar_modify.py
@@ -286,7 +286,7 @@ class AvatarModify(commands.Cog):
@avatar_modify.command(
aliases=("savatar", "spookify"),
root_aliases=("spookyavatar", "spookify", "savatar"),
- brief="Spookify an user's avatar."
+ brief="Spookify a user's avatar."
)
async def spookyavatar(self, ctx: commands.Context) -> None:
"""This "spookifies" the user's avatar, with a random *spooky* effect."""
diff --git a/bot/exts/core/error_handler.py b/bot/exts/core/error_handler.py
index fd2123e7..676a1e70 100644
--- a/bot/exts/core/error_handler.py
+++ b/bot/exts/core/error_handler.py
@@ -12,7 +12,7 @@ from sentry_sdk import push_scope
from bot.bot import Bot
from bot.constants import Channels, Colours, ERROR_REPLIES, NEGATIVE_REPLIES, RedirectOutput
from bot.utils.decorators import InChannelCheckFailure, InMonthCheckFailure
-from bot.utils.exceptions import APIError, UserNotPlayingError
+from bot.utils.exceptions import APIError, MovedCommandError, UserNotPlayingError
log = logging.getLogger(__name__)
@@ -130,6 +130,14 @@ class CommandErrorHandler(commands.Cog):
)
return
+ if isinstance(error, MovedCommandError):
+ description = (
+ f"This command, `{ctx.prefix}{ctx.command.qualified_name}` has moved to `{error.new_command_name}`.\n"
+ f"Please use `{error.new_command_name}` instead."
+ )
+ await ctx.send(embed=self.error_embed(description, NEGATIVE_REPLIES))
+ return
+
with push_scope() as scope:
scope.user = {
"id": ctx.author.id,
diff --git a/bot/exts/core/help.py b/bot/exts/core/help.py
index 4b766b50..db3c2aa6 100644
--- a/bot/exts/core/help.py
+++ b/bot/exts/core/help.py
@@ -13,10 +13,7 @@ from rapidfuzz import process
from bot import constants
from bot.bot import Bot
from bot.constants import Emojis
-from bot.utils.pagination import (
- FIRST_EMOJI, LAST_EMOJI,
- LEFT_EMOJI, LinePaginator, RIGHT_EMOJI,
-)
+from bot.utils.pagination import FIRST_EMOJI, LAST_EMOJI, LEFT_EMOJI, LinePaginator, RIGHT_EMOJI
DELETE_EMOJI = Emojis.trashcan
diff --git a/bot/exts/core/internal_eval/_internal_eval.py b/bot/exts/core/internal_eval/_internal_eval.py
index 4f6b4321..5b5461f0 100644
--- a/bot/exts/core/internal_eval/_internal_eval.py
+++ b/bot/exts/core/internal_eval/_internal_eval.py
@@ -10,6 +10,7 @@ from bot.bot import Bot
from bot.constants import Client, Roles
from bot.utils.decorators import with_role
from bot.utils.extensions import invoke_help_command
+
from ._helpers import EvalContext
__all__ = ["InternalEval"]
@@ -146,14 +147,14 @@ class InternalEval(commands.Cog):
await self._send_output(ctx, eval_context.format_output())
@commands.group(name="internal", aliases=("int",))
- @with_role(Roles.admin)
+ @with_role(Roles.admins)
async def internal_group(self, ctx: commands.Context) -> None:
"""Internal commands. Top secret!"""
if not ctx.invoked_subcommand:
await invoke_help_command(ctx)
@internal_group.command(name="eval", aliases=("e",))
- @with_role(Roles.admin)
+ @with_role(Roles.admins)
async def eval(self, ctx: commands.Context, *, code: str) -> None:
"""Run eval in a REPL-like format."""
if match := list(FORMATTED_CODE_REGEX.finditer(code)):
@@ -172,7 +173,7 @@ class InternalEval(commands.Cog):
await self._eval(ctx, code)
@internal_group.command(name="reset", aliases=("clear", "exit", "r", "c"))
- @with_role(Roles.admin)
+ @with_role(Roles.admins)
async def reset(self, ctx: commands.Context) -> None:
"""Reset the context and locals of the eval session."""
self.locals = {}
diff --git a/bot/exts/events/advent_of_code/_cog.py b/bot/exts/events/advent_of_code/_cog.py
index 7dd967ec..c597fd0e 100644
--- a/bot/exts/events/advent_of_code/_cog.py
+++ b/bot/exts/events/advent_of_code/_cog.py
@@ -6,14 +6,16 @@ from typing import Optional
import arrow
import discord
-from discord.ext import commands
+from async_rediscache import RedisCache
+from discord.ext import commands, tasks
from bot.bot import Bot
from bot.constants import (
- AdventOfCode as AocConfig, Channels, Colours, Emojis, Month, Roles, WHITELISTED_CHANNELS,
+ AdventOfCode as AocConfig, Channels, Client, Colours, Emojis, Month, Roles, WHITELISTED_CHANNELS
)
from bot.exts.events.advent_of_code import _helpers
from bot.exts.events.advent_of_code.views.dayandstarview import AoCDropdownView
+from bot.utils import members
from bot.utils.decorators import InChannelCheckFailure, in_month, whitelist_override, with_role
from bot.utils.extensions import invoke_help_command
@@ -31,6 +33,14 @@ AOC_WHITELIST = AOC_WHITELIST_RESTRICTED + (Channels.advent_of_code,)
class AdventOfCode(commands.Cog):
"""Advent of Code festivities! Ho Ho Ho!"""
+ # Redis Cache for linking Discord IDs to Advent of Code usernames
+ # RedisCache[member_id: aoc_username_string]
+ account_links = RedisCache()
+
+ # A dict with keys of member_ids to block from getting the role
+ # RedisCache[member_id: None]
+ completionist_block_list = RedisCache()
+
def __init__(self, bot: Bot):
self.bot = bot
@@ -50,6 +60,59 @@ class AdventOfCode(commands.Cog):
self.status_task.set_name("AoC Status Countdown")
self.status_task.add_done_callback(_helpers.background_task_callback)
+ self.completionist_task.start()
+
+ @tasks.loop(minutes=10.0)
+ async def completionist_task(self) -> None:
+ """
+ Give members who have completed all 50 AoC stars the completionist role.
+
+ Runs on a schedule, as defined in the task.loop decorator.
+ """
+ await self.bot.wait_until_guild_available()
+ guild = self.bot.get_guild(Client.guild)
+ completionist_role = guild.get_role(Roles.aoc_completionist)
+ if completionist_role is None:
+ log.warning("Could not find the AoC completionist role; cancelling completionist task.")
+ self.completionist_task.cancel()
+ return
+
+ aoc_name_to_member_id = {
+ aoc_name: member_id
+ for member_id, aoc_name in await self.account_links.items()
+ }
+
+ try:
+ leaderboard = await _helpers.fetch_leaderboard()
+ except _helpers.FetchingLeaderboardFailedError:
+ await self.bot.send_log("Unable to fetch AoC leaderboard during role sync.")
+ return
+
+ placement_leaderboard = json.loads(leaderboard["placement_leaderboard"])
+
+ for member_aoc_info in placement_leaderboard.values():
+ if not member_aoc_info["stars"] == 50:
+ # Only give the role to people who have completed all 50 stars
+ continue
+
+ member_id = aoc_name_to_member_id.get(member_aoc_info["name"], None)
+ if not member_id:
+ log.debug(f"Could not find member_id for {member_aoc_info['name']}, not giving role.")
+ continue
+
+ member = await members.get_or_fetch_member(guild, member_id)
+ if member is None:
+ log.debug(f"Could not find {member_id}, not giving role.")
+ continue
+
+ if completionist_role in member.roles:
+ log.debug(f"{member.name} ({member.mention}) already has the completionist role.")
+ continue
+
+ if not await self.completionist_block_list.contains(member_id):
+ log.debug(f"Giving completionist role to {member.name} ({member.mention}).")
+ await members.handle_role_change(member, member.add_roles, completionist_role)
+
@commands.group(name="adventofcode", aliases=("aoc",))
@whitelist_override(channels=AOC_WHITELIST)
async def adventofcode_group(self, ctx: commands.Context) -> None:
@@ -57,6 +120,21 @@ class AdventOfCode(commands.Cog):
if not ctx.invoked_subcommand:
await invoke_help_command(ctx)
+ @with_role(Roles.admins)
+ @adventofcode_group.command(
+ name="block",
+ brief="Block a user from getting the completionist role.",
+ )
+ async def block_from_role(self, ctx: commands.Context, member: discord.Member) -> None:
+ """Block the given member from receiving the AoC completionist role, removing it from them if needed."""
+ completionist_role = ctx.guild.get_role(Roles.aoc_completionist)
+ if completionist_role in member.roles:
+ await member.remove_roles(completionist_role)
+
+ await self.completionist_block_list.set(member.id, "sentinel")
+ await ctx.send(f":+1: Blocked {member.mention} from getting the AoC completionist role.")
+
+ @commands.guild_only()
@adventofcode_group.command(
name="subscribe",
aliases=("sub", "notifications", "notify", "notifs"),
@@ -86,6 +164,7 @@ class AdventOfCode(commands.Cog):
)
@in_month(Month.DECEMBER)
+ @commands.guild_only()
@adventofcode_group.command(name="unsubscribe", aliases=("unsub",), brief="Notifications for new days")
@whitelist_override(channels=AOC_WHITELIST)
async def aoc_unsubscribe(self, ctx: commands.Context) -> None:
@@ -102,32 +181,26 @@ class AdventOfCode(commands.Cog):
@whitelist_override(channels=AOC_WHITELIST)
async def aoc_countdown(self, ctx: commands.Context) -> None:
"""Return time left until next day."""
- if not _helpers.is_in_advent():
- datetime_now = arrow.now(_helpers.EST)
-
- # Calculate the delta to this & next year's December 1st to see which one is closest and not in the past
- this_year = arrow.get(datetime(datetime_now.year, 12, 1), _helpers.EST)
- next_year = arrow.get(datetime(datetime_now.year + 1, 12, 1), _helpers.EST)
- deltas = (dec_first - datetime_now for dec_first in (this_year, next_year))
- delta = min(delta for delta in deltas if delta >= timedelta()) # timedelta() gives 0 duration delta
-
- # Add a finer timedelta if there's less than a day left
- if delta.days == 0:
- delta_str = f"approximately {delta.seconds // 3600} hours"
- else:
- delta_str = f"{delta.days} days"
+ if _helpers.is_in_advent():
+ tomorrow, _ = _helpers.time_left_to_est_midnight()
+ next_day_timestamp = int(tomorrow.timestamp())
- await ctx.send(
- "The Advent of Code event is not currently running. "
- f"The next event will start in {delta_str}."
- )
+ await ctx.send(f"Day {tomorrow.day} starts <t:{next_day_timestamp}:R>.")
return
- tomorrow, time_left = _helpers.time_left_to_est_midnight()
+ datetime_now = arrow.now(_helpers.EST)
+ # Calculate the delta to this & next year's December 1st to see which one is closest and not in the past
+ this_year = arrow.get(datetime(datetime_now.year, 12, 1), _helpers.EST)
+ next_year = arrow.get(datetime(datetime_now.year + 1, 12, 1), _helpers.EST)
+ deltas = (dec_first - datetime_now for dec_first in (this_year, next_year))
+ delta = min(delta for delta in deltas if delta >= timedelta()) # timedelta() gives 0 duration delta
- hours, minutes = time_left.seconds // 3600, time_left.seconds // 60 % 60
+ next_aoc_timestamp = int((datetime_now + delta).timestamp())
- await ctx.send(f"There are {hours} hours and {minutes} minutes left until day {tomorrow.day}.")
+ await ctx.send(
+ "The Advent of Code event is not currently running. "
+ f"The next event will start <t:{next_aoc_timestamp}:R>."
+ )
@adventofcode_group.command(name="about", aliases=("ab", "info"), brief="Learn about Advent of Code")
@whitelist_override(channels=AOC_WHITELIST)
@@ -135,6 +208,7 @@ class AdventOfCode(commands.Cog):
"""Respond with an explanation of all things Advent of Code."""
await ctx.send(embed=self.cached_about_aoc)
+ @commands.guild_only()
@adventofcode_group.command(name="join", aliases=("j",), brief="Learn how to join the leaderboard (via DM)")
@whitelist_override(channels=AOC_WHITELIST)
async def join_leaderboard(self, ctx: commands.Context) -> None:
@@ -180,26 +254,93 @@ class AdventOfCode(commands.Cog):
else:
await ctx.message.add_reaction(Emojis.envelope)
+ @in_month(Month.NOVEMBER, Month.DECEMBER)
+ @adventofcode_group.command(
+ name="link",
+ aliases=("connect",),
+ brief="Tie your Discord account with your Advent of Code name."
+ )
+ @whitelist_override(channels=AOC_WHITELIST)
+ async def aoc_link_account(self, ctx: commands.Context, *, aoc_name: str = None) -> None:
+ """
+ Link your Discord Account to your Advent of Code name.
+
+ Stored in a Redis Cache with the format of `Discord ID: Advent of Code Name`
+ """
+ cache_items = await self.account_links.items()
+ cache_aoc_names = [value for _, value in cache_items]
+
+ if aoc_name:
+ # Let's check the current values in the cache to make sure it isn't already tied to a different account
+ if aoc_name == await self.account_links.get(ctx.author.id):
+ await ctx.reply(f"{aoc_name} is already tied to your account.")
+ return
+ elif aoc_name in cache_aoc_names:
+ log.info(
+ f"{ctx.author} ({ctx.author.id}) tried to connect their account to {aoc_name},"
+ " but it's already connected to another user."
+ )
+ await ctx.reply(
+ f"{aoc_name} is already tied to another account."
+ " Please contact an admin if you believe this is an error."
+ )
+ return
+
+ # Update an existing link
+ if old_aoc_name := await self.account_links.get(ctx.author.id):
+ log.info(f"Changing link for {ctx.author} ({ctx.author.id}) from {old_aoc_name} to {aoc_name}.")
+ await self.account_links.set(ctx.author.id, aoc_name)
+ await ctx.reply(f"Your linked account has been changed to {aoc_name}.")
+ else:
+ # Create a new link
+ log.info(f"Linking {ctx.author} ({ctx.author.id}) to account {aoc_name}.")
+ await self.account_links.set(ctx.author.id, aoc_name)
+ await ctx.reply(f"You have linked your Discord ID to {aoc_name}.")
+ else:
+ # User has not supplied a name, let's check if they're in the cache or not
+ if cache_name := await self.account_links.get(ctx.author.id):
+ await ctx.reply(f"You have already linked an Advent of Code account: {cache_name}.")
+ else:
+ await ctx.reply(
+ "You have not linked an Advent of Code account."
+ " Please re-run the command with one specified."
+ )
+
+ @in_month(Month.NOVEMBER, Month.DECEMBER)
+ @adventofcode_group.command(
+ name="unlink",
+ aliases=("disconnect",),
+ brief="Tie your Discord account with your Advent of Code name."
+ )
+ @whitelist_override(channels=AOC_WHITELIST)
+ async def aoc_unlink_account(self, ctx: commands.Context) -> None:
+ """
+ Unlink your Discord ID with your Advent of Code leaderboard name.
+
+ Deletes the entry that was Stored in the Redis cache.
+ """
+ if aoc_cache_name := await self.account_links.get(ctx.author.id):
+ log.info(f"Unlinking {ctx.author} ({ctx.author.id}) from Advent of Code account {aoc_cache_name}")
+ await self.account_links.delete(ctx.author.id)
+ await ctx.reply(f"We have removed the link between your Discord ID and {aoc_cache_name}.")
+ else:
+ log.info(f"Attempted to unlink {ctx.author} ({ctx.author.id}), but no link was found.")
+ await ctx.reply("You don't have an Advent of Code account linked.")
+
@in_month(Month.DECEMBER)
@adventofcode_group.command(
- name="leaderboard",
- aliases=("board", "lb"),
- brief="Get a snapshot of the PyDis private AoC leaderboard",
+ name="dayandstar",
+ aliases=("daynstar", "daystar"),
+ brief="Get a view that lets you filter the leaderboard by day and star",
)
@whitelist_override(channels=AOC_WHITELIST_RESTRICTED)
- async def aoc_leaderboard(
+ async def aoc_day_and_star_leaderboard(
self,
ctx: commands.Context,
- day_and_star: Optional[bool] = False,
- maximum_scorers: Optional[int] = 10
+ maximum_scorers_day_and_star: Optional[int] = 10
) -> None:
- """
- Get the current top scorers of the Python Discord Leaderboard.
-
- Additionally, you can provide an argument `day_and_star` (Boolean) to have the bot send a View
- that will let you filter by day and star.
- """
- if maximum_scorers > AocConfig.max_day_and_star_results or maximum_scorers <= 0:
+ """Have the bot send a View that will let you filter the leaderboard by day and star."""
+ if maximum_scorers_day_and_star > AocConfig.max_day_and_star_results or maximum_scorers_day_and_star <= 0:
raise commands.BadArgument(
f"The maximum number of results you can query is {AocConfig.max_day_and_star_results}"
)
@@ -209,25 +350,12 @@ class AdventOfCode(commands.Cog):
except _helpers.FetchingLeaderboardFailedError:
await ctx.send(":x: Unable to fetch leaderboard!")
return
- if not day_and_star:
-
- number_of_participants = leaderboard["number_of_participants"]
-
- top_count = min(AocConfig.leaderboard_displayed_members, number_of_participants)
- header = f"Here's our current top {top_count}! {Emojis.christmas_tree * 3}"
-
- table = f"```\n{leaderboard['top_leaderboard']}\n```"
- info_embed = _helpers.get_summary_embed(leaderboard)
-
- await ctx.send(content=f"{header}\n\n{table}", embed=info_embed)
- return
-
# This is a dictionary that contains solvers in respect of day, and star.
# e.g. 1-1 means the solvers of the first star of the first day and their completion time
per_day_and_star = json.loads(leaderboard['leaderboard_per_day_and_star'])
view = AoCDropdownView(
day_and_star_data=per_day_and_star,
- maximum_scorers=maximum_scorers,
+ maximum_scorers=maximum_scorers_day_and_star,
original_author=ctx.author
)
message = await ctx.send(
@@ -239,6 +367,51 @@ class AdventOfCode(commands.Cog):
@in_month(Month.DECEMBER)
@adventofcode_group.command(
+ name="leaderboard",
+ aliases=("board", "lb"),
+ brief="Get a snapshot of the PyDis private AoC leaderboard",
+ )
+ @whitelist_override(channels=AOC_WHITELIST_RESTRICTED)
+ async def aoc_leaderboard(self, ctx: commands.Context, *, aoc_name: Optional[str] = None) -> None:
+ """
+ Get the current top scorers of the Python Discord Leaderboard.
+
+ Additionally you can specify an `aoc_name` that will append the
+ specified profile's personal stats to the top of the leaderboard
+ """
+ # Strip quotes from the AoC username if needed (e.g. "My Name" -> My Name)
+ # This is to keep compatibility with those already used to wrapping the AoC name in quotes
+ # Note: only strips one layer of quotes to allow names with quotes at the start and end
+ # e.g. ""My Name"" -> "My Name"
+ if aoc_name and aoc_name.startswith('"') and aoc_name.endswith('"'):
+ aoc_name = aoc_name[1:-1]
+
+ # Check if an advent of code account is linked in the Redis Cache if aoc_name is not given
+ if (aoc_cache_name := await self.account_links.get(ctx.author.id)) and aoc_name is None:
+ aoc_name = aoc_cache_name
+
+ async with ctx.typing():
+ try:
+ leaderboard = await _helpers.fetch_leaderboard(self_placement_name=aoc_name)
+ except _helpers.FetchingLeaderboardFailedError:
+ await ctx.send(":x: Unable to fetch leaderboard!")
+ return
+
+ number_of_participants = leaderboard["number_of_participants"]
+
+ top_count = min(AocConfig.leaderboard_displayed_members, number_of_participants)
+ self_placement_header = " (and your personal stats compared to the top 10)" if aoc_name else ""
+ header = f"Here's our current top {top_count}{self_placement_header}! {Emojis.christmas_tree * 3}"
+ table = "```\n" \
+ f"{leaderboard['placement_leaderboard'] if aoc_name else leaderboard['top_leaderboard']}" \
+ "\n```"
+ info_embed = _helpers.get_summary_embed(leaderboard)
+
+ await ctx.send(content=f"{header}\n\n{table}", embed=info_embed)
+ return
+
+ @in_month(Month.DECEMBER)
+ @adventofcode_group.command(
name="global",
aliases=("globalboard", "gb"),
brief="Get a link to the global leaderboard",
@@ -284,7 +457,7 @@ class AdventOfCode(commands.Cog):
info_embed = _helpers.get_summary_embed(leaderboard)
await ctx.send(f"```\n{table}\n```", embed=info_embed)
- @with_role(Roles.admin)
+ @with_role(Roles.admins)
@adventofcode_group.command(
name="refresh",
aliases=("fetch",),
@@ -310,6 +483,7 @@ class AdventOfCode(commands.Cog):
log.debug("Unloading the cog and canceling the background task.")
self.notification_task.cancel()
self.status_task.cancel()
+ self.completionist_task.cancel()
def _build_about_embed(self) -> discord.Embed:
"""Build and return the informational "About AoC" embed from the resources file."""
diff --git a/bot/exts/events/advent_of_code/_helpers.py b/bot/exts/events/advent_of_code/_helpers.py
index af64bc81..807cc275 100644
--- a/bot/exts/events/advent_of_code/_helpers.py
+++ b/bot/exts/events/advent_of_code/_helpers.py
@@ -10,6 +10,7 @@ from typing import Any, Optional
import aiohttp
import arrow
import discord
+from discord.ext import commands
from bot.bot import Bot
from bot.constants import AdventOfCode, Channels, Colours
@@ -70,6 +71,33 @@ class FetchingLeaderboardFailedError(Exception):
"""Raised when one or more leaderboards could not be fetched at all."""
+def _format_leaderboard_line(rank: int, data: dict[str, Any], *, is_author: bool) -> str:
+ """
+ Build a string representing a line of the leaderboard.
+
+ Parameters:
+ rank:
+ Rank in the leaderboard of this entry.
+
+ data:
+ Mapping with entry information.
+
+ Keyword arguments:
+ is_author:
+ Whether to address the name displayed in the returned line
+ personally.
+
+ Returns:
+ A formatted line for the leaderboard.
+ """
+ return AOC_TABLE_TEMPLATE.format(
+ rank=rank,
+ name=data['name'] if not is_author else f"(You) {data['name']}",
+ score=str(data['score']),
+ stars=f"({data['star_1']}, {data['star_2']})"
+ )
+
+
def leaderboard_sorting_function(entry: tuple[str, dict]) -> tuple[int, int]:
"""
Provide a sorting value for our leaderboard.
@@ -160,10 +188,23 @@ def _parse_raw_leaderboard_data(raw_leaderboard_data: dict) -> dict:
return {"daily_stats": daily_stats, "leaderboard": sorted_leaderboard, 'per_day_and_star': per_day_star_stats}
-def _format_leaderboard(leaderboard: dict[str, dict]) -> str:
+def _format_leaderboard(leaderboard: dict[str, dict], self_placement_name: str = None) -> str:
"""Format the leaderboard using the AOC_TABLE_TEMPLATE."""
leaderboard_lines = [HEADER]
+ self_placement_exists = False
for rank, data in enumerate(leaderboard.values(), start=1):
+ if self_placement_name and data["name"].lower() == self_placement_name.lower():
+ leaderboard_lines.insert(
+ 1,
+ AOC_TABLE_TEMPLATE.format(
+ rank=rank,
+ name=f"(You) {data['name']}",
+ score=str(data["score"]),
+ stars=f"({data['star_1']}, {data['star_2']})"
+ )
+ )
+ self_placement_exists = True
+ continue
leaderboard_lines.append(
AOC_TABLE_TEMPLATE.format(
rank=rank,
@@ -172,7 +213,13 @@ def _format_leaderboard(leaderboard: dict[str, dict]) -> str:
stars=f"({data['star_1']}, {data['star_2']})"
)
)
-
+ if self_placement_name and not self_placement_exists:
+ raise commands.BadArgument(
+ "Sorry, your profile does not exist in this leaderboard."
+ "\n\n"
+ "To join our leaderboard, run the command `.aoc join`."
+ " If you've joined recently, please wait up to 30 minutes for our leaderboard to refresh."
+ )
return "\n".join(leaderboard_lines)
@@ -260,7 +307,7 @@ def _get_top_leaderboard(full_leaderboard: str) -> str:
@_caches.leaderboard_cache.atomic_transaction
-async def fetch_leaderboard(invalidate_cache: bool = False) -> dict:
+async def fetch_leaderboard(invalidate_cache: bool = False, self_placement_name: str = None) -> dict:
"""
Get the current Python Discord combined leaderboard.
@@ -270,7 +317,6 @@ async def fetch_leaderboard(invalidate_cache: bool = False) -> dict:
miss, this function is locked to one call at a time using a decorator.
"""
cached_leaderboard = await _caches.leaderboard_cache.to_dict()
-
# Check if the cached leaderboard contains everything we expect it to. If it
# does not, this probably means the cache has not been created yet or has
# expired in Redis. This check also accounts for a malformed cache.
@@ -289,6 +335,7 @@ async def fetch_leaderboard(invalidate_cache: bool = False) -> dict:
leaderboard_fetched_at = datetime.datetime.utcnow().isoformat()
cached_leaderboard = {
+ "placement_leaderboard": json.dumps(raw_leaderboard_data),
"full_leaderboard": formatted_leaderboard,
"top_leaderboard": _get_top_leaderboard(formatted_leaderboard),
"full_leaderboard_url": full_leaderboard_url,
@@ -307,7 +354,13 @@ async def fetch_leaderboard(invalidate_cache: bool = False) -> dict:
_caches.leaderboard_cache.namespace,
AdventOfCode.leaderboard_cache_expiry_seconds
)
-
+ if self_placement_name:
+ formatted_placement_leaderboard = _parse_raw_leaderboard_data(
+ json.loads(cached_leaderboard["placement_leaderboard"])
+ )["leaderboard"]
+ cached_leaderboard["placement_leaderboard"] = _get_top_leaderboard(
+ _format_leaderboard(formatted_placement_leaderboard, self_placement_name=self_placement_name)
+ )
return cached_leaderboard
diff --git a/bot/exts/events/advent_of_code/views/dayandstarview.py b/bot/exts/events/advent_of_code/views/dayandstarview.py
index 243db32e..a0bfa316 100644
--- a/bot/exts/events/advent_of_code/views/dayandstarview.py
+++ b/bot/exts/events/advent_of_code/views/dayandstarview.py
@@ -17,14 +17,19 @@ class AoCDropdownView(discord.ui.View):
self.original_author = original_author
def generate_output(self) -> str:
- """Generates a formatted codeblock with AoC statistics based on the currently selected day and star."""
+ """
+ Generates a formatted codeblock with AoC statistics based on the currently selected day and star.
+
+ Optionally, when the requested day and star data does not exist yet it returns an error message.
+ """
header = AOC_DAY_AND_STAR_TEMPLATE.format(
rank="Rank",
name="Name", completion_time="Completion time (UTC)"
)
lines = [f"{header}\n{'-' * (len(header) + 2)}"]
-
- for rank, scorer in enumerate(self.data[f"{self.day}-{self.star}"][:self.maximum_scorers]):
+ if not (day_and_star_data := self.data.get(f"{self.day}-{self.star}")):
+ return ":x: The requested data for the specified day and star does not exist yet."
+ for rank, scorer in enumerate(day_and_star_data[:self.maximum_scorers]):
time_data = datetime.fromtimestamp(scorer['completion_time']).strftime("%I:%M:%S %p")
lines.append(AOC_DAY_AND_STAR_TEMPLATE.format(
datastamp="",
diff --git a/bot/exts/fun/snakes/_utils.py b/bot/exts/fun/snakes/_utils.py
index de51339d..182fa9d9 100644
--- a/bot/exts/fun/snakes/_utils.py
+++ b/bot/exts/fun/snakes/_utils.py
@@ -6,13 +6,14 @@ import math
import random
from itertools import product
from pathlib import Path
+from typing import Union
from PIL import Image
from PIL.ImageDraw import ImageDraw
-from discord import File, Member, Reaction
+from discord import File, Member, Reaction, User
from discord.ext.commands import Cog, Context
-from bot.constants import Roles
+from bot.constants import MODERATION_ROLES
SNAKE_RESOURCES = Path("bot/resources/fun/snakes").absolute()
@@ -395,7 +396,7 @@ class SnakeAndLaddersGame:
Listen for reactions until players have joined, and the game has been started.
"""
- def startup_event_check(reaction_: Reaction, user_: Member) -> bool:
+ def startup_event_check(reaction_: Reaction, user_: Union[User, Member]) -> bool:
"""Make sure that this reaction is what we want to operate on."""
return (
all((
@@ -460,7 +461,7 @@ class SnakeAndLaddersGame:
await self.cancel_game()
return # We're done, no reactions for the last 5 minutes
- async def _add_player(self, user: Member) -> None:
+ async def _add_player(self, user: Union[User, Member]) -> None:
"""Add player to game."""
self.players.append(user)
self.player_tiles[user.id] = 1
@@ -469,7 +470,7 @@ class SnakeAndLaddersGame:
im = Image.open(io.BytesIO(avatar_bytes)).resize((BOARD_PLAYER_SIZE, BOARD_PLAYER_SIZE))
self.avatar_images[user.id] = im
- async def player_join(self, user: Member) -> None:
+ async def player_join(self, user: Union[User, Member]) -> None:
"""
Handle players joining the game.
@@ -495,7 +496,7 @@ class SnakeAndLaddersGame:
delete_after=10
)
- async def player_leave(self, user: Member) -> bool:
+ async def player_leave(self, user: Union[User, Member]) -> bool:
"""
Handle players leaving the game.
@@ -530,7 +531,7 @@ class SnakeAndLaddersGame:
await self.channel.send("**Snakes and Ladders**: Game has been canceled.")
self._destruct()
- async def start_game(self, user: Member) -> None:
+ async def start_game(self, user: Union[User, Member]) -> None:
"""
Allow the game author to begin the game.
@@ -551,7 +552,7 @@ class SnakeAndLaddersGame:
async def start_round(self) -> None:
"""Begin the round."""
- def game_event_check(reaction_: Reaction, user_: Member) -> bool:
+ def game_event_check(reaction_: Reaction, user_: Union[User, Member]) -> bool:
"""Make sure that this reaction is what we want to operate on."""
return (
all((
@@ -644,7 +645,7 @@ class SnakeAndLaddersGame:
if not is_surrendered:
await self._complete_round()
- async def player_roll(self, user: Member) -> None:
+ async def player_roll(self, user: Union[User, Member]) -> None:
"""Handle the player's roll."""
if user.id not in self.player_tiles:
await self.channel.send(user.mention + " You are not in the match.", delete_after=10)
@@ -691,7 +692,7 @@ class SnakeAndLaddersGame:
await self.channel.send("**Snakes and Ladders**: " + winner.mention + " has won the game! :tada:")
self._destruct()
- def _check_winner(self) -> Member:
+ def _check_winner(self) -> Union[User, Member]:
"""Return a winning member if we're in the post-round state and there's a winner."""
if self.state != "post_round":
return None
@@ -716,6 +717,6 @@ class SnakeAndLaddersGame:
return x_level, y_level
@staticmethod
- def _is_moderator(user: Member) -> bool:
+ def _is_moderator(user: Union[User, Member]) -> bool:
"""Return True if the user is a Moderator."""
- return any(Roles.moderator == role.id for role in user.roles)
+ return any(role.id in MODERATION_ROLES for role in getattr(user, 'roles', []))
diff --git a/bot/exts/fun/trivia_quiz.py b/bot/exts/fun/trivia_quiz.py
index 712c8a12..4a1cec5b 100644
--- a/bot/exts/fun/trivia_quiz.py
+++ b/bot/exts/fun/trivia_quiz.py
@@ -16,7 +16,7 @@ from discord.ext import commands, tasks
from rapidfuzz import fuzz
from bot.bot import Bot
-from bot.constants import Client, Colours, NEGATIVE_REPLIES, Roles
+from bot.constants import Client, Colours, MODERATION_ROLES, NEGATIVE_REPLIES
logger = logging.getLogger(__name__)
@@ -550,7 +550,7 @@ class TriviaQuiz(commands.Cog):
if self.game_status[ctx.channel.id]:
# Check if the author is the game starter or a moderator.
if ctx.author == self.game_owners[ctx.channel.id] or any(
- Roles.moderator == role.id for role in ctx.author.roles
+ role.id in MODERATION_ROLES for role in getattr(ctx.author, 'roles', [])
):
self.game_status[ctx.channel.id] = False
del self.game_owners[ctx.channel.id]
diff --git a/bot/exts/holidays/easter/earth_photos.py b/bot/exts/holidays/easter/earth_photos.py
index f65790af..27442f1c 100644
--- a/bot/exts/holidays/easter/earth_photos.py
+++ b/bot/exts/holidays/easter/earth_photos.py
@@ -4,8 +4,7 @@ import discord
from discord.ext import commands
from bot.bot import Bot
-from bot.constants import Colours
-from bot.constants import Tokens
+from bot.constants import Colours, Tokens
log = logging.getLogger(__name__)
diff --git a/bot/exts/holidays/halloween/candy_collection.py b/bot/exts/holidays/halloween/candy_collection.py
index 079d900d..729bbc97 100644
--- a/bot/exts/holidays/halloween/candy_collection.py
+++ b/bot/exts/holidays/halloween/candy_collection.py
@@ -83,6 +83,11 @@ class CandyCollection(commands.Cog):
# if its not a candy or skull, and it is one of 10 most recent messages,
# proceed to add a skull/candy with higher chance
if str(reaction.emoji) not in (EMOJIS["SKULL"], EMOJIS["CANDY"]):
+ # Ensure the reaction is not for a bot's message so users can't spam
+ # reaction buttons like in .help to get candies.
+ if message.author.bot:
+ return
+
recent_message_ids = map(
lambda m: m.id,
await self.hacktober_channel.history(limit=10).flatten()
@@ -182,6 +187,12 @@ class CandyCollection(commands.Cog):
for index, record in enumerate(top_five)
) if top_five else "No Candies"
+ def get_user_candy_score() -> str:
+ for user_id, score in records:
+ if user_id == ctx.author.id:
+ return f"{ctx.author.mention}: {score}"
+ return f"{ctx.author.mention}: 0"
+
e = discord.Embed(colour=discord.Colour.og_blurple())
e.add_field(
name="Top Candy Records",
@@ -189,6 +200,11 @@ class CandyCollection(commands.Cog):
inline=False
)
e.add_field(
+ name="Your Candy Score",
+ value=get_user_candy_score(),
+ inline=False
+ )
+ e.add_field(
name="\u200b",
value="Candies will randomly appear on messages sent. "
"\nHit the candy when it appears as fast as possible to get the candy! "
diff --git a/bot/exts/holidays/halloween/scarymovie.py b/bot/exts/holidays/halloween/scarymovie.py
index 33659fd8..89310b97 100644
--- a/bot/exts/holidays/halloween/scarymovie.py
+++ b/bot/exts/holidays/halloween/scarymovie.py
@@ -6,6 +6,7 @@ from discord.ext import commands
from bot.bot import Bot
from bot.constants import Tokens
+
log = logging.getLogger(__name__)
diff --git a/bot/exts/holidays/hanukkah/hanukkah_embed.py b/bot/exts/holidays/hanukkah/hanukkah_embed.py
index ac3eab7b..5767f91e 100644
--- a/bot/exts/holidays/hanukkah/hanukkah_embed.py
+++ b/bot/exts/holidays/hanukkah/hanukkah_embed.py
@@ -21,45 +21,41 @@ class HanukkahEmbed(commands.Cog):
def __init__(self, bot: Bot):
self.bot = bot
- self.hanukkah_days = []
- self.hanukkah_months = []
- self.hanukkah_years = []
+ self.hanukkah_dates: list[datetime.date] = []
- async def get_hanukkah_dates(self) -> list[str]:
+ def _parse_time_to_datetime(self, date: list[str]) -> datetime.datetime:
+ """Format the times provided by the api to datetime forms."""
+ try:
+ return datetime.datetime.strptime(date, "%Y-%m-%dT%H:%M:%S%z")
+ except ValueError:
+ # there is a possibility of an event not having a time, just a day
+ # to catch this, we try again without time information
+ return datetime.datetime.strptime(date, "%Y-%m-%d")
+
+ async def fetch_hanukkah_dates(self) -> list[datetime.date]:
"""Gets the dates for hanukkah festival."""
- hanukkah_dates = []
+ # clear the datetime objects to prevent a memory link
+ self.hanukkah_dates = []
async with self.bot.http_session.get(HEBCAL_URL) as response:
json_data = await response.json()
festivals = json_data["items"]
for festival in festivals:
if festival["title"].startswith("Chanukah"):
date = festival["date"]
- hanukkah_dates.append(date)
- return hanukkah_dates
+ self.hanukkah_dates.append(self._parse_time_to_datetime(date).date())
+ return self.hanukkah_dates
@in_month(Month.NOVEMBER, Month.DECEMBER)
@commands.command(name="hanukkah", aliases=("chanukah",))
async def hanukkah_festival(self, ctx: commands.Context) -> None:
"""Tells you about the Hanukkah Festivaltime of festival, festival day, etc)."""
- hanukkah_dates = await self.get_hanukkah_dates()
- self.hanukkah_dates_split(hanukkah_dates)
- hanukkah_start_day = int(self.hanukkah_days[0])
- hanukkah_start_month = int(self.hanukkah_months[0])
- hanukkah_start_year = int(self.hanukkah_years[0])
- hanukkah_end_day = int(self.hanukkah_days[8])
- hanukkah_end_month = int(self.hanukkah_months[8])
- hanukkah_end_year = int(self.hanukkah_years[8])
-
- hanukkah_start = datetime.date(hanukkah_start_year, hanukkah_start_month, hanukkah_start_day)
- hanukkah_end = datetime.date(hanukkah_end_year, hanukkah_end_month, hanukkah_end_day)
+ hanukkah_dates = await self.fetch_hanukkah_dates()
+ start_day = hanukkah_dates[0]
+ end_day = hanukkah_dates[-1]
today = datetime.date.today()
- # today = datetime.date(2019, 12, 24) (for testing)
- day = str(today.day)
- month = str(today.month)
- year = str(today.year)
embed = Embed(title="Hanukkah", colour=Colours.blue)
- if day in self.hanukkah_days and month in self.hanukkah_months and year in self.hanukkah_years:
- if int(day) == hanukkah_start_day:
+ if start_day <= today <= end_day:
+ if start_day == today:
now = datetime.datetime.utcnow()
hours = now.hour + 4 # using only hours
hanukkah_start_hour = 18
@@ -77,35 +73,27 @@ class HanukkahEmbed(commands.Cog):
)
await ctx.send(embed=embed)
return
- festival_day = self.hanukkah_days.index(day)
+ festival_day = hanukkah_dates.index(today)
number_suffixes = ["st", "nd", "rd", "th"]
suffix = number_suffixes[festival_day - 1 if festival_day <= 3 else 3]
message = ":menorah:" * festival_day
- embed.description = f"It is the {festival_day}{suffix} day of Hanukkah!\n{message}"
- await ctx.send(embed=embed)
+ embed.description = (
+ f"It is the {festival_day}{suffix} day of Hanukkah!\n{message}"
+ )
+ elif today < start_day:
+ format_start = start_day.strftime("%d of %B")
+ embed.description = (
+ "Hanukkah has not started yet. "
+ f"Hanukkah will start at sundown on {format_start}."
+ )
else:
- if today < hanukkah_start:
- festival_starting_month = hanukkah_start.strftime("%B")
- embed.description = (
- f"Hanukkah has not started yet. "
- f"Hanukkah will start at sundown on {hanukkah_start_day}th "
- f"of {festival_starting_month}."
- )
- else:
- festival_end_month = hanukkah_end.strftime("%B")
- embed.description = (
- f"Looks like you missed Hanukkah!"
- f"Hanukkah ended on {hanukkah_end_day}th of {festival_end_month}."
- )
-
- await ctx.send(embed=embed)
+ format_end = end_day.strftime("%d of %B")
+ embed.description = (
+ "Looks like you missed Hanukkah! "
+ f"Hanukkah ended on {format_end}."
+ )
- def hanukkah_dates_split(self, hanukkah_dates: list[str]) -> None:
- """We are splitting the dates for hanukkah into days, months and years."""
- for date in hanukkah_dates:
- self.hanukkah_days.append(date[8:10])
- self.hanukkah_months.append(date[5:7])
- self.hanukkah_years.append(date[0:4])
+ await ctx.send(embed=embed)
def setup(bot: Bot) -> None:
diff --git a/bot/exts/holidays/valentines/be_my_valentine.py b/bot/exts/holidays/valentines/be_my_valentine.py
index 4d454c3a..1572d474 100644
--- a/bot/exts/holidays/valentines/be_my_valentine.py
+++ b/bot/exts/holidays/valentines/be_my_valentine.py
@@ -7,14 +7,16 @@ import discord
from discord.ext import commands
from bot.bot import Bot
-from bot.constants import Channels, Colours, Lovefest, Month
+from bot.constants import Channels, Colours, Lovefest, Month, PYTHON_PREFIX
from bot.utils.decorators import in_month
-from bot.utils.extensions import invoke_help_command
+from bot.utils.exceptions import MovedCommandError
log = logging.getLogger(__name__)
HEART_EMOJIS = [":heart:", ":gift_heart:", ":revolving_hearts:", ":sparkling_heart:", ":two_hearts:"]
+MOVED_COMMAND = f"{PYTHON_PREFIX}subscribe"
+
class BeMyValentine(commands.Cog):
"""A cog that sends Valentines to other users!"""
@@ -30,40 +32,14 @@ class BeMyValentine(commands.Cog):
return loads(p.read_text("utf8"))
@in_month(Month.FEBRUARY)
- @commands.group(name="lovefest")
+ @commands.command(name="lovefest", help=f"NOTE: This command has been moved to {MOVED_COMMAND}")
async def lovefest_role(self, ctx: commands.Context) -> None:
"""
- Subscribe or unsubscribe from the lovefest role.
-
- The lovefest role makes you eligible to receive anonymous valentines from other users.
+ Deprecated lovefest role command.
- 1) use the command \".lovefest sub\" to get the lovefest role.
- 2) use the command \".lovefest unsub\" to get rid of the lovefest role.
+ This command has been moved to bot, and will be removed in the future.
"""
- if not ctx.invoked_subcommand:
- await invoke_help_command(ctx)
-
- @lovefest_role.command(name="sub")
- async def add_role(self, ctx: commands.Context) -> None:
- """Adds the lovefest role."""
- user = ctx.author
- role = ctx.guild.get_role(Lovefest.role_id)
- if role not in ctx.author.roles:
- await user.add_roles(role)
- await ctx.send("The Lovefest role has been added !")
- else:
- await ctx.send("You already have the role !")
-
- @lovefest_role.command(name="unsub")
- async def remove_role(self, ctx: commands.Context) -> None:
- """Removes the lovefest role."""
- user = ctx.author
- role = ctx.guild.get_role(Lovefest.role_id)
- if role not in ctx.author.roles:
- await ctx.send("You dont have the lovefest role.")
- else:
- await user.remove_roles(role)
- await ctx.send("The lovefest role has been successfully removed!")
+ raise MovedCommandError(MOVED_COMMAND)
@commands.cooldown(1, 1800, commands.BucketType.user)
@commands.group(name="bemyvalentine", invoke_without_command=True)
diff --git a/bot/exts/utilities/bookmark.py b/bot/exts/utilities/bookmark.py
index a11c366b..b50205a0 100644
--- a/bot/exts/utilities/bookmark.py
+++ b/bot/exts/utilities/bookmark.py
@@ -102,7 +102,7 @@ class Bookmark(commands.Cog):
"You must either provide a valid message to bookmark, or reply to one."
"\n\nThe lookup strategy for a message is as follows (in order):"
"\n1. Lookup by '{channel ID}-{message ID}' (retrieved by shift-clicking on 'Copy ID')"
- "\n2. Lookup by message ID (the message **must** have been sent after the bot last started)"
+ "\n2. Lookup by message ID (the message **must** be in the context channel)"
"\n3. Lookup by message URL"
)
target_message = ctx.message.reference.resolved
diff --git a/bot/exts/utilities/challenges.py b/bot/exts/utilities/challenges.py
index 234eb0be..ab7ae442 100644
--- a/bot/exts/utilities/challenges.py
+++ b/bot/exts/utilities/challenges.py
@@ -162,13 +162,20 @@ class Challenges(commands.Cog):
kata_description = "\n".join(kata_description[:1000].split("\n")[:-1]) + "..."
kata_description += f" [continue reading]({kata_url})"
+ if kata_information["rank"]["name"] is None:
+ embed_color = 8
+ kata_difficulty = "Unable to retrieve difficulty for beta languages."
+ else:
+ embed_color = int(kata_information["rank"]["name"].replace(" kyu", ""))
+ kata_difficulty = kata_information["rank"]["name"]
+
kata_embed = Embed(
title=kata_information["name"],
description=kata_description,
- color=MAPPING_OF_KYU[int(kata_information["rank"]["name"].replace(" kyu", ""))],
+ color=MAPPING_OF_KYU[embed_color],
url=kata_url
)
- kata_embed.add_field(name="Difficulty", value=kata_information["rank"]["name"], inline=False)
+ kata_embed.add_field(name="Difficulty", value=kata_difficulty, inline=False)
return kata_embed
@staticmethod
@@ -268,30 +275,29 @@ class Challenges(commands.Cog):
`.challenge <language> <query>, <difficulty>` - Pulls a random challenge with the query provided,
under that difficulty within the language's scope.
"""
- if language.lower() not in SUPPORTED_LANGUAGES["stable"] + SUPPORTED_LANGUAGES["beta"]:
+ language = language.lower()
+ if language not in SUPPORTED_LANGUAGES["stable"] + SUPPORTED_LANGUAGES["beta"]:
raise commands.BadArgument("This is not a recognized language on codewars.com!")
get_kata_link = f"https://codewars.com/kata/search/{language}"
params = {}
- if language and not query:
- level = f"-{choice([1, 2, 3, 4, 5, 6, 7, 8])}"
- params["r[]"] = level
- elif "," in query:
- query_splitted = query.split("," if ", " not in query else ", ")
+ if query is not None:
+ if "," in query:
+ query_splitted = query.split("," if ", " not in query else ", ")
- if len(query_splitted) > 2:
- raise commands.BadArgument(
- "There can only be one comma within the query, separating the difficulty and the query itself."
- )
+ if len(query_splitted) > 2:
+ raise commands.BadArgument(
+ "There can only be one comma within the query, separating the difficulty and the query itself."
+ )
- query, level = query_splitted
- params["q"] = query
- params["r[]"] = f"-{level}"
- elif query.isnumeric():
- params["r[]"] = f"-{query}"
- else:
- params["q"] = query
+ query, level = query_splitted
+ params["q"] = query
+ params["r[]"] = f"-{level}"
+ elif query.isnumeric():
+ params["r[]"] = f"-{query}"
+ else:
+ params["q"] = query
params["beta"] = str(language in SUPPORTED_LANGUAGES["beta"]).lower()
diff --git a/bot/exts/utilities/colour.py b/bot/exts/utilities/colour.py
new file mode 100644
index 00000000..ee6bad93
--- /dev/null
+++ b/bot/exts/utilities/colour.py
@@ -0,0 +1,266 @@
+import colorsys
+import json
+import pathlib
+import random
+import string
+from io import BytesIO
+from typing import Optional
+
+import discord
+import rapidfuzz
+from PIL import Image, ImageColor
+from discord.ext import commands
+
+from bot import constants
+from bot.bot import Bot
+from bot.exts.core.extensions import invoke_help_command
+from bot.utils.decorators import whitelist_override
+
+THUMBNAIL_SIZE = (80, 80)
+
+
+class Colour(commands.Cog):
+ """Cog for the Colour command."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+ with open(pathlib.Path("bot/resources/utilities/ryanzec_colours.json")) as f:
+ self.colour_mapping = json.load(f)
+ del self.colour_mapping['_'] # Delete source credit entry
+
+ async def send_colour_response(self, ctx: commands.Context, rgb: tuple[int, int, int]) -> None:
+ """Create and send embed from user given colour information."""
+ name = self._rgb_to_name(rgb)
+ try:
+ colour_or_color = ctx.invoked_parents[0]
+ except IndexError:
+ colour_or_color = "colour"
+
+ colour_mode = ctx.invoked_with
+ if colour_mode == "random":
+ colour_mode = colour_or_color
+ input_colour = name
+ elif colour_mode in ("colour", "color"):
+ input_colour = ctx.kwargs["colour_input"]
+ elif colour_mode == "name":
+ input_colour = ctx.kwargs["user_colour_name"]
+ elif colour_mode == "hex":
+ input_colour = ctx.args[2:][0]
+ if len(input_colour) > 7:
+ input_colour = input_colour[0:-2]
+ else:
+ input_colour = tuple(ctx.args[2:])
+
+ if colour_mode not in ("name", "hex", "random", "color", "colour"):
+ colour_mode = colour_mode.upper()
+ else:
+ colour_mode = colour_mode.title()
+
+ colour_embed = discord.Embed(
+ title=f"{name or input_colour}",
+ description=f"{colour_or_color.title()} information for {colour_mode} `{input_colour or name}`.",
+ colour=discord.Color.from_rgb(*rgb)
+ )
+ colour_conversions = self.get_colour_conversions(rgb)
+ for colour_space, value in colour_conversions.items():
+ colour_embed.add_field(
+ name=colour_space,
+ value=f"`{value}`",
+ inline=True
+ )
+
+ thumbnail = Image.new("RGB", THUMBNAIL_SIZE, color=rgb)
+ buffer = BytesIO()
+ thumbnail.save(buffer, "PNG")
+ buffer.seek(0)
+ thumbnail_file = discord.File(buffer, filename="colour.png")
+
+ colour_embed.set_thumbnail(url="attachment://colour.png")
+
+ await ctx.send(file=thumbnail_file, embed=colour_embed)
+
+ @commands.group(aliases=("color",), invoke_without_command=True)
+ @whitelist_override(
+ channels=constants.WHITELISTED_CHANNELS,
+ roles=constants.STAFF_ROLES,
+ categories=[constants.Categories.development, constants.Categories.media]
+ )
+ async def colour(self, ctx: commands.Context, *, colour_input: Optional[str] = None) -> None:
+ """
+ Create an embed that displays colour information.
+
+ If no subcommand is called, a randomly selected colour will be shown.
+ """
+ if colour_input is None:
+ await self.random(ctx)
+ return
+
+ try:
+ extra_colour = ImageColor.getrgb(colour_input)
+ await self.send_colour_response(ctx, extra_colour)
+ except ValueError:
+ await invoke_help_command(ctx)
+
+ @colour.command()
+ async def rgb(self, ctx: commands.Context, red: int, green: int, blue: int) -> None:
+ """Create an embed from an RGB input."""
+ if any(c not in range(256) for c in (red, green, blue)):
+ raise commands.BadArgument(
+ message=f"RGB values can only be from 0 to 255. User input was: `{red, green, blue}`."
+ )
+ rgb_tuple = (red, green, blue)
+ await self.send_colour_response(ctx, rgb_tuple)
+
+ @colour.command()
+ async def hsv(self, ctx: commands.Context, hue: int, saturation: int, value: int) -> None:
+ """Create an embed from an HSV input."""
+ if (hue not in range(361)) or any(c not in range(101) for c in (saturation, value)):
+ raise commands.BadArgument(
+ message="Hue can only be from 0 to 360. Saturation and Value can only be from 0 to 100. "
+ f"User input was: `{hue, saturation, value}`."
+ )
+ hsv_tuple = ImageColor.getrgb(f"hsv({hue}, {saturation}%, {value}%)")
+ await self.send_colour_response(ctx, hsv_tuple)
+
+ @colour.command()
+ async def hsl(self, ctx: commands.Context, hue: int, saturation: int, lightness: int) -> None:
+ """Create an embed from an HSL input."""
+ if (hue not in range(361)) or any(c not in range(101) for c in (saturation, lightness)):
+ raise commands.BadArgument(
+ message="Hue can only be from 0 to 360. Saturation and Lightness can only be from 0 to 100. "
+ f"User input was: `{hue, saturation, lightness}`."
+ )
+ hsl_tuple = ImageColor.getrgb(f"hsl({hue}, {saturation}%, {lightness}%)")
+ await self.send_colour_response(ctx, hsl_tuple)
+
+ @colour.command()
+ async def cmyk(self, ctx: commands.Context, cyan: int, magenta: int, yellow: int, key: int) -> None:
+ """Create an embed from a CMYK input."""
+ if any(c not in range(101) for c in (cyan, magenta, yellow, key)):
+ raise commands.BadArgument(
+ message=f"CMYK values can only be from 0 to 100. User input was: `{cyan, magenta, yellow, key}`."
+ )
+ r = round(255 * (1 - (cyan / 100)) * (1 - (key / 100)))
+ g = round(255 * (1 - (magenta / 100)) * (1 - (key / 100)))
+ b = round(255 * (1 - (yellow / 100)) * (1 - (key / 100)))
+ await self.send_colour_response(ctx, (r, g, b))
+
+ @colour.command()
+ async def hex(self, ctx: commands.Context, hex_code: str) -> None:
+ """Create an embed from a HEX input."""
+ if hex_code[0] != "#":
+ hex_code = f"#{hex_code}"
+
+ if len(hex_code) not in (4, 5, 7, 9) or any(digit not in string.hexdigits for digit in hex_code[1:]):
+ raise commands.BadArgument(
+ message=f"Cannot convert `{hex_code}` to a recognizable Hex format. "
+ "Hex values must be hexadecimal and take the form *#RRGGBB* or *#RGB*."
+ )
+
+ hex_tuple = ImageColor.getrgb(hex_code)
+ if len(hex_tuple) == 4:
+ hex_tuple = hex_tuple[:-1] # Colour must be RGB. If RGBA, we remove the alpha value
+ await self.send_colour_response(ctx, hex_tuple)
+
+ @colour.command()
+ async def name(self, ctx: commands.Context, *, user_colour_name: str) -> None:
+ """Create an embed from a name input."""
+ hex_colour = self.match_colour_name(ctx, user_colour_name)
+ if hex_colour is None:
+ name_error_embed = discord.Embed(
+ title="No colour match found.",
+ description=f"No colour found for: `{user_colour_name}`",
+ colour=discord.Color.dark_red()
+ )
+ await ctx.send(embed=name_error_embed)
+ return
+ hex_tuple = ImageColor.getrgb(hex_colour)
+ await self.send_colour_response(ctx, hex_tuple)
+
+ @colour.command()
+ async def random(self, ctx: commands.Context) -> None:
+ """Create an embed from a randomly chosen colour."""
+ hex_colour = random.choice(list(self.colour_mapping.values()))
+ hex_tuple = ImageColor.getrgb(f"#{hex_colour}")
+ await self.send_colour_response(ctx, hex_tuple)
+
+ def get_colour_conversions(self, rgb: tuple[int, int, int]) -> dict[str, str]:
+ """Create a dictionary mapping of colour types and their values."""
+ colour_name = self._rgb_to_name(rgb)
+ if colour_name is None:
+ colour_name = "No match found"
+ return {
+ "RGB": rgb,
+ "HSV": self._rgb_to_hsv(rgb),
+ "HSL": self._rgb_to_hsl(rgb),
+ "CMYK": self._rgb_to_cmyk(rgb),
+ "Hex": self._rgb_to_hex(rgb),
+ "Name": colour_name
+ }
+
+ @staticmethod
+ def _rgb_to_hsv(rgb: tuple[int, int, int]) -> tuple[int, int, int]:
+ """Convert RGB values to HSV values."""
+ rgb_list = [val / 255 for val in rgb]
+ h, s, v = colorsys.rgb_to_hsv(*rgb_list)
+ hsv = (round(h * 360), round(s * 100), round(v * 100))
+ return hsv
+
+ @staticmethod
+ def _rgb_to_hsl(rgb: tuple[int, int, int]) -> tuple[int, int, int]:
+ """Convert RGB values to HSL values."""
+ rgb_list = [val / 255.0 for val in rgb]
+ h, l, s = colorsys.rgb_to_hls(*rgb_list)
+ hsl = (round(h * 360), round(s * 100), round(l * 100))
+ return hsl
+
+ @staticmethod
+ def _rgb_to_cmyk(rgb: tuple[int, int, int]) -> tuple[int, int, int, int]:
+ """Convert RGB values to CMYK values."""
+ rgb_list = [val / 255.0 for val in rgb]
+ if not any(rgb_list):
+ return 0, 0, 0, 100
+ k = 1 - max(rgb_list)
+ c = round((1 - rgb_list[0] - k) * 100 / (1 - k))
+ m = round((1 - rgb_list[1] - k) * 100 / (1 - k))
+ y = round((1 - rgb_list[2] - k) * 100 / (1 - k))
+ cmyk = (c, m, y, round(k * 100))
+ return cmyk
+
+ @staticmethod
+ def _rgb_to_hex(rgb: tuple[int, int, int]) -> str:
+ """Convert RGB values to HEX code."""
+ hex_ = "".join([hex(val)[2:].zfill(2) for val in rgb])
+ hex_code = f"#{hex_}".upper()
+ return hex_code
+
+ def _rgb_to_name(self, rgb: tuple[int, int, int]) -> Optional[str]:
+ """Convert RGB values to a fuzzy matched name."""
+ input_hex_colour = self._rgb_to_hex(rgb)
+ try:
+ match, certainty, _ = rapidfuzz.process.extractOne(
+ query=input_hex_colour,
+ choices=self.colour_mapping.values(),
+ score_cutoff=80
+ )
+ colour_name = [name for name, hex_code in self.colour_mapping.items() if hex_code == match][0]
+ except TypeError:
+ colour_name = None
+ return colour_name
+
+ def match_colour_name(self, ctx: commands.Context, input_colour_name: str) -> Optional[str]:
+ """Convert a colour name to HEX code."""
+ try:
+ match, certainty, _ = rapidfuzz.process.extractOne(
+ query=input_colour_name,
+ choices=self.colour_mapping.keys(),
+ score_cutoff=80
+ )
+ except (ValueError, TypeError):
+ return
+ return f"#{self.colour_mapping[match]}"
+
+
+def setup(bot: Bot) -> None:
+ """Load the Colour cog."""
+ bot.add_cog(Colour(bot))
diff --git a/bot/exts/utilities/issues.py b/bot/exts/utilities/issues.py
index 36655e1b..b6d5a43e 100644
--- a/bot/exts/utilities/issues.py
+++ b/bot/exts/utilities/issues.py
@@ -9,14 +9,7 @@ from discord.ext import commands
from bot.bot import Bot
from bot.constants import (
- Categories,
- Channels,
- Colours,
- ERROR_REPLIES,
- Emojis,
- NEGATIVE_REPLIES,
- Tokens,
- WHITELISTED_CHANNELS
+ Categories, Channels, Colours, ERROR_REPLIES, Emojis, NEGATIVE_REPLIES, Tokens, WHITELISTED_CHANNELS
)
from bot.utils.decorators import whitelist_override
from bot.utils.extensions import invoke_help_command
diff --git a/bot/exts/utilities/latex.py b/bot/exts/utilities/latex.py
deleted file mode 100644
index 36c7e0ab..00000000
--- a/bot/exts/utilities/latex.py
+++ /dev/null
@@ -1,101 +0,0 @@
-import asyncio
-import hashlib
-import pathlib
-import re
-from concurrent.futures import ThreadPoolExecutor
-from io import BytesIO
-
-import discord
-import matplotlib.pyplot as plt
-from discord.ext import commands
-
-from bot.bot import Bot
-
-# configure fonts and colors for matplotlib
-plt.rcParams.update(
- {
- "font.size": 16,
- "mathtext.fontset": "cm", # Computer Modern font set
- "mathtext.rm": "serif",
- "figure.facecolor": "36393F", # matches Discord's dark mode background color
- "text.color": "white",
- }
-)
-
-FORMATTED_CODE_REGEX = re.compile(
- r"(?P<delim>(?P<block>```)|``?)" # code delimiter: 1-3 backticks; (?P=block) only matches if it's a block
- r"(?(block)(?:(?P<lang>[a-z]+)\n)?)" # if we're in a block, match optional language (only letters plus newline)
- r"(?:[ \t]*\n)*" # any blank (empty or tabs/spaces only) lines before the code
- r"(?P<code>.*?)" # extract all code inside the markup
- r"\s*" # any more whitespace before the end of the code markup
- r"(?P=delim)", # match the exact same delimiter from the start again
- re.DOTALL | re.IGNORECASE, # "." also matches newlines, case insensitive
-)
-
-CACHE_DIRECTORY = pathlib.Path("_latex_cache")
-CACHE_DIRECTORY.mkdir(exist_ok=True)
-
-
-class Latex(commands.Cog):
- """Renders latex."""
-
- @staticmethod
- def _render(text: str, filepath: pathlib.Path) -> BytesIO:
- """
- Return the rendered image if latex compiles without errors, otherwise raise a BadArgument Exception.
-
- Saves rendered image to cache.
- """
- fig = plt.figure()
- rendered_image = BytesIO()
- fig.text(0, 1, text, horizontalalignment="left", verticalalignment="top")
-
- try:
- plt.savefig(rendered_image, bbox_inches="tight", dpi=600)
- except ValueError as e:
- raise commands.BadArgument(str(e))
-
- rendered_image.seek(0)
-
- with open(filepath, "wb") as f:
- f.write(rendered_image.getbuffer())
-
- return rendered_image
-
- @staticmethod
- def _prepare_input(text: str) -> str:
- text = text.replace(r"\\", "$\n$") # matplotlib uses \n for newlines, not \\
-
- if match := FORMATTED_CODE_REGEX.match(text):
- return match.group("code")
- else:
- return text
-
- @commands.command()
- @commands.max_concurrency(1, commands.BucketType.guild, wait=True)
- async def latex(self, ctx: commands.Context, *, text: str) -> None:
- """Renders the text in latex and sends the image."""
- text = self._prepare_input(text)
- query_hash = hashlib.md5(text.encode()).hexdigest()
- image_path = CACHE_DIRECTORY.joinpath(f"{query_hash}.png")
- async with ctx.typing():
- if image_path.exists():
- await ctx.send(file=discord.File(image_path))
- return
-
- with ThreadPoolExecutor() as pool:
- image = await asyncio.get_running_loop().run_in_executor(
- pool, self._render, text, image_path
- )
-
- await ctx.send(file=discord.File(image, "latex.png"))
-
-
-def setup(bot: Bot) -> None:
- """Load the Latex Cog."""
- # As we have resource issues on this cog,
- # we have it currently disabled while we fix it.
- import logging
- logging.info("Latex cog is currently disabled. It won't be loaded.")
- return
- bot.add_cog(Latex())
diff --git a/bot/exts/utilities/realpython.py b/bot/exts/utilities/realpython.py
index ef8b2638..bf8f1341 100644
--- a/bot/exts/utilities/realpython.py
+++ b/bot/exts/utilities/realpython.py
@@ -1,5 +1,6 @@
import logging
from html import unescape
+from typing import Optional
from urllib.parse import quote_plus
from discord import Embed
@@ -31,9 +32,18 @@ class RealPython(commands.Cog):
@commands.command(aliases=["rp"])
@commands.cooldown(1, 10, commands.cooldowns.BucketType.user)
- async def realpython(self, ctx: commands.Context, *, user_search: str) -> None:
- """Send 5 articles that match the user's search terms."""
- params = {"q": user_search, "limit": 5, "kind": "article"}
+ async def realpython(self, ctx: commands.Context, amount: Optional[int] = 5, *, user_search: str) -> None:
+ """
+ Send some articles from RealPython that match the search terms.
+
+ By default the top 5 matches are sent, this can be overwritten to
+ a number between 1 and 5 by specifying an amount before the search query.
+ """
+ if not 1 <= amount <= 5:
+ await ctx.send("`amount` must be between 1 and 5 (inclusive).")
+ return
+
+ params = {"q": user_search, "limit": amount, "kind": "article"}
async with self.bot.http_session.get(url=API_ROOT, params=params) as response:
if response.status != 200:
logger.error(
diff --git a/bot/exts/utilities/wikipedia.py b/bot/exts/utilities/wikipedia.py
index c5283de0..e5e8e289 100644
--- a/bot/exts/utilities/wikipedia.py
+++ b/bot/exts/utilities/wikipedia.py
@@ -86,9 +86,7 @@ class WikipediaSearch(commands.Cog):
)
embed.set_thumbnail(url=WIKI_THUMBNAIL)
embed.timestamp = datetime.utcnow()
- await LinePaginator.paginate(
- contents, ctx, embed
- )
+ await LinePaginator.paginate(contents, ctx, embed, restrict_to_user=ctx.author)
else:
await ctx.send(
"Sorry, we could not find a wikipedia article using that search term."
diff --git a/bot/exts/utilities/wtf_python.py b/bot/exts/utilities/wtf_python.py
new file mode 100644
index 00000000..980b3dba
--- /dev/null
+++ b/bot/exts/utilities/wtf_python.py
@@ -0,0 +1,138 @@
+import logging
+import random
+import re
+from typing import Optional
+
+import rapidfuzz
+from discord import Embed, File
+from discord.ext import commands, tasks
+
+from bot import constants
+from bot.bot import Bot
+
+log = logging.getLogger(__name__)
+
+WTF_PYTHON_RAW_URL = "http://raw.githubusercontent.com/satwikkansal/wtfpython/master/"
+BASE_URL = "https://github.com/satwikkansal/wtfpython"
+LOGO_PATH = "./bot/resources/utilities/wtf_python_logo.jpg"
+
+ERROR_MESSAGE = f"""
+Unknown WTF Python Query. Please try to reformulate your query.
+
+**Examples**:
+```md
+{constants.Client.prefix}wtf wild imports
+{constants.Client.prefix}wtf subclass
+{constants.Client.prefix}wtf del
+```
+If the problem persists send a message in <#{constants.Channels.dev_contrib}>
+"""
+
+MINIMUM_CERTAINTY = 55
+
+
+class WTFPython(commands.Cog):
+ """Cog that allows getting WTF Python entries from the WTF Python repository."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+ self.headers: dict[str, str] = {}
+ self.fetch_readme.start()
+
+ @tasks.loop(minutes=60)
+ async def fetch_readme(self) -> None:
+ """Gets the content of README.md from the WTF Python Repository."""
+ async with self.bot.http_session.get(f"{WTF_PYTHON_RAW_URL}README.md") as resp:
+ log.trace("Fetching the latest WTF Python README.md")
+ if resp.status == 200:
+ raw = await resp.text()
+ self.parse_readme(raw)
+
+ def parse_readme(self, data: str) -> None:
+ """
+ Parses the README.md into a dict.
+
+ It parses the readme into the `self.headers` dict,
+ where the key is the heading and the value is the
+ link to the heading.
+ """
+ # Match the start of examples, until the end of the table of contents (toc)
+ table_of_contents = re.search(
+ r"\[👀 Examples\]\(#-examples\)\n([\w\W]*)<!-- tocstop -->", data
+ )[0].split("\n")
+
+ for header in list(map(str.strip, table_of_contents)):
+ match = re.search(r"\[â–¶ (.*)\]\((.*)\)", header)
+ if match:
+ hyper_link = match[0].split("(")[1].replace(")", "")
+ self.headers[match[0]] = f"{BASE_URL}/{hyper_link}"
+
+ def fuzzy_match_header(self, query: str) -> Optional[str]:
+ """
+ Returns the fuzzy match of a query if its ratio is above "MINIMUM_CERTAINTY" else returns None.
+
+ "MINIMUM_CERTAINTY" is the lowest score at which the fuzzy match will return a result.
+ The certainty returned by rapidfuzz.process.extractOne is a score between 0 and 100,
+ with 100 being a perfect match.
+ """
+ match, certainty, _ = rapidfuzz.process.extractOne(query, self.headers.keys())
+ return match if certainty > MINIMUM_CERTAINTY else None
+
+ @commands.command(aliases=("wtf", "WTF"))
+ async def wtf_python(self, ctx: commands.Context, *, query: Optional[str] = None) -> None:
+ """
+ Search WTF Python repository.
+
+ Gets the link of the fuzzy matched query from https://github.com/satwikkansal/wtfpython.
+ Usage:
+ --> .wtf wild imports
+ """
+ if query is None:
+ no_query_embed = Embed(
+ title="WTF Python?!",
+ colour=constants.Colours.dark_green,
+ description="A repository filled with suprising snippets that can make you say WTF?!\n\n"
+ f"[Go to the Repository]({BASE_URL})"
+ )
+ logo = File(LOGO_PATH, filename="wtf_logo.jpg")
+ no_query_embed.set_thumbnail(url="attachment://wtf_logo.jpg")
+ await ctx.send(embed=no_query_embed, file=logo)
+ return
+
+ if len(query) > 50:
+ embed = Embed(
+ title=random.choice(constants.ERROR_REPLIES),
+ description=ERROR_MESSAGE,
+ colour=constants.Colours.soft_red,
+ )
+ match = None
+ else:
+ match = self.fuzzy_match_header(query)
+
+ if not match:
+ embed = Embed(
+ title=random.choice(constants.ERROR_REPLIES),
+ description=ERROR_MESSAGE,
+ colour=constants.Colours.soft_red,
+ )
+ await ctx.send(embed=embed)
+ return
+
+ embed = Embed(
+ title="WTF Python?!",
+ colour=constants.Colours.dark_green,
+ description=f"""Search result for '{query}': {match.split("]")[0].replace("[", "")}
+ [Go to Repository Section]({self.headers[match]})""",
+ )
+ logo = File(LOGO_PATH, filename="wtf_logo.jpg")
+ embed.set_thumbnail(url="attachment://wtf_logo.jpg")
+ await ctx.send(embed=embed, file=logo)
+
+ def cog_unload(self) -> None:
+ """Unload the cog and cancel the task."""
+ self.fetch_readme.cancel()
+
+
+def setup(bot: Bot) -> None:
+ """Load the WTFPython Cog."""
+ bot.add_cog(WTFPython(bot))