diff --git a/cmd/substreams/gui.go b/cmd/substreams/gui.go index 16b3a7d40..6bef985dd 100644 --- a/cmd/substreams/gui.go +++ b/cmd/substreams/gui.go @@ -20,6 +20,7 @@ func init() { sink.FlagIncludeOptional( sink.FlagCursor, sink.FlagSkipCheckModuleBinariesExist, + sink.FlagPartialBlocks, ), sink.FlagExcludeDefault( sink.FlagStopBlock, diff --git a/tui2/pages/output/output.go b/tui2/pages/output/output.go index b3a035691..f3e0f0db5 100644 --- a/tui2/pages/output/output.go +++ b/tui2/pages/output/output.go @@ -4,6 +4,8 @@ import ( "fmt" "slices" "sort" + "strconv" + "strings" "github.com/charmbracelet/bubbles/viewport" tea "github.com/charmbracelet/bubbletea" @@ -46,6 +48,7 @@ type Output struct { blocksPerModule map[string][]uint64 payloads map[common.BlockContext]*pbsubstreamsrpc.AnyModuleOutput + partialPayloads map[string]*pbsubstreamsrpc.AnyModuleOutput // stores partial blocks using "module:blocknum:partialindex" as key bytesRepresentation dynamic.BytesRepresentation blockIDs map[uint64]string @@ -184,6 +187,7 @@ func (o *Output) Update(msg tea.Msg) (tea.Model, tea.Cmd) { o.blocksPerModule = make(map[string][]uint64) o.payloads = make(map[common.BlockContext]*pbsubstreamsrpc.AnyModuleOutput) + o.partialPayloads = make(map[string]*pbsubstreamsrpc.AnyModuleOutput) o.blockIDs = make(map[uint64]string) o.blockSelector.Update(blockselect.NewRequestInstanceMsg{}) @@ -254,6 +258,67 @@ func (o *Output) Update(msg tea.Msg) (tea.Model, tea.Cmd) { o.setOutputViewContent(forceRedraw) } + case *pbsubstreamsrpc.PartialBlockData: + blockNum := msg.Clock.Number + + if o.lowBlock == nil { + o.lowBlock = &blockNum + } + if o.highBlock < blockNum { + o.highBlock = blockNum + } + o.blockSelector.StretchBounds(*o.lowBlock, o.highBlock) + + // Handle partial block data similar to BlockScopedData + if o.moduleSelector != nil && o.moduleSelector.AddModule(o.outputModule) { + cmds = append(cmds, func() tea.Msg { return common.UpdateSeenModulesMsg(o.moduleSelector.Modules) }) + o.active.Module = o.outputModule + o.active.BlockNum = blockNum + } + + o.blockIDs[msg.Clock.Number] = msg.Clock.Id + + // Handle the partial block output + if msg.Output != nil && !msg.Output.IsEmpty() { + modName := msg.Output.Name() + blockCtx := common.BlockContext{ + Module: modName, + BlockNum: blockNum, + } + + forceRedraw := false + if _, found := o.payloads[blockCtx]; !found { + if o.moduleSelector != nil && modName != "" && o.moduleSelector.AddModule(modName) { + cmds = append(cmds, func() tea.Msg { return common.UpdateSeenModulesMsg(o.moduleSelector.Modules) }) + } + if o.active.Module == "" { + o.active.Module = modName + o.active.BlockNum = blockNum + } + if o.active.Module == modName && len(o.blocksPerModule[modName]) == 0 { + forceRedraw = true + o.active.BlockNum = blockNum + } + o.blocksPerModule[modName] = append(o.blocksPerModule[modName], blockNum) + if modName == o.active.Module { + o.blockSelector.SetAvailableBlocks(o.blocksPerModule[modName]) + } + + if o.keywordToSearchFor != "" { + if hasKeyword := o.searchIncomingBlockInModule(o.active.Module, blockNum); hasKeyword { + cmds = append(cmds, func() tea.Msg { + return search.AddMatchingBlock(blockNum) + }) + } + } + } + // Store partial block data separately from regular blocks + // Create composite key for partial block storage: "module:blocknum:partialindex" + partialKey := fmt.Sprintf("%s:%d:%d", modName, blockNum, msg.PartialIndex) + o.partialPayloads[partialKey] = msg.Output + o.setOutputViewContent(forceRedraw) + } + case search.ApplySearchQueryMsg: o.keywordToSearchFor = msg.Query o.setOutputViewContent(true) @@ -359,22 +424,52 @@ type displayContext struct { payload *pbsubstreamsrpc.AnyModuleOutput searchJQMode bool errReceived error + isPartialBlock bool + partialIndex uint32 } func (o *Output) setOutputViewContent(forcedRender bool) { + // Check if there are partial blocks for the current active context + var payload *pbsubstreamsrpc.AnyModuleOutput + var isPartialBlock bool + var partialIndex uint32 + + // First, look for partial blocks matching the current module:blocknum + for key, partialPayload := range o.partialPayloads { + if strings.HasPrefix(key, fmt.Sprintf("%s:%d:", o.active.Module, o.active.BlockNum)) { + payload = partialPayload + isPartialBlock = true + // Extract partial index from key "module:blocknum:partialindex" + parts := strings.Split(key, ":") + if len(parts) == 3 { + if idx, err := strconv.ParseUint(parts[2], 10, 32); err == nil { + partialIndex = uint32(idx) + } + } + break // Use the first partial block found (could be enhanced to show latest or allow selection) + } + } + + // If no partial blocks found, use regular payload + if !isPartialBlock { + payload = o.payloads[o.active] + } + displayCtx := &displayContext{ logsEnabled: o.logsEnabled, blockCtx: o.active, searchViewEnabled: o.searchEnabled, searchQuery: o.searchCtx.Current.Query, searchJQMode: o.searchCtx.Current.JQMode, - payload: o.payloads[o.active], + payload: payload, errReceived: o.errReceived, + isPartialBlock: isPartialBlock, + partialIndex: partialIndex, } if forcedRender { vals := o.renderedOutput(displayCtx.payload, true) - content := o.renderPayload(vals) + content := o.renderPayload(vals, displayCtx) if displayCtx.searchViewEnabled { var matchCount int var positions []int diff --git a/tui2/pages/output/render.go b/tui2/pages/output/render.go index 3884a3539..cecda4350 100644 --- a/tui2/pages/output/render.go +++ b/tui2/pages/output/render.go @@ -8,6 +8,7 @@ import ( "github.com/alecthomas/chroma/quick" "github.com/charmbracelet/lipgloss" + "github.com/dustin/go-humanize" "github.com/itchyny/gojq" "github.com/jhump/protoreflect/desc" "github.com/jhump/protoreflect/dynamic" @@ -117,8 +118,19 @@ func (o *Output) renderedOutput(in *pbsubstreamsrpc.AnyModuleOutput, withStyle b return } -func (o *Output) renderPayload(in *renderedOutput) string { +func (o *Output) renderPayload(in *renderedOutput, displayCtx *displayContext) string { out := &strings.Builder{} + + // Add block header with partial block indication + if displayCtx.isPartialBlock { + header := fmt.Sprintf("----------- PARTIAL BLOCK #%s (idx=%d) (%s) ---------------", + humanize.Comma(int64(displayCtx.blockCtx.BlockNum)), + displayCtx.partialIndex, + o.blockIDs[displayCtx.blockCtx.BlockNum]) + out.WriteString(styles.Output.LogLabel.Render(header)) + out.WriteString("\n\n") + } + if in.error != nil { out.WriteString(styles.Output.ErrorLine.Render(in.error.Error())) out.WriteString("\n") diff --git a/tui2/stream/stream.go b/tui2/stream/stream.go index 7b6edcf78..80118013f 100644 --- a/tui2/stream/stream.go +++ b/tui2/stream/stream.go @@ -123,6 +123,14 @@ func (s *Stream) Update(msg tea.Msg) tea.Cmd { } cmds = append(cmds, s.readNextMessage) return tea.Batch(cmds...) + case *pbsubstreamsrpc.PartialBlockData: + var cmds []tea.Cmd + if !s.seenBlockData { + s.seenBlockData = true + cmds = append(cmds, func() tea.Msg { return StreamingMsg }) + } + cmds = append(cmds, s.readNextMessage) + return tea.Batch(cmds...) case *pbsubstreamsrpc.ModulesProgress: var cmds []tea.Cmd if !s.seenBlockData && !s.sentBackprocessingMsg { @@ -232,6 +240,8 @@ func (s *Stream) routeNextMessage(resp *pbsubstreamsrpc.Response) tea.Msg { switch m := resp.Message.(type) { case *pbsubstreamsrpc.Response_BlockScopedData: return m.BlockScopedData + case *pbsubstreamsrpc.Response_PartialBlockData: + return m.PartialBlockData case *pbsubstreamsrpc.Response_Progress: //log.Printf("Progress response: %T %v", resp, resp) return m.Progress