diff options
author | 2019-09-23 16:54:02 +1000 | |
---|---|---|
committer | 2019-09-23 16:54:02 +1000 | |
commit | 08abfd5623dc5a65f4a606e94f02f3dd07a12414 (patch) | |
tree | 9e9eff020cff8a59c532e40890b61c09002d22fb | |
parent | Return the message to send (diff) | |
parent | Update linting (#406) (diff) |
Merge branch 'master' into defcon-fix-django
63 files changed, 1066 insertions, 1708 deletions
@@ -23,7 +23,9 @@ urllib3 = ">=1.24.2,<1.25" [dev-packages] flake8 = "~=3.7" +flake8-annotations = "~=1.0" flake8-bugbear = "~=19.8" +flake8-docstrings = "~=1.4" flake8-import-order = "~=0.18" flake8-string-format = "~=0.2" flake8-tidy-imports = "~=2.0" diff --git a/Pipfile.lock b/Pipfile.lock index e5978198f..9bdcff923 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "f21c27a5c4493b65a36a78721c2cb597c3eed7fcbd28f3bf731453f2c3cccb56" + "sha256": "29aaaa90a070d544e5b39fb6033410daa9bb7f658077205e44099f3175f6822b" }, "pipfile-spec": 6, "requires": { @@ -700,6 +700,14 @@ "index": "pypi", "version": "==3.7.8" }, + "flake8-annotations": { + "hashes": [ + "sha256:1309f2bc9853a2d77d578b089d331b0b832b40c97932641e136e1b49d3650c82", + "sha256:3ecdd27054c3eed6484139025698465e3c9f4e68dbd5043d0204fcb2550ee27b" + ], + "index": "pypi", + "version": "==1.0.0" + }, "flake8-bugbear": { "hashes": [ "sha256:d8c466ea79d5020cb20bf9f11cf349026e09517a42264f313d3f6fddb83e0571", @@ -708,6 +716,14 @@ "index": "pypi", "version": "==19.8.0" }, + "flake8-docstrings": { + "hashes": [ + "sha256:1666dd069c9c457ee57e80af3c1a6b37b00cc1801c6fde88e455131bb2e186cd", + "sha256:9c0db5a79a1affd70fdf53b8765c8a26bf968e59e0252d7f2fc546b41c0cda06" + ], + "index": "pypi", + "version": "==1.4.0" + }, "flake8-import-order": { "hashes": [ "sha256:90a80e46886259b9c396b578d75c749801a41ee969a235e163cfe1be7afd2543", @@ -818,6 +834,13 @@ ], "version": "==2.5.0" }, + "pydocstyle": { + "hashes": [ + "sha256:04c84e034ebb56eb6396c820442b8c4499ac5eb94a3bda88951ac3dc519b6058", + "sha256:66aff87ffe34b1e49bff2dd03a88ce6843be2f3346b0c9814410d34987fbab59" + ], + "version": "==4.0.1" + }, "pyflakes": { "hashes": [ "sha256:17dbeb2e3f4d772725c777fabc446d5634d1038f234e77343108ce445ea69ce0", @@ -890,6 +913,12 @@ ], "version": "==1.12.0" }, + "snowballstemmer": { + "hashes": [ + "sha256:713e53b79cbcf97bc5245a06080a33d54a77e7cce2f789c835a143bcdb5c033e" + ], + "version": "==1.9.1" + }, "toml": { "hashes": [ "sha256:229f81c57791a41d65e399fc06bf0848bab550a9dfd5ed66df18ce5f05e73d5c", diff --git a/bot/__init__.py b/bot/__init__.py index 8efa5e53c..d094e8c13 100644 --- a/bot/__init__.py +++ b/bot/__init__.py @@ -9,7 +9,7 @@ logging.TRACE = 5 logging.addLevelName(logging.TRACE, "TRACE") -def monkeypatch_trace(self, msg, *args, **kwargs): +def monkeypatch_trace(self: logging.Logger, msg: str, *args, **kwargs) -> None: """ Log 'msg % args' with severity 'TRACE'. diff --git a/bot/api.py b/bot/api.py index 3acde242e..7f26e5305 100644 --- a/bot/api.py +++ b/bot/api.py @@ -11,6 +11,8 @@ log = logging.getLogger(__name__) class ResponseCodeError(ValueError): + """Raised when a non-OK HTTP response is received.""" + def __init__( self, response: aiohttp.ClientResponse, @@ -28,6 +30,8 @@ class ResponseCodeError(ValueError): class APIClient: + """Django Site API wrapper.""" + def __init__(self, **kwargs): auth_headers = { 'Authorization': f"Token {Keys.site_api}" @@ -41,10 +45,11 @@ class APIClient: self.session = aiohttp.ClientSession(**kwargs) @staticmethod - def _url_for(endpoint: str): + def _url_for(endpoint: str) -> str: return f"{URLs.site_schema}{URLs.site_api}/{quote_url(endpoint)}" - async def maybe_raise_for_status(self, response: aiohttp.ClientResponse, should_raise: bool): + async def maybe_raise_for_status(self, response: aiohttp.ClientResponse, should_raise: bool) -> None: + """Raise ResponseCodeError for non-OK response if an exception should be raised.""" if should_raise and response.status >= 400: try: response_json = await response.json() @@ -53,27 +58,32 @@ class APIClient: response_text = await response.text() raise ResponseCodeError(response=response, response_text=response_text) - async def get(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs): + async def get(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs) -> dict: + """Site API GET.""" async with self.session.get(self._url_for(endpoint), *args, **kwargs) as resp: await self.maybe_raise_for_status(resp, raise_for_status) return await resp.json() - async def patch(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs): + async def patch(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs) -> dict: + """Site API PATCH.""" async with self.session.patch(self._url_for(endpoint), *args, **kwargs) as resp: await self.maybe_raise_for_status(resp, raise_for_status) return await resp.json() - async def post(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs): + async def post(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs) -> dict: + """Site API POST.""" async with self.session.post(self._url_for(endpoint), *args, **kwargs) as resp: await self.maybe_raise_for_status(resp, raise_for_status) return await resp.json() - async def put(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs): + async def put(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs) -> dict: + """Site API PUT.""" async with self.session.put(self._url_for(endpoint), *args, **kwargs) as resp: await self.maybe_raise_for_status(resp, raise_for_status) return await resp.json() - async def delete(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs): + async def delete(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs) -> Optional[dict]: + """Site API DELETE.""" async with self.session.delete(self._url_for(endpoint), *args, **kwargs) as resp: if resp.status == 204: return None @@ -83,9 +93,12 @@ class APIClient: def loop_is_running() -> bool: - # asyncio does not have a way to say "call this when the event - # loop is running", see e.g. `callWhenRunning` from twisted. + """ + Determine if there is a running asyncio event loop. + This helps enable "call this when event loop is running" logic (see: Twisted's `callWhenRunning`), + which is currently not provided by asyncio. + """ try: asyncio.get_running_loop() except RuntimeError: @@ -94,6 +107,8 @@ def loop_is_running() -> bool: class APILoggingHandler(logging.StreamHandler): + """Site API logging handler.""" + def __init__(self, client: APIClient): logging.StreamHandler.__init__(self) self.client = client @@ -102,7 +117,8 @@ class APILoggingHandler(logging.StreamHandler): # on the event loop yet - scheduled when the event loop is ready. self.queue = [] - async def ship_off(self, payload: dict): + async def ship_off(self, payload: dict) -> None: + """Ship log payload to the logging API.""" try: await self.client.post('logs', json=payload) except ResponseCodeError as err: @@ -118,19 +134,19 @@ class APILoggingHandler(logging.StreamHandler): extra={'via_handler': True} ) - def emit(self, record: logging.LogRecord): - # Two checks are performed here: + def emit(self, record: logging.LogRecord) -> None: + """ + Determine if a log record should be shipped to the logging API. + + If the asyncio event loop is not yet running, log records will instead be put in a queue + which will be consumed once the event loop is running. + + The following two conditions are set: + 1. Do not log anything below DEBUG (only applies to the monkeypatched `TRACE` level) + 2. Ignore log records originating from this logging handler itself to prevent infinite recursion + """ if ( - # 1. Do not log anything below `DEBUG`. This is only applicable - # for the monkeypatched `TRACE` logging level, which has a - # lower numeric value than `DEBUG`. record.levelno >= logging.DEBUG - # 2. Ignore logging messages which are sent by this logging - # handler itself. This is required because if we were to - # not ignore messages emitted by this handler, we would - # infinitely recurse back down into this logging handler, - # making the reactor run like crazy, and eventually OOM - # something. Let's not do that... and not record.__dict__.get('via_handler') ): payload = { @@ -149,7 +165,8 @@ class APILoggingHandler(logging.StreamHandler): asyncio.create_task(task) self.schedule_queued_tasks() - def schedule_queued_tasks(self): + def schedule_queued_tasks(self) -> None: + """Consume the queue and schedule the logging of each queued record.""" for task in self.queue: asyncio.create_task(task) diff --git a/bot/cogs/alias.py b/bot/cogs/alias.py index 3d0c9d826..80ff37983 100644 --- a/bot/cogs/alias.py +++ b/bot/cogs/alias.py @@ -3,9 +3,7 @@ import logging from typing import Union from discord import Colour, Embed, Member, User -from discord.ext.commands import ( - Cog, Command, Context, clean_content, command, group -) +from discord.ext.commands import Bot, Cog, Command, Context, clean_content, command, group from bot.cogs.watchchannels.watchchannel import proxy_user from bot.converters import TagNameConverter @@ -14,26 +12,14 @@ from bot.pagination import LinePaginator log = logging.getLogger(__name__) -class Alias(Cog): - """ - Aliases for more used commands - """ +class Alias (Cog): + """Aliases for commonly used commands.""" - def __init__(self, bot): + def __init__(self, bot: Bot): self.bot = bot - async def invoke(self, ctx, cmd_name, *args, **kwargs): - """ - Invokes a command with args and kwargs. - Fail early through `command.can_run`, and logs warnings. - - :param ctx: Context instance for command call - :param cmd_name: Name of command/subcommand to be invoked - :param args: args to be passed to the command - :param kwargs: kwargs to be passed to the command - :return: None - """ - + async def invoke(self, ctx: Context, cmd_name: str, *args, **kwargs) -> None: + """Invokes a command with args and kwargs.""" log.debug(f"{cmd_name} was invoked through an alias") cmd = self.bot.get_command(cmd_name) if not cmd: @@ -46,9 +32,8 @@ class Alias(Cog): await ctx.invoke(cmd, *args, **kwargs) @command(name='aliases') - async def aliases_command(self, ctx): + async def aliases_command(self, ctx: Context) -> None: """Show configured aliases on the bot.""" - embed = Embed( title='Configured aliases', colour=Colour.blue() @@ -64,148 +49,98 @@ class Alias(Cog): ) @command(name="resources", aliases=("resource",), hidden=True) - async def site_resources_alias(self, ctx): - """ - Alias for invoking <prefix>site resources. - """ - + async def site_resources_alias(self, ctx: Context) -> None: + """Alias for invoking <prefix>site resources.""" await self.invoke(ctx, "site resources") @command(name="watch", hidden=True) - async def bigbrother_watch_alias(self, ctx, user: Union[Member, User, proxy_user], *, reason: str): - """ - Alias for invoking <prefix>bigbrother watch [user] [reason]. - """ - + async def bigbrother_watch_alias(self, ctx: Context, user: Union[Member, User, proxy_user], *, reason: str) -> None: + """Alias for invoking <prefix>bigbrother watch [user] [reason].""" await self.invoke(ctx, "bigbrother watch", user, reason=reason) @command(name="unwatch", hidden=True) - async def bigbrother_unwatch_alias(self, ctx, user: Union[User, proxy_user], *, reason: str): - """ - Alias for invoking <prefix>bigbrother unwatch [user] [reason]. - """ - + async def bigbrother_unwatch_alias(self, ctx: Context, user: Union[User, proxy_user], *, reason: str) -> None: + """Alias for invoking <prefix>bigbrother unwatch [user] [reason].""" await self.invoke(ctx, "bigbrother unwatch", user, reason=reason) @command(name="home", hidden=True) - async def site_home_alias(self, ctx): - """ - Alias for invoking <prefix>site home. - """ - + async def site_home_alias(self, ctx: Context) -> None: + """Alias for invoking <prefix>site home.""" await self.invoke(ctx, "site home") @command(name="faq", hidden=True) - async def site_faq_alias(self, ctx): - """ - Alias for invoking <prefix>site faq. - """ - + async def site_faq_alias(self, ctx: Context) -> None: + """Alias for invoking <prefix>site faq.""" await self.invoke(ctx, "site faq") @command(name="rules", hidden=True) - async def site_rules_alias(self, ctx): - """ - Alias for invoking <prefix>site rules. - """ - + async def site_rules_alias(self, ctx: Context) -> None: + """Alias for invoking <prefix>site rules.""" await self.invoke(ctx, "site rules") @command(name="reload", hidden=True) - async def cogs_reload_alias(self, ctx, *, cog_name: str): - """ - Alias for invoking <prefix>cogs reload [cog_name]. - - cog_name: str - name of the cog to be reloaded. - """ - + async def cogs_reload_alias(self, ctx: Context, *, cog_name: str) -> None: + """Alias for invoking <prefix>cogs reload [cog_name].""" await self.invoke(ctx, "cogs reload", cog_name) @command(name="defon", hidden=True) - async def defcon_enable_alias(self, ctx): - """ - Alias for invoking <prefix>defcon enable. - """ - + async def defcon_enable_alias(self, ctx: Context) -> None: + """Alias for invoking <prefix>defcon enable.""" await self.invoke(ctx, "defcon enable") @command(name="defoff", hidden=True) - async def defcon_disable_alias(self, ctx): - """ - Alias for invoking <prefix>defcon disable. - """ - + async def defcon_disable_alias(self, ctx: Context) -> None: + """Alias for invoking <prefix>defcon disable.""" await self.invoke(ctx, "defcon disable") @command(name="exception", hidden=True) - async def tags_get_traceback_alias(self, ctx): - """ - Alias for invoking <prefix>tags get traceback. - """ - + async def tags_get_traceback_alias(self, ctx: Context) -> None: + """Alias for invoking <prefix>tags get traceback.""" await self.invoke(ctx, "tags get", tag_name="traceback") @group(name="get", aliases=("show", "g"), hidden=True, invoke_without_command=True) - async def get_group_alias(self, ctx): - """ - Group for reverse aliases for commands like `tags get`, - allowing for `get tags` or `get docs`. - """ - + async def get_group_alias(self, ctx: Context) -> None: + """Group for reverse aliases for commands like `tags get`, allowing for `get tags` or `get docs`.""" pass @get_group_alias.command(name="tags", aliases=("tag", "t"), hidden=True) async def tags_get_alias( self, ctx: Context, *, tag_name: TagNameConverter = None - ): + ) -> None: """ Alias for invoking <prefix>tags get [tag_name]. tag_name: str - tag to be viewed. """ - await self.invoke(ctx, "tags get", tag_name=tag_name) @get_group_alias.command(name="docs", aliases=("doc", "d"), hidden=True) async def docs_get_alias( self, ctx: Context, symbol: clean_content = None - ): - """ - Alias for invoking <prefix>docs get [symbol]. - - symbol: str - name of doc to be viewed. - """ - + ) -> None: + """Alias for invoking <prefix>docs get [symbol].""" await self.invoke(ctx, "docs get", symbol) @command(name="nominate", hidden=True) - async def nomination_add_alias(self, ctx, user: Union[Member, User, proxy_user], *, reason: str): - """ - Alias for invoking <prefix>talentpool add [user] [reason]. - """ - + async def nomination_add_alias(self, ctx: Context, user: Union[Member, User, proxy_user], *, reason: str) -> None: + """Alias for invoking <prefix>talentpool add [user] [reason].""" await self.invoke(ctx, "talentpool add", user, reason=reason) @command(name="unnominate", hidden=True) - async def nomination_end_alias(self, ctx, user: Union[User, proxy_user], *, reason: str): - """ - Alias for invoking <prefix>nomination end [user] [reason]. - """ - + async def nomination_end_alias(self, ctx: Context, user: Union[User, proxy_user], *, reason: str) -> None: + """Alias for invoking <prefix>nomination end [user] [reason].""" await self.invoke(ctx, "nomination end", user, reason=reason) @command(name="nominees", hidden=True) - async def nominees_alias(self, ctx): - """ - Alias for invoking <prefix>tp watched. - """ - + async def nominees_alias(self, ctx: Context) -> None: + """Alias for invoking <prefix>tp watched.""" await self.invoke(ctx, "talentpool watched") -def setup(bot): +def setup(bot: Bot) -> None: + """Alias cog load.""" bot.add_cog(Alias(bot)) log.info("Cog loaded: Alias") diff --git a/bot/cogs/antispam.py b/bot/cogs/antispam.py index 7b97881fd..7a3360436 100644 --- a/bot/cogs/antispam.py +++ b/bot/cogs/antispam.py @@ -113,7 +113,7 @@ class AntiSpam(Cog): return self.bot.get_cog("ModLog") @Cog.listener() - async def on_ready(self): + async def on_ready(self) -> None: """Unloads the cog and alerts admins if configuration validation failed.""" if self.validation_errors: body = "**The following errors were encountered:**\n" @@ -221,9 +221,7 @@ class AntiSpam(Cog): async def maybe_delete_messages(self, channel: TextChannel, messages: List[Message]) -> None: """Cleans the messages if cleaning is configured.""" - if AntiSpamConfig.clean_offending: - # If we have more than one message, we can use bulk delete. if len(messages) > 1: message_ids = [message.id for message in messages] @@ -274,7 +272,7 @@ def validate_config(rules: Mapping = AntiSpamConfig.rules) -> Dict[str, str]: def setup(bot: Bot) -> None: - """Setup for the cog.""" + """Antispam cog load.""" validation_errors = validate_config() bot.add_cog(AntiSpam(bot, validation_errors)) log.info("Cog loaded: AntiSpam") diff --git a/bot/cogs/bot.py b/bot/cogs/bot.py index 577865a65..324d2ccd3 100644 --- a/bot/cogs/bot.py +++ b/bot/cogs/bot.py @@ -2,6 +2,7 @@ import ast import logging import re import time +from typing import Optional, Tuple from discord import Embed, Message, RawMessageUpdateEvent from discord.ext.commands import Bot, Cog, Context, command, group @@ -16,9 +17,7 @@ RE_MARKDOWN = re.compile(r'([*_~`|>])') class Bot(Cog): - """ - Bot information commands - """ + """Bot information commands.""" def __init__(self, bot: Bot): self.bot = bot @@ -47,20 +46,14 @@ class Bot(Cog): @group(invoke_without_command=True, name="bot", hidden=True) @with_role(Roles.verified) - async def botinfo_group(self, ctx: Context): - """ - Bot informational commands - """ - + async def botinfo_group(self, ctx: Context) -> None: + """Bot informational commands.""" await ctx.invoke(self.bot.get_command("help"), "bot") @botinfo_group.command(name='about', aliases=('info',), hidden=True) @with_role(Roles.verified) - async def about_command(self, ctx: Context): - """ - Get information about the bot - """ - + async def about_command(self, ctx: Context) -> None: + """Get information about the bot.""" embed = Embed( description="A utility bot designed just for the Python server! Try `!help` for more info.", url="https://github.com/python-discord/bot" @@ -78,24 +71,18 @@ class Bot(Cog): @command(name='echo', aliases=('print',)) @with_role(*MODERATION_ROLES) - async def echo_command(self, ctx: Context, *, text: str): - """ - Send the input verbatim to the current channel - """ - + async def echo_command(self, ctx: Context, *, text: str) -> None: + """Send the input verbatim to the current channel.""" await ctx.send(text) @command(name='embed') @with_role(*MODERATION_ROLES) - async def embed_command(self, ctx: Context, *, text: str): - """ - Send the input within an embed to the current channel - """ - + async def embed_command(self, ctx: Context, *, text: str) -> None: + """Send the input within an embed to the current channel.""" embed = Embed(description=text) await ctx.send(embed=embed) - def codeblock_stripping(self, msg: str, bad_ticks: bool): + def codeblock_stripping(self, msg: str, bad_ticks: bool) -> Optional[Tuple[Tuple[str, ...], str]]: """ Strip msg in order to find Python code. @@ -164,15 +151,10 @@ class Bot(Cog): log.trace(f"Returning message.\n\n{content}\n\n") return (content,), repl_code - def fix_indentation(self, msg: str): - """ - Attempts to fix badly indented code. - """ - - def unindent(code, skip_spaces=0): - """ - Unindents all code down to the number of spaces given ins skip_spaces - """ + def fix_indentation(self, msg: str) -> str: + """Attempts to fix badly indented code.""" + def unindent(code, skip_spaces: int = 0) -> str: + """Unindents all code down to the number of spaces given in skip_spaces.""" final = "" current = code[0] leading_spaces = 0 @@ -208,11 +190,13 @@ class Bot(Cog): msg = f"{first_line}\n{unindent(code, 4)}" return msg - def repl_stripping(self, msg: str): + def repl_stripping(self, msg: str) -> Tuple[str, bool]: """ Strip msg in order to extract Python code out of REPL output. Tries to strip out REPL Python code out of msg and returns the stripped msg. + + Returns True for the boolean if REPL code was found in the input msg. """ final = "" for line in msg.splitlines(keepends=True): @@ -226,7 +210,8 @@ class Bot(Cog): log.trace(f"Found REPL code in \n\n{msg}\n\n") return final.rstrip(), True - def has_bad_ticks(self, msg: Message): + def has_bad_ticks(self, msg: Message) -> bool: + """Check to see if msg contains ticks that aren't '`'.""" not_backticks = [ "'''", '"""', "\u00b4\u00b4\u00b4", "\u2018\u2018\u2018", "\u2019\u2019\u2019", "\u2032\u2032\u2032", "\u201c\u201c\u201c", "\u201d\u201d\u201d", "\u2033\u2033\u2033", @@ -236,13 +221,13 @@ class Bot(Cog): return msg.content[:3] in not_backticks @Cog.listener() - async def on_message(self, msg: Message): - """ - Detect poorly formatted Python code and send the user - a helpful message explaining how to do properly - formatted Python syntax highlighting codeblocks. + async def on_message(self, msg: Message) -> None: """ + Detect poorly formatted Python code in new messages. + If poorly formatted code is detected, send the user a helpful message explaining how to do + properly formatted Python syntax highlighting codeblocks. + """ parse_codeblock = ( ( msg.channel.id in self.channel_cooldowns @@ -361,7 +346,8 @@ class Bot(Cog): ) @Cog.listener() - async def on_raw_message_edit(self, payload: RawMessageUpdateEvent): + async def on_raw_message_edit(self, payload: RawMessageUpdateEvent) -> None: + """Check to see if an edited message (previously called out) still contains poorly formatted code.""" if ( # Checks to see if the message was called out by the bot payload.message_id not in self.codeblock_message_ids @@ -387,6 +373,7 @@ class Bot(Cog): log.trace("User's incorrect code block has been fixed. Removing bot formatting message.") -def setup(bot): +def setup(bot: Bot) -> None: + """Bot cog load.""" bot.add_cog(Bot(bot)) log.info("Cog loaded: Bot") diff --git a/bot/cogs/clean.py b/bot/cogs/clean.py index 20c24dafc..1c0c9a7a8 100644 --- a/bot/cogs/clean.py +++ b/bot/cogs/clean.py @@ -18,17 +18,13 @@ log = logging.getLogger(__name__) class Clean(Cog): """ - A cog that allows messages to be deleted in - bulk, while applying various filters. + A cog that allows messages to be deleted in bulk, while applying various filters. - You can delete messages sent by a specific user, - messages sent by bots, all messages, or messages - that match a specific regular expression. + You can delete messages sent by a specific user, messages sent by bots, all messages, or messages that match a + specific regular expression. - The deleted messages are saved and uploaded - to the database via an API endpoint, and a URL is - returned which can be used to view the messages - in the Discord dark theme style. + The deleted messages are saved and uploaded to the database via an API endpoint, and a URL is returned which can be + used to view the messages in the Discord dark theme style. """ def __init__(self, bot: Bot): @@ -37,44 +33,25 @@ class Clean(Cog): @property def mod_log(self) -> ModLog: + """Get currently loaded ModLog cog instance.""" return self.bot.get_cog("ModLog") async def _clean_messages( self, amount: int, ctx: Context, bots_only: bool = False, user: User = None, regex: Optional[str] = None - ): - """ - A helper function that does the actual message cleaning. - - :param bots_only: Set this to True if you only want to delete bot messages. - :param user: Specify a user and it will only delete messages by this user. - :param regular_expression: Specify a regular expression and it will only - delete messages that match this. - """ - + ) -> None: + """A helper function that does the actual message cleaning.""" def predicate_bots_only(message: Message) -> bool: - """ - Returns true if the message was sent by a bot - """ - + """Return True if the message was sent by a bot.""" return message.author.bot def predicate_specific_user(message: Message) -> bool: - """ - Return True if the message was sent by the - user provided in the _clean_messages call. - """ - + """Return True if the message was sent by the user provided in the _clean_messages call.""" return message.author == user - def predicate_regex(message: Message): - """ - Returns True if the regex provided in the - _clean_messages matches the message content - or any embed attributes the message may have. - """ - + def predicate_regex(message: Message) -> bool: + """Check if the regex provided in _clean_messages matches the message content or any embed attributes.""" content = [message.content] # Add the content for all embed attributes @@ -192,61 +169,38 @@ class Clean(Cog): @group(invoke_without_command=True, name="clean", hidden=True) @with_role(*MODERATION_ROLES) - async def clean_group(self, ctx: Context): - """ - Commands for cleaning messages in channels - """ - + async def clean_group(self, ctx: Context) -> None: + """Commands for cleaning messages in channels.""" await ctx.invoke(self.bot.get_command("help"), "clean") @clean_group.command(name="user", aliases=["users"]) @with_role(*MODERATION_ROLES) - async def clean_user(self, ctx: Context, user: User, amount: int = 10): - """ - Delete messages posted by the provided user, - and stop cleaning after traversing `amount` messages. - """ - + async def clean_user(self, ctx: Context, user: User, amount: int = 10) -> None: + """Delete messages posted by the provided user, stop cleaning after traversing `amount` messages.""" await self._clean_messages(amount, ctx, user=user) @clean_group.command(name="all", aliases=["everything"]) @with_role(*MODERATION_ROLES) - async def clean_all(self, ctx: Context, amount: int = 10): - """ - Delete all messages, regardless of poster, - and stop cleaning after traversing `amount` messages. - """ - + async def clean_all(self, ctx: Context, amount: int = 10) -> None: + """Delete all messages, regardless of poster, stop cleaning after traversing `amount` messages.""" await self._clean_messages(amount, ctx) @clean_group.command(name="bots", aliases=["bot"]) @with_role(*MODERATION_ROLES) - async def clean_bots(self, ctx: Context, amount: int = 10): - """ - Delete all messages posted by a bot, - and stop cleaning after traversing `amount` messages. - """ - + async def clean_bots(self, ctx: Context, amount: int = 10) -> None: + """Delete all messages posted by a bot, stop cleaning after traversing `amount` messages.""" await self._clean_messages(amount, ctx, bots_only=True) @clean_group.command(name="regex", aliases=["word", "expression"]) @with_role(*MODERATION_ROLES) - async def clean_regex(self, ctx: Context, regex, amount: int = 10): - """ - Delete all messages that match a certain regex, - and stop cleaning after traversing `amount` messages. - """ - + async def clean_regex(self, ctx: Context, regex: str, amount: int = 10) -> None: + """Delete all messages that match a certain regex, stop cleaning after traversing `amount` messages.""" await self._clean_messages(amount, ctx, regex=regex) @clean_group.command(name="stop", aliases=["cancel", "abort"]) @with_role(*MODERATION_ROLES) - async def clean_cancel(self, ctx: Context): - """ - If there is an ongoing cleaning process, - attempt to immediately cancel it. - """ - + async def clean_cancel(self, ctx: Context) -> None: + """If there is an ongoing cleaning process, attempt to immediately cancel it.""" self.cleaning = False embed = Embed( @@ -256,6 +210,7 @@ class Clean(Cog): await ctx.send(embed=embed, delete_after=10) -def setup(bot): +def setup(bot: Bot) -> None: + """Clean cog load.""" bot.add_cog(Clean(bot)) log.info("Cog loaded: Clean") diff --git a/bot/cogs/cogs.py b/bot/cogs/cogs.py index ec497b966..117c77d4b 100644 --- a/bot/cogs/cogs.py +++ b/bot/cogs/cogs.py @@ -16,9 +16,7 @@ KEEP_LOADED = ["bot.cogs.cogs", "bot.cogs.modlog"] class Cogs(Cog): - """ - Cog management commands - """ + """Cog management commands.""" def __init__(self, bot: Bot): self.bot = bot @@ -38,21 +36,19 @@ class Cogs(Cog): @group(name='cogs', aliases=('c',), invoke_without_command=True) @with_role(*MODERATION_ROLES, Roles.core_developer) - async def cogs_group(self, ctx: Context): + async def cogs_group(self, ctx: Context) -> None: """Load, unload, reload, and list active cogs.""" - await ctx.invoke(self.bot.get_command("help"), "cogs") @cogs_group.command(name='load', aliases=('l',)) @with_role(*MODERATION_ROLES, Roles.core_developer) - async def load_command(self, ctx: Context, cog: str): + async def load_command(self, ctx: Context, cog: str) -> None: """ - Load up an unloaded cog, given the module containing it + Load up an unloaded cog, given the module containing it. You can specify the cog name for any cogs that are placed directly within `!cogs`, or specify the entire module directly. """ - cog = cog.lower() embed = Embed() @@ -98,14 +94,13 @@ class Cogs(Cog): @cogs_group.command(name='unload', aliases=('ul',)) @with_role(*MODERATION_ROLES, Roles.core_developer) - async def unload_command(self, ctx: Context, cog: str): + async def unload_command(self, ctx: Context, cog: str) -> None: """ - Unload an already-loaded cog, given the module containing it + Unload an already-loaded cog, given the module containing it. You can specify the cog name for any cogs that are placed directly within `!cogs`, or specify the entire module directly. """ - cog = cog.lower() embed = Embed() @@ -150,9 +145,9 @@ class Cogs(Cog): @cogs_group.command(name='reload', aliases=('r',)) @with_role(*MODERATION_ROLES, Roles.core_developer) - async def reload_command(self, ctx: Context, cog: str): + async def reload_command(self, ctx: Context, cog: str) -> None: """ - Reload an unloaded cog, given the module containing it + Reload an unloaded cog, given the module containing it. You can specify the cog name for any cogs that are placed directly within `!cogs`, or specify the entire module directly. @@ -160,7 +155,6 @@ class Cogs(Cog): If you specify "*" as the cog, every cog currently loaded will be unloaded, and then every cog present in the bot/cogs directory will be loaded. """ - cog = cog.lower() embed = Embed() @@ -232,7 +226,8 @@ class Cogs(Cog): log.debug(f"{ctx.author} requested we reload all cogs. Here are the results: \n" f"{lines}") - return await LinePaginator.paginate(lines, ctx, embed, empty=False) + await LinePaginator.paginate(lines, ctx, embed, empty=False) + return elif full_cog in self.bot.extensions: try: @@ -255,13 +250,12 @@ class Cogs(Cog): @cogs_group.command(name='list', aliases=('all',)) @with_role(*MODERATION_ROLES, Roles.core_developer) - async def list_command(self, ctx: Context): + async def list_command(self, ctx: Context) -> None: """ Get a list of all cogs, including their loaded status. Gray indicates that the cog is unloaded. Green indicates that the cog is currently loaded. """ - embed = Embed() lines = [] cogs = {} @@ -301,6 +295,7 @@ class Cogs(Cog): await LinePaginator.paginate(lines, ctx, embed, max_size=300, empty=False) -def setup(bot): +def setup(bot: Bot) -> None: + """Cogs cog load.""" bot.add_cog(Cogs(bot)) log.info("Cog loaded: Cogs") diff --git a/bot/cogs/defcon.py b/bot/cogs/defcon.py index cf47697ff..048d8a683 100644 --- a/bot/cogs/defcon.py +++ b/bot/cogs/defcon.py @@ -25,7 +25,8 @@ BASE_CHANNEL_TOPIC = "Python Discord Defense Mechanism" class Defcon(Cog): - """Time-sensitive server defense mechanisms""" + """Time-sensitive server defense mechanisms.""" + days = None # type: timedelta enabled = False # type: bool @@ -36,10 +37,12 @@ class Defcon(Cog): @property def mod_log(self) -> ModLog: + """Get currently loaded ModLog cog instance.""" return self.bot.get_cog("ModLog") @Cog.listener() - async def on_ready(self): + async def on_ready(self) -> None: + """On cog load, try to synchronize DEFCON settings to the API.""" self.channel = await self.bot.fetch_channel(Channels.defcon) try: response = await self.bot.api_client.get('bot/bot-settings/defcon') @@ -65,7 +68,8 @@ class Defcon(Cog): await self.update_channel_topic() @Cog.listener() - async def on_member_join(self, member: Member): + async def on_member_join(self, member: Member) -> None: + """If DEFCON is enabled, check newly joining users to see if they meet the account age threshold.""" if self.enabled and self.days.days > 0: now = datetime.utcnow() @@ -98,21 +102,19 @@ class Defcon(Cog): @group(name='defcon', aliases=('dc',), invoke_without_command=True) @with_role(Roles.admin, Roles.owner) - async def defcon_group(self, ctx: Context): + async def defcon_group(self, ctx: Context) -> None: """Check the DEFCON status or run a subcommand.""" - await ctx.invoke(self.bot.get_command("help"), "defcon") @defcon_group.command(name='enable', aliases=('on', 'e')) @with_role(Roles.admin, Roles.owner) - async def enable_command(self, ctx: Context): + async def enable_command(self, ctx: Context) -> None: """ Enable DEFCON mode. Useful in a pinch, but be sure you know what you're doing! - Currently, this just adds an account age requirement. Use !defcon days <int> to set how old an account must - be, in days. + Currently, this just adds an account age requirement. Use !defcon days <int> to set how old an account must be, + in days. """ - self.enabled = True try: @@ -141,11 +143,8 @@ class Defcon(Cog): @defcon_group.command(name='disable', aliases=('off', 'd')) @with_role(Roles.admin, Roles.owner) - async def disable_command(self, ctx: Context): - """ - Disable DEFCON mode. Useful in a pinch, but be sure you know what you're doing! - """ - + async def disable_command(self, ctx: Context) -> None: + """Disable DEFCON mode. Useful in a pinch, but be sure you know what you're doing!""" self.enabled = False try: @@ -171,11 +170,8 @@ class Defcon(Cog): @defcon_group.command(name='status', aliases=('s',)) @with_role(Roles.admin, Roles.owner) - async def status_command(self, ctx: Context): - """ - Check the current status of DEFCON mode. - """ - + async def status_command(self, ctx: Context) -> None: + """Check the current status of DEFCON mode.""" embed = Embed( colour=Colour.blurple(), title="DEFCON Status", description=f"**Enabled:** {self.enabled}\n" @@ -186,11 +182,8 @@ class Defcon(Cog): @defcon_group.command(name='days') @with_role(Roles.admin, Roles.owner) - async def days_command(self, ctx: Context, days: int): - """ - Set how old an account must be to join the server, in days, with DEFCON mode enabled. - """ - + async def days_command(self, ctx: Context, days: int) -> None: + """Set how old an account must be to join the server, in days, with DEFCON mode enabled.""" self.days = timedelta(days=days) try: @@ -218,11 +211,8 @@ class Defcon(Cog): await self.update_channel_topic() - async def update_channel_topic(self): - """ - Update the #defcon channel topic with the current DEFCON status - """ - + async def update_channel_topic(self) -> None: + """Update the #defcon channel topic with the current DEFCON status.""" if self.enabled: day_str = "days" if self.days.days > 1 else "day" new_topic = f"{BASE_CHANNEL_TOPIC}\n(Status: Enabled, Threshold: {self.days.days} {day_str})" @@ -288,6 +278,7 @@ class Defcon(Cog): await self.mod_log.send_log_message(icon, color, status_msg, log_msg) -def setup(bot: Bot): +def setup(bot: Bot) -> None: + """DEFCON cog load.""" bot.add_cog(Defcon(bot)) log.info("Cog loaded: Defcon") diff --git a/bot/cogs/doc.py b/bot/cogs/doc.py index ebf2c1d65..e5c51748f 100644 --- a/bot/cogs/doc.py +++ b/bot/cogs/doc.py @@ -4,10 +4,11 @@ import logging import re import textwrap from collections import OrderedDict -from typing import Optional, Tuple +from typing import Any, Callable, Optional, Tuple import discord from bs4 import BeautifulSoup +from bs4.element import PageElement from discord.ext import commands from markdownify import MarkdownConverter from requests import ConnectionError @@ -27,24 +28,22 @@ UNWANTED_SIGNATURE_SYMBOLS = ('[source]', '¶') WHITESPACE_AFTER_NEWLINES_RE = re.compile(r"(?<=\n\n)(\s+)") -def async_cache(max_size=128, arg_offset=0): +def async_cache(max_size: int = 128, arg_offset: int = 0) -> Callable: """ LRU cache implementation for coroutines. - :param max_size: - Specifies the maximum size the cache should have. - Once it exceeds the maximum size, keys are deleted in FIFO order. - :param arg_offset: - The offset that should be applied to the coroutine's arguments - when creating the cache key. Defaults to `0`. - """ + Once the cache exceeds the maximum size, keys are deleted in FIFO order. + An offset may be optionally provided to be applied to the coroutine's arguments when creating the cache key. + """ # Assign the cache to the function itself so we can clear it from outside. async_cache.cache = OrderedDict() - def decorator(function): + def decorator(function: Callable) -> Callable: + """Define the async_cache decorator.""" @functools.wraps(function) - async def wrapper(*args): + async def wrapper(*args) -> Any: + """Decorator wrapper for the caching logic.""" key = ':'.join(args[arg_offset:]) value = async_cache.cache.get(key) @@ -59,27 +58,25 @@ def async_cache(max_size=128, arg_offset=0): class DocMarkdownConverter(MarkdownConverter): - def convert_code(self, el, text): - """Undo `markdownify`s underscore escaping.""" + """Subclass markdownify's MarkdownCoverter to provide custom conversion methods.""" + def convert_code(self, el: PageElement, text: str) -> str: + """Undo `markdownify`s underscore escaping.""" return f"`{text}`".replace('\\', '') - def convert_pre(self, el, text): + def convert_pre(self, el: PageElement, text: str) -> str: """Wrap any codeblocks in `py` for syntax highlighting.""" - code = ''.join(el.strings) return f"```py\n{code}```" -def markdownify(html): +def markdownify(html: str) -> DocMarkdownConverter: + """Create a DocMarkdownConverter object from the input html.""" return DocMarkdownConverter(bullets='•').convert(html) class DummyObject(object): - """ - A dummy object which supports assigning anything, - which the builtin `object()` does not support normally. - """ + """A dummy object which supports assigning anything, which the builtin `object()` does not support normally.""" class SphinxConfiguration: @@ -94,14 +91,15 @@ class InventoryURL(commands.Converter): """ Represents an Intersphinx inventory URL. - This converter checks whether intersphinx - accepts the given inventory URL, and raises + This converter checks whether intersphinx accepts the given inventory URL, and raises `BadArgument` if that is not the case. + Otherwise, it simply passes through the given URL. """ @staticmethod - async def convert(ctx, url: str): + async def convert(ctx: commands.Context, url: str) -> str: + """Convert url to Intersphinx inventory URL.""" try: intersphinx.fetch_inventory(SphinxConfiguration(), '', url) except AttributeError: @@ -121,31 +119,33 @@ class InventoryURL(commands.Converter): class Doc(commands.Cog): - def __init__(self, bot): + """A set of commands for querying & displaying documentation.""" + + def __init__(self, bot: commands.Bot): self.base_urls = {} self.bot = bot self.inventories = {} @commands.Cog.listener() - async def on_ready(self): + async def on_ready(self) -> None: + """Refresh documentation inventory.""" await self.refresh_inventory() async def update_single( self, package_name: str, base_url: str, inventory_url: str, config: SphinxConfiguration - ): + ) -> None: """ Rebuild the inventory for a single package. - :param package_name: The package name to use, appears in the log. - :param base_url: The root documentation URL for the specified package. - Used to build absolute paths that link to specific symbols. - :param inventory_url: The absolute URL to the intersphinx inventory. - Fetched by running `intersphinx.fetch_inventory` in an - executor on the bot's event loop. - :param config: A `SphinxConfiguration` instance to mock the regular sphinx - project layout. Required for use with intersphinx. + Where: + * `package_name` is the package name to use, appears in the log + * `base_url` is the root documentation URL for the specified package, used to build + absolute paths that link to specific symbols + * `inventory_url` is the absolute URL to the intersphinx inventory, fetched by running + `intersphinx.fetch_inventory` in an executor on the bot's event loop + * `config` is a `SphinxConfiguration` instance to mock the regular sphinx + project layout, required for use with intersphinx """ - self.base_urls[package_name] = base_url fetch_func = functools.partial(intersphinx.fetch_inventory, config, '', inventory_url) @@ -159,7 +159,8 @@ class Doc(commands.Cog): log.trace(f"Fetched inventory for {package_name}.") - async def refresh_inventory(self): + async def refresh_inventory(self) -> None: + """Refresh internal documentation inventory.""" log.debug("Refreshing documentation inventory...") # Clear the old base URLS and inventories to ensure @@ -186,16 +187,13 @@ class Doc(commands.Cog): """ Given a Python symbol, return its signature and description. - :param symbol: The symbol for which HTML data should be returned. - :return: - A tuple in the form (str, str), or `None`. - The first tuple element is the signature of the given - symbol as a markup-free string, and the second tuple - element is the description of the given symbol with HTML - markup included. If the given symbol could not be found, - returns `None`. - """ + Returns a tuple in the form (str, str), or `None`. + The first tuple element is the signature of the given symbol as a markup-free string, and + the second tuple element is the description of the given symbol with HTML markup included. + + If the given symbol could not be found, returns `None`. + """ url = self.inventories.get(symbol) if url is None: return None @@ -223,16 +221,10 @@ class Doc(commands.Cog): @async_cache(arg_offset=1) async def get_symbol_embed(self, symbol: str) -> Optional[discord.Embed]: """ - Using `get_symbol_html`, attempt to scrape and - fetch the data for the given `symbol`, and build - a formatted embed out of its contents. - - :param symbol: The symbol for which the embed should be returned - :return: - If the symbol is known, an Embed with documentation about it. - Otherwise, `None`. - """ + Attempt to scrape and fetch the data for the given `symbol`, and build an embed from its contents. + If the symbol is known, an Embed with documentation about it is returned. + """ scraped_html = await self.get_symbol_html(symbol) if scraped_html is None: return None @@ -267,20 +259,16 @@ class Doc(commands.Cog): ) @commands.group(name='docs', aliases=('doc', 'd'), invoke_without_command=True) - async def docs_group(self, ctx, symbol: commands.clean_content = None): + async def docs_group(self, ctx: commands.Context, symbol: commands.clean_content = None) -> None: """Lookup documentation for Python symbols.""" - await ctx.invoke(self.get_command) @docs_group.command(name='get', aliases=('g',)) - async def get_command(self, ctx, symbol: commands.clean_content = None): + async def get_command(self, ctx: commands.Context, symbol: commands.clean_content = None) -> None: """ Return a documentation embed for a given symbol. - If no symbol is given, return a list of all available inventories. - :param ctx: Discord message context - :param symbol: The symbol for which documentation should be returned, - or nothing to get a list of all inventories + If no symbol is given, return a list of all available inventories. Examples: !docs @@ -288,7 +276,6 @@ class Doc(commands.Cog): !docs aiohttp.ClientSession !docs get aiohttp.ClientSession """ - if symbol is None: inventory_embed = discord.Embed( title=f"All inventories (`{len(self.base_urls)}` total)", @@ -322,18 +309,13 @@ class Doc(commands.Cog): @docs_group.command(name='set', aliases=('s',)) @with_role(*MODERATION_ROLES) async def set_command( - self, ctx, package_name: ValidPythonIdentifier, + self, ctx: commands.Context, package_name: ValidPythonIdentifier, base_url: ValidURL, inventory_url: InventoryURL - ): + ) -> None: """ Adds a new documentation metadata object to the site's database. - The database will update the object, should an existing item - with the specified `package_name` already exist. - :param ctx: Discord message context - :param package_name: The package name, for example `aiohttp`. - :param base_url: The package documentation's root URL, used to build absolute links. - :param inventory_url: The intersphinx inventory URL. + The database will update the object, should an existing item with the specified `package_name` already exist. Example: !docs set \ @@ -341,7 +323,6 @@ class Doc(commands.Cog): https://discordpy.readthedocs.io/en/rewrite/ \ https://discordpy.readthedocs.io/en/rewrite/objects.inv """ - body = { 'package': package_name, 'base_url': base_url, @@ -365,17 +346,13 @@ class Doc(commands.Cog): @docs_group.command(name='delete', aliases=('remove', 'rm', 'd')) @with_role(*MODERATION_ROLES) - async def delete_command(self, ctx, package_name: ValidPythonIdentifier): + async def delete_command(self, ctx: commands.Context, package_name: ValidPythonIdentifier) -> None: """ Removes the specified package from the database. - :param ctx: Discord message context - :param package_name: The package name, for example `aiohttp`. - Examples: !docs delete aiohttp """ - await self.bot.api_client.delete(f'bot/documentation-links/{package_name}') async with ctx.typing(): @@ -385,5 +362,7 @@ class Doc(commands.Cog): await ctx.send(f"Successfully deleted `{package_name}` and refreshed inventory.") -def setup(bot): +def setup(bot: commands.Bot) -> None: + """Doc cog load.""" bot.add_cog(Doc(bot)) + log.info("Cog loaded: Doc") diff --git a/bot/cogs/error_handler.py b/bot/cogs/error_handler.py index e2d8c3a8f..49411814c 100644 --- a/bot/cogs/error_handler.py +++ b/bot/cogs/error_handler.py @@ -30,7 +30,27 @@ class ErrorHandler(Cog): self.bot = bot @Cog.listener() - async def on_command_error(self, ctx: Context, e: CommandError): + async def on_command_error(self, ctx: Context, e: CommandError) -> None: + """ + Provide generic command error handling. + + Error handling is deferred to any local error handler, if present. + + Error handling emits a single error response, prioritized as follows: + 1. If the name fails to match a command but matches a tag, the tag is invoked + 2. Send a BadArgument error message to the invoking context & invoke the command's help + 3. Send a UserInputError error message to the invoking context & invoke the command's help + 4. Send a NoPrivateMessage error message to the invoking context + 5. Send a BotMissingPermissions error message to the invoking context + 6. Log a MissingPermissions error, no message is sent + 7. Send a InChannelCheckFailure error message to the invoking context + 8. Log CheckFailure, CommandOnCooldown, and DisabledCommand errors, no message is sent + 9. For CommandInvokeErrors, response is based on the type of error: + * 404: Error message is sent to the invoking context + * 400: Log the resopnse JSON, no message is sent + * 500 <= status <= 600: Error message is sent to the invoking context + 10. Otherwise, handling is deferred to `handle_unexpected_error` + """ command = ctx.command parent = None @@ -57,7 +77,8 @@ class ErrorHandler(Cog): # Return to not raise the exception with contextlib.suppress(ResponseCodeError): - return await ctx.invoke(tags_get_command, tag_name=ctx.invoked_with) + await ctx.invoke(tags_get_command, tag_name=ctx.invoked_with) + return elif isinstance(e, BadArgument): await ctx.send(f"Bad argument: {e}\n") await ctx.invoke(*help_command) @@ -109,7 +130,8 @@ class ErrorHandler(Cog): await self.handle_unexpected_error(ctx, e) @staticmethod - async def handle_unexpected_error(ctx: Context, e: CommandError): + async def handle_unexpected_error(ctx: Context, e: CommandError) -> None: + """Generic handler for errors without an explicit handler.""" await ctx.send( f"Sorry, an unexpected error occurred. Please let us know!\n\n" f"```{e.__class__.__name__}: {e}```" @@ -120,6 +142,7 @@ class ErrorHandler(Cog): raise e -def setup(bot: Bot): +def setup(bot: Bot) -> None: + """Error handler cog load.""" bot.add_cog(ErrorHandler(bot)) log.info("Cog loaded: Events") diff --git a/bot/cogs/eval.py b/bot/cogs/eval.py index c52c04df1..9ce854f2c 100644 --- a/bot/cogs/eval.py +++ b/bot/cogs/eval.py @@ -6,9 +6,10 @@ import re import textwrap import traceback from io import StringIO +from typing import Any, Optional, Tuple import discord -from discord.ext.commands import Bot, Cog, group +from discord.ext.commands import Bot, Cog, Context, group from bot.constants import Roles from bot.decorators import with_role @@ -18,10 +19,7 @@ log = logging.getLogger(__name__) class CodeEval(Cog): - """ - Owner and admin feature that evaluates code - and returns the result to the channel. - """ + """Owner and admin feature that evaluates code and returns the result to the channel.""" def __init__(self, bot: Bot): self.bot = bot @@ -31,7 +29,8 @@ class CodeEval(Cog): self.interpreter = Interpreter(bot) - def _format(self, inp, out): # (str, Any) -> (str, discord.Embed) + def _format(self, inp: str, out: Any) -> Tuple[str, Optional[discord.Embed]]: + """Format the eval output into a string & attempt to format it into an Embed.""" self._ = out res = "" @@ -124,7 +123,8 @@ class CodeEval(Cog): return res # Return (text, embed) - async def _eval(self, ctx, code): # (discord.Context, str) -> None + async def _eval(self, ctx: Context, code: str) -> Optional[discord.Message]: + """Eval the input code string & send an embed to the invoking context.""" self.ln += 1 if code.startswith("exit"): @@ -174,16 +174,15 @@ async def func(): # (None,) -> Any @group(name='internal', aliases=('int',)) @with_role(Roles.owner, Roles.admin) - async def internal_group(self, ctx): + async def internal_group(self, ctx: Context) -> None: """Internal commands. Top secret!""" - if not ctx.invoked_subcommand: await ctx.invoke(self.bot.get_command("help"), "internal") @internal_group.command(name='eval', aliases=('e',)) @with_role(Roles.admin, Roles.owner) - async def eval(self, ctx, *, code: str): - """ Run eval in a REPL-like format. """ + async def eval(self, ctx: Context, *, code: str) -> None: + """Run eval in a REPL-like format.""" code = code.strip("`") if re.match('py(thon)?\n', code): code = "\n".join(code.split("\n")[1:]) @@ -197,6 +196,7 @@ async def func(): # (None,) -> Any await self._eval(ctx, code) -def setup(bot): +def setup(bot: Bot) -> None: + """Code eval cog load.""" bot.add_cog(CodeEval(bot)) log.info("Cog loaded: Eval") diff --git a/bot/cogs/filtering.py b/bot/cogs/filtering.py index dc4de7ff1..9cd1b7203 100644 --- a/bot/cogs/filtering.py +++ b/bot/cogs/filtering.py @@ -30,10 +30,7 @@ ZALGO_RE = r"[\u0300-\u036F\u0489]" class Filtering(Cog): - """ - Filtering out invites, blacklisting domains, - and warning us of certain regular expressions - """ + """Filtering out invites, blacklisting domains, and warning us of certain regular expressions.""" def __init__(self, bot: Bot): self.bot = bot @@ -94,28 +91,29 @@ class Filtering(Cog): @property def mod_log(self) -> ModLog: + """Get currently loaded ModLog cog instance.""" return self.bot.get_cog("ModLog") @Cog.listener() - async def on_message(self, msg: Message): + async def on_message(self, msg: Message) -> None: + """Invoke message filter for new messages.""" await self._filter_message(msg) @Cog.listener() - async def on_message_edit(self, before: Message, after: Message): + async def on_message_edit(self, before: Message, after: Message) -> None: + """ + Invoke message filter for message edits. + + If there have been multiple edits, calculate the time delta from the previous edit. + """ if not before.edited_at: delta = relativedelta(after.edited_at, before.created_at).microseconds else: delta = relativedelta(after.edited_at, before.edited_at).microseconds await self._filter_message(after, delta) - async def _filter_message(self, msg: Message, delta: Optional[int] = None): - """ - Whenever a message is sent or edited, - run it through our filters to see if it - violates any of our rules, and then respond - accordingly. - """ - + async def _filter_message(self, msg: Message, delta: Optional[int] = None) -> None: + """Filter the input message to see if it violates any of our rules, and then respond accordingly.""" # Should we filter this message? role_whitelisted = False @@ -226,14 +224,10 @@ class Filtering(Cog): @staticmethod async def _has_watchlist_words(text: str) -> bool: """ - Returns True if the text contains - one of the regular expressions from the - word_watchlist in our filter config. + Returns True if the text contains one of the regular expressions from the word_watchlist in our filter config. - Only matches words with boundaries before - and after the expression. + Only matches words with boundaries before and after the expression. """ - for expression in Filter.word_watchlist: if re.search(fr"\b{expression}\b", text, re.IGNORECASE): return True @@ -243,14 +237,10 @@ class Filtering(Cog): @staticmethod async def _has_watchlist_tokens(text: str) -> bool: """ - Returns True if the text contains - one of the regular expressions from the - token_watchlist in our filter config. + Returns True if the text contains one of the regular expressions from the token_watchlist in our filter config. - This will match the expression even if it - does not have boundaries before and after + This will match the expression even if it does not have boundaries before and after. """ - for expression in Filter.token_watchlist: if re.search(fr"{expression}", text, re.IGNORECASE): @@ -262,11 +252,7 @@ class Filtering(Cog): @staticmethod async def _has_urls(text: str) -> bool: - """ - Returns True if the text contains one of - the blacklisted URLs from the config file. - """ - + """Returns True if the text contains one of the blacklisted URLs from the config file.""" if not re.search(URL_RE, text, re.IGNORECASE): return False @@ -285,7 +271,6 @@ class Filtering(Cog): Zalgo range is \u0300 – \u036F and \u0489. """ - return bool(re.search(ZALGO_RE, text)) async def _has_invites(self, text: str) -> Union[dict, bool]: @@ -297,7 +282,6 @@ class Filtering(Cog): Attempts to catch some of common ways to try to cheat the system. """ - # Remove backslashes to prevent escape character aroundfuckery like # discord\.gg/gdudes-pony-farm text = text.replace("\\", "") @@ -338,30 +322,27 @@ class Filtering(Cog): return invite_data if invite_data else False @staticmethod - async def _has_rich_embed(msg: Message): - """ - Returns True if any of the embeds in the message are of type 'rich', but are not twitter - embeds. Returns False otherwise. - """ + async def _has_rich_embed(msg: Message) -> bool: + """Returns True if any of the embeds in the message are of type 'rich', but are not twitter embeds.""" if msg.embeds: for embed in msg.embeds: if embed.type == "rich" and (not embed.url or "twitter.com" not in embed.url): return True return False - async def notify_member(self, filtered_member: Member, reason: str, channel: TextChannel): + async def notify_member(self, filtered_member: Member, reason: str, channel: TextChannel) -> None: """ - Notify filtered_member about a moderation action with the reason str + Notify filtered_member about a moderation action with the reason str. First attempts to DM the user, fall back to in-channel notification if user has DMs disabled """ - try: await filtered_member.send(reason) except discord.errors.Forbidden: await channel.send(f"{filtered_member.mention} {reason}") -def setup(bot: Bot): +def setup(bot: Bot) -> None: + """Filtering cog load.""" bot.add_cog(Filtering(bot)) log.info("Cog loaded: Filtering") diff --git a/bot/cogs/free.py b/bot/cogs/free.py index 92a9ca041..167fab319 100644 --- a/bot/cogs/free.py +++ b/bot/cogs/free.py @@ -2,7 +2,7 @@ import logging from datetime import datetime from discord import Colour, Embed, Member, utils -from discord.ext.commands import Cog, Context, command +from discord.ext.commands import Bot, Cog, Context, command from bot.constants import Categories, Channels, Free, STAFF_ROLES from bot.decorators import redirect_output @@ -22,11 +22,9 @@ class Free(Cog): @command(name="free", aliases=('f',)) @redirect_output(destination_channel=Channels.bot, bypass_roles=STAFF_ROLES) - async def free(self, ctx: Context, user: Member = None, seek: int = 2): + async def free(self, ctx: Context, user: Member = None, seek: int = 2) -> None: """ Lists free help channels by likeliness of availability. - :param user: accepts user mention, ID, etc. - :param seek: How far back to check the last active message. seek is used only when this command is invoked in a help channel. You cannot override seek without mentioning a user first. @@ -101,6 +99,7 @@ class Free(Cog): await ctx.send(embed=embed) -def setup(bot): +def setup(bot: Bot) -> None: + """Free cog load.""" bot.add_cog(Free()) log.info("Cog loaded: Free") diff --git a/bot/cogs/help.py b/bot/cogs/help.py index 31e729003..4971cd0bb 100644 --- a/bot/cogs/help.py +++ b/bot/cogs/help.py @@ -3,10 +3,11 @@ import inspect import itertools from collections import namedtuple from contextlib import suppress +from typing import Union -from discord import Colour, Embed, HTTPException +from discord import Colour, Embed, HTTPException, Message, Reaction, User from discord.ext import commands -from discord.ext.commands import CheckFailure, Cog as DiscordCog +from discord.ext.commands import Bot, CheckFailure, Cog as DiscordCog, Command, Context from fuzzywuzzy import fuzz, process from bot import constants @@ -35,15 +36,11 @@ class HelpQueryNotFound(ValueError): Contains the custom attribute of ``possible_matches``. - Attributes - ---------- - possible_matches: dict - Any commands that were close to matching the Query. - The possible matched command names are the keys. - The likeness match scores are the values. + Instances of this object contain a dictionary of any command(s) that were close to matching the + query, where keys are the possible matched command names and values are the likeness match scores. """ - def __init__(self, arg, possible_matches=None): + def __init__(self, arg: str, possible_matches: dict = None): super().__init__(arg) self.possible_matches = possible_matches @@ -52,48 +49,30 @@ class HelpSession: """ An interactive session for bot and command help output. - Attributes - ---------- - title: str - The title of the help message. - query: Union[:class:`discord.ext.commands.Bot`, - :class:`discord.ext.commands.Command] - description: str - The description of the query. - pages: list[str] - A list of the help content split into manageable pages. - message: :class:`discord.Message` - The message object that's showing the help contents. - destination: :class:`discord.abc.Messageable` - Where the help message is to be sent to. + Expected attributes include: + * title: str + The title of the help message. + * query: Union[discord.ext.commands.Bot, discord.ext.commands.Command] + * description: str + The description of the query. + * pages: list[str] + A list of the help content split into manageable pages. + * message: `discord.Message` + The message object that's showing the help contents. + * destination: `discord.abc.Messageable` + Where the help message is to be sent to. """ - def __init__(self, ctx, *command, cleanup=False, only_can_run=True, show_hidden=False, max_lines=15): - """ - Creates an instance of the HelpSession class. - - Parameters - ---------- - ctx: :class:`discord.Context` - The context of the invoked help command. - *command: str - A variable argument of the command being queried. - cleanup: Optional[bool] - Set to ``True`` to have the message deleted on timeout. - If ``False``, it will clear all reactions on timeout. - Defaults to ``False``. - only_can_run: Optional[bool] - Set to ``True`` to hide commands the user can't run. - Defaults to ``False``. - show_hidden: Optional[bool] - Set to ``True`` to include hidden commands. - Defaults to ``False``. - max_lines: Optional[int] - Sets the max number of lines the paginator will add to a - single page. - Defaults to 20. - """ - + def __init__( + self, + ctx: Context, + *command, + cleanup: bool = False, + only_can_run: bool = True, + show_hidden: bool = False, + max_lines: int = 15 + ): + """Creates an instance of the HelpSession class.""" self._ctx = ctx self._bot = ctx.bot self.title = "Command Help" @@ -122,20 +101,8 @@ class HelpSession: self._timeout_task = None self.reset_timeout() - def _get_query(self, query): - """ - Attempts to match the provided query with a valid command or cog. - - Parameters - ---------- - query: str - The joined string representing the session query. - - Returns - ------- - Union[:class:`discord.ext.commands.Command`, :class:`Cog`] - """ - + def _get_query(self, query: str) -> Union[Command, Cog]: + """Attempts to match the provided query with a valid command or cog.""" command = self._bot.get_command(query) if command: return command @@ -150,48 +117,26 @@ class HelpSession: self._handle_not_found(query) - def _handle_not_found(self, query): + def _handle_not_found(self, query: str) -> None: """ Handles when a query does not match a valid command or cog. - Will pass on possible close matches along with the - ``HelpQueryNotFound`` exception. - - Parameters - ---------- - query: str - The full query that was requested. - - Raises - ------ - HelpQueryNotFound + Will pass on possible close matches along with the `HelpQueryNotFound` exception. """ - - # combine command and cog names + # Combine command and cog names choices = list(self._bot.all_commands) + list(self._bot.cogs) result = process.extractBests(query, choices, scorer=fuzz.ratio, score_cutoff=90) raise HelpQueryNotFound(f'Query "{query}" not found.', dict(result)) - async def timeout(self, seconds=30): - """ - Waits for a set number of seconds, then stops the help session. - - Parameters - ---------- - seconds: int - Number of seconds to wait. - """ - + async def timeout(self, seconds: int = 30) -> None: + """Waits for a set number of seconds, then stops the help session.""" await asyncio.sleep(seconds) await self.stop() - def reset_timeout(self): - """ - Cancels the original timeout task and sets it again from the start. - """ - + def reset_timeout(self) -> None: + """Cancels the original timeout task and sets it again from the start.""" # cancel original if it exists if self._timeout_task: if not self._timeout_task.cancelled(): @@ -200,18 +145,8 @@ class HelpSession: # recreate the timeout task self._timeout_task = self._bot.loop.create_task(self.timeout()) - async def on_reaction_add(self, reaction, user): - """ - Event handler for when reactions are added on the help message. - - Parameters - ---------- - reaction: :class:`discord.Reaction` - The reaction that was added. - user: :class:`discord.User` - The user who added the reaction. - """ - + async def on_reaction_add(self, reaction: Reaction, user: User) -> None: + """Event handler for when reactions are added on the help message.""" # ensure it was the relevant session message if reaction.message.id != self.message.id: return @@ -237,24 +172,13 @@ class HelpSession: with suppress(HTTPException): await self.message.remove_reaction(reaction, user) - async def on_message_delete(self, message): - """ - Closes the help session when the help message is deleted. - - Parameters - ---------- - message: :class:`discord.Message` - The message that was deleted. - """ - + async def on_message_delete(self, message: Message) -> None: + """Closes the help session when the help message is deleted.""" if message.id == self.message.id: await self.stop() - async def prepare(self): - """ - Sets up the help session pages, events, message and reactions. - """ - + async def prepare(self) -> None: + """Sets up the help session pages, events, message and reactions.""" # create paginated content await self.build_pages() @@ -266,12 +190,8 @@ class HelpSession: await self.update_page() self.add_reactions() - def add_reactions(self): - """ - Adds the relevant reactions to the help message based on if - pagination is required. - """ - + def add_reactions(self) -> None: + """Adds the relevant reactions to the help message based on if pagination is required.""" # if paginating if len(self._pages) > 1: for reaction in REACTIONS: @@ -281,44 +201,22 @@ class HelpSession: else: self._bot.loop.create_task(self.message.add_reaction(DELETE_EMOJI)) - def _category_key(self, cmd): + def _category_key(self, cmd: Command) -> str: """ - Returns a cog name of a given command. Used as a key for - ``sorted`` and ``groupby``. - - A zero width space is used as a prefix for results with no cogs - to force them last in ordering. + Returns a cog name of a given command for use as a key for `sorted` and `groupby`. - Parameters - ---------- - cmd: :class:`discord.ext.commands.Command` - The command object being checked. - - Returns - ------- - str + A zero width space is used as a prefix for results with no cogs to force them last in ordering. """ - cog = cmd.cog_name return f'**{cog}**' if cog else f'**\u200bNo Category:**' - def _get_command_params(self, cmd): + def _get_command_params(self, cmd: Command) -> str: """ Returns the command usage signature. - This is a custom implementation of ``command.signature`` in - order to format the command signature without aliases. - - Parameters - ---------- - cmd: :class:`discord.ext.commands.Command` - The command object to get the parameters of. - - Returns - ------- - str + This is a custom implementation of `command.signature` in order to format the command + signature without aliases. """ - results = [] for name, param in cmd.clean_params.items(): @@ -346,16 +244,8 @@ class HelpSession: return f"{cmd.name} {' '.join(results)}" - async def build_pages(self): - """ - Builds the list of content pages to be paginated through in the - help message. - - Returns - ------- - list[str] - """ - + async def build_pages(self) -> None: + """Builds the list of content pages to be paginated through in the help message, as a list of str.""" # Use LinePaginator to restrict embed line height paginator = LinePaginator(prefix='', suffix='', max_lines=self._max_lines) @@ -482,20 +372,8 @@ class HelpSession: # save organised pages to session self._pages = paginator.pages - def embed_page(self, page_number=0): - """ - Returns an Embed with the requested page formatted within. - - Parameters - ---------- - page_number: int - The page to be retrieved. Zero indexed. - - Returns - ------- - :class:`discord.Embed` - """ - + def embed_page(self, page_number: int = 0) -> Embed: + """Returns an Embed with the requested page formatted within.""" embed = Embed() # if command or cog, add query to title for pages other than first @@ -514,17 +392,8 @@ class HelpSession: return embed - async def update_page(self, page_number=0): - """ - Sends the intial message, or changes the existing one to the - given page number. - - Parameters - ---------- - page_number: int - The page number to show in the help message. - """ - + async def update_page(self, page_number: int = 0) -> None: + """Sends the intial message, or changes the existing one to the given page number.""" self._current_page = page_number embed_page = self.embed_page(page_number) @@ -534,47 +403,27 @@ class HelpSession: await self.message.edit(embed=embed_page) @classmethod - async def start(cls, ctx, *command, **options): - """ - Create and begin a help session based on the given command - context. - - Parameters - ---------- - ctx: :class:`discord.ext.commands.Context` - The context of the invoked help command. - *command: str - A variable argument of the command being queried. - cleanup: Optional[bool] - Set to ``True`` to have the message deleted on session end. - Defaults to ``False``. - only_can_run: Optional[bool] - Set to ``True`` to hide commands the user can't run. - Defaults to ``False``. - show_hidden: Optional[bool] - Set to ``True`` to include hidden commands. - Defaults to ``False``. - max_lines: Optional[int] - Sets the max number of lines the paginator will add to a - single page. - Defaults to 20. - - Returns - ------- - :class:`HelpSession` + async def start(cls, ctx: Context, *command, **options) -> "HelpSession": """ + Create and begin a help session based on the given command context. + Available options kwargs: + * cleanup: Optional[bool] + Set to `True` to have the message deleted on session end. Defaults to `False`. + * only_can_run: Optional[bool] + Set to `True` to hide commands the user can't run. Defaults to `False`. + * show_hidden: Optional[bool] + Set to `True` to include hidden commands. Defaults to `False`. + * max_lines: Optional[int] + Sets the max number of lines the paginator will add to a single page. Defaults to 20. + """ session = cls(ctx, *command, **options) await session.prepare() return session - async def stop(self): - """ - Stops the help session, removes event listeners and attempts to - delete the help message. - """ - + async def stop(self) -> None: + """Stops the help session, removes event listeners and attempts to delete the help message.""" self._bot.remove_listener(self.on_reaction_add) self._bot.remove_listener(self.on_message_delete) @@ -586,80 +435,47 @@ class HelpSession: await self.message.clear_reactions() @property - def is_first_page(self): - """ - A bool reflecting if session is currently showing the first page. - - Returns - ------- - bool - """ - + def is_first_page(self) -> bool: + """Check if session is currently showing the first page.""" return self._current_page == 0 @property - def is_last_page(self): - """ - A bool reflecting if the session is currently showing the last page. - - Returns - ------- - bool - """ - + def is_last_page(self) -> bool: + """Check if the session is currently showing the last page.""" return self._current_page == (len(self._pages)-1) - async def do_first(self): - """ - Event that is called when the user requests the first page. - """ - + async def do_first(self) -> None: + """Event that is called when the user requests the first page.""" if not self.is_first_page: await self.update_page(0) - async def do_back(self): - """ - Event that is called when the user requests the previous page. - """ - + async def do_back(self) -> None: + """Event that is called when the user requests the previous page.""" if not self.is_first_page: await self.update_page(self._current_page-1) - async def do_next(self): - """ - Event that is called when the user requests the next page. - """ - + async def do_next(self) -> None: + """Event that is called when the user requests the next page.""" if not self.is_last_page: await self.update_page(self._current_page+1) - async def do_end(self): - """ - Event that is called when the user requests the last page. - """ - + async def do_end(self) -> None: + """Event that is called when the user requests the last page.""" if not self.is_last_page: await self.update_page(len(self._pages)-1) - async def do_stop(self): - """ - Event that is called when the user requests to stop the help session. - """ - + async def do_stop(self) -> None: + """Event that is called when the user requests to stop the help session.""" await self.message.delete() class Help(DiscordCog): - """ - Custom Embed Pagination Help feature - """ + """Custom Embed Pagination Help feature.""" + @commands.command('help') @redirect_output(destination_channel=Channels.bot, bypass_roles=STAFF_ROLES) - async def new_help(self, ctx, *commands): - """ - Shows Command Help. - """ - + async def new_help(self, ctx: Context, *commands) -> None: + """Shows Command Help.""" try: await HelpSession.start(ctx, *commands) except HelpQueryNotFound as error: @@ -674,42 +490,29 @@ class Help(DiscordCog): await ctx.send(embed=embed) -def unload(bot): +def unload(bot: Bot) -> None: """ Reinstates the original help command. - This is run if the cog raises an exception on load, or if the - extension is unloaded. - - Parameters - ---------- - bot: :class:`discord.ext.commands.Bot` - The discord bot client. + This is run if the cog raises an exception on load, or if the extension is unloaded. """ - bot.remove_command('help') bot.add_command(bot._old_help) -def setup(bot): +def setup(bot: Bot) -> None: """ The setup for the help extension. This is called automatically on `bot.load_extension` being run. - Stores the original help command instance on the ``bot._old_help`` - attribute for later reinstatement, before removing it from the - command registry so the new help command can be loaded successfully. - - If an exception is raised during the loading of the cog, ``unload`` - will be called in order to reinstate the original help command. + Stores the original help command instance on the `bot._old_help` attribute for later + reinstatement, before removing it from the command registry so the new help command can be + loaded successfully. - Parameters - ---------- - bot: `discord.ext.commands.Bot` - The discord bot client. + If an exception is raised during the loading of the cog, `unload` will be called in order to + reinstate the original help command. """ - bot._old_help = bot.get_command('help') bot.remove_command('help') @@ -720,18 +523,12 @@ def setup(bot): raise -def teardown(bot): +def teardown(bot: Bot) -> None: """ The teardown for the help extension. This is called automatically on `bot.unload_extension` being run. - Calls ``unload`` in order to reinstate the original help command. - - Parameters - ---------- - bot: `discord.ext.commands.Bot` - The discord bot client. + Calls `unload` in order to reinstate the original help command. """ - unload(bot) diff --git a/bot/cogs/information.py b/bot/cogs/information.py index c4aff73b8..60aec6219 100644 --- a/bot/cogs/information.py +++ b/bot/cogs/information.py @@ -13,23 +13,15 @@ log = logging.getLogger(__name__) class Information(Cog): - """ - A cog with commands for generating embeds with - server information, such as server statistics - and user information. - """ + """A cog with commands for generating embeds with server info, such as server stats and user info.""" def __init__(self, bot: Bot): self.bot = bot @with_role(*MODERATION_ROLES) @command(name="roles") - async def roles_info(self, ctx: Context): - """ - Returns a list of all roles and their - corresponding IDs. - """ - + async def roles_info(self, ctx: Context) -> None: + """Returns a list of all roles and their corresponding IDs.""" # Sort the roles alphabetically and remove the @everyone role roles = sorted(ctx.guild.roles, key=lambda role: role.name) roles = [role for role in roles if role.name != "@everyone"] @@ -51,12 +43,8 @@ class Information(Cog): await ctx.send(embed=embed) @command(name="server", aliases=["server_info", "guild", "guild_info"]) - async def server_info(self, ctx: Context): - """ - Returns an embed full of - server information. - """ - + async def server_info(self, ctx: Context) -> None: + """Returns an embed full of server information.""" created = time_since(ctx.guild.created_at, precision="days") features = ", ".join(ctx.guild.features) region = ctx.guild.region @@ -120,11 +108,8 @@ class Information(Cog): await ctx.send(embed=embed) @command(name="user", aliases=["user_info", "member", "member_info"]) - async def user_info(self, ctx: Context, user: Member = None, hidden: bool = False): - """ - Returns info about a user. - """ - + async def user_info(self, ctx: Context, user: Member = None, hidden: bool = False) -> None: + """Returns info about a user.""" if user is None: user = ctx.author @@ -197,6 +182,7 @@ class Information(Cog): await ctx.send(embed=embed) -def setup(bot): +def setup(bot: Bot) -> None: + """Information cog load.""" bot.add_cog(Information(bot)) log.info("Cog loaded: Information") diff --git a/bot/cogs/jams.py b/bot/cogs/jams.py index dd14111ce..be9d33e3e 100644 --- a/bot/cogs/jams.py +++ b/bot/cogs/jams.py @@ -11,22 +11,16 @@ log = logging.getLogger(__name__) class CodeJams(commands.Cog): - """ - Manages the code-jam related parts of our server - """ + """Manages the code-jam related parts of our server.""" def __init__(self, bot: commands.Bot): self.bot = bot @commands.command() @with_role(Roles.admin) - async def createteam( - self, ctx: commands.Context, - team_name: str, members: commands.Greedy[Member] - ): + async def createteam(self, ctx: commands.Context, team_name: str, members: commands.Greedy[Member]) -> None: """ - Create a team channel (both voice and text) in the Code Jams category, assign roles - and then add overwrites for the team. + Create team channels (voice and text) in the Code Jams category, assign roles, and add overwrites for the team. The first user passed will always be the team leader. """ @@ -114,6 +108,7 @@ class CodeJams(commands.Cog): ) -def setup(bot): +def setup(bot: commands.Bot) -> None: + """Code Jams cog load.""" bot.add_cog(CodeJams(bot)) log.info("Cog loaded: CodeJams") diff --git a/bot/cogs/logging.py b/bot/cogs/logging.py index 64bbed46e..8e47bcc36 100644 --- a/bot/cogs/logging.py +++ b/bot/cogs/logging.py @@ -10,15 +10,14 @@ log = logging.getLogger(__name__) class Logging(Cog): - """ - Debug logging module - """ + """Debug logging module.""" def __init__(self, bot: Bot): self.bot = bot @Cog.listener() - async def on_ready(self): + async def on_ready(self) -> None: + """Announce our presence to the configured devlog channel.""" log.info("Bot connected!") embed = Embed(description="Connected!") @@ -35,6 +34,7 @@ class Logging(Cog): await self.bot.get_channel(Channels.devlog).send(embed=embed) -def setup(bot): +def setup(bot: Bot) -> None: + """Logging cog load.""" bot.add_cog(Logging(bot)) log.info("Cog loaded: Logging") diff --git a/bot/cogs/moderation.py b/bot/cogs/moderation.py index fea86c33e..81b3864a7 100644 --- a/bot/cogs/moderation.py +++ b/bot/cogs/moderation.py @@ -33,6 +33,7 @@ APPEALABLE_INFRACTIONS = ("Ban", "Mute") def proxy_user(user_id: str) -> Object: + """Create a proxy user for the provided user_id for situations where a Member or User object cannot be resolved.""" try: user_id = int(user_id) except ValueError: @@ -47,9 +48,7 @@ UserTypes = Union[Member, User, proxy_user] class Moderation(Scheduler, Cog): - """ - Server moderation tools. - """ + """Server moderation tools.""" def __init__(self, bot: Bot): self.bot = bot @@ -58,10 +57,12 @@ class Moderation(Scheduler, Cog): @property def mod_log(self) -> ModLog: + """Get currently loaded ModLog cog instance.""" return self.bot.get_cog("ModLog") @Cog.listener() - async def on_ready(self): + async def on_ready(self) -> None: + """Schedule expiration for previous infractions.""" # Schedule expiration for previous infractions infractions = await self.bot.api_client.get( 'bot/infractions', params={'active': 'true'} @@ -74,14 +75,8 @@ class Moderation(Scheduler, Cog): @with_role(*MODERATION_ROLES) @command() - async def warn(self, ctx: Context, user: UserTypes, *, reason: str = None): - """ - Create a warning infraction in the database for a user. - - **`user`:** Accepts user mention, ID, etc. - **`reason`:** The reason for the warning. - """ - + async def warn(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None: + """Create a warning infraction in the database for a user.""" infraction = await post_infraction(ctx, user, type="warning", reason=reason) if infraction is None: return @@ -120,14 +115,8 @@ class Moderation(Scheduler, Cog): @with_role(*MODERATION_ROLES) @command() - async def kick(self, ctx: Context, user: Member, *, reason: str = None): - """ - Kicks a user. - - **`user`:** Accepts user mention, ID, etc. - **`reason`:** The reason for the kick. - """ - + async def kick(self, ctx: Context, user: Member, *, reason: str = None) -> None: + """Kicks a user with the provided reason.""" if not await self.respect_role_hierarchy(ctx, user, 'kick'): # Ensure ctx author has a higher top role than the target user # Warning is sent to ctx by the helper method @@ -176,14 +165,8 @@ class Moderation(Scheduler, Cog): @with_role(*MODERATION_ROLES) @command() - async def ban(self, ctx: Context, user: UserTypes, *, reason: str = None): - """ - Create a permanent ban infraction in the database for a user. - - **`user`:** Accepts user mention, ID, etc. - **`reason`:** The reason for the ban. - """ - + async def ban(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None: + """Create a permanent ban infraction for a user with the provided reason.""" if not await self.respect_role_hierarchy(ctx, user, 'ban'): # Ensure ctx author has a higher top role than the target user # Warning is sent to ctx by the helper method @@ -242,14 +225,8 @@ class Moderation(Scheduler, Cog): @with_role(*MODERATION_ROLES) @command() - async def mute(self, ctx: Context, user: Member, *, reason: str = None): - """ - Create a permanent mute infraction in the database for a user. - - **`user`:** Accepts user mention, ID, etc. - **`reason`:** The reason for the mute. - """ - + async def mute(self, ctx: Context, user: Member, *, reason: str = None) -> None: + """Create a permanent mute infraction for a user with the provided reason.""" if await already_has_active_infraction(ctx=ctx, user=user, type="mute"): return @@ -304,11 +281,9 @@ class Moderation(Scheduler, Cog): @command() async def tempmute(self, ctx: Context, user: Member, duration: ExpirationDate, *, reason: str = None) -> None: """ - Create a temporary mute infraction in the database for a user. + Create a temporary mute infraction for a user with the provided expiration and reason. - **`user`:** Accepts user mention, ID, etc. - **`duration`:** The duration for the temporary mute infraction - **`reason`:** The reason for the temporary mute. + Duration strings are parsed per: http://strftime.org/ """ expiration = duration @@ -372,11 +347,9 @@ class Moderation(Scheduler, Cog): @command() async def tempban(self, ctx: Context, user: UserTypes, duration: ExpirationDate, *, reason: str = None) -> None: """ - Create a temporary ban infraction in the database for a user. + Create a temporary ban infraction for a user with the provided expiration and reason. - **`user`:** Accepts user mention, ID, etc. - **`duration`:** The duration for the temporary ban infraction - **`reason`:** The reason for the temporary ban. + Duration strings are parsed per: http://strftime.org/ """ expiration = duration @@ -453,12 +426,10 @@ class Moderation(Scheduler, Cog): @command(hidden=True, aliases=['shadowwarn', 'swarn', 'shadow_warn']) async def note(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None: """ - Create a private infraction note in the database for a user. + Create a private infraction note in the database for a user with the provided reason. - **`user`:** accepts user mention, ID, etc. - **`reason`:** The reason for the warning. + This does not send the user a notification """ - infraction = await post_infraction(ctx, user, type="warning", reason=reason, hidden=True) if infraction is None: return @@ -485,12 +456,10 @@ class Moderation(Scheduler, Cog): @command(hidden=True, aliases=['shadowkick', 'skick']) async def shadow_kick(self, ctx: Context, user: Member, *, reason: str = None) -> None: """ - Kicks a user. + Kick a user for the provided reason. - **`user`:** accepts user mention, ID, etc. - **`reason`:** The reason for the kick. + This does not send the user a notification. """ - if not await self.respect_role_hierarchy(ctx, user, 'shadowkick'): # Ensure ctx author has a higher top role than the target user # Warning is sent to ctx by the helper method @@ -538,12 +507,10 @@ class Moderation(Scheduler, Cog): @command(hidden=True, aliases=['shadowban', 'sban']) async def shadow_ban(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None: """ - Create a permanent ban infraction in the database for a user. + Create a permanent ban infraction for a user with the provided reason. - **`user`:** Accepts user mention, ID, etc. - **`reason`:** The reason for the ban. + This does not send the user a notification. """ - if not await self.respect_role_hierarchy(ctx, user, 'shadowban'): # Ensure ctx author has a higher top role than the target user # Warning is sent to ctx by the helper method @@ -595,12 +562,10 @@ class Moderation(Scheduler, Cog): @command(hidden=True, aliases=['shadowmute', 'smute']) async def shadow_mute(self, ctx: Context, user: Member, *, reason: str = None) -> None: """ - Create a permanent mute infraction in the database for a user. + Create a permanent mute infraction for a user with the provided reason. - **`user`:** Accepts user mention, ID, etc. - **`reason`:** The reason for the mute. + This does not send the user a notification. """ - if await already_has_active_infraction(ctx=ctx, user=user, type="mute"): return @@ -635,19 +600,14 @@ class Moderation(Scheduler, Cog): @with_role(*MODERATION_ROLES) @command(hidden=True, aliases=["shadowtempmute, stempmute"]) async def shadow_tempmute( - self, - ctx: Context, - user: Member, - duration: ExpirationDate, - *, - reason: str = None + self, ctx: Context, user: Member, duration: ExpirationDate, *, reason: str = None ) -> None: """ - Create a temporary mute infraction in the database for a user. + Create a temporary mute infraction for a user with the provided reason. - **`user`:** Accepts user mention, ID, etc. - **`duration`:** The duration for the temporary mute infraction - **`reason`:** The reason for the temporary mute. + Duration strings are parsed per: http://strftime.org/ + + This does not send the user a notification. """ expiration = duration @@ -693,19 +653,14 @@ class Moderation(Scheduler, Cog): @with_role(*MODERATION_ROLES) @command(hidden=True, aliases=["shadowtempban, stempban"]) async def shadow_tempban( - self, - ctx: Context, - user: UserTypes, - duration: ExpirationDate, - *, - reason: str = None + self, ctx: Context, user: UserTypes, duration: ExpirationDate, *, reason: str = None ) -> None: """ - Create a temporary ban infraction in the database for a user. + Create a temporary ban infraction for a user with the provided reason. - **`user`:** Accepts user mention, ID, etc. - **`duration`:** The duration for the temporary ban infraction - **`reason`:** The reason for the temporary ban. + Duration strings are parsed per: http://strftime.org/ + + This does not send the user a notification. """ expiration = duration @@ -774,12 +729,7 @@ class Moderation(Scheduler, Cog): @with_role(*MODERATION_ROLES) @command() async def unmute(self, ctx: Context, user: UserTypes) -> None: - """ - Deactivates the active mute infraction for a user. - - **`user`:** Accepts user mention, ID, etc. - """ - + """Deactivates the active mute infraction for a user.""" try: # check the current active infraction response = await self.bot.api_client.get( @@ -857,12 +807,7 @@ class Moderation(Scheduler, Cog): @with_role(*MODERATION_ROLES) @command() async def unban(self, ctx: Context, user: UserTypes) -> None: - """ - Deactivates the active ban infraction for a user. - - **`user`:** Accepts user mention, ID, etc. - """ - + """Deactivates the active ban infraction for a user.""" try: # check the current active infraction response = await self.bot.api_client.get( @@ -925,16 +870,14 @@ class Moderation(Scheduler, Cog): @with_role(*MODERATION_ROLES) @group(name='infraction', aliases=('infr', 'infractions', 'inf'), invoke_without_command=True) - async def infraction_group(self, ctx: Context): + async def infraction_group(self, ctx: Context) -> None: """Infraction manipulation commands.""" - await ctx.invoke(self.bot.get_command("help"), "infraction") @with_role(*MODERATION_ROLES) @infraction_group.group(name='edit', invoke_without_command=True) - async def infraction_edit_group(self, ctx: Context): + async def infraction_edit_group(self, ctx: Context) -> None: """Infraction editing commands.""" - await ctx.invoke(self.bot.get_command("help"), "infraction", "edit") @with_role(*MODERATION_ROLES) @@ -942,15 +885,12 @@ class Moderation(Scheduler, Cog): async def edit_duration( self, ctx: Context, infraction_id: int, expires_at: Union[ExpirationDate, str] - ): + ) -> None: """ Sets the duration of the given infraction, relative to the time of updating. - **`infraction_id`:** the id of the infraction - **`expires_at`:** the new expiration date of the infraction. - Use "permanent" to mark the infraction as permanent. + Duration strings are parsed per: http://strftime.org/, use "permanent" to mark the infraction as permanent. """ - if isinstance(expires_at, str) and expires_at != 'permanent': raise BadArgument( "If `expires_at` is given as a non-datetime, " @@ -1031,12 +971,7 @@ class Moderation(Scheduler, Cog): @with_role(*MODERATION_ROLES) @infraction_edit_group.command(name="reason") async def edit_reason(self, ctx: Context, infraction_id: int, *, reason: str) -> None: - """ - Sets the reason of the given infraction. - **`infraction_id`:** the id of the infraction - **`reason`:** The new reason of the infraction - """ - + """Edit the reason of the given infraction.""" try: old_infraction = await self.bot.api_client.get( 'bot/infractions/' + str(infraction_id) @@ -1087,11 +1022,8 @@ class Moderation(Scheduler, Cog): @with_role(*MODERATION_ROLES) @infraction_group.group(name="search", invoke_without_command=True) - async def infraction_search_group(self, ctx: Context, query: InfractionSearchQuery): - """ - Searches for infractions in the database. - """ - + async def infraction_search_group(self, ctx: Context, query: InfractionSearchQuery) -> None: + """Searches for infractions in the database.""" if isinstance(query, User): await ctx.invoke(self.search_user, query) @@ -1100,11 +1032,8 @@ class Moderation(Scheduler, Cog): @with_role(*MODERATION_ROLES) @infraction_search_group.command(name="user", aliases=("member", "id")) - async def search_user(self, ctx: Context, user: Union[User, proxy_user]): - """ - Search for infractions by member. - """ - + async def search_user(self, ctx: Context, user: Union[User, proxy_user]) -> None: + """Search for infractions by member.""" infraction_list = await self.bot.api_client.get( 'bot/infractions', params={'user__id': str(user.id)} @@ -1117,11 +1046,8 @@ class Moderation(Scheduler, Cog): @with_role(*MODERATION_ROLES) @infraction_search_group.command(name="reason", aliases=("match", "regex", "re")) - async def search_reason(self, ctx: Context, reason: str): - """ - Search for infractions by their reason. Use Re2 for matching. - """ - + async def search_reason(self, ctx: Context, reason: str) -> None: + """Search for infractions by their reason. Use Re2 for matching.""" infraction_list = await self.bot.api_client.get( 'bot/infractions', params={'search': reason} ) @@ -1134,8 +1060,8 @@ class Moderation(Scheduler, Cog): # endregion # region: Utility functions - async def send_infraction_list(self, ctx: Context, embed: Embed, infractions: list): - + async def send_infraction_list(self, ctx: Context, embed: Embed, infractions: list) -> None: + """Send a paginated embed of infractions for the specified user.""" if not infractions: await ctx.send(f":warning: No infractions could be found for that query.") return @@ -1158,17 +1084,9 @@ class Moderation(Scheduler, Cog): # region: Utility functions def schedule_expiration( - self, - loop: asyncio.AbstractEventLoop, - infraction_object: Dict[str, Union[str, int, bool]] + self, loop: asyncio.AbstractEventLoop, infraction_object: Dict[str, Union[str, int, bool]] ) -> None: - """ - Schedules a task to expire a temporary infraction. - - :param loop: the asyncio event loop - :param infraction_object: the infraction object to expire at the end of the task - """ - + """Schedules a task to expire a temporary infraction.""" infraction_id = infraction_object["id"] if infraction_id in self.scheduled_tasks: return @@ -1177,12 +1095,8 @@ class Moderation(Scheduler, Cog): self.scheduled_tasks[infraction_id] = task - def cancel_expiration(self, infraction_id: str): - """ - Un-schedules a task set to expire a temporary infraction. - :param infraction_id: the ID of the infraction in question - """ - + def cancel_expiration(self, infraction_id: str) -> None: + """Un-schedules a task set to expire a temporary infraction.""" task = self.scheduled_tasks.get(infraction_id) if task is None: log.warning(f"Failed to unschedule {infraction_id}: no task found.") @@ -1193,13 +1107,11 @@ class Moderation(Scheduler, Cog): async def _scheduled_task(self, infraction_object: Dict[str, Union[str, int, bool]]) -> None: """ - A co-routine which marks an infraction as expired after the delay from the time of - scheduling to the time of expiration. At the time of expiration, the infraction is - marked as inactive on the website, and the expiration task is cancelled. + Marks an infraction expired after the delay from time of scheduling to time of expiration. - :param infraction_object: the infraction in question + At the time of expiration, the infraction is marked as inactive on the website, and the + expiration task is cancelled. The user is then notified via DM. """ - infraction_id = infraction_object["id"] # transform expiration to delay in seconds @@ -1224,11 +1136,9 @@ class Moderation(Scheduler, Cog): async def _deactivate_infraction(self, infraction_object: Dict[str, Union[str, int, bool]]) -> None: """ A co-routine which marks an infraction as inactive on the website. - This co-routine does not cancel or un-schedule an expiration task. - :param infraction_object: the infraction in question + This co-routine does not cancel or un-schedule an expiration task. """ - guild: Guild = self.bot.get_guild(constants.Guild.id) user_id = infraction_object["user"] infraction_type = infraction_object["type"] @@ -1254,6 +1164,7 @@ class Moderation(Scheduler, Cog): ) def _infraction_to_string(self, infraction_object: Dict[str, Union[str, int, bool]]) -> str: + """Convert the infraction object to a string representation.""" actor_id = infraction_object["actor"] guild: Guild = self.bot.get_guild(constants.Guild.id) actor = guild.get_member(actor_id) @@ -1283,21 +1194,17 @@ class Moderation(Scheduler, Cog): return lines.strip() async def notify_infraction( - self, - user: Union[User, Member], - infr_type: str, - expires_at: Union[datetime, str] = 'N/A', - reason: str = "No reason provided." + self, + user: Union[User, Member], + infr_type: str, + expires_at: Union[datetime, str] = 'N/A', + reason: str = "No reason provided." ) -> bool: """ - Notify a user of their fresh infraction :) + Attempt to notify a user, via DM, of their fresh infraction. - :param user: The user to send the message to. - :param infr_type: The type of infraction, as a string. - :param duration: The duration of the infraction. - :param reason: The reason for the infraction. + Returns a boolean indicator of whether the DM was successful. """ - if isinstance(expires_at, datetime): expires_at = expires_at.strftime('%c') @@ -1328,14 +1235,10 @@ class Moderation(Scheduler, Cog): icon_url: str = Icons.user_verified ) -> bool: """ - Notify a user that an infraction has been lifted. + Attempt to notify a user, via DM, of their expired infraction. - :param user: The user to send the message to. - :param title: The title of the embed. - :param content: The content of the embed. - :param icon_url: URL for the title icon. + Optionally returns a boolean indicator of whether the DM was successful. """ - embed = Embed( description=content, colour=Colour(Colours.soft_green) @@ -1349,10 +1252,8 @@ class Moderation(Scheduler, Cog): """ A helper method for sending an embed to a user's DMs. - :param user: The user to send the embed to. - :param embed: The embed to send. + Returns a boolean indicator of DM success. """ - # sometimes `user` is a `discord.Object`, so let's make it a proper user. user = await self.bot.fetch_user(user.id) @@ -1366,7 +1267,8 @@ class Moderation(Scheduler, Cog): ) return False - async def log_notify_failure(self, target: str, actor: Member, infraction_type: str): + async def log_notify_failure(self, target: str, actor: Member, infraction_type: str) -> None: + """Send a mod log entry if an attempt to DM the target user has failed.""" await self.mod_log.send_log_message( icon_url=Icons.token_removed, content=actor.mention, @@ -1381,7 +1283,8 @@ class Moderation(Scheduler, Cog): # endregion @staticmethod - async def cog_command_error(ctx: Context, error) -> None: + async def cog_command_error(ctx: Context, error: Exception) -> None: + """Send a notification to the invoking context on a Union failure.""" if isinstance(error, BadUnionArgument): if User in error.converters: await ctx.send(str(error.errors[0])) @@ -1391,15 +1294,11 @@ class Moderation(Scheduler, Cog): async def respect_role_hierarchy(ctx: Context, target: UserTypes, infr_type: str) -> bool: """ Check if the highest role of the invoking member is greater than that of the target member. + If this check fails, a warning is sent to the invoking ctx. Returns True always if target is not a discord.Member instance. - - :param ctx: The command context when invoked. - :param target: The target of the infraction. - :param infr_type: The type of infraction. """ - if not isinstance(target, Member): return True @@ -1419,6 +1318,6 @@ class Moderation(Scheduler, Cog): def setup(bot: Bot) -> None: - """Sets up the Moderation cog.""" + """Moderation cog load.""" bot.add_cog(Moderation(bot)) log.info("Cog loaded: Moderation") diff --git a/bot/cogs/modlog.py b/bot/cogs/modlog.py index 978646f46..68424d268 100644 --- a/bot/cogs/modlog.py +++ b/bot/cogs/modlog.py @@ -11,7 +11,7 @@ from discord import ( RawMessageUpdateEvent, Role, TextChannel, User, VoiceChannel ) from discord.abc import GuildChannel -from discord.ext.commands import Bot, Cog +from discord.ext.commands import Bot, Cog, Context from bot.constants import ( Channels, Colours, Emojis, Event, Guild as GuildConstant, Icons, URLs @@ -29,9 +29,7 @@ ROLE_CHANGES_UNSUPPORTED = ("colour", "permissions") class ModLog(Cog, name="ModLog"): - """ - Logging for server events and staff actions - """ + """Logging for server events and staff actions.""" def __init__(self, bot: Bot): self.bot = bot @@ -40,16 +38,14 @@ class ModLog(Cog, name="ModLog"): self._cached_deletes = [] self._cached_edits = [] - async def upload_log(self, messages: List[Message], actor_id: int) -> Optional[str]: + async def upload_log(self, messages: List[Message], actor_id: int) -> str: """ - Uploads the log data to the database via - an API endpoint for uploading logs. + Uploads the log data to the database via an API endpoint for uploading logs. Used in several mod log embeds. Returns a URL that can be used to view the log. """ - response = await self.bot.api_client.post( 'bot/deleted-messages', json={ @@ -70,7 +66,8 @@ class ModLog(Cog, name="ModLog"): return f"{URLs.site_logs_view}/{response['id']}" - def ignore(self, event: Event, *items: int): + def ignore(self, event: Event, *items: int) -> None: + """Add event to ignored events to suppress log emission.""" for item in items: if item not in self._ignored[event]: self._ignored[event].append(item) @@ -90,7 +87,8 @@ class ModLog(Cog, name="ModLog"): additional_embeds_msg: Optional[str] = None, timestamp_override: Optional[datetime] = None, footer: Optional[str] = None, - ): + ) -> Context: + """Generate log embed and send to logging channel.""" embed = Embed(description=text) if title and icon_url: @@ -123,7 +121,8 @@ class ModLog(Cog, name="ModLog"): return await self.bot.get_context(log_message) # Optionally return for use with antispam @Cog.listener() - async def on_guild_channel_create(self, channel: GUILD_CHANNEL): + async def on_guild_channel_create(self, channel: GUILD_CHANNEL) -> None: + """Log channel create event to mod log.""" if channel.guild.id != GuildConstant.id: return @@ -148,7 +147,8 @@ class ModLog(Cog, name="ModLog"): await self.send_log_message(Icons.hash_green, Colour(Colours.soft_green), title, message) @Cog.listener() - async def on_guild_channel_delete(self, channel: GUILD_CHANNEL): + async def on_guild_channel_delete(self, channel: GUILD_CHANNEL) -> None: + """Log channel delete event to mod log.""" if channel.guild.id != GuildConstant.id: return @@ -170,7 +170,8 @@ class ModLog(Cog, name="ModLog"): ) @Cog.listener() - async def on_guild_channel_update(self, before: GUILD_CHANNEL, after: GuildChannel): + async def on_guild_channel_update(self, before: GUILD_CHANNEL, after: GuildChannel) -> None: + """Log channel update event to mod log.""" if before.guild.id != GuildConstant.id: return @@ -229,7 +230,8 @@ class ModLog(Cog, name="ModLog"): ) @Cog.listener() - async def on_guild_role_create(self, role: Role): + async def on_guild_role_create(self, role: Role) -> None: + """Log role create event to mod log.""" if role.guild.id != GuildConstant.id: return @@ -239,7 +241,8 @@ class ModLog(Cog, name="ModLog"): ) @Cog.listener() - async def on_guild_role_delete(self, role: Role): + async def on_guild_role_delete(self, role: Role) -> None: + """Log role delete event to mod log.""" if role.guild.id != GuildConstant.id: return @@ -249,7 +252,8 @@ class ModLog(Cog, name="ModLog"): ) @Cog.listener() - async def on_guild_role_update(self, before: Role, after: Role): + async def on_guild_role_update(self, before: Role, after: Role) -> None: + """Log role update event to mod log.""" if before.guild.id != GuildConstant.id: return @@ -301,7 +305,8 @@ class ModLog(Cog, name="ModLog"): ) @Cog.listener() - async def on_guild_update(self, before: Guild, after: Guild): + async def on_guild_update(self, before: Guild, after: Guild) -> None: + """Log guild update event to mod log.""" if before.id != GuildConstant.id: return @@ -351,7 +356,8 @@ class ModLog(Cog, name="ModLog"): ) @Cog.listener() - async def on_member_ban(self, guild: Guild, member: Union[Member, User]): + async def on_member_ban(self, guild: Guild, member: Union[Member, User]) -> None: + """Log ban event to mod log.""" if guild.id != GuildConstant.id: return @@ -367,7 +373,8 @@ class ModLog(Cog, name="ModLog"): ) @Cog.listener() - async def on_member_join(self, member: Member): + async def on_member_join(self, member: Member) -> None: + """Log member join event to user log.""" if member.guild.id != GuildConstant.id: return @@ -388,7 +395,8 @@ class ModLog(Cog, name="ModLog"): ) @Cog.listener() - async def on_member_remove(self, member: Member): + async def on_member_remove(self, member: Member) -> None: + """Log member leave event to user log.""" if member.guild.id != GuildConstant.id: return @@ -404,7 +412,8 @@ class ModLog(Cog, name="ModLog"): ) @Cog.listener() - async def on_member_unban(self, guild: Guild, member: User): + async def on_member_unban(self, guild: Guild, member: User) -> None: + """Log member unban event to mod log.""" if guild.id != GuildConstant.id: return @@ -420,7 +429,8 @@ class ModLog(Cog, name="ModLog"): ) @Cog.listener() - async def on_member_update(self, before: Member, after: Member): + async def on_member_update(self, before: Member, after: Member) -> None: + """Log member update event to user log.""" if before.guild.id != GuildConstant.id: return @@ -510,7 +520,8 @@ class ModLog(Cog, name="ModLog"): ) @Cog.listener() - async def on_message_delete(self, message: Message): + async def on_message_delete(self, message: Message) -> None: + """Log message delete event to message change log.""" channel = message.channel author = message.author @@ -565,7 +576,8 @@ class ModLog(Cog, name="ModLog"): ) @Cog.listener() - async def on_raw_message_delete(self, event: RawMessageDeleteEvent): + async def on_raw_message_delete(self, event: RawMessageDeleteEvent) -> None: + """Log raw message delete event to message change log.""" if event.guild_id != GuildConstant.id or event.channel_id in GuildConstant.ignored: return @@ -605,7 +617,8 @@ class ModLog(Cog, name="ModLog"): ) @Cog.listener() - async def on_message_edit(self, before: Message, after: Message): + async def on_message_edit(self, before: Message, after: Message) -> None: + """Log message edit event to message change log.""" if ( not before.guild or before.guild.id != GuildConstant.id @@ -679,7 +692,8 @@ class ModLog(Cog, name="ModLog"): ) @Cog.listener() - async def on_raw_message_edit(self, event: RawMessageUpdateEvent): + async def on_raw_message_edit(self, event: RawMessageUpdateEvent) -> None: + """Log raw message edit event to message change log.""" try: channel = self.bot.get_channel(int(event.data["channel_id"])) message = await channel.fetch_message(event.message_id) @@ -748,6 +762,7 @@ class ModLog(Cog, name="ModLog"): ) -def setup(bot): +def setup(bot: Bot) -> None: + """Mod log cog load.""" bot.add_cog(ModLog(bot)) log.info("Cog loaded: ModLog") diff --git a/bot/cogs/off_topic_names.py b/bot/cogs/off_topic_names.py index 1f6ed80b5..8f1af347a 100644 --- a/bot/cogs/off_topic_names.py +++ b/bot/cogs/off_topic_names.py @@ -19,7 +19,8 @@ class OffTopicName(Converter): """A converter that ensures an added off-topic name is valid.""" @staticmethod - async def convert(ctx: Context, argument: str): + async def convert(ctx: Context, argument: str) -> str: + """Attempt to replace any invalid characters with their approximate Unicode equivalent.""" allowed_characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ!?'`-" if not (2 <= len(argument) <= 96): @@ -38,16 +39,8 @@ class OffTopicName(Converter): return argument.translate(table) -async def update_names(bot: Bot): - """ - The background updater task that performs a channel name update daily. - - Args: - bot (Bot): - The running bot instance, used for fetching data from the - website via the bot's `api_client`. - """ - +async def update_names(bot: Bot) -> None: + """Background updater task that performs the daily channel name update.""" while True: # Since we truncate the compute timedelta to seconds, we add one second to ensure # we go past midnight in the `seconds_to_sleep` set below. @@ -77,26 +70,27 @@ class OffTopicNames(Cog): self.bot = bot self.updater_task = None - def cog_unload(self): + def cog_unload(self) -> None: + """Cancel any running updater tasks on cog unload.""" if self.updater_task is not None: self.updater_task.cancel() @Cog.listener() - async def on_ready(self): + async def on_ready(self) -> None: + """Start off-topic channel updating event loop if it hasn't already started.""" if self.updater_task is None: coro = update_names(self.bot) self.updater_task = self.bot.loop.create_task(coro) @group(name='otname', aliases=('otnames', 'otn'), invoke_without_command=True) @with_role(*MODERATION_ROLES) - async def otname_group(self, ctx): + async def otname_group(self, ctx: Context) -> None: """Add or list items from the off-topic channel name rotation.""" - await ctx.invoke(self.bot.get_command("help"), "otname") @otname_group.command(name='add', aliases=('a',)) @with_role(*MODERATION_ROLES) - async def add_command(self, ctx, *names: OffTopicName): + async def add_command(self, ctx: Context, *names: OffTopicName) -> None: """Adds a new off-topic name to the rotation.""" # Chain multiple words to a single one name = "-".join(names) @@ -110,7 +104,7 @@ class OffTopicNames(Cog): @otname_group.command(name='delete', aliases=('remove', 'rm', 'del', 'd')) @with_role(*MODERATION_ROLES) - async def delete_command(self, ctx, *names: OffTopicName): + async def delete_command(self, ctx: Context, *names: OffTopicName) -> None: """Removes a off-topic name from the rotation.""" # Chain multiple words to a single one name = "-".join(names) @@ -124,12 +118,12 @@ class OffTopicNames(Cog): @otname_group.command(name='list', aliases=('l',)) @with_role(*MODERATION_ROLES) - async def list_command(self, ctx): + async def list_command(self, ctx: Context) -> None: """ Lists all currently known off-topic channel names in a paginator. + Restricted to Moderator and above to not spoil the surprise. """ - result = await self.bot.api_client.get('bot/off-topic-channel-names') lines = sorted(f"• {name}" for name in result) embed = Embed( @@ -144,11 +138,8 @@ class OffTopicNames(Cog): @otname_group.command(name='search', aliases=('s',)) @with_role(*MODERATION_ROLES) - async def search_command(self, ctx, *, query: OffTopicName): - """ - Search for an off-topic name. - """ - + async def search_command(self, ctx: Context, *, query: OffTopicName) -> None: + """Search for an off-topic name.""" result = await self.bot.api_client.get('bot/off-topic-channel-names') in_matches = {name for name in result if query in name} close_matches = difflib.get_close_matches(query, result, n=10, cutoff=0.70) @@ -165,6 +156,7 @@ class OffTopicNames(Cog): await ctx.send(embed=embed) -def setup(bot: Bot): +def setup(bot: Bot) -> None: + """Off topic names cog load.""" bot.add_cog(OffTopicNames(bot)) log.info("Cog loaded: OffTopicNames") diff --git a/bot/cogs/reddit.py b/bot/cogs/reddit.py index 4c561b7e8..63a57c5c6 100644 --- a/bot/cogs/reddit.py +++ b/bot/cogs/reddit.py @@ -3,8 +3,9 @@ import logging import random import textwrap from datetime import datetime, timedelta +from typing import List -from discord import Colour, Embed, TextChannel +from discord import Colour, Embed, Message, TextChannel from discord.ext.commands import Bot, Cog, Context, group from bot.constants import Channels, ERROR_REPLIES, Reddit as RedditConfig, STAFF_ROLES @@ -16,9 +17,7 @@ log = logging.getLogger(__name__) class Reddit(Cog): - """ - Track subreddit posts and show detailed statistics about them. - """ + """Track subreddit posts and show detailed statistics about them.""" HEADERS = {"User-Agent": "Discord Bot: PythonDiscord (https://pythondiscord.com/)"} URL = "https://www.reddit.com" @@ -34,11 +33,8 @@ class Reddit(Cog): self.new_posts_task = None self.top_weekly_posts_task = None - async def fetch_posts(self, route: str, *, amount: int = 25, params=None): - """ - A helper method to fetch a certain amount of Reddit posts at a given route. - """ - + async def fetch_posts(self, route: str, *, amount: int = 25, params: dict = None) -> List[dict]: + """A helper method to fetch a certain amount of Reddit posts at a given route.""" # Reddit's JSON responses only provide 25 posts at most. if not 25 >= amount > 0: raise ValueError("Invalid amount of subreddit posts requested.") @@ -57,11 +53,10 @@ class Reddit(Cog): return posts[:amount] - async def send_top_posts(self, channel: TextChannel, subreddit: Subreddit, content=None, time="all"): - """ - Create an embed for the top posts, then send it in a given TextChannel. - """ - + async def send_top_posts( + self, channel: TextChannel, subreddit: Subreddit, content: str = None, time: str = "all" + ) -> Message: + """Create an embed for the top posts, then send it in a given TextChannel.""" # Create the new spicy embed. embed = Embed() embed.description = "" @@ -115,11 +110,8 @@ class Reddit(Cog): embed=embed ) - async def poll_new_posts(self): - """ - Periodically search for new subreddit posts. - """ - + async def poll_new_posts(self) -> None: + """Periodically search for new subreddit posts.""" while True: await asyncio.sleep(RedditConfig.request_delay) @@ -179,11 +171,8 @@ class Reddit(Cog): log.trace(f"Sent {len(new_posts)} new {subreddit} posts to channel {self.reddit_channel.id}.") - async def poll_top_weekly_posts(self): - """ - Post a summary of the top posts every week. - """ - + async def poll_top_weekly_posts(self) -> None: + """Post a summary of the top posts every week.""" while True: now = datetime.utcnow() @@ -214,19 +203,13 @@ class Reddit(Cog): await message.pin() @group(name="reddit", invoke_without_command=True) - async def reddit_group(self, ctx: Context): - """ - View the top posts from various subreddits. - """ - + async def reddit_group(self, ctx: Context) -> None: + """View the top posts from various subreddits.""" await ctx.invoke(self.bot.get_command("help"), "reddit") @reddit_group.command(name="top") - async def top_command(self, ctx: Context, subreddit: Subreddit = "r/Python"): - """ - Send the top posts of all time from a given subreddit. - """ - + async def top_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None: + """Send the top posts of all time from a given subreddit.""" await self.send_top_posts( channel=ctx.channel, subreddit=subreddit, @@ -235,11 +218,8 @@ class Reddit(Cog): ) @reddit_group.command(name="daily") - async def daily_command(self, ctx: Context, subreddit: Subreddit = "r/Python"): - """ - Send the top posts of today from a given subreddit. - """ - + async def daily_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None: + """Send the top posts of today from a given subreddit.""" await self.send_top_posts( channel=ctx.channel, subreddit=subreddit, @@ -248,11 +228,8 @@ class Reddit(Cog): ) @reddit_group.command(name="weekly") - async def weekly_command(self, ctx: Context, subreddit: Subreddit = "r/Python"): - """ - Send the top posts of this week from a given subreddit. - """ - + async def weekly_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None: + """Send the top posts of this week from a given subreddit.""" await self.send_top_posts( channel=ctx.channel, subreddit=subreddit, @@ -262,11 +239,8 @@ class Reddit(Cog): @with_role(*STAFF_ROLES) @reddit_group.command(name="subreddits", aliases=("subs",)) - async def subreddits_command(self, ctx: Context): - """ - Send a paginated embed of all the subreddits we're relaying. - """ - + async def subreddits_command(self, ctx: Context) -> None: + """Send a paginated embed of all the subreddits we're relaying.""" embed = Embed() embed.title = "Relayed subreddits." embed.colour = Colour.blurple() @@ -280,7 +254,8 @@ class Reddit(Cog): ) @Cog.listener() - async def on_ready(self): + async def on_ready(self) -> None: + """Initiate reddit post event loop.""" self.reddit_channel = await self.bot.fetch_channel(Channels.reddit) if self.reddit_channel is not None: @@ -292,6 +267,7 @@ class Reddit(Cog): log.warning("Couldn't locate a channel for subreddit relaying.") -def setup(bot): +def setup(bot: Bot) -> None: + """Reddit cog load.""" bot.add_cog(Reddit(bot)) log.info("Cog loaded: Reddit") diff --git a/bot/cogs/reminders.py b/bot/cogs/reminders.py index c6ae984ea..8460de91f 100644 --- a/bot/cogs/reminders.py +++ b/bot/cogs/reminders.py @@ -4,9 +4,10 @@ import random import textwrap from datetime import datetime from operator import itemgetter +from typing import Optional from dateutil.relativedelta import relativedelta -from discord import Colour, Embed +from discord import Colour, Embed, Message from discord.ext.commands import Bot, Cog, Context, group from bot.constants import Channels, Icons, NEGATIVE_REPLIES, POSITIVE_REPLIES, STAFF_ROLES @@ -23,14 +24,15 @@ MAXIMUM_REMINDERS = 5 class Reminders(Scheduler, Cog): + """Provide in-channel reminder functionality.""" def __init__(self, bot: Bot): self.bot = bot super().__init__() @Cog.listener() - async def on_ready(self): - # Get all the current reminders for re-scheduling + async def on_ready(self) -> None: + """Get all current reminders from the API and reschedule them.""" response = await self.bot.api_client.get( 'bot/reminders', params={'active': 'true'} @@ -51,25 +53,16 @@ class Reminders(Scheduler, Cog): self.schedule_task(loop, reminder["id"], reminder) @staticmethod - async def _send_confirmation(ctx: Context, on_success: str): - """ - Send an embed confirming the change was made successfully. - """ - + async def _send_confirmation(ctx: Context, on_success: str) -> None: + """Send an embed confirming the reminder change was made successfully.""" embed = Embed() embed.colour = Colour.green() embed.title = random.choice(POSITIVE_REPLIES) embed.description = on_success await ctx.send(embed=embed) - async def _scheduled_task(self, reminder: dict): - """ - A coroutine which sends the reminder once the time is reached. - - :param reminder: the data of the reminder. - :return: - """ - + async def _scheduled_task(self, reminder: dict) -> None: + """A coroutine which sends the reminder once the time is reached, and cancels the running task.""" reminder_id = reminder["id"] reminder_datetime = datetime.fromisoformat(reminder['expiration'][:-1]) @@ -83,38 +76,22 @@ class Reminders(Scheduler, Cog): # Now we can begone with it from our schedule list. self.cancel_task(reminder_id) - async def _delete_reminder(self, reminder_id: str): - """ - Delete a reminder from the database, given its ID. - - :param reminder_id: The ID of the reminder. - """ - + async def _delete_reminder(self, reminder_id: str) -> None: + """Delete a reminder from the database, given its ID, and cancel the running task.""" await self.bot.api_client.delete('bot/reminders/' + str(reminder_id)) # Now we can remove it from the schedule list self.cancel_task(reminder_id) - async def _reschedule_reminder(self, reminder): - """ - Reschedule a reminder object. - - :param reminder: The reminder to be rescheduled. - """ - + async def _reschedule_reminder(self, reminder: dict) -> None: + """Reschedule a reminder object.""" loop = asyncio.get_event_loop() self.cancel_task(reminder["id"]) self.schedule_task(loop, reminder["id"], reminder) - async def send_reminder(self, reminder, late: relativedelta = None): - """ - Send the reminder. - - :param reminder: The data about the reminder. - :param late: How late the reminder is (if at all) - """ - + async def send_reminder(self, reminder: dict, late: relativedelta = None) -> None: + """Send the reminder.""" channel = self.bot.get_channel(reminder["channel_id"]) user = self.bot.get_user(reminder["author"]) @@ -141,19 +118,17 @@ class Reminders(Scheduler, Cog): await self._delete_reminder(reminder["id"]) @group(name="remind", aliases=("reminder", "reminders"), invoke_without_command=True) - async def remind_group(self, ctx: Context, expiration: ExpirationDate, *, content: str): - """ - Commands for managing your reminders. - """ - + async def remind_group(self, ctx: Context, expiration: ExpirationDate, *, content: str) -> None: + """Commands for managing your reminders.""" await ctx.invoke(self.new_reminder, expiration=expiration, content=content) @remind_group.command(name="new", aliases=("add", "create")) - async def new_reminder(self, ctx: Context, expiration: ExpirationDate, *, content: str): + async def new_reminder(self, ctx: Context, expiration: ExpirationDate, *, content: str) -> Optional[Message]: """ Set yourself a simple reminder. - """ + Expiration is parsed per: http://strftime.org/ + """ embed = Embed() # If the user is not staff, we need to verify whether or not to make a reminder at all. @@ -204,11 +179,8 @@ class Reminders(Scheduler, Cog): self.schedule_task(loop, reminder["id"], reminder) @remind_group.command(name="list") - async def list_reminders(self, ctx: Context): - """ - View a paginated embed of all reminders for your user. - """ - + async def list_reminders(self, ctx: Context) -> Optional[Message]: + """View a paginated embed of all reminders for your user.""" # Get all the user's reminders from the database. data = await self.bot.api_client.get( 'bot/reminders', @@ -260,19 +232,17 @@ class Reminders(Scheduler, Cog): ) @remind_group.group(name="edit", aliases=("change", "modify"), invoke_without_command=True) - async def edit_reminder_group(self, ctx: Context): - """ - Commands for modifying your current reminders. - """ - + async def edit_reminder_group(self, ctx: Context) -> None: + """Commands for modifying your current reminders.""" await ctx.invoke(self.bot.get_command("help"), "reminders", "edit") @edit_reminder_group.command(name="duration", aliases=("time",)) - async def edit_reminder_duration(self, ctx: Context, id_: int, expiration: ExpirationDate): - """ - Edit one of your reminders' expiration. + async def edit_reminder_duration(self, ctx: Context, id_: int, expiration: ExpirationDate) -> None: """ + Edit one of your reminder's expiration. + Expiration is parsed per: http://strftime.org/ + """ # Send the request to update the reminder in the database reminder = await self.bot.api_client.patch( 'bot/reminders/' + str(id_), @@ -287,11 +257,8 @@ class Reminders(Scheduler, Cog): await self._reschedule_reminder(reminder) @edit_reminder_group.command(name="content", aliases=("reason",)) - async def edit_reminder_content(self, ctx: Context, id_: int, *, content: str): - """ - Edit one of your reminders' content. - """ - + async def edit_reminder_content(self, ctx: Context, id_: int, *, content: str) -> None: + """Edit one of your reminder's content.""" # Send the request to update the reminder in the database reminder = await self.bot.api_client.patch( 'bot/reminders/' + str(id_), @@ -305,17 +272,15 @@ class Reminders(Scheduler, Cog): await self._reschedule_reminder(reminder) @remind_group.command("delete", aliases=("remove",)) - async def delete_reminder(self, ctx: Context, id_: int): - """ - Delete one of your active reminders. - """ - + async def delete_reminder(self, ctx: Context, id_: int) -> None: + """Delete one of your active reminders.""" await self._delete_reminder(id_) await self._send_confirmation( ctx, on_success="That reminder has been deleted successfully!" ) -def setup(bot: Bot): +def setup(bot: Bot) -> None: + """Reminders cog load.""" bot.add_cog(Reminders(bot)) log.info("Cog loaded: Reminders") diff --git a/bot/cogs/security.py b/bot/cogs/security.py index e02e91530..316b33d6b 100644 --- a/bot/cogs/security.py +++ b/bot/cogs/security.py @@ -6,24 +6,25 @@ log = logging.getLogger(__name__) class Security(Cog): - """ - Security-related helpers - """ + """Security-related helpers.""" def __init__(self, bot: Bot): self.bot = bot self.bot.check(self.check_not_bot) # Global commands check - no bots can run any commands at all self.bot.check(self.check_on_guild) # Global commands check - commands can't be run in a DM - def check_not_bot(self, ctx: Context): + def check_not_bot(self, ctx: Context) -> bool: + """Check if the context is a bot user.""" return not ctx.author.bot - def check_on_guild(self, ctx: Context): + def check_on_guild(self, ctx: Context) -> bool: + """Check if the context is in a guild.""" if ctx.guild is None: raise NoPrivateMessage("This command cannot be used in private messages.") return True -def setup(bot): +def setup(bot: Bot) -> None: + """Security cog load.""" bot.add_cog(Security(bot)) log.info("Cog loaded: Security") diff --git a/bot/cogs/site.py b/bot/cogs/site.py index 4d5b2e811..4a423faa9 100644 --- a/bot/cogs/site.py +++ b/bot/cogs/site.py @@ -19,15 +19,13 @@ class Site(Cog): self.bot = bot @group(name="site", aliases=("s",), invoke_without_command=True) - async def site_group(self, ctx): + async def site_group(self, ctx: Context) -> None: """Commands for getting info about our website.""" - await ctx.invoke(self.bot.get_command("help"), "site") @site_group.command(name="home", aliases=("about",)) - async def site_main(self, ctx: Context): + async def site_main(self, ctx: Context) -> None: """Info about the website itself.""" - url = f"{URLs.site_schema}{URLs.site}/" embed = Embed(title="Python Discord website") @@ -43,9 +41,8 @@ class Site(Cog): await ctx.send(embed=embed) @site_group.command(name="resources") - async def site_resources(self, ctx: Context): + async def site_resources(self, ctx: Context) -> None: """Info about the site's Resources page.""" - learning_url = f"{PAGES_URL}/resources" tools_url = f"{PAGES_URL}/tools" @@ -63,9 +60,8 @@ class Site(Cog): await ctx.send(embed=embed) @site_group.command(name="help") - async def site_help(self, ctx: Context): + async def site_help(self, ctx: Context) -> None: """Info about the site's Getting Help page.""" - url = f"{PAGES_URL}/asking-good-questions" embed = Embed(title="Asking Good Questions") @@ -80,9 +76,8 @@ class Site(Cog): await ctx.send(embed=embed) @site_group.command(name="faq") - async def site_faq(self, ctx: Context): + async def site_faq(self, ctx: Context) -> None: """Info about the site's FAQ page.""" - url = f"{PAGES_URL}/frequently-asked-questions" embed = Embed(title="FAQ") @@ -99,14 +94,8 @@ class Site(Cog): @site_group.command(aliases=['r', 'rule'], name='rules') @redirect_output(destination_channel=Channels.bot, bypass_roles=STAFF_ROLES) - async def site_rules(self, ctx: Context, *rules: int): - """ - Provides a link to the `rules` endpoint of the website, or displays - specific rules, if they are requested. - - **`ctx`:** The Discord message context - **`rules`:** The rules a user wants to get. - """ + async def site_rules(self, ctx: Context, *rules: int) -> None: + """Provides a link to all rules or, if specified, displays specific rule(s).""" rules_embed = Embed(title='Rules', color=Colour.blurple()) rules_embed.url = f"{PAGES_URL}/rules" @@ -138,6 +127,7 @@ class Site(Cog): await LinePaginator.paginate(final_rules, ctx, rules_embed, max_lines=3) -def setup(bot): +def setup(bot: Bot) -> None: + """Site cog load.""" bot.add_cog(Site(bot)) log.info("Cog loaded: Site") diff --git a/bot/cogs/snekbox.py b/bot/cogs/snekbox.py index d36c0795d..5accbdb5e 100644 --- a/bot/cogs/snekbox.py +++ b/bot/cogs/snekbox.py @@ -37,9 +37,7 @@ MAX_PASTE_LEN = 1000 class Snekbox(Cog): - """ - Safe evaluation of Python code using Snekbox - """ + """Safe evaluation of Python code using Snekbox.""" def __init__(self, bot: Bot): self.bot = bot @@ -169,7 +167,7 @@ class Snekbox(Cog): @command(name="eval", aliases=("e",)) @guild_only() @in_channel(Channels.bot, bypass_roles=STAFF_ROLES) - async def eval_command(self, ctx: Context, *, code: str = None): + async def eval_command(self, ctx: Context, *, code: str = None) -> None: """ Run Python code and get the results. @@ -178,13 +176,15 @@ class Snekbox(Cog): issue with it! """ if ctx.author.id in self.jobs: - return await ctx.send( + await ctx.send( f"{ctx.author.mention} You've already got a job running - " f"please wait for it to finish!" ) + return if not code: # None or empty string - return await ctx.invoke(self.bot.get_command("help"), "eval") + await ctx.invoke(self.bot.get_command("help"), "eval") + return log.info( f"Received code from {ctx.author.name}#{ctx.author.discriminator} " @@ -221,6 +221,7 @@ class Snekbox(Cog): del self.jobs[ctx.author.id] -def setup(bot): +def setup(bot: Bot) -> None: + """Snekbox cog load.""" bot.add_cog(Snekbox(bot)) log.info("Cog loaded: Snekbox") diff --git a/bot/cogs/superstarify/__init__.py b/bot/cogs/superstarify/__init__.py index e9743a2f5..f7d6a269d 100644 --- a/bot/cogs/superstarify/__init__.py +++ b/bot/cogs/superstarify/__init__.py @@ -19,30 +19,30 @@ NICKNAME_POLICY_URL = "https://pythondiscord.com/pages/rules/#wiki-toc-nickname- class Superstarify(Cog): - """ - A set of commands to moderate terrible nicknames. - """ + """A set of commands to moderate terrible nicknames.""" def __init__(self, bot: Bot): self.bot = bot @property def moderation(self) -> Moderation: + """Get currently loaded Moderation cog instance.""" return self.bot.get_cog("Moderation") @property def modlog(self) -> ModLog: + """Get currently loaded ModLog cog instance.""" return self.bot.get_cog("ModLog") @Cog.listener() - async def on_member_update(self, before: Member, after: Member): + async def on_member_update(self, before: Member, after: Member) -> None: """ This event will trigger when someone changes their name. - At this point we will look up the user in our database and check - whether they are allowed to change their names, or if they are in - superstar-prison. If they are not allowed, we will change it back. - """ + At this point we will look up the user in our database and check whether they are allowed to + change their names, or if they are in superstar-prison. If they are not allowed, we will + change it back. + """ if before.display_name == after.display_name: return # User didn't change their nickname. Abort! @@ -93,14 +93,13 @@ class Superstarify(Cog): ) @Cog.listener() - async def on_member_join(self, member: Member): + async def on_member_join(self, member: Member) -> None: """ This event will trigger when someone (re)joins the server. - At this point we will look up the user in our database and check - whether they are in superstar-prison. If so, we will change their name - back to the forced nickname. - """ + At this point we will look up the user in our database and check whether they are in + superstar-prison. If so, we will change their name back to the forced nickname. + """ active_superstarifies = await self.bot.api_client.get( 'bot/infractions', params={ @@ -155,13 +154,14 @@ class Superstarify(Cog): @with_role(*MODERATION_ROLES) async def superstarify( self, ctx: Context, member: Member, expiration: ExpirationDate, reason: str = None - ): + ) -> None: """ - This command will force a random superstar name (like Taylor Swift) to be the user's - nickname for a specified duration. An optional reason can be provided. + Force a random superstar name (like Taylor Swift) to be the user's nickname for a specified duration. + + An optional reason can be provided. + If no reason is given, the original name will be shown in a generated reason. """ - active_superstarifies = await self.bot.api_client.get( 'bot/infractions', params={ @@ -171,10 +171,11 @@ class Superstarify(Cog): } ) if active_superstarifies: - return await ctx.send( + await ctx.send( ":x: According to my records, this user is already superstarified. " f"See infraction **#{active_superstarifies[0]['id']}**." ) + return infraction = await post_infraction( ctx, member, @@ -224,15 +225,8 @@ class Superstarify(Cog): @command(name='unsuperstarify', aliases=('release_nick', 'unstar')) @with_role(*MODERATION_ROLES) - async def unsuperstarify(self, ctx: Context, member: Member): - """ - This command will remove the entry from our database, allowing the user - to once again change their nickname. - - :param ctx: Discord message context - :param member: The member to unsuperstarify - """ - + async def unsuperstarify(self, ctx: Context, member: Member) -> None: + """Remove the superstarify entry from our database, allowing the user to change their nickname.""" log.debug(f"Attempting to unsuperstarify the following user: {member.display_name}") embed = Embed() @@ -247,9 +241,8 @@ class Superstarify(Cog): } ) if not active_superstarifies: - return await ctx.send( - ":x: There is no active superstarify infraction for this user." - ) + await ctx.send(":x: There is no active superstarify infraction for this user.") + return [infraction] = active_superstarifies await self.bot.api_client.patch( @@ -270,6 +263,7 @@ class Superstarify(Cog): await ctx.send(embed=embed) -def setup(bot): +def setup(bot: Bot) -> None: + """Superstarify cog load.""" bot.add_cog(Superstarify(bot)) log.info("Cog loaded: Superstarify") diff --git a/bot/cogs/superstarify/stars.py b/bot/cogs/superstarify/stars.py index 9b49d7175..dbac86770 100644 --- a/bot/cogs/superstarify/stars.py +++ b/bot/cogs/superstarify/stars.py @@ -81,6 +81,7 @@ STAR_NAMES = ( ) -def get_nick(infraction_id, member_id): +def get_nick(infraction_id: int, member_id: int) -> str: + """Randomly select a nickname from the Superstarify nickname list.""" rng = random.Random(str(infraction_id) + str(member_id)) return rng.choice(STAR_NAMES) diff --git a/bot/cogs/sync/__init__.py b/bot/cogs/sync/__init__.py index e4f960620..d4565f848 100644 --- a/bot/cogs/sync/__init__.py +++ b/bot/cogs/sync/__init__.py @@ -1,10 +1,13 @@ import logging +from discord.ext.commands import Bot + from .cog import Sync log = logging.getLogger(__name__) -def setup(bot): +def setup(bot: Bot) -> None: + """Sync cog load.""" bot.add_cog(Sync(bot)) log.info("Cog loaded: Sync") diff --git a/bot/cogs/sync/cog.py b/bot/cogs/sync/cog.py index 9a3a48bba..b75fb26cd 100644 --- a/bot/cogs/sync/cog.py +++ b/bot/cogs/sync/cog.py @@ -177,7 +177,6 @@ class Sync(Cog): @commands.has_permissions(administrator=True) async def sync_roles_command(self, ctx: Context) -> None: """Manually synchronize the guild's roles with the roles on the site.""" - initial_response = await ctx.send("📊 Synchronizing roles.") total_created, total_updated, total_deleted = await syncers.sync_roles(self.bot, ctx.guild) await initial_response.edit( @@ -191,7 +190,6 @@ class Sync(Cog): @commands.has_permissions(administrator=True) async def sync_users_command(self, ctx: Context) -> None: """Manually synchronize the guild's users with the users on the site.""" - initial_response = await ctx.send("📊 Synchronizing users.") total_created, total_updated, total_deleted = await syncers.sync_users(self.bot, ctx.guild) await initial_response.edit( diff --git a/bot/cogs/sync/syncers.py b/bot/cogs/sync/syncers.py index 414c24adb..2cc5a66e1 100644 --- a/bot/cogs/sync/syncers.py +++ b/bot/cogs/sync/syncers.py @@ -34,7 +34,6 @@ def get_roles_for_sync( to be deleted on the site, meaning the roles are present on the API but not in the cached guild. """ - guild_role_ids = {role.id for role in guild_roles} api_role_ids = {role.id for role in api_roles} new_role_ids = guild_role_ids - api_role_ids @@ -66,7 +65,6 @@ async def sync_roles(bot: Bot, guild: Guild) -> Tuple[int, int, int]: (element `0`) , how many roles were updated (element `1`), and how many roles were deleted (element `2`) on the API. """ - roles = await bot.api_client.get('bot/roles') # Pack API roles and guild roles into one common format, @@ -138,7 +136,6 @@ def get_users_for_sync( guild, but where the attribute of a user on the API is not equal to the attribute of the user on the guild. """ - users_to_create = set() users_to_update = set() @@ -169,8 +166,7 @@ def get_users_for_sync( async def sync_users(bot: Bot, guild: Guild) -> Tuple[int, int, None]: """ - Synchronize users found on the given - `guild` with the ones on the API. + Synchronize users found in the given `guild` with the ones in the API. Arguments: bot (discord.ext.commands.Bot): @@ -186,7 +182,6 @@ async def sync_users(bot: Bot, guild: Guild) -> Tuple[int, int, None]: (element `0`) and how many users were updated (element `1`), and `None` to indicate that a user sync never deletes entries from the API. """ - current_users = await bot.api_client.get('bot/users') # Pack API users and guild users into one common format, diff --git a/bot/cogs/tags.py b/bot/cogs/tags.py index 8e9ba5da3..660620284 100644 --- a/bot/cogs/tags.py +++ b/bot/cogs/tags.py @@ -20,41 +20,26 @@ TEST_CHANNELS = ( class Tags(Cog): - """ - Save new tags and fetch existing tags. - """ + """Save new tags and fetch existing tags.""" def __init__(self, bot: Bot): self.bot = bot self.tag_cooldowns = {} @group(name='tags', aliases=('tag', 't'), hidden=True, invoke_without_command=True) - async def tags_group(self, ctx: Context, *, tag_name: TagNameConverter = None): + async def tags_group(self, ctx: Context, *, tag_name: TagNameConverter = None) -> None: """Show all known tags, a single tag, or run a subcommand.""" - await ctx.invoke(self.get_command, tag_name=tag_name) @tags_group.command(name='get', aliases=('show', 'g')) - async def get_command(self, ctx: Context, *, tag_name: TagNameConverter = None): - """ - Get a list of all tags or a specified tag. - - :param ctx: Discord message context - :param tag_name: - If provided, this function shows data for that specific tag. - If not provided, this function shows the caller a list of all tags. - """ - - def _command_on_cooldown(tag_name) -> bool: + async def get_command(self, ctx: Context, *, tag_name: TagNameConverter = None) -> None: + """Get a specified tag, or a list of all tags if no tag is specified.""" + def _command_on_cooldown(tag_name: str) -> bool: """ - Check if the command is currently on cooldown. - The cooldown duration is set in constants.py. + Check if the command is currently on cooldown, on a per-tag, per-channel basis. - This works on a per-tag, per-channel basis. - :param tag_name: The name of the command to check. - :return: True if the command is cooling down. Otherwise False. + The cooldown duration is set in constants.py. """ - now = time.time() cooldown_conditions = ( @@ -109,15 +94,8 @@ class Tags(Cog): tag_name: TagNameConverter, *, tag_content: TagContentConverter, - ): - """ - Create a new tag or update an existing one. - - :param ctx: discord message context - :param tag_name: The name of the tag to create or edit. - :param tag_content: The content of the tag. - """ - + ) -> None: + """Create a new tag or update an existing one.""" body = { 'title': tag_name.lower().strip(), 'embed': { @@ -140,14 +118,8 @@ class Tags(Cog): @tags_group.command(name='delete', aliases=('remove', 'rm', 'd')) @with_role(Roles.admin, Roles.owner) - async def delete_command(self, ctx: Context, *, tag_name: TagNameConverter): - """ - Remove a tag from the database. - - :param ctx: discord message context - :param tag_name: The name of the tag to delete. - """ - + async def delete_command(self, ctx: Context, *, tag_name: TagNameConverter) -> None: + """Remove a tag from the database.""" await self.bot.api_client.delete(f'bot/tags/{tag_name}') log.debug(f"{ctx.author} successfully deleted the tag called '{tag_name}'") @@ -158,6 +130,7 @@ class Tags(Cog): )) -def setup(bot): +def setup(bot: Bot) -> None: + """Tags cog load.""" bot.add_cog(Tags(bot)) log.info("Cog loaded: Tags") diff --git a/bot/cogs/token_remover.py b/bot/cogs/token_remover.py index 64bf126d6..7dd0afbbd 100644 --- a/bot/cogs/token_remover.py +++ b/bot/cogs/token_remover.py @@ -42,10 +42,16 @@ class TokenRemover(Cog): @property def mod_log(self) -> ModLog: + """Get currently loaded ModLog cog instance.""" return self.bot.get_cog("ModLog") @Cog.listener() - async def on_message(self, msg: Message): + async def on_message(self, msg: Message) -> None: + """ + Check each message for a string that matches Discord's token pattern. + + See: https://discordapp.com/developers/docs/reference#snowflakes + """ if msg.author.bot: return @@ -82,6 +88,11 @@ class TokenRemover(Cog): @staticmethod def is_valid_user_id(b64_content: str) -> bool: + """ + Check potential token to see if it contains a valid Discord user ID. + + See: https://discordapp.com/developers/docs/reference#snowflakes + """ b64_content += '=' * (-len(b64_content) % 4) try: @@ -92,6 +103,11 @@ class TokenRemover(Cog): @staticmethod def is_valid_timestamp(b64_content: str) -> bool: + """ + Check potential token to see if it contains a valid timestamp. + + See: https://discordapp.com/developers/docs/reference#snowflakes + """ b64_content += '=' * (-len(b64_content) % 4) try: @@ -102,6 +118,7 @@ class TokenRemover(Cog): return snowflake_time(snowflake + TOKEN_EPOCH) < DISCORD_EPOCH_TIMESTAMP -def setup(bot: Bot): +def setup(bot: Bot) -> None: + """Token Remover cog load.""" bot.add_cog(TokenRemover(bot)) log.info("Cog loaded: TokenRemover") diff --git a/bot/cogs/utils.py b/bot/cogs/utils.py index 08e77a24e..62e2fb03f 100644 --- a/bot/cogs/utils.py +++ b/bot/cogs/utils.py @@ -5,7 +5,7 @@ from email.parser import HeaderParser from io import StringIO from discord import Colour, Embed -from discord.ext.commands import AutoShardedBot, Cog, Context, command +from discord.ext.commands import Bot, Cog, Context, command from bot.constants import Channels, STAFF_ROLES from bot.decorators import in_channel @@ -14,26 +14,22 @@ log = logging.getLogger(__name__) class Utils(Cog): - """ - A selection of utilities which don't have a clear category. - """ + """A selection of utilities which don't have a clear category.""" - def __init__(self, bot: AutoShardedBot): + def __init__(self, bot: Bot): self.bot = bot self.base_pep_url = "http://www.python.org/dev/peps/pep-" self.base_github_pep_url = "https://raw.githubusercontent.com/python/peps/master/pep-" @command(name='pep', aliases=('get_pep', 'p')) - async def pep_command(self, ctx: Context, pep_number: str): - """ - Fetches information about a PEP and sends it to the channel. - """ - + async def pep_command(self, ctx: Context, pep_number: str) -> None: + """Fetches information about a PEP and sends it to the channel.""" if pep_number.isdigit(): pep_number = int(pep_number) else: - return await ctx.invoke(self.bot.get_command("help"), "pep") + await ctx.invoke(self.bot.get_command("help"), "pep") + return # Newer PEPs are written in RST instead of txt if pep_number > 542: @@ -89,11 +85,8 @@ class Utils(Cog): @command() @in_channel(Channels.bot, bypass_roles=STAFF_ROLES) - async def charinfo(self, ctx, *, characters: str): - """ - Shows you information on up to 25 unicode characters. - """ - + async def charinfo(self, ctx: Context, *, characters: str) -> None: + """Shows you information on up to 25 unicode characters.""" match = re.match(r"<(a?):(\w+):(\d+)>", characters) if match: embed = Embed( @@ -104,12 +97,14 @@ class Utils(Cog): ) ) embed.colour = Colour.red() - return await ctx.send(embed=embed) + await ctx.send(embed=embed) + return if len(characters) > 25: embed = Embed(title=f"Too many characters ({len(characters)}/25)") embed.colour = Colour.red() - return await ctx.send(embed=embed) + await ctx.send(embed=embed) + return def get_info(char): digit = f"{ord(char):x}" @@ -133,6 +128,7 @@ class Utils(Cog): await ctx.send(embed=embed) -def setup(bot): +def setup(bot: Bot) -> None: + """Utils cog load.""" bot.add_cog(Utils(bot)) log.info("Cog loaded: Utils") diff --git a/bot/cogs/verification.py b/bot/cogs/verification.py index c42d4d67e..b0c250603 100644 --- a/bot/cogs/verification.py +++ b/bot/cogs/verification.py @@ -29,19 +29,19 @@ If you'd like to unsubscribe from the announcement notifications, simply send `! class Verification(Cog): - """ - User verification and role self-management - """ + """User verification and role self-management.""" def __init__(self, bot: Bot): self.bot = bot @property def mod_log(self) -> ModLog: + """Get currently loaded ModLog cog instance.""" return self.bot.get_cog("ModLog") @Cog.listener() - async def on_message(self, message: Message): + async def on_message(self, message: Message) -> None: + """Check new message event for messages to the checkpoint channel & process.""" if message.author.bot: return # They're a bot, ignore @@ -75,11 +75,8 @@ class Verification(Cog): @command(name='accept', aliases=('verify', 'verified', 'accepted'), hidden=True) @without_role(Roles.verified) @in_channel(Channels.verification) - async def accept_command(self, ctx: Context, *_): # We don't actually care about the args - """ - Accept our rules and gain access to the rest of the server - """ - + async def accept_command(self, ctx: Context, *_) -> None: # We don't actually care about the args + """Accept our rules and gain access to the rest of the server.""" log.debug(f"{ctx.author} called !accept. Assigning the 'Developer' role.") await ctx.author.add_roles(Object(Roles.verified), reason="Accepted the rules") try: @@ -98,11 +95,8 @@ class Verification(Cog): @command(name='subscribe') @in_channel(Channels.bot) - async def subscribe_command(self, ctx: Context, *_): # We don't actually care about the args - """ - Subscribe to announcement notifications by assigning yourself the role - """ - + async def subscribe_command(self, ctx: Context, *_) -> None: # We don't actually care about the args + """Subscribe to announcement notifications by assigning yourself the role.""" has_role = False for role in ctx.author.roles: @@ -111,9 +105,8 @@ class Verification(Cog): break if has_role: - return await ctx.send( - f"{ctx.author.mention} You're already subscribed!", - ) + await ctx.send(f"{ctx.author.mention} You're already subscribed!") + return log.debug(f"{ctx.author} called !subscribe. Assigning the 'Announcements' role.") await ctx.author.add_roles(Object(Roles.announcements), reason="Subscribed to announcements") @@ -126,11 +119,8 @@ class Verification(Cog): @command(name='unsubscribe') @in_channel(Channels.bot) - async def unsubscribe_command(self, ctx: Context, *_): # We don't actually care about the args - """ - Unsubscribe from announcement notifications by removing the role from yourself - """ - + async def unsubscribe_command(self, ctx: Context, *_) -> None: # We don't actually care about the args + """Unsubscribe from announcement notifications by removing the role from yourself.""" has_role = False for role in ctx.author.roles: @@ -139,9 +129,8 @@ class Verification(Cog): break if not has_role: - return await ctx.send( - f"{ctx.author.mention} You're already unsubscribed!" - ) + await ctx.send(f"{ctx.author.mention} You're already unsubscribed!") + return log.debug(f"{ctx.author} called !unsubscribe. Removing the 'Announcements' role.") await ctx.author.remove_roles(Object(Roles.announcements), reason="Unsubscribed from announcements") @@ -153,23 +142,21 @@ class Verification(Cog): ) @staticmethod - async def cog_command_error(ctx: Context, error): + async def cog_command_error(ctx: Context, error: Exception) -> None: + """Check for & ignore any InChannelCheckFailure.""" if isinstance(error, InChannelCheckFailure): - # Do nothing; just ignore this error error.handled = True @staticmethod - def bot_check(ctx: Context): - """ - Block any command within the verification channel that is not !accept. - """ - + def bot_check(ctx: Context) -> bool: + """Block any command within the verification channel that is not !accept.""" if ctx.channel.id == Channels.verification: return ctx.command.name == "accept" else: return True -def setup(bot): +def setup(bot: Bot) -> None: + """Verification cog load.""" bot.add_cog(Verification(bot)) log.info("Cog loaded: Verification") diff --git a/bot/cogs/watchchannels/__init__.py b/bot/cogs/watchchannels/__init__.py index ac7713803..86e1050fa 100644 --- a/bot/cogs/watchchannels/__init__.py +++ b/bot/cogs/watchchannels/__init__.py @@ -1,5 +1,7 @@ import logging +from discord.ext.commands import Bot + from .bigbrother import BigBrother from .talentpool import TalentPool @@ -7,7 +9,8 @@ from .talentpool import TalentPool log = logging.getLogger(__name__) -def setup(bot): +def setup(bot: Bot) -> None: + """Monitoring cogs load.""" bot.add_cog(BigBrother(bot)) log.info("Cog loaded: BigBrother") diff --git a/bot/cogs/watchchannels/bigbrother.py b/bot/cogs/watchchannels/bigbrother.py index 338b6c4ad..e191c2dbc 100644 --- a/bot/cogs/watchchannels/bigbrother.py +++ b/bot/cogs/watchchannels/bigbrother.py @@ -3,7 +3,7 @@ from collections import ChainMap from typing import Union from discord import User -from discord.ext.commands import Cog, Context, group +from discord.ext.commands import Bot, Cog, Context, group from bot.constants import Channels, Roles, Webhooks from bot.decorators import with_role @@ -16,7 +16,7 @@ log = logging.getLogger(__name__) class BigBrother(WatchChannel, Cog, name="Big Brother"): """Monitors users by relaying their messages to a watch channel to assist with moderation.""" - def __init__(self, bot) -> None: + def __init__(self, bot: Bot) -> None: super().__init__( bot, destination=Channels.big_brother_logs, diff --git a/bot/cogs/watchchannels/talentpool.py b/bot/cogs/watchchannels/talentpool.py index 4452d7a59..ffe7693a9 100644 --- a/bot/cogs/watchchannels/talentpool.py +++ b/bot/cogs/watchchannels/talentpool.py @@ -4,7 +4,7 @@ from collections import ChainMap from typing import Union from discord import Color, Embed, Member, User -from discord.ext.commands import Cog, Context, group +from discord.ext.commands import Bot, Cog, Context, group from bot.api import ResponseCodeError from bot.constants import Channels, Guild, Roles, Webhooks @@ -19,7 +19,7 @@ STAFF_ROLES = Roles.owner, Roles.admin, Roles.moderator, Roles.helpers # <- I class TalentPool(WatchChannel, Cog, name="Talentpool"): """Relays messages of helper candidates to a watch channel to observe them.""" - def __init__(self, bot) -> None: + def __init__(self, bot: Bot) -> None: super().__init__( bot, destination=Channels.talent_pool, @@ -33,7 +33,6 @@ class TalentPool(WatchChannel, Cog, name="Talentpool"): @with_role(Roles.owner, Roles.admin, Roles.moderator) async def nomination_group(self, ctx: Context) -> None: """Highlights the activity of helper nominees by relaying their messages to the talent pool channel.""" - await ctx.invoke(self.bot.get_command("help"), "talentpool") @nomination_group.command(name='watched', aliases=('all', 'list')) @@ -156,7 +155,6 @@ class TalentPool(WatchChannel, Cog, name="Talentpool"): @with_role(Roles.owner, Roles.admin, Roles.moderator) async def nomination_edit_group(self, ctx: Context) -> None: """Commands to edit nominations.""" - await ctx.invoke(self.bot.get_command("help"), "talentpool", "edit") @nomination_edit_group.command(name='reason') diff --git a/bot/cogs/watchchannels/watchchannel.py b/bot/cogs/watchchannels/watchchannel.py index c34b0d5bb..e78282900 100644 --- a/bot/cogs/watchchannels/watchchannel.py +++ b/bot/cogs/watchchannels/watchchannel.py @@ -42,6 +42,8 @@ def proxy_user(user_id: str) -> Object: @dataclass class MessageHistory: + """Represents a watch channel's message history.""" + last_author: Optional[int] = None last_channel: Optional[int] = None message_count: int = 0 @@ -51,7 +53,15 @@ class WatchChannel(metaclass=CogABCMeta): """ABC with functionality for relaying users' messages to a certain channel.""" @abstractmethod - def __init__(self, bot: Bot, destination, webhook_id, api_endpoint, api_default_params, logger) -> None: + def __init__( + self, + bot: Bot, + destination: int, + webhook_id: int, + api_endpoint: str, + api_default_params: dict, + logger: logging.Logger + ) -> None: self.bot = bot self.destination = destination # E.g., Channels.big_brother_logs @@ -265,7 +275,7 @@ class WatchChannel(metaclass=CogABCMeta): self.message_history.message_count += 1 - async def send_header(self, msg) -> None: + async def send_header(self, msg: Message) -> None: """Sends a header embed with information about the relayed messages to the watch channel.""" user_id = msg.author.id diff --git a/bot/cogs/wolfram.py b/bot/cogs/wolfram.py index e88efa033..ab0ed2472 100644 --- a/bot/cogs/wolfram.py +++ b/bot/cogs/wolfram.py @@ -1,13 +1,13 @@ import logging from io import BytesIO -from typing import List, Optional, Tuple +from typing import Callable, List, Optional, Tuple from urllib import parse import discord from dateutil.relativedelta import relativedelta from discord import Embed from discord.ext import commands -from discord.ext.commands import BucketType, Cog, Context, check, group +from discord.ext.commands import Bot, BucketType, Cog, Context, check, group from bot.constants import Colours, STAFF_ROLES, Wolfram from bot.pagination import ImagePaginator @@ -37,18 +37,7 @@ async def send_embed( img_url: str = None, f: discord.File = None ) -> None: - """ - Generates an embed with wolfram as the author, with message_txt as description, - adds custom colour if specified, a footer and image (could be a file with f param) and sends - the embed through ctx - :param ctx: Context - :param message_txt: str - Message to be sent - :param colour: int - Default: Colours.soft_red - Colour of embed - :param footer: str - Default: None - Adds a footer to the embed - :param img_url:str - Default: None - Adds an image to the embed - :param f: discord.File - Default: None - Add a file to the msg, often attached as image to embed - """ - + """Generate & send a response embed with Wolfram as the author.""" embed = Embed(colour=colour) embed.description = message_txt embed.set_author(name="Wolfram Alpha", @@ -63,16 +52,12 @@ async def send_embed( await ctx.send(embed=embed, file=f) -def custom_cooldown(*ignore: List[int]) -> check: +def custom_cooldown(*ignore: List[int]) -> Callable: """ - Custom cooldown mapping that applies a specific requests per day to users. - Staff is ignored by the user cooldown, however the cooldown implements a - total amount of uses per day for the entire guild. (Configurable in configs) + Implement per-user and per-guild cooldowns for requests to the Wolfram API. - :param ignore: List[int] -- list of ids of roles to be ignored by user cooldown - :return: check + A list of roles may be provided to ignore the per-user cooldown """ - async def predicate(ctx: Context) -> bool: user_bucket = usercd.get_bucket(ctx.message) @@ -109,8 +94,8 @@ def custom_cooldown(*ignore: List[int]) -> check: return check(predicate) -async def get_pod_pages(ctx, bot, query: str) -> Optional[List[Tuple]]: - # Give feedback that the bot is working. +async def get_pod_pages(ctx: Context, bot: Bot, query: str) -> Optional[List[Tuple]]: + """Get the Wolfram API pod pages for the provided query.""" async with ctx.channel.typing(): url_str = parse.urlencode({ "input": query, @@ -164,9 +149,7 @@ async def get_pod_pages(ctx, bot, query: str) -> Optional[List[Tuple]]: class Wolfram(Cog): - """ - Commands for interacting with the Wolfram|Alpha API. - """ + """Commands for interacting with the Wolfram|Alpha API.""" def __init__(self, bot: commands.Bot): self.bot = bot @@ -174,14 +157,7 @@ class Wolfram(Cog): @group(name="wolfram", aliases=("wolf", "wa"), invoke_without_command=True) @custom_cooldown(*STAFF_ROLES) async def wolfram_command(self, ctx: Context, *, query: str) -> None: - """ - Requests all answers on a single image, - sends an image of all related pods - - :param ctx: Context - :param query: str - string request to api - """ - + """Requests all answers on a single image, sends an image of all related pods.""" url_str = parse.urlencode({ "i": query, "appid": APPID, @@ -221,13 +197,10 @@ class Wolfram(Cog): @custom_cooldown(*STAFF_ROLES) async def wolfram_page_command(self, ctx: Context, *, query: str) -> None: """ - Requests a drawn image of given query - Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc + Requests a drawn image of given query. - :param ctx: Context - :param query: str - string request to api + Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc. """ - pages = await get_pod_pages(ctx, self.bot, query) if not pages: @@ -243,15 +216,12 @@ class Wolfram(Cog): @wolfram_command.command(name="cut", aliases=("c",)) @custom_cooldown(*STAFF_ROLES) - async def wolfram_cut_command(self, ctx, *, query: str) -> None: + async def wolfram_cut_command(self, ctx: Context, *, query: str) -> None: """ - Requests a drawn image of given query - Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc + Requests a drawn image of given query. - :param ctx: Context - :param query: str - string request to api + Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc. """ - pages = await get_pod_pages(ctx, self.bot, query) if not pages: @@ -267,14 +237,7 @@ class Wolfram(Cog): @wolfram_command.command(name="short", aliases=("sh", "s")) @custom_cooldown(*STAFF_ROLES) async def wolfram_short_command(self, ctx: Context, *, query: str) -> None: - """ - Requests an answer to a simple question - Responds in plaintext - - :param ctx: Context - :param query: str - string request to api - """ - + """Requests an answer to a simple question.""" url_str = parse.urlencode({ "i": query, "appid": APPID, @@ -304,5 +267,6 @@ class Wolfram(Cog): def setup(bot: commands.Bot) -> None: + """Wolfram cog load.""" bot.add_cog(Wolfram(bot)) log.info("Cog loaded: Wolfram") diff --git a/bot/converters.py b/bot/converters.py index 4bd9aba13..7386187ab 100644 --- a/bot/converters.py +++ b/bot/converters.py @@ -1,6 +1,7 @@ import logging from datetime import datetime from ssl import CertificateError +from typing import Union import dateparser import discord @@ -15,17 +16,16 @@ class ValidPythonIdentifier(Converter): """ A converter that checks whether the given string is a valid Python identifier. - This is used to have package names - that correspond to how you would use - the package in your code, e.g. - `import package`. Raises `BadArgument` - if the argument is not a valid Python - identifier, and simply passes through + This is used to have package names that correspond to how you would use the package in your + code, e.g. `import package`. + + Raises `BadArgument` if the argument is not a valid Python identifier, and simply passes through the given argument otherwise. """ @staticmethod - async def convert(ctx, argument: str): + async def convert(ctx: Context, argument: str) -> str: + """Checks whether the given string is a valid Python identifier.""" if not argument.isidentifier(): raise BadArgument(f"`{argument}` is not a valid Python identifier") return argument @@ -35,14 +35,15 @@ class ValidURL(Converter): """ Represents a valid webpage URL. - This converter checks whether the given - URL can be reached and requesting it returns - a status code of 200. If not, `BadArgument` - is raised. Otherwise, it simply passes through the given URL. + This converter checks whether the given URL can be reached and requesting it returns a status + code of 200. If not, `BadArgument` is raised. + + Otherwise, it simply passes through the given URL. """ @staticmethod - async def convert(ctx, url: str): + async def convert(ctx: Context, url: str) -> str: + """This converter checks whether the given URL can be reached with a status code of 200.""" try: async with ctx.bot.http_session.get(url) as resp: if resp.status != 200: @@ -63,12 +64,11 @@ class ValidURL(Converter): class InfractionSearchQuery(Converter): - """ - A converter that checks if the argument is a Discord user, and if not, falls back to a string. - """ + """A converter that checks if the argument is a Discord user, and if not, falls back to a string.""" @staticmethod - async def convert(ctx, arg): + async def convert(ctx: Context, arg: str) -> Union[discord.Member, str]: + """Check if the argument is a Discord user, and if not, falls back to a string.""" try: maybe_snowflake = arg.strip("<@!>") return await ctx.bot.fetch_user(maybe_snowflake) @@ -77,12 +77,15 @@ class InfractionSearchQuery(Converter): class Subreddit(Converter): - """ - Forces a string to begin with "r/" and checks if it's a valid subreddit. - """ + """Forces a string to begin with "r/" and checks if it's a valid subreddit.""" @staticmethod - async def convert(ctx, sub: str): + async def convert(ctx: Context, sub: str) -> str: + """ + Force sub to begin with "r/" and check if it's a valid subreddit. + + If sub is a valid subreddit, return it prepended with "r/" + """ sub = sub.lower() if not sub.startswith("r/"): @@ -103,9 +106,21 @@ class Subreddit(Converter): class TagNameConverter(Converter): + """ + Ensure that a proposed tag name is valid. + + Valid tag names meet the following conditions: + * All ASCII characters + * Has at least one non-whitespace character + * Not solely numeric + * Shorter than 127 characters + """ + @staticmethod - async def convert(ctx: Context, tag_name: str): - def is_number(value): + async def convert(ctx: Context, tag_name: str) -> str: + """Lowercase & strip whitespace from proposed tag_name & ensure it's valid.""" + def is_number(value: str) -> bool: + """Check to see if the input string is numeric.""" try: float(value) except ValueError: @@ -142,8 +157,15 @@ class TagNameConverter(Converter): class TagContentConverter(Converter): + """Ensure proposed tag content is not empty and contains at least one non-whitespace character.""" + @staticmethod - async def convert(ctx: Context, tag_content: str): + async def convert(ctx: Context, tag_content: str) -> str: + """ + Ensure tag_content is non-empty and contains at least one non-whitespace character. + + If tag_content is valid, return the stripped version. + """ tag_content = tag_content.strip() # The tag contents should not be empty, or filled with whitespace. @@ -156,13 +178,16 @@ class TagContentConverter(Converter): class ExpirationDate(Converter): + """Convert relative expiration date into UTC datetime using dateparser.""" + DATEPARSER_SETTINGS = { 'PREFER_DATES_FROM': 'future', 'TIMEZONE': 'UTC', 'TO_TIMEZONE': 'UTC' } - async def convert(self, ctx, expiration_string: str): + async def convert(self, ctx: Context, expiration_string: str) -> datetime: + """Convert relative expiration date into UTC datetime.""" expiry = dateparser.parse(expiration_string, settings=self.DATEPARSER_SETTINGS) if expiry is None: raise BadArgument(f"Failed to parse expiration date from `{expiration_string}`") diff --git a/bot/decorators.py b/bot/decorators.py index 923d21938..33a6bcadd 100644 --- a/bot/decorators.py +++ b/bot/decorators.py @@ -1,9 +1,9 @@ import logging import random -import typing from asyncio import Lock, sleep from contextlib import suppress from functools import wraps +from typing import Any, Callable, Container, Optional from weakref import WeakValueDictionary from discord import Colour, Embed @@ -18,6 +18,8 @@ log = logging.getLogger(__name__) class InChannelCheckFailure(CheckFailure): + """Raised when a check fails for a message being sent in a whitelisted channel.""" + def __init__(self, *channels: int): self.channels = channels channels_str = ', '.join(f"<#{c_id}>" for c_id in channels) @@ -25,11 +27,10 @@ class InChannelCheckFailure(CheckFailure): super().__init__(f"Sorry, but you may only use this command within {channels_str}.") -def in_channel(*channels: int, bypass_roles: typing.Container[int] = None): - """ - Checks that the message is in a whitelisted channel or optionally has a bypass role. - """ - def predicate(ctx: Context): +def in_channel(*channels: int, bypass_roles: Container[int] = None) -> Callable: + """Checks that the message is in a whitelisted channel or optionally has a bypass role.""" + def predicate(ctx: Context) -> bool: + """In-channel checker predicate.""" if ctx.channel.id in channels: log.debug(f"{ctx.author} tried to call the '{ctx.command.name}' command. " f"The command was used in a whitelisted channel.") @@ -50,42 +51,34 @@ def in_channel(*channels: int, bypass_roles: typing.Container[int] = None): return commands.check(predicate) -def with_role(*role_ids: int): - """ - Returns True if the user has any one - of the roles in role_ids. - """ - - async def predicate(ctx: Context): +def with_role(*role_ids: int) -> Callable: + """Returns True if the user has any one of the roles in role_ids.""" + async def predicate(ctx: Context) -> bool: + """With role checker predicate.""" return with_role_check(ctx, *role_ids) return commands.check(predicate) -def without_role(*role_ids: int): - """ - Returns True if the user does not have any - of the roles in role_ids. - """ - - async def predicate(ctx: Context): +def without_role(*role_ids: int) -> Callable: + """Returns True if the user does not have any of the roles in role_ids.""" + async def predicate(ctx: Context) -> bool: return without_role_check(ctx, *role_ids) return commands.check(predicate) -def locked(): +def locked() -> Callable: """ Allows the user to only run one instance of the decorated command at a time. - Subsequent calls to the command from the same author are - ignored until the command has completed invocation. + + Subsequent calls to the command from the same author are ignored until the command has completed invocation. This decorator has to go before (below) the `command` decorator. """ - - def wrap(func): + def wrap(func: Callable) -> Callable: func.__locks = WeakValueDictionary() @wraps(func) - async def inner(self, ctx, *args, **kwargs): + async def inner(self: Callable, ctx: Context, *args, **kwargs) -> Optional[Any]: lock = func.__locks.setdefault(ctx.author.id, Lock()) if lock.locked(): embed = Embed() @@ -105,15 +98,15 @@ def locked(): return wrap -def redirect_output(destination_channel: int, bypass_roles: typing.Container[int] = None): - """ - Changes the channel in the context of the command to redirect the output - to a certain channel, unless the author has a role to bypass redirection +def redirect_output(destination_channel: int, bypass_roles: Container[int] = None) -> Callable: """ + Changes the channel in the context of the command to redirect the output to a certain channel. - def wrap(func): + Redirect is bypassed if the author has a role to bypass redirection. + """ + def wrap(func: Callable) -> Callable: @wraps(func) - async def inner(self, ctx, *args, **kwargs): + async def inner(self: Callable, ctx: Context, *args, **kwargs) -> Any: if ctx.channel.id == destination_channel: log.trace(f"Command {ctx.command.name} was invoked in destination_channel, not redirecting") return await func(self, ctx, *args, **kwargs) diff --git a/bot/interpreter.py b/bot/interpreter.py index 06343db39..a42b45a2d 100644 --- a/bot/interpreter.py +++ b/bot/interpreter.py @@ -1,5 +1,8 @@ from code import InteractiveInterpreter from io import StringIO +from typing import Any + +from discord.ext.commands import Bot, Context CODE_TEMPLATE = """ async def _func(): @@ -8,13 +11,20 @@ async def _func(): class Interpreter(InteractiveInterpreter): + """ + Subclass InteractiveInterpreter to specify custom run functionality. + + Helper class for internal eval. + """ + write_callable = None - def __init__(self, bot): + def __init__(self, bot: Bot): _locals = {"bot": bot} super().__init__(_locals) - async def run(self, code, ctx, io, *args, **kwargs): + async def run(self, code: str, ctx: Context, io: StringIO, *args, **kwargs) -> Any: + """Execute the provided source code as the bot & return the output.""" self.locals["_rvalue"] = [] self.locals["ctx"] = ctx self.locals["print"] = lambda x: io.write(f"{x}\n") diff --git a/bot/pagination.py b/bot/pagination.py index 0ad5b81f1..76082f459 100644 --- a/bot/pagination.py +++ b/bot/pagination.py @@ -2,7 +2,7 @@ import asyncio import logging from typing import Iterable, List, Optional, Tuple -from discord import Embed, Member, Reaction +from discord import Embed, Member, Message, Reaction from discord.abc import User from discord.ext.commands import Context, Paginator @@ -18,6 +18,8 @@ log = logging.getLogger(__name__) class EmptyPaginatorEmbed(Exception): + """Raised when attempting to paginate with empty contents.""" + pass @@ -25,25 +27,24 @@ class LinePaginator(Paginator): """ A class that aids in paginating code blocks for Discord messages. - Attributes - ----------- - prefix: :class:`str` + Available attributes include: + * prefix: `str` The prefix inserted to every page. e.g. three backticks. - suffix: :class:`str` + * suffix: `str` The suffix appended at the end of every page. e.g. three backticks. - max_size: :class:`int` + * max_size: `int` The maximum amount of codepoints allowed in a page. - max_lines: :class:`int` + * max_lines: `int` The maximum amount of lines allowed in a page. """ - def __init__(self, prefix='```', suffix='```', - max_size=2000, max_lines=None): + def __init__( + self, prefix: str = '```', suffix: str = '```', max_size: int = 2000, max_lines: int = None + ) -> None: """ - This function overrides the Paginator.__init__ - from inside discord.ext.commands. - It overrides in order to allow us to configure - the maximum number of lines per page. + This function overrides the Paginator.__init__ from inside discord.ext.commands. + + It overrides in order to allow us to configure the maximum number of lines per page. """ self.prefix = prefix self.suffix = suffix @@ -54,28 +55,15 @@ class LinePaginator(Paginator): self._count = len(prefix) + 1 # prefix + newline self._pages = [] - def add_line(self, line='', *, empty=False): - """Adds a line to the current page. - - If the line exceeds the :attr:`max_size` then an exception - is raised. + def add_line(self, line: str = '', *, empty: bool = False) -> None: + """ + Adds a line to the current page. - This function overrides the Paginator.add_line - from inside discord.ext.commands. - It overrides in order to allow us to configure - the maximum number of lines per page. + If the line exceeds the `self.max_size` then an exception is raised. - Parameters - ----------- - line: str - The line to add. - empty: bool - Indicates if another empty line should be added. + This function overrides the `Paginator.add_line` from inside `discord.ext.commands`. - Raises - ------ - RuntimeError - The line was too big for the current :attr:`max_size`. + It overrides in order to allow us to configure the maximum number of lines per page. """ if len(line) > self.max_size - len(self.prefix) - 2: raise RuntimeError('Line exceeds maximum page size %s' % (self.max_size - len(self.prefix) - 2)) @@ -97,42 +85,39 @@ class LinePaginator(Paginator): self._count += 1 @classmethod - async def paginate(cls, lines: Iterable[str], ctx: Context, embed: Embed, - prefix: str = "", suffix: str = "", max_lines: Optional[int] = None, max_size: int = 500, - empty: bool = True, restrict_to_user: User = None, timeout: int = 300, - footer_text: str = None, url: str = None, exception_on_empty_embed: bool = False): + async def paginate( + cls, + lines: Iterable[str], + ctx: Context, + embed: Embed, + prefix: str = "", + suffix: str = "", + max_lines: Optional[int] = None, + max_size: int = 500, + empty: bool = True, + restrict_to_user: User = None, + timeout: int = 300, + footer_text: str = None, + url: str = None, + exception_on_empty_embed: bool = False + ) -> Optional[Message]: """ - Use a paginator and set of reactions to provide pagination over a set of lines. The reactions are used to - switch page, or to finish with pagination. + Use a paginator and set of reactions to provide pagination over a set of lines. + + The reactions are used to switch page, or to finish with pagination. + When used, this will send a message using `ctx.send()` and apply a set of reactions to it. These reactions may - be used to change page, or to remove pagination from the message. Pagination will also be removed automatically - if no reaction is added for five minutes (300 seconds). + be used to change page, or to remove pagination from the message. + + Pagination will also be removed automatically if no reaction is added for five minutes (300 seconds). + + Example: >>> embed = Embed() >>> embed.set_author(name="Some Operation", url=url, icon_url=icon) - >>> await LinePaginator.paginate( - ... (line for line in lines), - ... ctx, embed - ... ) - :param lines: The lines to be paginated - :param ctx: Current context object - :param embed: A pre-configured embed to be used as a template for each page - :param prefix: Text to place before each page - :param suffix: Text to place after each page - :param max_lines: The maximum number of lines on each page - :param max_size: The maximum number of characters on each page - :param empty: Whether to place an empty line between each given line - :param restrict_to_user: A user to lock pagination operations to for this message, if supplied - :param exception_on_empty_embed: Should there be an exception if the embed is empty? - :param url: the url to use for the embed headline - :param timeout: The amount of time in seconds to disable pagination of no reaction is added - :param footer_text: Text to prefix the page number in the footer with + >>> await LinePaginator.paginate((line for line in lines), ctx, embed) """ - - def event_check(reaction_: Reaction, user_: Member): - """ - Make sure that this reaction is what we want to operate on - """ - + def event_check(reaction_: Reaction, user_: Member) -> bool: + """Make sure that this reaction is what we want to operate on.""" no_restrictions = ( # Pagination is not restricted not restrict_to_user @@ -301,24 +286,20 @@ class LinePaginator(Paginator): class ImagePaginator(Paginator): """ Helper class that paginates images for embeds in messages. + Close resemblance to LinePaginator, except focuses on images over text. Refer to ImagePaginator.paginate for documentation on how to use. """ - def __init__(self, prefix="", suffix=""): + def __init__(self, prefix: str = "", suffix: str = ""): super().__init__(prefix, suffix) self._current_page = [prefix] self.images = [] self._pages = [] def add_line(self, line: str = '', *, empty: bool = False) -> None: - """ - Adds a line to each page, usually just 1 line in this context - :param line: str to be page content / title - :param empty: if there should be new lines between entries - """ - + """Adds a line to each page.""" if line: self._count = len(line) else: @@ -327,50 +308,36 @@ class ImagePaginator(Paginator): self.close_page() def add_image(self, image: str = None) -> None: - """ - Adds an image to a page - :param image: image url to be appended - """ - + """Adds an image to a page.""" self.images.append(image) @classmethod - async def paginate(cls, pages: List[Tuple[str, str]], ctx: Context, embed: Embed, - prefix: str = "", suffix: str = "", timeout: int = 300, - exception_on_empty_embed: bool = False): + async def paginate( + cls, + pages: List[Tuple[str, str]], + ctx: Context, embed: Embed, + prefix: str = "", + suffix: str = "", + timeout: int = 300, + exception_on_empty_embed: bool = False + ) -> Optional[Message]: """ - Use a paginator and set of reactions to provide - pagination over a set of title/image pairs.The reactions are - used to switch page, or to finish with pagination. + Use a paginator and set of reactions to provide pagination over a set of title/image pairs. + + The reactions are used to switch page, or to finish with pagination. - When used, this will send a message using `ctx.send()` and - apply a set of reactions to it. These reactions may + When used, this will send a message using `ctx.send()` and apply a set of reactions to it. These reactions may be used to change page, or to remove pagination from the message. - Note: Pagination will be removed automatically - if no reaction is added for five minutes (300 seconds). + Note: Pagination will be removed automatically if no reaction is added for five minutes (300 seconds). + Example: >>> embed = Embed() >>> embed.set_author(name="Some Operation", url=url, icon_url=icon) >>> await ImagePaginator.paginate(pages, ctx, embed) - - Parameters - ----------- - :param pages: An iterable of tuples with title for page, and img url - :param ctx: ctx for message - :param embed: base embed to modify - :param prefix: prefix of message - :param suffix: suffix of message - :param timeout: timeout for when reactions get auto-removed """ - def check_event(reaction_: Reaction, member: Member) -> bool: - """ - Checks each reaction added, if it matches our conditions pass the wait_for - :param reaction_: reaction added - :param member: reaction added by member - """ - + """Checks each reaction added, if it matches our conditions pass the wait_for.""" return all(( # Reaction is on the same message sent reaction_.message.id == message.id, diff --git a/bot/patches/__init__.py b/bot/patches/__init__.py index fd38ea8cf..60f6becaa 100644 --- a/bot/patches/__init__.py +++ b/bot/patches/__init__.py @@ -1,4 +1,4 @@ -"""Subpackage that contains patches for discord.py""" +"""Subpackage that contains patches for discord.py.""" from . import message_edited_at __all__ = [ diff --git a/bot/patches/message_edited_at.py b/bot/patches/message_edited_at.py index 528373a9b..a0154f12d 100644 --- a/bot/patches/message_edited_at.py +++ b/bot/patches/message_edited_at.py @@ -1,5 +1,5 @@ """ -# message_edited_at patch +# message_edited_at patch. Date: 2019-09-16 Author: Scragly @@ -16,12 +16,12 @@ from discord import message, utils log = logging.getLogger(__name__) -def _handle_edited_timestamp(self, value): +def _handle_edited_timestamp(self: message.Message, value: str) -> None: """Helper function that takes care of parsing the edited timestamp.""" self._edited_timestamp = utils.parse_time(value) -def apply_patch(): +def apply_patch() -> None: """Applies the `edited_at` patch to the `discord.message.Message` class.""" message.Message._handle_edited_timestamp = _handle_edited_timestamp message.Message._HANDLERS['edited_timestamp'] = message.Message._handle_edited_timestamp diff --git a/bot/rules/attachments.py b/bot/rules/attachments.py index 80a15d440..c550aed76 100644 --- a/bot/rules/attachments.py +++ b/bot/rules/attachments.py @@ -1,16 +1,12 @@ -"""Detects total attachments exceeding the limit sent by a single user.""" - from typing import Dict, Iterable, List, Optional, Tuple from discord import Member, Message async def apply( - last_message: Message, - recent_messages: List[Message], - config: Dict[str, int] + last_message: Message, recent_messages: List[Message], config: Dict[str, int] ) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]: - + """Detects total attachments exceeding the limit sent by a single user.""" relevant_messages = [last_message] + [ msg for msg in recent_messages diff --git a/bot/rules/burst.py b/bot/rules/burst.py index 80c79be60..25c5a2f33 100644 --- a/bot/rules/burst.py +++ b/bot/rules/burst.py @@ -1,16 +1,12 @@ -"""Detects repeated messages sent by a single user.""" - from typing import Dict, Iterable, List, Optional, Tuple from discord import Member, Message async def apply( - last_message: Message, - recent_messages: List[Message], - config: Dict[str, int] + last_message: Message, recent_messages: List[Message], config: Dict[str, int] ) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]: - + """Detects repeated messages sent by a single user.""" relevant_messages = tuple( msg for msg in recent_messages diff --git a/bot/rules/burst_shared.py b/bot/rules/burst_shared.py index 2cb7b5200..bbe9271b3 100644 --- a/bot/rules/burst_shared.py +++ b/bot/rules/burst_shared.py @@ -1,16 +1,12 @@ -"""Detects repeated messages sent by multiple users.""" - from typing import Dict, Iterable, List, Optional, Tuple from discord import Member, Message async def apply( - last_message: Message, - recent_messages: List[Message], - config: Dict[str, int] + last_message: Message, recent_messages: List[Message], config: Dict[str, int] ) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]: - + """Detects repeated messages sent by multiple users.""" total_recent = len(recent_messages) if total_recent > config['max']: diff --git a/bot/rules/chars.py b/bot/rules/chars.py index d05e3cd83..1f587422c 100644 --- a/bot/rules/chars.py +++ b/bot/rules/chars.py @@ -1,16 +1,12 @@ -"""Detects total message char count exceeding the limit sent by a single user.""" - from typing import Dict, Iterable, List, Optional, Tuple from discord import Member, Message async def apply( - last_message: Message, - recent_messages: List[Message], - config: Dict[str, int] + last_message: Message, recent_messages: List[Message], config: Dict[str, int] ) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]: - + """Detects total message char count exceeding the limit sent by a single user.""" relevant_messages = tuple( msg for msg in recent_messages diff --git a/bot/rules/discord_emojis.py b/bot/rules/discord_emojis.py index e4f957ddb..5bab514f2 100644 --- a/bot/rules/discord_emojis.py +++ b/bot/rules/discord_emojis.py @@ -1,5 +1,3 @@ -"""Detects total Discord emojis (excluding Unicode emojis) exceeding the limit sent by a single user.""" - import re from typing import Dict, Iterable, List, Optional, Tuple @@ -10,11 +8,9 @@ DISCORD_EMOJI_RE = re.compile(r"<:\w+:\d+>") async def apply( - last_message: Message, - recent_messages: List[Message], - config: Dict[str, int] + last_message: Message, recent_messages: List[Message], config: Dict[str, int] ) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]: - + """Detects total Discord emojis (excluding Unicode emojis) exceeding the limit sent by a single user.""" relevant_messages = tuple( msg for msg in recent_messages diff --git a/bot/rules/duplicates.py b/bot/rules/duplicates.py index 763fc9983..455764b53 100644 --- a/bot/rules/duplicates.py +++ b/bot/rules/duplicates.py @@ -1,16 +1,12 @@ -"""Detects duplicated messages sent by a single user.""" - from typing import Dict, Iterable, List, Optional, Tuple from discord import Member, Message async def apply( - last_message: Message, - recent_messages: List[Message], - config: Dict[str, int] + last_message: Message, recent_messages: List[Message], config: Dict[str, int] ) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]: - + """Detects duplicated messages sent by a single user.""" relevant_messages = tuple( msg for msg in recent_messages diff --git a/bot/rules/links.py b/bot/rules/links.py index fa4043fcb..ec75a19c5 100644 --- a/bot/rules/links.py +++ b/bot/rules/links.py @@ -1,5 +1,3 @@ -"""Detects total links exceeding the limit sent by a single user.""" - import re from typing import Dict, Iterable, List, Optional, Tuple @@ -10,11 +8,9 @@ LINK_RE = re.compile(r"(https?://[^\s]+)") async def apply( - last_message: Message, - recent_messages: List[Message], - config: Dict[str, int] + last_message: Message, recent_messages: List[Message], config: Dict[str, int] ) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]: - + """Detects total links exceeding the limit sent by a single user.""" relevant_messages = tuple( msg for msg in recent_messages diff --git a/bot/rules/mentions.py b/bot/rules/mentions.py index 45c47b6ba..79725a4b1 100644 --- a/bot/rules/mentions.py +++ b/bot/rules/mentions.py @@ -1,16 +1,12 @@ -"""Detects total mentions exceeding the limit sent by a single user.""" - from typing import Dict, Iterable, List, Optional, Tuple from discord import Member, Message async def apply( - last_message: Message, - recent_messages: List[Message], - config: Dict[str, int] + last_message: Message, recent_messages: List[Message], config: Dict[str, int] ) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]: - + """Detects total mentions exceeding the limit sent by a single user.""" relevant_messages = tuple( msg for msg in recent_messages diff --git a/bot/rules/newlines.py b/bot/rules/newlines.py index fdad6ffd3..4e66e1359 100644 --- a/bot/rules/newlines.py +++ b/bot/rules/newlines.py @@ -1,5 +1,3 @@ -"""Detects total newlines exceeding the set limit sent by a single user.""" - import re from typing import Dict, Iterable, List, Optional, Tuple @@ -7,11 +5,9 @@ from discord import Member, Message async def apply( - last_message: Message, - recent_messages: List[Message], - config: Dict[str, int] + last_message: Message, recent_messages: List[Message], config: Dict[str, int] ) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]: - + """Detects total newlines exceeding the set limit sent by a single user.""" relevant_messages = tuple( msg for msg in recent_messages diff --git a/bot/rules/role_mentions.py b/bot/rules/role_mentions.py index 2177a73b5..0649540b6 100644 --- a/bot/rules/role_mentions.py +++ b/bot/rules/role_mentions.py @@ -1,16 +1,12 @@ -"""Detects total role mentions exceeding the limit sent by a single user.""" - from typing import Dict, Iterable, List, Optional, Tuple from discord import Member, Message async def apply( - last_message: Message, - recent_messages: List[Message], - config: Dict[str, int] + last_message: Message, recent_messages: List[Message], config: Dict[str, int] ) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]: - + """Detects total role mentions exceeding the limit sent by a single user.""" relevant_messages = tuple( msg for msg in recent_messages diff --git a/bot/utils/__init__.py b/bot/utils/__init__.py index d5ae0a7c5..8184be824 100644 --- a/bot/utils/__init__.py +++ b/bot/utils/__init__.py @@ -1,10 +1,12 @@ from abc import ABCMeta +from typing import Any, Generator, Hashable, Iterable from discord.ext.commands import CogMeta class CogABCMeta(CogMeta, ABCMeta): """Metaclass for ABCs meant to be implemented as Cogs.""" + pass @@ -16,50 +18,59 @@ class CaseInsensitiveDict(dict): """ @classmethod - def _k(cls, key): + def _k(cls, key: Hashable) -> Hashable: + """Return lowered key if a string-like is passed, otherwise pass key straight through.""" return key.lower() if isinstance(key, str) else key def __init__(self, *args, **kwargs): super(CaseInsensitiveDict, self).__init__(*args, **kwargs) self._convert_keys() - def __getitem__(self, key): + def __getitem__(self, key: Hashable) -> Any: + """Case insensitive __setitem__.""" return super(CaseInsensitiveDict, self).__getitem__(self.__class__._k(key)) - def __setitem__(self, key, value): + def __setitem__(self, key: Hashable, value: Any): + """Case insensitive __setitem__.""" super(CaseInsensitiveDict, self).__setitem__(self.__class__._k(key), value) - def __delitem__(self, key): + def __delitem__(self, key: Hashable) -> Any: + """Case insensitive __delitem__.""" return super(CaseInsensitiveDict, self).__delitem__(self.__class__._k(key)) - def __contains__(self, key): + def __contains__(self, key: Hashable) -> bool: + """Case insensitive __contains__.""" return super(CaseInsensitiveDict, self).__contains__(self.__class__._k(key)) - def pop(self, key, *args, **kwargs): + def pop(self, key: Hashable, *args, **kwargs) -> Any: + """Case insensitive pop.""" return super(CaseInsensitiveDict, self).pop(self.__class__._k(key), *args, **kwargs) - def get(self, key, *args, **kwargs): + def get(self, key: Hashable, *args, **kwargs) -> Any: + """Case insensitive get.""" return super(CaseInsensitiveDict, self).get(self.__class__._k(key), *args, **kwargs) - def setdefault(self, key, *args, **kwargs): + def setdefault(self, key: Hashable, *args, **kwargs) -> Any: + """Case insensitive setdefault.""" return super(CaseInsensitiveDict, self).setdefault(self.__class__._k(key), *args, **kwargs) - def update(self, E=None, **F): + def update(self, E: Any = None, **F) -> None: + """Case insensitive update.""" super(CaseInsensitiveDict, self).update(self.__class__(E)) super(CaseInsensitiveDict, self).update(self.__class__(**F)) - def _convert_keys(self): + def _convert_keys(self) -> None: + """Helper method to lowercase all existing string-like keys.""" for k in list(self.keys()): v = super(CaseInsensitiveDict, self).pop(k) self.__setitem__(k, v) -def chunks(iterable, size): +def chunks(iterable: Iterable, size: int) -> Generator[Any, None, None]: """ - Generator that allows you to iterate over any indexable collection in `size`-length chunks + Generator that allows you to iterate over any indexable collection in `size`-length chunks. Found: https://stackoverflow.com/a/312464/4022104 """ - for i in range(0, len(iterable), size): yield iterable[i:i + size] diff --git a/bot/utils/checks.py b/bot/utils/checks.py index 195edab0f..19f64ff9f 100644 --- a/bot/utils/checks.py +++ b/bot/utils/checks.py @@ -6,11 +6,7 @@ log = logging.getLogger(__name__) def with_role_check(ctx: Context, *role_ids: int) -> bool: - """ - Returns True if the user has any one - of the roles in role_ids. - """ - + """Returns True if the user has any one of the roles in role_ids.""" if not ctx.guild: # Return False in a DM log.trace(f"{ctx.author} tried to use the '{ctx.command.name}'command from a DM. " "This command is restricted by the with_role decorator. Rejecting request.") @@ -27,11 +23,7 @@ def with_role_check(ctx: Context, *role_ids: int) -> bool: def without_role_check(ctx: Context, *role_ids: int) -> bool: - """ - Returns True if the user does not have any - of the roles in role_ids. - """ - + """Returns True if the user does not have any of the roles in role_ids.""" if not ctx.guild: # Return False in a DM log.trace(f"{ctx.author} tried to use the '{ctx.command.name}' command from a DM. " "This command is restricted by the without_role decorator. Rejecting request.") @@ -45,11 +37,7 @@ def without_role_check(ctx: Context, *role_ids: int) -> bool: def in_channel_check(ctx: Context, channel_id: int) -> bool: - """ - Checks if the command was executed - inside of the specified channel. - """ - + """Checks if the command was executed inside of the specified channel.""" check = ctx.channel.id == channel_id log.trace(f"{ctx.author} tried to call the '{ctx.command.name}' command. " f"The result of the in_channel check was {check}.") diff --git a/bot/utils/messages.py b/bot/utils/messages.py index 94a8b36ed..549b33ca6 100644 --- a/bot/utils/messages.py +++ b/bot/utils/messages.py @@ -1,9 +1,9 @@ import asyncio import contextlib from io import BytesIO -from typing import Sequence, Union +from typing import Optional, Sequence, Union -from discord import Embed, File, Message, TextChannel, Webhook +from discord import Client, Embed, File, Member, Message, Reaction, TextChannel, Webhook from discord.abc import Snowflake from discord.errors import HTTPException @@ -17,42 +17,18 @@ async def wait_for_deletion( user_ids: Sequence[Snowflake], deletion_emojis: Sequence[str] = (Emojis.cross_mark,), timeout: float = 60 * 5, - attach_emojis=True, - client=None -): - """ - Waits for up to `timeout` seconds for a reaction by - any of the specified `user_ids` to delete the message. - - Args: - message (Message): - The message that should be monitored for reactions - and possibly deleted. Must be a message sent on a - guild since access to the bot instance is required. - - user_ids (Sequence[Snowflake]): - A sequence of users that are allowed to delete - this message. - - Kwargs: - deletion_emojis (Sequence[str]): - A sequence of emojis that are considered deletion - emojis. - - timeout (float): - A positive float denoting the maximum amount of - time to wait for a deletion reaction. - - attach_emojis (bool): - Whether to attach the given `deletion_emojis` - to the message in the given `context` - - client (Optional[discord.Client]): - The client instance handling the original command. - If not given, will take the client from the guild - of the message. + attach_emojis: bool = True, + client: Optional[Client] = None +) -> None: """ + Wait for up to `timeout` seconds for a reaction by any of the specified `user_ids` to delete the message. + + An `attach_emojis` bool may be specified to determine whether to attach the given + `deletion_emojis` to the message in the given `context` + A `client` instance may be optionally specified, otherwise client will be taken from the + guild of the message. + """ if message.guild is None and client is None: raise ValueError("Message must be sent on a guild") @@ -62,7 +38,8 @@ async def wait_for_deletion( for emoji in deletion_emojis: await message.add_reaction(emoji) - def check(reaction, user): + def check(reaction: Reaction, user: Member) -> bool: + """Check that the deletion emoji is reacted by the approprite user.""" return ( reaction.message.id == message.id and reaction.emoji in deletion_emojis @@ -70,25 +47,17 @@ async def wait_for_deletion( ) with contextlib.suppress(asyncio.TimeoutError): - await bot.wait_for( - 'reaction_add', - check=check, - timeout=timeout - ) + await bot.wait_for('reaction_add', check=check, timeout=timeout) await message.delete() -async def send_attachments(message: Message, destination: Union[TextChannel, Webhook]): +async def send_attachments(message: Message, destination: Union[TextChannel, Webhook]) -> None: """ Re-uploads each attachment in a message to the given channel or webhook. Each attachment is sent as a separate message to more easily comply with the 8 MiB request size limit. If attachments are too large, they are instead grouped into a single embed which links to them. - - :param message: the message whose attachments to re-upload - :param destination: the channel in which to re-upload the attachments """ - large = [] for attachment in message.attachments: try: diff --git a/bot/utils/scheduling.py b/bot/utils/scheduling.py index f03865013..08abd91d7 100644 --- a/bot/utils/scheduling.py +++ b/bot/utils/scheduling.py @@ -2,7 +2,7 @@ import asyncio import contextlib import logging from abc import abstractmethod -from typing import Dict +from typing import Coroutine, Dict, Union from bot.utils import CogABCMeta @@ -10,6 +10,7 @@ log = logging.getLogger(__name__) class Scheduler(metaclass=CogABCMeta): + """Task scheduler.""" def __init__(self): @@ -17,24 +18,23 @@ class Scheduler(metaclass=CogABCMeta): self.scheduled_tasks: Dict[str, asyncio.Task] = {} @abstractmethod - async def _scheduled_task(self, task_object: dict): + async def _scheduled_task(self, task_object: dict) -> None: """ - A coroutine which handles the scheduling. This is added to the scheduled tasks, - and should wait the task duration, execute the desired code, and clean up the task. + A coroutine which handles the scheduling. + + This is added to the scheduled tasks, and should wait the task duration, execute the desired + code, then clean up the task. + For example, in Reminders this will wait for the reminder duration, send the reminder, then make a site API request to delete the reminder from the database. - - :param task_object: """ - def schedule_task(self, loop: asyncio.AbstractEventLoop, task_id: str, task_data: dict): + def schedule_task(self, loop: asyncio.AbstractEventLoop, task_id: str, task_data: dict) -> None: """ Schedules a task. - :param loop: the asyncio event loop - :param task_id: the ID of the task. - :param task_data: the data of the task, passed to `Scheduler._scheduled_expiration`. - """ + `task_data` is passed to `Scheduler._scheduled_expiration` + """ if task_id in self.scheduled_tasks: return @@ -42,12 +42,8 @@ class Scheduler(metaclass=CogABCMeta): self.scheduled_tasks[task_id] = task - def cancel_task(self, task_id: str): - """ - Un-schedules a task. - :param task_id: the ID of the infraction in question - """ - + def cancel_task(self, task_id: str) -> None: + """Un-schedules a task.""" task = self.scheduled_tasks.get(task_id) if task is None: @@ -59,14 +55,8 @@ class Scheduler(metaclass=CogABCMeta): del self.scheduled_tasks[task_id] -def create_task(loop: asyncio.AbstractEventLoop, coro_or_future): - """ - Creates an asyncio.Task object from a coroutine or future object. - - :param loop: the asyncio event loop. - :param coro_or_future: the coroutine or future object to be scheduled. - """ - +def create_task(loop: asyncio.AbstractEventLoop, coro_or_future: Union[Coroutine, asyncio.Future]) -> asyncio.Task: + """Creates an asyncio.Task object from a coroutine or future object.""" task: asyncio.Task = asyncio.ensure_future(coro_or_future, loop=loop) # Silently ignore exceptions in a callback (handles the CancelledError nonsense) @@ -74,6 +64,7 @@ def create_task(loop: asyncio.AbstractEventLoop, coro_or_future): return task -def _silent_exception(future): +def _silent_exception(future: asyncio.Future) -> None: + """Suppress future's exception.""" with contextlib.suppress(Exception): future.exception() diff --git a/bot/utils/time.py b/bot/utils/time.py index a330c9cd8..c529ccc2b 100644 --- a/bot/utils/time.py +++ b/bot/utils/time.py @@ -6,10 +6,9 @@ from dateutil.relativedelta import relativedelta RFC1123_FORMAT = "%a, %d %b %Y %H:%M:%S GMT" -def _stringify_time_unit(value: int, unit: str): +def _stringify_time_unit(value: int, unit: str) -> str: """ - Returns a string to represent a value and time unit, - ensuring that it uses the right plural form of the unit. + Returns a string to represent a value and time unit, ensuring that it uses the right plural form of the unit. >>> _stringify_time_unit(1, "seconds") "1 second" @@ -18,7 +17,6 @@ def _stringify_time_unit(value: int, unit: str): >>> _stringify_time_unit(0, "minutes") "less than a minute" """ - if value == 1: return f"{value} {unit[:-1]}" elif value == 0: @@ -27,18 +25,13 @@ def _stringify_time_unit(value: int, unit: str): return f"{value} {unit}" -def humanize_delta(delta: relativedelta, precision: str = "seconds", max_units: int = 6): +def humanize_delta(delta: relativedelta, precision: str = "seconds", max_units: int = 6) -> str: """ Returns a human-readable version of the relativedelta. - :param delta: A dateutil.relativedelta.relativedelta object - :param precision: The smallest unit that should be included. - :param max_units: The maximum number of time-units to return. - - :return: A string like `4 days, 12 hours and 1 second`, - `1 minute`, or `less than a minute`. + precision specifies the smallest unit of time to include (e.g. "seconds", "minutes"). + max_units specifies the maximum number of units of time to include (e.g. 1 may include days but not hours). """ - units = ( ("years", delta.years), ("months", delta.months), @@ -73,19 +66,13 @@ def humanize_delta(delta: relativedelta, precision: str = "seconds", max_units: return humanized -def time_since(past_datetime: datetime.datetime, precision: str = "seconds", max_units: int = 6): +def time_since(past_datetime: datetime.datetime, precision: str = "seconds", max_units: int = 6) -> str: """ - Takes a datetime and returns a human-readable string that - describes how long ago that datetime was. + Takes a datetime and returns a human-readable string that describes how long ago that datetime was. - :param past_datetime: A datetime.datetime object - :param precision: The smallest unit that should be included. - :param max_units: The maximum number of time-units to return. - - :return: A string like `4 days, 12 hours and 1 second ago`, - `1 minute ago`, or `less than a minute ago`. + precision specifies the smallest unit of time to include (e.g. "seconds", "minutes"). + max_units specifies the maximum number of units of time to include (e.g. 1 may include days but not hours). """ - now = datetime.datetime.utcnow() delta = abs(relativedelta(now, past_datetime)) @@ -94,20 +81,17 @@ def time_since(past_datetime: datetime.datetime, precision: str = "seconds", max return f"{humanized} ago" -def parse_rfc1123(time_str): +def parse_rfc1123(time_str: str) -> datetime.datetime: + """Parse RFC1123 time string into datetime.""" return datetime.datetime.strptime(time_str, RFC1123_FORMAT).replace(tzinfo=datetime.timezone.utc) # Hey, this could actually be used in the off_topic_names and reddit cogs :) -async def wait_until(time: datetime.datetime): - """ - Wait until a given time. - - :param time: A datetime.datetime object to wait until. - """ - +async def wait_until(time: datetime.datetime) -> None: + """Wait until a given time.""" delay = time - datetime.datetime.utcnow() delay_seconds = delay.total_seconds() + # Incorporate a small delay so we don't rapid-fire the event due to time precision errors if delay_seconds > 1.0: await asyncio.sleep(delay_seconds) @@ -1,6 +1,19 @@ [flake8] max-line-length=120 -application_import_names=bot,tests -exclude=.cache,.venv -ignore=B311,W503,E226,S311,T000 +docstring-convention=all import-order-style=pycharm +application_import_names=bot,tests +exclude=.cache,.venv,constants.py +ignore= + B311,W503,E226,S311,T000 + # Missing Docstrings + D100,D104,D105,D107, + # Docstring Whitespace + D203,D212,D214,D215, + # Docstring Quotes + D301,D302, + # Docstring Content + D400,D401,D402,D404,D405,D406,D407,D408,D409,D410,D411,D412,D413,D414,D416,D417 + # Type Annotations + TYP002,TYP003,TYP101,TYP102,TYP204,TYP206 +per-file-ignores=tests/*:D,TYP |