aboutsummaryrefslogtreecommitdiffstats
path: root/kubernetes/scripts/create_x509_user_config.py
blob: 38d7cbd423ee12e1765f57e6dce535768a28982d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#!/usr/bin/env python3

import argparse
import base64
import shutil
import subprocess
import tempfile
import time
from pathlib import Path

# Command-line interface: one required user name and an optional cluster role binding.
# The parsed values are kept as module-level globals and read by the functions below.
parser = argparse.ArgumentParser()
parser.add_argument("user", help="The name to give the created user")
parser.add_argument(
    "--role_binding",
    default="devops-reader",
    help="The role binding to add this user to",
)
args = parser.parse_args()
user, role_binding = args.user, args.role_binding

# Manifest for a CertificateSigningRequest, filled in with str.format():
#   {user} - used as the name of the CSR object
#   {req}  - the base64-encoded certificate request
CERT_REQUEST_TEMPLATE = """apiVersion: certificates.k8s.io/v1
kind: CertificateSigningRequest
metadata:
  name: {user}
spec:
  groups:
  - system:authenticated
  request: {req}
  signerName: kubernetes.io/kube-apiserver-client
  usages:
  - client auth
"""
# JSON patch (applied with `kubectl patch --type=json`) that inserts {user} as a "User"
# subject at index 0 of a role binding's subjects list.
# The doubled braces are str.format() escapes for literal JSON braces.
ROLE_BINDING_PATCH_TEMPLATE = """[
    {{
        "op": "add",
        "path": "/subjects/0",
        "value": {{
            "kind": "User",
            "name": "{user}"
        }}
    }}
]
"""


def run_and_return_output(command: str, cwd: str | None = None) -> str:
    """Run a command in a shell and return the result as a string."""
    return subprocess.run(  # noqa: S602
        command,
        shell=True,
        stdout=subprocess.PIPE,
        text=True,
        check=True,
        cwd=cwd,
    ).stdout


def create_cert_signing_request(tmpdir: str) -> str:
    """
    Generate a private key and a certificate request for the user.

    The 2048-bit RSA key is written to `priv.key` inside `tmpdir`; the request that
    openssl prints to stdout is returned base64-encoded as a single-line string.
    """
    run_and_return_output("openssl genrsa -out priv.key 2048", tmpdir)

    request_command = f"openssl req -new -key priv.key -subj /C=GB/ST=GB/O=pydis/CN={user}"
    request_text = run_and_return_output(request_command, tmpdir)

    encoded_request = base64.b64encode(request_text.encode("utf-8"))
    return encoded_request.decode("utf-8").replace("\n", "")


def approve_cert_signing_request(csr: str, tmpdir: str) -> None:
    """
    Submit and approve the CSR, then save the issued certificate as `{user}.crt` in tmpdir.

    Args:
        csr: Base64-encoded certificate request, placed in the manifest's `request` field.
        tmpdir: Directory that receives `csr.yml` and `{user}.crt`.

    Raises:
        subprocess.CalledProcessError: If any kubectl invocation fails.
        RuntimeError: If no certificate has been issued once polling gives up.
    """
    max_attempts = 10
    retry_delay_seconds = 1

    csr_path = Path(tmpdir, "csr.yml")
    with csr_path.open("w", encoding="locale") as f:
        f.write(CERT_REQUEST_TEMPLATE.format(user=user, req=csr))
    run_and_return_output(f"kubectl apply -f {csr_path}", tmpdir)
    run_and_return_output(f"kubectl certificate approve {user}")

    # Approving a CSR does not issue the certificate synchronously: the signer fills in
    # `status.certificate` afterwards. Reading it only once can return an empty value and
    # silently produce an empty .crt file, so poll for a short while instead.
    for attempt in range(max_attempts):
        if attempt:
            time.sleep(retry_delay_seconds)
        approved_cert = run_and_return_output(f"kubectl get csr {user} -o jsonpath='{{.status.certificate}}'")
        # Check the decoded bytes rather than the raw output, so stray quoting characters
        # echoed back by the shell do not make an empty result look populated.
        certificate = base64.b64decode(approved_cert)
        if certificate:
            break
    else:
        # The CSR is deliberately left in the cluster so it can be inspected.
        raise RuntimeError(
            f"No certificate was issued for CSR '{user}' after {max_attempts} attempts; "
            f"inspect it with `kubectl get csr {user}`.",
        )

    with Path(tmpdir, f"{user}.crt").open("w", encoding="locale") as f:
        f.write(certificate.decode("utf-8"))
    run_and_return_output(f"kubectl delete csr {user}")


