diff options
Diffstat (limited to 'bot/exts/events')
| -rw-r--r-- | bot/exts/events/__init__.py | 0 | ||||
| -rw-r--r-- | bot/exts/events/advent_of_code/__init__.py | 10 | ||||
| -rw-r--r-- | bot/exts/events/advent_of_code/_caches.py | 5 | ||||
| -rw-r--r-- | bot/exts/events/advent_of_code/_cog.py | 302 | ||||
| -rw-r--r-- | bot/exts/events/advent_of_code/_helpers.py | 591 | ||||
| -rw-r--r-- | bot/exts/events/hacktoberfest/__init__.py | 0 | ||||
| -rw-r--r-- | bot/exts/events/hacktoberfest/hacktober-issue-finder.py | 117 | ||||
| -rw-r--r-- | bot/exts/events/hacktoberfest/hacktoberstats.py | 437 | ||||
| -rw-r--r-- | bot/exts/events/hacktoberfest/timeleft.py | 67 | 
9 files changed, 1529 insertions, 0 deletions
diff --git a/bot/exts/events/__init__.py b/bot/exts/events/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/bot/exts/events/__init__.py diff --git a/bot/exts/events/advent_of_code/__init__.py b/bot/exts/events/advent_of_code/__init__.py new file mode 100644 index 00000000..3c521168 --- /dev/null +++ b/bot/exts/events/advent_of_code/__init__.py @@ -0,0 +1,10 @@ +from bot.bot import Bot + + +def setup(bot: Bot) -> None: +    """Set up the Advent of Code extension.""" +    # Import the Cog at runtime to prevent side effects like defining +    # RedisCache instances too early. +    from ._cog import AdventOfCode + +    bot.add_cog(AdventOfCode(bot)) diff --git a/bot/exts/events/advent_of_code/_caches.py b/bot/exts/events/advent_of_code/_caches.py new file mode 100644 index 00000000..32d5394f --- /dev/null +++ b/bot/exts/events/advent_of_code/_caches.py @@ -0,0 +1,5 @@ +import async_rediscache + +leaderboard_counts = async_rediscache.RedisCache(namespace="AOC_leaderboard_counts") +leaderboard_cache = async_rediscache.RedisCache(namespace="AOC_leaderboard_cache") +assigned_leaderboard = async_rediscache.RedisCache(namespace="AOC_assigned_leaderboard") diff --git a/bot/exts/events/advent_of_code/_cog.py b/bot/exts/events/advent_of_code/_cog.py new file mode 100644 index 00000000..ca60e517 --- /dev/null +++ b/bot/exts/events/advent_of_code/_cog.py @@ -0,0 +1,302 @@ +import json +import logging +from datetime import datetime, timedelta +from pathlib import Path + +import arrow +import discord +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import ( +    AdventOfCode as AocConfig, Channels, Colours, Emojis, Month, Roles, WHITELISTED_CHANNELS, +) +from bot.exts.events.advent_of_code import _helpers +from bot.utils.decorators import InChannelCheckFailure, in_month, whitelist_override, with_role +from bot.utils.extensions import invoke_help_command + +log = logging.getLogger(__name__) + +AOC_REQUEST_HEADER = {"user-agent": "PythonDiscord AoC Event Bot"} + +AOC_WHITELIST_RESTRICTED = WHITELISTED_CHANNELS + (Channels.advent_of_code_commands,) + +# Some commands can be run in the regular advent of code channel +# They aren't spammy and foster discussion +AOC_WHITELIST = AOC_WHITELIST_RESTRICTED + (Channels.advent_of_code,) + + +class AdventOfCode(commands.Cog): +    """Advent of Code festivities! Ho Ho Ho!""" + +    def __init__(self, bot: Bot): +        self.bot = bot + +        self._base_url = f"https://adventofcode.com/{AocConfig.year}" +        self.global_leaderboard_url = f"https://adventofcode.com/{AocConfig.year}/leaderboard" + +        self.about_aoc_filepath = Path("./bot/resources/events/advent_of_code/about.json") +        self.cached_about_aoc = self._build_about_embed() + +        notification_coro = _helpers.new_puzzle_notification(self.bot) +        self.notification_task = self.bot.loop.create_task(notification_coro) +        self.notification_task.set_name("Daily AoC Notification") +        self.notification_task.add_done_callback(_helpers.background_task_callback) + +        status_coro = _helpers.countdown_status(self.bot) +        self.status_task = self.bot.loop.create_task(status_coro) +        self.status_task.set_name("AoC Status Countdown") +        self.status_task.add_done_callback(_helpers.background_task_callback) + +    @commands.group(name="adventofcode", aliases=("aoc",)) +    @whitelist_override(channels=AOC_WHITELIST) +    async def adventofcode_group(self, ctx: commands.Context) -> None: +        """All of the Advent of Code commands.""" +        if not ctx.invoked_subcommand: +            await invoke_help_command(ctx) + +    @adventofcode_group.command( +        name="subscribe", +        aliases=("sub", "notifications", "notify", "notifs"), +        brief="Notifications for new days" +    ) +    @whitelist_override(channels=AOC_WHITELIST) +    async def aoc_subscribe(self, ctx: commands.Context) -> None: +        """Assign the role for notifications about new days being ready.""" +        current_year = datetime.now().year +        if current_year != AocConfig.year: +            await ctx.send(f"You can't subscribe to {current_year}'s Advent of Code announcements yet!") +            return + +        role = ctx.guild.get_role(AocConfig.role_id) +        unsubscribe_command = f"{ctx.prefix}{ctx.command.root_parent} unsubscribe" + +        if role not in ctx.author.roles: +            await ctx.author.add_roles(role) +            await ctx.send( +                "Okay! You have been __subscribed__ to notifications about new Advent of Code tasks. " +                f"You can run `{unsubscribe_command}` to disable them again for you." +            ) +        else: +            await ctx.send( +                "Hey, you already are receiving notifications about new Advent of Code tasks. " +                f"If you don't want them any more, run `{unsubscribe_command}` instead." +            ) + +    @in_month(Month.DECEMBER) +    @adventofcode_group.command(name="unsubscribe", aliases=("unsub",), brief="Notifications for new days") +    @whitelist_override(channels=AOC_WHITELIST) +    async def aoc_unsubscribe(self, ctx: commands.Context) -> None: +        """Remove the role for notifications about new days being ready.""" +        role = ctx.guild.get_role(AocConfig.role_id) + +        if role in ctx.author.roles: +            await ctx.author.remove_roles(role) +            await ctx.send("Okay! You have been __unsubscribed__ from notifications about new Advent of Code tasks.") +        else: +            await ctx.send("Hey, you don't even get any notifications about new Advent of Code tasks currently anyway.") + +    @adventofcode_group.command(name="countdown", aliases=("count", "c"), brief="Return time left until next day") +    @whitelist_override(channels=AOC_WHITELIST) +    async def aoc_countdown(self, ctx: commands.Context) -> None: +        """Return time left until next day.""" +        if not _helpers.is_in_advent(): +            datetime_now = arrow.now(_helpers.EST) + +            # Calculate the delta to this & next year's December 1st to see which one is closest and not in the past +            this_year = arrow.get(datetime(datetime_now.year, 12, 1), _helpers.EST) +            next_year = arrow.get(datetime(datetime_now.year + 1, 12, 1), _helpers.EST) +            deltas = (dec_first - datetime_now for dec_first in (this_year, next_year)) +            delta = min(delta for delta in deltas if delta >= timedelta())  # timedelta() gives 0 duration delta + +            # Add a finer timedelta if there's less than a day left +            if delta.days == 0: +                delta_str = f"approximately {delta.seconds // 3600} hours" +            else: +                delta_str = f"{delta.days} days" + +            await ctx.send( +                "The Advent of Code event is not currently running. " +                f"The next event will start in {delta_str}." +            ) +            return + +        tomorrow, time_left = _helpers.time_left_to_est_midnight() + +        hours, minutes = time_left.seconds // 3600, time_left.seconds // 60 % 60 + +        await ctx.send(f"There are {hours} hours and {minutes} minutes left until day {tomorrow.day}.") + +    @adventofcode_group.command(name="about", aliases=("ab", "info"), brief="Learn about Advent of Code") +    @whitelist_override(channels=AOC_WHITELIST) +    async def about_aoc(self, ctx: commands.Context) -> None: +        """Respond with an explanation of all things Advent of Code.""" +        await ctx.send(embed=self.cached_about_aoc) + +    @adventofcode_group.command(name="join", aliases=("j",), brief="Learn how to join the leaderboard (via DM)") +    @whitelist_override(channels=AOC_WHITELIST) +    async def join_leaderboard(self, ctx: commands.Context) -> None: +        """DM the user the information for joining the Python Discord leaderboard.""" +        current_year = datetime.now().year +        if current_year != AocConfig.year: +            await ctx.send(f"The Python Discord leaderboard for {current_year} is not yet available!") +            return + +        author = ctx.author +        log.info(f"{author.name} ({author.id}) has requested a PyDis AoC leaderboard code") + +        if AocConfig.staff_leaderboard_id and any(r.id == Roles.helpers for r in author.roles): +            join_code = AocConfig.leaderboards[AocConfig.staff_leaderboard_id].join_code +        else: +            try: +                join_code = await _helpers.get_public_join_code(author) +            except _helpers.FetchingLeaderboardFailed: +                await ctx.send(":x: Failed to get join code! Notified maintainers.") +                return + +        if not join_code: +            log.error(f"Failed to get a join code for user {author} ({author.id})") +            error_embed = discord.Embed( +                title="Unable to get join code", +                description="Failed to get a join code to one of our boards. Please notify staff.", +                colour=discord.Colour.red(), +            ) +            await ctx.send(embed=error_embed) +            return + +        info_str = [ +            "To join our leaderboard, follow these steps:", +            "• Log in on https://adventofcode.com", +            "• Head over to https://adventofcode.com/leaderboard/private", +            f"• Use this code `{join_code}` to join the Python Discord leaderboard!", +        ] +        try: +            await author.send("\n".join(info_str)) +        except discord.errors.Forbidden: +            log.debug(f"{author.name} ({author.id}) has disabled DMs from server members") +            await ctx.send(f":x: {author.mention}, please (temporarily) enable DMs to receive the join code") +        else: +            await ctx.message.add_reaction(Emojis.envelope) + +    @in_month(Month.DECEMBER) +    @adventofcode_group.command( +        name="leaderboard", +        aliases=("board", "lb"), +        brief="Get a snapshot of the PyDis private AoC leaderboard", +    ) +    @whitelist_override(channels=AOC_WHITELIST_RESTRICTED) +    async def aoc_leaderboard(self, ctx: commands.Context) -> None: +        """Get the current top scorers of the Python Discord Leaderboard.""" +        async with ctx.typing(): +            try: +                leaderboard = await _helpers.fetch_leaderboard() +            except _helpers.FetchingLeaderboardFailed: +                await ctx.send(":x: Unable to fetch leaderboard!") +                return + +            number_of_participants = leaderboard["number_of_participants"] + +            top_count = min(AocConfig.leaderboard_displayed_members, number_of_participants) +            header = f"Here's our current top {top_count}! {Emojis.christmas_tree * 3}" + +            table = f"```\n{leaderboard['top_leaderboard']}\n```" +            info_embed = _helpers.get_summary_embed(leaderboard) + +            await ctx.send(content=f"{header}\n\n{table}", embed=info_embed) + +    @in_month(Month.DECEMBER) +    @adventofcode_group.command( +        name="global", +        aliases=("globalboard", "gb"), +        brief="Get a link to the global leaderboard", +    ) +    @whitelist_override(channels=AOC_WHITELIST_RESTRICTED) +    async def aoc_global_leaderboard(self, ctx: commands.Context) -> None: +        """Get a link to the global Advent of Code leaderboard.""" +        url = self.global_leaderboard_url +        global_leaderboard = discord.Embed( +            title="Advent of Code — Global Leaderboard", +            description=f"You can find the global leaderboard [here]({url})." +        ) +        global_leaderboard.set_thumbnail(url=_helpers.AOC_EMBED_THUMBNAIL) +        await ctx.send(embed=global_leaderboard) + +    @adventofcode_group.command( +        name="stats", +        aliases=("dailystats", "ds"), +        brief="Get daily statistics for the Python Discord leaderboard" +    ) +    @whitelist_override(channels=AOC_WHITELIST_RESTRICTED) +    async def private_leaderboard_daily_stats(self, ctx: commands.Context) -> None: +        """Send an embed with daily completion statistics for the Python Discord leaderboard.""" +        try: +            leaderboard = await _helpers.fetch_leaderboard() +        except _helpers.FetchingLeaderboardFailed: +            await ctx.send(":x: Can't fetch leaderboard for stats right now!") +            return + +        # The daily stats are serialized as JSON as they have to be cached in Redis +        daily_stats = json.loads(leaderboard["daily_stats"]) +        async with ctx.typing(): +            lines = ["Day   ⭐  ⭐⭐ |   %⭐    %⭐⭐\n================================"] +            for day, stars in daily_stats.items(): +                star_one = stars["star_one"] +                star_two = stars["star_two"] +                p_star_one = star_one / leaderboard["number_of_participants"] +                p_star_two = star_two / leaderboard["number_of_participants"] +                lines.append( +                    f"{day:>2}) {star_one:>4}  {star_two:>4} | {p_star_one:>7.2%} {p_star_two:>7.2%}" +                ) +            table = "\n".join(lines) +            info_embed = _helpers.get_summary_embed(leaderboard) +            await ctx.send(f"```\n{table}\n```", embed=info_embed) + +    @with_role(Roles.admin) +    @adventofcode_group.command( +        name="refresh", +        aliases=("fetch",), +        brief="Force a refresh of the leaderboard cache.", +    ) +    async def refresh_leaderboard(self, ctx: commands.Context) -> None: +        """ +        Force a refresh of the leaderboard cache. + +        Note: This should be used sparingly, as we want to prevent sending too +        many requests to the Advent of Code server. +        """ +        async with ctx.typing(): +            try: +                await _helpers.fetch_leaderboard(invalidate_cache=True) +            except _helpers.FetchingLeaderboardFailed: +                await ctx.send(":x: Something went wrong while trying to refresh the cache!") +            else: +                await ctx.send("\N{OK Hand Sign} Refreshed leaderboard cache!") + +    def cog_unload(self) -> None: +        """Cancel season-related tasks on cog unload.""" +        log.debug("Unloading the cog and canceling the background task.") +        self.notification_task.cancel() +        self.status_task.cancel() + +    def _build_about_embed(self) -> discord.Embed: +        """Build and return the informational "About AoC" embed from the resources file.""" +        embed_fields = json.loads(self.about_aoc_filepath.read_text("utf8")) + +        about_embed = discord.Embed( +            title=self._base_url, +            colour=Colours.soft_green, +            url=self._base_url, +            timestamp=datetime.utcnow() +        ) +        about_embed.set_author(name="Advent of Code", url=self._base_url) +        for field in embed_fields: +            about_embed.add_field(**field) + +        about_embed.set_footer(text="Last Updated") +        return about_embed + +    async def cog_command_error(self, ctx: commands.Context, error: Exception) -> None: +        """Custom error handler if an advent of code command was posted in the wrong channel.""" +        if isinstance(error, InChannelCheckFailure): +            await ctx.send(f":x: Please use <#{Channels.advent_of_code_commands}> for aoc commands instead.") +            error.handled = True diff --git a/bot/exts/events/advent_of_code/_helpers.py b/bot/exts/events/advent_of_code/_helpers.py new file mode 100644 index 00000000..5fedb60f --- /dev/null +++ b/bot/exts/events/advent_of_code/_helpers.py @@ -0,0 +1,591 @@ +import asyncio +import collections +import datetime +import json +import logging +import math +import operator +from typing import Any, Optional + +import aiohttp +import arrow +import discord + +from bot.bot import Bot +from bot.constants import AdventOfCode, Channels, Colours +from bot.exts.events.advent_of_code import _caches + +log = logging.getLogger(__name__) + +PASTE_URL = "https://paste.pythondiscord.com/documents" +RAW_PASTE_URL_TEMPLATE = "https://paste.pythondiscord.com/raw/{key}" + +# Base API URL for Advent of Code Private Leaderboards +AOC_API_URL = "https://adventofcode.com/{year}/leaderboard/private/view/{leaderboard_id}.json" +AOC_REQUEST_HEADER = {"user-agent": "PythonDiscord AoC Event Bot"} + +# Leaderboard Line Template +AOC_TABLE_TEMPLATE = "{rank: >4} | {name:25.25} | {score: >5} | {stars}" +HEADER = AOC_TABLE_TEMPLATE.format(rank="", name="Name", score="Score", stars="⭐, ⭐⭐") +HEADER = f"{HEADER}\n{'-' * (len(HEADER) + 2)}" +HEADER_LINES = len(HEADER.splitlines()) +TOP_LEADERBOARD_LINES = HEADER_LINES + AdventOfCode.leaderboard_displayed_members + +# Keys that need to be set for a cached leaderboard +REQUIRED_CACHE_KEYS = ( +    "full_leaderboard", +    "top_leaderboard", +    "full_leaderboard_url", +    "leaderboard_fetched_at", +    "number_of_participants", +    "daily_stats", +) + +AOC_EMBED_THUMBNAIL = ( +    "https://raw.githubusercontent.com/python-discord" +    "/branding/main/seasonal/christmas/server_icons/festive_256.gif" +) + +# Create an easy constant for the EST timezone +EST = "America/New_York" + +# Step size for the challenge countdown status +COUNTDOWN_STEP = 60 * 5 + +# Create namedtuple that combines a participant's name and their completion +# time for a specific star. We're going to use this later to order the results +# for each star to compute the rank score. +StarResult = collections.namedtuple("StarResult", "member_id completion_time") + + +class UnexpectedRedirect(aiohttp.ClientError): +    """Raised when an unexpected redirect was detected.""" + + +class UnexpectedResponseStatus(aiohttp.ClientError): +    """Raised when an unexpected redirect was detected.""" + + +class FetchingLeaderboardFailedError(Exception): +    """Raised when one or more leaderboards could not be fetched at all.""" + + +def leaderboard_sorting_function(entry: tuple[str, dict]) -> tuple[int, int]: +    """ +    Provide a sorting value for our leaderboard. + +    The leaderboard is sorted primarily on the score someone has received and +    secondary on the number of stars someone has completed. +    """ +    result = entry[1] +    return result["score"], result["star_2"] + result["star_1"] + + +def _parse_raw_leaderboard_data(raw_leaderboard_data: dict) -> dict: +    """ +    Parse the leaderboard data received from the AoC website. + +    The data we receive from AoC is structured by member, not by day/star. This +    means that we need to "transpose" the data to a per star structure in order +    to calculate the rank scores each individual should get. + +    As we need our data both "per participant" as well as "per day", we return +    the parsed and analyzed data in both formats. +    """ +    # We need to get an aggregate of completion times for each star of each day, +    # instead of per participant to compute the rank scores. This dictionary will +    # provide such a transposed dataset. +    star_results = collections.defaultdict(list) + +    # As we're already iterating over the participants, we can record the number of +    # first stars and second stars they've achieved right here and now. This means +    # we won't have to iterate over the participants again later. +    leaderboard = {} + +    # The data we get from the AoC website is structured by member, not by day/star, +    # which means we need to iterate over the members to transpose the data to a per +    # star view. We need that per star view to compute rank scores per star. +    for member in raw_leaderboard_data.values(): +        name = member["name"] if member["name"] else f"Anonymous #{member['id']}" +        member_id = member["id"] +        leaderboard[member_id] = {"name": name, "score": 0, "star_1": 0, "star_2": 0} + +        # Iterate over all days for this participant +        for day, stars in member["completion_day_level"].items(): +            # Iterate over the complete stars for this day for this participant +            for star, data in stars.items(): +                # Record completion of this star for this individual +                leaderboard[member_id][f"star_{star}"] += 1 + +                # Record completion datetime for this participant for this day/star +                completion_time = datetime.datetime.fromtimestamp(int(data["get_star_ts"])) +                star_results[(day, star)].append( +                    StarResult(member_id=member_id, completion_time=completion_time) +                ) + +    # Now that we have a transposed dataset that holds the completion time of all +    # participants per star, we can compute the rank-based scores each participant +    # should get for that star. +    max_score = len(leaderboard) +    for (day, _star), results in star_results.items(): +        # If this day should not count in the ranking, skip it. +        if day in AdventOfCode.ignored_days: +            continue + +        sorted_result = sorted(results, key=operator.attrgetter("completion_time")) +        for rank, star_result in enumerate(sorted_result): +            leaderboard[star_result.member_id]["score"] += max_score - rank + +    # Since dictionaries now retain insertion order, let's use that +    sorted_leaderboard = dict( +        sorted(leaderboard.items(), key=leaderboard_sorting_function, reverse=True) +    ) + +    # Create summary stats for the stars completed for each day of the event. +    daily_stats = {} +    for day in range(1, 26): +        day = str(day) +        star_one = len(star_results.get((day, "1"), [])) +        star_two = len(star_results.get((day, "2"), [])) +        # By using a dictionary instead of namedtuple here, we can serialize +        # this data to JSON in order to cache it in Redis. +        daily_stats[day] = {"star_one": star_one, "star_two": star_two} + +    return {"daily_stats": daily_stats, "leaderboard": sorted_leaderboard} + + +def _format_leaderboard(leaderboard: dict[str, dict]) -> str: +    """Format the leaderboard using the AOC_TABLE_TEMPLATE.""" +    leaderboard_lines = [HEADER] +    for rank, data in enumerate(leaderboard.values(), start=1): +        leaderboard_lines.append( +            AOC_TABLE_TEMPLATE.format( +                rank=rank, +                name=data["name"], +                score=str(data["score"]), +                stars=f"({data['star_1']}, {data['star_2']})" +            ) +        ) + +    return "\n".join(leaderboard_lines) + + +async def _leaderboard_request(url: str, board: str, cookies: dict) -> dict[str, Any]: +    """Make a leaderboard request using the specified session cookie.""" +    async with aiohttp.request("GET", url, headers=AOC_REQUEST_HEADER, cookies=cookies) as resp: +        # The Advent of Code website redirects silently with a 200 response if a +        # session cookie has expired, is invalid, or was not provided. +        if str(resp.url) != url: +            log.error(f"Fetching leaderboard `{board}` failed! Check the session cookie.") +            raise UnexpectedRedirect(f"redirected unexpectedly to {resp.url} for board `{board}`") + +        # Every status other than `200` is unexpected, not only 400+ +        if not resp.status == 200: +            log.error(f"Unexpected response `{resp.status}` while fetching leaderboard `{board}`") +            raise UnexpectedResponseStatus(f"status `{resp.status}`") + +        return await resp.json() + + +async def _fetch_leaderboard_data() -> dict[str, Any]: +    """Fetch data for all leaderboards and return a pooled result.""" +    year = AdventOfCode.year + +    # We'll make our requests one at a time to not flood the AoC website with +    # up to six simultaneous requests. This may take a little longer, but it +    # does avoid putting unnecessary stress on the Advent of Code website. + +    # Container to store the raw data of each leaderboard +    participants = {} +    for leaderboard in AdventOfCode.leaderboards.values(): +        leaderboard_url = AOC_API_URL.format(year=year, leaderboard_id=leaderboard.id) + +        # Two attempts, one with the original session cookie and one with the fallback session +        for attempt in range(1, 3): +            log.info(f"Attempting to fetch leaderboard `{leaderboard.id}` ({attempt}/2)") +            cookies = {"session": leaderboard.session} +            try: +                raw_data = await _leaderboard_request(leaderboard_url, leaderboard.id, cookies) +            except UnexpectedRedirect: +                if cookies["session"] == AdventOfCode.fallback_session: +                    log.error("It seems like the fallback cookie has expired!") +                    raise FetchingLeaderboardFailedError from None + +                # If we're here, it means that the original session did not +                # work. Let's fall back to the fallback session. +                leaderboard.use_fallback_session = True +                continue +            except aiohttp.ClientError: +                # Don't retry, something unexpected is wrong and it may not be the session. +                raise FetchingLeaderboardFailedError from None +            else: +                # Get the participants and store their current count. +                board_participants = raw_data["members"] +                await _caches.leaderboard_counts.set(leaderboard.id, len(board_participants)) +                participants.update(board_participants) +                break +        else: +            log.error(f"reached 'unreachable' state while fetching board `{leaderboard.id}`.") +            raise FetchingLeaderboardFailedError + +    log.info(f"Fetched leaderboard information for {len(participants)} participants") +    return participants + + +async def _upload_leaderboard(leaderboard: str) -> str: +    """Upload the full leaderboard to our paste service and return the URL.""" +    async with aiohttp.request("POST", PASTE_URL, data=leaderboard) as resp: +        try: +            resp_json = await resp.json() +        except Exception: +            log.exception("Failed to upload full leaderboard to paste service") +            return "" + +    if "key" in resp_json: +        return RAW_PASTE_URL_TEMPLATE.format(key=resp_json["key"]) + +    log.error(f"Unexpected response from paste service while uploading leaderboard {resp_json}") +    return "" + + +def _get_top_leaderboard(full_leaderboard: str) -> str: +    """Get the leaderboard up to the maximum specified entries.""" +    return "\n".join(full_leaderboard.splitlines()[:TOP_LEADERBOARD_LINES]) + + +@_caches.leaderboard_cache.atomic_transaction +async def fetch_leaderboard(invalidate_cache: bool = False) -> dict: +    """ +    Get the current Python Discord combined leaderboard. + +    The leaderboard is cached and only fetched from the API if the current data +    is older than the lifetime set in the constants. To prevent multiple calls +    to this function fetching new leaderboard information in case of a cache +    miss, this function is locked to one call at a time using a decorator. +    """ +    cached_leaderboard = await _caches.leaderboard_cache.to_dict() + +    # Check if the cached leaderboard contains everything we expect it to. If it +    # does not, this probably means the cache has not been created yet or has +    # expired in Redis. This check also accounts for a malformed cache. +    if invalidate_cache or any(key not in cached_leaderboard for key in REQUIRED_CACHE_KEYS): +        log.info("No leaderboard cache available, fetching leaderboards...") +        # Fetch the raw data +        raw_leaderboard_data = await _fetch_leaderboard_data() + +        # Parse it to extract "per star, per day" data and participant scores +        parsed_leaderboard_data = _parse_raw_leaderboard_data(raw_leaderboard_data) + +        leaderboard = parsed_leaderboard_data["leaderboard"] +        number_of_participants = len(leaderboard) +        formatted_leaderboard = _format_leaderboard(leaderboard) +        full_leaderboard_url = await _upload_leaderboard(formatted_leaderboard) +        leaderboard_fetched_at = datetime.datetime.utcnow().isoformat() + +        cached_leaderboard = { +            "full_leaderboard": formatted_leaderboard, +            "top_leaderboard": _get_top_leaderboard(formatted_leaderboard), +            "full_leaderboard_url": full_leaderboard_url, +            "leaderboard_fetched_at": leaderboard_fetched_at, +            "number_of_participants": number_of_participants, +            "daily_stats": json.dumps(parsed_leaderboard_data["daily_stats"]), +        } + +        # Store the new values in Redis +        await _caches.leaderboard_cache.update(cached_leaderboard) + +        # Set an expiry on the leaderboard RedisCache +        with await _caches.leaderboard_cache._get_pool_connection() as connection: +            await connection.expire( +                _caches.leaderboard_cache.namespace, +                AdventOfCode.leaderboard_cache_expiry_seconds +            ) + +    return cached_leaderboard + + +def get_summary_embed(leaderboard: dict) -> discord.Embed: +    """Get an embed with the current summary stats of the leaderboard.""" +    leaderboard_url = leaderboard["full_leaderboard_url"] +    refresh_minutes = AdventOfCode.leaderboard_cache_expiry_seconds // 60 + +    aoc_embed = discord.Embed( +        colour=Colours.soft_green, +        timestamp=datetime.datetime.fromisoformat(leaderboard["leaderboard_fetched_at"]), +        description=f"*The leaderboard is refreshed every {refresh_minutes} minutes.*" +    ) +    aoc_embed.add_field( +        name="Number of Participants", +        value=leaderboard["number_of_participants"], +        inline=True, +    ) +    if leaderboard_url: +        aoc_embed.add_field( +            name="Full Leaderboard", +            value=f"[Python Discord Leaderboard]({leaderboard_url})", +            inline=True, +        ) +    aoc_embed.set_author(name="Advent of Code", url=leaderboard_url) +    aoc_embed.set_footer(text="Last Updated") +    aoc_embed.set_thumbnail(url=AOC_EMBED_THUMBNAIL) + +    return aoc_embed + + +async def get_public_join_code(author: discord.Member) -> Optional[str]: +    """ +    Get the join code for one of the non-staff leaderboards. + +    If a user has previously requested a join code and their assigned board +    hasn't filled up yet, we'll return the same join code to prevent them from +    getting join codes for multiple boards. +    """ +    # Make sure to fetch new leaderboard information if the cache is older than +    # 30 minutes. While this still means that there could be a discrepancy +    # between the current leaderboard state and the numbers we have here, this +    # should work fairly well given the buffer of slots that we have. +    await fetch_leaderboard() +    previously_assigned_board = await _caches.assigned_leaderboard.get(author.id) +    current_board_counts = await _caches.leaderboard_counts.to_dict() + +    # Remove the staff board from the current board counts as it should be ignored. +    current_board_counts.pop(AdventOfCode.staff_leaderboard_id, None) + +    # If this user has already received a join code, we'll give them the +    # exact same one to prevent them from joining multiple boards and taking +    # up multiple slots. +    if previously_assigned_board: +        # Check if their previously assigned board still has room for them +        if current_board_counts.get(previously_assigned_board, 0) < 200: +            log.info(f"{author} ({author.id}) was already assigned to a board with open slots.") +            return AdventOfCode.leaderboards[previously_assigned_board].join_code + +        log.info( +            f"User {author} ({author.id}) previously received the join code for " +            f"board `{previously_assigned_board}`, but that board's now full. " +            "Assigning another board to this user." +        ) + +    # If we don't have the current board counts cached, let's force fetching a new cache +    if not current_board_counts: +        log.warning("Leaderboard counts were missing from the cache unexpectedly!") +        await fetch_leaderboard(invalidate_cache=True) +        current_board_counts = await _caches.leaderboard_counts.to_dict() + +    # Find the board with the current lowest participant count. As we can't +    best_board, _count = min(current_board_counts.items(), key=operator.itemgetter(1)) + +    if current_board_counts.get(best_board, 0) >= 200: +        log.warning(f"User {author} `{author.id}` requested a join code, but all boards are full!") +        return + +    log.info(f"Assigning user {author} ({author.id}) to board `{best_board}`") +    await _caches.assigned_leaderboard.set(author.id, best_board) + +    # Return the join code for this board +    return AdventOfCode.leaderboards[best_board].join_code + + +def is_in_advent() -> bool: +    """ +    Check if we're currently on an Advent of Code day, excluding 25 December. + +    This helper function is used to check whether or not a feature that prepares +    something for the next Advent of Code challenge should run. As the puzzle +    published on the 25th is the last puzzle, this check excludes that date. +    """ +    return arrow.now(EST).day in range(1, 25) and arrow.now(EST).month == 12 + + +def time_left_to_est_midnight() -> tuple[datetime.datetime, datetime.timedelta]: +    """Calculate the amount of time left until midnight EST/UTC-5.""" +    # Change all time properties back to 00:00 +    todays_midnight = arrow.now(EST).replace( +        microsecond=0, +        second=0, +        minute=0, +        hour=0 +    ) + +    # We want tomorrow so add a day on +    tomorrow = todays_midnight + datetime.timedelta(days=1) + +    # Calculate the timedelta between the current time and midnight +    return tomorrow, tomorrow - arrow.now(EST) + + +async def wait_for_advent_of_code(*, hours_before: int = 1) -> None: +    """ +    Wait for the Advent of Code event to start. + +    This function returns `hours_before` (default: 1) the Advent of Code +    actually starts. This allows functions to schedule and execute code that +    needs to run before the event starts. + +    If the event has already started, this function returns immediately. + +    Note: The "next Advent of Code" is determined based on the current value +    of the `AOC_YEAR` environment variable. This allows callers to exit early +    if we're already past the Advent of Code edition the bot is currently +    configured for. +    """ +    start = arrow.get(datetime.datetime(AdventOfCode.year, 12, 1), EST) +    target = start - datetime.timedelta(hours=hours_before) +    now = arrow.now(EST) + +    # If we've already reached or passed to target, we +    # simply return immediately. +    if now >= target: +        return + +    delta = target - now +    await asyncio.sleep(delta.total_seconds()) + + +async def countdown_status(bot: Bot) -> None: +    """ +    Add the time until the next challenge is published to the bot's status. + +    This function sleeps until 2 hours before the event and exists one hour +    after the last challenge has been published. It will not start up again +    automatically for next year's event, as it will wait for the environment +    variable AOC_YEAR to be updated. + +    This ensures that the task will only start sleeping again once the next +    event approaches and we're making preparations for that event. +    """ +    log.debug("Initializing status countdown task.") +    # We wait until 2 hours before the event starts. Then we +    # set our first countdown status. +    await wait_for_advent_of_code(hours_before=2) + +    # Log that we're going to start with the countdown status. +    log.info("The Advent of Code has started or will start soon, starting countdown status.") + +    # Trying to change status too early in the bot's startup sequence will fail +    # the task because the websocket instance has not yet been created. Waiting +    # for this event means that both the websocket instance has been initialized +    # and that the connection to Discord is mature enough to change the presence +    # of the bot. +    await bot.wait_until_guild_available() + +    # Calculate when the task needs to stop running. To prevent the task from +    # sleeping for the entire year, it will only wait in the currently +    # configured year. This means that the task will only start hibernating once +    # we start preparing the next event by changing environment variables. +    last_challenge = arrow.get(datetime.datetime(AdventOfCode.year, 12, 25), EST) +    end = last_challenge + datetime.timedelta(hours=1) + +    while arrow.now(EST) < end: +        _, time_left = time_left_to_est_midnight() + +        aligned_seconds = int(math.ceil(time_left.seconds / COUNTDOWN_STEP)) * COUNTDOWN_STEP +        hours, minutes = aligned_seconds // 3600, aligned_seconds // 60 % 60 + +        if aligned_seconds == 0: +            playing = "right now!" +        elif aligned_seconds == COUNTDOWN_STEP: +            playing = f"in less than {minutes} minutes" +        elif hours == 0: +            playing = f"in {minutes} minutes" +        elif hours == 23: +            playing = f"since {60 - minutes} minutes ago" +        else: +            playing = f"in {hours} hours and {minutes} minutes" + +        log.trace(f"Changing presence to {playing!r}") +        # Status will look like "Playing in 5 hours and 30 minutes" +        await bot.change_presence(activity=discord.Game(playing)) + +        # Sleep until next aligned time or a full step if already aligned +        delay = time_left.seconds % COUNTDOWN_STEP or COUNTDOWN_STEP +        log.trace(f"The countdown status task will sleep for {delay} seconds.") +        await asyncio.sleep(delay) + + +async def new_puzzle_notification(bot: Bot) -> None: +    """ +    Announce the release of a new Advent of Code puzzle. + +    This background task hibernates until just before the Advent of Code starts +    and will then start announcing puzzles as they are published. After the +    event has finished, this task will terminate. +    """ +    # We wake up one hour before the event starts to prepare the announcement +    # of the release of the first puzzle. +    await wait_for_advent_of_code(hours_before=1) + +    log.info("The Advent of Code has started or will start soon, waking up notification task.") + +    # Ensure that the guild cache is loaded so we can get the Advent of Code +    # channel and role. +    await bot.wait_until_guild_available() +    aoc_channel = bot.get_channel(Channels.advent_of_code) +    aoc_role = aoc_channel.guild.get_role(AdventOfCode.role_id) + +    if not aoc_channel: +        log.error("Could not find the AoC channel to send notification in") +        return + +    if not aoc_role: +        log.error("Could not find the AoC role to announce the daily puzzle") +        return + +    # The last event day is 25 December, so we only have to schedule +    # a reminder if the current day is before 25 December. +    end = arrow.get(datetime.datetime(AdventOfCode.year, 12, 25), EST) +    while arrow.now(EST) < end: +        log.trace("Started puzzle notification loop.") +        tomorrow, time_left = time_left_to_est_midnight() + +        # Use `total_seconds` to get the time left in fractional seconds This +        # should wake us up very close to the target. As a safe guard, the sleep +        # duration is padded with 0.1 second to make sure we wake up after +        # midnight. +        sleep_seconds = time_left.total_seconds() + 0.1 +        log.trace(f"The puzzle notification task will sleep for {sleep_seconds} seconds") +        await asyncio.sleep(sleep_seconds) + +        puzzle_url = f"https://adventofcode.com/{AdventOfCode.year}/day/{tomorrow.day}" + +        # Check if the puzzle is already available to prevent our members from spamming +        # the puzzle page before it's available by making a small HEAD request. +        for retry in range(1, 5): +            log.debug(f"Checking if the puzzle is already available (attempt {retry}/4)") +            async with bot.http_session.head(puzzle_url, raise_for_status=False) as resp: +                if resp.status == 200: +                    log.debug("Puzzle is available; let's send an announcement message.") +                    break +            log.debug(f"The puzzle is not yet available (status={resp.status})") +            await asyncio.sleep(10) +        else: +            log.error( +                "The puzzle does does not appear to be available " +                "at this time, canceling announcement" +            ) +            break + +        await aoc_channel.send( +            f"{aoc_role.mention} Good morning! Day {tomorrow.day} is ready to be attempted. " +            f"View it online now at {puzzle_url}. Good luck!", +            allowed_mentions=discord.AllowedMentions( +                everyone=False, +                users=False, +                roles=[aoc_role], +            ) +        ) + +        # Ensure that we don't send duplicate announcements by sleeping to well +        # over midnight. This means we're certain to calculate the time to the +        # next midnight at the top of the loop. +        await asyncio.sleep(120) + + +def background_task_callback(task: asyncio.Task) -> None: +    """Check if the finished background task failed to make sure we log errors.""" +    if task.cancelled(): +        log.info(f"Background task `{task.get_name()}` was cancelled.") +    elif exception := task.exception(): +        log.error(f"Background task `{task.get_name()}` failed:", exc_info=exception) +    else: +        log.info(f"Background task `{task.get_name()}` exited normally.") diff --git a/bot/exts/events/hacktoberfest/__init__.py b/bot/exts/events/hacktoberfest/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/bot/exts/events/hacktoberfest/__init__.py diff --git a/bot/exts/events/hacktoberfest/hacktober-issue-finder.py b/bot/exts/events/hacktoberfest/hacktober-issue-finder.py new file mode 100644 index 00000000..e3053851 --- /dev/null +++ b/bot/exts/events/hacktoberfest/hacktober-issue-finder.py @@ -0,0 +1,117 @@ +import datetime +import logging +import random +from typing import Optional + +import discord +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Month, Tokens +from bot.utils.decorators import in_month + +log = logging.getLogger(__name__) + +URL = "https://api.github.com/search/issues?per_page=100&q=is:issue+label:hacktoberfest+language:python+state:open" + +REQUEST_HEADERS = { +    "User-Agent": "Python Discord Hacktoberbot", +    "Accept": "application / vnd.github.v3 + json" +} +if GITHUB_TOKEN := Tokens.github: +    REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}" + + +class HacktoberIssues(commands.Cog): +    """Find a random hacktober python issue on GitHub.""" + +    def __init__(self, bot: Bot): +        self.bot = bot +        self.cache_normal = None +        self.cache_timer_normal = datetime.datetime(1, 1, 1) +        self.cache_beginner = None +        self.cache_timer_beginner = datetime.datetime(1, 1, 1) + +    @in_month(Month.OCTOBER) +    @commands.command() +    async def hacktoberissues(self, ctx: commands.Context, option: str = "") -> None: +        """ +        Get a random python hacktober issue from Github. + +        If the command is run with beginner (`.hacktoberissues beginner`): +        It will also narrow it down to the "first good issue" label. +        """ +        async with ctx.typing(): +            issues = await self.get_issues(ctx, option) +            if issues is None: +                return +            issue = random.choice(issues["items"]) +            embed = self.format_embed(issue) +        await ctx.send(embed=embed) + +    async def get_issues(self, ctx: commands.Context, option: str) -> Optional[dict]: +        """Get a list of the python issues with the label 'hacktoberfest' from the Github api.""" +        if option == "beginner": +            if (ctx.message.created_at - self.cache_timer_beginner).seconds <= 60: +                log.debug("using cache") +                return self.cache_beginner +        elif (ctx.message.created_at - self.cache_timer_normal).seconds <= 60: +            log.debug("using cache") +            return self.cache_normal + +        if option == "beginner": +            url = URL + '+label:"good first issue"' +            if self.cache_beginner is not None: +                page = random.randint(1, min(1000, self.cache_beginner["total_count"]) // 100) +                url += f"&page={page}" +        else: +            url = URL +            if self.cache_normal is not None: +                page = random.randint(1, min(1000, self.cache_normal["total_count"]) // 100) +                url += f"&page={page}" + +        log.debug(f"making api request to url: {url}") +        async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as response: +            if response.status != 200: +                log.error(f"expected 200 status (got {response.status}) by the GitHub api.") +                await ctx.send( +                    f"ERROR: expected 200 status (got {response.status}) by the GitHub api.\n" +                    f"{await response.text()}" +                ) +                return None +            data = await response.json() + +            if len(data["items"]) == 0: +                log.error(f"no issues returned by GitHub API, with url: {response.url}") +                await ctx.send(f"ERROR: no issues returned by GitHub API, with url: {response.url}") +                return None + +            if option == "beginner": +                self.cache_beginner = data +                self.cache_timer_beginner = ctx.message.created_at +            else: +                self.cache_normal = data +                self.cache_timer_normal = ctx.message.created_at + +            return data + +    @staticmethod +    def format_embed(issue: dict) -> discord.Embed: +        """Format the issue data into a embed.""" +        title = issue["title"] +        issue_url = issue["url"].replace("api.", "").replace("/repos/", "/") +        body = issue["body"] +        labels = [label["name"] for label in issue["labels"]] + +        embed = discord.Embed(title=title) +        embed.description = body[:500] + "..." if len(body) > 500 else body +        embed.add_field(name="labels", value="\n".join(labels)) +        embed.url = issue_url +        embed.set_footer(text=issue_url) + +        return embed + + +def setup(bot: Bot) -> None: +    """Load the HacktoberIssue finder.""" +    bot.add_cog(HacktoberIssues(bot)) diff --git a/bot/exts/events/hacktoberfest/hacktoberstats.py b/bot/exts/events/hacktoberfest/hacktoberstats.py new file mode 100644 index 00000000..72067dbe --- /dev/null +++ b/bot/exts/events/hacktoberfest/hacktoberstats.py @@ -0,0 +1,437 @@ +import logging +import random +import re +from collections import Counter +from datetime import datetime, timedelta +from typing import Optional, Union +from urllib.parse import quote_plus + +import discord +from async_rediscache import RedisCache +from discord.ext import commands + +from bot.bot import Bot +from bot.constants import Colours, Month, NEGATIVE_REPLIES, Tokens +from bot.utils.decorators import in_month + +log = logging.getLogger(__name__) + +CURRENT_YEAR = datetime.now().year  # Used to construct GH API query +PRS_FOR_SHIRT = 4  # Minimum number of PRs before a shirt is awarded +REVIEW_DAYS = 14  # number of days needed after PR can be mature + +REQUEST_HEADERS = {"User-Agent": "Python Discord Hacktoberbot"} +# using repo topics API during preview period requires an accept header +GITHUB_TOPICS_ACCEPT_HEADER = {"Accept": "application/vnd.github.mercy-preview+json"} +if GITHUB_TOKEN := Tokens.github: +    REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}" +    GITHUB_TOPICS_ACCEPT_HEADER["Authorization"] = f"token {GITHUB_TOKEN}" + +GITHUB_NONEXISTENT_USER_MESSAGE = ( +    "The listed users cannot be searched either because the users do not exist " +    "or you do not have permission to view the users." +) + + +class HacktoberStats(commands.Cog): +    """Hacktoberfest statistics Cog.""" + +    # Stores mapping of user IDs and GitHub usernames +    linked_accounts = RedisCache() + +    def __init__(self, bot: Bot): +        self.bot = bot + +    @in_month(Month.SEPTEMBER, Month.OCTOBER, Month.NOVEMBER) +    @commands.group(name="hacktoberstats", aliases=("hackstats",), invoke_without_command=True) +    async def hacktoberstats_group(self, ctx: commands.Context, github_username: str = None) -> None: +        """ +        Display an embed for a user's Hacktoberfest contributions. + +        If invoked without a subcommand or github_username, get the invoking user's stats if they've +        linked their Discord name to GitHub using .stats link. If invoked with a github_username, +        get that user's contributions +        """ +        if not github_username: +            author_id, author_mention = self._author_mention_from_context(ctx) + +            if await self.linked_accounts.contains(author_id): +                github_username = await self.linked_accounts.get(author_id) +                logging.info(f"Getting stats for {author_id} linked GitHub account '{github_username}'") +            else: +                msg = ( +                    f"{author_mention}, you have not linked a GitHub account\n\n" +                    f"You can link your GitHub account using:\n```\n{ctx.prefix}hackstats link github_username\n```\n" +                    f"Or query GitHub stats directly using:\n```\n{ctx.prefix}hackstats github_username\n```" +                ) +                await ctx.send(msg) +                return + +        await self.get_stats(ctx, github_username) + +    @in_month(Month.SEPTEMBER, Month.OCTOBER, Month.NOVEMBER) +    @hacktoberstats_group.command(name="link") +    async def link_user(self, ctx: commands.Context, github_username: str = None) -> None: +        """ +        Link the invoking user's Github github_username to their Discord ID. + +        Linked users are stored in Redis: User ID => GitHub Username. +        """ +        author_id, author_mention = self._author_mention_from_context(ctx) +        if github_username: +            if await self.linked_accounts.contains(author_id): +                old_username = await self.linked_accounts.get(author_id) +                log.info(f"{author_id} has changed their github link from '{old_username}' to '{github_username}'") +                await ctx.send(f"{author_mention}, your GitHub username has been updated to: '{github_username}'") +            else: +                log.info(f"{author_id} has added a github link to '{github_username}'") +                await ctx.send(f"{author_mention}, your GitHub username has been added") + +            await self.linked_accounts.set(author_id, github_username) +        else: +            log.info(f"{author_id} tried to link a GitHub account but didn't provide a username") +            await ctx.send(f"{author_mention}, a GitHub username is required to link your account") + +    @in_month(Month.SEPTEMBER, Month.OCTOBER, Month.NOVEMBER) +    @hacktoberstats_group.command(name="unlink") +    async def unlink_user(self, ctx: commands.Context) -> None: +        """Remove the invoking user's account link from the log.""" +        author_id, author_mention = self._author_mention_from_context(ctx) + +        stored_user = await self.linked_accounts.pop(author_id, None) +        if stored_user: +            await ctx.send(f"{author_mention}, your GitHub profile has been unlinked") +            logging.info(f"{author_id} has unlinked their GitHub account") +        else: +            await ctx.send(f"{author_mention}, you do not currently have a linked GitHub account") +            logging.info(f"{author_id} tried to unlink their GitHub account but no account was linked") + +    async def get_stats(self, ctx: commands.Context, github_username: str) -> None: +        """ +        Query GitHub's API for PRs created by a GitHub user during the month of October. + +        PRs with an 'invalid' or 'spam' label are ignored + +        For PRs created after October 3rd, they have to be in a repository that has a +        'hacktoberfest' topic, unless the PR is labelled 'hacktoberfest-accepted' for it +        to count. + +        If a valid github_username is provided, an embed is generated and posted to the channel + +        Otherwise, post a helpful error message +        """ +        async with ctx.typing(): +            prs = await self.get_october_prs(github_username) + +            if prs is None:  # Will be None if the user was not found +                await ctx.send( +                    embed=discord.Embed( +                        title=random.choice(NEGATIVE_REPLIES), +                        description=f"GitHub user `{github_username}` was not found.", +                        colour=discord.Colour.red() +                    ) +                ) +                return + +            if prs: +                stats_embed = await self.build_embed(github_username, prs) +                await ctx.send("Here are some stats!", embed=stats_embed) +            else: +                await ctx.send(f"No valid Hacktoberfest PRs found for '{github_username}'") + +    async def build_embed(self, github_username: str, prs: list[dict]) -> discord.Embed: +        """Return a stats embed built from github_username's PRs.""" +        logging.info(f"Building Hacktoberfest embed for GitHub user: '{github_username}'") +        in_review, accepted = await self._categorize_prs(prs) + +        n = len(accepted) + len(in_review)  # Total number of PRs +        if n >= PRS_FOR_SHIRT: +            shirtstr = f"**{github_username} is eligible for a T-shirt or a tree!**" +        elif n == PRS_FOR_SHIRT - 1: +            shirtstr = f"**{github_username} is 1 PR away from a T-shirt or a tree!**" +        else: +            shirtstr = f"**{github_username} is {PRS_FOR_SHIRT - n} PRs away from a T-shirt or a tree!**" + +        stats_embed = discord.Embed( +            title=f"{github_username}'s Hacktoberfest", +            color=Colours.purple, +            description=( +                f"{github_username} has made {n} valid " +                f"{self._contributionator(n)} in " +                f"October\n\n" +                f"{shirtstr}\n\n" +            ) +        ) + +        stats_embed.set_thumbnail(url=f"https://www.github.com/{github_username}.png") +        stats_embed.set_author( +            name="Hacktoberfest", +            url="https://hacktoberfest.digitalocean.com", +            icon_url="https://avatars1.githubusercontent.com/u/35706162?s=200&v=4" +        ) + +        # This will handle when no PRs in_review or accepted +        review_str = self._build_prs_string(in_review, github_username) or "None" +        accepted_str = self._build_prs_string(accepted, github_username) or "None" +        stats_embed.add_field( +            name=":clock1: In Review", +            value=review_str +        ) +        stats_embed.add_field( +            name=":tada: Accepted", +            value=accepted_str +        ) + +        logging.info(f"Hacktoberfest PR built for GitHub user '{github_username}'") +        return stats_embed + +    async def get_october_prs(self, github_username: str) -> Optional[list[dict]]: +        """ +        Query GitHub's API for PRs created during the month of October by github_username. + +        PRs with an 'invalid' or 'spam' label are ignored unless it is merged or approved + +        For PRs created after October 3rd, they have to be in a repository that has a +        'hacktoberfest' topic, unless the PR is labelled 'hacktoberfest-accepted' for it +        to count. + +        If PRs are found, return a list of dicts with basic PR information + +        For each PR: +        { +            "repo_url": str +            "repo_shortname": str (e.g. "python-discord/sir-lancebot") +            "created_at": datetime.datetime +            "number": int +        } + +        Otherwise, return empty list. +        None will be returned when the GitHub user was not found. +        """ +        log.info(f"Fetching Hacktoberfest Stats for GitHub user: '{github_username}'") +        base_url = "https://api.github.com/search/issues" +        action_type = "pr" +        is_query = "public" +        not_query = "draft" +        date_range = f"{CURRENT_YEAR}-09-30T10:00Z..{CURRENT_YEAR}-11-01T12:00Z" +        per_page = "300" +        query_params = ( +            f"+type:{action_type}" +            f"+is:{is_query}" +            f"+author:{quote_plus(github_username)}" +            f"+-is:{not_query}" +            f"+created:{date_range}" +            f"&per_page={per_page}" +        ) + +        log.debug(f"GitHub query parameters generated: {query_params}") + +        jsonresp = await self._fetch_url(base_url, REQUEST_HEADERS, {"q": query_params}) +        if "message" in jsonresp: +            # One of the parameters is invalid, short circuit for now +            api_message = jsonresp["errors"][0]["message"] + +            # Ignore logging non-existent users or users we do not have permission to see +            if api_message == GITHUB_NONEXISTENT_USER_MESSAGE: +                log.debug(f"No GitHub user found named '{github_username}'") +                return +            else: +                log.error(f"GitHub API request for '{github_username}' failed with message: {api_message}") +            return []  # No October PRs were found due to error + +        if jsonresp["total_count"] == 0: +            # Short circuit if there aren't any PRs +            log.info(f"No October PRs found for GitHub user: '{github_username}'") +            return [] + +        logging.info(f"Found {len(jsonresp['items'])} Hacktoberfest PRs for GitHub user: '{github_username}'") +        outlist = []  # list of pr information dicts that will get returned +        oct3 = datetime(int(CURRENT_YEAR), 10, 3, 23, 59, 59, tzinfo=None) +        hackto_topics = {}  # cache whether each repo has the appropriate topic (bool values) +        for item in jsonresp["items"]: +            shortname = self._get_shortname(item["repository_url"]) +            itemdict = { +                "repo_url": f"https://www.github.com/{shortname}", +                "repo_shortname": shortname, +                "created_at": datetime.strptime( +                    item["created_at"], "%Y-%m-%dT%H:%M:%SZ" +                ), +                "number": item["number"] +            } + +            # If the PR has 'invalid' or 'spam' labels, the PR must be +            # either merged or approved for it to be included +            if self._has_label(item, ["invalid", "spam"]): +                if not await self._is_accepted(itemdict): +                    continue + +            # PRs before oct 3 no need to check for topics +            # continue the loop if 'hacktoberfest-accepted' is labelled then +            # there is no need to check for its topics +            if itemdict["created_at"] < oct3: +                outlist.append(itemdict) +                continue + +            # Checking PR's labels for "hacktoberfest-accepted" +            if self._has_label(item, "hacktoberfest-accepted"): +                outlist.append(itemdict) +                continue + +            # No need to query GitHub if repo topics are fetched before already +            if hackto_topics.get(shortname): +                outlist.append(itemdict) +                continue +            # Fetch topics for the PR's repo +            topics_query_url = f"https://api.github.com/repos/{shortname}/topics" +            log.debug(f"Fetching repo topics for {shortname} with url: {topics_query_url}") +            jsonresp2 = await self._fetch_url(topics_query_url, GITHUB_TOPICS_ACCEPT_HEADER) +            if jsonresp2.get("names") is None: +                log.error(f"Error fetching topics for {shortname}: {jsonresp2['message']}") +                continue  # Assume the repo doesn't have the `hacktoberfest` topic if API  request errored + +            # PRs after oct 3 that doesn't have 'hacktoberfest-accepted' label +            # must be in repo with 'hacktoberfest' topic +            if "hacktoberfest" in jsonresp2["names"]: +                hackto_topics[shortname] = True  # Cache result in the dict for later use if needed +                outlist.append(itemdict) +        return outlist + +    async def _fetch_url(self, url: str, headers: dict, params: dict) -> dict: +        """Retrieve API response from URL.""" +        async with self.bot.http_session.get(url, headers=headers, params=params) as resp: +            return await resp.json() + +    @staticmethod +    def _has_label(pr: dict, labels: Union[list[str], str]) -> bool: +        """ +        Check if a PR has label 'labels'. + +        'labels' can be a string or a list of strings, if it's a list of strings +        it will return true if any of the labels match. +        """ +        if not pr.get("labels"):  # if PR has no labels +            return False +        if isinstance(labels, str) and any(label["name"].casefold() == labels for label in pr["labels"]): +            return True +        for item in labels: +            if any(label["name"].casefold() == item for label in pr["labels"]): +                return True +        return False + +    async def _is_accepted(self, pr: dict) -> bool: +        """Check if a PR is merged, approved, or labelled hacktoberfest-accepted.""" +        # checking for merge status +        query_url = f"https://api.github.com/repos/{pr['repo_shortname']}/pulls/{pr['number']}" +        jsonresp = await self._fetch_url(query_url, REQUEST_HEADERS) + +        if message := jsonresp.get("message"): +            log.error(f"Error fetching PR stats for #{pr['number']} in repo {pr['repo_shortname']}:\n{message}") +            return False + +        if jsonresp.get("merged"): +            return True + +        # checking for the label, using `jsonresp` which has the label information +        if self._has_label(jsonresp, "hacktoberfest-accepted"): +            return True + +        # checking approval +        query_url += "/reviews" +        jsonresp2 = await self._fetch_url(query_url, REQUEST_HEADERS) +        if isinstance(jsonresp2, dict): +            # if API request is unsuccessful it will be a dict with the error in 'message' +            log.error( +                f"Error fetching PR reviews for #{pr['number']} in repo {pr['repo_shortname']}:\n" +                f"{jsonresp2['message']}" +            ) +            return False +        # if it is successful it will be a list instead of a dict +        if len(jsonresp2) == 0:  # if PR has no reviews +            return False + +        # loop through reviews and check for approval +        for item in jsonresp2: +            if item.get("status") == "APPROVED": +                return True +        return False + +    @staticmethod +    def _get_shortname(in_url: str) -> str: +        """ +        Extract shortname from https://api.github.com/repos/* URL. + +        e.g. "https://api.github.com/repos/python-discord/sir-lancebot" +             | +             V +             "python-discord/sir-lancebot" +        """ +        exp = r"https?:\/\/api.github.com\/repos\/([/\-\_\.\w]+)" +        return re.findall(exp, in_url)[0] + +    async def _categorize_prs(self, prs: list[dict]) -> tuple: +        """ +        Categorize PRs into 'in_review' and 'accepted' and returns as a tuple. + +        PRs created less than 14 days ago are 'in_review', PRs that are not +        are 'accepted' (after 14 days review period). + +        PRs that are accepted must either be merged, approved, or labelled +        'hacktoberfest-accepted. +        """ +        now = datetime.now() +        oct3 = datetime(CURRENT_YEAR, 10, 3, 23, 59, 59, tzinfo=None) +        in_review = [] +        accepted = [] +        for pr in prs: +            if (pr["created_at"] + timedelta(REVIEW_DAYS)) > now: +                in_review.append(pr) +            elif (pr["created_at"] <= oct3) or await self._is_accepted(pr): +                accepted.append(pr) + +        return in_review, accepted + +    @staticmethod +    def _build_prs_string(prs: list[tuple], user: str) -> str: +        """ +        Builds a discord embed compatible string for a list of PRs. + +        Repository name with the link to pull requests authored by 'user' for +        each PR. +        """ +        base_url = "https://www.github.com/" +        str_list = [] +        repo_list = [pr["repo_shortname"] for pr in prs] +        prs_list = Counter(repo_list).most_common(5)  # get first 5 counted PRs +        more = len(prs) - sum(i[1] for i in prs_list) + +        for pr in prs_list: +            # for example: https://www.github.com/python-discord/bot/pulls/octocat +            # will display pull requests authored by octocat. +            # pr[1] is the number of PRs to the repo +            string = f"{pr[1]} to [{pr[0]}]({base_url}{pr[0]}/pulls/{user})" +            str_list.append(string) +        if more: +            str_list.append(f"...and {more} more") + +        return "\n".join(str_list) + +    @staticmethod +    def _contributionator(n: int) -> str: +        """Return "contribution" or "contributions" based on the value of n.""" +        if n == 1: +            return "contribution" +        else: +            return "contributions" + +    @staticmethod +    def _author_mention_from_context(ctx: commands.Context) -> tuple[str, str]: +        """Return stringified Message author ID and mentionable string from commands.Context.""" +        author_id = str(ctx.author.id) +        author_mention = ctx.author.mention + +        return author_id, author_mention + + +def setup(bot: Bot) -> None: +    """Load the Hacktober Stats Cog.""" +    bot.add_cog(HacktoberStats(bot)) diff --git a/bot/exts/events/hacktoberfest/timeleft.py b/bot/exts/events/hacktoberfest/timeleft.py new file mode 100644 index 00000000..55109599 --- /dev/null +++ b/bot/exts/events/hacktoberfest/timeleft.py @@ -0,0 +1,67 @@ +import logging +from datetime import datetime + +from discord.ext import commands + +from bot.bot import Bot + +log = logging.getLogger(__name__) + + +class TimeLeft(commands.Cog): +    """A Cog that tells you how long left until Hacktober is over!""" + +    def in_hacktober(self) -> bool: +        """Return True if the current time is within Hacktoberfest.""" +        _, end, start = self.load_date() + +        now = datetime.utcnow() + +        return start <= now <= end + +    @staticmethod +    def load_date() -> tuple[datetime, datetime, datetime]: +        """Return of a tuple of the current time and the end and start times of the next October.""" +        now = datetime.utcnow() +        year = now.year +        if now.month > 10: +            year += 1 +        end = datetime(year, 11, 1, 12)  # November 1st 12:00 (UTC-12:00) +        start = datetime(year, 9, 30, 10)  # September 30th 10:00 (UTC+14:00) +        return now, end, start + +    @commands.command() +    async def timeleft(self, ctx: commands.Context) -> None: +        """ +        Calculates the time left until the end of Hacktober. + +        Whilst in October, displays the days, hours and minutes left. +        Only displays the days left until the beginning and end whilst in a different month. + +        This factors in that Hacktoberfest starts when it is October anywhere in the world +        and ends with the same rules. It treats the start as UTC+14:00 and the end as +        UTC-12. +        """ +        now, end, start = self.load_date() +        diff = end - now +        days, seconds = diff.days, diff.seconds +        if self.in_hacktober(): +            minutes = seconds // 60 +            hours, minutes = divmod(minutes, 60) + +            await ctx.send( +                f"There are {days} days, {hours} hours and {minutes}" +                f" minutes left until the end of Hacktober." +            ) +        else: +            start_diff = start - now +            start_days = start_diff.days +            await ctx.send( +                f"It is not currently Hacktober. However, the next one will start in {start_days} days " +                f"and will finish in {days} days." +            ) + + +def setup(bot: Bot) -> None: +    """Load the Time Left Cog.""" +    bot.add_cog(TimeLeft())  |