diff options
author | 2021-12-30 23:41:58 +0000 | |
---|---|---|
committer | 2021-12-30 23:41:58 +0000 | |
commit | 17bcd45c68f144f3b88fdfdf05bd0db654c0322e (patch) | |
tree | ad07a6a449a322165d7246699908256c2df7a047 /bot/exts | |
parent | Migrate to `og_blurple` (diff) | |
parent | Merge pull request #994 from python-discord/logging-in-AoC-completer-task (diff) |
Merge remote-tracking branch 'origin/main' into main
# Conflicts:
# bot/exts/holidays/halloween/candy_collection.py
Diffstat (limited to 'bot/exts')
22 files changed, 828 insertions, 299 deletions
diff --git a/bot/exts/avatar_modification/avatar_modify.py b/bot/exts/avatar_modification/avatar_modify.py index fbee96dc..3ee70cfd 100644 --- a/bot/exts/avatar_modification/avatar_modify.py +++ b/bot/exts/avatar_modification/avatar_modify.py @@ -286,7 +286,7 @@ class AvatarModify(commands.Cog): @avatar_modify.command( aliases=("savatar", "spookify"), root_aliases=("spookyavatar", "spookify", "savatar"), - brief="Spookify an user's avatar." + brief="Spookify a user's avatar." ) async def spookyavatar(self, ctx: commands.Context) -> None: """This "spookifies" the user's avatar, with a random *spooky* effect.""" diff --git a/bot/exts/core/error_handler.py b/bot/exts/core/error_handler.py index fd2123e7..676a1e70 100644 --- a/bot/exts/core/error_handler.py +++ b/bot/exts/core/error_handler.py @@ -12,7 +12,7 @@ from sentry_sdk import push_scope from bot.bot import Bot from bot.constants import Channels, Colours, ERROR_REPLIES, NEGATIVE_REPLIES, RedirectOutput from bot.utils.decorators import InChannelCheckFailure, InMonthCheckFailure -from bot.utils.exceptions import APIError, UserNotPlayingError +from bot.utils.exceptions import APIError, MovedCommandError, UserNotPlayingError log = logging.getLogger(__name__) @@ -130,6 +130,14 @@ class CommandErrorHandler(commands.Cog): ) return + if isinstance(error, MovedCommandError): + description = ( + f"This command, `{ctx.prefix}{ctx.command.qualified_name}` has moved to `{error.new_command_name}`.\n" + f"Please use `{error.new_command_name}` instead." + ) + await ctx.send(embed=self.error_embed(description, NEGATIVE_REPLIES)) + return + with push_scope() as scope: scope.user = { "id": ctx.author.id, diff --git a/bot/exts/core/help.py b/bot/exts/core/help.py index 4b766b50..db3c2aa6 100644 --- a/bot/exts/core/help.py +++ b/bot/exts/core/help.py @@ -13,10 +13,7 @@ from rapidfuzz import process from bot import constants from bot.bot import Bot from bot.constants import Emojis -from bot.utils.pagination import ( - FIRST_EMOJI, LAST_EMOJI, - LEFT_EMOJI, LinePaginator, RIGHT_EMOJI, -) +from bot.utils.pagination import FIRST_EMOJI, LAST_EMOJI, LEFT_EMOJI, LinePaginator, RIGHT_EMOJI DELETE_EMOJI = Emojis.trashcan diff --git a/bot/exts/core/internal_eval/_internal_eval.py b/bot/exts/core/internal_eval/_internal_eval.py index 4f6b4321..5b5461f0 100644 --- a/bot/exts/core/internal_eval/_internal_eval.py +++ b/bot/exts/core/internal_eval/_internal_eval.py @@ -10,6 +10,7 @@ from bot.bot import Bot from bot.constants import Client, Roles from bot.utils.decorators import with_role from bot.utils.extensions import invoke_help_command + from ._helpers import EvalContext __all__ = ["InternalEval"] @@ -146,14 +147,14 @@ class InternalEval(commands.Cog): await self._send_output(ctx, eval_context.format_output()) @commands.group(name="internal", aliases=("int",)) - @with_role(Roles.admin) + @with_role(Roles.admins) async def internal_group(self, ctx: commands.Context) -> None: """Internal commands. Top secret!""" if not ctx.invoked_subcommand: await invoke_help_command(ctx) @internal_group.command(name="eval", aliases=("e",)) - @with_role(Roles.admin) + @with_role(Roles.admins) async def eval(self, ctx: commands.Context, *, code: str) -> None: """Run eval in a REPL-like format.""" if match := list(FORMATTED_CODE_REGEX.finditer(code)): @@ -172,7 +173,7 @@ class InternalEval(commands.Cog): await self._eval(ctx, code) @internal_group.command(name="reset", aliases=("clear", "exit", "r", "c")) - @with_role(Roles.admin) + @with_role(Roles.admins) async def reset(self, ctx: commands.Context) -> None: """Reset the context and locals of the eval session.""" self.locals = {} diff --git a/bot/exts/events/advent_of_code/_cog.py b/bot/exts/events/advent_of_code/_cog.py index 7dd967ec..c597fd0e 100644 --- a/bot/exts/events/advent_of_code/_cog.py +++ b/bot/exts/events/advent_of_code/_cog.py @@ -6,14 +6,16 @@ from typing import Optional import arrow import discord -from discord.ext import commands +from async_rediscache import RedisCache +from discord.ext import commands, tasks from bot.bot import Bot from bot.constants import ( - AdventOfCode as AocConfig, Channels, Colours, Emojis, Month, Roles, WHITELISTED_CHANNELS, + AdventOfCode as AocConfig, Channels, Client, Colours, Emojis, Month, Roles, WHITELISTED_CHANNELS ) from bot.exts.events.advent_of_code import _helpers from bot.exts.events.advent_of_code.views.dayandstarview import AoCDropdownView +from bot.utils import members from bot.utils.decorators import InChannelCheckFailure, in_month, whitelist_override, with_role from bot.utils.extensions import invoke_help_command @@ -31,6 +33,14 @@ AOC_WHITELIST = AOC_WHITELIST_RESTRICTED + (Channels.advent_of_code,) class AdventOfCode(commands.Cog): """Advent of Code festivities! Ho Ho Ho!""" + # Redis Cache for linking Discord IDs to Advent of Code usernames + # RedisCache[member_id: aoc_username_string] + account_links = RedisCache() + + # A dict with keys of member_ids to block from getting the role + # RedisCache[member_id: None] + completionist_block_list = RedisCache() + def __init__(self, bot: Bot): self.bot = bot @@ -50,6 +60,59 @@ class AdventOfCode(commands.Cog): self.status_task.set_name("AoC Status Countdown") self.status_task.add_done_callback(_helpers.background_task_callback) + self.completionist_task.start() + + @tasks.loop(minutes=10.0) + async def completionist_task(self) -> None: + """ + Give members who have completed all 50 AoC stars the completionist role. + + Runs on a schedule, as defined in the task.loop decorator. + """ + await self.bot.wait_until_guild_available() + guild = self.bot.get_guild(Client.guild) + completionist_role = guild.get_role(Roles.aoc_completionist) + if completionist_role is None: + log.warning("Could not find the AoC completionist role; cancelling completionist task.") + self.completionist_task.cancel() + return + + aoc_name_to_member_id = { + aoc_name: member_id + for member_id, aoc_name in await self.account_links.items() + } + + try: + leaderboard = await _helpers.fetch_leaderboard() + except _helpers.FetchingLeaderboardFailedError: + await self.bot.send_log("Unable to fetch AoC leaderboard during role sync.") + return + + placement_leaderboard = json.loads(leaderboard["placement_leaderboard"]) + + for member_aoc_info in placement_leaderboard.values(): + if not member_aoc_info["stars"] == 50: + # Only give the role to people who have completed all 50 stars + continue + + member_id = aoc_name_to_member_id.get(member_aoc_info["name"], None) + if not member_id: + log.debug(f"Could not find member_id for {member_aoc_info['name']}, not giving role.") + continue + + member = await members.get_or_fetch_member(guild, member_id) + if member is None: + log.debug(f"Could not find {member_id}, not giving role.") + continue + + if completionist_role in member.roles: + log.debug(f"{member.name} ({member.mention}) already has the completionist role.") + continue + + if not await self.completionist_block_list.contains(member_id): + log.debug(f"Giving completionist role to {member.name} ({member.mention}).") + await members.handle_role_change(member, member.add_roles, completionist_role) + @commands.group(name="adventofcode", aliases=("aoc",)) @whitelist_override(channels=AOC_WHITELIST) async def adventofcode_group(self, ctx: commands.Context) -> None: @@ -57,6 +120,21 @@ class AdventOfCode(commands.Cog): if not ctx.invoked_subcommand: await invoke_help_command(ctx) + @with_role(Roles.admins) + @adventofcode_group.command( + name="block", + brief="Block a user from getting the completionist role.", + ) + async def block_from_role(self, ctx: commands.Context, member: discord.Member) -> None: + """Block the given member from receiving the AoC completionist role, removing it from them if needed.""" + completionist_role = ctx.guild.get_role(Roles.aoc_completionist) + if completionist_role in member.roles: + await member.remove_roles(completionist_role) + + await self.completionist_block_list.set(member.id, "sentinel") + await ctx.send(f":+1: Blocked {member.mention} from getting the AoC completionist role.") + + @commands.guild_only() @adventofcode_group.command( name="subscribe", aliases=("sub", "notifications", "notify", "notifs"), @@ -86,6 +164,7 @@ class AdventOfCode(commands.Cog): ) @in_month(Month.DECEMBER) + @commands.guild_only() @adventofcode_group.command(name="unsubscribe", aliases=("unsub",), brief="Notifications for new days") @whitelist_override(channels=AOC_WHITELIST) async def aoc_unsubscribe(self, ctx: commands.Context) -> None: @@ -102,32 +181,26 @@ class AdventOfCode(commands.Cog): @whitelist_override(channels=AOC_WHITELIST) async def aoc_countdown(self, ctx: commands.Context) -> None: """Return time left until next day.""" - if not _helpers.is_in_advent(): - datetime_now = arrow.now(_helpers.EST) - - # Calculate the delta to this & next year's December 1st to see which one is closest and not in the past - this_year = arrow.get(datetime(datetime_now.year, 12, 1), _helpers.EST) - next_year = arrow.get(datetime(datetime_now.year + 1, 12, 1), _helpers.EST) - deltas = (dec_first - datetime_now for dec_first in (this_year, next_year)) - delta = min(delta for delta in deltas if delta >= timedelta()) # timedelta() gives 0 duration delta - - # Add a finer timedelta if there's less than a day left - if delta.days == 0: - delta_str = f"approximately {delta.seconds // 3600} hours" - else: - delta_str = f"{delta.days} days" + if _helpers.is_in_advent(): + tomorrow, _ = _helpers.time_left_to_est_midnight() + next_day_timestamp = int(tomorrow.timestamp()) - await ctx.send( - "The Advent of Code event is not currently running. " - f"The next event will start in {delta_str}." - ) + await ctx.send(f"Day {tomorrow.day} starts <t:{next_day_timestamp}:R>.") return - tomorrow, time_left = _helpers.time_left_to_est_midnight() + datetime_now = arrow.now(_helpers.EST) + # Calculate the delta to this & next year's December 1st to see which one is closest and not in the past + this_year = arrow.get(datetime(datetime_now.year, 12, 1), _helpers.EST) + next_year = arrow.get(datetime(datetime_now.year + 1, 12, 1), _helpers.EST) + deltas = (dec_first - datetime_now for dec_first in (this_year, next_year)) + delta = min(delta for delta in deltas if delta >= timedelta()) # timedelta() gives 0 duration delta - hours, minutes = time_left.seconds // 3600, time_left.seconds // 60 % 60 + next_aoc_timestamp = int((datetime_now + delta).timestamp()) - await ctx.send(f"There are {hours} hours and {minutes} minutes left until day {tomorrow.day}.") + await ctx.send( + "The Advent of Code event is not currently running. " + f"The next event will start <t:{next_aoc_timestamp}:R>." + ) @adventofcode_group.command(name="about", aliases=("ab", "info"), brief="Learn about Advent of Code") @whitelist_override(channels=AOC_WHITELIST) @@ -135,6 +208,7 @@ class AdventOfCode(commands.Cog): """Respond with an explanation of all things Advent of Code.""" await ctx.send(embed=self.cached_about_aoc) + @commands.guild_only() @adventofcode_group.command(name="join", aliases=("j",), brief="Learn how to join the leaderboard (via DM)") @whitelist_override(channels=AOC_WHITELIST) async def join_leaderboard(self, ctx: commands.Context) -> None: @@ -180,26 +254,93 @@ class AdventOfCode(commands.Cog): else: await ctx.message.add_reaction(Emojis.envelope) + @in_month(Month.NOVEMBER, Month.DECEMBER) + @adventofcode_group.command( + name="link", + aliases=("connect",), + brief="Tie your Discord account with your Advent of Code name." + ) + @whitelist_override(channels=AOC_WHITELIST) + async def aoc_link_account(self, ctx: commands.Context, *, aoc_name: str = None) -> None: + """ + Link your Discord Account to your Advent of Code name. + + Stored in a Redis Cache with the format of `Discord ID: Advent of Code Name` + """ + cache_items = await self.account_links.items() + cache_aoc_names = [value for _, value in cache_items] + + if aoc_name: + # Let's check the current values in the cache to make sure it isn't already tied to a different account + if aoc_name == await self.account_links.get(ctx.author.id): + await ctx.reply(f"{aoc_name} is already tied to your account.") + return + elif aoc_name in cache_aoc_names: + log.info( + f"{ctx.author} ({ctx.author.id}) tried to connect their account to {aoc_name}," + " but it's already connected to another user." + ) + await ctx.reply( + f"{aoc_name} is already tied to another account." + " Please contact an admin if you believe this is an error." + ) + return + + # Update an existing link + if old_aoc_name := await self.account_links.get(ctx.author.id): + log.info(f"Changing link for {ctx.author} ({ctx.author.id}) from {old_aoc_name} to {aoc_name}.") + await self.account_links.set(ctx.author.id, aoc_name) + await ctx.reply(f"Your linked account has been changed to {aoc_name}.") + else: + # Create a new link + log.info(f"Linking {ctx.author} ({ctx.author.id}) to account {aoc_name}.") + await self.account_links.set(ctx.author.id, aoc_name) + await ctx.reply(f"You have linked your Discord ID to {aoc_name}.") + else: + # User has not supplied a name, let's check if they're in the cache or not + if cache_name := await self.account_links.get(ctx.author.id): + await ctx.reply(f"You have already linked an Advent of Code account: {cache_name}.") + else: + await ctx.reply( + "You have not linked an Advent of Code account." + " Please re-run the command with one specified." + ) + + @in_month(Month.NOVEMBER, Month.DECEMBER) + @adventofcode_group.command( + name="unlink", + aliases=("disconnect",), + brief="Tie your Discord account with your Advent of Code name." + ) + @whitelist_override(channels=AOC_WHITELIST) + async def aoc_unlink_account(self, ctx: commands.Context) -> None: + """ + Unlink your Discord ID with your Advent of Code leaderboard name. + + Deletes the entry that was Stored in the Redis cache. + """ + if aoc_cache_name := await self.account_links.get(ctx.author.id): + log.info(f"Unlinking {ctx.author} ({ctx.author.id}) from Advent of Code account {aoc_cache_name}") + await self.account_links.delete(ctx.author.id) + await ctx.reply(f"We have removed the link between your Discord ID and {aoc_cache_name}.") + else: + log.info(f"Attempted to unlink {ctx.author} ({ctx.author.id}), but no link was found.") + await ctx.reply("You don't have an Advent of Code account linked.") + @in_month(Month.DECEMBER) @adventofcode_group.command( - name="leaderboard", - aliases=("board", "lb"), - brief="Get a snapshot of the PyDis private AoC leaderboard", + name="dayandstar", + aliases=("daynstar", "daystar"), + brief="Get a view that lets you filter the leaderboard by day and star", ) @whitelist_override(channels=AOC_WHITELIST_RESTRICTED) - async def aoc_leaderboard( + async def aoc_day_and_star_leaderboard( self, ctx: commands.Context, - day_and_star: Optional[bool] = False, - maximum_scorers: Optional[int] = 10 + maximum_scorers_day_and_star: Optional[int] = 10 ) -> None: - """ - Get the current top scorers of the Python Discord Leaderboard. - - Additionally, you can provide an argument `day_and_star` (Boolean) to have the bot send a View - that will let you filter by day and star. - """ - if maximum_scorers > AocConfig.max_day_and_star_results or maximum_scorers <= 0: + """Have the bot send a View that will let you filter the leaderboard by day and star.""" + if maximum_scorers_day_and_star > AocConfig.max_day_and_star_results or maximum_scorers_day_and_star <= 0: raise commands.BadArgument( f"The maximum number of results you can query is {AocConfig.max_day_and_star_results}" ) @@ -209,25 +350,12 @@ class AdventOfCode(commands.Cog): except _helpers.FetchingLeaderboardFailedError: await ctx.send(":x: Unable to fetch leaderboard!") return - if not day_and_star: - - number_of_participants = leaderboard["number_of_participants"] - - top_count = min(AocConfig.leaderboard_displayed_members, number_of_participants) - header = f"Here's our current top {top_count}! {Emojis.christmas_tree * 3}" - - table = f"```\n{leaderboard['top_leaderboard']}\n```" - info_embed = _helpers.get_summary_embed(leaderboard) - - await ctx.send(content=f"{header}\n\n{table}", embed=info_embed) - return - # This is a dictionary that contains solvers in respect of day, and star. # e.g. 1-1 means the solvers of the first star of the first day and their completion time per_day_and_star = json.loads(leaderboard['leaderboard_per_day_and_star']) view = AoCDropdownView( day_and_star_data=per_day_and_star, - maximum_scorers=maximum_scorers, + maximum_scorers=maximum_scorers_day_and_star, original_author=ctx.author ) message = await ctx.send( @@ -239,6 +367,51 @@ class AdventOfCode(commands.Cog): @in_month(Month.DECEMBER) @adventofcode_group.command( + name="leaderboard", + aliases=("board", "lb"), + brief="Get a snapshot of the PyDis private AoC leaderboard", + ) + @whitelist_override(channels=AOC_WHITELIST_RESTRICTED) + async def aoc_leaderboard(self, ctx: commands.Context, *, aoc_name: Optional[str] = None) -> None: + """ + Get the current top scorers of the Python Discord Leaderboard. + + Additionally you can specify an `aoc_name` that will append the + specified profile's personal stats to the top of the leaderboard + """ + # Strip quotes from the AoC username if needed (e.g. "My Name" -> My Name) + # This is to keep compatibility with those already used to wrapping the AoC name in quotes + # Note: only strips one layer of quotes to allow names with quotes at the start and end + # e.g. ""My Name"" -> "My Name" + if aoc_name and aoc_name.startswith('"') and aoc_name.endswith('"'): + aoc_name = aoc_name[1:-1] + + # Check if an advent of code account is linked in the Redis Cache if aoc_name is not given + if (aoc_cache_name := await self.account_links.get(ctx.author.id)) and aoc_name is None: + aoc_name = aoc_cache_name + + async with ctx.typing(): + try: + leaderboard = await _helpers.fetch_leaderboard(self_placement_name=aoc_name) + except _helpers.FetchingLeaderboardFailedError: + await ctx.send(":x: Unable to fetch leaderboard!") + return + + number_of_participants = leaderboard["number_of_participants"] + + top_count = min(AocConfig.leaderboard_displayed_members, number_of_participants) + self_placement_header = " (and your personal stats compared to the top 10)" if aoc_name else "" + header = f"Here's our current top {top_count}{self_placement_header}! {Emojis.christmas_tree * 3}" + table = "```\n" \ + f"{leaderboard['placement_leaderboard'] if aoc_name else leaderboard['top_leaderboard']}" \ + "\n```" + info_embed = _helpers.get_summary_embed(leaderboard) + + await ctx.send(content=f"{header}\n\n{table}", embed=info_embed) + return + + @in_month(Month.DECEMBER) + @adventofcode_group.command( name="global", aliases=("globalboard", "gb"), brief="Get a link to the global leaderboard", @@ -284,7 +457,7 @@ class AdventOfCode(commands.Cog): info_embed = _helpers.get_summary_embed(leaderboard) await ctx.send(f"```\n{table}\n```", embed=info_embed) - @with_role(Roles.admin) + @with_role(Roles.admins) @adventofcode_group.command( name="refresh", aliases=("fetch",), @@ -310,6 +483,7 @@ class AdventOfCode(commands.Cog): log.debug("Unloading the cog and canceling the background task.") self.notification_task.cancel() self.status_task.cancel() + self.completionist_task.cancel() def _build_about_embed(self) -> discord.Embed: """Build and return the informational "About AoC" embed from the resources file.""" diff --git a/bot/exts/events/advent_of_code/_helpers.py b/bot/exts/events/advent_of_code/_helpers.py index af64bc81..807cc275 100644 --- a/bot/exts/events/advent_of_code/_helpers.py +++ b/bot/exts/events/advent_of_code/_helpers.py @@ -10,6 +10,7 @@ from typing import Any, Optional import aiohttp import arrow import discord +from discord.ext import commands from bot.bot import Bot from bot.constants import AdventOfCode, Channels, Colours @@ -70,6 +71,33 @@ class FetchingLeaderboardFailedError(Exception): """Raised when one or more leaderboards could not be fetched at all.""" +def _format_leaderboard_line(rank: int, data: dict[str, Any], *, is_author: bool) -> str: + """ + Build a string representing a line of the leaderboard. + + Parameters: + rank: + Rank in the leaderboard of this entry. + + data: + Mapping with entry information. + + Keyword arguments: + is_author: + Whether to address the name displayed in the returned line + personally. + + Returns: + A formatted line for the leaderboard. + """ + return AOC_TABLE_TEMPLATE.format( + rank=rank, + name=data['name'] if not is_author else f"(You) {data['name']}", + score=str(data['score']), + stars=f"({data['star_1']}, {data['star_2']})" + ) + + def leaderboard_sorting_function(entry: tuple[str, dict]) -> tuple[int, int]: """ Provide a sorting value for our leaderboard. @@ -160,10 +188,23 @@ def _parse_raw_leaderboard_data(raw_leaderboard_data: dict) -> dict: return {"daily_stats": daily_stats, "leaderboard": sorted_leaderboard, 'per_day_and_star': per_day_star_stats} -def _format_leaderboard(leaderboard: dict[str, dict]) -> str: +def _format_leaderboard(leaderboard: dict[str, dict], self_placement_name: str = None) -> str: """Format the leaderboard using the AOC_TABLE_TEMPLATE.""" leaderboard_lines = [HEADER] + self_placement_exists = False for rank, data in enumerate(leaderboard.values(), start=1): + if self_placement_name and data["name"].lower() == self_placement_name.lower(): + leaderboard_lines.insert( + 1, + AOC_TABLE_TEMPLATE.format( + rank=rank, + name=f"(You) {data['name']}", + score=str(data["score"]), + stars=f"({data['star_1']}, {data['star_2']})" + ) + ) + self_placement_exists = True + continue leaderboard_lines.append( AOC_TABLE_TEMPLATE.format( rank=rank, @@ -172,7 +213,13 @@ def _format_leaderboard(leaderboard: dict[str, dict]) -> str: stars=f"({data['star_1']}, {data['star_2']})" ) ) - + if self_placement_name and not self_placement_exists: + raise commands.BadArgument( + "Sorry, your profile does not exist in this leaderboard." + "\n\n" + "To join our leaderboard, run the command `.aoc join`." + " If you've joined recently, please wait up to 30 minutes for our leaderboard to refresh." + ) return "\n".join(leaderboard_lines) @@ -260,7 +307,7 @@ def _get_top_leaderboard(full_leaderboard: str) -> str: @_caches.leaderboard_cache.atomic_transaction -async def fetch_leaderboard(invalidate_cache: bool = False) -> dict: +async def fetch_leaderboard(invalidate_cache: bool = False, self_placement_name: str = None) -> dict: """ Get the current Python Discord combined leaderboard. @@ -270,7 +317,6 @@ async def fetch_leaderboard(invalidate_cache: bool = False) -> dict: miss, this function is locked to one call at a time using a decorator. """ cached_leaderboard = await _caches.leaderboard_cache.to_dict() - # Check if the cached leaderboard contains everything we expect it to. If it # does not, this probably means the cache has not been created yet or has # expired in Redis. This check also accounts for a malformed cache. @@ -289,6 +335,7 @@ async def fetch_leaderboard(invalidate_cache: bool = False) -> dict: leaderboard_fetched_at = datetime.datetime.utcnow().isoformat() cached_leaderboard = { + "placement_leaderboard": json.dumps(raw_leaderboard_data), "full_leaderboard": formatted_leaderboard, "top_leaderboard": _get_top_leaderboard(formatted_leaderboard), "full_leaderboard_url": full_leaderboard_url, @@ -307,7 +354,13 @@ async def fetch_leaderboard(invalidate_cache: bool = False) -> dict: _caches.leaderboard_cache.namespace, AdventOfCode.leaderboard_cache_expiry_seconds ) - + if self_placement_name: + formatted_placement_leaderboard = _parse_raw_leaderboard_data( + json.loads(cached_leaderboard["placement_leaderboard"]) + )["leaderboard"] + cached_leaderboard["placement_leaderboard"] = _get_top_leaderboard( + _format_leaderboard(formatted_placement_leaderboard, self_placement_name=self_placement_name) + ) return cached_leaderboard diff --git a/bot/exts/events/advent_of_code/views/dayandstarview.py b/bot/exts/events/advent_of_code/views/dayandstarview.py index 243db32e..a0bfa316 100644 --- a/bot/exts/events/advent_of_code/views/dayandstarview.py +++ b/bot/exts/events/advent_of_code/views/dayandstarview.py @@ -17,14 +17,19 @@ class AoCDropdownView(discord.ui.View): self.original_author = original_author def generate_output(self) -> str: - """Generates a formatted codeblock with AoC statistics based on the currently selected day and star.""" + """ + Generates a formatted codeblock with AoC statistics based on the currently selected day and star. + + Optionally, when the requested day and star data does not exist yet it returns an error message. + """ header = AOC_DAY_AND_STAR_TEMPLATE.format( rank="Rank", name="Name", completion_time="Completion time (UTC)" ) lines = [f"{header}\n{'-' * (len(header) + 2)}"] - - for rank, scorer in enumerate(self.data[f"{self.day}-{self.star}"][:self.maximum_scorers]): + if not (day_and_star_data := self.data.get(f"{self.day}-{self.star}")): + return ":x: The requested data for the specified day and star does not exist yet." + for rank, scorer in enumerate(day_and_star_data[:self.maximum_scorers]): time_data = datetime.fromtimestamp(scorer['completion_time']).strftime("%I:%M:%S %p") lines.append(AOC_DAY_AND_STAR_TEMPLATE.format( datastamp="", diff --git a/bot/exts/fun/snakes/_utils.py b/bot/exts/fun/snakes/_utils.py index de51339d..182fa9d9 100644 --- a/bot/exts/fun/snakes/_utils.py +++ b/bot/exts/fun/snakes/_utils.py @@ -6,13 +6,14 @@ import math import random from itertools import product from pathlib import Path +from typing import Union from PIL import Image from PIL.ImageDraw import ImageDraw -from discord import File, Member, Reaction +from discord import File, Member, Reaction, User from discord.ext.commands import Cog, Context -from bot.constants import Roles +from bot.constants import MODERATION_ROLES SNAKE_RESOURCES = Path("bot/resources/fun/snakes").absolute() @@ -395,7 +396,7 @@ class SnakeAndLaddersGame: Listen for reactions until players have joined, and the game has been started. """ - def startup_event_check(reaction_: Reaction, user_: Member) -> bool: + def startup_event_check(reaction_: Reaction, user_: Union[User, Member]) -> bool: """Make sure that this reaction is what we want to operate on.""" return ( all(( @@ -460,7 +461,7 @@ class SnakeAndLaddersGame: await self.cancel_game() return # We're done, no reactions for the last 5 minutes - async def _add_player(self, user: Member) -> None: + async def _add_player(self, user: Union[User, Member]) -> None: """Add player to game.""" self.players.append(user) self.player_tiles[user.id] = 1 @@ -469,7 +470,7 @@ class SnakeAndLaddersGame: im = Image.open(io.BytesIO(avatar_bytes)).resize((BOARD_PLAYER_SIZE, BOARD_PLAYER_SIZE)) self.avatar_images[user.id] = im - async def player_join(self, user: Member) -> None: + async def player_join(self, user: Union[User, Member]) -> None: """ Handle players joining the game. @@ -495,7 +496,7 @@ class SnakeAndLaddersGame: delete_after=10 ) - async def player_leave(self, user: Member) -> bool: + async def player_leave(self, user: Union[User, Member]) -> bool: """ Handle players leaving the game. @@ -530,7 +531,7 @@ class SnakeAndLaddersGame: await self.channel.send("**Snakes and Ladders**: Game has been canceled.") self._destruct() - async def start_game(self, user: Member) -> None: + async def start_game(self, user: Union[User, Member]) -> None: """ Allow the game author to begin the game. @@ -551,7 +552,7 @@ class SnakeAndLaddersGame: async def start_round(self) -> None: """Begin the round.""" - def game_event_check(reaction_: Reaction, user_: Member) -> bool: + def game_event_check(reaction_: Reaction, user_: Union[User, Member]) -> bool: """Make sure that this reaction is what we want to operate on.""" return ( all(( @@ -644,7 +645,7 @@ class SnakeAndLaddersGame: if not is_surrendered: await self._complete_round() - async def player_roll(self, user: Member) -> None: + async def player_roll(self, user: Union[User, Member]) -> None: """Handle the player's roll.""" if user.id not in self.player_tiles: await self.channel.send(user.mention + " You are not in the match.", delete_after=10) @@ -691,7 +692,7 @@ class SnakeAndLaddersGame: await self.channel.send("**Snakes and Ladders**: " + winner.mention + " has won the game! :tada:") self._destruct() - def _check_winner(self) -> Member: + def _check_winner(self) -> Union[User, Member]: """Return a winning member if we're in the post-round state and there's a winner.""" if self.state != "post_round": return None @@ -716,6 +717,6 @@ class SnakeAndLaddersGame: return x_level, y_level @staticmethod - def _is_moderator(user: Member) -> bool: + def _is_moderator(user: Union[User, Member]) -> bool: """Return True if the user is a Moderator.""" - return any(Roles.moderator == role.id for role in user.roles) + return any(role.id in MODERATION_ROLES for role in getattr(user, 'roles', [])) diff --git a/bot/exts/fun/trivia_quiz.py b/bot/exts/fun/trivia_quiz.py index 712c8a12..4a1cec5b 100644 --- a/bot/exts/fun/trivia_quiz.py +++ b/bot/exts/fun/trivia_quiz.py @@ -16,7 +16,7 @@ from discord.ext import commands, tasks from rapidfuzz import fuzz from bot.bot import Bot -from bot.constants import Client, Colours, NEGATIVE_REPLIES, Roles +from bot.constants import Client, Colours, MODERATION_ROLES, NEGATIVE_REPLIES logger = logging.getLogger(__name__) @@ -550,7 +550,7 @@ class TriviaQuiz(commands.Cog): if self.game_status[ctx.channel.id]: # Check if the author is the game starter or a moderator. if ctx.author == self.game_owners[ctx.channel.id] or any( - Roles.moderator == role.id for role in ctx.author.roles + role.id in MODERATION_ROLES for role in getattr(ctx.author, 'roles', []) ): self.game_status[ctx.channel.id] = False del self.game_owners[ctx.channel.id] diff --git a/bot/exts/holidays/easter/earth_photos.py b/bot/exts/holidays/easter/earth_photos.py index f65790af..27442f1c 100644 --- a/bot/exts/holidays/easter/earth_photos.py +++ b/bot/exts/holidays/easter/earth_photos.py @@ -4,8 +4,7 @@ import discord from discord.ext import commands from bot.bot import Bot -from bot.constants import Colours -from bot.constants import Tokens +from bot.constants import Colours, Tokens log = logging.getLogger(__name__) diff --git a/bot/exts/holidays/halloween/candy_collection.py b/bot/exts/holidays/halloween/candy_collection.py index 079d900d..729bbc97 100644 --- a/bot/exts/holidays/halloween/candy_collection.py +++ b/bot/exts/holidays/halloween/candy_collection.py @@ -83,6 +83,11 @@ class CandyCollection(commands.Cog): # if its not a candy or skull, and it is one of 10 most recent messages, # proceed to add a skull/candy with higher chance if str(reaction.emoji) not in (EMOJIS["SKULL"], EMOJIS["CANDY"]): + # Ensure the reaction is not for a bot's message so users can't spam + # reaction buttons like in .help to get candies. + if message.author.bot: + return + recent_message_ids = map( lambda m: m.id, await self.hacktober_channel.history(limit=10).flatten() @@ -182,6 +187,12 @@ class CandyCollection(commands.Cog): for index, record in enumerate(top_five) ) if top_five else "No Candies" + def get_user_candy_score() -> str: + for user_id, score in records: + if user_id == ctx.author.id: + return f"{ctx.author.mention}: {score}" + return f"{ctx.author.mention}: 0" + e = discord.Embed(colour=discord.Colour.og_blurple()) e.add_field( name="Top Candy Records", @@ -189,6 +200,11 @@ class CandyCollection(commands.Cog): inline=False ) e.add_field( + name="Your Candy Score", + value=get_user_candy_score(), + inline=False + ) + e.add_field( name="\u200b", value="Candies will randomly appear on messages sent. " "\nHit the candy when it appears as fast as possible to get the candy! " diff --git a/bot/exts/holidays/halloween/scarymovie.py b/bot/exts/holidays/halloween/scarymovie.py index 33659fd8..89310b97 100644 --- a/bot/exts/holidays/halloween/scarymovie.py +++ b/bot/exts/holidays/halloween/scarymovie.py @@ -6,6 +6,7 @@ from discord.ext import commands from bot.bot import Bot from bot.constants import Tokens + log = logging.getLogger(__name__) diff --git a/bot/exts/holidays/hanukkah/hanukkah_embed.py b/bot/exts/holidays/hanukkah/hanukkah_embed.py index ac3eab7b..5767f91e 100644 --- a/bot/exts/holidays/hanukkah/hanukkah_embed.py +++ b/bot/exts/holidays/hanukkah/hanukkah_embed.py @@ -21,45 +21,41 @@ class HanukkahEmbed(commands.Cog): def __init__(self, bot: Bot): self.bot = bot - self.hanukkah_days = [] - self.hanukkah_months = [] - self.hanukkah_years = [] + self.hanukkah_dates: list[datetime.date] = [] - async def get_hanukkah_dates(self) -> list[str]: + def _parse_time_to_datetime(self, date: list[str]) -> datetime.datetime: + """Format the times provided by the api to datetime forms.""" + try: + return datetime.datetime.strptime(date, "%Y-%m-%dT%H:%M:%S%z") + except ValueError: + # there is a possibility of an event not having a time, just a day + # to catch this, we try again without time information + return datetime.datetime.strptime(date, "%Y-%m-%d") + + async def fetch_hanukkah_dates(self) -> list[datetime.date]: """Gets the dates for hanukkah festival.""" - hanukkah_dates = [] + # clear the datetime objects to prevent a memory link + self.hanukkah_dates = [] async with self.bot.http_session.get(HEBCAL_URL) as response: json_data = await response.json() festivals = json_data["items"] for festival in festivals: if festival["title"].startswith("Chanukah"): date = festival["date"] - hanukkah_dates.append(date) - return hanukkah_dates + self.hanukkah_dates.append(self._parse_time_to_datetime(date).date()) + return self.hanukkah_dates @in_month(Month.NOVEMBER, Month.DECEMBER) @commands.command(name="hanukkah", aliases=("chanukah",)) async def hanukkah_festival(self, ctx: commands.Context) -> None: """Tells you about the Hanukkah Festivaltime of festival, festival day, etc).""" - hanukkah_dates = await self.get_hanukkah_dates() - self.hanukkah_dates_split(hanukkah_dates) - hanukkah_start_day = int(self.hanukkah_days[0]) - hanukkah_start_month = int(self.hanukkah_months[0]) - hanukkah_start_year = int(self.hanukkah_years[0]) - hanukkah_end_day = int(self.hanukkah_days[8]) - hanukkah_end_month = int(self.hanukkah_months[8]) - hanukkah_end_year = int(self.hanukkah_years[8]) - - hanukkah_start = datetime.date(hanukkah_start_year, hanukkah_start_month, hanukkah_start_day) - hanukkah_end = datetime.date(hanukkah_end_year, hanukkah_end_month, hanukkah_end_day) + hanukkah_dates = await self.fetch_hanukkah_dates() + start_day = hanukkah_dates[0] + end_day = hanukkah_dates[-1] today = datetime.date.today() - # today = datetime.date(2019, 12, 24) (for testing) - day = str(today.day) - month = str(today.month) - year = str(today.year) embed = Embed(title="Hanukkah", colour=Colours.blue) - if day in self.hanukkah_days and month in self.hanukkah_months and year in self.hanukkah_years: - if int(day) == hanukkah_start_day: + if start_day <= today <= end_day: + if start_day == today: now = datetime.datetime.utcnow() hours = now.hour + 4 # using only hours hanukkah_start_hour = 18 @@ -77,35 +73,27 @@ class HanukkahEmbed(commands.Cog): ) await ctx.send(embed=embed) return - festival_day = self.hanukkah_days.index(day) + festival_day = hanukkah_dates.index(today) number_suffixes = ["st", "nd", "rd", "th"] suffix = number_suffixes[festival_day - 1 if festival_day <= 3 else 3] message = ":menorah:" * festival_day - embed.description = f"It is the {festival_day}{suffix} day of Hanukkah!\n{message}" - await ctx.send(embed=embed) + embed.description = ( + f"It is the {festival_day}{suffix} day of Hanukkah!\n{message}" + ) + elif today < start_day: + format_start = start_day.strftime("%d of %B") + embed.description = ( + "Hanukkah has not started yet. " + f"Hanukkah will start at sundown on {format_start}." + ) else: - if today < hanukkah_start: - festival_starting_month = hanukkah_start.strftime("%B") - embed.description = ( - f"Hanukkah has not started yet. " - f"Hanukkah will start at sundown on {hanukkah_start_day}th " - f"of {festival_starting_month}." - ) - else: - festival_end_month = hanukkah_end.strftime("%B") - embed.description = ( - f"Looks like you missed Hanukkah!" - f"Hanukkah ended on {hanukkah_end_day}th of {festival_end_month}." - ) - - await ctx.send(embed=embed) + format_end = end_day.strftime("%d of %B") + embed.description = ( + "Looks like you missed Hanukkah! " + f"Hanukkah ended on {format_end}." + ) - def hanukkah_dates_split(self, hanukkah_dates: list[str]) -> None: - """We are splitting the dates for hanukkah into days, months and years.""" - for date in hanukkah_dates: - self.hanukkah_days.append(date[8:10]) - self.hanukkah_months.append(date[5:7]) - self.hanukkah_years.append(date[0:4]) + await ctx.send(embed=embed) def setup(bot: Bot) -> None: diff --git a/bot/exts/holidays/valentines/be_my_valentine.py b/bot/exts/holidays/valentines/be_my_valentine.py index 4d454c3a..1572d474 100644 --- a/bot/exts/holidays/valentines/be_my_valentine.py +++ b/bot/exts/holidays/valentines/be_my_valentine.py @@ -7,14 +7,16 @@ import discord from discord.ext import commands from bot.bot import Bot -from bot.constants import Channels, Colours, Lovefest, Month +from bot.constants import Channels, Colours, Lovefest, Month, PYTHON_PREFIX from bot.utils.decorators import in_month -from bot.utils.extensions import invoke_help_command +from bot.utils.exceptions import MovedCommandError log = logging.getLogger(__name__) HEART_EMOJIS = [":heart:", ":gift_heart:", ":revolving_hearts:", ":sparkling_heart:", ":two_hearts:"] +MOVED_COMMAND = f"{PYTHON_PREFIX}subscribe" + class BeMyValentine(commands.Cog): """A cog that sends Valentines to other users!""" @@ -30,40 +32,14 @@ class BeMyValentine(commands.Cog): return loads(p.read_text("utf8")) @in_month(Month.FEBRUARY) - @commands.group(name="lovefest") + @commands.command(name="lovefest", help=f"NOTE: This command has been moved to {MOVED_COMMAND}") async def lovefest_role(self, ctx: commands.Context) -> None: """ - Subscribe or unsubscribe from the lovefest role. - - The lovefest role makes you eligible to receive anonymous valentines from other users. + Deprecated lovefest role command. - 1) use the command \".lovefest sub\" to get the lovefest role. - 2) use the command \".lovefest unsub\" to get rid of the lovefest role. + This command has been moved to bot, and will be removed in the future. """ - if not ctx.invoked_subcommand: - await invoke_help_command(ctx) - - @lovefest_role.command(name="sub") - async def add_role(self, ctx: commands.Context) -> None: - """Adds the lovefest role.""" - user = ctx.author - role = ctx.guild.get_role(Lovefest.role_id) - if role not in ctx.author.roles: - await user.add_roles(role) - await ctx.send("The Lovefest role has been added !") - else: - await ctx.send("You already have the role !") - - @lovefest_role.command(name="unsub") - async def remove_role(self, ctx: commands.Context) -> None: - """Removes the lovefest role.""" - user = ctx.author - role = ctx.guild.get_role(Lovefest.role_id) - if role not in ctx.author.roles: - await ctx.send("You dont have the lovefest role.") - else: - await user.remove_roles(role) - await ctx.send("The lovefest role has been successfully removed!") + raise MovedCommandError(MOVED_COMMAND) @commands.cooldown(1, 1800, commands.BucketType.user) @commands.group(name="bemyvalentine", invoke_without_command=True) diff --git a/bot/exts/utilities/bookmark.py b/bot/exts/utilities/bookmark.py index a11c366b..b50205a0 100644 --- a/bot/exts/utilities/bookmark.py +++ b/bot/exts/utilities/bookmark.py @@ -102,7 +102,7 @@ class Bookmark(commands.Cog): "You must either provide a valid message to bookmark, or reply to one." "\n\nThe lookup strategy for a message is as follows (in order):" "\n1. Lookup by '{channel ID}-{message ID}' (retrieved by shift-clicking on 'Copy ID')" - "\n2. Lookup by message ID (the message **must** have been sent after the bot last started)" + "\n2. Lookup by message ID (the message **must** be in the context channel)" "\n3. Lookup by message URL" ) target_message = ctx.message.reference.resolved diff --git a/bot/exts/utilities/challenges.py b/bot/exts/utilities/challenges.py index 234eb0be..ab7ae442 100644 --- a/bot/exts/utilities/challenges.py +++ b/bot/exts/utilities/challenges.py @@ -162,13 +162,20 @@ class Challenges(commands.Cog): kata_description = "\n".join(kata_description[:1000].split("\n")[:-1]) + "..." kata_description += f" [continue reading]({kata_url})" + if kata_information["rank"]["name"] is None: + embed_color = 8 + kata_difficulty = "Unable to retrieve difficulty for beta languages." + else: + embed_color = int(kata_information["rank"]["name"].replace(" kyu", "")) + kata_difficulty = kata_information["rank"]["name"] + kata_embed = Embed( title=kata_information["name"], description=kata_description, - color=MAPPING_OF_KYU[int(kata_information["rank"]["name"].replace(" kyu", ""))], + color=MAPPING_OF_KYU[embed_color], url=kata_url ) - kata_embed.add_field(name="Difficulty", value=kata_information["rank"]["name"], inline=False) + kata_embed.add_field(name="Difficulty", value=kata_difficulty, inline=False) return kata_embed @staticmethod @@ -268,30 +275,29 @@ class Challenges(commands.Cog): `.challenge <language> <query>, <difficulty>` - Pulls a random challenge with the query provided, under that difficulty within the language's scope. """ - if language.lower() not in SUPPORTED_LANGUAGES["stable"] + SUPPORTED_LANGUAGES["beta"]: + language = language.lower() + if language not in SUPPORTED_LANGUAGES["stable"] + SUPPORTED_LANGUAGES["beta"]: raise commands.BadArgument("This is not a recognized language on codewars.com!") get_kata_link = f"https://codewars.com/kata/search/{language}" params = {} - if language and not query: - level = f"-{choice([1, 2, 3, 4, 5, 6, 7, 8])}" - params["r[]"] = level - elif "," in query: - query_splitted = query.split("," if ", " not in query else ", ") + if query is not None: + if "," in query: + query_splitted = query.split("," if ", " not in query else ", ") - if len(query_splitted) > 2: - raise commands.BadArgument( - "There can only be one comma within the query, separating the difficulty and the query itself." - ) + if len(query_splitted) > 2: + raise commands.BadArgument( + "There can only be one comma within the query, separating the difficulty and the query itself." + ) - query, level = query_splitted - params["q"] = query - params["r[]"] = f"-{level}" - elif query.isnumeric(): - params["r[]"] = f"-{query}" - else: - params["q"] = query + query, level = query_splitted + params["q"] = query + params["r[]"] = f"-{level}" + elif query.isnumeric(): + params["r[]"] = f"-{query}" + else: + params["q"] = query params["beta"] = str(language in SUPPORTED_LANGUAGES["beta"]).lower() diff --git a/bot/exts/utilities/colour.py b/bot/exts/utilities/colour.py new file mode 100644 index 00000000..ee6bad93 --- /dev/null +++ b/bot/exts/utilities/colour.py @@ -0,0 +1,266 @@ +import colorsys +import json +import pathlib +import random +import string +from io import BytesIO +from typing import Optional + +import discord +import rapidfuzz +from PIL import Image, ImageColor +from discord.ext import commands + +from bot import constants +from bot.bot import Bot +from bot.exts.core.extensions import invoke_help_command +from bot.utils.decorators import whitelist_override + +THUMBNAIL_SIZE = (80, 80) + + +class Colour(commands.Cog): + """Cog for the Colour command.""" + + def __init__(self, bot: Bot): + self.bot = bot + with open(pathlib.Path("bot/resources/utilities/ryanzec_colours.json")) as f: + self.colour_mapping = json.load(f) + del self.colour_mapping['_'] # Delete source credit entry + + async def send_colour_response(self, ctx: commands.Context, rgb: tuple[int, int, int]) -> None: + """Create and send embed from user given colour information.""" + name = self._rgb_to_name(rgb) + try: + colour_or_color = ctx.invoked_parents[0] + except IndexError: + colour_or_color = "colour" + + colour_mode = ctx.invoked_with + if colour_mode == "random": + colour_mode = colour_or_color + input_colour = name + elif colour_mode in ("colour", "color"): + input_colour = ctx.kwargs["colour_input"] + elif colour_mode == "name": + input_colour = ctx.kwargs["user_colour_name"] + elif colour_mode == "hex": + input_colour = ctx.args[2:][0] + if len(input_colour) > 7: + input_colour = input_colour[0:-2] + else: + input_colour = tuple(ctx.args[2:]) + + if colour_mode not in ("name", "hex", "random", "color", "colour"): + colour_mode = colour_mode.upper() + else: + colour_mode = colour_mode.title() + + colour_embed = discord.Embed( + title=f"{name or input_colour}", + description=f"{colour_or_color.title()} information for {colour_mode} `{input_colour or name}`.", + colour=discord.Color.from_rgb(*rgb) + ) + colour_conversions = self.get_colour_conversions(rgb) + for colour_space, value in colour_conversions.items(): + colour_embed.add_field( + name=colour_space, + value=f"`{value}`", + inline=True + ) + + thumbnail = Image.new("RGB", THUMBNAIL_SIZE, color=rgb) + buffer = BytesIO() + thumbnail.save(buffer, "PNG") + buffer.seek(0) + thumbnail_file = discord.File(buffer, filename="colour.png") + + colour_embed.set_thumbnail(url="attachment://colour.png") + + await ctx.send(file=thumbnail_file, embed=colour_embed) + + @commands.group(aliases=("color",), invoke_without_command=True) + @whitelist_override( + channels=constants.WHITELISTED_CHANNELS, + roles=constants.STAFF_ROLES, + categories=[constants.Categories.development, constants.Categories.media] + ) + async def colour(self, ctx: commands.Context, *, colour_input: Optional[str] = None) -> None: + """ + Create an embed that displays colour information. + + If no subcommand is called, a randomly selected colour will be shown. + """ + if colour_input is None: + await self.random(ctx) + return + + try: + extra_colour = ImageColor.getrgb(colour_input) + await self.send_colour_response(ctx, extra_colour) + except ValueError: + await invoke_help_command(ctx) + + @colour.command() + async def rgb(self, ctx: commands.Context, red: int, green: int, blue: int) -> None: + """Create an embed from an RGB input.""" + if any(c not in range(256) for c in (red, green, blue)): + raise commands.BadArgument( + message=f"RGB values can only be from 0 to 255. User input was: `{red, green, blue}`." + ) + rgb_tuple = (red, green, blue) + await self.send_colour_response(ctx, rgb_tuple) + + @colour.command() + async def hsv(self, ctx: commands.Context, hue: int, saturation: int, value: int) -> None: + """Create an embed from an HSV input.""" + if (hue not in range(361)) or any(c not in range(101) for c in (saturation, value)): + raise commands.BadArgument( + message="Hue can only be from 0 to 360. Saturation and Value can only be from 0 to 100. " + f"User input was: `{hue, saturation, value}`." + ) + hsv_tuple = ImageColor.getrgb(f"hsv({hue}, {saturation}%, {value}%)") + await self.send_colour_response(ctx, hsv_tuple) + + @colour.command() + async def hsl(self, ctx: commands.Context, hue: int, saturation: int, lightness: int) -> None: + """Create an embed from an HSL input.""" + if (hue not in range(361)) or any(c not in range(101) for c in (saturation, lightness)): + raise commands.BadArgument( + message="Hue can only be from 0 to 360. Saturation and Lightness can only be from 0 to 100. " + f"User input was: `{hue, saturation, lightness}`." + ) + hsl_tuple = ImageColor.getrgb(f"hsl({hue}, {saturation}%, {lightness}%)") + await self.send_colour_response(ctx, hsl_tuple) + + @colour.command() + async def cmyk(self, ctx: commands.Context, cyan: int, magenta: int, yellow: int, key: int) -> None: + """Create an embed from a CMYK input.""" + if any(c not in range(101) for c in (cyan, magenta, yellow, key)): + raise commands.BadArgument( + message=f"CMYK values can only be from 0 to 100. User input was: `{cyan, magenta, yellow, key}`." + ) + r = round(255 * (1 - (cyan / 100)) * (1 - (key / 100))) + g = round(255 * (1 - (magenta / 100)) * (1 - (key / 100))) + b = round(255 * (1 - (yellow / 100)) * (1 - (key / 100))) + await self.send_colour_response(ctx, (r, g, b)) + + @colour.command() + async def hex(self, ctx: commands.Context, hex_code: str) -> None: + """Create an embed from a HEX input.""" + if hex_code[0] != "#": + hex_code = f"#{hex_code}" + + if len(hex_code) not in (4, 5, 7, 9) or any(digit not in string.hexdigits for digit in hex_code[1:]): + raise commands.BadArgument( + message=f"Cannot convert `{hex_code}` to a recognizable Hex format. " + "Hex values must be hexadecimal and take the form *#RRGGBB* or *#RGB*." + ) + + hex_tuple = ImageColor.getrgb(hex_code) + if len(hex_tuple) == 4: + hex_tuple = hex_tuple[:-1] # Colour must be RGB. If RGBA, we remove the alpha value + await self.send_colour_response(ctx, hex_tuple) + + @colour.command() + async def name(self, ctx: commands.Context, *, user_colour_name: str) -> None: + """Create an embed from a name input.""" + hex_colour = self.match_colour_name(ctx, user_colour_name) + if hex_colour is None: + name_error_embed = discord.Embed( + title="No colour match found.", + description=f"No colour found for: `{user_colour_name}`", + colour=discord.Color.dark_red() + ) + await ctx.send(embed=name_error_embed) + return + hex_tuple = ImageColor.getrgb(hex_colour) + await self.send_colour_response(ctx, hex_tuple) + + @colour.command() + async def random(self, ctx: commands.Context) -> None: + """Create an embed from a randomly chosen colour.""" + hex_colour = random.choice(list(self.colour_mapping.values())) + hex_tuple = ImageColor.getrgb(f"#{hex_colour}") + await self.send_colour_response(ctx, hex_tuple) + + def get_colour_conversions(self, rgb: tuple[int, int, int]) -> dict[str, str]: + """Create a dictionary mapping of colour types and their values.""" + colour_name = self._rgb_to_name(rgb) + if colour_name is None: + colour_name = "No match found" + return { + "RGB": rgb, + "HSV": self._rgb_to_hsv(rgb), + "HSL": self._rgb_to_hsl(rgb), + "CMYK": self._rgb_to_cmyk(rgb), + "Hex": self._rgb_to_hex(rgb), + "Name": colour_name + } + + @staticmethod + def _rgb_to_hsv(rgb: tuple[int, int, int]) -> tuple[int, int, int]: + """Convert RGB values to HSV values.""" + rgb_list = [val / 255 for val in rgb] + h, s, v = colorsys.rgb_to_hsv(*rgb_list) + hsv = (round(h * 360), round(s * 100), round(v * 100)) + return hsv + + @staticmethod + def _rgb_to_hsl(rgb: tuple[int, int, int]) -> tuple[int, int, int]: + """Convert RGB values to HSL values.""" + rgb_list = [val / 255.0 for val in rgb] + h, l, s = colorsys.rgb_to_hls(*rgb_list) + hsl = (round(h * 360), round(s * 100), round(l * 100)) + return hsl + + @staticmethod + def _rgb_to_cmyk(rgb: tuple[int, int, int]) -> tuple[int, int, int, int]: + """Convert RGB values to CMYK values.""" + rgb_list = [val / 255.0 for val in rgb] + if not any(rgb_list): + return 0, 0, 0, 100 + k = 1 - max(rgb_list) + c = round((1 - rgb_list[0] - k) * 100 / (1 - k)) + m = round((1 - rgb_list[1] - k) * 100 / (1 - k)) + y = round((1 - rgb_list[2] - k) * 100 / (1 - k)) + cmyk = (c, m, y, round(k * 100)) + return cmyk + + @staticmethod + def _rgb_to_hex(rgb: tuple[int, int, int]) -> str: + """Convert RGB values to HEX code.""" + hex_ = "".join([hex(val)[2:].zfill(2) for val in rgb]) + hex_code = f"#{hex_}".upper() + return hex_code + + def _rgb_to_name(self, rgb: tuple[int, int, int]) -> Optional[str]: + """Convert RGB values to a fuzzy matched name.""" + input_hex_colour = self._rgb_to_hex(rgb) + try: + match, certainty, _ = rapidfuzz.process.extractOne( + query=input_hex_colour, + choices=self.colour_mapping.values(), + score_cutoff=80 + ) + colour_name = [name for name, hex_code in self.colour_mapping.items() if hex_code == match][0] + except TypeError: + colour_name = None + return colour_name + + def match_colour_name(self, ctx: commands.Context, input_colour_name: str) -> Optional[str]: + """Convert a colour name to HEX code.""" + try: + match, certainty, _ = rapidfuzz.process.extractOne( + query=input_colour_name, + choices=self.colour_mapping.keys(), + score_cutoff=80 + ) + except (ValueError, TypeError): + return + return f"#{self.colour_mapping[match]}" + + +def setup(bot: Bot) -> None: + """Load the Colour cog.""" + bot.add_cog(Colour(bot)) diff --git a/bot/exts/utilities/issues.py b/bot/exts/utilities/issues.py index 36655e1b..b6d5a43e 100644 --- a/bot/exts/utilities/issues.py +++ b/bot/exts/utilities/issues.py @@ -9,14 +9,7 @@ from discord.ext import commands from bot.bot import Bot from bot.constants import ( - Categories, - Channels, - Colours, - ERROR_REPLIES, - Emojis, - NEGATIVE_REPLIES, - Tokens, - WHITELISTED_CHANNELS + Categories, Channels, Colours, ERROR_REPLIES, Emojis, NEGATIVE_REPLIES, Tokens, WHITELISTED_CHANNELS ) from bot.utils.decorators import whitelist_override from bot.utils.extensions import invoke_help_command diff --git a/bot/exts/utilities/latex.py b/bot/exts/utilities/latex.py deleted file mode 100644 index 36c7e0ab..00000000 --- a/bot/exts/utilities/latex.py +++ /dev/null @@ -1,101 +0,0 @@ -import asyncio -import hashlib -import pathlib -import re -from concurrent.futures import ThreadPoolExecutor -from io import BytesIO - -import discord -import matplotlib.pyplot as plt -from discord.ext import commands - -from bot.bot import Bot - -# configure fonts and colors for matplotlib -plt.rcParams.update( - { - "font.size": 16, - "mathtext.fontset": "cm", # Computer Modern font set - "mathtext.rm": "serif", - "figure.facecolor": "36393F", # matches Discord's dark mode background color - "text.color": "white", - } -) - -FORMATTED_CODE_REGEX = re.compile( - r"(?P<delim>(?P<block>```)|``?)" # code delimiter: 1-3 backticks; (?P=block) only matches if it's a block - r"(?(block)(?:(?P<lang>[a-z]+)\n)?)" # if we're in a block, match optional language (only letters plus newline) - r"(?:[ \t]*\n)*" # any blank (empty or tabs/spaces only) lines before the code - r"(?P<code>.*?)" # extract all code inside the markup - r"\s*" # any more whitespace before the end of the code markup - r"(?P=delim)", # match the exact same delimiter from the start again - re.DOTALL | re.IGNORECASE, # "." also matches newlines, case insensitive -) - -CACHE_DIRECTORY = pathlib.Path("_latex_cache") -CACHE_DIRECTORY.mkdir(exist_ok=True) - - -class Latex(commands.Cog): - """Renders latex.""" - - @staticmethod - def _render(text: str, filepath: pathlib.Path) -> BytesIO: - """ - Return the rendered image if latex compiles without errors, otherwise raise a BadArgument Exception. - - Saves rendered image to cache. - """ - fig = plt.figure() - rendered_image = BytesIO() - fig.text(0, 1, text, horizontalalignment="left", verticalalignment="top") - - try: - plt.savefig(rendered_image, bbox_inches="tight", dpi=600) - except ValueError as e: - raise commands.BadArgument(str(e)) - - rendered_image.seek(0) - - with open(filepath, "wb") as f: - f.write(rendered_image.getbuffer()) - - return rendered_image - - @staticmethod - def _prepare_input(text: str) -> str: - text = text.replace(r"\\", "$\n$") # matplotlib uses \n for newlines, not \\ - - if match := FORMATTED_CODE_REGEX.match(text): - return match.group("code") - else: - return text - - @commands.command() - @commands.max_concurrency(1, commands.BucketType.guild, wait=True) - async def latex(self, ctx: commands.Context, *, text: str) -> None: - """Renders the text in latex and sends the image.""" - text = self._prepare_input(text) - query_hash = hashlib.md5(text.encode()).hexdigest() - image_path = CACHE_DIRECTORY.joinpath(f"{query_hash}.png") - async with ctx.typing(): - if image_path.exists(): - await ctx.send(file=discord.File(image_path)) - return - - with ThreadPoolExecutor() as pool: - image = await asyncio.get_running_loop().run_in_executor( - pool, self._render, text, image_path - ) - - await ctx.send(file=discord.File(image, "latex.png")) - - -def setup(bot: Bot) -> None: - """Load the Latex Cog.""" - # As we have resource issues on this cog, - # we have it currently disabled while we fix it. - import logging - logging.info("Latex cog is currently disabled. It won't be loaded.") - return - bot.add_cog(Latex()) diff --git a/bot/exts/utilities/realpython.py b/bot/exts/utilities/realpython.py index ef8b2638..bf8f1341 100644 --- a/bot/exts/utilities/realpython.py +++ b/bot/exts/utilities/realpython.py @@ -1,5 +1,6 @@ import logging from html import unescape +from typing import Optional from urllib.parse import quote_plus from discord import Embed @@ -31,9 +32,18 @@ class RealPython(commands.Cog): @commands.command(aliases=["rp"]) @commands.cooldown(1, 10, commands.cooldowns.BucketType.user) - async def realpython(self, ctx: commands.Context, *, user_search: str) -> None: - """Send 5 articles that match the user's search terms.""" - params = {"q": user_search, "limit": 5, "kind": "article"} + async def realpython(self, ctx: commands.Context, amount: Optional[int] = 5, *, user_search: str) -> None: + """ + Send some articles from RealPython that match the search terms. + + By default the top 5 matches are sent, this can be overwritten to + a number between 1 and 5 by specifying an amount before the search query. + """ + if not 1 <= amount <= 5: + await ctx.send("`amount` must be between 1 and 5 (inclusive).") + return + + params = {"q": user_search, "limit": amount, "kind": "article"} async with self.bot.http_session.get(url=API_ROOT, params=params) as response: if response.status != 200: logger.error( diff --git a/bot/exts/utilities/wikipedia.py b/bot/exts/utilities/wikipedia.py index c5283de0..e5e8e289 100644 --- a/bot/exts/utilities/wikipedia.py +++ b/bot/exts/utilities/wikipedia.py @@ -86,9 +86,7 @@ class WikipediaSearch(commands.Cog): ) embed.set_thumbnail(url=WIKI_THUMBNAIL) embed.timestamp = datetime.utcnow() - await LinePaginator.paginate( - contents, ctx, embed - ) + await LinePaginator.paginate(contents, ctx, embed, restrict_to_user=ctx.author) else: await ctx.send( "Sorry, we could not find a wikipedia article using that search term." diff --git a/bot/exts/utilities/wtf_python.py b/bot/exts/utilities/wtf_python.py new file mode 100644 index 00000000..980b3dba --- /dev/null +++ b/bot/exts/utilities/wtf_python.py @@ -0,0 +1,138 @@ +import logging +import random +import re +from typing import Optional + +import rapidfuzz +from discord import Embed, File +from discord.ext import commands, tasks + +from bot import constants +from bot.bot import Bot + +log = logging.getLogger(__name__) + +WTF_PYTHON_RAW_URL = "http://raw.githubusercontent.com/satwikkansal/wtfpython/master/" +BASE_URL = "https://github.com/satwikkansal/wtfpython" +LOGO_PATH = "./bot/resources/utilities/wtf_python_logo.jpg" + +ERROR_MESSAGE = f""" +Unknown WTF Python Query. Please try to reformulate your query. + +**Examples**: +```md +{constants.Client.prefix}wtf wild imports +{constants.Client.prefix}wtf subclass +{constants.Client.prefix}wtf del +``` +If the problem persists send a message in <#{constants.Channels.dev_contrib}> +""" + +MINIMUM_CERTAINTY = 55 + + +class WTFPython(commands.Cog): + """Cog that allows getting WTF Python entries from the WTF Python repository.""" + + def __init__(self, bot: Bot): + self.bot = bot + self.headers: dict[str, str] = {} + self.fetch_readme.start() + + @tasks.loop(minutes=60) + async def fetch_readme(self) -> None: + """Gets the content of README.md from the WTF Python Repository.""" + async with self.bot.http_session.get(f"{WTF_PYTHON_RAW_URL}README.md") as resp: + log.trace("Fetching the latest WTF Python README.md") + if resp.status == 200: + raw = await resp.text() + self.parse_readme(raw) + + def parse_readme(self, data: str) -> None: + """ + Parses the README.md into a dict. + + It parses the readme into the `self.headers` dict, + where the key is the heading and the value is the + link to the heading. + """ + # Match the start of examples, until the end of the table of contents (toc) + table_of_contents = re.search( + r"\[👀 Examples\]\(#-examples\)\n([\w\W]*)<!-- tocstop -->", data + )[0].split("\n") + + for header in list(map(str.strip, table_of_contents)): + match = re.search(r"\[▶ (.*)\]\((.*)\)", header) + if match: + hyper_link = match[0].split("(")[1].replace(")", "") + self.headers[match[0]] = f"{BASE_URL}/{hyper_link}" + + def fuzzy_match_header(self, query: str) -> Optional[str]: + """ + Returns the fuzzy match of a query if its ratio is above "MINIMUM_CERTAINTY" else returns None. + + "MINIMUM_CERTAINTY" is the lowest score at which the fuzzy match will return a result. + The certainty returned by rapidfuzz.process.extractOne is a score between 0 and 100, + with 100 being a perfect match. + """ + match, certainty, _ = rapidfuzz.process.extractOne(query, self.headers.keys()) + return match if certainty > MINIMUM_CERTAINTY else None + + @commands.command(aliases=("wtf", "WTF")) + async def wtf_python(self, ctx: commands.Context, *, query: Optional[str] = None) -> None: + """ + Search WTF Python repository. + + Gets the link of the fuzzy matched query from https://github.com/satwikkansal/wtfpython. + Usage: + --> .wtf wild imports + """ + if query is None: + no_query_embed = Embed( + title="WTF Python?!", + colour=constants.Colours.dark_green, + description="A repository filled with suprising snippets that can make you say WTF?!\n\n" + f"[Go to the Repository]({BASE_URL})" + ) + logo = File(LOGO_PATH, filename="wtf_logo.jpg") + no_query_embed.set_thumbnail(url="attachment://wtf_logo.jpg") + await ctx.send(embed=no_query_embed, file=logo) + return + + if len(query) > 50: + embed = Embed( + title=random.choice(constants.ERROR_REPLIES), + description=ERROR_MESSAGE, + colour=constants.Colours.soft_red, + ) + match = None + else: + match = self.fuzzy_match_header(query) + + if not match: + embed = Embed( + title=random.choice(constants.ERROR_REPLIES), + description=ERROR_MESSAGE, + colour=constants.Colours.soft_red, + ) + await ctx.send(embed=embed) + return + + embed = Embed( + title="WTF Python?!", + colour=constants.Colours.dark_green, + description=f"""Search result for '{query}': {match.split("]")[0].replace("[", "")} + [Go to Repository Section]({self.headers[match]})""", + ) + logo = File(LOGO_PATH, filename="wtf_logo.jpg") + embed.set_thumbnail(url="attachment://wtf_logo.jpg") + await ctx.send(embed=embed, file=logo) + + def cog_unload(self) -> None: + """Unload the cog and cancel the task.""" + self.fetch_readme.cancel() + + +def setup(bot: Bot) -> None: + """Load the WTFPython Cog.""" + bot.add_cog(WTFPython(bot)) |