Skip to content

Conversation

@swarna1101
Copy link
Collaborator

@swarna1101 swarna1101 commented Nov 27, 2025

  • Restructured into cmd/ subdirectories with separate binaries:

    • cmd/single/ - Single node publisher/subscriber
    • cmd/multi-publish/ - Multi-node concurrent publisher
    • cmd/multi-subscribe/ - Multi-node concurrent subscriber
  • Extracted shared code into shared/ package:

    • shared/types.go - Common types and constants
    • shared/utils.go - Shared utilities and trace handlers

Summary by CodeRabbit

  • Documentation

    • Updated version references to rc16 across guides and examples.
  • New Features

    • Added standalone single-node client plus multi-node publish and multi-node subscribe tools with flag-driven modes, optional data/trace logging, and graceful shutdown.
  • Refactor

    • Centralized IP, response/tracing handling, hashing, and file-output utilities into a shared module used by the clients.
  • Chores

    • CI/build updated to produce multiple P2P binaries and removed the legacy monolithic client.

✏️ Tip: You can customize this high-level summary in your review settings.

@swarna1101 swarna1101 requested a review from hpsing November 27, 2025 07:05
@coderabbitai
Copy link

coderabbitai bot commented Nov 27, 2025

Walkthrough

This PR reorganizes the grpc_p2p_client into a modular layout: it removes the previous monolithic clients (grpc_p2p_client/p2p_client.go and p2p_client_multi_streams_subscribe.go), adds a new shared package (grpc_p2p_client/shared/{types.go,utils.go}) that centralizes message/command types and utilities, and introduces three CLI binaries under grpc_p2p_client/cmd/ (single, multi-publish, multi-subscribe). The Makefile and CI were updated to build multiple binaries. Documentation version strings were bumped from rc13 to rc16 and a docs code path for trace handling was adjusted.

Sequence Diagram(s)

sequenceDiagram
    autonumber
    participant CLI as cmd/single
    participant gRPC as gRPC Server
    participant Shared as shared package
    participant File as File Output
    CLI->>CLI: parse flags, validate
    CLI->>gRPC: open connection & ListenCommands
    alt subscribe
        CLI->>gRPC: Send SubscribeToTopic
        gRPC->>CLI: stream Response
        CLI->>Shared: HandleResponse(resp)
        Shared-->>File: optional data/trace via WriteToFile
    else publish
        CLI->>CLI: generate payload(s)
        loop per message
            CLI->>gRPC: Send PublishData
            CLI->>CLI: log latency
        end
    end
    CLI->>CLI: on signal -> cancel -> close
Loading
sequenceDiagram
    autonumber
    participant Main as cmd/multi-subscribe
    participant Shared as shared package
    participant Worker as per-IP worker
    participant gRPC as gRPC Server
    participant Data as Data Channel
    participant Trace as Trace Channel
    participant File as File I/O
    Main->>Shared: ReadIPsFromFile(ipFile)
    Shared-->>Main: []IPs
    Main->>Main: slice IPs, start workers
    par per IP
        Main->>Worker: start goroutine
        Worker->>gRPC: connect & ListenCommands
        Worker->>gRPC: Send SubscribeToTopic
        gRPC->>Worker: stream Response
        Worker->>Shared: HandleResponseWithTracking(ip, resp, ...)
        Shared-->>Data: enqueue data (optional)
        Shared-->>Trace: enqueue trace (optional)
    end
    par file writers
        File->>Data: drain -> write file
        File->>Trace: drain -> write file
    end
    Main->>Main: on cancel -> stop workers -> wait -> exit
