"""This module contains storage classes for use with Zarr arrays and groups.
Note that any object implementing the :class:`MutableMapping` interface from the
:mod:`collections` module in the Python standard library can be used as a Zarr
array store, as long as it accepts string (str) keys and bytes values.
In addition to the :class:`MutableMapping` interface, store classes may also implement
optional methods `listdir` (list members of a "directory") and `rmdir` (remove all
members of a "directory"). These methods should be implemented if the store class is
aware of the hierarchical organisation of resources within the store and can provide
efficient implementations. If these methods are not available, Zarr will fall back to
slower implementations that work via the :class:`MutableMapping` interface. Store
classes may also optionally implement a `rename` method (rename all members under a given
path) and a `getsize` method (return the size in bytes of a given value).
"""
import atexit
import errno
import glob
import multiprocessing
import operator
import os
import re
import shutil
import sys
import tempfile
import warnings
import zipfile
from collections import OrderedDict
from collections.abc import MutableMapping
from os import scandir
from pickle import PicklingError
from threading import Lock, RLock
from typing import Optional, Union, List, Tuple, Dict, Any
import uuid
import time
from numcodecs.abc import Codec
from numcodecs.compat import (
ensure_bytes,
ensure_text,
ensure_contiguous_ndarray_like
)
from numcodecs.registry import codec_registry
from zarr.errors import (
MetadataError,
BadCompressorError,
ContainsArrayError,
ContainsGroupError,
FSPathExistNotDir,
ReadOnlyError,
)
from zarr.meta import encode_array_metadata, encode_group_metadata
from zarr.util import (buffer_size, json_loads, nolock, normalize_chunks,
normalize_dimension_separator,
normalize_dtype, normalize_fill_value, normalize_order,
normalize_shape, normalize_storage_path, retry_call
)
from zarr._storage.absstore import ABSStore # noqa: F401
from zarr._storage.store import (_get_hierarchy_metadata, # noqa: F401
_get_metadata_suffix,
_listdir_from_keys,
_rename_from_keys,
_rename_metadata_v3,
_rmdir_from_keys,
_rmdir_from_keys_v3,
_path_to_prefix,
_prefix_to_array_key,
_prefix_to_group_key,
array_meta_key,
attrs_key,
data_root,
group_meta_key,
meta_root,
DEFAULT_ZARR_VERSION,
BaseStore,
Store)
# Maps (object name, member pattern) to the optional packages their doctests
# need -- presumably consumed by the doctest runner to skip examples whose
# dependencies are missing (TODO confirm which plugin reads this).
__doctest_requires__ = {
    ('RedisStore', 'RedisStore.*'): ['redis'],
    ('MongoDBStore', 'MongoDBStore.*'): ['pymongo'],
    ('LRUStoreCache', 'LRUStoreCache.*'): ['s3fs'],
}

# Module-wide default compressor: Blosc when it can be imported, Zlib otherwise.
try:
    # noinspection PyUnresolvedReferences
    from zarr.codecs import Blosc
    default_compressor = Blosc()
except ImportError:  # pragma: no cover
    from zarr.codecs import Zlib
    default_compressor = Zlib()

# Type alias for a logical storage path argument (text, bytes or None).
Path = Union[str, bytes, None]
# allow MutableMapping for backwards compatibility
StoreLike = Union[BaseStore, MutableMapping]
def contains_array(store: StoreLike, path: Path = None) -> bool:
    """Return True if the store contains an array at the given logical path.

    The path is normalized, turned into a key prefix and then into the array
    metadata key for `store`; the result is whether that key is present.
    """
    array_key = _prefix_to_array_key(
        store,
        _path_to_prefix(normalize_storage_path(path)),
    )
    return array_key in store
def contains_group(store: StoreLike, path: Path = None, explicit_only=True) -> bool:
    """Return True if the store contains a group at the given logical path.

    Parameters
    ----------
    store : Store
        Store to inspect.
    path : string, bytes, optional
        Logical path of the candidate group.
    explicit_only : bool, optional
        If True (or if the store is a version 2 store), only an explicit group
        metadata key counts. Otherwise a group is also reported when any key
        exists under the corresponding prefix (an "implicit" group).
    """
    group_key = _prefix_to_group_key(
        store,
        _path_to_prefix(normalize_storage_path(path)),
    )
    store_version = getattr(store, '_store_version', 2)

    # explicit group metadata always counts
    if group_key in store:
        return True
    if store_version == 2 or explicit_only:
        return False

    # for v3, need to also handle implicit groups: strip the group metadata
    # suffix from the key and look for anything stored below that prefix
    sfx = _get_metadata_suffix(store)  # type: ignore
    implicit_prefix = group_key.replace('.group' + sfx, '')
    if not implicit_prefix.endswith('/'):
        implicit_prefix += '/'
    return bool(store.list_prefix(implicit_prefix))  # type: ignore
def _normalize_store_arg_v2(store: Any, storage_options=None, mode="r") -> BaseStore:
    """Convert a user-supplied `store` argument into a version 2 store object.

    ``None`` becomes an in-memory ``KVStore``; path-like objects are converted
    with ``os.fspath``; strings are dispatched on their form (fsspec URL,
    ``.zip``, ``.n5`` or plain directory); anything else is passed through
    ``Store._ensure_store``.

    Raises
    ------
    ValueError
        If `store` reports a `_store_version` other than 2, or if
        `storage_options` are given together with a non-fsspec string path.
    """
    # default to v2 store for backward compatibility
    if getattr(store, '_store_version', 2) != 2:
        raise ValueError("store must be a version 2 store")

    if store is None:
        return KVStore(dict())

    if isinstance(store, os.PathLike):
        store = os.fspath(store)

    if not isinstance(store, str):
        return Store._ensure_store(store)

    # string input: URLs / chained protocols are handled by fsspec
    looks_like_url = "://" in store or "::" in store
    if looks_like_url:
        return FSStore(store, mode=mode, **(storage_options or {}))
    if storage_options:
        raise ValueError("storage_options passed with non-fsspec path")

    # local path: pick the store class from the file extension
    if store.endswith('.zip'):
        return ZipStore(store, mode=mode)
    if store.endswith('.n5'):
        from zarr.n5 import N5Store
        return N5Store(store)
    return DirectoryStore(store)
def normalize_store_arg(store: Any, storage_options=None, mode="r", *,
                        zarr_version=None) -> BaseStore:
    """Normalize a `store` argument for the requested zarr format version.

    Parameters
    ----------
    store : any
        Store object, path, URL or None; interpreted by the version-specific
        normalizer.
    storage_options : dict, optional
        Forwarded to the version-specific normalizer.
    mode : str, optional
        Forwarded to the version-specific normalizer.
    zarr_version : {None, 2, 3}, optional
        Format version. If None, the store's `_store_version` attribute is
        used, falling back to `DEFAULT_ZARR_VERSION`.

    Raises
    ------
    ValueError
        If the requested (or store-reported) version is neither 2 nor 3.
    """
    if zarr_version is None:
        # default to v2 store for backward compatibility
        zarr_version = getattr(store, "_store_version", DEFAULT_ZARR_VERSION)

    # Validate after resolving the version, so that an unexpected
    # `_store_version` on the store is reported as a ValueError instead of
    # leaving `normalize_store` unbound.
    if zarr_version == 2:
        normalize_store = _normalize_store_arg_v2
    elif zarr_version == 3:
        from zarr._storage.v3 import _normalize_store_arg_v3
        normalize_store = _normalize_store_arg_v3
    else:
        raise ValueError("zarr_version must be either 2 or 3")
    return normalize_store(store, storage_options, mode)
def rmdir(store: StoreLike, path: Path = None):
    """Remove all items under the given path.

    If `store` provides a `rmdir` method and reports itself as erasable, that
    method is called. Otherwise the items are removed one key at a time using
    the key-based helper matching the store version.
    """
    path = normalize_storage_path(path)
    store_version = getattr(store, '_store_version', 2)

    if hasattr(store, "rmdir") and store.is_erasable():  # type: ignore
        # pass through to the store's own implementation
        store.rmdir(path)  # type: ignore
        return

    # slow version, delete one key at a time
    if store_version == 2:
        _rmdir_from_keys(store, path)
    else:
        _rmdir_from_keys_v3(store, path)  # type: ignore
def rename(store: Store, src_path: Path, dst_path: Path):
    """Rename all items under the given path.

    If `store` provides a `rename` method it is called with the normalized
    paths; otherwise the rename is carried out by the key-based fallback
    helper.
    """
    source = normalize_storage_path(src_path)
    destination = normalize_storage_path(dst_path)

    if not hasattr(store, 'rename'):
        # slow version, implemented on top of the individual keys
        _rename_from_keys(store, source, destination)
        return

    # pass through to the store's own implementation
    store.rename(source, destination)
def listdir(store: BaseStore, path: Path = None):
    """Obtain a directory listing for the given path.

    If `store` provides a `listdir` method it is called; otherwise a warning
    is issued and the listing is derived by iterating over the store's keys.
    """
    path = normalize_storage_path(path)

    if not hasattr(store, 'listdir'):
        # slow version, iterate through all keys
        warnings.warn(
            f"Store {store} has no `listdir` method. From zarr 2.9 onwards "
            "may want to inherit from `Store`.",
            stacklevel=2,
        )
        return _listdir_from_keys(store, path)

    # pass through to the store's own implementation
    return store.listdir(path)  # type: ignore
def _getsize(store: BaseStore, path: Path = None) -> int:
    """Compute the size of the values stored at or directly under `path`.

    If `path` is itself a key in the store, the size of that single value is
    returned. Otherwise the sizes of the member values found for the path are
    summed; keys that cannot be read are skipped. Returns -1 if the size of a
    member value cannot be determined (``buffer_size`` raises ``TypeError``).
    """
    # compute from size of values
    if path and path in store:
        return buffer_size(store[path])

    path = normalize_storage_path(path) if path is not None else ''

    if getattr(store, '_store_version', 2) == 3:
        if path == '':
            # have to list the root folders without trailing / in this case
            members = store.list_prefix(data_root.rstrip('/'))  # type: ignore
            members += store.list_prefix(meta_root.rstrip('/'))  # type: ignore
        else:
            members = store.list_prefix(data_root + path)  # type: ignore
            members += store.list_prefix(meta_root + path)  # type: ignore
        # also include zarr.json?
        # members += ['zarr.json']
    else:
        children = listdir(store, path)
        key_prefix = _path_to_prefix(path)
        members = [key_prefix + child for child in children]

    total = 0
    for member in members:
        try:
            value = store[member]
        except KeyError:
            # listed entry is not a readable key; skip it
            continue
        try:
            total += buffer_size(value)
        except TypeError:
            return -1
    return total
def getsize(store: BaseStore, path: Path = None) -> int:
    """Compute size of stored items for a given path.

    If `store` provides a `getsize` method, it is called with the normalized
    path. Otherwise, if `store` is a ``MutableMapping``, the size is computed
    from the stored values. For any other object -1 is returned.
    """
    if hasattr(store, 'getsize'):
        # pass through to the store's own implementation
        return store.getsize(normalize_storage_path(path))  # type: ignore
    if isinstance(store, MutableMapping):
        return _getsize(store, path)
    return -1
def _require_parent_group(
    path: Optional[str],
    store: StoreLike,
    chunk_store: Optional[StoreLike],
    overwrite: bool,
):
    """Make sure every ancestor of `path` (including the root) is a group.

    `path` is assumed to be normalized already. An ancestor that currently
    holds an array is re-initialized as a group, honouring `overwrite`; an
    ancestor that holds neither an array nor a group gets fresh group metadata.
    """
    if not path:
        return

    segments = path.split('/')
    # depth 0 yields '' (the root); the last segment itself is not an ancestor
    for depth in range(len(segments)):
        ancestor = '/'.join(segments[:depth])
        if contains_array(store, ancestor):
            _init_group_metadata(store, path=ancestor, chunk_store=chunk_store,
                                 overwrite=overwrite)
        elif not contains_group(store, ancestor):
            _init_group_metadata(store, path=ancestor, chunk_store=chunk_store)
