Source code for pestifer.util.cacheable_object


# Author: ChatGPT 5
# Assistant Author: Cameron F. Abrams <cfa22@drexel.edu>

"""
Implements the general-purpose CacheableObject class and the TarBytesFS class.
"""

from __future__ import annotations

import fsspec
import hashlib
import io
import joblib
import logging
import os
import tempfile

from filelock import FileLock
from packaging.version import Version, InvalidVersion
from pathlib import Path
from platformdirs import user_cache_dir
from typing import Iterable

logger = logging.getLogger(__name__)

from .stringthings import __pestifer_version__

[docs] class TarBytesFS: """ A class for handling tarred bytes as a filesystem. """ __slots__ = ("tar_bytes", "compression", "_fs") def __init__(self, tar_bytes: bytes, compression: str | None = None): self.tar_bytes = tar_bytes # pickleable self.compression = compression # pickleable self._fs = None # ephemeral, NOT pickleable
[docs] @classmethod def from_file(cls, path: str | Path, compression: str | None = None): """ Initializes a TarBytesFS instance from a file. """ return cls(Path(path).read_bytes(), compression=compression)
[docs] def fs(self): """ Returns a FileSystem handle for the TarBytesFS object """ if self._fs is None: bio = io.BytesIO(self.tar_bytes) if self.compression is None and not hasattr(bio, "name"): # Let fsspec infer if you wish: bio.name = "archive.tgz" pass self._fs = fsspec.filesystem("tar", fo=bio, compression=self.compression) return self._fs
[docs] def ls(self, path: str = ""): """ Lists the contents of a directory in the tarred filesystem. """ return self.fs().ls(path)
[docs] def open(self, path: str, mode: str = "rb"): """ Opens a file in the tarred filesystem and returns a context handle to that file. """ return self.fs().open(path, mode)
# --- Pickle hooks: drop the ephemeral filesystem def __getstate__(self): return {"tar_bytes": self.tar_bytes, "compression": self.compression} def __setstate__(self, state): self.tar_bytes = state["tar_bytes"] self.compression = state.get("compression") self._fs = None
def _latest_mtime(
    root: Path,
    *,
    ignore_names: Iterable[str] = frozenset({"__pycache__"}),
    ignore_suffixes: Iterable[str] = frozenset(),
) -> float:
    """
    Return the newest modification time of any regular file under ``root``.

    Parameters
    ----------
    root : Path, str, or object with a ``resource_root`` attribute
        Directory to scan. If the object exposes ``resource_root`` (as a
        CacheableObject does), that directory is scanned instead.
    ignore_names : iterable of str
        File or directory names to skip. A skipped directory is not descended
        into, so nothing beneath it is counted.
    ignore_suffixes : iterable of str
        File suffixes (e.g. ``".log"``) whose files are skipped.

    Returns
    -------
    float
        The largest ``st_mtime`` found, or 0.0 if no file qualifies (including
        when ``root`` does not exist).

    Hidden entries (names starting with ``"."``) are always skipped, together
    with everything inside hidden directories.
    """
    root = getattr(root, "resource_root", root)
    names = frozenset(ignore_names)
    suffixes = frozenset(ignore_suffixes)

    def _skipped(name: str) -> bool:
        # One rule for both files and directories.
        return name.startswith(".") or name in names

    latest = 0.0
    for dirpath, dirnames, filenames in os.walk(Path(root)):
        # Prune in place so os.walk never descends into ignored/hidden dirs;
        # checking only each file's own name would still count e.g.
        # __pycache__/*.pyc and .git/* files.
        dirnames[:] = [d for d in dirnames if not _skipped(d)]
        for fname in filenames:
            if _skipped(fname):
                continue
            path = Path(dirpath, fname)
            if path.suffix in suffixes or not path.is_file():
                continue
            try:
                latest = max(latest, path.stat().st_mtime)
            except FileNotFoundError:
                pass  # file vanished between listing and stat
    return latest


