diff options
| author | 2020-12-29 01:15:03 -0500 | |
|---|---|---|
| committer | 2020-12-29 01:15:03 -0500 | |
| commit | 52d626131527dfae3c100aab9a530bae928b0fff (patch) | |
| tree | 8b09d5e4579924d8f1c7b9917cb2f04fdd54c29c /bot/exts | |
| parent | Capitalize "ID" in error message (diff) | |
| parent | Merge pull request #484 from WillDaSilva/omdb-to-tmdb (diff) | |
Merge branch 'master' into startup-channel-check
Diffstat (limited to 'bot/exts')
35 files changed, 1644 insertions, 1808 deletions
diff --git a/bot/exts/christmas/advent_of_code/__init__.py b/bot/exts/christmas/advent_of_code/__init__.py new file mode 100644 index 00000000..3c521168 --- /dev/null +++ b/bot/exts/christmas/advent_of_code/__init__.py @@ -0,0 +1,10 @@ +from bot.bot import Bot + + +def setup(bot: Bot) -> None: + """Set up the Advent of Code extension.""" + # Import the Cog at runtime to prevent side effects like defining + # RedisCache instances too early. + from ._cog import AdventOfCode + + bot.add_cog(AdventOfCode(bot)) diff --git a/bot/exts/christmas/advent_of_code/_caches.py b/bot/exts/christmas/advent_of_code/_caches.py new file mode 100644 index 00000000..32d5394f --- /dev/null +++ b/bot/exts/christmas/advent_of_code/_caches.py @@ -0,0 +1,5 @@ +import async_rediscache + +leaderboard_counts = async_rediscache.RedisCache(namespace="AOC_leaderboard_counts") +leaderboard_cache = async_rediscache.RedisCache(namespace="AOC_leaderboard_cache") +assigned_leaderboard = async_rediscache.RedisCache(namespace="AOC_assigned_leaderboard") diff --git a/bot/exts/christmas/advent_of_code/_cog.py b/bot/exts/christmas/advent_of_code/_cog.py new file mode 100644 index 00000000..c3b87f96 --- /dev/null +++ b/bot/exts/christmas/advent_of_code/_cog.py @@ -0,0 +1,296 @@ +import json +import logging +from datetime import datetime, timedelta +from pathlib import Path + +import discord +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import ( + AdventOfCode as AocConfig, Channels, Colours, Emojis, Month, Roles, WHITELISTED_CHANNELS, +) +from bot.exts.christmas.advent_of_code import _helpers +from bot.utils.decorators import InChannelCheckFailure, in_month, override_in_channel, with_role + +log = logging.getLogger(__name__) + +AOC_REQUEST_HEADER = {"user-agent": "PythonDiscord AoC Event Bot"} + +AOC_WHITELIST_RESTRICTED = WHITELISTED_CHANNELS + (Channels.advent_of_code_commands,) + +# Some commands can be run in the regular advent of code channel +# They aren't spammy and foster discussion +AOC_WHITELIST = AOC_WHITELIST_RESTRICTED + (Channels.advent_of_code,) + + +class AdventOfCode(commands.Cog): + """Advent of Code festivities! Ho Ho Ho!""" + + def __init__(self, bot: Bot) -> None: + self.bot = bot + + self._base_url = f"https://adventofcode.com/{AocConfig.year}" + self.global_leaderboard_url = f"https://adventofcode.com/{AocConfig.year}/leaderboard" + + self.about_aoc_filepath = Path("./bot/resources/advent_of_code/about.json") + self.cached_about_aoc = self._build_about_embed() + + self.countdown_task = None + self.status_task = None + + notification_coro = _helpers.new_puzzle_notification(self.bot) + self.notification_task = self.bot.loop.create_task(notification_coro) + self.notification_task.set_name("Daily AoC Notification") + self.notification_task.add_done_callback(_helpers.background_task_callback) + + status_coro = _helpers.countdown_status(self.bot) + self.status_task = self.bot.loop.create_task(status_coro) + self.status_task.set_name("AoC Status Countdown") + self.status_task.add_done_callback(_helpers.background_task_callback) + + @commands.group(name="adventofcode", aliases=("aoc",)) + @override_in_channel(AOC_WHITELIST) + async def adventofcode_group(self, ctx: commands.Context) -> None: + """All of the Advent of Code commands.""" + if not ctx.invoked_subcommand: + await ctx.send_help(ctx.command) + + @adventofcode_group.command( + name="subscribe", + aliases=("sub", "notifications", "notify", "notifs"), + brief="Notifications for new days" + ) + @override_in_channel(AOC_WHITELIST) + async def aoc_subscribe(self, ctx: commands.Context) -> None: + """Assign the role for notifications about new days being ready.""" + current_year = datetime.now().year + if current_year != AocConfig.year: + await ctx.send(f"You can't subscribe to {current_year}'s Advent of Code announcements yet!") + return + + role = ctx.guild.get_role(AocConfig.role_id) + unsubscribe_command = f"{ctx.prefix}{ctx.command.root_parent} unsubscribe" + + if role not in ctx.author.roles: + await ctx.author.add_roles(role) + await ctx.send("Okay! You have been __subscribed__ to notifications about new Advent of Code tasks. " + f"You can run `{unsubscribe_command}` to disable them again for you.") + else: + await ctx.send("Hey, you already are receiving notifications about new Advent of Code tasks. " + f"If you don't want them any more, run `{unsubscribe_command}` instead.") + + @in_month(Month.DECEMBER) + @adventofcode_group.command(name="unsubscribe", aliases=("unsub",), brief="Notifications for new days") + @override_in_channel(AOC_WHITELIST) + async def aoc_unsubscribe(self, ctx: commands.Context) -> None: + """Remove the role for notifications about new days being ready.""" + role = ctx.guild.get_role(AocConfig.role_id) + + if role in ctx.author.roles: + await ctx.author.remove_roles(role) + await ctx.send("Okay! You have been __unsubscribed__ from notifications about new Advent of Code tasks.") + else: + await ctx.send("Hey, you don't even get any notifications about new Advent of Code tasks currently anyway.") + + @adventofcode_group.command(name="countdown", aliases=("count", "c"), brief="Return time left until next day") + @override_in_channel(AOC_WHITELIST) + async def aoc_countdown(self, ctx: commands.Context) -> None: + """Return time left until next day.""" + if not _helpers.is_in_advent(): + datetime_now = datetime.now(_helpers.EST) + + # Calculate the delta to this & next year's December 1st to see which one is closest and not in the past + this_year = datetime(datetime_now.year, 12, 1, tzinfo=_helpers.EST) + next_year = datetime(datetime_now.year + 1, 12, 1, tzinfo=_helpers.EST) + deltas = (dec_first - datetime_now for dec_first in (this_year, next_year)) + delta = min(delta for delta in deltas if delta >= timedelta()) # timedelta() gives 0 duration delta + + # Add a finer timedelta if there's less than a day left + if delta.days == 0: + delta_str = f"approximately {delta.seconds // 3600} hours" + else: + delta_str = f"{delta.days} days" + + await ctx.send(f"The Advent of Code event is not currently running. " + f"The next event will start in {delta_str}.") + return + + tomorrow, time_left = _helpers.time_left_to_est_midnight() + + hours, minutes = time_left.seconds // 3600, time_left.seconds // 60 % 60 + + await ctx.send(f"There are {hours} hours and {minutes} minutes left until day {tomorrow.day}.") + + @adventofcode_group.command(name="about", aliases=("ab", "info"), brief="Learn about Advent of Code") + @override_in_channel(AOC_WHITELIST) + async def about_aoc(self, ctx: commands.Context) -> None: + """Respond with an explanation of all things Advent of Code.""" + await ctx.send("", embed=self.cached_about_aoc) + + @adventofcode_group.command(name="join", aliases=("j",), brief="Learn how to join the leaderboard (via DM)") + @override_in_channel(AOC_WHITELIST) + async def join_leaderboard(self, ctx: commands.Context) -> None: + """DM the user the information for joining the Python Discord leaderboard.""" + current_year = datetime.now().year + if current_year != AocConfig.year: + await ctx.send(f"The Python Discord leaderboard for {current_year} is not yet available!") + return + + author = ctx.message.author + log.info(f"{author.name} ({author.id}) has requested a PyDis AoC leaderboard code") + + if AocConfig.staff_leaderboard_id and any(r.id == Roles.helpers for r in author.roles): + join_code = AocConfig.leaderboards[AocConfig.staff_leaderboard_id].join_code + else: + try: + join_code = await _helpers.get_public_join_code(author) + except _helpers.FetchingLeaderboardFailed: + await ctx.send(":x: Failed to get join code! Notified maintainers.") + return + + if not join_code: + log.error(f"Failed to get a join code for user {author} ({author.id})") + error_embed = discord.Embed( + title="Unable to get join code", + description="Failed to get a join code to one of our boards. Please notify staff.", + colour=discord.Colour.red(), + ) + await ctx.send(embed=error_embed) + return + + info_str = [ + "To join our leaderboard, follow these steps:", + "• Log in on https://adventofcode.com", + "• Head over to https://adventofcode.com/leaderboard/private", + f"• Use this code `{join_code}` to join the Python Discord leaderboard!", + ] + try: + await author.send("\n".join(info_str)) + except discord.errors.Forbidden: + log.debug(f"{author.name} ({author.id}) has disabled DMs from server members") + await ctx.send(f":x: {author.mention}, please (temporarily) enable DMs to receive the join code") + else: + await ctx.message.add_reaction(Emojis.envelope) + + @adventofcode_group.command( + name="leaderboard", + aliases=("board", "lb"), + brief="Get a snapshot of the PyDis private AoC leaderboard", + ) + @override_in_channel(AOC_WHITELIST_RESTRICTED) + async def aoc_leaderboard(self, ctx: commands.Context) -> None: + """Get the current top scorers of the Python Discord Leaderboard.""" + async with ctx.typing(): + try: + leaderboard = await _helpers.fetch_leaderboard() + except _helpers.FetchingLeaderboardFailed: + await ctx.send(":x: Unable to fetch leaderboard!") + return + + number_of_participants = leaderboard["number_of_participants"] + + top_count = min(AocConfig.leaderboard_displayed_members, number_of_participants) + header = f"Here's our current top {top_count}! {Emojis.christmas_tree * 3}" + + table = f"```\n{leaderboard['top_leaderboard']}\n```" + info_embed = _helpers.get_summary_embed(leaderboard) + + await ctx.send(content=f"{header}\n\n{table}", embed=info_embed) + + @adventofcode_group.command( + name="global", + aliases=("globalboard", "gb"), + brief="Get a link to the global leaderboard", + ) + @override_in_channel(AOC_WHITELIST_RESTRICTED) + async def aoc_global_leaderboard(self, ctx: commands.Context) -> None: + """Get a link to the global Advent of Code leaderboard.""" + url = self.global_leaderboard_url + global_leaderboard = discord.Embed( + title="Advent of Code — Global Leaderboard", + description=f"You can find the global leaderboard [here]({url})." + ) + global_leaderboard.set_thumbnail(url=_helpers.AOC_EMBED_THUMBNAIL) + await ctx.send(embed=global_leaderboard) + + @adventofcode_group.command( + name="stats", + aliases=("dailystats", "ds"), + brief="Get daily statistics for the Python Discord leaderboard" + ) + @override_in_channel(AOC_WHITELIST_RESTRICTED) + async def private_leaderboard_daily_stats(self, ctx: commands.Context) -> None: + """Send an embed with daily completion statistics for the Python Discord leaderboard.""" + try: + leaderboard = await _helpers.fetch_leaderboard() + except _helpers.FetchingLeaderboardFailed: + await ctx.send(":x: Can't fetch leaderboard for stats right now!") + return + + # The daily stats are serialized as JSON as they have to be cached in Redis + daily_stats = json.loads(leaderboard["daily_stats"]) + async with ctx.typing(): + lines = ["Day ⭐ ⭐⭐ | %⭐ %⭐⭐\n================================"] + for day, stars in daily_stats.items(): + star_one = stars["star_one"] + star_two = stars["star_two"] + p_star_one = star_one / leaderboard["number_of_participants"] + p_star_two = star_two / leaderboard["number_of_participants"] + lines.append( + f"{day:>2}) {star_one:>4} {star_two:>4} | {p_star_one:>7.2%} {p_star_two:>7.2%}" + ) + table = "\n".join(lines) + info_embed = _helpers.get_summary_embed(leaderboard) + await ctx.send(f"```\n{table}\n```", embed=info_embed) + + @with_role(Roles.admin, Roles.events_lead) + @adventofcode_group.command( + name="refresh", + aliases=("fetch",), + brief="Force a refresh of the leaderboard cache.", + ) + async def refresh_leaderboard(self, ctx: commands.Context) -> None: + """ + Force a refresh of the leaderboard cache. + + Note: This should be used sparingly, as we want to prevent sending too + many requests to the Advent of Code server. + """ + async with ctx.typing(): + try: + await _helpers.fetch_leaderboard(invalidate_cache=True) + except _helpers.FetchingLeaderboardFailed: + await ctx.send(":x: Something went wrong while trying to refresh the cache!") + else: + await ctx.send("\N{OK Hand Sign} Refreshed leaderboard cache!") + + def cog_unload(self) -> None: + """Cancel season-related tasks on cog unload.""" + log.debug("Unloading the cog and canceling the background task.") + self.countdown_task.cancel() + self.status_task.cancel() + + def _build_about_embed(self) -> discord.Embed: + """Build and return the informational "About AoC" embed from the resources file.""" + with self.about_aoc_filepath.open("r", encoding="utf8") as f: + embed_fields = json.load(f) + + about_embed = discord.Embed( + title=self._base_url, + colour=Colours.soft_green, + url=self._base_url, + timestamp=datetime.utcnow() + ) + about_embed.set_author(name="Advent of Code", url=self._base_url) + for field in embed_fields: + about_embed.add_field(**field) + + about_embed.set_footer(text="Last Updated") + return about_embed + + async def cog_command_error(self, ctx: commands.Context, error: Exception) -> None: + """Custom error handler if an advent of code command was posted in the wrong channel.""" + if isinstance(error, InChannelCheckFailure): + await ctx.send(f":x: Please use <#{Channels.advent_of_code_commands}> for aoc commands instead.") + error.handled = True diff --git a/bot/exts/christmas/advent_of_code/_helpers.py b/bot/exts/christmas/advent_of_code/_helpers.py new file mode 100644 index 00000000..b7adc895 --- /dev/null +++ b/bot/exts/christmas/advent_of_code/_helpers.py @@ -0,0 +1,592 @@ +import asyncio +import collections +import datetime +import json +import logging +import math +import operator +import typing +from typing import Tuple + +import aiohttp +import discord +import pytz + +from bot.bot import Bot +from bot.constants import AdventOfCode, Channels, Colours +from bot.exts.christmas.advent_of_code import _caches + +log = logging.getLogger(__name__) + +PASTE_URL = "https://paste.pythondiscord.com/documents" +RAW_PASTE_URL_TEMPLATE = "https://paste.pythondiscord.com/raw/{key}" + +# Base API URL for Advent of Code Private Leaderboards +AOC_API_URL = "https://adventofcode.com/{year}/leaderboard/private/view/{leaderboard_id}.json" +AOC_REQUEST_HEADER = {"user-agent": "PythonDiscord AoC Event Bot"} + +# Leaderboard Line Template +AOC_TABLE_TEMPLATE = "{rank: >4} | {name:25.25} | {score: >5} | {stars}" +HEADER = AOC_TABLE_TEMPLATE.format(rank="", name="Name", score="Score", stars="⭐, ⭐⭐") +HEADER = f"{HEADER}\n{'-' * (len(HEADER) + 2)}" +HEADER_LINES = len(HEADER.splitlines()) +TOP_LEADERBOARD_LINES = HEADER_LINES + AdventOfCode.leaderboard_displayed_members + +# Keys that need to be set for a cached leaderboard +REQUIRED_CACHE_KEYS = ( + "full_leaderboard", + "top_leaderboard", + "full_leaderboard_url", + "leaderboard_fetched_at", + "number_of_participants", + "daily_stats", +) + +AOC_EMBED_THUMBNAIL = ( + "https://raw.githubusercontent.com/python-discord" + "/branding/master/seasonal/christmas/server_icons/festive_256.gif" +) + +# Create an easy constant for the EST timezone +EST = pytz.timezone("EST") + +# Step size for the challenge countdown status +COUNTDOWN_STEP = 60 * 5 + +# Create namedtuple that combines a participant's name and their completion +# time for a specific star. We're going to use this later to order the results +# for each star to compute the rank score. +StarResult = collections.namedtuple("StarResult", "member_id completion_time") + + +class UnexpectedRedirect(aiohttp.ClientError): + """Raised when an unexpected redirect was detected.""" + + +class UnexpectedResponseStatus(aiohttp.ClientError): + """Raised when an unexpected redirect was detected.""" + + +class FetchingLeaderboardFailed(Exception): + """Raised when one or more leaderboards could not be fetched at all.""" + + +def leaderboard_sorting_function(entry: typing.Tuple[str, dict]) -> typing.Tuple[int, int]: + """ + Provide a sorting value for our leaderboard. + + The leaderboard is sorted primarily on the score someone has received and + secondary on the number of stars someone has completed. + """ + result = entry[1] + return result["score"], result["star_2"] + result["star_1"] + + +def _parse_raw_leaderboard_data(raw_leaderboard_data: dict) -> dict: + """ + Parse the leaderboard data received from the AoC website. + + The data we receive from AoC is structured by member, not by day/star. This + means that we need to "transpose" the data to a per star structure in order + to calculate the rank scores each individual should get. + + As we need our data both "per participant" as well as "per day", we return + the parsed and analyzed data in both formats. + """ + # We need to get an aggregate of completion times for each star of each day, + # instead of per participant to compute the rank scores. This dictionary will + # provide such a transposed dataset. + star_results = collections.defaultdict(list) + + # As we're already iterating over the participants, we can record the number of + # first stars and second stars they've achieved right here and now. This means + # we won't have to iterate over the participants again later. + leaderboard = {} + + # The data we get from the AoC website is structured by member, not by day/star, + # which means we need to iterate over the members to transpose the data to a per + # star view. We need that per star view to compute rank scores per star. + for member in raw_leaderboard_data.values(): + name = member["name"] if member["name"] else f"Anonymous #{member['id']}" + member_id = member['id'] + leaderboard[member_id] = {"name": name, "score": 0, "star_1": 0, "star_2": 0} + + # Iterate over all days for this participant + for day, stars in member["completion_day_level"].items(): + # Iterate over the complete stars for this day for this participant + for star, data in stars.items(): + # Record completion of this star for this individual + leaderboard[member_id][f"star_{star}"] += 1 + + # Record completion datetime for this participant for this day/star + completion_time = datetime.datetime.fromtimestamp(int(data['get_star_ts'])) + star_results[(day, star)].append( + StarResult(member_id=member_id, completion_time=completion_time) + ) + + # Now that we have a transposed dataset that holds the completion time of all + # participants per star, we can compute the rank-based scores each participant + # should get for that star. + max_score = len(leaderboard) + for (day, _star), results in star_results.items(): + # If this day should not count in the ranking, skip it. + if day in AdventOfCode.ignored_days: + continue + + sorted_result = sorted(results, key=operator.attrgetter('completion_time')) + for rank, star_result in enumerate(sorted_result): + leaderboard[star_result.member_id]["score"] += max_score - rank + + # Since dictionaries now retain insertion order, let's use that + sorted_leaderboard = dict( + sorted(leaderboard.items(), key=leaderboard_sorting_function, reverse=True) + ) + + # Create summary stats for the stars completed for each day of the event. + daily_stats = {} + for day in range(1, 26): + day = str(day) + star_one = len(star_results.get((day, "1"), [])) + star_two = len(star_results.get((day, "2"), [])) + # By using a dictionary instead of namedtuple here, we can serialize + # this data to JSON in order to cache it in Redis. + daily_stats[day] = {"star_one": star_one, "star_two": star_two} + + return {"daily_stats": daily_stats, "leaderboard": sorted_leaderboard} + + +def _format_leaderboard(leaderboard: typing.Dict[str, dict]) -> str: + """Format the leaderboard using the AOC_TABLE_TEMPLATE.""" + leaderboard_lines = [HEADER] + for rank, data in enumerate(leaderboard.values(), start=1): + leaderboard_lines.append( + AOC_TABLE_TEMPLATE.format( + rank=rank, + name=data["name"], + score=str(data["score"]), + stars=f"({data['star_1']}, {data['star_2']})" + ) + ) + + return "\n".join(leaderboard_lines) + + +async def _leaderboard_request(url: str, board: int, cookies: dict) -> typing.Optional[dict]: + """Make a leaderboard request using the specified session cookie.""" + async with aiohttp.request("GET", url, headers=AOC_REQUEST_HEADER, cookies=cookies) as resp: + # The Advent of Code website redirects silently with a 200 response if a + # session cookie has expired, is invalid, or was not provided. + if str(resp.url) != url: + log.error(f"Fetching leaderboard `{board}` failed! Check the session cookie.") + raise UnexpectedRedirect(f"redirected unexpectedly to {resp.url} for board `{board}`") + + # Every status other than `200` is unexpected, not only 400+ + if not resp.status == 200: + log.error(f"Unexpected response `{resp.status}` while fetching leaderboard `{board}`") + raise UnexpectedResponseStatus(f"status `{resp.status}`") + + return await resp.json() + + +async def _fetch_leaderboard_data() -> typing.Dict[str, typing.Any]: + """Fetch data for all leaderboards and return a pooled result.""" + year = AdventOfCode.year + + # We'll make our requests one at a time to not flood the AoC website with + # up to six simultaneous requests. This may take a little longer, but it + # does avoid putting unnecessary stress on the Advent of Code website. + + # Container to store the raw data of each leaderboard + participants = {} + for leaderboard in AdventOfCode.leaderboards.values(): + leaderboard_url = AOC_API_URL.format(year=year, leaderboard_id=leaderboard.id) + + # Two attempts, one with the original session cookie and one with the fallback session + for attempt in range(1, 3): + log.info(f"Attempting to fetch leaderboard `{leaderboard.id}` ({attempt}/2)") + cookies = {"session": leaderboard.session} + try: + raw_data = await _leaderboard_request(leaderboard_url, leaderboard.id, cookies) + except UnexpectedRedirect: + if cookies["session"] == AdventOfCode.fallback_session: + log.error("It seems like the fallback cookie has expired!") + raise FetchingLeaderboardFailed from None + + # If we're here, it means that the original session did not + # work. Let's fall back to the fallback session. + leaderboard.use_fallback_session = True + continue + except aiohttp.ClientError: + # Don't retry, something unexpected is wrong and it may not be the session. + raise FetchingLeaderboardFailed from None + else: + # Get the participants and store their current count. + board_participants = raw_data["members"] + await _caches.leaderboard_counts.set(leaderboard.id, len(board_participants)) + participants.update(board_participants) + break + else: + log.error(f"reached 'unreachable' state while fetching board `{leaderboard.id}`.") + raise FetchingLeaderboardFailed + + log.info(f"Fetched leaderboard information for {len(participants)} participants") + return participants + + +async def _upload_leaderboard(leaderboard: str) -> str: + """Upload the full leaderboard to our paste service and return the URL.""" + async with aiohttp.request("POST", PASTE_URL, data=leaderboard) as resp: + try: + resp_json = await resp.json() + except Exception: + log.exception("Failed to upload full leaderboard to paste service") + return "" + + if "key" in resp_json: + return RAW_PASTE_URL_TEMPLATE.format(key=resp_json["key"]) + + log.error(f"Unexpected response from paste service while uploading leaderboard {resp_json}") + return "" + + +def _get_top_leaderboard(full_leaderboard: str) -> str: + """Get the leaderboard up to the maximum specified entries.""" + return "\n".join(full_leaderboard.splitlines()[:TOP_LEADERBOARD_LINES]) + + +@_caches.leaderboard_cache.atomic_transaction +async def fetch_leaderboard(invalidate_cache: bool = False) -> dict: + """ + Get the current Python Discord combined leaderboard. + + The leaderboard is cached and only fetched from the API if the current data + is older than the lifetime set in the constants. To prevent multiple calls + to this function fetching new leaderboard information in case of a cache + miss, this function is locked to one call at a time using a decorator. + """ + cached_leaderboard = await _caches.leaderboard_cache.to_dict() + + # Check if the cached leaderboard contains everything we expect it to. If it + # does not, this probably means the cache has not been created yet or has + # expired in Redis. This check also accounts for a malformed cache. + if invalidate_cache or any(key not in cached_leaderboard for key in REQUIRED_CACHE_KEYS): + log.info("No leaderboard cache available, fetching leaderboards...") + # Fetch the raw data + raw_leaderboard_data = await _fetch_leaderboard_data() + + # Parse it to extract "per star, per day" data and participant scores + parsed_leaderboard_data = _parse_raw_leaderboard_data(raw_leaderboard_data) + + leaderboard = parsed_leaderboard_data["leaderboard"] + number_of_participants = len(leaderboard) + formatted_leaderboard = _format_leaderboard(leaderboard) + full_leaderboard_url = await _upload_leaderboard(formatted_leaderboard) + leaderboard_fetched_at = datetime.datetime.utcnow().isoformat() + + cached_leaderboard = { + "full_leaderboard": formatted_leaderboard, + "top_leaderboard": _get_top_leaderboard(formatted_leaderboard), + "full_leaderboard_url": full_leaderboard_url, + "leaderboard_fetched_at": leaderboard_fetched_at, + "number_of_participants": number_of_participants, + "daily_stats": json.dumps(parsed_leaderboard_data["daily_stats"]), + } + + # Store the new values in Redis + await _caches.leaderboard_cache.update(cached_leaderboard) + + # Set an expiry on the leaderboard RedisCache + with await _caches.leaderboard_cache._get_pool_connection() as connection: + await connection.expire( + _caches.leaderboard_cache.namespace, + AdventOfCode.leaderboard_cache_expiry_seconds + ) + + return cached_leaderboard + + +def get_summary_embed(leaderboard: dict) -> discord.Embed: + """Get an embed with the current summary stats of the leaderboard.""" + leaderboard_url = leaderboard['full_leaderboard_url'] + refresh_minutes = AdventOfCode.leaderboard_cache_expiry_seconds // 60 + + aoc_embed = discord.Embed( + colour=Colours.soft_green, + timestamp=datetime.datetime.fromisoformat(leaderboard["leaderboard_fetched_at"]), + description=f"*The leaderboard is refreshed every {refresh_minutes} minutes.*" + ) + aoc_embed.add_field( + name="Number of Participants", + value=leaderboard["number_of_participants"], + inline=True, + ) + if leaderboard_url: + aoc_embed.add_field( + name="Full Leaderboard", + value=f"[Python Discord Leaderboard]({leaderboard_url})", + inline=True, + ) + aoc_embed.set_author(name="Advent of Code", url=leaderboard_url) + aoc_embed.set_footer(text="Last Updated") + aoc_embed.set_thumbnail(url=AOC_EMBED_THUMBNAIL) + + return aoc_embed + + +async def get_public_join_code(author: discord.Member) -> typing.Optional[str]: + """ + Get the join code for one of the non-staff leaderboards. + + If a user has previously requested a join code and their assigned board + hasn't filled up yet, we'll return the same join code to prevent them from + getting join codes for multiple boards. + """ + # Make sure to fetch new leaderboard information if the cache is older than + # 30 minutes. While this still means that there could be a discrepancy + # between the current leaderboard state and the numbers we have here, this + # should work fairly well given the buffer of slots that we have. + await fetch_leaderboard() + previously_assigned_board = await _caches.assigned_leaderboard.get(author.id) + current_board_counts = await _caches.leaderboard_counts.to_dict() + + # Remove the staff board from the current board counts as it should be ignored. + current_board_counts.pop(AdventOfCode.staff_leaderboard_id, None) + + # If this user has already received a join code, we'll give them the + # exact same one to prevent them from joining multiple boards and taking + # up multiple slots. + if previously_assigned_board: + # Check if their previously assigned board still has room for them + if current_board_counts.get(previously_assigned_board, 0) < 200: + log.info(f"{author} ({author.id}) was already assigned to a board with open slots.") + return AdventOfCode.leaderboards[previously_assigned_board].join_code + + log.info( + f"User {author} ({author.id}) previously received the join code for " + f"board `{previously_assigned_board}`, but that board's now full. " + "Assigning another board to this user." + ) + + # If we don't have the current board counts cached, let's force fetching a new cache + if not current_board_counts: + log.warning("Leaderboard counts were missing from the cache unexpectedly!") + await fetch_leaderboard(invalidate_cache=True) + current_board_counts = await _caches.leaderboard_counts.to_dict() + + # Find the board with the current lowest participant count. As we can't + best_board, _count = min(current_board_counts.items(), key=operator.itemgetter(1)) + + if current_board_counts.get(best_board, 0) >= 200: + log.warning(f"User {author} `{author.id}` requested a join code, but all boards are full!") + return + + log.info(f"Assigning user {author} ({author.id}) to board `{best_board}`") + await _caches.assigned_leaderboard.set(author.id, best_board) + + # Return the join code for this board + return AdventOfCode.leaderboards[best_board].join_code + + +def is_in_advent() -> bool: + """ + Check if we're currently on an Advent of Code day, excluding 25 December. + + This helper function is used to check whether or not a feature that prepares + something for the next Advent of Code challenge should run. As the puzzle + published on the 25th is the last puzzle, this check excludes that date. + """ + return datetime.datetime.now(EST).day in range(1, 25) and datetime.datetime.now(EST).month == 12 + + +def time_left_to_est_midnight() -> Tuple[datetime.datetime, datetime.timedelta]: + """Calculate the amount of time left until midnight EST/UTC-5.""" + # Change all time properties back to 00:00 + todays_midnight = datetime.datetime.now(EST).replace( + microsecond=0, + second=0, + minute=0, + hour=0 + ) + + # We want tomorrow so add a day on + tomorrow = todays_midnight + datetime.timedelta(days=1) + + # Calculate the timedelta between the current time and midnight + return tomorrow, tomorrow - datetime.datetime.now(EST) + + +async def wait_for_advent_of_code(*, hours_before: int = 1) -> None: + """ + Wait for the Advent of Code event to start. + + This function returns `hours_before` (default: 1) the Advent of Code + actually starts. This allows functions to schedule and execute code that + needs to run before the event starts. + + If the event has already started, this function returns immediately. + + Note: The "next Advent of Code" is determined based on the current value + of the `AOC_YEAR` environment variable. This allows callers to exit early + if we're already past the Advent of Code edition the bot is currently + configured for. + """ + start = datetime.datetime(AdventOfCode.year, 12, 1, 0, 0, 0, tzinfo=EST) + target = start - datetime.timedelta(hours=hours_before) + now = datetime.datetime.now(EST) + + # If we've already reached or passed to target, we + # simply return immediately. + if now >= target: + return + + delta = target - now + await asyncio.sleep(delta.total_seconds()) + + +async def countdown_status(bot: Bot) -> None: + """ + Add the time until the next challenge is published to the bot's status. + + This function sleeps until 2 hours before the event and exists one hour + after the last challenge has been published. It will not start up again + automatically for next year's event, as it will wait for the environment + variable AOC_YEAR to be updated. + + This ensures that the task will only start sleeping again once the next + event approaches and we're making preparations for that event. + """ + log.debug("Initializing status countdown task.") + # We wait until 2 hours before the event starts. Then we + # set our first countdown status. + await wait_for_advent_of_code(hours_before=2) + + # Log that we're going to start with the countdown status. + log.info("The Advent of Code has started or will start soon, starting countdown status.") + + # Trying to change status too early in the bot's startup sequence will fail + # the task because the websocket instance has not yet been created. Waiting + # for this event means that both the websocket instance has been initialized + # and that the connection to Discord is mature enough to change the presence + # of the bot. + await bot.wait_until_guild_available() + + # Calculate when the task needs to stop running. To prevent the task from + # sleeping for the entire year, it will only wait in the currently + # configured year. This means that the task will only start hibernating once + # we start preparing the next event by changing environment variables. + last_challenge = datetime.datetime(AdventOfCode.year, 12, 25, 0, 0, 0, tzinfo=EST) + end = last_challenge + datetime.timedelta(hours=1) + + while datetime.datetime.now(EST) < end: + _, time_left = time_left_to_est_midnight() + + aligned_seconds = int(math.ceil(time_left.seconds / COUNTDOWN_STEP)) * COUNTDOWN_STEP + hours, minutes = aligned_seconds // 3600, aligned_seconds // 60 % 60 + + if aligned_seconds == 0: + playing = "right now!" + elif aligned_seconds == COUNTDOWN_STEP: + playing = f"in less than {minutes} minutes" + elif hours == 0: + playing = f"in {minutes} minutes" + elif hours == 23: + playing = f"since {60 - minutes} minutes ago" + else: + playing = f"in {hours} hours and {minutes} minutes" + + log.trace(f"Changing presence to {playing!r}") + # Status will look like "Playing in 5 hours and 30 minutes" + await bot.change_presence(activity=discord.Game(playing)) + + # Sleep until next aligned time or a full step if already aligned + delay = time_left.seconds % COUNTDOWN_STEP or COUNTDOWN_STEP + log.trace(f"The countdown status task will sleep for {delay} seconds.") + await asyncio.sleep(delay) + + +async def new_puzzle_notification(bot: Bot) -> None: + """ + Announce the release of a new Advent of Code puzzle. + + This background task hibernates until just before the Advent of Code starts + and will then start announcing puzzles as they are published. After the + event has finished, this task will terminate. + """ + # We wake up one hour before the event starts to prepare the announcement + # of the release of the first puzzle. + await wait_for_advent_of_code(hours_before=1) + + log.info("The Advent of Code has started or will start soon, waking up notification task.") + + # Ensure that the guild cache is loaded so we can get the Advent of Code + # channel and role. + await bot.wait_until_guild_available() + aoc_channel = bot.get_channel(Channels.advent_of_code) + aoc_role = aoc_channel.guild.get_role(AdventOfCode.role_id) + + if not aoc_channel: + log.error("Could not find the AoC channel to send notification in") + return + + if not aoc_role: + log.error("Could not find the AoC role to announce the daily puzzle") + return + + # The last event day is 25 December, so we only have to schedule + # a reminder if the current day is before 25 December. + end = datetime.datetime(AdventOfCode.year, 12, 25, tzinfo=EST) + while datetime.datetime.now(EST) < end: + log.trace("Started puzzle notification loop.") + tomorrow, time_left = time_left_to_est_midnight() + + # Use `total_seconds` to get the time left in fractional seconds This + # should wake us up very close to the target. As a safe guard, the sleep + # duration is padded with 0.1 second to make sure we wake up after + # midnight. + sleep_seconds = time_left.total_seconds() + 0.1 + log.trace(f"The puzzle notification task will sleep for {sleep_seconds} seconds") + await asyncio.sleep(sleep_seconds) + + puzzle_url = f"https://adventofcode.com/{AdventOfCode.year}/day/{tomorrow.day}" + + # Check if the puzzle is already available to prevent our members from spamming + # the puzzle page before it's available by making a small HEAD request. + for retry in range(1, 5): + log.debug(f"Checking if the puzzle is already available (attempt {retry}/4)") + async with bot.http_session.head(puzzle_url, raise_for_status=False) as resp: + if resp.status == 200: + log.debug("Puzzle is available; let's send an announcement message.") + break + log.debug(f"The puzzle is not yet available (status={resp.status})") + await asyncio.sleep(10) + else: + log.error( + "The puzzle does does not appear to be available " + "at this time, canceling announcement" + ) + break + + await aoc_channel.send( + f"{aoc_role.mention} Good morning! Day {tomorrow.day} is ready to be attempted. " + f"View it online now at {puzzle_url}. Good luck!", + allowed_mentions=discord.AllowedMentions( + everyone=False, + users=False, + roles=[aoc_role], + ) + ) + + # Ensure that we don't send duplicate announcements by sleeping to well + # over midnight. This means we're certain to calculate the time to the + # next midnight at the top of the loop. + await asyncio.sleep(120) + + +def background_task_callback(task: asyncio.Task) -> None: + """Check if the finished background task failed to make sure we log errors.""" + if task.cancelled(): + log.info(f"Background task `{task.get_name()}` was cancelled.") + elif exception := task.exception(): + log.error(f"Background task `{task.get_name()}` failed:", exc_info=exception) + else: + log.info(f"Background task `{task.get_name()}` exited normally.") diff --git a/bot/exts/christmas/adventofcode.py b/bot/exts/christmas/adventofcode.py deleted file mode 100644 index b3fe0623..00000000 --- a/bot/exts/christmas/adventofcode.py +++ /dev/null @@ -1,743 +0,0 @@ -import asyncio -import json -import logging -import math -import re -from datetime import datetime, timedelta -from pathlib import Path -from typing import List, Tuple - -import aiohttp -import discord -from bs4 import BeautifulSoup -from discord.ext import commands -from pytz import timezone - -from bot.constants import AdventOfCode as AocConfig, Channels, Colours, Emojis, Month, Tokens, WHITELISTED_CHANNELS -from bot.utils import unlocked_role -from bot.utils.decorators import in_month, override_in_channel - -log = logging.getLogger(__name__) - -AOC_REQUEST_HEADER = {"user-agent": "PythonDiscord AoC Event Bot"} -AOC_SESSION_COOKIE = {"session": Tokens.aoc_session_cookie} - -EST = timezone("EST") -COUNTDOWN_STEP = 60 * 5 - -AOC_WHITELIST = WHITELISTED_CHANNELS + (Channels.advent_of_code,) - - -def is_in_advent() -> bool: - """Utility function to check if we are between December 1st and December 25th.""" - # Run the code from the 1st to the 24th - return datetime.now(EST).day in range(1, 25) and datetime.now(EST).month == 12 - - -def time_left_to_aoc_midnight() -> Tuple[datetime, timedelta]: - """Calculates the amount of time left until midnight in UTC-5 (Advent of Code maintainer timezone).""" - # Change all time properties back to 00:00 - todays_midnight = datetime.now(EST).replace(microsecond=0, - second=0, - minute=0, - hour=0) - - # We want tomorrow so add a day on - tomorrow = todays_midnight + timedelta(days=1) - - # Calculate the timedelta between the current time and midnight - return tomorrow, tomorrow - datetime.now(EST) - - -async def countdown_status(bot: commands.Bot) -> None: - """Set the playing status of the bot to the minutes & hours left until the next day's challenge.""" - while is_in_advent(): - _, time_left = time_left_to_aoc_midnight() - - aligned_seconds = int(math.ceil(time_left.seconds / COUNTDOWN_STEP)) * COUNTDOWN_STEP - hours, minutes = aligned_seconds // 3600, aligned_seconds // 60 % 60 - - if aligned_seconds == 0: - playing = "right now!" - elif aligned_seconds == COUNTDOWN_STEP: - playing = f"in less than {minutes} minutes" - elif hours == 0: - playing = f"in {minutes} minutes" - elif hours == 23: - playing = f"since {60 - minutes} minutes ago" - else: - playing = f"in {hours} hours and {minutes} minutes" - - # Status will look like "Playing in 5 hours and 30 minutes" - await bot.change_presence(activity=discord.Game(playing)) - - # Sleep until next aligned time or a full step if already aligned - delay = time_left.seconds % COUNTDOWN_STEP or COUNTDOWN_STEP - await asyncio.sleep(delay) - - -async def day_countdown(bot: commands.Bot) -> None: - """ - Calculate the number of seconds left until the next day of Advent. - - Once we have calculated this we should then sleep that number and when the time is reached, ping - the Advent of Code role notifying them that the new challenge is ready. - """ - while is_in_advent(): - tomorrow, time_left = time_left_to_aoc_midnight() - - # Correct `time_left.seconds` for the sleep we have after unlocking the role (-5) and adding - # a second (+1) as the bot is consistently ~0.5 seconds early in announcing the puzzles. - await asyncio.sleep(time_left.seconds - 4) - - channel = bot.get_channel(Channels.advent_of_code) - - if not channel: - log.error("Could not find the AoC channel to send notification in") - break - - aoc_role = channel.guild.get_role(AocConfig.role_id) - if not aoc_role: - log.error("Could not find the AoC role to announce the daily puzzle") - break - - async with unlocked_role(aoc_role, delay=5): - puzzle_url = f"https://adventofcode.com/{AocConfig.year}/day/{tomorrow.day}" - - # Check if the puzzle is already available to prevent our members from spamming - # the puzzle page before it's available by making a small HEAD request. - for retry in range(1, 5): - log.debug(f"Checking if the puzzle is already available (attempt {retry}/4)") - async with bot.http_session.head(puzzle_url, raise_for_status=False) as resp: - if resp.status == 200: - log.debug("Puzzle is available; let's send an announcement message.") - break - log.debug(f"The puzzle is not yet available (status={resp.status})") - await asyncio.sleep(10) - else: - log.error("The puzzle does does not appear to be available at this time, canceling announcement") - break - - await channel.send( - f"{aoc_role.mention} Good morning! Day {tomorrow.day} is ready to be attempted. " - f"View it online now at {puzzle_url}. Good luck!" - ) - - # Wait a couple minutes so that if our sleep didn't sleep enough - # time we don't end up announcing twice. - await asyncio.sleep(120) - - -class AdventOfCode(commands.Cog): - """Advent of Code festivities! Ho Ho Ho!""" - - def __init__(self, bot: commands.Bot): - self.bot = bot - - self._base_url = f"https://adventofcode.com/{AocConfig.year}" - self.global_leaderboard_url = f"https://adventofcode.com/{AocConfig.year}/leaderboard" - self.private_leaderboard_url = f"{self._base_url}/leaderboard/private/view/{AocConfig.leaderboard_id}" - - self.about_aoc_filepath = Path("./bot/resources/advent_of_code/about.json") - self.cached_about_aoc = self._build_about_embed() - - self.cached_global_leaderboard = None - self.cached_private_leaderboard = None - - self.countdown_task = None - self.status_task = None - - countdown_coro = day_countdown(self.bot) - self.countdown_task = self.bot.loop.create_task(countdown_coro) - - status_coro = countdown_status(self.bot) - self.status_task = self.bot.loop.create_task(status_coro) - - @in_month(Month.DECEMBER) - @commands.group(name="adventofcode", aliases=("aoc",)) - @override_in_channel(AOC_WHITELIST) - async def adventofcode_group(self, ctx: commands.Context) -> None: - """All of the Advent of Code commands.""" - if not ctx.invoked_subcommand: - await ctx.send_help(ctx.command) - - @adventofcode_group.command( - name="subscribe", - aliases=("sub", "notifications", "notify", "notifs"), - brief="Notifications for new days" - ) - @override_in_channel(AOC_WHITELIST) - async def aoc_subscribe(self, ctx: commands.Context) -> None: - """Assign the role for notifications about new days being ready.""" - role = ctx.guild.get_role(AocConfig.role_id) - unsubscribe_command = f"{ctx.prefix}{ctx.command.root_parent} unsubscribe" - - if role not in ctx.author.roles: - await ctx.author.add_roles(role) - await ctx.send("Okay! You have been __subscribed__ to notifications about new Advent of Code tasks. " - f"You can run `{unsubscribe_command}` to disable them again for you.") - else: - await ctx.send("Hey, you already are receiving notifications about new Advent of Code tasks. " - f"If you don't want them any more, run `{unsubscribe_command}` instead.") - - @adventofcode_group.command(name="unsubscribe", aliases=("unsub",), brief="Notifications for new days") - @override_in_channel(AOC_WHITELIST) - async def aoc_unsubscribe(self, ctx: commands.Context) -> None: - """Remove the role for notifications about new days being ready.""" - role = ctx.guild.get_role(AocConfig.role_id) - - if role in ctx.author.roles: - await ctx.author.remove_roles(role) - await ctx.send("Okay! You have been __unsubscribed__ from notifications about new Advent of Code tasks.") - else: - await ctx.send("Hey, you don't even get any notifications about new Advent of Code tasks currently anyway.") - - @adventofcode_group.command(name="countdown", aliases=("count", "c"), brief="Return time left until next day") - @override_in_channel(AOC_WHITELIST) - async def aoc_countdown(self, ctx: commands.Context) -> None: - """Return time left until next day.""" - if not is_in_advent(): - datetime_now = datetime.now(EST) - - # Calculate the delta to this & next year's December 1st to see which one is closest and not in the past - this_year = datetime(datetime_now.year, 12, 1, tzinfo=EST) - next_year = datetime(datetime_now.year + 1, 12, 1, tzinfo=EST) - deltas = (dec_first - datetime_now for dec_first in (this_year, next_year)) - delta = min(delta for delta in deltas if delta >= timedelta()) # timedelta() gives 0 duration delta - - # Add a finer timedelta if there's less than a day left - if delta.days == 0: - delta_str = f"approximately {delta.seconds // 3600} hours" - else: - delta_str = f"{delta.days} days" - - await ctx.send(f"The Advent of Code event is not currently running. " - f"The next event will start in {delta_str}.") - return - - tomorrow, time_left = time_left_to_aoc_midnight() - - hours, minutes = time_left.seconds // 3600, time_left.seconds // 60 % 60 - - await ctx.send(f"There are {hours} hours and {minutes} minutes left until day {tomorrow.day}.") - - @adventofcode_group.command(name="about", aliases=("ab", "info"), brief="Learn about Advent of Code") - @override_in_channel(AOC_WHITELIST) - async def about_aoc(self, ctx: commands.Context) -> None: - """Respond with an explanation of all things Advent of Code.""" - await ctx.send("", embed=self.cached_about_aoc) - - @adventofcode_group.command(name="join", aliases=("j",), brief="Learn how to join the leaderboard (via DM)") - @override_in_channel(AOC_WHITELIST) - async def join_leaderboard(self, ctx: commands.Context) -> None: - """DM the user the information for joining the PyDis AoC private leaderboard.""" - author = ctx.message.author - log.info(f"{author.name} ({author.id}) has requested the PyDis AoC leaderboard code") - - info_str = ( - "Head over to https://adventofcode.com/leaderboard/private " - f"with code `{AocConfig.leaderboard_join_code}` to join the PyDis private leaderboard!" - ) - try: - await author.send(info_str) - except discord.errors.Forbidden: - log.debug(f"{author.name} ({author.id}) has disabled DMs from server members") - await ctx.send(f":x: {author.mention}, please (temporarily) enable DMs to receive the join code") - else: - await ctx.message.add_reaction(Emojis.envelope) - - @adventofcode_group.command( - name="leaderboard", - aliases=("board", "lb"), - brief="Get a snapshot of the PyDis private AoC leaderboard", - ) - @override_in_channel(AOC_WHITELIST) - async def aoc_leaderboard(self, ctx: commands.Context, number_of_people_to_display: int = 10) -> None: - """ - Pull the top number_of_people_to_display members from the PyDis leaderboard and post an embed. - - For readability, number_of_people_to_display defaults to 10. A maximum value is configured in the - Advent of Code section of the bot constants. number_of_people_to_display values greater than this - limit will default to this maximum and provide feedback to the user. - """ - async with ctx.typing(): - await self._check_leaderboard_cache(ctx) - - if not self.cached_private_leaderboard: - # Feedback on issues with leaderboard caching are sent by _check_leaderboard_cache() - # Short circuit here if there's an issue - return - - number_of_people_to_display = await self._check_n_entries(ctx, number_of_people_to_display) - - # Generate leaderboard table for embed - members_to_print = self.cached_private_leaderboard.top_n(number_of_people_to_display) - table = AocPrivateLeaderboard.build_leaderboard_embed(members_to_print) - - # Build embed - aoc_embed = discord.Embed( - description=f"Total members: {len(self.cached_private_leaderboard.members)}", - colour=Colours.soft_green, - timestamp=self.cached_private_leaderboard.last_updated - ) - aoc_embed.set_author(name="Advent of Code", url=self.private_leaderboard_url) - aoc_embed.set_footer(text="Last Updated") - - await ctx.send( - content=f"Here's the current Top {number_of_people_to_display}! {Emojis.christmas_tree*3}\n\n{table}", - embed=aoc_embed, - ) - - @adventofcode_group.command( - name="stats", - aliases=("dailystats", "ds"), - brief="Get daily statistics for the PyDis private leaderboard" - ) - @override_in_channel(AOC_WHITELIST) - async def private_leaderboard_daily_stats(self, ctx: commands.Context) -> None: - """ - Respond with a table of the daily completion statistics for the PyDis private leaderboard. - - Embed will display the total members and the number of users who have completed each day's puzzle - """ - async with ctx.typing(): - await self._check_leaderboard_cache(ctx) - - if not self.cached_private_leaderboard: - # Feedback on issues with leaderboard caching are sent by _check_leaderboard_cache() - # Short circuit here if there's an issue - return - - # Build ASCII table - total_members = len(self.cached_private_leaderboard.members) - _star = Emojis.star - header = f"{'Day':4}{_star:^8}{_star*2:^4}{'% ' + _star:^8}{'% ' + _star*2:^4}\n{'='*35}" - table = "" - for day, completions in enumerate(self.cached_private_leaderboard.daily_completion_summary): - per_one_star = f"{(completions[0]/total_members)*100:.2f}" - per_two_star = f"{(completions[1]/total_members)*100:.2f}" - - table += f"{day+1:3}){completions[0]:^8}{completions[1]:^6}{per_one_star:^10}{per_two_star:^6}\n" - - table = f"```\n{header}\n{table}```" - - # Build embed - daily_stats_embed = discord.Embed( - colour=Colours.soft_green, timestamp=self.cached_private_leaderboard.last_updated - ) - daily_stats_embed.set_author(name="Advent of Code", url=self._base_url) - daily_stats_embed.set_footer(text="Last Updated") - - await ctx.send( - content=f"Here's the current daily statistics!\n\n{table}", embed=daily_stats_embed - ) - - @adventofcode_group.command( - name="global", - aliases=("globalboard", "gb"), - brief="Get a snapshot of the global AoC leaderboard", - ) - @override_in_channel(AOC_WHITELIST) - async def global_leaderboard(self, ctx: commands.Context, number_of_people_to_display: int = 10) -> None: - """ - Pull the top number_of_people_to_display members from the global AoC leaderboard and post an embed. - - For readability, number_of_people_to_display defaults to 10. A maximum value is configured in the - Advent of Code section of the bot constants. number_of_people_to_display values greater than this - limit will default to this maximum and provide feedback to the user. - """ - async with ctx.typing(): - await self._check_leaderboard_cache(ctx, global_board=True) - - if not self.cached_global_leaderboard: - # Feedback on issues with leaderboard caching are sent by _check_leaderboard_cache() - # Short circuit here if there's an issue - return - - number_of_people_to_display = await self._check_n_entries(ctx, number_of_people_to_display) - - # Generate leaderboard table for embed - members_to_print = self.cached_global_leaderboard.top_n(number_of_people_to_display) - table = AocGlobalLeaderboard.build_leaderboard_embed(members_to_print) - - # Build embed - aoc_embed = discord.Embed(colour=Colours.soft_green, timestamp=self.cached_global_leaderboard.last_updated) - aoc_embed.set_author(name="Advent of Code", url=self._base_url) - aoc_embed.set_footer(text="Last Updated") - - await ctx.send( - f"Here's the current global Top {number_of_people_to_display}! {Emojis.christmas_tree*3}\n\n{table}", - embed=aoc_embed, - ) - - async def _check_leaderboard_cache(self, ctx: commands.Context, global_board: bool = False) -> None: - """ - Check age of current leaderboard & pull a new one if the board is too old. - - global_board is a boolean to toggle between the global board and the Pydis private board - """ - # Toggle between global & private leaderboards - if global_board: - log.debug("Checking global leaderboard cache") - leaderboard_str = "cached_global_leaderboard" - _shortstr = "global" - else: - log.debug("Checking private leaderboard cache") - leaderboard_str = "cached_private_leaderboard" - _shortstr = "private" - - leaderboard = getattr(self, leaderboard_str) - if not leaderboard: - log.debug(f"No cached {_shortstr} leaderboard found") - await self._boardgetter(global_board) - else: - leaderboard_age = datetime.utcnow() - leaderboard.last_updated - age_seconds = leaderboard_age.total_seconds() - if age_seconds < AocConfig.leaderboard_cache_age_threshold_seconds: - log.debug(f"Cached {_shortstr} leaderboard age less than threshold ({age_seconds} seconds old)") - else: - log.debug(f"Cached {_shortstr} leaderboard age greater than threshold ({age_seconds} seconds old)") - await self._boardgetter(global_board) - - leaderboard = getattr(self, leaderboard_str) - if not leaderboard: - await ctx.send( - "", - embed=_error_embed_helper( - title=f"Something's gone wrong and there's no cached {_shortstr} leaderboard!", - description="Please check in with a staff member.", - ), - ) - - async def _check_n_entries(self, ctx: commands.Context, number_of_people_to_display: int) -> int: - """Check for n > max_entries and n <= 0.""" - max_entries = AocConfig.leaderboard_max_displayed_members - author = ctx.message.author - if not 0 <= number_of_people_to_display <= max_entries: - log.debug( - f"{author.name} ({author.id}) attempted to fetch an invalid number " - f" of entries from the AoC leaderboard ({number_of_people_to_display})" - ) - await ctx.send( - f":x: {author.mention}, number of entries to display must be a positive " - f"integer less than or equal to {max_entries}\n\n" - f"Head to {self.private_leaderboard_url} to view the entire leaderboard" - ) - number_of_people_to_display = max_entries - - return number_of_people_to_display - - def _build_about_embed(self) -> discord.Embed: - """Build and return the informational "About AoC" embed from the resources file.""" - with self.about_aoc_filepath.open("r", encoding="utf8") as f: - embed_fields = json.load(f) - - about_embed = discord.Embed(title=self._base_url, colour=Colours.soft_green, url=self._base_url) - about_embed.set_author(name="Advent of Code", url=self._base_url) - for field in embed_fields: - about_embed.add_field(**field) - - about_embed.set_footer(text=f"Last Updated (UTC): {datetime.utcnow()}") - - return about_embed - - async def _boardgetter(self, global_board: bool) -> None: - """Invoke the proper leaderboard getter based on the global_board boolean.""" - if global_board: - self.cached_global_leaderboard = await AocGlobalLeaderboard.from_url() - else: - self.cached_private_leaderboard = await AocPrivateLeaderboard.from_url() - - def cog_unload(self) -> None: - """Cancel season-related tasks on cog unload.""" - log.debug("Unloading the cog and canceling the background task.") - self.countdown_task.cancel() - self.status_task.cancel() - - -class AocMember: - """Object representing the Advent of Code user.""" - - def __init__(self, name: str, aoc_id: int, stars: int, starboard: list, local_score: int, global_score: int): - self.name = name - self.aoc_id = aoc_id - self.stars = stars - self.starboard = starboard - self.local_score = local_score - self.global_score = global_score - self.completions = self._completions_from_starboard(self.starboard) - - def __repr__(self): - """Generate a user-friendly representation of the AocMember & their score.""" - return f"<{self.name} ({self.aoc_id}): {self.local_score}>" - - @classmethod - def member_from_json(cls, injson: dict) -> "AocMember": - """ - Generate an AocMember from AoC's private leaderboard API JSON. - - injson is expected to be the dict contained in: - - AoC_APIjson['members'][<member id>:str] - - Returns an AocMember object - """ - return cls( - name=injson["name"] if injson["name"] else "Anonymous User", - aoc_id=int(injson["id"]), - stars=injson["stars"], - starboard=cls._starboard_from_json(injson["completion_day_level"]), - local_score=injson["local_score"], - global_score=injson["global_score"], - ) - - @staticmethod - def _starboard_from_json(injson: dict) -> list: - """ - Generate starboard from AoC's private leaderboard API JSON. - - injson is expected to be the dict contained in: - - AoC_APIjson['members'][<member id>:str]['completion_day_level'] - - Returns a list of 25 lists, where each nested list contains a pair of booleans representing - the code challenge completion status for that day - """ - # Basic input validation - if not isinstance(injson, dict): - raise ValueError - - # Initialize starboard - starboard = [] - for _i in range(25): - starboard.append([False, False]) - - # Iterate over days, which are the keys of injson (as str) - for day in injson: - idx = int(day) - 1 - # If there is a second star, the first star must be completed - if "2" in injson[day].keys(): - starboard[idx] = [True, True] - # If the day exists in injson, then at least the first star is completed - else: - starboard[idx] = [True, False] - - return starboard - - @staticmethod - def _completions_from_starboard(starboard: list) -> tuple: - """Return days completed, as a (1 star, 2 star) tuple, from starboard.""" - completions = [0, 0] - for day in starboard: - if day[0]: - completions[0] += 1 - if day[1]: - completions[1] += 1 - - return tuple(completions) - - -class AocPrivateLeaderboard: - """Object representing the Advent of Code private leaderboard.""" - - def __init__(self, members: list, owner_id: int, event_year: int): - self.members = members - self._owner_id = owner_id - self._event_year = event_year - self.last_updated = datetime.utcnow() - - self.daily_completion_summary = self.calculate_daily_completion() - - def top_n(self, n: int = 10) -> dict: - """ - Return the top n participants on the leaderboard. - - If n is not specified, default to the top 10 - """ - return self.members[:n] - - def calculate_daily_completion(self) -> List[tuple]: - """ - Calculate member completion rates by day. - - Return a list of tuples for each day containing the number of users who completed each part - of the challenge - """ - daily_member_completions = [] - for day in range(25): - one_star_count = 0 - two_star_count = 0 - for member in self.members: - if member.starboard[day][1]: - one_star_count += 1 - two_star_count += 1 - elif member.starboard[day][0]: - one_star_count += 1 - else: - daily_member_completions.append((one_star_count, two_star_count)) - - return(daily_member_completions) - - @staticmethod - async def json_from_url( - leaderboard_id: int = AocConfig.leaderboard_id, year: int = AocConfig.year - ) -> "AocPrivateLeaderboard": - """ - Request the API JSON from Advent of Code for leaderboard_id for the specified year's event. - - If no year is input, year defaults to the current year - """ - api_url = f"https://adventofcode.com/{year}/leaderboard/private/view/{leaderboard_id}.json" - - log.debug("Querying Advent of Code Private Leaderboard API") - async with aiohttp.ClientSession(cookies=AOC_SESSION_COOKIE, headers=AOC_REQUEST_HEADER) as session: - async with session.get(api_url) as resp: - if resp.status == 200: - raw_dict = await resp.json() - else: - log.warning(f"Bad response received from AoC ({resp.status}), check session cookie") - resp.raise_for_status() - - return raw_dict - - @classmethod - def from_json(cls, injson: dict) -> "AocPrivateLeaderboard": - """Generate an AocPrivateLeaderboard object from AoC's private leaderboard API JSON.""" - return cls( - members=cls._sorted_members(injson["members"]), owner_id=injson["owner_id"], event_year=injson["event"] - ) - - @classmethod - async def from_url(cls) -> "AocPrivateLeaderboard": - """Helper wrapping of AocPrivateLeaderboard.json_from_url and AocPrivateLeaderboard.from_json.""" - api_json = await cls.json_from_url() - return cls.from_json(api_json) - - @staticmethod - def _sorted_members(injson: dict) -> list: - """ - Generate a sorted list of AocMember objects from AoC's private leaderboard API JSON. - - Output list is sorted based on the AocMember.local_score - """ - members = [AocMember.member_from_json(injson[member]) for member in injson] - members.sort(key=lambda x: x.local_score, reverse=True) - - return members - - @staticmethod - def build_leaderboard_embed(members_to_print: List[AocMember]) -> str: - """ - Build a text table from members_to_print, a list of AocMember objects. - - Returns a string to be used as the content of the bot's leaderboard response - """ - stargroup = f"{Emojis.star}, {Emojis.star*2}" - header = f"{' '*3}{'Score'} {'Name':^25} {stargroup:^7}\n{'-'*44}" - table = "" - for i, member in enumerate(members_to_print): - if member.name == "Anonymous User": - name = f"{member.name} #{member.aoc_id}" - else: - name = member.name - - table += ( - f"{i+1:2}) {member.local_score:4} {name:25.25} " - f"({member.completions[0]:2}, {member.completions[1]:2})\n" - ) - else: - table = f"```{header}\n{table}```" - - return table - - -class AocGlobalLeaderboard: - """Object representing the Advent of Code global leaderboard.""" - - def __init__(self, members: List[tuple]): - self.members = members - self.last_updated = datetime.utcnow() - - def top_n(self, n: int = 10) -> dict: - """ - Return the top n participants on the leaderboard. - - If n is not specified, default to the top 10 - """ - return self.members[:n] - - @classmethod - async def from_url(cls) -> "AocGlobalLeaderboard": - """ - Generate an list of tuples for the entries on AoC's global leaderboard. - - Because there is no API for this, web scraping needs to be used - """ - aoc_url = f"https://adventofcode.com/{AocConfig.year}/leaderboard" - - async with aiohttp.ClientSession(headers=AOC_REQUEST_HEADER) as session: - async with session.get(aoc_url) as resp: - if resp.status == 200: - raw_html = await resp.text() - else: - log.warning(f"Bad response received from AoC ({resp.status}), check session cookie") - resp.raise_for_status() - - soup = BeautifulSoup(raw_html, "html.parser") - ele = soup.find_all("div", class_="leaderboard-entry") - - exp = r"(?:[ ]{,2}(\d+)\))?[ ]+(\d+)\s+([\w\(\)\#\@\-\d ]+)" - - lb_list = [] - for entry in ele: - # Strip off the AoC++ decorator - raw_str = entry.text.replace("(AoC++)", "").rstrip() - - # Use a regex to extract the info from the string to unify formatting - # Group 1: Rank - # Group 2: Global Score - # Group 3: Member string - r = re.match(exp, raw_str) - - rank = int(r.group(1)) if r.group(1) else None - global_score = int(r.group(2)) - - member = r.group(3) - if member.lower().startswith("(anonymous"): - # Normalize anonymous user string by stripping () and title casing - member = re.sub(r"[\(\)]", "", member).title() - - lb_list.append((rank, global_score, member)) - - return cls(lb_list) - - @staticmethod - def build_leaderboard_embed(members_to_print: List[tuple]) -> str: - """ - Build a text table from members_to_print, a list of tuples. - - Returns a string to be used as the content of the bot's leaderboard response - """ - header = f"{' '*4}{'Score'} {'Name':^25}\n{'-'*36}" - table = "" - for member in members_to_print: - # In the event of a tie, rank is None - if member[0]: - rank = f"{member[0]:3})" - else: - rank = f"{' ':4}" - table += f"{rank} {member[1]:4} {member[2]:25.25}\n" - else: - table = f"```{header}\n{table}```" - - return table - - -def _error_embed_helper(title: str, description: str) -> discord.Embed: - """Return a red-colored Embed with the given title and description.""" - return discord.Embed(title=title, description=description, colour=discord.Colour.red()) - - -def setup(bot: commands.Bot) -> None: - """Advent of Code Cog load.""" - bot.add_cog(AdventOfCode(bot)) diff --git a/bot/exts/easter/easter_riddle.py b/bot/exts/easter/easter_riddle.py index 8977534f..3c612eb1 100644 --- a/bot/exts/easter/easter_riddle.py +++ b/bot/exts/easter/easter_riddle.py @@ -22,7 +22,7 @@ class EasterRiddle(commands.Cog): def __init__(self, bot: commands.Bot): self.bot = bot - self.winners = [] + self.winners = set() self.correct = "" self.current_channel = None @@ -79,7 +79,7 @@ class EasterRiddle(commands.Cog): await ctx.send(content, embed=answer_embed) - self.winners = [] + self.winners.clear() self.current_channel = None @commands.Cog.listener() @@ -92,7 +92,7 @@ class EasterRiddle(commands.Cog): return if message.content.lower() == self.correct.lower(): - self.winners.append(message.author.mention) + self.winners.add(message.author.mention) def setup(bot: commands.Bot) -> None: diff --git a/bot/exts/easter/egg_facts.py b/bot/exts/easter/egg_facts.py index 0051aa50..761e9059 100644 --- a/bot/exts/easter/egg_facts.py +++ b/bot/exts/easter/egg_facts.py @@ -6,7 +6,7 @@ from pathlib import Path import discord from discord.ext import commands -from bot.bot import SeasonalBot +from bot.bot import Bot from bot.constants import Channels, Colours, Month from bot.utils.decorators import seasonal_task @@ -20,7 +20,7 @@ class EasterFacts(commands.Cog): It also contains a background task which sends an easter egg fact in the event channel everyday. """ - def __init__(self, bot: SeasonalBot): + def __init__(self, bot: Bot): self.bot = bot self.facts = self.load_json() @@ -38,7 +38,7 @@ class EasterFacts(commands.Cog): """A background task that sends an easter egg fact in the event channel everyday.""" await self.bot.wait_until_guild_available() - channel = self.bot.get_channel(Channels.seasonalbot_commands) + channel = self.bot.get_channel(Channels.community_bot_commands) await channel.send(embed=self.make_embed()) @commands.command(name='eggfact', aliases=['fact']) @@ -56,6 +56,6 @@ class EasterFacts(commands.Cog): ) -def setup(bot: SeasonalBot) -> None: +def setup(bot: Bot) -> None: """Easter Egg facts cog load.""" bot.add_cog(EasterFacts(bot)) diff --git a/bot/exts/easter/save_the_planet.py b/bot/exts/easter/save_the_planet.py new file mode 100644 index 00000000..8f644259 --- /dev/null +++ b/bot/exts/easter/save_the_planet.py @@ -0,0 +1,29 @@ +import json +from pathlib import Path + +from discord import Embed +from discord.ext import commands + +from bot.utils.randomization import RandomCycle + + +with Path("bot/resources/easter/save_the_planet.json").open('r', encoding='utf8') as f: + EMBED_DATA = RandomCycle(json.load(f)) + + +class SaveThePlanet(commands.Cog): + """A cog that teaches users how they can help our planet.""" + + def __init__(self, bot: commands.Bot) -> None: + self.bot = bot + + @commands.command(aliases=('savetheearth', 'saveplanet', 'saveearth')) + async def savetheplanet(self, ctx: commands.Context) -> None: + """Responds with a random tip on how to be eco-friendly and help our planet.""" + return_embed = Embed.from_dict(next(EMBED_DATA)) + await ctx.send(embed=return_embed) + + +def setup(bot: commands.Bot) -> None: + """Save the Planet Cog load.""" + bot.add_cog(SaveThePlanet(bot)) diff --git a/bot/exts/evergreen/branding.py b/bot/exts/evergreen/branding.py deleted file mode 100644 index 7e531011..00000000 --- a/bot/exts/evergreen/branding.py +++ /dev/null @@ -1,543 +0,0 @@ -import asyncio -import itertools -import json -import logging -import random -import typing as t -from datetime import datetime, time, timedelta -from pathlib import Path - -import arrow -import discord -from discord.embeds import EmptyEmbed -from discord.ext import commands - -from bot.bot import SeasonalBot -from bot.constants import Branding, Colours, Emojis, MODERATION_ROLES, Tokens -from bot.seasons import SeasonBase, get_all_seasons, get_current_season, get_season -from bot.utils import human_months -from bot.utils.decorators import with_role -from bot.utils.exceptions import BrandingError -from bot.utils.persist import make_persistent - -log = logging.getLogger(__name__) - -STATUS_OK = 200 # HTTP status code - -FILE_BANNER = "banner.png" -FILE_AVATAR = "avatar.png" -SERVER_ICONS = "server_icons" - -BRANDING_URL = "https://api.github.com/repos/python-discord/branding/contents" - -PARAMS = {"ref": "master"} # Target branch -HEADERS = {"Accept": "application/vnd.github.v3+json"} # Ensure we use API v3 - -# A GitHub token is not necessary for the cog to operate, -# unauthorized requests are however limited to 60 per hour -if Tokens.github: - HEADERS["Authorization"] = f"token {Tokens.github}" - - -class GitHubFile(t.NamedTuple): - """ - Represents a remote file on GitHub. - - The `sha` hash is kept so that we can determine that a file has changed, - despite its filename remaining unchanged. - """ - - download_url: str - path: str - sha: str - - -def pretty_files(files: t.Iterable[GitHubFile]) -> str: - """Provide a human-friendly representation of `files`.""" - return "\n".join(file.path for file in files) - - -def time_until_midnight() -> timedelta: - """ - Determine amount of time until the next-up UTC midnight. - - The exact `midnight` moment is actually delayed to 5 seconds after, in order - to avoid potential problems due to imprecise sleep. - """ - now = datetime.utcnow() - tomorrow = now + timedelta(days=1) - midnight = datetime.combine(tomorrow, time(second=5)) - - return midnight - now - - -class BrandingManager(commands.Cog): - """ - Manages the guild's branding. - - The purpose of this cog is to help automate the synchronization of the branding - repository with the guild. It is capable of discovering assets in the repository - via GitHub's API, resolving download urls for them, and delegating - to the `bot` instance to upload them to the guild. - - BrandingManager is designed to be entirely autonomous. Its `daemon` background task awakens - once a day (see `time_until_midnight`) to detect new seasons, or to cycle icons within a single - season. The daemon can be turned on and off via the `daemon` cmd group. The value set via - its `start` and `stop` commands is persisted across sessions. If turned on, the daemon will - automatically start on the next bot start-up. Otherwise, it will wait to be started manually. - - All supported operations, e.g. setting seasons, applying the branding, or cycling icons, can - also be invoked manually, via the following API: - - branding list - - Show all available seasons - - branding set <season_name> - - Set the cog's internal state to represent `season_name`, if it exists - - If no `season_name` is given, set chronologically current season - - This will not automatically apply the season's branding to the guild, - the cog's state can be detached from the guild - - Seasons can therefore be 'previewed' using this command - - branding info - - View detailed information about resolved assets for current season - - branding refresh - - Refresh internal state, i.e. synchronize with branding repository - - branding apply - - Apply the current internal state to the guild, i.e. upload the assets - - branding cycle - - If there are multiple available icons for current season, randomly pick - and apply the next one - - The daemon calls these methods autonomously as appropriate. The use of this cog - is locked to moderation roles. As it performs media asset uploads, it is prone to - rate-limits - the `apply` command should be used with caution. The `set` command can, - however, be used freely to 'preview' seasonal branding and check whether paths have been - resolved as appropriate. - - While the bot is in debug mode, it will 'mock' asset uploads by logging the passed - download urls and pretending that the upload was successful. Make use of this - to test this cog's behaviour. - """ - - current_season: t.Type[SeasonBase] - - banner: t.Optional[GitHubFile] - avatar: t.Optional[GitHubFile] - - available_icons: t.List[GitHubFile] - remaining_icons: t.List[GitHubFile] - - days_since_cycle: t.Iterator - - config_file: Path - - daemon: t.Optional[asyncio.Task] - - def __init__(self, bot: SeasonalBot) -> None: - """ - Assign safe default values on init. - - At this point, we don't have information about currently available branding. - Most of these attributes will be overwritten once the daemon connects, or once - the `refresh` command is used. - """ - self.bot = bot - self.current_season = get_current_season() - - self.banner = None - self.avatar = None - - self.available_icons = [] - self.remaining_icons = [] - - self.days_since_cycle = itertools.cycle([None]) - - self.config_file = make_persistent(Path("bot", "resources", "evergreen", "branding.json")) - should_run = self._read_config()["daemon_active"] - - if should_run: - self.daemon = self.bot.loop.create_task(self._daemon_func()) - else: - self.daemon = None - - @property - def _daemon_running(self) -> bool: - """True if the daemon is currently active, False otherwise.""" - return self.daemon is not None and not self.daemon.done() - - def _read_config(self) -> t.Dict[str, bool]: - """Read and return persistent config file.""" - with self.config_file.open("r", encoding="utf8") as persistent_file: - return json.load(persistent_file) - - def _write_config(self, key: str, value: bool) -> None: - """Write a `key`, `value` pair to persistent config file.""" - current_config = self._read_config() - current_config[key] = value - - with self.config_file.open("w", encoding="utf8") as persistent_file: - json.dump(current_config, persistent_file) - - async def _daemon_func(self) -> None: - """ - Manage all automated behaviour of the BrandingManager cog. - - Once a day, the daemon will perform the following tasks: - - Update `current_season` - - Poll GitHub API to see if the available branding for `current_season` has changed - - Update assets if changes are detected (banner, guild icon, bot avatar, bot nickname) - - Check whether it's time to cycle guild icons - - The internal loop runs once when activated, then periodically at the time - given by `time_until_midnight`. - - All method calls in the internal loop are considered safe, i.e. no errors propagate - to the daemon's loop. The daemon itself does not perform any error handling on its own. - """ - await self.bot.wait_until_guild_available() - - while True: - self.current_season = get_current_season() - branding_changed = await self.refresh() - - if branding_changed: - await self.apply() - - elif next(self.days_since_cycle) == Branding.cycle_frequency: - await self.cycle() - - until_midnight = time_until_midnight() - await asyncio.sleep(until_midnight.total_seconds()) - - async def _info_embed(self) -> discord.Embed: - """Make an informative embed representing current season.""" - info_embed = discord.Embed(description=self.current_season.description, colour=self.current_season.colour) - - # If we're in a non-evergreen season, also show active months - if self.current_season is not SeasonBase: - title = f"{self.current_season.season_name} ({human_months(self.current_season.months)})" - else: - title = self.current_season.season_name - - # Use the author field to show the season's name and avatar if available - info_embed.set_author(name=title, icon_url=self.avatar.download_url if self.avatar else EmptyEmbed) - - banner = self.banner.path if self.banner is not None else "Unavailable" - info_embed.add_field(name="Banner", value=banner, inline=False) - - avatar = self.avatar.path if self.avatar is not None else "Unavailable" - info_embed.add_field(name="Avatar", value=avatar, inline=False) - - icons = pretty_files(self.available_icons) or "Unavailable" - info_embed.add_field(name="Available icons", value=icons, inline=False) - - # Only display cycle frequency if we're actually cycling - if len(self.available_icons) > 1 and Branding.cycle_frequency: - info_embed.set_footer(text=f"Icon cycle frequency: {Branding.cycle_frequency}") - - return info_embed - - async def _reset_remaining_icons(self) -> None: - """Set `remaining_icons` to a shuffled copy of `available_icons`.""" - self.remaining_icons = random.sample(self.available_icons, k=len(self.available_icons)) - - async def _reset_days_since_cycle(self) -> None: - """ - Reset the `days_since_cycle` iterator based on configured frequency. - - If the current season only has 1 icon, or if `Branding.cycle_frequency` is falsey, - the iterator will always yield None. This signals that the icon shouldn't be cycled. - - Otherwise, it will yield ints in range [1, `Branding.cycle_frequency`] indefinitely. - When the iterator yields a value equal to `Branding.cycle_frequency`, it is time to cycle. - """ - if len(self.available_icons) > 1 and Branding.cycle_frequency: - sequence = range(1, Branding.cycle_frequency + 1) - else: - sequence = [None] - - self.days_since_cycle = itertools.cycle(sequence) - - async def _get_files(self, path: str, include_dirs: bool = False) -> t.Dict[str, GitHubFile]: - """ - Get files at `path` in the branding repository. - - If `include_dirs` is False (default), only returns files at `path`. - Otherwise, will return both files and directories. Never returns symlinks. - - Return dict mapping from filename to corresponding `GitHubFile` instance. - This may return an empty dict if the response status is non-200, - or if the target directory is empty. - """ - url = f"{BRANDING_URL}/{path}" - async with self.bot.http_session.get(url, headers=HEADERS, params=PARAMS) as resp: - # Short-circuit if we get non-200 response - if resp.status != STATUS_OK: - log.error(f"GitHub API returned non-200 response: {resp}") - return {} - directory = await resp.json() # Directory at `path` - - allowed_types = {"file", "dir"} if include_dirs else {"file"} - return { - file["name"]: GitHubFile(file["download_url"], file["path"], file["sha"]) - for file in directory - if file["type"] in allowed_types - } - - async def refresh(self) -> bool: - """ - Synchronize available assets with branding repository. - - If the current season is not the evergreen, and lacks at least one asset, - we use the evergreen seasonal dir as fallback for missing assets. - - Finally, if neither the seasonal nor fallback branding directories contain - an asset, it will simply be ignored. - - Return True if the branding has changed. This will be the case when we enter - a new season, or when something changes in the current seasons's directory - in the branding repository. - """ - old_branding = (self.banner, self.avatar, self.available_icons) - seasonal_dir = await self._get_files(self.current_season.branding_path, include_dirs=True) - - # Only make a call to the fallback directory if there is something to be gained - branding_incomplete = any( - asset not in seasonal_dir - for asset in (FILE_BANNER, FILE_AVATAR, SERVER_ICONS) - ) - if branding_incomplete and self.current_season is not SeasonBase: - fallback_dir = await self._get_files(SeasonBase.branding_path, include_dirs=True) - else: - fallback_dir = {} - - # Resolve assets in this directory, None is a safe value - self.banner = seasonal_dir.get(FILE_BANNER) or fallback_dir.get(FILE_BANNER) - self.avatar = seasonal_dir.get(FILE_AVATAR) or fallback_dir.get(FILE_AVATAR) - - # Now resolve server icons by making a call to the proper sub-directory - if SERVER_ICONS in seasonal_dir: - icons_dir = await self._get_files(f"{self.current_season.branding_path}/{SERVER_ICONS}") - self.available_icons = list(icons_dir.values()) - - elif SERVER_ICONS in fallback_dir: - icons_dir = await self._get_files(f"{SeasonBase.branding_path}/{SERVER_ICONS}") - self.available_icons = list(icons_dir.values()) - - else: - self.available_icons = [] # This should never be the case, but an empty list is a safe value - - # GitHubFile instances carry a `sha` attr so this will pick up if a file changes - branding_changed = old_branding != (self.banner, self.avatar, self.available_icons) - - if branding_changed: - log.info(f"New branding detected (season: {self.current_season.season_name})") - await self._reset_remaining_icons() - await self._reset_days_since_cycle() - - return branding_changed - - async def cycle(self) -> bool: - """ - Apply the next-up server icon. - - Returns True if an icon is available and successfully gets applied, False otherwise. - """ - if not self.available_icons: - log.info("Cannot cycle: no icons for this season") - return False - - if not self.remaining_icons: - log.info("Reset & shuffle remaining icons") - await self._reset_remaining_icons() - - next_up = self.remaining_icons.pop(0) - success = await self.bot.set_icon(next_up.download_url) - - return success - - async def apply(self) -> t.List[str]: - """ - Apply current branding to the guild and bot. - - This delegates to the bot instance to do all the work. We only provide download urls - for available assets. Assets unavailable in the branding repo will be ignored. - - Returns a list of names of all failed assets. An asset is considered failed - if it isn't found in the branding repo, or if something goes wrong while the - bot is trying to apply it. - - An empty list denotes that all assets have been applied successfully. - """ - report = {asset: False for asset in ("banner", "avatar", "nickname", "icon")} - - if self.banner is not None: - report["banner"] = await self.bot.set_banner(self.banner.download_url) - - if self.avatar is not None: - report["avatar"] = await self.bot.set_avatar(self.avatar.download_url) - - if self.current_season.bot_name: - report["nickname"] = await self.bot.set_nickname(self.current_season.bot_name) - - report["icon"] = await self.cycle() - - failed_assets = [asset for asset, succeeded in report.items() if not succeeded] - return failed_assets - - @with_role(*MODERATION_ROLES) - @commands.group(name="branding") - async def branding_cmds(self, ctx: commands.Context) -> None: - """Manual branding control.""" - if not ctx.invoked_subcommand: - await ctx.send_help(ctx.command) - - @branding_cmds.command(name="list", aliases=["ls"]) - async def branding_list(self, ctx: commands.Context) -> None: - """List all available seasons and branding sources.""" - embed = discord.Embed(title="Available seasons", colour=Colours.soft_green) - - for season in get_all_seasons(): - if season is SeasonBase: - active_when = "always" - else: - active_when = f"in {human_months(season.months)}" - - description = ( - f"Active {active_when}\n" - f"Branding: {season.branding_path}" - ) - embed.add_field(name=season.season_name, value=description, inline=False) - - await ctx.send(embed=embed) - - @branding_cmds.command(name="set") - async def branding_set(self, ctx: commands.Context, *, season_name: t.Optional[str] = None) -> None: - """ - Manually set season, or reset to current if none given. - - Season search is a case-less comparison against both seasonal class name, - and its `season_name` attr. - - This only pre-loads the cog's internal state to the chosen season, but does not - automatically apply the branding. As that is an expensive operation, the `apply` - command must be called explicitly after this command finishes. - - This means that this command can be used to 'preview' a season gathering info - about its available assets, without applying them to the guild. - - If the daemon is running, it will automatically reset the season to current when - it wakes up. The season set via this command can therefore remain 'detached' from - what it should be - the daemon will make sure that it's set back properly. - """ - if season_name is None: - new_season = get_current_season() - else: - new_season = get_season(season_name) - if new_season is None: - raise BrandingError("No such season exists") - - if self.current_season is new_season: - raise BrandingError(f"Season {self.current_season.season_name} already active") - - self.current_season = new_season - await self.branding_refresh(ctx) - - @branding_cmds.command(name="info", aliases=["status"]) - async def branding_info(self, ctx: commands.Context) -> None: - """ - Show available assets for current season. - - This can be used to confirm that assets have been resolved properly. - When `apply` is used, it attempts to upload exactly the assets listed here. - """ - await ctx.send(embed=await self._info_embed()) - - @branding_cmds.command(name="refresh") - async def branding_refresh(self, ctx: commands.Context) -> None: - """Sync currently available assets with branding repository.""" - async with ctx.typing(): - await self.refresh() - await self.branding_info(ctx) - - @branding_cmds.command(name="apply") - async def branding_apply(self, ctx: commands.Context) -> None: - """ - Apply current season's branding to the guild. - - Use `info` to check which assets will be applied. Shows which assets have - failed to be applied, if any. - """ - async with ctx.typing(): - failed_assets = await self.apply() - if failed_assets: - raise BrandingError(f"Failed to apply following assets: {', '.join(failed_assets)}") - - response = discord.Embed(description=f"All assets applied {Emojis.ok_hand}", colour=Colours.soft_green) - await ctx.send(embed=response) - - @branding_cmds.command(name="cycle") - async def branding_cycle(self, ctx: commands.Context) -> None: - """ - Apply the next-up guild icon, if multiple are available. - - The order is random. - """ - async with ctx.typing(): - success = await self.cycle() - if not success: - raise BrandingError("Failed to cycle icon") - - response = discord.Embed(description=f"Success {Emojis.ok_hand}", colour=Colours.soft_green) - await ctx.send(embed=response) - - @branding_cmds.group(name="daemon", aliases=["d", "task"]) - async def daemon_group(self, ctx: commands.Context) -> None: - """Control the background daemon.""" - if not ctx.invoked_subcommand: - await ctx.send_help(ctx.command) - - @daemon_group.command(name="status") - async def daemon_status(self, ctx: commands.Context) -> None: - """Check whether daemon is currently active.""" - if self._daemon_running: - remaining_time = (arrow.utcnow() + time_until_midnight()).humanize() - response = discord.Embed(description=f"Daemon running {Emojis.ok_hand}", colour=Colours.soft_green) - response.set_footer(text=f"Next refresh {remaining_time}") - else: - response = discord.Embed(description="Daemon not running", colour=Colours.soft_red) - - await ctx.send(embed=response) - - @daemon_group.command(name="start") - async def daemon_start(self, ctx: commands.Context) -> None: - """If the daemon isn't running, start it.""" - if self._daemon_running: - raise BrandingError("Daemon already running!") - - self.daemon = self.bot.loop.create_task(self._daemon_func()) - self._write_config("daemon_active", True) - - response = discord.Embed(description=f"Daemon started {Emojis.ok_hand}", colour=Colours.soft_green) - await ctx.send(embed=response) - - @daemon_group.command(name="stop") - async def daemon_stop(self, ctx: commands.Context) -> None: - """If the daemon is running, stop it.""" - if not self._daemon_running: - raise BrandingError("Daemon not running!") - - self.daemon.cancel() - self._write_config("daemon_active", False) - - response = discord.Embed(description=f"Daemon stopped {Emojis.ok_hand}", colour=Colours.soft_green) - await ctx.send(embed=response) - - -def setup(bot: SeasonalBot) -> None: - """Load BrandingManager cog.""" - bot.add_cog(BrandingManager(bot)) diff --git a/bot/exts/evergreen/emoji_count.py b/bot/exts/evergreen/emoji_count.py index ef900199..cc43e9ab 100644 --- a/bot/exts/evergreen/emoji_count.py +++ b/bot/exts/evergreen/emoji_count.py @@ -1,12 +1,14 @@ import datetime import logging import random -from typing import Dict, Optional +from collections import defaultdict +from typing import List, Tuple import discord from discord.ext import commands from bot.constants import Colours, ERROR_REPLIES +from bot.utils.pagination import LinePaginator log = logging.getLogger(__name__) @@ -17,73 +19,77 @@ class EmojiCount(commands.Cog): def __init__(self, bot: commands.Bot): self.bot = bot - def embed_builder(self, emoji: dict) -> discord.Embed: + @staticmethod + def embed_builder(emoji: dict) -> Tuple[discord.Embed, List[str]]: """Generates an embed with the emoji names and count.""" embed = discord.Embed( color=Colours.orange, title="Emoji Count", timestamp=datetime.datetime.utcnow() ) + msg = [] if len(emoji) == 1: - for key, value in emoji.items(): - embed.description = f"There are **{len(value)}** emojis in the **{key}** category" - embed.set_thumbnail(url=random.choice(value).url) + for category_name, category_emojis in emoji.items(): + if len(category_emojis) == 1: + msg.append(f"There is **{len(category_emojis)}** emoji in **{category_name}** category") + else: + msg.append(f"There are **{len(category_emojis)}** emojis in **{category_name}** category") + embed.set_thumbnail(url=random.choice(category_emojis).url) + else: - msg = '' - for key, value in emoji.items(): - emoji_choice = random.choice(value) - emoji_info = f'There are **{len(value)}** emojis in the **{key}** category\n' - msg += f'<:{emoji_choice.name}:{emoji_choice.id}> {emoji_info}' - embed.description = msg - return embed + for category_name, category_emojis in emoji.items(): + emoji_choice = random.choice(category_emojis) + if len(category_emojis) > 1: + emoji_info = f"There are **{len(category_emojis)}** emojis in **{category_name}** category" + else: + emoji_info = f"There is **{len(category_emojis)}** emoji in **{category_name}** category" + if emoji_choice.animated: + msg.append(f'<a:{emoji_choice.name}:{emoji_choice.id}> {emoji_info}') + else: + msg.append(f'<:{emoji_choice.name}:{emoji_choice.id}> {emoji_info}') + return embed, msg @staticmethod - def generate_invalid_embed(ctx: commands.Context) -> discord.Embed: - """Genrates error embed.""" + def generate_invalid_embed(emojis: list) -> Tuple[discord.Embed, List[str]]: + """Generates error embed.""" embed = discord.Embed( color=Colours.soft_red, title=random.choice(ERROR_REPLIES) ) + msg = [] - emoji_dict = {} - for emoji in ctx.guild.emojis: - emoji_dict[emoji.name.split("_")[0]] = [] + emoji_dict = defaultdict(list) + for emoji in emojis: + emoji_dict[emoji.name.split("_")[0]].append(emoji) - error_comp = ', '.join(key for key in emoji_dict.keys()) - embed.description = f"These are the valid categories\n```{error_comp}```" - return embed + error_comp = ', '.join(emoji_dict) + msg.append(f"These are the valid categories\n```{error_comp}```") + return embed, msg - def emoji_list(self, ctx: commands.Context, categories: dict) -> Dict: - """Generates an embed with the emoji names and count.""" - out = {category: [] for category in categories} + @commands.command(name="emojicount", aliases=["ec", "emojis"]) + async def emoji_count(self, ctx: commands.Context, *, category_query: str = None) -> None: + """Returns embed with emoji category and info given by the user.""" + emoji_dict = defaultdict(list) + if not ctx.guild.emojis: + await ctx.send("No emojis found.") + return + log.trace(f"Emoji Category {'' if category_query else 'not '}provided by the user") for emoji in ctx.guild.emojis: - category = emoji.name.split('_')[0] - if category in out: - out[category].append(emoji) - return out - - @commands.command(name="emoji_count", aliases=["ec"]) - async def ec(self, ctx: commands.Context, *, emoji: str = None) -> Optional[str]: - """Returns embed with emoji category and info given by the user.""" - emoji_dict = {} + emoji_category = emoji.name.split("_")[0] - for a in ctx.guild.emojis: - if emoji is None: - log.trace("Emoji Category not provided by the user") - emoji_dict.update({a.name.split("_")[0]: []}) - elif a.name.split("_")[0] in emoji: - log.trace("Emoji Category provided by the user") - emoji_dict.update({a.name.split("_")[0]: []}) + if category_query is not None and emoji_category not in category_query: + continue - emoji_dict = self.emoji_list(ctx, emoji_dict) + emoji_dict[emoji_category].append(emoji) - if len(emoji_dict) == 0: - embed = self.generate_invalid_embed(ctx) + if not emoji_dict: + log.trace("Invalid name provided by the user") + embed, msg = self.generate_invalid_embed(ctx.guild.emojis) else: - embed = self.embed_builder(emoji_dict) - await ctx.send(embed=embed) + embed, msg = self.embed_builder(emoji_dict) + await LinePaginator.paginate(lines=msg, ctx=ctx, embed=embed) def setup(bot: commands.Bot) -> None: diff --git a/bot/exts/evergreen/error_handler.py b/bot/exts/evergreen/error_handler.py index 459a2b2d..99af1519 100644 --- a/bot/exts/evergreen/error_handler.py +++ b/bot/exts/evergreen/error_handler.py @@ -9,7 +9,7 @@ from sentry_sdk import push_scope from bot.constants import Colours, ERROR_REPLIES, NEGATIVE_REPLIES from bot.utils.decorators import InChannelCheckFailure, InMonthCheckFailure -from bot.utils.exceptions import BrandingError, UserNotPlayingError +from bot.utils.exceptions import UserNotPlayingError log = logging.getLogger(__name__) @@ -42,8 +42,8 @@ class CommandErrorHandler(commands.Cog): @commands.Cog.listener() async def on_command_error(self, ctx: commands.Context, error: commands.CommandError) -> None: """Activates when a command opens an error.""" - if hasattr(ctx.command, 'on_error'): - logging.debug("A command error occured but the command had it's own error handler.") + if getattr(error, 'handled', False): + logging.debug(f"Command {ctx.command} had its error already handled locally; ignoring.") return error = getattr(error, 'original', error) @@ -57,10 +57,6 @@ class CommandErrorHandler(commands.Cog): if isinstance(error, commands.CommandNotFound): return - if isinstance(error, BrandingError): - await ctx.send(embed=self.error_embed(str(error))) - return - if isinstance(error, (InChannelCheckFailure, InMonthCheckFailure)): await ctx.send(embed=self.error_embed(str(error), NEGATIVE_REPLIES), delete_after=7.5) return diff --git a/bot/exts/evergreen/fun.py b/bot/exts/evergreen/fun.py index de6a92c6..101725da 100644 --- a/bot/exts/evergreen/fun.py +++ b/bot/exts/evergreen/fun.py @@ -7,10 +7,10 @@ from typing import Callable, Iterable, Tuple, Union from discord import Embed, Message from discord.ext import commands -from discord.ext.commands import Bot, Cog, Context, MessageConverter, clean_content +from discord.ext.commands import BadArgument, Bot, Cog, Context, MessageConverter, clean_content from bot import utils -from bot.constants import Colours, Emojis +from bot.constants import Client, Colours, Emojis log = logging.getLogger(__name__) @@ -57,18 +57,20 @@ class Fun(Cog): with Path("bot/resources/evergreen/caesar_info.json").open("r", encoding="UTF-8") as f: self._caesar_cipher_embed = json.load(f) + @staticmethod + def _get_random_die() -> str: + """Generate a random die emoji, ready to be sent on Discord.""" + die_name = f"dice_{random.randint(1, 6)}" + return getattr(Emojis, die_name) + @commands.command() async def roll(self, ctx: Context, num_rolls: int = 1) -> None: """Outputs a number of random dice emotes (up to 6).""" - output = "" - if num_rolls > 6: - num_rolls = 6 - elif num_rolls < 1: - output = ":no_entry: You must roll at least once." - for _ in range(num_rolls): - dice = f"dice_{random.randint(1, 6)}" - output += getattr(Emojis, dice, '') - await ctx.send(output) + if 1 <= num_rolls <= 6: + dice = " ".join(self._get_random_die() for _ in range(num_rolls)) + await ctx.send(dice) + else: + raise BadArgument(f"`{Client.prefix}roll` only supports between 1 and 6 rolls.") @commands.command(name="uwu", aliases=("uwuwize", "uwuify",)) async def uwu_command(self, ctx: Context, *, text: clean_content(fix_channel_mentions=True)) -> None: diff --git a/bot/exts/evergreen/game.py b/bot/exts/evergreen/game.py index 3c8b2725..d0fd7a40 100644 --- a/bot/exts/evergreen/game.py +++ b/bot/exts/evergreen/game.py @@ -11,7 +11,7 @@ from discord import Embed from discord.ext import tasks from discord.ext.commands import Cog, Context, group -from bot.bot import SeasonalBot +from bot.bot import Bot from bot.constants import STAFF_ROLES, Tokens from bot.utils.decorators import with_role from bot.utils.pagination import ImagePaginator, LinePaginator @@ -130,7 +130,7 @@ class AgeRatings(IntEnum): class Games(Cog): """Games Cog contains commands that collect data from IGDB.""" - def __init__(self, bot: SeasonalBot): + def __init__(self, bot: Bot): self.bot = bot self.http_session: ClientSession = bot.http_session @@ -415,7 +415,7 @@ class Games(Cog): return sorted((item for item in results if item[0] >= 0.60), reverse=True)[:4] -def setup(bot: SeasonalBot) -> None: +def setup(bot: Bot) -> None: """Add/Load Games cog.""" # Check does IGDB API key exist, if not, log warning and don't load cog if not Tokens.igdb: diff --git a/bot/exts/evergreen/githubinfo.py b/bot/exts/evergreen/githubinfo.py new file mode 100644 index 00000000..2e38e3ab --- /dev/null +++ b/bot/exts/evergreen/githubinfo.py @@ -0,0 +1,98 @@ +import logging +import random +from datetime import datetime +from typing import Optional + +import discord +from discord.ext import commands +from discord.ext.commands.cooldowns import BucketType + +from bot.constants import NEGATIVE_REPLIES + +log = logging.getLogger(__name__) + + +class GithubInfo(commands.Cog): + """Fetches info from GitHub.""" + + def __init__(self, bot: commands.Bot): + self.bot = bot + + async def fetch_data(self, url: str) -> dict: + """Retrieve data as a dictionary.""" + async with self.bot.http_session.get(url) as r: + return await r.json() + + @commands.command(name='github', aliases=['gh']) + @commands.cooldown(1, 60, BucketType.user) + async def get_github_info(self, ctx: commands.Context, username: Optional[str]) -> None: + """ + Fetches a user's GitHub information. + + Username is optional and sends the help command if not specified. + """ + if username is None: + await ctx.invoke(self.bot.get_command('help'), 'github') + ctx.command.reset_cooldown(ctx) + return + + async with ctx.typing(): + user_data = await self.fetch_data(f"https://api.github.com/users/{username}") + + # User_data will not have a message key if the user exists + if user_data.get('message') is not None: + await ctx.send(embed=discord.Embed(title=random.choice(NEGATIVE_REPLIES), + description=f"The profile for `{username}` was not found.", + colour=discord.Colour.red())) + return + + org_data = await self.fetch_data(user_data['organizations_url']) + orgs = [f"[{org['login']}](https://github.com/{org['login']})" for org in org_data] + orgs_to_add = ' | '.join(orgs) + + gists = user_data['public_gists'] + + # Forming blog link + if user_data['blog'].startswith("http"): # Blog link is complete + blog = user_data['blog'] + elif user_data['blog']: # Blog exists but the link is not complete + blog = f"https://{user_data['blog']}" + else: + blog = "No website link available" + + embed = discord.Embed( + title=f"`{user_data['login']}`'s GitHub profile info", + description=f"```{user_data['bio']}```\n" if user_data['bio'] is not None else "", + colour=0x7289da, + url=user_data['html_url'], + timestamp=datetime.strptime(user_data['created_at'], "%Y-%m-%dT%H:%M:%SZ") + ) + embed.set_thumbnail(url=user_data['avatar_url']) + embed.set_footer(text="Account created at") + + if user_data['type'] == "User": + + embed.add_field(name="Followers", + value=f"[{user_data['followers']}]({user_data['html_url']}?tab=followers)") + embed.add_field(name="\u200b", value="\u200b") + embed.add_field(name="Following", + value=f"[{user_data['following']}]({user_data['html_url']}?tab=following)") + + embed.add_field(name="Public repos", + value=f"[{user_data['public_repos']}]({user_data['html_url']}?tab=repositories)") + embed.add_field(name="\u200b", value="\u200b") + + if user_data['type'] == "User": + embed.add_field(name="Gists", value=f"[{gists}](https://gist.github.com/{username})") + + embed.add_field(name=f"Organization{'s' if len(orgs)!=1 else ''}", + value=orgs_to_add if orgs else "No organizations") + embed.add_field(name="\u200b", value="\u200b") + embed.add_field(name="Website", value=blog) + + await ctx.send(embed=embed) + + +def setup(bot: commands.Bot) -> None: + """Adding the cog to the bot.""" + bot.add_cog(GithubInfo(bot)) diff --git a/bot/exts/evergreen/help.py b/bot/exts/evergreen/help.py index ccd76d76..91147243 100644 --- a/bot/exts/evergreen/help.py +++ b/bot/exts/evergreen/help.py @@ -12,7 +12,7 @@ from discord.ext.commands import CheckFailure, Cog as DiscordCog, Command, Conte from fuzzywuzzy import fuzz, process from bot import constants -from bot.bot import SeasonalBot +from bot.bot import Bot from bot.constants import Emojis from bot.utils.pagination import ( FIRST_EMOJI, LAST_EMOJI, @@ -511,7 +511,7 @@ class Help(DiscordCog): await ctx.send(embed=embed) -def unload(bot: SeasonalBot) -> None: +def unload(bot: Bot) -> None: """ Reinstates the original help command. @@ -521,7 +521,7 @@ def unload(bot: SeasonalBot) -> None: bot.add_command(bot._old_help) -def setup(bot: SeasonalBot) -> None: +def setup(bot: Bot) -> None: """ The setup for the help extension. @@ -542,7 +542,7 @@ def setup(bot: SeasonalBot) -> None: raise -def teardown(bot: SeasonalBot) -> None: +def teardown(bot: Bot) -> None: """ The teardown for the help extension. diff --git a/bot/exts/evergreen/issues.py b/bot/exts/evergreen/issues.py index 5a5c82e7..e419a6f5 100644 --- a/bot/exts/evergreen/issues.py +++ b/bot/exts/evergreen/issues.py @@ -33,12 +33,12 @@ class Issues(commands.Cog): self, ctx: commands.Context, numbers: commands.Greedy[int], - repository: str = "seasonalbot", + repository: str = "sir-lancebot", user: str = "python-discord" ) -> None: """Command to retrieve issue(s) from a GitHub repository.""" links = [] - numbers = set(numbers) + numbers = set(numbers) # Convert from list to set to remove duplicates, if any if not numbers: await ctx.invoke(self.bot.get_command('help'), 'issue') @@ -53,8 +53,7 @@ class Issues(commands.Cog): await ctx.send(embed=embed) return - for number in set(numbers): - # Convert from list to set to remove duplicates, if any. + for number in numbers: url = f"https://api.github.com/repos/{user}/{repository}/issues/{number}" merge_url = f"https://api.github.com/repos/{user}/{repository}/pulls/{number}/merge" diff --git a/bot/exts/evergreen/minesweeper.py b/bot/exts/evergreen/minesweeper.py index 3e40f493..286ac7a5 100644 --- a/bot/exts/evergreen/minesweeper.py +++ b/bot/exts/evergreen/minesweeper.py @@ -120,14 +120,14 @@ class Minesweeper(commands.Cog): def format_for_discord(board: GameBoard) -> str: """Format the board as a string for Discord.""" discord_msg = ( - ":stop_button: :regional_indicator_a::regional_indicator_b::regional_indicator_c:" - ":regional_indicator_d::regional_indicator_e::regional_indicator_f::regional_indicator_g:" - ":regional_indicator_h::regional_indicator_i::regional_indicator_j:\n\n" + ":stop_button: :regional_indicator_a: :regional_indicator_b: :regional_indicator_c: " + ":regional_indicator_d: :regional_indicator_e: :regional_indicator_f: :regional_indicator_g: " + ":regional_indicator_h: :regional_indicator_i: :regional_indicator_j:\n\n" ) rows = [] for row_number, row in enumerate(board): new_row = f"{MESSAGE_MAPPING[row_number + 1]} " - new_row += "".join(MESSAGE_MAPPING[cell] for cell in row) + new_row += " ".join(MESSAGE_MAPPING[cell] for cell in row) rows.append(new_row) discord_msg += "\n".join(rows) @@ -158,7 +158,7 @@ class Minesweeper(commands.Cog): if ctx.guild: await ctx.send(f"{ctx.author.mention} is playing Minesweeper") - chat_msg = await ctx.send(f"Here's there board!\n{self.format_for_discord(revealed_board)}") + chat_msg = await ctx.send(f"Here's their board!\n{self.format_for_discord(revealed_board)}") else: chat_msg = None @@ -176,7 +176,7 @@ class Minesweeper(commands.Cog): await game.dm_msg.delete() game.dm_msg = await ctx.author.send(f"Here's your board!\n{self.format_for_discord(game.revealed)}") if game.activated_on_server: - await game.chat_msg.edit(content=f"Here's there board!\n{self.format_for_discord(game.revealed)}") + await game.chat_msg.edit(content=f"Here's their board!\n{self.format_for_discord(game.revealed)}") @commands.dm_only() @minesweeper_group.command(name="flag") diff --git a/bot/exts/evergreen/movie.py b/bot/exts/evergreen/movie.py index 93aeef30..340a5724 100644 --- a/bot/exts/evergreen/movie.py +++ b/bot/exts/evergreen/movie.py @@ -190,7 +190,10 @@ class Movie(Cog): async def get_embed(self, name: str) -> Embed: """Return embed of random movies. Uses name in title.""" - return Embed(title=f'Random {name} Movies').set_footer(text='Powered by TMDB (themoviedb.org)') + embed = Embed(title=f"Random {name} Movies") + embed.set_footer(text="This product uses the TMDb API but is not endorsed or certified by TMDb.") + embed.set_thumbnail(url="https://i.imgur.com/LtFtC8H.png") + return embed def setup(bot: Bot) -> None: diff --git a/bot/exts/evergreen/showprojects.py b/bot/exts/evergreen/showprojects.py deleted file mode 100644 index 328a7aa5..00000000 --- a/bot/exts/evergreen/showprojects.py +++ /dev/null @@ -1,33 +0,0 @@ -import logging - -from discord import Message -from discord.ext import commands - -from bot.constants import Channels - -log = logging.getLogger(__name__) - - -class ShowProjects(commands.Cog): - """Cog that reacts to posts in the #show-your-projects.""" - - def __init__(self, bot: commands.Bot): - self.bot = bot - self.lastPoster = 0 # Given 0 as the default last poster ID as no user can actually have 0 assigned to them - - @commands.Cog.listener() - async def on_message(self, message: Message) -> None: - """Adds reactions to posts in #show-your-projects.""" - reactions = ["\U0001f44d", "\U00002764", "\U0001f440", "\U0001f389", "\U0001f680", "\U00002b50", "\U0001f6a9"] - if (message.channel.id == Channels.show_your_projects - and message.author.bot is False - and message.author.id != self.lastPoster): - for reaction in reactions: - await message.add_reaction(reaction) - - self.lastPoster = message.author.id - - -def setup(bot: commands.Bot) -> None: - """Show Projects Reaction Cog.""" - bot.add_cog(ShowProjects(bot)) diff --git a/bot/exts/evergreen/snakes/_snakes_cog.py b/bot/exts/evergreen/snakes/_snakes_cog.py index a846274b..d5e4f206 100644 --- a/bot/exts/evergreen/snakes/_snakes_cog.py +++ b/bot/exts/evergreen/snakes/_snakes_cog.py @@ -15,7 +15,8 @@ import aiohttp import async_timeout from PIL import Image, ImageDraw, ImageFont from discord import Colour, Embed, File, Member, Message, Reaction -from discord.ext.commands import BadArgument, Bot, Cog, CommandError, Context, bot_has_permissions, group +from discord.errors import HTTPException +from discord.ext.commands import Bot, Cog, CommandError, Context, bot_has_permissions, group from bot.constants import ERROR_REPLIES, Tokens from bot.exts.evergreen.snakes import _utils as utils @@ -151,6 +152,7 @@ class Snakes(Cog): self.snake_idioms = utils.get_resource("snake_idioms") self.snake_quizzes = utils.get_resource("snake_quiz") self.snake_facts = utils.get_resource("snake_facts") + self.num_movie_pages = None # region: Helper methods @staticmethod @@ -739,71 +741,68 @@ class Snakes(Cog): @snakes_group.command(name='movie') async def movie_command(self, ctx: Context) -> None: """ - Gets a random snake-related movie from OMDB. + Gets a random snake-related movie from TMDB. Written by Samuel. Modified by gdude. + Modified by Will Da Silva. """ - url = "http://www.omdbapi.com/" - page = random.randint(1, 27) + # Initially 8 pages are fetched. The actual number of pages is set after the first request. + page = random.randint(1, self.num_movie_pages or 8) - response = await self.bot.http_session.get( - url, - params={ - "s": "snake", - "page": page, - "type": "movie", - "apikey": Tokens.omdb - } - ) - data = await response.json() - movie = random.choice(data["Search"])["imdbID"] - - response = await self.bot.http_session.get( - url, - params={ - "i": movie, - "apikey": Tokens.omdb - } - ) - data = await response.json() - - embed = Embed( - title=data["Title"], - color=SNAKE_COLOR - ) - - del data["Response"], data["imdbID"], data["Title"] - - for key, value in data.items(): - if not value or value == "N/A" or key in ("Response", "imdbID", "Title", "Type"): - continue + async with ctx.typing(): + response = await self.bot.http_session.get( + "https://api.themoviedb.org/3/search/movie", + params={ + "query": "snake", + "page": page, + "language": "en-US", + "api_key": Tokens.tmdb, + } + ) + data = await response.json() + if self.num_movie_pages is None: + self.num_movie_pages = data["total_pages"] + movie = random.choice(data["results"])["id"] + + response = await self.bot.http_session.get( + f"https://api.themoviedb.org/3/movie/{movie}", + params={ + "language": "en-US", + "api_key": Tokens.tmdb, + } + ) + data = await response.json() - if key == "Ratings": # [{'Source': 'Internet Movie Database', 'Value': '7.6/10'}] - rating = random.choice(value) + embed = Embed(title=data["title"], color=SNAKE_COLOR) - if rating["Source"] != "Internet Movie Database": - embed.add_field(name=f"Rating: {rating['Source']}", value=rating["Value"]) + if data["poster_path"] is not None: + embed.set_image(url=f"https://images.tmdb.org/t/p/original{data['poster_path']}") - continue + if data["overview"]: + embed.add_field(name="Overview", value=data["overview"]) - if key == "Poster": - embed.set_image(url=value) - continue + if data["release_date"]: + embed.add_field(name="Release Date", value=data["release_date"]) - elif key == "imdbRating": - key = "IMDB Rating" + if data["genres"]: + embed.add_field(name="Genres", value=", ".join([x["name"] for x in data["genres"]])) - elif key == "imdbVotes": - key = "IMDB Votes" + if data["vote_count"]: + embed.add_field(name="Rating", value=f"{data['vote_average']}/10 ({data['vote_count']} votes)", inline=True) - embed.add_field(name=key, value=value, inline=True) + if data["budget"] and data["revenue"]: + embed.add_field(name="Budget", value=data["budget"], inline=True) + embed.add_field(name="Revenue", value=data["revenue"], inline=True) - embed.set_footer(text="Data provided by the OMDB API") + embed.set_footer(text="This product uses the TMDb API but is not endorsed or certified by TMDb.") + embed.set_thumbnail(url="https://i.imgur.com/LtFtC8H.png") - await ctx.channel.send( - embed=embed - ) + try: + await ctx.channel.send(embed=embed) + except HTTPException as err: + await ctx.channel.send("An error occurred while fetching a snake-related movie!") + raise err from None @snakes_group.command(name='quiz') @locked() @@ -1083,13 +1082,13 @@ class Snakes(Cog): url, params={ "part": "snippet", - "q": urllib.parse.quote(query), + "q": urllib.parse.quote_plus(query), "type": "video", "key": Tokens.youtube } ) response = await response.json() - data = response['items'] + data = response.get("items", []) # Send the user a video if len(data) > 0: @@ -1126,26 +1125,15 @@ class Snakes(Cog): # endregion # region: Error handlers - @get_command.error @card_command.error - @video_command.error async def command_error(self, ctx: Context, error: CommandError) -> None: """Local error handler for the Snake Cog.""" - embed = Embed() - embed.colour = Colour.red() - - if isinstance(error, BadArgument): - embed.description = str(error) - embed.title = random.choice(ERROR_REPLIES) - - elif isinstance(error, OSError): - log.error(f"snake_card encountered an OSError: {error} ({error.original})") + original_error = getattr(error, "original", None) + if isinstance(original_error, OSError): + error.handled = True + embed = Embed() + embed.colour = Colour.red() + log.error(f"snake_card encountered an OSError: {error} ({original_error})") embed.description = "Could not generate the snake card! Please try again." embed.title = random.choice(ERROR_REPLIES) - - else: - log.error(f"Unhandled tag command error: {error} ({error.original})") - return - - await ctx.send(embed=embed) - # endregion + await ctx.send(embed=embed) diff --git a/bot/exts/evergreen/source.py b/bot/exts/evergreen/source.py index 0725714f..cdfe54ec 100644 --- a/bot/exts/evergreen/source.py +++ b/bot/exts/evergreen/source.py @@ -38,7 +38,7 @@ class BotSource(commands.Cog): async def source_command(self, ctx: commands.Context, *, source_item: SourceConverter = None) -> None: """Display information and a GitHub link to the source code of a command, tag, or cog.""" if not source_item: - embed = Embed(title="Seasonal Bot's GitHub Repository") + embed = Embed(title="Sir Lancebot's GitHub Repository") embed.add_field(name="Repository", value=f"[Go to GitHub]({Source.github})") embed.set_thumbnail(url=Source.github_avatar_url) await ctx.send(embed=embed) diff --git a/bot/exts/evergreen/space.py b/bot/exts/evergreen/space.py index 3587fc00..bc8e3118 100644 --- a/bot/exts/evergreen/space.py +++ b/bot/exts/evergreen/space.py @@ -8,7 +8,7 @@ from discord import Embed from discord.ext import tasks from discord.ext.commands import BadArgument, Cog, Context, Converter, group -from bot.bot import SeasonalBot +from bot.bot import Bot from bot.constants import Tokens logger = logging.getLogger(__name__) @@ -37,7 +37,7 @@ class DateConverter(Converter): class Space(Cog): """Space Cog contains commands, that show images, facts or other information about space.""" - def __init__(self, bot: SeasonalBot): + def __init__(self, bot: Bot): self.bot = bot self.http_session = bot.http_session @@ -240,7 +240,7 @@ class Space(Cog): ).set_image(url=image).set_footer(text="Powered by NASA API" + footer) -def setup(bot: SeasonalBot) -> None: +def setup(bot: Bot) -> None: """Load Space Cog.""" if not Tokens.nasa: logger.warning("Can't find NASA API key. Not loading Space Cog.") diff --git a/bot/exts/evergreen/trivia_quiz.py b/bot/exts/evergreen/trivia_quiz.py index 8dceceac..fe692c2a 100644 --- a/bot/exts/evergreen/trivia_quiz.py +++ b/bot/exts/evergreen/trivia_quiz.py @@ -121,8 +121,10 @@ class TriviaQuiz(commands.Cog): # A function to check whether user input is the correct answer(close to the right answer) def check(m: discord.Message) -> bool: - ratio = fuzz.ratio(answer.lower(), m.content.lower()) - return ratio > 85 and m.channel == ctx.channel + return ( + m.channel == ctx.channel + and fuzz.ratio(answer.lower(), m.content.lower()) > 85 + ) try: msg = await self.bot.wait_for('message', check=check, timeout=10) diff --git a/bot/exts/evergreen/wonder_twins.py b/bot/exts/evergreen/wonder_twins.py new file mode 100644 index 00000000..afc5346e --- /dev/null +++ b/bot/exts/evergreen/wonder_twins.py @@ -0,0 +1,49 @@ +import random +from pathlib import Path + +import yaml +from discord.ext.commands import Bot, Cog, Context, command + + +class WonderTwins(Cog): + """Cog for a Wonder Twins inspired command.""" + + def __init__(self, bot: Bot): + self.bot = bot + + with open(Path.cwd() / "bot" / "resources" / "evergreen" / "wonder_twins.yaml", "r", encoding="utf-8") as f: + info = yaml.load(f, Loader=yaml.FullLoader) + self.water_types = info["water_types"] + self.objects = info["objects"] + self.adjectives = info["adjectives"] + + @staticmethod + def append_onto(phrase: str, insert_word: str) -> str: + """Appends one word onto the end of another phrase in order to format with the proper determiner.""" + if insert_word.endswith("s"): + phrase = phrase.split() + del phrase[0] + phrase = " ".join(phrase) + + insert_word = insert_word.split()[-1] + return " ".join([phrase, insert_word]) + + def format_phrase(self) -> str: + """Creates a transformation phrase from available words.""" + adjective = random.choice((None, random.choice(self.adjectives))) + object_name = random.choice(self.objects) + water_type = random.choice(self.water_types) + + if adjective: + object_name = self.append_onto(adjective, object_name) + return f"{object_name} of {water_type}" + + @command(name="formof", aliases=["wondertwins", "wondertwin", "fo"]) + async def form_of(self, ctx: Context) -> None: + """Command to send a Wonder Twins inspired phrase to the user invoking the command.""" + await ctx.send(f"Form of {self.format_phrase()}!") + + +def setup(bot: Bot) -> None: + """Load the WonderTwins cog.""" + bot.add_cog(WonderTwins(bot)) diff --git a/bot/exts/halloween/candy_collection.py b/bot/exts/halloween/candy_collection.py index caf0df11..0cb37ecd 100644 --- a/bot/exts/halloween/candy_collection.py +++ b/bot/exts/halloween/candy_collection.py @@ -1,11 +1,9 @@ -import functools -import json import logging -import os import random -from typing import List, Union +from typing import Union import discord +from async_rediscache import RedisCache from discord.ext import commands from bot.constants import Channels, Month @@ -13,27 +11,37 @@ from bot.utils.decorators import in_month log = logging.getLogger(__name__) -json_location = os.path.join("bot", "resources", "halloween", "candy_collection.json") - # chance is 1 in x range, so 1 in 20 range would give 5% chance (for add candy) ADD_CANDY_REACTION_CHANCE = 20 # 5% ADD_CANDY_EXISTING_REACTION_CHANCE = 10 # 10% ADD_SKULL_REACTION_CHANCE = 50 # 2% ADD_SKULL_EXISTING_REACTION_CHANCE = 20 # 5% +EMOJIS = dict( + CANDY="\N{CANDY}", + SKULL="\N{SKULL}", + MEDALS=( + '\N{FIRST PLACE MEDAL}', + '\N{SECOND PLACE MEDAL}', + '\N{THIRD PLACE MEDAL}', + '\N{SPORTS MEDAL}', + '\N{SPORTS MEDAL}', + ), +) + class CandyCollection(commands.Cog): """Candy collection game Cog.""" + # User candy amount records + candy_records = RedisCache() + + # Candy and skull messages mapping + candy_messages = RedisCache() + skull_messages = RedisCache() + def __init__(self, bot: commands.Bot): self.bot = bot - with open(json_location, encoding="utf8") as candy: - self.candy_json = json.load(candy) - self.msg_reacted = self.candy_json['msg_reacted'] - self.get_candyinfo = dict() - for userinfo in self.candy_json['records']: - userid = userinfo['userid'] - self.get_candyinfo[userid] = userinfo @in_month(Month.OCTOBER) @commands.Cog.listener() @@ -43,19 +51,17 @@ class CandyCollection(commands.Cog): if message.author.bot: return # ensure it's hacktober channel - if message.channel.id != Channels.seasonalbot_commands: + if message.channel.id != Channels.community_bot_commands: return # do random check for skull first as it has the lower chance if random.randint(1, ADD_SKULL_REACTION_CHANCE) == 1: - d = {"reaction": '\N{SKULL}', "msg_id": message.id, "won": False} - self.msg_reacted.append(d) - return await message.add_reaction('\N{SKULL}') + await self.skull_messages.set(message.id, "skull") + return await message.add_reaction(EMOJIS['SKULL']) # check for the candy chance next if random.randint(1, ADD_CANDY_REACTION_CHANCE) == 1: - d = {"reaction": '\N{CANDY}', "msg_id": message.id, "won": False} - self.msg_reacted.append(d) - return await message.add_reaction('\N{CANDY}') + await self.candy_messages.set(message.id, "candy") + return await message.add_reaction(EMOJIS['CANDY']) @in_month(Month.OCTOBER) @commands.Cog.listener() @@ -67,43 +73,44 @@ class CandyCollection(commands.Cog): return # check to ensure it is in correct channel - if message.channel.id != Channels.seasonalbot_commands: + if message.channel.id != Channels.community_bot_commands: return # if its not a candy or skull, and it is one of 10 most recent messages, # proceed to add a skull/candy with higher chance - if str(reaction.emoji) not in ('\N{SKULL}', '\N{CANDY}'): - if message.id in await self.ten_recent_msg(): + if str(reaction.emoji) not in (EMOJIS['SKULL'], EMOJIS['CANDY']): + recent_message_ids = map( + lambda m: m.id, + await self.hacktober_channel.history(limit=10).flatten() + ) + if message.id in recent_message_ids: await self.reacted_msg_chance(message) return - for react in self.msg_reacted: - # check to see if the message id of a message we added a - # reaction to is in json file, and if nobody has won/claimed it yet - if react['msg_id'] == message.id and react['won'] is False: - react['user_reacted'] = user.id - react['won'] = True - try: - # if they have record/candies in json already it will do this - user_records = self.get_candyinfo[user.id] - if str(reaction.emoji) == '\N{CANDY}': - user_records['record'] += 1 - if str(reaction.emoji) == '\N{SKULL}': - if user_records['record'] <= 3: - user_records['record'] = 0 - lost = 'all of your' - else: - lost = random.randint(1, 3) - user_records['record'] -= lost - await self.send_spook_msg(message.author, message.channel, lost) - - except KeyError: - # otherwise it will raise KeyError so we need to add them to file - if str(reaction.emoji) == '\N{CANDY}': - print('ok') - d = {"userid": user.id, "record": 1} - self.candy_json['records'].append(d) - await self.remove_reactions(reaction) + if await self.candy_messages.get(message.id) == "candy" and str(reaction.emoji) == EMOJIS['CANDY']: + await self.candy_messages.delete(message.id) + if await self.candy_records.contains(user.id): + await self.candy_records.increment(user.id) + else: + await self.candy_records.set(user.id, 1) + + elif await self.skull_messages.get(message.id) == "skull" and str(reaction.emoji) == EMOJIS['SKULL']: + await self.skull_messages.delete(message.id) + + if prev_record := await self.candy_records.get(user.id): + lost = min(random.randint(1, 3), prev_record) + await self.candy_records.decrement(user.id, lost) + + if lost == prev_record: + await CandyCollection.send_spook_msg(user, message.channel, 'all of your') + else: + await CandyCollection.send_spook_msg(user, message.channel, lost) + else: + await CandyCollection.send_no_candy_spook_message(user, message.channel) + else: + return # Skip saving + + await reaction.clear() async def reacted_msg_chance(self, message: discord.Message) -> None: """ @@ -113,109 +120,71 @@ class CandyCollection(commands.Cog): existing reaction. """ if random.randint(1, ADD_SKULL_EXISTING_REACTION_CHANCE) == 1: - d = {"reaction": '\N{SKULL}', "msg_id": message.id, "won": False} - self.msg_reacted.append(d) - return await message.add_reaction('\N{SKULL}') + await self.skull_messages.set(message.id, "skull") + return await message.add_reaction(EMOJIS['SKULL']) if random.randint(1, ADD_CANDY_EXISTING_REACTION_CHANCE) == 1: - d = {"reaction": '\N{CANDY}', "msg_id": message.id, "won": False} - self.msg_reacted.append(d) - return await message.add_reaction('\N{CANDY}') - - async def ten_recent_msg(self) -> List[int]: - """Get the last 10 messages sent in the channel.""" - ten_recent = [] - recent_msg_id = max( - message.id for message in self.bot._connection._messages - if message.channel.id == Channels.seasonalbot_commands - ) - - channel = await self.hacktober_channel() - ten_recent.append(recent_msg_id) - - for i in range(9): - o = discord.Object(id=recent_msg_id + i) - msg = await next(channel.history(limit=1, before=o)) - ten_recent.append(msg.id) + await self.candy_messages.set(message.id, "candy") + return await message.add_reaction(EMOJIS['CANDY']) - return ten_recent - - async def get_message(self, msg_id: int) -> Union[discord.Message, None]: - """Get the message from its ID.""" - try: - o = discord.Object(id=msg_id + 1) - # Use history rather than get_message due to - # poor ratelimit (50/1s vs 1/1s) - msg = await next(self.hacktober_channel.history(limit=1, before=o)) - - if msg.id != msg_id: - return None - - return msg - - except Exception: - return None - - async def hacktober_channel(self) -> discord.TextChannel: + @property + def hacktober_channel(self) -> discord.TextChannel: """Get #hacktoberbot channel from its ID.""" - return self.bot.get_channel(id=Channels.seasonalbot_commands) - - async def remove_reactions(self, reaction: discord.Reaction) -> None: - """Remove all candy/skull reactions.""" - try: - async for user in reaction.users(): - await reaction.message.remove_reaction(reaction.emoji, user) + return self.bot.get_channel(id=Channels.community_bot_commands) - except discord.HTTPException: - pass - - async def send_spook_msg(self, author: discord.Member, channel: discord.TextChannel, candies: int) -> None: + @staticmethod + async def send_spook_msg( + author: discord.Member, channel: discord.TextChannel, candies: Union[str, int] + ) -> None: """Send a spooky message.""" e = discord.Embed(colour=author.colour) e.set_author(name="Ghosts and Ghouls and Jack o' lanterns at night; " f"I took {candies} candies and quickly took flight.") await channel.send(embed=e) - def save_to_json(self) -> None: - """Save JSON to a local file.""" - with open(json_location, 'w', encoding="utf8") as outfile: - json.dump(self.candy_json, outfile) + @staticmethod + async def send_no_candy_spook_message( + author: discord.Member, + channel: discord.TextChannel + ) -> None: + """An alternative spooky message sent when user has no candies in the collection.""" + embed = discord.Embed(color=author.color) + embed.set_author(name="Ghosts and Ghouls and Jack o' lanterns at night; " + "I tried to take your candies but you had none to begin with!") + await channel.send(embed=embed) @in_month(Month.OCTOBER) @commands.command() async def candy(self, ctx: commands.Context) -> None: """Get the candy leaderboard and save to JSON.""" - # Use run_in_executor to prevent blocking - thing = functools.partial(self.save_to_json) - await self.bot.loop.run_in_executor(None, thing) - - emoji = ( - '\N{FIRST PLACE MEDAL}', - '\N{SECOND PLACE MEDAL}', - '\N{THIRD PLACE MEDAL}', - '\N{SPORTS MEDAL}', - '\N{SPORTS MEDAL}' - ) - - top_sorted = sorted(self.candy_json['records'], key=lambda k: k.get('record', 0), reverse=True) - top_five = top_sorted[:5] + records = await self.candy_records.items() - usersid = [] - records = [] - for record in top_five: - usersid.append(record['userid']) - records.append(record['record']) + def generate_leaderboard() -> str: + top_sorted = sorted( + ((user_id, score) for user_id, score in records if score > 0), + key=lambda x: x[1], + reverse=True + ) + top_five = top_sorted[:5] - value = '\n'.join(f'{emoji[index]} <@{usersid[index]}>: {records[index]}' - for index in range(0, len(usersid))) or 'No Candies' + return '\n'.join( + f"{EMOJIS['MEDALS'][index]} <@{record[0]}>: {record[1]}" + for index, record in enumerate(top_five) + ) if top_five else 'No Candies' e = discord.Embed(colour=discord.Colour.blurple()) - e.add_field(name="Top Candy Records", value=value, inline=False) - e.add_field(name='\u200b', - value="Candies will randomly appear on messages sent. " - "\nHit the candy when it appears as fast as possible to get the candy! " - "\nBut beware the ghosts...", - inline=False) + e.add_field( + name="Top Candy Records", + value=generate_leaderboard(), + inline=False + ) + e.add_field( + name='\u200b', + value="Candies will randomly appear on messages sent. " + "\nHit the candy when it appears as fast as possible to get the candy! " + "\nBut beware the ghosts...", + inline=False + ) await ctx.send(embed=e) diff --git a/bot/exts/halloween/hacktober-issue-finder.py b/bot/exts/halloween/hacktober-issue-finder.py index 78acf391..9deadde9 100644 --- a/bot/exts/halloween/hacktober-issue-finder.py +++ b/bot/exts/halloween/hacktober-issue-finder.py @@ -103,7 +103,7 @@ class HacktoberIssues(commands.Cog): labels = [label["name"] for label in issue["labels"]] embed = discord.Embed(title=title) - embed.description = body + embed.description = body[:500] + '...' if len(body) > 500 else body embed.add_field(name="labels", value="\n".join(labels)) embed.url = issue_url embed.set_footer(text=issue_url) diff --git a/bot/exts/halloween/hacktoberstats.py b/bot/exts/halloween/hacktoberstats.py index ed1755e3..84b75022 100644 --- a/bot/exts/halloween/hacktoberstats.py +++ b/bot/exts/halloween/hacktoberstats.py @@ -1,28 +1,30 @@ -import json import logging import re from collections import Counter -from datetime import datetime -from pathlib import Path -from typing import List, Tuple +from datetime import datetime, timedelta +from typing import List, Tuple, Union import aiohttp import discord +from async_rediscache import RedisCache from discord.ext import commands from bot.constants import Channels, Month, Tokens, WHITELISTED_CHANNELS from bot.utils.decorators import in_month, override_in_channel -from bot.utils.persist import make_persistent log = logging.getLogger(__name__) CURRENT_YEAR = datetime.now().year # Used to construct GH API query PRS_FOR_SHIRT = 4 # Minimum number of PRs before a shirt is awarded +REVIEW_DAYS = 14 # number of days needed after PR can be mature HACKTOBER_WHITELIST = WHITELISTED_CHANNELS + (Channels.hacktoberfest_2020,) REQUEST_HEADERS = {"User-Agent": "Python Discord Hacktoberbot"} +# using repo topics API during preview period requires an accept header +GITHUB_TOPICS_ACCEPT_HEADER = {"Accept": "application/vnd.github.mercy-preview+json"} if GITHUB_TOKEN := Tokens.github: REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}" + GITHUB_TOPICS_ACCEPT_HEADER["Authorization"] = f"token {GITHUB_TOKEN}" GITHUB_NONEXISTENT_USER_MESSAGE = ( "The listed users cannot be searched either because the users do not exist " @@ -33,10 +35,11 @@ GITHUB_NONEXISTENT_USER_MESSAGE = ( class HacktoberStats(commands.Cog): """Hacktoberfest statistics Cog.""" + # Stores mapping of user IDs and GitHub usernames + linked_accounts = RedisCache() + def __init__(self, bot: commands.Bot): self.bot = bot - self.link_json = make_persistent(Path("bot", "resources", "halloween", "github_links.json")) - self.linked_accounts = self.load_linked_users() @in_month(Month.SEPTEMBER, Month.OCTOBER, Month.NOVEMBER) @commands.group(name="hacktoberstats", aliases=("hackstats",), invoke_without_command=True) @@ -50,10 +53,10 @@ class HacktoberStats(commands.Cog): get that user's contributions """ if not github_username: - author_id, author_mention = HacktoberStats._author_mention_from_context(ctx) + author_id, author_mention = self._author_mention_from_context(ctx) - if str(author_id) in self.linked_accounts.keys(): - github_username = self.linked_accounts[author_id]["github_username"] + if await self.linked_accounts.contains(author_id): + github_username = await self.linked_accounts.get(author_id) logging.info(f"Getting stats for {author_id} linked GitHub account '{github_username}'") else: msg = ( @@ -73,30 +76,19 @@ class HacktoberStats(commands.Cog): """ Link the invoking user's Github github_username to their Discord ID. - Linked users are stored as a nested dict: - { - Discord_ID: { - "github_username": str - "date_added": datetime - } - } + Linked users are stored in Redis: User ID => GitHub Username. """ - author_id, author_mention = HacktoberStats._author_mention_from_context(ctx) + author_id, author_mention = self._author_mention_from_context(ctx) if github_username: - if str(author_id) in self.linked_accounts.keys(): - old_username = self.linked_accounts[author_id]["github_username"] + if await self.linked_accounts.contains(author_id): + old_username = await self.linked_accounts.get(author_id) logging.info(f"{author_id} has changed their github link from '{old_username}' to '{github_username}'") await ctx.send(f"{author_mention}, your GitHub username has been updated to: '{github_username}'") else: logging.info(f"{author_id} has added a github link to '{github_username}'") await ctx.send(f"{author_mention}, your GitHub username has been added") - self.linked_accounts[author_id] = { - "github_username": github_username, - "date_added": datetime.now() - } - - self.save_linked_users() + await self.linked_accounts.set(author_id, github_username) else: logging.info(f"{author_id} tried to link a GitHub account but didn't provide a username") await ctx.send(f"{author_mention}, a GitHub username is required to link your account") @@ -106,9 +98,9 @@ class HacktoberStats(commands.Cog): @override_in_channel(HACKTOBER_WHITELIST) async def unlink_user(self, ctx: commands.Context) -> None: """Remove the invoking user's account link from the log.""" - author_id, author_mention = HacktoberStats._author_mention_from_context(ctx) + author_id, author_mention = self._author_mention_from_context(ctx) - stored_user = self.linked_accounts.pop(author_id, None) + stored_user = await self.linked_accounts.pop(author_id, None) if stored_user: await ctx.send(f"{author_mention}, your GitHub profile has been unlinked") logging.info(f"{author_id} has unlinked their GitHub account") @@ -116,53 +108,15 @@ class HacktoberStats(commands.Cog): await ctx.send(f"{author_mention}, you do not currently have a linked GitHub account") logging.info(f"{author_id} tried to unlink their GitHub account but no account was linked") - self.save_linked_users() - - def load_linked_users(self) -> dict: - """ - Load list of linked users from local JSON file. - - Linked users are stored as a nested dict: - { - Discord_ID: { - "github_username": str - "date_added": datetime - } - } - """ - if self.link_json.exists(): - logging.info(f"Loading linked GitHub accounts from '{self.link_json}'") - with open(self.link_json, 'r', encoding="utf8") as file: - linked_accounts = json.load(file) - - logging.info(f"Loaded {len(linked_accounts)} linked GitHub accounts from '{self.link_json}'") - return linked_accounts - else: - logging.info(f"Linked account log: '{self.link_json}' does not exist") - return {} - - def save_linked_users(self) -> None: - """ - Save list of linked users to local JSON file. - - Linked users are stored as a nested dict: - { - Discord_ID: { - "github_username": str - "date_added": datetime - } - } - """ - logging.info(f"Saving linked_accounts to '{self.link_json}'") - with open(self.link_json, 'w', encoding="utf8") as file: - json.dump(self.linked_accounts, file, default=str) - logging.info(f"linked_accounts saved to '{self.link_json}'") - async def get_stats(self, ctx: commands.Context, github_username: str) -> None: """ Query GitHub's API for PRs created by a GitHub user during the month of October. - PRs with the 'invalid' tag are ignored + PRs with an 'invalid' or 'spam' label are ignored + + For PRs created after October 3rd, they have to be in a repository that has a + 'hacktoberfest' topic, unless the PR is labelled 'hacktoberfest-accepted' for it + to count. If a valid github_username is provided, an embed is generated and posted to the channel @@ -172,19 +126,19 @@ class HacktoberStats(commands.Cog): prs = await self.get_october_prs(github_username) if prs: - stats_embed = self.build_embed(github_username, prs) + stats_embed = await self.build_embed(github_username, prs) await ctx.send('Here are some stats!', embed=stats_embed) else: - await ctx.send(f"No October GitHub contributions found for '{github_username}'") + await ctx.send(f"No valid October GitHub contributions found for '{github_username}'") - def build_embed(self, github_username: str, prs: List[dict]) -> discord.Embed: + async def build_embed(self, github_username: str, prs: List[dict]) -> discord.Embed: """Return a stats embed built from github_username's PRs.""" logging.info(f"Building Hacktoberfest embed for GitHub user: '{github_username}'") - pr_stats = self._summarize_prs(prs) + in_review, accepted = await self._categorize_prs(prs) - n = pr_stats['n_prs'] + n = len(accepted) + len(in_review) # total number of PRs if n >= PRS_FOR_SHIRT: - shirtstr = f"**{github_username} has earned a T-shirt or a tree!**" + shirtstr = f"**{github_username} is eligible for a T-shirt or a tree!**" elif n == PRS_FOR_SHIRT - 1: shirtstr = f"**{github_username} is 1 PR away from a T-shirt or a tree!**" else: @@ -194,8 +148,8 @@ class HacktoberStats(commands.Cog): title=f"{github_username}'s Hacktoberfest", color=discord.Color(0x9c4af7), description=( - f"{github_username} has made {n} " - f"{HacktoberStats._contributionator(n)} in " + f"{github_username} has made {n} valid " + f"{self._contributionator(n)} in " f"October\n\n" f"{shirtstr}\n\n" ) @@ -207,54 +161,64 @@ class HacktoberStats(commands.Cog): url="https://hacktoberfest.digitalocean.com", icon_url="https://avatars1.githubusercontent.com/u/35706162?s=200&v=4" ) + + # this will handle when no PRs in_review or accepted + review_str = self._build_prs_string(in_review, github_username) or "None" + accepted_str = self._build_prs_string(accepted, github_username) or "None" stats_embed.add_field( - name="Top 5 Repositories:", - value=self._build_top5str(pr_stats) + name=":clock1: In Review", + value=review_str + ) + stats_embed.add_field( + name=":tada: Accepted", + value=accepted_str ) logging.info(f"Hacktoberfest PR built for GitHub user '{github_username}'") return stats_embed @staticmethod - async def get_october_prs(github_username: str) -> List[dict]: + async def get_october_prs(github_username: str) -> Union[List[dict], None]: """ Query GitHub's API for PRs created during the month of October by github_username. - PRs with an 'invalid' tag are ignored + PRs with an 'invalid' or 'spam' label are ignored unless it is merged or approved + + For PRs created after October 3rd, they have to be in a repository that has a + 'hacktoberfest' topic, unless the PR is labelled 'hacktoberfest-accepted' for it + to count. If PRs are found, return a list of dicts with basic PR information For each PR: - { + { "repo_url": str - "repo_shortname": str (e.g. "python-discord/seasonalbot") + "repo_shortname": str (e.g. "python-discord/sir-lancebot") "created_at": datetime.datetime - } + "number": int + } Otherwise, return None """ - logging.info(f"Generating Hacktoberfest PR query for GitHub user: '{github_username}'") + logging.info(f"Fetching Hacktoberfest Stats for GitHub user: '{github_username}'") base_url = "https://api.github.com/search/issues?q=" - not_label = "invalid" action_type = "pr" - is_query = f"public+author:{github_username}" + is_query = "public" not_query = "draft" - date_range = f"{CURRENT_YEAR}-10-01T00:00:00%2B14:00..{CURRENT_YEAR}-10-31T23:59:59-11:00" + date_range = f"{CURRENT_YEAR}-09-30T10:00Z..{CURRENT_YEAR}-11-01T12:00Z" per_page = "300" query_url = ( f"{base_url}" - f"-label:{not_label}" f"+type:{action_type}" f"+is:{is_query}" + f"+author:{github_username}" f"+-is:{not_query}" f"+created:{date_range}" f"&per_page={per_page}" ) + logging.debug(f"GitHub query URL generated: {query_url}") - async with aiohttp.ClientSession() as session: - async with session.get(query_url, headers=REQUEST_HEADERS) as resp: - jsonresp = await resp.json() - + jsonresp = await HacktoberStats._fetch_url(query_url, REQUEST_HEADERS) if "message" in jsonresp.keys(): # One of the parameters is invalid, short circuit for now api_message = jsonresp["errors"][0]["message"] @@ -264,75 +228,193 @@ class HacktoberStats(commands.Cog): logging.debug(f"No GitHub user found named '{github_username}'") else: logging.error(f"GitHub API request for '{github_username}' failed with message: {api_message}") + return + if jsonresp["total_count"] == 0: + # Short circuit if there aren't any PRs + logging.info(f"No Hacktoberfest PRs found for GitHub user: '{github_username}'") return - else: - if jsonresp["total_count"] == 0: - # Short circuit if there aren't any PRs - logging.info(f"No Hacktoberfest PRs found for GitHub user: '{github_username}'") - return - else: - logging.info(f"Found {len(jsonresp['items'])} Hacktoberfest PRs for GitHub user: '{github_username}'") - outlist = [] - for item in jsonresp["items"]: - shortname = HacktoberStats._get_shortname(item["repository_url"]) - itemdict = { - "repo_url": f"https://www.github.com/{shortname}", - "repo_shortname": shortname, - "created_at": datetime.strptime( - item["created_at"], r"%Y-%m-%dT%H:%M:%SZ" - ), - } + logging.info(f"Found {len(jsonresp['items'])} Hacktoberfest PRs for GitHub user: '{github_username}'") + outlist = [] # list of pr information dicts that will get returned + oct3 = datetime(int(CURRENT_YEAR), 10, 3, 23, 59, 59, tzinfo=None) + hackto_topics = {} # cache whether each repo has the appropriate topic (bool values) + for item in jsonresp["items"]: + shortname = HacktoberStats._get_shortname(item["repository_url"]) + itemdict = { + "repo_url": f"https://www.github.com/{shortname}", + "repo_shortname": shortname, + "created_at": datetime.strptime( + item["created_at"], r"%Y-%m-%dT%H:%M:%SZ" + ), + "number": item["number"] + } + + # if the PR has 'invalid' or 'spam' labels, the PR must be + # either merged or approved for it to be included + if HacktoberStats._has_label(item, ["invalid", "spam"]): + if not await HacktoberStats._is_accepted(itemdict): + continue + + # PRs before oct 3 no need to check for topics + # continue the loop if 'hacktoberfest-accepted' is labelled then + # there is no need to check for its topics + if itemdict["created_at"] < oct3: + outlist.append(itemdict) + continue + + # checking PR's labels for "hacktoberfest-accepted" + if HacktoberStats._has_label(item, "hacktoberfest-accepted"): + outlist.append(itemdict) + continue + + # no need to query github if repo topics are fetched before already + if shortname in hackto_topics.keys(): + if hackto_topics[shortname]: outlist.append(itemdict) - return outlist + continue + # fetch topics for the pr repo + topics_query_url = f"https://api.github.com/repos/{shortname}/topics" + logging.debug(f"Fetching repo topics for {shortname} with url: {topics_query_url}") + jsonresp2 = await HacktoberStats._fetch_url(topics_query_url, GITHUB_TOPICS_ACCEPT_HEADER) + if jsonresp2.get("names") is None: + logging.error(f"Error fetching topics for {shortname}: {jsonresp2['message']}") + return + + # PRs after oct 3 that doesn't have 'hacktoberfest-accepted' label + # must be in repo with 'hacktoberfest' topic + if "hacktoberfest" in jsonresp2["names"]: + hackto_topics[shortname] = True # cache result in the dict for later use if needed + outlist.append(itemdict) + return outlist + + @staticmethod + async def _fetch_url(url: str, headers: dict) -> dict: + """Retrieve API response from URL.""" + async with aiohttp.ClientSession() as session: + async with session.get(url, headers=headers) as resp: + jsonresp = await resp.json() + return jsonresp + + @staticmethod + def _has_label(pr: dict, labels: Union[List[str], str]) -> bool: + """ + Check if a PR has label 'labels'. + + 'labels' can be a string or a list of strings, if it's a list of strings + it will return true if any of the labels match. + """ + if not pr.get("labels"): # if PR has no labels + return False + if (isinstance(labels, str)) and (any(label["name"].casefold() == labels for label in pr["labels"])): + return True + for item in labels: + if any(label["name"].casefold() == item for label in pr["labels"]): + return True + return False + + @staticmethod + async def _is_accepted(pr: dict) -> bool: + """Check if a PR is merged, approved, or labelled hacktoberfest-accepted.""" + # checking for merge status + query_url = f"https://api.github.com/repos/{pr['repo_shortname']}/pulls/" + query_url += str(pr["number"]) + jsonresp = await HacktoberStats._fetch_url(query_url, REQUEST_HEADERS) + + if "message" in jsonresp.keys(): + logging.error( + f"Error fetching PR stats for #{pr['number']} in repo {pr['repo_shortname']}:\n" + f"{jsonresp['message']}" + ) + return False + if ("merged" in jsonresp.keys()) and jsonresp["merged"]: + return True + + # checking for the label, using `jsonresp` which has the label information + if HacktoberStats._has_label(jsonresp, "hacktoberfest-accepted"): + return True + + # checking approval + query_url += "/reviews" + jsonresp2 = await HacktoberStats._fetch_url(query_url, REQUEST_HEADERS) + if isinstance(jsonresp2, dict): + # if API request is unsuccessful it will be a dict with the error in 'message' + logging.error( + f"Error fetching PR reviews for #{pr['number']} in repo {pr['repo_shortname']}:\n" + f"{jsonresp2['message']}" + ) + return False + # if it is successful it will be a list instead of a dict + if len(jsonresp2) == 0: # if PR has no reviews + return False + + # loop through reviews and check for approval + for item in jsonresp2: + if "status" in item.keys(): + if item['status'] == "APPROVED": + return True + return False @staticmethod def _get_shortname(in_url: str) -> str: """ Extract shortname from https://api.github.com/repos/* URL. - e.g. "https://api.github.com/repos/python-discord/seasonalbot" + e.g. "https://api.github.com/repos/python-discord/sir-lancebot" | V - "python-discord/seasonalbot" + "python-discord/sir-lancebot" """ exp = r"https?:\/\/api.github.com\/repos\/([/\-\_\.\w]+)" return re.findall(exp, in_url)[0] @staticmethod - def _summarize_prs(prs: List[dict]) -> dict: + async def _categorize_prs(prs: List[dict]) -> tuple: """ - Generate statistics from an input list of PR dictionaries, as output by get_october_prs. + Categorize PRs into 'in_review' and 'accepted' and returns as a tuple. - Return a dictionary containing: - { - "n_prs": int - "top5": [(repo_shortname, ncontributions), ...] - } + PRs created less than 14 days ago are 'in_review', PRs that are not + are 'accepted' (after 14 days review period). + + PRs that are accepted must either be merged, approved, or labelled + 'hacktoberfest-accepted. """ - contributed_repos = [pr["repo_shortname"] for pr in prs] - return {"n_prs": len(prs), "top5": Counter(contributed_repos).most_common(5)} + now = datetime.now() + oct3 = datetime(CURRENT_YEAR, 10, 3, 23, 59, 59, tzinfo=None) + in_review = [] + accepted = [] + for pr in prs: + if (pr['created_at'] + timedelta(REVIEW_DAYS)) > now: + in_review.append(pr) + elif (pr['created_at'] <= oct3) or await HacktoberStats._is_accepted(pr): + accepted.append(pr) + + return in_review, accepted @staticmethod - def _build_top5str(stats: List[tuple]) -> str: + def _build_prs_string(prs: List[tuple], user: str) -> str: """ - Build a string from the Top 5 contributions that is compatible with a discord.Embed field. - - Top 5 contributions should be a list of tuples, as output in the stats dictionary by - _summarize_prs + Builds a discord embed compatible string for a list of PRs. - String is of the form: - n contribution(s) to [shortname](url) - ... + Repository name with the link to pull requests authored by 'user' for + each PR. """ base_url = "https://www.github.com/" - contributionstrs = [] - for repo in stats['top5']: - n = repo[1] - contributionstrs.append(f"{n} {HacktoberStats._contributionator(n)} to [{repo[0]}]({base_url}{repo[0]})") - - return "\n".join(contributionstrs) + str_list = [] + repo_list = [pr["repo_shortname"] for pr in prs] + prs_list = Counter(repo_list).most_common(5) # get first 5 counted PRs + more = len(prs) - sum(i[1] for i in prs_list) + + for pr in prs_list: + # for example: https://www.github.com/python-discord/bot/pulls/octocat + # will display pull requests authored by octocat. + # pr[1] is the number of PRs to the repo + string = f"{pr[1]} to [{pr[0]}]({base_url}{pr[0]}/pulls/{user})" + str_list.append(string) + if more: + str_list.append(f"...and {more} more") + + return "\n".join(str_list) @staticmethod def _contributionator(n: int) -> str: diff --git a/bot/exts/halloween/monstersurvey.py b/bot/exts/halloween/monstersurvey.py index 7b1a1e84..80196825 100644 --- a/bot/exts/halloween/monstersurvey.py +++ b/bot/exts/halloween/monstersurvey.py @@ -202,4 +202,3 @@ class MonsterSurvey(Cog): def setup(bot: Bot) -> None: """Monster survey Cog load.""" - bot.add_cog(MonsterSurvey(bot)) diff --git a/bot/exts/halloween/scarymovie.py b/bot/exts/halloween/scarymovie.py index c80e0298..0807eca6 100644 --- a/bot/exts/halloween/scarymovie.py +++ b/bot/exts/halloween/scarymovie.py @@ -121,7 +121,8 @@ class ScaryMovie(commands.Cog): if value: embed.add_field(name=name, value=value) - embed.set_footer(text='powered by themoviedb.org') + embed.set_footer(text="This product uses the TMDb API but is not endorsed or certified by TMDb.") + embed.set_thumbnail(url="https://i.imgur.com/LtFtC8H.png") return embed diff --git a/bot/exts/halloween/spookyreact.py b/bot/exts/halloween/spookyreact.py index e5945aea..b335df75 100644 --- a/bot/exts/halloween/spookyreact.py +++ b/bot/exts/halloween/spookyreact.py @@ -29,13 +29,7 @@ class SpookyReact(Cog): @in_month(Month.OCTOBER) @Cog.listener() async def on_message(self, ctx: discord.Message) -> None: - """ - A command to send the seasonalbot github project. - - Lines that begin with the bot's command prefix are ignored - - Seasonalbot's own messages are ignored - """ + """Triggered when the bot sees a message in October.""" for trigger in SPOOKY_TRIGGERS.keys(): trigger_test = re.search(SPOOKY_TRIGGERS[trigger][0], ctx.content.lower()) if trigger_test: diff --git a/bot/exts/pride/pride_avatar.py b/bot/exts/pride/pride_avatar.py index 3f9878e3..2eade796 100644 --- a/bot/exts/pride/pride_avatar.py +++ b/bot/exts/pride/pride_avatar.py @@ -1,10 +1,12 @@ import logging from io import BytesIO from pathlib import Path +from typing import Tuple +import aiohttp import discord -from PIL import Image, ImageDraw -from discord.ext import commands +from PIL import Image, ImageDraw, UnidentifiedImageError +from discord.ext.commands import Bot, Cog, Context, group from bot.constants import Colours @@ -53,10 +55,10 @@ OPTIONS = { } -class PrideAvatar(commands.Cog): +class PrideAvatar(Cog): """Put an LGBT spin on your avatar!""" - def __init__(self, bot: commands.Bot): + def __init__(self, bot: Bot): self.bot = bot @staticmethod @@ -78,8 +80,41 @@ class PrideAvatar(commands.Cog): ring.putalpha(mask) return ring - @commands.group(aliases=["avatarpride", "pridepfp", "prideprofile"], invoke_without_command=True) - async def prideavatar(self, ctx: commands.Context, option: str = "lgbt", pixels: int = 64) -> None: + @staticmethod + def process_options(option: str, pixels: int) -> Tuple[str, int, str]: + """Does some shared preprocessing for the prideavatar commands.""" + return option.lower(), max(0, min(512, pixels)), OPTIONS.get(option) + + async def process_image(self, ctx: Context, image_bytes: bytes, pixels: int, flag: str, option: str) -> None: + """Constructs the final image, embeds it, and sends it.""" + try: + avatar = Image.open(BytesIO(image_bytes)) + except UnidentifiedImageError: + return await ctx.send("Cannot identify image from provided URL") + avatar = avatar.convert("RGBA").resize((1024, 1024)) + + avatar = self.crop_avatar(avatar) + + ring = Image.open(Path(f"bot/resources/pride/flags/{flag}.png")).resize((1024, 1024)) + ring = ring.convert("RGBA") + ring = self.crop_ring(ring, pixels) + + avatar.alpha_composite(ring, (0, 0)) + bufferedio = BytesIO() + avatar.save(bufferedio, format="PNG") + bufferedio.seek(0) + + file = discord.File(bufferedio, filename="pride_avatar.png") # Creates file to be used in embed + embed = discord.Embed( + name="Your Lovely Pride Avatar", + description=f"Here is your lovely avatar, surrounded by\n a beautiful {option} flag. Enjoy :D" + ) + embed.set_image(url="attachment://pride_avatar.png") + embed.set_footer(text=f"Made by {ctx.author.display_name}", icon_url=ctx.author.avatar_url) + await ctx.send(file=file, embed=embed) + + @group(aliases=["avatarpride", "pridepfp", "prideprofile"], invoke_without_command=True) + async def prideavatar(self, ctx: Context, option: str = "lgbt", pixels: int = 64) -> None: """ This surrounds an avatar with a border of a specified LGBT flag. @@ -88,45 +123,43 @@ class PrideAvatar(commands.Cog): This has a maximum of 512px and defaults to a 64px border. The full image is 1024x1024. """ - pixels = 0 if pixels < 0 else 512 if pixels > 512 else pixels - - option = option.lower() - - if option not in OPTIONS.keys(): + option, pixels, flag = self.process_options(option, pixels) + if flag is None: return await ctx.send("I don't have that flag!") - flag = OPTIONS[option] - async with ctx.typing(): - - # Get avatar bytes image_bytes = await ctx.author.avatar_url.read() - avatar = Image.open(BytesIO(image_bytes)) - avatar = avatar.convert("RGBA").resize((1024, 1024)) - - avatar = self.crop_avatar(avatar) - - ring = Image.open(Path(f"bot/resources/pride/flags/{flag}.png")).resize((1024, 1024)) - ring = ring.convert("RGBA") - ring = self.crop_ring(ring, pixels) + await self.process_image(ctx, image_bytes, pixels, flag, option) - avatar.alpha_composite(ring, (0, 0)) - bufferedio = BytesIO() - avatar.save(bufferedio, format="PNG") - bufferedio.seek(0) + @prideavatar.command() + async def image(self, ctx: Context, url: str, option: str = "lgbt", pixels: int = 64) -> None: + """ + This surrounds the image specified by the URL with a border of a specified LGBT flag. - file = discord.File(bufferedio, filename="pride_avatar.png") # Creates file to be used in embed - embed = discord.Embed( - name="Your Lovely Pride Avatar", - description=f"Here is your lovely avatar, surrounded by\n a beautiful {option} flag. Enjoy :D" - ) - embed.set_image(url="attachment://pride_avatar.png") - embed.set_footer(text=f"Made by {ctx.author.display_name}", icon_url=ctx.author.avatar_url) + This defaults to the LGBT rainbow flag if none is given. + The amount of pixels can be given which determines the thickness of the flag border. + This has a maximum of 512px and defaults to a 64px border. + The full image is 1024x1024. + """ + option, pixels, flag = self.process_options(option, pixels) + if flag is None: + return await ctx.send("I don't have that flag!") - await ctx.send(file=file, embed=embed) + async with ctx.typing(): + async with aiohttp.ClientSession() as session: + try: + response = await session.get(url) + except aiohttp.client_exceptions.ClientConnectorError: + return await ctx.send("Cannot connect to provided URL!") + except aiohttp.client_exceptions.InvalidURL: + return await ctx.send("Invalid URL!") + if response.status != 200: + return await ctx.send("Bad response from provided URL!") + image_bytes = await response.read() + await self.process_image(ctx, image_bytes, pixels, flag, option) @prideavatar.command() - async def flags(self, ctx: commands.Context) -> None: + async def flags(self, ctx: Context) -> None: """This lists the flags that can be used with the prideavatar command.""" choices = sorted(set(OPTIONS.values())) options = "• " + "\n• ".join(choices) @@ -139,6 +172,6 @@ class PrideAvatar(commands.Cog): await ctx.send(embed=embed) -def setup(bot: commands.Bot) -> None: +def setup(bot: Bot) -> None: """Cog load.""" bot.add_cog(PrideAvatar(bot)) diff --git a/bot/exts/pride/pride_facts.py b/bot/exts/pride/pride_facts.py index 9ff4c9e0..5bd5d0ce 100644 --- a/bot/exts/pride/pride_facts.py +++ b/bot/exts/pride/pride_facts.py @@ -9,7 +9,7 @@ import dateutil.parser import discord from discord.ext import commands -from bot.bot import SeasonalBot +from bot.bot import Bot from bot.constants import Channels, Colours, Month from bot.utils.decorators import seasonal_task @@ -21,7 +21,7 @@ Sendable = Union[commands.Context, discord.TextChannel] class PrideFacts(commands.Cog): """Provides a new fact every day during the Pride season!""" - def __init__(self, bot: SeasonalBot): + def __init__(self, bot: Bot): self.bot = bot self.facts = self.load_facts() @@ -38,7 +38,7 @@ class PrideFacts(commands.Cog): """Background task to post the daily pride fact every day.""" await self.bot.wait_until_guild_available() - channel = self.bot.get_channel(Channels.seasonalbot_commands) + channel = self.bot.get_channel(Channels.community_bot_commands) await self.send_select_fact(channel, datetime.utcnow()) async def send_random_fact(self, ctx: commands.Context) -> None: @@ -102,6 +102,6 @@ class PrideFacts(commands.Cog): ) -def setup(bot: SeasonalBot) -> None: +def setup(bot: Bot) -> None: """Cog loader for pride facts.""" bot.add_cog(PrideFacts(bot)) diff --git a/bot/exts/utils/extensions.py b/bot/exts/utils/extensions.py index 102a0416..bb22c353 100644 --- a/bot/exts/utils/extensions.py +++ b/bot/exts/utils/extensions.py @@ -8,7 +8,7 @@ from discord.ext import commands from discord.ext.commands import Context, group from bot import exts -from bot.bot import SeasonalBot as Bot +from bot.bot import Bot from bot.constants import Client, Emojis, MODERATION_ROLES, Roles from bot.utils.checks import with_role_check from bot.utils.extensions import EXTENSIONS, unqualify diff --git a/bot/exts/valentines/be_my_valentine.py b/bot/exts/valentines/be_my_valentine.py index b1258307..4db4d191 100644 --- a/bot/exts/valentines/be_my_valentine.py +++ b/bot/exts/valentines/be_my_valentine.py @@ -99,7 +99,7 @@ class BeMyValentine(commands.Cog): emoji_1, emoji_2 = self.random_emoji() lovefest_role = discord.utils.get(ctx.guild.roles, id=Lovefest.role_id) - channel = self.bot.get_channel(Channels.seasonalbot_commands) + channel = self.bot.get_channel(Channels.community_bot_commands) valentine, title = self.valentine_check(valentine_type) if user is None: diff --git a/bot/exts/valentines/movie_generator.py b/bot/exts/valentines/movie_generator.py index 0843175a..4df9e0d5 100644 --- a/bot/exts/valentines/movie_generator.py +++ b/bot/exts/valentines/movie_generator.py @@ -48,6 +48,8 @@ class RomanceMovieFinder(commands.Cog): embed.set_image(url=f"http://image.tmdb.org/t/p/w200/{selected_movie['poster_path']}") embed.add_field(name="Release date :clock1:", value=selected_movie["release_date"]) embed.add_field(name="Rating :star2:", value=selected_movie["vote_average"]) + embed.set_footer(text="This product uses the TMDb API but is not endorsed or certified by TMDb.") + embed.set_thumbnail(url="https://i.imgur.com/LtFtC8H.png") await ctx.send(embed=embed) except KeyError: warning_message = "A KeyError was raised while fetching information on the movie. The API service" \ |