Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 0 additions & 33 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,29 +32,20 @@ jobs:
cache: true
- uses: actions/checkout@v4

- name: Build P2P Client
run: |
cd grpc_p2p_client
go build -o p2p-client ./p2p_client.go
echo "P2P client built successfully"

- name: Build Proxy Client
run: |
cd grpc_proxy_client
go build -o proxy-client ./proxy_client.go
echo "Proxy client built successfully"

- name: Build Key Generator
run: |
cd keygen
go build -o generate-p2p-key ./generate_p2p_key.go
echo "Key generator built successfully"

- name: Test Key Generation
run: |
cd keygen
go run ./generate_p2p_key.go
echo "Key generation test passed"

# Lint Go code
golangci:
Expand Down Expand Up @@ -218,27 +209,3 @@ jobs:
cd ../grpc_proxy_client && go mod verify
cd ../keygen && go mod verify
echo "All Go modules are valid"

# This job ensures all checks pass - useful for branch protection rules
ci-success:
name: CI Success
runs-on: ubuntu-latest
needs: [test-go-clients, golangci, test-docker-setup, test-scripts, validate-config]
if: always()
steps:
- name: Check all jobs succeeded
run: |
if [[ "${{ needs.test-go-clients.result }}" != "success" ||
"${{ needs.golangci.result }}" != "success" ||
"${{ needs.test-docker-setup.result }}" != "success" ||
"${{ needs.test-scripts.result }}" != "success" ||
"${{ needs.validate-config.result }}" != "success" ]]; then
echo "CI failed:"
echo " - Go Clients: ${{ needs.test-go-clients.result }}"
echo " - Lint: ${{ needs.golangci.result }}"
echo " - Docker Setup: ${{ needs.test-docker-setup.result }}"
echo " - Scripts: ${{ needs.test-scripts.result }}"
echo " - Config: ${{ needs.validate-config.result }}"
exit 1
fi
echo "All CI checks passed successfully!"
17 changes: 17 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
.env
.idea/*
identity

# macOS
.DS_Store


# Build binaries
grpc_p2p_client/p2p-client
grpc_p2p_client/p2p-multi-publish
grpc_p2p_client/p2p-multi-subscribe

# Test files
grpc_p2p_client/test-ips.txt
grpc_p2p_client/remote-ips.txt

# Output files
*.txt
!grpc_p2p_client/local.ips.tsv
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ SCRIPTS := ./script/generate-identity.sh ./script/proxy_client.sh ./test_suite.s
.PHONY: $(P2P_CLIENT) $(PROXY_CLIENT) $(KEYGEN_BINARY) setup-scripts

$(P2P_CLIENT):
@cd $(P2P_CLIENT_DIR) && go build -o p2p-client ./p2p_client.go
@cd $(P2P_CLIENT_DIR) && go build -o p2p-client ./cmd/single/
@cd $(P2P_CLIENT_DIR) && go build -o p2p-multi-publish ./cmd/multi-publish/
@cd $(P2P_CLIENT_DIR) && go build -o p2p-multi-subscribe ./cmd/multi-subscribe/

$(PROXY_CLIENT):
@cd $(PROXY_CLIENT_DIR) && go build -o proxy-client ./proxy_client.go
Expand Down
10 changes: 5 additions & 5 deletions docs/guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,8 @@ The `.env.example` file contains:
```bash
BOOTSTRAP_PEER_ID=12D3KooWD5RtEPmMR9Yb2ku5VuxqK7Yj1Y5Gv8DmffJ6Ei8maU44
CLUSTER_ID=docker-dev-cluster
PROXY_VERSION=v0.0.1-rc13
P2P_NODE_VERSION=v0.0.1-rc13
PROXY_VERSION=v0.0.1-rc16
P2P_NODE_VERSION=v0.0.1-rc16
```

**Variables explained:**
Expand Down Expand Up @@ -572,7 +572,7 @@ curl http://localhost:8081/api/v1/version

```json
{
"version": "v0.0.1-rc13",
"version": "v0.0.1-rc16",
"commit_hash": "8f3057d"
}
```
Expand Down Expand Up @@ -1081,7 +1081,7 @@ response:

```json
{
"version": "v0.0.1-rc13",
"version": "v0.0.1-rc16",
"commit_hash": "8f3057d"
}
```
Expand Down Expand Up @@ -1155,7 +1155,7 @@ The client recognizes these OptimumP2P trace events (observed in practice):

#### Implementation Details

The trace parsing is implemented in `grpc_p2p_client/p2p_client.go`:
The trace parsing is implemented in `grpc_p2p_client/shared/utils.go`:

```go
func handleGossipSubTrace(data []byte) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,56 +1,33 @@
package main

import (
"bufio"
"context"
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"flag"
"fmt"
"log"
"math"
mathrand "math/rand"
"os"
"os/signal"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
protobuf "p2p_client/grpc"
)
"p2p_client/shared"

// P2PMessage represents a message structure used in P2P communication
type P2PMessage struct {
MessageID string // Unique identifier for the message
Topic string // Topic name where the message was published
Message []byte // Actual message data
SourceNodeID string // ID of the node that sent the message (we don't need it in future, it is just for debug purposes)
}

// Command possible operation that sidecar may perform with p2p node
type Command int32

const (
CommandUnknown Command = iota
CommandPublishData
CommandSubscribeToTopic
CommandUnSubscribeToTopic
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

var (
topic = flag.String("topic", "", "topic name")

// optional: number of messages to publish (for stress testing or batch sending)
topic = flag.String("topic", "", "topic name")
count = flag.Int("count", 1, "number of messages to publish")
poisson = flag.Bool("poisson", false, "Enable Poisson arrival")
dataSize = flag.Int("datasize", 100, "size of random of messages to publish")
// optional: sleep duration between publishes
sleep = flag.Duration("sleep", 50*time.Millisecond, "optional delay between publishes (e.g., 1s, 500ms)")
ipfile = flag.String("ipfile", "", "file with a list of IP addresses")
startIdx = flag.Int("start-index", 0, "beginning index is 0: default 0")
Expand All @@ -64,7 +41,7 @@ func main() {
log.Fatalf("−topic is required")
}

_ips, err := readIPsFromFile(*ipfile)
_ips, err := shared.ReadIPsFromFile(*ipfile)
if err != nil {
fmt.Printf("Error: %v\n", err)
return
Expand All @@ -84,21 +61,19 @@ func main() {
cancel()
}()

// Buffered channel to prevent blocking
dataCh := make(chan string, 100)
*dataSize = int(float32(*dataSize) / 2.0)
var done chan bool
var wg sync.WaitGroup
// Start writing the has of the published data

if *output != "" {
done = make(chan bool)
go func() {
header := fmt.Sprintf("sender\tsize\tsha256(msg)")
go writeHashToFile(dataCh, done, *output, header)
go shared.WriteToFile(ctx, dataCh, done, *output, header)
}()
}

// Launch goroutines with synchronization
for _, ip := range ips {
wg.Add(1)
go func(ip string) {
Expand All @@ -112,11 +87,9 @@ func main() {
if done != nil {
<-done
}

}

func sendMessages(ctx context.Context, ip string, datasize int, write bool, dataCh chan<- string) error {
// connect with simple gRPC settings
for i := 0; i < *count; i++ {
select {
case <-ctx.Done():
Expand All @@ -135,30 +108,25 @@ func sendMessages(ctx context.Context, ip string, datasize int, write bool, data
if err != nil {
log.Fatalf("failed to connect to node %v", err)
}
//defer conn.Close()
println(fmt.Sprintf("Connected to node at: %s…", ip))

client := protobuf.NewCommandStreamClient(conn)

stream, err := client.ListenCommands(ctx)

if err != nil {
log.Fatalf("ListenCommands: %v", err)
}

start := time.Now()
var data []byte
//currentTime := time.Now().UnixNano()
randomBytes := make([]byte, datasize)
if _, err := rand.Read(randomBytes); err != nil {
return fmt.Errorf("[%s] failed to generate random bytes: %w", ip, err)

}

randomSuffix := hex.EncodeToString(randomBytes)
data = []byte(fmt.Sprintf("%s-%s", ip, randomSuffix))
data := []byte(fmt.Sprintf("%s-%s", ip, randomSuffix))
pubReq := &protobuf.Request{
Command: int32(CommandPublishData),
Command: int32(shared.CommandPublishData),
Topic: *topic,
Data: data,
}
Expand All @@ -169,11 +137,10 @@ func sendMessages(ctx context.Context, ip string, datasize int, write bool, data
fmt.Printf("Published data size %d\n", len(data))

elapsed := time.Since(start)

hash := sha256.Sum256(data)
hexHashString := hex.EncodeToString(hash[:])
var dataToSend string
if write == true {
if write {
dataToSend = fmt.Sprintf("%s\t%d\t%s", ip, len(data), hexHashString)
dataCh <- dataToSend
}
Expand All @@ -186,95 +153,10 @@ func sendMessages(ctx context.Context, ip string, datasize int, write bool, data
time.Sleep(waitTime)
} else {
time.Sleep(*sleep)

}

conn.Close()
}

return nil

}

func readIPsFromFile(filename string) ([]string, error) {
file, err := os.Open(filename)
if err != nil {
return nil, fmt.Errorf("failed to open file: %w", err)
}
defer file.Close()

var ips []string
scanner := bufio.NewScanner(file)

for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
// Skip empty lines and comments
if line == "" || strings.HasPrefix(line, "#") {
continue
}
ips = append(ips, line)
}

if err := scanner.Err(); err != nil {
return nil, fmt.Errorf("error reading file: %w", err)
}

return ips, nil
}

func handleResponse(resp *protobuf.Response, counter *int32) {
switch resp.GetCommand() {
case protobuf.ResponseType_Message:
var p2pMessage P2PMessage
if err := json.Unmarshal(resp.GetData(), &p2pMessage); err != nil {
log.Printf("Error unmarshalling message: %v", err)
return
}
n := atomic.AddInt32(counter, 1)

currentTime := time.Now().UnixNano()
messageSize := len(p2pMessage.Message)

//fmt.Printf("Recv message: [%d] [%d %d] %s\n\n",n, currentTime, messageSize, string(p2pMessage.Message)[0:100])
fmt.Printf("Recv message: [%d] [%d %d] %s\n\n", n, currentTime, messageSize, string(p2pMessage.Message))
default:
log.Println("Unknown response command:", resp.GetCommand())
}
}

func headHex(b []byte, n int) string {
if len(b) > n {
b = b[:n]
}
return hex.EncodeToString(b)
}

func writeHashToFile(dataCh <-chan string, done chan<- bool, filename string, header string) {
file, err := os.Create(filename)
if err != nil {
log.Fatal(err)
}
defer file.Close()

writer := bufio.NewWriter(file)
defer writer.Flush()

// write the header
if header != "" {
_, err := writer.WriteString(header + "\n")
if err != nil {
log.Printf("Write error: %v", err)
}
}

// Process until channel is closed
for data := range dataCh {
_, err := writer.WriteString(data + "\n")
if err != nil {
log.Printf("Write error: %v", err)
}
}
done <- true
fmt.Println("All data flushed to disk")

}
Loading
Loading