# Source code for zarr.codecs.blosc

from __future__ import annotations

import asyncio
from dataclasses import dataclass, replace
from enum import Enum
from functools import cached_property
from typing import TYPE_CHECKING

import numcodecs
from numcodecs.blosc import Blosc
from packaging.version import Version

from zarr.abc.codec import BytesBytesCodec
from zarr.core.buffer.cpu import as_numpy_array_wrapper
from zarr.core.common import JSON, parse_enum, parse_named_configuration
from zarr.registry import register_codec

if TYPE_CHECKING:
    from typing import Self

    from zarr.core.array_spec import ArraySpec
    from zarr.core.buffer import Buffer


class BloscShuffle(Enum):
    """
    Enum for shuffle filter used by blosc.

    Each member's value is the same string as its name.
    """

    noshuffle = "noshuffle"
    shuffle = "shuffle"
    bitshuffle = "bitshuffle"

    @classmethod
    def from_int(cls, num: int) -> BloscShuffle:
        """
        Return the member that corresponds to an integer shuffle code.

        Parameters
        ----------
        num : int
            ``0`` for ``noshuffle``, ``1`` for ``shuffle``, ``2`` for ``bitshuffle``.

        Returns
        -------
        BloscShuffle

        Raises
        ------
        ValueError
            If ``num`` is not one of the three known codes.
        """
        names_by_code = {0: "noshuffle", 1: "shuffle", 2: "bitshuffle"}
        member_name = names_by_code.get(num)
        if member_name is None:
            raise ValueError(f"Value must be between 0 and 2. Got {num}.")
        return cls[member_name]
class BloscCname(Enum):
    """
    Enum for compression library used by blosc.

    Each member's value is the same string as its name.
    """

    lz4 = "lz4"
    lz4hc = "lz4hc"
    blosclz = "blosclz"
    zstd = "zstd"
    snappy = "snappy"
    zlib = "zlib"
# Switch off thread usage in numcodecs' Blosc module for the whole process.
# See https://zarr.readthedocs.io/en/stable/user-guide/performance.html#configuring-blosc
numcodecs.blosc.use_threads = False


def parse_typesize(data: JSON) -> int:
    """
    Validate a ``typesize`` value.

    Returns ``data`` unchanged when it is an int greater than zero.

    Raises
    ------
    TypeError
        If ``data`` is not an int.
    ValueError
        If ``data`` is an int that is zero or negative.
    """
    if not isinstance(data, int):
        raise TypeError(f"Value must be an int. Got {type(data)} instead.")
    if data <= 0:
        raise ValueError(
            f"Value must be greater than 0. Got {data}, which is less or equal to 0."
        )
    return data


# todo: real validation
def parse_clevel(data: JSON) -> int:
    """
    Validate a ``clevel`` value.

    Only the type is checked; any int is returned unchanged.

    Raises
    ------
    TypeError
        If ``data`` is not an int.
    """
    if not isinstance(data, int):
        raise TypeError(f"Value should be an int. Got {type(data)} instead.")
    return data


def parse_blocksize(data: JSON) -> int:
    """
    Validate a ``blocksize`` value.

    Only the type is checked; any int is returned unchanged.

    Raises
    ------
    TypeError
        If ``data`` is not an int.
    """
    if not isinstance(data, int):
        raise TypeError(f"Value should be an int. Got {type(data)} instead.")
    return data