def _hash_resource(resource: Path) -> str:
    """Return the first 12 hex digits of the SHA-256 of *resource*'s resolved absolute path."""
    return hashlib.sha256(str(Path(resource).resolve()).encode()).hexdigest()[:12]
class CacheableObject:
    """
    Base/mixin providing native cache/de-cache behavior controlled by resource mtimes.

    Subclasses must implement
    ``_build_from_resources(self, resource_root: Path, **kwargs) -> None``
    to populate ``self`` from files in ``resource_root``.

    Behavior:

    - On ``__init__``, compare the cache file's mtime to the newest file mtime
      under ``resource_root``.
    - If the cache is fresh (and ``force_rebuild`` is False), hydrate ``self``
      from the cache and set ``self.from_cache = True``.
    - Otherwise call ``_build_from_resources(...)``, set ``self.from_cache = False``
      unless the subclass already set it, and write the cache. A cache that
      cannot be loaded triggers a rebuild; a cache that cannot be written is
      logged and skipped, leaving the freshly built object usable.
    """

    APP_NAME = "pestifer"  # for per-user cache dir
    APP_VERSION = __pestifer_version__
    CACHE_PREFIX = "cacheobj"  # file prefix; class & path hash appended
    CACHE_COMPRESS = ("gzip", 3)  # joblib compression
    # Choose how versioning gates the cache:
    #   "major"       -> v{MAJOR}
    #   "major.minor" -> v{MAJOR}.{MINOR}  (recommended default)
    #   "exact"       -> full version string
    #   None          -> no version in filename (old behavior)
    VERSION_SCOPE = "major.minor"

    @classmethod
    def _version_tag(cls) -> str:
        """Return the version tag embedded in cache filenames per ``VERSION_SCOPE`` ("" for none)."""
        if not cls.VERSION_SCOPE:
            return ""
        try:
            v = Version(cls.APP_VERSION)
        except InvalidVersion:
            # fall back to raw string if non-PEP440
            return f"v{cls.APP_VERSION}"
        if cls.VERSION_SCOPE == "major":
            return f"v{v.major}"
        if cls.VERSION_SCOPE == "major.minor":
            return f"v{v.major}.{v.minor}"
        if cls.VERSION_SCOPE == "exact":
            return f"v{v}"
        return ""  # unknown scope => no tag

    def __init__(self, resource_root: str | Path, *,
                 cache_dir: str | Path | object | None = None,
                 force_rebuild: bool = False,
                 ignore_suffixes: Iterable[str] = (),
                 resource_label: str | None = None,
                 **kwargs):
        """
        Load ``self`` from a fresh cache, or build it from resources and cache it.

        Parameters
        ----------
        resource_root : str or Path
            Directory whose newest file mtime gates cache freshness; also passed
            to ``_build_from_resources``.
        cache_dir : str or Path, optional
            Directory holding the cache file; if falsy, the per-user cache
            directory for ``APP_NAME`` is used.
        force_rebuild : bool
            If True, never load the cache; always rebuild and rewrite it.
        ignore_suffixes : iterable of str
            File suffixes excluded from the freshness scan.
        resource_label : str, optional
            Label used in the cache filename instead of a hash of the resolved
            ``resource_root`` path.
        **kwargs
            Forwarded to ``_build_from_resources``.
            NOTE(review): kwargs are not part of the cache key, so a cache built
            with different kwargs is reused as long as it is fresh.
        """
        if isinstance(resource_root, str):
            resource_root = Path(resource_root)
        self.resource_root = resource_root

        cdir, cpath = self.__cache_location(resource_root, cache_dir, resource_label)
        cdir.mkdir(parents=True, exist_ok=True)
        resources_mtime = _latest_mtime(resource_root, ignore_suffixes=set(ignore_suffixes))

        # The lock serializes load/build/write of one cache file across processes.
        with FileLock(str(cpath) + ".lock"):
            if not force_rebuild and self.__try_load_cache(cpath, resources_mtime):
                return

            logger.info("Rebuilding %s from resources...", type(self).__name__)
            self._build_from_resources(resource_root, **kwargs)
            # if subclass didn't set it, default to False
            if not hasattr(self, "from_cache"):
                try:
                    object.__setattr__(self, "from_cache", False)
                except Exception:
                    pass
            self.__write_cache(cdir, cpath)

    # ----- cache plumbing (name-mangled so subclasses cannot collide) -----

    def __cache_location(self, resource_root: Path, cache_dir, resource_label: str | None):
        """Return ``(cache directory, cache file path)`` for this class and resource."""
        label = resource_label if resource_label else _hash_resource(resource_root)
        base_key = f"{type(self).__name__.lower()}-{label}"
        vtag = self._version_tag()
        key = f"{base_key}-{vtag}" if vtag else base_key
        cdir = Path(cache_dir) if cache_dir else Path(user_cache_dir(self.APP_NAME))
        return cdir, cdir / f"{self.CACHE_PREFIX}-{key}.joblib"

    def __try_load_cache(self, cpath: Path, resources_mtime: float) -> bool:
        """Hydrate ``self`` from *cpath* if it exists and is not older than the resources; return success."""
        try:
            if not cpath.exists() or cpath.stat().st_mtime < resources_mtime:
                return False
            # joblib.load unpickles: only ever point this at a trusted cache dir.
            cached = joblib.load(cpath)
            self._adopt_state_from(cached)
        except Exception:
            # Best effort: any load problem falls through to a rebuild.
            logger.debug("Cache load from %s failed; rebuilding", cpath, exc_info=True)
            return False
        # mark as loaded-from-cache (even if subclass set it earlier)
        try:
            object.__setattr__(self, "from_cache", True)
        except Exception:
            pass
        logger.debug("Loaded %s from cache: %s", type(self).__name__, cpath)
        return True

    def __write_cache(self, cdir: Path, cpath: Path) -> None:
        """Atomically persist ``self`` to *cpath*; failures are logged, not raised."""
        tmp = None
        try:
            fd, tmp = tempfile.mkstemp(dir=str(cdir), suffix=".joblib")
            os.close(fd)
            logger.debug("Writing %s to cache: %s", type(self).__name__, cpath)
            joblib.dump(self, tmp, compress=self.CACHE_COMPRESS)
            # os.replace is atomic, so readers never observe a half-written cache.
            os.replace(tmp, cpath)
        except Exception:
            # The object is already fully built; a missing cache only costs a
            # rebuild next time, so don't fail construction over it.
            logger.warning("Could not write %s cache to %s; continuing without it",
                           type(self).__name__, cpath, exc_info=True)
        finally:
            if tmp is not None and os.path.exists(tmp):
                try:
                    os.remove(tmp)
                except OSError:
                    pass

    # ----- hooks -----

    def _build_from_resources(self, resource_root: Path, **kwargs) -> None:
        """Subclasses must populate `self` here."""
        raise NotImplementedError

    def _adopt_state_from(self, other: "CacheableObject") -> None:
        """
        Copy instance state from *other* (a cached instance) onto ``self``.

        ``__dict__`` contents are replaced wholesale. Values held in
        ``__slots__`` declared anywhere in the MRO (e.g. by a slotted mixin,
        whose slots never appear in ``__dict__``) are copied as well; slots
        unset on *other* are skipped. Override for anything more exotic.
        """
        if hasattr(self, "__dict__") and hasattr(other, "__dict__"):
            self.__dict__.clear()
            self.__dict__.update(other.__dict__)
        for klass in type(self).__mro__:
            slots = vars(klass).get("__slots__", ())
            if isinstance(slots, str):  # `__slots__ = "x"` declares a single slot
                slots = (slots,)
            for slot in slots:
                if slot in ("__dict__", "__weakref__"):
                    continue
                if slot.startswith("__") and not slot.endswith("__"):
                    # private slots are stored under their name-mangled form
                    slot = f"_{klass.__name__.lstrip('_')}{slot}"
                try:
                    value = getattr(other, slot)
                except AttributeError:
                    continue
                setattr(self, slot, value)