aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar sco1 <[email protected]>2019-09-10 17:57:42 -0400
committerGravatar sco1 <[email protected]>2019-09-10 17:57:42 -0400
commitd20905ef77c86cd348cb8a86014b68012861d3b0 (patch)
treebc4e9cfca3363fc2c3332f20247ce2f37a0f0b0a
parentDocstring linting chunk 2 (diff)
Docstring linting chunk 3
-rw-r--r--bot/cogs/filtering.py65
-rw-r--r--bot/cogs/free.py9
-rw-r--r--bot/cogs/moderation.py258
-rw-r--r--bot/cogs/off_topic_names.py28
-rw-r--r--bot/cogs/reminders.py98
-rw-r--r--bot/cogs/security.py13
-rw-r--r--bot/cogs/snekbox.py12
7 files changed, 170 insertions, 313 deletions
diff --git a/bot/cogs/filtering.py b/bot/cogs/filtering.py
index 418297fc4..b924ac265 100644
--- a/bot/cogs/filtering.py
+++ b/bot/cogs/filtering.py
@@ -30,10 +30,7 @@ ZALGO_RE = r"[\u0300-\u036F\u0489]"
class Filtering:
- """
- Filtering out invites, blacklisting domains,
- and warning us of certain regular expressions
- """
+ """Filtering out invites, blacklisting domains, and warning us of certain regular expressions."""
def __init__(self, bot: Bot):
self.bot = bot
@@ -94,26 +91,27 @@ class Filtering:
@property
def mod_log(self) -> ModLog:
+ """Get currently loaded ModLog cog instance."""
return self.bot.get_cog("ModLog")
- async def on_message(self, msg: Message):
+ async def on_message(self, msg: Message) -> None:
+ """Invoke message filter for new messages."""
await self._filter_message(msg)
- async def on_message_edit(self, before: Message, after: Message):
+ async def on_message_edit(self, before: Message, after: Message) -> None:
+ """
+ Invoke message filter for message edits.
+
+ If there have been multiple edits, calculate the time delta from the previous edit
+ """
if not before.edited_at:
delta = relativedelta(after.edited_at, before.created_at).microseconds
else:
delta = None
await self._filter_message(after, delta)
- async def _filter_message(self, msg: Message, delta: Optional[int] = None):
- """
- Whenever a message is sent or edited,
- run it through our filters to see if it
- violates any of our rules, and then respond
- accordingly.
- """
-
+ async def _filter_message(self, msg: Message, delta: Optional[int] = None) -> None:
+ """Filter the input message to see if it violates any of our rules, and then respond accordingly."""
# Should we filter this message?
role_whitelisted = False
@@ -224,14 +222,10 @@ class Filtering:
@staticmethod
async def _has_watchlist_words(text: str) -> bool:
"""
- Returns True if the text contains
- one of the regular expressions from the
- word_watchlist in our filter config.
+ Returns True if the text contains one of the regular expressions from the word_watchlist in our filter config.
- Only matches words with boundaries before
- and after the expression.
+ Only matches words with boundaries before and after the expression.
"""
-
for expression in Filter.word_watchlist:
if re.search(fr"\b{expression}\b", text, re.IGNORECASE):
return True
@@ -241,14 +235,10 @@ class Filtering:
@staticmethod
async def _has_watchlist_tokens(text: str) -> bool:
"""
- Returns True if the text contains
- one of the regular expressions from the
- token_watchlist in our filter config.
+ Returns True if the text contains one of the regular expressions from the token_watchlist in our filter config.
- This will match the expression even if it
- does not have boundaries before and after
+ This will match the expression even if it does not have boundaries before and after.
"""
-
for expression in Filter.token_watchlist:
if re.search(fr"{expression}", text, re.IGNORECASE):
@@ -260,11 +250,7 @@ class Filtering:
@staticmethod
async def _has_urls(text: str) -> bool:
- """
- Returns True if the text contains one of
- the blacklisted URLs from the config file.
- """
-
+ """Returns True if the text contains one of the blacklisted URLs from the config file."""
if not re.search(URL_RE, text, re.IGNORECASE):
return False
@@ -283,7 +269,6 @@ class Filtering:
Zalgo range is \u0300 – \u036F and \u0489.
"""
-
return bool(re.search(ZALGO_RE, text))
async def _has_invites(self, text: str) -> Union[dict, bool]:
@@ -295,7 +280,6 @@ class Filtering:
Attempts to catch some of common ways to try to cheat the system.
"""
-
# Remove backslashes to prevent escape character aroundfuckery like
# discord\.gg/gdudes-pony-farm
text = text.replace("\\", "")
@@ -336,30 +320,27 @@ class Filtering:
return invite_data if invite_data else False
@staticmethod
- async def _has_rich_embed(msg: Message):
- """
- Returns True if any of the embeds in the message are of type 'rich', but are not twitter
- embeds. Returns False otherwise.
- """
+ async def _has_rich_embed(msg: Message) -> bool:
+ """Returns True if any of the embeds in the message are of type 'rich', but are not twitter embeds."""
if msg.embeds:
for embed in msg.embeds:
if embed.type == "rich" and (not embed.url or "twitter.com" not in embed.url):
return True
return False
- async def notify_member(self, filtered_member: Member, reason: str, channel: TextChannel):
+ async def notify_member(self, filtered_member: Member, reason: str, channel: TextChannel) -> None:
"""
- Notify filtered_member about a moderation action with the reason str
+ Notify filtered_member about a moderation action with the reason str.
First attempts to DM the user, fall back to in-channel notification if user has DMs disabled
"""
-
try:
await filtered_member.send(reason)
except discord.errors.Forbidden:
await channel.send(f"{filtered_member.mention} {reason}")
-def setup(bot: Bot):
+def setup(bot: Bot) -> None:
+ """Filtering cog load."""
bot.add_cog(Filtering(bot))
log.info("Cog loaded: Filtering")
diff --git a/bot/cogs/free.py b/bot/cogs/free.py
index fd6009bb8..ccc722e66 100644
--- a/bot/cogs/free.py
+++ b/bot/cogs/free.py
@@ -2,7 +2,7 @@ import logging
from datetime import datetime
from discord import Colour, Embed, Member, utils
-from discord.ext.commands import Context, command
+from discord.ext.commands import Bot, Context, command
from bot.constants import Categories, Channels, Free, STAFF_ROLES
from bot.decorators import redirect_output
@@ -22,11 +22,9 @@ class Free:
@command(name="free", aliases=('f',))
@redirect_output(destination_channel=Channels.bot, bypass_roles=STAFF_ROLES)
- async def free(self, ctx: Context, user: Member = None, seek: int = 2):
+ async def free(self, ctx: Context, user: Member = None, seek: int = 2) -> None:
"""
Lists free help channels by likeliness of availability.
- :param user: accepts user mention, ID, etc.
- :param seek: How far back to check the last active message.
seek is used only when this command is invoked in a help channel.
You cannot override seek without mentioning a user first.
@@ -101,6 +99,7 @@ class Free:
await ctx.send(embed=embed)
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Free cog load."""
bot.add_cog(Free())
log.info("Cog loaded: Free")
diff --git a/bot/cogs/moderation.py b/bot/cogs/moderation.py
index 1dc2c70d6..28956e636 100644
--- a/bot/cogs/moderation.py
+++ b/bot/cogs/moderation.py
@@ -33,6 +33,7 @@ APPEALABLE_INFRACTIONS = ("Ban", "Mute")
def proxy_user(user_id: str) -> Object:
+ """Create a proxy user for the provided user_id for situations where a Member or User object cannot be resolved."""
try:
user_id = int(user_id)
except ValueError:
@@ -47,9 +48,7 @@ UserTypes = Union[Member, User, proxy_user]
class Moderation(Scheduler):
- """
- Server moderation tools.
- """
+ """Server moderation tools."""
def __init__(self, bot: Bot):
self.bot = bot
@@ -58,10 +57,11 @@ class Moderation(Scheduler):
@property
def mod_log(self) -> ModLog:
+ """Get currently loaded ModLog cog instance."""
return self.bot.get_cog("ModLog")
- async def on_ready(self):
- # Schedule expiration for previous infractions
+ async def on_ready(self) -> None:
+ """Schedule expiration for previous infractions."""
infractions = await self.bot.api_client.get(
'bot/infractions', params={'active': 'true'}
)
@@ -73,14 +73,13 @@ class Moderation(Scheduler):
@with_role(*MODERATION_ROLES)
@command()
- async def warn(self, ctx: Context, user: UserTypes, *, reason: str = None):
+ async def warn(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None:
"""
Create a warning infraction in the database for a user.
**`user`:** Accepts user mention, ID, etc.
**`reason`:** The reason for the warning.
"""
-
response_object = await post_infraction(ctx, user, type="warning", reason=reason)
if response_object is None:
return
@@ -123,14 +122,8 @@ class Moderation(Scheduler):
@with_role(*MODERATION_ROLES)
@command()
- async def kick(self, ctx: Context, user: Member, *, reason: str = None):
- """
- Kicks a user.
-
- **`user`:** Accepts user mention, ID, etc.
- **`reason`:** The reason for the kick.
- """
-
+ async def kick(self, ctx: Context, user: Member, *, reason: str = None) -> None:
+ """Kicks a user with the provided reason."""
if not await self.respect_role_hierarchy(ctx, user, 'kick'):
# Ensure ctx author has a higher top role than the target user
# Warning is sent to ctx by the helper method
@@ -183,14 +176,8 @@ class Moderation(Scheduler):
@with_role(*MODERATION_ROLES)
@command()
- async def ban(self, ctx: Context, user: UserTypes, *, reason: str = None):
- """
- Create a permanent ban infraction in the database for a user.
-
- **`user`:** Accepts user mention, ID, etc.
- **`reason`:** The reason for the ban.
- """
-
+ async def ban(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None:
+ """Create a permanent ban infraction for a user with the provided reason."""
if not await self.respect_role_hierarchy(ctx, user, 'ban'):
# Ensure ctx author has a higher top role than the target user
# Warning is sent to ctx by the helper method
@@ -260,14 +247,8 @@ class Moderation(Scheduler):
@with_role(*MODERATION_ROLES)
@command()
- async def mute(self, ctx: Context, user: Member, *, reason: str = None):
- """
- Create a permanent mute infraction in the database for a user.
-
- **`user`:** Accepts user mention, ID, etc.
- **`reason`:** The reason for the mute.
- """
-
+ async def mute(self, ctx: Context, user: Member, *, reason: str = None) -> None:
+ """Create a permanent mute infraction for a user with the provided reason."""
active_mutes = await self.bot.api_client.get(
'bot/infractions',
params={
@@ -334,15 +315,12 @@ class Moderation(Scheduler):
async def tempmute(
self, ctx: Context, user: Member, expiration: ExpirationDate,
*, reason: str = None
- ):
+ ) -> None:
"""
- Create a temporary mute infraction in the database for a user.
+ Create a temporary mute infraction for a user with the provided expiration and reason.
- **`user`:** Accepts user mention, ID, etc.
- **`duration`:** The duration for the temporary mute infraction
- **`reason`:** The reason for the temporary mute.
+ Duration strings are parsed per: http://strftime.org/
"""
-
active_mutes = await self.bot.api_client.get(
'bot/infractions',
params={
@@ -416,15 +394,12 @@ class Moderation(Scheduler):
@command()
async def tempban(
self, ctx: Context, user: UserTypes, expiry: ExpirationDate, *, reason: str = None
- ):
+ ) -> None:
"""
- Create a temporary ban infraction in the database for a user.
+ Create a temporary ban infraction for a user with the provided expiration and reason.
- **`user`:** Accepts user mention, ID, etc.
- **`expiry`:** The duration for the temporary ban infraction
- **`reason`:** The reason for the temporary ban.
+ Duration strings are parsed per: http://strftime.org/
"""
-
if not await self.respect_role_hierarchy(ctx, user, 'tempban'):
# Ensure ctx author has a higher top role than the target user
# Warning is sent to ctx by the helper method
@@ -510,14 +485,12 @@ class Moderation(Scheduler):
@with_role(*MODERATION_ROLES)
@command(hidden=True, aliases=['shadowwarn', 'swarn', 'shadow_warn'])
- async def note(self, ctx: Context, user: UserTypes, *, reason: str = None):
+ async def note(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None:
"""
- Create a private infraction note in the database for a user.
+ Create a private infraction note in the database for a user with the provided reason.
- **`user`:** accepts user mention, ID, etc.
- **`reason`:** The reason for the warning.
+ This does not send the user a notification
"""
-
response_object = await post_infraction(
ctx, user, type="warning", reason=reason, hidden=True
)
@@ -545,14 +518,12 @@ class Moderation(Scheduler):
@with_role(*MODERATION_ROLES)
@command(hidden=True, aliases=['shadowkick', 'skick'])
- async def shadow_kick(self, ctx: Context, user: Member, *, reason: str = None):
+ async def shadow_kick(self, ctx: Context, user: Member, *, reason: str = None) -> None:
"""
- Kicks a user.
+ Kick a user for the provided reason.
- **`user`:** accepts user mention, ID, etc.
- **`reason`:** The reason for the kick.
+ This does not send the user a notification.
"""
-
if not await self.respect_role_hierarchy(ctx, user, 'shadowkick'):
# Ensure ctx author has a higher top role than the target user
# Warning is sent to ctx by the helper method
@@ -598,14 +569,12 @@ class Moderation(Scheduler):
@with_role(*MODERATION_ROLES)
@command(hidden=True, aliases=['shadowban', 'sban'])
- async def shadow_ban(self, ctx: Context, user: UserTypes, *, reason: str = None):
+ async def shadow_ban(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None:
"""
- Create a permanent ban infraction in the database for a user.
+ Create a permanent ban infraction for a user with the provided reason.
- **`user`:** Accepts user mention, ID, etc.
- **`reason`:** The reason for the ban.
+ This does not send the user a notification.
"""
-
if not await self.respect_role_hierarchy(ctx, user, 'shadowban'):
# Ensure ctx author has a higher top role than the target user
# Warning is sent to ctx by the helper method
@@ -652,14 +621,12 @@ class Moderation(Scheduler):
@with_role(*MODERATION_ROLES)
@command(hidden=True, aliases=['shadowmute', 'smute'])
- async def shadow_mute(self, ctx: Context, user: Member, *, reason: str = None):
+ async def shadow_mute(self, ctx: Context, user: Member, *, reason: str = None) -> None:
"""
- Create a permanent mute infraction in the database for a user.
+ Create a permanent mute infraction for a user with the provided reason.
- **`user`:** Accepts user mention, ID, etc.
- **`reason`:** The reason for the mute.
+ This does not send the user a notification.
"""
-
response_object = await post_infraction(ctx, user, type="mute", reason=reason, hidden=True)
if response_object is None:
return
@@ -692,15 +659,14 @@ class Moderation(Scheduler):
@command(hidden=True, aliases=["shadowtempmute, stempmute"])
async def shadow_tempmute(
self, ctx: Context, user: Member, duration: str, *, reason: str = None
- ):
+ ) -> None:
"""
- Create a temporary mute infraction in the database for a user.
+ Create a temporary mute infraction for a user with the provided reason.
- **`user`:** Accepts user mention, ID, etc.
- **`duration`:** The duration for the temporary mute infraction
- **`reason`:** The reason for the temporary mute.
- """
+ Duration strings are parsed per: http://strftime.org/
+ This does not send the user a notification.
+ """
response_object = await post_infraction(
ctx, user, type="mute", reason=reason, duration=duration, hidden=True
)
@@ -741,15 +707,14 @@ class Moderation(Scheduler):
@command(hidden=True, aliases=["shadowtempban, stempban"])
async def shadow_tempban(
self, ctx: Context, user: UserTypes, duration: str, *, reason: str = None
- ):
+ ) -> None:
"""
- Create a temporary ban infraction in the database for a user.
+ Create a temporary ban infraction for a user with the provided reason.
- **`user`:** Accepts user mention, ID, etc.
- **`duration`:** The duration for the temporary ban infraction
- **`reason`:** The reason for the temporary ban.
- """
+ Duration strings are parsed per: http://strftime.org/
+ This does not send the user a notification.
+ """
if not await self.respect_role_hierarchy(ctx, user, 'shadowtempban'):
# Ensure ctx author has a higher top role than the target user
# Warning is sent to ctx by the helper method
@@ -811,13 +776,8 @@ class Moderation(Scheduler):
@with_role(*MODERATION_ROLES)
@command()
- async def unmute(self, ctx: Context, user: Member):
- """
- Deactivates the active mute infraction for a user.
-
- **`user`:** Accepts user mention, ID, etc.
- """
-
+ async def unmute(self, ctx: Context, user: Member) -> None:
+ """Deactivates the active mute infraction for a user."""
try:
# check the current active infraction
response = await self.bot.api_client.get(
@@ -881,13 +841,8 @@ class Moderation(Scheduler):
@with_role(*MODERATION_ROLES)
@command()
- async def unban(self, ctx: Context, user: UserTypes):
- """
- Deactivates the active ban infraction for a user.
-
- **`user`:** Accepts user mention, ID, etc.
- """
-
+ async def unban(self, ctx: Context, user: UserTypes) -> None:
+ """Deactivates the active ban infraction for a user."""
try:
# check the current active infraction
response = await self.bot.api_client.get(
@@ -938,16 +893,14 @@ class Moderation(Scheduler):
@with_role(*MODERATION_ROLES)
@group(name='infraction', aliases=('infr', 'infractions', 'inf'), invoke_without_command=True)
- async def infraction_group(self, ctx: Context):
+ async def infraction_group(self, ctx: Context) -> None:
"""Infraction manipulation commands."""
-
await ctx.invoke(self.bot.get_command("help"), "infraction")
@with_role(*MODERATION_ROLES)
@infraction_group.group(name='edit', invoke_without_command=True)
- async def infraction_edit_group(self, ctx: Context):
+ async def infraction_edit_group(self, ctx: Context) -> None:
"""Infraction editing commands."""
-
await ctx.invoke(self.bot.get_command("help"), "infraction", "edit")
@with_role(*MODERATION_ROLES)
@@ -955,15 +908,12 @@ class Moderation(Scheduler):
async def edit_duration(
self, ctx: Context,
infraction_id: int, expires_at: Union[ExpirationDate, str]
- ):
+ ) -> None:
"""
Sets the duration of the given infraction, relative to the time of updating.
- **`infraction_id`:** the id of the infraction
- **`expires_at`:** the new expiration date of the infraction.
- Use "permanent" to mark the infraction as permanent.
+ Duration strings are parsed per: http://strftime.org/, use "permanent" to mark the infraction as permanent.
"""
-
if isinstance(expires_at, str) and expires_at != 'permanent':
raise BadArgument(
"If `expires_at` is given as a non-datetime, "
@@ -1043,13 +993,8 @@ class Moderation(Scheduler):
@with_role(*MODERATION_ROLES)
@infraction_edit_group.command(name="reason")
- async def edit_reason(self, ctx: Context, infraction_id: int, *, reason: str):
- """
- Sets the reason of the given infraction.
- **`infraction_id`:** the id of the infraction
- **`reason`:** The new reason of the infraction
- """
-
+ async def edit_reason(self, ctx: Context, infraction_id: int, *, reason: str) -> None:
+ """Edit the reason of the given infraction."""
try:
old_infraction = await self.bot.api_client.get(
'bot/infractions/' + str(infraction_id)
@@ -1099,11 +1044,8 @@ class Moderation(Scheduler):
@with_role(*MODERATION_ROLES)
@infraction_group.group(name="search", invoke_without_command=True)
- async def infraction_search_group(self, ctx: Context, query: InfractionSearchQuery):
- """
- Searches for infractions in the database.
- """
-
+ async def infraction_search_group(self, ctx: Context, query: InfractionSearchQuery) -> None:
+ """Searches for infractions in the database."""
if isinstance(query, User):
await ctx.invoke(self.search_user, query)
@@ -1112,11 +1054,8 @@ class Moderation(Scheduler):
@with_role(*MODERATION_ROLES)
@infraction_search_group.command(name="user", aliases=("member", "id"))
- async def search_user(self, ctx: Context, user: Union[User, proxy_user]):
- """
- Search for infractions by member.
- """
-
+ async def search_user(self, ctx: Context, user: Union[User, proxy_user]) -> None:
+ """Search for infractions by member."""
infraction_list = await self.bot.api_client.get(
'bot/infractions',
params={'user__id': str(user.id)}
@@ -1129,11 +1068,8 @@ class Moderation(Scheduler):
@with_role(*MODERATION_ROLES)
@infraction_search_group.command(name="reason", aliases=("match", "regex", "re"))
- async def search_reason(self, ctx: Context, reason: str):
- """
- Search for infractions by their reason. Use Re2 for matching.
- """
-
+ async def search_reason(self, ctx: Context, reason: str) -> None:
+ """Search for infractions by their reason. Use Re2 for matching."""
infraction_list = await self.bot.api_client.get(
'bot/infractions', params={'search': reason}
)
@@ -1146,8 +1082,8 @@ class Moderation(Scheduler):
# endregion
# region: Utility functions
- async def send_infraction_list(self, ctx: Context, embed: Embed, infractions: list):
-
+ async def send_infraction_list(self, ctx: Context, embed: Embed, infractions: list) -> None:
+ """Send a paginated embed of infractions for the specified user."""
if not infractions:
await ctx.send(f":warning: No infractions could be found for that query.")
return
@@ -1169,14 +1105,8 @@ class Moderation(Scheduler):
# endregion
# region: Utility functions
- def schedule_expiration(self, loop: asyncio.AbstractEventLoop, infraction_object: dict):
- """
- Schedules a task to expire a temporary infraction.
-
- :param loop: the asyncio event loop
- :param infraction_object: the infraction object to expire at the end of the task
- """
-
+ def schedule_expiration(self, loop: asyncio.AbstractEventLoop, infraction_object: dict) -> None:
+ """Schedules a task to expire a temporary infraction."""
infraction_id = infraction_object["id"]
if infraction_id in self.scheduled_tasks:
return
@@ -1185,12 +1115,8 @@ class Moderation(Scheduler):
self.scheduled_tasks[infraction_id] = task
- def cancel_expiration(self, infraction_id: str):
- """
- Un-schedules a task set to expire a temporary infraction.
- :param infraction_id: the ID of the infraction in question
- """
-
+ def cancel_expiration(self, infraction_id: str) -> None:
+ """Un-schedules a task set to expire a temporary infraction."""
task = self.scheduled_tasks.get(infraction_id)
if task is None:
log.warning(f"Failed to unschedule {infraction_id}: no task found.")
@@ -1199,15 +1125,13 @@ class Moderation(Scheduler):
log.debug(f"Unscheduled {infraction_id}.")
del self.scheduled_tasks[infraction_id]
- async def _scheduled_task(self, infraction_object: dict):
+ async def _scheduled_task(self, infraction_object: dict) -> None:
"""
- A co-routine which marks an infraction as expired after the delay from the time of
- scheduling to the time of expiration. At the time of expiration, the infraction is
- marked as inactive on the website, and the expiration task is cancelled.
+ Marks an infraction expired after the delay from time of scheduling to time of expiration.
- :param infraction_object: the infraction in question
+ At the time of expiration, the infraction is marked as inactive on the website, and the
+ expiration task is cancelled. The user is then notified via DM.
"""
-
infraction_id = infraction_object["id"]
# transform expiration to delay in seconds
@@ -1229,14 +1153,12 @@ class Moderation(Scheduler):
icon_url=Icons.user_unmute
)
- async def _deactivate_infraction(self, infraction_object):
+ async def _deactivate_infraction(self, infraction_object: dict) -> None:
"""
A co-routine which marks an infraction as inactive on the website.
- This co-routine does not cancel or un-schedule an expiration task.
- :param infraction_object: the infraction in question
+ This co-routine does not cancel or un-schedule an expiration task.
"""
-
guild: Guild = self.bot.get_guild(constants.Guild.id)
user_id = infraction_object["user"]
infraction_type = infraction_object["type"]
@@ -1258,7 +1180,8 @@ class Moderation(Scheduler):
json={"active": False}
)
- def _infraction_to_string(self, infraction_object):
+ def _infraction_to_string(self, infraction_object: dict) -> str:
+ """Convert the infraction object to a string representation."""
actor_id = infraction_object["actor"]
guild: Guild = self.bot.get_guild(constants.Guild.id)
actor = guild.get_member(actor_id)
@@ -1285,16 +1208,12 @@ class Moderation(Scheduler):
async def notify_infraction(
self, user: Union[User, Member], infr_type: str,
expires_at: Union[datetime, str] = 'N/A', reason: str = "No reason provided."
- ):
+ ) -> bool:
"""
- Notify a user of their fresh infraction :)
+ Attempt to notify a user, via DM, of their fresh infraction.
- :param user: The user to send the message to.
- :param infr_type: The type of infraction, as a string.
- :param duration: The duration of the infraction.
- :param reason: The reason for the infraction.
+ Optionally returns a boolean indicator of whether the DM was successful.
"""
-
if isinstance(expires_at, datetime):
expires_at = expires_at.strftime('%c')
@@ -1320,16 +1239,12 @@ class Moderation(Scheduler):
async def notify_pardon(
self, user: Union[User, Member], title: str, content: str,
icon_url: str = Icons.user_verified
- ):
+ ) -> bool:
"""
- Notify a user that an infraction has been lifted.
+ Attempt to notify a user, via DM, of their expired infraction.
- :param user: The user to send the message to.
- :param title: The title of the embed.
- :param content: The content of the embed.
- :param icon_url: URL for the title icon.
+ Optionally returns a boolean indicator of whether the DM was successful.
"""
-
embed = Embed(
description=content,
colour=Colour(Colours.soft_green)
@@ -1339,14 +1254,12 @@ class Moderation(Scheduler):
return await self.send_private_embed(user, embed)
- async def send_private_embed(self, user: Union[User, Member], embed: Embed):
+ async def send_private_embed(self, user: Union[User, Member], embed: Embed) -> bool:
"""
A helper method for sending an embed to a user's DMs.
- :param user: The user to send the embed to.
- :param embed: The embed to send.
+ Returns a boolean indicator of DM success.
"""
-
# sometimes `user` is a `discord.Object`, so let's make it a proper user.
user = await self.bot.get_user_info(user.id)
@@ -1360,7 +1273,8 @@ class Moderation(Scheduler):
)
return False
- async def log_notify_failure(self, target: str, actor: Member, infraction_type: str):
+ async def log_notify_failure(self, target: str, actor: Member, infraction_type: str) -> None:
+ """Send a mod log entry if an attempt to DM the target user has failed."""
await self.mod_log.send_log_message(
icon_url=Icons.token_removed,
content=actor.mention,
@@ -1374,7 +1288,8 @@ class Moderation(Scheduler):
# endregion
- async def __error(self, ctx, error):
+ async def __error(self, ctx: Context, error: Exception) -> None:
+ """Send a notification to the invoking context on a Union failure."""
if isinstance(error, BadUnionArgument):
if User in error.converters:
await ctx.send(str(error.errors[0]))
@@ -1382,15 +1297,11 @@ class Moderation(Scheduler):
async def respect_role_hierarchy(self, ctx: Context, target: UserTypes, infr_type: str) -> bool:
"""
Check if the highest role of the invoking member is greater than that of the target member.
+
If this check fails, a warning is sent to the invoking ctx.
Returns True always if target is not a discord.Member instance.
-
- :param ctx: The command context when invoked.
- :param target: The target of the infraction.
- :param infr_type: The type of infraction.
"""
-
if not isinstance(target, Member):
return True
@@ -1409,6 +1320,7 @@ class Moderation(Scheduler):
return target_is_lower
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Moderation cog load."""
bot.add_cog(Moderation(bot))
log.info("Cog loaded: Moderation")
diff --git a/bot/cogs/off_topic_names.py b/bot/cogs/off_topic_names.py
index c0d2e5dc5..3849d3d59 100644
--- a/bot/cogs/off_topic_names.py
+++ b/bot/cogs/off_topic_names.py
@@ -18,7 +18,8 @@ class OffTopicName(Converter):
"""A converter that ensures an added off-topic name is valid."""
@staticmethod
- async def convert(ctx: Context, argument: str):
+ async def convert(ctx: Context, argument: str) -> None:
+ """Attempt to replace any invalid characters with their approximate unicode equivalent."""
allowed_characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ!?'`-"
if not (2 <= len(argument) <= 96):
@@ -37,7 +38,7 @@ class OffTopicName(Converter):
return argument.translate(table)
-async def update_names(bot: Bot, headers: dict):
+async def update_names(bot: Bot, headers: dict) -> None:
"""
The background updater task that performs a channel name update daily.
@@ -46,7 +47,6 @@ async def update_names(bot: Bot, headers: dict):
The running bot instance, used for fetching data from the
website via the bot's `api_client`.
"""
-
while True:
# Since we truncate the compute timedelta to seconds, we add one second to ensure
# we go past midnight in the `seconds_to_sleep` set below.
@@ -77,27 +77,27 @@ class OffTopicNames:
self.headers = {"X-API-KEY": Keys.site_api}
self.updater_task = None
- def __cleanup(self):
+ def __cleanup(self) -> None:
+ """Cancel any leftover running updater task."""
if self.updater_task is not None:
self.updater_task.cancel()
- async def on_ready(self):
+ async def on_ready(self) -> None:
+ """Start off-topic channel updating event loop if it hasn't already started."""
if self.updater_task is None:
coro = update_names(self.bot, self.headers)
self.updater_task = self.bot.loop.create_task(coro)
@group(name='otname', aliases=('otnames', 'otn'), invoke_without_command=True)
@with_role(*MODERATION_ROLES)
- async def otname_group(self, ctx):
+ async def otname_group(self, ctx: Context) -> None:
"""Add or list items from the off-topic channel name rotation."""
-
await ctx.invoke(self.bot.get_command("help"), "otname")
@otname_group.command(name='add', aliases=('a',))
@with_role(*MODERATION_ROLES)
- async def add_command(self, ctx, name: OffTopicName):
+ async def add_command(self, ctx: Context, name: OffTopicName) -> None:
"""Adds a new off-topic name to the rotation."""
-
await self.bot.api_client.post(f'bot/off-topic-channel-names', params={'name': name})
log.info(
f"{ctx.author.name}#{ctx.author.discriminator}"
@@ -107,9 +107,8 @@ class OffTopicNames:
@otname_group.command(name='delete', aliases=('remove', 'rm', 'del', 'd'))
@with_role(*MODERATION_ROLES)
- async def delete_command(self, ctx, name: OffTopicName):
+ async def delete_command(self, ctx: Context, name: OffTopicName) -> None:
"""Removes a off-topic name from the rotation."""
-
await self.bot.api_client.delete(f'bot/off-topic-channel-names/{name}')
log.info(
f"{ctx.author.name}#{ctx.author.discriminator}"
@@ -119,12 +118,12 @@ class OffTopicNames:
@otname_group.command(name='list', aliases=('l',))
@with_role(*MODERATION_ROLES)
- async def list_command(self, ctx):
+ async def list_command(self, ctx: Context) -> None:
"""
Lists all currently known off-topic channel names in a paginator.
+
Restricted to Moderator and above to not spoil the surprise.
"""
-
result = await self.bot.api_client.get('bot/off-topic-channel-names')
lines = sorted(f"• {name}" for name in result)
embed = Embed(
@@ -138,6 +137,7 @@ class OffTopicNames:
await ctx.send(embed=embed)
-def setup(bot: Bot):
+def setup(bot: Bot) -> None:
+ """Off topic names cog load."""
bot.add_cog(OffTopicNames(bot))
log.info("Cog loaded: OffTopicNames")
diff --git a/bot/cogs/reminders.py b/bot/cogs/reminders.py
index 03ea00de8..d9f6f6536 100644
--- a/bot/cogs/reminders.py
+++ b/bot/cogs/reminders.py
@@ -23,13 +23,14 @@ MAXIMUM_REMINDERS = 5
class Reminders(Scheduler):
+ """Provide in-channel reminder functionality."""
def __init__(self, bot: Bot):
self.bot = bot
super().__init__()
- async def on_ready(self):
- # Get all the current reminders for re-scheduling
+ async def on_ready(self) -> None:
+ """Reschedule all current reminders."""
response = await self.bot.api_client.get(
'bot/reminders',
params={'active': 'true'}
@@ -50,25 +51,16 @@ class Reminders(Scheduler):
self.schedule_task(loop, reminder["id"], reminder)
@staticmethod
- async def _send_confirmation(ctx: Context, on_success: str):
- """
- Send an embed confirming the change was made successfully.
- """
-
+ async def _send_confirmation(ctx: Context, on_success: str) -> None:
+ """Send an embed confirming the reminder change was made successfully."""
embed = Embed()
embed.colour = Colour.green()
embed.title = random.choice(POSITIVE_REPLIES)
embed.description = on_success
await ctx.send(embed=embed)
- async def _scheduled_task(self, reminder: dict):
- """
- A coroutine which sends the reminder once the time is reached.
-
- :param reminder: the data of the reminder.
- :return:
- """
-
+ async def _scheduled_task(self, reminder: dict) -> None:
+ """A coroutine which sends the reminder once the time is reached, and cancels the running task."""
reminder_id = reminder["id"]
reminder_datetime = datetime.fromisoformat(reminder['expiration'][:-1])
@@ -82,38 +74,22 @@ class Reminders(Scheduler):
# Now we can begone with it from our schedule list.
self.cancel_task(reminder_id)
- async def _delete_reminder(self, reminder_id: str):
- """
- Delete a reminder from the database, given its ID.
-
- :param reminder_id: The ID of the reminder.
- """
-
+ async def _delete_reminder(self, reminder_id: str) -> None:
+ """Delete a reminder from the database, given its ID, and cancels the running task."""
await self.bot.api_client.delete('bot/reminders/' + str(reminder_id))
# Now we can remove it from the schedule list
self.cancel_task(reminder_id)
- async def _reschedule_reminder(self, reminder):
- """
- Reschedule a reminder object.
-
- :param reminder: The reminder to be rescheduled.
- """
-
+ async def _reschedule_reminder(self, reminder: dict) -> None:
+ """Reschedule a reminder object."""
loop = asyncio.get_event_loop()
self.cancel_task(reminder["id"])
self.schedule_task(loop, reminder["id"], reminder)
- async def send_reminder(self, reminder, late: relativedelta = None):
- """
- Send the reminder.
-
- :param reminder: The data about the reminder.
- :param late: How late the reminder is (if at all)
- """
-
+ async def send_reminder(self, reminder: dict, late: relativedelta = None) -> None:
+ """Send the reminder."""
channel = self.bot.get_channel(reminder["channel_id"])
user = self.bot.get_user(reminder["author"])
@@ -140,19 +116,17 @@ class Reminders(Scheduler):
await self._delete_reminder(reminder["id"])
@group(name="remind", aliases=("reminder", "reminders"), invoke_without_command=True)
- async def remind_group(self, ctx: Context, expiration: ExpirationDate, *, content: str):
- """
- Commands for managing your reminders.
- """
-
+ async def remind_group(self, ctx: Context, expiration: ExpirationDate, *, content: str) -> None:
+ """Commands for managing your reminders."""
await ctx.invoke(self.new_reminder, expiration=expiration, content=content)
@remind_group.command(name="new", aliases=("add", "create"))
- async def new_reminder(self, ctx: Context, expiration: ExpirationDate, *, content: str):
+ async def new_reminder(self, ctx: Context, expiration: ExpirationDate, *, content: str) -> None:
"""
Set yourself a simple reminder.
- """
+ Expiration is parsed per: http://strftime.org/
+ """
embed = Embed()
# If the user is not staff, we need to verify whether or not to make a reminder at all.
@@ -203,11 +177,8 @@ class Reminders(Scheduler):
self.schedule_task(loop, reminder["id"], reminder)
@remind_group.command(name="list")
- async def list_reminders(self, ctx: Context):
- """
- View a paginated embed of all reminders for your user.
- """
-
+ async def list_reminders(self, ctx: Context) -> None:
+ """View a paginated embed of all reminders for your user."""
# Get all the user's reminders from the database.
data = await self.bot.api_client.get(
'bot/reminders',
@@ -259,19 +230,17 @@ class Reminders(Scheduler):
)
@remind_group.group(name="edit", aliases=("change", "modify"), invoke_without_command=True)
- async def edit_reminder_group(self, ctx: Context):
- """
- Commands for modifying your current reminders.
- """
-
+ async def edit_reminder_group(self, ctx: Context) -> None:
+ """Commands for modifying your current reminders."""
await ctx.invoke(self.bot.get_command("help"), "reminders", "edit")
@edit_reminder_group.command(name="duration", aliases=("time",))
- async def edit_reminder_duration(self, ctx: Context, id_: int, expiration: ExpirationDate):
+ async def edit_reminder_duration(self, ctx: Context, id_: int, expiration: ExpirationDate) -> None:
"""
Edit one of your reminders' expiration.
- """
+ Expiration is parsed per: http://strftime.org/
+ """
# Send the request to update the reminder in the database
reminder = await self.bot.api_client.patch(
'bot/reminders/' + str(id_),
@@ -286,11 +255,8 @@ class Reminders(Scheduler):
await self._reschedule_reminder(reminder)
@edit_reminder_group.command(name="content", aliases=("reason",))
- async def edit_reminder_content(self, ctx: Context, id_: int, *, content: str):
- """
- Edit one of your reminders' content.
- """
-
+ async def edit_reminder_content(self, ctx: Context, id_: int, *, content: str) -> None:
+ """Edit one of your reminders' content."""
# Send the request to update the reminder in the database
reminder = await self.bot.api_client.patch(
'bot/reminders/' + str(id_),
@@ -304,17 +270,15 @@ class Reminders(Scheduler):
await self._reschedule_reminder(reminder)
@remind_group.command("delete", aliases=("remove",))
- async def delete_reminder(self, ctx: Context, id_: int):
- """
- Delete one of your active reminders.
- """
-
+ async def delete_reminder(self, ctx: Context, id_: int) -> None:
+ """Delete one of your active reminders."""
await self._delete_reminder(id_)
await self._send_confirmation(
ctx, on_success="That reminder has been deleted successfully!"
)
-def setup(bot: Bot):
+def setup(bot: Bot) -> None:
+ """Reminders cog load."""
bot.add_cog(Reminders(bot))
log.info("Cog loaded: Reminders")
diff --git a/bot/cogs/security.py b/bot/cogs/security.py
index f4a843fbf..7ada9a4f6 100644
--- a/bot/cogs/security.py
+++ b/bot/cogs/security.py
@@ -6,22 +6,23 @@ log = logging.getLogger(__name__)
class Security:
- """
- Security-related helpers
- """
+ """Security-related helpers."""
def __init__(self, bot: Bot):
self.bot = bot
self.bot.check(self.check_not_bot) # Global commands check - no bots can run any commands at all
self.bot.check(self.check_on_guild) # Global commands check - commands can't be run in a DM
- def check_not_bot(self, ctx: Context):
+ def check_not_bot(self, ctx: Context) -> bool:
+ """Check if Context instance author is not a bot."""
return not ctx.author.bot
- def check_on_guild(self, ctx: Context):
+ def check_on_guild(self, ctx: Context) -> bool:
+ """Check if Context instance has a guild attribute."""
return ctx.guild is not None
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Security cog load."""
bot.add_cog(Security(bot))
log.info("Cog loaded: Security")
diff --git a/bot/cogs/snekbox.py b/bot/cogs/snekbox.py
index 05834e421..5a5e26655 100644
--- a/bot/cogs/snekbox.py
+++ b/bot/cogs/snekbox.py
@@ -41,9 +41,7 @@ MAX_PASTE_LEN = 1000
class Snekbox:
- """
- Safe evaluation of Python code using Snekbox
- """
+ """Safe evaluation of Python code using Snekbox."""
def __init__(self, bot: Bot):
self.bot = bot
@@ -173,7 +171,7 @@ class Snekbox:
@command(name="eval", aliases=("e",))
@guild_only()
@in_channel(Channels.bot, bypass_roles=STAFF_ROLES)
- async def eval_command(self, ctx: Context, *, code: str = None):
+ async def eval_command(self, ctx: Context, *, code: str = None) -> None:
"""
Run Python code and get the results.
@@ -225,7 +223,8 @@ class Snekbox:
del self.jobs[ctx.author.id]
@eval_command.error
- async def eval_command_error(self, ctx: Context, error: CommandError):
+ async def eval_command_error(self, ctx: Context, error: CommandError) -> None:
+ """Eval commands error handler."""
embed = Embed(colour=Colour.red())
if isinstance(error, NoPrivateMessage):
@@ -246,6 +245,7 @@ class Snekbox:
await ctx.send(embed=embed)
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Snekbox cog load."""
bot.add_cog(Snekbox(bot))
log.info("Cog loaded: Snekbox")