aboutsummaryrefslogtreecommitdiffstats
path: root/bot/exts/utilities
diff options
context:
space:
mode:
authorGravatar brad90four <[email protected]>2021-09-07 07:34:57 -0400
committerGravatar brad90four <[email protected]>2021-09-07 07:34:57 -0400
commit1e711f3e04c6e6adb66159ffbdc4f72740b62fd7 (patch)
treeaf1eae3a95b557ee27cf7577b67b8fecf90b1752 /bot/exts/utilities
parentAdded ryanzec_colours.json constructed from ryanzec/name-that-color (diff)
parentContinue work in progress (diff)
Fixing merge conflicts
Diffstat (limited to 'bot/exts/utilities')
-rw-r--r--bot/exts/utilities/__init__.py0
-rw-r--r--bot/exts/utilities/bookmark.py153
-rw-r--r--bot/exts/utilities/cheatsheet.py112
-rw-r--r--bot/exts/utilities/color.py97
-rw-r--r--bot/exts/utilities/conversationstarters.py69
-rw-r--r--bot/exts/utilities/emoji.py123
-rw-r--r--bot/exts/utilities/githubinfo.py178
-rw-r--r--bot/exts/utilities/issues.py275
-rw-r--r--bot/exts/utilities/latex.py101
-rw-r--r--bot/exts/utilities/pythonfacts.py36
-rw-r--r--bot/exts/utilities/realpython.py81
-rw-r--r--bot/exts/utilities/reddit.py368
-rw-r--r--bot/exts/utilities/stackoverflow.py88
-rw-r--r--bot/exts/utilities/timed.py48
-rw-r--r--bot/exts/utilities/wikipedia.py100
-rw-r--r--bot/exts/utilities/wolfram.py293
16 files changed, 2122 insertions, 0 deletions
diff --git a/bot/exts/utilities/__init__.py b/bot/exts/utilities/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/bot/exts/utilities/__init__.py
diff --git a/bot/exts/utilities/bookmark.py b/bot/exts/utilities/bookmark.py
new file mode 100644
index 00000000..a91ef1c0
--- /dev/null
+++ b/bot/exts/utilities/bookmark.py
@@ -0,0 +1,153 @@
+import asyncio
+import logging
+import random
+from typing import Optional
+
+import discord
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.constants import Categories, Colours, ERROR_REPLIES, Icons, WHITELISTED_CHANNELS
+from bot.utils.converters import WrappedMessageConverter
+from bot.utils.decorators import whitelist_override
+
+log = logging.getLogger(__name__)
+
+# Number of seconds to wait for other users to bookmark the same message
+TIMEOUT = 120
+BOOKMARK_EMOJI = "📌"
+WHITELISTED_CATEGORIES = (Categories.help_in_use,)
+
+
+class Bookmark(commands.Cog):
+ """Creates personal bookmarks by relaying a message link to the user's DMs."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ @staticmethod
+ def build_bookmark_dm(target_message: discord.Message, title: str) -> discord.Embed:
+ """Build the embed to DM the bookmark requester."""
+ embed = discord.Embed(
+ title=title,
+ description=target_message.content,
+ colour=Colours.soft_green
+ )
+ embed.add_field(
+ name="Wanna give it a visit?",
+ value=f"[Visit original message]({target_message.jump_url})"
+ )
+ embed.set_author(name=target_message.author, icon_url=target_message.author.display_avatar.url)
+ embed.set_thumbnail(url=Icons.bookmark)
+
+ return embed
+
+ @staticmethod
+ def build_error_embed(user: discord.Member) -> discord.Embed:
+ """Builds an error embed for when a bookmark requester has DMs disabled."""
+ return discord.Embed(
+ title=random.choice(ERROR_REPLIES),
+ description=f"{user.mention}, please enable your DMs to receive the bookmark.",
+ colour=Colours.soft_red
+ )
+
+ async def action_bookmark(
+ self,
+ channel: discord.TextChannel,
+ user: discord.Member,
+ target_message: discord.Message,
+ title: str
+ ) -> None:
+ """Sends the bookmark DM, or sends an error embed when a user bookmarks a message."""
+ try:
+ embed = self.build_bookmark_dm(target_message, title)
+ await user.send(embed=embed)
+ except discord.Forbidden:
+ error_embed = self.build_error_embed(user)
+ await channel.send(embed=error_embed)
+ else:
+ log.info(f"{user} bookmarked {target_message.jump_url} with title '{title}'")
+
+ @staticmethod
+ async def send_reaction_embed(
+ channel: discord.TextChannel,
+ target_message: discord.Message
+ ) -> discord.Message:
+ """Sends an embed, with a reaction, so users can react to bookmark the message too."""
+ message = await channel.send(
+ embed=discord.Embed(
+ description=(
+ f"React with {BOOKMARK_EMOJI} to be sent your very own bookmark to "
+ f"[this message]({target_message.jump_url})."
+ ),
+ colour=Colours.soft_green
+ )
+ )
+
+ await message.add_reaction(BOOKMARK_EMOJI)
+ return message
+
+ @whitelist_override(channels=WHITELISTED_CHANNELS, categories=WHITELISTED_CATEGORIES)
+ @commands.command(name="bookmark", aliases=("bm", "pin"))
+ async def bookmark(
+ self,
+ ctx: commands.Context,
+ target_message: Optional[WrappedMessageConverter],
+ *,
+ title: str = "Bookmark"
+ ) -> None:
+ """Send the author a link to `target_message` via DMs."""
+ if not target_message:
+ if not ctx.message.reference:
+ raise commands.UserInputError("You must either provide a valid message to bookmark, or reply to one.")
+ target_message = ctx.message.reference.resolved
+
+ # Prevent users from bookmarking a message in a channel they don't have access to
+ permissions = target_message.channel.permissions_for(ctx.author)
+ if not permissions.read_messages:
+ log.info(f"{ctx.author} tried to bookmark a message in #{target_message.channel} but has no permissions.")
+ embed = discord.Embed(
+ title=random.choice(ERROR_REPLIES),
+ color=Colours.soft_red,
+ description="You don't have permission to view this channel."
+ )
+ await ctx.send(embed=embed)
+ return
+
+ def event_check(reaction: discord.Reaction, user: discord.Member) -> bool:
+ """Make sure that this reaction is what we want to operate on."""
+ return (
+ # Conditions for a successful pagination:
+ all((
+ # Reaction is on this message
+ reaction.message.id == reaction_message.id,
+ # User has not already bookmarked this message
+ user.id not in bookmarked_users,
+ # Reaction is the `BOOKMARK_EMOJI` emoji
+ str(reaction.emoji) == BOOKMARK_EMOJI,
+ # Reaction was not made by the Bot
+ user.id != self.bot.user.id
+ ))
+ )
+ await self.action_bookmark(ctx.channel, ctx.author, target_message, title)
+
+ # Keep track of who has already bookmarked, so users can't spam reactions and cause loads of DMs
+ bookmarked_users = [ctx.author.id]
+ reaction_message = await self.send_reaction_embed(ctx.channel, target_message)
+
+ while True:
+ try:
+ _, user = await self.bot.wait_for("reaction_add", timeout=TIMEOUT, check=event_check)
+ except asyncio.TimeoutError:
+ log.debug("Timed out waiting for a reaction")
+ break
+ log.trace(f"{user} has successfully bookmarked from a reaction, attempting to DM them.")
+ await self.action_bookmark(ctx.channel, user, target_message, title)
+ bookmarked_users.append(user.id)
+
+ await reaction_message.delete()
+
+
+def setup(bot: Bot) -> None:
+ """Load the Bookmark cog."""
+ bot.add_cog(Bookmark(bot))
diff --git a/bot/exts/utilities/cheatsheet.py b/bot/exts/utilities/cheatsheet.py
new file mode 100644
index 00000000..33d29f67
--- /dev/null
+++ b/bot/exts/utilities/cheatsheet.py
@@ -0,0 +1,112 @@
+import random
+import re
+from typing import Union
+from urllib.parse import quote_plus
+
+from discord import Embed
+from discord.ext import commands
+from discord.ext.commands import BucketType, Context
+
+from bot import constants
+from bot.bot import Bot
+from bot.constants import Categories, Channels, Colours, ERROR_REPLIES
+from bot.utils.decorators import whitelist_override
+
+ERROR_MESSAGE = f"""
+Unknown cheat sheet. Please try to reformulate your query.
+
+**Examples**:
+```md
+{constants.Client.prefix}cht read json
+{constants.Client.prefix}cht hello world
+{constants.Client.prefix}cht lambda
+```
+If the problem persists send a message in <#{Channels.dev_contrib}>
+"""
+
+URL = "https://cheat.sh/python/{search}"
+ESCAPE_TT = str.maketrans({"`": "\\`"})
+ANSI_RE = re.compile(r"\x1b\[.*?m")
+# We need to pass headers as curl otherwise it would default to aiohttp which would return raw html.
+HEADERS = {"User-Agent": "curl/7.68.0"}
+
+
+class CheatSheet(commands.Cog):
+ """Commands that sends a result of a cht.sh search in code blocks."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ @staticmethod
+ def fmt_error_embed() -> Embed:
+ """
+ Format the Error Embed.
+
+ If the cht.sh search returned 404, overwrite it to send a custom error embed.
+ link -> https://github.com/chubin/cheat.sh/issues/198
+ """
+ embed = Embed(
+ title=random.choice(ERROR_REPLIES),
+ description=ERROR_MESSAGE,
+ colour=Colours.soft_red
+ )
+ return embed
+
+ def result_fmt(self, url: str, body_text: str) -> tuple[bool, Union[str, Embed]]:
+ """Format Result."""
+ if body_text.startswith("# 404 NOT FOUND"):
+ embed = self.fmt_error_embed()
+ return True, embed
+
+ body_space = min(1986 - len(url), 1000)
+
+ if len(body_text) > body_space:
+ description = (
+ f"**Result Of cht.sh**\n"
+ f"```python\n{body_text[:body_space]}\n"
+ f"... (truncated - too many lines)\n```\n"
+ f"Full results: {url} "
+ )
+ else:
+ description = (
+ f"**Result Of cht.sh**\n"
+ f"```python\n{body_text}\n```\n"
+ f"{url}"
+ )
+ return False, description
+
+ @commands.command(
+ name="cheat",
+ aliases=("cht.sh", "cheatsheet", "cheat-sheet", "cht"),
+ )
+ @commands.cooldown(1, 10, BucketType.user)
+ @whitelist_override(categories=[Categories.help_in_use])
+ async def cheat_sheet(self, ctx: Context, *search_terms: str) -> None:
+ """
+ Search cheat.sh.
+
+ Gets a post from https://cheat.sh/python/ by default.
+ Usage:
+ --> .cht read json
+ """
+ async with ctx.typing():
+ search_string = quote_plus(" ".join(search_terms))
+
+ async with self.bot.http_session.get(
+ URL.format(search=search_string), headers=HEADERS
+ ) as response:
+ result = ANSI_RE.sub("", await response.text()).translate(ESCAPE_TT)
+
+ is_embed, description = self.result_fmt(
+ URL.format(search=search_string),
+ result
+ )
+ if is_embed:
+ await ctx.send(embed=description)
+ else:
+ await ctx.send(content=description)
+
+
+def setup(bot: Bot) -> None:
+ """Load the CheatSheet cog."""
+ bot.add_cog(CheatSheet(bot))
diff --git a/bot/exts/utilities/color.py b/bot/exts/utilities/color.py
new file mode 100644
index 00000000..1a4f7031
--- /dev/null
+++ b/bot/exts/utilities/color.py
@@ -0,0 +1,97 @@
+# imports
+import colorsys
+import logging
+import re
+from io import BytesIO
+
+from PIL import Image, ImageColor
+from discord import Embed, File
+from discord.ext import commands
+from rapidfuzz import process
+
+from bot.bot import Bot
+from bot.constants import Colours
+
+
+logger = logging.getLogger(__name__)
+
+
+ERROR_MSG = """The color code {user_color} is not a possible color combination.
+\nThe range of possible values are:
+\nRGB & HSV: 0-255
+\nCMYK: 0-100%
+\nHSL: 0-360 degrees
+\nHex: #000000-#FFFFFF
+"""
+
+
+# define color command
+class Color(commands.Cog):
+ """User initiated command to receive color information."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ @commands.command(aliases=["colour"])
+ @commands.cooldown(1, 10, commands.cooldowns.BucketType.user)
+ async def color(self, ctx: commands.Context, *, user_color: str) -> None:
+ """Send information on input color code or color name."""
+ # need to check if user_color is RGB, HSV, CMYK, HSL, Hex or color name
+ # should we assume the color is RGB if not defined?
+
+ if "#" in user_color:
+ logger.info(f"{user_color = }")
+ hex_match = re.search(r"^#(?:[0-9a-fA-F]{3}){1,2}$", user_color)
+ if hex_match:
+ hex_color = int(hex(int(user_color.replace("#", ""), 16)), 0)
+ logger.info(f"{hex_color = }")
+ rgb_color = ImageColor.getcolor(user_color, "RGB")
+ else:
+ await ctx.send(embed=Embed(
+ title="An error has occured.",
+ description=ERROR_MSG.format(user_color=user_color)
+ )
+ )
+
+ elif "RGB" or "rgb" in user_color:
+ rgb_parse = user_color.split()
+ rgb = rgb_parse[1:].replace(", ", "")
+ logger.info(f"{rgb = }")
+ logger.info(f"{rgb[0] = }")
+ logger.info(f"{rgb[1] = }")
+ logger.info(f"{rgb[2] = }")
+ rgb_color = tuple(rgb)
+ hex_color = f"0x{int(rgb[0]):02x}{int(rgb[1]):02x}{int(rgb[2]):02x}"
+
+ main_embed = Embed(
+ title=user_color,
+ color=hex_color,
+ )
+ async with ctx.typing():
+ file = await self._create_thumbnail_attachment(rgb_color)
+ main_embed.set_thumbnail(url="attachment://color.png")
+
+ await ctx.send(file=file, embed=main_embed)
+
+ async def _create_thumbnail_attachment(self, color: str) -> File:
+ """Generate a thumbnail from `color`."""
+
+ thumbnail = Image.new("RGB", (100, 100), color=color)
+ bufferedio = BytesIO()
+ thumbnail.save(bufferedio, format="PNG")
+ bufferedio.seek(0)
+
+ file = File(bufferedio, filename="color.png")
+
+ return file
+
+
+ # if user_color in color_lists:
+ # # TODO fuzzy match for color
+ # pass
+
+
+
+def setup(bot: Bot) -> None:
+ """Load the Color Cog."""
+ bot.add_cog(Color(bot))
diff --git a/bot/exts/utilities/conversationstarters.py b/bot/exts/utilities/conversationstarters.py
new file mode 100644
index 00000000..dd537022
--- /dev/null
+++ b/bot/exts/utilities/conversationstarters.py
@@ -0,0 +1,69 @@
+from pathlib import Path
+
+import yaml
+from discord import Color, Embed
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.constants import WHITELISTED_CHANNELS
+from bot.utils.decorators import whitelist_override
+from bot.utils.randomization import RandomCycle
+
+SUGGESTION_FORM = "https://forms.gle/zw6kkJqv8U43Nfjg9"
+
+with Path("bot/resources/utilities/starter.yaml").open("r", encoding="utf8") as f:
+ STARTERS = yaml.load(f, Loader=yaml.FullLoader)
+
+with Path("bot/resources/utilities/py_topics.yaml").open("r", encoding="utf8") as f:
+ # First ID is #python-general and the rest are top to bottom categories of Topical Chat/Help.
+ PY_TOPICS = yaml.load(f, Loader=yaml.FullLoader)
+
+ # Removing `None` from lists of topics, if not a list, it is changed to an empty one.
+ PY_TOPICS = {k: [i for i in v if i] if isinstance(v, list) else [] for k, v in PY_TOPICS.items()}
+
+ # All the allowed channels that the ".topic" command is allowed to be executed in.
+ ALL_ALLOWED_CHANNELS = list(PY_TOPICS.keys()) + list(WHITELISTED_CHANNELS)
+
+# Putting all topics into one dictionary and shuffling lists to reduce same-topic repetitions.
+ALL_TOPICS = {"default": STARTERS, **PY_TOPICS}
+TOPICS = {
+ channel: RandomCycle(topics or ["No topics found for this channel."])
+ for channel, topics in ALL_TOPICS.items()
+}
+
+
+class ConvoStarters(commands.Cog):
+ """General conversation topics."""
+
+ @commands.command()
+ @whitelist_override(channels=ALL_ALLOWED_CHANNELS)
+ async def topic(self, ctx: commands.Context) -> None:
+ """
+ Responds with a random topic to start a conversation.
+
+ If in a Python channel, a python-related topic will be given.
+
+ Otherwise, a random conversation topic will be received by the user.
+ """
+ # No matter what, the form will be shown.
+ embed = Embed(description=f"Suggest more topics [here]({SUGGESTION_FORM})!", color=Color.blurple())
+
+ try:
+ # Fetching topics.
+ channel_topics = TOPICS[ctx.channel.id]
+
+ # If the channel isn't Python-related.
+ except KeyError:
+ embed.title = f"**{next(TOPICS['default'])}**"
+
+ # If the channel ID doesn't have any topics.
+ else:
+ embed.title = f"**{next(channel_topics)}**"
+
+ finally:
+ await ctx.send(embed=embed)
+
+
+def setup(bot: Bot) -> None:
+ """Load the ConvoStarters cog."""
+ bot.add_cog(ConvoStarters())
diff --git a/bot/exts/utilities/emoji.py b/bot/exts/utilities/emoji.py
new file mode 100644
index 00000000..55d6b8e9
--- /dev/null
+++ b/bot/exts/utilities/emoji.py
@@ -0,0 +1,123 @@
+import logging
+import random
+import textwrap
+from collections import defaultdict
+from datetime import datetime
+from typing import Optional
+
+from discord import Color, Embed, Emoji
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.constants import Colours, ERROR_REPLIES
+from bot.utils.extensions import invoke_help_command
+from bot.utils.pagination import LinePaginator
+from bot.utils.time import time_since
+
+log = logging.getLogger(__name__)
+
+
+class Emojis(commands.Cog):
+ """A collection of commands related to emojis in the server."""
+
+ @staticmethod
+ def embed_builder(emoji: dict) -> tuple[Embed, list[str]]:
+ """Generates an embed with the emoji names and count."""
+ embed = Embed(
+ color=Colours.orange,
+ title="Emoji Count",
+ timestamp=datetime.utcnow()
+ )
+ msg = []
+
+ if len(emoji) == 1:
+ for category_name, category_emojis in emoji.items():
+ if len(category_emojis) == 1:
+ msg.append(f"There is **{len(category_emojis)}** emoji in the **{category_name}** category.")
+ else:
+ msg.append(f"There are **{len(category_emojis)}** emojis in the **{category_name}** category.")
+ embed.set_thumbnail(url=random.choice(category_emojis).url)
+
+ else:
+ for category_name, category_emojis in emoji.items():
+ emoji_choice = random.choice(category_emojis)
+ if len(category_emojis) > 1:
+ emoji_info = f"There are **{len(category_emojis)}** emojis in the **{category_name}** category."
+ else:
+ emoji_info = f"There is **{len(category_emojis)}** emoji in the **{category_name}** category."
+ if emoji_choice.animated:
+ msg.append(f"<a:{emoji_choice.name}:{emoji_choice.id}> {emoji_info}")
+ else:
+ msg.append(f"<:{emoji_choice.name}:{emoji_choice.id}> {emoji_info}")
+ return embed, msg
+
+ @staticmethod
+ def generate_invalid_embed(emojis: list[Emoji]) -> tuple[Embed, list[str]]:
+ """Generates error embed for invalid emoji categories."""
+ embed = Embed(
+ color=Colours.soft_red,
+ title=random.choice(ERROR_REPLIES)
+ )
+ msg = []
+
+ emoji_dict = defaultdict(list)
+ for emoji in emojis:
+ emoji_dict[emoji.name.split("_")[0]].append(emoji)
+
+ error_comp = ", ".join(emoji_dict)
+ msg.append(f"These are the valid emoji categories:\n```\n{error_comp}\n```")
+ return embed, msg
+
+ @commands.group(name="emoji", invoke_without_command=True)
+ async def emoji_group(self, ctx: commands.Context, emoji: Optional[Emoji]) -> None:
+ """A group of commands related to emojis."""
+ if emoji is not None:
+ await ctx.invoke(self.info_command, emoji)
+ else:
+ await invoke_help_command(ctx)
+
+ @emoji_group.command(name="count", aliases=("c",))
+ async def count_command(self, ctx: commands.Context, *, category_query: str = None) -> None:
+ """Returns embed with emoji category and info given by the user."""
+ emoji_dict = defaultdict(list)
+
+ if not ctx.guild.emojis:
+ await ctx.send("No emojis found.")
+ return
+ log.trace(f"Emoji Category {'' if category_query else 'not '}provided by the user.")
+ for emoji in ctx.guild.emojis:
+ emoji_category = emoji.name.split("_")[0]
+
+ if category_query is not None and emoji_category not in category_query:
+ continue
+
+ emoji_dict[emoji_category].append(emoji)
+
+ if not emoji_dict:
+ log.trace("Invalid name provided by the user")
+ embed, msg = self.generate_invalid_embed(ctx.guild.emojis)
+ else:
+ embed, msg = self.embed_builder(emoji_dict)
+ await LinePaginator.paginate(lines=msg, ctx=ctx, embed=embed)
+
+ @emoji_group.command(name="info", aliases=("i",))
+ async def info_command(self, ctx: commands.Context, emoji: Emoji) -> None:
+ """Returns relevant information about a Discord Emoji."""
+ emoji_information = Embed(
+ title=f"Emoji Information: {emoji.name}",
+ description=textwrap.dedent(f"""
+ **Name:** {emoji.name}
+ **Created:** {time_since(emoji.created_at, precision="hours")}
+ **Date:** {datetime.strftime(emoji.created_at, "%d/%m/%Y")}
+ **ID:** {emoji.id}
+ """),
+ color=Color.blurple(),
+ url=str(emoji.url),
+ ).set_thumbnail(url=emoji.url)
+
+ await ctx.send(embed=emoji_information)
+
+
+def setup(bot: Bot) -> None:
+ """Load the Emojis cog."""
+ bot.add_cog(Emojis())
diff --git a/bot/exts/utilities/githubinfo.py b/bot/exts/utilities/githubinfo.py
new file mode 100644
index 00000000..d00b408d
--- /dev/null
+++ b/bot/exts/utilities/githubinfo.py
@@ -0,0 +1,178 @@
+import logging
+import random
+from datetime import datetime
+from urllib.parse import quote, quote_plus
+
+import discord
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.constants import Colours, NEGATIVE_REPLIES
+from bot.exts.core.extensions import invoke_help_command
+
+log = logging.getLogger(__name__)
+
+GITHUB_API_URL = "https://api.github.com"
+
+
+class GithubInfo(commands.Cog):
+ """Fetches info from GitHub."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ async def fetch_data(self, url: str) -> dict:
+ """Retrieve data as a dictionary."""
+ async with self.bot.http_session.get(url) as r:
+ return await r.json()
+
+ @commands.group(name="github", aliases=("gh", "git"))
+ @commands.cooldown(1, 10, commands.BucketType.user)
+ async def github_group(self, ctx: commands.Context) -> None:
+ """Commands for finding information related to GitHub."""
+ if ctx.invoked_subcommand is None:
+ await invoke_help_command(ctx)
+
+ @github_group.command(name="user", aliases=("userinfo",))
+ async def github_user_info(self, ctx: commands.Context, username: str) -> None:
+ """Fetches a user's GitHub information."""
+ async with ctx.typing():
+ user_data = await self.fetch_data(f"{GITHUB_API_URL}/users/{quote_plus(username)}")
+
+ # User_data will not have a message key if the user exists
+ if "message" in user_data:
+ embed = discord.Embed(
+ title=random.choice(NEGATIVE_REPLIES),
+ description=f"The profile for `{username}` was not found.",
+ colour=Colours.soft_red
+ )
+
+ await ctx.send(embed=embed)
+ return
+
+ org_data = await self.fetch_data(user_data["organizations_url"])
+ orgs = [f"[{org['login']}](https://github.com/{org['login']})" for org in org_data]
+ orgs_to_add = " | ".join(orgs)
+
+ gists = user_data["public_gists"]
+
+ # Forming blog link
+ if user_data["blog"].startswith("http"): # Blog link is complete
+ blog = user_data["blog"]
+ elif user_data["blog"]: # Blog exists but the link is not complete
+ blog = f"https://{user_data['blog']}"
+ else:
+ blog = "No website link available"
+
+ embed = discord.Embed(
+ title=f"`{user_data['login']}`'s GitHub profile info",
+ description=f"```\n{user_data['bio']}\n```\n" if user_data["bio"] else "",
+ colour=discord.Colour.blurple(),
+ url=user_data["html_url"],
+ timestamp=datetime.strptime(user_data["created_at"], "%Y-%m-%dT%H:%M:%SZ")
+ )
+ embed.set_thumbnail(url=user_data["avatar_url"])
+ embed.set_footer(text="Account created at")
+
+ if user_data["type"] == "User":
+
+ embed.add_field(
+ name="Followers",
+ value=f"[{user_data['followers']}]({user_data['html_url']}?tab=followers)"
+ )
+ embed.add_field(
+ name="Following",
+ value=f"[{user_data['following']}]({user_data['html_url']}?tab=following)"
+ )
+
+ embed.add_field(
+ name="Public repos",
+ value=f"[{user_data['public_repos']}]({user_data['html_url']}?tab=repositories)"
+ )
+
+ if user_data["type"] == "User":
+ embed.add_field(
+ name="Gists",
+ value=f"[{gists}](https://gist.github.com/{quote_plus(username, safe='')})"
+ )
+
+ embed.add_field(
+ name=f"Organization{'s' if len(orgs)!=1 else ''}",
+ value=orgs_to_add if orgs else "No organizations."
+ )
+ embed.add_field(name="Website", value=blog)
+
+ await ctx.send(embed=embed)
+
+ @github_group.command(name='repository', aliases=('repo',))
+ async def github_repo_info(self, ctx: commands.Context, *repo: str) -> None:
+ """
+ Fetches a repositories' GitHub information.
+
+ The repository should look like `user/reponame` or `user reponame`.
+ """
+ repo = "/".join(repo)
+ if repo.count("/") != 1:
+ embed = discord.Embed(
+ title=random.choice(NEGATIVE_REPLIES),
+ description="The repository should look like `user/reponame` or `user reponame`.",
+ colour=Colours.soft_red
+ )
+
+ await ctx.send(embed=embed)
+ return
+
+ async with ctx.typing():
+ repo_data = await self.fetch_data(f"{GITHUB_API_URL}/repos/{quote(repo)}")
+
+ # There won't be a message key if this repo exists
+ if "message" in repo_data:
+ embed = discord.Embed(
+ title=random.choice(NEGATIVE_REPLIES),
+ description="The requested repository was not found.",
+ colour=Colours.soft_red
+ )
+
+ await ctx.send(embed=embed)
+ return
+
+ embed = discord.Embed(
+ title=repo_data["name"],
+ description=repo_data["description"],
+ colour=discord.Colour.blurple(),
+ url=repo_data["html_url"]
+ )
+
+ # If it's a fork, then it will have a parent key
+ try:
+ parent = repo_data["parent"]
+ embed.description += f"\n\nForked from [{parent['full_name']}]({parent['html_url']})"
+ except KeyError:
+ log.debug("Repository is not a fork.")
+
+ repo_owner = repo_data["owner"]
+
+ embed.set_author(
+ name=repo_owner["login"],
+ url=repo_owner["html_url"],
+ icon_url=repo_owner["avatar_url"]
+ )
+
+ repo_created_at = datetime.strptime(repo_data["created_at"], "%Y-%m-%dT%H:%M:%SZ").strftime("%d/%m/%Y")
+ last_pushed = datetime.strptime(repo_data["pushed_at"], "%Y-%m-%dT%H:%M:%SZ").strftime("%d/%m/%Y at %H:%M")
+
+ embed.set_footer(
+ text=(
+ f"{repo_data['forks_count']} ⑂ "
+ f"• {repo_data['stargazers_count']} ⭐ "
+ f"• Created At {repo_created_at} "
+ f"• Last Commit {last_pushed}"
+ )
+ )
+
+ await ctx.send(embed=embed)
+
+
+def setup(bot: Bot) -> None:
+ """Load the GithubInfo cog."""
+ bot.add_cog(GithubInfo(bot))
diff --git a/bot/exts/utilities/issues.py b/bot/exts/utilities/issues.py
new file mode 100644
index 00000000..8a7ebed0
--- /dev/null
+++ b/bot/exts/utilities/issues.py
@@ -0,0 +1,275 @@
+import logging
+import random
+import re
+from dataclasses import dataclass
+from typing import Optional, Union
+
+import discord
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.constants import (
+ Categories,
+ Channels,
+ Colours,
+ ERROR_REPLIES,
+ Emojis,
+ NEGATIVE_REPLIES,
+ Tokens,
+ WHITELISTED_CHANNELS
+)
+from bot.utils.decorators import whitelist_override
+from bot.utils.extensions import invoke_help_command
+
+log = logging.getLogger(__name__)
+
+BAD_RESPONSE = {
+ 404: "Issue/pull request not located! Please enter a valid number!",
+ 403: "Rate limit has been hit! Please try again later!"
+}
+REQUEST_HEADERS = {
+ "Accept": "application/vnd.github.v3+json"
+}
+
+REPOSITORY_ENDPOINT = "https://api.github.com/orgs/{org}/repos?per_page=100&type=public"
+ISSUE_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/issues/{number}"
+PR_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/pulls/{number}"
+
+if GITHUB_TOKEN := Tokens.github:
+ REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}"
+
+WHITELISTED_CATEGORIES = (
+ Categories.development, Categories.devprojects, Categories.media, Categories.staff
+)
+
+CODE_BLOCK_RE = re.compile(
+ r"^`([^`\n]+)`" # Inline codeblock
+ r"|```(.+?)```", # Multiline codeblock
+ re.DOTALL | re.MULTILINE
+)
+
+# Maximum number of issues in one message
+MAXIMUM_ISSUES = 5
+
+# Regex used when looking for automatic linking in messages
+# regex101 of current regex https://regex101.com/r/V2ji8M/6
+AUTOMATIC_REGEX = re.compile(
+ r"((?P<org>[a-zA-Z0-9][a-zA-Z0-9\-]{1,39})\/)?(?P<repo>[\w\-\.]{1,100})#(?P<number>[0-9]+)"
+)
+
+
+@dataclass
+class FoundIssue:
+ """Dataclass representing an issue found by the regex."""
+
+ organisation: Optional[str]
+ repository: str
+ number: str
+
+ def __hash__(self) -> int:
+ return hash((self.organisation, self.repository, self.number))
+
+
+@dataclass
+class FetchError:
+ """Dataclass representing an error while fetching an issue."""
+
+ return_code: int
+ message: str
+
+
+@dataclass
+class IssueState:
+ """Dataclass representing the state of an issue."""
+
+ repository: str
+ number: int
+ url: str
+ title: str
+ emoji: str
+
+
+class Issues(commands.Cog):
+ """Cog that allows users to retrieve issues from GitHub."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+ self.repos = []
+
+ @staticmethod
+ def remove_codeblocks(message: str) -> str:
+ """Remove any codeblock in a message."""
+ return re.sub(CODE_BLOCK_RE, "", message)
+
+ async def fetch_issues(
+ self,
+ number: int,
+ repository: str,
+ user: str
+ ) -> Union[IssueState, FetchError]:
+ """
+ Retrieve an issue from a GitHub repository.
+
+ Returns IssueState on success, FetchError on failure.
+ """
+ url = ISSUE_ENDPOINT.format(user=user, repository=repository, number=number)
+ pulls_url = PR_ENDPOINT.format(user=user, repository=repository, number=number)
+ log.trace(f"Querying GH issues API: {url}")
+
+ async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r:
+ json_data = await r.json()
+
+ if r.status == 403:
+ if r.headers.get("X-RateLimit-Remaining") == "0":
+ log.info(f"Ratelimit reached while fetching {url}")
+ return FetchError(403, "Ratelimit reached, please retry in a few minutes.")
+ return FetchError(403, "Cannot access issue.")
+ elif r.status in (404, 410):
+ return FetchError(r.status, "Issue not found.")
+ elif r.status != 200:
+ return FetchError(r.status, "Error while fetching issue.")
+
+ # The initial API request is made to the issues API endpoint, which will return information
+ # if the issue or PR is present. However, the scope of information returned for PRs differs
+ # from issues: if the 'issues' key is present in the response then we can pull the data we
+ # need from the initial API call.
+ if "issues" in json_data["html_url"]:
+ if json_data.get("state") == "open":
+ emoji = Emojis.issue_open
+ else:
+ emoji = Emojis.issue_closed
+
+ # If the 'issues' key is not contained in the API response and there is no error code, then
+ # we know that a PR has been requested and a call to the pulls API endpoint is necessary
+ # to get the desired information for the PR.
+ else:
+ log.trace(f"PR provided, querying GH pulls API for additional information: {pulls_url}")
+ async with self.bot.http_session.get(pulls_url) as p:
+ pull_data = await p.json()
+ if pull_data["draft"]:
+ emoji = Emojis.pull_request_draft
+ elif pull_data["state"] == "open":
+ emoji = Emojis.pull_request_open
+ # When 'merged_at' is not None, this means that the state of the PR is merged
+ elif pull_data["merged_at"] is not None:
+ emoji = Emojis.pull_request_merged
+ else:
+ emoji = Emojis.pull_request_closed
+
+ issue_url = json_data.get("html_url")
+
+ return IssueState(repository, number, issue_url, json_data.get("title", ""), emoji)
+
+ @staticmethod
+ def format_embed(
+ results: list[Union[IssueState, FetchError]],
+ user: str,
+ repository: Optional[str] = None
+ ) -> discord.Embed:
+ """Take a list of IssueState or FetchError and format a Discord embed for them."""
+ description_list = []
+
+ for result in results:
+ if isinstance(result, IssueState):
+ description_list.append(f"{result.emoji} [{result.title}]({result.url})")
+ elif isinstance(result, FetchError):
+ description_list.append(f":x: [{result.return_code}] {result.message}")
+
+ resp = discord.Embed(
+ colour=Colours.bright_green,
+ description="\n".join(description_list)
+ )
+
+ embed_url = f"https://github.com/{user}/{repository}" if repository else f"https://github.com/{user}"
+ resp.set_author(name="GitHub", url=embed_url)
+ return resp
+
+ @whitelist_override(channels=WHITELISTED_CHANNELS, categories=WHITELISTED_CATEGORIES)
+ @commands.command(aliases=("pr",))
+ async def issue(
+ self,
+ ctx: commands.Context,
+ numbers: commands.Greedy[int],
+ repository: str = "sir-lancebot",
+ user: str = "python-discord"
+ ) -> None:
+ """Command to retrieve issue(s) from a GitHub repository."""
+ # Remove duplicates
+ numbers = set(numbers)
+
+ if len(numbers) > MAXIMUM_ISSUES:
+ embed = discord.Embed(
+ title=random.choice(ERROR_REPLIES),
+ color=Colours.soft_red,
+ description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})"
+ )
+ await ctx.send(embed=embed)
+ await invoke_help_command(ctx)
+
+ results = [await self.fetch_issues(number, repository, user) for number in numbers]
+ await ctx.send(embed=self.format_embed(results, user, repository))
+
+ @commands.Cog.listener()
+ async def on_message(self, message: discord.Message) -> None:
+ """
+ Automatic issue linking.
+
+ Listener to retrieve issue(s) from a GitHub repository using automatic linking if matching <org>/<repo>#<issue>.
+ """
+ # Ignore bots
+ if message.author.bot:
+ return
+
+ issues = [
+ FoundIssue(*match.group("org", "repo", "number"))
+ for match in AUTOMATIC_REGEX.finditer(self.remove_codeblocks(message.content))
+ ]
+ links = []
+
+ if issues:
+ # Block this from working in DMs
+ if not message.guild:
+ await message.channel.send(
+ embed=discord.Embed(
+ title=random.choice(NEGATIVE_REPLIES),
+ description=(
+ "You can't retrieve issues from DMs. "
+ f"Try again in <#{Channels.community_bot_commands}>"
+ ),
+ colour=Colours.soft_red
+ )
+ )
+ return
+
+ log.trace(f"Found {issues = }")
+ # Remove duplicates
+ issues = set(issues)
+
+ if len(issues) > MAXIMUM_ISSUES:
+ embed = discord.Embed(
+ title=random.choice(ERROR_REPLIES),
+ color=Colours.soft_red,
+ description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})"
+ )
+ await message.channel.send(embed=embed, delete_after=5)
+ return
+
+ for repo_issue in issues:
+ result = await self.fetch_issues(
+ int(repo_issue.number),
+ repo_issue.repository,
+ repo_issue.organisation or "python-discord"
+ )
+ if isinstance(result, IssueState):
+ links.append(result)
+
+ if not links:
+ return
+
+ resp = self.format_embed(links, "python-discord")
+ await message.channel.send(embed=resp)
+
+
+def setup(bot: Bot) -> None:
+ """Load the Issues cog."""
+ bot.add_cog(Issues(bot))
diff --git a/bot/exts/utilities/latex.py b/bot/exts/utilities/latex.py
new file mode 100644
index 00000000..36c7e0ab
--- /dev/null
+++ b/bot/exts/utilities/latex.py
@@ -0,0 +1,101 @@
+import asyncio
+import hashlib
+import pathlib
+import re
+from concurrent.futures import ThreadPoolExecutor
+from io import BytesIO
+
+import discord
+import matplotlib.pyplot as plt
+from discord.ext import commands
+
+from bot.bot import Bot
+
+# configure fonts and colors for matplotlib
+plt.rcParams.update(
+ {
+ "font.size": 16,
+ "mathtext.fontset": "cm", # Computer Modern font set
+ "mathtext.rm": "serif",
+ "figure.facecolor": "36393F", # matches Discord's dark mode background color
+ "text.color": "white",
+ }
+)
+
+FORMATTED_CODE_REGEX = re.compile(
+ r"(?P<delim>(?P<block>```)|``?)" # code delimiter: 1-3 backticks; (?P=block) only matches if it's a block
+ r"(?(block)(?:(?P<lang>[a-z]+)\n)?)" # if we're in a block, match optional language (only letters plus newline)
+ r"(?:[ \t]*\n)*" # any blank (empty or tabs/spaces only) lines before the code
+ r"(?P<code>.*?)" # extract all code inside the markup
+ r"\s*" # any more whitespace before the end of the code markup
+ r"(?P=delim)", # match the exact same delimiter from the start again
+ re.DOTALL | re.IGNORECASE, # "." also matches newlines, case insensitive
+)
+
+CACHE_DIRECTORY = pathlib.Path("_latex_cache")
+CACHE_DIRECTORY.mkdir(exist_ok=True)
+
+
+class Latex(commands.Cog):
+ """Renders latex."""
+
+ @staticmethod
+ def _render(text: str, filepath: pathlib.Path) -> BytesIO:
+ """
+ Return the rendered image if latex compiles without errors, otherwise raise a BadArgument Exception.
+
+ Saves rendered image to cache.
+ """
+ fig = plt.figure()
+ rendered_image = BytesIO()
+ fig.text(0, 1, text, horizontalalignment="left", verticalalignment="top")
+
+ try:
+ plt.savefig(rendered_image, bbox_inches="tight", dpi=600)
+ except ValueError as e:
+ raise commands.BadArgument(str(e))
+
+ rendered_image.seek(0)
+
+ with open(filepath, "wb") as f:
+ f.write(rendered_image.getbuffer())
+
+ return rendered_image
+
+ @staticmethod
+ def _prepare_input(text: str) -> str:
+ text = text.replace(r"\\", "$\n$") # matplotlib uses \n for newlines, not \\
+
+ if match := FORMATTED_CODE_REGEX.match(text):
+ return match.group("code")
+ else:
+ return text
+
+ @commands.command()
+ @commands.max_concurrency(1, commands.BucketType.guild, wait=True)
+ async def latex(self, ctx: commands.Context, *, text: str) -> None:
+ """Renders the text in latex and sends the image."""
+ text = self._prepare_input(text)
+ query_hash = hashlib.md5(text.encode()).hexdigest()
+ image_path = CACHE_DIRECTORY.joinpath(f"{query_hash}.png")
+ async with ctx.typing():
+ if image_path.exists():
+ await ctx.send(file=discord.File(image_path))
+ return
+
+ with ThreadPoolExecutor() as pool:
+ image = await asyncio.get_running_loop().run_in_executor(
+ pool, self._render, text, image_path
+ )
+
+ await ctx.send(file=discord.File(image, "latex.png"))
+
+
+def setup(bot: Bot) -> None:
+ """Load the Latex Cog."""
+ # As we have resource issues on this cog,
+ # we have it currently disabled while we fix it.
+ import logging
+ logging.info("Latex cog is currently disabled. It won't be loaded.")
+ return
+ bot.add_cog(Latex())
diff --git a/bot/exts/utilities/pythonfacts.py b/bot/exts/utilities/pythonfacts.py
new file mode 100644
index 00000000..ef190185
--- /dev/null
+++ b/bot/exts/utilities/pythonfacts.py
@@ -0,0 +1,36 @@
+import itertools
+
+import discord
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.constants import Colours
+
+with open("bot/resources/utilities/python_facts.txt") as file:
+ FACTS = itertools.cycle(list(file))
+
+COLORS = itertools.cycle([Colours.python_blue, Colours.python_yellow])
+PYFACTS_DISCUSSION = "https://github.com/python-discord/meta/discussions/93"
+
+
+class PythonFacts(commands.Cog):
+ """Sends a random fun fact about Python."""
+
+ @commands.command(name="pythonfact", aliases=("pyfact",))
+ async def get_python_fact(self, ctx: commands.Context) -> None:
+ """Sends a Random fun fact about Python."""
+ embed = discord.Embed(
+ title="Python Facts",
+ description=next(FACTS),
+ colour=next(COLORS)
+ )
+ embed.add_field(
+ name="Suggestions",
+ value=f"Suggest more facts [here!]({PYFACTS_DISCUSSION})"
+ )
+ await ctx.send(embed=embed)
+
+
+def setup(bot: Bot) -> None:
+ """Load the PythonFacts Cog."""
+ bot.add_cog(PythonFacts())
diff --git a/bot/exts/utilities/realpython.py b/bot/exts/utilities/realpython.py
new file mode 100644
index 00000000..ef8b2638
--- /dev/null
+++ b/bot/exts/utilities/realpython.py
@@ -0,0 +1,81 @@
+import logging
+from html import unescape
+from urllib.parse import quote_plus
+
+from discord import Embed
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.constants import Colours
+
+logger = logging.getLogger(__name__)
+
+
+API_ROOT = "https://realpython.com/search/api/v1/"
+ARTICLE_URL = "https://realpython.com{article_url}"
+SEARCH_URL = "https://realpython.com/search?q={user_search}"
+
+
+ERROR_EMBED = Embed(
+ title="Error while searching Real Python",
+ description="There was an error while trying to reach Real Python. Please try again shortly.",
+ color=Colours.soft_red,
+)
+
+
+class RealPython(commands.Cog):
+ """User initiated command to search for a Real Python article."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ @commands.command(aliases=["rp"])
+ @commands.cooldown(1, 10, commands.cooldowns.BucketType.user)
+ async def realpython(self, ctx: commands.Context, *, user_search: str) -> None:
+ """Send 5 articles that match the user's search terms."""
+ params = {"q": user_search, "limit": 5, "kind": "article"}
+ async with self.bot.http_session.get(url=API_ROOT, params=params) as response:
+ if response.status != 200:
+ logger.error(
+ f"Unexpected status code {response.status} from Real Python"
+ )
+ await ctx.send(embed=ERROR_EMBED)
+ return
+
+ data = await response.json()
+
+ articles = data["results"]
+
+ if len(articles) == 0:
+ no_articles = Embed(
+ title=f"No articles found for '{user_search}'", color=Colours.soft_red
+ )
+ await ctx.send(embed=no_articles)
+ return
+
+ if len(articles) == 1:
+ article_description = "Here is the result:"
+ else:
+ article_description = f"Here are the top {len(articles)} results:"
+
+ article_embed = Embed(
+ title="Search results - Real Python",
+ url=SEARCH_URL.format(user_search=quote_plus(user_search)),
+ description=article_description,
+ color=Colours.orange,
+ )
+
+ for article in articles:
+ article_embed.add_field(
+ name=unescape(article["title"]),
+ value=ARTICLE_URL.format(article_url=article["url"]),
+ inline=False,
+ )
+ article_embed.set_footer(text="Click the links to go to the articles.")
+
+ await ctx.send(embed=article_embed)
+
+
+def setup(bot: Bot) -> None:
+ """Load the Real Python Cog."""
+ bot.add_cog(RealPython(bot))
diff --git a/bot/exts/utilities/reddit.py b/bot/exts/utilities/reddit.py
new file mode 100644
index 00000000..e6cb5337
--- /dev/null
+++ b/bot/exts/utilities/reddit.py
@@ -0,0 +1,368 @@
+import asyncio
+import logging
+import random
+import textwrap
+from collections import namedtuple
+from datetime import datetime, timedelta
+from typing import Union
+
+from aiohttp import BasicAuth, ClientError
+from discord import Colour, Embed, TextChannel
+from discord.ext.commands import Cog, Context, group, has_any_role
+from discord.ext.tasks import loop
+from discord.utils import escape_markdown, sleep_until
+
+from bot.bot import Bot
+from bot.constants import Channels, ERROR_REPLIES, Emojis, Reddit as RedditConfig, STAFF_ROLES
+from bot.utils.converters import Subreddit
+from bot.utils.extensions import invoke_help_command
+from bot.utils.messages import sub_clyde
+from bot.utils.pagination import ImagePaginator, LinePaginator
+
+log = logging.getLogger(__name__)
+
+AccessToken = namedtuple("AccessToken", ["token", "expires_at"])
+
+
+class Reddit(Cog):
+ """Track subreddit posts and show detailed statistics about them."""
+
+ HEADERS = {"User-Agent": "python3:python-discord/bot:1.0.0 (by /u/PythonDiscord)"}
+ URL = "https://www.reddit.com"
+ OAUTH_URL = "https://oauth.reddit.com"
+ MAX_RETRIES = 3
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ self.webhook = None
+ self.access_token = None
+ self.client_auth = BasicAuth(RedditConfig.client_id, RedditConfig.secret)
+
+ bot.loop.create_task(self.init_reddit_ready())
+ self.auto_poster_loop.start()
+
+ def cog_unload(self) -> None:
+ """Stop the loop task and revoke the access token when the cog is unloaded."""
+ self.auto_poster_loop.cancel()
+ if self.access_token and self.access_token.expires_at > datetime.utcnow():
+ asyncio.create_task(self.revoke_access_token())
+
+ async def init_reddit_ready(self) -> None:
+ """Sets the reddit webhook when the cog is loaded."""
+ await self.bot.wait_until_guild_available()
+ if not self.webhook:
+ self.webhook = await self.bot.fetch_webhook(RedditConfig.webhook)
+
+ @property
+ def channel(self) -> TextChannel:
+ """Get the #reddit channel object from the bot's cache."""
+ return self.bot.get_channel(Channels.reddit)
+
+ def build_pagination_pages(self, posts: list[dict], paginate: bool) -> Union[list[tuple], str]:
+ """Build embed pages required for Paginator."""
+ pages = []
+ first_page = ""
+ for post in posts:
+ post_page = ""
+ image_url = ""
+
+ data = post["data"]
+
+ title = textwrap.shorten(data["title"], width=50, placeholder="...")
+
+ # Normal brackets interfere with Markdown.
+ title = escape_markdown(title).replace("[", "⦋").replace("]", "⦌")
+ link = self.URL + data["permalink"]
+
+ first_page += f"**[{title.replace('*', '')}]({link})**\n"
+
+ text = data["selftext"]
+ if text:
+ text = escape_markdown(text).replace("[", "⦋").replace("]", "⦌")
+ first_page += textwrap.shorten(text, width=100, placeholder="...") + "\n"
+
+ ups = data["ups"]
+ comments = data["num_comments"]
+ author = data["author"]
+
+ content_type = Emojis.reddit_post_text
+ if data["is_video"] or {"youtube", "youtu.be"}.issubset(set(data["url"].split("."))):
+ # This means the content type in the post is a video.
+ content_type = f"{Emojis.reddit_post_video}"
+
+ elif data["url"].endswith(("jpg", "png", "gif")):
+ # This means the content type in the post is an image.
+ content_type = f"{Emojis.reddit_post_photo}"
+ image_url = data["url"]
+
+ first_page += (
+ f"{content_type}\u2003{Emojis.reddit_upvote}{ups}\u2003{Emojis.reddit_comments}"
+ f"\u2002{comments}\u2003{Emojis.reddit_users}{author}\n\n"
+ )
+
+ if paginate:
+ post_page += f"**[{title}]({link})**\n\n"
+ if text:
+ post_page += textwrap.shorten(text, width=252, placeholder="...") + "\n\n"
+ post_page += (
+ f"{content_type}\u2003{Emojis.reddit_upvote}{ups}\u2003{Emojis.reddit_comments}\u2002"
+ f"{comments}\u2003{Emojis.reddit_users}{author}"
+ )
+
+ pages.append((post_page, image_url))
+
+ if not paginate:
+ # Return the first summery page if pagination is not required
+ return first_page
+
+ pages.insert(0, (first_page, "")) # Using image paginator, hence settings image url to empty string
+ return pages
+
+ async def get_access_token(self) -> None:
+ """
+ Get a Reddit API OAuth2 access token and assign it to self.access_token.
+
+ A token is valid for 1 hour. There will be MAX_RETRIES to get a token, after which the cog
+ will be unloaded and a ClientError raised if retrieval was still unsuccessful.
+ """
+ for i in range(1, self.MAX_RETRIES + 1):
+ response = await self.bot.http_session.post(
+ url=f"{self.URL}/api/v1/access_token",
+ headers=self.HEADERS,
+ auth=self.client_auth,
+ data={
+ "grant_type": "client_credentials",
+ "duration": "temporary"
+ }
+ )
+
+ if response.status == 200 and response.content_type == "application/json":
+ content = await response.json()
+ expiration = int(content["expires_in"]) - 60 # Subtract 1 minute for leeway.
+ self.access_token = AccessToken(
+ token=content["access_token"],
+ expires_at=datetime.utcnow() + timedelta(seconds=expiration)
+ )
+
+ log.debug(f"New token acquired; expires on UTC {self.access_token.expires_at}")
+ return
+ else:
+ log.debug(
+ f"Failed to get an access token: "
+ f"status {response.status} & content type {response.content_type}; "
+ f"retrying ({i}/{self.MAX_RETRIES})"
+ )
+
+ await asyncio.sleep(3)
+
+ self.bot.remove_cog(self.qualified_name)
+ raise ClientError("Authentication with the Reddit API failed. Unloading the cog.")
+
+ async def revoke_access_token(self) -> None:
+ """
+ Revoke the OAuth2 access token for the Reddit API.
+
+ For security reasons, it's good practice to revoke the token when it's no longer being used.
+ """
+ response = await self.bot.http_session.post(
+ url=f"{self.URL}/api/v1/revoke_token",
+ headers=self.HEADERS,
+ auth=self.client_auth,
+ data={
+ "token": self.access_token.token,
+ "token_type_hint": "access_token"
+ }
+ )
+
+ if response.status in [200, 204] and response.content_type == "application/json":
+ self.access_token = None
+ else:
+ log.warning(f"Unable to revoke access token: status {response.status}.")
+
+ async def fetch_posts(self, route: str, *, amount: int = 25, params: dict = None) -> list[dict]:
+ """A helper method to fetch a certain amount of Reddit posts at a given route."""
+ # Reddit's JSON responses only provide 25 posts at most.
+ if not 25 >= amount > 0:
+ raise ValueError("Invalid amount of subreddit posts requested.")
+
+ # Renew the token if necessary.
+ if not self.access_token or self.access_token.expires_at < datetime.utcnow():
+ await self.get_access_token()
+
+ url = f"{self.OAUTH_URL}/{route}"
+ for _ in range(self.MAX_RETRIES):
+ response = await self.bot.http_session.get(
+ url=url,
+ headers={**self.HEADERS, "Authorization": f"bearer {self.access_token.token}"},
+ params=params
+ )
+ if response.status == 200 and response.content_type == 'application/json':
+ # Got appropriate response - process and return.
+ content = await response.json()
+ posts = content["data"]["children"]
+
+ filtered_posts = [post for post in posts if not post["data"]["over_18"]]
+
+ return filtered_posts[:amount]
+
+ await asyncio.sleep(3)
+
+ log.debug(f"Invalid response from: {url} - status code {response.status}, mimetype {response.content_type}")
+ return list() # Failed to get appropriate response within allowed number of retries.
+
+ async def get_top_posts(
+ self, subreddit: Subreddit, time: str = "all", amount: int = 5, paginate: bool = False
+ ) -> Union[Embed, list[tuple]]:
+ """
+ Get the top amount of posts for a given subreddit within a specified timeframe.
+
+ A time of "all" will get posts from all time, "day" will get top daily posts and "week" will get the top
+ weekly posts.
+
+ The amount should be between 0 and 25 as Reddit's JSON requests only provide 25 posts at most.
+ """
+ embed = Embed()
+
+ posts = await self.fetch_posts(
+ route=f"{subreddit}/top",
+ amount=amount,
+ params={"t": time}
+ )
+ if not posts:
+ embed.title = random.choice(ERROR_REPLIES)
+ embed.colour = Colour.red()
+ embed.description = (
+ "Sorry! We couldn't find any SFW posts from that subreddit. "
+ "If this problem persists, please let us know."
+ )
+
+ return embed
+
+ if paginate:
+ return self.build_pagination_pages(posts, paginate=True)
+
+ # Use only starting summary page for #reddit channel posts.
+ embed.description = self.build_pagination_pages(posts, paginate=False)
+ embed.colour = Colour.blurple()
+ return embed
+
+ @loop()
+ async def auto_poster_loop(self) -> None:
+ """Post the top 5 posts daily, and the top 5 posts weekly."""
+ # once d.py get support for `time` parameter in loop decorator,
+ # this can be removed and the loop can use the `time=datetime.time.min` parameter
+ now = datetime.utcnow()
+ tomorrow = now + timedelta(days=1)
+ midnight_tomorrow = tomorrow.replace(hour=0, minute=0, second=0)
+
+ await sleep_until(midnight_tomorrow)
+
+ await self.bot.wait_until_guild_available()
+ if not self.webhook:
+ await self.bot.fetch_webhook(RedditConfig.webhook)
+
+ if datetime.utcnow().weekday() == 0:
+ await self.top_weekly_posts()
+ # if it's a monday send the top weekly posts
+
+ for subreddit in RedditConfig.subreddits:
+ top_posts = await self.get_top_posts(subreddit=subreddit, time="day")
+ username = sub_clyde(f"{subreddit} Top Daily Posts")
+ message = await self.webhook.send(username=username, embed=top_posts, wait=True)
+
+ if message.channel.is_news():
+ await message.publish()
+
+ async def top_weekly_posts(self) -> None:
+ """Post a summary of the top posts."""
+ for subreddit in RedditConfig.subreddits:
+ # Send and pin the new weekly posts.
+ top_posts = await self.get_top_posts(subreddit=subreddit, time="week")
+ username = sub_clyde(f"{subreddit} Top Weekly Posts")
+ message = await self.webhook.send(wait=True, username=username, embed=top_posts)
+
+ if subreddit.lower() == "r/python":
+ if not self.channel:
+ log.warning("Failed to get #reddit channel to remove pins in the weekly loop.")
+ return
+
+ # Remove the oldest pins so that only 12 remain at most.
+ pins = await self.channel.pins()
+
+ while len(pins) >= 12:
+ await pins[-1].unpin()
+ del pins[-1]
+
+ await message.pin()
+
+ if message.channel.is_news():
+ await message.publish()
+
+ @group(name="reddit", invoke_without_command=True)
+ async def reddit_group(self, ctx: Context) -> None:
+ """View the top posts from various subreddits."""
+ await invoke_help_command(ctx)
+
+ @reddit_group.command(name="top")
+ async def top_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None:
+ """Send the top posts of all time from a given subreddit."""
+ async with ctx.typing():
+ pages = await self.get_top_posts(subreddit=subreddit, time="all", paginate=True)
+
+ await ctx.send(f"Here are the top {subreddit} posts of all time!")
+ embed = Embed(
+ color=Colour.blurple()
+ )
+
+ await ImagePaginator.paginate(pages, ctx, embed)
+
+ @reddit_group.command(name="daily")
+ async def daily_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None:
+ """Send the top posts of today from a given subreddit."""
+ async with ctx.typing():
+ pages = await self.get_top_posts(subreddit=subreddit, time="day", paginate=True)
+
+ await ctx.send(f"Here are today's top {subreddit} posts!")
+ embed = Embed(
+ color=Colour.blurple()
+ )
+
+ await ImagePaginator.paginate(pages, ctx, embed)
+
+ @reddit_group.command(name="weekly")
+ async def weekly_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None:
+ """Send the top posts of this week from a given subreddit."""
+ async with ctx.typing():
+ pages = await self.get_top_posts(subreddit=subreddit, time="week", paginate=True)
+
+ await ctx.send(f"Here are this week's top {subreddit} posts!")
+ embed = Embed(
+ color=Colour.blurple()
+ )
+
+ await ImagePaginator.paginate(pages, ctx, embed)
+
+ @has_any_role(*STAFF_ROLES)
+ @reddit_group.command(name="subreddits", aliases=("subs",))
+ async def subreddits_command(self, ctx: Context) -> None:
+ """Send a paginated embed of all the subreddits we're relaying."""
+ embed = Embed()
+ embed.title = "Relayed subreddits."
+ embed.colour = Colour.blurple()
+
+ await LinePaginator.paginate(
+ RedditConfig.subreddits,
+ ctx, embed,
+ footer_text="Use the reddit commands along with these to view their posts.",
+ empty=False,
+ max_lines=15
+ )
+
+
+def setup(bot: Bot) -> None:
+ """Load the Reddit cog."""
+ if not RedditConfig.secret or not RedditConfig.client_id:
+ log.error("Credentials not provided, cog not loaded.")
+ return
+ bot.add_cog(Reddit(bot))
diff --git a/bot/exts/utilities/stackoverflow.py b/bot/exts/utilities/stackoverflow.py
new file mode 100644
index 00000000..64455e33
--- /dev/null
+++ b/bot/exts/utilities/stackoverflow.py
@@ -0,0 +1,88 @@
+import logging
+from html import unescape
+from urllib.parse import quote_plus
+
+from discord import Embed, HTTPException
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.constants import Colours, Emojis
+
+logger = logging.getLogger(__name__)
+
+BASE_URL = "https://api.stackexchange.com/2.2/search/advanced"
+SO_PARAMS = {
+ "order": "desc",
+ "sort": "activity",
+ "site": "stackoverflow"
+}
+SEARCH_URL = "https://stackoverflow.com/search?q={query}"
+ERR_EMBED = Embed(
+ title="Error in fetching results from Stackoverflow",
+ description=(
+ "Sorry, there was en error while trying to fetch data from the Stackoverflow website. Please try again in some "
+ "time. If this issue persists, please contact the staff or send a message in #dev-contrib."
+ ),
+ color=Colours.soft_red
+)
+
+
+class Stackoverflow(commands.Cog):
+ """Contains command to interact with stackoverflow from discord."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ @commands.command(aliases=["so"])
+ @commands.cooldown(1, 15, commands.cooldowns.BucketType.user)
+ async def stackoverflow(self, ctx: commands.Context, *, search_query: str) -> None:
+ """Sends the top 5 results of a search query from stackoverflow."""
+ params = SO_PARAMS | {"q": search_query}
+ async with self.bot.http_session.get(url=BASE_URL, params=params) as response:
+ if response.status == 200:
+ data = await response.json()
+ else:
+ logger.error(f'Status code is not 200, it is {response.status}')
+ await ctx.send(embed=ERR_EMBED)
+ return
+ if not data['items']:
+ no_search_result = Embed(
+ title=f"No search results found for {search_query}",
+ color=Colours.soft_red
+ )
+ await ctx.send(embed=no_search_result)
+ return
+
+ top5 = data["items"][:5]
+ encoded_search_query = quote_plus(search_query)
+ embed = Embed(
+ title="Search results - Stackoverflow",
+ url=SEARCH_URL.format(query=encoded_search_query),
+ description=f"Here are the top {len(top5)} results:",
+ color=Colours.orange
+ )
+ for item in top5:
+ embed.add_field(
+ name=unescape(item['title']),
+ value=(
+ f"[{Emojis.reddit_upvote} {item['score']} "
+ f"{Emojis.stackoverflow_views} {item['view_count']} "
+ f"{Emojis.reddit_comments} {item['answer_count']} "
+ f"{Emojis.stackoverflow_tag} {', '.join(item['tags'][:3])}]"
+ f"({item['link']})"
+ ),
+ inline=False)
+ embed.set_footer(text="View the original link for more results.")
+ try:
+ await ctx.send(embed=embed)
+ except HTTPException:
+ search_query_too_long = Embed(
+ title="Your search query is too long, please try shortening your search query",
+ color=Colours.soft_red
+ )
+ await ctx.send(embed=search_query_too_long)
+
+
+def setup(bot: Bot) -> None:
+ """Load the Stackoverflow Cog."""
+ bot.add_cog(Stackoverflow(bot))
diff --git a/bot/exts/utilities/timed.py b/bot/exts/utilities/timed.py
new file mode 100644
index 00000000..2ea6b419
--- /dev/null
+++ b/bot/exts/utilities/timed.py
@@ -0,0 +1,48 @@
+from copy import copy
+from time import perf_counter
+
+from discord import Message
+from discord.ext import commands
+
+from bot.bot import Bot
+
+
+class TimedCommands(commands.Cog):
+ """Time the command execution of a command."""
+
+ @staticmethod
+ async def create_execution_context(ctx: commands.Context, command: str) -> commands.Context:
+ """Get a new execution context for a command."""
+ msg: Message = copy(ctx.message)
+ msg.content = f"{ctx.prefix}{command}"
+
+ return await ctx.bot.get_context(msg)
+
+ @commands.command(name="timed", aliases=("time", "t"))
+ async def timed(self, ctx: commands.Context, *, command: str) -> None:
+ """Time the command execution of a command."""
+ new_ctx = await self.create_execution_context(ctx, command)
+
+ ctx.subcontext = new_ctx
+
+ if not ctx.subcontext.command:
+ help_command = f"{ctx.prefix}help"
+ error = f"The command you are trying to time doesn't exist. Use `{help_command}` for a list of commands."
+
+ await ctx.send(error)
+ return
+
+ if new_ctx.command.qualified_name == "timed":
+ await ctx.send("You are not allowed to time the execution of the `timed` command.")
+ return
+
+ t_start = perf_counter()
+ await new_ctx.command.invoke(new_ctx)
+ t_end = perf_counter()
+
+ await ctx.send(f"Command execution for `{new_ctx.command}` finished in {(t_end - t_start):.4f} seconds.")
+
+
+def setup(bot: Bot) -> None:
+ """Load the Timed cog."""
+ bot.add_cog(TimedCommands())
diff --git a/bot/exts/utilities/wikipedia.py b/bot/exts/utilities/wikipedia.py
new file mode 100644
index 00000000..eccc1f8c
--- /dev/null
+++ b/bot/exts/utilities/wikipedia.py
@@ -0,0 +1,100 @@
+import logging
+import re
+from datetime import datetime
+from html import unescape
+
+from discord import Color, Embed, TextChannel
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.utils import LinePaginator
+from bot.utils.exceptions import APIError
+
+log = logging.getLogger(__name__)
+
+SEARCH_API = (
+ "https://en.wikipedia.org/w/api.php"
+)
+WIKI_PARAMS = {
+ "action": "query",
+ "list": "search",
+ "prop": "info",
+ "inprop": "url",
+ "utf8": "",
+ "format": "json",
+ "origin": "*",
+
+}
+WIKI_THUMBNAIL = (
+ "https://upload.wikimedia.org/wikipedia/en/thumb/8/80/Wikipedia-logo-v2.svg"
+ "/330px-Wikipedia-logo-v2.svg.png"
+)
+WIKI_SNIPPET_REGEX = r"(<!--.*?-->|<[^>]*>)"
+WIKI_SEARCH_RESULT = (
+ "**[{name}]({url})**\n"
+ "{description}\n"
+)
+
+
+class WikipediaSearch(commands.Cog):
+ """Get info from wikipedia."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ async def wiki_request(self, channel: TextChannel, search: str) -> list[str]:
+ """Search wikipedia search string and return formatted first 10 pages found."""
+ params = WIKI_PARAMS | {"srlimit": 10, "srsearch": search}
+ async with self.bot.http_session.get(url=SEARCH_API, params=params) as resp:
+ if resp.status != 200:
+ log.info(f"Unexpected response `{resp.status}` while searching wikipedia for `{search}`")
+ raise APIError("Wikipedia API", resp.status)
+
+ raw_data = await resp.json()
+
+ if not raw_data.get("query"):
+ if error := raw_data.get("errors"):
+ log.error(f"There was an error while communicating with the Wikipedia API: {error}")
+ raise APIError("Wikipedia API", resp.status, error)
+
+ lines = []
+ if raw_data["query"]["searchinfo"]["totalhits"]:
+ for article in raw_data["query"]["search"]:
+ line = WIKI_SEARCH_RESULT.format(
+ name=article["title"],
+ description=unescape(
+ re.sub(
+ WIKI_SNIPPET_REGEX, "", article["snippet"]
+ )
+ ),
+ url=f"https://en.wikipedia.org/?curid={article['pageid']}"
+ )
+ lines.append(line)
+
+ return lines
+
+ @commands.cooldown(1, 10, commands.BucketType.user)
+ @commands.command(name="wikipedia", aliases=("wiki",))
+ async def wikipedia_search_command(self, ctx: commands.Context, *, search: str) -> None:
+ """Sends paginated top 10 results of Wikipedia search.."""
+ contents = await self.wiki_request(ctx.channel, search)
+
+ if contents:
+ embed = Embed(
+ title="Wikipedia Search Results",
+ colour=Color.blurple()
+ )
+ embed.set_thumbnail(url=WIKI_THUMBNAIL)
+ embed.timestamp = datetime.utcnow()
+ await LinePaginator.paginate(
+ contents, ctx, embed
+ )
+ else:
+ await ctx.send(
+ "Sorry, we could not find a wikipedia article using that search term."
+ )
+
+
+def setup(bot: Bot) -> None:
+ """Load the WikipediaSearch cog."""
+ bot.add_cog(WikipediaSearch(bot))
diff --git a/bot/exts/utilities/wolfram.py b/bot/exts/utilities/wolfram.py
new file mode 100644
index 00000000..9a26e545
--- /dev/null
+++ b/bot/exts/utilities/wolfram.py
@@ -0,0 +1,293 @@
+import logging
+from io import BytesIO
+from typing import Callable, Optional
+from urllib.parse import urlencode
+
+import arrow
+import discord
+from discord import Embed
+from discord.ext import commands
+from discord.ext.commands import BucketType, Cog, Context, check, group
+
+from bot.bot import Bot
+from bot.constants import Colours, STAFF_ROLES, Wolfram
+from bot.utils.pagination import ImagePaginator
+
+log = logging.getLogger(__name__)
+
+APPID = Wolfram.key
+DEFAULT_OUTPUT_FORMAT = "JSON"
+QUERY = "http://api.wolframalpha.com/v2/{request}"
+WOLF_IMAGE = "https://www.symbols.com/gi.php?type=1&id=2886&i=1"
+
+MAX_PODS = 20
+
+# Allows for 10 wolfram calls pr user pr day
+usercd = commands.CooldownMapping.from_cooldown(Wolfram.user_limit_day, 60 * 60 * 24, BucketType.user)
+
+# Allows for max api requests / days in month per day for the entire guild (Temporary)
+guildcd = commands.CooldownMapping.from_cooldown(Wolfram.guild_limit_day, 60 * 60 * 24, BucketType.guild)
+
+
+async def send_embed(
+ ctx: Context,
+ message_txt: str,
+ colour: int = Colours.soft_red,
+ footer: str = None,
+ img_url: str = None,
+ f: discord.File = None
+) -> None:
+ """Generate & send a response embed with Wolfram as the author."""
+ embed = Embed(colour=colour)
+ embed.description = message_txt
+ embed.set_author(
+ name="Wolfram Alpha",
+ icon_url=WOLF_IMAGE,
+ url="https://www.wolframalpha.com/"
+ )
+ if footer:
+ embed.set_footer(text=footer)
+
+ if img_url:
+ embed.set_image(url=img_url)
+
+ await ctx.send(embed=embed, file=f)
+
+
+def custom_cooldown(*ignore: int) -> Callable:
+ """
+ Implement per-user and per-guild cooldowns for requests to the Wolfram API.
+
+ A list of roles may be provided to ignore the per-user cooldown.
+ """
+ async def predicate(ctx: Context) -> bool:
+ if ctx.invoked_with == "help":
+ # if the invoked command is help we don't want to increase the ratelimits since it's not actually
+ # invoking the command/making a request, so instead just check if the user/guild are on cooldown.
+ guild_cooldown = not guildcd.get_bucket(ctx.message).get_tokens() == 0 # if guild is on cooldown
+ # check the message is in a guild, and check user bucket if user is not ignored
+ if ctx.guild and not any(r.id in ignore for r in ctx.author.roles):
+ return guild_cooldown and not usercd.get_bucket(ctx.message).get_tokens() == 0
+ return guild_cooldown
+
+ user_bucket = usercd.get_bucket(ctx.message)
+
+ if all(role.id not in ignore for role in ctx.author.roles):
+ user_rate = user_bucket.update_rate_limit()
+
+ if user_rate:
+ # Can't use api; cause: member limit
+ cooldown = arrow.utcnow().shift(seconds=int(user_rate)).humanize(only_distance=True)
+ message = (
+ "You've used up your limit for Wolfram|Alpha requests.\n"
+ f"Cooldown: {cooldown}"
+ )
+ await send_embed(ctx, message)
+ return False
+
+ guild_bucket = guildcd.get_bucket(ctx.message)
+ guild_rate = guild_bucket.update_rate_limit()
+
+ # Repr has a token attribute to read requests left
+ log.debug(guild_bucket)
+
+ if guild_rate:
+ # Can't use api; cause: guild limit
+ message = (
+ "The max limit of requests for the server has been reached for today.\n"
+ f"Cooldown: {int(guild_rate)}"
+ )
+ await send_embed(ctx, message)
+ return False
+
+ return True
+
+ return check(predicate)
+
+
+async def get_pod_pages(ctx: Context, bot: Bot, query: str) -> Optional[list[tuple[str, str]]]:
+ """Get the Wolfram API pod pages for the provided query."""
+ async with ctx.typing():
+ params = {
+ "input": query,
+ "appid": APPID,
+ "output": DEFAULT_OUTPUT_FORMAT,
+ "format": "image,plaintext",
+ "location": "the moon",
+ "latlong": "0.0,0.0",
+ "ip": "1.1.1.1"
+ }
+ request_url = QUERY.format(request="query")
+
+ async with bot.http_session.get(url=request_url, params=params) as response:
+ json = await response.json(content_type="text/plain")
+
+ result = json["queryresult"]
+ log_full_url = f"{request_url}?{urlencode(params)}"
+ if result["error"]:
+ # API key not set up correctly
+ if result["error"]["msg"] == "Invalid appid":
+ message = "Wolfram API key is invalid or missing."
+ log.warning(
+ "API key seems to be missing, or invalid when "
+ f"processing a wolfram request: {log_full_url}, Response: {json}"
+ )
+ await send_embed(ctx, message)
+ return None
+
+ message = "Something went wrong internally with your request, please notify staff!"
+ log.warning(f"Something went wrong getting a response from wolfram: {log_full_url}, Response: {json}")
+ await send_embed(ctx, message)
+ return None
+
+ if not result["success"]:
+ message = f"I couldn't find anything for {query}."
+ await send_embed(ctx, message)
+ return None
+
+ if not result["numpods"]:
+ message = "Could not find any results."
+ await send_embed(ctx, message)
+ return None
+
+ pods = result["pods"]
+ pages = []
+ for pod in pods[:MAX_PODS]:
+ subs = pod.get("subpods")
+
+ for sub in subs:
+ title = sub.get("title") or sub.get("plaintext") or sub.get("id", "")
+ img = sub["img"]["src"]
+ pages.append((title, img))
+ return pages
+
+
+class Wolfram(Cog):
+ """Commands for interacting with the Wolfram|Alpha API."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ @group(name="wolfram", aliases=("wolf", "wa"), invoke_without_command=True)
+ @custom_cooldown(*STAFF_ROLES)
+ async def wolfram_command(self, ctx: Context, *, query: str) -> None:
+ """Requests all answers on a single image, sends an image of all related pods."""
+ params = {
+ "i": query,
+ "appid": APPID,
+ "location": "the moon",
+ "latlong": "0.0,0.0",
+ "ip": "1.1.1.1"
+ }
+ request_url = QUERY.format(request="simple")
+
+ # Give feedback that the bot is working.
+ async with ctx.typing():
+ async with self.bot.http_session.get(url=request_url, params=params) as response:
+ status = response.status
+ image_bytes = await response.read()
+
+ f = discord.File(BytesIO(image_bytes), filename="image.png")
+ image_url = "attachment://image.png"
+
+ if status == 501:
+ message = "Failed to get response."
+ footer = ""
+ color = Colours.soft_red
+ elif status == 400:
+ message = "No input found."
+ footer = ""
+ color = Colours.soft_red
+ elif status == 403:
+ message = "Wolfram API key is invalid or missing."
+ footer = ""
+ color = Colours.soft_red
+ else:
+ message = ""
+ footer = "View original for a bigger picture."
+ color = Colours.soft_orange
+
+ # Sends a "blank" embed if no request is received, unsure how to fix
+ await send_embed(ctx, message, color, footer=footer, img_url=image_url, f=f)
+
+ @wolfram_command.command(name="page", aliases=("pa", "p"))
+ @custom_cooldown(*STAFF_ROLES)
+ async def wolfram_page_command(self, ctx: Context, *, query: str) -> None:
+ """
+ Requests a drawn image of given query.
+
+ Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc.
+ """
+ pages = await get_pod_pages(ctx, self.bot, query)
+
+ if not pages:
+ return
+
+ embed = Embed()
+ embed.set_author(
+ name="Wolfram Alpha",
+ icon_url=WOLF_IMAGE,
+ url="https://www.wolframalpha.com/"
+ )
+ embed.colour = Colours.soft_orange
+
+ await ImagePaginator.paginate(pages, ctx, embed)
+
+ @wolfram_command.command(name="cut", aliases=("c",))
+ @custom_cooldown(*STAFF_ROLES)
+ async def wolfram_cut_command(self, ctx: Context, *, query: str) -> None:
+ """
+ Requests a drawn image of given query.
+
+ Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc.
+ """
+ pages = await get_pod_pages(ctx, self.bot, query)
+
+ if not pages:
+ return
+
+ if len(pages) >= 2:
+ page = pages[1]
+ else:
+ page = pages[0]
+
+ await send_embed(ctx, page[0], colour=Colours.soft_orange, img_url=page[1])
+
+ @wolfram_command.command(name="short", aliases=("sh", "s"))
+ @custom_cooldown(*STAFF_ROLES)
+ async def wolfram_short_command(self, ctx: Context, *, query: str) -> None:
+ """Requests an answer to a simple question."""
+ params = {
+ "i": query,
+ "appid": APPID,
+ "location": "the moon",
+ "latlong": "0.0,0.0",
+ "ip": "1.1.1.1"
+ }
+ request_url = QUERY.format(request="result")
+
+ # Give feedback that the bot is working.
+ async with ctx.typing():
+ async with self.bot.http_session.get(url=request_url, params=params) as response:
+ status = response.status
+ response_text = await response.text()
+
+ if status == 501:
+ message = "Failed to get response."
+ color = Colours.soft_red
+ elif status == 400:
+ message = "No input found."
+ color = Colours.soft_red
+ elif response_text == "Error 1: Invalid appid.":
+ message = "Wolfram API key is invalid or missing."
+ color = Colours.soft_red
+ else:
+ message = response_text
+ color = Colours.soft_orange
+
+ await send_embed(ctx, message, color)
+
+
+def setup(bot: Bot) -> None:
+ """Load the Wolfram cog."""
+ bot.add_cog(Wolfram(bot))