diff options
Diffstat (limited to 'bot/exts/utilities')
-rw-r--r-- | bot/exts/utilities/__init__.py | 0 | ||||
-rw-r--r-- | bot/exts/utilities/bookmark.py | 153 | ||||
-rw-r--r-- | bot/exts/utilities/cheatsheet.py | 112 | ||||
-rw-r--r-- | bot/exts/utilities/conversationstarters.py | 69 | ||||
-rw-r--r-- | bot/exts/utilities/emoji.py | 123 | ||||
-rw-r--r-- | bot/exts/utilities/githubinfo.py | 178 | ||||
-rw-r--r-- | bot/exts/utilities/issues.py | 275 | ||||
-rw-r--r-- | bot/exts/utilities/latex.py | 101 | ||||
-rw-r--r-- | bot/exts/utilities/pythonfacts.py | 36 | ||||
-rw-r--r-- | bot/exts/utilities/realpython.py | 81 | ||||
-rw-r--r-- | bot/exts/utilities/reddit.py | 368 | ||||
-rw-r--r-- | bot/exts/utilities/stackoverflow.py | 88 | ||||
-rw-r--r-- | bot/exts/utilities/timed.py | 48 | ||||
-rw-r--r-- | bot/exts/utilities/wikipedia.py | 100 | ||||
-rw-r--r-- | bot/exts/utilities/wolfram.py | 293 |
15 files changed, 2025 insertions, 0 deletions
diff --git a/bot/exts/utilities/__init__.py b/bot/exts/utilities/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/bot/exts/utilities/__init__.py diff --git a/bot/exts/utilities/bookmark.py b/bot/exts/utilities/bookmark.py new file mode 100644 index 00000000..a91ef1c0 --- /dev/null +++ b/bot/exts/utilities/bookmark.py @@ -0,0 +1,153 @@ +import asyncio +import logging +import random +from typing import Optional + +import discord +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Categories, Colours, ERROR_REPLIES, Icons, WHITELISTED_CHANNELS +from bot.utils.converters import WrappedMessageConverter +from bot.utils.decorators import whitelist_override + +log = logging.getLogger(__name__) + +# Number of seconds to wait for other users to bookmark the same message +TIMEOUT = 120 +BOOKMARK_EMOJI = "📌" +WHITELISTED_CATEGORIES = (Categories.help_in_use,) + + +class Bookmark(commands.Cog): + """Creates personal bookmarks by relaying a message link to the user's DMs.""" + + def __init__(self, bot: Bot): + self.bot = bot + + @staticmethod + def build_bookmark_dm(target_message: discord.Message, title: str) -> discord.Embed: + """Build the embed to DM the bookmark requester.""" + embed = discord.Embed( + title=title, + description=target_message.content, + colour=Colours.soft_green + ) + embed.add_field( + name="Wanna give it a visit?", + value=f"[Visit original message]({target_message.jump_url})" + ) + embed.set_author(name=target_message.author, icon_url=target_message.author.display_avatar.url) + embed.set_thumbnail(url=Icons.bookmark) + + return embed + + @staticmethod + def build_error_embed(user: discord.Member) -> discord.Embed: + """Builds an error embed for when a bookmark requester has DMs disabled.""" + return discord.Embed( + title=random.choice(ERROR_REPLIES), + description=f"{user.mention}, please enable your DMs to receive the bookmark.", + colour=Colours.soft_red + ) + + async def action_bookmark( + self, + channel: discord.TextChannel, + user: discord.Member, + target_message: discord.Message, + title: str + ) -> None: + """Sends the bookmark DM, or sends an error embed when a user bookmarks a message.""" + try: + embed = self.build_bookmark_dm(target_message, title) + await user.send(embed=embed) + except discord.Forbidden: + error_embed = self.build_error_embed(user) + await channel.send(embed=error_embed) + else: + log.info(f"{user} bookmarked {target_message.jump_url} with title '{title}'") + + @staticmethod + async def send_reaction_embed( + channel: discord.TextChannel, + target_message: discord.Message + ) -> discord.Message: + """Sends an embed, with a reaction, so users can react to bookmark the message too.""" + message = await channel.send( + embed=discord.Embed( + description=( + f"React with {BOOKMARK_EMOJI} to be sent your very own bookmark to " + f"[this message]({target_message.jump_url})." + ), + colour=Colours.soft_green + ) + ) + + await message.add_reaction(BOOKMARK_EMOJI) + return message + + @whitelist_override(channels=WHITELISTED_CHANNELS, categories=WHITELISTED_CATEGORIES) + @commands.command(name="bookmark", aliases=("bm", "pin")) + async def bookmark( + self, + ctx: commands.Context, + target_message: Optional[WrappedMessageConverter], + *, + title: str = "Bookmark" + ) -> None: + """Send the author a link to `target_message` via DMs.""" + if not target_message: + if not ctx.message.reference: + raise commands.UserInputError("You must either provide a valid message to bookmark, or reply to one.") + target_message = ctx.message.reference.resolved + + # Prevent users from bookmarking a message in a channel they don't have access to + permissions = target_message.channel.permissions_for(ctx.author) + if not permissions.read_messages: + log.info(f"{ctx.author} tried to bookmark a message in #{target_message.channel} but has no permissions.") + embed = discord.Embed( + title=random.choice(ERROR_REPLIES), + color=Colours.soft_red, + description="You don't have permission to view this channel." + ) + await ctx.send(embed=embed) + return + + def event_check(reaction: discord.Reaction, user: discord.Member) -> bool: + """Make sure that this reaction is what we want to operate on.""" + return ( + # Conditions for a successful pagination: + all(( + # Reaction is on this message + reaction.message.id == reaction_message.id, + # User has not already bookmarked this message + user.id not in bookmarked_users, + # Reaction is the `BOOKMARK_EMOJI` emoji + str(reaction.emoji) == BOOKMARK_EMOJI, + # Reaction was not made by the Bot + user.id != self.bot.user.id + )) + ) + await self.action_bookmark(ctx.channel, ctx.author, target_message, title) + + # Keep track of who has already bookmarked, so users can't spam reactions and cause loads of DMs + bookmarked_users = [ctx.author.id] + reaction_message = await self.send_reaction_embed(ctx.channel, target_message) + + while True: + try: + _, user = await self.bot.wait_for("reaction_add", timeout=TIMEOUT, check=event_check) + except asyncio.TimeoutError: + log.debug("Timed out waiting for a reaction") + break + log.trace(f"{user} has successfully bookmarked from a reaction, attempting to DM them.") + await self.action_bookmark(ctx.channel, user, target_message, title) + bookmarked_users.append(user.id) + + await reaction_message.delete() + + +def setup(bot: Bot) -> None: + """Load the Bookmark cog.""" + bot.add_cog(Bookmark(bot)) diff --git a/bot/exts/utilities/cheatsheet.py b/bot/exts/utilities/cheatsheet.py new file mode 100644 index 00000000..33d29f67 --- /dev/null +++ b/bot/exts/utilities/cheatsheet.py @@ -0,0 +1,112 @@ +import random +import re +from typing import Union +from urllib.parse import quote_plus + +from discord import Embed +from discord.ext import commands +from discord.ext.commands import BucketType, Context + +from bot import constants +from bot.bot import Bot +from bot.constants import Categories, Channels, Colours, ERROR_REPLIES +from bot.utils.decorators import whitelist_override + +ERROR_MESSAGE = f""" +Unknown cheat sheet. Please try to reformulate your query. + +**Examples**: +```md +{constants.Client.prefix}cht read json +{constants.Client.prefix}cht hello world +{constants.Client.prefix}cht lambda +``` +If the problem persists send a message in <#{Channels.dev_contrib}> +""" + +URL = "https://cheat.sh/python/{search}" +ESCAPE_TT = str.maketrans({"`": "\\`"}) +ANSI_RE = re.compile(r"\x1b\[.*?m") +# We need to pass headers as curl otherwise it would default to aiohttp which would return raw html. +HEADERS = {"User-Agent": "curl/7.68.0"} + + +class CheatSheet(commands.Cog): + """Commands that sends a result of a cht.sh search in code blocks.""" + + def __init__(self, bot: Bot): + self.bot = bot + + @staticmethod + def fmt_error_embed() -> Embed: + """ + Format the Error Embed. + + If the cht.sh search returned 404, overwrite it to send a custom error embed. + link -> https://github.com/chubin/cheat.sh/issues/198 + """ + embed = Embed( + title=random.choice(ERROR_REPLIES), + description=ERROR_MESSAGE, + colour=Colours.soft_red + ) + return embed + + def result_fmt(self, url: str, body_text: str) -> tuple[bool, Union[str, Embed]]: + """Format Result.""" + if body_text.startswith("# 404 NOT FOUND"): + embed = self.fmt_error_embed() + return True, embed + + body_space = min(1986 - len(url), 1000) + + if len(body_text) > body_space: + description = ( + f"**Result Of cht.sh**\n" + f"```python\n{body_text[:body_space]}\n" + f"... (truncated - too many lines)\n```\n" + f"Full results: {url} " + ) + else: + description = ( + f"**Result Of cht.sh**\n" + f"```python\n{body_text}\n```\n" + f"{url}" + ) + return False, description + + @commands.command( + name="cheat", + aliases=("cht.sh", "cheatsheet", "cheat-sheet", "cht"), + ) + @commands.cooldown(1, 10, BucketType.user) + @whitelist_override(categories=[Categories.help_in_use]) + async def cheat_sheet(self, ctx: Context, *search_terms: str) -> None: + """ + Search cheat.sh. + + Gets a post from https://cheat.sh/python/ by default. + Usage: + --> .cht read json + """ + async with ctx.typing(): + search_string = quote_plus(" ".join(search_terms)) + + async with self.bot.http_session.get( + URL.format(search=search_string), headers=HEADERS + ) as response: + result = ANSI_RE.sub("", await response.text()).translate(ESCAPE_TT) + + is_embed, description = self.result_fmt( + URL.format(search=search_string), + result + ) + if is_embed: + await ctx.send(embed=description) + else: + await ctx.send(content=description) + + +def setup(bot: Bot) -> None: + """Load the CheatSheet cog.""" + bot.add_cog(CheatSheet(bot)) diff --git a/bot/exts/utilities/conversationstarters.py b/bot/exts/utilities/conversationstarters.py new file mode 100644 index 00000000..dd537022 --- /dev/null +++ b/bot/exts/utilities/conversationstarters.py @@ -0,0 +1,69 @@ +from pathlib import Path + +import yaml +from discord import Color, Embed +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import WHITELISTED_CHANNELS +from bot.utils.decorators import whitelist_override +from bot.utils.randomization import RandomCycle + +SUGGESTION_FORM = "https://forms.gle/zw6kkJqv8U43Nfjg9" + +with Path("bot/resources/utilities/starter.yaml").open("r", encoding="utf8") as f: + STARTERS = yaml.load(f, Loader=yaml.FullLoader) + +with Path("bot/resources/utilities/py_topics.yaml").open("r", encoding="utf8") as f: + # First ID is #python-general and the rest are top to bottom categories of Topical Chat/Help. + PY_TOPICS = yaml.load(f, Loader=yaml.FullLoader) + + # Removing `None` from lists of topics, if not a list, it is changed to an empty one. + PY_TOPICS = {k: [i for i in v if i] if isinstance(v, list) else [] for k, v in PY_TOPICS.items()} + + # All the allowed channels that the ".topic" command is allowed to be executed in. + ALL_ALLOWED_CHANNELS = list(PY_TOPICS.keys()) + list(WHITELISTED_CHANNELS) + +# Putting all topics into one dictionary and shuffling lists to reduce same-topic repetitions. +ALL_TOPICS = {"default": STARTERS, **PY_TOPICS} +TOPICS = { + channel: RandomCycle(topics or ["No topics found for this channel."]) + for channel, topics in ALL_TOPICS.items() +} + + +class ConvoStarters(commands.Cog): + """General conversation topics.""" + + @commands.command() + @whitelist_override(channels=ALL_ALLOWED_CHANNELS) + async def topic(self, ctx: commands.Context) -> None: + """ + Responds with a random topic to start a conversation. + + If in a Python channel, a python-related topic will be given. + + Otherwise, a random conversation topic will be received by the user. + """ + # No matter what, the form will be shown. + embed = Embed(description=f"Suggest more topics [here]({SUGGESTION_FORM})!", color=Color.blurple()) + + try: + # Fetching topics. + channel_topics = TOPICS[ctx.channel.id] + + # If the channel isn't Python-related. + except KeyError: + embed.title = f"**{next(TOPICS['default'])}**" + + # If the channel ID doesn't have any topics. + else: + embed.title = f"**{next(channel_topics)}**" + + finally: + await ctx.send(embed=embed) + + +def setup(bot: Bot) -> None: + """Load the ConvoStarters cog.""" + bot.add_cog(ConvoStarters()) diff --git a/bot/exts/utilities/emoji.py b/bot/exts/utilities/emoji.py new file mode 100644 index 00000000..55d6b8e9 --- /dev/null +++ b/bot/exts/utilities/emoji.py @@ -0,0 +1,123 @@ +import logging +import random +import textwrap +from collections import defaultdict +from datetime import datetime +from typing import Optional + +from discord import Color, Embed, Emoji +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Colours, ERROR_REPLIES +from bot.utils.extensions import invoke_help_command +from bot.utils.pagination import LinePaginator +from bot.utils.time import time_since + +log = logging.getLogger(__name__) + + +class Emojis(commands.Cog): + """A collection of commands related to emojis in the server.""" + + @staticmethod + def embed_builder(emoji: dict) -> tuple[Embed, list[str]]: + """Generates an embed with the emoji names and count.""" + embed = Embed( + color=Colours.orange, + title="Emoji Count", + timestamp=datetime.utcnow() + ) + msg = [] + + if len(emoji) == 1: + for category_name, category_emojis in emoji.items(): + if len(category_emojis) == 1: + msg.append(f"There is **{len(category_emojis)}** emoji in the **{category_name}** category.") + else: + msg.append(f"There are **{len(category_emojis)}** emojis in the **{category_name}** category.") + embed.set_thumbnail(url=random.choice(category_emojis).url) + + else: + for category_name, category_emojis in emoji.items(): + emoji_choice = random.choice(category_emojis) + if len(category_emojis) > 1: + emoji_info = f"There are **{len(category_emojis)}** emojis in the **{category_name}** category." + else: + emoji_info = f"There is **{len(category_emojis)}** emoji in the **{category_name}** category." + if emoji_choice.animated: + msg.append(f"<a:{emoji_choice.name}:{emoji_choice.id}> {emoji_info}") + else: + msg.append(f"<:{emoji_choice.name}:{emoji_choice.id}> {emoji_info}") + return embed, msg + + @staticmethod + def generate_invalid_embed(emojis: list[Emoji]) -> tuple[Embed, list[str]]: + """Generates error embed for invalid emoji categories.""" + embed = Embed( + color=Colours.soft_red, + title=random.choice(ERROR_REPLIES) + ) + msg = [] + + emoji_dict = defaultdict(list) + for emoji in emojis: + emoji_dict[emoji.name.split("_")[0]].append(emoji) + + error_comp = ", ".join(emoji_dict) + msg.append(f"These are the valid emoji categories:\n```\n{error_comp}\n```") + return embed, msg + + @commands.group(name="emoji", invoke_without_command=True) + async def emoji_group(self, ctx: commands.Context, emoji: Optional[Emoji]) -> None: + """A group of commands related to emojis.""" + if emoji is not None: + await ctx.invoke(self.info_command, emoji) + else: + await invoke_help_command(ctx) + + @emoji_group.command(name="count", aliases=("c",)) + async def count_command(self, ctx: commands.Context, *, category_query: str = None) -> None: + """Returns embed with emoji category and info given by the user.""" + emoji_dict = defaultdict(list) + + if not ctx.guild.emojis: + await ctx.send("No emojis found.") + return + log.trace(f"Emoji Category {'' if category_query else 'not '}provided by the user.") + for emoji in ctx.guild.emojis: + emoji_category = emoji.name.split("_")[0] + + if category_query is not None and emoji_category not in category_query: + continue + + emoji_dict[emoji_category].append(emoji) + + if not emoji_dict: + log.trace("Invalid name provided by the user") + embed, msg = self.generate_invalid_embed(ctx.guild.emojis) + else: + embed, msg = self.embed_builder(emoji_dict) + await LinePaginator.paginate(lines=msg, ctx=ctx, embed=embed) + + @emoji_group.command(name="info", aliases=("i",)) + async def info_command(self, ctx: commands.Context, emoji: Emoji) -> None: + """Returns relevant information about a Discord Emoji.""" + emoji_information = Embed( + title=f"Emoji Information: {emoji.name}", + description=textwrap.dedent(f""" + **Name:** {emoji.name} + **Created:** {time_since(emoji.created_at, precision="hours")} + **Date:** {datetime.strftime(emoji.created_at, "%d/%m/%Y")} + **ID:** {emoji.id} + """), + color=Color.blurple(), + url=str(emoji.url), + ).set_thumbnail(url=emoji.url) + + await ctx.send(embed=emoji_information) + + +def setup(bot: Bot) -> None: + """Load the Emojis cog.""" + bot.add_cog(Emojis()) diff --git a/bot/exts/utilities/githubinfo.py b/bot/exts/utilities/githubinfo.py new file mode 100644 index 00000000..d00b408d --- /dev/null +++ b/bot/exts/utilities/githubinfo.py @@ -0,0 +1,178 @@ +import logging +import random +from datetime import datetime +from urllib.parse import quote, quote_plus + +import discord +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Colours, NEGATIVE_REPLIES +from bot.exts.core.extensions import invoke_help_command + +log = logging.getLogger(__name__) + +GITHUB_API_URL = "https://api.github.com" + + +class GithubInfo(commands.Cog): + """Fetches info from GitHub.""" + + def __init__(self, bot: Bot): + self.bot = bot + + async def fetch_data(self, url: str) -> dict: + """Retrieve data as a dictionary.""" + async with self.bot.http_session.get(url) as r: + return await r.json() + + @commands.group(name="github", aliases=("gh", "git")) + @commands.cooldown(1, 10, commands.BucketType.user) + async def github_group(self, ctx: commands.Context) -> None: + """Commands for finding information related to GitHub.""" + if ctx.invoked_subcommand is None: + await invoke_help_command(ctx) + + @github_group.command(name="user", aliases=("userinfo",)) + async def github_user_info(self, ctx: commands.Context, username: str) -> None: + """Fetches a user's GitHub information.""" + async with ctx.typing(): + user_data = await self.fetch_data(f"{GITHUB_API_URL}/users/{quote_plus(username)}") + + # User_data will not have a message key if the user exists + if "message" in user_data: + embed = discord.Embed( + title=random.choice(NEGATIVE_REPLIES), + description=f"The profile for `{username}` was not found.", + colour=Colours.soft_red + ) + + await ctx.send(embed=embed) + return + + org_data = await self.fetch_data(user_data["organizations_url"]) + orgs = [f"[{org['login']}](https://github.com/{org['login']})" for org in org_data] + orgs_to_add = " | ".join(orgs) + + gists = user_data["public_gists"] + + # Forming blog link + if user_data["blog"].startswith("http"): # Blog link is complete + blog = user_data["blog"] + elif user_data["blog"]: # Blog exists but the link is not complete + blog = f"https://{user_data['blog']}" + else: + blog = "No website link available" + + embed = discord.Embed( + title=f"`{user_data['login']}`'s GitHub profile info", + description=f"```\n{user_data['bio']}\n```\n" if user_data["bio"] else "", + colour=discord.Colour.blurple(), + url=user_data["html_url"], + timestamp=datetime.strptime(user_data["created_at"], "%Y-%m-%dT%H:%M:%SZ") + ) + embed.set_thumbnail(url=user_data["avatar_url"]) + embed.set_footer(text="Account created at") + + if user_data["type"] == "User": + + embed.add_field( + name="Followers", + value=f"[{user_data['followers']}]({user_data['html_url']}?tab=followers)" + ) + embed.add_field( + name="Following", + value=f"[{user_data['following']}]({user_data['html_url']}?tab=following)" + ) + + embed.add_field( + name="Public repos", + value=f"[{user_data['public_repos']}]({user_data['html_url']}?tab=repositories)" + ) + + if user_data["type"] == "User": + embed.add_field( + name="Gists", + value=f"[{gists}](https://gist.github.com/{quote_plus(username, safe='')})" + ) + + embed.add_field( + name=f"Organization{'s' if len(orgs)!=1 else ''}", + value=orgs_to_add if orgs else "No organizations." + ) + embed.add_field(name="Website", value=blog) + + await ctx.send(embed=embed) + + @github_group.command(name='repository', aliases=('repo',)) + async def github_repo_info(self, ctx: commands.Context, *repo: str) -> None: + """ + Fetches a repositories' GitHub information. + + The repository should look like `user/reponame` or `user reponame`. + """ + repo = "/".join(repo) + if repo.count("/") != 1: + embed = discord.Embed( + title=random.choice(NEGATIVE_REPLIES), + description="The repository should look like `user/reponame` or `user reponame`.", + colour=Colours.soft_red + ) + + await ctx.send(embed=embed) + return + + async with ctx.typing(): + repo_data = await self.fetch_data(f"{GITHUB_API_URL}/repos/{quote(repo)}") + + # There won't be a message key if this repo exists + if "message" in repo_data: + embed = discord.Embed( + title=random.choice(NEGATIVE_REPLIES), + description="The requested repository was not found.", + colour=Colours.soft_red + ) + + await ctx.send(embed=embed) + return + + embed = discord.Embed( + title=repo_data["name"], + description=repo_data["description"], + colour=discord.Colour.blurple(), + url=repo_data["html_url"] + ) + + # If it's a fork, then it will have a parent key + try: + parent = repo_data["parent"] + embed.description += f"\n\nForked from [{parent['full_name']}]({parent['html_url']})" + except KeyError: + log.debug("Repository is not a fork.") + + repo_owner = repo_data["owner"] + + embed.set_author( + name=repo_owner["login"], + url=repo_owner["html_url"], + icon_url=repo_owner["avatar_url"] + ) + + repo_created_at = datetime.strptime(repo_data["created_at"], "%Y-%m-%dT%H:%M:%SZ").strftime("%d/%m/%Y") + last_pushed = datetime.strptime(repo_data["pushed_at"], "%Y-%m-%dT%H:%M:%SZ").strftime("%d/%m/%Y at %H:%M") + + embed.set_footer( + text=( + f"{repo_data['forks_count']} ⑂ " + f"• {repo_data['stargazers_count']} ⭐ " + f"• Created At {repo_created_at} " + f"• Last Commit {last_pushed}" + ) + ) + + await ctx.send(embed=embed) + + +def setup(bot: Bot) -> None: + """Load the GithubInfo cog.""" + bot.add_cog(GithubInfo(bot)) diff --git a/bot/exts/utilities/issues.py b/bot/exts/utilities/issues.py new file mode 100644 index 00000000..8a7ebed0 --- /dev/null +++ b/bot/exts/utilities/issues.py @@ -0,0 +1,275 @@ +import logging +import random +import re +from dataclasses import dataclass +from typing import Optional, Union + +import discord +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import ( + Categories, + Channels, + Colours, + ERROR_REPLIES, + Emojis, + NEGATIVE_REPLIES, + Tokens, + WHITELISTED_CHANNELS +) +from bot.utils.decorators import whitelist_override +from bot.utils.extensions import invoke_help_command + +log = logging.getLogger(__name__) + +BAD_RESPONSE = { + 404: "Issue/pull request not located! Please enter a valid number!", + 403: "Rate limit has been hit! Please try again later!" +} +REQUEST_HEADERS = { + "Accept": "application/vnd.github.v3+json" +} + +REPOSITORY_ENDPOINT = "https://api.github.com/orgs/{org}/repos?per_page=100&type=public" +ISSUE_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/issues/{number}" +PR_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/pulls/{number}" + +if GITHUB_TOKEN := Tokens.github: + REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}" + +WHITELISTED_CATEGORIES = ( + Categories.development, Categories.devprojects, Categories.media, Categories.staff +) + +CODE_BLOCK_RE = re.compile( + r"^`([^`\n]+)`" # Inline codeblock + r"|```(.+?)```", # Multiline codeblock + re.DOTALL | re.MULTILINE +) + +# Maximum number of issues in one message +MAXIMUM_ISSUES = 5 + +# Regex used when looking for automatic linking in messages +# regex101 of current regex https://regex101.com/r/V2ji8M/6 +AUTOMATIC_REGEX = re.compile( + r"((?P<org>[a-zA-Z0-9][a-zA-Z0-9\-]{1,39})\/)?(?P<repo>[\w\-\.]{1,100})#(?P<number>[0-9]+)" +) + + +@dataclass +class FoundIssue: + """Dataclass representing an issue found by the regex.""" + + organisation: Optional[str] + repository: str + number: str + + def __hash__(self) -> int: + return hash((self.organisation, self.repository, self.number)) + + +@dataclass +class FetchError: + """Dataclass representing an error while fetching an issue.""" + + return_code: int + message: str + + +@dataclass +class IssueState: + """Dataclass representing the state of an issue.""" + + repository: str + number: int + url: str + title: str + emoji: str + + +class Issues(commands.Cog): + """Cog that allows users to retrieve issues from GitHub.""" + + def __init__(self, bot: Bot): + self.bot = bot + self.repos = [] + + @staticmethod + def remove_codeblocks(message: str) -> str: + """Remove any codeblock in a message.""" + return re.sub(CODE_BLOCK_RE, "", message) + + async def fetch_issues( + self, + number: int, + repository: str, + user: str + ) -> Union[IssueState, FetchError]: + """ + Retrieve an issue from a GitHub repository. + + Returns IssueState on success, FetchError on failure. + """ + url = ISSUE_ENDPOINT.format(user=user, repository=repository, number=number) + pulls_url = PR_ENDPOINT.format(user=user, repository=repository, number=number) + log.trace(f"Querying GH issues API: {url}") + + async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r: + json_data = await r.json() + + if r.status == 403: + if r.headers.get("X-RateLimit-Remaining") == "0": + log.info(f"Ratelimit reached while fetching {url}") + return FetchError(403, "Ratelimit reached, please retry in a few minutes.") + return FetchError(403, "Cannot access issue.") + elif r.status in (404, 410): + return FetchError(r.status, "Issue not found.") + elif r.status != 200: + return FetchError(r.status, "Error while fetching issue.") + + # The initial API request is made to the issues API endpoint, which will return information + # if the issue or PR is present. However, the scope of information returned for PRs differs + # from issues: if the 'issues' key is present in the response then we can pull the data we + # need from the initial API call. + if "issues" in json_data["html_url"]: + if json_data.get("state") == "open": + emoji = Emojis.issue_open + else: + emoji = Emojis.issue_closed + + # If the 'issues' key is not contained in the API response and there is no error code, then + # we know that a PR has been requested and a call to the pulls API endpoint is necessary + # to get the desired information for the PR. + else: + log.trace(f"PR provided, querying GH pulls API for additional information: {pulls_url}") + async with self.bot.http_session.get(pulls_url) as p: + pull_data = await p.json() + if pull_data["draft"]: + emoji = Emojis.pull_request_draft + elif pull_data["state"] == "open": + emoji = Emojis.pull_request_open + # When 'merged_at' is not None, this means that the state of the PR is merged + elif pull_data["merged_at"] is not None: + emoji = Emojis.pull_request_merged + else: + emoji = Emojis.pull_request_closed + + issue_url = json_data.get("html_url") + + return IssueState(repository, number, issue_url, json_data.get("title", ""), emoji) + + @staticmethod + def format_embed( + results: list[Union[IssueState, FetchError]], + user: str, + repository: Optional[str] = None + ) -> discord.Embed: + """Take a list of IssueState or FetchError and format a Discord embed for them.""" + description_list = [] + + for result in results: + if isinstance(result, IssueState): + description_list.append(f"{result.emoji} [{result.title}]({result.url})") + elif isinstance(result, FetchError): + description_list.append(f":x: [{result.return_code}] {result.message}") + + resp = discord.Embed( + colour=Colours.bright_green, + description="\n".join(description_list) + ) + + embed_url = f"https://github.com/{user}/{repository}" if repository else f"https://github.com/{user}" + resp.set_author(name="GitHub", url=embed_url) + return resp + + @whitelist_override(channels=WHITELISTED_CHANNELS, categories=WHITELISTED_CATEGORIES) + @commands.command(aliases=("pr",)) + async def issue( + self, + ctx: commands.Context, + numbers: commands.Greedy[int], + repository: str = "sir-lancebot", + user: str = "python-discord" + ) -> None: + """Command to retrieve issue(s) from a GitHub repository.""" + # Remove duplicates + numbers = set(numbers) + + if len(numbers) > MAXIMUM_ISSUES: + embed = discord.Embed( + title=random.choice(ERROR_REPLIES), + color=Colours.soft_red, + description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})" + ) + await ctx.send(embed=embed) + await invoke_help_command(ctx) + + results = [await self.fetch_issues(number, repository, user) for number in numbers] + await ctx.send(embed=self.format_embed(results, user, repository)) + + @commands.Cog.listener() + async def on_message(self, message: discord.Message) -> None: + """ + Automatic issue linking. + + Listener to retrieve issue(s) from a GitHub repository using automatic linking if matching <org>/<repo>#<issue>. + """ + # Ignore bots + if message.author.bot: + return + + issues = [ + FoundIssue(*match.group("org", "repo", "number")) + for match in AUTOMATIC_REGEX.finditer(self.remove_codeblocks(message.content)) + ] + links = [] + + if issues: + # Block this from working in DMs + if not message.guild: + await message.channel.send( + embed=discord.Embed( + title=random.choice(NEGATIVE_REPLIES), + description=( + "You can't retrieve issues from DMs. " + f"Try again in <#{Channels.community_bot_commands}>" + ), + colour=Colours.soft_red + ) + ) + return + + log.trace(f"Found {issues = }") + # Remove duplicates + issues = set(issues) + + if len(issues) > MAXIMUM_ISSUES: + embed = discord.Embed( + title=random.choice(ERROR_REPLIES), + color=Colours.soft_red, + description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})" + ) + await message.channel.send(embed=embed, delete_after=5) + return + + for repo_issue in issues: + result = await self.fetch_issues( + int(repo_issue.number), + repo_issue.repository, + repo_issue.organisation or "python-discord" + ) + if isinstance(result, IssueState): + links.append(result) + + if not links: + return + + resp = self.format_embed(links, "python-discord") + await message.channel.send(embed=resp) + + +def setup(bot: Bot) -> None: + """Load the Issues cog.""" + bot.add_cog(Issues(bot)) diff --git a/bot/exts/utilities/latex.py b/bot/exts/utilities/latex.py new file mode 100644 index 00000000..36c7e0ab --- /dev/null +++ b/bot/exts/utilities/latex.py @@ -0,0 +1,101 @@ +import asyncio +import hashlib +import pathlib +import re +from concurrent.futures import ThreadPoolExecutor +from io import BytesIO + +import discord +import matplotlib.pyplot as plt +from discord.ext import commands + +from bot.bot import Bot + +# configure fonts and colors for matplotlib +plt.rcParams.update( + { + "font.size": 16, + "mathtext.fontset": "cm", # Computer Modern font set + "mathtext.rm": "serif", + "figure.facecolor": "36393F", # matches Discord's dark mode background color + "text.color": "white", + } +) + +FORMATTED_CODE_REGEX = re.compile( + r"(?P<delim>(?P<block>```)|``?)" # code delimiter: 1-3 backticks; (?P=block) only matches if it's a block + r"(?(block)(?:(?P<lang>[a-z]+)\n)?)" # if we're in a block, match optional language (only letters plus newline) + r"(?:[ \t]*\n)*" # any blank (empty or tabs/spaces only) lines before the code + r"(?P<code>.*?)" # extract all code inside the markup + r"\s*" # any more whitespace before the end of the code markup + r"(?P=delim)", # match the exact same delimiter from the start again + re.DOTALL | re.IGNORECASE, # "." also matches newlines, case insensitive +) + +CACHE_DIRECTORY = pathlib.Path("_latex_cache") +CACHE_DIRECTORY.mkdir(exist_ok=True) + + +class Latex(commands.Cog): + """Renders latex.""" + + @staticmethod + def _render(text: str, filepath: pathlib.Path) -> BytesIO: + """ + Return the rendered image if latex compiles without errors, otherwise raise a BadArgument Exception. + + Saves rendered image to cache. + """ + fig = plt.figure() + rendered_image = BytesIO() + fig.text(0, 1, text, horizontalalignment="left", verticalalignment="top") + + try: + plt.savefig(rendered_image, bbox_inches="tight", dpi=600) + except ValueError as e: + raise commands.BadArgument(str(e)) + + rendered_image.seek(0) + + with open(filepath, "wb") as f: + f.write(rendered_image.getbuffer()) + + return rendered_image + + @staticmethod + def _prepare_input(text: str) -> str: + text = text.replace(r"\\", "$\n$") # matplotlib uses \n for newlines, not \\ + + if match := FORMATTED_CODE_REGEX.match(text): + return match.group("code") + else: + return text + + @commands.command() + @commands.max_concurrency(1, commands.BucketType.guild, wait=True) + async def latex(self, ctx: commands.Context, *, text: str) -> None: + """Renders the text in latex and sends the image.""" + text = self._prepare_input(text) + query_hash = hashlib.md5(text.encode()).hexdigest() + image_path = CACHE_DIRECTORY.joinpath(f"{query_hash}.png") + async with ctx.typing(): + if image_path.exists(): + await ctx.send(file=discord.File(image_path)) + return + + with ThreadPoolExecutor() as pool: + image = await asyncio.get_running_loop().run_in_executor( + pool, self._render, text, image_path + ) + + await ctx.send(file=discord.File(image, "latex.png")) + + +def setup(bot: Bot) -> None: + """Load the Latex Cog.""" + # As we have resource issues on this cog, + # we have it currently disabled while we fix it. + import logging + logging.info("Latex cog is currently disabled. It won't be loaded.") + return + bot.add_cog(Latex()) diff --git a/bot/exts/utilities/pythonfacts.py b/bot/exts/utilities/pythonfacts.py new file mode 100644 index 00000000..ef190185 --- /dev/null +++ b/bot/exts/utilities/pythonfacts.py @@ -0,0 +1,36 @@ +import itertools + +import discord +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Colours + +with open("bot/resources/utilities/python_facts.txt") as file: + FACTS = itertools.cycle(list(file)) + +COLORS = itertools.cycle([Colours.python_blue, Colours.python_yellow]) +PYFACTS_DISCUSSION = "https://github.com/python-discord/meta/discussions/93" + + +class PythonFacts(commands.Cog): + """Sends a random fun fact about Python.""" + + @commands.command(name="pythonfact", aliases=("pyfact",)) + async def get_python_fact(self, ctx: commands.Context) -> None: + """Sends a Random fun fact about Python.""" + embed = discord.Embed( + title="Python Facts", + description=next(FACTS), + colour=next(COLORS) + ) + embed.add_field( + name="Suggestions", + value=f"Suggest more facts [here!]({PYFACTS_DISCUSSION})" + ) + await ctx.send(embed=embed) + + +def setup(bot: Bot) -> None: + """Load the PythonFacts Cog.""" + bot.add_cog(PythonFacts()) diff --git a/bot/exts/utilities/realpython.py b/bot/exts/utilities/realpython.py new file mode 100644 index 00000000..ef8b2638 --- /dev/null +++ b/bot/exts/utilities/realpython.py @@ -0,0 +1,81 @@ +import logging +from html import unescape +from urllib.parse import quote_plus + +from discord import Embed +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Colours + +logger = logging.getLogger(__name__) + + +API_ROOT = "https://realpython.com/search/api/v1/" +ARTICLE_URL = "https://realpython.com{article_url}" +SEARCH_URL = "https://realpython.com/search?q={user_search}" + + +ERROR_EMBED = Embed( + title="Error while searching Real Python", + description="There was an error while trying to reach Real Python. Please try again shortly.", + color=Colours.soft_red, +) + + +class RealPython(commands.Cog): + """User initiated command to search for a Real Python article.""" + + def __init__(self, bot: Bot): + self.bot = bot + + @commands.command(aliases=["rp"]) + @commands.cooldown(1, 10, commands.cooldowns.BucketType.user) + async def realpython(self, ctx: commands.Context, *, user_search: str) -> None: + """Send 5 articles that match the user's search terms.""" + params = {"q": user_search, "limit": 5, "kind": "article"} + async with self.bot.http_session.get(url=API_ROOT, params=params) as response: + if response.status != 200: + logger.error( + f"Unexpected status code {response.status} from Real Python" + ) + await ctx.send(embed=ERROR_EMBED) + return + + data = await response.json() + + articles = data["results"] + + if len(articles) == 0: + no_articles = Embed( + title=f"No articles found for '{user_search}'", color=Colours.soft_red + ) + await ctx.send(embed=no_articles) + return + + if len(articles) == 1: + article_description = "Here is the result:" + else: + article_description = f"Here are the top {len(articles)} results:" + + article_embed = Embed( + title="Search results - Real Python", + url=SEARCH_URL.format(user_search=quote_plus(user_search)), + description=article_description, + color=Colours.orange, + ) + + for article in articles: + article_embed.add_field( + name=unescape(article["title"]), + value=ARTICLE_URL.format(article_url=article["url"]), + inline=False, + ) + article_embed.set_footer(text="Click the links to go to the articles.") + + await ctx.send(embed=article_embed) + + +def setup(bot: Bot) -> None: + """Load the Real Python Cog.""" + bot.add_cog(RealPython(bot)) diff --git a/bot/exts/utilities/reddit.py b/bot/exts/utilities/reddit.py new file mode 100644 index 00000000..e6cb5337 --- /dev/null +++ b/bot/exts/utilities/reddit.py @@ -0,0 +1,368 @@ +import asyncio +import logging +import random +import textwrap +from collections import namedtuple +from datetime import datetime, timedelta +from typing import Union + +from aiohttp import BasicAuth, ClientError +from discord import Colour, Embed, TextChannel +from discord.ext.commands import Cog, Context, group, has_any_role +from discord.ext.tasks import loop +from discord.utils import escape_markdown, sleep_until + +from bot.bot import Bot +from bot.constants import Channels, ERROR_REPLIES, Emojis, Reddit as RedditConfig, STAFF_ROLES +from bot.utils.converters import Subreddit +from bot.utils.extensions import invoke_help_command +from bot.utils.messages import sub_clyde +from bot.utils.pagination import ImagePaginator, LinePaginator + +log = logging.getLogger(__name__) + +AccessToken = namedtuple("AccessToken", ["token", "expires_at"]) + + +class Reddit(Cog): + """Track subreddit posts and show detailed statistics about them.""" + + HEADERS = {"User-Agent": "python3:python-discord/bot:1.0.0 (by /u/PythonDiscord)"} + URL = "https://www.reddit.com" + OAUTH_URL = "https://oauth.reddit.com" + MAX_RETRIES = 3 + + def __init__(self, bot: Bot): + self.bot = bot + + self.webhook = None + self.access_token = None + self.client_auth = BasicAuth(RedditConfig.client_id, RedditConfig.secret) + + bot.loop.create_task(self.init_reddit_ready()) + self.auto_poster_loop.start() + + def cog_unload(self) -> None: + """Stop the loop task and revoke the access token when the cog is unloaded.""" + self.auto_poster_loop.cancel() + if self.access_token and self.access_token.expires_at > datetime.utcnow(): + asyncio.create_task(self.revoke_access_token()) + + async def init_reddit_ready(self) -> None: + """Sets the reddit webhook when the cog is loaded.""" + await self.bot.wait_until_guild_available() + if not self.webhook: + self.webhook = await self.bot.fetch_webhook(RedditConfig.webhook) + + @property + def channel(self) -> TextChannel: + """Get the #reddit channel object from the bot's cache.""" + return self.bot.get_channel(Channels.reddit) + + def build_pagination_pages(self, posts: list[dict], paginate: bool) -> Union[list[tuple], str]: + """Build embed pages required for Paginator.""" + pages = [] + first_page = "" + for post in posts: + post_page = "" + image_url = "" + + data = post["data"] + + title = textwrap.shorten(data["title"], width=50, placeholder="...") + + # Normal brackets interfere with Markdown. + title = escape_markdown(title).replace("[", "⦋").replace("]", "⦌") + link = self.URL + data["permalink"] + + first_page += f"**[{title.replace('*', '')}]({link})**\n" + + text = data["selftext"] + if text: + text = escape_markdown(text).replace("[", "⦋").replace("]", "⦌") + first_page += textwrap.shorten(text, width=100, placeholder="...") + "\n" + + ups = data["ups"] + comments = data["num_comments"] + author = data["author"] + + content_type = Emojis.reddit_post_text + if data["is_video"] or {"youtube", "youtu.be"}.issubset(set(data["url"].split("."))): + # This means the content type in the post is a video. + content_type = f"{Emojis.reddit_post_video}" + + elif data["url"].endswith(("jpg", "png", "gif")): + # This means the content type in the post is an image. + content_type = f"{Emojis.reddit_post_photo}" + image_url = data["url"] + + first_page += ( + f"{content_type}\u2003{Emojis.reddit_upvote}{ups}\u2003{Emojis.reddit_comments}" + f"\u2002{comments}\u2003{Emojis.reddit_users}{author}\n\n" + ) + + if paginate: + post_page += f"**[{title}]({link})**\n\n" + if text: + post_page += textwrap.shorten(text, width=252, placeholder="...") + "\n\n" + post_page += ( + f"{content_type}\u2003{Emojis.reddit_upvote}{ups}\u2003{Emojis.reddit_comments}\u2002" + f"{comments}\u2003{Emojis.reddit_users}{author}" + ) + + pages.append((post_page, image_url)) + + if not paginate: + # Return the first summery page if pagination is not required + return first_page + + pages.insert(0, (first_page, "")) # Using image paginator, hence settings image url to empty string + return pages + + async def get_access_token(self) -> None: + """ + Get a Reddit API OAuth2 access token and assign it to self.access_token. + + A token is valid for 1 hour. There will be MAX_RETRIES to get a token, after which the cog + will be unloaded and a ClientError raised if retrieval was still unsuccessful. + """ + for i in range(1, self.MAX_RETRIES + 1): + response = await self.bot.http_session.post( + url=f"{self.URL}/api/v1/access_token", + headers=self.HEADERS, + auth=self.client_auth, + data={ + "grant_type": "client_credentials", + "duration": "temporary" + } + ) + + if response.status == 200 and response.content_type == "application/json": + content = await response.json() + expiration = int(content["expires_in"]) - 60 # Subtract 1 minute for leeway. + self.access_token = AccessToken( + token=content["access_token"], + expires_at=datetime.utcnow() + timedelta(seconds=expiration) + ) + + log.debug(f"New token acquired; expires on UTC {self.access_token.expires_at}") + return + else: + log.debug( + f"Failed to get an access token: " + f"status {response.status} & content type {response.content_type}; " + f"retrying ({i}/{self.MAX_RETRIES})" + ) + + await asyncio.sleep(3) + + self.bot.remove_cog(self.qualified_name) + raise ClientError("Authentication with the Reddit API failed. Unloading the cog.") + + async def revoke_access_token(self) -> None: + """ + Revoke the OAuth2 access token for the Reddit API. + + For security reasons, it's good practice to revoke the token when it's no longer being used. + """ + response = await self.bot.http_session.post( + url=f"{self.URL}/api/v1/revoke_token", + headers=self.HEADERS, + auth=self.client_auth, + data={ + "token": self.access_token.token, + "token_type_hint": "access_token" + } + ) + + if response.status in [200, 204] and response.content_type == "application/json": + self.access_token = None + else: + log.warning(f"Unable to revoke access token: status {response.status}.") + + async def fetch_posts(self, route: str, *, amount: int = 25, params: dict = None) -> list[dict]: + """A helper method to fetch a certain amount of Reddit posts at a given route.""" + # Reddit's JSON responses only provide 25 posts at most. + if not 25 >= amount > 0: + raise ValueError("Invalid amount of subreddit posts requested.") + + # Renew the token if necessary. + if not self.access_token or self.access_token.expires_at < datetime.utcnow(): + await self.get_access_token() + + url = f"{self.OAUTH_URL}/{route}" + for _ in range(self.MAX_RETRIES): + response = await self.bot.http_session.get( + url=url, + headers={**self.HEADERS, "Authorization": f"bearer {self.access_token.token}"}, + params=params + ) + if response.status == 200 and response.content_type == 'application/json': + # Got appropriate response - process and return. + content = await response.json() + posts = content["data"]["children"] + + filtered_posts = [post for post in posts if not post["data"]["over_18"]] + + return filtered_posts[:amount] + + await asyncio.sleep(3) + + log.debug(f"Invalid response from: {url} - status code {response.status}, mimetype {response.content_type}") + return list() # Failed to get appropriate response within allowed number of retries. + + async def get_top_posts( + self, subreddit: Subreddit, time: str = "all", amount: int = 5, paginate: bool = False + ) -> Union[Embed, list[tuple]]: + """ + Get the top amount of posts for a given subreddit within a specified timeframe. + + A time of "all" will get posts from all time, "day" will get top daily posts and "week" will get the top + weekly posts. + + The amount should be between 0 and 25 as Reddit's JSON requests only provide 25 posts at most. + """ + embed = Embed() + + posts = await self.fetch_posts( + route=f"{subreddit}/top", + amount=amount, + params={"t": time} + ) + if not posts: + embed.title = random.choice(ERROR_REPLIES) + embed.colour = Colour.red() + embed.description = ( + "Sorry! We couldn't find any SFW posts from that subreddit. " + "If this problem persists, please let us know." + ) + + return embed + + if paginate: + return self.build_pagination_pages(posts, paginate=True) + + # Use only starting summary page for #reddit channel posts. + embed.description = self.build_pagination_pages(posts, paginate=False) + embed.colour = Colour.blurple() + return embed + + @loop() + async def auto_poster_loop(self) -> None: + """Post the top 5 posts daily, and the top 5 posts weekly.""" + # once d.py get support for `time` parameter in loop decorator, + # this can be removed and the loop can use the `time=datetime.time.min` parameter + now = datetime.utcnow() + tomorrow = now + timedelta(days=1) + midnight_tomorrow = tomorrow.replace(hour=0, minute=0, second=0) + + await sleep_until(midnight_tomorrow) + + await self.bot.wait_until_guild_available() + if not self.webhook: + await self.bot.fetch_webhook(RedditConfig.webhook) + + if datetime.utcnow().weekday() == 0: + await self.top_weekly_posts() + # if it's a monday send the top weekly posts + + for subreddit in RedditConfig.subreddits: + top_posts = await self.get_top_posts(subreddit=subreddit, time="day") + username = sub_clyde(f"{subreddit} Top Daily Posts") + message = await self.webhook.send(username=username, embed=top_posts, wait=True) + + if message.channel.is_news(): + await message.publish() + + async def top_weekly_posts(self) -> None: + """Post a summary of the top posts.""" + for subreddit in RedditConfig.subreddits: + # Send and pin the new weekly posts. + top_posts = await self.get_top_posts(subreddit=subreddit, time="week") + username = sub_clyde(f"{subreddit} Top Weekly Posts") + message = await self.webhook.send(wait=True, username=username, embed=top_posts) + + if subreddit.lower() == "r/python": + if not self.channel: + log.warning("Failed to get #reddit channel to remove pins in the weekly loop.") + return + + # Remove the oldest pins so that only 12 remain at most. + pins = await self.channel.pins() + + while len(pins) >= 12: + await pins[-1].unpin() + del pins[-1] + + await message.pin() + + if message.channel.is_news(): + await message.publish() + + @group(name="reddit", invoke_without_command=True) + async def reddit_group(self, ctx: Context) -> None: + """View the top posts from various subreddits.""" + await invoke_help_command(ctx) + + @reddit_group.command(name="top") + async def top_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None: + """Send the top posts of all time from a given subreddit.""" + async with ctx.typing(): + pages = await self.get_top_posts(subreddit=subreddit, time="all", paginate=True) + + await ctx.send(f"Here are the top {subreddit} posts of all time!") + embed = Embed( + color=Colour.blurple() + ) + + await ImagePaginator.paginate(pages, ctx, embed) + + @reddit_group.command(name="daily") + async def daily_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None: + """Send the top posts of today from a given subreddit.""" + async with ctx.typing(): + pages = await self.get_top_posts(subreddit=subreddit, time="day", paginate=True) + + await ctx.send(f"Here are today's top {subreddit} posts!") + embed = Embed( + color=Colour.blurple() + ) + + await ImagePaginator.paginate(pages, ctx, embed) + + @reddit_group.command(name="weekly") + async def weekly_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None: + """Send the top posts of this week from a given subreddit.""" + async with ctx.typing(): + pages = await self.get_top_posts(subreddit=subreddit, time="week", paginate=True) + + await ctx.send(f"Here are this week's top {subreddit} posts!") + embed = Embed( + color=Colour.blurple() + ) + + await ImagePaginator.paginate(pages, ctx, embed) + + @has_any_role(*STAFF_ROLES) + @reddit_group.command(name="subreddits", aliases=("subs",)) + async def subreddits_command(self, ctx: Context) -> None: + """Send a paginated embed of all the subreddits we're relaying.""" + embed = Embed() + embed.title = "Relayed subreddits." + embed.colour = Colour.blurple() + + await LinePaginator.paginate( + RedditConfig.subreddits, + ctx, embed, + footer_text="Use the reddit commands along with these to view their posts.", + empty=False, + max_lines=15 + ) + + +def setup(bot: Bot) -> None: + """Load the Reddit cog.""" + if not RedditConfig.secret or not RedditConfig.client_id: + log.error("Credentials not provided, cog not loaded.") + return + bot.add_cog(Reddit(bot)) diff --git a/bot/exts/utilities/stackoverflow.py b/bot/exts/utilities/stackoverflow.py new file mode 100644 index 00000000..64455e33 --- /dev/null +++ b/bot/exts/utilities/stackoverflow.py @@ -0,0 +1,88 @@ +import logging +from html import unescape +from urllib.parse import quote_plus + +from discord import Embed, HTTPException +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Colours, Emojis + +logger = logging.getLogger(__name__) + +BASE_URL = "https://api.stackexchange.com/2.2/search/advanced" +SO_PARAMS = { + "order": "desc", + "sort": "activity", + "site": "stackoverflow" +} +SEARCH_URL = "https://stackoverflow.com/search?q={query}" +ERR_EMBED = Embed( + title="Error in fetching results from Stackoverflow", + description=( + "Sorry, there was en error while trying to fetch data from the Stackoverflow website. Please try again in some " + "time. If this issue persists, please contact the staff or send a message in #dev-contrib." + ), + color=Colours.soft_red +) + + +class Stackoverflow(commands.Cog): + """Contains command to interact with stackoverflow from discord.""" + + def __init__(self, bot: Bot): + self.bot = bot + + @commands.command(aliases=["so"]) + @commands.cooldown(1, 15, commands.cooldowns.BucketType.user) + async def stackoverflow(self, ctx: commands.Context, *, search_query: str) -> None: + """Sends the top 5 results of a search query from stackoverflow.""" + params = SO_PARAMS | {"q": search_query} + async with self.bot.http_session.get(url=BASE_URL, params=params) as response: + if response.status == 200: + data = await response.json() + else: + logger.error(f'Status code is not 200, it is {response.status}') + await ctx.send(embed=ERR_EMBED) + return + if not data['items']: + no_search_result = Embed( + title=f"No search results found for {search_query}", + color=Colours.soft_red + ) + await ctx.send(embed=no_search_result) + return + + top5 = data["items"][:5] + encoded_search_query = quote_plus(search_query) + embed = Embed( + title="Search results - Stackoverflow", + url=SEARCH_URL.format(query=encoded_search_query), + description=f"Here are the top {len(top5)} results:", + color=Colours.orange + ) + for item in top5: + embed.add_field( + name=unescape(item['title']), + value=( + f"[{Emojis.reddit_upvote} {item['score']} " + f"{Emojis.stackoverflow_views} {item['view_count']} " + f"{Emojis.reddit_comments} {item['answer_count']} " + f"{Emojis.stackoverflow_tag} {', '.join(item['tags'][:3])}]" + f"({item['link']})" + ), + inline=False) + embed.set_footer(text="View the original link for more results.") + try: + await ctx.send(embed=embed) + except HTTPException: + search_query_too_long = Embed( + title="Your search query is too long, please try shortening your search query", + color=Colours.soft_red + ) + await ctx.send(embed=search_query_too_long) + + +def setup(bot: Bot) -> None: + """Load the Stackoverflow Cog.""" + bot.add_cog(Stackoverflow(bot)) diff --git a/bot/exts/utilities/timed.py b/bot/exts/utilities/timed.py new file mode 100644 index 00000000..2ea6b419 --- /dev/null +++ b/bot/exts/utilities/timed.py @@ -0,0 +1,48 @@ +from copy import copy +from time import perf_counter + +from discord import Message +from discord.ext import commands + +from bot.bot import Bot + + +class TimedCommands(commands.Cog): + """Time the command execution of a command.""" + + @staticmethod + async def create_execution_context(ctx: commands.Context, command: str) -> commands.Context: + """Get a new execution context for a command.""" + msg: Message = copy(ctx.message) + msg.content = f"{ctx.prefix}{command}" + + return await ctx.bot.get_context(msg) + + @commands.command(name="timed", aliases=("time", "t")) + async def timed(self, ctx: commands.Context, *, command: str) -> None: + """Time the command execution of a command.""" + new_ctx = await self.create_execution_context(ctx, command) + + ctx.subcontext = new_ctx + + if not ctx.subcontext.command: + help_command = f"{ctx.prefix}help" + error = f"The command you are trying to time doesn't exist. Use `{help_command}` for a list of commands." + + await ctx.send(error) + return + + if new_ctx.command.qualified_name == "timed": + await ctx.send("You are not allowed to time the execution of the `timed` command.") + return + + t_start = perf_counter() + await new_ctx.command.invoke(new_ctx) + t_end = perf_counter() + + await ctx.send(f"Command execution for `{new_ctx.command}` finished in {(t_end - t_start):.4f} seconds.") + + +def setup(bot: Bot) -> None: + """Load the Timed cog.""" + bot.add_cog(TimedCommands()) diff --git a/bot/exts/utilities/wikipedia.py b/bot/exts/utilities/wikipedia.py new file mode 100644 index 00000000..eccc1f8c --- /dev/null +++ b/bot/exts/utilities/wikipedia.py @@ -0,0 +1,100 @@ +import logging +import re +from datetime import datetime +from html import unescape + +from discord import Color, Embed, TextChannel +from discord.ext import commands + +from bot.bot import Bot +from bot.utils import LinePaginator +from bot.utils.exceptions import APIError + +log = logging.getLogger(__name__) + +SEARCH_API = ( + "https://en.wikipedia.org/w/api.php" +) +WIKI_PARAMS = { + "action": "query", + "list": "search", + "prop": "info", + "inprop": "url", + "utf8": "", + "format": "json", + "origin": "*", + +} +WIKI_THUMBNAIL = ( + "https://upload.wikimedia.org/wikipedia/en/thumb/8/80/Wikipedia-logo-v2.svg" + "/330px-Wikipedia-logo-v2.svg.png" +) +WIKI_SNIPPET_REGEX = r"(<!--.*?-->|<[^>]*>)" +WIKI_SEARCH_RESULT = ( + "**[{name}]({url})**\n" + "{description}\n" +) + + +class WikipediaSearch(commands.Cog): + """Get info from wikipedia.""" + + def __init__(self, bot: Bot): + self.bot = bot + + async def wiki_request(self, channel: TextChannel, search: str) -> list[str]: + """Search wikipedia search string and return formatted first 10 pages found.""" + params = WIKI_PARAMS | {"srlimit": 10, "srsearch": search} + async with self.bot.http_session.get(url=SEARCH_API, params=params) as resp: + if resp.status != 200: + log.info(f"Unexpected response `{resp.status}` while searching wikipedia for `{search}`") + raise APIError("Wikipedia API", resp.status) + + raw_data = await resp.json() + + if not raw_data.get("query"): + if error := raw_data.get("errors"): + log.error(f"There was an error while communicating with the Wikipedia API: {error}") + raise APIError("Wikipedia API", resp.status, error) + + lines = [] + if raw_data["query"]["searchinfo"]["totalhits"]: + for article in raw_data["query"]["search"]: + line = WIKI_SEARCH_RESULT.format( + name=article["title"], + description=unescape( + re.sub( + WIKI_SNIPPET_REGEX, "", article["snippet"] + ) + ), + url=f"https://en.wikipedia.org/?curid={article['pageid']}" + ) + lines.append(line) + + return lines + + @commands.cooldown(1, 10, commands.BucketType.user) + @commands.command(name="wikipedia", aliases=("wiki",)) + async def wikipedia_search_command(self, ctx: commands.Context, *, search: str) -> None: + """Sends paginated top 10 results of Wikipedia search..""" + contents = await self.wiki_request(ctx.channel, search) + + if contents: + embed = Embed( + title="Wikipedia Search Results", + colour=Color.blurple() + ) + embed.set_thumbnail(url=WIKI_THUMBNAIL) + embed.timestamp = datetime.utcnow() + await LinePaginator.paginate( + contents, ctx, embed + ) + else: + await ctx.send( + "Sorry, we could not find a wikipedia article using that search term." + ) + + +def setup(bot: Bot) -> None: + """Load the WikipediaSearch cog.""" + bot.add_cog(WikipediaSearch(bot)) diff --git a/bot/exts/utilities/wolfram.py b/bot/exts/utilities/wolfram.py new file mode 100644 index 00000000..9a26e545 --- /dev/null +++ b/bot/exts/utilities/wolfram.py @@ -0,0 +1,293 @@ +import logging +from io import BytesIO +from typing import Callable, Optional +from urllib.parse import urlencode + +import arrow +import discord +from discord import Embed +from discord.ext import commands +from discord.ext.commands import BucketType, Cog, Context, check, group + +from bot.bot import Bot +from bot.constants import Colours, STAFF_ROLES, Wolfram +from bot.utils.pagination import ImagePaginator + +log = logging.getLogger(__name__) + +APPID = Wolfram.key +DEFAULT_OUTPUT_FORMAT = "JSON" +QUERY = "http://api.wolframalpha.com/v2/{request}" +WOLF_IMAGE = "https://www.symbols.com/gi.php?type=1&id=2886&i=1" + +MAX_PODS = 20 + +# Allows for 10 wolfram calls pr user pr day +usercd = commands.CooldownMapping.from_cooldown(Wolfram.user_limit_day, 60 * 60 * 24, BucketType.user) + +# Allows for max api requests / days in month per day for the entire guild (Temporary) +guildcd = commands.CooldownMapping.from_cooldown(Wolfram.guild_limit_day, 60 * 60 * 24, BucketType.guild) + + +async def send_embed( + ctx: Context, + message_txt: str, + colour: int = Colours.soft_red, + footer: str = None, + img_url: str = None, + f: discord.File = None +) -> None: + """Generate & send a response embed with Wolfram as the author.""" + embed = Embed(colour=colour) + embed.description = message_txt + embed.set_author( + name="Wolfram Alpha", + icon_url=WOLF_IMAGE, + url="https://www.wolframalpha.com/" + ) + if footer: + embed.set_footer(text=footer) + + if img_url: + embed.set_image(url=img_url) + + await ctx.send(embed=embed, file=f) + + +def custom_cooldown(*ignore: int) -> Callable: + """ + Implement per-user and per-guild cooldowns for requests to the Wolfram API. + + A list of roles may be provided to ignore the per-user cooldown. + """ + async def predicate(ctx: Context) -> bool: + if ctx.invoked_with == "help": + # if the invoked command is help we don't want to increase the ratelimits since it's not actually + # invoking the command/making a request, so instead just check if the user/guild are on cooldown. + guild_cooldown = not guildcd.get_bucket(ctx.message).get_tokens() == 0 # if guild is on cooldown + # check the message is in a guild, and check user bucket if user is not ignored + if ctx.guild and not any(r.id in ignore for r in ctx.author.roles): + return guild_cooldown and not usercd.get_bucket(ctx.message).get_tokens() == 0 + return guild_cooldown + + user_bucket = usercd.get_bucket(ctx.message) + + if all(role.id not in ignore for role in ctx.author.roles): + user_rate = user_bucket.update_rate_limit() + + if user_rate: + # Can't use api; cause: member limit + cooldown = arrow.utcnow().shift(seconds=int(user_rate)).humanize(only_distance=True) + message = ( + "You've used up your limit for Wolfram|Alpha requests.\n" + f"Cooldown: {cooldown}" + ) + await send_embed(ctx, message) + return False + + guild_bucket = guildcd.get_bucket(ctx.message) + guild_rate = guild_bucket.update_rate_limit() + + # Repr has a token attribute to read requests left + log.debug(guild_bucket) + + if guild_rate: + # Can't use api; cause: guild limit + message = ( + "The max limit of requests for the server has been reached for today.\n" + f"Cooldown: {int(guild_rate)}" + ) + await send_embed(ctx, message) + return False + + return True + + return check(predicate) + + +async def get_pod_pages(ctx: Context, bot: Bot, query: str) -> Optional[list[tuple[str, str]]]: + """Get the Wolfram API pod pages for the provided query.""" + async with ctx.typing(): + params = { + "input": query, + "appid": APPID, + "output": DEFAULT_OUTPUT_FORMAT, + "format": "image,plaintext", + "location": "the moon", + "latlong": "0.0,0.0", + "ip": "1.1.1.1" + } + request_url = QUERY.format(request="query") + + async with bot.http_session.get(url=request_url, params=params) as response: + json = await response.json(content_type="text/plain") + + result = json["queryresult"] + log_full_url = f"{request_url}?{urlencode(params)}" + if result["error"]: + # API key not set up correctly + if result["error"]["msg"] == "Invalid appid": + message = "Wolfram API key is invalid or missing." + log.warning( + "API key seems to be missing, or invalid when " + f"processing a wolfram request: {log_full_url}, Response: {json}" + ) + await send_embed(ctx, message) + return None + + message = "Something went wrong internally with your request, please notify staff!" + log.warning(f"Something went wrong getting a response from wolfram: {log_full_url}, Response: {json}") + await send_embed(ctx, message) + return None + + if not result["success"]: + message = f"I couldn't find anything for {query}." + await send_embed(ctx, message) + return None + + if not result["numpods"]: + message = "Could not find any results." + await send_embed(ctx, message) + return None + + pods = result["pods"] + pages = [] + for pod in pods[:MAX_PODS]: + subs = pod.get("subpods") + + for sub in subs: + title = sub.get("title") or sub.get("plaintext") or sub.get("id", "") + img = sub["img"]["src"] + pages.append((title, img)) + return pages + + +class Wolfram(Cog): + """Commands for interacting with the Wolfram|Alpha API.""" + + def __init__(self, bot: Bot): + self.bot = bot + + @group(name="wolfram", aliases=("wolf", "wa"), invoke_without_command=True) + @custom_cooldown(*STAFF_ROLES) + async def wolfram_command(self, ctx: Context, *, query: str) -> None: + """Requests all answers on a single image, sends an image of all related pods.""" + params = { + "i": query, + "appid": APPID, + "location": "the moon", + "latlong": "0.0,0.0", + "ip": "1.1.1.1" + } + request_url = QUERY.format(request="simple") + + # Give feedback that the bot is working. + async with ctx.typing(): + async with self.bot.http_session.get(url=request_url, params=params) as response: + status = response.status + image_bytes = await response.read() + + f = discord.File(BytesIO(image_bytes), filename="image.png") + image_url = "attachment://image.png" + + if status == 501: + message = "Failed to get response." + footer = "" + color = Colours.soft_red + elif status == 400: + message = "No input found." + footer = "" + color = Colours.soft_red + elif status == 403: + message = "Wolfram API key is invalid or missing." + footer = "" + color = Colours.soft_red + else: + message = "" + footer = "View original for a bigger picture." + color = Colours.soft_orange + + # Sends a "blank" embed if no request is received, unsure how to fix + await send_embed(ctx, message, color, footer=footer, img_url=image_url, f=f) + + @wolfram_command.command(name="page", aliases=("pa", "p")) + @custom_cooldown(*STAFF_ROLES) + async def wolfram_page_command(self, ctx: Context, *, query: str) -> None: + """ + Requests a drawn image of given query. + + Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc. + """ + pages = await get_pod_pages(ctx, self.bot, query) + + if not pages: + return + + embed = Embed() + embed.set_author( + name="Wolfram Alpha", + icon_url=WOLF_IMAGE, + url="https://www.wolframalpha.com/" + ) + embed.colour = Colours.soft_orange + + await ImagePaginator.paginate(pages, ctx, embed) + + @wolfram_command.command(name="cut", aliases=("c",)) + @custom_cooldown(*STAFF_ROLES) + async def wolfram_cut_command(self, ctx: Context, *, query: str) -> None: + """ + Requests a drawn image of given query. + + Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc. + """ + pages = await get_pod_pages(ctx, self.bot, query) + + if not pages: + return + + if len(pages) >= 2: + page = pages[1] + else: + page = pages[0] + + await send_embed(ctx, page[0], colour=Colours.soft_orange, img_url=page[1]) + + @wolfram_command.command(name="short", aliases=("sh", "s")) + @custom_cooldown(*STAFF_ROLES) + async def wolfram_short_command(self, ctx: Context, *, query: str) -> None: + """Requests an answer to a simple question.""" + params = { + "i": query, + "appid": APPID, + "location": "the moon", + "latlong": "0.0,0.0", + "ip": "1.1.1.1" + } + request_url = QUERY.format(request="result") + + # Give feedback that the bot is working. + async with ctx.typing(): + async with self.bot.http_session.get(url=request_url, params=params) as response: + status = response.status + response_text = await response.text() + + if status == 501: + message = "Failed to get response." + color = Colours.soft_red + elif status == 400: + message = "No input found." + color = Colours.soft_red + elif response_text == "Error 1: Invalid appid.": + message = "Wolfram API key is invalid or missing." + color = Colours.soft_red + else: + message = response_text + color = Colours.soft_orange + + await send_embed(ctx, message, color) + + +def setup(bot: Bot) -> None: + """Load the Wolfram cog.""" + bot.add_cog(Wolfram(bot)) |