def init_array(
    store: StoreLike,
    shape: Tuple[int, ...],
    chunks: Union[bool, int, Tuple[int, ...]] = True,
    dtype=None,
    compressor="default",
    fill_value=None,
    order: str = "C",
    overwrite: bool = False,
    path: Optional[Path] = None,
    chunk_store: Optional[StoreLike] = None,
    filters=None,
    object_codec=None,
    dimension_separator=None,
):
    """Initialize an array store with the given configuration.

    This is a low-level function; user code should not normally need to call
    it directly.

    Parameters
    ----------
    store : Store
        A mapping that supports string keys and bytes-like values.
    shape : int or tuple of ints
        Array shape.
    chunks : bool, int or tuple of ints, optional
        Chunk shape. If True, will be guessed from `shape` and `dtype`. If
        False, will be set to `shape`, i.e., single chunk for the whole array.
    dtype : string or dtype, optional
        NumPy dtype.
    compressor : Codec, optional
        Primary compressor.
    fill_value : object
        Default value to use for uninitialized portions of the array.
    order : {'C', 'F'}, optional
        Memory layout to be used within each chunk.
    overwrite : bool, optional
        If True, erase all data in `store` prior to initialisation.
    path : string, bytes, optional
        Path under which array is stored.
    chunk_store : Store, optional
        Separate storage for chunks. If not provided, `store` will be used
        for storage of both chunks and metadata.
    filters : sequence, optional
        Sequence of filters to use to encode chunk data prior to compression.
    object_codec : Codec, optional
        A codec to encode object arrays, only needed if dtype=object.
    dimension_separator : {'.', '/'}, optional
        Separator placed between the dimensions of a chunk.

    Examples
    --------
    Initialize an array store::

        >>> from zarr.storage import init_array, KVStore
        >>> store = KVStore(dict())
        >>> init_array(store, shape=(10000, 10000), chunks=(1000, 1000))
        >>> sorted(store.keys())
        ['.zarray']

    Array metadata is stored as JSON::

        >>> print(store['.zarray'].decode())
        {
            "chunks": [
                1000,
                1000
            ],
            "compressor": {
                "blocksize": 0,
                "clevel": 5,
                "cname": "lz4",
                "id": "blosc",
                "shuffle": 1
            },
            "dtype": "<f8",
            "fill_value": null,
            "filters": null,
            "order": "C",
            "shape": [
                10000,
                10000
            ],
            "zarr_format": 2
        }

    Initialize an array using a storage path::

        >>> store = KVStore(dict())
        >>> init_array(store, shape=100000000, chunks=1000000, dtype='i1', path='foo')
        >>> sorted(store.keys())
        ['.zgroup', 'foo/.zarray']
        >>> print(store['foo/.zarray'].decode())
        {
            "chunks": [
                1000000
            ],
            "compressor": {
                "blocksize": 0,
                "clevel": 5,
                "cname": "lz4",
                "id": "blosc",
                "shuffle": 1
            },
            "dtype": "|i1",
            "fill_value": null,
            "filters": null,
            "order": "C",
            "shape": [
                100000000
            ],
            "zarr_format": 2
        }

    Notes
    -----
    The initialisation process involves normalising all array metadata, encoding
    as JSON and storing under the '.zarray' key.

    """
    path = normalize_storage_path(path)
    store_version = getattr(store, "_store_version", 2)

    if store_version < 3:
        # v2 hierarchies need explicit group metadata for every ancestor
        _require_parent_group(path, store=store, chunk_store=chunk_store,
                              overwrite=overwrite)
    elif store_version == 3 and 'zarr.json' not in store:
        # initialize with default zarr.json entry level metadata
        store['zarr.json'] = store._metadata_class.encode_hierarchy_metadata(None)  # type: ignore

    # compatibility with legacy tests using compressor=[]: any falsy value
    # means "no compressor"
    compressor = compressor or None

    _init_array_metadata(
        store,
        shape=shape,
        chunks=chunks,
        dtype=dtype,
        compressor=compressor,
        fill_value=fill_value,
        order=order,
        overwrite=overwrite,
        path=path,
        chunk_store=chunk_store,
        filters=filters,
        object_codec=object_codec,
        dimension_separator=dimension_separator,
    )
def _init_array_metadata(
    store: StoreLike,
    shape,
    chunks=None,
    dtype=None,
    compressor="default",
    fill_value=None,
    order="C",
    overwrite=False,
    path: Optional[str] = None,
    chunk_store: Optional[StoreLike] = None,
    filters=None,
    object_codec=None,
    dimension_separator=None,
):
    """Normalize, encode and store the metadata for an array at `path`.

    When `overwrite` is true, existing entries at `path` are removed from
    `store` (and `chunk_store`, if given) first. Otherwise an existing array
    or group at `path` is an error.

    Raises
    ------
    ContainsArrayError
        If not overwriting and an array exists at `path` (or, for v3 stores,
        at the immediate parent path).
    ContainsGroupError
        If not overwriting and a group exists at `path`.
    BadCompressorError
        For v2 stores, if `compressor` has no ``get_config`` method.
    ValueError
        For non-v2 stores, if `compressor` is not a ``Codec``; or if the dtype
        holds objects and neither `object_codec` nor `filters` is given.
    """

    store_version = getattr(store, '_store_version', 2)

    path = normalize_storage_path(path)

    # guard conditions
    if overwrite:
        if store_version == 2:
            # attempt to delete any pre-existing array in store
            rmdir(store, path)
            if chunk_store is not None:
                rmdir(chunk_store, path)
        else:
            # NOTE: these two locals shadow the module-level names imported
            # from zarr._storage.store for the remainder of this function
            group_meta_key = _prefix_to_group_key(store, _path_to_prefix(path))
            array_meta_key = _prefix_to_array_key(store, _path_to_prefix(path))
            data_prefix = data_root + _path_to_prefix(path)

            # attempt to delete any pre-existing array in store
            if array_meta_key in store:
                store.erase(array_meta_key)  # type: ignore
            if group_meta_key in store:
                store.erase(group_meta_key)  # type: ignore
            store.erase_prefix(data_prefix)  # type: ignore
            if chunk_store is not None:
                chunk_store.erase_prefix(data_prefix)  # type: ignore

            if '/' in path:
                # path is a subfolder of an existing array, remove that array
                parent_path = '/'.join(path.split('/')[:-1])
                sfx = _get_metadata_suffix(store)  # type: ignore
                array_key = meta_root + parent_path + '.array' + sfx
                if array_key in store:
                    store.erase(array_key)  # type: ignore

    if not overwrite:
        if contains_array(store, path):
            raise ContainsArrayError(path)
        elif contains_group(store, path, explicit_only=False):
            raise ContainsGroupError(path)
        elif store_version == 3:
            if '/' in path:
                # cannot create an array within an existing array path
                parent_path = '/'.join(path.split('/')[:-1])
                if contains_array(store, parent_path):
                    raise ContainsArrayError(path)

    # normalize metadata
    dtype, object_codec = normalize_dtype(dtype, object_codec)
    # a dtype with its own shape contributes trailing dimensions to the array
    shape = normalize_shape(shape) + dtype.shape
    dtype = dtype.base
    chunks = normalize_chunks(chunks, shape, dtype.itemsize)
    order = normalize_order(order)
    fill_value = normalize_fill_value(fill_value, dtype)

    # optional array metadata
    if dimension_separator is None and store_version == 2:
        # fall back to the separator configured on the store, if any
        dimension_separator = getattr(store, "_dimension_separator", None)
    dimension_separator = normalize_dimension_separator(dimension_separator)

    # compressor prep
    if shape == ():
        # no point in compressing a 0-dimensional array, only a single value
        compressor = None
    elif compressor == 'none':
        # compatibility
        compressor = None
    elif compressor == 'default':
        compressor = default_compressor

    # obtain compressor config
    compressor_config = None
    if compressor:
        if store_version == 2:
            try:
                compressor_config = compressor.get_config()
            except AttributeError as e:
                raise BadCompressorError(compressor) from e
        elif not isinstance(compressor, Codec):
            raise ValueError("expected a numcodecs Codec for compressor")
            # TODO: alternatively, could autoconvert str to a Codec
            #       e.g. 'zlib' -> numcodec.Zlib object
            # compressor = numcodecs.get_codec({'id': compressor})

    # obtain filters config
    if filters:
        # TODO: filters was removed from the metadata in v3
        #       raise error here if store_version > 2?
        filters_config = [f.get_config() for f in filters]
    else:
        filters_config = []

    # deal with object encoding
    if dtype.hasobject:
        if object_codec is None:
            if not filters:
                # there are no filters so we can be sure there is no object codec
                raise ValueError('missing object_codec for object array')
            else:
                # one of the filters may be an object codec, issue a warning rather
                # than raise an error to maintain backwards-compatibility
                warnings.warn('missing object_codec for object array; this will raise a '
                              'ValueError in version 3.0', FutureWarning)
        else:
            # the object codec is recorded as the first filter
            filters_config.insert(0, object_codec.get_config())
    elif object_codec is not None:
        warnings.warn('an object_codec is only needed for object arrays')

    # use null to indicate no filters
    if not filters_config:
        filters_config = None  # type: ignore

    # initialize metadata
    # TODO: don't store redundant dimension_separator for v3?
    # v2 metadata stores the compressor's config; otherwise the codec object
    # itself is handed to the metadata encoder
    _compressor = compressor_config if store_version == 2 else compressor
    meta = dict(shape=shape, compressor=_compressor,
                fill_value=fill_value,
                dimension_separator=dimension_separator)
    if store_version < 3:
        meta.update(dict(chunks=chunks, dtype=dtype, order=order,
                         filters=filters_config))
    else:
        # NOTE(review): `meta` already captured `dimension_separator` above, so
        # this default only affects the `chunk_grid` separator below
        if dimension_separator is None:
            dimension_separator = "/"
        if filters_config:
            attributes = {'filters': filters_config}
        else:
            attributes = {}
        meta.update(
            dict(chunk_grid=dict(type="regular",
                                 chunk_shape=chunks,
                                 separator=dimension_separator),
                 chunk_memory_layout=order,
                 data_type=dtype,
                 attributes=attributes)
        )

    key = _prefix_to_array_key(store, _path_to_prefix(path))
    # prefer the store's own metadata encoder when it provides one
    if hasattr(store, '_metadata_class'):
        store[key] = store._metadata_class.encode_array_metadata(meta)  # type: ignore
    else:
        store[key] = encode_array_metadata(meta)
# backwards compatibility: `init_store` is kept as an alias of `init_array`
init_store = init_array
def init_group(
    store: StoreLike,
    overwrite: bool = False,
    path: Path = None,
    chunk_store: StoreLike = None,
):
    """Initialize a group store.

    This is a low-level function; user code should not normally need to call
    it directly.

    Parameters
    ----------
    store : Store
        A mapping that supports string keys and byte sequence values.
    overwrite : bool, optional
        If True, erase all data in `store` prior to initialisation.
    path : string, optional
        Path under which the group is stored.
    chunk_store : Store, optional
        Separate storage for chunks. If not provided, `store` will be used
        for storage of both chunks and metadata.

    """
    path = normalize_storage_path(path)
    store_version = getattr(store, '_store_version', 2)

    if store_version < 3:
        # ensure parent group initialized
        _require_parent_group(path, store=store, chunk_store=chunk_store,
                              overwrite=overwrite)
    elif store_version == 3 and 'zarr.json' not in store:
        # initialize with default zarr.json entry level metadata
        store['zarr.json'] = store._metadata_class.encode_hierarchy_metadata(None)  # type: ignore

    # initialise metadata
    _init_group_metadata(store=store, overwrite=overwrite, path=path,
                         chunk_store=chunk_store)

    # TODO: Should initializing a v3 group also create a corresponding
    #       empty folder under data/root/? I think probably not until there
    #       is actual data written there.
