diff --git a/src/main/java/com/google/devtools/build/lib/remote/BUILD b/src/main/java/com/google/devtools/build/lib/remote/BUILD index 63f0cb1d844190..13856e021fed59 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/BUILD @@ -8,6 +8,7 @@ package( filegroup( name = "srcs", srcs = glob(["*"]) + [ + "//src/main/java/com/google/devtools/build/lib/remote/chunking:srcs", "//src/main/java/com/google/devtools/build/lib/remote/circuitbreaker:srcs", "//src/main/java/com/google/devtools/build/lib/remote/common:srcs", "//src/main/java/com/google/devtools/build/lib/remote/disk:srcs", @@ -28,7 +29,9 @@ java_library( srcs = glob( ["*.java"], exclude = [ + "ChunkingConfig.java", "ExecutionStatusException.java", + "FastCDCChunker.java", "ReferenceCountedChannel.java", "ChannelConnectionWithServerCapabilitiesFactory.java", "RemoteRetrier.java", @@ -53,6 +56,7 @@ java_library( ":Retrier", ":abstract_action_input_prefetcher", ":lease_service", + "//src/main/java/com/google/devtools/build/lib/concurrent:task_deduplicator", ":remote_important_output_handler", ":remote_output_checker", ":scrubber", @@ -97,6 +101,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/exec/local", "//src/main/java/com/google/devtools/build/lib/packages/semantics", "//src/main/java/com/google/devtools/build/lib/profiler", + "//src/main/java/com/google/devtools/build/lib/remote/chunking", "//src/main/java/com/google/devtools/build/lib/remote/circuitbreaker", "//src/main/java/com/google/devtools/build/lib/remote/common", "//src/main/java/com/google/devtools/build/lib/remote/common:bulk_transfer_exception", diff --git a/src/main/java/com/google/devtools/build/lib/remote/ChunkedBlobDownloader.java b/src/main/java/com/google/devtools/build/lib/remote/ChunkedBlobDownloader.java new file mode 100644 index 00000000000000..a402fd65634c88 --- /dev/null +++ 
b/src/main/java/com/google/devtools/build/lib/remote/ChunkedBlobDownloader.java
@@ -0,0 +1,85 @@
+// Copyright 2026 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.remote;
+
+import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
+
+import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.SplitBlobResponse;
+import com.google.common.flogger.GoogleLogger;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
+import io.grpc.StatusRuntimeException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+
+/** Downloads blobs by sequentially fetching chunks via the SplitBlob API. */
+public class ChunkedBlobDownloader {
+  private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
+
+  private final GrpcCacheClient grpcCacheClient;
+  private final CombinedCache combinedCache;
+
+  public ChunkedBlobDownloader(GrpcCacheClient grpcCacheClient, CombinedCache combinedCache) {
+    this.grpcCacheClient = grpcCacheClient;
+    this.combinedCache = combinedCache;
+  }
+
+  /**
+   * Downloads a blob using chunked download via the SplitBlob API. This should be called with
+   * virtual threads, as it blocks on futures via {@link
+   * com.google.devtools.build.lib.remote.util.Utils#getFromFuture}.
+   *
+   * @throws CacheNotFoundException if chunk information or a chunk is unavailable; callers may
+   *     fall back to a regular (non-chunked) download on this exception
+   */
+  public void downloadChunked(
+      RemoteActionExecutionContext context, Digest blobDigest, OutputStream out)
+      throws CacheNotFoundException, IOException, InterruptedException {
+    List<Digest> chunkDigests;
+    try {
+      chunkDigests = getChunkDigests(context, blobDigest);
+    } catch (CacheNotFoundException e) {
+      // Already the signal callers expect; rethrow as-is rather than logging a misleading
+      // "SplitBlob failed" warning and re-wrapping it (which would drop the cause).
+      throw e;
+    } catch (IOException | StatusRuntimeException e) {
+      logger.atWarning().withCause(e).log(
+          "SplitBlob failed for %s/%d", blobDigest.getHash(), blobDigest.getSizeBytes());
+      throw new CacheNotFoundException(blobDigest);
+    }
+    downloadAndReassembleChunks(context, chunkDigests, out);
+  }
+
+  /**
+   * Calls the SplitBlob RPC and returns the digests of the blob's chunks.
+   *
+   * @throws CacheNotFoundException if chunking is disabled on the client (null future) or the
+   *     server returned no chunks for a non-empty blob
+   */
+  private List<Digest> getChunkDigests(RemoteActionExecutionContext context, Digest blobDigest)
+      throws IOException, InterruptedException {
+    ListenableFuture<SplitBlobResponse> splitResponseFuture =
+        grpcCacheClient.splitBlob(context, blobDigest);
+    if (splitResponseFuture == null) {
+      throw new CacheNotFoundException(blobDigest);
+    }
+    List<Digest> chunkDigests = getFromFuture(splitResponseFuture).getChunkDigestsList();
+    if (chunkDigests.isEmpty() && blobDigest.getSizeBytes() > 0) {
+      throw new CacheNotFoundException(blobDigest);
+    }
+    return chunkDigests;
+  }
+
+  /** Sequentially downloads each chunk and appends it to {@code out} in order. */
+  private void downloadAndReassembleChunks(
+      RemoteActionExecutionContext context, List<Digest> chunkDigests, OutputStream out)
+      throws IOException, InterruptedException {
+    for (Digest chunkDigest : chunkDigests) {
+      getFromFuture(combinedCache.downloadBlob(context, chunkDigest, out));
+    }
+  }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ChunkedBlobUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ChunkedBlobUploader.java
new file mode 100644
index 00000000000000..d6d9bf51329069
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/remote/ChunkedBlobUploader.java
@@ -0,0 +1,107 @@
+// Copyright 2026 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.remote;
+
+import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
+
+import build.bazel.remote.execution.v2.Digest;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.io.ByteStreams;
+import com.google.devtools.build.lib.remote.chunking.ChunkingConfig;
+import com.google.devtools.build.lib.remote.chunking.FastCDCChunker;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
+import com.google.devtools.build.lib.remote.util.DigestUtil;
+import com.google.devtools.build.lib.vfs.Path;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Uploads blobs in chunks using Content-Defined Chunking with FastCDC 2020.
+ *
+ * <p>Upload flow for blobs above threshold:
+ *
+ * <ol>
+ *   <li>Chunk file with FastCDC
+ *   <li>Call findMissingDigests on chunk digests
+ *   <li>Upload only missing chunks
+ *   <li>Call SpliceBlob to register the blob as the concatenation of chunks
+ * </ol>
+ */
+public class ChunkedBlobUploader {
+
+  private final GrpcCacheClient grpcCacheClient;
+  private final CombinedCache combinedCache;
+  private final FastCDCChunker chunker;
+  private final long chunkingThreshold;
+
+  public ChunkedBlobUploader(
+      GrpcCacheClient grpcCacheClient,
+      CombinedCache combinedCache,
+      ChunkingConfig config,
+      DigestUtil digestUtil) {
+    this.grpcCacheClient = grpcCacheClient;
+    this.combinedCache = combinedCache;
+    this.chunker = new FastCDCChunker(config, digestUtil);
+    this.chunkingThreshold = config.chunkingThreshold();
+  }
+
+  /** Size in bytes above which blobs should be uploaded via {@link #uploadChunked}. */
+  public long getChunkingThreshold() {
+    return chunkingThreshold;
+  }
+
+  /**
+   * Uploads {@code file} as content-defined chunks and splices them into {@code blobDigest} on
+   * the server. Blocks on futures, so this should be called from a virtual thread.
+   */
+  public void uploadChunked(RemoteActionExecutionContext context, Digest blobDigest, Path file)
+      throws IOException, InterruptedException {
+    // First pass over the file: compute chunk boundaries and digests.
+    List<Digest> chunkDigests;
+    try (InputStream input = file.getInputStream()) {
+      chunkDigests = chunker.chunkToDigests(input);
+    }
+    if (chunkDigests.isEmpty()) {
+      return;
+    }
+
+    ImmutableSet<Digest> missingDigests =
+        getFromFuture(grpcCacheClient.findMissingDigests(context, chunkDigests));
+    uploadMissingChunks(context, missingDigests, chunkDigests, file);
+    getFromFuture(grpcCacheClient.spliceBlob(context, blobDigest, chunkDigests));
+  }
+
+  /**
+   * Second pass over the file: re-reads it in chunk order and uploads only the chunks the server
+   * reported missing, skipping (and deduplicating) the rest.
+   *
+   * @throws IOException if the file ends before all chunk bytes could be read, e.g. because it
+   *     was modified after the chunking pass
+   */
+  private void uploadMissingChunks(
+      RemoteActionExecutionContext context,
+      ImmutableSet<Digest> missingDigests,
+      List<Digest> chunkDigests,
+      Path file)
+      throws IOException, InterruptedException {
+    if (missingDigests.isEmpty()) {
+      return;
+    }
+
+    Set<Digest> uploaded = new HashSet<>();
+    try (InputStream input = file.getInputStream()) {
+      for (Digest chunkDigest : chunkDigests) {
+        if (missingDigests.contains(chunkDigest) && uploaded.add(chunkDigest)) {
+          ByteString.Output out = ByteString.newOutput((int) chunkDigest.getSizeBytes());
+          ByteStreams.limit(input, chunkDigest.getSizeBytes()).transferTo(out);
+          if (out.size() != chunkDigest.getSizeBytes()) {
+            // A short read means the file changed underneath us; uploading the truncated bytes
+            // would register a chunk whose content does not match chunkDigest.
+            throw new IOException(
+                String.format(
+                    "short read for chunk %s of %s: expected %d bytes, got %d",
+                    chunkDigest.getHash(),
+                    file.getPathString(),
+                    chunkDigest.getSizeBytes(),
+                    out.size()));
+          }
+          getFromFuture(combinedCache.uploadBlob(context, chunkDigest, out.toByteString()));
+        } else {
+          // Chunk already present remotely (or already uploaded in this loop); advance the
+          // stream past its bytes. skipNBytes throws EOFException on premature end of file.
+          input.skipNBytes(chunkDigest.getSizeBytes());
+        }
+      }
+    }
+  }
+}
diff --git
a/src/main/java/com/google/devtools/build/lib/remote/CombinedCache.java b/src/main/java/com/google/devtools/build/lib/remote/CombinedCache.java index 26cca8fe703b5d..b0ddd989be83a2 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/CombinedCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/CombinedCache.java @@ -32,9 +32,12 @@ import com.google.common.flogger.GoogleLogger; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.devtools.build.lib.concurrent.ThreadSafety; import com.google.devtools.build.lib.exec.SpawnCheckingCacheEvent; import com.google.devtools.build.lib.exec.SpawnProgressEvent; +import com.google.devtools.build.lib.remote.chunking.ChunkingConfig; import com.google.devtools.build.lib.remote.common.CacheNotFoundException; import com.google.devtools.build.lib.remote.common.LazyFileOutputStream; import com.google.devtools.build.lib.remote.common.OutputDigestMismatchException; @@ -64,6 +67,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -93,9 +97,17 @@ public class CombinedCache extends AbstractReferenceCounted { private final CountDownLatch closeCountDownLatch = new CountDownLatch(1); protected final AsyncTaskCache.NoResult casUploadCache = AsyncTaskCache.NoResult.create(); + @SuppressWarnings("AllowVirtualThreads") + private final ListeningExecutorService virtualThreadExecutor = + MoreExecutors.listeningDecorator( + Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name("combined-cache-", 0).factory())); + @Nullable protected final RemoteCacheClient remoteCacheClient; @Nullable protected final DiskCacheClient diskCacheClient; 
@Nullable protected final String symlinkTemplate; + @Nullable private final ChunkingConfig chunkingConfig; + @Nullable private final ChunkedBlobDownloader chunkedDownloader; + @Nullable private final ChunkedBlobUploader chunkedUploader; protected final DigestUtil digestUtil; public CombinedCache( @@ -110,6 +122,18 @@ public CombinedCache( this.diskCacheClient = diskCacheClient; this.symlinkTemplate = symlinkTemplate; this.digestUtil = digestUtil; + + if (remoteCacheClient instanceof GrpcCacheClient grpcClient + && grpcClient.getChunkingConfig() != null) { + ChunkingConfig config = grpcClient.getChunkingConfig(); + this.chunkingConfig = config; + this.chunkedDownloader = new ChunkedBlobDownloader(grpcClient, this); + this.chunkedUploader = new ChunkedBlobUploader(grpcClient, this, config, digestUtil); + } else { + this.chunkingConfig = null; + this.chunkedDownloader = null; + this.chunkedUploader = null; + } } public CacheCapabilities getRemoteCacheCapabilities() throws IOException { @@ -130,6 +154,11 @@ public ServerCapabilities getRemoteServerCapabilities() throws IOException { return remoteCacheClient.getServerCapabilities(); } + @Nullable + public ChunkingConfig getChunkingConfig() { + return chunkingConfig; + } + /** * Class to keep track of which cache (disk or remote) a given [cached] ActionResult comes from. 
*/ @@ -315,13 +344,21 @@ protected ListenableFuture uploadFile( ListenableFuture remoteCacheFuture = Futures.immediateVoidFuture(); if (remoteCacheClient != null && context.getWriteCachePolicy().allowRemoteCache()) { - Completable upload = - casUploadCache.execute( - digest, - RxFutures.toCompletable( - () -> remoteCacheClient.uploadFile(context, digest, file), directExecutor()), - force); - remoteCacheFuture = RxFutures.toListenableFuture(upload); + if (chunkedUploader != null + && digest.getSizeBytes() > chunkingConfig.chunkingThreshold()) { + remoteCacheFuture = virtualThreadExecutor.submit(() -> { + chunkedUploader.uploadChunked(context, digest, file); + return null; + }); + } else { + Completable upload = + casUploadCache.execute( + digest, + RxFutures.toCompletable( + () -> remoteCacheClient.uploadFile(context, digest, file), directExecutor()), + force); + remoteCacheFuture = RxFutures.toListenableFuture(upload); + } } return Futures.whenAllSucceed(diskCacheFuture, remoteCacheFuture) @@ -416,7 +453,7 @@ private ListenableFuture downloadBlob( directExecutor()); } - private ListenableFuture downloadBlob( + ListenableFuture downloadBlob( RemoteActionExecutionContext context, Digest digest, OutputStream out) { ListenableFuture future = immediateFailedFuture(new CacheNotFoundException(digest)); @@ -440,6 +477,27 @@ private ListenableFuture downloadBlobFromRemote( RemoteActionExecutionContext context, Digest digest, OutputStream out) { checkState(remoteCacheClient != null && context.getReadCachePolicy().allowRemoteCache()); + if (chunkedDownloader != null + && digest.getSizeBytes() > chunkingConfig.chunkingThreshold()) { + ListenableFuture chunkedDownloadFuture = + virtualThreadExecutor.submit(() -> { + chunkedDownloader.downloadChunked(context, digest, out); + return null; + }); + return Futures.catchingAsync( + chunkedDownloadFuture, + CacheNotFoundException.class, + (e) -> regularDownloadBlobFromRemote(context, digest, out), + directExecutor()); + } + + return 
regularDownloadBlobFromRemote(context, digest, out); + } + + private ListenableFuture regularDownloadBlobFromRemote( + RemoteActionExecutionContext context, Digest digest, OutputStream out) { + checkState(remoteCacheClient != null && context.getReadCachePolicy().allowRemoteCache()); + if (diskCacheClient != null && context.getWriteCachePolicy().allowDiskCache()) { Path tempPath = diskCacheClient.getTempPath(); LazyFileOutputStream tempOut = new LazyFileOutputStream(tempPath); diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java index 345744ce70cc6d..a73c3dfb614393 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java @@ -15,11 +15,13 @@ package com.google.devtools.build.lib.remote; import static com.google.common.base.Strings.isNullOrEmpty; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static com.google.devtools.build.lib.remote.util.DigestUtil.isOldStyleDigestFunction; import build.bazel.remote.execution.v2.ActionCacheGrpc; import build.bazel.remote.execution.v2.ActionCacheGrpc.ActionCacheFutureStub; import build.bazel.remote.execution.v2.ActionResult; +import build.bazel.remote.execution.v2.ChunkingFunction; import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc; import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc.ContentAddressableStorageFutureStub; import build.bazel.remote.execution.v2.Digest; @@ -29,6 +31,9 @@ import build.bazel.remote.execution.v2.GetActionResultRequest; import build.bazel.remote.execution.v2.RequestMetadata; import build.bazel.remote.execution.v2.ServerCapabilities; +import build.bazel.remote.execution.v2.SpliceBlobRequest; +import build.bazel.remote.execution.v2.SplitBlobRequest; +import build.bazel.remote.execution.v2.SplitBlobResponse; import 
build.bazel.remote.execution.v2.UpdateActionResultRequest; import com.google.bytestream.ByteStreamGrpc; import com.google.bytestream.ByteStreamGrpc.ByteStreamStub; @@ -49,6 +54,7 @@ import com.google.devtools.build.lib.authandtls.CallCredentialsProvider; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.remote.RemoteRetrier.ProgressiveBackoff; +import com.google.devtools.build.lib.remote.chunking.ChunkingConfig; import com.google.devtools.build.lib.remote.common.CacheNotFoundException; import com.google.devtools.build.lib.remote.common.MissingDigestsFinder; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; @@ -88,6 +94,7 @@ public class GrpcCacheClient implements RemoteCacheClient, MissingDigestsFinder private final RemoteRetrier retrier; private final ByteStreamUploader uploader; private final int maxMissingBlobsDigestsPerMessage; + @Nullable private final ChunkingConfig chunkingConfig; private final AtomicBoolean closed = new AtomicBoolean(); @@ -97,12 +104,14 @@ public GrpcCacheClient( CallCredentialsProvider callCredentialsProvider, RemoteOptions options, RemoteRetrier retrier, - DigestUtil digestUtil) { + DigestUtil digestUtil, + @Nullable ChunkingConfig chunkingConfig) { this.callCredentialsProvider = callCredentialsProvider; this.channel = channel; this.options = options; this.digestUtil = digestUtil; this.retrier = retrier; + this.chunkingConfig = chunkingConfig; this.uploader = new ByteStreamUploader( options.remoteInstanceName, @@ -117,6 +126,7 @@ public GrpcCacheClient( maxMissingBlobsDigestsPerMessage > 0, "Error: gRPC message size too small."); } + private int computeMaxMissingBlobsDigestsPerMessage() { final int overhead = FindMissingBlobsRequest.newBuilder() @@ -164,6 +174,61 @@ private ActionCacheFutureStub acFutureStub( .withDeadlineAfter(options.remoteTimeout.toSeconds(), TimeUnit.SECONDS); } + @Override + public ListenableFuture spliceBlob( + 
RemoteActionExecutionContext context, + Digest blobDigest, + List chunkDigests) { + if (!options.experimentalRemoteCacheChunking) { + return null; + } + SpliceBlobRequest request = + SpliceBlobRequest.newBuilder() + .setInstanceName(options.remoteInstanceName) + .setBlobDigest(blobDigest) + .addAllChunkDigests(chunkDigests) + .setDigestFunction(digestUtil.getDigestFunction()) + .setChunkingFunction(ChunkingFunction.Value.FAST_CDC_2020) + .build(); + return Futures.transform( + Utils.refreshIfUnauthenticatedAsync( + () -> + retrier.executeAsync( + () -> + channel.withChannelFuture( + ch -> casFutureStub(context, ch).spliceBlob(request))), + callCredentialsProvider), + unused -> null, + directExecutor()); + } + + /** + * Queries the server for chunk information about a blob using the SplitBlob RPC. + * + * @return a future with the split blob response, or null if chunking is not enabled + */ + @Nullable + public ListenableFuture splitBlob( + RemoteActionExecutionContext context, Digest digest) { + if (!options.experimentalRemoteCacheChunking) { + return null; + } + SplitBlobRequest request = + SplitBlobRequest.newBuilder() + .setInstanceName(options.remoteInstanceName) + .setBlobDigest(digest) + .setDigestFunction(digestUtil.getDigestFunction()) + .setChunkingFunction(ChunkingFunction.Value.FAST_CDC_2020) + .build(); + return Utils.refreshIfUnauthenticatedAsync( + () -> + retrier.executeAsync( + () -> + channel.withChannelFuture( + ch -> casFutureStub(context, ch).splitBlob(request))), + callCredentialsProvider); + } + @Override public void close() { if (closed.getAndSet(true)) { @@ -172,6 +237,11 @@ public void close() { channel.release(); } + @Nullable + public ChunkingConfig getChunkingConfig() { + return chunkingConfig; + } + /** Returns true if 'options.remoteCache' uses 'grpc' or an empty scheme */ public static boolean isRemoteCacheOptions(RemoteOptions options) { if (isNullOrEmpty(options.remoteCache)) { diff --git 
a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index 7da614dfd33306..6904b18e1a22ba 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java @@ -18,6 +18,7 @@ import build.bazel.remote.execution.v2.Digest; import build.bazel.remote.execution.v2.DigestFunction; +import build.bazel.remote.execution.v2.ServerCapabilities; import com.github.benmanes.caffeine.cache.Cache; import com.google.auth.Credentials; import com.google.common.annotations.VisibleForTesting; @@ -64,6 +65,7 @@ import com.google.devtools.build.lib.remote.RemoteServerCapabilities.ServerCapabilitiesRequirement; import com.google.devtools.build.lib.remote.Retrier.ResultClassifier; import com.google.devtools.build.lib.remote.Retrier.ResultClassifier.Result; +import com.google.devtools.build.lib.remote.chunking.ChunkingConfig; import com.google.devtools.build.lib.remote.circuitbreaker.CircuitBreakerFactory; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.remote.common.RemoteExecutionClient; @@ -713,9 +715,20 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { } } + ChunkingConfig chunkingConfig = null; + if (remoteOptions.experimentalRemoteCacheChunking) { + try { + ServerCapabilities capabilities = cacheChannel.getServerCapabilities(); + chunkingConfig = ChunkingConfig.fromServerCapabilities(capabilities); + } catch (IOException e) { + chunkingConfig = ChunkingConfig.defaults(); + } + } + RemoteCacheClient remoteCacheClient = new GrpcCacheClient( - cacheChannel.retain(), callCredentialsProvider, remoteOptions, retrier, digestUtil); + cacheChannel.retain(), callCredentialsProvider, remoteOptions, retrier, digestUtil, + chunkingConfig); cacheChannel.release(); DiskCacheClient diskCacheClient = null; diff --git 
a/src/main/java/com/google/devtools/build/lib/remote/RemoteServerCapabilities.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteServerCapabilities.java index 6028de5b1d4257..6d038e77e769bb 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteServerCapabilities.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteServerCapabilities.java @@ -246,6 +246,24 @@ public static ClientServerCompatibilityStatus checkClientServerCompatibility( "--remote_cache_compression requested but remote does not support compression"); } + if (remoteOptions.experimentalRemoteCacheChunking) { + if (!cacheCap.getSplitBlobSupport()) { + result.addError( + "--experimental_remote_cache_chunking requested but remote does not support" + + " SplitBlob"); + } + if (!cacheCap.getSpliceBlobSupport()) { + result.addError( + "--experimental_remote_cache_chunking requested but remote does not support" + + " SpliceBlob"); + } + if (!cacheCap.hasFastCdc2020Params()) { + result.addError( + "--experimental_remote_cache_chunking requested but remote does not support" + + " FastCDC 2020 chunking algorithm"); + } + } + // Check result cache priority is in the supported range. 
checkPriorityInRange( remoteOptions.remoteResultCachePriority, diff --git a/src/main/java/com/google/devtools/build/lib/remote/chunking/BUILD b/src/main/java/com/google/devtools/build/lib/remote/chunking/BUILD new file mode 100644 index 00000000000000..f3eb55bdca3f93 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/chunking/BUILD @@ -0,0 +1,26 @@ +load("@rules_java//java:defs.bzl", "java_library") + +package( + default_applicable_licenses = ["//:license"], + default_visibility = ["//src:__subpackages__"], +) + +filegroup( + name = "srcs", + srcs = glob(["*"]), + visibility = ["//src:__subpackages__"], +) + +java_library( + name = "chunking", + srcs = [ + "ChunkingConfig.java", + "FastCDCChunker.java" + ], + deps = [ + "//src/main/java/com/google/devtools/build/lib/remote/options", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", + "//third_party:guava", + "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto", + ], +) diff --git a/src/main/java/com/google/devtools/build/lib/remote/chunking/ChunkingConfig.java b/src/main/java/com/google/devtools/build/lib/remote/chunking/ChunkingConfig.java new file mode 100644 index 00000000000000..723cd6dc1246ce --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/chunking/ChunkingConfig.java @@ -0,0 +1,72 @@ +// Copyright 2026 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package com.google.devtools.build.lib.remote.chunking;
+
+import build.bazel.remote.execution.v2.CacheCapabilities;
+import build.bazel.remote.execution.v2.FastCdc2020Params;
+import build.bazel.remote.execution.v2.ServerCapabilities;
+
+/** Configuration for content-defined chunking. All sizes are in bytes. */
+public record ChunkingConfig(int avgChunkSize, int normalizationLevel, int seed) {
+
+  public static final int DEFAULT_AVG_CHUNK_SIZE = 512 * 1024;
+  public static final int DEFAULT_NORMALIZATION_LEVEL = 2;
+  public static final int DEFAULT_SEED = 0;
+
+  /** Smallest chunk the chunker may emit: a quarter of the average size. */
+  public int minChunkSize() {
+    return avgChunkSize / 4;
+  }
+
+  /** Largest chunk the chunker may emit: four times the average size. */
+  public int maxChunkSize() {
+    return avgChunkSize * 4;
+  }
+
+  /** Blobs larger than this should be chunked. Equal to maxChunkSize(). */
+  public long chunkingThreshold() {
+    return maxChunkSize();
+  }
+
+  /** Returns the configuration used when the server does not advertise chunking parameters. */
+  public static ChunkingConfig defaults() {
+    return new ChunkingConfig(
+        DEFAULT_AVG_CHUNK_SIZE, DEFAULT_NORMALIZATION_LEVEL, DEFAULT_SEED);
+  }
+
+  /**
+   * Derives a chunking configuration from the server's advertised capabilities.
+   *
+   * @return the derived configuration, or {@code null} if the server does not advertise FastCDC
+   *     2020 parameters (in which case chunking should not be used)
+   */
+  public static ChunkingConfig fromServerCapabilities(ServerCapabilities capabilities) {
+    if (!capabilities.hasCacheCapabilities()) {
+      return null;
+    }
+    CacheCapabilities cacheCap = capabilities.getCacheCapabilities();
+    if (!cacheCap.hasFastCdc2020Params()) {
+      return null;
+    }
+
+    FastCdc2020Params params = cacheCap.getFastCdc2020Params();
+    // Honor the server's average chunk size only when it is a power of two in [1KiB, 1MiB];
+    // otherwise fall back to the default.
+    long configAvgSize = params.getAvgChunkSizeBytes();
+    int avgSize = DEFAULT_AVG_CHUNK_SIZE;
+    if (configAvgSize >= 1024
+        && configAvgSize <= 1024 * 1024
+        && (configAvgSize & (configAvgSize - 1)) == 0) {
+      avgSize = (int) configAvgSize;
+    }
+    return new ChunkingConfig(avgSize, DEFAULT_NORMALIZATION_LEVEL, params.getSeed());
+  }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/chunking/FastCDCChunker.java b/src/main/java/com/google/devtools/build/lib/remote/chunking/FastCDCChunker.java
new file mode 100644
index 00000000000000..964ed5802951fe
--- /dev/null
+++
b/src/main/java/com/google/devtools/build/lib/remote/chunking/FastCDCChunker.java @@ -0,0 +1,300 @@ +// Copyright 2026 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.remote.chunking; + +import static com.google.common.base.Preconditions.checkArgument; + +import build.bazel.remote.execution.v2.Digest; +import com.google.devtools.build.lib.remote.util.DigestUtil; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; + +/** + * FastCDC 2020 implementation for splitting large blobs. + * + *

+ * This module implements the canonical FastCDC algorithm as described in the + * [paper](https://ieeexplore.ieee.org/document/9055082) by Wen Xia, et al., in + * 2020. + */ +public final class FastCDCChunker { + + // Masks for each of the desired number of bits, where 0 through 5 are unused. + // The values for sizes 64 bytes through 128 kilo-bytes come from the C + // reference implementation (found in the destor repository) while the extra + // values come from the restic-FastCDC repository. The FastCDC paper claims that + // the deduplication ratio is slightly improved when the mask bits are spread + // relatively evenly, hence these seemingly "magic" values. + // @formatter:off + private static final long[] MASKS = { + 0, // 0: padding + 0, // 1: padding + 0, // 2: padding + 0, // 3: padding + 0, // 4: padding + 0x0000000001804110L, // 5: unused except for NC 3 + 0x0000000001803110L, // 6: 64B + 0x0000000018035100L, // 7: 128B + 0x0000001800035300L, // 8: 256B + 0x0000019000353000L, // 9: 512B + 0x0000590003530000L, // 10: 1KB + 0x0000d90003530000L, // 11: 2KB + 0x0000d90103530000L, // 12: 4KB + 0x0000d90303530000L, // 13: 8KB + 0x0000d90313530000L, // 14: 16KB + 0x0000d90f03530000L, // 15: 32KB + 0x0000d90303537000L, // 16: 64KB + 0x0000d90703537000L, // 17: 128KB + 0x0000d90707537000L, // 18: 256KB + 0x0000d91707537000L, // 19: 512KB + 0x0000d91747537000L, // 20: 1MB + 0x0000d91767537000L, // 21: 2MB + 0x0000d93767537000L, // 22: 4MB + 0x0000d93777537000L, // 23: 8MB + 0x0000d93777577000L, // 24: 16MB + 0x0000db3777577000L, // 25: unused except for NC 3 + }; + + // GEAR contains seemingly random numbers which are created by computing the MD5 digest of values + // from 0 to 255, using only the high 8 bytes of the 16-byte digest. This is the "gear hash" + // referred to in the FastCDC paper. 
+ private static final long[] GEAR = { + 0x3b5d3c7d207e37dcL, 0x784d68ba91123086L, 0xcd52880f882e7298L, 0xeacf8e4e19fdcca7L, + 0xc31f385dfbd1632bL, 0x1d5f27001e25abe6L, 0x83130bde3c9ad991L, 0xc4b225676e9b7649L, + 0xaa329b29e08eb499L, 0xb67fcbd21e577d58L, 0x0027baaada2acf6bL, 0xe3ef2d5ac73c2226L, + 0x0890f24d6ed312b7L, 0xa809e036851d7c7eL, 0xf0a6fe5e0013d81bL, 0x1d026304452cec14L, + 0x03864632648e248fL, 0xcdaacf3dcd92b9b4L, 0xf5e012e63c187856L, 0x8862f9d3821c00b6L, + 0xa82f7338750f6f8aL, 0x1e583dc6c1cb0b6fL, 0x7a3145b69743a7f1L, 0xabb20fee404807ebL, + 0xb14b3cfe07b83a5dL, 0xb9dc27898adb9a0fL, 0x3703f5e91baa62beL, 0xcf0bb866815f7d98L, + 0x3d9867c41ea9dcd3L, 0x1be1fa65442bf22cL, 0x14300da4c55631d9L, 0xe698e9cbc6545c99L, + 0x4763107ec64e92a5L, 0xc65821fc65696a24L, 0x76196c064822f0b7L, 0x485be841f3525e01L, + 0xf652bc9c85974ff5L, 0xcad8352face9e3e9L, 0x2a6ed1dceb35e98eL, 0xc6f483badc11680fL, + 0x3cfd8c17e9cf12f1L, 0x89b83c5e2ea56471L, 0xae665cfd24e392a9L, 0xec33c4e504cb8915L, + 0x3fb9b15fc9fe7451L, 0xd7fd1fd1945f2195L, 0x31ade0853443efd8L, 0x255efc9863e1e2d2L, + 0x10eab6008d5642cfL, 0x46f04863257ac804L, 0xa52dc42a789a27d3L, 0xdaaadf9ce77af565L, + 0x6b479cd53d87febbL, 0x6309e2d3f93db72fL, 0xc5738ffbaa1ff9d6L, 0x6bd57f3f25af7968L, + 0x67605486d90d0a4aL, 0xe14d0b9663bfbdaeL, 0xb7bbd8d816eb0414L, 0xdef8a4f16b35a116L, + 0xe7932d85aaaffed6L, 0x08161cbae90cfd48L, 0x855507beb294f08bL, 0x91234ea6ffd399b2L, + 0xad70cf4b2435f302L, 0xd289a97565bc2d27L, 0x8e558437ffca99deL, 0x96d2704b7115c040L, + 0x0889bbcdfc660e41L, 0x5e0d4e67dc92128dL, 0x72a9f8917063ed97L, 0x438b69d409e016e3L, + 0xdf4fed8a5d8a4397L, 0x00f41dcf41d403f7L, 0x4814eb038e52603fL, 0x9dafbacc58e2d651L, + 0xfe2f458e4be170afL, 0x4457ec414df6a940L, 0x06e62f1451123314L, 0xbd1014d173ba92ccL, + 0xdef318e25ed57760L, 0x9fea0de9dfca8525L, 0x459de1e76c20624bL, 0xaeec189617e2d666L, + 0x126a2c06ab5a83cbL, 0xb1321532360f6132L, 0x65421503dbb40123L, 0x2d67c287ea089ab3L, + 0x6c93bff5a56bd6b6L, 0x4ffb2036cab6d98dL, 0xce7b785b1be7ad4fL, 
0xedb42ef6189fd163L, + 0xdc905288703988f6L, 0x365f9c1d2c691884L, 0xc640583680d99bfeL, 0x3cd4624c07593ec6L, + 0x7f1ea8d85d7c5805L, 0x014842d480b57149L, 0x0b649bcb5a828688L, 0xbcd5708ed79b18f0L, + 0xe987c862fbd2f2f0L, 0x982731671f0cd82cL, 0xbaf13e8b16d8c063L, 0x8ea3109cbd951bbaL, + 0xd141045bfb385cadL, 0x2acbc1a0af1f7d30L, 0xe6444d89df03bfdfL, 0xa18cc771b8188ff9L, + 0x9834429db01c39bbL, 0x214add07fe086a1fL, 0x8f07c19b1f6b3ff9L, 0x56a297b1bf4ffe55L, + 0x94d558e493c54fc7L, 0x40bfc24c764552cbL, 0x931a706f8a8520cbL, 0x32229d322935bd52L, + 0x2560d0f5dc4fefafL, 0x9dbcc48355969bb6L, 0x0fd81c3985c0b56aL, 0xe03817e1560f2bdaL, + 0xc1bb4f81d892b2d5L, 0xb0c4864f4e28d2d7L, 0x3ecc49f9d9d6c263L, 0x51307e99b52ba65eL, + 0x8af2b688da84a752L, 0xf5d72523b91b20b6L, 0x6d95ff1ff4634806L, 0x562f21555458339aL, + 0xc0ce47f889336346L, 0x487823e5089b40d8L, 0xe4727c7ebc6d9592L, 0x5a8f7277e94970baL, + 0xfca2f406b1c8bb50L, 0x5b1f8a95f1791070L, 0xd304af9fc9028605L, 0x5440ab7fc930e748L, + 0x312d25fbca2ab5a1L, 0x10f4a4b234a4d575L, 0x90301d55047e7473L, 0x3b6372886c61591eL, + 0x293402b77c444e06L, 0x451f34a4d3e97dd7L, 0x3158d814d81bc57bL, 0x034942425b9bda69L, + 0xe2032ff9e532d9bbL, 0x62ae066b8b2179e5L, 0x9545e10c2f8d71d8L, 0x7ff7483eb2d23fc0L, + 0x00945fcebdc98d86L, 0x8764bbbe99b26ca2L, 0x1b1ec62284c0bfc3L, 0x58e0fcc4f0aa362bL, + 0x5f4abefa878d458dL, 0xfd74ac2f9607c519L, 0xa4e3fb37df8cbfa9L, 0xbf697e43cac574e5L, + 0x86f14a3f68f4cd53L, 0x24a23d076f1ce522L, 0xe725cd8048868cc8L, 0xbf3c729eb2464362L, + 0xd8f6cd57b3cc1ed8L, 0x6329e52425541577L, 0x62aa688ad5ae1ac0L, 0x0a242566269bf845L, + 0x168b1a4753aca74bL, 0xf789afefff2e7e3cL, 0x6c3362093b6fccdbL, 0x4ce8f50bd28c09b2L, + 0x006a2db95ae8aa93L, 0x975b0d623c3d1a8cL, 0x18605d3935338c5bL, 0x5bb6f6136cad3c71L, + 0x0f53a20701f8d8a6L, 0xab8c5ad2e7e93c67L, 0x40b5ac5127acaa29L, 0x8c7bf63c2075895fL, + 0x78bd9f7e014a805cL, 0xb2c9e9f4f9c8c032L, 0xefd6049827eb91f3L, 0x2be459f482c16fbdL, + 0xd92ce0c5745aaa8cL, 0x0aaa8fb298d965b9L, 0x2b37f92c6c803b15L, 0x8c54a5e94e0f0e78L, 
+ 0x95f9b6e90c0a3032L, 0xe7939faa436c7874L, 0xd16bfe8f6a8a40c9L, 0x44982b86263fd2faL, + 0xe285fb39f984e583L, 0x779a8df72d7619d3L, 0xf2d79a8de8d5dd1eL, 0xd1037354d66684e2L, + 0x004c82a4e668a8e5L, 0x31d40a7668b044e6L, 0xd70578538bd02c11L, 0xdb45431078c5f482L, + 0x977121bb7f6a51adL, 0x73d5ccbd34eff8ddL, 0xe437a07d356e17cdL, 0x47b2782043c95627L, + 0x9fb251413e41d49aL, 0xccd70b60652513d3L, 0x1c95b31e8a1b49b2L, 0xcae73dfd1bcb4c1bL, + 0x34d98331b1f5b70fL, 0x784e39f22338d92fL, 0x18613d4a064df420L, 0xf1d8dae25f0bcebeL, + 0x33f77c15ae855efcL, 0x3c88b3b912eb109cL, 0x956a2ec96bafeea5L, 0x1aa005b5e0ad0e87L, + 0x5500d70527c4bb8eL, 0xe36c57196421cc44L, 0x13c4d286cc36ee39L, 0x5654a23d818b2a81L, + 0x77b1dc13d161abdcL, 0x734f44de5f8d5eb5L, 0x60717e174a6c89a2L, 0xd47d9649266a211eL, + 0x5b13a4322bb69e90L, 0xf7669609f8b5fc3cL, 0x21e6ac55bedcdac9L, 0x9b56b62b61166deaL, + 0xf48f66b939797e9cL, 0x35f332f9c0e6ae9aL, 0xcc733f6a9a878db0L, 0x3da161e41cc108c2L, + 0xb7d74ae535914d51L, 0x4d493b0b11d36469L, 0xce264d1dfba9741aL, 0xa9d1f2dc7436dc06L, + 0x70738016604c2a27L, 0x231d36e96e93f3d5L, 0x7666881197838d19L, 0x4a2a83090aaad40cL, + 0xf1e761591668b35dL, 0x7363236497f730a7L, 0x301080e37379dd4dL, 0x502dea2971827042L, + 0xc2c5eb858f32625fL, 0x786afb9edfafbdffL, 0xdaee0d868490b2a4L, 0x617366b3268609f6L, + 0xae0e35a0fe46173eL, 0xd1a07de93e824f11L, 0x079b8b115ea4cca8L, 0x93a99274558faebbL, + 0xfb1e6e22e08a03b3L, 0xea635fdba3698dd0L, 0xcf53659328503a5cL, 0xcde3b31e6fd5d780L, + 0x8e3e4221d3614413L, 0xef14d0d86bf1a22cL, 0xe1d830d3f16c5ddbL, 0xaabd2b2a451504e1L, + }; + // @formatter:on + + private static final long[] GEAR_LS = computeGearLs(); + + private static long[] computeGearLs() { + long[] gearLs = new long[GEAR.length]; + for (int i = 0; i < GEAR.length; i++) { + gearLs[i] = GEAR[i] << 1; + } + return gearLs; + } + + private final int minSize; + private final int maxSize; + private final int avgSize; + private final long maskS; + private final long maskL; + private final long maskSLs; + private 
final long maskLLs; + private final long seed; + private final long shiftedSeed; + private final DigestUtil digestUtil; + + public FastCDCChunker(DigestUtil digestUtil) { + this(ChunkingConfig.defaults(), digestUtil); + } + + public FastCDCChunker(ChunkingConfig config, DigestUtil digestUtil) { + this(config.minChunkSize(), config.avgChunkSize(), config.maxChunkSize(), + config.normalizationLevel(), Integer.toUnsignedLong(config.seed()), digestUtil); + } + + public FastCDCChunker( + int minSize, int avgSize, int maxSize, int normalization, long seed, + DigestUtil digestUtil) { + checkArgument(minSize > 0, "minSize must be positive"); + checkArgument(avgSize >= minSize, "avgSize must be >= minSize"); + checkArgument(maxSize >= avgSize, "maxSize must be >= avgSize"); + checkArgument((avgSize & (avgSize - 1)) == 0, "avgSize must be a power of 2, got %s", avgSize); + checkArgument(normalization >= 0 && normalization <= 3, "normalization must be 0-3"); + + this.minSize = minSize; + this.avgSize = avgSize; + this.maxSize = maxSize; + this.digestUtil = digestUtil; + + int bits = 31 - Integer.numberOfLeadingZeros(avgSize); + int smallBits = bits + normalization; + int largeBits = bits - normalization; + checkArgument(smallBits <= 25 && largeBits >= 5, "normalization level too extreme for avgSize"); + + this.maskS = MASKS[smallBits]; + this.maskL = MASKS[largeBits]; + this.maskSLs = this.maskS << 1; + this.maskLLs = this.maskL << 1; + + this.seed = seed; + this.shiftedSeed = seed << 1; + } + + /** + * Finds the next chunk boundary. + */ + private int cut(byte[] buf, int off, int len) { + if (len <= minSize) { + return len; + } + + int n = Math.min(len, maxSize); + int center = Math.min(n, avgSize); + + // Round down to even boundaries for 2-byte processing so we don't need to + // divide by 2 in the loop. 
+ int minLimit = minSize & ~1; + int centerLimit = center & ~1; + int remainingLimit = n & ~1; + + long s = this.seed; + long sLs = this.shiftedSeed; + long hash = 0; + + // Below avgSize: use maskS to discourage early cuts (too small chunks) + for (int a = minLimit; a < centerLimit; a += 2) { + hash = (hash << 2) + (GEAR_LS[buf[off + a] & 0xFF] ^ sLs); + if ((hash & maskSLs) == 0) { + return a; + } + hash = hash + (GEAR[buf[off + a + 1] & 0xFF] ^ s); + if ((hash & maskS) == 0) { + return a + 1; + } + } + + // Above avgSize: use maskL to encourage cuts (too large chunks) + for (int a = centerLimit; a < remainingLimit; a += 2) { + hash = (hash << 2) + (GEAR_LS[buf[off + a] & 0xFF] ^ sLs); + if ((hash & maskLLs) == 0) { + return a; + } + hash = hash + (GEAR[buf[off + a + 1] & 0xFF] ^ s); + if ((hash & maskL) == 0) { + return a + 1; + } + } + + return n; + } + + /** + * Chunks a file and returns chunk digests. + * + *

+ * This method is used for building MerkleTree entries for large files. It + * returns the content digests in order for each chunk. + * + *

+ * Note: We don't need the raw data here. We can read from the original file + * (seekable) when uploading, similar to how whole blobs work. + */ + public List chunkToDigests(InputStream input) throws IOException { + List digests = new ArrayList<>(); + + byte[] buf = new byte[maxSize * 2]; + int cursor = 0; + int end = 0; + boolean eof = false; + + while (true) { + int available = end - cursor; + if (available < maxSize && !eof) { + if (cursor > 0 && available > 0) { + System.arraycopy(buf, cursor, buf, 0, available); + } + cursor = 0; + end = available; + + while (end < buf.length) { + int n = input.read(buf, end, buf.length - end); + if (n == -1) { + eof = true; + break; + } + end += n; + } + available = end - cursor; + } + + if (available == 0) { + break; + } + + int chunkLen = cut(buf, cursor, available); + digests.add(digestUtil.compute(buf, cursor, chunkLen)); + + cursor += chunkLen; + } + + return digests; + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java index f5160436969e40..7a25425ff8ee63 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.List; import java.util.Set; import javax.annotation.Nullable; @@ -160,6 +161,26 @@ default ListenableFuture uploadBlob( return uploadBlob(context, digest, data::newInput); } + /** + * Registers a blob as the concatenation of the given chunks via SpliceBlob RPC. + * + *

This is used for CDC (Content-Defined Chunking) uploads. After uploading all chunks, + * SpliceBlob is called to register the blob with the given digest as the concatenation of + * the chunks. + * + * @param context the context for the action. + * @param blobDigest The digest of the complete blob. + * @param chunkDigests The digests of the chunks that make up the blob, in order. + * @return A future representing pending completion of the splice operation, or null if + * SpliceBlob is not supported by this cache client. + */ + default ListenableFuture spliceBlob( + RemoteActionExecutionContext context, + Digest blobDigest, + List chunkDigests) { + return null; + } + /** Close resources associated with the remote cache. */ void close(); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java b/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java index ce1d0fe77b3cfa..875b17e505d0b2 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java +++ b/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java @@ -70,6 +70,10 @@ protected LoggingHandler selectHandler(MethodDescriptor } else if (method == ContentAddressableStorageGrpc.getFindMissingBlobsMethod()) { return new FindMissingBlobsHandler(); // + } else if (method == ContentAddressableStorageGrpc.getSplitBlobMethod()) { + return new SplitBlobHandler(); // + } else if (method == ContentAddressableStorageGrpc.getSpliceBlobMethod()) { + return new SpliceBlobHandler(); // } else if (method == ByteStreamGrpc.getReadMethod()) { return new ReadHandler(); // } else if (method == ByteStreamGrpc.getWriteMethod()) { diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/SpliceBlobHandler.java b/src/main/java/com/google/devtools/build/lib/remote/logging/SpliceBlobHandler.java new file mode 100644 index 00000000000000..d5ff02b78b287a --- /dev/null +++ 
b/src/main/java/com/google/devtools/build/lib/remote/logging/SpliceBlobHandler.java @@ -0,0 +1,44 @@ +// Copyright 2026 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.remote.logging; + +import build.bazel.remote.execution.v2.SpliceBlobRequest; +import build.bazel.remote.execution.v2.SpliceBlobResponse; +import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.SpliceBlobDetails; +import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.RpcCallDetails; + +/** + * LoggingHandler for {@link + * build.bazel.remote.execution.v2.ContentAddressableStorage.SpliceBlob} gRPC call. 
+ */ +public class SpliceBlobHandler implements LoggingHandler { + + private final SpliceBlobDetails.Builder builder = SpliceBlobDetails.newBuilder(); + + @Override + public void handleReq(SpliceBlobRequest message) { + builder.setRequest(message); + } + + @Override + public void handleResp(SpliceBlobResponse message) { + builder.setResponse(message); + } + + @Override + public RpcCallDetails getDetails() { + return RpcCallDetails.newBuilder().setSpliceBlob(builder).build(); + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/SplitBlobHandler.java b/src/main/java/com/google/devtools/build/lib/remote/logging/SplitBlobHandler.java new file mode 100644 index 00000000000000..088437b06abddb --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/logging/SplitBlobHandler.java @@ -0,0 +1,45 @@ +// Copyright 2026 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.remote.logging; + +import build.bazel.remote.execution.v2.SplitBlobRequest; +import build.bazel.remote.execution.v2.SplitBlobResponse; +import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.SplitBlobDetails; +import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.RpcCallDetails; + +/** + * LoggingHandler for {@link + * build.bazel.remote.execution.v2.ContentAddressableStorage.SplitBlob} gRPC call. 
+ */ +public class SplitBlobHandler + implements LoggingHandler { + + private final SplitBlobDetails.Builder builder = SplitBlobDetails.newBuilder(); + + @Override + public void handleReq(SplitBlobRequest message) { + builder.setRequest(message); + } + + @Override + public void handleResp(SplitBlobResponse message) { + builder.setResponse(message); + } + + @Override + public RpcCallDetails getDetails() { + return RpcCallDetails.newBuilder().setSplitBlob(builder).build(); + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/options/RemoteOptions.java b/src/main/java/com/google/devtools/build/lib/remote/options/RemoteOptions.java index ea1b1992f70b38..46fcb99378703b 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/options/RemoteOptions.java +++ b/src/main/java/com/google/devtools/build/lib/remote/options/RemoteOptions.java @@ -723,6 +723,19 @@ public RemoteOutputsStrategyConverter() { + " output prefixes).") public Scrubber scrubber; + @Option( + name = "experimental_remote_cache_chunking", + defaultValue = "false", + documentationCategory = OptionDocumentationCategory.REMOTE, + metadataTags = OptionMetadataTag.EXPERIMENTAL, + effectTags = {OptionEffectTag.UNKNOWN}, + help = + "If enabled, large blobs are split into content-defined chunks using FastCDC 2020 and " + + "uploaded/downloaded in chunks, enabling deduplication across blobs. 
The server " + + "must advertise SplitBlob/SpliceBlob RPCs and FastCDC 2020 parameters in its " + + "capabilities.") + public boolean experimentalRemoteCacheChunking; + @Option( name = "experimental_throttle_remote_action_building", defaultValue = "true", diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java b/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java index 5c1f2b3c644bd3..978e9c077e2ba2 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java +++ b/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java @@ -69,6 +69,18 @@ public Digest compute(byte[] blob) { return buildDigest(hashFn.getHashFunction().hashBytes(blob).toString(), blob.length); } + /** + * Computes a digest for a portion of a byte array. This is useful for uploading + * an individual chunk from a larger file. + * + * @param data the byte array + * @param offset the start offset in the array + * @param length the number of bytes to hash + */ + public Digest compute(byte[] data, int offset, int length) { + return buildDigest(hashFn.getHashFunction().hashBytes(data, offset, length).toString(), length); + } + /** * Computes a digest for a file. * diff --git a/src/main/protobuf/remote_execution_log.proto b/src/main/protobuf/remote_execution_log.proto index b3f265aaca959d..7518e4dfd44586 100644 --- a/src/main/protobuf/remote_execution_log.proto +++ b/src/main/protobuf/remote_execution_log.proto @@ -112,6 +112,32 @@ message FindMissingBlobsDetails { build.bazel.remote.execution.v2.FindMissingBlobsResponse response = 2; } +// Details for a call to +// build.bazel.remote.execution.v2.ContentAddressableStorage.SplitBlob. +message SplitBlobDetails { + // The build.bazel.remote.execution.v2.SplitBlobRequest request + // sent. + build.bazel.remote.execution.v2.SplitBlobRequest request = 1; + + // The build.bazel.remote.execution.v2.SplitBlobResponse + // received. 
// Details for a call to
// build.bazel.remote.execution.v2.ContentAddressableStorage.SpliceBlob.
message SpliceBlobDetails {
  // The build.bazel.remote.execution.v2.SpliceBlobRequest request sent.
  build.bazel.remote.execution.v2.SpliceBlobRequest request = 1;

  // The build.bazel.remote.execution.v2.SpliceBlobResponse received.
  build.bazel.remote.execution.v2.SpliceBlobResponse response = 2;
}
"//src/main/java/com/google/devtools/build/lib/remote/http", + "//src/main/java/com/google/devtools/build/lib/remote/chunking", "//src/main/java/com/google/devtools/build/lib/remote/merkletree", "//src/main/java/com/google/devtools/build/lib/remote/options", "//src/main/java/com/google/devtools/build/lib/remote/util", @@ -270,6 +274,58 @@ java_test( ], ) +java_test( + name = "ChunkedCacheIntegrationTest", + srcs = ["ChunkedCacheIntegrationTest.java"], + jvm_flags = ["-Djava.lang.Thread.allowVirtualThreads=true"], + tags = ["requires-network"], + runtime_deps = [ + "//third_party/grpc-java:grpc-jar", + ], + deps = [ + "//src/main/java/com/google/devtools/build/lib/runtime", + "//src/main/java/com/google/devtools/build/lib/authandtls/credentialhelper:credential_module", + "//src/main/java/com/google/devtools/build/lib/remote", + "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/standalone", + "//src/main/java/com/google/devtools/build/lib/vfs", + "//src/test/java/com/google/devtools/build/lib/buildtool/util", + "//src/test/java/com/google/devtools/build/lib/remote/util:integration_test_utils", + "//third_party:guava", + "//third_party:junit4", + "//third_party:truth", + "@googleapis//google/bytestream:bytestream_java_grpc", + "@googleapis//google/bytestream:bytestream_java_proto", + "@grpc-java//api", + "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_grpc", + "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto", + ], +) + +java_test( + name = "ChunkedDiskCacheIntegrationTest", + srcs = ["ChunkedDiskCacheIntegrationTest.java"], + jvm_flags = ["-Djava.lang.Thread.allowVirtualThreads=true"], + tags = ["requires-network"], + runtime_deps = [ + "//third_party/grpc-java:grpc-jar", + ], + deps = [ + "//src/main/java/com/google/devtools/build/lib/runtime", + "//src/main/java/com/google/devtools/build/lib/authandtls/credentialhelper:credential_module", + 
"//src/main/java/com/google/devtools/build/lib/remote", + "//src/main/java/com/google/devtools/build/lib/standalone", + "//src/main/java/com/google/devtools/build/lib/vfs", + "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment", + "//src/test/java/com/google/devtools/build/lib/buildtool/util", + "//src/test/java/com/google/devtools/build/lib/remote/util:integration_test_utils", + "//src/test/java/com/google/devtools/build/lib/testutil:TestUtils", + "//third_party:guava", + "//third_party:junit4", + "//third_party:truth", + ], +) + java_test( name = "DiskCacheIntegrationTest", srcs = ["DiskCacheIntegrationTest.java"], diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java index 770df3ce036493..989fdee850a0d9 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java @@ -531,7 +531,8 @@ private static CombinedCache newCombinedCache( CallCredentialsProvider.NO_CREDENTIALS, remoteOptions, retrier, - DIGEST_UTIL)); + DIGEST_UTIL, + /* chunkingConfig= */ null)); doAnswer( invocationOnMock -> missingDigestsFinder.findMissingDigests( diff --git a/src/test/java/com/google/devtools/build/lib/remote/ChunkedBlobDownloaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ChunkedBlobDownloaderTest.java new file mode 100644 index 00000000000000..71a22092e9001b --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/remote/ChunkedBlobDownloaderTest.java @@ -0,0 +1,180 @@ +// Copyright 2026 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import build.bazel.remote.execution.v2.Digest; +import build.bazel.remote.execution.v2.SplitBlobResponse; +import com.google.common.util.concurrent.Futures; +import com.google.devtools.build.lib.remote.common.CacheNotFoundException; +import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; +import com.google.devtools.build.lib.remote.util.DigestUtil; +import com.google.devtools.build.lib.vfs.DigestHashFunction; +import com.google.devtools.build.lib.vfs.SyscallCache; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** Tests for {@link ChunkedBlobDownloader}. 
*/ +@RunWith(JUnit4.class) +public class ChunkedBlobDownloaderTest { + private static final DigestUtil DIGEST_UTIL = + new DigestUtil(SyscallCache.NO_CACHE, DigestHashFunction.SHA256); + + @Mock private GrpcCacheClient grpcCacheClient; + @Mock private CombinedCache combinedCache; + @Mock private RemoteActionExecutionContext context; + + private ChunkedBlobDownloader downloader; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + downloader = new ChunkedBlobDownloader(grpcCacheClient, combinedCache); + } + + @Test + public void downloadChunked_splitBlobReturnsNull_throwsCacheNotFound() { + Digest blobDigest = DIGEST_UTIL.compute(new byte[]{1, 2, 3}); + when(grpcCacheClient.splitBlob(any(), eq(blobDigest))).thenReturn(null); + + assertThrows( + CacheNotFoundException.class, + () -> downloader.downloadChunked(context, blobDigest, new ByteArrayOutputStream())); + } + + @Test + public void downloadChunked_singleChunk_downloadsAndReassembles() throws Exception { + byte[] chunkData = new byte[]{1, 2, 3, 4, 5}; + Digest chunkDigest = DIGEST_UTIL.compute(chunkData); + Digest blobDigest = chunkDigest; + + SplitBlobResponse splitResponse = SplitBlobResponse.newBuilder() + .addChunkDigests(chunkDigest) + .build(); + when(grpcCacheClient.splitBlob(any(), eq(blobDigest))) + .thenReturn(Futures.immediateFuture(splitResponse)); + when(combinedCache.downloadBlob(any(), eq(chunkDigest), any())) + .thenAnswer(invocation -> { + OutputStream out = invocation.getArgument(2); + out.write(chunkData); + return Futures.immediateFuture(null); + }); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + downloader.downloadChunked(context, blobDigest, out); + + assertThat(out.toByteArray()).isEqualTo(chunkData); + } + + @Test + public void downloadChunked_multipleChunks_downloadsAndReassemblesInOrder() throws Exception { + byte[] chunk1Data = new byte[]{1, 2, 3}; + byte[] chunk2Data = new byte[]{4, 5, 6}; + byte[] chunk3Data = new byte[]{7, 8, 9}; + Digest 
chunk1Digest = DIGEST_UTIL.compute(chunk1Data); + Digest chunk2Digest = DIGEST_UTIL.compute(chunk2Data); + Digest chunk3Digest = DIGEST_UTIL.compute(chunk3Data); + Digest blobDigest = DIGEST_UTIL.compute(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9}); + + SplitBlobResponse splitResponse = SplitBlobResponse.newBuilder() + .addChunkDigests(chunk1Digest) + .addChunkDigests(chunk2Digest) + .addChunkDigests(chunk3Digest) + .build(); + when(grpcCacheClient.splitBlob(any(), eq(blobDigest))) + .thenReturn(Futures.immediateFuture(splitResponse)); + when(combinedCache.downloadBlob(any(), eq(chunk1Digest), any())) + .thenAnswer(invocation -> { + OutputStream out = invocation.getArgument(2); + out.write(chunk1Data); + return Futures.immediateFuture(null); + }); + when(combinedCache.downloadBlob(any(), eq(chunk2Digest), any())) + .thenAnswer(invocation -> { + OutputStream out = invocation.getArgument(2); + out.write(chunk2Data); + return Futures.immediateFuture(null); + }); + when(combinedCache.downloadBlob(any(), eq(chunk3Digest), any())) + .thenAnswer(invocation -> { + OutputStream out = invocation.getArgument(2); + out.write(chunk3Data); + return Futures.immediateFuture(null); + }); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + downloader.downloadChunked(context, blobDigest, out); + + assertThat(out.toByteArray()).isEqualTo(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9}); + verify(combinedCache).downloadBlob(any(), eq(chunk1Digest), any()); + verify(combinedCache).downloadBlob(any(), eq(chunk2Digest), any()); + verify(combinedCache).downloadBlob(any(), eq(chunk3Digest), any()); + } + + @Test + public void downloadChunked_emptyChunkList_producesEmptyOutput() throws Exception { + Digest blobDigest = DIGEST_UTIL.compute(new byte[0]); + + SplitBlobResponse splitResponse = SplitBlobResponse.getDefaultInstance(); + when(grpcCacheClient.splitBlob(any(), eq(blobDigest))) + .thenReturn(Futures.immediateFuture(splitResponse)); + + ByteArrayOutputStream out = new 
ByteArrayOutputStream(); + downloader.downloadChunked(context, blobDigest, out); + + assertThat(out.toByteArray()).isEmpty(); + } + + @Test + public void downloadChunked_chunkFailsAfterPartialWrite_throwsIOException() throws Exception { + byte[] chunk1Data = new byte[]{1, 2, 3}; + byte[] chunk2Data = new byte[]{4, 5, 6}; + Digest chunk1Digest = DIGEST_UTIL.compute(chunk1Data); + Digest chunk2Digest = DIGEST_UTIL.compute(chunk2Data); + Digest blobDigest = DIGEST_UTIL.compute(new byte[]{1, 2, 3, 4, 5, 6}); + + SplitBlobResponse splitResponse = SplitBlobResponse.newBuilder() + .addChunkDigests(chunk1Digest) + .addChunkDigests(chunk2Digest) + .build(); + when(grpcCacheClient.splitBlob(any(), eq(blobDigest))) + .thenReturn(Futures.immediateFuture(splitResponse)); + when(combinedCache.downloadBlob(any(), eq(chunk1Digest), any())) + .thenAnswer(invocation -> { + OutputStream out = invocation.getArgument(2); + out.write(chunk1Data); + return Futures.immediateFuture(null); + }); + when(combinedCache.downloadBlob(any(), eq(chunk2Digest), any())) + .thenReturn(Futures.immediateFailedFuture(new IOException("connection reset"))); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + assertThrows( + IOException.class, + () -> downloader.downloadChunked(context, blobDigest, out)); + } +} diff --git a/src/test/java/com/google/devtools/build/lib/remote/ChunkedBlobUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ChunkedBlobUploaderTest.java new file mode 100644 index 00000000000000..e566a4d4b35fc3 --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/remote/ChunkedBlobUploaderTest.java @@ -0,0 +1,202 @@ +// Copyright 2026 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import build.bazel.remote.execution.v2.Digest; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.Futures; +import com.google.devtools.build.lib.clock.JavaClock; +import com.google.devtools.build.lib.remote.chunking.ChunkingConfig; +import com.google.devtools.build.lib.remote.chunking.FastCDCChunker; +import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; +import com.google.devtools.build.lib.remote.util.DigestUtil; +import com.google.devtools.build.lib.vfs.DigestHashFunction; +import com.google.devtools.build.lib.vfs.FileSystem; +import com.google.devtools.build.lib.vfs.Path; +import com.google.devtools.build.lib.vfs.SyscallCache; +import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import 
org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** Tests for {@link ChunkedBlobUploader}. */ +@RunWith(JUnit4.class) +public class ChunkedBlobUploaderTest { + private static final DigestUtil DIGEST_UTIL = new DigestUtil(SyscallCache.NO_CACHE, DigestHashFunction.SHA256); + + @Mock + private GrpcCacheClient grpcCacheClient; + @Mock + private CombinedCache combinedCache; + @Mock + private RemoteActionExecutionContext context; + + private FileSystem fs; + private Path execRoot; + private ChunkedBlobUploader uploader; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + fs = new InMemoryFileSystem(new JavaClock(), DigestHashFunction.SHA256); + execRoot = fs.getPath("/execroot"); + execRoot.createDirectoryAndParents(); + + ChunkingConfig config = new ChunkingConfig(1024, 2, 0); + uploader = new ChunkedBlobUploader(grpcCacheClient, combinedCache, config, DIGEST_UTIL); + } + + @Test + public void getChunkingThreshold_returnsConfiguredValue() { + ChunkingConfig config = new ChunkingConfig(512, 2, 0); + ChunkedBlobUploader uploader = + new ChunkedBlobUploader(grpcCacheClient, combinedCache, config, DIGEST_UTIL); + + assertThat(uploader.getChunkingThreshold()).isEqualTo(512 * 4); + } + + @Test + @SuppressWarnings("unchecked") + public void uploadChunked_allChunksMissing_uploadsAllChunks() throws Exception { + Path file = execRoot.getRelative("test.txt"); + byte[] data = new byte[8192]; + new Random(42).nextBytes(data); + writeFile(file, data); + Digest blobDigest = DIGEST_UTIL.compute(data); + + ArgumentCaptor> digestsCaptor = ArgumentCaptor.forClass(List.class); + when(grpcCacheClient.findMissingDigests(any(), digestsCaptor.capture())) + .thenAnswer(invocation -> { + List digests = invocation.getArgument(1); + return Futures.immediateFuture(ImmutableSet.copyOf(digests)); + }); + when(combinedCache.uploadBlob(any(), any(Digest.class), any())) + .thenReturn(Futures.immediateFuture(null)); + 
when(grpcCacheClient.spliceBlob(any(), any(), any())) + .thenReturn(Futures.immediateFuture(null)); + + uploader.uploadChunked(context, blobDigest, file); + + List chunkDigests = digestsCaptor.getValue(); + assertThat(chunkDigests.size()).isGreaterThan(1); + long totalSize = chunkDigests.stream().mapToLong(Digest::getSizeBytes).sum(); + assertThat(totalSize).isEqualTo(data.length); + } + + @Test + @SuppressWarnings("unchecked") + public void uploadChunked_noChunksMissing_skipsChunkUpload() throws Exception { + Path file = execRoot.getRelative("test.txt"); + byte[] data = new byte[8192]; + new Random(42).nextBytes(data); + writeFile(file, data); + Digest blobDigest = DIGEST_UTIL.compute(data); + + when(grpcCacheClient.findMissingDigests(any(), any())) + .thenReturn(Futures.immediateFuture(ImmutableSet.of())); + when(grpcCacheClient.spliceBlob(any(), any(), any())) + .thenReturn(Futures.immediateFuture(null)); + + uploader.uploadChunked(context, blobDigest, file); + + verify(combinedCache, never()).uploadBlob(any(), any(Digest.class), any()); + verify(grpcCacheClient).spliceBlob(any(), eq(blobDigest), any()); + } + + @Test + @SuppressWarnings("unchecked") + public void uploadChunked_someChunksMissing_uploadsOnlyMissingWithCorrectData() throws Exception { + Path file = execRoot.getRelative("test_partial.txt"); + byte[] fileData = new byte[16384]; + new Random(42).nextBytes(fileData); + writeFile(file, fileData); + Digest blobDigest = DIGEST_UTIL.compute(fileData); + + ChunkingConfig config = new ChunkingConfig(1024, 2, 0); + FastCDCChunker testChunker = new FastCDCChunker(config, DIGEST_UTIL); + List allChunkDigests; + try (InputStream input = file.getInputStream()) { + allChunkDigests = testChunker.chunkToDigests(input); + } + assertThat(allChunkDigests.size()).isAtLeast(5); + + Set digestsToReportMissing = new LinkedHashSet<>(); + for (int i = 0; i < allChunkDigests.size(); i++) { + boolean isFirst = i == 0; + boolean isLast = i == allChunkDigests.size() - 1; + 
boolean isOdd = i % 2 == 1; + if (isFirst || isLast || isOdd) { + digestsToReportMissing.add(allChunkDigests.get(i)); + } + } + + Map expectedChunkData = new LinkedHashMap<>(); + try (InputStream input = file.getInputStream()) { + for (Digest digest : allChunkDigests) { + byte[] chunkBytes = input.readNBytes((int) digest.getSizeBytes()); + if (digestsToReportMissing.contains(digest)) { + expectedChunkData.put(digest, ByteString.copyFrom(chunkBytes)); + } + } + } + + when(grpcCacheClient.findMissingDigests(any(), any())) + .thenReturn(Futures.immediateFuture(ImmutableSet.copyOf(digestsToReportMissing))); + Map actualUploads = new HashMap<>(); + when(combinedCache.uploadBlob(any(), any(Digest.class), any())) + .thenAnswer(invocation -> { + Digest d = invocation.getArgument(1); + ByteString bs = invocation.getArgument(2); + actualUploads.put(d, bs); + return Futures.immediateFuture(null); + }); + when(grpcCacheClient.spliceBlob(any(), any(), any())) + .thenReturn(Futures.immediateFuture(null)); + + uploader.uploadChunked(context, blobDigest, file); + + assertThat(actualUploads.keySet()).isEqualTo(expectedChunkData.keySet()); + for (Map.Entry entry : expectedChunkData.entrySet()) { + assertThat(actualUploads.get(entry.getKey())).isEqualTo(entry.getValue()); + } + verify(grpcCacheClient).spliceBlob(any(), eq(blobDigest), eq(allChunkDigests)); + } + + private void writeFile(Path path, byte[] data) throws IOException { + try (var out = path.getOutputStream()) { + out.write(data); + } + } +} diff --git a/src/test/java/com/google/devtools/build/lib/remote/ChunkedCacheIntegrationTest.java b/src/test/java/com/google/devtools/build/lib/remote/ChunkedCacheIntegrationTest.java new file mode 100644 index 00000000000000..3d4711d3497379 --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/remote/ChunkedCacheIntegrationTest.java @@ -0,0 +1,308 @@ +// Copyright 2026 The Bazel Authors. All rights reserved. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote; + +import static com.google.common.truth.Truth.assertThat; +import static com.google.devtools.build.lib.vfs.FileSystemUtils.readContent; +import static java.nio.charset.StandardCharsets.UTF_8; + +import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc; +import build.bazel.remote.execution.v2.Digest; +import build.bazel.remote.execution.v2.RequestMetadata; +import build.bazel.remote.execution.v2.SplitBlobRequest; +import build.bazel.remote.execution.v2.SplitBlobResponse; +import build.bazel.remote.execution.v2.ToolDetails; +import com.google.bytestream.ByteStreamGrpc; +import com.google.bytestream.ByteStreamProto.ReadRequest; +import com.google.bytestream.ByteStreamProto.ReadResponse; +import com.google.common.collect.ImmutableList; +import com.google.common.hash.HashCode; +import com.google.common.hash.Hashing; +import com.google.common.io.ByteStreams; +import com.google.devtools.build.lib.authandtls.credentialhelper.CredentialModule; +import com.google.devtools.build.lib.buildtool.util.BuildIntegrationTestCase; +import com.google.devtools.build.lib.remote.util.IntegrationTestUtils; +import com.google.devtools.build.lib.remote.util.IntegrationTestUtils.WorkerInstance; +import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; +import com.google.devtools.build.lib.runtime.BlazeModule; +import 
com.google.devtools.build.lib.runtime.BlazeRuntime; +import com.google.devtools.build.lib.runtime.BlockWaitingModule; +import com.google.devtools.build.lib.runtime.BuildSummaryStatsModule; +import com.google.devtools.build.lib.standalone.StandaloneModule; +import com.google.devtools.build.lib.vfs.Path; +import io.grpc.ClientInterceptor; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.List; +import org.junit.After; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Integration tests for chunked remote cache using SplitBlob/SpliceBlob APIs. */ +@RunWith(JUnit4.class) +public class ChunkedCacheIntegrationTest extends BuildIntegrationTestCase { + @ClassRule @Rule public static final WorkerInstance worker = IntegrationTestUtils.createWorker(); + + @Override + protected void setupOptions() throws Exception { + super.setupOptions(); + addOptions( + "--remote_cache=grpc://localhost:" + worker.getPort(), + "--experimental_remote_cache_chunking"); + } + + @Override + protected BlazeRuntime.Builder getRuntimeBuilder() throws Exception { + return super.getRuntimeBuilder() + .addBlazeModule(new RemoteModule()) + .addBlazeModule(new BuildSummaryStatsModule()) + .addBlazeModule(new BlockWaitingModule()); + } + + @Override + protected ImmutableList getSpawnModules() { + return ImmutableList.builder() + .addAll(super.getSpawnModules()) + .add(new StandaloneModule()) + .add(new CredentialModule()) + .build(); + } + + @After + public void waitDownloads() throws Exception { + runtimeWrapper.newCommand(); + } + + private Path getOutputPath(String binRelativePath) { + return getTargetConfiguration().getBinDir().getRoot().getRelative(binRelativePath); + } + + private void cleanAndRestartServer() throws Exception { + 
getOutputBase().getRelative("action_cache").deleteTreesBelow(); + createRuntimeWrapper(); + } + + private byte[] readFileBytes(Path path) throws IOException { + try (InputStream in = path.getInputStream()) { + return ByteStreams.toByteArray(in); + } + } + + private Digest computeDigest(byte[] data) { + HashCode hash = Hashing.sha256().hashBytes(data); + return Digest.newBuilder().setHash(hash.toString()).setSizeBytes(data.length).build(); + } + + @Test + public void uploadAndDownloadLargeBlob_withChunking_succeeds() throws Exception { + write( + "BUILD", + """ + genrule( + name = "large_file", + srcs = [], + outs = ["large.txt"], + cmd = "dd if=/dev/zero bs=1M count=3 2>/dev/null | tr '\\\\0' 'a' > $@", + ) + """); + + buildTarget("//:large_file"); + + Path output = getOutputPath("large.txt"); + assertThat(output.exists()).isTrue(); + byte[] originalContent = readFileBytes(output); + assertThat(originalContent.length).isAtLeast(2 * 1024 * 1024); + + Digest blobDigest = computeDigest(originalContent); + + // Verify SplitBlob returns multiple chunks and each chunk is individually downloadable. 
+ RequestMetadata metadata = + RequestMetadata.newBuilder() + .setCorrelatedInvocationsId("test-build-id") + .setToolInvocationId("test-command-id") + .setActionId("test-action-id") + .setToolDetails(ToolDetails.newBuilder().setToolName("bazel").setToolVersion("test")) + .build(); + ClientInterceptor interceptor = TracingMetadataUtils.attachMetadataInterceptor(metadata); + + ManagedChannel channel = + ManagedChannelBuilder.forAddress("localhost", worker.getPort()) + .usePlaintext() + .intercept(interceptor) + .build(); + try { + ContentAddressableStorageGrpc.ContentAddressableStorageBlockingStub casStub = + ContentAddressableStorageGrpc.newBlockingStub(channel); + + SplitBlobResponse splitResponse = + casStub.splitBlob(SplitBlobRequest.newBuilder().setBlobDigest(blobDigest).build()); + List chunkDigests = splitResponse.getChunkDigestsList(); + + assertThat(chunkDigests.size()).isGreaterThan(1); + long totalChunkSize = chunkDigests.stream().mapToLong(Digest::getSizeBytes).sum(); + assertThat(totalChunkSize).isEqualTo(originalContent.length); + + // Download each chunk individually and reassemble to verify integrity. + ByteStreamGrpc.ByteStreamBlockingStub bsStub = ByteStreamGrpc.newBlockingStub(channel); + ByteArrayOutputStream reassembled = new ByteArrayOutputStream(); + for (Digest chunkDigest : chunkDigests) { + String resourceName = + "blobs/" + chunkDigest.getHash() + "/" + chunkDigest.getSizeBytes(); + Iterator readIter = + bsStub.read(ReadRequest.newBuilder().setResourceName(resourceName).build()); + int chunkBytesRead = 0; + while (readIter.hasNext()) { + byte[] data = readIter.next().getData().toByteArray(); + reassembled.write(data); + chunkBytesRead += data.length; + } + assertThat(chunkBytesRead).isEqualTo((int) chunkDigest.getSizeBytes()); + } + assertThat(reassembled.toByteArray()).isEqualTo(originalContent); + } finally { + channel.shutdownNow(); + } + + // Delete output and action cache, then rebuild to exercise chunked download. 
+ output.delete(); + assertThat(output.exists()).isFalse(); + cleanAndRestartServer(); + + buildTarget("//:large_file"); + + assertThat(output.exists()).isTrue(); + assertThat(readFileBytes(output)).isEqualTo(originalContent); + } + + @Test + public void multipleTargets_withChunking_allSucceed() throws Exception { + // Multiple large files built in parallel, with a downstream target that depends on them. + // Use deterministic content (filled with distinct byte patterns) so we can verify integrity. + write( + "BUILD", + """ + genrule( + name = "data_a", + srcs = [], + outs = ["a.bin"], + cmd = "dd if=/dev/zero bs=1M count=3 2>/dev/null | tr '\\\\0' 'A' > $@", + ) + genrule( + name = "data_b", + srcs = [], + outs = ["b.bin"], + cmd = "dd if=/dev/zero bs=1M count=4 2>/dev/null | tr '\\\\0' 'B' > $@", + ) + genrule( + name = "combined", + srcs = [":a.bin", ":b.bin"], + outs = ["combined.bin"], + cmd = "cat $(SRCS) > $@", + ) + """); + + buildTarget("//:combined"); + + Path outputA = getOutputPath("a.bin"); + Path outputB = getOutputPath("b.bin"); + Path outputCombined = getOutputPath("combined.bin"); + assertThat(outputA.exists()).isTrue(); + assertThat(outputB.exists()).isTrue(); + assertThat(outputCombined.exists()).isTrue(); + + byte[] contentA = readFileBytes(outputA); + byte[] contentB = readFileBytes(outputB); + byte[] contentCombined = readFileBytes(outputCombined); + assertThat(contentA.length).isEqualTo(3 * 1024 * 1024); + assertThat(contentB.length).isEqualTo(4 * 1024 * 1024); + assertThat(contentCombined.length).isEqualTo(7 * 1024 * 1024); + + // Clean and rebuild from cache. 
+ outputA.delete(); + outputB.delete(); + outputCombined.delete(); + cleanAndRestartServer(); + + buildTarget("//:combined"); + + assertThat(readFileBytes(outputA)).isEqualTo(contentA); + assertThat(readFileBytes(outputB)).isEqualTo(contentB); + assertThat(readFileBytes(outputCombined)).isEqualTo(contentCombined); + } + + @Test + public void buildWithChunking_smallFile_succeeds() throws Exception { + write( + "BUILD", + """ + genrule( + name = "small_file", + srcs = [], + outs = ["small.txt"], + cmd = "echo 'hello world' > $@", + ) + """); + + buildTarget("//:small_file"); + + Path output = getOutputPath("small.txt"); + assertThat(output.exists()).isTrue(); + assertThat(readContent(output, UTF_8)).isEqualTo("hello world\n"); + } + + @Test + public void mixedSizes_largeAndSmallOutputs_allSucceed() throws Exception { + write( + "BUILD", + """ + genrule( + name = "large", + srcs = [], + outs = ["large.bin"], + cmd = "dd if=/dev/zero bs=1M count=3 2>/dev/null | tr '\\\\0' 'X' > $@", + ) + genrule( + name = "small", + srcs = [], + outs = ["small.txt"], + cmd = "echo 'small output' > $@", + ) + """); + + buildTarget("//:large", "//:small"); + + Path largePath = getOutputPath("large.bin"); + Path smallPath = getOutputPath("small.txt"); + byte[] largeContent = readFileBytes(largePath); + assertThat(largeContent.length).isEqualTo(3 * 1024 * 1024); + assertThat(readContent(smallPath, UTF_8)).isEqualTo("small output\n"); + + // Clean and rebuild. 
+ largePath.delete(); + smallPath.delete(); + cleanAndRestartServer(); + + buildTarget("//:large", "//:small"); + + assertThat(readFileBytes(largePath)).isEqualTo(largeContent); + assertThat(readContent(smallPath, UTF_8)).isEqualTo("small output\n"); + } +} diff --git a/src/test/java/com/google/devtools/build/lib/remote/ChunkedDiskCacheIntegrationTest.java b/src/test/java/com/google/devtools/build/lib/remote/ChunkedDiskCacheIntegrationTest.java new file mode 100644 index 00000000000000..851750d300813c --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/remote/ChunkedDiskCacheIntegrationTest.java @@ -0,0 +1,219 @@ +// Copyright 2026 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+package com.google.devtools.build.lib.remote; + +import static com.google.common.truth.Truth.assertThat; +import static com.google.devtools.build.lib.testutil.TestUtils.tmpDirFile; + +import com.google.common.collect.ImmutableList; +import com.google.common.io.ByteStreams; +import com.google.devtools.build.lib.authandtls.credentialhelper.CredentialModule; +import com.google.devtools.build.lib.buildtool.util.BuildIntegrationTestCase; +import com.google.devtools.build.lib.remote.util.IntegrationTestUtils; +import com.google.devtools.build.lib.remote.util.IntegrationTestUtils.WorkerInstance; +import com.google.devtools.build.lib.runtime.BlazeModule; +import com.google.devtools.build.lib.runtime.BlazeRuntime; +import com.google.devtools.build.lib.runtime.BlockWaitingModule; +import com.google.devtools.build.lib.runtime.BuildSummaryStatsModule; +import com.google.devtools.build.lib.standalone.StandaloneModule; +import com.google.devtools.build.lib.vfs.Path; +import com.google.devtools.build.lib.vfs.PathFragment; +import java.io.IOException; +import java.io.InputStream; +import org.junit.After; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Integration tests for chunked remote cache with a combined disk + remote cache. + * + *

Verifies that chunks downloaded from the remote cache are properly captured to disk cache, and + * that subsequent builds can serve chunks from disk cache without hitting the remote. + */ +@RunWith(JUnit4.class) +public class ChunkedDiskCacheIntegrationTest extends BuildIntegrationTestCase { + @ClassRule @Rule public static final WorkerInstance worker = IntegrationTestUtils.createWorker(); + + private static PathFragment getDiskCacheDir() { + return PathFragment.create(tmpDirFile().getAbsolutePath()).getRelative("chunked_disk_cache"); + } + + @Override + protected void setupOptions() throws Exception { + super.setupOptions(); + addOptions( + "--remote_cache=grpc://localhost:" + worker.getPort(), + "--disk_cache=" + getDiskCacheDir(), + "--experimental_remote_cache_chunking"); + } + + @Override + protected BlazeRuntime.Builder getRuntimeBuilder() throws Exception { + return super.getRuntimeBuilder() + .addBlazeModule(new RemoteModule()) + .addBlazeModule(new BuildSummaryStatsModule()) + .addBlazeModule(new BlockWaitingModule()); + } + + @Override + protected ImmutableList getSpawnModules() { + return ImmutableList.builder() + .addAll(super.getSpawnModules()) + .add(new StandaloneModule()) + .add(new CredentialModule()) + .build(); + } + + @After + public void tearDown() throws Exception { + runtimeWrapper.newCommand(); + getWorkspace().getFileSystem().getPath(getDiskCacheDir()).deleteTree(); + } + + private Path getOutputPath(String binRelativePath) { + return getTargetConfiguration().getBinDir().getRoot().getRelative(binRelativePath); + } + + private void cleanAndRestartServer() throws Exception { + getOutputBase().getRelative("action_cache").deleteTreesBelow(); + createRuntimeWrapper(); + } + + private byte[] readFileBytes(Path path) throws IOException { + try (InputStream in = path.getInputStream()) { + return ByteStreams.toByteArray(in); + } + } + + @Test + public void largeBlob_uploadedAndDownloaded_throughDiskAndRemoteCache() throws Exception { + write( + 
"BUILD", + """ + genrule( + name = "large_file", + srcs = [], + outs = ["large.bin"], + cmd = "dd if=/dev/zero bs=1M count=3 2>/dev/null | tr '\\\\0' 'D' > $@", + ) + """); + + // First build: generates the file, uploads chunks to remote + disk cache. + buildTarget("//:large_file"); + + Path output = getOutputPath("large.bin"); + assertThat(output.exists()).isTrue(); + byte[] originalContent = readFileBytes(output); + assertThat(originalContent.length).isEqualTo(3 * 1024 * 1024); + + // Second build: clean outputs + action cache, rebuild. + // Chunks should be served from disk cache (populated during first build's download capture). + output.delete(); + cleanAndRestartServer(); + + buildTarget("//:large_file"); + + assertThat(output.exists()).isTrue(); + assertThat(readFileBytes(output)).isEqualTo(originalContent); + } + + @Test + public void largeBlob_diskCasDeleted_rebuildFromRemote() throws Exception { + write( + "BUILD", + """ + genrule( + name = "large_file", + srcs = [], + outs = ["large.bin"], + cmd = "dd if=/dev/zero bs=1M count=3 2>/dev/null | tr '\\\\0' 'E' > $@", + ) + """); + + // First build: populates both caches. + buildTarget("//:large_file"); + + Path output = getOutputPath("large.bin"); + byte[] originalContent = readFileBytes(output); + + // Delete disk cache CAS entries (simulate cache eviction). + Path diskCasCas = getWorkspace().getFileSystem().getPath(getDiskCacheDir().getRelative("cas")); + if (diskCasCas.exists()) { + diskCasCas.deleteTree(); + } + + // Clean outputs + action cache, rebuild. + // Should fall back to remote cache since disk CAS is gone. 
+ output.delete(); + cleanAndRestartServer(); + + buildTarget("//:large_file"); + + assertThat(output.exists()).isTrue(); + assertThat(readFileBytes(output)).isEqualTo(originalContent); + } + + @Test + public void multipleTargets_withDiskCache_allSucceed() throws Exception { + write( + "BUILD", + """ + genrule( + name = "data_a", + srcs = [], + outs = ["a.bin"], + cmd = "dd if=/dev/zero bs=1M count=3 2>/dev/null | tr '\\\\0' 'F' > $@", + ) + genrule( + name = "data_b", + srcs = [], + outs = ["b.bin"], + cmd = "dd if=/dev/zero bs=1M count=4 2>/dev/null | tr '\\\\0' 'G' > $@", + ) + genrule( + name = "combined", + srcs = [":a.bin", ":b.bin"], + outs = ["combined.bin"], + cmd = "cat $(SRCS) > $@", + ) + """); + + buildTarget("//:data_a", "//:data_b", "//:combined"); + + Path outputA = getOutputPath("a.bin"); + Path outputB = getOutputPath("b.bin"); + Path outputCombined = getOutputPath("combined.bin"); + byte[] contentA = readFileBytes(outputA); + byte[] contentB = readFileBytes(outputB); + byte[] contentCombined = readFileBytes(outputCombined); + assertThat(contentA.length).isEqualTo(3 * 1024 * 1024); + assertThat(contentB.length).isEqualTo(4 * 1024 * 1024); + assertThat(contentCombined.length).isEqualTo(7 * 1024 * 1024); + + // Clean and rebuild from cache. 
+ outputA.delete(); + outputB.delete(); + outputCombined.delete(); + cleanAndRestartServer(); + + buildTarget("//:data_a", "//:data_b", "//:combined"); + + assertThat(readFileBytes(outputA)).isEqualTo(contentA); + assertThat(readFileBytes(outputB)).isEqualTo(contentB); + assertThat(readFileBytes(outputCombined)).isEqualTo(contentCombined); + } +} diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java index 9cd74f405652fb..8395e1e4f2dee7 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java @@ -215,7 +215,8 @@ public int maxConcurrency() { }); channels.add(channel); return new GrpcCacheClient( - channel, callCredentialsProvider, remoteOptions, retrier, DIGEST_UTIL); + channel, callCredentialsProvider, remoteOptions, retrier, DIGEST_UTIL, + /* chunkingConfig= */ null); } private static byte[] downloadBlob( diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerWithGrpcRemoteExecutorTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerWithGrpcRemoteExecutorTest.java index 0c6c0dd80e2a05..9e59d4ad62d0a8 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerWithGrpcRemoteExecutorTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerWithGrpcRemoteExecutorTest.java @@ -333,7 +333,8 @@ public int maxConcurrency() { GoogleAuthUtils.newCallCredentialsProvider(null); GrpcCacheClient cacheProtocol = new GrpcCacheClient( - channel.retain(), callCredentialsProvider, remoteOptions, retrier, DIGEST_UTIL); + channel.retain(), callCredentialsProvider, remoteOptions, retrier, DIGEST_UTIL, + /* chunkingConfig= */ null); remoteCache = new RemoteExecutionCache( cacheProtocol, /* diskCacheClient= */ null, /* symlinkTemplate= */ null, DIGEST_UTIL); diff 
--git a/src/test/java/com/google/devtools/build/lib/remote/chunking/BUILD b/src/test/java/com/google/devtools/build/lib/remote/chunking/BUILD new file mode 100644 index 00000000000000..d100d9c64e7f09 --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/remote/chunking/BUILD @@ -0,0 +1,67 @@ +load("@rules_java//java:defs.bzl", "java_library", "java_test") +load("//src:java_opt_binary.bzl", "java_opt_binary") + +package( + default_applicable_licenses = ["//:license"], + default_testonly = 1, + default_visibility = ["//src:__subpackages__"], +) + +filegroup( + name = "srcs", + testonly = 0, + srcs = glob(["**"]), + visibility = ["//src:__subpackages__"], +) + +java_library( + name = "ChunkingTests_lib", + srcs = [ + "ChunkingConfigTest.java", + "FastCDCChunkerTest.java", + ], + data = [ + # Public domain image by Toriyama Sekien (1712-1788), + # first published in "Gazu Hyakki Yagyo" (1776). + # Source: https://commons.wikimedia.org/wiki/File:SekienAkashita.jpg + "SekienAkashita.jpg", + ], + deps = [ + "//src/main/java/com/google/devtools/build/lib/remote/chunking", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", + "//src/main/java/com/google/devtools/build/lib/vfs", + "//third_party:guava", + "//third_party:junit4", + "//third_party:truth", + "@rules_java//java/runfiles", + "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto", + ], +) + +java_test( + name = "ChunkingTests", + test_class = "com.google.devtools.build.lib.AllTests", + data = [ + "SekienAkashita.jpg", + ], + env = { + "SEKIEN_AKASHITA_PATH": "$(rlocationpath SekienAkashita.jpg)", + }, + runtime_deps = [ + ":ChunkingTests_lib", + "//src/test/java/com/google/devtools/build/lib:test_runner", + ], +) + +java_opt_binary( + name = "FastCDCBenchmark", + srcs = ["FastCDCBenchmark.java"], + main_class = "org.openjdk.jmh.Main", + deps = [ + "//src/main/java/com/google/devtools/build/lib/remote/chunking", + 
"//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", + "//src/main/java/com/google/devtools/build/lib/vfs", + "//src/main/java/com/google/devtools/build/lib/vfs/bazel", + "//third_party:jmh", + ], +) diff --git a/src/test/java/com/google/devtools/build/lib/remote/chunking/ChunkingConfigTest.java b/src/test/java/com/google/devtools/build/lib/remote/chunking/ChunkingConfigTest.java new file mode 100644 index 00000000000000..db5866603f292d --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/remote/chunking/ChunkingConfigTest.java @@ -0,0 +1,138 @@ +// Copyright 2026 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote.chunking; + +import static com.google.common.truth.Truth.assertThat; + +import build.bazel.remote.execution.v2.CacheCapabilities; +import build.bazel.remote.execution.v2.FastCdc2020Params; +import build.bazel.remote.execution.v2.ServerCapabilities; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link ChunkingConfig}. 
*/ +@RunWith(JUnit4.class) +public class ChunkingConfigTest { + + @Test + public void defaults_returnsExpectedValues() { + ChunkingConfig config = ChunkingConfig.defaults(); + + assertThat(config.avgChunkSize()).isEqualTo(512 * 1024); + assertThat(config.normalizationLevel()).isEqualTo(2); + assertThat(config.seed()).isEqualTo(0); + assertThat(config.chunkingThreshold()).isEqualTo(512 * 1024 * 4); + } + + @Test + public void minChunkSize_returnsQuarterOfAvg() { + ChunkingConfig config = new ChunkingConfig(1024, 2, 0); + + assertThat(config.minChunkSize()).isEqualTo(256); + } + + @Test + public void maxChunkSize_returnsFourTimesAvg() { + ChunkingConfig config = new ChunkingConfig(1024, 2, 0); + + assertThat(config.maxChunkSize()).isEqualTo(4096); + } + + @Test + public void chunkingThreshold_equalsMaxChunkSize() { + ChunkingConfig config = new ChunkingConfig(1024, 2, 0); + + assertThat(config.chunkingThreshold()).isEqualTo(config.maxChunkSize()); + } + + @Test + public void minAndMaxChunkSize_withDefaultConfig() { + ChunkingConfig config = ChunkingConfig.defaults(); + + assertThat(config.minChunkSize()).isEqualTo(128 * 1024); + assertThat(config.maxChunkSize()).isEqualTo(2048 * 1024); + } + + @Test + public void fromServerCapabilities_withoutCacheCapabilities_returnsNull() { + ServerCapabilities capabilities = ServerCapabilities.getDefaultInstance(); + + ChunkingConfig config = ChunkingConfig.fromServerCapabilities(capabilities); + + assertThat(config).isNull(); + } + + @Test + public void fromServerCapabilities_withoutFastCdcParams_returnsNull() { + ServerCapabilities capabilities = ServerCapabilities.newBuilder() + .setCacheCapabilities(CacheCapabilities.getDefaultInstance()) + .build(); + + ChunkingConfig config = ChunkingConfig.fromServerCapabilities(capabilities); + + assertThat(config).isNull(); + } + + @Test + public void fromServerCapabilities_withFastCdcParams_returnsConfig() { + ServerCapabilities capabilities = ServerCapabilities.newBuilder() + 
.setCacheCapabilities(CacheCapabilities.newBuilder() + .setFastCdc2020Params(FastCdc2020Params.newBuilder() + .setAvgChunkSizeBytes(256 * 1024) + .setSeed(42) + .build()) + .build()) + .build(); + + ChunkingConfig config = ChunkingConfig.fromServerCapabilities(capabilities); + + assertThat(config).isNotNull(); + assertThat(config.avgChunkSize()).isEqualTo(256 * 1024); + assertThat(config.seed()).isEqualTo(42); + assertThat(config.chunkingThreshold()).isEqualTo(256 * 1024 * 4); + } + + @Test + public void fromServerCapabilities_withDefaultFastCdcParams_returnsDefaults() { + ServerCapabilities capabilities = ServerCapabilities.newBuilder() + .setCacheCapabilities(CacheCapabilities.newBuilder() + .setFastCdc2020Params(FastCdc2020Params.newBuilder() + .setAvgChunkSizeBytes(512 * 1024) + .setSeed(0) + .build()) + .build()) + .build(); + + ChunkingConfig config = ChunkingConfig.fromServerCapabilities(capabilities); + + assertThat(config).isEqualTo(ChunkingConfig.defaults()); + } + + @Test + public void fromServerCapabilities_nonPowerOfTwoAvgSize_fallsBackToDefault() { + ServerCapabilities capabilities = ServerCapabilities.newBuilder() + .setCacheCapabilities(CacheCapabilities.newBuilder() + .setFastCdc2020Params(FastCdc2020Params.newBuilder() + .setAvgChunkSizeBytes(300 * 1024) + .build()) + .build()) + .build(); + + ChunkingConfig config = ChunkingConfig.fromServerCapabilities(capabilities); + + assertThat(config).isNotNull(); + assertThat(config.avgChunkSize()).isEqualTo(ChunkingConfig.DEFAULT_AVG_CHUNK_SIZE); + } +} diff --git a/src/test/java/com/google/devtools/build/lib/remote/chunking/FastCDCBenchmark.java b/src/test/java/com/google/devtools/build/lib/remote/chunking/FastCDCBenchmark.java new file mode 100644 index 00000000000000..717590da7b588e --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/remote/chunking/FastCDCBenchmark.java @@ -0,0 +1,65 @@ +// Copyright 2026 The Bazel Authors. All rights reserved. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote.chunking; + +import com.google.devtools.build.lib.remote.util.DigestUtil; +import com.google.devtools.build.lib.vfs.SyscallCache; +import com.google.devtools.build.lib.vfs.bazel.BazelHashFunctions; +import java.io.ByteArrayInputStream; +import java.security.SecureRandom; +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +@BenchmarkMode(Mode.Throughput) +@State(Scope.Benchmark) +@Warmup(iterations = 3, time = 5, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 3, time = 5, timeUnit = TimeUnit.SECONDS) +@Fork(3) +public class FastCDCBenchmark { + private static final int AVG_CHUNK_SIZE = 512 * 1024; + + @Param({"1048576", "8388608", "67108864"}) + public int size; + + private byte[] data; + private FastCDCChunker chunker; + + @Setup(Level.Iteration) + public void setup() { + BazelHashFunctions.ensureRegistered(); + data = new byte[size]; + new SecureRandom().nextBytes(data); + + 
DigestUtil digestUtil = + new DigestUtil(SyscallCache.NO_CACHE, BazelHashFunctions.BLAKE3); + int minSize = AVG_CHUNK_SIZE / 4; + int maxSize = AVG_CHUNK_SIZE * 4; + chunker = new FastCDCChunker(minSize, AVG_CHUNK_SIZE, maxSize, 2, 0, digestUtil); + } + + @Benchmark + public Object chunkToDigests() throws Exception { + return chunker.chunkToDigests(new ByteArrayInputStream(data)); + } +} diff --git a/src/test/java/com/google/devtools/build/lib/remote/chunking/FastCDCChunkerTest.java b/src/test/java/com/google/devtools/build/lib/remote/chunking/FastCDCChunkerTest.java new file mode 100644 index 00000000000000..890339e6f1e600 --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/remote/chunking/FastCDCChunkerTest.java @@ -0,0 +1,271 @@ +// Copyright 2026 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
*/ +package com.google.devtools.build.lib.remote.chunking; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; + +import build.bazel.remote.execution.v2.Digest; +import com.google.devtools.build.lib.remote.util.DigestUtil; +import com.google.devtools.build.lib.vfs.DigestHashFunction; +import com.google.devtools.build.lib.vfs.SyscallCache; +import com.google.devtools.build.runfiles.Runfiles; +import com.google.common.hash.Hashing; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link FastCDCChunker}. */ +@RunWith(JUnit4.class) +public class FastCDCChunkerTest { + private static final DigestUtil DIGEST_UTIL = + new DigestUtil(SyscallCache.NO_CACHE, DigestHashFunction.SHA256); + + @Test + public void chunkToDigests_emptyInput_returnsEmptyList() throws IOException { + FastCDCChunker chunker = new FastCDCChunker(DIGEST_UTIL); + + List<Digest> digests = chunker.chunkToDigests(new ByteArrayInputStream(new byte[0])); + + assertThat(digests).isEmpty(); + } + + @Test + public void chunkToDigests_smallInput_returnsSingleChunk() throws IOException { + ChunkingConfig config = new ChunkingConfig(1024, 2, 0); + FastCDCChunker chunker = new FastCDCChunker(config, DIGEST_UTIL); + byte[] data = new byte[100]; + new Random(42).nextBytes(data); + + List<Digest> digests = chunker.chunkToDigests(new ByteArrayInputStream(data)); + + assertThat(digests).hasSize(1); + assertThat(digests.get(0).getSizeBytes()).isEqualTo(100); + } + + @Test + public void chunkToDigests_dataAtMinSize_returnsSingleChunk() throws IOException { + ChunkingConfig config = new ChunkingConfig(1024, 2, 0); + FastCDCChunker chunker = new FastCDCChunker(config, DIGEST_UTIL); + byte[] data = new
byte[config.minChunkSize()]; + new Random(42).nextBytes(data); + + List<Digest> digests = chunker.chunkToDigests(new ByteArrayInputStream(data)); + + assertThat(digests).hasSize(1); + assertThat(digests.get(0).getSizeBytes()).isEqualTo(config.minChunkSize()); + } + + @Test + public void chunkToDigests_largeInput_producesMultipleChunks() throws IOException { + ChunkingConfig config = new ChunkingConfig(1024, 2, 0); + FastCDCChunker chunker = new FastCDCChunker(config, DIGEST_UTIL); + byte[] data = new byte[config.maxChunkSize() * 3]; + new Random(42).nextBytes(data); + + List<Digest> digests = chunker.chunkToDigests(new ByteArrayInputStream(data)); + + assertThat(digests.size()).isGreaterThan(1); + long totalSize = digests.stream().mapToLong(Digest::getSizeBytes).sum(); + assertThat(totalSize).isEqualTo(data.length); + } + + @Test + public void chunkToDigests_sameInputProducesSameChunks() throws IOException { + FastCDCChunker chunker = new FastCDCChunker(DIGEST_UTIL); + byte[] data = new byte[2 * 1024 * 1024]; + new Random(123).nextBytes(data); + + List<Digest> digests1 = chunker.chunkToDigests(new ByteArrayInputStream(data)); + List<Digest> digests2 = chunker.chunkToDigests(new ByteArrayInputStream(data)); + + assertThat(digests1).isEqualTo(digests2); + } + + @Test + public void chunkToDigests_chunkSizesWithinBounds() throws IOException { + ChunkingConfig config = new ChunkingConfig(1024, 2, 0); + FastCDCChunker chunker = new FastCDCChunker(config, DIGEST_UTIL); + byte[] data = new byte[config.maxChunkSize() * 10]; + new Random(42).nextBytes(data); + + List<Digest> digests = chunker.chunkToDigests(new ByteArrayInputStream(data)); + + for (int i = 0; i < digests.size() - 1; i++) { + long size = digests.get(i).getSizeBytes(); + assertThat(size).isAtLeast(config.minChunkSize()); + assertThat(size).isAtMost(config.maxChunkSize()); + } + } + + @Test + public void chunkToDigests_lastChunkCanBeSmallerThanMin() throws IOException { + ChunkingConfig config = new ChunkingConfig(1024, 2, 0); + FastCDCChunker
chunker = new FastCDCChunker(config, DIGEST_UTIL); + int dataSize = config.maxChunkSize() + config.minChunkSize() / 2; + byte[] data = new byte[dataSize]; + new Random(42).nextBytes(data); + + List<Digest> digests = chunker.chunkToDigests(new ByteArrayInputStream(data)); + + assertThat(digests.size()).isAtLeast(1); + long totalSize = digests.stream().mapToLong(Digest::getSizeBytes).sum(); + assertThat(totalSize).isEqualTo(dataSize); + } + + @Test + public void chunkToDigests_digestsAreCorrect() throws IOException { + ChunkingConfig config = new ChunkingConfig(1024, 2, 0); + FastCDCChunker chunker = new FastCDCChunker(config, DIGEST_UTIL); + byte[] data = new byte[500]; + new Random(42).nextBytes(data); + + List<Digest> digests = chunker.chunkToDigests(new ByteArrayInputStream(data)); + + assertThat(digests).hasSize(1); + Digest expected = DIGEST_UTIL.compute(data); + assertThat(digests.get(0)).isEqualTo(expected); + } + + @Test + public void constructor_invalidMinSize_throws() { + assertThrows( + IllegalArgumentException.class, + () -> new FastCDCChunker(0, 1024, 4096, 2, 0, DIGEST_UTIL)); + } + + @Test + public void constructor_avgSizeLessThanMinSize_throws() { + assertThrows( + IllegalArgumentException.class, + () -> new FastCDCChunker(1024, 512, 4096, 2, 0, DIGEST_UTIL)); + } + + @Test + public void constructor_maxSizeLessThanAvgSize_throws() { + assertThrows( + IllegalArgumentException.class, + () -> new FastCDCChunker(256, 1024, 512, 2, 0, DIGEST_UTIL)); + } + + @Test + public void constructor_avgSizeNotPowerOfTwo_throws() { + assertThrows( + IllegalArgumentException.class, + () -> new FastCDCChunker(256, 1000, 4096, 2, 0, DIGEST_UTIL)); + } + + @Test + public void constructor_invalidNormalization_throws() { + assertThrows( + IllegalArgumentException.class, + () -> new FastCDCChunker(256, 1024, 4096, 4, 0, DIGEST_UTIL)); + } + + @Test + public void chunkToDigests_withDefaultConfig() throws IOException { + FastCDCChunker chunker = new FastCDCChunker(DIGEST_UTIL); + byte[] data
= new byte[4 * 1024 * 1024]; + new Random(42).nextBytes(data); + + List<Digest> digests = chunker.chunkToDigests(new ByteArrayInputStream(data)); + + assertThat(digests.size()).isGreaterThan(1); + long totalSize = digests.stream().mapToLong(Digest::getSizeBytes).sum(); + assertThat(totalSize).isEqualTo(data.length); + } + + @Test + public void chunkToDigests_testVectorsSeed0() throws Exception { + verifyTestVectors(0, new int[][] { + {0, 19186}, + {19186, 19279}, + {38465, 17354}, + {55819, 16387}, + {72206, 19940}, + {92146, 17320}, + }, new String[] { + "0f9efa589121d5d9e9e2c4ace91337d77cae866537143f6f15a0ffd525a77c2d", + "c7c86a165573c16448cda35c9169742e85645af42be22889f8b96b8ee0ec7cb0", + "bc88521e28a8b4479cdea5f75aa721a24f3a0a7d0be903aa6d505c574e51e89d", + "4b8dac2652e4685c629d2bb1ae9d4448e676b86f2e67ca0b2fff3d9580184b79", + "c0a7062da6f2386c28e086ee0cedd5732252741269838773cff1ddb05b2df6ed", + "7fa5b12134dc75cd2ac8dc60d3a8f3c8d22f0ee9d4cf74a4aa937e2a0d2d79a5", + }); + } + + @Test + public void chunkToDigests_testVectorsSeed666() throws Exception { + verifyTestVectors(666, new int[][] { + {0, 17635}, + {17635, 17334}, + {34969, 19136}, + {54105, 17467}, + {71572, 23593}, + {95165, 14301}, + }, new String[] { + "cb3a9d80a3569772d4ed331ca37ab0c862c759897b890fc1aac90a4f2ea3a407", + "d758c6b7b0b7eef1e996f8ccd17de6c645360b03a26c35541e7581348ac08944", + "24846aefd89e510594bae3e9d7d5ea5012067601512610fed126a3c57ba993f5", + "efa785e1fefb49f190e665f72fd246c1442079874508c312196da1fb3040d00b", + "a2f557bdd8d40d8faada963ad5f91ec54b10ccee7c5ae72754a65137592dc607", + "e131100b4a7147ccad19dc63c4a2fac1f5d8b644e1373eeb6803825024234efc", + }); + } + + // Test vectors from the Remote Execution API specification: + // https://github.com/bazelbuild/remote-apis/blob/v2.12.0/build/bazel/remote/execution/v2/fastcdc2020_test_vectors.txt + // Test image: "Akashita" by Toriyama Sekien (1712-1788), public domain.
+ // Source: https://commons.wikimedia.org/wiki/File:SekienAkashita.jpg + private void verifyTestVectors(long seed, int[][] expectedChunks, String[] expectedHashes) + throws Exception { + Runfiles runfiles = Runfiles.create(); + String rlocationPath = System.getenv("SEKIEN_AKASHITA_PATH"); + Path testVectorPath = Path.of(runfiles.rlocation(rlocationPath)); + byte[] fileData = Files.readAllBytes(testVectorPath); + + FastCDCChunker chunker = new FastCDCChunker(4096, 16384, 65535, 2, seed, DIGEST_UTIL); + List<Digest> digests; + try (InputStream input = new ByteArrayInputStream(fileData)) { + digests = chunker.chunkToDigests(input); + } + + assertThat(digests).hasSize(expectedChunks.length); + + List<int[]> actualChunks = new ArrayList<>(); + int offset = 0; + for (Digest digest : digests) { + actualChunks.add(new int[] {offset, (int) digest.getSizeBytes()}); + offset += digest.getSizeBytes(); + } + + for (int i = 0; i < expectedChunks.length; i++) { + assertThat(actualChunks.get(i)[0]).isEqualTo(expectedChunks[i][0]); + assertThat(actualChunks.get(i)[1]).isEqualTo(expectedChunks[i][1]); + + byte[] chunkData = new byte[expectedChunks[i][1]]; + System.arraycopy(fileData, expectedChunks[i][0], chunkData, 0, chunkData.length); + String chunkHash = Hashing.sha256().hashBytes(chunkData).toString(); + assertThat(chunkHash).isEqualTo(expectedHashes[i]); + } + } +} diff --git a/src/test/java/com/google/devtools/build/lib/remote/chunking/SekienAkashita.jpg b/src/test/java/com/google/devtools/build/lib/remote/chunking/SekienAkashita.jpg new file mode 100644 index 00000000000000..71b09702c447a3 Binary files /dev/null and b/src/test/java/com/google/devtools/build/lib/remote/chunking/SekienAkashita.jpg differ diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CapabilitiesServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CapabilitiesServer.java index 944fdb222f3bfa..4add0507553f17 100644 ---
a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CapabilitiesServer.java +++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CapabilitiesServer.java @@ -19,6 +19,7 @@ import build.bazel.remote.execution.v2.CapabilitiesGrpc.CapabilitiesImplBase; import build.bazel.remote.execution.v2.DigestFunction; import build.bazel.remote.execution.v2.ExecutionCapabilities; +import build.bazel.remote.execution.v2.FastCdc2020Params; import build.bazel.remote.execution.v2.GetCapabilitiesRequest; import build.bazel.remote.execution.v2.ServerCapabilities; import build.bazel.remote.execution.v2.SymlinkAbsolutePathStrategy; @@ -62,6 +63,12 @@ public void getCapabilities( .setActionCacheUpdateCapabilities( ActionCacheUpdateCapabilities.newBuilder().setUpdateEnabled(true).build()) .setMaxBatchTotalSizeBytes(CasServer.MAX_BATCH_SIZE_BYTES) + .setSplitBlobSupport(true) + .setSpliceBlobSupport(true) + .setFastCdc2020Params(FastCdc2020Params.newBuilder() + .setAvgChunkSizeBytes(512 * 1024) + .setSeed(0) + .build()) .build()); if (execEnabled) { response.setExecutionCapabilities( diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CasServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CasServer.java index 1fdf95de5710d7..8cdf0c47c6d52c 100644 --- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CasServer.java +++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CasServer.java @@ -18,6 +18,7 @@ import build.bazel.remote.execution.v2.BatchUpdateBlobsRequest; import build.bazel.remote.execution.v2.BatchUpdateBlobsResponse; +import build.bazel.remote.execution.v2.ChunkingFunction; import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase; import build.bazel.remote.execution.v2.Digest; import build.bazel.remote.execution.v2.Directory; @@ -27,6 +28,10 @@ import 
build.bazel.remote.execution.v2.GetTreeRequest; import build.bazel.remote.execution.v2.GetTreeResponse; import build.bazel.remote.execution.v2.RequestMetadata; +import build.bazel.remote.execution.v2.SpliceBlobRequest; +import build.bazel.remote.execution.v2.SpliceBlobResponse; +import build.bazel.remote.execution.v2.SplitBlobRequest; +import build.bazel.remote.execution.v2.SplitBlobResponse; import com.google.common.flogger.GoogleLogger; import com.google.devtools.build.lib.remote.common.CacheNotFoundException; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; @@ -34,17 +39,24 @@ import com.google.protobuf.InvalidProtocolBufferException; import com.google.rpc.Code; import io.grpc.stub.StreamObserver; +import com.google.devtools.build.lib.remote.util.DigestOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Deque; import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; /** A basic implementation of a {@link ContentAddressableStorageImplBase} service. */ final class CasServer extends ContentAddressableStorageImplBase { private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); static final long MAX_BATCH_SIZE_BYTES = 1024 * 1024 * 4; private final OnDiskBlobStoreCache cache; + private final Map<Digest, List<Digest>> splicedBlobs = new ConcurrentHashMap<>(); public CasServer(OnDiskBlobStoreCache cache) { this.cache = cache; @@ -145,4 +157,79 @@ public void getTree(GetTreeRequest request, StreamObserver<GetTreeResponse> resp responseObserver.onNext(responseBuilder.build()); responseObserver.onCompleted(); } + + /** + * Returns the chunk digests for a blob that was previously stored via spliceBlob. + * Clients use this to download large blobs in smaller pieces.
*/ + @Override + public void splitBlob( + SplitBlobRequest request, StreamObserver<SplitBlobResponse> responseObserver) { + Digest blobDigest = request.getBlobDigest(); + + List<Digest> chunkDigests = splicedBlobs.get(blobDigest); + if (chunkDigests == null) { + responseObserver.onError(StatusUtils.notFoundError(blobDigest)); + return; + } + responseObserver.onNext( + SplitBlobResponse.newBuilder() + .addAllChunkDigests(chunkDigests) + .setChunkingFunction(ChunkingFunction.Value.FAST_CDC_2020) + .build()); + responseObserver.onCompleted(); + } + + /** + * Stores a mapping from a blob digest to the list of chunk digests that compose it. + * + * <p>All chunks must already exist in the CAS. The concatenated chunks are verified + * to match the expected blob digest before storing the mapping. + */ + @Override + public void spliceBlob( + SpliceBlobRequest request, StreamObserver<SpliceBlobResponse> responseObserver) { + RequestMetadata meta = TracingMetadataUtils.fromCurrentContext(); + RemoteActionExecutionContext context = RemoteActionExecutionContext.create(meta); + + Digest blobDigest = request.getBlobDigest(); + List<Digest> chunkDigests = request.getChunkDigestsList(); + + try { + // Verify all chunks exist in the cache. + for (Digest chunkDigest : chunkDigests) { + if (!cache.refresh(chunkDigest)) { + responseObserver.onError(StatusUtils.notFoundError(chunkDigest)); + return; + } + } + + DigestOutputStream digestOut = + cache.getDigestUtil().newDigestOutputStream(OutputStream.nullOutputStream()); + for (Digest chunkDigest : chunkDigests) { + byte[] chunkData = getFromFuture(cache.downloadBlob(context, chunkDigest)); + digestOut.write(chunkData); + } + Digest computedDigest = digestOut.digest(); + if (!computedDigest.equals(blobDigest)) { + String err = "Splice digest " + blobDigest + " did not match computed digest: " + computedDigest; + responseObserver.onError(StatusUtils.invalidArgumentError("blob_digest", err)); + return; + } + + // Record the blob-to-chunks mapping for splitBlob lookups. + splicedBlobs.put(blobDigest, new ArrayList<>(chunkDigests)); + + responseObserver.onNext( + SpliceBlobResponse.newBuilder().setBlobDigest(blobDigest).build()); + responseObserver.onCompleted(); + } catch (CacheNotFoundException e) { + responseObserver.onError(StatusUtils.notFoundError(e.getMissingDigest())); + } catch (InterruptedException e) { + responseObserver.onError(StatusUtils.interruptedError(blobDigest)); + } catch (Exception e) { + logger.atWarning().withCause(e).log("SpliceBlob request failed"); + responseObserver.onError(StatusUtils.internalError(e)); + } + } }