diff --git a/v2/internal/loader/writing_loader.go b/v2/internal/loader/writing_loader.go index 11803780..4768fbce 100644 --- a/v2/internal/loader/writing_loader.go +++ b/v2/internal/loader/writing_loader.go @@ -7,11 +7,19 @@ import ( "github.com/ipfs/go-cid" "github.com/ipld/go-car/v2/index" "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/datamodel" "github.com/ipld/go-ipld-prime/linking" "github.com/multiformats/go-multicodec" "github.com/multiformats/go-varint" ) +// copy of traversal.PathState +type PathState interface { + AddPath(path []datamodel.PathSegment, link datamodel.Link, atOffset uint64) + GetLinks(root datamodel.Path) []datamodel.Link + GetOffsetAfter(root datamodel.Path) (uint64, error) +} + // indexingWriter wraps an io.Writer with metadata of the index of the car written to it. type indexingWriter struct { w io.Writer @@ -54,7 +62,7 @@ var _ IndexTracker = (*indexingWriter)(nil) type writingReader struct { r io.Reader buf []byte - cid string + cid cid.Cid wo *indexingWriter } @@ -68,7 +76,7 @@ func (w *writingReader) Read(p []byte) (int, error) { return 0, err } // write the cid - if _, err := buf.Write([]byte(w.cid)); err != nil { + if _, err := buf.Write(w.cid.Bytes()); err != nil { return 0, err } // write the block @@ -76,15 +84,15 @@ func (w *writingReader) Read(p []byte) (int, error) { if err != nil { return 0, err } - sizeBytes := varint.ToUvarint(uint64(n) + uint64(len(w.cid))) + sizeBytes := varint.ToUvarint(uint64(n) + uint64(len(w.cid.Bytes()))) writeBuf := buf.Bytes()[varint.MaxLenUvarint63-len(sizeBytes):] - w.buf = buf.Bytes()[varint.MaxLenUvarint63+len(w.cid):] + w.buf = buf.Bytes()[varint.MaxLenUvarint63+len(w.cid.Bytes()):] _ = copy(writeBuf[:], sizeBytes) - size := len(writeBuf) + size := uint64(len(writeBuf)) if w.wo.toSkip > 0 { - if w.wo.toSkip >= uint64(len(writeBuf)) { - w.wo.toSkip -= uint64(len(writeBuf)) + if w.wo.toSkip >= size { + w.wo.toSkip -= size writeBuf = []byte{} } else { writeBuf = writeBuf[w.wo.toSkip:] @@ -92,18 +100,19 @@ func (w *writingReader) Read(p []byte) (int, error) { } } - if _, err := bytes.NewBuffer(writeBuf).WriteTo(w.wo.w); err != nil { - return 0, err - } - _, c, err := cid.CidFromBytes([]byte(w.cid)) - if err != nil { - return 0, err - } - w.wo.rcrds[c] = index.Record{ - Cid: c, - Offset: w.wo.size, + // we haven't indexed this cid in this session + if _, ok := w.wo.rcrds[w.cid]; !ok { + if _, err := bytes.NewBuffer(writeBuf).WriteTo(w.wo.w); err != nil { + return 0, err + } + + w.wo.rcrds[w.cid] = index.Record{ + Cid: w.cid, + Offset: w.wo.size, + } } - w.wo.size += uint64(size) + + w.wo.size += size w.wo = nil } @@ -125,7 +134,15 @@ func (w *writingReader) Read(p []byte) (int, error) { // The `initialOffset` is used to calculate the offsets recorded for the index, and will be // included in the `.Size()` of the IndexTracker. // An indexCodec of `index.CarIndexNoIndex` can be used to not track these offsets. -func TeeingLinkSystem(ls ipld.LinkSystem, w io.Writer, initialOffset uint64, skip uint64, indexCodec multicodec.Code) (ipld.LinkSystem, IndexTracker) { +func TeeingLinkSystem( + ls ipld.LinkSystem, + w io.Writer, + pathState PathState, + initialOffset uint64, + skip uint64, + indexCodec multicodec.Code, +) (ipld.LinkSystem, IndexTracker) { + iw := indexingWriter{ w: w, size: initialOffset, @@ -141,17 +158,18 @@ func TeeingLinkSystem(ls ipld.LinkSystem, w io.Writer, initialOffset uint64, ski return nil, err } - // if we've already read this cid in this session, don't re-write it. - if _, ok := iw.rcrds[c]; ok { - return ls.StorageReadOpener(lc, l) - } - r, err := ls.StorageReadOpener(lc, l) if err != nil { return nil, err } - return &writingReader{r, nil, l.Binary(), &iw}, nil + /* + offset, err := pathState.GetOffsetAfter(lc.LinkPath) + if err != nil { + //return nil, err + } + */ + return &writingReader{r, nil, c, &iw}, nil } return tls, &iw } diff --git a/v2/options.go b/v2/options.go index 4ccd289d..4df2185e 100644 --- a/v2/options.go +++ b/v2/options.go @@ -8,6 +8,7 @@ import ( "github.com/multiformats/go-multicodec" "github.com/ipld/go-car/v2/internal/carv1" + resumetraversal "github.com/ipld/go-car/v2/traversal" ) // DefaultMaxIndexCidSize specifies the maximum size in byptes accepted as a section CID by CARv2 index. @@ -62,6 +63,7 @@ type Options struct { TraversalPrototypeChooser traversal.LinkTargetNodePrototypeChooser DataPayloadSize uint64 SkipOffset uint64 + TraversalResumerPathState resumetraversal.PathState MaxAllowedHeaderSize uint64 MaxAllowedSectionSize uint64 diff --git a/v2/selective.go b/v2/selective.go index 9f128525..9aa379d2 100644 --- a/v2/selective.go +++ b/v2/selective.go @@ -50,29 +50,47 @@ func WithDataPayloadSize(size uint64) Option { } } +// WithTraversalResumerPathState provides a custom PathState that can be reused +// between selective CAR creations where traversals may need to be resumed at +// arbitrary points within the DAG. +// +// A PathState shared across multiple traversals using the same selector and DAG +// will yield the same state. This allows us to resume at arbitrary points +// within in the DAG and load the minimal additional blocks required to resume +// the traversal at that point. +func WithTraversalResumerPathState(pathState resumetraversal.PathState) Option { + return func(o *Options) { + o.TraversalResumerPathState = pathState + } +} + +func newTraversalCar(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, opts Options) *traversalCar { + pathState := opts.TraversalResumerPathState + if pathState == nil { + pathState = resumetraversal.NewPathState() + } + return &traversalCar{ + ctx: ctx, + opts: opts, + ls: ls, + root: root, + selector: selector, + size: opts.DataPayloadSize, + pathState: pathState, + } +} + // NewSelectiveWriter walks through the proposed dag traversal to learn its total size in order to be able to // stream out a car to a writer in the expected traversal order in one go. func NewSelectiveWriter(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, opts ...Option) (Writer, error) { conf := ApplyOptions(opts...) + tc := newTraversalCar(ctx, ls, root, selector, conf) + if conf.DataPayloadSize != 0 { - return &traversalCar{ - size: conf.DataPayloadSize, - ctx: ctx, - root: root, - selector: selector, - ls: ls, - opts: ApplyOptions(opts...), - }, nil - } - tc := traversalCar{ - //size: headSize + cntr.Size(), - ctx: ctx, - root: root, - selector: selector, - ls: ls, - opts: ApplyOptions(opts...), - } - if err := tc.setup(ctx, ls, ApplyOptions(opts...)); err != nil { + return tc, nil + } + + if err := tc.setup(ctx, ls, conf.SkipOffset, ApplyOptions(opts...)); err != nil { return nil, err } @@ -85,21 +103,15 @@ func NewSelectiveWriter(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, return nil, err } tc.size = headSize + tc.resumer.Position() - return &tc, nil + + return tc, nil } // TraverseToFile writes a car file matching a given root and selector to the // path at `destination` using one read of each block. func TraverseToFile(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, destination string, opts ...Option) error { conf := ApplyOptions(opts...) - tc := traversalCar{ - size: conf.DataPayloadSize, - ctx: ctx, - root: root, - selector: selector, - ls: ls, - opts: conf, - } + tc := newTraversalCar(ctx, ls, root, selector, conf) fp, err := os.Create(destination) if err != nil { @@ -129,15 +141,7 @@ func TraverseToFile(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, sele func TraverseV1(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, writer io.Writer, opts ...Option) (uint64, error) { opts = append(opts, WithoutIndex()) conf := ApplyOptions(opts...) - tc := traversalCar{ - size: conf.DataPayloadSize, - ctx: ctx, - root: root, - selector: selector, - ls: ls, - opts: conf, - } - + tc := newTraversalCar(ctx, ls, root, selector, conf) len, _, err := tc.WriteV1(tc.ctx, conf.SkipOffset, writer) return len, err } @@ -146,14 +150,7 @@ func TraverseV1(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector func NewCarV1StreamReader(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, opts ...Option) (io.ReadSeeker, error) { opts = append(opts, WithoutIndex()) conf := ApplyOptions(opts...) - tc := traversalCar{ - size: conf.DataPayloadSize, - ctx: ctx, - root: root, - selector: selector, - ls: ls, - opts: conf, - } + tc := newTraversalCar(ctx, ls, root, selector, conf) rwf := func(ctx context.Context, offset uint64, writer io.Writer) (uint64, error) { s, _, err := tc.WriteV1(ctx, offset, writer) return s, err @@ -170,14 +167,16 @@ type Writer interface { var _ Writer = (*traversalCar)(nil) type traversalCar struct { - size uint64 - ctx context.Context - root cid.Cid - selector ipld.Node - ls *ipld.LinkSystem - opts Options - progress *traversal.Progress - resumer resumetraversal.TraverseResumer + size uint64 + ctx context.Context + root cid.Cid + selector ipld.Node + ls *ipld.LinkSystem + opts Options + progress *traversal.Progress + resumer resumetraversal.TraverseResumer + pathState resumetraversal.PathState + skip uint64 } func (tc *traversalCar) WriteTo(w io.Writer) (int64, error) { @@ -275,9 +274,9 @@ func (tc *traversalCar) WriteV1(ctx context.Context, skip uint64, w io.Writer) ( skip -= v1Size } - // write the block. - wls, writer := loader.TeeingLinkSystem(*tc.ls, w, v1Size, skip, tc.opts.IndexCodec) - if err = tc.setup(ctx, &wls, tc.opts); err != nil { + // write the block + wls, writer := loader.TeeingLinkSystem(*tc.ls, w, tc.pathState, v1Size, skip, tc.opts.IndexCodec) + if err = tc.setup(ctx, &wls, skip, tc.opts); err != nil { return v1Size, nil, err } err = tc.traverse(tc.root, tc.selector) @@ -297,7 +296,7 @@ func (tc *traversalCar) WriteV1(ctx context.Context, skip uint64, w io.Writer) ( return v1Size, idx, err } -func (tc *traversalCar) setup(ctx context.Context, ls *ipld.LinkSystem, opts Options) error { +func (tc *traversalCar) setup(ctx context.Context, ls *ipld.LinkSystem, skip uint64, opts Options) error { chooser := func(_ ipld.Link, _ linking.LinkContext) (ipld.NodePrototype, error) { return basicnode.Prototype.Any, nil } @@ -321,12 +320,13 @@ func (tc *traversalCar) setup(ctx context.Context, ls *ipld.LinkSystem, opts Opt } ls.TrustedStorage = true - resumer, err := resumetraversal.WithTraversingLinksystem(&progress) + resumer, err := resumetraversal.WithTraversingLinksystem(&progress, tc.pathState) if err != nil { return err } tc.progress = &progress tc.resumer = resumer + tc.skip = skip return nil } @@ -342,7 +342,10 @@ func (tc *traversalCar) traverse(root cid.Cid, s ipld.Node) error { } rootNode, err := tc.progress.Cfg.LinkSystem.Load(ipld.LinkContext{}, lnk, rp) if err != nil { - return fmt.Errorf("root blk load failed: %s", err) + return fmt.Errorf("root block load failed: %s", err) + } + if tc.skip > 0 { + tc.resumer.RewindToOffset(tc.skip) } err = tc.progress.WalkMatching(rootNode, sel, func(_ traversal.Progress, node ipld.Node) error { if lbn, ok := node.(datamodel.LargeBytesNode); ok { diff --git a/v2/selective_test.go b/v2/selective_test.go index a387e890..e927e0b3 100644 --- a/v2/selective_test.go +++ b/v2/selective_test.go @@ -15,6 +15,7 @@ import ( "github.com/ipfs/go-unixfsnode/data/builder" "github.com/ipld/go-car/v2" "github.com/ipld/go-car/v2/blockstore" + "github.com/ipld/go-car/v2/traversal" dagpb "github.com/ipld/go-codec-dagpb" "github.com/ipld/go-ipld-prime/datamodel" "github.com/ipld/go-ipld-prime/linking" @@ -154,24 +155,62 @@ func TestOffsetWrites(t *testing.T) { _, rts, err := cid.CidFromBytes([]byte(rootCid.Binary())) require.NoError(t, err) + pathState := traversal.NewPathState() + // get the full car buffer. fullBuf := bytes.Buffer{} - _, err = car.TraverseV1(context.Background(), &ls, rts, selectorparse.CommonSelector_ExploreAllRecursively, &fullBuf) + _, err = car.TraverseV1( + context.Background(), + &ls, + rts, + selectorparse.CommonSelector_ExploreAllRecursively, + &fullBuf, + car.WithTraversalResumerPathState(pathState), + ) require.NoError(t, err) - for i := uint64(1); i < 1000; i += 1 { - buf := bytes.Buffer{} - - _, err := car.TraverseV1(context.Background(), &ls, rts, selectorparse.CommonSelector_ExploreAllRecursively, &buf, car.WithSkipOffset(i)) - require.NoError(t, err) - require.Equal(t, fullBuf.Bytes()[i:], buf.Bytes()) + // same again but with pathState cached + // TODO: test block load count + buf := bytes.Buffer{} + _, err = car.TraverseV1( + context.Background(), + &ls, + rts, + selectorparse.CommonSelector_ExploreAllRecursively, + &buf, + car.WithTraversalResumerPathState(pathState), + ) + require.NoError(t, err) + require.Equal(t, fullBuf.Bytes(), buf.Bytes()) + + run1000 := func(opts ...car.Option) func(t *testing.T) { + return func(t *testing.T) { + for i := uint64(1); i < 1000; i += 1 { + buf := bytes.Buffer{} + o := append([]car.Option{car.WithSkipOffset(i)}, opts...) + _, err := car.TraverseV1(context.Background(), &ls, rts, selectorparse.CommonSelector_ExploreAllRecursively, &buf, o...) + require.NoError(t, err) + require.Equal(t, fullBuf.Bytes()[i:], buf.Bytes()) + } + } } - for i := uint64(1000); i < 1000000; i += 1000 { - buf := bytes.Buffer{} - - _, err := car.TraverseV1(context.Background(), &ls, rts, selectorparse.CommonSelector_ExploreAllRecursively, &buf, car.WithSkipOffset(i)) - require.NoError(t, err) - require.Equal(t, fullBuf.Bytes()[i:], buf.Bytes()) + t.Run("first 1000 bytes, no cache", run1000()) + // TODO: test block load count + t.Run("first 1000 bytes, with cache", run1000(car.WithTraversalResumerPathState(pathState))) + + run1000000 := func(opts ...car.Option) func(t *testing.T) { + return func(t *testing.T) { + for i := uint64(1000); i < 1000000; i += 1000 { + buf := bytes.Buffer{} + _, err := car.TraverseV1(context.Background(), &ls, rts, selectorparse.CommonSelector_ExploreAllRecursively, &buf, car.WithSkipOffset(i)) + require.NoError(t, err) + require.Equal(t, fullBuf.Bytes()[i:], buf.Bytes()) + } + } } + + t.Run("next 1000000 bytes, no cache", run1000000()) + // TODO: test block load count + t.Run("next 1000000 bytes, with cache", run1000000(car.WithTraversalResumerPathState(pathState))) } diff --git a/v2/traversal/resumption.go b/v2/traversal/resumption.go index d88eead8..b4c8b991 100644 --- a/v2/traversal/resumption.go +++ b/v2/traversal/resumption.go @@ -9,6 +9,7 @@ import ( "fmt" "io" "math" + "os" "github.com/ipld/go-car/v2/internal/loader" "github.com/ipld/go-ipld-prime" @@ -17,32 +18,77 @@ import ( "github.com/ipld/go-ipld-prime/traversal" ) -type pathNode struct { +// TraverseResumer allows resuming a progress from a previously encountered path +// in the selector. +type TraverseResumer interface { + RewindToPath(from datamodel.Path) error + RewindToOffset(offset uint64) error + Position() uint64 +} + +// PathState tracks a traversal state for the purpose of +// building a CAR. For each block in the CAR it tracks the path to that block, +// the Link of the block and where in the CAR the block is located. +// +// A PathState shared across multiple traversals using the same +// selector and DAG will yield the same state. This allows us to resume at +// arbitrary points within in the DAG and load the minimal additional blocks +// required to resume the traversal at that point. +type PathState interface { + AddPath(path []datamodel.PathSegment, link datamodel.Link, atOffset uint64) + GetLinks(root datamodel.Path) []datamodel.Link + GetOffsetAfter(root datamodel.Path) (uint64, error) +} + +type pathState struct { link datamodel.Link offset uint64 - children map[datamodel.PathSegment]*pathNode + children map[datamodel.PathSegment]*pathState +} + +// NewPathState creates a new PathState. +// +// Note that the PathState returned by this factory is not +// thread-safe. +func NewPathState() PathState { + return &pathState{children: make(map[datamodel.PathSegment]*pathState)} } -func newPath(link datamodel.Link, at uint64) *pathNode { - return &pathNode{ - link: link, - offset: at, - children: make(map[datamodel.PathSegment]*pathNode), +func (pn *pathState) AddPath(p []datamodel.PathSegment, link datamodel.Link, atOffset uint64) { + if len(p) == 0 { // root path + pn.link = link + pn.offset = atOffset + } else { + pn.addPathRecursive(p, link, atOffset) } } -func (pn pathNode) addPath(p []datamodel.PathSegment, link datamodel.Link, at uint64) { +func (pn *pathState) addPathRecursive(p []datamodel.PathSegment, link datamodel.Link, atOffset uint64) { if len(p) == 0 { return } if _, ok := pn.children[p[0]]; !ok { - child := newPath(link, at) + child := NewPathState().(*pathState) + child.link = link + child.offset = atOffset pn.children[p[0]] = child } - pn.children[p[0]].addPath(p[1:], link, at) + pn.children[p[0]].addPathRecursive(p[1:], link, atOffset) +} + +func (pn pathState) DebugPrint(indent string) { + if pn.link == nil { + fmt.Fprintf(os.Stderr, "%sRoot: %d\n", indent, pn.offset) + } else { + fmt.Fprintf(os.Stderr, "%s%s: %d\n", indent, pn.link, pn.offset) + } + for ps, ch := range pn.children { + fmt.Fprintf(os.Stderr, "%s%s ->\n", indent, ps) + ch.DebugPrint(fmt.Sprintf("%s\t", indent)) + } } -func (pn pathNode) allLinks() []datamodel.Link { +func (pn pathState) allLinks() []datamodel.Link { if len(pn.children) == 0 { return []datamodel.Link{pn.link} } @@ -57,7 +103,7 @@ func (pn pathNode) allLinks() []datamodel.Link { } // getPaths returns reconstructed paths in the tree rooted at 'root' -func (pn pathNode) getLinks(root datamodel.Path) []datamodel.Link { +func (pn pathState) GetLinks(root datamodel.Path) []datamodel.Link { segs := root.Segments() switch len(segs) { case 0: @@ -80,12 +126,12 @@ func (pn pathNode) getLinks(root datamodel.Path) []datamodel.Link { // base case 2: not registered sub-path. return []datamodel.Link{} } - return pn.children[next].getLinks(datamodel.NewPathNocopy(segs[1:])) + return pn.children[next].GetLinks(datamodel.NewPathNocopy(segs[1:])) } var errInvalid = fmt.Errorf("invalid path") -func (pn pathNode) offsetAfter(root datamodel.Path) (uint64, error) { +func (pn pathState) GetOffsetAfter(root datamodel.Path) (uint64, error) { // we look for offset of next sibling. // if no next sibling recurse up the path segments until we find a next sibling. segs := root.Segments() @@ -100,14 +146,14 @@ func (pn pathNode) offsetAfter(root datamodel.Path) (uint64, error) { closest := chld.offset // try recursive path if len(segs) > 1 { - co, err := chld.offsetAfter(datamodel.NewPathNocopy(segs[1:])) + co, err := chld.GetOffsetAfter(datamodel.NewPathNocopy(segs[1:])) if err == nil { return co, err } } // find our next sibling var next uint64 = math.MaxUint64 - var nc *pathNode + var nc *pathState for _, v := range pn.children { if v.offset > closest && v.offset < next { next = v.offset @@ -121,35 +167,28 @@ func (pn pathNode) offsetAfter(root datamodel.Path) (uint64, error) { return 0, errInvalid } -// TraverseResumer allows resuming a progress from a previously encountered path in the selector. -type TraverseResumer interface { - RewindToPath(from datamodel.Path) error - RewindToOffset(offset uint64) error - Position() uint64 -} - type traversalState struct { wrappedLinksystem *linking.LinkSystem lsCounter *loader.Counter - blockNumber int - pathOrder map[int]datamodel.Path - pathTree *pathNode + pathTree PathState rewindPathTarget *datamodel.Path rewindOffsetTarget uint64 pendingBlockStart uint64 // on rewinds, we store where the counter was in order to know the length of the last read block. progress *traversal.Progress } +var _ TraverseResumer = (*traversalState)(nil) + func (ts *traversalState) RewindToPath(from datamodel.Path) error { if ts.progress == nil { return nil } // reset progress and traverse until target. ts.progress.SeenLinks = make(map[datamodel.Link]struct{}) - ts.blockNumber = 0 ts.pendingBlockStart = ts.lsCounter.Size() ts.lsCounter.TotalRead = 0 ts.rewindPathTarget = &from + ts.rewindOffsetTarget = 0 return nil } @@ -163,10 +202,10 @@ func (ts *traversalState) RewindToOffset(offset uint64) error { } // reset progress and traverse until target. ts.progress.SeenLinks = make(map[datamodel.Link]struct{}) - ts.blockNumber = 0 ts.pendingBlockStart = ts.lsCounter.Size() ts.lsCounter.TotalRead = 0 ts.rewindOffsetTarget = offset + ts.rewindPathTarget = nil return nil } @@ -177,9 +216,7 @@ func (ts *traversalState) Position() uint64 { func (ts *traversalState) traverse(lc linking.LinkContext, l ipld.Link) (io.Reader, error) { // when not in replay mode, we track metadata if ts.rewindPathTarget == nil && ts.rewindOffsetTarget == 0 { - ts.pathOrder[ts.blockNumber] = lc.LinkPath - ts.pathTree.addPath(lc.LinkPath.Segments(), l, ts.lsCounter.Size()) - ts.blockNumber++ + ts.pathTree.AddPath(lc.LinkPath.Segments(), l, ts.lsCounter.Size()) return ts.wrappedLinksystem.StorageReadOpener(lc, l) } @@ -205,12 +242,12 @@ func (ts *traversalState) traverse(lc linking.LinkContext, l ipld.Link) (io.Read break } if targetSegments[i].String() != s.String() { - links := ts.pathTree.getLinks(datamodel.NewPathNocopy(seg[0 : i+1])) + links := ts.pathTree.GetLinks(datamodel.NewPathNocopy(seg[0 : i+1])) for _, l := range links { ts.progress.SeenLinks[l] = struct{}{} } var err error - ts.lsCounter.TotalRead, err = ts.pathTree.offsetAfter(datamodel.NewPathNocopy(seg[0 : i+1])) + ts.lsCounter.TotalRead, err = ts.pathTree.GetOffsetAfter(datamodel.NewPathNocopy(seg[0 : i+1])) if err == errInvalid { ts.lsCounter.TotalRead = ts.pendingBlockStart } else if err != nil { @@ -221,19 +258,26 @@ func (ts *traversalState) traverse(lc linking.LinkContext, l ipld.Link) (io.Read } } } + if ts.rewindOffsetTarget != 0 { - links := ts.pathTree.getLinks(lc.LinkPath) - for _, l := range links { - ts.progress.SeenLinks[l] = struct{}{} - } - var err error - ts.lsCounter.TotalRead, err = ts.pathTree.offsetAfter(lc.LinkPath) - if err == errInvalid { - ts.lsCounter.TotalRead = ts.pendingBlockStart - } else if err != nil { - return nil, err + links := ts.pathTree.GetLinks(lc.LinkPath) + if len(links) == 0 { // we've not seen this before, must be the first time here + ts.pathTree.AddPath(lc.LinkPath.Segments(), l, ts.lsCounter.Size()) + } else { + for _, l := range links { + ts.progress.SeenLinks[l] = struct{}{} + } + var err error + ts.lsCounter.TotalRead, err = ts.pathTree.GetOffsetAfter(lc.LinkPath) + if err == errInvalid { + ts.lsCounter.TotalRead = ts.pendingBlockStart + } else if err != nil { + return nil, err + } + if ts.rewindOffsetTarget >= ts.lsCounter.TotalRead { + return nil, traversal.SkipMe{} + } } - return nil, traversal.SkipMe{} } // descend. @@ -243,15 +287,14 @@ func (ts *traversalState) traverse(lc linking.LinkContext, l ipld.Link) (io.Read // WithTraversingLinksystem extends a progress for traversal such that it can // subsequently resume and perform subsets of the walk efficiently from // an arbitrary position within the selector traversal. -func WithTraversingLinksystem(p *traversal.Progress) (TraverseResumer, error) { - wls, ctr := loader.CountingLinkSystem(p.Cfg.LinkSystem) +func WithTraversingLinksystem(progress *traversal.Progress, pathState PathState) (TraverseResumer, error) { + wls, ctr := loader.CountingLinkSystem(progress.Cfg.LinkSystem) ts := &traversalState{ wrappedLinksystem: &wls, lsCounter: ctr.(*loader.Counter), - pathOrder: make(map[int]datamodel.Path), - pathTree: newPath(nil, 0), - progress: p, + pathTree: pathState, + progress: progress, } - p.Cfg.LinkSystem.StorageReadOpener = ts.traverse + progress.Cfg.LinkSystem.StorageReadOpener = ts.traverse return ts, nil } diff --git a/v2/traversal/resumption_test.go b/v2/traversal/resumption_test.go index 58350b62..70b86b85 100644 --- a/v2/traversal/resumption_test.go +++ b/v2/traversal/resumption_test.go @@ -89,7 +89,7 @@ func TestWalkResumeByPath(t *testing.T) { LinkTargetNodePrototypeChooser: basicnode.Chooser, }, } - resumer, err := cartraversal.WithTraversingLinksystem(&p) + resumer, err := cartraversal.WithTraversingLinksystem(&p, cartraversal.NewPathState()) if err != nil { t.Fatal(err) } @@ -154,7 +154,7 @@ func TestWalkResumeByPathPartialWalk(t *testing.T) { LinkTargetNodePrototypeChooser: basicnode.Chooser, }, } - resumer, err := cartraversal.WithTraversingLinksystem(&p) + resumer, err := cartraversal.WithTraversingLinksystem(&p, cartraversal.NewPathState()) if err != nil { t.Fatal(err) } @@ -182,7 +182,7 @@ func TestWalkResumeByPathPartialWalk(t *testing.T) { func TestWalkResumeByOffset(t *testing.T) { seen := 0 - count := func(p traversal.Progress, n datamodel.Node, _ traversal.VisitReason) error { + count := func(p traversal.Progress, n datamodel.Node, r traversal.VisitReason) error { seen++ return nil } @@ -195,7 +195,7 @@ func TestWalkResumeByOffset(t *testing.T) { LinkTargetNodePrototypeChooser: basicnode.Chooser, }, } - resumer, err := cartraversal.WithTraversingLinksystem(&p) + resumer, err := cartraversal.WithTraversingLinksystem(&p, cartraversal.NewPathState()) if err != nil { t.Fatal(err) } @@ -219,17 +219,28 @@ func TestWalkResumeByOffset(t *testing.T) { } // resume from middle. - resumer.RewindToOffset(10) + resumer.RewindToOffset(17) + seen = 0 + if err := p.WalkAdv(rootNode, s, count); err != nil { + t.Fatal(err) + } + if seen != 13 { + t.Fatalf("expected resumed traversal to visit 13 nodes, got %d", seen) + } + + // resume from just before the middle. + resumer.RewindToOffset(127) seen = 0 if err := p.WalkAdv(rootNode, s, count); err != nil { t.Fatal(err) } + // will not visit 'linkedString' or 'linkedMap' before linked list. if seen != 13 { t.Fatalf("expected resumed traversal to visit 13 nodes, got %d", seen) } // resume from middle. - resumer.RewindToOffset(50) + resumer.RewindToOffset(128) seen = 0 if err := p.WalkAdv(rootNode, s, count); err != nil { t.Fatal(err)