# -*- coding: utf-8 -*-
"""This module contains a storage class and codec to support the N5 format.
"""
from __future__ import absolute_import, division
from .meta import ZARR_FORMAT, json_dumps, json_loads
from .storage import (
NestedDirectoryStore,
group_meta_key as zarr_group_meta_key,
array_meta_key as zarr_array_meta_key,
attrs_key as zarr_attrs_key,
_prog_ckey, _prog_number)
from numcodecs.abc import Codec
from numcodecs.compat import ndarray_copy
from numcodecs.registry import register_codec, get_codec
import numpy as np
import struct
import sys
import os
import warnings
# Pairs of (zarr metadata key, N5 attribute key). The array metadata
# converters in this module rename each entry from one name to the other.
zarr_to_n5_keys = [
('chunks', 'blockSize'),
('dtype', 'dataType'),
('compressor', 'compression'),
('shape', 'dimensions')
]
# Key (file name) that zarr's group metadata, array metadata and attribute
# keys are all rewritten to by the store.
n5_attrs_key = 'attributes.json'
# Attribute names reserved for N5 metadata: they are rejected as user
# attribute names and stripped when user attributes are read back.
n5_keywords = ['n5', 'dataType', 'dimensions', 'blockSize', 'compression']
class N5Store(NestedDirectoryStore):
    """Storage class using directories and files on a standard file system,
    following the N5 format (https://github.com/saalfeldlab/n5).

    Parameters
    ----------
    path : string
        Location of directory to use as the root of the storage hierarchy.

    Examples
    --------
    Store a single array::

        >>> import zarr
        >>> store = zarr.N5Store('data/array.n5')
        >>> z = zarr.zeros((10, 10), chunks=(5, 5), store=store, overwrite=True)
        >>> z[...] = 42

    Store a group::

        >>> store = zarr.N5Store('data/group.n5')
        >>> root = zarr.group(store=store, overwrite=True)
        >>> foo = root.create_group('foo')
        >>> bar = foo.zeros('bar', shape=(10, 10), chunks=(5, 5))
        >>> bar[...] = 42

    Notes
    -----
    This is an experimental feature.

    Safe to write in multiple threads or processes.

    """

    def __getitem__(self, key):
        """Return the value for a zarr-style ``key``.

        Group metadata, array metadata and attribute keys are all answered
        from the N5 attributes file in the same directory and converted to
        zarr JSON. Chunk keys have their coordinates reversed before the
        lookup. An attributes key raises ``KeyError`` when no user attributes
        are stored.
        """
        if key.endswith(zarr_group_meta_key):
            key = key.replace(zarr_group_meta_key, n5_attrs_key)
            value = group_metadata_to_zarr(self._load_n5_attrs(key))
            return json_dumps(value)

        elif key.endswith(zarr_array_meta_key):
            key = key.replace(zarr_array_meta_key, n5_attrs_key)
            value = array_metadata_to_zarr(self._load_n5_attrs(key))
            return json_dumps(value)

        elif key.endswith(zarr_attrs_key):
            key = key.replace(zarr_attrs_key, n5_attrs_key)
            value = attrs_to_zarr(self._load_n5_attrs(key))
            if len(value) == 0:
                raise KeyError(key)
            else:
                return json_dumps(value)

        elif is_chunk_key(key):
            key = invert_chunk_coords(key)

        return super(N5Store, self).__getitem__(key)

    def __setitem__(self, key, value):
        """Store ``value`` under a zarr-style ``key``.

        Metadata and attribute values are merged into the existing N5
        attributes file rather than written to separate files. Setting user
        attributes replaces all previous user attributes and raises
        ``ValueError`` if one of them uses a reserved N5 keyword. Chunk keys
        have their coordinates reversed before the write.
        """
        if key.endswith(zarr_group_meta_key):
            key = key.replace(zarr_group_meta_key, n5_attrs_key)
            n5_attrs = self._load_n5_attrs(key)
            n5_attrs.update(**group_metadata_to_n5(json_loads(value)))
            value = json_dumps(n5_attrs)

        elif key.endswith(zarr_array_meta_key):
            key = key.replace(zarr_array_meta_key, n5_attrs_key)
            n5_attrs = self._load_n5_attrs(key)
            n5_attrs.update(**array_metadata_to_n5(json_loads(value)))
            value = json_dumps(n5_attrs)

        elif key.endswith(zarr_attrs_key):
            key = key.replace(zarr_attrs_key, n5_attrs_key)
            n5_attrs = self._load_n5_attrs(key)
            zarr_attrs = json_loads(value)

            for k in n5_keywords:
                if k in zarr_attrs:
                    raise ValueError("Can not set attribute %s, this is a reserved N5 keyword" % k)

            # replace previous user attributes
            for k in list(n5_attrs):
                if k not in n5_keywords:
                    del n5_attrs[k]

            # add new user attributes
            n5_attrs.update(**zarr_attrs)
            value = json_dumps(n5_attrs)

        elif is_chunk_key(key):
            key = invert_chunk_coords(key)

        super(N5Store, self).__setitem__(key, value)

    def __delitem__(self, key):
        """Delete the entry for a zarr-style ``key``.

        Any of the three zarr metadata keys maps to the single N5 attributes
        file, so deleting one of them removes that file. Chunk keys have
        their coordinates reversed first.
        """
        if key.endswith(zarr_group_meta_key):  # pragma: no cover
            key = key.replace(zarr_group_meta_key, n5_attrs_key)
        elif key.endswith(zarr_array_meta_key):  # pragma: no cover
            key = key.replace(zarr_array_meta_key, n5_attrs_key)
        elif key.endswith(zarr_attrs_key):  # pragma: no cover
            key = key.replace(zarr_attrs_key, n5_attrs_key)
        elif is_chunk_key(key):
            key = invert_chunk_coords(key)

        super(N5Store, self).__delitem__(key)

    def __contains__(self, key):
        """Return whether a zarr-style ``key`` exists in the store.

        Group and array metadata keys are told apart by whether the N5
        attributes contain ``'dimensions'``. An attributes key is present
        only if at least one user (non-keyword) attribute is stored.
        """
        if key.endswith(zarr_group_meta_key):
            key = key.replace(zarr_group_meta_key, n5_attrs_key)
            if key not in self:
                return False
            # group if not a dataset (attributes do not contain 'dimensions')
            return 'dimensions' not in self._load_n5_attrs(key)

        elif key.endswith(zarr_array_meta_key):
            key = key.replace(zarr_array_meta_key, n5_attrs_key)
            # array if attributes contain 'dimensions'
            return 'dimensions' in self._load_n5_attrs(key)

        elif key.endswith(zarr_attrs_key):  # pragma: no cover
            # Rewrite the attrs key itself. Replacing the array meta key here
            # left the key untouched, so the lookup always reported False.
            key = key.replace(zarr_attrs_key, n5_attrs_key)
            return self._contains_attrs(key)

        elif is_chunk_key(key):
            key = invert_chunk_coords(key)

        return super(N5Store, self).__contains__(key)

    def __eq__(self, other):
        """Two stores are equal when both are N5 stores on the same path."""
        return (
            isinstance(other, N5Store) and
            self.path == other.path
        )

    def listdir(self, path=None):
        """List the children of ``path`` using zarr-style names.

        The N5 attributes file is reported as the matching zarr metadata key
        (plus the attributes key when user attributes exist). Inside array
        directories, nested chunk files are reported as dotted chunk keys
        with the coordinate order reversed.
        """
        if path is not None:
            path = invert_chunk_coords(path)

        # We can't use NestedDirectoryStore's listdir, as it requires
        # array_meta_key to be present in array directories, which this store
        # doesn't provide.
        children = super(NestedDirectoryStore, self).listdir(path=path)

        if self._is_array(path):

            # replace n5 attribute file with respective zarr attribute files
            children.remove(n5_attrs_key)
            children.append(zarr_array_meta_key)
            if self._contains_attrs(path):
                children.append(zarr_attrs_key)

            # special handling of directories containing an array to map
            # inverted nested chunk keys back to standard chunk keys
            new_children = []
            root_path = self.dir_path(path)
            for entry in children:
                entry_path = os.path.join(root_path, entry)
                if _prog_number.match(entry) and os.path.isdir(entry_path):
                    for dir_path, _, file_names in os.walk(entry_path):
                        for file_name in file_names:
                            file_path = os.path.join(dir_path, file_name)
                            rel_path = file_path.split(root_path + os.path.sep)[1]
                            new_child = rel_path.replace(os.path.sep, '.')
                            new_children.append(invert_chunk_coords(new_child))
                else:
                    new_children.append(entry)

            return sorted(new_children)

        elif self._is_group(path):

            # replace n5 attribute file with respective zarr attribute files
            children.remove(n5_attrs_key)
            children.append(zarr_group_meta_key)
            if self._contains_attrs(path):  # pragma: no cover
                children.append(zarr_attrs_key)

            return sorted(children)

        else:

            return children

    def _load_n5_attrs(self, path):
        """Load the N5 attributes stored at key ``path`` as a dict.

        Returns an empty dict when the key does not exist.
        """
        try:
            s = super(N5Store, self).__getitem__(path)
            return json_loads(s)
        except KeyError:
            return {}

    def _is_group(self, path):
        """Return True if ``path`` has N5 attributes without 'dimensions'."""
        if path is None:
            attrs_key = n5_attrs_key
        else:
            attrs_key = os.path.join(path, n5_attrs_key)

        n5_attrs = self._load_n5_attrs(attrs_key)
        return len(n5_attrs) > 0 and 'dimensions' not in n5_attrs

    def _is_array(self, path):
        """Return True if the N5 attributes at ``path`` contain 'dimensions'."""
        if path is None:
            attrs_key = n5_attrs_key
        else:
            attrs_key = os.path.join(path, n5_attrs_key)

        return 'dimensions' in self._load_n5_attrs(attrs_key)

    def _contains_attrs(self, path):
        """Return True if ``path`` holds at least one user attribute.

        ``path`` may be a directory (or None for the root) or the key of an
        N5 attributes file itself.
        """
        if path is None:
            attrs_key = n5_attrs_key
        else:
            if not path.endswith(n5_attrs_key):
                attrs_key = os.path.join(path, n5_attrs_key)
            else:  # pragma: no cover
                attrs_key = path

        attrs = attrs_to_zarr(self._load_n5_attrs(attrs_key))
        return len(attrs) > 0
