Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ go_test(
name = "go_default_test",
srcs = [
"grpc_asset_test.go",
"grpc_cas_spliceblobs_test.go",
"grpc_test.go",
"http_test.go",
],
Expand Down
3 changes: 2 additions & 1 deletion server/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ const (
const grpcHealthServiceName = "/grpc.health.v1.Health/Check"

type grpcServer struct {
pb.UnimplementedContentAddressableStorageServer
cache disk.Cache
accessLogger cache.Logger
errorLogger cache.Logger
Expand Down Expand Up @@ -131,6 +130,8 @@ func (s *grpcServer) GetCapabilities(ctx context.Context,
SupportedCompressors: []pb.Compressor_Value{pb.Compressor_ZSTD},
SupportedBatchUpdateCompressors: []pb.Compressor_Value{pb.Compressor_ZSTD},
MaxCasBlobSizeBytes: s.maxCasBlobSizeBytes,
BlobSpliceSupport: true,
BlobSplitSupport: false,
},
LowApiVersion: &semver.SemVer{Major: int32(2)},
HighApiVersion: &semver.SemVer{Major: int32(2), Minor: int32(3)},
Expand Down
158 changes: 158 additions & 0 deletions server/grpc_cas.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
pb "github.com/buchgr/bazel-remote/v2/genproto/build/bazel/remote/execution/v2"

"github.com/buchgr/bazel-remote/v2/cache"
"github.com/buchgr/bazel-remote/v2/utils/validate"
)

var (
Expand Down Expand Up @@ -374,3 +375,160 @@ func (s *grpcServer) fillDirectories(ctx context.Context, resp *pb.GetTreeRespon

return nil
}

func (s *grpcServer) SpliceBlob(ctx context.Context, req *pb.SpliceBlobRequest) (*pb.SpliceBlobResponse, error) {

if req == nil {
return nil, grpc_status.Errorf(codes.InvalidArgument,
"SpliceBlob called with nil SpliceBlobRequest")
}

if req.BlobDigest == nil {
return nil, grpc_status.Errorf(codes.InvalidArgument,
"SpliceBlob called with nil SpliceBlobRequest.BlobDigest")
}

if req.BlobDigest.SizeBytes == 0 || req.BlobDigest.Hash == emptySha256 {
return nil, grpc_status.Errorf(codes.InvalidArgument,
"SpliceBlob called to create the empty blob?")
}

if req.BlobDigest.SizeBytes < 0 {
return nil, grpc_status.Errorf(codes.InvalidArgument,
"SpliceBlob called with negative SpliceBlobRequest.BlobDigest.SizeBytes")
}

if s.maxCasBlobSizeBytes > 0 && req.BlobDigest.SizeBytes > s.maxCasBlobSizeBytes {
return nil, grpc_status.Errorf(codes.InvalidArgument,
"SpliceBlob called to create blob with size %d, which is greater than the max configured blob size %d",
req.BlobDigest.SizeBytes, s.maxCasBlobSizeBytes)
}

if !validate.HashKeyRegex.MatchString(req.BlobDigest.Hash) {
return nil, grpc_status.Errorf(codes.InvalidArgument,
"SpliceBlob called with invalid SpliceBlobRequest.BlobDigest.Hash: %s",
req.BlobDigest.Hash)
}

if len(req.ChunkDigests) == 0 {
return nil, grpc_status.Errorf(codes.InvalidArgument,
"SpliceBlob called with no SpliceBlobRequest.ChunkDigests")
}

chunkTotal := int64(0)
for _, chunkDigest := range req.ChunkDigests {
if chunkDigest == nil {
return nil, grpc_status.Errorf(codes.InvalidArgument,
"SpliceBlob called with a nil value in SpliceBlobRequest.ChunkDigests")
}

if chunkDigest.SizeBytes < 0 {
return nil, grpc_status.Errorf(codes.InvalidArgument,
"SpliceBlob called with a negative Digest in SpliceBlobRequest.ChunkDigests")
}

if chunkDigest.SizeBytes == 0 || chunkDigest.Hash == emptySha256 {
return nil, grpc_status.Errorf(codes.InvalidArgument,
"SpliceBlob called with an empty blob in SpliceBlobRequest.ChunkDigests")
}

if !validate.HashKeyRegex.MatchString(chunkDigest.Hash) {
return nil, grpc_status.Errorf(codes.InvalidArgument,
"SpliceBlob called with an invalid digest in SpliceBlobRequest.ChunkDigests: %s/%d",
chunkDigest.Hash, chunkDigest.SizeBytes)
}

// chunkDigest.SizeBytes must be positive if we reached this point.
// Add it to chunkTotal (which then must be positive, unless there
// was an overflow).

chunkTotal += chunkDigest.SizeBytes

if chunkTotal <= 0 {
return nil, grpc_status.Errorf(codes.InvalidArgument,
"Overflow in SpliceBlobRequest.ChunkDigests, does not match SpliceBlobRequest.BlobDigest.SizeBytes")
}
}

if chunkTotal != req.BlobDigest.SizeBytes {
return nil, grpc_status.Errorf(codes.InvalidArgument,
"SpliceBlob called with SpliceBlobRequest.ChunkDigests sizes sum to %d, but SpliceBlobRequest.BlobDigest.SizeBytes was %d",
chunkTotal, req.BlobDigest.SizeBytes)
}

pr, pw := io.Pipe()
writerResultChan := make(chan error, 1)

go func() {
defer pw.Close()

for _, chunkDigest := range req.ChunkDigests {
rc, _, err := s.cache.Get(ctx, cache.CAS, chunkDigest.Hash, chunkDigest.SizeBytes, 0)
if err != nil {
rc.Close()
writerResultChan <- grpc_status.Errorf(codes.Unknown,
"SpliceBlob failed to get chunk %s/%d: %s",
chunkDigest.Hash, chunkDigest.SizeBytes, err)
return
}

if rc == nil {
writerResultChan <- grpc_status.Errorf(codes.NotFound,
"SpliceBlob called with nonexistent blob: %s/%d",
chunkDigest.Hash, chunkDigest.SizeBytes)
return
}

// We can assume that the size returned by s.cache.Get equals chunkDigest.SizeBytes,
// because we checked that is was not -1 in the chunkTotal check performed earlier.

copiedBytes, err := io.Copy(pw, rc)
if err != nil {
rc.Close()
writerResultChan <- grpc_status.Errorf(codes.Unknown,
"SpliceBlob failed to copy chunk %s/%d: %s",
chunkDigest.Hash, chunkDigest.SizeBytes, err)
return
}

if copiedBytes != chunkDigest.SizeBytes {
rc.Close()
writerResultChan <- grpc_status.Errorf(codes.Unknown,
"SpliceBlob copied unpexpected number of bytes (%d) from chunk %s/%d",
copiedBytes, chunkDigest.Hash, chunkDigest.SizeBytes)
return
}

rc.Close()
}

writerResultChan <- nil
}()

err := s.cache.Put(ctx, cache.CAS, req.BlobDigest.Hash, req.BlobDigest.SizeBytes, pr)
if err != nil {

select {
case writerErr, ok := <-writerResultChan:
if ok && writerErr != nil {
// Return the more specific writerErr.
return nil, writerErr
}
default:
}

return nil, grpc_status.Errorf(codes.Unknown,
"Failed to splice blob %s/%d: %s",
req.BlobDigest.Hash, req.BlobDigest.SizeBytes, err)
}

resp := pb.SpliceBlobResponse{
BlobDigest: req.BlobDigest,
}

return &resp, nil
}

func (s *grpcServer) SplitBlob(ctx context.Context, req *pb.SplitBlobRequest) (*pb.SplitBlobResponse, error) {
return nil, grpc_status.Errorf(codes.Unimplemented, "method SplitBlob not implemented")
}
Loading
Loading