def _init_group_metadata(
    store: StoreLike,
    overwrite: Optional[bool] = False,
    path: Optional[str] = None,
    chunk_store: StoreLike = None,
):
    """Write group metadata for `path`, optionally clearing what is there.

    With `overwrite` set, pre-existing entries at `path` are removed from
    `store` (and from `chunk_store`, if given) before the metadata is written.
    Without it, ``ContainsArrayError`` / ``ContainsGroupError`` is raised when
    an array or group already exists at `path`; for v3 stores an array at the
    immediate parent path is also an error.
    """
    store_version = getattr(store, '_store_version', 2)
    path = normalize_storage_path(path)

    # guard conditions
    if overwrite:
        if store_version == 2:
            # attempt to delete any pre-existing items in store
            rmdir(store, path)
            if chunk_store is not None:
                rmdir(chunk_store, path)
        else:
            prefix = _path_to_prefix(path)
            grp_key = _prefix_to_group_key(store, prefix)
            arr_key = _prefix_to_array_key(store, prefix)
            data_prefix = data_root + prefix
            meta_prefix = meta_root + prefix

            # attempt to delete any pre-existing array or group metadata,
            # then everything stored below the path
            if arr_key in store:
                store.erase(arr_key)  # type: ignore
            if grp_key in store:
                store.erase(grp_key)  # type: ignore
            store.erase_prefix(data_prefix)  # type: ignore
            store.erase_prefix(meta_prefix)  # type: ignore
            if chunk_store is not None:
                chunk_store.erase_prefix(data_prefix)  # type: ignore
    else:
        if contains_array(store, path):
            raise ContainsArrayError(path)
        if contains_group(store, path):
            raise ContainsGroupError(path)
        if store_version == 3 and '/' in path:
            # cannot create a group overlapping with an existing array name
            parent_path = path.rpartition('/')[0]
            if contains_array(store, parent_path):
                raise ContainsArrayError(path)

    # initialize metadata
    # N.B., currently no metadata properties are needed, however there may
    # be in future
    meta = {'attributes': {}} if store_version == 3 else {}  # type: ignore

    key = _prefix_to_group_key(store, _path_to_prefix(path))
    # prefer the store's own metadata encoder when it provides one
    if hasattr(store, '_metadata_class'):
        encoded = store._metadata_class.encode_group_metadata(meta)  # type: ignore
    else:
        encoded = encode_group_metadata(meta)
    store[key] = encoded
def _dict_store_keys(d: Dict, prefix="", cls=dict):
    """Yield the '/'-joined keys of all leaf values in a nested mapping.

    Values that are instances of `cls` are treated as containers and descended
    into; every other value is a leaf whose key is yielded with `prefix`
    prepended.
    """
    for name, child in d.items():
        full_key = prefix + name
        if isinstance(child, cls):
            yield from _dict_store_keys(child, full_key + '/', cls)
        else:
            yield full_key
class KVStore(Store):
    """Default :class:`Store` implementation wrapping a mutable mapping.

    Wrapping a plain key/value mapping in this class avoids having to test
    stores for the presence of optional methods. Most methods are simple
    pass-throughs to the underlying mapping, which is expected to expose a
    MutableMapping interface.
    """

    def __init__(self, mutablemapping):
        # the wrapped key/value container; every operation is delegated to it
        self._mutable_mapping = mutablemapping

    def __getitem__(self, key):
        """Return the value stored under `key` in the wrapped mapping."""
        return self._mutable_mapping[key]

    def __setitem__(self, key, value):
        """Store `value` under `key` in the wrapped mapping."""
        self._mutable_mapping[key] = value

    def __delitem__(self, key):
        """Delete `key` from the wrapped mapping."""
        del self._mutable_mapping[key]

    def get(self, key, default=None):
        """Return the value for `key`, or `default` if it is absent."""
        return self._mutable_mapping.get(key, default)

    def values(self):
        """Return the values view of the wrapped mapping."""
        return self._mutable_mapping.values()

    def __iter__(self):
        return iter(self._mutable_mapping)

    def __len__(self):
        return len(self._mutable_mapping)

    def __repr__(self):
        return f"<{self.__class__.__name__}: \n{repr(self._mutable_mapping)}\n at {hex(id(self))}>"

    def __eq__(self, other):
        # only comparable with other KVStore instances, by wrapped mapping
        if not isinstance(other, KVStore):
            return NotImplemented
        return self._mutable_mapping == other._mutable_mapping
class MemoryStore(Store):
    """Store class that uses a hierarchy of :class:`KVStore` objects, thus all data
    will be held in main memory.

    Parameters
    ----------
    root : mapping, optional
        Existing nested container to use as the root; a new `cls()` is created
        if omitted.
    cls : type, optional
        Container type used for the root and for every intermediate level.
    dimension_separator : {'.', '/'}, optional
        Separator placed between the dimensions of a chunk.

    Examples
    --------
    This is the default class used when creating a group. E.g.::

        >>> import zarr
        >>> g = zarr.group()
        >>> type(g.store)
        <class 'zarr.storage.MemoryStore'>

    Note that the default class when creating an array is the built-in
    :class:`KVStore` class, i.e.::

        >>> z = zarr.zeros(100)
        >>> type(z.store)
        <class 'zarr.storage.KVStore'>

    Notes
    -----
    Safe to write in multiple threads.

    """

    def __init__(self, root=None, cls=dict, dimension_separator=None):
        if root is None:
            self.root = cls()
        else:
            self.root = root
        self.cls = cls
        # serializes item writes, deletes and clear()
        self.write_mutex = Lock()
        self._dimension_separator = dimension_separator

    def __getstate__(self):
        # the lock cannot be pickled and is recreated in __setstate__; the
        # dimension separator is included so it survives a pickle round trip
        return self.root, self.cls, self._dimension_separator

    def __setstate__(self, state):
        # state written by older versions holds only (root, cls)
        root, cls, *extra = state
        dimension_separator = extra[0] if extra else None
        self.__init__(root=root, cls=cls, dimension_separator=dimension_separator)

    def _get_parent(self, item: str):
        """Return ``(container, leaf_name)`` for `item`.

        Raises ``KeyError`` if an intermediate segment is missing or is not a
        container.
        """
        parent = self.root
        # split the item
        segments = item.split('/')
        # find the parent container
        for k in segments[:-1]:
            parent = parent[k]
            if not isinstance(parent, self.cls):
                raise KeyError(item)
        return parent, segments[-1]

    def _require_parent(self, item):
        """Like `_get_parent`, but create missing intermediate containers."""
        parent = self.root
        # split the item
        segments = item.split('/')
        # require the parent container
        for k in segments[:-1]:
            try:
                parent = parent[k]
            except KeyError:
                parent[k] = self.cls()
                parent = parent[k]
            else:
                if not isinstance(parent, self.cls):
                    raise KeyError(item)
        return parent, segments[-1]

    def __getitem__(self, item: str):
        parent, key = self._get_parent(item)
        try:
            value = parent[key]
        except KeyError:
            raise KeyError(item)
        else:
            # containers are not values, so they are not retrievable as keys
            if isinstance(value, self.cls):
                raise KeyError(item)
            else:
                return value

    def __setitem__(self, item: str, value):
        with self.write_mutex:
            parent, key = self._require_parent(item)
            value = ensure_bytes(value)
            parent[key] = value

    def __delitem__(self, item: str):
        with self.write_mutex:
            parent, key = self._get_parent(item)
            try:
                del parent[key]
            except KeyError:
                raise KeyError(item)

    def __contains__(self, item: str):  # type: ignore[override]
        try:
            parent, key = self._get_parent(item)
            value = parent[key]
        except KeyError:
            return False
        else:
            return not isinstance(value, self.cls)

    def __eq__(self, other):
        return (
            isinstance(other, MemoryStore) and
            self.root == other.root and
            self.cls == other.cls
        )

    def keys(self):
        for k in _dict_store_keys(self.root, cls=self.cls):
            yield k

    def __iter__(self):
        return self.keys()

    def __len__(self) -> int:
        return sum(1 for _ in self.keys())

    def listdir(self, path: Path = None) -> List[str]:
        """Return the sorted child names of the container at `path`.

        Returns an empty list if `path` is missing or is not a container.
        """
        path = normalize_storage_path(path)
        if path:
            try:
                parent, key = self._get_parent(path)
                value = parent[key]
            except KeyError:
                return []
        else:
            value = self.root
        if isinstance(value, self.cls):
            return sorted(value.keys())
        else:
            return []

    def rename(self, src_path: Path, dst_path: Path):
        """Move the entry at `src_path` to `dst_path`, creating parents as needed."""
        src_path = normalize_storage_path(src_path)
        dst_path = normalize_storage_path(dst_path)

        src_parent, src_key = self._get_parent(src_path)
        dst_parent, dst_key = self._require_parent(dst_path)

        dst_parent[dst_key] = src_parent.pop(src_key)

    def rmdir(self, path: Path = None):
        """Remove the container at `path`; an empty path resets the root."""
        path = normalize_storage_path(path)
        if path:
            try:
                parent, key = self._get_parent(path)
                value = parent[key]
            except KeyError:
                return
            else:
                if isinstance(value, self.cls):
                    del parent[key]
        else:
            # clear out root
            self.root = self.cls()

    def getsize(self, path: Path = None):
        """Return the size in bytes of the value or container at `path`.

        For a container only its direct, non-container children are counted.
        Returns 0 if `path` does not exist.
        """
        path = normalize_storage_path(path)

        # obtain value to return size of
        value = None
        if path:
            try:
                parent, key = self._get_parent(path)
                value = parent[key]
            except KeyError:
                pass
        else:
            value = self.root

        # obtain size of value
        if value is None:
            return 0

        elif isinstance(value, self.cls):
            # total size for directory
            size = 0
            for v in value.values():
                if not isinstance(v, self.cls):
                    size += buffer_size(v)
            return size

        else:
            return buffer_size(value)

    def clear(self):
        with self.write_mutex:
            self.root.clear()
class DictStore(MemoryStore):
    """Deprecated alias of :class:`MemoryStore`; warns on instantiation."""

    def __init__(self, *args, **kwargs):
        message = ("DictStore has been renamed to MemoryStore in 2.4.0 and "
                   "will be removed in the future. Please use MemoryStore.")
        # stacklevel=2 attributes the warning to the code creating the store
        warnings.warn(message, DeprecationWarning, stacklevel=2)
        super().__init__(*args, **kwargs)
