diff options
Diffstat (limited to 'bot/exts')
51 files changed, 3316 insertions, 687 deletions
diff --git a/bot/exts/avatar_modification/avatar_modify.py b/bot/exts/avatar_modification/avatar_modify.py index 87eb05e6..3ee70cfd 100644 --- a/bot/exts/avatar_modification/avatar_modify.py +++ b/bot/exts/avatar_modification/avatar_modify.py @@ -239,7 +239,7 @@ class AvatarModify(commands.Cog):                  description=f"Here is your lovely avatar, surrounded by\n a beautiful {option} flag. Enjoy :D"              )              embed.set_image(url=f"attachment://{file_name}") -            embed.set_footer(text=f"Made by {ctx.author.display_name}.", icon_url=ctx.author.avatar.url) +            embed.set_footer(text=f"Made by {ctx.author.display_name}.", icon_url=ctx.author.display_avatar.url)              await ctx.send(file=file, embed=embed)      @avatar_modify.group( @@ -286,7 +286,7 @@ class AvatarModify(commands.Cog):      @avatar_modify.command(          aliases=("savatar", "spookify"),          root_aliases=("spookyavatar", "spookify", "savatar"), -        brief="Spookify an user's avatar." +        brief="Spookify a user's avatar."      )      async def spookyavatar(self, ctx: commands.Context) -> None:          """This "spookifies" the user's avatar, with a random *spooky* effect.""" diff --git a/bot/exts/core/error_handler.py b/bot/exts/core/error_handler.py index fd2123e7..983632ba 100644 --- a/bot/exts/core/error_handler.py +++ b/bot/exts/core/error_handler.py @@ -12,7 +12,7 @@ from sentry_sdk import push_scope  from bot.bot import Bot  from bot.constants import Channels, Colours, ERROR_REPLIES, NEGATIVE_REPLIES, RedirectOutput  from bot.utils.decorators import InChannelCheckFailure, InMonthCheckFailure -from bot.utils.exceptions import APIError, UserNotPlayingError +from bot.utils.exceptions import APIError, MovedCommandError, UserNotPlayingError  log = logging.getLogger(__name__) @@ -98,7 +98,8 @@ class CommandErrorHandler(commands.Cog):          if isinstance(error, commands.NoPrivateMessage):              await ctx.send(                  embed=self.error_embed( -                    f"This command can only be used in the server. Go to <#{Channels.community_bot_commands}> instead!", +                    "This command can only be used in the server. " +                    f"Go to <#{Channels.sir_lancebot_playground}> instead!",                      NEGATIVE_REPLIES                  )              ) @@ -130,6 +131,14 @@ class CommandErrorHandler(commands.Cog):              )              return +        if isinstance(error, MovedCommandError): +            description = ( +                f"This command, `{ctx.prefix}{ctx.command.qualified_name}` has moved to `{error.new_command_name}`.\n" +                f"Please use `{error.new_command_name}` instead." +            ) +            await ctx.send(embed=self.error_embed(description, NEGATIVE_REPLIES)) +            return +          with push_scope() as scope:              scope.user = {                  "id": ctx.author.id, diff --git a/bot/exts/core/extensions.py b/bot/exts/core/extensions.py index 424bacac..d809d2b9 100644 --- a/bot/exts/core/extensions.py +++ b/bot/exts/core/extensions.py @@ -18,7 +18,7 @@ from bot.utils.pagination import LinePaginator  log = logging.getLogger(__name__) -UNLOAD_BLACKLIST = {f"{exts.__name__}.utils.extensions"} +UNLOAD_BLACKLIST = {f"{exts.__name__}.core.extensions"}  BASE_PATH_LEN = len(exts.__name__.split(".")) @@ -152,7 +152,7 @@ class Extensions(commands.Cog):          Grey indicates that the extension is unloaded.          Green indicates that the extension is currently loaded.          """ -        embed = Embed(colour=Colour.blurple()) +        embed = Embed(colour=Colour.og_blurple())          embed.set_author(              name="Extensions List",              url=Client.github_bot_repo, diff --git a/bot/exts/core/help.py b/bot/exts/core/help.py index 4b766b50..db3c2aa6 100644 --- a/bot/exts/core/help.py +++ b/bot/exts/core/help.py @@ -13,10 +13,7 @@ from rapidfuzz import process  from bot import constants  from bot.bot import Bot  from bot.constants import Emojis -from bot.utils.pagination import ( -    FIRST_EMOJI, LAST_EMOJI, -    LEFT_EMOJI, LinePaginator, RIGHT_EMOJI, -) +from bot.utils.pagination import FIRST_EMOJI, LAST_EMOJI, LEFT_EMOJI, LinePaginator, RIGHT_EMOJI  DELETE_EMOJI = Emojis.trashcan diff --git a/bot/exts/core/internal_eval/_internal_eval.py b/bot/exts/core/internal_eval/_internal_eval.py index 4f6b4321..190a15ec 100644 --- a/bot/exts/core/internal_eval/_internal_eval.py +++ b/bot/exts/core/internal_eval/_internal_eval.py @@ -10,6 +10,7 @@ from bot.bot import Bot  from bot.constants import Client, Roles  from bot.utils.decorators import with_role  from bot.utils.extensions import invoke_help_command +  from ._helpers import EvalContext  __all__ = ["InternalEval"] @@ -33,6 +34,8 @@ RAW_CODE_REGEX = re.compile(      re.DOTALL                               # "." also matches newlines  ) +MAX_LENGTH = 99980 +  class InternalEval(commands.Cog):      """Top secret code evaluation for admins and owners.""" @@ -84,9 +87,10 @@ class InternalEval(commands.Cog):      async def _upload_output(self, output: str) -> Optional[str]:          """Upload `internal eval` output to our pastebin and return the url.""" +        data = self.shorten_output(output, max_length=MAX_LENGTH)          try:              async with self.bot.http_session.post( -                "https://paste.pythondiscord.com/documents", data=output, raise_for_status=True +                "https://paste.pythondiscord.com/documents", data=data, raise_for_status=True              ) as resp:                  data = await resp.json() @@ -146,14 +150,14 @@ class InternalEval(commands.Cog):          await self._send_output(ctx, eval_context.format_output())      @commands.group(name="internal", aliases=("int",)) -    @with_role(Roles.admin) +    @with_role(Roles.admins)      async def internal_group(self, ctx: commands.Context) -> None:          """Internal commands. Top secret!"""          if not ctx.invoked_subcommand:              await invoke_help_command(ctx)      @internal_group.command(name="eval", aliases=("e",)) -    @with_role(Roles.admin) +    @with_role(Roles.admins)      async def eval(self, ctx: commands.Context, *, code: str) -> None:          """Run eval in a REPL-like format."""          if match := list(FORMATTED_CODE_REGEX.finditer(code)): @@ -172,7 +176,7 @@ class InternalEval(commands.Cog):          await self._eval(ctx, code)      @internal_group.command(name="reset", aliases=("clear", "exit", "r", "c")) -    @with_role(Roles.admin) +    @with_role(Roles.admins)      async def reset(self, ctx: commands.Context) -> None:          """Reset the context and locals of the eval session."""          self.locals = {} diff --git a/bot/exts/core/source.py b/bot/exts/core/source.py index 7572ce51..2801be0f 100644 --- a/bot/exts/core/source.py +++ b/bot/exts/core/source.py @@ -6,14 +6,16 @@ from discord import Embed  from discord.ext import commands  from bot.bot import Bot -from bot.constants import Source +from bot.constants import Channels, Source, WHITELISTED_CHANNELS  from bot.utils.converters import SourceConverter, SourceType +from bot.utils.decorators import whitelist_override  class BotSource(commands.Cog):      """Displays information about the bot's source code."""      @commands.command(name="source", aliases=("src",)) +    @whitelist_override(channels=WHITELISTED_CHANNELS+(Channels.community_meta, Channels.dev_contrib))      async def source_command(self, ctx: commands.Context, *, source_item: SourceConverter = None) -> None:          """Display information and a GitHub link to the source code of a command, tag, or cog."""          if not source_item: diff --git a/bot/exts/events/advent_of_code/_cog.py b/bot/exts/events/advent_of_code/_cog.py index ca60e517..518841d4 100644 --- a/bot/exts/events/advent_of_code/_cog.py +++ b/bot/exts/events/advent_of_code/_cog.py @@ -2,17 +2,22 @@ import json  import logging  from datetime import datetime, timedelta  from pathlib import Path +from typing import Optional  import arrow  import discord -from discord.ext import commands +from async_rediscache import RedisCache +from discord.ext import commands, tasks  from bot.bot import Bot  from bot.constants import ( -    AdventOfCode as AocConfig, Channels, Colours, Emojis, Month, Roles, WHITELISTED_CHANNELS, +    AdventOfCode as AocConfig, Channels, Client, Colours, Emojis, Month, PYTHON_PREFIX, Roles, WHITELISTED_CHANNELS  )  from bot.exts.events.advent_of_code import _helpers +from bot.exts.events.advent_of_code.views.dayandstarview import AoCDropdownView +from bot.utils import members  from bot.utils.decorators import InChannelCheckFailure, in_month, whitelist_override, with_role +from bot.utils.exceptions import MovedCommandError  from bot.utils.extensions import invoke_help_command  log = logging.getLogger(__name__) @@ -29,6 +34,14 @@ AOC_WHITELIST = AOC_WHITELIST_RESTRICTED + (Channels.advent_of_code,)  class AdventOfCode(commands.Cog):      """Advent of Code festivities! Ho Ho Ho!""" +    # Redis Cache for linking Discord IDs to Advent of Code usernames +    # RedisCache[member_id: aoc_username_string] +    account_links = RedisCache() + +    # A dict with keys of member_ids to block from getting the role +    # RedisCache[member_id: None] +    completionist_block_list = RedisCache() +      def __init__(self, bot: Bot):          self.bot = bot @@ -48,6 +61,62 @@ class AdventOfCode(commands.Cog):          self.status_task.set_name("AoC Status Countdown")          self.status_task.add_done_callback(_helpers.background_task_callback) +        # Don't start task while event isn't running +        # self.completionist_task.start() + +    @tasks.loop(minutes=10.0) +    async def completionist_task(self) -> None: +        """ +        Give members who have completed all 50 AoC stars the completionist role. + +        Runs on a schedule, as defined in the task.loop decorator. +        """ +        await self.bot.wait_until_guild_available() +        guild = self.bot.get_guild(Client.guild) +        completionist_role = guild.get_role(Roles.aoc_completionist) +        if completionist_role is None: +            log.warning("Could not find the AoC completionist role; cancelling completionist task.") +            self.completionist_task.cancel() +            return + +        aoc_name_to_member_id = { +            aoc_name: member_id +            for member_id, aoc_name in await self.account_links.items() +        } + +        try: +            leaderboard = await _helpers.fetch_leaderboard() +        except _helpers.FetchingLeaderboardFailedError: +            await self.bot.send_log("Unable to fetch AoC leaderboard during role sync.") +            return + +        placement_leaderboard = json.loads(leaderboard["placement_leaderboard"]) + +        for member_aoc_info in placement_leaderboard.values(): +            if not member_aoc_info["stars"] == 50: +                # Only give the role to people who have completed all 50 stars +                continue + +            aoc_name = member_aoc_info["name"] or f"Anonymous #{member_aoc_info['id']}" + +            member_id = aoc_name_to_member_id.get(aoc_name) +            if not member_id: +                log.debug(f"Could not find member_id for {member_aoc_info['name']}, not giving role.") +                continue + +            member = await members.get_or_fetch_member(guild, member_id) +            if member is None: +                log.debug(f"Could not find {member_id}, not giving role.") +                continue + +            if completionist_role in member.roles: +                log.debug(f"{member.name} ({member.mention}) already has the completionist role.") +                continue + +            if not await self.completionist_block_list.contains(member_id): +                log.debug(f"Giving completionist role to {member.name} ({member.mention}).") +                await members.handle_role_change(member, member.add_roles, completionist_role) +      @commands.group(name="adventofcode", aliases=("aoc",))      @whitelist_override(channels=AOC_WHITELIST)      async def adventofcode_group(self, ctx: commands.Context) -> None: @@ -55,77 +124,59 @@ class AdventOfCode(commands.Cog):          if not ctx.invoked_subcommand:              await invoke_help_command(ctx) +    @with_role(Roles.admins)      @adventofcode_group.command( -        name="subscribe", -        aliases=("sub", "notifications", "notify", "notifs"), -        brief="Notifications for new days" +        name="block", +        brief="Block a user from getting the completionist role.",      ) -    @whitelist_override(channels=AOC_WHITELIST) -    async def aoc_subscribe(self, ctx: commands.Context) -> None: -        """Assign the role for notifications about new days being ready.""" -        current_year = datetime.now().year -        if current_year != AocConfig.year: -            await ctx.send(f"You can't subscribe to {current_year}'s Advent of Code announcements yet!") -            return +    async def block_from_role(self, ctx: commands.Context, member: discord.Member) -> None: +        """Block the given member from receiving the AoC completionist role, removing it from them if needed.""" +        completionist_role = ctx.guild.get_role(Roles.aoc_completionist) +        if completionist_role in member.roles: +            await member.remove_roles(completionist_role) -        role = ctx.guild.get_role(AocConfig.role_id) -        unsubscribe_command = f"{ctx.prefix}{ctx.command.root_parent} unsubscribe" +        await self.completionist_block_list.set(member.id, "sentinel") +        await ctx.send(f":+1: Blocked {member.mention} from getting the AoC completionist role.") -        if role not in ctx.author.roles: -            await ctx.author.add_roles(role) -            await ctx.send( -                "Okay! You have been __subscribed__ to notifications about new Advent of Code tasks. " -                f"You can run `{unsubscribe_command}` to disable them again for you." -            ) -        else: -            await ctx.send( -                "Hey, you already are receiving notifications about new Advent of Code tasks. " -                f"If you don't want them any more, run `{unsubscribe_command}` instead." -            ) - -    @in_month(Month.DECEMBER) -    @adventofcode_group.command(name="unsubscribe", aliases=("unsub",), brief="Notifications for new days") +    @commands.guild_only() +    @adventofcode_group.command( +        name="subscribe", +        aliases=("sub", "notifications", "notify", "notifs", "unsubscribe", "unsub"), +        help=f"NOTE: This command has been moved to {PYTHON_PREFIX}subscribe", +    )      @whitelist_override(channels=AOC_WHITELIST) -    async def aoc_unsubscribe(self, ctx: commands.Context) -> None: -        """Remove the role for notifications about new days being ready.""" -        role = ctx.guild.get_role(AocConfig.role_id) +    async def aoc_subscribe(self, ctx: commands.Context) -> None: +        """ +        Deprecated role command. -        if role in ctx.author.roles: -            await ctx.author.remove_roles(role) -            await ctx.send("Okay! You have been __unsubscribed__ from notifications about new Advent of Code tasks.") -        else: -            await ctx.send("Hey, you don't even get any notifications about new Advent of Code tasks currently anyway.") +        This command has been moved to bot, and will be removed in the future. +        """ +        raise MovedCommandError(f"{PYTHON_PREFIX}subscribe")      @adventofcode_group.command(name="countdown", aliases=("count", "c"), brief="Return time left until next day")      @whitelist_override(channels=AOC_WHITELIST)      async def aoc_countdown(self, ctx: commands.Context) -> None:          """Return time left until next day.""" -        if not _helpers.is_in_advent(): -            datetime_now = arrow.now(_helpers.EST) - -            # Calculate the delta to this & next year's December 1st to see which one is closest and not in the past -            this_year = arrow.get(datetime(datetime_now.year, 12, 1), _helpers.EST) -            next_year = arrow.get(datetime(datetime_now.year + 1, 12, 1), _helpers.EST) -            deltas = (dec_first - datetime_now for dec_first in (this_year, next_year)) -            delta = min(delta for delta in deltas if delta >= timedelta())  # timedelta() gives 0 duration delta - -            # Add a finer timedelta if there's less than a day left -            if delta.days == 0: -                delta_str = f"approximately {delta.seconds // 3600} hours" -            else: -                delta_str = f"{delta.days} days" +        if _helpers.is_in_advent(): +            tomorrow, _ = _helpers.time_left_to_est_midnight() +            next_day_timestamp = int(tomorrow.timestamp()) -            await ctx.send( -                "The Advent of Code event is not currently running. " -                f"The next event will start in {delta_str}." -            ) +            await ctx.send(f"Day {tomorrow.day} starts <t:{next_day_timestamp}:R>.")              return -        tomorrow, time_left = _helpers.time_left_to_est_midnight() +        datetime_now = arrow.now(_helpers.EST) +        # Calculate the delta to this & next year's December 1st to see which one is closest and not in the past +        this_year = arrow.get(datetime(datetime_now.year, 12, 1), _helpers.EST) +        next_year = arrow.get(datetime(datetime_now.year + 1, 12, 1), _helpers.EST) +        deltas = (dec_first - datetime_now for dec_first in (this_year, next_year)) +        delta = min(delta for delta in deltas if delta >= timedelta())  # timedelta() gives 0 duration delta -        hours, minutes = time_left.seconds // 3600, time_left.seconds // 60 % 60 +        next_aoc_timestamp = int((datetime_now + delta).timestamp()) -        await ctx.send(f"There are {hours} hours and {minutes} minutes left until day {tomorrow.day}.") +        await ctx.send( +            "The Advent of Code event is not currently running. " +            f"The next event will start <t:{next_aoc_timestamp}:R>." +        )      @adventofcode_group.command(name="about", aliases=("ab", "info"), brief="Learn about Advent of Code")      @whitelist_override(channels=AOC_WHITELIST) @@ -133,13 +184,19 @@ class AdventOfCode(commands.Cog):          """Respond with an explanation of all things Advent of Code."""          await ctx.send(embed=self.cached_about_aoc) +    @commands.guild_only()      @adventofcode_group.command(name="join", aliases=("j",), brief="Learn how to join the leaderboard (via DM)")      @whitelist_override(channels=AOC_WHITELIST)      async def join_leaderboard(self, ctx: commands.Context) -> None:          """DM the user the information for joining the Python Discord leaderboard.""" -        current_year = datetime.now().year -        if current_year != AocConfig.year: -            await ctx.send(f"The Python Discord leaderboard for {current_year} is not yet available!") +        current_date = datetime.now() +        allowed_months = (Month.NOVEMBER.value, Month.DECEMBER.value) +        if not ( +            current_date.month in allowed_months and current_date.year == AocConfig.year or +            current_date.month == Month.JANUARY.value and current_date.year == AocConfig.year + 1 +        ): +            # Only allow joining the leaderboard in the run up to AOC and the January following. +            await ctx.send(f"The Python Discord leaderboard for {current_date.year} is not yet available!")              return          author = ctx.author @@ -150,7 +207,7 @@ class AdventOfCode(commands.Cog):          else:              try:                  join_code = await _helpers.get_public_join_code(author) -            except _helpers.FetchingLeaderboardFailed: +            except _helpers.FetchingLeaderboardFailedError:                  await ctx.send(":x: Failed to get join code! Notified maintainers.")                  return @@ -178,33 +235,163 @@ class AdventOfCode(commands.Cog):          else:              await ctx.message.add_reaction(Emojis.envelope) -    @in_month(Month.DECEMBER) +    @in_month(Month.NOVEMBER, Month.DECEMBER, Month.JANUARY) +    @adventofcode_group.command( +        name="link", +        aliases=("connect",), +        brief="Tie your Discord account with your Advent of Code name." +    ) +    @whitelist_override(channels=AOC_WHITELIST) +    async def aoc_link_account(self, ctx: commands.Context, *, aoc_name: str = None) -> None: +        """ +        Link your Discord Account to your Advent of Code name. + +        Stored in a Redis Cache with the format of `Discord ID: Advent of Code Name` +        """ +        cache_items = await self.account_links.items() +        cache_aoc_names = [value for _, value in cache_items] + +        if aoc_name: +            # Let's check the current values in the cache to make sure it isn't already tied to a different account +            if aoc_name == await self.account_links.get(ctx.author.id): +                await ctx.reply(f"{aoc_name} is already tied to your account.") +                return +            elif aoc_name in cache_aoc_names: +                log.info( +                    f"{ctx.author} ({ctx.author.id}) tried to connect their account to {aoc_name}," +                    " but it's already connected to another user." +                ) +                await ctx.reply( +                    f"{aoc_name} is already tied to another account." +                    " Please contact an admin if you believe this is an error." +                ) +                return + +            # Update an existing link +            if old_aoc_name := await self.account_links.get(ctx.author.id): +                log.info(f"Changing link for {ctx.author} ({ctx.author.id}) from {old_aoc_name} to {aoc_name}.") +                await self.account_links.set(ctx.author.id, aoc_name) +                await ctx.reply(f"Your linked account has been changed to {aoc_name}.") +            else: +                # Create a new link +                log.info(f"Linking {ctx.author} ({ctx.author.id}) to account {aoc_name}.") +                await self.account_links.set(ctx.author.id, aoc_name) +                await ctx.reply(f"You have linked your Discord ID to {aoc_name}.") +        else: +            # User has not supplied a name, let's check if they're in the cache or not +            if cache_name := await self.account_links.get(ctx.author.id): +                await ctx.reply(f"You have already linked an Advent of Code account: {cache_name}.") +            else: +                await ctx.reply( +                    "You have not linked an Advent of Code account." +                    " Please re-run the command with one specified." +                ) + +    @in_month(Month.NOVEMBER, Month.DECEMBER, Month.JANUARY) +    @adventofcode_group.command( +        name="unlink", +        aliases=("disconnect",), +        brief="Tie your Discord account with your Advent of Code name." +    ) +    @whitelist_override(channels=AOC_WHITELIST) +    async def aoc_unlink_account(self, ctx: commands.Context) -> None: +        """ +        Unlink your Discord ID with your Advent of Code leaderboard name. + +        Deletes the entry that was Stored in the Redis cache. +        """ +        if aoc_cache_name := await self.account_links.get(ctx.author.id): +            log.info(f"Unlinking {ctx.author} ({ctx.author.id}) from Advent of Code account {aoc_cache_name}") +            await self.account_links.delete(ctx.author.id) +            await ctx.reply(f"We have removed the link between your Discord ID and {aoc_cache_name}.") +        else: +            log.info(f"Attempted to unlink {ctx.author} ({ctx.author.id}), but no link was found.") +            await ctx.reply("You don't have an Advent of Code account linked.") + +    @in_month(Month.DECEMBER, Month.JANUARY) +    @adventofcode_group.command( +        name="dayandstar", +        aliases=("daynstar", "daystar"), +        brief="Get a view that lets you filter the leaderboard by day and star", +    ) +    @whitelist_override(channels=AOC_WHITELIST_RESTRICTED) +    async def aoc_day_and_star_leaderboard( +            self, +            ctx: commands.Context, +            maximum_scorers_day_and_star: Optional[int] = 10 +    ) -> None: +        """Have the bot send a View that will let you filter the leaderboard by day and star.""" +        if maximum_scorers_day_and_star > AocConfig.max_day_and_star_results or maximum_scorers_day_and_star <= 0: +            raise commands.BadArgument( +                f"The maximum number of results you can query is {AocConfig.max_day_and_star_results}" +            ) +        async with ctx.typing(): +            try: +                leaderboard = await _helpers.fetch_leaderboard() +            except _helpers.FetchingLeaderboardFailedError: +                await ctx.send(":x: Unable to fetch leaderboard!") +                return +        # This is a dictionary that contains solvers in respect of day, and star. +        # e.g. 1-1 means the solvers of the first star of the first day and their completion time +        per_day_and_star = json.loads(leaderboard['leaderboard_per_day_and_star']) +        view = AoCDropdownView( +            day_and_star_data=per_day_and_star, +            maximum_scorers=maximum_scorers_day_and_star, +            original_author=ctx.author +        ) +        message = await ctx.send( +            content="Please select a day and a star to filter by!", +            view=view +        ) +        await view.wait() +        await message.edit(view=None) + +    @in_month(Month.DECEMBER, Month.JANUARY)      @adventofcode_group.command(          name="leaderboard",          aliases=("board", "lb"),          brief="Get a snapshot of the PyDis private AoC leaderboard",      )      @whitelist_override(channels=AOC_WHITELIST_RESTRICTED) -    async def aoc_leaderboard(self, ctx: commands.Context) -> None: -        """Get the current top scorers of the Python Discord Leaderboard.""" +    async def aoc_leaderboard(self, ctx: commands.Context, *, aoc_name: Optional[str] = None) -> None: +        """ +        Get the current top scorers of the Python Discord Leaderboard. + +        Additionally you can specify an `aoc_name` that will append the +        specified profile's personal stats to the top of the leaderboard +        """ +        # Strip quotes from the AoC username if needed (e.g. "My Name" -> My Name) +        # This is to keep compatibility with those already used to wrapping the AoC name in quotes +        # Note: only strips one layer of quotes to allow names with quotes at the start and end +        #      e.g. ""My Name"" -> "My Name" +        if aoc_name and aoc_name.startswith('"') and aoc_name.endswith('"'): +            aoc_name = aoc_name[1:-1] + +        # Check if an advent of code account is linked in the Redis Cache if aoc_name is not given +        if (aoc_cache_name := await self.account_links.get(ctx.author.id)) and aoc_name is None: +            aoc_name = aoc_cache_name +          async with ctx.typing():              try: -                leaderboard = await _helpers.fetch_leaderboard() -            except _helpers.FetchingLeaderboardFailed: +                leaderboard = await _helpers.fetch_leaderboard(self_placement_name=aoc_name) +            except _helpers.FetchingLeaderboardFailedError:                  await ctx.send(":x: Unable to fetch leaderboard!")                  return -            number_of_participants = leaderboard["number_of_participants"] +        number_of_participants = leaderboard["number_of_participants"] -            top_count = min(AocConfig.leaderboard_displayed_members, number_of_participants) -            header = f"Here's our current top {top_count}! {Emojis.christmas_tree * 3}" - -            table = f"```\n{leaderboard['top_leaderboard']}\n```" -            info_embed = _helpers.get_summary_embed(leaderboard) +        top_count = min(AocConfig.leaderboard_displayed_members, number_of_participants) +        self_placement_header = " (and your personal stats compared to the top 10)" if aoc_name else "" +        header = f"Here's our current top {top_count}{self_placement_header}! {Emojis.christmas_tree * 3}" +        table = "```\n" \ +                f"{leaderboard['placement_leaderboard'] if aoc_name else leaderboard['top_leaderboard']}" \ +                "\n```" +        info_embed = _helpers.get_summary_embed(leaderboard) -            await ctx.send(content=f"{header}\n\n{table}", embed=info_embed) +        await ctx.send(content=f"{header}\n\n{table}", embed=info_embed) +        return -    @in_month(Month.DECEMBER) +    @in_month(Month.DECEMBER, Month.JANUARY)      @adventofcode_group.command(          name="global",          aliases=("globalboard", "gb"), @@ -231,7 +418,7 @@ class AdventOfCode(commands.Cog):          """Send an embed with daily completion statistics for the Python Discord leaderboard."""          try:              leaderboard = await _helpers.fetch_leaderboard() -        except _helpers.FetchingLeaderboardFailed: +        except _helpers.FetchingLeaderboardFailedError:              await ctx.send(":x: Can't fetch leaderboard for stats right now!")              return @@ -251,7 +438,7 @@ class AdventOfCode(commands.Cog):              info_embed = _helpers.get_summary_embed(leaderboard)              await ctx.send(f"```\n{table}\n```", embed=info_embed) -    @with_role(Roles.admin) +    @with_role(Roles.admins)      @adventofcode_group.command(          name="refresh",          aliases=("fetch",), @@ -267,7 +454,7 @@ class AdventOfCode(commands.Cog):          async with ctx.typing():              try:                  await _helpers.fetch_leaderboard(invalidate_cache=True) -            except _helpers.FetchingLeaderboardFailed: +            except _helpers.FetchingLeaderboardFailedError:                  await ctx.send(":x: Something went wrong while trying to refresh the cache!")              else:                  await ctx.send("\N{OK Hand Sign} Refreshed leaderboard cache!") @@ -277,6 +464,7 @@ class AdventOfCode(commands.Cog):          log.debug("Unloading the cog and canceling the background task.")          self.notification_task.cancel()          self.status_task.cancel() +        self.completionist_task.cancel()      def _build_about_embed(self) -> discord.Embed:          """Build and return the informational "About AoC" embed from the resources file.""" diff --git a/bot/exts/events/advent_of_code/_helpers.py b/bot/exts/events/advent_of_code/_helpers.py index 5fedb60f..6c004901 100644 --- a/bot/exts/events/advent_of_code/_helpers.py +++ b/bot/exts/events/advent_of_code/_helpers.py @@ -10,6 +10,7 @@ from typing import Any, Optional  import aiohttp  import arrow  import discord +from discord.ext import commands  from bot.bot import Bot  from bot.constants import AdventOfCode, Channels, Colours @@ -70,6 +71,33 @@ class FetchingLeaderboardFailedError(Exception):      """Raised when one or more leaderboards could not be fetched at all.""" +def _format_leaderboard_line(rank: int, data: dict[str, Any], *, is_author: bool) -> str: +    """ +    Build a string representing a line of the leaderboard. + +    Parameters: +        rank: +            Rank in the leaderboard of this entry. + +        data: +            Mapping with entry information. + +    Keyword arguments: +        is_author: +            Whether to address the name displayed in the returned line +            personally. + +    Returns: +        A formatted line for the leaderboard. +    """ +    return AOC_TABLE_TEMPLATE.format( +        rank=rank, +        name=data['name'] if not is_author else f"(You) {data['name']}", +        score=str(data['score']), +        stars=f"({data['star_1']}, {data['star_2']})" +    ) + +  def leaderboard_sorting_function(entry: tuple[str, dict]) -> tuple[int, int]:      """      Provide a sorting value for our leaderboard. @@ -105,6 +133,7 @@ def _parse_raw_leaderboard_data(raw_leaderboard_data: dict) -> dict:      # The data we get from the AoC website is structured by member, not by day/star,      # which means we need to iterate over the members to transpose the data to a per      # star view. We need that per star view to compute rank scores per star. +    per_day_star_stats = collections.defaultdict(list)      for member in raw_leaderboard_data.values():          name = member["name"] if member["name"] else f"Anonymous #{member['id']}"          member_id = member["id"] @@ -122,6 +151,11 @@ def _parse_raw_leaderboard_data(raw_leaderboard_data: dict) -> dict:                  star_results[(day, star)].append(                      StarResult(member_id=member_id, completion_time=completion_time)                  ) +                per_day_star_stats[f"{day}-{star}"].append( +                    {'completion_time': int(data["get_star_ts"]), 'member_name': name} +                ) +    for key in per_day_star_stats: +        per_day_star_stats[key] = sorted(per_day_star_stats[key], key=operator.itemgetter('completion_time'))      # Now that we have a transposed dataset that holds the completion time of all      # participants per star, we can compute the rank-based scores each participant @@ -151,13 +185,26 @@ def _parse_raw_leaderboard_data(raw_leaderboard_data: dict) -> dict:          # this data to JSON in order to cache it in Redis.          daily_stats[day] = {"star_one": star_one, "star_two": star_two} -    return {"daily_stats": daily_stats, "leaderboard": sorted_leaderboard} +    return {"daily_stats": daily_stats, "leaderboard": sorted_leaderboard, 'per_day_and_star': per_day_star_stats} -def _format_leaderboard(leaderboard: dict[str, dict]) -> str: +def _format_leaderboard(leaderboard: dict[str, dict], self_placement_name: str = None) -> str:      """Format the leaderboard using the AOC_TABLE_TEMPLATE."""      leaderboard_lines = [HEADER] +    self_placement_exists = False      for rank, data in enumerate(leaderboard.values(), start=1): +        if self_placement_name and data["name"].lower() == self_placement_name.lower(): +            leaderboard_lines.insert( +                1, +                AOC_TABLE_TEMPLATE.format( +                    rank=rank, +                    name=f"(You) {data['name']}", +                    score=str(data["score"]), +                    stars=f"({data['star_1']}, {data['star_2']})" +                ) +            ) +            self_placement_exists = True +            continue          leaderboard_lines.append(              AOC_TABLE_TEMPLATE.format(                  rank=rank, @@ -166,7 +213,13 @@ def _format_leaderboard(leaderboard: dict[str, dict]) -> str:                  stars=f"({data['star_1']}, {data['star_2']})"              )          ) - +    if self_placement_name and not self_placement_exists: +        raise commands.BadArgument( +            "Sorry, your profile does not exist in this leaderboard." +            "\n\n" +            "To join our leaderboard, run the command `.aoc join`." +            " If you've joined recently, please wait up to 30 minutes for our leaderboard to refresh." +        )      return "\n".join(leaderboard_lines) @@ -202,7 +255,7 @@ async def _fetch_leaderboard_data() -> dict[str, Any]:          # Two attempts, one with the original session cookie and one with the fallback session          for attempt in range(1, 3): -            log.info(f"Attempting to fetch leaderboard `{leaderboard.id}` ({attempt}/2)") +            log.debug(f"Attempting to fetch leaderboard `{leaderboard.id}` ({attempt}/2)")              cookies = {"session": leaderboard.session}              try:                  raw_data = await _leaderboard_request(leaderboard_url, leaderboard.id, cookies) @@ -254,7 +307,7 @@ def _get_top_leaderboard(full_leaderboard: str) -> str:  @_caches.leaderboard_cache.atomic_transaction -async def fetch_leaderboard(invalidate_cache: bool = False) -> dict: +async def fetch_leaderboard(invalidate_cache: bool = False, self_placement_name: str = None) -> dict:      """      Get the current Python Discord combined leaderboard. @@ -264,7 +317,6 @@ async def fetch_leaderboard(invalidate_cache: bool = False) -> dict:      miss, this function is locked to one call at a time using a decorator.      """      cached_leaderboard = await _caches.leaderboard_cache.to_dict() -      # Check if the cached leaderboard contains everything we expect it to. If it      # does not, this probably means the cache has not been created yet or has      # expired in Redis. This check also accounts for a malformed cache. @@ -280,15 +332,17 @@ async def fetch_leaderboard(invalidate_cache: bool = False) -> dict:          number_of_participants = len(leaderboard)          formatted_leaderboard = _format_leaderboard(leaderboard)          full_leaderboard_url = await _upload_leaderboard(formatted_leaderboard) -        leaderboard_fetched_at = datetime.datetime.utcnow().isoformat() +        leaderboard_fetched_at = datetime.datetime.now(datetime.timezone.utc).isoformat()          cached_leaderboard = { +            "placement_leaderboard": json.dumps(raw_leaderboard_data),              "full_leaderboard": formatted_leaderboard,              "top_leaderboard": _get_top_leaderboard(formatted_leaderboard),              "full_leaderboard_url": full_leaderboard_url,              "leaderboard_fetched_at": leaderboard_fetched_at,              "number_of_participants": number_of_participants,              "daily_stats": json.dumps(parsed_leaderboard_data["daily_stats"]), +            "leaderboard_per_day_and_star": json.dumps(parsed_leaderboard_data["per_day_and_star"])          }          # Store the new values in Redis @@ -300,7 +354,13 @@ async def fetch_leaderboard(invalidate_cache: bool = False) -> dict:                  _caches.leaderboard_cache.namespace,                  AdventOfCode.leaderboard_cache_expiry_seconds              ) - +    if self_placement_name: +        formatted_placement_leaderboard = _parse_raw_leaderboard_data( +            json.loads(cached_leaderboard["placement_leaderboard"]) +        )["leaderboard"] +        cached_leaderboard["placement_leaderboard"] = _get_top_leaderboard( +            _format_leaderboard(formatted_placement_leaderboard, self_placement_name=self_placement_name) +        )      return cached_leaderboard @@ -308,11 +368,13 @@ def get_summary_embed(leaderboard: dict) -> discord.Embed:      """Get an embed with the current summary stats of the leaderboard."""      leaderboard_url = leaderboard["full_leaderboard_url"]      refresh_minutes = AdventOfCode.leaderboard_cache_expiry_seconds // 60 +    refreshed_unix = int(datetime.datetime.fromisoformat(leaderboard["leaderboard_fetched_at"]).timestamp()) + +    aoc_embed = discord.Embed(colour=Colours.soft_green) -    aoc_embed = discord.Embed( -        colour=Colours.soft_green, -        timestamp=datetime.datetime.fromisoformat(leaderboard["leaderboard_fetched_at"]), -        description=f"*The leaderboard is refreshed every {refresh_minutes} minutes.*" +    aoc_embed.description = ( +        f"The leaderboard is refreshed every {refresh_minutes} minutes.\n" +        f"Last Updated: <t:{refreshed_unix}:t>"      )      aoc_embed.add_field(          name="Number of Participants", @@ -326,7 +388,6 @@ def get_summary_embed(leaderboard: dict) -> discord.Embed:              inline=True,          )      aoc_embed.set_author(name="Advent of Code", url=leaderboard_url) -    aoc_embed.set_footer(text="Last Updated")      aoc_embed.set_thumbnail(url=AOC_EMBED_THUMBNAIL)      return aoc_embed diff --git a/bot/exts/events/advent_of_code/views/dayandstarview.py b/bot/exts/events/advent_of_code/views/dayandstarview.py new file mode 100644 index 00000000..5529c12b --- /dev/null +++ b/bot/exts/events/advent_of_code/views/dayandstarview.py @@ -0,0 +1,82 @@ +from datetime import datetime + +import discord + +AOC_DAY_AND_STAR_TEMPLATE = "{rank: >4} | {name:25.25} | {completion_time: >10}" + + +class AoCDropdownView(discord.ui.View): +    """Interactive view to filter AoC stats by Day and Star.""" + +    def __init__(self, original_author: discord.Member, day_and_star_data: dict[str: dict], maximum_scorers: int): +        super().__init__() +        self.day = 0 +        self.star = 0 +        self.data = day_and_star_data +        self.maximum_scorers = maximum_scorers +        self.original_author = original_author + +    def generate_output(self) -> str: +        """ +        Generates a formatted codeblock with AoC statistics based on the currently selected day and star. + +        Optionally, when the requested day and star data does not exist yet it returns an error message. +        """ +        header = AOC_DAY_AND_STAR_TEMPLATE.format( +            rank="Rank", +            name="Name", completion_time="Completion time (UTC)" +        ) +        lines = [f"{header}\n{'-' * (len(header) + 2)}"] +        if not (day_and_star_data := self.data.get(f"{self.day}-{self.star}")): +            return ":x: The requested data for the specified day and star does not exist yet." +        for rank, scorer in enumerate(day_and_star_data[:self.maximum_scorers]): +            time_data = datetime.fromtimestamp(scorer['completion_time']).strftime("%I:%M:%S %p") +            lines.append(AOC_DAY_AND_STAR_TEMPLATE.format( +                datastamp="", +                rank=rank + 1, +                name=scorer['member_name'], +                completion_time=time_data) +            ) +        joined_lines = "\n".join(lines) +        return f"Statistics for Day: {self.day}, Star: {self.star}.\n ```\n{joined_lines}\n```" + +    async def interaction_check(self, interaction: discord.Interaction) -> bool: +        """Global check to ensure that the interacting user is the user who invoked the command originally.""" +        if interaction.user != self.original_author: +            await interaction.response.send_message( +                ":x: You can't interact with someone else's response. Please run the command yourself!", +                ephemeral=True +            ) +            return False +        return True + +    @discord.ui.select( +        placeholder="Day", +        options=[discord.SelectOption(label=str(i)) for i in range(1, 26)], +        custom_id="day_select" +    ) +    async def day_select(self, select: discord.ui.Select, interaction: discord.Interaction) -> None: +        """Dropdown to choose a Day of the AoC.""" +        self.day = select.values[0] + +    @discord.ui.select( +        placeholder="Star", +        options=[discord.SelectOption(label=str(i)) for i in range(1, 3)], +        custom_id="star_select" +    ) +    async def star_select(self, select: discord.ui.Select, interaction: discord.Interaction) -> None: +        """Dropdown to choose either the first or the second star.""" +        self.star = select.values[0] + +    @discord.ui.button(label="Fetch", style=discord.ButtonStyle.blurple) +    async def fetch(self, button: discord.ui.Button, interaction: discord.Interaction) -> None: +        """Button that fetches the statistics based on the dropdown values.""" +        if self.day == 0 or self.star == 0: +            await interaction.response.send_message( +                "You have to select a value from both of the dropdowns!", +                ephemeral=True +            ) +        else: +            await interaction.response.edit_message(content=self.generate_output()) +            self.day = 0 +            self.star = 0 diff --git a/bot/exts/events/hacktoberfest/hacktober-issue-finder.py b/bot/exts/events/hacktoberfest/hacktober-issue-finder.py index e3053851..1774564b 100644 --- a/bot/exts/events/hacktoberfest/hacktober-issue-finder.py +++ b/bot/exts/events/hacktoberfest/hacktober-issue-finder.py @@ -52,10 +52,10 @@ class HacktoberIssues(commands.Cog):      async def get_issues(self, ctx: commands.Context, option: str) -> Optional[dict]:          """Get a list of the python issues with the label 'hacktoberfest' from the Github api."""          if option == "beginner": -            if (ctx.message.created_at - self.cache_timer_beginner).seconds <= 60: +            if (ctx.message.created_at.replace(tzinfo=None) - self.cache_timer_beginner).seconds <= 60:                  log.debug("using cache")                  return self.cache_beginner -        elif (ctx.message.created_at - self.cache_timer_normal).seconds <= 60: +        elif (ctx.message.created_at.replace(tzinfo=None) - self.cache_timer_normal).seconds <= 60:              log.debug("using cache")              return self.cache_normal @@ -88,10 +88,10 @@ class HacktoberIssues(commands.Cog):              if option == "beginner":                  self.cache_beginner = data -                self.cache_timer_beginner = ctx.message.created_at +                self.cache_timer_beginner = ctx.message.created_at.replace(tzinfo=None)              else:                  self.cache_normal = data -                self.cache_timer_normal = ctx.message.created_at +                self.cache_timer_normal = ctx.message.created_at.replace(tzinfo=None)              return data @@ -100,7 +100,8 @@ class HacktoberIssues(commands.Cog):          """Format the issue data into a embed."""          title = issue["title"]          issue_url = issue["url"].replace("api.", "").replace("/repos/", "/") -        body = issue["body"] +        # issues can have empty bodies, which in that case GitHub doesn't include the key in the API response +        body = issue.get("body", "")          labels = [label["name"] for label in issue["labels"]]          embed = discord.Embed(title=title) diff --git a/bot/exts/events/trivianight/__init__.py b/bot/exts/events/trivianight/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/bot/exts/events/trivianight/__init__.py diff --git a/bot/exts/events/trivianight/_game.py b/bot/exts/events/trivianight/_game.py new file mode 100644 index 00000000..8b012a17 --- /dev/null +++ b/bot/exts/events/trivianight/_game.py @@ -0,0 +1,192 @@ +import time +from random import randrange +from string import ascii_uppercase +from typing import Iterable, NamedTuple, Optional, TypedDict + +DEFAULT_QUESTION_POINTS = 10 +DEFAULT_QUESTION_TIME = 20 + + +class QuestionData(TypedDict): +    """Representing the different 'keys' of the question taken from the JSON.""" + +    number: str +    description: str +    answers: list[str] +    correct: str +    points: Optional[int] +    time: Optional[int] + + +class UserGuess(NamedTuple): +    """Represents the user's guess for a question.""" + +    answer: str +    editable: bool +    elapsed: float + + +class QuestionClosed(RuntimeError): +    """Exception raised when the question is not open for guesses anymore.""" + + +class AlreadyUpdated(RuntimeError): +    """Exception raised when the user has already updated their guess once.""" + + +class AllQuestionsVisited(RuntimeError): +    """Exception raised when all of the questions have been visited.""" + + +class Question: +    """Interface for one question in a trivia night game.""" + +    def __init__(self, data: QuestionData): +        self._data = data +        self._guesses: dict[int, UserGuess] = {} +        self._started = None + +    # These properties are mostly proxies to the underlying data: + +    @property +    def number(self) -> str: +        """The number of the question.""" +        return self._data["number"] + +    @property +    def description(self) -> str: +        """The description of the question.""" +        return self._data["description"] + +    @property +    def answers(self) -> list[tuple[str, str]]: +        """ +        The possible answers for this answer. + +        This is a property that returns a list of letter, answer pairs. +        """ +        return [(ascii_uppercase[i], q) for (i, q) in enumerate(self._data["answers"])] + +    @property +    def correct(self) -> str: +        """The correct answer for this question.""" +        return self._data["correct"] + +    @property +    def max_points(self) -> int: +        """The maximum points that can be awarded for this question.""" +        return self._data.get("points") or DEFAULT_QUESTION_POINTS + +    @property +    def time(self) -> float: +        """The time allowed to answer the question.""" +        return self._data.get("time") or DEFAULT_QUESTION_TIME + +    def start(self) -> float: +        """Start the question and return the time it started.""" +        self._started = time.perf_counter() +        return self._started + +    def _update_guess(self, user: int, answer: str) -> UserGuess: +        """Update an already existing guess.""" +        if self._started is None: +            raise QuestionClosed("Question is not open for answers.") + +        if self._guesses[user][1] is False: +            raise AlreadyUpdated(f"User({user}) has already updated their guess once.") + +        self._guesses[user] = (answer, False, time.perf_counter() - self._started) +        return self._guesses[user] + +    def guess(self, user: int, answer: str) -> UserGuess: +        """Add a guess made by a user to the current question.""" +        if user in self._guesses: +            return self._update_guess(user, answer) + +        if self._started is None: +            raise QuestionClosed("Question is not open for answers.") + +        self._guesses[user] = (answer, True, time.perf_counter() - self._started) +        return self._guesses[user] + +    def stop(self) -> dict[int, UserGuess]: +        """Stop the question and return the guesses that were made.""" +        guesses = self._guesses + +        self._started = None +        self._guesses = {} + +        return guesses + + +class TriviaNightGame: +    """Interface for managing a game of trivia night.""" + +    def __init__(self, data: list[QuestionData]) -> None: +        self._questions = [Question(q) for q in data] +        # A copy of the questions to keep for `.trivianight list` +        self._all_questions = list(self._questions) +        self.current_question: Optional[Question] = None +        self._points = {} +        self._speed = {} + +    def __iter__(self) -> Iterable[Question]: +        return iter(self._questions) + +    def next_question(self, number: str = None) -> Question: +        """ +        Consume one random question from the trivia night game. + +        One question is randomly picked from the list of questions which is then removed and returned. +        """ +        if self.current_question is not None: +            raise RuntimeError("Cannot call next_question() when there is a current question.") + +        if number is not None: +            try: +                question = [q for q in self._all_questions if q.number == int(number)][0] +            except IndexError: +                raise ValueError(f"Question number {number} does not exist.") +        elif len(self._questions) == 0: +            raise AllQuestionsVisited("All of the questions have been visited.") +        else: +            question = self._questions.pop(randrange(len(self._questions))) + +        self.current_question = question +        return question + +    def end_question(self) -> None: +        """ +        End the current question. + +        This method should be called when the question has been answered, it must be called before +        attempting to call `next_question()` again. +        """ +        if self.current_question is None: +            raise RuntimeError("Cannot call end_question() when there is no current question.") + +        self.current_question.stop() +        self.current_question = None + +    def list_questions(self) -> str: +        """ +        List all the questions. + +        This method should be called when `.trivianight list` is called to display the following information: +            - Question number +            - Question description +            - Visited/not visited +        """ +        question_list = [] + +        visited = ":white_check_mark:" +        not_visited = ":x:" + +        for question in self._all_questions: +            formatted_string = ( +                f"**Q{question.number}** {not_visited if question in self._questions else visited}" +                f"\n{question.description}\n\n" +            ) +            question_list.append(formatted_string.rstrip()) + +        return question_list diff --git a/bot/exts/events/trivianight/_questions.py b/bot/exts/events/trivianight/_questions.py new file mode 100644 index 00000000..d6beced9 --- /dev/null +++ b/bot/exts/events/trivianight/_questions.py @@ -0,0 +1,179 @@ +from random import choice +from string import ascii_uppercase + +import discord +from discord import Embed, Interaction +from discord.ui import Button, View + +from bot.constants import Colours, NEGATIVE_REPLIES + +from ._game import AlreadyUpdated, Question, QuestionClosed +from ._scoreboard import Scoreboard + + +class AnswerButton(Button): +    """Button subclass that's used to guess on a particular answer.""" + +    def __init__(self, label: str, question: Question): +        super().__init__(label=label, style=discord.ButtonStyle.green) + +        self.question = question + +    async def callback(self, interaction: Interaction) -> None: +        """ +        When a user interacts with the button, this will be called. + +        Parameters: +            - interaction: an instance of discord.Interaction representing the interaction between the user and the +            button. +        """ +        try: +            guess = self.question.guess(interaction.user.id, self.label) +        except AlreadyUpdated: +            await interaction.response.send_message( +                embed=Embed( +                    title=choice(NEGATIVE_REPLIES), +                    description="You've already changed your answer more than once!", +                    color=Colours.soft_red +                ), +                ephemeral=True +            ) +            return +        except QuestionClosed: +            await interaction.response.send_message( +                embed=Embed( +                    title=choice(NEGATIVE_REPLIES), +                    description="The question is no longer accepting guesses!", +                    color=Colours.soft_red +                ), +                ephemeral=True +            ) +            return + +        if guess[1]: +            await interaction.response.send_message( +                embed=Embed( +                    title="Confirming that...", +                    description=f"You chose answer {self.label}.", +                    color=Colours.soft_green +                ), +                ephemeral=True +            ) +        else: +            # guess[1] is False and they cannot change their answer again. Which +            # indicates that they changed it this time around. +            await interaction.response.send_message( +                embed=Embed( +                    title="Confirming that...", +                    description=f"You changed your answer to answer {self.label}.", +                    color=Colours.soft_green +                ), +                ephemeral=True +            ) + + +class QuestionView(View): +    """View for one trivia night question.""" + +    def __init__(self, question: Question) -> None: +        super().__init__() +        self.question = question + +        for letter, _ in self.question.answers: +            self.add_item(AnswerButton(letter, self.question)) + +    @staticmethod +    def unicodeify(text: str) -> str: +        """ +        Takes `text` and adds zero-width spaces to prevent copy and pasting the question. + +        Parameters: +            - text: A string that represents the question description to 'unicodeify' +        """ +        return "".join( +            f"{letter}\u200b" if letter not in ('\n', '\t', '`', 'p', 'y') else letter +            for idx, letter in enumerate(text) +        ) + +    def create_embed(self) -> Embed: +        """Helper function to create the embed for the current question.""" +        question_embed = Embed( +            title=f"Question {self.question.number}", +            description=self.unicodeify(self.question.description), +            color=Colours.python_yellow +        ) + +        for label, answer in self.question.answers: +            question_embed.add_field(name=f"Answer {label}", value=answer, inline=False) + +        return question_embed + +    def end_question(self, scoreboard: Scoreboard) -> Embed: +        """ +        Ends the question and displays the statistics on who got the question correct, awards points, etc. + +        Returns: +            An embed displaying the correct answers and the % of people that chose each answer. +        """ +        guesses = self.question.stop() + +        labels = ascii_uppercase[:len(self.question.answers)] + +        answer_embed = Embed( +            title=f"The correct answer for Question {self.question.number} was...", +            description=self.question.correct +        ) + +        if len(guesses) != 0: +            answers_chosen = { +                answer_choice: len( +                    tuple(filter(lambda x: x[0] == answer_choice, guesses.values())) +                ) +                for answer_choice in labels +            } + +            answers_chosen = dict( +                sorted(list(answers_chosen.items()), key=lambda item: item[1], reverse=True) +            ) + +            for answer, people_answered in answers_chosen.items(): +                is_correct_answer = dict(self.question.answers)[answer[0]] == self.question.correct + +                # Setting the color of answer_embed to the % of people that got it correct via the mapping +                if is_correct_answer: +                    # Maps the % of people who got it right to a color, from a range of red to green +                    percentage_to_color = [0xFC94A1, 0xFFCCCB, 0xCDFFCC, 0xB0F5AB, 0xB0F5AB] +                    answer_embed.color = percentage_to_color[round(people_answered / len(guesses) * 100) // 25] + +                field_title = ( +                    (":white_check_mark: " if is_correct_answer else "") +                    + f"{people_answered} players ({people_answered / len(guesses) * 100:.1f}%) chose" +                ) + +                # The `ord` function is used here to change the letter to its corresponding position +                answer_embed.add_field( +                    name=field_title, +                    value=self.question.answers[ord(answer) - 65][1], +                    inline=False +                ) + +            # Assign points to users +            for user_id, answer in guesses.items(): +                if dict(self.question.answers)[answer[0]] == self.question.correct: +                    scoreboard.assign_points( +                        int(user_id), +                        points=(1 - (answer[-1] / self.question.time) / 2) * self.question.max_points, +                        speed=answer[-1] +                    ) +                elif answer[-1] <= 2: +                    scoreboard.assign_points( +                        int(user_id), +                        points=-(1 - (answer[-1] / self.question.time) / 2) * self.question.max_points +                    ) +                else: +                    scoreboard.assign_points( +                        int(user_id), +                        points=0 +                    ) + +        return answer_embed diff --git a/bot/exts/events/trivianight/_scoreboard.py b/bot/exts/events/trivianight/_scoreboard.py new file mode 100644 index 00000000..a5a5fcac --- /dev/null +++ b/bot/exts/events/trivianight/_scoreboard.py @@ -0,0 +1,186 @@ +from random import choice + +import discord.ui +from discord import ButtonStyle, Embed, Interaction, Member +from discord.ui import Button, View + +from bot.bot import Bot +from bot.constants import Colours, NEGATIVE_REPLIES + + +class ScoreboardView(View): +    """View for the scoreboard.""" + +    def __init__(self, bot: Bot): +        super().__init__() +        self.bot = bot + +    @staticmethod +    def _int_to_ordinal(number: int) -> str: +        """ +        Converts an integer into an ordinal number, i.e. 1 to 1st. + +        Parameters: +            - number: an integer representing the number to convert to an ordinal number. +        """ +        suffix = ["th", "st", "nd", "rd", "th"][min(number % 10, 4)] +        if (number % 100) in {11, 12, 13}: +            suffix = "th" + +        return str(number) + suffix + +    async def create_main_leaderboard(self) -> Embed: +        """ +        Helper function that iterates through `self.points` to generate the main leaderboard embed. + +        The main leaderboard would be formatted like the following: +        **1**. @mention of the user (# of points) +        along with the 29 other users who made it onto the leaderboard. +        """ +        formatted_string = "" + +        for current_placement, (user, points) in enumerate(self.points.items()): +            if current_placement + 1 > 30: +                break + +            user = await self.bot.fetch_user(int(user)) +            formatted_string += f"**{current_placement + 1}.** {user.mention} " +            formatted_string += f"({points:.1f} pts)\n" +            if (current_placement + 1) % 10 == 0: +                formatted_string += "β―β―β―β―β―β―β―β―\n" + +        main_embed = Embed( +            title="Winners of the Trivia Night", +            description=formatted_string, +            color=Colours.python_blue, +        ) + +        return main_embed + +    async def _create_speed_embed(self) -> Embed: +        """ +        Helper function that iterates through `self.speed` to generate a leaderboard embed. + +        The speed leaderboard would be formatted like the following: +        **1**. @mention of the user ([average speed as a float with the precision of one decimal point]s) +        along with the 29 other users who made it onto the leaderboard. +        """ +        formatted_string = "" + +        for current_placement, (user, time_taken) in enumerate(self.speed.items()): +            if current_placement + 1 > 30: +                break + +            user = await self.bot.fetch_user(int(user)) +            formatted_string += f"**{current_placement + 1}.** {user.mention} " +            formatted_string += f"({(time_taken[-1] / time_taken[0]):.1f}s)\n" +            if (current_placement + 1) % 10 == 0: +                formatted_string += "β―β―β―β―β―β―β―β―\n" + +        speed_embed = Embed( +            title="Average time taken to answer a question", +            description=formatted_string, +            color=Colours.python_blue +        ) +        return speed_embed + +    def _get_rank(self, member: Member) -> Embed: +        """ +        Gets the member's rank for the points leaderboard and speed leaderboard. + +        Parameters: +            - member: An instance of discord.Member representing the person who is trying to get their rank. +        """ +        rank_embed = Embed(title=f"Ranks for {member.display_name}", color=Colours.python_blue) +        # These are stored as strings so that the last digit can be determined to choose the suffix +        try: +            points_rank = str(list(self.points).index(member.id) + 1) +            speed_rank = str(list(self.speed).index(member.id) + 1) +        except ValueError: +            return Embed( +                title=choice(NEGATIVE_REPLIES), +                description="It looks like you didn't participate in the Trivia Night event!", +                color=Colours.soft_red +            ) + +        rank_embed.add_field( +            name="Total Points", +            value=( +                f"You got {self._int_to_ordinal(int(points_rank))} place" +                f" with {self.points[member.id]:.1f} points." +            ), +            inline=False +        ) + +        rank_embed.add_field( +            name="Average Speed", +            value=( +                f"You got {self._int_to_ordinal(int(speed_rank))} place" +                f" with a time of {(self.speed[member.id][1] / self.speed[member.id][0]):.1f} seconds." +            ), +            inline=False +        ) +        return rank_embed + +    @discord.ui.button(label="Scoreboard for Speed", style=ButtonStyle.green) +    async def speed_leaderboard(self, button: Button, interaction: Interaction) -> None: +        """ +        Send an ephemeral message with the speed leaderboard embed. + +        Parameters: +            - button: The discord.ui.Button instance representing the `Speed Leaderboard` button. +            - interaction: The discord.Interaction instance containing information on the interaction between the user +            and the button. +        """ +        await interaction.response.send_message(embed=await self._create_speed_embed(), ephemeral=True) + +    @discord.ui.button(label="What's my rank?", style=ButtonStyle.blurple) +    async def rank_button(self, button: Button, interaction: Interaction) -> None: +        """ +        Send an ephemeral message with the user's rank for the overall points/average speed. + +        Parameters: +            - button: The discord.ui.Button instance representing the `What's my rank?` button. +            - interaction: The discord.Interaction instance containing information on the interaction between the user +            and the button. +        """ +        await interaction.response.send_message(embed=self._get_rank(interaction.user), ephemeral=True) + + +class Scoreboard: +    """Class for the scoreboard for the Trivia Night event.""" + +    def __init__(self, bot: Bot): +        self._bot = bot +        self._points = {} +        self._speed = {} + +    def assign_points(self, user_id: int, *, points: int = None, speed: float = None) -> None: +        """ +        Assign points or deduct points to/from a certain user. + +        This method should be called once the question has finished and all answers have been registered. +        """ +        if points is not None and user_id not in self._points.keys(): +            self._points[user_id] = points +        elif points is not None: +            self._points[user_id] += points + +        if speed is not None and user_id not in self._speed.keys(): +            self._speed[user_id] = [1, speed] +        elif speed is not None: +            self._speed[user_id] = [ +                self._speed[user_id][0] + 1, self._speed[user_id][1] + speed +            ] + +    async def display(self, speed_leaderboard: bool = False) -> tuple[Embed, View]: +        """Returns the embed of the main leaderboard along with the ScoreboardView.""" +        view = ScoreboardView(self._bot) + +        view.points = dict(sorted(self._points.items(), key=lambda item: item[-1], reverse=True)) +        view.speed = dict(sorted(self._speed.items(), key=lambda item: item[-1][1] / item[-1][0])) + +        return ( +            await view.create_main_leaderboard(), +            view if not speed_leaderboard else await view._create_speed_embed() +        ) diff --git a/bot/exts/events/trivianight/trivianight.py b/bot/exts/events/trivianight/trivianight.py new file mode 100644 index 00000000..18d8327a --- /dev/null +++ b/bot/exts/events/trivianight/trivianight.py @@ -0,0 +1,328 @@ +import asyncio +from json import JSONDecodeError, loads +from random import choice +from typing import Optional + +from discord import Embed +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Colours, NEGATIVE_REPLIES, POSITIVE_REPLIES, Roles +from bot.utils.pagination import LinePaginator + +from ._game import AllQuestionsVisited, TriviaNightGame +from ._questions import QuestionView +from ._scoreboard import Scoreboard + +# The ID you see below are the Events Lead role ID and the Event Runner Role ID +TRIVIA_NIGHT_ROLES = (Roles.admins, 778361735739998228, 940911658799333408) + + +class TriviaNightCog(commands.Cog): +    """Cog for the Python Trivia Night event.""" + +    def __init__(self, bot: Bot): +        self.bot = bot +        self.game: Optional[TriviaNightGame] = None +        self.scoreboard: Optional[Scoreboard] = None +        self.question_closed: asyncio.Event = None + +    @commands.group(aliases=["tn"], invoke_without_command=True) +    async def trivianight(self, ctx: commands.Context) -> None: +        """ +        The command group for the Python Discord Trivia Night. + +        If invoked without a subcommand (i.e. simply .trivianight), it will explain what the Trivia Night event is. +        """ +        cog_description = Embed( +            title="What is .trivianight?", +            description=( +                "This 'cog' is for the Python Discord's TriviaNight (date tentative)! Compete against other" +                " players in a trivia about Python!" +            ), +            color=Colours.soft_green +        ) +        await ctx.send(embed=cog_description) + +    @trivianight.command() +    @commands.has_any_role(*TRIVIA_NIGHT_ROLES) +    async def load(self, ctx: commands.Context, *, to_load: Optional[str]) -> None: +        """ +        Loads a JSON file from the provided attachment or argument. + +        The JSON provided is formatted where it is a list of dictionaries, each dictionary containing the keys below: +        - number: int (represents the current question #) +        - description: str (represents the question itself) +        - answers: list[str] (represents the different answers possible, must be a length of 4) +        - correct: str (represents the correct answer in terms of what the correct answer is in `answers` +        - time: Optional[int] (represents the timer for the question and how long it should run, default is 10) +        - points: Optional[int] (represents how many points are awarded for each question, default is 10) + +        The load command accepts three different ways of loading in a JSON: +        - an attachment of the JSON file +        - a message link to the attachment/JSON +        - reading the JSON itself via a codeblock or plain text +        """ +        if self.game is not None: +            await ctx.send(embed=Embed( +                title=choice(NEGATIVE_REPLIES), +                description="There is already a trivia night running!", +                color=Colours.soft_red +            )) +            return + +        if ctx.message.attachments: +            json_text = (await ctx.message.attachments[0].read()).decode("utf8") +        elif not to_load: +            raise commands.BadArgument("You didn't attach an attachment nor link a message!") +        elif ( +            to_load.startswith("https://discord.com/channels") +            or to_load.startswith("https://discordapp.com/channels") +        ): +            channel_id, message_id = to_load.split("/")[-2:] +            channel = await ctx.guild.fetch_channel(int(channel_id)) +            message = await channel.fetch_message(int(message_id)) +            if message.attachments: +                json_text = (await message.attachments[0].read()).decode("utf8") +            else: +                json_text = message.content.replace("```", "").replace("json", "").replace("\n", "") +        else: +            json_text = to_load.replace("```", "").replace("json", "").replace("\n", "") + +        try: +            serialized_json = loads(json_text) +        except JSONDecodeError as error: +            raise commands.BadArgument(f"Looks like something went wrong:\n{str(error)}") + +        self.game = TriviaNightGame(serialized_json) +        self.question_closed = asyncio.Event() + +        success_embed = Embed( +            title=choice(POSITIVE_REPLIES), +            description="The JSON was loaded successfully!", +            color=Colours.soft_green +        ) + +        self.scoreboard = Scoreboard(self.bot) + +        await ctx.send(embed=success_embed) + +    @trivianight.command(aliases=('next',)) +    @commands.has_any_role(*TRIVIA_NIGHT_ROLES) +    async def question(self, ctx: commands.Context, question_number: str = None) -> None: +        """ +        Gets a random question from the unanswered question list and lets the user(s) choose the answer. + +        This command will continuously count down until the time limit of the question is exhausted. +        However, if `.trivianight stop` is invoked, the counting down is interrupted to show the final results. +        """ +        if self.game is None: +            await ctx.send(embed=Embed( +                title=choice(NEGATIVE_REPLIES), +                description="There is no trivia night running!", +                color=Colours.soft_red +            )) +            return + +        if self.game.current_question is not None: +            error_embed = Embed( +                title=choice(NEGATIVE_REPLIES), +                description="There is already an ongoing question!", +                color=Colours.soft_red +            ) +            await ctx.send(embed=error_embed) +            return + +        try: +            next_question = self.game.next_question(question_number) +        except AllQuestionsVisited: +            error_embed = Embed( +                title=choice(NEGATIVE_REPLIES), +                description="All of the questions have been used.", +                color=Colours.soft_red +            ) +            await ctx.send(embed=error_embed) +            return + +        await ctx.send("Next question in 3 seconds! Get ready...") +        await asyncio.sleep(3) + +        question_view = QuestionView(next_question) +        question_embed = question_view.create_embed() + +        next_question.start() +        message = await ctx.send(embed=question_embed, view=question_view) + +        # Exponentially sleep less and less until the time limit is reached +        percentage = 1 +        while True: +            percentage *= 0.5 +            duration = next_question.time * percentage + +            await asyncio.wait([self.question_closed.wait()], timeout=duration) + +            if self.question_closed.is_set(): +                await ctx.send(embed=question_view.end_question(self.scoreboard)) +                await message.edit(embed=question_embed, view=None) + +                self.game.end_question() +                self.question_closed.clear() +                return + +            if int(duration) > 1: +                # It is quite ugly to display decimals, the delay for requests to reach Discord +                # cause sub-second accuracy to be quite pointless. +                await ctx.send(f"{int(duration)}s remaining...") +            else: +                # Since each time we divide the percentage by 2 and sleep one half of the halves (then sleep a +                # half, of that half) we must sleep both halves at the end. +                await asyncio.wait([self.question_closed.wait()], timeout=duration) +                if self.question_closed.is_set(): +                    await ctx.send(embed=question_view.end_question(self.scoreboard)) +                    await message.edit(embed=question_embed, view=None) + +                    self.game.end_question() +                    self.question_closed.clear() +                    return +                break + +        await ctx.send(embed=question_view.end_question(self.scoreboard)) +        await message.edit(embed=question_embed, view=None) + +        self.game.end_question() + +    @trivianight.command() +    @commands.has_any_role(*TRIVIA_NIGHT_ROLES) +    async def list(self, ctx: commands.Context) -> None: +        """ +        Display all the questions left in the question bank. + +        Questions are displayed in the following format: +        Q(number): Question description | :white_check_mark: if the question was used otherwise :x:. +        """ +        if self.game is None: +            await ctx.send(embed=Embed( +                title=choice(NEGATIVE_REPLIES), +                description="There is no trivia night running!", +                color=Colours.soft_red +            )) +            return + +        question_list = self.game.list_questions() + +        list_embed = Embed(title="All Trivia Night Questions") + +        if len(question_list) == 1: +            list_embed.description = question_list[0] +            await ctx.send(embed=list_embed) +        else: +            await LinePaginator.paginate( +                question_list, +                ctx, +                list_embed +            ) + +    @trivianight.command() +    @commands.has_any_role(*TRIVIA_NIGHT_ROLES) +    async def stop(self, ctx: commands.Context) -> None: +        """ +        End the ongoing question to show the correct question. + +        This command should be used if the question should be ended early or if the time limit fails +        """ +        if self.game is None: +            await ctx.send(embed=Embed( +                title=choice(NEGATIVE_REPLIES), +                description="There is no trivia night running!", +                color=Colours.soft_red +            )) +            return + +        if self.game.current_question is None: +            error_embed = Embed( +                title=choice(NEGATIVE_REPLIES), +                description="There is no ongoing question!", +                color=Colours.soft_red +            ) +            await ctx.send(embed=error_embed) +            return + +        self.question_closed.set() + +    @trivianight.command() +    @commands.has_any_role(*TRIVIA_NIGHT_ROLES) +    async def end(self, ctx: commands.Context) -> None: +        """ +        Displays the scoreboard view. + +        The scoreboard view consists of the two scoreboards with the 30 players who got the highest points and the +        30 players who had the fastest average response time to a question where they got the question right. + +        The scoreboard view also has a button where the user can see their own rank, points and average speed if they +        didn't make it onto the leaderboard. +        """ +        if self.game is None: +            await ctx.send(embed=Embed( +                title=choice(NEGATIVE_REPLIES), +                description="There is no trivia night running!", +                color=Colours.soft_red +            )) +            return + +        if self.game.current_question is not None: +            error_embed = Embed( +                title=choice(NEGATIVE_REPLIES), +                description="You can't end the event while a question is ongoing!", +                color=Colours.soft_red +            ) +            await ctx.send(embed=error_embed) +            return + +        scoreboard_embed, scoreboard_view = await self.scoreboard.display() +        await ctx.send(embed=scoreboard_embed, view=scoreboard_view) + +    @trivianight.command() +    @commands.has_any_role(*TRIVIA_NIGHT_ROLES) +    async def scoreboard(self, ctx: commands.Context) -> None: +        """ +        Displays the scoreboard. + +        The scoreboard consists of the two scoreboards with the 30 players who got the highest points and the +        30 players who had the fastest average response time to a question where they got the question right. +        """ +        if self.game is None: +            await ctx.send(embed=Embed( +                title=choice(NEGATIVE_REPLIES), +                description="There is no trivia night running!", +                color=Colours.soft_red +            )) +            return + +        if self.game.current_question is not None: +            error_embed = Embed( +                title=choice(NEGATIVE_REPLIES), +                description="You can't end the event while a question is ongoing!", +                color=Colours.soft_red +            ) +            await ctx.send(embed=error_embed) +            return + +        scoreboard_embed, speed_scoreboard = await self.scoreboard.display(speed_leaderboard=True) +        await ctx.send(embeds=(scoreboard_embed, speed_scoreboard)) + +    @trivianight.command() +    @commands.has_any_role(*TRIVIA_NIGHT_ROLES) +    async def end_game(self, ctx: commands.Context) -> None: +        """Ends the ongoing game.""" +        self.game = None + +        await ctx.send(embed=Embed( +            title=choice(POSITIVE_REPLIES), +            description="The game has been stopped.", +            color=Colours.soft_green +        )) + + +def setup(bot: Bot) -> None: +    """Load the TriviaNight cog.""" +    bot.add_cog(TriviaNightCog(bot)) diff --git a/bot/exts/fun/anagram.py b/bot/exts/fun/anagram.py new file mode 100644 index 00000000..79280fa9 --- /dev/null +++ b/bot/exts/fun/anagram.py @@ -0,0 +1,109 @@ +import asyncio +import json +import logging +import random +from pathlib import Path + +import discord +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Colours + +log = logging.getLogger(__name__) + +TIME_LIMIT = 60 + +# anagram.json file contains all the anagrams +with open(Path("bot/resources/fun/anagram.json"), "r") as f: +    ANAGRAMS_ALL = json.load(f) + + +class AnagramGame: +    """ +    Used for creating instances of anagram games. + +    Once multiple games can be run at the same time, this class' instances +    can be used for keeping track of each anagram game. +    """ + +    def __init__(self, scrambled: str, correct: list[str]) -> None: +        self.scrambled = scrambled +        self.correct = set(correct) + +        self.winners = set() + +    async def message_creation(self, message: discord.Message) -> None: +        """Check if the message is a correct answer and remove it from the list of answers.""" +        if message.content.lower() in self.correct: +            self.winners.add(message.author.mention) +            self.correct.remove(message.content.lower()) + + +class Anagram(commands.Cog): +    """Cog for the Anagram game command.""" + +    def __init__(self, bot: Bot): +        self.bot = bot + +        self.games: dict[int, AnagramGame] = {} + +    @commands.command(name="anagram", aliases=("anag", "gram", "ag")) +    async def anagram_command(self, ctx: commands.Context) -> None: +        """ +        Given shuffled letters, rearrange them into anagrams. + +        Show an embed with scrambled letters which if rearranged can form words. +        After a specific amount of time, list the correct answers and whether someone provided a +        correct answer. +        """ +        if self.games.get(ctx.channel.id): +            await ctx.send("An anagram is already being solved in this channel!") +            return + +        scrambled_letters, correct = random.choice(list(ANAGRAMS_ALL.items())) + +        game = AnagramGame(scrambled_letters, correct) +        self.games[ctx.channel.id] = game + +        anagram_embed = discord.Embed( +            title=f"Find anagrams from these letters: '{scrambled_letters.upper()}'", +            description=f"You have {TIME_LIMIT} seconds to find correct words.", +            colour=Colours.purple, +        ) + +        await ctx.send(embed=anagram_embed) +        await asyncio.sleep(TIME_LIMIT) + +        if game.winners: +            win_list = ", ".join(game.winners) +            content = f"Well done {win_list} for getting it right!" +        else: +            content = "Nobody got it right." + +        answer_embed = discord.Embed( +            title=f"The words were:  `{'`, `'.join(ANAGRAMS_ALL[game.scrambled])}`!", +            colour=Colours.pink, +        ) + +        await ctx.send(content, embed=answer_embed) + +        # Game is finished, let's remove it from the dict +        self.games.pop(ctx.channel.id) + +    @commands.Cog.listener() +    async def on_message(self, message: discord.Message) -> None: +        """Check a message for an anagram attempt and pass to an ongoing game.""" +        if message.author.bot or not message.guild: +            return + +        game = self.games.get(message.channel.id) +        if not game: +            return + +        await game.message_creation(message) + + +def setup(bot: Bot) -> None: +    """Load the Anagram cog.""" +    bot.add_cog(Anagram(bot)) diff --git a/bot/exts/fun/battleship.py b/bot/exts/fun/battleship.py index f4351954..beff196f 100644 --- a/bot/exts/fun/battleship.py +++ b/bot/exts/fun/battleship.py @@ -369,7 +369,6 @@ class Battleship(commands.Cog):          return any(player in (game.p1.user, game.p2.user) for game in self.games)      @commands.group(invoke_without_command=True) -    @commands.guild_only()      async def battleship(self, ctx: commands.Context) -> None:          """          Play a game of Battleship with someone else! diff --git a/bot/exts/fun/connect_four.py b/bot/exts/fun/connect_four.py index 647bb2b7..f53695d5 100644 --- a/bot/exts/fun/connect_four.py +++ b/bot/exts/fun/connect_four.py @@ -6,7 +6,6 @@ from typing import Optional, Union  import discord  import emojis  from discord.ext import commands -from discord.ext.commands import guild_only  from bot.bot import Bot  from bot.constants import Emojis @@ -361,7 +360,6 @@ class ConnectFour(commands.Cog):                  self.games.remove(game)              raise -    @guild_only()      @commands.group(          invoke_without_command=True,          aliases=("4inarow", "connect4", "connectfour", "c4"), @@ -426,7 +424,6 @@ class ConnectFour(commands.Cog):          await self._play_game(ctx, user, board_size, str(emoji1), str(emoji2)) -    @guild_only()      @connect_four.command(aliases=("bot", "computer", "cpu"))      async def ai(          self, diff --git a/bot/exts/fun/duck_game.py b/bot/exts/fun/duck_game.py index 1ef7513f..10b03a49 100644 --- a/bot/exts/fun/duck_game.py +++ b/bot/exts/fun/duck_game.py @@ -11,7 +11,7 @@ from PIL import Image, ImageDraw, ImageFont  from discord.ext import commands  from bot.bot import Bot -from bot.constants import Colours, MODERATION_ROLES +from bot.constants import MODERATION_ROLES  from bot.utils.decorators import with_role  DECK = list(product(*[(0, 1, 2)]*4)) @@ -130,6 +130,9 @@ class DuckGame:          while len(self.solutions) < minimum_solutions:              self.board = random.sample(DECK, size) +        self.board_msg = None +        self.found_msg = None +      @property      def board(self) -> list[tuple[int]]:          """Accesses board property.""" @@ -181,7 +184,7 @@ class DuckGamesDirector(commands.Cog):      )      @commands.cooldown(rate=1, per=2, type=commands.BucketType.channel)      async def start_game(self, ctx: commands.Context) -> None: -        """Generate a board, send the game embed, and end the game after a time limit.""" +        """Start a new Duck Duck Duck Goose game."""          if ctx.channel.id in self.current_games:              await ctx.send("There's already a game running!")              return @@ -191,8 +194,8 @@ class DuckGamesDirector(commands.Cog):          game.running = True          self.current_games[ctx.channel.id] = game -        game.msg_content = "" -        game.embed_msg = await self.send_board_embed(ctx, game) +        game.board_msg = await self.send_board_embed(ctx, game) +        game.found_msg = await self.send_found_embed(ctx)          await asyncio.sleep(GAME_DURATION)          # Checking for the channel ID in the currently running games is not sufficient. @@ -245,13 +248,13 @@ class DuckGamesDirector(commands.Cog):          if answer in game.solutions:              game.claimed_answers[answer] = msg.author              game.scores[msg.author] += CORRECT_SOLN -            await self.display_claimed_answer(game, msg.author, answer) +            await self.append_to_found_embed(game, f"{str(answer):12s}  -  {msg.author.display_name}")          else:              await msg.add_reaction(EMOJI_WRONG)              game.scores[msg.author] += INCORRECT_SOLN      async def send_board_embed(self, ctx: commands.Context, game: DuckGame) -> discord.Message: -        """Create and send the initial game embed. This will be edited as the game goes on.""" +        """Create and send an embed to display the board."""          image = assemble_board_image(game.board, game.rows, game.columns)          with BytesIO() as image_stream:              image.save(image_stream, format="png") @@ -259,19 +262,27 @@ class DuckGamesDirector(commands.Cog):              file = discord.File(fp=image_stream, filename="board.png")          embed = discord.Embed(              title="Duck Duck Duck Goose!", -            color=Colours.bright_green, +            color=discord.Color.dark_purple(),          )          embed.set_image(url="attachment://board.png")          return await ctx.send(embed=embed, file=file) -    async def display_claimed_answer(self, game: DuckGame, author: discord.Member, answer: tuple[int]) -> None: -        """Add a claimed answer to the game embed.""" +    async def send_found_embed(self, ctx: commands.Context) -> discord.Message: +        """Create and send an embed to display claimed answers. This will be edited as the game goes on.""" +        # Can't be part of the board embed because of discord.py limitations with editing an embed with an image. +        embed = discord.Embed( +            title="Flights Found", +            color=discord.Color.dark_purple(), +        ) +        return await ctx.send(embed=embed) + +    async def append_to_found_embed(self, game: DuckGame, text: str) -> None: +        """Append text to the claimed answers embed."""          async with game.editing_embed: -            # We specifically edit the message contents instead of the embed -            # Because we load in the image from the file, editing any portion of the embed -            # Does weird things to the image and this works around that weirdness -            game.msg_content = f"{game.msg_content}\n{str(answer):12s}  -  {author.display_name}" -            await game.embed_msg.edit(content=game.msg_content) +            found_embed, = game.found_msg.embeds +            old_desc = found_embed.description or "" +            found_embed.description = f"{old_desc.rstrip()}\n{text}" +            await game.found_msg.edit(embed=found_embed)      async def end_game(self, channel: discord.TextChannel, game: DuckGame, end_message: str) -> None:          """Edit the game embed to reflect the end of the game and mark the game as not running.""" @@ -296,8 +307,7 @@ class DuckGamesDirector(commands.Cog):              missed_text = "Flights everyone missed:\n" + "\n".join(f"{ans}" for ans in missed)          else:              missed_text = "All the flights were found!" - -        await game.embed_msg.edit(content=f"{missed_text}") +        await self.append_to_found_embed(game, f"\n{missed_text}")      @start_game.command(name="help")      async def show_rules(self, ctx: commands.Context) -> None: diff --git a/bot/exts/fun/game.py b/bot/exts/fun/game.py index f9c150e6..5f56bef7 100644 --- a/bot/exts/fun/game.py +++ b/bot/exts/fun/game.py @@ -118,6 +118,7 @@ class GameStatus(IntEnum):      Offline = 5      Cancelled = 6      Rumored = 7 +    Delisted = 8  class AgeRatingCategories(IntEnum): @@ -125,6 +126,11 @@ class AgeRatingCategories(IntEnum):      ESRB = 1      PEGI = 2 +    CERO = 3 +    USK = 4 +    GRAC = 5 +    CLASS_IND = 6 +    ACB = 7  class AgeRatings(IntEnum): @@ -142,6 +148,32 @@ class AgeRatings(IntEnum):      T = 10      M = 11      AO = 12 +    CERO_A = 13 +    CERO_B = 14 +    CERO_C = 15 +    CERO_D = 16 +    CERO_Z = 17 +    USK_0 = 18 +    USK_6 = 19 +    USK_12 = 20 +    USK_18 = 21 +    GRAC_ALL = 22 +    GRAC_Twelve = 23 +    GRAC_Fifteen = 24 +    GRAC_Eighteen = 25 +    GRAC_TESTING = 26 +    CLASS_IND_L = 27 +    CLASS_IND_Ten = 28 +    CLASS_IND_Twelve = 29 +    CLASS_IND_Fourteen = 30 +    CLASS_IND_Sixteen = 31 +    CLASS_IND_Eighteen = 32 +    ACB_G = 33 +    ACB_PG = 34 +    ACB_M = 35 +    ACB_MA15 = 36 +    ACB_R18 = 37 +    ACB_RC = 38  class Games(Cog): diff --git a/bot/exts/fun/latex.py b/bot/exts/fun/latex.py new file mode 100644 index 00000000..d43ec8c4 --- /dev/null +++ b/bot/exts/fun/latex.py @@ -0,0 +1,130 @@ +import hashlib +import re +import string +from io import BytesIO +from pathlib import Path +from typing import BinaryIO, Optional + +import discord +from PIL import Image +from discord.ext import commands + +from bot.bot import Bot + +FORMATTED_CODE_REGEX = re.compile( +    r"(?P<delim>(?P<block>```)|``?)"        # code delimiter: 1-3 backticks; (?P=block) only matches if it's a block +    r"(?(block)(?:(?P<lang>[a-z]+)\n)?)"    # if we're in a block, match optional language (only letters plus newline) +    r"(?:[ \t]*\n)*"                        # any blank (empty or tabs/spaces only) lines before the code +    r"(?P<code>.*?)"                        # extract all code inside the markup +    r"\s*"                                  # any more whitespace before the end of the code markup +    r"(?P=delim)",                          # match the exact same delimiter from the start again +    re.DOTALL | re.IGNORECASE,              # "." also matches newlines, case insensitive +) + +LATEX_API_URL = "https://rtex.probablyaweb.site/api/v2" +PASTEBIN_URL = "https://paste.pythondiscord.com" + +THIS_DIR = Path(__file__).parent +CACHE_DIRECTORY = THIS_DIR / "_latex_cache" +CACHE_DIRECTORY.mkdir(exist_ok=True) +TEMPLATE = string.Template(Path("bot/resources/fun/latex_template.txt").read_text()) + +PAD = 10 + + +def _prepare_input(text: str) -> str: +    """Extract latex from a codeblock, if it is in one.""" +    if match := FORMATTED_CODE_REGEX.match(text): +        return match.group("code") +    else: +        return text + + +def _process_image(data: bytes, out_file: BinaryIO) -> None: +    """Read `data` as an image file, and paste it on a white background.""" +    image = Image.open(BytesIO(data)).convert("RGBA") +    width, height = image.size +    background = Image.new("RGBA", (width + 2 * PAD, height + 2 * PAD), "WHITE") + +    # paste the image on the background, using the same image as the mask +    # when an RGBA image is passed as the mask, its alpha band is used. +    # this has the effect of skipping pasting the pixels where the image is transparent. +    background.paste(image, (PAD, PAD), image) +    background.save(out_file) + + +class InvalidLatexError(Exception): +    """Represents an error caused by invalid latex.""" + +    def __init__(self, logs: Optional[str]): +        super().__init__(logs) +        self.logs = logs + + +class Latex(commands.Cog): +    """Renders latex.""" + +    def __init__(self, bot: Bot): +        self.bot = bot + +    async def _generate_image(self, query: str, out_file: BinaryIO) -> None: +        """Make an API request and save the generated image to cache.""" +        payload = {"code": query, "format": "png"} +        async with self.bot.http_session.post(LATEX_API_URL, data=payload, raise_for_status=True) as response: +            response_json = await response.json() +        if response_json["status"] != "success": +            raise InvalidLatexError(logs=response_json.get("log")) +        async with self.bot.http_session.get( +            f"{LATEX_API_URL}/{response_json['filename']}", +            raise_for_status=True +        ) as response: +            _process_image(await response.read(), out_file) + +    async def _upload_to_pastebin(self, text: str) -> Optional[str]: +        """Uploads `text` to the paste service, returning the url if successful.""" +        try: +            async with self.bot.http_session.post( +                PASTEBIN_URL + "/documents", +                data=text, +                raise_for_status=True +            ) as response: +                response_json = await response.json() +            if "key" in response_json: +                return f"{PASTEBIN_URL}/{response_json['key']}.txt?noredirect" +        except Exception: +            # 400 (Bad Request) means there are too many characters +            pass + +    @commands.command() +    @commands.max_concurrency(1, commands.BucketType.guild, wait=True) +    async def latex(self, ctx: commands.Context, *, query: str) -> None: +        """Renders the text in latex and sends the image.""" +        query = _prepare_input(query) + +        # the hash of the query is used as the filename in the cache. +        query_hash = hashlib.md5(query.encode()).hexdigest() +        image_path = CACHE_DIRECTORY / f"{query_hash}.png" +        async with ctx.typing(): +            if not image_path.exists(): +                try: +                    with open(image_path, "wb") as out_file: +                        await self._generate_image(TEMPLATE.substitute(text=query), out_file) +                except InvalidLatexError as err: +                    embed = discord.Embed(title="Failed to render input.") +                    if err.logs is None: +                        embed.description = "No logs available." +                    else: +                        logs_paste_url = await self._upload_to_pastebin(err.logs) +                        if logs_paste_url: +                            embed.description = f"[View Logs]({logs_paste_url})" +                        else: +                            embed.description = "Couldn't upload logs." +                    await ctx.send(embed=embed) +                    image_path.unlink() +                    return +            await ctx.send(file=discord.File(image_path, "latex.png")) + + +def setup(bot: Bot) -> None: +    """Load the Latex Cog.""" +    bot.add_cog(Latex(bot)) diff --git a/bot/exts/fun/madlibs.py b/bot/exts/fun/madlibs.py new file mode 100644 index 00000000..21708e53 --- /dev/null +++ b/bot/exts/fun/madlibs.py @@ -0,0 +1,148 @@ +import json +from asyncio import TimeoutError +from pathlib import Path +from random import choice +from typing import TypedDict + +import discord +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Colours, NEGATIVE_REPLIES + +TIMEOUT = 60.0 + + +class MadlibsTemplate(TypedDict): +    """Structure of a template in the madlibs JSON file.""" + +    title: str +    blanks: list[str] +    value: list[str] + + +class Madlibs(commands.Cog): +    """Cog for the Madlibs game.""" + +    def __init__(self, bot: Bot): +        self.bot = bot +        self.templates = self._load_templates() +        self.edited_content = {} +        self.checks = set() + +    @staticmethod +    def _load_templates() -> list[MadlibsTemplate]: +        madlibs_stories = Path("bot/resources/fun/madlibs_templates.json") + +        with open(madlibs_stories) as file: +            return json.load(file) + +    @staticmethod +    def madlibs_embed(part_of_speech: str, number_of_inputs: int) -> discord.Embed: +        """Method to generate an embed with the game information.""" +        madlibs_embed = discord.Embed(title="Madlibs", color=Colours.python_blue) + +        madlibs_embed.add_field( +            name="Enter a word that fits the given part of speech!", +            value=f"Part of speech: {part_of_speech}\n\nMake sure not to spam, or you may get auto-muted!" +        ) + +        madlibs_embed.set_footer(text=f"Inputs remaining: {number_of_inputs}") + +        return madlibs_embed + +    @commands.Cog.listener() +    async def on_message_edit(self, _: discord.Message, after: discord.Message) -> None: +        """A listener that checks for message edits from the user.""" +        for check in self.checks: +            if check(after): +                break +        else: +            return + +        self.edited_content[after.id] = after.content + +    @commands.command() +    @commands.max_concurrency(1, per=commands.BucketType.user) +    async def madlibs(self, ctx: commands.Context) -> None: +        """ +        Play Madlibs with the bot! + +        Madlibs is a game where the player is asked to enter a word that +        fits a random part of speech (e.g. noun, adjective, verb, plural noun, etc.) +        a random amount of times, depending on the story chosen by the bot at the beginning. +        """ +        random_template = choice(self.templates) + +        def author_check(message: discord.Message) -> bool: +            return message.channel.id == ctx.channel.id and message.author.id == ctx.author.id + +        self.checks.add(author_check) + +        loading_embed = discord.Embed( +            title="Madlibs", description="Loading your Madlibs game...", color=Colours.python_blue +        ) +        original_message = await ctx.send(embed=loading_embed) + +        submitted_words = {} + +        for i, part_of_speech in enumerate(random_template["blanks"]): +            inputs_left = len(random_template["blanks"]) - i + +            madlibs_embed = self.madlibs_embed(part_of_speech, inputs_left) +            await original_message.edit(embed=madlibs_embed) + +            try: +                message = await self.bot.wait_for(event="message", check=author_check, timeout=TIMEOUT) +            except TimeoutError: +                timeout_embed = discord.Embed( +                    title=choice(NEGATIVE_REPLIES), +                    description="Uh oh! You took too long to respond!", +                    color=Colours.soft_red +                ) + +                await ctx.send(ctx.author.mention, embed=timeout_embed) + +                for msg_id in submitted_words: +                    self.edited_content.pop(msg_id, submitted_words[msg_id]) + +                self.checks.remove(author_check) + +                return + +            submitted_words[message.id] = message.content + +        blanks = [self.edited_content.pop(msg_id, submitted_words[msg_id]) for msg_id in submitted_words] + +        self.checks.remove(author_check) + +        story = [] +        for value, blank in zip(random_template["value"], blanks): +            story.append(f"{value}__{blank}__") + +        # In each story template, there is always one more "value" +        # (fragment from the story) than there are blanks (words that the player enters) +        # so we need to compensate by appending the last line of the story again. +        story.append(random_template["value"][-1]) + +        story_embed = discord.Embed( +            title=random_template["title"], +            description="".join(story), +            color=Colours.bright_green +        ) + +        story_embed.set_footer(text=f"Generated for {ctx.author}", icon_url=ctx.author.display_avatar.url) + +        await ctx.send(embed=story_embed) + +    @madlibs.error +    async def handle_madlibs_error(self, ctx: commands.Context, error: commands.CommandError) -> None: +        """Error handler for the Madlibs command.""" +        if isinstance(error, commands.MaxConcurrencyReached): +            await ctx.send("You are already playing Madlibs!") +            error.handled = True + + +def setup(bot: Bot) -> None: +    """Load the Madlibs cog.""" +    bot.add_cog(Madlibs(bot)) diff --git a/bot/exts/fun/quack.py b/bot/exts/fun/quack.py new file mode 100644 index 00000000..0c228aed --- /dev/null +++ b/bot/exts/fun/quack.py @@ -0,0 +1,75 @@ +import logging +import random +from typing import Literal, Optional + +import discord +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Colours, NEGATIVE_REPLIES + +API_URL = 'https://quackstack.pythondiscord.com' + +log = logging.getLogger(__name__) + + +class Quackstack(commands.Cog): +    """Cog used for wrapping Quackstack.""" + +    def __init__(self, bot: Bot): +        self.bot = bot + +    @commands.command() +    async def quack( +        self, +        ctx: commands.Context, +        ducktype: Literal["duck", "manduck"] = "duck", +        *, +        seed: Optional[str] = None +    ) -> None: +        """ +        Use the Quackstack API to generate a random duck. + +        If a seed is provided, a duck is generated based on the given seed. +        Either "duck" or "manduck" can be provided to change the duck type generated. +        """ +        ducktype = ducktype.lower() +        quackstack_url = f"{API_URL}/{ducktype}" +        params = {} +        if seed is not None: +            try: +                seed = int(seed) +            except ValueError: +                # We just need to turn the string into an integer any way possible +                seed = int.from_bytes(seed.encode(), "big") +            params["seed"] = seed + +        async with self.bot.http_session.get(quackstack_url, params=params) as response: +            error_embed = discord.Embed( +                title=random.choice(NEGATIVE_REPLIES), +                description="The request failed. Please try again later.", +                color=Colours.soft_red, +            ) +            if response.status != 200: +                log.error(f"Response to Quackstack returned code {response.status}") +                await ctx.send(embed=error_embed) +                return + +            data = await response.json() +            file = data["file"] + +        embed = discord.Embed( +            title=f"Quack! Here's a {ducktype} for you.", +            description=f"A {ducktype} from Quackstack.", +            color=Colours.grass_green, +            url=f"{API_URL}/docs" +        ) + +        embed.set_image(url=API_URL + file) + +        await ctx.send(embed=embed) + + +def setup(bot: Bot) -> None: +    """Loads the Quack cog.""" +    bot.add_cog(Quackstack(bot)) diff --git a/bot/exts/fun/snakes/_utils.py b/bot/exts/fun/snakes/_utils.py index de51339d..182fa9d9 100644 --- a/bot/exts/fun/snakes/_utils.py +++ b/bot/exts/fun/snakes/_utils.py @@ -6,13 +6,14 @@ import math  import random  from itertools import product  from pathlib import Path +from typing import Union  from PIL import Image  from PIL.ImageDraw import ImageDraw -from discord import File, Member, Reaction +from discord import File, Member, Reaction, User  from discord.ext.commands import Cog, Context -from bot.constants import Roles +from bot.constants import MODERATION_ROLES  SNAKE_RESOURCES = Path("bot/resources/fun/snakes").absolute() @@ -395,7 +396,7 @@ class SnakeAndLaddersGame:          Listen for reactions until players have joined, and the game has been started.          """ -        def startup_event_check(reaction_: Reaction, user_: Member) -> bool: +        def startup_event_check(reaction_: Reaction, user_: Union[User, Member]) -> bool:              """Make sure that this reaction is what we want to operate on."""              return (                  all(( @@ -460,7 +461,7 @@ class SnakeAndLaddersGame:                  await self.cancel_game()                  return  # We're done, no reactions for the last 5 minutes -    async def _add_player(self, user: Member) -> None: +    async def _add_player(self, user: Union[User, Member]) -> None:          """Add player to game."""          self.players.append(user)          self.player_tiles[user.id] = 1 @@ -469,7 +470,7 @@ class SnakeAndLaddersGame:          im = Image.open(io.BytesIO(avatar_bytes)).resize((BOARD_PLAYER_SIZE, BOARD_PLAYER_SIZE))          self.avatar_images[user.id] = im -    async def player_join(self, user: Member) -> None: +    async def player_join(self, user: Union[User, Member]) -> None:          """          Handle players joining the game. @@ -495,7 +496,7 @@ class SnakeAndLaddersGame:              delete_after=10          ) -    async def player_leave(self, user: Member) -> bool: +    async def player_leave(self, user: Union[User, Member]) -> bool:          """          Handle players leaving the game. @@ -530,7 +531,7 @@ class SnakeAndLaddersGame:          await self.channel.send("**Snakes and Ladders**: Game has been canceled.")          self._destruct() -    async def start_game(self, user: Member) -> None: +    async def start_game(self, user: Union[User, Member]) -> None:          """          Allow the game author to begin the game. @@ -551,7 +552,7 @@ class SnakeAndLaddersGame:      async def start_round(self) -> None:          """Begin the round.""" -        def game_event_check(reaction_: Reaction, user_: Member) -> bool: +        def game_event_check(reaction_: Reaction, user_: Union[User, Member]) -> bool:              """Make sure that this reaction is what we want to operate on."""              return (                  all(( @@ -644,7 +645,7 @@ class SnakeAndLaddersGame:          if not is_surrendered:              await self._complete_round() -    async def player_roll(self, user: Member) -> None: +    async def player_roll(self, user: Union[User, Member]) -> None:          """Handle the player's roll."""          if user.id not in self.player_tiles:              await self.channel.send(user.mention + " You are not in the match.", delete_after=10) @@ -691,7 +692,7 @@ class SnakeAndLaddersGame:          await self.channel.send("**Snakes and Ladders**: " + winner.mention + " has won the game! :tada:")          self._destruct() -    def _check_winner(self) -> Member: +    def _check_winner(self) -> Union[User, Member]:          """Return a winning member if we're in the post-round state and there's a winner."""          if self.state != "post_round":              return None @@ -716,6 +717,6 @@ class SnakeAndLaddersGame:          return x_level, y_level      @staticmethod -    def _is_moderator(user: Member) -> bool: +    def _is_moderator(user: Union[User, Member]) -> bool:          """Return True if the user is a Moderator.""" -        return any(Roles.moderator == role.id for role in user.roles) +        return any(role.id in MODERATION_ROLES for role in getattr(user, 'roles', [])) diff --git a/bot/exts/fun/tic_tac_toe.py b/bot/exts/fun/tic_tac_toe.py index 5c4f8051..5dd38a81 100644 --- a/bot/exts/fun/tic_tac_toe.py +++ b/bot/exts/fun/tic_tac_toe.py @@ -3,7 +3,7 @@ import random  from typing import Callable, Optional, Union  import discord -from discord.ext.commands import Cog, Context, check, group, guild_only +from discord.ext.commands import Cog, Context, check, group  from bot.bot import Bot  from bot.constants import Emojis @@ -72,10 +72,12 @@ class Player:  class AI:      """Tic Tac Toe AI class for against computer gaming.""" -    def __init__(self, symbol: str): +    def __init__(self, bot_user: discord.Member, symbol: str): +        self.user = bot_user          self.symbol = symbol -    async def get_move(self, board: dict[int, str], _: discord.Message) -> tuple[bool, int]: +    @staticmethod +    async def get_move(board: dict[int, str], _: discord.Message) -> tuple[bool, int]:          """Get move from AI. AI use Minimax strategy."""          possible_moves = [i for i, emoji in board.items() if emoji in list(Emojis.number_emojis.values())] @@ -97,8 +99,8 @@ class AI:          return False, random.choice(open_edges)      def __str__(self) -> str: -        """Return `AI` as user name.""" -        return "AI" +        """Return mention of @Sir Lancebot.""" +        return self.user.mention  class Game: @@ -107,6 +109,7 @@ class Game:      def __init__(self, players: list[Union[Player, AI]], ctx: Context):          self.players = players          self.ctx = ctx +        self.channel = ctx.channel          self.board = {              1: Emojis.number_emojis[1],              2: Emojis.number_emojis[2], @@ -173,7 +176,8 @@ class Game:              self.canceled = True              return False, "User declined" -    async def add_reactions(self, msg: discord.Message) -> None: +    @staticmethod +    async def add_reactions(msg: discord.Message) -> None:          """Add number emojis to message."""          for nr in Emojis.number_emojis.values():              await msg.add_reaction(nr) @@ -249,7 +253,6 @@ class TicTacToe(Cog):      def __init__(self):          self.games: list[Game] = [] -    @guild_only()      @is_channel_free()      @is_requester_free()      @group(name="tictactoe", aliases=("ttt", "tic"), invoke_without_command=True) @@ -265,7 +268,7 @@ class TicTacToe(Cog):              return          if opponent is None:              game = Game( -                [Player(ctx.author, ctx, Emojis.x_square), AI(Emojis.o_square)], +                [Player(ctx.author, ctx, Emojis.x_square), AI(ctx.me, Emojis.o_square)],                  ctx              )          else: diff --git a/bot/exts/fun/trivia_quiz.py b/bot/exts/fun/trivia_quiz.py index 236586b0..4a1cec5b 100644 --- a/bot/exts/fun/trivia_quiz.py +++ b/bot/exts/fun/trivia_quiz.py @@ -16,7 +16,7 @@ from discord.ext import commands, tasks  from rapidfuzz import fuzz  from bot.bot import Bot -from bot.constants import Colours, NEGATIVE_REPLIES, Roles +from bot.constants import Client, Colours, MODERATION_ROLES, NEGATIVE_REPLIES  logger = logging.getLogger(__name__) @@ -332,7 +332,7 @@ class TriviaQuiz(commands.Cog):          if self.game_status[ctx.channel.id]:              await ctx.send(                  "Game is already running... " -                f"do `{self.bot.command_prefix}quiz stop`" +                f"do `{Client.prefix}quiz stop`"              )              return @@ -550,7 +550,7 @@ class TriviaQuiz(commands.Cog):              if self.game_status[ctx.channel.id]:                  # Check if the author is the game starter or a moderator.                  if ctx.author == self.game_owners[ctx.channel.id] or any( -                    Roles.moderator == role.id for role in ctx.author.roles +                    role.id in MODERATION_ROLES for role in getattr(ctx.author, 'roles', [])                  ):                      self.game_status[ctx.channel.id] = False                      del self.game_owners[ctx.channel.id] diff --git a/bot/exts/holidays/easter/earth_photos.py b/bot/exts/holidays/easter/earth_photos.py index f65790af..27442f1c 100644 --- a/bot/exts/holidays/easter/earth_photos.py +++ b/bot/exts/holidays/easter/earth_photos.py @@ -4,8 +4,7 @@ import discord  from discord.ext import commands  from bot.bot import Bot -from bot.constants import Colours -from bot.constants import Tokens +from bot.constants import Colours, Tokens  log = logging.getLogger(__name__) diff --git a/bot/exts/holidays/easter/egg_facts.py b/bot/exts/holidays/easter/egg_facts.py index 5f216e0d..152af6a4 100644 --- a/bot/exts/holidays/easter/egg_facts.py +++ b/bot/exts/holidays/easter/egg_facts.py @@ -31,7 +31,7 @@ class EasterFacts(commands.Cog):          """A background task that sends an easter egg fact in the event channel everyday."""          await self.bot.wait_until_guild_available() -        channel = self.bot.get_channel(Channels.community_bot_commands) +        channel = self.bot.get_channel(Channels.sir_lancebot_playground)          await channel.send(embed=self.make_embed())      @commands.command(name="eggfact", aliases=("fact",)) diff --git a/bot/exts/holidays/halloween/candy_collection.py b/bot/exts/holidays/halloween/candy_collection.py index 4afd5913..220ba8e5 100644 --- a/bot/exts/holidays/halloween/candy_collection.py +++ b/bot/exts/holidays/halloween/candy_collection.py @@ -55,7 +55,7 @@ class CandyCollection(commands.Cog):          if message.author.bot:              return          # ensure it's hacktober channel -        if message.channel.id != Channels.community_bot_commands: +        if message.channel.id != Channels.sir_lancebot_playground:              return          # do random check for skull first as it has the lower chance @@ -77,12 +77,17 @@ class CandyCollection(commands.Cog):              return          # check to ensure it is in correct channel -        if message.channel.id != Channels.community_bot_commands: +        if message.channel.id != Channels.sir_lancebot_playground:              return          # if its not a candy or skull, and it is one of 10 most recent messages,          # proceed to add a skull/candy with higher chance          if str(reaction.emoji) not in (EMOJIS["SKULL"], EMOJIS["CANDY"]): +            # Ensure the reaction is not for a bot's message so users can't spam +            # reaction buttons like in .help to get candies. +            if message.author.bot: +                return +              recent_message_ids = map(                  lambda m: m.id,                  await self.hacktober_channel.history(limit=10).flatten() @@ -134,7 +139,7 @@ class CandyCollection(commands.Cog):      @property      def hacktober_channel(self) -> discord.TextChannel:          """Get #hacktoberbot channel from its ID.""" -        return self.bot.get_channel(id=Channels.community_bot_commands) +        return self.bot.get_channel(Channels.sir_lancebot_playground)      @staticmethod      async def send_spook_msg( @@ -182,13 +187,24 @@ class CandyCollection(commands.Cog):                  for index, record in enumerate(top_five)              ) if top_five else "No Candies" -        e = discord.Embed(colour=discord.Colour.blurple()) +        def get_user_candy_score() -> str: +            for user_id, score in records: +                if user_id == ctx.author.id: +                    return f"{ctx.author.mention}: {score}" +            return f"{ctx.author.mention}: 0" + +        e = discord.Embed(colour=discord.Colour.og_blurple())          e.add_field(              name="Top Candy Records",              value=generate_leaderboard(),              inline=False          )          e.add_field( +            name="Your Candy Score", +            value=get_user_candy_score(), +            inline=False +        ) +        e.add_field(              name="\u200b",              value="Candies will randomly appear on messages sent. "                    "\nHit the candy when it appears as fast as possible to get the candy! " diff --git a/bot/exts/holidays/halloween/scarymovie.py b/bot/exts/holidays/halloween/scarymovie.py index 33659fd8..89310b97 100644 --- a/bot/exts/holidays/halloween/scarymovie.py +++ b/bot/exts/holidays/halloween/scarymovie.py @@ -6,6 +6,7 @@ from discord.ext import commands  from bot.bot import Bot  from bot.constants import Tokens +  log = logging.getLogger(__name__) diff --git a/bot/exts/holidays/halloween/spookynamerate.py b/bot/exts/holidays/halloween/spookynamerate.py index 2e59d4a8..02fb71c3 100644 --- a/bot/exts/holidays/halloween/spookynamerate.py +++ b/bot/exts/holidays/halloween/spookynamerate.py @@ -143,7 +143,7 @@ class SpookyNameRate(Cog):              if data["author"] == ctx.author.id:                  await ctx.send(                      "But you have already added an entry! Type " -                    f"`{self.bot.command_prefix}spookynamerate " +                    f"`{Client.prefix}spookynamerate "                      "delete` to delete it, and then you can add it again"                  )                  return @@ -185,7 +185,7 @@ class SpookyNameRate(Cog):                  return          await ctx.send( -            f"But you don't have an entry... :eyes: Type `{self.bot.command_prefix}spookynamerate add your entry`" +            f"But you don't have an entry... :eyes: Type `{Client.prefix}spookynamerate add your entry`"          )      @Cog.listener() @@ -223,9 +223,9 @@ class SpookyNameRate(Cog):          if self.first_time:              await channel.send(                  "Okkey... Welcome to the **Spooky Name Rate Game**! It's a relatively simple game.\n" -                f"Everyday, a random name will be sent in <#{Channels.community_bot_commands}> " +                f"Everyday, a random name will be sent in <#{Channels.sir_lancebot_playground}> "                  "and you need to try and spookify it!\nRegister your name using " -                f"`{self.bot.command_prefix}spookynamerate add spookified name`" +                f"`{Client.prefix}spookynamerate add spookified name`"              )              await self.data.set("first_time", False) @@ -359,10 +359,10 @@ class SpookyNameRate(Cog):          """Gets the sir-lancebot-channel after waiting until ready."""          await self.bot.wait_until_ready()          channel = self.bot.get_channel( -            Channels.community_bot_commands -        ) or await self.bot.fetch_channel(Channels.community_bot_commands) +            Channels.sir_lancebot_playground +        ) or await self.bot.fetch_channel(Channels.sir_lancebot_playground)          if not channel: -            logger.warning("Bot is unable to get the #seasonalbot-commands channel. Please check the channel ID.") +            logger.warning("Bot is unable to get the #sir-lancebot-playground channel. Please check the channel ID.")          return channel      @staticmethod diff --git a/bot/exts/holidays/halloween/spookyreact.py b/bot/exts/holidays/halloween/spookyreact.py index 25e783f4..e228b91d 100644 --- a/bot/exts/holidays/halloween/spookyreact.py +++ b/bot/exts/holidays/halloween/spookyreact.py @@ -47,12 +47,12 @@ class SpookyReact(Cog):          Short-circuit helper check.          Return True if: -          * author is the bot +          * author is a bot            * prefix is not None          """ -        # Check for self reaction -        if message.author == self.bot.user: -            log.debug(f"Ignoring reactions on self message. Message ID: {message.id}") +        # Check if message author is a bot +        if message.author.bot: +            log.debug(f"Ignoring reactions on bot message. Message ID: {message.id}")              return True          # Check for command invocation diff --git a/bot/exts/holidays/hanukkah/hanukkah_embed.py b/bot/exts/holidays/hanukkah/hanukkah_embed.py index ac3eab7b..5767f91e 100644 --- a/bot/exts/holidays/hanukkah/hanukkah_embed.py +++ b/bot/exts/holidays/hanukkah/hanukkah_embed.py @@ -21,45 +21,41 @@ class HanukkahEmbed(commands.Cog):      def __init__(self, bot: Bot):          self.bot = bot -        self.hanukkah_days = [] -        self.hanukkah_months = [] -        self.hanukkah_years = [] +        self.hanukkah_dates: list[datetime.date] = [] -    async def get_hanukkah_dates(self) -> list[str]: +    def _parse_time_to_datetime(self, date: list[str]) -> datetime.datetime: +        """Format the times provided by the api to datetime forms.""" +        try: +            return datetime.datetime.strptime(date, "%Y-%m-%dT%H:%M:%S%z") +        except ValueError: +            # there is a possibility of an event not having a time, just a day +            # to catch this, we try again without time information +            return datetime.datetime.strptime(date, "%Y-%m-%d") + +    async def fetch_hanukkah_dates(self) -> list[datetime.date]:          """Gets the dates for hanukkah festival.""" -        hanukkah_dates = [] +        # clear the datetime objects to prevent a memory link +        self.hanukkah_dates = []          async with self.bot.http_session.get(HEBCAL_URL) as response:              json_data = await response.json()          festivals = json_data["items"]          for festival in festivals:              if festival["title"].startswith("Chanukah"):                  date = festival["date"] -                hanukkah_dates.append(date) -        return hanukkah_dates +                self.hanukkah_dates.append(self._parse_time_to_datetime(date).date()) +        return self.hanukkah_dates      @in_month(Month.NOVEMBER, Month.DECEMBER)      @commands.command(name="hanukkah", aliases=("chanukah",))      async def hanukkah_festival(self, ctx: commands.Context) -> None:          """Tells you about the Hanukkah Festivaltime of festival, festival day, etc).""" -        hanukkah_dates = await self.get_hanukkah_dates() -        self.hanukkah_dates_split(hanukkah_dates) -        hanukkah_start_day = int(self.hanukkah_days[0]) -        hanukkah_start_month = int(self.hanukkah_months[0]) -        hanukkah_start_year = int(self.hanukkah_years[0]) -        hanukkah_end_day = int(self.hanukkah_days[8]) -        hanukkah_end_month = int(self.hanukkah_months[8]) -        hanukkah_end_year = int(self.hanukkah_years[8]) - -        hanukkah_start = datetime.date(hanukkah_start_year, hanukkah_start_month, hanukkah_start_day) -        hanukkah_end = datetime.date(hanukkah_end_year, hanukkah_end_month, hanukkah_end_day) +        hanukkah_dates = await self.fetch_hanukkah_dates() +        start_day = hanukkah_dates[0] +        end_day = hanukkah_dates[-1]          today = datetime.date.today() -        # today = datetime.date(2019, 12, 24) (for testing) -        day = str(today.day) -        month = str(today.month) -        year = str(today.year)          embed = Embed(title="Hanukkah", colour=Colours.blue) -        if day in self.hanukkah_days and month in self.hanukkah_months and year in self.hanukkah_years: -            if int(day) == hanukkah_start_day: +        if start_day <= today <= end_day: +            if start_day == today:                  now = datetime.datetime.utcnow()                  hours = now.hour + 4  # using only hours                  hanukkah_start_hour = 18 @@ -77,35 +73,27 @@ class HanukkahEmbed(commands.Cog):                      )                      await ctx.send(embed=embed)                      return -            festival_day = self.hanukkah_days.index(day) +            festival_day = hanukkah_dates.index(today)              number_suffixes = ["st", "nd", "rd", "th"]              suffix = number_suffixes[festival_day - 1 if festival_day <= 3 else 3]              message = ":menorah:" * festival_day -            embed.description = f"It is the {festival_day}{suffix} day of Hanukkah!\n{message}" -            await ctx.send(embed=embed) +            embed.description = ( +                f"It is the {festival_day}{suffix} day of Hanukkah!\n{message}" +            ) +        elif today < start_day: +            format_start = start_day.strftime("%d of %B") +            embed.description = ( +                "Hanukkah has not started yet. " +                f"Hanukkah will start at sundown on {format_start}." +            )          else: -            if today < hanukkah_start: -                festival_starting_month = hanukkah_start.strftime("%B") -                embed.description = ( -                    f"Hanukkah has not started yet. " -                    f"Hanukkah will start at sundown on {hanukkah_start_day}th " -                    f"of {festival_starting_month}." -                ) -            else: -                festival_end_month = hanukkah_end.strftime("%B") -                embed.description = ( -                    f"Looks like you missed Hanukkah!" -                    f"Hanukkah ended on {hanukkah_end_day}th of {festival_end_month}." -                ) - -            await ctx.send(embed=embed) +            format_end = end_day.strftime("%d of %B") +            embed.description = ( +                "Looks like you missed Hanukkah! " +                f"Hanukkah ended on {format_end}." +            ) -    def hanukkah_dates_split(self, hanukkah_dates: list[str]) -> None: -        """We are splitting the dates for hanukkah into days, months and years.""" -        for date in hanukkah_dates: -            self.hanukkah_days.append(date[8:10]) -            self.hanukkah_months.append(date[5:7]) -            self.hanukkah_years.append(date[0:4]) +        await ctx.send(embed=embed)  def setup(bot: Bot) -> None: diff --git a/bot/exts/holidays/pride/pride_facts.py b/bot/exts/holidays/pride/pride_facts.py index e6ef7108..340f0b43 100644 --- a/bot/exts/holidays/pride/pride_facts.py +++ b/bot/exts/holidays/pride/pride_facts.py @@ -30,7 +30,7 @@ class PrideFacts(commands.Cog):          """Background task to post the daily pride fact every day."""          await self.bot.wait_until_guild_available() -        channel = self.bot.get_channel(Channels.community_bot_commands) +        channel = self.bot.get_channel(Channels.sir_lancebot_playground)          await self.send_select_fact(channel, datetime.utcnow())      async def send_random_fact(self, ctx: commands.Context) -> None: diff --git a/bot/exts/holidays/pride/pride_leader.py b/bot/exts/holidays/pride/pride_leader.py index 298c9328..adf01134 100644 --- a/bot/exts/holidays/pride/pride_leader.py +++ b/bot/exts/holidays/pride/pride_leader.py @@ -83,7 +83,7 @@ class PrideLeader(commands.Cog):          embed.add_field(              name="For More Information",              value=f"Do `{constants.Client.prefix}wiki {name}`" -                  f" in <#{constants.Channels.community_bot_commands}>", +                  f" in <#{constants.Channels.sir_lancebot_playground}>",              inline=False          )          embed.set_thumbnail(url=pride_leader["url"]) diff --git a/bot/exts/holidays/valentines/be_my_valentine.py b/bot/exts/holidays/valentines/be_my_valentine.py index 4d454c3a..cbb95157 100644 --- a/bot/exts/holidays/valentines/be_my_valentine.py +++ b/bot/exts/holidays/valentines/be_my_valentine.py @@ -7,14 +7,16 @@ import discord  from discord.ext import commands  from bot.bot import Bot -from bot.constants import Channels, Colours, Lovefest, Month +from bot.constants import Channels, Colours, Lovefest, Month, PYTHON_PREFIX  from bot.utils.decorators import in_month -from bot.utils.extensions import invoke_help_command +from bot.utils.exceptions import MovedCommandError  log = logging.getLogger(__name__)  HEART_EMOJIS = [":heart:", ":gift_heart:", ":revolving_hearts:", ":sparkling_heart:", ":two_hearts:"] +MOVED_COMMAND = f"{PYTHON_PREFIX}subscribe" +  class BeMyValentine(commands.Cog):      """A cog that sends Valentines to other users!""" @@ -30,40 +32,14 @@ class BeMyValentine(commands.Cog):          return loads(p.read_text("utf8"))      @in_month(Month.FEBRUARY) -    @commands.group(name="lovefest") +    @commands.command(name="lovefest", help=f"NOTE: This command has been moved to {MOVED_COMMAND}")      async def lovefest_role(self, ctx: commands.Context) -> None:          """ -        Subscribe or unsubscribe from the lovefest role. - -        The lovefest role makes you eligible to receive anonymous valentines from other users. +        Deprecated lovefest role command. -        1) use the command \".lovefest sub\" to get the lovefest role. -        2) use the command \".lovefest unsub\" to get rid of the lovefest role. +        This command has been moved to bot, and will be removed in the future.          """ -        if not ctx.invoked_subcommand: -            await invoke_help_command(ctx) - -    @lovefest_role.command(name="sub") -    async def add_role(self, ctx: commands.Context) -> None: -        """Adds the lovefest role.""" -        user = ctx.author -        role = ctx.guild.get_role(Lovefest.role_id) -        if role not in ctx.author.roles: -            await user.add_roles(role) -            await ctx.send("The Lovefest role has been added !") -        else: -            await ctx.send("You already have the role !") - -    @lovefest_role.command(name="unsub") -    async def remove_role(self, ctx: commands.Context) -> None: -        """Removes the lovefest role.""" -        user = ctx.author -        role = ctx.guild.get_role(Lovefest.role_id) -        if role not in ctx.author.roles: -            await ctx.send("You dont have the lovefest role.") -        else: -            await user.remove_roles(role) -            await ctx.send("The lovefest role has been successfully removed!") +        raise MovedCommandError(MOVED_COMMAND)      @commands.cooldown(1, 1800, commands.BucketType.user)      @commands.group(name="bemyvalentine", invoke_without_command=True) @@ -94,7 +70,7 @@ class BeMyValentine(commands.Cog):              raise commands.UserInputError("Come on, you can't send a valentine to yourself :expressionless:")          emoji_1, emoji_2 = self.random_emoji() -        channel = self.bot.get_channel(Channels.community_bot_commands) +        channel = self.bot.get_channel(Channels.sir_lancebot_playground)          valentine, title = self.valentine_check(valentine_type)          embed = discord.Embed( diff --git a/bot/exts/holidays/valentines/lovecalculator.py b/bot/exts/holidays/valentines/lovecalculator.py index 3999db2b..10dea9df 100644 --- a/bot/exts/holidays/valentines/lovecalculator.py +++ b/bot/exts/holidays/valentines/lovecalculator.py @@ -12,7 +12,7 @@ from discord.ext import commands  from discord.ext.commands import BadArgument, Cog, clean_content  from bot.bot import Bot -from bot.constants import Channels, Client, Lovefest, Month +from bot.constants import Channels, Lovefest, Month, PYTHON_PREFIX  from bot.utils.decorators import in_month  log = logging.getLogger(__name__) @@ -32,7 +32,7 @@ class LoveCalculator(Cog):          Tells you how much the two love each other.          This command requires at least one member as input, if two are given love will be calculated between -        those two users, if only one is given, the second member is asusmed to be the invoker. +        those two users, if only one is given, the second member is assumed to be the invoker.          Members are converted from:            - User ID            - Mention @@ -51,7 +51,7 @@ class LoveCalculator(Cog):              raise BadArgument(                  "This command can only be ran against members with the lovefest role! "                  "This role be can assigned by running " -                f"`{Client.prefix}lovefest sub` in <#{Channels.community_bot_commands}>." +                f"`{PYTHON_PREFIX}subscribe` in <#{Channels.bot_commands}>."              )          if whom is None: @@ -74,7 +74,8 @@ class LoveCalculator(Cog):          # We need the -1 due to how bisect returns the point          # see the documentation for further detail          # https://docs.python.org/3/library/bisect.html#bisect.bisect -        index = bisect.bisect(LOVE_DATA, (love_percent,)) - 1 +        love_threshold = [threshold for threshold, _ in LOVE_DATA] +        index = bisect.bisect(love_threshold, love_percent) - 1          # We already have the nearest "fit" love level          # We only need the dict, so we can ditch the first element          _, data = LOVE_DATA[index] @@ -89,7 +90,7 @@ class LoveCalculator(Cog):              name="A letter from Dr. Love:",              value=data["text"]          ) -        embed.set_footer(text=f"You can unsubscribe from lovefest by using {Client.prefix}lovefest unsub") +        embed.set_footer(text=f"You can unsubscribe from lovefest by using {PYTHON_PREFIX}subscribe.")          await ctx.send(embed=embed) diff --git a/bot/exts/utilities/bookmark.py b/bot/exts/utilities/bookmark.py index a91ef1c0..b50205a0 100644 --- a/bot/exts/utilities/bookmark.py +++ b/bot/exts/utilities/bookmark.py @@ -7,7 +7,7 @@ import discord  from discord.ext import commands  from bot.bot import Bot -from bot.constants import Categories, Colours, ERROR_REPLIES, Icons, WHITELISTED_CHANNELS +from bot.constants import Colours, ERROR_REPLIES, Icons, Roles  from bot.utils.converters import WrappedMessageConverter  from bot.utils.decorators import whitelist_override @@ -16,7 +16,6 @@ log = logging.getLogger(__name__)  # Number of seconds to wait for other users to bookmark the same message  TIMEOUT = 120  BOOKMARK_EMOJI = "π" -WHITELISTED_CATEGORIES = (Categories.help_in_use,)  class Bookmark(commands.Cog): @@ -87,8 +86,8 @@ class Bookmark(commands.Cog):          await message.add_reaction(BOOKMARK_EMOJI)          return message -    @whitelist_override(channels=WHITELISTED_CHANNELS, categories=WHITELISTED_CATEGORIES)      @commands.command(name="bookmark", aliases=("bm", "pin")) +    @whitelist_override(roles=(Roles.everyone,))      async def bookmark(          self,          ctx: commands.Context, @@ -99,7 +98,13 @@ class Bookmark(commands.Cog):          """Send the author a link to `target_message` via DMs."""          if not target_message:              if not ctx.message.reference: -                raise commands.UserInputError("You must either provide a valid message to bookmark, or reply to one.") +                raise commands.UserInputError( +                    "You must either provide a valid message to bookmark, or reply to one." +                    "\n\nThe lookup strategy for a message is as follows (in order):" +                    "\n1. Lookup by '{channel ID}-{message ID}' (retrieved by shift-clicking on 'Copy ID')" +                    "\n2. Lookup by message ID (the message **must** be in the context channel)" +                    "\n3. Lookup by message URL" +                )              target_message = ctx.message.reference.resolved          # Prevent users from bookmarking a message in a channel they don't have access to diff --git a/bot/exts/utilities/challenges.py b/bot/exts/utilities/challenges.py new file mode 100644 index 00000000..ab7ae442 --- /dev/null +++ b/bot/exts/utilities/challenges.py @@ -0,0 +1,341 @@ +import logging +from asyncio import to_thread +from random import choice +from typing import Union + +from bs4 import BeautifulSoup +from discord import Embed, Interaction, SelectOption, ui +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Colours, Emojis, NEGATIVE_REPLIES + +log = logging.getLogger(__name__) +API_ROOT = "https://www.codewars.com/api/v1/code-challenges/{kata_id}" + +# Map difficulty for the kata to color we want to display in the embed. +# These colors are representative of the colors that each kyu's level represents on codewars.com +MAPPING_OF_KYU = { +    8: 0xdddbda, 7: 0xdddbda, 6: 0xecb613, 5: 0xecb613, +    4: 0x3c7ebb, 3: 0x3c7ebb, 2: 0x866cc7, 1: 0x866cc7 +} + +# Supported languages for a kata on codewars.com +SUPPORTED_LANGUAGES = { +    "stable": [ +        "c", "c#", "c++", "clojure", "coffeescript", "coq", "crystal", "dart", "elixir", +        "f#", "go", "groovy", "haskell", "java", "javascript", "kotlin", "lean", "lua", "nasm", +        "php", "python", "racket", "ruby", "rust", "scala", "shell", "sql", "swift", "typescript" +    ], +    "beta": [ +        "agda", "bf", "cfml", "cobol", "commonlisp", "elm", "erlang", "factor", +        "forth", "fortran", "haxe", "idris", "julia", "nim", "objective-c", "ocaml", +        "pascal", "perl", "powershell", "prolog", "purescript", "r", "raku", "reason", "solidity", "vb.net" +    ] +} + + +class InformationDropdown(ui.Select): +    """A dropdown inheriting from ui.Select that allows finding out other information about the kata.""" + +    def __init__(self, language_embed: Embed, tags_embed: Embed, other_info_embed: Embed, main_embed: Embed): +        options = [ +            SelectOption( +                label="Main Information", +                description="See the kata's difficulty, description, etc.", +                emoji="π" +            ), +            SelectOption( +                label="Languages", +                description="See what languages this kata supports!", +                emoji=Emojis.reddit_post_text +            ), +            SelectOption( +                label="Tags", +                description="See what categories this kata falls under!", +                emoji=Emojis.stackoverflow_tag +            ), +            SelectOption( +                label="Other Information", +                description="See how other people performed on this kata and more!", +                emoji="βΉ" +            ) +        ] + +        # We map the option label to the embed instance so that it can be easily looked up later in O(1) +        self.mapping_of_embeds = { +            "Main Information": main_embed, +            "Languages": language_embed, +            "Tags": tags_embed, +            "Other Information": other_info_embed, +        } + +        super().__init__( +            placeholder="See more information regarding this kata", +            min_values=1, +            max_values=1, +            options=options +        ) + +    async def callback(self, interaction: Interaction) -> None: +        """Callback for when someone clicks on a dropdown.""" +        # Edit the message to the embed selected in the option +        # The `original_message` attribute is set just after the message is sent with the view. +        # The attribute is not set during initialization. +        result_embed = self.mapping_of_embeds[self.values[0]] +        await self.original_message.edit(embed=result_embed) + + +class Challenges(commands.Cog): +    """ +    Cog for the challenge command. + +    The challenge command pulls a random kata from codewars.com. +    A kata is the name for a challenge, specific to codewars.com. + +    The challenge command also has filters to customize the kata that is given. +    You can specify the language the kata should be from, difficulty and topic of the kata. +    """ + +    def __init__(self, bot: Bot): +        self.bot = bot + +    async def kata_id(self, search_link: str, params: dict) -> Union[str, Embed]: +        """ +        Uses bs4 to get the HTML code for the page of katas, where the page is the link of the formatted `search_link`. + +        This will webscrape the search page with `search_link` and then get the ID of a kata for the +        codewars.com API to use. +        """ +        async with self.bot.http_session.get(search_link, params=params) as response: +            if response.status != 200: +                error_embed = Embed( +                    title=choice(NEGATIVE_REPLIES), +                    description="We ran into an error when getting the kata from codewars.com, try again later.", +                    color=Colours.soft_red +                ) +                log.error(f"Unexpected response from codewars.com, status code: {response.status}") +                return error_embed + +            soup = BeautifulSoup(await response.text(), features="lxml") +            first_kata_div = await to_thread(soup.find_all, "div", class_="item-title px-0") + +            if not first_kata_div: +                raise commands.BadArgument("No katas could be found with the filters provided.") +            elif len(first_kata_div) >= 3: +                first_kata_div = choice(first_kata_div[:3]) +            elif "q=" not in search_link: +                first_kata_div = choice(first_kata_div) +            else: +                first_kata_div = first_kata_div[0] + +            # There are numerous divs before arriving at the id of the kata, which can be used for the link. +            first_kata_id = first_kata_div.a["href"].split("/")[-1] +            return first_kata_id + +    async def kata_information(self, kata_id: str) -> Union[dict, Embed]: +        """ +        Returns the information about the Kata. + +        Uses the codewars.com API to get information about the kata using `kata_id`. +        """ +        async with self.bot.http_session.get(API_ROOT.format(kata_id=kata_id)) as response: +            if response.status != 200: +                error_embed = Embed( +                    title=choice(NEGATIVE_REPLIES), +                    description="We ran into an error when getting the kata information, try again later.", +                    color=Colours.soft_red +                ) +                log.error(f"Unexpected response from codewars.com/api/v1, status code: {response.status}") +                return error_embed + +            return await response.json() + +    @staticmethod +    def main_embed(kata_information: dict) -> Embed: +        """Creates the main embed which displays the name, difficulty and description of the kata.""" +        kata_description = kata_information["description"] +        kata_url = f"https://codewars.com/kata/{kata_information['id']}" + +        # Ensuring it isn't over the length 1024 +        if len(kata_description) > 1024: +            kata_description = "\n".join(kata_description[:1000].split("\n")[:-1]) + "..." +            kata_description += f" [continue reading]({kata_url})" + +        if kata_information["rank"]["name"] is None: +            embed_color = 8 +            kata_difficulty = "Unable to retrieve difficulty for beta languages." +        else: +            embed_color = int(kata_information["rank"]["name"].replace(" kyu", "")) +            kata_difficulty = kata_information["rank"]["name"] + +        kata_embed = Embed( +            title=kata_information["name"], +            description=kata_description, +            color=MAPPING_OF_KYU[embed_color], +            url=kata_url +        ) +        kata_embed.add_field(name="Difficulty", value=kata_difficulty, inline=False) +        return kata_embed + +    @staticmethod +    def language_embed(kata_information: dict) -> Embed: +        """Creates the 'language embed' which displays all languages the kata supports.""" +        kata_url = f"https://codewars.com/kata/{kata_information['id']}" + +        languages = "\n".join(map(str.title, kata_information["languages"])) +        language_embed = Embed( +            title=kata_information["name"], +            description=f"```yaml\nSupported Languages:\n{languages}\n```", +            color=Colours.python_blue, +            url=kata_url +        ) +        return language_embed + +    @staticmethod +    def tags_embed(kata_information: dict) -> Embed: +        """ +        Creates the 'tags embed' which displays all the tags of the Kata. + +        Tags explain what the kata is about, this is what codewars.com calls categories. +        """ +        kata_url = f"https://codewars.com/kata/{kata_information['id']}" + +        tags = "\n".join(kata_information["tags"]) +        tags_embed = Embed( +            title=kata_information["name"], +            description=f"```yaml\nTags:\n{tags}\n```", +            color=Colours.grass_green, +            url=kata_url +        ) +        return tags_embed + +    @staticmethod +    def miscellaneous_embed(kata_information: dict) -> Embed: +        """ +        Creates the 'other information embed' which displays miscellaneous information about the kata. + +        This embed shows statistics such as the total number of people who completed the kata, +        the total number of stars of the kata, etc. +        """ +        kata_url = f"https://codewars.com/kata/{kata_information['id']}" + +        embed = Embed( +            title=kata_information["name"], +            description="```nim\nOther Information\n```", +            color=Colours.grass_green, +            url=kata_url +        ) +        embed.add_field( +            name="`Total Score`", +            value=f"```css\n{kata_information['voteScore']}\n```", +            inline=False +        ) +        embed.add_field( +            name="`Total Stars`", +            value=f"```css\n{kata_information['totalStars']}\n```", +            inline=False +        ) +        embed.add_field( +            name="`Total Completed`", +            value=f"```css\n{kata_information['totalCompleted']}\n```", +            inline=False +        ) +        embed.add_field( +            name="`Total Attempts`", +            value=f"```css\n{kata_information['totalAttempts']}\n```", +            inline=False +        ) +        return embed + +    @staticmethod +    def create_view(dropdown: InformationDropdown, link: str) -> ui.View: +        """ +        Creates the discord.py View for the Discord message components (dropdowns and buttons). + +        The discord UI is implemented onto the embed, where the user can choose what information about the kata they +        want, along with a link button to the kata itself. +        """ +        view = ui.View() +        view.add_item(dropdown) +        view.add_item(ui.Button(label="View the Kata", url=link)) +        return view + +    @commands.command(aliases=["kata"]) +    @commands.cooldown(1, 5, commands.BucketType.user) +    async def challenge(self, ctx: commands.Context, language: str = "python", *, query: str = None) -> None: +        """ +        The challenge command pulls a random kata (challenge) from codewars.com. + +        The different ways to use this command are: +        `.challenge <language>` - Pulls a random challenge within that language's scope. +        `.challenge <language> <difficulty>` - The difficulty can be from 1-8, +        1 being the hardest, 8 being the easiest. This pulls a random challenge within that difficulty & language. +        `.challenge <language> <query>` - Pulls a random challenge with the query provided under the language +        `.challenge <language> <query>, <difficulty>` - Pulls a random challenge with the query provided, +        under that difficulty within the language's scope. +        """ +        language = language.lower() +        if language not in SUPPORTED_LANGUAGES["stable"] + SUPPORTED_LANGUAGES["beta"]: +            raise commands.BadArgument("This is not a recognized language on codewars.com!") + +        get_kata_link = f"https://codewars.com/kata/search/{language}" +        params = {} + +        if query is not None: +            if "," in query: +                query_splitted = query.split("," if ", " not in query else ", ") + +                if len(query_splitted) > 2: +                    raise commands.BadArgument( +                        "There can only be one comma within the query, separating the difficulty and the query itself." +                    ) + +                query, level = query_splitted +                params["q"] = query +                params["r[]"] = f"-{level}" +            elif query.isnumeric(): +                params["r[]"] = f"-{query}" +            else: +                params["q"] = query + +        params["beta"] = str(language in SUPPORTED_LANGUAGES["beta"]).lower() + +        first_kata_id = await self.kata_id(get_kata_link, params) +        if isinstance(first_kata_id, Embed): +            # We ran into an error when retrieving the website link +            await ctx.send(embed=first_kata_id) +            return + +        kata_information = await self.kata_information(first_kata_id) +        if isinstance(kata_information, Embed): +            # Something went wrong when trying to fetch the kata information +            await ctx.d(embed=kata_information) +            return + +        kata_embed = self.main_embed(kata_information) +        language_embed = self.language_embed(kata_information) +        tags_embed = self.tags_embed(kata_information) +        miscellaneous_embed = self.miscellaneous_embed(kata_information) + +        dropdown = InformationDropdown( +            main_embed=kata_embed, +            language_embed=language_embed, +            tags_embed=tags_embed, +            other_info_embed=miscellaneous_embed +        ) +        kata_view = self.create_view(dropdown, f"https://codewars.com/kata/{first_kata_id}") +        original_message = await ctx.send( +            embed=kata_embed, +            view=kata_view +        ) +        dropdown.original_message = original_message + +        wait_for_kata = await kata_view.wait() +        if wait_for_kata: +            await original_message.edit(embed=kata_embed, view=None) + + +def setup(bot: Bot) -> None: +    """Load the Challenges cog.""" +    bot.add_cog(Challenges(bot)) diff --git a/bot/exts/utilities/colour.py b/bot/exts/utilities/colour.py new file mode 100644 index 00000000..ee6bad93 --- /dev/null +++ b/bot/exts/utilities/colour.py @@ -0,0 +1,266 @@ +import colorsys +import json +import pathlib +import random +import string +from io import BytesIO +from typing import Optional + +import discord +import rapidfuzz +from PIL import Image, ImageColor +from discord.ext import commands + +from bot import constants +from bot.bot import Bot +from bot.exts.core.extensions import invoke_help_command +from bot.utils.decorators import whitelist_override + +THUMBNAIL_SIZE = (80, 80) + + +class Colour(commands.Cog): +    """Cog for the Colour command.""" + +    def __init__(self, bot: Bot): +        self.bot = bot +        with open(pathlib.Path("bot/resources/utilities/ryanzec_colours.json")) as f: +            self.colour_mapping = json.load(f) +            del self.colour_mapping['_']  # Delete source credit entry + +    async def send_colour_response(self, ctx: commands.Context, rgb: tuple[int, int, int]) -> None: +        """Create and send embed from user given colour information.""" +        name = self._rgb_to_name(rgb) +        try: +            colour_or_color = ctx.invoked_parents[0] +        except IndexError: +            colour_or_color = "colour" + +        colour_mode = ctx.invoked_with +        if colour_mode == "random": +            colour_mode = colour_or_color +            input_colour = name +        elif colour_mode in ("colour", "color"): +            input_colour = ctx.kwargs["colour_input"] +        elif colour_mode == "name": +            input_colour = ctx.kwargs["user_colour_name"] +        elif colour_mode == "hex": +            input_colour = ctx.args[2:][0] +            if len(input_colour) > 7: +                input_colour = input_colour[0:-2] +        else: +            input_colour = tuple(ctx.args[2:]) + +        if colour_mode not in ("name", "hex", "random", "color", "colour"): +            colour_mode = colour_mode.upper() +        else: +            colour_mode = colour_mode.title() + +        colour_embed = discord.Embed( +            title=f"{name or input_colour}", +            description=f"{colour_or_color.title()} information for {colour_mode} `{input_colour or name}`.", +            colour=discord.Color.from_rgb(*rgb) +        ) +        colour_conversions = self.get_colour_conversions(rgb) +        for colour_space, value in colour_conversions.items(): +            colour_embed.add_field( +                name=colour_space, +                value=f"`{value}`", +                inline=True +            ) + +        thumbnail = Image.new("RGB", THUMBNAIL_SIZE, color=rgb) +        buffer = BytesIO() +        thumbnail.save(buffer, "PNG") +        buffer.seek(0) +        thumbnail_file = discord.File(buffer, filename="colour.png") + +        colour_embed.set_thumbnail(url="attachment://colour.png") + +        await ctx.send(file=thumbnail_file, embed=colour_embed) + +    @commands.group(aliases=("color",), invoke_without_command=True) +    @whitelist_override( +        channels=constants.WHITELISTED_CHANNELS, +        roles=constants.STAFF_ROLES, +        categories=[constants.Categories.development, constants.Categories.media] +    ) +    async def colour(self, ctx: commands.Context, *, colour_input: Optional[str] = None) -> None: +        """ +        Create an embed that displays colour information. + +        If no subcommand is called, a randomly selected colour will be shown. +        """ +        if colour_input is None: +            await self.random(ctx) +            return + +        try: +            extra_colour = ImageColor.getrgb(colour_input) +            await self.send_colour_response(ctx, extra_colour) +        except ValueError: +            await invoke_help_command(ctx) + +    @colour.command() +    async def rgb(self, ctx: commands.Context, red: int, green: int, blue: int) -> None: +        """Create an embed from an RGB input.""" +        if any(c not in range(256) for c in (red, green, blue)): +            raise commands.BadArgument( +                message=f"RGB values can only be from 0 to 255. User input was: `{red, green, blue}`." +            ) +        rgb_tuple = (red, green, blue) +        await self.send_colour_response(ctx, rgb_tuple) + +    @colour.command() +    async def hsv(self, ctx: commands.Context, hue: int, saturation: int, value: int) -> None: +        """Create an embed from an HSV input.""" +        if (hue not in range(361)) or any(c not in range(101) for c in (saturation, value)): +            raise commands.BadArgument( +                message="Hue can only be from 0 to 360. Saturation and Value can only be from 0 to 100. " +                f"User input was: `{hue, saturation, value}`." +            ) +        hsv_tuple = ImageColor.getrgb(f"hsv({hue}, {saturation}%, {value}%)") +        await self.send_colour_response(ctx, hsv_tuple) + +    @colour.command() +    async def hsl(self, ctx: commands.Context, hue: int, saturation: int, lightness: int) -> None: +        """Create an embed from an HSL input.""" +        if (hue not in range(361)) or any(c not in range(101) for c in (saturation, lightness)): +            raise commands.BadArgument( +                message="Hue can only be from 0 to 360. Saturation and Lightness can only be from 0 to 100. " +                f"User input was: `{hue, saturation, lightness}`." +            ) +        hsl_tuple = ImageColor.getrgb(f"hsl({hue}, {saturation}%, {lightness}%)") +        await self.send_colour_response(ctx, hsl_tuple) + +    @colour.command() +    async def cmyk(self, ctx: commands.Context, cyan: int, magenta: int, yellow: int, key: int) -> None: +        """Create an embed from a CMYK input.""" +        if any(c not in range(101) for c in (cyan, magenta, yellow, key)): +            raise commands.BadArgument( +                message=f"CMYK values can only be from 0 to 100. User input was: `{cyan, magenta, yellow, key}`." +            ) +        r = round(255 * (1 - (cyan / 100)) * (1 - (key / 100))) +        g = round(255 * (1 - (magenta / 100)) * (1 - (key / 100))) +        b = round(255 * (1 - (yellow / 100)) * (1 - (key / 100))) +        await self.send_colour_response(ctx, (r, g, b)) + +    @colour.command() +    async def hex(self, ctx: commands.Context, hex_code: str) -> None: +        """Create an embed from a HEX input.""" +        if hex_code[0] != "#": +            hex_code = f"#{hex_code}" + +        if len(hex_code) not in (4, 5, 7, 9) or any(digit not in string.hexdigits for digit in hex_code[1:]): +            raise commands.BadArgument( +                message=f"Cannot convert `{hex_code}` to a recognizable Hex format. " +                "Hex values must be hexadecimal and take the form *#RRGGBB* or *#RGB*." +            ) + +        hex_tuple = ImageColor.getrgb(hex_code) +        if len(hex_tuple) == 4: +            hex_tuple = hex_tuple[:-1]  # Colour must be RGB. If RGBA, we remove the alpha value +        await self.send_colour_response(ctx, hex_tuple) + +    @colour.command() +    async def name(self, ctx: commands.Context, *, user_colour_name: str) -> None: +        """Create an embed from a name input.""" +        hex_colour = self.match_colour_name(ctx, user_colour_name) +        if hex_colour is None: +            name_error_embed = discord.Embed( +                title="No colour match found.", +                description=f"No colour found for: `{user_colour_name}`", +                colour=discord.Color.dark_red() +            ) +            await ctx.send(embed=name_error_embed) +            return +        hex_tuple = ImageColor.getrgb(hex_colour) +        await self.send_colour_response(ctx, hex_tuple) + +    @colour.command() +    async def random(self, ctx: commands.Context) -> None: +        """Create an embed from a randomly chosen colour.""" +        hex_colour = random.choice(list(self.colour_mapping.values())) +        hex_tuple = ImageColor.getrgb(f"#{hex_colour}") +        await self.send_colour_response(ctx, hex_tuple) + +    def get_colour_conversions(self, rgb: tuple[int, int, int]) -> dict[str, str]: +        """Create a dictionary mapping of colour types and their values.""" +        colour_name = self._rgb_to_name(rgb) +        if colour_name is None: +            colour_name = "No match found" +        return { +            "RGB": rgb, +            "HSV": self._rgb_to_hsv(rgb), +            "HSL": self._rgb_to_hsl(rgb), +            "CMYK": self._rgb_to_cmyk(rgb), +            "Hex": self._rgb_to_hex(rgb), +            "Name": colour_name +        } + +    @staticmethod +    def _rgb_to_hsv(rgb: tuple[int, int, int]) -> tuple[int, int, int]: +        """Convert RGB values to HSV values.""" +        rgb_list = [val / 255 for val in rgb] +        h, s, v = colorsys.rgb_to_hsv(*rgb_list) +        hsv = (round(h * 360), round(s * 100), round(v * 100)) +        return hsv + +    @staticmethod +    def _rgb_to_hsl(rgb: tuple[int, int, int]) -> tuple[int, int, int]: +        """Convert RGB values to HSL values.""" +        rgb_list = [val / 255.0 for val in rgb] +        h, l, s = colorsys.rgb_to_hls(*rgb_list) +        hsl = (round(h * 360), round(s * 100), round(l * 100)) +        return hsl + +    @staticmethod +    def _rgb_to_cmyk(rgb: tuple[int, int, int]) -> tuple[int, int, int, int]: +        """Convert RGB values to CMYK values.""" +        rgb_list = [val / 255.0 for val in rgb] +        if not any(rgb_list): +            return 0, 0, 0, 100 +        k = 1 - max(rgb_list) +        c = round((1 - rgb_list[0] - k) * 100 / (1 - k)) +        m = round((1 - rgb_list[1] - k) * 100 / (1 - k)) +        y = round((1 - rgb_list[2] - k) * 100 / (1 - k)) +        cmyk = (c, m, y, round(k * 100)) +        return cmyk + +    @staticmethod +    def _rgb_to_hex(rgb: tuple[int, int, int]) -> str: +        """Convert RGB values to HEX code.""" +        hex_ = "".join([hex(val)[2:].zfill(2) for val in rgb]) +        hex_code = f"#{hex_}".upper() +        return hex_code + +    def _rgb_to_name(self, rgb: tuple[int, int, int]) -> Optional[str]: +        """Convert RGB values to a fuzzy matched name.""" +        input_hex_colour = self._rgb_to_hex(rgb) +        try: +            match, certainty, _ = rapidfuzz.process.extractOne( +                query=input_hex_colour, +                choices=self.colour_mapping.values(), +                score_cutoff=80 +            ) +            colour_name = [name for name, hex_code in self.colour_mapping.items() if hex_code == match][0] +        except TypeError: +            colour_name = None +        return colour_name + +    def match_colour_name(self, ctx: commands.Context, input_colour_name: str) -> Optional[str]: +        """Convert a colour name to HEX code.""" +        try: +            match, certainty, _ = rapidfuzz.process.extractOne( +                query=input_colour_name, +                choices=self.colour_mapping.keys(), +                score_cutoff=80 +            ) +        except (ValueError, TypeError): +            return +        return f"#{self.colour_mapping[match]}" + + +def setup(bot: Bot) -> None: +    """Load the Colour cog.""" +    bot.add_cog(Colour(bot)) diff --git a/bot/exts/utilities/conversationstarters.py b/bot/exts/utilities/conversationstarters.py index dd537022..8bf2abfd 100644 --- a/bot/exts/utilities/conversationstarters.py +++ b/bot/exts/utilities/conversationstarters.py @@ -1,11 +1,15 @@ +import asyncio +from contextlib import suppress +from functools import partial  from pathlib import Path +from typing import Union +import discord  import yaml -from discord import Color, Embed  from discord.ext import commands  from bot.bot import Bot -from bot.constants import WHITELISTED_CHANNELS +from bot.constants import MODERATION_ROLES, WHITELISTED_CHANNELS  from bot.utils.decorators import whitelist_override  from bot.utils.randomization import RandomCycle @@ -35,35 +39,88 @@ TOPICS = {  class ConvoStarters(commands.Cog):      """General conversation topics.""" -    @commands.command() -    @whitelist_override(channels=ALL_ALLOWED_CHANNELS) -    async def topic(self, ctx: commands.Context) -> None: +    def __init__(self, bot: Bot): +        self.bot = bot + +    @staticmethod +    def _build_topic_embed(channel_id: int) -> discord.Embed:          """ -        Responds with a random topic to start a conversation. +        Build an embed containing a conversation topic.          If in a Python channel, a python-related topic will be given. -          Otherwise, a random conversation topic will be received by the user.          """          # No matter what, the form will be shown. -        embed = Embed(description=f"Suggest more topics [here]({SUGGESTION_FORM})!", color=Color.blurple()) +        embed = discord.Embed( +            description=f"Suggest more topics [here]({SUGGESTION_FORM})!", +            color=discord.Colour.og_blurple() +        )          try: -            # Fetching topics. -            channel_topics = TOPICS[ctx.channel.id] - -        # If the channel isn't Python-related. +            channel_topics = TOPICS[channel_id]          except KeyError: +            # Channel doesn't have any topics.              embed.title = f"**{next(TOPICS['default'])}**" - -        # If the channel ID doesn't have any topics.          else:              embed.title = f"**{next(channel_topics)}**" +        return embed + +    @staticmethod +    def _predicate( +        command_invoker: Union[discord.User, discord.Member], +        message: discord.Message, +        reaction: discord.Reaction, +        user: discord.User +    ) -> bool: +        user_is_moderator = any(role.id in MODERATION_ROLES for role in getattr(user, "roles", [])) +        user_is_invoker = user.id == command_invoker.id + +        is_right_reaction = all(( +            reaction.message.id == message.id, +            str(reaction.emoji) == "π", +            user_is_moderator or user_is_invoker +        )) +        return is_right_reaction + +    async def _listen_for_refresh( +        self, +        command_invoker: Union[discord.User, discord.Member], +        message: discord.Message +    ) -> None: +        await message.add_reaction("π") +        while True: +            try: +                reaction, user = await self.bot.wait_for( +                    "reaction_add", +                    check=partial(self._predicate, command_invoker, message), +                    timeout=60.0 +                ) +            except asyncio.TimeoutError: +                with suppress(discord.NotFound): +                    await message.clear_reaction("π") +                break + +            try: +                await message.edit(embed=self._build_topic_embed(message.channel.id)) +            except discord.NotFound: +                break + +            with suppress(discord.NotFound): +                await message.remove_reaction(reaction, user) -        finally: -            await ctx.send(embed=embed) +    @commands.command() +    @commands.cooldown(1, 60*2, commands.BucketType.channel) +    @whitelist_override(channels=ALL_ALLOWED_CHANNELS) +    async def topic(self, ctx: commands.Context) -> None: +        """ +        Responds with a random topic to start a conversation. + +        Allows the refresh of a topic by pressing an emoji. +        """ +        message = await ctx.send(embed=self._build_topic_embed(ctx.channel.id)) +        self.bot.loop.create_task(self._listen_for_refresh(ctx.author, message))  def setup(bot: Bot) -> None:      """Load the ConvoStarters cog.""" -    bot.add_cog(ConvoStarters()) +    bot.add_cog(ConvoStarters(bot)) diff --git a/bot/exts/utilities/emoji.py b/bot/exts/utilities/emoji.py index 55d6b8e9..fa438d7f 100644 --- a/bot/exts/utilities/emoji.py +++ b/bot/exts/utilities/emoji.py @@ -107,11 +107,11 @@ class Emojis(commands.Cog):              title=f"Emoji Information: {emoji.name}",              description=textwrap.dedent(f"""                  **Name:** {emoji.name} -                **Created:** {time_since(emoji.created_at, precision="hours")} -                **Date:** {datetime.strftime(emoji.created_at, "%d/%m/%Y")} +                **Created:** {time_since(emoji.created_at.replace(tzinfo=None), precision="hours")} +                **Date:** {datetime.strftime(emoji.created_at.replace(tzinfo=None), "%d/%m/%Y")}                  **ID:** {emoji.id}              """), -            color=Color.blurple(), +            color=Color.og_blurple(),              url=str(emoji.url),          ).set_thumbnail(url=emoji.url) diff --git a/bot/exts/utilities/epoch.py b/bot/exts/utilities/epoch.py new file mode 100644 index 00000000..42312dd1 --- /dev/null +++ b/bot/exts/utilities/epoch.py @@ -0,0 +1,138 @@ +from typing import Optional, Union + +import arrow +import discord +from dateutil import parser +from discord.ext import commands + +from bot.bot import Bot +from bot.utils.extensions import invoke_help_command + +# https://discord.com/developers/docs/reference#message-formatting-timestamp-styles +STYLES = { +    "Epoch": ("",), +    "Short Time": ("t", "h:mm A",), +    "Long Time": ("T", "h:mm:ss A"), +    "Short Date": ("d", "MM/DD/YYYY"), +    "Long Date": ("D", "MMMM D, YYYY"), +    "Short Date/Time": ("f", "MMMM D, YYYY h:mm A"), +    "Long Date/Time": ("F", "dddd, MMMM D, YYYY h:mm A"), +    "Relative Time": ("R",) +} +DROPDOWN_TIMEOUT = 60 + + +class DateString(commands.Converter): +    """Convert a relative or absolute date/time string to an arrow.Arrow object.""" + +    async def convert(self, ctx: commands.Context, argument: str) -> Union[arrow.Arrow, Optional[tuple]]: +        """ +        Convert a relative or absolute date/time string to an arrow.Arrow object. + +        Try to interpret the date string as a relative time. If conversion fails, try to interpret it as an absolute +        time. Tokens that are not recognised are returned along with the part of the string that was successfully +        converted to an arrow object. If the date string cannot be parsed, BadArgument is raised. +        """ +        try: +            return arrow.utcnow().dehumanize(argument) +        except (ValueError, OverflowError): +            try: +                dt, ignored_tokens = parser.parse(argument, fuzzy_with_tokens=True) +            except parser.ParserError: +                raise commands.BadArgument(f"`{argument}` Could not be parsed to a relative or absolute date.") +            except OverflowError: +                raise commands.BadArgument(f"`{argument}` Results in a date outside of the supported range.") +            return arrow.get(dt), ignored_tokens + + +class Epoch(commands.Cog): +    """Convert an entered time and date to a unix timestamp.""" + +    @commands.command(name="epoch") +    async def epoch(self, ctx: commands.Context, *, date_time: DateString = None) -> None: +        """ +        Convert an entered date/time string to the equivalent epoch. + +        **Relative time** +            Must begin with `in...` or end with `...ago`. +            Accepted units: "seconds", "minutes", "hours", "days", "weeks", "months", "years". +            eg `.epoch in a month 4 days and 2 hours` + +        **Absolute time** +            eg `.epoch 2022/6/15 16:43 -04:00` +            Absolute times must be entered in descending orders of magnitude. +            If AM or PM is left unspecified, the 24-hour clock is assumed. +            Timezones are optional, and will default to UTC. The following timezone formats are accepted: +                Z (UTC) +                Β±HH:MM +                Β±HHMM +                Β±HH + +        Times in the dropdown are shown in UTC +        """ +        if not date_time: +            await invoke_help_command(ctx) +            return + +        if isinstance(date_time, tuple): +            # Remove empty strings. Strip extra whitespace from the remaining items +            ignored_tokens = list(map(str.strip, filter(str.strip, date_time[1]))) +            date_time = date_time[0] +            if ignored_tokens: +                await ctx.send(f"Could not parse the following token(s): `{', '.join(ignored_tokens)}`") +        await ctx.send(f"Date and time parsed as: `{date_time.format(arrow.FORMAT_RSS)}`") + +        epoch = int(date_time.timestamp()) +        view = TimestampMenuView(ctx, self._format_dates(date_time), epoch) +        original = await ctx.send(f"`{epoch}`", view=view) +        await view.wait()  # wait until expiration before removing the dropdown +        try: +            await original.edit(view=None) +        except discord.NotFound:  # disregard the error message if the message is deleled +            pass + +    @staticmethod +    def _format_dates(date: arrow.Arrow) -> list[str]: +        """ +        Return a list of date strings formatted according to the discord timestamp styles. + +        These are used in the description of each style in the dropdown +        """ +        date = date.to('utc') +        formatted = [str(int(date.timestamp()))] +        formatted += [date.format(format[1]) for format in list(STYLES.values())[1:7]] +        formatted.append(date.humanize()) +        return formatted + + +class TimestampMenuView(discord.ui.View): +    """View for the epoch command which contains a single `discord.ui.Select` dropdown component.""" + +    def __init__(self, ctx: commands.Context, formatted_times: list[str], epoch: int): +        super().__init__(timeout=DROPDOWN_TIMEOUT) +        self.ctx = ctx +        self.epoch = epoch +        self.dropdown: discord.ui.Select = self.children[0] +        for label, date_time in zip(STYLES.keys(), formatted_times): +            self.dropdown.add_option(label=label, description=date_time) + +    @discord.ui.select(placeholder="Select the format of your timestamp") +    async def select_format(self, _: discord.ui.Select, interaction: discord.Interaction) -> discord.Message: +        """Drop down menu which contains a list of formats which discord timestamps can take.""" +        selected = interaction.data["values"][0] +        if selected == "Epoch": +            return await interaction.response.edit_message(content=f"`{self.epoch}`") +        return await interaction.response.edit_message(content=f"`<t:{self.epoch}:{STYLES[selected][0]}>`") + +    async def interaction_check(self, interaction: discord.Interaction) -> bool: +        """Check to ensure that the interacting user is the user who invoked the command.""" +        if interaction.user != self.ctx.author: +            embed = discord.Embed(description="Sorry, but this dropdown menu can only be used by the original author.") +            await interaction.response.send_message(embed=embed, ephemeral=True) +            return False +        return True + + +def setup(bot: Bot) -> None: +    """Load the Epoch cog.""" +    bot.add_cog(Epoch()) diff --git a/bot/exts/utilities/githubinfo.py b/bot/exts/utilities/githubinfo.py index d00b408d..963f54e5 100644 --- a/bot/exts/utilities/githubinfo.py +++ b/bot/exts/utilities/githubinfo.py @@ -1,30 +1,165 @@  import logging  import random +import re +import typing as t +from dataclasses import dataclass  from datetime import datetime -from urllib.parse import quote, quote_plus +from urllib.parse import quote  import discord +from aiohttp import ClientResponse  from discord.ext import commands  from bot.bot import Bot -from bot.constants import Colours, NEGATIVE_REPLIES +from bot.constants import Colours, ERROR_REPLIES, Emojis, NEGATIVE_REPLIES, Tokens  from bot.exts.core.extensions import invoke_help_command  log = logging.getLogger(__name__)  GITHUB_API_URL = "https://api.github.com" +REQUEST_HEADERS = { +    "Accept": "application/vnd.github.v3+json" +} + +REPOSITORY_ENDPOINT = "https://api.github.com/orgs/{org}/repos?per_page=100&type=public" +ISSUE_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/issues/{number}" +PR_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/pulls/{number}" + +if Tokens.github: +    REQUEST_HEADERS["Authorization"] = f"token {Tokens.github}" + +CODE_BLOCK_RE = re.compile( +    r"^`([^`\n]+)`"   # Inline codeblock +    r"|```(.+?)```",  # Multiline codeblock +    re.DOTALL | re.MULTILINE +) + +# Maximum number of issues in one message +MAXIMUM_ISSUES = 5 + +# Regex used when looking for automatic linking in messages +# regex101 of current regex https://regex101.com/r/V2ji8M/6 +AUTOMATIC_REGEX = re.compile( +    r"((?P<org>[a-zA-Z0-9][a-zA-Z0-9\-]{1,39})\/)?(?P<repo>[\w\-\.]{1,100})#(?P<number>[0-9]+)" +) + + +@dataclass(eq=True, frozen=True) +class FoundIssue: +    """Dataclass representing an issue found by the regex.""" + +    organisation: t.Optional[str] +    repository: str +    number: str + + +@dataclass(eq=True, frozen=True) +class FetchError: +    """Dataclass representing an error while fetching an issue.""" + +    return_code: int +    message: str + + +@dataclass(eq=True, frozen=True) +class IssueState: +    """Dataclass representing the state of an issue.""" + +    repository: str +    number: int +    url: str +    title: str +    emoji: str +  class GithubInfo(commands.Cog): -    """Fetches info from GitHub.""" +    """A Cog that fetches info from GitHub."""      def __init__(self, bot: Bot):          self.bot = bot +        self.repos = [] + +    @staticmethod +    def remove_codeblocks(message: str) -> str: +        """Remove any codeblock in a message.""" +        return CODE_BLOCK_RE.sub("", message) + +    async def fetch_issue( +        self, +        number: int, +        repository: str, +        user: str +    ) -> t.Union[IssueState, FetchError]: +        """ +        Retrieve an issue from a GitHub repository. + +        Returns IssueState on success, FetchError on failure. +        """ +        url = ISSUE_ENDPOINT.format(user=user, repository=repository, number=number) +        pulls_url = PR_ENDPOINT.format(user=user, repository=repository, number=number) + +        json_data, r = await self.fetch_data(url) + +        if r.status == 403: +            if r.headers.get("X-RateLimit-Remaining") == "0": +                log.info(f"Ratelimit reached while fetching {url}") +                return FetchError(403, "Ratelimit reached, please retry in a few minutes.") +            return FetchError(403, "Cannot access issue.") +        elif r.status in (404, 410): +            return FetchError(r.status, "Issue not found.") +        elif r.status != 200: +            return FetchError(r.status, "Error while fetching issue.") + +        # The initial API request is made to the issues API endpoint, which will return information +        # if the issue or PR is present. However, the scope of information returned for PRs differs +        # from issues: if the 'issues' key is present in the response then we can pull the data we +        # need from the initial API call. +        if "issues" in json_data["html_url"]: +            if json_data.get("state") == "open": +                emoji = Emojis.issue_open +            else: +                emoji = Emojis.issue_closed + +        # If the 'issues' key is not contained in the API response and there is no error code, then +        # we know that a PR has been requested and a call to the pulls API endpoint is necessary +        # to get the desired information for the PR. +        else: +            pull_data, _ = await self.fetch_data(pulls_url) +            if pull_data["draft"]: +                emoji = Emojis.pull_request_draft +            elif pull_data["state"] == "open": +                emoji = Emojis.pull_request_open +            # When 'merged_at' is not None, this means that the state of the PR is merged +            elif pull_data["merged_at"] is not None: +                emoji = Emojis.pull_request_merged +            else: +                emoji = Emojis.pull_request_closed + +        issue_url = json_data.get("html_url") -    async def fetch_data(self, url: str) -> dict: -        """Retrieve data as a dictionary.""" -        async with self.bot.http_session.get(url) as r: -            return await r.json() +        return IssueState(repository, number, issue_url, json_data.get("title", ""), emoji) + +    @staticmethod +    def format_embed( +        results: t.List[t.Union[IssueState, FetchError]] +    ) -> discord.Embed: +        """Take a list of IssueState or FetchError and format a Discord embed for them.""" +        description_list = [] + +        for result in results: +            if isinstance(result, IssueState): +                description_list.append(f"{result.emoji} [{result.title}]({result.url})") +            elif isinstance(result, FetchError): +                description_list.append(f":x: [{result.return_code}] {result.message}") + +        resp = discord.Embed( +            colour=Colours.bright_green, +            description="\n".join(description_list) +        ) + +        resp.set_author(name="GitHub") +        return resp      @commands.group(name="github", aliases=("gh", "git"))      @commands.cooldown(1, 10, commands.BucketType.user) @@ -33,11 +168,67 @@ class GithubInfo(commands.Cog):          if ctx.invoked_subcommand is None:              await invoke_help_command(ctx) +    @commands.Cog.listener() +    async def on_message(self, message: discord.Message) -> None: +        """ +        Automatic issue linking. + +        Listener to retrieve issue(s) from a GitHub repository using automatic linking if matching <org>/<repo>#<issue>. +        """ +        # Ignore bots +        if message.author.bot: +            return + +        issues = [ +            FoundIssue(*match.group("org", "repo", "number")) +            for match in AUTOMATIC_REGEX.finditer(self.remove_codeblocks(message.content)) +        ] +        links = [] + +        if issues: +            # Block this from working in DMs +            if not message.guild: +                return + +            log.trace(f"Found {issues = }") +            # Remove duplicates +            issues = set(issues) + +            if len(issues) > MAXIMUM_ISSUES: +                embed = discord.Embed( +                    title=random.choice(ERROR_REPLIES), +                    color=Colours.soft_red, +                    description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})" +                ) +                await message.channel.send(embed=embed, delete_after=5) +                return + +            for repo_issue in issues: +                result = await self.fetch_issue( +                    int(repo_issue.number), +                    repo_issue.repository, +                    repo_issue.organisation or "python-discord" +                ) +                if isinstance(result, IssueState): +                    links.append(result) + +        if not links: +            return + +        resp = self.format_embed(links) +        await message.channel.send(embed=resp) + +    async def fetch_data(self, url: str) -> tuple[dict[str], ClientResponse]: +        """Retrieve data as a dictionary and the response in a tuple.""" +        log.trace(f"Querying GH issues API: {url}") +        async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r: +            return await r.json(), r +      @github_group.command(name="user", aliases=("userinfo",))      async def github_user_info(self, ctx: commands.Context, username: str) -> None:          """Fetches a user's GitHub information."""          async with ctx.typing(): -            user_data = await self.fetch_data(f"{GITHUB_API_URL}/users/{quote_plus(username)}") +            user_data, _ = await self.fetch_data(f"{GITHUB_API_URL}/users/{username}")              # User_data will not have a message key if the user exists              if "message" in user_data: @@ -50,7 +241,7 @@ class GithubInfo(commands.Cog):                  await ctx.send(embed=embed)                  return -            org_data = await self.fetch_data(user_data["organizations_url"]) +            org_data, _ = await self.fetch_data(user_data["organizations_url"])              orgs = [f"[{org['login']}](https://github.com/{org['login']})" for org in org_data]              orgs_to_add = " | ".join(orgs) @@ -67,7 +258,7 @@ class GithubInfo(commands.Cog):              embed = discord.Embed(                  title=f"`{user_data['login']}`'s GitHub profile info",                  description=f"```\n{user_data['bio']}\n```\n" if user_data["bio"] else "", -                colour=discord.Colour.blurple(), +                colour=discord.Colour.og_blurple(),                  url=user_data["html_url"],                  timestamp=datetime.strptime(user_data["created_at"], "%Y-%m-%dT%H:%M:%SZ")              ) @@ -91,10 +282,7 @@ class GithubInfo(commands.Cog):              )              if user_data["type"] == "User": -                embed.add_field( -                    name="Gists", -                    value=f"[{gists}](https://gist.github.com/{quote_plus(username, safe='')})" -                ) +                embed.add_field(name="Gists", value=f"[{gists}](https://gist.github.com/{quote(username, safe='')})")                  embed.add_field(                      name=f"Organization{'s' if len(orgs)!=1 else ''}", @@ -123,7 +311,7 @@ class GithubInfo(commands.Cog):              return          async with ctx.typing(): -            repo_data = await self.fetch_data(f"{GITHUB_API_URL}/repos/{quote(repo)}") +            repo_data, _ = await self.fetch_data(f"{GITHUB_API_URL}/repos/{quote(repo)}")              # There won't be a message key if this repo exists              if "message" in repo_data: @@ -139,7 +327,7 @@ class GithubInfo(commands.Cog):          embed = discord.Embed(              title=repo_data["name"],              description=repo_data["description"], -            colour=discord.Colour.blurple(), +            colour=discord.Colour.og_blurple(),              url=repo_data["html_url"]          ) diff --git a/bot/exts/utilities/issues.py b/bot/exts/utilities/issues.py deleted file mode 100644 index 8a7ebed0..00000000 --- a/bot/exts/utilities/issues.py +++ /dev/null @@ -1,275 +0,0 @@ -import logging -import random -import re -from dataclasses import dataclass -from typing import Optional, Union - -import discord -from discord.ext import commands - -from bot.bot import Bot -from bot.constants import ( -    Categories, -    Channels, -    Colours, -    ERROR_REPLIES, -    Emojis, -    NEGATIVE_REPLIES, -    Tokens, -    WHITELISTED_CHANNELS -) -from bot.utils.decorators import whitelist_override -from bot.utils.extensions import invoke_help_command - -log = logging.getLogger(__name__) - -BAD_RESPONSE = { -    404: "Issue/pull request not located! Please enter a valid number!", -    403: "Rate limit has been hit! Please try again later!" -} -REQUEST_HEADERS = { -    "Accept": "application/vnd.github.v3+json" -} - -REPOSITORY_ENDPOINT = "https://api.github.com/orgs/{org}/repos?per_page=100&type=public" -ISSUE_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/issues/{number}" -PR_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/pulls/{number}" - -if GITHUB_TOKEN := Tokens.github: -    REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}" - -WHITELISTED_CATEGORIES = ( -    Categories.development, Categories.devprojects, Categories.media, Categories.staff -) - -CODE_BLOCK_RE = re.compile( -    r"^`([^`\n]+)`"  # Inline codeblock -    r"|```(.+?)```",  # Multiline codeblock -    re.DOTALL | re.MULTILINE -) - -# Maximum number of issues in one message -MAXIMUM_ISSUES = 5 - -# Regex used when looking for automatic linking in messages -# regex101 of current regex https://regex101.com/r/V2ji8M/6 -AUTOMATIC_REGEX = re.compile( -    r"((?P<org>[a-zA-Z0-9][a-zA-Z0-9\-]{1,39})\/)?(?P<repo>[\w\-\.]{1,100})#(?P<number>[0-9]+)" -) - - -@dataclass -class FoundIssue: -    """Dataclass representing an issue found by the regex.""" - -    organisation: Optional[str] -    repository: str -    number: str - -    def __hash__(self) -> int: -        return hash((self.organisation, self.repository, self.number)) - - -@dataclass -class FetchError: -    """Dataclass representing an error while fetching an issue.""" - -    return_code: int -    message: str - - -@dataclass -class IssueState: -    """Dataclass representing the state of an issue.""" - -    repository: str -    number: int -    url: str -    title: str -    emoji: str - - -class Issues(commands.Cog): -    """Cog that allows users to retrieve issues from GitHub.""" - -    def __init__(self, bot: Bot): -        self.bot = bot -        self.repos = [] - -    @staticmethod -    def remove_codeblocks(message: str) -> str: -        """Remove any codeblock in a message.""" -        return re.sub(CODE_BLOCK_RE, "", message) - -    async def fetch_issues( -            self, -            number: int, -            repository: str, -            user: str -    ) -> Union[IssueState, FetchError]: -        """ -        Retrieve an issue from a GitHub repository. - -        Returns IssueState on success, FetchError on failure. -        """ -        url = ISSUE_ENDPOINT.format(user=user, repository=repository, number=number) -        pulls_url = PR_ENDPOINT.format(user=user, repository=repository, number=number) -        log.trace(f"Querying GH issues API: {url}") - -        async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r: -            json_data = await r.json() - -        if r.status == 403: -            if r.headers.get("X-RateLimit-Remaining") == "0": -                log.info(f"Ratelimit reached while fetching {url}") -                return FetchError(403, "Ratelimit reached, please retry in a few minutes.") -            return FetchError(403, "Cannot access issue.") -        elif r.status in (404, 410): -            return FetchError(r.status, "Issue not found.") -        elif r.status != 200: -            return FetchError(r.status, "Error while fetching issue.") - -        # The initial API request is made to the issues API endpoint, which will return information -        # if the issue or PR is present. However, the scope of information returned for PRs differs -        # from issues: if the 'issues' key is present in the response then we can pull the data we -        # need from the initial API call. -        if "issues" in json_data["html_url"]: -            if json_data.get("state") == "open": -                emoji = Emojis.issue_open -            else: -                emoji = Emojis.issue_closed - -        # If the 'issues' key is not contained in the API response and there is no error code, then -        # we know that a PR has been requested and a call to the pulls API endpoint is necessary -        # to get the desired information for the PR. -        else: -            log.trace(f"PR provided, querying GH pulls API for additional information: {pulls_url}") -            async with self.bot.http_session.get(pulls_url) as p: -                pull_data = await p.json() -                if pull_data["draft"]: -                    emoji = Emojis.pull_request_draft -                elif pull_data["state"] == "open": -                    emoji = Emojis.pull_request_open -                # When 'merged_at' is not None, this means that the state of the PR is merged -                elif pull_data["merged_at"] is not None: -                    emoji = Emojis.pull_request_merged -                else: -                    emoji = Emojis.pull_request_closed - -        issue_url = json_data.get("html_url") - -        return IssueState(repository, number, issue_url, json_data.get("title", ""), emoji) - -    @staticmethod -    def format_embed( -        results: list[Union[IssueState, FetchError]], -        user: str, -        repository: Optional[str] = None -    ) -> discord.Embed: -        """Take a list of IssueState or FetchError and format a Discord embed for them.""" -        description_list = [] - -        for result in results: -            if isinstance(result, IssueState): -                description_list.append(f"{result.emoji} [{result.title}]({result.url})") -            elif isinstance(result, FetchError): -                description_list.append(f":x: [{result.return_code}] {result.message}") - -        resp = discord.Embed( -            colour=Colours.bright_green, -            description="\n".join(description_list) -        ) - -        embed_url = f"https://github.com/{user}/{repository}" if repository else f"https://github.com/{user}" -        resp.set_author(name="GitHub", url=embed_url) -        return resp - -    @whitelist_override(channels=WHITELISTED_CHANNELS, categories=WHITELISTED_CATEGORIES) -    @commands.command(aliases=("pr",)) -    async def issue( -        self, -        ctx: commands.Context, -        numbers: commands.Greedy[int], -        repository: str = "sir-lancebot", -        user: str = "python-discord" -    ) -> None: -        """Command to retrieve issue(s) from a GitHub repository.""" -        # Remove duplicates -        numbers = set(numbers) - -        if len(numbers) > MAXIMUM_ISSUES: -            embed = discord.Embed( -                title=random.choice(ERROR_REPLIES), -                color=Colours.soft_red, -                description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})" -            ) -            await ctx.send(embed=embed) -            await invoke_help_command(ctx) - -        results = [await self.fetch_issues(number, repository, user) for number in numbers] -        await ctx.send(embed=self.format_embed(results, user, repository)) - -    @commands.Cog.listener() -    async def on_message(self, message: discord.Message) -> None: -        """ -        Automatic issue linking. - -        Listener to retrieve issue(s) from a GitHub repository using automatic linking if matching <org>/<repo>#<issue>. -        """ -        # Ignore bots -        if message.author.bot: -            return - -        issues = [ -            FoundIssue(*match.group("org", "repo", "number")) -            for match in AUTOMATIC_REGEX.finditer(self.remove_codeblocks(message.content)) -        ] -        links = [] - -        if issues: -            # Block this from working in DMs -            if not message.guild: -                await message.channel.send( -                    embed=discord.Embed( -                        title=random.choice(NEGATIVE_REPLIES), -                        description=( -                            "You can't retrieve issues from DMs. " -                            f"Try again in <#{Channels.community_bot_commands}>" -                        ), -                        colour=Colours.soft_red -                    ) -                ) -                return - -            log.trace(f"Found {issues = }") -            # Remove duplicates -            issues = set(issues) - -            if len(issues) > MAXIMUM_ISSUES: -                embed = discord.Embed( -                    title=random.choice(ERROR_REPLIES), -                    color=Colours.soft_red, -                    description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})" -                ) -                await message.channel.send(embed=embed, delete_after=5) -                return - -            for repo_issue in issues: -                result = await self.fetch_issues( -                    int(repo_issue.number), -                    repo_issue.repository, -                    repo_issue.organisation or "python-discord" -                ) -                if isinstance(result, IssueState): -                    links.append(result) - -        if not links: -            return - -        resp = self.format_embed(links, "python-discord") -        await message.channel.send(embed=resp) - - -def setup(bot: Bot) -> None: -    """Load the Issues cog.""" -    bot.add_cog(Issues(bot)) diff --git a/bot/exts/utilities/latex.py b/bot/exts/utilities/latex.py deleted file mode 100644 index 36c7e0ab..00000000 --- a/bot/exts/utilities/latex.py +++ /dev/null @@ -1,101 +0,0 @@ -import asyncio -import hashlib -import pathlib -import re -from concurrent.futures import ThreadPoolExecutor -from io import BytesIO - -import discord -import matplotlib.pyplot as plt -from discord.ext import commands - -from bot.bot import Bot - -# configure fonts and colors for matplotlib -plt.rcParams.update( -    { -        "font.size": 16, -        "mathtext.fontset": "cm",  # Computer Modern font set -        "mathtext.rm": "serif", -        "figure.facecolor": "36393F",  # matches Discord's dark mode background color -        "text.color": "white", -    } -) - -FORMATTED_CODE_REGEX = re.compile( -    r"(?P<delim>(?P<block>```)|``?)"        # code delimiter: 1-3 backticks; (?P=block) only matches if it's a block -    r"(?(block)(?:(?P<lang>[a-z]+)\n)?)"    # if we're in a block, match optional language (only letters plus newline) -    r"(?:[ \t]*\n)*"                        # any blank (empty or tabs/spaces only) lines before the code -    r"(?P<code>.*?)"                        # extract all code inside the markup -    r"\s*"                                  # any more whitespace before the end of the code markup -    r"(?P=delim)",                          # match the exact same delimiter from the start again -    re.DOTALL | re.IGNORECASE,              # "." also matches newlines, case insensitive -) - -CACHE_DIRECTORY = pathlib.Path("_latex_cache") -CACHE_DIRECTORY.mkdir(exist_ok=True) - - -class Latex(commands.Cog): -    """Renders latex.""" - -    @staticmethod -    def _render(text: str, filepath: pathlib.Path) -> BytesIO: -        """ -        Return the rendered image if latex compiles without errors, otherwise raise a BadArgument Exception. - -        Saves rendered image to cache. -        """ -        fig = plt.figure() -        rendered_image = BytesIO() -        fig.text(0, 1, text, horizontalalignment="left", verticalalignment="top") - -        try: -            plt.savefig(rendered_image, bbox_inches="tight", dpi=600) -        except ValueError as e: -            raise commands.BadArgument(str(e)) - -        rendered_image.seek(0) - -        with open(filepath, "wb") as f: -            f.write(rendered_image.getbuffer()) - -        return rendered_image - -    @staticmethod -    def _prepare_input(text: str) -> str: -        text = text.replace(r"\\", "$\n$")  # matplotlib uses \n for newlines, not \\ - -        if match := FORMATTED_CODE_REGEX.match(text): -            return match.group("code") -        else: -            return text - -    @commands.command() -    @commands.max_concurrency(1, commands.BucketType.guild, wait=True) -    async def latex(self, ctx: commands.Context, *, text: str) -> None: -        """Renders the text in latex and sends the image.""" -        text = self._prepare_input(text) -        query_hash = hashlib.md5(text.encode()).hexdigest() -        image_path = CACHE_DIRECTORY.joinpath(f"{query_hash}.png") -        async with ctx.typing(): -            if image_path.exists(): -                await ctx.send(file=discord.File(image_path)) -                return - -            with ThreadPoolExecutor() as pool: -                image = await asyncio.get_running_loop().run_in_executor( -                    pool, self._render, text, image_path -                ) - -            await ctx.send(file=discord.File(image, "latex.png")) - - -def setup(bot: Bot) -> None: -    """Load the Latex Cog.""" -    # As we have resource issues on this cog, -    # we have it currently disabled while we fix it. -    import logging -    logging.info("Latex cog is currently disabled. It won't be loaded.") -    return -    bot.add_cog(Latex()) diff --git a/bot/exts/utilities/realpython.py b/bot/exts/utilities/realpython.py index ef8b2638..bf8f1341 100644 --- a/bot/exts/utilities/realpython.py +++ b/bot/exts/utilities/realpython.py @@ -1,5 +1,6 @@  import logging  from html import unescape +from typing import Optional  from urllib.parse import quote_plus  from discord import Embed @@ -31,9 +32,18 @@ class RealPython(commands.Cog):      @commands.command(aliases=["rp"])      @commands.cooldown(1, 10, commands.cooldowns.BucketType.user) -    async def realpython(self, ctx: commands.Context, *, user_search: str) -> None: -        """Send 5 articles that match the user's search terms.""" -        params = {"q": user_search, "limit": 5, "kind": "article"} +    async def realpython(self, ctx: commands.Context, amount: Optional[int] = 5, *, user_search: str) -> None: +        """ +        Send some articles from RealPython that match the search terms. + +        By default the top 5 matches are sent, this can be overwritten to +        a number between 1 and 5 by specifying an amount before the search query. +        """ +        if not 1 <= amount <= 5: +            await ctx.send("`amount` must be between 1 and 5 (inclusive).") +            return + +        params = {"q": user_search, "limit": amount, "kind": "article"}          async with self.bot.http_session.get(url=API_ROOT, params=params) as response:              if response.status != 200:                  logger.error( diff --git a/bot/exts/utilities/reddit.py b/bot/exts/utilities/reddit.py index e6cb5337..782583d2 100644 --- a/bot/exts/utilities/reddit.py +++ b/bot/exts/utilities/reddit.py @@ -244,7 +244,7 @@ class Reddit(Cog):          # Use only starting summary page for #reddit channel posts.          embed.description = self.build_pagination_pages(posts, paginate=False) -        embed.colour = Colour.blurple() +        embed.colour = Colour.og_blurple()          return embed      @loop() @@ -312,7 +312,7 @@ class Reddit(Cog):          await ctx.send(f"Here are the top {subreddit} posts of all time!")          embed = Embed( -            color=Colour.blurple() +            color=Colour.og_blurple()          )          await ImagePaginator.paginate(pages, ctx, embed) @@ -325,7 +325,7 @@ class Reddit(Cog):          await ctx.send(f"Here are today's top {subreddit} posts!")          embed = Embed( -            color=Colour.blurple() +            color=Colour.og_blurple()          )          await ImagePaginator.paginate(pages, ctx, embed) @@ -338,7 +338,7 @@ class Reddit(Cog):          await ctx.send(f"Here are this week's top {subreddit} posts!")          embed = Embed( -            color=Colour.blurple() +            color=Colour.og_blurple()          )          await ImagePaginator.paginate(pages, ctx, embed) @@ -349,7 +349,7 @@ class Reddit(Cog):          """Send a paginated embed of all the subreddits we're relaying."""          embed = Embed()          embed.title = "Relayed subreddits." -        embed.colour = Colour.blurple() +        embed.colour = Colour.og_blurple()          await LinePaginator.paginate(              RedditConfig.subreddits, diff --git a/bot/exts/utilities/twemoji.py b/bot/exts/utilities/twemoji.py new file mode 100644 index 00000000..c915f05b --- /dev/null +++ b/bot/exts/utilities/twemoji.py @@ -0,0 +1,150 @@ +import logging +import re +from typing import Literal, Optional + +import discord +from discord.ext import commands +from emoji import UNICODE_EMOJI_ENGLISH, is_emoji + +from bot.bot import Bot +from bot.constants import Colours, Roles +from bot.utils.decorators import whitelist_override +from bot.utils.extensions import invoke_help_command + +log = logging.getLogger(__name__) +BASE_URLS = { +    "png": "https://raw.githubusercontent.com/twitter/twemoji/master/assets/72x72/", +    "svg": "https://raw.githubusercontent.com/twitter/twemoji/master/assets/svg/", +} +CODEPOINT_REGEX = re.compile(r"[a-f1-9][a-f0-9]{3,5}$") + + +class Twemoji(commands.Cog): +    """Utilities for working with Twemojis.""" + +    def __init__(self, bot: Bot): +        self.bot = bot + +    @staticmethod +    def get_url(codepoint: str, format: Literal["png", "svg"]) -> str: +        """Returns a source file URL for the specified Twemoji, in the corresponding format.""" +        return f"{BASE_URLS[format]}{codepoint}.{format}" + +    @staticmethod +    def alias_to_name(alias: str) -> str: +        """ +        Transform a unicode alias to an emoji name. + +        Example usages: +        >>> alias_to_name(":falling_leaf:") +        "Falling leaf" +        >>> alias_to_name(":family_man_girl_boy:") +        "Family man girl boy" +        """ +        name = alias.strip(":").replace("_", " ") +        return name.capitalize() + +    @staticmethod +    def build_embed(codepoint: str) -> discord.Embed: +        """Returns the main embed for the `twemoji` commmand.""" +        emoji = "".join(Twemoji.emoji(e) or "" for e in codepoint.split("-")) + +        embed = discord.Embed( +            title=Twemoji.alias_to_name(UNICODE_EMOJI_ENGLISH[emoji]), +            description=f"{codepoint.replace('-', ' ')}\n[Download svg]({Twemoji.get_url(codepoint, 'svg')})", +            colour=Colours.twitter_blue, +        ) +        embed.set_thumbnail(url=Twemoji.get_url(codepoint, "png")) +        return embed + +    @staticmethod +    def emoji(codepoint: Optional[str]) -> Optional[str]: +        """ +        Returns the emoji corresponding to a given `codepoint`, or `None` if no emoji was found. + +        The return value is an emoji character, such as "π". The `codepoint` +        argument can be of any format, since it will be trimmed automatically. +        """ +        if code := Twemoji.trim_code(codepoint): +            return chr(int(code, 16)) + +    @staticmethod +    def codepoint(emoji: Optional[str]) -> Optional[str]: +        """ +        Returns the codepoint, in a trimmed format, of a single emoji. + +        `emoji` should be an emoji character, such as "π" and "π₯°", and +        not a codepoint like "1f1f8". When working with combined emojis, +        such as "πΈπͺ" and "π¨βπ©βπ¦", send the component emojis through the method +        one at a time. +        """ +        if emoji is None: +            return None +        return hex(ord(emoji)).removeprefix("0x") + +    @staticmethod +    def trim_code(codepoint: Optional[str]) -> Optional[str]: +        """ +        Returns the meaningful information from the given `codepoint`. + +        If no codepoint is found, `None` is returned. + +        Example usages: +        >>> trim_code("U+1f1f8") +        "1f1f8" +        >>> trim_code("\u0001f1f8") +        "1f1f8" +        >>> trim_code("1f466") +        "1f466" +        """ +        if code := CODEPOINT_REGEX.search(codepoint or ""): +            return code.group() + +    @staticmethod +    def codepoint_from_input(raw_emoji: tuple[str, ...]) -> str: +        """ +        Returns the codepoint corresponding to the passed tuple, separated by "-". + +        The return format matches the format used in URLs for Twemoji source files. + +        Example usages: +        >>> codepoint_from_input(("π",)) +        "1f40d" +        >>> codepoint_from_input(("1f1f8", "1f1ea")) +        "1f1f8-1f1ea" +        >>> codepoint_from_input(("π¨βπ§βπ¦",)) +        "1f468-200d-1f467-200d-1f466" +        """ +        raw_emoji = [emoji.lower() for emoji in raw_emoji] +        if is_emoji(raw_emoji[0]): +            emojis = (Twemoji.codepoint(emoji) or "" for emoji in raw_emoji[0]) +            return "-".join(emojis) + +        emoji = "".join( +            Twemoji.emoji(Twemoji.trim_code(code)) or "" for code in raw_emoji +        ) +        if is_emoji(emoji): +            return "-".join(Twemoji.codepoint(e) or "" for e in emoji) + +        raise ValueError("No codepoint could be obtained from the given input") + +    @commands.command(aliases=("tw",)) +    @whitelist_override(roles=(Roles.everyone,)) +    async def twemoji(self, ctx: commands.Context, *raw_emoji: str) -> None: +        """Sends a preview of a given Twemoji, specified by codepoint or emoji.""" +        if len(raw_emoji) == 0: +            await invoke_help_command(ctx) +            return +        try: +            codepoint = self.codepoint_from_input(raw_emoji) +        except ValueError: +            raise commands.BadArgument( +                "please include a valid emoji or emoji codepoint." +            ) + +        await ctx.send(embed=self.build_embed(codepoint)) + + +def setup(bot: Bot) -> None: +    """Load the Twemoji cog.""" +    bot.add_cog(Twemoji(bot)) diff --git a/bot/exts/utilities/wikipedia.py b/bot/exts/utilities/wikipedia.py index eccc1f8c..e5e8e289 100644 --- a/bot/exts/utilities/wikipedia.py +++ b/bot/exts/utilities/wikipedia.py @@ -82,13 +82,11 @@ class WikipediaSearch(commands.Cog):          if contents:              embed = Embed(                  title="Wikipedia Search Results", -                colour=Color.blurple() +                colour=Color.og_blurple()              )              embed.set_thumbnail(url=WIKI_THUMBNAIL)              embed.timestamp = datetime.utcnow() -            await LinePaginator.paginate( -                contents, ctx, embed -            ) +            await LinePaginator.paginate(contents, ctx, embed, restrict_to_user=ctx.author)          else:              await ctx.send(                  "Sorry, we could not find a wikipedia article using that search term." diff --git a/bot/exts/utilities/wtf_python.py b/bot/exts/utilities/wtf_python.py new file mode 100644 index 00000000..980b3dba --- /dev/null +++ b/bot/exts/utilities/wtf_python.py @@ -0,0 +1,138 @@ +import logging +import random +import re +from typing import Optional + +import rapidfuzz +from discord import Embed, File +from discord.ext import commands, tasks + +from bot import constants +from bot.bot import Bot + +log = logging.getLogger(__name__) + +WTF_PYTHON_RAW_URL = "http://raw.githubusercontent.com/satwikkansal/wtfpython/master/" +BASE_URL = "https://github.com/satwikkansal/wtfpython" +LOGO_PATH = "./bot/resources/utilities/wtf_python_logo.jpg" + +ERROR_MESSAGE = f""" +Unknown WTF Python Query. Please try to reformulate your query. + +**Examples**: +```md +{constants.Client.prefix}wtf wild imports +{constants.Client.prefix}wtf subclass +{constants.Client.prefix}wtf del +``` +If the problem persists send a message in <#{constants.Channels.dev_contrib}> +""" + +MINIMUM_CERTAINTY = 55 + + +class WTFPython(commands.Cog): +    """Cog that allows getting WTF Python entries from the WTF Python repository.""" + +    def __init__(self, bot: Bot): +        self.bot = bot +        self.headers: dict[str, str] = {} +        self.fetch_readme.start() + +    @tasks.loop(minutes=60) +    async def fetch_readme(self) -> None: +        """Gets the content of README.md from the WTF Python Repository.""" +        async with self.bot.http_session.get(f"{WTF_PYTHON_RAW_URL}README.md") as resp: +            log.trace("Fetching the latest WTF Python README.md") +            if resp.status == 200: +                raw = await resp.text() +                self.parse_readme(raw) + +    def parse_readme(self, data: str) -> None: +        """ +        Parses the README.md into a dict. + +        It parses the readme into the `self.headers` dict, +        where the key is the heading and the value is the +        link to the heading. +        """ +        # Match the start of examples, until the end of the table of contents (toc) +        table_of_contents = re.search( +            r"\[π Examples\]\(#-examples\)\n([\w\W]*)<!-- tocstop -->", data +        )[0].split("\n") + +        for header in list(map(str.strip, table_of_contents)): +            match = re.search(r"\[βΆ (.*)\]\((.*)\)", header) +            if match: +                hyper_link = match[0].split("(")[1].replace(")", "") +                self.headers[match[0]] = f"{BASE_URL}/{hyper_link}" + +    def fuzzy_match_header(self, query: str) -> Optional[str]: +        """ +        Returns the fuzzy match of a query if its ratio is above "MINIMUM_CERTAINTY" else returns None. + +        "MINIMUM_CERTAINTY" is the lowest score at which the fuzzy match will return a result. +        The certainty returned by rapidfuzz.process.extractOne is a score between 0 and 100, +        with 100 being a perfect match. +        """ +        match, certainty, _ = rapidfuzz.process.extractOne(query, self.headers.keys()) +        return match if certainty > MINIMUM_CERTAINTY else None + +    @commands.command(aliases=("wtf", "WTF")) +    async def wtf_python(self, ctx: commands.Context, *, query: Optional[str] = None) -> None: +        """ +        Search WTF Python repository. + +        Gets the link of the fuzzy matched query from https://github.com/satwikkansal/wtfpython. +        Usage: +            --> .wtf wild imports +        """ +        if query is None: +            no_query_embed = Embed( +                title="WTF Python?!", +                colour=constants.Colours.dark_green, +                description="A repository filled with suprising snippets that can make you say WTF?!\n\n" +                f"[Go to the Repository]({BASE_URL})" +            ) +            logo = File(LOGO_PATH, filename="wtf_logo.jpg") +            no_query_embed.set_thumbnail(url="attachment://wtf_logo.jpg") +            await ctx.send(embed=no_query_embed, file=logo) +            return + +        if len(query) > 50: +            embed = Embed( +                title=random.choice(constants.ERROR_REPLIES), +                description=ERROR_MESSAGE, +                colour=constants.Colours.soft_red, +            ) +            match = None +        else: +            match = self.fuzzy_match_header(query) + +        if not match: +            embed = Embed( +                title=random.choice(constants.ERROR_REPLIES), +                description=ERROR_MESSAGE, +                colour=constants.Colours.soft_red, +            ) +            await ctx.send(embed=embed) +            return + +        embed = Embed( +            title="WTF Python?!", +            colour=constants.Colours.dark_green, +            description=f"""Search result for '{query}': {match.split("]")[0].replace("[", "")} +            [Go to Repository Section]({self.headers[match]})""", +        ) +        logo = File(LOGO_PATH, filename="wtf_logo.jpg") +        embed.set_thumbnail(url="attachment://wtf_logo.jpg") +        await ctx.send(embed=embed, file=logo) + +    def cog_unload(self) -> None: +        """Unload the cog and cancel the task.""" +        self.fetch_readme.cancel() + + +def setup(bot: Bot) -> None: +    """Load the WTFPython Cog.""" +    bot.add_cog(WTFPython(bot))  |