1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
|
import ctypes
import os
import os.path
import pathlib
import sys
import traceback
import poetry as poetry_package
from cleo.io.io import IO
from landlock import FSAccess, Ruleset
from poetry.plugins.plugin import Plugin
from poetry.poetry import Poetry
def existing_paths(paths):
    """Lazily yield, in order, the entries of ``paths`` that exist on disk.

    ``paths`` has to be a list or a tuple.  Because this is a generator, the
    assertion only runs once the result is first iterated; anything else
    (for example a bare string) is rejected at that point.
    """
    assert isinstance(paths, (list, tuple))
    # `filter` is lazy as well, so each path is probed only when consumed.
    yield from filter(os.path.exists, paths)
def ensure_paths(paths):
    """Lazily yield every entry of ``paths``, creating missing directories.

    Each path that does not exist yet is created as a directory (including
    missing parents) right before it is yielded.  Paths that already exist,
    whether as a directory or as a file, are yielded untouched.

    ``paths`` has to be a list or a tuple.  Because this is a generator, the
    assertion only runs once the result is first iterated.

    Raises:
        OSError: if a directory cannot be created for a reason other than
            the path already existing (e.g. missing permissions).
    """
    assert isinstance(paths, (list, tuple))
    for path in paths:
        # EAFP instead of "check, then create": a separate existence check
        # races with other processes creating the same directory, in which
        # case the subsequent makedirs() would fail with FileExistsError.
        try:
            os.makedirs(path)
        except FileExistsError:
            pass
        yield path
def find_libc(**kwargs):
    """Load ``libc.so.6`` via ctypes and declare the signature of ``mount``.

    All keyword arguments are forwarded unchanged to ``ctypes.CDLL``.

    Returns:
        The loaded ``ctypes.CDLL`` handle, with ``mount.argtypes`` set.
    """
    # `ctypes.util.find_library` is deliberately avoided: it seems to run
    # external programs, which seems like a security risk.
    handle = ctypes.CDLL("libc.so.6", **kwargs)

    # mount(const char *source, const char *target,
    #       const char *filesystemtype, unsigned long mountflags,
    #       const void *_Nullable data)
    c_str = ctypes.c_char_p
    handle.mount.argtypes = (c_str, c_str, c_str, ctypes.c_ulong, c_str)
    return handle
class UnshareFlags:
    """``CLONE_*`` flag bits, for use with ``unshare``.

    HACK: the values were entered manually from
    `/usr/include/linux/sched.h`.  Every flag is a single bit, written here
    as a shift; the equivalent hexadecimal value is noted alongside.
    """

    CLONE_VM = 1 << 8  # 0x00000100
    CLONE_FS = 1 << 9  # 0x00000200
    CLONE_FILES = 1 << 10  # 0x00000400
    CLONE_SIGHAND = 1 << 11  # 0x00000800
    CLONE_PIDFD = 1 << 12  # 0x00001000
    CLONE_PTRACE = 1 << 13  # 0x00002000
    CLONE_VFORK = 1 << 14  # 0x00004000
    CLONE_PARENT = 1 << 15  # 0x00008000
    CLONE_THREAD = 1 << 16  # 0x00010000
    CLONE_NEWNS = 1 << 17  # 0x00020000
    CLONE_SYSVSEM = 1 << 18  # 0x00040000
    CLONE_SETTLS = 1 << 19  # 0x00080000
    CLONE_PARENT_SETTID = 1 << 20  # 0x00100000
    CLONE_CHILD_CLEARTID = 1 << 21  # 0x00200000
    CLONE_DETACHED = 1 << 22  # 0x00400000
    CLONE_UNTRACED = 1 << 23  # 0x00800000
    CLONE_CHILD_SETTID = 1 << 24  # 0x01000000
    CLONE_NEWCGROUP = 1 << 25  # 0x02000000
    CLONE_NEWUTS = 1 << 26  # 0x04000000
    CLONE_NEWIPC = 1 << 27  # 0x08000000
    CLONE_NEWUSER = 1 << 28  # 0x10000000
    CLONE_NEWPID = 1 << 29  # 0x20000000
    CLONE_NEWNET = 1 << 30  # 0x40000000
    CLONE_IO = 1 << 31  # 0x80000000
