diff --git a/cloudbuild/run_zonal_tests.sh b/cloudbuild/run_zonal_tests.sh
index ef94e629b..c0f8eabc2 100644
--- a/cloudbuild/run_zonal_tests.sh
+++ b/cloudbuild/run_zonal_tests.sh
@@ -22,5 +22,6 @@ pip install -e .
 echo '--- Setting up environment variables on VM ---'
 export ZONAL_BUCKET=${_ZONAL_BUCKET}
 export RUN_ZONAL_SYSTEM_TESTS=True
-echo '--- Running Zonal tests on VM ---'
+CURRENT_ULIMIT=$(ulimit -n)
+echo "--- Running Zonal tests on VM with ulimit set to ${CURRENT_ULIMIT} ---"
 pytest -vv -s --log-format='%(asctime)s %(levelname)s %(message)s' --log-date-format='%H:%M:%S' tests/system/test_zonal.py
diff --git a/cloudbuild/zb-system-tests-cloudbuild.yaml b/cloudbuild/zb-system-tests-cloudbuild.yaml
index 383c4fa96..db36b24fc 100644
--- a/cloudbuild/zb-system-tests-cloudbuild.yaml
+++ b/cloudbuild/zb-system-tests-cloudbuild.yaml
@@ -3,6 +3,7 @@ substitutions:
   _ZONE: "us-central1-a"
   _SHORT_BUILD_ID: ${BUILD_ID:0:8}
   _VM_NAME: "py-sdk-sys-test-${_SHORT_BUILD_ID}"
+  _ULIMIT: "10000" # 10k, for gRPC bidi streams
@@ -67,7 +68,7 @@ steps:
       # Execute the script on the VM via SSH.
       # Capture the exit code to ensure cleanup happens before the build fails.
       set +e
-      gcloud compute ssh ${_VM_NAME} --zone=${_ZONE} --internal-ip --ssh-key-file=/workspace/.ssh/google_compute_engine --command="COMMIT_SHA=${COMMIT_SHA} _ZONAL_BUCKET=${_ZONAL_BUCKET} bash run_zonal_tests.sh"
+      gcloud compute ssh ${_VM_NAME} --zone=${_ZONE} --internal-ip --ssh-key-file=/workspace/.ssh/google_compute_engine --command="ulimit -n ${_ULIMIT}; COMMIT_SHA=${COMMIT_SHA} _ZONAL_BUCKET=${_ZONAL_BUCKET} bash run_zonal_tests.sh"
       EXIT_CODE=$?
       set -e
diff --git a/tests/system/test_zonal.py b/tests/system/test_zonal.py
index 8697410b0..d8d20ba36 100644
--- a/tests/system/test_zonal.py
+++ b/tests/system/test_zonal.py
@@ -1,5 +1,4 @@
 # py standard imports
-import asyncio
 import os
 import uuid
 from io import BytesIO
@@ -8,6 +7,7 @@
 import google_crc32c
 import pytest
+import gc
 
 # current library imports
 from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
@@ -19,6 +19,7 @@
     AsyncMultiRangeDownloader,
 )
 
+
 pytestmark = pytest.mark.skipif(
     os.getenv("RUN_ZONAL_SYSTEM_TESTS") != "True",
     reason="Zonal system tests need to be explicitly enabled. This helps scheduling tests in Kokoro and Cloud Build.",
@@ -36,36 +37,6 @@ def _get_equal_dist(a: int, b: int) -> tuple[int, int]:
     return a + step, a + 2 * step
 
 
-async def write_one_appendable_object(
-    bucket_name: str,
-    object_name: str,
-    data: bytes,
-) -> None:
-    """Helper to write an appendable object."""
-    grpc_client = AsyncGrpcClient(attempt_direct_path=True).grpc_client
-    writer = AsyncAppendableObjectWriter(grpc_client, bucket_name, object_name)
-    await writer.open()
-    await writer.append(data)
-    await writer.close()
-
-
-@pytest.fixture(scope="function")
-def appendable_object(storage_client, blobs_to_delete):
-    """Fixture to create and cleanup an appendable object."""
-    object_name = f"appendable_obj_for_mrd-{str(uuid.uuid4())[:4]}"
-    asyncio.run(
-        write_one_appendable_object(
-            _ZONAL_BUCKET,
-            object_name,
-            _BYTES_TO_UPLOAD,
-        )
-    )
-    yield object_name
-
-    # Clean up; use json client (i.e. `storage_client` fixture) to delete.
-    blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
-
-
 @pytest.mark.asyncio
 @pytest.mark.parametrize(
     "object_size",
@@ -114,6 +85,9 @@ async def test_basic_wrd(
     # Clean up; use json client (i.e. `storage_client` fixture) to delete.
     blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
+    del writer
+    del mrd
+    gc.collect()
 
 
 @pytest.mark.asyncio
@@ -161,12 +135,20 @@ async def test_basic_wrd_in_slices(storage_client, blobs_to_delete, object_size)
     # Clean up; use json client (i.e. `storage_client` fixture) to delete.
     blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
+    del writer
+    del mrd
+    gc.collect()
 
 
 @pytest.mark.asyncio
 @pytest.mark.parametrize(
     "flush_interval",
-    [2 * 1024 * 1024, 4 * 1024 * 1024, 8 * 1024 * 1024, _DEFAULT_FLUSH_INTERVAL_BYTES],
+    [
+        2 * 1024 * 1024,
+        4 * 1024 * 1024,
+        8 * 1024 * 1024,
+        _DEFAULT_FLUSH_INTERVAL_BYTES,
+    ],
 )
 async def test_wrd_with_non_default_flush_interval(
     storage_client,
@@ -214,6 +196,9 @@ async def test_wrd_with_non_default_flush_interval(
     # Clean up; use json client (i.e. `storage_client` fixture) to delete.
     blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
+    del writer
+    del mrd
+    gc.collect()
 
 
 @pytest.mark.asyncio
@@ -237,20 +222,28 @@ async def test_read_unfinalized_appendable_object(storage_client, blobs_to_delet
     # Clean up; use json client (i.e. `storage_client` fixture) to delete.
     blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
+    del writer
+    del mrd
+    gc.collect()
 
 
 @pytest.mark.asyncio
-async def test_mrd_open_with_read_handle(appendable_object):
-    grpc_client = AsyncGrpcClient(attempt_direct_path=True).grpc_client
+async def test_mrd_open_with_read_handle():
+    grpc_client = AsyncGrpcClient().grpc_client
+    object_name = f"test_read_handle-{str(uuid.uuid4())[:4]}"
+    writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name)
+    await writer.open()
+    await writer.append(_BYTES_TO_UPLOAD)
+    await writer.close()
 
-    mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, appendable_object)
+    mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name)
     await mrd.open()
     read_handle = mrd.read_handle
     await mrd.close()
 
     # Open a new MRD using the `read_handle` obtained above
     new_mrd = AsyncMultiRangeDownloader(
-        grpc_client, _ZONAL_BUCKET, appendable_object, read_handle=read_handle
+        grpc_client, _ZONAL_BUCKET, object_name, read_handle=read_handle
     )
     await new_mrd.open()
     # persisted_size not set when opened with read_handle
@@ -259,3 +252,6 @@ async def test_mrd_open_with_read_handle():
     await new_mrd.download_ranges([(0, 0, buffer)])
     await new_mrd.close()
     assert buffer.getvalue() == _BYTES_TO_UPLOAD
+    del mrd
+    del new_mrd
+    gc.collect()
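
Note on the cleanup pattern above: every open gRPC bidi stream pins a socket, i.e. a file descriptor, which is why this change both raises `ulimit -n` on the test VM (the `_ULIMIT` substitution) and makes each test explicitly `del` its writer/downloader objects and call `gc.collect()`. The sketch below is a minimal illustration, not the library's API: `StreamLike` is a hypothetical stand-in that uses a plain socket in place of a gRPC stream. It shows why the explicit collection matters: an object caught in a reference cycle is not freed by CPython's refcounting when its last name is deleted, so its descriptor lingers until the cyclic collector runs.

```python
import gc
import socket


class StreamLike:
    """Hypothetical stand-in for a bidi-stream wrapper (NOT the real API)."""

    def __init__(self):
        self._sock = socket.socket()  # the file descriptor we care about
        # Storing a bound method on the instance creates a reference cycle
        # (self -> bound method -> self), so refcounting alone never frees it.
        self._on_data = self._handle

    def _handle(self, chunk):
        pass

    def __del__(self):
        # The fd is released only when the object is actually destroyed.
        self._sock.close()


streams = [StreamLike() for _ in range(100)]
del streams    # drops the list, but each StreamLike survives in its own cycle
gc.collect()   # the cyclic collector runs __del__ and closes the 100 fds
```

Under a common default soft limit of 1024 descriptors, a session that opens many streams back to back can hit "Too many open files" before the collector happens to run on its own, which is presumably what motivated both halves of this change.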