Loading
sequenceDiagram
    autonumber
    participant Client as cmd/multi-publish
    participant Shared as shared package
    participant IPs as IP file
    participant gRPC as gRPC Server
    participant Output as Output writer
    Client->>Shared: ReadIPsFromFile(ipFile)
    Shared-->>Client: []IPs
    loop per IP (concurrent)
        Client->>gRPC: connect & ListenCommands
        Client->>Client: prepare payload(s)
        Client->>gRPC: Send PublishData
        Client->>Shared: send hashes/records to WriteToFile (via channel)
    end
    Client->>Client: on signal -> cancel -> wait -> exit
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

  • Review concurrency and cancellation across cmd/* (goroutines, channels, context cancellation, WaitGroup).
  • Verify correctness of shared.HandleResponseWithTracking and trace decoders against protobuf message schemas.
  • Inspect ReadIPsFromFile and WriteToFile for edge cases (empty/commented lines, file errors, cancellation).
  • Validate Makefile and CI changes produce the three binaries and that build targets point to correct cmd/* paths.

Possibly related PRs

  • Kmk grpc add example #53 — Modifies the same grpc_p2p_client area (P2PMessage/Command migration and subscribe/publish/trace handling), likely a direct code-level match.

Suggested reviewers

  • paiva

Pre-merge checks and finishing touches

❌ Failed checks (5 warnings)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
Go Build And Test Rationale ⚠️ Warning PR introduces 1000+ lines of Go code across shared utilities and three command packages with zero test coverage for critical protobuf parsing, file I/O, concurrency, and gRPC logic. Add unit tests for shared/utils.go (protobuf unmarshaling, file I/O, SHA-256, channels) and integration tests for cmd packages validating gRPC flows and concurrent worker behavior.
Concurrency Safety ⚠️ Warning Pull request contains critical concurrency safety violations including goroutine leaks, unbounded spawning, and race conditions on closed channels causing panics. Remove nested goroutine wrappers, track handlers with WaitGroup, implement bounded goroutine pools, add defer statements for cleanup, and use context.WithTimeout instead of WithCancel.
Public Api Changes ⚠️ Warning PR introduces exported types and functions in new shared/ package without documenting these public API additions in CHANGELOG or migration guide. Add CHANGELOG entry documenting the new shared/ package exports, update readme.md with public API documentation including usage examples, and add comprehensive godoc comments to all exported types and functions.
Security Considerations ⚠️ Warning PR introduces three MEDIUM-severity security vulnerabilities: path traversal in file operations, missing index bounds validation, and insecure gRPC credentials without TLS. Implement path validation with filepath.Clean/Abs, add index bounds checking with clamping, and add configurable TLS support for gRPC connections.
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately describes the main objective of the PR: refactoring grpc_p2p_client to eliminate multiple main() conflicts by restructuring the codebase into separate binaries and shared utilities.
Rust Best Practices ✅ Passed The Rust best practices check is not applicable to this PR, which contains only Go, YAML, and configuration files with no Rust source code.
✨ Finishing touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch fix/refactor-grpc-client-multiple-main

📜 Recent review details

Configuration used: Repository: getoptimum/coderabbit/.coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e44f43e and a1679f4.

📒 Files selected for processing (1)
  • .github/workflows/ci.yml (0 hunks)
💤 Files with no reviewable changes (1)
  • .github/workflows/ci.yml
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Test Docker Setup

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai bot requested a review from paiva November 27, 2025 07:06
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 12

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
docs/guide.md (1)

13-14: ⚠️ Documentation and build configuration both reference removed binary—refactor incomplete.

The PR moved p2p_client.go to grpc_p2p_client/cmd/single/main.go to resolve a multiple main() conflict. However, both the Makefile and documentation still reference the removed file.

Issues:

  1. Makefile broken: Build target tries to compile non-existent ./p2p_client.go

    # Line in Makefile - will fail:
    @cd $(P2P_CLIENT_DIR) && go build -o p2p-client ./p2p_client.go

    Should be: @cd $(P2P_CLIENT_DIR) && go build -o p2p-client ./cmd/single

  2. Documentation outdated: All 15 references to ./grpc_p2p_client/p2p-client (lines 13–14, 220, 223, 226, 229, 905, 913, 965, 973, 996, 999, 1011, 1112, 1297) assume the old unified binary path.

The refactored code supports the documented -mode flag (subscribe/publish), so building cmd/single/main.go preserves the expected interface. Update both Makefile and docs to reference the new build location.

grpc_p2p_client/cmd/multi-publish/main.go (3)

77-84: Discarded error return from sendMessages.

The goroutine ignores the error returned by sendMessages. At minimum, log errors so failures are visible.

 	for _, ip := range ips {
 		wg.Add(1)
 		go func(ip string) {
 			defer wg.Done()
 			datasize := *dataSize
-			sendMessages(ctx, ip, datasize, *output != "", dataCh)
+			if err := sendMessages(ctx, ip, datasize, *output != "", dataCh); err != nil {
+				log.Printf("[%s] sendMessages failed: %v", ip, err)
+			}
 		}(ip)
 	}

101-118: log.Fatalf inside loop kills entire process on transient errors; connection-per-message is inefficient.

  1. Using log.Fatalf (lines 109, 117) inside a loop within a goroutine will terminate the entire program on any single node failure. Return errors instead.
  2. Creating a new gRPC connection per message is expensive. Move connection establishment outside the loop and reuse.
 func sendMessages(ctx context.Context, ip string, datasize int, write bool, dataCh chan<- string) error {
+	conn, err := grpc.NewClient(ip,
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+		grpc.WithDefaultCallOptions(
+			grpc.MaxCallRecvMsgSize(math.MaxInt),
+			grpc.MaxCallSendMsgSize(math.MaxInt),
+		),
+	)
+	if err != nil {
+		return fmt.Errorf("[%s] failed to connect: %w", ip, err)
+	}
+	defer conn.Close()
+
+	client := protobuf.NewCommandStreamClient(conn)
+	stream, err := client.ListenCommands(ctx)
+	if err != nil {
+		return fmt.Errorf("[%s] ListenCommands: %w", ip, err)
+	}
+	log.Printf("Connected to node at: %s…", ip)
+
 	for i := 0; i < *count; i++ {
 		select {
 		case <-ctx.Done():
 			log.Printf("[%s] context canceled, stopping", ip)
 			return ctx.Err()
 		default:
 		}
-
-		conn, err := grpc.NewClient(ip,
-			grpc.WithTransportCredentials(insecure.NewCredentials()),
-			grpc.WithDefaultCallOptions(
-				grpc.MaxCallRecvMsgSize(math.MaxInt),
-				grpc.MaxCallSendMsgSize(math.MaxInt),
-			),
-		)
-		if err != nil {
-			log.Fatalf("failed to connect to node %v", err)
-		}
-		println(fmt.Sprintf("Connected to node at: %s…", ip))
-
-		client := protobuf.NewCommandStreamClient(conn)
-		stream, err := client.ListenCommands(ctx)
-
-		if err != nil {
-			log.Fatalf("ListenCommands: %v", err)
-		}
         // ... rest of publish logic
-		conn.Close()
 	}
 	return nil
 }

44-51: Missing validation for empty IP file.

If *ipfile is empty or the file contains no valid IPs, ips will be an empty slice and no work happens silently. Consider validating.

 	_ips, err := shared.ReadIPsFromFile(*ipfile)
 	if err != nil {
 		fmt.Printf("Error: %v\n", err)
 		return
 	}
+	if len(_ips) == 0 {
+		log.Fatalf("no IPs found in %s", *ipfile)
+	}
📜 Review details

Configuration used: Repository: getoptimum/coderabbit/.coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 542483f and df14d26.

⛔ Files ignored due to path filters (2)
  • .gitignore is excluded by !.gitignore and included by **.gitignore
  • grpc_p2p_client/go.sum is excluded by !**/*.sum and included by none
