diff options
Diffstat (limited to '')
| -rw-r--r-- | bot/__main__.py | 1 | ||||
| -rw-r--r-- | bot/cogs/wolfram.py | 280 | ||||
| -rw-r--r-- | bot/constants.py | 8 | ||||
| -rw-r--r-- | bot/pagination.py | 164 | ||||
| -rw-r--r-- | config-default.yml | 7 | ||||
| -rw-r--r-- | tests/bot/test_pagination.py | 15 | 
6 files changed, 0 insertions, 475 deletions
diff --git a/bot/__main__.py b/bot/__main__.py index f698b5662..fe2cf90e6 100644 --- a/bot/__main__.py +++ b/bot/__main__.py @@ -74,7 +74,6 @@ bot.load_extension("bot.cogs.token_remover")  bot.load_extension("bot.cogs.utils")  bot.load_extension("bot.cogs.watchchannels")  bot.load_extension("bot.cogs.webhook_remover") -bot.load_extension("bot.cogs.wolfram")  if constants.HelpChannels.enable:      bot.load_extension("bot.cogs.help_channels") diff --git a/bot/cogs/wolfram.py b/bot/cogs/wolfram.py deleted file mode 100644 index e6cae3bb8..000000000 --- a/bot/cogs/wolfram.py +++ /dev/null @@ -1,280 +0,0 @@ -import logging -from io import BytesIO -from typing import Callable, List, Optional, Tuple -from urllib import parse - -import discord -from dateutil.relativedelta import relativedelta -from discord import Embed -from discord.ext import commands -from discord.ext.commands import BucketType, Cog, Context, check, group - -from bot.bot import Bot -from bot.constants import Colours, STAFF_ROLES, Wolfram -from bot.pagination import ImagePaginator -from bot.utils.time import humanize_delta - -log = logging.getLogger(__name__) - -APPID = Wolfram.key -DEFAULT_OUTPUT_FORMAT = "JSON" -QUERY = "http://api.wolframalpha.com/v2/{request}?{data}" -WOLF_IMAGE = "https://www.symbols.com/gi.php?type=1&id=2886&i=1" - -MAX_PODS = 20 - -# Allows for 10 wolfram calls pr user pr day -usercd = commands.CooldownMapping.from_cooldown(Wolfram.user_limit_day, 60*60*24, BucketType.user) - -# Allows for max api requests / days in month per day for the entire guild (Temporary) -guildcd = commands.CooldownMapping.from_cooldown(Wolfram.guild_limit_day, 60*60*24, BucketType.guild) - - -async def send_embed( -        ctx: Context, -        message_txt: str, -        colour: int = Colours.soft_red, -        footer: str = None, -        img_url: str = None, -        f: discord.File = None -) -> None: -    """Generate & send a response embed with Wolfram as the author.""" -    embed = Embed(colour=colour) -    embed.description = message_txt -    embed.set_author(name="Wolfram Alpha", -                     icon_url=WOLF_IMAGE, -                     url="https://www.wolframalpha.com/") -    if footer: -        embed.set_footer(text=footer) - -    if img_url: -        embed.set_image(url=img_url) - -    await ctx.send(embed=embed, file=f) - - -def custom_cooldown(*ignore: List[int]) -> Callable: -    """ -    Implement per-user and per-guild cooldowns for requests to the Wolfram API. - -    A list of roles may be provided to ignore the per-user cooldown -    """ -    async def predicate(ctx: Context) -> bool: -        if ctx.invoked_with == 'help': -            # if the invoked command is help we don't want to increase the ratelimits since it's not actually -            # invoking the command/making a request, so instead just check if the user/guild are on cooldown. -            guild_cooldown = not guildcd.get_bucket(ctx.message).get_tokens() == 0  # if guild is on cooldown -            if not any(r.id in ignore for r in ctx.author.roles):  # check user bucket if user is not ignored -                return guild_cooldown and not usercd.get_bucket(ctx.message).get_tokens() == 0 -            return guild_cooldown - -        user_bucket = usercd.get_bucket(ctx.message) - -        if all(role.id not in ignore for role in ctx.author.roles): -            user_rate = user_bucket.update_rate_limit() - -            if user_rate: -                # Can't use api; cause: member limit -                delta = relativedelta(seconds=int(user_rate)) -                cooldown = humanize_delta(delta) -                message = ( -                    "You've used up your limit for Wolfram|Alpha requests.\n" -                    f"Cooldown: {cooldown}" -                ) -                await send_embed(ctx, message) -                return False - -        guild_bucket = guildcd.get_bucket(ctx.message) -        guild_rate = guild_bucket.update_rate_limit() - -        # Repr has a token attribute to read requests left -        log.debug(guild_bucket) - -        if guild_rate: -            # Can't use api; cause: guild limit -            message = ( -                "The max limit of requests for the server has been reached for today.\n" -                f"Cooldown: {int(guild_rate)}" -            ) -            await send_embed(ctx, message) -            return False - -        return True -    return check(predicate) - - -async def get_pod_pages(ctx: Context, bot: Bot, query: str) -> Optional[List[Tuple]]: -    """Get the Wolfram API pod pages for the provided query.""" -    async with ctx.channel.typing(): -        url_str = parse.urlencode({ -            "input": query, -            "appid": APPID, -            "output": DEFAULT_OUTPUT_FORMAT, -            "format": "image,plaintext" -        }) -        request_url = QUERY.format(request="query", data=url_str) - -        async with bot.http_session.get(request_url) as response: -            json = await response.json(content_type='text/plain') - -        result = json["queryresult"] - -        if result["error"]: -            # API key not set up correctly -            if result["error"]["msg"] == "Invalid appid": -                message = "Wolfram API key is invalid or missing." -                log.warning( -                    "API key seems to be missing, or invalid when " -                    f"processing a wolfram request: {url_str}, Response: {json}" -                ) -                await send_embed(ctx, message) -                return - -            message = "Something went wrong internally with your request, please notify staff!" -            log.warning(f"Something went wrong getting a response from wolfram: {url_str}, Response: {json}") -            await send_embed(ctx, message) -            return - -        if not result["success"]: -            message = f"I couldn't find anything for {query}." -            await send_embed(ctx, message) -            return - -        if not result["numpods"]: -            message = "Could not find any results." -            await send_embed(ctx, message) -            return - -        pods = result["pods"] -        pages = [] -        for pod in pods[:MAX_PODS]: -            subs = pod.get("subpods") - -            for sub in subs: -                title = sub.get("title") or sub.get("plaintext") or sub.get("id", "") -                img = sub["img"]["src"] -                pages.append((title, img)) -        return pages - - -class Wolfram(Cog): -    """Commands for interacting with the Wolfram|Alpha API.""" - -    def __init__(self, bot: Bot): -        self.bot = bot - -    @group(name="wolfram", aliases=("wolf", "wa"), invoke_without_command=True) -    @custom_cooldown(*STAFF_ROLES) -    async def wolfram_command(self, ctx: Context, *, query: str) -> None: -        """Requests all answers on a single image, sends an image of all related pods.""" -        url_str = parse.urlencode({ -            "i": query, -            "appid": APPID, -        }) -        query = QUERY.format(request="simple", data=url_str) - -        # Give feedback that the bot is working. -        async with ctx.channel.typing(): -            async with self.bot.http_session.get(query) as response: -                status = response.status -                image_bytes = await response.read() - -            f = discord.File(BytesIO(image_bytes), filename="image.png") -            image_url = "attachment://image.png" - -            if status == 501: -                message = "Failed to get response" -                footer = "" -                color = Colours.soft_red -            elif status == 400: -                message = "No input found" -                footer = "" -                color = Colours.soft_red -            elif status == 403: -                message = "Wolfram API key is invalid or missing." -                footer = "" -                color = Colours.soft_red -            else: -                message = "" -                footer = "View original for a bigger picture." -                color = Colours.soft_orange - -            # Sends a "blank" embed if no request is received, unsure how to fix -            await send_embed(ctx, message, color, footer=footer, img_url=image_url, f=f) - -    @wolfram_command.command(name="page", aliases=("pa", "p")) -    @custom_cooldown(*STAFF_ROLES) -    async def wolfram_page_command(self, ctx: Context, *, query: str) -> None: -        """ -        Requests a drawn image of given query. - -        Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc. -        """ -        pages = await get_pod_pages(ctx, self.bot, query) - -        if not pages: -            return - -        embed = Embed() -        embed.set_author(name="Wolfram Alpha", -                         icon_url=WOLF_IMAGE, -                         url="https://www.wolframalpha.com/") -        embed.colour = Colours.soft_orange - -        await ImagePaginator.paginate(pages, ctx, embed) - -    @wolfram_command.command(name="cut", aliases=("c",)) -    @custom_cooldown(*STAFF_ROLES) -    async def wolfram_cut_command(self, ctx: Context, *, query: str) -> None: -        """ -        Requests a drawn image of given query. - -        Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc. -        """ -        pages = await get_pod_pages(ctx, self.bot, query) - -        if not pages: -            return - -        if len(pages) >= 2: -            page = pages[1] -        else: -            page = pages[0] - -        await send_embed(ctx, page[0], colour=Colours.soft_orange, img_url=page[1]) - -    @wolfram_command.command(name="short", aliases=("sh", "s")) -    @custom_cooldown(*STAFF_ROLES) -    async def wolfram_short_command(self, ctx: Context, *, query: str) -> None: -        """Requests an answer to a simple question.""" -        url_str = parse.urlencode({ -            "i": query, -            "appid": APPID, -        }) -        query = QUERY.format(request="result", data=url_str) - -        # Give feedback that the bot is working. -        async with ctx.channel.typing(): -            async with self.bot.http_session.get(query) as response: -                status = response.status -                response_text = await response.text() - -            if status == 501: -                message = "Failed to get response" -                color = Colours.soft_red -            elif status == 400: -                message = "No input found" -                color = Colours.soft_red -            elif response_text == "Error 1: Invalid appid": -                message = "Wolfram API key is invalid or missing." -                color = Colours.soft_red -            else: -                message = response_text -                color = Colours.soft_orange - -            await send_embed(ctx, message, color) - - -def setup(bot: Bot) -> None: -    """Load the Wolfram cog.""" -    bot.add_cog(Wolfram(bot)) diff --git a/bot/constants.py b/bot/constants.py index f3db80279..17fe34e95 100644 --- a/bot/constants.py +++ b/bot/constants.py @@ -514,14 +514,6 @@ class Reddit(metaclass=YAMLGetter):      secret: Optional[str] -class Wolfram(metaclass=YAMLGetter): -    section = "wolfram" - -    user_limit_day: int -    guild_limit_day: int -    key: Optional[str] - -  class AntiSpam(metaclass=YAMLGetter):      section = 'anti_spam' diff --git a/bot/pagination.py b/bot/pagination.py index bab98cacf..182b2fa76 100644 --- a/bot/pagination.py +++ b/bot/pagination.py @@ -374,167 +374,3 @@ class LinePaginator(Paginator):          log.debug("Ending pagination and clearing reactions.")          with suppress(discord.NotFound):              await message.clear_reactions() - - -class ImagePaginator(Paginator): -    """ -    Helper class that paginates images for embeds in messages. - -    Close resemblance to LinePaginator, except focuses on images over text. - -    Refer to ImagePaginator.paginate for documentation on how to use. -    """ - -    def __init__(self, prefix: str = "", suffix: str = ""): -        super().__init__(prefix, suffix) -        self._current_page = [prefix] -        self.images = [] -        self._pages = [] -        self._count = 0 - -    def add_line(self, line: str = '', *, empty: bool = False) -> None: -        """Adds a line to each page.""" -        if line: -            self._count = len(line) -        else: -            self._count = 0 -        self._current_page.append(line) -        self.close_page() - -    def add_image(self, image: str = None) -> None: -        """Adds an image to a page.""" -        self.images.append(image) - -    @classmethod -    async def paginate( -        cls, -        pages: t.List[t.Tuple[str, str]], -        ctx: Context, embed: discord.Embed, -        prefix: str = "", -        suffix: str = "", -        timeout: int = 300, -        exception_on_empty_embed: bool = False -    ) -> t.Optional[discord.Message]: -        """ -        Use a paginator and set of reactions to provide pagination over a set of title/image pairs. - -        The reactions are used to switch page, or to finish with pagination. - -        When used, this will send a message using `ctx.send()` and apply a set of reactions to it. These reactions may -        be used to change page, or to remove pagination from the message. - -        Note: Pagination will be removed automatically if no reaction is added for five minutes (300 seconds). - -        Example: -        >>> embed = discord.Embed() -        >>> embed.set_author(name="Some Operation", url=url, icon_url=icon) -        >>> await ImagePaginator.paginate(pages, ctx, embed) -        """ -        def check_event(reaction_: discord.Reaction, member: discord.Member) -> bool: -            """Checks each reaction added, if it matches our conditions pass the wait_for.""" -            return all(( -                # Reaction is on the same message sent -                reaction_.message.id == message.id, -                # The reaction is part of the navigation menu -                str(reaction_.emoji) in PAGINATION_EMOJI, -                # The reactor is not a bot -                not member.bot -            )) - -        paginator = cls(prefix=prefix, suffix=suffix) -        current_page = 0 - -        if not pages: -            if exception_on_empty_embed: -                log.exception("Pagination asked for empty image list") -                raise EmptyPaginatorEmbed("No images to paginate") - -            log.debug("No images to add to paginator, adding '(no images to display)' message") -            pages.append(("(no images to display)", "")) - -        for text, image_url in pages: -            paginator.add_line(text) -            paginator.add_image(image_url) - -        embed.description = paginator.pages[current_page] -        image = paginator.images[current_page] - -        if image: -            embed.set_image(url=image) - -        if len(paginator.pages) <= 1: -            return await ctx.send(embed=embed) - -        embed.set_footer(text=f"Page {current_page + 1}/{len(paginator.pages)}") -        message = await ctx.send(embed=embed) - -        for emoji in PAGINATION_EMOJI: -            await message.add_reaction(emoji) - -        while True: -            # Start waiting for reactions -            try: -                reaction, user = await ctx.bot.wait_for("reaction_add", timeout=timeout, check=check_event) -            except asyncio.TimeoutError: -                log.debug("Timed out waiting for a reaction") -                break  # We're done, no reactions for the last 5 minutes - -            # Deletes the users reaction -            await message.remove_reaction(reaction.emoji, user) - -            # Delete reaction press - [:trashcan:] -            if str(reaction.emoji) == DELETE_EMOJI: -                log.debug("Got delete reaction") -                return await message.delete() - -            # First reaction press - [:track_previous:] -            if reaction.emoji == FIRST_EMOJI: -                if current_page == 0: -                    log.debug("Got first page reaction, but we're on the first page - ignoring") -                    continue - -                current_page = 0 -                reaction_type = "first" - -            # Last reaction press - [:track_next:] -            if reaction.emoji == LAST_EMOJI: -                if current_page >= len(paginator.pages) - 1: -                    log.debug("Got last page reaction, but we're on the last page - ignoring") -                    continue - -                current_page = len(paginator.pages) - 1 -                reaction_type = "last" - -            # Previous reaction press - [:arrow_left: ] -            if reaction.emoji == LEFT_EMOJI: -                if current_page <= 0: -                    log.debug("Got previous page reaction, but we're on the first page - ignoring") -                    continue - -                current_page -= 1 -                reaction_type = "previous" - -            # Next reaction press - [:arrow_right:] -            if reaction.emoji == RIGHT_EMOJI: -                if current_page >= len(paginator.pages) - 1: -                    log.debug("Got next page reaction, but we're on the last page - ignoring") -                    continue - -                current_page += 1 -                reaction_type = "next" - -            # Magic happens here, after page and reaction_type is set -            embed.description = paginator.pages[current_page] - -            image = paginator.images[current_page] -            if image: -                embed.set_image(url=image) - -            embed.set_footer(text=f"Page {current_page + 1}/{len(paginator.pages)}") -            log.debug(f"Got {reaction_type} page reaction - changing to page {current_page + 1}/{len(paginator.pages)}") - -            await message.edit(embed=embed) - -        log.debug("Ending pagination and clearing reactions.") -        with suppress(discord.NotFound): -            await message.clear_reactions() diff --git a/config-default.yml b/config-default.yml index b4dc34e85..a0f601728 100644 --- a/config-default.yml +++ b/config-default.yml @@ -393,13 +393,6 @@ reddit:      secret:    !ENV "REDDIT_SECRET" -wolfram: -    # Max requests per day. -    user_limit_day: 10 -    guild_limit_day: 67 -    key: !ENV "WOLFRAM_API_KEY" - -  big_brother:      log_delay: 15      header_message_limit: 15 diff --git a/tests/bot/test_pagination.py b/tests/bot/test_pagination.py index ce880d457..630f2516d 100644 --- a/tests/bot/test_pagination.py +++ b/tests/bot/test_pagination.py @@ -44,18 +44,3 @@ class LinePaginatorTests(TestCase):          self.paginator.add_line('x' * (self.paginator.scale_to_size + 1))          # Note: item at index 1 is the truncated line, index 0 is prefix          self.assertEqual(self.paginator._current_page[1], 'x' * self.paginator.scale_to_size) - - -class ImagePaginatorTests(TestCase): -    """Tests functionality of the `ImagePaginator`.""" - -    def setUp(self): -        """Create a paginator for the test method.""" -        self.paginator = pagination.ImagePaginator() - -    def test_add_image_appends_image(self): -        """`add_image` appends the image to the image list.""" -        image = 'lemon' -        self.paginator.add_image(image) - -        assert self.paginator.images == [image]  |