def exc_from_errno(syscall: str, detail: str | None = None, hint: str | None = None):
errno = ctypes.get_errno()
err = RuntimeError(f"Failed to {syscall}()...")
err.add_note(f"OS error: {os.strerror(errno)}")
err.add_note(f"Details: {detail}")
if hint:
err.add_note(f"HINT: {hint}")
return err
class RestrictPlugin(Plugin):
    """Poetry plugin that sandboxes the Poetry process when it is activated.

    ``activate`` first moves the process into fresh Linux namespaces
    (``unshare``) and then applies a Landlock filesystem ruleset
    (``landlock``).  Setting ``POETRY_NO_RESTRICT=1`` in the environment
    skips both steps.
    """

    def unshare(self, poetry: Poetry):
        """Detach the process into new namespaces and continue in a child.

        The calling UID is mapped to 0 inside a new user namespace, then the
        cgroup, UTS, PID and mount namespaces (together with ``CLONE_FILES``
        and ``CLONE_FS``) are unshared.  The process then forks: the parent
        only waits for the child and exits with the exit code derived from
        the child's wait status, while the child mounts a fresh ``/proc``
        and returns to the caller.

        Args:
            poetry: Not used by this method.

        Raises:
            RuntimeError: if an ``unshare()`` or ``mount()`` call fails.
            SystemExit: in the parent process, once the child terminated.
        """
        libc = find_libc(use_errno=True)

        # After CLONE_NEWUSER, every UID will be 65534 (nobody), so remember
        # the real UID before unsharing.
        uid = os.getuid()
        rc = libc.unshare(UnshareFlags.CLONE_NEWUSER)
        if rc != 0:
            raise exc_from_errno(
                syscall="unshare",
                detail="Tried to create a new user namespace",
                hint="Ensure user namespacing is enabled (sysctl `kernel.unprivileged_userns_clone`)",
            )

        # Pretend we're root, see `man 7 user_namespaces`, "Defining user and group ID mappings"
        with open("/proc/self/uid_map", "w") as f:
            f.write(f"0 {uid} 1")

        # Now we are """root""" and can unshare whatever we wish
        flags = (
            UnshareFlags.CLONE_FILES
            | UnshareFlags.CLONE_FS
            | UnshareFlags.CLONE_NEWCGROUP
            | UnshareFlags.CLONE_NEWUTS
            | UnshareFlags.CLONE_NEWPID
            | UnshareFlags.CLONE_NEWNS
        )
        rc = libc.unshare(flags)
        if rc != 0:
            raise exc_from_errno(syscall="unshare")

        # "The first child created by the calling process will have the process
        # ID 1 and will assume the role of init(1) in the new namespace."
        # Let's create that first child.
        pid = os.fork()  # Raises on error.
        if pid > 0:
            # Parent: do nothing but mirror the child's exit status.
            (_pid, child_exit_code) = os.waitpid(pid, 0)
            exit_code = os.waitstatus_to_exitcode(child_exit_code)
            sys.exit(exit_code)

        # Child only from here on.
        # Hide process table
        mounts = (
            # source, target, fstype, mountflags, data (options?)
            (b"proc", b"/proc", b"proc", 0, b""),
        )
        for mountargs in mounts:
            rc = libc.mount(*mountargs)
            if rc != 0:
                raise exc_from_errno(syscall="mount", detail=f"Mount options are {mountargs!r}")

    def landlock(self, poetry: Poetry):
        """Build and apply the Landlock filesystem ruleset for this process.

        Poetry's virtualenv and cache directories are created if necessary
        and granted full access.  Every other fixed path is only added to
        the ruleset when it exists, so a system lacking e.g.
        ``/etc/mime.types`` or ``pyvenv.cfg`` does not make activation fail.

        Args:
            poetry: The active Poetry instance; its configuration and
                ``pyproject_path`` determine the project-specific rules.
        """
        read_only = FSAccess.READ_FILE | FSAccess.READ_DIR
        read_exec = FSAccess.READ_FILE | FSAccess.READ_DIR | FSAccess.EXECUTE

        # /home/user/.local/pipx/venvs/poetry/lib/python3.11/site-packages
        # `__path__` is indexed directly rather than through its private
        # `_path` attribute: indexing works for a plain list as well as for
        # a namespace-package path object.
        poetry_libs_path = pathlib.Path(poetry_package.__path__[0]).parent

        # Needed, otherwise raises:
        # Fatal Python error: init_import_site: Failed to import the site module
        # /home/user/.local/pipx/venvs/poetry/pyvenv.cfg
        poetry_pyvenv_cfg = poetry_libs_path.parent.parent.parent / "pyvenv.cfg"

        ruleset = Ruleset()

        # Rules for Poetry's virtual environment management
        ruleset.allow(
            *ensure_paths(
                (
                    # Storing the virtual environment
                    poetry.config.virtualenvs_path,
                    # Cached dependencies
                    poetry.config.artifacts_cache_directory,
                    poetry.config.repository_cache_directory,
                ),
            ),
            rules=FSAccess.all(),
        )

        # Temporary storage
        ruleset.allow(*existing_paths(("/tmp",)), rules=FSAccess.all() & ~FSAccess.EXECUTE)

        # Poetry may also want to late-import some of its dependencies, or built-in modules
        ruleset.allow(*existing_paths(sys.path), rules=read_only)

        # Finally, the Python executable may need to import some of its shared libraries
        ruleset.allow(*existing_paths(("/lib", "/lib64")), rules=read_exec)

        # and in poetry shell, we might want to run some system executables, too
        ruleset.allow(*existing_paths(("/usr/bin",)), rules=read_exec)

        # For compilation of C dependencies, we need to be able to find headers
        ruleset.allow(*existing_paths(("/usr/include",)), rules=read_only)

        # We allow read access here, note the pid namespace is restricted
        ruleset.allow(*existing_paths(("/proc",)), rules=read_only)

        # needed for /dev/tty and /dev/pty devices, see /usr/lib/python3.11/pty.py
        ruleset.allow(
            *existing_paths(("/dev",)),
            rules=FSAccess.READ_FILE | FSAccess.READ_DIR | FSAccess.WRITE_FILE,
        )

        # Python's `zoneinfo` module
        ruleset.allow(*existing_paths(("/usr/share/zoneinfo/",)), rules=read_only)

        ruleset.allow(
            *existing_paths(
                (
                    # We need to know which DNS resolver to use, and any custom hosts
                    "/etc/resolv.conf",
                    "/etc/hosts",
                    # pip reads this file in _vendor/distro/distro.py
                    "/etc/debian_version",
                    # I'm not opposed to including things like this because I don't want
                    # to annoy people when their tooling doesn't work. But we have to be
                    # conservative. I think shells are fine, but if there was some further
                    # tooling (e.g. shell tools run at startup) I don't think those should
                    # be included.
                    "/etc/bash.bashrc",
                    os.path.expanduser("~/.bashrc"),
                )
            ),
            rules=FSAccess.READ_FILE,
        )

        # CA certificates; the second directory does not exist on every system.
        ruleset.allow(
            *existing_paths(("/etc/ssl/certs", "/usr/local/share/ca-certificates")),
            rules=read_only,
        )

        # Allow determining mime types. Used for ruamel.yaml installation.
        ruleset.allow(*existing_paths(("/etc/mime.types",)), rules=FSAccess.READ_FILE)

        # Allow working with shared memory
        ruleset.allow(*existing_paths(("/dev/shm",)))

        # Black cache access
        ruleset.allow(
            *existing_paths((os.path.expanduser("~/.cache/black"),)),
            rules=FSAccess.READ_FILE | FSAccess.WRITE_FILE | FSAccess.READ_DIR,
        )

        pre_commit_cache = os.path.expanduser("~/.cache/pre-commit")
        if os.path.exists(pre_commit_cache):
            ruleset.allow(pre_commit_cache)

            # pre-commit runs git to figure out the diff to lint, which will
            # be pretty noisy if we do not whitelist the gitconfig.
            # NOTE(review): the nesting of this rule under the pre-commit
            # check is inferred from the comment above - confirm.
            ruleset.allow(
                *existing_paths(
                    (
                        os.path.expanduser("~/.gitconfig"),
                        os.path.expanduser("~/.config/git/config"),
                    )
                ),
                rules=FSAccess.READ_FILE,
            )

        # # Usage of Ansible with DEFAULT_LOCAL_TMP
        # ruleset.allow(*existing_paths((os.path.expanduser("~/.ansible/tmp"),)))
        # ruleset.allow("/etc/passwd", rules=FSAccess.READ_FILE)
        # ruleset.allow(*existing_paths((os.path.expanduser("~/.ssh/known_hosts"),)), rules=FSAccess.READ_FILE)

        # Allow manipulation of files in our projects, e.g. for linters.
        # We might need to check this more thoroughly. ~~For instance, configuring custom
        # filter programs in gitattributes might allow a sandbox escape.~~ this should
        # not happen, since landlock enforces nonewprivs.
        ruleset.allow(os.path.dirname(poetry.pyproject_path))

        # => Rules for poetry-in-poetry
        #
        # This is suboptimal. It is needed for nested invocations of poetry, which
        # sometimes happen through a combination of tooling (e.g. script calling
        # command through poetry being run in poetry shell). However, the
        # poetry configuration directory contains a file named `auth.toml`, which
        # sounds like it makes sense to restrict. The cleaner solution here would be
        # to mount a tmpfs over here so it appears empty.
        ruleset.allow(
            *existing_paths((os.path.expanduser("~/.config/pypoetry"),)),
            rules=read_only,
        )

        # Python may need to read pyvenv.cfg. The file only exists when
        # Poetry itself runs from a virtual environment.
        ruleset.allow(*existing_paths((poetry_pyvenv_cfg,)), rules=FSAccess.READ_FILE)

        # [Errno 13] Permission denied: '~/.local/share/virtualenv/py_info/1/$HASH.lock'
        # Needs more investigation. Seems to happen on some occasions when
        # setting up the virtual env.
        # ruleset.allow(*existing_paths((os.path.expanduser("~/.local/share/virtualenv/py_info"),)), rules=FSAccess.READ_FILE | FSAccess.READ_DIR)

        ruleset.apply()

    def activate(self, poetry: Poetry, io: IO):
        """Plugin entry point: sandbox the process unless explicitly disabled.

        When the environment variable ``POETRY_NO_RESTRICT`` equals ``"1"``
        a notice is written and nothing else happens.  Otherwise
        ``unshare`` and ``landlock`` run in that order; any exception is
        reported through ``io`` and on the traceback stream, then re-raised.

        Args:
            poetry: The active Poetry instance, passed on to both steps.
            io: Cleo IO object used for user-facing messages.
        """
        if os.getenv("POETRY_NO_RESTRICT") == "1":
            io.write_line(
                "<info>poetry-restrict-plugin</info>: "
                "<comment>Disabled via POETRY_NO_RESTRICT environment variable!</comment>"
            )
            return

        try:
            self.unshare(poetry)
            self.landlock(poetry)
            io.write_line("<info>poetry-restrict-plugin</info>: Landlocked & unshared.")
        except Exception as err:
            # Top-level boundary: report, then re-raise. The parent's
            # SystemExit from `unshare` is not an `Exception` and passes
            # through untouched.
            io.write_line("<error>Fatal error trying to enforce Landlock rules or unshare:</error>")
            traceback.print_exception(err)
            io.write_line("<error>This is an issue of the Poetry restrict plugin, not of Poetry itself.</error>")
            raise
|