[docs]class DirectoryStore(Store):
"""Storage class using directories and files on a standard file system.
Parameters
----------
path : string
Location of directory to use as the root of the storage hierarchy.
normalize_keys : bool, optional
If True, all store keys will be normalized to use lower case characters
(e.g. 'foo' and 'FOO' will be treated as equivalent). This can be
useful to avoid potential discrepancies between case-sensitive and
case-insensitive file system. Default value is False.
dimension_separator : {'.', '/'}, optional
Separator placed between the dimensions of a chunk.
Examples
--------
Store a single array::
>>> import zarr
>>> store = zarr.DirectoryStore('data/array.zarr')
>>> z = zarr.zeros((10, 10), chunks=(5, 5), store=store, overwrite=True)
>>> z[...] = 42
Each chunk of the array is stored as a separate file on the file system,
i.e.::
>>> import os
>>> sorted(os.listdir('data/array.zarr'))
['.zarray', '0.0', '0.1', '1.0', '1.1']
Store a group::
>>> store = zarr.DirectoryStore('data/group.zarr')
>>> root = zarr.group(store=store, overwrite=True)
>>> foo = root.create_group('foo')
>>> bar = foo.zeros('bar', shape=(10, 10), chunks=(5, 5))
>>> bar[...] = 42
When storing a group, levels in the group hierarchy will correspond to
directories on the file system, i.e.::
>>> sorted(os.listdir('data/group.zarr'))
['.zgroup', 'foo']
>>> sorted(os.listdir('data/group.zarr/foo'))
['.zgroup', 'bar']
>>> sorted(os.listdir('data/group.zarr/foo/bar'))
['.zarray', '0.0', '0.1', '1.0', '1.1']
Notes
-----
Atomic writes are used, which means that data are first written to a
temporary file, then moved into place when the write is successfully
completed. Files are only held open while they are being read or written and are
closed immediately afterwards, so there is no need to manually close any files.
Safe to write in multiple threads or processes.
"""
def __init__(self, path, normalize_keys=False, dimension_separator=None):
# guard conditions
path = os.path.abspath(path)
if os.path.exists(path) and not os.path.isdir(path):
raise FSPathExistNotDir(path)
self.path = path
self.normalize_keys = normalize_keys
self._dimension_separator = dimension_separator
def _normalize_key(self, key):
return key.lower() if self.normalize_keys else key
@staticmethod
def _fromfile(fn):
""" Read data from a file
Parameters
----------
fn : str
Filepath to open and read from.
Notes
-----
Subclasses should overload this method to specify any custom
file reading logic.
"""
with open(fn, 'rb') as f:
return f.read()
@staticmethod
def _tofile(a, fn):
""" Write data to a file
Parameters
----------
a : array-like
Data to write into the file.
fn : str
Filepath to open and write to.
Notes
-----
Subclasses should overload this method to specify any custom
file writing logic.
"""
with open(fn, mode='wb') as f:
f.write(a)
def __getitem__(self, key):
key = self._normalize_key(key)
filepath = os.path.join(self.path, key)
if os.path.isfile(filepath):
return self._fromfile(filepath)
else:
raise KeyError(key)
def __setitem__(self, key, value):
key = self._normalize_key(key)
# coerce to flat, contiguous array (ideally without copying)
value = ensure_contiguous_ndarray_like(value)
# destination path for key
file_path = os.path.join(self.path, key)
# ensure there is no directory in the way
if os.path.isdir(file_path):
shutil.rmtree(file_path)
# ensure containing directory exists
dir_path, file_name = os.path.split(file_path)
if os.path.isfile(dir_path):
raise KeyError(key)
if not os.path.exists(dir_path):
try:
os.makedirs(dir_path)
except OSError as e:
if e.errno != errno.EEXIST:
raise KeyError(key)
# write to temporary file
# note we're not using tempfile.NamedTemporaryFile to avoid restrictive file permissions
temp_name = file_name + '.' + uuid.uuid4().hex + '.partial'
temp_path = os.path.join(dir_path, temp_name)
try:
self._tofile(value, temp_path)
# move temporary file into place;
# make several attempts at writing the temporary file to get past
# potential antivirus file locking issues
retry_call(os.replace, (temp_path, file_path), exceptions=(PermissionError,))
finally:
# clean up if temp file still exists for whatever reason
if os.path.exists(temp_path): # pragma: no cover
os.remove(temp_path)
def __delitem__(self, key):
key = self._normalize_key(key)
path = os.path.join(self.path, key)
if os.path.isfile(path):
os.remove(path)
elif os.path.isdir(path):
# include support for deleting directories, even though strictly
# speaking these do not exist as keys in the store
shutil.rmtree(path)
else:
raise KeyError(key)
def __contains__(self, key):
key = self._normalize_key(key)
file_path = os.path.join(self.path, key)
return os.path.isfile(file_path)
def __eq__(self, other):
return (
isinstance(other, DirectoryStore) and
self.path == other.path
)
def keys(self):
if os.path.exists(self.path):
yield from self._keys_fast(self.path)
@staticmethod
def _keys_fast(path, walker=os.walk):
for dirpath, _, filenames in walker(path):
dirpath = os.path.relpath(dirpath, path)
if dirpath == os.curdir:
for f in filenames:
yield f
else:
dirpath = dirpath.replace("\\", "/")
for f in filenames:
yield "/".join((dirpath, f))
def __iter__(self):
return self.keys()
def __len__(self):
return sum(1 for _ in self.keys())
def dir_path(self, path=None):
store_path = normalize_storage_path(path)
dir_path = self.path
if store_path:
dir_path = os.path.join(dir_path, store_path)
return dir_path
def listdir(self, path=None):
return self._nested_listdir(path) if self._dimension_separator == "/" else \
self._flat_listdir(path)
def _flat_listdir(self, path=None):
dir_path = self.dir_path(path)
if os.path.isdir(dir_path):
return sorted(os.listdir(dir_path))
else:
return []
def _nested_listdir(self, path=None):
children = self._flat_listdir(path=path)
if array_meta_key in children:
# special handling of directories containing an array to map nested chunk
# keys back to standard chunk keys
new_children = []
root_path = self.dir_path(path)
for entry in children:
entry_path = os.path.join(root_path, entry)
if _prog_number.match(entry) and os.path.isdir(entry_path):
for dir_path, _, file_names in os.walk(entry_path):
for file_name in file_names:
file_path = os.path.join(dir_path, file_name)
rel_path = file_path.split(root_path + os.path.sep)[1]
new_children.append(rel_path.replace(os.path.sep, '.'))
else:
new_children.append(entry)
return sorted(new_children)
else:
return children
def rename(self, src_path, dst_path):
    """Move everything stored under ``src_path`` to ``dst_path``.

    Both store paths are normalized and resolved against the store root;
    the move itself is performed by :func:`os.renames`.
    """
    source = os.path.join(self.path, normalize_storage_path(src_path))
    target = os.path.join(self.path, normalize_storage_path(dst_path))
    os.renames(source, target)
def rmdir(self, path=None):
    """Recursively delete the directory behind ``path``.

    With no (or an empty) path the store root is targeted. Nothing happens
    when the target is not an existing directory.
    """
    store_path = normalize_storage_path(path)
    target = os.path.join(self.path, store_path) if store_path else self.path
    if os.path.isdir(target):
        shutil.rmtree(target)
def getsize(self, path=None):
    """Return the size in bytes of whatever is stored at ``path``.

    * a regular file: its size;
    * a directory: the combined size of the regular files directly inside
      it (sub-directories are not descended into);
    * anything else, including a missing path: 0.
    """
    store_path = normalize_storage_path(path)
    fs_path = os.path.join(self.path, store_path) if store_path else self.path
    if os.path.isfile(fs_path):
        return os.path.getsize(fs_path)
    if not os.path.isdir(fs_path):
        return 0
    return sum(child.stat().st_size for child in scandir(fs_path) if child.is_file())
def clear(self):
    """Remove every key by deleting the store's root directory tree.

    A root that does not exist is treated as an already-empty store, the
    same way ``keys()`` treats it, so clearing it is a no-op instead of an
    error.
    """
    if os.path.exists(self.path):
        shutil.rmtree(self.path)
def atexit_rmtree(path,
                  isdir=os.path.isdir,
                  rmtree=shutil.rmtree):  # pragma: no cover
    """Ensure directory removal at interpreter exit.

    ``isdir`` and ``rmtree`` are bound as default arguments -- presumably so
    they stay reachable while the interpreter is shutting down (TODO confirm).
    """
    if not isdir(path):
        return
    rmtree(path)
# noinspection PyShadowingNames
def atexit_rmglob(path,
                  glob=glob.glob,
                  isdir=os.path.isdir,
                  isfile=os.path.isfile,
                  remove=os.remove,
                  rmtree=shutil.rmtree):  # pragma: no cover
    """Ensure removal of multiple files at interpreter exit.

    Every path matching the glob pattern ``path`` is deleted: regular files
    via ``remove`` and directories via ``rmtree``. Matches that are neither
    are left alone.
    """
    for match in glob(path):
        if isfile(match):
            remove(match)
            continue
        if isdir(match):
            rmtree(match)
class FSStore(Store):
    """Wraps an fsspec.FSMap to give access to arbitrary filesystems

    Requires that ``fsspec`` is installed, as well as any additional
    requirements for the protocol chosen.

    Parameters
    ----------
    url : str
        The destination to map. If no fs is provided, should include protocol
        and path, like "s3://bucket/root". If an fs is provided, can be a path
        within that filesystem, like "bucket/root"
    normalize_keys : bool
        If True, keys are lower-cased before use.
    key_separator : str
        public API for accessing dimension_separator. Never `None`
        See dimension_separator for more information.
    mode : str
        "w" for writable, "r" for read-only
    exceptions : list of Exception subclasses
        When accessing data, any of these exceptions will be treated
        as a missing key
    dimension_separator : {'.', '/'}, optional
        Separator placed between the dimensions of a chunk.
    fs : fsspec.spec.AbstractFileSystem, optional
        An existing filesystem to use for the store.
    check : bool, optional
        If True, performs a touch at the root location, to check for write access.
        Passed to `fsspec.mapping.FSMap` constructor.
    create : bool, optional
        If True, performs a mkdir at the root location.
        Passed to `fsspec.mapping.FSMap` constructor.
    missing_exceptions : sequence of Exceptions, optional
        Exceptions classes to associate with missing files.
        Passed to `fsspec.mapping.FSMap` constructor.
    storage_options : passed to the fsspec implementation. Cannot be used
        together with fs.
    """
    _array_meta_key = array_meta_key
    _group_meta_key = group_meta_key
    _attrs_key = attrs_key

    def __init__(self, url, normalize_keys=False, key_separator=None,
                 mode='w',
                 exceptions=(KeyError, PermissionError, IOError),
                 dimension_separator=None,
                 fs=None,
                 check=False,
                 create=False,
                 missing_exceptions=None,
                 **storage_options):
        import fsspec

        mapper_options = {"check": check, "create": create}
        # https://github.com/zarr-developers/zarr-python/pull/911#discussion_r841926292
        # Some fsspec implementations don't accept missing_exceptions.
        # This is a workaround to avoid passing it in the most common scenarios.
        # Remove this and add missing_exceptions to mapper_options when fsspec is released.
        if missing_exceptions is not None:
            mapper_options["missing_exceptions"] = missing_exceptions  # pragma: no cover

        if fs is None:
            protocol, _ = fsspec.core.split_protocol(url)
            # set auto_mkdir to True for local file system
            # NOTE(review): an explicit auto_mkdir=False is also overridden
            # here because the test is on truthiness -- confirm that is intended.
            if protocol in (None, "file") and not storage_options.get("auto_mkdir"):
                storage_options["auto_mkdir"] = True
            self.map = fsspec.get_mapper(url, **{**mapper_options, **storage_options})
            self.fs = self.map.fs  # for direct operations
            self.path = self.fs._strip_protocol(url)
        else:
            if storage_options:
                raise ValueError("Cannot specify both fs and storage_options")
            self.fs = fs
            self.path = self.fs._strip_protocol(url)
            self.map = self.fs.get_mapper(self.path, **mapper_options)

        self.normalize_keys = normalize_keys
        self.mode = mode
        self.exceptions = exceptions

        # For backwards compatibility. Guaranteed to be non-None
        if key_separator is not None:
            dimension_separator = key_separator
        self.key_separator = dimension_separator
        self._default_key_separator()

        # Pass attributes to array creation
        self._dimension_separator = dimension_separator

    def _default_key_separator(self):
        """Fall back to "." when no key separator was supplied."""
        if self.key_separator is None:
            self.key_separator = "."

    def _normalize_key(self, key):
        """Normalize ``key`` and apply the configured key separator.

        In the last path component every "." is replaced by
        ``self.key_separator``, unless that component is one of the metadata
        keys. The result is lower-cased when ``normalize_keys`` is set.
        """
        key = normalize_storage_path(key).lstrip('/')
        if key:
            *bits, end = key.split('/')

            if end not in (self._array_meta_key, self._group_meta_key, self._attrs_key):
                end = end.replace('.', self.key_separator)
                key = '/'.join(bits + [end])

        return key.lower() if self.normalize_keys else key

    def getitems(self, keys, **kwargs):
        """Fetch several values at once.

        Keys that cannot be read are omitted from the result. The returned
        dict is indexed by the caller's original keys, not the normalized ones.
        """
        # Remember which caller key produced each normalized key. The first
        # occurrence wins, which is what the previous list.index() lookup
        # did -- but without rescanning the whole list for every result.
        original_key = {}
        keys_transformed = []
        for key in keys:
            transformed = self._normalize_key(key)
            keys_transformed.append(transformed)
            original_key.setdefault(transformed, key)

        results = self.map.getitems(keys_transformed, on_error="omit")
        # The function calling this method may not recognize the transformed keys
        # So we send the values returned by self.map.getitems back into the original key space.
        return {original_key[rk]: rv for rk, rv in results.items()}

    def __getitem__(self, key):
        key = self._normalize_key(key)
        try:
            return self.map[key]
        except self.exceptions as e:
            # any configured exception is reported as a missing key
            raise KeyError(key) from e

    def setitems(self, values):
        """Store several key/value pairs at once; refused in read-only mode."""
        if self.mode == 'r':
            raise ReadOnlyError()
        values = {self._normalize_key(key): val for key, val in values.items()}
        self.map.setitems(values)

    def __setitem__(self, key, value):
        if self.mode == 'r':
            raise ReadOnlyError()
        key = self._normalize_key(key)
        path = self.dir_path(key)
        try:
            # a directory occupying the target location is removed first
            if self.fs.isdir(path):
                self.fs.rm(path, recursive=True)
            self.map[key] = value
            self.fs.invalidate_cache(self.fs._parent(path))
        except self.exceptions as e:
            raise KeyError(key) from e

    def __delitem__(self, key):
        if self.mode == 'r':
            raise ReadOnlyError()
        key = self._normalize_key(key)
        path = self.dir_path(key)
        if self.fs.isdir(path):
            self.fs.rm(path, recursive=True)
        else:
            del self.map[key]

    def delitems(self, keys):
        """Delete several keys at once; keys not in the store are skipped."""
        if self.mode == 'r':
            raise ReadOnlyError()
        # only remove the keys that exist in the store
        nkeys = [self._normalize_key(key) for key in keys if key in self]
        # rm errors if you pass an empty collection
        if len(nkeys) > 0:
            self.map.delitems(nkeys)

    def __contains__(self, key):
        key = self._normalize_key(key)
        return key in self.map

    def __eq__(self, other):
        return (type(self) is type(other) and self.map == other.map
                and self.mode == other.mode)

    def keys(self):
        return iter(self.map)

    def __iter__(self):
        return self.keys()

    def __len__(self):
        # count lazily rather than building a list of every key
        return sum(1 for _ in self.keys())

    def dir_path(self, path=None):
        """Return the filesystem path string for the store path ``path``."""
        store_path = normalize_storage_path(path)
        return self.map._key_to_str(store_path)

    def listdir(self, path=None):
        """List the children of ``path``.

        With "/" as key separator, all-digit sub-directories of an array
        directory are expanded and reported as dotted chunk keys. An
        ``IOError`` raised while listing yields an empty list.
        """
        dir_path = self.dir_path(path)
        try:
            children = sorted(p.rstrip('/').rsplit('/', 1)[-1]
                              for p in self.fs.ls(dir_path, detail=False))
            if self.key_separator != "/":
                return children
            else:
                if self._array_meta_key in children:
                    # special handling of directories containing an array to map nested chunk
                    # keys back to standard chunk keys
                    new_children = []
                    root_path = self.dir_path(path)
                    for entry in children:
                        entry_path = os.path.join(root_path, entry)
                        if _prog_number.match(entry) and self.fs.isdir(entry_path):
                            for file_name in self.fs.find(entry_path):
                                file_path = os.path.join(dir_path, file_name)
                                rel_path = file_path.split(root_path)[1]
                                rel_path = rel_path.lstrip('/')
                                new_children.append(rel_path.replace('/', '.'))
                        else:
                            new_children.append(entry)
                    return sorted(new_children)
                else:
                    return children
        except IOError:
            return []

    def rmdir(self, path=None):
        """Recursively remove the directory at ``path``, if it is one."""
        if self.mode == 'r':
            raise ReadOnlyError()
        store_path = self.dir_path(path)
        if self.fs.isdir(store_path):
            self.fs.rm(store_path, recursive=True)

    def getsize(self, path=None):
        """Return the size reported by ``fs.du`` for ``path``."""
        store_path = self.dir_path(path)
        return self.fs.du(store_path, True, True)

    def clear(self):
        """Remove every key from the store; refused in read-only mode."""
        if self.mode == 'r':
            raise ReadOnlyError()
        self.map.clear()
