aboutsummaryrefslogtreecommitdiffstats
path: root/bot/exts/evergreen/wolfram.py
blob: 54ff5d92b7c8b082e3173e7b63e1b20d4516c248 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
import logging
from io import BytesIO
from typing import Callable, Optional
from urllib.parse import urlencode

import arrow
import discord
from discord import Embed
from discord.ext import commands
from discord.ext.commands import BucketType, Cog, Context, check, group

from bot.bot import Bot
from bot.constants import Colours, STAFF_ROLES, Wolfram
from bot.utils.pagination import ImagePaginator

log = logging.getLogger(__name__)

APPID = Wolfram.key
DEFAULT_OUTPUT_FORMAT = "JSON"
QUERY = "http://api.wolframalpha.com/v2/{request}"
WOLF_IMAGE = "https://www.symbols.com/gi.php?type=1&id=2886&i=1"

MAX_PODS = 20

# Allows for 10 wolfram calls pr user pr day
usercd = commands.CooldownMapping.from_cooldown(Wolfram.user_limit_day, 60 * 60 * 24, BucketType.user)

# Allows for max api requests / days in month per day for the entire guild (Temporary)
guildcd = commands.CooldownMapping.from_cooldown(Wolfram.guild_limit_day, 60 * 60 * 24, BucketType.guild)


async def send_embed(
        ctx: Context,
        message_txt: str,
        colour: int = Colours.soft_red,
        footer: str = None,
        img_url: str = None,
        f: discord.File = None
) -> None:
    """Generate & send a response embed with Wolfram as the author."""
    embed = Embed(colour=colour)
    embed.description = message_txt
    embed.set_author(
        name="Wolfram Alpha",
        icon_url=WOLF_IMAGE,
        url="https://www.wolframalpha.com/"
    )
    if footer:
        embed.set_footer(text=footer)

    if img_url:
        embed.set_image(url=img_url)

    await ctx.send(embed=embed, file=f)


def custom_cooldown(*ignore: int) -> Callable:
    """
    Implement per-user and per-guild cooldowns for requests to the Wolfram API.

    A list of roles may be provided to ignore the per-user cooldown.
    """
    async def predicate(ctx: Context) -> bool:
        if ctx.invoked_with == "help":
            # if the invoked command is help we don't want to increase the ratelimits since it's not actually
            # invoking the command/making a request, so instead just check if the user/guild are on cooldown.
            guild_cooldown = not guildcd.get_bucket(ctx.message).get_tokens() == 0  # if guild is on cooldown
            # check the message is in a guild, and check user bucket if user is not ignored
            if ctx.guild and not any(r.id in ignore for r in ctx.author.roles):
                return guild_cooldown and not usercd.get_bucket(ctx.message).get_tokens() == 0
            return guild_cooldown

        user_bucket = usercd.get_bucket(ctx.message)

        if all(role.id not in ignore for role in ctx.author.roles):
            user_rate = user_bucket.update_rate_limit()

            if user_rate:
                # Can't use api; cause: member limit
                cooldown = arrow.utcnow().shift(seconds=int(user_rate)).humanize(only_distance=True)
                message = (
                    "You've used up your limit for Wolfram|Alpha requests.\n"
                    f"Cooldown: {cooldown}"
                )
                await send_embed(ctx, message)
                return False

        guild_bucket = guildcd.get_bucket(ctx.message)
        guild_rate = guild_bucket.update_rate_limit()

        # Repr has a token attribute to read requests left
        log.debug(guild_bucket)

        if guild_rate:
            # Can't use api; cause: guild limit
            message = (
                "The max limit of requests for the server has been reached for today.\n"
                f"Cooldown: {int(guild_rate)}"
            )
            await send_embed(ctx, message)
            return False

        return True

    return check(predicate)


async def get_pod_pages(ctx: Context, bot: Bot, query: str) -> Optional[list[tuple]]:
    """Get the Wolfram API pod pages for the provided query."""
    async with ctx.typing():
        params = {
            "input": query,
            "appid": APPID,
            "output": DEFAULT_OUTPUT_FORMAT,
            "format": "image,plaintext",
            "location": "the moon",
            "latlong": "0.0,0.0",
            "ip": "1.1.1.1"
        }
        request_url = QUERY.format(request="query")

        async with bot.http_session.get(url=request_url, params=params) as response:
            json = await response.json(content_type="text/plain")

        result = json["queryresult"]
        log_full_url = f"{request_url}?{urlencode(params)}"
        if result["error"]:
            # API key not set up correctly
            if result["error"]["msg"] == "Invalid appid":
                message = "Wolfram API key is invalid or missing."
                log.warning(
                    "API key seems to be missing, or invalid when "
                    f"processing a wolfram request: {log_full_url}, Response: {json}"
                )
                await send_embed(ctx, message)
                return None

            message = "Something went wrong internally with your request, please notify staff!"
            log.warning(f"Something went wrong getting a response from wolfram: {log_full_url}, Response: {json}")
            await send_embed(ctx, message)
            return None

        if not result["success"]:
            message = f"I couldn't find anything for {query}."
            await send_embed(ctx, message)
            return None

        if not result["numpods"]:
            message = "Could not find any results."
            await send_embed(ctx, message)
            return None

        pods = result["pods"]
        pages = []
        for pod in pods[:MAX_PODS]:
            subs = pod.get("subpods")

            for sub in subs:
                title = sub.get("title") or sub.get("plaintext") or sub.get("id", "")
                img = sub["img"]["src"]
                pages.append((title, img))
        return pages


