diff --git a/Dockerfile b/Dockerfile
index abef6553..1fbcc621 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,4 @@
-FROM golang:1.23.6-bullseye as builder
+FROM golang:1.23.6-bullseye AS builder
 WORKDIR /app
 COPY go.* ./
 RUN go mod download
diff --git a/client/swagger/models/dataprep_create_request.go b/client/swagger/models/dataprep_create_request.go
index 4a4d325d..b74df463 100644
--- a/client/swagger/models/dataprep_create_request.go
+++ b/client/swagger/models/dataprep_create_request.go
@@ -25,6 +25,9 @@ type DataprepCreateRequest struct {
 	// Maximum size of the CAR files to be created
 	MaxSize *string `json:"maxSize,omitempty"`
 
+	// Minimum piece size for the preparation, applies only to DAG and remainder pieces
+	MinPieceSize *string `json:"minPieceSize,omitempty"`
+
 	// Name of the preparation
 	// Required: true
 	Name *string `json:"name"`
diff --git a/client/swagger/models/model_car.go b/client/swagger/models/model_car.go
index 81395736..a790ea9f 100644
--- a/client/swagger/models/model_car.go
+++ b/client/swagger/models/model_car.go
@@ -41,6 +41,9 @@ type ModelCar struct {
 	// piece size
 	PieceSize int64 `json:"pieceSize,omitempty"`
 
+	// PieceType indicates whether this is a data piece or DAG piece
+	PieceType string `json:"pieceType,omitempty"`
+
 	// Association
 	PreparationID int64 `json:"preparationId,omitempty"`
diff --git a/client/swagger/models/model_preparation.go b/client/swagger/models/model_preparation.go
index c0363612..16cc6d7b 100644
--- a/client/swagger/models/model_preparation.go
+++ b/client/swagger/models/model_preparation.go
@@ -31,6 +31,9 @@ type ModelPreparation struct {
 	// max size
 	MaxSize int64 `json:"maxSize,omitempty"`
 
+	// Minimum piece size for the preparation, applies only to DAG and remainder pieces
+	MinPieceSize int64 `json:"minPieceSize,omitempty"`
+
 	// name
 	Name string `json:"name,omitempty"`
diff --git a/cmd/api_test.go b/cmd/api_test.go
index 21422478..c2154a3a 100644
--- a/cmd/api_test.go
+++ b/cmd/api_test.go
@@ -263,7 +263,7 @@ func TestBasicDataPrep(t *testing.T) {
 		require.True(t, listPiecesResp.IsSuccess())
 		require.Len(t, listPiecesResp.Payload, 1)
 		require.Len(t, listPiecesResp.Payload[0].Pieces, 1)
-		require.Equal(t, "baga6ea4seaqoahdvfwkrp64ecsxbjvyuqcwpz3o7ctxrjanlv2x4u2cq2qjf2ji", listPiecesResp.Payload[0].Pieces[0].PieceCid)
+		require.Equal(t, "baga6ea4seaqhmks2wnochilik4updmit54agfi5mjf6r7ehotu36ksdp46uxahi", listPiecesResp.Payload[0].Pieces[0].PieceCid)
 		// Start daggen
 		startDagGenResp, err := client.Job.StartDagGen(&job.StartDagGenParams{
 			ID: "prep",
@@ -285,7 +285,9 @@ func TestBasicDataPrep(t *testing.T) {
 		require.True(t, listPiecesResp.IsSuccess())
 		require.Len(t, listPiecesResp.Payload, 1)
 		require.Len(t, listPiecesResp.Payload[0].Pieces, 2)
-		require.Equal(t, "baga6ea4seaqoahdvfwkrp64ecsxbjvyuqcwpz3o7ctxrjanlv2x4u2cq2qjf2ji", listPiecesResp.Payload[0].Pieces[0].PieceCid)
-		require.Equal(t, "baga6ea4seaqbkouoyih2elxfrztq3gr23rpvgpx5e3fnud2rhvvzf4b7tneeyki", listPiecesResp.Payload[0].Pieces[1].PieceCid)
+		// data piece, full size
+		require.Equal(t, "baga6ea4seaqhmks2wnochilik4updmit54agfi5mjf6r7ehotu36ksdp46uxahi", listPiecesResp.Payload[0].Pieces[0].PieceCid)
+		// dag piece, min piece size
+		require.Equal(t, "baga6ea4seaqfoo2k3wmwp7gvxnc7hbjpb7ovtvt52tehwfvzxbreljcebbnwgiq", listPiecesResp.Payload[0].Pieces[1].PieceCid)
 	})
 }
diff --git a/cmd/dataprep/create.go b/cmd/dataprep/create.go
index ebe947f2..c48f68a0 100644
--- a/cmd/dataprep/create.go
+++ b/cmd/dataprep/create.go
@@ -55,6 +55,12 @@ var CreateCmd = &cli.Command{
 			Value:       "",
 			DefaultText: "Determined by --max-size",
 		},
+		&cli.StringFlag{
+			Name:        "min-piece-size",
+			Usage:       "The minimum size of a piece. Pieces smaller than this will be padded up to this size. It's recommended to leave this as the default",
+			Value:       "1MiB",
+			DefaultText: "1MiB",
+		},
 		&cli.BoolFlag{
 			Name:  "delete-after-export",
 			Usage: "Whether to delete the source files after export to CAR files",
@@ -83,6 +89,7 @@ var CreateCmd = &cli.Command{
 		outputStorages := c.StringSlice("output")
 		maxSizeStr := c.String("max-size")
 		pieceSizeStr := c.String("piece-size")
+		minPieceSizeStr := c.String("min-piece-size")
 		for _, sourcePath := range c.StringSlice("local-source") {
 			source, err := createStorageIfNotExist(c.Context, db, sourcePath)
 			if err != nil {
@@ -103,8 +110,9 @@ var CreateCmd = &cli.Command{
 			OutputStorages:    outputStorages,
 			MaxSizeStr:        maxSizeStr,
 			PieceSizeStr:      pieceSizeStr,
-			DeleteAfterExport: c.Bool("delete-after-export"),
+			MinPieceSizeStr:   minPieceSizeStr,
 			Name:              name,
+			DeleteAfterExport: c.Bool("delete-after-export"),
 			NoInline:          c.Bool("no-inline"),
 			NoDag:             c.Bool("no-dag"),
 		})
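Note: the new `--min-piece-size` value is parsed as a human-readable byte string, the same way `--max-size` and `--piece-size` are. A minimal sketch of how the default resolves to bytes, using the `go-humanize` parser the handler relies on (the `main` wrapper is illustrative only):

```go
package main

import (
	"fmt"

	"github.com/dustin/go-humanize"
)

func main() {
	// IEC units are binary: "1MiB" -> 1 << 20, whereas SI "1MB" -> 1_000_000.
	n, err := humanize.ParseBytes("1MiB")
	if err != nil {
		panic(err)
	}
	fmt.Println(n) // 1048576
}
```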
DefaultText: "Determined by --max-size", }, + &cli.StringFlag{ + Name: "min-piece-size", + Usage: "The minimum size of a piece. Pieces smaller than this will be padded up to this size. It's recommended to leave this as the default", + Value: "1MiB", + DefaultText: "1MiB", + }, &cli.BoolFlag{ Name: "delete-after-export", Usage: "Whether to delete the source files after export to CAR files", @@ -83,6 +89,7 @@ var CreateCmd = &cli.Command{ outputStorages := c.StringSlice("output") maxSizeStr := c.String("max-size") pieceSizeStr := c.String("piece-size") + minPieceSizeStr := c.String("min-piece-size") for _, sourcePath := range c.StringSlice("local-source") { source, err := createStorageIfNotExist(c.Context, db, sourcePath) if err != nil { @@ -103,8 +110,9 @@ var CreateCmd = &cli.Command{ OutputStorages: outputStorages, MaxSizeStr: maxSizeStr, PieceSizeStr: pieceSizeStr, - DeleteAfterExport: c.Bool("delete-after-export"), + MinPieceSizeStr: minPieceSizeStr, Name: name, + DeleteAfterExport: c.Bool("delete-after-export"), NoInline: c.Bool("no-inline"), NoDag: c.Bool("no-dag"), }) diff --git a/cmd/functional_test.go b/cmd/functional_test.go index 1bdffe40..14009ea6 100644 --- a/cmd/functional_test.go +++ b/cmd/functional_test.go @@ -451,6 +451,10 @@ func TestDataPrep(t *testing.T) { require.Equal(t, pieceCID, calculatedPieceCID) err = os.WriteFile(filepath.Join(downloadDir, pieceCID+".car"), downloaded, 0777) require.NoError(t, err) + + // Verify piece size is a power of two + pieceSize := uint64(len(downloaded)) + require.True(t, util.IsPowerOfTwo(pieceSize), "piece size %d is not a power of two", pieceSize) } // Download all pieces using local download server @@ -499,7 +503,7 @@ func TestNoDuplicatedOutput(t *testing.T) { _, _, err = runner.Run(ctx, fmt.Sprintf("singularity storage create local --name source --path %s", testutil.EscapePath(source))) require.NoError(t, err) - _, _, err = runner.Run(ctx, fmt.Sprintf("singularity prep create --name test-prep --delete-after-export --source source --local-output %s --max-size=500KiB", testutil.EscapePath(output))) + _, _, err = runner.Run(ctx, fmt.Sprintf("singularity prep create --name test-prep --delete-after-export --source source --local-output %s --max-size=500KiB --min-piece-size=256KiB", testutil.EscapePath(output))) require.NoError(t, err) // Start scanning diff --git a/docs/en/cli-reference/prep/create.md b/docs/en/cli-reference/prep/create.md index d777ec90..d9250788 100644 --- a/docs/en/cli-reference/prep/create.md +++ b/docs/en/cli-reference/prep/create.md @@ -15,6 +15,7 @@ OPTIONS: --delete-after-export Whether to delete the source files after export to CAR files (default: false) --help, -h show help --max-size value The maximum size of a single CAR file (default: "31.5GiB") + --min-piece-size value The minimum size of a piece. Pieces smaller than this will be padded up to this size. It's recommended to leave this as the default (default: 1MiB) --name value The name for the preparation (default: Auto generated) --no-dag Whether to disable maintaining folder dag structure for the sources. If disabled, DagGen will not be possible and folders will not have an associated CID. (default: false) --no-inline Whether to disable inline storage for the preparation. Can save database space but requires at least one output storage. 
diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go
index 5cd7cff6..424f209f 100644
--- a/docs/swagger/docs.go
+++ b/docs/swagger/docs.go
@@ -5626,6 +5626,11 @@ const docTemplate = `{
                     "type": "string",
                     "default": "31.5GiB"
                 },
+                "minPieceSize": {
+                    "description": "Minimum piece size for the preparation, applies only to DAG and remainder pieces",
+                    "type": "string",
+                    "default": "1MiB"
+                },
                 "name": {
                     "description": "Name of the preparation",
                     "type": "string"
                 },
@@ -5953,6 +5958,10 @@ const docTemplate = `{
                 "pieceSize": {
                     "type": "integer"
                 },
+                "pieceType": {
+                    "description": "PieceType indicates whether this is a data piece or DAG piece",
+                    "type": "string"
+                },
                 "preparationId": {
                     "description": "Association",
                     "type": "integer"
                 },
@@ -6295,6 +6304,10 @@ const docTemplate = `{
                 "maxSize": {
                     "type": "integer"
                 },
+                "minPieceSize": {
+                    "description": "Minimum piece size for the preparation, applies only to DAG and remainder pieces",
+                    "type": "integer"
+                },
                 "name": {
                     "type": "string"
                 },
diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json
index 76b71d3b..2bf8a3a6 100644
--- a/docs/swagger/swagger.json
+++ b/docs/swagger/swagger.json
@@ -5620,6 +5620,11 @@
                 "type": "string",
                 "default": "31.5GiB"
             },
+            "minPieceSize": {
+                "description": "Minimum piece size for the preparation, applies only to DAG and remainder pieces",
+                "type": "string",
+                "default": "1MiB"
+            },
             "name": {
                 "description": "Name of the preparation",
                 "type": "string"
             },
@@ -5947,6 +5952,10 @@
             "pieceSize": {
                 "type": "integer"
             },
+            "pieceType": {
+                "description": "PieceType indicates whether this is a data piece or DAG piece",
+                "type": "string"
+            },
             "preparationId": {
                 "description": "Association",
                 "type": "integer"
             },
@@ -6289,6 +6298,10 @@
             "maxSize": {
                 "type": "integer"
             },
+            "minPieceSize": {
+                "description": "Minimum piece size for the preparation, applies only to DAG and remainder pieces",
+                "type": "integer"
+            },
             "name": {
                 "type": "string"
             },
diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml
index 814d0294..490d4d59 100644
--- a/docs/swagger/swagger.yaml
+++ b/docs/swagger/swagger.yaml
@@ -42,6 +42,11 @@ definitions:
       default: 31.5GiB
       description: Maximum size of the CAR files to be created
       type: string
+    minPieceSize:
+      default: 1MiB
+      description: Minimum piece size for the preparation, applies only to DAG and
+        remainder pieces
+      type: string
     name:
       description: Name of the preparation
       type: string
@@ -277,6 +282,9 @@ definitions:
       type: string
     pieceSize:
      type: integer
+    pieceType:
+      description: PieceType indicates whether this is a data piece or DAG piece
+      type: string
     preparationId:
       description: Association
       type: integer
@@ -529,6 +537,10 @@ definitions:
       type: integer
     maxSize:
       type: integer
+    minPieceSize:
+      description: Minimum piece size for the preparation, applies only to DAG and
+        remainder pieces
+      type: integer
     name:
       type: string
     noDag:
diff --git a/handler/dataprep/create.go b/handler/dataprep/create.go
index 9fd8480a..f38d3f8f 100644
--- a/handler/dataprep/create.go
+++ b/handler/dataprep/create.go
@@ -18,6 +18,7 @@ type CreateRequest struct {
 	OutputStorages    []string `json:"outputStorages"`              // Name of Output storage systems to be used for the output
 	MaxSizeStr        string   `default:"31.5GiB" json:"maxSize"`   // Maximum size of the CAR files to be created
 	PieceSizeStr      string   `default:"" json:"pieceSize"`        // Target piece size of the CAR files used for piece commitment calculation
+	MinPieceSizeStr   string   `default:"1MiB" json:"minPieceSize"` // Minimum piece size for the preparation, applies only to DAG and remainder pieces
 	DeleteAfterExport bool     `default:"false" json:"deleteAfterExport"` // Whether to delete the source files after export
 	NoInline          bool     `default:"false" json:"noInline"`    // Whether to disable inline storage for the preparation. Can save database space but requires at least one output storage.
 	NoDag             bool     `default:"false" json:"noDag"`       // Whether to disable maintaining folder dag structure for the sources. If disabled, DagGen will not be possible and folders will not have an associated CID.
@@ -77,6 +78,24 @@ func ValidateCreateRequest(ctx context.Context, db *gorm.DB, request CreateReque
 		return nil, errors.Wrap(handlererror.ErrInvalidParameter, "maxSize needs to be reduced to leave space for padding")
 	}
 
+	minPieceSizeStr := request.MinPieceSizeStr
+	if minPieceSizeStr == "" {
+		minPieceSizeStr = "1MiB"
+	}
+
+	minPieceSize, err := humanize.ParseBytes(minPieceSizeStr)
+	if err != nil {
+		return nil, errors.Join(handlererror.ErrInvalidParameter, errors.Wrapf(err, "invalid value for minPieceSize: %s", minPieceSizeStr))
+	}
+
+	if minPieceSize > pieceSize {
+		return nil, errors.Wrap(handlererror.ErrInvalidParameter, "minPieceSize cannot be larger than pieceSize")
+	}
+
+	if minPieceSize != util.NextPowerOfTwo(minPieceSize) {
+		return nil, errors.Wrap(handlererror.ErrInvalidParameter, "minPieceSize must be a power of two")
+	}
+
 	var sources []model.Storage
 	for _, name := range request.SourceStorages {
 		var source model.Storage
@@ -114,6 +133,7 @@ func ValidateCreateRequest(ctx context.Context, db *gorm.DB, request CreateReque
 	return &model.Preparation{
 		MaxSize:           int64(maxSize),
 		PieceSize:         int64(pieceSize),
+		MinPieceSize:      int64(minPieceSize),
 		SourceStorages:    sources,
 		OutputStorages:    outputs,
 		DeleteAfterExport: request.DeleteAfterExport,
diff --git a/handler/dataprep/piece.go b/handler/dataprep/piece.go
index 256781ab..b84985b5 100644
--- a/handler/dataprep/piece.go
+++ b/handler/dataprep/piece.go
@@ -210,6 +210,7 @@ func (DefaultHandler) AddPieceHandler(
 		StoragePath:   request.FilePath,
 		PreparationID: preparation.ID,
 		FileSize:      fileSize,
+		PieceType:     model.DataPiece,
 	}
 
 	err = database.DoRetry(ctx, func() error { return db.Create(&mCar).Error })
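The validation above enforces three rules for `minPieceSize`: it must parse as a byte size, it must not exceed `pieceSize`, and it must be a power of two (checked by comparing the value with its own `util.NextPowerOfTwo`). A hedged standalone sketch of the same checks; `checkMinPieceSize` is an illustrative name, not a function in this change:

```go
package main

import (
	"fmt"

	"github.com/data-preservation-programs/singularity/util"
)

// checkMinPieceSize mirrors the checks ValidateCreateRequest performs after
// parsing the human-readable byte strings.
func checkMinPieceSize(minPieceSize, pieceSize uint64) error {
	if minPieceSize > pieceSize {
		return fmt.Errorf("minPieceSize %d cannot be larger than pieceSize %d", minPieceSize, pieceSize)
	}
	// A power of two is its own next power of two: 1<<20 passes,
	// but 1<<20 + 1 would round up to 1<<21 and fail.
	if minPieceSize != util.NextPowerOfTwo(minPieceSize) {
		return fmt.Errorf("minPieceSize %d must be a power of two", minPieceSize)
	}
	return nil
}

func main() {
	fmt.Println(checkMinPieceSize(1<<20, 1<<21)) // <nil>
	fmt.Println(checkMinPieceSize(3<<19, 1<<21)) // error: not a power of two
}
```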
diff --git a/handler/job/pack_test.go b/handler/job/pack_test.go
index f832908c..3ced374f 100644
--- a/handler/job/pack_test.go
+++ b/handler/job/pack_test.go
@@ -137,7 +137,7 @@ func TestPackHandler_Success(t *testing.T) {
 	require.NoError(t, err)
 	require.NotNil(t, car)
 	require.EqualValues(t, 100, car.FileSize)
-	require.EqualValues(t, "baga6ea4seaqbuglmtahbspkbeunqohciieh4yjivfhcqawufwgs4gt7mzmyfmmi", car.PieceCID.String())
+	require.EqualValues(t, "baga6ea4seaqpikooah5wmbpjmnvx3ysyf36xagymjtbccnf5twt2cpaqcgcwqha", car.PieceCID.String())
 	err = db.Find(&job, 1).Error
 	require.NoError(t, err)
 	require.Equal(t, model.Complete, job.State)
diff --git a/model/preparation.go b/model/preparation.go
index 3414d998..fd0fe75f 100644
--- a/model/preparation.go
+++ b/model/preparation.go
@@ -11,6 +11,13 @@ import (
 	"gorm.io/gorm"
 )
 
+type PieceType string
+
+const (
+	DataPiece PieceType = "data"
+	DagPiece  PieceType = "dag"
+)
+
 type Worker struct {
 	ID            string    `gorm:"primaryKey" json:"id"`
 	LastHeartbeat time.Time `json:"lastHeartbeat"`
@@ -34,6 +41,7 @@ type Preparation struct {
 	DeleteAfterExport bool  `json:"deleteAfterExport"` // DeleteAfterExport is a flag that indicates whether the source files should be deleted after export.
 	MaxSize           int64 `json:"maxSize"`
 	PieceSize         int64 `json:"pieceSize"`
+	MinPieceSize      int64 `json:"minPieceSize"` // Minimum piece size for the preparation, applies only to DAG and remainder pieces
 	NoInline          bool  `json:"noInline"`
 	NoDag             bool  `json:"noDag"`
@@ -252,6 +260,7 @@ type CarID uint32
 type Car struct {
 	ID        CarID     `cbor:"-" gorm:"primaryKey" json:"id" table:"verbose"`
 	CreatedAt time.Time `cbor:"-" json:"createdAt" table:"verbose;format:2006-01-02 15:04:05"`
+	PieceType PieceType `cbor:"0,keyasint,omitempty" json:"pieceType" swaggertype:"string"` // PieceType indicates whether this is a data piece or DAG piece
 	PieceCID  CID       `cbor:"1,keyasint,omitempty" gorm:"column:piece_cid;index;type:bytes;size:255" json:"pieceCid" swaggertype:"string"`
 	PieceSize int64     `cbor:"2,keyasint,omitempty" json:"pieceSize"`
 	RootCID   CID       `cbor:"3,keyasint,omitempty" gorm:"column:root_cid;type:bytes" json:"rootCid" swaggertype:"string"`
@@ -319,3 +328,12 @@ func (c CarBlock) BlockLength() int32 {
 	return c.blockLength
 }
+
+// GetMinPieceSize returns the minimum piece size for the preparation, with a fallback to 1MiB if not set.
+// This ensures backward compatibility with older preparations that don't have minPieceSize set.
+func (p *Preparation) GetMinPieceSize() int64 {
+	if p.MinPieceSize == 0 {
+		return 1 << 20 // 1MiB
+	}
+	return p.MinPieceSize
+}
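`GetMinPieceSize` exists for backward compatibility: rows created before this change have `MinPieceSize == 0` in the database, so `Pack` and `ExportDag` read the value through the accessor rather than the raw field. A small illustrative usage (values are examples, not fixtures from this PR):

```go
package main

import (
	"fmt"

	"github.com/data-preservation-programs/singularity/model"
)

func main() {
	legacy := model.Preparation{}         // pre-upgrade row: MinPieceSize is 0
	fmt.Println(legacy.GetMinPieceSize()) // 1048576 - falls back to 1 MiB

	current := model.Preparation{MinPieceSize: 1 << 18}
	fmt.Println(current.GetMinPieceSize()) // 262144 - stored value wins
}
```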
diff --git a/pack/e2e_test.go b/pack/e2e_test.go
new file mode 100644
index 00000000..a6ed8e76
--- /dev/null
+++ b/pack/e2e_test.go
@@ -0,0 +1,413 @@
+package pack
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"os"
+	"path/filepath"
+	"strings"
+	"testing"
+
+	"github.com/ipfs/go-cid"
+	"github.com/ipld/go-car/v2"
+	"github.com/stretchr/testify/require"
+	"gorm.io/gorm"
+
+	"github.com/data-preservation-programs/singularity/model"
+	"github.com/data-preservation-programs/singularity/scan"
+	"github.com/data-preservation-programs/singularity/util/testutil"
+)
+
+func TestLastPieceBehaviorE2ENoInline(t *testing.T) {
+	// This is an end-to-end test that verifies the last piece behavior by:
+	// 1. Creating a dataset with a file that will be split across multiple pieces
+	// 2. Using scan to automatically create pack jobs
+	// 3. Running those pack jobs
+	// 4. Verifying the resulting pieces have the expected properties
+
+	testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) {
+		// Setup: Create temporary directories for source and output
+		sourceDir := t.TempDir()
+		outputDir := t.TempDir()
+
+		// 1. Create test files with known sizes
+		// Create a large random file that will be split into multiple pieces
+		// Each piece will exercise different padding behavior
+		testFileSize := 4_200_000 // ~4.2 MB - will be split into multiple pieces
+
+		// Create the test file with random data
+		err := os.WriteFile(filepath.Join(sourceDir, "large_file.bin"),
+			testutil.GenerateRandomBytes(testFileSize), 0644)
+		require.NoError(t, err)
+
+		// 2. Create a preparation with specific settings
+		pieceSize := int64(2 * 1024 * 1024)    // 2 MiB target piece size
+		minPieceSize := int64(1 * 1024 * 1024) // 1 MiB min piece size
+		maxSize := pieceSize / 3               // Set max size to ensure we get multiple pieces from our file
+
+		prep := model.Preparation{
+			Name:         "test-preparation",
+			MaxSize:      maxSize,      // Each job will have at most maxSize bytes (forcing splitting)
+			PieceSize:    pieceSize,    // Target piece size
+			MinPieceSize: minPieceSize, // Minimum piece size
+			NoInline:     true,         // Force writing CAR files to disk instead of using inline preparation
+			SourceStorages: []model.Storage{
+				{
+					Name: "source-storage",
+					Type: "local",
+					Path: sourceDir,
+				},
+			},
+			OutputStorages: []model.Storage{
+				{
+					Name: "output-storage",
+					Type: "local",
+					Path: outputDir,
+				},
+			},
+		}
+
+		// Save the preparation
+		err = db.Create(&prep).Error
+		require.NoError(t, err)
+
+		// 3. Create the source attachment
+		var sourceAttachment model.SourceAttachment
+		err = db.Preload("Storage").Preload("Preparation").
+			Where("preparation_id = ? AND storage_id = ?", prep.ID, prep.SourceStorages[0].ID).
+			First(&sourceAttachment).Error
+		require.NoError(t, err)
+
+		// 4. Run the scan job to discover files and create pack jobs
+		err = db.Create(&model.Directory{
+			AttachmentID: sourceAttachment.ID,
+			Name:         "", // Root directory has empty name
+			ParentID:     nil,
+		}).Error
+		require.NoError(t, err)
+
+		// Run the scan
+		t.Logf("Running scan job")
+		err = scan.Scan(ctx, db, sourceAttachment)
+		require.NoError(t, err)
+
+		// 5. Verify scan created appropriate jobs
+		var packJobs []model.Job
+		err = db.Where("type = ? AND state = ?", model.Pack, model.Ready).Find(&packJobs).Error
+		require.NoError(t, err)
+
+		// We should have multiple pack jobs due to the file size and max size setting
+		require.Greater(t, len(packJobs), 2, "Scan should have created multiple pack jobs")
+
+		for i := range packJobs {
+			t.Logf("Pack job %d created", i+1)
+		}
+
+		// 6. Run all pack jobs and collect CAR files for verification
+		carSizes := make(map[int64]int64)
+
+		for _, job := range packJobs {
+			// Load the full job with attachments - important to preload OutputStorages
+			err = db.Preload("Attachment.Preparation.OutputStorages").Preload("Attachment.Storage").
+				Preload("FileRanges.File").Where("id = ?", job.ID).First(&job).Error
+			require.NoError(t, err)
+
+			// Execute the pack job
+			car, err := Pack(ctx, db, job)
+			require.NoError(t, err)
+
+			// Log job and car details
+			fileRangeInfo := ""
+			if len(job.FileRanges) > 0 {
+				fileRangeInfo = fmt.Sprintf(", range length: %d", job.FileRanges[0].Length)
+			}
+			t.Logf("Packed job ID %d, created car with piece size: %d, file size: %d%s",
+				job.ID, car.PieceSize, car.FileSize, fileRangeInfo)
+
+			// Record car sizes for later verification
+			carSizes[car.PieceSize] = car.FileSize
+
+			// Update job state
+			err = db.Model(&model.Job{}).Where("id = ?", job.ID).Update("state", model.Complete).Error
+			require.NoError(t, err)
+		}
+
+		// 7. Verify the resulting Cars
+		var cars []model.Car
+		err = db.Find(&cars).Error
+		require.NoError(t, err)
+
+		// Find all CAR files in the output directory
+		outputDirFiles, err := os.ReadDir(outputDir)
+		require.NoError(t, err)
+
+		// Collect CAR file paths for verification
+		var carFilePaths []string
+		for _, file := range outputDirFiles {
+			if !file.IsDir() && strings.HasSuffix(file.Name(), ".car") {
+				carFilePaths = append(carFilePaths, filepath.Join(outputDir, file.Name()))
+			}
+		}
+
+		require.NotEmpty(t, carFilePaths, "Should have CAR files in the output directory")
+		t.Logf("Found %d CAR files in the output directory", len(carFilePaths))
+
+		// Verify we have the expected number of cars matching our jobs
+		require.Equal(t, len(packJobs), len(cars), "Should have one car per pack job")
+		require.Equal(t, len(packJobs), len(carFilePaths), "Should have one CAR file per pack job")
+
+		// Count cars by piece size
+		fullSizePieceCount := 0  // 2 MiB or 4 MiB
+		halfSizePieceCount := 0  // 1 MiB
+		otherSizePieceCount := 0 // Anything else
+
+		for _, car := range cars {
+			t.Logf("Car has piece size: %d, file size: %d", car.PieceSize, car.FileSize)
+
+			if car.PieceSize == pieceSize || car.PieceSize == pieceSize*2 {
+				// Full-sized piece (2 MiB or 4 MiB)
+				fullSizePieceCount++
+				require.Greater(t, car.FileSize, int64(0), "Car file size should be greater than 0")
+			} else if car.PieceSize == minPieceSize {
+				// Piece padded to min piece size (1 MiB)
+				halfSizePieceCount++
+				require.Greater(t, car.FileSize, int64(0), "Car file size should be greater than 0")
+			} else {
+				t.Logf("Found car with unexpected piece size: %d", car.PieceSize)
+				otherSizePieceCount++
+			}
+		}
+
+		// Verify we have the expected types of pieces
+		require.Equal(t, 0, otherSizePieceCount, "Should not have any cars with unexpected piece sizes")
+		require.Equal(t, fullSizePieceCount+halfSizePieceCount, len(packJobs), "Should have exactly one car per pack job")
+
+		// At least one piece should be padded to min piece size (last piece)
+		require.GreaterOrEqual(t, halfSizePieceCount, 1, "Should have at least 1 car padded to min piece size")
+
+		// 8. Verify that file ranges have valid CIDs
+		var fileRanges []model.FileRange
+		err = db.Find(&fileRanges).Error
+		require.NoError(t, err)
+		require.Greater(t, len(fileRanges), 0, "Should have at least one file range")
+
+		// Verify that all file ranges have CIDs
+		for _, fileRange := range fileRanges {
+			require.NotEqual(t, cid.Undef, cid.Cid(fileRange.CID), "File range should have a valid CID")
+		}
+
+		// 9. Verify CAR file format using go-car's verification
+		for _, carFilePath := range carFilePaths {
+			// Verify the CAR file format
+			reader, err := car.OpenReader(carFilePath)
+			require.NoError(t, err, "Should be able to open CAR file %s", carFilePath)
+			defer reader.Close()
+
+			// Verify the CAR has roots
+			roots, err := reader.Roots()
+			require.NoError(t, err, "Should be able to read CAR roots")
+			require.NotEmpty(t, roots, "CAR file should have at least one root")
+
+			// Read all blocks to verify integrity
+			rd, err := os.Open(carFilePath)
+			require.NoError(t, err)
+			defer rd.Close()
+
+			blockReader, err := car.NewBlockReader(rd)
+			require.NoError(t, err, "Should be able to create block reader")
+
+			blockCount := 0
+			for {
+				block, err := blockReader.Next()
+				if err == io.EOF {
+					break
+				}
+				require.NoError(t, err, "Should be able to read all blocks")
+				require.NotNil(t, block, "Block should not be nil")
+				require.NotEqual(t, cid.Undef, block.Cid(), "Block should have valid CID")
+				blockCount++
+			}
+
+			require.Greater(t, blockCount, 0, "CAR file should contain at least one block")
+			t.Logf("Verified CAR file %s: found %d blocks", filepath.Base(carFilePath), blockCount)
+		}
+	})
+}
+
+func TestLastPieceBehaviorE2EInline(t *testing.T) {
+	// This is an end-to-end test that verifies the last piece behavior with inline CARs by:
+	// 1. Creating a dataset with a file that will be split across multiple pieces
+	// 2. Using scan to automatically create pack jobs
+	// 3. Running those pack jobs with NoInline:false (so CAR data is stored in database)
+	// 4. Verifying the resulting pieces have the expected properties
+
+	testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) {
+		// Setup: Create temporary directories for source and output
+		sourceDir := t.TempDir()
+		outputDir := t.TempDir()
+
+		// 1. Create test files with known sizes
+		// Create a large random file that will be split into multiple pieces
+		// Each piece will exercise different padding behavior
+		testFileSize := 4_200_000 // ~4.2 MB - will be split into multiple pieces
+
+		// Create the test file with random data
+		err := os.WriteFile(filepath.Join(sourceDir, "large_file.bin"),
+			testutil.GenerateRandomBytes(testFileSize), 0644)
+		require.NoError(t, err)
+
+		// 2. Create a preparation with specific settings
+		pieceSize := int64(2 * 1024 * 1024)    // 2 MiB target piece size
+		minPieceSize := int64(1 * 1024 * 1024) // 1 MiB min piece size
+		maxSize := pieceSize / 3               // Set max size to ensure we get multiple pieces from our file
+
+		prep := model.Preparation{
+			Name:         "test-preparation",
+			MaxSize:      maxSize,      // Each job will have at most maxSize bytes (forcing splitting)
+			PieceSize:    pieceSize,    // Target piece size
+			MinPieceSize: minPieceSize, // Minimum piece size
+			NoInline:     false,        // Use inline preparation (CAR data stored in database)
+			SourceStorages: []model.Storage{
+				{
+					Name: "source-storage",
+					Type: "local",
+					Path: sourceDir,
+				},
+			},
+			OutputStorages: []model.Storage{
+				{
+					Name: "output-storage",
+					Type: "local",
+					Path: outputDir,
+				},
+			},
+		}
+
+		// Save the preparation
+		err = db.Create(&prep).Error
+		require.NoError(t, err)
+
+		// 3. Create the source attachment
+		var sourceAttachment model.SourceAttachment
+		err = db.Preload("Storage").Preload("Preparation").
+			Where("preparation_id = ? AND storage_id = ?", prep.ID, prep.SourceStorages[0].ID).
+			First(&sourceAttachment).Error
+		require.NoError(t, err)
+
+		// 4. Run the scan job to discover files and create pack jobs
+		err = db.Create(&model.Directory{
+			AttachmentID: sourceAttachment.ID,
+			Name:         "", // Root directory has empty name
+			ParentID:     nil,
+		}).Error
+		require.NoError(t, err)
+
+		// Run the scan
+		t.Logf("Running scan job")
+		err = scan.Scan(ctx, db, sourceAttachment)
+		require.NoError(t, err)
+
+		// 5. Verify scan created appropriate jobs
+		var packJobs []model.Job
+		err = db.Where("type = ? AND state = ?", model.Pack, model.Ready).Find(&packJobs).Error
+		require.NoError(t, err)
+
+		// We should have multiple pack jobs due to the file size and max size setting
+		require.Greater(t, len(packJobs), 2, "Scan should have created multiple pack jobs")
+
+		for i := range packJobs {
+			t.Logf("Pack job %d created", i+1)
+		}
+
+		// 6. Run all pack jobs and collect CAR files for verification
+		carSizes := make(map[int64]int64)
+
+		for _, job := range packJobs {
+			// Load the full job with attachments
+			err = db.Preload("Attachment.Preparation").Preload("Attachment.Storage").
+				Preload("FileRanges.File").Where("id = ?", job.ID).First(&job).Error
+			require.NoError(t, err)
+
+			// Execute the pack job
+			car, err := Pack(ctx, db, job)
+			require.NoError(t, err)
+
+			// Log job and car details
+			fileRangeInfo := ""
+			if len(job.FileRanges) > 0 {
+				fileRangeInfo = fmt.Sprintf(", range length: %d", job.FileRanges[0].Length)
+			}
+			t.Logf("Packed job ID %d, created car with piece size: %d, file size: %d%s",
+				job.ID, car.PieceSize, car.FileSize, fileRangeInfo)
+
+			// Record car sizes for later verification
+			carSizes[car.PieceSize] = car.FileSize
+
+			// Update job state
+			err = db.Model(&model.Job{}).Where("id = ?", job.ID).Update("state", model.Complete).Error
+			require.NoError(t, err)
+		}
+
+		// 7. Verify the resulting Cars
+		var cars []model.Car
+		err = db.Find(&cars).Error
+		require.NoError(t, err)
+
+		// For inline preparation, no CAR files should be in the output directory
+		outputDirFiles, err := os.ReadDir(outputDir)
+		require.NoError(t, err)
+
+		carFileCount := 0
+		for _, file := range outputDirFiles {
+			if !file.IsDir() && strings.HasSuffix(file.Name(), ".car") {
+				carFileCount++
+			}
+		}
+
+		require.Equal(t, 0, carFileCount, "Should not have CAR files on disk for inline preparation")
+
+		// Count cars by piece size
+		fullSizePieceCount := 0  // 2 MiB or 4 MiB
+		halfSizePieceCount := 0  // 1 MiB
+		otherSizePieceCount := 0 // Anything else
+
+		for _, car := range cars {
+			t.Logf("Car has piece size: %d, file size: %d", car.PieceSize, car.FileSize)
+
+			if car.PieceSize == pieceSize || car.PieceSize == pieceSize*2 {
+				// Full-sized piece (2 MiB or 4 MiB)
+				fullSizePieceCount++
+				require.Greater(t, car.FileSize, int64(0), "Car file size should be greater than 0")
+				// For inline preparation, cars should exist in database but not have file paths
+				require.Empty(t, car.StoragePath, "Car storage path should be empty for inline preparation")
+			} else if car.PieceSize == minPieceSize {
+				// Piece padded to min piece size (1 MiB)
+				halfSizePieceCount++
+				require.Greater(t, car.FileSize, int64(0), "Car file size should be greater than 0")
+				require.Empty(t, car.StoragePath, "Car storage path should be empty for inline preparation")
+			} else {
+				t.Logf("Found car with unexpected piece size: %d", car.PieceSize)
+				otherSizePieceCount++
+			}
+		}
+
+		// Verify we have the expected types of pieces
+		require.Equal(t, 0, otherSizePieceCount, "Should not have any cars with unexpected piece sizes")
+		require.Equal(t, fullSizePieceCount+halfSizePieceCount, len(packJobs),
+			"Should have exactly one car per pack job")
+
+		// At least one piece should be padded to min piece size (last piece)
+		require.GreaterOrEqual(t, halfSizePieceCount, 1, "Should have at least 1 car padded to min piece size")
+
+		// 8. Verify that file ranges have valid CIDs
+		var fileRanges []model.FileRange
+		err = db.Find(&fileRanges).Error
+		require.NoError(t, err)
+		require.Greater(t, len(fileRanges), 0, "Should have at least one file range")
+
+		// Verify that all file ranges have CIDs
+		for _, fileRange := range fileRanges {
+			require.NotEqual(t, cid.Undef, cid.Cid(fileRange.CID), "File range should have a valid CID")
+		}
+	})
+}
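For context on the `require.Greater(t, len(packJobs), 2)` assertions above: with `maxSize = 2 MiB / 3 = 699,050` bytes, the 4,200,000-byte test file should split into roughly ⌈4,200,000 / 699,050⌉ = 7 pack jobs (the exact count depends on how scan accounts for CAR overhead), and only the short tail range lands on the min-piece-size padding path. A sketch of that back-of-the-envelope calculation; `expectedPackJobs` is a hypothetical helper, not part of scan:

```go
package main

import "fmt"

// expectedPackJobs approximates how many pack jobs scan creates when a file
// is split into ranges of at most maxSize bytes (ceiling division).
func expectedPackJobs(fileSize, maxSize int64) int64 {
	return (fileSize + maxSize - 1) / maxSize
}

func main() {
	maxSize := int64(2<<20) / 3                       // 699050
	fmt.Println(expectedPackJobs(4_200_000, maxSize)) // 7
}
```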
"Should have exactly one car per pack job") + + // At least one piece should be padded to min piece size (last piece) + require.GreaterOrEqual(t, halfSizePieceCount, 1, "Should have at least 1 car padded to min piece size") + + // 8. Verify that file ranges have valid CIDs + var fileRanges []model.FileRange + err = db.Find(&fileRanges).Error + require.NoError(t, err) + require.Greater(t, len(fileRanges), 0, "Should have at least one file range") + + // Verify that all file ranges have CIDs + for _, fileRange := range fileRanges { + require.NotEqual(t, cid.Undef, cid.Cid(fileRange.CID), "File range should have a valid CID") + } + }) +} diff --git a/pack/pack.go b/pack/pack.go index 53355590..fec28daa 100644 --- a/pack/pack.go +++ b/pack/pack.go @@ -55,8 +55,6 @@ func GetCommp(calc *commp.Calc, targetPieceSize uint64) (cid.Cid, uint64, error) } rawPieceSize = targetPieceSize - } else if rawPieceSize > targetPieceSize { - logger.Warn("piece size is larger than the target piece size") } commCid, err := commcid.DataCommitmentV1ToCID(rawCommp) @@ -88,7 +86,7 @@ func Pack( job model.Job, ) (*model.Car, error) { db = db.WithContext(ctx) - pieceSize := job.Attachment.Preparation.PieceSize + pieceSize := job.Attachment.Preparation.GetMinPieceSize() // storageWriter can be nil for inline preparation storageID, storageWriter, err := storagesystem.GetRandomOutputWriter(ctx, job.Attachment.Preparation.OutputStorages) if err != nil { @@ -169,6 +167,7 @@ func Pack( AttachmentID: &job.AttachmentID, PreparationID: job.Attachment.PreparationID, JobID: &job.ID, + PieceType: model.DataPiece, } // Update all Files and FileRanges that have size == -1 diff --git a/pack/pack_test.go b/pack/pack_test.go index 8ca70412..d75339ac 100644 --- a/pack/pack_test.go +++ b/pack/pack_test.go @@ -6,11 +6,12 @@ import ( "path/filepath" "testing" - "github.com/data-preservation-programs/singularity/model" - "github.com/data-preservation-programs/singularity/util/testutil" "github.com/gotidy/ptr" "github.com/stretchr/testify/require" "gorm.io/gorm" + + "github.com/data-preservation-programs/singularity/model" + "github.com/data-preservation-programs/singularity/util/testutil" ) func TestAssembleCar(t *testing.T) { @@ -224,3 +225,259 @@ func TestAssembleCar(t *testing.T) { }) } } + +func TestLastPiecePadding(t *testing.T) { + // Test last piece padding scenarios + tmp := t.TempDir() + out := t.TempDir() + + // Create a file that's smaller than min piece size for testing + smallSize := 500_000 // 500 KB + err := os.WriteFile(filepath.Join(tmp, "small.txt"), testutil.GenerateRandomBytes(smallSize), 0644) + require.NoError(t, err) + smallStat, err := os.Stat(filepath.Join(tmp, "small.txt")) + require.NoError(t, err) + + // Create a file that's larger than min piece size for testing + largeSize := 1_500_000 // 1.5 MB (larger than min piece size of 1 MiB) + err = os.WriteFile(filepath.Join(tmp, "medium.txt"), testutil.GenerateRandomBytes(largeSize), 0644) + require.NoError(t, err) + mediumStat, err := os.Stat(filepath.Join(tmp, "medium.txt")) + require.NoError(t, err) + + tests := []struct { + name string + pieceSize int64 + minPieceSize int64 + fileSize int64 + expectedPieceSize int64 + expectedFileSize int64 + expectedFileRanges int + expectedFileRangeLen int64 + }{ + { + name: "last piece smaller than min piece size gets padded to min piece size", + pieceSize: 1 << 21, // 2 MiB piece size + minPieceSize: 1 << 20, // 1 MiB min piece size + fileSize: int64(smallSize), // 500 KB file + expectedPieceSize: 1 << 20, // Expected to be 
diff --git a/pack/pack_test.go b/pack/pack_test.go
index 8ca70412..d75339ac 100644
--- a/pack/pack_test.go
+++ b/pack/pack_test.go
@@ -6,11 +6,12 @@ import (
 	"path/filepath"
 	"testing"
 
-	"github.com/data-preservation-programs/singularity/model"
-	"github.com/data-preservation-programs/singularity/util/testutil"
 	"github.com/gotidy/ptr"
 	"github.com/stretchr/testify/require"
 	"gorm.io/gorm"
+
+	"github.com/data-preservation-programs/singularity/model"
+	"github.com/data-preservation-programs/singularity/util/testutil"
 )
 
 func TestAssembleCar(t *testing.T) {
@@ -224,3 +225,259 @@ func TestAssembleCar(t *testing.T) {
 		})
 	}
 }
+
+func TestLastPiecePadding(t *testing.T) {
+	// Test last piece padding scenarios
+	tmp := t.TempDir()
+	out := t.TempDir()
+
+	// Create a file that's smaller than min piece size for testing
+	smallSize := 500_000 // 500 KB
+	err := os.WriteFile(filepath.Join(tmp, "small.txt"), testutil.GenerateRandomBytes(smallSize), 0644)
+	require.NoError(t, err)
+	smallStat, err := os.Stat(filepath.Join(tmp, "small.txt"))
+	require.NoError(t, err)
+
+	// Create a file that's larger than min piece size for testing
+	largeSize := 1_500_000 // 1.5 MB (larger than min piece size of 1 MiB)
+	err = os.WriteFile(filepath.Join(tmp, "medium.txt"), testutil.GenerateRandomBytes(largeSize), 0644)
+	require.NoError(t, err)
+	mediumStat, err := os.Stat(filepath.Join(tmp, "medium.txt"))
+	require.NoError(t, err)
+
+	tests := []struct {
+		name                 string
+		pieceSize            int64
+		minPieceSize         int64
+		fileSize             int64
+		expectedPieceSize    int64
+		expectedFileSize     int64
+		expectedFileRanges   int
+		expectedFileRangeLen int64
+	}{
+		{
+			name:                 "last piece smaller than min piece size gets padded to min piece size",
+			pieceSize:            1 << 21,          // 2 MiB piece size
+			minPieceSize:         1 << 20,          // 1 MiB min piece size
+			fileSize:             int64(smallSize), // 500 KB file
+			expectedPieceSize:    1 << 20,          // Expected to be padded to 1 MiB (min piece size)
+			expectedFileSize:     500098,           // Based on actual test results
+			expectedFileRanges:   1,
+			expectedFileRangeLen: int64(smallSize),
+		},
+		{
+			name:                 "last piece larger than min piece size gets padded to next power of two",
+			pieceSize:            1 << 21,          // 2 MiB piece size
+			minPieceSize:         1 << 20,          // 1 MiB min piece size
+			fileSize:             int64(largeSize), // 1.5 MB file
+			expectedPieceSize:    1 << 21,          // Expected to be padded to 2 MiB (next power of 2)
+			expectedFileSize:     1500283,          // Based on actual test results
+			expectedFileRanges:   1,
+			expectedFileRangeLen: int64(largeSize),
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) {
+				// Create job with appropriate file
+				filePath := "small.txt"
+				fileStat := smallStat
+				if tc.fileSize > 1_000_000 {
+					filePath = "medium.txt"
+					fileStat = mediumStat
+				}
+
+				job := model.Job{
+					Type:  model.Pack,
+					State: model.Processing,
+					Attachment: &model.SourceAttachment{
+						Preparation: &model.Preparation{
+							MaxSize:      tc.fileSize + 1000, // Buffer
+							PieceSize:    tc.pieceSize,
+							MinPieceSize: tc.minPieceSize,
+							OutputStorages: []model.Storage{
+								{
+									Name: "out",
+									Type: "local",
+									Path: out,
+								},
+							},
+						},
+						Storage: &model.Storage{
+							Type: "local",
+							Path: tmp,
+						},
+					},
+					FileRanges: []model.FileRange{
+						{
+							Offset: 0,
+							Length: tc.fileSize,
+							File: &model.File{
+								Path:             filePath,
+								Size:             tc.fileSize,
+								LastModifiedNano: fileStat.ModTime().UnixNano(),
+								AttachmentID:     1,
+								Directory: &model.Directory{
+									AttachmentID: 1,
+								},
+							},
+						},
+					},
+				}
+
+				// Create and execute the packing job
+				err := db.Create(&job).Error
+				require.NoError(t, err)
+				car, err := Pack(ctx, db, job)
+				require.NoError(t, err)
+
+				// Verify the car was created successfully
+				require.NotNil(t, car)
+
+				// Log the actual file size for debugging
+				t.Logf("Test case: %s, Expected piece size: %d, Actual piece size: %d, Expected file size: %d, Actual file size: %d",
+					tc.name, tc.expectedPieceSize, car.PieceSize, tc.expectedFileSize, car.FileSize)
+
+				// Verify the piece size is correct (should match our expected padded size)
+				require.Equal(t, tc.expectedPieceSize, car.PieceSize,
+					"Piece size should be padded to expected value")
+
+				// Verify exact file size for regression testing
+				require.Equal(t, tc.expectedFileSize, car.FileSize,
+					"CAR file size should match expected value exactly")
+
+				// Verify correct number of file ranges
+				var fileRanges []model.FileRange
+				err = db.Find(&fileRanges).Error
+				require.NoError(t, err)
+				require.Len(t, fileRanges, tc.expectedFileRanges)
+				require.Equal(t, tc.expectedFileRangeLen, fileRanges[0].Length)
+			})
+		})
+	}
+}
+
+func TestMultiplePiecesWithLastPiece(t *testing.T) {
+	// Test pieces with different sizes and verify the padding behavior
+	tmp := t.TempDir()
+	out := t.TempDir()
+
+	pieceSize := int64(1 << 20) // 1 MiB piece size
+
+	// Create test files of different sizes
+	smallSize := 500_000 // 500 KB (smaller than min piece size of 1 MiB)
+	err := os.WriteFile(filepath.Join(tmp, "small.txt"), testutil.GenerateRandomBytes(smallSize), 0644)
+	require.NoError(t, err)
+	smallStat, err := os.Stat(filepath.Join(tmp, "small.txt"))
+	require.NoError(t, err)
+
+	mediumSize := 1_500_000 // 1.5 MB (larger than min piece size but smaller than piece size)
+	err = os.WriteFile(filepath.Join(tmp, "medium.txt"), testutil.GenerateRandomBytes(mediumSize), 0644)
+	require.NoError(t, err)
+	mediumStat, err := os.Stat(filepath.Join(tmp, "medium.txt"))
+	require.NoError(t, err)
+
+	// Test cases
+	tests := []struct {
+		name              string
+		filePath          string
+		fileStat          os.FileInfo
+		fileSize          int64
+		pieceSize         int64 // Target piece size
+		minPieceSize      int64 // Minimum piece size
+		expectedPieceSize int64 // Expected final piece size after padding
+	}{
+		{
+			name:              "file smaller than min piece size gets padded to min piece size",
+			filePath:          "small.txt",
+			fileStat:          smallStat,
+			fileSize:          int64(smallSize),
+			pieceSize:         pieceSize,     // 1 MiB target
+			minPieceSize:      pieceSize / 2, // 512 KiB min
+			expectedPieceSize: pieceSize / 2, // Padded to 512 KiB (min piece size)
+		},
+		{
+			name:              "file larger than min piece size gets padded to next power of two",
+			filePath:          "medium.txt",
+			fileStat:          mediumStat,
+			fileSize:          int64(mediumSize),
+			pieceSize:         pieceSize,     // 1 MiB target
+			minPieceSize:      pieceSize / 4, // 256 KiB min
+			expectedPieceSize: pieceSize * 2, // Padded to 2 MiB (next power of 2)
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) {
+				// Create job with the test file
+				job := model.Job{
+					Type:  model.Pack,
+					State: model.Processing,
+					Attachment: &model.SourceAttachment{
+						Preparation: &model.Preparation{
+							MaxSize:      tc.fileSize + 1000, // Buffer
+							PieceSize:    tc.pieceSize,       // Target piece size
+							MinPieceSize: tc.minPieceSize,    // Min piece size
+							OutputStorages: []model.Storage{
+								{
+									Name: "out",
+									Type: "local",
+									Path: out,
+								},
+							},
+						},
+						Storage: &model.Storage{
+							Type: "local",
+							Path: tmp,
+						},
+					},
+					FileRanges: []model.FileRange{
+						{
+							Offset: 0,
+							Length: tc.fileSize,
+							File: &model.File{
+								Path:             tc.filePath,
+								Size:             tc.fileSize,
+								LastModifiedNano: tc.fileStat.ModTime().UnixNano(),
+								AttachmentID:     1,
+								Directory: &model.Directory{
+									AttachmentID: 1,
+								},
+							},
+						},
+					},
+				}
+
+				// Create and execute the packing job
+				err := db.Create(&job).Error
+				require.NoError(t, err)
+				car, err := Pack(ctx, db, job)
+				require.NoError(t, err)
+
+				// Verify the car was created successfully
+				require.NotNil(t, car)
+
+				// Verify the piece size is correct (should match our expected padded size)
+				require.Equal(t, tc.expectedPieceSize, car.PieceSize,
+					"Piece size should be padded to expected value")
+
+				// Verify the actual file size is reasonable (specific bytes may vary slightly)
+				// The CAR file size should be at least as large as the input file + some overhead
+				require.GreaterOrEqual(t, car.FileSize, tc.fileSize,
+					"CAR file size should be at least as large as the input file")
+				// And shouldn't be much larger than the file size + overhead
+				require.LessOrEqual(t, car.FileSize, tc.fileSize+1000,
+					"CAR file size shouldn't be excessively larger than the input file")
+
+				// Verify correct number of file ranges
+				var fileRanges []model.FileRange
+				err = db.Find(&fileRanges).Error
+				require.NoError(t, err)
+				require.Len(t, fileRanges, 1)
+				require.Equal(t, tc.fileSize, fileRanges[0].Length)
+			})
+		})
+	}
+}
diff --git a/service/contentprovider/http_test.go b/service/contentprovider/http_test.go
index b6185d1c..50508cd5 100644
--- a/service/contentprovider/http_test.go
+++ b/service/contentprovider/http_test.go
@@ -2,6 +2,7 @@ package contentprovider
 
 import (
 	"context"
+	"encoding/json"
 	"net/http"
 	"net/http/httptest"
 	"os"
@@ -63,6 +64,7 @@ func TestHTTPServerHandler(t *testing.T) {
 		FileSize:      59 + 1 + 36 + 5,
 		StoragePath:   "",
 		PreparationID: 1,
+		PieceType:     model.DataPiece,
 		Attachment: &model.SourceAttachment{
 			Preparation: &model.Preparation{},
 			Storage: &model.Storage{
&model.Storage{ @@ -134,6 +136,14 @@ func TestHTTPServerHandler(t *testing.T) { if test.cbor { require.Equal(t, "application/cbor", rec.Header().Get(echo.HeaderContentType)) } + + // For successful responses, validate the piece_type field + if test.code == http.StatusOK && !test.cbor { + var metadata PieceMetadata + err = json.Unmarshal(rec.Body.Bytes(), &metadata) + require.NoError(t, err) + require.Equal(t, model.DataPiece, metadata.Car.PieceType) + } }) t.Run(test.name, func(t *testing.T) { @@ -149,6 +159,50 @@ func TestHTTPServerHandler(t *testing.T) { }) } + // Test DAG piece type + t.Run("dag_piece_metadata", func(t *testing.T) { + preparation := &model.Preparation{Name: "test_prep_dag"} + err := db.Create(preparation).Error + require.NoError(t, err) + + storage := &model.Storage{Name: "test_storage_dag", Type: "local"} + err = db.Create(storage).Error + require.NoError(t, err) + + attachment := &model.SourceAttachment{ + PreparationID: preparation.ID, + StorageID: storage.ID, + } + err = db.Create(attachment).Error + require.NoError(t, err) + + dagPieceCID := cid.NewCidV1(cid.FilCommitmentUnsealed, util.Hash([]byte("dag_test"))) + err = db.Create(&model.Car{ + PieceCID: model.CID(dagPieceCID), + PieceSize: 256, + PreparationID: preparation.ID, + PieceType: model.DagPiece, + AttachmentID: &attachment.ID, + RootCID: model.CID(testutil.TestCid), + }).Error + require.NoError(t, err) + + req := httptest.NewRequest(http.MethodGet, "/piece/metadata/:id", nil) + rec := httptest.NewRecorder() + c := e.NewContext(req, rec) + c.SetPath("/piece/metadata/:id") + c.SetParamNames("id") + c.SetParamValues(dagPieceCID.String()) + err = s.getMetadataHandler(c) + require.NoError(t, err) + require.Equal(t, http.StatusOK, rec.Code) + + var metadata PieceMetadata + err = json.Unmarshal(rec.Body.Bytes(), &metadata) + require.NoError(t, err) + require.Equal(t, model.DagPiece, metadata.Car.PieceType) + }) + // Add car file tmp := t.TempDir() err = db.Model(&model.Car{}).Where("id = ?", 1).Update("file_path", filepath.Join(tmp, "test.car")).Error diff --git a/service/datasetworker/daggen.go b/service/datasetworker/daggen.go index bbf6b474..e76494d6 100644 --- a/service/datasetworker/daggen.go +++ b/service/datasetworker/daggen.go @@ -189,7 +189,7 @@ func (w *Thread) ExportDag(ctx context.Context, job model.Job) error { } db := w.dbNoContext.WithContext(ctx) - pieceSize := job.Attachment.Preparation.PieceSize + pieceSize := job.Attachment.Preparation.GetMinPieceSize() // storageWriter can be nil for inline preparation storageID, storageWriter, err := storagesystem.GetRandomOutputWriter(ctx, job.Attachment.Preparation.OutputStorages) if err != nil { @@ -255,6 +255,7 @@ func (w *Thread) ExportDag(ctx context.Context, job model.Job) error { StoragePath: filename, AttachmentID: &job.AttachmentID, PreparationID: job.Attachment.PreparationID, + PieceType: model.DagPiece, } err = database.DoRetry(ctx, func() error { diff --git a/util/testutil/testutils.go b/util/testutil/testutils.go index 10881e82..064fdbe5 100644 --- a/util/testutil/testutils.go +++ b/util/testutil/testutils.go @@ -117,7 +117,7 @@ func OneWithoutReset(t *testing.T, testFunc func(ctx context.Context, t *testing backend := SupportedTestDialects[0] db, closer, connStr := getTestDB(t, backend) if db == nil { - t.Log("Skip " + backend) + t.Skip("Skip " + backend + " - database not available") return } defer closer.Close() @@ -135,7 +135,7 @@ func doOne(t *testing.T, backend string, testFunc func(ctx context.Context, t *t t.Helper() db, closer, connStr 
diff --git a/util/testutil/testutils.go b/util/testutil/testutils.go
index 10881e82..064fdbe5 100644
--- a/util/testutil/testutils.go
+++ b/util/testutil/testutils.go
@@ -117,7 +117,7 @@ func OneWithoutReset(t *testing.T, testFunc func(ctx context.Context, t *testing
 	backend := SupportedTestDialects[0]
 	db, closer, connStr := getTestDB(t, backend)
 	if db == nil {
-		t.Log("Skip " + backend)
+		t.Skip("Skip " + backend + " - database not available")
 		return
 	}
 	defer closer.Close()
@@ -135,7 +135,7 @@ func doOne(t *testing.T, backend string, testFunc func(ctx context.Context, t *t
 	t.Helper()
 	db, closer, connStr := getTestDB(t, backend)
 	if db == nil {
-		t.Log("Skip " + backend)
+		t.Skip("Skip " + backend + " - database not available")
 		return
 	}
 	defer closer.Close()
diff --git a/util/util.go b/util/util.go
index d3bfecd7..dddfec4f 100644
--- a/util/util.go
+++ b/util/util.go
@@ -42,6 +42,14 @@ func NextPowerOfTwo(x uint64) uint64 {
 	return 1 << pos
 }
 
+// IsPowerOfTwo returns true if x is a power of two.
+func IsPowerOfTwo(x uint64) bool {
+	if x == 0 {
+		return false
+	}
+	return (x & (x - 1)) == 0
+}
+
 // NewLotusClient is a function that creates a new JSON-RPC client for interacting with a Lotus node.
 // It takes the Lotus API endpoint and an optional Lotus token as input.
 // If the Lotus token is provided, it is included in the 'Authorization' header of the JSON-RPC requests.
diff --git a/version.json b/version.json
index 16e19ce2..b2ca1e59 100644
--- a/version.json
+++ b/version.json
@@ -1,3 +1,3 @@
 {
-  "version": "v0.5.17-RC1"
+  "version": "v0.6.0-RC1"
 }
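Footnote on `util.IsPowerOfTwo`: it uses the standard bit trick that a power of two has exactly one set bit, so `x & (x - 1)` clears that bit to zero; the explicit zero guard is needed because `0 & (0 - 1)` is also zero and 0 would otherwise be misreported as a power of two. A few values consistent with `NextPowerOfTwo`:

```go
package main

import (
	"fmt"

	"github.com/data-preservation-programs/singularity/util"
)

func main() {
	fmt.Println(util.IsPowerOfTwo(0))         // false - guarded explicitly
	fmt.Println(util.IsPowerOfTwo(1 << 20))   // true  - the 1MiB default min piece size
	fmt.Println(util.IsPowerOfTwo(1<<20 + 1)) // false - NextPowerOfTwo would round it to 1<<21
}
```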