def is_chunk_key(key):
    '''Tell whether the last '/'-separated segment of ``key`` looks like a
    chunk key.

    The result of the regular expression match is returned as-is, so it is
    a match object (truthy) or None (falsy).'''
    # splitting a string always yields at least one segment, so the final
    # segment can be taken directly
    tail = key.rsplit('/', 1)[-1]
    return _prog_ckey.match(tail)
def invert_chunk_coords(key):
    '''Reverse the order of the dot-separated chunk coordinates in the last
    '/'-separated segment of ``key``.

    Keys whose last segment is not a chunk key are returned unchanged.'''
    prefix, slash, tail = key.rpartition('/')
    if not _prog_ckey.match(tail):
        return key
    flipped = '.'.join(reversed(tail.split('.')))
    # prefix and slash are both empty when the key has no '/' at all
    return prefix + slash + flipped
def group_metadata_to_n5(group_metadata):
    '''Turn a zarr group metadata dict into its N5 form, in place.

    The 'zarr_format' entry is dropped (KeyError if it is missing) and an
    'n5' version entry is added. The same dict object is returned.'''
    group_metadata.pop('zarr_format')
    group_metadata.update(n5='2.0.0')
    return group_metadata
def group_metadata_to_zarr(group_metadata):
    '''Turn an N5 group attributes dict into zarr group metadata, in place.

    The 'n5' entry is dropped (KeyError if it is missing) and 'zarr_format'
    is set. The same dict object is returned.'''
    group_metadata.pop('n5')
    group_metadata.update(zarr_format=ZARR_FORMAT)
    return group_metadata
