import abc
import os
import warnings
from collections import defaultdict
from collections.abc import MutableMapping
from copy import copy
from string import ascii_letters, digits
from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple, Union
from zarr.meta import Metadata2, Metadata3
from zarr.util import normalize_storage_path
from zarr.context import Context
from zarr.types import ZARR_VERSION
# v2 store keys
array_meta_key = ".zarray"
group_meta_key = ".zgroup"
attrs_key = ".zattrs"
# v3 paths
meta_root = "meta/root/"
data_root = "data/root/"
DEFAULT_ZARR_VERSION: ZARR_VERSION = 2
v3_api_available = os.environ.get("ZARR_V3_EXPERIMENTAL_API", "0").lower() not in ["0", "false"]
_has_warned_about_v3 = False # to avoid printing the warning multiple times
V3_DEPRECATION_MESSAGE = (
"The {store} is deprecated and will be removed in a Zarr-Python version 3, see "
"https://github.com/zarr-developers/zarr-python/issues/1274 for more information."
)
def assert_zarr_v3_api_available():
# we issue a warning about the experimental v3 implementation when it is first used
global _has_warned_about_v3
if v3_api_available and not _has_warned_about_v3:
warnings.warn(
"The experimental Zarr V3 implementation in this version of Zarr-Python is not "
"in alignment with the final V3 specification. This version will be removed in "
"Zarr-Python 3 in favor of a spec compliant version.",
FutureWarning,
stacklevel=1,
)
_has_warned_about_v3 = True
if not v3_api_available:
raise NotImplementedError(
"# V3 reading and writing is experimental! To enable support, set:\n"
"ZARR_V3_EXPERIMENTAL_API=1"
) # pragma: no cover
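# Example (sketch): the experimental v3 API is gated by an environment
# variable, which must be set before zarr is imported, e.g.:
#
#     import os
#     os.environ["ZARR_V3_EXPERIMENTAL_API"] = "1"
#     import zarr  # v3 store classes are now usable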
class BaseStore(MutableMapping):
"""Abstract base class for store implementations.
This is a thin wrapper over MutableMapping that provides methods to check
whether a store is readable, writeable, eraseable and or listable.
Stores cannot be mutable mapping as they do have a couple of other
requirements that would break Liskov substitution principle (stores only
allow strings as keys, mutable mapping are more generic).
Having no-op base method also helps simplifying store usage and do not need
to check the presence of attributes and methods, like `close()`.
Stores can be used as context manager to make sure they close on exit.
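
    Examples
    --------
    A minimal sketch of context-manager usage; ``KVStore`` wrapping a plain
    dict stands in for any concrete ``BaseStore`` subclass:

    >>> from zarr.storage import KVStore
    >>> with KVStore(dict()) as store:
    ...     store["key"] = b"value"
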
    .. versionadded:: 2.11.0
"""
_readable = True
_writeable = True
_erasable = True
_listable = True
_store_version = 2
_metadata_class = Metadata2
def is_readable(self):
return self._readable
def is_writeable(self):
return self._writeable
def is_listable(self):
return self._listable
def is_erasable(self):
return self._erasable
def __enter__(self):
if not hasattr(self, "_open_count"):
self._open_count = 0
self._open_count += 1
return self
def __exit__(self, exc_type, exc_value, traceback):
self._open_count -= 1
if self._open_count == 0:
self.close()
def close(self) -> None:
"""Do nothing by default"""
pass
def rename(self, src_path: str, dst_path: str) -> None:
if not self.is_erasable():
raise NotImplementedError(
f'{type(self)} is not erasable, cannot call "rename"'
) # pragma: no cover
_rename_from_keys(self, src_path, dst_path)
@staticmethod
def _ensure_store(store: Any):
"""
We want to make sure internally that zarr stores are always a class
with a specific interface derived from ``BaseStore``, which is slightly
        different from ``MutableMapping``.
        We'll do this conversion automatically in a few places.
"""
from zarr.storage import KVStore # avoid circular import
if isinstance(store, BaseStore):
            if store._store_version != 2:
raise ValueError(
f"cannot initialize a v2 store with a v{store._store_version} store"
)
return store
elif isinstance(store, MutableMapping):
return KVStore(store)
else:
for attr in [
"keys",
"values",
"get",
"__setitem__",
"__getitem__",
"__delitem__",
"__contains__",
]:
if not hasattr(store, attr):
break
else:
return KVStore(store)
        raise ValueError(
            "Starting with Zarr 2.11.0, stores must be subclasses of "
            "BaseStore; if your store exposes the MutableMapping interface, "
            f"wrap it in zarr.storage.KVStore. Got {store}"
        )
def getitems(
self, keys: Sequence[str], *, contexts: Mapping[str, Context]
) -> Mapping[str, Any]:
"""Retrieve data from multiple keys.

        Parameters
        ----------
        keys : Iterable[str]
            The keys to retrieve.
        contexts : Mapping[str, Context]
            A mapping of keys to their context. Each context is a mapping of
            store specific information. E.g. a context could be a dict telling
            the store the preferred output array type:
            ``{"meta_array": cupy.empty(())}``

        Returns
        -------
        Mapping
            A collection mapping the input keys to their results.

        Notes
        -----
        This default implementation uses ``__getitem__`` to read each key
        sequentially and ignores contexts. Override this method to implement
        concurrent reads of multiple keys and/or to utilize the contexts.
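
        Examples
        --------
        A sketch of the default behavior; missing keys are simply omitted from
        the result (``KVStore`` stands in for any concrete store):

        >>> from zarr.storage import KVStore
        >>> store = KVStore({"a": b"1", "b": b"2"})
        >>> store.getitems(["a", "missing"], contexts={})
        {'a': b'1'}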
"""
return {k: self[k] for k in keys if k in self}
class Store(BaseStore):
"""Abstract store class used by implementations following the Zarr v2 spec.

    Adds public ``listdir`` and ``rmdir`` methods on top of ``BaseStore``
    (``rename`` is inherited from ``BaseStore``).

    .. versionadded:: 2.11.0
"""
def listdir(self, path: str = "") -> List[str]:
path = normalize_storage_path(path)
return _listdir_from_keys(self, path)
def rmdir(self, path: str = "") -> None:
if not self.is_erasable():
raise NotImplementedError(
f'{type(self)} is not erasable, cannot call "rmdir"'
) # pragma: no cover
path = normalize_storage_path(path)
_rmdir_from_keys(self, path)
class StoreV3(BaseStore):
_store_version = 3
_metadata_class = Metadata3
_valid_key_characters = set(ascii_letters + digits + "/.-_")
def _valid_key(self, key: str) -> bool:
"""
Verify that a key conforms to the specification.

        A key is any string containing only characters in the ranges a-z, A-Z,
        0-9, or in the set ``/.-_``. Returns True if that is the case, False
        otherwise.
"""
if not isinstance(key, str) or not key.isascii():
return False
if set(key) - self._valid_key_characters:
return False
return True
def _validate_key(self, key: str):
"""
Verify that a key conforms to the v3 specification.

        A valid key is an ASCII string containing only characters in the
        ranges a-z, A-Z, 0-9, or in the set ``/.-_``.

        In spec v3, keys must start with the prefix ``meta/`` or ``data/``, or
        be exactly ``zarr.json``, and must not end with ``/``. This is a store
        implementation detail and should not be exposed to the user, so this
        method raises a ValueError for keys that do not conform.
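
        For example (a non-exhaustive sketch): ``"zarr.json"`` and
        ``"meta/root.group.json"`` pass validation, while ``"foo"`` (bad
        prefix) and ``"meta/root/"`` (trailing slash) raise ValueError.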
"""
if not self._valid_key(key):
raise ValueError(
f"Keys must be ascii strings and may only contain the "
f"characters {''.join(sorted(self._valid_key_characters))}"
)
if (
not key.startswith(("data/", "meta/"))
and key != "zarr.json"
# TODO: Possibly allow key == ".zmetadata" too if we write a
# consolidated metadata spec corresponding to this?
):
raise ValueError(f"key starts with unexpected value: `{key}`")
if key.endswith("/"):
raise ValueError("keys may not end in /")
def list_prefix(self, prefix):
if prefix.startswith("/"):
raise ValueError("prefix must not begin with /")
# TODO: force prefix to end with /?
return [k for k in self.list() if k.startswith(prefix)]
def erase(self, key):
self.__delitem__(key)
def erase_prefix(self, prefix):
assert prefix.endswith("/")
if prefix == "/":
all_keys = self.list()
else:
all_keys = self.list_prefix(prefix)
for key in all_keys:
self.erase(key)
def list_dir(self, prefix):
"""
TODO: carefully test this with trailing/leading slashes
"""
if prefix: # allow prefix = "" ?
assert prefix.endswith("/")
all_keys = self.list_prefix(prefix)
len_prefix = len(prefix)
keys = []
prefixes = []
for k in all_keys:
trail = k[len_prefix:]
if "/" not in trail:
keys.append(prefix + trail)
else:
prefixes.append(prefix + trail.split("/", maxsplit=1)[0] + "/")
return keys, list(set(prefixes))
def list(self):
return list(self.keys())
def __contains__(self, key):
return key in self.list()
@abc.abstractmethod
def __setitem__(self, key, value):
"""Set a value."""
@abc.abstractmethod
def __getitem__(self, key):
"""Get a value."""
@abc.abstractmethod
def rmdir(self, path=None):
"""Remove a data path and all its subkeys and related metadata.
Expects a path without the data or meta root prefix."""
@property
def supports_efficient_get_partial_values(self):
return False
def get_partial_values(
self, key_ranges: Sequence[Tuple[str, Tuple[int, Optional[int]]]]
) -> List[Union[bytes, memoryview, bytearray]]:
"""Get multiple partial values.
key_ranges can be an iterable of key, range pairs,
where a range specifies two integers range_start and range_length
as a tuple, (range_start, range_length).
range_length may be None to indicate to read until the end.
range_start may be negative to start reading range_start bytes
from the end of the file.
A key may occur multiple times with different ranges.
Inserts None for missing keys into the returned list."""
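        # Example (sketch): if store["k"] == b"0123456789", then
        # get_partial_values([("k", (2, 3)), ("k", (-4, None))])
        # returns [b"234", b"6789"].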
results: List[Union[bytes, memoryview, bytearray]] = [None] * len(key_ranges) # type: ignore[list-item] # noqa: E501
indexed_ranges_by_key: Dict[str, List[Tuple[int, Tuple[int, Optional[int]]]]] = defaultdict(
list
)
for i, (key, range_) in enumerate(key_ranges):
indexed_ranges_by_key[key].append((i, range_))
for key, indexed_ranges in indexed_ranges_by_key.items():
try:
value = self[key]
except KeyError: # pragma: no cover
continue
for i, (range_from, range_length) in indexed_ranges:
if range_length is None:
results[i] = value[range_from:]
else:
results[i] = value[range_from : range_from + range_length]
return results
def supports_efficient_set_partial_values(self):
return False
def set_partial_values(self, key_start_values):
"""Set multiple partial values.
key_start_values can be an iterable of key, start and value triplets
as tuples, (key, start, value), where start defines the offset in bytes.
A key may occur multiple times with different starts and non-overlapping values.
Also, start may only be beyond the current value if other values fill the gap.
start may be negative to start writing start bytes from the current
end of the file, ending the file with the new value."""
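        # Example (sketch): if store["k"] == b"0123456789", then
        # set_partial_values([("k", 2, b"AB"), ("k", -1, b"Z")])
        # leaves store["k"] == bytearray(b"01AB45678Z").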
unique_keys = set(next(zip(*key_start_values)))
values = {}
for key in unique_keys:
old_value = self.get(key)
values[key] = None if old_value is None else bytearray(old_value)
for key, start, value in key_start_values:
if values[key] is None:
assert start == 0
values[key] = value
else:
if start > len(values[key]): # pragma: no cover
raise ValueError(
f"Cannot set value at start {start}, "
+ f"since it is beyond the data at key {key}, "
+ f"having length {len(values[key])}."
)
if start < 0:
values[key][start:] = value
else:
values[key][start : start + len(value)] = value
for key, value in values.items():
self[key] = value
def clear(self):
"""Remove all items from store."""
self.erase_prefix("/")
def __eq__(self, other):
return NotImplemented
@staticmethod
def _ensure_store(store):
"""
We want to make sure internally that zarr stores are always a class
        with a specific interface derived from ``StoreV3``, which is slightly
        different from ``MutableMapping``.
        We'll do this conversion automatically in a few places.
"""
from zarr._storage.v3 import KVStoreV3 # avoid circular import
if store is None:
return None
elif isinstance(store, StoreV3):
return store
elif isinstance(store, Store):
raise ValueError(f"cannot initialize a v3 store with a v{store._store_version} store")
elif isinstance(store, MutableMapping):
return KVStoreV3(store)
else:
for attr in [
"keys",
"values",
"get",
"__setitem__",
"__getitem__",
"__delitem__",
"__contains__",
]:
if not hasattr(store, attr):
break
else:
return KVStoreV3(store)
        raise ValueError(
            "v3 stores must be subclasses of StoreV3; "
            "if your store exposes the MutableMapping interface, wrap it in "
            f"zarr.storage.KVStoreV3. Got {store}"
        )
# allow MutableMapping for backwards compatibility
StoreLike = Union[BaseStore, MutableMapping]
def _path_to_prefix(path: Optional[str]) -> str:
# assume path already normalized
if path:
prefix = path + "/"
else:
prefix = ""
return prefix
def _get_hierarchy_metadata(store: StoreV3) -> Mapping[str, Any]:
version = getattr(store, "_store_version", 2)
if version < 3:
raise ValueError("zarr.json hierarchy metadata not stored for " f"zarr v{version} stores")
if "zarr.json" not in store:
raise ValueError("zarr.json metadata not found in store")
return store._metadata_class.decode_hierarchy_metadata(store["zarr.json"])
def _get_metadata_suffix(store: StoreV3) -> str:
if "zarr.json" in store:
return _get_hierarchy_metadata(store)["metadata_key_suffix"]
return ".json"
def _rename_metadata_v3(store: StoreV3, src_path: str, dst_path: str) -> bool:
"""Rename source or group metadata file associated with src_path."""
any_renamed = False
sfx = _get_metadata_suffix(store)
src_path = src_path.rstrip("/")
dst_path = dst_path.rstrip("/")
_src_array_json = meta_root + src_path + ".array" + sfx
if _src_array_json in store:
new_key = meta_root + dst_path + ".array" + sfx
store[new_key] = store.pop(_src_array_json)
any_renamed = True
_src_group_json = meta_root + src_path + ".group" + sfx
if _src_group_json in store:
new_key = meta_root + dst_path + ".group" + sfx
store[new_key] = store.pop(_src_group_json)
any_renamed = True
return any_renamed
def _rename_from_keys(store: BaseStore, src_path: str, dst_path: str) -> None:
# assume path already normalized
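    # Example (sketch): renaming "foo" -> "bar" in a v2 store moves
    # "foo/.zarray" to "bar/.zarray" and each chunk key such as "foo/0.0"
    # to "bar/0.0"; for v3 the meta/root/ and data/root/ subtrees are moved.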
src_prefix = _path_to_prefix(src_path)
dst_prefix = _path_to_prefix(dst_path)
version = getattr(store, "_store_version", 2)
if version == 2:
for key in list(store.keys()):
if key.startswith(src_prefix):
                # slice off the prefix; str.lstrip would strip a character
                # set, not a leading substring
                new_key = dst_prefix + key[len(src_prefix) :]
store[new_key] = store.pop(key)
else:
any_renamed = False
for root_prefix in [meta_root, data_root]:
_src_prefix = root_prefix + src_prefix
_dst_prefix = root_prefix + dst_prefix
for key in store.list_prefix(_src_prefix): # type: ignore
new_key = _dst_prefix + key[len(_src_prefix) :]
store[new_key] = store.pop(key)
any_renamed = True
any_meta_renamed = _rename_metadata_v3(store, src_path, dst_path) # type: ignore
any_renamed = any_meta_renamed or any_renamed
if not any_renamed:
raise ValueError(f"no item {src_path} found to rename")
def _rmdir_from_keys(store: StoreLike, path: Optional[str] = None) -> None:
# assume path already normalized
prefix = _path_to_prefix(path)
for key in list(store.keys()):
if key.startswith(prefix):
del store[key]
def _rmdir_from_keys_v3(store: StoreV3, path: str = "") -> None:
meta_dir = meta_root + path
meta_dir = meta_dir.rstrip("/")
_rmdir_from_keys(store, meta_dir)
# remove data folder
data_dir = data_root + path
data_dir = data_dir.rstrip("/")
_rmdir_from_keys(store, data_dir)
# remove metadata files
sfx = _get_metadata_suffix(store)
array_meta_file = meta_dir + ".array" + sfx
if array_meta_file in store:
store.erase(array_meta_file)
group_meta_file = meta_dir + ".group" + sfx
if group_meta_file in store:
store.erase(group_meta_file)
def _listdir_from_keys(store: BaseStore, path: Optional[str] = None) -> List[str]:
# assume path already normalized
prefix = _path_to_prefix(path)
children = set()
for key in list(store.keys()):
if key.startswith(prefix) and len(key) > len(prefix):
suffix = key[len(prefix) :]
child = suffix.split("/")[0]
children.add(child)
return sorted(children)
def _prefix_to_array_key(store: StoreLike, prefix: str) -> str:
if getattr(store, "_store_version", 2) == 3:
sfx = _get_metadata_suffix(store) # type: ignore
if prefix:
key = meta_root + prefix.rstrip("/") + ".array" + sfx
else:
key = meta_root[:-1] + ".array" + sfx
else:
key = prefix + array_meta_key
return key
def _prefix_to_group_key(store: StoreLike, prefix: str) -> str:
if getattr(store, "_store_version", 2) == 3:
sfx = _get_metadata_suffix(store) # type: ignore
if prefix:
key = meta_root + prefix.rstrip("/") + ".group" + sfx
else:
key = meta_root[:-1] + ".group" + sfx
else:
key = prefix + group_meta_key
return key
def _prefix_to_attrs_key(store: StoreLike, prefix: str) -> str:
if getattr(store, "_store_version", 2) == 3:
# for v3, attributes are stored in the array metadata
sfx = _get_metadata_suffix(store) # type: ignore
if prefix:
key = meta_root + prefix.rstrip("/") + ".array" + sfx
else:
key = meta_root[:-1] + ".array" + sfx
else:
key = prefix + attrs_key
return key