📒 Files selected for processing (8)
  • docs/guide.md (3 hunks)
  • grpc_p2p_client/cmd/multi-publish/main.go (5 hunks)
  • grpc_p2p_client/cmd/multi-subscribe/main.go (1 hunks)
  • grpc_p2p_client/cmd/single/main.go (1 hunks)
  • grpc_p2p_client/p2p_client.go (0 hunks)
  • grpc_p2p_client/p2p_client_multi_streams_subscribe.go (0 hunks)
  • grpc_p2p_client/shared/types.go (1 hunks)
  • grpc_p2p_client/shared/utils.go (1 hunks)
💤 Files with no reviewable changes (2)
  • grpc_p2p_client/p2p_client.go
  • grpc_p2p_client/p2p_client_multi_streams_subscribe.go
🧰 Additional context used
📓 Path-based instructions (3)
**/*

⚙️ CodeRabbit configuration file

**/*: When you find the same underlying issue in multiple locations (same pattern, same fix):

  • Do NOT post separate comments for each occurrence.
  • Post a single primary comment on the first occurrence.
  • In that comment, include a short list of “Also at: file:line” references
    for the other locations (e.g. 'Also at: foo.go:42, bar.go:17').
    Prioritize signal over volume: one strong comment that references multiple
    locations is preferred over many near-identical comments.

