Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,8 @@ docs/_build
docs/_autosummary
docs/*.zarr
__pycache__
*.swp

.envrc
.venv*
.direnv
62 changes: 62 additions & 0 deletions xarray_beam/_src/zarr_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,68 @@ def test_chunks_to_zarr(self):
):
inputs2 | xbeam.ChunksToZarr(temp_dir, template)

def test_chunks_to_zarr_append(self):
zarr_chunks = {'t': 1, 'x': 5}

# ds is the full dataset we want to write to zarr.

# Warning: Do not chunk this ds. If you do, caching will make tests
# pass, so long as you simply modify the template (without even calling
# ChunksToZarr).
ds = xarray.Dataset(
{'foo': (('t', 'x'), np.arange(3 * 5).reshape(3, 5))},
coords={
't': np.arange(100, 103),
'x': np.arange(5),
},
)

# Write the first two chunks.
two_chunk_template = xbeam.make_template(ds.isel(t=slice(2)))
first_two_chunks = [
(xbeam.Key({'t': 0}), ds.isel(t=[0])),
(xbeam.Key({'t': 1}), ds.isel(t=[1])),
]
path = self.create_tempdir().full_path
first_two_chunks | xbeam.ChunksToZarr(
path, template=two_chunk_template, zarr_chunks=zarr_chunks
)
two_chunk_result = xarray.open_zarr(path, consolidated=True)
xarray.testing.assert_identical(ds.isel(t=slice(2)), two_chunk_result)

# Now append the last chunk.

# First modify the metadata
# Append the new data (t=[2]) to the existing metadata.
# This results in t/.zarray that has chunk:2, equal to the number of times
# in the first write.
xbeam.make_template(ds.isel(t=[2])).chunk(zarr_chunks).to_zarr(
# Caling make_template is not necessary, but let's test it since
# this is the anticipated workflow.
path,
mode='a',
append_dim='t',
compute=False,
)
xbeam_opened_result, chunks = xbeam.open_zarr(path)
self.assertEqual(zarr_chunks, chunks)

# Second, write the last chunk only.
last_chunk = [
(xbeam.Key({'t': 2}), ds.isel(t=[2])),
]

last_chunk | xbeam.ChunksToZarr(
path,
template=xbeam.make_template(xbeam_opened_result),
zarr_chunks=zarr_chunks,
needs_setup=False,
)

final_result, final_chunks = xbeam.open_zarr(path, consolidated=True)
xarray.testing.assert_identical(ds, final_result)
self.assertEqual(zarr_chunks, final_chunks)

def test_multiple_vars_chunks_to_zarr(self):
dataset = xarray.Dataset(
{
Expand Down