diff options
Diffstat (limited to 'bot/exts/evergreen')
| -rw-r--r-- | bot/exts/evergreen/error_handler.py | 182 | ||||
| -rw-r--r-- | bot/exts/evergreen/help.py | 562 | ||||
| -rw-r--r-- | bot/exts/evergreen/ping.py | 45 | ||||
| -rw-r--r-- | bot/exts/evergreen/source.py | 85 |
4 files changed, 0 insertions, 874 deletions
diff --git a/bot/exts/evergreen/error_handler.py b/bot/exts/evergreen/error_handler.py deleted file mode 100644 index fd2123e7..00000000 --- a/bot/exts/evergreen/error_handler.py +++ /dev/null @@ -1,182 +0,0 @@ -import difflib -import logging -import math -import random -from collections.abc import Iterable -from typing import Union - -from discord import Embed, Message -from discord.ext import commands -from sentry_sdk import push_scope - -from bot.bot import Bot -from bot.constants import Channels, Colours, ERROR_REPLIES, NEGATIVE_REPLIES, RedirectOutput -from bot.utils.decorators import InChannelCheckFailure, InMonthCheckFailure -from bot.utils.exceptions import APIError, UserNotPlayingError - -log = logging.getLogger(__name__) - - -QUESTION_MARK_ICON = "https://cdn.discordapp.com/emojis/512367613339369475.png" - - -class CommandErrorHandler(commands.Cog): - """A error handler for the PythonDiscord server.""" - - def __init__(self, bot: Bot): - self.bot = bot - - @staticmethod - def revert_cooldown_counter(command: commands.Command, message: Message) -> None: - """Undoes the last cooldown counter for user-error cases.""" - if command._buckets.valid: - bucket = command._buckets.get_bucket(message) - bucket._tokens = min(bucket.rate, bucket._tokens + 1) - logging.debug("Cooldown counter reverted as the command was not used correctly.") - - @staticmethod - def error_embed(message: str, title: Union[Iterable, str] = ERROR_REPLIES) -> Embed: - """Build a basic embed with red colour and either a random error title or a title provided.""" - embed = Embed(colour=Colours.soft_red) - if isinstance(title, str): - embed.title = title - else: - embed.title = random.choice(title) - embed.description = message - return embed - - @commands.Cog.listener() - async def on_command_error(self, ctx: commands.Context, error: commands.CommandError) -> None: - """Activates when a command raises an error.""" - if getattr(error, "handled", False): - logging.debug(f"Command {ctx.command} had its error already handled locally; ignoring.") - return - - parent_command = "" - if subctx := getattr(ctx, "subcontext", None): - parent_command = f"{ctx.command} " - ctx = subctx - - error = getattr(error, "original", error) - logging.debug( - f"Error Encountered: {type(error).__name__} - {str(error)}, " - f"Command: {ctx.command}, " - f"Author: {ctx.author}, " - f"Channel: {ctx.channel}" - ) - - if isinstance(error, commands.CommandNotFound): - await self.send_command_suggestion(ctx, ctx.invoked_with) - return - - if isinstance(error, (InChannelCheckFailure, InMonthCheckFailure)): - await ctx.send(embed=self.error_embed(str(error), NEGATIVE_REPLIES), delete_after=7.5) - return - - if isinstance(error, commands.UserInputError): - self.revert_cooldown_counter(ctx.command, ctx.message) - usage = f"```\n{ctx.prefix}{parent_command}{ctx.command} {ctx.command.signature}\n```" - embed = self.error_embed( - f"Your input was invalid: {error}\n\nUsage:{usage}" - ) - await ctx.send(embed=embed) - return - - if isinstance(error, commands.CommandOnCooldown): - mins, secs = divmod(math.ceil(error.retry_after), 60) - embed = self.error_embed( - f"This command is on cooldown:\nPlease retry in {mins} minutes {secs} seconds.", - NEGATIVE_REPLIES - ) - await ctx.send(embed=embed, delete_after=7.5) - return - - if isinstance(error, commands.DisabledCommand): - await ctx.send(embed=self.error_embed("This command has been disabled.", NEGATIVE_REPLIES)) - return - - if isinstance(error, commands.NoPrivateMessage): - await ctx.send( - embed=self.error_embed( - f"This command can only be used in the server. Go to <#{Channels.community_bot_commands}> instead!", - NEGATIVE_REPLIES - ) - ) - return - - if isinstance(error, commands.BadArgument): - self.revert_cooldown_counter(ctx.command, ctx.message) - embed = self.error_embed( - "The argument you provided was invalid: " - f"{error}\n\nUsage:\n```\n{ctx.prefix}{parent_command}{ctx.command} {ctx.command.signature}\n```" - ) - await ctx.send(embed=embed) - return - - if isinstance(error, commands.CheckFailure): - await ctx.send(embed=self.error_embed("You are not authorized to use this command.", NEGATIVE_REPLIES)) - return - - if isinstance(error, UserNotPlayingError): - await ctx.send("Game not found.") - return - - if isinstance(error, APIError): - await ctx.send( - embed=self.error_embed( - f"There was an error when communicating with the {error.api}", - NEGATIVE_REPLIES - ) - ) - return - - with push_scope() as scope: - scope.user = { - "id": ctx.author.id, - "username": str(ctx.author) - } - - scope.set_tag("command", ctx.command.qualified_name) - scope.set_tag("message_id", ctx.message.id) - scope.set_tag("channel_id", ctx.channel.id) - - scope.set_extra("full_message", ctx.message.content) - - if ctx.guild is not None: - scope.set_extra("jump_to", ctx.message.jump_url) - - log.exception(f"Unhandled command error: {str(error)}", exc_info=error) - - async def send_command_suggestion(self, ctx: commands.Context, command_name: str) -> None: - """Sends user similar commands if any can be found.""" - raw_commands = [] - for cmd in self.bot.walk_commands(): - if not cmd.hidden: - raw_commands += (cmd.name, *cmd.aliases) - if similar_command_data := difflib.get_close_matches(command_name, raw_commands, 1): - similar_command_name = similar_command_data[0] - similar_command = self.bot.get_command(similar_command_name) - - if not similar_command: - return - - log_msg = "Cancelling attempt to suggest a command due to failed checks." - try: - if not await similar_command.can_run(ctx): - log.debug(log_msg) - return - except commands.errors.CommandError as cmd_error: - log.debug(log_msg) - await self.on_command_error(ctx, cmd_error) - return - - misspelled_content = ctx.message.content - e = Embed() - e.set_author(name="Did you mean:", icon_url=QUESTION_MARK_ICON) - e.description = misspelled_content.replace(command_name, similar_command_name, 1) - await ctx.send(embed=e, delete_after=RedirectOutput.delete_delay) - - -def setup(bot: Bot) -> None: - """Load the ErrorHandler cog.""" - bot.add_cog(CommandErrorHandler(bot)) diff --git a/bot/exts/evergreen/help.py b/bot/exts/evergreen/help.py deleted file mode 100644 index 4b766b50..00000000 --- a/bot/exts/evergreen/help.py +++ /dev/null @@ -1,562 +0,0 @@ -# Help command from Python bot. All commands that will be added to there in futures should be added to here too. -import asyncio -import itertools -import logging -from contextlib import suppress -from typing import NamedTuple, Union - -from discord import Colour, Embed, HTTPException, Message, Reaction, User -from discord.ext import commands -from discord.ext.commands import CheckFailure, Cog as DiscordCog, Command, Context -from rapidfuzz import process - -from bot import constants -from bot.bot import Bot -from bot.constants import Emojis -from bot.utils.pagination import ( - FIRST_EMOJI, LAST_EMOJI, - LEFT_EMOJI, LinePaginator, RIGHT_EMOJI, -) - -DELETE_EMOJI = Emojis.trashcan - -REACTIONS = { - FIRST_EMOJI: "first", - LEFT_EMOJI: "back", - RIGHT_EMOJI: "next", - LAST_EMOJI: "end", - DELETE_EMOJI: "stop", -} - - -class Cog(NamedTuple): - """Show information about a Cog's name, description and commands.""" - - name: str - description: str - commands: list[Command] - - -log = logging.getLogger(__name__) - - -class HelpQueryNotFound(ValueError): - """ - Raised when a HelpSession Query doesn't match a command or cog. - - Contains the custom attribute of ``possible_matches``. - Instances of this object contain a dictionary of any command(s) that were close to matching the - query, where keys are the possible matched command names and values are the likeness match scores. - """ - - def __init__(self, arg: str, possible_matches: dict = None): - super().__init__(arg) - self.possible_matches = possible_matches - - -class HelpSession: - """ - An interactive session for bot and command help output. - - Expected attributes include: - * title: str - The title of the help message. - * query: Union[discord.ext.commands.Bot, discord.ext.commands.Command] - * description: str - The description of the query. - * pages: list[str] - A list of the help content split into manageable pages. - * message: `discord.Message` - The message object that's showing the help contents. - * destination: `discord.abc.Messageable` - Where the help message is to be sent to. - Cogs can be grouped into custom categories. All cogs with the same category will be displayed - under a single category name in the help output. Custom categories are defined inside the cogs - as a class attribute named `category`. A description can also be specified with the attribute - `category_description`. If a description is not found in at least one cog, the default will be - the regular description (class docstring) of the first cog found in the category. - """ - - def __init__( - self, - ctx: Context, - *command, - cleanup: bool = False, - only_can_run: bool = True, - show_hidden: bool = False, - max_lines: int = 15 - ): - """Creates an instance of the HelpSession class.""" - self._ctx = ctx - self._bot = ctx.bot - self.title = "Command Help" - - # set the query details for the session - if command: - query_str = " ".join(command) - self.query = self._get_query(query_str) - self.description = self.query.description or self.query.help - else: - self.query = ctx.bot - self.description = self.query.description - self.author = ctx.author - self.destination = ctx.channel - - # set the config for the session - self._cleanup = cleanup - self._only_can_run = only_can_run - self._show_hidden = show_hidden - self._max_lines = max_lines - - # init session states - self._pages = None - self._current_page = 0 - self.message = None - self._timeout_task = None - self.reset_timeout() - - def _get_query(self, query: str) -> Union[Command, Cog]: - """Attempts to match the provided query with a valid command or cog.""" - command = self._bot.get_command(query) - if command: - return command - - # Find all cog categories that match. - cog_matches = [] - description = None - for cog in self._bot.cogs.values(): - if hasattr(cog, "category") and cog.category == query: - cog_matches.append(cog) - if hasattr(cog, "category_description"): - description = cog.category_description - - # Try to search by cog name if no categories match. - if not cog_matches: - cog = self._bot.cogs.get(query) - - # Don't consider it a match if the cog has a category. - if cog and not hasattr(cog, "category"): - cog_matches = [cog] - - if cog_matches: - cog = cog_matches[0] - cmds = (cog.get_commands() for cog in cog_matches) # Commands of all cogs - - return Cog( - name=cog.category if hasattr(cog, "category") else cog.qualified_name, - description=description or cog.description, - commands=tuple(itertools.chain.from_iterable(cmds)) # Flatten the list - ) - - self._handle_not_found(query) - - def _handle_not_found(self, query: str) -> None: - """ - Handles when a query does not match a valid command or cog. - - Will pass on possible close matches along with the `HelpQueryNotFound` exception. - """ - # Combine command and cog names - choices = list(self._bot.all_commands) + list(self._bot.cogs) - - result = process.extract(query, choices, score_cutoff=90) - - raise HelpQueryNotFound(f'Query "{query}" not found.', dict(result)) - - async def timeout(self, seconds: int = 30) -> None: - """Waits for a set number of seconds, then stops the help session.""" - await asyncio.sleep(seconds) - await self.stop() - - def reset_timeout(self) -> None: - """Cancels the original timeout task and sets it again from the start.""" - # cancel original if it exists - if self._timeout_task: - if not self._timeout_task.cancelled(): - self._timeout_task.cancel() - - # recreate the timeout task - self._timeout_task = self._bot.loop.create_task(self.timeout()) - - async def on_reaction_add(self, reaction: Reaction, user: User) -> None: - """Event handler for when reactions are added on the help message.""" - # ensure it was the relevant session message - if reaction.message.id != self.message.id: - return - - # ensure it was the session author who reacted - if user.id != self.author.id: - return - - emoji = str(reaction.emoji) - - # check if valid action - if emoji not in REACTIONS: - return - - self.reset_timeout() - - # Run relevant action method - action = getattr(self, f"do_{REACTIONS[emoji]}", None) - if action: - await action() - - # remove the added reaction to prep for re-use - with suppress(HTTPException): - await self.message.remove_reaction(reaction, user) - - async def on_message_delete(self, message: Message) -> None: - """Closes the help session when the help message is deleted.""" - if message.id == self.message.id: - await self.stop() - - async def prepare(self) -> None: - """Sets up the help session pages, events, message and reactions.""" - await self.build_pages() - - self._bot.add_listener(self.on_reaction_add) - self._bot.add_listener(self.on_message_delete) - - await self.update_page() - self.add_reactions() - - def add_reactions(self) -> None: - """Adds the relevant reactions to the help message based on if pagination is required.""" - # if paginating - if len(self._pages) > 1: - for reaction in REACTIONS: - self._bot.loop.create_task(self.message.add_reaction(reaction)) - - # if single-page - else: - self._bot.loop.create_task(self.message.add_reaction(DELETE_EMOJI)) - - def _category_key(self, cmd: Command) -> str: - """ - Returns a cog name of a given command for use as a key for `sorted` and `groupby`. - - A zero width space is used as a prefix for results with no cogs to force them last in ordering. - """ - if cmd.cog: - try: - if cmd.cog.category: - return f"**{cmd.cog.category}**" - except AttributeError: - pass - - return f"**{cmd.cog_name}**" - else: - return "**\u200bNo Category:**" - - def _get_command_params(self, cmd: Command) -> str: - """ - Returns the command usage signature. - - This is a custom implementation of `command.signature` in order to format the command - signature without aliases. - """ - results = [] - for name, param in cmd.clean_params.items(): - - # if argument has a default value - if param.default is not param.empty: - - if isinstance(param.default, str): - show_default = param.default - else: - show_default = param.default is not None - - # if default is not an empty string or None - if show_default: - results.append(f"[{name}={param.default}]") - else: - results.append(f"[{name}]") - - # if variable length argument - elif param.kind == param.VAR_POSITIONAL: - results.append(f"[{name}...]") - - # if required - else: - results.append(f"<{name}>") - - return f"{cmd.name} {' '.join(results)}" - - async def build_pages(self) -> None: - """Builds the list of content pages to be paginated through in the help message, as a list of str.""" - # Use LinePaginator to restrict embed line height - paginator = LinePaginator(prefix="", suffix="", max_lines=self._max_lines) - - # show signature if query is a command - if isinstance(self.query, commands.Command): - await self._add_command_signature(paginator) - - if isinstance(self.query, Cog): - paginator.add_line(f"**{self.query.name}**") - - if self.description: - paginator.add_line(f"*{self.description}*") - - # list all children commands of the queried object - if isinstance(self.query, (commands.GroupMixin, Cog)): - await self._list_child_commands(paginator) - - self._pages = paginator.pages - - async def _add_command_signature(self, paginator: LinePaginator) -> None: - prefix = constants.Client.prefix - - signature = self._get_command_params(self.query) - parent = self.query.full_parent_name + " " if self.query.parent else "" - paginator.add_line(f"**```\n{prefix}{parent}{signature}\n```**") - aliases = [f"`{alias}`" if not parent else f"`{parent} {alias}`" for alias in self.query.aliases] - aliases += [f"`{alias}`" for alias in getattr(self.query, "root_aliases", ())] - aliases = ", ".join(sorted(aliases)) - if aliases: - paginator.add_line(f"**Can also use:** {aliases}\n") - if not await self.query.can_run(self._ctx): - paginator.add_line("***You cannot run this command.***\n") - - async def _list_child_commands(self, paginator: LinePaginator) -> None: - # remove hidden commands if session is not wanting hiddens - if not self._show_hidden: - filtered = [c for c in self.query.commands if not c.hidden] - else: - filtered = self.query.commands - - # if after filter there are no commands, finish up - if not filtered: - self._pages = paginator.pages - return - - if isinstance(self.query, Cog): - grouped = (("**Commands:**", self.query.commands),) - - elif isinstance(self.query, commands.Command): - grouped = (("**Subcommands:**", self.query.commands),) - - # otherwise sort and organise all commands into categories - else: - cat_sort = sorted(filtered, key=self._category_key) - grouped = itertools.groupby(cat_sort, key=self._category_key) - - for category, cmds in grouped: - await self._format_command_category(paginator, category, list(cmds)) - - async def _format_command_category(self, paginator: LinePaginator, category: str, cmds: list[Command]) -> None: - cmds = sorted(cmds, key=lambda c: c.name) - cat_cmds = [] - for command in cmds: - cat_cmds += await self._format_command(command) - - # state var for if the category should be added next - print_cat = 1 - new_page = True - - for details in cat_cmds: - - # keep details together, paginating early if it won"t fit - lines_adding = len(details.split("\n")) + print_cat - if paginator._linecount + lines_adding > self._max_lines: - paginator._linecount = 0 - new_page = True - paginator.close_page() - - # new page so print category title again - print_cat = 1 - - if print_cat: - if new_page: - paginator.add_line("") - paginator.add_line(category) - print_cat = 0 - - paginator.add_line(details) - - async def _format_command(self, command: Command) -> list[str]: - # skip if hidden and hide if session is set to - if command.hidden and not self._show_hidden: - return [] - - # Patch to make the !help command work outside of #bot-commands again - # This probably needs a proper rewrite, but this will make it work in - # the mean time. - try: - can_run = await command.can_run(self._ctx) - except CheckFailure: - can_run = False - - # see if the user can run the command - strikeout = "" - if not can_run: - # skip if we don't show commands they can't run - if self._only_can_run: - return [] - strikeout = "~~" - - if isinstance(self.query, commands.Command): - prefix = "" - else: - prefix = constants.Client.prefix - - signature = self._get_command_params(command) - info = f"{strikeout}**`{prefix}{signature}`**{strikeout}" - - # handle if the command has no docstring - short_doc = command.short_doc or "No details provided" - return [f"{info}\n*{short_doc}*"] - - def embed_page(self, page_number: int = 0) -> Embed: - """Returns an Embed with the requested page formatted within.""" - embed = Embed() - - if isinstance(self.query, (commands.Command, Cog)) and page_number > 0: - title = f'Command Help | "{self.query.name}"' - else: - title = self.title - - embed.set_author(name=title, icon_url=constants.Icons.questionmark) - embed.description = self._pages[page_number] - - page_count = len(self._pages) - if page_count > 1: - embed.set_footer(text=f"Page {self._current_page+1} / {page_count}") - - return embed - - async def update_page(self, page_number: int = 0) -> None: - """Sends the intial message, or changes the existing one to the given page number.""" - self._current_page = page_number - embed_page = self.embed_page(page_number) - - if not self.message: - self.message = await self.destination.send(embed=embed_page) - else: - await self.message.edit(embed=embed_page) - - @classmethod - async def start(cls, ctx: Context, *command, **options) -> "HelpSession": - """ - Create and begin a help session based on the given command context. - - Available options kwargs: - * cleanup: Optional[bool] - Set to `True` to have the message deleted on session end. Defaults to `False`. - * only_can_run: Optional[bool] - Set to `True` to hide commands the user can't run. Defaults to `False`. - * show_hidden: Optional[bool] - Set to `True` to include hidden commands. Defaults to `False`. - * max_lines: Optional[int] - Sets the max number of lines the paginator will add to a single page. Defaults to 20. - """ - session = cls(ctx, *command, **options) - await session.prepare() - - return session - - async def stop(self) -> None: - """Stops the help session, removes event listeners and attempts to delete the help message.""" - self._bot.remove_listener(self.on_reaction_add) - self._bot.remove_listener(self.on_message_delete) - - # ignore if permission issue, or the message doesn't exist - with suppress(HTTPException, AttributeError): - if self._cleanup: - await self.message.delete() - else: - await self.message.clear_reactions() - - @property - def is_first_page(self) -> bool: - """Check if session is currently showing the first page.""" - return self._current_page == 0 - - @property - def is_last_page(self) -> bool: - """Check if the session is currently showing the last page.""" - return self._current_page == (len(self._pages)-1) - - async def do_first(self) -> None: - """Event that is called when the user requests the first page.""" - if not self.is_first_page: - await self.update_page(0) - - async def do_back(self) -> None: - """Event that is called when the user requests the previous page.""" - if not self.is_first_page: - await self.update_page(self._current_page-1) - - async def do_next(self) -> None: - """Event that is called when the user requests the next page.""" - if not self.is_last_page: - await self.update_page(self._current_page+1) - - async def do_end(self) -> None: - """Event that is called when the user requests the last page.""" - if not self.is_last_page: - await self.update_page(len(self._pages)-1) - - async def do_stop(self) -> None: - """Event that is called when the user requests to stop the help session.""" - await self.message.delete() - - -class Help(DiscordCog): - """Custom Embed Pagination Help feature.""" - - @commands.command("help") - async def new_help(self, ctx: Context, *commands) -> None: - """Shows Command Help.""" - try: - await HelpSession.start(ctx, *commands) - except HelpQueryNotFound as error: - embed = Embed() - embed.colour = Colour.red() - embed.title = str(error) - - if error.possible_matches: - matches = "\n".join(error.possible_matches.keys()) - embed.description = f"**Did you mean:**\n`{matches}`" - - await ctx.send(embed=embed) - - -def unload(bot: Bot) -> None: - """ - Reinstates the original help command. - - This is run if the cog raises an exception on load, or if the extension is unloaded. - """ - bot.remove_command("help") - bot.add_command(bot._old_help) - - -def setup(bot: Bot) -> None: - """ - The setup for the help extension. - - This is called automatically on `bot.load_extension` being run. - Stores the original help command instance on the `bot._old_help` attribute for later - reinstatement, before removing it from the command registry so the new help command can be - loaded successfully. - If an exception is raised during the loading of the cog, `unload` will be called in order to - reinstate the original help command. - """ - bot._old_help = bot.get_command("help") - bot.remove_command("help") - - try: - bot.add_cog(Help()) - except Exception: - unload(bot) - raise - - -def teardown(bot: Bot) -> None: - """ - The teardown for the help extension. - - This is called automatically on `bot.unload_extension` being run. - Calls `unload` in order to reinstate the original help command. - """ - unload(bot) diff --git a/bot/exts/evergreen/ping.py b/bot/exts/evergreen/ping.py deleted file mode 100644 index 6be78117..00000000 --- a/bot/exts/evergreen/ping.py +++ /dev/null @@ -1,45 +0,0 @@ -import arrow -from dateutil.relativedelta import relativedelta -from discord import Embed -from discord.ext import commands - -from bot import start_time -from bot.bot import Bot -from bot.constants import Colours - - -class Ping(commands.Cog): - """Get info about the bot's ping and uptime.""" - - def __init__(self, bot: Bot): - self.bot = bot - - @commands.command(name="ping") - async def ping(self, ctx: commands.Context) -> None: - """Ping the bot to see its latency and state.""" - embed = Embed( - title=":ping_pong: Pong!", - colour=Colours.bright_green, - description=f"Gateway Latency: {round(self.bot.latency * 1000)}ms", - ) - - await ctx.send(embed=embed) - - # Originally made in 70d2170a0a6594561d59c7d080c4280f1ebcd70b by lemon & gdude2002 - @commands.command(name="uptime") - async def uptime(self, ctx: commands.Context) -> None: - """Get the current uptime of the bot.""" - difference = relativedelta(start_time - arrow.utcnow()) - uptime_string = start_time.shift( - seconds=-difference.seconds, - minutes=-difference.minutes, - hours=-difference.hours, - days=-difference.days - ).humanize() - - await ctx.send(f"I started up {uptime_string}.") - - -def setup(bot: Bot) -> None: - """Load the Ping cog.""" - bot.add_cog(Ping(bot)) diff --git a/bot/exts/evergreen/source.py b/bot/exts/evergreen/source.py deleted file mode 100644 index 7572ce51..00000000 --- a/bot/exts/evergreen/source.py +++ /dev/null @@ -1,85 +0,0 @@ -import inspect -from pathlib import Path -from typing import Optional - -from discord import Embed -from discord.ext import commands - -from bot.bot import Bot -from bot.constants import Source -from bot.utils.converters import SourceConverter, SourceType - - -class BotSource(commands.Cog): - """Displays information about the bot's source code.""" - - @commands.command(name="source", aliases=("src",)) - async def source_command(self, ctx: commands.Context, *, source_item: SourceConverter = None) -> None: - """Display information and a GitHub link to the source code of a command, tag, or cog.""" - if not source_item: - embed = Embed(title="Sir Lancebot's GitHub Repository") - embed.add_field(name="Repository", value=f"[Go to GitHub]({Source.github})") - embed.set_thumbnail(url=Source.github_avatar_url) - await ctx.send(embed=embed) - return - - embed = await self.build_embed(source_item) - await ctx.send(embed=embed) - - def get_source_link(self, source_item: SourceType) -> tuple[str, str, Optional[int]]: - """ - Build GitHub link of source item, return this link, file location and first line number. - - Raise BadArgument if `source_item` is a dynamically-created object (e.g. via internal eval). - """ - if isinstance(source_item, commands.Command): - callback = inspect.unwrap(source_item.callback) - src = callback.__code__ - filename = src.co_filename - else: - src = type(source_item) - try: - filename = inspect.getsourcefile(src) - except TypeError: - raise commands.BadArgument("Cannot get source for a dynamically-created object.") - - if not isinstance(source_item, str): - try: - lines, first_line_no = inspect.getsourcelines(src) - except OSError: - raise commands.BadArgument("Cannot get source for a dynamically-created object.") - - lines_extension = f"#L{first_line_no}-L{first_line_no+len(lines)-1}" - else: - first_line_no = None - lines_extension = "" - - file_location = Path(filename).relative_to(Path.cwd()).as_posix() - - url = f"{Source.github}/blob/main/{file_location}{lines_extension}" - - return url, file_location, first_line_no or None - - async def build_embed(self, source_object: SourceType) -> Optional[Embed]: - """Build embed based on source object.""" - url, location, first_line = self.get_source_link(source_object) - - if isinstance(source_object, commands.Command): - description = source_object.short_doc - title = f"Command: {source_object.qualified_name}" - else: - title = f"Cog: {source_object.qualified_name}" - description = source_object.description.splitlines()[0] - - embed = Embed(title=title, description=description) - embed.set_thumbnail(url=Source.github_avatar_url) - embed.add_field(name="Source Code", value=f"[Go to GitHub]({url})") - line_text = f":{first_line}" if first_line else "" - embed.set_footer(text=f"{location}{line_text}") - - return embed - - -def setup(bot: Bot) -> None: - """Load the BotSource cog.""" - bot.add_cog(BotSource()) |