class TempStore(DirectoryStore):
    """Directory store using a temporary directory for storage.

    The directory is created with :func:`tempfile.mkdtemp` and registered
    for removal at interpreter exit.

    Parameters
    ----------
    suffix : string, optional
        Suffix for the temporary directory name.
    prefix : string, optional
        Prefix for the temporary directory name.
    dir : string, optional
        Path to parent directory in which to create temporary directory.
    normalize_keys : bool, optional
        If True, all store keys will be normalized to use lower case characters
        (e.g. 'foo' and 'FOO' will be treated as equivalent). This can be
        useful to avoid potential discrepancies between case-sensitive and
        case-insensitive file system. Default value is False.
    dimension_separator : {'.', '/'}, optional
        Separator placed between the dimensions of a chunk.

    """

    # noinspection PyShadowingBuiltins
    def __init__(self, suffix='', prefix='zarr', dir=None, normalize_keys=False,
                 dimension_separator=None):
        path = tempfile.mkdtemp(suffix=suffix, prefix=prefix, dir=dir)
        atexit.register(atexit_rmtree, path)
        super().__init__(path, normalize_keys=normalize_keys)
        # The argument used to be accepted but silently dropped. Apply it the
        # same way NestedDirectoryStore does, leaving the parent's value in
        # place when the caller did not specify one.
        if dimension_separator is not None:
            self._dimension_separator = dimension_separator
# Matches a complete dotted chunk key such as "0.1.2": a run of digits
# followed by one or more ".<digits>" groups.
_prog_ckey = re.compile(r'^(\d+)(\.\d+)+$')
# Matches a name consisting only of digits, such as "12".
_prog_number = re.compile(r'^\d+$')
class NestedDirectoryStore(DirectoryStore):
    """Storage class using directories and files on a standard file system, with
    special handling for chunk keys so that chunk files for multidimensional
    arrays are stored in a nested directory tree.

    Parameters
    ----------
    path : string
        Location of directory to use as the root of the storage hierarchy.
    normalize_keys : bool, optional
        If True, all store keys will be normalized to use lower case characters
        (e.g. 'foo' and 'FOO' will be treated as equivalent). This can be
        useful to avoid potential discrepancies between case-sensitive and
        case-insensitive file system. Default value is False.
    dimension_separator : {'/'}, optional
        Separator placed between the dimensions of a chunk.
        Only supports "/" unlike other implementations.

    Examples
    --------
    Store a single array::

        >>> import zarr
        >>> store = zarr.NestedDirectoryStore('data/array.zarr')
        >>> z = zarr.zeros((10, 10), chunks=(5, 5), store=store, overwrite=True)
        >>> z[...] = 42

    Each chunk of the array is stored as a separate file on the file system,
    note the multiple directory levels used for the chunk files::

        >>> import os
        >>> sorted(os.listdir('data/array.zarr'))
        ['.zarray', '0', '1']
        >>> sorted(os.listdir('data/array.zarr/0'))
        ['0', '1']
        >>> sorted(os.listdir('data/array.zarr/1'))
        ['0', '1']

    Store a group::

        >>> store = zarr.NestedDirectoryStore('data/group.zarr')
        >>> root = zarr.group(store=store, overwrite=True)
        >>> foo = root.create_group('foo')
        >>> bar = foo.zeros('bar', shape=(10, 10), chunks=(5, 5))
        >>> bar[...] = 42

    When storing a group, levels in the group hierarchy will correspond to
    directories on the file system, i.e.::

        >>> sorted(os.listdir('data/group.zarr'))
        ['.zgroup', 'foo']
        >>> sorted(os.listdir('data/group.zarr/foo'))
        ['.zgroup', 'bar']
        >>> sorted(os.listdir('data/group.zarr/foo/bar'))
        ['.zarray', '0', '1']
        >>> sorted(os.listdir('data/group.zarr/foo/bar/0'))
        ['0', '1']
        >>> sorted(os.listdir('data/group.zarr/foo/bar/1'))
        ['0', '1']

    Notes
    -----
    The :class:`DirectoryStore` class stores all chunk files for an array
    together in a single directory. On some file systems, the potentially large
    number of files in a single directory can cause performance issues. The
    :class:`NestedDirectoryStore` class provides an alternative where chunk
    files for multidimensional arrays will be organised into a directory
    hierarchy, thus reducing the number of files in any one directory.

    Safe to write in multiple threads or processes.

    """

    def __init__(self, path, normalize_keys=False, dimension_separator="/"):
        super().__init__(path, normalize_keys=normalize_keys)
        # ``None`` means "use the default", which for this store is "/"
        separator = "/" if dimension_separator is None else dimension_separator
        if separator != "/":
            raise ValueError(
                "NestedDirectoryStore only supports '/' as dimension_separator")
        self._dimension_separator = separator

    def __eq__(self, other):
        """Equal when ``other`` is a NestedDirectoryStore with the same path."""
        if not isinstance(other, NestedDirectoryStore):
            return False
        return self.path == other.path
