1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
|
import logging
from io import BytesIO
from typing import Callable, Optional
from urllib.parse import urlencode
import arrow
import discord
from discord import Embed
from discord.ext import commands
from discord.ext.commands import BucketType, Cog, Context, check, group
from bot.bot import Bot
from bot.constants import Colours, STAFF_ROLES, Wolfram
from bot.utils.pagination import ImagePaginator
log = logging.getLogger(__name__)

APPID = Wolfram.key
DEFAULT_OUTPUT_FORMAT = "JSON"
# HTTPS is used because every request carries the API key (`appid`) as a query parameter,
# which would otherwise travel in plaintext.
QUERY = "https://api.wolframalpha.com/v2/{request}"
WOLF_IMAGE = "https://www.symbols.com/gi.php?type=1&id=2886&i=1"

# Upper bound on how many pods of a query result are turned into pages.
MAX_PODS = 20

# Per-user limit: allows `Wolfram.user_limit_day` Wolfram calls per user per 24 hours.
usercd = commands.CooldownMapping.from_cooldown(Wolfram.user_limit_day, 60 * 60 * 24, BucketType.user)

# Guild-wide limit: allows `Wolfram.guild_limit_day` calls per 24 hours for the entire guild.
# Meant to be the maximum number of API requests divided by the days in a month (temporary).
guildcd = commands.CooldownMapping.from_cooldown(Wolfram.guild_limit_day, 60 * 60 * 24, BucketType.guild)
async def send_embed(
    ctx: Context,
    message_txt: str,
    colour: int = Colours.soft_red,
    footer: str = None,
    img_url: str = None,
    f: discord.File = None
) -> None:
    """
    Build a response embed authored by Wolfram Alpha and send it to the invoking context.

    `message_txt` becomes the embed description and `colour` the embed colour.
    A footer and an image are only added when `footer` / `img_url` are truthy.
    `f` is handed to `ctx.send` as the file to send alongside the embed.
    """
    response = Embed(colour=colour)
    response.description = message_txt

    author_details = {
        "name": "Wolfram Alpha",
        "icon_url": WOLF_IMAGE,
        "url": "https://www.wolframalpha.com/",
    }
    response.set_author(**author_details)

    # Optional extras: skipped for None as well as for empty strings.
    if footer:
        response.set_footer(text=footer)
    if img_url:
        response.set_image(url=img_url)

    await ctx.send(file=f, embed=response)
def custom_cooldown(*ignore: int) -> Callable:
    """
    Implement per-user and per-guild cooldowns for requests to the Wolfram API.

    A list of roles may be provided to ignore the per-user cooldown.

    Args:
        *ignore: IDs of roles whose holders skip the per-user cooldown.
            The per-guild cooldown is applied regardless of roles.

    Returns:
        A command check decorator. When a limit has been hit, the check sends an
        explanatory embed and evaluates to False; otherwise it evaluates to True.
    """
    def has_ignored_role(ctx: Context) -> bool:
        """Return True if the invoking author holds any of the ignored roles."""
        # Outside of a guild the author has no `roles` attribute; treat that as
        # "no roles" rather than failing with an AttributeError.
        return any(role.id in ignore for role in getattr(ctx.author, "roles", ()))

    def humanize_cooldown(retry_after: float) -> str:
        """Render the remaining cooldown (in seconds) as a human readable distance."""
        return arrow.utcnow().shift(seconds=int(retry_after)).humanize(only_distance=True)

    async def predicate(ctx: Context) -> bool:
        if ctx.invoked_with == "help":
            # if the invoked command is help we don't want to increase the ratelimits since it's not actually
            # invoking the command/making a request, so instead just check if the user/guild are on cooldown.
            # A bucket with 0 tokens left is on cooldown.
            guild_available = guildcd.get_bucket(ctx.message).get_tokens() != 0
            # check the message is in a guild, and check user bucket if user is not ignored
            if ctx.guild and not has_ignored_role(ctx):
                return guild_available and usercd.get_bucket(ctx.message).get_tokens() != 0
            return guild_available

        if not has_ignored_role(ctx):
            user_rate = usercd.get_bucket(ctx.message).update_rate_limit()
            if user_rate:
                # Can't use api; cause: member limit
                message = (
                    "You've used up your limit for Wolfram|Alpha requests.\n"
                    f"Cooldown: {humanize_cooldown(user_rate)}"
                )
                await send_embed(ctx, message)
                return False

        guild_bucket = guildcd.get_bucket(ctx.message)
        guild_rate = guild_bucket.update_rate_limit()

        # Repr has a token attribute to read requests left
        log.debug(guild_bucket)

        if guild_rate:
            # Can't use api; cause: guild limit
            # Humanized like the per-user message; a bare number of seconds has no unit for the reader.
            message = (
                "The max limit of requests for the server has been reached for today.\n"
                f"Cooldown: {humanize_cooldown(guild_rate)}"
            )
            await send_embed(ctx, message)
            return False

        return True

    return check(predicate)
