aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--snekbox/memfs.py88
1 files changed, 41 insertions, 47 deletions
diff --git a/snekbox/memfs.py b/snekbox/memfs.py
index c5c1505..74b0d70 100644
--- a/snekbox/memfs.py
+++ b/snekbox/memfs.py
@@ -2,11 +2,10 @@
from __future__ import annotations
import logging
-import os
import subprocess
from collections.abc import Generator
+from functools import cached_property
from pathlib import Path
-from threading import BoundedSemaphore
from types import TracebackType
from typing import Type
from uuid import uuid4
@@ -15,16 +14,11 @@ from snekbox.snekio import FileAttachment
log = logging.getLogger(__name__)
-PID = os.getpid()
-NAMESPACE_DIR = Path("/memfs")
-NAMESPACE_DIR.mkdir(exist_ok=True)
-
-
-def mount_tmpfs(name: str, size: int | str) -> Path:
+def mount_tmpfs(path: str | Path, size: int | str) -> Path:
"""Create and mount a tmpfs directory."""
- tmp = NAMESPACE_DIR / name
- tmp.mkdir()
+ path = Path(path)
+ path.mkdir()
# Mount the tmpfs
subprocess.check_call(
[
@@ -34,18 +28,18 @@ def mount_tmpfs(name: str, size: int | str) -> Path:
"-o",
f"size={size}",
"tmpfs",
- str(tmp),
+ str(path),
]
)
- return tmp
+ return path
-def unmount_tmpfs(name: str) -> None:
+def unmount_tmpfs(path: str | Path) -> None:
"""Unmount and remove a tmpfs directory."""
- tmp = NAMESPACE_DIR / name
- subprocess.check_call(["umount", str(tmp)])
+ path = Path(path)
+ subprocess.check_call(["umount", str(path)])
# Unmounting will not remove the original folder, so do that here
- tmp.rmdir()
+ path.rmdir()
def parse_files(
@@ -65,39 +59,48 @@ def parse_files(
class MemFS:
"""A temporary directory using tmpfs."""
- assignment_lock = BoundedSemaphore(1)
- assigned_names: set[str] = set() # Pool of tempdir names in use
-
- def __init__(self, instance_size: int) -> None:
+ def __init__(self, instance_size: int, root_dir: str | Path = "/memfs") -> None:
"""
Create a temporary directory using tmpfs.
- size: Size limit of each tmpfs instance in bytes
+ Args:
+ instance_size: Size limit of each tmpfs instance in bytes.
+ root_dir: Root directory to mount instances in.
"""
- self.path: Path | None = None
self.instance_size = instance_size
+ self._path: Path | None = None
+ self.root_dir: Path = Path(root_dir)
+ self.root_dir.mkdir(exist_ok=True, parents=True)
+
+ @cached_property
+ def path(self) -> Path:
+ """Returns the path of the MemFS."""
+ if self._path is None:
+ raise RuntimeError("MemFS accessed before __enter__.")
+ return self._path
@property
def name(self) -> str | None:
"""Name of the temp dir."""
- return self.path.name if self.path else None
+ return self.path.name
@property
def home(self) -> Path | None:
"""Path to home directory."""
- return Path(self.path, "home") if self.path else None
+ return Path(self.path, "home")
def __enter__(self) -> MemFS:
"""Mounts a new tempfs, returns self."""
- with self.assignment_lock:
- for _ in range(10):
- # Combine PID to avoid collisions with multiple snekbox processes
- if (name := f"{PID}-{uuid4()}") not in self.assigned_names:
- self.path = mount_tmpfs(name, self.instance_size)
- self.assigned_names.add(name)
- break
- else:
- raise RuntimeError("Failed to generate a unique tempdir name in 10 attempts")
+ for _ in range(10):
+ name = str(uuid4())
+ try:
+ path = self.root_dir / name
+ self._path = mount_tmpfs(path, self.instance_size)
+ break
+ except FileExistsError:
+ continue
+ else:
+ raise RuntimeError("Failed to generate a unique tempdir name in 10 attempts")
self.mkdir("home")
return self
@@ -131,20 +134,11 @@ class MemFS:
yield FileAttachment.from_path(file)
def cleanup(self) -> None:
- """Unmounts tmpfs, releases name."""
- if self.path is None:
+ """Unmounts tmpfs."""
+ if self._path is None:
return
- # Remove the path folder
- unmount_tmpfs(self.name)
-
- if not self.path.exists():
- with self.assignment_lock:
- self.assigned_names.remove(self.name)
- else:
- # Don't remove name from pool if failed to delete folder
- logging.warning(f"Failed to remove {self.path} in cleanup")
-
- self.path = None
+ unmount_tmpfs(self.path)
+ self._path = None
def __repr__(self):
- return f"<MemoryTempDir {self.name or '(Uninitialized)'}>"
+ return f"<MemFS {self.name if self._path else '(Uninitialized)'}>"