From 744bf103d9bdb1d7d23f055d82546b26eed38f3a Mon Sep 17 00:00:00 2001 From: Shreyas220 Date: Tue, 13 Jan 2026 02:31:38 +0530 Subject: [PATCH 1/7] feat: init puffin Signed-off-by: Shreyas220 --- puffin/puffin.go | 36 ++++++++++++++++++++++++++++ puffin/puffin_reader.go | 27 +++++++++++++++++++++ puffin/puffin_writer.go | 53 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 116 insertions(+) create mode 100644 puffin/puffin.go create mode 100644 puffin/puffin_reader.go create mode 100644 puffin/puffin_writer.go diff --git a/puffin/puffin.go b/puffin/puffin.go new file mode 100644 index 000000000..27af38dbf --- /dev/null +++ b/puffin/puffin.go @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package puffin + +var magic = [4]byte{'P', 'F', 'A', '1'} + +type BlobMetadata struct { + Type string `json:"type"` + SnapshotID int64 `json:"snapshot-id"` + SequenceNumber int64 `json:"sequence-number"` + Fields []int32 `json:"fields,omitempty"` + Offset int64 `json:"offset"` // absolute file offset + Length int64 `json:"length"` + CompressionCodec *string `json:"compression-codec,omitempty"` + Properties map[string]string `json:"properties,omitempty"` +} + +// Footer describes the blobs and file-level properties stored in a Puffin file. +type Footer struct { + Blobs []BlobMetadata `json:"blobs"` + Properties map[string]string `json:"properties,omitempty"` +} diff --git a/puffin/puffin_reader.go b/puffin/puffin_reader.go new file mode 100644 index 000000000..cc69d5255 --- /dev/null +++ b/puffin/puffin_reader.go @@ -0,0 +1,27 @@ +package puffin + +import "io" + +type PuffinReader struct { + r io.ReaderAt + size int64 + footerStart int64 + footerRead bool +} + +func NewPuffinReader(r io.ReaderAt, size int64) *PuffinReader { + return &PuffinReader{ + r: r, + size: size, + footerStart: size - 4, + } +} + +func (r *PuffinReader) ReadFooter() {} + +func (r *PuffinReader) ReadAllBlobs() {} + +// ReadBlob reads the content of one specific blob. +func (r *PuffinReader) ReadBlob() {} + +func validateMetadata() {} diff --git a/puffin/puffin_writer.go b/puffin/puffin_writer.go new file mode 100644 index 000000000..845d908ff --- /dev/null +++ b/puffin/puffin_writer.go @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package puffin + +import ( + "fmt" + "io" +) + +// PuffinWriter writes a Puffin file to an output stream. +type PuffinWriter struct { + w io.Writer + offset int64 + blobs []BlobMetadata + props map[string]string + done bool + createdBy string +} + +type BlobMetadataInput struct{} + +// NewWriter creates a new PuffinWriter. +func NewWriter(w io.Writer) (*PuffinWriter, error) { + if w == nil { + return nil, fmt.Errorf("puffin: writer is nil") + } + return &PuffinWriter{ + w: w, + offset: 0, + blobs: make([]BlobMetadata, 0), + props: make(map[string]string), + done: false, + createdBy: "iceberg-go", + }, nil +} + +func (w *PuffinWriter) AddBlob() {} + +func (w *PuffinReader) Finish() {} From 7f9c6d09d6e55d9d4caf22c67d5ab9c05f1d9b10 Mon Sep 17 00:00:00 2001 From: Shreyas220 Date: Tue, 13 Jan 2026 03:33:42 +0530 Subject: [PATCH 2/7] feat: puffin writer Signed-off-by: Shreyas220 --- puffin/puffin.go | 22 ++++++ puffin/puffin_reader.go | 16 +++++ puffin/puffin_writer.go | 144 ++++++++++++++++++++++++++++++++++++++-- 3 files changed, 175 insertions(+), 7 deletions(-) diff --git a/puffin/puffin.go b/puffin/puffin.go index 27af38dbf..ab74adc78 100644 --- a/puffin/puffin.go +++ b/puffin/puffin.go @@ -18,6 +18,28 @@ package puffin var magic = [4]byte{'P', 'F', 'A', '1'} +const ( + // MagicSize is the number of bytes in the magic marker. + MagicSize = 4 + // footerTrailerSize accounts for footer length (4), flags (4), and trailing magic (4). + footerTrailerSize = 12 + // FooterFlagCompressed indicates a compressed footer; unsupported in this implementation. + FooterFlagCompressed = 1 // bit 0 + + // DefaultMaxBlobSize is the maximum blob size allowed when reading (256 MB). + // This prevents OOM attacks from malicious files with huge blob lengths. + // Override with WithMaxBlobSize when creating a reader. + DefaultMaxBlobSize = 256 << 20 + // CreatedBy is a human-readable identification of the application writing the file, along with its version. + // Example: "Trino version 381". + CreatedBy = "created-by" + // ApacheDataSketchesThetaV1 is a serialized compact Theta sketch from Apache DataSketches. + ApacheDataSketchesThetaV1 = "apache-datasketches-theta-v1" + + // DeletionVectorV1 is a serialized deletion vector according to the Iceberg spec. + DeletionVectorV1 = "deletion-vector-v1" +) + type BlobMetadata struct { Type string `json:"type"` SnapshotID int64 `json:"snapshot-id"` diff --git a/puffin/puffin_reader.go b/puffin/puffin_reader.go index cc69d5255..20b6e4260 100644 --- a/puffin/puffin_reader.go +++ b/puffin/puffin_reader.go @@ -1,3 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. package puffin import "io" diff --git a/puffin/puffin_writer.go b/puffin/puffin_writer.go index 845d908ff..25d15dc3b 100644 --- a/puffin/puffin_writer.go +++ b/puffin/puffin_writer.go @@ -17,10 +17,26 @@ package puffin import ( + "encoding/binary" + "encoding/json" "fmt" "io" + "math" ) +// writeAll writes all bytes to w or returns an error. +// Handles the io.Writer contract where Write can return n < len(data) without error. +func writeAll(w io.Writer, data []byte) error { + n, err := w.Write(data) + if err != nil { + return err + } + if n != len(data) { + return fmt.Errorf("short write: wrote %d of %d bytes", n, len(data)) + } + return nil +} + // PuffinWriter writes a Puffin file to an output stream. type PuffinWriter struct { w io.Writer @@ -31,23 +47,137 @@ type PuffinWriter struct { createdBy string } -type BlobMetadataInput struct{} +// BlobMetadataInput contains fields the caller provides when adding a blob. +// Offset, Length, and CompressionCodec are set by the writer. +type BlobMetadataInput struct { + Type string + SnapshotID int64 + SequenceNumber int64 + Fields []int32 + Properties map[string]string +} -// NewWriter creates a new PuffinWriter. +// NewWriter creates a new PuffinWriter and writes the file header magic. func NewWriter(w io.Writer) (*PuffinWriter, error) { if w == nil { return nil, fmt.Errorf("puffin: writer is nil") } + + // Write header magic bytes + if err := writeAll(w, magic[:]); err != nil { + return nil, fmt.Errorf("puffin: write header magic: %w", err) + } + return &PuffinWriter{ w: w, - offset: 0, - blobs: make([]BlobMetadata, 0), + offset: MagicSize, props: make(map[string]string), - done: false, createdBy: "iceberg-go", }, nil } -func (w *PuffinWriter) AddBlob() {} +// SetProperties sets file-level properties. +func (w *PuffinWriter) SetProperties(props map[string]string) error { + if w.done { + return fmt.Errorf("puffin: writer finalized") + } + for k, v := range props { + w.props[k] = v + } + return nil +} + +// SetCreatedBy sets the created-by property in the footer. +func (w *PuffinWriter) SetCreatedBy(createdBy string) error { + if w.done { + return fmt.Errorf("puffin: writer finalized") + } + if createdBy == "" { + return fmt.Errorf("puffin: created-by cannot be empty") + } + w.createdBy = createdBy + return nil +} + +// AddBlob writes a blob to the file and records its metadata. +// Returns the BlobMetadata with computed Offset and Length. 
+func (w *PuffinWriter) AddBlob(input BlobMetadataInput, data []byte) (BlobMetadata, error) { + if w.done { + return BlobMetadata{}, fmt.Errorf("puffin: writer finalized") + } + if input.Type == "" { + return BlobMetadata{}, fmt.Errorf("puffin: blob type required") + } + + meta := BlobMetadata{ + Type: input.Type, + SnapshotID: input.SnapshotID, + SequenceNumber: input.SequenceNumber, + Fields: input.Fields, + Offset: w.offset, + Length: int64(len(data)), + Properties: input.Properties, + } + + if err := writeAll(w.w, data); err != nil { + return BlobMetadata{}, fmt.Errorf("puffin: write blob: %w", err) + } -func (w *PuffinReader) Finish() {} + w.offset += meta.Length + w.blobs = append(w.blobs, meta) + + return meta, nil +} + +// Finish writes the footer and completes the file. +// Must be called exactly once after all blobs are written. +func (w *PuffinWriter) Finish() error { + if w.done { + return fmt.Errorf("puffin: writer finalized") + } + + // Build footer + footer := Footer{ + Blobs: w.blobs, + Properties: w.props, + } + if footer.Properties == nil { + footer.Properties = make(map[string]string) + } + if w.createdBy != "" { + footer.Properties[CreatedBy] = w.createdBy + } + + payload, err := json.Marshal(footer) + if err != nil { + return fmt.Errorf("puffin: marshal footer: %w", err) + } + + // Check footer size fits in int32 + if len(payload) > math.MaxInt32 { + return fmt.Errorf("puffin: footer too large: %d bytes exceeds 2GB limit", len(payload)) + } + + // Write footer start magic + if err := writeAll(w.w, magic[:]); err != nil { + return fmt.Errorf("puffin: write footer magic: %w", err) + } + + // Write footer payload + if err := writeAll(w.w, payload); err != nil { + return fmt.Errorf("puffin: write footer payload: %w", err) + } + + // Write trailer: PayloadSize(4) + Flags(4) + Magic(4) + var trailer [footerTrailerSize]byte + binary.LittleEndian.PutUint32(trailer[0:4], uint32(len(payload))) + binary.LittleEndian.PutUint32(trailer[4:8], 0) // flags = 0 (uncompressed) + copy(trailer[8:12], magic[:]) + + if err := writeAll(w.w, trailer[:]); err != nil { + return fmt.Errorf("puffin: write footer trailer: %w", err) + } + + w.done = true + return nil +} From 87c2634dc2a1209a75d1903f7b3c03861dc1b741 Mon Sep 17 00:00:00 2001 From: Shreyas220 Date: Tue, 13 Jan 2026 03:59:12 +0530 Subject: [PATCH 3/7] feat: puffin reader Signed-off-by: Shreyas220 --- puffin/puffin_reader.go | 366 ++++++++++++++++++++++++++++++++++++++-- puffin/puffin_writer.go | 49 ++++-- 2 files changed, 392 insertions(+), 23 deletions(-) diff --git a/puffin/puffin_reader.go b/puffin/puffin_reader.go index 20b6e4260..9774e799c 100644 --- a/puffin/puffin_reader.go +++ b/puffin/puffin_reader.go @@ -14,30 +14,374 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + package puffin -import "io" +import ( + "encoding/binary" + "encoding/json" + "fmt" + "io" + "sort" +) +// PuffinReader provides random access to blobs and metadata in a Puffin file. +// +// The reader validates magic bytes on construction and caches the footer +// after the first read. Use ReadBlob or ReadAllBlobs to access blob data. 
+// +// Usage: +// +// r, err := puffin.NewPuffinReader(file, fileSize) +// if err != nil { +// return err +// } +// footer, err := r.ReadFooter() +// if err != nil { +// return err +// } +// for i := range footer.Blobs { +// blob, err := r.ReadBlob(i) +// // process blob.Data +// } type PuffinReader struct { r io.ReaderAt size int64 - footerStart int64 - footerRead bool + footer *Footer + footerStart int64 // cached after ReadFooter + maxBlobSize int64 +} + +// BlobData pairs a blob's metadata with its content. +type BlobData struct { + Metadata BlobMetadata + Data []byte +} + +// ReaderOption configures a PuffinReader. +type ReaderOption func(*PuffinReader) + +// WithMaxBlobSize sets the maximum blob size allowed when reading. +// This prevents OOM attacks from malicious files with huge blob lengths. +// Default is DefaultMaxBlobSize (256 MB). +func WithMaxBlobSize(size int64) ReaderOption { + return func(r *PuffinReader) { + r.maxBlobSize = size + } } -func NewPuffinReader(r io.ReaderAt, size int64) *PuffinReader { - return &PuffinReader{ +// NewPuffinReader creates a new Puffin reader. +// It validates both the header and trailing magic bytes upfront. +// The caller is responsible for closing the underlying io.ReaderAt. +func NewPuffinReader(r io.ReaderAt, size int64, opts ...ReaderOption) (*PuffinReader, error) { + if r == nil { + return nil, fmt.Errorf("puffin: reader is nil") + } + + // Minimum size: header magic + footer magic + footer trailer + minSize := int64(MagicSize + MagicSize + footerTrailerSize) + if size < minSize { + return nil, fmt.Errorf("puffin: file too small (%d bytes, minimum %d)", size, minSize) + } + + // Validate header magic + var headerMagic [MagicSize]byte + if _, err := r.ReadAt(headerMagic[:], 0); err != nil { + return nil, fmt.Errorf("puffin: read header magic: %w", err) + } + if headerMagic != magic { + return nil, fmt.Errorf("puffin: invalid header magic") + } + + // Validate trailing magic (fail fast on corrupt/truncated files) + var trailingMagic [MagicSize]byte + if _, err := r.ReadAt(trailingMagic[:], size-MagicSize); err != nil { + return nil, fmt.Errorf("puffin: read trailing magic: %w", err) + } + if trailingMagic != magic { + return nil, fmt.Errorf("puffin: invalid trailing magic") + } + + pr := &PuffinReader{ r: r, size: size, - footerStart: size - 4, + maxBlobSize: DefaultMaxBlobSize, + } + + for _, opt := range opts { + opt(pr) } + + return pr, nil } -func (r *PuffinReader) ReadFooter() {} +// ReadFooter reads and parses the footer from the Puffin file. +// The footer is cached after the first successful read. +func (r *PuffinReader) ReadFooter() (*Footer, error) { + if r.footer != nil { + return r.footer, nil + } + + // Read trailer (last 12 bytes): PayloadSize(4) + Flags(4) + Magic(4) + var trailer [footerTrailerSize]byte + if _, err := r.r.ReadAt(trailer[:], r.size-footerTrailerSize); err != nil { + return nil, fmt.Errorf("puffin: read footer trailer: %w", err) + } + + // Validate trailing magic (already checked in constructor, but be defensive) + if trailer[8] != magic[0] || trailer[9] != magic[1] || + trailer[10] != magic[2] || trailer[11] != magic[3] { + return nil, fmt.Errorf("puffin: invalid trailing magic in footer") + } -func (r *PuffinReader) ReadAllBlobs() {} + // Extract payload size and flags + payloadSize := int64(binary.LittleEndian.Uint32(trailer[0:4])) + flags := binary.LittleEndian.Uint32(trailer[4:8]) -// ReadBlob reads the content of one specific blob. 
-func (r *PuffinReader) ReadBlob() {} + // Check for compressed footer (unsupported) + if flags&FooterFlagCompressed != 0 { + return nil, fmt.Errorf("puffin: compressed footer not supported") + } + + // Check for unknown flags (future-proofing) + if flags&^uint32(FooterFlagCompressed) != 0 { + return nil, fmt.Errorf("puffin: unknown footer flags set: 0x%x", flags) + } + + // Validate payload size + if payloadSize < 0 { + return nil, fmt.Errorf("puffin: invalid footer payload size %d", payloadSize) + } + + // Calculate footer start position + // Layout: [header magic (4)] [blobs...] [footer magic (4)] [JSON (payloadSize)] [trailer (12)] + footerStart := r.size - footerTrailerSize - payloadSize - MagicSize + if footerStart < MagicSize { + return nil, fmt.Errorf("puffin: footer payload size %d exceeds available space", payloadSize) + } + + // Validate footer start magic + var footerMagic [MagicSize]byte + if _, err := r.r.ReadAt(footerMagic[:], footerStart); err != nil { + return nil, fmt.Errorf("puffin: read footer start magic: %w", err) + } + if footerMagic != magic { + return nil, fmt.Errorf("puffin: invalid footer start magic") + } + + // Read footer JSON payload + payload := make([]byte, payloadSize) + if _, err := r.r.ReadAt(payload, footerStart+MagicSize); err != nil { + return nil, fmt.Errorf("puffin: read footer payload: %w", err) + } + + // Parse footer JSON + var footer Footer + if err := json.Unmarshal(payload, &footer); err != nil { + return nil, fmt.Errorf("puffin: decode footer JSON: %w", err) + } + + // Validate blob metadata + if err := r.validateBlobs(footer.Blobs, footerStart); err != nil { + return nil, err + } + + // Cache footer and footerStart + r.footer = &footer + r.footerStart = footerStart + + return r.footer, nil +} + +// ReadBlob reads the content of a specific blob by index. +// The footer is read automatically if not already cached. +func (r *PuffinReader) ReadBlob(index int) (*BlobData, error) { + footer, err := r.ReadFooter() + if err != nil { + return nil, err + } + + if index < 0 || index >= len(footer.Blobs) { + return nil, fmt.Errorf("puffin: blob index %d out of range [0, %d)", index, len(footer.Blobs)) + } + + meta := footer.Blobs[index] + data, err := r.readBlobData(meta) + if err != nil { + return nil, err + } + + return &BlobData{Metadata: meta, Data: data}, nil +} -func validateMetadata() {} +// ReadBlobByMetadata reads a blob using its metadata directly. +// This is useful when you have metadata from an external source. +// The footer must be read first to establish bounds checking. +func (r *PuffinReader) ReadBlobByMetadata(meta BlobMetadata) ([]byte, error) { + if r.footer == nil { + return nil, fmt.Errorf("puffin: cannot read blob: footer not read") + } + return r.readBlobData(meta) +} + +// readBlobData is the internal implementation for reading blob data. 
+func (r *PuffinReader) readBlobData(meta BlobMetadata) ([]byte, error) { + // Validate blob type + if meta.Type == "" { + return nil, fmt.Errorf("puffin: cannot read blob: type is required") + } + + // Check for compressed blob (unsupported) + if meta.CompressionCodec != nil && *meta.CompressionCodec != "" { + return nil, fmt.Errorf("puffin: cannot read blob: compression %q not supported", *meta.CompressionCodec) + } + + // Validate length + if meta.Length < 0 { + return nil, fmt.Errorf("puffin: invalid blob length %d", meta.Length) + } + if meta.Length > r.maxBlobSize { + return nil, fmt.Errorf("puffin: blob size %d exceeds limit %d", meta.Length, r.maxBlobSize) + } + + // Validate offset (must be after header magic) + if meta.Offset < MagicSize { + return nil, fmt.Errorf("puffin: invalid blob offset %d (before header)", meta.Offset) + } + + // Check for overflow + end := meta.Offset + meta.Length + if end < meta.Offset { + return nil, fmt.Errorf("puffin: blob offset+length overflow: offset=%d length=%d", meta.Offset, meta.Length) + } + + // Validate blob doesn't extend into footer + if end > r.footerStart { + return nil, fmt.Errorf("puffin: blob extends into footer: offset=%d length=%d footerStart=%d", + meta.Offset, meta.Length, r.footerStart) + } + + // Read blob data + data := make([]byte, meta.Length) + if _, err := r.r.ReadAt(data, meta.Offset); err != nil { + return nil, fmt.Errorf("puffin: read blob data: %w", err) + } + + return data, nil +} + +// ReadAllBlobs reads all blobs from the file. +// Blobs are read in offset order for sequential I/O efficiency, +// but results are returned in the original footer order. +func (r *PuffinReader) ReadAllBlobs() ([]*BlobData, error) { + footer, err := r.ReadFooter() + if err != nil { + return nil, err + } + + if len(footer.Blobs) == 0 { + return nil, nil + } + + // Create index mapping to preserve original order + type indexedBlob struct { + index int + meta BlobMetadata + } + indexed := make([]indexedBlob, len(footer.Blobs)) + for i, meta := range footer.Blobs { + indexed[i] = indexedBlob{index: i, meta: meta} + } + + // Sort by offset for sequential I/O + sort.Slice(indexed, func(i, j int) bool { + return indexed[i].meta.Offset < indexed[j].meta.Offset + }) + + // Read blobs in offset order, store in original order + results := make([]*BlobData, len(footer.Blobs)) + for _, ib := range indexed { + data, err := r.readBlobData(ib.meta) + if err != nil { + return nil, fmt.Errorf("puffin: read blob %d: %w", ib.index, err) + } + results[ib.index] = &BlobData{Metadata: ib.meta, Data: data} + } + + return results, nil +} + +// ReadRange reads a raw byte range from the blob data region. +// The footer must be read first to establish bounds checking. 
+func (r *PuffinReader) ReadRange(offset, length int64) ([]byte, error) { + if r.footer == nil { + return nil, fmt.Errorf("puffin: cannot read range: footer not read") + } + + // Validate length + if length < 0 { + return nil, fmt.Errorf("puffin: invalid range length %d", length) + } + if length > r.maxBlobSize { + return nil, fmt.Errorf("puffin: range size %d exceeds limit %d", length, r.maxBlobSize) + } + + // Validate offset + if offset < MagicSize { + return nil, fmt.Errorf("puffin: invalid range offset %d (before header)", offset) + } + + // Check for overflow + end := offset + length + if end < offset { + return nil, fmt.Errorf("puffin: range offset+length overflow: offset=%d length=%d", offset, length) + } + + // Validate range doesn't extend into footer + if end > r.footerStart { + return nil, fmt.Errorf("puffin: range extends into footer: offset=%d length=%d footerStart=%d", + offset, length, r.footerStart) + } + + // Read data + buf := make([]byte, length) + if _, err := r.r.ReadAt(buf, offset); err != nil { + return nil, fmt.Errorf("puffin: read range: %w", err) + } + + return buf, nil +} + +// validateBlobs validates all blob metadata entries. +func (r *PuffinReader) validateBlobs(blobs []BlobMetadata, footerStart int64) error { + for i, blob := range blobs { + // Type is required + if blob.Type == "" { + return fmt.Errorf("puffin: blob %d: type is required", i) + } + + // Length must be non-negative + if blob.Length < 0 { + return fmt.Errorf("puffin: blob %d: invalid length %d", i, blob.Length) + } + + // Offset must be after header magic + if blob.Offset < MagicSize { + return fmt.Errorf("puffin: blob %d: offset %d before header", i, blob.Offset) + } + + // Check for overflow + end := blob.Offset + blob.Length + if end < blob.Offset { + return fmt.Errorf("puffin: blob %d: offset+length overflow: offset=%d length=%d", i, blob.Offset, blob.Length) + } + + // Blob must not extend into footer + if end > footerStart { + return fmt.Errorf("puffin: blob %d: extends into footer: offset=%d length=%d footerStart=%d", + i, blob.Offset, blob.Length, footerStart) + } + } + + return nil +} diff --git a/puffin/puffin_writer.go b/puffin/puffin_writer.go index 25d15dc3b..48e52dbb9 100644 --- a/puffin/puffin_writer.go +++ b/puffin/puffin_writer.go @@ -37,7 +37,27 @@ func writeAll(w io.Writer, data []byte) error { return nil } -// PuffinWriter writes a Puffin file to an output stream. +// PuffinWriter writes blobs and metadata to a Puffin file. +// +// The Puffin format stores arbitrary blobs (e.g., statistics, deletion vectors) +// along with JSON metadata in a footer. The writer handles the binary layout: +// header magic, blob data, and footer structure. +// +// Usage: +// +// w, err := puffin.NewWriter(file) +// if err != nil { +// return err +// } +// _, err = w.AddBlob(puffin.BlobMetadataInput{ +// Type: puffin.ApacheDataSketchesThetaV1, +// SnapshotID: 123, +// Fields: []int32{1}, +// }, sketchBytes) +// if err != nil { +// return err +// } +// return w.Finish() type PuffinWriter struct { w io.Writer offset int64 @@ -58,6 +78,7 @@ type BlobMetadataInput struct { } // NewWriter creates a new PuffinWriter and writes the file header magic. +// The caller is responsible for closing the underlying writer after Finish returns. func NewWriter(w io.Writer) (*PuffinWriter, error) { if w == nil { return nil, fmt.Errorf("puffin: writer is nil") @@ -76,10 +97,11 @@ func NewWriter(w io.Writer) (*PuffinWriter, error) { }, nil } -// SetProperties sets file-level properties. 
+// SetProperties merges the provided properties into the file-level properties +// written to the footer. Can be called multiple times before Finish. func (w *PuffinWriter) SetProperties(props map[string]string) error { if w.done { - return fmt.Errorf("puffin: writer finalized") + return fmt.Errorf("puffin: cannot set properties: writer already finalized") } for k, v := range props { w.props[k] = v @@ -87,26 +109,28 @@ func (w *PuffinWriter) SetProperties(props map[string]string) error { return nil } -// SetCreatedBy sets the created-by property in the footer. +// SetCreatedBy overrides the default "created-by" property written to the footer. +// The default value is "iceberg-go". Example: "MyApp version 1.2.3". func (w *PuffinWriter) SetCreatedBy(createdBy string) error { if w.done { - return fmt.Errorf("puffin: writer finalized") + return fmt.Errorf("puffin: cannot set created-by: writer already finalized") } if createdBy == "" { - return fmt.Errorf("puffin: created-by cannot be empty") + return fmt.Errorf("puffin: cannot set created-by: value cannot be empty") } w.createdBy = createdBy return nil } -// AddBlob writes a blob to the file and records its metadata. -// Returns the BlobMetadata with computed Offset and Length. +// AddBlob writes blob data and records its metadata for the footer. +// Returns the complete BlobMetadata including the computed Offset and Length. +// The input.Type is required; use constants like ApacheDataSketchesThetaV1. func (w *PuffinWriter) AddBlob(input BlobMetadataInput, data []byte) (BlobMetadata, error) { if w.done { - return BlobMetadata{}, fmt.Errorf("puffin: writer finalized") + return BlobMetadata{}, fmt.Errorf("puffin: cannot add blob: writer already finalized") } if input.Type == "" { - return BlobMetadata{}, fmt.Errorf("puffin: blob type required") + return BlobMetadata{}, fmt.Errorf("puffin: cannot add blob: type is required") } meta := BlobMetadata{ @@ -129,11 +153,12 @@ func (w *PuffinWriter) AddBlob(input BlobMetadataInput, data []byte) (BlobMetada return meta, nil } -// Finish writes the footer and completes the file. +// Finish writes the footer and completes the Puffin file structure. // Must be called exactly once after all blobs are written. +// After Finish returns, no further operations are allowed on the writer. func (w *PuffinWriter) Finish() error { if w.done { - return fmt.Errorf("puffin: writer finalized") + return fmt.Errorf("puffin: cannot finish: writer already finalized") } // Build footer From e563a7353a18ee6410a5717c89053bdcada96a71 Mon Sep 17 00:00:00 2001 From: Shreyas220 Date: Tue, 13 Jan 2026 04:20:14 +0530 Subject: [PATCH 4/7] chore: lint fix Signed-off-by: Shreyas220 --- puffin/puffin.go | 3 ++- puffin/puffin_reader.go | 26 +++++++++++++------------- puffin/puffin_writer.go | 19 ++++++++++++------- 3 files changed, 27 insertions(+), 21 deletions(-) diff --git a/puffin/puffin.go b/puffin/puffin.go index ab74adc78..dcd21897a 100644 --- a/puffin/puffin.go +++ b/puffin/puffin.go @@ -19,9 +19,10 @@ package puffin var magic = [4]byte{'P', 'F', 'A', '1'} const ( + //[Magic] [FooterPayload] [FooterPayloadSize] [Flags] [Magic] // MagicSize is the number of bytes in the magic marker. MagicSize = 4 - // footerTrailerSize accounts for footer length (4), flags (4), and trailing magic (4). + // footerTrailerSize accounts for footer length (4)+ flags (4) + trailing magic (4). footerTrailerSize = 12 // FooterFlagCompressed indicates a compressed footer; unsupported in this implementation. 
FooterFlagCompressed = 1 // bit 0 diff --git a/puffin/puffin_reader.go b/puffin/puffin_reader.go index 9774e799c..7f9615835 100644 --- a/puffin/puffin_reader.go +++ b/puffin/puffin_reader.go @@ -20,15 +20,13 @@ package puffin import ( "encoding/binary" "encoding/json" + "errors" "fmt" "io" "sort" ) -// PuffinReader provides random access to blobs and metadata in a Puffin file. -// -// The reader validates magic bytes on construction and caches the footer -// after the first read. Use ReadBlob or ReadAllBlobs to access blob data. +// PuffinReader // // Usage: // @@ -75,10 +73,11 @@ func WithMaxBlobSize(size int64) ReaderOption { // The caller is responsible for closing the underlying io.ReaderAt. func NewPuffinReader(r io.ReaderAt, size int64, opts ...ReaderOption) (*PuffinReader, error) { if r == nil { - return nil, fmt.Errorf("puffin: reader is nil") + return nil, errors.New("puffin: reader is nil") } // Minimum size: header magic + footer magic + footer trailer + // [Magic] + zero for blob + [Magic] + [FooterPayloadSize (assuming ~0)] + [Flags] + [Magic] minSize := int64(MagicSize + MagicSize + footerTrailerSize) if size < minSize { return nil, fmt.Errorf("puffin: file too small (%d bytes, minimum %d)", size, minSize) @@ -90,7 +89,7 @@ func NewPuffinReader(r io.ReaderAt, size int64, opts ...ReaderOption) (*PuffinRe return nil, fmt.Errorf("puffin: read header magic: %w", err) } if headerMagic != magic { - return nil, fmt.Errorf("puffin: invalid header magic") + return nil, errors.New("puffin: invalid header magic") } // Validate trailing magic (fail fast on corrupt/truncated files) @@ -99,7 +98,7 @@ func NewPuffinReader(r io.ReaderAt, size int64, opts ...ReaderOption) (*PuffinRe return nil, fmt.Errorf("puffin: read trailing magic: %w", err) } if trailingMagic != magic { - return nil, fmt.Errorf("puffin: invalid trailing magic") + return nil, errors.New("puffin: invalid trailing magic") } pr := &PuffinReader{ @@ -131,7 +130,7 @@ func (r *PuffinReader) ReadFooter() (*Footer, error) { // Validate trailing magic (already checked in constructor, but be defensive) if trailer[8] != magic[0] || trailer[9] != magic[1] || trailer[10] != magic[2] || trailer[11] != magic[3] { - return nil, fmt.Errorf("puffin: invalid trailing magic in footer") + return nil, errors.New("puffin: invalid trailing magic in footer") } // Extract payload size and flags @@ -140,7 +139,7 @@ func (r *PuffinReader) ReadFooter() (*Footer, error) { // Check for compressed footer (unsupported) if flags&FooterFlagCompressed != 0 { - return nil, fmt.Errorf("puffin: compressed footer not supported") + return nil, errors.New("puffin: compressed footer not supported") } // Check for unknown flags (future-proofing) @@ -166,7 +165,7 @@ func (r *PuffinReader) ReadFooter() (*Footer, error) { return nil, fmt.Errorf("puffin: read footer start magic: %w", err) } if footerMagic != magic { - return nil, fmt.Errorf("puffin: invalid footer start magic") + return nil, errors.New("puffin: invalid footer start magic") } // Read footer JSON payload @@ -219,8 +218,9 @@ func (r *PuffinReader) ReadBlob(index int) (*BlobData, error) { // The footer must be read first to establish bounds checking. 
func (r *PuffinReader) ReadBlobByMetadata(meta BlobMetadata) ([]byte, error) { if r.footer == nil { - return nil, fmt.Errorf("puffin: cannot read blob: footer not read") + return nil, errors.New("puffin: cannot read blob: footer not read") } + return r.readBlobData(meta) } @@ -228,7 +228,7 @@ func (r *PuffinReader) ReadBlobByMetadata(meta BlobMetadata) ([]byte, error) { func (r *PuffinReader) readBlobData(meta BlobMetadata) ([]byte, error) { // Validate blob type if meta.Type == "" { - return nil, fmt.Errorf("puffin: cannot read blob: type is required") + return nil, errors.New("puffin: cannot read blob: type is required") } // Check for compressed blob (unsupported) @@ -315,7 +315,7 @@ func (r *PuffinReader) ReadAllBlobs() ([]*BlobData, error) { // The footer must be read first to establish bounds checking. func (r *PuffinReader) ReadRange(offset, length int64) ([]byte, error) { if r.footer == nil { - return nil, fmt.Errorf("puffin: cannot read range: footer not read") + return nil, errors.New("puffin: cannot read range: footer not read") } // Validate length diff --git a/puffin/puffin_writer.go b/puffin/puffin_writer.go index 48e52dbb9..9fcf227ec 100644 --- a/puffin/puffin_writer.go +++ b/puffin/puffin_writer.go @@ -19,6 +19,7 @@ package puffin import ( "encoding/binary" "encoding/json" + "errors" "fmt" "io" "math" @@ -34,6 +35,7 @@ func writeAll(w io.Writer, data []byte) error { if n != len(data) { return fmt.Errorf("short write: wrote %d of %d bytes", n, len(data)) } + return nil } @@ -81,7 +83,7 @@ type BlobMetadataInput struct { // The caller is responsible for closing the underlying writer after Finish returns. func NewWriter(w io.Writer) (*PuffinWriter, error) { if w == nil { - return nil, fmt.Errorf("puffin: writer is nil") + return nil, errors.New("puffin: writer is nil") } // Write header magic bytes @@ -101,11 +103,12 @@ func NewWriter(w io.Writer) (*PuffinWriter, error) { // written to the footer. Can be called multiple times before Finish. func (w *PuffinWriter) SetProperties(props map[string]string) error { if w.done { - return fmt.Errorf("puffin: cannot set properties: writer already finalized") + return errors.New("puffin: cannot set properties: writer already finalized") } for k, v := range props { w.props[k] = v } + return nil } @@ -113,12 +116,13 @@ func (w *PuffinWriter) SetProperties(props map[string]string) error { // The default value is "iceberg-go". Example: "MyApp version 1.2.3". func (w *PuffinWriter) SetCreatedBy(createdBy string) error { if w.done { - return fmt.Errorf("puffin: cannot set created-by: writer already finalized") + return errors.New("puffin: cannot set created-by: writer already finalized") } if createdBy == "" { - return fmt.Errorf("puffin: cannot set created-by: value cannot be empty") + return errors.New("puffin: cannot set created-by: value cannot be empty") } w.createdBy = createdBy + return nil } @@ -127,10 +131,10 @@ func (w *PuffinWriter) SetCreatedBy(createdBy string) error { // The input.Type is required; use constants like ApacheDataSketchesThetaV1. 
func (w *PuffinWriter) AddBlob(input BlobMetadataInput, data []byte) (BlobMetadata, error) { if w.done { - return BlobMetadata{}, fmt.Errorf("puffin: cannot add blob: writer already finalized") + return BlobMetadata{}, errors.New("puffin: cannot add blob: writer already finalized") } if input.Type == "" { - return BlobMetadata{}, fmt.Errorf("puffin: cannot add blob: type is required") + return BlobMetadata{}, errors.New("puffin: cannot add blob: type is required") } meta := BlobMetadata{ @@ -158,7 +162,7 @@ func (w *PuffinWriter) AddBlob(input BlobMetadataInput, data []byte) (BlobMetada // After Finish returns, no further operations are allowed on the writer. func (w *PuffinWriter) Finish() error { if w.done { - return fmt.Errorf("puffin: cannot finish: writer already finalized") + return errors.New("puffin: cannot finish: writer already finalized") } // Build footer @@ -204,5 +208,6 @@ func (w *PuffinWriter) Finish() error { } w.done = true + return nil } From 1aa0e7a3dbbcb2d9e38f0596028b1e7b33017f7f Mon Sep 17 00:00:00 2001 From: Shreyas220 Date: Wed, 14 Jan 2026 14:52:31 +0530 Subject: [PATCH 5/7] chore: hanlding review changes Signed-off-by: Shreyas220 --- puffin/puffin.go | 22 +- puffin/puffin_reader.go | 207 ++++++++++--------- puffin/puffin_test.go | 432 ++++++++++++++++++++++++++++++++++++++++ puffin/puffin_writer.go | 62 +++--- 4 files changed, 583 insertions(+), 140 deletions(-) create mode 100644 puffin/puffin_test.go diff --git a/puffin/puffin.go b/puffin/puffin.go index dcd21897a..abd34c827 100644 --- a/puffin/puffin.go +++ b/puffin/puffin.go @@ -34,18 +34,13 @@ const ( // CreatedBy is a human-readable identification of the application writing the file, along with its version. // Example: "Trino version 381". CreatedBy = "created-by" - // ApacheDataSketchesThetaV1 is a serialized compact Theta sketch from Apache DataSketches. - ApacheDataSketchesThetaV1 = "apache-datasketches-theta-v1" - - // DeletionVectorV1 is a serialized deletion vector according to the Iceberg spec. - DeletionVectorV1 = "deletion-vector-v1" ) type BlobMetadata struct { - Type string `json:"type"` + Type BlobType `json:"type"` SnapshotID int64 `json:"snapshot-id"` SequenceNumber int64 `json:"sequence-number"` - Fields []int32 `json:"fields,omitempty"` + Fields []int32 `json:"fields"` Offset int64 `json:"offset"` // absolute file offset Length int64 `json:"length"` CompressionCodec *string `json:"compression-codec,omitempty"` @@ -57,3 +52,16 @@ type Footer struct { Blobs []BlobMetadata `json:"blobs"` Properties map[string]string `json:"properties,omitempty"` } + +type BlobType string + +const ( + // BlobTypeDataSketchesTheta is a serialized compact Theta sketch + // produced by the Apache DataSketches library. + BlobTypeDataSketchesTheta BlobType = "apache-datasketches-theta-v1" + + // BlobTypeDeletionVector is a serialized deletion vector per the + // Iceberg spec. Requires snapshot-id and sequence-number to be -1. + BlobTypeDeletionVector BlobType = "deletion-vector-v1" + +) diff --git a/puffin/puffin_reader.go b/puffin/puffin_reader.go index 7f9615835..ded03af53 100644 --- a/puffin/puffin_reader.go +++ b/puffin/puffin_reader.go @@ -18,6 +18,7 @@ package puffin import ( + "bytes" "encoding/binary" "encoding/json" "errors" @@ -26,23 +27,19 @@ import ( "sort" ) -// PuffinReader +// Reader reads blobs and metadata from a Puffin file. 
// // Usage: // -// r, err := puffin.NewPuffinReader(file, fileSize) +// r, err := puffin.NewReader(file, fileSize) // if err != nil { // return err // } -// footer, err := r.ReadFooter() -// if err != nil { -// return err -// } -// for i := range footer.Blobs { +// for i := range r.Footer().Blobs { // blob, err := r.ReadBlob(i) // // process blob.Data // } -type PuffinReader struct { +type Reader struct { r io.ReaderAt size int64 footer *Footer @@ -56,22 +53,23 @@ type BlobData struct { Data []byte } -// ReaderOption configures a PuffinReader. -type ReaderOption func(*PuffinReader) +// ReaderOption configures a Reader. +type ReaderOption func(*Reader) // WithMaxBlobSize sets the maximum blob size allowed when reading. // This prevents OOM attacks from malicious files with huge blob lengths. // Default is DefaultMaxBlobSize (256 MB). func WithMaxBlobSize(size int64) ReaderOption { - return func(r *PuffinReader) { + return func(r *Reader) { r.maxBlobSize = size } } -// NewPuffinReader creates a new Puffin reader. -// It validates both the header and trailing magic bytes upfront. +// NewReader creates a new Puffin file reader. +// The size parameter must be the total size of the file in bytes. +// It validates magic bytes and reads the footer eagerly. // The caller is responsible for closing the underlying io.ReaderAt. -func NewPuffinReader(r io.ReaderAt, size int64, opts ...ReaderOption) (*PuffinReader, error) { +func NewReader(r io.ReaderAt, size int64, opts ...ReaderOption) (*Reader, error) { if r == nil { return nil, errors.New("puffin: reader is nil") } @@ -88,7 +86,7 @@ func NewPuffinReader(r io.ReaderAt, size int64, opts ...ReaderOption) (*PuffinRe if _, err := r.ReadAt(headerMagic[:], 0); err != nil { return nil, fmt.Errorf("puffin: read header magic: %w", err) } - if headerMagic != magic { + if !bytes.Equal(headerMagic[:], magic[:]) { return nil, errors.New("puffin: invalid header magic") } @@ -97,11 +95,11 @@ func NewPuffinReader(r io.ReaderAt, size int64, opts ...ReaderOption) (*PuffinRe if _, err := r.ReadAt(trailingMagic[:], size-MagicSize); err != nil { return nil, fmt.Errorf("puffin: read trailing magic: %w", err) } - if trailingMagic != magic { + if !bytes.Equal(trailingMagic[:], magic[:]) { return nil, errors.New("puffin: invalid trailing magic") } - pr := &PuffinReader{ + pr := &Reader{ r: r, size: size, maxBlobSize: DefaultMaxBlobSize, @@ -111,25 +109,44 @@ func NewPuffinReader(r io.ReaderAt, size int64, opts ...ReaderOption) (*PuffinRe opt(pr) } + // Read footer + if _, err := pr.readFooter(); err != nil { + return nil, err + } + return pr, nil } -// ReadFooter reads and parses the footer from the Puffin file. -// The footer is cached after the first successful read. -func (r *PuffinReader) ReadFooter() (*Footer, error) { +// Footer returns the parsed footer metadata. +// The footer is read during NewReader, so this always returns a valid footer. +func (r *Reader) Footer() *Footer { + return r.footer +} + +// defaultFooterReadSize is the initial read size when reading the footer. +// We read more than strictly needed to hopefully get the entire footer +// in one read, reducing round-trips on cloud object storage. +const defaultFooterReadSize = 8 * 1024 // 8 KB + +// readFooter reads and parses the footer from the Puffin file. 
+func (r *Reader) readFooter() (*Footer, error) { if r.footer != nil { return r.footer, nil } - // Read trailer (last 12 bytes): PayloadSize(4) + Flags(4) + Magic(4) - var trailer [footerTrailerSize]byte - if _, err := r.r.ReadAt(trailer[:], r.size-footerTrailerSize); err != nil { - return nil, fmt.Errorf("puffin: read footer trailer: %w", err) + // Read a larger chunk from the end to minimize round-trips on cloud storage. + // This often captures the entire footer in one read. + readSize := min(int64(defaultFooterReadSize), r.size) + buf := make([]byte, readSize) + if _, err := r.r.ReadAt(buf, r.size-readSize); err != nil { + return nil, fmt.Errorf("puffin: read footer region: %w", err) } - // Validate trailing magic (already checked in constructor, but be defensive) - if trailer[8] != magic[0] || trailer[9] != magic[1] || - trailer[10] != magic[2] || trailer[11] != magic[3] { + // Parse trailer from end of buffer: PayloadSize(4) + Flags(4) + Magic(4) + trailer := buf[len(buf)-footerTrailerSize:] + + // Validate trailing magic + if !bytes.Equal(trailer[8:12], magic[:]) { return nil, errors.New("puffin: invalid trailing magic in footer") } @@ -159,24 +176,38 @@ func (r *PuffinReader) ReadFooter() (*Footer, error) { return nil, fmt.Errorf("puffin: footer payload size %d exceeds available space", payloadSize) } - // Validate footer start magic - var footerMagic [MagicSize]byte - if _, err := r.r.ReadAt(footerMagic[:], footerStart); err != nil { - return nil, fmt.Errorf("puffin: read footer start magic: %w", err) - } - if footerMagic != magic { - return nil, errors.New("puffin: invalid footer start magic") - } + // Total footer size: magic(4) + payload + trailer(12) + totalFooterSize := MagicSize + payloadSize + footerTrailerSize + + var payload []byte + if totalFooterSize <= readSize { + // We already have the entire footer in buf + footerOffset := len(buf) - int(totalFooterSize) - // Read footer JSON payload - payload := make([]byte, payloadSize) - if _, err := r.r.ReadAt(payload, footerStart+MagicSize); err != nil { - return nil, fmt.Errorf("puffin: read footer payload: %w", err) + // Validate footer start magic + if !bytes.Equal(buf[footerOffset:footerOffset+MagicSize], magic[:]) { + return nil, errors.New("puffin: invalid footer start magic") + } + payload = buf[footerOffset+MagicSize : len(buf)-footerTrailerSize] + } else { + // Footer is larger than our initial read, need additional reads + var footerMagic [MagicSize]byte + if _, err := r.r.ReadAt(footerMagic[:], footerStart); err != nil { + return nil, fmt.Errorf("puffin: read footer start magic: %w", err) + } + if !bytes.Equal(footerMagic[:], magic[:]) { + return nil, errors.New("puffin: invalid footer start magic") + } + + payload = make([]byte, payloadSize) + if _, err := r.r.ReadAt(payload, footerStart+MagicSize); err != nil { + return nil, fmt.Errorf("puffin: read footer payload: %w", err) + } } // Parse footer JSON var footer Footer - if err := json.Unmarshal(payload, &footer); err != nil { + if err := json.NewDecoder(bytes.NewReader(payload)).Decode(&footer); err != nil { return nil, fmt.Errorf("puffin: decode footer JSON: %w", err) } @@ -194,11 +225,8 @@ func (r *PuffinReader) ReadFooter() (*Footer, error) { // ReadBlob reads the content of a specific blob by index. // The footer is read automatically if not already cached. 
-func (r *PuffinReader) ReadBlob(index int) (*BlobData, error) { - footer, err := r.ReadFooter() - if err != nil { - return nil, err - } +func (r *Reader) ReadBlob(index int) (*BlobData, error) { + footer := r.footer if index < 0 || index >= len(footer.Blobs) { return nil, fmt.Errorf("puffin: blob index %d out of range [0, %d)", index, len(footer.Blobs)) @@ -215,17 +243,12 @@ func (r *PuffinReader) ReadBlob(index int) (*BlobData, error) { // ReadBlobByMetadata reads a blob using its metadata directly. // This is useful when you have metadata from an external source. -// The footer must be read first to establish bounds checking. -func (r *PuffinReader) ReadBlobByMetadata(meta BlobMetadata) ([]byte, error) { - if r.footer == nil { - return nil, errors.New("puffin: cannot read blob: footer not read") - } - +func (r *Reader) ReadBlobByMetadata(meta BlobMetadata) ([]byte, error) { return r.readBlobData(meta) } // readBlobData is the internal implementation for reading blob data. -func (r *PuffinReader) readBlobData(meta BlobMetadata) ([]byte, error) { +func (r *Reader) readBlobData(meta BlobMetadata) ([]byte, error) { // Validate blob type if meta.Type == "" { return nil, errors.New("puffin: cannot read blob: type is required") @@ -236,29 +259,9 @@ func (r *PuffinReader) readBlobData(meta BlobMetadata) ([]byte, error) { return nil, fmt.Errorf("puffin: cannot read blob: compression %q not supported", *meta.CompressionCodec) } - // Validate length - if meta.Length < 0 { - return nil, fmt.Errorf("puffin: invalid blob length %d", meta.Length) - } - if meta.Length > r.maxBlobSize { - return nil, fmt.Errorf("puffin: blob size %d exceeds limit %d", meta.Length, r.maxBlobSize) - } - - // Validate offset (must be after header magic) - if meta.Offset < MagicSize { - return nil, fmt.Errorf("puffin: invalid blob offset %d (before header)", meta.Offset) - } - - // Check for overflow - end := meta.Offset + meta.Length - if end < meta.Offset { - return nil, fmt.Errorf("puffin: blob offset+length overflow: offset=%d length=%d", meta.Offset, meta.Length) - } - - // Validate blob doesn't extend into footer - if end > r.footerStart { - return nil, fmt.Errorf("puffin: blob extends into footer: offset=%d length=%d footerStart=%d", - meta.Offset, meta.Length, r.footerStart) + // Validate offset/length + if err := r.validateRange(meta.Offset, meta.Length); err != nil { + return nil, fmt.Errorf("puffin: blob: %w", err) } // Read blob data @@ -271,13 +274,8 @@ func (r *PuffinReader) readBlobData(meta BlobMetadata) ([]byte, error) { } // ReadAllBlobs reads all blobs from the file. -// Blobs are read in offset order for sequential I/O efficiency, -// but results are returned in the original footer order. -func (r *PuffinReader) ReadAllBlobs() ([]*BlobData, error) { - footer, err := r.ReadFooter() - if err != nil { - return nil, err - } +func (r *Reader) ReadAllBlobs() ([]*BlobData, error) { + footer := r.footer if len(footer.Blobs) == 0 { return nil, nil @@ -312,48 +310,47 @@ func (r *PuffinReader) ReadAllBlobs() ([]*BlobData, error) { } // ReadRange reads a raw byte range from the blob data region. -// The footer must be read first to establish bounds checking. -func (r *PuffinReader) ReadRange(offset, length int64) ([]byte, error) { - if r.footer == nil { - return nil, errors.New("puffin: cannot read range: footer not read") +// This is useful for streaming partial blob content or when the caller +// has external blob metadata. 
+func (r *Reader) ReadRange(offset, length int64) ([]byte, error) { + if err := r.validateRange(offset, length); err != nil { + return nil, fmt.Errorf("puffin: range: %w", err) + } + + buf := make([]byte, length) + if _, err := r.r.ReadAt(buf, offset); err != nil { + return nil, fmt.Errorf("puffin: read range: %w", err) } - // Validate length + return buf, nil +} + +// validateRange validates offset and length for reading from the blob data region. +func (r *Reader) validateRange(offset, length int64) error { if length < 0 { - return nil, fmt.Errorf("puffin: invalid range length %d", length) + return fmt.Errorf("invalid length %d", length) } if length > r.maxBlobSize { - return nil, fmt.Errorf("puffin: range size %d exceeds limit %d", length, r.maxBlobSize) + return fmt.Errorf("size %d exceeds limit %d", length, r.maxBlobSize) } - - // Validate offset if offset < MagicSize { - return nil, fmt.Errorf("puffin: invalid range offset %d (before header)", offset) + return fmt.Errorf("invalid offset %d (before header)", offset) } - // Check for overflow end := offset + length if end < offset { - return nil, fmt.Errorf("puffin: range offset+length overflow: offset=%d length=%d", offset, length) + return fmt.Errorf("offset+length overflow: offset=%d length=%d", offset, length) } - - // Validate range doesn't extend into footer if end > r.footerStart { - return nil, fmt.Errorf("puffin: range extends into footer: offset=%d length=%d footerStart=%d", + return fmt.Errorf("extends into footer: offset=%d length=%d footerStart=%d", offset, length, r.footerStart) } - // Read data - buf := make([]byte, length) - if _, err := r.r.ReadAt(buf, offset); err != nil { - return nil, fmt.Errorf("puffin: read range: %w", err) - } - - return buf, nil + return nil } // validateBlobs validates all blob metadata entries. -func (r *PuffinReader) validateBlobs(blobs []BlobMetadata, footerStart int64) error { +func (r *Reader) validateBlobs(blobs []BlobMetadata, footerStart int64) error { for i, blob := range blobs { // Type is required if blob.Type == "" { diff --git a/puffin/puffin_test.go b/puffin/puffin_test.go new file mode 100644 index 000000000..e1e87a249 --- /dev/null +++ b/puffin/puffin_test.go @@ -0,0 +1,432 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package puffin_test + +import ( + "bytes" + "testing" + + "github.com/apache/iceberg-go/puffin" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestWriterEmptyFile(t *testing.T) { + var buf bytes.Buffer + + w, err := puffin.NewWriter(&buf) + require.NoError(t, err) + + err = w.Finish() + require.NoError(t, err) + + // Verify minimum structure: header magic + footer magic + JSON + trailer + assert.GreaterOrEqual(t, buf.Len(), puffin.MagicSize*2+12) + + // Verify header magic + assert.Equal(t, []byte{'P', 'F', 'A', '1'}, buf.Bytes()[:4]) + + // Verify trailing magic + assert.Equal(t, []byte{'P', 'F', 'A', '1'}, buf.Bytes()[buf.Len()-4:]) +} + +func TestWriterSingleBlob(t *testing.T) { + var buf bytes.Buffer + blobData := []byte("hello world") + + w, err := puffin.NewWriter(&buf) + require.NoError(t, err) + + meta, err := w.AddBlob(puffin.BlobMetadataInput{ + Type: puffin.BlobTypeDataSketchesTheta, + SnapshotID: 123, + SequenceNumber: 1, + Fields: []int32{1, 2}, + }, blobData) + require.NoError(t, err) + + assert.Equal(t, puffin.BlobTypeDataSketchesTheta, meta.Type) + assert.Equal(t, int64(123), meta.SnapshotID) + assert.Equal(t, int64(1), meta.SequenceNumber) + assert.Equal(t, []int32{1, 2}, meta.Fields) + assert.Equal(t, int64(puffin.MagicSize), meta.Offset) // blob starts after header magic + assert.Equal(t, int64(len(blobData)), meta.Length) + + err = w.Finish() + require.NoError(t, err) +} + +func TestWriterMultipleBlobs(t *testing.T) { + var buf bytes.Buffer + blob1 := []byte("first blob") + blob2 := []byte("second blob data") + + w, err := puffin.NewWriter(&buf) + require.NoError(t, err) + + meta1, err := w.AddBlob(puffin.BlobMetadataInput{ + Type: puffin.BlobTypeDataSketchesTheta, + SnapshotID: 100, + Fields: []int32{1}, + }, blob1) + require.NoError(t, err) + + meta2, err := w.AddBlob(puffin.BlobMetadataInput{ + Type: puffin.BlobTypeDeletionVector, + SnapshotID: 200, + Fields: []int32{2}, + }, blob2) + require.NoError(t, err) + + // Second blob should start after first blob + assert.Equal(t, meta1.Offset+meta1.Length, meta2.Offset) + + err = w.Finish() + require.NoError(t, err) +} + +func TestWriterSetCreatedBy(t *testing.T) { + var buf bytes.Buffer + + w, err := puffin.NewWriter(&buf) + require.NoError(t, err) + + err = w.SetCreatedBy("test-app v1.0") + require.NoError(t, err) + + err = w.Finish() + require.NoError(t, err) + + // Read back and verify + r, err := puffin.NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len())) + require.NoError(t, err) + + footer, err := r.ReadFooter() + require.NoError(t, err) + + assert.Equal(t, "test-app v1.0", footer.Properties[puffin.CreatedBy]) +} + +func TestWriterSetProperties(t *testing.T) { + var buf bytes.Buffer + + w, err := puffin.NewWriter(&buf) + require.NoError(t, err) + + err = w.AddProperties(map[string]string{ + "custom-key": "custom-value", + }) + require.NoError(t, err) + + err = w.Finish() + require.NoError(t, err) + + // Read back and verify + r, err := puffin.NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len())) + require.NoError(t, err) + + footer, err := r.ReadFooter() + require.NoError(t, err) + + assert.Equal(t, "custom-value", footer.Properties["custom-key"]) +} + +func TestWriterErrors(t *testing.T) { + t.Run("nil writer", func(t *testing.T) { + _, err := puffin.NewWriter(nil) + assert.Error(t, err) + assert.Contains(t, err.Error(), "writer is nil") + }) + + t.Run("empty blob type", func(t *testing.T) { + var buf bytes.Buffer + w, err := puffin.NewWriter(&buf) + 
require.NoError(t, err) + + _, err = w.AddBlob(puffin.BlobMetadataInput{ + Type: "", // empty type + }, []byte("data")) + assert.Error(t, err) + assert.Contains(t, err.Error(), "type is required") + }) + + t.Run("add after finish", func(t *testing.T) { + var buf bytes.Buffer + w, err := puffin.NewWriter(&buf) + require.NoError(t, err) + + err = w.Finish() + require.NoError(t, err) + + _, err = w.AddBlob(puffin.BlobMetadataInput{ + Type: puffin.BlobTypeDataSketchesTheta, + Fields: []int32{1}, + }, []byte("data")) + assert.Error(t, err) + assert.Contains(t, err.Error(), "finalized") + }) + + t.Run("double finish", func(t *testing.T) { + var buf bytes.Buffer + w, err := puffin.NewWriter(&buf) + require.NoError(t, err) + + err = w.Finish() + require.NoError(t, err) + + err = w.Finish() + assert.Error(t, err) + assert.Contains(t, err.Error(), "finalized") + }) +} + +func TestReaderErrors(t *testing.T) { + t.Run("nil reader", func(t *testing.T) { + _, err := puffin.NewReader(nil, 100) + assert.Error(t, err) + assert.Contains(t, err.Error(), "reader is nil") + }) + + t.Run("file too small", func(t *testing.T) { + data := make([]byte, 10) // too small + _, err := puffin.NewReader(bytes.NewReader(data), int64(len(data))) + assert.Error(t, err) + assert.Contains(t, err.Error(), "too small") + }) + + t.Run("invalid header magic", func(t *testing.T) { + data := make([]byte, 100) + copy(data[0:4], []byte("XXXX")) // wrong magic + copy(data[96:100], []byte{'P', 'F', 'A', '1'}) + + _, err := puffin.NewReader(bytes.NewReader(data), int64(len(data))) + assert.Error(t, err) + assert.Contains(t, err.Error(), "invalid header magic") + }) + + t.Run("invalid trailing magic", func(t *testing.T) { + data := make([]byte, 100) + copy(data[0:4], []byte{'P', 'F', 'A', '1'}) + copy(data[96:100], []byte("XXXX")) // wrong trailing magic + + _, err := puffin.NewReader(bytes.NewReader(data), int64(len(data))) + assert.Error(t, err) + assert.Contains(t, err.Error(), "invalid trailing magic") + }) + + t.Run("blob index out of range", func(t *testing.T) { + var buf bytes.Buffer + w, err := puffin.NewWriter(&buf) + require.NoError(t, err) + err = w.Finish() + require.NoError(t, err) + + r, err := puffin.NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len())) + require.NoError(t, err) + + _, err = r.ReadBlob(0) // no blobs, index 0 is out of range + assert.Error(t, err) + assert.Contains(t, err.Error(), "out of range") + }) +} + +func TestRoundTrip(t *testing.T) { + tests := []struct { + name string + blobs []struct { + data []byte + input puffin.BlobMetadataInput + } + }{ + { + name: "empty file", + blobs: nil, + }, + { + name: "single blob", + blobs: []struct { + data []byte + input puffin.BlobMetadataInput + }{ + { + data: []byte("test data"), + input: puffin.BlobMetadataInput{ + Type: puffin.BlobTypeDataSketchesTheta, + SnapshotID: 1, + SequenceNumber: 1, + Fields: []int32{1}, + }, + }, + }, + }, + { + name: "multiple blobs", + blobs: []struct { + data []byte + input puffin.BlobMetadataInput + }{ + { + data: []byte("first blob data"), + input: puffin.BlobMetadataInput{ + Type: puffin.BlobTypeDataSketchesTheta, + SnapshotID: 100, + SequenceNumber: 1, + Fields: []int32{1, 2}, + }, + }, + { + data: []byte("second blob with more data"), + input: puffin.BlobMetadataInput{ + Type: puffin.BlobTypeDeletionVector, + SnapshotID: 200, + SequenceNumber: 2, + Fields: []int32{3}, + Properties: map[string]string{"key": "value"}, + }, + }, + { + data: []byte("third"), + input: puffin.BlobMetadataInput{ + Type: 
puffin.BlobTypeDataSketchesTheta, + SnapshotID: 300, + SequenceNumber: 3, + Fields: []int32{4, 5, 6}, + }, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Write + var buf bytes.Buffer + w, err := puffin.NewWriter(&buf) + require.NoError(t, err) + + for _, blob := range tt.blobs { + _, err := w.AddBlob(blob.input, blob.data) + require.NoError(t, err) + } + + err = w.Finish() + require.NoError(t, err) + + // Read + r, err := puffin.NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len())) + require.NoError(t, err) + + footer, err := r.ReadFooter() + require.NoError(t, err) + + // Verify blob count + assert.Len(t, footer.Blobs, len(tt.blobs)) + + // Verify each blob + for i, expected := range tt.blobs { + blobData, err := r.ReadBlob(i) + require.NoError(t, err) + + assert.Equal(t, expected.data, blobData.Data, "blob %d data mismatch", i) + assert.Equal(t, expected.input.Type, blobData.Metadata.Type, "blob %d type mismatch", i) + assert.Equal(t, expected.input.SnapshotID, blobData.Metadata.SnapshotID, "blob %d snapshot-id mismatch", i) + assert.Equal(t, expected.input.SequenceNumber, blobData.Metadata.SequenceNumber, "blob %d sequence-number mismatch", i) + assert.Equal(t, expected.input.Fields, blobData.Metadata.Fields, "blob %d fields mismatch", i) + + if expected.input.Properties != nil { + assert.Equal(t, expected.input.Properties, blobData.Metadata.Properties, "blob %d properties mismatch", i) + } + } + + // Also test ReadAllBlobs + if len(tt.blobs) > 0 { + allBlobs, err := r.ReadAllBlobs() + require.NoError(t, err) + assert.Len(t, allBlobs, len(tt.blobs)) + + for i, expected := range tt.blobs { + assert.Equal(t, expected.data, allBlobs[i].Data) + } + } + }) + } +} + +func TestReadBlobByMetadata(t *testing.T) { + // Write a file with a blob + var buf bytes.Buffer + blobData := []byte("metadata test blob") + + w, err := puffin.NewWriter(&buf) + require.NoError(t, err) + + meta, err := w.AddBlob(puffin.BlobMetadataInput{ + Type: puffin.BlobTypeDataSketchesTheta, + SnapshotID: 42, + Fields: []int32{1}, + }, blobData) + require.NoError(t, err) + + err = w.Finish() + require.NoError(t, err) + + // Read using metadata directly + r, err := puffin.NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len())) + require.NoError(t, err) + + // Must read footer first + _, err = r.ReadFooter() + require.NoError(t, err) + + data, err := r.ReadBlobByMetadata(meta) + require.NoError(t, err) + + assert.Equal(t, blobData, data) +} + +func TestReadRange(t *testing.T) { + // Write a file with a known blob + var buf bytes.Buffer + blobData := []byte("0123456789") + + w, err := puffin.NewWriter(&buf) + require.NoError(t, err) + + meta, err := w.AddBlob(puffin.BlobMetadataInput{ + Type: puffin.BlobTypeDataSketchesTheta, + SnapshotID: 1, + Fields: []int32{1}, + }, blobData) + require.NoError(t, err) + + err = w.Finish() + require.NoError(t, err) + + // Read a range + r, err := puffin.NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len())) + require.NoError(t, err) + + _, err = r.ReadFooter() + require.NoError(t, err) + + // Read middle portion of blob + data, err := r.ReadRange(meta.Offset+2, 5) + require.NoError(t, err) + + assert.Equal(t, []byte("23456"), data) +} diff --git a/puffin/puffin_writer.go b/puffin/puffin_writer.go index 9fcf227ec..1a1d40c8c 100644 --- a/puffin/puffin_writer.go +++ b/puffin/puffin_writer.go @@ -23,28 +23,12 @@ import ( "fmt" "io" "math" -) -// writeAll writes all bytes to w or returns an error. 
-// Handles the io.Writer contract where Write can return n < len(data) without error. -func writeAll(w io.Writer, data []byte) error { - n, err := w.Write(data) - if err != nil { - return err - } - if n != len(data) { - return fmt.Errorf("short write: wrote %d of %d bytes", n, len(data)) - } - - return nil -} + "github.com/apache/iceberg-go" +) // PuffinWriter writes blobs and metadata to a Puffin file. // -// The Puffin format stores arbitrary blobs (e.g., statistics, deletion vectors) -// along with JSON metadata in a footer. The writer handles the binary layout: -// header magic, blob data, and footer structure. -// // Usage: // // w, err := puffin.NewWriter(file) @@ -52,7 +36,7 @@ func writeAll(w io.Writer, data []byte) error { // return err // } // _, err = w.AddBlob(puffin.BlobMetadataInput{ -// Type: puffin.ApacheDataSketchesThetaV1, +// Type: puffin.BlobTypeDataSketchesTheta, // SnapshotID: 123, // Fields: []int32{1}, // }, sketchBytes) @@ -60,7 +44,7 @@ func writeAll(w io.Writer, data []byte) error { // return err // } // return w.Finish() -type PuffinWriter struct { +type Writer struct { w io.Writer offset int64 blobs []BlobMetadata @@ -72,7 +56,7 @@ type PuffinWriter struct { // BlobMetadataInput contains fields the caller provides when adding a blob. // Offset, Length, and CompressionCodec are set by the writer. type BlobMetadataInput struct { - Type string + Type BlobType SnapshotID int64 SequenceNumber int64 Fields []int32 @@ -81,7 +65,7 @@ type BlobMetadataInput struct { // NewWriter creates a new PuffinWriter and writes the file header magic. // The caller is responsible for closing the underlying writer after Finish returns. -func NewWriter(w io.Writer) (*PuffinWriter, error) { +func NewWriter(w io.Writer) (*Writer, error) { if w == nil { return nil, errors.New("puffin: writer is nil") } @@ -91,17 +75,17 @@ func NewWriter(w io.Writer) (*PuffinWriter, error) { return nil, fmt.Errorf("puffin: write header magic: %w", err) } - return &PuffinWriter{ + return &Writer{ w: w, offset: MagicSize, props: make(map[string]string), - createdBy: "iceberg-go", + createdBy: fmt.Sprintf("iceberg-go %s", iceberg.Version()), }, nil } // SetProperties merges the provided properties into the file-level properties // written to the footer. Can be called multiple times before Finish. -func (w *PuffinWriter) SetProperties(props map[string]string) error { +func (w *Writer) AddProperties(props map[string]string) error { if w.done { return errors.New("puffin: cannot set properties: writer already finalized") } @@ -112,9 +96,14 @@ func (w *PuffinWriter) SetProperties(props map[string]string) error { return nil } +// clear properties +func (w *Writer) ClearProperties() { + w.props = make(map[string]string) +} + // SetCreatedBy overrides the default "created-by" property written to the footer. // The default value is "iceberg-go". Example: "MyApp version 1.2.3". -func (w *PuffinWriter) SetCreatedBy(createdBy string) error { +func (w *Writer) SetCreatedBy(createdBy string) error { if w.done { return errors.New("puffin: cannot set created-by: writer already finalized") } @@ -129,13 +118,16 @@ func (w *PuffinWriter) SetCreatedBy(createdBy string) error { // AddBlob writes blob data and records its metadata for the footer. // Returns the complete BlobMetadata including the computed Offset and Length. // The input.Type is required; use constants like ApacheDataSketchesThetaV1. 
-func (w *PuffinWriter) AddBlob(input BlobMetadataInput, data []byte) (BlobMetadata, error) { +func (w *Writer) AddBlob(input BlobMetadataInput, data []byte) (BlobMetadata, error) { if w.done { return BlobMetadata{}, errors.New("puffin: cannot add blob: writer already finalized") } if input.Type == "" { return BlobMetadata{}, errors.New("puffin: cannot add blob: type is required") } + if input.Fields == nil { + return BlobMetadata{}, errors.New("puffin: cannot add blob: fields is required") + } meta := BlobMetadata{ Type: input.Type, @@ -160,7 +152,7 @@ func (w *PuffinWriter) AddBlob(input BlobMetadataInput, data []byte) (BlobMetada // Finish writes the footer and completes the Puffin file structure. // Must be called exactly once after all blobs are written. // After Finish returns, no further operations are allowed on the writer. -func (w *PuffinWriter) Finish() error { +func (w *Writer) Finish() error { if w.done { return errors.New("puffin: cannot finish: writer already finalized") } @@ -211,3 +203,17 @@ func (w *PuffinWriter) Finish() error { return nil } + +// writeAll writes all bytes to w or returns an error. +// Handles the io.Writer contract where Write can return n < len(data) without error. +func writeAll(w io.Writer, data []byte) error { + n, err := w.Write(data) + if err != nil { + return err + } + if n != len(data) { + return fmt.Errorf("short write: wrote %d of %d bytes", n, len(data)) + } + + return nil +} From 91c2a6a6492464f199ae112cb6c58c6ef9b13358 Mon Sep 17 00:00:00 2001 From: Shreyas220 Date: Wed, 14 Jan 2026 15:43:50 +0530 Subject: [PATCH 6/7] chore: adding test Signed-off-by: Shreyas220 --- puffin/puffin.go | 20 +- puffin/puffin_test.go | 487 ++++++++++++++++------------------------ puffin/puffin_writer.go | 10 + 3 files changed, 226 insertions(+), 291 deletions(-) diff --git a/puffin/puffin.go b/puffin/puffin.go index abd34c827..3482080a2 100644 --- a/puffin/puffin.go +++ b/puffin/puffin.go @@ -14,6 +14,18 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +// +// Package puffin provides reading and writing of Puffin files. +// +// Puffin is a file format designed to store statistics and indexes +// for Iceberg tables. A Puffin file contains blobs (opaque byte sequences) +// with associated metadata, such as Apache DataSketches or deletion vectors. +// +// File structure: +// +// [Magic] [Blob]* [Magic] [Footer Payload] [Footer Payload Size] [Flags] [Magic] +// +// See the specification at https://iceberg.apache.org/puffin-spec/ package puffin var magic = [4]byte{'P', 'F', 'A', '1'} @@ -22,15 +34,18 @@ const ( //[Magic] [FooterPayload] [FooterPayloadSize] [Flags] [Magic] // MagicSize is the number of bytes in the magic marker. MagicSize = 4 + // footerTrailerSize accounts for footer length (4)+ flags (4) + trailing magic (4). footerTrailerSize = 12 + // FooterFlagCompressed indicates a compressed footer; unsupported in this implementation. FooterFlagCompressed = 1 // bit 0 + // Prevents OOM // DefaultMaxBlobSize is the maximum blob size allowed when reading (256 MB). - // This prevents OOM attacks from malicious files with huge blob lengths. // Override with WithMaxBlobSize when creating a reader. DefaultMaxBlobSize = 256 << 20 + // CreatedBy is a human-readable identification of the application writing the file, along with its version. // Example: "Trino version 381". 
CreatedBy = "created-by" @@ -41,7 +56,7 @@ type BlobMetadata struct { SnapshotID int64 `json:"snapshot-id"` SequenceNumber int64 `json:"sequence-number"` Fields []int32 `json:"fields"` - Offset int64 `json:"offset"` // absolute file offset + Offset int64 `json:"offset"` Length int64 `json:"length"` CompressionCodec *string `json:"compression-codec,omitempty"` Properties map[string]string `json:"properties,omitempty"` @@ -63,5 +78,4 @@ const ( // BlobTypeDeletionVector is a serialized deletion vector per the // Iceberg spec. Requires snapshot-id and sequence-number to be -1. BlobTypeDeletionVector BlobType = "deletion-vector-v1" - ) diff --git a/puffin/puffin_test.go b/puffin/puffin_test.go index e1e87a249..569be8cfe 100644 --- a/puffin/puffin_test.go +++ b/puffin/puffin_test.go @@ -6,7 +6,7 @@ // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an @@ -26,146 +26,157 @@ import ( "github.com/stretchr/testify/require" ) -func TestWriterEmptyFile(t *testing.T) { - var buf bytes.Buffer +// TestRoundTrip verifies that data written by Writer can be read back by Reader +// with all metadata and blob data preserved exactly. +func TestRoundTrip(t *testing.T) { + // Test data + blob1Data := []byte("theta sketch data here") + blob2Data := []byte("another blob with different content") + // Write puffin file to buffer + var buf bytes.Buffer w, err := puffin.NewWriter(&buf) require.NoError(t, err) - err = w.Finish() - require.NoError(t, err) - - // Verify minimum structure: header magic + footer magic + JSON + trailer - assert.GreaterOrEqual(t, buf.Len(), puffin.MagicSize*2+12) - - // Verify header magic - assert.Equal(t, []byte{'P', 'F', 'A', '1'}, buf.Bytes()[:4]) - - // Verify trailing magic - assert.Equal(t, []byte{'P', 'F', 'A', '1'}, buf.Bytes()[buf.Len()-4:]) -} - -func TestWriterSingleBlob(t *testing.T) { - var buf bytes.Buffer - blobData := []byte("hello world") - - w, err := puffin.NewWriter(&buf) + // Add file-level properties + err = w.AddProperties(map[string]string{ + "test-property": "test-value", + }) require.NoError(t, err) - meta, err := w.AddBlob(puffin.BlobMetadataInput{ + // Add first blob (DataSketches type) + meta1, err := w.AddBlob(puffin.BlobMetadataInput{ Type: puffin.BlobTypeDataSketchesTheta, SnapshotID: 123, SequenceNumber: 1, - Fields: []int32{1, 2}, - }, blobData) - require.NoError(t, err) - - assert.Equal(t, puffin.BlobTypeDataSketchesTheta, meta.Type) - assert.Equal(t, int64(123), meta.SnapshotID) - assert.Equal(t, int64(1), meta.SequenceNumber) - assert.Equal(t, []int32{1, 2}, meta.Fields) - assert.Equal(t, int64(puffin.MagicSize), meta.Offset) // blob starts after header magic - assert.Equal(t, int64(len(blobData)), meta.Length) - - err = w.Finish() - require.NoError(t, err) -} - -func TestWriterMultipleBlobs(t *testing.T) { - var buf bytes.Buffer - blob1 := []byte("first blob") - blob2 := []byte("second blob data") - - w, err := puffin.NewWriter(&buf) - require.NoError(t, err) - - meta1, err := w.AddBlob(puffin.BlobMetadataInput{ - Type: puffin.BlobTypeDataSketchesTheta, - SnapshotID: 100, - Fields: []int32{1}, - }, blob1) + Fields: []int32{1, 2, 3}, + Properties: map[string]string{"ndv": "1000"}, + }, blob1Data) require.NoError(t, err) + // Add second blob (DeletionVector type) 
meta2, err := w.AddBlob(puffin.BlobMetadataInput{ - Type: puffin.BlobTypeDeletionVector, - SnapshotID: 200, - Fields: []int32{2}, - }, blob2) + Type: puffin.BlobTypeDeletionVector, + SnapshotID: -1, + SequenceNumber: -1, + Fields: []int32{}, + Properties: map[string]string{ + "referenced-data-file": "data/file.parquet", + "cardinality": "42", + }, + }, blob2Data) require.NoError(t, err) - // Second blob should start after first blob - assert.Equal(t, meta1.Offset+meta1.Length, meta2.Offset) - err = w.Finish() require.NoError(t, err) -} - -func TestWriterSetCreatedBy(t *testing.T) { - var buf bytes.Buffer - - w, err := puffin.NewWriter(&buf) - require.NoError(t, err) - err = w.SetCreatedBy("test-app v1.0") + // Read puffin file back + data := buf.Bytes() + r, err := puffin.NewReader(bytes.NewReader(data), int64(len(data))) require.NoError(t, err) - err = w.Finish() + // Verify footer + footer := r.Footer() + require.NotNil(t, footer) + assert.Len(t, footer.Blobs, 2) + + // Verify file properties + assert.Equal(t, "test-value", footer.Properties["test-property"]) + assert.Contains(t, footer.Properties[puffin.CreatedBy], "iceberg-go") + + // Verify first blob metadata + assert.Equal(t, puffin.BlobTypeDataSketchesTheta, footer.Blobs[0].Type) + assert.Equal(t, int64(123), footer.Blobs[0].SnapshotID) + assert.Equal(t, int64(1), footer.Blobs[0].SequenceNumber) + assert.Equal(t, []int32{1, 2, 3}, footer.Blobs[0].Fields) + assert.Equal(t, meta1.Offset, footer.Blobs[0].Offset) + assert.Equal(t, meta1.Length, footer.Blobs[0].Length) + assert.Equal(t, "1000", footer.Blobs[0].Properties["ndv"]) + + // Verify second blob metadata + assert.Equal(t, puffin.BlobTypeDeletionVector, footer.Blobs[1].Type) + assert.Equal(t, int64(-1), footer.Blobs[1].SnapshotID) + assert.Equal(t, int64(-1), footer.Blobs[1].SequenceNumber) + assert.Equal(t, []int32{}, footer.Blobs[1].Fields) + assert.Equal(t, meta2.Offset, footer.Blobs[1].Offset) + assert.Equal(t, meta2.Length, footer.Blobs[1].Length) + + // Verify blob data via ReadBlob + blobData1, err := r.ReadBlob(0) require.NoError(t, err) + assert.Equal(t, blob1Data, blobData1.Data) - // Read back and verify - r, err := puffin.NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len())) + blobData2, err := r.ReadBlob(1) require.NoError(t, err) + assert.Equal(t, blob2Data, blobData2.Data) - footer, err := r.ReadFooter() + // Verify ReadAllBlobs + allBlobs, err := r.ReadAllBlobs() require.NoError(t, err) - - assert.Equal(t, "test-app v1.0", footer.Properties[puffin.CreatedBy]) + assert.Len(t, allBlobs, 2) + assert.Equal(t, blob1Data, allBlobs[0].Data) + assert.Equal(t, blob2Data, allBlobs[1].Data) } -func TestWriterSetProperties(t *testing.T) { +// TestEmptyFile verifies that a puffin file with no blobs is valid. 
+func TestEmptyFile(t *testing.T) { var buf bytes.Buffer - w, err := puffin.NewWriter(&buf) require.NoError(t, err) - err = w.AddProperties(map[string]string{ - "custom-key": "custom-value", - }) - require.NoError(t, err) - err = w.Finish() require.NoError(t, err) - // Read back and verify - r, err := puffin.NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len())) + data := buf.Bytes() + r, err := puffin.NewReader(bytes.NewReader(data), int64(len(data))) require.NoError(t, err) - footer, err := r.ReadFooter() - require.NoError(t, err) + footer := r.Footer() + assert.NotNil(t, footer) + assert.Len(t, footer.Blobs, 0) + assert.Contains(t, footer.Properties[puffin.CreatedBy], "iceberg-go") - assert.Equal(t, "custom-value", footer.Properties["custom-key"]) + // ReadAllBlobs should return nil for empty file + blobs, err := r.ReadAllBlobs() + require.NoError(t, err) + assert.Nil(t, blobs) } -func TestWriterErrors(t *testing.T) { +// TestWriterValidation verifies that Writer rejects invalid input. +func TestWriterValidation(t *testing.T) { t.Run("nil writer", func(t *testing.T) { _, err := puffin.NewWriter(nil) assert.Error(t, err) - assert.Contains(t, err.Error(), "writer is nil") + assert.ErrorContains(t, err, "nil") + }) + + t.Run("missing type", func(t *testing.T) { + var buf bytes.Buffer + w, err := puffin.NewWriter(&buf) + require.NoError(t, err) + + _, err = w.AddBlob(puffin.BlobMetadataInput{ + Type: "", // missing + Fields: []int32{1}, + }, []byte("data")) + assert.Error(t, err) + assert.ErrorContains(t, err, "type") }) - t.Run("empty blob type", func(t *testing.T) { + t.Run("nil fields", func(t *testing.T) { var buf bytes.Buffer w, err := puffin.NewWriter(&buf) require.NoError(t, err) _, err = w.AddBlob(puffin.BlobMetadataInput{ - Type: "", // empty type + Type: puffin.BlobTypeDataSketchesTheta, + Fields: nil, // nil not allowed }, []byte("data")) assert.Error(t, err) - assert.Contains(t, err.Error(), "type is required") + assert.ErrorContains(t, err, "fields") }) - t.Run("add after finish", func(t *testing.T) { + t.Run("add blob after finish", func(t *testing.T) { var buf bytes.Buffer w, err := puffin.NewWriter(&buf) require.NoError(t, err) @@ -178,7 +189,7 @@ func TestWriterErrors(t *testing.T) { Fields: []int32{1}, }, []byte("data")) assert.Error(t, err) - assert.Contains(t, err.Error(), "finalized") + assert.ErrorContains(t, err, "finalized") }) t.Run("double finish", func(t *testing.T) { @@ -191,242 +202,142 @@ func TestWriterErrors(t *testing.T) { err = w.Finish() assert.Error(t, err) - assert.Contains(t, err.Error(), "finalized") + assert.ErrorContains(t, err, "finalized") + }) + + t.Run("deletion vector with invalid snapshot-id", func(t *testing.T) { + var buf bytes.Buffer + w, err := puffin.NewWriter(&buf) + require.NoError(t, err) + + _, err = w.AddBlob(puffin.BlobMetadataInput{ + Type: puffin.BlobTypeDeletionVector, + SnapshotID: 123, // must be -1 + SequenceNumber: -1, + Fields: []int32{}, + }, []byte("data")) + assert.Error(t, err) + assert.ErrorContains(t, err, "snapshot-id") + }) + + t.Run("deletion vector with invalid sequence-number", func(t *testing.T) { + var buf bytes.Buffer + w, err := puffin.NewWriter(&buf) + require.NoError(t, err) + + _, err = w.AddBlob(puffin.BlobMetadataInput{ + Type: puffin.BlobTypeDeletionVector, + SnapshotID: -1, + SequenceNumber: 5, // must be -1 + Fields: []int32{}, + }, []byte("data")) + assert.Error(t, err) + assert.ErrorContains(t, err, "sequence-number") }) } -func TestReaderErrors(t *testing.T) { +// TestReaderInvalidFile verifies 
that Reader rejects invalid/corrupt files. +func TestReaderInvalidFile(t *testing.T) { t.Run("nil reader", func(t *testing.T) { _, err := puffin.NewReader(nil, 100) assert.Error(t, err) - assert.Contains(t, err.Error(), "reader is nil") + assert.ErrorContains(t, err, "nil") }) t.Run("file too small", func(t *testing.T) { - data := make([]byte, 10) // too small + data := []byte("tiny") _, err := puffin.NewReader(bytes.NewReader(data), int64(len(data))) assert.Error(t, err) - assert.Contains(t, err.Error(), "too small") + assert.ErrorContains(t, err, "too small") }) t.Run("invalid header magic", func(t *testing.T) { - data := make([]byte, 100) - copy(data[0:4], []byte("XXXX")) // wrong magic - copy(data[96:100], []byte{'P', 'F', 'A', '1'}) - - _, err := puffin.NewReader(bytes.NewReader(data), int64(len(data))) - assert.Error(t, err) - assert.Contains(t, err.Error(), "invalid header magic") - }) + // Create valid file first + var buf bytes.Buffer + w, _ := puffin.NewWriter(&buf) + w.Finish() + data := buf.Bytes() - t.Run("invalid trailing magic", func(t *testing.T) { - data := make([]byte, 100) - copy(data[0:4], []byte{'P', 'F', 'A', '1'}) - copy(data[96:100], []byte("XXXX")) // wrong trailing magic + // Corrupt header magic + data[0] = 'X' _, err := puffin.NewReader(bytes.NewReader(data), int64(len(data))) assert.Error(t, err) - assert.Contains(t, err.Error(), "invalid trailing magic") + assert.ErrorContains(t, err, "magic") }) - t.Run("blob index out of range", func(t *testing.T) { + t.Run("invalid trailing magic", func(t *testing.T) { + // Create valid file first var buf bytes.Buffer - w, err := puffin.NewWriter(&buf) - require.NoError(t, err) - err = w.Finish() - require.NoError(t, err) + w, _ := puffin.NewWriter(&buf) + w.Finish() + data := buf.Bytes() - r, err := puffin.NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len())) - require.NoError(t, err) + // Corrupt trailing magic (last 4 bytes) + data[len(data)-1] = 'X' - _, err = r.ReadBlob(0) // no blobs, index 0 is out of range + _, err := puffin.NewReader(bytes.NewReader(data), int64(len(data))) assert.Error(t, err) - assert.Contains(t, err.Error(), "out of range") + assert.ErrorContains(t, err, "magic") }) } -func TestRoundTrip(t *testing.T) { - tests := []struct { - name string - blobs []struct { - data []byte - input puffin.BlobMetadataInput - } - }{ - { - name: "empty file", - blobs: nil, - }, - { - name: "single blob", - blobs: []struct { - data []byte - input puffin.BlobMetadataInput - }{ - { - data: []byte("test data"), - input: puffin.BlobMetadataInput{ - Type: puffin.BlobTypeDataSketchesTheta, - SnapshotID: 1, - SequenceNumber: 1, - Fields: []int32{1}, - }, - }, - }, - }, - { - name: "multiple blobs", - blobs: []struct { - data []byte - input puffin.BlobMetadataInput - }{ - { - data: []byte("first blob data"), - input: puffin.BlobMetadataInput{ - Type: puffin.BlobTypeDataSketchesTheta, - SnapshotID: 100, - SequenceNumber: 1, - Fields: []int32{1, 2}, - }, - }, - { - data: []byte("second blob with more data"), - input: puffin.BlobMetadataInput{ - Type: puffin.BlobTypeDeletionVector, - SnapshotID: 200, - SequenceNumber: 2, - Fields: []int32{3}, - Properties: map[string]string{"key": "value"}, - }, - }, - { - data: []byte("third"), - input: puffin.BlobMetadataInput{ - Type: puffin.BlobTypeDataSketchesTheta, - SnapshotID: 300, - SequenceNumber: 3, - Fields: []int32{4, 5, 6}, - }, - }, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Write - var buf bytes.Buffer - w, err := 
puffin.NewWriter(&buf) - require.NoError(t, err) - - for _, blob := range tt.blobs { - _, err := w.AddBlob(blob.input, blob.data) - require.NoError(t, err) - } - - err = w.Finish() - require.NoError(t, err) - - // Read - r, err := puffin.NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len())) - require.NoError(t, err) - - footer, err := r.ReadFooter() - require.NoError(t, err) - - // Verify blob count - assert.Len(t, footer.Blobs, len(tt.blobs)) - - // Verify each blob - for i, expected := range tt.blobs { - blobData, err := r.ReadBlob(i) - require.NoError(t, err) - - assert.Equal(t, expected.data, blobData.Data, "blob %d data mismatch", i) - assert.Equal(t, expected.input.Type, blobData.Metadata.Type, "blob %d type mismatch", i) - assert.Equal(t, expected.input.SnapshotID, blobData.Metadata.SnapshotID, "blob %d snapshot-id mismatch", i) - assert.Equal(t, expected.input.SequenceNumber, blobData.Metadata.SequenceNumber, "blob %d sequence-number mismatch", i) - assert.Equal(t, expected.input.Fields, blobData.Metadata.Fields, "blob %d fields mismatch", i) - - if expected.input.Properties != nil { - assert.Equal(t, expected.input.Properties, blobData.Metadata.Properties, "blob %d properties mismatch", i) - } - } - - // Also test ReadAllBlobs - if len(tt.blobs) > 0 { - allBlobs, err := r.ReadAllBlobs() - require.NoError(t, err) - assert.Len(t, allBlobs, len(tt.blobs)) - - for i, expected := range tt.blobs { - assert.Equal(t, expected.data, allBlobs[i].Data) - } - } - }) - } -} - -func TestReadBlobByMetadata(t *testing.T) { - // Write a file with a blob +// TestReaderBlobAccess verifies blob access methods work correctly. +func TestReaderBlobAccess(t *testing.T) { + // Create file with multiple blobs var buf bytes.Buffer - blobData := []byte("metadata test blob") - w, err := puffin.NewWriter(&buf) require.NoError(t, err) - meta, err := w.AddBlob(puffin.BlobMetadataInput{ - Type: puffin.BlobTypeDataSketchesTheta, - SnapshotID: 42, - Fields: []int32{1}, - }, blobData) - require.NoError(t, err) - - err = w.Finish() - require.NoError(t, err) - - // Read using metadata directly - r, err := puffin.NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len())) - require.NoError(t, err) - - // Must read footer first - _, err = r.ReadFooter() - require.NoError(t, err) - - data, err := r.ReadBlobByMetadata(meta) - require.NoError(t, err) - - assert.Equal(t, blobData, data) -} + blobs := [][]byte{ + []byte("first"), + []byte("second"), + []byte("third"), + } -func TestReadRange(t *testing.T) { - // Write a file with a known blob - var buf bytes.Buffer - blobData := []byte("0123456789") + for _, blob := range blobs { + _, err := w.AddBlob(puffin.BlobMetadataInput{ + Type: puffin.BlobTypeDataSketchesTheta, + Fields: []int32{}, + }, blob) + require.NoError(t, err) + } + require.NoError(t, w.Finish()) - w, err := puffin.NewWriter(&buf) + data := buf.Bytes() + r, err := puffin.NewReader(bytes.NewReader(data), int64(len(data))) require.NoError(t, err) - meta, err := w.AddBlob(puffin.BlobMetadataInput{ - Type: puffin.BlobTypeDataSketchesTheta, - SnapshotID: 1, - Fields: []int32{1}, - }, blobData) - require.NoError(t, err) + t.Run("read by index", func(t *testing.T) { + for i, expected := range blobs { + blobData, err := r.ReadBlob(i) + require.NoError(t, err) + assert.Equal(t, expected, blobData.Data) + } + }) - err = w.Finish() - require.NoError(t, err) + t.Run("index out of range", func(t *testing.T) { + _, err := r.ReadBlob(-1) + assert.Error(t, err) - // Read a range - r, err := 
puffin.NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len())) - require.NoError(t, err) + _, err = r.ReadBlob(100) + assert.Error(t, err) + }) - _, err = r.ReadFooter() - require.NoError(t, err) + t.Run("read by metadata", func(t *testing.T) { + meta := r.Footer().Blobs[1] + data, err := r.ReadBlobByMetadata(meta) + require.NoError(t, err) + assert.Equal(t, blobs[1], data) + }) - // Read middle portion of blob - data, err := r.ReadRange(meta.Offset+2, 5) - require.NoError(t, err) + t.Run("read all preserves order", func(t *testing.T) { + allBlobs, err := r.ReadAllBlobs() + require.NoError(t, err) + require.Len(t, allBlobs, 3) - assert.Equal(t, []byte("23456"), data) + for i, expected := range blobs { + assert.Equal(t, expected, allBlobs[i].Data) + } + }) } diff --git a/puffin/puffin_writer.go b/puffin/puffin_writer.go index 1a1d40c8c..30c14f651 100644 --- a/puffin/puffin_writer.go +++ b/puffin/puffin_writer.go @@ -129,6 +129,16 @@ func (w *Writer) AddBlob(input BlobMetadataInput, data []byte) (BlobMetadata, er return BlobMetadata{}, errors.New("puffin: cannot add blob: fields is required") } + // Deletion vectors have special requirements per spec + if input.Type == BlobTypeDeletionVector { + if input.SnapshotID != -1 { + return BlobMetadata{}, errors.New("puffin: deletion-vector-v1 requires snapshot-id to be -1") + } + if input.SequenceNumber != -1 { + return BlobMetadata{}, errors.New("puffin: deletion-vector-v1 requires sequence-number to be -1") + } + } + meta := BlobMetadata{ Type: input.Type, SnapshotID: input.SnapshotID, From 3e3ea8350434e4b0978466d9e98721728c9fb09c Mon Sep 17 00:00:00 2001 From: Shreyas220 Date: Tue, 20 Jan 2026 00:35:31 +0530 Subject: [PATCH 7/7] feat: fmt + review comments + tests Signed-off-by: Shreyas220 --- puffin/puffin_reader.go | 72 +++--- puffin/puffin_test.go | 477 ++++++++++++++++++++++++---------------- puffin/puffin_writer.go | 6 +- 3 files changed, 324 insertions(+), 231 deletions(-) diff --git a/puffin/puffin_reader.go b/puffin/puffin_reader.go index ded03af53..5064e1648 100644 --- a/puffin/puffin_reader.go +++ b/puffin/puffin_reader.go @@ -27,11 +27,18 @@ import ( "sort" ) +// ReaderAtSeeker combines io.ReaderAt and io.Seeker for reading Puffin files. +// This interface is implemented by *os.File, *bytes.Reader, and similar types. +type ReaderAtSeeker interface { + io.ReaderAt + io.Seeker +} + // Reader reads blobs and metadata from a Puffin file. // // Usage: // -// r, err := puffin.NewReader(file, fileSize) +// r, err := puffin.NewReader(file) // if err != nil { // return err // } @@ -40,7 +47,7 @@ import ( // // process blob.Data // } type Reader struct { - r io.ReaderAt + r ReaderAtSeeker size int64 footer *Footer footerStart int64 // cached after ReadFooter @@ -66,14 +73,20 @@ func WithMaxBlobSize(size int64) ReaderOption { } // NewReader creates a new Puffin file reader. -// The size parameter must be the total size of the file in bytes. +// The file size is auto-detected using Seek. // It validates magic bytes and reads the footer eagerly. -// The caller is responsible for closing the underlying io.ReaderAt. -func NewReader(r io.ReaderAt, size int64, opts ...ReaderOption) (*Reader, error) { +// The caller is responsible for closing the underlying reader. 
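+//
+// Options can be passed to adjust defaults; for example, a sketch using the
+// WithMaxBlobSize option defined above:
+//
+//	r, err := puffin.NewReader(file, puffin.WithMaxBlobSize(64<<20))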
+func NewReader(r ReaderAtSeeker, opts ...ReaderOption) (*Reader, error) { if r == nil { return nil, errors.New("puffin: reader is nil") } + // Auto-detect file size + size, err := r.Seek(0, io.SeekEnd) + if err != nil { + return nil, fmt.Errorf("puffin: detect file size: %w", err) + } + // Minimum size: header magic + footer magic + footer trailer // [Magic] + zero for blob + [Magic] + [FooterPayloadSize (assuming ~0)] + [Flags] + [Magic] minSize := int64(MagicSize + MagicSize + footerTrailerSize) @@ -90,15 +103,6 @@ func NewReader(r io.ReaderAt, size int64, opts ...ReaderOption) (*Reader, error) return nil, errors.New("puffin: invalid header magic") } - // Validate trailing magic (fail fast on corrupt/truncated files) - var trailingMagic [MagicSize]byte - if _, err := r.ReadAt(trailingMagic[:], size-MagicSize); err != nil { - return nil, fmt.Errorf("puffin: read trailing magic: %w", err) - } - if !bytes.Equal(trailingMagic[:], magic[:]) { - return nil, errors.New("puffin: invalid trailing magic") - } - pr := &Reader{ r: r, size: size, @@ -159,7 +163,7 @@ func (r *Reader) readFooter() (*Footer, error) { return nil, errors.New("puffin: compressed footer not supported") } - // Check for unknown flags (future-proofing) + // Check for unknown flags if flags&^uint32(FooterFlagCompressed) != 0 { return nil, fmt.Errorf("puffin: unknown footer flags set: 0x%x", flags) } @@ -179,18 +183,15 @@ func (r *Reader) readFooter() (*Footer, error) { // Total footer size: magic(4) + payload + trailer(12) totalFooterSize := MagicSize + payloadSize + footerTrailerSize - var payload []byte + // Validate footer start magic if totalFooterSize <= readSize { - // We already have the entire footer in buf + // We already have the footer magic in buf footerOffset := len(buf) - int(totalFooterSize) - - // Validate footer start magic if !bytes.Equal(buf[footerOffset:footerOffset+MagicSize], magic[:]) { return nil, errors.New("puffin: invalid footer start magic") } - payload = buf[footerOffset+MagicSize : len(buf)-footerTrailerSize] } else { - // Footer is larger than our initial read, need additional reads + // Footer is larger than our initial read, need to read magic separately var footerMagic [MagicSize]byte if _, err := r.r.ReadAt(footerMagic[:], footerStart); err != nil { return nil, fmt.Errorf("puffin: read footer start magic: %w", err) @@ -198,16 +199,11 @@ func (r *Reader) readFooter() (*Footer, error) { if !bytes.Equal(footerMagic[:], magic[:]) { return nil, errors.New("puffin: invalid footer start magic") } - - payload = make([]byte, payloadSize) - if _, err := r.r.ReadAt(payload, footerStart+MagicSize); err != nil { - return nil, fmt.Errorf("puffin: read footer payload: %w", err) - } } - // Parse footer JSON + payloadReader := io.NewSectionReader(r.r, footerStart+MagicSize, payloadSize) var footer Footer - if err := json.NewDecoder(bytes.NewReader(payload)).Decode(&footer); err != nil { + if err := json.NewDecoder(payloadReader).Decode(&footer); err != nil { return nil, fmt.Errorf("puffin: decode footer JSON: %w", err) } @@ -309,20 +305,16 @@ func (r *Reader) ReadAllBlobs() ([]*BlobData, error) { return results, nil } -// ReadRange reads a raw byte range from the blob data region. -// This is useful for streaming partial blob content or when the caller -// has external blob metadata. 
-func (r *Reader) ReadRange(offset, length int64) ([]byte, error) { - if err := r.validateRange(offset, length); err != nil { - return nil, fmt.Errorf("puffin: range: %w", err) - } - - buf := make([]byte, length) - if _, err := r.r.ReadAt(buf, offset); err != nil { - return nil, fmt.Errorf("puffin: read range: %w", err) +// ReadAt implements io.ReaderAt, reading from the blob data region. +// It validates that the read range is within the blob data region +// This is useful for deletion vector use case. +// offset/length pointing directly into the Puffin file in manifest. +func (r *Reader) ReadAt(p []byte, off int64) (n int, err error) { + if err := r.validateRange(off, int64(len(p))); err != nil { + return 0, fmt.Errorf("puffin: %w", err) } - return buf, nil + return r.r.ReadAt(p, off) } // validateRange validates offset and length for reading from the blob data region. diff --git a/puffin/puffin_test.go b/puffin/puffin_test.go index 569be8cfe..f768333f6 100644 --- a/puffin/puffin_test.go +++ b/puffin/puffin_test.go @@ -19,6 +19,7 @@ package puffin_test import ( "bytes" + "math" "testing" "github.com/apache/iceberg-go/puffin" @@ -26,25 +27,55 @@ import ( "github.com/stretchr/testify/require" ) -// TestRoundTrip verifies that data written by Writer can be read back by Reader -// with all metadata and blob data preserved exactly. +// --- Test Helpers --- + +func newWriter() (*puffin.Writer, *bytes.Buffer) { + buf := &bytes.Buffer{} + w, _ := puffin.NewWriter(buf) + + return w, buf +} + +func newReader(t *testing.T, buf *bytes.Buffer) *puffin.Reader { + r, err := puffin.NewReader(bytes.NewReader(buf.Bytes())) + require.NoError(t, err) + + return r +} + +func defaultBlobInput() puffin.BlobMetadataInput { + return puffin.BlobMetadataInput{ + Type: puffin.BlobTypeDataSketchesTheta, + Fields: []int32{}, + } +} + +func validFile() []byte { + w, buf := newWriter() + w.Finish() + + return buf.Bytes() +} + +func validFileWithBlob() []byte { + w, buf := newWriter() + w.AddBlob(defaultBlobInput(), []byte("test data")) + w.Finish() + + return buf.Bytes() +} + +// --- Tests --- + +// TestRoundTrip verifies that data written by Writer can be read back by Reader. +// This is the core integration test ensuring the puffin format is correctly implemented. 
func TestRoundTrip(t *testing.T) { - // Test data blob1Data := []byte("theta sketch data here") blob2Data := []byte("another blob with different content") - // Write puffin file to buffer - var buf bytes.Buffer - w, err := puffin.NewWriter(&buf) - require.NoError(t, err) - - // Add file-level properties - err = w.AddProperties(map[string]string{ - "test-property": "test-value", - }) - require.NoError(t, err) + w, buf := newWriter() + w.AddProperties(map[string]string{"test-property": "test-value"}) - // Add first blob (DataSketches type) meta1, err := w.AddBlob(puffin.BlobMetadataInput{ Type: puffin.BlobTypeDataSketchesTheta, SnapshotID: 123, @@ -54,37 +85,24 @@ func TestRoundTrip(t *testing.T) { }, blob1Data) require.NoError(t, err) - // Add second blob (DeletionVector type) meta2, err := w.AddBlob(puffin.BlobMetadataInput{ Type: puffin.BlobTypeDeletionVector, SnapshotID: -1, SequenceNumber: -1, Fields: []int32{}, - Properties: map[string]string{ - "referenced-data-file": "data/file.parquet", - "cardinality": "42", - }, + Properties: map[string]string{"referenced-data-file": "data/file.parquet"}, }, blob2Data) require.NoError(t, err) + require.NoError(t, w.Finish()) - err = w.Finish() - require.NoError(t, err) - - // Read puffin file back - data := buf.Bytes() - r, err := puffin.NewReader(bytes.NewReader(data), int64(len(data))) - require.NoError(t, err) - - // Verify footer + r := newReader(t, buf) footer := r.Footer() - require.NotNil(t, footer) - assert.Len(t, footer.Blobs, 2) - // Verify file properties + assert.Len(t, footer.Blobs, 2) assert.Equal(t, "test-value", footer.Properties["test-property"]) assert.Contains(t, footer.Properties[puffin.CreatedBy], "iceberg-go") - // Verify first blob metadata + // Verify blob 1 assert.Equal(t, puffin.BlobTypeDataSketchesTheta, footer.Blobs[0].Type) assert.Equal(t, int64(123), footer.Blobs[0].SnapshotID) assert.Equal(t, int64(1), footer.Blobs[0].SequenceNumber) @@ -93,251 +111,334 @@ func TestRoundTrip(t *testing.T) { assert.Equal(t, meta1.Length, footer.Blobs[0].Length) assert.Equal(t, "1000", footer.Blobs[0].Properties["ndv"]) - // Verify second blob metadata + // Verify blob 2 assert.Equal(t, puffin.BlobTypeDeletionVector, footer.Blobs[1].Type) assert.Equal(t, int64(-1), footer.Blobs[1].SnapshotID) assert.Equal(t, int64(-1), footer.Blobs[1].SequenceNumber) - assert.Equal(t, []int32{}, footer.Blobs[1].Fields) assert.Equal(t, meta2.Offset, footer.Blobs[1].Offset) assert.Equal(t, meta2.Length, footer.Blobs[1].Length) - // Verify blob data via ReadBlob - blobData1, err := r.ReadBlob(0) - require.NoError(t, err) + // Verify data + blobData1, _ := r.ReadBlob(0) assert.Equal(t, blob1Data, blobData1.Data) - blobData2, err := r.ReadBlob(1) - require.NoError(t, err) + blobData2, _ := r.ReadBlob(1) assert.Equal(t, blob2Data, blobData2.Data) - // Verify ReadAllBlobs - allBlobs, err := r.ReadAllBlobs() - require.NoError(t, err) + allBlobs, _ := r.ReadAllBlobs() assert.Len(t, allBlobs, 2) assert.Equal(t, blob1Data, allBlobs[0].Data) assert.Equal(t, blob2Data, allBlobs[1].Data) } // TestEmptyFile verifies that a puffin file with no blobs is valid. +// Empty files are valid per spec and used when no statistics exist yet. 
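+// Even with zero blobs the file still carries the full framing:
+// [Magic] [Magic] [Footer Payload] [Footer Payload Size] [Flags] [Magic].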
func TestEmptyFile(t *testing.T) { - var buf bytes.Buffer - w, err := puffin.NewWriter(&buf) - require.NoError(t, err) + w, buf := newWriter() + require.NoError(t, w.Finish()) + + r := newReader(t, buf) + assert.Len(t, r.Footer().Blobs, 0) + assert.Contains(t, r.Footer().Properties[puffin.CreatedBy], "iceberg-go") - err = w.Finish() + blobs, err := r.ReadAllBlobs() require.NoError(t, err) + assert.Nil(t, blobs) +} - data := buf.Bytes() - r, err := puffin.NewReader(bytes.NewReader(data), int64(len(data))) +// TestEmptyBlobData verifies that a zero-length blob is valid. +// Some blob types may legitimately have empty content. +func TestEmptyBlobData(t *testing.T) { + w, buf := newWriter() + meta, err := w.AddBlob(defaultBlobInput(), []byte{}) require.NoError(t, err) + assert.Equal(t, int64(0), meta.Length) + w.Finish() - footer := r.Footer() - assert.NotNil(t, footer) - assert.Len(t, footer.Blobs, 0) - assert.Contains(t, footer.Properties[puffin.CreatedBy], "iceberg-go") + r := newReader(t, buf) + blob, _ := r.ReadBlob(0) + assert.Empty(t, blob.Data) +} - // ReadAllBlobs should return nil for empty file - blobs, err := r.ReadAllBlobs() - require.NoError(t, err) - assert.Nil(t, blobs) +// TestLargeFooter verifies reading works when footer exceeds initial 8KB read buffer. +// This exercises the code path where footer requires a second read from storage. +func TestLargeFooter(t *testing.T) { + w, buf := newWriter() + numBlobs := 200 + for i := range numBlobs { + w.AddBlob(puffin.BlobMetadataInput{ + Type: puffin.BlobTypeDataSketchesTheta, + SnapshotID: int64(i), + Fields: []int32{1, 2, 3, 4, 5}, + Properties: map[string]string{"key": "value-to-increase-footer-size"}, + }, []byte("blob")) + } + w.Finish() + + r := newReader(t, buf) + assert.Len(t, r.Footer().Blobs, numBlobs) } // TestWriterValidation verifies that Writer rejects invalid input. +// Proper validation prevents corrupt files and provides clear error messages. func TestWriterValidation(t *testing.T) { + // nil writer: Ensures graceful failure when no underlying writer is provided. t.Run("nil writer", func(t *testing.T) { _, err := puffin.NewWriter(nil) - assert.Error(t, err) assert.ErrorContains(t, err, "nil") }) + // missing type: Blob type is required per spec to identify the blob format. t.Run("missing type", func(t *testing.T) { - var buf bytes.Buffer - w, err := puffin.NewWriter(&buf) - require.NoError(t, err) - - _, err = w.AddBlob(puffin.BlobMetadataInput{ - Type: "", // missing - Fields: []int32{1}, - }, []byte("data")) - assert.Error(t, err) + w, _ := newWriter() + _, err := w.AddBlob(puffin.BlobMetadataInput{Type: "", Fields: []int32{}}, []byte("x")) assert.ErrorContains(t, err, "type") }) + // nil fields: Fields slice must be non-nil per spec (empty slice is valid). t.Run("nil fields", func(t *testing.T) { - var buf bytes.Buffer - w, err := puffin.NewWriter(&buf) - require.NoError(t, err) - - _, err = w.AddBlob(puffin.BlobMetadataInput{ - Type: puffin.BlobTypeDataSketchesTheta, - Fields: nil, // nil not allowed - }, []byte("data")) - assert.Error(t, err) + w, _ := newWriter() + _, err := w.AddBlob(puffin.BlobMetadataInput{Type: puffin.BlobTypeDataSketchesTheta, Fields: nil}, []byte("x")) assert.ErrorContains(t, err, "fields") }) + // add blob after finish: Enforces writer state machine - no writes after finalization. 
t.Run("add blob after finish", func(t *testing.T) { - var buf bytes.Buffer - w, err := puffin.NewWriter(&buf) - require.NoError(t, err) - - err = w.Finish() - require.NoError(t, err) - - _, err = w.AddBlob(puffin.BlobMetadataInput{ - Type: puffin.BlobTypeDataSketchesTheta, - Fields: []int32{1}, - }, []byte("data")) - assert.Error(t, err) + w, _ := newWriter() + w.Finish() + _, err := w.AddBlob(defaultBlobInput(), []byte("x")) assert.ErrorContains(t, err, "finalized") }) + // double finish: Prevents accidental double-write of footer corrupting the file. t.Run("double finish", func(t *testing.T) { - var buf bytes.Buffer - w, err := puffin.NewWriter(&buf) - require.NoError(t, err) - - err = w.Finish() - require.NoError(t, err) - - err = w.Finish() - assert.Error(t, err) - assert.ErrorContains(t, err, "finalized") + w, _ := newWriter() + w.Finish() + assert.ErrorContains(t, w.Finish(), "finalized") }) - t.Run("deletion vector with invalid snapshot-id", func(t *testing.T) { - var buf bytes.Buffer - w, err := puffin.NewWriter(&buf) - require.NoError(t, err) - - _, err = w.AddBlob(puffin.BlobMetadataInput{ - Type: puffin.BlobTypeDeletionVector, - SnapshotID: 123, // must be -1 - SequenceNumber: -1, - Fields: []int32{}, - }, []byte("data")) - assert.Error(t, err) + // deletion vector invalid snapshot-id: Spec requires snapshot-id=-1 for deletion vectors. + t.Run("deletion vector invalid snapshot-id", func(t *testing.T) { + w, _ := newWriter() + _, err := w.AddBlob(puffin.BlobMetadataInput{ + Type: puffin.BlobTypeDeletionVector, SnapshotID: 123, SequenceNumber: -1, Fields: []int32{}, + }, []byte("x")) assert.ErrorContains(t, err, "snapshot-id") }) - t.Run("deletion vector with invalid sequence-number", func(t *testing.T) { - var buf bytes.Buffer - w, err := puffin.NewWriter(&buf) - require.NoError(t, err) - - _, err = w.AddBlob(puffin.BlobMetadataInput{ - Type: puffin.BlobTypeDeletionVector, - SnapshotID: -1, - SequenceNumber: 5, // must be -1 - Fields: []int32{}, - }, []byte("data")) - assert.Error(t, err) + // deletion vector invalid sequence-number: Spec requires sequence-number=-1 for deletion vectors. + t.Run("deletion vector invalid sequence-number", func(t *testing.T) { + w, _ := newWriter() + _, err := w.AddBlob(puffin.BlobMetadataInput{ + Type: puffin.BlobTypeDeletionVector, SnapshotID: -1, SequenceNumber: 5, Fields: []int32{}, + }, []byte("x")) assert.ErrorContains(t, err, "sequence-number") }) } -// TestReaderInvalidFile verifies that Reader rejects invalid/corrupt files. -func TestReaderInvalidFile(t *testing.T) { - t.Run("nil reader", func(t *testing.T) { - _, err := puffin.NewReader(nil, 100) - assert.Error(t, err) - assert.ErrorContains(t, err, "nil") +// TestSetCreatedBy verifies the SetCreatedBy method. +// Allows applications to identify themselves in puffin files for debugging. +func TestSetCreatedBy(t *testing.T) { + // custom value: Verifies custom application identifiers are preserved in footer. + t.Run("custom value", func(t *testing.T) { + w, buf := newWriter() + require.NoError(t, w.SetCreatedBy("MyApp 1.0")) + w.Finish() + r := newReader(t, buf) + assert.Equal(t, "MyApp 1.0", r.Footer().Properties[puffin.CreatedBy]) }) - t.Run("file too small", func(t *testing.T) { - data := []byte("tiny") - _, err := puffin.NewReader(bytes.NewReader(data), int64(len(data))) - assert.Error(t, err) - assert.ErrorContains(t, err, "too small") + // empty rejected: Empty identifier provides no value and likely indicates a bug. 
+ t.Run("empty rejected", func(t *testing.T) { + w, _ := newWriter() + assert.ErrorContains(t, w.SetCreatedBy(""), "empty") }) - t.Run("invalid header magic", func(t *testing.T) { - // Create valid file first - var buf bytes.Buffer - w, _ := puffin.NewWriter(&buf) + // after finish rejected: Enforces writer state machine consistency. + t.Run("after finish rejected", func(t *testing.T) { + w, _ := newWriter() w.Finish() - data := buf.Bytes() + assert.ErrorContains(t, w.SetCreatedBy("x"), "finalized") + }) +} - // Corrupt header magic - data[0] = 'X' +// TestClearProperties verifies the ClearProperties method. +// Allows resetting properties before finishing if initial values were wrong. +func TestClearProperties(t *testing.T) { + w, buf := newWriter() + w.AddProperties(map[string]string{"key": "value"}) + w.ClearProperties() + w.Finish() + + r := newReader(t, buf) + _, exists := r.Footer().Properties["key"] + assert.False(t, exists) +} - _, err := puffin.NewReader(bytes.NewReader(data), int64(len(data))) - assert.Error(t, err) - assert.ErrorContains(t, err, "magic") - }) +// TestAddPropertiesAfterFinish verifies AddProperties rejects calls after finish. +// Enforces writer state machine - properties cannot be added after footer is written. +func TestAddPropertiesAfterFinish(t *testing.T) { + w, _ := newWriter() + w.Finish() + assert.ErrorContains(t, w.AddProperties(map[string]string{"k": "v"}), "finalized") +} - t.Run("invalid trailing magic", func(t *testing.T) { - // Create valid file first - var buf bytes.Buffer - w, _ := puffin.NewWriter(&buf) - w.Finish() - data := buf.Bytes() +// TestReaderInvalidFile verifies that Reader rejects invalid/corrupt files. +// Early detection of corruption prevents silent data loss or security issues. +func TestReaderInvalidFile(t *testing.T) { + tests := []struct { + name string + data func() []byte + wantErr string + }{ + // file too small: Minimum valid puffin file has header magic + footer, rejects truncated files. + {"file too small", func() []byte { return []byte("tiny") }, "too small"}, + // invalid header magic: First 4 bytes must be 'PFA1' to identify puffin format. + {"invalid header magic", func() []byte { + d := validFile() + d[0] = 'X' + + return d + }, "magic"}, + // invalid trailing magic: Last 4 bytes must be 'PFA1' to detect truncation. + {"invalid trailing magic", func() []byte { + d := validFile() + d[len(d)-1] = 'X' + + return d + }, "magic"}, + // invalid footer start magic: Footer section must start with 'PFA1' for integrity. + {"invalid footer start magic", func() []byte { + d := validFile() + d[4] = 'X' + + return d + }, "magic"}, + // unknown flags: Reject files with unsupported features to avoid misinterpretation. + {"unknown flags", func() []byte { + d := validFile() + d[len(d)-8] = 0x80 + + return d + }, "unknown"}, + } - // Corrupt trailing magic (last 4 bytes) - data[len(data)-1] = 'X' + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + data := tt.data() + _, err := puffin.NewReader(bytes.NewReader(data)) + assert.ErrorContains(t, err, tt.wantErr) + }) + } - _, err := puffin.NewReader(bytes.NewReader(data), int64(len(data))) - assert.Error(t, err) - assert.ErrorContains(t, err, "magic") + // nil reader: Ensures graceful failure when no underlying reader is provided. + t.Run("nil reader", func(t *testing.T) { + _, err := puffin.NewReader(nil) + assert.ErrorContains(t, err, "nil") }) } // TestReaderBlobAccess verifies blob access methods work correctly. 
+// Tests the primary API for retrieving blob data from puffin files. func TestReaderBlobAccess(t *testing.T) { - // Create file with multiple blobs - var buf bytes.Buffer - w, err := puffin.NewWriter(&buf) - require.NoError(t, err) - - blobs := [][]byte{ - []byte("first"), - []byte("second"), - []byte("third"), + w, buf := newWriter() + blobs := [][]byte{[]byte("first"), []byte("second"), []byte("third")} + for _, b := range blobs { + w.AddBlob(defaultBlobInput(), b) } + w.Finish() + r := newReader(t, buf) - for _, blob := range blobs { - _, err := w.AddBlob(puffin.BlobMetadataInput{ - Type: puffin.BlobTypeDataSketchesTheta, - Fields: []int32{}, - }, blob) - require.NoError(t, err) - } - require.NoError(t, w.Finish()) - - data := buf.Bytes() - r, err := puffin.NewReader(bytes.NewReader(data), int64(len(data))) - require.NoError(t, err) - + // read by index: Primary access method for retrieving blobs sequentially. t.Run("read by index", func(t *testing.T) { for i, expected := range blobs { - blobData, err := r.ReadBlob(i) - require.NoError(t, err) - assert.Equal(t, expected, blobData.Data) + blob, _ := r.ReadBlob(i) + assert.Equal(t, expected, blob.Data) } }) + // index out of range: Prevents panic and provides clear error for invalid indices. t.Run("index out of range", func(t *testing.T) { _, err := r.ReadBlob(-1) assert.Error(t, err) - _, err = r.ReadBlob(100) assert.Error(t, err) }) + // read by metadata: Allows direct access when caller has metadata from external source. t.Run("read by metadata", func(t *testing.T) { - meta := r.Footer().Blobs[1] - data, err := r.ReadBlobByMetadata(meta) - require.NoError(t, err) + data, _ := r.ReadBlobByMetadata(r.Footer().Blobs[1]) assert.Equal(t, blobs[1], data) }) + // read all preserves order: Ensures blobs are returned in same order as written. t.Run("read all preserves order", func(t *testing.T) { - allBlobs, err := r.ReadAllBlobs() - require.NoError(t, err) - require.Len(t, allBlobs, 3) - + all, _ := r.ReadAllBlobs() for i, expected := range blobs { - assert.Equal(t, expected, allBlobs[i].Data) + assert.Equal(t, expected, all[i].Data) } }) } + +// TestReadBlobByMetadataValidation verifies validation of blob metadata. +// Prevents reading garbage data when metadata is corrupted or crafted maliciously. +func TestReadBlobByMetadataValidation(t *testing.T) { + data := validFileWithBlob() + r, _ := puffin.NewReader(bytes.NewReader(data)) + + tests := []struct { + name string + meta puffin.BlobMetadata + wantErr string + }{ + // empty type: Type is required to interpret blob content correctly. + {"empty type", puffin.BlobMetadata{Type: "", Offset: 4, Length: 1}, "type"}, + // offset before header: Prevents reading magic bytes as blob data. + {"offset before header", puffin.BlobMetadata{Type: "t", Offset: 0, Length: 1}, "header"}, + // negative length: Invalid length could cause allocation issues. + {"negative length", puffin.BlobMetadata{Type: "t", Offset: 4, Length: -1}, "length"}, + // extends into footer: Prevents reading footer JSON as blob data. + {"extends into footer", puffin.BlobMetadata{Type: "t", Offset: 4, Length: 9999}, "footer"}, + // overflow: Prevents integer overflow attacks in offset+length calculation. 
+ {"overflow", puffin.BlobMetadata{Type: "t", Offset: math.MaxInt64, Length: 1}, "overflow"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := r.ReadBlobByMetadata(tt.meta) + assert.ErrorContains(t, err, tt.wantErr) + }) + } +} + +// TestReadAt verifies the ReadAt method (io.ReaderAt implementation). +// Enables partial reads for streaming or when only specific byte ranges are needed. +func TestReadAt(t *testing.T) { + w, buf := newWriter() + meta, _ := w.AddBlob(defaultBlobInput(), []byte("hello world")) + w.Finish() + r := newReader(t, buf) + + // valid range: Verifies partial blob reads work correctly. + t.Run("valid range", func(t *testing.T) { + data := make([]byte, 5) + n, err := r.ReadAt(data, meta.Offset) + require.NoError(t, err) + assert.Equal(t, 5, n) + assert.Equal(t, []byte("hello"), data) + }) + + // extends into footer: Prevents reading footer metadata as blob content. + t.Run("extends into footer", func(t *testing.T) { + data := make([]byte, buf.Len()) + _, err := r.ReadAt(data, 4) + assert.ErrorContains(t, err, "footer") + }) + + // offset before header: Prevents reading file magic as blob content. + t.Run("offset before header", func(t *testing.T) { + data := make([]byte, 4) + _, err := r.ReadAt(data, 0) + assert.ErrorContains(t, err, "header") + }) +} diff --git a/puffin/puffin_writer.go b/puffin/puffin_writer.go index 30c14f651..1966173bc 100644 --- a/puffin/puffin_writer.go +++ b/puffin/puffin_writer.go @@ -27,7 +27,7 @@ import ( "github.com/apache/iceberg-go" ) -// PuffinWriter writes blobs and metadata to a Puffin file. +// Writer writes blobs and metadata to a Puffin file. // // Usage: // @@ -63,7 +63,7 @@ type BlobMetadataInput struct { Properties map[string]string } -// NewWriter creates a new PuffinWriter and writes the file header magic. +// NewWriter creates a new Writer and writes the file header magic. // The caller is responsible for closing the underlying writer after Finish returns. func NewWriter(w io.Writer) (*Writer, error) { if w == nil { @@ -79,7 +79,7 @@ func NewWriter(w io.Writer) (*Writer, error) { w: w, offset: MagicSize, props: make(map[string]string), - createdBy: fmt.Sprintf("iceberg-go %s", iceberg.Version()), + createdBy: "iceberg-go " + iceberg.Version(), }, nil }