Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 20 additions & 12 deletions carstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,23 @@ var protoChooser = dagpb.AddSupportToChooser(basicnode.Chooser)

// StreamCar streams a DAG in CARv1 format to the given writer, using the given
// selector.
func StreamCar(ctx context.Context, requestLsys linking.LinkSystem, rootCid cid.Cid, selNode datamodel.Node, out io.Writer, duplicates bool) error {
func StreamCar(ctx context.Context, requestLsys linking.LinkSystem, rootCid cid.Cid, selNode datamodel.Node, out io.Writer, duplicates bool) (int64, int64, error) {
sel, err := selector.CompileSelector(selNode)
if err != nil {
return fmt.Errorf("failed to compile selector: %w", err)
return 0, 0, fmt.Errorf("failed to compile selector: %w", err)
}

carWriter, err := carstorage.NewWritable(out, []cid.Cid{rootCid}, car.WriteAsCarV1(true), car.AllowDuplicatePuts(duplicates))
if err != nil {
return fmt.Errorf("failed to create car writer: %w", err)
return 0, 0, fmt.Errorf("failed to create car writer: %w", err)
}

erro := &errorRecordingReadOpener{ctx, requestLsys.StorageReadOpener, carWriter, nil}
erro := newErrorRecordingReadOpener(ctx, requestLsys.StorageReadOpener, carWriter)
requestLsys.StorageReadOpener = erro.StorageReadOpener

rootNode, err := loadNode(ctx, rootCid, requestLsys)
if err != nil {
return fmt.Errorf("failed to load root node: %w", err)
return 0, 0, fmt.Errorf("failed to load root node: %w", err)
}

progress := traversal.Progress{Cfg: &traversal.Config{
Expand All @@ -54,20 +54,26 @@ func StreamCar(ctx context.Context, requestLsys linking.LinkSystem, rootCid cid.
LinkTargetNodePrototypeChooser: protoChooser,
}}
if err := progress.WalkAdv(rootNode, sel, visitNoop); err != nil {
return fmt.Errorf("failed to complete traversal: %w", err)
return 0, 0, fmt.Errorf("failed to complete traversal: %w", err)
}
if erro.err != nil {
return fmt.Errorf("block load failed during traversal: %w", erro.err)
return 0, 0, fmt.Errorf("block load failed during traversal: %w", erro.err)
}

return nil
return erro.byteCount, erro.blockCount, nil
}

type errorRecordingReadOpener struct {
ctx context.Context
orig linking.BlockReadOpener
car carstorage.WritableCar
err error
ctx context.Context
orig linking.BlockReadOpener
car carstorage.WritableCar
err error
byteCount int64
blockCount int64
}

// newErrorRecordingReadOpener wraps the original block read opener together
// with the context and the CAR writer it is given. The recorded error and the
// byte and block counters are left at their zero values.
func newErrorRecordingReadOpener(ctx context.Context, orig linking.BlockReadOpener, car carstorage.WritableCar) *errorRecordingReadOpener {
	return &errorRecordingReadOpener{
		ctx:  ctx,
		orig: orig,
		car:  car,
	}
}

func (erro *errorRecordingReadOpener) StorageReadOpener(lc linking.LinkContext, lnk datamodel.Link) (io.Reader, error) {
Expand All @@ -84,6 +90,8 @@ func (erro *errorRecordingReadOpener) StorageReadOpener(lc linking.LinkContext,
if err != nil {
return nil, err
}
erro.byteCount += int64(len(byts))
erro.blockCount++
return bytes.NewReader(byts), nil
}

Expand Down
96 changes: 70 additions & 26 deletions carstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,61 +50,73 @@ func TestStreamCar(t *testing.T) {
}

testCases := []struct {
name string
selector datamodel.Node
root cid.Cid
lsys linking.LinkSystem
validate func(t *testing.T, r io.Reader)
name string
selector datamodel.Node
root cid.Cid
lsys linking.LinkSystem
expectedBytes int64
expectedBlocks int64
validate func(t *testing.T, r io.Reader)
}{
{
name: "chain: all blocks",
selector: selectorparse.CommonSelector_ExploreAllRecursively,
root: tbc.TipLink.(cidlink.Link).Cid,
lsys: chainLsys,
name: "chain: all blocks",
selector: selectorparse.CommonSelector_ExploreAllRecursively,
root: tbc.TipLink.(cidlink.Link).Cid,
lsys: chainLsys,
expectedBytes: sizeOf(allChainBlocks),
expectedBlocks: 100,
validate: func(t *testing.T, r io.Reader) {
root, blks := carToBlocks(t, r)
require.Equal(t, tbc.TipLink.(cidlink.Link).Cid, root)
require.Equal(t, allChainBlocks, blks)
},
},
{
name: "chain: just root",
selector: selectorparse.CommonSelector_MatchPoint,
root: tbc.TipLink.(cidlink.Link).Cid,
lsys: chainLsys,
name: "chain: just root",
selector: selectorparse.CommonSelector_MatchPoint,
root: tbc.TipLink.(cidlink.Link).Cid,
lsys: chainLsys,
expectedBytes: sizeOf(allChainBlocks[:1]),
expectedBlocks: 1,
validate: func(t *testing.T, r io.Reader) {
root, blks := carToBlocks(t, r)
require.Equal(t, tbc.TipLink.(cidlink.Link).Cid, root)
require.Equal(t, []blocks.Block{allChainBlocks[0]}, blks)
},
},
{
name: "unixfs file",
selector: selectorparse.CommonSelector_ExploreAllRecursively,
root: fileEnt.Root,
lsys: fileLsys,
name: "unixfs file",
selector: selectorparse.CommonSelector_ExploreAllRecursively,
root: fileEnt.Root,
lsys: fileLsys,
expectedBytes: sizeOfDirEnt(fileEnt, fileLsys),
expectedBlocks: int64(len(fileEnt.SelfCids)),
validate: func(t *testing.T, r io.Reader) {
root, blks := carToBlocks(t, r)
require.Equal(t, fileEnt.Root, root)
require.ElementsMatch(t, fileEnt.SelfCids, blkCids(blks))
},
},
{
name: "unixfs directory",
selector: selectorparse.CommonSelector_ExploreAllRecursively,
root: dirEnt.Root,
lsys: dirLsys,
name: "unixfs directory",
selector: selectorparse.CommonSelector_ExploreAllRecursively,
root: dirEnt.Root,
lsys: dirLsys,
expectedBytes: sizeOfDirEnt(dirEnt, dirLsys),
expectedBlocks: blocksInDirEnt(dirEnt),
validate: func(t *testing.T, r io.Reader) {
root, blks := carToBlocks(t, r)
require.Equal(t, dirEnt.Root, root)
require.ElementsMatch(t, entCids(dirEnt), blkCids(blks))
},
},
{
name: "unixfs sharded directory",
selector: selectorparse.CommonSelector_ExploreAllRecursively,
root: shardedDirEnt.Root,
lsys: shardedDirLsys,
name: "unixfs sharded directory",
selector: selectorparse.CommonSelector_ExploreAllRecursively,
root: shardedDirEnt.Root,
lsys: shardedDirLsys,
expectedBytes: sizeOfDirEnt(shardedDirEnt, shardedDirLsys),
expectedBlocks: blocksInDirEnt(shardedDirEnt),
validate: func(t *testing.T, r io.Reader) {
root, blks := carToBlocks(t, r)
require.Equal(t, shardedDirEnt.Root, root)
Expand All @@ -118,8 +130,10 @@ func TestStreamCar(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
req := require.New(t)
var buf bytes.Buffer
err := frisbii.StreamCar(ctx, tc.lsys, tc.root, tc.selector, &buf, false)
byts, blks, err := frisbii.StreamCar(ctx, tc.lsys, tc.root, tc.selector, &buf, false)
req.NoError(err)
req.Equal(tc.expectedBytes, byts)
req.Equal(tc.expectedBlocks, blks)
tc.validate(t, &buf)
})
}
Expand Down Expand Up @@ -200,3 +214,33 @@ func GenerateNoDupes(gen func() unixfs.DirEntry) unixfs.DirEntry {
}
}
}

// sizeOf returns the sum of the raw data lengths, in bytes, of all blocks in
// blks. A nil or empty slice yields 0.
func sizeOf(blks []blocks.Block) int64 {
	total := int64(0)
	for i := range blks {
		total += int64(len(blks[i].RawData()))
	}
	return total
}
// sizeOfDirEnt returns the combined byte length of every block listed in
// dirEnt.SelfCids plus, recursively, the blocks of all of dirEnt.Children.
// Each block's raw bytes are loaded through ls; the function panics if a
// load fails.
func sizeOfDirEnt(dirEnt unixfs.DirEntry, ls linking.LinkSystem) int64 {
	total := int64(0)

	// This entry's own blocks are loaded first, before descending.
	for i := range dirEnt.SelfCids {
		raw, err := ls.LoadRaw(linking.LinkContext{}, cidlink.Link{Cid: dirEnt.SelfCids[i]})
		if err != nil {
			panic(err)
		}
		total += int64(len(raw))
	}

	for _, child := range dirEnt.Children {
		total += sizeOfDirEnt(child, ls)
	}

	return total
}

// blocksInDirEnt returns the number of CIDs in dirEnt.SelfCids plus,
// recursively, the same count for every entry in dirEnt.Children.
func blocksInDirEnt(dirEnt unixfs.DirEntry) int64 {
	count := int64(len(dirEnt.SelfCids))
	for i := range dirEnt.Children {
		count += blocksInDirEnt(dirEnt.Children[i])
	}
	return count
}
19 changes: 4 additions & 15 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.20

require (
github.com/dustin/go-humanize v1.0.1
github.com/filecoin-project/lassie v0.12.2-0.20230614045620-19b6e938241c
github.com/filecoin-project/lassie v0.14.4-0.20230811073202-26954e00718d
github.com/ipfs/go-block-format v0.1.2
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-graphsync v0.14.7
Expand All @@ -13,7 +13,7 @@ require (
github.com/ipfs/go-unixfsnode v1.7.3
github.com/ipld/go-car/v2 v2.10.1
github.com/ipld/go-codec-dagpb v1.6.0
github.com/ipld/go-ipld-prime v0.20.1-0.20230613110822-3142e1304e55
github.com/ipld/go-ipld-prime v0.21.1-0.20230811030745-6e31cea491de
github.com/ipni/go-libipni v0.3.4
github.com/ipni/index-provider v0.13.5
github.com/libp2p/go-libp2p v0.29.2
Expand All @@ -22,6 +22,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/urfave/cli/v2 v2.25.7
golang.org/x/term v0.10.0
lukechampine.com/blake3 v1.2.1
)

require (
Expand All @@ -32,13 +33,11 @@ require (
github.com/containerd/cgroups v1.1.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/cskr/pubsub v1.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/elastic/gosigar v0.14.2 // indirect
github.com/felixge/httpsnoop v1.0.0 // indirect
github.com/filecoin-project/go-address v1.1.0 // indirect
github.com/filecoin-project/go-amt-ipld/v4 v4.0.0 // indirect
github.com/filecoin-project/go-cbor-util v0.0.1 // indirect
Expand All @@ -58,7 +57,6 @@ require (
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/gddo v0.0.0-20180823221919-9d8ff1c67be5 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
Expand All @@ -70,22 +68,18 @@ require (
github.com/hannahhoward/go-pubsub v1.0.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.2 // indirect
github.com/huin/goupnp v1.2.0 // indirect
github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/boxo v0.10.2-0.20230629143123-2d3edc552442 // indirect
github.com/ipfs/go-bitfield v1.1.0 // indirect
github.com/ipfs/go-datastore v0.6.0 // indirect
github.com/ipfs/go-ipfs-blocksutil v0.0.1 // indirect
github.com/ipfs/go-ipfs-chunker v0.0.5 // indirect
github.com/ipfs/go-ipfs-delay v0.0.1 // indirect
github.com/ipfs/go-ipfs-pq v0.0.3 // indirect
github.com/ipfs/go-ipfs-util v0.0.3 // indirect
github.com/ipfs/go-ipld-cbor v0.0.6 // indirect
github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipfs/go-metrics-interface v0.0.1 // indirect
github.com/ipfs/go-peertaskqueue v0.8.1 // indirect
github.com/ipfs/go-unixfs v0.4.5 // indirect
github.com/ipld/go-ipld-adl-hamt v0.0.0-20220616142416-9004dbd839e0 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c // indirect
Expand All @@ -101,9 +95,6 @@ require (
github.com/libp2p/go-libp2p-asn-util v0.3.0 // indirect
github.com/libp2p/go-libp2p-gostream v0.6.0 // indirect
github.com/libp2p/go-libp2p-pubsub v0.9.3 // indirect
github.com/libp2p/go-libp2p-record v0.2.0 // indirect
github.com/libp2p/go-libp2p-routing-helpers v0.7.0 // indirect
github.com/libp2p/go-mplex v0.7.0 // indirect
github.com/libp2p/go-msgio v0.3.0 // indirect
github.com/libp2p/go-nat v0.2.0 // indirect
github.com/libp2p/go-netroute v0.2.1 // indirect
Expand All @@ -117,7 +108,6 @@ require (
github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
github.com/mitchellh/go-server-timing v1.0.1 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.1.0 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
Expand Down Expand Up @@ -172,5 +162,4 @@ require (
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
)
Loading