Files:

  • docs/guide.md
  • grpc_p2p_client/shared/types.go
  • grpc_p2p_client/cmd/multi-subscribe/main.go
  • grpc_p2p_client/cmd/multi-publish/main.go
  • grpc_p2p_client/cmd/single/main.go
  • grpc_p2p_client/shared/utils.go
**/*.md

⚙️ CodeRabbit configuration file

**/*.md: - Technical accuracy first; keep commands copy-pastable; prefer minimal prerequisites.

Files:

  • docs/guide.md
**/*.go

⚙️ CodeRabbit configuration file

Review Go code for: idiomatic patterns, error handling (check all errors), concurrency safety (context propagation, goroutine cleanup), test coverage for changed logic, exported API stability, memory efficiency (avoid unnecessary allocations), and prefer standard library over third-party when reasonable.

Files:

  • grpc_p2p_client/shared/types.go
  • grpc_p2p_client/cmd/multi-subscribe/main.go
  • grpc_p2p_client/cmd/multi-publish/main.go
  • grpc_p2p_client/cmd/single/main.go
  • grpc_p2p_client/shared/utils.go
🧬 Code graph analysis (2)
grpc_p2p_client/cmd/multi-publish/main.go (2)
grpc_p2p_client/shared/utils.go (2)
  • ReadIPsFromFile (24-47)
  • WriteToFile (213-244)
grpc_p2p_client/shared/types.go (2)
  • Command (12-12)
  • CommandPublishData (16-16)
grpc_p2p_client/shared/utils.go (1)
grpc_p2p_client/shared/types.go (1)
  • P2PMessage (4-9)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Test Docker Setup
🔇 Additional comments (5)
grpc_p2p_client/shared/types.go (2)

3-9: Add JSON struct tags for reliable unmarshalling.

P2PMessage is unmarshalled from JSON in HandleResponse and HandleResponseWithTracking (see shared/utils.go lines 60, 86). Without explicit JSON tags, field mapping depends on exact casing match. If the incoming JSON uses different casing (e.g., messageId, source_node_id), unmarshalling will silently fail to populate fields.

 type P2PMessage struct {
-	MessageID    string
-	Topic        string
-	Message      []byte
-	SourceNodeID string
+	MessageID    string `json:"messageId,omitempty"`
+	Topic        string `json:"topic,omitempty"`
+	Message      []byte `json:"message,omitempty"`
+	SourceNodeID string `json:"sourceNodeId,omitempty"`
 }

Verify the expected JSON field names from the protobuf/sidecar response and adjust tags accordingly.


11-19: LGTM — Clean enum definition.

The Command type with iota constants is idiomatic Go. The explicit CommandUnknown = iota as zero value is a good defensive default.

grpc_p2p_client/cmd/single/main.go (1)

34-79: Overall structure is sound.

The single-node CLI design with subscribe/publish modes, signal handling, and stream management is clean. The use of shared utilities is appropriate.

grpc_p2p_client/shared/utils.go (2)

24-47: ReadIPsFromFile is well-implemented.

Proper error wrapping, comment/empty-line filtering, and scanner error checking. Clean idiomatic Go.


49-54: HeadHex is correct and concise.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
Makefile (1)

137-137: Update clean target to remove all three P2P client binaries.

The clean target only removes $(P2P_CLIENT) but three binaries are now created. The p2p-multi-publish and p2p-multi-subscribe binaries will persist after make clean.

Apply this diff to clean all binaries:

clean: ## Clean build artifacts
	@echo "Cleaning build artifacts..."
-	@rm -f $(P2P_CLIENT) $(PROXY_CLIENT) $(KEYGEN_BINARY)
+	@rm -f $(P2P_CLIENT_DIR)/p2p-client $(P2P_CLIENT_DIR)/p2p-multi-publish $(P2P_CLIENT_DIR)/p2p-multi-subscribe $(PROXY_CLIENT) $(KEYGEN_BINARY)
	@echo "Clean complete!"

Alternatively, if you adopt Option 1 from the previous comment, define variables and reference them:

clean: ## Clean build artifacts
	@echo "Cleaning build artifacts..."
	@rm -f $(P2P_SINGLE) $(P2P_MULTI_PUBLISH) $(P2P_MULTI_SUBSCRIBE) $(PROXY_CLIENT) $(KEYGEN_BINARY)
	@echo "Clean complete!"
docs/guide.md (2)

187-230: Add documentation explaining the three new binary modes and their use cases.

The refactoring introduces three separate binaries under cmd/ (single, multi-publish, multi-subscribe), but the "Build and Development Commands" and "Direct Binary Usage" sections don't explain:

  • When to use each mode: Single-node vs. multi-node scenarios
  • How they differ operationally: Are they mutually exclusive or complementary?
  • How the Makefile abstracts binary selection: Users may not realize which binary is invoked by make subscribe vs. make publish

Add a new subsection under "Build and Development Commands" to clarify:

  1. Which binary mode each Makefile target uses
  2. When to invoke binaries directly vs. using Makefile targets
  3. Example scenarios for each mode

1158-1195: Documentation code snippets are completely inaccurate and do not match the actual implementation in grpc_p2p_client/shared/utils.go.

The documented functions have critical differences from the actual implementation:

  1. Function names: Documentation shows lowercase handleGossipSubTrace and handleOptimumP2PTrace, but actual functions are capitalized (HandleGossipSubTrace, HandleOptimumP2PTrace) following Go export conventions.

  2. Function signatures:

    • Documented: func handleGossipSubTrace(data []byte)
    • Actual: func HandleGossipSubTrace(data []byte, writeTrace bool, traceCh chan<- string)
  3. Implementation logic is fundamentally different:

    • Actual code extracts structured fields (peerID, recvID, msgID, topic, timestamp) using base58 encoding
    • Actual code outputs tab-separated formatted strings to channels or printf
    • Documented code shows fictional JSON marshaling that doesn't exist in actual implementation
    • Documented code shows switch statements on shard types (NEW_SHARD, DUPLICATE_SHARD) that don't exist in actual code
  4. Error handling: Actual implementation calls HeadHex(data, 64) in error messages; documented version doesn't.

Replace the code snippets with accurate implementations that match the actual shared/utils.go file.

📜 Review details

Configuration used: Repository: getoptimum/coderabbit/.coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between df14d26 and 3b65404.

📒 Files selected for processing (4)
  • .github/workflows/ci.yml (1 hunks)
  • Makefile (1 hunks)
  • docs/guide.md (4 hunks)
  • readme.md (1 hunks)
🧰 Additional context used
📓 Path-based instructions (4)
**/*

