# Source code for zarr.storage._common

from __future__ import annotations

import json
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, Self

from zarr.abc.store import ByteRequest, Store
from zarr.core.buffer import Buffer, default_buffer_prototype
from zarr.core.common import ZARR_JSON, ZARRAY_JSON, ZGROUP_JSON, AccessModeLiteral, ZarrFormat
from zarr.errors import ContainsArrayAndGroupError, ContainsArrayError, ContainsGroupError
from zarr.storage._local import LocalStore
from zarr.storage._memory import MemoryStore
from zarr.storage._utils import normalize_path

if TYPE_CHECKING:
    from zarr.core.buffer import BufferPrototype


def _dereference_path(root: str, path: str) -> str:
    assert isinstance(root, str)
    assert isinstance(path, str)
    root = root.rstrip("/")
    path = f"{root}/{path}" if root else path
    return path.rstrip("/")


[docs] class StorePath: """ Path-like interface for a Store. Parameters ---------- store : Store The store to use. path : str The path within the store. """ store: Store path: str def __init__(self, store: Store, path: str = "") -> None: self.store = store self.path = normalize_path(path) @property def read_only(self) -> bool: return self.store.read_only
[docs] @classmethod async def open(cls, store: Store, path: str, mode: AccessModeLiteral | None = None) -> Self: """ Open StorePath based on the provided mode. * If the mode is 'w-' and the StorePath contains keys, raise a FileExistsError. * If the mode is 'w', delete all keys nested within the StorePath * If the mode is 'a', 'r', or 'r+', do nothing Parameters ---------- mode : AccessModeLiteral The mode to use when initializing the store path. Raises ------ FileExistsError If the mode is 'w-' and the store path already exists. ValueError If the mode is not "r" and the store is read-only, or if the mode is "r" and the store is not read-only. """ await store._ensure_open() self = cls(store, path) # fastpath if mode is None if mode is None: return self if store.read_only and mode != "r": raise ValueError(f"Store is read-only but mode is '{mode}'") if not store.read_only and mode == "r": raise ValueError(f"Store is not read-only but mode is '{mode}'") match mode: case "w-": if not await self.is_empty(): msg = ( f"{self} is not empty, but `mode` is set to 'w-'." "Either remove the existing objects in storage," "or set `mode` to a value that handles pre-existing objects" "in storage, like `a` or `w`." ) raise FileExistsError(msg) case "w": await self.delete_dir() case "a" | "r" | "r+": # No init action pass case _: raise ValueError(f"Invalid mode: {mode}") return self
[docs] async def get( self, prototype: BufferPrototype | None = None, byte_range: ByteRequest | None = None, ) -> Buffer | None: """ Read bytes from the store. Parameters ---------- prototype : BufferPrototype, optional The buffer prototype to use when reading the bytes. byte_range : ByteRequest, optional The range of bytes to read. Returns ------- buffer : Buffer or None The read bytes, or None if the key does not exist. """ if prototype is None: prototype = default_buffer_prototype() return await self.store.get(self.path, prototype=prototype, byte_range=byte_range)
[docs] async def set(self, value: Buffer, byte_range: ByteRequest | None = None) -> None: """ Write bytes to the store. Parameters ---------- value : Buffer The buffer to write. byte_range : ByteRequest, optional The range of bytes to write. If None, the entire buffer is written. Raises ------ NotImplementedError If `byte_range` is not None, because Store.set does not support partial writes yet. """ if byte_range is not None: raise NotImplementedError("Store.set does not have partial writes yet") await self.store.set(self.path, value)
[docs] async def delete(self) -> None: """ Delete the key from the store. Raises ------ NotImplementedError If the store does not support deletion. """ await self.store.delete(self.path)
[docs] async def delete_dir(self) -> None: """ Delete all keys with the given prefix from the store. """ await self.store.delete_dir(self.path)
[docs] async def set_if_not_exists(self, default: Buffer) -> None: """ Store a key to ``value`` if the key is not already present. Parameters ---------- default : Buffer The buffer to store if the key is not already present. """ await self.store.set_if_not_exists(self.path, default)
[docs] async def exists(self) -> bool: """ Check if the key exists in the store. Returns ------- bool True if the key exists in the store, False otherwise. """ return await self.store.exists(self.path)
[docs] async def is_empty(self) -> bool: """ Check if any keys exist in the store with the given prefix. Returns ------- bool True if no keys exist in the store with the given prefix, False otherwise. """ return await self.store.is_empty(self.path)
def __truediv__(self, other: str) -> StorePath: """Combine this store path with another path""" return self.__class__(self.store, _dereference_path(self.path, other)) def __str__(self) -> str: return _dereference_path(str(self.store), self.path) def __repr__(self) -> str: return f"StorePath({self.store.__class__.__name__}, '{self}')" def __eq__(self, other: object) -> bool: """ Check if two StorePath objects are equal. Returns ------- bool True if the two objects are equal, False otherwise. Notes ----- Two StorePath objects are considered equal if their stores are equal and their paths are equal. """ try: return self.store == other.store and self.path == other.path # type: ignore[attr-defined, no-any-return] except Exception: pass return False
StoreLike = Store | StorePath | Path | str | dict[str, Buffer]


