diff options
Diffstat (limited to 'bot')
64 files changed, 665 insertions, 1063 deletions
| diff --git a/bot/__init__.py b/bot/__init__.py index 9e0290a7..4729e50c 100644 --- a/bot/__init__.py +++ b/bot/__init__.py @@ -13,7 +13,7 @@ logging.TRACE = 5  logging.addLevelName(logging.TRACE, "TRACE") -def monkeypatch_trace(self, msg, *args, **kwargs): +def monkeypatch_trace(self: logging.Logger, msg: str, *args, **kwargs) -> None:      """      Log 'msg % args' with severity 'TRACE'. @@ -4,7 +4,7 @@ from traceback import format_exc  from typing import List  from aiohttp import AsyncResolver, ClientSession, TCPConnector -from discord import Embed +from discord import DiscordException, Embed  from discord.ext import commands  from bot.constants import Channels, Client @@ -23,7 +23,7 @@ class SeasonalBot(commands.Bot):              connector=TCPConnector(resolver=AsyncResolver(), family=socket.AF_INET)          ) -    def load_extensions(self, exts: List[str]): +    def load_extensions(self, exts: List[str]) -> None:          """Unload all current extensions, then load the given extensions."""          # Unload all cogs          extensions = list(self.extensions.keys()) @@ -40,7 +40,7 @@ class SeasonalBot(commands.Bot):              except Exception as e:                  log.error(f'Failed to load extension {cog}: {repr(e)} {format_exc()}') -    async def send_log(self, title: str, details: str = None, *, icon: str = None): +    async def send_log(self, title: str, details: str = None, *, icon: str = None) -> None:          """Send an embed message to the devlog channel."""          devlog = self.get_channel(Channels.devlog) @@ -56,7 +56,7 @@ class SeasonalBot(commands.Bot):          await devlog.send(embed=embed) -    async def on_command_error(self, context, exception): +    async def on_command_error(self, context: commands.Context, exception: DiscordException) -> None:          """Check command errors for UserInputError and reset the cooldown if thrown."""          if isinstance(exception, commands.UserInputError):              context.command.reset_cooldown(context) diff --git a/bot/decorators.py b/bot/decorators.py index 02cf4b8a..dbaad4a2 100644 --- a/bot/decorators.py +++ b/bot/decorators.py @@ -20,9 +20,9 @@ class InChannelCheckFailure(CheckFailure):      pass -def with_role(*role_ids: int): +def with_role(*role_ids: int) -> bool:      """Check to see whether the invoking user has any of the roles specified in role_ids.""" -    async def predicate(ctx: Context): +    async def predicate(ctx: Context) -> bool:          if not ctx.guild:  # Return False in a DM              log.debug(                  f"{ctx.author} tried to use the '{ctx.command.name}'command from a DM. " @@ -43,9 +43,9 @@ def with_role(*role_ids: int):      return commands.check(predicate) -def without_role(*role_ids: int): +def without_role(*role_ids: int) -> bool:      """Check whether the invoking user does not have all of the roles specified in role_ids.""" -    async def predicate(ctx: Context): +    async def predicate(ctx: Context) -> bool:          if not ctx.guild:  # Return False in a DM              log.debug(                  f"{ctx.author} tried to use the '{ctx.command.name}' command from a DM. " @@ -117,7 +117,7 @@ def override_in_channel(func: typing.Callable) -> typing.Callable:      return func -def locked(): +def locked() -> typing.Union[typing.Callable, None]:      """      Allows the user to only run one instance of the decorated command at a time. @@ -125,11 +125,11 @@ def locked():      This decorator has to go before (below) the `command` decorator.      """ -    def wrap(func): +    def wrap(func: typing.Callable) -> typing.Union[typing.Callable, None]:          func.__locks = WeakValueDictionary()          @wraps(func) -        async def inner(self, ctx, *args, **kwargs): +        async def inner(self: typing.Callable, ctx: Context, *args, **kwargs) -> typing.Union[typing.Callable, None]:              lock = func.__locks.setdefault(ctx.author.id, Lock())              if lock.locked():                  embed = Embed() diff --git a/bot/pagination.py b/bot/pagination.py index c12b6233..f1233482 100644 --- a/bot/pagination.py +++ b/bot/pagination.py @@ -24,7 +24,7 @@ class EmptyPaginatorEmbed(Exception):  class LinePaginator(Paginator):      """A class that aids in paginating code blocks for Discord messages.""" -    def __init__(self, prefix='```', suffix='```', max_size=2000, max_lines=None): +    def __init__(self, prefix: str = '```', suffix: str = '```', max_size: int = 2000, max_lines: int = None):          """          Overrides the Paginator.__init__ from inside discord.ext.commands. @@ -42,7 +42,7 @@ class LinePaginator(Paginator):          self._count = len(prefix) + 1  # prefix + newline          self._pages = [] -    def add_line(self, line='', *, empty=False): +    def add_line(self, line: str = '', *, empty: bool = False) -> None:          """          Adds a line to the current page. @@ -98,7 +98,7 @@ class LinePaginator(Paginator):          ...     ctx, embed          ... )          """ -        def event_check(reaction_: Reaction, user_: Member): +        def event_check(reaction_: Reaction, user_: Member) -> bool:              """Make sure that this reaction is what we want to operate on."""              no_restrictions = (                  # Pagination is not restricted @@ -274,7 +274,7 @@ class ImagePaginator(Paginator):      Refer to ImagePaginator.paginate for documentation on how to use.      """ -    def __init__(self, prefix="", suffix=""): +    def __init__(self, prefix: str = "", suffix: str = ""):          super().__init__(prefix, suffix)          self._current_page = [prefix]          self.images = [] diff --git a/bot/resources/halloween/github_links.json b/bot/resources/halloween/github_links.json index e69de29b..0967ef42 100644 --- a/bot/resources/halloween/github_links.json +++ b/bot/resources/halloween/github_links.json @@ -0,0 +1 @@ +{} diff --git a/bot/resources/persist/egg_hunt.sqlite b/bot/resources/persist/egg_hunt.sqliteBinary files differ deleted file mode 100644 index 6a7ae32d..00000000 --- a/bot/resources/persist/egg_hunt.sqlite +++ /dev/null diff --git a/bot/seasons/__init__.py b/bot/seasons/__init__.py index 1512fae2..7faf9164 100644 --- a/bot/seasons/__init__.py +++ b/bot/seasons/__init__.py @@ -1,5 +1,7 @@  import logging +from discord.ext import commands +  from bot.seasons.season import SeasonBase, SeasonManager, get_season  __all__ = ("SeasonBase", "get_season") @@ -7,6 +9,6 @@ __all__ = ("SeasonBase", "get_season")  log = logging.getLogger(__name__) -def setup(bot): +def setup(bot: commands.Bot) -> None:      bot.add_cog(SeasonManager(bot))      log.info("SeasonManager cog loaded") diff --git a/bot/seasons/christmas/__init__.py b/bot/seasons/christmas/__init__.py index 239181f4..ae93800e 100644 --- a/bot/seasons/christmas/__init__.py +++ b/bot/seasons/christmas/__init__.py @@ -18,7 +18,7 @@ class Christmas(SeasonBase):      greeting = "Happy Holidays!"      start_date = "01/12" -    end_date = "31/12" +    end_date = "01/01"      colour = Colours.dark_green      icon = ( diff --git a/bot/seasons/christmas/adventofcode.py b/bot/seasons/christmas/adventofcode.py index a9e72805..6609387e 100644 --- a/bot/seasons/christmas/adventofcode.py +++ b/bot/seasons/christmas/adventofcode.py @@ -46,7 +46,7 @@ def time_left_to_aoc_midnight() -> Tuple[datetime, timedelta]:      return tomorrow, tomorrow - datetime.now(EST) -async def countdown_status(bot: commands.Bot): +async def countdown_status(bot: commands.Bot) -> None:      """Set the playing status of the bot to the minutes & hours left until the next day's challenge."""      while is_in_advent():          _, time_left = time_left_to_aoc_midnight() @@ -73,7 +73,7 @@ async def countdown_status(bot: commands.Bot):          await asyncio.sleep(delay) -async def day_countdown(bot: commands.Bot): +async def day_countdown(bot: commands.Bot) -> None:      """      Calculate the number of seconds left until the next day of Advent. @@ -127,7 +127,7 @@ class AdventOfCode(commands.Cog):      @commands.group(name="adventofcode", aliases=("aoc",), invoke_without_command=True)      @override_in_channel -    async def adventofcode_group(self, ctx: commands.Context): +    async def adventofcode_group(self, ctx: commands.Context) -> None:          """All of the Advent of Code commands."""          await ctx.send_help(ctx.command) @@ -136,7 +136,7 @@ class AdventOfCode(commands.Cog):          aliases=("sub", "notifications", "notify", "notifs"),          brief="Notifications for new days"      ) -    async def aoc_subscribe(self, ctx: commands.Context): +    async def aoc_subscribe(self, ctx: commands.Context) -> None:          """Assign the role for notifications about new days being ready."""          role = ctx.guild.get_role(AocConfig.role_id)          unsubscribe_command = f"{ctx.prefix}{ctx.command.root_parent} unsubscribe" @@ -150,7 +150,7 @@ class AdventOfCode(commands.Cog):                             f"If you don't want them any more, run `{unsubscribe_command}` instead.")      @adventofcode_group.command(name="unsubscribe", aliases=("unsub",), brief="Notifications for new days") -    async def aoc_unsubscribe(self, ctx: commands.Context): +    async def aoc_unsubscribe(self, ctx: commands.Context) -> None:          """Remove the role for notifications about new days being ready."""          role = ctx.guild.get_role(AocConfig.role_id) @@ -161,7 +161,7 @@ class AdventOfCode(commands.Cog):              await ctx.send("Hey, you don't even get any notifications about new Advent of Code tasks currently anyway.")      @adventofcode_group.command(name="countdown", aliases=("count", "c"), brief="Return time left until next day") -    async def aoc_countdown(self, ctx: commands.Context): +    async def aoc_countdown(self, ctx: commands.Context) -> None:          """Return time left until next day."""          if not is_in_advent():              datetime_now = datetime.now(EST) @@ -178,12 +178,12 @@ class AdventOfCode(commands.Cog):          await ctx.send(f"There are {hours} hours and {minutes} minutes left until day {tomorrow.day}.")      @adventofcode_group.command(name="about", aliases=("ab", "info"), brief="Learn about Advent of Code") -    async def about_aoc(self, ctx: commands.Context): +    async def about_aoc(self, ctx: commands.Context) -> None:          """Respond with an explanation of all things Advent of Code."""          await ctx.send("", embed=self.cached_about_aoc)      @adventofcode_group.command(name="join", aliases=("j",), brief="Learn how to join PyDis' private AoC leaderboard") -    async def join_leaderboard(self, ctx: commands.Context): +    async def join_leaderboard(self, ctx: commands.Context) -> None:          """DM the user the information for joining the PyDis AoC private leaderboard."""          author = ctx.message.author          log.info(f"{author.name} ({author.id}) has requested the PyDis AoC leaderboard code") @@ -203,7 +203,7 @@ class AdventOfCode(commands.Cog):          aliases=("board", "lb"),          brief="Get a snapshot of the PyDis private AoC leaderboard",      ) -    async def aoc_leaderboard(self, ctx: commands.Context, number_of_people_to_display: int = 10): +    async def aoc_leaderboard(self, ctx: commands.Context, number_of_people_to_display: int = 10) -> None:          """          Pull the top number_of_people_to_display members from the PyDis leaderboard and post an embed. @@ -244,7 +244,7 @@ class AdventOfCode(commands.Cog):          aliases=("dailystats", "ds"),          brief="Get daily statistics for the PyDis private leaderboard"      ) -    async def private_leaderboard_daily_stats(self, ctx: commands.Context): +    async def private_leaderboard_daily_stats(self, ctx: commands.Context) -> None:          """          Respond with a table of the daily completion statistics for the PyDis private leaderboard. @@ -287,7 +287,7 @@ class AdventOfCode(commands.Cog):          aliases=("globalboard", "gb"),          brief="Get a snapshot of the global AoC leaderboard",      ) -    async def global_leaderboard(self, ctx: commands.Context, number_of_people_to_display: int = 10): +    async def global_leaderboard(self, ctx: commands.Context, number_of_people_to_display: int = 10) -> None:          """          Pull the top number_of_people_to_display members from the global AoC leaderboard and post an embed. @@ -319,7 +319,7 @@ class AdventOfCode(commands.Cog):              embed=aoc_embed,          ) -    async def _check_leaderboard_cache(self, ctx, global_board: bool = False): +    async def _check_leaderboard_cache(self, ctx: commands.Context, global_board: bool = False) -> None:          """          Check age of current leaderboard & pull a new one if the board is too old. @@ -359,7 +359,7 @@ class AdventOfCode(commands.Cog):              )      async def _check_n_entries(self, ctx: commands.Context, number_of_people_to_display: int) -> int: -        """Check for n > max_entries and n <= 0""" +        """Check for n > max_entries and n <= 0."""          max_entries = AocConfig.leaderboard_max_displayed_members          author = ctx.message.author          if not 0 <= number_of_people_to_display <= max_entries: @@ -390,7 +390,7 @@ class AdventOfCode(commands.Cog):          return about_embed -    async def _boardgetter(self, global_board: bool): +    async def _boardgetter(self, global_board: bool) -> None:          """Invoke the proper leaderboard getter based on the global_board boolean."""          if global_board:              self.cached_global_leaderboard = await AocGlobalLeaderboard.from_url() diff --git a/bot/seasons/christmas/hanukkah_embed.py b/bot/seasons/christmas/hanukkah_embed.py index 652a1f35..aaa02b27 100644 --- a/bot/seasons/christmas/hanukkah_embed.py +++ b/bot/seasons/christmas/hanukkah_embed.py @@ -1,5 +1,6 @@  import datetime  import logging +from typing import List  from discord import Embed  from discord.ext import commands @@ -13,7 +14,7 @@ log = logging.getLogger(__name__)  class HanukkahEmbed(commands.Cog):      """A cog that returns information about Hanukkah festival.""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot          self.url = ("https://www.hebcal.com/hebcal/?v=1&cfg=json&maj=on&min=on&mod=on&nx=on&"                      "year=now&month=x&ss=on&mf=on&c=on&geo=geoname&geonameid=3448439&m=50&s=on") @@ -21,7 +22,7 @@ class HanukkahEmbed(commands.Cog):          self.hanukkah_months = []          self.hanukkah_years = [] -    async def get_hanukkah_dates(self): +    async def get_hanukkah_dates(self) -> List[str]:          """Gets the dates for hanukkah festival."""          hanukkah_dates = []          async with self.bot.http_session.get(self.url) as response: @@ -34,7 +35,7 @@ class HanukkahEmbed(commands.Cog):          return hanukkah_dates      @commands.command(name='hanukkah', aliases=['chanukah']) -    async def hanukkah_festival(self, ctx): +    async def hanukkah_festival(self, ctx: commands.Context) -> None:          """Tells you about the Hanukkah Festivaltime of festival, festival day, etc)."""          hanukkah_dates = await self.get_hanukkah_dates()          self.hanukkah_dates_split(hanukkah_dates) @@ -98,7 +99,7 @@ class HanukkahEmbed(commands.Cog):              await ctx.send(embed=embed) -    def hanukkah_dates_split(self, hanukkah_dates): +    def hanukkah_dates_split(self, hanukkah_dates: List[str]) -> None:          """We are splitting the dates for hanukkah into days, months and years."""          for date in hanukkah_dates:              self.hanukkah_days.append(date[8:10]) @@ -106,7 +107,7 @@ class HanukkahEmbed(commands.Cog):              self.hanukkah_years.append(date[0:4]) -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Cog load."""      bot.add_cog(HanukkahEmbed(bot))      log.info("Hanukkah embed cog loaded") diff --git a/bot/seasons/easter/april_fools_vids.py b/bot/seasons/easter/april_fools_vids.py index d921d07c..4869f510 100644 --- a/bot/seasons/easter/april_fools_vids.py +++ b/bot/seasons/easter/april_fools_vids.py @@ -11,13 +11,13 @@ log = logging.getLogger(__name__)  class AprilFoolVideos(commands.Cog):      """A cog for April Fools' that gets a random April Fools' video from Youtube.""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot          self.yt_vids = self.load_json()          self.youtubers = ['google']  # will add more in future      @staticmethod -    def load_json(): +    def load_json() -> dict:          """A function to load JSON data."""          p = Path('bot/resources/easter/april_fools_vids.json')          with p.open() as json_file: @@ -25,7 +25,7 @@ class AprilFoolVideos(commands.Cog):          return all_vids      @commands.command(name='fool') -    async def aprial_fools(self, ctx): +    async def april_fools(self, ctx: commands.Context) -> None:          """Get a random April Fools' video from Youtube."""          random_youtuber = random.choice(self.youtubers)          category = self.yt_vids[random_youtuber] @@ -33,7 +33,7 @@ class AprilFoolVideos(commands.Cog):          await ctx.send(f"Check out this April Fools' video by {random_youtuber}.\n\n{random_vid['link']}") -def setup(bot): +def setup(bot: commands.Bot) -> None:      """April Fools' Cog load."""      bot.add_cog(AprilFoolVideos(bot))      log.info('April Fools videos cog loaded!') diff --git a/bot/seasons/easter/avatar_easterifier.py b/bot/seasons/easter/avatar_easterifier.py index ad8b5473..e21e35fc 100644 --- a/bot/seasons/easter/avatar_easterifier.py +++ b/bot/seasons/easter/avatar_easterifier.py @@ -2,7 +2,7 @@ import asyncio  import logging  from io import BytesIO  from pathlib import Path -from typing import Union +from typing import Tuple, Union  import discord  from PIL import Image @@ -21,11 +21,11 @@ COLOURS = [  class AvatarEasterifier(commands.Cog):      """Put an Easter spin on your avatar or image!""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot      @staticmethod -    def closest(x): +    def closest(x: Tuple[int, int, int]) -> Tuple[int, int, int]:          """          Finds the closest easter colour to a given pixel. @@ -33,8 +33,8 @@ class AvatarEasterifier(commands.Cog):          """          r1, g1, b1 = x -        def distance(point): -            """Finds the difference between a pastel colour and the original pixel colour""" +        def distance(point: Tuple[int, int, int]) -> Tuple[int, int, int]: +            """Finds the difference between a pastel colour and the original pixel colour."""              r2, g2, b2 = point              return ((r1 - r2)**2 + (g1 - g2)**2 + (b1 - b2)**2) @@ -47,7 +47,7 @@ class AvatarEasterifier(commands.Cog):          return (r, g, b)      @commands.command(pass_context=True, aliases=["easterify"]) -    async def avatareasterify(self, ctx, *colours: Union[discord.Colour, str]): +    async def avatareasterify(self, ctx: commands.Context, *colours: Union[discord.Colour, str]) -> None:          """          This "Easterifies" the user's avatar. @@ -56,7 +56,7 @@ class AvatarEasterifier(commands.Cog):          Colours are split by spaces, unless you wrap the colour name in double quotes.          Discord colour names, HTML colour names, XKCD colour names and hex values are accepted.          """ -        async def send(*args, **kwargs): +        async def send(*args, **kwargs) -> str:              """              This replaces the original ctx.send. @@ -123,7 +123,7 @@ class AvatarEasterifier(commands.Cog):          await ctx.send(file=file, embed=embed) -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Avatar Easterifier Cog load."""      bot.add_cog(AvatarEasterifier(bot))      log.info("AvatarEasterifier cog loaded") diff --git a/bot/seasons/easter/bunny_name_generator.py b/bot/seasons/easter/bunny_name_generator.py index 76d5c478..97c467e1 100644 --- a/bot/seasons/easter/bunny_name_generator.py +++ b/bot/seasons/easter/bunny_name_generator.py @@ -3,6 +3,7 @@ import logging  import random  import re  from pathlib import Path +from typing import List, Union  from discord.ext import commands @@ -15,21 +16,21 @@ with Path("bot/resources/easter/bunny_names.json").open("r", encoding="utf8") as  class BunnyNameGenerator(commands.Cog):      """Generate a random bunny name, or bunnify your Discord username!""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot -    def find_separators(self, displayname): +    def find_separators(self, displayname: str) -> Union[List[str], None]:          """Check if Discord name contains spaces so we can bunnify an individual word in the name."""          new_name = re.split(r'[_.\s]', displayname)          if displayname not in new_name:              return new_name -    def find_vowels(self, displayname): +    def find_vowels(self, displayname: str) -> str:          """          Finds vowels in the user's display name. -        If the Discord name contains a vowel and the letter y, -        it will match one or more of these patterns. +        If the Discord name contains a vowel and the letter y, it will match one or more of these patterns. +          Only the most recently matched pattern will apply the changes.          """          expressions = [ @@ -45,8 +46,8 @@ class BunnyNameGenerator(commands.Cog):              if new_name != displayname:                  return new_name -    def append_name(self, displayname): -        """Adds a suffix to the end of the Discord name""" +    def append_name(self, displayname: str) -> str: +        """Adds a suffix to the end of the Discord name."""          extensions = ['foot', 'ear', 'nose', 'tail']          suffix = random.choice(extensions)          appended_name = displayname + suffix @@ -54,13 +55,13 @@ class BunnyNameGenerator(commands.Cog):          return appended_name      @commands.command() -    async def bunnyname(self, ctx): -        """Picks a random bunny name from a JSON file""" +    async def bunnyname(self, ctx: commands.Context) -> None: +        """Picks a random bunny name from a JSON file."""          await ctx.send(random.choice(BUNNY_NAMES["names"]))      @commands.command() -    async def bunnifyme(self, ctx): -        """Gets your Discord username and bunnifies it""" +    async def bunnifyme(self, ctx: commands.Context) -> None: +        """Gets your Discord username and bunnifies it."""          username = ctx.message.author.display_name          # If name contains spaces or other separators, get the individual words to randomly bunnify @@ -86,7 +87,7 @@ class BunnyNameGenerator(commands.Cog):          await ctx.send(bunnified_name) -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Bunny Name Generator Cog load."""      bot.add_cog(BunnyNameGenerator(bot))      log.info("BunnyNameGenerator cog loaded.") diff --git a/bot/seasons/easter/conversationstarters.py b/bot/seasons/easter/conversationstarters.py index c2cdf26c..3f38ae82 100644 --- a/bot/seasons/easter/conversationstarters.py +++ b/bot/seasons/easter/conversationstarters.py @@ -14,16 +14,16 @@ with open(Path("bot/resources/easter/starter.json"), "r", encoding="utf8") as f:  class ConvoStarters(commands.Cog):      """Easter conversation topics.""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot      @commands.command() -    async def topic(self, ctx): +    async def topic(self, ctx: commands.Context) -> None:          """Responds with a random topic to start a conversation."""          await ctx.send(random.choice(starters['starters'])) -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Conversation starters Cog load."""      bot.add_cog(ConvoStarters(bot))      log.info("ConvoStarters cog loaded") diff --git a/bot/seasons/easter/easter_riddle.py b/bot/seasons/easter/easter_riddle.py index 56555586..4b98b204 100644 --- a/bot/seasons/easter/easter_riddle.py +++ b/bot/seasons/easter/easter_riddle.py @@ -20,14 +20,14 @@ TIMELIMIT = 10  class EasterRiddle(commands.Cog):      """This cog contains the command for the Easter quiz!""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot          self.winners = []          self.correct = ""          self.current_channel = None      @commands.command(aliases=["riddlemethis", "riddleme"]) -    async def riddle(self, ctx): +    async def riddle(self, ctx: commands.Context) -> None:          """          Gives a random riddle, then provides 2 hints at certain intervals before revealing the answer. @@ -83,8 +83,8 @@ class EasterRiddle(commands.Cog):          self.current_channel = None      @commands.Cog.listener() -    async def on_message(self, message): -        """If a non-bot user enters a correct answer, their username gets added to self.winners""" +    async def on_message(self, message: discord.Messaged) -> None: +        """If a non-bot user enters a correct answer, their username gets added to self.winners."""          if self.current_channel != message.channel:              return @@ -95,7 +95,7 @@ class EasterRiddle(commands.Cog):              self.winners.append(message.author.mention) -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Easter Riddle Cog load."""      bot.add_cog(EasterRiddle(bot))      log.info("Easter Riddle bot loaded") diff --git a/bot/seasons/easter/egg_decorating.py b/bot/seasons/easter/egg_decorating.py index ee8a80e5..51f52264 100644 --- a/bot/seasons/easter/egg_decorating.py +++ b/bot/seasons/easter/egg_decorating.py @@ -31,11 +31,11 @@ IRREPLACEABLE = [  class EggDecorating(commands.Cog):      """Decorate some easter eggs!""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot) -> None:          self.bot = bot      @staticmethod -    def replace_invalid(colour: str): +    def replace_invalid(colour: str) -> Union[int, None]:          """Attempts to match with HTML or XKCD colour names, returning the int value."""          with suppress(KeyError):              return int(HTML_COLOURS[colour], 16) @@ -44,7 +44,9 @@ class EggDecorating(commands.Cog):          return None      @commands.command(aliases=["decorateegg"]) -    async def eggdecorate(self, ctx, *colours: Union[discord.Colour, str]): +    async def eggdecorate( +        self, ctx: commands.Context, *colours: Union[discord.Colour, str] +    ) -> Union[Image, discord.Message]:          """          Picks a random egg design and decorates it using the given colours. @@ -111,7 +113,7 @@ class EggDecorating(commands.Cog):          return new_im -def setup(bot): +def setup(bot: commands.bot) -> None:      """Egg decorating Cog load."""      bot.add_cog(EggDecorating(bot))      log.info("EggDecorating cog loaded.") diff --git a/bot/seasons/easter/egg_facts.py b/bot/seasons/easter/egg_facts.py index ae08ccd4..9e6fb1cb 100644 --- a/bot/seasons/easter/egg_facts.py +++ b/bot/seasons/easter/egg_facts.py @@ -21,18 +21,18 @@ class EasterFacts(commands.Cog):      It also contains a background task which sends an easter egg fact in the event channel everyday.      """ -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot          self.facts = self.load_json()      @staticmethod -    def load_json(): +    def load_json() -> dict:          """Load a list of easter egg facts from the resource JSON file."""          p = Path("bot/resources/easter/easter_egg_facts.json")          with p.open(encoding="utf8") as f:              return load(f) -    async def send_egg_fact_daily(self): +    async def send_egg_fact_daily(self) -> None:          """A background task that sends an easter egg fact in the event channel everyday."""          channel = self.bot.get_channel(Channels.seasonalbot_chat)          while True: @@ -41,12 +41,12 @@ class EasterFacts(commands.Cog):              await asyncio.sleep(24 * 60 * 60)      @commands.command(name='eggfact', aliases=['fact']) -    async def easter_facts(self, ctx): +    async def easter_facts(self, ctx: commands.Context) -> None:          """Get easter egg facts."""          embed = self.make_embed()          await ctx.send(embed=embed) -    def make_embed(self): +    def make_embed(self) -> discord.Embed:          """Makes a nice embed for the message to be sent."""          return discord.Embed(              colour=Colours.soft_red, @@ -55,7 +55,7 @@ class EasterFacts(commands.Cog):          ) -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Easter Egg facts cog load."""      bot.loop.create_task(EasterFacts(bot).send_egg_fact_daily())      bot.add_cog(EasterFacts(bot)) diff --git a/bot/seasons/easter/egg_hunt/__init__.py b/bot/seasons/easter/egg_hunt/__init__.py deleted file mode 100644 index 0e4b9e16..00000000 --- a/bot/seasons/easter/egg_hunt/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -import logging - -from .cog import EggHunt - -log = logging.getLogger(__name__) - - -def setup(bot): -    """Easter Egg Hunt Cog load.""" -    bot.add_cog(EggHunt()) -    log.info("EggHunt cog loaded") diff --git a/bot/seasons/easter/egg_hunt/cog.py b/bot/seasons/easter/egg_hunt/cog.py deleted file mode 100644 index a4ad27df..00000000 --- a/bot/seasons/easter/egg_hunt/cog.py +++ /dev/null @@ -1,618 +0,0 @@ -import asyncio -import contextlib -import logging -import random -import sqlite3 -from datetime import datetime, timezone -from pathlib import Path - -import discord -from discord.ext import commands - -from bot.bot import bot -from bot.constants import Channels, Client, Roles as MainRoles -from bot.decorators import with_role -from .constants import Colours, EggHuntSettings, Emoji, Roles - -log = logging.getLogger(__name__) - -DB_PATH = Path("bot/resources/persist/egg_hunt.sqlite") - -TEAM_MAP = { -    Roles.white: Emoji.egg_white, -    Roles.blurple: Emoji.egg_blurple, -    Emoji.egg_white: Roles.white, -    Emoji.egg_blurple: Roles.blurple -} - -GUILD = bot.get_guild(Client.guild) - -MUTED = GUILD.get_role(MainRoles.muted) - - -def get_team_role(user: discord.Member) -> discord.Role: -    """Helper function to get the team role for a member.""" -    if Roles.white in user.roles: -        return Roles.white -    if Roles.blurple in user.roles: -        return Roles.blurple - - -async def assign_team(user: discord.Member) -> discord.Member: -    """Helper function to assign a new team role for a member.""" -    db = sqlite3.connect(DB_PATH) -    c = db.cursor() -    c.execute(f"SELECT team FROM user_scores WHERE user_id = {user.id}") -    result = c.fetchone() -    if not result: -        c.execute( -            "SELECT team, COUNT(*) AS count FROM user_scores " -            "GROUP BY team ORDER BY count ASC LIMIT 1;" -        ) -        result = c.fetchone() -        result = result[0] if result else "WHITE" - -    if result[0] == "WHITE": -        new_team = Roles.white -    else: -        new_team = Roles.blurple - -    db.close() - -    log.debug(f"Assigned role {new_team} to {user}.") - -    await user.add_roles(new_team) -    return GUILD.get_member(user.id) - - -class EggMessage: -    """Handles a single egg reaction drop session.""" - -    def __init__(self, message: discord.Message, egg: discord.Emoji): -        self.message = message -        self.egg = egg -        self.first = None -        self.users = set() -        self.teams = {Roles.white: "WHITE", Roles.blurple: "BLURPLE"} -        self.new_team_assignments = {} -        self.timeout_task = None - -    @staticmethod -    def add_user_score_sql(user_id: int, team: str, score: int) -> str: -        """Builds the SQL for adding a score to a user in the database.""" -        return ( -            "INSERT INTO user_scores(user_id, team, score)" -            f"VALUES({user_id}, '{team}', {score})" -            f"ON CONFLICT (user_id) DO UPDATE SET score=score+{score}" -        ) - -    @staticmethod -    def add_team_score_sql(team_name: str, score: int) -> str: -        """Builds the SQL for adding a score to a team in the database.""" -        return f"UPDATE team_scores SET team_score=team_score+{score} WHERE team_id='{team_name}'" - -    def finalise_score(self): -        """Sums and actions scoring for this egg drop session.""" -        db = sqlite3.connect(DB_PATH) -        c = db.cursor() - -        team_scores = {"WHITE": 0, "BLURPLE": 0} - -        first_team = get_team_role(self.first) -        if not first_team: -            log.debug("User without team role!") -            db.close() -            return - -        score = 3 if first_team == TEAM_MAP[first_team] else 2 - -        c.execute(self.add_user_score_sql(self.first.id, self.teams[first_team], score)) -        team_scores[self.teams[first_team]] += score - -        for user in self.users: -            team = get_team_role(user) -            if not team: -                log.debug("User without team role!") -                continue - -            team_name = self.teams[team] -            team_scores[team_name] += 1 -            score = 2 if team == first_team else 1 -            c.execute(self.add_user_score_sql(user.id, team_name, score)) - -        for team_name, score in team_scores.items(): -            if not score: -                continue -            c.execute(self.add_team_score_sql(team_name, score)) - -        db.commit() -        db.close() - -        log.debug( -            f"EggHunt session finalising: ID({self.message.id}) " -            f"FIRST({self.first}) REST({self.users})." -        ) - -    async def start_timeout(self, seconds: int = 5): -        """Begins a task that will sleep until the given seconds before finalizing the session.""" -        if self.timeout_task: -            self.timeout_task.cancel() -            self.timeout_task = None - -        await asyncio.sleep(seconds) - -        bot.remove_listener(self.collect_reacts, name="on_reaction_add") - -        with contextlib.suppress(discord.Forbidden): -            await self.message.clear_reactions() - -        if self.first: -            self.finalise_score() - -    def is_valid_react(self, reaction: discord.Reaction, user: discord.Member) -> bool: -        """Validates a reaction event was meant for this session.""" -        if user.bot: -            return False -        if reaction.message.id != self.message.id: -            return False -        if reaction.emoji != self.egg: -            return False - -        # Ignore the punished -        if MUTED in user.roles: -            return False - -        return True - -    async def collect_reacts(self, reaction: discord.Reaction, user: discord.Member): -        """Handles emitted reaction_add events via listener.""" -        if not self.is_valid_react(reaction, user): -            return - -        team = get_team_role(user) -        if not team: -            log.debug(f"Assigning a team for {user}.") -            user = await assign_team(user) - -        if not self.first: -            log.debug(f"{user} was first to react to egg on {self.message.id}.") -            self.first = user -            await self.start_timeout() -        else: -            if user != self.first: -                self.users.add(user) - -    async def start(self): -        """Starts the egg drop session.""" -        log.debug(f"EggHunt session started for message {self.message.id}.") -        bot.add_listener(self.collect_reacts, name="on_reaction_add") -        with contextlib.suppress(discord.Forbidden): -            await self.message.add_reaction(self.egg) -        self.timeout_task = asyncio.create_task(self.start_timeout(300)) -        while True: -            if not self.timeout_task: -                break -            if not self.timeout_task.done(): -                await self.timeout_task -            else: -                # make sure any exceptions raise if necessary -                self.timeout_task.result() -                break - - -class SuperEggMessage(EggMessage): -    """Handles a super egg session.""" - -    def __init__(self, message: discord.Message, egg: discord.Emoji, window: int): -        super().__init__(message, egg) -        self.window = window - -    async def finalise_score(self): -        """Sums and actions scoring for this super egg session.""" -        try: -            message = await self.message.channel.fetch_message(self.message.id) -        except discord.NotFound: -            return - -        count = 0 -        white = 0 -        blurple = 0 -        react_users = [] -        for reaction in message.reactions: -            if reaction.emoji == self.egg: -                react_users = await reaction.users().flatten() -                for user in react_users: -                    team = get_team_role(user) -                    if team == Roles.white: -                        white += 1 -                    elif team == Roles.blurple: -                        blurple += 1 -                count = reaction.count - 1 -                break - -        score = 50 if self.egg == Emoji.egg_gold else 100 -        if white == blurple: -            log.debug("Tied SuperEgg Result.") -            team = None -            score /= 2 -        elif white > blurple: -            team = Roles.white -        else: -            team = Roles.blurple - -        embed = self.message.embeds[0] - -        db = sqlite3.connect(DB_PATH) -        c = db.cursor() - -        user_bonus = 5 if self.egg == Emoji.egg_gold else 10 -        for user in react_users: -            if user.bot: -                continue -            role = get_team_role(user) -            if not role: -                print("issue") -            user_score = 1 if user != self.first else user_bonus -            c.execute(self.add_user_score_sql(user.id, self.teams[role], user_score)) - -        if not team: -            embed.description = f"{embed.description}\n\nA Tie!\nBoth got {score} points!" -            c.execute(self.add_team_score_sql(self.teams[Roles.white], score)) -            c.execute(self.add_team_score_sql(self.teams[Roles.blurple], score)) -            team_name = "TIE" -        else: -            team_name = self.teams[team] -            embed.description = ( -                f"{embed.description}\n\nTeam {team_name.capitalize()} won the points!" -            ) -            c.execute(self.add_team_score_sql(team_name, score)) - -        c.execute( -            "INSERT INTO super_eggs (message_id, egg_type, team, window) " -            f"VALUES ({self.message.id}, '{self.egg.name}', '{team_name}', {self.window});" -        ) - -        log.debug("Committing Super Egg scores.") -        db.commit() -        db.close() - -        embed.set_footer(text=f"Finished with {count} total reacts.") -        with contextlib.suppress(discord.HTTPException): -            await self.message.edit(embed=embed) - -    async def start_timeout(self, seconds=None): -        """Starts the super egg session.""" -        if not seconds: -            return -        count = 4 -        for _ in range(count): -            await asyncio.sleep(60) -            embed = self.message.embeds[0] -            embed.set_footer(text=f"Finishing in {count} minutes.") -            try: -                await self.message.edit(embed=embed) -            except discord.HTTPException: -                break -            count -= 1 -        bot.remove_listener(self.collect_reacts, name="on_reaction_add") -        await self.finalise_score() - - -class EggHunt(commands.Cog): -    """Easter Egg Hunt Event.""" - -    def __init__(self): -        self.event_channel = GUILD.get_channel(Channels.seasonalbot_chat) -        self.super_egg_buffer = 60*60 -        self.tables = { -            "super_eggs": ( -                "CREATE TABLE super_eggs (" -                "message_id INTEGER NOT NULL " -                "  CONSTRAINT super_eggs_pk PRIMARY KEY, " -                "egg_type   TEXT    NOT NULL, " -                "team       TEXT    NOT NULL, " -                "window     INTEGER);" -            ), -            "team_scores": ( -                "CREATE TABLE team_scores (" -                "team_id TEXT, " -                "team_score INTEGER DEFAULT 0);" -            ), -            "user_scores": ( -                "CREATE TABLE user_scores(" -                "user_id INTEGER NOT NULL " -                "  CONSTRAINT user_scores_pk PRIMARY KEY, " -                "team TEXT NOT NULL, " -                "score INTEGER DEFAULT 0 NOT NULL);" -            ), -            "react_logs": ( -                "CREATE TABLE react_logs(" -                "member_id INTEGER NOT NULL, " -                "message_id INTEGER NOT NULL, " -                "reaction_id TEXT NOT NULL, " -                "react_timestamp REAL NOT NULL);" -            ) -        } -        self.prepare_db() -        self.task = asyncio.create_task(self.super_egg()) -        self.task.add_done_callback(self.task_cleanup) - -    def prepare_db(self): -        """Ensures database tables all exist and if not, creates them.""" -        db = sqlite3.connect(DB_PATH) -        c = db.cursor() - -        exists_sql = "SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}';" - -        missing_tables = [] -        for table in self.tables: -            c.execute(exists_sql.format(table_name=table)) -            result = c.fetchone() -            if not result: -                missing_tables.append(table) - -        for table in missing_tables: -            log.info(f"Table {table} is missing, building new one.") -            c.execute(self.tables[table]) - -        db.commit() -        db.close() - -    def task_cleanup(self, task): -        """Returns task result and restarts. Used as a done callback to show raised exceptions.""" -        task.result() -        self.task = asyncio.create_task(self.super_egg()) - -    @staticmethod -    def current_timestamp() -> float: -        """Returns a timestamp of the current UTC time.""" -        return datetime.utcnow().replace(tzinfo=timezone.utc).timestamp() - -    async def super_egg(self): -        """Manages the timing of super egg drops.""" -        while True: -            now = int(self.current_timestamp()) - -            if now > EggHuntSettings.end_time: -                log.debug("Hunt ended. Ending task.") -                break - -            if now < EggHuntSettings.start_time: -                remaining = EggHuntSettings.start_time - now -                log.debug(f"Hunt not started yet. Sleeping for {remaining}.") -                await asyncio.sleep(remaining) - -            log.debug(f"Hunt started.") - -            db = sqlite3.connect(DB_PATH) -            c = db.cursor() - -            current_window = None -            next_window = None -            windows = EggHuntSettings.windows.copy() -            windows.insert(0, EggHuntSettings.start_time) -            for i, window in enumerate(windows): -                c.execute(f"SELECT COUNT(*) FROM super_eggs WHERE window={window}") -                already_dropped = c.fetchone()[0] - -                if already_dropped: -                    log.debug(f"Window {window} already dropped, checking next one.") -                    continue - -                if now < window: -                    log.debug("Drop windows up to date, sleeping until next one.") -                    await asyncio.sleep(window-now) -                    now = int(self.current_timestamp()) - -                current_window = window -                next_window = windows[i+1] -                break - -            count = c.fetchone() -            db.close() - -            if not current_window: -                log.debug("No drop windows left, ending task.") -                break - -            log.debug(f"Current Window: {current_window}. Next Window {next_window}") - -            if not count: -                if next_window < now: -                    log.debug("An Egg Drop Window was missed, dropping one now.") -                    next_drop = 0 -                else: -                    next_drop = random.randrange(now, next_window) - -                if next_drop: -                    log.debug(f"Sleeping until next super egg drop: {next_drop}.") -                    await asyncio.sleep(next_drop) - -                if random.randrange(10) <= 2: -                    egg = Emoji.egg_diamond -                    egg_type = "Diamond" -                    score = "100" -                    colour = Colours.diamond -                else: -                    egg = Emoji.egg_gold -                    egg_type = "Gold" -                    score = "50" -                    colour = Colours.gold - -                embed = discord.Embed( -                    title=f"A {egg_type} Egg Has Appeared!", -                    description=f"**Worth {score} team points!**\n\n" -                                "The team with the most reactions after 5 minutes wins!", -                    colour=colour -                ) -                embed.set_thumbnail(url=egg.url) -                embed.set_footer(text="Finishing in 5 minutes.") -                msg = await self.event_channel.send(embed=embed) -                await SuperEggMessage(msg, egg, current_window).start() - -            log.debug("Sleeping until next window.") -            next_loop = max(next_window - int(self.current_timestamp()), self.super_egg_buffer) -            await asyncio.sleep(next_loop) - -    @commands.Cog.listener() -    async def on_raw_reaction_add(self, payload): -        """Reaction event listener for reaction logging for later anti-cheat analysis.""" -        if payload.channel_id not in EggHuntSettings.allowed_channels: -            return - -        now = self.current_timestamp() -        db = sqlite3.connect(DB_PATH) -        c = db.cursor() -        c.execute( -            "INSERT INTO react_logs(member_id, message_id, reaction_id, react_timestamp) " -            f"VALUES({payload.user_id}, {payload.message_id}, '{payload.emoji}', {now})" -        ) -        db.commit() -        db.close() - -    @commands.Cog.listener() -    async def on_message(self, message): -        """Message event listener for random egg drops.""" -        if self.current_timestamp() < EggHuntSettings.start_time: -            return - -        if message.channel.id not in EggHuntSettings.allowed_channels: -            log.debug("Message not in Egg Hunt channel; ignored.") -            return - -        if message.author.bot: -            return - -        if random.randrange(100) <= 5: -            await EggMessage(message, random.choice([Emoji.egg_white, Emoji.egg_blurple])).start() - -    @commands.group(invoke_without_command=True) -    async def hunt(self, ctx): -        """ -        For 48 hours, hunt down as many eggs randomly appearing as possible. - -        Standard Eggs -        -------------- -        Egg React: +1pt -        Team Bonus for Claimed Egg: +1pt -        First React on Other Team Egg: +1pt -        First React on Your Team Egg: +2pt - -        If you get first react, you will claim that egg for your team, allowing -        your team to get the Team Bonus point, but be quick, as the egg will -        disappear after 5 seconds of the first react. - -        Super Eggs -        ----------- -        Gold Egg: 50 team pts, 5pts to first react -        Diamond Egg: 100 team pts, 10pts to first react - -        Super Eggs only appear in #seasonalbot-chat so be sure to keep an eye -        out. They stay around for 5 minutes and the team with the most reacts -        wins the points. -        """ -        await ctx.invoke(bot.get_command("help"), command="hunt") - -    @hunt.command() -    async def countdown(self, ctx): -        """Show the time status of the Egg Hunt event.""" -        now = self.current_timestamp() -        if now > EggHuntSettings.end_time: -            return await ctx.send("The Hunt has ended.") - -        difference = EggHuntSettings.start_time - now -        if difference < 0: -            difference = EggHuntSettings.end_time - now -            msg = "The Egg Hunt will end in" -        else: -            msg = "The Egg Hunt will start in" - -        hours, r = divmod(difference, 3600) -        minutes, r = divmod(r, 60) -        await ctx.send(f"{msg} {hours:.0f}hrs, {minutes:.0f}mins & {r:.0f}secs") - -    @hunt.command() -    async def leaderboard(self, ctx): -        """Show the Egg Hunt Leaderboards.""" -        db = sqlite3.connect(DB_PATH) -        c = db.cursor() -        c.execute(f"SELECT *, RANK() OVER(ORDER BY score DESC) AS rank FROM user_scores LIMIT 10") -        user_result = c.fetchall() -        c.execute(f"SELECT * FROM team_scores ORDER BY team_score DESC") -        team_result = c.fetchall() -        db.close() -        output = [] -        if user_result: -            # Get the alignment needed for the score -            score_lengths = [] -            for result in user_result: -                length = len(str(result[2])) -                score_lengths.append(length) - -            score_length = max(score_lengths) -            for user_id, team, score, rank in user_result: -                user = GUILD.get_member(user_id) or user_id -                team = team.capitalize() -                score = f"{score}pts" -                output.append(f"{rank:>2}. {score:>{score_length+3}} - {user} ({team})") -            user_board = "\n".join(output) -        else: -            user_board = "No entries." -        if team_result: -            output = [] -            for team, score in team_result: -                output.append(f"{team:<7}: {score}") -            team_board = "\n".join(output) -        else: -            team_board = "No entries." -        embed = discord.Embed( -            title="Egg Hunt Leaderboards", -            description=f"**Team Scores**\n```\n{team_board}\n```\n" -                        f"**Top 10 Members**\n```\n{user_board}\n```" -        ) -        await ctx.send(embed=embed) - -    @hunt.command() -    async def rank(self, ctx, *, member: discord.Member = None): -        """Get your ranking in the Egg Hunt Leaderboard.""" -        member = member or ctx.author -        db = sqlite3.connect(DB_PATH) -        c = db.cursor() -        c.execute( -            "SELECT rank FROM " -            "(SELECT RANK() OVER(ORDER BY score DESC) AS rank, user_id FROM user_scores)" -            f"WHERE user_id = {member.id};" -        ) -        result = c.fetchone() -        db.close() -        if not result: -            embed = discord.Embed().set_author(name=f"Egg Hunt - No Ranking") -        else: -            embed = discord.Embed().set_author(name=f"Egg Hunt - Rank #{result[0]}") -        await ctx.send(embed=embed) - -    @with_role(MainRoles.admin) -    @hunt.command() -    async def clear_db(self, ctx): -        """Resets the database to it's initial state.""" -        def check(msg): -            if msg.author != ctx.author: -                return False -            if msg.channel != ctx.channel: -                return False -            return True -        await ctx.send( -            "WARNING: This will delete all current event data.\n" -            "Please verify this action by replying with 'Yes, I want to delete all data.'" -        ) -        reply_msg = await bot.wait_for('message', check=check) -        if reply_msg.content != "Yes, I want to delete all data.": -            return await ctx.send("Reply did not match. Aborting database deletion.") -        db = sqlite3.connect(DB_PATH) -        c = db.cursor() -        c.execute("DELETE FROM super_eggs;") -        c.execute("DELETE FROM user_scores;") -        c.execute("UPDATE team_scores SET team_score=0") -        db.commit() -        db.close() -        await ctx.send("Database successfully cleared.") diff --git a/bot/seasons/easter/egg_hunt/constants.py b/bot/seasons/easter/egg_hunt/constants.py deleted file mode 100644 index 02f6e9f2..00000000 --- a/bot/seasons/easter/egg_hunt/constants.py +++ /dev/null @@ -1,40 +0,0 @@ -import os - -from discord import Colour - -from bot.bot import bot -from bot.constants import Channels, Client - - -GUILD = bot.get_guild(Client.guild) - - -class EggHuntSettings: -    start_time = int(os.environ["HUNT_START"]) -    end_time = start_time + 172800  # 48 hrs later -    windows = [int(w) for w in os.environ.get("HUNT_WINDOWS").split(',')] or [] -    allowed_channels = [ -        Channels.seasonalbot_chat, -        Channels.off_topic_0, -        Channels.off_topic_1, -        Channels.off_topic_2, -    ] - - -class Roles: -    white = GUILD.get_role(569304397054607363) -    blurple = GUILD.get_role(569304472820514816) - - -class Emoji: -    egg_white = bot.get_emoji(569266762428841989) -    egg_blurple = bot.get_emoji(569266666094067819) -    egg_gold = bot.get_emoji(569266900106739712) -    egg_diamond = bot.get_emoji(569266839738384384) - - -class Colours: -    white = Colour(0xFFFFFF) -    blurple = Colour(0x7289DA) -    gold = Colour(0xE4E415) -    diamond = Colour(0xECF5FF) diff --git a/bot/seasons/easter/egghead_quiz.py b/bot/seasons/easter/egghead_quiz.py index 3e0cc598..bd179fe2 100644 --- a/bot/seasons/easter/egghead_quiz.py +++ b/bot/seasons/easter/egghead_quiz.py @@ -3,6 +3,7 @@ import logging  import random  from json import load  from pathlib import Path +from typing import Union  import discord  from discord.ext import commands @@ -30,14 +31,14 @@ TIMELIMIT = 30  class EggheadQuiz(commands.Cog):      """This cog contains the command for the Easter quiz!""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot) -> None:          self.bot = bot          self.quiz_messages = {}      @commands.command(aliases=["eggheadquiz", "easterquiz"]) -    async def eggquiz(self, ctx): +    async def eggquiz(self, ctx: commands.Context) -> None:          """ -        Gives a random quiz question, waits 30 seconds and then outputs the answer +        Gives a random quiz question, waits 30 seconds and then outputs the answer.          Also informs of the percentages and votes of each option          """ @@ -95,14 +96,14 @@ class EggheadQuiz(commands.Cog):          await ctx.send(content, embed=a_embed)      @staticmethod -    async def already_reacted(message, user): -        """Returns whether a given user has reacted more than once to a given message""" +    async def already_reacted(message: discord.Message, user: Union[discord.Member, discord.User]) -> bool: +        """Returns whether a given user has reacted more than once to a given message."""          users = [u.id for reaction in [await r.users().flatten() for r in message.reactions] for u in reaction]          return users.count(user.id) > 1  # Old reaction plus new reaction      @commands.Cog.listener() -    async def on_reaction_add(self, reaction, user): -        """Listener to listen specifically for reactions of quiz messages""" +    async def on_reaction_add(self, reaction: discord.Reaction, user: Union[discord.Member, discord.User]) -> None: +        """Listener to listen specifically for reactions of quiz messages."""          if user.bot:              return          if reaction.message.id not in self.quiz_messages: @@ -113,7 +114,7 @@ class EggheadQuiz(commands.Cog):              return await reaction.message.remove_reaction(reaction, user) -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Egghead Quiz Cog load."""      bot.add_cog(EggheadQuiz(bot))      log.info("EggheadQuiz bot loaded") diff --git a/bot/seasons/easter/traditions.py b/bot/seasons/easter/traditions.py index f04b8828..9529823f 100644 --- a/bot/seasons/easter/traditions.py +++ b/bot/seasons/easter/traditions.py @@ -14,18 +14,18 @@ with open(Path("bot/resources/easter/traditions.json"), "r", encoding="utf8") as  class Traditions(commands.Cog):      """A cog which allows users to get a random easter tradition or custom from a random country.""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot      @commands.command(aliases=('eastercustoms',)) -    async def easter_tradition(self, ctx): -        """Responds with a random tradition or custom""" +    async def easter_tradition(self, ctx: commands.Context) -> None: +        """Responds with a random tradition or custom."""          random_country = random.choice(list(traditions))          await ctx.send(f"{random_country}:\n{traditions[random_country]}") -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Traditions Cog load."""      bot.add_cog(Traditions(bot))      log.info("Traditions cog loaded") diff --git a/bot/seasons/evergreen/8bitify.py b/bot/seasons/evergreen/8bitify.py index 54db71db..60062fc1 100644 --- a/bot/seasons/evergreen/8bitify.py +++ b/bot/seasons/evergreen/8bitify.py @@ -13,17 +13,17 @@ class EightBitify(commands.Cog):      @staticmethod      def pixelate(image: Image) -> Image: -        """Takes an image and pixelates it""" +        """Takes an image and pixelates it."""          return image.resize((32, 32)).resize((1024, 1024))      @staticmethod      def quantize(image: Image) -> Image: -        """Reduces colour palette to 256 colours""" +        """Reduces colour palette to 256 colours."""          return image.quantize(colors=32)      @commands.command(name="8bitify")      async def eightbit_command(self, ctx: commands.Context) -> None: -        """Pixelates your avatar and changes the palette to an 8bit one""" +        """Pixelates your avatar and changes the palette to an 8bit one."""          async with ctx.typing():              image_bytes = await ctx.author.avatar_url.read()              avatar = Image.open(BytesIO(image_bytes)) diff --git a/bot/seasons/evergreen/error_handler.py b/bot/seasons/evergreen/error_handler.py index 6690cf89..120462ee 100644 --- a/bot/seasons/evergreen/error_handler.py +++ b/bot/seasons/evergreen/error_handler.py @@ -4,7 +4,7 @@ import random  import sys
  import traceback
 -from discord import Colour, Embed
 +from discord import Colour, Embed, Message
  from discord.ext import commands
  from bot.constants import NEGATIVE_REPLIES
 @@ -16,11 +16,11 @@ log = logging.getLogger(__name__)  class CommandErrorHandler(commands.Cog):
      """A error handler for the PythonDiscord server."""
 -    def __init__(self, bot):
 +    def __init__(self, bot: commands.Bot):
          self.bot = bot
      @staticmethod
 -    def revert_cooldown_counter(command, message):
 +    def revert_cooldown_counter(command: commands.Command, message: Message) -> None:
          """Undoes the last cooldown counter for user-error cases."""
          if command._buckets.valid:
              bucket = command._buckets.get_bucket(message)
 @@ -30,7 +30,7 @@ class CommandErrorHandler(commands.Cog):              )
      @commands.Cog.listener()
 -    async def on_command_error(self, ctx, error):
 +    async def on_command_error(self, ctx: commands.Context, error: commands.CommandError) -> None:
          """Activates when a command opens an error."""
          if hasattr(ctx.command, 'on_error'):
              return logging.debug(
 @@ -113,7 +113,7 @@ class CommandErrorHandler(commands.Cog):          traceback.print_exception(type(error), error, error.__traceback__, file=sys.stderr)
 -def setup(bot):
 +def setup(bot: commands.Bot) -> None:
      """Error handler Cog load."""
      bot.add_cog(CommandErrorHandler(bot))
      log.info("CommandErrorHandler cog loaded")
 diff --git a/bot/seasons/evergreen/fun.py b/bot/seasons/evergreen/fun.py index ce3484f7..889ae079 100644 --- a/bot/seasons/evergreen/fun.py +++ b/bot/seasons/evergreen/fun.py @@ -1,21 +1,39 @@ +import functools  import logging  import random +from typing import Callable, Tuple, Union +from discord import Embed, Message  from discord.ext import commands +from discord.ext.commands import Bot, Cog, Context, MessageConverter +from bot import utils  from bot.constants import Emojis  log = logging.getLogger(__name__) +UWU_WORDS = { +    "fi": "fwi", +    "l": "w", +    "r": "w", +    "some": "sum", +    "th": "d", +    "thing": "fing", +    "tho": "fo", +    "you're": "yuw'we", +    "your": "yur", +    "you": "yuw", +} -class Fun(commands.Cog): + +class Fun(Cog):      """A collection of general commands for fun.""" -    def __init__(self, bot): +    def __init__(self, bot: Bot) -> None:          self.bot = bot      @commands.command() -    async def roll(self, ctx, num_rolls: int = 1): +    async def roll(self, ctx: Context, num_rolls: int = 1) -> None:          """Outputs a number of random dice emotes (up to 6)."""          output = ""          if num_rolls > 6: @@ -27,8 +45,104 @@ class Fun(commands.Cog):              output += getattr(Emojis, terning, '')          await ctx.send(output) +    @commands.command(name="uwu", aliases=("uwuwize", "uwuify",)) +    async def uwu_command(self, ctx: Context, *, text: str) -> None: +        """ +        Converts a given `text` into it's uwu equivalent. + +        Also accepts a valid discord Message ID or link. +        """ +        conversion_func = functools.partial( +            utils.replace_many, replacements=UWU_WORDS, ignore_case=True, match_case=True +        ) +        text, embed = await Fun._get_text_and_embed(ctx, text) +        # Convert embed if it exists +        if embed is not None: +            embed = Fun._convert_embed(conversion_func, embed) +        converted_text = conversion_func(text) +        # Don't put >>> if only embed present +        if converted_text: +            converted_text = f">>> {converted_text.lstrip('> ')}" +        await ctx.send(content=converted_text, embed=embed) + +    @commands.command(name="randomcase", aliases=("rcase", "randomcaps", "rcaps",)) +    async def randomcase_command(self, ctx: Context, *, text: str) -> None: +        """ +        Randomly converts the casing of a given `text`. + +        Also accepts a valid discord Message ID or link. +        """ +        def conversion_func(text: str) -> str: +            """Randomly converts the casing of a given string.""" +            return "".join( +                char.upper() if round(random.random()) else char.lower() for char in text +            ) +        text, embed = await Fun._get_text_and_embed(ctx, text) +        # Convert embed if it exists +        if embed is not None: +            embed = Fun._convert_embed(conversion_func, embed) +        converted_text = conversion_func(text) +        # Don't put >>> if only embed present +        if converted_text: +            converted_text = f">>> {converted_text.lstrip('> ')}" +        await ctx.send(content=converted_text, embed=embed) + +    @staticmethod +    async def _get_text_and_embed(ctx: Context, text: str) -> Tuple[str, Union[Embed, None]]: +        """ +        Attempts to extract the text and embed from a possible link to a discord Message. + +        Returns a tuple of: +            str: If `text` is a valid discord Message, the contents of the message, else `text`. +            Union[Embed, None]: The embed if found in the valid Message, else None +        """ +        embed = None +        message = await Fun._get_discord_message(ctx, text) +        if isinstance(message, Message): +            text = message.content +            # Take first embed because we can't send multiple embeds +            if message.embeds: +                embed = message.embeds[0] +        return (text, embed) + +    @staticmethod +    async def _get_discord_message(ctx: Context, text: str) -> Union[Message, str]: +        """ +        Attempts to convert a given `text` to a discord Message object and return it. + +        Conversion will succeed if given a discord Message ID or link. +        Returns `text` if the conversion fails. +        """ +        try: +            text = await MessageConverter().convert(ctx, text) +        except commands.BadArgument: +            log.debug(f"Input '{text:.20}...' is not a valid Discord Message") +        return text + +    @staticmethod +    def _convert_embed(func: Callable[[str, ], str], embed: Embed) -> Embed: +        """ +        Converts the text in an embed using a given conversion function, then return the embed. + +        Only modifies the following fields: title, description, footer, fields +        """ +        embed_dict = embed.to_dict() + +        embed_dict["title"] = func(embed_dict.get("title", "")) +        embed_dict["description"] = func(embed_dict.get("description", "")) + +        if "footer" in embed_dict: +            embed_dict["footer"]["text"] = func(embed_dict["footer"].get("text", "")) + +        if "fields" in embed_dict: +            for field in embed_dict["fields"]: +                field["name"] = func(field.get("name", "")) +                field["value"] = func(field.get("value", "")) + +        return Embed.from_dict(embed_dict) + -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Fun Cog load."""      bot.add_cog(Fun(bot))      log.info("Fun cog loaded") diff --git a/bot/seasons/evergreen/issues.py b/bot/seasons/evergreen/issues.py index f19a1129..0ba74d9c 100644 --- a/bot/seasons/evergreen/issues.py +++ b/bot/seasons/evergreen/issues.py @@ -12,12 +12,14 @@ log = logging.getLogger(__name__)  class Issues(commands.Cog):      """Cog that allows users to retrieve issues from GitHub.""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot      @commands.command(aliases=("issues",))      @override_in_channel -    async def issue(self, ctx, number: int, repository: str = "seasonalbot", user: str = "python-discord"): +    async def issue( +        self, ctx: commands.Context, number: int, repository: str = "seasonalbot", user: str = "python-discord" +    ) -> None:          """Command to retrieve issues from a GitHub repository."""          api_url = f"https://api.github.com/repos/{user}/{repository}/issues/{number}"          failed_status = { @@ -49,7 +51,7 @@ class Issues(commands.Cog):          await ctx.send(embed=issue_embed) -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Github Issues Cog Load."""      bot.add_cog(Issues(bot))      log.info("Issues cog loaded") diff --git a/bot/seasons/evergreen/magic_8ball.py b/bot/seasons/evergreen/magic_8ball.py index 55652af7..e47ef454 100644 --- a/bot/seasons/evergreen/magic_8ball.py +++ b/bot/seasons/evergreen/magic_8ball.py @@ -11,13 +11,13 @@ log = logging.getLogger(__name__)  class Magic8ball(commands.Cog):      """A Magic 8ball command to respond to a user's question.""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot          with open(Path("bot/resources/evergreen/magic8ball.json"), "r") as file:              self.answers = json.load(file)      @commands.command(name="8ball") -    async def output_answer(self, ctx, *, question): +    async def output_answer(self, ctx: commands.Context, *, question: str) -> None:          """Return a Magic 8ball answer from answers list."""          if len(question.split()) >= 3:              answer = random.choice(self.answers) @@ -26,7 +26,7 @@ class Magic8ball(commands.Cog):              await ctx.send("Usage: .8ball <question> (minimum length of 3 eg: `will I win?`)") -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Magic 8ball Cog load."""      bot.add_cog(Magic8ball(bot))      log.info("Magic8ball cog loaded") diff --git a/bot/seasons/evergreen/minesweeper.py b/bot/seasons/evergreen/minesweeper.py index cb859ea9..b0ba8145 100644 --- a/bot/seasons/evergreen/minesweeper.py +++ b/bot/seasons/evergreen/minesweeper.py @@ -32,8 +32,8 @@ log = logging.getLogger(__name__)  class CoordinateConverter(commands.Converter):      """Converter for Coordinates.""" -    async def convert(self, ctx, coordinate: str) -> typing.Tuple[int, int]: -        """Take in a coordinate string and turn it into x, y""" +    async def convert(self, ctx: commands.Context, coordinate: str) -> typing.Tuple[int, int]: +        """Take in a coordinate string and turn it into an (x, y) tuple."""          if not 2 <= len(coordinate) <= 3:              raise commands.BadArgument('Invalid co-ordinate provided') @@ -80,8 +80,8 @@ class Minesweeper(commands.Cog):          self.games: GamesDict = {}  # Store the currently running games      @commands.group(name='minesweeper', aliases=('ms',), invoke_without_command=True) -    async def minesweeper_group(self, ctx: commands.Context): -        """Commands for Playing Minesweeper""" +    async def minesweeper_group(self, ctx: commands.Context) -> None: +        """Commands for Playing Minesweeper."""          await ctx.send_help(ctx.command)      @staticmethod @@ -175,7 +175,7 @@ class Minesweeper(commands.Cog):      @commands.dm_only()      @minesweeper_group.command(name="flag")      async def flag_command(self, ctx: commands.Context, *coordinates: CoordinateConverter) -> None: -        """Place multiple flags on the board""" +        """Place multiple flags on the board."""          board: GameBoard = self.games[ctx.author.id].revealed          for x, y in coordinates:              if board[y][x] == "hidden": @@ -185,14 +185,14 @@ class Minesweeper(commands.Cog):      @staticmethod      def reveal_bombs(revealed: GameBoard, board: GameBoard) -> None: -        """Reveals all the bombs""" +        """Reveals all the bombs."""          for y, row in enumerate(board):              for x, cell in enumerate(row):                  if cell == "bomb":                      revealed[y][x] = cell      async def lost(self, ctx: commands.Context) -> None: -        """The player lost the game""" +        """The player lost the game."""          game = self.games[ctx.author.id]          self.reveal_bombs(game.revealed, game.board)          await ctx.author.send(":fire: You lost! :fire:") @@ -200,7 +200,7 @@ class Minesweeper(commands.Cog):              await game.chat_msg.channel.send(f":fire: {ctx.author.mention} just lost Minesweeper! :fire:")      async def won(self, ctx: commands.Context) -> None: -        """The player won the game""" +        """The player won the game."""          game = self.games[ctx.author.id]          await ctx.author.send(":tada: You won! :tada:")          if game.activated_on_server: @@ -215,8 +215,8 @@ class Minesweeper(commands.Cog):              if board[y_][x_] == 0:                  self.reveal_zeros(revealed, board, x_, y_) -    async def check_if_won(self, ctx, revealed: GameBoard, board: GameBoard) -> bool: -        """Checks if a player has won""" +    async def check_if_won(self, ctx: commands.Context, revealed: GameBoard, board: GameBoard) -> bool: +        """Checks if a player has won."""          if any(              revealed[y][x] in ["hidden", "flag"] and board[y][x] != "bomb"              for x in range(10) @@ -252,7 +252,7 @@ class Minesweeper(commands.Cog):      @commands.dm_only()      @minesweeper_group.command(name="reveal")      async def reveal_command(self, ctx: commands.Context, *coordinates: CoordinateConverter) -> None: -        """Reveal multiple cells""" +        """Reveal multiple cells."""          game = self.games[ctx.author.id]          revealed: GameBoard = game.revealed          board: GameBoard = game.board @@ -267,8 +267,8 @@ class Minesweeper(commands.Cog):              await self.update_boards(ctx)      @minesweeper_group.command(name="end") -    async def end_command(self, ctx: commands.Context): -        """End your current game""" +    async def end_command(self, ctx: commands.Context) -> None: +        """End your current game."""          game = self.games[ctx.author.id]          game.revealed = game.board          await self.update_boards(ctx) diff --git a/bot/seasons/evergreen/showprojects.py b/bot/seasons/evergreen/showprojects.py index 37809b33..a943e548 100644 --- a/bot/seasons/evergreen/showprojects.py +++ b/bot/seasons/evergreen/showprojects.py @@ -1,5 +1,6 @@  import logging +from discord import Message  from discord.ext import commands  from bot.constants import Channels @@ -8,15 +9,15 @@ log = logging.getLogger(__name__)  class ShowProjects(commands.Cog): -    """Cog that reacts to posts in the #show-your-projects""" +    """Cog that reacts to posts in the #show-your-projects.""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot          self.lastPoster = 0  # Given 0 as the default last poster ID as no user can actually have 0 assigned to them      @commands.Cog.listener() -    async def on_message(self, message): -        """Adds reactions to posts in #show-your-projects""" +    async def on_message(self, message: Message) -> None: +        """Adds reactions to posts in #show-your-projects."""          reactions = ["\U0001f44d", "\U00002764", "\U0001f440", "\U0001f389", "\U0001f680", "\U00002b50", "\U0001f6a9"]          if (message.channel.id == Channels.show_your_projects                  and message.author.bot is False @@ -27,7 +28,7 @@ class ShowProjects(commands.Cog):              self.lastPoster = message.author.id -def setup(bot): -    """Show Projects Reaction Cog""" +def setup(bot: commands.Bot) -> None: +    """Show Projects Reaction Cog."""      bot.add_cog(ShowProjects(bot))      log.info("ShowProjects cog loaded") diff --git a/bot/seasons/evergreen/snakes/__init__.py b/bot/seasons/evergreen/snakes/__init__.py index d0e57dae..d7f9f20c 100644 --- a/bot/seasons/evergreen/snakes/__init__.py +++ b/bot/seasons/evergreen/snakes/__init__.py @@ -1,11 +1,13 @@  import logging +from discord.ext import commands +  from bot.seasons.evergreen.snakes.snakes_cog import Snakes  log = logging.getLogger(__name__) -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Snakes Cog load."""      bot.add_cog(Snakes(bot))      log.info("Snakes cog loaded") diff --git a/bot/seasons/evergreen/snakes/converter.py b/bot/seasons/evergreen/snakes/converter.py index f2637530..57103b57 100644 --- a/bot/seasons/evergreen/snakes/converter.py +++ b/bot/seasons/evergreen/snakes/converter.py @@ -1,9 +1,10 @@  import json  import logging  import random +from typing import Iterable, List  import discord -from discord.ext.commands import Converter +from discord.ext.commands import Context, Converter  from fuzzywuzzy import fuzz  from bot.seasons.evergreen.snakes.utils import SNAKE_RESOURCES @@ -18,7 +19,7 @@ class Snake(Converter):      snakes = None      special_cases = None -    async def convert(self, ctx, name): +    async def convert(self, ctx: Context, name: str) -> str:          """Convert the input snake name to the closest matching Snake object."""          await self.build_list()          name = name.lower() @@ -26,7 +27,7 @@ class Snake(Converter):          if name == 'python':              return 'Python (programming language)' -        def get_potential(iterable, *, threshold=80): +        def get_potential(iterable: Iterable, *, threshold: int = 80) -> List[str]:              nonlocal name              potential = [] @@ -58,7 +59,7 @@ class Snake(Converter):          return names.get(name, name)      @classmethod -    async def build_list(cls): +    async def build_list(cls) -> None:          """Build list of snakes from the static snake resources."""          # Get all the snakes          if cls.snakes is None: @@ -72,7 +73,7 @@ class Snake(Converter):              cls.special_cases = {snake['name'].lower(): snake for snake in special_cases}      @classmethod -    async def random(cls): +    async def random(cls) -> str:          """          Get a random Snake from the loaded resources. diff --git a/bot/seasons/evergreen/snakes/snakes_cog.py b/bot/seasons/evergreen/snakes/snakes_cog.py index 38878706..1ed38f86 100644 --- a/bot/seasons/evergreen/snakes/snakes_cog.py +++ b/bot/seasons/evergreen/snakes/snakes_cog.py @@ -9,13 +9,13 @@ import textwrap  import urllib  from functools import partial  from io import BytesIO -from typing import Any, Dict +from typing import Any, Dict, List  import aiohttp  import async_timeout  from PIL import Image, ImageDraw, ImageFont  from discord import Colour, Embed, File, Member, Message, Reaction -from discord.ext.commands import BadArgument, Bot, Cog, Context, bot_has_permissions, group +from discord.ext.commands import BadArgument, Bot, Cog, CommandError, Context, bot_has_permissions, group  from bot.constants import ERROR_REPLIES, Tokens  from bot.decorators import locked @@ -154,7 +154,7 @@ class Snakes(Cog):      # region: Helper methods      @staticmethod -    def _beautiful_pastel(hue): +    def _beautiful_pastel(hue: float) -> int:          """Returns random bright pastels."""          light = random.uniform(0.7, 0.85)          saturation = 1 @@ -250,7 +250,7 @@ class Snakes(Cog):          return buffer      @staticmethod -    def _snakify(message): +    def _snakify(message: str) -> str:          """Sssnakifffiesss a sstring."""          # Replace fricatives with exaggerated snake fricatives.          simple_fricatives = [ @@ -272,7 +272,7 @@ class Snakes(Cog):          return message -    async def _fetch(self, session, url, params=None): +    async def _fetch(self, session: aiohttp.ClientSession, url: str, params: dict = None) -> dict:          """Asynchronous web request helper method."""          if params is None:              params = {} @@ -281,7 +281,7 @@ class Snakes(Cog):              async with session.get(url, params=params) as response:                  return await response.json() -    def _get_random_long_message(self, messages, retries=10): +    def _get_random_long_message(self, messages: List[str], retries: int = 10) -> str:          """          Fetch a message that's at least 3 words long, if possible to do so in retries attempts. @@ -403,9 +403,9 @@ class Snakes(Cog):          """Gets a random snake name."""          return random.choice(self.snake_names) -    async def _validate_answer(self, ctx: Context, message: Message, answer: str, options: list): +    async def _validate_answer(self, ctx: Context, message: Message, answer: str, options: list) -> None:          """Validate the answer using a reaction event loop.""" -        def predicate(reaction, user): +        def predicate(reaction: Reaction, user: Member) -> bool:              """Test if the the answer is valid and can be evaluated."""              return (                  reaction.message.id == message.id                  # The reaction is attached to the question we asked. @@ -436,14 +436,14 @@ class Snakes(Cog):      # region: Commands      @group(name='snakes', aliases=('snake',), invoke_without_command=True) -    async def snakes_group(self, ctx: Context): +    async def snakes_group(self, ctx: Context) -> None:          """Commands from our first code jam."""          await ctx.send_help(ctx.command)      @bot_has_permissions(manage_messages=True)      @snakes_group.command(name='antidote')      @locked() -    async def antidote_command(self, ctx: Context): +    async def antidote_command(self, ctx: Context) -> None:          """          Antidote! Can you create the antivenom before the patient dies? @@ -458,7 +458,7 @@ class Snakes(Cog):          This game was created by Lord Bisk and Runew0lf.          """ -        def predicate(reaction_: Reaction, user_: Member): +        def predicate(reaction_: Reaction, user_: Member) -> bool:              """Make sure that this reaction is what we want to operate on."""              return (                  all(( @@ -584,7 +584,7 @@ class Snakes(Cog):          await board_id.clear_reactions()      @snakes_group.command(name='draw') -    async def draw_command(self, ctx: Context): +    async def draw_command(self, ctx: Context) -> None:          """          Draws a random snek using Perlin noise. @@ -672,7 +672,7 @@ class Snakes(Cog):      @snakes_group.command(name='guess', aliases=('identify',))      @locked() -    async def guess_command(self, ctx): +    async def guess_command(self, ctx: Context) -> None:          """          Snake identifying game. @@ -706,7 +706,7 @@ class Snakes(Cog):          await self._validate_answer(ctx, guess, answer, options)      @snakes_group.command(name='hatch') -    async def hatch_command(self, ctx: Context): +    async def hatch_command(self, ctx: Context) -> None:          """          Hatches your personal snake. @@ -737,7 +737,7 @@ class Snakes(Cog):          await ctx.channel.send(embed=my_snake_embed)      @snakes_group.command(name='movie') -    async def movie_command(self, ctx: Context): +    async def movie_command(self, ctx: Context) -> None:          """          Gets a random snake-related movie from OMDB. @@ -807,7 +807,7 @@ class Snakes(Cog):      @snakes_group.command(name='quiz')      @locked() -    async def quiz_command(self, ctx: Context): +    async def quiz_command(self, ctx: Context) -> None:          """          Asks a snake-related question in the chat and validates the user's guess. @@ -832,7 +832,7 @@ class Snakes(Cog):          await self._validate_answer(ctx, quiz, answer, options)      @snakes_group.command(name='name', aliases=('name_gen',)) -    async def name_command(self, ctx: Context, *, name: str = None): +    async def name_command(self, ctx: Context, *, name: str = None) -> None:          """          Snakifies a username. @@ -904,7 +904,7 @@ class Snakes(Cog):      @snakes_group.command(name='sal')      @locked() -    async def sal_command(self, ctx: Context): +    async def sal_command(self, ctx: Context) -> None:          """          Play a game of Snakes and Ladders. @@ -922,7 +922,7 @@ class Snakes(Cog):          await game.open_game()      @snakes_group.command(name='about') -    async def about_command(self, ctx: Context): +    async def about_command(self, ctx: Context) -> None:          """Show an embed with information about the event, its participants, and its winners."""          contributors = [              "<@!245270749919576066>", @@ -965,7 +965,7 @@ class Snakes(Cog):          await ctx.channel.send(embed=embed)      @snakes_group.command(name='card') -    async def card_command(self, ctx: Context, *, name: Snake = None): +    async def card_command(self, ctx: Context, *, name: Snake = None) -> None:          """          Create an interesting little card from a snake. @@ -1003,7 +1003,7 @@ class Snakes(Cog):          )      @snakes_group.command(name='fact') -    async def fact_command(self, ctx: Context): +    async def fact_command(self, ctx: Context) -> None:          """          Gets a snake-related fact. @@ -1019,7 +1019,7 @@ class Snakes(Cog):          await ctx.channel.send(embed=embed)      @snakes_group.command(name='snakify') -    async def snakify_command(self, ctx: Context, *, message: str = None): +    async def snakify_command(self, ctx: Context, *, message: str = None) -> None:          """          How would I talk if I were a snake? @@ -1060,7 +1060,7 @@ class Snakes(Cog):              await ctx.channel.send(embed=embed)      @snakes_group.command(name='video', aliases=('get_video',)) -    async def video_command(self, ctx: Context, *, search: str = None): +    async def video_command(self, ctx: Context, *, search: str = None) -> None:          """          Gets a YouTube video about snakes. @@ -1100,7 +1100,7 @@ class Snakes(Cog):              log.warning(f"YouTube API error. Full response looks like {response}")      @snakes_group.command(name='zen') -    async def zen_command(self, ctx: Context): +    async def zen_command(self, ctx: Context) -> None:          """          Gets a random quote from the Zen of Python, except as if spoken by a snake. @@ -1127,7 +1127,7 @@ class Snakes(Cog):      @get_command.error      @card_command.error      @video_command.error -    async def command_error(self, ctx, error): +    async def command_error(self, ctx: Context, error: CommandError) -> None:          """Local error handler for the Snake Cog."""          embed = Embed()          embed.colour = Colour.red() diff --git a/bot/seasons/evergreen/snakes/utils.py b/bot/seasons/evergreen/snakes/utils.py index e8d2ee44..7d6caf04 100644 --- a/bot/seasons/evergreen/snakes/utils.py +++ b/bot/seasons/evergreen/snakes/utils.py @@ -11,7 +11,9 @@ from typing import List, Tuple  from PIL import Image  from PIL.ImageDraw import ImageDraw  from discord import File, Member, Reaction -from discord.ext.commands import Context +from discord.ext.commands import Cog, Context + +from bot.constants import Roles  SNAKE_RESOURCES = Path("bot/resources/snakes").absolute() @@ -116,12 +118,12 @@ def get_resource(file: str) -> List[dict]:          return json.load(snakefile) -def smoothstep(t): +def smoothstep(t: float) -> float:      """Smooth curve with a zero derivative at 0 and 1, making it useful for interpolating."""      return t * t * (3. - 2. * t) -def lerp(t, a, b): +def lerp(t: float, a: float, b: float) -> float:      """Linear interpolation between a and b, given a fraction t."""      return a + t * (b - a) @@ -138,7 +140,7 @@ class PerlinNoiseFactory(object):      Licensed under ISC      """ -    def __init__(self, dimension, octaves=1, tile=(), unbias=False): +    def __init__(self, dimension: int, octaves: int = 1, tile: Tuple[int] = (), unbias: bool = False):          """          Create a new Perlin noise factory in the given number of dimensions. @@ -152,7 +154,7 @@ class PerlinNoiseFactory(object):          This will produce noise that tiles every 3 units vertically, but never tiles horizontally. -        If ``unbias`` is true, the smoothstep function will be applied to the output before returning +        If ``unbias`` is True, the smoothstep function will be applied to the output before returning          it, to counteract some of Perlin noise's significant bias towards the center of its output range.          """          self.dimension = dimension @@ -166,7 +168,7 @@ class PerlinNoiseFactory(object):          self.gradient = {} -    def _generate_gradient(self): +    def _generate_gradient(self) -> Tuple[float, ...]:          """          Generate a random unit vector at each grid point. @@ -186,7 +188,7 @@ class PerlinNoiseFactory(object):          scale = sum(n * n for n in random_point) ** -0.5          return tuple(coord * scale for coord in random_point) -    def get_plain_noise(self, *point): +    def get_plain_noise(self, *point) -> float:          """Get plain noise for a single point, without taking into account either octaves or tiling."""          if len(point) != self.dimension:              raise ValueError("Expected {0} values, got {1}".format( @@ -234,7 +236,7 @@ class PerlinNoiseFactory(object):          return dots[0] * self.scale_factor -    def __call__(self, *point): +    def __call__(self, *point) -> float:          """          Get the value of this Perlin noise function at the given point. @@ -367,7 +369,7 @@ GAME_SCREEN_EMOJI = [  class SnakeAndLaddersGame:      """Snakes and Ladders game Cog.""" -    def __init__(self, snakes, context: Context): +    def __init__(self, snakes: Cog, context: Context):          self.snakes = snakes          self.ctx = context          self.channel = self.ctx.channel @@ -382,14 +384,13 @@ class SnakeAndLaddersGame:          self.positions = None          self.rolls = [] -    async def open_game(self): +    async def open_game(self) -> None:          """          Create a new Snakes and Ladders game. -        Listen for reactions until players have joined, -        and the game has been started. +        Listen for reactions until players have joined, and the game has been started.          """ -        def startup_event_check(reaction_: Reaction, user_: Member): +        def startup_event_check(reaction_: Reaction, user_: Member) -> bool:              """Make sure that this reaction is what we want to operate on."""              return (                  all(( @@ -412,7 +413,6 @@ class SnakeAndLaddersGame:              "**Snakes and Ladders**: A new game is about to start!",              file=File(                  str(SNAKE_RESOURCES / "snakes_and_ladders" / "banner.jpg"), -                # os.path.join("bot", "resources", "snakes", "snakes_and_ladders", "banner.jpg"),                  filename='Snakes and Ladders.jpg'              )          ) @@ -435,8 +435,9 @@ class SnakeAndLaddersGame:                  if reaction.emoji == JOIN_EMOJI:                      await self.player_join(user)                  elif reaction.emoji == CANCEL_EMOJI: -                    if self.ctx.author == user: -                        await self.cancel_game(user) +                    if user == self.author or (self._is_moderator(user) and user not in self.players): +                        # Allow game author or non-playing moderation staff to cancel a waiting game +                        await self.cancel_game()                          return                      else:                          await self.player_leave(user) @@ -451,10 +452,11 @@ class SnakeAndLaddersGame:              except asyncio.TimeoutError:                  log.debug("Snakes and Ladders timed out waiting for a reaction") -                self.cancel_game(self.author) +                await self.cancel_game()                  return  # We're done, no reactions for the last 5 minutes -    async def _add_player(self, user: Member): +    async def _add_player(self, user: Member) -> None: +        """Add player to game."""          self.players.append(user)          self.player_tiles[user.id] = 1 @@ -462,7 +464,7 @@ class SnakeAndLaddersGame:          im = Image.open(io.BytesIO(avatar_bytes)).resize((BOARD_PLAYER_SIZE, BOARD_PLAYER_SIZE))          self.avatar_images[user.id] = im -    async def player_join(self, user: Member): +    async def player_join(self, user: Member) -> None:          """          Handle players joining the game. @@ -488,20 +490,16 @@ class SnakeAndLaddersGame:              delete_after=10          ) -    async def player_leave(self, user: Member): +    async def player_leave(self, user: Member) -> bool:          """          Handle players leaving the game. -        Leaving is prevented if the user initiated the game or if they weren't part of it in the -        first place. +        Leaving is prevented if the user wasn't part of the game. + +        If the number of players reaches 0, the game is terminated. In this case, a sentinel boolean +        is returned True to prevent a game from continuing after it's destroyed.          """ -        if user == self.author: -            await self.channel.send( -                user.mention + " You are the author, and cannot leave the game. Execute " -                "`sal cancel` to cancel the game.", -                delete_after=10 -            ) -            return +        is_surrendered = False  # Sentinel value to assist with stopping a surrendered game          for p in self.players:              if user == p:                  self.players.remove(p) @@ -512,47 +510,43 @@ class SnakeAndLaddersGame:                      delete_after=10                  ) -                if self.state != 'waiting' and len(self.players) == 1: +                if self.state != 'waiting' and len(self.players) == 0:                      await self.channel.send("**Snakes and Ladders**: The game has been surrendered!") +                    is_surrendered = True                      self._destruct() -                return -        await self.channel.send(user.mention + " You are not in the match.", delete_after=10) -    async def cancel_game(self, user: Member): -        """Allow the game author to cancel the running game.""" -        if not user == self.author: -            await self.channel.send(user.mention + " Only the author of the game can cancel it.", delete_after=10) -            return +                return is_surrendered +        else: +            await self.channel.send(user.mention + " You are not in the match.", delete_after=10) +            return is_surrendered + +    async def cancel_game(self) -> None: +        """Cancel the running game."""          await self.channel.send("**Snakes and Ladders**: Game has been canceled.")          self._destruct() -    async def start_game(self, user: Member): +    async def start_game(self, user: Member) -> None:          """          Allow the game author to begin the game. -        The game cannot be started if there aren't enough players joined or if the game is in a -        waiting state. +        The game cannot be started if the game is in a waiting state.          """          if not user == self.author:              await self.channel.send(user.mention + " Only the author of the game can start it.", delete_after=10)              return -        if len(self.players) < 1: -            await self.channel.send( -                user.mention + " A minimum of 2 players is required to start the game.", -                delete_after=10 -            ) -            return +          if not self.state == 'waiting':              await self.channel.send(user.mention + " The game cannot be started at this time.", delete_after=10)              return +          self.state = 'starting'          player_list = ', '.join(user.mention for user in self.players)          await self.channel.send("**Snakes and Ladders**: The game is starting!\nPlayers: " + player_list)          await self.start_round() -    async def start_round(self): +    async def start_round(self) -> None:          """Begin the round.""" -        def game_event_check(reaction_: Reaction, user_: Member): +        def game_event_check(reaction_: Reaction, user_: Member) -> bool:              """Make sure that this reaction is what we want to operate on."""              return (                  all(( @@ -565,8 +559,6 @@ class SnakeAndLaddersGame:          self.state = 'roll'          for user in self.players:              self.round_has_rolled[user.id] = False -        # board_img = Image.open(os.path.join( -        #     "bot", "resources", "snakes", "snakes_and_ladders", "board.jpg"))          board_img = Image.open(str(SNAKE_RESOURCES / "snakes_and_ladders" / "board.jpg"))          player_row_size = math.ceil(MAX_PLAYERS / 2) @@ -612,6 +604,7 @@ class SnakeAndLaddersGame:          for emoji in GAME_SCREEN_EMOJI:              await self.positions.add_reaction(emoji) +        is_surrendered = False          while True:              try:                  reaction, user = await self.ctx.bot.wait_for( @@ -623,11 +616,12 @@ class SnakeAndLaddersGame:                  if reaction.emoji == ROLL_EMOJI:                      await self.player_roll(user)                  elif reaction.emoji == CANCEL_EMOJI: -                    if self.ctx.author == user: -                        await self.cancel_game(user) +                    if self._is_moderator(user) and user not in self.players: +                        # Only allow non-playing moderation staff to cancel a running game +                        await self.cancel_game()                          return                      else: -                        await self.player_leave(user) +                        is_surrendered = await self.player_leave(user)                  await self.positions.remove_reaction(reaction.emoji, user) @@ -636,13 +630,16 @@ class SnakeAndLaddersGame:              except asyncio.TimeoutError:                  log.debug("Snakes and Ladders timed out waiting for a reaction") -                await self.cancel_game(self.author) +                await self.cancel_game()                  return  # We're done, no reactions for the last 5 minutes          # Round completed -        await self._complete_round() +        # Check to see if the game was surrendered before completing the round, without this +        # sentinel, the game object would be deleted but the next round still posted into purgatory +        if not is_surrendered: +            await self._complete_round() -    async def player_roll(self, user: Member): +    async def player_roll(self, user: Member) -> None:          """Handle the player's roll."""          if user.id not in self.player_tiles:              await self.channel.send(user.mention + " You are not in the match.", delete_after=10) @@ -674,7 +671,8 @@ class SnakeAndLaddersGame:          self.player_tiles[user.id] = min(100, next_tile)          self.round_has_rolled[user.id] = True -    async def _complete_round(self): +    async def _complete_round(self) -> None: +        """At the conclusion of a round check to see if there's been a winner."""          self.state = 'post_round'          # check for winner @@ -689,22 +687,30 @@ class SnakeAndLaddersGame:          self._destruct()      def _check_winner(self) -> Member: +        """Return a winning member if we're in the post-round state and there's a winner."""          if self.state != 'post_round':              return None          return next((player for player in self.players if self.player_tiles[player.id] == 100),                      None) -    def _check_all_rolled(self): +    def _check_all_rolled(self) -> bool: +        """Check if all members have made their roll."""          return all(rolled for rolled in self.round_has_rolled.values()) -    def _destruct(self): +    def _destruct(self) -> None: +        """Clean up the finished game object."""          del self.snakes.active_sal[self.channel] -    def _board_coordinate_from_index(self, index: int): -        # converts the tile number to the x/y coordinates for graphical purposes +    def _board_coordinate_from_index(self, index: int) -> Tuple[int, int]: +        """Convert the tile number to the x/y coordinates for graphical purposes."""          y_level = 9 - math.floor((index - 1) / 10)          is_reversed = math.floor((index - 1) / 10) % 2 != 0          x_level = (index - 1) % 10          if is_reversed:              x_level = 9 - x_level          return x_level, y_level + +    @staticmethod +    def _is_moderator(user: Member) -> bool: +        """Return True if the user is a Moderator.""" +        return any(Roles.moderator == role.id for role in user.roles) diff --git a/bot/seasons/evergreen/speedrun.py b/bot/seasons/evergreen/speedrun.py index f6a43a63..76c5e8d3 100644 --- a/bot/seasons/evergreen/speedrun.py +++ b/bot/seasons/evergreen/speedrun.py @@ -13,16 +13,16 @@ with Path('bot/resources/evergreen/speedrun_links.json').open(encoding="utf-8")  class Speedrun(commands.Cog):      """Commands about the video game speedrunning community.""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot      @commands.command(name="speedrun") -    async def get_speedrun(self, ctx): +    async def get_speedrun(self, ctx: commands.Context) -> None:          """Sends a link to a video of a random speedrun."""          await ctx.send(choice(LINKS)) -def setup(bot): -    """Load the Speedrun cog""" +def setup(bot: commands.Bot) -> None: +    """Load the Speedrun cog."""      bot.add_cog(Speedrun(bot))      log.info("Speedrun cog loaded") diff --git a/bot/seasons/evergreen/uptime.py b/bot/seasons/evergreen/uptime.py index 92066e0a..6f24f545 100644 --- a/bot/seasons/evergreen/uptime.py +++ b/bot/seasons/evergreen/uptime.py @@ -12,11 +12,11 @@ log = logging.getLogger(__name__)  class Uptime(commands.Cog):      """A cog for posting the bot's uptime.""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot      @commands.command(name="uptime") -    async def uptime(self, ctx): +    async def uptime(self, ctx: commands.Context) -> None:          """Responds with the uptime of the bot."""          difference = relativedelta(start_time - arrow.utcnow())          uptime_string = start_time.shift( @@ -28,7 +28,7 @@ class Uptime(commands.Cog):          await ctx.send(f"I started up {uptime_string}.") -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Uptime Cog load."""      bot.add_cog(Uptime(bot))      log.info("Uptime cog loaded") diff --git a/bot/seasons/halloween/8ball.py b/bot/seasons/halloween/8ball.py index faf59ca9..2e1c2804 100644 --- a/bot/seasons/halloween/8ball.py +++ b/bot/seasons/halloween/8ball.py @@ -15,11 +15,11 @@ with open(Path("bot/resources/halloween/responses.json"), "r", encoding="utf8")  class SpookyEightBall(commands.Cog):      """Spooky Eightball answers.""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot      @commands.command(aliases=('spooky8ball',)) -    async def spookyeightball(self, ctx, *, question: str): +    async def spookyeightball(self, ctx: commands.Context, *, question: str) -> None:          """Responds with a random response to a question."""          choice = random.choice(responses['responses'])          msg = await ctx.send(choice[0]) @@ -28,7 +28,7 @@ class SpookyEightBall(commands.Cog):              await msg.edit(content=f"{choice[0]} \n{choice[1]}") -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Spooky Eight Ball Cog Load."""      bot.add_cog(SpookyEightBall(bot))      log.info("SpookyEightBall cog loaded") diff --git a/bot/seasons/halloween/__init__.py b/bot/seasons/halloween/__init__.py index aff51423..c81879d7 100644 --- a/bot/seasons/halloween/__init__.py +++ b/bot/seasons/halloween/__init__.py @@ -3,16 +3,22 @@ from bot.seasons import SeasonBase  class Halloween(SeasonBase): -    """Halloween Seasonal event attributes.""" +    """ +    Halloween Seasonal event attributes. + +    Announcement for this cog temporarily disabled, since we're doing a custom +    Hacktoberfest announcement. If you're enabling the announcement again, +    make sure to update this docstring accordingly. +    """      name = "halloween" -    bot_name = "Spookybot" +    bot_name = "NeonBot"      greeting = "Happy Halloween!"      start_date = "01/10" -    end_date = "31/10" +    end_date = "01/11" -    colour = Colours.orange +    colour = Colours.pink      icon = ( -        "/logos/logo_seasonal/halloween/spooky.png", +        "/logos/logo_seasonal/hacktober/hacktoberfest.png",      ) diff --git a/bot/seasons/halloween/candy_collection.py b/bot/seasons/halloween/candy_collection.py index d35cbee5..64da7ced 100644 --- a/bot/seasons/halloween/candy_collection.py +++ b/bot/seasons/halloween/candy_collection.py @@ -3,6 +3,7 @@ import json  import logging  import os  import random +from typing import List, Union  import discord  from discord.ext import commands @@ -23,7 +24,7 @@ ADD_SKULL_EXISTING_REACTION_CHANCE = 20  # 5%  class CandyCollection(commands.Cog):      """Candy collection game Cog.""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot          with open(json_location) as candy:              self.candy_json = json.load(candy) @@ -34,7 +35,7 @@ class CandyCollection(commands.Cog):              self.get_candyinfo[userid] = userinfo      @commands.Cog.listener() -    async def on_message(self, message): +    async def on_message(self, message: discord.Message) -> None:          """Randomly adds candy or skull reaction to non-bot messages in the Event channel."""          # make sure its a human message          if message.author.bot: @@ -55,7 +56,7 @@ class CandyCollection(commands.Cog):              return await message.add_reaction('\N{CANDY}')      @commands.Cog.listener() -    async def on_reaction_add(self, reaction, user): +    async def on_reaction_add(self, reaction: discord.Reaction, user: discord.Member) -> None:          """Add/remove candies from a person if the reaction satisfies criteria."""          message = reaction.message          # check to ensure the reactor is human @@ -101,7 +102,7 @@ class CandyCollection(commands.Cog):                          self.candy_json['records'].append(d)                  await self.remove_reactions(reaction) -    async def reacted_msg_chance(self, message): +    async def reacted_msg_chance(self, message: discord.Message) -> None:          """          Randomly add a skull or candy reaction to a message if there is a reaction there already. @@ -118,24 +119,25 @@ class CandyCollection(commands.Cog):              self.msg_reacted.append(d)              return await message.add_reaction('\N{CANDY}') -    async def ten_recent_msg(self): +    async def ten_recent_msg(self) -> List[int]:          """Get the last 10 messages sent in the channel."""          ten_recent = [] -        recent_msg = max(message.id for message -                         in self.bot._connection._messages -                         if message.channel.id == Channels.seasonalbot_chat) +        recent_msg_id = max( +            message.id for message in self.bot._connection._messages +            if message.channel.id == Channels.seasonalbot_chat +        )          channel = await self.hacktober_channel() -        ten_recent.append(recent_msg.id) +        ten_recent.append(recent_msg_id)          for i in range(9): -            o = discord.Object(id=recent_msg.id + i) +            o = discord.Object(id=recent_msg_id + i)              msg = await next(channel.history(limit=1, before=o))              ten_recent.append(msg.id)          return ten_recent -    async def get_message(self, msg_id): +    async def get_message(self, msg_id: int) -> Union[discord.Message, None]:          """Get the message from its ID."""          try:              o = discord.Object(id=msg_id + 1) @@ -151,11 +153,11 @@ class CandyCollection(commands.Cog):          except Exception:              return None -    async def hacktober_channel(self): +    async def hacktober_channel(self) -> discord.TextChannel:          """Get #hacktoberbot channel from its ID."""          return self.bot.get_channel(id=Channels.seasonalbot_chat) -    async def remove_reactions(self, reaction): +    async def remove_reactions(self, reaction: discord.Reaction) -> None:          """Remove all candy/skull reactions."""          try:              async for user in reaction.users(): @@ -164,20 +166,20 @@ class CandyCollection(commands.Cog):          except discord.HTTPException:              pass -    async def send_spook_msg(self, author, channel, candies): +    async def send_spook_msg(self, author: discord.Member, channel: discord.TextChannel, candies: int) -> None:          """Send a spooky message."""          e = discord.Embed(colour=author.colour)          e.set_author(name="Ghosts and Ghouls and Jack o' lanterns at night; "                            f"I took {candies} candies and quickly took flight.")          await channel.send(embed=e) -    def save_to_json(self): +    def save_to_json(self) -> None:          """Save JSON to a local file."""          with open(json_location, 'w') as outfile:              json.dump(self.candy_json, outfile)      @commands.command() -    async def candy(self, ctx): +    async def candy(self, ctx: commands.Context) -> None:          """Get the candy leaderboard and save to JSON."""          # Use run_in_executor to prevent blocking          thing = functools.partial(self.save_to_json) @@ -213,7 +215,7 @@ class CandyCollection(commands.Cog):          await ctx.send(embed=e) -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Candy Collection game Cog load."""      bot.add_cog(CandyCollection(bot))      log.info("CandyCollection cog loaded") diff --git a/bot/seasons/halloween/hacktoberstats.py b/bot/seasons/halloween/hacktoberstats.py index b6b5a900..20797037 100644 --- a/bot/seasons/halloween/hacktoberstats.py +++ b/bot/seasons/halloween/hacktoberstats.py @@ -1,32 +1,33 @@  import json  import logging  import re -import typing  from collections import Counter  from datetime import datetime  from pathlib import Path +from typing import List, Tuple  import aiohttp  import discord  from discord.ext import commands +from bot.utils.persist import make_persistent +  log = logging.getLogger(__name__) +CURRENT_YEAR = datetime.now().year  # Used to construct GH API query +PRS_FOR_SHIRT = 4  # Minimum number of PRs before a shirt is awarded +  class HacktoberStats(commands.Cog):      """Hacktoberfest statistics Cog.""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot -        self.link_json = Path("bot/resources/github_links.json") +        self.link_json = make_persistent(Path("bot", "resources", "halloween", "github_links.json"))          self.linked_accounts = self.load_linked_users() -    @commands.group( -        name='hacktoberstats', -        aliases=('hackstats',), -        invoke_without_command=True -    ) -    async def hacktoberstats_group(self, ctx: commands.Context, github_username: str = None): +    @commands.group(name="hacktoberstats", aliases=("hackstats",), invoke_without_command=True) +    async def hacktoberstats_group(self, ctx: commands.Context, github_username: str = None) -> None:          """          Display an embed for a user's Hacktoberfest contributions. @@ -43,8 +44,8 @@ class HacktoberStats(commands.Cog):              else:                  msg = (                      f"{author_mention}, you have not linked a GitHub account\n\n" -                    f"You can link your GitHub account using:\n```{ctx.prefix}stats link github_username```\n" -                    f"Or query GitHub stats directly using:\n```{ctx.prefix}stats github_username```" +                    f"You can link your GitHub account using:\n```{ctx.prefix}hackstats link github_username```\n" +                    f"Or query GitHub stats directly using:\n```{ctx.prefix}hackstats github_username```"                  )                  await ctx.send(msg)                  return @@ -52,7 +53,7 @@ class HacktoberStats(commands.Cog):          await self.get_stats(ctx, github_username)      @hacktoberstats_group.command(name="link") -    async def link_user(self, ctx: commands.Context, github_username: str = None): +    async def link_user(self, ctx: commands.Context, github_username: str = None) -> None:          """          Link the invoking user's Github github_username to their Discord ID. @@ -85,7 +86,7 @@ class HacktoberStats(commands.Cog):              await ctx.send(f"{author_mention}, a GitHub username is required to link your account")      @hacktoberstats_group.command(name="unlink") -    async def unlink_user(self, ctx: commands.Context): +    async def unlink_user(self, ctx: commands.Context) -> None:          """Remove the invoking user's account link from the log."""          author_id, author_mention = HacktoberStats._author_mention_from_context(ctx) @@ -99,7 +100,7 @@ class HacktoberStats(commands.Cog):          self.save_linked_users() -    def load_linked_users(self) -> typing.Dict: +    def load_linked_users(self) -> dict:          """          Load list of linked users from local JSON file. @@ -122,7 +123,7 @@ class HacktoberStats(commands.Cog):              logging.info(f"Linked account log: '{self.link_json}' does not exist")              return {} -    def save_linked_users(self): +    def save_linked_users(self) -> None:          """          Save list of linked users to local JSON file. @@ -139,7 +140,7 @@ class HacktoberStats(commands.Cog):              json.dump(self.linked_accounts, fID, default=str)          logging.info(f"linked_accounts saved to '{self.link_json}'") -    async def get_stats(self, ctx: commands.Context, github_username: str): +    async def get_stats(self, ctx: commands.Context, github_username: str) -> None:          """          Query GitHub's API for PRs created by a GitHub user during the month of October. @@ -158,18 +159,18 @@ class HacktoberStats(commands.Cog):              else:                  await ctx.send(f"No October GitHub contributions found for '{github_username}'") -    def build_embed(self, github_username: str, prs: typing.List[dict]) -> discord.Embed: +    def build_embed(self, github_username: str, prs: List[dict]) -> discord.Embed:          """Return a stats embed built from github_username's PRs."""          logging.info(f"Building Hacktoberfest embed for GitHub user: '{github_username}'")          pr_stats = self._summarize_prs(prs)          n = pr_stats['n_prs'] -        if n >= 5: +        if n >= PRS_FOR_SHIRT:              shirtstr = f"**{github_username} has earned a tshirt!**" -        elif n == 4: +        elif n == PRS_FOR_SHIRT - 1:              shirtstr = f"**{github_username} is 1 PR away from a tshirt!**"          else: -            shirtstr = f"**{github_username} is {5 - n} PRs away from a tshirt!**" +            shirtstr = f"**{github_username} is {PRS_FOR_SHIRT - n} PRs away from a tshirt!**"          stats_embed = discord.Embed(              title=f"{github_username}'s Hacktoberfest", @@ -186,7 +187,7 @@ class HacktoberStats(commands.Cog):          stats_embed.set_author(              name="Hacktoberfest",              url="https://hacktoberfest.digitalocean.com", -            icon_url="https://hacktoberfest.digitalocean.com/assets/logo-hacktoberfest.png" +            icon_url="https://hacktoberfest.digitalocean.com/pretty_logo.png"          )          stats_embed.add_field(              name="Top 5 Repositories:", @@ -197,7 +198,7 @@ class HacktoberStats(commands.Cog):          return stats_embed      @staticmethod -    async def get_october_prs(github_username: str) -> typing.List[dict]: +    async def get_october_prs(github_username: str) -> List[dict]:          """          Query GitHub's API for PRs created during the month of October by github_username. @@ -219,7 +220,7 @@ class HacktoberStats(commands.Cog):          not_label = "invalid"          action_type = "pr"          is_query = f"public+author:{github_username}" -        date_range = "2018-10-01..2018-10-31" +        date_range = f"{CURRENT_YEAR}-10-01..{CURRENT_YEAR}-10-31"          per_page = "300"          query_url = (              f"{base_url}" @@ -274,7 +275,7 @@ class HacktoberStats(commands.Cog):          return re.findall(exp, in_url)[0]      @staticmethod -    def _summarize_prs(prs: typing.List[dict]) -> typing.Dict: +    def _summarize_prs(prs: List[dict]) -> dict:          """          Generate statistics from an input list of PR dictionaries, as output by get_october_prs. @@ -288,7 +289,7 @@ class HacktoberStats(commands.Cog):          return {"n_prs": len(prs), "top5": Counter(contributed_repos).most_common(5)}      @staticmethod -    def _build_top5str(stats: typing.List[tuple]) -> str: +    def _build_top5str(stats: List[tuple]) -> str:          """          Build a string from the Top 5 contributions that is compatible with a discord.Embed field. @@ -316,7 +317,7 @@ class HacktoberStats(commands.Cog):              return "contributions"      @staticmethod -    def _author_mention_from_context(ctx: commands.Context) -> typing.Tuple: +    def _author_mention_from_context(ctx: commands.Context) -> Tuple:          """Return stringified Message author ID and mentionable string from commands.Context."""          author_id = str(ctx.message.author.id)          author_mention = ctx.message.author.mention @@ -324,7 +325,7 @@ class HacktoberStats(commands.Cog):          return author_id, author_mention -def setup(bot): +def setup(bot):  # Noqa      """Hacktoberstats Cog load."""      bot.add_cog(HacktoberStats(bot))      log.info("HacktoberStats cog loaded") diff --git a/bot/seasons/halloween/halloween_facts.py b/bot/seasons/halloween/halloween_facts.py index f09aa4ad..f8610bd3 100644 --- a/bot/seasons/halloween/halloween_facts.py +++ b/bot/seasons/halloween/halloween_facts.py @@ -3,6 +3,7 @@ import logging  import random  from datetime import timedelta  from pathlib import Path +from typing import Tuple  import discord  from discord.ext import commands @@ -28,7 +29,7 @@ INTERVAL = timedelta(hours=6).total_seconds()  class HalloweenFacts(commands.Cog):      """A Cog for displaying interesting facts about Halloween.""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot          with open(Path("bot/resources/halloween/halloween_facts.json"), "r") as file:              self.halloween_facts = json.load(file) @@ -37,31 +38,31 @@ class HalloweenFacts(commands.Cog):          random.shuffle(self.facts)      @commands.Cog.listener() -    async def on_ready(self): +    async def on_ready(self) -> None:          """Get event Channel object and initialize fact task loop."""          self.channel = self.bot.get_channel(Channels.seasonalbot_chat)          self.bot.loop.create_task(self._fact_publisher_task()) -    def random_fact(self): +    def random_fact(self) -> Tuple[int, str]:          """Return a random fact from the loaded facts."""          return random.choice(self.facts)      @commands.command(name="spookyfact", aliases=("halloweenfact",), brief="Get the most recent Halloween fact") -    async def get_random_fact(self, ctx): +    async def get_random_fact(self, ctx: commands.Context) -> None:          """Reply with the most recent Halloween fact."""          index, fact = self.random_fact()          embed = self._build_embed(index, fact)          await ctx.send(embed=embed)      @staticmethod -    def _build_embed(index, fact): +    def _build_embed(index: int, fact: str) -> discord.Embed:          """Builds a Discord embed from the given fact and its index."""          emoji = random.choice(SPOOKY_EMOJIS)          title = f"{emoji} Halloween Fact #{index + 1}"          return discord.Embed(title=title, description=fact, color=PUMPKIN_ORANGE) -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Halloween facts Cog load."""      bot.add_cog(HalloweenFacts(bot))      log.info("HalloweenFacts cog loaded") diff --git a/bot/seasons/halloween/halloweenify.py b/bot/seasons/halloween/halloweenify.py index 334781ab..dfcc2b1e 100644 --- a/bot/seasons/halloween/halloweenify.py +++ b/bot/seasons/halloween/halloweenify.py @@ -13,12 +13,12 @@ log = logging.getLogger(__name__)  class Halloweenify(commands.Cog):      """A cog to change a invokers nickname to a spooky one!""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot      @commands.cooldown(1, 300, BucketType.user)      @commands.command() -    async def halloweenify(self, ctx): +    async def halloweenify(self, ctx: commands.Context) -> None:          """Change your nickname into a much spookier one!"""          async with ctx.typing():              with open(Path("bot/resources/halloween/halloweenify.json"), "r") as f: @@ -46,7 +46,7 @@ class Halloweenify(commands.Cog):          await ctx.send(embed=embed) -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Halloweenify Cog load."""      bot.add_cog(Halloweenify(bot))      log.info("Halloweenify cog loaded") diff --git a/bot/seasons/halloween/monstersurvey.py b/bot/seasons/halloween/monstersurvey.py index 173ce8eb..12e1d022 100644 --- a/bot/seasons/halloween/monstersurvey.py +++ b/bot/seasons/halloween/monstersurvey.py @@ -30,7 +30,7 @@ class MonsterSurvey(Cog):          with open(self.registry_location, 'r') as jason:              self.voter_registry = json.load(jason) -    def json_write(self): +    def json_write(self) -> None:          """Write voting results to a local JSON file."""          log.info("Saved Monster Survey Results")          with open(self.registry_location, 'w') as jason: @@ -50,7 +50,7 @@ class MonsterSurvey(Cog):                  if id in vr[m]['votes'] and m != monster:                      vr[m]['votes'].remove(id) -    def get_name_by_leaderboard_index(self, n): +    def get_name_by_leaderboard_index(self, n: int) -> str:          """Return the monster at the specified leaderboard index."""          n = n - 1          vr = self.voter_registry @@ -60,9 +60,9 @@ class MonsterSurvey(Cog):      @commands.group(          name='monster', -        aliases=('ms',) +        aliases=('mon',)      ) -    async def monster_group(self, ctx: Context): +    async def monster_group(self, ctx: Context) -> None:          """The base voting command. If nothing is called, then it will return an embed."""          if ctx.invoked_subcommand is None:              async with ctx.typing(): @@ -92,7 +92,7 @@ class MonsterSurvey(Cog):      @monster_group.command(          name='vote'      ) -    async def monster_vote(self, ctx: Context, name=None): +    async def monster_vote(self, ctx: Context, name: str = None) -> None:          """          Cast a vote for a particular monster. @@ -143,7 +143,7 @@ class MonsterSurvey(Cog):      @monster_group.command(          name='show'      ) -    async def monster_show(self, ctx: Context, name=None) -> None: +    async def monster_show(self, ctx: Context, name: str = None) -> None:          """Shows the named monster. If one is not named, it sends the default voting embed instead."""          if name is None:              await ctx.invoke(self.monster_leaderboard) @@ -200,7 +200,7 @@ class MonsterSurvey(Cog):          await ctx.send(embed=embed) -def setup(bot): +def setup(bot: Bot) -> None:      """Monster survey Cog load."""      bot.add_cog(MonsterSurvey(bot))      log.info("MonsterSurvey cog loaded") diff --git a/bot/seasons/halloween/scarymovie.py b/bot/seasons/halloween/scarymovie.py index cd95a3a2..3823a3e4 100644 --- a/bot/seasons/halloween/scarymovie.py +++ b/bot/seasons/halloween/scarymovie.py @@ -16,11 +16,11 @@ TMDB_TOKEN = environ.get('TMDB_TOKEN')  class ScaryMovie(commands.Cog):      """Selects a random scary movie and embeds info into Discord chat.""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot      @commands.command(name='scarymovie', alias=['smovie']) -    async def random_movie(self, ctx): +    async def random_movie(self, ctx: commands.Context) -> None:          """Randomly select a scary movie and display information about it."""          async with ctx.typing():              selection = await self.select_movie() @@ -29,7 +29,7 @@ class ScaryMovie(commands.Cog):          await ctx.send(embed=movie_details)      @staticmethod -    async def select_movie(): +    async def select_movie() -> dict:          """Selects a random movie and returns a JSON of movie details from TMDb."""          url = 'https://api.themoviedb.org/4/discover/movie'          params = { @@ -62,7 +62,7 @@ class ScaryMovie(commands.Cog):              return await selection.json()      @staticmethod -    async def format_metadata(movie): +    async def format_metadata(movie: dict) -> Embed:          """Formats raw TMDb data to be embedded in Discord chat."""          # Build the relevant URLs.          movie_id = movie.get("id") @@ -126,7 +126,7 @@ class ScaryMovie(commands.Cog):          return embed -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Scary movie Cog load."""      bot.add_cog(ScaryMovie(bot))      log.info("ScaryMovie cog loaded") diff --git a/bot/seasons/halloween/spookyavatar.py b/bot/seasons/halloween/spookyavatar.py index 9bdef1a8..268de3fb 100644 --- a/bot/seasons/halloween/spookyavatar.py +++ b/bot/seasons/halloween/spookyavatar.py @@ -15,10 +15,10 @@ log = logging.getLogger(__name__)  class SpookyAvatar(commands.Cog):      """A cog that spookifies an avatar.""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot -    async def get(self, url): +    async def get(self, url: str) -> bytes:          """Returns the contents of the supplied URL."""          async with aiohttp.ClientSession() as session:              async with session.get(url) as resp: @@ -26,7 +26,7 @@ class SpookyAvatar(commands.Cog):      @commands.command(name='savatar', aliases=('spookyavatar', 'spookify'),                        brief='Spookify an user\'s avatar.') -    async def spooky_avatar(self, ctx, user: discord.Member = None): +    async def spooky_avatar(self, ctx: commands.Context, user: discord.Member = None) -> None:          """A command to print the user's spookified avatar."""          if user is None:              user = ctx.message.author @@ -47,7 +47,7 @@ class SpookyAvatar(commands.Cog):          os.remove(str(ctx.message.id)+'.png') -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Spooky avatar Cog load."""      bot.add_cog(SpookyAvatar(bot))      log.info("SpookyAvatar cog loaded") diff --git a/bot/seasons/halloween/spookygif.py b/bot/seasons/halloween/spookygif.py index ba2ad6e5..818de8cd 100644 --- a/bot/seasons/halloween/spookygif.py +++ b/bot/seasons/halloween/spookygif.py @@ -12,11 +12,11 @@ log = logging.getLogger(__name__)  class SpookyGif(commands.Cog):      """A cog to fetch a random spooky gif from the web!""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot      @commands.command(name="spookygif", aliases=("sgif", "scarygif")) -    async def spookygif(self, ctx): +    async def spookygif(self, ctx: commands.Context) -> None:          """Fetches a random gif from the GIPHY API and responds with it."""          async with ctx.typing():              async with aiohttp.ClientSession() as session: @@ -33,7 +33,7 @@ class SpookyGif(commands.Cog):          await ctx.send(embed=embed) -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Spooky GIF Cog load."""      bot.add_cog(SpookyGif(bot))      log.info("SpookyGif cog loaded") diff --git a/bot/seasons/halloween/spookyrating.py b/bot/seasons/halloween/spookyrating.py index 08c17a27..7f78f536 100644 --- a/bot/seasons/halloween/spookyrating.py +++ b/bot/seasons/halloween/spookyrating.py @@ -17,15 +17,15 @@ with Path("bot/resources/halloween/spooky_rating.json").open() as file:  class SpookyRating(commands.Cog): -    """A cog for calculating one's spooky rating""" +    """A cog for calculating one's spooky rating.""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot          self.local_random = random.Random()      @commands.command()      @commands.cooldown(rate=1, per=5, type=commands.BucketType.user) -    async def spookyrating(self, ctx, who: discord.Member = None): +    async def spookyrating(self, ctx: commands.Context, who: discord.Member = None) -> None:          """          Calculates the spooky rating of someone. @@ -61,7 +61,7 @@ class SpookyRating(commands.Cog):          await ctx.send(embed=embed) -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Spooky Rating Cog load."""      bot.add_cog(SpookyRating(bot))      log.info("SpookyRating cog loaded") diff --git a/bot/seasons/halloween/spookyreact.py b/bot/seasons/halloween/spookyreact.py index 5a086072..90b1254d 100644 --- a/bot/seasons/halloween/spookyreact.py +++ b/bot/seasons/halloween/spookyreact.py @@ -2,7 +2,7 @@ import logging  import re  import discord -from discord.ext.commands import Cog +from discord.ext.commands import Bot, Cog  log = logging.getLogger(__name__) @@ -20,11 +20,11 @@ SPOOKY_TRIGGERS = {  class SpookyReact(Cog):      """A cog that makes the bot react to message triggers.""" -    def __init__(self, bot): +    def __init__(self, bot: Bot):          self.bot = bot      @Cog.listener() -    async def on_message(self, ctx: discord.Message): +    async def on_message(self, ctx: discord.Message) -> None:          """          A command to send the seasonalbot github project. @@ -66,7 +66,7 @@ class SpookyReact(Cog):          return False -def setup(bot): +def setup(bot: Bot) -> None:      """Spooky reaction Cog load."""      bot.add_cog(SpookyReact(bot))      log.info("SpookyReact cog loaded") diff --git a/bot/seasons/halloween/spookysound.py b/bot/seasons/halloween/spookysound.py index 44fdd9d6..e0676d0a 100644 --- a/bot/seasons/halloween/spookysound.py +++ b/bot/seasons/halloween/spookysound.py @@ -13,14 +13,14 @@ log = logging.getLogger(__name__)  class SpookySound(commands.Cog):      """A cog that plays a spooky sound in a voice channel on command.""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot          self.sound_files = list(Path("bot/resources/halloween/spookysounds").glob("*.mp3"))          self.channel = None      @commands.cooldown(rate=1, per=1)      @commands.command(brief="Play a spooky sound, restricted to once per 2 mins") -    async def spookysound(self, ctx): +    async def spookysound(self, ctx: commands.Context) -> None:          """          Connect to the Hacktoberbot voice channel, play a random spooky sound, then disconnect. @@ -37,12 +37,12 @@ class SpookySound(commands.Cog):          voice.play(src, after=lambda e: self.bot.loop.create_task(self.disconnect(voice)))      @staticmethod -    async def disconnect(voice): +    async def disconnect(voice: discord.VoiceClient) -> None:          """Helper method to disconnect a given voice client."""          await voice.disconnect() -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Spooky sound Cog load."""      bot.add_cog(SpookySound(bot))      log.info("SpookySound cog loaded") diff --git a/bot/seasons/halloween/timeleft.py b/bot/seasons/halloween/timeleft.py index a2b16a6c..77767baa 100644 --- a/bot/seasons/halloween/timeleft.py +++ b/bot/seasons/halloween/timeleft.py @@ -1,5 +1,6 @@  import logging  from datetime import datetime +from typing import Tuple  from discord.ext import commands @@ -9,16 +10,16 @@ log = logging.getLogger(__name__)  class TimeLeft(commands.Cog):      """A Cog that tells you how long left until Hacktober is over!""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot      @staticmethod -    def in_october(): +    def in_october() -> bool:          """Return True if the current month is October."""          return datetime.utcnow().month == 10      @staticmethod -    def load_date(): +    def load_date() -> Tuple[int, datetime, datetime]:          """Return of a tuple of the current time and the end and start times of the next October."""          now = datetime.utcnow()          year = now.year @@ -29,7 +30,7 @@ class TimeLeft(commands.Cog):          return now, end, start      @commands.command() -    async def timeleft(self, ctx): +    async def timeleft(self, ctx: commands.Context) -> None:          """          Calculates the time left until the end of Hacktober. @@ -53,7 +54,7 @@ class TimeLeft(commands.Cog):              ) -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Cog load."""      bot.add_cog(TimeLeft(bot))      log.info("TimeLeft cog loaded") diff --git a/bot/seasons/pride/__init__.py b/bot/seasons/pride/__init__.py index b1897eca..75e90b2a 100644 --- a/bot/seasons/pride/__init__.py +++ b/bot/seasons/pride/__init__.py @@ -27,7 +27,7 @@ class Pride(SeasonBase):      # Duration of season      start_date = "01/06" -    end_date = "30/06" +    end_date = "01/07"      # Season logo      colour = Colours.soft_red diff --git a/bot/seasons/pride/pride_anthem.py b/bot/seasons/pride/pride_anthem.py index f226f4bb..b0c6d34e 100644 --- a/bot/seasons/pride/pride_anthem.py +++ b/bot/seasons/pride/pride_anthem.py @@ -11,7 +11,7 @@ log = logging.getLogger(__name__)  class PrideAnthem(commands.Cog):      """Embed a random youtube video for a gay anthem!""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot          self.anthems = self.load_vids() @@ -39,7 +39,7 @@ class PrideAnthem(commands.Cog):          return anthems      @commands.command(name="prideanthem", aliases=["anthem", "pridesong"]) -    async def prideanthem(self, ctx, genre: str = None): +    async def prideanthem(self, ctx: commands.Context, genre: str = None) -> None:          """          Sends a message with a video of a random pride anthem. @@ -52,7 +52,7 @@ class PrideAnthem(commands.Cog):              await ctx.send("I couldn't find a video, sorry!") -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Cog loader for pride anthem."""      bot.add_cog(PrideAnthem(bot))      log.info("Pride anthems cog loaded!") diff --git a/bot/seasons/pride/pride_avatar.py b/bot/seasons/pride/pride_avatar.py index a5b38d20..85e49d5c 100644 --- a/bot/seasons/pride/pride_avatar.py +++ b/bot/seasons/pride/pride_avatar.py @@ -56,11 +56,11 @@ OPTIONS = {  class PrideAvatar(commands.Cog):      """Put an LGBT spin on your avatar!""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot      @staticmethod -    def crop_avatar(avatar): +    def crop_avatar(avatar: Image) -> Image:          """This crops the avatar into a circle."""          mask = Image.new("L", avatar.size, 0)          draw = ImageDraw.Draw(mask) @@ -69,7 +69,7 @@ class PrideAvatar(commands.Cog):          return avatar      @staticmethod -    def crop_ring(ring, px): +    def crop_ring(ring: Image, px: int) -> Image:          """This crops the ring into a circle."""          mask = Image.new("L", ring.size, 0)          draw = ImageDraw.Draw(mask) @@ -79,7 +79,7 @@ class PrideAvatar(commands.Cog):          return ring      @commands.group(aliases=["avatarpride", "pridepfp", "prideprofile"], invoke_without_command=True) -    async def prideavatar(self, ctx, option="lgbt", pixels: int = 64): +    async def prideavatar(self, ctx: commands.Context, option: str = "lgbt", pixels: int = 64) -> None:          """          This surrounds an avatar with a border of a specified LGBT flag. @@ -126,7 +126,7 @@ class PrideAvatar(commands.Cog):          await ctx.send(file=file, embed=embed)      @prideavatar.command() -    async def flags(self, ctx): +    async def flags(self, ctx: commands.Context) -> None:          """This lists the flags that can be used with the prideavatar command."""          choices = sorted(set(OPTIONS.values()))          options = "• " + "\n• ".join(choices) @@ -139,7 +139,7 @@ class PrideAvatar(commands.Cog):          await ctx.send(embed=embed) -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Cog load."""      bot.add_cog(PrideAvatar(bot))      log.info("PrideAvatar cog loaded") diff --git a/bot/seasons/season.py b/bot/seasons/season.py index c88ef2a7..3546fda6 100644 --- a/bot/seasons/season.py +++ b/bot/seasons/season.py @@ -264,14 +264,14 @@ class SeasonBase:          return await self.apply_server_icon() -    async def announce_season(self): +    async def announce_season(self) -> None:          """          Announces a change in season in the announcement channel.          It will skip the announcement if the current active season is the "evergreen" default season.          """          # Don't actually announce if reverting to normal season -        if self.name in ("evergreen", "wildcard"): +        if self.name in ("evergreen", "wildcard", "halloween"):              log.debug(f"Season Changed: {self.name}")              return @@ -303,7 +303,7 @@ class SeasonBase:                  cogs.append(cog_name)          if cogs: -            def cog_name(cog): +            def cog_name(cog: commands.Cog) -> str:                  return type(cog).__name__              cog_info = [] @@ -320,7 +320,7 @@ class SeasonBase:          await channel.send(mention, embed=embed) -    async def load(self): +    async def load(self) -> None:          """          Loads extensions, bot name and avatar, server icon and announces new season. @@ -361,7 +361,7 @@ class SeasonBase:  class SeasonManager(commands.Cog):      """A cog for managing seasons.""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot          self.season = get_season(date=datetime.datetime.utcnow())          self.season_task = bot.loop.create_task(self.load_seasons()) @@ -378,7 +378,7 @@ class SeasonManager(commands.Cog):          )          self.sleep_time = (midnight - datetime.datetime.now()).seconds + 60 -    async def load_seasons(self): +    async def load_seasons(self) -> None:          """Asynchronous timer loop to check for a new season every midnight."""          await self.bot.wait_until_ready()          await self.season.load() @@ -397,7 +397,7 @@ class SeasonManager(commands.Cog):      @with_role(Roles.moderator, Roles.admin, Roles.owner)      @commands.command(name="season") -    async def change_season(self, ctx, new_season: str): +    async def change_season(self, ctx: commands.Context, new_season: str) -> None:          """Changes the currently active season on the bot."""          self.season = get_season(season_name=new_season)          await self.season.load() @@ -405,10 +405,10 @@ class SeasonManager(commands.Cog):      @with_role(Roles.moderator, Roles.admin, Roles.owner)      @commands.command(name="seasons") -    async def show_seasons(self, ctx): +    async def show_seasons(self, ctx: commands.Context) -> None:          """Shows the available seasons and their dates."""          # Sort by start order, followed by lower duration -        def season_key(season_class: Type[SeasonBase]): +        def season_key(season_class: Type[SeasonBase]) -> Tuple[datetime.datetime, datetime.timedelta]:              return season_class.start(), season_class.end() - datetime.datetime.max          current_season = self.season.name @@ -448,13 +448,13 @@ class SeasonManager(commands.Cog):      @with_role(Roles.moderator, Roles.admin, Roles.owner)      @commands.group() -    async def refresh(self, ctx): +    async def refresh(self, ctx: commands.Context) -> None:          """Refreshes certain seasonal elements without reloading seasons."""          if not ctx.invoked_subcommand:              await ctx.send_help(ctx.command)      @refresh.command(name="avatar") -    async def refresh_avatar(self, ctx): +    async def refresh_avatar(self, ctx: commands.Context) -> None:          """Re-applies the bot avatar for the currently loaded season."""          # Attempt the change          is_changed = await self.season.apply_avatar() @@ -477,7 +477,7 @@ class SeasonManager(commands.Cog):          await ctx.send(embed=embed)      @refresh.command(name="icon") -    async def refresh_server_icon(self, ctx): +    async def refresh_server_icon(self, ctx: commands.Context) -> None:          """Re-applies the server icon for the currently loaded season."""          # Attempt the change          is_changed = await self.season.apply_server_icon() @@ -500,7 +500,7 @@ class SeasonManager(commands.Cog):          await ctx.send(embed=embed)      @refresh.command(name="username", aliases=("name",)) -    async def refresh_username(self, ctx): +    async def refresh_username(self, ctx: commands.Context) -> None:          """Re-applies the bot username for the currently loaded season."""          old_username = str(bot.user)          old_display_name = ctx.guild.me.display_name @@ -539,10 +539,10 @@ class SeasonManager(commands.Cog):      @with_role(Roles.moderator, Roles.admin, Roles.owner)      @commands.command() -    async def announce(self, ctx): +    async def announce(self, ctx: commands.Context) -> None:          """Announces the currently loaded season."""          await self.season.announce_season() -    def cog_unload(self): +    def cog_unload(self) -> None:          """Cancel season-related tasks on cog unload."""          self.season_task.cancel() diff --git a/bot/seasons/valentines/be_my_valentine.py b/bot/seasons/valentines/be_my_valentine.py index c4acf17a..a073e1bd 100644 --- a/bot/seasons/valentines/be_my_valentine.py +++ b/bot/seasons/valentines/be_my_valentine.py @@ -1,8 +1,8 @@  import logging  import random -import typing  from json import load  from pathlib import Path +from typing import Optional, Tuple  import discord  from discord.ext import commands @@ -18,12 +18,12 @@ HEART_EMOJIS = [":heart:", ":gift_heart:", ":revolving_hearts:", ":sparkling_hea  class BeMyValentine(commands.Cog):      """A cog that sends Valentines to other users!""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot          self.valentines = self.load_json()      @staticmethod -    def load_json(): +    def load_json() -> dict:          """Load Valentines messages from the static resources."""          p = Path("bot/resources/valentines/bemyvalentine_valentines.json")          with p.open() as json_data: @@ -31,7 +31,7 @@ class BeMyValentine(commands.Cog):              return valentines      @commands.group(name="lovefest", invoke_without_command=True) -    async def lovefest_role(self, ctx): +    async def lovefest_role(self, ctx: commands.Context) -> None:          """          Subscribe or unsubscribe from the lovefest role. @@ -43,7 +43,7 @@ class BeMyValentine(commands.Cog):          await ctx.send_help(ctx.command)      @lovefest_role.command(name="sub") -    async def add_role(self, ctx): +    async def add_role(self, ctx: commands.Context) -> None:          """Adds the lovefest role."""          user = ctx.author          role = discord.utils.get(ctx.guild.roles, id=Lovefest.role_id) @@ -54,7 +54,7 @@ class BeMyValentine(commands.Cog):              await ctx.send("You already have the role !")      @lovefest_role.command(name="unsub") -    async def remove_role(self, ctx): +    async def remove_role(self, ctx: commands.Context) -> None:          """Removes the lovefest role."""          user = ctx.author          role = discord.utils.get(ctx.guild.roles, id=Lovefest.role_id) @@ -66,7 +66,9 @@ class BeMyValentine(commands.Cog):      @commands.cooldown(1, 1800, BucketType.user)      @commands.group(name='bemyvalentine', invoke_without_command=True) -    async def send_valentine(self, ctx, user: typing.Optional[discord.Member] = None, *, valentine_type=None): +    async def send_valentine( +        self, ctx: commands.Context, user: Optional[discord.Member] = None, *, valentine_type: str = None +    ) -> None:          """          Send a valentine to user, if specified, or to a random user with the lovefest role. @@ -112,7 +114,9 @@ class BeMyValentine(commands.Cog):      @commands.cooldown(1, 1800, BucketType.user)      @send_valentine.command(name='secret') -    async def anonymous(self, ctx, user: typing.Optional[discord.Member] = None, *, valentine_type=None): +    async def anonymous( +        self, ctx: commands.Context, user: Optional[discord.Member] = None, *, valentine_type: str = None +    ) -> None:          """          Send an anonymous Valentine via DM to to a user, if specified, or to a random with the lovefest role. @@ -164,7 +168,7 @@ class BeMyValentine(commands.Cog):          else:              await ctx.author.send(f"Your message has been sent to {user}") -    def valentine_check(self, valentine_type): +    def valentine_check(self, valentine_type: str) -> Tuple[str, str]:          """Return the appropriate Valentine type & title based on the invoking user's input."""          if valentine_type is None:              valentine, title = self.random_valentine() @@ -184,7 +188,7 @@ class BeMyValentine(commands.Cog):          return valentine, title      @staticmethod -    def random_user(author: discord.Member, members: discord.Member): +    def random_user(author: discord.Member, members: discord.Member) -> None:          """          Picks a random member from the list provided in `members`. @@ -196,13 +200,13 @@ class BeMyValentine(commands.Cog):          return random.choice(members) if members else None      @staticmethod -    def random_emoji(): +    def random_emoji() -> Tuple[str, str]:          """Return two random emoji from the module-defined constants."""          EMOJI_1 = random.choice(HEART_EMOJIS)          EMOJI_2 = random.choice(HEART_EMOJIS)          return EMOJI_1, EMOJI_2 -    def random_valentine(self): +    def random_valentine(self) -> Tuple[str, str]:          """Grabs a random poem or a compliment (any message)."""          valentine_poem = random.choice(self.valentines['valentine_poems'])          valentine_compliment = random.choice(self.valentines['valentine_compliments']) @@ -213,18 +217,18 @@ class BeMyValentine(commands.Cog):              title = 'A compliment for '          return random_valentine, title -    def valentine_poem(self): +    def valentine_poem(self) -> str:          """Grabs a random poem."""          valentine_poem = random.choice(self.valentines['valentine_poems'])          return valentine_poem -    def valentine_compliment(self): +    def valentine_compliment(self) -> str:          """Grabs a random compliment."""          valentine_compliment = random.choice(self.valentines['valentine_compliments'])          return valentine_compliment -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Be my Valentine Cog load."""      bot.add_cog(BeMyValentine(bot))      log.info("BeMyValentine cog loaded") diff --git a/bot/seasons/valentines/lovecalculator.py b/bot/seasons/valentines/lovecalculator.py index 1d5a028d..03d3d7d5 100644 --- a/bot/seasons/valentines/lovecalculator.py +++ b/bot/seasons/valentines/lovecalculator.py @@ -23,12 +23,12 @@ with Path("bot/resources/valentines/love_matches.json").open() as file:  class LoveCalculator(Cog):      """A cog for calculating the love between two people.""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot      @commands.command(aliases=('love_calculator', 'love_calc'))      @commands.cooldown(rate=1, per=5, type=commands.BucketType.user) -    async def love(self, ctx, who: Union[Member, str], whom: Union[Member, str] = None): +    async def love(self, ctx: commands.Context, who: Union[Member, str], whom: Union[Member, str] = None) -> None:          """          Tells you how much the two love each other. @@ -53,7 +53,7 @@ class LoveCalculator(Cog):              staff = ctx.guild.get_role(Roles.helpers).members              whom = random.choice(staff) -        def normalize(arg): +        def normalize(arg: Union[Member, str]) -> str:              if isinstance(arg, Member):                  # If we are given a member, return name#discrim without any extra changes                  arg = str(arg) @@ -98,7 +98,7 @@ class LoveCalculator(Cog):          await ctx.send(embed=embed) -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Love calculator Cog load."""      bot.add_cog(LoveCalculator(bot))      log.info("LoveCalculator cog loaded") diff --git a/bot/seasons/valentines/movie_generator.py b/bot/seasons/valentines/movie_generator.py index fa5f236a..ce1d7d5b 100644 --- a/bot/seasons/valentines/movie_generator.py +++ b/bot/seasons/valentines/movie_generator.py @@ -14,11 +14,11 @@ log = logging.getLogger(__name__)  class RomanceMovieFinder(commands.Cog):      """A Cog that returns a random romance movie suggestion to a user.""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot      @commands.command(name="romancemovie") -    async def romance_movie(self, ctx): +    async def romance_movie(self, ctx: commands.Context) -> None:          """Randomly selects a romance movie and displays information about it."""          # Selecting a random int to parse it to the page parameter          random_page = random.randint(0, 20) @@ -57,7 +57,7 @@ class RomanceMovieFinder(commands.Cog):                  await ctx.send(embed=embed) -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Romance movie Cog load."""      bot.add_cog(RomanceMovieFinder(bot))      log.info("RomanceMovieFinder cog loaded") diff --git a/bot/seasons/valentines/myvalenstate.py b/bot/seasons/valentines/myvalenstate.py index fad202e3..0256c39a 100644 --- a/bot/seasons/valentines/myvalenstate.py +++ b/bot/seasons/valentines/myvalenstate.py @@ -18,10 +18,10 @@ with open(Path("bot/resources/valentines/valenstates.json"), "r") as file:  class MyValenstate(commands.Cog):      """A Cog to find your most likely Valentine's vacation destination.""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot -    def levenshtein(self, source, goal): +    def levenshtein(self, source: str, goal: str) -> int:          """Calculates the Levenshtein Distance between source and goal."""          if len(source) < len(goal):              return self.levenshtein(goal, source) @@ -42,7 +42,7 @@ class MyValenstate(commands.Cog):          return pre_row[-1]      @commands.command() -    async def myvalenstate(self, ctx, *, name=None): +    async def myvalenstate(self, ctx: commands.Context, *, name: str = None) -> None:          """Find the vacation spot(s) with the most matching characters to the invoking user."""          eq_chars = collections.defaultdict(int)          if name is None: @@ -81,7 +81,7 @@ class MyValenstate(commands.Cog):          await ctx.channel.send(embed=embed) -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Valenstate Cog load."""      bot.add_cog(MyValenstate(bot))      log.info("MyValenstate cog loaded") diff --git a/bot/seasons/valentines/pickuplines.py b/bot/seasons/valentines/pickuplines.py index 46772197..8b2c9822 100644 --- a/bot/seasons/valentines/pickuplines.py +++ b/bot/seasons/valentines/pickuplines.py @@ -17,11 +17,11 @@ with open(Path("bot/resources/valentines/pickup_lines.json"), "r", encoding="utf  class PickupLine(commands.Cog):      """A cog that gives random cheesy pickup lines.""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot      @commands.command() -    async def pickupline(self, ctx): +    async def pickupline(self, ctx: commands.Context) -> None:          """          Gives you a random pickup line. @@ -39,7 +39,7 @@ class PickupLine(commands.Cog):          await ctx.send(embed=embed) -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Pickup lines Cog load."""      bot.add_cog(PickupLine(bot))      log.info('PickupLine cog loaded') diff --git a/bot/seasons/valentines/savethedate.py b/bot/seasons/valentines/savethedate.py index 34264183..e0bc3904 100644 --- a/bot/seasons/valentines/savethedate.py +++ b/bot/seasons/valentines/savethedate.py @@ -19,11 +19,11 @@ with open(Path("bot/resources/valentines/date_ideas.json"), "r", encoding="utf8"  class SaveTheDate(commands.Cog):      """A cog that gives random suggestion for a Valentine's date.""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot      @commands.command() -    async def savethedate(self, ctx): +    async def savethedate(self, ctx: commands.Context) -> None:          """Gives you ideas for what to do on a date with your valentine."""          random_date = random.choice(VALENTINES_DATES['ideas'])          emoji_1 = random.choice(HEART_EMOJIS) @@ -36,7 +36,7 @@ class SaveTheDate(commands.Cog):          await ctx.send(embed=embed) -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Save the date Cog Load."""      bot.add_cog(SaveTheDate(bot))      log.info("SaveTheDate cog loaded") diff --git a/bot/seasons/valentines/valentine_zodiac.py b/bot/seasons/valentines/valentine_zodiac.py index fa849cb2..c8d77e75 100644 --- a/bot/seasons/valentines/valentine_zodiac.py +++ b/bot/seasons/valentines/valentine_zodiac.py @@ -17,12 +17,12 @@ HEART_EMOJIS = [":heart:", ":gift_heart:", ":revolving_hearts:", ":sparkling_hea  class ValentineZodiac(commands.Cog):      """A Cog that returns a counter compatible zodiac sign to the given user's zodiac sign.""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot          self.zodiacs = self.load_json()      @staticmethod -    def load_json(): +    def load_json() -> dict:          """Load zodiac compatibility from static JSON resource."""          p = Path("bot/resources/valentines/zodiac_compatibility.json")          with p.open() as json_data: @@ -30,7 +30,7 @@ class ValentineZodiac(commands.Cog):              return zodiacs      @commands.command(name="partnerzodiac") -    async def counter_zodiac(self, ctx, zodiac_sign): +    async def counter_zodiac(self, ctx: commands.Context, zodiac_sign: str) -> None:          """Provides a counter compatible zodiac sign to the given user's zodiac sign."""          try:              compatible_zodiac = random.choice(self.zodiacs[zodiac_sign.lower()]) @@ -52,7 +52,7 @@ class ValentineZodiac(commands.Cog):          await ctx.send(embed=embed) -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Valentine zodiac Cog load."""      bot.add_cog(ValentineZodiac(bot))      log.info("ValentineZodiac cog loaded") diff --git a/bot/seasons/valentines/whoisvalentine.py b/bot/seasons/valentines/whoisvalentine.py index d73ccd9b..b8586dca 100644 --- a/bot/seasons/valentines/whoisvalentine.py +++ b/bot/seasons/valentines/whoisvalentine.py @@ -17,11 +17,11 @@ with open(Path("bot/resources/valentines/valentine_facts.json"), "r") as file:  class ValentineFacts(commands.Cog):      """A Cog for displaying facts about Saint Valentine.""" -    def __init__(self, bot): +    def __init__(self, bot: commands.Bot):          self.bot = bot      @commands.command(aliases=('whoisvalentine', 'saint_valentine')) -    async def who_is_valentine(self, ctx): +    async def who_is_valentine(self, ctx: commands.Context) -> None:          """Displays info about Saint Valentine."""          embed = discord.Embed(              title="Who is Saint Valentine?", @@ -36,7 +36,7 @@ class ValentineFacts(commands.Cog):          await ctx.channel.send(embed=embed)      @commands.command() -    async def valentine_fact(self, ctx): +    async def valentine_fact(self, ctx: commands.Context) -> None:          """Shows a random fact about Valentine's Day."""          embed = discord.Embed(              title=choice(FACTS['titles']), @@ -47,7 +47,7 @@ class ValentineFacts(commands.Cog):          await ctx.channel.send(embed=embed) -def setup(bot): +def setup(bot: commands.Bot) -> None:      """Who is Valentine Cog load."""      bot.add_cog(ValentineFacts(bot))      log.info("ValentineFacts cog loaded") diff --git a/bot/utils/__init__.py b/bot/utils/__init__.py index 15c4b5db..0aa50af6 100644 --- a/bot/utils/__init__.py +++ b/bot/utils/__init__.py @@ -1,4 +1,6 @@  import asyncio +import re +import string  from typing import List  import discord @@ -27,7 +29,7 @@ async def disambiguate(      choices = (f'{index}: {entry}' for index, entry in enumerate(entries, start=1)) -    def check(message): +    def check(message: discord.Message) -> bool:          return (message.content.isdigit()                  and message.author == ctx.author                  and message.channel == ctx.channel) @@ -71,3 +73,57 @@ async def disambiguate(          return entries[index - 1]      except IndexError:          raise BadArgument('Invalid choice.') + + +def replace_many( +        sentence: str, replacements: dict, *, ignore_case: bool = False, match_case: bool = False +) -> str: +    """ +    Replaces multiple substrings in a string given a mapping of strings. + +    By default replaces long strings before short strings, and lowercase before uppercase. +    Example: +        var = replace_many("This is a sentence", {"is": "was", "This": "That"}) +        assert var == "That was a sentence" + +    If `ignore_case` is given, does a case insensitive match. +    Example: +        var = replace_many("THIS is a sentence", {"IS": "was", "tHiS": "That"}, ignore_case=True) +        assert var == "That was a sentence" + +    If `match_case` is given, matches the case of the replacement with the replaced word. +    Example: +        var = replace_many( +            "This IS a sentence", {"is": "was", "this": "that"}, ignore_case=True, match_case=True +        ) +        assert var == "That WAS a sentence" +    """ +    if ignore_case: +        replacements = dict( +            (word.lower(), replacement) for word, replacement in replacements.items() +        ) + +    words_to_replace = sorted(replacements, key=lambda s: (-len(s), s)) + +    # Join and compile words to replace into a regex +    pattern = "|".join(re.escape(word) for word in words_to_replace) +    regex = re.compile(pattern, re.I if ignore_case else 0) + +    def _repl(match: re.Match) -> str: +        """Returns replacement depending on `ignore_case` and `match_case`.""" +        word = match.group(0) +        replacement = replacements[word.lower() if ignore_case else word] + +        if not match_case: +            return replacement + +        # Clean punctuation from word so string methods work +        cleaned_word = word.translate(str.maketrans('', '', string.punctuation)) +        if cleaned_word.isupper(): +            return replacement.upper() +        elif cleaned_word[0].isupper(): +            return replacement.capitalize() +        else: +            return replacement.lower() + +    return regex.sub(_repl, sentence) diff --git a/bot/utils/halloween/spookifications.py b/bot/utils/halloween/spookifications.py index 69b49919..11f69850 100644 --- a/bot/utils/halloween/spookifications.py +++ b/bot/utils/halloween/spookifications.py @@ -7,7 +7,7 @@ from PIL import ImageOps  log = logging.getLogger() -def inversion(im): +def inversion(im: Image) -> Image:      """      Inverts the image. @@ -18,7 +18,7 @@ def inversion(im):      return inv -def pentagram(im): +def pentagram(im: Image) -> Image:      """Adds pentagram to the image."""      im = im.convert('RGB')      wt, ht = im.size @@ -28,7 +28,7 @@ def pentagram(im):      return im -def bat(im): +def bat(im: Image) -> Image:      """      Adds a bat silhoutte to the image. @@ -50,7 +50,7 @@ def bat(im):      return im -def get_random_effect(im): +def get_random_effect(im: Image) -> Image:      """Randomly selects and applies an effect."""      effects = [inversion, pentagram, bat]      effect = choice(effects) diff --git a/bot/utils/persist.py b/bot/utils/persist.py new file mode 100644 index 00000000..a60a1219 --- /dev/null +++ b/bot/utils/persist.py @@ -0,0 +1,66 @@ +import sqlite3 +from pathlib import Path +from shutil import copyfile + +from bot.seasons.season import get_seasons + +DIRECTORY = Path("data")  # directory that has a persistent volume mapped to it + + +def make_persistent(file_path: Path) -> Path: +    """ +    Copy datafile at the provided file_path to the persistent data directory. + +    A persistent data file is needed by some features in order to not lose data +    after bot rebuilds. + +    This function will ensure that a clean data file with default schema, +    structure or data is copied over to the persistent volume before returning +    the path to this new persistent version of the file. + +    If the persistent file already exists, it won't be overwritten with the +    clean default file, just returning the Path instead to the existing file. + +    Note: Avoid using the same file name as other features in the same seasons +    as otherwise only one datafile can be persistent and will be returned for +    both cases. + +    Example Usage: +    >>> import json +    >>> template_datafile = Path("bot", "resources", "evergreen", "myfile.json") +    >>> path_to_persistent_file = make_persistent(template_datafile) +    >>> print(path_to_persistent_file) +    data/evergreen/myfile.json +    >>> with path_to_persistent_file.open("w+") as f: +    >>>     data = json.load(f) +    """ +    # ensure the persistent data directory exists +    DIRECTORY.mkdir(exist_ok=True) + +    if not file_path.is_file(): +        raise OSError(f"File not found at {file_path}.") + +    # detect season in datafile path for assigning to subdirectory +    season = next((s for s in get_seasons() if s in file_path.parts), None) + +    if season: +        # make sure subdirectory exists first +        subdirectory = Path(DIRECTORY, season) +        subdirectory.mkdir(exist_ok=True) + +        persistent_path = Path(subdirectory, file_path.name) + +    else: +        persistent_path = Path(DIRECTORY, file_path.name) + +    # copy base/template datafile to persistent directory +    if not persistent_path.exists(): +        copyfile(file_path, persistent_path) + +    return persistent_path + + +def sqlite(db_path: Path) -> sqlite3.Connection: +    """Copy sqlite file to the persistent data directory and return an open connection.""" +    persistent_path = make_persistent(db_path) +    return sqlite3.connect(persistent_path) | 