⚙️ CodeRabbit configuration file

**/*: When you find the same underlying issue in multiple locations (same pattern, same fix):

  • Do NOT post separate comments for each occurrence.
  • Post a single primary comment on the first occurrence.
  • In that comment, include a short list of “Also at: file:line” references
    for the other locations (e.g. 'Also at: foo.go:42, bar.go:17').
    Prioritize signal over volume: one strong comment that references multiple
    locations is preferred over many near-identical comments.

Files:

  • readme.md
  • docs/guide.md
  • Makefile
**/*.md

⚙️ CodeRabbit configuration file

**/*.md: - Technical accuracy first; keep commands copy-pastable; prefer minimal prerequisites.

Files:

  • readme.md
  • docs/guide.md
.github/workflows/**

⚙️ CodeRabbit configuration file

Check permissions, concurrency, timeout-minutes, use actions/checkout@v4 with fetch-depth: 0 for tools that need full history; prefer pinned SHAs for third-party actions; validate matrix bounds and secrets exposure.

Files:

  • .github/workflows/ci.yml
Makefile*

⚙️ CodeRabbit configuration file

  • Provide phony targets; stable reproducible builds; avoid environment-specific paths.

Files:

  • Makefile
🔇 Additional comments (5)
readme.md (1)

133-139: Documentation structure is accurate and well-organized.

The updated repository structure documentation clearly reflects the new cmd/ subdirectory layout with proper hierarchy and descriptions. Examples throughout the document correctly reference the binary paths and command-line flags.

.github/workflows/ci.yml (2)

35-41: CI build configuration correctly handles three separate binaries.

The updated build step properly sequences compilation of all three P2P clients with appropriate directory navigation and success confirmation. Workflow permissions are correctly scoped and action versions are current.


15-21: Permissions configuration is appropriate and well-documented.

Each permission is justified by inline comments explaining why it's needed (read for analysis, write for annotations). This aligns with security best practice of least privilege.

docs/guide.md (2)

271-272: Version updates applied consistently across documentation.

The rc13 → rc16 version bump is reflected uniformly in the .env example, Proxy API responses, and Node API responses. ✓

Also applies to: 575-575, 1084-1084


13-14: I'll help you verify this review comment about binary path inconsistencies. Let me start by exploring the repository structure and build configuration to understand the actual state of binary paths.
<function_calls>

#!/bin/bash

Explore repository structure to understand binary organization

echo "=== Checking for cmd/ directory structure ==="
find . -type d -name "cmd" -o -name "grpc_p2p_client" 2>/dev/null | head -20

echo -e "\n=== Listing root-level directories ==="
ls -la | grep "^d" | head -20

echo -e "\n=== Looking for Makefile or build files ==="
find . -maxdepth 2 -type f ( -name "Makefile" -o -name "*.mk" -o -name "go.mod" ) 2>/dev/null


</function_calls>

<function_calls>

#!/bin/bash

Check if cmd/ directory exists with subdirectories

echo "=== Checking cmd/ structure ==="
if [ -d "cmd" ]; then
find cmd -type d | head -20
echo -e "\n=== Files in cmd/ ==="
find cmd -type f -name "*.go" -o -name "main.go" | head -20
fi

echo -e "\n=== Checking grpc_p2p_client/ ==="
if [ -d "grpc_p2p_client" ]; then
ls -la grpc_p2p_client/
fi


</function_calls>

<function_calls>

#!/bin/bash

Get repository structure overview

echo "=== Top-level repository structure ==="
ls -la

echo -e "\n=== Check docs/guide.md existence and size ==="
if [ -f "docs/guide.md" ]; then
wc -l docs/guide.md
echo -e "\n=== Lines 13-14 from docs/guide.md ==="
sed -n '13,14p' docs/guide.md
fi


</function_calls>

<function_calls>

#!/bin/bash

Search for all occurrences of grpc_p2p_client in documentation

echo "=== Searching for grpc_p2p_client references in docs/guide.md ==="
if [ -f "docs/guide.md" ]; then
rg -n "grpc_p2p_client" docs/guide.md
fi

echo -e "\n=== Checking all binary path patterns in guide.md ==="
if [ -f "docs/guide.md" ]; then
rg -n "./.*p2p-client" docs/guide.md | head -30
fi


</function_calls>

<function_calls>

#!/bin/bash

Check for build configuration and binary output

echo "=== Looking for Makefile content ==="
if [ -f "Makefile" ]; then
cat Makefile | head -100
fi

echo -e "\n=== Looking for build.sh or similar ==="
find . -maxdepth 2 -type f -name "build" 2>/dev/null


</function_calls>

<function_calls>

#!/bin/bash

Check git info and PR-related files

echo "=== Git log summary (last 5 commits) ==="
git log --oneline -5 2>/dev/null || echo "Git not available"

echo -e "\n=== Checking current branch ==="
git branch 2>/dev/null || echo "Git not available"

echo -e "\n=== Looking for any recent changes to cmd/ or grpc_p2p_client ==="
git diff --name-status HEAD~1 2>/dev/null | grep -E "(cmd/|grpc_p2p_client)" || echo "No git diff available"


</function_calls>

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

♻️ Duplicate comments (8)
grpc_p2p_client/cmd/multi-subscribe/main.go (3)

68-82: Nested goroutine wrappers are unnecessary — remove them.

The outer go func() does nothing but launch an inner goroutine. This creates pointless indirection and potential confusion. Just launch shared.WriteToFile directly.

 	if *outputData != "" {
 		dataDone = make(chan bool)
-		go func() {
-			header := "receiver\tsender\tsize\tsha256(msg)"
-			go shared.WriteToFile(ctx, dataCh, dataDone, *outputData, header)
-		}()
+		header := "receiver\tsender\tsize\tsha256(msg)"
+		go shared.WriteToFile(ctx, dataCh, dataDone, *outputData, header)
 	}

 	if *outputTrace != "" {
 		traceDone = make(chan bool)
-		go func() {
-			header := ""
-			go shared.WriteToFile(ctx, traceCh, traceDone, *outputTrace, header)
-		}()
+		go shared.WriteToFile(ctx, traceCh, traceDone, *outputTrace, "")
 	}

84-90: Discarding the error from receiveMessages hides failures.

If a node fails to connect or stream, you'll have no visibility. Log the error.

 	for _, ip := range ips {
 		wg.Add(1)
 		go func(ip string) {
 			defer wg.Done()
-			receiveMessages(ctx, ip, *outputData != "", dataCh, *outputTrace != "", traceCh)
+			if err := receiveMessages(ctx, ip, *outputData != "", dataCh, *outputTrace != "", traceCh); err != nil {
+				log.Printf("[%s] receiveMessages failed: %v", ip, err)
+			}
 		}(ip)
 	}

176-179: Unbounded goroutine spawning per message is a resource risk.

Under high message volume, this can exhaust memory and degrade performance. HandleResponseWithTracking primarily formats data and sends to a buffered channel — process inline.

 		case resp, ok := <-msgChan:
 			if !ok {
 				log.Printf("Stream closed. Total messages received: %d", atomic.LoadInt32(&receivedCount))
 				return nil
 			}
-			go func(resp *protobuf.Response) {
-				shared.HandleResponseWithTracking(ip, resp, &receivedCount, writeData, dataCh, writeTrace, traceCh)
-			}(resp)
+			shared.HandleResponseWithTracking(ip, resp, &receivedCount, writeData, dataCh, writeTrace, traceCh)
grpc_p2p_client/cmd/single/main.go (2)

120-123: Unbounded goroutine spawning per message — process inline instead.

Same issue as multi-subscribe. HandleResponse is lightweight; spawning a goroutine per message adds overhead without benefit.

 		case resp, ok := <-msgChan:
 			if !ok {
 				log.Printf("Stream closed. Total messages received: %d", atomic.LoadInt32(&receivedCount))
 				return
 			}
-			go func(resp *protobuf.Response) {
-				shared.HandleResponse(resp, &receivedCount)
-			}(resp)
+			shared.HandleResponse(resp, &receivedCount)

130-132: Empty message validation has an edge case when count > 1.

The condition msg == "" && count == 1 allows empty messages when publishing multiple times. If multi-message mode intentionally generates random data (lines 143-150), add a clarifying comment. Otherwise, fix the logic.

-	if msg == "" && count == 1 {
+	// When count > 1, random payloads are generated, so msg is optional
+	if msg == "" && count == 1 {
 		log.Fatalf("−msg is required in publish mode")
 	}
grpc_p2p_client/shared/utils.go (3)

212-216: log.Fatal terminates the entire process on file creation failure.

This is a shared utility called from goroutines. A single file creation error shouldn't crash the whole program. Return an error instead.

-func WriteToFile(ctx context.Context, dataCh <-chan string, done chan<- bool, filename string, header string) {
+func WriteToFile(ctx context.Context, dataCh <-chan string, done chan<- bool, filename string, header string) error {
 	file, err := os.Create(filename)
 	if err != nil {
-		log.Fatal(err)
+		log.Printf("failed to create file %s: %v", filename, err)
+		done <- false
+		return err
 	}
 	defer file.Close()

Callers would need to be updated to handle the return value, but this prevents catastrophic failure.


229-240: Context cancellation returns without signaling the done channel — callers block forever.

When ctx.Done() fires, the function returns at line 232 without sending to done. The caller at line 96-99 (multi-subscribe) will block indefinitely on <-dataDone.

 	for data := range dataCh {
 		select {
 		case <-ctx.Done():
+			done <- true
-			return
+			return ctx.Err()
 		default:
 		}
 		_, err := writer.WriteString(data + "\n")
-		writer.Flush()
 		if err != nil {
 			log.Printf("Write error: %v", err)
 		}
 	}

235-236: Per-write Flush() defeats the purpose of buffered I/O.

Flushing after every single line is inefficient. Rely on the deferred flush at function exit, or flush periodically (e.g., every N writes or on a timer).

 		_, err := writer.WriteString(data + "\n")
-		writer.Flush()
 		if err != nil {
📜 Review details

Configuration used: Repository: getoptimum/coderabbit/.coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3b65404 and e44f43e.

📒 Files selected for processing (3)
  • grpc_p2p_client/cmd/multi-subscribe/main.go (1 hunks)
  • grpc_p2p_client/cmd/single/main.go (1 hunks)
  • grpc_p2p_client/shared/utils.go (1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*

⚙️ CodeRabbit configuration file

**/*: When you find the same underlying issue in multiple locations (same pattern, same fix):

  • Do NOT post separate comments for each occurrence.
  • Post a single primary comment on the first occurrence.
  • In that comment, include a short list of “Also at: file:line” references
    for the other locations (e.g. 'Also at: foo.go:42, bar.go:17').
    Prioritize signal over volume: one strong comment that references multiple
    locations is preferred over many near-identical comments.

