Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/substreams/gui.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func init() {
sink.FlagIncludeOptional(
sink.FlagCursor,
sink.FlagSkipCheckModuleBinariesExist,
sink.FlagPartialBlocks,
),
sink.FlagExcludeDefault(
sink.FlagStopBlock,
Expand Down
99 changes: 97 additions & 2 deletions tui2/pages/output/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"slices"
"sort"
"strconv"
"strings"

"github.com/charmbracelet/bubbles/viewport"
tea "github.com/charmbracelet/bubbletea"
Expand Down Expand Up @@ -46,6 +48,7 @@ type Output struct {

blocksPerModule map[string][]uint64
payloads map[common.BlockContext]*pbsubstreamsrpc.AnyModuleOutput
partialPayloads map[string]*pbsubstreamsrpc.AnyModuleOutput // stores partial blocks using "module:blocknum:partialindex" as key
bytesRepresentation dynamic.BytesRepresentation

blockIDs map[uint64]string
Expand Down Expand Up @@ -184,6 +187,7 @@ func (o *Output) Update(msg tea.Msg) (tea.Model, tea.Cmd) {

o.blocksPerModule = make(map[string][]uint64)
o.payloads = make(map[common.BlockContext]*pbsubstreamsrpc.AnyModuleOutput)
o.partialPayloads = make(map[string]*pbsubstreamsrpc.AnyModuleOutput)
o.blockIDs = make(map[uint64]string)
o.blockSelector.Update(blockselect.NewRequestInstanceMsg{})

Expand Down Expand Up @@ -254,6 +258,67 @@ func (o *Output) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
o.setOutputViewContent(forceRedraw)
}

case *pbsubstreamsrpc.PartialBlockData:
blockNum := msg.Clock.Number

if o.lowBlock == nil {
o.lowBlock = &blockNum
}
if o.highBlock < blockNum {
o.highBlock = blockNum
}
o.blockSelector.StretchBounds(*o.lowBlock, o.highBlock)

// Handle partial block data similar to BlockScopedData
if o.moduleSelector != nil && o.moduleSelector.AddModule(o.outputModule) {
cmds = append(cmds, func() tea.Msg { return common.UpdateSeenModulesMsg(o.moduleSelector.Modules) })
o.active.Module = o.outputModule
o.active.BlockNum = blockNum
}

o.blockIDs[msg.Clock.Number] = msg.Clock.Id

// Handle the partial block output
if msg.Output != nil && !msg.Output.IsEmpty() {
modName := msg.Output.Name()
blockCtx := common.BlockContext{
Module: modName,
BlockNum: blockNum,
}

forceRedraw := false
if _, found := o.payloads[blockCtx]; !found {
if o.moduleSelector != nil && modName != "" && o.moduleSelector.AddModule(modName) {
cmds = append(cmds, func() tea.Msg { return common.UpdateSeenModulesMsg(o.moduleSelector.Modules) })
}
if o.active.Module == "" {
o.active.Module = modName
o.active.BlockNum = blockNum
}
if o.active.Module == modName && len(o.blocksPerModule[modName]) == 0 {
forceRedraw = true
o.active.BlockNum = blockNum
}
o.blocksPerModule[modName] = append(o.blocksPerModule[modName], blockNum)
if modName == o.active.Module {
o.blockSelector.SetAvailableBlocks(o.blocksPerModule[modName])
}

if o.keywordToSearchFor != "" {
if hasKeyword := o.searchIncomingBlockInModule(o.active.Module, blockNum); hasKeyword {
cmds = append(cmds, func() tea.Msg {
return search.AddMatchingBlock(blockNum)
})
}
}
}
// Store partial block data separately from regular blocks
// Create composite key for partial block storage: "module:blocknum:partialindex"
partialKey := fmt.Sprintf("%s:%d:%d", modName, blockNum, msg.PartialIndex)
o.partialPayloads[partialKey] = msg.Output
o.setOutputViewContent(forceRedraw)
}

case search.ApplySearchQueryMsg:
o.keywordToSearchFor = msg.Query
o.setOutputViewContent(true)
Expand Down Expand Up @@ -359,22 +424,52 @@ type displayContext struct {
payload *pbsubstreamsrpc.AnyModuleOutput
searchJQMode bool
errReceived error
isPartialBlock bool
partialIndex uint32
}

func (o *Output) setOutputViewContent(forcedRender bool) {
// Check if there are partial blocks for the current active context
var payload *pbsubstreamsrpc.AnyModuleOutput
var isPartialBlock bool
var partialIndex uint32

// First, look for partial blocks matching the current module:blocknum
for key, partialPayload := range o.partialPayloads {
if strings.HasPrefix(key, fmt.Sprintf("%s:%d:", o.active.Module, o.active.BlockNum)) {
payload = partialPayload
isPartialBlock = true
// Extract partial index from key "module:blocknum:partialindex"
parts := strings.Split(key, ":")
if len(parts) == 3 {
if idx, err := strconv.ParseUint(parts[2], 10, 32); err == nil {
partialIndex = uint32(idx)
}
}
break // Use the first partial block found (could be enhanced to show latest or allow selection)
}
}

// If no partial blocks found, use regular payload
if !isPartialBlock {
payload = o.payloads[o.active]
}

displayCtx := &displayContext{
logsEnabled: o.logsEnabled,
blockCtx: o.active,
searchViewEnabled: o.searchEnabled,
searchQuery: o.searchCtx.Current.Query,
searchJQMode: o.searchCtx.Current.JQMode,
payload: o.payloads[o.active],
payload: payload,
errReceived: o.errReceived,
isPartialBlock: isPartialBlock,
partialIndex: partialIndex,
}

if forcedRender {
vals := o.renderedOutput(displayCtx.payload, true)
content := o.renderPayload(vals)
content := o.renderPayload(vals, displayCtx)
if displayCtx.searchViewEnabled {
var matchCount int
var positions []int
Expand Down
14 changes: 13 additions & 1 deletion tui2/pages/output/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/alecthomas/chroma/quick"
"github.com/charmbracelet/lipgloss"
"github.com/dustin/go-humanize"
"github.com/itchyny/gojq"
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/dynamic"
Expand Down Expand Up @@ -117,8 +118,19 @@ func (o *Output) renderedOutput(in *pbsubstreamsrpc.AnyModuleOutput, withStyle b
return
}

func (o *Output) renderPayload(in *renderedOutput) string {
func (o *Output) renderPayload(in *renderedOutput, displayCtx *displayContext) string {
out := &strings.Builder{}

// Add block header with partial block indication
if displayCtx.isPartialBlock {
header := fmt.Sprintf("----------- PARTIAL BLOCK #%s (idx=%d) (%s) ---------------",
humanize.Comma(int64(displayCtx.blockCtx.BlockNum)),
displayCtx.partialIndex,
o.blockIDs[displayCtx.blockCtx.BlockNum])
out.WriteString(styles.Output.LogLabel.Render(header))
out.WriteString("\n\n")
}

if in.error != nil {
out.WriteString(styles.Output.ErrorLine.Render(in.error.Error()))
out.WriteString("\n")
Expand Down
10 changes: 10 additions & 0 deletions tui2/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@
}
cmds = append(cmds, s.readNextMessage)
return tea.Batch(cmds...)
case *pbsubstreamsrpc.PartialBlockData:

Check failure on line 126 in tui2/stream/stream.go

View workflow job for this annotation

GitHub Actions / Test (1.24.x, macos-latest)

undefined: pbsubstreamsrpc.PartialBlockData

Check failure on line 126 in tui2/stream/stream.go

View workflow job for this annotation

GitHub Actions / Test (1.24.x, ubuntu-latest)

undefined: pbsubstreamsrpc.PartialBlockData
var cmds []tea.Cmd
if !s.seenBlockData {
s.seenBlockData = true
cmds = append(cmds, func() tea.Msg { return StreamingMsg })
}
cmds = append(cmds, s.readNextMessage)
return tea.Batch(cmds...)
case *pbsubstreamsrpc.ModulesProgress:
var cmds []tea.Cmd
if !s.seenBlockData && !s.sentBackprocessingMsg {
Expand Down Expand Up @@ -232,6 +240,8 @@
switch m := resp.Message.(type) {
case *pbsubstreamsrpc.Response_BlockScopedData:
return m.BlockScopedData
case *pbsubstreamsrpc.Response_PartialBlockData:

Check failure on line 243 in tui2/stream/stream.go

View workflow job for this annotation

GitHub Actions / Test (1.24.x, macos-latest)

undefined: pbsubstreamsrpc.Response_PartialBlockData

Check failure on line 243 in tui2/stream/stream.go

View workflow job for this annotation

GitHub Actions / Test (1.24.x, ubuntu-latest)

undefined: pbsubstreamsrpc.Response_PartialBlockData
return m.PartialBlockData
case *pbsubstreamsrpc.Response_Progress:
//log.Printf("Progress response: %T %v", resp, resp)
return m.Progress
Expand Down
Loading