aboutsummaryrefslogtreecommitdiffstats
path: root/bot/exts/evergreen
diff options
context:
space:
mode:
Diffstat (limited to 'bot/exts/evergreen')
-rw-r--r--bot/exts/evergreen/bookmark.py153
-rw-r--r--bot/exts/evergreen/cheatsheet.py112
-rw-r--r--bot/exts/evergreen/conversationstarters.py69
-rw-r--r--bot/exts/evergreen/emoji.py123
-rw-r--r--bot/exts/evergreen/githubinfo.py178
-rw-r--r--bot/exts/evergreen/issues.py275
-rw-r--r--bot/exts/evergreen/latex.py101
-rw-r--r--bot/exts/evergreen/pythonfacts.py36
-rw-r--r--bot/exts/evergreen/realpython.py81
-rw-r--r--bot/exts/evergreen/reddit.py368
-rw-r--r--bot/exts/evergreen/stackoverflow.py88
-rw-r--r--bot/exts/evergreen/timed.py48
-rw-r--r--bot/exts/evergreen/wikipedia.py100
-rw-r--r--bot/exts/evergreen/wolfram.py293
14 files changed, 0 insertions, 2025 deletions
diff --git a/bot/exts/evergreen/bookmark.py b/bot/exts/evergreen/bookmark.py
deleted file mode 100644
index a91ef1c0..00000000
--- a/bot/exts/evergreen/bookmark.py
+++ /dev/null
@@ -1,153 +0,0 @@
-import asyncio
-import logging
-import random
-from typing import Optional
-
-import discord
-from discord.ext import commands
-
-from bot.bot import Bot
-from bot.constants import Categories, Colours, ERROR_REPLIES, Icons, WHITELISTED_CHANNELS
-from bot.utils.converters import WrappedMessageConverter
-from bot.utils.decorators import whitelist_override
-
-log = logging.getLogger(__name__)
-
-# Number of seconds to wait for other users to bookmark the same message
-TIMEOUT = 120
-BOOKMARK_EMOJI = "📌"
-WHITELISTED_CATEGORIES = (Categories.help_in_use,)
-
-
-class Bookmark(commands.Cog):
- """Creates personal bookmarks by relaying a message link to the user's DMs."""
-
- def __init__(self, bot: Bot):
- self.bot = bot
-
- @staticmethod
- def build_bookmark_dm(target_message: discord.Message, title: str) -> discord.Embed:
- """Build the embed to DM the bookmark requester."""
- embed = discord.Embed(
- title=title,
- description=target_message.content,
- colour=Colours.soft_green
- )
- embed.add_field(
- name="Wanna give it a visit?",
- value=f"[Visit original message]({target_message.jump_url})"
- )
- embed.set_author(name=target_message.author, icon_url=target_message.author.display_avatar.url)
- embed.set_thumbnail(url=Icons.bookmark)
-
- return embed
-
- @staticmethod
- def build_error_embed(user: discord.Member) -> discord.Embed:
- """Builds an error embed for when a bookmark requester has DMs disabled."""
- return discord.Embed(
- title=random.choice(ERROR_REPLIES),
- description=f"{user.mention}, please enable your DMs to receive the bookmark.",
- colour=Colours.soft_red
- )
-
- async def action_bookmark(
- self,
- channel: discord.TextChannel,
- user: discord.Member,
- target_message: discord.Message,
- title: str
- ) -> None:
- """Sends the bookmark DM, or sends an error embed when a user bookmarks a message."""
- try:
- embed = self.build_bookmark_dm(target_message, title)
- await user.send(embed=embed)
- except discord.Forbidden:
- error_embed = self.build_error_embed(user)
- await channel.send(embed=error_embed)
- else:
- log.info(f"{user} bookmarked {target_message.jump_url} with title '{title}'")
-
- @staticmethod
- async def send_reaction_embed(
- channel: discord.TextChannel,
- target_message: discord.Message
- ) -> discord.Message:
- """Sends an embed, with a reaction, so users can react to bookmark the message too."""
- message = await channel.send(
- embed=discord.Embed(
- description=(
- f"React with {BOOKMARK_EMOJI} to be sent your very own bookmark to "
- f"[this message]({target_message.jump_url})."
- ),
- colour=Colours.soft_green
- )
- )
-
- await message.add_reaction(BOOKMARK_EMOJI)
- return message
-
- @whitelist_override(channels=WHITELISTED_CHANNELS, categories=WHITELISTED_CATEGORIES)
- @commands.command(name="bookmark", aliases=("bm", "pin"))
- async def bookmark(
- self,
- ctx: commands.Context,
- target_message: Optional[WrappedMessageConverter],
- *,
- title: str = "Bookmark"
- ) -> None:
- """Send the author a link to `target_message` via DMs."""
- if not target_message:
- if not ctx.message.reference:
- raise commands.UserInputError("You must either provide a valid message to bookmark, or reply to one.")
- target_message = ctx.message.reference.resolved
-
- # Prevent users from bookmarking a message in a channel they don't have access to
- permissions = target_message.channel.permissions_for(ctx.author)
- if not permissions.read_messages:
- log.info(f"{ctx.author} tried to bookmark a message in #{target_message.channel} but has no permissions.")
- embed = discord.Embed(
- title=random.choice(ERROR_REPLIES),
- color=Colours.soft_red,
- description="You don't have permission to view this channel."
- )
- await ctx.send(embed=embed)
- return
-
- def event_check(reaction: discord.Reaction, user: discord.Member) -> bool:
- """Make sure that this reaction is what we want to operate on."""
- return (
- # Conditions for a successful pagination:
- all((
- # Reaction is on this message
- reaction.message.id == reaction_message.id,
- # User has not already bookmarked this message
- user.id not in bookmarked_users,
- # Reaction is the `BOOKMARK_EMOJI` emoji
- str(reaction.emoji) == BOOKMARK_EMOJI,
- # Reaction was not made by the Bot
- user.id != self.bot.user.id
- ))
- )
- await self.action_bookmark(ctx.channel, ctx.author, target_message, title)
-
- # Keep track of who has already bookmarked, so users can't spam reactions and cause loads of DMs
- bookmarked_users = [ctx.author.id]
- reaction_message = await self.send_reaction_embed(ctx.channel, target_message)
-
- while True:
- try:
- _, user = await self.bot.wait_for("reaction_add", timeout=TIMEOUT, check=event_check)
- except asyncio.TimeoutError:
- log.debug("Timed out waiting for a reaction")
- break
- log.trace(f"{user} has successfully bookmarked from a reaction, attempting to DM them.")
- await self.action_bookmark(ctx.channel, user, target_message, title)
- bookmarked_users.append(user.id)
-
- await reaction_message.delete()
-
-
-def setup(bot: Bot) -> None:
- """Load the Bookmark cog."""
- bot.add_cog(Bookmark(bot))
diff --git a/bot/exts/evergreen/cheatsheet.py b/bot/exts/evergreen/cheatsheet.py
deleted file mode 100644
index 33d29f67..00000000
--- a/bot/exts/evergreen/cheatsheet.py
+++ /dev/null
@@ -1,112 +0,0 @@
-import random
-import re
-from typing import Union
-from urllib.parse import quote_plus
-
-from discord import Embed
-from discord.ext import commands
-from discord.ext.commands import BucketType, Context
-
-from bot import constants
-from bot.bot import Bot
-from bot.constants import Categories, Channels, Colours, ERROR_REPLIES
-from bot.utils.decorators import whitelist_override
-
-ERROR_MESSAGE = f"""
-Unknown cheat sheet. Please try to reformulate your query.
-
-**Examples**:
-```md
-{constants.Client.prefix}cht read json
-{constants.Client.prefix}cht hello world
-{constants.Client.prefix}cht lambda
-```
-If the problem persists send a message in <#{Channels.dev_contrib}>
-"""
-
-URL = "https://cheat.sh/python/{search}"
-ESCAPE_TT = str.maketrans({"`": "\\`"})
-ANSI_RE = re.compile(r"\x1b\[.*?m")
-# We need to pass headers as curl otherwise it would default to aiohttp which would return raw html.
-HEADERS = {"User-Agent": "curl/7.68.0"}
-
-
-class CheatSheet(commands.Cog):
- """Commands that sends a result of a cht.sh search in code blocks."""
-
- def __init__(self, bot: Bot):
- self.bot = bot
-
- @staticmethod
- def fmt_error_embed() -> Embed:
- """
- Format the Error Embed.
-
- If the cht.sh search returned 404, overwrite it to send a custom error embed.
- link -> https://github.com/chubin/cheat.sh/issues/198
- """
- embed = Embed(
- title=random.choice(ERROR_REPLIES),
- description=ERROR_MESSAGE,
- colour=Colours.soft_red
- )
- return embed
-
- def result_fmt(self, url: str, body_text: str) -> tuple[bool, Union[str, Embed]]:
- """Format Result."""
- if body_text.startswith("# 404 NOT FOUND"):
- embed = self.fmt_error_embed()
- return True, embed
-
- body_space = min(1986 - len(url), 1000)
-
- if len(body_text) > body_space:
- description = (
- f"**Result Of cht.sh**\n"
- f"```python\n{body_text[:body_space]}\n"
- f"... (truncated - too many lines)\n```\n"
- f"Full results: {url} "
- )
- else:
- description = (
- f"**Result Of cht.sh**\n"
- f"```python\n{body_text}\n```\n"
- f"{url}"
- )
- return False, description
-
- @commands.command(
- name="cheat",
- aliases=("cht.sh", "cheatsheet", "cheat-sheet", "cht"),
- )
- @commands.cooldown(1, 10, BucketType.user)
- @whitelist_override(categories=[Categories.help_in_use])
- async def cheat_sheet(self, ctx: Context, *search_terms: str) -> None:
- """
- Search cheat.sh.
-
- Gets a post from https://cheat.sh/python/ by default.
- Usage:
- --> .cht read json
- """
- async with ctx.typing():
- search_string = quote_plus(" ".join(search_terms))
-
- async with self.bot.http_session.get(
- URL.format(search=search_string), headers=HEADERS
- ) as response:
- result = ANSI_RE.sub("", await response.text()).translate(ESCAPE_TT)
-
- is_embed, description = self.result_fmt(
- URL.format(search=search_string),
- result
- )
- if is_embed:
- await ctx.send(embed=description)
- else:
- await ctx.send(content=description)
-
-
-def setup(bot: Bot) -> None:
- """Load the CheatSheet cog."""
- bot.add_cog(CheatSheet(bot))
diff --git a/bot/exts/evergreen/conversationstarters.py b/bot/exts/evergreen/conversationstarters.py
deleted file mode 100644
index fdc4467a..00000000
--- a/bot/exts/evergreen/conversationstarters.py
+++ /dev/null
@@ -1,69 +0,0 @@
-from pathlib import Path
-
-import yaml
-from discord import Color, Embed
-from discord.ext import commands
-
-from bot.bot import Bot
-from bot.constants import WHITELISTED_CHANNELS
-from bot.utils.decorators import whitelist_override
-from bot.utils.randomization import RandomCycle
-
-SUGGESTION_FORM = "https://forms.gle/zw6kkJqv8U43Nfjg9"
-
-with Path("bot/resources/evergreen/starter.yaml").open("r", encoding="utf8") as f:
- STARTERS = yaml.load(f, Loader=yaml.FullLoader)
-
-with Path("bot/resources/evergreen/py_topics.yaml").open("r", encoding="utf8") as f:
- # First ID is #python-general and the rest are top to bottom categories of Topical Chat/Help.
- PY_TOPICS = yaml.load(f, Loader=yaml.FullLoader)
-
- # Removing `None` from lists of topics, if not a list, it is changed to an empty one.
- PY_TOPICS = {k: [i for i in v if i] if isinstance(v, list) else [] for k, v in PY_TOPICS.items()}
-
- # All the allowed channels that the ".topic" command is allowed to be executed in.
- ALL_ALLOWED_CHANNELS = list(PY_TOPICS.keys()) + list(WHITELISTED_CHANNELS)
-
-# Putting all topics into one dictionary and shuffling lists to reduce same-topic repetitions.
-ALL_TOPICS = {"default": STARTERS, **PY_TOPICS}
-TOPICS = {
- channel: RandomCycle(topics or ["No topics found for this channel."])
- for channel, topics in ALL_TOPICS.items()
-}
-
-
-class ConvoStarters(commands.Cog):
- """Evergreen conversation topics."""
-
- @commands.command()
- @whitelist_override(channels=ALL_ALLOWED_CHANNELS)
- async def topic(self, ctx: commands.Context) -> None:
- """
- Responds with a random topic to start a conversation.
-
- If in a Python channel, a python-related topic will be given.
-
- Otherwise, a random conversation topic will be received by the user.
- """
- # No matter what, the form will be shown.
- embed = Embed(description=f"Suggest more topics [here]({SUGGESTION_FORM})!", color=Color.blurple())
-
- try:
- # Fetching topics.
- channel_topics = TOPICS[ctx.channel.id]
-
- # If the channel isn't Python-related.
- except KeyError:
- embed.title = f"**{next(TOPICS['default'])}**"
-
- # If the channel ID doesn't have any topics.
- else:
- embed.title = f"**{next(channel_topics)}**"
-
- finally:
- await ctx.send(embed=embed)
-
-
-def setup(bot: Bot) -> None:
- """Load the ConvoStarters cog."""
- bot.add_cog(ConvoStarters())
diff --git a/bot/exts/evergreen/emoji.py b/bot/exts/evergreen/emoji.py
deleted file mode 100644
index 55d6b8e9..00000000
--- a/bot/exts/evergreen/emoji.py
+++ /dev/null
@@ -1,123 +0,0 @@
-import logging
-import random
-import textwrap
-from collections import defaultdict
-from datetime import datetime
-from typing import Optional
-
-from discord import Color, Embed, Emoji
-from discord.ext import commands
-
-from bot.bot import Bot
-from bot.constants import Colours, ERROR_REPLIES
-from bot.utils.extensions import invoke_help_command
-from bot.utils.pagination import LinePaginator
-from bot.utils.time import time_since
-
-log = logging.getLogger(__name__)
-
-
-class Emojis(commands.Cog):
- """A collection of commands related to emojis in the server."""
-
- @staticmethod
- def embed_builder(emoji: dict) -> tuple[Embed, list[str]]:
- """Generates an embed with the emoji names and count."""
- embed = Embed(
- color=Colours.orange,
- title="Emoji Count",
- timestamp=datetime.utcnow()
- )
- msg = []
-
- if len(emoji) == 1:
- for category_name, category_emojis in emoji.items():
- if len(category_emojis) == 1:
- msg.append(f"There is **{len(category_emojis)}** emoji in the **{category_name}** category.")
- else:
- msg.append(f"There are **{len(category_emojis)}** emojis in the **{category_name}** category.")
- embed.set_thumbnail(url=random.choice(category_emojis).url)
-
- else:
- for category_name, category_emojis in emoji.items():
- emoji_choice = random.choice(category_emojis)
- if len(category_emojis) > 1:
- emoji_info = f"There are **{len(category_emojis)}** emojis in the **{category_name}** category."
- else:
- emoji_info = f"There is **{len(category_emojis)}** emoji in the **{category_name}** category."
- if emoji_choice.animated:
- msg.append(f"<a:{emoji_choice.name}:{emoji_choice.id}> {emoji_info}")
- else:
- msg.append(f"<:{emoji_choice.name}:{emoji_choice.id}> {emoji_info}")
- return embed, msg
-
- @staticmethod
- def generate_invalid_embed(emojis: list[Emoji]) -> tuple[Embed, list[str]]:
- """Generates error embed for invalid emoji categories."""
- embed = Embed(
- color=Colours.soft_red,
- title=random.choice(ERROR_REPLIES)
- )
- msg = []
-
- emoji_dict = defaultdict(list)
- for emoji in emojis:
- emoji_dict[emoji.name.split("_")[0]].append(emoji)
-
- error_comp = ", ".join(emoji_dict)
- msg.append(f"These are the valid emoji categories:\n```\n{error_comp}\n```")
- return embed, msg
-
- @commands.group(name="emoji", invoke_without_command=True)
- async def emoji_group(self, ctx: commands.Context, emoji: Optional[Emoji]) -> None:
- """A group of commands related to emojis."""
- if emoji is not None:
- await ctx.invoke(self.info_command, emoji)
- else:
- await invoke_help_command(ctx)
-
- @emoji_group.command(name="count", aliases=("c",))
- async def count_command(self, ctx: commands.Context, *, category_query: str = None) -> None:
- """Returns embed with emoji category and info given by the user."""
- emoji_dict = defaultdict(list)
-
- if not ctx.guild.emojis:
- await ctx.send("No emojis found.")
- return
- log.trace(f"Emoji Category {'' if category_query else 'not '}provided by the user.")
- for emoji in ctx.guild.emojis:
- emoji_category = emoji.name.split("_")[0]
-
- if category_query is not None and emoji_category not in category_query:
- continue
-
- emoji_dict[emoji_category].append(emoji)
-
- if not emoji_dict:
- log.trace("Invalid name provided by the user")
- embed, msg = self.generate_invalid_embed(ctx.guild.emojis)
- else:
- embed, msg = self.embed_builder(emoji_dict)
- await LinePaginator.paginate(lines=msg, ctx=ctx, embed=embed)
-
- @emoji_group.command(name="info", aliases=("i",))
- async def info_command(self, ctx: commands.Context, emoji: Emoji) -> None:
- """Returns relevant information about a Discord Emoji."""
- emoji_information = Embed(
- title=f"Emoji Information: {emoji.name}",
- description=textwrap.dedent(f"""
- **Name:** {emoji.name}
- **Created:** {time_since(emoji.created_at, precision="hours")}
- **Date:** {datetime.strftime(emoji.created_at, "%d/%m/%Y")}
- **ID:** {emoji.id}
- """),
- color=Color.blurple(),
- url=str(emoji.url),
- ).set_thumbnail(url=emoji.url)
-
- await ctx.send(embed=emoji_information)
-
-
-def setup(bot: Bot) -> None:
- """Load the Emojis cog."""
- bot.add_cog(Emojis())
diff --git a/bot/exts/evergreen/githubinfo.py b/bot/exts/evergreen/githubinfo.py
deleted file mode 100644
index bbc9061a..00000000
--- a/bot/exts/evergreen/githubinfo.py
+++ /dev/null
@@ -1,178 +0,0 @@
-import logging
-import random
-from datetime import datetime
-from urllib.parse import quote, quote_plus
-
-import discord
-from discord.ext import commands
-
-from bot.bot import Bot
-from bot.constants import Colours, NEGATIVE_REPLIES
-from bot.exts.utils.extensions import invoke_help_command
-
-log = logging.getLogger(__name__)
-
-GITHUB_API_URL = "https://api.github.com"
-
-
-class GithubInfo(commands.Cog):
- """Fetches info from GitHub."""
-
- def __init__(self, bot: Bot):
- self.bot = bot
-
- async def fetch_data(self, url: str) -> dict:
- """Retrieve data as a dictionary."""
- async with self.bot.http_session.get(url) as r:
- return await r.json()
-
- @commands.group(name="github", aliases=("gh", "git"))
- @commands.cooldown(1, 10, commands.BucketType.user)
- async def github_group(self, ctx: commands.Context) -> None:
- """Commands for finding information related to GitHub."""
- if ctx.invoked_subcommand is None:
- await invoke_help_command(ctx)
-
- @github_group.command(name="user", aliases=("userinfo",))
- async def github_user_info(self, ctx: commands.Context, username: str) -> None:
- """Fetches a user's GitHub information."""
- async with ctx.typing():
- user_data = await self.fetch_data(f"{GITHUB_API_URL}/users/{quote_plus(username)}")
-
- # User_data will not have a message key if the user exists
- if "message" in user_data:
- embed = discord.Embed(
- title=random.choice(NEGATIVE_REPLIES),
- description=f"The profile for `{username}` was not found.",
- colour=Colours.soft_red
- )
-
- await ctx.send(embed=embed)
- return
-
- org_data = await self.fetch_data(user_data["organizations_url"])
- orgs = [f"[{org['login']}](https://github.com/{org['login']})" for org in org_data]
- orgs_to_add = " | ".join(orgs)
-
- gists = user_data["public_gists"]
-
- # Forming blog link
- if user_data["blog"].startswith("http"): # Blog link is complete
- blog = user_data["blog"]
- elif user_data["blog"]: # Blog exists but the link is not complete
- blog = f"https://{user_data['blog']}"
- else:
- blog = "No website link available"
-
- embed = discord.Embed(
- title=f"`{user_data['login']}`'s GitHub profile info",
- description=f"```\n{user_data['bio']}\n```\n" if user_data["bio"] else "",
- colour=discord.Colour.blurple(),
- url=user_data["html_url"],
- timestamp=datetime.strptime(user_data["created_at"], "%Y-%m-%dT%H:%M:%SZ")
- )
- embed.set_thumbnail(url=user_data["avatar_url"])
- embed.set_footer(text="Account created at")
-
- if user_data["type"] == "User":
-
- embed.add_field(
- name="Followers",
- value=f"[{user_data['followers']}]({user_data['html_url']}?tab=followers)"
- )
- embed.add_field(
- name="Following",
- value=f"[{user_data['following']}]({user_data['html_url']}?tab=following)"
- )
-
- embed.add_field(
- name="Public repos",
- value=f"[{user_data['public_repos']}]({user_data['html_url']}?tab=repositories)"
- )
-
- if user_data["type"] == "User":
- embed.add_field(
- name="Gists",
- value=f"[{gists}](https://gist.github.com/{quote_plus(username, safe='')})"
- )
-
- embed.add_field(
- name=f"Organization{'s' if len(orgs)!=1 else ''}",
- value=orgs_to_add if orgs else "No organizations."
- )
- embed.add_field(name="Website", value=blog)
-
- await ctx.send(embed=embed)
-
- @github_group.command(name='repository', aliases=('repo',))
- async def github_repo_info(self, ctx: commands.Context, *repo: str) -> None:
- """
- Fetches a repositories' GitHub information.
-
- The repository should look like `user/reponame` or `user reponame`.
- """
- repo = "/".join(repo)
- if repo.count("/") != 1:
- embed = discord.Embed(
- title=random.choice(NEGATIVE_REPLIES),
- description="The repository should look like `user/reponame` or `user reponame`.",
- colour=Colours.soft_red
- )
-
- await ctx.send(embed=embed)
- return
-
- async with ctx.typing():
- repo_data = await self.fetch_data(f"{GITHUB_API_URL}/repos/{quote(repo)}")
-
- # There won't be a message key if this repo exists
- if "message" in repo_data:
- embed = discord.Embed(
- title=random.choice(NEGATIVE_REPLIES),
- description="The requested repository was not found.",
- colour=Colours.soft_red
- )
-
- await ctx.send(embed=embed)
- return
-
- embed = discord.Embed(
- title=repo_data["name"],
- description=repo_data["description"],
- colour=discord.Colour.blurple(),
- url=repo_data["html_url"]
- )
-
- # If it's a fork, then it will have a parent key
- try:
- parent = repo_data["parent"]
- embed.description += f"\n\nForked from [{parent['full_name']}]({parent['html_url']})"
- except KeyError:
- log.debug("Repository is not a fork.")
-
- repo_owner = repo_data["owner"]
-
- embed.set_author(
- name=repo_owner["login"],
- url=repo_owner["html_url"],
- icon_url=repo_owner["avatar_url"]
- )
-
- repo_created_at = datetime.strptime(repo_data["created_at"], "%Y-%m-%dT%H:%M:%SZ").strftime("%d/%m/%Y")
- last_pushed = datetime.strptime(repo_data["pushed_at"], "%Y-%m-%dT%H:%M:%SZ").strftime("%d/%m/%Y at %H:%M")
-
- embed.set_footer(
- text=(
- f"{repo_data['forks_count']} ⑂ "
- f"• {repo_data['stargazers_count']} ⭐ "
- f"• Created At {repo_created_at} "
- f"• Last Commit {last_pushed}"
- )
- )
-
- await ctx.send(embed=embed)
-
-
-def setup(bot: Bot) -> None:
- """Load the GithubInfo cog."""
- bot.add_cog(GithubInfo(bot))
diff --git a/bot/exts/evergreen/issues.py b/bot/exts/evergreen/issues.py
deleted file mode 100644
index 8a7ebed0..00000000
--- a/bot/exts/evergreen/issues.py
+++ /dev/null
@@ -1,275 +0,0 @@
-import logging
-import random
-import re
-from dataclasses import dataclass
-from typing import Optional, Union
-
-import discord
-from discord.ext import commands
-
-from bot.bot import Bot
-from bot.constants import (
- Categories,
- Channels,
- Colours,
- ERROR_REPLIES,
- Emojis,
- NEGATIVE_REPLIES,
- Tokens,
- WHITELISTED_CHANNELS
-)
-from bot.utils.decorators import whitelist_override
-from bot.utils.extensions import invoke_help_command
-
-log = logging.getLogger(__name__)
-
-BAD_RESPONSE = {
- 404: "Issue/pull request not located! Please enter a valid number!",
- 403: "Rate limit has been hit! Please try again later!"
-}
-REQUEST_HEADERS = {
- "Accept": "application/vnd.github.v3+json"
-}
-
-REPOSITORY_ENDPOINT = "https://api.github.com/orgs/{org}/repos?per_page=100&type=public"
-ISSUE_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/issues/{number}"
-PR_ENDPOINT = "https://api.github.com/repos/{user}/{repository}/pulls/{number}"
-
-if GITHUB_TOKEN := Tokens.github:
- REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}"
-
-WHITELISTED_CATEGORIES = (
- Categories.development, Categories.devprojects, Categories.media, Categories.staff
-)
-
-CODE_BLOCK_RE = re.compile(
- r"^`([^`\n]+)`" # Inline codeblock
- r"|```(.+?)```", # Multiline codeblock
- re.DOTALL | re.MULTILINE
-)
-
-# Maximum number of issues in one message
-MAXIMUM_ISSUES = 5
-
-# Regex used when looking for automatic linking in messages
-# regex101 of current regex https://regex101.com/r/V2ji8M/6
-AUTOMATIC_REGEX = re.compile(
- r"((?P<org>[a-zA-Z0-9][a-zA-Z0-9\-]{1,39})\/)?(?P<repo>[\w\-\.]{1,100})#(?P<number>[0-9]+)"
-)
-
-
-@dataclass
-class FoundIssue:
- """Dataclass representing an issue found by the regex."""
-
- organisation: Optional[str]
- repository: str
- number: str
-
- def __hash__(self) -> int:
- return hash((self.organisation, self.repository, self.number))
-
-
-@dataclass
-class FetchError:
- """Dataclass representing an error while fetching an issue."""
-
- return_code: int
- message: str
-
-
-@dataclass
-class IssueState:
- """Dataclass representing the state of an issue."""
-
- repository: str
- number: int
- url: str
- title: str
- emoji: str
-
-
-class Issues(commands.Cog):
- """Cog that allows users to retrieve issues from GitHub."""
-
- def __init__(self, bot: Bot):
- self.bot = bot
- self.repos = []
-
- @staticmethod
- def remove_codeblocks(message: str) -> str:
- """Remove any codeblock in a message."""
- return re.sub(CODE_BLOCK_RE, "", message)
-
- async def fetch_issues(
- self,
- number: int,
- repository: str,
- user: str
- ) -> Union[IssueState, FetchError]:
- """
- Retrieve an issue from a GitHub repository.
-
- Returns IssueState on success, FetchError on failure.
- """
- url = ISSUE_ENDPOINT.format(user=user, repository=repository, number=number)
- pulls_url = PR_ENDPOINT.format(user=user, repository=repository, number=number)
- log.trace(f"Querying GH issues API: {url}")
-
- async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r:
- json_data = await r.json()
-
- if r.status == 403:
- if r.headers.get("X-RateLimit-Remaining") == "0":
- log.info(f"Ratelimit reached while fetching {url}")
- return FetchError(403, "Ratelimit reached, please retry in a few minutes.")
- return FetchError(403, "Cannot access issue.")
- elif r.status in (404, 410):
- return FetchError(r.status, "Issue not found.")
- elif r.status != 200:
- return FetchError(r.status, "Error while fetching issue.")
-
- # The initial API request is made to the issues API endpoint, which will return information
- # if the issue or PR is present. However, the scope of information returned for PRs differs
- # from issues: if the 'issues' key is present in the response then we can pull the data we
- # need from the initial API call.
- if "issues" in json_data["html_url"]:
- if json_data.get("state") == "open":
- emoji = Emojis.issue_open
- else:
- emoji = Emojis.issue_closed
-
- # If the 'issues' key is not contained in the API response and there is no error code, then
- # we know that a PR has been requested and a call to the pulls API endpoint is necessary
- # to get the desired information for the PR.
- else:
- log.trace(f"PR provided, querying GH pulls API for additional information: {pulls_url}")
- async with self.bot.http_session.get(pulls_url) as p:
- pull_data = await p.json()
- if pull_data["draft"]:
- emoji = Emojis.pull_request_draft
- elif pull_data["state"] == "open":
- emoji = Emojis.pull_request_open
- # When 'merged_at' is not None, this means that the state of the PR is merged
- elif pull_data["merged_at"] is not None:
- emoji = Emojis.pull_request_merged
- else:
- emoji = Emojis.pull_request_closed
-
- issue_url = json_data.get("html_url")
-
- return IssueState(repository, number, issue_url, json_data.get("title", ""), emoji)
-
- @staticmethod
- def format_embed(
- results: list[Union[IssueState, FetchError]],
- user: str,
- repository: Optional[str] = None
- ) -> discord.Embed:
- """Take a list of IssueState or FetchError and format a Discord embed for them."""
- description_list = []
-
- for result in results:
- if isinstance(result, IssueState):
- description_list.append(f"{result.emoji} [{result.title}]({result.url})")
- elif isinstance(result, FetchError):
- description_list.append(f":x: [{result.return_code}] {result.message}")
-
- resp = discord.Embed(
- colour=Colours.bright_green,
- description="\n".join(description_list)
- )
-
- embed_url = f"https://github.com/{user}/{repository}" if repository else f"https://github.com/{user}"
- resp.set_author(name="GitHub", url=embed_url)
- return resp
-
- @whitelist_override(channels=WHITELISTED_CHANNELS, categories=WHITELISTED_CATEGORIES)
- @commands.command(aliases=("pr",))
- async def issue(
- self,
- ctx: commands.Context,
- numbers: commands.Greedy[int],
- repository: str = "sir-lancebot",
- user: str = "python-discord"
- ) -> None:
- """Command to retrieve issue(s) from a GitHub repository."""
- # Remove duplicates
- numbers = set(numbers)
-
- if len(numbers) > MAXIMUM_ISSUES:
- embed = discord.Embed(
- title=random.choice(ERROR_REPLIES),
- color=Colours.soft_red,
- description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})"
- )
- await ctx.send(embed=embed)
- await invoke_help_command(ctx)
-
- results = [await self.fetch_issues(number, repository, user) for number in numbers]
- await ctx.send(embed=self.format_embed(results, user, repository))
-
- @commands.Cog.listener()
- async def on_message(self, message: discord.Message) -> None:
- """
- Automatic issue linking.
-
- Listener to retrieve issue(s) from a GitHub repository using automatic linking if matching <org>/<repo>#<issue>.
- """
- # Ignore bots
- if message.author.bot:
- return
-
- issues = [
- FoundIssue(*match.group("org", "repo", "number"))
- for match in AUTOMATIC_REGEX.finditer(self.remove_codeblocks(message.content))
- ]
- links = []
-
- if issues:
- # Block this from working in DMs
- if not message.guild:
- await message.channel.send(
- embed=discord.Embed(
- title=random.choice(NEGATIVE_REPLIES),
- description=(
- "You can't retrieve issues from DMs. "
- f"Try again in <#{Channels.community_bot_commands}>"
- ),
- colour=Colours.soft_red
- )
- )
- return
-
- log.trace(f"Found {issues = }")
- # Remove duplicates
- issues = set(issues)
-
- if len(issues) > MAXIMUM_ISSUES:
- embed = discord.Embed(
- title=random.choice(ERROR_REPLIES),
- color=Colours.soft_red,
- description=f"Too many issues/PRs! (maximum of {MAXIMUM_ISSUES})"
- )
- await message.channel.send(embed=embed, delete_after=5)
- return
-
- for repo_issue in issues:
- result = await self.fetch_issues(
- int(repo_issue.number),
- repo_issue.repository,
- repo_issue.organisation or "python-discord"
- )
- if isinstance(result, IssueState):
- links.append(result)
-
- if not links:
- return
-
- resp = self.format_embed(links, "python-discord")
- await message.channel.send(embed=resp)
-
-
-def setup(bot: Bot) -> None:
- """Load the Issues cog."""
- bot.add_cog(Issues(bot))
diff --git a/bot/exts/evergreen/latex.py b/bot/exts/evergreen/latex.py
deleted file mode 100644
index 36c7e0ab..00000000
--- a/bot/exts/evergreen/latex.py
+++ /dev/null
@@ -1,101 +0,0 @@
-import asyncio
-import hashlib
-import pathlib
-import re
-from concurrent.futures import ThreadPoolExecutor
-from io import BytesIO
-
-import discord
-import matplotlib.pyplot as plt
-from discord.ext import commands
-
-from bot.bot import Bot
-
-# configure fonts and colors for matplotlib
-plt.rcParams.update(
- {
- "font.size": 16,
- "mathtext.fontset": "cm", # Computer Modern font set
- "mathtext.rm": "serif",
- "figure.facecolor": "36393F", # matches Discord's dark mode background color
- "text.color": "white",
- }
-)
-
-FORMATTED_CODE_REGEX = re.compile(
- r"(?P<delim>(?P<block>```)|``?)" # code delimiter: 1-3 backticks; (?P=block) only matches if it's a block
- r"(?(block)(?:(?P<lang>[a-z]+)\n)?)" # if we're in a block, match optional language (only letters plus newline)
- r"(?:[ \t]*\n)*" # any blank (empty or tabs/spaces only) lines before the code
- r"(?P<code>.*?)" # extract all code inside the markup
- r"\s*" # any more whitespace before the end of the code markup
- r"(?P=delim)", # match the exact same delimiter from the start again
- re.DOTALL | re.IGNORECASE, # "." also matches newlines, case insensitive
-)
-
-CACHE_DIRECTORY = pathlib.Path("_latex_cache")
-CACHE_DIRECTORY.mkdir(exist_ok=True)
-
-
-class Latex(commands.Cog):
- """Renders latex."""
-
- @staticmethod
- def _render(text: str, filepath: pathlib.Path) -> BytesIO:
- """
- Return the rendered image if latex compiles without errors, otherwise raise a BadArgument Exception.
-
- Saves rendered image to cache.
- """
- fig = plt.figure()
- rendered_image = BytesIO()
- fig.text(0, 1, text, horizontalalignment="left", verticalalignment="top")
-
- try:
- plt.savefig(rendered_image, bbox_inches="tight", dpi=600)
- except ValueError as e:
- raise commands.BadArgument(str(e))
-
- rendered_image.seek(0)
-
- with open(filepath, "wb") as f:
- f.write(rendered_image.getbuffer())
-
- return rendered_image
-
- @staticmethod
- def _prepare_input(text: str) -> str:
- text = text.replace(r"\\", "$\n$") # matplotlib uses \n for newlines, not \\
-
- if match := FORMATTED_CODE_REGEX.match(text):
- return match.group("code")
- else:
- return text
-
- @commands.command()
- @commands.max_concurrency(1, commands.BucketType.guild, wait=True)
- async def latex(self, ctx: commands.Context, *, text: str) -> None:
- """Renders the text in latex and sends the image."""
- text = self._prepare_input(text)
- query_hash = hashlib.md5(text.encode()).hexdigest()
- image_path = CACHE_DIRECTORY.joinpath(f"{query_hash}.png")
- async with ctx.typing():
- if image_path.exists():
- await ctx.send(file=discord.File(image_path))
- return
-
- with ThreadPoolExecutor() as pool:
- image = await asyncio.get_running_loop().run_in_executor(
- pool, self._render, text, image_path
- )
-
- await ctx.send(file=discord.File(image, "latex.png"))
-
-
-def setup(bot: Bot) -> None:
- """Load the Latex Cog."""
- # As we have resource issues on this cog,
- # we have it currently disabled while we fix it.
- import logging
- logging.info("Latex cog is currently disabled. It won't be loaded.")
- return
- bot.add_cog(Latex())
diff --git a/bot/exts/evergreen/pythonfacts.py b/bot/exts/evergreen/pythonfacts.py
deleted file mode 100644
index 80a8da5d..00000000
--- a/bot/exts/evergreen/pythonfacts.py
+++ /dev/null
@@ -1,36 +0,0 @@
-import itertools
-
-import discord
-from discord.ext import commands
-
-from bot.bot import Bot
-from bot.constants import Colours
-
-with open("bot/resources/evergreen/python_facts.txt") as file:
- FACTS = itertools.cycle(list(file))
-
-COLORS = itertools.cycle([Colours.python_blue, Colours.python_yellow])
-PYFACTS_DISCUSSION = "https://github.com/python-discord/meta/discussions/93"
-
-
-class PythonFacts(commands.Cog):
- """Sends a random fun fact about Python."""
-
- @commands.command(name="pythonfact", aliases=("pyfact",))
- async def get_python_fact(self, ctx: commands.Context) -> None:
- """Sends a Random fun fact about Python."""
- embed = discord.Embed(
- title="Python Facts",
- description=next(FACTS),
- colour=next(COLORS)
- )
- embed.add_field(
- name="Suggestions",
- value=f"Suggest more facts [here!]({PYFACTS_DISCUSSION})"
- )
- await ctx.send(embed=embed)
-
-
-def setup(bot: Bot) -> None:
- """Load the PythonFacts Cog."""
- bot.add_cog(PythonFacts())
diff --git a/bot/exts/evergreen/realpython.py b/bot/exts/evergreen/realpython.py
deleted file mode 100644
index ef8b2638..00000000
--- a/bot/exts/evergreen/realpython.py
+++ /dev/null
@@ -1,81 +0,0 @@
-import logging
-from html import unescape
-from urllib.parse import quote_plus
-
-from discord import Embed
-from discord.ext import commands
-
-from bot.bot import Bot
-from bot.constants import Colours
-
-logger = logging.getLogger(__name__)
-
-
-API_ROOT = "https://realpython.com/search/api/v1/"
-ARTICLE_URL = "https://realpython.com{article_url}"
-SEARCH_URL = "https://realpython.com/search?q={user_search}"
-
-
-ERROR_EMBED = Embed(
- title="Error while searching Real Python",
- description="There was an error while trying to reach Real Python. Please try again shortly.",
- color=Colours.soft_red,
-)
-
-
-class RealPython(commands.Cog):
- """User initiated command to search for a Real Python article."""
-
- def __init__(self, bot: Bot):
- self.bot = bot
-
- @commands.command(aliases=["rp"])
- @commands.cooldown(1, 10, commands.cooldowns.BucketType.user)
- async def realpython(self, ctx: commands.Context, *, user_search: str) -> None:
- """Send 5 articles that match the user's search terms."""
- params = {"q": user_search, "limit": 5, "kind": "article"}
- async with self.bot.http_session.get(url=API_ROOT, params=params) as response:
- if response.status != 200:
- logger.error(
- f"Unexpected status code {response.status} from Real Python"
- )
- await ctx.send(embed=ERROR_EMBED)
- return
-
- data = await response.json()
-
- articles = data["results"]
-
- if len(articles) == 0:
- no_articles = Embed(
- title=f"No articles found for '{user_search}'", color=Colours.soft_red
- )
- await ctx.send(embed=no_articles)
- return
-
- if len(articles) == 1:
- article_description = "Here is the result:"
- else:
- article_description = f"Here are the top {len(articles)} results:"
-
- article_embed = Embed(
- title="Search results - Real Python",
- url=SEARCH_URL.format(user_search=quote_plus(user_search)),
- description=article_description,
- color=Colours.orange,
- )
-
- for article in articles:
- article_embed.add_field(
- name=unescape(article["title"]),
- value=ARTICLE_URL.format(article_url=article["url"]),
- inline=False,
- )
- article_embed.set_footer(text="Click the links to go to the articles.")
-
- await ctx.send(embed=article_embed)
-
-
-def setup(bot: Bot) -> None:
- """Load the Real Python Cog."""
- bot.add_cog(RealPython(bot))
diff --git a/bot/exts/evergreen/reddit.py b/bot/exts/evergreen/reddit.py
deleted file mode 100644
index e6cb5337..00000000
--- a/bot/exts/evergreen/reddit.py
+++ /dev/null
@@ -1,368 +0,0 @@
-import asyncio
-import logging
-import random
-import textwrap
-from collections import namedtuple
-from datetime import datetime, timedelta
-from typing import Union
-
-from aiohttp import BasicAuth, ClientError
-from discord import Colour, Embed, TextChannel
-from discord.ext.commands import Cog, Context, group, has_any_role
-from discord.ext.tasks import loop
-from discord.utils import escape_markdown, sleep_until
-
-from bot.bot import Bot
-from bot.constants import Channels, ERROR_REPLIES, Emojis, Reddit as RedditConfig, STAFF_ROLES
-from bot.utils.converters import Subreddit
-from bot.utils.extensions import invoke_help_command
-from bot.utils.messages import sub_clyde
-from bot.utils.pagination import ImagePaginator, LinePaginator
-
-log = logging.getLogger(__name__)
-
-AccessToken = namedtuple("AccessToken", ["token", "expires_at"])
-
-
-class Reddit(Cog):
- """Track subreddit posts and show detailed statistics about them."""
-
- HEADERS = {"User-Agent": "python3:python-discord/bot:1.0.0 (by /u/PythonDiscord)"}
- URL = "https://www.reddit.com"
- OAUTH_URL = "https://oauth.reddit.com"
- MAX_RETRIES = 3
-
- def __init__(self, bot: Bot):
- self.bot = bot
-
- self.webhook = None
- self.access_token = None
- self.client_auth = BasicAuth(RedditConfig.client_id, RedditConfig.secret)
-
- bot.loop.create_task(self.init_reddit_ready())
- self.auto_poster_loop.start()
-
- def cog_unload(self) -> None:
- """Stop the loop task and revoke the access token when the cog is unloaded."""
- self.auto_poster_loop.cancel()
- if self.access_token and self.access_token.expires_at > datetime.utcnow():
- asyncio.create_task(self.revoke_access_token())
-
- async def init_reddit_ready(self) -> None:
- """Sets the reddit webhook when the cog is loaded."""
- await self.bot.wait_until_guild_available()
- if not self.webhook:
- self.webhook = await self.bot.fetch_webhook(RedditConfig.webhook)
-
- @property
- def channel(self) -> TextChannel:
- """Get the #reddit channel object from the bot's cache."""
- return self.bot.get_channel(Channels.reddit)
-
- def build_pagination_pages(self, posts: list[dict], paginate: bool) -> Union[list[tuple], str]:
- """Build embed pages required for Paginator."""
- pages = []
- first_page = ""
- for post in posts:
- post_page = ""
- image_url = ""
-
- data = post["data"]
-
- title = textwrap.shorten(data["title"], width=50, placeholder="...")
-
- # Normal brackets interfere with Markdown.
- title = escape_markdown(title).replace("[", "⦋").replace("]", "⦌")
- link = self.URL + data["permalink"]
-
- first_page += f"**[{title.replace('*', '')}]({link})**\n"
-
- text = data["selftext"]
- if text:
- text = escape_markdown(text).replace("[", "⦋").replace("]", "⦌")
- first_page += textwrap.shorten(text, width=100, placeholder="...") + "\n"
-
- ups = data["ups"]
- comments = data["num_comments"]
- author = data["author"]
-
- content_type = Emojis.reddit_post_text
- if data["is_video"] or {"youtube", "youtu.be"}.issubset(set(data["url"].split("."))):
- # This means the content type in the post is a video.
- content_type = f"{Emojis.reddit_post_video}"
-
- elif data["url"].endswith(("jpg", "png", "gif")):
- # This means the content type in the post is an image.
- content_type = f"{Emojis.reddit_post_photo}"
- image_url = data["url"]
-
- first_page += (
- f"{content_type}\u2003{Emojis.reddit_upvote}{ups}\u2003{Emojis.reddit_comments}"
- f"\u2002{comments}\u2003{Emojis.reddit_users}{author}\n\n"
- )
-
- if paginate:
- post_page += f"**[{title}]({link})**\n\n"
- if text:
- post_page += textwrap.shorten(text, width=252, placeholder="...") + "\n\n"
- post_page += (
- f"{content_type}\u2003{Emojis.reddit_upvote}{ups}\u2003{Emojis.reddit_comments}\u2002"
- f"{comments}\u2003{Emojis.reddit_users}{author}"
- )
-
- pages.append((post_page, image_url))
-
- if not paginate:
- # Return the first summery page if pagination is not required
- return first_page
-
- pages.insert(0, (first_page, "")) # Using image paginator, hence settings image url to empty string
- return pages
-
- async def get_access_token(self) -> None:
- """
- Get a Reddit API OAuth2 access token and assign it to self.access_token.
-
- A token is valid for 1 hour. There will be MAX_RETRIES to get a token, after which the cog
- will be unloaded and a ClientError raised if retrieval was still unsuccessful.
- """
- for i in range(1, self.MAX_RETRIES + 1):
- response = await self.bot.http_session.post(
- url=f"{self.URL}/api/v1/access_token",
- headers=self.HEADERS,
- auth=self.client_auth,
- data={
- "grant_type": "client_credentials",
- "duration": "temporary"
- }
- )
-
- if response.status == 200 and response.content_type == "application/json":
- content = await response.json()
- expiration = int(content["expires_in"]) - 60 # Subtract 1 minute for leeway.
- self.access_token = AccessToken(
- token=content["access_token"],
- expires_at=datetime.utcnow() + timedelta(seconds=expiration)
- )
-
- log.debug(f"New token acquired; expires on UTC {self.access_token.expires_at}")
- return
- else:
- log.debug(
- f"Failed to get an access token: "
- f"status {response.status} & content type {response.content_type}; "
- f"retrying ({i}/{self.MAX_RETRIES})"
- )
-
- await asyncio.sleep(3)
-
- self.bot.remove_cog(self.qualified_name)
- raise ClientError("Authentication with the Reddit API failed. Unloading the cog.")
-
- async def revoke_access_token(self) -> None:
- """
- Revoke the OAuth2 access token for the Reddit API.
-
- For security reasons, it's good practice to revoke the token when it's no longer being used.
- """
- response = await self.bot.http_session.post(
- url=f"{self.URL}/api/v1/revoke_token",
- headers=self.HEADERS,
- auth=self.client_auth,
- data={
- "token": self.access_token.token,
- "token_type_hint": "access_token"
- }
- )
-
- if response.status in [200, 204] and response.content_type == "application/json":
- self.access_token = None
- else:
- log.warning(f"Unable to revoke access token: status {response.status}.")
-
- async def fetch_posts(self, route: str, *, amount: int = 25, params: dict = None) -> list[dict]:
- """A helper method to fetch a certain amount of Reddit posts at a given route."""
- # Reddit's JSON responses only provide 25 posts at most.
- if not 25 >= amount > 0:
- raise ValueError("Invalid amount of subreddit posts requested.")
-
- # Renew the token if necessary.
- if not self.access_token or self.access_token.expires_at < datetime.utcnow():
- await self.get_access_token()
-
- url = f"{self.OAUTH_URL}/{route}"
- for _ in range(self.MAX_RETRIES):
- response = await self.bot.http_session.get(
- url=url,
- headers={**self.HEADERS, "Authorization": f"bearer {self.access_token.token}"},
- params=params
- )
- if response.status == 200 and response.content_type == 'application/json':
- # Got appropriate response - process and return.
- content = await response.json()
- posts = content["data"]["children"]
-
- filtered_posts = [post for post in posts if not post["data"]["over_18"]]
-
- return filtered_posts[:amount]
-
- await asyncio.sleep(3)
-
- log.debug(f"Invalid response from: {url} - status code {response.status}, mimetype {response.content_type}")
- return list() # Failed to get appropriate response within allowed number of retries.
-
- async def get_top_posts(
- self, subreddit: Subreddit, time: str = "all", amount: int = 5, paginate: bool = False
- ) -> Union[Embed, list[tuple]]:
- """
- Get the top amount of posts for a given subreddit within a specified timeframe.
-
- A time of "all" will get posts from all time, "day" will get top daily posts and "week" will get the top
- weekly posts.
-
- The amount should be between 0 and 25 as Reddit's JSON requests only provide 25 posts at most.
- """
- embed = Embed()
-
- posts = await self.fetch_posts(
- route=f"{subreddit}/top",
- amount=amount,
- params={"t": time}
- )
- if not posts:
- embed.title = random.choice(ERROR_REPLIES)
- embed.colour = Colour.red()
- embed.description = (
- "Sorry! We couldn't find any SFW posts from that subreddit. "
- "If this problem persists, please let us know."
- )
-
- return embed
-
- if paginate:
- return self.build_pagination_pages(posts, paginate=True)
-
- # Use only starting summary page for #reddit channel posts.
- embed.description = self.build_pagination_pages(posts, paginate=False)
- embed.colour = Colour.blurple()
- return embed
-
- @loop()
- async def auto_poster_loop(self) -> None:
- """Post the top 5 posts daily, and the top 5 posts weekly."""
- # once d.py get support for `time` parameter in loop decorator,
- # this can be removed and the loop can use the `time=datetime.time.min` parameter
- now = datetime.utcnow()
- tomorrow = now + timedelta(days=1)
- midnight_tomorrow = tomorrow.replace(hour=0, minute=0, second=0)
-
- await sleep_until(midnight_tomorrow)
-
- await self.bot.wait_until_guild_available()
- if not self.webhook:
- await self.bot.fetch_webhook(RedditConfig.webhook)
-
- if datetime.utcnow().weekday() == 0:
- await self.top_weekly_posts()
- # if it's a monday send the top weekly posts
-
- for subreddit in RedditConfig.subreddits:
- top_posts = await self.get_top_posts(subreddit=subreddit, time="day")
- username = sub_clyde(f"{subreddit} Top Daily Posts")
- message = await self.webhook.send(username=username, embed=top_posts, wait=True)
-
- if message.channel.is_news():
- await message.publish()
-
- async def top_weekly_posts(self) -> None:
- """Post a summary of the top posts."""
- for subreddit in RedditConfig.subreddits:
- # Send and pin the new weekly posts.
- top_posts = await self.get_top_posts(subreddit=subreddit, time="week")
- username = sub_clyde(f"{subreddit} Top Weekly Posts")
- message = await self.webhook.send(wait=True, username=username, embed=top_posts)
-
- if subreddit.lower() == "r/python":
- if not self.channel:
- log.warning("Failed to get #reddit channel to remove pins in the weekly loop.")
- return
-
- # Remove the oldest pins so that only 12 remain at most.
- pins = await self.channel.pins()
-
- while len(pins) >= 12:
- await pins[-1].unpin()
- del pins[-1]
-
- await message.pin()
-
- if message.channel.is_news():
- await message.publish()
-
- @group(name="reddit", invoke_without_command=True)
- async def reddit_group(self, ctx: Context) -> None:
- """View the top posts from various subreddits."""
- await invoke_help_command(ctx)
-
- @reddit_group.command(name="top")
- async def top_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None:
- """Send the top posts of all time from a given subreddit."""
- async with ctx.typing():
- pages = await self.get_top_posts(subreddit=subreddit, time="all", paginate=True)
-
- await ctx.send(f"Here are the top {subreddit} posts of all time!")
- embed = Embed(
- color=Colour.blurple()
- )
-
- await ImagePaginator.paginate(pages, ctx, embed)
-
- @reddit_group.command(name="daily")
- async def daily_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None:
- """Send the top posts of today from a given subreddit."""
- async with ctx.typing():
- pages = await self.get_top_posts(subreddit=subreddit, time="day", paginate=True)
-
- await ctx.send(f"Here are today's top {subreddit} posts!")
- embed = Embed(
- color=Colour.blurple()
- )
-
- await ImagePaginator.paginate(pages, ctx, embed)
-
- @reddit_group.command(name="weekly")
- async def weekly_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None:
- """Send the top posts of this week from a given subreddit."""
- async with ctx.typing():
- pages = await self.get_top_posts(subreddit=subreddit, time="week", paginate=True)
-
- await ctx.send(f"Here are this week's top {subreddit} posts!")
- embed = Embed(
- color=Colour.blurple()
- )
-
- await ImagePaginator.paginate(pages, ctx, embed)
-
- @has_any_role(*STAFF_ROLES)
- @reddit_group.command(name="subreddits", aliases=("subs",))
- async def subreddits_command(self, ctx: Context) -> None:
- """Send a paginated embed of all the subreddits we're relaying."""
- embed = Embed()
- embed.title = "Relayed subreddits."
- embed.colour = Colour.blurple()
-
- await LinePaginator.paginate(
- RedditConfig.subreddits,
- ctx, embed,
- footer_text="Use the reddit commands along with these to view their posts.",
- empty=False,
- max_lines=15
- )
-
-
-def setup(bot: Bot) -> None:
- """Load the Reddit cog."""
- if not RedditConfig.secret or not RedditConfig.client_id:
- log.error("Credentials not provided, cog not loaded.")
- return
- bot.add_cog(Reddit(bot))
diff --git a/bot/exts/evergreen/stackoverflow.py b/bot/exts/evergreen/stackoverflow.py
deleted file mode 100644
index 64455e33..00000000
--- a/bot/exts/evergreen/stackoverflow.py
+++ /dev/null
@@ -1,88 +0,0 @@
-import logging
-from html import unescape
-from urllib.parse import quote_plus
-
-from discord import Embed, HTTPException
-from discord.ext import commands
-
-from bot.bot import Bot
-from bot.constants import Colours, Emojis
-
-logger = logging.getLogger(__name__)
-
-BASE_URL = "https://api.stackexchange.com/2.2/search/advanced"
-SO_PARAMS = {
- "order": "desc",
- "sort": "activity",
- "site": "stackoverflow"
-}
-SEARCH_URL = "https://stackoverflow.com/search?q={query}"
-ERR_EMBED = Embed(
- title="Error in fetching results from Stackoverflow",
- description=(
- "Sorry, there was en error while trying to fetch data from the Stackoverflow website. Please try again in some "
- "time. If this issue persists, please contact the staff or send a message in #dev-contrib."
- ),
- color=Colours.soft_red
-)
-
-
-class Stackoverflow(commands.Cog):
- """Contains command to interact with stackoverflow from discord."""
-
- def __init__(self, bot: Bot):
- self.bot = bot
-
- @commands.command(aliases=["so"])
- @commands.cooldown(1, 15, commands.cooldowns.BucketType.user)
- async def stackoverflow(self, ctx: commands.Context, *, search_query: str) -> None:
- """Sends the top 5 results of a search query from stackoverflow."""
- params = SO_PARAMS | {"q": search_query}
- async with self.bot.http_session.get(url=BASE_URL, params=params) as response:
- if response.status == 200:
- data = await response.json()
- else:
- logger.error(f'Status code is not 200, it is {response.status}')
- await ctx.send(embed=ERR_EMBED)
- return
- if not data['items']:
- no_search_result = Embed(
- title=f"No search results found for {search_query}",
- color=Colours.soft_red
- )
- await ctx.send(embed=no_search_result)
- return
-
- top5 = data["items"][:5]
- encoded_search_query = quote_plus(search_query)
- embed = Embed(
- title="Search results - Stackoverflow",
- url=SEARCH_URL.format(query=encoded_search_query),
- description=f"Here are the top {len(top5)} results:",
- color=Colours.orange
- )
- for item in top5:
- embed.add_field(
- name=unescape(item['title']),
- value=(
- f"[{Emojis.reddit_upvote} {item['score']} "
- f"{Emojis.stackoverflow_views} {item['view_count']} "
- f"{Emojis.reddit_comments} {item['answer_count']} "
- f"{Emojis.stackoverflow_tag} {', '.join(item['tags'][:3])}]"
- f"({item['link']})"
- ),
- inline=False)
- embed.set_footer(text="View the original link for more results.")
- try:
- await ctx.send(embed=embed)
- except HTTPException:
- search_query_too_long = Embed(
- title="Your search query is too long, please try shortening your search query",
- color=Colours.soft_red
- )
- await ctx.send(embed=search_query_too_long)
-
-
-def setup(bot: Bot) -> None:
- """Load the Stackoverflow Cog."""
- bot.add_cog(Stackoverflow(bot))
diff --git a/bot/exts/evergreen/timed.py b/bot/exts/evergreen/timed.py
deleted file mode 100644
index 2ea6b419..00000000
--- a/bot/exts/evergreen/timed.py
+++ /dev/null
@@ -1,48 +0,0 @@
-from copy import copy
-from time import perf_counter
-
-from discord import Message
-from discord.ext import commands
-
-from bot.bot import Bot
-
-
-class TimedCommands(commands.Cog):
- """Time the command execution of a command."""
-
- @staticmethod
- async def create_execution_context(ctx: commands.Context, command: str) -> commands.Context:
- """Get a new execution context for a command."""
- msg: Message = copy(ctx.message)
- msg.content = f"{ctx.prefix}{command}"
-
- return await ctx.bot.get_context(msg)
-
- @commands.command(name="timed", aliases=("time", "t"))
- async def timed(self, ctx: commands.Context, *, command: str) -> None:
- """Time the command execution of a command."""
- new_ctx = await self.create_execution_context(ctx, command)
-
- ctx.subcontext = new_ctx
-
- if not ctx.subcontext.command:
- help_command = f"{ctx.prefix}help"
- error = f"The command you are trying to time doesn't exist. Use `{help_command}` for a list of commands."
-
- await ctx.send(error)
- return
-
- if new_ctx.command.qualified_name == "timed":
- await ctx.send("You are not allowed to time the execution of the `timed` command.")
- return
-
- t_start = perf_counter()
- await new_ctx.command.invoke(new_ctx)
- t_end = perf_counter()
-
- await ctx.send(f"Command execution for `{new_ctx.command}` finished in {(t_end - t_start):.4f} seconds.")
-
-
-def setup(bot: Bot) -> None:
- """Load the Timed cog."""
- bot.add_cog(TimedCommands())
diff --git a/bot/exts/evergreen/wikipedia.py b/bot/exts/evergreen/wikipedia.py
deleted file mode 100644
index eccc1f8c..00000000
--- a/bot/exts/evergreen/wikipedia.py
+++ /dev/null
@@ -1,100 +0,0 @@
-import logging
-import re
-from datetime import datetime
-from html import unescape
-
-from discord import Color, Embed, TextChannel
-from discord.ext import commands
-
-from bot.bot import Bot
-from bot.utils import LinePaginator
-from bot.utils.exceptions import APIError
-
-log = logging.getLogger(__name__)
-
-SEARCH_API = (
- "https://en.wikipedia.org/w/api.php"
-)
-WIKI_PARAMS = {
- "action": "query",
- "list": "search",
- "prop": "info",
- "inprop": "url",
- "utf8": "",
- "format": "json",
- "origin": "*",
-
-}
-WIKI_THUMBNAIL = (
- "https://upload.wikimedia.org/wikipedia/en/thumb/8/80/Wikipedia-logo-v2.svg"
- "/330px-Wikipedia-logo-v2.svg.png"
-)
-WIKI_SNIPPET_REGEX = r"(<!--.*?-->|<[^>]*>)"
-WIKI_SEARCH_RESULT = (
- "**[{name}]({url})**\n"
- "{description}\n"
-)
-
-
-class WikipediaSearch(commands.Cog):
- """Get info from wikipedia."""
-
- def __init__(self, bot: Bot):
- self.bot = bot
-
- async def wiki_request(self, channel: TextChannel, search: str) -> list[str]:
- """Search wikipedia search string and return formatted first 10 pages found."""
- params = WIKI_PARAMS | {"srlimit": 10, "srsearch": search}
- async with self.bot.http_session.get(url=SEARCH_API, params=params) as resp:
- if resp.status != 200:
- log.info(f"Unexpected response `{resp.status}` while searching wikipedia for `{search}`")
- raise APIError("Wikipedia API", resp.status)
-
- raw_data = await resp.json()
-
- if not raw_data.get("query"):
- if error := raw_data.get("errors"):
- log.error(f"There was an error while communicating with the Wikipedia API: {error}")
- raise APIError("Wikipedia API", resp.status, error)
-
- lines = []
- if raw_data["query"]["searchinfo"]["totalhits"]:
- for article in raw_data["query"]["search"]:
- line = WIKI_SEARCH_RESULT.format(
- name=article["title"],
- description=unescape(
- re.sub(
- WIKI_SNIPPET_REGEX, "", article["snippet"]
- )
- ),
- url=f"https://en.wikipedia.org/?curid={article['pageid']}"
- )
- lines.append(line)
-
- return lines
-
- @commands.cooldown(1, 10, commands.BucketType.user)
- @commands.command(name="wikipedia", aliases=("wiki",))
- async def wikipedia_search_command(self, ctx: commands.Context, *, search: str) -> None:
- """Sends paginated top 10 results of Wikipedia search.."""
- contents = await self.wiki_request(ctx.channel, search)
-
- if contents:
- embed = Embed(
- title="Wikipedia Search Results",
- colour=Color.blurple()
- )
- embed.set_thumbnail(url=WIKI_THUMBNAIL)
- embed.timestamp = datetime.utcnow()
- await LinePaginator.paginate(
- contents, ctx, embed
- )
- else:
- await ctx.send(
- "Sorry, we could not find a wikipedia article using that search term."
- )
-
-
-def setup(bot: Bot) -> None:
- """Load the WikipediaSearch cog."""
- bot.add_cog(WikipediaSearch(bot))
diff --git a/bot/exts/evergreen/wolfram.py b/bot/exts/evergreen/wolfram.py
deleted file mode 100644
index 9a26e545..00000000
--- a/bot/exts/evergreen/wolfram.py
+++ /dev/null
@@ -1,293 +0,0 @@
-import logging
-from io import BytesIO
-from typing import Callable, Optional
-from urllib.parse import urlencode
-
-import arrow
-import discord
-from discord import Embed
-from discord.ext import commands
-from discord.ext.commands import BucketType, Cog, Context, check, group
-
-from bot.bot import Bot
-from bot.constants import Colours, STAFF_ROLES, Wolfram
-from bot.utils.pagination import ImagePaginator
-
-log = logging.getLogger(__name__)
-
-APPID = Wolfram.key
-DEFAULT_OUTPUT_FORMAT = "JSON"
-QUERY = "http://api.wolframalpha.com/v2/{request}"
-WOLF_IMAGE = "https://www.symbols.com/gi.php?type=1&id=2886&i=1"
-
-MAX_PODS = 20
-
-# Allows for 10 wolfram calls pr user pr day
-usercd = commands.CooldownMapping.from_cooldown(Wolfram.user_limit_day, 60 * 60 * 24, BucketType.user)
-
-# Allows for max api requests / days in month per day for the entire guild (Temporary)
-guildcd = commands.CooldownMapping.from_cooldown(Wolfram.guild_limit_day, 60 * 60 * 24, BucketType.guild)
-
-
-async def send_embed(
- ctx: Context,
- message_txt: str,
- colour: int = Colours.soft_red,
- footer: str = None,
- img_url: str = None,
- f: discord.File = None
-) -> None:
- """Generate & send a response embed with Wolfram as the author."""
- embed = Embed(colour=colour)
- embed.description = message_txt
- embed.set_author(
- name="Wolfram Alpha",
- icon_url=WOLF_IMAGE,
- url="https://www.wolframalpha.com/"
- )
- if footer:
- embed.set_footer(text=footer)
-
- if img_url:
- embed.set_image(url=img_url)
-
- await ctx.send(embed=embed, file=f)
-
-
-def custom_cooldown(*ignore: int) -> Callable:
- """
- Implement per-user and per-guild cooldowns for requests to the Wolfram API.
-
- A list of roles may be provided to ignore the per-user cooldown.
- """
- async def predicate(ctx: Context) -> bool:
- if ctx.invoked_with == "help":
- # if the invoked command is help we don't want to increase the ratelimits since it's not actually
- # invoking the command/making a request, so instead just check if the user/guild are on cooldown.
- guild_cooldown = not guildcd.get_bucket(ctx.message).get_tokens() == 0 # if guild is on cooldown
- # check the message is in a guild, and check user bucket if user is not ignored
- if ctx.guild and not any(r.id in ignore for r in ctx.author.roles):
- return guild_cooldown and not usercd.get_bucket(ctx.message).get_tokens() == 0
- return guild_cooldown
-
- user_bucket = usercd.get_bucket(ctx.message)
-
- if all(role.id not in ignore for role in ctx.author.roles):
- user_rate = user_bucket.update_rate_limit()
-
- if user_rate:
- # Can't use api; cause: member limit
- cooldown = arrow.utcnow().shift(seconds=int(user_rate)).humanize(only_distance=True)
- message = (
- "You've used up your limit for Wolfram|Alpha requests.\n"
- f"Cooldown: {cooldown}"
- )
- await send_embed(ctx, message)
- return False
-
- guild_bucket = guildcd.get_bucket(ctx.message)
- guild_rate = guild_bucket.update_rate_limit()
-
- # Repr has a token attribute to read requests left
- log.debug(guild_bucket)
-
- if guild_rate:
- # Can't use api; cause: guild limit
- message = (
- "The max limit of requests for the server has been reached for today.\n"
- f"Cooldown: {int(guild_rate)}"
- )
- await send_embed(ctx, message)
- return False
-
- return True
-
- return check(predicate)
-
-
-async def get_pod_pages(ctx: Context, bot: Bot, query: str) -> Optional[list[tuple[str, str]]]:
- """Get the Wolfram API pod pages for the provided query."""
- async with ctx.typing():
- params = {
- "input": query,
- "appid": APPID,
- "output": DEFAULT_OUTPUT_FORMAT,
- "format": "image,plaintext",
- "location": "the moon",
- "latlong": "0.0,0.0",
- "ip": "1.1.1.1"
- }
- request_url = QUERY.format(request="query")
-
- async with bot.http_session.get(url=request_url, params=params) as response:
- json = await response.json(content_type="text/plain")
-
- result = json["queryresult"]
- log_full_url = f"{request_url}?{urlencode(params)}"
- if result["error"]:
- # API key not set up correctly
- if result["error"]["msg"] == "Invalid appid":
- message = "Wolfram API key is invalid or missing."
- log.warning(
- "API key seems to be missing, or invalid when "
- f"processing a wolfram request: {log_full_url}, Response: {json}"
- )
- await send_embed(ctx, message)
- return None
-
- message = "Something went wrong internally with your request, please notify staff!"
- log.warning(f"Something went wrong getting a response from wolfram: {log_full_url}, Response: {json}")
- await send_embed(ctx, message)
- return None
-
- if not result["success"]:
- message = f"I couldn't find anything for {query}."
- await send_embed(ctx, message)
- return None
-
- if not result["numpods"]:
- message = "Could not find any results."
- await send_embed(ctx, message)
- return None
-
- pods = result["pods"]
- pages = []
- for pod in pods[:MAX_PODS]:
- subs = pod.get("subpods")
-
- for sub in subs:
- title = sub.get("title") or sub.get("plaintext") or sub.get("id", "")
- img = sub["img"]["src"]
- pages.append((title, img))
- return pages
-
-
-class Wolfram(Cog):
- """Commands for interacting with the Wolfram|Alpha API."""
-
- def __init__(self, bot: Bot):
- self.bot = bot
-
- @group(name="wolfram", aliases=("wolf", "wa"), invoke_without_command=True)
- @custom_cooldown(*STAFF_ROLES)
- async def wolfram_command(self, ctx: Context, *, query: str) -> None:
- """Requests all answers on a single image, sends an image of all related pods."""
- params = {
- "i": query,
- "appid": APPID,
- "location": "the moon",
- "latlong": "0.0,0.0",
- "ip": "1.1.1.1"
- }
- request_url = QUERY.format(request="simple")
-
- # Give feedback that the bot is working.
- async with ctx.typing():
- async with self.bot.http_session.get(url=request_url, params=params) as response:
- status = response.status
- image_bytes = await response.read()
-
- f = discord.File(BytesIO(image_bytes), filename="image.png")
- image_url = "attachment://image.png"
-
- if status == 501:
- message = "Failed to get response."
- footer = ""
- color = Colours.soft_red
- elif status == 400:
- message = "No input found."
- footer = ""
- color = Colours.soft_red
- elif status == 403:
- message = "Wolfram API key is invalid or missing."
- footer = ""
- color = Colours.soft_red
- else:
- message = ""
- footer = "View original for a bigger picture."
- color = Colours.soft_orange
-
- # Sends a "blank" embed if no request is received, unsure how to fix
- await send_embed(ctx, message, color, footer=footer, img_url=image_url, f=f)
-
- @wolfram_command.command(name="page", aliases=("pa", "p"))
- @custom_cooldown(*STAFF_ROLES)
- async def wolfram_page_command(self, ctx: Context, *, query: str) -> None:
- """
- Requests a drawn image of given query.
-
- Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc.
- """
- pages = await get_pod_pages(ctx, self.bot, query)
-
- if not pages:
- return
-
- embed = Embed()
- embed.set_author(
- name="Wolfram Alpha",
- icon_url=WOLF_IMAGE,
- url="https://www.wolframalpha.com/"
- )
- embed.colour = Colours.soft_orange
-
- await ImagePaginator.paginate(pages, ctx, embed)
-
- @wolfram_command.command(name="cut", aliases=("c",))
- @custom_cooldown(*STAFF_ROLES)
- async def wolfram_cut_command(self, ctx: Context, *, query: str) -> None:
- """
- Requests a drawn image of given query.
-
- Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc.
- """
- pages = await get_pod_pages(ctx, self.bot, query)
-
- if not pages:
- return
-
- if len(pages) >= 2:
- page = pages[1]
- else:
- page = pages[0]
-
- await send_embed(ctx, page[0], colour=Colours.soft_orange, img_url=page[1])
-
- @wolfram_command.command(name="short", aliases=("sh", "s"))
- @custom_cooldown(*STAFF_ROLES)
- async def wolfram_short_command(self, ctx: Context, *, query: str) -> None:
- """Requests an answer to a simple question."""
- params = {
- "i": query,
- "appid": APPID,
- "location": "the moon",
- "latlong": "0.0,0.0",
- "ip": "1.1.1.1"
- }
- request_url = QUERY.format(request="result")
-
- # Give feedback that the bot is working.
- async with ctx.typing():
- async with self.bot.http_session.get(url=request_url, params=params) as response:
- status = response.status
- response_text = await response.text()
-
- if status == 501:
- message = "Failed to get response."
- color = Colours.soft_red
- elif status == 400:
- message = "No input found."
- color = Colours.soft_red
- elif response_text == "Error 1: Invalid appid.":
- message = "Wolfram API key is invalid or missing."
- color = Colours.soft_red
- else:
- message = response_text
- color = Colours.soft_orange
-
- await send_embed(ctx, message, color)
-
-
-def setup(bot: Bot) -> None:
- """Load the Wolfram cog."""
- bot.add_cog(Wolfram(bot))