Files:

  • grpc_p2p_client/cmd/multi-subscribe/main.go
  • grpc_p2p_client/cmd/single/main.go
  • grpc_p2p_client/shared/utils.go
**/*.go

⚙️ CodeRabbit configuration file

Review Go code for: idiomatic patterns, error handling (check all errors), concurrency safety (context propagation, goroutine cleanup), test coverage for changed logic, exported API stability, memory efficiency (avoid unnecessary allocations), and prefer standard library over third-party when reasonable.

Files:

  • grpc_p2p_client/cmd/multi-subscribe/main.go
  • grpc_p2p_client/cmd/single/main.go
  • grpc_p2p_client/shared/utils.go
🧬 Code graph analysis (1)
grpc_p2p_client/shared/utils.go (1)
grpc_p2p_client/shared/types.go (1)
  • P2PMessage (4-9)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Test Docker Setup
🔇 Additional comments (4)
grpc_p2p_client/cmd/single/main.go (2)

62-68: os.Exit(0) is missing, but that's correct — previous issue was already addressed.

The signal handler now only calls cancel() without os.Exit(0), allowing deferred cleanup to run properly.


144-150: Error from rand.Read uses log.Fatalf which is fine here, but check the return value properly.

rand.Read from crypto/rand returns (n int, err error). The code correctly checks err, but doesn't verify that n == 4. In practice, crypto/rand.Read will either fill the buffer or error, so this is acceptable.

grpc_p2p_client/shared/utils.go (2)

25-48: ReadIPsFromFile is well-implemented.

Proper error handling, comment filtering, and scanner error checking. Clean implementation.


57-77: HandleResponse now correctly uses time.Now().UnixNano() — past issue resolved.

The previous concern about an uninitialized timeNow variable has been addressed.

hpsing
hpsing previously approved these changes Nov 27, 2025
@swarna1101 swarna1101 merged commit 3bbe399 into main Nov 27, 2025
6 checks passed
@swarna1101 swarna1101 deleted the fix/refactor-grpc-client-multiple-main branch November 27, 2025 10:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

fix: refactor the multi publisher/subscriber script in correct package

3 participants