"""Utilities for working with the GitHub API.""" import asyncio import datetime import math import httpx import jwt from asgiref.sync import async_to_sync from pydis_site import settings MAX_POLLS = 20 """The maximum number of attempts at fetching a workflow run.""" class ArtifactProcessingError(Exception): """Base exception for other errors related to processing a GitHub artifact.""" status: int class UnauthorizedError(ArtifactProcessingError): """The application does not have permission to access the requested repo.""" status = 401 class NotFoundError(ArtifactProcessingError): """The requested resource could not be found.""" status = 404 class ActionFailedError(ArtifactProcessingError): """The requested workflow did not conclude successfully.""" status = 400 class RunTimeoutError(ArtifactProcessingError): """The requested workflow run was not ready in time.""" status = 408 def generate_token() -> str: """ Generate a JWT token to access the GitHub API. The token is valid for roughly 10 minutes after generation, before the API starts returning 401s. Refer to: https://docs.github.com/en/developers/apps/building-github-apps/authenticating-with-github-apps#authenticating-as-a-github-app """ now = datetime.datetime.now() return jwt.encode( { "iat": math.floor((now - datetime.timedelta(seconds=60)).timestamp()), # Issued at "exp": math.floor((now + datetime.timedelta(minutes=9)).timestamp()), # Expires at "iss": settings.GITHUB_OAUTH_APP_ID, }, settings.GITHUB_OAUTH_KEY, algorithm="RS256" ) async def authorize(owner: str, repo: str) -> httpx.AsyncClient: """ Get an access token for the requested repository. The process is roughly: - GET app/installations to get a list of all app installations - POST to get a token to access the given app - GET installation/repositories and check if the requested one is part of those """ client = httpx.AsyncClient( base_url=settings.GITHUB_API, headers={"Authorization": f"bearer {generate_token()}"}, timeout=settings.TIMEOUT_PERIOD, ) try: # Get a list of app installations we have access to apps = await client.get("app/installations") apps.raise_for_status() for app in apps.json(): # Look for an installation with the right owner if app["account"]["login"] != owner: continue # Get the repositories of the specified owner app_token = await client.post(app["access_tokens_url"]) app_token.raise_for_status() client.headers["Authorization"] = f"bearer {app_token.json()['token']}" repos = await client.get("installation/repositories") repos.raise_for_status() # Search for the request repository for accessible_repo in repos.json()["repositories"]: if accessible_repo["name"] == repo: # We've found the correct repository, and it's accessible with the current auth return client raise NotFoundError( "Could not find the requested repository. Make sure the application can access it." ) except BaseException as e: # Close the client if we encountered an unexpected exception await client.aclose() raise e async def wait_for_run(client: httpx.AsyncClient, run: dict) -> str: """Wait for the provided `run` to finish, and return the URL to its artifacts.""" polls = 0 while polls <= MAX_POLLS: if run["status"] != "completed": # The action is still processing, wait a bit longer polls += 1 await asyncio.sleep(10) elif run["conclusion"] != "success": # The action failed, or did not run raise ActionFailedError(f"The requested workflow ended with: {run['conclusion']}") else: # The desired action was found, and it ended successfully return run["artifacts_url"] run = await client.get(run["url"]) run.raise_for_status() run = run.json() raise RunTimeoutError("The requested workflow was not ready in time.") @async_to_sync async def get_artifact( owner: str, repo: str, sha: str, action_name: str, artifact_name: str ) -> str: """Get a download URL for a build artifact.""" client = await authorize(owner, repo) try: # Get the workflow runs for this repository runs = await client.get(f"/repos/{owner}/{repo}/actions/runs", params={"per_page": 100}) runs.raise_for_status() runs = runs.json() # Filter the runs for the one associated with the given SHA for run in runs["workflow_runs"]: if run["name"] == action_name and sha == run["head_sha"]: break else: raise NotFoundError( "Could not find a run matching the provided settings in the previous hundred runs." ) # Wait for the workflow to finish url = await wait_for_run(client, run) # Filter the artifacts, and return the download URL artifacts = await client.get(url) artifacts.raise_for_status() for artifact in artifacts.json()["artifacts"]: if artifact["name"] == artifact_name: data = await client.get(artifact["archive_download_url"]) if data.status_code == 302: return str(data.next_request.url) # The following line is left untested since it should in theory be impossible data.raise_for_status() # pragma: no cover raise NotFoundError("Could not find an artifact matching the provided name.") finally: await client.aclose()