aboutsummaryrefslogtreecommitdiffstats
path: root/kubernetes/scripts/create_x509_user_config.py
blob: 05a67505a82f244143a44dac205200d82b174dcb (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import argparse
import base64
import shutil
import subprocess  # noqa: S404
import tempfile
from pathlib import Path

# Command-line interface: a required user name plus an optional role binding.
parser = argparse.ArgumentParser()
parser.add_argument(
    "user",
    help="The name to give the created user",
)
parser.add_argument(
    "--role_binding",
    default="devops-reader",
    help="The role binding to add this user to",
)
args = parser.parse_args()
# Module-level names read by the helper functions defined further down.
user, role_binding = args.user, args.role_binding

# Manifest for the CertificateSigningRequest submitted on behalf of the new user.
# ``{user}`` and ``{req}`` are str.format placeholders filled in by
# approve_cert_signing_request.
CERT_REQUEST_TEMPLATE = """apiVersion: certificates.k8s.io/v1
kind: CertificateSigningRequest
metadata:
  name: {user}
spec:
  groups:
  - system:authenticated
  request: {req}
  signerName: kubernetes.io/kube-apiserver-client
  usages:
  - client auth
"""
# JSON patch applied to the cluster role binding by give_user_perms: it adds a
# ``User`` subject named ``{user}`` at index 0 of the binding's subjects.
# Literal braces are doubled because the template is rendered with str.format.
ROLE_BINDING_PATCH_TEMPLATE = """[
    {{
        "op": "add",
        "path": "/subjects/0",
        "value": {{
            "kind": "User",
            "name": "{user}"
        }}
    }}
]
"""  # noqa: RUF027


def run_and_return_output(command: str, cwd: str | None = None) -> str:
    """Run a command in a shell and return the result as a string."""
    return subprocess.run(
        command,
        shell=True,  # noqa: S602
        stdout=subprocess.PIPE,
        text=True,
        check=True,
        cwd=cwd,
    ).stdout


def create_cert_signing_request(tmpdir: str) -> str:
    """Generate a private key and a certificate request for the user.

    Both openssl commands run inside ``tmpdir``; the key is left there as ``priv.key``.
    The request openssl prints is returned base64-encoded as a single-line string.
    """
    # 2048-bit RSA key, reused by the request below via its relative path.
    run_and_return_output("openssl genrsa -out priv.key 2048", tmpdir)

    subject = f"/C=GB/ST=GB/O=pydis/CN={user}"
    pem_request = run_and_return_output(f"openssl req -new -key priv.key -subj {subject}", tmpdir)

    encoded_request = base64.b64encode(pem_request.encode("utf-8"))
    return encoded_request.decode("utf-8").replace("\n", "")


def approve_cert_signing_request(
    csr: str,
    tmpdir: str,
    *,
    attempts: int = 30,
    delay: float = 1.0,
) -> None:
    """Approve the csr and save the issued certificate as a file in tmpdir.

    The CSR manifest is written to ``csr.yml`` in tmpdir, applied and approved with
    kubectl, and the issued certificate is base64-decoded into ``{user}.crt`` in tmpdir.
    The CSR object is deleted from the cluster once the certificate has been saved.

    Approving a CSR does not issue the certificate synchronously, so
    ``.status.certificate`` is polled until it is populated instead of being read once;
    a single read could come back empty and silently produce an empty ``.crt`` file.

    Args:
        csr: Base64-encoded certificate request, placed in the manifest's ``request`` field.
        tmpdir: Directory that receives ``csr.yml`` and ``{user}.crt``.
        attempts: Maximum number of times the CSR is queried for its certificate.
        delay: Seconds to wait between two queries.

    Raises:
        subprocess.CalledProcessError: If any kubectl command exits with a non-zero status.
        TimeoutError: If no certificate was issued after ``attempts`` queries. The CSR is
            left in the cluster in that case so it can be inspected.
    """
    import time  # Local import keeps this block self-contained.

    csr_path = Path(tmpdir, "csr.yml")
    with csr_path.open("w", encoding="locale") as f:
        f.write(CERT_REQUEST_TEMPLATE.format(user=user, req=csr))
    run_and_return_output(f"kubectl apply -f {csr_path}", tmpdir)
    run_and_return_output(f"kubectl certificate approve {user}")

    approved_cert = ""
    for attempt in range(attempts):
        approved_cert = run_and_return_output(
            f"kubectl get csr {user} -o jsonpath='{{.status.certificate}}'",
        ).strip()
        if approved_cert:
            break
        # Don't sleep after the final query; there is nothing left to wait for.
        if attempt + 1 < attempts:
            time.sleep(delay)
    if not approved_cert:
        msg = f"No certificate was issued for CSR {user!r} after {attempts} attempts."
        raise TimeoutError(msg)

    with Path(tmpdir, f"{user}.crt").open("w", encoding="locale") as f:
        f.write(base64.b64decode(approved_cert).decode("utf-8"))
    run_and_return_output(f"kubectl delete csr {user}")


def give_user_perms(tmpdir: str) -> None:
    """Patch the cluster role binding chosen on the command line to include the user.

    The rendered JSON patch is stored as ``binding_patch.json`` in ``tmpdir`` and then
    passed to ``kubectl patch``, which is run from ``tmpdir``.
    """
    patch_file = Path(tmpdir) / "binding_patch.json"
    patch_file.write_text(ROLE_BINDING_PATCH_TEMPLATE.format(user=user))

    patch_command = (
        f"kubectl patch clusterrolebinding {role_binding} "
        f"--type=json --patch-file {patch_file}"
    )
    run_and_return_output(patch_command, tmpdir)


def build_kubectl_config(tmpdir: str) -> None:
    """Build up a kubectl config from all the files in the tmpdir.

    Reads the cluster CA certificate from the ``kube-root-ca.crt`` ConfigMap and the
    server URL reported by ``kubectl config view --minify``, then writes
    ``{user}.config`` in tmpdir containing a ``pydis`` cluster, the user's credentials
    and a ``pydis`` context that is selected as the current one. ``priv.key`` and
    ``{user}.crt`` are expected to already exist in tmpdir.

    Raises:
        subprocess.CalledProcessError: If any kubectl command exits with a non-zero status.
        RuntimeError: If the cluster CA certificate comes back empty.
    """
    # The jsonpath templates are double-quoted so the shell hands them to kubectl verbatim.
    # Unquoted, a POSIX shell strips the backslash from ``ca\.crt`` (kubectl then looks up
    # nested fields ``ca`` -> ``crt`` and prints nothing) and may glob-expand ``[*]``.
    cluster_public_key = run_and_return_output(r'kubectl get cm kube-root-ca.crt -o jsonpath="{.data.ca\.crt}"')
    if not cluster_public_key.strip():
        # An empty CA would otherwise be embedded silently, yielding an unusable config.
        msg = "Could not read ca.crt from the kube-root-ca.crt ConfigMap."
        raise RuntimeError(msg)
    with Path(tmpdir, "ca.crt").open("w", encoding="locale") as f:
        f.write(cluster_public_key)
    cluster_url = run_and_return_output(
        'kubectl config view --minify --output jsonpath="{.clusters[*].cluster.server}"',
    )

    # All remaining commands run inside tmpdir so the relative file names resolve there.
    run_and_return_output(
        f"kubectl config set-cluster pydis --kubeconfig={user}.config "
        f"--server={cluster_url} --certificate-authority=ca.crt --embed-certs=true",
        tmpdir,
    )
    run_and_return_output(
        f"kubectl config set-credentials {user} "
        f"--client-key=priv.key --client-certificate={user}.crt "
        f"--embed-certs=true --kubeconfig={user}.config",
        tmpdir,
    )
    run_and_return_output(
        f"kubectl config set-context pydis --kubeconfig={user}.config "
        f"--cluster=pydis --user={user} --namespace=default",
        tmpdir,
    )
    run_and_return_output(f"kubectl config use-context pydis --kubeconfig={user}.config", tmpdir)


with tempfile.TemporaryDirectory() as tmpdir:
    # Every step drops its artefacts into tmpdir, which is discarded when the block exits.
    csr = create_cert_signing_request(tmpdir)
    approve_cert_signing_request(csr, tmpdir)
    give_user_perms(tmpdir)
    build_kubectl_config(tmpdir)

    # Copy the finished kubectl config out of tmpdir into the current working dir,
    # ready to be sent to the user.
    config_name = f"{user}.config"
    shutil.copy(Path(tmpdir) / config_name, Path(config_name))
    print(f"Config generated. Saved to {config_name} in current directory.")  # noqa: T201