Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions src/zarr/codecs/arrow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from __future__ import annotations

import io
from dataclasses import dataclass
from typing import TYPE_CHECKING

from arro3.core import Array, Table
from arro3.io import read_ipc_stream, write_ipc_stream

from zarr.abc.codec import ArrayBytesCodec
from zarr.core.common import JSON, parse_named_configuration

if TYPE_CHECKING:
from typing import Self

from zarr.core.array_spec import ArraySpec
from zarr.core.buffer import Buffer, NDBuffer


@dataclass(frozen=True)
class ArrowIPCCodec(ArrayBytesCodec):
    """Array-to-bytes codec that stores each chunk as an Arrow IPC stream.

    On encode, the chunk is flattened in C order and written as a
    single-column Arrow table. On decode, the first record batch of the
    stream is read back and reshaped to the chunk shape.

    Parameters
    ----------
    column_name : str, default "zarr_array"
        Name of the single Arrow column that holds the chunk data.
    """

    column_name: str

    def __init__(self, *, column_name: str = "zarr_array") -> None:
        # The dataclass is frozen, so regular attribute assignment is blocked.
        object.__setattr__(self, "column_name", column_name)

    @classmethod
    def from_dict(cls, data: dict[str, JSON]) -> Self:
        """Build a codec from its named-configuration dictionary.

        Parameters
        ----------
        data : dict[str, JSON]
            Dictionary with a ``"name"`` entry and an optional
            ``"configuration"`` entry, as produced by `to_dict`.

        Returns
        -------
        Self
            Codec constructed from the parsed configuration, or with default
            settings when no configuration is present.
        """
        # The expected name must match the one written by `to_dict`;
        # otherwise a serialized codec cannot be parsed back.
        _, configuration_parsed = parse_named_configuration(
            data, "arrow_ipc", require_configuration=False
        )
        configuration_parsed = configuration_parsed or {}
        return cls(**configuration_parsed)  # type: ignore[arg-type]

    def to_dict(self) -> dict[str, JSON]:
        """Return the named-configuration dictionary describing this codec."""
        return {"name": "arrow_ipc", "configuration": {"column_name": self.column_name}}

    def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
        """Return the codec to use for ``array_spec``; currently always ``self``."""
        # TODO: possibly parse array dtype to configure codec
        return self

    async def _decode_single(
        self,
        chunk_bytes: Buffer,
        chunk_spec: ArraySpec,
    ) -> NDBuffer:
        """Decode one Arrow IPC encoded chunk into an N-dimensional buffer.

        Parameters
        ----------
        chunk_bytes : Buffer
            Encoded chunk holding an Arrow IPC stream.
        chunk_spec : ArraySpec
            Specification providing the chunk shape and the buffer prototype.

        Returns
        -------
        NDBuffer
            Decoded chunk with shape ``chunk_spec.shape``.
        """
        record_batch_reader = read_ipc_stream(io.BytesIO(chunk_bytes.as_buffer_like()))
        # Note: we only expect a single batch per chunk
        record_batch = record_batch_reader.read_next_batch()
        array = record_batch.column(self.column_name)
        # NOTE(review): the behaviour of this conversion for non-standard
        # Arrow types (e.g. geometry/extension dtypes or dictionary-encoded
        # arrays) has not been established; a dedicated buffer class may be
        # needed for those.
        numpy_array = array.to_numpy()
        # All arrow arrays are flat; reshape to chunk shape. `reshape` is used
        # instead of assigning to `.shape`, which NumPy discourages.
        numpy_array = numpy_array.reshape(chunk_spec.shape)
        # TODO: verify that the decoded dtype matches
        # chunk_spec.dtype.to_native_dtype(); the check is currently disabled.
        return chunk_spec.prototype.nd_buffer.from_numpy_array(numpy_array)

    async def _encode_single(
        self,
        chunk_array: NDBuffer,
        chunk_spec: ArraySpec,
    ) -> Buffer:
        """Encode one chunk as an Arrow IPC stream with a single column.

        Parameters
        ----------
        chunk_array : NDBuffer
            Chunk data to encode.
        chunk_spec : ArraySpec
            Specification providing the buffer prototype for the result.

        Returns
        -------
        Buffer
            Bytes of the Arrow IPC stream.
        """
        # TODO: generalize flattening strategy to prevent memory copies
        numpy_array = chunk_array.as_ndarray_like().ravel(order="C")
        arrow_array = Array.from_numpy(numpy_array)
        table = Table.from_arrays(arrays=[arrow_array], names=[self.column_name])
        # TODO: figure out how to avoid copying the bytes to a new buffer!
        # Doh, this is the whole point of Arrow, right?
        buffer = io.BytesIO()
        write_ipc_stream(table, buffer)
        return chunk_spec.prototype.buffer.from_bytes(buffer.getvalue())

    def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int:
        """Not implemented for this codec; always raises ``NotImplementedError``."""
        raise NotImplementedError
