diff --git a/src/zarr/codecs/arrow.py b/src/zarr/codecs/arrow.py
new file mode 100644
index 0000000000..12bd2a7767
--- /dev/null
+++ b/src/zarr/codecs/arrow.py
@@ -0,0 +1,78 @@
+from __future__ import annotations
+
+import io
+from dataclasses import dataclass
+from typing import TYPE_CHECKING
+
+from arro3.core import Array, Table
+from arro3.io import read_ipc_stream, write_ipc_stream
+
+from zarr.abc.codec import ArrayBytesCodec
+from zarr.core.common import JSON, parse_named_configuration
+
+if TYPE_CHECKING:
+    from typing import Self
+
+    from zarr.core.array_spec import ArraySpec
+    from zarr.core.buffer import Buffer, NDBuffer
+
+
+@dataclass(frozen=True)
+class ArrowIPCCodec(ArrayBytesCodec):
+    """Arrow IPC codec"""
+
+    column_name: str
+
+    def __init__(self, *, column_name: str = "zarr_array") -> None:
+        object.__setattr__(self, "column_name", column_name)
+
+    @classmethod
+    def from_dict(cls, data: dict[str, JSON]) -> Self:
+        _, configuration_parsed = parse_named_configuration(
+            data, "arrow-ipc", require_configuration=False
+        )
+        configuration_parsed = configuration_parsed or {}
+        return cls(**configuration_parsed)  # type: ignore[arg-type]
+
+    def to_dict(self) -> dict[str, JSON]:
+        # name must match the "arrow-ipc" id that from_dict parses, or a
+        # to_dict/from_dict round trip of the metadata fails
+        return {"name": "arrow-ipc", "configuration": {"column_name": self.column_name}}
+
+    def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
+        # TODO: possibly parse array dtype to configure codec
+        return self
+
+    async def _decode_single(
+        self,
+        chunk_bytes: Buffer,
+        chunk_spec: ArraySpec,
+    ) -> NDBuffer:
+        record_batch_reader = read_ipc_stream(io.BytesIO(chunk_bytes.as_buffer_like()))
+        # Note: we only expect a single batch per chunk
+        record_batch = record_batch_reader.read_next_batch()
+        array = record_batch.column(self.column_name)
+        numpy_array = array.to_numpy()
+        # all arrow arrays are flat; reshape to chunk shape
+        numpy_array.shape = chunk_spec.shape
+        # make sure we got the right dtype out
+        # assert numpy_array.dtype == chunk_spec.dtype.to_native_dtype(), (
+        #     f"dtype mismatch, got {numpy_array.dtype}, expected {chunk_spec.dtype.to_native_dtype()}"
+        # )
+        return chunk_spec.prototype.nd_buffer.from_numpy_array(numpy_array)
+
+    async def _encode_single(
+        self,
+        chunk_array: NDBuffer,
+        chunk_spec: ArraySpec,
+    ) -> Buffer:
+        # TODO: generalize flattening strategy to prevent memory copies
+        numpy_array = chunk_array.as_ndarray_like().ravel(order="C")
+        arrow_array = Array.from_numpy(numpy_array)
+        table = Table.from_arrays(arrays=[arrow_array], names=[self.column_name])
+        # TODO: figure out how to avoid copying the bytes to a new buffer!
+        # Doh, this is the whole point of Arrow, right?
+        buffer = io.BytesIO()
+        write_ipc_stream(table, buffer)
+        return chunk_spec.prototype.buffer.from_bytes(buffer.getvalue())
+
+    def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int:
+        raise NotImplementedError
diff --git a/src/zarr/core/metadata/v3.py b/src/zarr/core/metadata/v3.py
index a14373c340..c313509ba9 100644
--- a/src/zarr/core/metadata/v3.py
+++ b/src/zarr/core/metadata/v3.py
@@ -105,7 +105,10 @@ def validate_codecs(codecs: tuple[Codec, ...], dtype: ZDType[TBaseDType, TBaseSc
     # TODO: use codec ID instead of class name
     codec_class_name = abc.__class__.__name__
     # TODO: Fix typing here
-    if isinstance(dtype, VariableLengthUTF8) and not codec_class_name == "VLenUTF8Codec":  # type: ignore[unreachable]
+    if isinstance(dtype, VariableLengthUTF8) and codec_class_name not in (  # type: ignore[unreachable]
+        "VLenUTF8Codec",
+        "ArrowIPCCodec",
+    ):
         raise ValueError(
             f"For string dtype, ArrayBytesCodec must be `VLenUTF8Codec`, got `{codec_class_name}`."
         )
diff --git a/tests/test_codecs/test_arrow.py b/tests/test_codecs/test_arrow.py
new file mode 100644
index 0000000000..bb42f1c35d
--- /dev/null
+++ b/tests/test_codecs/test_arrow.py
@@ -0,0 +1,102 @@
+import io
+from typing import Any
+
+import numpy as np
+import pytest
+from arro3.io import read_ipc_stream
+
+import zarr
+from zarr.codecs.arrow import ArrowIPCCodec
+from zarr.core.array_spec import ArrayConfig, ArraySpec
+from zarr.core.buffer import numpy_buffer_prototype
+from zarr.dtype import parse_dtype
+
+CPU_BUFFER_PROTOTYPE = numpy_buffer_prototype()
+
+
+numpy_array_fixtures = [
+    (np.array([[1, 2, 3], [4, 5, 6]], dtype="int64"), None),
+    (np.array([[1.5, 2.5], [3.5, 4.5]], dtype="float32"), None),
+    (np.array([[True, False, True], [False, True, False]], dtype="bool"), None),
+    (
+        np.array(["foo", "barry", "bazo"], dtype=np.dtypes.StringDType()),
+        zarr.dtype.VariableLengthUTF8(),
+    ),
+    # both come back as object dtype, but if we pass object array to Zarr, it complains about dtype resolution
+    # np.array(['foo', 'barry', 'bazo'], dtype='U5'),
+    # np.array(['foo', 'barry', 'bazo'], dtype=np.dtypes.StringDType())
+]
+
+
+@pytest.mark.parametrize("numpy_array_and_zdtype", numpy_array_fixtures)
+async def test_arrow_codec_round_trip(
+    numpy_array_and_zdtype: tuple[np.ndarray[Any, Any], zarr.dtype.ZDType[Any, Any] | None],
+) -> None:
+    numpy_array, zdtype = numpy_array_and_zdtype
+    if zdtype is None:
+        spec_dtype = parse_dtype(numpy_array.dtype, zarr_format=3)
+    else:
+        spec_dtype = zdtype
+    codec = ArrowIPCCodec()
+    array_config = ArrayConfig(order="C", write_empty_chunks=True)
+    array_spec = ArraySpec(
+        shape=numpy_array.shape,
+        dtype=spec_dtype,
+        fill_value=0,
+        config=array_config,
+        prototype=CPU_BUFFER_PROTOTYPE,
+    )
+
+    ndbuffer = CPU_BUFFER_PROTOTYPE.nd_buffer.from_numpy_array(numpy_array)
+    encoded = await codec._encode_single(ndbuffer, array_spec)
+    decoded = await codec._decode_single(encoded, array_spec)
+
+    # Test that the decoded array matches the original
+    numpy_array_decoded = decoded.as_ndarray_like()
+    np.testing.assert_array_equal(numpy_array_decoded, numpy_array)
+
+
+async def test_custom_field_name() -> None:
+    numpy_array = np.array([[1, 2, 3], [4, 5, 6]], dtype="int64")
+    spec_dtype = parse_dtype(numpy_array.dtype, zarr_format=3)
+    codec = ArrowIPCCodec(column_name="custom_field_name")
+    array_config = ArrayConfig(order="C", write_empty_chunks=True)
+    array_spec = ArraySpec(
+        shape=numpy_array.shape,
+        dtype=spec_dtype,
+        fill_value=0,
+        config=array_config,
+        prototype=CPU_BUFFER_PROTOTYPE,
+    )
+
+    ndbuffer = CPU_BUFFER_PROTOTYPE.nd_buffer.from_numpy_array(numpy_array)
+    encoded = await codec._encode_single(ndbuffer, array_spec)
+    decoded = await codec._decode_single(encoded, array_spec)
+
+    # Test that the decoded array matches the original
+    numpy_array_decoded = decoded.as_ndarray_like()
+    np.testing.assert_array_equal(numpy_array_decoded, numpy_array)
+
+    # test that we can read the arrow data directly
+    record_batch_reader = read_ipc_stream(io.BytesIO(encoded.as_buffer_like()))
+    record_batch = record_batch_reader.read_next_batch()
+    assert record_batch.num_columns == 1
+    _ = record_batch.column("custom_field_name")
+
+
+def test_string_array() -> None:
+    # IMO codec tests should be much more self contained,
+    # not end-to-end array round-tripping tests.
+    # But don't see a better way to test this at the moment..
+
+    a = zarr.create_array(
+        shape=4,
+        chunks=(2,),
+        dtype=zarr.dtype.VariableLengthUTF8(),  # type: ignore[arg-type]
+        serializer=ArrowIPCCodec(),
+        store=zarr.storage.MemoryStore(),
+    )
+
+    # compare the read-back data against the written values, not against a second read
+    expected = np.array(["abc", "1234", "foo", "bar"])
+    a[:] = expected
+    np.testing.assert_equal(a[:], expected)
diff --git a/tests/test_codecs/test_bytes.py b/tests/test_codecs/test_bytes.py
new file mode 100644
index 0000000000..31b72f84a6
--- /dev/null
+++ b/tests/test_codecs/test_bytes.py
@@ -0,0 +1,43 @@
+from typing import Any
+
+import numpy as np
+import pytest
+
+from zarr.codecs.bytes import BytesCodec
+from zarr.core.array_spec import ArrayConfig, ArraySpec
+from zarr.core.buffer import numpy_buffer_prototype
+from zarr.dtype import parse_dtype
+
+CPU_BUFFER_PROTOTYPE = numpy_buffer_prototype()
+
+
+numpy_array_fixtures = [
+    np.array([[1, 2, 3], [4, 5, 6]], dtype="int64"),
+    np.array([[1.5, 2.5], [3.5, 4.5]], dtype="float32"),
+    np.array([[True, False, True], [False, True, False]], dtype="bool"),
+]
+
+
+@pytest.mark.parametrize("numpy_array", numpy_array_fixtures)
+async def test_bytes_codec_round_trip(numpy_array: np.ndarray[Any, Any]) -> None:
+    # Test default initialization
+    codec = BytesCodec()
+
+    # numpy_array = np.array([[1, 2, 3], [4, 5, 6]], dtype='int64')
+    array_config = ArrayConfig(order="C", write_empty_chunks=True)
+    array_spec = ArraySpec(
+        shape=numpy_array.shape,
+        dtype=parse_dtype(numpy_array.dtype, zarr_format=3),
+        fill_value=0,
+        config=array_config,
+        prototype=CPU_BUFFER_PROTOTYPE,
+    )
+
+    ndbuffer = CPU_BUFFER_PROTOTYPE.nd_buffer.from_numpy_array(numpy_array)
+    encoded = await codec._encode_single(ndbuffer, array_spec)
+    assert encoded is not None
+    decoded = await codec._decode_single(encoded, array_spec)
+
+    # Test that the decoded array matches the original
+    numpy_array_decoded = decoded.as_ndarray_like()
+    np.testing.assert_array_equal(numpy_array_decoded, numpy_array)