From 3e63ec05dea72fc1d7c7961c9e58a87bbbe5997a Mon Sep 17 00:00:00 2001 From: kishori82 Date: Fri, 17 Oct 2025 07:25:21 -0400 Subject: [PATCH 01/19] added a multi subscription --- grpc_p2p_client/local.ips.tsv | 50 ++++ grpc_p2p_client/p2p_client.go | 56 +--- .../p2p_client_multi_streams_publish.go | 220 ++++++++++++++ .../p2p_client_multi_streams_subscribe.go | 273 ++++++++++++++++++ 4 files changed, 544 insertions(+), 55 deletions(-) create mode 100644 grpc_p2p_client/local.ips.tsv create mode 100644 grpc_p2p_client/p2p_client_multi_streams_publish.go create mode 100644 grpc_p2p_client/p2p_client_multi_streams_subscribe.go diff --git a/grpc_p2p_client/local.ips.tsv b/grpc_p2p_client/local.ips.tsv new file mode 100644 index 0000000..75b0f3a --- /dev/null +++ b/grpc_p2p_client/local.ips.tsv @@ -0,0 +1,50 @@ +localhost:33212 +localhost:33213 +localhost:33214 +localhost:33215 +localhost:33216 +localhost:33217 +localhost:33218 +localhost:33219 +localhost:33220 +localhost:33221 +localhost:33222 +localhost:33223 +localhost:33224 +localhost:33225 +localhost:33226 +localhost:33227 +localhost:33228 +localhost:33229 +localhost:33230 +localhost:33231 +localhost:33232 +localhost:33233 +localhost:33234 +localhost:33235 +localhost:33236 +localhost:33237 +localhost:33238 +localhost:33239 +localhost:33240 +localhost:33241 +localhost:33242 +localhost:33243 +localhost:33244 +localhost:33245 +localhost:33246 +localhost:33247 +localhost:33248 +localhost:33249 +localhost:33250 +localhost:33251 +localhost:33252 +localhost:33253 +localhost:33254 +localhost:33255 +localhost:33256 +localhost:33257 +localhost:33258 +localhost:33259 +localhost:33260 +localhost:33261 diff --git a/grpc_p2p_client/p2p_client.go b/grpc_p2p_client/p2p_client.go index aacd959..648905b 100644 --- a/grpc_p2p_client/p2p_client.go +++ b/grpc_p2p_client/p2p_client.go @@ -17,10 +17,7 @@ import ( "time" protobuf "p2p_client/grpc" - optsub "p2p_client/grpc/mump2p_trace" - "github.com/gogo/protobuf/proto" - pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) @@ -197,17 +194,11 @@ func handleResponse(resp *protobuf.Response, counter *int32) { log.Printf("Error unmarshalling message: %v", err) return } - n := atomic.AddInt32(counter, 1) - currentTime := time.Now().UnixNano() messageSize := len(p2pMessage.Message) //fmt.Printf("Recv message: [%d] [%d %d] %s\n\n",n, currentTime, messageSize, string(p2pMessage.Message)[0:100]) - fmt.Printf("Recv message: [%d] [%d %d] %s\n\n", n, currentTime, messageSize, string(p2pMessage.Message)) - case protobuf.ResponseType_MessageTraceGossipSub: - handleGossipSubTrace(resp.GetData()) - case protobuf.ResponseType_MessageTraceOptimumP2P: - handleOptimumP2PTrace(resp.GetData()) + fmt.Printf("Recv message: %d %s\n\n", messageSize, string(p2pMessage.Message)) case protobuf.ResponseType_Unknown: default: log.Println("Unknown response command:", resp.GetCommand()) @@ -220,48 +211,3 @@ func headHex(b []byte, n int) string { } return hex.EncodeToString(b) } - -func handleGossipSubTrace(data []byte) { - evt := &pubsubpb.TraceEvent{} - if err := proto.Unmarshal(data, evt); err != nil { - fmt.Printf("[TRACE] GossipSub decode error: %v raw=%dB head=%s\n", - err, len(data), headHex(data, 64)) - return - } - - ts := time.Unix(0, evt.GetTimestamp()).Format(time.RFC3339Nano) - // print type - fmt.Printf("[TRACE] GossipSub type=%s ts=%s size=%dB\n", evt.GetType().String(), ts, len(data)) - jb, _ := json.Marshal(evt) - fmt.Printf("[TRACE] GossipSub JSON (%dB): %s\n", len(jb), string(jb)) -} - -func handleOptimumP2PTrace(data []byte) { - evt := &optsub.TraceEvent{} - if err := proto.Unmarshal(data, evt); err != nil { - fmt.Printf("[TRACE] OptimumP2P decode error: %v\n", err) - return - } - - // human-readable timestamp - ts := time.Unix(0, evt.GetTimestamp()).Format(time.RFC3339Nano) - - // print type - typeStr := optsub.TraceEvent_Type_name[int32(evt.GetType())] - fmt.Printf("[TRACE] OptimumP2P type=%s ts=%s size=%dB\n", typeStr, ts, len(data)) - - // if shard-related - switch evt.GetType() { - case optsub.TraceEvent_NEW_SHARD: - fmt.Printf(" NEW_SHARD id=%x coeff=%x\n", evt.GetNewShard().GetMessageID(), evt.GetNewShard().GetCoefficients()) - case optsub.TraceEvent_DUPLICATE_SHARD: - fmt.Printf(" DUPLICATE_SHARD id=%x\n", evt.GetDuplicateShard().GetMessageID()) - case optsub.TraceEvent_UNHELPFUL_SHARD: - fmt.Printf(" UNHELPFUL_SHARD id=%x\n", evt.GetUnhelpfulShard().GetMessageID()) - case optsub.TraceEvent_UNNECESSARY_SHARD: - fmt.Printf(" UNNECESSARY_SHARD id=%x\n", evt.GetUnnecessaryShard().GetMessageID()) - } - - jb, _ := json.Marshal(evt) - fmt.Printf("[TRACE] OptimumP2P JSON (%dB): %s\n", len(jb), string(jb)) -} diff --git a/grpc_p2p_client/p2p_client_multi_streams_publish.go b/grpc_p2p_client/p2p_client_multi_streams_publish.go new file mode 100644 index 0000000..a5733e2 --- /dev/null +++ b/grpc_p2p_client/p2p_client_multi_streams_publish.go @@ -0,0 +1,220 @@ +package main + +import ( + "bufio" + "context" + "crypto/rand" + "encoding/hex" + "encoding/json" + "flag" + "fmt" + "log" + "math" + "os" + "os/signal" + "strings" + "sync" + "sync/atomic" + "syscall" + "time" + + protobuf "p2p_client/grpc" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +// P2PMessage represents a message structure used in P2P communication +type P2PMessage struct { + MessageID string // Unique identifier for the message + Topic string // Topic name where the message was published + Message []byte // Actual message data + SourceNodeID string // ID of the node that sent the message (we don't need it in future, it is just for debug purposes) +} + +// Command possible operation that sidecar may perform with p2p node +type Command int32 + +const ( + CommandUnknown Command = iota + CommandPublishData + CommandSubscribeToTopic + CommandUnSubscribeToTopic +) + +var ( + topic = flag.String("topic", "", "topic name") + + // optional: number of messages to publish (for stress testing or batch sending) + count = flag.Int("count", 1, "number of messages to publish") + // optional: sleep duration between publishes + sleep = flag.Duration("sleep", 0, "optional delay between publishes (e.g., 1s, 500ms)") + ipfile = flag.String("ipfile", "", "file with a list of IP addresses") + numips = flag.Int("num-ip-use", 0, "default 0, -1 use all") +) + +func main() { + flag.Parse() + if *topic == "" { + log.Fatalf("−topic is required") + } + + ips, err := readIPsFromFile(*ipfile) + if err != nil { + fmt.Printf("Error: %v\n", err) + return + } + fmt.Printf("Found %d IPs: %v\n", len(ips), ips) + + if *numips >= 0 { + ips = ips[:min(len(ips), *numips)] + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + fmt.Println("\nShutting down gracefully…") + cancel() + }() + + + // Launch goroutines with synchronization + var wg sync.WaitGroup + for _, ip := range ips { + wg.Add(1) + go func(ip string) { + defer wg.Done() + data := ip + sendMessages(ctx, ip, data) + }(ip) + } + + wg.Wait() + +} + +func sendMessages(ctx context.Context, ip string, message string) error { + // connect with simple gRPC settings + select { + case <-ctx.Done(): + log.Printf("[%s] context canceled, stopping", ip) + return ctx.Err() + default: + } + +/* +conn, err := grpc.Dial(ip, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(math.MaxInt), + grpc.MaxCallSendMsgSize(math.MaxInt), + ), +) +*/ + conn, err := grpc.NewClient(ip, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(math.MaxInt), + grpc.MaxCallSendMsgSize(math.MaxInt), + ), + ) + if err != nil { + log.Fatalf("failed to connect to node %v", err) + } + defer conn.Close() + println(fmt.Sprintf("Connected to node at: %s…", ip)) + + client := protobuf.NewCommandStreamClient(conn) + + stream, err := client.ListenCommands(ctx) + + if err != nil { + log.Fatalf("ListenCommands: %v", err) + } + + for i := 0; i < *count; i++ { + start := time.Now() + var data []byte + //currentTime := time.Now().UnixNano() + randomBytes := make([]byte, 10) + if _, err := rand.Read(randomBytes); err != nil { + log.Fatalf("failed to generate random bytes: %v", err) + } + + randomSuffix := hex.EncodeToString(randomBytes) + data = []byte(fmt.Sprintf("%s-%s", message, randomSuffix)) + pubReq := &protobuf.Request{ + Command: int32(CommandPublishData), + Topic: *topic, + Data: data, + } + if err := stream.Send(pubReq); err != nil { + log.Fatalf("send publish: %v", err) + } + + elapsed := time.Since(start) + fmt.Printf("Published %q to %q (took %v)\n", string(data), *topic, elapsed) + + if *sleep > 0 { + time.Sleep(*sleep) + } + } + + return nil + +} + +func readIPsFromFile(filename string) ([]string, error) { + file, err := os.Open(filename) + if err != nil { + return nil, fmt.Errorf("failed to open file: %w", err) + } + defer file.Close() + + var ips []string + scanner := bufio.NewScanner(file) + + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + // Skip empty lines and comments + if line == "" || strings.HasPrefix(line, "#") { + continue + } + ips = append(ips, line) + } + + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("error reading file: %w", err) + } + + return ips, nil +} + +func handleResponse(resp *protobuf.Response, counter *int32) { + switch resp.GetCommand() { + case protobuf.ResponseType_Message: + var p2pMessage P2PMessage + if err := json.Unmarshal(resp.GetData(), &p2pMessage); err != nil { + log.Printf("Error unmarshalling message: %v", err) + return + } + n := atomic.AddInt32(counter, 1) + + currentTime := time.Now().UnixNano() + messageSize := len(p2pMessage.Message) + + //fmt.Printf("Recv message: [%d] [%d %d] %s\n\n",n, currentTime, messageSize, string(p2pMessage.Message)[0:100]) + fmt.Printf("Recv message: [%d] [%d %d] %s\n\n", n, currentTime, messageSize, string(p2pMessage.Message)) + default: + log.Println("Unknown response command:", resp.GetCommand()) + } +} + +func headHex(b []byte, n int) string { + if len(b) > n { + b = b[:n] + } + return hex.EncodeToString(b) +} diff --git a/grpc_p2p_client/p2p_client_multi_streams_subscribe.go b/grpc_p2p_client/p2p_client_multi_streams_subscribe.go new file mode 100644 index 0000000..2a60b6d --- /dev/null +++ b/grpc_p2p_client/p2p_client_multi_streams_subscribe.go @@ -0,0 +1,273 @@ +package main + +import ( + "bufio" + "context" + "encoding/hex" + "encoding/json" + "flag" + "fmt" + "io" + "log" + "math" + "os" + "os/signal" + "strings" + "sync" + "sync/atomic" + "syscall" + "time" + + protobuf "p2p_client/grpc" + optsub "p2p_client/grpc/mump2p_trace" + + "github.com/gogo/protobuf/proto" + pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +// P2PMessage represents a message structure used in P2P communication +type P2PMessage struct { + MessageID string // Unique identifier for the message + Topic string // Topic name where the message was published + Message []byte // Actual message data + SourceNodeID string // ID of the node that sent the message (we don't need it in future, it is just for debug purposes) +} + +// Command possible operation that sidecar may perform with p2p node +type Command int32 + +const ( + CommandUnknown Command = iota + CommandPublishData + CommandSubscribeToTopic + CommandUnSubscribeToTopic +) + +var ( + topic = flag.String("topic", "", "topic name") + ipfile = flag.String("ipfile", "", "file with a list of IP addresses") + startIdx = flag.Int("start-index", 0, "default 0" ) + endIdx = flag.Int("end-index", 10000, "default 0" ) +) + +func main() { + flag.Parse() + if *topic == "" { + log.Fatalf("−topic is required") + } + + _ips, err := readIPsFromFile(*ipfile) + if err != nil { + fmt.Printf("Error: %v\n", err) + return + } + fmt.Printf("numip %d index %d\n",len(_ips), *endIdx) + *endIdx = min(len(_ips), *endIdx) + ips := _ips[*startIdx:*endIdx] + fmt.Printf("Found %d IPs: %v\n", len(ips), ips) + + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + fmt.Println("\nShutting down gracefully…") + cancel() + }() + + // Launch goroutines with synchronization + var wg sync.WaitGroup + for _, ip := range ips { + wg.Add(1) + go func(ip string) { + defer wg.Done() + data := ip + receiveMessages(ctx, ip, data) + }(ip) + } + + wg.Wait() +} + +func receiveMessages(ctx context.Context, ip string, message string) error { + // connect with simple gRPC settings + //fmt.Println("Starting ", ip) + select { + case <-ctx.Done(): + log.Printf("[%s] context canceled, stopping", ip) + return ctx.Err() + default: + } + + conn, err := grpc.NewClient(ip, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(math.MaxInt), + grpc.MaxCallSendMsgSize(math.MaxInt), + ), + ) + if err != nil { + log.Fatalf("failed to connect to node %v", err) + } + defer conn.Close() + + client := protobuf.NewCommandStreamClient(conn) + + stream, err := client.ListenCommands(ctx) + + if err != nil { + log.Fatalf("ListenCommands: %v", err) + } + + println(fmt.Sprintf("Connected to node at: %s…", ip)) + println(fmt.Sprintf("Trying to subscribe to topic %s…", *topic)) + subReq := &protobuf.Request{ + Command: int32(CommandSubscribeToTopic), + Topic: *topic, + } + if err := stream.Send(subReq); err != nil { + log.Fatalf("send subscribe: %v", err) + } + fmt.Printf("Subscribed to topic %q, waiting for messages…\n", *topic) + + var receivedCount int32 + msgChan := make(chan *protobuf.Response, 10000) + + // recv goroutine + go func() { + for { + resp, err := stream.Recv() + if err == io.EOF { + close(msgChan) + return + } + if err != nil { + log.Printf("recv error: %v", err) + close(msgChan) + return + } + msgChan <- resp + } + }() + + // message handler loop + for { + select { + case <-ctx.Done(): + log.Printf("Context canceled. Total messages received: %d", atomic.LoadInt32(&receivedCount)) + return nil + case resp, ok := <-msgChan: + if !ok { + log.Printf("Stream closed. Total messages received: %d", atomic.LoadInt32(&receivedCount)) + return nil + } + go func(resp *protobuf.Response) { + handleResponse(ip, resp, &receivedCount) + }(resp) + } + } + + return nil +} + +func readIPsFromFile(filename string) ([]string, error) { + file, err := os.Open(filename) + if err != nil { + return nil, fmt.Errorf("failed to open file: %w", err) + } + defer file.Close() + + var ips []string + scanner := bufio.NewScanner(file) + + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + // Skip empty lines and comments + if line == "" || strings.HasPrefix(line, "#") { + continue + } + ips = append(ips, line) + } + + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("error reading file: %w", err) + } + + return ips, nil +} + +func handleResponse(ip string, resp *protobuf.Response, counter *int32) { + switch resp.GetCommand() { + case protobuf.ResponseType_Message: + var p2pMessage P2PMessage + if err := json.Unmarshal(resp.GetData(), &p2pMessage); err != nil { + log.Printf("Error unmarshalling message: %v", err) + return + } + n := atomic.AddInt32(counter, 1) + + currentTime := time.Now().UnixNano() + messageSize := len(p2pMessage.Message) + + //fmt.Printf("Recv message: [%d] [%d %d] %s\n\n",n, currentTime, messageSize, string(p2pMessage.Message)[0:100]) + fmt.Printf("Recv message: [%s] [%d] [%d %d] %s\n\n", ip, n, currentTime, messageSize, string(p2pMessage.Message)) + default: + log.Println("Unknown response command:", resp.GetCommand()) + } +} + +func headHex(b []byte, n int) string { + if len(b) > n { + b = b[:n] + } + return hex.EncodeToString(b) +} + +func handleGossipSubTrace(data []byte) { + evt := &pubsubpb.TraceEvent{} + if err := proto.Unmarshal(data, evt); err != nil { + fmt.Printf("[TRACE] GossipSub decode error: %v raw=%dB head=%s\n", + err, len(data), headHex(data, 64)) + return + } + + ts := time.Unix(0, evt.GetTimestamp()).Format(time.RFC3339Nano) + // print type + fmt.Printf("[TRACE] GossipSub type=%s ts=%s size=%dB\n", evt.GetType().String(), ts, len(data)) + jb, _ := json.Marshal(evt) + fmt.Printf("[TRACE] GossipSub JSON (%dB): %s\n", len(jb), string(jb)) +} + +func handleOptimumP2PTrace(data []byte) { + evt := &optsub.TraceEvent{} + if err := proto.Unmarshal(data, evt); err != nil { + fmt.Printf("[TRACE] OptimumP2P decode error: %v\n", err) + return + } + + // human-readable timestamp + ts := time.Unix(0, evt.GetTimestamp()).Format(time.RFC3339Nano) + + // print type + typeStr := optsub.TraceEvent_Type_name[int32(evt.GetType())] + fmt.Printf("[TRACE] OptimumP2P type=%s ts=%s size=%dB\n", typeStr, ts, len(data)) + + // if shard-related + switch evt.GetType() { + case optsub.TraceEvent_NEW_SHARD: + fmt.Printf(" NEW_SHARD id=%x coeff=%x\n", evt.GetNewShard().GetMessageID(), evt.GetNewShard().GetCoefficients()) + case optsub.TraceEvent_DUPLICATE_SHARD: + fmt.Printf(" DUPLICATE_SHARD id=%x\n", evt.GetDuplicateShard().GetMessageID()) + case optsub.TraceEvent_UNHELPFUL_SHARD: + fmt.Printf(" UNHELPFUL_SHARD id=%x\n", evt.GetUnhelpfulShard().GetMessageID()) + case optsub.TraceEvent_UNNECESSARY_SHARD: + fmt.Printf(" UNNECESSARY_SHARD id=%x\n", evt.GetUnnecessaryShard().GetMessageID()) + } + + jb, _ := json.Marshal(evt) + fmt.Printf("[TRACE] OptimumP2P JSON (%dB): %s\n", len(jb), string(jb)) +} From a45dd16f4a069f4ea5c2c02fb4218b48b67d2640 Mon Sep 17 00:00:00 2001 From: kishori82 Date: Thu, 23 Oct 2025 08:10:49 -0400 Subject: [PATCH 02/19] add a multi subscribe script --- grpc_p2p_client/p2p_client_multi_streams_subscribe.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/grpc_p2p_client/p2p_client_multi_streams_subscribe.go b/grpc_p2p_client/p2p_client_multi_streams_subscribe.go index 2a60b6d..21ded2c 100644 --- a/grpc_p2p_client/p2p_client_multi_streams_subscribe.go +++ b/grpc_p2p_client/p2p_client_multi_streams_subscribe.go @@ -215,6 +215,8 @@ func handleResponse(ip string, resp *protobuf.Response, counter *int32) { //fmt.Printf("Recv message: [%d] [%d %d] %s\n\n",n, currentTime, messageSize, string(p2pMessage.Message)[0:100]) fmt.Printf("Recv message: [%s] [%d] [%d %d] %s\n\n", ip, n, currentTime, messageSize, string(p2pMessage.Message)) + case protobuf.ResponseType_MessageTraceOptimumP2P: + var p2pMessage P2PMessage default: log.Println("Unknown response command:", resp.GetCommand()) } From 21f3c1efa2ad40b964dfd7d9849bc93536304e47 Mon Sep 17 00:00:00 2001 From: kishori82 Date: Thu, 23 Oct 2025 09:50:42 -0400 Subject: [PATCH 03/19] added prints event information --- .../p2p_client_multi_streams_subscribe.go | 71 +++++++++++++------ 1 file changed, 50 insertions(+), 21 deletions(-) diff --git a/grpc_p2p_client/p2p_client_multi_streams_subscribe.go b/grpc_p2p_client/p2p_client_multi_streams_subscribe.go index 21ded2c..44290f6 100644 --- a/grpc_p2p_client/p2p_client_multi_streams_subscribe.go +++ b/grpc_p2p_client/p2p_client_multi_streams_subscribe.go @@ -46,10 +46,10 @@ const ( ) var ( - topic = flag.String("topic", "", "topic name") - ipfile = flag.String("ipfile", "", "file with a list of IP addresses") - startIdx = flag.Int("start-index", 0, "default 0" ) - endIdx = flag.Int("end-index", 10000, "default 0" ) + topic = flag.String("topic", "", "topic name") + ipfile = flag.String("ipfile", "", "file with a list of IP addresses") + startIdx = flag.Int("start-index", 0, "default 0") + endIdx = flag.Int("end-index", 10000, "default 0") ) func main() { @@ -63,12 +63,11 @@ func main() { fmt.Printf("Error: %v\n", err) return } - fmt.Printf("numip %d index %d\n",len(_ips), *endIdx) - *endIdx = min(len(_ips), *endIdx) + fmt.Printf("numip %d index %d\n", len(_ips), *endIdx) + *endIdx = min(len(_ips), *endIdx) ips := _ips[*startIdx:*endIdx] fmt.Printf("Found %d IPs: %v\n", len(ips), ips) - ctx, cancel := context.WithCancel(context.Background()) defer cancel() c := make(chan os.Signal, 1) @@ -216,7 +215,7 @@ func handleResponse(ip string, resp *protobuf.Response, counter *int32) { //fmt.Printf("Recv message: [%d] [%d %d] %s\n\n",n, currentTime, messageSize, string(p2pMessage.Message)[0:100]) fmt.Printf("Recv message: [%s] [%d] [%d %d] %s\n\n", ip, n, currentTime, messageSize, string(p2pMessage.Message)) case protobuf.ResponseType_MessageTraceOptimumP2P: - var p2pMessage P2PMessage + handleOptimumP2PTrace(resp.GetData()) default: log.Println("Unknown response command:", resp.GetCommand()) } @@ -252,24 +251,54 @@ func handleOptimumP2PTrace(data []byte) { } // human-readable timestamp - ts := time.Unix(0, evt.GetTimestamp()).Format(time.RFC3339Nano) + //ts := time.Unix(0, evt.GetTimestamp()).Format(time.RFC3339Nano) // print type typeStr := optsub.TraceEvent_Type_name[int32(evt.GetType())] - fmt.Printf("[TRACE] OptimumP2P type=%s ts=%s size=%dB\n", typeStr, ts, len(data)) + //fmt.Printf("[TRACE] OptimumP2P type=%s ts=%s size=%dB\n", typeStr, ts, len(data)) + //fmt.Printf("[TRACE] OptimumP2P type=%s msg_id=%x time=%d, recvr_id=%s, size=%dB\n", + // typeStr, evt.GetDuplicateShard().GetMessageID(), time.Unix(0, evt.GetTimestamp()), evt.GetPeerID(), len(data)) // if shard-related - switch evt.GetType() { - case optsub.TraceEvent_NEW_SHARD: - fmt.Printf(" NEW_SHARD id=%x coeff=%x\n", evt.GetNewShard().GetMessageID(), evt.GetNewShard().GetCoefficients()) - case optsub.TraceEvent_DUPLICATE_SHARD: - fmt.Printf(" DUPLICATE_SHARD id=%x\n", evt.GetDuplicateShard().GetMessageID()) - case optsub.TraceEvent_UNHELPFUL_SHARD: - fmt.Printf(" UNHELPFUL_SHARD id=%x\n", evt.GetUnhelpfulShard().GetMessageID()) - case optsub.TraceEvent_UNNECESSARY_SHARD: - fmt.Printf(" UNNECESSARY_SHARD id=%x\n", evt.GetUnnecessaryShard().GetMessageID()) - } + /* + switch evt.GetType() { + case optsub.TraceEvent_NEW_SHARD: + fmt.Printf(" NEW_SHARD id=%x coeff=%x\n", evt.GetNewShard().GetMessageID(), evt.GetNewShard().GetCoefficients()) + case optsub.TraceEvent_DUPLICATE_SHARD: + fmt.Printf(" DUPLICATE_SHARD id=%x\n", evt.GetDuplicateShard().GetMessageID()) + case optsub.TraceEvent_UNHELPFUL_SHARD: + fmt.Printf(" UNHELPFUL_SHARD id=%x\n", evt.GetUnhelpfulShard().GetMessageID()) + case optsub.TraceEvent_UNNECESSARY_SHARD: + fmt.Printf(" UNNECESSARY_SHARD id=%x\n", evt.GetUnnecessaryShard().GetMessageID()) + } + */ + + // if shard-related + /* + switch evt.GetType() { + case optsub.TraceEvent_NEW_SHARD: + fmt.Printf(" NEW_SHARD id=%x coeff=%x\n", evt.GetNewShard().GetMessageID(), evt.GetNewShard().GetCoefficients()) + case optsub.TraceEvent_DUPLICATE_SHARD: + fmt.Printf(" DUPLICATE_SHARD id=%x\n", evt.GetDuplicateShard().GetMessageID()) + case optsub.TraceEvent_UNHELPFUL_SHARD: + fmt.Printf(" UNHELPFUL_SHARD id=%x\n", evt.GetUnhelpfulShard().GetMessageID()) + case optsub.TraceEvent_UNNECESSARY_SHARD: + fmt.Printf(" UNNECESSARY_SHARD id=%x\n", evt.GetUnnecessaryShard().GetMessageID()) + } + */ jb, _ := json.Marshal(evt) - fmt.Printf("[TRACE] OptimumP2P JSON (%dB): %s\n", len(jb), string(jb)) + fmt.Printf("[TRACE] OptimumP2P JSON message_type=%s, (%dB): %s\n", typeStr, len(jb), string(jb)) + + /* + message_type <- systems information + message_id <- application layer + time_stamp <- event occuring the event publish, new shard, duplicate shard + receiver_id + sender_id + + + + */ + } From 9c2b241fb48626f8f15ea0caeb7131ad4685f254 Mon Sep 17 00:00:00 2001 From: kishori82 Date: Fri, 24 Oct 2025 10:21:32 -0400 Subject: [PATCH 04/19] added start and end index options --- .../p2p_client_multi_streams_publish.go | 25 ++++++++++--------- .../p2p_client_multi_streams_subscribe.go | 4 +-- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/grpc_p2p_client/p2p_client_multi_streams_publish.go b/grpc_p2p_client/p2p_client_multi_streams_publish.go index a5733e2..bfd2930 100644 --- a/grpc_p2p_client/p2p_client_multi_streams_publish.go +++ b/grpc_p2p_client/p2p_client_multi_streams_publish.go @@ -47,9 +47,11 @@ var ( // optional: number of messages to publish (for stress testing or batch sending) count = flag.Int("count", 1, "number of messages to publish") // optional: sleep duration between publishes - sleep = flag.Duration("sleep", 0, "optional delay between publishes (e.g., 1s, 500ms)") - ipfile = flag.String("ipfile", "", "file with a list of IP addresses") - numips = flag.Int("num-ip-use", 0, "default 0, -1 use all") + sleep = flag.Duration("sleep", 50*time.Millisecond, "optional delay between publishes (e.g., 1s, 500ms)") + ipfile = flag.String("ipfile", "", "file with a list of IP addresses") + startIdx = flag.Int("start-index", 0, "beginning index is 0: default 0") + endIdx = flag.Int("end-index", 10000, "index-1") + ) func main() { @@ -58,16 +60,15 @@ func main() { log.Fatalf("−topic is required") } - ips, err := readIPsFromFile(*ipfile) - if err != nil { - fmt.Printf("Error: %v\n", err) - return - } - fmt.Printf("Found %d IPs: %v\n", len(ips), ips) - - if *numips >= 0 { - ips = ips[:min(len(ips), *numips)] + _ips, err := readIPsFromFile(*ipfile) + if err != nil { + fmt.Printf("Error: %v\n", err) + return } + fmt.Printf("numip %d index %d\n", len(_ips), *endIdx) + *endIdx = min(len(_ips), *endIdx) + ips := _ips[*startIdx:*endIdx] + fmt.Printf("Found %d IPs: %v\n", len(ips), ips) ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/grpc_p2p_client/p2p_client_multi_streams_subscribe.go b/grpc_p2p_client/p2p_client_multi_streams_subscribe.go index 44290f6..20a2ae0 100644 --- a/grpc_p2p_client/p2p_client_multi_streams_subscribe.go +++ b/grpc_p2p_client/p2p_client_multi_streams_subscribe.go @@ -48,8 +48,8 @@ const ( var ( topic = flag.String("topic", "", "topic name") ipfile = flag.String("ipfile", "", "file with a list of IP addresses") - startIdx = flag.Int("start-index", 0, "default 0") - endIdx = flag.Int("end-index", 10000, "default 0") + startIdx = flag.Int("start-index", 0, "beginning index is 0: default 0") + endIdx = flag.Int("end-index", 10000, "index-1") ) func main() { From b92255fd92b3e187c719bc0df0f76e88e7654f0d Mon Sep 17 00:00:00 2001 From: kishori82 Date: Sat, 25 Oct 2025 09:05:11 -0400 Subject: [PATCH 05/19] write subscriptions --- .../p2p_client_multi_streams_publish.go | 76 +++++++++++++---- .../p2p_client_multi_streams_subscribe.go | 84 +++++++++++++++---- 2 files changed, 129 insertions(+), 31 deletions(-) diff --git a/grpc_p2p_client/p2p_client_multi_streams_publish.go b/grpc_p2p_client/p2p_client_multi_streams_publish.go index bfd2930..c48b883 100644 --- a/grpc_p2p_client/p2p_client_multi_streams_publish.go +++ b/grpc_p2p_client/p2p_client_multi_streams_publish.go @@ -4,6 +4,7 @@ import ( "bufio" "context" "crypto/rand" + "crypto/sha256" "encoding/hex" "encoding/json" "flag" @@ -46,11 +47,13 @@ var ( // optional: number of messages to publish (for stress testing or batch sending) count = flag.Int("count", 1, "number of messages to publish") + dataSize = flag.Int("datasize", 100, "size of random of messages to publish") // optional: sleep duration between publishes sleep = flag.Duration("sleep", 50*time.Millisecond, "optional delay between publishes (e.g., 1s, 500ms)") ipfile = flag.String("ipfile", "", "file with a list of IP addresses") startIdx = flag.Int("start-index", 0, "beginning index is 0: default 0") endIdx = flag.Int("end-index", 10000, "index-1") + output = flag.String("output", "", "file to write the outgoing data hashes") ) @@ -80,23 +83,36 @@ func main() { cancel() }() + // Buffered channel to prevent blocking + dataCh := make(chan string, 100) + done := make(chan bool) + + var wg sync.WaitGroup + // Start writing the has of the published data + if *output != "" { + wg.Add(1) + go func() { + defer wg.Done() + go writeHashToFile(dataCh, done, *output) + }() + } // Launch goroutines with synchronization - var wg sync.WaitGroup for _, ip := range ips { wg.Add(1) go func(ip string) { defer wg.Done() - data := ip - sendMessages(ctx, ip, data) + datasize := *dataSize + sendMessages(ctx, ip, datasize, *output!="", dataCh) }(ip) } - wg.Wait() + close(dataCh) + <-done } -func sendMessages(ctx context.Context, ip string, message string) error { +func sendMessages(ctx context.Context, ip string, datasize int, write bool, dataCh chan<- string ) error { // connect with simple gRPC settings select { case <-ctx.Done(): @@ -105,15 +121,6 @@ func sendMessages(ctx context.Context, ip string, message string) error { default: } -/* -conn, err := grpc.Dial(ip, - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithDefaultCallOptions( - grpc.MaxCallRecvMsgSize(math.MaxInt), - grpc.MaxCallSendMsgSize(math.MaxInt), - ), -) -*/ conn, err := grpc.NewClient(ip, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions( @@ -139,24 +146,33 @@ conn, err := grpc.Dial(ip, start := time.Now() var data []byte //currentTime := time.Now().UnixNano() - randomBytes := make([]byte, 10) + randomBytes := make([]byte, datasize) if _, err := rand.Read(randomBytes); err != nil { log.Fatalf("failed to generate random bytes: %v", err) } randomSuffix := hex.EncodeToString(randomBytes) - data = []byte(fmt.Sprintf("%s-%s", message, randomSuffix)) + data = []byte(fmt.Sprintf("%s-%s", ip, randomSuffix)) pubReq := &protobuf.Request{ Command: int32(CommandPublishData), Topic: *topic, Data: data, } + if err := stream.Send(pubReq); err != nil { log.Fatalf("send publish: %v", err) } elapsed := time.Since(start) - fmt.Printf("Published %q to %q (took %v)\n", string(data), *topic, elapsed) + + hash := sha256.Sum256(data) + hexHashString := hex.EncodeToString(hash[:]) + var dataToSend string + if write == true { + dataToSend = fmt.Sprintf("%s\t%d\t%s", ip, len(data), hexHashString) + dataCh <- dataToSend + } + fmt.Printf("Published %s to %q (took %v)\n", dataToSend, *topic, elapsed) if *sleep > 0 { time.Sleep(*sleep) @@ -219,3 +235,29 @@ func headHex(b []byte, n int) string { } return hex.EncodeToString(b) } + + +func writeHashToFile(dataCh <-chan string, done chan<- bool, filename string) { + file, err := os.Create(filename) + if err != nil { + log.Fatal(err) + } + defer file.Close() + + writer := bufio.NewWriter(file) + defer writer.Flush() + + // Process until channel is closed + for data := range dataCh { + _, err := writer.WriteString(data + "\n") + if err != nil { + log.Printf("Write error: %v", err) + } + } + done <- true + fmt.Println("All data flushed to disk") + +} + + + diff --git a/grpc_p2p_client/p2p_client_multi_streams_subscribe.go b/grpc_p2p_client/p2p_client_multi_streams_subscribe.go index 20a2ae0..1fcdbf5 100644 --- a/grpc_p2p_client/p2p_client_multi_streams_subscribe.go +++ b/grpc_p2p_client/p2p_client_multi_streams_subscribe.go @@ -3,6 +3,7 @@ package main import ( "bufio" "context" + "crypto/sha256" "encoding/hex" "encoding/json" "flag" @@ -50,6 +51,7 @@ var ( ipfile = flag.String("ipfile", "", "file with a list of IP addresses") startIdx = flag.Int("start-index", 0, "beginning index is 0: default 0") endIdx = flag.Int("end-index", 10000, "index-1") + output = flag.String("output", "", "file to write the outgoing data hashes") ) func main() { @@ -78,21 +80,37 @@ func main() { cancel() }() + + // Buffered channel to prevent blocking + dataCh := make(chan string, 100) + done := make(chan bool) + // Launch goroutines with synchronization var wg sync.WaitGroup + // Start writing the has of the published data + if *output != "" { + wg.Add(1) + go func() { + defer wg.Done() + go writeHashToFile(ctx, dataCh, done, *output) + }() + } + + for _, ip := range ips { wg.Add(1) go func(ip string) { defer wg.Done() - data := ip - receiveMessages(ctx, ip, data) + receiveMessages(ctx, ip, *output!="", dataCh) }(ip) } wg.Wait() + close(dataCh) + <- done } -func receiveMessages(ctx context.Context, ip string, message string) error { +func receiveMessages(ctx context.Context, ip string, write bool, dataCh chan<- string) error { // connect with simple gRPC settings //fmt.Println("Starting ", ip) select { @@ -165,7 +183,7 @@ func receiveMessages(ctx context.Context, ip string, message string) error { return nil } go func(resp *protobuf.Response) { - handleResponse(ip, resp, &receivedCount) + handleResponse(ip, resp, &receivedCount, write, dataCh) }(resp) } } @@ -199,7 +217,7 @@ func readIPsFromFile(filename string) ([]string, error) { return ips, nil } -func handleResponse(ip string, resp *protobuf.Response, counter *int32) { +func handleResponse(ip string, resp *protobuf.Response, counter *int32, write bool, dataCh chan<- string) { switch resp.GetCommand() { case protobuf.ResponseType_Message: var p2pMessage P2PMessage @@ -207,13 +225,23 @@ func handleResponse(ip string, resp *protobuf.Response, counter *int32) { log.Printf("Error unmarshalling message: %v", err) return } - n := atomic.AddInt32(counter, 1) - - currentTime := time.Now().UnixNano() - messageSize := len(p2pMessage.Message) - - //fmt.Printf("Recv message: [%d] [%d %d] %s\n\n",n, currentTime, messageSize, string(p2pMessage.Message)[0:100]) - fmt.Printf("Recv message: [%s] [%d] [%d %d] %s\n\n", ip, n, currentTime, messageSize, string(p2pMessage.Message)) + _ = atomic.AddInt32(counter, 1) + + + hash := sha256.Sum256(p2pMessage.Message) + hexHashString := hex.EncodeToString(hash[:]) + + parts := strings.Split(string(p2pMessage.Message), "-") + if len(parts) > 0 { + publisher := parts[0] + var dataToSend string + if write == true { + dataToSend = fmt.Sprintf("%s\t%s\t%d\t%s",publisher, ip, len(p2pMessage.Message), hexHashString) + dataCh <- dataToSend + } + } + + //fmt.Printf("Recv message: %s %d %s\n", ip, messageSize, string(p2pMessage.Message)) case protobuf.ResponseType_MessageTraceOptimumP2P: handleOptimumP2PTrace(resp.GetData()) default: @@ -297,8 +325,36 @@ func handleOptimumP2PTrace(data []byte) { receiver_id sender_id - - */ } + +func writeHashToFile(ctx context.Context, dataCh <-chan string, done chan<- bool, filename string) { + file, err := os.Create(filename) + if err != nil { + log.Fatal(err) + } + defer file.Close() + + writer := bufio.NewWriter(file) + defer writer.Flush() + + // Process until channel is closed + for data := range dataCh { + select { + case <-ctx.Done(): + return + default: + + } + _, err := writer.WriteString(data + "\n") + if err != nil { + log.Printf("Write error: %v", err) + } + } + done <- true + fmt.Println("All data flushed to disk") +} + + + From 18b273967f68936b36938fb7c9b407f69706466d Mon Sep 17 00:00:00 2001 From: kishori82 Date: Sat, 25 Oct 2025 19:40:47 -0400 Subject: [PATCH 06/19] added the traces and data output choices --- .../p2p_client_multi_streams_publish.go | 129 ++++++------ .../p2p_client_multi_streams_subscribe.go | 188 +++++++++--------- 2 files changed, 159 insertions(+), 158 deletions(-) diff --git a/grpc_p2p_client/p2p_client_multi_streams_publish.go b/grpc_p2p_client/p2p_client_multi_streams_publish.go index c48b883..1432634 100644 --- a/grpc_p2p_client/p2p_client_multi_streams_publish.go +++ b/grpc_p2p_client/p2p_client_multi_streams_publish.go @@ -19,9 +19,9 @@ import ( "syscall" "time" - protobuf "p2p_client/grpc" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + protobuf "p2p_client/grpc" ) // P2PMessage represents a message structure used in P2P communication @@ -43,18 +43,17 @@ const ( ) var ( - topic = flag.String("topic", "", "topic name") + topic = flag.String("topic", "", "topic name") // optional: number of messages to publish (for stress testing or batch sending) - count = flag.Int("count", 1, "number of messages to publish") + count = flag.Int("count", 1, "number of messages to publish") dataSize = flag.Int("datasize", 100, "size of random of messages to publish") // optional: sleep duration between publishes - sleep = flag.Duration("sleep", 50*time.Millisecond, "optional delay between publishes (e.g., 1s, 500ms)") - ipfile = flag.String("ipfile", "", "file with a list of IP addresses") - startIdx = flag.Int("start-index", 0, "beginning index is 0: default 0") - endIdx = flag.Int("end-index", 10000, "index-1") - output = flag.String("output", "", "file to write the outgoing data hashes") - + sleep = flag.Duration("sleep", 50*time.Millisecond, "optional delay between publishes (e.g., 1s, 500ms)") + ipfile = flag.String("ipfile", "", "file with a list of IP addresses") + startIdx = flag.Int("start-index", 0, "beginning index is 0: default 0") + endIdx = flag.Int("end-index", 10000, "index-1") + output = flag.String("output", "", "file to write the outgoing data hashes") ) func main() { @@ -63,15 +62,15 @@ func main() { log.Fatalf("−topic is required") } - _ips, err := readIPsFromFile(*ipfile) - if err != nil { - fmt.Printf("Error: %v\n", err) - return - } - fmt.Printf("numip %d index %d\n", len(_ips), *endIdx) - *endIdx = min(len(_ips), *endIdx) - ips := _ips[*startIdx:*endIdx] - fmt.Printf("Found %d IPs: %v\n", len(ips), ips) + _ips, err := readIPsFromFile(*ipfile) + if err != nil { + fmt.Printf("Error: %v\n", err) + return + } + fmt.Printf("numip %d index %d\n", len(_ips), *endIdx) + *endIdx = min(len(_ips), *endIdx) + ips := _ips[*startIdx:*endIdx] + fmt.Printf("Found %d IPs: %v\n", len(ips), ips) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -83,18 +82,18 @@ func main() { cancel() }() - // Buffered channel to prevent blocking - dataCh := make(chan string, 100) - done := make(chan bool) - + // Buffered channel to prevent blocking + dataCh := make(chan string, 100) + done := make(chan bool) + var wg sync.WaitGroup - // Start writing the has of the published data + // Start writing the has of the published data if *output != "" { - wg.Add(1) - go func() { - defer wg.Done() - go writeHashToFile(dataCh, done, *output) - }() + wg.Add(1) + go func() { + defer wg.Done() + go writeHashToFile(dataCh, done, *output) + }() } // Launch goroutines with synchronization @@ -103,23 +102,23 @@ func main() { go func(ip string) { defer wg.Done() datasize := *dataSize - sendMessages(ctx, ip, datasize, *output!="", dataCh) + sendMessages(ctx, ip, datasize, *output != "", dataCh) }(ip) } wg.Wait() - close(dataCh) - <-done + close(dataCh) + <-done } -func sendMessages(ctx context.Context, ip string, datasize int, write bool, dataCh chan<- string ) error { +func sendMessages(ctx context.Context, ip string, datasize int, write bool, dataCh chan<- string) error { // connect with simple gRPC settings - select { - case <-ctx.Done(): - log.Printf("[%s] context canceled, stopping", ip) - return ctx.Err() - default: - } + select { + case <-ctx.Done(): + log.Printf("[%s] context canceled, stopping", ip) + return ctx.Err() + default: + } conn, err := grpc.NewClient(ip, grpc.WithTransportCredentials(insecure.NewCredentials()), @@ -158,20 +157,20 @@ func sendMessages(ctx context.Context, ip string, datasize int, write bool, dat Topic: *topic, Data: data, } - + if err := stream.Send(pubReq); err != nil { log.Fatalf("send publish: %v", err) } elapsed := time.Since(start) - hash := sha256.Sum256(data) - hexHashString := hex.EncodeToString(hash[:]) - var dataToSend string - if write == true { - dataToSend = fmt.Sprintf("%s\t%d\t%s", ip, len(data), hexHashString) - dataCh <- dataToSend - } + hash := sha256.Sum256(data) + hexHashString := hex.EncodeToString(hash[:]) + var dataToSend string + if write == true { + dataToSend = fmt.Sprintf("%s\t%d\t%s", ip, len(data), hexHashString) + dataCh <- dataToSend + } fmt.Printf("Published %s to %q (took %v)\n", dataToSend, *topic, elapsed) if *sleep > 0 { @@ -236,28 +235,24 @@ func headHex(b []byte, n int) string { return hex.EncodeToString(b) } - func writeHashToFile(dataCh <-chan string, done chan<- bool, filename string) { - file, err := os.Create(filename) - if err != nil { - log.Fatal(err) - } - defer file.Close() - - writer := bufio.NewWriter(file) - defer writer.Flush() - - // Process until channel is closed - for data := range dataCh { - _, err := writer.WriteString(data + "\n") - if err != nil { - log.Printf("Write error: %v", err) - } - } - done <- true - fmt.Println("All data flushed to disk") - -} + file, err := os.Create(filename) + if err != nil { + log.Fatal(err) + } + defer file.Close() + writer := bufio.NewWriter(file) + defer writer.Flush() + // Process until channel is closed + for data := range dataCh { + _, err := writer.WriteString(data + "\n") + if err != nil { + log.Printf("Write error: %v", err) + } + } + done <- true + fmt.Println("All data flushed to disk") +} diff --git a/grpc_p2p_client/p2p_client_multi_streams_subscribe.go b/grpc_p2p_client/p2p_client_multi_streams_subscribe.go index 1fcdbf5..2e399d2 100644 --- a/grpc_p2p_client/p2p_client_multi_streams_subscribe.go +++ b/grpc_p2p_client/p2p_client_multi_streams_subscribe.go @@ -3,7 +3,7 @@ package main import ( "bufio" "context" - "crypto/sha256" + "crypto/sha256" "encoding/hex" "encoding/json" "flag" @@ -17,7 +17,6 @@ import ( "sync" "sync/atomic" "syscall" - "time" protobuf "p2p_client/grpc" optsub "p2p_client/grpc/mump2p_trace" @@ -47,11 +46,12 @@ const ( ) var ( - topic = flag.String("topic", "", "topic name") - ipfile = flag.String("ipfile", "", "file with a list of IP addresses") - startIdx = flag.Int("start-index", 0, "beginning index is 0: default 0") - endIdx = flag.Int("end-index", 10000, "index-1") - output = flag.String("output", "", "file to write the outgoing data hashes") + topic = flag.String("topic", "", "topic name") + ipfile = flag.String("ipfile", "", "file with a list of IP addresses") + startIdx = flag.Int("start-index", 0, "beginning index is 0: default 0") + endIdx = flag.Int("end-index", 10000, "index-1") + outputTrace = flag.String("output-trace", "", "file to write the outgoing data hashes") + outputData = flag.String("output-data", "", "file to write the outgoing data hashes") ) func main() { @@ -80,37 +80,47 @@ func main() { cancel() }() - - // Buffered channel to prevent blocking - dataCh := make(chan string, 100) - done := make(chan bool) + // Buffered channel to prevent blocking + dataCh := make(chan string, 100) + traceCh := make(chan string, 100) + dataDone := make(chan bool) + traceDone := make(chan bool) // Launch goroutines with synchronization var wg sync.WaitGroup - // Start writing the has of the published data - if *output != "" { - wg.Add(1) - go func() { - defer wg.Done() - go writeHashToFile(ctx, dataCh, done, *output) - }() + if *outputData != "" { + wg.Add(1) + go func() { + defer wg.Done() + go writeToFile(ctx, dataCh, dataDone, *outputData) + }() } + if *outputTrace != "" { + wg.Add(1) + go func() { + defer wg.Done() + go writeToFile(ctx, traceCh, traceDone, *outputTrace) + }() + } for _, ip := range ips { wg.Add(1) go func(ip string) { defer wg.Done() - receiveMessages(ctx, ip, *output!="", dataCh) + receiveMessages(ctx, ip, *outputData != "", dataCh, *outputTrace != "", traceCh) }(ip) } wg.Wait() - close(dataCh) - <- done + close(dataCh) + close(traceCh) + <-dataDone + <-traceDone } -func receiveMessages(ctx context.Context, ip string, write bool, dataCh chan<- string) error { +func receiveMessages(ctx context.Context, ip string, writeData bool, dataCh chan<- string, + writeTrace bool, traceCh chan<- string) error { // connect with simple gRPC settings //fmt.Println("Starting ", ip) select { @@ -183,12 +193,11 @@ func receiveMessages(ctx context.Context, ip string, write bool, dataCh chan<- return nil } go func(resp *protobuf.Response) { - handleResponse(ip, resp, &receivedCount, write, dataCh) + handleResponse(ip, resp, &receivedCount, writeData, dataCh, writeTrace, traceCh) }(resp) } } - return nil } func readIPsFromFile(filename string) ([]string, error) { @@ -217,7 +226,9 @@ func readIPsFromFile(filename string) ([]string, error) { return ips, nil } -func handleResponse(ip string, resp *protobuf.Response, counter *int32, write bool, dataCh chan<- string) { +func handleResponse(ip string, resp *protobuf.Response, counter *int32, + writedata bool, dataCh chan<- string, writetrace bool, traceCh chan<- string) { + switch resp.GetCommand() { case protobuf.ResponseType_Message: var p2pMessage P2PMessage @@ -227,23 +238,24 @@ func handleResponse(ip string, resp *protobuf.Response, counter *int32, write bo } _ = atomic.AddInt32(counter, 1) + hash := sha256.Sum256(p2pMessage.Message) + hexHashString := hex.EncodeToString(hash[:]) - hash := sha256.Sum256(p2pMessage.Message) - hexHashString := hex.EncodeToString(hash[:]) + parts := strings.Split(string(p2pMessage.Message), "-") + if len(parts) > 0 { + publisher := parts[0] + var dataToSend string + if writedata == true { + dataToSend = fmt.Sprintf("%s\t%s\t%d\t%s", ip, publisher, len(p2pMessage.Message), hexHashString) + dataCh <- dataToSend + } + } - parts := strings.Split(string(p2pMessage.Message), "-") - if len(parts) > 0 { - publisher := parts[0] - var dataToSend string - if write == true { - dataToSend = fmt.Sprintf("%s\t%s\t%d\t%s",publisher, ip, len(p2pMessage.Message), hexHashString) - dataCh <- dataToSend - } - } - //fmt.Printf("Recv message: %s %d %s\n", ip, messageSize, string(p2pMessage.Message)) case protobuf.ResponseType_MessageTraceOptimumP2P: - handleOptimumP2PTrace(resp.GetData()) + handleOptimumP2PTrace(resp.GetData(), writetrace, traceCh) + case protobuf.ResponseType_MessageTraceGossipSub: + handleGossipSubTrace(resp.GetData(), writetrace, traceCh) default: log.Println("Unknown response command:", resp.GetCommand()) } @@ -256,31 +268,35 @@ func headHex(b []byte, n int) string { return hex.EncodeToString(b) } -func handleGossipSubTrace(data []byte) { +func handleGossipSubTrace(data []byte, writetrace bool, traceCh chan<- string) { evt := &pubsubpb.TraceEvent{} if err := proto.Unmarshal(data, evt); err != nil { fmt.Printf("[TRACE] GossipSub decode error: %v raw=%dB head=%s\n", err, len(data), headHex(data, 64)) return } + typeStr := optsub.TraceEvent_Type_name[int32(evt.GetType())] + //fmt.Printf("[TRACE] GossipSub type=%s ts=%s size=%dB\n", evt.GetType().String(), ts, len(data)) + //fmt.Printf("[TRACE] GossipSub JSON (%dB): %s\n", len(jb), string(jb)) - ts := time.Unix(0, evt.GetTimestamp()).Format(time.RFC3339Nano) - // print type - fmt.Printf("[TRACE] GossipSub type=%s ts=%s size=%dB\n", evt.GetType().String(), ts, len(data)) jb, _ := json.Marshal(evt) - fmt.Printf("[TRACE] GossipSub JSON (%dB): %s\n", len(jb), string(jb)) + //fmt.Printf("[TRACE] GossipSub JSON message_type=%s, (%dB): %s\n", typeStr, len(jb), string(jb)) + if writetrace { + dataToSend := fmt.Sprintf("[TRACE] GossipSub JSON message_type=%s, (%dB): %s", typeStr, len(jb), string(jb)) + traceCh <- dataToSend + } else { + fmt.Printf("[TRACE] GossipSub JSON message_type=%s, (%dB): %s\n", typeStr, len(jb), string(jb)) + } + } -func handleOptimumP2PTrace(data []byte) { +func handleOptimumP2PTrace(data []byte, writetrace bool, traceCh chan<- string) { evt := &optsub.TraceEvent{} if err := proto.Unmarshal(data, evt); err != nil { fmt.Printf("[TRACE] OptimumP2P decode error: %v\n", err) return } - // human-readable timestamp - //ts := time.Unix(0, evt.GetTimestamp()).Format(time.RFC3339Nano) - // print type typeStr := optsub.TraceEvent_Type_name[int32(evt.GetType())] //fmt.Printf("[TRACE] OptimumP2P type=%s ts=%s size=%dB\n", typeStr, ts, len(data)) @@ -301,60 +317,50 @@ func handleOptimumP2PTrace(data []byte) { } */ - // if shard-related - /* - switch evt.GetType() { - case optsub.TraceEvent_NEW_SHARD: - fmt.Printf(" NEW_SHARD id=%x coeff=%x\n", evt.GetNewShard().GetMessageID(), evt.GetNewShard().GetCoefficients()) - case optsub.TraceEvent_DUPLICATE_SHARD: - fmt.Printf(" DUPLICATE_SHARD id=%x\n", evt.GetDuplicateShard().GetMessageID()) - case optsub.TraceEvent_UNHELPFUL_SHARD: - fmt.Printf(" UNHELPFUL_SHARD id=%x\n", evt.GetUnhelpfulShard().GetMessageID()) - case optsub.TraceEvent_UNNECESSARY_SHARD: - fmt.Printf(" UNNECESSARY_SHARD id=%x\n", evt.GetUnnecessaryShard().GetMessageID()) - } - */ - jb, _ := json.Marshal(evt) - fmt.Printf("[TRACE] OptimumP2P JSON message_type=%s, (%dB): %s\n", typeStr, len(jb), string(jb)) + if writetrace { + dataToSend := fmt.Sprintf("[TRACE] OptimumP2P JSON message_type=%s, (%dB): %s", typeStr, len(jb), string(jb)) + traceCh <- dataToSend + } else { + fmt.Printf("[TRACE] OptimumP2P JSON message_type=%s, (%dB): %s\n", typeStr, len(jb), string(jb)) + + } /* message_type <- systems information message_id <- application layer - time_stamp <- event occuring the event publish, new shard, duplicate shard - receiver_id - sender_id + time_stamp <- event occuring the event publish, new shard, duplicate shard + receiver_id + sender_id */ } -func writeHashToFile(ctx context.Context, dataCh <-chan string, done chan<- bool, filename string) { - file, err := os.Create(filename) - if err != nil { - log.Fatal(err) - } - defer file.Close() - - writer := bufio.NewWriter(file) - defer writer.Flush() - - // Process until channel is closed - for data := range dataCh { - select { - case <-ctx.Done(): - return - default: - - } - _, err := writer.WriteString(data + "\n") - if err != nil { - log.Printf("Write error: %v", err) - } - } - done <- true - fmt.Println("All data flushed to disk") -} +func writeToFile(ctx context.Context, dataCh <-chan string, done chan<- bool, filename string) { + file, err := os.Create(filename) + if err != nil { + log.Fatal(err) + } + defer file.Close() + writer := bufio.NewWriter(file) + defer writer.Flush() + // Process until channel is closed + for data := range dataCh { + select { + case <-ctx.Done(): + return + default: + } + _, err := writer.WriteString(data + "\n") + writer.Flush() + if err != nil { + log.Printf("Write error: %v", err) + } + } + done <- true + fmt.Println("All data flushed to disk") +} From 56f32070997a4e48a422c4b5e05d6912aa29b8d4 Mon Sep 17 00:00:00 2001 From: kishori82 Date: Thu, 30 Oct 2025 14:51:37 -0400 Subject: [PATCH 07/19] added the recrieval --- .../p2p_client_multi_streams_subscribe.go | 54 +++++++++++++++++-- 1 file changed, 50 insertions(+), 4 deletions(-) diff --git a/grpc_p2p_client/p2p_client_multi_streams_subscribe.go b/grpc_p2p_client/p2p_client_multi_streams_subscribe.go index 2e399d2..fbb0cf5 100644 --- a/grpc_p2p_client/p2p_client_multi_streams_subscribe.go +++ b/grpc_p2p_client/p2p_client_multi_streams_subscribe.go @@ -3,6 +3,7 @@ package main import ( "bufio" "context" + "github.com/mr-tron/base58" "crypto/sha256" "encoding/hex" "encoding/json" @@ -20,7 +21,7 @@ import ( protobuf "p2p_client/grpc" optsub "p2p_client/grpc/mump2p_trace" - + "github.com/libp2p/go-libp2p/core/peer" "github.com/gogo/protobuf/proto" pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb" "google.golang.org/grpc" @@ -279,13 +280,53 @@ func handleGossipSubTrace(data []byte, writetrace bool, traceCh chan<- string) { //fmt.Printf("[TRACE] GossipSub type=%s ts=%s size=%dB\n", evt.GetType().String(), ts, len(data)) //fmt.Printf("[TRACE] GossipSub JSON (%dB): %s\n", len(jb), string(jb)) - jb, _ := json.Marshal(evt) + rawBytes := []byte{} + var peerID peer.ID + if evt.PeerID != nil { + rawBytes := []byte(evt.PeerID) + peerID = peer.ID(rawBytes) + // fmt.Printf("peerID: %s\n", peerID) + } + + recvID := "" + if evt.DeliverMessage != nil && evt.DeliverMessage.ReceivedFrom != nil { + rawBytes = []byte(evt.DeliverMessage.ReceivedFrom) + recvID = base58.Encode(rawBytes) + // fmt.Printf("Receiv: %s\n", recvID) + } + + msgID := "" + topic := "" + if evt.DeliverMessage != nil { + rawBytes = []byte(evt.DeliverMessage.MessageID) + msgID = base58.Encode(rawBytes) + // fmt.Printf("MsgID: %s\n", msgID) + topic = string(*evt.DeliverMessage.Topic) + //fmt.Printf("Topic: %q\n", topic) + } + if evt.PublishMessage != nil { + rawBytes = []byte(evt.PublishMessage.MessageID) + msgID = base58.Encode(rawBytes) + //fmt.Printf("MsgID: %s\n", msgID) + topic = string(*evt.PublishMessage.Topic) + //fmt.Printf("Topic: %q\n", topic) + } + + timestamp:= int64(0) + if evt.Timestamp != nil { + timestamp= *evt.Timestamp + // fmt.Printf("Timestamp: %d\n", timestamp) + } + + //jb, _ := json.Marshal(evt) //fmt.Printf("[TRACE] GossipSub JSON message_type=%s, (%dB): %s\n", typeStr, len(jb), string(jb)) if writetrace { - dataToSend := fmt.Sprintf("[TRACE] GossipSub JSON message_type=%s, (%dB): %s", typeStr, len(jb), string(jb)) + //dataToSend := fmt.Sprintf("[TRACE] GossipSub JSON message_type=%s, (%dB): %s", typeStr, len(jb), string(jb)) + dataToSend :=fmt.Sprintf("%s\t%s\t%s\t%s\t%s\t%d", typeStr, peerID, recvID, msgID, topic, timestamp) traceCh <- dataToSend } else { - fmt.Printf("[TRACE] GossipSub JSON message_type=%s, (%dB): %s\n", typeStr, len(jb), string(jb)) + //fmt.Printf("[TRACE] GossipSub JSON message_type=%s, (%dB): %s\n", typeStr, len(jb), string(jb)) + fmt.Print("%s\t%s\t%s\t%s\t%s\t%d\n", typeStr, peerID, recvID, msgID, topic, timestamp) } } @@ -317,7 +358,12 @@ func handleOptimumP2PTrace(data []byte, writetrace bool, traceCh chan<- string) } */ + if evt.PeerID != nil { + fmt.Printf("PeerID: %s\n", string(evt.PeerID)) + } + jb, _ := json.Marshal(evt) + if writetrace { dataToSend := fmt.Sprintf("[TRACE] OptimumP2P JSON message_type=%s, (%dB): %s", typeStr, len(jb), string(jb)) traceCh <- dataToSend From 9ac506ab1ca61103ff5883aafe2aac374d8cd92b Mon Sep 17 00:00:00 2001 From: kishori82 Date: Sun, 2 Nov 2025 17:50:17 -0500 Subject: [PATCH 08/19] trace formatting --- .../p2p_client_multi_streams_publish.go | 13 ++- .../p2p_client_multi_streams_subscribe.go | 82 +++++++++++++++++-- 2 files changed, 84 insertions(+), 11 deletions(-) diff --git a/grpc_p2p_client/p2p_client_multi_streams_publish.go b/grpc_p2p_client/p2p_client_multi_streams_publish.go index 1432634..239d69c 100644 --- a/grpc_p2p_client/p2p_client_multi_streams_publish.go +++ b/grpc_p2p_client/p2p_client_multi_streams_publish.go @@ -92,7 +92,8 @@ func main() { wg.Add(1) go func() { defer wg.Done() - go writeHashToFile(dataCh, done, *output) + header := fmt.Sprintf("sender\tsize\tsha256(msg)") + go writeHashToFile(dataCh, done, *output, header) }() } @@ -235,7 +236,7 @@ func headHex(b []byte, n int) string { return hex.EncodeToString(b) } -func writeHashToFile(dataCh <-chan string, done chan<- bool, filename string) { +func writeHashToFile(dataCh <-chan string, done chan<- bool, filename string, header string) { file, err := os.Create(filename) if err != nil { log.Fatal(err) @@ -245,6 +246,14 @@ func writeHashToFile(dataCh <-chan string, done chan<- bool, filename string) { writer := bufio.NewWriter(file) defer writer.Flush() + // write the header + if header != "" { + _, err := writer.WriteString(header + "\n") + if err != nil { + log.Printf("Write error: %v", err) + } + } + // Process until channel is closed for data := range dataCh { _, err := writer.WriteString(data + "\n") diff --git a/grpc_p2p_client/p2p_client_multi_streams_subscribe.go b/grpc_p2p_client/p2p_client_multi_streams_subscribe.go index fbb0cf5..477bab7 100644 --- a/grpc_p2p_client/p2p_client_multi_streams_subscribe.go +++ b/grpc_p2p_client/p2p_client_multi_streams_subscribe.go @@ -93,7 +93,8 @@ func main() { wg.Add(1) go func() { defer wg.Done() - go writeToFile(ctx, dataCh, dataDone, *outputData) + header := fmt.Sprintf("receiver\tsender\tsize\tsha256(msg)") + go writeToFile(ctx, dataCh, dataDone, *outputData, header) }() } @@ -101,7 +102,8 @@ func main() { wg.Add(1) go func() { defer wg.Done() - go writeToFile(ctx, traceCh, traceDone, *outputTrace) + header := "" //fmt.Sprintf("sender\tsize\tsha256(msg)") + go writeToFile(ctx, traceCh, traceDone, *outputTrace, header) }() } @@ -358,20 +360,74 @@ func handleOptimumP2PTrace(data []byte, writetrace bool, traceCh chan<- string) } */ - if evt.PeerID != nil { + /*if evt.PeerID != nil { fmt.Printf("PeerID: %s\n", string(evt.PeerID)) } + */ jb, _ := json.Marshal(evt) + rawBytes := []byte{} + var peerID peer.ID + if evt.PeerID != nil { + rawBytes := []byte(evt.PeerID) + peerID = peer.ID(rawBytes) + // fmt.Printf("peerID: %s\n", peerID) + } + + recvID := "" + if evt.DeliverMessage != nil && evt.DeliverMessage.ReceivedFrom != nil { + rawBytes = []byte(evt.DeliverMessage.ReceivedFrom) + recvID = base58.Encode(rawBytes) + // fmt.Printf("Receiv: %s\n", recvID) + } + + if evt.NewShard != nil && evt.NewShard.ReceivedFrom != nil { + rawBytes = []byte(evt.NewShard.ReceivedFrom) + recvID = base58.Encode(rawBytes) + // fmt.Printf("Receiv: %s\n", recvID) + } + + + msgID := "" + topic := "" + if evt.DeliverMessage != nil { + rawBytes = []byte(evt.DeliverMessage.MessageID) + msgID = base58.Encode(rawBytes) + // fmt.Printf("MsgID: %s\n", msgID) + topic = string(*evt.DeliverMessage.Topic) + //fmt.Printf("Topic: %q\n", topic) + } + if evt.PublishMessage != nil { + rawBytes = []byte(evt.PublishMessage.MessageID) + msgID = base58.Encode(rawBytes) + //fmt.Printf("MsgID: %s\n", msgID) + topic = string(*evt.PublishMessage.Topic) + //fmt.Printf("Topic: %q\n", topic) + } + if evt.NewShard != nil { + rawBytes = []byte(evt.NewShard.MessageID) + msgID = base58.Encode(rawBytes) + //fmt.Printf("MsgID: %s\n", msgID) + //fmt.Printf("Topic: %q\n", topic) + } + + timestamp:= int64(0) + if evt.Timestamp != nil { + timestamp= *evt.Timestamp + // fmt.Printf("Timestamp: %d\n", timestamp) + } + + if writetrace { - dataToSend := fmt.Sprintf("[TRACE] OptimumP2P JSON message_type=%s, (%dB): %s", typeStr, len(jb), string(jb)) - traceCh <- dataToSend + //dataToSend := fmt.Sprintf("[TRACE] OptimumP2P JSON message_type=%s, (%dB): %s", typeStr, len(jb), string(jb)) + fmt.Printf("[TRACE] OptimumP2P JSON message_type=%s, (%dB): %s\n", typeStr, len(jb), string(jb)) + dataToSend :=fmt.Sprintf("%s\t%s\t%s\t%s\t%s\t%d", typeStr, peerID, recvID, msgID, topic, timestamp) + traceCh <- dataToSend } else { - fmt.Printf("[TRACE] OptimumP2P JSON message_type=%s, (%dB): %s\n", typeStr, len(jb), string(jb)) - + //fmt.Printf("[TRACE] OptimumP2P JSON message_type=%s, (%dB): %s\n", typeStr, len(jb), string(jb)) + fmt.Print("%s\t%s\t%s\t%s\t%s\t%d\n", typeStr, peerID, recvID, msgID, topic, timestamp) } - /* message_type <- systems information message_id <- application layer @@ -383,7 +439,7 @@ func handleOptimumP2PTrace(data []byte, writetrace bool, traceCh chan<- string) } -func writeToFile(ctx context.Context, dataCh <-chan string, done chan<- bool, filename string) { +func writeToFile(ctx context.Context, dataCh <-chan string, done chan<- bool, filename string, header string) { file, err := os.Create(filename) if err != nil { log.Fatal(err) @@ -393,6 +449,14 @@ func writeToFile(ctx context.Context, dataCh <-chan string, done chan<- bool, fi writer := bufio.NewWriter(file) defer writer.Flush() + // write the header + if header != "" { + _, err := writer.WriteString(header + "\n") + if err != nil { + log.Printf("Write error: %v", err) + } + } + // Process until channel is closed for data := range dataCh { select { From 42157b5384b2b43adc3795a63b0c020b4df61016 Mon Sep 17 00:00:00 2001 From: kishori82 Date: Fri, 14 Nov 2025 13:22:02 -0500 Subject: [PATCH 09/19] added the multi subscription ready --- grpc_p2p_client/p2p_client.go | 1 - grpc_p2p_client/p2p_client_multi_streams_subscribe.go | 7 +++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/grpc_p2p_client/p2p_client.go b/grpc_p2p_client/p2p_client.go index 648905b..a75a880 100644 --- a/grpc_p2p_client/p2p_client.go +++ b/grpc_p2p_client/p2p_client.go @@ -194,7 +194,6 @@ func handleResponse(resp *protobuf.Response, counter *int32) { log.Printf("Error unmarshalling message: %v", err) return } - messageSize := len(p2pMessage.Message) //fmt.Printf("Recv message: [%d] [%d %d] %s\n\n",n, currentTime, messageSize, string(p2pMessage.Message)[0:100]) diff --git a/grpc_p2p_client/p2p_client_multi_streams_subscribe.go b/grpc_p2p_client/p2p_client_multi_streams_subscribe.go index 477bab7..47f46ea 100644 --- a/grpc_p2p_client/p2p_client_multi_streams_subscribe.go +++ b/grpc_p2p_client/p2p_client_multi_streams_subscribe.go @@ -365,8 +365,6 @@ func handleOptimumP2PTrace(data []byte, writetrace bool, traceCh chan<- string) } */ - jb, _ := json.Marshal(evt) - rawBytes := []byte{} var peerID peer.ID if evt.PeerID != nil { @@ -388,7 +386,6 @@ func handleOptimumP2PTrace(data []byte, writetrace bool, traceCh chan<- string) // fmt.Printf("Receiv: %s\n", recvID) } - msgID := "" topic := "" if evt.DeliverMessage != nil { @@ -419,9 +416,11 @@ func handleOptimumP2PTrace(data []byte, writetrace bool, traceCh chan<- string) } + //jb, _ := json.Marshal(evt) + if writetrace { //dataToSend := fmt.Sprintf("[TRACE] OptimumP2P JSON message_type=%s, (%dB): %s", typeStr, len(jb), string(jb)) - fmt.Printf("[TRACE] OptimumP2P JSON message_type=%s, (%dB): %s\n", typeStr, len(jb), string(jb)) +// fmt.Printf("[TRACE] OptimumP2P JSON message_type=%s, (%dB): %s\n", typeStr, len(jb), string(jb)) dataToSend :=fmt.Sprintf("%s\t%s\t%s\t%s\t%s\t%d", typeStr, peerID, recvID, msgID, topic, timestamp) traceCh <- dataToSend } else { From 1cf236a0407f3d86b7114cef476aac437c0ff151 Mon Sep 17 00:00:00 2001 From: kishori82 Date: Fri, 14 Nov 2025 13:40:28 -0500 Subject: [PATCH 10/19] fixed the datasize issue --- grpc_p2p_client/p2p_client_multi_streams_publish.go | 1 + 1 file changed, 1 insertion(+) diff --git a/grpc_p2p_client/p2p_client_multi_streams_publish.go b/grpc_p2p_client/p2p_client_multi_streams_publish.go index 239d69c..0c6c7f0 100644 --- a/grpc_p2p_client/p2p_client_multi_streams_publish.go +++ b/grpc_p2p_client/p2p_client_multi_streams_publish.go @@ -85,6 +85,7 @@ func main() { // Buffered channel to prevent blocking dataCh := make(chan string, 100) done := make(chan bool) + *dataSize = int(float32(*dataSize)/2.0) var wg sync.WaitGroup // Start writing the has of the published data From 88a8ba95eacb88ffd9f85b51273b7b3105fc5815 Mon Sep 17 00:00:00 2001 From: kishori82 Date: Fri, 14 Nov 2025 14:40:36 -0500 Subject: [PATCH 11/19] fixed the client stream and size issue of the option --- grpc_p2p_client/p2p_client_multi_streams_publish.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/grpc_p2p_client/p2p_client_multi_streams_publish.go b/grpc_p2p_client/p2p_client_multi_streams_publish.go index 0c6c7f0..f99a0ce 100644 --- a/grpc_p2p_client/p2p_client_multi_streams_publish.go +++ b/grpc_p2p_client/p2p_client_multi_streams_publish.go @@ -122,6 +122,7 @@ func sendMessages(ctx context.Context, ip string, datasize int, write bool, data default: } + for i := 0; i < *count; i++ { conn, err := grpc.NewClient(ip, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions( @@ -132,7 +133,7 @@ func sendMessages(ctx context.Context, ip string, datasize int, write bool, data if err != nil { log.Fatalf("failed to connect to node %v", err) } - defer conn.Close() + //defer conn.Close() println(fmt.Sprintf("Connected to node at: %s…", ip)) client := protobuf.NewCommandStreamClient(conn) @@ -143,7 +144,7 @@ func sendMessages(ctx context.Context, ip string, datasize int, write bool, data log.Fatalf("ListenCommands: %v", err) } - for i := 0; i < *count; i++ { + //for i := 0; i < *count; i++ { start := time.Now() var data []byte //currentTime := time.Now().UnixNano() @@ -163,6 +164,7 @@ func sendMessages(ctx context.Context, ip string, datasize int, write bool, data if err := stream.Send(pubReq); err != nil { log.Fatalf("send publish: %v", err) } + fmt.Printf("Published data size %d\n", len(data)) elapsed := time.Since(start) @@ -178,6 +180,7 @@ func sendMessages(ctx context.Context, ip string, datasize int, write bool, data if *sleep > 0 { time.Sleep(*sleep) } + conn.Close() } return nil From 178937cabda8b1b8368efa5a82bba8416dd4bd0e Mon Sep 17 00:00:00 2001 From: santiago Date: Fri, 14 Nov 2025 16:16:57 -0500 Subject: [PATCH 12/19] fix ci errors --- grpc_p2p_client/p2p_client.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/grpc_p2p_client/p2p_client.go b/grpc_p2p_client/p2p_client.go index 9dc8360..0a6cdd8 100644 --- a/grpc_p2p_client/p2p_client.go +++ b/grpc_p2p_client/p2p_client.go @@ -195,13 +195,16 @@ func handleResponse(resp *protobuf.Response, counter *int32) { return } messageSize := len(p2pMessage.Message) + n := atomic.AddInt32(counter, 1) //fmt.Printf("Recv message: [%d] [%d %d] %s\n\n",n, currentTime, messageSize, string(p2pMessage.Message)[0:100]) - fmt.Printf("Recv message: [%d] [%d %d] %s\n\n", n, currentTime, messageSize, string(p2pMessage.Message)) + fmt.Printf("Recv message: [%d] [%d] %s\n\n", n, messageSize, string(p2pMessage.Message)) case protobuf.ResponseType_MessageTraceGossipSub: - handleGossipSubTrace(resp.GetData()) + // Note: These trace handlers are not implemented in this file + log.Printf("GossipSub trace received but handler not implemented") case protobuf.ResponseType_MessageTraceMumP2P: - handleOptimumP2PTrace(resp.GetData()) + // Note: These trace handlers are not implemented in this file + log.Printf("MumP2P trace received but handler not implemented") case protobuf.ResponseType_Unknown: default: log.Println("Unknown response command:", resp.GetCommand()) From f1b66e28fa7dfe1f85e00dbe5cdc8e340bf3bc47 Mon Sep 17 00:00:00 2001 From: santiago Date: Fri, 14 Nov 2025 16:24:27 -0500 Subject: [PATCH 13/19] update ci --- .github/workflows/ci.yml | 2 +- test_suite.sh | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7760992..3eb1e5e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -71,7 +71,7 @@ jobs: uses: golangci/golangci-lint-action@v7 with: version: ${{ env.lint-version }} - args: --timeout 10m --skip-dirs-use-default + args: --timeout 10m only-new-issues: true working-directory: ./ continue-on-error: true diff --git a/test_suite.sh b/test_suite.sh index 6d5be3a..b8e2258 100755 --- a/test_suite.sh +++ b/test_suite.sh @@ -120,7 +120,7 @@ echo # Test 2: Subscribe (empty topic) echo -e "${YELLOW}Test: Subscribe (empty topic)${NC}" resp=$(api_subscribe "$CLIENT_ID" "" 0.7) -test_result "$resp" 'topic is missing' "Subscribe (empty topic)" +test_result "$resp" 'missing topic' "Subscribe (empty topic)" echo # Test 3: Publish (valid) - but first ensure subscription exists @@ -230,7 +230,7 @@ echo echo -e "${YELLOW}Test: Subscribe validation (empty topic)${NC}" resp=$(api_subscribe "$CLIENT_ID" "" 0.7) -test_result "$resp" 'topic is missing' "Subscribe validation (empty topic)" +test_result "$resp" 'missing topic' "Subscribe validation (empty topic)" echo echo -e "${YELLOW}Test: Publish validation (empty message)${NC}" @@ -242,7 +242,7 @@ echo -e "${YELLOW}Test: Publish validation (missing topic)${NC}" resp=$(curl -s -X POST "$PROXY_URL/api/v1/publish" \ -H "Content-Type: application/json" \ -d "{\"client_id\": \"$CLIENT_ID\", \"message\": \"Hello\"}") -test_result "$resp" 'topic is missing' "Publish validation (missing topic)" +test_result "$resp" 'missing topic' "Publish validation (missing topic)" echo echo -e "${YELLOW}Test: Publish validation (missing message)${NC}" From f040b3aa3eb11aee6360f4173f563846a852a140 Mon Sep 17 00:00:00 2001 From: kishori82 Date: Sat, 15 Nov 2025 11:08:32 -0500 Subject: [PATCH 14/19] fix code rabbits suggestions --- .../p2p_client_multi_streams_publish.go | 72 ++++++++++--------- .../p2p_client_multi_streams_subscribe.go | 10 ++- 2 files changed, 43 insertions(+), 39 deletions(-) diff --git a/grpc_p2p_client/p2p_client_multi_streams_publish.go b/grpc_p2p_client/p2p_client_multi_streams_publish.go index f99a0ce..c95aaa8 100644 --- a/grpc_p2p_client/p2p_client_multi_streams_publish.go +++ b/grpc_p2p_client/p2p_client_multi_streams_publish.go @@ -4,6 +4,7 @@ import ( "bufio" "context" "crypto/rand" + mathrand "math/rand" "crypto/sha256" "encoding/hex" "encoding/json" @@ -47,6 +48,7 @@ var ( // optional: number of messages to publish (for stress testing or batch sending) count = flag.Int("count", 1, "number of messages to publish") + poisson = flag.Bool("poisson", false, "Enable Poisson arrival") dataSize = flag.Int("datasize", 100, "size of random of messages to publish") // optional: sleep duration between publishes sleep = flag.Duration("sleep", 50*time.Millisecond, "optional delay between publishes (e.g., 1s, 500ms)") @@ -85,15 +87,13 @@ func main() { // Buffered channel to prevent blocking dataCh := make(chan string, 100) done := make(chan bool) - *dataSize = int(float32(*dataSize)/2.0) + *dataSize = int(float32(*dataSize) / 2.0) var wg sync.WaitGroup // Start writing the has of the published data if *output != "" { - wg.Add(1) go func() { - defer wg.Done() - header := fmt.Sprintf("sender\tsize\tsha256(msg)") + header := fmt.Sprintf("sender\tsize\tsha256(msg)") go writeHashToFile(dataCh, done, *output, header) }() } @@ -123,28 +123,27 @@ func sendMessages(ctx context.Context, ip string, datasize int, write bool, data } for i := 0; i < *count; i++ { - conn, err := grpc.NewClient(ip, - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithDefaultCallOptions( - grpc.MaxCallRecvMsgSize(math.MaxInt), - grpc.MaxCallSendMsgSize(math.MaxInt), - ), - ) - if err != nil { - log.Fatalf("failed to connect to node %v", err) - } - //defer conn.Close() - println(fmt.Sprintf("Connected to node at: %s…", ip)) + conn, err := grpc.NewClient(ip, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(math.MaxInt), + grpc.MaxCallSendMsgSize(math.MaxInt), + ), + ) + if err != nil { + log.Fatalf("failed to connect to node %v", err) + } + //defer conn.Close() + println(fmt.Sprintf("Connected to node at: %s…", ip)) - client := protobuf.NewCommandStreamClient(conn) + client := protobuf.NewCommandStreamClient(conn) - stream, err := client.ListenCommands(ctx) + stream, err := client.ListenCommands(ctx) - if err != nil { - log.Fatalf("ListenCommands: %v", err) - } + if err != nil { + log.Fatalf("ListenCommands: %v", err) + } - //for i := 0; i < *count; i++ { start := time.Now() var data []byte //currentTime := time.Now().UnixNano() @@ -177,10 +176,17 @@ func sendMessages(ctx context.Context, ip string, datasize int, write bool, data } fmt.Printf("Published %s to %q (took %v)\n", dataToSend, *topic, elapsed) - if *sleep > 0 { - time.Sleep(*sleep) - } - conn.Close() + if *poisson { + lambda := 1.0 / (*sleep).Seconds() + interval := mathrand.ExpFloat64() / lambda + waitTime := time.Duration(interval * float64(time.Second)) + time.Sleep(waitTime) + } else { + time.Sleep(*sleep) + + } + + conn.Close() } return nil @@ -250,13 +256,13 @@ func writeHashToFile(dataCh <-chan string, done chan<- bool, filename string, he writer := bufio.NewWriter(file) defer writer.Flush() - // write the header - if header != "" { - _, err := writer.WriteString(header + "\n") - if err != nil { - log.Printf("Write error: %v", err) - } - } + // write the header + if header != "" { + _, err := writer.WriteString(header + "\n") + if err != nil { + log.Printf("Write error: %v", err) + } + } // Process until channel is closed for data := range dataCh { diff --git a/grpc_p2p_client/p2p_client_multi_streams_subscribe.go b/grpc_p2p_client/p2p_client_multi_streams_subscribe.go index 47f46ea..35b181f 100644 --- a/grpc_p2p_client/p2p_client_multi_streams_subscribe.go +++ b/grpc_p2p_client/p2p_client_multi_streams_subscribe.go @@ -90,27 +90,25 @@ func main() { // Launch goroutines with synchronization var wg sync.WaitGroup if *outputData != "" { - wg.Add(1) go func() { - defer wg.Done() + defer wg.Done() header := fmt.Sprintf("receiver\tsender\tsize\tsha256(msg)") go writeToFile(ctx, dataCh, dataDone, *outputData, header) }() } if *outputTrace != "" { - wg.Add(1) go func() { - defer wg.Done() + defer wg.Done() header := "" //fmt.Sprintf("sender\tsize\tsha256(msg)") go writeToFile(ctx, traceCh, traceDone, *outputTrace, header) }() } for _, ip := range ips { - wg.Add(1) + wg.Add(1); go func(ip string) { - defer wg.Done() + defer wg.Done() receiveMessages(ctx, ip, *outputData != "", dataCh, *outputTrace != "", traceCh) }(ip) } From c8659a16dc140e3dc08af6fb5aab3cf7166184fa Mon Sep 17 00:00:00 2001 From: kishori82 Date: Sat, 15 Nov 2025 11:32:38 -0500 Subject: [PATCH 15/19] fixed the protofub field change issue --- grpc_p2p_client/go.mod | 19 +- grpc_p2p_client/local.ips.tsv | 951 ++++++++++++++++++ .../p2p_client_multi_streams_subscribe.go | 2 +- 3 files changed, 970 insertions(+), 2 deletions(-) diff --git a/grpc_p2p_client/go.mod b/grpc_p2p_client/go.mod index d373e8e..15a190f 100644 --- a/grpc_p2p_client/go.mod +++ b/grpc_p2p_client/go.mod @@ -4,15 +4,32 @@ go 1.24.1 require ( github.com/gogo/protobuf v1.3.2 - github.com/golang/protobuf v1.5.4 + github.com/libp2p/go-libp2p v0.39.1 github.com/libp2p/go-libp2p-pubsub v0.14.2 + github.com/mr-tron/base58 v1.2.0 google.golang.org/grpc v1.73.0 google.golang.org/protobuf v1.36.6 ) require ( + github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect + github.com/ipfs/go-cid v0.5.0 // indirect + github.com/klauspost/cpuid/v2 v2.2.9 // indirect + github.com/libp2p/go-buffer-pool v0.1.0 // indirect + github.com/minio/sha256-simd v1.0.1 // indirect + github.com/multiformats/go-base32 v0.1.0 // indirect + github.com/multiformats/go-base36 v0.2.0 // indirect + github.com/multiformats/go-multiaddr v0.14.0 // indirect + github.com/multiformats/go-multibase v0.2.0 // indirect + github.com/multiformats/go-multicodec v0.9.0 // indirect + github.com/multiformats/go-multihash v0.2.3 // indirect + github.com/multiformats/go-varint v0.0.7 // indirect + github.com/spaolacci/murmur3 v1.1.0 // indirect + golang.org/x/crypto v0.36.0 // indirect + golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c // indirect golang.org/x/net v0.38.0 // indirect golang.org/x/sys v0.31.0 // indirect golang.org/x/text v0.23.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463 // indirect + lukechampine.com/blake3 v1.3.0 // indirect ) diff --git a/grpc_p2p_client/local.ips.tsv b/grpc_p2p_client/local.ips.tsv index 75b0f3a..c06dbd9 100644 --- a/grpc_p2p_client/local.ips.tsv +++ b/grpc_p2p_client/local.ips.tsv @@ -48,3 +48,954 @@ localhost:33258 localhost:33259 localhost:33260 localhost:33261 +localhost:33262 +localhost:33263 +localhost:33264 +localhost:33265 +localhost:33266 +localhost:33267 +localhost:33268 +localhost:33269 +localhost:33270 +localhost:33271 +localhost:33272 +localhost:33273 +localhost:33274 +localhost:33275 +localhost:33276 +localhost:33277 +localhost:33278 +localhost:33279 +localhost:33280 +localhost:33281 +localhost:33282 +localhost:33283 +localhost:33284 +localhost:33285 +localhost:33286 +localhost:33287 +localhost:33288 +localhost:33289 +localhost:33290 +localhost:33291 +localhost:33292 +localhost:33293 +localhost:33294 +localhost:33295 +localhost:33296 +localhost:33297 +localhost:33298 +localhost:33299 +localhost:33300 +localhost:33301 +localhost:33302 +localhost:33303 +localhost:33304 +localhost:33305 +localhost:33306 +localhost:33307 +localhost:33308 +localhost:33309 +localhost:33310 +localhost:33311 +localhost:33312 +localhost:33313 +localhost:33314 +localhost:33315 +localhost:33316 +localhost:33317 +localhost:33318 +localhost:33319 +localhost:33320 +localhost:33321 +localhost:33322 +localhost:33323 +localhost:33324 +localhost:33325 +localhost:33326 +localhost:33327 +localhost:33328 +localhost:33329 +localhost:33330 +localhost:33331 +localhost:33332 +localhost:33333 +localhost:33334 +localhost:33335 +localhost:33336 +localhost:33337 +localhost:33338 +localhost:33339 +localhost:33340 +localhost:33341 +localhost:33342 +localhost:33343 +localhost:33344 +localhost:33345 +localhost:33346 +localhost:33347 +localhost:33348 +localhost:33349 +localhost:33350 +localhost:33351 +localhost:33352 +localhost:33353 +localhost:33354 +localhost:33355 +localhost:33356 +localhost:33357 +localhost:33358 +localhost:33359 +localhost:33360 +localhost:33361 +localhost:33362 +localhost:33363 +localhost:33364 +localhost:33365 +localhost:33366 +localhost:33367 +localhost:33368 +localhost:33369 +localhost:33370 +localhost:33371 +localhost:33372 +localhost:33373 +localhost:33374 +localhost:33375 +localhost:33376 +localhost:33377 +localhost:33378 +localhost:33379 +localhost:33380 +localhost:33381 +localhost:33382 +localhost:33383 +localhost:33384 +localhost:33385 +localhost:33386 +localhost:33387 +localhost:33388 +localhost:33389 +localhost:33390 +localhost:33391 +localhost:33392 +localhost:33393 +localhost:33394 +localhost:33395 +localhost:33396 +localhost:33397 +localhost:33398 +localhost:33399 +localhost:33400 +localhost:33401 +localhost:33402 +localhost:33403 +localhost:33404 +localhost:33405 +localhost:33406 +localhost:33407 +localhost:33408 +localhost:33409 +localhost:33410 +localhost:33411 +localhost:33412 +localhost:33413 +localhost:33414 +localhost:33415 +localhost:33416 +localhost:33417 +localhost:33418 +localhost:33419 +localhost:33420 +localhost:33421 +localhost:33422 +localhost:33423 +localhost:33424 +localhost:33425 +localhost:33426 +localhost:33427 +localhost:33428 +localhost:33429 +localhost:33430 +localhost:33431 +localhost:33432 +localhost:33433 +localhost:33434 +localhost:33435 +localhost:33436 +localhost:33437 +localhost:33438 +localhost:33439 +localhost:33440 +localhost:33441 +localhost:33442 +localhost:33443 +localhost:33444 +localhost:33445 +localhost:33446 +localhost:33447 +localhost:33448 +localhost:33449 +localhost:33450 +localhost:33451 +localhost:33452 +localhost:33453 +localhost:33454 +localhost:33455 +localhost:33456 +localhost:33457 +localhost:33458 +localhost:33459 +localhost:33460 +localhost:33461 +localhost:33462 +localhost:33463 +localhost:33464 +localhost:33465 +localhost:33466 +localhost:33467 +localhost:33468 +localhost:33469 +localhost:33470 +localhost:33471 +localhost:33472 +localhost:33473 +localhost:33474 +localhost:33475 +localhost:33476 +localhost:33477 +localhost:33478 +localhost:33479 +localhost:33480 +localhost:33481 +localhost:33482 +localhost:33483 +localhost:33484 +localhost:33485 +localhost:33486 +localhost:33487 +localhost:33488 +localhost:33489 +localhost:33490 +localhost:33491 +localhost:33492 +localhost:33493 +localhost:33494 +localhost:33495 +localhost:33496 +localhost:33497 +localhost:33498 +localhost:33499 +localhost:33500 +localhost:33501 +localhost:33502 +localhost:33503 +localhost:33504 +localhost:33505 +localhost:33506 +localhost:33507 +localhost:33508 +localhost:33509 +localhost:33510 +localhost:33511 +localhost:33512 +localhost:33513 +localhost:33514 +localhost:33515 +localhost:33516 +localhost:33517 +localhost:33518 +localhost:33519 +localhost:33520 +localhost:33521 +localhost:33522 +localhost:33523 +localhost:33524 +localhost:33525 +localhost:33526 +localhost:33527 +localhost:33528 +localhost:33529 +localhost:33530 +localhost:33531 +localhost:33532 +localhost:33533 +localhost:33534 +localhost:33535 +localhost:33536 +localhost:33537 +localhost:33538 +localhost:33539 +localhost:33540 +localhost:33541 +localhost:33542 +localhost:33543 +localhost:33544 +localhost:33545 +localhost:33546 +localhost:33547 +localhost:33548 +localhost:33549 +localhost:33550 +localhost:33551 +localhost:33552 +localhost:33553 +localhost:33554 +localhost:33555 +localhost:33556 +localhost:33557 +localhost:33558 +localhost:33559 +localhost:33560 +localhost:33561 +localhost:33562 +localhost:33563 +localhost:33564 +localhost:33565 +localhost:33566 +localhost:33567 +localhost:33568 +localhost:33569 +localhost:33570 +localhost:33571 +localhost:33572 +localhost:33573 +localhost:33574 +localhost:33575 +localhost:33576 +localhost:33577 +localhost:33578 +localhost:33579 +localhost:33580 +localhost:33581 +localhost:33582 +localhost:33583 +localhost:33584 +localhost:33585 +localhost:33586 +localhost:33587 +localhost:33588 +localhost:33589 +localhost:33590 +localhost:33591 +localhost:33592 +localhost:33593 +localhost:33594 +localhost:33595 +localhost:33596 +localhost:33597 +localhost:33598 +localhost:33599 +localhost:33600 +localhost:33601 +localhost:33602 +localhost:33603 +localhost:33604 +localhost:33605 +localhost:33606 +localhost:33607 +localhost:33608 +localhost:33609 +localhost:33610 +localhost:33611 +localhost:33612 +localhost:33613 +localhost:33614 +localhost:33615 +localhost:33616 +localhost:33617 +localhost:33618 +localhost:33619 +localhost:33620 +localhost:33621 +localhost:33622 +localhost:33623 +localhost:33624 +localhost:33625 +localhost:33626 +localhost:33627 +localhost:33628 +localhost:33629 +localhost:33630 +localhost:33631 +localhost:33632 +localhost:33633 +localhost:33634 +localhost:33635 +localhost:33636 +localhost:33637 +localhost:33638 +localhost:33639 +localhost:33640 +localhost:33641 +localhost:33642 +localhost:33643 +localhost:33644 +localhost:33645 +localhost:33646 +localhost:33647 +localhost:33648 +localhost:33649 +localhost:33650 +localhost:33651 +localhost:33652 +localhost:33653 +localhost:33654 +localhost:33655 +localhost:33656 +localhost:33657 +localhost:33658 +localhost:33659 +localhost:33660 +localhost:33661 +localhost:33662 +localhost:33663 +localhost:33664 +localhost:33665 +localhost:33666 +localhost:33667 +localhost:33668 +localhost:33669 +localhost:33670 +localhost:33671 +localhost:33672 +localhost:33673 +localhost:33674 +localhost:33675 +localhost:33676 +localhost:33677 +localhost:33678 +localhost:33679 +localhost:33680 +localhost:33681 +localhost:33682 +localhost:33683 +localhost:33684 +localhost:33685 +localhost:33686 +localhost:33687 +localhost:33688 +localhost:33689 +localhost:33690 +localhost:33691 +localhost:33692 +localhost:33693 +localhost:33694 +localhost:33695 +localhost:33696 +localhost:33697 +localhost:33698 +localhost:33699 +localhost:33700 +localhost:33701 +localhost:33702 +localhost:33703 +localhost:33704 +localhost:33705 +localhost:33706 +localhost:33707 +localhost:33708 +localhost:33709 +localhost:33710 +localhost:33711 +localhost:33712 +localhost:33713 +localhost:33714 +localhost:33715 +localhost:33716 +localhost:33717 +localhost:33718 +localhost:33719 +localhost:33720 +localhost:33721 +localhost:33722 +localhost:33723 +localhost:33724 +localhost:33725 +localhost:33726 +localhost:33727 +localhost:33728 +localhost:33729 +localhost:33730 +localhost:33731 +localhost:33732 +localhost:33733 +localhost:33734 +localhost:33735 +localhost:33736 +localhost:33737 +localhost:33738 +localhost:33739 +localhost:33740 +localhost:33741 +localhost:33742 +localhost:33743 +localhost:33744 +localhost:33745 +localhost:33746 +localhost:33747 +localhost:33748 +localhost:33749 +localhost:33750 +localhost:33751 +localhost:33752 +localhost:33753 +localhost:33754 +localhost:33755 +localhost:33756 +localhost:33757 +localhost:33758 +localhost:33759 +localhost:33760 +localhost:33761 +localhost:33762 +localhost:33763 +localhost:33764 +localhost:33765 +localhost:33766 +localhost:33767 +localhost:33768 +localhost:33769 +localhost:33770 +localhost:33771 +localhost:33772 +localhost:33773 +localhost:33774 +localhost:33775 +localhost:33776 +localhost:33777 +localhost:33778 +localhost:33779 +localhost:33780 +localhost:33781 +localhost:33782 +localhost:33783 +localhost:33784 +localhost:33785 +localhost:33786 +localhost:33787 +localhost:33788 +localhost:33789 +localhost:33790 +localhost:33791 +localhost:33792 +localhost:33793 +localhost:33794 +localhost:33795 +localhost:33796 +localhost:33797 +localhost:33798 +localhost:33799 +localhost:33800 +localhost:33801 +localhost:33802 +localhost:33803 +localhost:33804 +localhost:33805 +localhost:33806 +localhost:33807 +localhost:33808 +localhost:33809 +localhost:33810 +localhost:33811 +localhost:33812 +localhost:33813 +localhost:33814 +localhost:33815 +localhost:33816 +localhost:33817 +localhost:33818 +localhost:33819 +localhost:33820 +localhost:33821 +localhost:33822 +localhost:33823 +localhost:33824 +localhost:33825 +localhost:33826 +localhost:33827 +localhost:33828 +localhost:33829 +localhost:33830 +localhost:33831 +localhost:33832 +localhost:33833 +localhost:33834 +localhost:33835 +localhost:33836 +localhost:33837 +localhost:33838 +localhost:33839 +localhost:33840 +localhost:33841 +localhost:33842 +localhost:33843 +localhost:33844 +localhost:33845 +localhost:33846 +localhost:33847 +localhost:33848 +localhost:33849 +localhost:33850 +localhost:33851 +localhost:33852 +localhost:33853 +localhost:33854 +localhost:33855 +localhost:33856 +localhost:33857 +localhost:33858 +localhost:33859 +localhost:33860 +localhost:33861 +localhost:33862 +localhost:33863 +localhost:33864 +localhost:33865 +localhost:33866 +localhost:33867 +localhost:33868 +localhost:33869 +localhost:33870 +localhost:33871 +localhost:33872 +localhost:33873 +localhost:33874 +localhost:33875 +localhost:33876 +localhost:33877 +localhost:33878 +localhost:33879 +localhost:33880 +localhost:33881 +localhost:33882 +localhost:33883 +localhost:33884 +localhost:33885 +localhost:33886 +localhost:33887 +localhost:33888 +localhost:33889 +localhost:33890 +localhost:33891 +localhost:33892 +localhost:33893 +localhost:33894 +localhost:33895 +localhost:33896 +localhost:33897 +localhost:33898 +localhost:33899 +localhost:33900 +localhost:33901 +localhost:33902 +localhost:33903 +localhost:33904 +localhost:33905 +localhost:33906 +localhost:33907 +localhost:33908 +localhost:33909 +localhost:33910 +localhost:33911 +localhost:33912 +localhost:33913 +localhost:33914 +localhost:33915 +localhost:33916 +localhost:33917 +localhost:33918 +localhost:33919 +localhost:33920 +localhost:33921 +localhost:33922 +localhost:33923 +localhost:33924 +localhost:33925 +localhost:33926 +localhost:33927 +localhost:33928 +localhost:33929 +localhost:33930 +localhost:33931 +localhost:33932 +localhost:33933 +localhost:33934 +localhost:33935 +localhost:33936 +localhost:33937 +localhost:33938 +localhost:33939 +localhost:33940 +localhost:33941 +localhost:33942 +localhost:33943 +localhost:33944 +localhost:33945 +localhost:33946 +localhost:33947 +localhost:33948 +localhost:33949 +localhost:33950 +localhost:33951 +localhost:33952 +localhost:33953 +localhost:33954 +localhost:33955 +localhost:33956 +localhost:33957 +localhost:33958 +localhost:33959 +localhost:33960 +localhost:33961 +localhost:33962 +localhost:33963 +localhost:33964 +localhost:33965 +localhost:33966 +localhost:33967 +localhost:33968 +localhost:33969 +localhost:33970 +localhost:33971 +localhost:33972 +localhost:33973 +localhost:33974 +localhost:33975 +localhost:33976 +localhost:33977 +localhost:33978 +localhost:33979 +localhost:33980 +localhost:33981 +localhost:33982 +localhost:33983 +localhost:33984 +localhost:33985 +localhost:33986 +localhost:33987 +localhost:33988 +localhost:33989 +localhost:33990 +localhost:33991 +localhost:33992 +localhost:33993 +localhost:33994 +localhost:33995 +localhost:33996 +localhost:33997 +localhost:33998 +localhost:33999 +localhost:34000 +localhost:34001 +localhost:34002 +localhost:34003 +localhost:34004 +localhost:34005 +localhost:34006 +localhost:34007 +localhost:34008 +localhost:34009 +localhost:34010 +localhost:34011 +localhost:34012 +localhost:34013 +localhost:34014 +localhost:34015 +localhost:34016 +localhost:34017 +localhost:34018 +localhost:34019 +localhost:34020 +localhost:34021 +localhost:34022 +localhost:34023 +localhost:34024 +localhost:34025 +localhost:34026 +localhost:34027 +localhost:34028 +localhost:34029 +localhost:34030 +localhost:34031 +localhost:34032 +localhost:34033 +localhost:34034 +localhost:34035 +localhost:34036 +localhost:34037 +localhost:34038 +localhost:34039 +localhost:34040 +localhost:34041 +localhost:34042 +localhost:34043 +localhost:34044 +localhost:34045 +localhost:34046 +localhost:34047 +localhost:34048 +localhost:34049 +localhost:34050 +localhost:34051 +localhost:34052 +localhost:34053 +localhost:34054 +localhost:34055 +localhost:34056 +localhost:34057 +localhost:34058 +localhost:34059 +localhost:34060 +localhost:34061 +localhost:34062 +localhost:34063 +localhost:34064 +localhost:34065 +localhost:34066 +localhost:34067 +localhost:34068 +localhost:34069 +localhost:34070 +localhost:34071 +localhost:34072 +localhost:34073 +localhost:34074 +localhost:34075 +localhost:34076 +localhost:34077 +localhost:34078 +localhost:34079 +localhost:34080 +localhost:34081 +localhost:34082 +localhost:34083 +localhost:34084 +localhost:34085 +localhost:34086 +localhost:34087 +localhost:34088 +localhost:34089 +localhost:34090 +localhost:34091 +localhost:34092 +localhost:34093 +localhost:34094 +localhost:34095 +localhost:34096 +localhost:34097 +localhost:34098 +localhost:34099 +localhost:34100 +localhost:34101 +localhost:34102 +localhost:34103 +localhost:34104 +localhost:34105 +localhost:34106 +localhost:34107 +localhost:34108 +localhost:34109 +localhost:34110 +localhost:34111 +localhost:34112 +localhost:34113 +localhost:34114 +localhost:34115 +localhost:34116 +localhost:34117 +localhost:34118 +localhost:34119 +localhost:34120 +localhost:34121 +localhost:34122 +localhost:34123 +localhost:34124 +localhost:34125 +localhost:34126 +localhost:34127 +localhost:34128 +localhost:34129 +localhost:34130 +localhost:34131 +localhost:34132 +localhost:34133 +localhost:34134 +localhost:34135 +localhost:34136 +localhost:34137 +localhost:34138 +localhost:34139 +localhost:34140 +localhost:34141 +localhost:34142 +localhost:34143 +localhost:34144 +localhost:34145 +localhost:34146 +localhost:34147 +localhost:34148 +localhost:34149 +localhost:34150 +localhost:34151 +localhost:34152 +localhost:34153 +localhost:34154 +localhost:34155 +localhost:34156 +localhost:34157 +localhost:34158 +localhost:34159 +localhost:34160 +localhost:34161 +localhost:34162 +localhost:34163 +localhost:34164 +localhost:34165 +localhost:34166 +localhost:34167 +localhost:34168 +localhost:34169 +localhost:34170 +localhost:34171 +localhost:34172 +localhost:34173 +localhost:34174 +localhost:34175 +localhost:34176 +localhost:34177 +localhost:34178 +localhost:34179 +localhost:34180 +localhost:34181 +localhost:34182 +localhost:34183 +localhost:34184 +localhost:34185 +localhost:34186 +localhost:34187 +localhost:34188 +localhost:34189 +localhost:34190 +localhost:34191 +localhost:34192 +localhost:34193 +localhost:34194 +localhost:34195 +localhost:34196 +localhost:34197 +localhost:34198 +localhost:34199 +localhost:34200 +localhost:34201 +localhost:34202 +localhost:34203 +localhost:34204 +localhost:34205 +localhost:34206 +localhost:34207 +localhost:34208 +localhost:34209 +localhost:34210 +localhost:34211 +localhost:34212 diff --git a/grpc_p2p_client/p2p_client_multi_streams_subscribe.go b/grpc_p2p_client/p2p_client_multi_streams_subscribe.go index 35b181f..0b4e00d 100644 --- a/grpc_p2p_client/p2p_client_multi_streams_subscribe.go +++ b/grpc_p2p_client/p2p_client_multi_streams_subscribe.go @@ -253,7 +253,7 @@ func handleResponse(ip string, resp *protobuf.Response, counter *int32, } //fmt.Printf("Recv message: %s %d %s\n", ip, messageSize, string(p2pMessage.Message)) - case protobuf.ResponseType_MessageTraceOptimumP2P: + case protobuf.ResponseType_MessageTraceMumP2P: handleOptimumP2PTrace(resp.GetData(), writetrace, traceCh) case protobuf.ResponseType_MessageTraceGossipSub: handleGossipSubTrace(resp.GetData(), writetrace, traceCh) From 03aa65e106cef3d56c19836171529d6930eebb42 Mon Sep 17 00:00:00 2001 From: kishori82 Date: Sat, 15 Nov 2025 11:45:56 -0500 Subject: [PATCH 16/19] fixed coderabbit's suggestions --- .../p2p_client_multi_streams_publish.go | 49 ++++++++++--------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/grpc_p2p_client/p2p_client_multi_streams_publish.go b/grpc_p2p_client/p2p_client_multi_streams_publish.go index c95aaa8..ad2d61a 100644 --- a/grpc_p2p_client/p2p_client_multi_streams_publish.go +++ b/grpc_p2p_client/p2p_client_multi_streams_publish.go @@ -4,7 +4,6 @@ import ( "bufio" "context" "crypto/rand" - mathrand "math/rand" "crypto/sha256" "encoding/hex" "encoding/json" @@ -12,6 +11,7 @@ import ( "fmt" "log" "math" + mathrand "math/rand" "os" "os/signal" "strings" @@ -48,7 +48,7 @@ var ( // optional: number of messages to publish (for stress testing or batch sending) count = flag.Int("count", 1, "number of messages to publish") - poisson = flag.Bool("poisson", false, "Enable Poisson arrival") + poisson = flag.Bool("poisson", false, "Enable Poisson arrival") dataSize = flag.Int("datasize", 100, "size of random of messages to publish") // optional: sleep duration between publishes sleep = flag.Duration("sleep", 50*time.Millisecond, "optional delay between publishes (e.g., 1s, 500ms)") @@ -86,12 +86,12 @@ func main() { // Buffered channel to prevent blocking dataCh := make(chan string, 100) - done := make(chan bool) *dataSize = int(float32(*dataSize) / 2.0) - + var done chan bool var wg sync.WaitGroup // Start writing the has of the published data if *output != "" { + done := make(chan bool) go func() { header := fmt.Sprintf("sender\tsize\tsha256(msg)") go writeHashToFile(dataCh, done, *output, header) @@ -109,20 +109,22 @@ func main() { } wg.Wait() close(dataCh) - <-done + if done != nil { + <-done + } } func sendMessages(ctx context.Context, ip string, datasize int, write bool, dataCh chan<- string) error { // connect with simple gRPC settings - select { - case <-ctx.Done(): - log.Printf("[%s] context canceled, stopping", ip) - return ctx.Err() - default: - } - for i := 0; i < *count; i++ { + select { + case <-ctx.Done(): + log.Printf("[%s] context canceled, stopping", ip) + return ctx.Err() + default: + } + conn, err := grpc.NewClient(ip, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions( @@ -149,7 +151,8 @@ func sendMessages(ctx context.Context, ip string, datasize int, write bool, data //currentTime := time.Now().UnixNano() randomBytes := make([]byte, datasize) if _, err := rand.Read(randomBytes); err != nil { - log.Fatalf("failed to generate random bytes: %v", err) + return fmt.Errorf("[%s] failed to generate random bytes: %w", ip, err) + } randomSuffix := hex.EncodeToString(randomBytes) @@ -161,7 +164,7 @@ func sendMessages(ctx context.Context, ip string, datasize int, write bool, data } if err := stream.Send(pubReq); err != nil { - log.Fatalf("send publish: %v", err) + return fmt.Errorf("[%s] send publish: %w", ip, err) } fmt.Printf("Published data size %d\n", len(data)) @@ -176,16 +179,16 @@ func sendMessages(ctx context.Context, ip string, datasize int, write bool, data } fmt.Printf("Published %s to %q (took %v)\n", dataToSend, *topic, elapsed) - if *poisson { - lambda := 1.0 / (*sleep).Seconds() - interval := mathrand.ExpFloat64() / lambda - waitTime := time.Duration(interval * float64(time.Second)) - time.Sleep(waitTime) - } else { - time.Sleep(*sleep) + if *poisson { + lambda := 1.0 / (*sleep).Seconds() + interval := mathrand.ExpFloat64() / lambda + waitTime := time.Duration(interval * float64(time.Second)) + time.Sleep(waitTime) + } else { + time.Sleep(*sleep) + + } - } - conn.Close() } From ba5eaa3415b59e0fe45e7d3daaced408bb61d702 Mon Sep 17 00:00:00 2001 From: kishori82 Date: Sat, 15 Nov 2025 11:51:59 -0500 Subject: [PATCH 17/19] fixed code rabbit's suggestions --- grpc_p2p_client/p2p_client_multi_streams_subscribe.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/grpc_p2p_client/p2p_client_multi_streams_subscribe.go b/grpc_p2p_client/p2p_client_multi_streams_subscribe.go index 0b4e00d..607865e 100644 --- a/grpc_p2p_client/p2p_client_multi_streams_subscribe.go +++ b/grpc_p2p_client/p2p_client_multi_streams_subscribe.go @@ -68,6 +68,10 @@ func main() { } fmt.Printf("numip %d index %d\n", len(_ips), *endIdx) *endIdx = min(len(_ips), *endIdx) + if *startIdx < 0 || *startIdx >= *endIdx || *startIdx >= len(_ips) { + log.Fatalf("invalid index range: start-index=%d end-index=%d (num IPs=%d)", *startIdx, *endIdx, len(_ips)) + } + ips := _ips[*startIdx:*endIdx] fmt.Printf("Found %d IPs: %v\n", len(ips), ips) From c336455a78605190735ba483f9da6cf0b2a004c0 Mon Sep 17 00:00:00 2001 From: kishori82 Date: Sat, 15 Nov 2025 11:53:25 -0500 Subject: [PATCH 18/19] fixed the done --- grpc_p2p_client/p2p_client_multi_streams_publish.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/grpc_p2p_client/p2p_client_multi_streams_publish.go b/grpc_p2p_client/p2p_client_multi_streams_publish.go index ad2d61a..c43d713 100644 --- a/grpc_p2p_client/p2p_client_multi_streams_publish.go +++ b/grpc_p2p_client/p2p_client_multi_streams_publish.go @@ -91,7 +91,7 @@ func main() { var wg sync.WaitGroup // Start writing the has of the published data if *output != "" { - done := make(chan bool) + done = make(chan bool) go func() { header := fmt.Sprintf("sender\tsize\tsha256(msg)") go writeHashToFile(dataCh, done, *output, header) From e420eec7d19aab8ca7cf12f41fa0b9b3385d42ac Mon Sep 17 00:00:00 2001 From: kishori82 Date: Sat, 15 Nov 2025 11:58:52 -0500 Subject: [PATCH 19/19] fixed dataChan traceChan --- .../p2p_client_multi_streams_subscribe.go | 255 +++++++++--------- 1 file changed, 130 insertions(+), 125 deletions(-) diff --git a/grpc_p2p_client/p2p_client_multi_streams_subscribe.go b/grpc_p2p_client/p2p_client_multi_streams_subscribe.go index 607865e..3b954de 100644 --- a/grpc_p2p_client/p2p_client_multi_streams_subscribe.go +++ b/grpc_p2p_client/p2p_client_multi_streams_subscribe.go @@ -3,12 +3,12 @@ package main import ( "bufio" "context" - "github.com/mr-tron/base58" "crypto/sha256" "encoding/hex" "encoding/json" "flag" "fmt" + "github.com/mr-tron/base58" "io" "log" "math" @@ -19,13 +19,13 @@ import ( "sync/atomic" "syscall" - protobuf "p2p_client/grpc" - optsub "p2p_client/grpc/mump2p_trace" - "github.com/libp2p/go-libp2p/core/peer" "github.com/gogo/protobuf/proto" pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb" + "github.com/libp2p/go-libp2p/core/peer" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + protobuf "p2p_client/grpc" + optsub "p2p_client/grpc/mump2p_trace" ) // P2PMessage represents a message structure used in P2P communication @@ -88,31 +88,33 @@ func main() { // Buffered channel to prevent blocking dataCh := make(chan string, 100) traceCh := make(chan string, 100) - dataDone := make(chan bool) - traceDone := make(chan bool) + var dataDone chan bool + var traceDone chan bool // Launch goroutines with synchronization var wg sync.WaitGroup if *outputData != "" { + dataDone = make(chan bool) go func() { - defer wg.Done() - header := fmt.Sprintf("receiver\tsender\tsize\tsha256(msg)") + defer wg.Done() + header := fmt.Sprintf("receiver\tsender\tsize\tsha256(msg)") go writeToFile(ctx, dataCh, dataDone, *outputData, header) }() } if *outputTrace != "" { + traceDone = make(chan bool) go func() { - defer wg.Done() - header := "" //fmt.Sprintf("sender\tsize\tsha256(msg)") + defer wg.Done() + header := "" //fmt.Sprintf("sender\tsize\tsha256(msg)") go writeToFile(ctx, traceCh, traceDone, *outputTrace, header) }() } for _, ip := range ips { - wg.Add(1); + wg.Add(1) go func(ip string) { - defer wg.Done() + defer wg.Done() receiveMessages(ctx, ip, *outputData != "", dataCh, *outputTrace != "", traceCh) }(ip) } @@ -120,8 +122,12 @@ func main() { wg.Wait() close(dataCh) close(traceCh) - <-dataDone - <-traceDone + if dataDone != nil { + <-dataDone + } + if traceDone != nil { + <-traceDone + } } func receiveMessages(ctx context.Context, ip string, writeData bool, dataCh chan<- string, @@ -284,53 +290,53 @@ func handleGossipSubTrace(data []byte, writetrace bool, traceCh chan<- string) { //fmt.Printf("[TRACE] GossipSub type=%s ts=%s size=%dB\n", evt.GetType().String(), ts, len(data)) //fmt.Printf("[TRACE] GossipSub JSON (%dB): %s\n", len(jb), string(jb)) - rawBytes := []byte{} - var peerID peer.ID - if evt.PeerID != nil { - rawBytes := []byte(evt.PeerID) - peerID = peer.ID(rawBytes) - // fmt.Printf("peerID: %s\n", peerID) - } - - recvID := "" - if evt.DeliverMessage != nil && evt.DeliverMessage.ReceivedFrom != nil { - rawBytes = []byte(evt.DeliverMessage.ReceivedFrom) - recvID = base58.Encode(rawBytes) - // fmt.Printf("Receiv: %s\n", recvID) - } - - msgID := "" - topic := "" - if evt.DeliverMessage != nil { - rawBytes = []byte(evt.DeliverMessage.MessageID) - msgID = base58.Encode(rawBytes) - // fmt.Printf("MsgID: %s\n", msgID) - topic = string(*evt.DeliverMessage.Topic) - //fmt.Printf("Topic: %q\n", topic) - } - if evt.PublishMessage != nil { - rawBytes = []byte(evt.PublishMessage.MessageID) - msgID = base58.Encode(rawBytes) - //fmt.Printf("MsgID: %s\n", msgID) - topic = string(*evt.PublishMessage.Topic) - //fmt.Printf("Topic: %q\n", topic) - } - - timestamp:= int64(0) - if evt.Timestamp != nil { - timestamp= *evt.Timestamp - // fmt.Printf("Timestamp: %d\n", timestamp) - } + rawBytes := []byte{} + var peerID peer.ID + if evt.PeerID != nil { + rawBytes := []byte(evt.PeerID) + peerID = peer.ID(rawBytes) + // fmt.Printf("peerID: %s\n", peerID) + } + + recvID := "" + if evt.DeliverMessage != nil && evt.DeliverMessage.ReceivedFrom != nil { + rawBytes = []byte(evt.DeliverMessage.ReceivedFrom) + recvID = base58.Encode(rawBytes) + // fmt.Printf("Receiv: %s\n", recvID) + } + + msgID := "" + topic := "" + if evt.DeliverMessage != nil { + rawBytes = []byte(evt.DeliverMessage.MessageID) + msgID = base58.Encode(rawBytes) + // fmt.Printf("MsgID: %s\n", msgID) + topic = string(*evt.DeliverMessage.Topic) + //fmt.Printf("Topic: %q\n", topic) + } + if evt.PublishMessage != nil { + rawBytes = []byte(evt.PublishMessage.MessageID) + msgID = base58.Encode(rawBytes) + //fmt.Printf("MsgID: %s\n", msgID) + topic = string(*evt.PublishMessage.Topic) + //fmt.Printf("Topic: %q\n", topic) + } + + timestamp := int64(0) + if evt.Timestamp != nil { + timestamp = *evt.Timestamp + // fmt.Printf("Timestamp: %d\n", timestamp) + } //jb, _ := json.Marshal(evt) //fmt.Printf("[TRACE] GossipSub JSON message_type=%s, (%dB): %s\n", typeStr, len(jb), string(jb)) if writetrace { //dataToSend := fmt.Sprintf("[TRACE] GossipSub JSON message_type=%s, (%dB): %s", typeStr, len(jb), string(jb)) - dataToSend :=fmt.Sprintf("%s\t%s\t%s\t%s\t%s\t%d", typeStr, peerID, recvID, msgID, topic, timestamp) + dataToSend := fmt.Sprintf("%s\t%s\t%s\t%s\t%s\t%d", typeStr, peerID, recvID, msgID, topic, timestamp) traceCh <- dataToSend } else { //fmt.Printf("[TRACE] GossipSub JSON message_type=%s, (%dB): %s\n", typeStr, len(jb), string(jb)) - fmt.Print("%s\t%s\t%s\t%s\t%s\t%d\n", typeStr, peerID, recvID, msgID, topic, timestamp) + fmt.Print("%s\t%s\t%s\t%s\t%s\t%d\n", typeStr, peerID, recvID, msgID, topic, timestamp) } } @@ -362,79 +368,78 @@ func handleOptimumP2PTrace(data []byte, writetrace bool, traceCh chan<- string) } */ - /*if evt.PeerID != nil { - fmt.Printf("PeerID: %s\n", string(evt.PeerID)) - } - */ - - rawBytes := []byte{} - var peerID peer.ID - if evt.PeerID != nil { - rawBytes := []byte(evt.PeerID) - peerID = peer.ID(rawBytes) - // fmt.Printf("peerID: %s\n", peerID) - } - - recvID := "" - if evt.DeliverMessage != nil && evt.DeliverMessage.ReceivedFrom != nil { - rawBytes = []byte(evt.DeliverMessage.ReceivedFrom) - recvID = base58.Encode(rawBytes) - // fmt.Printf("Receiv: %s\n", recvID) - } - - if evt.NewShard != nil && evt.NewShard.ReceivedFrom != nil { - rawBytes = []byte(evt.NewShard.ReceivedFrom) - recvID = base58.Encode(rawBytes) - // fmt.Printf("Receiv: %s\n", recvID) - } - - msgID := "" - topic := "" - if evt.DeliverMessage != nil { - rawBytes = []byte(evt.DeliverMessage.MessageID) - msgID = base58.Encode(rawBytes) - // fmt.Printf("MsgID: %s\n", msgID) - topic = string(*evt.DeliverMessage.Topic) - //fmt.Printf("Topic: %q\n", topic) - } - if evt.PublishMessage != nil { - rawBytes = []byte(evt.PublishMessage.MessageID) - msgID = base58.Encode(rawBytes) - //fmt.Printf("MsgID: %s\n", msgID) - topic = string(*evt.PublishMessage.Topic) - //fmt.Printf("Topic: %q\n", topic) - } - if evt.NewShard != nil { - rawBytes = []byte(evt.NewShard.MessageID) - msgID = base58.Encode(rawBytes) - //fmt.Printf("MsgID: %s\n", msgID) - //fmt.Printf("Topic: %q\n", topic) - } - - timestamp:= int64(0) - if evt.Timestamp != nil { - timestamp= *evt.Timestamp - // fmt.Printf("Timestamp: %d\n", timestamp) - } + /*if evt.PeerID != nil { + fmt.Printf("PeerID: %s\n", string(evt.PeerID)) + } + */ + rawBytes := []byte{} + var peerID peer.ID + if evt.PeerID != nil { + rawBytes := []byte(evt.PeerID) + peerID = peer.ID(rawBytes) + // fmt.Printf("peerID: %s\n", peerID) + } + + recvID := "" + if evt.DeliverMessage != nil && evt.DeliverMessage.ReceivedFrom != nil { + rawBytes = []byte(evt.DeliverMessage.ReceivedFrom) + recvID = base58.Encode(rawBytes) + // fmt.Printf("Receiv: %s\n", recvID) + } + + if evt.NewShard != nil && evt.NewShard.ReceivedFrom != nil { + rawBytes = []byte(evt.NewShard.ReceivedFrom) + recvID = base58.Encode(rawBytes) + // fmt.Printf("Receiv: %s\n", recvID) + } + + msgID := "" + topic := "" + if evt.DeliverMessage != nil { + rawBytes = []byte(evt.DeliverMessage.MessageID) + msgID = base58.Encode(rawBytes) + // fmt.Printf("MsgID: %s\n", msgID) + topic = string(*evt.DeliverMessage.Topic) + //fmt.Printf("Topic: %q\n", topic) + } + if evt.PublishMessage != nil { + rawBytes = []byte(evt.PublishMessage.MessageID) + msgID = base58.Encode(rawBytes) + //fmt.Printf("MsgID: %s\n", msgID) + topic = string(*evt.PublishMessage.Topic) + //fmt.Printf("Topic: %q\n", topic) + } + if evt.NewShard != nil { + rawBytes = []byte(evt.NewShard.MessageID) + msgID = base58.Encode(rawBytes) + //fmt.Printf("MsgID: %s\n", msgID) + //fmt.Printf("Topic: %q\n", topic) + } + + timestamp := int64(0) + if evt.Timestamp != nil { + timestamp = *evt.Timestamp + // fmt.Printf("Timestamp: %d\n", timestamp) + } //jb, _ := json.Marshal(evt) if writetrace { - //dataToSend := fmt.Sprintf("[TRACE] OptimumP2P JSON message_type=%s, (%dB): %s", typeStr, len(jb), string(jb)) -// fmt.Printf("[TRACE] OptimumP2P JSON message_type=%s, (%dB): %s\n", typeStr, len(jb), string(jb)) - dataToSend :=fmt.Sprintf("%s\t%s\t%s\t%s\t%s\t%d", typeStr, peerID, recvID, msgID, topic, timestamp) - traceCh <- dataToSend + //dataToSend := fmt.Sprintf("[TRACE] OptimumP2P JSON message_type=%s, (%dB): %s", typeStr, len(jb), string(jb)) + // fmt.Printf("[TRACE] OptimumP2P JSON message_type=%s, (%dB): %s\n", typeStr, len(jb), string(jb)) + dataToSend := fmt.Sprintf("%s\t%s\t%s\t%s\t%s\t%d", typeStr, peerID, recvID, msgID, topic, timestamp) + traceCh <- dataToSend } else { - //fmt.Printf("[TRACE] OptimumP2P JSON message_type=%s, (%dB): %s\n", typeStr, len(jb), string(jb)) - fmt.Print("%s\t%s\t%s\t%s\t%s\t%d\n", typeStr, peerID, recvID, msgID, topic, timestamp) + //fmt.Printf("[TRACE] OptimumP2P JSON message_type=%s, (%dB): %s\n", typeStr, len(jb), string(jb)) + fmt.Print("%s\t%s\t%s\t%s\t%s\t%d\n", typeStr, peerID, recvID, msgID, topic, timestamp) } /* - message_type <- systems information - message_id <- application layer - time_stamp <- event occuring the event publish, new shard, duplicate shard - receiver_id - sender_id + message_type <- systems information + message_id <- application layer + time_stamp <- event occuring the event publish, new shard, duplicate shard + receiver_id + sender_id */ @@ -450,13 +455,13 @@ func writeToFile(ctx context.Context, dataCh <-chan string, done chan<- bool, fi writer := bufio.NewWriter(file) defer writer.Flush() - // write the header - if header != "" { - _, err := writer.WriteString(header + "\n") - if err != nil { - log.Printf("Write error: %v", err) - } - } + // write the header + if header != "" { + _, err := writer.WriteString(header + "\n") + if err != nil { + log.Printf("Write error: %v", err) + } + } // Process until channel is closed for data := range dataCh {