5 changes: 4 additions & 1 deletion src/zarr/core/metadata/v3.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,10 @@ def validate_codecs(codecs: tuple[Codec, ...], dtype: ZDType[TBaseDType, TBaseSc
# TODO: use codec ID instead of class name
codec_class_name = abc.__class__.__name__
# TODO: Fix typing here
if isinstance(dtype, VariableLengthUTF8) and not codec_class_name == "VLenUTF8Codec": # type: ignore[unreachable]
if isinstance(dtype, VariableLengthUTF8) and codec_class_name not in ( # type: ignore[unreachable]
"VLenUTF8Codec",
"ArrowIPCCodec",
):
raise ValueError(
f"For string dtype, ArrayBytesCodec must be `VLenUTF8Codec`, got `{codec_class_name}`."
)
Expand Down
102 changes: 102 additions & 0 deletions tests/test_codecs/test_arrow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import io
from typing import Any

import numpy as np
import pytest
from arro3.io import read_ipc_stream

import zarr
from zarr.codecs.arrow import ArrowIPCCodec
from zarr.core.array_spec import ArrayConfig, ArraySpec
from zarr.core.buffer import numpy_buffer_prototype
from zarr.dtype import parse_dtype

# Buffer prototype shared by the codec tests in this module.
CPU_BUFFER_PROTOTYPE = numpy_buffer_prototype()


# Round-trip fixtures as (numpy array, zarr dtype) pairs. A zarr dtype of
# None means the test derives it from the array's NumPy dtype via parse_dtype.
numpy_array_fixtures = [
(np.array([[1, 2, 3], [4, 5, 6]], dtype="int64"), None),
(np.array([[1.5, 2.5], [3.5, 4.5]], dtype="float32"), None),
(np.array([[True, False, True], [False, True, False]], dtype="bool"), None),
(
np.array(["foo", "barry", "bazo"], dtype=np.dtypes.StringDType()),
zarr.dtype.VariableLengthUTF8(),
),
# both come back as object dtype, but if we pass object array to Zarr, it complains about dtype resolution
# np.array(['foo', 'barry', 'bazo'], dtype='U5'),
# np.array(['foo', 'barry', 'bazo'], dtype=np.dtypes.StringDType())
]


@pytest.mark.parametrize("numpy_array_and_zdtype", numpy_array_fixtures)
async def test_arrow_codec_round_trip(
    numpy_array_and_zdtype: tuple[np.ndarray[Any, Any], zarr.dtype.ZDType[Any, Any] | None],
) -> None:
    """Encode then decode each fixture array and check it comes back unchanged."""
    original, explicit_zdtype = numpy_array_and_zdtype
    # Use the fixture's zarr dtype when given; otherwise derive it from NumPy.
    spec_dtype = (
        explicit_zdtype
        if explicit_zdtype is not None
        else parse_dtype(original.dtype, zarr_format=3)
    )
    codec = ArrowIPCCodec()
    array_spec = ArraySpec(
        shape=original.shape,
        dtype=spec_dtype,
        fill_value=0,
        config=ArrayConfig(order="C", write_empty_chunks=True),
        prototype=CPU_BUFFER_PROTOTYPE,
    )

    encoded = await codec._encode_single(
        CPU_BUFFER_PROTOTYPE.nd_buffer.from_numpy_array(original), array_spec
    )
    decoded = await codec._decode_single(encoded, array_spec)

    # The decoded chunk must equal the array that was encoded.
    np.testing.assert_array_equal(decoded.as_ndarray_like(), original)


async def test_custom_field_name() -> None:
    """Round-trip with a custom column name and check the name in the raw Arrow data."""
    source = np.array([[1, 2, 3], [4, 5, 6]], dtype="int64")
    spec_dtype = parse_dtype(source.dtype, zarr_format=3)
    codec = ArrowIPCCodec(column_name="custom_field_name")
    array_spec = ArraySpec(
        shape=source.shape,
        dtype=spec_dtype,
        fill_value=0,
        config=ArrayConfig(order="C", write_empty_chunks=True),
        prototype=CPU_BUFFER_PROTOTYPE,
    )

    encoded = await codec._encode_single(
        CPU_BUFFER_PROTOTYPE.nd_buffer.from_numpy_array(source), array_spec
    )
    decoded = await codec._decode_single(encoded, array_spec)

    # The decoded chunk must equal the array that was encoded.
    np.testing.assert_array_equal(decoded.as_ndarray_like(), source)

    # The encoded bytes must be readable as Arrow IPC directly, exposing
    # exactly one column under the configured name.
    batch = read_ipc_stream(io.BytesIO(encoded.as_buffer_like())).read_next_batch()
    assert batch.num_columns == 1
    _ = batch.column("custom_field_name")


def test_string_array() -> None:
    """Write strings to a Zarr array serialized with ArrowIPCCodec and read them back."""
    # IMO codec tests should be much more self contained,
    # not end-to-end array round-tripping tests.
    # But don't see a better way to test this at the moment..

    expected = np.array(["abc", "1234", "foo", "bar"])

    a = zarr.create_array(
        shape=4,
        chunks=(2,),
        dtype=zarr.dtype.VariableLengthUTF8(),  # type: ignore[arg-type]
        serializer=ArrowIPCCodec(),
        store=zarr.storage.MemoryStore(),
    )

    a[:] = expected
    result = a[:]
    # Compare against the values that were written. Comparing `a` with
    # `result` would only compare two reads of the same stored data.
    np.testing.assert_equal(result, expected)
43 changes: 43 additions & 0 deletions tests/test_codecs/test_bytes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from typing import Any

import numpy as np
import pytest

from zarr.codecs.bytes import BytesCodec
from zarr.core.array_spec import ArrayConfig, ArraySpec
from zarr.core.buffer import numpy_buffer_prototype
from zarr.dtype import parse_dtype

# Buffer prototype shared by the codec tests in this module.
CPU_BUFFER_PROTOTYPE = numpy_buffer_prototype()


# Arrays used to parametrize the BytesCodec round-trip test: one 2-D fixture
# each for int64, float32 and bool.
numpy_array_fixtures = [
np.array([[1, 2, 3], [4, 5, 6]], dtype="int64"),
np.array([[1.5, 2.5], [3.5, 4.5]], dtype="float32"),
np.array([[True, False, True], [False, True, False]], dtype="bool"),
]


@pytest.mark.parametrize("numpy_array", numpy_array_fixtures)
async def test_bytes_codec_round_trip(numpy_array: np.ndarray[Any, Any]) -> None:
    """Encode then decode each fixture array with a default BytesCodec."""
    # Test default initialization
    codec = BytesCodec()

    spec_dtype = parse_dtype(numpy_array.dtype, zarr_format=3)
    array_spec = ArraySpec(
        shape=numpy_array.shape,
        dtype=spec_dtype,
        fill_value=0,
        config=ArrayConfig(order="C", write_empty_chunks=True),
        prototype=CPU_BUFFER_PROTOTYPE,
    )

    encoded = await codec._encode_single(
        CPU_BUFFER_PROTOTYPE.nd_buffer.from_numpy_array(numpy_array), array_spec
    )
    assert encoded is not None
    decoded = await codec._decode_single(encoded, array_spec)

    # The decoded chunk must equal the array that was encoded.
    np.testing.assert_array_equal(decoded.as_ndarray_like(), numpy_array)
Loading