diff options
Diffstat (limited to 'bot/exts/evergreen')
| -rw-r--r-- | bot/exts/evergreen/bookmark.py | 153 | ||||
| -rw-r--r-- | bot/exts/evergreen/cheatsheet.py | 112 | ||||
| -rw-r--r-- | bot/exts/evergreen/conversationstarters.py | 69 | ||||
| -rw-r--r-- | bot/exts/evergreen/emoji.py | 123 | ||||
| -rw-r--r-- | bot/exts/evergreen/githubinfo.py | 178 | ||||
| -rw-r--r-- | bot/exts/evergreen/issues.py | 275 | ||||
| -rw-r--r-- | bot/exts/evergreen/latex.py | 101 | ||||
| -rw-r--r-- | bot/exts/evergreen/pythonfacts.py | 36 | ||||
| -rw-r--r-- | bot/exts/evergreen/realpython.py | 81 | ||||
| -rw-r--r-- | bot/exts/evergreen/reddit.py | 368 | ||||
| -rw-r--r-- | bot/exts/evergreen/stackoverflow.py | 88 | ||||
| -rw-r--r-- | bot/exts/evergreen/timed.py | 48 | ||||
| -rw-r--r-- | bot/exts/evergreen/wikipedia.py | 100 | ||||
| -rw-r--r-- | bot/exts/evergreen/wolfram.py | 293 |
14 files changed, 0 insertions, 2025 deletions
diff --git a/bot/exts/evergreen/bookmark.py b/bot/exts/evergreen/bookmark.py deleted file mode 100644 index a91ef1c0..00000000 --- a/bot/exts/evergreen/bookmark.py +++ /dev/null @@ -1,153 +0,0 @@ -import asyncio -import logging -import random -from typing import Optional - -import discord -from discord.ext import commands - -from bot.bot import Bot -from bot.constants import Categories, Colours, ERROR_REPLIES, Icons, WHITELISTED_CHANNELS -from bot.utils.converters import WrappedMessageConverter -from bot.utils.decorators import whitelist_override - -log = logging.getLogger(__name__) - -# Number of seconds to wait for other users to bookmark the same message -TIMEOUT = 120 -BOOKMARK_EMOJI = "📌" -WHITELISTED_CATEGORIES = (Categories.help_in_use,) - - -class Bookmark(commands.Cog): - """Creates personal bookmarks by relaying a message link to the user's DMs.""" - - def __init__(self, bot: Bot): - self.bot = bot - - @staticmethod - def build_bookmark_dm(target_message: discord.Message, title: str) -> discord.Embed: - """Build the embed to DM the bookmark requester.""" - embed = discord.Embed( - title=title, - description=target_message.content, - colour=Colours.soft_green - ) - embed.add_field( - name="Wanna give it a visit?", - value=f"[Visit original message]({target_message.jump_url})" - ) - embed.set_author(name=target_message.author, icon_url=target_message.author.display_avatar.url) - embed.set_thumbnail(url=Icons.bookmark) - - return embed - - @staticmethod - def build_error_embed(user: discord.Member) -> discord.Embed: - """Builds an error embed for when a bookmark requester has DMs disabled.""" - return discord.Embed( - title=random.choice(ERROR_REPLIES), - description=f"{user.mention}, please enable your DMs to receive the bookmark.", - colour=Colours.soft_red - ) - - async def action_bookmark( - self, - channel: discord.TextChannel, - user: discord.Member, - target_message: discord.Message, - title: str - ) -> None: - """Sends the bookmark DM, or sends an error embed when a user bookmarks a message.""" - try: - embed = self.build_bookmark_dm(target_message, title) - await user.send(embed=embed) - except discord.Forbidden: - error_embed = self.build_error_embed(user) - await channel.send(embed=error_embed) - else: - log.info(f"{user} bookmarked {target_message.jump_url} with title '{title}'") - - @staticmethod - async def send_reaction_embed( - channel: discord.TextChannel, - target_message: discord.Message - ) -> discord.Message: - """Sends an embed, with a reaction, so users can react to bookmark the message too.""" - message = await channel.send( - embed=discord.Embed( - description=( - f"React with {BOOKMARK_EMOJI} to be sent your very own bookmark to " - f"[this message]({target_message.jump_url})." - ), - colour=Colours.soft_green - ) - ) - - await message.add_reaction(BOOKMARK_EMOJI) - return message - - @whitelist_override(channels=WHITELISTED_CHANNELS, categories=WHITELISTED_CATEGORIES) - @commands.command(name="bookmark", aliases=("bm", "pin")) - async def bookmark( - self, - ctx: commands.Context, - target_message: Optional[WrappedMessageConverter], - *, - title: str = "Bookmark" - ) -> None: - """Send the author a link to `target_message` via DMs.""" - if not target_message: - if not ctx.message.reference: - raise commands.UserInputError("You must either provide a valid message to bookmark, or reply to one.") - target_message = ctx.message.reference.resolved - - # Prevent users from bookmarking a message in a channel they don't have access to - permissions = target_message.channel.permissions_for(ctx.author) - if not permissions.read_messages: - log.info(f"{ctx.author} tried to bookmark a message in #{target_message.channel} but has no permissions.") - embed = discord.Embed( - title=random.choice(ERROR_REPLIES), - color=Colours.soft_red, - description="You don't have permission to view this channel." - ) - await ctx.send(embed=embed) - return - - def event_check(reaction: discord.Reaction, user: discord.Member) -> bool: - """Make sure that this reaction is what we want to operate on.""" - return ( - # Conditions for a successful pagination: - all(( - # Reaction is on this message - reaction.message.id == reaction_message.id, - # User has not already bookmarked this message - user.id not in bookmarked_users, - # Reaction is the `BOOKMARK_EMOJI` emoji - str(reaction.emoji) == BOOKMARK_EMOJI, - # Reaction was not made by the Bot - user.id != self.bot.user.id - )) - ) - await self.action_bookmark(ctx.channel, ctx.author, target_message, title) - - # Keep track of who has already bookmarked, so users can't spam reactions and cause loads of DMs - bookmarked_users = [ctx.author.id] - reaction_message = await self.send_reaction_embed(ctx.channel, target_message) - - while True: - try: - _, user = await self.bot.wait_for("reaction_add", timeout=TIMEOUT, check=event_check) - except asyncio.TimeoutError: - log.debug("Timed out waiting for a reaction") - break - log.trace(f"{user} has successfully bookmarked from a reaction, attempting to DM them.") - await self.action_bookmark(ctx.channel, user, target_message, title) - bookmarked_users.append(user.id) - - await reaction_message.delete() - - -def setup(bot: Bot) -> None: - """Load the Bookmark cog.""" - bot.add_cog(Bookmark(bot)) diff --git a/bot/exts/evergreen/cheatsheet.py b/bot/exts/evergreen/cheatsheet.py deleted file mode 100644 index 33d29f67..00000000 --- a/bot/exts/evergreen/cheatsheet.py +++ /dev/null @@ -1,112 +0,0 @@ -import random -import re -from typing import Union -from urllib.parse import quote_plus - -from discord import Embed -from discord.ext import commands -from discord.ext.commands import BucketType, Context - -from bot import constants -from bot.bot import Bot -from bot.constants import Categories, Channels, Colours, ERROR_REPLIES -from bot.utils.decorators import whitelist_override - -ERROR_MESSAGE = f""" -Unknown cheat sheet. Please try to reformulate your query. - -**Examples**: -```md -{constants.Client.prefix}cht read json -{constants.Client.prefix}cht hello world -{constants.Client.prefix}cht lambda -``` -If the problem persists send a message in <#{Channels.dev_contrib}> -""" - -URL = "https://cheat.sh/python/{search}" -ESCAPE_TT = str.maketrans({"`": "\\`"}) -ANSI_RE = re.compile(r"\x1b\[.*?m") -# We need to pass headers as curl otherwise it would default to aiohttp which would return raw html. -HEADERS = {"User-Agent": "curl/7.68.0"} - - -class CheatSheet(commands.Cog): - """Commands that sends a result of a cht.sh search in code blocks.""" - - def __init__(self, bot: Bot): - self.bot = bot - - @staticmethod - def fmt_error_embed() -> Embed: - """ - Format the Error Embed. - - If the cht.sh search returned 404, overwrite it to send a custom error embed. - link -> https://github.com/chubin/cheat.sh/issues/198 - """ - embed = Embed( - title=random.choice(ERROR_REPLIES), - description=ERROR_MESSAGE, - colour=Colours.soft_red - ) - return embed - - def result_fmt(self, url: str, body_text: str) -> tuple[bool, Union[str, Embed]]: - """Format Result.""" - if body_text.startswith("# 404 NOT FOUND"): - embed = self.fmt_error_embed() - return True, embed - - body_space = min(1986 - len(url), 1000) - - if len(body_text) > body_space: - description = ( - f"**Result Of cht.sh**\n" - f"```python\n{body_text[:body_space]}\n" - f"... (truncated - too many lines)\n```\n" - f"Full results: {url} " - ) - else: - description = ( - f"**Result Of cht.sh**\n" - f"```python\n{body_text}\n```\n" - f"{url}" - ) - return False, description - - @commands.command( - name="cheat", - aliases=("cht.sh", "cheatsheet", "cheat-sheet", "cht"), - ) - @commands.cooldown(1, 10, BucketType.user) - @whitelist_override(categories=[Categories.help_in_use]) - async def cheat_sheet(self, ctx: Context, *search_terms: str) -> None: - """ - Search cheat.sh. - - Gets a post from https://cheat.sh/python/ by default. - Usage: - --> .cht read json - """ - async with ctx.typing(): - search_string = quote_plus(" ".join(search_terms)) - - async with self.bot.http_session.get( - URL.format(search=search_string), headers=HEADERS - ) as response: - result = ANSI_RE.sub("", await response.text()).translate(ESCAPE_TT) - - is_embed, description = self.result_fmt( - URL.format(search=search_string), - result - ) - if is_embed: - await ctx.send(embed=description) - else: - await ctx.send(content=description) - - -def setup(bot: Bot) -> None: - """Load the CheatSheet cog.""" - bot.add_cog(CheatSheet(bot)) diff --git a/bot/exts/evergreen/conversationstarters.py b/bot/exts/evergreen/conversationstarters.py deleted file mode 100644 index fdc4467a..00000000 --- a/bot/exts/evergreen/conversationstarters.py +++ /dev/null @@ -1,69 +0,0 @@ -from pathlib import Path - -import yaml -from discord import Color, Embed -from discord.ext import commands - -from bot.bot import Bot -from bot.constants import WHITELISTED_CHANNELS -from bot.utils.decorators import whitelist_override -from bot.utils.randomization import RandomCycle - -SUGGESTION_FORM = "https://forms.gle/zw6kkJqv8U43Nfjg9" - -with Path("bot/resources/evergreen/starter.yaml").open("r", encoding="utf8") as f: - STARTERS = yaml.load(f, Loader=yaml.FullLoader) - -with Path("bot/resources/evergreen/py_topics.yaml").open("r", encoding="utf8") as f: - # First ID is #python-general and the rest are top to bottom categories of Topical Chat/Help. - PY_TOPICS = yaml.load(f, Loader=yaml.FullLoader) - - # Removing `None` from lists of topics, if not a list, it is changed to an empty one. - PY_TOPICS = {k: [i for i in v if i] if isinstance(v, list) else [] for k, v in PY_TOPICS.items()} - - # All the allowed channels that the ".topic" command is allowed to be executed in. - ALL_ALLOWED_CHANNELS = list(PY_TOPICS.keys()) + list(WHITELISTED_CHANNELS) - -# Putting all topics into one dictionary and shuffling lists to reduce same-topic repetitions. -ALL_TOPICS = {"default": STARTERS, **PY_TOPICS} -TOPICS = { - channel: RandomCycle(topics or ["No topics found for this channel."]) - for channel, topics in ALL_TOPICS.items() -} - - -class ConvoStarters(commands.Cog): - """Evergreen conversation topics.""" - - @commands.command() - @whitelist_override(channels=ALL_ALLOWED_CHANNELS) - async def topic(self, ctx: commands.Context) -> None: - """ - Responds with a random topic to start a conversation. - - If in a Python channel, a python-related topic will be given. - - Otherwise, a random conversation topic will be received by the user. - """ - # No matter what, the form will be shown. - embed = Embed(description=f"Suggest more topics [here]({SUGGESTION_FORM})!", color=Color.blurple()) - - try: - # Fetching topics. - channel_topics = TOPICS[ctx.channel.id] - - # If the channel isn't Python-related. - except KeyError: - embed.title = f"**{next(TOPICS['default'])}**" - - # If the channel ID doesn't have any topics. - else: - embed.title = f"**{next(channel_topics)}**" - - finally: - await ctx.send(embed=embed) - - -def setup(bot: Bot) -> None: - """Load the ConvoStarters cog.""" - bot.add_cog(ConvoStarters()) diff --git a/bot/exts/evergreen/emoji.py b/bot/exts/evergreen/emoji.py deleted file mode 100644 index 55d6b8e9..00000000 --- a/bot/exts/evergreen/emoji.py +++ /dev/null @@ -1,123 +0,0 @@ -import logging -import random -import textwrap -from collections import defaultdict -from datetime import datetime -from typing import Optional - -from discord import Color, Embed, Emoji -from discord.ext import commands - -from bot.bot import Bot -from bot.constants import Colours, ERROR_REPLIES -from bot.utils.extensions import invoke_help_command -from bot.utils.pagination import LinePaginator -from bot.utils.time import time_since - -log = logging.getLogger(__name__) - - -class Emojis(commands.Cog): - """A collection of commands related to emojis in the server.""" - - @staticmethod - def embed_builder(emoji: dict) -> tuple[Embed, list[str]]: - """Generates an embed with the emoji names and count.""" - embed = Embed( - color=Colours.orange, - title="Emoji Count", - timestamp=datetime.utcnow() - ) - msg = [] - - if len(emoji) == 1: - for category_name, category_emojis in emoji.items(): - if len(category_emojis) == 1: - msg.append(f"There is **{len(category_emojis)}** emoji in the **{category_name}** category.") - else: - msg.append(f"There are **{len(category_emojis)}** emojis in the **{category_name}** category.") - embed.set_thumbnail(url=random.choice(category_emojis).url) - - else: - for category_name, category_emojis in emoji.items(): - emoji_choice = random.choice(category_emojis) - if len(category_emojis) > 1: - emoji_info = f"There are **{len(category_emojis)}** emojis in the **{category_name}** category." - else: - emoji_info = f"There is **{len(category_emojis)}** emoji in the **{category_name}** category." - if emoji_choice.animated: - msg.append(f"<a:{emoji_choice.name}:{emoji_choice.id}> {emoji_info}") - else: - msg.append(f"<:{emoji_choice.name}:{emoji_choice.id}> {emoji_info}") - return embed, msg - - @staticmethod - def generate_invalid_embed(emojis: list[Emoji]) -> tuple[Embed, list[str]]: - """Generates error embed for invalid emoji categories.""" - embed = Embed( - color=Colours.soft_red, - title=random.choice(ERROR_REPLIES) - ) - msg = [] - - emoji_dict = defaultdict(list) - for emoji in emojis: - emoji_dict[emoji.name.split("_")[0]].append(emoji) - - error_comp = ", ".join(emoji_dict) - msg.append(f"These are the valid emoji categories:\n```\n{error_comp}\n```") - return embed, msg - - @commands.group(name="emoji", invoke_without_command=True) - async def emoji_group(self, ctx: commands.Context, emoji: Optional[Emoji]) -> None: - """A group of commands related to emojis.""" - if emoji is not None: - await ctx.invoke(self.info_command, emoji) - else: - await invoke_help_command(ctx) - - @emoji_group.command(name="count", aliases=("c",)) - async def count_command(self, ctx: commands.Context, *, category_query: str = None) -> None: - """Returns embed with emoji category and info given by the user.""" - emoji_dict = defaultdict(list) - - if not ctx.guild.emojis: - await ctx.send("No emojis found.") - return - log.trace(f"Emoji Category {'' if category_query else 'not '}provided by the user.") - for emoji in ctx.guild.emojis: - emoji_category = emoji.name.split("_")[0] - - if category_query is not None and emoji_category not in category_query: - continue - - emoji_dict[emoji_category].append(emoji) - - if not emoji_dict: - log.trace("Invalid name provided by the user") - embed, msg = self.generate_invalid_embed(ctx.guild.emojis) - else: - embed, msg = self.embed_builder(emoji_dict) - await LinePaginator.paginate(lines=msg, ctx=ctx, embed=embed) - - @emoji_group.command(name="info", aliases=("i",)) - async def info_command(self, ctx: commands.Context, emoji: Emoji) -> None: - """Returns relevant information about a Discord Emoji.""" - emoji_information = Embed( - title=f"Emoji Information: {emoji.name}", - description=textwrap.dedent(f""" - **Name:** {emoji.name} - **Created:** {time_since(emoji.created_at, precision="hours")} - **Date:** {datetime.strftime(emoji.created_at, "%d/%m/%Y")} - **ID:** {emoji.id} - """), - color=Color.blurple(), - url=str(emoji.url), - ).set_thumbnail(url=emoji.url) - - await ctx.send(embed=emoji_information) - - -def setup(bot: Bot) -> None: - """Load the Emojis cog.""" - bot.add_cog(Emojis()) diff --git a/bot/exts/evergreen/githubinfo.py b/bot/exts/evergreen/githubinfo.py deleted file mode 100644 index bbc9061a..00000000 --- a/bot/exts/evergreen/githubinfo.py +++ /dev/null @@ -1,178 +0,0 @@ -import logging -import random -from datetime import datetime -from urllib.parse import quote, quote_plus - -import discord -from discord.ext import commands - -from bot.bot import Bot -from bot.constants import Colours, NEGATIVE_REPLIES -from bot.exts.utils.extensions import invoke_help_command - -log = logging.getLogger(__name__) - -GITHUB_API_URL = "https://api.github.com" - - -class GithubInfo(commands.Cog): - """Fetches info from GitHub.""" - - def __init__(self, bot: Bot): - self.bot = bot - - async def fetch_data(self, url: str) -> dict: - """Retrieve data as a dictionary.""" - async with self.bot.http_session.get(url) as r: - return await r.json() - - @commands.group(name="github", aliases=("gh", "git")) - @commands.cooldown(1, 10, commands.BucketType.user) - async def github_group(self, ctx: commands.Context) -> None: - """Commands for finding information related to GitHub.""" - if ctx.invoked_subcommand is None: - await invoke_help_command(ctx) - - @github_group.command(name="user", aliases=("userinfo",)) - async def github_user_info(self, ctx: commands.Context, username: str) -> None: - """Fetches a user's GitHub information.""" - async with ctx.typing(): - user_data = await self.fetch_data(f"{GITHUB_API_URL}/users/{quote_plus(username)}") - - # User_data will not have a message key if the user exists - if "message" in user_data: - embed = discord.Embed( - title=random.choice(NEGATIVE_REPLIES), - description=f"The profile for `{username}` was not found.", - colour=Colours.soft_red - ) - - await ctx.send(embed=embed) - return - - org_data = await self.fetch_data(user_data["organizations_url"]) - orgs = [f"[{org['login']}](https://github.com/{org['login']})" for org in org_data] - orgs_to_add = " | ".join(orgs) - - gists = user_data["public_gists"] - - # Forming blog link - if user_data["blog"].startswith("http"): # Blog link is complete - blog = user_data["blog"] - elif user_data["blog"]: # Blog exists but the link is not complete - blog = f"https://{user_data['blog']}" - else: - blog = "No website link available" - - embed = discord.Embed( - title=f"`{user_data['login']}`'s GitHub profile info", - description=f"```\n{user_data['bio']}\n```\n" if user_data["bio"] else "", - colour=discord.Colour.blurple(), - url=user_data["html_url"], - timestamp=datetime.strptime(user_data["created_at"], "%Y-%m-%dT%H:%M:%SZ") - ) - embed.set_thumbnail(url=user_data["avatar_url"]) - embed.set_footer(text="Account created at") - - if user_data["type"] == "User": - - embed.add_field( - name="Followers", - value=f"[{user_data['followers']}]({user_data['html_url']}?tab=followers)" - ) - embed.add_field( - name="Following", - value=f"[{user_data['following']}]({user_data['html_url']}?tab=following)" - ) - - embed.add_field( - name="Public repos", - value=f"[{user_data['public_repos']}]({user_data['html_url']}?tab=repositories)" - ) - - if user_data["type"] == "User": - embed.add_field( - name="Gists", - value=f"[{gists}](https://gist.github.com/{quote_plus(username, safe='')})" - ) - - embed.add_field( - name=f"Organization{'s' if len(orgs)!=1 else ''}", - value=orgs_to_add if orgs else "No organizations." - ) - embed.add_field(name="Website", value=blog) - - await ctx.send(embed=embed) - - @github_group.command(name='repository', aliases=('repo',)) - async def github_repo_info(self, ctx: commands.Context, *repo: str) -> None: - """ - Fetches a repositories' GitHub information. - - The repository should look like `user/reponame` or `user reponame`. - """ - repo = "/".join(repo) - if repo.count("/") != 1: - embed = discord.Embed( - title=random.choice(NEGATIVE_REPLIES), - description="The repository should look like `user/reponame` or `user reponame`.", - colour=Colours.soft_red - ) - - await ctx.send(embed=embed) - return - - async with ctx.typing(): - repo_data = await self.fetch_data(f"{GITHUB_API_URL}/repos/{quote(repo)}") - - # There won't be a message key if this repo exists - if "message" in repo_data: - embed = discord.Embed( - title=random.choice(NEGATIVE_REPLIES), - description="The requested repository was not found.", - colour=Colours.soft_red - ) - - await ctx.send(embed=embed) - return - - embed = discord.Embed( - title=repo_data["name"], - description=repo_data["description"], - colour=discord.Colour.blurple(), - url=repo_data["html_url"] - ) - - # If it's a fork, then it will have a parent key - try: - parent = repo_data["parent"] - embed.description += f"\n\nForked from [{parent['full_name']}]({parent['html_url']})" - except KeyError: - log.debug("Repository is not a fork.") - - repo_owner = repo_data["owner"] - - embed.set_author( - name=repo_owner["login"], - url=repo_owner["html_url"], - icon_url=repo_owner["avatar_url"] - ) - - repo_created_at = datetime.strptime(repo_data["created_at"], "%Y-%m-%dT%H:%M:%SZ").strftime("%d/%m/%Y") - last_pushed = datetime.strptime(repo_data["pushed_at"], "%Y-%m-%dT%H:%M:%SZ").strftime("%d/%m/%Y at %H:%M") - - embed.set_footer( - text=( - f"{repo_data['forks_count']} ⑂ " - f"• {repo_data['stargazers_count']} ⭐ " - f"• Created At {repo_created_at} " - f"• Last Commit {last_pushed}" - ) - ) - - await ctx.send(embed=embed) - - -def setup(bot: Bot) -> None: - """Load the GithubInfo cog.""" - bot.add_cog(GithubInfo(bot)) diff --git a/bot/exts/evergreen/issues.py b/bot/exts/evergreen/issues.py deleted file mode 100644 index 8a7ebed0..00000000 --- a/bot/exts/evergreen/issues.py +++ /dev/null @@ -1,275 +0,0 @@ -import logging -import random -import re -from dataclasses import dataclass -from typing import Optional, Union - -import discord -from discord.ext import commands - -from bot.bot import Bot -from bot.constants import ( - Categories, - Channels, - Colours, - ERROR_REPLIES, - Emojis, - NEGATIVE_REPLIES, - Tokens, - WHITELISTED_CHANNELS -) -from bot.utils.decorators import whitelist_override -from bot.utils.extensions import invoke_help_command - -log = logging.getLogger(__name__) - -BAD_RESPONSE = { - 404: "Issue/pull request not located! Please enter a valid number!", - 403: "Rate limit has been hit! Please try again later!" -} -REQUEST_HEADERS = { - "Accept": "application/vnd.github.v3+json" -} - -REPOSITORY_ENDPOINT = "https://api.github.com/orgs/{org}/repos?per_page=100&type=public" -ISSUE_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/issues/{number}" -PR_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/pulls/{number}" - -if GITHUB_TOKEN := Tokens.github: - REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}" - -WHITELISTED_CATEGORIES = ( - Categories.development, Categories.devprojects, Categories.media, Categories.staff -) - -CODE_BLOCK_RE = re.compile( - r"^`([^`\n]+)`" # Inline codeblock - r"|```(.+?)```", # Multiline codeblock - re.DOTALL | re.MULTILINE -) - -# Maximum number of issues in one message -MAXIMUM_ISSUES = 5 - -# Regex used when looking for automatic linking in messages -# regex101 of current regex https://regex101.com/r/V2ji8M/6 -AUTOMATIC_REGEX = re.compile( - r"((?P<org>[a-zA-Z0-9][a-zA-Z0-9\-]{1,39})\/)?(?P<repo>[\w\-\.]{1,100})#(?P<number>[0-9]+)" -) - - -@dataclass -class FoundIssue: - """Dataclass representing an issue found by the regex.""" - - organisation: Optional[str] - repository: str - number: str - - def __hash__(self) -> int: - return hash((self.organisation, self.repository, self.number)) - - -@dataclass -class FetchError: - """Dataclass representing an error while fetching an issue.""" - - return_code: int - message: str - - -@dataclass -class IssueState: - """Dataclass representing the state of an issue.""" - - repository: str - number: int - url: str - title: str - emoji: str - - -class Issues(commands.Cog): - """Cog that allows users to retrieve issues from GitHub.""" - - def __init__(self, bot: Bot): - self.bot = bot - self.repos = [] - - @staticmethod - def remove_codeblocks(message: str) -> str: - """Remove any codeblock in a message.""" - return re.sub(CODE_BLOCK_RE, "", message) - - async def fetch_issues( - self, - number: int, - repository: str, - user: str - ) -> Union[IssueState, FetchError]: - """ - Retrieve an issue from a GitHub repository. - - Returns IssueState on success, FetchError on failure. - """ - url = ISSUE_ENDPOINT.format(user=user, repository=repository, number=number) - pulls_url = PR_ENDPOINT.format(user=user, repository=repository, number=number) - log.trace(f"Querying GH issues API: {url}") - - async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r: - json_data = await r.json() - - if r.status == 403: - if r.headers.get("X-RateLimit-Remaining") == "0": - log.info(f"Ratelimit reached while fetching {url}") - return FetchError(403, "Ratelimit reached, please retry in a few minutes.") - return FetchError(403, "Cannot access issue.") - elif r.status in (404, 410): - return FetchError(r.status, "Issue not found.") - elif r.status != 200: - return FetchError(r.status, "Error while fetching issue.") - - # The initial API request is made to the issues API endpoint, which will return information - # if the issue or PR is present. However, the scope of information returned for PRs differs - # from issues: if the 'issues' key is present in the response then we can pull the data we - # need from the initial API call. - if "issues" in json_data["html_url"]: - if json_data.get("state") == "open": - emoji = Emojis.issue_open - else: - emoji = Emojis.issue_closed - - # If the 'issues' key is not contained in the API response and there is no error code, then - # we know that a PR has been requested and a call to the pulls API endpoint is necessary - # to get the desired information for the PR. - else: - log.trace(f"PR provided, querying GH pulls API for additional information: {pulls_url}") - async with self.bot.http_session.get(pulls_url) as p: - pull_data = await p.json() - if pull_data["draft"]: - emoji = Emojis.pull_request_draft - elif pull_data["state"] == "open": - emoji = Emojis.pull_request_open - # When 'merged_at' is not None, this means that the state of the PR is merged - elif pull_data["merged_at"] is not None: - emoji = Emojis.pull_request_merged - else: - emoji = Emojis.pull_request_closed - - issue_url = json_data.get("html_url") - - return IssueState(repository, number, issue_url, json_data.get("title", ""), emoji) - - @staticmethod - def format_embed( - results: list[Union[IssueState, FetchError]], - user: str, - repository: Optional[str] = None - ) -> discord.Embed: - """Take a list of IssueState or FetchError and format a Discord embed for them.""" - description_list = [] - - for result in results: - if isinstance(result, IssueState): - description_list.append(f"{result.emoji} [{result.title}]({result.url})") - elif isinstance(result, FetchError): - description_list.append(f":x: [{result.return_code}] {result.message}") - - resp = discord.Embed( - colour=Colours.bright_green, - description="\n".join(description_list) - ) - - embed_url = f"https://github.com/{user}/{repository}" if repository else f"https://github.com/{user}" - resp.set_author(name="GitHub", url=embed_url) - return resp - - @whitelist_override(channels=WHITELISTED_CHANNELS, categories=WHITELISTED_CATEGORIES) - @commands.command(aliases=("pr",)) - async def issue( - self, - ctx: commands.Context, - numbers: commands.Greedy[int], - repository: str = "sir-lancebot", - user: str = "python-discord" - ) -> None: - """Command to retrieve issue(s) from a GitHub repository.""" - # Remove duplicates - numbers = set(numbers) - - if len(numbers) > MAXIMUM_ISSUES: - embed = discord.Embed( - title=random.choice(ERROR_REPLIES), - color=Colours.soft_red, - description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})" - ) - await ctx.send(embed=embed) - await invoke_help_command(ctx) - - results = [await self.fetch_issues(number, repository, user) for number in numbers] - await ctx.send(embed=self.format_embed(results, user, repository)) - - @commands.Cog.listener() - async def on_message(self, message: discord.Message) -> None: - """ - Automatic issue linking. - - Listener to retrieve issue(s) from a GitHub repository using automatic linking if matching <org>/<repo>#<issue>. - """ - # Ignore bots - if message.author.bot: - return - - issues = [ - FoundIssue(*match.group("org", "repo", "number")) - for match in AUTOMATIC_REGEX.finditer(self.remove_codeblocks(message.content)) - ] - links = [] - - if issues: - # Block this from working in DMs - if not message.guild: - await message.channel.send( - embed=discord.Embed( - title=random.choice(NEGATIVE_REPLIES), - description=( - "You can't retrieve issues from DMs. " - f"Try again in <#{Channels.community_bot_commands}>" - ), - colour=Colours.soft_red - ) - ) - return - - log.trace(f"Found {issues = }") - # Remove duplicates - issues = set(issues) - - if len(issues) > MAXIMUM_ISSUES: - embed = discord.Embed( - title=random.choice(ERROR_REPLIES), - color=Colours.soft_red, - description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})" - ) - await message.channel.send(embed=embed, delete_after=5) - return - - for repo_issue in issues: - result = await self.fetch_issues( - int(repo_issue.number), - repo_issue.repository, - repo_issue.organisation or "python-discord" - ) - if isinstance(result, IssueState): - links.append(result) - - if not links: - return - - resp = self.format_embed(links, "python-discord") - await message.channel.send(embed=resp) - - -def setup(bot: Bot) -> None: - """Load the Issues cog.""" - bot.add_cog(Issues(bot)) diff --git a/bot/exts/evergreen/latex.py b/bot/exts/evergreen/latex.py deleted file mode 100644 index 36c7e0ab..00000000 --- a/bot/exts/evergreen/latex.py +++ /dev/null @@ -1,101 +0,0 @@ -import asyncio -import hashlib -import pathlib -import re -from concurrent.futures import ThreadPoolExecutor -from io import BytesIO - -import discord -import matplotlib.pyplot as plt -from discord.ext import commands - -from bot.bot import Bot - -# configure fonts and colors for matplotlib -plt.rcParams.update( - { - "font.size": 16, - "mathtext.fontset": "cm", # Computer Modern font set - "mathtext.rm": "serif", - "figure.facecolor": "36393F", # matches Discord's dark mode background color - "text.color": "white", - } -) - -FORMATTED_CODE_REGEX = re.compile( - r"(?P<delim>(?P<block>```)|``?)" # code delimiter: 1-3 backticks; (?P=block) only matches if it's a block - r"(?(block)(?:(?P<lang>[a-z]+)\n)?)" # if we're in a block, match optional language (only letters plus newline) - r"(?:[ \t]*\n)*" # any blank (empty or tabs/spaces only) lines before the code - r"(?P<code>.*?)" # extract all code inside the markup - r"\s*" # any more whitespace before the end of the code markup - r"(?P=delim)", # match the exact same delimiter from the start again - re.DOTALL | re.IGNORECASE, # "." also matches newlines, case insensitive -) - -CACHE_DIRECTORY = pathlib.Path("_latex_cache") -CACHE_DIRECTORY.mkdir(exist_ok=True) - - -class Latex(commands.Cog): - """Renders latex.""" - - @staticmethod - def _render(text: str, filepath: pathlib.Path) -> BytesIO: - """ - Return the rendered image if latex compiles without errors, otherwise raise a BadArgument Exception. - - Saves rendered image to cache. - """ - fig = plt.figure() - rendered_image = BytesIO() - fig.text(0, 1, text, horizontalalignment="left", verticalalignment="top") - - try: - plt.savefig(rendered_image, bbox_inches="tight", dpi=600) - except ValueError as e: - raise commands.BadArgument(str(e)) - - rendered_image.seek(0) - - with open(filepath, "wb") as f: - f.write(rendered_image.getbuffer()) - - return rendered_image - - @staticmethod - def _prepare_input(text: str) -> str: - text = text.replace(r"\\", "$\n$") # matplotlib uses \n for newlines, not \\ - - if match := FORMATTED_CODE_REGEX.match(text): - return match.group("code") - else: - return text - - @commands.command() - @commands.max_concurrency(1, commands.BucketType.guild, wait=True) - async def latex(self, ctx: commands.Context, *, text: str) -> None: - """Renders the text in latex and sends the image.""" - text = self._prepare_input(text) - query_hash = hashlib.md5(text.encode()).hexdigest() - image_path = CACHE_DIRECTORY.joinpath(f"{query_hash}.png") - async with ctx.typing(): - if image_path.exists(): - await ctx.send(file=discord.File(image_path)) - return - - with ThreadPoolExecutor() as pool: - image = await asyncio.get_running_loop().run_in_executor( - pool, self._render, text, image_path - ) - - await ctx.send(file=discord.File(image, "latex.png")) - - -def setup(bot: Bot) -> None: - """Load the Latex Cog.""" - # As we have resource issues on this cog, - # we have it currently disabled while we fix it. - import logging - logging.info("Latex cog is currently disabled. It won't be loaded.") - return - bot.add_cog(Latex()) diff --git a/bot/exts/evergreen/pythonfacts.py b/bot/exts/evergreen/pythonfacts.py deleted file mode 100644 index 80a8da5d..00000000 --- a/bot/exts/evergreen/pythonfacts.py +++ /dev/null @@ -1,36 +0,0 @@ -import itertools - -import discord -from discord.ext import commands - -from bot.bot import Bot -from bot.constants import Colours - -with open("bot/resources/evergreen/python_facts.txt") as file: - FACTS = itertools.cycle(list(file)) - -COLORS = itertools.cycle([Colours.python_blue, Colours.python_yellow]) -PYFACTS_DISCUSSION = "https://github.com/python-discord/meta/discussions/93" - - -class PythonFacts(commands.Cog): - """Sends a random fun fact about Python.""" - - @commands.command(name="pythonfact", aliases=("pyfact",)) - async def get_python_fact(self, ctx: commands.Context) -> None: - """Sends a Random fun fact about Python.""" - embed = discord.Embed( - title="Python Facts", - description=next(FACTS), - colour=next(COLORS) - ) - embed.add_field( - name="Suggestions", - value=f"Suggest more facts [here!]({PYFACTS_DISCUSSION})" - ) - await ctx.send(embed=embed) - - -def setup(bot: Bot) -> None: - """Load the PythonFacts Cog.""" - bot.add_cog(PythonFacts()) diff --git a/bot/exts/evergreen/realpython.py b/bot/exts/evergreen/realpython.py deleted file mode 100644 index ef8b2638..00000000 --- a/bot/exts/evergreen/realpython.py +++ /dev/null @@ -1,81 +0,0 @@ -import logging -from html import unescape -from urllib.parse import quote_plus - -from discord import Embed -from discord.ext import commands - -from bot.bot import Bot -from bot.constants import Colours - -logger = logging.getLogger(__name__) - - -API_ROOT = "https://realpython.com/search/api/v1/" -ARTICLE_URL = "https://realpython.com{article_url}" -SEARCH_URL = "https://realpython.com/search?q={user_search}" - - -ERROR_EMBED = Embed( - title="Error while searching Real Python", - description="There was an error while trying to reach Real Python. Please try again shortly.", - color=Colours.soft_red, -) - - -class RealPython(commands.Cog): - """User initiated command to search for a Real Python article.""" - - def __init__(self, bot: Bot): - self.bot = bot - - @commands.command(aliases=["rp"]) - @commands.cooldown(1, 10, commands.cooldowns.BucketType.user) - async def realpython(self, ctx: commands.Context, *, user_search: str) -> None: - """Send 5 articles that match the user's search terms.""" - params = {"q": user_search, "limit": 5, "kind": "article"} - async with self.bot.http_session.get(url=API_ROOT, params=params) as response: - if response.status != 200: - logger.error( - f"Unexpected status code {response.status} from Real Python" - ) - await ctx.send(embed=ERROR_EMBED) - return - - data = await response.json() - - articles = data["results"] - - if len(articles) == 0: - no_articles = Embed( - title=f"No articles found for '{user_search}'", color=Colours.soft_red - ) - await ctx.send(embed=no_articles) - return - - if len(articles) == 1: - article_description = "Here is the result:" - else: - article_description = f"Here are the top {len(articles)} results:" - - article_embed = Embed( - title="Search results - Real Python", - url=SEARCH_URL.format(user_search=quote_plus(user_search)), - description=article_description, - color=Colours.orange, - ) - - for article in articles: - article_embed.add_field( - name=unescape(article["title"]), - value=ARTICLE_URL.format(article_url=article["url"]), - inline=False, - ) - article_embed.set_footer(text="Click the links to go to the articles.") - - await ctx.send(embed=article_embed) - - -def setup(bot: Bot) -> None: - """Load the Real Python Cog.""" - bot.add_cog(RealPython(bot)) diff --git a/bot/exts/evergreen/reddit.py b/bot/exts/evergreen/reddit.py deleted file mode 100644 index e6cb5337..00000000 --- a/bot/exts/evergreen/reddit.py +++ /dev/null @@ -1,368 +0,0 @@ -import asyncio -import logging -import random -import textwrap -from collections import namedtuple -from datetime import datetime, timedelta -from typing import Union - -from aiohttp import BasicAuth, ClientError -from discord import Colour, Embed, TextChannel -from discord.ext.commands import Cog, Context, group, has_any_role -from discord.ext.tasks import loop -from discord.utils import escape_markdown, sleep_until - -from bot.bot import Bot -from bot.constants import Channels, ERROR_REPLIES, Emojis, Reddit as RedditConfig, STAFF_ROLES -from bot.utils.converters import Subreddit -from bot.utils.extensions import invoke_help_command -from bot.utils.messages import sub_clyde -from bot.utils.pagination import ImagePaginator, LinePaginator - -log = logging.getLogger(__name__) - -AccessToken = namedtuple("AccessToken", ["token", "expires_at"]) - - -class Reddit(Cog): - """Track subreddit posts and show detailed statistics about them.""" - - HEADERS = {"User-Agent": "python3:python-discord/bot:1.0.0 (by /u/PythonDiscord)"} - URL = "https://www.reddit.com" - OAUTH_URL = "https://oauth.reddit.com" - MAX_RETRIES = 3 - - def __init__(self, bot: Bot): - self.bot = bot - - self.webhook = None - self.access_token = None - self.client_auth = BasicAuth(RedditConfig.client_id, RedditConfig.secret) - - bot.loop.create_task(self.init_reddit_ready()) - self.auto_poster_loop.start() - - def cog_unload(self) -> None: - """Stop the loop task and revoke the access token when the cog is unloaded.""" - self.auto_poster_loop.cancel() - if self.access_token and self.access_token.expires_at > datetime.utcnow(): - asyncio.create_task(self.revoke_access_token()) - - async def init_reddit_ready(self) -> None: - """Sets the reddit webhook when the cog is loaded.""" - await self.bot.wait_until_guild_available() - if not self.webhook: - self.webhook = await self.bot.fetch_webhook(RedditConfig.webhook) - - @property - def channel(self) -> TextChannel: - """Get the #reddit channel object from the bot's cache.""" - return self.bot.get_channel(Channels.reddit) - - def build_pagination_pages(self, posts: list[dict], paginate: bool) -> Union[list[tuple], str]: - """Build embed pages required for Paginator.""" - pages = [] - first_page = "" - for post in posts: - post_page = "" - image_url = "" - - data = post["data"] - - title = textwrap.shorten(data["title"], width=50, placeholder="...") - - # Normal brackets interfere with Markdown. - title = escape_markdown(title).replace("[", "⦋").replace("]", "⦌") - link = self.URL + data["permalink"] - - first_page += f"**[{title.replace('*', '')}]({link})**\n" - - text = data["selftext"] - if text: - text = escape_markdown(text).replace("[", "⦋").replace("]", "⦌") - first_page += textwrap.shorten(text, width=100, placeholder="...") + "\n" - - ups = data["ups"] - comments = data["num_comments"] - author = data["author"] - - content_type = Emojis.reddit_post_text - if data["is_video"] or {"youtube", "youtu.be"}.issubset(set(data["url"].split("."))): - # This means the content type in the post is a video. - content_type = f"{Emojis.reddit_post_video}" - - elif data["url"].endswith(("jpg", "png", "gif")): - # This means the content type in the post is an image. - content_type = f"{Emojis.reddit_post_photo}" - image_url = data["url"] - - first_page += ( - f"{content_type}\u2003{Emojis.reddit_upvote}{ups}\u2003{Emojis.reddit_comments}" - f"\u2002{comments}\u2003{Emojis.reddit_users}{author}\n\n" - ) - - if paginate: - post_page += f"**[{title}]({link})**\n\n" - if text: - post_page += textwrap.shorten(text, width=252, placeholder="...") + "\n\n" - post_page += ( - f"{content_type}\u2003{Emojis.reddit_upvote}{ups}\u2003{Emojis.reddit_comments}\u2002" - f"{comments}\u2003{Emojis.reddit_users}{author}" - ) - - pages.append((post_page, image_url)) - - if not paginate: - # Return the first summery page if pagination is not required - return first_page - - pages.insert(0, (first_page, "")) # Using image paginator, hence settings image url to empty string - return pages - - async def get_access_token(self) -> None: - """ - Get a Reddit API OAuth2 access token and assign it to self.access_token. - - A token is valid for 1 hour. There will be MAX_RETRIES to get a token, after which the cog - will be unloaded and a ClientError raised if retrieval was still unsuccessful. - """ - for i in range(1, self.MAX_RETRIES + 1): - response = await self.bot.http_session.post( - url=f"{self.URL}/api/v1/access_token", - headers=self.HEADERS, - auth=self.client_auth, - data={ - "grant_type": "client_credentials", - "duration": "temporary" - } - ) - - if response.status == 200 and response.content_type == "application/json": - content = await response.json() - expiration = int(content["expires_in"]) - 60 # Subtract 1 minute for leeway. - self.access_token = AccessToken( - token=content["access_token"], - expires_at=datetime.utcnow() + timedelta(seconds=expiration) - ) - - log.debug(f"New token acquired; expires on UTC {self.access_token.expires_at}") - return - else: - log.debug( - f"Failed to get an access token: " - f"status {response.status} & content type {response.content_type}; " - f"retrying ({i}/{self.MAX_RETRIES})" - ) - - await asyncio.sleep(3) - - self.bot.remove_cog(self.qualified_name) - raise ClientError("Authentication with the Reddit API failed. Unloading the cog.") - - async def revoke_access_token(self) -> None: - """ - Revoke the OAuth2 access token for the Reddit API. - - For security reasons, it's good practice to revoke the token when it's no longer being used. - """ - response = await self.bot.http_session.post( - url=f"{self.URL}/api/v1/revoke_token", - headers=self.HEADERS, - auth=self.client_auth, - data={ - "token": self.access_token.token, - "token_type_hint": "access_token" - } - ) - - if response.status in [200, 204] and response.content_type == "application/json": - self.access_token = None - else: - log.warning(f"Unable to revoke access token: status {response.status}.") - - async def fetch_posts(self, route: str, *, amount: int = 25, params: dict = None) -> list[dict]: - """A helper method to fetch a certain amount of Reddit posts at a given route.""" - # Reddit's JSON responses only provide 25 posts at most. - if not 25 >= amount > 0: - raise ValueError("Invalid amount of subreddit posts requested.") - - # Renew the token if necessary. - if not self.access_token or self.access_token.expires_at < datetime.utcnow(): - await self.get_access_token() - - url = f"{self.OAUTH_URL}/{route}" - for _ in range(self.MAX_RETRIES): - response = await self.bot.http_session.get( - url=url, - headers={**self.HEADERS, "Authorization": f"bearer {self.access_token.token}"}, - params=params - ) - if response.status == 200 and response.content_type == 'application/json': - # Got appropriate response - process and return. - content = await response.json() - posts = content["data"]["children"] - - filtered_posts = [post for post in posts if not post["data"]["over_18"]] - - return filtered_posts[:amount] - - await asyncio.sleep(3) - - log.debug(f"Invalid response from: {url} - status code {response.status}, mimetype {response.content_type}") - return list() # Failed to get appropriate response within allowed number of retries. - - async def get_top_posts( - self, subreddit: Subreddit, time: str = "all", amount: int = 5, paginate: bool = False - ) -> Union[Embed, list[tuple]]: - """ - Get the top amount of posts for a given subreddit within a specified timeframe. - - A time of "all" will get posts from all time, "day" will get top daily posts and "week" will get the top - weekly posts. - - The amount should be between 0 and 25 as Reddit's JSON requests only provide 25 posts at most. - """ - embed = Embed() - - posts = await self.fetch_posts( - route=f"{subreddit}/top", - amount=amount, - params={"t": time} - ) - if not posts: - embed.title = random.choice(ERROR_REPLIES) - embed.colour = Colour.red() - embed.description = ( - "Sorry! We couldn't find any SFW posts from that subreddit. " - "If this problem persists, please let us know." - ) - - return embed - - if paginate: - return self.build_pagination_pages(posts, paginate=True) - - # Use only starting summary page for #reddit channel posts. - embed.description = self.build_pagination_pages(posts, paginate=False) - embed.colour = Colour.blurple() - return embed - - @loop() - async def auto_poster_loop(self) -> None: - """Post the top 5 posts daily, and the top 5 posts weekly.""" - # once d.py get support for `time` parameter in loop decorator, - # this can be removed and the loop can use the `time=datetime.time.min` parameter - now = datetime.utcnow() - tomorrow = now + timedelta(days=1) - midnight_tomorrow = tomorrow.replace(hour=0, minute=0, second=0) - - await sleep_until(midnight_tomorrow) - - await self.bot.wait_until_guild_available() - if not self.webhook: - await self.bot.fetch_webhook(RedditConfig.webhook) - - if datetime.utcnow().weekday() == 0: - await self.top_weekly_posts() - # if it's a monday send the top weekly posts - - for subreddit in RedditConfig.subreddits: - top_posts = await self.get_top_posts(subreddit=subreddit, time="day") - username = sub_clyde(f"{subreddit} Top Daily Posts") - message = await self.webhook.send(username=username, embed=top_posts, wait=True) - - if message.channel.is_news(): - await message.publish() - - async def top_weekly_posts(self) -> None: - """Post a summary of the top posts.""" - for subreddit in RedditConfig.subreddits: - # Send and pin the new weekly posts. - top_posts = await self.get_top_posts(subreddit=subreddit, time="week") - username = sub_clyde(f"{subreddit} Top Weekly Posts") - message = await self.webhook.send(wait=True, username=username, embed=top_posts) - - if subreddit.lower() == "r/python": - if not self.channel: - log.warning("Failed to get #reddit channel to remove pins in the weekly loop.") - return - - # Remove the oldest pins so that only 12 remain at most. - pins = await self.channel.pins() - - while len(pins) >= 12: - await pins[-1].unpin() - del pins[-1] - - await message.pin() - - if message.channel.is_news(): - await message.publish() - - @group(name="reddit", invoke_without_command=True) - async def reddit_group(self, ctx: Context) -> None: - """View the top posts from various subreddits.""" - await invoke_help_command(ctx) - - @reddit_group.command(name="top") - async def top_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None: - """Send the top posts of all time from a given subreddit.""" - async with ctx.typing(): - pages = await self.get_top_posts(subreddit=subreddit, time="all", paginate=True) - - await ctx.send(f"Here are the top {subreddit} posts of all time!") - embed = Embed( - color=Colour.blurple() - ) - - await ImagePaginator.paginate(pages, ctx, embed) - - @reddit_group.command(name="daily") - async def daily_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None: - """Send the top posts of today from a given subreddit.""" - async with ctx.typing(): - pages = await self.get_top_posts(subreddit=subreddit, time="day", paginate=True) - - await ctx.send(f"Here are today's top {subreddit} posts!") - embed = Embed( - color=Colour.blurple() - ) - - await ImagePaginator.paginate(pages, ctx, embed) - - @reddit_group.command(name="weekly") - async def weekly_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None: - """Send the top posts of this week from a given subreddit.""" - async with ctx.typing(): - pages = await self.get_top_posts(subreddit=subreddit, time="week", paginate=True) - - await ctx.send(f"Here are this week's top {subreddit} posts!") - embed = Embed( - color=Colour.blurple() - ) - - await ImagePaginator.paginate(pages, ctx, embed) - - @has_any_role(*STAFF_ROLES) - @reddit_group.command(name="subreddits", aliases=("subs",)) - async def subreddits_command(self, ctx: Context) -> None: - """Send a paginated embed of all the subreddits we're relaying.""" - embed = Embed() - embed.title = "Relayed subreddits." - embed.colour = Colour.blurple() - - await LinePaginator.paginate( - RedditConfig.subreddits, - ctx, embed, - footer_text="Use the reddit commands along with these to view their posts.", - empty=False, - max_lines=15 - ) - - -def setup(bot: Bot) -> None: - """Load the Reddit cog.""" - if not RedditConfig.secret or not RedditConfig.client_id: - log.error("Credentials not provided, cog not loaded.") - return - bot.add_cog(Reddit(bot)) diff --git a/bot/exts/evergreen/stackoverflow.py b/bot/exts/evergreen/stackoverflow.py deleted file mode 100644 index 64455e33..00000000 --- a/bot/exts/evergreen/stackoverflow.py +++ /dev/null @@ -1,88 +0,0 @@ -import logging -from html import unescape -from urllib.parse import quote_plus - -from discord import Embed, HTTPException -from discord.ext import commands - -from bot.bot import Bot -from bot.constants import Colours, Emojis - -logger = logging.getLogger(__name__) - -BASE_URL = "https://api.stackexchange.com/2.2/search/advanced" -SO_PARAMS = { - "order": "desc", - "sort": "activity", - "site": "stackoverflow" -} -SEARCH_URL = "https://stackoverflow.com/search?q={query}" -ERR_EMBED = Embed( - title="Error in fetching results from Stackoverflow", - description=( - "Sorry, there was en error while trying to fetch data from the Stackoverflow website. Please try again in some " - "time. If this issue persists, please contact the staff or send a message in #dev-contrib." - ), - color=Colours.soft_red -) - - -class Stackoverflow(commands.Cog): - """Contains command to interact with stackoverflow from discord.""" - - def __init__(self, bot: Bot): - self.bot = bot - - @commands.command(aliases=["so"]) - @commands.cooldown(1, 15, commands.cooldowns.BucketType.user) - async def stackoverflow(self, ctx: commands.Context, *, search_query: str) -> None: - """Sends the top 5 results of a search query from stackoverflow.""" - params = SO_PARAMS | {"q": search_query} - async with self.bot.http_session.get(url=BASE_URL, params=params) as response: - if response.status == 200: - data = await response.json() - else: - logger.error(f'Status code is not 200, it is {response.status}') - await ctx.send(embed=ERR_EMBED) - return - if not data['items']: - no_search_result = Embed( - title=f"No search results found for {search_query}", - color=Colours.soft_red - ) - await ctx.send(embed=no_search_result) - return - - top5 = data["items"][:5] - encoded_search_query = quote_plus(search_query) - embed = Embed( - title="Search results - Stackoverflow", - url=SEARCH_URL.format(query=encoded_search_query), - description=f"Here are the top {len(top5)} results:", - color=Colours.orange - ) - for item in top5: - embed.add_field( - name=unescape(item['title']), - value=( - f"[{Emojis.reddit_upvote} {item['score']} " - f"{Emojis.stackoverflow_views} {item['view_count']} " - f"{Emojis.reddit_comments} {item['answer_count']} " - f"{Emojis.stackoverflow_tag} {', '.join(item['tags'][:3])}]" - f"({item['link']})" - ), - inline=False) - embed.set_footer(text="View the original link for more results.") - try: - await ctx.send(embed=embed) - except HTTPException: - search_query_too_long = Embed( - title="Your search query is too long, please try shortening your search query", - color=Colours.soft_red - ) - await ctx.send(embed=search_query_too_long) - - -def setup(bot: Bot) -> None: - """Load the Stackoverflow Cog.""" - bot.add_cog(Stackoverflow(bot)) diff --git a/bot/exts/evergreen/timed.py b/bot/exts/evergreen/timed.py deleted file mode 100644 index 2ea6b419..00000000 --- a/bot/exts/evergreen/timed.py +++ /dev/null @@ -1,48 +0,0 @@ -from copy import copy -from time import perf_counter - -from discord import Message -from discord.ext import commands - -from bot.bot import Bot - - -class TimedCommands(commands.Cog): - """Time the command execution of a command.""" - - @staticmethod - async def create_execution_context(ctx: commands.Context, command: str) -> commands.Context: - """Get a new execution context for a command.""" - msg: Message = copy(ctx.message) - msg.content = f"{ctx.prefix}{command}" - - return await ctx.bot.get_context(msg) - - @commands.command(name="timed", aliases=("time", "t")) - async def timed(self, ctx: commands.Context, *, command: str) -> None: - """Time the command execution of a command.""" - new_ctx = await self.create_execution_context(ctx, command) - - ctx.subcontext = new_ctx - - if not ctx.subcontext.command: - help_command = f"{ctx.prefix}help" - error = f"The command you are trying to time doesn't exist. Use `{help_command}` for a list of commands." - - await ctx.send(error) - return - - if new_ctx.command.qualified_name == "timed": - await ctx.send("You are not allowed to time the execution of the `timed` command.") - return - - t_start = perf_counter() - await new_ctx.command.invoke(new_ctx) - t_end = perf_counter() - - await ctx.send(f"Command execution for `{new_ctx.command}` finished in {(t_end - t_start):.4f} seconds.") - - -def setup(bot: Bot) -> None: - """Load the Timed cog.""" - bot.add_cog(TimedCommands()) diff --git a/bot/exts/evergreen/wikipedia.py b/bot/exts/evergreen/wikipedia.py deleted file mode 100644 index eccc1f8c..00000000 --- a/bot/exts/evergreen/wikipedia.py +++ /dev/null @@ -1,100 +0,0 @@ -import logging -import re -from datetime import datetime -from html import unescape - -from discord import Color, Embed, TextChannel -from discord.ext import commands - -from bot.bot import Bot -from bot.utils import LinePaginator -from bot.utils.exceptions import APIError - -log = logging.getLogger(__name__) - -SEARCH_API = ( - "https://en.wikipedia.org/w/api.php" -) -WIKI_PARAMS = { - "action": "query", - "list": "search", - "prop": "info", - "inprop": "url", - "utf8": "", - "format": "json", - "origin": "*", - -} -WIKI_THUMBNAIL = ( - "https://upload.wikimedia.org/wikipedia/en/thumb/8/80/Wikipedia-logo-v2.svg" - "/330px-Wikipedia-logo-v2.svg.png" -) -WIKI_SNIPPET_REGEX = r"(<!--.*?-->|<[^>]*>)" -WIKI_SEARCH_RESULT = ( - "**[{name}]({url})**\n" - "{description}\n" -) - - -class WikipediaSearch(commands.Cog): - """Get info from wikipedia.""" - - def __init__(self, bot: Bot): - self.bot = bot - - async def wiki_request(self, channel: TextChannel, search: str) -> list[str]: - """Search wikipedia search string and return formatted first 10 pages found.""" - params = WIKI_PARAMS | {"srlimit": 10, "srsearch": search} - async with self.bot.http_session.get(url=SEARCH_API, params=params) as resp: - if resp.status != 200: - log.info(f"Unexpected response `{resp.status}` while searching wikipedia for `{search}`") - raise APIError("Wikipedia API", resp.status) - - raw_data = await resp.json() - - if not raw_data.get("query"): - if error := raw_data.get("errors"): - log.error(f"There was an error while communicating with the Wikipedia API: {error}") - raise APIError("Wikipedia API", resp.status, error) - - lines = [] - if raw_data["query"]["searchinfo"]["totalhits"]: - for article in raw_data["query"]["search"]: - line = WIKI_SEARCH_RESULT.format( - name=article["title"], - description=unescape( - re.sub( - WIKI_SNIPPET_REGEX, "", article["snippet"] - ) - ), - url=f"https://en.wikipedia.org/?curid={article['pageid']}" - ) - lines.append(line) - - return lines - - @commands.cooldown(1, 10, commands.BucketType.user) - @commands.command(name="wikipedia", aliases=("wiki",)) - async def wikipedia_search_command(self, ctx: commands.Context, *, search: str) -> None: - """Sends paginated top 10 results of Wikipedia search..""" - contents = await self.wiki_request(ctx.channel, search) - - if contents: - embed = Embed( - title="Wikipedia Search Results", - colour=Color.blurple() - ) - embed.set_thumbnail(url=WIKI_THUMBNAIL) - embed.timestamp = datetime.utcnow() - await LinePaginator.paginate( - contents, ctx, embed - ) - else: - await ctx.send( - "Sorry, we could not find a wikipedia article using that search term." - ) - - -def setup(bot: Bot) -> None: - """Load the WikipediaSearch cog.""" - bot.add_cog(WikipediaSearch(bot)) diff --git a/bot/exts/evergreen/wolfram.py b/bot/exts/evergreen/wolfram.py deleted file mode 100644 index 9a26e545..00000000 --- a/bot/exts/evergreen/wolfram.py +++ /dev/null @@ -1,293 +0,0 @@ -import logging -from io import BytesIO -from typing import Callable, Optional -from urllib.parse import urlencode - -import arrow -import discord -from discord import Embed -from discord.ext import commands -from discord.ext.commands import BucketType, Cog, Context, check, group - -from bot.bot import Bot -from bot.constants import Colours, STAFF_ROLES, Wolfram -from bot.utils.pagination import ImagePaginator - -log = logging.getLogger(__name__) - -APPID = Wolfram.key -DEFAULT_OUTPUT_FORMAT = "JSON" -QUERY = "http://api.wolframalpha.com/v2/{request}" -WOLF_IMAGE = "https://www.symbols.com/gi.php?type=1&id=2886&i=1" - -MAX_PODS = 20 - -# Allows for 10 wolfram calls pr user pr day -usercd = commands.CooldownMapping.from_cooldown(Wolfram.user_limit_day, 60 * 60 * 24, BucketType.user) - -# Allows for max api requests / days in month per day for the entire guild (Temporary) -guildcd = commands.CooldownMapping.from_cooldown(Wolfram.guild_limit_day, 60 * 60 * 24, BucketType.guild) - - -async def send_embed( - ctx: Context, - message_txt: str, - colour: int = Colours.soft_red, - footer: str = None, - img_url: str = None, - f: discord.File = None -) -> None: - """Generate & send a response embed with Wolfram as the author.""" - embed = Embed(colour=colour) - embed.description = message_txt - embed.set_author( - name="Wolfram Alpha", - icon_url=WOLF_IMAGE, - url="https://www.wolframalpha.com/" - ) - if footer: - embed.set_footer(text=footer) - - if img_url: - embed.set_image(url=img_url) - - await ctx.send(embed=embed, file=f) - - -def custom_cooldown(*ignore: int) -> Callable: - """ - Implement per-user and per-guild cooldowns for requests to the Wolfram API. - - A list of roles may be provided to ignore the per-user cooldown. - """ - async def predicate(ctx: Context) -> bool: - if ctx.invoked_with == "help": - # if the invoked command is help we don't want to increase the ratelimits since it's not actually - # invoking the command/making a request, so instead just check if the user/guild are on cooldown. - guild_cooldown = not guildcd.get_bucket(ctx.message).get_tokens() == 0 # if guild is on cooldown - # check the message is in a guild, and check user bucket if user is not ignored - if ctx.guild and not any(r.id in ignore for r in ctx.author.roles): - return guild_cooldown and not usercd.get_bucket(ctx.message).get_tokens() == 0 - return guild_cooldown - - user_bucket = usercd.get_bucket(ctx.message) - - if all(role.id not in ignore for role in ctx.author.roles): - user_rate = user_bucket.update_rate_limit() - - if user_rate: - # Can't use api; cause: member limit - cooldown = arrow.utcnow().shift(seconds=int(user_rate)).humanize(only_distance=True) - message = ( - "You've used up your limit for Wolfram|Alpha requests.\n" - f"Cooldown: {cooldown}" - ) - await send_embed(ctx, message) - return False - - guild_bucket = guildcd.get_bucket(ctx.message) - guild_rate = guild_bucket.update_rate_limit() - - # Repr has a token attribute to read requests left - log.debug(guild_bucket) - - if guild_rate: - # Can't use api; cause: guild limit - message = ( - "The max limit of requests for the server has been reached for today.\n" - f"Cooldown: {int(guild_rate)}" - ) - await send_embed(ctx, message) - return False - - return True - - return check(predicate) - - -async def get_pod_pages(ctx: Context, bot: Bot, query: str) -> Optional[list[tuple[str, str]]]: - """Get the Wolfram API pod pages for the provided query.""" - async with ctx.typing(): - params = { - "input": query, - "appid": APPID, - "output": DEFAULT_OUTPUT_FORMAT, - "format": "image,plaintext", - "location": "the moon", - "latlong": "0.0,0.0", - "ip": "1.1.1.1" - } - request_url = QUERY.format(request="query") - - async with bot.http_session.get(url=request_url, params=params) as response: - json = await response.json(content_type="text/plain") - - result = json["queryresult"] - log_full_url = f"{request_url}?{urlencode(params)}" - if result["error"]: - # API key not set up correctly - if result["error"]["msg"] == "Invalid appid": - message = "Wolfram API key is invalid or missing." - log.warning( - "API key seems to be missing, or invalid when " - f"processing a wolfram request: {log_full_url}, Response: {json}" - ) - await send_embed(ctx, message) - return None - - message = "Something went wrong internally with your request, please notify staff!" - log.warning(f"Something went wrong getting a response from wolfram: {log_full_url}, Response: {json}") - await send_embed(ctx, message) - return None - - if not result["success"]: - message = f"I couldn't find anything for {query}." - await send_embed(ctx, message) - return None - - if not result["numpods"]: - message = "Could not find any results." - await send_embed(ctx, message) - return None - - pods = result["pods"] - pages = [] - for pod in pods[:MAX_PODS]: - subs = pod.get("subpods") - - for sub in subs: - title = sub.get("title") or sub.get("plaintext") or sub.get("id", "") - img = sub["img"]["src"] - pages.append((title, img)) - return pages - - -class Wolfram(Cog): - """Commands for interacting with the Wolfram|Alpha API.""" - - def __init__(self, bot: Bot): - self.bot = bot - - @group(name="wolfram", aliases=("wolf", "wa"), invoke_without_command=True) - @custom_cooldown(*STAFF_ROLES) - async def wolfram_command(self, ctx: Context, *, query: str) -> None: - """Requests all answers on a single image, sends an image of all related pods.""" - params = { - "i": query, - "appid": APPID, - "location": "the moon", - "latlong": "0.0,0.0", - "ip": "1.1.1.1" - } - request_url = QUERY.format(request="simple") - - # Give feedback that the bot is working. - async with ctx.typing(): - async with self.bot.http_session.get(url=request_url, params=params) as response: - status = response.status - image_bytes = await response.read() - - f = discord.File(BytesIO(image_bytes), filename="image.png") - image_url = "attachment://image.png" - - if status == 501: - message = "Failed to get response." - footer = "" - color = Colours.soft_red - elif status == 400: - message = "No input found." - footer = "" - color = Colours.soft_red - elif status == 403: - message = "Wolfram API key is invalid or missing." - footer = "" - color = Colours.soft_red - else: - message = "" - footer = "View original for a bigger picture." - color = Colours.soft_orange - - # Sends a "blank" embed if no request is received, unsure how to fix - await send_embed(ctx, message, color, footer=footer, img_url=image_url, f=f) - - @wolfram_command.command(name="page", aliases=("pa", "p")) - @custom_cooldown(*STAFF_ROLES) - async def wolfram_page_command(self, ctx: Context, *, query: str) -> None: - """ - Requests a drawn image of given query. - - Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc. - """ - pages = await get_pod_pages(ctx, self.bot, query) - - if not pages: - return - - embed = Embed() - embed.set_author( - name="Wolfram Alpha", - icon_url=WOLF_IMAGE, - url="https://www.wolframalpha.com/" - ) - embed.colour = Colours.soft_orange - - await ImagePaginator.paginate(pages, ctx, embed) - - @wolfram_command.command(name="cut", aliases=("c",)) - @custom_cooldown(*STAFF_ROLES) - async def wolfram_cut_command(self, ctx: Context, *, query: str) -> None: - """ - Requests a drawn image of given query. - - Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc. - """ - pages = await get_pod_pages(ctx, self.bot, query) - - if not pages: - return - - if len(pages) >= 2: - page = pages[1] - else: - page = pages[0] - - await send_embed(ctx, page[0], colour=Colours.soft_orange, img_url=page[1]) - - @wolfram_command.command(name="short", aliases=("sh", "s")) - @custom_cooldown(*STAFF_ROLES) - async def wolfram_short_command(self, ctx: Context, *, query: str) -> None: - """Requests an answer to a simple question.""" - params = { - "i": query, - "appid": APPID, - "location": "the moon", - "latlong": "0.0,0.0", - "ip": "1.1.1.1" - } - request_url = QUERY.format(request="result") - - # Give feedback that the bot is working. - async with ctx.typing(): - async with self.bot.http_session.get(url=request_url, params=params) as response: - status = response.status - response_text = await response.text() - - if status == 501: - message = "Failed to get response." - color = Colours.soft_red - elif status == 400: - message = "No input found." - color = Colours.soft_red - elif response_text == "Error 1: Invalid appid.": - message = "Wolfram API key is invalid or missing." - color = Colours.soft_red - else: - message = response_text - color = Colours.soft_orange - - await send_embed(ctx, message, color) - - -def setup(bot: Bot) -> None: - """Load the Wolfram cog.""" - bot.add_cog(Wolfram(bot)) |