def array_metadata_to_n5(array_metadata):
    '''Turn a zarr array metadata dict into N5 dataset attributes, in place.

    Keys are renamed to their N5 names, the data type is written as a numpy
    dtype name, and the dimension and block size orders are reversed.
    zarr-only entries ('zarr_format', 'fill_value', 'order', 'filters') are
    removed; a ValueError is raised if one of them holds a setting that
    cannot be stored. The same dict object is returned.'''
    # rename zarr keys to their N5 counterparts
    for zarr_key, n5_key in zarr_to_n5_keys:
        array_metadata[n5_key] = array_metadata.pop(zarr_key)
    array_metadata.pop('zarr_format')

    declared_type = array_metadata['dataType']
    try:
        np_dtype = np.dtype(declared_type)
    except TypeError:  # pragma: no cover
        raise TypeError(
            "data type %s not supported by N5" % declared_type)
    array_metadata['dataType'] = np_dtype.name

    # the axis order of the stored metadata is the reverse of zarr's
    for axis_key in ('dimensions', 'blockSize'):
        array_metadata[axis_key] = array_metadata[axis_key][::-1]

    if 'fill_value' in array_metadata:
        fill = array_metadata['fill_value']
        if fill is not None and fill != 0:
            raise ValueError("N5 only supports fill_value == 0 (for now)")
        del array_metadata['fill_value']

    if 'order' in array_metadata:
        if array_metadata['order'] != 'C':
            raise ValueError("zarr N5 storage only stores arrays in C order (for now)")
        del array_metadata['order']

    if 'filters' in array_metadata:
        filters = array_metadata['filters']
        if filters is not None and filters != []:
            raise ValueError("N5 storage does not support zarr filters")
        del array_metadata['filters']

    assert 'compression' in array_metadata
    array_metadata['compression'] = compressor_config_to_n5(
        array_metadata['compression'])

    return array_metadata