class Wolfram(Cog):
    """Commands for interacting with the Wolfram|Alpha API."""

    def __init__(self, bot: Bot):
        self.bot = bot

    @group(name="wolfram", aliases=("wolf", "wa"), invoke_without_command=True)
    @custom_cooldown(*STAFF_ROLES)
    async def wolfram_command(self, ctx: Context, *, query: str) -> None:
        """Requests all answers on a single image, sends an image of all related pods."""
        params = {
            "i": query,
            "appid": APPID,
            "location": "the moon",
            "latlong": "0.0,0.0",
            "ip": "1.1.1.1"
        }
        request_url = QUERY.format(request="simple")

        # Give feedback that the bot is working.
        async with ctx.typing():
            async with self.bot.http_session.get(url=request_url, params=params) as response:
                status = response.status
                image_bytes = await response.read()

            f = discord.File(BytesIO(image_bytes), filename="image.png")
            image_url = "attachment://image.png"

            if status == 501:
                message = "Failed to get response."
                footer = ""
                color = Colours.soft_red
            elif status == 400:
                message = "No input found."
                footer = ""
                color = Colours.soft_red
            elif status == 403:
                message = "Wolfram API key is invalid or missing."
                footer = ""
                color = Colours.soft_red
            else:
                message = ""
                footer = "View original for a bigger picture."
                color = Colours.soft_orange

            # Sends a "blank" embed if no request is received, unsure how to fix
            await send_embed(ctx, message, color, footer=footer, img_url=image_url, f=f)

    @wolfram_command.command(name="page", aliases=("pa", "p"))
    @custom_cooldown(*STAFF_ROLES)
    async def wolfram_page_command(self, ctx: Context, *, query: str) -> None:
        """
        Requests a drawn image of given query.

        Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc.
        """
        pages = await get_pod_pages(ctx, self.bot, query)

        if not pages:
            return

        embed = Embed()
        embed.set_author(
            name="Wolfram Alpha",
            icon_url=WOLF_IMAGE,
            url="https://www.wolframalpha.com/"
        )
        embed.colour = Colours.soft_orange

        await ImagePaginator.paginate(pages, ctx, embed)

    @wolfram_command.command(name="cut", aliases=("c",))
    @custom_cooldown(*STAFF_ROLES)
    async def wolfram_cut_command(self, ctx: Context, *, query: str) -> None:
        """
        Requests a drawn image of given query.

        Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc.
        """
        pages = await get_pod_pages(ctx, self.bot, query)

        if not pages:
            return

        if len(pages) >= 2:
            page = pages[1]
        else:
            page = pages[0]

        await send_embed(ctx, page[0], colour=Colours.soft_orange, img_url=page[1])

    @wolfram_command.command(name="short", aliases=("sh", "s"))
    @custom_cooldown(*STAFF_ROLES)
    async def wolfram_short_command(self, ctx: Context, *, query: str) -> None:
        """Requests an answer to a simple question."""
        params = {
            "i": query,
            "appid": APPID,
            "location": "the moon",
            "latlong": "0.0,0.0",
            "ip": "1.1.1.1"
        }
        request_url = QUERY.format(request="result")

        # Give feedback that the bot is working.
        async with ctx.typing():
            async with self.bot.http_session.get(url=request_url, params=params) as response:
                status = response.status
                response_text = await response.text()

            if status == 501:
                message = "Failed to get response."
                color = Colours.soft_red
            elif status == 400:
                message = "No input found."
                color = Colours.soft_red
            elif response_text == "Error 1: Invalid appid.":
                message = "Wolfram API key is invalid or missing."
                color = Colours.soft_red
            else:
                message = response_text
                color = Colours.soft_orange

            await send_embed(ctx, message, color)


def setup(bot: Bot) -> None:
    """Load the Wolfram cog."""
    bot.add_cog(Wolfram(bot))