From db1efddda4e042587bc9c91408bc5ecf590ed107 Mon Sep 17 00:00:00 2001 From: Leon Sandøy Date: Tue, 9 Oct 2018 20:21:48 +0200 Subject: Wolfram Cog - Merge Request 56, by Chibli --- bot/__main__.py | 1 + bot/cogs/wolfram.py | 289 ++++++++++++++++++++++++++++++++++++++++++++++++++++ bot/constants.py | 9 ++ bot/pagination.py | 185 +++++++++++++++++++++++++++++++-- config-default.yml | 7 ++ 5 files changed, 485 insertions(+), 6 deletions(-) create mode 100644 bot/cogs/wolfram.py diff --git a/bot/__main__.py b/bot/__main__.py index 602846ded..3059b3ed0 100644 --- a/bot/__main__.py +++ b/bot/__main__.py @@ -72,6 +72,7 @@ bot.load_extension("bot.cogs.snekbox") bot.load_extension("bot.cogs.tags") bot.load_extension("bot.cogs.token_remover") bot.load_extension("bot.cogs.utils") +bot.load_extension("bot.cogs.wolfram") if has_rmq: bot.load_extension("bot.cogs.rmq") diff --git a/bot/cogs/wolfram.py b/bot/cogs/wolfram.py new file mode 100644 index 000000000..aabd83f9f --- /dev/null +++ b/bot/cogs/wolfram.py @@ -0,0 +1,289 @@ +import logging +from io import BytesIO +from typing import List, Optional, Tuple +from urllib import parse + +import discord +from discord import Embed +from discord.ext import commands +from discord.ext.commands import BucketType, Context, check, group + +from bot.constants import Colours, Roles, Wolfram +from bot.pagination import ImagePaginator + +log = logging.getLogger(__name__) + +APPID = Wolfram.key +DEFAULT_OUTPUT_FORMAT = "JSON" +QUERY = "http://api.wolframalpha.com/v2/{request}?{data}" +WOLF_IMAGE = "https://www.symbols.com/gi.php?type=1&id=2886&i=1" + +COOLDOWN_IGNORERS = Roles.moderator, Roles.owner, Roles.admin, Roles.helpers +MAX_PODS = 20 + +# Allows for 10 wolfram calls pr user pr day +usercd = commands.CooldownMapping.from_cooldown(Wolfram.user_limit_day, 60*60*24, BucketType.user) + +# Allows for max api requests / days in month per day for the entire guild (Temporary) +guildcd = commands.CooldownMapping.from_cooldown(Wolfram.guild_limit_day, 60*60*24, BucketType.guild) + + +async def send_embed( + ctx: Context, + message_txt: str, + colour: int=Colours.soft_red, + footer: str=None, + img_url: str=None, + f: discord.File = None +) -> None: + """ + Generates an embed with wolfram as the author, with message_txt as description, + adds custom colour if specified, a footer and image (could be a file with f param) and sends + the embed through ctx + :param ctx: Context + :param message_txt: str - Message to be sent + :param colour: int - Default: Colours.soft_red - Colour of embed + :param footer: str - Default: None - Adds a footer to the embed + :param img_url:str - Default: None - Adds an image to the embed + :param f: discord.File - Default: None - Add a file to the msg, often attached as image to embed + """ + + embed = Embed(colour=colour) + embed.description = message_txt + embed.set_author(name="Wolfram Alpha", + icon_url=WOLF_IMAGE, + url="https://www.wolframalpha.com/") + if footer: + embed.set_footer(text=footer) + + if img_url: + embed.set_image(url=img_url) + + await ctx.send(embed=embed, file=f) + + +def custom_cooldown(*ignore: List[int]) -> check: + """ + Custom cooldown mapping that applies a specific requests per day to users. + Staff is ignored by the user cooldown, however the cooldown implements a + total amount of uses per day for the entire guild. (Configurable in configs) + + :param ignore: List[int] -- list of ids of roles to be ignored by user cooldown + :return: check + """ + + async def predicate(ctx: Context) -> bool: + user_bucket = usercd.get_bucket(ctx.message) + + if ctx.author.top_role.id not in ignore: + user_rate = user_bucket.update_rate_limit() + + if user_rate: + # Can't use api; cause: member limit + message = ( + "You've used up your limit for Wolfram|Alpha requests.\n" + f"Cooldown: {int(user_rate)}" + ) + await send_embed(ctx, message) + return False + + guild_bucket = guildcd.get_bucket(ctx.message) + guild_rate = guild_bucket.update_rate_limit() + + # Repr has a token attribute to read requests left + log.debug(guild_bucket) + + if guild_rate: + # Can't use api; cause: guild limit + message = ( + "The max limit of requests for the server has been reached for today.\n" + f"Cooldown: {int(guild_rate)}" + ) + await send_embed(ctx, message) + return False + + return True + return check(predicate) + + +async def get_pod_pages(ctx, bot, query: str) -> Optional[List[Tuple]]: + # Give feedback that the bot is working. + async with ctx.channel.typing(): + url_str = parse.urlencode({ + "input": query, + "appid": APPID, + "output": DEFAULT_OUTPUT_FORMAT, + "format": "image,plaintext" + }) + request_url = QUERY.format(request="query", data=url_str) + + async with bot.http_session.get(request_url) as response: + json = await response.json(content_type='text/plain') + + result = json["queryresult"] + + if not result["success"]: + message = f"I couldn't find anything for {query}." + await send_embed(ctx, message) + return + + if result["error"]: + message = "Something went wrong internally with your request, please notify staff!" + log.warning(f"Something went wrong getting a response from wolfram: {url_str}, Response: {json}") + await send_embed(ctx, message) + return + + if not result["numpods"]: + message = "Could not find any results." + await send_embed(ctx, message) + return + + pods = result["pods"] + pages = [] + for pod in pods[:MAX_PODS]: + subs = pod.get("subpods") + + for sub in subs: + title = sub.get("title") or sub.get("plaintext") or sub.get("id", "") + img = sub["img"]["src"] + pages.append((title, img)) + return pages + + +class Wolfram: + """ + Commands for interacting with the Wolfram|Alpha API. + """ + + def __init__(self, bot: commands.Bot): + self.bot = bot + + @group(name="wolfram", aliases=("wolf", "wa"), invoke_without_command=True) + @custom_cooldown(*COOLDOWN_IGNORERS) + async def wolfram_command(self, ctx: Context, *, query: str) -> None: + """ + Requests all answers on a single image, + sends an image of all related pods + + :param ctx: Context + :param query: str - string request to api + """ + + url_str = parse.urlencode({ + "i": query, + "appid": APPID, + }) + query = QUERY.format(request="simple", data=url_str) + + # Give feedback that the bot is working. + async with ctx.channel.typing(): + async with self.bot.http_session.get(query) as response: + status = response.status + image_bytes = await response.read() + + f = discord.File(BytesIO(image_bytes), filename="image.png") + image_url = "attachment://image.png" + + if status == 501: + message = "Failed to get response" + footer = "" + color = Colours.soft_red + elif status == 400: + message = "No input found" + footer = "" + color = Colours.soft_red + else: + message = "" + footer = "View original for a bigger picture." + color = Colours.soft_orange + + # Sends a "blank" embed if no request is received, unsure how to fix + await send_embed(ctx, message, color, footer=footer, img_url=image_url, f=f) + + @wolfram_command.command(name="page", aliases=("pa", "p")) + @custom_cooldown(*COOLDOWN_IGNORERS) + async def wolfram_page_command(self, ctx: Context, *, query: str) -> None: + """ + Requests a drawn image of given query + Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc + + :param ctx: Context + :param query: str - string request to api + """ + + pages = await get_pod_pages(ctx, self.bot, query) + + if not pages: + return + + embed = Embed() + embed.set_author(name="Wolfram Alpha", + icon_url=WOLF_IMAGE, + url="https://www.wolframalpha.com/") + embed.colour = Colours.soft_orange + + await ImagePaginator.paginate(pages, ctx, embed) + + @wolfram_command.command(name="cut", aliases=("c",)) + @custom_cooldown(*COOLDOWN_IGNORERS) + async def wolfram_cut_command(self, ctx, *, query: str) -> None: + """ + Requests a drawn image of given query + Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc + + :param ctx: Context + :param query: str - string request to api + """ + + pages = await get_pod_pages(ctx, self.bot, query) + + if not pages: + return + + if len(pages) >= 2: + page = pages[1] + else: + page = pages[0] + + await send_embed(ctx, page[0], colour=Colours.soft_orange, img_url=page[1]) + + @wolfram_command.command(name="short", aliases=("sh", "s")) + @custom_cooldown(*COOLDOWN_IGNORERS) + async def wolfram_short_command(self, ctx: Context, *, query: str) -> None: + """ + Requests an answer to a simple question + Responds in plaintext + + :param ctx: Context + :param query: str - string request to api + """ + + url_str = parse.urlencode({ + "i": query, + "appid": APPID, + }) + query = QUERY.format(request="result", data=url_str) + + # Give feedback that the bot is working. + async with ctx.channel.typing(): + async with self.bot.http_session.get(query) as response: + status = response.status + response_text = await response.text() + + if status == 501: + message = "Failed to get response" + color = Colours.soft_red + + elif status == 400: + message = "No input found" + color = Colours.soft_red + else: + message = response_text + color = Colours.soft_orange + + await send_embed(ctx, message, color) + + +def setup(bot: commands.Bot) -> None: + bot.add_cog(Wolfram(bot)) + log.info("Cog loaded: Wolfram") diff --git a/bot/constants.py b/bot/constants.py index 2433d15ef..145dc4700 100644 --- a/bot/constants.py +++ b/bot/constants.py @@ -224,6 +224,7 @@ class Colours(metaclass=YAMLGetter): soft_red: int soft_green: int + soft_orange: int class Emojis(metaclass=YAMLGetter): @@ -424,6 +425,14 @@ class Reddit(metaclass=YAMLGetter): subreddits: list +class Wolfram(metaclass=YAMLGetter): + section = "wolfram" + + user_limit_day: int + guild_limit_day: int + key: str + + class AntiSpam(metaclass=YAMLGetter): section = 'anti_spam' diff --git a/bot/pagination.py b/bot/pagination.py index 9319a5b60..cfd6287f7 100644 --- a/bot/pagination.py +++ b/bot/pagination.py @@ -1,16 +1,16 @@ import asyncio import logging -from typing import Iterable, Optional +from typing import Iterable, List, Optional, Tuple from discord import Embed, Member, Reaction from discord.abc import User from discord.ext.commands import Context, Paginator -LEFT_EMOJI = "\u2B05" -RIGHT_EMOJI = "\u27A1" -DELETE_EMOJI = "\u274c" -FIRST_EMOJI = "\u23EE" -LAST_EMOJI = "\u23ED" +FIRST_EMOJI = "\u23EE" # [:track_previous:] +LEFT_EMOJI = "\u2B05" # [:arrow_left:] +RIGHT_EMOJI = "\u27A1" # [:arrow_right:] +LAST_EMOJI = "\u23ED" # [:track_next:] +DELETE_EMOJI = "\u274c" # [:x:] PAGINATION_EMOJI = [FIRST_EMOJI, LEFT_EMOJI, RIGHT_EMOJI, LAST_EMOJI, DELETE_EMOJI] @@ -275,3 +275,176 @@ class LinePaginator(Paginator): log.debug("Ending pagination and removing all reactions...") await message.clear_reactions() + + +class ImagePaginator(Paginator): + """ + Helper class that paginates images for embeds in messages. + Close resemblance to LinePaginator, except focuses on images over text. + + Refer to ImagePaginator.paginate for documentation on how to use. + """ + + def __init__(self, prefix="", suffix=""): + super().__init__(prefix, suffix) + self._current_page = [prefix] + self.images = [] + self._pages = [] + + def add_line(self, line: str='', *, empty: bool=False) -> None: + """ + Adds a line to each page, usually just 1 line in this context + :param line: str to be page content / title + :param empty: if there should be new lines between entries + """ + + if line: + self._count = len(line) + else: + self._count = 0 + self._current_page.append(line) + self.close_page() + + def add_image(self, image: str=None) -> None: + """ + Adds an image to a page + :param image: image url to be appended + """ + + self.images.append(image) + + @classmethod + async def paginate(cls, pages: List[Tuple[str, str]], ctx: Context, embed: Embed, + prefix: str="", suffix: str="", timeout: int=300): + """ + Use a paginator and set of reactions to provide + pagination over a set of title/image pairs.The reactions are + used to switch page, or to finish with pagination. + + When used, this will send a message using `ctx.send()` and + apply a set of reactions to it. These reactions may + be used to change page, or to remove pagination from the message. + + Note: Pagination will be removed automatically + if no reaction is added for five minutes (300 seconds). + + >>> embed = Embed() + >>> embed.set_author(name="Some Operation", url=url, icon_url=icon) + >>> await ImagePaginator.paginate(pages, ctx, embed) + + Parameters + ----------- + :param pages: An iterable of tuples with title for page, and img url + :param ctx: ctx for message + :param embed: base embed to modify + :param prefix: prefix of message + :param suffix: suffix of message + :param timeout: timeout for when reactions get auto-removed + """ + + def check_event(reaction_: Reaction, member: Member) -> bool: + """ + Checks each reaction added, if it matches our conditions pass the wait_for + :param reaction_: reaction added + :param member: reaction added by member + """ + + return all(( + # Reaction is on the same message sent + reaction_.message.id == message.id, + # The reaction is part of the navigation menu + reaction_.emoji in PAGINATION_EMOJI, + # The reactor is not a bot + not member.bot + )) + + paginator = cls(prefix=prefix, suffix=suffix) + current_page = 0 + + for text, image_url in pages: + paginator.add_line(text) + paginator.add_image(image_url) + + embed.description = paginator.pages[current_page] + image = paginator.images[current_page] + + if image: + embed.set_image(url=image) + + if len(paginator.pages) <= 1: + return await ctx.send(embed=embed) + + embed.set_footer(text=f"Page {current_page + 1}/{len(paginator.pages)}") + message = await ctx.send(embed=embed) + + for emoji in PAGINATION_EMOJI: + await message.add_reaction(emoji) + + while True: + # Start waiting for reactions + try: + reaction, user = await ctx.bot.wait_for("reaction_add", timeout=timeout, check=check_event) + except asyncio.TimeoutError: + log.debug("Timed out waiting for a reaction") + break # We're done, no reactions for the last 5 minutes + + # Deletes the users reaction + await message.remove_reaction(reaction.emoji, user) + + # Delete reaction press - [:x:] + if reaction.emoji == DELETE_EMOJI: + log.debug("Got delete reaction") + break + + # First reaction press - [:track_previous:] + if reaction.emoji == FIRST_EMOJI: + if current_page == 0: + log.debug("Got first page reaction, but we're on the first page - ignoring") + continue + + current_page = 0 + reaction_type = "first" + + # Last reaction press - [:track_next:] + if reaction.emoji == LAST_EMOJI: + if current_page >= len(paginator.pages) - 1: + log.debug("Got last page reaction, but we're on the last page - ignoring") + continue + + current_page = len(paginator.pages - 1) + reaction_type = "last" + + # Previous reaction press - [:arrow_left: ] + if reaction.emoji == LEFT_EMOJI: + if current_page <= 0: + log.debug("Got previous page reaction, but we're on the first page - ignoring") + continue + + current_page -= 1 + reaction_type = "previous" + + # Next reaction press - [:arrow_right:] + if reaction.emoji == RIGHT_EMOJI: + if current_page >= len(paginator.pages) - 1: + log.debug("Got next page reaction, but we're on the last page - ignoring") + continue + + current_page += 1 + reaction_type = "next" + + # Magic happens here, after page and reaction_type is set + embed.description = "" + await message.edit(embed=embed) + embed.description = paginator.pages[current_page] + + image = paginator.images[current_page] + if image: + embed.set_image(url=image) + + embed.set_footer(text=f"Page {current_page + 1}/{len(paginator.pages)}") + log.debug(f"Got {reaction_type} page reaction - changing to page {current_page + 1}/{len(paginator.pages)}") + + await message.edit(embed=embed) + + log.debug("Ending pagination and removing all reactions...") + await message.clear_reactions() diff --git a/config-default.yml b/config-default.yml index 7130eb540..15f1a143a 100644 --- a/config-default.yml +++ b/config-default.yml @@ -15,6 +15,7 @@ style: colours: soft_red: 0xcd6d6d soft_green: 0x68c290 + soft_orange: 0xf9cb54 emojis: defcon_disabled: "<:defcondisabled:470326273952972810>" @@ -306,3 +307,9 @@ reddit: request_delay: 60 subreddits: - 'r/Python' + +wolfram: + # Max requests per day. + user_limit_day: 10 + guild_limit_day: 67 + key: !ENV "WOLFRAM_API_KEY" -- cgit v1.2.3