@dataclass(frozen=True)
class BloscCodec(BytesBytesCodec):
    """
    Bytes-to-bytes codec that delegates compression to ``numcodecs.blosc.Blosc``.

    Parameters
    ----------
    typesize : int or None
        Positive item size in bytes. ``None`` leaves it unset until
        ``evolve_from_array_spec`` fills it in from the array dtype.
    cname : BloscCname or str
        Compression library to use. Defaults to ``zstd``.
    clevel : int
        Compression level. Defaults to ``5``.
    shuffle : BloscShuffle, str or None
        Shuffle filter. ``None`` leaves it unset until
        ``evolve_from_array_spec`` chooses one from the dtype item size.
    blocksize : int
        Block size passed through to Blosc. Defaults to ``0``.

    Notes
    -----
    The hand-written ``__init__`` is the one in effect, so the constructor
    default for ``shuffle`` is ``None`` even though the field declaration
    below lists ``BloscShuffle.noshuffle``.
    """

    is_fixed_size = False

    typesize: int | None
    cname: BloscCname = BloscCname.zstd
    clevel: int = 5
    shuffle: BloscShuffle | None = BloscShuffle.noshuffle
    blocksize: int = 0

    def __init__(
        self,
        *,
        typesize: int | None = None,
        cname: BloscCname | str = BloscCname.zstd,
        clevel: int = 5,
        shuffle: BloscShuffle | str | None = None,
        blocksize: int = 0,
    ) -> None:
        """Validate every argument, then store the parsed values on the instance."""
        # All values are parsed (in declaration order) before anything is stored.
        parsed_fields = {
            "typesize": None if typesize is None else parse_typesize(typesize),
            "cname": parse_enum(cname, BloscCname),
            "clevel": parse_clevel(clevel),
            "shuffle": None if shuffle is None else parse_enum(shuffle, BloscShuffle),
            "blocksize": parse_blocksize(blocksize),
        }
        # The dataclass is frozen, so normal attribute assignment is blocked.
        for field_name, field_value in parsed_fields.items():
            object.__setattr__(self, field_name, field_value)

    @classmethod
    def from_dict(cls, data: dict[str, JSON]) -> Self:
        """Build a codec from a named-configuration dict whose name is ``"blosc"``."""
        _name, configuration = parse_named_configuration(data, "blosc")
        return cls(**configuration)  # type: ignore[arg-type]

    def to_dict(self) -> dict[str, JSON]:
        """
        Serialize the codec as ``{"name": "blosc", "configuration": {...}}``.

        Raises
        ------
        ValueError
            If ``typesize`` or ``shuffle`` is still ``None``.
        """
        if self.typesize is None:
            raise ValueError("`typesize` needs to be set for serialization.")
        if self.shuffle is None:
            raise ValueError("`shuffle` needs to be set for serialization.")
        configuration: dict[str, JSON] = {
            "typesize": self.typesize,
            "cname": self.cname.value,
            "clevel": self.clevel,
            "shuffle": self.shuffle.value,
            "blocksize": self.blocksize,
        }
        return {"name": "blosc", "configuration": configuration}

    def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
        """
        Return a codec with unset ``typesize`` / ``shuffle`` filled in from the dtype.

        ``typesize`` becomes the dtype item size. ``shuffle`` becomes
        ``bitshuffle`` when the item size is 1 and ``shuffle`` otherwise.
        Fields that are already set are left alone; if nothing is unset,
        ``self`` is returned.
        """
        dtype = array_spec.dtype
        evolved = self

        if evolved.typesize is None:
            evolved = replace(evolved, typesize=dtype.itemsize)

        if evolved.shuffle is None:
            if dtype.itemsize == 1:
                default_shuffle = BloscShuffle.bitshuffle
            else:
                default_shuffle = BloscShuffle.shuffle
            evolved = replace(evolved, shuffle=default_shuffle)

        return evolved

    @cached_property
    def _blosc_codec(self) -> Blosc:
        """
        Build (once per instance) the numcodecs ``Blosc`` object used for the work.

        Raises
        ------
        ValueError
            If ``shuffle`` is still ``None``.
        """
        if self.shuffle is None:
            raise ValueError("`shuffle` needs to be set for decoding and encoding.")

        # numcodecs takes the shuffle mode as an integer code.
        shuffle_codes = {
            BloscShuffle.noshuffle: 0,
            BloscShuffle.shuffle: 1,
            BloscShuffle.bitshuffle: 2,
        }
        blosc_config = {
            "cname": self.cname.name,
            "clevel": self.clevel,
            "shuffle": shuffle_codes[self.shuffle],
            "blocksize": self.blocksize,
        }

        # `typesize` is only forwarded to numcodecs 0.16.0 and later.
        # See https://github.com/zarr-developers/numcodecs/pull/713
        installed_numcodecs = Version(numcodecs.__version__)
        if installed_numcodecs >= Version("0.16.0"):
            blosc_config["typesize"] = self.typesize

        return Blosc.from_config(blosc_config)

    async def _decode_single(
        self,
        chunk_bytes: Buffer,
        chunk_spec: ArraySpec,
    ) -> Buffer:
        """Decompress one chunk in a worker thread and return the decoded buffer."""
        decode = self._blosc_codec.decode
        return await asyncio.to_thread(
            as_numpy_array_wrapper, decode, chunk_bytes, chunk_spec.prototype
        )

    async def _encode_single(
        self,
        chunk_bytes: Buffer,
        chunk_spec: ArraySpec,
    ) -> Buffer | None:
        """Compress one chunk in a worker thread and return the encoded buffer."""

        def _compress(chunk: Buffer) -> Buffer:
            # Blosc works on host memory only: hand it a numpy array and wrap
            # the compressed bytes back into the prototype's buffer type.
            from_bytes = chunk_spec.prototype.buffer.from_bytes
            return from_bytes(self._blosc_codec.encode(chunk.as_numpy_array()))

        return await asyncio.to_thread(_compress, chunk_bytes)

    def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int:
        """Not implemented for this codec; always raises ``NotImplementedError``."""
        raise NotImplementedError
# Register BloscCodec with the codec registry under the name "blosc".
register_codec("blosc", BloscCodec)