def array_metadata_to_zarr(array_metadata):
    '''Turn N5 dataset attributes into zarr array metadata, in place.

    Keys are renamed to their zarr names, the shape and chunk orders are
    reversed, and fixed values are filled in for 'fill_value', 'order' and
    'filters'. The compressor is always wrapped in an N5ChunkWrapper
    config that also carries the dtype and chunk shape. The same dict
    object is returned.'''
    # rename N5 keys to their zarr counterparts
    for zarr_key, n5_key in zarr_to_n5_keys:
        array_metadata[zarr_key] = array_metadata.pop(n5_key)
    array_metadata['zarr_format'] = ZARR_FORMAT

    for axis_key in ('shape', 'chunks'):
        array_metadata[axis_key] = array_metadata[axis_key][::-1]

    array_metadata['fill_value'] = 0  # also if None was requested
    array_metadata['order'] = 'C'
    array_metadata['filters'] = []

    inner_config = compressor_config_to_zarr(array_metadata['compressor'])
    wrapper_config = {
        'id': N5ChunkWrapper.codec_id,
        'compressor_config': inner_config,
        'dtype': array_metadata['dtype'],
        'chunk_shape': array_metadata['chunks']
    }
    array_metadata['compressor'] = wrapper_config

    return array_metadata
def attrs_to_zarr(attrs):
    '''Strip every reserved N5 keyword from an N5 attributes dictionary,
    leaving only the user (zarr) attributes.

    The dictionary is modified in place and returned.'''
    for reserved in n5_keywords:
        # a keyword that is not present is simply skipped
        attrs.pop(reserved, None)
    return attrs
