Diffstat (limited to 'pydis_site/apps/content/utils.py')
-rw-r--r-- | pydis_site/apps/content/utils.py | 304
1 file changed, 299 insertions, 5 deletions
diff --git a/pydis_site/apps/content/utils.py b/pydis_site/apps/content/utils.py
index d3f270ff..cfd73d67 100644
--- a/pydis_site/apps/content/utils.py
+++ b/pydis_site/apps/content/utils.py
@@ -1,14 +1,44 @@
+import datetime
+import functools
+import json
+import logging
+import tarfile
+import tempfile
+from http import HTTPStatus
+from io import BytesIO
 from pathlib import Path
-from typing import Dict, Tuple
 
 import frontmatter
+import httpx
 import markdown
 import yaml
 from django.http import Http404
+from django.utils import timezone
 from markdown.extensions.toc import TocExtension
 
+from pydis_site import settings
+from .models import Commit, Tag
 
-def get_category(path: Path) -> Dict[str, str]:
+TAG_CACHE_TTL = datetime.timedelta(hours=1)
+log = logging.getLogger(__name__)
+
+
+def github_client(**kwargs) -> httpx.Client:
+    """Get a client to access the GitHub API with important settings pre-configured."""
+    client = httpx.Client(
+        base_url=settings.GITHUB_API,
+        follow_redirects=True,
+        timeout=settings.TIMEOUT_PERIOD,
+        **kwargs
+    )
+    if settings.GITHUB_TOKEN:  # pragma: no cover
+        if not client.headers.get("Authorization"):
+            client.headers = {"Authorization": f"token {settings.GITHUB_TOKEN}"}
+
+    return client
+
+
+def get_category(path: Path) -> dict[str, str]:
     """Load category information by name from _info.yml."""
     if not path.is_dir():
         raise Http404("Category not found.")
@@ -16,7 +46,7 @@ def get_category(path: Path) -> Dict[str, str]:
     return yaml.safe_load(path.joinpath("_info.yml").read_text(encoding="utf-8"))
 
 
-def get_categories(path: Path) -> Dict[str, Dict]:
+def get_categories(path: Path) -> dict[str, dict]:
     """Get information for all categories."""
     categories = {}
 
@@ -27,8 +57,272 @@ def get_categories(path: Path) -> Dict[str, Dict]:
     return categories
 
 
