diff options
Diffstat (limited to 'bot/exts/evergreen')
29 files changed, 1103 insertions, 694 deletions
diff --git a/bot/exts/evergreen/8bitify.py b/bot/exts/evergreen/8bitify.py index 60062fc1..c048d9bf 100644 --- a/bot/exts/evergreen/8bitify.py +++ b/bot/exts/evergreen/8bitify.py @@ -14,7 +14,7 @@ class EightBitify(commands.Cog): @staticmethod def pixelate(image: Image) -> Image: """Takes an image and pixelates it.""" - return image.resize((32, 32)).resize((1024, 1024)) + return image.resize((32, 32), resample=Image.NEAREST).resize((1024, 1024), resample=Image.NEAREST) @staticmethod def quantize(image: Image) -> Image: diff --git a/bot/exts/evergreen/bookmark.py b/bot/exts/evergreen/bookmark.py index 73908702..5fa05d2e 100644 --- a/bot/exts/evergreen/bookmark.py +++ b/bot/exts/evergreen/bookmark.py @@ -5,6 +5,7 @@ import discord from discord.ext import commands from bot.constants import Colours, ERROR_REPLIES, Emojis, Icons +from bot.utils.converters import WrappedMessageConverter log = logging.getLogger(__name__) @@ -19,7 +20,7 @@ class Bookmark(commands.Cog): async def bookmark( self, ctx: commands.Context, - target_message: discord.Message, + target_message: WrappedMessageConverter, *, title: str = "Bookmark" ) -> None: diff --git a/bot/exts/evergreen/branding.py b/bot/exts/evergreen/branding.py deleted file mode 100644 index 1d463c5b..00000000 --- a/bot/exts/evergreen/branding.py +++ /dev/null @@ -1,543 +0,0 @@ -import asyncio -import itertools -import json -import logging -import random -import typing as t -from datetime import datetime, time, timedelta -from pathlib import Path - -import arrow -import discord -from discord.embeds import EmptyEmbed -from discord.ext import commands - -from bot.bot import SeasonalBot -from bot.constants import Branding, Colours, Emojis, MODERATION_ROLES, Tokens -from bot.seasons import SeasonBase, get_all_seasons, get_current_season, get_season -from bot.utils import human_months -from bot.utils.decorators import with_role -from bot.utils.exceptions import BrandingError -from bot.utils.persist import make_persistent - -log = logging.getLogger(__name__) - -STATUS_OK = 200 # HTTP status code - -FILE_BANNER = "banner.png" -FILE_AVATAR = "avatar.png" -SERVER_ICONS = "server_icons" - -BRANDING_URL = "https://api.github.com/repos/python-discord/branding/contents" - -PARAMS = {"ref": "master"} # Target branch -HEADERS = {"Accept": "application/vnd.github.v3+json"} # Ensure we use API v3 - -# A GitHub token is not necessary for the cog to operate, -# unauthorized requests are however limited to 60 per hour -if Tokens.github: - HEADERS["Authorization"] = f"token {Tokens.github}" - - -class GitHubFile(t.NamedTuple): - """ - Represents a remote file on GitHub. - - The `sha` hash is kept so that we can determine that a file has changed, - despite its filename remaining unchanged. - """ - - download_url: str - path: str - sha: str - - -def pretty_files(files: t.Iterable[GitHubFile]) -> str: - """Provide a human-friendly representation of `files`.""" - return "\n".join(file.path for file in files) - - -def time_until_midnight() -> timedelta: - """ - Determine amount of time until the next-up UTC midnight. - - The exact `midnight` moment is actually delayed to 5 seconds after, in order - to avoid potential problems due to imprecise sleep. - """ - now = datetime.utcnow() - tomorrow = now + timedelta(days=1) - midnight = datetime.combine(tomorrow, time(second=5)) - - return midnight - now - - -class BrandingManager(commands.Cog): - """ - Manages the guild's branding. - - The purpose of this cog is to help automate the synchronization of the branding - repository with the guild. It is capable of discovering assets in the repository - via GitHub's API, resolving download urls for them, and delegating - to the `bot` instance to upload them to the guild. - - BrandingManager is designed to be entirely autonomous. Its `daemon` background task awakens - once a day (see `time_until_midnight`) to detect new seasons, or to cycle icons within a single - season. The daemon can be turned on and off via the `daemon` cmd group. The value set via - its `start` and `stop` commands is persisted across sessions. If turned on, the daemon will - automatically start on the next bot start-up. Otherwise, it will wait to be started manually. - - All supported operations, e.g. setting seasons, applying the branding, or cycling icons, can - also be invoked manually, via the following API: - - branding list - - Show all available seasons - - branding set <season_name> - - Set the cog's internal state to represent `season_name`, if it exists - - If no `season_name` is given, set chronologically current season - - This will not automatically apply the season's branding to the guild, - the cog's state can be detached from the guild - - Seasons can therefore be 'previewed' using this command - - branding info - - View detailed information about resolved assets for current season - - branding refresh - - Refresh internal state, i.e. synchronize with branding repository - - branding apply - - Apply the current internal state to the guild, i.e. upload the assets - - branding cycle - - If there are multiple available icons for current season, randomly pick - and apply the next one - - The daemon calls these methods autonomously as appropriate. The use of this cog - is locked to moderation roles. As it performs media asset uploads, it is prone to - rate-limits - the `apply` command should be used with caution. The `set` command can, - however, be used freely to 'preview' seasonal branding and check whether paths have been - resolved as appropriate. - - While the bot is in debug mode, it will 'mock' asset uploads by logging the passed - download urls and pretending that the upload was successful. Make use of this - to test this cog's behaviour. - """ - - current_season: t.Type[SeasonBase] - - banner: t.Optional[GitHubFile] - avatar: t.Optional[GitHubFile] - - available_icons: t.List[GitHubFile] - remaining_icons: t.List[GitHubFile] - - days_since_cycle: t.Iterator - - config_file: Path - - daemon: t.Optional[asyncio.Task] - - def __init__(self, bot: SeasonalBot) -> None: - """ - Assign safe default values on init. - - At this point, we don't have information about currently available branding. - Most of these attributes will be overwritten once the daemon connects, or once - the `refresh` command is used. - """ - self.bot = bot - self.current_season = get_current_season() - - self.banner = None - self.avatar = None - - self.available_icons = [] - self.remaining_icons = [] - - self.days_since_cycle = itertools.cycle([None]) - - self.config_file = make_persistent(Path("bot", "resources", "evergreen", "branding.json")) - should_run = self._read_config()["daemon_active"] - - if should_run: - self.daemon = self.bot.loop.create_task(self._daemon_func()) - else: - self.daemon = None - - @property - def _daemon_running(self) -> bool: - """True if the daemon is currently active, False otherwise.""" - return self.daemon is not None and not self.daemon.done() - - def _read_config(self) -> t.Dict[str, bool]: - """Read and return persistent config file.""" - with self.config_file.open("r") as persistent_file: - return json.load(persistent_file) - - def _write_config(self, key: str, value: bool) -> None: - """Write a `key`, `value` pair to persistent config file.""" - current_config = self._read_config() - current_config[key] = value - - with self.config_file.open("w") as persistent_file: - json.dump(current_config, persistent_file) - - async def _daemon_func(self) -> None: - """ - Manage all automated behaviour of the BrandingManager cog. - - Once a day, the daemon will perform the following tasks: - - Update `current_season` - - Poll GitHub API to see if the available branding for `current_season` has changed - - Update assets if changes are detected (banner, guild icon, bot avatar, bot nickname) - - Check whether it's time to cycle guild icons - - The internal loop runs once when activated, then periodically at the time - given by `time_until_midnight`. - - All method calls in the internal loop are considered safe, i.e. no errors propagate - to the daemon's loop. The daemon itself does not perform any error handling on its own. - """ - await self.bot.wait_until_guild_available() - - while True: - self.current_season = get_current_season() - branding_changed = await self.refresh() - - if branding_changed: - await self.apply() - - elif next(self.days_since_cycle) == Branding.cycle_frequency: - await self.cycle() - - until_midnight = time_until_midnight() - await asyncio.sleep(until_midnight.total_seconds()) - - async def _info_embed(self) -> discord.Embed: - """Make an informative embed representing current season.""" - info_embed = discord.Embed(description=self.current_season.description, colour=self.current_season.colour) - - # If we're in a non-evergreen season, also show active months - if self.current_season is not SeasonBase: - title = f"{self.current_season.season_name} ({human_months(self.current_season.months)})" - else: - title = self.current_season.season_name - - # Use the author field to show the season's name and avatar if available - info_embed.set_author(name=title, icon_url=self.avatar.download_url if self.avatar else EmptyEmbed) - - banner = self.banner.path if self.banner is not None else "Unavailable" - info_embed.add_field(name="Banner", value=banner, inline=False) - - avatar = self.avatar.path if self.avatar is not None else "Unavailable" - info_embed.add_field(name="Avatar", value=avatar, inline=False) - - icons = pretty_files(self.available_icons) or "Unavailable" - info_embed.add_field(name="Available icons", value=icons, inline=False) - - # Only display cycle frequency if we're actually cycling - if len(self.available_icons) > 1 and Branding.cycle_frequency: - info_embed.set_footer(text=f"Icon cycle frequency: {Branding.cycle_frequency}") - - return info_embed - - async def _reset_remaining_icons(self) -> None: - """Set `remaining_icons` to a shuffled copy of `available_icons`.""" - self.remaining_icons = random.sample(self.available_icons, k=len(self.available_icons)) - - async def _reset_days_since_cycle(self) -> None: - """ - Reset the `days_since_cycle` iterator based on configured frequency. - - If the current season only has 1 icon, or if `Branding.cycle_frequency` is falsey, - the iterator will always yield None. This signals that the icon shouldn't be cycled. - - Otherwise, it will yield ints in range [1, `Branding.cycle_frequency`] indefinitely. - When the iterator yields a value equal to `Branding.cycle_frequency`, it is time to cycle. - """ - if len(self.available_icons) > 1 and Branding.cycle_frequency: - sequence = range(1, Branding.cycle_frequency + 1) - else: - sequence = [None] - - self.days_since_cycle = itertools.cycle(sequence) - - async def _get_files(self, path: str, include_dirs: bool = False) -> t.Dict[str, GitHubFile]: - """ - Get files at `path` in the branding repository. - - If `include_dirs` is False (default), only returns files at `path`. - Otherwise, will return both files and directories. Never returns symlinks. - - Return dict mapping from filename to corresponding `GitHubFile` instance. - This may return an empty dict if the response status is non-200, - or if the target directory is empty. - """ - url = f"{BRANDING_URL}/{path}" - async with self.bot.http_session.get(url, headers=HEADERS, params=PARAMS) as resp: - # Short-circuit if we get non-200 response - if resp.status != STATUS_OK: - log.error(f"GitHub API returned non-200 response: {resp}") - return {} - directory = await resp.json() # Directory at `path` - - allowed_types = {"file", "dir"} if include_dirs else {"file"} - return { - file["name"]: GitHubFile(file["download_url"], file["path"], file["sha"]) - for file in directory - if file["type"] in allowed_types - } - - async def refresh(self) -> bool: - """ - Synchronize available assets with branding repository. - - If the current season is not the evergreen, and lacks at least one asset, - we use the evergreen seasonal dir as fallback for missing assets. - - Finally, if neither the seasonal nor fallback branding directories contain - an asset, it will simply be ignored. - - Return True if the branding has changed. This will be the case when we enter - a new season, or when something changes in the current seasons's directory - in the branding repository. - """ - old_branding = (self.banner, self.avatar, self.available_icons) - seasonal_dir = await self._get_files(self.current_season.branding_path, include_dirs=True) - - # Only make a call to the fallback directory if there is something to be gained - branding_incomplete = any( - asset not in seasonal_dir - for asset in (FILE_BANNER, FILE_AVATAR, SERVER_ICONS) - ) - if branding_incomplete and self.current_season is not SeasonBase: - fallback_dir = await self._get_files(SeasonBase.branding_path, include_dirs=True) - else: - fallback_dir = {} - - # Resolve assets in this directory, None is a safe value - self.banner = seasonal_dir.get(FILE_BANNER) or fallback_dir.get(FILE_BANNER) - self.avatar = seasonal_dir.get(FILE_AVATAR) or fallback_dir.get(FILE_AVATAR) - - # Now resolve server icons by making a call to the proper sub-directory - if SERVER_ICONS in seasonal_dir: - icons_dir = await self._get_files(f"{self.current_season.branding_path}/{SERVER_ICONS}") - self.available_icons = list(icons_dir.values()) - - elif SERVER_ICONS in fallback_dir: - icons_dir = await self._get_files(f"{SeasonBase.branding_path}/{SERVER_ICONS}") - self.available_icons = list(icons_dir.values()) - - else: - self.available_icons = [] # This should never be the case, but an empty list is a safe value - - # GitHubFile instances carry a `sha` attr so this will pick up if a file changes - branding_changed = old_branding != (self.banner, self.avatar, self.available_icons) - - if branding_changed: - log.info(f"New branding detected (season: {self.current_season.season_name})") - await self._reset_remaining_icons() - await self._reset_days_since_cycle() - - return branding_changed - - async def cycle(self) -> bool: - """ - Apply the next-up server icon. - - Returns True if an icon is available and successfully gets applied, False otherwise. - """ - if not self.available_icons: - log.info("Cannot cycle: no icons for this season") - return False - - if not self.remaining_icons: - log.info("Reset & shuffle remaining icons") - await self._reset_remaining_icons() - - next_up = self.remaining_icons.pop(0) - success = await self.bot.set_icon(next_up.download_url) - - return success - - async def apply(self) -> t.List[str]: - """ - Apply current branding to the guild and bot. - - This delegates to the bot instance to do all the work. We only provide download urls - for available assets. Assets unavailable in the branding repo will be ignored. - - Returns a list of names of all failed assets. An asset is considered failed - if it isn't found in the branding repo, or if something goes wrong while the - bot is trying to apply it. - - An empty list denotes that all assets have been applied successfully. - """ - report = {asset: False for asset in ("banner", "avatar", "nickname", "icon")} - - if self.banner is not None: - report["banner"] = await self.bot.set_banner(self.banner.download_url) - - if self.avatar is not None: - report["avatar"] = await self.bot.set_avatar(self.avatar.download_url) - - if self.current_season.bot_name: - report["nickname"] = await self.bot.set_nickname(self.current_season.bot_name) - - report["icon"] = await self.cycle() - - failed_assets = [asset for asset, succeeded in report.items() if not succeeded] - return failed_assets - - @with_role(*MODERATION_ROLES) - @commands.group(name="branding") - async def branding_cmds(self, ctx: commands.Context) -> None: - """Manual branding control.""" - if not ctx.invoked_subcommand: - await ctx.send_help(ctx.command) - - @branding_cmds.command(name="list", aliases=["ls"]) - async def branding_list(self, ctx: commands.Context) -> None: - """List all available seasons and branding sources.""" - embed = discord.Embed(title="Available seasons", colour=Colours.soft_green) - - for season in get_all_seasons(): - if season is SeasonBase: - active_when = "always" - else: - active_when = f"in {human_months(season.months)}" - - description = ( - f"Active {active_when}\n" - f"Branding: {season.branding_path}" - ) - embed.add_field(name=season.season_name, value=description, inline=False) - - await ctx.send(embed=embed) - - @branding_cmds.command(name="set") - async def branding_set(self, ctx: commands.Context, *, season_name: t.Optional[str] = None) -> None: - """ - Manually set season, or reset to current if none given. - - Season search is a case-less comparison against both seasonal class name, - and its `season_name` attr. - - This only pre-loads the cog's internal state to the chosen season, but does not - automatically apply the branding. As that is an expensive operation, the `apply` - command must be called explicitly after this command finishes. - - This means that this command can be used to 'preview' a season gathering info - about its available assets, without applying them to the guild. - - If the daemon is running, it will automatically reset the season to current when - it wakes up. The season set via this command can therefore remain 'detached' from - what it should be - the daemon will make sure that it's set back properly. - """ - if season_name is None: - new_season = get_current_season() - else: - new_season = get_season(season_name) - if new_season is None: - raise BrandingError("No such season exists") - - if self.current_season is new_season: - raise BrandingError(f"Season {self.current_season.season_name} already active") - - self.current_season = new_season - await self.branding_refresh(ctx) - - @branding_cmds.command(name="info", aliases=["status"]) - async def branding_info(self, ctx: commands.Context) -> None: - """ - Show available assets for current season. - - This can be used to confirm that assets have been resolved properly. - When `apply` is used, it attempts to upload exactly the assets listed here. - """ - await ctx.send(embed=await self._info_embed()) - - @branding_cmds.command(name="refresh") - async def branding_refresh(self, ctx: commands.Context) -> None: - """Sync currently available assets with branding repository.""" - async with ctx.typing(): - await self.refresh() - await self.branding_info(ctx) - - @branding_cmds.command(name="apply") - async def branding_apply(self, ctx: commands.Context) -> None: - """ - Apply current season's branding to the guild. - - Use `info` to check which assets will be applied. Shows which assets have - failed to be applied, if any. - """ - async with ctx.typing(): - failed_assets = await self.apply() - if failed_assets: - raise BrandingError(f"Failed to apply following assets: {', '.join(failed_assets)}") - - response = discord.Embed(description=f"All assets applied {Emojis.ok_hand}", colour=Colours.soft_green) - await ctx.send(embed=response) - - @branding_cmds.command(name="cycle") - async def branding_cycle(self, ctx: commands.Context) -> None: - """ - Apply the next-up guild icon, if multiple are available. - - The order is random. - """ - async with ctx.typing(): - success = await self.cycle() - if not success: - raise BrandingError("Failed to cycle icon") - - response = discord.Embed(description=f"Success {Emojis.ok_hand}", colour=Colours.soft_green) - await ctx.send(embed=response) - - @branding_cmds.group(name="daemon", aliases=["d", "task"]) - async def daemon_group(self, ctx: commands.Context) -> None: - """Control the background daemon.""" - if not ctx.invoked_subcommand: - await ctx.send_help(ctx.command) - - @daemon_group.command(name="status") - async def daemon_status(self, ctx: commands.Context) -> None: - """Check whether daemon is currently active.""" - if self._daemon_running: - remaining_time = (arrow.utcnow() + time_until_midnight()).humanize() - response = discord.Embed(description=f"Daemon running {Emojis.ok_hand}", colour=Colours.soft_green) - response.set_footer(text=f"Next refresh {remaining_time}") - else: - response = discord.Embed(description="Daemon not running", colour=Colours.soft_red) - - await ctx.send(embed=response) - - @daemon_group.command(name="start") - async def daemon_start(self, ctx: commands.Context) -> None: - """If the daemon isn't running, start it.""" - if self._daemon_running: - raise BrandingError("Daemon already running!") - - self.daemon = self.bot.loop.create_task(self._daemon_func()) - self._write_config("daemon_active", True) - - response = discord.Embed(description=f"Daemon started {Emojis.ok_hand}", colour=Colours.soft_green) - await ctx.send(embed=response) - - @daemon_group.command(name="stop") - async def daemon_stop(self, ctx: commands.Context) -> None: - """If the daemon is running, stop it.""" - if not self._daemon_running: - raise BrandingError("Daemon not running!") - - self.daemon.cancel() - self._write_config("daemon_active", False) - - response = discord.Embed(description=f"Daemon stopped {Emojis.ok_hand}", colour=Colours.soft_green) - await ctx.send(embed=response) - - -def setup(bot: SeasonalBot) -> None: - """Load BrandingManager cog.""" - bot.add_cog(BrandingManager(bot)) diff --git a/bot/exts/evergreen/conversationstarters.py b/bot/exts/evergreen/conversationstarters.py new file mode 100644 index 00000000..576b8d76 --- /dev/null +++ b/bot/exts/evergreen/conversationstarters.py @@ -0,0 +1,71 @@ +from pathlib import Path + +import yaml +from discord import Color, Embed +from discord.ext import commands + +from bot.constants import WHITELISTED_CHANNELS +from bot.utils.decorators import override_in_channel +from bot.utils.randomization import RandomCycle + +SUGGESTION_FORM = 'https://forms.gle/zw6kkJqv8U43Nfjg9' + +with Path("bot/resources/evergreen/starter.yaml").open("r", encoding="utf8") as f: + STARTERS = yaml.load(f, Loader=yaml.FullLoader) + +with Path("bot/resources/evergreen/py_topics.yaml").open("r", encoding="utf8") as f: + # First ID is #python-general and the rest are top to bottom categories of Topical Chat/Help. + PY_TOPICS = yaml.load(f, Loader=yaml.FullLoader) + + # Removing `None` from lists of topics, if not a list, it is changed to an empty one. + PY_TOPICS = {k: [i for i in v if i] if isinstance(v, list) else [] for k, v in PY_TOPICS.items()} + + # All the allowed channels that the ".topic" command is allowed to be executed in. + ALL_ALLOWED_CHANNELS = list(PY_TOPICS.keys()) + list(WHITELISTED_CHANNELS) + +# Putting all topics into one dictionary and shuffling lists to reduce same-topic repetitions. +ALL_TOPICS = {'default': STARTERS, **PY_TOPICS} +TOPICS = { + channel: RandomCycle(topics or ['No topics found for this channel.']) + for channel, topics in ALL_TOPICS.items() +} + + +class ConvoStarters(commands.Cog): + """Evergreen conversation topics.""" + + def __init__(self, bot: commands.Bot): + self.bot = bot + + @commands.command() + @override_in_channel(ALL_ALLOWED_CHANNELS) + async def topic(self, ctx: commands.Context) -> None: + """ + Responds with a random topic to start a conversation. + + If in a Python channel, a python-related topic will be given. + + Otherwise, a random conversation topic will be received by the user. + """ + # No matter what, the form will be shown. + embed = Embed(description=f'Suggest more topics [here]({SUGGESTION_FORM})!', color=Color.blurple()) + + try: + # Fetching topics. + channel_topics = TOPICS[ctx.channel.id] + + # If the channel isn't Python-related. + except KeyError: + embed.title = f'**{next(TOPICS["default"])}**' + + # If the channel ID doesn't have any topics. + else: + embed.title = f'**{next(channel_topics)}**' + + finally: + await ctx.send(embed=embed) + + +def setup(bot: commands.Bot) -> None: + """Conversation starters Cog load.""" + bot.add_cog(ConvoStarters(bot)) diff --git a/bot/exts/evergreen/emoji_count.py b/bot/exts/evergreen/emoji_count.py new file mode 100644 index 00000000..cc43e9ab --- /dev/null +++ b/bot/exts/evergreen/emoji_count.py @@ -0,0 +1,97 @@ +import datetime +import logging +import random +from collections import defaultdict +from typing import List, Tuple + +import discord +from discord.ext import commands + +from bot.constants import Colours, ERROR_REPLIES +from bot.utils.pagination import LinePaginator + +log = logging.getLogger(__name__) + + +class EmojiCount(commands.Cog): + """Command that give random emoji based on category.""" + + def __init__(self, bot: commands.Bot): + self.bot = bot + + @staticmethod + def embed_builder(emoji: dict) -> Tuple[discord.Embed, List[str]]: + """Generates an embed with the emoji names and count.""" + embed = discord.Embed( + color=Colours.orange, + title="Emoji Count", + timestamp=datetime.datetime.utcnow() + ) + msg = [] + + if len(emoji) == 1: + for category_name, category_emojis in emoji.items(): + if len(category_emojis) == 1: + msg.append(f"There is **{len(category_emojis)}** emoji in **{category_name}** category") + else: + msg.append(f"There are **{len(category_emojis)}** emojis in **{category_name}** category") + embed.set_thumbnail(url=random.choice(category_emojis).url) + + else: + for category_name, category_emojis in emoji.items(): + emoji_choice = random.choice(category_emojis) + if len(category_emojis) > 1: + emoji_info = f"There are **{len(category_emojis)}** emojis in **{category_name}** category" + else: + emoji_info = f"There is **{len(category_emojis)}** emoji in **{category_name}** category" + if emoji_choice.animated: + msg.append(f'<a:{emoji_choice.name}:{emoji_choice.id}> {emoji_info}') + else: + msg.append(f'<:{emoji_choice.name}:{emoji_choice.id}> {emoji_info}') + return embed, msg + + @staticmethod + def generate_invalid_embed(emojis: list) -> Tuple[discord.Embed, List[str]]: + """Generates error embed.""" + embed = discord.Embed( + color=Colours.soft_red, + title=random.choice(ERROR_REPLIES) + ) + msg = [] + + emoji_dict = defaultdict(list) + for emoji in emojis: + emoji_dict[emoji.name.split("_")[0]].append(emoji) + + error_comp = ', '.join(emoji_dict) + msg.append(f"These are the valid categories\n```{error_comp}```") + return embed, msg + + @commands.command(name="emojicount", aliases=["ec", "emojis"]) + async def emoji_count(self, ctx: commands.Context, *, category_query: str = None) -> None: + """Returns embed with emoji category and info given by the user.""" + emoji_dict = defaultdict(list) + + if not ctx.guild.emojis: + await ctx.send("No emojis found.") + return + log.trace(f"Emoji Category {'' if category_query else 'not '}provided by the user") + for emoji in ctx.guild.emojis: + emoji_category = emoji.name.split("_")[0] + + if category_query is not None and emoji_category not in category_query: + continue + + emoji_dict[emoji_category].append(emoji) + + if not emoji_dict: + log.trace("Invalid name provided by the user") + embed, msg = self.generate_invalid_embed(ctx.guild.emojis) + else: + embed, msg = self.embed_builder(emoji_dict) + await LinePaginator.paginate(lines=msg, ctx=ctx, embed=embed) + + +def setup(bot: commands.Bot) -> None: + """Emoji Count Cog load.""" + bot.add_cog(EmojiCount(bot)) diff --git a/bot/exts/evergreen/error_handler.py b/bot/exts/evergreen/error_handler.py index 459a2b2d..6e518435 100644 --- a/bot/exts/evergreen/error_handler.py +++ b/bot/exts/evergreen/error_handler.py @@ -9,7 +9,7 @@ from sentry_sdk import push_scope from bot.constants import Colours, ERROR_REPLIES, NEGATIVE_REPLIES from bot.utils.decorators import InChannelCheckFailure, InMonthCheckFailure -from bot.utils.exceptions import BrandingError, UserNotPlayingError +from bot.utils.exceptions import UserNotPlayingError log = logging.getLogger(__name__) @@ -57,10 +57,6 @@ class CommandErrorHandler(commands.Cog): if isinstance(error, commands.CommandNotFound): return - if isinstance(error, BrandingError): - await ctx.send(embed=self.error_embed(str(error))) - return - if isinstance(error, (InChannelCheckFailure, InMonthCheckFailure)): await ctx.send(embed=self.error_embed(str(error), NEGATIVE_REPLIES), delete_after=7.5) return diff --git a/bot/exts/evergreen/fun.py b/bot/exts/evergreen/fun.py index 67a4bae5..101725da 100644 --- a/bot/exts/evergreen/fun.py +++ b/bot/exts/evergreen/fun.py @@ -1,14 +1,16 @@ import functools +import json import logging import random -from typing import Callable, Tuple, Union +from pathlib import Path +from typing import Callable, Iterable, Tuple, Union from discord import Embed, Message from discord.ext import commands -from discord.ext.commands import Bot, Cog, Context, MessageConverter +from discord.ext.commands import BadArgument, Bot, Cog, Context, MessageConverter, clean_content from bot import utils -from bot.constants import Emojis +from bot.constants import Client, Colours, Emojis log = logging.getLogger(__name__) @@ -26,32 +28,53 @@ UWU_WORDS = { } +def caesar_cipher(text: str, offset: int) -> Iterable[str]: + """ + Implements a lazy Caesar Cipher algorithm. + + Encrypts a `text` given a specific integer `offset`. The sign + of the `offset` dictates the direction in which it shifts to, + with a negative value shifting to the left, and a positive + value shifting to the right. + """ + for char in text: + if not char.isascii() or not char.isalpha() or char.isspace(): + yield char + continue + + case_start = 65 if char.isupper() else 97 + true_offset = (ord(char) - case_start + offset) % 26 + + yield chr(case_start + true_offset) + + class Fun(Cog): """A collection of general commands for fun.""" def __init__(self, bot: Bot) -> None: self.bot = bot + with Path("bot/resources/evergreen/caesar_info.json").open("r", encoding="UTF-8") as f: + self._caesar_cipher_embed = json.load(f) + + @staticmethod + def _get_random_die() -> str: + """Generate a random die emoji, ready to be sent on Discord.""" + die_name = f"dice_{random.randint(1, 6)}" + return getattr(Emojis, die_name) + @commands.command() async def roll(self, ctx: Context, num_rolls: int = 1) -> None: """Outputs a number of random dice emotes (up to 6).""" - output = "" - if num_rolls > 6: - num_rolls = 6 - elif num_rolls < 1: - output = ":no_entry: You must roll at least once." - for _ in range(num_rolls): - terning = f"terning{random.randint(1, 6)}" - output += getattr(Emojis, terning, '') - await ctx.send(output) + if 1 <= num_rolls <= 6: + dice = " ".join(self._get_random_die() for _ in range(num_rolls)) + await ctx.send(dice) + else: + raise BadArgument(f"`{Client.prefix}roll` only supports between 1 and 6 rolls.") @commands.command(name="uwu", aliases=("uwuwize", "uwuify",)) - async def uwu_command(self, ctx: Context, *, text: str) -> None: - """ - Converts a given `text` into it's uwu equivalent. - - Also accepts a valid discord Message ID or link. - """ + async def uwu_command(self, ctx: Context, *, text: clean_content(fix_channel_mentions=True)) -> None: + """Converts a given `text` into it's uwu equivalent.""" conversion_func = functools.partial( utils.replace_many, replacements=UWU_WORDS, ignore_case=True, match_case=True ) @@ -66,12 +89,8 @@ class Fun(Cog): await ctx.send(content=converted_text, embed=embed) @commands.command(name="randomcase", aliases=("rcase", "randomcaps", "rcaps",)) - async def randomcase_command(self, ctx: Context, *, text: str) -> None: - """ - Randomly converts the casing of a given `text`. - - Also accepts a valid discord Message ID or link. - """ + async def randomcase_command(self, ctx: Context, *, text: clean_content(fix_channel_mentions=True)) -> None: + """Randomly converts the casing of a given `text`.""" def conversion_func(text: str) -> str: """Randomly converts the casing of a given string.""" return "".join( @@ -87,22 +106,100 @@ class Fun(Cog): converted_text = f">>> {converted_text.lstrip('> ')}" await ctx.send(content=converted_text, embed=embed) + @commands.group(name="caesarcipher", aliases=("caesar", "cc",)) + async def caesarcipher_group(self, ctx: Context) -> None: + """ + Translates a message using the Caesar Cipher. + + See `decrypt`, `encrypt`, and `info` subcommands. + """ + if ctx.invoked_subcommand is None: + await ctx.invoke(self.bot.get_command("help"), "caesarcipher") + + @caesarcipher_group.command(name="info") + async def caesarcipher_info(self, ctx: Context) -> None: + """Information about the Caesar Cipher.""" + embed = Embed.from_dict(self._caesar_cipher_embed) + embed.colour = Colours.dark_green + + await ctx.send(embed=embed) + + @staticmethod + async def _caesar_cipher(ctx: Context, offset: int, msg: str, left_shift: bool = False) -> None: + """ + Given a positive integer `offset`, translates and sends the given `msg`. + + Performs a right shift by default unless `left_shift` is specified as `True`. + + Also accepts a valid Discord Message ID or link. + """ + if offset < 0: + await ctx.send(":no_entry: Cannot use a negative offset.") + return + + if left_shift: + offset = -offset + + def conversion_func(text: str) -> str: + """Encrypts the given string using the Caesar Cipher.""" + return "".join(caesar_cipher(text, offset)) + + text, embed = await Fun._get_text_and_embed(ctx, msg) + + if embed is not None: + embed = Fun._convert_embed(conversion_func, embed) + + converted_text = conversion_func(text) + + if converted_text: + converted_text = f">>> {converted_text.lstrip('> ')}" + + await ctx.send(content=converted_text, embed=embed) + + @caesarcipher_group.command(name="encrypt", aliases=("rightshift", "rshift", "enc",)) + async def caesarcipher_encrypt(self, ctx: Context, offset: int, *, msg: str) -> None: + """ + Given a positive integer `offset`, encrypt the given `msg`. + + Performs a right shift of the letters in the message. + + Also accepts a valid Discord Message ID or link. + """ + await self._caesar_cipher(ctx, offset, msg, left_shift=False) + + @caesarcipher_group.command(name="decrypt", aliases=("leftshift", "lshift", "dec",)) + async def caesarcipher_decrypt(self, ctx: Context, offset: int, *, msg: str) -> None: + """ + Given a positive integer `offset`, decrypt the given `msg`. + + Performs a left shift of the letters in the message. + + Also accepts a valid Discord Message ID or link. + """ + await self._caesar_cipher(ctx, offset, msg, left_shift=True) + @staticmethod async def _get_text_and_embed(ctx: Context, text: str) -> Tuple[str, Union[Embed, None]]: """ Attempts to extract the text and embed from a possible link to a discord Message. + Does not retrieve the text and embed from the Message if it is in a channel the user does + not have read permissions in. + Returns a tuple of: str: If `text` is a valid discord Message, the contents of the message, else `text`. Union[Embed, None]: The embed if found in the valid Message, else None """ embed = None - message = await Fun._get_discord_message(ctx, text) - if isinstance(message, Message): - text = message.content + + msg = await Fun._get_discord_message(ctx, text) + # Ensure the user has read permissions for the channel the message is in + if isinstance(msg, Message) and ctx.author.permissions_in(msg.channel).read_messages: + text = msg.clean_content # Take first embed because we can't send multiple embeds - if message.embeds: - embed = message.embeds[0] + if msg.embeds: + embed = msg.embeds[0] + return (text, embed) @staticmethod diff --git a/bot/exts/evergreen/game.py b/bot/exts/evergreen/game.py index 3c8b2725..d0fd7a40 100644 --- a/bot/exts/evergreen/game.py +++ b/bot/exts/evergreen/game.py @@ -11,7 +11,7 @@ from discord import Embed from discord.ext import tasks from discord.ext.commands import Cog, Context, group -from bot.bot import SeasonalBot +from bot.bot import Bot from bot.constants import STAFF_ROLES, Tokens from bot.utils.decorators import with_role from bot.utils.pagination import ImagePaginator, LinePaginator @@ -130,7 +130,7 @@ class AgeRatings(IntEnum): class Games(Cog): """Games Cog contains commands that collect data from IGDB.""" - def __init__(self, bot: SeasonalBot): + def __init__(self, bot: Bot): self.bot = bot self.http_session: ClientSession = bot.http_session @@ -415,7 +415,7 @@ class Games(Cog): return sorted((item for item in results if item[0] >= 0.60), reverse=True)[:4] -def setup(bot: SeasonalBot) -> None: +def setup(bot: Bot) -> None: """Add/Load Games cog.""" # Check does IGDB API key exist, if not, log warning and don't load cog if not Tokens.igdb: diff --git a/bot/exts/evergreen/githubinfo.py b/bot/exts/evergreen/githubinfo.py new file mode 100644 index 00000000..2e38e3ab --- /dev/null +++ b/bot/exts/evergreen/githubinfo.py @@ -0,0 +1,98 @@ +import logging +import random +from datetime import datetime +from typing import Optional + +import discord +from discord.ext import commands +from discord.ext.commands.cooldowns import BucketType + +from bot.constants import NEGATIVE_REPLIES + +log = logging.getLogger(__name__) + + +class GithubInfo(commands.Cog): + """Fetches info from GitHub.""" + + def __init__(self, bot: commands.Bot): + self.bot = bot + + async def fetch_data(self, url: str) -> dict: + """Retrieve data as a dictionary.""" + async with self.bot.http_session.get(url) as r: + return await r.json() + + @commands.command(name='github', aliases=['gh']) + @commands.cooldown(1, 60, BucketType.user) + async def get_github_info(self, ctx: commands.Context, username: Optional[str]) -> None: + """ + Fetches a user's GitHub information. + + Username is optional and sends the help command if not specified. + """ + if username is None: + await ctx.invoke(self.bot.get_command('help'), 'github') + ctx.command.reset_cooldown(ctx) + return + + async with ctx.typing(): + user_data = await self.fetch_data(f"https://api.github.com/users/{username}") + + # User_data will not have a message key if the user exists + if user_data.get('message') is not None: + await ctx.send(embed=discord.Embed(title=random.choice(NEGATIVE_REPLIES), + description=f"The profile for `{username}` was not found.", + colour=discord.Colour.red())) + return + + org_data = await self.fetch_data(user_data['organizations_url']) + orgs = [f"[{org['login']}](https://github.com/{org['login']})" for org in org_data] + orgs_to_add = ' | '.join(orgs) + + gists = user_data['public_gists'] + + # Forming blog link + if user_data['blog'].startswith("http"): # Blog link is complete + blog = user_data['blog'] + elif user_data['blog']: # Blog exists but the link is not complete + blog = f"https://{user_data['blog']}" + else: + blog = "No website link available" + + embed = discord.Embed( + title=f"`{user_data['login']}`'s GitHub profile info", + description=f"```{user_data['bio']}```\n" if user_data['bio'] is not None else "", + colour=0x7289da, + url=user_data['html_url'], + timestamp=datetime.strptime(user_data['created_at'], "%Y-%m-%dT%H:%M:%SZ") + ) + embed.set_thumbnail(url=user_data['avatar_url']) + embed.set_footer(text="Account created at") + + if user_data['type'] == "User": + + embed.add_field(name="Followers", + value=f"[{user_data['followers']}]({user_data['html_url']}?tab=followers)") + embed.add_field(name="\u200b", value="\u200b") + embed.add_field(name="Following", + value=f"[{user_data['following']}]({user_data['html_url']}?tab=following)") + + embed.add_field(name="Public repos", + value=f"[{user_data['public_repos']}]({user_data['html_url']}?tab=repositories)") + embed.add_field(name="\u200b", value="\u200b") + + if user_data['type'] == "User": + embed.add_field(name="Gists", value=f"[{gists}](https://gist.github.com/{username})") + + embed.add_field(name=f"Organization{'s' if len(orgs)!=1 else ''}", + value=orgs_to_add if orgs else "No organizations") + embed.add_field(name="\u200b", value="\u200b") + embed.add_field(name="Website", value=blog) + + await ctx.send(embed=embed) + + +def setup(bot: commands.Bot) -> None: + """Adding the cog to the bot.""" + bot.add_cog(GithubInfo(bot)) diff --git a/bot/exts/evergreen/help.py b/bot/exts/evergreen/help.py index ccd76d76..91147243 100644 --- a/bot/exts/evergreen/help.py +++ b/bot/exts/evergreen/help.py @@ -12,7 +12,7 @@ from discord.ext.commands import CheckFailure, Cog as DiscordCog, Command, Conte from fuzzywuzzy import fuzz, process from bot import constants -from bot.bot import SeasonalBot +from bot.bot import Bot from bot.constants import Emojis from bot.utils.pagination import ( FIRST_EMOJI, LAST_EMOJI, @@ -511,7 +511,7 @@ class Help(DiscordCog): await ctx.send(embed=embed) -def unload(bot: SeasonalBot) -> None: +def unload(bot: Bot) -> None: """ Reinstates the original help command. @@ -521,7 +521,7 @@ def unload(bot: SeasonalBot) -> None: bot.add_command(bot._old_help) -def setup(bot: SeasonalBot) -> None: +def setup(bot: Bot) -> None: """ The setup for the help extension. @@ -542,7 +542,7 @@ def setup(bot: SeasonalBot) -> None: raise -def teardown(bot: SeasonalBot) -> None: +def teardown(bot: Bot) -> None: """ The teardown for the help extension. diff --git a/bot/exts/evergreen/issues.py b/bot/exts/evergreen/issues.py index 0f83731b..e419a6f5 100644 --- a/bot/exts/evergreen/issues.py +++ b/bot/exts/evergreen/issues.py @@ -1,9 +1,10 @@ import logging +import random import discord from discord.ext import commands -from bot.constants import Channels, Colours, Emojis, WHITELISTED_CHANNELS +from bot.constants import Channels, Colours, ERROR_REPLIES, Emojis, Tokens, WHITELISTED_CHANNELS from bot.utils.decorators import override_in_channel log = logging.getLogger(__name__) @@ -13,6 +14,12 @@ BAD_RESPONSE = { 403: "Rate limit has been hit! Please try again later!" } +MAX_REQUESTS = 10 + +REQUEST_HEADERS = dict() +if GITHUB_TOKEN := Tokens.github: + REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}" + class Issues(commands.Cog): """Cog that allows users to retrieve issues from GitHub.""" @@ -21,53 +28,78 @@ class Issues(commands.Cog): self.bot = bot @commands.command(aliases=("pr",)) - @override_in_channel(WHITELISTED_CHANNELS + (Channels.dev_contrib,)) + @override_in_channel(WHITELISTED_CHANNELS + (Channels.dev_contrib, Channels.dev_branding)) async def issue( - self, ctx: commands.Context, number: int, repository: str = "seasonalbot", user: str = "python-discord" + self, + ctx: commands.Context, + numbers: commands.Greedy[int], + repository: str = "sir-lancebot", + user: str = "python-discord" ) -> None: - """Command to retrieve issues from a GitHub repository.""" - url = f"https://api.github.com/repos/{user}/{repository}/issues/{number}" - merge_url = f"https://api.github.com/repos/{user}/{repository}/pulls/{number}/merge" - - log.trace(f"Querying GH issues API: {url}") - async with self.bot.http_session.get(url) as r: - json_data = await r.json() - - if r.status in BAD_RESPONSE: - log.warning(f"Received response {r.status} from: {url}") - return await ctx.send(f"[{str(r.status)}] {BAD_RESPONSE.get(r.status)}") - - # The initial API request is made to the issues API endpoint, which will return information - # if the issue or PR is present. However, the scope of information returned for PRs differs - # from issues: if the 'issues' key is present in the response then we can pull the data we - # need from the initial API call. - if "issues" in json_data.get("html_url"): - if json_data.get("state") == "open": - icon_url = Emojis.issue - else: - icon_url = Emojis.issue_closed - - # If the 'issues' key is not contained in the API response and there is no error code, then - # we know that a PR has been requested and a call to the pulls API endpoint is necessary - # to get the desired information for the PR. - else: - log.trace(f"PR provided, querying GH pulls API for additional information: {merge_url}") - async with self.bot.http_session.get(merge_url) as m: + """Command to retrieve issue(s) from a GitHub repository.""" + links = [] + numbers = set(numbers) # Convert from list to set to remove duplicates, if any + + if not numbers: + await ctx.invoke(self.bot.get_command('help'), 'issue') + return + + if len(numbers) > MAX_REQUESTS: + embed = discord.Embed( + title=random.choice(ERROR_REPLIES), + color=Colours.soft_red, + description=f"Too many issues/PRs! (maximum of {MAX_REQUESTS})" + ) + await ctx.send(embed=embed) + return + + for number in numbers: + url = f"https://api.github.com/repos/{user}/{repository}/issues/{number}" + merge_url = f"https://api.github.com/repos/{user}/{repository}/pulls/{number}/merge" + + log.trace(f"Querying GH issues API: {url}") + async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r: + json_data = await r.json() + + if r.status in BAD_RESPONSE: + log.warning(f"Received response {r.status} from: {url}") + return await ctx.send(f"[{str(r.status)}] #{number} {BAD_RESPONSE.get(r.status)}") + + # The initial API request is made to the issues API endpoint, which will return information + # if the issue or PR is present. However, the scope of information returned for PRs differs + # from issues: if the 'issues' key is present in the response then we can pull the data we + # need from the initial API call. + if "issues" in json_data.get("html_url"): if json_data.get("state") == "open": - icon_url = Emojis.pull_request - # When the status is 204 this means that the state of the PR is merged - elif m.status == 204: - icon_url = Emojis.merge + icon_url = Emojis.issue else: - icon_url = Emojis.pull_request_closed + icon_url = Emojis.issue_closed + + # If the 'issues' key is not contained in the API response and there is no error code, then + # we know that a PR has been requested and a call to the pulls API endpoint is necessary + # to get the desired information for the PR. + else: + log.trace(f"PR provided, querying GH pulls API for additional information: {merge_url}") + async with self.bot.http_session.get(merge_url) as m: + if json_data.get("state") == "open": + icon_url = Emojis.pull_request + # When the status is 204 this means that the state of the PR is merged + elif m.status == 204: + icon_url = Emojis.merge + else: + icon_url = Emojis.pull_request_closed + + issue_url = json_data.get("html_url") + links.append([icon_url, f"[{repository}] #{number} {json_data.get('title')}", issue_url]) - issue_url = json_data.get("html_url") - description_text = f"[{repository}] #{number} {json_data.get('title')}" + # Issue/PR format: emoji to show if open/closed/merged, number and the title as a singular link. + description_list = ["{0} [{1}]({2})".format(*link) for link in links] resp = discord.Embed( colour=Colours.bright_green, - description=f"{icon_url} [{description_text}]({issue_url})" + description='\n'.join(description_list) ) - resp.set_author(name="GitHub", url=issue_url) + + resp.set_author(name="GitHub", url=f"https://github.com/{user}/{repository}") await ctx.send(embed=resp) diff --git a/bot/exts/evergreen/magic_8ball.py b/bot/exts/evergreen/magic_8ball.py index c10f1f51..f974e487 100644 --- a/bot/exts/evergreen/magic_8ball.py +++ b/bot/exts/evergreen/magic_8ball.py @@ -13,7 +13,7 @@ class Magic8ball(commands.Cog): def __init__(self, bot: commands.Bot): self.bot = bot - with open(Path("bot/resources/evergreen/magic8ball.json"), "r") as file: + with open(Path("bot/resources/evergreen/magic8ball.json"), "r", encoding="utf8") as file: self.answers = json.load(file) @commands.command(name="8ball") diff --git a/bot/exts/evergreen/minesweeper.py b/bot/exts/evergreen/minesweeper.py index ae057b30..286ac7a5 100644 --- a/bot/exts/evergreen/minesweeper.py +++ b/bot/exts/evergreen/minesweeper.py @@ -120,14 +120,14 @@ class Minesweeper(commands.Cog): def format_for_discord(board: GameBoard) -> str: """Format the board as a string for Discord.""" discord_msg = ( - ":stop_button: :regional_indicator_a::regional_indicator_b::regional_indicator_c:" - ":regional_indicator_d::regional_indicator_e::regional_indicator_f::regional_indicator_g:" - ":regional_indicator_h::regional_indicator_i::regional_indicator_j:\n\n" + ":stop_button: :regional_indicator_a: :regional_indicator_b: :regional_indicator_c: " + ":regional_indicator_d: :regional_indicator_e: :regional_indicator_f: :regional_indicator_g: " + ":regional_indicator_h: :regional_indicator_i: :regional_indicator_j:\n\n" ) rows = [] for row_number, row in enumerate(board): new_row = f"{MESSAGE_MAPPING[row_number + 1]} " - new_row += "".join(MESSAGE_MAPPING[cell] for cell in row) + new_row += " ".join(MESSAGE_MAPPING[cell] for cell in row) rows.append(new_row) discord_msg += "\n".join(rows) @@ -141,22 +141,27 @@ class Minesweeper(commands.Cog): await ctx.message.delete(delay=2) return + try: + await ctx.author.send( + f"Play by typing: `{Client.prefix}ms reveal xy [xy]` or `{Client.prefix}ms flag xy [xy]` \n" + f"Close the game with `{Client.prefix}ms end`\n" + ) + except discord.errors.Forbidden: + log.debug(f"{ctx.author.name} ({ctx.author.id}) has disabled DMs from server members") + await ctx.send(f":x: {ctx.author.mention}, please enable DMs to play minesweeper.") + return + # Add game to list board: GameBoard = self.generate_board(bomb_chance) revealed_board: GameBoard = [["hidden"] * 10 for _ in range(10)] + dm_msg = await ctx.author.send(f"Here's your board!\n{self.format_for_discord(revealed_board)}") if ctx.guild: await ctx.send(f"{ctx.author.mention} is playing Minesweeper") - chat_msg = await ctx.send(f"Here's there board!\n{self.format_for_discord(revealed_board)}") + chat_msg = await ctx.send(f"Here's their board!\n{self.format_for_discord(revealed_board)}") else: chat_msg = None - await ctx.author.send( - f"Play by typing: `{Client.prefix}ms reveal xy [xy]` or `{Client.prefix}ms flag xy [xy]` \n" - f"Close the game with `{Client.prefix}ms end`\n" - ) - dm_msg = await ctx.author.send(f"Here's your board!\n{self.format_for_discord(revealed_board)}") - self.games[ctx.author.id] = Game( board=board, revealed=revealed_board, @@ -171,7 +176,7 @@ class Minesweeper(commands.Cog): await game.dm_msg.delete() game.dm_msg = await ctx.author.send(f"Here's your board!\n{self.format_for_discord(game.revealed)}") if game.activated_on_server: - await game.chat_msg.edit(content=f"Here's there board!\n{self.format_for_discord(game.revealed)}") + await game.chat_msg.edit(content=f"Here's their board!\n{self.format_for_discord(game.revealed)}") @commands.dm_only() @minesweeper_group.command(name="flag") diff --git a/bot/exts/evergreen/movie.py b/bot/exts/evergreen/movie.py index 93aeef30..340a5724 100644 --- a/bot/exts/evergreen/movie.py +++ b/bot/exts/evergreen/movie.py @@ -190,7 +190,10 @@ class Movie(Cog): async def get_embed(self, name: str) -> Embed: """Return embed of random movies. Uses name in title.""" - return Embed(title=f'Random {name} Movies').set_footer(text='Powered by TMDB (themoviedb.org)') + embed = Embed(title=f"Random {name} Movies") + embed.set_footer(text="This product uses the TMDb API but is not endorsed or certified by TMDb.") + embed.set_thumbnail(url="https://i.imgur.com/LtFtC8H.png") + return embed def setup(bot: Bot) -> None: diff --git a/bot/exts/evergreen/recommend_game.py b/bot/exts/evergreen/recommend_game.py index 7cd52c2c..5e262a5b 100644 --- a/bot/exts/evergreen/recommend_game.py +++ b/bot/exts/evergreen/recommend_game.py @@ -11,7 +11,7 @@ game_recs = [] # Populate the list `game_recs` with resource files for rec_path in Path("bot/resources/evergreen/game_recs").glob("*.json"): - with rec_path.open(encoding='utf-8') as file: + with rec_path.open(encoding='utf8') as file: data = json.load(file) game_recs.append(data) shuffle(game_recs) diff --git a/bot/exts/evergreen/reddit.py b/bot/exts/evergreen/reddit.py index fe204419..49127bea 100644 --- a/bot/exts/evergreen/reddit.py +++ b/bot/exts/evergreen/reddit.py @@ -68,9 +68,9 @@ class Reddit(commands.Cog): # ----------------------------------------------------------- # This code below is bound of change when the emojis are added. - upvote_emoji = self.bot.get_emoji(638729835245731840) - comment_emoji = self.bot.get_emoji(638729835073765387) - user_emoji = self.bot.get_emoji(638729835442602003) + upvote_emoji = self.bot.get_emoji(755845219890757644) + comment_emoji = self.bot.get_emoji(755845255001014384) + user_emoji = self.bot.get_emoji(755845303822974997) text_emoji = self.bot.get_emoji(676030265910493204) video_emoji = self.bot.get_emoji(676030265839190047) image_emoji = self.bot.get_emoji(676030265734201344) diff --git a/bot/exts/evergreen/showprojects.py b/bot/exts/evergreen/showprojects.py deleted file mode 100644 index 328a7aa5..00000000 --- a/bot/exts/evergreen/showprojects.py +++ /dev/null @@ -1,33 +0,0 @@ -import logging - -from discord import Message -from discord.ext import commands - -from bot.constants import Channels - -log = logging.getLogger(__name__) - - -class ShowProjects(commands.Cog): - """Cog that reacts to posts in the #show-your-projects.""" - - def __init__(self, bot: commands.Bot): - self.bot = bot - self.lastPoster = 0 # Given 0 as the default last poster ID as no user can actually have 0 assigned to them - - @commands.Cog.listener() - async def on_message(self, message: Message) -> None: - """Adds reactions to posts in #show-your-projects.""" - reactions = ["\U0001f44d", "\U00002764", "\U0001f440", "\U0001f389", "\U0001f680", "\U00002b50", "\U0001f6a9"] - if (message.channel.id == Channels.show_your_projects - and message.author.bot is False - and message.author.id != self.lastPoster): - for reaction in reactions: - await message.add_reaction(reaction) - - self.lastPoster = message.author.id - - -def setup(bot: commands.Bot) -> None: - """Show Projects Reaction Cog.""" - bot.add_cog(ShowProjects(bot)) diff --git a/bot/exts/evergreen/snakes/__init__.py b/bot/exts/evergreen/snakes/__init__.py index 2eae2751..bc42f0c2 100644 --- a/bot/exts/evergreen/snakes/__init__.py +++ b/bot/exts/evergreen/snakes/__init__.py @@ -2,7 +2,7 @@ import logging from discord.ext import commands -from bot.exts.evergreen.snakes.snakes_cog import Snakes +from bot.exts.evergreen.snakes._snakes_cog import Snakes log = logging.getLogger(__name__) diff --git a/bot/exts/evergreen/snakes/converter.py b/bot/exts/evergreen/snakes/_converter.py index d4e93b56..eee248cf 100644 --- a/bot/exts/evergreen/snakes/converter.py +++ b/bot/exts/evergreen/snakes/_converter.py @@ -7,7 +7,7 @@ import discord from discord.ext.commands import Context, Converter from fuzzywuzzy import fuzz -from bot.exts.evergreen.snakes.utils import SNAKE_RESOURCES +from bot.exts.evergreen.snakes._utils import SNAKE_RESOURCES from bot.utils import disambiguate log = logging.getLogger(__name__) @@ -63,12 +63,12 @@ class Snake(Converter): """Build list of snakes from the static snake resources.""" # Get all the snakes if cls.snakes is None: - with (SNAKE_RESOURCES / "snake_names.json").open() as snakefile: + with (SNAKE_RESOURCES / "snake_names.json").open(encoding="utf8") as snakefile: cls.snakes = json.load(snakefile) # Get the special cases if cls.special_cases is None: - with (SNAKE_RESOURCES / "special_snakes.json").open() as snakefile: + with (SNAKE_RESOURCES / "special_snakes.json").open(encoding="utf8") as snakefile: special_cases = json.load(snakefile) cls.special_cases = {snake['name'].lower(): snake for snake in special_cases} diff --git a/bot/exts/evergreen/snakes/snakes_cog.py b/bot/exts/evergreen/snakes/_snakes_cog.py index b3896fcd..70bb0e73 100644 --- a/bot/exts/evergreen/snakes/snakes_cog.py +++ b/bot/exts/evergreen/snakes/_snakes_cog.py @@ -18,8 +18,8 @@ from discord import Colour, Embed, File, Member, Message, Reaction from discord.ext.commands import BadArgument, Bot, Cog, CommandError, Context, bot_has_permissions, group from bot.constants import ERROR_REPLIES, Tokens -from bot.exts.evergreen.snakes import utils -from bot.exts.evergreen.snakes.converter import Snake +from bot.exts.evergreen.snakes import _utils as utils +from bot.exts.evergreen.snakes._converter import Snake from bot.utils.decorators import locked log = logging.getLogger(__name__) @@ -567,7 +567,7 @@ class Snakes(Cog): antidote_embed = Embed(color=SNAKE_COLOR, title="Antidote") antidote_embed.set_author(name=ctx.author.name, icon_url=ctx.author.avatar_url) antidote_embed.set_image(url="https://i.makeagif.com/media/7-12-2015/Cj1pts.gif") - antidote_embed.add_field(name=f"You have created the snake antidote!", + antidote_embed.add_field(name="You have created the snake antidote!", value=f"The solution was: {' '.join(antidote_answer)}\n" f"You had {10 - antidote_tries} tries remaining.") await board_id.edit(embed=antidote_embed) @@ -1078,18 +1078,18 @@ class Snakes(Cog): query = snake['name'] # Build the URL and make the request - url = f'https://www.googleapis.com/youtube/v3/search' + url = 'https://www.googleapis.com/youtube/v3/search' response = await self.bot.http_session.get( url, params={ "part": "snippet", - "q": urllib.parse.quote(query), + "q": urllib.parse.quote_plus(query), "type": "video", "key": Tokens.youtube } ) response = await response.json() - data = response['items'] + data = response.get("items", []) # Send the user a video if len(data) > 0: diff --git a/bot/exts/evergreen/snakes/utils.py b/bot/exts/evergreen/snakes/_utils.py index 7d6caf04..7d6caf04 100644 --- a/bot/exts/evergreen/snakes/utils.py +++ b/bot/exts/evergreen/snakes/_utils.py diff --git a/bot/exts/evergreen/source.py b/bot/exts/evergreen/source.py new file mode 100644 index 00000000..cdfe54ec --- /dev/null +++ b/bot/exts/evergreen/source.py @@ -0,0 +1,109 @@ +import inspect +from pathlib import Path +from typing import Optional, Tuple, Union + +from discord import Embed +from discord.ext import commands + +from bot.constants import Source + +SourceType = Union[commands.Command, commands.Cog, str, commands.ExtensionNotLoaded] + + +class SourceConverter(commands.Converter): + """Convert an argument into a help command, tag, command, or cog.""" + + async def convert(self, ctx: commands.Context, argument: str) -> SourceType: + """Convert argument into source object.""" + cog = ctx.bot.get_cog(argument) + if cog: + return cog + + cmd = ctx.bot.get_command(argument) + if cmd: + return cmd + + raise commands.BadArgument( + f"Unable to convert `{argument}` to valid command or Cog." + ) + + +class BotSource(commands.Cog): + """Displays information about the bot's source code.""" + + def __init__(self, bot: commands.Bot): + self.bot = bot + + @commands.command(name="source", aliases=("src",)) + async def source_command(self, ctx: commands.Context, *, source_item: SourceConverter = None) -> None: + """Display information and a GitHub link to the source code of a command, tag, or cog.""" + if not source_item: + embed = Embed(title="Sir Lancebot's GitHub Repository") + embed.add_field(name="Repository", value=f"[Go to GitHub]({Source.github})") + embed.set_thumbnail(url=Source.github_avatar_url) + await ctx.send(embed=embed) + return + + embed = await self.build_embed(source_item) + await ctx.send(embed=embed) + + def get_source_link(self, source_item: SourceType) -> Tuple[str, str, Optional[int]]: + """ + Build GitHub link of source item, return this link, file location and first line number. + + Raise BadArgument if `source_item` is a dynamically-created object (e.g. via internal eval). + """ + if isinstance(source_item, commands.Command): + src = source_item.callback.__code__ + filename = src.co_filename + else: + src = type(source_item) + try: + filename = inspect.getsourcefile(src) + except TypeError: + raise commands.BadArgument("Cannot get source for a dynamically-created object.") + + if not isinstance(source_item, str): + try: + lines, first_line_no = inspect.getsourcelines(src) + except OSError: + raise commands.BadArgument("Cannot get source for a dynamically-created object.") + + lines_extension = f"#L{first_line_no}-L{first_line_no+len(lines)-1}" + else: + first_line_no = None + lines_extension = "" + + file_location = Path(filename).relative_to(Path.cwd()).as_posix() + + url = f"{Source.github}/blob/master/{file_location}{lines_extension}" + + return url, file_location, first_line_no or None + + async def build_embed(self, source_object: SourceType) -> Optional[Embed]: + """Build embed based on source object.""" + url, location, first_line = self.get_source_link(source_object) + + if isinstance(source_object, commands.Command): + if source_object.cog_name == 'Help': + title = "Help Command" + description = source_object.__doc__.splitlines()[1] + else: + description = source_object.short_doc + title = f"Command: {source_object.qualified_name}" + else: + title = f"Cog: {source_object.qualified_name}" + description = source_object.description.splitlines()[0] + + embed = Embed(title=title, description=description) + embed.set_thumbnail(url=Source.github_avatar_url) + embed.add_field(name="Source Code", value=f"[Go to GitHub]({url})") + line_text = f":{first_line}" if first_line else "" + embed.set_footer(text=f"{location}{line_text}") + + return embed + + +def setup(bot: commands.Bot) -> None: + """Load the BotSource cog.""" + bot.add_cog(BotSource(bot)) diff --git a/bot/exts/evergreen/space.py b/bot/exts/evergreen/space.py index 3587fc00..bc8e3118 100644 --- a/bot/exts/evergreen/space.py +++ b/bot/exts/evergreen/space.py @@ -8,7 +8,7 @@ from discord import Embed from discord.ext import tasks from discord.ext.commands import BadArgument, Cog, Context, Converter, group -from bot.bot import SeasonalBot +from bot.bot import Bot from bot.constants import Tokens logger = logging.getLogger(__name__) @@ -37,7 +37,7 @@ class DateConverter(Converter): class Space(Cog): """Space Cog contains commands, that show images, facts or other information about space.""" - def __init__(self, bot: SeasonalBot): + def __init__(self, bot: Bot): self.bot = bot self.http_session = bot.http_session @@ -240,7 +240,7 @@ class Space(Cog): ).set_image(url=image).set_footer(text="Powered by NASA API" + footer) -def setup(bot: SeasonalBot) -> None: +def setup(bot: Bot) -> None: """Load Space Cog.""" if not Tokens.nasa: logger.warning("Can't find NASA API key. Not loading Space Cog.") diff --git a/bot/exts/evergreen/speedrun.py b/bot/exts/evergreen/speedrun.py index 4e8d7aee..21aad5aa 100644 --- a/bot/exts/evergreen/speedrun.py +++ b/bot/exts/evergreen/speedrun.py @@ -6,7 +6,7 @@ from random import choice from discord.ext import commands log = logging.getLogger(__name__) -with Path('bot/resources/evergreen/speedrun_links.json').open(encoding="utf-8") as file: +with Path('bot/resources/evergreen/speedrun_links.json').open(encoding="utf8") as file: LINKS = json.load(file) diff --git a/bot/exts/evergreen/status_cats.py b/bot/exts/evergreen/status_cats.py new file mode 100644 index 00000000..586b8378 --- /dev/null +++ b/bot/exts/evergreen/status_cats.py @@ -0,0 +1,33 @@ +from http import HTTPStatus + +import discord +from discord.ext import commands + + +class StatusCats(commands.Cog): + """Commands that give HTTP statuses described and visualized by cats.""" + + def __init__(self, bot: commands.Bot): + self.bot = bot + + @commands.command(aliases=['statuscat']) + async def http_cat(self, ctx: commands.Context, code: int) -> None: + """Sends an embed with an image of a cat, potraying the status code.""" + embed = discord.Embed(title=f'**Status: {code}**') + + try: + HTTPStatus(code) + + except ValueError: + embed.set_footer(text='Inputted status code does not exist.') + + else: + embed.set_image(url=f'https://http.cat/{code}.jpg') + + finally: + await ctx.send(embed=embed) + + +def setup(bot: commands.Bot) -> None: + """Load the StatusCats cog.""" + bot.add_cog(StatusCats(bot)) diff --git a/bot/exts/evergreen/trivia_quiz.py b/bot/exts/evergreen/trivia_quiz.py index c1a271e8..fe692c2a 100644 --- a/bot/exts/evergreen/trivia_quiz.py +++ b/bot/exts/evergreen/trivia_quiz.py @@ -40,7 +40,7 @@ class TriviaQuiz(commands.Cog): def load_questions() -> dict: """Load the questions from the JSON file.""" p = Path("bot", "resources", "evergreen", "trivia_quiz.json") - with p.open() as json_data: + with p.open(encoding="utf8") as json_data: questions = json.load(json_data) return questions @@ -121,8 +121,10 @@ class TriviaQuiz(commands.Cog): # A function to check whether user input is the correct answer(close to the right answer) def check(m: discord.Message) -> bool: - ratio = fuzz.ratio(answer.lower(), m.content.lower()) - return ratio > 85 and m.channel == ctx.channel + return ( + m.channel == ctx.channel + and fuzz.ratio(answer.lower(), m.content.lower()) > 85 + ) try: msg = await self.bot.wait_for('message', check=check, timeout=10) diff --git a/bot/exts/evergreen/wikipedia.py b/bot/exts/evergreen/wikipedia.py new file mode 100644 index 00000000..be36e2c4 --- /dev/null +++ b/bot/exts/evergreen/wikipedia.py @@ -0,0 +1,114 @@ +import asyncio +import datetime +import logging +from typing import List, Optional + +from aiohttp import client_exceptions +from discord import Color, Embed, Message +from discord.ext import commands + +from bot.constants import Wikipedia + +log = logging.getLogger(__name__) + +SEARCH_API = "https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={search_term}&format=json" +WIKIPEDIA_URL = "https://en.wikipedia.org/wiki/{title}" + + +class WikipediaSearch(commands.Cog): + """Get info from wikipedia.""" + + def __init__(self, bot: commands.Bot): + self.bot = bot + self.http_session = bot.http_session + + @staticmethod + def formatted_wiki_url(index: int, title: str) -> str: + """Formating wikipedia link with index and title.""" + return f'`{index}` [{title}]({WIKIPEDIA_URL.format(title=title.replace(" ", "_"))})' + + async def search_wikipedia(self, search_term: str) -> Optional[List[str]]: + """Search wikipedia and return the first 10 pages found.""" + pages = [] + async with self.http_session.get(SEARCH_API.format(search_term=search_term)) as response: + try: + data = await response.json() + + search_results = data["query"]["search"] + + # Ignore pages with "may refer to" + for search_result in search_results: + log.info("trying to append titles") + if "may refer to" not in search_result["snippet"]: + pages.append(search_result["title"]) + except client_exceptions.ContentTypeError: + pages = None + + log.info("Finished appending titles") + return pages + + @commands.cooldown(1, 10, commands.BucketType.user) + @commands.command(name="wikipedia", aliases=["wiki"]) + async def wikipedia_search_command(self, ctx: commands.Context, *, search: str) -> None: + """Return list of results containing your search query from wikipedia.""" + titles = await self.search_wikipedia(search) + + def check(message: Message) -> bool: + return message.author.id == ctx.author.id and message.channel == ctx.channel + + if not titles: + await ctx.send("Sorry, we could not find a wikipedia article using that search term") + return + + async with ctx.typing(): + log.info("Finished appending titles to titles_no_underscore list") + + s_desc = "\n".join(self.formatted_wiki_url(index, title) for index, title in enumerate(titles, start=1)) + embed = Embed(colour=Color.blue(), title=f"Wikipedia results for `{search}`", description=s_desc) + embed.timestamp = datetime.datetime.utcnow() + await ctx.send(embed=embed) + embed = Embed(colour=Color.green(), description="Enter number to choose") + msg = await ctx.send(embed=embed) + titles_len = len(titles) # getting length of list + + for retry_count in range(1, Wikipedia.total_chance + 1): + retries_left = Wikipedia.total_chance - retry_count + if retry_count < Wikipedia.total_chance: + error_msg = f"You have `{retries_left}/{Wikipedia.total_chance}` chances left" + else: + error_msg = 'Please try again by using `.wiki` command' + try: + message = await ctx.bot.wait_for('message', timeout=60.0, check=check) + response_from_user = await self.bot.get_context(message) + + if response_from_user.command: + return + + response = int(message.content) + if response < 0: + await ctx.send(f"Sorry, but you can't give negative index, {error_msg}") + elif response == 0: + await ctx.send(f"Sorry, please give an integer between `1` to `{titles_len}`, {error_msg}") + else: + await ctx.send(WIKIPEDIA_URL.format(title=titles[response - 1].replace(" ", "_"))) + break + + except asyncio.TimeoutError: + embed = Embed(colour=Color.red(), description=f"Time's up {ctx.author.mention}") + await msg.edit(embed=embed) + break + + except ValueError: + await ctx.send(f"Sorry, but you cannot do that, I will only accept an positive integer, {error_msg}") + + except IndexError: + await ctx.send(f"Sorry, please give an integer between `1` to `{titles_len}`, {error_msg}") + + except Exception as e: + log.info(f"Caught exception {e}, breaking out of retry loop") + break + + +def setup(bot: commands.Bot) -> None: + """Wikipedia Cog load.""" + bot.add_cog(WikipediaSearch(bot)) diff --git a/bot/exts/evergreen/wolfram.py b/bot/exts/evergreen/wolfram.py new file mode 100644 index 00000000..898e8d2a --- /dev/null +++ b/bot/exts/evergreen/wolfram.py @@ -0,0 +1,278 @@ +import logging +from io import BytesIO +from typing import Callable, List, Optional, Tuple +from urllib import parse + +import arrow +import discord +from discord import Embed +from discord.ext import commands +from discord.ext.commands import BucketType, Cog, Context, check, group + +from bot.constants import Colours, STAFF_ROLES, Wolfram +from bot.utils.pagination import ImagePaginator + +log = logging.getLogger(__name__) + +APPID = Wolfram.key +DEFAULT_OUTPUT_FORMAT = "JSON" +QUERY = "http://api.wolframalpha.com/v2/{request}?{data}" +WOLF_IMAGE = "https://www.symbols.com/gi.php?type=1&id=2886&i=1" + +MAX_PODS = 20 + +# Allows for 10 wolfram calls pr user pr day +usercd = commands.CooldownMapping.from_cooldown(Wolfram.user_limit_day, 60 * 60 * 24, BucketType.user) + +# Allows for max api requests / days in month per day for the entire guild (Temporary) +guildcd = commands.CooldownMapping.from_cooldown(Wolfram.guild_limit_day, 60 * 60 * 24, BucketType.guild) + + +async def send_embed( + ctx: Context, + message_txt: str, + colour: int = Colours.soft_red, + footer: str = None, + img_url: str = None, + f: discord.File = None +) -> None: + """Generate & send a response embed with Wolfram as the author.""" + embed = Embed(colour=colour) + embed.description = message_txt + embed.set_author(name="Wolfram Alpha", + icon_url=WOLF_IMAGE, + url="https://www.wolframalpha.com/") + if footer: + embed.set_footer(text=footer) + + if img_url: + embed.set_image(url=img_url) + + await ctx.send(embed=embed, file=f) + + +def custom_cooldown(*ignore: List[int]) -> Callable: + """ + Implement per-user and per-guild cooldowns for requests to the Wolfram API. + + A list of roles may be provided to ignore the per-user cooldown + """ + async def predicate(ctx: Context) -> bool: + if ctx.invoked_with == 'help': + # if the invoked command is help we don't want to increase the ratelimits since it's not actually + # invoking the command/making a request, so instead just check if the user/guild are on cooldown. + guild_cooldown = not guildcd.get_bucket(ctx.message).get_tokens() == 0 # if guild is on cooldown + if not any(r.id in ignore for r in ctx.author.roles): # check user bucket if user is not ignored + return guild_cooldown and not usercd.get_bucket(ctx.message).get_tokens() == 0 + return guild_cooldown + + user_bucket = usercd.get_bucket(ctx.message) + + if all(role.id not in ignore for role in ctx.author.roles): + user_rate = user_bucket.update_rate_limit() + + if user_rate: + # Can't use api; cause: member limit + cooldown = arrow.utcnow().shift(seconds=int(user_rate)).humanize(only_distance=True) + message = ( + "You've used up your limit for Wolfram|Alpha requests.\n" + f"Cooldown: {cooldown}" + ) + await send_embed(ctx, message) + return False + + guild_bucket = guildcd.get_bucket(ctx.message) + guild_rate = guild_bucket.update_rate_limit() + + # Repr has a token attribute to read requests left + log.debug(guild_bucket) + + if guild_rate: + # Can't use api; cause: guild limit + message = ( + "The max limit of requests for the server has been reached for today.\n" + f"Cooldown: {int(guild_rate)}" + ) + await send_embed(ctx, message) + return False + + return True + + return check(predicate) + + +async def get_pod_pages(ctx: Context, bot: commands.Bot, query: str) -> Optional[List[Tuple]]: + """Get the Wolfram API pod pages for the provided query.""" + async with ctx.channel.typing(): + url_str = parse.urlencode({ + "input": query, + "appid": APPID, + "output": DEFAULT_OUTPUT_FORMAT, + "format": "image,plaintext" + }) + request_url = QUERY.format(request="query", data=url_str) + + async with bot.http_session.get(request_url) as response: + json = await response.json(content_type='text/plain') + + result = json["queryresult"] + + if result["error"]: + # API key not set up correctly + if result["error"]["msg"] == "Invalid appid": + message = "Wolfram API key is invalid or missing." + log.warning( + "API key seems to be missing, or invalid when " + f"processing a wolfram request: {url_str}, Response: {json}" + ) + await send_embed(ctx, message) + return + + message = "Something went wrong internally with your request, please notify staff!" + log.warning(f"Something went wrong getting a response from wolfram: {url_str}, Response: {json}") + await send_embed(ctx, message) + return + + if not result["success"]: + message = f"I couldn't find anything for {query}." + await send_embed(ctx, message) + return + + if not result["numpods"]: + message = "Could not find any results." + await send_embed(ctx, message) + return + + pods = result["pods"] + pages = [] + for pod in pods[:MAX_PODS]: + subs = pod.get("subpods") + + for sub in subs: + title = sub.get("title") or sub.get("plaintext") or sub.get("id", "") + img = sub["img"]["src"] + pages.append((title, img)) + return pages + + +class Wolfram(Cog): + """Commands for interacting with the Wolfram|Alpha API.""" + + def __init__(self, bot: commands.Bot): + self.bot = bot + + @group(name="wolfram", aliases=("wolf", "wa"), invoke_without_command=True) + @custom_cooldown(*STAFF_ROLES) + async def wolfram_command(self, ctx: Context, *, query: str) -> None: + """Requests all answers on a single image, sends an image of all related pods.""" + url_str = parse.urlencode({ + "i": query, + "appid": APPID, + }) + query = QUERY.format(request="simple", data=url_str) + + # Give feedback that the bot is working. + async with ctx.channel.typing(): + async with self.bot.http_session.get(query) as response: + status = response.status + image_bytes = await response.read() + + f = discord.File(BytesIO(image_bytes), filename="image.png") + image_url = "attachment://image.png" + + if status == 501: + message = "Failed to get response" + footer = "" + color = Colours.soft_red + elif status == 400: + message = "No input found" + footer = "" + color = Colours.soft_red + elif status == 403: + message = "Wolfram API key is invalid or missing." + footer = "" + color = Colours.soft_red + else: + message = "" + footer = "View original for a bigger picture." + color = Colours.soft_orange + + # Sends a "blank" embed if no request is received, unsure how to fix + await send_embed(ctx, message, color, footer=footer, img_url=image_url, f=f) + + @wolfram_command.command(name="page", aliases=("pa", "p")) + @custom_cooldown(*STAFF_ROLES) + async def wolfram_page_command(self, ctx: Context, *, query: str) -> None: + """ + Requests a drawn image of given query. + + Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc. + """ + pages = await get_pod_pages(ctx, self.bot, query) + + if not pages: + return + + embed = Embed() + embed.set_author(name="Wolfram Alpha", + icon_url=WOLF_IMAGE, + url="https://www.wolframalpha.com/") + embed.colour = Colours.soft_orange + + await ImagePaginator.paginate(pages, ctx, embed) + + @wolfram_command.command(name="cut", aliases=("c",)) + @custom_cooldown(*STAFF_ROLES) + async def wolfram_cut_command(self, ctx: Context, *, query: str) -> None: + """ + Requests a drawn image of given query. + + Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc. + """ + pages = await get_pod_pages(ctx, self.bot, query) + + if not pages: + return + + if len(pages) >= 2: + page = pages[1] + else: + page = pages[0] + + await send_embed(ctx, page[0], colour=Colours.soft_orange, img_url=page[1]) + + @wolfram_command.command(name="short", aliases=("sh", "s")) + @custom_cooldown(*STAFF_ROLES) + async def wolfram_short_command(self, ctx: Context, *, query: str) -> None: + """Requests an answer to a simple question.""" + url_str = parse.urlencode({ + "i": query, + "appid": APPID, + }) + query = QUERY.format(request="result", data=url_str) + + # Give feedback that the bot is working. + async with ctx.channel.typing(): + async with self.bot.http_session.get(query) as response: + status = response.status + response_text = await response.text() + + if status == 501: + message = "Failed to get response" + color = Colours.soft_red + elif status == 400: + message = "No input found" + color = Colours.soft_red + elif response_text == "Error 1: Invalid appid": + message = "Wolfram API key is invalid or missing." + color = Colours.soft_red + else: + message = response_text + color = Colours.soft_orange + + await send_embed(ctx, message, color) + + +def setup(bot: commands.Bot) -> None: + """Load the Wolfram cog.""" + bot.add_cog(Wolfram(bot)) diff --git a/bot/exts/evergreen/wonder_twins.py b/bot/exts/evergreen/wonder_twins.py new file mode 100644 index 00000000..afc5346e --- /dev/null +++ b/bot/exts/evergreen/wonder_twins.py @@ -0,0 +1,49 @@ +import random +from pathlib import Path + +import yaml +from discord.ext.commands import Bot, Cog, Context, command + + +class WonderTwins(Cog): + """Cog for a Wonder Twins inspired command.""" + + def __init__(self, bot: Bot): + self.bot = bot + + with open(Path.cwd() / "bot" / "resources" / "evergreen" / "wonder_twins.yaml", "r", encoding="utf-8") as f: + info = yaml.load(f, Loader=yaml.FullLoader) + self.water_types = info["water_types"] + self.objects = info["objects"] + self.adjectives = info["adjectives"] + + @staticmethod + def append_onto(phrase: str, insert_word: str) -> str: + """Appends one word onto the end of another phrase in order to format with the proper determiner.""" + if insert_word.endswith("s"): + phrase = phrase.split() + del phrase[0] + phrase = " ".join(phrase) + + insert_word = insert_word.split()[-1] + return " ".join([phrase, insert_word]) + + def format_phrase(self) -> str: + """Creates a transformation phrase from available words.""" + adjective = random.choice((None, random.choice(self.adjectives))) + object_name = random.choice(self.objects) + water_type = random.choice(self.water_types) + + if adjective: + object_name = self.append_onto(adjective, object_name) + return f"{object_name} of {water_type}" + + @command(name="formof", aliases=["wondertwins", "wondertwin", "fo"]) + async def form_of(self, ctx: Context) -> None: + """Command to send a Wonder Twins inspired phrase to the user invoking the command.""" + await ctx.send(f"Form of {self.format_phrase()}!") + + +def setup(bot: Bot) -> None: + """Load the WonderTwins cog.""" + bot.add_cog(WonderTwins(bot)) |