diff options
author | 2019-10-02 09:39:10 +0530 | |
---|---|---|
committer | 2019-10-02 09:39:10 +0530 | |
commit | cd5842811f92bd5c82a164d33ad71a9c7c172e57 (patch) | |
tree | 4a15827a3d94cbc78fa8a9a5b47cdd57283f304e /bot | |
parent | Worked on the requested changes and also made a few other changes: (diff) | |
parent | Merge branch 'master' into trivia_quiz (diff) |
Merge branch 'trivia_quiz' of https://github.com/RohanJnr/seasonalbot into trivia_quiz
Diffstat (limited to 'bot')
64 files changed, 665 insertions, 1063 deletions
diff --git a/bot/__init__.py b/bot/__init__.py index 9e0290a7..4729e50c 100644 --- a/bot/__init__.py +++ b/bot/__init__.py @@ -13,7 +13,7 @@ logging.TRACE = 5 logging.addLevelName(logging.TRACE, "TRACE") -def monkeypatch_trace(self, msg, *args, **kwargs): +def monkeypatch_trace(self: logging.Logger, msg: str, *args, **kwargs) -> None: """ Log 'msg % args' with severity 'TRACE'. @@ -4,7 +4,7 @@ from traceback import format_exc from typing import List from aiohttp import AsyncResolver, ClientSession, TCPConnector -from discord import Embed +from discord import DiscordException, Embed from discord.ext import commands from bot.constants import Channels, Client @@ -23,7 +23,7 @@ class SeasonalBot(commands.Bot): connector=TCPConnector(resolver=AsyncResolver(), family=socket.AF_INET) ) - def load_extensions(self, exts: List[str]): + def load_extensions(self, exts: List[str]) -> None: """Unload all current extensions, then load the given extensions.""" # Unload all cogs extensions = list(self.extensions.keys()) @@ -40,7 +40,7 @@ class SeasonalBot(commands.Bot): except Exception as e: log.error(f'Failed to load extension {cog}: {repr(e)} {format_exc()}') - async def send_log(self, title: str, details: str = None, *, icon: str = None): + async def send_log(self, title: str, details: str = None, *, icon: str = None) -> None: """Send an embed message to the devlog channel.""" devlog = self.get_channel(Channels.devlog) @@ -56,7 +56,7 @@ class SeasonalBot(commands.Bot): await devlog.send(embed=embed) - async def on_command_error(self, context, exception): + async def on_command_error(self, context: commands.Context, exception: DiscordException) -> None: """Check command errors for UserInputError and reset the cooldown if thrown.""" if isinstance(exception, commands.UserInputError): context.command.reset_cooldown(context) diff --git a/bot/decorators.py b/bot/decorators.py index 02cf4b8a..dbaad4a2 100644 --- a/bot/decorators.py +++ b/bot/decorators.py @@ -20,9 +20,9 @@ class InChannelCheckFailure(CheckFailure): pass -def with_role(*role_ids: int): +def with_role(*role_ids: int) -> bool: """Check to see whether the invoking user has any of the roles specified in role_ids.""" - async def predicate(ctx: Context): + async def predicate(ctx: Context) -> bool: if not ctx.guild: # Return False in a DM log.debug( f"{ctx.author} tried to use the '{ctx.command.name}'command from a DM. " @@ -43,9 +43,9 @@ def with_role(*role_ids: int): return commands.check(predicate) -def without_role(*role_ids: int): +def without_role(*role_ids: int) -> bool: """Check whether the invoking user does not have all of the roles specified in role_ids.""" - async def predicate(ctx: Context): + async def predicate(ctx: Context) -> bool: if not ctx.guild: # Return False in a DM log.debug( f"{ctx.author} tried to use the '{ctx.command.name}' command from a DM. " @@ -117,7 +117,7 @@ def override_in_channel(func: typing.Callable) -> typing.Callable: return func -def locked(): +def locked() -> typing.Union[typing.Callable, None]: """ Allows the user to only run one instance of the decorated command at a time. @@ -125,11 +125,11 @@ def locked(): This decorator has to go before (below) the `command` decorator. """ - def wrap(func): + def wrap(func: typing.Callable) -> typing.Union[typing.Callable, None]: func.__locks = WeakValueDictionary() @wraps(func) - async def inner(self, ctx, *args, **kwargs): + async def inner(self: typing.Callable, ctx: Context, *args, **kwargs) -> typing.Union[typing.Callable, None]: lock = func.__locks.setdefault(ctx.author.id, Lock()) if lock.locked(): embed = Embed() diff --git a/bot/pagination.py b/bot/pagination.py index c12b6233..f1233482 100644 --- a/bot/pagination.py +++ b/bot/pagination.py @@ -24,7 +24,7 @@ class EmptyPaginatorEmbed(Exception): class LinePaginator(Paginator): """A class that aids in paginating code blocks for Discord messages.""" - def __init__(self, prefix='```', suffix='```', max_size=2000, max_lines=None): + def __init__(self, prefix: str = '```', suffix: str = '```', max_size: int = 2000, max_lines: int = None): """ Overrides the Paginator.__init__ from inside discord.ext.commands. @@ -42,7 +42,7 @@ class LinePaginator(Paginator): self._count = len(prefix) + 1 # prefix + newline self._pages = [] - def add_line(self, line='', *, empty=False): + def add_line(self, line: str = '', *, empty: bool = False) -> None: """ Adds a line to the current page. @@ -98,7 +98,7 @@ class LinePaginator(Paginator): ... ctx, embed ... ) """ - def event_check(reaction_: Reaction, user_: Member): + def event_check(reaction_: Reaction, user_: Member) -> bool: """Make sure that this reaction is what we want to operate on.""" no_restrictions = ( # Pagination is not restricted @@ -274,7 +274,7 @@ class ImagePaginator(Paginator): Refer to ImagePaginator.paginate for documentation on how to use. """ - def __init__(self, prefix="", suffix=""): + def __init__(self, prefix: str = "", suffix: str = ""): super().__init__(prefix, suffix) self._current_page = [prefix] self.images = [] diff --git a/bot/resources/halloween/github_links.json b/bot/resources/halloween/github_links.json index e69de29b..0967ef42 100644 --- a/bot/resources/halloween/github_links.json +++ b/bot/resources/halloween/github_links.json @@ -0,0 +1 @@ +{} diff --git a/bot/resources/persist/egg_hunt.sqlite b/bot/resources/persist/egg_hunt.sqlite Binary files differdeleted file mode 100644 index 6a7ae32d..00000000 --- a/bot/resources/persist/egg_hunt.sqlite +++ /dev/null diff --git a/bot/seasons/__init__.py b/bot/seasons/__init__.py index 1512fae2..7faf9164 100644 --- a/bot/seasons/__init__.py +++ b/bot/seasons/__init__.py @@ -1,5 +1,7 @@ import logging +from discord.ext import commands + from bot.seasons.season import SeasonBase, SeasonManager, get_season __all__ = ("SeasonBase", "get_season") @@ -7,6 +9,6 @@ __all__ = ("SeasonBase", "get_season") log = logging.getLogger(__name__) -def setup(bot): +def setup(bot: commands.Bot) -> None: bot.add_cog(SeasonManager(bot)) log.info("SeasonManager cog loaded") diff --git a/bot/seasons/christmas/__init__.py b/bot/seasons/christmas/__init__.py index 239181f4..ae93800e 100644 --- a/bot/seasons/christmas/__init__.py +++ b/bot/seasons/christmas/__init__.py @@ -18,7 +18,7 @@ class Christmas(SeasonBase): greeting = "Happy Holidays!" start_date = "01/12" - end_date = "31/12" + end_date = "01/01" colour = Colours.dark_green icon = ( diff --git a/bot/seasons/christmas/adventofcode.py b/bot/seasons/christmas/adventofcode.py index a9e72805..6609387e 100644 --- a/bot/seasons/christmas/adventofcode.py +++ b/bot/seasons/christmas/adventofcode.py @@ -46,7 +46,7 @@ def time_left_to_aoc_midnight() -> Tuple[datetime, timedelta]: return tomorrow, tomorrow - datetime.now(EST) -async def countdown_status(bot: commands.Bot): +async def countdown_status(bot: commands.Bot) -> None: """Set the playing status of the bot to the minutes & hours left until the next day's challenge.""" while is_in_advent(): _, time_left = time_left_to_aoc_midnight() @@ -73,7 +73,7 @@ async def countdown_status(bot: commands.Bot): await asyncio.sleep(delay) -async def day_countdown(bot: commands.Bot): +async def day_countdown(bot: commands.Bot) -> None: """ Calculate the number of seconds left until the next day of Advent. @@ -127,7 +127,7 @@ class AdventOfCode(commands.Cog): @commands.group(name="adventofcode", aliases=("aoc",), invoke_without_command=True) @override_in_channel - async def adventofcode_group(self, ctx: commands.Context): + async def adventofcode_group(self, ctx: commands.Context) -> None: """All of the Advent of Code commands.""" await ctx.send_help(ctx.command) @@ -136,7 +136,7 @@ class AdventOfCode(commands.Cog): aliases=("sub", "notifications", "notify", "notifs"), brief="Notifications for new days" ) - async def aoc_subscribe(self, ctx: commands.Context): + async def aoc_subscribe(self, ctx: commands.Context) -> None: """Assign the role for notifications about new days being ready.""" role = ctx.guild.get_role(AocConfig.role_id) unsubscribe_command = f"{ctx.prefix}{ctx.command.root_parent} unsubscribe" @@ -150,7 +150,7 @@ class AdventOfCode(commands.Cog): f"If you don't want them any more, run `{unsubscribe_command}` instead.") @adventofcode_group.command(name="unsubscribe", aliases=("unsub",), brief="Notifications for new days") - async def aoc_unsubscribe(self, ctx: commands.Context): + async def aoc_unsubscribe(self, ctx: commands.Context) -> None: """Remove the role for notifications about new days being ready.""" role = ctx.guild.get_role(AocConfig.role_id) @@ -161,7 +161,7 @@ class AdventOfCode(commands.Cog): await ctx.send("Hey, you don't even get any notifications about new Advent of Code tasks currently anyway.") @adventofcode_group.command(name="countdown", aliases=("count", "c"), brief="Return time left until next day") - async def aoc_countdown(self, ctx: commands.Context): + async def aoc_countdown(self, ctx: commands.Context) -> None: """Return time left until next day.""" if not is_in_advent(): datetime_now = datetime.now(EST) @@ -178,12 +178,12 @@ class AdventOfCode(commands.Cog): await ctx.send(f"There are {hours} hours and {minutes} minutes left until day {tomorrow.day}.") @adventofcode_group.command(name="about", aliases=("ab", "info"), brief="Learn about Advent of Code") - async def about_aoc(self, ctx: commands.Context): + async def about_aoc(self, ctx: commands.Context) -> None: """Respond with an explanation of all things Advent of Code.""" await ctx.send("", embed=self.cached_about_aoc) @adventofcode_group.command(name="join", aliases=("j",), brief="Learn how to join PyDis' private AoC leaderboard") - async def join_leaderboard(self, ctx: commands.Context): + async def join_leaderboard(self, ctx: commands.Context) -> None: """DM the user the information for joining the PyDis AoC private leaderboard.""" author = ctx.message.author log.info(f"{author.name} ({author.id}) has requested the PyDis AoC leaderboard code") @@ -203,7 +203,7 @@ class AdventOfCode(commands.Cog): aliases=("board", "lb"), brief="Get a snapshot of the PyDis private AoC leaderboard", ) - async def aoc_leaderboard(self, ctx: commands.Context, number_of_people_to_display: int = 10): + async def aoc_leaderboard(self, ctx: commands.Context, number_of_people_to_display: int = 10) -> None: """ Pull the top number_of_people_to_display members from the PyDis leaderboard and post an embed. @@ -244,7 +244,7 @@ class AdventOfCode(commands.Cog): aliases=("dailystats", "ds"), brief="Get daily statistics for the PyDis private leaderboard" ) - async def private_leaderboard_daily_stats(self, ctx: commands.Context): + async def private_leaderboard_daily_stats(self, ctx: commands.Context) -> None: """ Respond with a table of the daily completion statistics for the PyDis private leaderboard. @@ -287,7 +287,7 @@ class AdventOfCode(commands.Cog): aliases=("globalboard", "gb"), brief="Get a snapshot of the global AoC leaderboard", ) - async def global_leaderboard(self, ctx: commands.Context, number_of_people_to_display: int = 10): + async def global_leaderboard(self, ctx: commands.Context, number_of_people_to_display: int = 10) -> None: """ Pull the top number_of_people_to_display members from the global AoC leaderboard and post an embed. @@ -319,7 +319,7 @@ class AdventOfCode(commands.Cog): embed=aoc_embed, ) - async def _check_leaderboard_cache(self, ctx, global_board: bool = False): + async def _check_leaderboard_cache(self, ctx: commands.Context, global_board: bool = False) -> None: """ Check age of current leaderboard & pull a new one if the board is too old. @@ -359,7 +359,7 @@ class AdventOfCode(commands.Cog): ) async def _check_n_entries(self, ctx: commands.Context, number_of_people_to_display: int) -> int: - """Check for n > max_entries and n <= 0""" + """Check for n > max_entries and n <= 0.""" max_entries = AocConfig.leaderboard_max_displayed_members author = ctx.message.author if not 0 <= number_of_people_to_display <= max_entries: @@ -390,7 +390,7 @@ class AdventOfCode(commands.Cog): return about_embed - async def _boardgetter(self, global_board: bool): + async def _boardgetter(self, global_board: bool) -> None: """Invoke the proper leaderboard getter based on the global_board boolean.""" if global_board: self.cached_global_leaderboard = await AocGlobalLeaderboard.from_url() diff --git a/bot/seasons/christmas/hanukkah_embed.py b/bot/seasons/christmas/hanukkah_embed.py index 652a1f35..aaa02b27 100644 --- a/bot/seasons/christmas/hanukkah_embed.py +++ b/bot/seasons/christmas/hanukkah_embed.py @@ -1,5 +1,6 @@ import datetime import logging +from typing import List from discord import Embed from discord.ext import commands @@ -13,7 +14,7 @@ log = logging.getLogger(__name__) class HanukkahEmbed(commands.Cog): """A cog that returns information about Hanukkah festival.""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot self.url = ("https://www.hebcal.com/hebcal/?v=1&cfg=json&maj=on&min=on&mod=on&nx=on&" "year=now&month=x&ss=on&mf=on&c=on&geo=geoname&geonameid=3448439&m=50&s=on") @@ -21,7 +22,7 @@ class HanukkahEmbed(commands.Cog): self.hanukkah_months = [] self.hanukkah_years = [] - async def get_hanukkah_dates(self): + async def get_hanukkah_dates(self) -> List[str]: """Gets the dates for hanukkah festival.""" hanukkah_dates = [] async with self.bot.http_session.get(self.url) as response: @@ -34,7 +35,7 @@ class HanukkahEmbed(commands.Cog): return hanukkah_dates @commands.command(name='hanukkah', aliases=['chanukah']) - async def hanukkah_festival(self, ctx): + async def hanukkah_festival(self, ctx: commands.Context) -> None: """Tells you about the Hanukkah Festivaltime of festival, festival day, etc).""" hanukkah_dates = await self.get_hanukkah_dates() self.hanukkah_dates_split(hanukkah_dates) @@ -98,7 +99,7 @@ class HanukkahEmbed(commands.Cog): await ctx.send(embed=embed) - def hanukkah_dates_split(self, hanukkah_dates): + def hanukkah_dates_split(self, hanukkah_dates: List[str]) -> None: """We are splitting the dates for hanukkah into days, months and years.""" for date in hanukkah_dates: self.hanukkah_days.append(date[8:10]) @@ -106,7 +107,7 @@ class HanukkahEmbed(commands.Cog): self.hanukkah_years.append(date[0:4]) -def setup(bot): +def setup(bot: commands.Bot) -> None: """Cog load.""" bot.add_cog(HanukkahEmbed(bot)) log.info("Hanukkah embed cog loaded") diff --git a/bot/seasons/easter/april_fools_vids.py b/bot/seasons/easter/april_fools_vids.py index d921d07c..4869f510 100644 --- a/bot/seasons/easter/april_fools_vids.py +++ b/bot/seasons/easter/april_fools_vids.py @@ -11,13 +11,13 @@ log = logging.getLogger(__name__) class AprilFoolVideos(commands.Cog): """A cog for April Fools' that gets a random April Fools' video from Youtube.""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot self.yt_vids = self.load_json() self.youtubers = ['google'] # will add more in future @staticmethod - def load_json(): + def load_json() -> dict: """A function to load JSON data.""" p = Path('bot/resources/easter/april_fools_vids.json') with p.open() as json_file: @@ -25,7 +25,7 @@ class AprilFoolVideos(commands.Cog): return all_vids @commands.command(name='fool') - async def aprial_fools(self, ctx): + async def april_fools(self, ctx: commands.Context) -> None: """Get a random April Fools' video from Youtube.""" random_youtuber = random.choice(self.youtubers) category = self.yt_vids[random_youtuber] @@ -33,7 +33,7 @@ class AprilFoolVideos(commands.Cog): await ctx.send(f"Check out this April Fools' video by {random_youtuber}.\n\n{random_vid['link']}") -def setup(bot): +def setup(bot: commands.Bot) -> None: """April Fools' Cog load.""" bot.add_cog(AprilFoolVideos(bot)) log.info('April Fools videos cog loaded!') diff --git a/bot/seasons/easter/avatar_easterifier.py b/bot/seasons/easter/avatar_easterifier.py index ad8b5473..e21e35fc 100644 --- a/bot/seasons/easter/avatar_easterifier.py +++ b/bot/seasons/easter/avatar_easterifier.py @@ -2,7 +2,7 @@ import asyncio import logging from io import BytesIO from pathlib import Path -from typing import Union +from typing import Tuple, Union import discord from PIL import Image @@ -21,11 +21,11 @@ COLOURS = [ class AvatarEasterifier(commands.Cog): """Put an Easter spin on your avatar or image!""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot @staticmethod - def closest(x): + def closest(x: Tuple[int, int, int]) -> Tuple[int, int, int]: """ Finds the closest easter colour to a given pixel. @@ -33,8 +33,8 @@ class AvatarEasterifier(commands.Cog): """ r1, g1, b1 = x - def distance(point): - """Finds the difference between a pastel colour and the original pixel colour""" + def distance(point: Tuple[int, int, int]) -> Tuple[int, int, int]: + """Finds the difference between a pastel colour and the original pixel colour.""" r2, g2, b2 = point return ((r1 - r2)**2 + (g1 - g2)**2 + (b1 - b2)**2) @@ -47,7 +47,7 @@ class AvatarEasterifier(commands.Cog): return (r, g, b) @commands.command(pass_context=True, aliases=["easterify"]) - async def avatareasterify(self, ctx, *colours: Union[discord.Colour, str]): + async def avatareasterify(self, ctx: commands.Context, *colours: Union[discord.Colour, str]) -> None: """ This "Easterifies" the user's avatar. @@ -56,7 +56,7 @@ class AvatarEasterifier(commands.Cog): Colours are split by spaces, unless you wrap the colour name in double quotes. Discord colour names, HTML colour names, XKCD colour names and hex values are accepted. """ - async def send(*args, **kwargs): + async def send(*args, **kwargs) -> str: """ This replaces the original ctx.send. @@ -123,7 +123,7 @@ class AvatarEasterifier(commands.Cog): await ctx.send(file=file, embed=embed) -def setup(bot): +def setup(bot: commands.Bot) -> None: """Avatar Easterifier Cog load.""" bot.add_cog(AvatarEasterifier(bot)) log.info("AvatarEasterifier cog loaded") diff --git a/bot/seasons/easter/bunny_name_generator.py b/bot/seasons/easter/bunny_name_generator.py index 76d5c478..97c467e1 100644 --- a/bot/seasons/easter/bunny_name_generator.py +++ b/bot/seasons/easter/bunny_name_generator.py @@ -3,6 +3,7 @@ import logging import random import re from pathlib import Path +from typing import List, Union from discord.ext import commands @@ -15,21 +16,21 @@ with Path("bot/resources/easter/bunny_names.json").open("r", encoding="utf8") as class BunnyNameGenerator(commands.Cog): """Generate a random bunny name, or bunnify your Discord username!""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot - def find_separators(self, displayname): + def find_separators(self, displayname: str) -> Union[List[str], None]: """Check if Discord name contains spaces so we can bunnify an individual word in the name.""" new_name = re.split(r'[_.\s]', displayname) if displayname not in new_name: return new_name - def find_vowels(self, displayname): + def find_vowels(self, displayname: str) -> str: """ Finds vowels in the user's display name. - If the Discord name contains a vowel and the letter y, - it will match one or more of these patterns. + If the Discord name contains a vowel and the letter y, it will match one or more of these patterns. + Only the most recently matched pattern will apply the changes. """ expressions = [ @@ -45,8 +46,8 @@ class BunnyNameGenerator(commands.Cog): if new_name != displayname: return new_name - def append_name(self, displayname): - """Adds a suffix to the end of the Discord name""" + def append_name(self, displayname: str) -> str: + """Adds a suffix to the end of the Discord name.""" extensions = ['foot', 'ear', 'nose', 'tail'] suffix = random.choice(extensions) appended_name = displayname + suffix @@ -54,13 +55,13 @@ class BunnyNameGenerator(commands.Cog): return appended_name @commands.command() - async def bunnyname(self, ctx): - """Picks a random bunny name from a JSON file""" + async def bunnyname(self, ctx: commands.Context) -> None: + """Picks a random bunny name from a JSON file.""" await ctx.send(random.choice(BUNNY_NAMES["names"])) @commands.command() - async def bunnifyme(self, ctx): - """Gets your Discord username and bunnifies it""" + async def bunnifyme(self, ctx: commands.Context) -> None: + """Gets your Discord username and bunnifies it.""" username = ctx.message.author.display_name # If name contains spaces or other separators, get the individual words to randomly bunnify @@ -86,7 +87,7 @@ class BunnyNameGenerator(commands.Cog): await ctx.send(bunnified_name) -def setup(bot): +def setup(bot: commands.Bot) -> None: """Bunny Name Generator Cog load.""" bot.add_cog(BunnyNameGenerator(bot)) log.info("BunnyNameGenerator cog loaded.") diff --git a/bot/seasons/easter/conversationstarters.py b/bot/seasons/easter/conversationstarters.py index c2cdf26c..3f38ae82 100644 --- a/bot/seasons/easter/conversationstarters.py +++ b/bot/seasons/easter/conversationstarters.py @@ -14,16 +14,16 @@ with open(Path("bot/resources/easter/starter.json"), "r", encoding="utf8") as f: class ConvoStarters(commands.Cog): """Easter conversation topics.""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot @commands.command() - async def topic(self, ctx): + async def topic(self, ctx: commands.Context) -> None: """Responds with a random topic to start a conversation.""" await ctx.send(random.choice(starters['starters'])) -def setup(bot): +def setup(bot: commands.Bot) -> None: """Conversation starters Cog load.""" bot.add_cog(ConvoStarters(bot)) log.info("ConvoStarters cog loaded") diff --git a/bot/seasons/easter/easter_riddle.py b/bot/seasons/easter/easter_riddle.py index 56555586..4b98b204 100644 --- a/bot/seasons/easter/easter_riddle.py +++ b/bot/seasons/easter/easter_riddle.py @@ -20,14 +20,14 @@ TIMELIMIT = 10 class EasterRiddle(commands.Cog): """This cog contains the command for the Easter quiz!""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot self.winners = [] self.correct = "" self.current_channel = None @commands.command(aliases=["riddlemethis", "riddleme"]) - async def riddle(self, ctx): + async def riddle(self, ctx: commands.Context) -> None: """ Gives a random riddle, then provides 2 hints at certain intervals before revealing the answer. @@ -83,8 +83,8 @@ class EasterRiddle(commands.Cog): self.current_channel = None @commands.Cog.listener() - async def on_message(self, message): - """If a non-bot user enters a correct answer, their username gets added to self.winners""" + async def on_message(self, message: discord.Messaged) -> None: + """If a non-bot user enters a correct answer, their username gets added to self.winners.""" if self.current_channel != message.channel: return @@ -95,7 +95,7 @@ class EasterRiddle(commands.Cog): self.winners.append(message.author.mention) -def setup(bot): +def setup(bot: commands.Bot) -> None: """Easter Riddle Cog load.""" bot.add_cog(EasterRiddle(bot)) log.info("Easter Riddle bot loaded") diff --git a/bot/seasons/easter/egg_decorating.py b/bot/seasons/easter/egg_decorating.py index ee8a80e5..51f52264 100644 --- a/bot/seasons/easter/egg_decorating.py +++ b/bot/seasons/easter/egg_decorating.py @@ -31,11 +31,11 @@ IRREPLACEABLE = [ class EggDecorating(commands.Cog): """Decorate some easter eggs!""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot) -> None: self.bot = bot @staticmethod - def replace_invalid(colour: str): + def replace_invalid(colour: str) -> Union[int, None]: """Attempts to match with HTML or XKCD colour names, returning the int value.""" with suppress(KeyError): return int(HTML_COLOURS[colour], 16) @@ -44,7 +44,9 @@ class EggDecorating(commands.Cog): return None @commands.command(aliases=["decorateegg"]) - async def eggdecorate(self, ctx, *colours: Union[discord.Colour, str]): + async def eggdecorate( + self, ctx: commands.Context, *colours: Union[discord.Colour, str] + ) -> Union[Image, discord.Message]: """ Picks a random egg design and decorates it using the given colours. @@ -111,7 +113,7 @@ class EggDecorating(commands.Cog): return new_im -def setup(bot): +def setup(bot: commands.bot) -> None: """Egg decorating Cog load.""" bot.add_cog(EggDecorating(bot)) log.info("EggDecorating cog loaded.") diff --git a/bot/seasons/easter/egg_facts.py b/bot/seasons/easter/egg_facts.py index ae08ccd4..9e6fb1cb 100644 --- a/bot/seasons/easter/egg_facts.py +++ b/bot/seasons/easter/egg_facts.py @@ -21,18 +21,18 @@ class EasterFacts(commands.Cog): It also contains a background task which sends an easter egg fact in the event channel everyday. """ - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot self.facts = self.load_json() @staticmethod - def load_json(): + def load_json() -> dict: """Load a list of easter egg facts from the resource JSON file.""" p = Path("bot/resources/easter/easter_egg_facts.json") with p.open(encoding="utf8") as f: return load(f) - async def send_egg_fact_daily(self): + async def send_egg_fact_daily(self) -> None: """A background task that sends an easter egg fact in the event channel everyday.""" channel = self.bot.get_channel(Channels.seasonalbot_chat) while True: @@ -41,12 +41,12 @@ class EasterFacts(commands.Cog): await asyncio.sleep(24 * 60 * 60) @commands.command(name='eggfact', aliases=['fact']) - async def easter_facts(self, ctx): + async def easter_facts(self, ctx: commands.Context) -> None: """Get easter egg facts.""" embed = self.make_embed() await ctx.send(embed=embed) - def make_embed(self): + def make_embed(self) -> discord.Embed: """Makes a nice embed for the message to be sent.""" return discord.Embed( colour=Colours.soft_red, @@ -55,7 +55,7 @@ class EasterFacts(commands.Cog): ) -def setup(bot): +def setup(bot: commands.Bot) -> None: """Easter Egg facts cog load.""" bot.loop.create_task(EasterFacts(bot).send_egg_fact_daily()) bot.add_cog(EasterFacts(bot)) diff --git a/bot/seasons/easter/egg_hunt/__init__.py b/bot/seasons/easter/egg_hunt/__init__.py deleted file mode 100644 index 0e4b9e16..00000000 --- a/bot/seasons/easter/egg_hunt/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -import logging - -from .cog import EggHunt - -log = logging.getLogger(__name__) - - -def setup(bot): - """Easter Egg Hunt Cog load.""" - bot.add_cog(EggHunt()) - log.info("EggHunt cog loaded") diff --git a/bot/seasons/easter/egg_hunt/cog.py b/bot/seasons/easter/egg_hunt/cog.py deleted file mode 100644 index a4ad27df..00000000 --- a/bot/seasons/easter/egg_hunt/cog.py +++ /dev/null @@ -1,618 +0,0 @@ -import asyncio -import contextlib -import logging -import random -import sqlite3 -from datetime import datetime, timezone -from pathlib import Path - -import discord -from discord.ext import commands - -from bot.bot import bot -from bot.constants import Channels, Client, Roles as MainRoles -from bot.decorators import with_role -from .constants import Colours, EggHuntSettings, Emoji, Roles - -log = logging.getLogger(__name__) - -DB_PATH = Path("bot/resources/persist/egg_hunt.sqlite") - -TEAM_MAP = { - Roles.white: Emoji.egg_white, - Roles.blurple: Emoji.egg_blurple, - Emoji.egg_white: Roles.white, - Emoji.egg_blurple: Roles.blurple -} - -GUILD = bot.get_guild(Client.guild) - -MUTED = GUILD.get_role(MainRoles.muted) - - -def get_team_role(user: discord.Member) -> discord.Role: - """Helper function to get the team role for a member.""" - if Roles.white in user.roles: - return Roles.white - if Roles.blurple in user.roles: - return Roles.blurple - - -async def assign_team(user: discord.Member) -> discord.Member: - """Helper function to assign a new team role for a member.""" - db = sqlite3.connect(DB_PATH) - c = db.cursor() - c.execute(f"SELECT team FROM user_scores WHERE user_id = {user.id}") - result = c.fetchone() - if not result: - c.execute( - "SELECT team, COUNT(*) AS count FROM user_scores " - "GROUP BY team ORDER BY count ASC LIMIT 1;" - ) - result = c.fetchone() - result = result[0] if result else "WHITE" - - if result[0] == "WHITE": - new_team = Roles.white - else: - new_team = Roles.blurple - - db.close() - - log.debug(f"Assigned role {new_team} to {user}.") - - await user.add_roles(new_team) - return GUILD.get_member(user.id) - - -class EggMessage: - """Handles a single egg reaction drop session.""" - - def __init__(self, message: discord.Message, egg: discord.Emoji): - self.message = message - self.egg = egg - self.first = None - self.users = set() - self.teams = {Roles.white: "WHITE", Roles.blurple: "BLURPLE"} - self.new_team_assignments = {} - self.timeout_task = None - - @staticmethod - def add_user_score_sql(user_id: int, team: str, score: int) -> str: - """Builds the SQL for adding a score to a user in the database.""" - return ( - "INSERT INTO user_scores(user_id, team, score)" - f"VALUES({user_id}, '{team}', {score})" - f"ON CONFLICT (user_id) DO UPDATE SET score=score+{score}" - ) - - @staticmethod - def add_team_score_sql(team_name: str, score: int) -> str: - """Builds the SQL for adding a score to a team in the database.""" - return f"UPDATE team_scores SET team_score=team_score+{score} WHERE team_id='{team_name}'" - - def finalise_score(self): - """Sums and actions scoring for this egg drop session.""" - db = sqlite3.connect(DB_PATH) - c = db.cursor() - - team_scores = {"WHITE": 0, "BLURPLE": 0} - - first_team = get_team_role(self.first) - if not first_team: - log.debug("User without team role!") - db.close() - return - - score = 3 if first_team == TEAM_MAP[first_team] else 2 - - c.execute(self.add_user_score_sql(self.first.id, self.teams[first_team], score)) - team_scores[self.teams[first_team]] += score - - for user in self.users: - team = get_team_role(user) - if not team: - log.debug("User without team role!") - continue - - team_name = self.teams[team] - team_scores[team_name] += 1 - score = 2 if team == first_team else 1 - c.execute(self.add_user_score_sql(user.id, team_name, score)) - - for team_name, score in team_scores.items(): - if not score: - continue - c.execute(self.add_team_score_sql(team_name, score)) - - db.commit() - db.close() - - log.debug( - f"EggHunt session finalising: ID({self.message.id}) " - f"FIRST({self.first}) REST({self.users})." - ) - - async def start_timeout(self, seconds: int = 5): - """Begins a task that will sleep until the given seconds before finalizing the session.""" - if self.timeout_task: - self.timeout_task.cancel() - self.timeout_task = None - - await asyncio.sleep(seconds) - - bot.remove_listener(self.collect_reacts, name="on_reaction_add") - - with contextlib.suppress(discord.Forbidden): - await self.message.clear_reactions() - - if self.first: - self.finalise_score() - - def is_valid_react(self, reaction: discord.Reaction, user: discord.Member) -> bool: - """Validates a reaction event was meant for this session.""" - if user.bot: - return False - if reaction.message.id != self.message.id: - return False - if reaction.emoji != self.egg: - return False - - # Ignore the punished - if MUTED in user.roles: - return False - - return True - - async def collect_reacts(self, reaction: discord.Reaction, user: discord.Member): - """Handles emitted reaction_add events via listener.""" - if not self.is_valid_react(reaction, user): - return - - team = get_team_role(user) - if not team: - log.debug(f"Assigning a team for {user}.") - user = await assign_team(user) - - if not self.first: - log.debug(f"{user} was first to react to egg on {self.message.id}.") - self.first = user - await self.start_timeout() - else: - if user != self.first: - self.users.add(user) - - async def start(self): - """Starts the egg drop session.""" - log.debug(f"EggHunt session started for message {self.message.id}.") - bot.add_listener(self.collect_reacts, name="on_reaction_add") - with contextlib.suppress(discord.Forbidden): - await self.message.add_reaction(self.egg) - self.timeout_task = asyncio.create_task(self.start_timeout(300)) - while True: - if not self.timeout_task: - break - if not self.timeout_task.done(): - await self.timeout_task - else: - # make sure any exceptions raise if necessary - self.timeout_task.result() - break - - -class SuperEggMessage(EggMessage): - """Handles a super egg session.""" - - def __init__(self, message: discord.Message, egg: discord.Emoji, window: int): - super().__init__(message, egg) - self.window = window - - async def finalise_score(self): - """Sums and actions scoring for this super egg session.""" - try: - message = await self.message.channel.fetch_message(self.message.id) - except discord.NotFound: - return - - count = 0 - white = 0 - blurple = 0 - react_users = [] - for reaction in message.reactions: - if reaction.emoji == self.egg: - react_users = await reaction.users().flatten() - for user in react_users: - team = get_team_role(user) - if team == Roles.white: - white += 1 - elif team == Roles.blurple: - blurple += 1 - count = reaction.count - 1 - break - - score = 50 if self.egg == Emoji.egg_gold else 100 - if white == blurple: - log.debug("Tied SuperEgg Result.") - team = None - score /= 2 - elif white > blurple: - team = Roles.white - else: - team = Roles.blurple - - embed = self.message.embeds[0] - - db = sqlite3.connect(DB_PATH) - c = db.cursor() - - user_bonus = 5 if self.egg == Emoji.egg_gold else 10 - for user in react_users: - if user.bot: - continue - role = get_team_role(user) - if not role: - print("issue") - user_score = 1 if user != self.first else user_bonus - c.execute(self.add_user_score_sql(user.id, self.teams[role], user_score)) - - if not team: - embed.description = f"{embed.description}\n\nA Tie!\nBoth got {score} points!" - c.execute(self.add_team_score_sql(self.teams[Roles.white], score)) - c.execute(self.add_team_score_sql(self.teams[Roles.blurple], score)) - team_name = "TIE" - else: - team_name = self.teams[team] - embed.description = ( - f"{embed.description}\n\nTeam {team_name.capitalize()} won the points!" - ) - c.execute(self.add_team_score_sql(team_name, score)) - - c.execute( - "INSERT INTO super_eggs (message_id, egg_type, team, window) " - f"VALUES ({self.message.id}, '{self.egg.name}', '{team_name}', {self.window});" - ) - - log.debug("Committing Super Egg scores.") - db.commit() - db.close() - - embed.set_footer(text=f"Finished with {count} total reacts.") - with contextlib.suppress(discord.HTTPException): - await self.message.edit(embed=embed) - - async def start_timeout(self, seconds=None): - """Starts the super egg session.""" - if not seconds: - return - count = 4 - for _ in range(count): - await asyncio.sleep(60) - embed = self.message.embeds[0] - embed.set_footer(text=f"Finishing in {count} minutes.") - try: - await self.message.edit(embed=embed) - except discord.HTTPException: - break - count -= 1 - bot.remove_listener(self.collect_reacts, name="on_reaction_add") - await self.finalise_score() - - -class EggHunt(commands.Cog): - """Easter Egg Hunt Event.""" - - def __init__(self): - self.event_channel = GUILD.get_channel(Channels.seasonalbot_chat) - self.super_egg_buffer = 60*60 - self.tables = { - "super_eggs": ( - "CREATE TABLE super_eggs (" - "message_id INTEGER NOT NULL " - " CONSTRAINT super_eggs_pk PRIMARY KEY, " - "egg_type TEXT NOT NULL, " - "team TEXT NOT NULL, " - "window INTEGER);" - ), - "team_scores": ( - "CREATE TABLE team_scores (" - "team_id TEXT, " - "team_score INTEGER DEFAULT 0);" - ), - "user_scores": ( - "CREATE TABLE user_scores(" - "user_id INTEGER NOT NULL " - " CONSTRAINT user_scores_pk PRIMARY KEY, " - "team TEXT NOT NULL, " - "score INTEGER DEFAULT 0 NOT NULL);" - ), - "react_logs": ( - "CREATE TABLE react_logs(" - "member_id INTEGER NOT NULL, " - "message_id INTEGER NOT NULL, " - "reaction_id TEXT NOT NULL, " - "react_timestamp REAL NOT NULL);" - ) - } - self.prepare_db() - self.task = asyncio.create_task(self.super_egg()) - self.task.add_done_callback(self.task_cleanup) - - def prepare_db(self): - """Ensures database tables all exist and if not, creates them.""" - db = sqlite3.connect(DB_PATH) - c = db.cursor() - - exists_sql = "SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}';" - - missing_tables = [] - for table in self.tables: - c.execute(exists_sql.format(table_name=table)) - result = c.fetchone() - if not result: - missing_tables.append(table) - - for table in missing_tables: - log.info(f"Table {table} is missing, building new one.") - c.execute(self.tables[table]) - - db.commit() - db.close() - - def task_cleanup(self, task): - """Returns task result and restarts. Used as a done callback to show raised exceptions.""" - task.result() - self.task = asyncio.create_task(self.super_egg()) - - @staticmethod - def current_timestamp() -> float: - """Returns a timestamp of the current UTC time.""" - return datetime.utcnow().replace(tzinfo=timezone.utc).timestamp() - - async def super_egg(self): - """Manages the timing of super egg drops.""" - while True: - now = int(self.current_timestamp()) - - if now > EggHuntSettings.end_time: - log.debug("Hunt ended. Ending task.") - break - - if now < EggHuntSettings.start_time: - remaining = EggHuntSettings.start_time - now - log.debug(f"Hunt not started yet. Sleeping for {remaining}.") - await asyncio.sleep(remaining) - - log.debug(f"Hunt started.") - - db = sqlite3.connect(DB_PATH) - c = db.cursor() - - current_window = None - next_window = None - windows = EggHuntSettings.windows.copy() - windows.insert(0, EggHuntSettings.start_time) - for i, window in enumerate(windows): - c.execute(f"SELECT COUNT(*) FROM super_eggs WHERE window={window}") - already_dropped = c.fetchone()[0] - - if already_dropped: - log.debug(f"Window {window} already dropped, checking next one.") - continue - - if now < window: - log.debug("Drop windows up to date, sleeping until next one.") - await asyncio.sleep(window-now) - now = int(self.current_timestamp()) - - current_window = window - next_window = windows[i+1] - break - - count = c.fetchone() - db.close() - - if not current_window: - log.debug("No drop windows left, ending task.") - break - - log.debug(f"Current Window: {current_window}. Next Window {next_window}") - - if not count: - if next_window < now: - log.debug("An Egg Drop Window was missed, dropping one now.") - next_drop = 0 - else: - next_drop = random.randrange(now, next_window) - - if next_drop: - log.debug(f"Sleeping until next super egg drop: {next_drop}.") - await asyncio.sleep(next_drop) - - if random.randrange(10) <= 2: - egg = Emoji.egg_diamond - egg_type = "Diamond" - score = "100" - colour = Colours.diamond - else: - egg = Emoji.egg_gold - egg_type = "Gold" - score = "50" - colour = Colours.gold - - embed = discord.Embed( - title=f"A {egg_type} Egg Has Appeared!", - description=f"**Worth {score} team points!**\n\n" - "The team with the most reactions after 5 minutes wins!", - colour=colour - ) - embed.set_thumbnail(url=egg.url) - embed.set_footer(text="Finishing in 5 minutes.") - msg = await self.event_channel.send(embed=embed) - await SuperEggMessage(msg, egg, current_window).start() - - log.debug("Sleeping until next window.") - next_loop = max(next_window - int(self.current_timestamp()), self.super_egg_buffer) - await asyncio.sleep(next_loop) - - @commands.Cog.listener() - async def on_raw_reaction_add(self, payload): - """Reaction event listener for reaction logging for later anti-cheat analysis.""" - if payload.channel_id not in EggHuntSettings.allowed_channels: - return - - now = self.current_timestamp() - db = sqlite3.connect(DB_PATH) - c = db.cursor() - c.execute( - "INSERT INTO react_logs(member_id, message_id, reaction_id, react_timestamp) " - f"VALUES({payload.user_id}, {payload.message_id}, '{payload.emoji}', {now})" - ) - db.commit() - db.close() - - @commands.Cog.listener() - async def on_message(self, message): - """Message event listener for random egg drops.""" - if self.current_timestamp() < EggHuntSettings.start_time: - return - - if message.channel.id not in EggHuntSettings.allowed_channels: - log.debug("Message not in Egg Hunt channel; ignored.") - return - - if message.author.bot: - return - - if random.randrange(100) <= 5: - await EggMessage(message, random.choice([Emoji.egg_white, Emoji.egg_blurple])).start() - - @commands.group(invoke_without_command=True) - async def hunt(self, ctx): - """ - For 48 hours, hunt down as many eggs randomly appearing as possible. - - Standard Eggs - -------------- - Egg React: +1pt - Team Bonus for Claimed Egg: +1pt - First React on Other Team Egg: +1pt - First React on Your Team Egg: +2pt - - If you get first react, you will claim that egg for your team, allowing - your team to get the Team Bonus point, but be quick, as the egg will - disappear after 5 seconds of the first react. - - Super Eggs - ----------- - Gold Egg: 50 team pts, 5pts to first react - Diamond Egg: 100 team pts, 10pts to first react - - Super Eggs only appear in #seasonalbot-chat so be sure to keep an eye - out. They stay around for 5 minutes and the team with the most reacts - wins the points. - """ - await ctx.invoke(bot.get_command("help"), command="hunt") - - @hunt.command() - async def countdown(self, ctx): - """Show the time status of the Egg Hunt event.""" - now = self.current_timestamp() - if now > EggHuntSettings.end_time: - return await ctx.send("The Hunt has ended.") - - difference = EggHuntSettings.start_time - now - if difference < 0: - difference = EggHuntSettings.end_time - now - msg = "The Egg Hunt will end in" - else: - msg = "The Egg Hunt will start in" - - hours, r = divmod(difference, 3600) - minutes, r = divmod(r, 60) - await ctx.send(f"{msg} {hours:.0f}hrs, {minutes:.0f}mins & {r:.0f}secs") - - @hunt.command() - async def leaderboard(self, ctx): - """Show the Egg Hunt Leaderboards.""" - db = sqlite3.connect(DB_PATH) - c = db.cursor() - c.execute(f"SELECT *, RANK() OVER(ORDER BY score DESC) AS rank FROM user_scores LIMIT 10") - user_result = c.fetchall() - c.execute(f"SELECT * FROM team_scores ORDER BY team_score DESC") - team_result = c.fetchall() - db.close() - output = [] - if user_result: - # Get the alignment needed for the score - score_lengths = [] - for result in user_result: - length = len(str(result[2])) - score_lengths.append(length) - - score_length = max(score_lengths) - for user_id, team, score, rank in user_result: - user = GUILD.get_member(user_id) or user_id - team = team.capitalize() - score = f"{score}pts" - output.append(f"{rank:>2}. {score:>{score_length+3}} - {user} ({team})") - user_board = "\n".join(output) - else: - user_board = "No entries." - if team_result: - output = [] - for team, score in team_result: - output.append(f"{team:<7}: {score}") - team_board = "\n".join(output) - else: - team_board = "No entries." - embed = discord.Embed( - title="Egg Hunt Leaderboards", - description=f"**Team Scores**\n```\n{team_board}\n```\n" - f"**Top 10 Members**\n```\n{user_board}\n```" - ) - await ctx.send(embed=embed) - - @hunt.command() - async def rank(self, ctx, *, member: discord.Member = None): - """Get your ranking in the Egg Hunt Leaderboard.""" - member = member or ctx.author - db = sqlite3.connect(DB_PATH) - c = db.cursor() - c.execute( - "SELECT rank FROM " - "(SELECT RANK() OVER(ORDER BY score DESC) AS rank, user_id FROM user_scores)" - f"WHERE user_id = {member.id};" - ) - result = c.fetchone() - db.close() - if not result: - embed = discord.Embed().set_author(name=f"Egg Hunt - No Ranking") - else: - embed = discord.Embed().set_author(name=f"Egg Hunt - Rank #{result[0]}") - await ctx.send(embed=embed) - - @with_role(MainRoles.admin) - @hunt.command() - async def clear_db(self, ctx): - """Resets the database to it's initial state.""" - def check(msg): - if msg.author != ctx.author: - return False - if msg.channel != ctx.channel: - return False - return True - await ctx.send( - "WARNING: This will delete all current event data.\n" - "Please verify this action by replying with 'Yes, I want to delete all data.'" - ) - reply_msg = await bot.wait_for('message', check=check) - if reply_msg.content != "Yes, I want to delete all data.": - return await ctx.send("Reply did not match. Aborting database deletion.") - db = sqlite3.connect(DB_PATH) - c = db.cursor() - c.execute("DELETE FROM super_eggs;") - c.execute("DELETE FROM user_scores;") - c.execute("UPDATE team_scores SET team_score=0") - db.commit() - db.close() - await ctx.send("Database successfully cleared.") diff --git a/bot/seasons/easter/egg_hunt/constants.py b/bot/seasons/easter/egg_hunt/constants.py deleted file mode 100644 index 02f6e9f2..00000000 --- a/bot/seasons/easter/egg_hunt/constants.py +++ /dev/null @@ -1,40 +0,0 @@ -import os - -from discord import Colour - -from bot.bot import bot -from bot.constants import Channels, Client - - -GUILD = bot.get_guild(Client.guild) - - -class EggHuntSettings: - start_time = int(os.environ["HUNT_START"]) - end_time = start_time + 172800 # 48 hrs later - windows = [int(w) for w in os.environ.get("HUNT_WINDOWS").split(',')] or [] - allowed_channels = [ - Channels.seasonalbot_chat, - Channels.off_topic_0, - Channels.off_topic_1, - Channels.off_topic_2, - ] - - -class Roles: - white = GUILD.get_role(569304397054607363) - blurple = GUILD.get_role(569304472820514816) - - -class Emoji: - egg_white = bot.get_emoji(569266762428841989) - egg_blurple = bot.get_emoji(569266666094067819) - egg_gold = bot.get_emoji(569266900106739712) - egg_diamond = bot.get_emoji(569266839738384384) - - -class Colours: - white = Colour(0xFFFFFF) - blurple = Colour(0x7289DA) - gold = Colour(0xE4E415) - diamond = Colour(0xECF5FF) diff --git a/bot/seasons/easter/egghead_quiz.py b/bot/seasons/easter/egghead_quiz.py index 3e0cc598..bd179fe2 100644 --- a/bot/seasons/easter/egghead_quiz.py +++ b/bot/seasons/easter/egghead_quiz.py @@ -3,6 +3,7 @@ import logging import random from json import load from pathlib import Path +from typing import Union import discord from discord.ext import commands @@ -30,14 +31,14 @@ TIMELIMIT = 30 class EggheadQuiz(commands.Cog): """This cog contains the command for the Easter quiz!""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot) -> None: self.bot = bot self.quiz_messages = {} @commands.command(aliases=["eggheadquiz", "easterquiz"]) - async def eggquiz(self, ctx): + async def eggquiz(self, ctx: commands.Context) -> None: """ - Gives a random quiz question, waits 30 seconds and then outputs the answer + Gives a random quiz question, waits 30 seconds and then outputs the answer. Also informs of the percentages and votes of each option """ @@ -95,14 +96,14 @@ class EggheadQuiz(commands.Cog): await ctx.send(content, embed=a_embed) @staticmethod - async def already_reacted(message, user): - """Returns whether a given user has reacted more than once to a given message""" + async def already_reacted(message: discord.Message, user: Union[discord.Member, discord.User]) -> bool: + """Returns whether a given user has reacted more than once to a given message.""" users = [u.id for reaction in [await r.users().flatten() for r in message.reactions] for u in reaction] return users.count(user.id) > 1 # Old reaction plus new reaction @commands.Cog.listener() - async def on_reaction_add(self, reaction, user): - """Listener to listen specifically for reactions of quiz messages""" + async def on_reaction_add(self, reaction: discord.Reaction, user: Union[discord.Member, discord.User]) -> None: + """Listener to listen specifically for reactions of quiz messages.""" if user.bot: return if reaction.message.id not in self.quiz_messages: @@ -113,7 +114,7 @@ class EggheadQuiz(commands.Cog): return await reaction.message.remove_reaction(reaction, user) -def setup(bot): +def setup(bot: commands.Bot) -> None: """Egghead Quiz Cog load.""" bot.add_cog(EggheadQuiz(bot)) log.info("EggheadQuiz bot loaded") diff --git a/bot/seasons/easter/traditions.py b/bot/seasons/easter/traditions.py index f04b8828..9529823f 100644 --- a/bot/seasons/easter/traditions.py +++ b/bot/seasons/easter/traditions.py @@ -14,18 +14,18 @@ with open(Path("bot/resources/easter/traditions.json"), "r", encoding="utf8") as class Traditions(commands.Cog): """A cog which allows users to get a random easter tradition or custom from a random country.""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot @commands.command(aliases=('eastercustoms',)) - async def easter_tradition(self, ctx): - """Responds with a random tradition or custom""" + async def easter_tradition(self, ctx: commands.Context) -> None: + """Responds with a random tradition or custom.""" random_country = random.choice(list(traditions)) await ctx.send(f"{random_country}:\n{traditions[random_country]}") -def setup(bot): +def setup(bot: commands.Bot) -> None: """Traditions Cog load.""" bot.add_cog(Traditions(bot)) log.info("Traditions cog loaded") diff --git a/bot/seasons/evergreen/8bitify.py b/bot/seasons/evergreen/8bitify.py index 54db71db..60062fc1 100644 --- a/bot/seasons/evergreen/8bitify.py +++ b/bot/seasons/evergreen/8bitify.py @@ -13,17 +13,17 @@ class EightBitify(commands.Cog): @staticmethod def pixelate(image: Image) -> Image: - """Takes an image and pixelates it""" + """Takes an image and pixelates it.""" return image.resize((32, 32)).resize((1024, 1024)) @staticmethod def quantize(image: Image) -> Image: - """Reduces colour palette to 256 colours""" + """Reduces colour palette to 256 colours.""" return image.quantize(colors=32) @commands.command(name="8bitify") async def eightbit_command(self, ctx: commands.Context) -> None: - """Pixelates your avatar and changes the palette to an 8bit one""" + """Pixelates your avatar and changes the palette to an 8bit one.""" async with ctx.typing(): image_bytes = await ctx.author.avatar_url.read() avatar = Image.open(BytesIO(image_bytes)) diff --git a/bot/seasons/evergreen/error_handler.py b/bot/seasons/evergreen/error_handler.py index 6690cf89..120462ee 100644 --- a/bot/seasons/evergreen/error_handler.py +++ b/bot/seasons/evergreen/error_handler.py @@ -4,7 +4,7 @@ import random import sys
import traceback
-from discord import Colour, Embed
+from discord import Colour, Embed, Message
from discord.ext import commands
from bot.constants import NEGATIVE_REPLIES
@@ -16,11 +16,11 @@ log = logging.getLogger(__name__) class CommandErrorHandler(commands.Cog):
"""A error handler for the PythonDiscord server."""
- def __init__(self, bot):
+ def __init__(self, bot: commands.Bot):
self.bot = bot
@staticmethod
- def revert_cooldown_counter(command, message):
+ def revert_cooldown_counter(command: commands.Command, message: Message) -> None:
"""Undoes the last cooldown counter for user-error cases."""
if command._buckets.valid:
bucket = command._buckets.get_bucket(message)
@@ -30,7 +30,7 @@ class CommandErrorHandler(commands.Cog): )
@commands.Cog.listener()
- async def on_command_error(self, ctx, error):
+ async def on_command_error(self, ctx: commands.Context, error: commands.CommandError) -> None:
"""Activates when a command opens an error."""
if hasattr(ctx.command, 'on_error'):
return logging.debug(
@@ -113,7 +113,7 @@ class CommandErrorHandler(commands.Cog): traceback.print_exception(type(error), error, error.__traceback__, file=sys.stderr)
-def setup(bot):
+def setup(bot: commands.Bot) -> None:
"""Error handler Cog load."""
bot.add_cog(CommandErrorHandler(bot))
log.info("CommandErrorHandler cog loaded")
diff --git a/bot/seasons/evergreen/fun.py b/bot/seasons/evergreen/fun.py index ce3484f7..889ae079 100644 --- a/bot/seasons/evergreen/fun.py +++ b/bot/seasons/evergreen/fun.py @@ -1,21 +1,39 @@ +import functools import logging import random +from typing import Callable, Tuple, Union +from discord import Embed, Message from discord.ext import commands +from discord.ext.commands import Bot, Cog, Context, MessageConverter +from bot import utils from bot.constants import Emojis log = logging.getLogger(__name__) +UWU_WORDS = { + "fi": "fwi", + "l": "w", + "r": "w", + "some": "sum", + "th": "d", + "thing": "fing", + "tho": "fo", + "you're": "yuw'we", + "your": "yur", + "you": "yuw", +} -class Fun(commands.Cog): + +class Fun(Cog): """A collection of general commands for fun.""" - def __init__(self, bot): + def __init__(self, bot: Bot) -> None: self.bot = bot @commands.command() - async def roll(self, ctx, num_rolls: int = 1): + async def roll(self, ctx: Context, num_rolls: int = 1) -> None: """Outputs a number of random dice emotes (up to 6).""" output = "" if num_rolls > 6: @@ -27,8 +45,104 @@ class Fun(commands.Cog): output += getattr(Emojis, terning, '') await ctx.send(output) + @commands.command(name="uwu", aliases=("uwuwize", "uwuify",)) + async def uwu_command(self, ctx: Context, *, text: str) -> None: + """ + Converts a given `text` into it's uwu equivalent. + + Also accepts a valid discord Message ID or link. + """ + conversion_func = functools.partial( + utils.replace_many, replacements=UWU_WORDS, ignore_case=True, match_case=True + ) + text, embed = await Fun._get_text_and_embed(ctx, text) + # Convert embed if it exists + if embed is not None: + embed = Fun._convert_embed(conversion_func, embed) + converted_text = conversion_func(text) + # Don't put >>> if only embed present + if converted_text: + converted_text = f">>> {converted_text.lstrip('> ')}" + await ctx.send(content=converted_text, embed=embed) + + @commands.command(name="randomcase", aliases=("rcase", "randomcaps", "rcaps",)) + async def randomcase_command(self, ctx: Context, *, text: str) -> None: + """ + Randomly converts the casing of a given `text`. + + Also accepts a valid discord Message ID or link. + """ + def conversion_func(text: str) -> str: + """Randomly converts the casing of a given string.""" + return "".join( + char.upper() if round(random.random()) else char.lower() for char in text + ) + text, embed = await Fun._get_text_and_embed(ctx, text) + # Convert embed if it exists + if embed is not None: + embed = Fun._convert_embed(conversion_func, embed) + converted_text = conversion_func(text) + # Don't put >>> if only embed present + if converted_text: + converted_text = f">>> {converted_text.lstrip('> ')}" + await ctx.send(content=converted_text, embed=embed) + + @staticmethod + async def _get_text_and_embed(ctx: Context, text: str) -> Tuple[str, Union[Embed, None]]: + """ + Attempts to extract the text and embed from a possible link to a discord Message. + + Returns a tuple of: + str: If `text` is a valid discord Message, the contents of the message, else `text`. + Union[Embed, None]: The embed if found in the valid Message, else None + """ + embed = None + message = await Fun._get_discord_message(ctx, text) + if isinstance(message, Message): + text = message.content + # Take first embed because we can't send multiple embeds + if message.embeds: + embed = message.embeds[0] + return (text, embed) + + @staticmethod + async def _get_discord_message(ctx: Context, text: str) -> Union[Message, str]: + """ + Attempts to convert a given `text` to a discord Message object and return it. + + Conversion will succeed if given a discord Message ID or link. + Returns `text` if the conversion fails. + """ + try: + text = await MessageConverter().convert(ctx, text) + except commands.BadArgument: + log.debug(f"Input '{text:.20}...' is not a valid Discord Message") + return text + + @staticmethod + def _convert_embed(func: Callable[[str, ], str], embed: Embed) -> Embed: + """ + Converts the text in an embed using a given conversion function, then return the embed. + + Only modifies the following fields: title, description, footer, fields + """ + embed_dict = embed.to_dict() + + embed_dict["title"] = func(embed_dict.get("title", "")) + embed_dict["description"] = func(embed_dict.get("description", "")) + + if "footer" in embed_dict: + embed_dict["footer"]["text"] = func(embed_dict["footer"].get("text", "")) + + if "fields" in embed_dict: + for field in embed_dict["fields"]: + field["name"] = func(field.get("name", "")) + field["value"] = func(field.get("value", "")) + + return Embed.from_dict(embed_dict) + -def setup(bot): +def setup(bot: commands.Bot) -> None: """Fun Cog load.""" bot.add_cog(Fun(bot)) log.info("Fun cog loaded") diff --git a/bot/seasons/evergreen/issues.py b/bot/seasons/evergreen/issues.py index f19a1129..0ba74d9c 100644 --- a/bot/seasons/evergreen/issues.py +++ b/bot/seasons/evergreen/issues.py @@ -12,12 +12,14 @@ log = logging.getLogger(__name__) class Issues(commands.Cog): """Cog that allows users to retrieve issues from GitHub.""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot @commands.command(aliases=("issues",)) @override_in_channel - async def issue(self, ctx, number: int, repository: str = "seasonalbot", user: str = "python-discord"): + async def issue( + self, ctx: commands.Context, number: int, repository: str = "seasonalbot", user: str = "python-discord" + ) -> None: """Command to retrieve issues from a GitHub repository.""" api_url = f"https://api.github.com/repos/{user}/{repository}/issues/{number}" failed_status = { @@ -49,7 +51,7 @@ class Issues(commands.Cog): await ctx.send(embed=issue_embed) -def setup(bot): +def setup(bot: commands.Bot) -> None: """Github Issues Cog Load.""" bot.add_cog(Issues(bot)) log.info("Issues cog loaded") diff --git a/bot/seasons/evergreen/magic_8ball.py b/bot/seasons/evergreen/magic_8ball.py index 55652af7..e47ef454 100644 --- a/bot/seasons/evergreen/magic_8ball.py +++ b/bot/seasons/evergreen/magic_8ball.py @@ -11,13 +11,13 @@ log = logging.getLogger(__name__) class Magic8ball(commands.Cog): """A Magic 8ball command to respond to a user's question.""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot with open(Path("bot/resources/evergreen/magic8ball.json"), "r") as file: self.answers = json.load(file) @commands.command(name="8ball") - async def output_answer(self, ctx, *, question): + async def output_answer(self, ctx: commands.Context, *, question: str) -> None: """Return a Magic 8ball answer from answers list.""" if len(question.split()) >= 3: answer = random.choice(self.answers) @@ -26,7 +26,7 @@ class Magic8ball(commands.Cog): await ctx.send("Usage: .8ball <question> (minimum length of 3 eg: `will I win?`)") -def setup(bot): +def setup(bot: commands.Bot) -> None: """Magic 8ball Cog load.""" bot.add_cog(Magic8ball(bot)) log.info("Magic8ball cog loaded") diff --git a/bot/seasons/evergreen/minesweeper.py b/bot/seasons/evergreen/minesweeper.py index cb859ea9..b0ba8145 100644 --- a/bot/seasons/evergreen/minesweeper.py +++ b/bot/seasons/evergreen/minesweeper.py @@ -32,8 +32,8 @@ log = logging.getLogger(__name__) class CoordinateConverter(commands.Converter): """Converter for Coordinates.""" - async def convert(self, ctx, coordinate: str) -> typing.Tuple[int, int]: - """Take in a coordinate string and turn it into x, y""" + async def convert(self, ctx: commands.Context, coordinate: str) -> typing.Tuple[int, int]: + """Take in a coordinate string and turn it into an (x, y) tuple.""" if not 2 <= len(coordinate) <= 3: raise commands.BadArgument('Invalid co-ordinate provided') @@ -80,8 +80,8 @@ class Minesweeper(commands.Cog): self.games: GamesDict = {} # Store the currently running games @commands.group(name='minesweeper', aliases=('ms',), invoke_without_command=True) - async def minesweeper_group(self, ctx: commands.Context): - """Commands for Playing Minesweeper""" + async def minesweeper_group(self, ctx: commands.Context) -> None: + """Commands for Playing Minesweeper.""" await ctx.send_help(ctx.command) @staticmethod @@ -175,7 +175,7 @@ class Minesweeper(commands.Cog): @commands.dm_only() @minesweeper_group.command(name="flag") async def flag_command(self, ctx: commands.Context, *coordinates: CoordinateConverter) -> None: - """Place multiple flags on the board""" + """Place multiple flags on the board.""" board: GameBoard = self.games[ctx.author.id].revealed for x, y in coordinates: if board[y][x] == "hidden": @@ -185,14 +185,14 @@ class Minesweeper(commands.Cog): @staticmethod def reveal_bombs(revealed: GameBoard, board: GameBoard) -> None: - """Reveals all the bombs""" + """Reveals all the bombs.""" for y, row in enumerate(board): for x, cell in enumerate(row): if cell == "bomb": revealed[y][x] = cell async def lost(self, ctx: commands.Context) -> None: - """The player lost the game""" + """The player lost the game.""" game = self.games[ctx.author.id] self.reveal_bombs(game.revealed, game.board) await ctx.author.send(":fire: You lost! :fire:") @@ -200,7 +200,7 @@ class Minesweeper(commands.Cog): await game.chat_msg.channel.send(f":fire: {ctx.author.mention} just lost Minesweeper! :fire:") async def won(self, ctx: commands.Context) -> None: - """The player won the game""" + """The player won the game.""" game = self.games[ctx.author.id] await ctx.author.send(":tada: You won! :tada:") if game.activated_on_server: @@ -215,8 +215,8 @@ class Minesweeper(commands.Cog): if board[y_][x_] == 0: self.reveal_zeros(revealed, board, x_, y_) - async def check_if_won(self, ctx, revealed: GameBoard, board: GameBoard) -> bool: - """Checks if a player has won""" + async def check_if_won(self, ctx: commands.Context, revealed: GameBoard, board: GameBoard) -> bool: + """Checks if a player has won.""" if any( revealed[y][x] in ["hidden", "flag"] and board[y][x] != "bomb" for x in range(10) @@ -252,7 +252,7 @@ class Minesweeper(commands.Cog): @commands.dm_only() @minesweeper_group.command(name="reveal") async def reveal_command(self, ctx: commands.Context, *coordinates: CoordinateConverter) -> None: - """Reveal multiple cells""" + """Reveal multiple cells.""" game = self.games[ctx.author.id] revealed: GameBoard = game.revealed board: GameBoard = game.board @@ -267,8 +267,8 @@ class Minesweeper(commands.Cog): await self.update_boards(ctx) @minesweeper_group.command(name="end") - async def end_command(self, ctx: commands.Context): - """End your current game""" + async def end_command(self, ctx: commands.Context) -> None: + """End your current game.""" game = self.games[ctx.author.id] game.revealed = game.board await self.update_boards(ctx) diff --git a/bot/seasons/evergreen/showprojects.py b/bot/seasons/evergreen/showprojects.py index 37809b33..a943e548 100644 --- a/bot/seasons/evergreen/showprojects.py +++ b/bot/seasons/evergreen/showprojects.py @@ -1,5 +1,6 @@ import logging +from discord import Message from discord.ext import commands from bot.constants import Channels @@ -8,15 +9,15 @@ log = logging.getLogger(__name__) class ShowProjects(commands.Cog): - """Cog that reacts to posts in the #show-your-projects""" + """Cog that reacts to posts in the #show-your-projects.""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot self.lastPoster = 0 # Given 0 as the default last poster ID as no user can actually have 0 assigned to them @commands.Cog.listener() - async def on_message(self, message): - """Adds reactions to posts in #show-your-projects""" + async def on_message(self, message: Message) -> None: + """Adds reactions to posts in #show-your-projects.""" reactions = ["\U0001f44d", "\U00002764", "\U0001f440", "\U0001f389", "\U0001f680", "\U00002b50", "\U0001f6a9"] if (message.channel.id == Channels.show_your_projects and message.author.bot is False @@ -27,7 +28,7 @@ class ShowProjects(commands.Cog): self.lastPoster = message.author.id -def setup(bot): - """Show Projects Reaction Cog""" +def setup(bot: commands.Bot) -> None: + """Show Projects Reaction Cog.""" bot.add_cog(ShowProjects(bot)) log.info("ShowProjects cog loaded") diff --git a/bot/seasons/evergreen/snakes/__init__.py b/bot/seasons/evergreen/snakes/__init__.py index d0e57dae..d7f9f20c 100644 --- a/bot/seasons/evergreen/snakes/__init__.py +++ b/bot/seasons/evergreen/snakes/__init__.py @@ -1,11 +1,13 @@ import logging +from discord.ext import commands + from bot.seasons.evergreen.snakes.snakes_cog import Snakes log = logging.getLogger(__name__) -def setup(bot): +def setup(bot: commands.Bot) -> None: """Snakes Cog load.""" bot.add_cog(Snakes(bot)) log.info("Snakes cog loaded") diff --git a/bot/seasons/evergreen/snakes/converter.py b/bot/seasons/evergreen/snakes/converter.py index f2637530..57103b57 100644 --- a/bot/seasons/evergreen/snakes/converter.py +++ b/bot/seasons/evergreen/snakes/converter.py @@ -1,9 +1,10 @@ import json import logging import random +from typing import Iterable, List import discord -from discord.ext.commands import Converter +from discord.ext.commands import Context, Converter from fuzzywuzzy import fuzz from bot.seasons.evergreen.snakes.utils import SNAKE_RESOURCES @@ -18,7 +19,7 @@ class Snake(Converter): snakes = None special_cases = None - async def convert(self, ctx, name): + async def convert(self, ctx: Context, name: str) -> str: """Convert the input snake name to the closest matching Snake object.""" await self.build_list() name = name.lower() @@ -26,7 +27,7 @@ class Snake(Converter): if name == 'python': return 'Python (programming language)' - def get_potential(iterable, *, threshold=80): + def get_potential(iterable: Iterable, *, threshold: int = 80) -> List[str]: nonlocal name potential = [] @@ -58,7 +59,7 @@ class Snake(Converter): return names.get(name, name) @classmethod - async def build_list(cls): + async def build_list(cls) -> None: """Build list of snakes from the static snake resources.""" # Get all the snakes if cls.snakes is None: @@ -72,7 +73,7 @@ class Snake(Converter): cls.special_cases = {snake['name'].lower(): snake for snake in special_cases} @classmethod - async def random(cls): + async def random(cls) -> str: """ Get a random Snake from the loaded resources. diff --git a/bot/seasons/evergreen/snakes/snakes_cog.py b/bot/seasons/evergreen/snakes/snakes_cog.py index 38878706..1ed38f86 100644 --- a/bot/seasons/evergreen/snakes/snakes_cog.py +++ b/bot/seasons/evergreen/snakes/snakes_cog.py @@ -9,13 +9,13 @@ import textwrap import urllib from functools import partial from io import BytesIO -from typing import Any, Dict +from typing import Any, Dict, List import aiohttp import async_timeout from PIL import Image, ImageDraw, ImageFont from discord import Colour, Embed, File, Member, Message, Reaction -from discord.ext.commands import BadArgument, Bot, Cog, Context, bot_has_permissions, group +from discord.ext.commands import BadArgument, Bot, Cog, CommandError, Context, bot_has_permissions, group from bot.constants import ERROR_REPLIES, Tokens from bot.decorators import locked @@ -154,7 +154,7 @@ class Snakes(Cog): # region: Helper methods @staticmethod - def _beautiful_pastel(hue): + def _beautiful_pastel(hue: float) -> int: """Returns random bright pastels.""" light = random.uniform(0.7, 0.85) saturation = 1 @@ -250,7 +250,7 @@ class Snakes(Cog): return buffer @staticmethod - def _snakify(message): + def _snakify(message: str) -> str: """Sssnakifffiesss a sstring.""" # Replace fricatives with exaggerated snake fricatives. simple_fricatives = [ @@ -272,7 +272,7 @@ class Snakes(Cog): return message - async def _fetch(self, session, url, params=None): + async def _fetch(self, session: aiohttp.ClientSession, url: str, params: dict = None) -> dict: """Asynchronous web request helper method.""" if params is None: params = {} @@ -281,7 +281,7 @@ class Snakes(Cog): async with session.get(url, params=params) as response: return await response.json() - def _get_random_long_message(self, messages, retries=10): + def _get_random_long_message(self, messages: List[str], retries: int = 10) -> str: """ Fetch a message that's at least 3 words long, if possible to do so in retries attempts. @@ -403,9 +403,9 @@ class Snakes(Cog): """Gets a random snake name.""" return random.choice(self.snake_names) - async def _validate_answer(self, ctx: Context, message: Message, answer: str, options: list): + async def _validate_answer(self, ctx: Context, message: Message, answer: str, options: list) -> None: """Validate the answer using a reaction event loop.""" - def predicate(reaction, user): + def predicate(reaction: Reaction, user: Member) -> bool: """Test if the the answer is valid and can be evaluated.""" return ( reaction.message.id == message.id # The reaction is attached to the question we asked. @@ -436,14 +436,14 @@ class Snakes(Cog): # region: Commands @group(name='snakes', aliases=('snake',), invoke_without_command=True) - async def snakes_group(self, ctx: Context): + async def snakes_group(self, ctx: Context) -> None: """Commands from our first code jam.""" await ctx.send_help(ctx.command) @bot_has_permissions(manage_messages=True) @snakes_group.command(name='antidote') @locked() - async def antidote_command(self, ctx: Context): + async def antidote_command(self, ctx: Context) -> None: """ Antidote! Can you create the antivenom before the patient dies? @@ -458,7 +458,7 @@ class Snakes(Cog): This game was created by Lord Bisk and Runew0lf. """ - def predicate(reaction_: Reaction, user_: Member): + def predicate(reaction_: Reaction, user_: Member) -> bool: """Make sure that this reaction is what we want to operate on.""" return ( all(( @@ -584,7 +584,7 @@ class Snakes(Cog): await board_id.clear_reactions() @snakes_group.command(name='draw') - async def draw_command(self, ctx: Context): + async def draw_command(self, ctx: Context) -> None: """ Draws a random snek using Perlin noise. @@ -672,7 +672,7 @@ class Snakes(Cog): @snakes_group.command(name='guess', aliases=('identify',)) @locked() - async def guess_command(self, ctx): + async def guess_command(self, ctx: Context) -> None: """ Snake identifying game. @@ -706,7 +706,7 @@ class Snakes(Cog): await self._validate_answer(ctx, guess, answer, options) @snakes_group.command(name='hatch') - async def hatch_command(self, ctx: Context): + async def hatch_command(self, ctx: Context) -> None: """ Hatches your personal snake. @@ -737,7 +737,7 @@ class Snakes(Cog): await ctx.channel.send(embed=my_snake_embed) @snakes_group.command(name='movie') - async def movie_command(self, ctx: Context): + async def movie_command(self, ctx: Context) -> None: """ Gets a random snake-related movie from OMDB. @@ -807,7 +807,7 @@ class Snakes(Cog): @snakes_group.command(name='quiz') @locked() - async def quiz_command(self, ctx: Context): + async def quiz_command(self, ctx: Context) -> None: """ Asks a snake-related question in the chat and validates the user's guess. @@ -832,7 +832,7 @@ class Snakes(Cog): await self._validate_answer(ctx, quiz, answer, options) @snakes_group.command(name='name', aliases=('name_gen',)) - async def name_command(self, ctx: Context, *, name: str = None): + async def name_command(self, ctx: Context, *, name: str = None) -> None: """ Snakifies a username. @@ -904,7 +904,7 @@ class Snakes(Cog): @snakes_group.command(name='sal') @locked() - async def sal_command(self, ctx: Context): + async def sal_command(self, ctx: Context) -> None: """ Play a game of Snakes and Ladders. @@ -922,7 +922,7 @@ class Snakes(Cog): await game.open_game() @snakes_group.command(name='about') - async def about_command(self, ctx: Context): + async def about_command(self, ctx: Context) -> None: """Show an embed with information about the event, its participants, and its winners.""" contributors = [ "<@!245270749919576066>", @@ -965,7 +965,7 @@ class Snakes(Cog): await ctx.channel.send(embed=embed) @snakes_group.command(name='card') - async def card_command(self, ctx: Context, *, name: Snake = None): + async def card_command(self, ctx: Context, *, name: Snake = None) -> None: """ Create an interesting little card from a snake. @@ -1003,7 +1003,7 @@ class Snakes(Cog): ) @snakes_group.command(name='fact') - async def fact_command(self, ctx: Context): + async def fact_command(self, ctx: Context) -> None: """ Gets a snake-related fact. @@ -1019,7 +1019,7 @@ class Snakes(Cog): await ctx.channel.send(embed=embed) @snakes_group.command(name='snakify') - async def snakify_command(self, ctx: Context, *, message: str = None): + async def snakify_command(self, ctx: Context, *, message: str = None) -> None: """ How would I talk if I were a snake? @@ -1060,7 +1060,7 @@ class Snakes(Cog): await ctx.channel.send(embed=embed) @snakes_group.command(name='video', aliases=('get_video',)) - async def video_command(self, ctx: Context, *, search: str = None): + async def video_command(self, ctx: Context, *, search: str = None) -> None: """ Gets a YouTube video about snakes. @@ -1100,7 +1100,7 @@ class Snakes(Cog): log.warning(f"YouTube API error. Full response looks like {response}") @snakes_group.command(name='zen') - async def zen_command(self, ctx: Context): + async def zen_command(self, ctx: Context) -> None: """ Gets a random quote from the Zen of Python, except as if spoken by a snake. @@ -1127,7 +1127,7 @@ class Snakes(Cog): @get_command.error @card_command.error @video_command.error - async def command_error(self, ctx, error): + async def command_error(self, ctx: Context, error: CommandError) -> None: """Local error handler for the Snake Cog.""" embed = Embed() embed.colour = Colour.red() diff --git a/bot/seasons/evergreen/snakes/utils.py b/bot/seasons/evergreen/snakes/utils.py index e8d2ee44..7d6caf04 100644 --- a/bot/seasons/evergreen/snakes/utils.py +++ b/bot/seasons/evergreen/snakes/utils.py @@ -11,7 +11,9 @@ from typing import List, Tuple from PIL import Image from PIL.ImageDraw import ImageDraw from discord import File, Member, Reaction -from discord.ext.commands import Context +from discord.ext.commands import Cog, Context + +from bot.constants import Roles SNAKE_RESOURCES = Path("bot/resources/snakes").absolute() @@ -116,12 +118,12 @@ def get_resource(file: str) -> List[dict]: return json.load(snakefile) -def smoothstep(t): +def smoothstep(t: float) -> float: """Smooth curve with a zero derivative at 0 and 1, making it useful for interpolating.""" return t * t * (3. - 2. * t) -def lerp(t, a, b): +def lerp(t: float, a: float, b: float) -> float: """Linear interpolation between a and b, given a fraction t.""" return a + t * (b - a) @@ -138,7 +140,7 @@ class PerlinNoiseFactory(object): Licensed under ISC """ - def __init__(self, dimension, octaves=1, tile=(), unbias=False): + def __init__(self, dimension: int, octaves: int = 1, tile: Tuple[int] = (), unbias: bool = False): """ Create a new Perlin noise factory in the given number of dimensions. @@ -152,7 +154,7 @@ class PerlinNoiseFactory(object): This will produce noise that tiles every 3 units vertically, but never tiles horizontally. - If ``unbias`` is true, the smoothstep function will be applied to the output before returning + If ``unbias`` is True, the smoothstep function will be applied to the output before returning it, to counteract some of Perlin noise's significant bias towards the center of its output range. """ self.dimension = dimension @@ -166,7 +168,7 @@ class PerlinNoiseFactory(object): self.gradient = {} - def _generate_gradient(self): + def _generate_gradient(self) -> Tuple[float, ...]: """ Generate a random unit vector at each grid point. @@ -186,7 +188,7 @@ class PerlinNoiseFactory(object): scale = sum(n * n for n in random_point) ** -0.5 return tuple(coord * scale for coord in random_point) - def get_plain_noise(self, *point): + def get_plain_noise(self, *point) -> float: """Get plain noise for a single point, without taking into account either octaves or tiling.""" if len(point) != self.dimension: raise ValueError("Expected {0} values, got {1}".format( @@ -234,7 +236,7 @@ class PerlinNoiseFactory(object): return dots[0] * self.scale_factor - def __call__(self, *point): + def __call__(self, *point) -> float: """ Get the value of this Perlin noise function at the given point. @@ -367,7 +369,7 @@ GAME_SCREEN_EMOJI = [ class SnakeAndLaddersGame: """Snakes and Ladders game Cog.""" - def __init__(self, snakes, context: Context): + def __init__(self, snakes: Cog, context: Context): self.snakes = snakes self.ctx = context self.channel = self.ctx.channel @@ -382,14 +384,13 @@ class SnakeAndLaddersGame: self.positions = None self.rolls = [] - async def open_game(self): + async def open_game(self) -> None: """ Create a new Snakes and Ladders game. - Listen for reactions until players have joined, - and the game has been started. + Listen for reactions until players have joined, and the game has been started. """ - def startup_event_check(reaction_: Reaction, user_: Member): + def startup_event_check(reaction_: Reaction, user_: Member) -> bool: """Make sure that this reaction is what we want to operate on.""" return ( all(( @@ -412,7 +413,6 @@ class SnakeAndLaddersGame: "**Snakes and Ladders**: A new game is about to start!", file=File( str(SNAKE_RESOURCES / "snakes_and_ladders" / "banner.jpg"), - # os.path.join("bot", "resources", "snakes", "snakes_and_ladders", "banner.jpg"), filename='Snakes and Ladders.jpg' ) ) @@ -435,8 +435,9 @@ class SnakeAndLaddersGame: if reaction.emoji == JOIN_EMOJI: await self.player_join(user) elif reaction.emoji == CANCEL_EMOJI: - if self.ctx.author == user: - await self.cancel_game(user) + if user == self.author or (self._is_moderator(user) and user not in self.players): + # Allow game author or non-playing moderation staff to cancel a waiting game + await self.cancel_game() return else: await self.player_leave(user) @@ -451,10 +452,11 @@ class SnakeAndLaddersGame: except asyncio.TimeoutError: log.debug("Snakes and Ladders timed out waiting for a reaction") - self.cancel_game(self.author) + await self.cancel_game() return # We're done, no reactions for the last 5 minutes - async def _add_player(self, user: Member): + async def _add_player(self, user: Member) -> None: + """Add player to game.""" self.players.append(user) self.player_tiles[user.id] = 1 @@ -462,7 +464,7 @@ class SnakeAndLaddersGame: im = Image.open(io.BytesIO(avatar_bytes)).resize((BOARD_PLAYER_SIZE, BOARD_PLAYER_SIZE)) self.avatar_images[user.id] = im - async def player_join(self, user: Member): + async def player_join(self, user: Member) -> None: """ Handle players joining the game. @@ -488,20 +490,16 @@ class SnakeAndLaddersGame: delete_after=10 ) - async def player_leave(self, user: Member): + async def player_leave(self, user: Member) -> bool: """ Handle players leaving the game. - Leaving is prevented if the user initiated the game or if they weren't part of it in the - first place. + Leaving is prevented if the user wasn't part of the game. + + If the number of players reaches 0, the game is terminated. In this case, a sentinel boolean + is returned True to prevent a game from continuing after it's destroyed. """ - if user == self.author: - await self.channel.send( - user.mention + " You are the author, and cannot leave the game. Execute " - "`sal cancel` to cancel the game.", - delete_after=10 - ) - return + is_surrendered = False # Sentinel value to assist with stopping a surrendered game for p in self.players: if user == p: self.players.remove(p) @@ -512,47 +510,43 @@ class SnakeAndLaddersGame: delete_after=10 ) - if self.state != 'waiting' and len(self.players) == 1: + if self.state != 'waiting' and len(self.players) == 0: await self.channel.send("**Snakes and Ladders**: The game has been surrendered!") + is_surrendered = True self._destruct() - return - await self.channel.send(user.mention + " You are not in the match.", delete_after=10) - async def cancel_game(self, user: Member): - """Allow the game author to cancel the running game.""" - if not user == self.author: - await self.channel.send(user.mention + " Only the author of the game can cancel it.", delete_after=10) - return + return is_surrendered + else: + await self.channel.send(user.mention + " You are not in the match.", delete_after=10) + return is_surrendered + + async def cancel_game(self) -> None: + """Cancel the running game.""" await self.channel.send("**Snakes and Ladders**: Game has been canceled.") self._destruct() - async def start_game(self, user: Member): + async def start_game(self, user: Member) -> None: """ Allow the game author to begin the game. - The game cannot be started if there aren't enough players joined or if the game is in a - waiting state. + The game cannot be started if the game is in a waiting state. """ if not user == self.author: await self.channel.send(user.mention + " Only the author of the game can start it.", delete_after=10) return - if len(self.players) < 1: - await self.channel.send( - user.mention + " A minimum of 2 players is required to start the game.", - delete_after=10 - ) - return + if not self.state == 'waiting': await self.channel.send(user.mention + " The game cannot be started at this time.", delete_after=10) return + self.state = 'starting' player_list = ', '.join(user.mention for user in self.players) await self.channel.send("**Snakes and Ladders**: The game is starting!\nPlayers: " + player_list) await self.start_round() - async def start_round(self): + async def start_round(self) -> None: """Begin the round.""" - def game_event_check(reaction_: Reaction, user_: Member): + def game_event_check(reaction_: Reaction, user_: Member) -> bool: """Make sure that this reaction is what we want to operate on.""" return ( all(( @@ -565,8 +559,6 @@ class SnakeAndLaddersGame: self.state = 'roll' for user in self.players: self.round_has_rolled[user.id] = False - # board_img = Image.open(os.path.join( - # "bot", "resources", "snakes", "snakes_and_ladders", "board.jpg")) board_img = Image.open(str(SNAKE_RESOURCES / "snakes_and_ladders" / "board.jpg")) player_row_size = math.ceil(MAX_PLAYERS / 2) @@ -612,6 +604,7 @@ class SnakeAndLaddersGame: for emoji in GAME_SCREEN_EMOJI: await self.positions.add_reaction(emoji) + is_surrendered = False while True: try: reaction, user = await self.ctx.bot.wait_for( @@ -623,11 +616,12 @@ class SnakeAndLaddersGame: if reaction.emoji == ROLL_EMOJI: await self.player_roll(user) elif reaction.emoji == CANCEL_EMOJI: - if self.ctx.author == user: - await self.cancel_game(user) + if self._is_moderator(user) and user not in self.players: + # Only allow non-playing moderation staff to cancel a running game + await self.cancel_game() return else: - await self.player_leave(user) + is_surrendered = await self.player_leave(user) await self.positions.remove_reaction(reaction.emoji, user) @@ -636,13 +630,16 @@ class SnakeAndLaddersGame: except asyncio.TimeoutError: log.debug("Snakes and Ladders timed out waiting for a reaction") - await self.cancel_game(self.author) + await self.cancel_game() return # We're done, no reactions for the last 5 minutes # Round completed - await self._complete_round() + # Check to see if the game was surrendered before completing the round, without this + # sentinel, the game object would be deleted but the next round still posted into purgatory + if not is_surrendered: + await self._complete_round() - async def player_roll(self, user: Member): + async def player_roll(self, user: Member) -> None: """Handle the player's roll.""" if user.id not in self.player_tiles: await self.channel.send(user.mention + " You are not in the match.", delete_after=10) @@ -674,7 +671,8 @@ class SnakeAndLaddersGame: self.player_tiles[user.id] = min(100, next_tile) self.round_has_rolled[user.id] = True - async def _complete_round(self): + async def _complete_round(self) -> None: + """At the conclusion of a round check to see if there's been a winner.""" self.state = 'post_round' # check for winner @@ -689,22 +687,30 @@ class SnakeAndLaddersGame: self._destruct() def _check_winner(self) -> Member: + """Return a winning member if we're in the post-round state and there's a winner.""" if self.state != 'post_round': return None return next((player for player in self.players if self.player_tiles[player.id] == 100), None) - def _check_all_rolled(self): + def _check_all_rolled(self) -> bool: + """Check if all members have made their roll.""" return all(rolled for rolled in self.round_has_rolled.values()) - def _destruct(self): + def _destruct(self) -> None: + """Clean up the finished game object.""" del self.snakes.active_sal[self.channel] - def _board_coordinate_from_index(self, index: int): - # converts the tile number to the x/y coordinates for graphical purposes + def _board_coordinate_from_index(self, index: int) -> Tuple[int, int]: + """Convert the tile number to the x/y coordinates for graphical purposes.""" y_level = 9 - math.floor((index - 1) / 10) is_reversed = math.floor((index - 1) / 10) % 2 != 0 x_level = (index - 1) % 10 if is_reversed: x_level = 9 - x_level return x_level, y_level + + @staticmethod + def _is_moderator(user: Member) -> bool: + """Return True if the user is a Moderator.""" + return any(Roles.moderator == role.id for role in user.roles) diff --git a/bot/seasons/evergreen/speedrun.py b/bot/seasons/evergreen/speedrun.py index f6a43a63..76c5e8d3 100644 --- a/bot/seasons/evergreen/speedrun.py +++ b/bot/seasons/evergreen/speedrun.py @@ -13,16 +13,16 @@ with Path('bot/resources/evergreen/speedrun_links.json').open(encoding="utf-8") class Speedrun(commands.Cog): """Commands about the video game speedrunning community.""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot @commands.command(name="speedrun") - async def get_speedrun(self, ctx): + async def get_speedrun(self, ctx: commands.Context) -> None: """Sends a link to a video of a random speedrun.""" await ctx.send(choice(LINKS)) -def setup(bot): - """Load the Speedrun cog""" +def setup(bot: commands.Bot) -> None: + """Load the Speedrun cog.""" bot.add_cog(Speedrun(bot)) log.info("Speedrun cog loaded") diff --git a/bot/seasons/evergreen/uptime.py b/bot/seasons/evergreen/uptime.py index 92066e0a..6f24f545 100644 --- a/bot/seasons/evergreen/uptime.py +++ b/bot/seasons/evergreen/uptime.py @@ -12,11 +12,11 @@ log = logging.getLogger(__name__) class Uptime(commands.Cog): """A cog for posting the bot's uptime.""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot @commands.command(name="uptime") - async def uptime(self, ctx): + async def uptime(self, ctx: commands.Context) -> None: """Responds with the uptime of the bot.""" difference = relativedelta(start_time - arrow.utcnow()) uptime_string = start_time.shift( @@ -28,7 +28,7 @@ class Uptime(commands.Cog): await ctx.send(f"I started up {uptime_string}.") -def setup(bot): +def setup(bot: commands.Bot) -> None: """Uptime Cog load.""" bot.add_cog(Uptime(bot)) log.info("Uptime cog loaded") diff --git a/bot/seasons/halloween/8ball.py b/bot/seasons/halloween/8ball.py index faf59ca9..2e1c2804 100644 --- a/bot/seasons/halloween/8ball.py +++ b/bot/seasons/halloween/8ball.py @@ -15,11 +15,11 @@ with open(Path("bot/resources/halloween/responses.json"), "r", encoding="utf8") class SpookyEightBall(commands.Cog): """Spooky Eightball answers.""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot @commands.command(aliases=('spooky8ball',)) - async def spookyeightball(self, ctx, *, question: str): + async def spookyeightball(self, ctx: commands.Context, *, question: str) -> None: """Responds with a random response to a question.""" choice = random.choice(responses['responses']) msg = await ctx.send(choice[0]) @@ -28,7 +28,7 @@ class SpookyEightBall(commands.Cog): await msg.edit(content=f"{choice[0]} \n{choice[1]}") -def setup(bot): +def setup(bot: commands.Bot) -> None: """Spooky Eight Ball Cog Load.""" bot.add_cog(SpookyEightBall(bot)) log.info("SpookyEightBall cog loaded") diff --git a/bot/seasons/halloween/__init__.py b/bot/seasons/halloween/__init__.py index aff51423..c81879d7 100644 --- a/bot/seasons/halloween/__init__.py +++ b/bot/seasons/halloween/__init__.py @@ -3,16 +3,22 @@ from bot.seasons import SeasonBase class Halloween(SeasonBase): - """Halloween Seasonal event attributes.""" + """ + Halloween Seasonal event attributes. + + Announcement for this cog temporarily disabled, since we're doing a custom + Hacktoberfest announcement. If you're enabling the announcement again, + make sure to update this docstring accordingly. + """ name = "halloween" - bot_name = "Spookybot" + bot_name = "NeonBot" greeting = "Happy Halloween!" start_date = "01/10" - end_date = "31/10" + end_date = "01/11" - colour = Colours.orange + colour = Colours.pink icon = ( - "/logos/logo_seasonal/halloween/spooky.png", + "/logos/logo_seasonal/hacktober/hacktoberfest.png", ) diff --git a/bot/seasons/halloween/candy_collection.py b/bot/seasons/halloween/candy_collection.py index d35cbee5..64da7ced 100644 --- a/bot/seasons/halloween/candy_collection.py +++ b/bot/seasons/halloween/candy_collection.py @@ -3,6 +3,7 @@ import json import logging import os import random +from typing import List, Union import discord from discord.ext import commands @@ -23,7 +24,7 @@ ADD_SKULL_EXISTING_REACTION_CHANCE = 20 # 5% class CandyCollection(commands.Cog): """Candy collection game Cog.""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot with open(json_location) as candy: self.candy_json = json.load(candy) @@ -34,7 +35,7 @@ class CandyCollection(commands.Cog): self.get_candyinfo[userid] = userinfo @commands.Cog.listener() - async def on_message(self, message): + async def on_message(self, message: discord.Message) -> None: """Randomly adds candy or skull reaction to non-bot messages in the Event channel.""" # make sure its a human message if message.author.bot: @@ -55,7 +56,7 @@ class CandyCollection(commands.Cog): return await message.add_reaction('\N{CANDY}') @commands.Cog.listener() - async def on_reaction_add(self, reaction, user): + async def on_reaction_add(self, reaction: discord.Reaction, user: discord.Member) -> None: """Add/remove candies from a person if the reaction satisfies criteria.""" message = reaction.message # check to ensure the reactor is human @@ -101,7 +102,7 @@ class CandyCollection(commands.Cog): self.candy_json['records'].append(d) await self.remove_reactions(reaction) - async def reacted_msg_chance(self, message): + async def reacted_msg_chance(self, message: discord.Message) -> None: """ Randomly add a skull or candy reaction to a message if there is a reaction there already. @@ -118,24 +119,25 @@ class CandyCollection(commands.Cog): self.msg_reacted.append(d) return await message.add_reaction('\N{CANDY}') - async def ten_recent_msg(self): + async def ten_recent_msg(self) -> List[int]: """Get the last 10 messages sent in the channel.""" ten_recent = [] - recent_msg = max(message.id for message - in self.bot._connection._messages - if message.channel.id == Channels.seasonalbot_chat) + recent_msg_id = max( + message.id for message in self.bot._connection._messages + if message.channel.id == Channels.seasonalbot_chat + ) channel = await self.hacktober_channel() - ten_recent.append(recent_msg.id) + ten_recent.append(recent_msg_id) for i in range(9): - o = discord.Object(id=recent_msg.id + i) + o = discord.Object(id=recent_msg_id + i) msg = await next(channel.history(limit=1, before=o)) ten_recent.append(msg.id) return ten_recent - async def get_message(self, msg_id): + async def get_message(self, msg_id: int) -> Union[discord.Message, None]: """Get the message from its ID.""" try: o = discord.Object(id=msg_id + 1) @@ -151,11 +153,11 @@ class CandyCollection(commands.Cog): except Exception: return None - async def hacktober_channel(self): + async def hacktober_channel(self) -> discord.TextChannel: """Get #hacktoberbot channel from its ID.""" return self.bot.get_channel(id=Channels.seasonalbot_chat) - async def remove_reactions(self, reaction): + async def remove_reactions(self, reaction: discord.Reaction) -> None: """Remove all candy/skull reactions.""" try: async for user in reaction.users(): @@ -164,20 +166,20 @@ class CandyCollection(commands.Cog): except discord.HTTPException: pass - async def send_spook_msg(self, author, channel, candies): + async def send_spook_msg(self, author: discord.Member, channel: discord.TextChannel, candies: int) -> None: """Send a spooky message.""" e = discord.Embed(colour=author.colour) e.set_author(name="Ghosts and Ghouls and Jack o' lanterns at night; " f"I took {candies} candies and quickly took flight.") await channel.send(embed=e) - def save_to_json(self): + def save_to_json(self) -> None: """Save JSON to a local file.""" with open(json_location, 'w') as outfile: json.dump(self.candy_json, outfile) @commands.command() - async def candy(self, ctx): + async def candy(self, ctx: commands.Context) -> None: """Get the candy leaderboard and save to JSON.""" # Use run_in_executor to prevent blocking thing = functools.partial(self.save_to_json) @@ -213,7 +215,7 @@ class CandyCollection(commands.Cog): await ctx.send(embed=e) -def setup(bot): +def setup(bot: commands.Bot) -> None: """Candy Collection game Cog load.""" bot.add_cog(CandyCollection(bot)) log.info("CandyCollection cog loaded") diff --git a/bot/seasons/halloween/hacktoberstats.py b/bot/seasons/halloween/hacktoberstats.py index b6b5a900..20797037 100644 --- a/bot/seasons/halloween/hacktoberstats.py +++ b/bot/seasons/halloween/hacktoberstats.py @@ -1,32 +1,33 @@ import json import logging import re -import typing from collections import Counter from datetime import datetime from pathlib import Path +from typing import List, Tuple import aiohttp import discord from discord.ext import commands +from bot.utils.persist import make_persistent + log = logging.getLogger(__name__) +CURRENT_YEAR = datetime.now().year # Used to construct GH API query +PRS_FOR_SHIRT = 4 # Minimum number of PRs before a shirt is awarded + class HacktoberStats(commands.Cog): """Hacktoberfest statistics Cog.""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot - self.link_json = Path("bot/resources/github_links.json") + self.link_json = make_persistent(Path("bot", "resources", "halloween", "github_links.json")) self.linked_accounts = self.load_linked_users() - @commands.group( - name='hacktoberstats', - aliases=('hackstats',), - invoke_without_command=True - ) - async def hacktoberstats_group(self, ctx: commands.Context, github_username: str = None): + @commands.group(name="hacktoberstats", aliases=("hackstats",), invoke_without_command=True) + async def hacktoberstats_group(self, ctx: commands.Context, github_username: str = None) -> None: """ Display an embed for a user's Hacktoberfest contributions. @@ -43,8 +44,8 @@ class HacktoberStats(commands.Cog): else: msg = ( f"{author_mention}, you have not linked a GitHub account\n\n" - f"You can link your GitHub account using:\n```{ctx.prefix}stats link github_username```\n" - f"Or query GitHub stats directly using:\n```{ctx.prefix}stats github_username```" + f"You can link your GitHub account using:\n```{ctx.prefix}hackstats link github_username```\n" + f"Or query GitHub stats directly using:\n```{ctx.prefix}hackstats github_username```" ) await ctx.send(msg) return @@ -52,7 +53,7 @@ class HacktoberStats(commands.Cog): await self.get_stats(ctx, github_username) @hacktoberstats_group.command(name="link") - async def link_user(self, ctx: commands.Context, github_username: str = None): + async def link_user(self, ctx: commands.Context, github_username: str = None) -> None: """ Link the invoking user's Github github_username to their Discord ID. @@ -85,7 +86,7 @@ class HacktoberStats(commands.Cog): await ctx.send(f"{author_mention}, a GitHub username is required to link your account") @hacktoberstats_group.command(name="unlink") - async def unlink_user(self, ctx: commands.Context): + async def unlink_user(self, ctx: commands.Context) -> None: """Remove the invoking user's account link from the log.""" author_id, author_mention = HacktoberStats._author_mention_from_context(ctx) @@ -99,7 +100,7 @@ class HacktoberStats(commands.Cog): self.save_linked_users() - def load_linked_users(self) -> typing.Dict: + def load_linked_users(self) -> dict: """ Load list of linked users from local JSON file. @@ -122,7 +123,7 @@ class HacktoberStats(commands.Cog): logging.info(f"Linked account log: '{self.link_json}' does not exist") return {} - def save_linked_users(self): + def save_linked_users(self) -> None: """ Save list of linked users to local JSON file. @@ -139,7 +140,7 @@ class HacktoberStats(commands.Cog): json.dump(self.linked_accounts, fID, default=str) logging.info(f"linked_accounts saved to '{self.link_json}'") - async def get_stats(self, ctx: commands.Context, github_username: str): + async def get_stats(self, ctx: commands.Context, github_username: str) -> None: """ Query GitHub's API for PRs created by a GitHub user during the month of October. @@ -158,18 +159,18 @@ class HacktoberStats(commands.Cog): else: await ctx.send(f"No October GitHub contributions found for '{github_username}'") - def build_embed(self, github_username: str, prs: typing.List[dict]) -> discord.Embed: + def build_embed(self, github_username: str, prs: List[dict]) -> discord.Embed: """Return a stats embed built from github_username's PRs.""" logging.info(f"Building Hacktoberfest embed for GitHub user: '{github_username}'") pr_stats = self._summarize_prs(prs) n = pr_stats['n_prs'] - if n >= 5: + if n >= PRS_FOR_SHIRT: shirtstr = f"**{github_username} has earned a tshirt!**" - elif n == 4: + elif n == PRS_FOR_SHIRT - 1: shirtstr = f"**{github_username} is 1 PR away from a tshirt!**" else: - shirtstr = f"**{github_username} is {5 - n} PRs away from a tshirt!**" + shirtstr = f"**{github_username} is {PRS_FOR_SHIRT - n} PRs away from a tshirt!**" stats_embed = discord.Embed( title=f"{github_username}'s Hacktoberfest", @@ -186,7 +187,7 @@ class HacktoberStats(commands.Cog): stats_embed.set_author( name="Hacktoberfest", url="https://hacktoberfest.digitalocean.com", - icon_url="https://hacktoberfest.digitalocean.com/assets/logo-hacktoberfest.png" + icon_url="https://hacktoberfest.digitalocean.com/pretty_logo.png" ) stats_embed.add_field( name="Top 5 Repositories:", @@ -197,7 +198,7 @@ class HacktoberStats(commands.Cog): return stats_embed @staticmethod - async def get_october_prs(github_username: str) -> typing.List[dict]: + async def get_october_prs(github_username: str) -> List[dict]: """ Query GitHub's API for PRs created during the month of October by github_username. @@ -219,7 +220,7 @@ class HacktoberStats(commands.Cog): not_label = "invalid" action_type = "pr" is_query = f"public+author:{github_username}" - date_range = "2018-10-01..2018-10-31" + date_range = f"{CURRENT_YEAR}-10-01..{CURRENT_YEAR}-10-31" per_page = "300" query_url = ( f"{base_url}" @@ -274,7 +275,7 @@ class HacktoberStats(commands.Cog): return re.findall(exp, in_url)[0] @staticmethod - def _summarize_prs(prs: typing.List[dict]) -> typing.Dict: + def _summarize_prs(prs: List[dict]) -> dict: """ Generate statistics from an input list of PR dictionaries, as output by get_october_prs. @@ -288,7 +289,7 @@ class HacktoberStats(commands.Cog): return {"n_prs": len(prs), "top5": Counter(contributed_repos).most_common(5)} @staticmethod - def _build_top5str(stats: typing.List[tuple]) -> str: + def _build_top5str(stats: List[tuple]) -> str: """ Build a string from the Top 5 contributions that is compatible with a discord.Embed field. @@ -316,7 +317,7 @@ class HacktoberStats(commands.Cog): return "contributions" @staticmethod - def _author_mention_from_context(ctx: commands.Context) -> typing.Tuple: + def _author_mention_from_context(ctx: commands.Context) -> Tuple: """Return stringified Message author ID and mentionable string from commands.Context.""" author_id = str(ctx.message.author.id) author_mention = ctx.message.author.mention @@ -324,7 +325,7 @@ class HacktoberStats(commands.Cog): return author_id, author_mention -def setup(bot): +def setup(bot): # Noqa """Hacktoberstats Cog load.""" bot.add_cog(HacktoberStats(bot)) log.info("HacktoberStats cog loaded") diff --git a/bot/seasons/halloween/halloween_facts.py b/bot/seasons/halloween/halloween_facts.py index f09aa4ad..f8610bd3 100644 --- a/bot/seasons/halloween/halloween_facts.py +++ b/bot/seasons/halloween/halloween_facts.py @@ -3,6 +3,7 @@ import logging import random from datetime import timedelta from pathlib import Path +from typing import Tuple import discord from discord.ext import commands @@ -28,7 +29,7 @@ INTERVAL = timedelta(hours=6).total_seconds() class HalloweenFacts(commands.Cog): """A Cog for displaying interesting facts about Halloween.""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot with open(Path("bot/resources/halloween/halloween_facts.json"), "r") as file: self.halloween_facts = json.load(file) @@ -37,31 +38,31 @@ class HalloweenFacts(commands.Cog): random.shuffle(self.facts) @commands.Cog.listener() - async def on_ready(self): + async def on_ready(self) -> None: """Get event Channel object and initialize fact task loop.""" self.channel = self.bot.get_channel(Channels.seasonalbot_chat) self.bot.loop.create_task(self._fact_publisher_task()) - def random_fact(self): + def random_fact(self) -> Tuple[int, str]: """Return a random fact from the loaded facts.""" return random.choice(self.facts) @commands.command(name="spookyfact", aliases=("halloweenfact",), brief="Get the most recent Halloween fact") - async def get_random_fact(self, ctx): + async def get_random_fact(self, ctx: commands.Context) -> None: """Reply with the most recent Halloween fact.""" index, fact = self.random_fact() embed = self._build_embed(index, fact) await ctx.send(embed=embed) @staticmethod - def _build_embed(index, fact): + def _build_embed(index: int, fact: str) -> discord.Embed: """Builds a Discord embed from the given fact and its index.""" emoji = random.choice(SPOOKY_EMOJIS) title = f"{emoji} Halloween Fact #{index + 1}" return discord.Embed(title=title, description=fact, color=PUMPKIN_ORANGE) -def setup(bot): +def setup(bot: commands.Bot) -> None: """Halloween facts Cog load.""" bot.add_cog(HalloweenFacts(bot)) log.info("HalloweenFacts cog loaded") diff --git a/bot/seasons/halloween/halloweenify.py b/bot/seasons/halloween/halloweenify.py index 334781ab..dfcc2b1e 100644 --- a/bot/seasons/halloween/halloweenify.py +++ b/bot/seasons/halloween/halloweenify.py @@ -13,12 +13,12 @@ log = logging.getLogger(__name__) class Halloweenify(commands.Cog): """A cog to change a invokers nickname to a spooky one!""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot @commands.cooldown(1, 300, BucketType.user) @commands.command() - async def halloweenify(self, ctx): + async def halloweenify(self, ctx: commands.Context) -> None: """Change your nickname into a much spookier one!""" async with ctx.typing(): with open(Path("bot/resources/halloween/halloweenify.json"), "r") as f: @@ -46,7 +46,7 @@ class Halloweenify(commands.Cog): await ctx.send(embed=embed) -def setup(bot): +def setup(bot: commands.Bot) -> None: """Halloweenify Cog load.""" bot.add_cog(Halloweenify(bot)) log.info("Halloweenify cog loaded") diff --git a/bot/seasons/halloween/monstersurvey.py b/bot/seasons/halloween/monstersurvey.py index 173ce8eb..12e1d022 100644 --- a/bot/seasons/halloween/monstersurvey.py +++ b/bot/seasons/halloween/monstersurvey.py @@ -30,7 +30,7 @@ class MonsterSurvey(Cog): with open(self.registry_location, 'r') as jason: self.voter_registry = json.load(jason) - def json_write(self): + def json_write(self) -> None: """Write voting results to a local JSON file.""" log.info("Saved Monster Survey Results") with open(self.registry_location, 'w') as jason: @@ -50,7 +50,7 @@ class MonsterSurvey(Cog): if id in vr[m]['votes'] and m != monster: vr[m]['votes'].remove(id) - def get_name_by_leaderboard_index(self, n): + def get_name_by_leaderboard_index(self, n: int) -> str: """Return the monster at the specified leaderboard index.""" n = n - 1 vr = self.voter_registry @@ -60,9 +60,9 @@ class MonsterSurvey(Cog): @commands.group( name='monster', - aliases=('ms',) + aliases=('mon',) ) - async def monster_group(self, ctx: Context): + async def monster_group(self, ctx: Context) -> None: """The base voting command. If nothing is called, then it will return an embed.""" if ctx.invoked_subcommand is None: async with ctx.typing(): @@ -92,7 +92,7 @@ class MonsterSurvey(Cog): @monster_group.command( name='vote' ) - async def monster_vote(self, ctx: Context, name=None): + async def monster_vote(self, ctx: Context, name: str = None) -> None: """ Cast a vote for a particular monster. @@ -143,7 +143,7 @@ class MonsterSurvey(Cog): @monster_group.command( name='show' ) - async def monster_show(self, ctx: Context, name=None) -> None: + async def monster_show(self, ctx: Context, name: str = None) -> None: """Shows the named monster. If one is not named, it sends the default voting embed instead.""" if name is None: await ctx.invoke(self.monster_leaderboard) @@ -200,7 +200,7 @@ class MonsterSurvey(Cog): await ctx.send(embed=embed) -def setup(bot): +def setup(bot: Bot) -> None: """Monster survey Cog load.""" bot.add_cog(MonsterSurvey(bot)) log.info("MonsterSurvey cog loaded") diff --git a/bot/seasons/halloween/scarymovie.py b/bot/seasons/halloween/scarymovie.py index cd95a3a2..3823a3e4 100644 --- a/bot/seasons/halloween/scarymovie.py +++ b/bot/seasons/halloween/scarymovie.py @@ -16,11 +16,11 @@ TMDB_TOKEN = environ.get('TMDB_TOKEN') class ScaryMovie(commands.Cog): """Selects a random scary movie and embeds info into Discord chat.""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot @commands.command(name='scarymovie', alias=['smovie']) - async def random_movie(self, ctx): + async def random_movie(self, ctx: commands.Context) -> None: """Randomly select a scary movie and display information about it.""" async with ctx.typing(): selection = await self.select_movie() @@ -29,7 +29,7 @@ class ScaryMovie(commands.Cog): await ctx.send(embed=movie_details) @staticmethod - async def select_movie(): + async def select_movie() -> dict: """Selects a random movie and returns a JSON of movie details from TMDb.""" url = 'https://api.themoviedb.org/4/discover/movie' params = { @@ -62,7 +62,7 @@ class ScaryMovie(commands.Cog): return await selection.json() @staticmethod - async def format_metadata(movie): + async def format_metadata(movie: dict) -> Embed: """Formats raw TMDb data to be embedded in Discord chat.""" # Build the relevant URLs. movie_id = movie.get("id") @@ -126,7 +126,7 @@ class ScaryMovie(commands.Cog): return embed -def setup(bot): +def setup(bot: commands.Bot) -> None: """Scary movie Cog load.""" bot.add_cog(ScaryMovie(bot)) log.info("ScaryMovie cog loaded") diff --git a/bot/seasons/halloween/spookyavatar.py b/bot/seasons/halloween/spookyavatar.py index 9bdef1a8..268de3fb 100644 --- a/bot/seasons/halloween/spookyavatar.py +++ b/bot/seasons/halloween/spookyavatar.py @@ -15,10 +15,10 @@ log = logging.getLogger(__name__) class SpookyAvatar(commands.Cog): """A cog that spookifies an avatar.""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot - async def get(self, url): + async def get(self, url: str) -> bytes: """Returns the contents of the supplied URL.""" async with aiohttp.ClientSession() as session: async with session.get(url) as resp: @@ -26,7 +26,7 @@ class SpookyAvatar(commands.Cog): @commands.command(name='savatar', aliases=('spookyavatar', 'spookify'), brief='Spookify an user\'s avatar.') - async def spooky_avatar(self, ctx, user: discord.Member = None): + async def spooky_avatar(self, ctx: commands.Context, user: discord.Member = None) -> None: """A command to print the user's spookified avatar.""" if user is None: user = ctx.message.author @@ -47,7 +47,7 @@ class SpookyAvatar(commands.Cog): os.remove(str(ctx.message.id)+'.png') -def setup(bot): +def setup(bot: commands.Bot) -> None: """Spooky avatar Cog load.""" bot.add_cog(SpookyAvatar(bot)) log.info("SpookyAvatar cog loaded") diff --git a/bot/seasons/halloween/spookygif.py b/bot/seasons/halloween/spookygif.py index ba2ad6e5..818de8cd 100644 --- a/bot/seasons/halloween/spookygif.py +++ b/bot/seasons/halloween/spookygif.py @@ -12,11 +12,11 @@ log = logging.getLogger(__name__) class SpookyGif(commands.Cog): """A cog to fetch a random spooky gif from the web!""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot @commands.command(name="spookygif", aliases=("sgif", "scarygif")) - async def spookygif(self, ctx): + async def spookygif(self, ctx: commands.Context) -> None: """Fetches a random gif from the GIPHY API and responds with it.""" async with ctx.typing(): async with aiohttp.ClientSession() as session: @@ -33,7 +33,7 @@ class SpookyGif(commands.Cog): await ctx.send(embed=embed) -def setup(bot): +def setup(bot: commands.Bot) -> None: """Spooky GIF Cog load.""" bot.add_cog(SpookyGif(bot)) log.info("SpookyGif cog loaded") diff --git a/bot/seasons/halloween/spookyrating.py b/bot/seasons/halloween/spookyrating.py index 08c17a27..7f78f536 100644 --- a/bot/seasons/halloween/spookyrating.py +++ b/bot/seasons/halloween/spookyrating.py @@ -17,15 +17,15 @@ with Path("bot/resources/halloween/spooky_rating.json").open() as file: class SpookyRating(commands.Cog): - """A cog for calculating one's spooky rating""" + """A cog for calculating one's spooky rating.""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot self.local_random = random.Random() @commands.command() @commands.cooldown(rate=1, per=5, type=commands.BucketType.user) - async def spookyrating(self, ctx, who: discord.Member = None): + async def spookyrating(self, ctx: commands.Context, who: discord.Member = None) -> None: """ Calculates the spooky rating of someone. @@ -61,7 +61,7 @@ class SpookyRating(commands.Cog): await ctx.send(embed=embed) -def setup(bot): +def setup(bot: commands.Bot) -> None: """Spooky Rating Cog load.""" bot.add_cog(SpookyRating(bot)) log.info("SpookyRating cog loaded") diff --git a/bot/seasons/halloween/spookyreact.py b/bot/seasons/halloween/spookyreact.py index 5a086072..90b1254d 100644 --- a/bot/seasons/halloween/spookyreact.py +++ b/bot/seasons/halloween/spookyreact.py @@ -2,7 +2,7 @@ import logging import re import discord -from discord.ext.commands import Cog +from discord.ext.commands import Bot, Cog log = logging.getLogger(__name__) @@ -20,11 +20,11 @@ SPOOKY_TRIGGERS = { class SpookyReact(Cog): """A cog that makes the bot react to message triggers.""" - def __init__(self, bot): + def __init__(self, bot: Bot): self.bot = bot @Cog.listener() - async def on_message(self, ctx: discord.Message): + async def on_message(self, ctx: discord.Message) -> None: """ A command to send the seasonalbot github project. @@ -66,7 +66,7 @@ class SpookyReact(Cog): return False -def setup(bot): +def setup(bot: Bot) -> None: """Spooky reaction Cog load.""" bot.add_cog(SpookyReact(bot)) log.info("SpookyReact cog loaded") diff --git a/bot/seasons/halloween/spookysound.py b/bot/seasons/halloween/spookysound.py index 44fdd9d6..e0676d0a 100644 --- a/bot/seasons/halloween/spookysound.py +++ b/bot/seasons/halloween/spookysound.py @@ -13,14 +13,14 @@ log = logging.getLogger(__name__) class SpookySound(commands.Cog): """A cog that plays a spooky sound in a voice channel on command.""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot self.sound_files = list(Path("bot/resources/halloween/spookysounds").glob("*.mp3")) self.channel = None @commands.cooldown(rate=1, per=1) @commands.command(brief="Play a spooky sound, restricted to once per 2 mins") - async def spookysound(self, ctx): + async def spookysound(self, ctx: commands.Context) -> None: """ Connect to the Hacktoberbot voice channel, play a random spooky sound, then disconnect. @@ -37,12 +37,12 @@ class SpookySound(commands.Cog): voice.play(src, after=lambda e: self.bot.loop.create_task(self.disconnect(voice))) @staticmethod - async def disconnect(voice): + async def disconnect(voice: discord.VoiceClient) -> None: """Helper method to disconnect a given voice client.""" await voice.disconnect() -def setup(bot): +def setup(bot: commands.Bot) -> None: """Spooky sound Cog load.""" bot.add_cog(SpookySound(bot)) log.info("SpookySound cog loaded") diff --git a/bot/seasons/halloween/timeleft.py b/bot/seasons/halloween/timeleft.py index a2b16a6c..77767baa 100644 --- a/bot/seasons/halloween/timeleft.py +++ b/bot/seasons/halloween/timeleft.py @@ -1,5 +1,6 @@ import logging from datetime import datetime +from typing import Tuple from discord.ext import commands @@ -9,16 +10,16 @@ log = logging.getLogger(__name__) class TimeLeft(commands.Cog): """A Cog that tells you how long left until Hacktober is over!""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot @staticmethod - def in_october(): + def in_october() -> bool: """Return True if the current month is October.""" return datetime.utcnow().month == 10 @staticmethod - def load_date(): + def load_date() -> Tuple[int, datetime, datetime]: """Return of a tuple of the current time and the end and start times of the next October.""" now = datetime.utcnow() year = now.year @@ -29,7 +30,7 @@ class TimeLeft(commands.Cog): return now, end, start @commands.command() - async def timeleft(self, ctx): + async def timeleft(self, ctx: commands.Context) -> None: """ Calculates the time left until the end of Hacktober. @@ -53,7 +54,7 @@ class TimeLeft(commands.Cog): ) -def setup(bot): +def setup(bot: commands.Bot) -> None: """Cog load.""" bot.add_cog(TimeLeft(bot)) log.info("TimeLeft cog loaded") diff --git a/bot/seasons/pride/__init__.py b/bot/seasons/pride/__init__.py index b1897eca..75e90b2a 100644 --- a/bot/seasons/pride/__init__.py +++ b/bot/seasons/pride/__init__.py @@ -27,7 +27,7 @@ class Pride(SeasonBase): # Duration of season start_date = "01/06" - end_date = "30/06" + end_date = "01/07" # Season logo colour = Colours.soft_red diff --git a/bot/seasons/pride/pride_anthem.py b/bot/seasons/pride/pride_anthem.py index f226f4bb..b0c6d34e 100644 --- a/bot/seasons/pride/pride_anthem.py +++ b/bot/seasons/pride/pride_anthem.py @@ -11,7 +11,7 @@ log = logging.getLogger(__name__) class PrideAnthem(commands.Cog): """Embed a random youtube video for a gay anthem!""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot self.anthems = self.load_vids() @@ -39,7 +39,7 @@ class PrideAnthem(commands.Cog): return anthems @commands.command(name="prideanthem", aliases=["anthem", "pridesong"]) - async def prideanthem(self, ctx, genre: str = None): + async def prideanthem(self, ctx: commands.Context, genre: str = None) -> None: """ Sends a message with a video of a random pride anthem. @@ -52,7 +52,7 @@ class PrideAnthem(commands.Cog): await ctx.send("I couldn't find a video, sorry!") -def setup(bot): +def setup(bot: commands.Bot) -> None: """Cog loader for pride anthem.""" bot.add_cog(PrideAnthem(bot)) log.info("Pride anthems cog loaded!") diff --git a/bot/seasons/pride/pride_avatar.py b/bot/seasons/pride/pride_avatar.py index a5b38d20..85e49d5c 100644 --- a/bot/seasons/pride/pride_avatar.py +++ b/bot/seasons/pride/pride_avatar.py @@ -56,11 +56,11 @@ OPTIONS = { class PrideAvatar(commands.Cog): """Put an LGBT spin on your avatar!""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot @staticmethod - def crop_avatar(avatar): + def crop_avatar(avatar: Image) -> Image: """This crops the avatar into a circle.""" mask = Image.new("L", avatar.size, 0) draw = ImageDraw.Draw(mask) @@ -69,7 +69,7 @@ class PrideAvatar(commands.Cog): return avatar @staticmethod - def crop_ring(ring, px): + def crop_ring(ring: Image, px: int) -> Image: """This crops the ring into a circle.""" mask = Image.new("L", ring.size, 0) draw = ImageDraw.Draw(mask) @@ -79,7 +79,7 @@ class PrideAvatar(commands.Cog): return ring @commands.group(aliases=["avatarpride", "pridepfp", "prideprofile"], invoke_without_command=True) - async def prideavatar(self, ctx, option="lgbt", pixels: int = 64): + async def prideavatar(self, ctx: commands.Context, option: str = "lgbt", pixels: int = 64) -> None: """ This surrounds an avatar with a border of a specified LGBT flag. @@ -126,7 +126,7 @@ class PrideAvatar(commands.Cog): await ctx.send(file=file, embed=embed) @prideavatar.command() - async def flags(self, ctx): + async def flags(self, ctx: commands.Context) -> None: """This lists the flags that can be used with the prideavatar command.""" choices = sorted(set(OPTIONS.values())) options = "• " + "\n• ".join(choices) @@ -139,7 +139,7 @@ class PrideAvatar(commands.Cog): await ctx.send(embed=embed) -def setup(bot): +def setup(bot: commands.Bot) -> None: """Cog load.""" bot.add_cog(PrideAvatar(bot)) log.info("PrideAvatar cog loaded") diff --git a/bot/seasons/season.py b/bot/seasons/season.py index c88ef2a7..3546fda6 100644 --- a/bot/seasons/season.py +++ b/bot/seasons/season.py @@ -264,14 +264,14 @@ class SeasonBase: return await self.apply_server_icon() - async def announce_season(self): + async def announce_season(self) -> None: """ Announces a change in season in the announcement channel. It will skip the announcement if the current active season is the "evergreen" default season. """ # Don't actually announce if reverting to normal season - if self.name in ("evergreen", "wildcard"): + if self.name in ("evergreen", "wildcard", "halloween"): log.debug(f"Season Changed: {self.name}") return @@ -303,7 +303,7 @@ class SeasonBase: cogs.append(cog_name) if cogs: - def cog_name(cog): + def cog_name(cog: commands.Cog) -> str: return type(cog).__name__ cog_info = [] @@ -320,7 +320,7 @@ class SeasonBase: await channel.send(mention, embed=embed) - async def load(self): + async def load(self) -> None: """ Loads extensions, bot name and avatar, server icon and announces new season. @@ -361,7 +361,7 @@ class SeasonBase: class SeasonManager(commands.Cog): """A cog for managing seasons.""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot self.season = get_season(date=datetime.datetime.utcnow()) self.season_task = bot.loop.create_task(self.load_seasons()) @@ -378,7 +378,7 @@ class SeasonManager(commands.Cog): ) self.sleep_time = (midnight - datetime.datetime.now()).seconds + 60 - async def load_seasons(self): + async def load_seasons(self) -> None: """Asynchronous timer loop to check for a new season every midnight.""" await self.bot.wait_until_ready() await self.season.load() @@ -397,7 +397,7 @@ class SeasonManager(commands.Cog): @with_role(Roles.moderator, Roles.admin, Roles.owner) @commands.command(name="season") - async def change_season(self, ctx, new_season: str): + async def change_season(self, ctx: commands.Context, new_season: str) -> None: """Changes the currently active season on the bot.""" self.season = get_season(season_name=new_season) await self.season.load() @@ -405,10 +405,10 @@ class SeasonManager(commands.Cog): @with_role(Roles.moderator, Roles.admin, Roles.owner) @commands.command(name="seasons") - async def show_seasons(self, ctx): + async def show_seasons(self, ctx: commands.Context) -> None: """Shows the available seasons and their dates.""" # Sort by start order, followed by lower duration - def season_key(season_class: Type[SeasonBase]): + def season_key(season_class: Type[SeasonBase]) -> Tuple[datetime.datetime, datetime.timedelta]: return season_class.start(), season_class.end() - datetime.datetime.max current_season = self.season.name @@ -448,13 +448,13 @@ class SeasonManager(commands.Cog): @with_role(Roles.moderator, Roles.admin, Roles.owner) @commands.group() - async def refresh(self, ctx): + async def refresh(self, ctx: commands.Context) -> None: """Refreshes certain seasonal elements without reloading seasons.""" if not ctx.invoked_subcommand: await ctx.send_help(ctx.command) @refresh.command(name="avatar") - async def refresh_avatar(self, ctx): + async def refresh_avatar(self, ctx: commands.Context) -> None: """Re-applies the bot avatar for the currently loaded season.""" # Attempt the change is_changed = await self.season.apply_avatar() @@ -477,7 +477,7 @@ class SeasonManager(commands.Cog): await ctx.send(embed=embed) @refresh.command(name="icon") - async def refresh_server_icon(self, ctx): + async def refresh_server_icon(self, ctx: commands.Context) -> None: """Re-applies the server icon for the currently loaded season.""" # Attempt the change is_changed = await self.season.apply_server_icon() @@ -500,7 +500,7 @@ class SeasonManager(commands.Cog): await ctx.send(embed=embed) @refresh.command(name="username", aliases=("name",)) - async def refresh_username(self, ctx): + async def refresh_username(self, ctx: commands.Context) -> None: """Re-applies the bot username for the currently loaded season.""" old_username = str(bot.user) old_display_name = ctx.guild.me.display_name @@ -539,10 +539,10 @@ class SeasonManager(commands.Cog): @with_role(Roles.moderator, Roles.admin, Roles.owner) @commands.command() - async def announce(self, ctx): + async def announce(self, ctx: commands.Context) -> None: """Announces the currently loaded season.""" await self.season.announce_season() - def cog_unload(self): + def cog_unload(self) -> None: """Cancel season-related tasks on cog unload.""" self.season_task.cancel() diff --git a/bot/seasons/valentines/be_my_valentine.py b/bot/seasons/valentines/be_my_valentine.py index c4acf17a..a073e1bd 100644 --- a/bot/seasons/valentines/be_my_valentine.py +++ b/bot/seasons/valentines/be_my_valentine.py @@ -1,8 +1,8 @@ import logging import random -import typing from json import load from pathlib import Path +from typing import Optional, Tuple import discord from discord.ext import commands @@ -18,12 +18,12 @@ HEART_EMOJIS = [":heart:", ":gift_heart:", ":revolving_hearts:", ":sparkling_hea class BeMyValentine(commands.Cog): """A cog that sends Valentines to other users!""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot self.valentines = self.load_json() @staticmethod - def load_json(): + def load_json() -> dict: """Load Valentines messages from the static resources.""" p = Path("bot/resources/valentines/bemyvalentine_valentines.json") with p.open() as json_data: @@ -31,7 +31,7 @@ class BeMyValentine(commands.Cog): return valentines @commands.group(name="lovefest", invoke_without_command=True) - async def lovefest_role(self, ctx): + async def lovefest_role(self, ctx: commands.Context) -> None: """ Subscribe or unsubscribe from the lovefest role. @@ -43,7 +43,7 @@ class BeMyValentine(commands.Cog): await ctx.send_help(ctx.command) @lovefest_role.command(name="sub") - async def add_role(self, ctx): + async def add_role(self, ctx: commands.Context) -> None: """Adds the lovefest role.""" user = ctx.author role = discord.utils.get(ctx.guild.roles, id=Lovefest.role_id) @@ -54,7 +54,7 @@ class BeMyValentine(commands.Cog): await ctx.send("You already have the role !") @lovefest_role.command(name="unsub") - async def remove_role(self, ctx): + async def remove_role(self, ctx: commands.Context) -> None: """Removes the lovefest role.""" user = ctx.author role = discord.utils.get(ctx.guild.roles, id=Lovefest.role_id) @@ -66,7 +66,9 @@ class BeMyValentine(commands.Cog): @commands.cooldown(1, 1800, BucketType.user) @commands.group(name='bemyvalentine', invoke_without_command=True) - async def send_valentine(self, ctx, user: typing.Optional[discord.Member] = None, *, valentine_type=None): + async def send_valentine( + self, ctx: commands.Context, user: Optional[discord.Member] = None, *, valentine_type: str = None + ) -> None: """ Send a valentine to user, if specified, or to a random user with the lovefest role. @@ -112,7 +114,9 @@ class BeMyValentine(commands.Cog): @commands.cooldown(1, 1800, BucketType.user) @send_valentine.command(name='secret') - async def anonymous(self, ctx, user: typing.Optional[discord.Member] = None, *, valentine_type=None): + async def anonymous( + self, ctx: commands.Context, user: Optional[discord.Member] = None, *, valentine_type: str = None + ) -> None: """ Send an anonymous Valentine via DM to to a user, if specified, or to a random with the lovefest role. @@ -164,7 +168,7 @@ class BeMyValentine(commands.Cog): else: await ctx.author.send(f"Your message has been sent to {user}") - def valentine_check(self, valentine_type): + def valentine_check(self, valentine_type: str) -> Tuple[str, str]: """Return the appropriate Valentine type & title based on the invoking user's input.""" if valentine_type is None: valentine, title = self.random_valentine() @@ -184,7 +188,7 @@ class BeMyValentine(commands.Cog): return valentine, title @staticmethod - def random_user(author: discord.Member, members: discord.Member): + def random_user(author: discord.Member, members: discord.Member) -> None: """ Picks a random member from the list provided in `members`. @@ -196,13 +200,13 @@ class BeMyValentine(commands.Cog): return random.choice(members) if members else None @staticmethod - def random_emoji(): + def random_emoji() -> Tuple[str, str]: """Return two random emoji from the module-defined constants.""" EMOJI_1 = random.choice(HEART_EMOJIS) EMOJI_2 = random.choice(HEART_EMOJIS) return EMOJI_1, EMOJI_2 - def random_valentine(self): + def random_valentine(self) -> Tuple[str, str]: """Grabs a random poem or a compliment (any message).""" valentine_poem = random.choice(self.valentines['valentine_poems']) valentine_compliment = random.choice(self.valentines['valentine_compliments']) @@ -213,18 +217,18 @@ class BeMyValentine(commands.Cog): title = 'A compliment for ' return random_valentine, title - def valentine_poem(self): + def valentine_poem(self) -> str: """Grabs a random poem.""" valentine_poem = random.choice(self.valentines['valentine_poems']) return valentine_poem - def valentine_compliment(self): + def valentine_compliment(self) -> str: """Grabs a random compliment.""" valentine_compliment = random.choice(self.valentines['valentine_compliments']) return valentine_compliment -def setup(bot): +def setup(bot: commands.Bot) -> None: """Be my Valentine Cog load.""" bot.add_cog(BeMyValentine(bot)) log.info("BeMyValentine cog loaded") diff --git a/bot/seasons/valentines/lovecalculator.py b/bot/seasons/valentines/lovecalculator.py index 1d5a028d..03d3d7d5 100644 --- a/bot/seasons/valentines/lovecalculator.py +++ b/bot/seasons/valentines/lovecalculator.py @@ -23,12 +23,12 @@ with Path("bot/resources/valentines/love_matches.json").open() as file: class LoveCalculator(Cog): """A cog for calculating the love between two people.""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot @commands.command(aliases=('love_calculator', 'love_calc')) @commands.cooldown(rate=1, per=5, type=commands.BucketType.user) - async def love(self, ctx, who: Union[Member, str], whom: Union[Member, str] = None): + async def love(self, ctx: commands.Context, who: Union[Member, str], whom: Union[Member, str] = None) -> None: """ Tells you how much the two love each other. @@ -53,7 +53,7 @@ class LoveCalculator(Cog): staff = ctx.guild.get_role(Roles.helpers).members whom = random.choice(staff) - def normalize(arg): + def normalize(arg: Union[Member, str]) -> str: if isinstance(arg, Member): # If we are given a member, return name#discrim without any extra changes arg = str(arg) @@ -98,7 +98,7 @@ class LoveCalculator(Cog): await ctx.send(embed=embed) -def setup(bot): +def setup(bot: commands.Bot) -> None: """Love calculator Cog load.""" bot.add_cog(LoveCalculator(bot)) log.info("LoveCalculator cog loaded") diff --git a/bot/seasons/valentines/movie_generator.py b/bot/seasons/valentines/movie_generator.py index fa5f236a..ce1d7d5b 100644 --- a/bot/seasons/valentines/movie_generator.py +++ b/bot/seasons/valentines/movie_generator.py @@ -14,11 +14,11 @@ log = logging.getLogger(__name__) class RomanceMovieFinder(commands.Cog): """A Cog that returns a random romance movie suggestion to a user.""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot @commands.command(name="romancemovie") - async def romance_movie(self, ctx): + async def romance_movie(self, ctx: commands.Context) -> None: """Randomly selects a romance movie and displays information about it.""" # Selecting a random int to parse it to the page parameter random_page = random.randint(0, 20) @@ -57,7 +57,7 @@ class RomanceMovieFinder(commands.Cog): await ctx.send(embed=embed) -def setup(bot): +def setup(bot: commands.Bot) -> None: """Romance movie Cog load.""" bot.add_cog(RomanceMovieFinder(bot)) log.info("RomanceMovieFinder cog loaded") diff --git a/bot/seasons/valentines/myvalenstate.py b/bot/seasons/valentines/myvalenstate.py index fad202e3..0256c39a 100644 --- a/bot/seasons/valentines/myvalenstate.py +++ b/bot/seasons/valentines/myvalenstate.py @@ -18,10 +18,10 @@ with open(Path("bot/resources/valentines/valenstates.json"), "r") as file: class MyValenstate(commands.Cog): """A Cog to find your most likely Valentine's vacation destination.""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot - def levenshtein(self, source, goal): + def levenshtein(self, source: str, goal: str) -> int: """Calculates the Levenshtein Distance between source and goal.""" if len(source) < len(goal): return self.levenshtein(goal, source) @@ -42,7 +42,7 @@ class MyValenstate(commands.Cog): return pre_row[-1] @commands.command() - async def myvalenstate(self, ctx, *, name=None): + async def myvalenstate(self, ctx: commands.Context, *, name: str = None) -> None: """Find the vacation spot(s) with the most matching characters to the invoking user.""" eq_chars = collections.defaultdict(int) if name is None: @@ -81,7 +81,7 @@ class MyValenstate(commands.Cog): await ctx.channel.send(embed=embed) -def setup(bot): +def setup(bot: commands.Bot) -> None: """Valenstate Cog load.""" bot.add_cog(MyValenstate(bot)) log.info("MyValenstate cog loaded") diff --git a/bot/seasons/valentines/pickuplines.py b/bot/seasons/valentines/pickuplines.py index 46772197..8b2c9822 100644 --- a/bot/seasons/valentines/pickuplines.py +++ b/bot/seasons/valentines/pickuplines.py @@ -17,11 +17,11 @@ with open(Path("bot/resources/valentines/pickup_lines.json"), "r", encoding="utf class PickupLine(commands.Cog): """A cog that gives random cheesy pickup lines.""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot @commands.command() - async def pickupline(self, ctx): + async def pickupline(self, ctx: commands.Context) -> None: """ Gives you a random pickup line. @@ -39,7 +39,7 @@ class PickupLine(commands.Cog): await ctx.send(embed=embed) -def setup(bot): +def setup(bot: commands.Bot) -> None: """Pickup lines Cog load.""" bot.add_cog(PickupLine(bot)) log.info('PickupLine cog loaded') diff --git a/bot/seasons/valentines/savethedate.py b/bot/seasons/valentines/savethedate.py index 34264183..e0bc3904 100644 --- a/bot/seasons/valentines/savethedate.py +++ b/bot/seasons/valentines/savethedate.py @@ -19,11 +19,11 @@ with open(Path("bot/resources/valentines/date_ideas.json"), "r", encoding="utf8" class SaveTheDate(commands.Cog): """A cog that gives random suggestion for a Valentine's date.""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot @commands.command() - async def savethedate(self, ctx): + async def savethedate(self, ctx: commands.Context) -> None: """Gives you ideas for what to do on a date with your valentine.""" random_date = random.choice(VALENTINES_DATES['ideas']) emoji_1 = random.choice(HEART_EMOJIS) @@ -36,7 +36,7 @@ class SaveTheDate(commands.Cog): await ctx.send(embed=embed) -def setup(bot): +def setup(bot: commands.Bot) -> None: """Save the date Cog Load.""" bot.add_cog(SaveTheDate(bot)) log.info("SaveTheDate cog loaded") diff --git a/bot/seasons/valentines/valentine_zodiac.py b/bot/seasons/valentines/valentine_zodiac.py index fa849cb2..c8d77e75 100644 --- a/bot/seasons/valentines/valentine_zodiac.py +++ b/bot/seasons/valentines/valentine_zodiac.py @@ -17,12 +17,12 @@ HEART_EMOJIS = [":heart:", ":gift_heart:", ":revolving_hearts:", ":sparkling_hea class ValentineZodiac(commands.Cog): """A Cog that returns a counter compatible zodiac sign to the given user's zodiac sign.""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot self.zodiacs = self.load_json() @staticmethod - def load_json(): + def load_json() -> dict: """Load zodiac compatibility from static JSON resource.""" p = Path("bot/resources/valentines/zodiac_compatibility.json") with p.open() as json_data: @@ -30,7 +30,7 @@ class ValentineZodiac(commands.Cog): return zodiacs @commands.command(name="partnerzodiac") - async def counter_zodiac(self, ctx, zodiac_sign): + async def counter_zodiac(self, ctx: commands.Context, zodiac_sign: str) -> None: """Provides a counter compatible zodiac sign to the given user's zodiac sign.""" try: compatible_zodiac = random.choice(self.zodiacs[zodiac_sign.lower()]) @@ -52,7 +52,7 @@ class ValentineZodiac(commands.Cog): await ctx.send(embed=embed) -def setup(bot): +def setup(bot: commands.Bot) -> None: """Valentine zodiac Cog load.""" bot.add_cog(ValentineZodiac(bot)) log.info("ValentineZodiac cog loaded") diff --git a/bot/seasons/valentines/whoisvalentine.py b/bot/seasons/valentines/whoisvalentine.py index d73ccd9b..b8586dca 100644 --- a/bot/seasons/valentines/whoisvalentine.py +++ b/bot/seasons/valentines/whoisvalentine.py @@ -17,11 +17,11 @@ with open(Path("bot/resources/valentines/valentine_facts.json"), "r") as file: class ValentineFacts(commands.Cog): """A Cog for displaying facts about Saint Valentine.""" - def __init__(self, bot): + def __init__(self, bot: commands.Bot): self.bot = bot @commands.command(aliases=('whoisvalentine', 'saint_valentine')) - async def who_is_valentine(self, ctx): + async def who_is_valentine(self, ctx: commands.Context) -> None: """Displays info about Saint Valentine.""" embed = discord.Embed( title="Who is Saint Valentine?", @@ -36,7 +36,7 @@ class ValentineFacts(commands.Cog): await ctx.channel.send(embed=embed) @commands.command() - async def valentine_fact(self, ctx): + async def valentine_fact(self, ctx: commands.Context) -> None: """Shows a random fact about Valentine's Day.""" embed = discord.Embed( title=choice(FACTS['titles']), @@ -47,7 +47,7 @@ class ValentineFacts(commands.Cog): await ctx.channel.send(embed=embed) -def setup(bot): +def setup(bot: commands.Bot) -> None: """Who is Valentine Cog load.""" bot.add_cog(ValentineFacts(bot)) log.info("ValentineFacts cog loaded") diff --git a/bot/utils/__init__.py b/bot/utils/__init__.py index 15c4b5db..0aa50af6 100644 --- a/bot/utils/__init__.py +++ b/bot/utils/__init__.py @@ -1,4 +1,6 @@ import asyncio +import re +import string from typing import List import discord @@ -27,7 +29,7 @@ async def disambiguate( choices = (f'{index}: {entry}' for index, entry in enumerate(entries, start=1)) - def check(message): + def check(message: discord.Message) -> bool: return (message.content.isdigit() and message.author == ctx.author and message.channel == ctx.channel) @@ -71,3 +73,57 @@ async def disambiguate( return entries[index - 1] except IndexError: raise BadArgument('Invalid choice.') + + +def replace_many( + sentence: str, replacements: dict, *, ignore_case: bool = False, match_case: bool = False +) -> str: + """ + Replaces multiple substrings in a string given a mapping of strings. + + By default replaces long strings before short strings, and lowercase before uppercase. + Example: + var = replace_many("This is a sentence", {"is": "was", "This": "That"}) + assert var == "That was a sentence" + + If `ignore_case` is given, does a case insensitive match. + Example: + var = replace_many("THIS is a sentence", {"IS": "was", "tHiS": "That"}, ignore_case=True) + assert var == "That was a sentence" + + If `match_case` is given, matches the case of the replacement with the replaced word. + Example: + var = replace_many( + "This IS a sentence", {"is": "was", "this": "that"}, ignore_case=True, match_case=True + ) + assert var == "That WAS a sentence" + """ + if ignore_case: + replacements = dict( + (word.lower(), replacement) for word, replacement in replacements.items() + ) + + words_to_replace = sorted(replacements, key=lambda s: (-len(s), s)) + + # Join and compile words to replace into a regex + pattern = "|".join(re.escape(word) for word in words_to_replace) + regex = re.compile(pattern, re.I if ignore_case else 0) + + def _repl(match: re.Match) -> str: + """Returns replacement depending on `ignore_case` and `match_case`.""" + word = match.group(0) + replacement = replacements[word.lower() if ignore_case else word] + + if not match_case: + return replacement + + # Clean punctuation from word so string methods work + cleaned_word = word.translate(str.maketrans('', '', string.punctuation)) + if cleaned_word.isupper(): + return replacement.upper() + elif cleaned_word[0].isupper(): + return replacement.capitalize() + else: + return replacement.lower() + + return regex.sub(_repl, sentence) diff --git a/bot/utils/halloween/spookifications.py b/bot/utils/halloween/spookifications.py index 69b49919..11f69850 100644 --- a/bot/utils/halloween/spookifications.py +++ b/bot/utils/halloween/spookifications.py @@ -7,7 +7,7 @@ from PIL import ImageOps log = logging.getLogger() -def inversion(im): +def inversion(im: Image) -> Image: """ Inverts the image. @@ -18,7 +18,7 @@ def inversion(im): return inv -def pentagram(im): +def pentagram(im: Image) -> Image: """Adds pentagram to the image.""" im = im.convert('RGB') wt, ht = im.size @@ -28,7 +28,7 @@ def pentagram(im): return im -def bat(im): +def bat(im: Image) -> Image: """ Adds a bat silhoutte to the image. @@ -50,7 +50,7 @@ def bat(im): return im -def get_random_effect(im): +def get_random_effect(im: Image) -> Image: """Randomly selects and applies an effect.""" effects = [inversion, pentagram, bat] effect = choice(effects) diff --git a/bot/utils/persist.py b/bot/utils/persist.py new file mode 100644 index 00000000..a60a1219 --- /dev/null +++ b/bot/utils/persist.py @@ -0,0 +1,66 @@ +import sqlite3 +from pathlib import Path +from shutil import copyfile + +from bot.seasons.season import get_seasons + +DIRECTORY = Path("data") # directory that has a persistent volume mapped to it + + +def make_persistent(file_path: Path) -> Path: + """ + Copy datafile at the provided file_path to the persistent data directory. + + A persistent data file is needed by some features in order to not lose data + after bot rebuilds. + + This function will ensure that a clean data file with default schema, + structure or data is copied over to the persistent volume before returning + the path to this new persistent version of the file. + + If the persistent file already exists, it won't be overwritten with the + clean default file, just returning the Path instead to the existing file. + + Note: Avoid using the same file name as other features in the same seasons + as otherwise only one datafile can be persistent and will be returned for + both cases. + + Example Usage: + >>> import json + >>> template_datafile = Path("bot", "resources", "evergreen", "myfile.json") + >>> path_to_persistent_file = make_persistent(template_datafile) + >>> print(path_to_persistent_file) + data/evergreen/myfile.json + >>> with path_to_persistent_file.open("w+") as f: + >>> data = json.load(f) + """ + # ensure the persistent data directory exists + DIRECTORY.mkdir(exist_ok=True) + + if not file_path.is_file(): + raise OSError(f"File not found at {file_path}.") + + # detect season in datafile path for assigning to subdirectory + season = next((s for s in get_seasons() if s in file_path.parts), None) + + if season: + # make sure subdirectory exists first + subdirectory = Path(DIRECTORY, season) + subdirectory.mkdir(exist_ok=True) + + persistent_path = Path(subdirectory, file_path.name) + + else: + persistent_path = Path(DIRECTORY, file_path.name) + + # copy base/template datafile to persistent directory + if not persistent_path.exists(): + copyfile(file_path, persistent_path) + + return persistent_path + + +def sqlite(db_path: Path) -> sqlite3.Connection: + """Copy sqlite file to the persistent data directory and return an open connection.""" + persistent_path = make_persistent(db_path) + return sqlite3.connect(persistent_path) |