Skip to content
Merged
3 changes: 2 additions & 1 deletion cloudbuild/run_zonal_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ pip install -e .
echo '--- Setting up environment variables on VM ---'
export ZONAL_BUCKET=${_ZONAL_BUCKET}
export RUN_ZONAL_SYSTEM_TESTS=True
echo '--- Running Zonal tests on VM ---'
CURRENT_ULIMIT=$(ulimit -n)
echo '--- Running Zonal tests on VM with ulimit set to ---' $CURRENT_ULIMIT
pytest -vv -s --log-format='%(asctime)s %(levelname)s %(message)s' --log-date-format='%H:%M:%S' tests/system/test_zonal.py
3 changes: 2 additions & 1 deletion cloudbuild/zb-system-tests-cloudbuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ substitutions:
_ZONE: "us-central1-a"
_SHORT_BUILD_ID: ${BUILD_ID:0:8}
_VM_NAME: "py-sdk-sys-test-${_SHORT_BUILD_ID}"
_ULIMIT: "10000" # 10k, for gRPC bidi streams



Expand Down Expand Up @@ -67,7 +68,7 @@ steps:
# Execute the script on the VM via SSH.
# Capture the exit code to ensure cleanup happens before the build fails.
set +e
gcloud compute ssh ${_VM_NAME} --zone=${_ZONE} --internal-ip --ssh-key-file=/workspace/.ssh/google_compute_engine --command="COMMIT_SHA=${COMMIT_SHA} _ZONAL_BUCKET=${_ZONAL_BUCKET} bash run_zonal_tests.sh"
gcloud compute ssh ${_VM_NAME} --zone=${_ZONE} --internal-ip --ssh-key-file=/workspace/.ssh/google_compute_engine --command="ulimit -n ${_ULIMIT}; COMMIT_SHA=${COMMIT_SHA} _ZONAL_BUCKET=${_ZONAL_BUCKET} bash run_zonal_tests.sh"
EXIT_CODE=$?
set -e

Expand Down
68 changes: 32 additions & 36 deletions tests/system/test_zonal.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# py standard imports
import asyncio
import os
import uuid
from io import BytesIO
Expand All @@ -8,6 +7,7 @@
import google_crc32c

import pytest
import gc

# current library imports
from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
Expand All @@ -19,6 +19,7 @@
AsyncMultiRangeDownloader,
)


pytestmark = pytest.mark.skipif(
os.getenv("RUN_ZONAL_SYSTEM_TESTS") != "True",
reason="Zonal system tests need to be explicitly enabled. This helps scheduling tests in Kokoro and Cloud Build.",
Expand All @@ -36,36 +37,6 @@ def _get_equal_dist(a: int, b: int) -> tuple[int, int]:
return a + step, a + 2 * step


async def write_one_appendable_object(
    bucket_name: str,
    object_name: str,
    data: bytes,
) -> None:
    """Create an appendable object and append ``data`` to it.

    Obtains a gRPC client (attempting direct path), opens an
    appendable-object writer for ``object_name`` in ``bucket_name``,
    appends the payload, and closes the write stream.
    """
    client = AsyncGrpcClient(attempt_direct_path=True).grpc_client
    appendable_writer = AsyncAppendableObjectWriter(
        client, bucket_name, object_name
    )
    await appendable_writer.open()
    await appendable_writer.append(data)
    await appendable_writer.close()


@pytest.fixture(scope="function")
def appendable_object(storage_client, blobs_to_delete):
    """Yield the name of a freshly written appendable object.

    The object is created before the test via
    ``write_one_appendable_object``; after the test the blob is queued
    for deletion through the JSON ``storage_client``.
    """
    name = f"appendable_obj_for_mrd-{str(uuid.uuid4())[:4]}"
    asyncio.run(write_one_appendable_object(_ZONAL_BUCKET, name, _BYTES_TO_UPLOAD))

    yield name

    # Teardown: register the blob with the JSON-client cleanup list.
    blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(name))


@pytest.mark.asyncio
@pytest.mark.parametrize(
"object_size",
Expand Down Expand Up @@ -114,6 +85,9 @@ async def test_basic_wrd(

# Clean up; use json client (i.e. `storage_client` fixture) to delete.
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
del writer
del mrd
gc.collect()


@pytest.mark.asyncio
Expand Down Expand Up @@ -161,12 +135,20 @@ async def test_basic_wrd_in_slices(storage_client, blobs_to_delete, object_size)

# Clean up; use json client (i.e. `storage_client` fixture) to delete.
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
del writer
del mrd
gc.collect()


@pytest.mark.asyncio
@pytest.mark.parametrize(
"flush_interval",
[2 * 1024 * 1024, 4 * 1024 * 1024, 8 * 1024 * 1024, _DEFAULT_FLUSH_INTERVAL_BYTES],
[
2 * 1024 * 1024,
4 * 1024 * 1024,
8 * 1024 * 1024,
_DEFAULT_FLUSH_INTERVAL_BYTES,
],
)
async def test_wrd_with_non_default_flush_interval(
storage_client,
Expand Down Expand Up @@ -214,6 +196,9 @@ async def test_wrd_with_non_default_flush_interval(

# Clean up; use json client (i.e. `storage_client` fixture) to delete.
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
del writer
del mrd
gc.collect()


@pytest.mark.asyncio
Expand All @@ -237,20 +222,28 @@ async def test_read_unfinalized_appendable_object(storage_client, blobs_to_delet

# Clean up; use json client (i.e. `storage_client` fixture) to delete.
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
del writer
del mrd
gc.collect()


@pytest.mark.asyncio
async def test_mrd_open_with_read_handle(appendable_object):
grpc_client = AsyncGrpcClient(attempt_direct_path=True).grpc_client
async def test_mrd_open_with_read_handle():
grpc_client = AsyncGrpcClient().grpc_client
object_name = f"test_read_handl-{str(uuid.uuid4())[:4]}"
writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name)
await writer.open()
await writer.append(_BYTES_TO_UPLOAD)
await writer.close()

mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, appendable_object)
mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name)
await mrd.open()
read_handle = mrd.read_handle
await mrd.close()

# Open a new MRD using the `read_handle` obtained above
new_mrd = AsyncMultiRangeDownloader(
grpc_client, _ZONAL_BUCKET, appendable_object, read_handle=read_handle
grpc_client, _ZONAL_BUCKET, object_name, read_handle=read_handle
)
await new_mrd.open()
# persisted_size not set when opened with read_handle
Expand All @@ -259,3 +252,6 @@ async def test_mrd_open_with_read_handle(appendable_object):
await new_mrd.download_ranges([(0, 0, buffer)])
await new_mrd.close()
assert buffer.getvalue() == _BYTES_TO_UPLOAD
del mrd
del new_mrd
gc.collect()