Kmk grpc add example #53
Conversation
Walkthrough

Removed detailed trace decoding and related imports from the core gRPC client; simplified handleResponse (moved atomic counter, removed timestamp printing, replaced trace handlers with no-op logs). Added two new standalone CLI binaries: a concurrent multi-stream publisher and a concurrent multi-stream subscriber (gRPC-based) with file logging and optional SHA-256 hashing. Adjusted CI linter flags and updated test expectation strings.
Sequence Diagram(s)

```mermaid
sequenceDiagram
participant CLI as Publisher CLI
participant IPFile as IP File
participant Worker as Worker (per IP)
participant gRPC as gRPC Server
participant HashWriter as Hash Writer
rect rgb(235,247,235)
CLI->>IPFile: Read & parse IPs
CLI->>Worker: Spawn workers (one per IP)
end
Worker->>gRPC: Dial & open CommandStream
loop For each message
Worker->>gRPC: Send PublishData (random payload)
gRPC-->>Worker: Ack/Response
Worker->>Worker: Compute SHA-256 & timing
Worker->>HashWriter: Enqueue hash
end
Worker->>gRPC: Close stream
CLI->>Worker: Signal -> cancel context
Worker-->>HashWriter: flush & exit
```

```mermaid
sequenceDiagram
participant CLI as Subscriber CLI
participant IPFile as IP File
participant Worker as Worker (per IP)
participant gRPC as gRPC Server
participant DataWriter as Data File Writer
participant TraceWriter as Trace File Writer
rect rgb(235,247,235)
CLI->>IPFile: Read & slice IPs
CLI->>Worker: Spawn workers (one per IP)
end
Worker->>gRPC: Dial & open ListenCommands stream
Worker->>gRPC: Send SubscribeToTopic
loop Receive responses
gRPC-->>Worker: Response (Message / Trace / EOF)
alt Message
Worker->>Worker: Parse P2PMessage & compute SHA-256
Worker->>DataWriter: Write ip,publisher,size,hash
else Trace
Worker->>TraceWriter: Write formatted trace
end
end
CLI->>Worker: Signal -> cancel context
Worker-->>DataWriter: flush & close
Worker-->>TraceWriter: flush & close
```

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touches

❌ Failed checks (4 warnings, 1 inconclusive)
✅ Passed checks (2 passed)
Actionable comments posted: 8
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
grpc_p2p_client/p2p_client.go (1)
106-141: Fix missing counter increment and consider bounding logged message size

`receivedCount` is still logged in the subscribe loop (Lines 106–141), but `handleResponse` (Lines 189–205) no longer increments it. As a result, "Total messages received" will always report 0, regardless of actual traffic.

Also, `fmt.Printf("Recv message: %d %s\n\n", messageSize, string(p2pMessage.Message))` now logs the full message body, which can be arbitrarily large and hurt performance/readability under load.

Recommend:

- Either restore the counter increment or remove the counter usage entirely; most likely you want to keep the metric:

```diff
 func handleResponse(resp *protobuf.Response, counter *int32) {
     switch resp.GetCommand() {
     case protobuf.ResponseType_Message:
         var p2pMessage P2PMessage
         if err := json.Unmarshal(resp.GetData(), &p2pMessage); err != nil {
             log.Printf("Error unmarshalling message: %v", err)
             return
         }
-        messageSize := len(p2pMessage.Message)
+        n := atomic.AddInt32(counter, 1)
+        messageSize := len(p2pMessage.Message)
-        //fmt.Printf("Recv message: [%d] [%d %d] %s\n\n", n, currentTime, messageSize, string(p2pMessage.Message)[0:100])
-        fmt.Printf("Recv message: %d %s\n\n", messageSize, string(p2pMessage.Message))
+        //fmt.Printf("Recv message: [%d] %dB %.100s\n\n", n, messageSize, string(p2pMessage.Message))
+        fmt.Printf("Recv message: [%d] %dB %s\n\n", n, messageSize, string(p2pMessage.Message))
     case protobuf.ResponseType_Unknown:
     default:
         log.Println("Unknown response command:", resp.GetCommand())
     }
 }
```

- Optionally keep logging truncated bodies (e.g. `%.100s`) to avoid log blow-ups when messages are large.

Also applies to: 189-205
📜 Review details
Configuration used: Repository: getoptimum/coderabbit/.coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
`grpc_p2p_client/local.ips.tsv` is excluded by `!**/*.tsv` and included by none
📒 Files selected for processing (3)
- `grpc_p2p_client/p2p_client.go` (1 hunks)
- `grpc_p2p_client/p2p_client_multi_streams_publish.go` (1 hunks)
- `grpc_p2p_client/p2p_client_multi_streams_subscribe.go` (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.go
⚙️ CodeRabbit configuration file
Review Go code for: idiomatic patterns, error handling (check all errors), concurrency safety (context propagation, goroutine cleanup), test coverage for changed logic, exported API stability, memory efficiency (avoid unnecessary allocations), and prefer standard library over third-party when reasonable.
Files:
- `grpc_p2p_client/p2p_client.go`
- `grpc_p2p_client/p2p_client_multi_streams_subscribe.go`
- `grpc_p2p_client/p2p_client_multi_streams_publish.go`
🧠 Learnings (1)
📚 Learning: 2025-10-23T18:30:48.214Z
Learnt from: CR
Repo: getoptimum/coderabbit PR: 0
File: coderabbit-custom-pre-merge-checks-unique-id-file-non-traceable-F7F2B60C-1728-4C9A-8889-4F2235E186CA.txt:0-0
Timestamp: 2025-10-23T18:30:48.214Z
Learning: Applies to **/*.go : Concurrency safety in Go: avoid goroutine leaks, unbounded channels, racy access; ensure contexts are plumbed and respected
Applied to files:
grpc_p2p_client/p2p_client_multi_streams_subscribe.go
🧬 Code graph analysis (1)
grpc_p2p_client/p2p_client_multi_streams_publish.go (2)
grpc_p2p_client/p2p_client.go (3)
`Command` (34-34), `CommandPublishData` (38-38), `P2PMessage` (26-31)

grpc_p2p_client/p2p_client_multi_streams_subscribe.go (3)

`Command` (40-40), `CommandPublishData` (44-44), `P2PMessage` (32-37)
Actionable comments posted: 3
♻️ Duplicate comments (3)
grpc_p2p_client/p2p_client_multi_streams_publish.go (3)
85-112: Deadlock when `-output` is unset; fix writer goroutine tracking.

This contains the same critical deadlock issue flagged in previous reviews:

- Deadlock: `done` is always created (Line 87), but the writer goroutine is only started when `*output != ""` (Lines 92–98). At shutdown, Line 112 always blocks on `<-done`. If `-output` is omitted, no goroutine ever sends on `done` → the program hangs forever.
- WaitGroup doesn't track writer: The outer goroutine (Lines 94–98) immediately exits after launching the inner `writeHashToFile` goroutine (Line 97), so `wg.Wait()` on Line 110 does not wait for the writer to finish draining `dataCh`.

Apply the fix from the previous review to resolve both issues:

```diff
     // Buffered channel to prevent blocking
     dataCh := make(chan string, 100)
-    done := make(chan bool)
+    var done chan bool
     *dataSize = int(float32(*dataSize) / 2.0)

     var wg sync.WaitGroup
-    // Start writing the has of the published data
+    // Start writing the hash of the published data if requested
     if *output != "" {
-        wg.Add(1)
-        go func() {
-            defer wg.Done()
-            header := fmt.Sprintf("sender\tsize\tsha256(msg)")
-            go writeHashToFile(dataCh, done, *output, header)
-        }()
+        done = make(chan bool)
+        header := fmt.Sprintf("sender\tsize\tsha256(msg)")
+        go writeHashToFile(dataCh, done, *output, header)
     }
 @@
     wg.Wait()
     close(dataCh)
-    <-done
+    if done != nil {
+        <-done
+    }
```

This ensures `done` is only created and waited on when the writer is actually started, and removes the unnecessary wrapper goroutine.

Based on learnings.
116-185: Replace `log.Fatalf` with error returns and respect context in the send loop.

This contains the issues flagged in previous reviews:

- `log.Fatalf` in goroutines: Lines 133, 143, 152, and 164 call `log.Fatalf`, which terminates the entire process. In a multi-IP concurrent publisher, a single failure should not kill all workers. Return errors instead so the caller can log and continue.
- No context checking in loop: Once inside the send loop (Lines 146–181), the function never checks `ctx`. If the user cancels (CTRL+C), the goroutine won't stop until the next gRPC error or loop completion, which can delay shutdown.

Apply the fix from the previous review:

```diff
 func sendMessages(ctx context.Context, ip string, datasize int, write bool, dataCh chan<- string) error {
 @@
-    for i := 0; i < *count; i++ {
+    for i := 0; i < *count; i++ {
+        select {
+        case <-ctx.Done():
+            log.Printf("[%s] context canceled, stopping publishes", ip)
+            return ctx.Err()
+        default:
+        }
 @@
-        if _, err := rand.Read(randomBytes); err != nil {
-            log.Fatalf("failed to generate random bytes: %v", err)
-        }
+        if _, err := rand.Read(randomBytes); err != nil {
+            return fmt.Errorf("[%s] failed to generate random bytes: %w", ip, err)
+        }
 @@
         if err := stream.Send(pubReq); err != nil {
-            log.Fatalf("send publish: %v", err)
+            return fmt.Errorf("[%s] send publish: %w", ip, err)
         }
```

For Lines 133 and 143, replace `log.Fatalf` with `return fmt.Errorf(...)` similarly. In `main`, handle the error:

```diff
         go func(ip string) {
             defer wg.Done()
             datasize := *dataSize
-            sendMessages(ctx, ip, datasize, *output != "", dataCh)
+            if err := sendMessages(ctx, ip, datasize, *output != "", dataCh); err != nil {
+                log.Printf("[%s] sendMessages error: %v", ip, err)
+            }
         }(ip)
```

Based on learnings.
213-231: Remove unused `handleResponse` function and `sync/atomic` import.

As noted in previous reviews, `handleResponse` is never invoked—`sendMessages` only sends on the gRPC stream and never reads responses. This means:

- Server responses/acks are silently ignored.
- The `sync/atomic` import (Line 18) is only used in this function (Line 221) and becomes dead weight.
- The `counter` parameter has no callers.

Decision required:

- If the server sends responses, wire up a receive loop in `sendMessages` (similar to the subscribe client) that continuously calls `stream.Recv()` and passes the result to `handleResponse` with an `int32` counter, as sketched below.
- If the publisher should not process responses, remove `handleResponse`, `headHex` (also unused), and the `sync/atomic` import to eliminate dead code.

Clarify the intended protocol behavior and implement the chosen option.
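A minimal sketch of what option 1 could look like, reusing the `handleResponse` signature above; the stream type name follows the usual protoc-gen-go naming convention and is an assumption here:

```go
// Hypothetical receive loop for option 1. Run as `go drainResponses(stream, &received)`
// right after opening the stream in sendMessages, so server acks are consumed
// and counted instead of being silently dropped.
func drainResponses(stream protobuf.CommandStream_ListenCommandsClient, counter *int32) {
	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			return // server closed the stream cleanly
		}
		if err != nil {
			log.Printf("recv error: %v", err)
			return
		}
		handleResponse(resp, counter) // existing handler; increments via atomic.AddInt32
	}
}
```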
📜 Review details
Configuration used: Repository: getoptimum/coderabbit/.coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
`grpc_p2p_client/p2p_client_multi_streams_publish.go` (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.go
⚙️ CodeRabbit configuration file
Review Go code for: idiomatic patterns, error handling (check all errors), concurrency safety (context propagation, goroutine cleanup), test coverage for changed logic, exported API stability, memory efficiency (avoid unnecessary allocations), and prefer standard library over third-party when reasonable.
Files:
grpc_p2p_client/p2p_client_multi_streams_publish.go
🧠 Learnings (1)
📚 Learning: 2025-10-23T18:30:48.214Z
Learnt from: CR
Repo: getoptimum/coderabbit PR: 0
File: coderabbit-custom-pre-merge-checks-unique-id-file-non-traceable-F7F2B60C-1728-4C9A-8889-4F2235E186CA.txt:0-0
Timestamp: 2025-10-23T18:30:48.214Z
Learning: Applies to **/*.go : Concurrency safety in Go: avoid goroutine leaks, unbounded channels, racy access; ensure contexts are plumbed and respected
Applied to files:
grpc_p2p_client/p2p_client_multi_streams_publish.go
🧬 Code graph analysis (1)
grpc_p2p_client/p2p_client_multi_streams_publish.go (2)
grpc_p2p_client/p2p_client.go (3)
`Command` (34-34), `CommandPublishData` (38-38), `P2PMessage` (26-31)

grpc_p2p_client/p2p_client_multi_streams_subscribe.go (3)

`Command` (40-40), `CommandPublishData` (44-44), `P2PMessage` (32-37)
🔇 Additional comments (2)
grpc_p2p_client/p2p_client_multi_streams_publish.go (2)
45-57: Flag definitions look good.

The CLI flags provide appropriate defaults and cover the necessary configuration options for the publisher.

187-211: LGTM!

The IP file parsing correctly handles comments, empty lines, and errors. The implementation is clean and follows Go conventions.
Actionable comments posted: 2
📜 Review details
Configuration used: Repository: getoptimum/coderabbit/.coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
`grpc_p2p_client/p2p_client.go` (2 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.go
⚙️ CodeRabbit configuration file
Review Go code for: idiomatic patterns, error handling (check all errors), concurrency safety (context propagation, goroutine cleanup), test coverage for changed logic, exported API stability, memory efficiency (avoid unnecessary allocations), and prefer standard library over third-party when reasonable.
Files:
grpc_p2p_client/p2p_client.go
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Test Docker Setup
🔇 Additional comments (1)
grpc_p2p_client/p2p_client.go (1)
197-197: LGTM: Message size computation is correct.

Computing the message size from the unmarshalled `p2pMessage.Message` field is straightforward and correct.
Force-pushed 7be8a0c to 88a8ba9
Actionable comments posted: 1
♻️ Duplicate comments (1)
grpc_p2p_client/p2p_client_multi_streams_publish.go (1)
93-98: CRITICAL: WaitGroup doesn't actually wait for the writer goroutine.

The nested goroutine pattern is incorrect:

- Line 93: `wg.Add(1)` increments the WaitGroup
- Line 94: A wrapper goroutine is started
- Line 95: `defer wg.Done()` will execute when the wrapper returns
- Line 97: Another goroutine `go writeHashToFile(...)` is started
- The wrapper goroutine returns immediately after line 97

This means `wg.Done()` is called immediately, before `writeHashToFile` completes. When `wg.Wait()` on line 110 returns, the writer goroutine may still be processing data from `dataCh`, which is then closed on line 111. This can cause writes to be lost or incomplete output.

Remove the wrapper goroutine and start `writeHashToFile` directly:

```diff
     var wg sync.WaitGroup
     // Start writing the has of the published data
     if *output != "" {
-        wg.Add(1)
-        go func() {
-            defer wg.Done()
-            header := fmt.Sprintf("sender\tsize\tsha256(msg)")
-            go writeHashToFile(dataCh, done, *output, header)
-        }()
+        header := fmt.Sprintf("sender\tsize\tsha256(msg)")
+        go writeHashToFile(dataCh, done, *output, header)
     }
```

Note: The writer shouldn't be in the WaitGroup since it needs to consume data after all senders finish. The `done` channel already synchronizes completion correctly (once the deadlock issue is fixed).
📜 Review details
Configuration used: Repository: getoptimum/coderabbit/.coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
`grpc_p2p_client/p2p_client_multi_streams_publish.go` (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.go
⚙️ CodeRabbit configuration file
Review Go code for: idiomatic patterns, error handling (check all errors), concurrency safety (context propagation, goroutine cleanup), test coverage for changed logic, exported API stability, memory efficiency (avoid unnecessary allocations), and prefer standard library over third-party when reasonable.
Files:
grpc_p2p_client/p2p_client_multi_streams_publish.go
🧠 Learnings (2)
📚 Learning: 2025-10-23T18:30:48.214Z
Learnt from: CR
Repo: getoptimum/coderabbit PR: 0
File: coderabbit-custom-pre-merge-checks-unique-id-file-non-traceable-F7F2B60C-1728-4C9A-8889-4F2235E186CA.txt:0-0
Timestamp: 2025-10-23T18:30:48.214Z
Learning: Applies to **/*.go : Concurrency safety in Go: avoid goroutine leaks, unbounded channels, racy access; ensure contexts are plumbed and respected
Applied to files:
grpc_p2p_client/p2p_client_multi_streams_publish.go
📚 Learning: 2025-10-23T18:30:48.214Z
Learnt from: CR
Repo: getoptimum/coderabbit PR: 0
File: coderabbit-custom-pre-merge-checks-unique-id-file-non-traceable-F7F2B60C-1728-4C9A-8889-4F2235E186CA.txt:0-0
Timestamp: 2025-10-23T18:30:48.214Z
Learning: Applies to **/*.go : Security considerations in Go: flag SQL/command injection, path traversal, unvalidated inputs, hardcoded credentials, insecure crypto (MD5/SHA1), and missing rate limiting on public endpoints
Applied to files:
grpc_p2p_client/p2p_client_multi_streams_publish.go
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
grpc_p2p_client/p2p_client.go (2)
137-139: Unbounded goroutine spawning risks resource exhaustion and lacks panic recovery.

Spawning a new goroutine for each received message (line 137) can exhaust memory and goroutine scheduler capacity under high message rates. Additionally, if `handleResponse` panics, the program crashes with no recovery.

Consider one of these patterns:

- Worker pool: Pre-spawn a fixed number of worker goroutines that consume from `msgChan`.
- Serial processing: Remove the goroutine and call `handleResponse` directly if message ordering or simpler error handling is acceptable.

Example worker pool pattern:

```diff
     msgChan := make(chan *protobuf.Response, 10000)
+
+    // Start worker pool
+    const numWorkers = 10
+    for i := 0; i < numWorkers; i++ {
+        go func() {
+            defer func() {
+                if r := recover(); r != nil {
+                    log.Printf("Worker panic recovered: %v", r)
+                }
+            }()
+            for resp := range msgChan {
+                handleResponse(resp, &receivedCount)
+            }
+        }()
+    }

     // recv goroutine
     go func() {
         for {
             resp, err := stream.Recv()
             if err == io.EOF {
                 close(msgChan)
                 return
             }
             if err != nil {
                 log.Printf("recv error: %v", err)
                 close(msgChan)
                 return
             }
             msgChan <- resp
         }
     }()

     // message handler loop
     for {
         select {
         case <-ctx.Done():
             log.Printf("Context canceled. Total messages received: %d", atomic.LoadInt32(&receivedCount))
             return
-        case resp, ok := <-msgChan:
-            if !ok {
-                log.Printf("Stream closed. Total messages received: %d", atomic.LoadInt32(&receivedCount))
-                return
-            }
-            go func(resp *protobuf.Response) {
-                handleResponse(resp, &receivedCount)
-            }(resp)
+        case _, ok := <-msgChan:
+            if !ok {
+                log.Printf("Stream closed. Total messages received: %d", atomic.LoadInt32(&receivedCount))
+                return
+            }
+            // Workers process from msgChan directly
         }
     }
```

As per coding guidelines.
65-68: Configure reasonable message size limits instead of `math.MaxInt`.

Setting `MaxCallRecvMsgSize` and `MaxCallSendMsgSize` to `math.MaxInt` removes all size constraints, which exposes the client to memory exhaustion if the server sends or accepts extremely large messages. This can lead to OOM crashes or DoS conditions.

Apply this diff to set reasonable limits based on your expected message sizes:

```diff
         grpc.WithDefaultCallOptions(
-            grpc.MaxCallRecvMsgSize(math.MaxInt),
-            grpc.MaxCallSendMsgSize(math.MaxInt),
+            grpc.MaxCallRecvMsgSize(100*1024*1024), // 100 MB
+            grpc.MaxCallSendMsgSize(100*1024*1024), // 100 MB
         ),
```

Adjust the limits (e.g., 10 MB, 100 MB) based on your actual P2P message size requirements.
As per coding guidelines.
♻️ Duplicate comments (1)
grpc_p2p_client/p2p_client.go (1)
214-219: `headHex` remains unused in this file.

This function is still defined but never called within `p2p_client.go`. As noted in previous reviews, it's duplicated across multiple files and should be moved to a shared utility package.

Based on past review feedback.
📜 Review details
Configuration used: Repository: getoptimum/coderabbit/.coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
`grpc_p2p_client/p2p_client.go` (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.go
⚙️ CodeRabbit configuration file
Review Go code for: idiomatic patterns, error handling (check all errors), concurrency safety (context propagation, goroutine cleanup), test coverage for changed logic, exported API stability, memory efficiency (avoid unnecessary allocations), and prefer standard library over third-party when reasonable.
Files:
grpc_p2p_client/p2p_client.go
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Test Docker Setup
🔇 Additional comments (1)
grpc_p2p_client/p2p_client.go (1)
202-207: Clarify maintenance status and address unimplemented MumP2P trace handler.

This file appears to be a separate, simplified client variant—the active client is `p2p_client_multi_streams_subscribe.go`, which properly handles `ResponseType_MessageTraceGossipSub` via `handleGossipSubTrace()`. However, two issues remain:

- MumP2P trace handler is missing: `ResponseType_MessageTraceMumP2P` is defined in the proto (p2p_stream.proto) but has zero implementations anywhere in the codebase. Either this trace type should be removed from the proto or a handler must be implemented.
- Confirm if p2p_client.go is maintained: If this client is still active/maintained, silently logging and discarding `resp.GetData()` for traces wastes bandwidth. Either explicitly disable trace subscriptions at the connection level or route traces to proper handlers.

If this file is legacy/example code, consider marking it clearly or removing it to avoid confusion about which client should be used.
…dev-setup-guide into kmk-grpc-add-example
Actionable comments posted: 3
♻️ Duplicate comments (13)
grpc_p2p_client/p2p_client_multi_streams_publish.go (6)
28-44: Deduplicate `P2PMessage` and `Command` across clients

`P2PMessage` and `Command` are copy-pasted here and in the other gRPC clients. This is already hard to maintain and will only get worse as fields evolve.

Extract these into a shared internal package (e.g. `internal/p2p`) and import them in all three CLIs instead of re-declaring the types, as sketched below. This removes duplication and keeps protocol changes in one place.

As per coding guidelines
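A minimal sketch of the shared package; the concrete field set and command values must be copied from the current duplicates, so everything below is illustrative:

```go
// internal/p2p/types.go — hypothetical shared home for the duplicated types.
package p2p

// Command mirrors the wire command codes used by the gRPC clients.
type Command int32

// Values here are placeholders; use the constants currently duplicated
// across the three CLI files.
const (
	CommandSubscribeToTopic Command = iota + 1
	CommandPublishData
)

// P2PMessage mirrors the JSON payload exchanged over the stream.
// Only the Message field is shown; copy the remaining fields verbatim.
type P2PMessage struct {
	Message string `json:"message"`
}
```

Each CLI would then import `internal/p2p` and drop its local copies.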
87-99: Fix writer startup: WaitGroup misuse and `done`-channel deadlock

This block has multiple concurrency bugs:

- `done := make(chan bool)` is always created, but `writeHashToFile` is only started when `*output != ""`. When `-output` is omitted, `<-done` at Line 112 blocks forever. This was already flagged earlier and still exists.
- The writer is started via a nested goroutine; the outer goroutine calls `defer wg.Done()` without any corresponding `wg.Add(1)`, which will panic with `sync: WaitGroup misuse: negative counter`.
- There's no reason for the wrapper goroutine.

Simplify and make the shutdown safe:

```diff
-    // Buffered channel to prevent blocking
-    dataCh := make(chan string, 100)
-    done := make(chan bool)
-    *dataSize = int(float32(*dataSize) / 2.0)
-
-    var wg sync.WaitGroup
-    // Start writing the has of the published data
-    if *output != "" {
-        go func() {
-            header := fmt.Sprintf("sender\tsize\tsha256(msg)")
-            go writeHashToFile(dataCh, done, *output, header)
-        }()
-    }
+    // Buffered channel to prevent blocking writer
+    dataCh := make(chan string, 100)
+    var done chan bool
+    *dataSize = int(float32(*dataSize) / 2.0)
+
+    // Start writing the hash of the published data if requested
+    if *output != "" {
+        done = make(chan bool)
+        header := fmt.Sprintf("sender\tsize\tsha256(msg)")
+        go writeHashToFile(dataCh, done, *output, header)
+    }
+
+    var wg sync.WaitGroup
 @@
-    wg.Wait()
-    close(dataCh)
-    <-done
+    wg.Wait()
+    close(dataCh)
+    if done != nil {
+        <-done
+    }
```

This removes the WaitGroup misuse, only waits on `done` when a writer exists, and keeps the shutdown path deterministic.

Based on learnings

Also applies to: 111-112, 249-277
90-90: Avoid mutating the `datasize` flag at runtime

`*dataSize = int(float32(*dataSize) / 2.0)` silently halves the user-specified flag. That's surprising behavior and unnecessarily uses float conversions.

If you really want "half the flag value" for the payload, compute it locally and keep the flag intact:

```diff
-    *dataSize = int(float32(*dataSize) / 2.0)
+    effectiveSize := *dataSize / 2
 @@
     for _, ip := range ips {
         wg.Add(1)
         go func(ip string) {
             defer wg.Done()
-            datasize := *dataSize
+            datasize := effectiveSize
             sendMessages(ctx, ip, datasize, *output != "", dataCh)
         }(ip)
     }
```

Or, if halving was just an experiment, drop the line entirely and pass `*dataSize` through.

As per coding guidelines

Also applies to: 101-108
101-109: Restructure `sendMessages`: reuse gRPC connection, respect context, avoid `log.Fatalf`

`sendMessages` has several serious issues:

- A new gRPC connection and stream are created inside the `for i := 0; i < *count; i++` loop. This does full TCP/HTTP2 setup per message and can exhaust resources at high `-count`. This was previously flagged and is still present.
- Errors use `log.Fatalf` inside worker goroutines, killing the entire process if a single IP misbehaves.
- The loop ignores `ctx` once the first `select` passes.
- The function returns `error` but never actually returns a non-nil error.

Restructure to create one connection/stream per worker, honor context, and return errors:

```diff
 func sendMessages(ctx context.Context, ip string, datasize int, write bool, dataCh chan<- string) error {
-    // connect with simple gRPC settings
-    select {
-    case <-ctx.Done():
-        log.Printf("[%s] context canceled, stopping", ip)
-        return ctx.Err()
-    default:
-    }
-
-    for i := 0; i < *count; i++ {
-        conn, err := grpc.NewClient(ip,
+    // connect with simple gRPC settings
+    select {
+    case <-ctx.Done():
+        log.Printf("[%s] context canceled, stopping", ip)
+        return ctx.Err()
+    default:
+    }
+
+    conn, err := grpc.NewClient(ip,
         grpc.WithTransportCredentials(insecure.NewCredentials()),
         grpc.WithDefaultCallOptions(
             grpc.MaxCallRecvMsgSize(math.MaxInt),
             grpc.MaxCallSendMsgSize(math.MaxInt),
         ),
     )
     if err != nil {
-        log.Fatalf("failed to connect to node %v", err)
+        return fmt.Errorf("[%s] failed to connect to node: %w", ip, err)
     }
-    //defer conn.Close()
-    println(fmt.Sprintf("Connected to node at: %s…", ip))
-
-    client := protobuf.NewCommandStreamClient(conn)
-
-    stream, err := client.ListenCommands(ctx)
-
-    if err != nil {
-        log.Fatalf("ListenCommands: %v", err)
-    }
-
+    defer conn.Close()
+    println(fmt.Sprintf("Connected to node at: %s…", ip))
+
+    client := protobuf.NewCommandStreamClient(conn)
+    stream, err := client.ListenCommands(ctx)
+    if err != nil {
+        return fmt.Errorf("[%s] ListenCommands: %w", ip, err)
+    }
+    defer stream.CloseSend()
+
+    for i := 0; i < *count; i++ {
+        // allow prompt cancellation
+        select {
+        case <-ctx.Done():
+            log.Printf("[%s] context canceled during send loop", ip)
+            return ctx.Err()
+        default:
+        }
+
         start := time.Now()
-        var data []byte
-        //currentTime := time.Now().UnixNano()
         randomBytes := make([]byte, datasize)
         if _, err := rand.Read(randomBytes); err != nil {
-            log.Fatalf("failed to generate random bytes: %v", err)
+            return fmt.Errorf("[%s] failed to generate random bytes: %w", ip, err)
         }
-        randomSuffix := hex.EncodeToString(randomBytes)
-        data = []byte(fmt.Sprintf("%s-%s", ip, randomSuffix))
+        randomSuffix := hex.EncodeToString(randomBytes)
+        data := []byte(fmt.Sprintf("%s-%s", ip, randomSuffix))

         pubReq := &protobuf.Request{
             Command: int32(CommandPublishData),
             Topic:   *topic,
             Data:    data,
         }
         if err := stream.Send(pubReq); err != nil {
-            log.Fatalf("send publish: %v", err)
+            return fmt.Errorf("[%s] send publish: %w", ip, err)
         }
 @@
-        conn.Close()
     }
     return nil
 }
```

And in `main`, actually log the error instead of ignoring it:

```diff
-            sendMessages(ctx, ip, datasize, *output != "", dataCh)
+            if err := sendMessages(ctx, ip, datasize, *output != "", dataCh); err != nil {
+                log.Printf("[%s] sendMessages error: %v", ip, err)
+            }
```

This fixes the connection leak pattern, makes the CLI responsive to Ctrl+C, and isolates failures per IP.

Based on learnings

Also applies to: 116-193
222-247: Remove or wire up unused `handleResponse`/`headHex`

`handleResponse` and `headHex` are not used anywhere in this file. As a result:

- `encoding/json` and `sync/atomic` are only needed to support dead code.
- Any server responses on this stream are silently ignored on the publisher side.

Either:

- Wire up a receive loop on the same stream and call `handleResponse`, or
- Remove `handleResponse`, `headHex`, and any now-unused imports to keep the publisher focused and maintainable.

Right now they just add noise.
249-277: Consider returning errors from `writeHashToFile` instead of `log.Fatal`

`writeHashToFile` calls `log.Fatal` on `os.Create` failure, which terminates the whole process. In a multi-worker CLI this is brittle; you likely want to log and keep other publishers running.

Consider changing the signature to return `error` and letting the caller log it, e.g.:

```diff
-func writeHashToFile(dataCh <-chan string, done chan<- bool, filename string, header string) {
+func writeHashToFile(dataCh <-chan string, done chan<- bool, filename string, header string) error {
     file, err := os.Create(filename)
     if err != nil {
-        log.Fatal(err)
+        return fmt.Errorf("failed to create output file %q: %w", filename, err)
     }
     defer file.Close()
 @@
-    done <- true
-    fmt.Println("All data flushed to disk")
+    done <- true
+    fmt.Println("All data flushed to disk")
+    return nil
 }
```

Then wrap it in a goroutine in `main` and log any returned error.

As per coding guidelines

grpc_p2p_client/p2p_client_multi_streams_subscribe.go (7)
31-47: Share `P2PMessage` and `Command` definitions instead of duplicating them

Same as in the publisher: these types are duplicated here and in other CLI files.

Move `P2PMessage` and `Command` (and their constants) into a shared internal package and import them, instead of redefining per binary. That keeps your protocol surface in one place and avoids drift.
As per coding guidelines
84-88: Fix writer coordination: deadlocks and WaitGroup misuse

This writer setup is currently unsafe:

- `dataDone`/`traceDone` are always created, but writers only start when `*outputData`/`*outputTrace` are non-empty. If either flag is omitted, `<-dataDone`/`<-traceDone` at Lines 119–120 can block forever.
- Writer goroutines call `defer wg.Done()` even though `wg.Add(1)` is never called for them. This will panic with a negative WaitGroup counter.
- There is an unnecessary wrapper goroutine that only starts another goroutine.

Adopt the simpler pattern previously suggested:

```diff
-    // Buffered channel to prevent blocking
-    dataCh := make(chan string, 100)
-    traceCh := make(chan string, 100)
-    dataDone := make(chan bool)
-    traceDone := make(chan bool)
-
-    // Launch goroutines with synchronization
-    var wg sync.WaitGroup
-    if *outputData != "" {
-        go func() {
-            defer wg.Done()
-            header := fmt.Sprintf("receiver\tsender\tsize\tsha256(msg)")
-            go writeToFile(ctx, dataCh, dataDone, *outputData, header)
-        }()
-    }
-
-    if *outputTrace != "" {
-        go func() {
-            defer wg.Done()
-            header := "" //fmt.Sprintf("sender\tsize\tsha256(msg)")
-            go writeToFile(ctx, traceCh, traceDone, *outputTrace, header)
-        }()
-    }
+    // Buffered channels to prevent blocking
+    dataCh := make(chan string, 100)
+    traceCh := make(chan string, 100)
+    var dataDone chan bool
+    var traceDone chan bool
+
+    var wg sync.WaitGroup
+    if *outputData != "" {
+        dataDone = make(chan bool)
+        header := fmt.Sprintf("receiver\tsender\tsize\tsha256(msg)")
+        go writeToFile(ctx, dataCh, dataDone, *outputData, header)
+    }
+
+    if *outputTrace != "" {
+        traceDone = make(chan bool)
+        header := "" // e.g. "type\tpeer\trecv\tmsgID\ttopic\ttimestamp"
+        go writeToFile(ctx, traceCh, traceDone, *outputTrace, header)
+    }
 @@
-    wg.Wait()
-    close(dataCh)
-    close(traceCh)
-    <-dataDone
-    <-traceDone
+    wg.Wait()
+    close(dataCh)
+    close(traceCh)
+    if dataDone != nil {
+        <-dataDone
+    }
+    if traceDone != nil {
+        <-traceDone
+    }
```

This removes the deadlocks and WaitGroup misuse while keeping the shutdown semantics clear.

Based on learnings

Also applies to: 90-107, 116-121
108-114: Harden `receiveMessages`: avoid `log.Fatalf`, Recv goroutine leaks, and per-message goroutines

Current issues:

- `grpc.NewClient`, `ListenCommands`, and the initial `Send` all use `log.Fatalf`, which kills the entire process on failure of a single IP.
- The Recv goroutine writes to `msgChan` without checking `ctx`; if `ctx` is canceled and the handler loop exits, it can block forever on a full buffer, leaking a goroutine.
- Every message spawns a goroutine calling `handleResponse`, which can explode under high throughput.

Refactor along these lines:

```diff
 func receiveMessages(ctx context.Context, ip string, writeData bool, dataCh chan<- string, writeTrace bool, traceCh chan<- string) error {
 @@
     conn, err := grpc.NewClient(ip,
         grpc.WithTransportCredentials(insecure.NewCredentials()),
 @@
     )
     if err != nil {
-        log.Fatalf("failed to connect to node %v", err)
+        return fmt.Errorf("[%s] failed to connect to node: %w", ip, err)
     }
     defer conn.Close()
 @@
-    stream, err := client.ListenCommands(ctx)
-
-    if err != nil {
-        log.Fatalf("ListenCommands: %v", err)
-    }
+    stream, err := client.ListenCommands(ctx)
+    if err != nil {
+        return fmt.Errorf("[%s] ListenCommands: %w", ip, err)
+    }
 @@
     subReq := &protobuf.Request{
         Command: int32(CommandSubscribeToTopic),
         Topic:   *topic,
     }
     if err := stream.Send(subReq); err != nil {
-        log.Fatalf("send subscribe: %v", err)
+        return fmt.Errorf("[%s] send subscribe: %w", ip, err)
     }
 @@
     msgChan := make(chan *protobuf.Response, 10000)
 @@
     // recv goroutine
     go func() {
         for {
             resp, err := stream.Recv()
             if err == io.EOF {
                 close(msgChan)
                 return
             }
             if err != nil {
-                log.Printf("recv error: %v", err)
+                log.Printf("[%s] recv error: %v", ip, err)
                 close(msgChan)
                 return
             }
-            msgChan <- resp
+            select {
+            case msgChan <- resp:
+            case <-ctx.Done():
+                log.Printf("[%s] context canceled while delivering message", ip)
+                close(msgChan)
+                return
+            }
         }
     }()
 @@
-        case resp, ok := <-msgChan:
-            if !ok {
-                log.Printf("Stream closed. Total messages received: %d", atomic.LoadInt32(&receivedCount))
-                return nil
-            }
-            go func(resp *protobuf.Response) {
-                handleResponse(ip, resp, &receivedCount, writeData, dataCh, writeTrace, traceCh)
-            }(resp)
+        case resp, ok := <-msgChan:
+            if !ok {
+                log.Printf("[%s] stream closed. Total messages received: %d", ip, atomic.LoadInt32(&receivedCount))
+                return nil
+            }
+            handleResponse(ip, resp, &receivedCount, writeData, dataCh, writeTrace, traceCh)
```

And in `main`, actually handle errors from `receiveMessages`:

```diff
-        go func(ip string) {
-            defer wg.Done()
-            receiveMessages(ctx, ip, *outputData != "", dataCh, *outputTrace != "", traceCh)
-        }(ip)
+        go func(ip string) {
+            defer wg.Done()
+            if err := receiveMessages(ctx, ip, *outputData != "", dataCh, *outputTrace != "", traceCh); err != nil {
+                log.Printf("[%s] receiveMessages error: %v", ip, err)
+            }
+        }(ip)
```

This removes global process aborts, prevents Recv goroutine leaks, and keeps per-node processing bounded.

Based on learnings

Also applies to: 123-202
230-263: Backpressure & ordering: process messages inline instead of spawning a goroutine per message

Related to the previous comment: spawning a goroutine for every `handleResponse` call can create thousands of goroutines under load, with no ordering guarantees for outputs to `dataCh`/`traceCh`.

Processing inline (as shown in the previous diff) is both simpler and safer. If you need concurrency later, consider a fixed-size worker pool (sketched below) rather than unbounded goroutine fan-out.
Based on learnings
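For reference, a bounded pool could be as small as this sketch, placed inside `receiveMessages` and using the identifiers already in scope there (`msgChan`, `handleResponse`, and the output channels); the worker count is an arbitrary example:

```go
// Hypothetical fixed-size pool: N workers drain msgChan, so concurrency stays
// bounded no matter how fast the server pushes responses. Workers exit once
// the recv goroutine closes msgChan.
const numWorkers = 8

var workers sync.WaitGroup
for i := 0; i < numWorkers; i++ {
	workers.Add(1)
	go func() {
		defer workers.Done()
		for resp := range msgChan {
			handleResponse(ip, resp, &receivedCount, writeData, dataCh, writeTrace, traceCh)
		}
	}()
}
workers.Wait() // all responses processed once msgChan is closed
```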
272-332: Fix GossipSub trace printing and guard against nil topic pointers

Two concrete problems here:

- `topic = string(*evt.DeliverMessage.Topic)` and `string(*evt.PublishMessage.Topic)` will panic if `Topic` is nil. Trace events without topics are valid.
- `fmt.Print("%s\t...")` does not apply formatting verbs, so the stdout branch never formats the fields correctly.

Tighten this up:

```diff
-    msgID := ""
-    topic := ""
-    if evt.DeliverMessage != nil {
-        rawBytes = []byte(evt.DeliverMessage.MessageID)
-        msgID = base58.Encode(rawBytes)
-        topic = string(*evt.DeliverMessage.Topic)
-    }
-    if evt.PublishMessage != nil {
-        rawBytes = []byte(evt.PublishMessage.MessageID)
-        msgID = base58.Encode(rawBytes)
-        topic = string(*evt.PublishMessage.Topic)
-    }
+    msgID := ""
+    topic := ""
+    if evt.DeliverMessage != nil {
+        rawBytes = []byte(evt.DeliverMessage.MessageID)
+        msgID = base58.Encode(rawBytes)
+        if evt.DeliverMessage.Topic != nil {
+            topic = string(*evt.DeliverMessage.Topic)
+        }
+    }
+    if evt.PublishMessage != nil {
+        rawBytes = []byte(evt.PublishMessage.MessageID)
+        msgID = base58.Encode(rawBytes)
+        if evt.PublishMessage.Topic != nil {
+            topic = string(*evt.PublishMessage.Topic)
+        }
+    }
 @@
-    if writetrace {
-        dataToSend :=fmt.Sprintf("%s\t%s\t%s\t%s\t%s\t%d", typeStr, peerID, recvID, msgID, topic, timestamp)
-        traceCh <- dataToSend
-    } else {
-        fmt.Print("%s\t%s\t%s\t%s\t%s\t%d\n", typeStr, peerID, recvID, msgID, topic, timestamp)
-    }
+    if writetrace {
+        dataToSend := fmt.Sprintf("%s\t%s\t%s\t%s\t%s\t%d", typeStr, peerID, recvID, msgID, topic, timestamp)
+        traceCh <- dataToSend
+    } else {
+        fmt.Printf("%s\t%s\t%s\t%s\t%s\t%d\n", typeStr, peerID, recvID, msgID, topic, timestamp)
+    }
```

This restores correct tabular formatting and avoids nil-pointer panics.
334-437: Apply the same safety/formatting fixes in `handleOptimumP2PTrace`

`handleOptimumP2PTrace` suffers from the same issues:

- `topic = string(*evt.DeliverMessage.Topic)` / `string(*evt.PublishMessage.Topic)` without nil checks.
- `fmt.Print("%s\t...")` with formatting verbs, so stdout trace lines are not formatted.

Mirror the GossipSub handler fix:

```diff
-    msgID := ""
-    topic := ""
-    if evt.DeliverMessage != nil {
-        rawBytes = []byte(evt.DeliverMessage.MessageID)
-        msgID = base58.Encode(rawBytes)
-        topic = string(*evt.DeliverMessage.Topic)
-    }
-    if evt.PublishMessage != nil {
-        rawBytes = []byte(evt.PublishMessage.MessageID)
-        msgID = base58.Encode(rawBytes)
-        topic = string(*evt.PublishMessage.Topic)
-    }
+    msgID := ""
+    topic := ""
+    if evt.DeliverMessage != nil {
+        rawBytes = []byte(evt.DeliverMessage.MessageID)
+        msgID = base58.Encode(rawBytes)
+        if evt.DeliverMessage.Topic != nil {
+            topic = string(*evt.DeliverMessage.Topic)
+        }
+    }
+    if evt.PublishMessage != nil {
+        rawBytes = []byte(evt.PublishMessage.MessageID)
+        msgID = base58.Encode(rawBytes)
+        if evt.PublishMessage.Topic != nil {
+            topic = string(*evt.PublishMessage.Topic)
+        }
+    }
 @@
-    if writetrace {
-        dataToSend :=fmt.Sprintf("%s\t%s\t%s\t%s\t%s\t%d", typeStr, peerID, recvID, msgID, topic, timestamp)
-        traceCh <- dataToSend
-    } else {
-        fmt.Print("%s\t%s\t%s\t%s\t%s\t%d\n", typeStr, peerID, recvID, msgID, topic, timestamp)
-    }
+    if writetrace {
+        dataToSend := fmt.Sprintf("%s\t%s\t%s\t%s\t%s\t%d", typeStr, peerID, recvID, msgID, topic, timestamp)
+        traceCh <- dataToSend
+    } else {
+        fmt.Printf("%s\t%s\t%s\t%s\t%s\t%d\n", typeStr, peerID, recvID, msgID, topic, timestamp)
+    }
```

Without this, valid trace events can still panic the subscriber or emit malformed stdout.
439-473: Ensure `writeToFile` always signals `done` and doesn't drop data on ctx cancellation

The current `writeToFile` combines `range dataCh` with a `select` on `ctx.Done()`:

- If `ctx` is canceled after reading a line from `dataCh`, it returns immediately without writing that line and without sending on `done`.
- Callers then block forever on `<-dataDone`/`<-traceDone`. This deadlock path was already reported and still exists.

Use a `defer` to guarantee `done` is signaled exactly once, and drive termination via `ctx` or channel close:

```diff
 func writeToFile(ctx context.Context, dataCh <-chan string, done chan<- bool, filename string, header string) {
     file, err := os.Create(filename)
     if err != nil {
         log.Fatal(err)
     }
     defer file.Close()

     writer := bufio.NewWriter(file)
-    defer writer.Flush()
-
-    // write the header
-    if header != "" {
-        _, err := writer.WriteString(header + "\n")
-        if err != nil {
-            log.Printf("Write error: %v", err)
-        }
-    }
-
-    // Process until channel is closed
-    for data := range dataCh {
-        select {
-        case <-ctx.Done():
-            return
-        default:
-
-        }
-        _, err := writer.WriteString(data + "\n")
-        writer.Flush()
-        if err != nil {
-            log.Printf("Write error: %v", err)
-        }
-    }
-    done <- true
-    fmt.Println("All data flushed to disk")
+    defer writer.Flush()
+    defer func() {
+        done <- true
+        fmt.Println("All data flushed to disk")
+    }()
+
+    // optional header
+    if header != "" {
+        if _, err := writer.WriteString(header + "\n"); err != nil {
+            log.Printf("Write error: %v", err)
+        }
+    }
+
+    for {
+        select {
+        case <-ctx.Done():
+            return
+        case data, ok := <-dataCh:
+            if !ok {
+                return
+            }
+            if _, err := writer.WriteString(data + "\n"); err != nil {
+                log.Printf("Write error: %v", err)
+            }
+        }
+    }
 }
```

That guarantees `done` is fired and avoids losing the last line on cancellation.

Based on learnings
📜 Review details
Configuration used: Repository: getoptimum/coderabbit/.coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (2)
- `grpc_p2p_client/go.mod` is excluded by none and included by none
- `grpc_p2p_client/local.ips.tsv` is excluded by `!**/*.tsv` and included by none
📒 Files selected for processing (2)
- `grpc_p2p_client/p2p_client_multi_streams_publish.go` (1 hunks)
- `grpc_p2p_client/p2p_client_multi_streams_subscribe.go` (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.go
⚙️ CodeRabbit configuration file
Review Go code for: idiomatic patterns, error handling (check all errors), concurrency safety (context propagation, goroutine cleanup), test coverage for changed logic, exported API stability, memory efficiency (avoid unnecessary allocations), and prefer standard library over third-party when reasonable.
Files:
- `grpc_p2p_client/p2p_client_multi_streams_publish.go`
- `grpc_p2p_client/p2p_client_multi_streams_subscribe.go`
🧠 Learnings (1)
📚 Learning: 2025-10-23T18:30:48.214Z
Learnt from: CR
Repo: getoptimum/coderabbit PR: 0
File: coderabbit-custom-pre-merge-checks-unique-id-file-non-traceable-F7F2B60C-1728-4C9A-8889-4F2235E186CA.txt:0-0
Timestamp: 2025-10-23T18:30:48.214Z
Learning: Applies to **/*.go : Concurrency safety in Go: avoid goroutine leaks, unbounded channels, racy access; ensure contexts are plumbed and respected
Applied to files:
- `grpc_p2p_client/p2p_client_multi_streams_publish.go`
- `grpc_p2p_client/p2p_client_multi_streams_subscribe.go`
🧬 Code graph analysis (1)
grpc_p2p_client/p2p_client_multi_streams_subscribe.go (2)
grpc_p2p_client/p2p_client_multi_streams_publish.go (3)
`Command` (37-37), `CommandSubscribeToTopic` (42-42), `P2PMessage` (29-34)

grpc_p2p_client/p2p_client.go (3)

`Command` (34-34), `CommandSubscribeToTopic` (39-39), `P2PMessage` (26-31)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Test Docker Setup
Actionable comments posted: 1
♻️ Duplicate comments (3)
grpc_p2p_client/p2p_client_multi_streams_publish.go (3)
120-193: Connection-in-loop and other critical issues already flagged

This function has several critical issues that were comprehensively covered in previous reviews:

- Resource leak: Lines 128-192 create a new gRPC connection inside the message loop (flagged as critical)
- Process termination: Lines 136, 146 use `log.Fatalf` in worker goroutines (flagged)
- Deterministic randomness: Lines 182-186 use Poisson without seeding `mathrand` (flagged)
225-250: Dead code:handleResponseandheadHexare unusedBoth functions are never invoked in this file:
handleResponse(lines 225-243) is the only place usingsync/atomicheadHex(lines 245-250) has no callers- This was already flagged in previous reviews
Remove or wire up these functions as suggested in earlier review comments.
252-280: File creation error handling already flaggedLine 255 uses
log.Fatalwhich terminates the entire process. Previous reviews suggested returning an error instead, allowing the publisher goroutines to continue even if file output fails.Refer to the earlier review comment suggesting
writeHashToFilereturn an error.
📜 Review details
Configuration used: Repository: getoptimum/coderabbit/.coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
`grpc_p2p_client/p2p_client_multi_streams_publish.go` (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.go
⚙️ CodeRabbit configuration file
Review Go code for: idiomatic patterns, error handling (check all errors), concurrency safety (context propagation, goroutine cleanup), test coverage for changed logic, exported API stability, memory efficiency (avoid unnecessary allocations), and prefer standard library over third-party when reasonable.
Files:
grpc_p2p_client/p2p_client_multi_streams_publish.go
🧠 Learnings (2)
📚 Learning: 2025-10-23T18:30:48.214Z
Learnt from: CR
Repo: getoptimum/coderabbit PR: 0
File: coderabbit-custom-pre-merge-checks-unique-id-file-non-traceable-F7F2B60C-1728-4C9A-8889-4F2235E186CA.txt:0-0
Timestamp: 2025-10-23T18:30:48.214Z
Learning: Applies to **/*.go : Concurrency safety in Go: avoid goroutine leaks, unbounded channels, racy access; ensure contexts are plumbed and respected
Applied to files:
grpc_p2p_client/p2p_client_multi_streams_publish.go
📚 Learning: 2025-10-23T18:30:48.214Z
Learnt from: CR
Repo: getoptimum/coderabbit PR: 0
File: coderabbit-custom-pre-merge-checks-unique-id-file-non-traceable-F7F2B60C-1728-4C9A-8889-4F2235E186CA.txt:0-0
Timestamp: 2025-10-23T18:30:48.214Z
Learning: Applies to **/*.go : Security considerations in Go: flag SQL/command injection, path traversal, unvalidated inputs, hardcoded credentials, insecure crypto (MD5/SHA1), and missing rate limiting on public endpoints
Applied to files:
grpc_p2p_client/p2p_client_multi_streams_publish.go
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Test Docker Setup
🔇 Additional comments (1)
grpc_p2p_client/p2p_client_multi_streams_publish.go (1)
199-223: LGTM: Clean IP file parsing

The implementation correctly (see the sketch after this list):
- Opens and defers file closure
- Skips empty lines and comments
- Returns wrapped errors with context
- Handles scanner errors
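For readers who want the shape of that pattern in one place, here is a minimal illustrative sketch; the function name and error wording are assumptions, not this PR's exact code:

```go
// Illustrative IP-file reader: skips blank lines and '#' comments,
// wraps errors with context, and checks the scanner's error.
func readIPs(path string) ([]string, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("open ip file: %w", err)
	}
	defer file.Close()

	var ips []string
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		ips = append(ips, line)
	}
	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("scan ip file: %w", err)
	}
	return ips, nil
}
```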
Actionable comments posted: 1
♻️ Duplicate comments (16)
grpc_p2p_client/p2p_client_multi_streams_publish.go (7)
28-44: Extract duplicated types to a shared package.
`P2PMessage` and `Command` are duplicated verbatim across `p2p_client.go`, `p2p_client_multi_streams_subscribe.go`, and this file. Consolidate them into a common internal package (e.g., `internal/p2p`) to eliminate maintenance burden and ensure consistency.
88-88: Remove confusing runtime flag mutation.

Halving `*dataSize` in-place via float cast is opaque to users and risks precision loss. If half the flag value is needed, use a local variable: `computedSize := *dataSize / 2`.
95-98: Eliminate unnecessary nested goroutine wrapper.

The outer `go func()` (line 95) adds no value and doesn't track the actual writer in the WaitGroup. Start `writeHashToFile` directly:

```diff
     if *output != "" {
         done = make(chan bool)
-        go func() {
-            header := fmt.Sprintf("sender\tsize\tsha256(msg)")
-            go writeHashToFile(dataCh, done, *output, header)
-        }()
+        header := fmt.Sprintf("sender\tsize\tsha256(msg)")
+        go writeHashToFile(dataCh, done, *output, header)
     }
```
120-193: CRITICAL: Connection created inside loop causes massive resource leak.

Lines 128-134 create a new gRPC connection inside the `for i := 0; i < *count; i++` loop, meaning a fresh TCP/HTTP2 connection (with handshake and TLS overhead) is established for every single message. With high `count` values, this exhausts file descriptors and memory.

Move the connection and stream setup outside the loop and reuse them:

```diff
 func sendMessages(ctx context.Context, ip string, datasize int, write bool, dataCh chan<- string) error {
-    for i := 0; i < *count; i++ {
-        select {
-        case <-ctx.Done():
-            log.Printf("[%s] context canceled, stopping", ip)
-            return ctx.Err()
-        default:
-        }
+    select {
+    case <-ctx.Done():
+        log.Printf("[%s] context canceled, stopping", ip)
+        return ctx.Err()
+    default:
+    }

-        conn, err := grpc.NewClient(ip,
-            grpc.WithTransportCredentials(insecure.NewCredentials()),
-            grpc.WithDefaultCallOptions(
-                grpc.MaxCallRecvMsgSize(math.MaxInt),
-                grpc.MaxCallSendMsgSize(math.MaxInt),
-            ),
-        )
-        if err != nil {
-            log.Fatalf("failed to connect to node %v", err)
-        }
-        //defer conn.Close()
-        println(fmt.Sprintf("Connected to node at: %s…", ip))
+    conn, err := grpc.NewClient(ip,
+        grpc.WithTransportCredentials(insecure.NewCredentials()),
+        grpc.WithDefaultCallOptions(
+            grpc.MaxCallRecvMsgSize(math.MaxInt),
+            grpc.MaxCallSendMsgSize(math.MaxInt),
+        ),
+    )
+    if err != nil {
+        return fmt.Errorf("[%s] failed to connect: %w", ip, err)
+    }
+    defer conn.Close()
+    println(fmt.Sprintf("Connected to node at: %s…", ip))

-        client := protobuf.NewCommandStreamClient(conn)
+    client := protobuf.NewCommandStreamClient(conn)
+    stream, err := client.ListenCommands(ctx)
+    if err != nil {
+        return fmt.Errorf("[%s] ListenCommands: %w", ip, err)
+    }
+    defer stream.CloseSend()

-        stream, err := client.ListenCommands(ctx)
-
-        if err != nil {
-            log.Fatalf("ListenCommands: %v", err)
-        }
+    for i := 0; i < *count; i++ {
+        select {
+        case <-ctx.Done():
+            log.Printf("[%s] context canceled, stopping publishes", ip)
+            return ctx.Err()
+        default:
+        }

         start := time.Now()
         var data []byte
 @@
-        conn.Close()
     }
     return nil
 }
```

Also replace `log.Fatalf` with returned errors (lines 136, 146) so a single node failure doesn't kill the entire process. Based on learnings
182-186: Seed Poisson RNG once in main for non-deterministic delays.
`mathrand.ExpFloat64()` uses the default seed (1), producing identical Poisson intervals across runs. Seed the RNG once in `main` after `flag.Parse()`:

```diff
 func main() {
     flag.Parse()
+    mathrand.Seed(time.Now().UnixNano())
     if *topic == "" {
         log.Fatalf("−topic is required")
     }
```
225-243: Remove unused `handleResponse` function and `sync/atomic` import.

`handleResponse` is never invoked—`sendMessages` only sends messages and never calls `stream.Recv()`. Remove this function and the `sync/atomic` import to eliminate dead code.
252-280: Return errors instead of calling `log.Fatal` for output file failures.

`log.Fatal` (line 255) terminates the entire process if the output file cannot be created. Return an error instead so the caller can decide:

```diff
-func writeHashToFile(dataCh <-chan string, done chan<- bool, filename string, header string) {
+func writeHashToFile(dataCh <-chan string, done chan<- bool, filename string, header string) error {
     file, err := os.Create(filename)
     if err != nil {
-        log.Fatal(err)
+        return fmt.Errorf("failed to create output file: %w", err)
     }
     defer file.Close()
 @@
     done <- true
     fmt.Println("All data flushed to disk")
+    return nil
 }
```

Then in the writer startup (line 97):

```diff
-        go writeHashToFile(dataCh, done, *output, header)
+        go func() {
+            if err := writeHashToFile(dataCh, done, *output, header); err != nil {
+                log.Printf("Hash writer error: %v", err)
+            }
+        }()
```

grpc_p2p_client/p2p_client_multi_streams_subscribe.go (9)
88-124: CRITICAL: Deadlock on shutdown when output files are not specified.

`dataDone` and `traceDone` are always created (lines 91-92) and always awaited (lines 123-124), but writers only run when `*outputData != ""` or `*outputTrace != ""`. When either flag is omitted, no goroutine sends on the corresponding done channel, causing the program to block forever.

Additionally, the nested goroutine pattern (lines 96-101, 104-109) adds complexity: `wg.Add(1)` and `defer wg.Done()` wrap a call to `go writeToFile`, but the WaitGroup doesn't track the actual writer.

Fix by conditionally creating done channels and starting writers directly:

```diff
     // Buffered channel to prevent blocking
     dataCh := make(chan string, 100)
     traceCh := make(chan string, 100)
-    dataDone := make(chan bool)
-    traceDone := make(chan bool)
+    var dataDone chan bool
+    var traceDone chan bool

-    // Launch goroutines with synchronization
     var wg sync.WaitGroup
     if *outputData != "" {
-        go func() {
-            defer wg.Done()
-            header := fmt.Sprintf("receiver\tsender\tsize\tsha256(msg)")
-            go writeToFile(ctx, dataCh, dataDone, *outputData, header)
-        }()
+        dataDone = make(chan bool)
+        header := fmt.Sprintf("receiver\tsender\tsize\tsha256(msg)")
+        go writeToFile(ctx, dataCh, dataDone, *outputData, header)
     }

     if *outputTrace != "" {
-        go func() {
-            defer wg.Done()
-            header := "" //fmt.Sprintf("sender\tsize\tsha256(msg)")
-            go writeToFile(ctx, traceCh, traceDone, *outputTrace, header)
-        }()
+        traceDone = make(chan bool)
+        header := ""
+        go writeToFile(ctx, traceCh, traceDone, *outputTrace, header)
     }

     for _, ip := range ips {
         wg.Add(1);
 @@
     wg.Wait()
     close(dataCh)
     close(traceCh)
-    <-dataDone
-    <-traceDone
+    if dataDone != nil {
+        <-dataDone
+    }
+    if traceDone != nil {
+        <-traceDone
+    }
```

Also ensure `writeToFile` always signals `done` (see separate comment). Based on learnings
173-186: Fix Recv goroutine leak by respecting context when sending to msgChan.

The Recv goroutine (lines 173-186) writes to `msgChan` without checking `ctx`. If the context is canceled and the buffer fills, the goroutine blocks forever on `msgChan <- resp`, leaking resources.

Add context awareness:

```diff
     go func() {
         for {
             resp, err := stream.Recv()
             if err == io.EOF {
                 close(msgChan)
                 return
             }
             if err != nil {
-                log.Printf("recv error: %v", err)
+                log.Printf("[%s] recv error: %v", ip, err)
                 close(msgChan)
                 return
             }
-            msgChan <- resp
+            select {
+            case msgChan <- resp:
+            case <-ctx.Done():
+                log.Printf("[%s] context canceled while delivering message", ip)
+                close(msgChan)
+                return
+            }
         }
     }()
```

Based on learnings
200-202: Avoid spawning a goroutine per message—handle responses inline.

Every incoming message spawns a new goroutine (lines 200-202). Under high message rates, this creates excessive goroutines and scheduling overhead. Process responses inline unless there's a clear need for concurrency:

```diff
             if !ok {
                 log.Printf("Stream closed. Total messages received: %d", atomic.LoadInt32(&receivedCount))
                 return nil
             }
-            go func(resp *protobuf.Response) {
-                handleResponse(ip, resp, &receivedCount, writeData, dataCh, writeTrace, traceCh)
-            }(resp)
+            handleResponse(ip, resp, &receivedCount, writeData, dataCh, writeTrace, traceCh)
```

Based on learnings
146-166: Replace `log.Fatalf` with error returns to prevent process termination on single-node failures.

Lines 146, 155, and 165 call `log.Fatalf`, which kills the entire process when one IP fails to connect. Return errors instead so other workers can continue:

```diff
     if err != nil {
-        log.Fatalf("failed to connect to node %v", err)
+        return fmt.Errorf("[%s] failed to connect: %w", ip, err)
     }
 @@
     if err != nil {
-        log.Fatalf("ListenCommands: %v", err)
+        return fmt.Errorf("[%s] ListenCommands: %w", ip, err)
     }
 @@
     if err := stream.Send(subReq); err != nil {
-        log.Fatalf("send subscribe: %v", err)
+        return fmt.Errorf("[%s] send subscribe: %w", ip, err)
     }
```

Then log the returned error from `receiveMessages` in the caller (line 116). Based on learnings
302-317: Guard against nil pointer dereference when accessing Topic fields.

Lines 308 and 315 dereference `evt.DeliverMessage.Topic` and `evt.PublishMessage.Topic` without checking for nil. Valid trace events may omit topics, causing a panic.

Add nil checks:

```diff
     msgID := ""
     topic := ""
     if evt.DeliverMessage != nil {
         rawBytes = []byte(evt.DeliverMessage.MessageID)
         msgID = base58.Encode(rawBytes)
-        topic = string(*evt.DeliverMessage.Topic)
+        if evt.DeliverMessage.Topic != nil {
+            topic = string(*evt.DeliverMessage.Topic)
+        }
     }
     if evt.PublishMessage != nil {
         rawBytes = []byte(evt.PublishMessage.MessageID)
         msgID = base58.Encode(rawBytes)
-        topic = string(*evt.PublishMessage.Topic)
+        if evt.PublishMessage.Topic != nil {
+            topic = string(*evt.PublishMessage.Topic)
+        }
     }
```
333-333: Fix broken stdout trace formatting—use `fmt.Printf` instead of `fmt.Print`.

Line 333 uses `fmt.Print` with a format string and arguments, which doesn't apply formatting verbs. The output will literally print `%s\t%s...` followed by unformatted arguments.

Change to `fmt.Printf`:

```diff
     } else {
-        fmt.Print("%s\t%s\t%s\t%s\t%s\t%d\n", typeStr, peerID, recvID, msgID, topic, timestamp)
+        fmt.Printf("%s\t%s\t%s\t%s\t%s\t%d\n", typeStr, peerID, recvID, msgID, topic, timestamp)
     }
```
391-406: Guard against nil pointer dereference when accessing Topic fields.

Lines 397 and 404 dereference topic pointers without nil checks. Add the same guards as in `handleGossipSubTrace`:

```diff
     msgID := ""
     topic := ""
     if evt.DeliverMessage != nil {
         rawBytes = []byte(evt.DeliverMessage.MessageID)
         msgID = base58.Encode(rawBytes)
-        topic = string(*evt.DeliverMessage.Topic)
+        if evt.DeliverMessage.Topic != nil {
+            topic = string(*evt.DeliverMessage.Topic)
+        }
     }
     if evt.PublishMessage != nil {
         rawBytes = []byte(evt.PublishMessage.MessageID)
         msgID = base58.Encode(rawBytes)
-        topic = string(*evt.PublishMessage.Topic)
+        if evt.PublishMessage.Topic != nil {
+            topic = string(*evt.PublishMessage.Topic)
+        }
     }
```
430-430: Fix broken stdout trace formatting—use `fmt.Printf` instead of `fmt.Print`.

Line 430 incorrectly uses `fmt.Print` with a format string. Change to `fmt.Printf`:

```diff
     } else {
-        fmt.Print("%s\t%s\t%s\t%s\t%s\t%d\n", typeStr, peerID, recvID, msgID, topic, timestamp)
+        fmt.Printf("%s\t%s\t%s\t%s\t%s\t%d\n", typeStr, peerID, recvID, msgID, topic, timestamp)
     }
```
443-477: CRITICAL: Ensure `writeToFile` always signals `done` channel to prevent deadlock.

Lines 463-465 return early on context cancellation without signaling the `done` channel. This causes the caller (`main`) to block forever on `<-dataDone` or `<-traceDone` (lines 123-124).

Use `defer` to guarantee `done` is signaled exactly once:

```diff
 func writeToFile(ctx context.Context, dataCh <-chan string, done chan<- bool, filename string, header string) {
     file, err := os.Create(filename)
     if err != nil {
         log.Fatal(err)
     }
     defer file.Close()

     writer := bufio.NewWriter(file)
     defer writer.Flush()
+    defer func() {
+        done <- true
+        fmt.Println("All data flushed to disk")
+    }()

     // write the header
     if header != "" {
         _, err := writer.WriteString(header + "\n")
         if err != nil {
             log.Printf("Write error: %v", err)
         }
     }

-    // Process until channel is closed
-    for data := range dataCh {
-        select {
-        case <-ctx.Done():
-            return
-        default:
-
-        }
-        _, err := writer.WriteString(data + "\n")
-        writer.Flush()
-        if err != nil {
-            log.Printf("Write error: %v", err)
-        }
-    }
-    done <- true
-    fmt.Println("All data flushed to disk")
+    for {
+        select {
+        case <-ctx.Done():
+            return
+        case data, ok := <-dataCh:
+            if !ok {
+                return
+            }
+            if _, err := writer.WriteString(data + "\n"); err != nil {
+                log.Printf("Write error: %v", err)
+            }
+        }
+    }
 }
```

This also removes redundant per-line `Flush()` calls. Based on learnings
📜 Review details
Configuration used: Repository: getoptimum/coderabbit/.coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (2)
- `grpc_p2p_client/p2p_client_multi_streams_publish.go` (1 hunks)
- `grpc_p2p_client/p2p_client_multi_streams_subscribe.go` (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.go
⚙️ CodeRabbit configuration file
Review Go code for: idiomatic patterns, error handling (check all errors), concurrency safety (context propagation, goroutine cleanup), test coverage for changed logic, exported API stability, memory efficiency (avoid unnecessary allocations), and prefer standard library over third-party when reasonable.
Files:
- `grpc_p2p_client/p2p_client_multi_streams_publish.go`
- `grpc_p2p_client/p2p_client_multi_streams_subscribe.go`
🧠 Learnings (2)
📚 Learning: 2025-10-23T18:30:48.214Z
Learnt from: CR
Repo: getoptimum/coderabbit PR: 0
File: coderabbit-custom-pre-merge-checks-unique-id-file-non-traceable-F7F2B60C-1728-4C9A-8889-4F2235E186CA.txt:0-0
Timestamp: 2025-10-23T18:30:48.214Z
Learning: Applies to **/*.go : Concurrency safety in Go: avoid goroutine leaks, unbounded channels, racy access; ensure contexts are plumbed and respected
Applied to files:
- `grpc_p2p_client/p2p_client_multi_streams_publish.go`
- `grpc_p2p_client/p2p_client_multi_streams_subscribe.go`
📚 Learning: 2025-10-23T18:30:48.214Z
Learnt from: CR
Repo: getoptimum/coderabbit PR: 0
File: coderabbit-custom-pre-merge-checks-unique-id-file-non-traceable-F7F2B60C-1728-4C9A-8889-4F2235E186CA.txt:0-0
Timestamp: 2025-10-23T18:30:48.214Z
Learning: Applies to **/*.go : Security considerations in Go: flag SQL/command injection, path traversal, unvalidated inputs, hardcoded credentials, insecure crypto (MD5/SHA1), and missing rate limiting on public endpoints
Applied to files:
- `grpc_p2p_client/p2p_client_multi_streams_publish.go`
- `grpc_p2p_client/p2p_client_multi_streams_subscribe.go`
🧬 Code graph analysis (1)
grpc_p2p_client/p2p_client_multi_streams_subscribe.go (2)
grpc_p2p_client/p2p_client_multi_streams_publish.go (3)
`Command` (37-37), `CommandSubscribeToTopic` (42-42), `P2PMessage` (29-34)

grpc_p2p_client/p2p_client.go (3)

`Command` (34-34), `CommandSubscribeToTopic` (39-39), `P2PMessage` (26-31)
@paiva, this creates a problem, and the PR is wrongly approved. @swarna1101 can you please isolate the multiple `main` functions?
@hpsing Can you provide more context on the problem it creates? The PR was tested for Publishing and Subscribing
@paiva The P2P client is a single Go project; it can't have multiple main funcs, only one entry point. Either use one script and extend the flags with the functions needed (see the sketch below), or create a new project. https://github.com/getoptimum/optimum-dev-setup-guide/blob/main/grpc_p2p_client/p2p_client.go#L55
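For context, the single-entry-point option described above could look roughly like this; the `-mode` flag and the `run*` function names are placeholders, not code from this PR:

```go
// Hypothetical single main: one binary, one entry point, dispatching on a flag.
package main

import (
	"flag"
	"log"
)

func main() {
	mode := flag.String("mode", "subscribe", "publish or subscribe")
	flag.Parse()

	switch *mode {
	case "publish":
		runPublisher() // placeholder: the multi-stream publish logic
	case "subscribe":
		runSubscriber() // placeholder: the multi-stream subscribe logic
	default:
		log.Fatalf("unknown -mode %q", *mode)
	}
}

// Stubs so the sketch compiles; the real bodies would hold the CLI logic.
func runPublisher()  { log.Println("publisher mode (stub)") }
func runSubscriber() { log.Println("subscriber mode (stub)") }
```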
Sounds good, we will keep a different branch or a fork of the repository for experiments and QA, we won't merge unless it is reviewed by Swarna |
Summary by CodeRabbit: New Features, Refactor, Tests, Chore