aboutsummaryrefslogtreecommitdiffstats
path: root/bot/exts/events
diff options
context:
space:
mode:
Diffstat (limited to 'bot/exts/events')
-rw-r--r--bot/exts/events/advent_of_code/__init__.py10
-rw-r--r--bot/exts/events/advent_of_code/_caches.py5
-rw-r--r--bot/exts/events/advent_of_code/_cog.py302
-rw-r--r--bot/exts/events/advent_of_code/_helpers.py591
-rw-r--r--bot/exts/events/hacktoberfest/__init__.py0
-rw-r--r--bot/exts/events/hacktoberfest/hacktober-issue-finder.py117
-rw-r--r--bot/exts/events/hacktoberfest/hacktoberstats.py437
-rw-r--r--bot/exts/events/hacktoberfest/timeleft.py67
8 files changed, 1529 insertions, 0 deletions
diff --git a/bot/exts/events/advent_of_code/__init__.py b/bot/exts/events/advent_of_code/__init__.py
new file mode 100644
index 00000000..3c521168
--- /dev/null
+++ b/bot/exts/events/advent_of_code/__init__.py
@@ -0,0 +1,10 @@
+from bot.bot import Bot
+
+
+def setup(bot: Bot) -> None:
+ """Set up the Advent of Code extension."""
+ # Import the Cog at runtime to prevent side effects like defining
+ # RedisCache instances too early.
+ from ._cog import AdventOfCode
+
+ bot.add_cog(AdventOfCode(bot))
diff --git a/bot/exts/events/advent_of_code/_caches.py b/bot/exts/events/advent_of_code/_caches.py
new file mode 100644
index 00000000..32d5394f
--- /dev/null
+++ b/bot/exts/events/advent_of_code/_caches.py
@@ -0,0 +1,5 @@
+import async_rediscache
+
+leaderboard_counts = async_rediscache.RedisCache(namespace="AOC_leaderboard_counts")
+leaderboard_cache = async_rediscache.RedisCache(namespace="AOC_leaderboard_cache")
+assigned_leaderboard = async_rediscache.RedisCache(namespace="AOC_assigned_leaderboard")
diff --git a/bot/exts/events/advent_of_code/_cog.py b/bot/exts/events/advent_of_code/_cog.py
new file mode 100644
index 00000000..ca60e517
--- /dev/null
+++ b/bot/exts/events/advent_of_code/_cog.py
@@ -0,0 +1,302 @@
+import json
+import logging
+from datetime import datetime, timedelta
+from pathlib import Path
+
+import arrow
+import discord
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.constants import (
+ AdventOfCode as AocConfig, Channels, Colours, Emojis, Month, Roles, WHITELISTED_CHANNELS,
+)
+from bot.exts.events.advent_of_code import _helpers
+from bot.utils.decorators import InChannelCheckFailure, in_month, whitelist_override, with_role
+from bot.utils.extensions import invoke_help_command
+
+log = logging.getLogger(__name__)
+
+AOC_REQUEST_HEADER = {"user-agent": "PythonDiscord AoC Event Bot"}
+
+AOC_WHITELIST_RESTRICTED = WHITELISTED_CHANNELS + (Channels.advent_of_code_commands,)
+
+# Some commands can be run in the regular advent of code channel
+# They aren't spammy and foster discussion
+AOC_WHITELIST = AOC_WHITELIST_RESTRICTED + (Channels.advent_of_code,)
+
+
+class AdventOfCode(commands.Cog):
+ """Advent of Code festivities! Ho Ho Ho!"""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ self._base_url = f"https://adventofcode.com/{AocConfig.year}"
+ self.global_leaderboard_url = f"https://adventofcode.com/{AocConfig.year}/leaderboard"
+
+ self.about_aoc_filepath = Path("./bot/resources/events/advent_of_code/about.json")
+ self.cached_about_aoc = self._build_about_embed()
+
+ notification_coro = _helpers.new_puzzle_notification(self.bot)
+ self.notification_task = self.bot.loop.create_task(notification_coro)
+ self.notification_task.set_name("Daily AoC Notification")
+ self.notification_task.add_done_callback(_helpers.background_task_callback)
+
+ status_coro = _helpers.countdown_status(self.bot)
+ self.status_task = self.bot.loop.create_task(status_coro)
+ self.status_task.set_name("AoC Status Countdown")
+ self.status_task.add_done_callback(_helpers.background_task_callback)
+
+ @commands.group(name="adventofcode", aliases=("aoc",))
+ @whitelist_override(channels=AOC_WHITELIST)
+ async def adventofcode_group(self, ctx: commands.Context) -> None:
+ """All of the Advent of Code commands."""
+ if not ctx.invoked_subcommand:
+ await invoke_help_command(ctx)
+
+ @adventofcode_group.command(
+ name="subscribe",
+ aliases=("sub", "notifications", "notify", "notifs"),
+ brief="Notifications for new days"
+ )
+ @whitelist_override(channels=AOC_WHITELIST)
+ async def aoc_subscribe(self, ctx: commands.Context) -> None:
+ """Assign the role for notifications about new days being ready."""
+ current_year = datetime.now().year
+ if current_year != AocConfig.year:
+ await ctx.send(f"You can't subscribe to {current_year}'s Advent of Code announcements yet!")
+ return
+
+ role = ctx.guild.get_role(AocConfig.role_id)
+ unsubscribe_command = f"{ctx.prefix}{ctx.command.root_parent} unsubscribe"
+
+ if role not in ctx.author.roles:
+ await ctx.author.add_roles(role)
+ await ctx.send(
+ "Okay! You have been __subscribed__ to notifications about new Advent of Code tasks. "
+ f"You can run `{unsubscribe_command}` to disable them again for you."
+ )
+ else:
+ await ctx.send(
+ "Hey, you already are receiving notifications about new Advent of Code tasks. "
+ f"If you don't want them any more, run `{unsubscribe_command}` instead."
+ )
+
+ @in_month(Month.DECEMBER)
+ @adventofcode_group.command(name="unsubscribe", aliases=("unsub",), brief="Notifications for new days")
+ @whitelist_override(channels=AOC_WHITELIST)
+ async def aoc_unsubscribe(self, ctx: commands.Context) -> None:
+ """Remove the role for notifications about new days being ready."""
+ role = ctx.guild.get_role(AocConfig.role_id)
+
+ if role in ctx.author.roles:
+ await ctx.author.remove_roles(role)
+ await ctx.send("Okay! You have been __unsubscribed__ from notifications about new Advent of Code tasks.")
+ else:
+ await ctx.send("Hey, you don't even get any notifications about new Advent of Code tasks currently anyway.")
+
+ @adventofcode_group.command(name="countdown", aliases=("count", "c"), brief="Return time left until next day")
+ @whitelist_override(channels=AOC_WHITELIST)
+ async def aoc_countdown(self, ctx: commands.Context) -> None:
+ """Return time left until next day."""
+ if not _helpers.is_in_advent():
+ datetime_now = arrow.now(_helpers.EST)
+
+ # Calculate the delta to this & next year's December 1st to see which one is closest and not in the past
+ this_year = arrow.get(datetime(datetime_now.year, 12, 1), _helpers.EST)
+ next_year = arrow.get(datetime(datetime_now.year + 1, 12, 1), _helpers.EST)
+ deltas = (dec_first - datetime_now for dec_first in (this_year, next_year))
+ delta = min(delta for delta in deltas if delta >= timedelta()) # timedelta() gives 0 duration delta
+
+ # Add a finer timedelta if there's less than a day left
+ if delta.days == 0:
+ delta_str = f"approximately {delta.seconds // 3600} hours"
+ else:
+ delta_str = f"{delta.days} days"
+
+ await ctx.send(
+ "The Advent of Code event is not currently running. "
+ f"The next event will start in {delta_str}."
+ )
+ return
+
+ tomorrow, time_left = _helpers.time_left_to_est_midnight()
+
+ hours, minutes = time_left.seconds // 3600, time_left.seconds // 60 % 60
+
+ await ctx.send(f"There are {hours} hours and {minutes} minutes left until day {tomorrow.day}.")
+
+ @adventofcode_group.command(name="about", aliases=("ab", "info"), brief="Learn about Advent of Code")
+ @whitelist_override(channels=AOC_WHITELIST)
+ async def about_aoc(self, ctx: commands.Context) -> None:
+ """Respond with an explanation of all things Advent of Code."""
+ await ctx.send(embed=self.cached_about_aoc)
+
+ @adventofcode_group.command(name="join", aliases=("j",), brief="Learn how to join the leaderboard (via DM)")
+ @whitelist_override(channels=AOC_WHITELIST)
+ async def join_leaderboard(self, ctx: commands.Context) -> None:
+ """DM the user the information for joining the Python Discord leaderboard."""
+ current_year = datetime.now().year
+ if current_year != AocConfig.year:
+ await ctx.send(f"The Python Discord leaderboard for {current_year} is not yet available!")
+ return
+
+ author = ctx.author
+ log.info(f"{author.name} ({author.id}) has requested a PyDis AoC leaderboard code")
+
+ if AocConfig.staff_leaderboard_id and any(r.id == Roles.helpers for r in author.roles):
+ join_code = AocConfig.leaderboards[AocConfig.staff_leaderboard_id].join_code
+ else:
+ try:
+ join_code = await _helpers.get_public_join_code(author)
+ except _helpers.FetchingLeaderboardFailed:
+ await ctx.send(":x: Failed to get join code! Notified maintainers.")
+ return
+
+ if not join_code:
+ log.error(f"Failed to get a join code for user {author} ({author.id})")
+ error_embed = discord.Embed(
+ title="Unable to get join code",
+ description="Failed to get a join code to one of our boards. Please notify staff.",
+ colour=discord.Colour.red(),
+ )
+ await ctx.send(embed=error_embed)
+ return
+
+ info_str = [
+ "To join our leaderboard, follow these steps:",
+ "• Log in on https://adventofcode.com",
+ "• Head over to https://adventofcode.com/leaderboard/private",
+ f"• Use this code `{join_code}` to join the Python Discord leaderboard!",
+ ]
+ try:
+ await author.send("\n".join(info_str))
+ except discord.errors.Forbidden:
+ log.debug(f"{author.name} ({author.id}) has disabled DMs from server members")
+ await ctx.send(f":x: {author.mention}, please (temporarily) enable DMs to receive the join code")
+ else:
+ await ctx.message.add_reaction(Emojis.envelope)
+
+ @in_month(Month.DECEMBER)
+ @adventofcode_group.command(
+ name="leaderboard",
+ aliases=("board", "lb"),
+ brief="Get a snapshot of the PyDis private AoC leaderboard",
+ )
+ @whitelist_override(channels=AOC_WHITELIST_RESTRICTED)
+ async def aoc_leaderboard(self, ctx: commands.Context) -> None:
+ """Get the current top scorers of the Python Discord Leaderboard."""
+ async with ctx.typing():
+ try:
+ leaderboard = await _helpers.fetch_leaderboard()
+ except _helpers.FetchingLeaderboardFailed:
+ await ctx.send(":x: Unable to fetch leaderboard!")
+ return
+
+ number_of_participants = leaderboard["number_of_participants"]
+
+ top_count = min(AocConfig.leaderboard_displayed_members, number_of_participants)
+ header = f"Here's our current top {top_count}! {Emojis.christmas_tree * 3}"
+
+ table = f"```\n{leaderboard['top_leaderboard']}\n```"
+ info_embed = _helpers.get_summary_embed(leaderboard)
+
+ await ctx.send(content=f"{header}\n\n{table}", embed=info_embed)
+
+ @in_month(Month.DECEMBER)
+ @adventofcode_group.command(
+ name="global",
+ aliases=("globalboard", "gb"),
+ brief="Get a link to the global leaderboard",
+ )
+ @whitelist_override(channels=AOC_WHITELIST_RESTRICTED)
+ async def aoc_global_leaderboard(self, ctx: commands.Context) -> None:
+ """Get a link to the global Advent of Code leaderboard."""
+ url = self.global_leaderboard_url
+ global_leaderboard = discord.Embed(
+ title="Advent of Code — Global Leaderboard",
+ description=f"You can find the global leaderboard [here]({url})."
+ )
+ global_leaderboard.set_thumbnail(url=_helpers.AOC_EMBED_THUMBNAIL)
+ await ctx.send(embed=global_leaderboard)
+
+ @adventofcode_group.command(
+ name="stats",
+ aliases=("dailystats", "ds"),
+ brief="Get daily statistics for the Python Discord leaderboard"
+ )
+ @whitelist_override(channels=AOC_WHITELIST_RESTRICTED)
+ async def private_leaderboard_daily_stats(self, ctx: commands.Context) -> None:
+ """Send an embed with daily completion statistics for the Python Discord leaderboard."""
+ try:
+ leaderboard = await _helpers.fetch_leaderboard()
+ except _helpers.FetchingLeaderboardFailed:
+ await ctx.send(":x: Can't fetch leaderboard for stats right now!")
+ return
+
+ # The daily stats are serialized as JSON as they have to be cached in Redis
+ daily_stats = json.loads(leaderboard["daily_stats"])
+ async with ctx.typing():
+ lines = ["Day ⭐ ⭐⭐ | %⭐ %⭐⭐\n================================"]
+ for day, stars in daily_stats.items():
+ star_one = stars["star_one"]
+ star_two = stars["star_two"]
+ p_star_one = star_one / leaderboard["number_of_participants"]
+ p_star_two = star_two / leaderboard["number_of_participants"]
+ lines.append(
+ f"{day:>2}) {star_one:>4} {star_two:>4} | {p_star_one:>7.2%} {p_star_two:>7.2%}"
+ )
+ table = "\n".join(lines)
+ info_embed = _helpers.get_summary_embed(leaderboard)
+ await ctx.send(f"```\n{table}\n```", embed=info_embed)
+
+ @with_role(Roles.admin)
+ @adventofcode_group.command(
+ name="refresh",
+ aliases=("fetch",),
+ brief="Force a refresh of the leaderboard cache.",
+ )
+ async def refresh_leaderboard(self, ctx: commands.Context) -> None:
+ """
+ Force a refresh of the leaderboard cache.
+
+ Note: This should be used sparingly, as we want to prevent sending too
+ many requests to the Advent of Code server.
+ """
+ async with ctx.typing():
+ try:
+ await _helpers.fetch_leaderboard(invalidate_cache=True)
+ except _helpers.FetchingLeaderboardFailed:
+ await ctx.send(":x: Something went wrong while trying to refresh the cache!")
+ else:
+ await ctx.send("\N{OK Hand Sign} Refreshed leaderboard cache!")
+
+ def cog_unload(self) -> None:
+ """Cancel season-related tasks on cog unload."""
+ log.debug("Unloading the cog and canceling the background task.")
+ self.notification_task.cancel()
+ self.status_task.cancel()
+
+ def _build_about_embed(self) -> discord.Embed:
+ """Build and return the informational "About AoC" embed from the resources file."""
+ embed_fields = json.loads(self.about_aoc_filepath.read_text("utf8"))
+
+ about_embed = discord.Embed(
+ title=self._base_url,
+ colour=Colours.soft_green,
+ url=self._base_url,
+ timestamp=datetime.utcnow()
+ )
+ about_embed.set_author(name="Advent of Code", url=self._base_url)
+ for field in embed_fields:
+ about_embed.add_field(**field)
+
+ about_embed.set_footer(text="Last Updated")
+ return about_embed
+
+ async def cog_command_error(self, ctx: commands.Context, error: Exception) -> None:
+ """Custom error handler if an advent of code command was posted in the wrong channel."""
+ if isinstance(error, InChannelCheckFailure):
+ await ctx.send(f":x: Please use <#{Channels.advent_of_code_commands}> for aoc commands instead.")
+ error.handled = True
diff --git a/bot/exts/events/advent_of_code/_helpers.py b/bot/exts/events/advent_of_code/_helpers.py
new file mode 100644
index 00000000..43aa5a7e
--- /dev/null
+++ b/bot/exts/events/advent_of_code/_helpers.py
@@ -0,0 +1,591 @@
+import asyncio
+import collections
+import datetime
+import json
+import logging
+import math
+import operator
+from typing import Any, Optional
+
+import aiohttp
+import arrow
+import discord
+
+from bot.bot import Bot
+from bot.constants import AdventOfCode, Channels, Colours
+from bot.exts.advent_of_code import _caches
+
+log = logging.getLogger(__name__)
+
+PASTE_URL = "https://paste.pythondiscord.com/documents"
+RAW_PASTE_URL_TEMPLATE = "https://paste.pythondiscord.com/raw/{key}"
+
+# Base API URL for Advent of Code Private Leaderboards
+AOC_API_URL = "https://adventofcode.com/{year}/leaderboard/private/view/{leaderboard_id}.json"
+AOC_REQUEST_HEADER = {"user-agent": "PythonDiscord AoC Event Bot"}
+
+# Leaderboard Line Template
+AOC_TABLE_TEMPLATE = "{rank: >4} | {name:25.25} | {score: >5} | {stars}"
+HEADER = AOC_TABLE_TEMPLATE.format(rank="", name="Name", score="Score", stars="⭐, ⭐⭐")
+HEADER = f"{HEADER}\n{'-' * (len(HEADER) + 2)}"
+HEADER_LINES = len(HEADER.splitlines())
+TOP_LEADERBOARD_LINES = HEADER_LINES + AdventOfCode.leaderboard_displayed_members
+
+# Keys that need to be set for a cached leaderboard
+REQUIRED_CACHE_KEYS = (
+ "full_leaderboard",
+ "top_leaderboard",
+ "full_leaderboard_url",
+ "leaderboard_fetched_at",
+ "number_of_participants",
+ "daily_stats",
+)
+
+AOC_EMBED_THUMBNAIL = (
+ "https://raw.githubusercontent.com/python-discord"
+ "/branding/main/seasonal/christmas/server_icons/festive_256.gif"
+)
+
+# Create an easy constant for the EST timezone
+EST = "America/New_York"
+
+# Step size for the challenge countdown status
+COUNTDOWN_STEP = 60 * 5
+
+# Create namedtuple that combines a participant's name and their completion
+# time for a specific star. We're going to use this later to order the results
+# for each star to compute the rank score.
+StarResult = collections.namedtuple("StarResult", "member_id completion_time")
+
+
+class UnexpectedRedirect(aiohttp.ClientError):
+ """Raised when an unexpected redirect was detected."""
+
+
+class UnexpectedResponseStatus(aiohttp.ClientError):
+ """Raised when an unexpected redirect was detected."""
+
+
+class FetchingLeaderboardFailedError(Exception):
+ """Raised when one or more leaderboards could not be fetched at all."""
+
+
+def leaderboard_sorting_function(entry: tuple[str, dict]) -> tuple[int, int]:
+ """
+ Provide a sorting value for our leaderboard.
+
+ The leaderboard is sorted primarily on the score someone has received and
+ secondary on the number of stars someone has completed.
+ """
+ result = entry[1]
+ return result["score"], result["star_2"] + result["star_1"]
+
+
+def _parse_raw_leaderboard_data(raw_leaderboard_data: dict) -> dict:
+ """
+ Parse the leaderboard data received from the AoC website.
+
+ The data we receive from AoC is structured by member, not by day/star. This
+ means that we need to "transpose" the data to a per star structure in order
+ to calculate the rank scores each individual should get.
+
+ As we need our data both "per participant" as well as "per day", we return
+ the parsed and analyzed data in both formats.
+ """
+ # We need to get an aggregate of completion times for each star of each day,
+ # instead of per participant to compute the rank scores. This dictionary will
+ # provide such a transposed dataset.
+ star_results = collections.defaultdict(list)
+
+ # As we're already iterating over the participants, we can record the number of
+ # first stars and second stars they've achieved right here and now. This means
+ # we won't have to iterate over the participants again later.
+ leaderboard = {}
+
+ # The data we get from the AoC website is structured by member, not by day/star,
+ # which means we need to iterate over the members to transpose the data to a per
+ # star view. We need that per star view to compute rank scores per star.
+ for member in raw_leaderboard_data.values():
+ name = member["name"] if member["name"] else f"Anonymous #{member['id']}"
+ member_id = member["id"]
+ leaderboard[member_id] = {"name": name, "score": 0, "star_1": 0, "star_2": 0}
+
+ # Iterate over all days for this participant
+ for day, stars in member["completion_day_level"].items():
+ # Iterate over the complete stars for this day for this participant
+ for star, data in stars.items():
+ # Record completion of this star for this individual
+ leaderboard[member_id][f"star_{star}"] += 1
+
+ # Record completion datetime for this participant for this day/star
+ completion_time = datetime.datetime.fromtimestamp(int(data["get_star_ts"]))
+ star_results[(day, star)].append(
+ StarResult(member_id=member_id, completion_time=completion_time)
+ )
+
+ # Now that we have a transposed dataset that holds the completion time of all
+ # participants per star, we can compute the rank-based scores each participant
+ # should get for that star.
+ max_score = len(leaderboard)
+ for (day, _star), results in star_results.items():
+ # If this day should not count in the ranking, skip it.
+ if day in AdventOfCode.ignored_days:
+ continue
+
+ sorted_result = sorted(results, key=operator.attrgetter("completion_time"))
+ for rank, star_result in enumerate(sorted_result):
+ leaderboard[star_result.member_id]["score"] += max_score - rank
+
+ # Since dictionaries now retain insertion order, let's use that
+ sorted_leaderboard = dict(
+ sorted(leaderboard.items(), key=leaderboard_sorting_function, reverse=True)
+ )
+
+ # Create summary stats for the stars completed for each day of the event.
+ daily_stats = {}
+ for day in range(1, 26):
+ day = str(day)
+ star_one = len(star_results.get((day, "1"), []))
+ star_two = len(star_results.get((day, "2"), []))
+ # By using a dictionary instead of namedtuple here, we can serialize
+ # this data to JSON in order to cache it in Redis.
+ daily_stats[day] = {"star_one": star_one, "star_two": star_two}
+
+ return {"daily_stats": daily_stats, "leaderboard": sorted_leaderboard}
+
+
+def _format_leaderboard(leaderboard: dict[str, dict]) -> str:
+ """Format the leaderboard using the AOC_TABLE_TEMPLATE."""
+ leaderboard_lines = [HEADER]
+ for rank, data in enumerate(leaderboard.values(), start=1):
+ leaderboard_lines.append(
+ AOC_TABLE_TEMPLATE.format(
+ rank=rank,
+ name=data["name"],
+ score=str(data["score"]),
+ stars=f"({data['star_1']}, {data['star_2']})"
+ )
+ )
+
+ return "\n".join(leaderboard_lines)
+
+
+async def _leaderboard_request(url: str, board: str, cookies: dict) -> dict[str, Any]:
+ """Make a leaderboard request using the specified session cookie."""
+ async with aiohttp.request("GET", url, headers=AOC_REQUEST_HEADER, cookies=cookies) as resp:
+ # The Advent of Code website redirects silently with a 200 response if a
+ # session cookie has expired, is invalid, or was not provided.
+ if str(resp.url) != url:
+ log.error(f"Fetching leaderboard `{board}` failed! Check the session cookie.")
+ raise UnexpectedRedirect(f"redirected unexpectedly to {resp.url} for board `{board}`")
+
+ # Every status other than `200` is unexpected, not only 400+
+ if not resp.status == 200:
+ log.error(f"Unexpected response `{resp.status}` while fetching leaderboard `{board}`")
+ raise UnexpectedResponseStatus(f"status `{resp.status}`")
+
+ return await resp.json()
+
+
+async def _fetch_leaderboard_data() -> dict[str, Any]:
+ """Fetch data for all leaderboards and return a pooled result."""
+ year = AdventOfCode.year
+
+ # We'll make our requests one at a time to not flood the AoC website with
+ # up to six simultaneous requests. This may take a little longer, but it
+ # does avoid putting unnecessary stress on the Advent of Code website.
+
+ # Container to store the raw data of each leaderboard
+ participants = {}
+ for leaderboard in AdventOfCode.leaderboards.values():
+ leaderboard_url = AOC_API_URL.format(year=year, leaderboard_id=leaderboard.id)
+
+ # Two attempts, one with the original session cookie and one with the fallback session
+ for attempt in range(1, 3):
+ log.info(f"Attempting to fetch leaderboard `{leaderboard.id}` ({attempt}/2)")
+ cookies = {"session": leaderboard.session}
+ try:
+ raw_data = await _leaderboard_request(leaderboard_url, leaderboard.id, cookies)
+ except UnexpectedRedirect:
+ if cookies["session"] == AdventOfCode.fallback_session:
+ log.error("It seems like the fallback cookie has expired!")
+ raise FetchingLeaderboardFailedError from None
+
+ # If we're here, it means that the original session did not
+ # work. Let's fall back to the fallback session.
+ leaderboard.use_fallback_session = True
+ continue
+ except aiohttp.ClientError:
+ # Don't retry, something unexpected is wrong and it may not be the session.
+ raise FetchingLeaderboardFailedError from None
+ else:
+ # Get the participants and store their current count.
+ board_participants = raw_data["members"]
+ await _caches.leaderboard_counts.set(leaderboard.id, len(board_participants))
+ participants.update(board_participants)
+ break
+ else:
+ log.error(f"reached 'unreachable' state while fetching board `{leaderboard.id}`.")
+ raise FetchingLeaderboardFailedError
+
+ log.info(f"Fetched leaderboard information for {len(participants)} participants")
+ return participants
+
+
+async def _upload_leaderboard(leaderboard: str) -> str:
+ """Upload the full leaderboard to our paste service and return the URL."""
+ async with aiohttp.request("POST", PASTE_URL, data=leaderboard) as resp:
+ try:
+ resp_json = await resp.json()
+ except Exception:
+ log.exception("Failed to upload full leaderboard to paste service")
+ return ""
+
+ if "key" in resp_json:
+ return RAW_PASTE_URL_TEMPLATE.format(key=resp_json["key"])
+
+ log.error(f"Unexpected response from paste service while uploading leaderboard {resp_json}")
+ return ""
+
+
+def _get_top_leaderboard(full_leaderboard: str) -> str:
+ """Get the leaderboard up to the maximum specified entries."""
+ return "\n".join(full_leaderboard.splitlines()[:TOP_LEADERBOARD_LINES])
+
+
+@_caches.leaderboard_cache.atomic_transaction
+async def fetch_leaderboard(invalidate_cache: bool = False) -> dict:
+ """
+ Get the current Python Discord combined leaderboard.
+
+ The leaderboard is cached and only fetched from the API if the current data
+ is older than the lifetime set in the constants. To prevent multiple calls
+ to this function fetching new leaderboard information in case of a cache
+ miss, this function is locked to one call at a time using a decorator.
+ """
+ cached_leaderboard = await _caches.leaderboard_cache.to_dict()
+
+ # Check if the cached leaderboard contains everything we expect it to. If it
+ # does not, this probably means the cache has not been created yet or has
+ # expired in Redis. This check also accounts for a malformed cache.
+ if invalidate_cache or any(key not in cached_leaderboard for key in REQUIRED_CACHE_KEYS):
+ log.info("No leaderboard cache available, fetching leaderboards...")
+ # Fetch the raw data
+ raw_leaderboard_data = await _fetch_leaderboard_data()
+
+ # Parse it to extract "per star, per day" data and participant scores
+ parsed_leaderboard_data = _parse_raw_leaderboard_data(raw_leaderboard_data)
+
+ leaderboard = parsed_leaderboard_data["leaderboard"]
+ number_of_participants = len(leaderboard)
+ formatted_leaderboard = _format_leaderboard(leaderboard)
+ full_leaderboard_url = await _upload_leaderboard(formatted_leaderboard)
+ leaderboard_fetched_at = datetime.datetime.utcnow().isoformat()
+
+ cached_leaderboard = {
+ "full_leaderboard": formatted_leaderboard,
+ "top_leaderboard": _get_top_leaderboard(formatted_leaderboard),
+ "full_leaderboard_url": full_leaderboard_url,
+ "leaderboard_fetched_at": leaderboard_fetched_at,
+ "number_of_participants": number_of_participants,
+ "daily_stats": json.dumps(parsed_leaderboard_data["daily_stats"]),
+ }
+
+ # Store the new values in Redis
+ await _caches.leaderboard_cache.update(cached_leaderboard)
+
+ # Set an expiry on the leaderboard RedisCache
+ with await _caches.leaderboard_cache._get_pool_connection() as connection:
+ await connection.expire(
+ _caches.leaderboard_cache.namespace,
+ AdventOfCode.leaderboard_cache_expiry_seconds
+ )
+
+ return cached_leaderboard
+
+
+def get_summary_embed(leaderboard: dict) -> discord.Embed:
+ """Get an embed with the current summary stats of the leaderboard."""
+ leaderboard_url = leaderboard["full_leaderboard_url"]
+ refresh_minutes = AdventOfCode.leaderboard_cache_expiry_seconds // 60
+
+ aoc_embed = discord.Embed(
+ colour=Colours.soft_green,
+ timestamp=datetime.datetime.fromisoformat(leaderboard["leaderboard_fetched_at"]),
+ description=f"*The leaderboard is refreshed every {refresh_minutes} minutes.*"
+ )
+ aoc_embed.add_field(
+ name="Number of Participants",
+ value=leaderboard["number_of_participants"],
+ inline=True,
+ )
+ if leaderboard_url:
+ aoc_embed.add_field(
+ name="Full Leaderboard",
+ value=f"[Python Discord Leaderboard]({leaderboard_url})",
+ inline=True,
+ )
+ aoc_embed.set_author(name="Advent of Code", url=leaderboard_url)
+ aoc_embed.set_footer(text="Last Updated")
+ aoc_embed.set_thumbnail(url=AOC_EMBED_THUMBNAIL)
+
+ return aoc_embed
+
+
+async def get_public_join_code(author: discord.Member) -> Optional[str]:
+ """
+ Get the join code for one of the non-staff leaderboards.
+
+ If a user has previously requested a join code and their assigned board
+ hasn't filled up yet, we'll return the same join code to prevent them from
+ getting join codes for multiple boards.
+ """
+ # Make sure to fetch new leaderboard information if the cache is older than
+ # 30 minutes. While this still means that there could be a discrepancy
+ # between the current leaderboard state and the numbers we have here, this
+ # should work fairly well given the buffer of slots that we have.
+ await fetch_leaderboard()
+ previously_assigned_board = await _caches.assigned_leaderboard.get(author.id)
+ current_board_counts = await _caches.leaderboard_counts.to_dict()
+
+ # Remove the staff board from the current board counts as it should be ignored.
+ current_board_counts.pop(AdventOfCode.staff_leaderboard_id, None)
+
+ # If this user has already received a join code, we'll give them the
+ # exact same one to prevent them from joining multiple boards and taking
+ # up multiple slots.
+ if previously_assigned_board:
+ # Check if their previously assigned board still has room for them
+ if current_board_counts.get(previously_assigned_board, 0) < 200:
+ log.info(f"{author} ({author.id}) was already assigned to a board with open slots.")
+ return AdventOfCode.leaderboards[previously_assigned_board].join_code
+
+ log.info(
+ f"User {author} ({author.id}) previously received the join code for "
+ f"board `{previously_assigned_board}`, but that board's now full. "
+ "Assigning another board to this user."
+ )
+
+ # If we don't have the current board counts cached, let's force fetching a new cache
+ if not current_board_counts:
+ log.warning("Leaderboard counts were missing from the cache unexpectedly!")
+ await fetch_leaderboard(invalidate_cache=True)
+ current_board_counts = await _caches.leaderboard_counts.to_dict()
+
+ # Find the board with the current lowest participant count. As we can't
+ best_board, _count = min(current_board_counts.items(), key=operator.itemgetter(1))
+
+ if current_board_counts.get(best_board, 0) >= 200:
+ log.warning(f"User {author} `{author.id}` requested a join code, but all boards are full!")
+ return
+
+ log.info(f"Assigning user {author} ({author.id}) to board `{best_board}`")
+ await _caches.assigned_leaderboard.set(author.id, best_board)
+
+ # Return the join code for this board
+ return AdventOfCode.leaderboards[best_board].join_code
+
+
+def is_in_advent() -> bool:
+ """
+ Check if we're currently on an Advent of Code day, excluding 25 December.
+
+ This helper function is used to check whether or not a feature that prepares
+ something for the next Advent of Code challenge should run. As the puzzle
+ published on the 25th is the last puzzle, this check excludes that date.
+ """
+ return arrow.now(EST).day in range(1, 25) and arrow.now(EST).month == 12
+
+
+def time_left_to_est_midnight() -> tuple[datetime.datetime, datetime.timedelta]:
+ """Calculate the amount of time left until midnight EST/UTC-5."""
+ # Change all time properties back to 00:00
+ todays_midnight = arrow.now(EST).replace(
+ microsecond=0,
+ second=0,
+ minute=0,
+ hour=0
+ )
+
+ # We want tomorrow so add a day on
+ tomorrow = todays_midnight + datetime.timedelta(days=1)
+
+ # Calculate the timedelta between the current time and midnight
+ return tomorrow, tomorrow - arrow.now(EST)
+
+
+async def wait_for_advent_of_code(*, hours_before: int = 1) -> None:
+ """
+ Wait for the Advent of Code event to start.
+
+ This function returns `hours_before` (default: 1) the Advent of Code
+ actually starts. This allows functions to schedule and execute code that
+ needs to run before the event starts.
+
+ If the event has already started, this function returns immediately.
+
+ Note: The "next Advent of Code" is determined based on the current value
+ of the `AOC_YEAR` environment variable. This allows callers to exit early
+ if we're already past the Advent of Code edition the bot is currently
+ configured for.
+ """
+ start = arrow.get(datetime.datetime(AdventOfCode.year, 12, 1), EST)
+ target = start - datetime.timedelta(hours=hours_before)
+ now = arrow.now(EST)
+
+ # If we've already reached or passed to target, we
+ # simply return immediately.
+ if now >= target:
+ return
+
+ delta = target - now
+ await asyncio.sleep(delta.total_seconds())
+
+
+async def countdown_status(bot: Bot) -> None:
+ """
+ Add the time until the next challenge is published to the bot's status.
+
+ This function sleeps until 2 hours before the event and exists one hour
+ after the last challenge has been published. It will not start up again
+ automatically for next year's event, as it will wait for the environment
+ variable AOC_YEAR to be updated.
+
+ This ensures that the task will only start sleeping again once the next
+ event approaches and we're making preparations for that event.
+ """
+ log.debug("Initializing status countdown task.")
+ # We wait until 2 hours before the event starts. Then we
+ # set our first countdown status.
+ await wait_for_advent_of_code(hours_before=2)
+
+ # Log that we're going to start with the countdown status.
+ log.info("The Advent of Code has started or will start soon, starting countdown status.")
+
+ # Trying to change status too early in the bot's startup sequence will fail
+ # the task because the websocket instance has not yet been created. Waiting
+ # for this event means that both the websocket instance has been initialized
+ # and that the connection to Discord is mature enough to change the presence
+ # of the bot.
+ await bot.wait_until_guild_available()
+
+ # Calculate when the task needs to stop running. To prevent the task from
+ # sleeping for the entire year, it will only wait in the currently
+ # configured year. This means that the task will only start hibernating once
+ # we start preparing the next event by changing environment variables.
+ last_challenge = arrow.get(datetime.datetime(AdventOfCode.year, 12, 25), EST)
+ end = last_challenge + datetime.timedelta(hours=1)
+
+ while arrow.now(EST) < end:
+ _, time_left = time_left_to_est_midnight()
+
+ aligned_seconds = int(math.ceil(time_left.seconds / COUNTDOWN_STEP)) * COUNTDOWN_STEP
+ hours, minutes = aligned_seconds // 3600, aligned_seconds // 60 % 60
+
+ if aligned_seconds == 0:
+ playing = "right now!"
+ elif aligned_seconds == COUNTDOWN_STEP:
+ playing = f"in less than {minutes} minutes"
+ elif hours == 0:
+ playing = f"in {minutes} minutes"
+ elif hours == 23:
+ playing = f"since {60 - minutes} minutes ago"
+ else:
+ playing = f"in {hours} hours and {minutes} minutes"
+
+ log.trace(f"Changing presence to {playing!r}")
+ # Status will look like "Playing in 5 hours and 30 minutes"
+ await bot.change_presence(activity=discord.Game(playing))
+
+ # Sleep until next aligned time or a full step if already aligned
+ delay = time_left.seconds % COUNTDOWN_STEP or COUNTDOWN_STEP
+ log.trace(f"The countdown status task will sleep for {delay} seconds.")
+ await asyncio.sleep(delay)
+
+
+async def new_puzzle_notification(bot: Bot) -> None:
+ """
+ Announce the release of a new Advent of Code puzzle.
+
+ This background task hibernates until just before the Advent of Code starts
+ and will then start announcing puzzles as they are published. After the
+ event has finished, this task will terminate.
+ """
+ # We wake up one hour before the event starts to prepare the announcement
+ # of the release of the first puzzle.
+ await wait_for_advent_of_code(hours_before=1)
+
+ log.info("The Advent of Code has started or will start soon, waking up notification task.")
+
+ # Ensure that the guild cache is loaded so we can get the Advent of Code
+ # channel and role.
+ await bot.wait_until_guild_available()
+ aoc_channel = bot.get_channel(Channels.advent_of_code)
+ aoc_role = aoc_channel.guild.get_role(AdventOfCode.role_id)
+
+ if not aoc_channel:
+ log.error("Could not find the AoC channel to send notification in")
+ return
+
+ if not aoc_role:
+ log.error("Could not find the AoC role to announce the daily puzzle")
+ return
+
+ # The last event day is 25 December, so we only have to schedule
+ # a reminder if the current day is before 25 December.
+ end = arrow.get(datetime.datetime(AdventOfCode.year, 12, 25), EST)
+ while arrow.now(EST) < end:
+ log.trace("Started puzzle notification loop.")
+ tomorrow, time_left = time_left_to_est_midnight()
+
+ # Use `total_seconds` to get the time left in fractional seconds This
+ # should wake us up very close to the target. As a safe guard, the sleep
+ # duration is padded with 0.1 second to make sure we wake up after
+ # midnight.
+ sleep_seconds = time_left.total_seconds() + 0.1
+ log.trace(f"The puzzle notification task will sleep for {sleep_seconds} seconds")
+ await asyncio.sleep(sleep_seconds)
+
+ puzzle_url = f"https://adventofcode.com/{AdventOfCode.year}/day/{tomorrow.day}"
+
+ # Check if the puzzle is already available to prevent our members from spamming
+ # the puzzle page before it's available by making a small HEAD request.
+ for retry in range(1, 5):
+ log.debug(f"Checking if the puzzle is already available (attempt {retry}/4)")
+ async with bot.http_session.head(puzzle_url, raise_for_status=False) as resp:
+ if resp.status == 200:
+ log.debug("Puzzle is available; let's send an announcement message.")
+ break
+ log.debug(f"The puzzle is not yet available (status={resp.status})")
+ await asyncio.sleep(10)
+ else:
+ log.error(
+ "The puzzle does does not appear to be available "
+ "at this time, canceling announcement"
+ )
+ break
+
+ await aoc_channel.send(
+ f"{aoc_role.mention} Good morning! Day {tomorrow.day} is ready to be attempted. "
+ f"View it online now at {puzzle_url}. Good luck!",
+ allowed_mentions=discord.AllowedMentions(
+ everyone=False,
+ users=False,
+ roles=[aoc_role],
+ )
+ )
+
+ # Ensure that we don't send duplicate announcements by sleeping to well
+ # over midnight. This means we're certain to calculate the time to the
+ # next midnight at the top of the loop.
+ await asyncio.sleep(120)
+
+
+def background_task_callback(task: asyncio.Task) -> None:
+ """Check if the finished background task failed to make sure we log errors."""
+ if task.cancelled():
+ log.info(f"Background task `{task.get_name()}` was cancelled.")
+ elif exception := task.exception():
+ log.error(f"Background task `{task.get_name()}` failed:", exc_info=exception)
+ else:
+ log.info(f"Background task `{task.get_name()}` exited normally.")
diff --git a/bot/exts/events/hacktoberfest/__init__.py b/bot/exts/events/hacktoberfest/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/bot/exts/events/hacktoberfest/__init__.py
diff --git a/bot/exts/events/hacktoberfest/hacktober-issue-finder.py b/bot/exts/events/hacktoberfest/hacktober-issue-finder.py
new file mode 100644
index 00000000..e3053851
--- /dev/null
+++ b/bot/exts/events/hacktoberfest/hacktober-issue-finder.py
@@ -0,0 +1,117 @@
+import datetime
+import logging
+import random
+from typing import Optional
+
+import discord
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.constants import Month, Tokens
+from bot.utils.decorators import in_month
+
+log = logging.getLogger(__name__)
+
+URL = "https://api.github.com/search/issues?per_page=100&q=is:issue+label:hacktoberfest+language:python+state:open"
+
+REQUEST_HEADERS = {
+ "User-Agent": "Python Discord Hacktoberbot",
+ "Accept": "application / vnd.github.v3 + json"
+}
+if GITHUB_TOKEN := Tokens.github:
+ REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}"
+
+
+class HacktoberIssues(commands.Cog):
+ """Find a random hacktober python issue on GitHub."""
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+ self.cache_normal = None
+ self.cache_timer_normal = datetime.datetime(1, 1, 1)
+ self.cache_beginner = None
+ self.cache_timer_beginner = datetime.datetime(1, 1, 1)
+
+ @in_month(Month.OCTOBER)
+ @commands.command()
+ async def hacktoberissues(self, ctx: commands.Context, option: str = "") -> None:
+ """
+ Get a random python hacktober issue from Github.
+
+ If the command is run with beginner (`.hacktoberissues beginner`):
+ It will also narrow it down to the "first good issue" label.
+ """
+ async with ctx.typing():
+ issues = await self.get_issues(ctx, option)
+ if issues is None:
+ return
+ issue = random.choice(issues["items"])
+ embed = self.format_embed(issue)
+ await ctx.send(embed=embed)
+
+ async def get_issues(self, ctx: commands.Context, option: str) -> Optional[dict]:
+ """Get a list of the python issues with the label 'hacktoberfest' from the Github api."""
+ if option == "beginner":
+ if (ctx.message.created_at - self.cache_timer_beginner).seconds <= 60:
+ log.debug("using cache")
+ return self.cache_beginner
+ elif (ctx.message.created_at - self.cache_timer_normal).seconds <= 60:
+ log.debug("using cache")
+ return self.cache_normal
+
+ if option == "beginner":
+ url = URL + '+label:"good first issue"'
+ if self.cache_beginner is not None:
+ page = random.randint(1, min(1000, self.cache_beginner["total_count"]) // 100)
+ url += f"&page={page}"
+ else:
+ url = URL
+ if self.cache_normal is not None:
+ page = random.randint(1, min(1000, self.cache_normal["total_count"]) // 100)
+ url += f"&page={page}"
+
+ log.debug(f"making api request to url: {url}")
+ async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as response:
+ if response.status != 200:
+ log.error(f"expected 200 status (got {response.status}) by the GitHub api.")
+ await ctx.send(
+ f"ERROR: expected 200 status (got {response.status}) by the GitHub api.\n"
+ f"{await response.text()}"
+ )
+ return None
+ data = await response.json()
+
+ if len(data["items"]) == 0:
+ log.error(f"no issues returned by GitHub API, with url: {response.url}")
+ await ctx.send(f"ERROR: no issues returned by GitHub API, with url: {response.url}")
+ return None
+
+ if option == "beginner":
+ self.cache_beginner = data
+ self.cache_timer_beginner = ctx.message.created_at
+ else:
+ self.cache_normal = data
+ self.cache_timer_normal = ctx.message.created_at
+
+ return data
+
+ @staticmethod
+ def format_embed(issue: dict) -> discord.Embed:
+ """Format the issue data into a embed."""
+ title = issue["title"]
+ issue_url = issue["url"].replace("api.", "").replace("/repos/", "/")
+ body = issue["body"]
+ labels = [label["name"] for label in issue["labels"]]
+
+ embed = discord.Embed(title=title)
+ embed.description = body[:500] + "..." if len(body) > 500 else body
+ embed.add_field(name="labels", value="\n".join(labels))
+ embed.url = issue_url
+ embed.set_footer(text=issue_url)
+
+ return embed
+
+
+def setup(bot: Bot) -> None:
+ """Load the HacktoberIssue finder."""
+ bot.add_cog(HacktoberIssues(bot))
diff --git a/bot/exts/events/hacktoberfest/hacktoberstats.py b/bot/exts/events/hacktoberfest/hacktoberstats.py
new file mode 100644
index 00000000..72067dbe
--- /dev/null
+++ b/bot/exts/events/hacktoberfest/hacktoberstats.py
@@ -0,0 +1,437 @@
+import logging
+import random
+import re
+from collections import Counter
+from datetime import datetime, timedelta
+from typing import Optional, Union
+from urllib.parse import quote_plus
+
+import discord
+from async_rediscache import RedisCache
+from discord.ext import commands
+
+from bot.bot import Bot
+from bot.constants import Colours, Month, NEGATIVE_REPLIES, Tokens
+from bot.utils.decorators import in_month
+
+log = logging.getLogger(__name__)
+
+CURRENT_YEAR = datetime.now().year # Used to construct GH API query
+PRS_FOR_SHIRT = 4 # Minimum number of PRs before a shirt is awarded
+REVIEW_DAYS = 14 # number of days needed after PR can be mature
+
+REQUEST_HEADERS = {"User-Agent": "Python Discord Hacktoberbot"}
+# using repo topics API during preview period requires an accept header
+GITHUB_TOPICS_ACCEPT_HEADER = {"Accept": "application/vnd.github.mercy-preview+json"}
+if GITHUB_TOKEN := Tokens.github:
+ REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}"
+ GITHUB_TOPICS_ACCEPT_HEADER["Authorization"] = f"token {GITHUB_TOKEN}"
+
+GITHUB_NONEXISTENT_USER_MESSAGE = (
+ "The listed users cannot be searched either because the users do not exist "
+ "or you do not have permission to view the users."
+)
+
+
+class HacktoberStats(commands.Cog):
+ """Hacktoberfest statistics Cog."""
+
+ # Stores mapping of user IDs and GitHub usernames
+ linked_accounts = RedisCache()
+
+ def __init__(self, bot: Bot):
+ self.bot = bot
+
+ @in_month(Month.SEPTEMBER, Month.OCTOBER, Month.NOVEMBER)
+ @commands.group(name="hacktoberstats", aliases=("hackstats",), invoke_without_command=True)
+ async def hacktoberstats_group(self, ctx: commands.Context, github_username: str = None) -> None:
+ """
+ Display an embed for a user's Hacktoberfest contributions.
+
+ If invoked without a subcommand or github_username, get the invoking user's stats if they've
+ linked their Discord name to GitHub using .stats link. If invoked with a github_username,
+ get that user's contributions
+ """
+ if not github_username:
+ author_id, author_mention = self._author_mention_from_context(ctx)
+
+ if await self.linked_accounts.contains(author_id):
+ github_username = await self.linked_accounts.get(author_id)
+ logging.info(f"Getting stats for {author_id} linked GitHub account '{github_username}'")
+ else:
+ msg = (
+ f"{author_mention}, you have not linked a GitHub account\n\n"
+ f"You can link your GitHub account using:\n```\n{ctx.prefix}hackstats link github_username\n```\n"
+ f"Or query GitHub stats directly using:\n```\n{ctx.prefix}hackstats github_username\n```"
+ )
+ await ctx.send(msg)
+ return
+
+ await self.get_stats(ctx, github_username)
+
+ @in_month(Month.SEPTEMBER, Month.OCTOBER, Month.NOVEMBER)
+ @hacktoberstats_group.command(name="link")
+ async def link_user(self, ctx: commands.Context, github_username: str = None) -> None:
+ """
+ Link the invoking user's Github github_username to their Discord ID.
+
+ Linked users are stored in Redis: User ID => GitHub Username.
+ """
+ author_id, author_mention = self._author_mention_from_context(ctx)
+ if github_username:
+ if await self.linked_accounts.contains(author_id):
+ old_username = await self.linked_accounts.get(author_id)
+ log.info(f"{author_id} has changed their github link from '{old_username}' to '{github_username}'")
+ await ctx.send(f"{author_mention}, your GitHub username has been updated to: '{github_username}'")
+ else:
+ log.info(f"{author_id} has added a github link to '{github_username}'")
+ await ctx.send(f"{author_mention}, your GitHub username has been added")
+
+ await self.linked_accounts.set(author_id, github_username)
+ else:
+ log.info(f"{author_id} tried to link a GitHub account but didn't provide a username")
+ await ctx.send(f"{author_mention}, a GitHub username is required to link your account")
+
+ @in_month(Month.SEPTEMBER, Month.OCTOBER, Month.NOVEMBER)
+ @hacktoberstats_group.command(name="unlink")
+ async def unlink_user(self, ctx: commands.Context) -> None:
+ """Remove the invoking user's account link from the log."""
+ author_id, author_mention = self._author_mention_from_context(ctx)
+
+ stored_user = await self.linked_accounts.pop(author_id, None)
+ if stored_user:
+ await ctx.send(f"{author_mention}, your GitHub profile has been unlinked")
+ logging.info(f"{author_id} has unlinked their GitHub account")
+ else:
+ await ctx.send(f"{author_mention}, you do not currently have a linked GitHub account")
+ logging.info(f"{author_id} tried to unlink their GitHub account but no account was linked")
+
+ async def get_stats(self, ctx: commands.Context, github_username: str) -> None:
+ """
+ Query GitHub's API for PRs created by a GitHub user during the month of October.
+
+ PRs with an 'invalid' or 'spam' label are ignored
+
+ For PRs created after October 3rd, they have to be in a repository that has a
+ 'hacktoberfest' topic, unless the PR is labelled 'hacktoberfest-accepted' for it
+ to count.
+
+ If a valid github_username is provided, an embed is generated and posted to the channel
+
+ Otherwise, post a helpful error message
+ """
+ async with ctx.typing():
+ prs = await self.get_october_prs(github_username)
+
+ if prs is None: # Will be None if the user was not found
+ await ctx.send(
+ embed=discord.Embed(
+ title=random.choice(NEGATIVE_REPLIES),
+ description=f"GitHub user `{github_username}` was not found.",
+ colour=discord.Colour.red()
+ )
+ )
+ return
+
+ if prs:
+ stats_embed = await self.build_embed(github_username, prs)
+ await ctx.send("Here are some stats!", embed=stats_embed)
+ else:
+ await ctx.send(f"No valid Hacktoberfest PRs found for '{github_username}'")
+
+ async def build_embed(self, github_username: str, prs: list[dict]) -> discord.Embed:
+ """Return a stats embed built from github_username's PRs."""
+ logging.info(f"Building Hacktoberfest embed for GitHub user: '{github_username}'")
+ in_review, accepted = await self._categorize_prs(prs)
+
+ n = len(accepted) + len(in_review) # Total number of PRs
+ if n >= PRS_FOR_SHIRT:
+ shirtstr = f"**{github_username} is eligible for a T-shirt or a tree!**"
+ elif n == PRS_FOR_SHIRT - 1:
+ shirtstr = f"**{github_username} is 1 PR away from a T-shirt or a tree!**"
+ else:
+ shirtstr = f"**{github_username} is {PRS_FOR_SHIRT - n} PRs away from a T-shirt or a tree!**"
+
+ stats_embed = discord.Embed(
+ title=f"{github_username}'s Hacktoberfest",
+ color=Colours.purple,
+ description=(
+ f"{github_username} has made {n} valid "
+ f"{self._contributionator(n)} in "
+ f"October\n\n"
+ f"{shirtstr}\n\n"
+ )
+ )
+
+ stats_embed.set_thumbnail(url=f"https://www.github.com/{github_username}.png")
+ stats_embed.set_author(
+ name="Hacktoberfest",
+ url="https://hacktoberfest.digitalocean.com",
+ icon_url="https://avatars1.githubusercontent.com/u/35706162?s=200&v=4"
+ )
+
+ # This will handle when no PRs in_review or accepted
+ review_str = self._build_prs_string(in_review, github_username) or "None"
+ accepted_str = self._build_prs_string(accepted, github_username) or "None"
+ stats_embed.add_field(
+ name=":clock1: In Review",
+ value=review_str
+ )
+ stats_embed.add_field(
+ name=":tada: Accepted",
+ value=accepted_str
+ )
+
+ logging.info(f"Hacktoberfest PR built for GitHub user '{github_username}'")
+ return stats_embed
+
+ async def get_october_prs(self, github_username: str) -> Optional[list[dict]]:
+ """
+ Query GitHub's API for PRs created during the month of October by github_username.
+
+ PRs with an 'invalid' or 'spam' label are ignored unless it is merged or approved
+
+ For PRs created after October 3rd, they have to be in a repository that has a
+ 'hacktoberfest' topic, unless the PR is labelled 'hacktoberfest-accepted' for it
+ to count.
+
+ If PRs are found, return a list of dicts with basic PR information
+
+ For each PR:
+ {
+ "repo_url": str
+ "repo_shortname": str (e.g. "python-discord/sir-lancebot")
+ "created_at": datetime.datetime
+ "number": int
+ }
+
+ Otherwise, return empty list.
+ None will be returned when the GitHub user was not found.
+ """
+ log.info(f"Fetching Hacktoberfest Stats for GitHub user: '{github_username}'")
+ base_url = "https://api.github.com/search/issues"
+ action_type = "pr"
+ is_query = "public"
+ not_query = "draft"
+ date_range = f"{CURRENT_YEAR}-09-30T10:00Z..{CURRENT_YEAR}-11-01T12:00Z"
+ per_page = "300"
+ query_params = (
+ f"+type:{action_type}"
+ f"+is:{is_query}"
+ f"+author:{quote_plus(github_username)}"
+ f"+-is:{not_query}"
+ f"+created:{date_range}"
+ f"&per_page={per_page}"
+ )
+
+ log.debug(f"GitHub query parameters generated: {query_params}")
+
+ jsonresp = await self._fetch_url(base_url, REQUEST_HEADERS, {"q": query_params})
+ if "message" in jsonresp:
+ # One of the parameters is invalid, short circuit for now
+ api_message = jsonresp["errors"][0]["message"]
+
+ # Ignore logging non-existent users or users we do not have permission to see
+ if api_message == GITHUB_NONEXISTENT_USER_MESSAGE:
+ log.debug(f"No GitHub user found named '{github_username}'")
+ return
+ else:
+ log.error(f"GitHub API request for '{github_username}' failed with message: {api_message}")
+ return [] # No October PRs were found due to error
+
+ if jsonresp["total_count"] == 0:
+ # Short circuit if there aren't any PRs
+ log.info(f"No October PRs found for GitHub user: '{github_username}'")
+ return []
+
+ logging.info(f"Found {len(jsonresp['items'])} Hacktoberfest PRs for GitHub user: '{github_username}'")
+ outlist = [] # list of pr information dicts that will get returned
+ oct3 = datetime(int(CURRENT_YEAR), 10, 3, 23, 59, 59, tzinfo=None)
+ hackto_topics = {} # cache whether each repo has the appropriate topic (bool values)
+ for item in jsonresp["items"]:
+ shortname = self._get_shortname(item["repository_url"])
+ itemdict = {
+ "repo_url": f"https://www.github.com/{shortname}",
+ "repo_shortname": shortname,
+ "created_at": datetime.strptime(
+ item["created_at"], "%Y-%m-%dT%H:%M:%SZ"
+ ),
+ "number": item["number"]
+ }
+
+ # If the PR has 'invalid' or 'spam' labels, the PR must be
+ # either merged or approved for it to be included
+ if self._has_label(item, ["invalid", "spam"]):
+ if not await self._is_accepted(itemdict):
+ continue
+
+ # PRs before oct 3 no need to check for topics
+ # continue the loop if 'hacktoberfest-accepted' is labelled then
+ # there is no need to check for its topics
+ if itemdict["created_at"] < oct3:
+ outlist.append(itemdict)
+ continue
+
+ # Checking PR's labels for "hacktoberfest-accepted"
+ if self._has_label(item, "hacktoberfest-accepted"):
+ outlist.append(itemdict)
+ continue
+
+ # No need to query GitHub if repo topics are fetched before already
+ if hackto_topics.get(shortname):
+ outlist.append(itemdict)
+ continue
+ # Fetch topics for the PR's repo
+ topics_query_url = f"https://api.github.com/repos/{shortname}/topics"
+ log.debug(f"Fetching repo topics for {shortname} with url: {topics_query_url}")
+ jsonresp2 = await self._fetch_url(topics_query_url, GITHUB_TOPICS_ACCEPT_HEADER)
+ if jsonresp2.get("names") is None:
+ log.error(f"Error fetching topics for {shortname}: {jsonresp2['message']}")
+ continue # Assume the repo doesn't have the `hacktoberfest` topic if API request errored
+
+ # PRs after oct 3 that doesn't have 'hacktoberfest-accepted' label
+ # must be in repo with 'hacktoberfest' topic
+ if "hacktoberfest" in jsonresp2["names"]:
+ hackto_topics[shortname] = True # Cache result in the dict for later use if needed
+ outlist.append(itemdict)
+ return outlist
+
+ async def _fetch_url(self, url: str, headers: dict, params: dict) -> dict:
+ """Retrieve API response from URL."""
+ async with self.bot.http_session.get(url, headers=headers, params=params) as resp:
+ return await resp.json()
+
+ @staticmethod
+ def _has_label(pr: dict, labels: Union[list[str], str]) -> bool:
+ """
+ Check if a PR has label 'labels'.
+
+ 'labels' can be a string or a list of strings, if it's a list of strings
+ it will return true if any of the labels match.
+ """
+ if not pr.get("labels"): # if PR has no labels
+ return False
+ if isinstance(labels, str) and any(label["name"].casefold() == labels for label in pr["labels"]):
+ return True
+ for item in labels:
+ if any(label["name"].casefold() == item for label in pr["labels"]):
+ return True
+ return False
+
+ async def _is_accepted(self, pr: dict) -> bool:
+ """Check if a PR is merged, approved, or labelled hacktoberfest-accepted."""
+ # checking for merge status
+ query_url = f"https://api.github.com/repos/{pr['repo_shortname']}/pulls/{pr['number']}"
+ jsonresp = await self._fetch_url(query_url, REQUEST_HEADERS)
+
+ if message := jsonresp.get("message"):
+ log.error(f"Error fetching PR stats for #{pr['number']} in repo {pr['repo_shortname']}:\n{message}")
+ return False
+
+ if jsonresp.get("merged"):
+ return True
+
+ # checking for the label, using `jsonresp` which has the label information
+ if self._has_label(jsonresp, "hacktoberfest-accepted"):
+ return True
+
+ # checking approval
+ query_url += "/reviews"
+ jsonresp2 = await self._fetch_url(query_url, REQUEST_HEADERS)
+ if isinstance(jsonresp2, dict):
+ # if API request is unsuccessful it will be a dict with the error in 'message'
+ log.error(
+ f"Error fetching PR reviews for #{pr['number']} in repo {pr['repo_shortname']}:\n"
+ f"{jsonresp2['message']}"
+ )
+ return False
+ # if it is successful it will be a list instead of a dict
+ if len(jsonresp2) == 0: # if PR has no reviews
+ return False
+
+ # loop through reviews and check for approval
+ for item in jsonresp2:
+ if item.get("status") == "APPROVED":
+ return True
+ return False
+
+ @staticmethod
+ def _get_shortname(in_url: str) -> str:
+ """
+ Extract shortname from https://api.github.com/repos/* URL.
+
+ e.g. "https://api.github.com/repos/python-discord/sir-lancebot"
+ |
+ V
+ "python-discord/sir-lancebot"
+ """
+ exp = r"https?:\/\/api.github.com\/repos\/([/\-\_\.\w]+)"
+ return re.findall(exp, in_url)[0]
+
+ async def _categorize_prs(self, prs: list[dict]) -> tuple:
+ """
+ Categorize PRs into 'in_review' and 'accepted' and returns as a tuple.
+
+ PRs created less than 14 days ago are 'in_review', PRs that are not
+ are 'accepted' (after 14 days review period).
+
+ PRs that are accepted must either be merged, approved, or labelled
+ 'hacktoberfest-accepted.
+ """
+ now = datetime.now()
+ oct3 = datetime(CURRENT_YEAR, 10, 3, 23, 59, 59, tzinfo=None)
+ in_review = []
+ accepted = []
+ for pr in prs:
+ if (pr["created_at"] + timedelta(REVIEW_DAYS)) > now:
+ in_review.append(pr)
+ elif (pr["created_at"] <= oct3) or await self._is_accepted(pr):
+ accepted.append(pr)
+
+ return in_review, accepted
+
+ @staticmethod
+ def _build_prs_string(prs: list[tuple], user: str) -> str:
+ """
+ Builds a discord embed compatible string for a list of PRs.
+
+ Repository name with the link to pull requests authored by 'user' for
+ each PR.
+ """
+ base_url = "https://www.github.com/"
+ str_list = []
+ repo_list = [pr["repo_shortname"] for pr in prs]
+ prs_list = Counter(repo_list).most_common(5) # get first 5 counted PRs
+ more = len(prs) - sum(i[1] for i in prs_list)
+
+ for pr in prs_list:
+ # for example: https://www.github.com/python-discord/bot/pulls/octocat
+ # will display pull requests authored by octocat.
+ # pr[1] is the number of PRs to the repo
+ string = f"{pr[1]} to [{pr[0]}]({base_url}{pr[0]}/pulls/{user})"
+ str_list.append(string)
+ if more:
+ str_list.append(f"...and {more} more")
+
+ return "\n".join(str_list)
+
+ @staticmethod
+ def _contributionator(n: int) -> str:
+ """Return "contribution" or "contributions" based on the value of n."""
+ if n == 1:
+ return "contribution"
+ else:
+ return "contributions"
+
+ @staticmethod
+ def _author_mention_from_context(ctx: commands.Context) -> tuple[str, str]:
+ """Return stringified Message author ID and mentionable string from commands.Context."""
+ author_id = str(ctx.author.id)
+ author_mention = ctx.author.mention
+
+ return author_id, author_mention
+
+
+def setup(bot: Bot) -> None:
+ """Load the Hacktober Stats Cog."""
+ bot.add_cog(HacktoberStats(bot))
diff --git a/bot/exts/events/hacktoberfest/timeleft.py b/bot/exts/events/hacktoberfest/timeleft.py
new file mode 100644
index 00000000..55109599
--- /dev/null
+++ b/bot/exts/events/hacktoberfest/timeleft.py
@@ -0,0 +1,67 @@
+import logging
+from datetime import datetime
+
+from discord.ext import commands
+
+from bot.bot import Bot
+
+log = logging.getLogger(__name__)
+
+
+class TimeLeft(commands.Cog):
+ """A Cog that tells you how long left until Hacktober is over!"""
+
+ def in_hacktober(self) -> bool:
+ """Return True if the current time is within Hacktoberfest."""
+ _, end, start = self.load_date()
+
+ now = datetime.utcnow()
+
+ return start <= now <= end
+
+ @staticmethod
+ def load_date() -> tuple[datetime, datetime, datetime]:
+ """Return of a tuple of the current time and the end and start times of the next October."""
+ now = datetime.utcnow()
+ year = now.year
+ if now.month > 10:
+ year += 1
+ end = datetime(year, 11, 1, 12) # November 1st 12:00 (UTC-12:00)
+ start = datetime(year, 9, 30, 10) # September 30th 10:00 (UTC+14:00)
+ return now, end, start
+
+ @commands.command()
+ async def timeleft(self, ctx: commands.Context) -> None:
+ """
+ Calculates the time left until the end of Hacktober.
+
+ Whilst in October, displays the days, hours and minutes left.
+ Only displays the days left until the beginning and end whilst in a different month.
+
+ This factors in that Hacktoberfest starts when it is October anywhere in the world
+ and ends with the same rules. It treats the start as UTC+14:00 and the end as
+ UTC-12.
+ """
+ now, end, start = self.load_date()
+ diff = end - now
+ days, seconds = diff.days, diff.seconds
+ if self.in_hacktober():
+ minutes = seconds // 60
+ hours, minutes = divmod(minutes, 60)
+
+ await ctx.send(
+ f"There are {days} days, {hours} hours and {minutes}"
+ f" minutes left until the end of Hacktober."
+ )
+ else:
+ start_diff = start - now
+ start_days = start_diff.days
+ await ctx.send(
+ f"It is not currently Hacktober. However, the next one will start in {start_days} days "
+ f"and will finish in {days} days."
+ )
+
+
+def setup(bot: Bot) -> None:
+ """Load the Time Left Cog."""
+ bot.add_cog(TimeLeft())