def compressor_config_to_n5(compressor_config):
    '''Convert a zarr compressor config into an N5 compression config.

    Parameters
    ----------
    compressor_config : dict or None
        Codec configuration with an 'id' entry, optionally wrapped in an
        N5ChunkWrapper config. None means no compression.

    Returns
    -------
    dict
        N5 compression config with a 'type' entry. ``{'type': 'raw'}`` is
        returned when no compressor is configured.

    Raises
    ------
    RuntimeError
        If the codec id is not one of the handled compressors.
    AssertionError
        If a blosc config uses a non-zero block size.
    '''
    if compressor_config is None:
        return {'type': 'raw'}

    # peel wrapper, if present
    if compressor_config['id'] == N5ChunkWrapper.codec_id:
        compressor_config = compressor_config['compressor_config']
        # A wrapper around "no compressor" carries None here. Treat it like
        # an unwrapped None instead of failing on the subscript below.
        if compressor_config is None:
            return {'type': 'raw'}

    codec_id = compressor_config['id']
    n5_config = {'type': codec_id}

    if codec_id == 'bz2':

        n5_config['type'] = 'bzip2'
        n5_config['blockSize'] = compressor_config['level']

    elif codec_id == 'blosc':

        warnings.warn(
            "Not all N5 implementations support blosc compression (yet). You "
            "might not be able to open the dataset with another N5 library.",
            RuntimeWarning
        )

        n5_config['codec'] = compressor_config['cname']
        n5_config['level'] = compressor_config['clevel']
        n5_config['shuffle'] = compressor_config['shuffle']
        assert compressor_config['blocksize'] == 0, \
            "blosc block size needs to be 0 for N5 containers."

    elif codec_id == 'lzma':

        # Switch to XZ for N5 if we are using the default XZ format.
        # Note: 4 is the default, which is lzma.CHECK_CRC64.
        if compressor_config['format'] == 1 and compressor_config['check'] in [-1, 4]:
            n5_config['type'] = 'xz'
        else:
            warnings.warn(
                "Not all N5 implementations support lzma compression (yet). You "
                "might not be able to open the dataset with another N5 library.",
                RuntimeWarning
            )
            n5_config['format'] = compressor_config['format']
            n5_config['check'] = compressor_config['check']
            n5_config['filters'] = compressor_config['filters']

        # The default is lzma.PRESET_DEFAULT, which is 6. Only substitute it
        # when no preset was given: 0 is a valid preset and must be kept.
        preset = compressor_config['preset']
        n5_config['preset'] = 6 if preset is None else preset

    elif codec_id == 'zlib':

        n5_config['type'] = 'gzip'
        n5_config['level'] = compressor_config['level']
        n5_config['useZlib'] = True

    elif codec_id == 'gzip':  # pragma: no cover

        n5_config['type'] = 'gzip'
        n5_config['level'] = compressor_config['level']
        n5_config['useZlib'] = False

    else:  # pragma: no cover

        raise RuntimeError("Unknown compressor with id %s" % codec_id)

    return n5_config
def compressor_config_to_zarr(compressor_config):
    '''Convert an N5 compression config into a zarr compressor config.

    Returns a dict with an 'id' entry and the codec's parameters, or None
    for the N5 'raw' type. Raises RuntimeError for an unknown type.'''
    n5_type = compressor_config['type']

    if n5_type == 'raw':
        return None

    if n5_type == 'bzip2':
        return {
            'id': 'bz2',
            'level': compressor_config['blockSize'],
        }

    if n5_type == 'blosc':
        return {
            'id': 'blosc',
            'cname': compressor_config['codec'],
            'clevel': compressor_config['level'],
            'shuffle': compressor_config['shuffle'],
            'blocksize': 0,
        }

    if n5_type == 'lzma':
        return {
            'id': 'lzma',
            'format': compressor_config['format'],
            'check': compressor_config['check'],
            'preset': compressor_config['preset'],
            'filters': compressor_config['filters'],
        }

    if n5_type == 'xz':
        return {
            'id': 'lzma',
            'format': 1,  # lzma.FORMAT_XZ
            'check': -1,
            'preset': compressor_config['preset'],
            'filters': None,
        }

    if n5_type == 'gzip':
        # a missing or false 'useZlib' flag selects the gzip codec
        zarr_id = 'zlib' if compressor_config.get('useZlib') else 'gzip'
        return {
            'id': zarr_id,
            'level': compressor_config['level'],
        }

    raise RuntimeError("Unknown compressor with id %s" % n5_type)  # pragma: no cover