async def make_store_path(
    store_like: StoreLike | None,
    *,
    path: str | None = "",
    mode: AccessModeLiteral | None = None,
    storage_options: dict[str, Any] | None = None,
) -> StorePath:
    """
    Convert a `StoreLike` object into a StorePath object.

    The conversion depends on the type of ``store_like``:

    * `StorePath`: ``path`` is appended to it and the result is returned
      directly; ``mode`` is not applied.
    * `Store`: used as-is.
    * None: a new `MemoryStore` is opened.
    * `Path`: a `LocalStore` rooted at that path is opened.
    * str: if `_is_fsspec_uri` accepts it, an `FsspecStore` is created from the
      URL using ``storage_options``; otherwise it is treated as a local path and
      a `LocalStore` is opened.
    * dict: a `MemoryStore` backed by that dictionary is opened.

    In every case except `StorePath`, the resulting store is then passed to
    `StorePath.open` together with ``path`` and ``mode``.

    Parameters
    ----------
    store_like : StoreLike | None
        The object to convert to a `StorePath` object.
    path : str | None, optional
        The path to use when creating the `StorePath` object. It is normalized
        with ``normalize_path``.
    mode : AccessModeLiteral | None, optional
        The mode to use when creating the `StorePath` object. If 'r', any store
        created by this function is opened read-only; for any other value
        (including None) it is opened writable. The mode is also forwarded to
        `StorePath.open`.
    storage_options : dict[str, Any] | None, optional
        The storage options to use when creating the `FsspecStore` object. Only
        valid when ``store_like`` is an fsspec URI.

    Returns
    -------
    StorePath
        The converted StorePath object.

    Raises
    ------
    TypeError
        If the StoreLike object is not one of the supported types, or if
        non-empty ``storage_options`` were given but would not be used.
    """
    from zarr.storage._fsspec import FsspecStore  # circular import

    used_storage_options = False
    path_normalized = normalize_path(path)
    result: StorePath | None = None

    if isinstance(store_like, StorePath):
        result = store_like / path_normalized
    else:
        assert mode in (None, "r", "r+", "a", "w", "w-")
        # if mode 'r' was provided, we'll open any new stores as read-only
        _read_only = mode == "r"
        if isinstance(store_like, Store):
            store = store_like
        elif store_like is None:
            store = await MemoryStore.open(read_only=_read_only)
        elif isinstance(store_like, Path):
            store = await LocalStore.open(root=store_like, read_only=_read_only)
        elif isinstance(store_like, str):
            storage_options = storage_options or {}

            if _is_fsspec_uri(store_like):
                used_storage_options = True
                store = FsspecStore.from_url(
                    store_like, storage_options=storage_options, read_only=_read_only
                )
            else:
                store = await LocalStore.open(root=Path(store_like), read_only=_read_only)
        elif isinstance(store_like, dict):
            # We deliberately only consider dict[str, Buffer] here, and not arbitrary
            # mutable mappings. By only allowing dictionaries, which are in-memory, we
            # know that MemoryStore is appropriate.
            store = await MemoryStore.open(store_dict=store_like, read_only=_read_only)
        else:
            msg = f"Unsupported type for store_like: '{type(store_like).__name__}'"  # type: ignore[unreachable]
            raise TypeError(msg)

    # Reject stray storage_options *before* StorePath.open runs: with mode="w"
    # that call deletes existing keys, which must not happen for a call that is
    # going to fail with a TypeError anyway.
    if storage_options and not used_storage_options:
        msg = "'storage_options' was provided but unused. 'storage_options' is only used for fsspec filesystem stores."
        raise TypeError(msg)

    if result is None:
        result = await StorePath.open(store, path=path_normalized, mode=mode)

    return result


def _is_fsspec_uri(uri: str) -> bool:
    """
    Check if a string looks like an fsspec URI.

    A string is treated as an fsspec URI when it contains ``"://"``, or when it
    contains ``"::"``.

    Examples
    --------
    >>> _is_fsspec_uri("s3://bucket")
    True
    >>> _is_fsspec_uri("my-directory")
    False
    >>> _is_fsspec_uri("local://my-directory")
    True
    """
    # NOTE(review): the ``"local://" not in uri`` clause can never change the
    # result, because any string containing "local://" already contains "://".
    # If local URIs are meant to be excluded, the condition needs regrouping --
    # confirm the intended behaviour before changing it, since make_store_path
    # routes on this result.
    return "://" in uri or ("::" in uri and "local://" not in uri)


