Splitting
obspec_utils.wrappers.SplittingReadableStore
Bases: ReadableStore
Wraps a store to split large get() requests into concurrent get_ranges() calls.
This accelerates fetching large files by dividing them into chunks and fetching those chunks concurrently via get_ranges(). The splitting is transparent to callers: they see a normal get() interface.
Designed to compose with CachingReadableStore:
from obstore.store import S3Store
from obspec_utils.wrappers import SplittingReadableStore, CachingReadableStore
store = S3Store(bucket="my-bucket")
store = SplittingReadableStore(store) # Fast concurrent fetches
store = CachingReadableStore(store) # Cache the results
# get() is now: concurrent fetch -> cache
result = store.get("large-file.nc")
Parameters:
- store (ReadableStore) – The underlying store to wrap.
- request_size (int, default: 12 * 1024 * 1024) – Target size in bytes for each concurrent range request. Default: 12 MB, tuned for cloud storage throughput.
- max_concurrent_requests (int, default: 18) – Maximum number of concurrent requests. Default: 18. If a file would need more requests than this at request_size, the size of each request is increased so the file fits within max_concurrent_requests requests.
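The interaction between request_size and max_concurrent_requests can be sketched as a small planning function. plan_ranges is a hypothetical helper, not part of obspec_utils. It assumes byte ranges with exclusive ends and assumes that, once the cap is hit, the file is divided evenly across max_concurrent_requests requests. The wrapper's actual rounding may differ.

```python
import math


def plan_ranges(
    file_size: int,
    request_size: int = 12 * 1024 * 1024,
    max_concurrent_requests: int = 18,
) -> tuple[list[int], list[int]]:
    """Split [0, file_size) into contiguous byte ranges (hypothetical helper).

    Ends are exclusive. If more than max_concurrent_requests ranges of
    request_size bytes would be needed, each range is enlarged instead
    so the whole file fits in at most max_concurrent_requests requests.
    """
    if file_size <= 0:
        return [], []
    n_requests = math.ceil(file_size / request_size)
    if n_requests > max_concurrent_requests:
        # Too many requests: grow each request rather than adding more.
        n_requests = max_concurrent_requests
        request_size = math.ceil(file_size / n_requests)
    starts = list(range(0, file_size, request_size))
    # The last range is clipped to the end of the file.
    ends = [min(start + request_size, file_size) for start in starts]
    return starts, ends
```

Under these assumptions, the custom configuration in the examples (request_size=32 * 1024 * 1024, max_concurrent_requests=8) would fetch a 1 GiB file as 8 requests of 128 MiB each, not 32 requests of 32 MiB.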
Notes
This wrapper only affects get() and get_async(). Range requests (get_range, get_ranges) pass through unchanged since they're already appropriately sized by the caller.
The concurrent fetching strategy is based on Icechunk's approach: github.com/earth-mover/icechunk/blob/main/icechunk/src/storage/mod.rs
Examples:
Basic usage:
from obstore.store import S3Store
from obspec_utils.wrappers import SplittingReadableStore
store = S3Store(bucket="my-bucket")
fast_store = SplittingReadableStore(store)
# Large file fetched via concurrent requests
result = fast_store.get("large-file.nc")
With caching (recommended pattern):
from obspec_utils.wrappers import CachingReadableStore
store = S3Store(bucket="my-bucket")
store = SplittingReadableStore(store)
store = CachingReadableStore(store)
# First access: concurrent fetch, then cached
result1 = store.get("file.nc")
# Second access: served from cache (no fetch)
result2 = store.get("file.nc")
Custom chunk sizes:
# Larger chunks for high-bandwidth connections
store = SplittingReadableStore(
S3Store(bucket="my-bucket"),
request_size=32 * 1024 * 1024, # 32 MB chunks
max_concurrent_requests=8,
)
__getattr__
Forward unknown attributes to the underlying store.
This ensures SplittingReadableStore is transparent for any additional public methods or attributes the underlying store may have.
Note: Private attributes (starting with '_') are not forwarded.
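The forwarding rule can be illustrated with a minimal sketch. ForwardingWrapper is a hypothetical class, not the library's implementation. It only shows how a __getattr__ that refuses names starting with '_' behaves.

```python
class ForwardingWrapper:
    """Hypothetical wrapper illustrating attribute forwarding."""

    def __init__(self, store):
        self._store = store

    def __getattr__(self, name):
        # Python calls __getattr__ only when normal lookup fails, so
        # anything defined on the wrapper itself takes precedence.
        if name.startswith("_"):
            # Private names are never forwarded to the wrapped store.
            raise AttributeError(name)
        return getattr(self._store, name)
```

Because private names raise immediately, a missing self._store (for example, on a partially initialized object) produces an AttributeError rather than infinite recursion. Whether that is the library's motivation for the rule is an assumption.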
__init__
__init__(
store: ReadableStore,
request_size: int = 12 * 1024 * 1024,
max_concurrent_requests: int = 18,
) -> None
Create a splitting wrapper around a store.
Parameters:
- store (ReadableStore) – Any object implementing the full read interface: Get, GetAsync, GetRange, GetRangeAsync, GetRanges, GetRangesAsync, Head, and HeadAsync.
- request_size (int, default: 12 * 1024 * 1024) – Target size in bytes for each concurrent range request. Default: 12 MB.
- max_concurrent_requests (int, default: 18) – Maximum number of concurrent requests. Default: 18.
get
get(path: str, *, options: GetOptions | None = None) -> GetResult
Get a file, using concurrent fetching when beneficial.
If the file is large enough to benefit from splitting, it is fetched via concurrent get_ranges() calls. Otherwise, the wrapper falls back to a single get() request on the underlying store.
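The decision made by get() can be sketched with a simplified, synchronous stand-in. MemoryStore and split_get are hypothetical. Real stores return GetResult and Buffer objects rather than bytes. The threshold here (split only when the file is larger than request_size) is an assumption about what "large enough to benefit" means. Concurrency is left to the store's get_ranges(), which the documentation names as the concurrent path.

```python
import math
from typing import Sequence


class MemoryStore:
    """Hypothetical in-memory stand-in for a ReadableStore (bytes only)."""

    def __init__(self, objects: dict[str, bytes]):
        self.objects = objects
        self.calls: list[str] = []  # records which fetch path was used

    def head(self, path: str) -> dict:
        return {"size": len(self.objects[path])}

    def get(self, path: str) -> bytes:
        self.calls.append("get")
        return self.objects[path]

    def get_ranges(
        self, path: str, *, starts: Sequence[int], ends: Sequence[int]
    ) -> list[bytes]:
        self.calls.append(f"get_ranges x{len(starts)}")
        data = self.objects[path]
        return [data[s:e] for s, e in zip(starts, ends)]


def split_get(store, path: str, request_size: int, max_concurrent_requests: int) -> bytes:
    size = store.head(path)["size"]
    if size <= request_size:
        # Small file (assumed threshold): one plain get() is enough.
        return store.get(path)
    # Cap the number of ranges, then spread bytes evenly across them.
    n = min(math.ceil(size / request_size), max_concurrent_requests)
    chunk = math.ceil(size / n)
    starts = list(range(0, size, chunk))
    ends = [min(s + chunk, size) for s in starts]
    # One get_ranges() call; parts come back in request order.
    return b"".join(store.get_ranges(path, starts=starts, ends=ends))
```

This variant balances range sizes (1024 bytes with request_size=300 becomes four 256-byte ranges) instead of leaving a short final range. Both choices reassemble to the same bytes.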
get_async (async)
get_async(path: str, *, options: GetOptions | None = None) -> GetResultAsync
Asynchronous counterpart of get(), using concurrent fetching when beneficial.
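For the async path, one way to obtain concurrency is to issue one get_range_async() coroutine per range and await them together with asyncio.gather(). This is a sketch under assumptions: SlowMemoryStore and split_get_async are hypothetical, the object size is passed in rather than read from head_async(), and the wrapper itself may instead issue a single get_ranges_async() call. Capping the number of ranges, as max_concurrent_requests describes, is what bounds concurrency here.

```python
import asyncio
import math


class SlowMemoryStore:
    """Hypothetical async in-memory store that simulates network latency."""

    def __init__(self, data: bytes):
        self.data = data
        self.in_flight = 0
        self.peak = 0  # highest number of simultaneous requests observed

    async def get_range_async(self, path: str, *, start: int, end: int) -> bytes:
        self.in_flight += 1
        self.peak = max(self.peak, self.in_flight)
        await asyncio.sleep(0.01)  # stand-in for a network round trip
        self.in_flight -= 1
        return self.data[start:end]


async def split_get_async(
    store, path: str, size: int, request_size: int, max_concurrent_requests: int
) -> bytes:
    if size == 0:
        return b""
    # Cap the range count, so all ranges can be in flight at once.
    n = min(math.ceil(size / request_size), max_concurrent_requests)
    chunk = math.ceil(size / n)
    coros = (
        store.get_range_async(path, start=s, end=min(s + chunk, size))
        for s in range(0, size, chunk)
    )
    # gather() returns results in argument order, so parts join correctly.
    parts = await asyncio.gather(*coros)
    return b"".join(parts)
```

Run it with asyncio.run(split_get_async(store, "f", size, request_size, max_concurrent_requests)). Because every range starts before any finishes, the peak number of in-flight requests equals the capped range count.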
get_range
Get a byte range (passed through to the underlying store).
get_range_async (async)
get_range_async(
path: str, *, start: int, end: int | None = None, length: int | None = None
) -> Buffer
Async get range (passed through to the underlying store).
get_ranges
get_ranges(
path: str,
*,
starts: Sequence[int],
ends: Sequence[int] | None = None,
lengths: Sequence[int] | None = None,
) -> Sequence[Buffer]
Get multiple byte ranges (passed through to the underlying store).
get_ranges_async (async)
get_ranges_async(
path: str,
*,
starts: Sequence[int],
ends: Sequence[int] | None = None,
lengths: Sequence[int] | None = None,
) -> Sequence[Buffer]
Async get ranges (passed through to the underlying store).
head_async (async)
head_async(path: str) -> ObjectMeta
Asynchronously get file metadata (delegated to the underlying store).