aboutsummaryrefslogtreecommitdiffstats
path: root/pydis_site/apps/api/github_utils.py
blob: 70dccdffc1968f90907aed93ceb38ba314bc55e0 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
"""Utilities for working with the GitHub API."""

import asyncio
import datetime
import math

import httpx
import jwt
from asgiref.sync import async_to_sync

from pydis_site import settings

MAX_POLLS = 20
"""The maximum number of attempts at fetching a workflow run."""


class ArtifactProcessingError(Exception):
    """Base exception for other errors related to processing a GitHub artifact."""

    status: int


class UnauthorizedError(ArtifactProcessingError):
    """The application does not have permission to access the requested repo."""

    status = 401


class NotFoundError(ArtifactProcessingError):
    """The requested resource could not be found."""

    status = 404


class ActionFailedError(ArtifactProcessingError):
    """The requested workflow did not conclude successfully."""

    status = 400


class RunTimeoutError(ArtifactProcessingError):
    """The requested workflow run was not ready in time."""

    status = 408


def generate_token() -> str:
    """
    Generate a JWT token to access the GitHub API.

    The token is valid for roughly 10 minutes after generation, before the API starts
    returning 401s.

    Refer to:
    https://docs.github.com/en/developers/apps/building-github-apps/authenticating-with-github-apps#authenticating-as-a-github-app
    """
    now = datetime.datetime.now()
    return jwt.encode(
        {
            "iat": math.floor((now - datetime.timedelta(seconds=60)).timestamp()),  # Issued at
            "exp": math.floor((now + datetime.timedelta(minutes=9)).timestamp()),  # Expires at
            "iss": settings.GITHUB_OAUTH_APP_ID,
        },
        settings.GITHUB_OAUTH_KEY,
        algorithm="RS256"
    )


async def authorize(owner: str, repo: str) -> httpx.AsyncClient:
    """
    Get an access token for the requested repository.

    The process is roughly:
        - GET app/installations to get a list of all app installations
        - POST <app_access_token> to get a token to access the given app
        - GET installation/repositories and check if the requested one is part of those
    """
    client = httpx.AsyncClient(
        base_url=settings.GITHUB_API,
        headers={"Authorization": f"bearer {generate_token()}"},
        timeout=settings.TIMEOUT_PERIOD,
    )

    try:
        # Get a list of app installations we have access to
        apps = await client.get("app/installations")
        apps.raise_for_status()

        for app in apps.json():
            # Look for an installation with the right owner
            if app["account"]["login"] != owner:
                continue

            # Get the repositories of the specified owner
            app_token = await client.post(app["access_tokens_url"])
            app_token.raise_for_status()
            client.headers["Authorization"] = f"bearer {app_token.json()['token']}"

            repos = await client.get("installation/repositories")
            repos.raise_for_status()

            # Search for the request repository
            for accessible_repo in repos.json()["repositories"]:
                if accessible_repo["name"] == repo:
                    # We've found the correct repository, and it's accessible with the current auth
                    return client

        raise NotFoundError(
            "Could not find the requested repository. Make sure the application can access it."
        )

    except BaseException as e:
        # Close the client if we encountered an unexpected exception
        await client.aclose()
        raise e


async def wait_for_run(client: httpx.AsyncClient, run: dict) -> str:
    """Wait for the provided `run` to finish, and return the URL to its artifacts."""
    polls = 0
    while polls <= MAX_POLLS:
        if run["status"] != "completed":
            # The action is still processing, wait a bit longer
            polls += 1
            await asyncio.sleep(10)

        elif run["conclusion"] != "success":
            # The action failed, or did not run
            raise ActionFailedError(f"The requested workflow ended with: {run['conclusion']}")

        else:
            # The desired action was found, and it ended successfully
            return run["artifacts_url"]

        run = await client.get(run["url"])
        run.raise_for_status()
        run = run.json()

    raise RunTimeoutError("The requested workflow was not ready in time.")


@async_to_sync
async def get_artifact(
    owner: str, repo: str, sha: str, action_name: str, artifact_name: str
) -> str:
    """Get a download URL for a build artifact."""
    client = await authorize(owner, repo)

    try:
        # Get the workflow runs for this repository
        runs = await client.get(f"/repos/{owner}/{repo}/actions/runs", params={"per_page": 100})
        runs.raise_for_status()
        runs = runs.json()

        # Filter the runs for the one associated with the given SHA
        for run in runs["workflow_runs"]:
            if run["name"] == action_name and sha == run["head_sha"]:
                break
        else:
            raise NotFoundError(
                "Could not find a run matching the provided settings in the previous hundred runs."
            )

        # Wait for the workflow to finish
        url = await wait_for_run(client, run)

        # Filter the artifacts, and return the download URL
        artifacts = await client.get(url)
        artifacts.raise_for_status()

        for artifact in artifacts.json()["artifacts"]:
            if artifact["name"] == artifact_name:
                data = await client.get(artifact["archive_download_url"])
                if data.status_code == 302:
                    return str(data.next_request.url)

                # The following line is left untested since it should in theory be impossible
                data.raise_for_status()  # pragma: no cover

        raise NotFoundError("Could not find an artifact matching the provided name.")

    finally:
        await client.aclose()