diff options
Diffstat (limited to 'bot/exts/utilities')
| -rw-r--r-- | bot/exts/utilities/__init__.py | 0 | ||||
| -rw-r--r-- | bot/exts/utilities/bookmark.py | 153 | ||||
| -rw-r--r-- | bot/exts/utilities/cheatsheet.py | 112 | ||||
| -rw-r--r-- | bot/exts/utilities/conversationstarters.py | 69 | ||||
| -rw-r--r-- | bot/exts/utilities/emoji.py | 123 | ||||
| -rw-r--r-- | bot/exts/utilities/githubinfo.py | 178 | ||||
| -rw-r--r-- | bot/exts/utilities/issues.py | 275 | ||||
| -rw-r--r-- | bot/exts/utilities/latex.py | 101 | ||||
| -rw-r--r-- | bot/exts/utilities/pythonfacts.py | 36 | ||||
| -rw-r--r-- | bot/exts/utilities/realpython.py | 81 | ||||
| -rw-r--r-- | bot/exts/utilities/reddit.py | 368 | ||||
| -rw-r--r-- | bot/exts/utilities/stackoverflow.py | 88 | ||||
| -rw-r--r-- | bot/exts/utilities/timed.py | 48 | ||||
| -rw-r--r-- | bot/exts/utilities/wikipedia.py | 100 | ||||
| -rw-r--r-- | bot/exts/utilities/wolfram.py | 293 | 
15 files changed, 2025 insertions, 0 deletions
| diff --git a/bot/exts/utilities/__init__.py b/bot/exts/utilities/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/bot/exts/utilities/__init__.py diff --git a/bot/exts/utilities/bookmark.py b/bot/exts/utilities/bookmark.py new file mode 100644 index 00000000..a91ef1c0 --- /dev/null +++ b/bot/exts/utilities/bookmark.py @@ -0,0 +1,153 @@ +import asyncio +import logging +import random +from typing import Optional + +import discord +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Categories, Colours, ERROR_REPLIES, Icons, WHITELISTED_CHANNELS +from bot.utils.converters import WrappedMessageConverter +from bot.utils.decorators import whitelist_override + +log = logging.getLogger(__name__) + +# Number of seconds to wait for other users to bookmark the same message +TIMEOUT = 120 +BOOKMARK_EMOJI = "📌" +WHITELISTED_CATEGORIES = (Categories.help_in_use,) + + +class Bookmark(commands.Cog): +    """Creates personal bookmarks by relaying a message link to the user's DMs.""" + +    def __init__(self, bot: Bot): +        self.bot = bot + +    @staticmethod +    def build_bookmark_dm(target_message: discord.Message, title: str) -> discord.Embed: +        """Build the embed to DM the bookmark requester.""" +        embed = discord.Embed( +            title=title, +            description=target_message.content, +            colour=Colours.soft_green +        ) +        embed.add_field( +            name="Wanna give it a visit?", +            value=f"[Visit original message]({target_message.jump_url})" +        ) +        embed.set_author(name=target_message.author, icon_url=target_message.author.display_avatar.url) +        embed.set_thumbnail(url=Icons.bookmark) + +        return embed + +    @staticmethod +    def build_error_embed(user: discord.Member) -> discord.Embed: +        """Builds an error embed for when a bookmark requester has DMs disabled.""" +        return discord.Embed( +            title=random.choice(ERROR_REPLIES), +            description=f"{user.mention}, please enable your DMs to receive the bookmark.", +            colour=Colours.soft_red +        ) + +    async def action_bookmark( +        self, +        channel: discord.TextChannel, +        user: discord.Member, +        target_message: discord.Message, +        title: str +    ) -> None: +        """Sends the bookmark DM, or sends an error embed when a user bookmarks a message.""" +        try: +            embed = self.build_bookmark_dm(target_message, title) +            await user.send(embed=embed) +        except discord.Forbidden: +            error_embed = self.build_error_embed(user) +            await channel.send(embed=error_embed) +        else: +            log.info(f"{user} bookmarked {target_message.jump_url} with title '{title}'") + +    @staticmethod +    async def send_reaction_embed( +        channel: discord.TextChannel, +        target_message: discord.Message +    ) -> discord.Message: +        """Sends an embed, with a reaction, so users can react to bookmark the message too.""" +        message = await channel.send( +            embed=discord.Embed( +                description=( +                    f"React with {BOOKMARK_EMOJI} to be sent your very own bookmark to " +                    f"[this message]({target_message.jump_url})." +                ), +                colour=Colours.soft_green +            ) +        ) + +        await message.add_reaction(BOOKMARK_EMOJI) +        return message + +    @whitelist_override(channels=WHITELISTED_CHANNELS, categories=WHITELISTED_CATEGORIES) +    @commands.command(name="bookmark", aliases=("bm", "pin")) +    async def bookmark( +        self, +        ctx: commands.Context, +        target_message: Optional[WrappedMessageConverter], +        *, +        title: str = "Bookmark" +    ) -> None: +        """Send the author a link to `target_message` via DMs.""" +        if not target_message: +            if not ctx.message.reference: +                raise commands.UserInputError("You must either provide a valid message to bookmark, or reply to one.") +            target_message = ctx.message.reference.resolved + +        # Prevent users from bookmarking a message in a channel they don't have access to +        permissions = target_message.channel.permissions_for(ctx.author) +        if not permissions.read_messages: +            log.info(f"{ctx.author} tried to bookmark a message in #{target_message.channel} but has no permissions.") +            embed = discord.Embed( +                title=random.choice(ERROR_REPLIES), +                color=Colours.soft_red, +                description="You don't have permission to view this channel." +            ) +            await ctx.send(embed=embed) +            return + +        def event_check(reaction: discord.Reaction, user: discord.Member) -> bool: +            """Make sure that this reaction is what we want to operate on.""" +            return ( +                # Conditions for a successful pagination: +                all(( +                    # Reaction is on this message +                    reaction.message.id == reaction_message.id, +                    # User has not already bookmarked this message +                    user.id not in bookmarked_users, +                    # Reaction is the `BOOKMARK_EMOJI` emoji +                    str(reaction.emoji) == BOOKMARK_EMOJI, +                    # Reaction was not made by the Bot +                    user.id != self.bot.user.id +                )) +            ) +        await self.action_bookmark(ctx.channel, ctx.author, target_message, title) + +        # Keep track of who has already bookmarked, so users can't spam reactions and cause loads of DMs +        bookmarked_users = [ctx.author.id] +        reaction_message = await self.send_reaction_embed(ctx.channel, target_message) + +        while True: +            try: +                _, user = await self.bot.wait_for("reaction_add", timeout=TIMEOUT, check=event_check) +            except asyncio.TimeoutError: +                log.debug("Timed out waiting for a reaction") +                break +            log.trace(f"{user} has successfully bookmarked from a reaction, attempting to DM them.") +            await self.action_bookmark(ctx.channel, user, target_message, title) +            bookmarked_users.append(user.id) + +        await reaction_message.delete() + + +def setup(bot: Bot) -> None: +    """Load the Bookmark cog.""" +    bot.add_cog(Bookmark(bot)) diff --git a/bot/exts/utilities/cheatsheet.py b/bot/exts/utilities/cheatsheet.py new file mode 100644 index 00000000..33d29f67 --- /dev/null +++ b/bot/exts/utilities/cheatsheet.py @@ -0,0 +1,112 @@ +import random +import re +from typing import Union +from urllib.parse import quote_plus + +from discord import Embed +from discord.ext import commands +from discord.ext.commands import BucketType, Context + +from bot import constants +from bot.bot import Bot +from bot.constants import Categories, Channels, Colours, ERROR_REPLIES +from bot.utils.decorators import whitelist_override + +ERROR_MESSAGE = f""" +Unknown cheat sheet. Please try to reformulate your query. + +**Examples**: +```md +{constants.Client.prefix}cht read json +{constants.Client.prefix}cht hello world +{constants.Client.prefix}cht lambda +``` +If the problem persists send a message in <#{Channels.dev_contrib}> +""" + +URL = "https://cheat.sh/python/{search}" +ESCAPE_TT = str.maketrans({"`": "\\`"}) +ANSI_RE = re.compile(r"\x1b\[.*?m") +# We need to pass headers as curl otherwise it would default to aiohttp which would return raw html. +HEADERS = {"User-Agent": "curl/7.68.0"} + + +class CheatSheet(commands.Cog): +    """Commands that sends a result of a cht.sh search in code blocks.""" + +    def __init__(self, bot: Bot): +        self.bot = bot + +    @staticmethod +    def fmt_error_embed() -> Embed: +        """ +        Format the Error Embed. + +        If the cht.sh search returned 404, overwrite it to send a custom error embed. +        link -> https://github.com/chubin/cheat.sh/issues/198 +        """ +        embed = Embed( +            title=random.choice(ERROR_REPLIES), +            description=ERROR_MESSAGE, +            colour=Colours.soft_red +        ) +        return embed + +    def result_fmt(self, url: str, body_text: str) -> tuple[bool, Union[str, Embed]]: +        """Format Result.""" +        if body_text.startswith("#  404 NOT FOUND"): +            embed = self.fmt_error_embed() +            return True, embed + +        body_space = min(1986 - len(url), 1000) + +        if len(body_text) > body_space: +            description = ( +                f"**Result Of cht.sh**\n" +                f"```python\n{body_text[:body_space]}\n" +                f"... (truncated - too many lines)\n```\n" +                f"Full results: {url} " +            ) +        else: +            description = ( +                f"**Result Of cht.sh**\n" +                f"```python\n{body_text}\n```\n" +                f"{url}" +            ) +        return False, description + +    @commands.command( +        name="cheat", +        aliases=("cht.sh", "cheatsheet", "cheat-sheet", "cht"), +    ) +    @commands.cooldown(1, 10, BucketType.user) +    @whitelist_override(categories=[Categories.help_in_use]) +    async def cheat_sheet(self, ctx: Context, *search_terms: str) -> None: +        """ +        Search cheat.sh. + +        Gets a post from https://cheat.sh/python/ by default. +        Usage: +        --> .cht read json +        """ +        async with ctx.typing(): +            search_string = quote_plus(" ".join(search_terms)) + +            async with self.bot.http_session.get( +                    URL.format(search=search_string), headers=HEADERS +            ) as response: +                result = ANSI_RE.sub("", await response.text()).translate(ESCAPE_TT) + +            is_embed, description = self.result_fmt( +                URL.format(search=search_string), +                result +            ) +            if is_embed: +                await ctx.send(embed=description) +            else: +                await ctx.send(content=description) + + +def setup(bot: Bot) -> None: +    """Load the CheatSheet cog.""" +    bot.add_cog(CheatSheet(bot)) diff --git a/bot/exts/utilities/conversationstarters.py b/bot/exts/utilities/conversationstarters.py new file mode 100644 index 00000000..dd537022 --- /dev/null +++ b/bot/exts/utilities/conversationstarters.py @@ -0,0 +1,69 @@ +from pathlib import Path + +import yaml +from discord import Color, Embed +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import WHITELISTED_CHANNELS +from bot.utils.decorators import whitelist_override +from bot.utils.randomization import RandomCycle + +SUGGESTION_FORM = "https://forms.gle/zw6kkJqv8U43Nfjg9" + +with Path("bot/resources/utilities/starter.yaml").open("r", encoding="utf8") as f: +    STARTERS = yaml.load(f, Loader=yaml.FullLoader) + +with Path("bot/resources/utilities/py_topics.yaml").open("r", encoding="utf8") as f: +    # First ID is #python-general and the rest are top to bottom categories of Topical Chat/Help. +    PY_TOPICS = yaml.load(f, Loader=yaml.FullLoader) + +    # Removing `None` from lists of topics, if not a list, it is changed to an empty one. +    PY_TOPICS = {k: [i for i in v if i] if isinstance(v, list) else [] for k, v in PY_TOPICS.items()} + +    # All the allowed channels that the ".topic" command is allowed to be executed in. +    ALL_ALLOWED_CHANNELS = list(PY_TOPICS.keys()) + list(WHITELISTED_CHANNELS) + +# Putting all topics into one dictionary and shuffling lists to reduce same-topic repetitions. +ALL_TOPICS = {"default": STARTERS, **PY_TOPICS} +TOPICS = { +    channel: RandomCycle(topics or ["No topics found for this channel."]) +    for channel, topics in ALL_TOPICS.items() +} + + +class ConvoStarters(commands.Cog): +    """General conversation topics.""" + +    @commands.command() +    @whitelist_override(channels=ALL_ALLOWED_CHANNELS) +    async def topic(self, ctx: commands.Context) -> None: +        """ +        Responds with a random topic to start a conversation. + +        If in a Python channel, a python-related topic will be given. + +        Otherwise, a random conversation topic will be received by the user. +        """ +        # No matter what, the form will be shown. +        embed = Embed(description=f"Suggest more topics [here]({SUGGESTION_FORM})!", color=Color.blurple()) + +        try: +            # Fetching topics. +            channel_topics = TOPICS[ctx.channel.id] + +        # If the channel isn't Python-related. +        except KeyError: +            embed.title = f"**{next(TOPICS['default'])}**" + +        # If the channel ID doesn't have any topics. +        else: +            embed.title = f"**{next(channel_topics)}**" + +        finally: +            await ctx.send(embed=embed) + + +def setup(bot: Bot) -> None: +    """Load the ConvoStarters cog.""" +    bot.add_cog(ConvoStarters()) diff --git a/bot/exts/utilities/emoji.py b/bot/exts/utilities/emoji.py new file mode 100644 index 00000000..55d6b8e9 --- /dev/null +++ b/bot/exts/utilities/emoji.py @@ -0,0 +1,123 @@ +import logging +import random +import textwrap +from collections import defaultdict +from datetime import datetime +from typing import Optional + +from discord import Color, Embed, Emoji +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Colours, ERROR_REPLIES +from bot.utils.extensions import invoke_help_command +from bot.utils.pagination import LinePaginator +from bot.utils.time import time_since + +log = logging.getLogger(__name__) + + +class Emojis(commands.Cog): +    """A collection of commands related to emojis in the server.""" + +    @staticmethod +    def embed_builder(emoji: dict) -> tuple[Embed, list[str]]: +        """Generates an embed with the emoji names and count.""" +        embed = Embed( +            color=Colours.orange, +            title="Emoji Count", +            timestamp=datetime.utcnow() +        ) +        msg = [] + +        if len(emoji) == 1: +            for category_name, category_emojis in emoji.items(): +                if len(category_emojis) == 1: +                    msg.append(f"There is **{len(category_emojis)}** emoji in the **{category_name}** category.") +                else: +                    msg.append(f"There are **{len(category_emojis)}** emojis in the **{category_name}** category.") +                embed.set_thumbnail(url=random.choice(category_emojis).url) + +        else: +            for category_name, category_emojis in emoji.items(): +                emoji_choice = random.choice(category_emojis) +                if len(category_emojis) > 1: +                    emoji_info = f"There are **{len(category_emojis)}** emojis in the **{category_name}** category." +                else: +                    emoji_info = f"There is **{len(category_emojis)}** emoji in the **{category_name}** category." +                if emoji_choice.animated: +                    msg.append(f"<a:{emoji_choice.name}:{emoji_choice.id}> {emoji_info}") +                else: +                    msg.append(f"<:{emoji_choice.name}:{emoji_choice.id}> {emoji_info}") +        return embed, msg + +    @staticmethod +    def generate_invalid_embed(emojis: list[Emoji]) -> tuple[Embed, list[str]]: +        """Generates error embed for invalid emoji categories.""" +        embed = Embed( +            color=Colours.soft_red, +            title=random.choice(ERROR_REPLIES) +        ) +        msg = [] + +        emoji_dict = defaultdict(list) +        for emoji in emojis: +            emoji_dict[emoji.name.split("_")[0]].append(emoji) + +        error_comp = ", ".join(emoji_dict) +        msg.append(f"These are the valid emoji categories:\n```\n{error_comp}\n```") +        return embed, msg + +    @commands.group(name="emoji", invoke_without_command=True) +    async def emoji_group(self, ctx: commands.Context, emoji: Optional[Emoji]) -> None: +        """A group of commands related to emojis.""" +        if emoji is not None: +            await ctx.invoke(self.info_command, emoji) +        else: +            await invoke_help_command(ctx) + +    @emoji_group.command(name="count", aliases=("c",)) +    async def count_command(self, ctx: commands.Context, *, category_query: str = None) -> None: +        """Returns embed with emoji category and info given by the user.""" +        emoji_dict = defaultdict(list) + +        if not ctx.guild.emojis: +            await ctx.send("No emojis found.") +            return +        log.trace(f"Emoji Category {'' if category_query else 'not '}provided by the user.") +        for emoji in ctx.guild.emojis: +            emoji_category = emoji.name.split("_")[0] + +            if category_query is not None and emoji_category not in category_query: +                continue + +            emoji_dict[emoji_category].append(emoji) + +        if not emoji_dict: +            log.trace("Invalid name provided by the user") +            embed, msg = self.generate_invalid_embed(ctx.guild.emojis) +        else: +            embed, msg = self.embed_builder(emoji_dict) +        await LinePaginator.paginate(lines=msg, ctx=ctx, embed=embed) + +    @emoji_group.command(name="info", aliases=("i",)) +    async def info_command(self, ctx: commands.Context, emoji: Emoji) -> None: +        """Returns relevant information about a Discord Emoji.""" +        emoji_information = Embed( +            title=f"Emoji Information: {emoji.name}", +            description=textwrap.dedent(f""" +                **Name:** {emoji.name} +                **Created:** {time_since(emoji.created_at, precision="hours")} +                **Date:** {datetime.strftime(emoji.created_at, "%d/%m/%Y")} +                **ID:** {emoji.id} +            """), +            color=Color.blurple(), +            url=str(emoji.url), +        ).set_thumbnail(url=emoji.url) + +        await ctx.send(embed=emoji_information) + + +def setup(bot: Bot) -> None: +    """Load the Emojis cog.""" +    bot.add_cog(Emojis()) diff --git a/bot/exts/utilities/githubinfo.py b/bot/exts/utilities/githubinfo.py new file mode 100644 index 00000000..d00b408d --- /dev/null +++ b/bot/exts/utilities/githubinfo.py @@ -0,0 +1,178 @@ +import logging +import random +from datetime import datetime +from urllib.parse import quote, quote_plus + +import discord +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Colours, NEGATIVE_REPLIES +from bot.exts.core.extensions import invoke_help_command + +log = logging.getLogger(__name__) + +GITHUB_API_URL = "https://api.github.com" + + +class GithubInfo(commands.Cog): +    """Fetches info from GitHub.""" + +    def __init__(self, bot: Bot): +        self.bot = bot + +    async def fetch_data(self, url: str) -> dict: +        """Retrieve data as a dictionary.""" +        async with self.bot.http_session.get(url) as r: +            return await r.json() + +    @commands.group(name="github", aliases=("gh", "git")) +    @commands.cooldown(1, 10, commands.BucketType.user) +    async def github_group(self, ctx: commands.Context) -> None: +        """Commands for finding information related to GitHub.""" +        if ctx.invoked_subcommand is None: +            await invoke_help_command(ctx) + +    @github_group.command(name="user", aliases=("userinfo",)) +    async def github_user_info(self, ctx: commands.Context, username: str) -> None: +        """Fetches a user's GitHub information.""" +        async with ctx.typing(): +            user_data = await self.fetch_data(f"{GITHUB_API_URL}/users/{quote_plus(username)}") + +            # User_data will not have a message key if the user exists +            if "message" in user_data: +                embed = discord.Embed( +                    title=random.choice(NEGATIVE_REPLIES), +                    description=f"The profile for `{username}` was not found.", +                    colour=Colours.soft_red +                ) + +                await ctx.send(embed=embed) +                return + +            org_data = await self.fetch_data(user_data["organizations_url"]) +            orgs = [f"[{org['login']}](https://github.com/{org['login']})" for org in org_data] +            orgs_to_add = " | ".join(orgs) + +            gists = user_data["public_gists"] + +            # Forming blog link +            if user_data["blog"].startswith("http"):  # Blog link is complete +                blog = user_data["blog"] +            elif user_data["blog"]:  # Blog exists but the link is not complete +                blog = f"https://{user_data['blog']}" +            else: +                blog = "No website link available" + +            embed = discord.Embed( +                title=f"`{user_data['login']}`'s GitHub profile info", +                description=f"```\n{user_data['bio']}\n```\n" if user_data["bio"] else "", +                colour=discord.Colour.blurple(), +                url=user_data["html_url"], +                timestamp=datetime.strptime(user_data["created_at"], "%Y-%m-%dT%H:%M:%SZ") +            ) +            embed.set_thumbnail(url=user_data["avatar_url"]) +            embed.set_footer(text="Account created at") + +            if user_data["type"] == "User": + +                embed.add_field( +                    name="Followers", +                    value=f"[{user_data['followers']}]({user_data['html_url']}?tab=followers)" +                ) +                embed.add_field( +                    name="Following", +                    value=f"[{user_data['following']}]({user_data['html_url']}?tab=following)" +                ) + +            embed.add_field( +                name="Public repos", +                value=f"[{user_data['public_repos']}]({user_data['html_url']}?tab=repositories)" +            ) + +            if user_data["type"] == "User": +                embed.add_field( +                    name="Gists", +                    value=f"[{gists}](https://gist.github.com/{quote_plus(username, safe='')})" +                ) + +                embed.add_field( +                    name=f"Organization{'s' if len(orgs)!=1 else ''}", +                    value=orgs_to_add if orgs else "No organizations." +                ) +            embed.add_field(name="Website", value=blog) + +        await ctx.send(embed=embed) + +    @github_group.command(name='repository', aliases=('repo',)) +    async def github_repo_info(self, ctx: commands.Context, *repo: str) -> None: +        """ +        Fetches a repositories' GitHub information. + +        The repository should look like `user/reponame` or `user reponame`. +        """ +        repo = "/".join(repo) +        if repo.count("/") != 1: +            embed = discord.Embed( +                title=random.choice(NEGATIVE_REPLIES), +                description="The repository should look like `user/reponame` or `user reponame`.", +                colour=Colours.soft_red +            ) + +            await ctx.send(embed=embed) +            return + +        async with ctx.typing(): +            repo_data = await self.fetch_data(f"{GITHUB_API_URL}/repos/{quote(repo)}") + +            # There won't be a message key if this repo exists +            if "message" in repo_data: +                embed = discord.Embed( +                    title=random.choice(NEGATIVE_REPLIES), +                    description="The requested repository was not found.", +                    colour=Colours.soft_red +                ) + +                await ctx.send(embed=embed) +                return + +        embed = discord.Embed( +            title=repo_data["name"], +            description=repo_data["description"], +            colour=discord.Colour.blurple(), +            url=repo_data["html_url"] +        ) + +        # If it's a fork, then it will have a parent key +        try: +            parent = repo_data["parent"] +            embed.description += f"\n\nForked from [{parent['full_name']}]({parent['html_url']})" +        except KeyError: +            log.debug("Repository is not a fork.") + +        repo_owner = repo_data["owner"] + +        embed.set_author( +            name=repo_owner["login"], +            url=repo_owner["html_url"], +            icon_url=repo_owner["avatar_url"] +        ) + +        repo_created_at = datetime.strptime(repo_data["created_at"], "%Y-%m-%dT%H:%M:%SZ").strftime("%d/%m/%Y") +        last_pushed = datetime.strptime(repo_data["pushed_at"], "%Y-%m-%dT%H:%M:%SZ").strftime("%d/%m/%Y at %H:%M") + +        embed.set_footer( +            text=( +                f"{repo_data['forks_count']} ⑂ " +                f"• {repo_data['stargazers_count']} ⭐ " +                f"• Created At {repo_created_at} " +                f"• Last Commit {last_pushed}" +            ) +        ) + +        await ctx.send(embed=embed) + + +def setup(bot: Bot) -> None: +    """Load the GithubInfo cog.""" +    bot.add_cog(GithubInfo(bot)) diff --git a/bot/exts/utilities/issues.py b/bot/exts/utilities/issues.py new file mode 100644 index 00000000..8a7ebed0 --- /dev/null +++ b/bot/exts/utilities/issues.py @@ -0,0 +1,275 @@ +import logging +import random +import re +from dataclasses import dataclass +from typing import Optional, Union + +import discord +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import ( +    Categories, +    Channels, +    Colours, +    ERROR_REPLIES, +    Emojis, +    NEGATIVE_REPLIES, +    Tokens, +    WHITELISTED_CHANNELS +) +from bot.utils.decorators import whitelist_override +from bot.utils.extensions import invoke_help_command + +log = logging.getLogger(__name__) + +BAD_RESPONSE = { +    404: "Issue/pull request not located! Please enter a valid number!", +    403: "Rate limit has been hit! Please try again later!" +} +REQUEST_HEADERS = { +    "Accept": "application/vnd.github.v3+json" +} + +REPOSITORY_ENDPOINT = "https://api.github.com/orgs/{org}/repos?per_page=100&type=public" +ISSUE_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/issues/{number}" +PR_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/pulls/{number}" + +if GITHUB_TOKEN := Tokens.github: +    REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}" + +WHITELISTED_CATEGORIES = ( +    Categories.development, Categories.devprojects, Categories.media, Categories.staff +) + +CODE_BLOCK_RE = re.compile( +    r"^`([^`\n]+)`"  # Inline codeblock +    r"|```(.+?)```",  # Multiline codeblock +    re.DOTALL | re.MULTILINE +) + +# Maximum number of issues in one message +MAXIMUM_ISSUES = 5 + +# Regex used when looking for automatic linking in messages +# regex101 of current regex https://regex101.com/r/V2ji8M/6 +AUTOMATIC_REGEX = re.compile( +    r"((?P<org>[a-zA-Z0-9][a-zA-Z0-9\-]{1,39})\/)?(?P<repo>[\w\-\.]{1,100})#(?P<number>[0-9]+)" +) + + +@dataclass +class FoundIssue: +    """Dataclass representing an issue found by the regex.""" + +    organisation: Optional[str] +    repository: str +    number: str + +    def __hash__(self) -> int: +        return hash((self.organisation, self.repository, self.number)) + + +@dataclass +class FetchError: +    """Dataclass representing an error while fetching an issue.""" + +    return_code: int +    message: str + + +@dataclass +class IssueState: +    """Dataclass representing the state of an issue.""" + +    repository: str +    number: int +    url: str +    title: str +    emoji: str + + +class Issues(commands.Cog): +    """Cog that allows users to retrieve issues from GitHub.""" + +    def __init__(self, bot: Bot): +        self.bot = bot +        self.repos = [] + +    @staticmethod +    def remove_codeblocks(message: str) -> str: +        """Remove any codeblock in a message.""" +        return re.sub(CODE_BLOCK_RE, "", message) + +    async def fetch_issues( +            self, +            number: int, +            repository: str, +            user: str +    ) -> Union[IssueState, FetchError]: +        """ +        Retrieve an issue from a GitHub repository. + +        Returns IssueState on success, FetchError on failure. +        """ +        url = ISSUE_ENDPOINT.format(user=user, repository=repository, number=number) +        pulls_url = PR_ENDPOINT.format(user=user, repository=repository, number=number) +        log.trace(f"Querying GH issues API: {url}") + +        async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r: +            json_data = await r.json() + +        if r.status == 403: +            if r.headers.get("X-RateLimit-Remaining") == "0": +                log.info(f"Ratelimit reached while fetching {url}") +                return FetchError(403, "Ratelimit reached, please retry in a few minutes.") +            return FetchError(403, "Cannot access issue.") +        elif r.status in (404, 410): +            return FetchError(r.status, "Issue not found.") +        elif r.status != 200: +            return FetchError(r.status, "Error while fetching issue.") + +        # The initial API request is made to the issues API endpoint, which will return information +        # if the issue or PR is present. However, the scope of information returned for PRs differs +        # from issues: if the 'issues' key is present in the response then we can pull the data we +        # need from the initial API call. +        if "issues" in json_data["html_url"]: +            if json_data.get("state") == "open": +                emoji = Emojis.issue_open +            else: +                emoji = Emojis.issue_closed + +        # If the 'issues' key is not contained in the API response and there is no error code, then +        # we know that a PR has been requested and a call to the pulls API endpoint is necessary +        # to get the desired information for the PR. +        else: +            log.trace(f"PR provided, querying GH pulls API for additional information: {pulls_url}") +            async with self.bot.http_session.get(pulls_url) as p: +                pull_data = await p.json() +                if pull_data["draft"]: +                    emoji = Emojis.pull_request_draft +                elif pull_data["state"] == "open": +                    emoji = Emojis.pull_request_open +                # When 'merged_at' is not None, this means that the state of the PR is merged +                elif pull_data["merged_at"] is not None: +                    emoji = Emojis.pull_request_merged +                else: +                    emoji = Emojis.pull_request_closed + +        issue_url = json_data.get("html_url") + +        return IssueState(repository, number, issue_url, json_data.get("title", ""), emoji) + +    @staticmethod +    def format_embed( +        results: list[Union[IssueState, FetchError]], +        user: str, +        repository: Optional[str] = None +    ) -> discord.Embed: +        """Take a list of IssueState or FetchError and format a Discord embed for them.""" +        description_list = [] + +        for result in results: +            if isinstance(result, IssueState): +                description_list.append(f"{result.emoji} [{result.title}]({result.url})") +            elif isinstance(result, FetchError): +                description_list.append(f":x: [{result.return_code}] {result.message}") + +        resp = discord.Embed( +            colour=Colours.bright_green, +            description="\n".join(description_list) +        ) + +        embed_url = f"https://github.com/{user}/{repository}" if repository else f"https://github.com/{user}" +        resp.set_author(name="GitHub", url=embed_url) +        return resp + +    @whitelist_override(channels=WHITELISTED_CHANNELS, categories=WHITELISTED_CATEGORIES) +    @commands.command(aliases=("pr",)) +    async def issue( +        self, +        ctx: commands.Context, +        numbers: commands.Greedy[int], +        repository: str = "sir-lancebot", +        user: str = "python-discord" +    ) -> None: +        """Command to retrieve issue(s) from a GitHub repository.""" +        # Remove duplicates +        numbers = set(numbers) + +        if len(numbers) > MAXIMUM_ISSUES: +            embed = discord.Embed( +                title=random.choice(ERROR_REPLIES), +                color=Colours.soft_red, +                description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})" +            ) +            await ctx.send(embed=embed) +            await invoke_help_command(ctx) + +        results = [await self.fetch_issues(number, repository, user) for number in numbers] +        await ctx.send(embed=self.format_embed(results, user, repository)) + +    @commands.Cog.listener() +    async def on_message(self, message: discord.Message) -> None: +        """ +        Automatic issue linking. + +        Listener to retrieve issue(s) from a GitHub repository using automatic linking if matching <org>/<repo>#<issue>. +        """ +        # Ignore bots +        if message.author.bot: +            return + +        issues = [ +            FoundIssue(*match.group("org", "repo", "number")) +            for match in AUTOMATIC_REGEX.finditer(self.remove_codeblocks(message.content)) +        ] +        links = [] + +        if issues: +            # Block this from working in DMs +            if not message.guild: +                await message.channel.send( +                    embed=discord.Embed( +                        title=random.choice(NEGATIVE_REPLIES), +                        description=( +                            "You can't retrieve issues from DMs. " +                            f"Try again in <#{Channels.community_bot_commands}>" +                        ), +                        colour=Colours.soft_red +                    ) +                ) +                return + +            log.trace(f"Found {issues = }") +            # Remove duplicates +            issues = set(issues) + +            if len(issues) > MAXIMUM_ISSUES: +                embed = discord.Embed( +                    title=random.choice(ERROR_REPLIES), +                    color=Colours.soft_red, +                    description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})" +                ) +                await message.channel.send(embed=embed, delete_after=5) +                return + +            for repo_issue in issues: +                result = await self.fetch_issues( +                    int(repo_issue.number), +                    repo_issue.repository, +                    repo_issue.organisation or "python-discord" +                ) +                if isinstance(result, IssueState): +                    links.append(result) + +        if not links: +            return + +        resp = self.format_embed(links, "python-discord") +        await message.channel.send(embed=resp) + + +def setup(bot: Bot) -> None: +    """Load the Issues cog.""" +    bot.add_cog(Issues(bot)) diff --git a/bot/exts/utilities/latex.py b/bot/exts/utilities/latex.py new file mode 100644 index 00000000..36c7e0ab --- /dev/null +++ b/bot/exts/utilities/latex.py @@ -0,0 +1,101 @@ +import asyncio +import hashlib +import pathlib +import re +from concurrent.futures import ThreadPoolExecutor +from io import BytesIO + +import discord +import matplotlib.pyplot as plt +from discord.ext import commands + +from bot.bot import Bot + +# configure fonts and colors for matplotlib +plt.rcParams.update( +    { +        "font.size": 16, +        "mathtext.fontset": "cm",  # Computer Modern font set +        "mathtext.rm": "serif", +        "figure.facecolor": "36393F",  # matches Discord's dark mode background color +        "text.color": "white", +    } +) + +FORMATTED_CODE_REGEX = re.compile( +    r"(?P<delim>(?P<block>```)|``?)"        # code delimiter: 1-3 backticks; (?P=block) only matches if it's a block +    r"(?(block)(?:(?P<lang>[a-z]+)\n)?)"    # if we're in a block, match optional language (only letters plus newline) +    r"(?:[ \t]*\n)*"                        # any blank (empty or tabs/spaces only) lines before the code +    r"(?P<code>.*?)"                        # extract all code inside the markup +    r"\s*"                                  # any more whitespace before the end of the code markup +    r"(?P=delim)",                          # match the exact same delimiter from the start again +    re.DOTALL | re.IGNORECASE,              # "." also matches newlines, case insensitive +) + +CACHE_DIRECTORY = pathlib.Path("_latex_cache") +CACHE_DIRECTORY.mkdir(exist_ok=True) + + +class Latex(commands.Cog): +    """Renders latex.""" + +    @staticmethod +    def _render(text: str, filepath: pathlib.Path) -> BytesIO: +        """ +        Return the rendered image if latex compiles without errors, otherwise raise a BadArgument Exception. + +        Saves rendered image to cache. +        """ +        fig = plt.figure() +        rendered_image = BytesIO() +        fig.text(0, 1, text, horizontalalignment="left", verticalalignment="top") + +        try: +            plt.savefig(rendered_image, bbox_inches="tight", dpi=600) +        except ValueError as e: +            raise commands.BadArgument(str(e)) + +        rendered_image.seek(0) + +        with open(filepath, "wb") as f: +            f.write(rendered_image.getbuffer()) + +        return rendered_image + +    @staticmethod +    def _prepare_input(text: str) -> str: +        text = text.replace(r"\\", "$\n$")  # matplotlib uses \n for newlines, not \\ + +        if match := FORMATTED_CODE_REGEX.match(text): +            return match.group("code") +        else: +            return text + +    @commands.command() +    @commands.max_concurrency(1, commands.BucketType.guild, wait=True) +    async def latex(self, ctx: commands.Context, *, text: str) -> None: +        """Renders the text in latex and sends the image.""" +        text = self._prepare_input(text) +        query_hash = hashlib.md5(text.encode()).hexdigest() +        image_path = CACHE_DIRECTORY.joinpath(f"{query_hash}.png") +        async with ctx.typing(): +            if image_path.exists(): +                await ctx.send(file=discord.File(image_path)) +                return + +            with ThreadPoolExecutor() as pool: +                image = await asyncio.get_running_loop().run_in_executor( +                    pool, self._render, text, image_path +                ) + +            await ctx.send(file=discord.File(image, "latex.png")) + + +def setup(bot: Bot) -> None: +    """Load the Latex Cog.""" +    # As we have resource issues on this cog, +    # we have it currently disabled while we fix it. +    import logging +    logging.info("Latex cog is currently disabled. It won't be loaded.") +    return +    bot.add_cog(Latex()) diff --git a/bot/exts/utilities/pythonfacts.py b/bot/exts/utilities/pythonfacts.py new file mode 100644 index 00000000..ef190185 --- /dev/null +++ b/bot/exts/utilities/pythonfacts.py @@ -0,0 +1,36 @@ +import itertools + +import discord +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Colours + +with open("bot/resources/utilities/python_facts.txt") as file: +    FACTS = itertools.cycle(list(file)) + +COLORS = itertools.cycle([Colours.python_blue, Colours.python_yellow]) +PYFACTS_DISCUSSION = "https://github.com/python-discord/meta/discussions/93" + + +class PythonFacts(commands.Cog): +    """Sends a random fun fact about Python.""" + +    @commands.command(name="pythonfact", aliases=("pyfact",)) +    async def get_python_fact(self, ctx: commands.Context) -> None: +        """Sends a Random fun fact about Python.""" +        embed = discord.Embed( +            title="Python Facts", +            description=next(FACTS), +            colour=next(COLORS) +        ) +        embed.add_field( +            name="Suggestions", +            value=f"Suggest more facts [here!]({PYFACTS_DISCUSSION})" +        ) +        await ctx.send(embed=embed) + + +def setup(bot: Bot) -> None: +    """Load the PythonFacts Cog.""" +    bot.add_cog(PythonFacts()) diff --git a/bot/exts/utilities/realpython.py b/bot/exts/utilities/realpython.py new file mode 100644 index 00000000..ef8b2638 --- /dev/null +++ b/bot/exts/utilities/realpython.py @@ -0,0 +1,81 @@ +import logging +from html import unescape +from urllib.parse import quote_plus + +from discord import Embed +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Colours + +logger = logging.getLogger(__name__) + + +API_ROOT = "https://realpython.com/search/api/v1/" +ARTICLE_URL = "https://realpython.com{article_url}" +SEARCH_URL = "https://realpython.com/search?q={user_search}" + + +ERROR_EMBED = Embed( +    title="Error while searching Real Python", +    description="There was an error while trying to reach Real Python. Please try again shortly.", +    color=Colours.soft_red, +) + + +class RealPython(commands.Cog): +    """User initiated command to search for a Real Python article.""" + +    def __init__(self, bot: Bot): +        self.bot = bot + +    @commands.command(aliases=["rp"]) +    @commands.cooldown(1, 10, commands.cooldowns.BucketType.user) +    async def realpython(self, ctx: commands.Context, *, user_search: str) -> None: +        """Send 5 articles that match the user's search terms.""" +        params = {"q": user_search, "limit": 5, "kind": "article"} +        async with self.bot.http_session.get(url=API_ROOT, params=params) as response: +            if response.status != 200: +                logger.error( +                    f"Unexpected status code {response.status} from Real Python" +                ) +                await ctx.send(embed=ERROR_EMBED) +                return + +            data = await response.json() + +        articles = data["results"] + +        if len(articles) == 0: +            no_articles = Embed( +                title=f"No articles found for '{user_search}'", color=Colours.soft_red +            ) +            await ctx.send(embed=no_articles) +            return + +        if len(articles) == 1: +            article_description = "Here is the result:" +        else: +            article_description = f"Here are the top {len(articles)} results:" + +        article_embed = Embed( +            title="Search results - Real Python", +            url=SEARCH_URL.format(user_search=quote_plus(user_search)), +            description=article_description, +            color=Colours.orange, +        ) + +        for article in articles: +            article_embed.add_field( +                name=unescape(article["title"]), +                value=ARTICLE_URL.format(article_url=article["url"]), +                inline=False, +            ) +        article_embed.set_footer(text="Click the links to go to the articles.") + +        await ctx.send(embed=article_embed) + + +def setup(bot: Bot) -> None: +    """Load the Real Python Cog.""" +    bot.add_cog(RealPython(bot)) diff --git a/bot/exts/utilities/reddit.py b/bot/exts/utilities/reddit.py new file mode 100644 index 00000000..e6cb5337 --- /dev/null +++ b/bot/exts/utilities/reddit.py @@ -0,0 +1,368 @@ +import asyncio +import logging +import random +import textwrap +from collections import namedtuple +from datetime import datetime, timedelta +from typing import Union + +from aiohttp import BasicAuth, ClientError +from discord import Colour, Embed, TextChannel +from discord.ext.commands import Cog, Context, group, has_any_role +from discord.ext.tasks import loop +from discord.utils import escape_markdown, sleep_until + +from bot.bot import Bot +from bot.constants import Channels, ERROR_REPLIES, Emojis, Reddit as RedditConfig, STAFF_ROLES +from bot.utils.converters import Subreddit +from bot.utils.extensions import invoke_help_command +from bot.utils.messages import sub_clyde +from bot.utils.pagination import ImagePaginator, LinePaginator + +log = logging.getLogger(__name__) + +AccessToken = namedtuple("AccessToken", ["token", "expires_at"]) + + +class Reddit(Cog): +    """Track subreddit posts and show detailed statistics about them.""" + +    HEADERS = {"User-Agent": "python3:python-discord/bot:1.0.0 (by /u/PythonDiscord)"} +    URL = "https://www.reddit.com" +    OAUTH_URL = "https://oauth.reddit.com" +    MAX_RETRIES = 3 + +    def __init__(self, bot: Bot): +        self.bot = bot + +        self.webhook = None +        self.access_token = None +        self.client_auth = BasicAuth(RedditConfig.client_id, RedditConfig.secret) + +        bot.loop.create_task(self.init_reddit_ready()) +        self.auto_poster_loop.start() + +    def cog_unload(self) -> None: +        """Stop the loop task and revoke the access token when the cog is unloaded.""" +        self.auto_poster_loop.cancel() +        if self.access_token and self.access_token.expires_at > datetime.utcnow(): +            asyncio.create_task(self.revoke_access_token()) + +    async def init_reddit_ready(self) -> None: +        """Sets the reddit webhook when the cog is loaded.""" +        await self.bot.wait_until_guild_available() +        if not self.webhook: +            self.webhook = await self.bot.fetch_webhook(RedditConfig.webhook) + +    @property +    def channel(self) -> TextChannel: +        """Get the #reddit channel object from the bot's cache.""" +        return self.bot.get_channel(Channels.reddit) + +    def build_pagination_pages(self, posts: list[dict], paginate: bool) -> Union[list[tuple], str]: +        """Build embed pages required for Paginator.""" +        pages = [] +        first_page = "" +        for post in posts: +            post_page = "" +            image_url = "" + +            data = post["data"] + +            title = textwrap.shorten(data["title"], width=50, placeholder="...") + +            # Normal brackets interfere with Markdown. +            title = escape_markdown(title).replace("[", "⦋").replace("]", "⦌") +            link = self.URL + data["permalink"] + +            first_page += f"**[{title.replace('*', '')}]({link})**\n" + +            text = data["selftext"] +            if text: +                text = escape_markdown(text).replace("[", "⦋").replace("]", "⦌") +                first_page += textwrap.shorten(text, width=100, placeholder="...") + "\n" + +            ups = data["ups"] +            comments = data["num_comments"] +            author = data["author"] + +            content_type = Emojis.reddit_post_text +            if data["is_video"] or {"youtube", "youtu.be"}.issubset(set(data["url"].split("."))): +                # This means the content type in the post is a video. +                content_type = f"{Emojis.reddit_post_video}" + +            elif data["url"].endswith(("jpg", "png", "gif")): +                # This means the content type in the post is an image. +                content_type = f"{Emojis.reddit_post_photo}" +                image_url = data["url"] + +            first_page += ( +                f"{content_type}\u2003{Emojis.reddit_upvote}{ups}\u2003{Emojis.reddit_comments}" +                f"\u2002{comments}\u2003{Emojis.reddit_users}{author}\n\n" +            ) + +            if paginate: +                post_page += f"**[{title}]({link})**\n\n" +                if text: +                    post_page += textwrap.shorten(text, width=252, placeholder="...") + "\n\n" +                post_page += ( +                    f"{content_type}\u2003{Emojis.reddit_upvote}{ups}\u2003{Emojis.reddit_comments}\u2002" +                    f"{comments}\u2003{Emojis.reddit_users}{author}" +                ) + +                pages.append((post_page, image_url)) + +        if not paginate: +            # Return the first summery page if pagination is not required +            return first_page + +        pages.insert(0, (first_page, ""))  # Using image paginator, hence settings image url to empty string +        return pages + +    async def get_access_token(self) -> None: +        """ +        Get a Reddit API OAuth2 access token and assign it to self.access_token. + +        A token is valid for 1 hour. There will be MAX_RETRIES to get a token, after which the cog +        will be unloaded and a ClientError raised if retrieval was still unsuccessful. +        """ +        for i in range(1, self.MAX_RETRIES + 1): +            response = await self.bot.http_session.post( +                url=f"{self.URL}/api/v1/access_token", +                headers=self.HEADERS, +                auth=self.client_auth, +                data={ +                    "grant_type": "client_credentials", +                    "duration": "temporary" +                } +            ) + +            if response.status == 200 and response.content_type == "application/json": +                content = await response.json() +                expiration = int(content["expires_in"]) - 60  # Subtract 1 minute for leeway. +                self.access_token = AccessToken( +                    token=content["access_token"], +                    expires_at=datetime.utcnow() + timedelta(seconds=expiration) +                ) + +                log.debug(f"New token acquired; expires on UTC {self.access_token.expires_at}") +                return +            else: +                log.debug( +                    f"Failed to get an access token: " +                    f"status {response.status} & content type {response.content_type}; " +                    f"retrying ({i}/{self.MAX_RETRIES})" +                ) + +            await asyncio.sleep(3) + +        self.bot.remove_cog(self.qualified_name) +        raise ClientError("Authentication with the Reddit API failed. Unloading the cog.") + +    async def revoke_access_token(self) -> None: +        """ +        Revoke the OAuth2 access token for the Reddit API. + +        For security reasons, it's good practice to revoke the token when it's no longer being used. +        """ +        response = await self.bot.http_session.post( +            url=f"{self.URL}/api/v1/revoke_token", +            headers=self.HEADERS, +            auth=self.client_auth, +            data={ +                "token": self.access_token.token, +                "token_type_hint": "access_token" +            } +        ) + +        if response.status in [200, 204] and response.content_type == "application/json": +            self.access_token = None +        else: +            log.warning(f"Unable to revoke access token: status {response.status}.") + +    async def fetch_posts(self, route: str, *, amount: int = 25, params: dict = None) -> list[dict]: +        """A helper method to fetch a certain amount of Reddit posts at a given route.""" +        # Reddit's JSON responses only provide 25 posts at most. +        if not 25 >= amount > 0: +            raise ValueError("Invalid amount of subreddit posts requested.") + +        # Renew the token if necessary. +        if not self.access_token or self.access_token.expires_at < datetime.utcnow(): +            await self.get_access_token() + +        url = f"{self.OAUTH_URL}/{route}" +        for _ in range(self.MAX_RETRIES): +            response = await self.bot.http_session.get( +                url=url, +                headers={**self.HEADERS, "Authorization": f"bearer {self.access_token.token}"}, +                params=params +            ) +            if response.status == 200 and response.content_type == 'application/json': +                # Got appropriate response - process and return. +                content = await response.json() +                posts = content["data"]["children"] + +                filtered_posts = [post for post in posts if not post["data"]["over_18"]] + +                return filtered_posts[:amount] + +            await asyncio.sleep(3) + +        log.debug(f"Invalid response from: {url} - status code {response.status}, mimetype {response.content_type}") +        return list()  # Failed to get appropriate response within allowed number of retries. + +    async def get_top_posts( +            self, subreddit: Subreddit, time: str = "all", amount: int = 5, paginate: bool = False +    ) -> Union[Embed, list[tuple]]: +        """ +        Get the top amount of posts for a given subreddit within a specified timeframe. + +        A time of "all" will get posts from all time, "day" will get top daily posts and "week" will get the top +        weekly posts. + +        The amount should be between 0 and 25 as Reddit's JSON requests only provide 25 posts at most. +        """ +        embed = Embed() + +        posts = await self.fetch_posts( +            route=f"{subreddit}/top", +            amount=amount, +            params={"t": time} +        ) +        if not posts: +            embed.title = random.choice(ERROR_REPLIES) +            embed.colour = Colour.red() +            embed.description = ( +                "Sorry! We couldn't find any SFW posts from that subreddit. " +                "If this problem persists, please let us know." +            ) + +            return embed + +        if paginate: +            return self.build_pagination_pages(posts, paginate=True) + +        # Use only starting summary page for #reddit channel posts. +        embed.description = self.build_pagination_pages(posts, paginate=False) +        embed.colour = Colour.blurple() +        return embed + +    @loop() +    async def auto_poster_loop(self) -> None: +        """Post the top 5 posts daily, and the top 5 posts weekly.""" +        # once d.py get support for `time` parameter in loop decorator, +        # this can be removed and the loop can use the `time=datetime.time.min` parameter +        now = datetime.utcnow() +        tomorrow = now + timedelta(days=1) +        midnight_tomorrow = tomorrow.replace(hour=0, minute=0, second=0) + +        await sleep_until(midnight_tomorrow) + +        await self.bot.wait_until_guild_available() +        if not self.webhook: +            await self.bot.fetch_webhook(RedditConfig.webhook) + +        if datetime.utcnow().weekday() == 0: +            await self.top_weekly_posts() +            # if it's a monday send the top weekly posts + +        for subreddit in RedditConfig.subreddits: +            top_posts = await self.get_top_posts(subreddit=subreddit, time="day") +            username = sub_clyde(f"{subreddit} Top Daily Posts") +            message = await self.webhook.send(username=username, embed=top_posts, wait=True) + +            if message.channel.is_news(): +                await message.publish() + +    async def top_weekly_posts(self) -> None: +        """Post a summary of the top posts.""" +        for subreddit in RedditConfig.subreddits: +            # Send and pin the new weekly posts. +            top_posts = await self.get_top_posts(subreddit=subreddit, time="week") +            username = sub_clyde(f"{subreddit} Top Weekly Posts") +            message = await self.webhook.send(wait=True, username=username, embed=top_posts) + +            if subreddit.lower() == "r/python": +                if not self.channel: +                    log.warning("Failed to get #reddit channel to remove pins in the weekly loop.") +                    return + +                # Remove the oldest pins so that only 12 remain at most. +                pins = await self.channel.pins() + +                while len(pins) >= 12: +                    await pins[-1].unpin() +                    del pins[-1] + +                await message.pin() + +                if message.channel.is_news(): +                    await message.publish() + +    @group(name="reddit", invoke_without_command=True) +    async def reddit_group(self, ctx: Context) -> None: +        """View the top posts from various subreddits.""" +        await invoke_help_command(ctx) + +    @reddit_group.command(name="top") +    async def top_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None: +        """Send the top posts of all time from a given subreddit.""" +        async with ctx.typing(): +            pages = await self.get_top_posts(subreddit=subreddit, time="all", paginate=True) + +        await ctx.send(f"Here are the top {subreddit} posts of all time!") +        embed = Embed( +            color=Colour.blurple() +        ) + +        await ImagePaginator.paginate(pages, ctx, embed) + +    @reddit_group.command(name="daily") +    async def daily_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None: +        """Send the top posts of today from a given subreddit.""" +        async with ctx.typing(): +            pages = await self.get_top_posts(subreddit=subreddit, time="day", paginate=True) + +        await ctx.send(f"Here are today's top {subreddit} posts!") +        embed = Embed( +            color=Colour.blurple() +        ) + +        await ImagePaginator.paginate(pages, ctx, embed) + +    @reddit_group.command(name="weekly") +    async def weekly_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None: +        """Send the top posts of this week from a given subreddit.""" +        async with ctx.typing(): +            pages = await self.get_top_posts(subreddit=subreddit, time="week", paginate=True) + +        await ctx.send(f"Here are this week's top {subreddit} posts!") +        embed = Embed( +            color=Colour.blurple() +        ) + +        await ImagePaginator.paginate(pages, ctx, embed) + +    @has_any_role(*STAFF_ROLES) +    @reddit_group.command(name="subreddits", aliases=("subs",)) +    async def subreddits_command(self, ctx: Context) -> None: +        """Send a paginated embed of all the subreddits we're relaying.""" +        embed = Embed() +        embed.title = "Relayed subreddits." +        embed.colour = Colour.blurple() + +        await LinePaginator.paginate( +            RedditConfig.subreddits, +            ctx, embed, +            footer_text="Use the reddit commands along with these to view their posts.", +            empty=False, +            max_lines=15 +        ) + + +def setup(bot: Bot) -> None: +    """Load the Reddit cog.""" +    if not RedditConfig.secret or not RedditConfig.client_id: +        log.error("Credentials not provided, cog not loaded.") +        return +    bot.add_cog(Reddit(bot)) diff --git a/bot/exts/utilities/stackoverflow.py b/bot/exts/utilities/stackoverflow.py new file mode 100644 index 00000000..64455e33 --- /dev/null +++ b/bot/exts/utilities/stackoverflow.py @@ -0,0 +1,88 @@ +import logging +from html import unescape +from urllib.parse import quote_plus + +from discord import Embed, HTTPException +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Colours, Emojis + +logger = logging.getLogger(__name__) + +BASE_URL = "https://api.stackexchange.com/2.2/search/advanced" +SO_PARAMS = { +    "order": "desc", +    "sort": "activity", +    "site": "stackoverflow" +} +SEARCH_URL = "https://stackoverflow.com/search?q={query}" +ERR_EMBED = Embed( +    title="Error in fetching results from Stackoverflow", +    description=( +        "Sorry, there was en error while trying to fetch data from the Stackoverflow website. Please try again in some " +        "time. If this issue persists, please contact the staff or send a message in #dev-contrib." +    ), +    color=Colours.soft_red +) + + +class Stackoverflow(commands.Cog): +    """Contains command to interact with stackoverflow from discord.""" + +    def __init__(self, bot: Bot): +        self.bot = bot + +    @commands.command(aliases=["so"]) +    @commands.cooldown(1, 15, commands.cooldowns.BucketType.user) +    async def stackoverflow(self, ctx: commands.Context, *, search_query: str) -> None: +        """Sends the top 5 results of a search query from stackoverflow.""" +        params = SO_PARAMS | {"q": search_query} +        async with self.bot.http_session.get(url=BASE_URL, params=params) as response: +            if response.status == 200: +                data = await response.json() +            else: +                logger.error(f'Status code is not 200, it is {response.status}') +                await ctx.send(embed=ERR_EMBED) +                return +        if not data['items']: +            no_search_result = Embed( +                title=f"No search results found for {search_query}", +                color=Colours.soft_red +            ) +            await ctx.send(embed=no_search_result) +            return + +        top5 = data["items"][:5] +        encoded_search_query = quote_plus(search_query) +        embed = Embed( +            title="Search results - Stackoverflow", +            url=SEARCH_URL.format(query=encoded_search_query), +            description=f"Here are the top {len(top5)} results:", +            color=Colours.orange +        ) +        for item in top5: +            embed.add_field( +                name=unescape(item['title']), +                value=( +                    f"[{Emojis.reddit_upvote} {item['score']}    " +                    f"{Emojis.stackoverflow_views} {item['view_count']}     " +                    f"{Emojis.reddit_comments} {item['answer_count']}   " +                    f"{Emojis.stackoverflow_tag} {', '.join(item['tags'][:3])}]" +                    f"({item['link']})" +                ), +                inline=False) +        embed.set_footer(text="View the original link for more results.") +        try: +            await ctx.send(embed=embed) +        except HTTPException: +            search_query_too_long = Embed( +                title="Your search query is too long, please try shortening your search query", +                color=Colours.soft_red +            ) +            await ctx.send(embed=search_query_too_long) + + +def setup(bot: Bot) -> None: +    """Load the Stackoverflow Cog.""" +    bot.add_cog(Stackoverflow(bot)) diff --git a/bot/exts/utilities/timed.py b/bot/exts/utilities/timed.py new file mode 100644 index 00000000..2ea6b419 --- /dev/null +++ b/bot/exts/utilities/timed.py @@ -0,0 +1,48 @@ +from copy import copy +from time import perf_counter + +from discord import Message +from discord.ext import commands + +from bot.bot import Bot + + +class TimedCommands(commands.Cog): +    """Time the command execution of a command.""" + +    @staticmethod +    async def create_execution_context(ctx: commands.Context, command: str) -> commands.Context: +        """Get a new execution context for a command.""" +        msg: Message = copy(ctx.message) +        msg.content = f"{ctx.prefix}{command}" + +        return await ctx.bot.get_context(msg) + +    @commands.command(name="timed", aliases=("time", "t")) +    async def timed(self, ctx: commands.Context, *, command: str) -> None: +        """Time the command execution of a command.""" +        new_ctx = await self.create_execution_context(ctx, command) + +        ctx.subcontext = new_ctx + +        if not ctx.subcontext.command: +            help_command = f"{ctx.prefix}help" +            error = f"The command you are trying to time doesn't exist. Use `{help_command}` for a list of commands." + +            await ctx.send(error) +            return + +        if new_ctx.command.qualified_name == "timed": +            await ctx.send("You are not allowed to time the execution of the `timed` command.") +            return + +        t_start = perf_counter() +        await new_ctx.command.invoke(new_ctx) +        t_end = perf_counter() + +        await ctx.send(f"Command execution for `{new_ctx.command}` finished in {(t_end - t_start):.4f} seconds.") + + +def setup(bot: Bot) -> None: +    """Load the Timed cog.""" +    bot.add_cog(TimedCommands()) diff --git a/bot/exts/utilities/wikipedia.py b/bot/exts/utilities/wikipedia.py new file mode 100644 index 00000000..eccc1f8c --- /dev/null +++ b/bot/exts/utilities/wikipedia.py @@ -0,0 +1,100 @@ +import logging +import re +from datetime import datetime +from html import unescape + +from discord import Color, Embed, TextChannel +from discord.ext import commands + +from bot.bot import Bot +from bot.utils import LinePaginator +from bot.utils.exceptions import APIError + +log = logging.getLogger(__name__) + +SEARCH_API = ( +    "https://en.wikipedia.org/w/api.php" +) +WIKI_PARAMS = { +    "action": "query", +    "list": "search", +    "prop": "info", +    "inprop": "url", +    "utf8": "", +    "format": "json", +    "origin": "*", + +} +WIKI_THUMBNAIL = ( +    "https://upload.wikimedia.org/wikipedia/en/thumb/8/80/Wikipedia-logo-v2.svg" +    "/330px-Wikipedia-logo-v2.svg.png" +) +WIKI_SNIPPET_REGEX = r"(<!--.*?-->|<[^>]*>)" +WIKI_SEARCH_RESULT = ( +    "**[{name}]({url})**\n" +    "{description}\n" +) + + +class WikipediaSearch(commands.Cog): +    """Get info from wikipedia.""" + +    def __init__(self, bot: Bot): +        self.bot = bot + +    async def wiki_request(self, channel: TextChannel, search: str) -> list[str]: +        """Search wikipedia search string and return formatted first 10 pages found.""" +        params = WIKI_PARAMS | {"srlimit": 10, "srsearch": search} +        async with self.bot.http_session.get(url=SEARCH_API, params=params) as resp: +            if resp.status != 200: +                log.info(f"Unexpected response `{resp.status}` while searching wikipedia for `{search}`") +                raise APIError("Wikipedia API", resp.status) + +            raw_data = await resp.json() + +            if not raw_data.get("query"): +                if error := raw_data.get("errors"): +                    log.error(f"There was an error while communicating with the Wikipedia API: {error}") +                raise APIError("Wikipedia API", resp.status, error) + +            lines = [] +            if raw_data["query"]["searchinfo"]["totalhits"]: +                for article in raw_data["query"]["search"]: +                    line = WIKI_SEARCH_RESULT.format( +                        name=article["title"], +                        description=unescape( +                            re.sub( +                                WIKI_SNIPPET_REGEX, "", article["snippet"] +                            ) +                        ), +                        url=f"https://en.wikipedia.org/?curid={article['pageid']}" +                    ) +                    lines.append(line) + +            return lines + +    @commands.cooldown(1, 10, commands.BucketType.user) +    @commands.command(name="wikipedia", aliases=("wiki",)) +    async def wikipedia_search_command(self, ctx: commands.Context, *, search: str) -> None: +        """Sends paginated top 10 results of Wikipedia search..""" +        contents = await self.wiki_request(ctx.channel, search) + +        if contents: +            embed = Embed( +                title="Wikipedia Search Results", +                colour=Color.blurple() +            ) +            embed.set_thumbnail(url=WIKI_THUMBNAIL) +            embed.timestamp = datetime.utcnow() +            await LinePaginator.paginate( +                contents, ctx, embed +            ) +        else: +            await ctx.send( +                "Sorry, we could not find a wikipedia article using that search term." +            ) + + +def setup(bot: Bot) -> None: +    """Load the WikipediaSearch cog.""" +    bot.add_cog(WikipediaSearch(bot)) diff --git a/bot/exts/utilities/wolfram.py b/bot/exts/utilities/wolfram.py new file mode 100644 index 00000000..9a26e545 --- /dev/null +++ b/bot/exts/utilities/wolfram.py @@ -0,0 +1,293 @@ +import logging +from io import BytesIO +from typing import Callable, Optional +from urllib.parse import urlencode + +import arrow +import discord +from discord import Embed +from discord.ext import commands +from discord.ext.commands import BucketType, Cog, Context, check, group + +from bot.bot import Bot +from bot.constants import Colours, STAFF_ROLES, Wolfram +from bot.utils.pagination import ImagePaginator + +log = logging.getLogger(__name__) + +APPID = Wolfram.key +DEFAULT_OUTPUT_FORMAT = "JSON" +QUERY = "http://api.wolframalpha.com/v2/{request}" +WOLF_IMAGE = "https://www.symbols.com/gi.php?type=1&id=2886&i=1" + +MAX_PODS = 20 + +# Allows for 10 wolfram calls pr user pr day +usercd = commands.CooldownMapping.from_cooldown(Wolfram.user_limit_day, 60 * 60 * 24, BucketType.user) + +# Allows for max api requests / days in month per day for the entire guild (Temporary) +guildcd = commands.CooldownMapping.from_cooldown(Wolfram.guild_limit_day, 60 * 60 * 24, BucketType.guild) + + +async def send_embed( +        ctx: Context, +        message_txt: str, +        colour: int = Colours.soft_red, +        footer: str = None, +        img_url: str = None, +        f: discord.File = None +) -> None: +    """Generate & send a response embed with Wolfram as the author.""" +    embed = Embed(colour=colour) +    embed.description = message_txt +    embed.set_author( +        name="Wolfram Alpha", +        icon_url=WOLF_IMAGE, +        url="https://www.wolframalpha.com/" +    ) +    if footer: +        embed.set_footer(text=footer) + +    if img_url: +        embed.set_image(url=img_url) + +    await ctx.send(embed=embed, file=f) + + +def custom_cooldown(*ignore: int) -> Callable: +    """ +    Implement per-user and per-guild cooldowns for requests to the Wolfram API. + +    A list of roles may be provided to ignore the per-user cooldown. +    """ +    async def predicate(ctx: Context) -> bool: +        if ctx.invoked_with == "help": +            # if the invoked command is help we don't want to increase the ratelimits since it's not actually +            # invoking the command/making a request, so instead just check if the user/guild are on cooldown. +            guild_cooldown = not guildcd.get_bucket(ctx.message).get_tokens() == 0  # if guild is on cooldown +            # check the message is in a guild, and check user bucket if user is not ignored +            if ctx.guild and not any(r.id in ignore for r in ctx.author.roles): +                return guild_cooldown and not usercd.get_bucket(ctx.message).get_tokens() == 0 +            return guild_cooldown + +        user_bucket = usercd.get_bucket(ctx.message) + +        if all(role.id not in ignore for role in ctx.author.roles): +            user_rate = user_bucket.update_rate_limit() + +            if user_rate: +                # Can't use api; cause: member limit +                cooldown = arrow.utcnow().shift(seconds=int(user_rate)).humanize(only_distance=True) +                message = ( +                    "You've used up your limit for Wolfram|Alpha requests.\n" +                    f"Cooldown: {cooldown}" +                ) +                await send_embed(ctx, message) +                return False + +        guild_bucket = guildcd.get_bucket(ctx.message) +        guild_rate = guild_bucket.update_rate_limit() + +        # Repr has a token attribute to read requests left +        log.debug(guild_bucket) + +        if guild_rate: +            # Can't use api; cause: guild limit +            message = ( +                "The max limit of requests for the server has been reached for today.\n" +                f"Cooldown: {int(guild_rate)}" +            ) +            await send_embed(ctx, message) +            return False + +        return True + +    return check(predicate) + + +async def get_pod_pages(ctx: Context, bot: Bot, query: str) -> Optional[list[tuple[str, str]]]: +    """Get the Wolfram API pod pages for the provided query.""" +    async with ctx.typing(): +        params = { +            "input": query, +            "appid": APPID, +            "output": DEFAULT_OUTPUT_FORMAT, +            "format": "image,plaintext", +            "location": "the moon", +            "latlong": "0.0,0.0", +            "ip": "1.1.1.1" +        } +        request_url = QUERY.format(request="query") + +        async with bot.http_session.get(url=request_url, params=params) as response: +            json = await response.json(content_type="text/plain") + +        result = json["queryresult"] +        log_full_url = f"{request_url}?{urlencode(params)}" +        if result["error"]: +            # API key not set up correctly +            if result["error"]["msg"] == "Invalid appid": +                message = "Wolfram API key is invalid or missing." +                log.warning( +                    "API key seems to be missing, or invalid when " +                    f"processing a wolfram request: {log_full_url}, Response: {json}" +                ) +                await send_embed(ctx, message) +                return None + +            message = "Something went wrong internally with your request, please notify staff!" +            log.warning(f"Something went wrong getting a response from wolfram: {log_full_url}, Response: {json}") +            await send_embed(ctx, message) +            return None + +        if not result["success"]: +            message = f"I couldn't find anything for {query}." +            await send_embed(ctx, message) +            return None + +        if not result["numpods"]: +            message = "Could not find any results." +            await send_embed(ctx, message) +            return None + +        pods = result["pods"] +        pages = [] +        for pod in pods[:MAX_PODS]: +            subs = pod.get("subpods") + +            for sub in subs: +                title = sub.get("title") or sub.get("plaintext") or sub.get("id", "") +                img = sub["img"]["src"] +                pages.append((title, img)) +        return pages + + +class Wolfram(Cog): +    """Commands for interacting with the Wolfram|Alpha API.""" + +    def __init__(self, bot: Bot): +        self.bot = bot + +    @group(name="wolfram", aliases=("wolf", "wa"), invoke_without_command=True) +    @custom_cooldown(*STAFF_ROLES) +    async def wolfram_command(self, ctx: Context, *, query: str) -> None: +        """Requests all answers on a single image, sends an image of all related pods.""" +        params = { +            "i": query, +            "appid": APPID, +            "location": "the moon", +            "latlong": "0.0,0.0", +            "ip": "1.1.1.1" +        } +        request_url = QUERY.format(request="simple") + +        # Give feedback that the bot is working. +        async with ctx.typing(): +            async with self.bot.http_session.get(url=request_url, params=params) as response: +                status = response.status +                image_bytes = await response.read() + +            f = discord.File(BytesIO(image_bytes), filename="image.png") +            image_url = "attachment://image.png" + +            if status == 501: +                message = "Failed to get response." +                footer = "" +                color = Colours.soft_red +            elif status == 400: +                message = "No input found." +                footer = "" +                color = Colours.soft_red +            elif status == 403: +                message = "Wolfram API key is invalid or missing." +                footer = "" +                color = Colours.soft_red +            else: +                message = "" +                footer = "View original for a bigger picture." +                color = Colours.soft_orange + +            # Sends a "blank" embed if no request is received, unsure how to fix +            await send_embed(ctx, message, color, footer=footer, img_url=image_url, f=f) + +    @wolfram_command.command(name="page", aliases=("pa", "p")) +    @custom_cooldown(*STAFF_ROLES) +    async def wolfram_page_command(self, ctx: Context, *, query: str) -> None: +        """ +        Requests a drawn image of given query. + +        Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc. +        """ +        pages = await get_pod_pages(ctx, self.bot, query) + +        if not pages: +            return + +        embed = Embed() +        embed.set_author( +            name="Wolfram Alpha", +            icon_url=WOLF_IMAGE, +            url="https://www.wolframalpha.com/" +        ) +        embed.colour = Colours.soft_orange + +        await ImagePaginator.paginate(pages, ctx, embed) + +    @wolfram_command.command(name="cut", aliases=("c",)) +    @custom_cooldown(*STAFF_ROLES) +    async def wolfram_cut_command(self, ctx: Context, *, query: str) -> None: +        """ +        Requests a drawn image of given query. + +        Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc. +        """ +        pages = await get_pod_pages(ctx, self.bot, query) + +        if not pages: +            return + +        if len(pages) >= 2: +            page = pages[1] +        else: +            page = pages[0] + +        await send_embed(ctx, page[0], colour=Colours.soft_orange, img_url=page[1]) + +    @wolfram_command.command(name="short", aliases=("sh", "s")) +    @custom_cooldown(*STAFF_ROLES) +    async def wolfram_short_command(self, ctx: Context, *, query: str) -> None: +        """Requests an answer to a simple question.""" +        params = { +            "i": query, +            "appid": APPID, +            "location": "the moon", +            "latlong": "0.0,0.0", +            "ip": "1.1.1.1" +        } +        request_url = QUERY.format(request="result") + +        # Give feedback that the bot is working. +        async with ctx.typing(): +            async with self.bot.http_session.get(url=request_url, params=params) as response: +                status = response.status +                response_text = await response.text() + +            if status == 501: +                message = "Failed to get response." +                color = Colours.soft_red +            elif status == 400: +                message = "No input found." +                color = Colours.soft_red +            elif response_text == "Error 1: Invalid appid.": +                message = "Wolfram API key is invalid or missing." +                color = Colours.soft_red +            else: +                message = response_text +                color = Colours.soft_orange + +            await send_embed(ctx, message, color) + + +def setup(bot: Bot) -> None: +    """Load the Wolfram cog.""" +    bot.add_cog(Wolfram(bot)) | 
