diff --git a/puffin/puffin.go b/puffin/puffin.go
new file mode 100644
index 000000000..3482080a2
--- /dev/null
+++ b/puffin/puffin.go
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Package puffin provides reading and writing of Puffin files.
+//
+// Puffin is a file format designed to store statistics and indexes
+// for Iceberg tables. A Puffin file contains blobs (opaque byte sequences)
+// with associated metadata, such as Apache DataSketches or deletion vectors.
+//
+// File structure:
+//
+//	[Magic] [Blob]* [Magic] [Footer Payload] [Footer Payload Size] [Flags] [Magic]
+//
+// See the specification at https://iceberg.apache.org/puffin-spec/
+package puffin
+
+var magic = [4]byte{'P', 'F', 'A', '1'}
+
+const (
+	// MagicSize is the number of bytes in the magic marker.
+	MagicSize = 4
+
+	// footerTrailerSize accounts for footer payload size (4) + flags (4) + trailing magic (4).
+	footerTrailerSize = 12
+
+	// FooterFlagCompressed indicates a compressed footer; unsupported in this implementation.
+	FooterFlagCompressed = 1 // bit 0
+
+	// DefaultMaxBlobSize is the maximum blob size allowed when reading (256 MB).
+	// It guards against out-of-memory conditions caused by corrupt or malicious blob lengths.
+	// Override with WithMaxBlobSize when creating a reader.
+	DefaultMaxBlobSize = 256 << 20
+
+	// CreatedBy is a human-readable identification of the application writing the file, along with its version.
+	// Example: "Trino version 381".
+	CreatedBy = "created-by"
+)
+
+// BlobMetadata describes a single blob stored in a Puffin file: its type, its location within the file, and associated properties.
+type BlobMetadata struct {
+	Type             BlobType          `json:"type"`
+	SnapshotID       int64             `json:"snapshot-id"`
+	SequenceNumber   int64             `json:"sequence-number"`
+	Fields           []int32           `json:"fields"`
+	Offset           int64             `json:"offset"`
+	Length           int64             `json:"length"`
+	CompressionCodec *string           `json:"compression-codec,omitempty"`
+	Properties       map[string]string `json:"properties,omitempty"`
+}
+
+// Footer describes the blobs and file-level properties stored in a Puffin file.
+type Footer struct {
+	Blobs      []BlobMetadata    `json:"blobs"`
+	Properties map[string]string `json:"properties,omitempty"`
+}
+
+type BlobType string
+
+const (
+	// BlobTypeDataSketchesTheta is a serialized compact Theta sketch
+	// produced by the Apache DataSketches library.
+	BlobTypeDataSketchesTheta BlobType = "apache-datasketches-theta-v1"
+
+	// BlobTypeDeletionVector is a serialized deletion vector per the
+	// Iceberg spec. Requires snapshot-id and sequence-number to be -1.
+	BlobTypeDeletionVector BlobType = "deletion-vector-v1"
+)
diff --git a/puffin/puffin_reader.go b/puffin/puffin_reader.go
new file mode 100644
index 000000000..5064e1648
--- /dev/null
+++ b/puffin/puffin_reader.go
@@ -0,0 +1,376 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package puffin + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "sort" +) + +// ReaderAtSeeker combines io.ReaderAt and io.Seeker for reading Puffin files. +// This interface is implemented by *os.File, *bytes.Reader, and similar types. +type ReaderAtSeeker interface { + io.ReaderAt + io.Seeker +} + +// Reader reads blobs and metadata from a Puffin file. +// +// Usage: +// +// r, err := puffin.NewReader(file) +// if err != nil { +// return err +// } +// for i := range r.Footer().Blobs { +// blob, err := r.ReadBlob(i) +// // process blob.Data +// } +type Reader struct { + r ReaderAtSeeker + size int64 + footer *Footer + footerStart int64 // cached after ReadFooter + maxBlobSize int64 +} + +// BlobData pairs a blob's metadata with its content. +type BlobData struct { + Metadata BlobMetadata + Data []byte +} + +// ReaderOption configures a Reader. +type ReaderOption func(*Reader) + +// WithMaxBlobSize sets the maximum blob size allowed when reading. +// This prevents OOM attacks from malicious files with huge blob lengths. +// Default is DefaultMaxBlobSize (256 MB). +func WithMaxBlobSize(size int64) ReaderOption { + return func(r *Reader) { + r.maxBlobSize = size + } +} + +// NewReader creates a new Puffin file reader. +// The file size is auto-detected using Seek. +// It validates magic bytes and reads the footer eagerly. +// The caller is responsible for closing the underlying reader. +func NewReader(r ReaderAtSeeker, opts ...ReaderOption) (*Reader, error) { + if r == nil { + return nil, errors.New("puffin: reader is nil") + } + + // Auto-detect file size + size, err := r.Seek(0, io.SeekEnd) + if err != nil { + return nil, fmt.Errorf("puffin: detect file size: %w", err) + } + + // Minimum size: header magic + footer magic + footer trailer + // [Magic] + zero for blob + [Magic] + [FooterPayloadSize (assuming ~0)] + [Flags] + [Magic] + minSize := int64(MagicSize + MagicSize + footerTrailerSize) + if size < minSize { + return nil, fmt.Errorf("puffin: file too small (%d bytes, minimum %d)", size, minSize) + } + + // Validate header magic + var headerMagic [MagicSize]byte + if _, err := r.ReadAt(headerMagic[:], 0); err != nil { + return nil, fmt.Errorf("puffin: read header magic: %w", err) + } + if !bytes.Equal(headerMagic[:], magic[:]) { + return nil, errors.New("puffin: invalid header magic") + } + + pr := &Reader{ + r: r, + size: size, + maxBlobSize: DefaultMaxBlobSize, + } + + for _, opt := range opts { + opt(pr) + } + + // Read footer + if _, err := pr.readFooter(); err != nil { + return nil, err + } + + return pr, nil +} + +// Footer returns the parsed footer metadata. +// The footer is read during NewReader, so this always returns a valid footer. 
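As a usage sketch for the Reader API above (the file name and the 64 MB cap are illustrative; any type satisfying ReaderAtSeeker works, including *os.File as used here):

```go
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/apache/iceberg-go/puffin"
)

func main() {
	// *os.File satisfies ReaderAtSeeker (io.ReaderAt + io.Seeker).
	f, err := os.Open("table-stats.puffin") // hypothetical path
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// Lower the per-blob size cap from the 256 MB default to 64 MB.
	r, err := puffin.NewReader(f, puffin.WithMaxBlobSize(64<<20))
	if err != nil {
		log.Fatal(err)
	}

	for i, meta := range r.Footer().Blobs {
		blob, err := r.ReadBlob(i)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("blob %d: type=%s fields=%v size=%d\n", i, meta.Type, meta.Fields, len(blob.Data))
	}
}
```

Footer, defined next, is the accessor used in the loop above; it never returns nil because NewReader fails if the footer cannot be parsed.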
+func (r *Reader) Footer() *Footer { + return r.footer +} + +// defaultFooterReadSize is the initial read size when reading the footer. +// We read more than strictly needed to hopefully get the entire footer +// in one read, reducing round-trips on cloud object storage. +const defaultFooterReadSize = 8 * 1024 // 8 KB + +// readFooter reads and parses the footer from the Puffin file. +func (r *Reader) readFooter() (*Footer, error) { + if r.footer != nil { + return r.footer, nil + } + + // Read a larger chunk from the end to minimize round-trips on cloud storage. + // This often captures the entire footer in one read. + readSize := min(int64(defaultFooterReadSize), r.size) + buf := make([]byte, readSize) + if _, err := r.r.ReadAt(buf, r.size-readSize); err != nil { + return nil, fmt.Errorf("puffin: read footer region: %w", err) + } + + // Parse trailer from end of buffer: PayloadSize(4) + Flags(4) + Magic(4) + trailer := buf[len(buf)-footerTrailerSize:] + + // Validate trailing magic + if !bytes.Equal(trailer[8:12], magic[:]) { + return nil, errors.New("puffin: invalid trailing magic in footer") + } + + // Extract payload size and flags + payloadSize := int64(binary.LittleEndian.Uint32(trailer[0:4])) + flags := binary.LittleEndian.Uint32(trailer[4:8]) + + // Check for compressed footer (unsupported) + if flags&FooterFlagCompressed != 0 { + return nil, errors.New("puffin: compressed footer not supported") + } + + // Check for unknown flags + if flags&^uint32(FooterFlagCompressed) != 0 { + return nil, fmt.Errorf("puffin: unknown footer flags set: 0x%x", flags) + } + + // Validate payload size + if payloadSize < 0 { + return nil, fmt.Errorf("puffin: invalid footer payload size %d", payloadSize) + } + + // Calculate footer start position + // Layout: [header magic (4)] [blobs...] [footer magic (4)] [JSON (payloadSize)] [trailer (12)] + footerStart := r.size - footerTrailerSize - payloadSize - MagicSize + if footerStart < MagicSize { + return nil, fmt.Errorf("puffin: footer payload size %d exceeds available space", payloadSize) + } + + // Total footer size: magic(4) + payload + trailer(12) + totalFooterSize := MagicSize + payloadSize + footerTrailerSize + + // Validate footer start magic + if totalFooterSize <= readSize { + // We already have the footer magic in buf + footerOffset := len(buf) - int(totalFooterSize) + if !bytes.Equal(buf[footerOffset:footerOffset+MagicSize], magic[:]) { + return nil, errors.New("puffin: invalid footer start magic") + } + } else { + // Footer is larger than our initial read, need to read magic separately + var footerMagic [MagicSize]byte + if _, err := r.r.ReadAt(footerMagic[:], footerStart); err != nil { + return nil, fmt.Errorf("puffin: read footer start magic: %w", err) + } + if !bytes.Equal(footerMagic[:], magic[:]) { + return nil, errors.New("puffin: invalid footer start magic") + } + } + + payloadReader := io.NewSectionReader(r.r, footerStart+MagicSize, payloadSize) + var footer Footer + if err := json.NewDecoder(payloadReader).Decode(&footer); err != nil { + return nil, fmt.Errorf("puffin: decode footer JSON: %w", err) + } + + // Validate blob metadata + if err := r.validateBlobs(footer.Blobs, footerStart); err != nil { + return nil, err + } + + // Cache footer and footerStart + r.footer = &footer + r.footerStart = footerStart + + return r.footer, nil +} + +// ReadBlob reads the content of a specific blob by index. +// The footer is read automatically if not already cached. 
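As an aside before ReadBlob, the trailer handling in readFooter above can be illustrated with a small standalone sketch. The 12-byte layout and the "PFA1" literal mirror the package constants, but decodeTrailer itself is hypothetical and not part of the package API:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// decodeTrailer mirrors what readFooter does with the last 12 bytes of a file:
// footer payload size (4, little-endian), flags (4, little-endian), magic "PFA1" (4).
func decodeTrailer(trailer [12]byte) (payloadSize int64, flags uint32, magicOK bool) {
	payloadSize = int64(binary.LittleEndian.Uint32(trailer[0:4]))
	flags = binary.LittleEndian.Uint32(trailer[4:8])
	magicOK = bytes.Equal(trailer[8:12], []byte("PFA1"))

	return payloadSize, flags, magicOK
}

func main() {
	// Trailer for an uncompressed footer whose JSON payload is 70 bytes long.
	var t [12]byte
	binary.LittleEndian.PutUint32(t[0:4], 70)
	copy(t[8:12], "PFA1")

	fmt.Println(decodeTrailer(t)) // 70 0 true
}
```

ReadBlob below then serves individual blobs using the metadata parsed from that payload.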
+func (r *Reader) ReadBlob(index int) (*BlobData, error) {
+	footer := r.footer
+
+	if index < 0 || index >= len(footer.Blobs) {
+		return nil, fmt.Errorf("puffin: blob index %d out of range [0, %d)", index, len(footer.Blobs))
+	}
+
+	meta := footer.Blobs[index]
+	data, err := r.readBlobData(meta)
+	if err != nil {
+		return nil, err
+	}
+
+	return &BlobData{Metadata: meta, Data: data}, nil
+}
+
+// ReadBlobByMetadata reads a blob using its metadata directly.
+// This is useful when you have metadata from an external source.
+func (r *Reader) ReadBlobByMetadata(meta BlobMetadata) ([]byte, error) {
+	return r.readBlobData(meta)
+}
+
+// readBlobData is the internal implementation for reading blob data.
+func (r *Reader) readBlobData(meta BlobMetadata) ([]byte, error) {
+	// Validate blob type
+	if meta.Type == "" {
+		return nil, errors.New("puffin: cannot read blob: type is required")
+	}
+
+	// Check for compressed blob (unsupported)
+	if meta.CompressionCodec != nil && *meta.CompressionCodec != "" {
+		return nil, fmt.Errorf("puffin: cannot read blob: compression %q not supported", *meta.CompressionCodec)
+	}
+
+	// Validate offset/length
+	if err := r.validateRange(meta.Offset, meta.Length); err != nil {
+		return nil, fmt.Errorf("puffin: blob: %w", err)
+	}
+
+	// Read blob data
+	data := make([]byte, meta.Length)
+	if _, err := r.r.ReadAt(data, meta.Offset); err != nil {
+		return nil, fmt.Errorf("puffin: read blob data: %w", err)
+	}
+
+	return data, nil
+}
+
+// ReadAllBlobs reads all blobs from the file.
+func (r *Reader) ReadAllBlobs() ([]*BlobData, error) {
+	footer := r.footer
+
+	if len(footer.Blobs) == 0 {
+		return nil, nil
+	}
+
+	// Create index mapping to preserve original order
+	type indexedBlob struct {
+		index int
+		meta  BlobMetadata
+	}
+	indexed := make([]indexedBlob, len(footer.Blobs))
+	for i, meta := range footer.Blobs {
+		indexed[i] = indexedBlob{index: i, meta: meta}
+	}
+
+	// Sort by offset for sequential I/O
+	sort.Slice(indexed, func(i, j int) bool {
+		return indexed[i].meta.Offset < indexed[j].meta.Offset
+	})
+
+	// Read blobs in offset order, store in original order
+	results := make([]*BlobData, len(footer.Blobs))
+	for _, ib := range indexed {
+		data, err := r.readBlobData(ib.meta)
+		if err != nil {
+			return nil, fmt.Errorf("puffin: read blob %d: %w", ib.index, err)
+		}
+		results[ib.index] = &BlobData{Metadata: ib.meta, Data: data}
+	}
+
+	return results, nil
+}
+
+// ReadAt implements io.ReaderAt, reading directly from the blob data region
+// and validating that the requested range stays within it.
+// This is useful for the deletion vector use case, where the offset and length
+// recorded in table metadata point directly into the Puffin file.
+func (r *Reader) ReadAt(p []byte, off int64) (n int, err error) {
+	if err := r.validateRange(off, int64(len(p))); err != nil {
+		return 0, fmt.Errorf("puffin: %w", err)
+	}
+
+	return r.r.ReadAt(p, off)
+}
+
+// validateRange validates offset and length for reading from the blob data region.
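As an aside, the ReadAt method above is meant for callers that already hold an offset/length pair pointing into the Puffin file, such as deletion-vector references. A minimal sketch, where readDeletionVectorBytes and its arguments are hypothetical placeholders for values supplied by table metadata:

```go
package example

import "github.com/apache/iceberg-go/puffin"

// readDeletionVectorBytes reads length bytes starting at offset through the
// Reader's bounds-checked io.ReaderAt implementation. In practice offset and
// length would come from table metadata referencing this Puffin file.
func readDeletionVectorBytes(r *puffin.Reader, offset, length int64) ([]byte, error) {
	buf := make([]byte, length)
	if _, err := r.ReadAt(buf, offset); err != nil {
		return nil, err
	}

	return buf, nil
}
```

The validateRange helper below is what rejects ranges that start before the header magic or reach into the footer.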
+func (r *Reader) validateRange(offset, length int64) error { + if length < 0 { + return fmt.Errorf("invalid length %d", length) + } + if length > r.maxBlobSize { + return fmt.Errorf("size %d exceeds limit %d", length, r.maxBlobSize) + } + if offset < MagicSize { + return fmt.Errorf("invalid offset %d (before header)", offset) + } + + end := offset + length + if end < offset { + return fmt.Errorf("offset+length overflow: offset=%d length=%d", offset, length) + } + if end > r.footerStart { + return fmt.Errorf("extends into footer: offset=%d length=%d footerStart=%d", + offset, length, r.footerStart) + } + + return nil +} + +// validateBlobs validates all blob metadata entries. +func (r *Reader) validateBlobs(blobs []BlobMetadata, footerStart int64) error { + for i, blob := range blobs { + // Type is required + if blob.Type == "" { + return fmt.Errorf("puffin: blob %d: type is required", i) + } + + // Length must be non-negative + if blob.Length < 0 { + return fmt.Errorf("puffin: blob %d: invalid length %d", i, blob.Length) + } + + // Offset must be after header magic + if blob.Offset < MagicSize { + return fmt.Errorf("puffin: blob %d: offset %d before header", i, blob.Offset) + } + + // Check for overflow + end := blob.Offset + blob.Length + if end < blob.Offset { + return fmt.Errorf("puffin: blob %d: offset+length overflow: offset=%d length=%d", i, blob.Offset, blob.Length) + } + + // Blob must not extend into footer + if end > footerStart { + return fmt.Errorf("puffin: blob %d: extends into footer: offset=%d length=%d footerStart=%d", + i, blob.Offset, blob.Length, footerStart) + } + } + + return nil +} diff --git a/puffin/puffin_test.go b/puffin/puffin_test.go new file mode 100644 index 000000000..f768333f6 --- /dev/null +++ b/puffin/puffin_test.go @@ -0,0 +1,444 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
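For reference while reading the tests that follow, the footer payload written between the footer magic and the trailer is plain JSON marshalled from the Footer and BlobMetadata structs. A sketch of what it serializes to for a single theta-sketch blob (the offset, length, and property values are illustrative):

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/apache/iceberg-go/puffin"
)

func main() {
	// The same structure the writer marshals into the footer payload.
	f := puffin.Footer{
		Blobs: []puffin.BlobMetadata{{
			Type:           puffin.BlobTypeDataSketchesTheta,
			SnapshotID:     123,
			SequenceNumber: 1,
			Fields:         []int32{1},
			Offset:         4,  // first blob starts right after the header magic
			Length:         22, // illustrative
		}},
		Properties: map[string]string{"created-by": "example-app"},
	}

	out, _ := json.Marshal(f)
	fmt.Println(string(out))
	// {"blobs":[{"type":"apache-datasketches-theta-v1","snapshot-id":123,
	//  "sequence-number":1,"fields":[1],"offset":4,"length":22}],
	//  "properties":{"created-by":"example-app"}}
}
```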
+ +package puffin_test + +import ( + "bytes" + "math" + "testing" + + "github.com/apache/iceberg-go/puffin" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// --- Test Helpers --- + +func newWriter() (*puffin.Writer, *bytes.Buffer) { + buf := &bytes.Buffer{} + w, _ := puffin.NewWriter(buf) + + return w, buf +} + +func newReader(t *testing.T, buf *bytes.Buffer) *puffin.Reader { + r, err := puffin.NewReader(bytes.NewReader(buf.Bytes())) + require.NoError(t, err) + + return r +} + +func defaultBlobInput() puffin.BlobMetadataInput { + return puffin.BlobMetadataInput{ + Type: puffin.BlobTypeDataSketchesTheta, + Fields: []int32{}, + } +} + +func validFile() []byte { + w, buf := newWriter() + w.Finish() + + return buf.Bytes() +} + +func validFileWithBlob() []byte { + w, buf := newWriter() + w.AddBlob(defaultBlobInput(), []byte("test data")) + w.Finish() + + return buf.Bytes() +} + +// --- Tests --- + +// TestRoundTrip verifies that data written by Writer can be read back by Reader. +// This is the core integration test ensuring the puffin format is correctly implemented. +func TestRoundTrip(t *testing.T) { + blob1Data := []byte("theta sketch data here") + blob2Data := []byte("another blob with different content") + + w, buf := newWriter() + w.AddProperties(map[string]string{"test-property": "test-value"}) + + meta1, err := w.AddBlob(puffin.BlobMetadataInput{ + Type: puffin.BlobTypeDataSketchesTheta, + SnapshotID: 123, + SequenceNumber: 1, + Fields: []int32{1, 2, 3}, + Properties: map[string]string{"ndv": "1000"}, + }, blob1Data) + require.NoError(t, err) + + meta2, err := w.AddBlob(puffin.BlobMetadataInput{ + Type: puffin.BlobTypeDeletionVector, + SnapshotID: -1, + SequenceNumber: -1, + Fields: []int32{}, + Properties: map[string]string{"referenced-data-file": "data/file.parquet"}, + }, blob2Data) + require.NoError(t, err) + require.NoError(t, w.Finish()) + + r := newReader(t, buf) + footer := r.Footer() + + assert.Len(t, footer.Blobs, 2) + assert.Equal(t, "test-value", footer.Properties["test-property"]) + assert.Contains(t, footer.Properties[puffin.CreatedBy], "iceberg-go") + + // Verify blob 1 + assert.Equal(t, puffin.BlobTypeDataSketchesTheta, footer.Blobs[0].Type) + assert.Equal(t, int64(123), footer.Blobs[0].SnapshotID) + assert.Equal(t, int64(1), footer.Blobs[0].SequenceNumber) + assert.Equal(t, []int32{1, 2, 3}, footer.Blobs[0].Fields) + assert.Equal(t, meta1.Offset, footer.Blobs[0].Offset) + assert.Equal(t, meta1.Length, footer.Blobs[0].Length) + assert.Equal(t, "1000", footer.Blobs[0].Properties["ndv"]) + + // Verify blob 2 + assert.Equal(t, puffin.BlobTypeDeletionVector, footer.Blobs[1].Type) + assert.Equal(t, int64(-1), footer.Blobs[1].SnapshotID) + assert.Equal(t, int64(-1), footer.Blobs[1].SequenceNumber) + assert.Equal(t, meta2.Offset, footer.Blobs[1].Offset) + assert.Equal(t, meta2.Length, footer.Blobs[1].Length) + + // Verify data + blobData1, _ := r.ReadBlob(0) + assert.Equal(t, blob1Data, blobData1.Data) + + blobData2, _ := r.ReadBlob(1) + assert.Equal(t, blob2Data, blobData2.Data) + + allBlobs, _ := r.ReadAllBlobs() + assert.Len(t, allBlobs, 2) + assert.Equal(t, blob1Data, allBlobs[0].Data) + assert.Equal(t, blob2Data, allBlobs[1].Data) +} + +// TestEmptyFile verifies that a puffin file with no blobs is valid. +// Empty files are valid per spec and used when no statistics exist yet. 
+func TestEmptyFile(t *testing.T) { + w, buf := newWriter() + require.NoError(t, w.Finish()) + + r := newReader(t, buf) + assert.Len(t, r.Footer().Blobs, 0) + assert.Contains(t, r.Footer().Properties[puffin.CreatedBy], "iceberg-go") + + blobs, err := r.ReadAllBlobs() + require.NoError(t, err) + assert.Nil(t, blobs) +} + +// TestEmptyBlobData verifies that a zero-length blob is valid. +// Some blob types may legitimately have empty content. +func TestEmptyBlobData(t *testing.T) { + w, buf := newWriter() + meta, err := w.AddBlob(defaultBlobInput(), []byte{}) + require.NoError(t, err) + assert.Equal(t, int64(0), meta.Length) + w.Finish() + + r := newReader(t, buf) + blob, _ := r.ReadBlob(0) + assert.Empty(t, blob.Data) +} + +// TestLargeFooter verifies reading works when footer exceeds initial 8KB read buffer. +// This exercises the code path where footer requires a second read from storage. +func TestLargeFooter(t *testing.T) { + w, buf := newWriter() + numBlobs := 200 + for i := range numBlobs { + w.AddBlob(puffin.BlobMetadataInput{ + Type: puffin.BlobTypeDataSketchesTheta, + SnapshotID: int64(i), + Fields: []int32{1, 2, 3, 4, 5}, + Properties: map[string]string{"key": "value-to-increase-footer-size"}, + }, []byte("blob")) + } + w.Finish() + + r := newReader(t, buf) + assert.Len(t, r.Footer().Blobs, numBlobs) +} + +// TestWriterValidation verifies that Writer rejects invalid input. +// Proper validation prevents corrupt files and provides clear error messages. +func TestWriterValidation(t *testing.T) { + // nil writer: Ensures graceful failure when no underlying writer is provided. + t.Run("nil writer", func(t *testing.T) { + _, err := puffin.NewWriter(nil) + assert.ErrorContains(t, err, "nil") + }) + + // missing type: Blob type is required per spec to identify the blob format. + t.Run("missing type", func(t *testing.T) { + w, _ := newWriter() + _, err := w.AddBlob(puffin.BlobMetadataInput{Type: "", Fields: []int32{}}, []byte("x")) + assert.ErrorContains(t, err, "type") + }) + + // nil fields: Fields slice must be non-nil per spec (empty slice is valid). + t.Run("nil fields", func(t *testing.T) { + w, _ := newWriter() + _, err := w.AddBlob(puffin.BlobMetadataInput{Type: puffin.BlobTypeDataSketchesTheta, Fields: nil}, []byte("x")) + assert.ErrorContains(t, err, "fields") + }) + + // add blob after finish: Enforces writer state machine - no writes after finalization. + t.Run("add blob after finish", func(t *testing.T) { + w, _ := newWriter() + w.Finish() + _, err := w.AddBlob(defaultBlobInput(), []byte("x")) + assert.ErrorContains(t, err, "finalized") + }) + + // double finish: Prevents accidental double-write of footer corrupting the file. + t.Run("double finish", func(t *testing.T) { + w, _ := newWriter() + w.Finish() + assert.ErrorContains(t, w.Finish(), "finalized") + }) + + // deletion vector invalid snapshot-id: Spec requires snapshot-id=-1 for deletion vectors. + t.Run("deletion vector invalid snapshot-id", func(t *testing.T) { + w, _ := newWriter() + _, err := w.AddBlob(puffin.BlobMetadataInput{ + Type: puffin.BlobTypeDeletionVector, SnapshotID: 123, SequenceNumber: -1, Fields: []int32{}, + }, []byte("x")) + assert.ErrorContains(t, err, "snapshot-id") + }) + + // deletion vector invalid sequence-number: Spec requires sequence-number=-1 for deletion vectors. 
+ t.Run("deletion vector invalid sequence-number", func(t *testing.T) { + w, _ := newWriter() + _, err := w.AddBlob(puffin.BlobMetadataInput{ + Type: puffin.BlobTypeDeletionVector, SnapshotID: -1, SequenceNumber: 5, Fields: []int32{}, + }, []byte("x")) + assert.ErrorContains(t, err, "sequence-number") + }) +} + +// TestSetCreatedBy verifies the SetCreatedBy method. +// Allows applications to identify themselves in puffin files for debugging. +func TestSetCreatedBy(t *testing.T) { + // custom value: Verifies custom application identifiers are preserved in footer. + t.Run("custom value", func(t *testing.T) { + w, buf := newWriter() + require.NoError(t, w.SetCreatedBy("MyApp 1.0")) + w.Finish() + r := newReader(t, buf) + assert.Equal(t, "MyApp 1.0", r.Footer().Properties[puffin.CreatedBy]) + }) + + // empty rejected: Empty identifier provides no value and likely indicates a bug. + t.Run("empty rejected", func(t *testing.T) { + w, _ := newWriter() + assert.ErrorContains(t, w.SetCreatedBy(""), "empty") + }) + + // after finish rejected: Enforces writer state machine consistency. + t.Run("after finish rejected", func(t *testing.T) { + w, _ := newWriter() + w.Finish() + assert.ErrorContains(t, w.SetCreatedBy("x"), "finalized") + }) +} + +// TestClearProperties verifies the ClearProperties method. +// Allows resetting properties before finishing if initial values were wrong. +func TestClearProperties(t *testing.T) { + w, buf := newWriter() + w.AddProperties(map[string]string{"key": "value"}) + w.ClearProperties() + w.Finish() + + r := newReader(t, buf) + _, exists := r.Footer().Properties["key"] + assert.False(t, exists) +} + +// TestAddPropertiesAfterFinish verifies AddProperties rejects calls after finish. +// Enforces writer state machine - properties cannot be added after footer is written. +func TestAddPropertiesAfterFinish(t *testing.T) { + w, _ := newWriter() + w.Finish() + assert.ErrorContains(t, w.AddProperties(map[string]string{"k": "v"}), "finalized") +} + +// TestReaderInvalidFile verifies that Reader rejects invalid/corrupt files. +// Early detection of corruption prevents silent data loss or security issues. +func TestReaderInvalidFile(t *testing.T) { + tests := []struct { + name string + data func() []byte + wantErr string + }{ + // file too small: Minimum valid puffin file has header magic + footer, rejects truncated files. + {"file too small", func() []byte { return []byte("tiny") }, "too small"}, + // invalid header magic: First 4 bytes must be 'PFA1' to identify puffin format. + {"invalid header magic", func() []byte { + d := validFile() + d[0] = 'X' + + return d + }, "magic"}, + // invalid trailing magic: Last 4 bytes must be 'PFA1' to detect truncation. + {"invalid trailing magic", func() []byte { + d := validFile() + d[len(d)-1] = 'X' + + return d + }, "magic"}, + // invalid footer start magic: Footer section must start with 'PFA1' for integrity. + {"invalid footer start magic", func() []byte { + d := validFile() + d[4] = 'X' + + return d + }, "magic"}, + // unknown flags: Reject files with unsupported features to avoid misinterpretation. + {"unknown flags", func() []byte { + d := validFile() + d[len(d)-8] = 0x80 + + return d + }, "unknown"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + data := tt.data() + _, err := puffin.NewReader(bytes.NewReader(data)) + assert.ErrorContains(t, err, tt.wantErr) + }) + } + + // nil reader: Ensures graceful failure when no underlying reader is provided. 
+ t.Run("nil reader", func(t *testing.T) { + _, err := puffin.NewReader(nil) + assert.ErrorContains(t, err, "nil") + }) +} + +// TestReaderBlobAccess verifies blob access methods work correctly. +// Tests the primary API for retrieving blob data from puffin files. +func TestReaderBlobAccess(t *testing.T) { + w, buf := newWriter() + blobs := [][]byte{[]byte("first"), []byte("second"), []byte("third")} + for _, b := range blobs { + w.AddBlob(defaultBlobInput(), b) + } + w.Finish() + r := newReader(t, buf) + + // read by index: Primary access method for retrieving blobs sequentially. + t.Run("read by index", func(t *testing.T) { + for i, expected := range blobs { + blob, _ := r.ReadBlob(i) + assert.Equal(t, expected, blob.Data) + } + }) + + // index out of range: Prevents panic and provides clear error for invalid indices. + t.Run("index out of range", func(t *testing.T) { + _, err := r.ReadBlob(-1) + assert.Error(t, err) + _, err = r.ReadBlob(100) + assert.Error(t, err) + }) + + // read by metadata: Allows direct access when caller has metadata from external source. + t.Run("read by metadata", func(t *testing.T) { + data, _ := r.ReadBlobByMetadata(r.Footer().Blobs[1]) + assert.Equal(t, blobs[1], data) + }) + + // read all preserves order: Ensures blobs are returned in same order as written. + t.Run("read all preserves order", func(t *testing.T) { + all, _ := r.ReadAllBlobs() + for i, expected := range blobs { + assert.Equal(t, expected, all[i].Data) + } + }) +} + +// TestReadBlobByMetadataValidation verifies validation of blob metadata. +// Prevents reading garbage data when metadata is corrupted or crafted maliciously. +func TestReadBlobByMetadataValidation(t *testing.T) { + data := validFileWithBlob() + r, _ := puffin.NewReader(bytes.NewReader(data)) + + tests := []struct { + name string + meta puffin.BlobMetadata + wantErr string + }{ + // empty type: Type is required to interpret blob content correctly. + {"empty type", puffin.BlobMetadata{Type: "", Offset: 4, Length: 1}, "type"}, + // offset before header: Prevents reading magic bytes as blob data. + {"offset before header", puffin.BlobMetadata{Type: "t", Offset: 0, Length: 1}, "header"}, + // negative length: Invalid length could cause allocation issues. + {"negative length", puffin.BlobMetadata{Type: "t", Offset: 4, Length: -1}, "length"}, + // extends into footer: Prevents reading footer JSON as blob data. + {"extends into footer", puffin.BlobMetadata{Type: "t", Offset: 4, Length: 9999}, "footer"}, + // overflow: Prevents integer overflow attacks in offset+length calculation. + {"overflow", puffin.BlobMetadata{Type: "t", Offset: math.MaxInt64, Length: 1}, "overflow"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := r.ReadBlobByMetadata(tt.meta) + assert.ErrorContains(t, err, tt.wantErr) + }) + } +} + +// TestReadAt verifies the ReadAt method (io.ReaderAt implementation). +// Enables partial reads for streaming or when only specific byte ranges are needed. +func TestReadAt(t *testing.T) { + w, buf := newWriter() + meta, _ := w.AddBlob(defaultBlobInput(), []byte("hello world")) + w.Finish() + r := newReader(t, buf) + + // valid range: Verifies partial blob reads work correctly. + t.Run("valid range", func(t *testing.T) { + data := make([]byte, 5) + n, err := r.ReadAt(data, meta.Offset) + require.NoError(t, err) + assert.Equal(t, 5, n) + assert.Equal(t, []byte("hello"), data) + }) + + // extends into footer: Prevents reading footer metadata as blob content. 
+	t.Run("extends into footer", func(t *testing.T) {
+		data := make([]byte, buf.Len())
+		_, err := r.ReadAt(data, 4)
+		assert.ErrorContains(t, err, "footer")
+	})
+
+	// offset before header: Prevents reading file magic as blob content.
+	t.Run("offset before header", func(t *testing.T) {
+		data := make([]byte, 4)
+		_, err := r.ReadAt(data, 0)
+		assert.ErrorContains(t, err, "header")
+	})
+}
diff --git a/puffin/puffin_writer.go b/puffin/puffin_writer.go
new file mode 100644
index 000000000..1966173bc
--- /dev/null
+++ b/puffin/puffin_writer.go
@@ -0,0 +1,229 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package puffin
+
+import (
+	"encoding/binary"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"math"
+
+	"github.com/apache/iceberg-go"
+)
+
+// Writer writes blobs and metadata to a Puffin file.
+//
+// Usage:
+//
+//	w, err := puffin.NewWriter(file)
+//	if err != nil {
+//		return err
+//	}
+//	_, err = w.AddBlob(puffin.BlobMetadataInput{
+//		Type:       puffin.BlobTypeDataSketchesTheta,
+//		SnapshotID: 123,
+//		Fields:     []int32{1},
+//	}, sketchBytes)
+//	if err != nil {
+//		return err
+//	}
+//	return w.Finish()
+type Writer struct {
+	w         io.Writer
+	offset    int64
+	blobs     []BlobMetadata
+	props     map[string]string
+	done      bool
+	createdBy string
+}
+
+// BlobMetadataInput contains fields the caller provides when adding a blob.
+// Offset, Length, and CompressionCodec are set by the writer.
+type BlobMetadataInput struct {
+	Type           BlobType
+	SnapshotID     int64
+	SequenceNumber int64
+	Fields         []int32
+	Properties     map[string]string
+}
+
+// NewWriter creates a new Writer and writes the file header magic.
+// The caller is responsible for closing the underlying writer after Finish returns.
+func NewWriter(w io.Writer) (*Writer, error) {
+	if w == nil {
+		return nil, errors.New("puffin: writer is nil")
+	}
+
+	// Write header magic bytes
+	if err := writeAll(w, magic[:]); err != nil {
+		return nil, fmt.Errorf("puffin: write header magic: %w", err)
+	}
+
+	return &Writer{
+		w:         w,
+		offset:    MagicSize,
+		props:     make(map[string]string),
+		createdBy: "iceberg-go " + iceberg.Version(),
+	}, nil
+}
+
+// AddProperties merges the provided properties into the file-level properties
+// written to the footer. Can be called multiple times before Finish.
+func (w *Writer) AddProperties(props map[string]string) error {
+	if w.done {
+		return errors.New("puffin: cannot set properties: writer already finalized")
+	}
+	for k, v := range props {
+		w.props[k] = v
+	}
+
+	return nil
+}
+
+// ClearProperties removes all file-level properties previously added with AddProperties.
+func (w *Writer) ClearProperties() {
+	w.props = make(map[string]string)
+}
+
+// SetCreatedBy overrides the default "created-by" property written to the footer.
+// The default is "iceberg-go" followed by the library version. Example: "MyApp version 1.2.3".
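As a complement to the usage example in the Writer doc comment above, a sketch that exercises AddProperties, SetCreatedBy, and the deletion-vector rules enforced by AddBlob; the property values and dvBytes payload are placeholders:

```go
package main

import (
	"bytes"
	"log"

	"github.com/apache/iceberg-go/puffin"
)

func main() {
	buf := &bytes.Buffer{}
	w, err := puffin.NewWriter(buf)
	if err != nil {
		log.Fatal(err)
	}

	// File-level properties and a custom created-by identifier.
	if err := w.AddProperties(map[string]string{"owner": "analytics"}); err != nil {
		log.Fatal(err)
	}
	if err := w.SetCreatedBy("MyApp version 1.2.3"); err != nil {
		log.Fatal(err)
	}

	// Deletion vectors must be added with snapshot-id = -1 and sequence-number = -1.
	dvBytes := []byte{0x01, 0x02, 0x03} // placeholder for a serialized deletion vector
	meta, err := w.AddBlob(puffin.BlobMetadataInput{
		Type:           puffin.BlobTypeDeletionVector,
		SnapshotID:     -1,
		SequenceNumber: -1,
		Fields:         []int32{},
		Properties:     map[string]string{"referenced-data-file": "data/file.parquet"},
	}, dvBytes)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("wrote blob at offset=%d length=%d", meta.Offset, meta.Length)

	if err := w.Finish(); err != nil {
		log.Fatal(err)
	}
}
```

SetCreatedBy, defined next, is what rejects empty values and calls made after Finish.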
+func (w *Writer) SetCreatedBy(createdBy string) error {
+	if w.done {
+		return errors.New("puffin: cannot set created-by: writer already finalized")
+	}
+	if createdBy == "" {
+		return errors.New("puffin: cannot set created-by: value cannot be empty")
+	}
+	w.createdBy = createdBy
+
+	return nil
+}
+
+// AddBlob writes blob data and records its metadata for the footer.
+// Returns the complete BlobMetadata including the computed Offset and Length.
+// The input.Type is required; use constants like BlobTypeDataSketchesTheta.
+func (w *Writer) AddBlob(input BlobMetadataInput, data []byte) (BlobMetadata, error) {
+	if w.done {
+		return BlobMetadata{}, errors.New("puffin: cannot add blob: writer already finalized")
+	}
+	if input.Type == "" {
+		return BlobMetadata{}, errors.New("puffin: cannot add blob: type is required")
+	}
+	if input.Fields == nil {
+		return BlobMetadata{}, errors.New("puffin: cannot add blob: fields is required")
+	}
+
+	// Deletion vectors have special requirements per spec
+	if input.Type == BlobTypeDeletionVector {
+		if input.SnapshotID != -1 {
+			return BlobMetadata{}, errors.New("puffin: deletion-vector-v1 requires snapshot-id to be -1")
+		}
+		if input.SequenceNumber != -1 {
+			return BlobMetadata{}, errors.New("puffin: deletion-vector-v1 requires sequence-number to be -1")
+		}
+	}
+
+	meta := BlobMetadata{
+		Type:           input.Type,
+		SnapshotID:     input.SnapshotID,
+		SequenceNumber: input.SequenceNumber,
+		Fields:         input.Fields,
+		Offset:         w.offset,
+		Length:         int64(len(data)),
+		Properties:     input.Properties,
+	}
+
+	if err := writeAll(w.w, data); err != nil {
+		return BlobMetadata{}, fmt.Errorf("puffin: write blob: %w", err)
+	}
+
+	w.offset += meta.Length
+	w.blobs = append(w.blobs, meta)
+
+	return meta, nil
+}
+
+// Finish writes the footer and completes the Puffin file structure.
+// Must be called exactly once after all blobs are written.
+// After Finish returns, no further operations are allowed on the writer.
+func (w *Writer) Finish() error {
+	if w.done {
+		return errors.New("puffin: cannot finish: writer already finalized")
+	}
+
+	// Build footer
+	footer := Footer{
+		Blobs:      w.blobs,
+		Properties: w.props,
+	}
+	if footer.Properties == nil {
+		footer.Properties = make(map[string]string)
+	}
+	if w.createdBy != "" {
+		footer.Properties[CreatedBy] = w.createdBy
+	}
+
+	payload, err := json.Marshal(footer)
+	if err != nil {
+		return fmt.Errorf("puffin: marshal footer: %w", err)
+	}
+
+	// Check footer size fits in int32
+	if len(payload) > math.MaxInt32 {
+		return fmt.Errorf("puffin: footer too large: %d bytes exceeds 2GB limit", len(payload))
+	}
+
+	// Write footer start magic
+	if err := writeAll(w.w, magic[:]); err != nil {
+		return fmt.Errorf("puffin: write footer magic: %w", err)
+	}
+
+	// Write footer payload
+	if err := writeAll(w.w, payload); err != nil {
+		return fmt.Errorf("puffin: write footer payload: %w", err)
+	}
+
+	// Write trailer: PayloadSize(4) + Flags(4) + Magic(4)
+	var trailer [footerTrailerSize]byte
+	binary.LittleEndian.PutUint32(trailer[0:4], uint32(len(payload)))
+	binary.LittleEndian.PutUint32(trailer[4:8], 0) // flags = 0 (uncompressed)
+	copy(trailer[8:12], magic[:])
+
+	if err := writeAll(w.w, trailer[:]); err != nil {
+		return fmt.Errorf("puffin: write footer trailer: %w", err)
+	}
+
+	w.done = true
+
+	return nil
+}
+
+// writeAll writes all bytes to w or returns an error.
+// Handles the io.Writer contract where Write can return n < len(data) without error.
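As an aside before writeAll, the offset bookkeeping above implies a simple size decomposition for a finished file: 4 bytes of header magic, the blob bytes, 4 bytes of footer magic, the JSON payload, and the 12-byte trailer. A quick sanity-check sketch (the blob content is arbitrary and the payload length depends on the JSON encoding):

```go
package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/apache/iceberg-go/puffin"
)

func main() {
	buf := &bytes.Buffer{}
	w, err := puffin.NewWriter(buf)
	if err != nil {
		log.Fatal(err)
	}

	blob := []byte("hello world") // 11 bytes, written at offset 4
	meta, err := w.AddBlob(puffin.BlobMetadataInput{
		Type:   puffin.BlobTypeDataSketchesTheta,
		Fields: []int32{},
	}, blob)
	if err != nil {
		log.Fatal(err)
	}
	if err := w.Finish(); err != nil {
		log.Fatal(err)
	}

	// total = header magic (4) + blob bytes + footer magic (4) + JSON payload + trailer (12)
	payloadLen := buf.Len() - puffin.MagicSize - len(blob) - puffin.MagicSize - 12
	fmt.Printf("offset=%d length=%d footer payload=%d bytes\n", meta.Offset, meta.Length, payloadLen)
}
```

writeAll, defined below, is the short-write guard shared by all of these write paths.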
+func writeAll(w io.Writer, data []byte) error { + n, err := w.Write(data) + if err != nil { + return err + } + if n != len(data) { + return fmt.Errorf("short write: wrote %d of %d bytes", n, len(data)) + } + + return nil +}
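Putting the writer and reader together, a minimal in-memory round trip along the lines of what the tests exercise; the sketch bytes and property values are placeholders:

```go
package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/apache/iceberg-go/puffin"
)

func main() {
	// Write a one-blob Puffin file into memory.
	buf := &bytes.Buffer{}
	w, err := puffin.NewWriter(buf)
	if err != nil {
		log.Fatal(err)
	}
	sketch := []byte("serialized theta sketch") // placeholder payload
	if _, err := w.AddBlob(puffin.BlobMetadataInput{
		Type:       puffin.BlobTypeDataSketchesTheta,
		SnapshotID: 123,
		Fields:     []int32{1},
		Properties: map[string]string{"ndv": "1000"},
	}, sketch); err != nil {
		log.Fatal(err)
	}
	if err := w.Finish(); err != nil {
		log.Fatal(err)
	}

	// Read it back; *bytes.Reader satisfies ReaderAtSeeker.
	r, err := puffin.NewReader(bytes.NewReader(buf.Bytes()))
	if err != nil {
		log.Fatal(err)
	}
	blob, err := r.ReadBlob(0)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(bytes.Equal(blob.Data, sketch))           // true
	fmt.Println(r.Footer().Properties[puffin.CreatedBy]) // e.g. "iceberg-go <version>"
}
```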