Skip to content

Conversation

@pforemski
Copy link
Contributor

No description provided.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a new head stage for early pipeline termination, and refactors BMP/OpenBMP/MRT handling in Extio for better efficiency and clearer format detection behavior, plus minor metadata/tag tweaks for RouteViews live ingestion.

Changes:

  • Add head stage (--count/-n) and register it in the stage repo.
  • Introduce sync.Pool reuse for MRT/BMP/OpenBMP/Exa objects and adjust BMP/OpenBMP detection and parsing paths.
  • Refine RouteViews OpenBMP tag enrichment (peer/collector tags) and time selection.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.

File Description
stages/rv-live/openbmp.go Adjusts RouteViews OpenBMP processing: time selection and tag population changes.
stages/repo.go Registers the new head stage in the stage factory map.
stages/head.go Implements the new head stage that stops the pipeline after N messages.
pkg/extio/extio.go Adds object pooling and revises BMP/OpenBMP/MRT format detection and parsing/marshaling paths.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 157 to 164
case ".bmp":
eio.opt_bmp = true
eio.bmp.OpenBMP = false
return true
case ".obmp":
eio.opt_bmp = true
eio.opt_obmp = true
eio.bmp.OpenBMP = true
return true
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DetectPath mutates eio.bmp.OpenBMP for .bmp/.obmp paths without checking that eio.bmp is initialized. This can panic when DetectPath is called before Extio.Attach() (e.g., stages/write.go calls DetectPath before s.eio.Attach()). Consider deferring OpenBMP assignment until Attach() (based on eio.opt_obmp) or add a nil guard like in DetectSample().

Copilot uses AI. Check for mistakes.
Comment on lines 114 to +127
// DetectNeeded returns true iff data format detection is (still) needed
// Must be called after Attach().
func (eio *Extio) DetectNeeded() bool {
return !eio.Detected && eio.K.String("format") == "auto"
}

// DetectPath tries to detect data format from given path.
// Returns true if format was successfully detected.
// Must be called after Attach().
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The doc comments on DetectNeeded/DetectPath/DetectSample say they "Must be called after Attach()", but DetectNeeded() and DetectPath() are currently called before Attach() in stages/write.go (e.g., write.go:114). Either update the comments to reflect actual allowed usage, or enforce the ordering by moving format detection to after Attach() in the stages that call these methods.

Suggested change
// DetectNeeded returns true iff data format detection is (still) needed
// Must be called after Attach().
func (eio *Extio) DetectNeeded() bool {
return !eio.Detected && eio.K.String("format") == "auto"
}
// DetectPath tries to detect data format from given path.
// Returns true if format was successfully detected.
// Must be called after Attach().
// DetectNeeded returns true iff data format detection is (still) needed.
func (eio *Extio) DetectNeeded() bool {
return !eio.Detected && eio.K.String("format") == "auto"
}
// DetectPath tries to detect data format from given path.
// Returns true if format was successfully detected.

Copilot uses AI. Check for mistakes.
Comment on lines +44 to +58
func (s *Head) onMsg(m *msg.Msg) bool {
// increment counter and check limit
n := s.count.Add(1)

if n < s.limit {
// under limit, pass message
return true
} else if n == s.limit {
// at limit, pass message and stop pipe
go s.P.Stop()
return true
} else {
// already over limit, drop message
return false
}
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When --count is 0, onMsg increments count to 1 and then falls into the "over limit" branch, dropping messages without ever calling Stop(). If 0 is intended to mean "stop immediately / pass 0 messages", trigger Stop() when limit==0 (e.g., in Attach() or on the first onMsg call) so the pipeline terminates as expected.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants