aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar sco1 <[email protected]>2019-09-09 21:59:04 -0400
committerGravatar sco1 <[email protected]>2019-09-09 21:59:04 -0400
commit63bdc14a66ee24c79fd8a012eada0e75753e33d5 (patch)
tree061caf1aa3e047b0ad63780bcac51a6796d23049
parentUpdate linting dependencies & rules (diff)
Docstring linting chunk 1
-rw-r--r--bot/__init__.py2
-rw-r--r--bot/api.py48
-rw-r--r--bot/cogs/defcon.py51
-rw-r--r--bot/cogs/logging.py14
-rw-r--r--bot/cogs/tags.py53
-rw-r--r--bot/cogs/token_remover.py21
-rw-r--r--bot/cogs/wolfram.py66
7 files changed, 114 insertions, 141 deletions
diff --git a/bot/__init__.py b/bot/__init__.py
index 8efa5e53c..d094e8c13 100644
--- a/bot/__init__.py
+++ b/bot/__init__.py
@@ -9,7 +9,7 @@ logging.TRACE = 5
logging.addLevelName(logging.TRACE, "TRACE")
-def monkeypatch_trace(self, msg, *args, **kwargs):
+def monkeypatch_trace(self: logging.Logger, msg: str, *args, **kwargs) -> None:
"""
Log 'msg % args' with severity 'TRACE'.
diff --git a/bot/api.py b/bot/api.py
index cd19896e1..b714bda24 100644
--- a/bot/api.py
+++ b/bot/api.py
@@ -1,5 +1,6 @@
import asyncio
import logging
+from typing import Union
from urllib.parse import quote as quote_url
import aiohttp
@@ -10,11 +11,15 @@ log = logging.getLogger(__name__)
class ResponseCodeError(ValueError):
+ """Represent a not-OK response code."""
+
def __init__(self, response: aiohttp.ClientResponse):
self.response = response
class APIClient:
+ """Django Site API wrapper."""
+
def __init__(self, **kwargs):
auth_headers = {
'Authorization': f"Token {Keys.site_api}"
@@ -28,34 +33,40 @@ class APIClient:
self.session = aiohttp.ClientSession(**kwargs)
@staticmethod
- def _url_for(endpoint: str):
+ def _url_for(endpoint: str) -> str:
return f"{URLs.site_schema}{URLs.site_api}/{quote_url(endpoint)}"
- def maybe_raise_for_status(self, response: aiohttp.ClientResponse, should_raise: bool):
+ def maybe_raise_for_status(self, response: aiohttp.ClientResponse, should_raise: bool) -> None:
+ """Raise ResponseCodeError for non-OK response if an exception should be raised."""
if should_raise and response.status >= 400:
raise ResponseCodeError(response=response)
- async def get(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs):
+ async def get(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs) -> dict:
+ """Site API GET."""
async with self.session.get(self._url_for(endpoint), *args, **kwargs) as resp:
self.maybe_raise_for_status(resp, raise_for_status)
return await resp.json()
- async def patch(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs):
+ async def patch(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs) -> dict:
+ """Site API PATCH."""
async with self.session.patch(self._url_for(endpoint), *args, **kwargs) as resp:
self.maybe_raise_for_status(resp, raise_for_status)
return await resp.json()
- async def post(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs):
+ async def post(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs) -> dict:
+ """Site API POST."""
async with self.session.post(self._url_for(endpoint), *args, **kwargs) as resp:
self.maybe_raise_for_status(resp, raise_for_status)
return await resp.json()
- async def put(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs):
+ async def put(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs) -> dict:
+ """Site API PUT."""
async with self.session.put(self._url_for(endpoint), *args, **kwargs) as resp:
self.maybe_raise_for_status(resp, raise_for_status)
return await resp.json()
- async def delete(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs):
+ async def delete(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs) -> Union[dict, None]:
+ """Site API DELETE."""
async with self.session.delete(self._url_for(endpoint), *args, **kwargs) as resp:
if resp.status == 204:
return None
@@ -65,6 +76,12 @@ class APIClient:
def loop_is_running() -> bool:
+ """
+ Determine if there is a running asyncio event loop.
+
+ This helps enable "call this when event loop is running" logic (see: Twisted's `callWhenRunning`),
+ which is currently not provided by asyncio
+ """
# asyncio does not have a way to say "call this when the event
# loop is running", see e.g. `callWhenRunning` from twisted.
@@ -76,6 +93,8 @@ def loop_is_running() -> bool:
class APILoggingHandler(logging.StreamHandler):
+ """Site API logging handler."""
+
def __init__(self, client: APIClient):
logging.StreamHandler.__init__(self)
self.client = client
@@ -84,7 +103,8 @@ class APILoggingHandler(logging.StreamHandler):
# on the event loop yet - scheduled when the event loop is ready.
self.queue = []
- async def ship_off(self, payload: dict):
+ async def ship_off(self, payload: dict) -> None:
+ """Ship log payload to the logging API."""
try:
await self.client.post('logs', json=payload)
except ResponseCodeError as err:
@@ -100,7 +120,14 @@ class APILoggingHandler(logging.StreamHandler):
extra={'via_handler': True}
)
- def emit(self, record: logging.LogRecord):
+ def emit(self, record: logging.LogRecord) -> None:
+ """
+ Determine if a log record should be shipped to the logging API.
+
+ The following two conditions are set:
+ 1. Do not log anything below DEBUG
+ 2. Ignore log records from the logging handler
+ """
# Two checks are performed here:
if (
# 1. Do not log anything below `DEBUG`. This is only applicable
@@ -131,7 +158,8 @@ class APILoggingHandler(logging.StreamHandler):
asyncio.create_task(task)
self.schedule_queued_tasks()
- def schedule_queued_tasks(self):
+ def schedule_queued_tasks(self) -> None:
+ """Logging task scheduler."""
for task in self.queue:
asyncio.create_task(task)
diff --git a/bot/cogs/defcon.py b/bot/cogs/defcon.py
index c67fa2807..fc4dca0ee 100644
--- a/bot/cogs/defcon.py
+++ b/bot/cogs/defcon.py
@@ -25,7 +25,8 @@ BASE_CHANNEL_TOPIC = "Python Discord Defense Mechanism"
class Defcon:
- """Time-sensitive server defense mechanisms"""
+ """Time-sensitive server defense mechanisms."""
+
days = None # type: timedelta
enabled = False # type: bool
@@ -36,9 +37,11 @@ class Defcon:
@property
def mod_log(self) -> ModLog:
+ """Get currently loaded ModLog cog instance."""
return self.bot.get_cog("ModLog")
- async def on_ready(self):
+ async def on_ready(self) -> None:
+ """On cog load, try to synchronize DEFCON settings to the API."""
try:
response = await self.bot.api_client.get('bot/bot-settings/defcon')
data = response['data']
@@ -62,7 +65,8 @@ class Defcon:
await self.update_channel_topic()
- async def on_member_join(self, member: Member):
+ async def on_member_join(self, member: Member) -> None:
+ """If DEFON is enabled, check newly joining users to see if they meet the account age threshold."""
if self.enabled and self.days.days > 0:
now = datetime.utcnow()
@@ -95,21 +99,19 @@ class Defcon:
@group(name='defcon', aliases=('dc',), invoke_without_command=True)
@with_role(Roles.admin, Roles.owner)
- async def defcon_group(self, ctx: Context):
+ async def defcon_group(self, ctx: Context) -> None:
"""Check the DEFCON status or run a subcommand."""
-
await ctx.invoke(self.bot.get_command("help"), "defcon")
@defcon_group.command(name='enable', aliases=('on', 'e'))
@with_role(Roles.admin, Roles.owner)
- async def enable_command(self, ctx: Context):
+ async def enable_command(self, ctx: Context) -> None:
"""
Enable DEFCON mode. Useful in a pinch, but be sure you know what you're doing!
- Currently, this just adds an account age requirement. Use !defcon days <int> to set how old an account must
- be, in days.
+ Currently, this just adds an account age requirement. Use !defcon days <int> to set how old an account must be,
+ in days.
"""
-
self.enabled = True
try:
@@ -156,11 +158,8 @@ class Defcon:
@defcon_group.command(name='disable', aliases=('off', 'd'))
@with_role(Roles.admin, Roles.owner)
- async def disable_command(self, ctx: Context):
- """
- Disable DEFCON mode. Useful in a pinch, but be sure you know what you're doing!
- """
-
+ async def disable_command(self, ctx: Context) -> None:
+ """Disable DEFCON mode. Useful in a pinch, but be sure you know what you're doing!"""
self.enabled = False
try:
@@ -202,11 +201,8 @@ class Defcon:
@defcon_group.command(name='status', aliases=('s',))
@with_role(Roles.admin, Roles.owner)
- async def status_command(self, ctx: Context):
- """
- Check the current status of DEFCON mode.
- """
-
+ async def status_command(self, ctx: Context) -> None:
+ """Check the current status of DEFCON mode."""
embed = Embed(
colour=Colour.blurple(), title="DEFCON Status",
description=f"**Enabled:** {self.enabled}\n"
@@ -217,11 +213,8 @@ class Defcon:
@defcon_group.command(name='days')
@with_role(Roles.admin, Roles.owner)
- async def days_command(self, ctx: Context, days: int):
- """
- Set how old an account must be to join the server, in days, with DEFCON mode enabled.
- """
-
+ async def days_command(self, ctx: Context, days: int) -> None:
+ """Set how old an account must be to join the server, in days, with DEFCON mode enabled."""
self.days = timedelta(days=days)
try:
@@ -266,11 +259,8 @@ class Defcon:
await self.update_channel_topic()
- async def update_channel_topic(self):
- """
- Update the #defcon channel topic with the current DEFCON status
- """
-
+ async def update_channel_topic(self) -> None:
+ """Update the #defcon channel topic with the current DEFCON status."""
if self.enabled:
day_str = "days" if self.days.days > 1 else "day"
new_topic = f"{BASE_CHANNEL_TOPIC}\n(Status: Enabled, Threshold: {self.days.days} {day_str})"
@@ -282,6 +272,7 @@ class Defcon:
await defcon_channel.edit(topic=new_topic)
-def setup(bot: Bot):
+def setup(bot: Bot) -> None:
+ """DEFCON cog load."""
bot.add_cog(Defcon(bot))
log.info("Cog loaded: Defcon")
diff --git a/bot/cogs/logging.py b/bot/cogs/logging.py
index 6b8462f3b..22d770c04 100644
--- a/bot/cogs/logging.py
+++ b/bot/cogs/logging.py
@@ -10,27 +10,27 @@ log = logging.getLogger(__name__)
class Logging:
- """
- Debug logging module
- """
+ """Debug logging module."""
def __init__(self, bot: Bot):
self.bot = bot
- async def on_ready(self):
+ async def on_ready(self) -> None:
+ """Announce our presence to the configured devlog channel."""
log.info("Bot connected!")
embed = Embed(description="Connected!")
embed.set_author(
name="Python Bot",
- url="https://gitlab.com/discord-python/projects/bot",
- icon_url="https://gitlab.com/python-discord/branding/raw/master/logos/logo_circle/logo_circle.png"
+ url="https://github.com/python-discord/bot",
+ icon_url="https://github.com/python-discord/branding/blob/master/logos/logo_circle/logo_circle_256.png"
)
if not DEBUG_MODE:
await self.bot.get_channel(Channels.devlog).send(embed=embed)
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Logging cog load."""
bot.add_cog(Logging(bot))
log.info("Cog loaded: Logging")
diff --git a/bot/cogs/tags.py b/bot/cogs/tags.py
index 7b1003148..3a93f0d47 100644
--- a/bot/cogs/tags.py
+++ b/bot/cogs/tags.py
@@ -20,9 +20,7 @@ TEST_CHANNELS = (
class Tags:
- """
- Save new tags and fetch existing tags.
- """
+ """Save new tags and fetch existing tags."""
def __init__(self, bot: Bot):
self.bot = bot
@@ -30,32 +28,19 @@ class Tags:
self.headers = {"Authorization": f"Token {Keys.site_api}"}
@group(name='tags', aliases=('tag', 't'), hidden=True, invoke_without_command=True)
- async def tags_group(self, ctx: Context, *, tag_name: TagNameConverter = None):
+ async def tags_group(self, ctx: Context, *, tag_name: TagNameConverter = None) -> None:
"""Show all known tags, a single tag, or run a subcommand."""
-
await ctx.invoke(self.get_command, tag_name=tag_name)
@tags_group.command(name='get', aliases=('show', 'g'))
- async def get_command(self, ctx: Context, *, tag_name: TagNameConverter = None):
- """
- Get a list of all tags or a specified tag.
-
- :param ctx: Discord message context
- :param tag_name:
- If provided, this function shows data for that specific tag.
- If not provided, this function shows the caller a list of all tags.
- """
-
- def _command_on_cooldown(tag_name) -> bool:
+ async def get_command(self, ctx: Context, *, tag_name: TagNameConverter = None) -> None:
+ """Get a specified tag, or a list of all tags if no tag is specified."""
+ def _command_on_cooldown(tag_name: str) -> bool:
"""
- Check if the command is currently on cooldown.
- The cooldown duration is set in constants.py.
+ Check if the command is currently on cooldown, on a per-tag, per-channel basis.
- This works on a per-tag, per-channel basis.
- :param tag_name: The name of the command to check.
- :return: True if the command is cooling down. Otherwise False.
+ The cooldown duration is set in constants.py.
"""
-
now = time.time()
cooldown_conditions = (
@@ -110,15 +95,8 @@ class Tags:
tag_name: TagNameConverter,
*,
tag_content: TagContentConverter,
- ):
- """
- Create a new tag or update an existing one.
-
- :param ctx: discord message context
- :param tag_name: The name of the tag to create or edit.
- :param tag_content: The content of the tag.
- """
-
+ ) -> None:
+ """Create a new tag or update an existing one."""
body = {
'title': tag_name.lower().strip(),
'embed': {
@@ -141,14 +119,8 @@ class Tags:
@tags_group.command(name='delete', aliases=('remove', 'rm', 'd'))
@with_role(Roles.admin, Roles.owner)
- async def delete_command(self, ctx: Context, *, tag_name: TagNameConverter):
- """
- Remove a tag from the database.
-
- :param ctx: discord message context
- :param tag_name: The name of the tag to delete.
- """
-
+ async def delete_command(self, ctx: Context, *, tag_name: TagNameConverter) -> None:
+ """Remove a tag from the database."""
await self.bot.api_client.delete(f'bot/tags/{tag_name}')
log.debug(f"{ctx.author} successfully deleted the tag called '{tag_name}'")
@@ -159,6 +131,7 @@ class Tags:
))
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Tags cog load."""
bot.add_cog(Tags(bot))
log.info("Cog loaded: Tags")
diff --git a/bot/cogs/token_remover.py b/bot/cogs/token_remover.py
index 05298a2ff..7e9f5ef84 100644
--- a/bot/cogs/token_remover.py
+++ b/bot/cogs/token_remover.py
@@ -44,9 +44,15 @@ class TokenRemover:
@property
def mod_log(self) -> ModLog:
+ """Get currently loaded ModLog cog instance."""
return self.bot.get_cog("ModLog")
- async def on_message(self, msg: Message):
+ async def on_message(self, msg: Message) -> None:
+ """
+ Check each message for a string that matches Discord's token pattern.
+
+ See: https://discordapp.com/developers/docs/reference#snowflakes
+ """
if msg.author.bot:
return
@@ -83,6 +89,11 @@ class TokenRemover:
@staticmethod
def is_valid_user_id(b64_content: str) -> bool:
+ """
+ Check potential token to see if it contains a valid Discord user ID.
+
+ See: https://discordapp.com/developers/docs/reference#snowflakes
+ """
b64_content += '=' * (-len(b64_content) % 4)
try:
@@ -93,6 +104,11 @@ class TokenRemover:
@staticmethod
def is_valid_timestamp(b64_content: str) -> bool:
+ """
+ Check potential token to see if it contains a valid timestamp.
+
+ See: https://discordapp.com/developers/docs/reference#snowflakes
+ """
b64_content += '=' * (-len(b64_content) % 4)
try:
@@ -103,6 +119,7 @@ class TokenRemover:
return snowflake_time(snowflake + TOKEN_EPOCH) < DISCORD_EPOCH_TIMESTAMP
-def setup(bot: Bot):
+def setup(bot: Bot) -> None:
+ """Token Remover cog load."""
bot.add_cog(TokenRemover(bot))
log.info("Cog loaded: TokenRemover")
diff --git a/bot/cogs/wolfram.py b/bot/cogs/wolfram.py
index e8b16b243..0093a1615 100644
--- a/bot/cogs/wolfram.py
+++ b/bot/cogs/wolfram.py
@@ -6,7 +6,7 @@ from urllib import parse
import discord
from discord import Embed
from discord.ext import commands
-from discord.ext.commands import BucketType, Context, check, group
+from discord.ext.commands import Bot, BucketType, Context, check, group
from bot.constants import Colours, STAFF_ROLES, Wolfram
from bot.pagination import ImagePaginator
@@ -35,18 +35,7 @@ async def send_embed(
img_url: str = None,
f: discord.File = None
) -> None:
- """
- Generates an embed with wolfram as the author, with message_txt as description,
- adds custom colour if specified, a footer and image (could be a file with f param) and sends
- the embed through ctx
- :param ctx: Context
- :param message_txt: str - Message to be sent
- :param colour: int - Default: Colours.soft_red - Colour of embed
- :param footer: str - Default: None - Adds a footer to the embed
- :param img_url:str - Default: None - Adds an image to the embed
- :param f: discord.File - Default: None - Add a file to the msg, often attached as image to embed
- """
-
+ """Generate & send a response embed with Wolfram as the author."""
embed = Embed(colour=colour)
embed.description = message_txt
embed.set_author(name="Wolfram Alpha",
@@ -63,14 +52,10 @@ async def send_embed(
def custom_cooldown(*ignore: List[int]) -> check:
"""
- Custom cooldown mapping that applies a specific requests per day to users.
- Staff is ignored by the user cooldown, however the cooldown implements a
- total amount of uses per day for the entire guild. (Configurable in configs)
+ Implement per-user and per-guild cooldowns for requests to the Wolfram API.
- :param ignore: List[int] -- list of ids of roles to be ignored by user cooldown
- :return: check
+ A list of roles may be provided to ignore the per-user cooldown
"""
-
async def predicate(ctx: Context) -> bool:
user_bucket = usercd.get_bucket(ctx.message)
@@ -105,8 +90,8 @@ def custom_cooldown(*ignore: List[int]) -> check:
return check(predicate)
-async def get_pod_pages(ctx, bot, query: str) -> Optional[List[Tuple]]:
- # Give feedback that the bot is working.
+async def get_pod_pages(ctx: Context, bot: Bot, query: str) -> Optional[List[Tuple]]:
+ """Give feedback that the bot is working."""
async with ctx.channel.typing():
url_str = parse.urlencode({
"input": query,
@@ -150,9 +135,7 @@ async def get_pod_pages(ctx, bot, query: str) -> Optional[List[Tuple]]:
class Wolfram:
- """
- Commands for interacting with the Wolfram|Alpha API.
- """
+ """Commands for interacting with the Wolfram|Alpha API."""
def __init__(self, bot: commands.Bot):
self.bot = bot
@@ -160,14 +143,7 @@ class Wolfram:
@group(name="wolfram", aliases=("wolf", "wa"), invoke_without_command=True)
@custom_cooldown(*STAFF_ROLES)
async def wolfram_command(self, ctx: Context, *, query: str) -> None:
- """
- Requests all answers on a single image,
- sends an image of all related pods
-
- :param ctx: Context
- :param query: str - string request to api
- """
-
+ """Requests all answers on a single image, sends an image of all related pods."""
url_str = parse.urlencode({
"i": query,
"appid": APPID,
@@ -203,13 +179,10 @@ class Wolfram:
@custom_cooldown(*STAFF_ROLES)
async def wolfram_page_command(self, ctx: Context, *, query: str) -> None:
"""
- Requests a drawn image of given query
- Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc
+ Requests a drawn image of given query.
- :param ctx: Context
- :param query: str - string request to api
+ Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc.
"""
-
pages = await get_pod_pages(ctx, self.bot, query)
if not pages:
@@ -225,15 +198,12 @@ class Wolfram:
@wolfram_command.command(name="cut", aliases=("c",))
@custom_cooldown(*STAFF_ROLES)
- async def wolfram_cut_command(self, ctx, *, query: str) -> None:
+ async def wolfram_cut_command(self, ctx: Context, *, query: str) -> None:
"""
- Requests a drawn image of given query
- Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc
+ Requests a drawn image of given query.
- :param ctx: Context
- :param query: str - string request to api
+ Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc.
"""
-
pages = await get_pod_pages(ctx, self.bot, query)
if not pages:
@@ -249,14 +219,7 @@ class Wolfram:
@wolfram_command.command(name="short", aliases=("sh", "s"))
@custom_cooldown(*STAFF_ROLES)
async def wolfram_short_command(self, ctx: Context, *, query: str) -> None:
- """
- Requests an answer to a simple question
- Responds in plaintext
-
- :param ctx: Context
- :param query: str - string request to api
- """
-
+ """Requests an answer to a simple question."""
url_str = parse.urlencode({
"i": query,
"appid": APPID,
@@ -284,5 +247,6 @@ class Wolfram:
def setup(bot: commands.Bot) -> None:
+ """Wolfram cog load."""
bot.add_cog(Wolfram(bot))
log.info("Cog loaded: Wolfram")