diff options
| author | 2019-09-10 17:57:42 -0400 | |
|---|---|---|
| committer | 2019-09-10 17:57:42 -0400 | |
| commit | d20905ef77c86cd348cb8a86014b68012861d3b0 (patch) | |
| tree | bc4e9cfca3363fc2c3332f20247ce2f37a0f0b0a | |
| parent | Docstring linting chunk 2 (diff) | |
Docstring linting chunk 3
| -rw-r--r-- | bot/cogs/filtering.py | 65 | ||||
| -rw-r--r-- | bot/cogs/free.py | 9 | ||||
| -rw-r--r-- | bot/cogs/moderation.py | 258 | ||||
| -rw-r--r-- | bot/cogs/off_topic_names.py | 28 | ||||
| -rw-r--r-- | bot/cogs/reminders.py | 98 | ||||
| -rw-r--r-- | bot/cogs/security.py | 13 | ||||
| -rw-r--r-- | bot/cogs/snekbox.py | 12 | 
7 files changed, 170 insertions, 313 deletions
| diff --git a/bot/cogs/filtering.py b/bot/cogs/filtering.py index 418297fc4..b924ac265 100644 --- a/bot/cogs/filtering.py +++ b/bot/cogs/filtering.py @@ -30,10 +30,7 @@ ZALGO_RE = r"[\u0300-\u036F\u0489]"  class Filtering: -    """ -    Filtering out invites, blacklisting domains, -    and warning us of certain regular expressions -    """ +    """Filtering out invites, blacklisting domains, and warning us of certain regular expressions."""      def __init__(self, bot: Bot):          self.bot = bot @@ -94,26 +91,27 @@ class Filtering:      @property      def mod_log(self) -> ModLog: +        """Get currently loaded ModLog cog instance."""          return self.bot.get_cog("ModLog") -    async def on_message(self, msg: Message): +    async def on_message(self, msg: Message) -> None: +        """Invoke message filter for new messages."""          await self._filter_message(msg) -    async def on_message_edit(self, before: Message, after: Message): +    async def on_message_edit(self, before: Message, after: Message) -> None: +        """ +        Invoke message filter for message edits. + +        If there have been multiple edits, calculate the time delta from the previous edit +        """          if not before.edited_at:              delta = relativedelta(after.edited_at, before.created_at).microseconds          else:              delta = None          await self._filter_message(after, delta) -    async def _filter_message(self, msg: Message, delta: Optional[int] = None): -        """ -        Whenever a message is sent or edited, -        run it through our filters to see if it -        violates any of our rules, and then respond -        accordingly. -        """ - +    async def _filter_message(self, msg: Message, delta: Optional[int] = None) -> None: +        """Filter the input message to see if it violates any of our rules, and then respond accordingly."""          # Should we filter this message?          role_whitelisted = False @@ -224,14 +222,10 @@ class Filtering:      @staticmethod      async def _has_watchlist_words(text: str) -> bool:          """ -        Returns True if the text contains -        one of the regular expressions from the -        word_watchlist in our filter config. +        Returns True if the text contains one of the regular expressions from the word_watchlist in our filter config. -        Only matches words with boundaries before -        and after the expression. +        Only matches words with boundaries before and after the expression.          """ -          for expression in Filter.word_watchlist:              if re.search(fr"\b{expression}\b", text, re.IGNORECASE):                  return True @@ -241,14 +235,10 @@ class Filtering:      @staticmethod      async def _has_watchlist_tokens(text: str) -> bool:          """ -        Returns True if the text contains -        one of the regular expressions from the -        token_watchlist in our filter config. +        Returns True if the text contains one of the regular expressions from the token_watchlist in our filter config. -        This will match the expression even if it -        does not have boundaries before and after +        This will match the expression even if it does not have boundaries before and after.          """ -          for expression in Filter.token_watchlist:              if re.search(fr"{expression}", text, re.IGNORECASE): @@ -260,11 +250,7 @@ class Filtering:      @staticmethod      async def _has_urls(text: str) -> bool: -        """ -        Returns True if the text contains one of -        the blacklisted URLs from the config file. -        """ - +        """Returns True if the text contains one of the blacklisted URLs from the config file."""          if not re.search(URL_RE, text, re.IGNORECASE):              return False @@ -283,7 +269,6 @@ class Filtering:          Zalgo range is \u0300 – \u036F and \u0489.          """ -          return bool(re.search(ZALGO_RE, text))      async def _has_invites(self, text: str) -> Union[dict, bool]: @@ -295,7 +280,6 @@ class Filtering:          Attempts to catch some of common ways to try to cheat the system.          """ -          # Remove backslashes to prevent escape character aroundfuckery like          # discord\.gg/gdudes-pony-farm          text = text.replace("\\", "") @@ -336,30 +320,27 @@ class Filtering:          return invite_data if invite_data else False      @staticmethod -    async def _has_rich_embed(msg: Message): -        """ -        Returns True if any of the embeds in the message are of type 'rich', but are not twitter -        embeds. Returns False otherwise. -        """ +    async def _has_rich_embed(msg: Message) -> bool: +        """Returns True if any of the embeds in the message are of type 'rich', but are not twitter embeds."""          if msg.embeds:              for embed in msg.embeds:                  if embed.type == "rich" and (not embed.url or "twitter.com" not in embed.url):                      return True          return False -    async def notify_member(self, filtered_member: Member, reason: str, channel: TextChannel): +    async def notify_member(self, filtered_member: Member, reason: str, channel: TextChannel) -> None:          """ -        Notify filtered_member about a moderation action with the reason str +        Notify filtered_member about a moderation action with the reason str.          First attempts to DM the user, fall back to in-channel notification if user has DMs disabled          """ -          try:              await filtered_member.send(reason)          except discord.errors.Forbidden:              await channel.send(f"{filtered_member.mention} {reason}") -def setup(bot: Bot): +def setup(bot: Bot) -> None: +    """Filtering cog load."""      bot.add_cog(Filtering(bot))      log.info("Cog loaded: Filtering") diff --git a/bot/cogs/free.py b/bot/cogs/free.py index fd6009bb8..ccc722e66 100644 --- a/bot/cogs/free.py +++ b/bot/cogs/free.py @@ -2,7 +2,7 @@ import logging  from datetime import datetime  from discord import Colour, Embed, Member, utils -from discord.ext.commands import Context, command +from discord.ext.commands import Bot, Context, command  from bot.constants import Categories, Channels, Free, STAFF_ROLES  from bot.decorators import redirect_output @@ -22,11 +22,9 @@ class Free:      @command(name="free", aliases=('f',))      @redirect_output(destination_channel=Channels.bot, bypass_roles=STAFF_ROLES) -    async def free(self, ctx: Context, user: Member = None, seek: int = 2): +    async def free(self, ctx: Context, user: Member = None, seek: int = 2) -> None:          """          Lists free help channels by likeliness of availability. -        :param user: accepts user mention, ID, etc. -        :param seek: How far back to check the last active message.          seek is used only when this command is invoked in a help channel.          You cannot override seek without mentioning a user first. @@ -101,6 +99,7 @@ class Free:          await ctx.send(embed=embed) -def setup(bot): +def setup(bot: Bot) -> None: +    """Free cog load."""      bot.add_cog(Free())      log.info("Cog loaded: Free") diff --git a/bot/cogs/moderation.py b/bot/cogs/moderation.py index 1dc2c70d6..28956e636 100644 --- a/bot/cogs/moderation.py +++ b/bot/cogs/moderation.py @@ -33,6 +33,7 @@ APPEALABLE_INFRACTIONS = ("Ban", "Mute")  def proxy_user(user_id: str) -> Object: +    """Create a proxy user for the provided user_id for situations where a Member or User object cannot be resolved."""      try:          user_id = int(user_id)      except ValueError: @@ -47,9 +48,7 @@ UserTypes = Union[Member, User, proxy_user]  class Moderation(Scheduler): -    """ -    Server moderation tools. -    """ +    """Server moderation tools."""      def __init__(self, bot: Bot):          self.bot = bot @@ -58,10 +57,11 @@ class Moderation(Scheduler):      @property      def mod_log(self) -> ModLog: +        """Get currently loaded ModLog cog instance."""          return self.bot.get_cog("ModLog") -    async def on_ready(self): -        # Schedule expiration for previous infractions +    async def on_ready(self) -> None: +        """Schedule expiration for previous infractions."""          infractions = await self.bot.api_client.get(              'bot/infractions', params={'active': 'true'}          ) @@ -73,14 +73,13 @@ class Moderation(Scheduler):      @with_role(*MODERATION_ROLES)      @command() -    async def warn(self, ctx: Context, user: UserTypes, *, reason: str = None): +    async def warn(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None:          """          Create a warning infraction in the database for a user.          **`user`:** Accepts user mention, ID, etc.          **`reason`:** The reason for the warning.          """ -          response_object = await post_infraction(ctx, user, type="warning", reason=reason)          if response_object is None:              return @@ -123,14 +122,8 @@ class Moderation(Scheduler):      @with_role(*MODERATION_ROLES)      @command() -    async def kick(self, ctx: Context, user: Member, *, reason: str = None): -        """ -        Kicks a user. - -        **`user`:** Accepts user mention, ID, etc. -        **`reason`:** The reason for the kick. -        """ - +    async def kick(self, ctx: Context, user: Member, *, reason: str = None) -> None: +        """Kicks a user with the provided reason."""          if not await self.respect_role_hierarchy(ctx, user, 'kick'):              # Ensure ctx author has a higher top role than the target user              # Warning is sent to ctx by the helper method @@ -183,14 +176,8 @@ class Moderation(Scheduler):      @with_role(*MODERATION_ROLES)      @command() -    async def ban(self, ctx: Context, user: UserTypes, *, reason: str = None): -        """ -        Create a permanent ban infraction in the database for a user. - -        **`user`:** Accepts user mention, ID, etc. -        **`reason`:** The reason for the ban. -        """ - +    async def ban(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None: +        """Create a permanent ban infraction for a user with the provided reason."""          if not await self.respect_role_hierarchy(ctx, user, 'ban'):              # Ensure ctx author has a higher top role than the target user              # Warning is sent to ctx by the helper method @@ -260,14 +247,8 @@ class Moderation(Scheduler):      @with_role(*MODERATION_ROLES)      @command() -    async def mute(self, ctx: Context, user: Member, *, reason: str = None): -        """ -        Create a permanent mute infraction in the database for a user. - -        **`user`:** Accepts user mention, ID, etc. -        **`reason`:** The reason for the mute. -        """ - +    async def mute(self, ctx: Context, user: Member, *, reason: str = None) -> None: +        """Create a permanent mute infraction for a user with the provided reason."""          active_mutes = await self.bot.api_client.get(              'bot/infractions',              params={ @@ -334,15 +315,12 @@ class Moderation(Scheduler):      async def tempmute(              self, ctx: Context, user: Member, expiration: ExpirationDate,              *, reason: str = None -    ): +    ) -> None:          """ -        Create a temporary mute infraction in the database for a user. +        Create a temporary mute infraction for a user with the provided expiration and reason. -        **`user`:** Accepts user mention, ID, etc. -        **`duration`:** The duration for the temporary mute infraction -        **`reason`:** The reason for the temporary mute. +        Duration strings are parsed per: http://strftime.org/          """ -          active_mutes = await self.bot.api_client.get(              'bot/infractions',              params={ @@ -416,15 +394,12 @@ class Moderation(Scheduler):      @command()      async def tempban(              self, ctx: Context, user: UserTypes, expiry: ExpirationDate, *, reason: str = None -    ): +    ) -> None:          """ -        Create a temporary ban infraction in the database for a user. +        Create a temporary ban infraction for a user with the provided expiration and reason. -        **`user`:**  Accepts user mention, ID, etc. -        **`expiry`:**  The duration for the temporary ban infraction -        **`reason`:**  The reason for the temporary ban. +        Duration strings are parsed per: http://strftime.org/          """ -          if not await self.respect_role_hierarchy(ctx, user, 'tempban'):              # Ensure ctx author has a higher top role than the target user              # Warning is sent to ctx by the helper method @@ -510,14 +485,12 @@ class Moderation(Scheduler):      @with_role(*MODERATION_ROLES)      @command(hidden=True, aliases=['shadowwarn', 'swarn', 'shadow_warn']) -    async def note(self, ctx: Context, user: UserTypes, *, reason: str = None): +    async def note(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None:          """ -        Create a private infraction note in the database for a user. +        Create a private infraction note in the database for a user with the provided reason. -        **`user`:** accepts user mention, ID, etc. -        **`reason`:** The reason for the warning. +        This does not send the user a notification          """ -          response_object = await post_infraction(              ctx, user, type="warning", reason=reason, hidden=True          ) @@ -545,14 +518,12 @@ class Moderation(Scheduler):      @with_role(*MODERATION_ROLES)      @command(hidden=True, aliases=['shadowkick', 'skick']) -    async def shadow_kick(self, ctx: Context, user: Member, *, reason: str = None): +    async def shadow_kick(self, ctx: Context, user: Member, *, reason: str = None) -> None:          """ -        Kicks a user. +        Kick a user for the provided reason. -        **`user`:** accepts user mention, ID, etc. -        **`reason`:** The reason for the kick. +        This does not send the user a notification.          """ -          if not await self.respect_role_hierarchy(ctx, user, 'shadowkick'):              # Ensure ctx author has a higher top role than the target user              # Warning is sent to ctx by the helper method @@ -598,14 +569,12 @@ class Moderation(Scheduler):      @with_role(*MODERATION_ROLES)      @command(hidden=True, aliases=['shadowban', 'sban']) -    async def shadow_ban(self, ctx: Context, user: UserTypes, *, reason: str = None): +    async def shadow_ban(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None:          """ -        Create a permanent ban infraction in the database for a user. +        Create a permanent ban infraction for a user with the provided reason. -        **`user`:** Accepts user mention, ID, etc. -        **`reason`:** The reason for the ban. +        This does not send the user a notification.          """ -          if not await self.respect_role_hierarchy(ctx, user, 'shadowban'):              # Ensure ctx author has a higher top role than the target user              # Warning is sent to ctx by the helper method @@ -652,14 +621,12 @@ class Moderation(Scheduler):      @with_role(*MODERATION_ROLES)      @command(hidden=True, aliases=['shadowmute', 'smute']) -    async def shadow_mute(self, ctx: Context, user: Member, *, reason: str = None): +    async def shadow_mute(self, ctx: Context, user: Member, *, reason: str = None) -> None:          """ -        Create a permanent mute infraction in the database for a user. +        Create a permanent mute infraction for a user with the provided reason. -        **`user`:** Accepts user mention, ID, etc. -        **`reason`:** The reason for the mute. +        This does not send the user a notification.          """ -          response_object = await post_infraction(ctx, user, type="mute", reason=reason, hidden=True)          if response_object is None:              return @@ -692,15 +659,14 @@ class Moderation(Scheduler):      @command(hidden=True, aliases=["shadowtempmute, stempmute"])      async def shadow_tempmute(          self, ctx: Context, user: Member, duration: str, *, reason: str = None -    ): +    ) -> None:          """ -        Create a temporary mute infraction in the database for a user. +        Create a temporary mute infraction for a user with the provided reason. -        **`user`:** Accepts user mention, ID, etc. -        **`duration`:** The duration for the temporary mute infraction -        **`reason`:** The reason for the temporary mute. -        """ +        Duration strings are parsed per: http://strftime.org/ +        This does not send the user a notification. +        """          response_object = await post_infraction(              ctx, user, type="mute", reason=reason, duration=duration, hidden=True          ) @@ -741,15 +707,14 @@ class Moderation(Scheduler):      @command(hidden=True, aliases=["shadowtempban, stempban"])      async def shadow_tempban(              self, ctx: Context, user: UserTypes, duration: str, *, reason: str = None -    ): +    ) -> None:          """ -        Create a temporary ban infraction in the database for a user. +        Create a temporary ban infraction for a user with the provided reason. -        **`user`:** Accepts user mention, ID, etc. -        **`duration`:** The duration for the temporary ban infraction -        **`reason`:** The reason for the temporary ban. -        """ +        Duration strings are parsed per: http://strftime.org/ +        This does not send the user a notification. +        """          if not await self.respect_role_hierarchy(ctx, user, 'shadowtempban'):              # Ensure ctx author has a higher top role than the target user              # Warning is sent to ctx by the helper method @@ -811,13 +776,8 @@ class Moderation(Scheduler):      @with_role(*MODERATION_ROLES)      @command() -    async def unmute(self, ctx: Context, user: Member): -        """ -        Deactivates the active mute infraction for a user. - -        **`user`:** Accepts user mention, ID, etc. -        """ - +    async def unmute(self, ctx: Context, user: Member) -> None: +        """Deactivates the active mute infraction for a user."""          try:              # check the current active infraction              response = await self.bot.api_client.get( @@ -881,13 +841,8 @@ class Moderation(Scheduler):      @with_role(*MODERATION_ROLES)      @command() -    async def unban(self, ctx: Context, user: UserTypes): -        """ -        Deactivates the active ban infraction for a user. - -        **`user`:** Accepts user mention, ID, etc. -        """ - +    async def unban(self, ctx: Context, user: UserTypes) -> None: +        """Deactivates the active ban infraction for a user."""          try:              # check the current active infraction              response = await self.bot.api_client.get( @@ -938,16 +893,14 @@ class Moderation(Scheduler):      @with_role(*MODERATION_ROLES)      @group(name='infraction', aliases=('infr', 'infractions', 'inf'), invoke_without_command=True) -    async def infraction_group(self, ctx: Context): +    async def infraction_group(self, ctx: Context) -> None:          """Infraction manipulation commands.""" -          await ctx.invoke(self.bot.get_command("help"), "infraction")      @with_role(*MODERATION_ROLES)      @infraction_group.group(name='edit', invoke_without_command=True) -    async def infraction_edit_group(self, ctx: Context): +    async def infraction_edit_group(self, ctx: Context) -> None:          """Infraction editing commands.""" -          await ctx.invoke(self.bot.get_command("help"), "infraction", "edit")      @with_role(*MODERATION_ROLES) @@ -955,15 +908,12 @@ class Moderation(Scheduler):      async def edit_duration(              self, ctx: Context,              infraction_id: int, expires_at: Union[ExpirationDate, str] -    ): +    ) -> None:          """          Sets the duration of the given infraction, relative to the time of updating. -        **`infraction_id`:**  the id of the infraction -        **`expires_at`:**  the new expiration date of the infraction. -        Use "permanent" to mark the infraction as permanent. +        Duration strings are parsed per: http://strftime.org/, use "permanent" to mark the infraction as permanent.          """ -          if isinstance(expires_at, str) and expires_at != 'permanent':              raise BadArgument(                  "If `expires_at` is given as a non-datetime, " @@ -1043,13 +993,8 @@ class Moderation(Scheduler):      @with_role(*MODERATION_ROLES)      @infraction_edit_group.command(name="reason") -    async def edit_reason(self, ctx: Context, infraction_id: int, *, reason: str): -        """ -        Sets the reason of the given infraction. -        **`infraction_id`:** the id of the infraction -        **`reason`:** The new reason of the infraction -        """ - +    async def edit_reason(self, ctx: Context, infraction_id: int, *, reason: str) -> None: +        """Edit the reason of the given infraction."""          try:              old_infraction = await self.bot.api_client.get(                  'bot/infractions/' + str(infraction_id) @@ -1099,11 +1044,8 @@ class Moderation(Scheduler):      @with_role(*MODERATION_ROLES)      @infraction_group.group(name="search", invoke_without_command=True) -    async def infraction_search_group(self, ctx: Context, query: InfractionSearchQuery): -        """ -        Searches for infractions in the database. -        """ - +    async def infraction_search_group(self, ctx: Context, query: InfractionSearchQuery) -> None: +        """Searches for infractions in the database."""          if isinstance(query, User):              await ctx.invoke(self.search_user, query) @@ -1112,11 +1054,8 @@ class Moderation(Scheduler):      @with_role(*MODERATION_ROLES)      @infraction_search_group.command(name="user", aliases=("member", "id")) -    async def search_user(self, ctx: Context, user: Union[User, proxy_user]): -        """ -        Search for infractions by member. -        """ - +    async def search_user(self, ctx: Context, user: Union[User, proxy_user]) -> None: +        """Search for infractions by member."""          infraction_list = await self.bot.api_client.get(              'bot/infractions',              params={'user__id': str(user.id)} @@ -1129,11 +1068,8 @@ class Moderation(Scheduler):      @with_role(*MODERATION_ROLES)      @infraction_search_group.command(name="reason", aliases=("match", "regex", "re")) -    async def search_reason(self, ctx: Context, reason: str): -        """ -        Search for infractions by their reason. Use Re2 for matching. -        """ - +    async def search_reason(self, ctx: Context, reason: str) -> None: +        """Search for infractions by their reason. Use Re2 for matching."""          infraction_list = await self.bot.api_client.get(              'bot/infractions', params={'search': reason}          ) @@ -1146,8 +1082,8 @@ class Moderation(Scheduler):      # endregion      # region: Utility functions -    async def send_infraction_list(self, ctx: Context, embed: Embed, infractions: list): - +    async def send_infraction_list(self, ctx: Context, embed: Embed, infractions: list) -> None: +        """Send a paginated embed of infractions for the specified user."""          if not infractions:              await ctx.send(f":warning: No infractions could be found for that query.")              return @@ -1169,14 +1105,8 @@ class Moderation(Scheduler):      # endregion      # region: Utility functions -    def schedule_expiration(self, loop: asyncio.AbstractEventLoop, infraction_object: dict): -        """ -        Schedules a task to expire a temporary infraction. - -        :param loop: the asyncio event loop -        :param infraction_object: the infraction object to expire at the end of the task -        """ - +    def schedule_expiration(self, loop: asyncio.AbstractEventLoop, infraction_object: dict) -> None: +        """Schedules a task to expire a temporary infraction."""          infraction_id = infraction_object["id"]          if infraction_id in self.scheduled_tasks:              return @@ -1185,12 +1115,8 @@ class Moderation(Scheduler):          self.scheduled_tasks[infraction_id] = task -    def cancel_expiration(self, infraction_id: str): -        """ -        Un-schedules a task set to expire a temporary infraction. -        :param infraction_id: the ID of the infraction in question -        """ - +    def cancel_expiration(self, infraction_id: str) -> None: +        """Un-schedules a task set to expire a temporary infraction."""          task = self.scheduled_tasks.get(infraction_id)          if task is None:              log.warning(f"Failed to unschedule {infraction_id}: no task found.") @@ -1199,15 +1125,13 @@ class Moderation(Scheduler):          log.debug(f"Unscheduled {infraction_id}.")          del self.scheduled_tasks[infraction_id] -    async def _scheduled_task(self, infraction_object: dict): +    async def _scheduled_task(self, infraction_object: dict) -> None:          """ -        A co-routine which marks an infraction as expired after the delay from the time of -        scheduling to the time of expiration. At the time of expiration, the infraction is -        marked as inactive on the website, and the expiration task is cancelled. +        Marks an infraction expired after the delay from time of scheduling to time of expiration. -        :param infraction_object: the infraction in question +        At the time of expiration, the infraction is marked as inactive on the website, and the +        expiration task is cancelled. The user is then notified via DM.          """ -          infraction_id = infraction_object["id"]          # transform expiration to delay in seconds @@ -1229,14 +1153,12 @@ class Moderation(Scheduler):              icon_url=Icons.user_unmute          ) -    async def _deactivate_infraction(self, infraction_object): +    async def _deactivate_infraction(self, infraction_object: dict) -> None:          """          A co-routine which marks an infraction as inactive on the website. -        This co-routine does not cancel or un-schedule an expiration task. -        :param infraction_object: the infraction in question +        This co-routine does not cancel or un-schedule an expiration task.          """ -          guild: Guild = self.bot.get_guild(constants.Guild.id)          user_id = infraction_object["user"]          infraction_type = infraction_object["type"] @@ -1258,7 +1180,8 @@ class Moderation(Scheduler):              json={"active": False}          ) -    def _infraction_to_string(self, infraction_object): +    def _infraction_to_string(self, infraction_object: dict) -> str: +        """Convert the infraction object to a string representation."""          actor_id = infraction_object["actor"]          guild: Guild = self.bot.get_guild(constants.Guild.id)          actor = guild.get_member(actor_id) @@ -1285,16 +1208,12 @@ class Moderation(Scheduler):      async def notify_infraction(              self, user: Union[User, Member], infr_type: str,              expires_at: Union[datetime, str] = 'N/A', reason: str = "No reason provided." -    ): +    ) -> bool:          """ -        Notify a user of their fresh infraction :) +        Attempt to notify a user, via DM, of their fresh infraction. -        :param user: The user to send the message to. -        :param infr_type: The type of infraction, as a string. -        :param duration: The duration of the infraction. -        :param reason: The reason for the infraction. +        Optionally returns a boolean indicator of whether the DM was successful.          """ -          if isinstance(expires_at, datetime):              expires_at = expires_at.strftime('%c') @@ -1320,16 +1239,12 @@ class Moderation(Scheduler):      async def notify_pardon(              self, user: Union[User, Member], title: str, content: str,              icon_url: str = Icons.user_verified -    ): +    ) -> bool:          """ -        Notify a user that an infraction has been lifted. +        Attempt to notify a user, via DM, of their expired infraction. -        :param user: The user to send the message to. -        :param title: The title of the embed. -        :param content: The content of the embed. -        :param icon_url: URL for the title icon. +        Optionally returns a boolean indicator of whether the DM was successful.          """ -          embed = Embed(              description=content,              colour=Colour(Colours.soft_green) @@ -1339,14 +1254,12 @@ class Moderation(Scheduler):          return await self.send_private_embed(user, embed) -    async def send_private_embed(self, user: Union[User, Member], embed: Embed): +    async def send_private_embed(self, user: Union[User, Member], embed: Embed) -> bool:          """          A helper method for sending an embed to a user's DMs. -        :param user: The user to send the embed to. -        :param embed: The embed to send. +        Returns a boolean indicator of DM success.          """ -          # sometimes `user` is a `discord.Object`, so let's make it a proper user.          user = await self.bot.get_user_info(user.id) @@ -1360,7 +1273,8 @@ class Moderation(Scheduler):              )              return False -    async def log_notify_failure(self, target: str, actor: Member, infraction_type: str): +    async def log_notify_failure(self, target: str, actor: Member, infraction_type: str) -> None: +        """Send a mod log entry if an attempt to DM the target user has failed."""          await self.mod_log.send_log_message(              icon_url=Icons.token_removed,              content=actor.mention, @@ -1374,7 +1288,8 @@ class Moderation(Scheduler):      # endregion -    async def __error(self, ctx, error): +    async def __error(self, ctx: Context, error: Exception) -> None: +        """Send a notification to the invoking context on a Union failure."""          if isinstance(error, BadUnionArgument):              if User in error.converters:                  await ctx.send(str(error.errors[0])) @@ -1382,15 +1297,11 @@ class Moderation(Scheduler):      async def respect_role_hierarchy(self, ctx: Context, target: UserTypes, infr_type: str) -> bool:          """          Check if the highest role of the invoking member is greater than that of the target member. +          If this check fails, a warning is sent to the invoking ctx.          Returns True always if target is not a discord.Member instance. - -        :param ctx: The command context when invoked. -        :param target: The target of the infraction. -        :param infr_type: The type of infraction.          """ -          if not isinstance(target, Member):              return True @@ -1409,6 +1320,7 @@ class Moderation(Scheduler):          return target_is_lower -def setup(bot): +def setup(bot: Bot) -> None: +    """Moderation cog load."""      bot.add_cog(Moderation(bot))      log.info("Cog loaded: Moderation") diff --git a/bot/cogs/off_topic_names.py b/bot/cogs/off_topic_names.py index c0d2e5dc5..3849d3d59 100644 --- a/bot/cogs/off_topic_names.py +++ b/bot/cogs/off_topic_names.py @@ -18,7 +18,8 @@ class OffTopicName(Converter):      """A converter that ensures an added off-topic name is valid."""      @staticmethod -    async def convert(ctx: Context, argument: str): +    async def convert(ctx: Context, argument: str) -> None: +        """Attempt to replace any invalid characters with their approximate unicode equivalent."""          allowed_characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ!?'`-"          if not (2 <= len(argument) <= 96): @@ -37,7 +38,7 @@ class OffTopicName(Converter):          return argument.translate(table) -async def update_names(bot: Bot, headers: dict): +async def update_names(bot: Bot, headers: dict) -> None:      """      The background updater task that performs a channel name update daily. @@ -46,7 +47,6 @@ async def update_names(bot: Bot, headers: dict):              The running bot instance, used for fetching data from the              website via the bot's `api_client`.      """ -      while True:          # Since we truncate the compute timedelta to seconds, we add one second to ensure          # we go past midnight in the `seconds_to_sleep` set below. @@ -77,27 +77,27 @@ class OffTopicNames:          self.headers = {"X-API-KEY": Keys.site_api}          self.updater_task = None -    def __cleanup(self): +    def __cleanup(self) -> None: +        """Cancel any leftover running updater task."""          if self.updater_task is not None:              self.updater_task.cancel() -    async def on_ready(self): +    async def on_ready(self) -> None: +        """Start off-topic channel updating event loop if it hasn't already started."""          if self.updater_task is None:              coro = update_names(self.bot, self.headers)              self.updater_task = self.bot.loop.create_task(coro)      @group(name='otname', aliases=('otnames', 'otn'), invoke_without_command=True)      @with_role(*MODERATION_ROLES) -    async def otname_group(self, ctx): +    async def otname_group(self, ctx: Context) -> None:          """Add or list items from the off-topic channel name rotation.""" -          await ctx.invoke(self.bot.get_command("help"), "otname")      @otname_group.command(name='add', aliases=('a',))      @with_role(*MODERATION_ROLES) -    async def add_command(self, ctx, name: OffTopicName): +    async def add_command(self, ctx: Context, name: OffTopicName) -> None:          """Adds a new off-topic name to the rotation.""" -          await self.bot.api_client.post(f'bot/off-topic-channel-names', params={'name': name})          log.info(              f"{ctx.author.name}#{ctx.author.discriminator}" @@ -107,9 +107,8 @@ class OffTopicNames:      @otname_group.command(name='delete', aliases=('remove', 'rm', 'del', 'd'))      @with_role(*MODERATION_ROLES) -    async def delete_command(self, ctx, name: OffTopicName): +    async def delete_command(self, ctx: Context, name: OffTopicName) -> None:          """Removes a off-topic name from the rotation.""" -          await self.bot.api_client.delete(f'bot/off-topic-channel-names/{name}')          log.info(              f"{ctx.author.name}#{ctx.author.discriminator}" @@ -119,12 +118,12 @@ class OffTopicNames:      @otname_group.command(name='list', aliases=('l',))      @with_role(*MODERATION_ROLES) -    async def list_command(self, ctx): +    async def list_command(self, ctx: Context) -> None:          """          Lists all currently known off-topic channel names in a paginator. +          Restricted to Moderator and above to not spoil the surprise.          """ -          result = await self.bot.api_client.get('bot/off-topic-channel-names')          lines = sorted(f"• {name}" for name in result)          embed = Embed( @@ -138,6 +137,7 @@ class OffTopicNames:              await ctx.send(embed=embed) -def setup(bot: Bot): +def setup(bot: Bot) -> None: +    """Off topic names cog load."""      bot.add_cog(OffTopicNames(bot))      log.info("Cog loaded: OffTopicNames") diff --git a/bot/cogs/reminders.py b/bot/cogs/reminders.py index 03ea00de8..d9f6f6536 100644 --- a/bot/cogs/reminders.py +++ b/bot/cogs/reminders.py @@ -23,13 +23,14 @@ MAXIMUM_REMINDERS = 5  class Reminders(Scheduler): +    """Provide in-channel reminder functionality."""      def __init__(self, bot: Bot):          self.bot = bot          super().__init__() -    async def on_ready(self): -        # Get all the current reminders for re-scheduling +    async def on_ready(self) -> None: +        """Reschedule all current reminders."""          response = await self.bot.api_client.get(              'bot/reminders',              params={'active': 'true'} @@ -50,25 +51,16 @@ class Reminders(Scheduler):                  self.schedule_task(loop, reminder["id"], reminder)      @staticmethod -    async def _send_confirmation(ctx: Context, on_success: str): -        """ -        Send an embed confirming the change was made successfully. -        """ - +    async def _send_confirmation(ctx: Context, on_success: str) -> None: +        """Send an embed confirming the reminder change was made successfully."""          embed = Embed()          embed.colour = Colour.green()          embed.title = random.choice(POSITIVE_REPLIES)          embed.description = on_success          await ctx.send(embed=embed) -    async def _scheduled_task(self, reminder: dict): -        """ -        A coroutine which sends the reminder once the time is reached. - -        :param reminder: the data of the reminder. -        :return: -        """ - +    async def _scheduled_task(self, reminder: dict) -> None: +        """A coroutine which sends the reminder once the time is reached, and cancels the running task."""          reminder_id = reminder["id"]          reminder_datetime = datetime.fromisoformat(reminder['expiration'][:-1]) @@ -82,38 +74,22 @@ class Reminders(Scheduler):          # Now we can begone with it from our schedule list.          self.cancel_task(reminder_id) -    async def _delete_reminder(self, reminder_id: str): -        """ -        Delete a reminder from the database, given its ID. - -        :param reminder_id: The ID of the reminder. -        """ - +    async def _delete_reminder(self, reminder_id: str) -> None: +        """Delete a reminder from the database, given its ID, and cancels the running task."""          await self.bot.api_client.delete('bot/reminders/' + str(reminder_id))          # Now we can remove it from the schedule list          self.cancel_task(reminder_id) -    async def _reschedule_reminder(self, reminder): -        """ -        Reschedule a reminder object. - -        :param reminder: The reminder to be rescheduled. -        """ - +    async def _reschedule_reminder(self, reminder: dict) -> None: +        """Reschedule a reminder object."""          loop = asyncio.get_event_loop()          self.cancel_task(reminder["id"])          self.schedule_task(loop, reminder["id"], reminder) -    async def send_reminder(self, reminder, late: relativedelta = None): -        """ -        Send the reminder. - -        :param reminder: The data about the reminder. -        :param late: How late the reminder is (if at all) -        """ - +    async def send_reminder(self, reminder: dict, late: relativedelta = None) -> None: +        """Send the reminder."""          channel = self.bot.get_channel(reminder["channel_id"])          user = self.bot.get_user(reminder["author"]) @@ -140,19 +116,17 @@ class Reminders(Scheduler):          await self._delete_reminder(reminder["id"])      @group(name="remind", aliases=("reminder", "reminders"), invoke_without_command=True) -    async def remind_group(self, ctx: Context, expiration: ExpirationDate, *, content: str): -        """ -        Commands for managing your reminders. -        """ - +    async def remind_group(self, ctx: Context, expiration: ExpirationDate, *, content: str) -> None: +        """Commands for managing your reminders."""          await ctx.invoke(self.new_reminder, expiration=expiration, content=content)      @remind_group.command(name="new", aliases=("add", "create")) -    async def new_reminder(self, ctx: Context, expiration: ExpirationDate, *, content: str): +    async def new_reminder(self, ctx: Context, expiration: ExpirationDate, *, content: str) -> None:          """          Set yourself a simple reminder. -        """ +        Expiration is parsed per: http://strftime.org/ +        """          embed = Embed()          # If the user is not staff, we need to verify whether or not to make a reminder at all. @@ -203,11 +177,8 @@ class Reminders(Scheduler):          self.schedule_task(loop, reminder["id"], reminder)      @remind_group.command(name="list") -    async def list_reminders(self, ctx: Context): -        """ -        View a paginated embed of all reminders for your user. -        """ - +    async def list_reminders(self, ctx: Context) -> None: +        """View a paginated embed of all reminders for your user."""          # Get all the user's reminders from the database.          data = await self.bot.api_client.get(              'bot/reminders', @@ -259,19 +230,17 @@ class Reminders(Scheduler):          )      @remind_group.group(name="edit", aliases=("change", "modify"), invoke_without_command=True) -    async def edit_reminder_group(self, ctx: Context): -        """ -        Commands for modifying your current reminders. -        """ - +    async def edit_reminder_group(self, ctx: Context) -> None: +        """Commands for modifying your current reminders."""          await ctx.invoke(self.bot.get_command("help"), "reminders", "edit")      @edit_reminder_group.command(name="duration", aliases=("time",)) -    async def edit_reminder_duration(self, ctx: Context, id_: int, expiration: ExpirationDate): +    async def edit_reminder_duration(self, ctx: Context, id_: int, expiration: ExpirationDate) -> None:          """          Edit one of your reminders' expiration. -        """ +        Expiration is parsed per: http://strftime.org/ +        """          # Send the request to update the reminder in the database          reminder = await self.bot.api_client.patch(              'bot/reminders/' + str(id_), @@ -286,11 +255,8 @@ class Reminders(Scheduler):          await self._reschedule_reminder(reminder)      @edit_reminder_group.command(name="content", aliases=("reason",)) -    async def edit_reminder_content(self, ctx: Context, id_: int, *, content: str): -        """ -        Edit one of your reminders' content. -        """ - +    async def edit_reminder_content(self, ctx: Context, id_: int, *, content: str) -> None: +        """Edit one of your reminders' content."""          # Send the request to update the reminder in the database          reminder = await self.bot.api_client.patch(              'bot/reminders/' + str(id_), @@ -304,17 +270,15 @@ class Reminders(Scheduler):          await self._reschedule_reminder(reminder)      @remind_group.command("delete", aliases=("remove",)) -    async def delete_reminder(self, ctx: Context, id_: int): -        """ -        Delete one of your active reminders. -        """ - +    async def delete_reminder(self, ctx: Context, id_: int) -> None: +        """Delete one of your active reminders."""          await self._delete_reminder(id_)          await self._send_confirmation(              ctx, on_success="That reminder has been deleted successfully!"          ) -def setup(bot: Bot): +def setup(bot: Bot) -> None: +    """Reminders cog load."""      bot.add_cog(Reminders(bot))      log.info("Cog loaded: Reminders") diff --git a/bot/cogs/security.py b/bot/cogs/security.py index f4a843fbf..7ada9a4f6 100644 --- a/bot/cogs/security.py +++ b/bot/cogs/security.py @@ -6,22 +6,23 @@ log = logging.getLogger(__name__)  class Security: -    """ -    Security-related helpers -    """ +    """Security-related helpers."""      def __init__(self, bot: Bot):          self.bot = bot          self.bot.check(self.check_not_bot)  # Global commands check - no bots can run any commands at all          self.bot.check(self.check_on_guild)  # Global commands check - commands can't be run in a DM -    def check_not_bot(self, ctx: Context): +    def check_not_bot(self, ctx: Context) -> bool: +        """Check if Context instance author is not a bot."""          return not ctx.author.bot -    def check_on_guild(self, ctx: Context): +    def check_on_guild(self, ctx: Context) -> bool: +        """Check if Context instance has a guild attribute."""          return ctx.guild is not None -def setup(bot): +def setup(bot: Bot) -> None: +    """Security cog load."""      bot.add_cog(Security(bot))      log.info("Cog loaded: Security") diff --git a/bot/cogs/snekbox.py b/bot/cogs/snekbox.py index 05834e421..5a5e26655 100644 --- a/bot/cogs/snekbox.py +++ b/bot/cogs/snekbox.py @@ -41,9 +41,7 @@ MAX_PASTE_LEN = 1000  class Snekbox: -    """ -    Safe evaluation of Python code using Snekbox -    """ +    """Safe evaluation of Python code using Snekbox."""      def __init__(self, bot: Bot):          self.bot = bot @@ -173,7 +171,7 @@ class Snekbox:      @command(name="eval", aliases=("e",))      @guild_only()      @in_channel(Channels.bot, bypass_roles=STAFF_ROLES) -    async def eval_command(self, ctx: Context, *, code: str = None): +    async def eval_command(self, ctx: Context, *, code: str = None) -> None:          """          Run Python code and get the results. @@ -225,7 +223,8 @@ class Snekbox:              del self.jobs[ctx.author.id]      @eval_command.error -    async def eval_command_error(self, ctx: Context, error: CommandError): +    async def eval_command_error(self, ctx: Context, error: CommandError) -> None: +        """Eval commands error handler."""          embed = Embed(colour=Colour.red())          if isinstance(error, NoPrivateMessage): @@ -246,6 +245,7 @@ class Snekbox:              await ctx.send(embed=embed) -def setup(bot): +def setup(bot: Bot) -> None: +    """Snekbox cog load."""      bot.add_cog(Snekbox(bot))      log.info("Cog loaded: Snekbox") | 