# noinspection PyPep8Naming
class ZipStore(Store):
    """Storage class using a Zip file.

    Parameters
    ----------
    path : string
        Location of file.
    compression : integer, optional
        Compression method to use when writing to the archive.
    allowZip64 : bool, optional
        If True (the default) will create ZIP files that use the ZIP64
        extensions when the zipfile is larger than 2 GiB. If False
        will raise an exception when the ZIP file would require ZIP64
        extensions.
    mode : string, optional
        One of 'r' to read an existing file, 'w' to truncate and write a new
        file, 'a' to append to an existing file, or 'x' to exclusively create
        and write a new file.
    dimension_separator : {'.', '/'}, optional
        Separator placed between the dimensions of a chunk.

    Examples
    --------
    Store a single array::

        >>> import zarr
        >>> store = zarr.ZipStore('data/array.zip', mode='w')
        >>> z = zarr.zeros((10, 10), chunks=(5, 5), store=store)
        >>> z[...] = 42
        >>> store.close()  # don't forget to call this when you're done

    Store a group::

        >>> store = zarr.ZipStore('data/group.zip', mode='w')
        >>> root = zarr.group(store=store)
        >>> foo = root.create_group('foo')
        >>> bar = foo.zeros('bar', shape=(10, 10), chunks=(5, 5))
        >>> bar[...] = 42
        >>> store.close()  # don't forget to call this when you're done

    After modifying a ZipStore, the ``close()`` method must be called, otherwise
    essential data will not be written to the underlying Zip file. The ZipStore
    class also supports the context manager protocol, which ensures the ``close()``
    method is called on leaving the context, e.g.::

        >>> with zarr.ZipStore('data/array.zip', mode='w') as store:
        ...     z = zarr.zeros((10, 10), chunks=(5, 5), store=store)
        ...     z[...] = 42
        ...     # no need to call store.close()

    Notes
    -----
    Each chunk of an array is stored as a separate entry in the Zip file. Note
    that Zip files do not provide any way to remove or replace existing entries.
    If an attempt is made to replace an entry, then a warning is generated by
    the Python standard library about a duplicate Zip file entry. This can be
    triggered if you attempt to write data to a Zarr array more than once,
    e.g.::

        >>> store = zarr.ZipStore('data/example.zip', mode='w')
        >>> z = zarr.zeros(100, chunks=10, store=store)
        >>> # first write OK
        ... z[...] = 42
        >>> # second write generates warnings
        ... z[...] = 42  # doctest: +SKIP
        >>> store.close()

    This can also happen in a more subtle situation, where data are written only
    once to a Zarr array, but the write operations are not aligned with chunk
    boundaries, e.g.::

        >>> store = zarr.ZipStore('data/example.zip', mode='w')
        >>> z = zarr.zeros(100, chunks=10, store=store)
        >>> z[5:15] = 42
        >>> # write overlaps chunk previously written, generates warnings
        ... z[15:25] = 42  # doctest: +SKIP

    To avoid creating duplicate entries, only write data once, and align writes
    with chunk boundaries. This alignment is done automatically if you call
    ``z[...] = ...`` or create an array from existing data via :func:`zarr.array`.

    Alternatively, use a :class:`DirectoryStore` when writing the data, then
    manually Zip the directory and use the Zip file for subsequent reads.
    Take note that the files in the Zip file must be relative to the root of the
    Zarr archive. You may find it easier to create such a Zip file with ``7z``, e.g.::

        7z a -tzip archive.zarr.zip archive.zarr/.

    Safe to write in multiple threads but not in multiple processes.

    """

    _erasable = False

    def __init__(self, path, compression=zipfile.ZIP_STORED, allowZip64=True, mode='a',
                 dimension_separator=None):

        # store properties
        path = os.path.abspath(path)
        self.path = path
        self.compression = compression
        self.allowZip64 = allowZip64
        self.mode = mode
        self._dimension_separator = dimension_separator

        # Current understanding is that zipfile module in stdlib is not thread-safe,
        # and so locking is required for both read and write. However, this has not
        # been investigated in detail, perhaps no lock is needed if mode='r'.
        self.mutex = RLock()

        # open zip file
        self.zf = zipfile.ZipFile(path, mode=mode, compression=compression,
                                  allowZip64=allowZip64)

    def __getstate__(self):
        self.flush()
        # dimension_separator travels with the state so that an unpickled
        # store keeps the separator it was created with
        return (self.path, self.compression, self.allowZip64, self.mode,
                self._dimension_separator)

    def __setstate__(self, state):
        # states pickled before dimension_separator was included have only
        # four fields; accept both layouts
        path, compression, allowZip64, mode, *rest = state
        dimension_separator = rest[0] if rest else None
        # if initially opened with mode 'w' or 'x', re-open in mode 'a' so file doesn't
        # get clobbered
        if mode in ('w', 'x'):
            mode = 'a'
        self.__init__(path=path, compression=compression, allowZip64=allowZip64,
                      mode=mode, dimension_separator=dimension_separator)

    def close(self):
        """Closes the underlying zip file, ensuring all records are written."""
        with self.mutex:
            self.zf.close()

    def flush(self):
        """Closes the underlying zip file, ensuring all records are written,
        then re-opens the file for further modifications."""
        if self.mode != 'r':
            with self.mutex:
                self.zf.close()
                # N.B., re-open with mode 'a' regardless of initial mode so we don't wipe
                # what's been written
                self.zf = zipfile.ZipFile(self.path, mode='a',
                                          compression=self.compression,
                                          allowZip64=self.allowZip64)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def __getitem__(self, key):
        with self.mutex:
            with self.zf.open(key) as f:  # will raise KeyError
                return f.read()

    def __setitem__(self, key, value):
        if self.mode == 'r':
            raise ReadOnlyError()
        value = ensure_contiguous_ndarray_like(value).view("u1")
        with self.mutex:
            # writestr(key, value) writes with default permissions from
            # zipfile (600) that are too restrictive, build ZipInfo for
            # the key to work around limitation
            keyinfo = zipfile.ZipInfo(filename=key,
                                      date_time=time.localtime(time.time())[:6])
            keyinfo.compress_type = self.compression
            # ZipInfo stores names with "/" on every platform, so test for
            # that rather than os.sep; endswith() also copes with an empty name
            if keyinfo.filename.endswith('/'):
                keyinfo.external_attr = 0o40775 << 16   # drwxrwxr-x
                keyinfo.external_attr |= 0x10           # MS-DOS directory flag
            else:
                keyinfo.external_attr = 0o644 << 16     # ?rw-r--r--

            self.zf.writestr(keyinfo, value)

    def __delitem__(self, key):
        # entries cannot be removed from a Zip file
        raise NotImplementedError

    def __eq__(self, other):
        return (
            isinstance(other, ZipStore) and
            self.path == other.path and
            self.compression == other.compression and
            self.allowZip64 == other.allowZip64
        )

    def keylist(self):
        """Return a sorted list of every entry name in the archive."""
        with self.mutex:
            return sorted(self.zf.namelist())

    def keys(self):
        yield from self.keylist()

    def __iter__(self):
        return self.keys()

    def __len__(self):
        return sum(1 for _ in self.keys())

    def __contains__(self, key):
        try:
            with self.mutex:
                self.zf.getinfo(key)
        except KeyError:
            return False
        else:
            return True

    def listdir(self, path=None):
        """List the immediate children of ``path``, derived from the key names."""
        path = normalize_storage_path(path)
        return _listdir_from_keys(self, path)

    def getsize(self, path=None):
        """Return the compressed size in bytes of the entry or directory at ``path``.

        For a directory this is the total over its immediate children that
        are themselves archive entries. Returns 0 when nothing is found.
        """
        path = normalize_storage_path(path)
        with self.mutex:
            children = self.listdir(path)
            if children:
                size = 0
                for child in children:
                    if path:
                        name = path + '/' + child
                    else:
                        name = child
                    try:
                        info = self.zf.getinfo(name)
                    except KeyError:
                        # child is a sub-directory without its own entry
                        pass
                    else:
                        size += info.compress_size
                return size
            elif path:
                try:
                    info = self.zf.getinfo(path)
                    return info.compress_size
                except KeyError:
                    return 0
            else:
                return 0

    def clear(self):
        """Discard all contents by deleting the file and opening a fresh archive."""
        if self.mode == 'r':
            raise ReadOnlyError()
        with self.mutex:
            self.close()
            os.remove(self.path)
            self.zf = zipfile.ZipFile(self.path, mode=self.mode,
                                      compression=self.compression,
                                      allowZip64=self.allowZip64)
def migrate_1to2(store):
    """Migrate array metadata in `store` from Zarr format version 1 to
    version 2.

    Parameters
    ----------
    store : Store
        Store to be migrated.

    Raises
    ------
    KeyError
        If the 'meta' or 'attrs' key is missing from `store`, or if the
        compression named in the version 1 metadata is not a registered codec.

    Notes
    -----
    Version 1 did not support hierarchies, so this migration function will
    look for a single array in `store` and migrate the array metadata to
    version 2.

    All version 2 documents are built before the store is modified, and the
    version 1 keys are removed only after the new ones have been written, so
    a failure while converting leaves the store untouched.

    """

    # migrate metadata
    from zarr import meta_v1
    meta = meta_v1.decode_metadata(store['meta'])

    # add empty filters
    meta['filters'] = None

    # migration compression metadata
    compression = meta.pop('compression')
    compression_opts = meta.pop('compression_opts')
    if compression is None or compression == 'none':
        compressor_config = None
    else:
        codec_cls = codec_registry[compression]
        if isinstance(compression_opts, dict):
            compressor = codec_cls(**compression_opts)
        else:
            compressor = codec_cls(compression_opts)
        compressor_config = compressor.get_config()
    meta['compressor'] = compressor_config

    # encode migrated metadata, and read the user attributes, before
    # touching the store
    if hasattr(store, '_metadata_class'):
        encoded_meta = store._metadata_class.encode_array_metadata(meta)
    else:
        encoded_meta = encode_array_metadata(meta)
    attrs = store['attrs']

    # store migrated metadata and user attributes
    store[array_meta_key] = encoded_meta
    store[attrs_key] = attrs

    # only now drop the version 1 keys
    del store['meta']
    del store['attrs']
# noinspection PyShadowingBuiltins
class DBMStore(Store):
    """Storage class using a DBM-style database.

    Parameters
    ----------
    path : string
        Location of database file.
    flag : string, optional
        Flags for opening the database file.
    mode : int
        File mode used if a new file is created.
    open : function, optional
        Function to open the database file. If not provided, :func:`dbm.open` will be
        used.
    write_lock: bool, optional
        Use a lock to prevent concurrent writes from multiple threads (True by default).
    dimension_separator : {'.', '/'}, optional
        Separator placed between the dimensions of a chunk.
    **open_kwargs
        Keyword arguments to pass the `open` function.

    Examples
    --------
    Store a single array::

        >>> import zarr
        >>> store = zarr.DBMStore('data/array.db')
        >>> z = zarr.zeros((10, 10), chunks=(5, 5), store=store, overwrite=True)
        >>> z[...] = 42
        >>> store.close()  # don't forget to call this when you're done

    Store a group::

        >>> store = zarr.DBMStore('data/group.db')
        >>> root = zarr.group(store=store, overwrite=True)
        >>> foo = root.create_group('foo')
        >>> bar = foo.zeros('bar', shape=(10, 10), chunks=(5, 5))
        >>> bar[...] = 42
        >>> store.close()  # don't forget to call this when you're done

    After modifying a DBMStore, the ``close()`` method must be called, otherwise
    essential data may not be written to the underlying database file. The
    DBMStore class also supports the context manager protocol, which ensures the
    ``close()`` method is called on leaving the context, e.g.::

        >>> with zarr.DBMStore('data/array.db') as store:
        ...     z = zarr.zeros((10, 10), chunks=(5, 5), store=store, overwrite=True)
        ...     z[...] = 42
        ...     # no need to call store.close()

    A different database library can be used by passing a different function to
    the `open` parameter. For example, if the `bsddb3
    <https://www.jcea.es/programacion/pybsddb.htm>`_ package is installed, a
    Berkeley DB database can be used::

        >>> import bsddb3
        >>> store = zarr.DBMStore('data/array.bdb', open=bsddb3.btopen)
        >>> z = zarr.zeros((10, 10), chunks=(5, 5), store=store, overwrite=True)
        >>> z[...] = 42
        >>> store.close()

    Notes
    -----
    Please note that, by default, this class will use the Python standard
    library `dbm.open` function to open the database file. There are up to
    three different implementations of DBM-style
    databases available in any Python installation, and which one is used may
    vary from one system to another.  Database file formats are not compatible
    between these different implementations.  Also, some implementations are
    more efficient than others. In particular, the "dumb" implementation will be
    the fall-back on many systems, and has very poor performance for some usage
    scenarios. If you want to ensure a specific implementation is used, pass the
    corresponding open function, e.g., `dbm.gnu.open` to use the GNU DBM
    library.

    Safe to write in multiple threads. May be safe to write in multiple processes,
    depending on which DBM implementation is being used, although this has not been
    tested.

    """

    def __init__(self, path, flag='c', mode=0o666, open=None, write_lock=True,
                 dimension_separator=None,
                 **open_kwargs):
        if open is None:
            import dbm
            open = dbm.open
        path = os.path.abspath(path)
        # noinspection PyArgumentList
        self.db = open(path, flag, mode, **open_kwargs)
        self.path = path
        self.flag = flag
        self.mode = mode
        self.open = open
        self.write_lock = write_lock
        if write_lock:
            # This may not be required as some dbm implementations manage their own
            # locks, but err on the side of caution.
            self.write_mutex = Lock()
        else:
            self.write_mutex = nolock
        self.open_kwargs = open_kwargs
        self._dimension_separator = dimension_separator

    def __getstate__(self):
        try:
            self.flush()  # needed for ndbm
        except Exception:
            # flush may fail if db has already been closed
            pass
        # dimension_separator travels with the state so that an unpickled
        # store keeps the separator it was created with
        return (self.path, self.flag, self.mode, self.open, self.write_lock,
                self.open_kwargs, self._dimension_separator)

    def __setstate__(self, state):
        # states pickled before dimension_separator was included have only
        # six fields; accept both layouts
        path, flag, mode, open, write_lock, open_kws, *rest = state
        dimension_separator = rest[0] if rest else None
        if flag[0] == 'n':
            flag = 'c' + flag[1:]  # don't clobber an existing database
        self.__init__(path=path, flag=flag, mode=mode, open=open,
                      write_lock=write_lock,
                      dimension_separator=dimension_separator, **open_kws)

    def close(self):
        """Closes the underlying database file."""
        if hasattr(self.db, 'close'):
            with self.write_mutex:
                self.db.close()

    def flush(self):
        """Synchronizes data to the underlying database file."""
        if self.flag[0] != 'r':
            with self.write_mutex:
                if hasattr(self.db, 'sync'):
                    self.db.sync()
                else:  # pragma: no cover
                    # we don't cover this branch anymore as ndbm (oracle) is not packaged
                    # by conda-forge on non-mac OS:
                    # https://github.com/conda-forge/staged-recipes/issues/4476
                    # fall-back, close and re-open, needed for ndbm
                    flag = self.flag
                    if flag[0] == 'n':
                        flag = 'c' + flag[1:]  # don't clobber an existing database
                    self.db.close()
                    # noinspection PyArgumentList
                    self.db = self.open(self.path, flag, self.mode, **self.open_kwargs)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def __getitem__(self, key):
        # str keys are encoded to ASCII bytes before reaching the database
        if isinstance(key, str):
            key = key.encode("ascii")
        return self.db[key]

    def __setitem__(self, key, value):
        if isinstance(key, str):
            key = key.encode("ascii")
        value = ensure_bytes(value)
        with self.write_mutex:
            self.db[key] = value

    def __delitem__(self, key):
        if isinstance(key, str):
            key = key.encode("ascii")
        with self.write_mutex:
            del self.db[key]

    def __eq__(self, other):
        return (
            isinstance(other, DBMStore) and
            self.path == other.path and
            # allow flag and mode to differ
            self.open == other.open and
            self.open_kwargs == other.open_kwargs
        )

    def keys(self):
        # database keys are converted back to text for callers
        return (ensure_text(k, "ascii") for k in iter(self.db.keys()))

    def __iter__(self):
        return self.keys()

    def __len__(self):
        return sum(1 for _ in self.keys())

    def __contains__(self, key):
        if isinstance(key, str):
            key = key.encode("ascii")
        return key in self.db

    def rmdir(self, path: str = "") -> None:
        """Remove all keys under ``path`` via ``_rmdir_from_keys``."""
        path = normalize_storage_path(path)
        _rmdir_from_keys(self, path)