class N5ChunkWrapper(Codec):
    """Codec that adds a block header to chunk data and stores it big-endian.

    An encoded chunk is a header followed by the payload. The header is a
    2-byte mode (always written as 0), a 2-byte dimension count and one
    4-byte size per dimension in reversed order, all big-endian. The payload
    is the big-endian chunk data, passed through the wrapped compressor if
    there is one.

    Parameters
    ----------
    dtype : anything accepted by ``np.dtype``
        Data type of the array the chunks belong to.
    chunk_shape : sequence of int
        Shape of a complete chunk.
    compressor_config : dict, optional
        Configuration of the wrapped compressor. None or an id of 'raw'
        means no compression.
    compressor : codec, optional
        Wrapped compressor instance; an alternative to ``compressor_config``.
    """

    codec_id = 'n5_wrapper'

    def __init__(self, dtype, chunk_shape, compressor_config=None, compressor=None):

        self.dtype = np.dtype(dtype)
        self.chunk_shape = tuple(chunk_shape)

        # is the dtype a little endian format?
        byteorder = self.dtype.byteorder
        native_is_little = byteorder == '=' and sys.byteorder == 'little'
        self._little_endian = byteorder == '<' or native_is_little

        if compressor:  # pragma: no cover
            if compressor_config is not None:
                raise ValueError("Only one of compressor_config or compressor should be given.")
            compressor_config = compressor.get_config()

        nothing_given = compressor_config is None and compressor is None
        if nothing_given or compressor_config['id'] == 'raw':
            self._compressor = None
            self.compressor_config = None
            return

        self._compressor = get_codec(compressor_config)
        self.compressor_config = self._compressor.get_config()

    def get_config(self):
        """Return the codec id together with the wrapped compressor config."""
        return {
            'id': self.codec_id,
            'compressor_config': self.compressor_config
        }

    def encode(self, chunk):
        """Return the header followed by the (compressed) big-endian data."""
        assert chunk.flags.c_contiguous

        header = self._create_header(chunk)
        payload = self._to_big_endian(chunk)

        if self._compressor:
            payload = self._compressor.encode(payload)
        else:
            payload = payload.tobytes(order='A')

        return header + payload

    def decode(self, chunk, out=None):
        """Strip the header and return the chunk data in ``self.dtype``.

        With ``out`` given, the data is decoded into it and the stored shape
        must equal the full chunk shape. Without ``out``, a chunk stored
        with a smaller shape is placed into a zero-filled full-size array.
        """
        len_header, stored_shape = self._read_header(chunk)
        payload = chunk[len_header:]

        if out is None:

            if self._compressor:
                payload = self._compressor.decode(payload)

            # more expensive byteswap
            payload = self._from_big_endian(payload)

            # read partial chunk
            if stored_shape != self.chunk_shape:  # pragma: no cover
                partial = np.frombuffer(payload, dtype=self.dtype)
                partial = partial.reshape(stored_shape)
                padded = np.zeros(self.chunk_shape, dtype=self.dtype)
                region = tuple(slice(0, extent) for extent in stored_shape)
                padded[region] = partial
                payload = padded

            return payload

        # out should only be used if we read a complete chunk
        assert stored_shape == self.chunk_shape, (
            "Expected chunk of shape %s, found %s" % (
                self.chunk_shape,
                stored_shape))

        if self._compressor:
            self._compressor.decode(payload, out)
        else:
            ndarray_copy(payload, out)

        # we can byteswap in-place
        if self._little_endian:
            out.byteswap(True)

        return out

    def _create_header(self, chunk):
        """Build the block header for ``chunk`` from its shape."""
        reversed_shape = chunk.shape[::-1]
        ndim = len(reversed_shape)
        # mode 0, then the number of dimensions
        prefix = struct.pack('>HH', 0, ndim)
        sizes = struct.pack('>%dI' % ndim, *reversed_shape)
        return prefix + sizes

    def _read_header(self, chunk):
        """Return ``(header length, chunk shape)`` read from an encoded chunk."""
        # bytes 0-1 hold the mode, which is not used here
        num_dims, = struct.unpack('>H', chunk[2:4])
        len_header = 4 + num_dims*4
        stored_sizes = struct.unpack('>%dI' % num_dims, chunk[4:len_header])
        return len_header, stored_sizes[::-1]

    def _to_big_endian(self, data):
        # assumes data is ndarray
        return data.byteswap() if self._little_endian else data

    def _from_big_endian(self, data):
        # assumes data is byte array in big endian
        if self._little_endian:
            big_endian = np.frombuffer(data, self.dtype.newbyteorder('>'))
            return big_endian.astype(self.dtype)
        return data
# Register the wrapper with numcodecs under its codec id, presumably so that
# a stored config with id 'n5_wrapper' can be resolved by get_codec.
register_codec(N5ChunkWrapper, N5ChunkWrapper.codec_id)