async def get_pod_pages(ctx: Context, bot: Bot, query: str) -> Optional[list[tuple[str, str]]]:
    """
    Get the Wolfram API pod pages for the provided query.

    Sends the query to the `query` endpoint and turns the subpods of at most `MAX_PODS`
    pods into `(title, image URL)` pairs.

    Returns:
        A non-empty list of `(title, image URL)` tuples, or None when the request failed or
        produced nothing to show. In the None case an explanatory embed has already been
        sent to `ctx`.
    """
    async with ctx.typing():
        params = {
            "input": query,
            "appid": APPID,
            "output": DEFAULT_OUTPUT_FORMAT,
            "format": "image,plaintext",
            "location": "the moon",
            "latlong": "0.0,0.0",
            "ip": "1.1.1.1"
        }
        request_url = QUERY.format(request="query")

        async with bot.http_session.get(url=request_url, params=params) as response:
            # The response is parsed as JSON even though it is labelled `text/plain`.
            payload = await response.json(content_type="text/plain")

        result = payload["queryresult"]

        error = result.get("error")
        if error:
            # Only needed for the log messages below, so build it on the error path only.
            log_full_url = f"{request_url}?{urlencode(params)}"

            # API key not set up correctly
            if isinstance(error, dict) and error.get("msg") == "Invalid appid":
                message = "Wolfram API key is invalid or missing."
                log.warning(
                    "API key seems to be missing, or invalid when "
                    f"processing a wolfram request: {log_full_url}, Response: {payload}"
                )
                await send_embed(ctx, message)
                return None

            message = "Something went wrong internally with your request, please notify staff!"
            log.warning(f"Something went wrong getting a response from wolfram: {log_full_url}, Response: {payload}")
            await send_embed(ctx, message)
            return None

        if not result.get("success"):
            message = f"I couldn't find anything for {query}."
            await send_embed(ctx, message)
            return None

        if not result.get("numpods"):
            message = "Could not find any results."
            await send_embed(ctx, message)
            return None

        pages = []
        for pod in result.get("pods", [])[:MAX_PODS]:
            # A pod without a "subpods" entry contributes no pages instead of raising TypeError.
            for sub in pod.get("subpods") or []:
                img = (sub.get("img") or {}).get("src")
                if not img:
                    # Every page needs an image URL; skip subpods that carry none.
                    continue

                title = sub.get("title") or sub.get("plaintext") or sub.get("id", "")
                pages.append((title, img))

        if not pages:
            # Callers return silently on a falsy result, so tell the user here.
            await send_embed(ctx, "Could not find any results.")
            return None

        return pages