[docs]class LMDBStore(Store):
"""Storage class using LMDB. Requires the `lmdb <https://lmdb.readthedocs.io/>`_
package to be installed.
Parameters
----------
path : string
Location of database file.
buffers : bool, optional
If True (default) use support for buffers, which should increase performance by
reducing memory copies.
dimension_separator : {'.', '/'}, optional
Separator placed between the dimensions of a chunk.
**kwargs
Keyword arguments passed through to the `lmdb.open` function.
Examples
--------
Store a single array::
>>> import zarr
>>> store = zarr.LMDBStore('data/array.mdb')
>>> z = zarr.zeros((10, 10), chunks=(5, 5), store=store, overwrite=True)
>>> z[...] = 42
>>> store.close() # don't forget to call this when you're done
Store a group::
>>> store = zarr.LMDBStore('data/group.mdb')
>>> root = zarr.group(store=store, overwrite=True)
>>> foo = root.create_group('foo')
>>> bar = foo.zeros('bar', shape=(10, 10), chunks=(5, 5))
>>> bar[...] = 42
>>> store.close() # don't forget to call this when you're done
After modifying a DBMStore, the ``close()`` method must be called, otherwise
essential data may not be written to the underlying database file. The
DBMStore class also supports the context manager protocol, which ensures the
``close()`` method is called on leaving the context, e.g.::
>>> with zarr.LMDBStore('data/array.mdb') as store:
... z = zarr.zeros((10, 10), chunks=(5, 5), store=store, overwrite=True)
... z[...] = 42
... # no need to call store.close()
Notes
-----
By default writes are not immediately flushed to disk to increase performance. You
can ensure data are flushed to disk by calling the ``flush()`` or ``close()`` methods.
Should be safe to write in multiple threads or processes due to the synchronization
support within LMDB, although writing from multiple processes has not been tested.
"""
def __init__(self, path, buffers=True, dimension_separator=None, **kwargs):
import lmdb
# set default memory map size to something larger than the lmdb default, which is
# very likely to be too small for any moderate array (logic copied from zict)
map_size = (2**40 if sys.maxsize >= 2**32 else 2**28)
kwargs.setdefault('map_size', map_size)
# don't initialize buffers to zero by default, shouldn't be necessary
kwargs.setdefault('meminit', False)
# decide whether to use the writemap option based on the operating system's
# support for sparse files - writemap requires sparse file support otherwise
# the whole# `map_size` may be reserved up front on disk (logic copied from zict)
writemap = sys.platform.startswith('linux')
kwargs.setdefault('writemap', writemap)
# decide options for when data are flushed to disk - choose to delay syncing
# data to filesystem, otherwise pay a large performance penalty (zict also does
# this)
kwargs.setdefault('metasync', False)
kwargs.setdefault('sync', False)
kwargs.setdefault('map_async', False)
# set default option for number of cached transactions
max_spare_txns = multiprocessing.cpu_count()
kwargs.setdefault('max_spare_txns', max_spare_txns)
# normalize path
path = os.path.abspath(path)
# open database
self.db = lmdb.open(path, **kwargs)
# store properties
self.buffers = buffers
self.path = path
self.kwargs = kwargs
self._dimension_separator = dimension_separator
def __getstate__(self):
try:
self.flush() # just in case
except Exception:
# flush may fail if db has already been closed
pass
return self.path, self.buffers, self.kwargs
def __setstate__(self, state):
path, buffers, kwargs = state
self.__init__(path=path, buffers=buffers, **kwargs)
[docs] def close(self):
"""Closes the underlying database."""
self.db.close()
[docs] def flush(self):
"""Synchronizes data to the file system."""
self.db.sync()
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
def __getitem__(self, key):
if isinstance(key, str):
key = key.encode("ascii")
# use the buffers option, should avoid a memory copy
with self.db.begin(buffers=self.buffers) as txn:
value = txn.get(key)
if value is None:
raise KeyError(key)
return value
def __setitem__(self, key, value):
if isinstance(key, str):
key = key.encode("ascii")
with self.db.begin(write=True, buffers=self.buffers) as txn:
txn.put(key, value)
def __delitem__(self, key):
if isinstance(key, str):
key = key.encode("ascii")
with self.db.begin(write=True) as txn:
if not txn.delete(key):
raise KeyError(key)
def __contains__(self, key):
if isinstance(key, str):
key = key.encode("ascii")
with self.db.begin(buffers=self.buffers) as txn:
with txn.cursor() as cursor:
return cursor.set_key(key)
def items(self):
with self.db.begin(buffers=self.buffers) as txn:
with txn.cursor() as cursor:
for k, v in cursor.iternext(keys=True, values=True):
yield ensure_text(k, "ascii"), v
def keys(self):
with self.db.begin(buffers=self.buffers) as txn:
with txn.cursor() as cursor:
for k in cursor.iternext(keys=True, values=False):
yield ensure_text(k, "ascii")
def values(self):
with self.db.begin(buffers=self.buffers) as txn:
with txn.cursor() as cursor:
for v in cursor.iternext(keys=False, values=True):
yield v
def __iter__(self):
return self.keys()
def __len__(self):
    """Return the ``'entries'`` count reported by ``self.db.stat()``."""
    statistics = self.db.stat()
    return statistics['entries']
class LRUStoreCache(Store):
    """Storage class that implements a least-recently-used (LRU) cache layer over
    some other store. Intended primarily for use with stores that can be slow to
    access, e.g., remote stores that require network communication to store and
    retrieve data.

    Parameters
    ----------
    store : Store
        The store containing the actual data to be cached.
    max_size : int
        The maximum size that the cache may grow to, in number of bytes. Provide `None`
        if you would like the cache to have unlimited size.

    Examples
    --------
    The example below wraps an S3 store with an LRU cache::

        >>> import s3fs
        >>> import zarr
        >>> s3 = s3fs.S3FileSystem(anon=True, client_kwargs=dict(region_name='eu-west-2'))
        >>> store = s3fs.S3Map(root='zarr-demo/store', s3=s3, check=False)
        >>> cache = zarr.LRUStoreCache(store, max_size=2**28)
        >>> root = zarr.group(store=cache)  # doctest: +REMOTE_DATA
        >>> z = root['foo/bar/baz']  # doctest: +REMOTE_DATA
        >>> from timeit import timeit
        >>> # first data access is relatively slow, retrieved from store
        ... timeit('print(z[:].tobytes())', number=1, globals=globals())  # doctest: +SKIP
        b'Hello from the cloud!'
        0.1081731989979744
        >>> # second data access is faster, uses cache
        ... timeit('print(z[:].tobytes())', number=1, globals=globals())  # doctest: +SKIP
        b'Hello from the cloud!'
        0.0009490990014455747

    """

    def __init__(self, store: StoreLike, max_size: Optional[int]):
        self._store: BaseStore = BaseStore._ensure_store(store)
        self._max_size = max_size
        # running total of buffer_size() over everything in _values_cache
        self._current_size = 0
        # lazily populated snapshots of the underlying store's key space
        self._keys_cache = None
        self._contains_cache = None
        self._listdir_cache: Dict[Path, Any] = dict()
        # insertion order doubles as recency order: first item is the LRU one
        self._values_cache: Dict[Path, Any] = OrderedDict()
        self._mutex = Lock()
        self.hits = self.misses = 0

    def __getstate__(self):
        # the mutex cannot be pickled; it is recreated in __setstate__
        return (self._store, self._max_size, self._current_size, self._keys_cache,
                self._contains_cache, self._listdir_cache, self._values_cache, self.hits,
                self.misses)

    def __setstate__(self, state):
        (self._store, self._max_size, self._current_size, self._keys_cache,
         self._contains_cache, self._listdir_cache, self._values_cache, self.hits,
         self.misses) = state
        self._mutex = Lock()

    def __len__(self):
        # _keys() may populate the keys cache, so guard it like keys() does
        with self._mutex:
            return len(self._keys())

    def __iter__(self):
        return self.keys()

    def __contains__(self, key):
        with self._mutex:
            if self._contains_cache is None:
                self._contains_cache = set(self._keys())
            return key in self._contains_cache

    def clear(self):
        """Remove everything from the underlying store and drop all caches."""
        self._store.clear()
        self.invalidate()

    def keys(self):
        """Return an iterator over the (cached) keys of the underlying store."""
        with self._mutex:
            return iter(self._keys())

    def _keys(self):
        # callers are expected to hold the mutex
        if self._keys_cache is None:
            self._keys_cache = list(self._store.keys())
        return self._keys_cache

    def listdir(self, path: Optional[Path] = None):
        """Return the listing for `path`, consulting the underlying store only
        the first time a given path is requested."""
        with self._mutex:
            try:
                return self._listdir_cache[path]
            except KeyError:
                listing = listdir(self._store, path)
                self._listdir_cache[path] = listing
                return listing

    def getsize(self, path=None) -> int:
        """Return the size reported for `path` by the underlying store (not cached)."""
        return getsize(self._store, path=path)

    def _pop_value(self):
        # remove the first value from the cache, as this will be the least recently
        # used value
        _, v = self._values_cache.popitem(last=False)
        return v

    def _accommodate_value(self, value_size):
        if self._max_size is None:
            return
        # ensure there is enough space in the cache for a new value
        while self._current_size + value_size > self._max_size:
            v = self._pop_value()
            self._current_size -= buffer_size(v)

    def _cache_value(self, key: Path, value):
        # cache a value
        value_size = buffer_size(value)
        # check size of the value against max size, as if the value itself exceeds max
        # size then we are never going to cache it
        if self._max_size is None or value_size <= self._max_size:
            self._accommodate_value(value_size)
            self._values_cache[key] = value
            self._current_size += value_size

    def invalidate(self):
        """Completely clear the cache."""
        with self._mutex:
            self._values_cache.clear()
            self._invalidate_keys()
            self._current_size = 0

    def invalidate_values(self):
        """Clear the values cache."""
        with self._mutex:
            self._values_cache.clear()
            # keep the size accounting in step with the now-empty cache;
            # a stale total would make _accommodate_value try to evict from
            # an empty OrderedDict, and popitem() raises KeyError there
            self._current_size = 0

    def invalidate_keys(self):
        """Clear the keys cache."""
        with self._mutex:
            self._invalidate_keys()

    def _invalidate_keys(self):
        self._keys_cache = None
        self._contains_cache = None
        self._listdir_cache.clear()

    def _invalidate_value(self, key):
        if key in self._values_cache:
            value = self._values_cache.pop(key)
            self._current_size -= buffer_size(value)

    def __getitem__(self, key):
        try:
            # first try to obtain the value from the cache
            with self._mutex:
                value = self._values_cache[key]
                # cache hit if no KeyError is raised
                self.hits += 1
                # treat the end as most recently used
                self._values_cache.move_to_end(key)

        except KeyError:
            # cache miss, retrieve value from the store
            value = self._store[key]
            with self._mutex:
                self.misses += 1
                # need to check if key is not in the cache, as it may have been cached
                # while we were retrieving the value from the store
                if key not in self._values_cache:
                    self._cache_value(key, value)

        return value

    def __setitem__(self, key, value):
        # write through to the store first, then refresh the cached copy
        self._store[key] = value
        with self._mutex:
            self._invalidate_keys()
            self._invalidate_value(key)
            self._cache_value(key, value)

    def __delitem__(self, key):
        del self._store[key]
        with self._mutex:
            self._invalidate_keys()
            self._invalidate_value(key)
