From 79c29078fa3cd22371953ff2a42279fe4ca0e1ac Mon Sep 17 00:00:00 2001 From: Tyler Williams Date: Tue, 20 Jun 2023 21:59:20 -0700 Subject: [PATCH] Support new-style digest functions --- .../build/lib/remote/ByteStreamUploader.java | 37 +++++- .../build/lib/remote/GrpcCacheClient.java | 24 +++- .../lib/remote/ByteStreamUploaderTest.java | 120 ++++++++++++++---- ...SpawnRunnerWithGrpcRemoteExecutorTest.java | 12 +- 4 files changed, 156 insertions(+), 37 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java index 18aaa6a807a7cf..fd53932fce7f78 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java @@ -21,6 +21,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import build.bazel.remote.execution.v2.Digest; +import build.bazel.remote.execution.v2.DigestFunction; import com.google.bytestream.ByteStreamGrpc; import com.google.bytestream.ByteStreamGrpc.ByteStreamFutureStub; import com.google.bytestream.ByteStreamGrpc.ByteStreamStub; @@ -69,6 +70,7 @@ final class ByteStreamUploader { private final CallCredentialsProvider callCredentialsProvider; private final long callTimeoutSecs; private final RemoteRetrier retrier; + private final DigestFunction.Value digestFunction; @Nullable private final Semaphore openedFilePermits; @@ -89,7 +91,8 @@ final class ByteStreamUploader { CallCredentialsProvider callCredentialsProvider, long callTimeoutSecs, RemoteRetrier retrier, - int maximumOpenFiles) { + int maximumOpenFiles, + DigestFunction.Value digestFunction) { checkArgument(callTimeoutSecs > 0, "callTimeoutSecs must be gt 0."); this.instanceName = instanceName; this.channel = channel; @@ -97,6 +100,7 @@ final class ByteStreamUploader { this.callTimeoutSecs = callTimeoutSecs; this.retrier = retrier; this.openedFilePermits = maximumOpenFiles != -1 ? new Semaphore(maximumOpenFiles) : null; + this.digestFunction = digestFunction; } @VisibleForTesting @@ -175,11 +179,34 @@ public ListenableFuture uploadBlobAsync( MoreExecutors.directExecutor()); } - private static String buildUploadResourceName( + private boolean isOldStyleDigestFunction() { + // Old-style digest functions (SHA256, etc) are distinguishable by the length + // of their hash alone and do not require extra specification, but newer + // digest functions (which may have the same length hashes as the older + // functions!) must be explicitly specified in the upload resource name. + return digestFunction.getNumber() <= 7; + } + + private String buildUploadResourceName( String instanceName, UUID uuid, Digest digest, boolean compressed) { - String template = - compressed ? "uploads/%s/compressed-blobs/zstd/%s/%d" : "uploads/%s/blobs/%s/%d"; - String resourceName = format(template, uuid, digest.getHash(), digest.getSizeBytes()); + + String resourceName; + + if (isOldStyleDigestFunction()) { + String template = + compressed ? "uploads/%s/compressed-blobs/zstd/%s/%d" : "uploads/%s/blobs/%s/%d"; + resourceName = format(template, uuid, digest.getHash(), digest.getSizeBytes()); + } else { + String template = + compressed ? "uploads/%s/compressed-blobs/zstd/%s/%s/%d" : "uploads/%s/blobs/%s/%s/%d"; + resourceName = + format( + template, + uuid, + digestFunction.getValueDescriptor().getName().toLowerCase(), + digest.getHash(), + digest.getSizeBytes()); + } if (!Strings.isNullOrEmpty(instanceName)) { resourceName = instanceName + "/" + resourceName; } diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java index 470d3845dc3273..0a8bb99e310745 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java @@ -22,6 +22,7 @@ import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc; import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc.ContentAddressableStorageFutureStub; import build.bazel.remote.execution.v2.Digest; +import build.bazel.remote.execution.v2.DigestFunction; import build.bazel.remote.execution.v2.FindMissingBlobsRequest; import build.bazel.remote.execution.v2.FindMissingBlobsResponse; import build.bazel.remote.execution.v2.GetActionResultRequest; @@ -107,7 +108,8 @@ public GrpcCacheClient( callCredentialsProvider, options.remoteTimeout.getSeconds(), retrier, - options.maximumOpenFiles); + options.maximumOpenFiles, + digestUtil.getDigestFunction()); maxMissingBlobsDigestsPerMessage = computeMaxMissingBlobsDigestsPerMessage(); Preconditions.checkState( maxMissingBlobsDigestsPerMessage > 0, "Error: gRPC message size too small."); @@ -352,12 +354,24 @@ private ListenableFuture downloadBlob( MoreExecutors.directExecutor()); } - public static String getResourceName(String instanceName, Digest digest, boolean compressed) { + private static boolean isOldStyleDigestFunction(DigestFunction.Value digestFunction) { + // Old-style digest functions (SHA256, etc) are distinguishable by the length + // of their hash alone and do not require extra specification, but newer + // digest functions (which may have the same length hashes as the older + // functions!) must be explicitly specified in the upload resource name. + return digestFunction.getNumber() <= 7; + } + + public static String getResourceName( + String instanceName, Digest digest, boolean compressed, DigestFunction.Value digestFunction) { String resourceName = ""; if (!instanceName.isEmpty()) { resourceName += instanceName + "/"; } resourceName += compressed ? "compressed-blobs/zstd/" : "blobs/"; + if (!isOldStyleDigestFunction(digestFunction)) { + resourceName += digestFunction.getValueDescriptor().getName().toLowerCase() + "/"; + } return resourceName + DigestUtil.toString(digest); } @@ -369,7 +383,11 @@ private ListenableFuture requestRead( @Nullable Supplier digestSupplier, Channel channel) { String resourceName = - getResourceName(options.remoteInstanceName, digest, options.cacheCompression); + getResourceName( + options.remoteInstanceName, + digest, + options.cacheCompression, + digestUtil.getDigestFunction()); SettableFuture future = SettableFuture.create(); OutputStream out; try { diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java index fd19dddc9e9114..53d433faa03e21 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java @@ -21,6 +21,7 @@ import static org.mockito.ArgumentMatchers.any; import build.bazel.remote.execution.v2.Digest; +import build.bazel.remote.execution.v2.DigestFunction; import build.bazel.remote.execution.v2.RequestMetadata; import com.github.luben.zstd.Zstd; import com.github.luben.zstd.ZstdInputStream; @@ -165,7 +166,8 @@ public void singleBlobUploadShouldWork() throws Exception { CallCredentialsProvider.NO_CREDENTIALS, /* callTimeoutSecs= */ 60, retrier, - /*maximumOpenFiles=*/ -1); + /* maximumOpenFiles= */ -1, + /* digestFunction= */ DigestFunction.Value.SHA256); byte[] blob = new byte[CHUNK_SIZE * 2 + 1]; new Random().nextBytes(blob); @@ -192,7 +194,8 @@ public void singleChunkCompressedUploadAlreadyExists() throws Exception { CallCredentialsProvider.NO_CREDENTIALS, /* callTimeoutSecs= */ 60, retrier, - /* maximumOpenFiles= */ -1); + /* maximumOpenFiles= */ -1, + /* digestFunction= */ DigestFunction.Value.SHA256); byte[] blob = {'A'}; @@ -232,8 +235,7 @@ public void onError(Throwable throwable) { } @Override - public void onCompleted() { - } + public void onCompleted() {} }; } }); @@ -256,7 +258,8 @@ public void progressiveUploadShouldWork() throws Exception { CallCredentialsProvider.NO_CREDENTIALS, 3, retrier, - /*maximumOpenFiles=*/ -1); + /* maximumOpenFiles= */ -1, + /* digestFunction= */ DigestFunction.Value.SHA256); byte[] blob = new byte[CHUNK_SIZE * 2 + 1]; new Random().nextBytes(blob); @@ -372,7 +375,8 @@ public void progressiveCompressedUploadShouldWork() throws Exception { CallCredentialsProvider.NO_CREDENTIALS, 300, retrier, - /*maximumOpenFiles=*/ -1); + /* maximumOpenFiles= */ -1, + /* digestFunction= */ DigestFunction.Value.SHA256); int chunkSize = 1024; int skipSize = chunkSize + 1; @@ -491,7 +495,8 @@ public void progressiveCompressedUploadSeesAlreadyExistsAtTheEnd() throws Except CallCredentialsProvider.NO_CREDENTIALS, 300, retrier, - /* maximumOpenFiles= */ -1); + /* maximumOpenFiles= */ -1, + /* digestFunction= */ DigestFunction.Value.SHA256); int chunkSize = 1024; byte[] blob = new byte[chunkSize * 2 + 1]; @@ -549,7 +554,8 @@ public void concurrentlyCompletedUploadIsNotRetried() throws Exception { CallCredentialsProvider.NO_CREDENTIALS, 1, retrier, - /*maximumOpenFiles=*/ -1); + /* maximumOpenFiles= */ -1, + /* digestFunction= */ DigestFunction.Value.SHA256); byte[] blob = new byte[CHUNK_SIZE * 2 + 1]; new Random().nextBytes(blob); @@ -607,7 +613,8 @@ public void unimplementedQueryShouldRestartUpload() throws Exception { CallCredentialsProvider.NO_CREDENTIALS, 3, retrier, - /*maximumOpenFiles=*/ -1); + /* maximumOpenFiles= */ -1, + /* digestFunction= */ DigestFunction.Value.SHA256); byte[] blob = new byte[CHUNK_SIZE * 2 + 1]; new Random().nextBytes(blob); @@ -676,7 +683,8 @@ public void earlyWriteResponseShouldCompleteUpload() throws Exception { CallCredentialsProvider.NO_CREDENTIALS, 3, retrier, - /*maximumOpenFiles=*/ -1); + /* maximumOpenFiles= */ -1, + /* digestFunction= */ DigestFunction.Value.SHA256); byte[] blob = new byte[CHUNK_SIZE * 2 + 1]; new Random().nextBytes(blob); @@ -713,7 +721,8 @@ public void incorrectCommittedSizeFailsCompletedUpload() throws Exception { CallCredentialsProvider.NO_CREDENTIALS, 3, retrier, - /*maximumOpenFiles=*/ -1); + /* maximumOpenFiles= */ -1, + /* digestFunction= */ DigestFunction.Value.SHA256); byte[] blob = new byte[CHUNK_SIZE * 2 + 1]; new Random().nextBytes(blob); @@ -766,7 +775,8 @@ public void incorrectCommittedSizeDoesNotFailIncompleteUpload() throws Exception CallCredentialsProvider.NO_CREDENTIALS, 300, retrier, - /*maximumOpenFiles=*/ -1); + /* maximumOpenFiles= */ -1, + /* digestFunction= */ DigestFunction.Value.SHA256); byte[] blob = new byte[CHUNK_SIZE * 2 + 1]; new Random().nextBytes(blob); @@ -798,7 +808,8 @@ public void multipleBlobsUploadShouldWork() throws Exception { CallCredentialsProvider.NO_CREDENTIALS, /* callTimeoutSecs= */ 60, retrier, - /*maximumOpenFiles=*/ -1); + /* maximumOpenFiles= */ -1, + /* digestFunction= */ DigestFunction.Value.SHA256); int numUploads = 10; Map blobsByHash = Maps.newHashMap(); @@ -830,7 +841,8 @@ public void tooManyFilesIOException_adviseMaximumOpenFilesFlag() throws Exceptio CallCredentialsProvider.NO_CREDENTIALS, /* callTimeoutSecs= */ 60, retrier, - /*maximumOpenFiles=*/ -1); + /* maximumOpenFiles= */ -1, + /* digestFunction= */ DigestFunction.Value.SHA256); byte[] blob = new byte[CHUNK_SIZE]; Chunker chunker = Mockito.mock(Chunker.class); Digest digest = DIGEST_UTIL.compute(blob); @@ -862,7 +874,8 @@ public void availablePermitsOpenFileSemaphore_fewerPermitsThanUploads_endWithAll CallCredentialsProvider.NO_CREDENTIALS, /* callTimeoutSecs= */ 60, retrier, - maximumOpenFiles); + maximumOpenFiles, + /* digestFunction= */ DigestFunction.Value.SHA256); assertThat(uploader.getOpenedFilePermits().availablePermits()).isEqualTo(999); @@ -900,7 +913,8 @@ public void noMaximumOpenFilesFlags_nullSemaphore() throws Exception { CallCredentialsProvider.NO_CREDENTIALS, /* callTimeoutSecs= */ 60, retrier, - /*maximumOpenFiles=*/ -1); + /* maximumOpenFiles= */ -1, + /* digestFunction= */ DigestFunction.Value.SHA256); assertThat(uploader.getOpenedFilePermits()).isNull(); int numUploads = 10; @@ -936,7 +950,8 @@ public void contextShouldBePreservedUponRetries() throws Exception { CallCredentialsProvider.NO_CREDENTIALS, /* callTimeoutSecs= */ 60, retrier, - /*maximumOpenFiles=*/ -1); + /* maximumOpenFiles= */ -1, + /* digestFunction= */ DigestFunction.Value.SHA256); List toUpload = ImmutableList.of("aaaaaaaaaa", "bbbbbbbbbb", "cccccccccc"); Map chunkers = Maps.newHashMapWithExpectedSize(toUpload.size()); @@ -1066,7 +1081,8 @@ public int maxConcurrency() { CallCredentialsProvider.NO_CREDENTIALS, /* callTimeoutSecs= */ 60, retrier, - /*maximumOpenFiles=*/ -1); + /* maximumOpenFiles= */ -1, + /* digestFunction= */ DigestFunction.Value.SHA256); byte[] blob = new byte[CHUNK_SIZE]; Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build(); @@ -1127,7 +1143,8 @@ public void errorsShouldBeReported() throws IOException, InterruptedException { CallCredentialsProvider.NO_CREDENTIALS, /* callTimeoutSecs= */ 60, retrier, - /*maximumOpenFiles=*/ -1); + /* maximumOpenFiles= */ -1, + /* digestFunction= */ DigestFunction.Value.SHA256); byte[] blob = new byte[CHUNK_SIZE]; Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build(); @@ -1163,7 +1180,8 @@ public void failureInRetryExecutorShouldBeHandled() throws Exception { CallCredentialsProvider.NO_CREDENTIALS, /* callTimeoutSecs= */ 60, retrier, - /*maximumOpenFiles=*/ -1); + /* maximumOpenFiles= */ -1, + /* digestFunction= */ DigestFunction.Value.SHA256); serviceRegistry.addService( new ByteStreamImplBase() { @@ -1202,7 +1220,8 @@ public void resourceNameWithoutInstanceName() throws Exception { CallCredentialsProvider.NO_CREDENTIALS, /* callTimeoutSecs= */ 60, retrier, - /*maximumOpenFiles=*/ -1); + /* maximumOpenFiles= */ -1, + /* digestFunction= */ DigestFunction.Value.SHA256); serviceRegistry.addService( new ByteStreamImplBase() { @@ -1234,6 +1253,50 @@ public void onCompleted() { uploader.uploadBlob(context, digest, chunker); } + @Test + public void resourceWithNewStyleDigestFunction() throws Exception { + RemoteRetrier retrier = + TestUtils.newRemoteRetrier(() -> mockBackoff, (e) -> true, retryService); + ByteStreamUploader uploader = + new ByteStreamUploader( + /* instanceName= */ null, + referenceCountedChannel, + CallCredentialsProvider.NO_CREDENTIALS, + /* callTimeoutSecs= */ 60, + retrier, + /* maximumOpenFiles= */ -1, + /* digestFunction= */ DigestFunction.Value.BLAKE3); + + serviceRegistry.addService( + new ByteStreamImplBase() { + @Override + public StreamObserver write(StreamObserver response) { + return new StreamObserver() { + @Override + public void onNext(WriteRequest writeRequest) { + // Test that the resource name contains the digest function. + assertThat(writeRequest.getResourceName()).contains("blobs/blake3/"); + } + + @Override + public void onError(Throwable throwable) {} + + @Override + public void onCompleted() { + response.onNext(WriteResponse.newBuilder().setCommittedSize(1).build()); + response.onCompleted(); + } + }; + } + }); + + byte[] blob = new byte[1]; + Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build(); + Digest digest = DIGEST_UTIL.compute(blob); + + uploader.uploadBlob(context, digest, chunker); + } + @Test public void nonRetryableStatusShouldNotBeRetried() throws Exception { RemoteRetrier retrier = @@ -1246,7 +1309,8 @@ public void nonRetryableStatusShouldNotBeRetried() throws Exception { CallCredentialsProvider.NO_CREDENTIALS, /* callTimeoutSecs= */ 60, retrier, - /*maximumOpenFiles=*/ -1); + /* maximumOpenFiles= */ -1, + /* digestFunction= */ DigestFunction.Value.SHA256); AtomicInteger numCalls = new AtomicInteger(); @@ -1299,7 +1363,8 @@ public void refresh() throws IOException { callCredentialsProvider, /* callTimeoutSecs= */ 60, retrier, - /*maximumOpenFiles=*/ -1); + /* maximumOpenFiles= */ -1, + /* digestFunction= */ DigestFunction.Value.SHA256); byte[] blob = new byte[CHUNK_SIZE * 2 + 1]; new Random().nextBytes(blob); @@ -1355,7 +1420,8 @@ public void refresh() throws IOException { callCredentialsProvider, /* callTimeoutSecs= */ 60, retrier, - /*maximumOpenFiles=*/ -1); + /* maximumOpenFiles= */ -1, + /* digestFunction= */ DigestFunction.Value.SHA256); byte[] blob = new byte[CHUNK_SIZE * 2 + 1]; new Random().nextBytes(blob); @@ -1425,7 +1491,8 @@ public void failureAfterUploadCompletes() throws Exception { CallCredentialsProvider.NO_CREDENTIALS, /* callTimeoutSecs= */ 60, retrier, - -1); + -1, + /* digestFunction= */ DigestFunction.Value.SHA256); byte[] blob = new byte[CHUNK_SIZE - 1]; new Random().nextBytes(blob); @@ -1484,7 +1551,8 @@ public void testCompressedUploads() throws Exception { CallCredentialsProvider.NO_CREDENTIALS, /* callTimeoutSecs= */ 60, retrier, - /*maximumOpenFiles=*/ -1); + /* maximumOpenFiles= */ -1, + /* digestFunction= */ DigestFunction.Value.SHA256); byte[] blob = new byte[CHUNK_SIZE * 2 + 1]; new Random().nextBytes(blob); diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerWithGrpcRemoteExecutorTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerWithGrpcRemoteExecutorTest.java index c03e0a6d7395f9..0663ba4aeff8a4 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerWithGrpcRemoteExecutorTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerWithGrpcRemoteExecutorTest.java @@ -31,6 +31,7 @@ import build.bazel.remote.execution.v2.Command; import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase; import build.bazel.remote.execution.v2.Digest; +import build.bazel.remote.execution.v2.DigestFunction; import build.bazel.remote.execution.v2.Directory; import build.bazel.remote.execution.v2.ExecuteRequest; import build.bazel.remote.execution.v2.ExecuteResponse; @@ -1110,7 +1111,8 @@ public void findMissingBlobs( } }); String stdOutResourceName = - getResourceName(remoteOptions.remoteInstanceName, stdOutDigest, false); + getResourceName( + remoteOptions.remoteInstanceName, stdOutDigest, false, DigestFunction.Value.SHA256); serviceRegistry.addService( new ByteStreamImplBase() { @Override @@ -1171,7 +1173,8 @@ public void findMissingBlobs( } }); String stdOutResourceName = - getResourceName(remoteOptions.remoteInstanceName, stdOutDigest, false); + getResourceName( + remoteOptions.remoteInstanceName, stdOutDigest, false, DigestFunction.Value.SHA256); serviceRegistry.addService( new ByteStreamImplBase() { @Override @@ -1297,7 +1300,10 @@ public void getActionResult( }); String dummyTreeResourceName = getResourceName( - remoteOptions.remoteInstanceName, DUMMY_OUTPUT_DIRECTORY.getTreeDigest(), false); + remoteOptions.remoteInstanceName, + DUMMY_OUTPUT_DIRECTORY.getTreeDigest(), + false, + DigestFunction.Value.SHA256); serviceRegistry.addService( new ByteStreamImplBase() { private boolean first = true;