Registry

obspec_utils.registry.ObjectStoreRegistry

Bases: Generic[T]

A generic registry that maps URLs to object stores.

The registry is parameterized by the store type T, which must implement at least Get. Downstream libraries can specify stricter protocol requirements by using a more specific type parameter.

The registry can be used as an async context manager to automatically manage the lifecycle of stores that support it (like AiohttpStore). Stores that don't implement the async context manager protocol (like obstore's S3Store) are unaffected.

Examples:

Basic usage with obstore:

from obstore.store import S3Store
from obspec_utils.registry import ObjectStoreRegistry

registry = ObjectStoreRegistry({
    "s3://my-bucket": S3Store(bucket="my-bucket"),
})
store, path = registry.resolve("s3://my-bucket/file.nc")

Using with a specific protocol for type safety:

from typing import Protocol
from obspec import List, ListAsync, Head, HeadAsync
from obstore.store import S3Store
from obspec_utils.registry import ObjectStoreRegistry

class ZarrProtocol(List, ListAsync, Head, HeadAsync, Protocol):
    '''Protocol for Zarr chunk discovery.'''

# Define the store before registering it
s3_store = S3Store(bucket="my-bucket")
registry: ObjectStoreRegistry[ZarrProtocol] = ObjectStoreRegistry({
    "s3://my-bucket": s3_store,
})
store, path = registry.resolve("s3://my-bucket/data.zarr")
store.list(path)  # Type checker knows this is valid

Using as an async context manager:

from obspec_utils.aiohttp import AiohttpStore
from obspec_utils.registry import ObjectStoreRegistry

registry = ObjectStoreRegistry({
    "https://example.com": AiohttpStore("https://example.com"),
})

async with registry:
    store, path = registry.resolve("https://example.com/file.nc")
    data = await store.get_range_async(path, start=0, end=1000)
# AiohttpStore session is closed automatically

__aenter__ async

__aenter__() -> 'ObjectStoreRegistry[T]'

Enter the async context manager, opening all stores that support it.

Stores that implement the async context manager protocol (like AiohttpStore) will have their sessions initialized. Stores that don't support it (like obstore's S3Store) are unaffected.

Examples:

from obstore.store import S3Store
from obspec_utils.registry import ObjectStoreRegistry
from obspec_utils.aiohttp import AiohttpStore

registry = ObjectStoreRegistry({
    "s3://my-bucket": S3Store(bucket="my-bucket"),
    "https://example.com": AiohttpStore("https://example.com"),
})

async with registry:
    # S3Store works as-is, AiohttpStore session is opened
    store, path = registry.resolve("https://example.com/file.nc")
    data = await store.get_range_async(path, start=0, end=1000)
# AiohttpStore session is closed

__aexit__ async

__aexit__(exc_type, exc_val, exc_tb) -> None

Exit the async context manager, closing all stores that support it.

Stores that implement the async context manager protocol will have their resources cleaned up. Stores that don't support it are unaffected.
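
The enter/exit behaviour described above can be illustrated with a small standard-library sketch. This is not the obspec_utils implementation: SessionStore, PlainStore, and LifecycleRegistry are hypothetical stand-ins, and the use of contextlib.AsyncExitStack is an assumption made for the illustration.

```python
import asyncio
from contextlib import AsyncExitStack


class SessionStore:
    """Hypothetical store that needs an async setup/teardown step."""

    def __init__(self) -> None:
        self.is_open = False

    async def __aenter__(self) -> "SessionStore":
        self.is_open = True
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        self.is_open = False


class PlainStore:
    """Hypothetical store with no lifecycle hooks."""


class LifecycleRegistry:
    """Toy registry that only manages stores exposing __aenter__/__aexit__."""

    def __init__(self, stores: dict[str, object]) -> None:
        self._stores = stores
        self._stack = AsyncExitStack()

    async def __aenter__(self) -> "LifecycleRegistry":
        for store in self._stores.values():
            # Only enter stores that implement the async context manager protocol.
            if hasattr(store, "__aenter__") and hasattr(store, "__aexit__"):
                await self._stack.enter_async_context(store)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        # Exit every entered store, in reverse order of entry.
        await self._stack.aclose()


async def demo() -> tuple[bool, bool]:
    session_store = SessionStore()
    registry = LifecycleRegistry(
        {"https://example.com": session_store, "s3://my-bucket": PlainStore()}
    )
    async with registry:
        opened_inside = session_store.is_open
    return opened_inside, session_store.is_open


opened_inside, open_after = asyncio.run(demo())
```

In this sketch the store with lifecycle hooks is open inside the async with block and closed after it, while the plain store is never touched.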

__init__

__init__(stores: dict[Url, T] | None = None) -> None

Create a new store registry.

The registry accepts any object that implements at least Get. For stricter type checking, parameterize the registry with a more specific protocol type.

Parameters:

  • stores (dict[Url, T] | None, default: None ) –

    Mapping of URLs to stores to register.

Examples:

from obstore.store import S3Store
from obspec_utils.registry import ObjectStoreRegistry

s3store = S3Store(bucket="my-bucket-1", prefix="orig-path")
reg = ObjectStoreRegistry({"s3://my-bucket-1": s3store})

ret, path = reg.resolve("s3://my-bucket-1/orig-path/group/my-file.nc")
assert path == "group/my-file.nc"
assert ret is s3store

register

register(url: Url, store: T) -> None

Register a new store for the provided URL.

If a store is already registered under the same URL, it is replaced.

Parameters:

  • url (Url) –

    URL to register the store under.

  • store (T) –

    Any object implementing at least Get.

Examples:

from obstore.store import S3Store
from obspec_utils.registry import ObjectStoreRegistry

reg = ObjectStoreRegistry()
orig_store = S3Store(bucket="my-bucket-1", prefix="orig-path")
reg.register("s3://my-bucket-1", orig_store)

new_store = S3Store(bucket="my-bucket-1", prefix="updated-path")
reg.register("s3://my-bucket-1", new_store)

resolve

resolve(url: Url) -> tuple[T, Path]

Resolve a URL within the ObjectStoreRegistry.

If ObjectStoreRegistry.register has been called with a URL that has the same scheme and authority/netloc as the object URL, and a path that is a prefix of the object URL's path, the registered store is returned along with the trailing path. Paths are matched segment by segment; when several registered URLs match, the one with the longest matching path is used.

Parameters:

  • url (Url) –

    The URL to resolve.

Returns:

  • T

    The store registered at the resolved url.

  • Path

    The trailing portion of the URL after the prefix of the matching store in the ObjectStoreRegistry.

Raises:

  • ValueError

    If the URL cannot be resolved, meaning that no URL passed to ObjectStoreRegistry.register has the same scheme and authority/netloc as the object URL and a path that is a prefix of the object URL's path.

Examples:

from obstore.store import MemoryStore, S3Store
from obspec_utils.registry import ObjectStoreRegistry

registry = ObjectStoreRegistry()
memstore1 = MemoryStore()
registry.register("s3://bucket1", memstore1)
url = "s3://bucket1/path/to/object"
ret, path = registry.resolve(url)
assert path == "path/to/object"
assert ret is memstore1
print(f"Resolved url: `{url}` to store: `{ret}` and path: `{path}`")

Resolved url: s3://bucket1/path/to/object to store: <obstore.store.MemoryStore object at 0x7790730ce910> and path: path/to/object

memstore2 = MemoryStore()
base = "https://s3.region.amazonaws.com/bucket"
registry.register(base, memstore2)

url = "https://s3.region.amazonaws.com/bucket/path/to/object"
ret, path = registry.resolve(url)
assert path == "bucket/path/to/object"
assert ret is memstore2
print(f"Resolved url: `{url}` to store: `{ret}` and path: `{path}`")

Resolved url: https://s3.region.amazonaws.com/bucket/path/to/object to store: <obstore.store.MemoryStore object at 0x779075f9f950> and path: bucket/path/to/object

s3store = S3Store(bucket="my-bucket", prefix="my-data/prefix/")
registry.register("s3://my-bucket", s3store)
ret, path = registry.resolve("s3://my-bucket/my-data/prefix/my-file.nc")
assert path == "my-file.nc"
assert ret is s3store
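
The segment-wise, longest-match rule described for resolve can be sketched with the standard library alone. This is only an illustration of the matching rule, not the library's code: resolve_prefix and split_segments are hypothetical helpers, registered URLs are plain strings rather than stores, and the sketch does not model how the returned path relates to a store's own prefix.

```python
from urllib.parse import urlsplit


def split_segments(path: str) -> tuple[str, ...]:
    """Split a URL path into its non-empty segments."""
    return tuple(segment for segment in path.split("/") if segment)


def resolve_prefix(registered: list[str], url: str) -> tuple[str, str]:
    """Return (best matching registered URL, trailing path) or raise ValueError."""
    target = urlsplit(url)
    target_segments = split_segments(target.path)
    best: tuple[str, tuple[str, ...]] | None = None

    for candidate in registered:
        parts = urlsplit(candidate)
        # Scheme and netloc must match exactly.
        if (parts.scheme, parts.netloc) != (target.scheme, target.netloc):
            continue
        prefix = split_segments(parts.path)
        # Compare whole segments, so "/data" does not match "/database".
        if target_segments[: len(prefix)] != prefix:
            continue
        # Keep the candidate with the most matching segments.
        if best is None or len(prefix) > len(best[1]):
            best = (candidate, prefix)

    if best is None:
        raise ValueError(f"No registered prefix matches {url!r}")
    return best[0], "/".join(target_segments[len(best[1]):])


registered = ["s3://bucket/data", "s3://bucket/data/v2", "s3://other"]
match, rest = resolve_prefix(registered, "s3://bucket/data/v2/file.nc")
```

Here both "s3://bucket/data" and "s3://bucket/data/v2" are prefixes of the target, and the longer one is chosen. A URL such as "s3://bucket/database/x" matches neither, because comparison happens on whole segments.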

obspec_utils.registry.UrlKey module-attribute

UrlKey = namedtuple('UrlKey', ['scheme', 'netloc'])

A named tuple containing a URL's scheme and authority/netloc.

Used as the primary key in ObjectStoreRegistry.map.

Attributes:

  • scheme

    The URL scheme (e.g., 's3', 'https', 'file').

  • netloc

    The network location/authority (e.g., 'bucket-name', 'example.com').
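
As a minimal sketch of how such a key could be derived from a URL, the snippet below uses urllib.parse.urlsplit. The url_key helper is hypothetical and is not part of obspec_utils; only the UrlKey definition is taken from the text above.

```python
from collections import namedtuple
from urllib.parse import urlsplit

UrlKey = namedtuple("UrlKey", ["scheme", "netloc"])


def url_key(url: str) -> UrlKey:
    """Hypothetical helper: build a UrlKey from a URL string."""
    parts = urlsplit(url)
    return UrlKey(parts.scheme, parts.netloc)


key = url_key("s3://bucket-name/path/to/object")
```

Because the key ignores the path, every URL within the same bucket or host maps to the same UrlKey, which is consistent with its use as the primary key of the registry map.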