# NOTE: this class has the same name as the `Wolfram` constants imported from `bot.constants`.
# From here on the module-level name `Wolfram` refers to the cog, so anything that needs the
# constants has to read them before this definition (as `APPID` and the cooldown mappings do).
class Wolfram(Cog):
    """Commands for interacting with the Wolfram|Alpha API."""

    # User-facing messages for the HTTP status codes that are treated as failures.
    _ERROR_MESSAGES = {
        501: "Failed to get response.",
        400: "No input found.",
        403: "Wolfram API key is invalid or missing.",
    }

    def __init__(self, bot: Bot):
        self.bot = bot

    @group(name="wolfram", aliases=("wolf", "wa"), invoke_without_command=True)
    @custom_cooldown(*STAFF_ROLES)
    async def wolfram_command(self, ctx: Context, *, query: str) -> None:
        """Requests all answers on a single image, sends an image of all related pods."""
        params = {
            "i": query,
            "appid": APPID,
            "location": "the moon",
            "latlong": "0.0,0.0",
            "ip": "1.1.1.1"
        }
        request_url = QUERY.format(request="simple")

        # Give feedback that the bot is working.
        async with ctx.typing():
            async with self.bot.http_session.get(url=request_url, params=params) as response:
                status = response.status
                image_bytes = await response.read()

            error_message = self._ERROR_MESSAGES.get(status)
            if error_message is not None:
                # The body of a failed request is not an image, so neither attach it as a file
                # nor reference it as the embed image; send the error text on its own.
                await send_embed(ctx, error_message, Colours.soft_red)
                return

            f = discord.File(BytesIO(image_bytes), filename="image.png")
            await send_embed(
                ctx,
                "",
                Colours.soft_orange,
                footer="View original for a bigger picture.",
                img_url="attachment://image.png",
                f=f
            )

    @wolfram_command.command(name="page", aliases=("pa", "p"))
    @custom_cooldown(*STAFF_ROLES)
    async def wolfram_page_command(self, ctx: Context, *, query: str) -> None:
        """
        Requests the result images for the given query and shows them as paginated pages.

        Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc.
        """
        pages = await get_pod_pages(ctx, self.bot, query)

        # Nothing to paginate; get_pod_pages has already informed the user where applicable.
        if not pages:
            return

        embed = Embed(colour=Colours.soft_orange)
        embed.set_author(
            name="Wolfram Alpha",
            icon_url=WOLF_IMAGE,
            url="https://www.wolframalpha.com/"
        )

        await ImagePaginator.paginate(pages, ctx, embed)

    @wolfram_command.command(name="cut", aliases=("c",))
    @custom_cooldown(*STAFF_ROLES)
    async def wolfram_cut_command(self, ctx: Context, *, query: str) -> None:
        """
        Requests a single result image for the given query.

        Sends the second result page when there is more than one, otherwise the first.
        Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc.
        """
        pages = await get_pod_pages(ctx, self.bot, query)

        if not pages:
            return

        # Prefer the second page when it exists, falling back to the only page otherwise.
        title, img_url = pages[1] if len(pages) >= 2 else pages[0]

        await send_embed(ctx, title, colour=Colours.soft_orange, img_url=img_url)

    @wolfram_command.command(name="short", aliases=("sh", "s"))
    @custom_cooldown(*STAFF_ROLES)
    async def wolfram_short_command(self, ctx: Context, *, query: str) -> None:
        """Requests an answer to a simple question."""
        params = {
            "i": query,
            "appid": APPID,
            "location": "the moon",
            "latlong": "0.0,0.0",
            "ip": "1.1.1.1"
        }
        request_url = QUERY.format(request="result")

        # Give feedback that the bot is working.
        async with ctx.typing():
            async with self.bot.http_session.get(url=request_url, params=params) as response:
                status = response.status
                response_text = await response.text()

            message = self._ERROR_MESSAGES.get(status)
            if message is None and response_text == "Error 1: Invalid appid.":
                # Keep recognising an invalid key by the response body as well as by HTTP 403,
                # so the check does not depend on the exact wording alone.
                message = self._ERROR_MESSAGES[403]

            if message is not None:
                color = Colours.soft_red
            else:
                message = response_text
                color = Colours.soft_orange

            await send_embed(ctx, message, color)
def setup(bot: Bot) -> None:
    """Create the Wolfram cog and register it with the bot."""
    wolfram_cog = Wolfram(bot)
    bot.add_cog(wolfram_cog)
|