-def get_category_pages(path: Path) -> Dict[str, Dict]:
+def get_tags_static() -> list[Tag]:
+    """
+    Fetch tag information in static builds.
+
+    This also includes some fake tags to preview the tag groups feature.
+    This will return a cached value, so it should only be used for static builds.
+    """
+    tags = fetch_tags()
+    for tag in tags[3:5]:  # pragma: no cover
+        tag.group = "very-cool-group"
+    return tags
+
+
+def fetch_tags() -> list[Tag]:
+    """
+    Fetch tag data from the GitHub API.
+
+    The entire repository is downloaded and extracted locally because
+    getting file content would require one request per file, and can get rate-limited.
+ """ + with github_client() as client: + # Grab metadata + metadata = client.get("/repos/python-discord/bot/contents/bot/resources") + metadata.raise_for_status() + + hashes = {} + for entry in metadata.json(): + if entry["type"] == "dir": + # Tag group + files = client.get(entry["url"]) + files.raise_for_status() + files = files.json() + else: + files = [entry] + + for file in files: + hashes[file["name"]] = file["sha"] + + # Download the files + tar_file = client.get("/repos/python-discord/bot/tarball") + tar_file.raise_for_status() + + tags = [] + with tempfile.TemporaryDirectory() as folder: + with tarfile.open(fileobj=BytesIO(tar_file.content)) as repo: + included = [] + for file in repo.getmembers(): + if "/bot/resources/tags" in file.path: + included.append(file) + repo.extractall(folder, included) + + for tag_file in Path(folder).rglob("*.md"): + name = tag_file.name + group = None + if tag_file.parent.name != "tags": + # Tags in sub-folders are considered part of a group + group = tag_file.parent.name + + tags.append(Tag( + name=name.removesuffix(".md"), + sha=hashes[name], + group=group, + body=tag_file.read_text(encoding="utf-8"), + last_commit=None, + )) + + return tags + + +def set_tag_commit(tag: Tag) -> None: + """Fetch commit information from the API, and save it for the tag.""" + if settings.STATIC_BUILD: # pragma: no cover + # Static builds request every page during build, which can ratelimit it. + # Instead, we return some fake data. + tag.last_commit = Commit( + sha="68da80efc00d9932a209d5cccd8d344cec0f09ea", + message="Initial Commit\n\nTHIS IS FAKE DEMO DATA", + date=datetime.datetime(2018, 2, 3, 12, 20, 26, tzinfo=datetime.UTC), + authors=json.dumps([{"name": "Joseph", "email": "[email protected]"}]), + ) + return + + path = "/bot/resources/tags" + if tag.group: + path += f"/{tag.group}" + path += f"/{tag.name}.md" + + # Fetch and set the commit + with github_client() as client: + response = client.get("/repos/python-discord/bot/commits", params={"path": path}) + if ( + # We want to hop out early in three cases: + # - We got a forbidden response. (GitHub wrongfully uses this for rate limits.) + # - We got ratelimited. + response.status_code in (HTTPStatus.FORBIDDEN, HTTPStatus.TOO_MANY_REQUESTS) + # - GitHub has unicorn time again and is returning 5xx codes. + or int(response.status_code / 100) == 5 + ): # pragma: no cover + log.warning( + "Received code %d from GitHub for commit history for bot file %r", + response.status_code, path, + ) + # We hop out early because otherwise, these failures may result in the + # overall request to the tag page breaking. + return + + # This should only be permanent issues from here, such as bad requests. 
+        response.raise_for_status()
+        data = response.json()[0]
+
+    commit = data["commit"]
+    author, committer = commit["author"], commit["committer"]
+
+    date = (
+        datetime.datetime
+        .strptime(committer["date"], settings.GITHUB_TIMESTAMP_FORMAT)
+        .replace(tzinfo=datetime.UTC)
+    )
+
+    if author["email"] == committer["email"]:
+        authors = [author]
+    else:
+        authors = [author, committer]
+
+    commit_obj, _ = Commit.objects.get_or_create(
+        sha=data["sha"],
+        message=commit["message"],
+        date=date,
+        authors=json.dumps(authors),
+    )
+    tag.last_commit = commit_obj
+    tag.save()
+
+
+def record_tags(tags: list[Tag]) -> None:
+    """Sync the database with an updated set of tags."""
+    # Remove entries which no longer exist
+    Tag.objects.exclude(name__in=[tag.name for tag in tags]).delete()
+
+    # Insert/update the tags
+    for new_tag in tags:
+        try:
+            old_tag = Tag.objects.get(name=new_tag.name)
+        except Tag.DoesNotExist:
+            # The tag is not in the database yet,
+            # pretend its previous state is the current state
+            old_tag = new_tag
+
+        if old_tag.sha == new_tag.sha and old_tag.last_commit is not None:
+            # We still have an up-to-date commit entry
+            new_tag.last_commit = old_tag.last_commit
+
+        new_tag.save()
+
+    # Drop old, unused commits
+    Commit.objects.filter(tag__isnull=True).delete()
+
+
+def get_tags() -> list[Tag]:
+    """Return a list of all tags visible to the application, from the cache or API."""
+    if settings.STATIC_BUILD:  # pragma: no cover
+        last_update = None
+    else:
+        last_update = (
+            Tag.objects.values_list("last_updated", flat=True)
+            .order_by("last_updated").first()
+        )
+
+    if last_update is None or timezone.now() >= (last_update + TAG_CACHE_TTL):
+        # Stale or empty cache
+        if settings.STATIC_BUILD:  # pragma: no cover
+            tags = get_tags_static()
+        else:
+            tags = fetch_tags()
+            record_tags(tags)
+
+        return tags
+
+    return list(Tag.objects.all())
+
+
+def get_tag(path: str, *, skip_sync: bool = False) -> Tag | list[Tag]:
+    """
+    Return a tag based on the search location.
+
+    If certain tag data is out of sync (for instance a commit date is missing),
+    an extra request will be made to sync the information.
+
+    The tag name and group must match. If only one argument is provided in the path,
+    it's assumed to be either a group name, or a no-group tag name.
+
+    If it's a group name, a list of tags which belong to it is returned.
+    """
+    path = path.split("/")
+    if len(path) == 2:
+        group, name = path
+    else:
+        name = path[0]
+        group = None
+
+    matches = []
+    for tag in get_tags():
+        if tag.name == name and tag.group == group:
+            if tag.last_commit is None and not skip_sync:
+                set_tag_commit(tag)
+            return tag
+        elif tag.group == name and group is None:  # noqa: RET505
+            matches.append(tag)
+
+    if matches:
+        return matches
+
+    raise Tag.DoesNotExist
+
+
+def get_tag_category(tags: list[Tag] | None = None, *, collapse_groups: bool) -> dict[str, dict]:
+    """
+    Generate context data for `tags`, or all tags if None.
+
+    If `tags` is None, `get_tags` is used to populate the data.
+    If `collapse_groups` is True, tags with parent groups are not included in the list,
+    and instead the parent itself is included as a single entry with its sub-tags
+    in the description.
+ """ + if not tags: + tags = get_tags() + + data = [] + groups = {} + + # Create all the metadata for the tags + for tag in tags: + if tag.group is None or not collapse_groups: + content = frontmatter.parse(tag.body)[1] + data.append({ + "title": tag.name, + "description": markdown.markdown(content, extensions=["pymdownx.superfences"]), + "icon": "fas fa-tag", + }) + else: + if tag.group not in groups: + groups[tag.group] = { + "title": tag.group, + "description": [tag.name], + "icon": "fas fa-tags", + } + else: + groups[tag.group]["description"].append(tag.name) + + # Flatten group description into a single string + for group in groups.values(): + # If the following string is updated, make sure to update it in the frontend JS as well + group["description"] = "Contains the following tags: " + ", ".join(group["description"]) + data.append(group) + + # Sort the tags, and return them in the proper format + return {tag["title"]: tag for tag in sorted(data, key=lambda tag: tag["title"].casefold())} + + +def get_category_pages(path: Path) -> dict[str, dict]: """Get all page names and their metadata at a category path.""" + # Special handling for tags + if path == Path(__file__).parent / "resources/tags": + return get_tag_category(collapse_groups=True) + pages = {} for item in path.glob("*.md"): @@ -39,7 +333,7 @@ def get_category_pages(path: Path) -> Dict[str, Dict]: return pages -def get_page(path: Path) -> Tuple[str, Dict]: +def get_page(path: Path) -> tuple[str, dict]: """Get one specific page.""" if not path.is_file(): raise Http404("Page not found.") |