async def ensure_no_existing_node(store_path: StorePath, zarr_format: ZarrFormat) -> None:
    """
    Check if a store_path is safe for array / group creation.
    Returns `None` or raises an exception.

    Parameters
    ----------
    store_path : StorePath
        The storage location to check.
    zarr_format : ZarrFormat
        The Zarr format to check.

    Raises
    ------
    ContainsArrayError, ContainsGroupError, ContainsArrayAndGroupError
        If an array, a group, or (Zarr format 2 only) both already exist at
        ``store_path``.
    ValueError
        If ``zarr_format`` is neither 2 nor 3.
    """
    if zarr_format == 2:
        extant_node = await _contains_node_v2(store_path)
    elif zarr_format == 3:
        extant_node = await _contains_node_v3(store_path)
    else:
        # Without this branch ``extant_node`` would be unbound below.
        msg = f"Invalid zarr_format provided. Got {zarr_format}, expected 2 or 3"  # type: ignore[unreachable]
        raise ValueError(msg)

    if extant_node == "array":
        raise ContainsArrayError(store_path.store, store_path.path)
    elif extant_node == "group":
        raise ContainsGroupError(store_path.store, store_path.path)
    elif extant_node == "nothing":
        return
    msg = f"Invalid value for extant_node: {extant_node}"  # type: ignore[unreachable]
    raise ValueError(msg)


async def _read_v3_node_type(store_path: StorePath) -> object:
    """
    Return the ``node_type`` field of the ``zarr.json`` document at ``store_path``.

    Returns None when the document is missing, cannot be decoded as JSON, is not
    a JSON object, or has no ``node_type`` key. A full metadata document is
    deliberately not constructed, in the name of speed.
    """
    meta_bytes = await (store_path / ZARR_JSON).get()
    if meta_bytes is None:
        return None
    try:
        meta_json = json.loads(meta_bytes.to_bytes())
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        return None
    if not isinstance(meta_json, dict):
        return None
    return meta_json.get("node_type")


async def _contains_node_v3(store_path: StorePath) -> Literal["array", "group", "nothing"]:
    """
    Check if a store_path contains nothing, an array, or a group. This function
    returns the string "array", "group", or "nothing" to denote containing an array, a group, or
    nothing.

    Parameters
    ----------
    store_path : StorePath
        The location in storage to check.

    Returns
    -------
    Literal["array", "group", "nothing"]
        A string representing the zarr node found at store_path.
    """
    node_type = await _read_v3_node_type(store_path)
    if node_type == "array":
        return "array"
    if node_type == "group":
        return "group"
    # A missing, unreadable, or unrecognised metadata document is consistent
    # with no array or group being present.
    return "nothing"


async def _contains_node_v2(store_path: StorePath) -> Literal["array", "group", "nothing"]:
    """
    Check if a store_path contains nothing, an array, a group, or both. If both an array and a
    group are detected, a `ContainsArrayAndGroup` exception is raised. Otherwise, this function
    returns the string "array", "group", or "nothing" to denote containing an array, a group, or
    nothing.

    Parameters
    ----------
    store_path : StorePath
        The location in storage to check.

    Returns
    -------
    Literal["array", "group", "nothing"]
        A string representing the zarr node found at store_path.
    """
    _array = await contains_array(store_path=store_path, zarr_format=2)
    _group = await contains_group(store_path=store_path, zarr_format=2)

    if _array and _group:
        raise ContainsArrayAndGroupError(store_path.store, store_path.path)
    elif _array:
        return "array"
    elif _group:
        return "group"
    else:
        return "nothing"


async def contains_array(store_path: StorePath, zarr_format: ZarrFormat) -> bool:
    """
    Check if an array exists at a given StorePath.

    Parameters
    ----------
    store_path : StorePath
        The StorePath to check for an existing array.
    zarr_format : ZarrFormat
        The zarr format to check for.

    Returns
    -------
    bool
        True if the StorePath contains an array, False otherwise.

    Raises
    ------
    ValueError
        If ``zarr_format`` is neither 2 nor 3.
    """
    if zarr_format == 3:
        # Any node type other than "array" (including a group, or no readable
        # metadata at all) means there is no array here.
        node_type = await _read_v3_node_type(store_path)
        return node_type == "array"
    elif zarr_format == 2:
        return await (store_path / ZARRAY_JSON).exists()
    msg = f"Invalid zarr_format provided. Got {zarr_format}, expected 2 or 3"  # type: ignore[unreachable]
    raise ValueError(msg)


async def contains_group(store_path: StorePath, zarr_format: ZarrFormat) -> bool:
    """
    Check if a group exists at a given StorePath.

    Parameters
    ----------
    store_path : StorePath
        The StorePath to check for an existing group.
    zarr_format : ZarrFormat
        The zarr format to check for.

    Returns
    -------
    bool
        True if the StorePath contains a group, False otherwise.

    Raises
    ------
    ValueError
        If ``zarr_format`` is neither 2 nor 3.
    """
    if zarr_format == 3:
        node_type = await _read_v3_node_type(store_path)
        return node_type == "group"
    elif zarr_format == 2:
        return await (store_path / ZGROUP_JSON).exists()
    msg = f"Invalid zarr_format provided. Got {zarr_format}, expected 2 or 3"  # type: ignore[unreachable]
    raise ValueError(msg)