import logging import re from collections import Counter from datetime import datetime, timedelta from typing import List, Tuple, Union import aiohttp import discord from async_rediscache import RedisCache from discord.ext import commands from bot.constants import Channels, Month, Tokens, WHITELISTED_CHANNELS from bot.utils.decorators import in_month, override_in_channel log = logging.getLogger(__name__) CURRENT_YEAR = datetime.now().year # Used to construct GH API query PRS_FOR_SHIRT = 4 # Minimum number of PRs before a shirt is awarded REVIEW_DAYS = 14 # number of days needed after PR can be mature HACKTOBER_WHITELIST = WHITELISTED_CHANNELS + (Channels.hacktoberfest_2020,) REQUEST_HEADERS = {"User-Agent": "Python Discord Hacktoberbot"} # using repo topics API during preview period requires an accept header GITHUB_TOPICS_ACCEPT_HEADER = {"Accept": "application/vnd.github.mercy-preview+json"} if GITHUB_TOKEN := Tokens.github: REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}" GITHUB_TOPICS_ACCEPT_HEADER["Authorization"] = f"token {GITHUB_TOKEN}" GITHUB_NONEXISTENT_USER_MESSAGE = ( "The listed users cannot be searched either because the users do not exist " "or you do not have permission to view the users." ) class HacktoberStats(commands.Cog): """Hacktoberfest statistics Cog.""" # Stores mapping of user IDs and GitHub usernames linked_accounts = RedisCache() def __init__(self, bot: commands.Bot): self.bot = bot @in_month(Month.SEPTEMBER, Month.OCTOBER, Month.NOVEMBER) @commands.group(name="hacktoberstats", aliases=("hackstats",), invoke_without_command=True) @override_in_channel(HACKTOBER_WHITELIST) async def hacktoberstats_group(self, ctx: commands.Context, github_username: str = None) -> None: """ Display an embed for a user's Hacktoberfest contributions. If invoked without a subcommand or github_username, get the invoking user's stats if they've linked their Discord name to GitHub using .stats link. If invoked with a github_username, get that user's contributions """ if not github_username: author_id, author_mention = self._author_mention_from_context(ctx) if await self.linked_accounts.contains(author_id): github_username = await self.linked_accounts.get(author_id) logging.info(f"Getting stats for {author_id} linked GitHub account '{github_username}'") else: msg = ( f"{author_mention}, you have not linked a GitHub account\n\n" f"You can link your GitHub account using:\n```{ctx.prefix}hackstats link github_username```\n" f"Or query GitHub stats directly using:\n```{ctx.prefix}hackstats github_username```" ) await ctx.send(msg) return await self.get_stats(ctx, github_username) @in_month(Month.SEPTEMBER, Month.OCTOBER, Month.NOVEMBER) @hacktoberstats_group.command(name="link") @override_in_channel(HACKTOBER_WHITELIST) async def link_user(self, ctx: commands.Context, github_username: str = None) -> None: """ Link the invoking user's Github github_username to their Discord ID. Linked users are stored in Redis: User ID => GitHub Username. """ author_id, author_mention = self._author_mention_from_context(ctx) if github_username: if await self.linked_accounts.contains(author_id): old_username = await self.linked_accounts.get(author_id) logging.info(f"{author_id} has changed their github link from '{old_username}' to '{github_username}'") await ctx.send(f"{author_mention}, your GitHub username has been updated to: '{github_username}'") else: logging.info(f"{author_id} has added a github link to '{github_username}'") await ctx.send(f"{author_mention}, your GitHub username has been added") await self.linked_accounts.set(author_id, github_username) else: logging.info(f"{author_id} tried to link a GitHub account but didn't provide a username") await ctx.send(f"{author_mention}, a GitHub username is required to link your account") @in_month(Month.SEPTEMBER, Month.OCTOBER, Month.NOVEMBER) @hacktoberstats_group.command(name="unlink") @override_in_channel(HACKTOBER_WHITELIST) async def unlink_user(self, ctx: commands.Context) -> None: """Remove the invoking user's account link from the log.""" author_id, author_mention = self._author_mention_from_context(ctx) stored_user = await self.linked_accounts.pop(author_id, None) if stored_user: await ctx.send(f"{author_mention}, your GitHub profile has been unlinked") logging.info(f"{author_id} has unlinked their GitHub account") else: await ctx.send(f"{author_mention}, you do not currently have a linked GitHub account") logging.info(f"{author_id} tried to unlink their GitHub account but no account was linked") async def get_stats(self, ctx: commands.Context, github_username: str) -> None: """ Query GitHub's API for PRs created by a GitHub user during the month of October. PRs with an 'invalid' or 'spam' label are ignored For PRs created after October 3rd, they have to be in a repository that has a 'hacktoberfest' topic, unless the PR is labelled 'hacktoberfest-accepted' for it to count. If a valid github_username is provided, an embed is generated and posted to the channel Otherwise, post a helpful error message """ async with ctx.typing(): prs = await self.get_october_prs(github_username) if prs: stats_embed = await self.build_embed(github_username, prs) await ctx.send('Here are some stats!', embed=stats_embed) else: await ctx.send(f"No valid October GitHub contributions found for '{github_username}'") async def build_embed(self, github_username: str, prs: List[dict]) -> discord.Embed: """Return a stats embed built from github_username's PRs.""" logging.info(f"Building Hacktoberfest embed for GitHub user: '{github_username}'") in_review, accepted = await self._categorize_prs(prs) n = len(accepted) + len(in_review) # total number of PRs if n >= PRS_FOR_SHIRT: shirtstr = f"**{github_username} is eligible for a T-shirt or a tree!**" elif n == PRS_FOR_SHIRT - 1: shirtstr = f"**{github_username} is 1 PR away from a T-shirt or a tree!**" else: shirtstr = f"**{github_username} is {PRS_FOR_SHIRT - n} PRs away from a T-shirt or a tree!**" stats_embed = discord.Embed( title=f"{github_username}'s Hacktoberfest", color=discord.Color(0x9c4af7), description=( f"{github_username} has made {n} valid " f"{self._contributionator(n)} in " f"October\n\n" f"{shirtstr}\n\n" ) ) stats_embed.set_thumbnail(url=f"https://www.github.com/{github_username}.png") stats_embed.set_author( name="Hacktoberfest", url="https://hacktoberfest.digitalocean.com", icon_url="https://avatars1.githubusercontent.com/u/35706162?s=200&v=4" ) # this will handle when no PRs in_review or accepted review_str = self._build_prs_string(in_review, github_username) or "None" accepted_str = self._build_prs_string(accepted, github_username) or "None" stats_embed.add_field( name=":clock1: In Review", value=review_str ) stats_embed.add_field( name=":tada: Accepted", value=accepted_str ) logging.info(f"Hacktoberfest PR built for GitHub user '{github_username}'") return stats_embed @staticmethod async def get_october_prs(github_username: str) -> Union[List[dict], None]: """ Query GitHub's API for PRs created during the month of October by github_username. PRs with an 'invalid' or 'spam' label are ignored unless it is merged or approved For PRs created after October 3rd, they have to be in a repository that has a 'hacktoberfest' topic, unless the PR is labelled 'hacktoberfest-accepted' for it to count. If PRs are found, return a list of dicts with basic PR information For each PR: { "repo_url": str "repo_shortname": str (e.g. "python-discord/sir-lancebot") "created_at": datetime.datetime "number": int } Otherwise, return None """ logging.info(f"Fetching Hacktoberfest Stats for GitHub user: '{github_username}'") base_url = "https://api.github.com/search/issues?q=" action_type = "pr" is_query = "public" not_query = "draft" date_range = f"{CURRENT_YEAR}-09-30T10:00Z..{CURRENT_YEAR}-11-01T12:00Z" per_page = "300" query_url = ( f"{base_url}" f"+type:{action_type}" f"+is:{is_query}" f"+author:{github_username}" f"+-is:{not_query}" f"+created:{date_range}" f"&per_page={per_page}" ) logging.debug(f"GitHub query URL generated: {query_url}") jsonresp = await HacktoberStats._fetch_url(query_url, REQUEST_HEADERS) if "message" in jsonresp.keys(): # One of the parameters is invalid, short circuit for now api_message = jsonresp["errors"][0]["message"] # Ignore logging non-existent users or users we do not have permission to see if api_message == GITHUB_NONEXISTENT_USER_MESSAGE: logging.debug(f"No GitHub user found named '{github_username}'") else: logging.error(f"GitHub API request for '{github_username}' failed with message: {api_message}") return if jsonresp["total_count"] == 0: # Short circuit if there aren't any PRs logging.info(f"No Hacktoberfest PRs found for GitHub user: '{github_username}'") return logging.info(f"Found {len(jsonresp['items'])} Hacktoberfest PRs for GitHub user: '{github_username}'") outlist = [] # list of pr information dicts that will get returned oct3 = datetime(int(CURRENT_YEAR), 10, 3, 23, 59, 59, tzinfo=None) hackto_topics = {} # cache whether each repo has the appropriate topic (bool values) for item in jsonresp["items"]: shortname = HacktoberStats._get_shortname(item["repository_url"]) itemdict = { "repo_url": f"https://www.github.com/{shortname}", "repo_shortname": shortname, "created_at": datetime.strptime( item["created_at"], r"%Y-%m-%dT%H:%M:%SZ" ), "number": item["number"] } # if the PR has 'invalid' or 'spam' labels, the PR must be # either merged or approved for it to be included if HacktoberStats._has_label(item, ["invalid", "spam"]): if not await HacktoberStats._is_accepted(itemdict): continue # PRs before oct 3 no need to check for topics # continue the loop if 'hacktoberfest-accepted' is labelled then # there is no need to check for its topics if itemdict["created_at"] < oct3: outlist.append(itemdict) continue # checking PR's labels for "hacktoberfest-accepted" if HacktoberStats._has_label(item, "hacktoberfest-accepted"): outlist.append(itemdict) continue # no need to query github if repo topics are fetched before already if shortname in hackto_topics.keys(): if hackto_topics[shortname]: outlist.append(itemdict) continue # fetch topics for the pr repo topics_query_url = f"https://api.github.com/repos/{shortname}/topics" logging.debug(f"Fetching repo topics for {shortname} with url: {topics_query_url}") jsonresp2 = await HacktoberStats._fetch_url(topics_query_url, GITHUB_TOPICS_ACCEPT_HEADER) if jsonresp2.get("names") is None: logging.error(f"Error fetching topics for {shortname}: {jsonresp2['message']}") return # PRs after oct 3 that doesn't have 'hacktoberfest-accepted' label # must be in repo with 'hacktoberfest' topic if "hacktoberfest" in jsonresp2["names"]: hackto_topics[shortname] = True # cache result in the dict for later use if needed outlist.append(itemdict) return outlist @staticmethod async def _fetch_url(url: str, headers: dict) -> dict: """Retrieve API response from URL.""" async with aiohttp.ClientSession() as session: async with session.get(url, headers=headers) as resp: jsonresp = await resp.json() return jsonresp @staticmethod def _has_label(pr: dict, labels: Union[List[str], str]) -> bool: """ Check if a PR has label 'labels'. 'labels' can be a string or a list of strings, if it's a list of strings it will return true if any of the labels match. """ if not pr.get("labels"): # if PR has no labels return False if (isinstance(labels, str)) and (any(label["name"].casefold() == labels for label in pr["labels"])): return True for item in labels: if any(label["name"].casefold() == item for label in pr["labels"]): return True return False @staticmethod async def _is_accepted(pr: dict) -> bool: """Check if a PR is merged, approved, or labelled hacktoberfest-accepted.""" # checking for merge status query_url = f"https://api.github.com/repos/{pr['repo_shortname']}/pulls/" query_url += str(pr["number"]) jsonresp = await HacktoberStats._fetch_url(query_url, REQUEST_HEADERS) if "message" in jsonresp.keys(): logging.error( f"Error fetching PR stats for #{pr['number']} in repo {pr['repo_shortname']}:\n" f"{jsonresp['message']}" ) return False if ("merged" in jsonresp.keys()) and jsonresp["merged"]: return True # checking for the label, using `jsonresp` which has the label information if HacktoberStats._has_label(jsonresp, "hacktoberfest-accepted"): return True # checking approval query_url += "/reviews" jsonresp2 = await HacktoberStats._fetch_url(query_url, REQUEST_HEADERS) if isinstance(jsonresp2, dict): # if API request is unsuccessful it will be a dict with the error in 'message' logging.error( f"Error fetching PR reviews for #{pr['number']} in repo {pr['repo_shortname']}:\n" f"{jsonresp2['message']}" ) return False # if it is successful it will be a list instead of a dict if len(jsonresp2) == 0: # if PR has no reviews return False # loop through reviews and check for approval for item in jsonresp2: if "status" in item.keys(): if item['status'] == "APPROVED": return True return False @staticmethod def _get_shortname(in_url: str) -> str: """ Extract shortname from https://api.github.com/repos/* URL. e.g. "https://api.github.com/repos/python-discord/sir-lancebot" | V "python-discord/sir-lancebot" """ exp = r"https?:\/\/api.github.com\/repos\/([/\-\_\.\w]+)" return re.findall(exp, in_url)[0] @staticmethod async def _categorize_prs(prs: List[dict]) -> tuple: """ Categorize PRs into 'in_review' and 'accepted' and returns as a tuple. PRs created less than 14 days ago are 'in_review', PRs that are not are 'accepted' (after 14 days review period). PRs that are accepted must either be merged, approved, or labelled 'hacktoberfest-accepted. """ now = datetime.now() oct3 = datetime(CURRENT_YEAR, 10, 3, 23, 59, 59, tzinfo=None) in_review = [] accepted = [] for pr in prs: if (pr['created_at'] + timedelta(REVIEW_DAYS)) > now: in_review.append(pr) elif (pr['created_at'] <= oct3) or await HacktoberStats._is_accepted(pr): accepted.append(pr) return in_review, accepted @staticmethod def _build_prs_string(prs: List[tuple], user: str) -> str: """ Builds a discord embed compatible string for a list of PRs. Repository name with the link to pull requests authored by 'user' for each PR. """ base_url = "https://www.github.com/" str_list = [] repo_list = [pr["repo_shortname"] for pr in prs] prs_list = Counter(repo_list).most_common(5) # get first 5 counted PRs more = len(prs) - sum(i[1] for i in prs_list) for pr in prs_list: # for example: https://www.github.com/python-discord/bot/pulls/octocat # will display pull requests authored by octocat. # pr[1] is the number of PRs to the repo string = f"{pr[1]} to [{pr[0]}]({base_url}{pr[0]}/pulls/{user})" str_list.append(string) if more: str_list.append(f"...and {more} more") return "\n".join(str_list) @staticmethod def _contributionator(n: int) -> str: """Return "contribution" or "contributions" based on the value of n.""" if n == 1: return "contribution" else: return "contributions" @staticmethod def _author_mention_from_context(ctx: commands.Context) -> Tuple: """Return stringified Message author ID and mentionable string from commands.Context.""" author_id = str(ctx.message.author.id) author_mention = ctx.message.author.mention return author_id, author_mention def setup(bot: commands.Bot) -> None: """Hacktoberstats Cog load.""" bot.add_cog(HacktoberStats(bot))