def give_user_perms(tmpdir: str) -> None:
    """
    Grant the user permissions by patching them into the chosen cluster role binding.

    The JSON patch is rendered to `binding_patch.json` in `tmpdir` and applied with kubectl.
    """
    patch_file = Path(tmpdir, "binding_patch.json")
    patch_file.write_text(ROLE_BINDING_PATCH_TEMPLATE.format(user=user))

    patch_command = f"kubectl patch clusterrolebinding {role_binding} --type=json --patch-file {patch_file}"
    run_and_return_output(patch_command, tmpdir)


def build_kubectl_config(tmpdir: str) -> None:
    """
    Build a self-contained kubectl config for the user at `{user}.config` inside tmpdir.

    The cluster CA certificate and API server URL are read through the caller's current
    kubectl context. `priv.key` and `{user}.crt` must already exist in `tmpdir`; they are
    embedded into the config together with the CA certificate.

    Raises:
        subprocess.CalledProcessError: If any kubectl invocation fails.
        RuntimeError: If the cluster CA certificate comes back empty.
    """
    # The JSONPath expressions are double-quoted because these commands go through a shell.
    # Unquoted, a POSIX shell removes the backslash from `ca\.crt` (kubectl would then look
    # up `.data.ca.crt`, which does not exist, and print nothing) and treats `[*]` as a
    # glob pattern. Double quotes keep both intact, and are also understood on Windows.
    cluster_public_key = run_and_return_output(r'kubectl get cm kube-root-ca.crt -o jsonpath="{.data.ca\.crt}"')
    if not cluster_public_key.strip():
        # Embedding an empty CA would yield a config that cannot verify the API server.
        raise RuntimeError("Could not read ca.crt from the kube-root-ca.crt config map.")
    with Path(tmpdir, "ca.crt").open("w", encoding="locale") as f:
        f.write(cluster_public_key)
    cluster_url = run_and_return_output(
        'kubectl config view --minify --output jsonpath="{.clusters[*].cluster.server}"',
    )

    # Every command below runs with tmpdir as its working directory, so the relative
    # file names (ca.crt, priv.key, {user}.crt, {user}.config) all resolve inside it.
    run_and_return_output(
        f"kubectl config set-cluster pydis --kubeconfig={user}.config "
        f"--server={cluster_url} --certificate-authority=ca.crt --embed-certs=true",
        tmpdir,
    )
    run_and_return_output(
        f"kubectl config set-credentials {user} "
        f"--client-key=priv.key --client-certificate={user}.crt "
        f"--embed-certs=true --kubeconfig={user}.config",
        tmpdir,
    )
    run_and_return_output(
        f"kubectl config set-context pydis --kubeconfig={user}.config "
        f"--cluster=pydis --user={user} --namespace=default",
        tmpdir,
    )
    run_and_return_output(f"kubectl config use-context pydis --kubeconfig={user}.config", tmpdir)


# All intermediate files (key, CSR, certificates, patch) live in a temporary directory
# that is removed when the block exits; only the finished config is kept.
with tempfile.TemporaryDirectory() as tmpdir:
    csr = create_cert_signing_request(tmpdir)
    approve_cert_signing_request(csr, tmpdir)
    give_user_perms(tmpdir)
    build_kubectl_config(tmpdir)

    # Copy the kubectl config out of tmpdir into the current working dir, ready to be sent to the user.
    config_name = f"{user}.config"
    shutil.copy(Path(tmpdir) / config_name, Path(config_name))
    print(f"Config generated. Saved to {config_name} in current directory.")  # noqa: T201