diff --git a/util/contentutil/buffer.go b/util/contentutil/buffer.go index 518542339dbb..1627f90aada8 100644 --- a/util/contentutil/buffer.go +++ b/util/contentutil/buffer.go @@ -19,6 +19,7 @@ import ( type Buffer interface { content.Provider content.Ingester + content.IngestManager content.Manager } @@ -119,6 +120,21 @@ func (b *buffer) Writer(ctx context.Context, opts ...content.WriterOpt) (content }, nil } +func (b *buffer) Status(ctx context.Context, ref string) (content.Status, error) { + return content.Status{}, cerrdefs.ErrNotFound +} + +func (b *buffer) ListStatuses(ctx context.Context, filters ...string) ([]content.Status, error) { + return nil, nil +} + +func (b *buffer) Abort(ctx context.Context, ref string) error { + b.mu.Lock() + delete(b.refs, ref) + b.mu.Unlock() + return nil +} + func (b *buffer) ReaderAt(ctx context.Context, desc ocispecs.Descriptor) (content.ReaderAt, error) { r, err := b.getBytesReader(desc.Digest) if err != nil { diff --git a/util/contentutil/cache.go b/util/contentutil/cache.go index 286bcab880da..dcb7b77a7735 100644 --- a/util/contentutil/cache.go +++ b/util/contentutil/cache.go @@ -52,11 +52,28 @@ func (p *ReferrersProviderBuffer) ReaderAt(ctx context.Context, desc ocispecs.De } return nil, err } + if st, err := cw.Status(); err == nil { + if st.Offset > 0 { + if err := cw.Truncate(0); err != nil { + cw.Close() + return nil, err + } + } + } + abort := func() { + _ = p.cache.Abort(ctx, desc.Digest.String()) + } + defer func() { + if abort != nil { + abort() + } + }() ra, err := p.p.ReaderAt(ctx, desc) if err != nil { cw.Close() return nil, err } + defer ra.Close() if err := content.CopyReaderAt(cw, ra, ra.Size()); err != nil { cw.Close() return nil, err @@ -65,6 +82,7 @@ func (p *ReferrersProviderBuffer) ReaderAt(ctx context.Context, desc ocispecs.De cw.Close() return nil, err } + abort = nil ra, err = p.cache.ReaderAt(ctx, desc) if err != nil { return nil, err diff --git a/util/contentutil/cache_test.go b/util/contentutil/cache_test.go index c4b84508ce81..68551f672dd8 100644 --- a/util/contentutil/cache_test.go +++ b/util/contentutil/cache_test.go @@ -4,10 +4,12 @@ import ( "bytes" "context" "encoding/json" + "io" "testing" "github.com/containerd/containerd/v2/core/content" "github.com/containerd/containerd/v2/core/remotes" + "github.com/containerd/containerd/v2/plugins/content/local" digest "github.com/opencontainers/go-digest" "github.com/opencontainers/image-spec/specs-go" ocispecs "github.com/opencontainers/image-spec/specs-go/v1" @@ -15,18 +17,6 @@ import ( "github.com/stretchr/testify/require" ) -type buf struct { - *bytes.Reader -} - -func (r *buf) Close() error { return nil } - -func newBuf(b []byte) *buf { - return &buf{ - Reader: bytes.NewReader(b), - } -} - type stubProvider struct { data map[digest.Digest][]byte calls int @@ -34,6 +24,10 @@ type stubProvider struct { refsCalls int } +func newStubProvider() *stubProvider { + return &stubProvider{} +} + func (p *stubProvider) ReaderAt(ctx context.Context, desc ocispecs.Descriptor) (content.ReaderAt, error) { p.calls++ b, ok := p.data[desc.Digest] @@ -81,6 +75,18 @@ func (p *stubProvider) addReferrer(target digest.Digest, dt []byte) ocispecs.Des return desc } +type buf struct { + *bytes.Reader +} + +func (r *buf) Close() error { return nil } + +func newBuf(b []byte) *buf { + return &buf{ + Reader: bytes.NewReader(b), + } +} + func stubManifest(t *testing.T, name, artifactType string) []byte { manif := ocispecs.Manifest{ Versioned: specs.Versioned{ @@ -98,9 +104,9 @@ func stubManifest(t *testing.T, name, artifactType string) []byte { } func TestReferrersProviderBuffer(t *testing.T) { - ctx := context.TODO() + ctx := t.Context() buf := NewBuffer() - rp := &stubProvider{} + rp := newStubProvider() rootDigest := digest.FromString("root") rw, err := content.OpenWriter(ctx, buf) @@ -180,9 +186,9 @@ func TestReferrersProviderBuffer(t *testing.T) { } func TestReferrersProviderRefsBuffer(t *testing.T) { - ctx := context.TODO() + ctx := t.Context() buf := NewBuffer() - rp := &stubProvider{} + rp := newStubProvider() rootDigest := digest.FromString("root") rw, err := content.OpenWriter(ctx, buf) @@ -280,3 +286,125 @@ func TestReferrersProviderRefsBuffer(t *testing.T) { require.True(t, ok) require.Equal(t, "repo1", lbl1) } + +// TestReferrersProviderBufferCancelledReadDoesNotPoison verifies a cancelled or +// failed read does not poison later attempts for the same digest ref. +func TestReferrersProviderBufferCancelledReadDoesNotPoison(t *testing.T) { + ctx := t.Context() + store, err := local.NewStore(t.TempDir()) + require.NoError(t, err) + + data := []byte("0123456789abcdef") + desc := ocispecs.Descriptor{ + Digest: digest.FromBytes(data), + Size: int64(len(data)), + } + + rp := &cancelOnceProvider{data: data} + + rpb := ReferrersProviderWithBuffer(rp, store, "repo1") + + _, err = rpb.ReaderAt(ctx, desc) + require.ErrorIs(t, err, context.Canceled) + + ra, err := rpb.ReaderAt(ctx, desc) + require.NoError(t, err) + ra.Close() + + got, err := content.ReadBlob(ctx, store, desc) + require.NoError(t, err) + require.Equal(t, data, got) + require.Equal(t, 2, rp.calls) + require.Equal(t, 2, rp.closeCalls) +} + +// TestReferrersProviderBufferResumedIngestOffset verifies resumed ingests are +// reset before copying to avoid short-read failures on reused refs. +func TestReferrersProviderBufferResumedIngestOffset(t *testing.T) { + ctx := t.Context() + store, err := local.NewStore(t.TempDir()) + require.NoError(t, err) + + data := []byte("0123456789abcdef") + rp := newStubProvider() + desc := rp.add(data) + + // Simulate a pre-existing resumable ingest for the same ref with offset=size. + // This can happen when an earlier attempt closed an ingest without commit. + ref := desc.Digest.String() + cw, err := content.OpenWriter(ctx, store, content.WithRef(ref), content.WithDescriptor(desc)) + require.NoError(t, err) + _, err = cw.Write(data) + require.NoError(t, err) + require.NoError(t, cw.Close()) + + rpb := ReferrersProviderWithBuffer(rp, store, "repo1") + + ra, err := rpb.ReaderAt(ctx, desc) + require.NoError(t, err) + ra.Close() + + got, err := content.ReadBlob(ctx, store, desc) + require.NoError(t, err) + require.Equal(t, data, got) +} + +type sizedReadCloser struct { + io.ReaderAt + size int64 + closed *bool + onClose func() +} + +func (r *sizedReadCloser) Close() error { + if r.onClose != nil { + r.onClose() + } + if r.closed != nil { + *r.closed = true + } + return nil +} + +func (r *sizedReadCloser) Size() int64 { + return r.size +} + +// cancelOnceProvider simulates a provider whose first read is canceled and +// whose subsequent reads succeed with full content. +type cancelOnceProvider struct { + data []byte + calls int + closeCalls int +} + +func (p *cancelOnceProvider) ReaderAt(ctx context.Context, desc ocispecs.Descriptor) (content.ReaderAt, error) { + p.calls++ + if desc.Digest != digest.FromBytes(p.data) { + return nil, errors.Errorf("unexpected digest: %s", desc.Digest) + } + if p.calls == 1 { + return &sizedReadCloser{ + ReaderAt: errReaderAt{err: context.Canceled}, + size: int64(len(p.data)), + onClose: func() { p.closeCalls++ }, + }, nil + } + return &sizedReadCloser{ + ReaderAt: bytes.NewReader(p.data), + size: int64(len(p.data)), + onClose: func() { p.closeCalls++ }, + }, nil +} + +func (p *cancelOnceProvider) FetchReferrers(ctx context.Context, dgst digest.Digest, opts ...remotes.FetchReferrersOpt) ([]ocispecs.Descriptor, error) { + return nil, nil +} + +type errReaderAt struct { + err error +} + +func (r errReaderAt) ReadAt([]byte, int64) (int, error) { + return 0, r.err +}