Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions util/contentutil/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
type Buffer interface {
content.Provider
content.Ingester
content.IngestManager
content.Manager
}

Expand Down Expand Up @@ -119,6 +120,21 @@ func (b *buffer) Writer(ctx context.Context, opts ...content.WriterOpt) (content
}, nil
}

func (b *buffer) Status(ctx context.Context, ref string) (content.Status, error) {
return content.Status{}, cerrdefs.ErrNotFound
}

func (b *buffer) ListStatuses(ctx context.Context, filters ...string) ([]content.Status, error) {
return nil, nil
}

func (b *buffer) Abort(ctx context.Context, ref string) error {
b.mu.Lock()
delete(b.refs, ref)
b.mu.Unlock()
return nil
}

func (b *buffer) ReaderAt(ctx context.Context, desc ocispecs.Descriptor) (content.ReaderAt, error) {
r, err := b.getBytesReader(desc.Digest)
if err != nil {
Expand Down
18 changes: 18 additions & 0 deletions util/contentutil/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,28 @@ func (p *ReferrersProviderBuffer) ReaderAt(ctx context.Context, desc ocispecs.De
}
return nil, err
}
if st, err := cw.Status(); err == nil {
if st.Offset > 0 {
if err := cw.Truncate(0); err != nil {
cw.Close()
return nil, err
}
}
}
abort := func() {
_ = p.cache.Abort(ctx, desc.Digest.String())
}
defer func() {
if abort != nil {
abort()
}
}()
ra, err := p.p.ReaderAt(ctx, desc)
if err != nil {
cw.Close()
return nil, err
}
defer ra.Close()
if err := content.CopyReaderAt(cw, ra, ra.Size()); err != nil {
cw.Close()
return nil, err
Expand All @@ -65,6 +82,7 @@ func (p *ReferrersProviderBuffer) ReaderAt(ctx context.Context, desc ocispecs.De
cw.Close()
return nil, err
}
abort = nil
ra, err = p.cache.ReaderAt(ctx, desc)
if err != nil {
return nil, err
Expand Down
160 changes: 144 additions & 16 deletions util/contentutil/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,30 @@ import (
"bytes"
"context"
"encoding/json"
"io"
"testing"

"github.com/containerd/containerd/v2/core/content"
"github.com/containerd/containerd/v2/core/remotes"
"github.com/containerd/containerd/v2/plugins/content/local"
digest "github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/specs-go"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
)

type buf struct {
*bytes.Reader
}

func (r *buf) Close() error { return nil }

func newBuf(b []byte) *buf {
return &buf{
Reader: bytes.NewReader(b),
}
}

type stubProvider struct {
data map[digest.Digest][]byte
calls int
refs map[digest.Digest][]ocispecs.Descriptor
refsCalls int
}

func newStubProvider() *stubProvider {
return &stubProvider{}
}

func (p *stubProvider) ReaderAt(ctx context.Context, desc ocispecs.Descriptor) (content.ReaderAt, error) {
p.calls++
b, ok := p.data[desc.Digest]
Expand Down Expand Up @@ -81,6 +75,18 @@ func (p *stubProvider) addReferrer(target digest.Digest, dt []byte) ocispecs.Des
return desc
}

type buf struct {
*bytes.Reader
}

func (r *buf) Close() error { return nil }

func newBuf(b []byte) *buf {
return &buf{
Reader: bytes.NewReader(b),
}
}

func stubManifest(t *testing.T, name, artifactType string) []byte {
manif := ocispecs.Manifest{
Versioned: specs.Versioned{
Expand All @@ -98,9 +104,9 @@ func stubManifest(t *testing.T, name, artifactType string) []byte {
}

func TestReferrersProviderBuffer(t *testing.T) {
ctx := context.TODO()
ctx := t.Context()
buf := NewBuffer()
rp := &stubProvider{}
rp := newStubProvider()

rootDigest := digest.FromString("root")
rw, err := content.OpenWriter(ctx, buf)
Expand Down Expand Up @@ -180,9 +186,9 @@ func TestReferrersProviderBuffer(t *testing.T) {
}

func TestReferrersProviderRefsBuffer(t *testing.T) {
ctx := context.TODO()
ctx := t.Context()
buf := NewBuffer()
rp := &stubProvider{}
rp := newStubProvider()

rootDigest := digest.FromString("root")
rw, err := content.OpenWriter(ctx, buf)
Expand Down Expand Up @@ -280,3 +286,125 @@ func TestReferrersProviderRefsBuffer(t *testing.T) {
require.True(t, ok)
require.Equal(t, "repo1", lbl1)
}

// TestReferrersProviderBufferCancelledReadDoesNotPoison verifies a cancelled or
// failed read does not poison later attempts for the same digest ref.
func TestReferrersProviderBufferCancelledReadDoesNotPoison(t *testing.T) {
ctx := t.Context()
store, err := local.NewStore(t.TempDir())
require.NoError(t, err)

data := []byte("0123456789abcdef")
desc := ocispecs.Descriptor{
Digest: digest.FromBytes(data),
Size: int64(len(data)),
}

rp := &cancelOnceProvider{data: data}

rpb := ReferrersProviderWithBuffer(rp, store, "repo1")

_, err = rpb.ReaderAt(ctx, desc)
require.ErrorIs(t, err, context.Canceled)

ra, err := rpb.ReaderAt(ctx, desc)
require.NoError(t, err)
ra.Close()

got, err := content.ReadBlob(ctx, store, desc)
require.NoError(t, err)
require.Equal(t, data, got)
require.Equal(t, 2, rp.calls)
require.Equal(t, 2, rp.closeCalls)
}

// TestReferrersProviderBufferResumedIngestOffset verifies resumed ingests are
// reset before copying to avoid short-read failures on reused refs.
func TestReferrersProviderBufferResumedIngestOffset(t *testing.T) {
ctx := t.Context()
store, err := local.NewStore(t.TempDir())
require.NoError(t, err)

data := []byte("0123456789abcdef")
rp := newStubProvider()
desc := rp.add(data)

// Simulate a pre-existing resumable ingest for the same ref with offset=size.
// This can happen when an earlier attempt closed an ingest without commit.
ref := desc.Digest.String()
cw, err := content.OpenWriter(ctx, store, content.WithRef(ref), content.WithDescriptor(desc))
require.NoError(t, err)
_, err = cw.Write(data)
require.NoError(t, err)
require.NoError(t, cw.Close())

rpb := ReferrersProviderWithBuffer(rp, store, "repo1")

ra, err := rpb.ReaderAt(ctx, desc)
require.NoError(t, err)
ra.Close()

got, err := content.ReadBlob(ctx, store, desc)
require.NoError(t, err)
require.Equal(t, data, got)
}

type sizedReadCloser struct {
io.ReaderAt
size int64
closed *bool
onClose func()
}

func (r *sizedReadCloser) Close() error {
if r.onClose != nil {
r.onClose()
}
if r.closed != nil {
*r.closed = true
}
return nil
}

func (r *sizedReadCloser) Size() int64 {
return r.size
}

// cancelOnceProvider simulates a provider whose first read is canceled and
// whose subsequent reads succeed with full content.
type cancelOnceProvider struct {
data []byte
calls int
closeCalls int
}

func (p *cancelOnceProvider) ReaderAt(ctx context.Context, desc ocispecs.Descriptor) (content.ReaderAt, error) {
p.calls++
if desc.Digest != digest.FromBytes(p.data) {
return nil, errors.Errorf("unexpected digest: %s", desc.Digest)
}
if p.calls == 1 {
return &sizedReadCloser{
ReaderAt: errReaderAt{err: context.Canceled},
size: int64(len(p.data)),
onClose: func() { p.closeCalls++ },
}, nil
}
return &sizedReadCloser{
ReaderAt: bytes.NewReader(p.data),
size: int64(len(p.data)),
onClose: func() { p.closeCalls++ },
}, nil
}

func (p *cancelOnceProvider) FetchReferrers(ctx context.Context, dgst digest.Digest, opts ...remotes.FetchReferrersOpt) ([]ocispecs.Descriptor, error) {
return nil, nil
}

type errReaderAt struct {
err error
}

func (r errReaderAt) ReadAt([]byte, int64) (int, error) {
return 0, r.err
}