class SQLiteStore(Store):
    """Storage class using SQLite.

    Parameters
    ----------
    path : string
        Location of database file.
    dimension_separator : {'.', '/'}, optional
        Separator placed between the dimensions of a chunk.
    **kwargs
        Keyword arguments passed through to the `sqlite3.connect` function.

    Examples
    --------
    Store a single array::

        >>> import zarr
        >>> store = zarr.SQLiteStore('data/array.sqldb')
        >>> z = zarr.zeros((10, 10), chunks=(5, 5), store=store, overwrite=True)
        >>> z[...] = 42
        >>> store.close()  # don't forget to call this when you're done

    Store a group::

        >>> store = zarr.SQLiteStore('data/group.sqldb')
        >>> root = zarr.group(store=store, overwrite=True)
        >>> foo = root.create_group('foo')
        >>> bar = foo.zeros('bar', shape=(10, 10), chunks=(5, 5))
        >>> bar[...] = 42
        >>> store.close()  # don't forget to call this when you're done
    """

    def __init__(self, path, dimension_separator=None, **kwargs):
        import sqlite3

        self._dimension_separator = dimension_separator

        # normalize path
        if path != ':memory:':
            path = os.path.abspath(path)

        # store properties
        self.path = path
        self.kwargs = kwargs

        # allow threading if SQLite connections are thread-safe
        #
        # ref: https://www.sqlite.org/releaselog/3_3_1.html
        # ref: https://github.com/python/cpython/issues/71377
        check_same_thread = True
        if sqlite3.sqlite_version_info >= (3, 3, 1):
            check_same_thread = False

        # keep a lock for serializing mutable operations
        self.lock = Lock()

        # open database
        self.db = sqlite3.connect(
            self.path,
            detect_types=0,
            isolation_level=None,
            check_same_thread=check_same_thread,
            **self.kwargs
        )

        # handle keys as `str`s
        self.db.text_factory = str

        # cursor used for write statements (always under self.lock)
        self.cursor = self.db.cursor()

        # initialize database with our table if missing
        with self.lock:
            self.cursor.execute(
                'CREATE TABLE IF NOT EXISTS zarr(k TEXT PRIMARY KEY, v BLOB)'
            )

    def __getstate__(self):
        if self.path == ':memory:':
            raise PicklingError('Cannot pickle in-memory SQLite databases')
        return self.path, self.kwargs, self._dimension_separator

    def __setstate__(self, state):
        # accept the legacy two-element (path, kwargs) state as well
        path, kwargs, *extra = state
        dimension_separator = extra[0] if extra else None
        self.__init__(path=path, dimension_separator=dimension_separator, **kwargs)

    def close(self):
        """Closes the underlying database."""
        # close cursor and db objects
        self.cursor.close()
        self.db.close()

    def _fetch_rows(self, sql, params=()):
        """Run a read query on a private cursor and return all of its rows.

        Reads do not go through ``self.cursor``: re-executing on a shared
        cursor discards the result set of any iteration still in progress.
        """
        cursor = self.db.cursor()
        try:
            return cursor.execute(sql, params).fetchall()
        finally:
            cursor.close()

    def _iter_rows(self, sql):
        """Lazily yield the rows of a read query from a private cursor, which
        is closed once the generator is exhausted or discarded."""
        cursor = self.db.cursor()
        try:
            yield from cursor.execute(sql)
        finally:
            cursor.close()

    def __getitem__(self, key):
        rows = self._fetch_rows('SELECT v FROM zarr WHERE (k = ?)', (key,))
        if not rows:
            raise KeyError(key)
        return rows[0][0]

    def __setitem__(self, key, value):
        self.update({key: value})

    def __delitem__(self, key):
        with self.lock:
            self.cursor.execute('DELETE FROM zarr WHERE (k = ?)', (key,))
            if self.cursor.rowcount < 1:
                raise KeyError(key)

    def __contains__(self, key):
        rows = self._fetch_rows(
            'SELECT COUNT(*) FROM zarr WHERE (k = ?)', (key,)
        )
        return bool(rows[0][0])

    def items(self):
        for k, v in self._iter_rows('SELECT k, v FROM zarr'):
            yield k, v

    def keys(self):
        for k, in self._iter_rows('SELECT k FROM zarr'):
            yield k

    def values(self):
        for v, in self._iter_rows('SELECT v FROM zarr'):
            yield v

    def __iter__(self):
        return self.keys()

    def __len__(self):
        return self._fetch_rows('SELECT COUNT(*) FROM zarr')[0][0]

    def update(self, *args, **kwargs):
        """Insert or replace many key/value pairs with one ``executemany`` call.

        Each positional argument, and the keyword arguments taken together,
        must provide ``.items()``.
        """
        args += (kwargs,)

        kv_list = []
        for dct in args:
            for k, v in dct.items():
                v = ensure_contiguous_ndarray_like(v)

                # Accumulate key-value pairs for storage
                kv_list.append((k, v))

        with self.lock:
            self.cursor.executemany('REPLACE INTO zarr VALUES (?, ?)', kv_list)

    def listdir(self, path=None):
        """Return the sorted, distinct first path components found below `path`."""
        path = normalize_storage_path(path)
        # compare the prefix literally: LIKE is case-insensitive for ASCII and
        # would treat '_' and '%' inside the path as wildcards
        prefix = path + '/' if path else ''
        rows = self._fetch_rows(
            '''
            SELECT DISTINCT SUBSTR(m, 0, INSTR(m, '/')) AS l FROM (
                SELECT LTRIM(SUBSTR(k, LENGTH(:prefix) + 1), '/') || '/' AS m
                FROM zarr
                WHERE LENGTH(k) > 0 AND SUBSTR(k, 1, LENGTH(:prefix)) = :prefix
            ) ORDER BY l ASC
            ''',
            {'prefix': prefix}
        )
        return [row[0] for row in rows]

    def getsize(self, path=None):
        """Return the total byte length of the value stored at `path` itself
        plus the values stored directly beneath it (nested levels excluded)."""
        path = normalize_storage_path(path)
        prefix = path + '/' if path else ''
        rows = self._fetch_rows(
            '''
            SELECT COALESCE(SUM(LENGTH(v)), 0) FROM zarr
            WHERE (k = :path OR SUBSTR(k, 1, LENGTH(:prefix)) = :prefix) AND
                  0 == INSTR(LTRIM(SUBSTR(k, LENGTH(:path) + 1), '/'), '/')
            ''',
            {'path': path, 'prefix': prefix}
        )
        return rows[0][0]

    def rmdir(self, path=None):
        """Delete every key below `path`; an empty path clears the whole store."""
        path = normalize_storage_path(path)
        if path:
            with self.lock:
                # literal prefix match, so sibling keys that merely resemble
                # the path under LIKE rules are never deleted
                self.cursor.execute(
                    "DELETE FROM zarr WHERE SUBSTR(k, 1, LENGTH(:prefix)) = :prefix",
                    {'prefix': path + '/'}
                )
        else:
            self.clear()

    def clear(self):
        """Remove all items from the store."""
        with self.lock:
            # DELETE rather than DROP TABLE: a partially consumed
            # keys()/items()/values() generator may still have a reader open
            # on the table; assumed tolerant of that — TODO confirm
            self.cursor.execute('DELETE FROM zarr')
class MongoDBStore(Store):
    """Storage class using MongoDB.

    .. note:: This is an experimental feature.

    Requires the `pymongo <https://pymongo.readthedocs.io/en/stable/>`_
    package to be installed.

    Parameters
    ----------
    database : string
        Name of database
    collection : string
        Name of collection
    dimension_separator : {'.', '/'}, optional
        Separator placed between the dimensions of a chunk.
    **kwargs
        Keyword arguments passed through to the `pymongo.MongoClient` function.

    Notes
    -----
    The maximum chunksize in MongoDB documents is 16 MB.

    """

    # document field names under which the key and the value are stored
    _key = 'key'
    _value = 'value'

    def __init__(self, database='mongodb_zarr', collection='zarr_collection',
                 dimension_separator=None, **kwargs):
        import pymongo

        self._database = database
        self._collection = collection
        self._dimension_separator = dimension_separator
        self._kwargs = kwargs

        self.client = pymongo.MongoClient(**self._kwargs)
        self.db = self.client.get_database(self._database)
        self.collection = self.db.get_collection(self._collection)

    def __getitem__(self, key):
        doc = self.collection.find_one({self._key: key})

        if doc is None:
            raise KeyError(key)
        return doc[self._value]

    def __setitem__(self, key, value):
        value = ensure_bytes(value)
        self.collection.replace_one({self._key: key},
                                    {self._key: key, self._value: value},
                                    upsert=True)

    def __delitem__(self, key):
        result = self.collection.delete_many({self._key: key})
        # only a delete that removed nothing means the key was absent
        if result.deleted_count < 1:
            raise KeyError(key)

    def __iter__(self):
        # project the key field only, so stored values are not transferred
        # just to enumerate the keys
        for doc in self.collection.find({}, projection=[self._key]):
            yield doc[self._key]

    def __len__(self):
        return self.collection.count_documents({})

    def __getstate__(self):
        return (self._database, self._collection, self._kwargs,
                self._dimension_separator)

    def __setstate__(self, state):
        # accept the legacy three-element state (no dimension separator) too
        database, collection, kwargs, *extra = state
        dimension_separator = extra[0] if extra else None
        self.__init__(database=database, collection=collection,
                      dimension_separator=dimension_separator, **kwargs)

    def close(self):
        """Cleanup client resources and disconnect from MongoDB."""
        self.client.close()

    def clear(self):
        """Remove all items from store."""
        self.collection.delete_many({})
class RedisStore(Store):
    """Storage class using Redis.

    .. note:: This is an experimental feature.

    Requires the `redis <https://redis-py.readthedocs.io/>`_
    package to be installed.

    Parameters
    ----------
    prefix : string
        Name of prefix for Redis keys
    dimension_separator : {'.', '/'}, optional
        Separator placed between the dimensions of a chunk.
    **kwargs
        Keyword arguments passed through to the `redis.Redis` function.

    """

    def __init__(self, prefix='zarr', dimension_separator=None, **kwargs):
        import redis
        self._prefix = prefix
        self._kwargs = kwargs
        self._dimension_separator = dimension_separator

        self.client = redis.Redis(**kwargs)

    def _key(self, key):
        # namespace every store key as "<prefix>:<key>"
        return '{prefix}:{key}'.format(prefix=self._prefix, key=key)

    def __getitem__(self, key):
        return self.client[self._key(key)]

    def __setitem__(self, key, value):
        value = ensure_bytes(value)
        self.client[self._key(key)] = value

    def __delitem__(self, key):
        count = self.client.delete(self._key(key))
        if not count:
            raise KeyError(key)

    def keylist(self):
        """Return all store keys as a list of ``str`` with the prefix removed."""
        offset = len(self._key(''))  # length of prefix, in characters
        # decode before slicing: the offset counts characters, not bytes, and
        # keys returned by the client may already be text
        return [ensure_text(key)[offset:]
                for key in self.client.keys(self._key('*'))]

    def keys(self):
        for key in self.keylist():
            yield key

    def __iter__(self):
        for key in self.keys():
            yield key

    def __len__(self):
        return len(self.keylist())

    def __getstate__(self):
        return self._prefix, self._kwargs, self._dimension_separator

    def __setstate__(self, state):
        # accept the legacy two-element (prefix, kwargs) state as well
        prefix, kwargs, *extra = state
        dimension_separator = extra[0] if extra else None
        self.__init__(prefix=prefix, dimension_separator=dimension_separator,
                      **kwargs)

    def clear(self):
        """Remove all items stored under this store's prefix."""
        for key in self.keys():
            del self[key]