
Caching

obspec_utils.wrappers.CachingReadableStore

Bases: ReadableStore

A wrapper that caches full objects in a MemoryStore on first access.

The first access to a path fetches and caches the entire object, even when that access is a range request. All subsequent accesses to that path, including range requests, are served from the cache.

When the total size of cached objects exceeds max_size, the cache evicts objects in LRU (Least Recently Used) order.
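To make the eviction policy concrete, here is a minimal sketch of a byte-budgeted LRU cache built on `collections.OrderedDict`. The class `LRUByteCache` and its methods are hypothetical. The real wrapper stores objects in a MemoryStore, so this only illustrates the policy, and the real class may handle edge cases (such as a single object larger than `max_size`) differently.

```python
from __future__ import annotations

from collections import OrderedDict


class LRUByteCache:
    """Hypothetical byte-budgeted LRU cache; an illustration, not obspec_utils code."""

    def __init__(self, max_size: int) -> None:
        self.max_size = max_size
        # Insertion order doubles as recency order: front = least recently used.
        self._entries: OrderedDict[str, bytes] = OrderedDict()
        self._size = 0

    @property
    def cache_size(self) -> int:
        return self._size

    @property
    def cached_paths(self) -> list[str]:
        return list(self._entries)  # oldest first

    def get(self, path: str) -> bytes | None:
        data = self._entries.get(path)
        if data is not None:
            # A hit marks the path as most recently used.
            self._entries.move_to_end(path)
        return data

    def put(self, path: str, data: bytes) -> None:
        if path in self._entries:
            self._size -= len(self._entries.pop(path))
        self._entries[path] = data
        self._size += len(data)
        # Evict from the least recently used end until back under budget.
        # (In this sketch an object larger than max_size is itself evicted.)
        while self._size > self.max_size and self._entries:
            _, evicted = self._entries.popitem(last=False)
            self._size -= len(evicted)
```

Touching `a` before inserting `c` makes `b` the eviction victim, which is the same recency ordering that `cached_paths` reports (oldest first).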

Notes

Thread Safety: This class is thread-safe and works correctly with multi-threaded executors (e.g., ThreadPoolExecutor).
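One straightforward way to get this behaviour is to guard cache lookups and fills with a lock, so that threads racing on the same path trigger only one fetch. The sketch below assumes that approach. `CountingStore`, `get_bytes`, and `LockedCache` are hypothetical stand-ins, and the actual class may use finer-grained locking.

```python
from __future__ import annotations

import threading
from concurrent.futures import ThreadPoolExecutor


class CountingStore:
    """Hypothetical in-memory stand-in for a remote store; counts full-object fetches."""

    def __init__(self, objects: dict[str, bytes]) -> None:
        self._objects = objects
        self.fetches = 0
        self._count_lock = threading.Lock()

    def get_bytes(self, path: str) -> bytes:
        with self._count_lock:
            self.fetches += 1
        return self._objects[path]


class LockedCache:
    """Minimal sketch: one lock serializes cache lookups and fills."""

    def __init__(self, store: CountingStore) -> None:
        self._store = store
        self._cache: dict[str, bytes] = {}
        self._lock = threading.Lock()

    def get_range(self, path: str, start: int, end: int) -> bytes:
        with self._lock:
            data = self._cache.get(path)
            if data is None:
                # Fetch while holding the lock so concurrent callers don't fetch twice.
                data = self._store.get_bytes(path)
                self._cache[path] = data
        # Slicing happens outside the lock; the cached bytes are immutable.
        return data[start:end]


store = CountingStore({"file.nc": bytes(range(100))})
cached = LockedCache(store)
with ThreadPoolExecutor(max_workers=8) as pool:
    chunks = list(pool.map(lambda i: cached.get_range("file.nc", i, i + 10), range(0, 100, 10)))
```

Holding one lock across the fetch is the simplest correct choice, at the cost of serializing fetches of different paths; a per-path lock would avoid that.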

Distributed Limitations: The cache is local to each process. In distributed settings (Dask distributed, ProcessPoolExecutor, Lithops), each worker maintains its own independent cache with no sharing:

  • Workers accessing the same files will each fetch independently
  • Memory is duplicated across workers
  • This is typically acceptable when workloads are partitioned by file (each worker processes different files)

For workloads where multiple workers repeatedly access the same files, consider external caching solutions (Redis, shared filesystem) or restructuring the workload to minimize cross-worker file access.

Examples:

With context manager (cache cleared on exit):

from obstore.store import S3Store
from obspec_utils.wrappers import CachingReadableStore
from obspec_utils import ObjectStoreRegistry

s3_store = S3Store(bucket="my-bucket")

with CachingReadableStore(s3_store, max_size=512*1024*1024) as cached:
    registry = ObjectStoreRegistry({"s3://my-bucket": cached})
    # Use registry - first access fetches from S3, subsequent from cache
    store, path = registry.resolve("s3://my-bucket/file.nc")
    data = store.get_range(path, start=0, end=1000)
# Cache cleared automatically

With explicit cleanup:

cached = CachingReadableStore(s3_store)
registry = ObjectStoreRegistry({"s3://my-bucket": cached})
# ... use registry ...
cached.clear_cache()  # Explicit cleanup when done

cache_size property

cache_size: int

Current cache size in bytes.

cached_paths property

cached_paths: list[str]

List of currently cached paths (in LRU order, oldest first).

__enter__

__enter__() -> 'CachingReadableStore'

Enter the context manager.

__exit__

__exit__(exc_type, exc_val, exc_tb) -> None

Exit the context manager, clearing the cache.
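The context-manager contract can be sketched as follows. `ClearOnExit` is a hypothetical stand-in. It illustrates that because `__exit__` returns `None`, the cache is cleared whether the block exits normally or with an exception, and the exception is not suppressed.

```python
from __future__ import annotations


class ClearOnExit:
    """Hypothetical stand-in showing the __enter__/__exit__ contract described above."""

    def __init__(self) -> None:
        self._cache: dict[str, bytes] = {}

    def clear_cache(self) -> None:
        self._cache.clear()

    def __enter__(self) -> ClearOnExit:
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # Returning None (falsy) means exceptions raised in the block still propagate.
        self.clear_cache()


with ClearOnExit() as cached:
    cached._cache["s3://my-bucket/file.nc"] = b"..."
```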

__getattr__

__getattr__(name: str) -> Any

Forward unknown attributes to the underlying store.

This ensures CachingReadableStore is transparent for any additional public methods or attributes the underlying store may have.

Note: Private attributes (starting with '_') are not forwarded.
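A minimal sketch of this forwarding pattern, using hypothetical `InnerStore` and `ForwardingWrapper` classes. `__getattr__` only runs when normal lookup on the wrapper fails, so the wrapper's own methods always take precedence over the underlying store's.

```python
from __future__ import annotations

from typing import Any


class InnerStore:
    """Hypothetical underlying store with an extra public method and a private field."""

    region = "us-west-2"
    _secret = "not forwarded"

    def list_prefix(self, prefix: str) -> list[str]:
        return [f"{prefix}/a.nc", f"{prefix}/b.nc"]


class ForwardingWrapper:
    def __init__(self, store: Any) -> None:
        self._store = store

    def __getattr__(self, name: str) -> Any:
        # Called only when normal attribute lookup on the wrapper fails.
        if name.startswith("_"):
            # Refusing private names also prevents infinite recursion if
            # self._store has not been set yet (e.g. during unpickling).
            raise AttributeError(name)
        return getattr(self._store, name)


wrapped = ForwardingWrapper(InnerStore())
```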

__init__

__init__(store: ReadableStore, max_size: int = 256 * 1024 * 1024) -> None

Create a caching wrapper around a store.

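Parameters:

  • store: The underlying ReadableStore to wrap.
  • max_size: Maximum total size of the cache in bytes (default 256 * 1024 * 1024, i.e. 256 MiB).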

__reduce__

__reduce__()

Support pickling for multiprocessing and distributed frameworks.

Returns a fresh instance with an empty cache. This is intentional: serializing the full cache contents would be inefficient for distributed workloads where each worker typically processes different files.

The underlying store and max_size configuration are preserved.
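The pickling behaviour amounts to rebuilding the wrapper from its constructor arguments. The sketch below assumes the store and `max_size` are the only state worth preserving. `PicklableCachingStore` is hypothetical, and a plain dict stands in for the store. Because the store is passed back to the constructor, the underlying store must itself be picklable for this to work.

```python
from __future__ import annotations

import pickle
from typing import Any


class PicklableCachingStore:
    """Hypothetical sketch of the pickling contract described above."""

    def __init__(self, store: Any, max_size: int = 256 * 1024 * 1024) -> None:
        self._store = store
        self._max_size = max_size
        self._cache: dict[str, bytes] = {}

    def __reduce__(self):
        # Rebuild from configuration only; cached bytes are deliberately dropped.
        return (self.__class__, (self._store, self._max_size))


original = PicklableCachingStore({"bucket": "my-bucket"}, max_size=512 * 1024 * 1024)
original._cache["file.nc"] = b"\x00" * 1024
clone = pickle.loads(pickle.dumps(original))
```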

clear_cache

clear_cache() -> None

Clear all cached objects.

get

get(path: str, *, options: GetOptions | None = None) -> GetResult

Get entire file, using cache if available.

get_async async

get_async(path: str, *, options: GetOptions | None = None) -> GetResultAsync

Get entire file async, using cache if available.

get_range

get_range(
    path: str, *, start: int, end: int | None = None, length: int | None = None
) -> Buffer

Get a byte range, caching the full object first if needed.
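Once the full object is cached, serving a range reduces to slicing. The sketch below assumes obstore-style semantics: `end` is exclusive, and exactly one of `end` or `length` is given. The helper names `resolve_range` and `range_from_cached` are hypothetical.

```python
from __future__ import annotations


def resolve_range(start: int, end: int | None = None, length: int | None = None) -> tuple[int, int]:
    """Turn (start, end | length) into a half-open [start, stop) interval (assumed semantics)."""
    if (end is None) == (length is None):
        raise ValueError("provide exactly one of end or length")
    stop = end if end is not None else start + length
    if start < 0 or stop < start:
        raise ValueError(f"invalid range: start={start}, stop={stop}")
    return start, stop


def range_from_cached(
    data: bytes, start: int, end: int | None = None, length: int | None = None
) -> memoryview:
    s, e = resolve_range(start, end, length)
    # memoryview slicing is zero-copy: the cached object is not duplicated per request.
    return memoryview(data)[s:e]
```

Returning a `memoryview` avoids copying the cached bytes for each request. This sketch silently truncates a range that runs past the end of the object; the real method may raise instead.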

get_range_async async

get_range_async(
    path: str, *, start: int, end: int | None = None, length: int | None = None
) -> Buffer

Get a byte range async, caching the full object first if needed.

get_ranges

get_ranges(
    path: str,
    *,
    starts: Sequence[int],
    ends: Sequence[int] | None = None,
    lengths: Sequence[int] | None = None,
) -> Sequence[Buffer]

Get multiple byte ranges, caching the full object first if needed.
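The multi-range variant follows the same pattern. After a single full-object fetch, each requested range is a slice of the one cached copy rather than a separate request to the backing store. A hypothetical sketch, under the same assumption that ends are exclusive and exactly one of `ends` or `lengths` is given:

```python
from __future__ import annotations

from collections.abc import Sequence


def ranges_from_cached(
    data: bytes,
    starts: Sequence[int],
    ends: Sequence[int] | None = None,
    lengths: Sequence[int] | None = None,
) -> list[memoryview]:
    if (ends is None) == (lengths is None):
        raise ValueError("provide exactly one of ends or lengths")
    bounds = ends if ends is not None else lengths
    if len(bounds) != len(starts):
        raise ValueError("starts and ends/lengths must have the same length")
    if ends is None:
        ends = [s + n for s, n in zip(starts, lengths)]
    view = memoryview(data)
    # Every range is a cheap slice of the single cached copy; no extra fetches.
    return [view[s:e] for s, e in zip(starts, ends)]
```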

get_ranges_async async

get_ranges_async(
    path: str,
    *,
    starts: Sequence[int],
    ends: Sequence[int] | None = None,
    lengths: Sequence[int] | None = None,
) -> Sequence[Buffer]

Get multiple byte ranges async, caching the full object first if needed.

head

head(path: str) -> ObjectMeta

Get file metadata (delegates to underlying store).

head_async async

head_async(path: str) -> ObjectMeta

Get file metadata async (delegates to underlying store).