
Splitting

obspec_utils.wrappers.SplittingReadableStore

Bases: ReadableStore

Wraps a store to split large get() requests into concurrent get_ranges() calls.

This accelerates fetching large files by dividing them into chunks and fetching those chunks concurrently via get_ranges(). The splitting is transparent to callers: they see a normal get() interface.
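As a rough illustration of the idea, not the library's implementation, the sketch below splits an object of known size into fixed-size byte ranges, fetches them concurrently on a thread pool, and joins the pieces back in order. The caller therefore receives the same bytes a single get() would return. The helpers split_ranges and concurrent_get and the read_range callable are hypothetical stand-ins for the underlying store's range requests.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def split_ranges(size: int, chunk_size: int) -> list[tuple[int, int]]:
    """Hypothetical helper: half-open (start, end) ranges covering [0, size)."""
    return [(start, min(start + chunk_size, size)) for start in range(0, size, chunk_size)]


def concurrent_get(
    read_range: Callable[[int, int], bytes],
    size: int,
    chunk_size: int,
    max_workers: int = 4,
) -> bytes:
    """Hypothetical sketch: fetch every range concurrently, then reassemble."""
    ranges = split_ranges(size, chunk_size)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in input order, so the chunks join correctly
        return b"".join(pool.map(lambda r: read_range(r[0], r[1]), ranges))


# Usage with an in-memory bytes object standing in for a remote file
blob = bytes(range(256)) * 100  # 25,600 bytes
result = concurrent_get(lambda start, end: blob[start:end], len(blob), chunk_size=4096)
assert result == blob
```

Because the pieces are reassembled in range order, callers cannot tell the difference between this and one large request, which is the transparency the paragraph above describes.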

Designed to compose with CachingReadableStore:

from obstore.store import S3Store
from obspec_utils.wrappers import SplittingReadableStore, CachingReadableStore

store = S3Store(bucket="my-bucket")
store = SplittingReadableStore(store)  # Fast concurrent fetches
store = CachingReadableStore(store)    # Cache the results

# get() is now: concurrent fetch -> cache
result = store.get("large-file.nc")

Parameters:

  • store (ReadableStore) –

    The underlying store to wrap.

  • request_size (int, default: 12 * 1024 * 1024 ) –

    Target size, in bytes, for each concurrent range request. Default: 12 MiB (12 * 1024 * 1024 bytes), tuned for cloud storage throughput.

  • max_concurrent_requests (int, default: 18 ) –

    Maximum number of concurrent requests. Default: 18. If a file would require more requests than this at request_size, the size of each request is increased so that the whole file still fits within this many requests.
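The two parameters above can be read as a simple sizing rule. The sketch below is one plausible interpretation, not the library's exact arithmetic: compute how many requests of request_size the object needs, cap that count at max_concurrent_requests, and enlarge each request so the capped count still covers the whole object. The function name plan_ranges is hypothetical.

```python
def plan_ranges(
    size: int,
    request_size: int = 12 * 1024 * 1024,
    max_concurrent_requests: int = 18,
) -> list[tuple[int, int]]:
    """Hypothetical sizing rule returning half-open (start, end) byte ranges."""
    if size <= 0:
        return []
    # Requests needed at the target size (integer ceiling division)
    count = -(-size // request_size)
    # Cap the request count; each request then grows to compensate
    count = min(count, max_concurrent_requests)
    part = -(-size // count)
    return [(start, min(start + part, size)) for start in range(0, size, part)]


MiB = 1024 * 1024
print(len(plan_ranges(100 * MiB)))   # 9 requests of roughly 11 MiB each
print(len(plan_ranges(1024 * MiB)))  # capped at 18 requests of roughly 57 MiB each
```

Under this reading, a 100 MiB file stays close to the 12 MiB target, while a 1 GiB file would need 86 requests at 12 MiB and is instead covered by 18 larger ones.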

Notes

This wrapper only affects get() and get_async(). Range requests (get_range, get_ranges) pass through unchanged since they're already appropriately sized by the caller.
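To make the pass-through behavior concrete, here is a hypothetical toy wrapper (not the real class) whose range methods forward the caller's arguments untouched. A recording fake store confirms that a caller's three ranges reach the underlying store as exactly three ranges, without being re-split. RecordingStore and PassThroughRanges are invented names for illustration, and their signatures are simplified to start/end only.

```python
from collections.abc import Sequence


class RecordingStore:
    """Hypothetical in-memory store that records every range call it receives."""

    def __init__(self, objects: dict[str, bytes]) -> None:
        self.objects = objects
        self.calls: list[tuple] = []

    def get_range(self, path: str, *, start: int, end: int) -> bytes:
        self.calls.append(("get_range", start, end))
        return self.objects[path][start:end]

    def get_ranges(self, path: str, *, starts: Sequence[int], ends: Sequence[int]) -> list[bytes]:
        self.calls.append(("get_ranges", list(starts), list(ends)))
        return [self.objects[path][s:e] for s, e in zip(starts, ends)]


class PassThroughRanges:
    """Hypothetical wrapper: range requests are forwarded exactly as given."""

    def __init__(self, store: RecordingStore) -> None:
        self.store = store

    def get_range(self, path: str, *, start: int, end: int) -> bytes:
        return self.store.get_range(path, start=start, end=end)

    def get_ranges(self, path: str, *, starts: Sequence[int], ends: Sequence[int]) -> list[bytes]:
        return self.store.get_ranges(path, starts=starts, ends=ends)


inner = RecordingStore({"file.nc": bytes(range(100))})
wrapped = PassThroughRanges(inner)
parts = wrapped.get_ranges("file.nc", starts=[0, 10, 50], ends=[5, 20, 60])
# The underlying store sees the caller's ranges unchanged
assert inner.calls == [("get_ranges", [0, 10, 50], [5, 20, 60])]
```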

The concurrent fetching strategy is based on Icechunk's approach: github.com/earth-mover/icechunk/blob/main/icechunk/src/storage/mod.rs

Examples:

Basic usage:

from obstore.store import S3Store
from obspec_utils.wrappers import SplittingReadableStore

store = S3Store(bucket="my-bucket")
fast_store = SplittingReadableStore(store)

# Large file fetched via concurrent requests
result = fast_store.get("large-file.nc")

With caching (recommended pattern):

from obstore.store import S3Store
from obspec_utils.wrappers import CachingReadableStore, SplittingReadableStore

store = S3Store(bucket="my-bucket")
store = SplittingReadableStore(store)
store = CachingReadableStore(store)

# First access: concurrent fetch, then cached
result1 = store.get("file.nc")
# Second access: served from cache (no fetch)
result2 = store.get("file.nc")
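The ordering matters because the cache sits on top, so only a cache miss reaches the splitting layer. The toy sketch below imitates that behavior with a dict-backed cache over a counting store. ToyCache and CountingStore are hypothetical stand-ins, not CachingReadableStore or SplittingReadableStore themselves, and the real cache's storage and eviction policy may differ.

```python
class CountingStore:
    """Hypothetical stand-in for the splitting layer; counts fetches that reach it."""

    def __init__(self, objects: dict[str, bytes]) -> None:
        self.objects = objects
        self.fetches = 0

    def get(self, path: str) -> bytes:
        self.fetches += 1
        return self.objects[path]


class ToyCache:
    """Hypothetical dict-backed cache: fetch on first access, reuse afterwards."""

    def __init__(self, store: CountingStore) -> None:
        self.store = store
        self._cache: dict[str, bytes] = {}

    def get(self, path: str) -> bytes:
        if path not in self._cache:
            self._cache[path] = self.store.get(path)  # miss: go to the inner store
        return self._cache[path]


inner = CountingStore({"file.nc": b"example bytes"})
store = ToyCache(inner)
result1 = store.get("file.nc")  # miss: one fetch
result2 = store.get("file.nc")  # hit: no fetch
assert result1 == result2 and inner.fetches == 1
```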

Custom chunk sizes:

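from obstore.store import S3Store
from obspec_utils.wrappers import SplittingReadableStore

# Larger chunks for high-bandwidth connections
store = SplittingReadableStore(
    S3Store(bucket="my-bucket"),
    request_size=32 * 1024 * 1024,  # 32 MiB chunks
    max_concurrent_requests=8,
)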

__getattr__

__getattr__(name: str) -> Any

Forward unknown attributes to the underlying store.

This ensures SplittingReadableStore is transparent for any additional public methods or attributes the underlying store may have.

Note: Private attributes (starting with '_') are not forwarded.
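A minimal sketch of this forwarding pattern, assuming a plain wrapper class (ForwardingWrapper and Underlying are hypothetical names). Python calls __getattr__ only when normal attribute lookup fails, so the wrapper's own methods take precedence. Refusing names that start with '_' also stops a lookup of an internal attribute such as _store from recursing back into __getattr__ when it has not been set yet; whether that is the library's motivation is an assumption.

```python
from typing import Any


class ForwardingWrapper:
    """Hypothetical wrapper that forwards unknown public attributes."""

    def __init__(self, store: Any) -> None:
        self._store = store

    def __getattr__(self, name: str) -> Any:
        # Only reached when normal lookup on the wrapper fails
        if name.startswith("_"):
            raise AttributeError(name)
        return getattr(self._store, name)


class Underlying:
    """Hypothetical underlying store with one public and one private attribute."""

    prefix = "data/"

    def _internal(self) -> None:
        pass


wrapped = ForwardingWrapper(Underlying())
assert wrapped.prefix == "data/"          # forwarded to the underlying store
assert not hasattr(wrapped, "_internal")  # private names are not forwarded
```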

__init__

__init__(
    store: ReadableStore,
    request_size: int = 12 * 1024 * 1024,
    max_concurrent_requests: int = 18,
) -> None

Create a splitting wrapper around a store.

Parameters: see the class-level parameter list above (store, request_size, max_concurrent_requests).

get

get(path: str, *, options: GetOptions | None = None) -> GetResult

Get file, using concurrent fetching if beneficial.

If the file is large enough to benefit from splitting, it is fetched via concurrent get_ranges(). Otherwise, it falls back to a single get() request.
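The decision can be sketched as follows: look up the object's size, use one plain request when it fits in a single request_size chunk, and otherwise issue a batch of range requests. The threshold used here (size greater than request_size) is an assumption, since the documentation only says splitting happens when the file is large enough to benefit. The callables head_size, get_whole, and get_ranges are hypothetical stand-ins for head(), get(), and get_ranges() on the underlying store.

```python
from collections.abc import Callable, Sequence


def get_with_splitting(
    head_size: Callable[[], int],
    get_whole: Callable[[], bytes],
    get_ranges: Callable[[Sequence[int], Sequence[int]], Sequence[bytes]],
    request_size: int,
) -> bytes:
    """Hypothetical get(): a single request for small objects, ranges for large ones."""
    size = head_size()
    if size <= request_size:
        return get_whole()  # splitting would not help: one request suffices
    starts = list(range(0, size, request_size))
    ends = [min(start + request_size, size) for start in starts]
    return b"".join(get_ranges(starts, ends))


blob = bytes(range(256)) * 4  # 1,024 bytes standing in for a remote object
calls: list[str] = []


def fake_get() -> bytes:
    calls.append("get")
    return blob


def fake_get_ranges(starts: Sequence[int], ends: Sequence[int]) -> list[bytes]:
    calls.append(f"get_ranges x{len(starts)}")
    return [blob[s:e] for s, e in zip(starts, ends)]


assert get_with_splitting(lambda: len(blob), fake_get, fake_get_ranges, 4096) == blob
assert get_with_splitting(lambda: len(blob), fake_get, fake_get_ranges, 300) == blob
print(calls)  # ['get', 'get_ranges x4']
```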

get_async async

get_async(path: str, *, options: GetOptions | None = None) -> GetResultAsync

Async get, using concurrent fetching if beneficial.
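An asyncio version of the same idea, as a sketch: create one coroutine per range and await them together with asyncio.gather, which returns results in the order the awaitables were passed. fetch_range and get_async_split are hypothetical names, and asyncio.sleep(0) merely stands in for network I/O; the documented class uses the underlying store's range requests instead.

```python
import asyncio


async def fetch_range(blob: bytes, start: int, end: int) -> bytes:
    """Hypothetical async range read; sleep(0) stands in for network I/O."""
    await asyncio.sleep(0)
    return blob[start:end]


async def get_async_split(blob: bytes, request_size: int) -> bytes:
    """Hypothetical async get(): fetch all ranges concurrently, then join in order."""
    size = len(blob)
    coros = [
        fetch_range(blob, start, min(start + request_size, size))
        for start in range(0, size, request_size)
    ]
    parts = await asyncio.gather(*coros)  # results keep the order of the coroutines
    return b"".join(parts)


blob = bytes(range(256)) * 40  # 10,240 bytes
assert asyncio.run(get_async_split(blob, request_size=1000)) == blob
```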

get_range

get_range(
    path: str, *, start: int, end: int | None = None, length: int | None = None
) -> Buffer

Get a byte range (passed through to underlying store).

get_range_async async

get_range_async(
    path: str, *, start: int, end: int | None = None, length: int | None = None
) -> Buffer

Async get range (passed through to underlying store).

get_ranges

get_ranges(
    path: str,
    *,
    starts: Sequence[int],
    ends: Sequence[int] | None = None,
    lengths: Sequence[int] | None = None,
) -> Sequence[Buffer]

Get multiple byte ranges (passed through to underlying store).
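The starts/ends/lengths signature admits two spellings of the same ranges. A hypothetical normalizer is sketched below, assuming that exactly one of ends or lengths is given and that end offsets are exclusive, so that end = start + length. The name to_half_open is invented for illustration.

```python
from __future__ import annotations

from collections.abc import Sequence


def to_half_open(
    starts: Sequence[int],
    ends: Sequence[int] | None = None,
    lengths: Sequence[int] | None = None,
) -> list[tuple[int, int]]:
    """Hypothetical helper: convert either spelling into (start, end) pairs."""
    if (ends is None) == (lengths is None):
        raise ValueError("pass exactly one of ends or lengths")
    if lengths is not None:
        if len(lengths) != len(starts):
            raise ValueError("starts and lengths must have the same length")
        ends = [start + length for start, length in zip(starts, lengths)]
    elif len(ends) != len(starts):
        raise ValueError("starts and ends must have the same length")
    return list(zip(starts, ends))


# Both calls describe bytes [0, 10) and [100, 105)
assert to_half_open([0, 100], ends=[10, 105]) == [(0, 10), (100, 105)]
assert to_half_open([0, 100], lengths=[10, 5]) == [(0, 10), (100, 105)]
```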

get_ranges_async async

get_ranges_async(
    path: str,
    *,
    starts: Sequence[int],
    ends: Sequence[int] | None = None,
    lengths: Sequence[int] | None = None,
) -> Sequence[Buffer]

Async get ranges (passed through to underlying store).

head

head(path: str) -> ObjectMeta

Get file metadata (delegates to underlying store).

head_async async

head_async(path: str) -> ObjectMeta

Get file metadata async (delegates to underlying store).