1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
|
"""Utilities for working with the GitHub API."""
import asyncio
import datetime
import math
import httpx
import jwt
from asgiref.sync import async_to_sync
from pydis_site import settings
# Polling budget consulted by `wait_for_run`, which sleeps 10 seconds before each
# re-fetch of a workflow run that has not completed yet.
MAX_POLLS = 20
"""The maximum number of attempts at fetching a workflow run."""
class ArtifactProcessingError(Exception):
    """
    Root of the exception hierarchy for failures while processing a GitHub artifact.

    Catching this type covers every more specific artifact error defined in this module.
    """

    # Only declared here, never assigned: each concrete subclass provides its own value.
    # NOTE(review): the subclass values look like HTTP status codes - confirm with callers.
    status: int
class UnauthorizedError(ArtifactProcessingError):
    """Signals that the application is not permitted to access the requested repo."""

    # Code reported for this failure
    status = 401
class NotFoundError(ArtifactProcessingError):
    """Signals that the resource being looked up could not be located."""

    # Code reported for this failure
    status = 404
class ActionFailedError(ArtifactProcessingError):
    """Signals that the requested workflow finished, but not with a successful conclusion."""

    # Code reported for this failure
    status = 400
class RunTimeoutError(ArtifactProcessingError):
    """Signals that the requested workflow run did not become ready in time."""

    # Code reported for this failure
    status = 408
def generate_token() -> str:
    """
    Generate a JWT token to access the GitHub API.

    The token is valid for roughly 10 minutes after generation, before the API starts
    returning 401s: it is issued 60 seconds in the past and expires 9 minutes from now.

    The token is signed with `settings.GITHUB_OAUTH_KEY` using RS256, and carries
    `settings.GITHUB_OAUTH_APP_ID` as its issuer.

    Refer to:
    https://docs.github.com/en/developers/apps/building-github-apps/authenticating-with-github-apps#authenticating-as-a-github-app
    """
    # Use an aware UTC datetime: `.timestamp()` on a naive datetime is resolved through the
    # local timezone, which is ambiguous around DST transitions and could skew the claims.
    now = datetime.datetime.now(tz=datetime.timezone.utc)
    issued_at = now - datetime.timedelta(seconds=60)
    expires_at = now + datetime.timedelta(minutes=9)

    return jwt.encode(
        {
            "iat": math.floor(issued_at.timestamp()),  # Issued at
            "exp": math.floor(expires_at.timestamp()),  # Expires at
            "iss": settings.GITHUB_OAUTH_APP_ID,
        },
        settings.GITHUB_OAUTH_KEY,
        algorithm="RS256",
    )
async def authorize(owner: str, repo: str) -> httpx.AsyncClient:
    """
    Build an HTTP client that is authenticated for the requested repository.

    The steps are roughly:
    - GET app/installations to list every installation of the app
    - POST to the access token URL of the installation owned by `owner`
    - GET installation/repositories and look for `repo` among the results

    On success the still-open client is returned, so closing it is up to the caller.
    On any failure the client is closed here before the error propagates.
    A `NotFoundError` is raised when no installation of `owner` exposes `repo`.
    """
    session = httpx.AsyncClient(
        base_url=settings.GITHUB_API,
        headers={"Authorization": f"bearer {generate_token()}"},
        timeout=settings.TIMEOUT_PERIOD,
    )

    try:
        installations = await session.get("app/installations")
        installations.raise_for_status()

        # Only installations on the requested owner's account are of interest
        owned_installations = (
            installation
            for installation in installations.json()
            if installation["account"]["login"] == owner
        )

        for installation in owned_installations:
            # Swap the app-level token for one scoped to this installation
            token_response = await session.post(installation["access_tokens_url"])
            token_response.raise_for_status()
            installation_token = token_response.json()["token"]
            session.headers["Authorization"] = f"bearer {installation_token}"

            listing = await session.get("installation/repositories")
            listing.raise_for_status()

            # The client is only handed out once the repository is confirmed accessible
            if any(entry["name"] == repo for entry in listing.json()["repositories"]):
                return session

        raise NotFoundError(
            "Could not find the requested repository. Make sure the application can access it."
        )

    except BaseException:
        # Nobody else holds the client yet, so release it whatever interrupted us
        await session.aclose()
        raise
async def wait_for_run(client: httpx.AsyncClient, run: dict) -> str:
    """
    Wait for the provided `run` to finish, and return the URL to its artifacts.

    While the run is not completed, it is fetched again from `run["url"]` at most
    `MAX_POLLS` times, sleeping 10 seconds before each fetch. Every fetched state is
    inspected, including the one returned by the final poll.

    Raises `ActionFailedError` if the run completed with a conclusion other than "success",
    and `RunTimeoutError` if it is still not completed once the polls are used up.
    Error statuses returned while polling propagate from `raise_for_status()`.
    """
    polls = 0
    while True:
        if run["status"] == "completed":
            if run["conclusion"] != "success":
                # The action failed, or did not run
                raise ActionFailedError(f"The requested workflow ended with: {run['conclusion']}")

            # The desired action was found, and it ended successfully
            return run["artifacts_url"]

        # Give up only after the latest state has been checked above, so that a run which
        # completes on the final poll is not misreported as a timeout.
        if polls >= MAX_POLLS:
            raise RunTimeoutError("The requested workflow was not ready in time.")

        # The action is still processing, wait a bit longer
        polls += 1
        await asyncio.sleep(10)

        response = await client.get(run["url"])
        response.raise_for_status()
        run = response.json()
@async_to_sync
async def get_artifact(
    owner: str, repo: str, sha: str, action_name: str, artifact_name: str
) -> str:
    """
    Resolve the download URL of a build artifact.

    Looks through up to 100 workflow runs of `owner/repo` for the first one named
    `action_name` whose head SHA equals `sha`, waits for it to finish, and returns the
    redirect target of the archive download URL of the artifact named `artifact_name`.

    Raises `NotFoundError` if no such run or no such artifact exists. Errors from
    `authorize`, `wait_for_run` and failed requests propagate unchanged.
    The client obtained from `authorize` is always closed before returning or raising.
    """
    client = await authorize(owner, repo)

    try:
        # Ask for a page of a hundred workflow runs of this repository
        response = await client.get(
            f"/repos/{owner}/{repo}/actions/runs",
            params={"per_page": 100},
        )
        response.raise_for_status()
        listed_runs = response.json()["workflow_runs"]

        # Pick the first run of the requested action associated with the given SHA
        matching_run = next(
            (
                candidate
                for candidate in listed_runs
                if candidate["name"] == action_name and sha == candidate["head_sha"]
            ),
            None,
        )
        if matching_run is None:
            raise NotFoundError(
                "Could not find a run matching the provided settings in the previous hundred runs."
            )

        # Hold on until the workflow has concluded
        artifacts_url = await wait_for_run(client, matching_run)

        # Go through the run's artifacts looking for the requested name
        listing = await client.get(artifacts_url)
        listing.raise_for_status()

        for artifact in listing.json()["artifacts"]:
            if artifact["name"] != artifact_name:
                continue

            # The download endpoint is expected to answer with a redirect to the archive
            download = await client.get(artifact["archive_download_url"])
            if download.status_code == 302:
                return str(download.next_request.url)

            # The following line is left untested since it should in theory be impossible
            download.raise_for_status()  # pragma: no cover

        raise NotFoundError("Could not find an artifact matching the provided name.")

    finally:
        await client.aclose()
|