diff options
author | 2022-11-28 14:26:11 +0800 | |
---|---|---|
committer | 2022-11-28 14:26:11 +0800 | |
commit | 0de6825f3fa25afa4d0ccd0fabde91b07cc64d02 (patch) | |
tree | e4afaddb0f7ca3ada67898361f6e654409c6c2a1 | |
parent | Move rlimit_fsize to cfg (diff) |
Refactor MemFS for implicit cleanup support
-rw-r--r-- | snekbox/memfs.py | 86 | ||||
-rw-r--r-- | tests/test_memfs.py | 55 |
2 files changed, 81 insertions, 60 deletions
diff --git a/snekbox/memfs.py b/snekbox/memfs.py index b8abc23..0e13da4 100644 --- a/snekbox/memfs.py +++ b/snekbox/memfs.py @@ -2,8 +2,10 @@ from __future__ import annotations import logging +import warnings +import weakref from collections.abc import Generator -from functools import cached_property +from contextlib import suppress from pathlib import Path from types import TracebackType from typing import Type @@ -18,27 +20,59 @@ __all__ = ("MemFS",) class MemFS: - """A temporary directory using tmpfs.""" + """A memory temporary file system.""" def __init__(self, instance_size: int, root_dir: str | Path = "/memfs") -> None: """ - Create a temporary directory using tmpfs. + Initialize a memory temporary file system. + + Examples: + >>> with MemFS(1024) as memfs: + ... (memfs.home / "test.txt").write_text("Hello") Args: instance_size: Size limit of each tmpfs instance in bytes. root_dir: Root directory to mount instances in. """ self.instance_size = instance_size - self._path: Path | None = None - self.root_dir: Path = Path(root_dir) + self.root_dir = Path(root_dir) self.root_dir.mkdir(exist_ok=True, parents=True) - @cached_property - def path(self) -> Path: - """Returns the path of the MemFS.""" - if self._path is None: - raise RuntimeError("MemFS accessed before __enter__.") - return self._path + for _ in range(10): + name = str(uuid4()) + try: + self.path = self.root_dir / name + self.path.mkdir() + mount("", self.path, "tmpfs", size=self.instance_size) + break + except OSError: + continue + else: + raise RuntimeError("Failed to generate a unique MemFS name in 10 attempts") + + self.mkdir(self.home) + self.mkdir(self.output) + + self._finalizer = weakref.finalize( + self, + self._cleanup, + self.path, + warn_message=f"Implicitly cleaning up {self!r}", + ) + + @classmethod + def _cleanup(cls, path: Path, warn_message: str): + """Implicit cleanup of the MemFS.""" + with suppress(OSError): + unmount(path) + path.rmdir() + warnings.warn(warn_message, ResourceWarning) + + def cleanup(self) -> None: + """Unmount the tempfs and remove the directory.""" + if self._finalizer.detach() or self.path.exists(): + unmount(self.path) + self.path.rmdir() @property def name(self) -> str: @@ -56,22 +90,6 @@ class MemFS: return self.home / "output" def __enter__(self) -> MemFS: - """Mount a new tempfs and return self.""" - for _ in range(10): - name = str(uuid4()) - try: - path = self.root_dir / name - path.mkdir() - mount("", path, "tmpfs", size=self.instance_size) - self._path = path - break - except FileExistsError: - continue - else: - raise RuntimeError("Failed to generate a unique tempdir name in 10 attempts") - - self.mkdir(self.home) - self.mkdir(self.output) return self def __exit__( @@ -82,6 +100,9 @@ class MemFS: ) -> None: self.cleanup() + def __repr__(self) -> str: + return f"<{self.__class__.__name__} {self.path}>" + def mkdir(self, path: Path | str, chmod: int = 0o777) -> Path: """Create a directory in the tempdir.""" folder = Path(self.path, path) @@ -128,14 +149,3 @@ class MemFS: for file in res: _ = file.as_dict return res - - def cleanup(self) -> None: - """Unmount the tmpfs.""" - if self._path is None: - return - unmount(self.path) - self.path.rmdir() - self._path = None - - def __repr__(self): - return f"<MemFS {self.name if self._path else '(Uninitialized)'}>" diff --git a/tests/test_memfs.py b/tests/test_memfs.py index 8050562..53dee22 100644 --- a/tests/test_memfs.py +++ b/tests/test_memfs.py @@ -1,6 +1,6 @@ import logging +import warnings from concurrent.futures import ThreadPoolExecutor -from operator import attrgetter from unittest import TestCase, mock from uuid import uuid4 @@ -9,24 +9,26 @@ from snekbox.memfs import MemFS UUID_TEST = uuid4() -def get_memfs_with_context(): - return MemFS(10).__enter__() - - class MemFSTests(TestCase): def setUp(self): super().setUp() self.logger = logging.getLogger("snekbox.memfs") self.logger.setLevel(logging.WARNING) + warnings.filterwarnings( + "ignore", + ".*Implicitly cleaning up.*", + ResourceWarning, + "snekbox.memfs", + ) @mock.patch("snekbox.memfs.uuid4", lambda: UUID_TEST) def test_assignment_thread_safe(self): """Test concurrent mounting works in multi-thread environments.""" # Concurrently create MemFS in threads, check only 1 can be created # Others should result in RuntimeError - with ThreadPoolExecutor() as pool: + with ThreadPoolExecutor() as executor: memfs: MemFS | None = None - futures = [pool.submit(get_memfs_with_context) for _ in range(8)] + futures = [executor.submit(MemFS, 10) for _ in range(8)] for future in futures: # We should have exactly one result and all others RuntimeErrors if err := future.exception(): @@ -37,20 +39,29 @@ class MemFSTests(TestCase): # Original memfs should still exist afterwards self.assertIsInstance(memfs, MemFS) - self.assertTrue(memfs.path.exists()) - - def test_no_context_error(self): - """Accessing MemFS attributes before __enter__ raises RuntimeError.""" - cases = [ - attrgetter("path"), - attrgetter("name"), - attrgetter("home"), - attrgetter("output"), - lambda fs: fs.mkdir(""), - lambda fs: list(fs.files(1)), - ] + self.assertTrue(memfs.path.is_mount()) + + def test_cleanup(self): + """Test explicit cleanup.""" + memfs = MemFS(10) + path = memfs.path + self.assertTrue(path.is_mount()) + memfs.cleanup() + self.assertFalse(path.exists()) + + def test_context_cleanup(self): + """Context __exit__ should trigger cleanup.""" + with MemFS(10) as memfs: + path = memfs.path + self.assertTrue(path.is_mount()) + self.assertFalse(path.exists()) + def test_implicit_cleanup(self): + """Test implicit _cleanup triggered by GC.""" memfs = MemFS(10) - for case in cases: - with self.subTest(case=case), self.assertRaises(RuntimeError): - case(memfs) + path = memfs.path + self.assertTrue(path.is_mount()) + # Catch the warning about implicit cleanup + with self.assertWarns(ResourceWarning): + del memfs + self.assertFalse(path.exists()) |