aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar scragly <[email protected]>2019-09-23 16:51:22 +1000
committerGravatar GitHub <[email protected]>2019-09-23 16:51:22 +1000
commitd49befb1800135000f324588c593acdb2d1bebcb (patch)
tree38d3475bcd71b55264acf90a23c49bca93774e7a
parentFix date formatting bug in infraction search (diff)
parentApply suggestions from code review (diff)
Update linting (#406)
Update linting
-rw-r--r--Pipfile2
-rw-r--r--Pipfile.lock31
-rw-r--r--bot/__init__.py2
-rw-r--r--bot/api.py61
-rw-r--r--bot/cogs/alias.py145
-rw-r--r--bot/cogs/antispam.py6
-rw-r--r--bot/cogs/bot.py71
-rw-r--r--bot/cogs/clean.py97
-rw-r--r--bot/cogs/cogs.py31
-rw-r--r--bot/cogs/defcon.py51
-rw-r--r--bot/cogs/doc.py133
-rw-r--r--bot/cogs/error_handler.py31
-rw-r--r--bot/cogs/eval.py24
-rw-r--r--bot/cogs/filtering.py65
-rw-r--r--bot/cogs/free.py9
-rw-r--r--bot/cogs/help.py401
-rw-r--r--bot/cogs/information.py32
-rw-r--r--bot/cogs/jams.py15
-rw-r--r--bot/cogs/logging.py10
-rw-r--r--bot/cogs/moderation.py251
-rw-r--r--bot/cogs/modlog.py69
-rw-r--r--bot/cogs/off_topic_names.py42
-rw-r--r--bot/cogs/reddit.py78
-rw-r--r--bot/cogs/reminders.py103
-rw-r--r--bot/cogs/security.py13
-rw-r--r--bot/cogs/site.py28
-rw-r--r--bot/cogs/snekbox.py15
-rw-r--r--bot/cogs/superstarify/__init__.py56
-rw-r--r--bot/cogs/superstarify/stars.py3
-rw-r--r--bot/cogs/sync/__init__.py5
-rw-r--r--bot/cogs/sync/cog.py2
-rw-r--r--bot/cogs/sync/syncers.py7
-rw-r--r--bot/cogs/tags.py53
-rw-r--r--bot/cogs/token_remover.py21
-rw-r--r--bot/cogs/utils.py34
-rw-r--r--bot/cogs/verification.py53
-rw-r--r--bot/cogs/watchchannels/__init__.py5
-rw-r--r--bot/cogs/watchchannels/bigbrother.py4
-rw-r--r--bot/cogs/watchchannels/talentpool.py6
-rw-r--r--bot/cogs/watchchannels/watchchannel.py14
-rw-r--r--bot/cogs/wolfram.py70
-rw-r--r--bot/converters.py73
-rw-r--r--bot/decorators.py57
-rw-r--r--bot/interpreter.py14
-rw-r--r--bot/pagination.py169
-rw-r--r--bot/patches/__init__.py2
-rw-r--r--bot/patches/message_edited_at.py6
-rw-r--r--bot/rules/attachments.py8
-rw-r--r--bot/rules/burst.py8
-rw-r--r--bot/rules/burst_shared.py8
-rw-r--r--bot/rules/chars.py8
-rw-r--r--bot/rules/discord_emojis.py8
-rw-r--r--bot/rules/duplicates.py8
-rw-r--r--bot/rules/links.py8
-rw-r--r--bot/rules/mentions.py8
-rw-r--r--bot/rules/newlines.py8
-rw-r--r--bot/rules/role_mentions.py8
-rw-r--r--bot/utils/__init__.py37
-rw-r--r--bot/utils/checks.py18
-rw-r--r--bot/utils/messages.py63
-rw-r--r--bot/utils/scheduling.py43
-rw-r--r--bot/utils/time.py44
-rw-r--r--tox.ini19
63 files changed, 1066 insertions, 1708 deletions
diff --git a/Pipfile b/Pipfile
index 273db04d2..6a58054c1 100644
--- a/Pipfile
+++ b/Pipfile
@@ -23,7 +23,9 @@ urllib3 = ">=1.24.2,<1.25"
[dev-packages]
flake8 = "~=3.7"
+flake8-annotations = "~=1.0"
flake8-bugbear = "~=19.8"
+flake8-docstrings = "~=1.4"
flake8-import-order = "~=0.18"
flake8-string-format = "~=0.2"
flake8-tidy-imports = "~=2.0"
diff --git a/Pipfile.lock b/Pipfile.lock
index e5978198f..9bdcff923 100644
--- a/Pipfile.lock
+++ b/Pipfile.lock
@@ -1,7 +1,7 @@
{
"_meta": {
"hash": {
- "sha256": "f21c27a5c4493b65a36a78721c2cb597c3eed7fcbd28f3bf731453f2c3cccb56"
+ "sha256": "29aaaa90a070d544e5b39fb6033410daa9bb7f658077205e44099f3175f6822b"
},
"pipfile-spec": 6,
"requires": {
@@ -700,6 +700,14 @@
"index": "pypi",
"version": "==3.7.8"
},
+ "flake8-annotations": {
+ "hashes": [
+ "sha256:1309f2bc9853a2d77d578b089d331b0b832b40c97932641e136e1b49d3650c82",
+ "sha256:3ecdd27054c3eed6484139025698465e3c9f4e68dbd5043d0204fcb2550ee27b"
+ ],
+ "index": "pypi",
+ "version": "==1.0.0"
+ },
"flake8-bugbear": {
"hashes": [
"sha256:d8c466ea79d5020cb20bf9f11cf349026e09517a42264f313d3f6fddb83e0571",
@@ -708,6 +716,14 @@
"index": "pypi",
"version": "==19.8.0"
},
+ "flake8-docstrings": {
+ "hashes": [
+ "sha256:1666dd069c9c457ee57e80af3c1a6b37b00cc1801c6fde88e455131bb2e186cd",
+ "sha256:9c0db5a79a1affd70fdf53b8765c8a26bf968e59e0252d7f2fc546b41c0cda06"
+ ],
+ "index": "pypi",
+ "version": "==1.4.0"
+ },
"flake8-import-order": {
"hashes": [
"sha256:90a80e46886259b9c396b578d75c749801a41ee969a235e163cfe1be7afd2543",
@@ -818,6 +834,13 @@
],
"version": "==2.5.0"
},
+ "pydocstyle": {
+ "hashes": [
+ "sha256:04c84e034ebb56eb6396c820442b8c4499ac5eb94a3bda88951ac3dc519b6058",
+ "sha256:66aff87ffe34b1e49bff2dd03a88ce6843be2f3346b0c9814410d34987fbab59"
+ ],
+ "version": "==4.0.1"
+ },
"pyflakes": {
"hashes": [
"sha256:17dbeb2e3f4d772725c777fabc446d5634d1038f234e77343108ce445ea69ce0",
@@ -890,6 +913,12 @@
],
"version": "==1.12.0"
},
+ "snowballstemmer": {
+ "hashes": [
+ "sha256:713e53b79cbcf97bc5245a06080a33d54a77e7cce2f789c835a143bcdb5c033e"
+ ],
+ "version": "==1.9.1"
+ },
"toml": {
"hashes": [
"sha256:229f81c57791a41d65e399fc06bf0848bab550a9dfd5ed66df18ce5f05e73d5c",
diff --git a/bot/__init__.py b/bot/__init__.py
index 8efa5e53c..d094e8c13 100644
--- a/bot/__init__.py
+++ b/bot/__init__.py
@@ -9,7 +9,7 @@ logging.TRACE = 5
logging.addLevelName(logging.TRACE, "TRACE")
-def monkeypatch_trace(self, msg, *args, **kwargs):
+def monkeypatch_trace(self: logging.Logger, msg: str, *args, **kwargs) -> None:
"""
Log 'msg % args' with severity 'TRACE'.
diff --git a/bot/api.py b/bot/api.py
index 3acde242e..7f26e5305 100644
--- a/bot/api.py
+++ b/bot/api.py
@@ -11,6 +11,8 @@ log = logging.getLogger(__name__)
class ResponseCodeError(ValueError):
+ """Raised when a non-OK HTTP response is received."""
+
def __init__(
self,
response: aiohttp.ClientResponse,
@@ -28,6 +30,8 @@ class ResponseCodeError(ValueError):
class APIClient:
+ """Django Site API wrapper."""
+
def __init__(self, **kwargs):
auth_headers = {
'Authorization': f"Token {Keys.site_api}"
@@ -41,10 +45,11 @@ class APIClient:
self.session = aiohttp.ClientSession(**kwargs)
@staticmethod
- def _url_for(endpoint: str):
+ def _url_for(endpoint: str) -> str:
return f"{URLs.site_schema}{URLs.site_api}/{quote_url(endpoint)}"
- async def maybe_raise_for_status(self, response: aiohttp.ClientResponse, should_raise: bool):
+ async def maybe_raise_for_status(self, response: aiohttp.ClientResponse, should_raise: bool) -> None:
+ """Raise ResponseCodeError for non-OK response if an exception should be raised."""
if should_raise and response.status >= 400:
try:
response_json = await response.json()
@@ -53,27 +58,32 @@ class APIClient:
response_text = await response.text()
raise ResponseCodeError(response=response, response_text=response_text)
- async def get(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs):
+ async def get(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs) -> dict:
+ """Site API GET."""
async with self.session.get(self._url_for(endpoint), *args, **kwargs) as resp:
await self.maybe_raise_for_status(resp, raise_for_status)
return await resp.json()
- async def patch(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs):
+ async def patch(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs) -> dict:
+ """Site API PATCH."""
async with self.session.patch(self._url_for(endpoint), *args, **kwargs) as resp:
await self.maybe_raise_for_status(resp, raise_for_status)
return await resp.json()
- async def post(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs):
+ async def post(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs) -> dict:
+ """Site API POST."""
async with self.session.post(self._url_for(endpoint), *args, **kwargs) as resp:
await self.maybe_raise_for_status(resp, raise_for_status)
return await resp.json()
- async def put(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs):
+ async def put(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs) -> dict:
+ """Site API PUT."""
async with self.session.put(self._url_for(endpoint), *args, **kwargs) as resp:
await self.maybe_raise_for_status(resp, raise_for_status)
return await resp.json()
- async def delete(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs):
+ async def delete(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs) -> Optional[dict]:
+ """Site API DELETE."""
async with self.session.delete(self._url_for(endpoint), *args, **kwargs) as resp:
if resp.status == 204:
return None
@@ -83,9 +93,12 @@ class APIClient:
def loop_is_running() -> bool:
- # asyncio does not have a way to say "call this when the event
- # loop is running", see e.g. `callWhenRunning` from twisted.
+ """
+ Determine if there is a running asyncio event loop.
+ This helps enable "call this when event loop is running" logic (see: Twisted's `callWhenRunning`),
+ which is currently not provided by asyncio.
+ """
try:
asyncio.get_running_loop()
except RuntimeError:
@@ -94,6 +107,8 @@ def loop_is_running() -> bool:
class APILoggingHandler(logging.StreamHandler):
+ """Site API logging handler."""
+
def __init__(self, client: APIClient):
logging.StreamHandler.__init__(self)
self.client = client
@@ -102,7 +117,8 @@ class APILoggingHandler(logging.StreamHandler):
# on the event loop yet - scheduled when the event loop is ready.
self.queue = []
- async def ship_off(self, payload: dict):
+ async def ship_off(self, payload: dict) -> None:
+ """Ship log payload to the logging API."""
try:
await self.client.post('logs', json=payload)
except ResponseCodeError as err:
@@ -118,19 +134,19 @@ class APILoggingHandler(logging.StreamHandler):
extra={'via_handler': True}
)
- def emit(self, record: logging.LogRecord):
- # Two checks are performed here:
+ def emit(self, record: logging.LogRecord) -> None:
+ """
+ Determine if a log record should be shipped to the logging API.
+
+ If the asyncio event loop is not yet running, log records will instead be put in a queue
+ which will be consumed once the event loop is running.
+
+ The following two conditions are set:
+ 1. Do not log anything below DEBUG (only applies to the monkeypatched `TRACE` level)
+ 2. Ignore log records originating from this logging handler itself to prevent infinite recursion
+ """
if (
- # 1. Do not log anything below `DEBUG`. This is only applicable
- # for the monkeypatched `TRACE` logging level, which has a
- # lower numeric value than `DEBUG`.
record.levelno >= logging.DEBUG
- # 2. Ignore logging messages which are sent by this logging
- # handler itself. This is required because if we were to
- # not ignore messages emitted by this handler, we would
- # infinitely recurse back down into this logging handler,
- # making the reactor run like crazy, and eventually OOM
- # something. Let's not do that...
and not record.__dict__.get('via_handler')
):
payload = {
@@ -149,7 +165,8 @@ class APILoggingHandler(logging.StreamHandler):
asyncio.create_task(task)
self.schedule_queued_tasks()
- def schedule_queued_tasks(self):
+ def schedule_queued_tasks(self) -> None:
+ """Consume the queue and schedule the logging of each queued record."""
for task in self.queue:
asyncio.create_task(task)
diff --git a/bot/cogs/alias.py b/bot/cogs/alias.py
index 3d0c9d826..80ff37983 100644
--- a/bot/cogs/alias.py
+++ b/bot/cogs/alias.py
@@ -3,9 +3,7 @@ import logging
from typing import Union
from discord import Colour, Embed, Member, User
-from discord.ext.commands import (
- Cog, Command, Context, clean_content, command, group
-)
+from discord.ext.commands import Bot, Cog, Command, Context, clean_content, command, group
from bot.cogs.watchchannels.watchchannel import proxy_user
from bot.converters import TagNameConverter
@@ -14,26 +12,14 @@ from bot.pagination import LinePaginator
log = logging.getLogger(__name__)
-class Alias(Cog):
- """
- Aliases for more used commands
- """
+class Alias (Cog):
+ """Aliases for commonly used commands."""
- def __init__(self, bot):
+ def __init__(self, bot: Bot):
self.bot = bot
- async def invoke(self, ctx, cmd_name, *args, **kwargs):
- """
- Invokes a command with args and kwargs.
- Fail early through `command.can_run`, and logs warnings.
-
- :param ctx: Context instance for command call
- :param cmd_name: Name of command/subcommand to be invoked
- :param args: args to be passed to the command
- :param kwargs: kwargs to be passed to the command
- :return: None
- """
-
+ async def invoke(self, ctx: Context, cmd_name: str, *args, **kwargs) -> None:
+ """Invokes a command with args and kwargs."""
log.debug(f"{cmd_name} was invoked through an alias")
cmd = self.bot.get_command(cmd_name)
if not cmd:
@@ -46,9 +32,8 @@ class Alias(Cog):
await ctx.invoke(cmd, *args, **kwargs)
@command(name='aliases')
- async def aliases_command(self, ctx):
+ async def aliases_command(self, ctx: Context) -> None:
"""Show configured aliases on the bot."""
-
embed = Embed(
title='Configured aliases',
colour=Colour.blue()
@@ -64,148 +49,98 @@ class Alias(Cog):
)
@command(name="resources", aliases=("resource",), hidden=True)
- async def site_resources_alias(self, ctx):
- """
- Alias for invoking <prefix>site resources.
- """
-
+ async def site_resources_alias(self, ctx: Context) -> None:
+ """Alias for invoking <prefix>site resources."""
await self.invoke(ctx, "site resources")
@command(name="watch", hidden=True)
- async def bigbrother_watch_alias(self, ctx, user: Union[Member, User, proxy_user], *, reason: str):
- """
- Alias for invoking <prefix>bigbrother watch [user] [reason].
- """
-
+ async def bigbrother_watch_alias(self, ctx: Context, user: Union[Member, User, proxy_user], *, reason: str) -> None:
+ """Alias for invoking <prefix>bigbrother watch [user] [reason]."""
await self.invoke(ctx, "bigbrother watch", user, reason=reason)
@command(name="unwatch", hidden=True)
- async def bigbrother_unwatch_alias(self, ctx, user: Union[User, proxy_user], *, reason: str):
- """
- Alias for invoking <prefix>bigbrother unwatch [user] [reason].
- """
-
+ async def bigbrother_unwatch_alias(self, ctx: Context, user: Union[User, proxy_user], *, reason: str) -> None:
+ """Alias for invoking <prefix>bigbrother unwatch [user] [reason]."""
await self.invoke(ctx, "bigbrother unwatch", user, reason=reason)
@command(name="home", hidden=True)
- async def site_home_alias(self, ctx):
- """
- Alias for invoking <prefix>site home.
- """
-
+ async def site_home_alias(self, ctx: Context) -> None:
+ """Alias for invoking <prefix>site home."""
await self.invoke(ctx, "site home")
@command(name="faq", hidden=True)
- async def site_faq_alias(self, ctx):
- """
- Alias for invoking <prefix>site faq.
- """
-
+ async def site_faq_alias(self, ctx: Context) -> None:
+ """Alias for invoking <prefix>site faq."""
await self.invoke(ctx, "site faq")
@command(name="rules", hidden=True)
- async def site_rules_alias(self, ctx):
- """
- Alias for invoking <prefix>site rules.
- """
-
+ async def site_rules_alias(self, ctx: Context) -> None:
+ """Alias for invoking <prefix>site rules."""
await self.invoke(ctx, "site rules")
@command(name="reload", hidden=True)
- async def cogs_reload_alias(self, ctx, *, cog_name: str):
- """
- Alias for invoking <prefix>cogs reload [cog_name].
-
- cog_name: str - name of the cog to be reloaded.
- """
-
+ async def cogs_reload_alias(self, ctx: Context, *, cog_name: str) -> None:
+ """Alias for invoking <prefix>cogs reload [cog_name]."""
await self.invoke(ctx, "cogs reload", cog_name)
@command(name="defon", hidden=True)
- async def defcon_enable_alias(self, ctx):
- """
- Alias for invoking <prefix>defcon enable.
- """
-
+ async def defcon_enable_alias(self, ctx: Context) -> None:
+ """Alias for invoking <prefix>defcon enable."""
await self.invoke(ctx, "defcon enable")
@command(name="defoff", hidden=True)
- async def defcon_disable_alias(self, ctx):
- """
- Alias for invoking <prefix>defcon disable.
- """
-
+ async def defcon_disable_alias(self, ctx: Context) -> None:
+ """Alias for invoking <prefix>defcon disable."""
await self.invoke(ctx, "defcon disable")
@command(name="exception", hidden=True)
- async def tags_get_traceback_alias(self, ctx):
- """
- Alias for invoking <prefix>tags get traceback.
- """
-
+ async def tags_get_traceback_alias(self, ctx: Context) -> None:
+ """Alias for invoking <prefix>tags get traceback."""
await self.invoke(ctx, "tags get", tag_name="traceback")
@group(name="get",
aliases=("show", "g"),
hidden=True,
invoke_without_command=True)
- async def get_group_alias(self, ctx):
- """
- Group for reverse aliases for commands like `tags get`,
- allowing for `get tags` or `get docs`.
- """
-
+ async def get_group_alias(self, ctx: Context) -> None:
+ """Group for reverse aliases for commands like `tags get`, allowing for `get tags` or `get docs`."""
pass
@get_group_alias.command(name="tags", aliases=("tag", "t"), hidden=True)
async def tags_get_alias(
self, ctx: Context, *, tag_name: TagNameConverter = None
- ):
+ ) -> None:
"""
Alias for invoking <prefix>tags get [tag_name].
tag_name: str - tag to be viewed.
"""
-
await self.invoke(ctx, "tags get", tag_name=tag_name)
@get_group_alias.command(name="docs", aliases=("doc", "d"), hidden=True)
async def docs_get_alias(
self, ctx: Context, symbol: clean_content = None
- ):
- """
- Alias for invoking <prefix>docs get [symbol].
-
- symbol: str - name of doc to be viewed.
- """
-
+ ) -> None:
+ """Alias for invoking <prefix>docs get [symbol]."""
await self.invoke(ctx, "docs get", symbol)
@command(name="nominate", hidden=True)
- async def nomination_add_alias(self, ctx, user: Union[Member, User, proxy_user], *, reason: str):
- """
- Alias for invoking <prefix>talentpool add [user] [reason].
- """
-
+ async def nomination_add_alias(self, ctx: Context, user: Union[Member, User, proxy_user], *, reason: str) -> None:
+ """Alias for invoking <prefix>talentpool add [user] [reason]."""
await self.invoke(ctx, "talentpool add", user, reason=reason)
@command(name="unnominate", hidden=True)
- async def nomination_end_alias(self, ctx, user: Union[User, proxy_user], *, reason: str):
- """
- Alias for invoking <prefix>nomination end [user] [reason].
- """
-
+ async def nomination_end_alias(self, ctx: Context, user: Union[User, proxy_user], *, reason: str) -> None:
+ """Alias for invoking <prefix>nomination end [user] [reason]."""
await self.invoke(ctx, "nomination end", user, reason=reason)
@command(name="nominees", hidden=True)
- async def nominees_alias(self, ctx):
- """
- Alias for invoking <prefix>tp watched.
- """
-
+ async def nominees_alias(self, ctx: Context) -> None:
+ """Alias for invoking <prefix>tp watched."""
await self.invoke(ctx, "talentpool watched")
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Alias cog load."""
bot.add_cog(Alias(bot))
log.info("Cog loaded: Alias")
diff --git a/bot/cogs/antispam.py b/bot/cogs/antispam.py
index 7b97881fd..7a3360436 100644
--- a/bot/cogs/antispam.py
+++ b/bot/cogs/antispam.py
@@ -113,7 +113,7 @@ class AntiSpam(Cog):
return self.bot.get_cog("ModLog")
@Cog.listener()
- async def on_ready(self):
+ async def on_ready(self) -> None:
"""Unloads the cog and alerts admins if configuration validation failed."""
if self.validation_errors:
body = "**The following errors were encountered:**\n"
@@ -221,9 +221,7 @@ class AntiSpam(Cog):
async def maybe_delete_messages(self, channel: TextChannel, messages: List[Message]) -> None:
"""Cleans the messages if cleaning is configured."""
-
if AntiSpamConfig.clean_offending:
-
# If we have more than one message, we can use bulk delete.
if len(messages) > 1:
message_ids = [message.id for message in messages]
@@ -274,7 +272,7 @@ def validate_config(rules: Mapping = AntiSpamConfig.rules) -> Dict[str, str]:
def setup(bot: Bot) -> None:
- """Setup for the cog."""
+ """Antispam cog load."""
validation_errors = validate_config()
bot.add_cog(AntiSpam(bot, validation_errors))
log.info("Cog loaded: AntiSpam")
diff --git a/bot/cogs/bot.py b/bot/cogs/bot.py
index 577865a65..324d2ccd3 100644
--- a/bot/cogs/bot.py
+++ b/bot/cogs/bot.py
@@ -2,6 +2,7 @@ import ast
import logging
import re
import time
+from typing import Optional, Tuple
from discord import Embed, Message, RawMessageUpdateEvent
from discord.ext.commands import Bot, Cog, Context, command, group
@@ -16,9 +17,7 @@ RE_MARKDOWN = re.compile(r'([*_~`|>])')
class Bot(Cog):
- """
- Bot information commands
- """
+ """Bot information commands."""
def __init__(self, bot: Bot):
self.bot = bot
@@ -47,20 +46,14 @@ class Bot(Cog):
@group(invoke_without_command=True, name="bot", hidden=True)
@with_role(Roles.verified)
- async def botinfo_group(self, ctx: Context):
- """
- Bot informational commands
- """
-
+ async def botinfo_group(self, ctx: Context) -> None:
+ """Bot informational commands."""
await ctx.invoke(self.bot.get_command("help"), "bot")
@botinfo_group.command(name='about', aliases=('info',), hidden=True)
@with_role(Roles.verified)
- async def about_command(self, ctx: Context):
- """
- Get information about the bot
- """
-
+ async def about_command(self, ctx: Context) -> None:
+ """Get information about the bot."""
embed = Embed(
description="A utility bot designed just for the Python server! Try `!help` for more info.",
url="https://github.com/python-discord/bot"
@@ -78,24 +71,18 @@ class Bot(Cog):
@command(name='echo', aliases=('print',))
@with_role(*MODERATION_ROLES)
- async def echo_command(self, ctx: Context, *, text: str):
- """
- Send the input verbatim to the current channel
- """
-
+ async def echo_command(self, ctx: Context, *, text: str) -> None:
+ """Send the input verbatim to the current channel."""
await ctx.send(text)
@command(name='embed')
@with_role(*MODERATION_ROLES)
- async def embed_command(self, ctx: Context, *, text: str):
- """
- Send the input within an embed to the current channel
- """
-
+ async def embed_command(self, ctx: Context, *, text: str) -> None:
+ """Send the input within an embed to the current channel."""
embed = Embed(description=text)
await ctx.send(embed=embed)
- def codeblock_stripping(self, msg: str, bad_ticks: bool):
+ def codeblock_stripping(self, msg: str, bad_ticks: bool) -> Optional[Tuple[Tuple[str, ...], str]]:
"""
Strip msg in order to find Python code.
@@ -164,15 +151,10 @@ class Bot(Cog):
log.trace(f"Returning message.\n\n{content}\n\n")
return (content,), repl_code
- def fix_indentation(self, msg: str):
- """
- Attempts to fix badly indented code.
- """
-
- def unindent(code, skip_spaces=0):
- """
- Unindents all code down to the number of spaces given ins skip_spaces
- """
+ def fix_indentation(self, msg: str) -> str:
+ """Attempts to fix badly indented code."""
+ def unindent(code, skip_spaces: int = 0) -> str:
+ """Unindents all code down to the number of spaces given in skip_spaces."""
final = ""
current = code[0]
leading_spaces = 0
@@ -208,11 +190,13 @@ class Bot(Cog):
msg = f"{first_line}\n{unindent(code, 4)}"
return msg
- def repl_stripping(self, msg: str):
+ def repl_stripping(self, msg: str) -> Tuple[str, bool]:
"""
Strip msg in order to extract Python code out of REPL output.
Tries to strip out REPL Python code out of msg and returns the stripped msg.
+
+ Returns True for the boolean if REPL code was found in the input msg.
"""
final = ""
for line in msg.splitlines(keepends=True):
@@ -226,7 +210,8 @@ class Bot(Cog):
log.trace(f"Found REPL code in \n\n{msg}\n\n")
return final.rstrip(), True
- def has_bad_ticks(self, msg: Message):
+ def has_bad_ticks(self, msg: Message) -> bool:
+ """Check to see if msg contains ticks that aren't '`'."""
not_backticks = [
"'''", '"""', "\u00b4\u00b4\u00b4", "\u2018\u2018\u2018", "\u2019\u2019\u2019",
"\u2032\u2032\u2032", "\u201c\u201c\u201c", "\u201d\u201d\u201d", "\u2033\u2033\u2033",
@@ -236,13 +221,13 @@ class Bot(Cog):
return msg.content[:3] in not_backticks
@Cog.listener()
- async def on_message(self, msg: Message):
- """
- Detect poorly formatted Python code and send the user
- a helpful message explaining how to do properly
- formatted Python syntax highlighting codeblocks.
+ async def on_message(self, msg: Message) -> None:
"""
+ Detect poorly formatted Python code in new messages.
+ If poorly formatted code is detected, send the user a helpful message explaining how to do
+ properly formatted Python syntax highlighting codeblocks.
+ """
parse_codeblock = (
(
msg.channel.id in self.channel_cooldowns
@@ -361,7 +346,8 @@ class Bot(Cog):
)
@Cog.listener()
- async def on_raw_message_edit(self, payload: RawMessageUpdateEvent):
+ async def on_raw_message_edit(self, payload: RawMessageUpdateEvent) -> None:
+ """Check to see if an edited message (previously called out) still contains poorly formatted code."""
if (
# Checks to see if the message was called out by the bot
payload.message_id not in self.codeblock_message_ids
@@ -387,6 +373,7 @@ class Bot(Cog):
log.trace("User's incorrect code block has been fixed. Removing bot formatting message.")
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Bot cog load."""
bot.add_cog(Bot(bot))
log.info("Cog loaded: Bot")
diff --git a/bot/cogs/clean.py b/bot/cogs/clean.py
index 20c24dafc..1c0c9a7a8 100644
--- a/bot/cogs/clean.py
+++ b/bot/cogs/clean.py
@@ -18,17 +18,13 @@ log = logging.getLogger(__name__)
class Clean(Cog):
"""
- A cog that allows messages to be deleted in
- bulk, while applying various filters.
+ A cog that allows messages to be deleted in bulk, while applying various filters.
- You can delete messages sent by a specific user,
- messages sent by bots, all messages, or messages
- that match a specific regular expression.
+ You can delete messages sent by a specific user, messages sent by bots, all messages, or messages that match a
+ specific regular expression.
- The deleted messages are saved and uploaded
- to the database via an API endpoint, and a URL is
- returned which can be used to view the messages
- in the Discord dark theme style.
+ The deleted messages are saved and uploaded to the database via an API endpoint, and a URL is returned which can be
+ used to view the messages in the Discord dark theme style.
"""
def __init__(self, bot: Bot):
@@ -37,44 +33,25 @@ class Clean(Cog):
@property
def mod_log(self) -> ModLog:
+ """Get currently loaded ModLog cog instance."""
return self.bot.get_cog("ModLog")
async def _clean_messages(
self, amount: int, ctx: Context,
bots_only: bool = False, user: User = None,
regex: Optional[str] = None
- ):
- """
- A helper function that does the actual message cleaning.
-
- :param bots_only: Set this to True if you only want to delete bot messages.
- :param user: Specify a user and it will only delete messages by this user.
- :param regular_expression: Specify a regular expression and it will only
- delete messages that match this.
- """
-
+ ) -> None:
+ """A helper function that does the actual message cleaning."""
def predicate_bots_only(message: Message) -> bool:
- """
- Returns true if the message was sent by a bot
- """
-
+ """Return True if the message was sent by a bot."""
return message.author.bot
def predicate_specific_user(message: Message) -> bool:
- """
- Return True if the message was sent by the
- user provided in the _clean_messages call.
- """
-
+ """Return True if the message was sent by the user provided in the _clean_messages call."""
return message.author == user
- def predicate_regex(message: Message):
- """
- Returns True if the regex provided in the
- _clean_messages matches the message content
- or any embed attributes the message may have.
- """
-
+ def predicate_regex(message: Message) -> bool:
+ """Check if the regex provided in _clean_messages matches the message content or any embed attributes."""
content = [message.content]
# Add the content for all embed attributes
@@ -192,61 +169,38 @@ class Clean(Cog):
@group(invoke_without_command=True, name="clean", hidden=True)
@with_role(*MODERATION_ROLES)
- async def clean_group(self, ctx: Context):
- """
- Commands for cleaning messages in channels
- """
-
+ async def clean_group(self, ctx: Context) -> None:
+ """Commands for cleaning messages in channels."""
await ctx.invoke(self.bot.get_command("help"), "clean")
@clean_group.command(name="user", aliases=["users"])
@with_role(*MODERATION_ROLES)
- async def clean_user(self, ctx: Context, user: User, amount: int = 10):
- """
- Delete messages posted by the provided user,
- and stop cleaning after traversing `amount` messages.
- """
-
+ async def clean_user(self, ctx: Context, user: User, amount: int = 10) -> None:
+ """Delete messages posted by the provided user, stop cleaning after traversing `amount` messages."""
await self._clean_messages(amount, ctx, user=user)
@clean_group.command(name="all", aliases=["everything"])
@with_role(*MODERATION_ROLES)
- async def clean_all(self, ctx: Context, amount: int = 10):
- """
- Delete all messages, regardless of poster,
- and stop cleaning after traversing `amount` messages.
- """
-
+ async def clean_all(self, ctx: Context, amount: int = 10) -> None:
+ """Delete all messages, regardless of poster, stop cleaning after traversing `amount` messages."""
await self._clean_messages(amount, ctx)
@clean_group.command(name="bots", aliases=["bot"])
@with_role(*MODERATION_ROLES)
- async def clean_bots(self, ctx: Context, amount: int = 10):
- """
- Delete all messages posted by a bot,
- and stop cleaning after traversing `amount` messages.
- """
-
+ async def clean_bots(self, ctx: Context, amount: int = 10) -> None:
+ """Delete all messages posted by a bot, stop cleaning after traversing `amount` messages."""
await self._clean_messages(amount, ctx, bots_only=True)
@clean_group.command(name="regex", aliases=["word", "expression"])
@with_role(*MODERATION_ROLES)
- async def clean_regex(self, ctx: Context, regex, amount: int = 10):
- """
- Delete all messages that match a certain regex,
- and stop cleaning after traversing `amount` messages.
- """
-
+ async def clean_regex(self, ctx: Context, regex: str, amount: int = 10) -> None:
+ """Delete all messages that match a certain regex, stop cleaning after traversing `amount` messages."""
await self._clean_messages(amount, ctx, regex=regex)
@clean_group.command(name="stop", aliases=["cancel", "abort"])
@with_role(*MODERATION_ROLES)
- async def clean_cancel(self, ctx: Context):
- """
- If there is an ongoing cleaning process,
- attempt to immediately cancel it.
- """
-
+ async def clean_cancel(self, ctx: Context) -> None:
+ """If there is an ongoing cleaning process, attempt to immediately cancel it."""
self.cleaning = False
embed = Embed(
@@ -256,6 +210,7 @@ class Clean(Cog):
await ctx.send(embed=embed, delete_after=10)
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Clean cog load."""
bot.add_cog(Clean(bot))
log.info("Cog loaded: Clean")
diff --git a/bot/cogs/cogs.py b/bot/cogs/cogs.py
index ec497b966..117c77d4b 100644
--- a/bot/cogs/cogs.py
+++ b/bot/cogs/cogs.py
@@ -16,9 +16,7 @@ KEEP_LOADED = ["bot.cogs.cogs", "bot.cogs.modlog"]
class Cogs(Cog):
- """
- Cog management commands
- """
+ """Cog management commands."""
def __init__(self, bot: Bot):
self.bot = bot
@@ -38,21 +36,19 @@ class Cogs(Cog):
@group(name='cogs', aliases=('c',), invoke_without_command=True)
@with_role(*MODERATION_ROLES, Roles.core_developer)
- async def cogs_group(self, ctx: Context):
+ async def cogs_group(self, ctx: Context) -> None:
"""Load, unload, reload, and list active cogs."""
-
await ctx.invoke(self.bot.get_command("help"), "cogs")
@cogs_group.command(name='load', aliases=('l',))
@with_role(*MODERATION_ROLES, Roles.core_developer)
- async def load_command(self, ctx: Context, cog: str):
+ async def load_command(self, ctx: Context, cog: str) -> None:
"""
- Load up an unloaded cog, given the module containing it
+ Load up an unloaded cog, given the module containing it.
You can specify the cog name for any cogs that are placed directly within `!cogs`, or specify the
entire module directly.
"""
-
cog = cog.lower()
embed = Embed()
@@ -98,14 +94,13 @@ class Cogs(Cog):
@cogs_group.command(name='unload', aliases=('ul',))
@with_role(*MODERATION_ROLES, Roles.core_developer)
- async def unload_command(self, ctx: Context, cog: str):
+ async def unload_command(self, ctx: Context, cog: str) -> None:
"""
- Unload an already-loaded cog, given the module containing it
+ Unload an already-loaded cog, given the module containing it.
You can specify the cog name for any cogs that are placed directly within `!cogs`, or specify the
entire module directly.
"""
-
cog = cog.lower()
embed = Embed()
@@ -150,9 +145,9 @@ class Cogs(Cog):
@cogs_group.command(name='reload', aliases=('r',))
@with_role(*MODERATION_ROLES, Roles.core_developer)
- async def reload_command(self, ctx: Context, cog: str):
+ async def reload_command(self, ctx: Context, cog: str) -> None:
"""
- Reload an unloaded cog, given the module containing it
+ Reload an unloaded cog, given the module containing it.
You can specify the cog name for any cogs that are placed directly within `!cogs`, or specify the
entire module directly.
@@ -160,7 +155,6 @@ class Cogs(Cog):
If you specify "*" as the cog, every cog currently loaded will be unloaded, and then every cog present in the
bot/cogs directory will be loaded.
"""
-
cog = cog.lower()
embed = Embed()
@@ -232,7 +226,8 @@ class Cogs(Cog):
log.debug(f"{ctx.author} requested we reload all cogs. Here are the results: \n"
f"{lines}")
- return await LinePaginator.paginate(lines, ctx, embed, empty=False)
+ await LinePaginator.paginate(lines, ctx, embed, empty=False)
+ return
elif full_cog in self.bot.extensions:
try:
@@ -255,13 +250,12 @@ class Cogs(Cog):
@cogs_group.command(name='list', aliases=('all',))
@with_role(*MODERATION_ROLES, Roles.core_developer)
- async def list_command(self, ctx: Context):
+ async def list_command(self, ctx: Context) -> None:
"""
Get a list of all cogs, including their loaded status.
Gray indicates that the cog is unloaded. Green indicates that the cog is currently loaded.
"""
-
embed = Embed()
lines = []
cogs = {}
@@ -301,6 +295,7 @@ class Cogs(Cog):
await LinePaginator.paginate(lines, ctx, embed, max_size=300, empty=False)
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Cogs cog load."""
bot.add_cog(Cogs(bot))
log.info("Cog loaded: Cogs")
diff --git a/bot/cogs/defcon.py b/bot/cogs/defcon.py
index 8fab00712..936147c8f 100644
--- a/bot/cogs/defcon.py
+++ b/bot/cogs/defcon.py
@@ -25,7 +25,8 @@ BASE_CHANNEL_TOPIC = "Python Discord Defense Mechanism"
class Defcon(Cog):
- """Time-sensitive server defense mechanisms"""
+ """Time-sensitive server defense mechanisms."""
+
days = None # type: timedelta
enabled = False # type: bool
@@ -36,10 +37,12 @@ class Defcon(Cog):
@property
def mod_log(self) -> ModLog:
+ """Get currently loaded ModLog cog instance."""
return self.bot.get_cog("ModLog")
@Cog.listener()
- async def on_ready(self):
+ async def on_ready(self) -> None:
+ """On cog load, try to synchronize DEFCON settings to the API."""
self.channel = await self.bot.fetch_channel(Channels.defcon)
try:
response = await self.bot.api_client.get('bot/bot-settings/defcon')
@@ -65,7 +68,8 @@ class Defcon(Cog):
await self.update_channel_topic()
@Cog.listener()
- async def on_member_join(self, member: Member):
+ async def on_member_join(self, member: Member) -> None:
+ """If DEFCON is enabled, check newly joining users to see if they meet the account age threshold."""
if self.enabled and self.days.days > 0:
now = datetime.utcnow()
@@ -98,21 +102,19 @@ class Defcon(Cog):
@group(name='defcon', aliases=('dc',), invoke_without_command=True)
@with_role(Roles.admin, Roles.owner)
- async def defcon_group(self, ctx: Context):
+ async def defcon_group(self, ctx: Context) -> None:
"""Check the DEFCON status or run a subcommand."""
-
await ctx.invoke(self.bot.get_command("help"), "defcon")
@defcon_group.command(name='enable', aliases=('on', 'e'))
@with_role(Roles.admin, Roles.owner)
- async def enable_command(self, ctx: Context):
+ async def enable_command(self, ctx: Context) -> None:
"""
Enable DEFCON mode. Useful in a pinch, but be sure you know what you're doing!
- Currently, this just adds an account age requirement. Use !defcon days <int> to set how old an account must
- be, in days.
+ Currently, this just adds an account age requirement. Use !defcon days <int> to set how old an account must be,
+ in days.
"""
-
self.enabled = True
try:
@@ -159,11 +161,8 @@ class Defcon(Cog):
@defcon_group.command(name='disable', aliases=('off', 'd'))
@with_role(Roles.admin, Roles.owner)
- async def disable_command(self, ctx: Context):
- """
- Disable DEFCON mode. Useful in a pinch, but be sure you know what you're doing!
- """
-
+ async def disable_command(self, ctx: Context) -> None:
+ """Disable DEFCON mode. Useful in a pinch, but be sure you know what you're doing!"""
self.enabled = False
try:
@@ -205,11 +204,8 @@ class Defcon(Cog):
@defcon_group.command(name='status', aliases=('s',))
@with_role(Roles.admin, Roles.owner)
- async def status_command(self, ctx: Context):
- """
- Check the current status of DEFCON mode.
- """
-
+ async def status_command(self, ctx: Context) -> None:
+ """Check the current status of DEFCON mode."""
embed = Embed(
colour=Colour.blurple(), title="DEFCON Status",
description=f"**Enabled:** {self.enabled}\n"
@@ -220,11 +216,8 @@ class Defcon(Cog):
@defcon_group.command(name='days')
@with_role(Roles.admin, Roles.owner)
- async def days_command(self, ctx: Context, days: int):
- """
- Set how old an account must be to join the server, in days, with DEFCON mode enabled.
- """
-
+ async def days_command(self, ctx: Context, days: int) -> None:
+ """Set how old an account must be to join the server, in days, with DEFCON mode enabled."""
self.days = timedelta(days=days)
try:
@@ -269,11 +262,8 @@ class Defcon(Cog):
await self.update_channel_topic()
- async def update_channel_topic(self):
- """
- Update the #defcon channel topic with the current DEFCON status
- """
-
+ async def update_channel_topic(self) -> None:
+ """Update the #defcon channel topic with the current DEFCON status."""
if self.enabled:
day_str = "days" if self.days.days > 1 else "day"
new_topic = f"{BASE_CHANNEL_TOPIC}\n(Status: Enabled, Threshold: {self.days.days} {day_str})"
@@ -284,6 +274,7 @@ class Defcon(Cog):
await self.channel.edit(topic=new_topic)
-def setup(bot: Bot):
+def setup(bot: Bot) -> None:
+ """DEFCON cog load."""
bot.add_cog(Defcon(bot))
log.info("Cog loaded: Defcon")
diff --git a/bot/cogs/doc.py b/bot/cogs/doc.py
index ebf2c1d65..e5c51748f 100644
--- a/bot/cogs/doc.py
+++ b/bot/cogs/doc.py
@@ -4,10 +4,11 @@ import logging
import re
import textwrap
from collections import OrderedDict
-from typing import Optional, Tuple
+from typing import Any, Callable, Optional, Tuple
import discord
from bs4 import BeautifulSoup
+from bs4.element import PageElement
from discord.ext import commands
from markdownify import MarkdownConverter
from requests import ConnectionError
@@ -27,24 +28,22 @@ UNWANTED_SIGNATURE_SYMBOLS = ('[source]', '¶')
WHITESPACE_AFTER_NEWLINES_RE = re.compile(r"(?<=\n\n)(\s+)")
-def async_cache(max_size=128, arg_offset=0):
+def async_cache(max_size: int = 128, arg_offset: int = 0) -> Callable:
"""
LRU cache implementation for coroutines.
- :param max_size:
- Specifies the maximum size the cache should have.
- Once it exceeds the maximum size, keys are deleted in FIFO order.
- :param arg_offset:
- The offset that should be applied to the coroutine's arguments
- when creating the cache key. Defaults to `0`.
- """
+ Once the cache exceeds the maximum size, keys are deleted in FIFO order.
+ An offset may be optionally provided to be applied to the coroutine's arguments when creating the cache key.
+ """
# Assign the cache to the function itself so we can clear it from outside.
async_cache.cache = OrderedDict()
- def decorator(function):
+ def decorator(function: Callable) -> Callable:
+ """Define the async_cache decorator."""
@functools.wraps(function)
- async def wrapper(*args):
+ async def wrapper(*args) -> Any:
+ """Decorator wrapper for the caching logic."""
key = ':'.join(args[arg_offset:])
value = async_cache.cache.get(key)
@@ -59,27 +58,25 @@ def async_cache(max_size=128, arg_offset=0):
class DocMarkdownConverter(MarkdownConverter):
- def convert_code(self, el, text):
- """Undo `markdownify`s underscore escaping."""
+ """Subclass markdownify's MarkdownCoverter to provide custom conversion methods."""
+ def convert_code(self, el: PageElement, text: str) -> str:
+ """Undo `markdownify`s underscore escaping."""
return f"`{text}`".replace('\\', '')
- def convert_pre(self, el, text):
+ def convert_pre(self, el: PageElement, text: str) -> str:
"""Wrap any codeblocks in `py` for syntax highlighting."""
-
code = ''.join(el.strings)
return f"```py\n{code}```"
-def markdownify(html):
+def markdownify(html: str) -> DocMarkdownConverter:
+ """Create a DocMarkdownConverter object from the input html."""
return DocMarkdownConverter(bullets='•').convert(html)
class DummyObject(object):
- """
- A dummy object which supports assigning anything,
- which the builtin `object()` does not support normally.
- """
+ """A dummy object which supports assigning anything, which the builtin `object()` does not support normally."""
class SphinxConfiguration:
@@ -94,14 +91,15 @@ class InventoryURL(commands.Converter):
"""
Represents an Intersphinx inventory URL.
- This converter checks whether intersphinx
- accepts the given inventory URL, and raises
+ This converter checks whether intersphinx accepts the given inventory URL, and raises
`BadArgument` if that is not the case.
+
Otherwise, it simply passes through the given URL.
"""
@staticmethod
- async def convert(ctx, url: str):
+ async def convert(ctx: commands.Context, url: str) -> str:
+ """Convert url to Intersphinx inventory URL."""
try:
intersphinx.fetch_inventory(SphinxConfiguration(), '', url)
except AttributeError:
@@ -121,31 +119,33 @@ class InventoryURL(commands.Converter):
class Doc(commands.Cog):
- def __init__(self, bot):
+ """A set of commands for querying & displaying documentation."""
+
+ def __init__(self, bot: commands.Bot):
self.base_urls = {}
self.bot = bot
self.inventories = {}
@commands.Cog.listener()
- async def on_ready(self):
+ async def on_ready(self) -> None:
+ """Refresh documentation inventory."""
await self.refresh_inventory()
async def update_single(
self, package_name: str, base_url: str, inventory_url: str, config: SphinxConfiguration
- ):
+ ) -> None:
"""
Rebuild the inventory for a single package.
- :param package_name: The package name to use, appears in the log.
- :param base_url: The root documentation URL for the specified package.
- Used to build absolute paths that link to specific symbols.
- :param inventory_url: The absolute URL to the intersphinx inventory.
- Fetched by running `intersphinx.fetch_inventory` in an
- executor on the bot's event loop.
- :param config: A `SphinxConfiguration` instance to mock the regular sphinx
- project layout. Required for use with intersphinx.
+ Where:
+ * `package_name` is the package name to use, appears in the log
+ * `base_url` is the root documentation URL for the specified package, used to build
+ absolute paths that link to specific symbols
+ * `inventory_url` is the absolute URL to the intersphinx inventory, fetched by running
+ `intersphinx.fetch_inventory` in an executor on the bot's event loop
+ * `config` is a `SphinxConfiguration` instance to mock the regular sphinx
+ project layout, required for use with intersphinx
"""
-
self.base_urls[package_name] = base_url
fetch_func = functools.partial(intersphinx.fetch_inventory, config, '', inventory_url)
@@ -159,7 +159,8 @@ class Doc(commands.Cog):
log.trace(f"Fetched inventory for {package_name}.")
- async def refresh_inventory(self):
+ async def refresh_inventory(self) -> None:
+ """Refresh internal documentation inventory."""
log.debug("Refreshing documentation inventory...")
# Clear the old base URLS and inventories to ensure
@@ -186,16 +187,13 @@ class Doc(commands.Cog):
"""
Given a Python symbol, return its signature and description.
- :param symbol: The symbol for which HTML data should be returned.
- :return:
- A tuple in the form (str, str), or `None`.
- The first tuple element is the signature of the given
- symbol as a markup-free string, and the second tuple
- element is the description of the given symbol with HTML
- markup included. If the given symbol could not be found,
- returns `None`.
- """
+ Returns a tuple in the form (str, str), or `None`.
+ The first tuple element is the signature of the given symbol as a markup-free string, and
+ the second tuple element is the description of the given symbol with HTML markup included.
+
+ If the given symbol could not be found, returns `None`.
+ """
url = self.inventories.get(symbol)
if url is None:
return None
@@ -223,16 +221,10 @@ class Doc(commands.Cog):
@async_cache(arg_offset=1)
async def get_symbol_embed(self, symbol: str) -> Optional[discord.Embed]:
"""
- Using `get_symbol_html`, attempt to scrape and
- fetch the data for the given `symbol`, and build
- a formatted embed out of its contents.
-
- :param symbol: The symbol for which the embed should be returned
- :return:
- If the symbol is known, an Embed with documentation about it.
- Otherwise, `None`.
- """
+ Attempt to scrape and fetch the data for the given `symbol`, and build an embed from its contents.
+ If the symbol is known, an Embed with documentation about it is returned.
+ """
scraped_html = await self.get_symbol_html(symbol)
if scraped_html is None:
return None
@@ -267,20 +259,16 @@ class Doc(commands.Cog):
)
@commands.group(name='docs', aliases=('doc', 'd'), invoke_without_command=True)
- async def docs_group(self, ctx, symbol: commands.clean_content = None):
+ async def docs_group(self, ctx: commands.Context, symbol: commands.clean_content = None) -> None:
"""Lookup documentation for Python symbols."""
-
await ctx.invoke(self.get_command)
@docs_group.command(name='get', aliases=('g',))
- async def get_command(self, ctx, symbol: commands.clean_content = None):
+ async def get_command(self, ctx: commands.Context, symbol: commands.clean_content = None) -> None:
"""
Return a documentation embed for a given symbol.
- If no symbol is given, return a list of all available inventories.
- :param ctx: Discord message context
- :param symbol: The symbol for which documentation should be returned,
- or nothing to get a list of all inventories
+ If no symbol is given, return a list of all available inventories.
Examples:
!docs
@@ -288,7 +276,6 @@ class Doc(commands.Cog):
!docs aiohttp.ClientSession
!docs get aiohttp.ClientSession
"""
-
if symbol is None:
inventory_embed = discord.Embed(
title=f"All inventories (`{len(self.base_urls)}` total)",
@@ -322,18 +309,13 @@ class Doc(commands.Cog):
@docs_group.command(name='set', aliases=('s',))
@with_role(*MODERATION_ROLES)
async def set_command(
- self, ctx, package_name: ValidPythonIdentifier,
+ self, ctx: commands.Context, package_name: ValidPythonIdentifier,
base_url: ValidURL, inventory_url: InventoryURL
- ):
+ ) -> None:
"""
Adds a new documentation metadata object to the site's database.
- The database will update the object, should an existing item
- with the specified `package_name` already exist.
- :param ctx: Discord message context
- :param package_name: The package name, for example `aiohttp`.
- :param base_url: The package documentation's root URL, used to build absolute links.
- :param inventory_url: The intersphinx inventory URL.
+ The database will update the object, should an existing item with the specified `package_name` already exist.
Example:
!docs set \
@@ -341,7 +323,6 @@ class Doc(commands.Cog):
https://discordpy.readthedocs.io/en/rewrite/ \
https://discordpy.readthedocs.io/en/rewrite/objects.inv
"""
-
body = {
'package': package_name,
'base_url': base_url,
@@ -365,17 +346,13 @@ class Doc(commands.Cog):
@docs_group.command(name='delete', aliases=('remove', 'rm', 'd'))
@with_role(*MODERATION_ROLES)
- async def delete_command(self, ctx, package_name: ValidPythonIdentifier):
+ async def delete_command(self, ctx: commands.Context, package_name: ValidPythonIdentifier) -> None:
"""
Removes the specified package from the database.
- :param ctx: Discord message context
- :param package_name: The package name, for example `aiohttp`.
-
Examples:
!docs delete aiohttp
"""
-
await self.bot.api_client.delete(f'bot/documentation-links/{package_name}')
async with ctx.typing():
@@ -385,5 +362,7 @@ class Doc(commands.Cog):
await ctx.send(f"Successfully deleted `{package_name}` and refreshed inventory.")
-def setup(bot):
+def setup(bot: commands.Bot) -> None:
+ """Doc cog load."""
bot.add_cog(Doc(bot))
+ log.info("Cog loaded: Doc")
diff --git a/bot/cogs/error_handler.py b/bot/cogs/error_handler.py
index e2d8c3a8f..49411814c 100644
--- a/bot/cogs/error_handler.py
+++ b/bot/cogs/error_handler.py
@@ -30,7 +30,27 @@ class ErrorHandler(Cog):
self.bot = bot
@Cog.listener()
- async def on_command_error(self, ctx: Context, e: CommandError):
+ async def on_command_error(self, ctx: Context, e: CommandError) -> None:
+ """
+ Provide generic command error handling.
+
+ Error handling is deferred to any local error handler, if present.
+
+ Error handling emits a single error response, prioritized as follows:
+ 1. If the name fails to match a command but matches a tag, the tag is invoked
+ 2. Send a BadArgument error message to the invoking context & invoke the command's help
+ 3. Send a UserInputError error message to the invoking context & invoke the command's help
+ 4. Send a NoPrivateMessage error message to the invoking context
+ 5. Send a BotMissingPermissions error message to the invoking context
+ 6. Log a MissingPermissions error, no message is sent
+ 7. Send a InChannelCheckFailure error message to the invoking context
+ 8. Log CheckFailure, CommandOnCooldown, and DisabledCommand errors, no message is sent
+ 9. For CommandInvokeErrors, response is based on the type of error:
+ * 404: Error message is sent to the invoking context
+ * 400: Log the resopnse JSON, no message is sent
+ * 500 <= status <= 600: Error message is sent to the invoking context
+ 10. Otherwise, handling is deferred to `handle_unexpected_error`
+ """
command = ctx.command
parent = None
@@ -57,7 +77,8 @@ class ErrorHandler(Cog):
# Return to not raise the exception
with contextlib.suppress(ResponseCodeError):
- return await ctx.invoke(tags_get_command, tag_name=ctx.invoked_with)
+ await ctx.invoke(tags_get_command, tag_name=ctx.invoked_with)
+ return
elif isinstance(e, BadArgument):
await ctx.send(f"Bad argument: {e}\n")
await ctx.invoke(*help_command)
@@ -109,7 +130,8 @@ class ErrorHandler(Cog):
await self.handle_unexpected_error(ctx, e)
@staticmethod
- async def handle_unexpected_error(ctx: Context, e: CommandError):
+ async def handle_unexpected_error(ctx: Context, e: CommandError) -> None:
+ """Generic handler for errors without an explicit handler."""
await ctx.send(
f"Sorry, an unexpected error occurred. Please let us know!\n\n"
f"```{e.__class__.__name__}: {e}```"
@@ -120,6 +142,7 @@ class ErrorHandler(Cog):
raise e
-def setup(bot: Bot):
+def setup(bot: Bot) -> None:
+ """Error handler cog load."""
bot.add_cog(ErrorHandler(bot))
log.info("Cog loaded: Events")
diff --git a/bot/cogs/eval.py b/bot/cogs/eval.py
index c52c04df1..9ce854f2c 100644
--- a/bot/cogs/eval.py
+++ b/bot/cogs/eval.py
@@ -6,9 +6,10 @@ import re
import textwrap
import traceback
from io import StringIO
+from typing import Any, Optional, Tuple
import discord
-from discord.ext.commands import Bot, Cog, group
+from discord.ext.commands import Bot, Cog, Context, group
from bot.constants import Roles
from bot.decorators import with_role
@@ -18,10 +19,7 @@ log = logging.getLogger(__name__)
class CodeEval(Cog):
- """
- Owner and admin feature that evaluates code
- and returns the result to the channel.
- """
+ """Owner and admin feature that evaluates code and returns the result to the channel."""
def __init__(self, bot: Bot):
self.bot = bot
@@ -31,7 +29,8 @@ class CodeEval(Cog):
self.interpreter = Interpreter(bot)
- def _format(self, inp, out): # (str, Any) -> (str, discord.Embed)
+ def _format(self, inp: str, out: Any) -> Tuple[str, Optional[discord.Embed]]:
+ """Format the eval output into a string & attempt to format it into an Embed."""
self._ = out
res = ""
@@ -124,7 +123,8 @@ class CodeEval(Cog):
return res # Return (text, embed)
- async def _eval(self, ctx, code): # (discord.Context, str) -> None
+ async def _eval(self, ctx: Context, code: str) -> Optional[discord.Message]:
+ """Eval the input code string & send an embed to the invoking context."""
self.ln += 1
if code.startswith("exit"):
@@ -174,16 +174,15 @@ async def func(): # (None,) -> Any
@group(name='internal', aliases=('int',))
@with_role(Roles.owner, Roles.admin)
- async def internal_group(self, ctx):
+ async def internal_group(self, ctx: Context) -> None:
"""Internal commands. Top secret!"""
-
if not ctx.invoked_subcommand:
await ctx.invoke(self.bot.get_command("help"), "internal")
@internal_group.command(name='eval', aliases=('e',))
@with_role(Roles.admin, Roles.owner)
- async def eval(self, ctx, *, code: str):
- """ Run eval in a REPL-like format. """
+ async def eval(self, ctx: Context, *, code: str) -> None:
+ """Run eval in a REPL-like format."""
code = code.strip("`")
if re.match('py(thon)?\n', code):
code = "\n".join(code.split("\n")[1:])
@@ -197,6 +196,7 @@ async def func(): # (None,) -> Any
await self._eval(ctx, code)
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Code eval cog load."""
bot.add_cog(CodeEval(bot))
log.info("Cog loaded: Eval")
diff --git a/bot/cogs/filtering.py b/bot/cogs/filtering.py
index dc4de7ff1..9cd1b7203 100644
--- a/bot/cogs/filtering.py
+++ b/bot/cogs/filtering.py
@@ -30,10 +30,7 @@ ZALGO_RE = r"[\u0300-\u036F\u0489]"
class Filtering(Cog):
- """
- Filtering out invites, blacklisting domains,
- and warning us of certain regular expressions
- """
+ """Filtering out invites, blacklisting domains, and warning us of certain regular expressions."""
def __init__(self, bot: Bot):
self.bot = bot
@@ -94,28 +91,29 @@ class Filtering(Cog):
@property
def mod_log(self) -> ModLog:
+ """Get currently loaded ModLog cog instance."""
return self.bot.get_cog("ModLog")
@Cog.listener()
- async def on_message(self, msg: Message):
+ async def on_message(self, msg: Message) -> None:
+ """Invoke message filter for new messages."""
await self._filter_message(msg)
@Cog.listener()
- async def on_message_edit(self, before: Message, after: Message):
+ async def on_message_edit(self, before: Message, after: Message) -> None:
+ """
+ Invoke message filter for message edits.
+
+ If there have been multiple edits, calculate the time delta from the previous edit.
+ """
if not before.edited_at:
delta = relativedelta(after.edited_at, before.created_at).microseconds
else:
delta = relativedelta(after.edited_at, before.edited_at).microseconds
await self._filter_message(after, delta)
- async def _filter_message(self, msg: Message, delta: Optional[int] = None):
- """
- Whenever a message is sent or edited,
- run it through our filters to see if it
- violates any of our rules, and then respond
- accordingly.
- """
-
+ async def _filter_message(self, msg: Message, delta: Optional[int] = None) -> None:
+ """Filter the input message to see if it violates any of our rules, and then respond accordingly."""
# Should we filter this message?
role_whitelisted = False
@@ -226,14 +224,10 @@ class Filtering(Cog):
@staticmethod
async def _has_watchlist_words(text: str) -> bool:
"""
- Returns True if the text contains
- one of the regular expressions from the
- word_watchlist in our filter config.
+ Returns True if the text contains one of the regular expressions from the word_watchlist in our filter config.
- Only matches words with boundaries before
- and after the expression.
+ Only matches words with boundaries before and after the expression.
"""
-
for expression in Filter.word_watchlist:
if re.search(fr"\b{expression}\b", text, re.IGNORECASE):
return True
@@ -243,14 +237,10 @@ class Filtering(Cog):
@staticmethod
async def _has_watchlist_tokens(text: str) -> bool:
"""
- Returns True if the text contains
- one of the regular expressions from the
- token_watchlist in our filter config.
+ Returns True if the text contains one of the regular expressions from the token_watchlist in our filter config.
- This will match the expression even if it
- does not have boundaries before and after
+ This will match the expression even if it does not have boundaries before and after.
"""
-
for expression in Filter.token_watchlist:
if re.search(fr"{expression}", text, re.IGNORECASE):
@@ -262,11 +252,7 @@ class Filtering(Cog):
@staticmethod
async def _has_urls(text: str) -> bool:
- """
- Returns True if the text contains one of
- the blacklisted URLs from the config file.
- """
-
+ """Returns True if the text contains one of the blacklisted URLs from the config file."""
if not re.search(URL_RE, text, re.IGNORECASE):
return False
@@ -285,7 +271,6 @@ class Filtering(Cog):
Zalgo range is \u0300 – \u036F and \u0489.
"""
-
return bool(re.search(ZALGO_RE, text))
async def _has_invites(self, text: str) -> Union[dict, bool]:
@@ -297,7 +282,6 @@ class Filtering(Cog):
Attempts to catch some of common ways to try to cheat the system.
"""
-
# Remove backslashes to prevent escape character aroundfuckery like
# discord\.gg/gdudes-pony-farm
text = text.replace("\\", "")
@@ -338,30 +322,27 @@ class Filtering(Cog):
return invite_data if invite_data else False
@staticmethod
- async def _has_rich_embed(msg: Message):
- """
- Returns True if any of the embeds in the message are of type 'rich', but are not twitter
- embeds. Returns False otherwise.
- """
+ async def _has_rich_embed(msg: Message) -> bool:
+ """Returns True if any of the embeds in the message are of type 'rich', but are not twitter embeds."""
if msg.embeds:
for embed in msg.embeds:
if embed.type == "rich" and (not embed.url or "twitter.com" not in embed.url):
return True
return False
- async def notify_member(self, filtered_member: Member, reason: str, channel: TextChannel):
+ async def notify_member(self, filtered_member: Member, reason: str, channel: TextChannel) -> None:
"""
- Notify filtered_member about a moderation action with the reason str
+ Notify filtered_member about a moderation action with the reason str.
First attempts to DM the user, fall back to in-channel notification if user has DMs disabled
"""
-
try:
await filtered_member.send(reason)
except discord.errors.Forbidden:
await channel.send(f"{filtered_member.mention} {reason}")
-def setup(bot: Bot):
+def setup(bot: Bot) -> None:
+ """Filtering cog load."""
bot.add_cog(Filtering(bot))
log.info("Cog loaded: Filtering")
diff --git a/bot/cogs/free.py b/bot/cogs/free.py
index 92a9ca041..167fab319 100644
--- a/bot/cogs/free.py
+++ b/bot/cogs/free.py
@@ -2,7 +2,7 @@ import logging
from datetime import datetime
from discord import Colour, Embed, Member, utils
-from discord.ext.commands import Cog, Context, command
+from discord.ext.commands import Bot, Cog, Context, command
from bot.constants import Categories, Channels, Free, STAFF_ROLES
from bot.decorators import redirect_output
@@ -22,11 +22,9 @@ class Free(Cog):
@command(name="free", aliases=('f',))
@redirect_output(destination_channel=Channels.bot, bypass_roles=STAFF_ROLES)
- async def free(self, ctx: Context, user: Member = None, seek: int = 2):
+ async def free(self, ctx: Context, user: Member = None, seek: int = 2) -> None:
"""
Lists free help channels by likeliness of availability.
- :param user: accepts user mention, ID, etc.
- :param seek: How far back to check the last active message.
seek is used only when this command is invoked in a help channel.
You cannot override seek without mentioning a user first.
@@ -101,6 +99,7 @@ class Free(Cog):
await ctx.send(embed=embed)
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Free cog load."""
bot.add_cog(Free())
log.info("Cog loaded: Free")
diff --git a/bot/cogs/help.py b/bot/cogs/help.py
index 31e729003..4971cd0bb 100644
--- a/bot/cogs/help.py
+++ b/bot/cogs/help.py
@@ -3,10 +3,11 @@ import inspect
import itertools
from collections import namedtuple
from contextlib import suppress
+from typing import Union
-from discord import Colour, Embed, HTTPException
+from discord import Colour, Embed, HTTPException, Message, Reaction, User
from discord.ext import commands
-from discord.ext.commands import CheckFailure, Cog as DiscordCog
+from discord.ext.commands import Bot, CheckFailure, Cog as DiscordCog, Command, Context
from fuzzywuzzy import fuzz, process
from bot import constants
@@ -35,15 +36,11 @@ class HelpQueryNotFound(ValueError):
Contains the custom attribute of ``possible_matches``.
- Attributes
- ----------
- possible_matches: dict
- Any commands that were close to matching the Query.
- The possible matched command names are the keys.
- The likeness match scores are the values.
+ Instances of this object contain a dictionary of any command(s) that were close to matching the
+ query, where keys are the possible matched command names and values are the likeness match scores.
"""
- def __init__(self, arg, possible_matches=None):
+ def __init__(self, arg: str, possible_matches: dict = None):
super().__init__(arg)
self.possible_matches = possible_matches
@@ -52,48 +49,30 @@ class HelpSession:
"""
An interactive session for bot and command help output.
- Attributes
- ----------
- title: str
- The title of the help message.
- query: Union[:class:`discord.ext.commands.Bot`,
- :class:`discord.ext.commands.Command]
- description: str
- The description of the query.
- pages: list[str]
- A list of the help content split into manageable pages.
- message: :class:`discord.Message`
- The message object that's showing the help contents.
- destination: :class:`discord.abc.Messageable`
- Where the help message is to be sent to.
+ Expected attributes include:
+ * title: str
+ The title of the help message.
+ * query: Union[discord.ext.commands.Bot, discord.ext.commands.Command]
+ * description: str
+ The description of the query.
+ * pages: list[str]
+ A list of the help content split into manageable pages.
+ * message: `discord.Message`
+ The message object that's showing the help contents.
+ * destination: `discord.abc.Messageable`
+ Where the help message is to be sent to.
"""
- def __init__(self, ctx, *command, cleanup=False, only_can_run=True, show_hidden=False, max_lines=15):
- """
- Creates an instance of the HelpSession class.
-
- Parameters
- ----------
- ctx: :class:`discord.Context`
- The context of the invoked help command.
- *command: str
- A variable argument of the command being queried.
- cleanup: Optional[bool]
- Set to ``True`` to have the message deleted on timeout.
- If ``False``, it will clear all reactions on timeout.
- Defaults to ``False``.
- only_can_run: Optional[bool]
- Set to ``True`` to hide commands the user can't run.
- Defaults to ``False``.
- show_hidden: Optional[bool]
- Set to ``True`` to include hidden commands.
- Defaults to ``False``.
- max_lines: Optional[int]
- Sets the max number of lines the paginator will add to a
- single page.
- Defaults to 20.
- """
-
+ def __init__(
+ self,
+ ctx: Context,
+ *command,
+ cleanup: bool = False,
+ only_can_run: bool = True,
+ show_hidden: bool = False,
+ max_lines: int = 15
+ ):
+ """Creates an instance of the HelpSession class."""
self._ctx = ctx
self._bot = ctx.bot
self.title = "Command Help"
@@ -122,20 +101,8 @@ class HelpSession:
self._timeout_task = None
self.reset_timeout()
- def _get_query(self, query):
- """
- Attempts to match the provided query with a valid command or cog.
-
- Parameters
- ----------
- query: str
- The joined string representing the session query.
-
- Returns
- -------
- Union[:class:`discord.ext.commands.Command`, :class:`Cog`]
- """
-
+ def _get_query(self, query: str) -> Union[Command, Cog]:
+ """Attempts to match the provided query with a valid command or cog."""
command = self._bot.get_command(query)
if command:
return command
@@ -150,48 +117,26 @@ class HelpSession:
self._handle_not_found(query)
- def _handle_not_found(self, query):
+ def _handle_not_found(self, query: str) -> None:
"""
Handles when a query does not match a valid command or cog.
- Will pass on possible close matches along with the
- ``HelpQueryNotFound`` exception.
-
- Parameters
- ----------
- query: str
- The full query that was requested.
-
- Raises
- ------
- HelpQueryNotFound
+ Will pass on possible close matches along with the `HelpQueryNotFound` exception.
"""
-
- # combine command and cog names
+ # Combine command and cog names
choices = list(self._bot.all_commands) + list(self._bot.cogs)
result = process.extractBests(query, choices, scorer=fuzz.ratio, score_cutoff=90)
raise HelpQueryNotFound(f'Query "{query}" not found.', dict(result))
- async def timeout(self, seconds=30):
- """
- Waits for a set number of seconds, then stops the help session.
-
- Parameters
- ----------
- seconds: int
- Number of seconds to wait.
- """
-
+ async def timeout(self, seconds: int = 30) -> None:
+ """Waits for a set number of seconds, then stops the help session."""
await asyncio.sleep(seconds)
await self.stop()
- def reset_timeout(self):
- """
- Cancels the original timeout task and sets it again from the start.
- """
-
+ def reset_timeout(self) -> None:
+ """Cancels the original timeout task and sets it again from the start."""
# cancel original if it exists
if self._timeout_task:
if not self._timeout_task.cancelled():
@@ -200,18 +145,8 @@ class HelpSession:
# recreate the timeout task
self._timeout_task = self._bot.loop.create_task(self.timeout())
- async def on_reaction_add(self, reaction, user):
- """
- Event handler for when reactions are added on the help message.
-
- Parameters
- ----------
- reaction: :class:`discord.Reaction`
- The reaction that was added.
- user: :class:`discord.User`
- The user who added the reaction.
- """
-
+ async def on_reaction_add(self, reaction: Reaction, user: User) -> None:
+ """Event handler for when reactions are added on the help message."""
# ensure it was the relevant session message
if reaction.message.id != self.message.id:
return
@@ -237,24 +172,13 @@ class HelpSession:
with suppress(HTTPException):
await self.message.remove_reaction(reaction, user)
- async def on_message_delete(self, message):
- """
- Closes the help session when the help message is deleted.
-
- Parameters
- ----------
- message: :class:`discord.Message`
- The message that was deleted.
- """
-
+ async def on_message_delete(self, message: Message) -> None:
+ """Closes the help session when the help message is deleted."""
if message.id == self.message.id:
await self.stop()
- async def prepare(self):
- """
- Sets up the help session pages, events, message and reactions.
- """
-
+ async def prepare(self) -> None:
+ """Sets up the help session pages, events, message and reactions."""
# create paginated content
await self.build_pages()
@@ -266,12 +190,8 @@ class HelpSession:
await self.update_page()
self.add_reactions()
- def add_reactions(self):
- """
- Adds the relevant reactions to the help message based on if
- pagination is required.
- """
-
+ def add_reactions(self) -> None:
+ """Adds the relevant reactions to the help message based on if pagination is required."""
# if paginating
if len(self._pages) > 1:
for reaction in REACTIONS:
@@ -281,44 +201,22 @@ class HelpSession:
else:
self._bot.loop.create_task(self.message.add_reaction(DELETE_EMOJI))
- def _category_key(self, cmd):
+ def _category_key(self, cmd: Command) -> str:
"""
- Returns a cog name of a given command. Used as a key for
- ``sorted`` and ``groupby``.
-
- A zero width space is used as a prefix for results with no cogs
- to force them last in ordering.
+ Returns a cog name of a given command for use as a key for `sorted` and `groupby`.
- Parameters
- ----------
- cmd: :class:`discord.ext.commands.Command`
- The command object being checked.
-
- Returns
- -------
- str
+ A zero width space is used as a prefix for results with no cogs to force them last in ordering.
"""
-
cog = cmd.cog_name
return f'**{cog}**' if cog else f'**\u200bNo Category:**'
- def _get_command_params(self, cmd):
+ def _get_command_params(self, cmd: Command) -> str:
"""
Returns the command usage signature.
- This is a custom implementation of ``command.signature`` in
- order to format the command signature without aliases.
-
- Parameters
- ----------
- cmd: :class:`discord.ext.commands.Command`
- The command object to get the parameters of.
-
- Returns
- -------
- str
+ This is a custom implementation of `command.signature` in order to format the command
+ signature without aliases.
"""
-
results = []
for name, param in cmd.clean_params.items():
@@ -346,16 +244,8 @@ class HelpSession:
return f"{cmd.name} {' '.join(results)}"
- async def build_pages(self):
- """
- Builds the list of content pages to be paginated through in the
- help message.
-
- Returns
- -------
- list[str]
- """
-
+ async def build_pages(self) -> None:
+ """Builds the list of content pages to be paginated through in the help message, as a list of str."""
# Use LinePaginator to restrict embed line height
paginator = LinePaginator(prefix='', suffix='', max_lines=self._max_lines)
@@ -482,20 +372,8 @@ class HelpSession:
# save organised pages to session
self._pages = paginator.pages
- def embed_page(self, page_number=0):
- """
- Returns an Embed with the requested page formatted within.
-
- Parameters
- ----------
- page_number: int
- The page to be retrieved. Zero indexed.
-
- Returns
- -------
- :class:`discord.Embed`
- """
-
+ def embed_page(self, page_number: int = 0) -> Embed:
+ """Returns an Embed with the requested page formatted within."""
embed = Embed()
# if command or cog, add query to title for pages other than first
@@ -514,17 +392,8 @@ class HelpSession:
return embed
- async def update_page(self, page_number=0):
- """
- Sends the intial message, or changes the existing one to the
- given page number.
-
- Parameters
- ----------
- page_number: int
- The page number to show in the help message.
- """
-
+ async def update_page(self, page_number: int = 0) -> None:
+ """Sends the intial message, or changes the existing one to the given page number."""
self._current_page = page_number
embed_page = self.embed_page(page_number)
@@ -534,47 +403,27 @@ class HelpSession:
await self.message.edit(embed=embed_page)
@classmethod
- async def start(cls, ctx, *command, **options):
- """
- Create and begin a help session based on the given command
- context.
-
- Parameters
- ----------
- ctx: :class:`discord.ext.commands.Context`
- The context of the invoked help command.
- *command: str
- A variable argument of the command being queried.
- cleanup: Optional[bool]
- Set to ``True`` to have the message deleted on session end.
- Defaults to ``False``.
- only_can_run: Optional[bool]
- Set to ``True`` to hide commands the user can't run.
- Defaults to ``False``.
- show_hidden: Optional[bool]
- Set to ``True`` to include hidden commands.
- Defaults to ``False``.
- max_lines: Optional[int]
- Sets the max number of lines the paginator will add to a
- single page.
- Defaults to 20.
-
- Returns
- -------
- :class:`HelpSession`
+ async def start(cls, ctx: Context, *command, **options) -> "HelpSession":
"""
+ Create and begin a help session based on the given command context.
+ Available options kwargs:
+ * cleanup: Optional[bool]
+ Set to `True` to have the message deleted on session end. Defaults to `False`.
+ * only_can_run: Optional[bool]
+ Set to `True` to hide commands the user can't run. Defaults to `False`.
+ * show_hidden: Optional[bool]
+ Set to `True` to include hidden commands. Defaults to `False`.
+ * max_lines: Optional[int]
+ Sets the max number of lines the paginator will add to a single page. Defaults to 20.
+ """
session = cls(ctx, *command, **options)
await session.prepare()
return session
- async def stop(self):
- """
- Stops the help session, removes event listeners and attempts to
- delete the help message.
- """
-
+ async def stop(self) -> None:
+ """Stops the help session, removes event listeners and attempts to delete the help message."""
self._bot.remove_listener(self.on_reaction_add)
self._bot.remove_listener(self.on_message_delete)
@@ -586,80 +435,47 @@ class HelpSession:
await self.message.clear_reactions()
@property
- def is_first_page(self):
- """
- A bool reflecting if session is currently showing the first page.
-
- Returns
- -------
- bool
- """
-
+ def is_first_page(self) -> bool:
+ """Check if session is currently showing the first page."""
return self._current_page == 0
@property
- def is_last_page(self):
- """
- A bool reflecting if the session is currently showing the last page.
-
- Returns
- -------
- bool
- """
-
+ def is_last_page(self) -> bool:
+ """Check if the session is currently showing the last page."""
return self._current_page == (len(self._pages)-1)
- async def do_first(self):
- """
- Event that is called when the user requests the first page.
- """
-
+ async def do_first(self) -> None:
+ """Event that is called when the user requests the first page."""
if not self.is_first_page:
await self.update_page(0)
- async def do_back(self):
- """
- Event that is called when the user requests the previous page.
- """
-
+ async def do_back(self) -> None:
+ """Event that is called when the user requests the previous page."""
if not self.is_first_page:
await self.update_page(self._current_page-1)
- async def do_next(self):
- """
- Event that is called when the user requests the next page.
- """
-
+ async def do_next(self) -> None:
+ """Event that is called when the user requests the next page."""
if not self.is_last_page:
await self.update_page(self._current_page+1)
- async def do_end(self):
- """
- Event that is called when the user requests the last page.
- """
-
+ async def do_end(self) -> None:
+ """Event that is called when the user requests the last page."""
if not self.is_last_page:
await self.update_page(len(self._pages)-1)
- async def do_stop(self):
- """
- Event that is called when the user requests to stop the help session.
- """
-
+ async def do_stop(self) -> None:
+ """Event that is called when the user requests to stop the help session."""
await self.message.delete()
class Help(DiscordCog):
- """
- Custom Embed Pagination Help feature
- """
+ """Custom Embed Pagination Help feature."""
+
@commands.command('help')
@redirect_output(destination_channel=Channels.bot, bypass_roles=STAFF_ROLES)
- async def new_help(self, ctx, *commands):
- """
- Shows Command Help.
- """
-
+ async def new_help(self, ctx: Context, *commands) -> None:
+ """Shows Command Help."""
try:
await HelpSession.start(ctx, *commands)
except HelpQueryNotFound as error:
@@ -674,42 +490,29 @@ class Help(DiscordCog):
await ctx.send(embed=embed)
-def unload(bot):
+def unload(bot: Bot) -> None:
"""
Reinstates the original help command.
- This is run if the cog raises an exception on load, or if the
- extension is unloaded.
-
- Parameters
- ----------
- bot: :class:`discord.ext.commands.Bot`
- The discord bot client.
+ This is run if the cog raises an exception on load, or if the extension is unloaded.
"""
-
bot.remove_command('help')
bot.add_command(bot._old_help)
-def setup(bot):
+def setup(bot: Bot) -> None:
"""
The setup for the help extension.
This is called automatically on `bot.load_extension` being run.
- Stores the original help command instance on the ``bot._old_help``
- attribute for later reinstatement, before removing it from the
- command registry so the new help command can be loaded successfully.
-
- If an exception is raised during the loading of the cog, ``unload``
- will be called in order to reinstate the original help command.
+ Stores the original help command instance on the `bot._old_help` attribute for later
+ reinstatement, before removing it from the command registry so the new help command can be
+ loaded successfully.
- Parameters
- ----------
- bot: `discord.ext.commands.Bot`
- The discord bot client.
+ If an exception is raised during the loading of the cog, `unload` will be called in order to
+ reinstate the original help command.
"""
-
bot._old_help = bot.get_command('help')
bot.remove_command('help')
@@ -720,18 +523,12 @@ def setup(bot):
raise
-def teardown(bot):
+def teardown(bot: Bot) -> None:
"""
The teardown for the help extension.
This is called automatically on `bot.unload_extension` being run.
- Calls ``unload`` in order to reinstate the original help command.
-
- Parameters
- ----------
- bot: `discord.ext.commands.Bot`
- The discord bot client.
+ Calls `unload` in order to reinstate the original help command.
"""
-
unload(bot)
diff --git a/bot/cogs/information.py b/bot/cogs/information.py
index c4aff73b8..60aec6219 100644
--- a/bot/cogs/information.py
+++ b/bot/cogs/information.py
@@ -13,23 +13,15 @@ log = logging.getLogger(__name__)
class Information(Cog):
- """
- A cog with commands for generating embeds with
- server information, such as server statistics
- and user information.
- """
+ """A cog with commands for generating embeds with server info, such as server stats and user info."""
def __init__(self, bot: Bot):
self.bot = bot
@with_role(*MODERATION_ROLES)
@command(name="roles")
- async def roles_info(self, ctx: Context):
- """
- Returns a list of all roles and their
- corresponding IDs.
- """
-
+ async def roles_info(self, ctx: Context) -> None:
+ """Returns a list of all roles and their corresponding IDs."""
# Sort the roles alphabetically and remove the @everyone role
roles = sorted(ctx.guild.roles, key=lambda role: role.name)
roles = [role for role in roles if role.name != "@everyone"]
@@ -51,12 +43,8 @@ class Information(Cog):
await ctx.send(embed=embed)
@command(name="server", aliases=["server_info", "guild", "guild_info"])
- async def server_info(self, ctx: Context):
- """
- Returns an embed full of
- server information.
- """
-
+ async def server_info(self, ctx: Context) -> None:
+ """Returns an embed full of server information."""
created = time_since(ctx.guild.created_at, precision="days")
features = ", ".join(ctx.guild.features)
region = ctx.guild.region
@@ -120,11 +108,8 @@ class Information(Cog):
await ctx.send(embed=embed)
@command(name="user", aliases=["user_info", "member", "member_info"])
- async def user_info(self, ctx: Context, user: Member = None, hidden: bool = False):
- """
- Returns info about a user.
- """
-
+ async def user_info(self, ctx: Context, user: Member = None, hidden: bool = False) -> None:
+ """Returns info about a user."""
if user is None:
user = ctx.author
@@ -197,6 +182,7 @@ class Information(Cog):
await ctx.send(embed=embed)
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Information cog load."""
bot.add_cog(Information(bot))
log.info("Cog loaded: Information")
diff --git a/bot/cogs/jams.py b/bot/cogs/jams.py
index dd14111ce..be9d33e3e 100644
--- a/bot/cogs/jams.py
+++ b/bot/cogs/jams.py
@@ -11,22 +11,16 @@ log = logging.getLogger(__name__)
class CodeJams(commands.Cog):
- """
- Manages the code-jam related parts of our server
- """
+ """Manages the code-jam related parts of our server."""
def __init__(self, bot: commands.Bot):
self.bot = bot
@commands.command()
@with_role(Roles.admin)
- async def createteam(
- self, ctx: commands.Context,
- team_name: str, members: commands.Greedy[Member]
- ):
+ async def createteam(self, ctx: commands.Context, team_name: str, members: commands.Greedy[Member]) -> None:
"""
- Create a team channel (both voice and text) in the Code Jams category, assign roles
- and then add overwrites for the team.
+ Create team channels (voice and text) in the Code Jams category, assign roles, and add overwrites for the team.
The first user passed will always be the team leader.
"""
@@ -114,6 +108,7 @@ class CodeJams(commands.Cog):
)
-def setup(bot):
+def setup(bot: commands.Bot) -> None:
+ """Code Jams cog load."""
bot.add_cog(CodeJams(bot))
log.info("Cog loaded: CodeJams")
diff --git a/bot/cogs/logging.py b/bot/cogs/logging.py
index 64bbed46e..8e47bcc36 100644
--- a/bot/cogs/logging.py
+++ b/bot/cogs/logging.py
@@ -10,15 +10,14 @@ log = logging.getLogger(__name__)
class Logging(Cog):
- """
- Debug logging module
- """
+ """Debug logging module."""
def __init__(self, bot: Bot):
self.bot = bot
@Cog.listener()
- async def on_ready(self):
+ async def on_ready(self) -> None:
+ """Announce our presence to the configured devlog channel."""
log.info("Bot connected!")
embed = Embed(description="Connected!")
@@ -35,6 +34,7 @@ class Logging(Cog):
await self.bot.get_channel(Channels.devlog).send(embed=embed)
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Logging cog load."""
bot.add_cog(Logging(bot))
log.info("Cog loaded: Logging")
diff --git a/bot/cogs/moderation.py b/bot/cogs/moderation.py
index fea86c33e..81b3864a7 100644
--- a/bot/cogs/moderation.py
+++ b/bot/cogs/moderation.py
@@ -33,6 +33,7 @@ APPEALABLE_INFRACTIONS = ("Ban", "Mute")
def proxy_user(user_id: str) -> Object:
+ """Create a proxy user for the provided user_id for situations where a Member or User object cannot be resolved."""
try:
user_id = int(user_id)
except ValueError:
@@ -47,9 +48,7 @@ UserTypes = Union[Member, User, proxy_user]
class Moderation(Scheduler, Cog):
- """
- Server moderation tools.
- """
+ """Server moderation tools."""
def __init__(self, bot: Bot):
self.bot = bot
@@ -58,10 +57,12 @@ class Moderation(Scheduler, Cog):
@property
def mod_log(self) -> ModLog:
+ """Get currently loaded ModLog cog instance."""
return self.bot.get_cog("ModLog")
@Cog.listener()
- async def on_ready(self):
+ async def on_ready(self) -> None:
+ """Schedule expiration for previous infractions."""
# Schedule expiration for previous infractions
infractions = await self.bot.api_client.get(
'bot/infractions', params={'active': 'true'}
@@ -74,14 +75,8 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@command()
- async def warn(self, ctx: Context, user: UserTypes, *, reason: str = None):
- """
- Create a warning infraction in the database for a user.
-
- **`user`:** Accepts user mention, ID, etc.
- **`reason`:** The reason for the warning.
- """
-
+ async def warn(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None:
+ """Create a warning infraction in the database for a user."""
infraction = await post_infraction(ctx, user, type="warning", reason=reason)
if infraction is None:
return
@@ -120,14 +115,8 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@command()
- async def kick(self, ctx: Context, user: Member, *, reason: str = None):
- """
- Kicks a user.
-
- **`user`:** Accepts user mention, ID, etc.
- **`reason`:** The reason for the kick.
- """
-
+ async def kick(self, ctx: Context, user: Member, *, reason: str = None) -> None:
+ """Kicks a user with the provided reason."""
if not await self.respect_role_hierarchy(ctx, user, 'kick'):
# Ensure ctx author has a higher top role than the target user
# Warning is sent to ctx by the helper method
@@ -176,14 +165,8 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@command()
- async def ban(self, ctx: Context, user: UserTypes, *, reason: str = None):
- """
- Create a permanent ban infraction in the database for a user.
-
- **`user`:** Accepts user mention, ID, etc.
- **`reason`:** The reason for the ban.
- """
-
+ async def ban(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None:
+ """Create a permanent ban infraction for a user with the provided reason."""
if not await self.respect_role_hierarchy(ctx, user, 'ban'):
# Ensure ctx author has a higher top role than the target user
# Warning is sent to ctx by the helper method
@@ -242,14 +225,8 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@command()
- async def mute(self, ctx: Context, user: Member, *, reason: str = None):
- """
- Create a permanent mute infraction in the database for a user.
-
- **`user`:** Accepts user mention, ID, etc.
- **`reason`:** The reason for the mute.
- """
-
+ async def mute(self, ctx: Context, user: Member, *, reason: str = None) -> None:
+ """Create a permanent mute infraction for a user with the provided reason."""
if await already_has_active_infraction(ctx=ctx, user=user, type="mute"):
return
@@ -304,11 +281,9 @@ class Moderation(Scheduler, Cog):
@command()
async def tempmute(self, ctx: Context, user: Member, duration: ExpirationDate, *, reason: str = None) -> None:
"""
- Create a temporary mute infraction in the database for a user.
+ Create a temporary mute infraction for a user with the provided expiration and reason.
- **`user`:** Accepts user mention, ID, etc.
- **`duration`:** The duration for the temporary mute infraction
- **`reason`:** The reason for the temporary mute.
+ Duration strings are parsed per: http://strftime.org/
"""
expiration = duration
@@ -372,11 +347,9 @@ class Moderation(Scheduler, Cog):
@command()
async def tempban(self, ctx: Context, user: UserTypes, duration: ExpirationDate, *, reason: str = None) -> None:
"""
- Create a temporary ban infraction in the database for a user.
+ Create a temporary ban infraction for a user with the provided expiration and reason.
- **`user`:** Accepts user mention, ID, etc.
- **`duration`:** The duration for the temporary ban infraction
- **`reason`:** The reason for the temporary ban.
+ Duration strings are parsed per: http://strftime.org/
"""
expiration = duration
@@ -453,12 +426,10 @@ class Moderation(Scheduler, Cog):
@command(hidden=True, aliases=['shadowwarn', 'swarn', 'shadow_warn'])
async def note(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None:
"""
- Create a private infraction note in the database for a user.
+ Create a private infraction note in the database for a user with the provided reason.
- **`user`:** accepts user mention, ID, etc.
- **`reason`:** The reason for the warning.
+ This does not send the user a notification
"""
-
infraction = await post_infraction(ctx, user, type="warning", reason=reason, hidden=True)
if infraction is None:
return
@@ -485,12 +456,10 @@ class Moderation(Scheduler, Cog):
@command(hidden=True, aliases=['shadowkick', 'skick'])
async def shadow_kick(self, ctx: Context, user: Member, *, reason: str = None) -> None:
"""
- Kicks a user.
+ Kick a user for the provided reason.
- **`user`:** accepts user mention, ID, etc.
- **`reason`:** The reason for the kick.
+ This does not send the user a notification.
"""
-
if not await self.respect_role_hierarchy(ctx, user, 'shadowkick'):
# Ensure ctx author has a higher top role than the target user
# Warning is sent to ctx by the helper method
@@ -538,12 +507,10 @@ class Moderation(Scheduler, Cog):
@command(hidden=True, aliases=['shadowban', 'sban'])
async def shadow_ban(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None:
"""
- Create a permanent ban infraction in the database for a user.
+ Create a permanent ban infraction for a user with the provided reason.
- **`user`:** Accepts user mention, ID, etc.
- **`reason`:** The reason for the ban.
+ This does not send the user a notification.
"""
-
if not await self.respect_role_hierarchy(ctx, user, 'shadowban'):
# Ensure ctx author has a higher top role than the target user
# Warning is sent to ctx by the helper method
@@ -595,12 +562,10 @@ class Moderation(Scheduler, Cog):
@command(hidden=True, aliases=['shadowmute', 'smute'])
async def shadow_mute(self, ctx: Context, user: Member, *, reason: str = None) -> None:
"""
- Create a permanent mute infraction in the database for a user.
+ Create a permanent mute infraction for a user with the provided reason.
- **`user`:** Accepts user mention, ID, etc.
- **`reason`:** The reason for the mute.
+ This does not send the user a notification.
"""
-
if await already_has_active_infraction(ctx=ctx, user=user, type="mute"):
return
@@ -635,19 +600,14 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@command(hidden=True, aliases=["shadowtempmute, stempmute"])
async def shadow_tempmute(
- self,
- ctx: Context,
- user: Member,
- duration: ExpirationDate,
- *,
- reason: str = None
+ self, ctx: Context, user: Member, duration: ExpirationDate, *, reason: str = None
) -> None:
"""
- Create a temporary mute infraction in the database for a user.
+ Create a temporary mute infraction for a user with the provided reason.
- **`user`:** Accepts user mention, ID, etc.
- **`duration`:** The duration for the temporary mute infraction
- **`reason`:** The reason for the temporary mute.
+ Duration strings are parsed per: http://strftime.org/
+
+ This does not send the user a notification.
"""
expiration = duration
@@ -693,19 +653,14 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@command(hidden=True, aliases=["shadowtempban, stempban"])
async def shadow_tempban(
- self,
- ctx: Context,
- user: UserTypes,
- duration: ExpirationDate,
- *,
- reason: str = None
+ self, ctx: Context, user: UserTypes, duration: ExpirationDate, *, reason: str = None
) -> None:
"""
- Create a temporary ban infraction in the database for a user.
+ Create a temporary ban infraction for a user with the provided reason.
- **`user`:** Accepts user mention, ID, etc.
- **`duration`:** The duration for the temporary ban infraction
- **`reason`:** The reason for the temporary ban.
+ Duration strings are parsed per: http://strftime.org/
+
+ This does not send the user a notification.
"""
expiration = duration
@@ -774,12 +729,7 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@command()
async def unmute(self, ctx: Context, user: UserTypes) -> None:
- """
- Deactivates the active mute infraction for a user.
-
- **`user`:** Accepts user mention, ID, etc.
- """
-
+ """Deactivates the active mute infraction for a user."""
try:
# check the current active infraction
response = await self.bot.api_client.get(
@@ -857,12 +807,7 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@command()
async def unban(self, ctx: Context, user: UserTypes) -> None:
- """
- Deactivates the active ban infraction for a user.
-
- **`user`:** Accepts user mention, ID, etc.
- """
-
+ """Deactivates the active ban infraction for a user."""
try:
# check the current active infraction
response = await self.bot.api_client.get(
@@ -925,16 +870,14 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@group(name='infraction', aliases=('infr', 'infractions', 'inf'), invoke_without_command=True)
- async def infraction_group(self, ctx: Context):
+ async def infraction_group(self, ctx: Context) -> None:
"""Infraction manipulation commands."""
-
await ctx.invoke(self.bot.get_command("help"), "infraction")
@with_role(*MODERATION_ROLES)
@infraction_group.group(name='edit', invoke_without_command=True)
- async def infraction_edit_group(self, ctx: Context):
+ async def infraction_edit_group(self, ctx: Context) -> None:
"""Infraction editing commands."""
-
await ctx.invoke(self.bot.get_command("help"), "infraction", "edit")
@with_role(*MODERATION_ROLES)
@@ -942,15 +885,12 @@ class Moderation(Scheduler, Cog):
async def edit_duration(
self, ctx: Context,
infraction_id: int, expires_at: Union[ExpirationDate, str]
- ):
+ ) -> None:
"""
Sets the duration of the given infraction, relative to the time of updating.
- **`infraction_id`:** the id of the infraction
- **`expires_at`:** the new expiration date of the infraction.
- Use "permanent" to mark the infraction as permanent.
+ Duration strings are parsed per: http://strftime.org/, use "permanent" to mark the infraction as permanent.
"""
-
if isinstance(expires_at, str) and expires_at != 'permanent':
raise BadArgument(
"If `expires_at` is given as a non-datetime, "
@@ -1031,12 +971,7 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@infraction_edit_group.command(name="reason")
async def edit_reason(self, ctx: Context, infraction_id: int, *, reason: str) -> None:
- """
- Sets the reason of the given infraction.
- **`infraction_id`:** the id of the infraction
- **`reason`:** The new reason of the infraction
- """
-
+ """Edit the reason of the given infraction."""
try:
old_infraction = await self.bot.api_client.get(
'bot/infractions/' + str(infraction_id)
@@ -1087,11 +1022,8 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@infraction_group.group(name="search", invoke_without_command=True)
- async def infraction_search_group(self, ctx: Context, query: InfractionSearchQuery):
- """
- Searches for infractions in the database.
- """
-
+ async def infraction_search_group(self, ctx: Context, query: InfractionSearchQuery) -> None:
+ """Searches for infractions in the database."""
if isinstance(query, User):
await ctx.invoke(self.search_user, query)
@@ -1100,11 +1032,8 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@infraction_search_group.command(name="user", aliases=("member", "id"))
- async def search_user(self, ctx: Context, user: Union[User, proxy_user]):
- """
- Search for infractions by member.
- """
-
+ async def search_user(self, ctx: Context, user: Union[User, proxy_user]) -> None:
+ """Search for infractions by member."""
infraction_list = await self.bot.api_client.get(
'bot/infractions',
params={'user__id': str(user.id)}
@@ -1117,11 +1046,8 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@infraction_search_group.command(name="reason", aliases=("match", "regex", "re"))
- async def search_reason(self, ctx: Context, reason: str):
- """
- Search for infractions by their reason. Use Re2 for matching.
- """
-
+ async def search_reason(self, ctx: Context, reason: str) -> None:
+ """Search for infractions by their reason. Use Re2 for matching."""
infraction_list = await self.bot.api_client.get(
'bot/infractions', params={'search': reason}
)
@@ -1134,8 +1060,8 @@ class Moderation(Scheduler, Cog):
# endregion
# region: Utility functions
- async def send_infraction_list(self, ctx: Context, embed: Embed, infractions: list):
-
+ async def send_infraction_list(self, ctx: Context, embed: Embed, infractions: list) -> None:
+ """Send a paginated embed of infractions for the specified user."""
if not infractions:
await ctx.send(f":warning: No infractions could be found for that query.")
return
@@ -1158,17 +1084,9 @@ class Moderation(Scheduler, Cog):
# region: Utility functions
def schedule_expiration(
- self,
- loop: asyncio.AbstractEventLoop,
- infraction_object: Dict[str, Union[str, int, bool]]
+ self, loop: asyncio.AbstractEventLoop, infraction_object: Dict[str, Union[str, int, bool]]
) -> None:
- """
- Schedules a task to expire a temporary infraction.
-
- :param loop: the asyncio event loop
- :param infraction_object: the infraction object to expire at the end of the task
- """
-
+ """Schedules a task to expire a temporary infraction."""
infraction_id = infraction_object["id"]
if infraction_id in self.scheduled_tasks:
return
@@ -1177,12 +1095,8 @@ class Moderation(Scheduler, Cog):
self.scheduled_tasks[infraction_id] = task
- def cancel_expiration(self, infraction_id: str):
- """
- Un-schedules a task set to expire a temporary infraction.
- :param infraction_id: the ID of the infraction in question
- """
-
+ def cancel_expiration(self, infraction_id: str) -> None:
+ """Un-schedules a task set to expire a temporary infraction."""
task = self.scheduled_tasks.get(infraction_id)
if task is None:
log.warning(f"Failed to unschedule {infraction_id}: no task found.")
@@ -1193,13 +1107,11 @@ class Moderation(Scheduler, Cog):
async def _scheduled_task(self, infraction_object: Dict[str, Union[str, int, bool]]) -> None:
"""
- A co-routine which marks an infraction as expired after the delay from the time of
- scheduling to the time of expiration. At the time of expiration, the infraction is
- marked as inactive on the website, and the expiration task is cancelled.
+ Marks an infraction expired after the delay from time of scheduling to time of expiration.
- :param infraction_object: the infraction in question
+ At the time of expiration, the infraction is marked as inactive on the website, and the
+ expiration task is cancelled. The user is then notified via DM.
"""
-
infraction_id = infraction_object["id"]
# transform expiration to delay in seconds
@@ -1224,11 +1136,9 @@ class Moderation(Scheduler, Cog):
async def _deactivate_infraction(self, infraction_object: Dict[str, Union[str, int, bool]]) -> None:
"""
A co-routine which marks an infraction as inactive on the website.
- This co-routine does not cancel or un-schedule an expiration task.
- :param infraction_object: the infraction in question
+ This co-routine does not cancel or un-schedule an expiration task.
"""
-
guild: Guild = self.bot.get_guild(constants.Guild.id)
user_id = infraction_object["user"]
infraction_type = infraction_object["type"]
@@ -1254,6 +1164,7 @@ class Moderation(Scheduler, Cog):
)
def _infraction_to_string(self, infraction_object: Dict[str, Union[str, int, bool]]) -> str:
+ """Convert the infraction object to a string representation."""
actor_id = infraction_object["actor"]
guild: Guild = self.bot.get_guild(constants.Guild.id)
actor = guild.get_member(actor_id)
@@ -1283,21 +1194,17 @@ class Moderation(Scheduler, Cog):
return lines.strip()
async def notify_infraction(
- self,
- user: Union[User, Member],
- infr_type: str,
- expires_at: Union[datetime, str] = 'N/A',
- reason: str = "No reason provided."
+ self,
+ user: Union[User, Member],
+ infr_type: str,
+ expires_at: Union[datetime, str] = 'N/A',
+ reason: str = "No reason provided."
) -> bool:
"""
- Notify a user of their fresh infraction :)
+ Attempt to notify a user, via DM, of their fresh infraction.
- :param user: The user to send the message to.
- :param infr_type: The type of infraction, as a string.
- :param duration: The duration of the infraction.
- :param reason: The reason for the infraction.
+ Returns a boolean indicator of whether the DM was successful.
"""
-
if isinstance(expires_at, datetime):
expires_at = expires_at.strftime('%c')
@@ -1328,14 +1235,10 @@ class Moderation(Scheduler, Cog):
icon_url: str = Icons.user_verified
) -> bool:
"""
- Notify a user that an infraction has been lifted.
+ Attempt to notify a user, via DM, of their expired infraction.
- :param user: The user to send the message to.
- :param title: The title of the embed.
- :param content: The content of the embed.
- :param icon_url: URL for the title icon.
+ Optionally returns a boolean indicator of whether the DM was successful.
"""
-
embed = Embed(
description=content,
colour=Colour(Colours.soft_green)
@@ -1349,10 +1252,8 @@ class Moderation(Scheduler, Cog):
"""
A helper method for sending an embed to a user's DMs.
- :param user: The user to send the embed to.
- :param embed: The embed to send.
+ Returns a boolean indicator of DM success.
"""
-
# sometimes `user` is a `discord.Object`, so let's make it a proper user.
user = await self.bot.fetch_user(user.id)
@@ -1366,7 +1267,8 @@ class Moderation(Scheduler, Cog):
)
return False
- async def log_notify_failure(self, target: str, actor: Member, infraction_type: str):
+ async def log_notify_failure(self, target: str, actor: Member, infraction_type: str) -> None:
+ """Send a mod log entry if an attempt to DM the target user has failed."""
await self.mod_log.send_log_message(
icon_url=Icons.token_removed,
content=actor.mention,
@@ -1381,7 +1283,8 @@ class Moderation(Scheduler, Cog):
# endregion
@staticmethod
- async def cog_command_error(ctx: Context, error) -> None:
+ async def cog_command_error(ctx: Context, error: Exception) -> None:
+ """Send a notification to the invoking context on a Union failure."""
if isinstance(error, BadUnionArgument):
if User in error.converters:
await ctx.send(str(error.errors[0]))
@@ -1391,15 +1294,11 @@ class Moderation(Scheduler, Cog):
async def respect_role_hierarchy(ctx: Context, target: UserTypes, infr_type: str) -> bool:
"""
Check if the highest role of the invoking member is greater than that of the target member.
+
If this check fails, a warning is sent to the invoking ctx.
Returns True always if target is not a discord.Member instance.
-
- :param ctx: The command context when invoked.
- :param target: The target of the infraction.
- :param infr_type: The type of infraction.
"""
-
if not isinstance(target, Member):
return True
@@ -1419,6 +1318,6 @@ class Moderation(Scheduler, Cog):
def setup(bot: Bot) -> None:
- """Sets up the Moderation cog."""
+ """Moderation cog load."""
bot.add_cog(Moderation(bot))
log.info("Cog loaded: Moderation")
diff --git a/bot/cogs/modlog.py b/bot/cogs/modlog.py
index 978646f46..68424d268 100644
--- a/bot/cogs/modlog.py
+++ b/bot/cogs/modlog.py
@@ -11,7 +11,7 @@ from discord import (
RawMessageUpdateEvent, Role, TextChannel, User, VoiceChannel
)
from discord.abc import GuildChannel
-from discord.ext.commands import Bot, Cog
+from discord.ext.commands import Bot, Cog, Context
from bot.constants import (
Channels, Colours, Emojis, Event, Guild as GuildConstant, Icons, URLs
@@ -29,9 +29,7 @@ ROLE_CHANGES_UNSUPPORTED = ("colour", "permissions")
class ModLog(Cog, name="ModLog"):
- """
- Logging for server events and staff actions
- """
+ """Logging for server events and staff actions."""
def __init__(self, bot: Bot):
self.bot = bot
@@ -40,16 +38,14 @@ class ModLog(Cog, name="ModLog"):
self._cached_deletes = []
self._cached_edits = []
- async def upload_log(self, messages: List[Message], actor_id: int) -> Optional[str]:
+ async def upload_log(self, messages: List[Message], actor_id: int) -> str:
"""
- Uploads the log data to the database via
- an API endpoint for uploading logs.
+ Uploads the log data to the database via an API endpoint for uploading logs.
Used in several mod log embeds.
Returns a URL that can be used to view the log.
"""
-
response = await self.bot.api_client.post(
'bot/deleted-messages',
json={
@@ -70,7 +66,8 @@ class ModLog(Cog, name="ModLog"):
return f"{URLs.site_logs_view}/{response['id']}"
- def ignore(self, event: Event, *items: int):
+ def ignore(self, event: Event, *items: int) -> None:
+ """Add event to ignored events to suppress log emission."""
for item in items:
if item not in self._ignored[event]:
self._ignored[event].append(item)
@@ -90,7 +87,8 @@ class ModLog(Cog, name="ModLog"):
additional_embeds_msg: Optional[str] = None,
timestamp_override: Optional[datetime] = None,
footer: Optional[str] = None,
- ):
+ ) -> Context:
+ """Generate log embed and send to logging channel."""
embed = Embed(description=text)
if title and icon_url:
@@ -123,7 +121,8 @@ class ModLog(Cog, name="ModLog"):
return await self.bot.get_context(log_message) # Optionally return for use with antispam
@Cog.listener()
- async def on_guild_channel_create(self, channel: GUILD_CHANNEL):
+ async def on_guild_channel_create(self, channel: GUILD_CHANNEL) -> None:
+ """Log channel create event to mod log."""
if channel.guild.id != GuildConstant.id:
return
@@ -148,7 +147,8 @@ class ModLog(Cog, name="ModLog"):
await self.send_log_message(Icons.hash_green, Colour(Colours.soft_green), title, message)
@Cog.listener()
- async def on_guild_channel_delete(self, channel: GUILD_CHANNEL):
+ async def on_guild_channel_delete(self, channel: GUILD_CHANNEL) -> None:
+ """Log channel delete event to mod log."""
if channel.guild.id != GuildConstant.id:
return
@@ -170,7 +170,8 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_guild_channel_update(self, before: GUILD_CHANNEL, after: GuildChannel):
+ async def on_guild_channel_update(self, before: GUILD_CHANNEL, after: GuildChannel) -> None:
+ """Log channel update event to mod log."""
if before.guild.id != GuildConstant.id:
return
@@ -229,7 +230,8 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_guild_role_create(self, role: Role):
+ async def on_guild_role_create(self, role: Role) -> None:
+ """Log role create event to mod log."""
if role.guild.id != GuildConstant.id:
return
@@ -239,7 +241,8 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_guild_role_delete(self, role: Role):
+ async def on_guild_role_delete(self, role: Role) -> None:
+ """Log role delete event to mod log."""
if role.guild.id != GuildConstant.id:
return
@@ -249,7 +252,8 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_guild_role_update(self, before: Role, after: Role):
+ async def on_guild_role_update(self, before: Role, after: Role) -> None:
+ """Log role update event to mod log."""
if before.guild.id != GuildConstant.id:
return
@@ -301,7 +305,8 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_guild_update(self, before: Guild, after: Guild):
+ async def on_guild_update(self, before: Guild, after: Guild) -> None:
+ """Log guild update event to mod log."""
if before.id != GuildConstant.id:
return
@@ -351,7 +356,8 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_member_ban(self, guild: Guild, member: Union[Member, User]):
+ async def on_member_ban(self, guild: Guild, member: Union[Member, User]) -> None:
+ """Log ban event to mod log."""
if guild.id != GuildConstant.id:
return
@@ -367,7 +373,8 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_member_join(self, member: Member):
+ async def on_member_join(self, member: Member) -> None:
+ """Log member join event to user log."""
if member.guild.id != GuildConstant.id:
return
@@ -388,7 +395,8 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_member_remove(self, member: Member):
+ async def on_member_remove(self, member: Member) -> None:
+ """Log member leave event to user log."""
if member.guild.id != GuildConstant.id:
return
@@ -404,7 +412,8 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_member_unban(self, guild: Guild, member: User):
+ async def on_member_unban(self, guild: Guild, member: User) -> None:
+ """Log member unban event to mod log."""
if guild.id != GuildConstant.id:
return
@@ -420,7 +429,8 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_member_update(self, before: Member, after: Member):
+ async def on_member_update(self, before: Member, after: Member) -> None:
+ """Log member update event to user log."""
if before.guild.id != GuildConstant.id:
return
@@ -510,7 +520,8 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_message_delete(self, message: Message):
+ async def on_message_delete(self, message: Message) -> None:
+ """Log message delete event to message change log."""
channel = message.channel
author = message.author
@@ -565,7 +576,8 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_raw_message_delete(self, event: RawMessageDeleteEvent):
+ async def on_raw_message_delete(self, event: RawMessageDeleteEvent) -> None:
+ """Log raw message delete event to message change log."""
if event.guild_id != GuildConstant.id or event.channel_id in GuildConstant.ignored:
return
@@ -605,7 +617,8 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_message_edit(self, before: Message, after: Message):
+ async def on_message_edit(self, before: Message, after: Message) -> None:
+ """Log message edit event to message change log."""
if (
not before.guild
or before.guild.id != GuildConstant.id
@@ -679,7 +692,8 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_raw_message_edit(self, event: RawMessageUpdateEvent):
+ async def on_raw_message_edit(self, event: RawMessageUpdateEvent) -> None:
+ """Log raw message edit event to message change log."""
try:
channel = self.bot.get_channel(int(event.data["channel_id"]))
message = await channel.fetch_message(event.message_id)
@@ -748,6 +762,7 @@ class ModLog(Cog, name="ModLog"):
)
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Mod log cog load."""
bot.add_cog(ModLog(bot))
log.info("Cog loaded: ModLog")
diff --git a/bot/cogs/off_topic_names.py b/bot/cogs/off_topic_names.py
index 1f6ed80b5..8f1af347a 100644
--- a/bot/cogs/off_topic_names.py
+++ b/bot/cogs/off_topic_names.py
@@ -19,7 +19,8 @@ class OffTopicName(Converter):
"""A converter that ensures an added off-topic name is valid."""
@staticmethod
- async def convert(ctx: Context, argument: str):
+ async def convert(ctx: Context, argument: str) -> str:
+ """Attempt to replace any invalid characters with their approximate Unicode equivalent."""
allowed_characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ!?'`-"
if not (2 <= len(argument) <= 96):
@@ -38,16 +39,8 @@ class OffTopicName(Converter):
return argument.translate(table)
-async def update_names(bot: Bot):
- """
- The background updater task that performs a channel name update daily.
-
- Args:
- bot (Bot):
- The running bot instance, used for fetching data from the
- website via the bot's `api_client`.
- """
-
+async def update_names(bot: Bot) -> None:
+ """Background updater task that performs the daily channel name update."""
while True:
# Since we truncate the compute timedelta to seconds, we add one second to ensure
# we go past midnight in the `seconds_to_sleep` set below.
@@ -77,26 +70,27 @@ class OffTopicNames(Cog):
self.bot = bot
self.updater_task = None
- def cog_unload(self):
+ def cog_unload(self) -> None:
+ """Cancel any running updater tasks on cog unload."""
if self.updater_task is not None:
self.updater_task.cancel()
@Cog.listener()
- async def on_ready(self):
+ async def on_ready(self) -> None:
+ """Start off-topic channel updating event loop if it hasn't already started."""
if self.updater_task is None:
coro = update_names(self.bot)
self.updater_task = self.bot.loop.create_task(coro)
@group(name='otname', aliases=('otnames', 'otn'), invoke_without_command=True)
@with_role(*MODERATION_ROLES)
- async def otname_group(self, ctx):
+ async def otname_group(self, ctx: Context) -> None:
"""Add or list items from the off-topic channel name rotation."""
-
await ctx.invoke(self.bot.get_command("help"), "otname")
@otname_group.command(name='add', aliases=('a',))
@with_role(*MODERATION_ROLES)
- async def add_command(self, ctx, *names: OffTopicName):
+ async def add_command(self, ctx: Context, *names: OffTopicName) -> None:
"""Adds a new off-topic name to the rotation."""
# Chain multiple words to a single one
name = "-".join(names)
@@ -110,7 +104,7 @@ class OffTopicNames(Cog):
@otname_group.command(name='delete', aliases=('remove', 'rm', 'del', 'd'))
@with_role(*MODERATION_ROLES)
- async def delete_command(self, ctx, *names: OffTopicName):
+ async def delete_command(self, ctx: Context, *names: OffTopicName) -> None:
"""Removes a off-topic name from the rotation."""
# Chain multiple words to a single one
name = "-".join(names)
@@ -124,12 +118,12 @@ class OffTopicNames(Cog):
@otname_group.command(name='list', aliases=('l',))
@with_role(*MODERATION_ROLES)
- async def list_command(self, ctx):
+ async def list_command(self, ctx: Context) -> None:
"""
Lists all currently known off-topic channel names in a paginator.
+
Restricted to Moderator and above to not spoil the surprise.
"""
-
result = await self.bot.api_client.get('bot/off-topic-channel-names')
lines = sorted(f"• {name}" for name in result)
embed = Embed(
@@ -144,11 +138,8 @@ class OffTopicNames(Cog):
@otname_group.command(name='search', aliases=('s',))
@with_role(*MODERATION_ROLES)
- async def search_command(self, ctx, *, query: OffTopicName):
- """
- Search for an off-topic name.
- """
-
+ async def search_command(self, ctx: Context, *, query: OffTopicName) -> None:
+ """Search for an off-topic name."""
result = await self.bot.api_client.get('bot/off-topic-channel-names')
in_matches = {name for name in result if query in name}
close_matches = difflib.get_close_matches(query, result, n=10, cutoff=0.70)
@@ -165,6 +156,7 @@ class OffTopicNames(Cog):
await ctx.send(embed=embed)
-def setup(bot: Bot):
+def setup(bot: Bot) -> None:
+ """Off topic names cog load."""
bot.add_cog(OffTopicNames(bot))
log.info("Cog loaded: OffTopicNames")
diff --git a/bot/cogs/reddit.py b/bot/cogs/reddit.py
index 4c561b7e8..63a57c5c6 100644
--- a/bot/cogs/reddit.py
+++ b/bot/cogs/reddit.py
@@ -3,8 +3,9 @@ import logging
import random
import textwrap
from datetime import datetime, timedelta
+from typing import List
-from discord import Colour, Embed, TextChannel
+from discord import Colour, Embed, Message, TextChannel
from discord.ext.commands import Bot, Cog, Context, group
from bot.constants import Channels, ERROR_REPLIES, Reddit as RedditConfig, STAFF_ROLES
@@ -16,9 +17,7 @@ log = logging.getLogger(__name__)
class Reddit(Cog):
- """
- Track subreddit posts and show detailed statistics about them.
- """
+ """Track subreddit posts and show detailed statistics about them."""
HEADERS = {"User-Agent": "Discord Bot: PythonDiscord (https://pythondiscord.com/)"}
URL = "https://www.reddit.com"
@@ -34,11 +33,8 @@ class Reddit(Cog):
self.new_posts_task = None
self.top_weekly_posts_task = None
- async def fetch_posts(self, route: str, *, amount: int = 25, params=None):
- """
- A helper method to fetch a certain amount of Reddit posts at a given route.
- """
-
+ async def fetch_posts(self, route: str, *, amount: int = 25, params: dict = None) -> List[dict]:
+ """A helper method to fetch a certain amount of Reddit posts at a given route."""
# Reddit's JSON responses only provide 25 posts at most.
if not 25 >= amount > 0:
raise ValueError("Invalid amount of subreddit posts requested.")
@@ -57,11 +53,10 @@ class Reddit(Cog):
return posts[:amount]
- async def send_top_posts(self, channel: TextChannel, subreddit: Subreddit, content=None, time="all"):
- """
- Create an embed for the top posts, then send it in a given TextChannel.
- """
-
+ async def send_top_posts(
+ self, channel: TextChannel, subreddit: Subreddit, content: str = None, time: str = "all"
+ ) -> Message:
+ """Create an embed for the top posts, then send it in a given TextChannel."""
# Create the new spicy embed.
embed = Embed()
embed.description = ""
@@ -115,11 +110,8 @@ class Reddit(Cog):
embed=embed
)
- async def poll_new_posts(self):
- """
- Periodically search for new subreddit posts.
- """
-
+ async def poll_new_posts(self) -> None:
+ """Periodically search for new subreddit posts."""
while True:
await asyncio.sleep(RedditConfig.request_delay)
@@ -179,11 +171,8 @@ class Reddit(Cog):
log.trace(f"Sent {len(new_posts)} new {subreddit} posts to channel {self.reddit_channel.id}.")
- async def poll_top_weekly_posts(self):
- """
- Post a summary of the top posts every week.
- """
-
+ async def poll_top_weekly_posts(self) -> None:
+ """Post a summary of the top posts every week."""
while True:
now = datetime.utcnow()
@@ -214,19 +203,13 @@ class Reddit(Cog):
await message.pin()
@group(name="reddit", invoke_without_command=True)
- async def reddit_group(self, ctx: Context):
- """
- View the top posts from various subreddits.
- """
-
+ async def reddit_group(self, ctx: Context) -> None:
+ """View the top posts from various subreddits."""
await ctx.invoke(self.bot.get_command("help"), "reddit")
@reddit_group.command(name="top")
- async def top_command(self, ctx: Context, subreddit: Subreddit = "r/Python"):
- """
- Send the top posts of all time from a given subreddit.
- """
-
+ async def top_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None:
+ """Send the top posts of all time from a given subreddit."""
await self.send_top_posts(
channel=ctx.channel,
subreddit=subreddit,
@@ -235,11 +218,8 @@ class Reddit(Cog):
)
@reddit_group.command(name="daily")
- async def daily_command(self, ctx: Context, subreddit: Subreddit = "r/Python"):
- """
- Send the top posts of today from a given subreddit.
- """
-
+ async def daily_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None:
+ """Send the top posts of today from a given subreddit."""
await self.send_top_posts(
channel=ctx.channel,
subreddit=subreddit,
@@ -248,11 +228,8 @@ class Reddit(Cog):
)
@reddit_group.command(name="weekly")
- async def weekly_command(self, ctx: Context, subreddit: Subreddit = "r/Python"):
- """
- Send the top posts of this week from a given subreddit.
- """
-
+ async def weekly_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None:
+ """Send the top posts of this week from a given subreddit."""
await self.send_top_posts(
channel=ctx.channel,
subreddit=subreddit,
@@ -262,11 +239,8 @@ class Reddit(Cog):
@with_role(*STAFF_ROLES)
@reddit_group.command(name="subreddits", aliases=("subs",))
- async def subreddits_command(self, ctx: Context):
- """
- Send a paginated embed of all the subreddits we're relaying.
- """
-
+ async def subreddits_command(self, ctx: Context) -> None:
+ """Send a paginated embed of all the subreddits we're relaying."""
embed = Embed()
embed.title = "Relayed subreddits."
embed.colour = Colour.blurple()
@@ -280,7 +254,8 @@ class Reddit(Cog):
)
@Cog.listener()
- async def on_ready(self):
+ async def on_ready(self) -> None:
+ """Initiate reddit post event loop."""
self.reddit_channel = await self.bot.fetch_channel(Channels.reddit)
if self.reddit_channel is not None:
@@ -292,6 +267,7 @@ class Reddit(Cog):
log.warning("Couldn't locate a channel for subreddit relaying.")
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Reddit cog load."""
bot.add_cog(Reddit(bot))
log.info("Cog loaded: Reddit")
diff --git a/bot/cogs/reminders.py b/bot/cogs/reminders.py
index c6ae984ea..8460de91f 100644
--- a/bot/cogs/reminders.py
+++ b/bot/cogs/reminders.py
@@ -4,9 +4,10 @@ import random
import textwrap
from datetime import datetime
from operator import itemgetter
+from typing import Optional
from dateutil.relativedelta import relativedelta
-from discord import Colour, Embed
+from discord import Colour, Embed, Message
from discord.ext.commands import Bot, Cog, Context, group
from bot.constants import Channels, Icons, NEGATIVE_REPLIES, POSITIVE_REPLIES, STAFF_ROLES
@@ -23,14 +24,15 @@ MAXIMUM_REMINDERS = 5
class Reminders(Scheduler, Cog):
+ """Provide in-channel reminder functionality."""
def __init__(self, bot: Bot):
self.bot = bot
super().__init__()
@Cog.listener()
- async def on_ready(self):
- # Get all the current reminders for re-scheduling
+ async def on_ready(self) -> None:
+ """Get all current reminders from the API and reschedule them."""
response = await self.bot.api_client.get(
'bot/reminders',
params={'active': 'true'}
@@ -51,25 +53,16 @@ class Reminders(Scheduler, Cog):
self.schedule_task(loop, reminder["id"], reminder)
@staticmethod
- async def _send_confirmation(ctx: Context, on_success: str):
- """
- Send an embed confirming the change was made successfully.
- """
-
+ async def _send_confirmation(ctx: Context, on_success: str) -> None:
+ """Send an embed confirming the reminder change was made successfully."""
embed = Embed()
embed.colour = Colour.green()
embed.title = random.choice(POSITIVE_REPLIES)
embed.description = on_success
await ctx.send(embed=embed)
- async def _scheduled_task(self, reminder: dict):
- """
- A coroutine which sends the reminder once the time is reached.
-
- :param reminder: the data of the reminder.
- :return:
- """
-
+ async def _scheduled_task(self, reminder: dict) -> None:
+ """A coroutine which sends the reminder once the time is reached, and cancels the running task."""
reminder_id = reminder["id"]
reminder_datetime = datetime.fromisoformat(reminder['expiration'][:-1])
@@ -83,38 +76,22 @@ class Reminders(Scheduler, Cog):
# Now we can begone with it from our schedule list.
self.cancel_task(reminder_id)
- async def _delete_reminder(self, reminder_id: str):
- """
- Delete a reminder from the database, given its ID.
-
- :param reminder_id: The ID of the reminder.
- """
-
+ async def _delete_reminder(self, reminder_id: str) -> None:
+ """Delete a reminder from the database, given its ID, and cancel the running task."""
await self.bot.api_client.delete('bot/reminders/' + str(reminder_id))
# Now we can remove it from the schedule list
self.cancel_task(reminder_id)
- async def _reschedule_reminder(self, reminder):
- """
- Reschedule a reminder object.
-
- :param reminder: The reminder to be rescheduled.
- """
-
+ async def _reschedule_reminder(self, reminder: dict) -> None:
+ """Reschedule a reminder object."""
loop = asyncio.get_event_loop()
self.cancel_task(reminder["id"])
self.schedule_task(loop, reminder["id"], reminder)
- async def send_reminder(self, reminder, late: relativedelta = None):
- """
- Send the reminder.
-
- :param reminder: The data about the reminder.
- :param late: How late the reminder is (if at all)
- """
-
+ async def send_reminder(self, reminder: dict, late: relativedelta = None) -> None:
+ """Send the reminder."""
channel = self.bot.get_channel(reminder["channel_id"])
user = self.bot.get_user(reminder["author"])
@@ -141,19 +118,17 @@ class Reminders(Scheduler, Cog):
await self._delete_reminder(reminder["id"])
@group(name="remind", aliases=("reminder", "reminders"), invoke_without_command=True)
- async def remind_group(self, ctx: Context, expiration: ExpirationDate, *, content: str):
- """
- Commands for managing your reminders.
- """
-
+ async def remind_group(self, ctx: Context, expiration: ExpirationDate, *, content: str) -> None:
+ """Commands for managing your reminders."""
await ctx.invoke(self.new_reminder, expiration=expiration, content=content)
@remind_group.command(name="new", aliases=("add", "create"))
- async def new_reminder(self, ctx: Context, expiration: ExpirationDate, *, content: str):
+ async def new_reminder(self, ctx: Context, expiration: ExpirationDate, *, content: str) -> Optional[Message]:
"""
Set yourself a simple reminder.
- """
+ Expiration is parsed per: http://strftime.org/
+ """
embed = Embed()
# If the user is not staff, we need to verify whether or not to make a reminder at all.
@@ -204,11 +179,8 @@ class Reminders(Scheduler, Cog):
self.schedule_task(loop, reminder["id"], reminder)
@remind_group.command(name="list")
- async def list_reminders(self, ctx: Context):
- """
- View a paginated embed of all reminders for your user.
- """
-
+ async def list_reminders(self, ctx: Context) -> Optional[Message]:
+ """View a paginated embed of all reminders for your user."""
# Get all the user's reminders from the database.
data = await self.bot.api_client.get(
'bot/reminders',
@@ -260,19 +232,17 @@ class Reminders(Scheduler, Cog):
)
@remind_group.group(name="edit", aliases=("change", "modify"), invoke_without_command=True)
- async def edit_reminder_group(self, ctx: Context):
- """
- Commands for modifying your current reminders.
- """
-
+ async def edit_reminder_group(self, ctx: Context) -> None:
+ """Commands for modifying your current reminders."""
await ctx.invoke(self.bot.get_command("help"), "reminders", "edit")
@edit_reminder_group.command(name="duration", aliases=("time",))
- async def edit_reminder_duration(self, ctx: Context, id_: int, expiration: ExpirationDate):
- """
- Edit one of your reminders' expiration.
+ async def edit_reminder_duration(self, ctx: Context, id_: int, expiration: ExpirationDate) -> None:
"""
+ Edit one of your reminder's expiration.
+ Expiration is parsed per: http://strftime.org/
+ """
# Send the request to update the reminder in the database
reminder = await self.bot.api_client.patch(
'bot/reminders/' + str(id_),
@@ -287,11 +257,8 @@ class Reminders(Scheduler, Cog):
await self._reschedule_reminder(reminder)
@edit_reminder_group.command(name="content", aliases=("reason",))
- async def edit_reminder_content(self, ctx: Context, id_: int, *, content: str):
- """
- Edit one of your reminders' content.
- """
-
+ async def edit_reminder_content(self, ctx: Context, id_: int, *, content: str) -> None:
+ """Edit one of your reminder's content."""
# Send the request to update the reminder in the database
reminder = await self.bot.api_client.patch(
'bot/reminders/' + str(id_),
@@ -305,17 +272,15 @@ class Reminders(Scheduler, Cog):
await self._reschedule_reminder(reminder)
@remind_group.command("delete", aliases=("remove",))
- async def delete_reminder(self, ctx: Context, id_: int):
- """
- Delete one of your active reminders.
- """
-
+ async def delete_reminder(self, ctx: Context, id_: int) -> None:
+ """Delete one of your active reminders."""
await self._delete_reminder(id_)
await self._send_confirmation(
ctx, on_success="That reminder has been deleted successfully!"
)
-def setup(bot: Bot):
+def setup(bot: Bot) -> None:
+ """Reminders cog load."""
bot.add_cog(Reminders(bot))
log.info("Cog loaded: Reminders")
diff --git a/bot/cogs/security.py b/bot/cogs/security.py
index e02e91530..316b33d6b 100644
--- a/bot/cogs/security.py
+++ b/bot/cogs/security.py
@@ -6,24 +6,25 @@ log = logging.getLogger(__name__)
class Security(Cog):
- """
- Security-related helpers
- """
+ """Security-related helpers."""
def __init__(self, bot: Bot):
self.bot = bot
self.bot.check(self.check_not_bot) # Global commands check - no bots can run any commands at all
self.bot.check(self.check_on_guild) # Global commands check - commands can't be run in a DM
- def check_not_bot(self, ctx: Context):
+ def check_not_bot(self, ctx: Context) -> bool:
+ """Check if the context is a bot user."""
return not ctx.author.bot
- def check_on_guild(self, ctx: Context):
+ def check_on_guild(self, ctx: Context) -> bool:
+ """Check if the context is in a guild."""
if ctx.guild is None:
raise NoPrivateMessage("This command cannot be used in private messages.")
return True
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Security cog load."""
bot.add_cog(Security(bot))
log.info("Cog loaded: Security")
diff --git a/bot/cogs/site.py b/bot/cogs/site.py
index 4d5b2e811..4a423faa9 100644
--- a/bot/cogs/site.py
+++ b/bot/cogs/site.py
@@ -19,15 +19,13 @@ class Site(Cog):
self.bot = bot
@group(name="site", aliases=("s",), invoke_without_command=True)
- async def site_group(self, ctx):
+ async def site_group(self, ctx: Context) -> None:
"""Commands for getting info about our website."""
-
await ctx.invoke(self.bot.get_command("help"), "site")
@site_group.command(name="home", aliases=("about",))
- async def site_main(self, ctx: Context):
+ async def site_main(self, ctx: Context) -> None:
"""Info about the website itself."""
-
url = f"{URLs.site_schema}{URLs.site}/"
embed = Embed(title="Python Discord website")
@@ -43,9 +41,8 @@ class Site(Cog):
await ctx.send(embed=embed)
@site_group.command(name="resources")
- async def site_resources(self, ctx: Context):
+ async def site_resources(self, ctx: Context) -> None:
"""Info about the site's Resources page."""
-
learning_url = f"{PAGES_URL}/resources"
tools_url = f"{PAGES_URL}/tools"
@@ -63,9 +60,8 @@ class Site(Cog):
await ctx.send(embed=embed)
@site_group.command(name="help")
- async def site_help(self, ctx: Context):
+ async def site_help(self, ctx: Context) -> None:
"""Info about the site's Getting Help page."""
-
url = f"{PAGES_URL}/asking-good-questions"
embed = Embed(title="Asking Good Questions")
@@ -80,9 +76,8 @@ class Site(Cog):
await ctx.send(embed=embed)
@site_group.command(name="faq")
- async def site_faq(self, ctx: Context):
+ async def site_faq(self, ctx: Context) -> None:
"""Info about the site's FAQ page."""
-
url = f"{PAGES_URL}/frequently-asked-questions"
embed = Embed(title="FAQ")
@@ -99,14 +94,8 @@ class Site(Cog):
@site_group.command(aliases=['r', 'rule'], name='rules')
@redirect_output(destination_channel=Channels.bot, bypass_roles=STAFF_ROLES)
- async def site_rules(self, ctx: Context, *rules: int):
- """
- Provides a link to the `rules` endpoint of the website, or displays
- specific rules, if they are requested.
-
- **`ctx`:** The Discord message context
- **`rules`:** The rules a user wants to get.
- """
+ async def site_rules(self, ctx: Context, *rules: int) -> None:
+ """Provides a link to all rules or, if specified, displays specific rule(s)."""
rules_embed = Embed(title='Rules', color=Colour.blurple())
rules_embed.url = f"{PAGES_URL}/rules"
@@ -138,6 +127,7 @@ class Site(Cog):
await LinePaginator.paginate(final_rules, ctx, rules_embed, max_lines=3)
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Site cog load."""
bot.add_cog(Site(bot))
log.info("Cog loaded: Site")
diff --git a/bot/cogs/snekbox.py b/bot/cogs/snekbox.py
index d36c0795d..5accbdb5e 100644
--- a/bot/cogs/snekbox.py
+++ b/bot/cogs/snekbox.py
@@ -37,9 +37,7 @@ MAX_PASTE_LEN = 1000
class Snekbox(Cog):
- """
- Safe evaluation of Python code using Snekbox
- """
+ """Safe evaluation of Python code using Snekbox."""
def __init__(self, bot: Bot):
self.bot = bot
@@ -169,7 +167,7 @@ class Snekbox(Cog):
@command(name="eval", aliases=("e",))
@guild_only()
@in_channel(Channels.bot, bypass_roles=STAFF_ROLES)
- async def eval_command(self, ctx: Context, *, code: str = None):
+ async def eval_command(self, ctx: Context, *, code: str = None) -> None:
"""
Run Python code and get the results.
@@ -178,13 +176,15 @@ class Snekbox(Cog):
issue with it!
"""
if ctx.author.id in self.jobs:
- return await ctx.send(
+ await ctx.send(
f"{ctx.author.mention} You've already got a job running - "
f"please wait for it to finish!"
)
+ return
if not code: # None or empty string
- return await ctx.invoke(self.bot.get_command("help"), "eval")
+ await ctx.invoke(self.bot.get_command("help"), "eval")
+ return
log.info(
f"Received code from {ctx.author.name}#{ctx.author.discriminator} "
@@ -221,6 +221,7 @@ class Snekbox(Cog):
del self.jobs[ctx.author.id]
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Snekbox cog load."""
bot.add_cog(Snekbox(bot))
log.info("Cog loaded: Snekbox")
diff --git a/bot/cogs/superstarify/__init__.py b/bot/cogs/superstarify/__init__.py
index e9743a2f5..f7d6a269d 100644
--- a/bot/cogs/superstarify/__init__.py
+++ b/bot/cogs/superstarify/__init__.py
@@ -19,30 +19,30 @@ NICKNAME_POLICY_URL = "https://pythondiscord.com/pages/rules/#wiki-toc-nickname-
class Superstarify(Cog):
- """
- A set of commands to moderate terrible nicknames.
- """
+ """A set of commands to moderate terrible nicknames."""
def __init__(self, bot: Bot):
self.bot = bot
@property
def moderation(self) -> Moderation:
+ """Get currently loaded Moderation cog instance."""
return self.bot.get_cog("Moderation")
@property
def modlog(self) -> ModLog:
+ """Get currently loaded ModLog cog instance."""
return self.bot.get_cog("ModLog")
@Cog.listener()
- async def on_member_update(self, before: Member, after: Member):
+ async def on_member_update(self, before: Member, after: Member) -> None:
"""
This event will trigger when someone changes their name.
- At this point we will look up the user in our database and check
- whether they are allowed to change their names, or if they are in
- superstar-prison. If they are not allowed, we will change it back.
- """
+ At this point we will look up the user in our database and check whether they are allowed to
+ change their names, or if they are in superstar-prison. If they are not allowed, we will
+ change it back.
+ """
if before.display_name == after.display_name:
return # User didn't change their nickname. Abort!
@@ -93,14 +93,13 @@ class Superstarify(Cog):
)
@Cog.listener()
- async def on_member_join(self, member: Member):
+ async def on_member_join(self, member: Member) -> None:
"""
This event will trigger when someone (re)joins the server.
- At this point we will look up the user in our database and check
- whether they are in superstar-prison. If so, we will change their name
- back to the forced nickname.
- """
+ At this point we will look up the user in our database and check whether they are in
+ superstar-prison. If so, we will change their name back to the forced nickname.
+ """
active_superstarifies = await self.bot.api_client.get(
'bot/infractions',
params={
@@ -155,13 +154,14 @@ class Superstarify(Cog):
@with_role(*MODERATION_ROLES)
async def superstarify(
self, ctx: Context, member: Member, expiration: ExpirationDate, reason: str = None
- ):
+ ) -> None:
"""
- This command will force a random superstar name (like Taylor Swift) to be the user's
- nickname for a specified duration. An optional reason can be provided.
+ Force a random superstar name (like Taylor Swift) to be the user's nickname for a specified duration.
+
+ An optional reason can be provided.
+
If no reason is given, the original name will be shown in a generated reason.
"""
-
active_superstarifies = await self.bot.api_client.get(
'bot/infractions',
params={
@@ -171,10 +171,11 @@ class Superstarify(Cog):
}
)
if active_superstarifies:
- return await ctx.send(
+ await ctx.send(
":x: According to my records, this user is already superstarified. "
f"See infraction **#{active_superstarifies[0]['id']}**."
)
+ return
infraction = await post_infraction(
ctx, member,
@@ -224,15 +225,8 @@ class Superstarify(Cog):
@command(name='unsuperstarify', aliases=('release_nick', 'unstar'))
@with_role(*MODERATION_ROLES)
- async def unsuperstarify(self, ctx: Context, member: Member):
- """
- This command will remove the entry from our database, allowing the user
- to once again change their nickname.
-
- :param ctx: Discord message context
- :param member: The member to unsuperstarify
- """
-
+ async def unsuperstarify(self, ctx: Context, member: Member) -> None:
+ """Remove the superstarify entry from our database, allowing the user to change their nickname."""
log.debug(f"Attempting to unsuperstarify the following user: {member.display_name}")
embed = Embed()
@@ -247,9 +241,8 @@ class Superstarify(Cog):
}
)
if not active_superstarifies:
- return await ctx.send(
- ":x: There is no active superstarify infraction for this user."
- )
+ await ctx.send(":x: There is no active superstarify infraction for this user.")
+ return
[infraction] = active_superstarifies
await self.bot.api_client.patch(
@@ -270,6 +263,7 @@ class Superstarify(Cog):
await ctx.send(embed=embed)
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Superstarify cog load."""
bot.add_cog(Superstarify(bot))
log.info("Cog loaded: Superstarify")
diff --git a/bot/cogs/superstarify/stars.py b/bot/cogs/superstarify/stars.py
index 9b49d7175..dbac86770 100644
--- a/bot/cogs/superstarify/stars.py
+++ b/bot/cogs/superstarify/stars.py
@@ -81,6 +81,7 @@ STAR_NAMES = (
)
-def get_nick(infraction_id, member_id):
+def get_nick(infraction_id: int, member_id: int) -> str:
+ """Randomly select a nickname from the Superstarify nickname list."""
rng = random.Random(str(infraction_id) + str(member_id))
return rng.choice(STAR_NAMES)
diff --git a/bot/cogs/sync/__init__.py b/bot/cogs/sync/__init__.py
index e4f960620..d4565f848 100644
--- a/bot/cogs/sync/__init__.py
+++ b/bot/cogs/sync/__init__.py
@@ -1,10 +1,13 @@
import logging
+from discord.ext.commands import Bot
+
from .cog import Sync
log = logging.getLogger(__name__)
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Sync cog load."""
bot.add_cog(Sync(bot))
log.info("Cog loaded: Sync")
diff --git a/bot/cogs/sync/cog.py b/bot/cogs/sync/cog.py
index 9a3a48bba..b75fb26cd 100644
--- a/bot/cogs/sync/cog.py
+++ b/bot/cogs/sync/cog.py
@@ -177,7 +177,6 @@ class Sync(Cog):
@commands.has_permissions(administrator=True)
async def sync_roles_command(self, ctx: Context) -> None:
"""Manually synchronize the guild's roles with the roles on the site."""
-
initial_response = await ctx.send("📊 Synchronizing roles.")
total_created, total_updated, total_deleted = await syncers.sync_roles(self.bot, ctx.guild)
await initial_response.edit(
@@ -191,7 +190,6 @@ class Sync(Cog):
@commands.has_permissions(administrator=True)
async def sync_users_command(self, ctx: Context) -> None:
"""Manually synchronize the guild's users with the users on the site."""
-
initial_response = await ctx.send("📊 Synchronizing users.")
total_created, total_updated, total_deleted = await syncers.sync_users(self.bot, ctx.guild)
await initial_response.edit(
diff --git a/bot/cogs/sync/syncers.py b/bot/cogs/sync/syncers.py
index 414c24adb..2cc5a66e1 100644
--- a/bot/cogs/sync/syncers.py
+++ b/bot/cogs/sync/syncers.py
@@ -34,7 +34,6 @@ def get_roles_for_sync(
to be deleted on the site, meaning the roles are present on
the API but not in the cached guild.
"""
-
guild_role_ids = {role.id for role in guild_roles}
api_role_ids = {role.id for role in api_roles}
new_role_ids = guild_role_ids - api_role_ids
@@ -66,7 +65,6 @@ async def sync_roles(bot: Bot, guild: Guild) -> Tuple[int, int, int]:
(element `0`) , how many roles were updated (element `1`), and how many
roles were deleted (element `2`) on the API.
"""
-
roles = await bot.api_client.get('bot/roles')
# Pack API roles and guild roles into one common format,
@@ -138,7 +136,6 @@ def get_users_for_sync(
guild, but where the attribute of a user on the API is not
equal to the attribute of the user on the guild.
"""
-
users_to_create = set()
users_to_update = set()
@@ -169,8 +166,7 @@ def get_users_for_sync(
async def sync_users(bot: Bot, guild: Guild) -> Tuple[int, int, None]:
"""
- Synchronize users found on the given
- `guild` with the ones on the API.
+ Synchronize users found in the given `guild` with the ones in the API.
Arguments:
bot (discord.ext.commands.Bot):
@@ -186,7 +182,6 @@ async def sync_users(bot: Bot, guild: Guild) -> Tuple[int, int, None]:
(element `0`) and how many users were updated (element `1`), and `None`
to indicate that a user sync never deletes entries from the API.
"""
-
current_users = await bot.api_client.get('bot/users')
# Pack API users and guild users into one common format,
diff --git a/bot/cogs/tags.py b/bot/cogs/tags.py
index 8e9ba5da3..660620284 100644
--- a/bot/cogs/tags.py
+++ b/bot/cogs/tags.py
@@ -20,41 +20,26 @@ TEST_CHANNELS = (
class Tags(Cog):
- """
- Save new tags and fetch existing tags.
- """
+ """Save new tags and fetch existing tags."""
def __init__(self, bot: Bot):
self.bot = bot
self.tag_cooldowns = {}
@group(name='tags', aliases=('tag', 't'), hidden=True, invoke_without_command=True)
- async def tags_group(self, ctx: Context, *, tag_name: TagNameConverter = None):
+ async def tags_group(self, ctx: Context, *, tag_name: TagNameConverter = None) -> None:
"""Show all known tags, a single tag, or run a subcommand."""
-
await ctx.invoke(self.get_command, tag_name=tag_name)
@tags_group.command(name='get', aliases=('show', 'g'))
- async def get_command(self, ctx: Context, *, tag_name: TagNameConverter = None):
- """
- Get a list of all tags or a specified tag.
-
- :param ctx: Discord message context
- :param tag_name:
- If provided, this function shows data for that specific tag.
- If not provided, this function shows the caller a list of all tags.
- """
-
- def _command_on_cooldown(tag_name) -> bool:
+ async def get_command(self, ctx: Context, *, tag_name: TagNameConverter = None) -> None:
+ """Get a specified tag, or a list of all tags if no tag is specified."""
+ def _command_on_cooldown(tag_name: str) -> bool:
"""
- Check if the command is currently on cooldown.
- The cooldown duration is set in constants.py.
+ Check if the command is currently on cooldown, on a per-tag, per-channel basis.
- This works on a per-tag, per-channel basis.
- :param tag_name: The name of the command to check.
- :return: True if the command is cooling down. Otherwise False.
+ The cooldown duration is set in constants.py.
"""
-
now = time.time()
cooldown_conditions = (
@@ -109,15 +94,8 @@ class Tags(Cog):
tag_name: TagNameConverter,
*,
tag_content: TagContentConverter,
- ):
- """
- Create a new tag or update an existing one.
-
- :param ctx: discord message context
- :param tag_name: The name of the tag to create or edit.
- :param tag_content: The content of the tag.
- """
-
+ ) -> None:
+ """Create a new tag or update an existing one."""
body = {
'title': tag_name.lower().strip(),
'embed': {
@@ -140,14 +118,8 @@ class Tags(Cog):
@tags_group.command(name='delete', aliases=('remove', 'rm', 'd'))
@with_role(Roles.admin, Roles.owner)
- async def delete_command(self, ctx: Context, *, tag_name: TagNameConverter):
- """
- Remove a tag from the database.
-
- :param ctx: discord message context
- :param tag_name: The name of the tag to delete.
- """
-
+ async def delete_command(self, ctx: Context, *, tag_name: TagNameConverter) -> None:
+ """Remove a tag from the database."""
await self.bot.api_client.delete(f'bot/tags/{tag_name}')
log.debug(f"{ctx.author} successfully deleted the tag called '{tag_name}'")
@@ -158,6 +130,7 @@ class Tags(Cog):
))
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Tags cog load."""
bot.add_cog(Tags(bot))
log.info("Cog loaded: Tags")
diff --git a/bot/cogs/token_remover.py b/bot/cogs/token_remover.py
index 64bf126d6..7dd0afbbd 100644
--- a/bot/cogs/token_remover.py
+++ b/bot/cogs/token_remover.py
@@ -42,10 +42,16 @@ class TokenRemover(Cog):
@property
def mod_log(self) -> ModLog:
+ """Get currently loaded ModLog cog instance."""
return self.bot.get_cog("ModLog")
@Cog.listener()
- async def on_message(self, msg: Message):
+ async def on_message(self, msg: Message) -> None:
+ """
+ Check each message for a string that matches Discord's token pattern.
+
+ See: https://discordapp.com/developers/docs/reference#snowflakes
+ """
if msg.author.bot:
return
@@ -82,6 +88,11 @@ class TokenRemover(Cog):
@staticmethod
def is_valid_user_id(b64_content: str) -> bool:
+ """
+ Check potential token to see if it contains a valid Discord user ID.
+
+ See: https://discordapp.com/developers/docs/reference#snowflakes
+ """
b64_content += '=' * (-len(b64_content) % 4)
try:
@@ -92,6 +103,11 @@ class TokenRemover(Cog):
@staticmethod
def is_valid_timestamp(b64_content: str) -> bool:
+ """
+ Check potential token to see if it contains a valid timestamp.
+
+ See: https://discordapp.com/developers/docs/reference#snowflakes
+ """
b64_content += '=' * (-len(b64_content) % 4)
try:
@@ -102,6 +118,7 @@ class TokenRemover(Cog):
return snowflake_time(snowflake + TOKEN_EPOCH) < DISCORD_EPOCH_TIMESTAMP
-def setup(bot: Bot):
+def setup(bot: Bot) -> None:
+ """Token Remover cog load."""
bot.add_cog(TokenRemover(bot))
log.info("Cog loaded: TokenRemover")
diff --git a/bot/cogs/utils.py b/bot/cogs/utils.py
index 08e77a24e..62e2fb03f 100644
--- a/bot/cogs/utils.py
+++ b/bot/cogs/utils.py
@@ -5,7 +5,7 @@ from email.parser import HeaderParser
from io import StringIO
from discord import Colour, Embed
-from discord.ext.commands import AutoShardedBot, Cog, Context, command
+from discord.ext.commands import Bot, Cog, Context, command
from bot.constants import Channels, STAFF_ROLES
from bot.decorators import in_channel
@@ -14,26 +14,22 @@ log = logging.getLogger(__name__)
class Utils(Cog):
- """
- A selection of utilities which don't have a clear category.
- """
+ """A selection of utilities which don't have a clear category."""
- def __init__(self, bot: AutoShardedBot):
+ def __init__(self, bot: Bot):
self.bot = bot
self.base_pep_url = "http://www.python.org/dev/peps/pep-"
self.base_github_pep_url = "https://raw.githubusercontent.com/python/peps/master/pep-"
@command(name='pep', aliases=('get_pep', 'p'))
- async def pep_command(self, ctx: Context, pep_number: str):
- """
- Fetches information about a PEP and sends it to the channel.
- """
-
+ async def pep_command(self, ctx: Context, pep_number: str) -> None:
+ """Fetches information about a PEP and sends it to the channel."""
if pep_number.isdigit():
pep_number = int(pep_number)
else:
- return await ctx.invoke(self.bot.get_command("help"), "pep")
+ await ctx.invoke(self.bot.get_command("help"), "pep")
+ return
# Newer PEPs are written in RST instead of txt
if pep_number > 542:
@@ -89,11 +85,8 @@ class Utils(Cog):
@command()
@in_channel(Channels.bot, bypass_roles=STAFF_ROLES)
- async def charinfo(self, ctx, *, characters: str):
- """
- Shows you information on up to 25 unicode characters.
- """
-
+ async def charinfo(self, ctx: Context, *, characters: str) -> None:
+ """Shows you information on up to 25 unicode characters."""
match = re.match(r"<(a?):(\w+):(\d+)>", characters)
if match:
embed = Embed(
@@ -104,12 +97,14 @@ class Utils(Cog):
)
)
embed.colour = Colour.red()
- return await ctx.send(embed=embed)
+ await ctx.send(embed=embed)
+ return
if len(characters) > 25:
embed = Embed(title=f"Too many characters ({len(characters)}/25)")
embed.colour = Colour.red()
- return await ctx.send(embed=embed)
+ await ctx.send(embed=embed)
+ return
def get_info(char):
digit = f"{ord(char):x}"
@@ -133,6 +128,7 @@ class Utils(Cog):
await ctx.send(embed=embed)
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Utils cog load."""
bot.add_cog(Utils(bot))
log.info("Cog loaded: Utils")
diff --git a/bot/cogs/verification.py b/bot/cogs/verification.py
index c42d4d67e..b0c250603 100644
--- a/bot/cogs/verification.py
+++ b/bot/cogs/verification.py
@@ -29,19 +29,19 @@ If you'd like to unsubscribe from the announcement notifications, simply send `!
class Verification(Cog):
- """
- User verification and role self-management
- """
+ """User verification and role self-management."""
def __init__(self, bot: Bot):
self.bot = bot
@property
def mod_log(self) -> ModLog:
+ """Get currently loaded ModLog cog instance."""
return self.bot.get_cog("ModLog")
@Cog.listener()
- async def on_message(self, message: Message):
+ async def on_message(self, message: Message) -> None:
+ """Check new message event for messages to the checkpoint channel & process."""
if message.author.bot:
return # They're a bot, ignore
@@ -75,11 +75,8 @@ class Verification(Cog):
@command(name='accept', aliases=('verify', 'verified', 'accepted'), hidden=True)
@without_role(Roles.verified)
@in_channel(Channels.verification)
- async def accept_command(self, ctx: Context, *_): # We don't actually care about the args
- """
- Accept our rules and gain access to the rest of the server
- """
-
+ async def accept_command(self, ctx: Context, *_) -> None: # We don't actually care about the args
+ """Accept our rules and gain access to the rest of the server."""
log.debug(f"{ctx.author} called !accept. Assigning the 'Developer' role.")
await ctx.author.add_roles(Object(Roles.verified), reason="Accepted the rules")
try:
@@ -98,11 +95,8 @@ class Verification(Cog):
@command(name='subscribe')
@in_channel(Channels.bot)
- async def subscribe_command(self, ctx: Context, *_): # We don't actually care about the args
- """
- Subscribe to announcement notifications by assigning yourself the role
- """
-
+ async def subscribe_command(self, ctx: Context, *_) -> None: # We don't actually care about the args
+ """Subscribe to announcement notifications by assigning yourself the role."""
has_role = False
for role in ctx.author.roles:
@@ -111,9 +105,8 @@ class Verification(Cog):
break
if has_role:
- return await ctx.send(
- f"{ctx.author.mention} You're already subscribed!",
- )
+ await ctx.send(f"{ctx.author.mention} You're already subscribed!")
+ return
log.debug(f"{ctx.author} called !subscribe. Assigning the 'Announcements' role.")
await ctx.author.add_roles(Object(Roles.announcements), reason="Subscribed to announcements")
@@ -126,11 +119,8 @@ class Verification(Cog):
@command(name='unsubscribe')
@in_channel(Channels.bot)
- async def unsubscribe_command(self, ctx: Context, *_): # We don't actually care about the args
- """
- Unsubscribe from announcement notifications by removing the role from yourself
- """
-
+ async def unsubscribe_command(self, ctx: Context, *_) -> None: # We don't actually care about the args
+ """Unsubscribe from announcement notifications by removing the role from yourself."""
has_role = False
for role in ctx.author.roles:
@@ -139,9 +129,8 @@ class Verification(Cog):
break
if not has_role:
- return await ctx.send(
- f"{ctx.author.mention} You're already unsubscribed!"
- )
+ await ctx.send(f"{ctx.author.mention} You're already unsubscribed!")
+ return
log.debug(f"{ctx.author} called !unsubscribe. Removing the 'Announcements' role.")
await ctx.author.remove_roles(Object(Roles.announcements), reason="Unsubscribed from announcements")
@@ -153,23 +142,21 @@ class Verification(Cog):
)
@staticmethod
- async def cog_command_error(ctx: Context, error):
+ async def cog_command_error(ctx: Context, error: Exception) -> None:
+ """Check for & ignore any InChannelCheckFailure."""
if isinstance(error, InChannelCheckFailure):
- # Do nothing; just ignore this error
error.handled = True
@staticmethod
- def bot_check(ctx: Context):
- """
- Block any command within the verification channel that is not !accept.
- """
-
+ def bot_check(ctx: Context) -> bool:
+ """Block any command within the verification channel that is not !accept."""
if ctx.channel.id == Channels.verification:
return ctx.command.name == "accept"
else:
return True
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Verification cog load."""
bot.add_cog(Verification(bot))
log.info("Cog loaded: Verification")
diff --git a/bot/cogs/watchchannels/__init__.py b/bot/cogs/watchchannels/__init__.py
index ac7713803..86e1050fa 100644
--- a/bot/cogs/watchchannels/__init__.py
+++ b/bot/cogs/watchchannels/__init__.py
@@ -1,5 +1,7 @@
import logging
+from discord.ext.commands import Bot
+
from .bigbrother import BigBrother
from .talentpool import TalentPool
@@ -7,7 +9,8 @@ from .talentpool import TalentPool
log = logging.getLogger(__name__)
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Monitoring cogs load."""
bot.add_cog(BigBrother(bot))
log.info("Cog loaded: BigBrother")
diff --git a/bot/cogs/watchchannels/bigbrother.py b/bot/cogs/watchchannels/bigbrother.py
index 338b6c4ad..e191c2dbc 100644
--- a/bot/cogs/watchchannels/bigbrother.py
+++ b/bot/cogs/watchchannels/bigbrother.py
@@ -3,7 +3,7 @@ from collections import ChainMap
from typing import Union
from discord import User
-from discord.ext.commands import Cog, Context, group
+from discord.ext.commands import Bot, Cog, Context, group
from bot.constants import Channels, Roles, Webhooks
from bot.decorators import with_role
@@ -16,7 +16,7 @@ log = logging.getLogger(__name__)
class BigBrother(WatchChannel, Cog, name="Big Brother"):
"""Monitors users by relaying their messages to a watch channel to assist with moderation."""
- def __init__(self, bot) -> None:
+ def __init__(self, bot: Bot) -> None:
super().__init__(
bot,
destination=Channels.big_brother_logs,
diff --git a/bot/cogs/watchchannels/talentpool.py b/bot/cogs/watchchannels/talentpool.py
index 4452d7a59..ffe7693a9 100644
--- a/bot/cogs/watchchannels/talentpool.py
+++ b/bot/cogs/watchchannels/talentpool.py
@@ -4,7 +4,7 @@ from collections import ChainMap
from typing import Union
from discord import Color, Embed, Member, User
-from discord.ext.commands import Cog, Context, group
+from discord.ext.commands import Bot, Cog, Context, group
from bot.api import ResponseCodeError
from bot.constants import Channels, Guild, Roles, Webhooks
@@ -19,7 +19,7 @@ STAFF_ROLES = Roles.owner, Roles.admin, Roles.moderator, Roles.helpers # <- I
class TalentPool(WatchChannel, Cog, name="Talentpool"):
"""Relays messages of helper candidates to a watch channel to observe them."""
- def __init__(self, bot) -> None:
+ def __init__(self, bot: Bot) -> None:
super().__init__(
bot,
destination=Channels.talent_pool,
@@ -33,7 +33,6 @@ class TalentPool(WatchChannel, Cog, name="Talentpool"):
@with_role(Roles.owner, Roles.admin, Roles.moderator)
async def nomination_group(self, ctx: Context) -> None:
"""Highlights the activity of helper nominees by relaying their messages to the talent pool channel."""
-
await ctx.invoke(self.bot.get_command("help"), "talentpool")
@nomination_group.command(name='watched', aliases=('all', 'list'))
@@ -156,7 +155,6 @@ class TalentPool(WatchChannel, Cog, name="Talentpool"):
@with_role(Roles.owner, Roles.admin, Roles.moderator)
async def nomination_edit_group(self, ctx: Context) -> None:
"""Commands to edit nominations."""
-
await ctx.invoke(self.bot.get_command("help"), "talentpool", "edit")
@nomination_edit_group.command(name='reason')
diff --git a/bot/cogs/watchchannels/watchchannel.py b/bot/cogs/watchchannels/watchchannel.py
index c34b0d5bb..e78282900 100644
--- a/bot/cogs/watchchannels/watchchannel.py
+++ b/bot/cogs/watchchannels/watchchannel.py
@@ -42,6 +42,8 @@ def proxy_user(user_id: str) -> Object:
@dataclass
class MessageHistory:
+ """Represents a watch channel's message history."""
+
last_author: Optional[int] = None
last_channel: Optional[int] = None
message_count: int = 0
@@ -51,7 +53,15 @@ class WatchChannel(metaclass=CogABCMeta):
"""ABC with functionality for relaying users' messages to a certain channel."""
@abstractmethod
- def __init__(self, bot: Bot, destination, webhook_id, api_endpoint, api_default_params, logger) -> None:
+ def __init__(
+ self,
+ bot: Bot,
+ destination: int,
+ webhook_id: int,
+ api_endpoint: str,
+ api_default_params: dict,
+ logger: logging.Logger
+ ) -> None:
self.bot = bot
self.destination = destination # E.g., Channels.big_brother_logs
@@ -265,7 +275,7 @@ class WatchChannel(metaclass=CogABCMeta):
self.message_history.message_count += 1
- async def send_header(self, msg) -> None:
+ async def send_header(self, msg: Message) -> None:
"""Sends a header embed with information about the relayed messages to the watch channel."""
user_id = msg.author.id
diff --git a/bot/cogs/wolfram.py b/bot/cogs/wolfram.py
index e88efa033..ab0ed2472 100644
--- a/bot/cogs/wolfram.py
+++ b/bot/cogs/wolfram.py
@@ -1,13 +1,13 @@
import logging
from io import BytesIO
-from typing import List, Optional, Tuple
+from typing import Callable, List, Optional, Tuple
from urllib import parse
import discord
from dateutil.relativedelta import relativedelta
from discord import Embed
from discord.ext import commands
-from discord.ext.commands import BucketType, Cog, Context, check, group
+from discord.ext.commands import Bot, BucketType, Cog, Context, check, group
from bot.constants import Colours, STAFF_ROLES, Wolfram
from bot.pagination import ImagePaginator
@@ -37,18 +37,7 @@ async def send_embed(
img_url: str = None,
f: discord.File = None
) -> None:
- """
- Generates an embed with wolfram as the author, with message_txt as description,
- adds custom colour if specified, a footer and image (could be a file with f param) and sends
- the embed through ctx
- :param ctx: Context
- :param message_txt: str - Message to be sent
- :param colour: int - Default: Colours.soft_red - Colour of embed
- :param footer: str - Default: None - Adds a footer to the embed
- :param img_url:str - Default: None - Adds an image to the embed
- :param f: discord.File - Default: None - Add a file to the msg, often attached as image to embed
- """
-
+ """Generate & send a response embed with Wolfram as the author."""
embed = Embed(colour=colour)
embed.description = message_txt
embed.set_author(name="Wolfram Alpha",
@@ -63,16 +52,12 @@ async def send_embed(
await ctx.send(embed=embed, file=f)
-def custom_cooldown(*ignore: List[int]) -> check:
+def custom_cooldown(*ignore: List[int]) -> Callable:
"""
- Custom cooldown mapping that applies a specific requests per day to users.
- Staff is ignored by the user cooldown, however the cooldown implements a
- total amount of uses per day for the entire guild. (Configurable in configs)
+ Implement per-user and per-guild cooldowns for requests to the Wolfram API.
- :param ignore: List[int] -- list of ids of roles to be ignored by user cooldown
- :return: check
+ A list of roles may be provided to ignore the per-user cooldown
"""
-
async def predicate(ctx: Context) -> bool:
user_bucket = usercd.get_bucket(ctx.message)
@@ -109,8 +94,8 @@ def custom_cooldown(*ignore: List[int]) -> check:
return check(predicate)
-async def get_pod_pages(ctx, bot, query: str) -> Optional[List[Tuple]]:
- # Give feedback that the bot is working.
+async def get_pod_pages(ctx: Context, bot: Bot, query: str) -> Optional[List[Tuple]]:
+ """Get the Wolfram API pod pages for the provided query."""
async with ctx.channel.typing():
url_str = parse.urlencode({
"input": query,
@@ -164,9 +149,7 @@ async def get_pod_pages(ctx, bot, query: str) -> Optional[List[Tuple]]:
class Wolfram(Cog):
- """
- Commands for interacting with the Wolfram|Alpha API.
- """
+ """Commands for interacting with the Wolfram|Alpha API."""
def __init__(self, bot: commands.Bot):
self.bot = bot
@@ -174,14 +157,7 @@ class Wolfram(Cog):
@group(name="wolfram", aliases=("wolf", "wa"), invoke_without_command=True)
@custom_cooldown(*STAFF_ROLES)
async def wolfram_command(self, ctx: Context, *, query: str) -> None:
- """
- Requests all answers on a single image,
- sends an image of all related pods
-
- :param ctx: Context
- :param query: str - string request to api
- """
-
+ """Requests all answers on a single image, sends an image of all related pods."""
url_str = parse.urlencode({
"i": query,
"appid": APPID,
@@ -221,13 +197,10 @@ class Wolfram(Cog):
@custom_cooldown(*STAFF_ROLES)
async def wolfram_page_command(self, ctx: Context, *, query: str) -> None:
"""
- Requests a drawn image of given query
- Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc
+ Requests a drawn image of given query.
- :param ctx: Context
- :param query: str - string request to api
+ Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc.
"""
-
pages = await get_pod_pages(ctx, self.bot, query)
if not pages:
@@ -243,15 +216,12 @@ class Wolfram(Cog):
@wolfram_command.command(name="cut", aliases=("c",))
@custom_cooldown(*STAFF_ROLES)
- async def wolfram_cut_command(self, ctx, *, query: str) -> None:
+ async def wolfram_cut_command(self, ctx: Context, *, query: str) -> None:
"""
- Requests a drawn image of given query
- Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc
+ Requests a drawn image of given query.
- :param ctx: Context
- :param query: str - string request to api
+ Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc.
"""
-
pages = await get_pod_pages(ctx, self.bot, query)
if not pages:
@@ -267,14 +237,7 @@ class Wolfram(Cog):
@wolfram_command.command(name="short", aliases=("sh", "s"))
@custom_cooldown(*STAFF_ROLES)
async def wolfram_short_command(self, ctx: Context, *, query: str) -> None:
- """
- Requests an answer to a simple question
- Responds in plaintext
-
- :param ctx: Context
- :param query: str - string request to api
- """
-
+ """Requests an answer to a simple question."""
url_str = parse.urlencode({
"i": query,
"appid": APPID,
@@ -304,5 +267,6 @@ class Wolfram(Cog):
def setup(bot: commands.Bot) -> None:
+ """Wolfram cog load."""
bot.add_cog(Wolfram(bot))
log.info("Cog loaded: Wolfram")
diff --git a/bot/converters.py b/bot/converters.py
index 4bd9aba13..7386187ab 100644
--- a/bot/converters.py
+++ b/bot/converters.py
@@ -1,6 +1,7 @@
import logging
from datetime import datetime
from ssl import CertificateError
+from typing import Union
import dateparser
import discord
@@ -15,17 +16,16 @@ class ValidPythonIdentifier(Converter):
"""
A converter that checks whether the given string is a valid Python identifier.
- This is used to have package names
- that correspond to how you would use
- the package in your code, e.g.
- `import package`. Raises `BadArgument`
- if the argument is not a valid Python
- identifier, and simply passes through
+ This is used to have package names that correspond to how you would use the package in your
+ code, e.g. `import package`.
+
+ Raises `BadArgument` if the argument is not a valid Python identifier, and simply passes through
the given argument otherwise.
"""
@staticmethod
- async def convert(ctx, argument: str):
+ async def convert(ctx: Context, argument: str) -> str:
+ """Checks whether the given string is a valid Python identifier."""
if not argument.isidentifier():
raise BadArgument(f"`{argument}` is not a valid Python identifier")
return argument
@@ -35,14 +35,15 @@ class ValidURL(Converter):
"""
Represents a valid webpage URL.
- This converter checks whether the given
- URL can be reached and requesting it returns
- a status code of 200. If not, `BadArgument`
- is raised. Otherwise, it simply passes through the given URL.
+ This converter checks whether the given URL can be reached and requesting it returns a status
+ code of 200. If not, `BadArgument` is raised.
+
+ Otherwise, it simply passes through the given URL.
"""
@staticmethod
- async def convert(ctx, url: str):
+ async def convert(ctx: Context, url: str) -> str:
+ """This converter checks whether the given URL can be reached with a status code of 200."""
try:
async with ctx.bot.http_session.get(url) as resp:
if resp.status != 200:
@@ -63,12 +64,11 @@ class ValidURL(Converter):
class InfractionSearchQuery(Converter):
- """
- A converter that checks if the argument is a Discord user, and if not, falls back to a string.
- """
+ """A converter that checks if the argument is a Discord user, and if not, falls back to a string."""
@staticmethod
- async def convert(ctx, arg):
+ async def convert(ctx: Context, arg: str) -> Union[discord.Member, str]:
+ """Check if the argument is a Discord user, and if not, falls back to a string."""
try:
maybe_snowflake = arg.strip("<@!>")
return await ctx.bot.fetch_user(maybe_snowflake)
@@ -77,12 +77,15 @@ class InfractionSearchQuery(Converter):
class Subreddit(Converter):
- """
- Forces a string to begin with "r/" and checks if it's a valid subreddit.
- """
+ """Forces a string to begin with "r/" and checks if it's a valid subreddit."""
@staticmethod
- async def convert(ctx, sub: str):
+ async def convert(ctx: Context, sub: str) -> str:
+ """
+ Force sub to begin with "r/" and check if it's a valid subreddit.
+
+ If sub is a valid subreddit, return it prepended with "r/"
+ """
sub = sub.lower()
if not sub.startswith("r/"):
@@ -103,9 +106,21 @@ class Subreddit(Converter):
class TagNameConverter(Converter):
+ """
+ Ensure that a proposed tag name is valid.
+
+ Valid tag names meet the following conditions:
+ * All ASCII characters
+ * Has at least one non-whitespace character
+ * Not solely numeric
+ * Shorter than 127 characters
+ """
+
@staticmethod
- async def convert(ctx: Context, tag_name: str):
- def is_number(value):
+ async def convert(ctx: Context, tag_name: str) -> str:
+ """Lowercase & strip whitespace from proposed tag_name & ensure it's valid."""
+ def is_number(value: str) -> bool:
+ """Check to see if the input string is numeric."""
try:
float(value)
except ValueError:
@@ -142,8 +157,15 @@ class TagNameConverter(Converter):
class TagContentConverter(Converter):
+ """Ensure proposed tag content is not empty and contains at least one non-whitespace character."""
+
@staticmethod
- async def convert(ctx: Context, tag_content: str):
+ async def convert(ctx: Context, tag_content: str) -> str:
+ """
+ Ensure tag_content is non-empty and contains at least one non-whitespace character.
+
+ If tag_content is valid, return the stripped version.
+ """
tag_content = tag_content.strip()
# The tag contents should not be empty, or filled with whitespace.
@@ -156,13 +178,16 @@ class TagContentConverter(Converter):
class ExpirationDate(Converter):
+ """Convert relative expiration date into UTC datetime using dateparser."""
+
DATEPARSER_SETTINGS = {
'PREFER_DATES_FROM': 'future',
'TIMEZONE': 'UTC',
'TO_TIMEZONE': 'UTC'
}
- async def convert(self, ctx, expiration_string: str):
+ async def convert(self, ctx: Context, expiration_string: str) -> datetime:
+ """Convert relative expiration date into UTC datetime."""
expiry = dateparser.parse(expiration_string, settings=self.DATEPARSER_SETTINGS)
if expiry is None:
raise BadArgument(f"Failed to parse expiration date from `{expiration_string}`")
diff --git a/bot/decorators.py b/bot/decorators.py
index 923d21938..33a6bcadd 100644
--- a/bot/decorators.py
+++ b/bot/decorators.py
@@ -1,9 +1,9 @@
import logging
import random
-import typing
from asyncio import Lock, sleep
from contextlib import suppress
from functools import wraps
+from typing import Any, Callable, Container, Optional
from weakref import WeakValueDictionary
from discord import Colour, Embed
@@ -18,6 +18,8 @@ log = logging.getLogger(__name__)
class InChannelCheckFailure(CheckFailure):
+ """Raised when a check fails for a message being sent in a whitelisted channel."""
+
def __init__(self, *channels: int):
self.channels = channels
channels_str = ', '.join(f"<#{c_id}>" for c_id in channels)
@@ -25,11 +27,10 @@ class InChannelCheckFailure(CheckFailure):
super().__init__(f"Sorry, but you may only use this command within {channels_str}.")
-def in_channel(*channels: int, bypass_roles: typing.Container[int] = None):
- """
- Checks that the message is in a whitelisted channel or optionally has a bypass role.
- """
- def predicate(ctx: Context):
+def in_channel(*channels: int, bypass_roles: Container[int] = None) -> Callable:
+ """Checks that the message is in a whitelisted channel or optionally has a bypass role."""
+ def predicate(ctx: Context) -> bool:
+ """In-channel checker predicate."""
if ctx.channel.id in channels:
log.debug(f"{ctx.author} tried to call the '{ctx.command.name}' command. "
f"The command was used in a whitelisted channel.")
@@ -50,42 +51,34 @@ def in_channel(*channels: int, bypass_roles: typing.Container[int] = None):
return commands.check(predicate)
-def with_role(*role_ids: int):
- """
- Returns True if the user has any one
- of the roles in role_ids.
- """
-
- async def predicate(ctx: Context):
+def with_role(*role_ids: int) -> Callable:
+ """Returns True if the user has any one of the roles in role_ids."""
+ async def predicate(ctx: Context) -> bool:
+ """With role checker predicate."""
return with_role_check(ctx, *role_ids)
return commands.check(predicate)
-def without_role(*role_ids: int):
- """
- Returns True if the user does not have any
- of the roles in role_ids.
- """
-
- async def predicate(ctx: Context):
+def without_role(*role_ids: int) -> Callable:
+ """Returns True if the user does not have any of the roles in role_ids."""
+ async def predicate(ctx: Context) -> bool:
return without_role_check(ctx, *role_ids)
return commands.check(predicate)
-def locked():
+def locked() -> Callable:
"""
Allows the user to only run one instance of the decorated command at a time.
- Subsequent calls to the command from the same author are
- ignored until the command has completed invocation.
+
+ Subsequent calls to the command from the same author are ignored until the command has completed invocation.
This decorator has to go before (below) the `command` decorator.
"""
-
- def wrap(func):
+ def wrap(func: Callable) -> Callable:
func.__locks = WeakValueDictionary()
@wraps(func)
- async def inner(self, ctx, *args, **kwargs):
+ async def inner(self: Callable, ctx: Context, *args, **kwargs) -> Optional[Any]:
lock = func.__locks.setdefault(ctx.author.id, Lock())
if lock.locked():
embed = Embed()
@@ -105,15 +98,15 @@ def locked():
return wrap
-def redirect_output(destination_channel: int, bypass_roles: typing.Container[int] = None):
- """
- Changes the channel in the context of the command to redirect the output
- to a certain channel, unless the author has a role to bypass redirection
+def redirect_output(destination_channel: int, bypass_roles: Container[int] = None) -> Callable:
"""
+ Changes the channel in the context of the command to redirect the output to a certain channel.
- def wrap(func):
+ Redirect is bypassed if the author has a role to bypass redirection.
+ """
+ def wrap(func: Callable) -> Callable:
@wraps(func)
- async def inner(self, ctx, *args, **kwargs):
+ async def inner(self: Callable, ctx: Context, *args, **kwargs) -> Any:
if ctx.channel.id == destination_channel:
log.trace(f"Command {ctx.command.name} was invoked in destination_channel, not redirecting")
return await func(self, ctx, *args, **kwargs)
diff --git a/bot/interpreter.py b/bot/interpreter.py
index 06343db39..a42b45a2d 100644
--- a/bot/interpreter.py
+++ b/bot/interpreter.py
@@ -1,5 +1,8 @@
from code import InteractiveInterpreter
from io import StringIO
+from typing import Any
+
+from discord.ext.commands import Bot, Context
CODE_TEMPLATE = """
async def _func():
@@ -8,13 +11,20 @@ async def _func():
class Interpreter(InteractiveInterpreter):
+ """
+ Subclass InteractiveInterpreter to specify custom run functionality.
+
+ Helper class for internal eval.
+ """
+
write_callable = None
- def __init__(self, bot):
+ def __init__(self, bot: Bot):
_locals = {"bot": bot}
super().__init__(_locals)
- async def run(self, code, ctx, io, *args, **kwargs):
+ async def run(self, code: str, ctx: Context, io: StringIO, *args, **kwargs) -> Any:
+ """Execute the provided source code as the bot & return the output."""
self.locals["_rvalue"] = []
self.locals["ctx"] = ctx
self.locals["print"] = lambda x: io.write(f"{x}\n")
diff --git a/bot/pagination.py b/bot/pagination.py
index 0ad5b81f1..76082f459 100644
--- a/bot/pagination.py
+++ b/bot/pagination.py
@@ -2,7 +2,7 @@ import asyncio
import logging
from typing import Iterable, List, Optional, Tuple
-from discord import Embed, Member, Reaction
+from discord import Embed, Member, Message, Reaction
from discord.abc import User
from discord.ext.commands import Context, Paginator
@@ -18,6 +18,8 @@ log = logging.getLogger(__name__)
class EmptyPaginatorEmbed(Exception):
+ """Raised when attempting to paginate with empty contents."""
+
pass
@@ -25,25 +27,24 @@ class LinePaginator(Paginator):
"""
A class that aids in paginating code blocks for Discord messages.
- Attributes
- -----------
- prefix: :class:`str`
+ Available attributes include:
+ * prefix: `str`
The prefix inserted to every page. e.g. three backticks.
- suffix: :class:`str`
+ * suffix: `str`
The suffix appended at the end of every page. e.g. three backticks.
- max_size: :class:`int`
+ * max_size: `int`
The maximum amount of codepoints allowed in a page.
- max_lines: :class:`int`
+ * max_lines: `int`
The maximum amount of lines allowed in a page.
"""
- def __init__(self, prefix='```', suffix='```',
- max_size=2000, max_lines=None):
+ def __init__(
+ self, prefix: str = '```', suffix: str = '```', max_size: int = 2000, max_lines: int = None
+ ) -> None:
"""
- This function overrides the Paginator.__init__
- from inside discord.ext.commands.
- It overrides in order to allow us to configure
- the maximum number of lines per page.
+ This function overrides the Paginator.__init__ from inside discord.ext.commands.
+
+ It overrides in order to allow us to configure the maximum number of lines per page.
"""
self.prefix = prefix
self.suffix = suffix
@@ -54,28 +55,15 @@ class LinePaginator(Paginator):
self._count = len(prefix) + 1 # prefix + newline
self._pages = []
- def add_line(self, line='', *, empty=False):
- """Adds a line to the current page.
-
- If the line exceeds the :attr:`max_size` then an exception
- is raised.
+ def add_line(self, line: str = '', *, empty: bool = False) -> None:
+ """
+ Adds a line to the current page.
- This function overrides the Paginator.add_line
- from inside discord.ext.commands.
- It overrides in order to allow us to configure
- the maximum number of lines per page.
+ If the line exceeds the `self.max_size` then an exception is raised.
- Parameters
- -----------
- line: str
- The line to add.
- empty: bool
- Indicates if another empty line should be added.
+ This function overrides the `Paginator.add_line` from inside `discord.ext.commands`.
- Raises
- ------
- RuntimeError
- The line was too big for the current :attr:`max_size`.
+ It overrides in order to allow us to configure the maximum number of lines per page.
"""
if len(line) > self.max_size - len(self.prefix) - 2:
raise RuntimeError('Line exceeds maximum page size %s' % (self.max_size - len(self.prefix) - 2))
@@ -97,42 +85,39 @@ class LinePaginator(Paginator):
self._count += 1
@classmethod
- async def paginate(cls, lines: Iterable[str], ctx: Context, embed: Embed,
- prefix: str = "", suffix: str = "", max_lines: Optional[int] = None, max_size: int = 500,
- empty: bool = True, restrict_to_user: User = None, timeout: int = 300,
- footer_text: str = None, url: str = None, exception_on_empty_embed: bool = False):
+ async def paginate(
+ cls,
+ lines: Iterable[str],
+ ctx: Context,
+ embed: Embed,
+ prefix: str = "",
+ suffix: str = "",
+ max_lines: Optional[int] = None,
+ max_size: int = 500,
+ empty: bool = True,
+ restrict_to_user: User = None,
+ timeout: int = 300,
+ footer_text: str = None,
+ url: str = None,
+ exception_on_empty_embed: bool = False
+ ) -> Optional[Message]:
"""
- Use a paginator and set of reactions to provide pagination over a set of lines. The reactions are used to
- switch page, or to finish with pagination.
+ Use a paginator and set of reactions to provide pagination over a set of lines.
+
+ The reactions are used to switch page, or to finish with pagination.
+
When used, this will send a message using `ctx.send()` and apply a set of reactions to it. These reactions may
- be used to change page, or to remove pagination from the message. Pagination will also be removed automatically
- if no reaction is added for five minutes (300 seconds).
+ be used to change page, or to remove pagination from the message.
+
+ Pagination will also be removed automatically if no reaction is added for five minutes (300 seconds).
+
+ Example:
>>> embed = Embed()
>>> embed.set_author(name="Some Operation", url=url, icon_url=icon)
- >>> await LinePaginator.paginate(
- ... (line for line in lines),
- ... ctx, embed
- ... )
- :param lines: The lines to be paginated
- :param ctx: Current context object
- :param embed: A pre-configured embed to be used as a template for each page
- :param prefix: Text to place before each page
- :param suffix: Text to place after each page
- :param max_lines: The maximum number of lines on each page
- :param max_size: The maximum number of characters on each page
- :param empty: Whether to place an empty line between each given line
- :param restrict_to_user: A user to lock pagination operations to for this message, if supplied
- :param exception_on_empty_embed: Should there be an exception if the embed is empty?
- :param url: the url to use for the embed headline
- :param timeout: The amount of time in seconds to disable pagination of no reaction is added
- :param footer_text: Text to prefix the page number in the footer with
+ >>> await LinePaginator.paginate((line for line in lines), ctx, embed)
"""
-
- def event_check(reaction_: Reaction, user_: Member):
- """
- Make sure that this reaction is what we want to operate on
- """
-
+ def event_check(reaction_: Reaction, user_: Member) -> bool:
+ """Make sure that this reaction is what we want to operate on."""
no_restrictions = (
# Pagination is not restricted
not restrict_to_user
@@ -301,24 +286,20 @@ class LinePaginator(Paginator):
class ImagePaginator(Paginator):
"""
Helper class that paginates images for embeds in messages.
+
Close resemblance to LinePaginator, except focuses on images over text.
Refer to ImagePaginator.paginate for documentation on how to use.
"""
- def __init__(self, prefix="", suffix=""):
+ def __init__(self, prefix: str = "", suffix: str = ""):
super().__init__(prefix, suffix)
self._current_page = [prefix]
self.images = []
self._pages = []
def add_line(self, line: str = '', *, empty: bool = False) -> None:
- """
- Adds a line to each page, usually just 1 line in this context
- :param line: str to be page content / title
- :param empty: if there should be new lines between entries
- """
-
+ """Adds a line to each page."""
if line:
self._count = len(line)
else:
@@ -327,50 +308,36 @@ class ImagePaginator(Paginator):
self.close_page()
def add_image(self, image: str = None) -> None:
- """
- Adds an image to a page
- :param image: image url to be appended
- """
-
+ """Adds an image to a page."""
self.images.append(image)
@classmethod
- async def paginate(cls, pages: List[Tuple[str, str]], ctx: Context, embed: Embed,
- prefix: str = "", suffix: str = "", timeout: int = 300,
- exception_on_empty_embed: bool = False):
+ async def paginate(
+ cls,
+ pages: List[Tuple[str, str]],
+ ctx: Context, embed: Embed,
+ prefix: str = "",
+ suffix: str = "",
+ timeout: int = 300,
+ exception_on_empty_embed: bool = False
+ ) -> Optional[Message]:
"""
- Use a paginator and set of reactions to provide
- pagination over a set of title/image pairs.The reactions are
- used to switch page, or to finish with pagination.
+ Use a paginator and set of reactions to provide pagination over a set of title/image pairs.
+
+ The reactions are used to switch page, or to finish with pagination.
- When used, this will send a message using `ctx.send()` and
- apply a set of reactions to it. These reactions may
+ When used, this will send a message using `ctx.send()` and apply a set of reactions to it. These reactions may
be used to change page, or to remove pagination from the message.
- Note: Pagination will be removed automatically
- if no reaction is added for five minutes (300 seconds).
+ Note: Pagination will be removed automatically if no reaction is added for five minutes (300 seconds).
+ Example:
>>> embed = Embed()
>>> embed.set_author(name="Some Operation", url=url, icon_url=icon)
>>> await ImagePaginator.paginate(pages, ctx, embed)
-
- Parameters
- -----------
- :param pages: An iterable of tuples with title for page, and img url
- :param ctx: ctx for message
- :param embed: base embed to modify
- :param prefix: prefix of message
- :param suffix: suffix of message
- :param timeout: timeout for when reactions get auto-removed
"""
-
def check_event(reaction_: Reaction, member: Member) -> bool:
- """
- Checks each reaction added, if it matches our conditions pass the wait_for
- :param reaction_: reaction added
- :param member: reaction added by member
- """
-
+ """Checks each reaction added, if it matches our conditions pass the wait_for."""
return all((
# Reaction is on the same message sent
reaction_.message.id == message.id,
diff --git a/bot/patches/__init__.py b/bot/patches/__init__.py
index fd38ea8cf..60f6becaa 100644
--- a/bot/patches/__init__.py
+++ b/bot/patches/__init__.py
@@ -1,4 +1,4 @@
-"""Subpackage that contains patches for discord.py"""
+"""Subpackage that contains patches for discord.py."""
from . import message_edited_at
__all__ = [
diff --git a/bot/patches/message_edited_at.py b/bot/patches/message_edited_at.py
index 528373a9b..a0154f12d 100644
--- a/bot/patches/message_edited_at.py
+++ b/bot/patches/message_edited_at.py
@@ -1,5 +1,5 @@
"""
-# message_edited_at patch
+# message_edited_at patch.
Date: 2019-09-16
Author: Scragly
@@ -16,12 +16,12 @@ from discord import message, utils
log = logging.getLogger(__name__)
-def _handle_edited_timestamp(self, value):
+def _handle_edited_timestamp(self: message.Message, value: str) -> None:
"""Helper function that takes care of parsing the edited timestamp."""
self._edited_timestamp = utils.parse_time(value)
-def apply_patch():
+def apply_patch() -> None:
"""Applies the `edited_at` patch to the `discord.message.Message` class."""
message.Message._handle_edited_timestamp = _handle_edited_timestamp
message.Message._HANDLERS['edited_timestamp'] = message.Message._handle_edited_timestamp
diff --git a/bot/rules/attachments.py b/bot/rules/attachments.py
index 80a15d440..c550aed76 100644
--- a/bot/rules/attachments.py
+++ b/bot/rules/attachments.py
@@ -1,16 +1,12 @@
-"""Detects total attachments exceeding the limit sent by a single user."""
-
from typing import Dict, Iterable, List, Optional, Tuple
from discord import Member, Message
async def apply(
- last_message: Message,
- recent_messages: List[Message],
- config: Dict[str, int]
+ last_message: Message, recent_messages: List[Message], config: Dict[str, int]
) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
-
+ """Detects total attachments exceeding the limit sent by a single user."""
relevant_messages = [last_message] + [
msg
for msg in recent_messages
diff --git a/bot/rules/burst.py b/bot/rules/burst.py
index 80c79be60..25c5a2f33 100644
--- a/bot/rules/burst.py
+++ b/bot/rules/burst.py
@@ -1,16 +1,12 @@
-"""Detects repeated messages sent by a single user."""
-
from typing import Dict, Iterable, List, Optional, Tuple
from discord import Member, Message
async def apply(
- last_message: Message,
- recent_messages: List[Message],
- config: Dict[str, int]
+ last_message: Message, recent_messages: List[Message], config: Dict[str, int]
) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
-
+ """Detects repeated messages sent by a single user."""
relevant_messages = tuple(
msg
for msg in recent_messages
diff --git a/bot/rules/burst_shared.py b/bot/rules/burst_shared.py
index 2cb7b5200..bbe9271b3 100644
--- a/bot/rules/burst_shared.py
+++ b/bot/rules/burst_shared.py
@@ -1,16 +1,12 @@
-"""Detects repeated messages sent by multiple users."""
-
from typing import Dict, Iterable, List, Optional, Tuple
from discord import Member, Message
async def apply(
- last_message: Message,
- recent_messages: List[Message],
- config: Dict[str, int]
+ last_message: Message, recent_messages: List[Message], config: Dict[str, int]
) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
-
+ """Detects repeated messages sent by multiple users."""
total_recent = len(recent_messages)
if total_recent > config['max']:
diff --git a/bot/rules/chars.py b/bot/rules/chars.py
index d05e3cd83..1f587422c 100644
--- a/bot/rules/chars.py
+++ b/bot/rules/chars.py
@@ -1,16 +1,12 @@
-"""Detects total message char count exceeding the limit sent by a single user."""
-
from typing import Dict, Iterable, List, Optional, Tuple
from discord import Member, Message
async def apply(
- last_message: Message,
- recent_messages: List[Message],
- config: Dict[str, int]
+ last_message: Message, recent_messages: List[Message], config: Dict[str, int]
) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
-
+ """Detects total message char count exceeding the limit sent by a single user."""
relevant_messages = tuple(
msg
for msg in recent_messages
diff --git a/bot/rules/discord_emojis.py b/bot/rules/discord_emojis.py
index e4f957ddb..5bab514f2 100644
--- a/bot/rules/discord_emojis.py
+++ b/bot/rules/discord_emojis.py
@@ -1,5 +1,3 @@
-"""Detects total Discord emojis (excluding Unicode emojis) exceeding the limit sent by a single user."""
-
import re
from typing import Dict, Iterable, List, Optional, Tuple
@@ -10,11 +8,9 @@ DISCORD_EMOJI_RE = re.compile(r"<:\w+:\d+>")
async def apply(
- last_message: Message,
- recent_messages: List[Message],
- config: Dict[str, int]
+ last_message: Message, recent_messages: List[Message], config: Dict[str, int]
) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
-
+ """Detects total Discord emojis (excluding Unicode emojis) exceeding the limit sent by a single user."""
relevant_messages = tuple(
msg
for msg in recent_messages
diff --git a/bot/rules/duplicates.py b/bot/rules/duplicates.py
index 763fc9983..455764b53 100644
--- a/bot/rules/duplicates.py
+++ b/bot/rules/duplicates.py
@@ -1,16 +1,12 @@
-"""Detects duplicated messages sent by a single user."""
-
from typing import Dict, Iterable, List, Optional, Tuple
from discord import Member, Message
async def apply(
- last_message: Message,
- recent_messages: List[Message],
- config: Dict[str, int]
+ last_message: Message, recent_messages: List[Message], config: Dict[str, int]
) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
-
+ """Detects duplicated messages sent by a single user."""
relevant_messages = tuple(
msg
for msg in recent_messages
diff --git a/bot/rules/links.py b/bot/rules/links.py
index fa4043fcb..ec75a19c5 100644
--- a/bot/rules/links.py
+++ b/bot/rules/links.py
@@ -1,5 +1,3 @@
-"""Detects total links exceeding the limit sent by a single user."""
-
import re
from typing import Dict, Iterable, List, Optional, Tuple
@@ -10,11 +8,9 @@ LINK_RE = re.compile(r"(https?://[^\s]+)")
async def apply(
- last_message: Message,
- recent_messages: List[Message],
- config: Dict[str, int]
+ last_message: Message, recent_messages: List[Message], config: Dict[str, int]
) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
-
+ """Detects total links exceeding the limit sent by a single user."""
relevant_messages = tuple(
msg
for msg in recent_messages
diff --git a/bot/rules/mentions.py b/bot/rules/mentions.py
index 45c47b6ba..79725a4b1 100644
--- a/bot/rules/mentions.py
+++ b/bot/rules/mentions.py
@@ -1,16 +1,12 @@
-"""Detects total mentions exceeding the limit sent by a single user."""
-
from typing import Dict, Iterable, List, Optional, Tuple
from discord import Member, Message
async def apply(
- last_message: Message,
- recent_messages: List[Message],
- config: Dict[str, int]
+ last_message: Message, recent_messages: List[Message], config: Dict[str, int]
) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
-
+ """Detects total mentions exceeding the limit sent by a single user."""
relevant_messages = tuple(
msg
for msg in recent_messages
diff --git a/bot/rules/newlines.py b/bot/rules/newlines.py
index fdad6ffd3..4e66e1359 100644
--- a/bot/rules/newlines.py
+++ b/bot/rules/newlines.py
@@ -1,5 +1,3 @@
-"""Detects total newlines exceeding the set limit sent by a single user."""
-
import re
from typing import Dict, Iterable, List, Optional, Tuple
@@ -7,11 +5,9 @@ from discord import Member, Message
async def apply(
- last_message: Message,
- recent_messages: List[Message],
- config: Dict[str, int]
+ last_message: Message, recent_messages: List[Message], config: Dict[str, int]
) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
-
+ """Detects total newlines exceeding the set limit sent by a single user."""
relevant_messages = tuple(
msg
for msg in recent_messages
diff --git a/bot/rules/role_mentions.py b/bot/rules/role_mentions.py
index 2177a73b5..0649540b6 100644
--- a/bot/rules/role_mentions.py
+++ b/bot/rules/role_mentions.py
@@ -1,16 +1,12 @@
-"""Detects total role mentions exceeding the limit sent by a single user."""
-
from typing import Dict, Iterable, List, Optional, Tuple
from discord import Member, Message
async def apply(
- last_message: Message,
- recent_messages: List[Message],
- config: Dict[str, int]
+ last_message: Message, recent_messages: List[Message], config: Dict[str, int]
) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
-
+ """Detects total role mentions exceeding the limit sent by a single user."""
relevant_messages = tuple(
msg
for msg in recent_messages
diff --git a/bot/utils/__init__.py b/bot/utils/__init__.py
index d5ae0a7c5..8184be824 100644
--- a/bot/utils/__init__.py
+++ b/bot/utils/__init__.py
@@ -1,10 +1,12 @@
from abc import ABCMeta
+from typing import Any, Generator, Hashable, Iterable
from discord.ext.commands import CogMeta
class CogABCMeta(CogMeta, ABCMeta):
"""Metaclass for ABCs meant to be implemented as Cogs."""
+
pass
@@ -16,50 +18,59 @@ class CaseInsensitiveDict(dict):
"""
@classmethod
- def _k(cls, key):
+ def _k(cls, key: Hashable) -> Hashable:
+ """Return lowered key if a string-like is passed, otherwise pass key straight through."""
return key.lower() if isinstance(key, str) else key
def __init__(self, *args, **kwargs):
super(CaseInsensitiveDict, self).__init__(*args, **kwargs)
self._convert_keys()
- def __getitem__(self, key):
+ def __getitem__(self, key: Hashable) -> Any:
+ """Case insensitive __setitem__."""
return super(CaseInsensitiveDict, self).__getitem__(self.__class__._k(key))
- def __setitem__(self, key, value):
+ def __setitem__(self, key: Hashable, value: Any):
+ """Case insensitive __setitem__."""
super(CaseInsensitiveDict, self).__setitem__(self.__class__._k(key), value)
- def __delitem__(self, key):
+ def __delitem__(self, key: Hashable) -> Any:
+ """Case insensitive __delitem__."""
return super(CaseInsensitiveDict, self).__delitem__(self.__class__._k(key))
- def __contains__(self, key):
+ def __contains__(self, key: Hashable) -> bool:
+ """Case insensitive __contains__."""
return super(CaseInsensitiveDict, self).__contains__(self.__class__._k(key))
- def pop(self, key, *args, **kwargs):
+ def pop(self, key: Hashable, *args, **kwargs) -> Any:
+ """Case insensitive pop."""
return super(CaseInsensitiveDict, self).pop(self.__class__._k(key), *args, **kwargs)
- def get(self, key, *args, **kwargs):
+ def get(self, key: Hashable, *args, **kwargs) -> Any:
+ """Case insensitive get."""
return super(CaseInsensitiveDict, self).get(self.__class__._k(key), *args, **kwargs)
- def setdefault(self, key, *args, **kwargs):
+ def setdefault(self, key: Hashable, *args, **kwargs) -> Any:
+ """Case insensitive setdefault."""
return super(CaseInsensitiveDict, self).setdefault(self.__class__._k(key), *args, **kwargs)
- def update(self, E=None, **F):
+ def update(self, E: Any = None, **F) -> None:
+ """Case insensitive update."""
super(CaseInsensitiveDict, self).update(self.__class__(E))
super(CaseInsensitiveDict, self).update(self.__class__(**F))
- def _convert_keys(self):
+ def _convert_keys(self) -> None:
+ """Helper method to lowercase all existing string-like keys."""
for k in list(self.keys()):
v = super(CaseInsensitiveDict, self).pop(k)
self.__setitem__(k, v)
-def chunks(iterable, size):
+def chunks(iterable: Iterable, size: int) -> Generator[Any, None, None]:
"""
- Generator that allows you to iterate over any indexable collection in `size`-length chunks
+ Generator that allows you to iterate over any indexable collection in `size`-length chunks.
Found: https://stackoverflow.com/a/312464/4022104
"""
-
for i in range(0, len(iterable), size):
yield iterable[i:i + size]
diff --git a/bot/utils/checks.py b/bot/utils/checks.py
index 195edab0f..19f64ff9f 100644
--- a/bot/utils/checks.py
+++ b/bot/utils/checks.py
@@ -6,11 +6,7 @@ log = logging.getLogger(__name__)
def with_role_check(ctx: Context, *role_ids: int) -> bool:
- """
- Returns True if the user has any one
- of the roles in role_ids.
- """
-
+ """Returns True if the user has any one of the roles in role_ids."""
if not ctx.guild: # Return False in a DM
log.trace(f"{ctx.author} tried to use the '{ctx.command.name}'command from a DM. "
"This command is restricted by the with_role decorator. Rejecting request.")
@@ -27,11 +23,7 @@ def with_role_check(ctx: Context, *role_ids: int) -> bool:
def without_role_check(ctx: Context, *role_ids: int) -> bool:
- """
- Returns True if the user does not have any
- of the roles in role_ids.
- """
-
+ """Returns True if the user does not have any of the roles in role_ids."""
if not ctx.guild: # Return False in a DM
log.trace(f"{ctx.author} tried to use the '{ctx.command.name}' command from a DM. "
"This command is restricted by the without_role decorator. Rejecting request.")
@@ -45,11 +37,7 @@ def without_role_check(ctx: Context, *role_ids: int) -> bool:
def in_channel_check(ctx: Context, channel_id: int) -> bool:
- """
- Checks if the command was executed
- inside of the specified channel.
- """
-
+ """Checks if the command was executed inside of the specified channel."""
check = ctx.channel.id == channel_id
log.trace(f"{ctx.author} tried to call the '{ctx.command.name}' command. "
f"The result of the in_channel check was {check}.")
diff --git a/bot/utils/messages.py b/bot/utils/messages.py
index 94a8b36ed..549b33ca6 100644
--- a/bot/utils/messages.py
+++ b/bot/utils/messages.py
@@ -1,9 +1,9 @@
import asyncio
import contextlib
from io import BytesIO
-from typing import Sequence, Union
+from typing import Optional, Sequence, Union
-from discord import Embed, File, Message, TextChannel, Webhook
+from discord import Client, Embed, File, Member, Message, Reaction, TextChannel, Webhook
from discord.abc import Snowflake
from discord.errors import HTTPException
@@ -17,42 +17,18 @@ async def wait_for_deletion(
user_ids: Sequence[Snowflake],
deletion_emojis: Sequence[str] = (Emojis.cross_mark,),
timeout: float = 60 * 5,
- attach_emojis=True,
- client=None
-):
- """
- Waits for up to `timeout` seconds for a reaction by
- any of the specified `user_ids` to delete the message.
-
- Args:
- message (Message):
- The message that should be monitored for reactions
- and possibly deleted. Must be a message sent on a
- guild since access to the bot instance is required.
-
- user_ids (Sequence[Snowflake]):
- A sequence of users that are allowed to delete
- this message.
-
- Kwargs:
- deletion_emojis (Sequence[str]):
- A sequence of emojis that are considered deletion
- emojis.
-
- timeout (float):
- A positive float denoting the maximum amount of
- time to wait for a deletion reaction.
-
- attach_emojis (bool):
- Whether to attach the given `deletion_emojis`
- to the message in the given `context`
-
- client (Optional[discord.Client]):
- The client instance handling the original command.
- If not given, will take the client from the guild
- of the message.
+ attach_emojis: bool = True,
+ client: Optional[Client] = None
+) -> None:
"""
+ Wait for up to `timeout` seconds for a reaction by any of the specified `user_ids` to delete the message.
+
+ An `attach_emojis` bool may be specified to determine whether to attach the given
+ `deletion_emojis` to the message in the given `context`
+ A `client` instance may be optionally specified, otherwise client will be taken from the
+ guild of the message.
+ """
if message.guild is None and client is None:
raise ValueError("Message must be sent on a guild")
@@ -62,7 +38,8 @@ async def wait_for_deletion(
for emoji in deletion_emojis:
await message.add_reaction(emoji)
- def check(reaction, user):
+ def check(reaction: Reaction, user: Member) -> bool:
+ """Check that the deletion emoji is reacted by the approprite user."""
return (
reaction.message.id == message.id
and reaction.emoji in deletion_emojis
@@ -70,25 +47,17 @@ async def wait_for_deletion(
)
with contextlib.suppress(asyncio.TimeoutError):
- await bot.wait_for(
- 'reaction_add',
- check=check,
- timeout=timeout
- )
+ await bot.wait_for('reaction_add', check=check, timeout=timeout)
await message.delete()
-async def send_attachments(message: Message, destination: Union[TextChannel, Webhook]):
+async def send_attachments(message: Message, destination: Union[TextChannel, Webhook]) -> None:
"""
Re-uploads each attachment in a message to the given channel or webhook.
Each attachment is sent as a separate message to more easily comply with the 8 MiB request size limit.
If attachments are too large, they are instead grouped into a single embed which links to them.
-
- :param message: the message whose attachments to re-upload
- :param destination: the channel in which to re-upload the attachments
"""
-
large = []
for attachment in message.attachments:
try:
diff --git a/bot/utils/scheduling.py b/bot/utils/scheduling.py
index f03865013..08abd91d7 100644
--- a/bot/utils/scheduling.py
+++ b/bot/utils/scheduling.py
@@ -2,7 +2,7 @@ import asyncio
import contextlib
import logging
from abc import abstractmethod
-from typing import Dict
+from typing import Coroutine, Dict, Union
from bot.utils import CogABCMeta
@@ -10,6 +10,7 @@ log = logging.getLogger(__name__)
class Scheduler(metaclass=CogABCMeta):
+ """Task scheduler."""
def __init__(self):
@@ -17,24 +18,23 @@ class Scheduler(metaclass=CogABCMeta):
self.scheduled_tasks: Dict[str, asyncio.Task] = {}
@abstractmethod
- async def _scheduled_task(self, task_object: dict):
+ async def _scheduled_task(self, task_object: dict) -> None:
"""
- A coroutine which handles the scheduling. This is added to the scheduled tasks,
- and should wait the task duration, execute the desired code, and clean up the task.
+ A coroutine which handles the scheduling.
+
+ This is added to the scheduled tasks, and should wait the task duration, execute the desired
+ code, then clean up the task.
+
For example, in Reminders this will wait for the reminder duration, send the reminder,
then make a site API request to delete the reminder from the database.
-
- :param task_object:
"""
- def schedule_task(self, loop: asyncio.AbstractEventLoop, task_id: str, task_data: dict):
+ def schedule_task(self, loop: asyncio.AbstractEventLoop, task_id: str, task_data: dict) -> None:
"""
Schedules a task.
- :param loop: the asyncio event loop
- :param task_id: the ID of the task.
- :param task_data: the data of the task, passed to `Scheduler._scheduled_expiration`.
- """
+ `task_data` is passed to `Scheduler._scheduled_expiration`
+ """
if task_id in self.scheduled_tasks:
return
@@ -42,12 +42,8 @@ class Scheduler(metaclass=CogABCMeta):
self.scheduled_tasks[task_id] = task
- def cancel_task(self, task_id: str):
- """
- Un-schedules a task.
- :param task_id: the ID of the infraction in question
- """
-
+ def cancel_task(self, task_id: str) -> None:
+ """Un-schedules a task."""
task = self.scheduled_tasks.get(task_id)
if task is None:
@@ -59,14 +55,8 @@ class Scheduler(metaclass=CogABCMeta):
del self.scheduled_tasks[task_id]
-def create_task(loop: asyncio.AbstractEventLoop, coro_or_future):
- """
- Creates an asyncio.Task object from a coroutine or future object.
-
- :param loop: the asyncio event loop.
- :param coro_or_future: the coroutine or future object to be scheduled.
- """
-
+def create_task(loop: asyncio.AbstractEventLoop, coro_or_future: Union[Coroutine, asyncio.Future]) -> asyncio.Task:
+ """Creates an asyncio.Task object from a coroutine or future object."""
task: asyncio.Task = asyncio.ensure_future(coro_or_future, loop=loop)
# Silently ignore exceptions in a callback (handles the CancelledError nonsense)
@@ -74,6 +64,7 @@ def create_task(loop: asyncio.AbstractEventLoop, coro_or_future):
return task
-def _silent_exception(future):
+def _silent_exception(future: asyncio.Future) -> None:
+ """Suppress future's exception."""
with contextlib.suppress(Exception):
future.exception()
diff --git a/bot/utils/time.py b/bot/utils/time.py
index a330c9cd8..c529ccc2b 100644
--- a/bot/utils/time.py
+++ b/bot/utils/time.py
@@ -6,10 +6,9 @@ from dateutil.relativedelta import relativedelta
RFC1123_FORMAT = "%a, %d %b %Y %H:%M:%S GMT"
-def _stringify_time_unit(value: int, unit: str):
+def _stringify_time_unit(value: int, unit: str) -> str:
"""
- Returns a string to represent a value and time unit,
- ensuring that it uses the right plural form of the unit.
+ Returns a string to represent a value and time unit, ensuring that it uses the right plural form of the unit.
>>> _stringify_time_unit(1, "seconds")
"1 second"
@@ -18,7 +17,6 @@ def _stringify_time_unit(value: int, unit: str):
>>> _stringify_time_unit(0, "minutes")
"less than a minute"
"""
-
if value == 1:
return f"{value} {unit[:-1]}"
elif value == 0:
@@ -27,18 +25,13 @@ def _stringify_time_unit(value: int, unit: str):
return f"{value} {unit}"
-def humanize_delta(delta: relativedelta, precision: str = "seconds", max_units: int = 6):
+def humanize_delta(delta: relativedelta, precision: str = "seconds", max_units: int = 6) -> str:
"""
Returns a human-readable version of the relativedelta.
- :param delta: A dateutil.relativedelta.relativedelta object
- :param precision: The smallest unit that should be included.
- :param max_units: The maximum number of time-units to return.
-
- :return: A string like `4 days, 12 hours and 1 second`,
- `1 minute`, or `less than a minute`.
+ precision specifies the smallest unit of time to include (e.g. "seconds", "minutes").
+ max_units specifies the maximum number of units of time to include (e.g. 1 may include days but not hours).
"""
-
units = (
("years", delta.years),
("months", delta.months),
@@ -73,19 +66,13 @@ def humanize_delta(delta: relativedelta, precision: str = "seconds", max_units:
return humanized
-def time_since(past_datetime: datetime.datetime, precision: str = "seconds", max_units: int = 6):
+def time_since(past_datetime: datetime.datetime, precision: str = "seconds", max_units: int = 6) -> str:
"""
- Takes a datetime and returns a human-readable string that
- describes how long ago that datetime was.
+ Takes a datetime and returns a human-readable string that describes how long ago that datetime was.
- :param past_datetime: A datetime.datetime object
- :param precision: The smallest unit that should be included.
- :param max_units: The maximum number of time-units to return.
-
- :return: A string like `4 days, 12 hours and 1 second ago`,
- `1 minute ago`, or `less than a minute ago`.
+ precision specifies the smallest unit of time to include (e.g. "seconds", "minutes").
+ max_units specifies the maximum number of units of time to include (e.g. 1 may include days but not hours).
"""
-
now = datetime.datetime.utcnow()
delta = abs(relativedelta(now, past_datetime))
@@ -94,20 +81,17 @@ def time_since(past_datetime: datetime.datetime, precision: str = "seconds", max
return f"{humanized} ago"
-def parse_rfc1123(time_str):
+def parse_rfc1123(time_str: str) -> datetime.datetime:
+ """Parse RFC1123 time string into datetime."""
return datetime.datetime.strptime(time_str, RFC1123_FORMAT).replace(tzinfo=datetime.timezone.utc)
# Hey, this could actually be used in the off_topic_names and reddit cogs :)
-async def wait_until(time: datetime.datetime):
- """
- Wait until a given time.
-
- :param time: A datetime.datetime object to wait until.
- """
-
+async def wait_until(time: datetime.datetime) -> None:
+ """Wait until a given time."""
delay = time - datetime.datetime.utcnow()
delay_seconds = delay.total_seconds()
+ # Incorporate a small delay so we don't rapid-fire the event due to time precision errors
if delay_seconds > 1.0:
await asyncio.sleep(delay_seconds)
diff --git a/tox.ini b/tox.ini
index 21097cd97..d14819d57 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,6 +1,19 @@
[flake8]
max-line-length=120
-application_import_names=bot,tests
-exclude=.cache,.venv
-ignore=B311,W503,E226,S311,T000
+docstring-convention=all
import-order-style=pycharm
+application_import_names=bot,tests
+exclude=.cache,.venv,constants.py
+ignore=
+ B311,W503,E226,S311,T000
+ # Missing Docstrings
+ D100,D104,D105,D107,
+ # Docstring Whitespace
+ D203,D212,D214,D215,
+ # Docstring Quotes
+ D301,D302,
+ # Docstring Content
+ D400,D401,D402,D404,D405,D406,D407,D408,D409,D410,D411,D412,D413,D414,D416,D417
+ # Type Annotations
+ TYP002,TYP003,TYP101,TYP102,TYP204,TYP206
+per-file-ignores=tests/*:D,TYP