"""Convenience functions for storing and loading data."""
import itertools
import os
import re
from collections.abc import Mapping, MutableMapping
from zarr._storage.store import data_root, meta_root, assert_zarr_v3_api_available
from zarr.core import Array
from zarr.creation import array as _create_array
from zarr.creation import open_array
from zarr.errors import CopyError, PathNotFoundError
from zarr.hierarchy import Group
from zarr.hierarchy import group as _create_group
from zarr.hierarchy import open_group
from zarr.meta import json_dumps, json_loads
from zarr.storage import (
_get_metadata_suffix,
contains_array,
contains_group,
normalize_store_arg,
BaseStore,
ConsolidatedMetadataStore,
)
from zarr._storage.v3 import ConsolidatedMetadataStoreV3
from zarr.util import TreeViewer, buffer_size, normalize_storage_path
from typing import Union
StoreLike = Union[BaseStore, MutableMapping, str, None]
_builtin_open = open # builtin open is later shadowed by a local open function
def _check_and_update_path(store: BaseStore, path):
if getattr(store, "_store_version", 2) > 2 and not path:
raise ValueError("path must be provided for v3 stores")
return normalize_storage_path(path)
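# Illustration (not executed): for a v2 store the helper simply normalizes the
# path, e.g. _check_and_update_path(v2_store, 'foo/') returns 'foo'; for a v3
# store an empty path raises ValueError, since v3 stores require an explicit
# path.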
# noinspection PyShadowingBuiltins
def open(store: StoreLike = None, mode: str = "a", *, zarr_version=None, path=None, **kwargs):
"""Convenience function to open a group or array using file-mode-like semantics.
Parameters
----------
store : Store or string, optional
Store or path to directory in file system or name of zip file.
mode : {'r', 'r+', 'a', 'w', 'w-'}, optional
Persistence mode: 'r' means read only (must exist); 'r+' means
read/write (must exist); 'a' means read/write (create if doesn't
exist); 'w' means create (overwrite if exists); 'w-' means create
(fail if exists).
zarr_version : {2, 3, None}, optional
The zarr protocol version to use. The default value of None will attempt
to infer the version from `store` if possible, otherwise it will fall
back to 2.
.. warning:: `zarr_version=3` is currently using the experimental Zarr V3
implementation. This implementation is not in sync with the final specification
and will be replaced with a spec-compliant version in zarr-python 3.0.
path : str or None, optional
The path within the store to open.
**kwargs
Additional parameters are passed through to :func:`zarr.creation.open_array` or
:func:`zarr.hierarchy.open_group`.
Returns
-------
z : :class:`zarr.core.Array` or :class:`zarr.hierarchy.Group`
Array or group, depending on what exists in the given store.
See Also
--------
zarr.creation.open_array, zarr.hierarchy.open_group
Examples
--------
Storing data in a directory 'data/example.zarr' on the local file system::
>>> import zarr
>>> store = 'data/example.zarr'
>>> zw = zarr.open(store, mode='w', shape=100, dtype='i4') # open new array
>>> zw
<zarr.core.Array (100,) int32>
>>> za = zarr.open(store, mode='a') # open existing array for reading and writing
>>> za
<zarr.core.Array (100,) int32>
>>> zr = zarr.open(store, mode='r') # open existing array read-only
>>> zr
<zarr.core.Array (100,) int32 read-only>
>>> gw = zarr.open(store, mode='w') # open new group, overwriting previous data
>>> gw
<zarr.hierarchy.Group '/'>
>>> ga = zarr.open(store, mode='a') # open existing group for reading and writing
>>> ga
<zarr.hierarchy.Group '/'>
>>> gr = zarr.open(store, mode='r') # open existing group read-only
>>> gr
<zarr.hierarchy.Group '/' read-only>
"""
# handle polymorphic store arg
# we pass storage options explicitly, since normalize_store_arg might construct
# a store if the input is a fsspec-compatible URL
_store: BaseStore = normalize_store_arg(
store,
storage_options=kwargs.pop("storage_options", {}),
mode=mode,
zarr_version=zarr_version,
)
# path = _check_and_update_path(_store, path)
path = normalize_storage_path(path)
kwargs["path"] = path
if mode in {"w", "w-", "x"}:
if "shape" in kwargs:
return open_array(_store, mode=mode, **kwargs)
else:
return open_group(_store, mode=mode, **kwargs)
elif mode == "a":
if "shape" in kwargs or contains_array(_store, path):
return open_array(_store, mode=mode, **kwargs)
else:
return open_group(_store, mode=mode, **kwargs)
else:
if contains_array(_store, path):
return open_array(_store, mode=mode, **kwargs)
elif contains_group(_store, path):
return open_group(_store, mode=mode, **kwargs)
else:
raise PathNotFoundError(path)
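# Note on the dispatch above: with mode='a', passing a ``shape`` keyword selects
# an array (creating it if needed), e.g. zarr.open('data/example.zarr', mode='a',
# shape=100); without ``shape``, whatever already exists at ``path`` is opened,
# falling back to creating a group.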
def _might_close(path):
return isinstance(path, (str, os.PathLike))
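# Only stores constructed here from a path may need closing on our side, e.g.
# _might_close('data/example.zip') is True, whereas a store object passed in by
# the caller is left for the caller to close.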
def save_array(store: StoreLike, arr, *, zarr_version=None, path=None, **kwargs):
"""Convenience function to save a NumPy array to the local file system, following a
similar API to the NumPy save() function.
Parameters
----------
store : MutableMapping or string
Store or path to directory in file system or name of zip file.
arr : ndarray
NumPy array with data to save.
zarr_version : {2, 3, None}, optional
The zarr protocol version to use when saving. The default value of None
will attempt to infer the version from `store` if possible, otherwise
it will fall back to 2.
.. warning:: `zarr_version=3` is currently using the experimental Zarr V3
implementation. This implementation is not in sync with the final specification
and will be replaced with a spec-compliant version in zarr-python 3.0.
path : str or None, optional
The path within the store where the array will be saved.
kwargs
Passed through to :func:`create`, e.g., compressor.
Examples
--------
Save an array to a directory on the file system (uses a :class:`DirectoryStore`)::
>>> import zarr
>>> import numpy as np
>>> arr = np.arange(10000)
>>> zarr.save_array('data/example.zarr', arr)
>>> zarr.load('data/example.zarr')
array([   0,    1,    2, ..., 9997, 9998, 9999])
Save an array to a single file (uses a :class:`ZipStore`)::
>>> zarr.save_array('data/example.zip', arr)
>>> zarr.load('data/example.zip')
array([   0,    1,    2, ..., 9997, 9998, 9999])
"""
may_need_closing = _might_close(store)
_store: BaseStore = normalize_store_arg(store, mode="w", zarr_version=zarr_version)
path = _check_and_update_path(_store, path)
try:
_create_array(
arr, store=_store, overwrite=True, zarr_version=zarr_version, path=path, **kwargs
)
finally:
if may_need_closing:
# needed to ensure zip file records are written
_store.close()
def save_group(store: StoreLike, *args, zarr_version=None, path=None, **kwargs):
"""Convenience function to save several NumPy arrays to the local file system, following a
similar API to the NumPy savez()/savez_compressed() functions.
Parameters
----------
store : MutableMapping or string
Store or path to directory in file system or name of zip file.
args : ndarray
NumPy arrays with data to save.
zarr_version : {2, 3, None}, optional
The zarr protocol version to use when saving. The default value of None
will attempt to infer the version from `store` if possible, otherwise
it will fall back to 2.
.. warning:: `zarr_version=3` is currently using the experimental Zarr V3
implementation. This implementation is not in sync with the final specification
and will be replaced with a spec-compliant version in zarr-python 3.0.
path : str or None, optional
Path within the store where the group will be saved.
kwargs
NumPy arrays with data to save.
Examples
--------
Save several arrays to a directory on the file system (uses a
:class:`DirectoryStore`)::
>>> import zarr
>>> import numpy as np
>>> a1 = np.arange(10000)
>>> a2 = np.arange(10000, 0, -1)
>>> zarr.save_group('data/example.zarr', a1, a2)
>>> loader = zarr.load('data/example.zarr')
>>> loader
<LazyLoader: arr_0, arr_1>
>>> loader['arr_0']
array([   0,    1,    2, ..., 9997, 9998, 9999])
>>> loader['arr_1']
array([10000,  9999,  9998, ...,     3,     2,     1])
Save several arrays using named keyword arguments::
>>> zarr.save_group('data/example.zarr', foo=a1, bar=a2)
>>> loader = zarr.load('data/example.zarr')
>>> loader
<LazyLoader: bar, foo>
>>> loader['foo']
array([   0,    1,    2, ..., 9997, 9998, 9999])
>>> loader['bar']
array([10000,  9999,  9998, ...,     3,     2,     1])
Store several arrays in a single zip file (uses a :class:`ZipStore`)::
>>> zarr.save_group('data/example.zip', foo=a1, bar=a2)
>>> loader = zarr.load('data/example.zip')
>>> loader
<LazyLoader: bar, foo>
>>> loader['foo']
array([   0,    1,    2, ..., 9997, 9998, 9999])
>>> loader['bar']
array([10000,  9999,  9998, ...,     3,     2,     1])
Notes
-----
Default compression options will be used.
"""
if len(args) == 0 and len(kwargs) == 0:
raise ValueError("at least one array must be provided")
# handle polymorphic store arg
may_need_closing = _might_close(store)
_store: BaseStore = normalize_store_arg(store, mode="w", zarr_version=zarr_version)
path = _check_and_update_path(_store, path)
try:
grp = _create_group(_store, path=path, overwrite=True, zarr_version=zarr_version)
for i, arr in enumerate(args):
k = f"arr_{i}"
grp.create_dataset(k, data=arr, overwrite=True, zarr_version=zarr_version)
for k, arr in kwargs.items():
grp.create_dataset(k, data=arr, overwrite=True, zarr_version=zarr_version)
finally:
if may_need_closing:
# needed to ensure zip file records are written
_store.close()
def save(store: StoreLike, *args, zarr_version=None, path=None, **kwargs):
"""Convenience function to save an array or group of arrays to the local file system.
Parameters
----------
store : MutableMapping or string
Store or path to directory in file system or name of zip file.
args : ndarray
NumPy arrays with data to save.
zarr_version : {2, 3, None}, optional
The zarr protocol version to use when saving. The default value of None
will attempt to infer the version from `store` if possible, otherwise
it will fall back to 2.
.. warning:: `zarr_version=3` is currently using the experimental Zarr V3
implementation. This implementation is not in sync with the final specification
and will be replaced with a spec-compliant version in zarr-python 3.0.
path : str or None, optional
The path within the group where the arrays will be saved.
kwargs
NumPy arrays with data to save.
Examples
--------
Save an array to a directory on the file system (uses a :class:`DirectoryStore`)::
>>> import zarr
>>> import numpy as np
>>> arr = np.arange(10000)
>>> zarr.save('data/example.zarr', arr)
>>> zarr.load('data/example.zarr')
array([   0,    1,    2, ..., 9997, 9998, 9999])
Save an array to a Zip file (uses a :class:`ZipStore`)::
>>> zarr.save('data/example.zip', arr)
>>> zarr.load('data/example.zip')
array([   0,    1,    2, ..., 9997, 9998, 9999])
Save several arrays to a directory on the file system (uses a
:class:`DirectoryStore` and stores arrays in a group)::
>>> import zarr
>>> import numpy as np
>>> a1 = np.arange(10000)
>>> a2 = np.arange(10000, 0, -1)
>>> zarr.save('data/example.zarr', a1, a2)
>>> loader = zarr.load('data/example.zarr')
>>> loader
<LazyLoader: arr_0, arr_1>
>>> loader['arr_0']
array([   0,    1,    2, ..., 9997, 9998, 9999])
>>> loader['arr_1']
array([10000,  9999,  9998, ...,     3,     2,     1])
Save several arrays using named keyword arguments::
>>> zarr.save('data/example.zarr', foo=a1, bar=a2)
>>> loader = zarr.load('data/example.zarr')
>>> loader
<LazyLoader: bar, foo>
>>> loader['foo']
array([   0,    1,    2, ..., 9997, 9998, 9999])
>>> loader['bar']
array([10000,  9999,  9998, ...,     3,     2,     1])
Store several arrays in a single zip file (uses a :class:`ZipStore`)::
>>> zarr.save('data/example.zip', foo=a1, bar=a2)
>>> loader = zarr.load('data/example.zip')
>>> loader
<LazyLoader: bar, foo>
>>> loader['foo']
array([   0,    1,    2, ..., 9997, 9998, 9999])
>>> loader['bar']
array([10000,  9999,  9998, ...,     3,     2,     1])
See Also
--------
save_array, save_group
"""
if len(args) == 0 and len(kwargs) == 0:
raise ValueError("at least one array must be provided")
if len(args) == 1 and len(kwargs) == 0:
save_array(store, args[0], zarr_version=zarr_version, path=path)
else:
save_group(store, *args, zarr_version=zarr_version, path=path, **kwargs)
class LazyLoader(Mapping):
def __init__(self, grp):
self.grp = grp
self.cache = dict()
def __getitem__(self, item):
try:
return self.cache[item]
except KeyError:
arr = self.grp[item][...]
self.cache[item] = arr
return arr
def __len__(self):
return len(self.grp)
def __iter__(self):
return iter(self.grp)
def __contains__(self, item):
return item in self.grp
def __repr__(self):
r = "<LazyLoader: "
r += ", ".join(sorted(self.grp.array_keys()))
r += ">"
return r
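# Illustration of the caching behaviour above (hypothetical ``grp`` group):
#
#     loader = LazyLoader(grp)
#     a = loader['arr_0']  # first access reads the array from the store
#     b = loader['arr_0']  # second access returns the cached numpy array
#     assert a is b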
def load(store: StoreLike, zarr_version=None, path=None):
"""Load data from an array or group into memory.
Parameters
----------
store : MutableMapping or string
Store or path to directory in file system or name of zip file.
zarr_version : {2, 3, None}, optional
The zarr protocol version to use when loading. The default value of
None will attempt to infer the version from `store` if possible,
otherwise it will fall back to 2.
.. warning:: `zarr_version=3` is currently using the experimental Zarr V3
implementation. This implementation is not in sync with the final specification
and will be replaced with a spec-compliant version in zarr-python 3.0.
path : str or None, optional
The path within the store from which to load.
Returns
-------
out
If the store contains an array, out will be a numpy array. If the store contains
a group, out will be a dict-like object where keys are array names and values
are numpy arrays.
See Also
--------
save, save_group
Notes
-----
If loading data from a group of arrays, data will not be immediately loaded into
memory. Rather, arrays will be loaded into memory as they are requested.
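Examples
--------
A short round trip, mirroring the :func:`save_array` examples above::
>>> import zarr
>>> import numpy as np
>>> zarr.save_array('data/example.zarr', np.arange(10000))
>>> zarr.load('data/example.zarr')
array([   0,    1,    2, ..., 9997, 9998, 9999])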
"""
# handle polymorphic store arg
_store = normalize_store_arg(store, zarr_version=zarr_version)
path = _check_and_update_path(_store, path)
if contains_array(_store, path=path):
return Array(store=_store, path=path)[...]
elif contains_group(_store, path=path):
grp = Group(store=_store, path=path)
return LazyLoader(grp)
def tree(grp, expand=False, level=None):
"""Provide a ``print``-able display of the hierarchy. This function is provided
mainly as a convenience for obtaining a tree view of an h5py group - zarr groups
have a ``.tree()`` method.
Parameters
----------
grp : Group
Zarr or h5py group.
expand : bool, optional
Only relevant for HTML representation. If True, tree will be fully expanded.
level : int, optional
Maximum depth to descend into hierarchy.
Examples
--------
>>> import zarr
>>> g1 = zarr.group()
>>> g2 = g1.create_group('foo')
>>> g3 = g1.create_group('bar')
>>> g4 = g3.create_group('baz')
>>> g5 = g3.create_group('qux')
>>> d1 = g5.create_dataset('baz', shape=100, chunks=10)
>>> g1.tree()
/
 ├── bar
 │   ├── baz
 │   └── qux
 │       └── baz (100,) float64
 └── foo
>>> import h5py
>>> h5f = h5py.File('data/example.h5', mode='w')
>>> zarr.copy_all(g1, h5f)
(5, 0, 800)
>>> zarr.tree(h5f)
/
 ├── bar
 │   ├── baz
 │   └── qux
 │       └── baz (100,) float64
 └── foo
See Also
--------
zarr.hierarchy.Group.tree
Notes
-----
Please note that this is an experimental feature. The behaviour of this
function is still evolving and the default output and/or parameters may change
in future versions.
"""
return TreeViewer(grp, expand=expand, level=level)
class _LogWriter:
def __init__(self, log):
self.log_func = None
self.log_file = None
self.needs_closing = False
if log is None:
# don't do any logging
pass
elif callable(log):
self.log_func = log
elif isinstance(log, str):
self.log_file = _builtin_open(log, mode="w")
self.needs_closing = True
elif hasattr(log, "write"):
self.log_file = log
else:
raise TypeError(
f"log must be a callable function, file path or file-like object, found {log!r}"
)
def __enter__(self):
return self
def __exit__(self, *args):
if self.log_file is not None and self.needs_closing:
self.log_file.close()
def __call__(self, *args, **kwargs):
if self.log_file is not None:
kwargs["file"] = self.log_file
print(*args, **kwargs)
if hasattr(self.log_file, "flush"):
# get immediate feedback
self.log_file.flush()
elif self.log_func is not None:
self.log_func(*args, **kwargs)
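# Illustration (hypothetical): _LogWriter accepts a callable, a file path or a
# file-like object, so either of the following logs progress messages:
#
#     with _LogWriter(print) as log:        # callable
#         log("copy foo")
#     with _LogWriter('copy.log') as log:   # file path, closed on exit
#         log("copy foo")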
def _log_copy_summary(log, dry_run, n_copied, n_skipped, n_bytes_copied):
# log a final message with a summary of what happened
if dry_run:
message = "dry run: "
else:
message = "all done: "
message += f"{n_copied:,} copied, {n_skipped:,} skipped"
if not dry_run:
message += f", {n_bytes_copied:,} bytes copied"
log(message)
def copy_store(
source,
dest,
source_path="",
dest_path="",
excludes=None,
includes=None,
flags=0,
if_exists="raise",
dry_run=False,
log=None,
):
"""Copy data directly from the `source` store to the `dest` store. Use this
function when you want to copy a group or array in the most efficient way,
preserving all configuration and attributes. This function is more efficient
than the copy() or copy_all() functions because it avoids de-compressing and
re-compressing data; rather, the compressed chunk data for each array are
copied directly between stores.
Parameters
----------
source : Mapping
Store to copy data from.
dest : MutableMapping
Store to copy data into.
source_path : str, optional
Only copy data from under this path in the source store.
dest_path : str, optional
Copy data into this path in the destination store.
excludes : sequence of str, optional
One or more regular expressions which will be matched against keys in
the source store. Any matching key will not be copied.
includes : sequence of str, optional
One or more regular expressions which will be matched against keys in
the source store and will override any excludes also matching.
flags : int, optional
Regular expression flags used for matching excludes and includes.
if_exists : {'raise', 'replace', 'skip'}, optional
How to handle keys that already exist in the destination store. If
'raise' then a CopyError is raised on the first key already present
in the destination store. If 'replace' then any data will be replaced in
the destination. If 'skip' then any existing keys will not be copied.
dry_run : bool, optional
If True, don't actually copy anything, just log what would have
happened.
log : callable, file path or file-like object, optional
If provided, will be used to log progress information.
Returns
-------
n_copied : int
Number of items copied.
n_skipped : int
Number of items skipped.
n_bytes_copied : int
Number of bytes of data that were actually copied.
Examples
--------
>>> import zarr
>>> store1 = zarr.DirectoryStore('data/example.zarr')
>>> root = zarr.group(store1, overwrite=True)
>>> foo = root.create_group('foo')
>>> bar = foo.create_group('bar')
>>> baz = bar.create_dataset('baz', shape=100, chunks=50, dtype='i8')
>>> import numpy as np
>>> baz[:] = np.arange(100)
>>> root.tree()
/
 └── foo
     └── bar
         └── baz (100,) int64
>>> from sys import stdout
>>> store2 = zarr.ZipStore('data/example.zip', mode='w')
>>> zarr.copy_store(store1, store2, log=stdout)
copy .zgroup
copy foo/.zgroup
copy foo/bar/.zgroup
copy foo/bar/baz/.zarray
copy foo/bar/baz/0
copy foo/bar/baz/1
all done: 6 copied, 0 skipped, 566 bytes copied
(6, 0, 566)
>>> new_root = zarr.group(store2)
>>> new_root.tree()
/
 └── foo
     └── bar
         └── baz (100,) int64
>>> new_root['foo/bar/baz'][:]
array([ 0,  1,  2, ..., 97, 98, 99])
>>> store2.close() # zip stores need to be closed
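Keys can also be filtered with the ``excludes``/``includes`` regular
expressions; for example, a sketch reusing the stores above that copies the
metadata keys but skips the chunk data under 'baz'::
>>> store3 = zarr.MemoryStore()
>>> zarr.copy_store(store1, store3, excludes='baz/[01]')  # doctest: +SKIP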
Notes
-----
Please note that this is an experimental feature. The behaviour of this
function is still evolving and the default behaviour and/or parameters may change
in future versions.
"""
# normalize paths
source_path = normalize_storage_path(source_path)
dest_path = normalize_storage_path(dest_path)
if source_path:
source_path = source_path + "/"
if dest_path:
dest_path = dest_path + "/"
# normalize excludes and includes
if excludes is None:
excludes = []
elif isinstance(excludes, str):
excludes = [excludes]
if includes is None:
includes = []
elif isinstance(includes, str):
includes = [includes]
excludes = [re.compile(e, flags) for e in excludes]
includes = [re.compile(i, flags) for i in includes]
# check if_exists parameter
valid_if_exists = ["raise", "replace", "skip"]
if if_exists not in valid_if_exists:
raise ValueError(f"if_exists must be one of {valid_if_exists!r}; found {if_exists!r}")
# setup counting variables
n_copied = n_skipped = n_bytes_copied = 0
source_store_version = getattr(source, "_store_version", 2)
dest_store_version = getattr(dest, "_store_version", 2)
if source_store_version != dest_store_version:
raise ValueError("zarr stores must share the same protocol version")
if source_store_version > 2:
nchar_root = len(meta_root)
# code below assumes len(meta_root) == len(data_root)
assert len(data_root) == nchar_root
# setup logging
with _LogWriter(log) as log:
# iterate over source keys
for source_key in sorted(source.keys()):
# filter to keys under source path
if source_store_version == 2:
if not source_key.startswith(source_path):
continue
elif source_store_version == 3:
# skip 'meta/root/' or 'data/root/' at start of source_key
if not source_key[nchar_root:].startswith(source_path):
continue
# process excludes and includes
exclude = False
for prog in excludes:
if prog.search(source_key):
exclude = True
break
if exclude:
for prog in includes:
if prog.search(source_key):
exclude = False
break
if exclude:
continue
# map key to destination path
if source_store_version == 2:
key_suffix = source_key[len(source_path) :]
dest_key = dest_path + key_suffix
elif source_store_version == 3:
# nchar_root is length of 'meta/root/' or 'data/root/'
key_suffix = source_key[nchar_root + len(source_path) :]
dest_key = source_key[:nchar_root] + dest_path + key_suffix
# create a descriptive label for this operation
descr = source_key
if dest_key != source_key:
descr = descr + " -> " + dest_key
# decide what to do
do_copy = True
if if_exists != "replace":
if dest_key in dest:
if if_exists == "raise":
raise CopyError(f"key {dest_key!r} exists in destination")
elif if_exists == "skip":
do_copy = False
# take action
if do_copy:
log(f"copy {descr}")
if not dry_run:
data = source[source_key]
n_bytes_copied += buffer_size(data)
dest[dest_key] = data
n_copied += 1
else:
log(f"skip {descr}")
n_skipped += 1
# log a final message with a summary of what happened
_log_copy_summary(log, dry_run, n_copied, n_skipped, n_bytes_copied)
return n_copied, n_skipped, n_bytes_copied
def _check_dest_is_group(dest):
if not hasattr(dest, "create_dataset"):
raise ValueError(f"dest must be a group, got {dest!r}")
def copy(
source,
dest,
name=None,
shallow=False,
without_attrs=False,
log=None,
if_exists="raise",
dry_run=False,
**create_kws,
):
"""Copy the `source` array or group into the `dest` group.
Parameters
----------
source : group or array/dataset
A zarr group or array, or an h5py group or dataset.
dest : group
A zarr or h5py group.
name : str, optional
Name to copy the object to.
shallow : bool, optional
If True, only copy immediate children of `source`.
without_attrs : bool, optional
Do not copy user attributes.
log : callable, file path or file-like object, optional
If provided, will be used to log progress information.
if_exists : {'raise', 'replace', 'skip', 'skip_initialized'}, optional
How to handle arrays that already exist in the destination group. If
'raise' then a CopyError is raised on the first array already present
in the destination group. If 'replace' then any array will be
replaced in the destination. If 'skip' then any existing arrays will
not be copied. If 'skip_initialized' then any existing arrays with
all chunks initialized will not be copied (not available when copying to
h5py).
dry_run : bool, optional
If True, don't actually copy anything, just log what would have
happened.
**create_kws
Passed through to the create_dataset method when copying an array/dataset.
Returns
-------
n_copied : int
Number of items copied.
n_skipped : int
Number of items skipped.
n_bytes_copied : int
Number of bytes of data that were actually copied.
Examples
--------
Here's an example of copying a group named 'foo' from an HDF5 file to a
Zarr group::
>>> import h5py
>>> import zarr
>>> import numpy as np
>>> source = h5py.File('data/example.h5', mode='w')
>>> foo = source.create_group('foo')
>>> baz = foo.create_dataset('bar/baz', data=np.arange(100), chunks=(50,))
>>> spam = source.create_dataset('spam', data=np.arange(100, 200), chunks=(30,))
>>> zarr.tree(source)
/
 ├── foo
 │   └── bar
 │       └── baz (100,) int64
 └── spam (100,) int64
>>> dest = zarr.group()
>>> from sys import stdout
>>> zarr.copy(source['foo'], dest, log=stdout)
copy /foo
copy /foo/bar
copy /foo/bar/baz (100,) int64
all done: 3 copied, 0 skipped, 800 bytes copied
(3, 0, 800)
>>> dest.tree() # N.B., no spam
/
 └── foo
     └── bar
         └── baz (100,) int64
>>> source.close()
The ``if_exists`` parameter provides options for how to handle pre-existing data in
the destination. Here are some examples of these options, also using
``dry_run=True`` to find out what would happen without actually copying anything::
>>> source = zarr.group()
>>> dest = zarr.group()
>>> baz = source.create_dataset('foo/bar/baz', data=np.arange(100))
>>> spam = source.create_dataset('foo/spam', data=np.arange(1000))
>>> existing_spam = dest.create_dataset('foo/spam', data=np.arange(1000))
>>> from sys import stdout
>>> try:
... zarr.copy(source['foo'], dest, log=stdout, dry_run=True)
... except zarr.CopyError as e:
... print(e)
...
copy /foo
copy /foo/bar
copy /foo/bar/baz (100,) int64
an object 'spam' already exists in destination '/foo'
>>> zarr.copy(source['foo'], dest, log=stdout, if_exists='replace', dry_run=True)
copy /foo
copy /foo/bar
copy /foo/bar/baz (100,) int64
copy /foo/spam (1000,) int64
dry run: 4 copied, 0 skipped
(4, 0, 0)
>>> zarr.copy(source['foo'], dest, log=stdout, if_exists='skip', dry_run=True)
copy /foo
copy /foo/bar
copy /foo/bar/baz (100,) int64
skip /foo/spam (1000,) int64
dry run: 3 copied, 1 skipped
(3, 1, 0)
Notes
-----
Please note that this is an experimental feature. The behaviour of this
function is still evolving and the default behaviour and/or parameters may change
in future versions.
"""
# value checks
_check_dest_is_group(dest)
# setup logging
with _LogWriter(log) as log:
# do the copying
n_copied, n_skipped, n_bytes_copied = _copy(
log,
source,
dest,
name=name,
root=True,
shallow=shallow,
without_attrs=without_attrs,
if_exists=if_exists,
dry_run=dry_run,
**create_kws,
)
# log a final message with a summary of what happened
_log_copy_summary(log, dry_run, n_copied, n_skipped, n_bytes_copied)
return n_copied, n_skipped, n_bytes_copied
def _copy(log, source, dest, name, root, shallow, without_attrs, if_exists, dry_run, **create_kws):
# N.B., if this is a dry run, dest may be None
# setup counting variables
n_copied = n_skipped = n_bytes_copied = 0
# are we copying to/from h5py?
source_h5py = source.__module__.startswith("h5py.")
dest_h5py = dest is not None and dest.__module__.startswith("h5py.")
# check if_exists parameter
valid_if_exists = ["raise", "replace", "skip", "skip_initialized"]
if if_exists not in valid_if_exists:
raise ValueError(f"if_exists must be one of {valid_if_exists!r}; found {if_exists!r}")
if dest_h5py and if_exists == "skip_initialized":
raise ValueError(f"{if_exists!r} can only be used when copying to zarr")
# determine name to copy to
if name is None:
name = source.name.split("/")[-1]
if not name:
# this can happen if source is the root group
raise TypeError(
"source has no name, please provide the `name` "
"parameter to indicate a name to copy to"
)
if hasattr(source, "shape"):
# copy a dataset/array
# check if already exists, decide what to do
do_copy = True
exists = dest is not None and name in dest
if exists:
if if_exists == "raise":
raise CopyError(f"an object {name!r} already exists in destination {dest.name!r}")
elif if_exists == "skip":
do_copy = False
elif if_exists == "skip_initialized":
ds = dest[name]
if ds.nchunks_initialized == ds.nchunks:
do_copy = False
# take action
if do_copy:
# log a message about what we're going to do
log(f"copy {source.name} {source.shape} {source.dtype}")
if not dry_run:
# clear the way
if exists:
del dest[name]
# setup creation keyword arguments
kws = create_kws.copy()
# setup chunks option, preserve by default
kws.setdefault("chunks", source.chunks)
# setup compression options
if source_h5py:
if dest_h5py:
# h5py -> h5py; preserve compression options by default
kws.setdefault("compression", source.compression)
kws.setdefault("compression_opts", source.compression_opts)
kws.setdefault("shuffle", source.shuffle)
kws.setdefault("fletcher32", source.fletcher32)
kws.setdefault("fillvalue", source.fillvalue)
else:
# h5py -> zarr; use zarr default compression options
kws.setdefault("fill_value", source.fillvalue)
else:
if dest_h5py:
# zarr -> h5py; use some vaguely sensible defaults
kws.setdefault("chunks", True)
kws.setdefault("compression", "gzip")
kws.setdefault("compression_opts", 1)
kws.setdefault("shuffle", False)
kws.setdefault("fillvalue", source.fill_value)
else:
# zarr -> zarr; preserve compression options by default
kws.setdefault("compressor", source.compressor)
kws.setdefault("filters", source.filters)
kws.setdefault("order", source.order)
kws.setdefault("fill_value", source.fill_value)
# create new dataset in destination
ds = dest.create_dataset(name, shape=source.shape, dtype=source.dtype, **kws)
# copy data - N.B., go chunk by chunk to avoid loading
# everything into memory
shape = ds.shape
chunks = ds.chunks
chunk_offsets = [range(0, s, c) for s, c in zip(shape, chunks)]
for offset in itertools.product(*chunk_offsets):
sel = tuple(slice(o, min(s, o + c)) for o, s, c in zip(offset, shape, chunks))
ds[sel] = source[sel]
n_bytes_copied += ds.size * ds.dtype.itemsize
# copy attributes
if not without_attrs:
if dest_h5py and "filters" in source.attrs:
# No filters key in v3 metadata so it was stored in the
# attributes instead. We cannot copy this key to
# HDF5 attrs, though!
source_attrs = source.attrs.asdict().copy()
source_attrs.pop("filters", None)
else:
source_attrs = source.attrs
ds.attrs.update(source_attrs)
n_copied += 1
else:
log(f"skip {source.name} {source.shape} {source.dtype}")
n_skipped += 1
elif root or not shallow:
# copy a group
# check if an array is in the way
do_copy = True
exists_array = dest is not None and name in dest and hasattr(dest[name], "shape")
if exists_array:
if if_exists == "raise":
raise CopyError(f"an array {name!r} already exists in destination {dest.name!r}")
elif if_exists == "skip":
do_copy = False
# take action
if do_copy:
# log action
log(f"copy {source.name}")
if not dry_run:
# clear the way
if exists_array:
del dest[name]
# require group in destination
grp = dest.require_group(name)
# copy attributes
if not without_attrs:
grp.attrs.update(source.attrs)
else:
# setup for dry run without creating any groups in the
# destination
if dest is not None:
grp = dest.get(name, None)
else:
grp = None
# recurse
for k in source.keys():
c, s, b = _copy(
log,
source[k],
grp,
name=k,
root=False,
shallow=shallow,
without_attrs=without_attrs,
if_exists=if_exists,
dry_run=dry_run,
**create_kws,
)
n_copied += c
n_skipped += s
n_bytes_copied += b
n_copied += 1
else:
log(f"skip {source.name}")
n_skipped += 1
return n_copied, n_skipped, n_bytes_copied
def copy_all(
source,
dest,
shallow=False,
without_attrs=False,
log=None,
if_exists="raise",
dry_run=False,
**create_kws,
):
"""Copy all children of the `source` group into the `dest` group.
Parameters
----------
source : group or array/dataset
A zarr group or array, or an h5py group or dataset.
dest : group
A zarr or h5py group.
shallow : bool, optional
If True, only copy immediate children of `source`.
without_attrs : bool, optional
Do not copy user attributes.
log : callable, file path or file-like object, optional
If provided, will be used to log progress information.
if_exists : {'raise', 'replace', 'skip', 'skip_initialized'}, optional
How to handle arrays that already exist in the destination group. If
'raise' then a CopyError is raised on the first array already present
in the destination group. If 'replace' then any array will be
replaced in the destination. If 'skip' then any existing arrays will
not be copied. If 'skip_initialized' then any existing arrays with
all chunks initialized will not be copied (not available when copying to
h5py).
dry_run : bool, optional
If True, don't actually copy anything, just log what would have
happened.
**create_kws
Passed through to the create_dataset method when copying an
array/dataset.
Returns
-------
n_copied : int
Number of items copied.
n_skipped : int
Number of items skipped.
n_bytes_copied : int
Number of bytes of data that were actually copied.
Examples
--------
>>> import h5py
>>> import zarr
>>> import numpy as np
>>> source = h5py.File('data/example.h5', mode='w')
>>> foo = source.create_group('foo')
>>> baz = foo.create_dataset('bar/baz', data=np.arange(100), chunks=(50,))
>>> spam = source.create_dataset('spam', data=np.arange(100, 200), chunks=(30,))
>>> zarr.tree(source)
/
 ├── foo
 │   └── bar
 │       └── baz (100,) int64
 └── spam (100,) int64
>>> dest = zarr.group()
>>> import sys
>>> zarr.copy_all(source, dest, log=sys.stdout)
copy /foo
copy /foo/bar
copy /foo/bar/baz (100,) int64
copy /spam (100,) int64
all done: 4 copied, 0 skipped, 1,600 bytes copied
(4, 0, 1600)
>>> dest.tree()
/
 ├── foo
 │   └── bar
 │       └── baz (100,) int64
 └── spam (100,) int64
>>> source.close()
Notes
-----
Please note that this is an experimental feature. The behaviour of this
function is still evolving and the default behaviour and/or parameters may change
in future versions.
"""
# value checks
_check_dest_is_group(dest)
# setup counting variables
n_copied = n_skipped = n_bytes_copied = 0
zarr_version = getattr(source, "_version", 2)
# setup logging
with _LogWriter(log) as log:
for k in source.keys():
c, s, b = _copy(
log,
source[k],
dest,
name=k,
root=False,
shallow=shallow,
without_attrs=without_attrs,
if_exists=if_exists,
dry_run=dry_run,
**create_kws,
)
n_copied += c
n_skipped += s
n_bytes_copied += b
if zarr_version == 2:
dest.attrs.update(**source.attrs)
# log a final message with a summary of what happened
_log_copy_summary(log, dry_run, n_copied, n_skipped, n_bytes_copied)
return n_copied, n_skipped, n_bytes_copied
def open_consolidated(store: StoreLike, metadata_key=".zmetadata", mode="r+", **kwargs):
"""Open group using metadata previously consolidated into a single key.
This is an optimised method for opening a Zarr group, where instead of
traversing the group/array hierarchy by accessing the metadata keys at
each level, a single key contains all of the metadata for everything.
This is especially beneficial for remote data sources, where the overhead of
accessing a key is large compared to the time to read data.
The group accessed must have already had its metadata consolidated into a
single key using the function :func:`consolidate_metadata`.
This optimised method only works in modes which do not change the
metadata, although the data may still be written/updated.
Parameters
----------
store : MutableMapping or string
Store or path to directory in file system or name of zip file.
metadata_key : str
Key to read the consolidated metadata from. The default (.zmetadata)
corresponds to the default used by :func:`consolidate_metadata`.
mode : {'r', 'r+'}, optional
Persistence mode: 'r' means read only (must exist); 'r+' means
read/write (must exist) although only writes to data are allowed,
changes to metadata including creation of new arrays or group
are not allowed.
**kwargs
Additional parameters are passed through to :func:`zarr.creation.open_array` or
:func:`zarr.hierarchy.open_group`.
Returns
-------
g : :class:`zarr.hierarchy.Group`
Group instance, opened with the consolidated metadata.
See Also
--------
consolidate_metadata
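Examples
--------
A minimal sketch, assuming metadata has first been consolidated with
:func:`consolidate_metadata`::
>>> import zarr
>>> store = zarr.DirectoryStore('data/example.zarr')
>>> root = zarr.group(store, overwrite=True)
>>> z = root.create_dataset('baz', shape=100, chunks=10)
>>> zarr.consolidate_metadata(store)
<zarr.hierarchy.Group '/'>
>>> g = zarr.open_consolidated(store)
>>> g
<zarr.hierarchy.Group '/'>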
"""
# normalize parameters
zarr_version = kwargs.get("zarr_version")
store = normalize_store_arg(
store, storage_options=kwargs.get("storage_options"), mode=mode, zarr_version=zarr_version
)
if mode not in {"r", "r+"}:
raise ValueError(f"invalid mode, expected either 'r' or 'r+'; found {mode!r}")
path = kwargs.pop("path", None)
if store._store_version == 2:
ConsolidatedStoreClass = ConsolidatedMetadataStore
else:
assert_zarr_v3_api_available()
ConsolidatedStoreClass = ConsolidatedMetadataStoreV3
# default is to store within 'consolidated' group on v3
if not metadata_key.startswith("meta/root/"):
metadata_key = "meta/root/consolidated/" + metadata_key
# setup metadata store
meta_store = ConsolidatedStoreClass(store, metadata_key=metadata_key)
# pass through
chunk_store = kwargs.pop("chunk_store", None) or store
return open(store=meta_store, chunk_store=chunk_store, mode=mode, path=path, **kwargs)