aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar ionite34 <[email protected]>2022-11-28 14:26:11 +0800
committerGravatar ionite34 <[email protected]>2022-11-28 14:26:11 +0800
commit0de6825f3fa25afa4d0ccd0fabde91b07cc64d02 (patch)
treee4afaddb0f7ca3ada67898361f6e654409c6c2a1
parentMove rlimit_fsize to cfg (diff)
Refactor MemFS for implicit cleanup support
-rw-r--r--snekbox/memfs.py86
-rw-r--r--tests/test_memfs.py55
2 files changed, 81 insertions, 60 deletions
diff --git a/snekbox/memfs.py b/snekbox/memfs.py
index b8abc23..0e13da4 100644
--- a/snekbox/memfs.py
+++ b/snekbox/memfs.py
@@ -2,8 +2,10 @@
from __future__ import annotations
import logging
+import warnings
+import weakref
from collections.abc import Generator
-from functools import cached_property
+from contextlib import suppress
from pathlib import Path
from types import TracebackType
from typing import Type
@@ -18,27 +20,59 @@ __all__ = ("MemFS",)
class MemFS:
- """A temporary directory using tmpfs."""
+ """A memory temporary file system."""
def __init__(self, instance_size: int, root_dir: str | Path = "/memfs") -> None:
"""
- Create a temporary directory using tmpfs.
+ Initialize a memory temporary file system.
+
+ Examples:
+ >>> with MemFS(1024) as memfs:
+ ... (memfs.home / "test.txt").write_text("Hello")
Args:
instance_size: Size limit of each tmpfs instance in bytes.
root_dir: Root directory to mount instances in.
"""
self.instance_size = instance_size
- self._path: Path | None = None
- self.root_dir: Path = Path(root_dir)
+ self.root_dir = Path(root_dir)
self.root_dir.mkdir(exist_ok=True, parents=True)
- @cached_property
- def path(self) -> Path:
- """Returns the path of the MemFS."""
- if self._path is None:
- raise RuntimeError("MemFS accessed before __enter__.")
- return self._path
+ for _ in range(10):
+ name = str(uuid4())
+ try:
+ self.path = self.root_dir / name
+ self.path.mkdir()
+ mount("", self.path, "tmpfs", size=self.instance_size)
+ break
+ except OSError:
+ continue
+ else:
+ raise RuntimeError("Failed to generate a unique MemFS name in 10 attempts")
+
+ self.mkdir(self.home)
+ self.mkdir(self.output)
+
+ self._finalizer = weakref.finalize(
+ self,
+ self._cleanup,
+ self.path,
+ warn_message=f"Implicitly cleaning up {self!r}",
+ )
+
+ @classmethod
+ def _cleanup(cls, path: Path, warn_message: str):
+ """Implicit cleanup of the MemFS."""
+ with suppress(OSError):
+ unmount(path)
+ path.rmdir()
+ warnings.warn(warn_message, ResourceWarning)
+
+ def cleanup(self) -> None:
+ """Unmount the tempfs and remove the directory."""
+ if self._finalizer.detach() or self.path.exists():
+ unmount(self.path)
+ self.path.rmdir()
@property
def name(self) -> str:
@@ -56,22 +90,6 @@ class MemFS:
return self.home / "output"
def __enter__(self) -> MemFS:
- """Mount a new tempfs and return self."""
- for _ in range(10):
- name = str(uuid4())
- try:
- path = self.root_dir / name
- path.mkdir()
- mount("", path, "tmpfs", size=self.instance_size)
- self._path = path
- break
- except FileExistsError:
- continue
- else:
- raise RuntimeError("Failed to generate a unique tempdir name in 10 attempts")
-
- self.mkdir(self.home)
- self.mkdir(self.output)
return self
def __exit__(
@@ -82,6 +100,9 @@ class MemFS:
) -> None:
self.cleanup()
+ def __repr__(self) -> str:
+ return f"<{self.__class__.__name__} {self.path}>"
+
def mkdir(self, path: Path | str, chmod: int = 0o777) -> Path:
"""Create a directory in the tempdir."""
folder = Path(self.path, path)
@@ -128,14 +149,3 @@ class MemFS:
for file in res:
_ = file.as_dict
return res
-
- def cleanup(self) -> None:
- """Unmount the tmpfs."""
- if self._path is None:
- return
- unmount(self.path)
- self.path.rmdir()
- self._path = None
-
- def __repr__(self):
- return f"<MemFS {self.name if self._path else '(Uninitialized)'}>"
diff --git a/tests/test_memfs.py b/tests/test_memfs.py
index 8050562..53dee22 100644
--- a/tests/test_memfs.py
+++ b/tests/test_memfs.py
@@ -1,6 +1,6 @@
import logging
+import warnings
from concurrent.futures import ThreadPoolExecutor
-from operator import attrgetter
from unittest import TestCase, mock
from uuid import uuid4
@@ -9,24 +9,26 @@ from snekbox.memfs import MemFS
UUID_TEST = uuid4()
-def get_memfs_with_context():
- return MemFS(10).__enter__()
-
-
class MemFSTests(TestCase):
def setUp(self):
super().setUp()
self.logger = logging.getLogger("snekbox.memfs")
self.logger.setLevel(logging.WARNING)
+ warnings.filterwarnings(
+ "ignore",
+ ".*Implicitly cleaning up.*",
+ ResourceWarning,
+ "snekbox.memfs",
+ )
@mock.patch("snekbox.memfs.uuid4", lambda: UUID_TEST)
def test_assignment_thread_safe(self):
"""Test concurrent mounting works in multi-thread environments."""
# Concurrently create MemFS in threads, check only 1 can be created
# Others should result in RuntimeError
- with ThreadPoolExecutor() as pool:
+ with ThreadPoolExecutor() as executor:
memfs: MemFS | None = None
- futures = [pool.submit(get_memfs_with_context) for _ in range(8)]
+ futures = [executor.submit(MemFS, 10) for _ in range(8)]
for future in futures:
# We should have exactly one result and all others RuntimeErrors
if err := future.exception():
@@ -37,20 +39,29 @@ class MemFSTests(TestCase):
# Original memfs should still exist afterwards
self.assertIsInstance(memfs, MemFS)
- self.assertTrue(memfs.path.exists())
-
- def test_no_context_error(self):
- """Accessing MemFS attributes before __enter__ raises RuntimeError."""
- cases = [
- attrgetter("path"),
- attrgetter("name"),
- attrgetter("home"),
- attrgetter("output"),
- lambda fs: fs.mkdir(""),
- lambda fs: list(fs.files(1)),
- ]
+ self.assertTrue(memfs.path.is_mount())
+
+ def test_cleanup(self):
+ """Test explicit cleanup."""
+ memfs = MemFS(10)
+ path = memfs.path
+ self.assertTrue(path.is_mount())
+ memfs.cleanup()
+ self.assertFalse(path.exists())
+
+ def test_context_cleanup(self):
+ """Context __exit__ should trigger cleanup."""
+ with MemFS(10) as memfs:
+ path = memfs.path
+ self.assertTrue(path.is_mount())
+ self.assertFalse(path.exists())
+ def test_implicit_cleanup(self):
+ """Test implicit _cleanup triggered by GC."""
memfs = MemFS(10)
- for case in cases:
- with self.subTest(case=case), self.assertRaises(RuntimeError):
- case(memfs)
+ path = memfs.path
+ self.assertTrue(path.is_mount())
+ # Catch the warning about implicit cleanup
+ with self.assertWarns(ResourceWarning):
+ del memfs
+ self.assertFalse(path.exists())