diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7760992..3eb1e5e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -71,7 +71,7 @@ jobs: uses: golangci/golangci-lint-action@v7 with: version: ${{ env.lint-version }} - args: --timeout 10m --skip-dirs-use-default + args: --timeout 10m only-new-issues: true working-directory: ./ continue-on-error: true diff --git a/grpc_p2p_client/go.mod b/grpc_p2p_client/go.mod index d373e8e..15a190f 100644 --- a/grpc_p2p_client/go.mod +++ b/grpc_p2p_client/go.mod @@ -4,15 +4,32 @@ go 1.24.1 require ( github.com/gogo/protobuf v1.3.2 - github.com/golang/protobuf v1.5.4 + github.com/libp2p/go-libp2p v0.39.1 github.com/libp2p/go-libp2p-pubsub v0.14.2 + github.com/mr-tron/base58 v1.2.0 google.golang.org/grpc v1.73.0 google.golang.org/protobuf v1.36.6 ) require ( + github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect + github.com/ipfs/go-cid v0.5.0 // indirect + github.com/klauspost/cpuid/v2 v2.2.9 // indirect + github.com/libp2p/go-buffer-pool v0.1.0 // indirect + github.com/minio/sha256-simd v1.0.1 // indirect + github.com/multiformats/go-base32 v0.1.0 // indirect + github.com/multiformats/go-base36 v0.2.0 // indirect + github.com/multiformats/go-multiaddr v0.14.0 // indirect + github.com/multiformats/go-multibase v0.2.0 // indirect + github.com/multiformats/go-multicodec v0.9.0 // indirect + github.com/multiformats/go-multihash v0.2.3 // indirect + github.com/multiformats/go-varint v0.0.7 // indirect + github.com/spaolacci/murmur3 v1.1.0 // indirect + golang.org/x/crypto v0.36.0 // indirect + golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c // indirect golang.org/x/net v0.38.0 // indirect golang.org/x/sys v0.31.0 // indirect golang.org/x/text v0.23.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463 // indirect + lukechampine.com/blake3 v1.3.0 // indirect ) diff --git a/grpc_p2p_client/local.ips.tsv b/grpc_p2p_client/local.ips.tsv new file mode 100644 index 0000000..c06dbd9 --- /dev/null +++ b/grpc_p2p_client/local.ips.tsv @@ -0,0 +1,1001 @@ +localhost:33212 +localhost:33213 +localhost:33214 +localhost:33215 +localhost:33216 +localhost:33217 +localhost:33218 +localhost:33219 +localhost:33220 +localhost:33221 +localhost:33222 +localhost:33223 +localhost:33224 +localhost:33225 +localhost:33226 +localhost:33227 +localhost:33228 +localhost:33229 +localhost:33230 +localhost:33231 +localhost:33232 +localhost:33233 +localhost:33234 +localhost:33235 +localhost:33236 +localhost:33237 +localhost:33238 +localhost:33239 +localhost:33240 +localhost:33241 +localhost:33242 +localhost:33243 +localhost:33244 +localhost:33245 +localhost:33246 +localhost:33247 +localhost:33248 +localhost:33249 +localhost:33250 +localhost:33251 +localhost:33252 +localhost:33253 +localhost:33254 +localhost:33255 +localhost:33256 +localhost:33257 +localhost:33258 +localhost:33259 +localhost:33260 +localhost:33261 +localhost:33262 +localhost:33263 +localhost:33264 +localhost:33265 +localhost:33266 +localhost:33267 +localhost:33268 +localhost:33269 +localhost:33270 +localhost:33271 +localhost:33272 +localhost:33273 +localhost:33274 +localhost:33275 +localhost:33276 +localhost:33277 +localhost:33278 +localhost:33279 +localhost:33280 +localhost:33281 +localhost:33282 +localhost:33283 +localhost:33284 +localhost:33285 +localhost:33286 +localhost:33287 +localhost:33288 +localhost:33289 +localhost:33290 +localhost:33291 +localhost:33292 +localhost:33293 +localhost:33294 +localhost:33295 +localhost:33296 +localhost:33297 +localhost:33298 +localhost:33299 +localhost:33300 +localhost:33301 +localhost:33302 +localhost:33303 +localhost:33304 +localhost:33305 +localhost:33306 +localhost:33307 +localhost:33308 +localhost:33309 +localhost:33310 +localhost:33311 +localhost:33312 +localhost:33313 +localhost:33314 +localhost:33315 +localhost:33316 +localhost:33317 +localhost:33318 +localhost:33319 +localhost:33320 +localhost:33321 +localhost:33322 +localhost:33323 +localhost:33324 +localhost:33325 +localhost:33326 +localhost:33327 +localhost:33328 +localhost:33329 +localhost:33330 +localhost:33331 +localhost:33332 +localhost:33333 +localhost:33334 +localhost:33335 +localhost:33336 +localhost:33337 +localhost:33338 +localhost:33339 +localhost:33340 +localhost:33341 +localhost:33342 +localhost:33343 +localhost:33344 +localhost:33345 +localhost:33346 +localhost:33347 +localhost:33348 +localhost:33349 +localhost:33350 +localhost:33351 +localhost:33352 +localhost:33353 +localhost:33354 +localhost:33355 +localhost:33356 +localhost:33357 +localhost:33358 +localhost:33359 +localhost:33360 +localhost:33361 +localhost:33362 +localhost:33363 +localhost:33364 +localhost:33365 +localhost:33366 +localhost:33367 +localhost:33368 +localhost:33369 +localhost:33370 +localhost:33371 +localhost:33372 +localhost:33373 +localhost:33374 +localhost:33375 +localhost:33376 +localhost:33377 +localhost:33378 +localhost:33379 +localhost:33380 +localhost:33381 +localhost:33382 +localhost:33383 +localhost:33384 +localhost:33385 +localhost:33386 +localhost:33387 +localhost:33388 +localhost:33389 +localhost:33390 +localhost:33391 +localhost:33392 +localhost:33393 +localhost:33394 +localhost:33395 +localhost:33396 +localhost:33397 +localhost:33398 +localhost:33399 +localhost:33400 +localhost:33401 +localhost:33402 +localhost:33403 +localhost:33404 +localhost:33405 +localhost:33406 +localhost:33407 +localhost:33408 +localhost:33409 +localhost:33410 +localhost:33411 +localhost:33412 +localhost:33413 +localhost:33414 +localhost:33415 +localhost:33416 +localhost:33417 +localhost:33418 +localhost:33419 +localhost:33420 +localhost:33421 +localhost:33422 +localhost:33423 +localhost:33424 +localhost:33425 +localhost:33426 +localhost:33427 +localhost:33428 +localhost:33429 +localhost:33430 +localhost:33431 +localhost:33432 +localhost:33433 +localhost:33434 +localhost:33435 +localhost:33436 +localhost:33437 +localhost:33438 +localhost:33439 +localhost:33440 +localhost:33441 +localhost:33442 +localhost:33443 +localhost:33444 +localhost:33445 +localhost:33446 +localhost:33447 +localhost:33448 +localhost:33449 +localhost:33450 +localhost:33451 +localhost:33452 +localhost:33453 +localhost:33454 +localhost:33455 +localhost:33456 +localhost:33457 +localhost:33458 +localhost:33459 +localhost:33460 +localhost:33461 +localhost:33462 +localhost:33463 +localhost:33464 +localhost:33465 +localhost:33466 +localhost:33467 +localhost:33468 +localhost:33469 +localhost:33470 +localhost:33471 +localhost:33472 +localhost:33473 +localhost:33474 +localhost:33475 +localhost:33476 +localhost:33477 +localhost:33478 +localhost:33479 +localhost:33480 +localhost:33481 +localhost:33482 +localhost:33483 +localhost:33484 +localhost:33485 +localhost:33486 +localhost:33487 +localhost:33488 +localhost:33489 +localhost:33490 +localhost:33491 +localhost:33492 +localhost:33493 +localhost:33494 +localhost:33495 +localhost:33496 +localhost:33497 +localhost:33498 +localhost:33499 +localhost:33500 +localhost:33501 +localhost:33502 +localhost:33503 +localhost:33504 +localhost:33505 +localhost:33506 +localhost:33507 +localhost:33508 +localhost:33509 +localhost:33510 +localhost:33511 +localhost:33512 +localhost:33513 +localhost:33514 +localhost:33515 +localhost:33516 +localhost:33517 +localhost:33518 +localhost:33519 +localhost:33520 +localhost:33521 +localhost:33522 +localhost:33523 +localhost:33524 +localhost:33525 +localhost:33526 +localhost:33527 +localhost:33528 +localhost:33529 +localhost:33530 +localhost:33531 +localhost:33532 +localhost:33533 +localhost:33534 +localhost:33535 +localhost:33536 +localhost:33537 +localhost:33538 +localhost:33539 +localhost:33540 +localhost:33541 +localhost:33542 +localhost:33543 +localhost:33544 +localhost:33545 +localhost:33546 +localhost:33547 +localhost:33548 +localhost:33549 +localhost:33550 +localhost:33551 +localhost:33552 +localhost:33553 +localhost:33554 +localhost:33555 +localhost:33556 +localhost:33557 +localhost:33558 +localhost:33559 +localhost:33560 +localhost:33561 +localhost:33562 +localhost:33563 +localhost:33564 +localhost:33565 +localhost:33566 +localhost:33567 +localhost:33568 +localhost:33569 +localhost:33570 +localhost:33571 +localhost:33572 +localhost:33573 +localhost:33574 +localhost:33575 +localhost:33576 +localhost:33577 +localhost:33578 +localhost:33579 +localhost:33580 +localhost:33581 +localhost:33582 +localhost:33583 +localhost:33584 +localhost:33585 +localhost:33586 +localhost:33587 +localhost:33588 +localhost:33589 +localhost:33590 +localhost:33591 +localhost:33592 +localhost:33593 +localhost:33594 +localhost:33595 +localhost:33596 +localhost:33597 +localhost:33598 +localhost:33599 +localhost:33600 +localhost:33601 +localhost:33602 +localhost:33603 +localhost:33604 +localhost:33605 +localhost:33606 +localhost:33607 +localhost:33608 +localhost:33609 +localhost:33610 +localhost:33611 +localhost:33612 +localhost:33613 +localhost:33614 +localhost:33615 +localhost:33616 +localhost:33617 +localhost:33618 +localhost:33619 +localhost:33620 +localhost:33621 +localhost:33622 +localhost:33623 +localhost:33624 +localhost:33625 +localhost:33626 +localhost:33627 +localhost:33628 +localhost:33629 +localhost:33630 +localhost:33631 +localhost:33632 +localhost:33633 +localhost:33634 +localhost:33635 +localhost:33636 +localhost:33637 +localhost:33638 +localhost:33639 +localhost:33640 +localhost:33641 +localhost:33642 +localhost:33643 +localhost:33644 +localhost:33645 +localhost:33646 +localhost:33647 +localhost:33648 +localhost:33649 +localhost:33650 +localhost:33651 +localhost:33652 +localhost:33653 +localhost:33654 +localhost:33655 +localhost:33656 +localhost:33657 +localhost:33658 +localhost:33659 +localhost:33660 +localhost:33661 +localhost:33662 +localhost:33663 +localhost:33664 +localhost:33665 +localhost:33666 +localhost:33667 +localhost:33668 +localhost:33669 +localhost:33670 +localhost:33671 +localhost:33672 +localhost:33673 +localhost:33674 +localhost:33675 +localhost:33676 +localhost:33677 +localhost:33678 +localhost:33679 +localhost:33680 +localhost:33681 +localhost:33682 +localhost:33683 +localhost:33684 +localhost:33685 +localhost:33686 +localhost:33687 +localhost:33688 +localhost:33689 +localhost:33690 +localhost:33691 +localhost:33692 +localhost:33693 +localhost:33694 +localhost:33695 +localhost:33696 +localhost:33697 +localhost:33698 +localhost:33699 +localhost:33700 +localhost:33701 +localhost:33702 +localhost:33703 +localhost:33704 +localhost:33705 +localhost:33706 +localhost:33707 +localhost:33708 +localhost:33709 +localhost:33710 +localhost:33711 +localhost:33712 +localhost:33713 +localhost:33714 +localhost:33715 +localhost:33716 +localhost:33717 +localhost:33718 +localhost:33719 +localhost:33720 +localhost:33721 +localhost:33722 +localhost:33723 +localhost:33724 +localhost:33725 +localhost:33726 +localhost:33727 +localhost:33728 +localhost:33729 +localhost:33730 +localhost:33731 +localhost:33732 +localhost:33733 +localhost:33734 +localhost:33735 +localhost:33736 +localhost:33737 +localhost:33738 +localhost:33739 +localhost:33740 +localhost:33741 +localhost:33742 +localhost:33743 +localhost:33744 +localhost:33745 +localhost:33746 +localhost:33747 +localhost:33748 +localhost:33749 +localhost:33750 +localhost:33751 +localhost:33752 +localhost:33753 +localhost:33754 +localhost:33755 +localhost:33756 +localhost:33757 +localhost:33758 +localhost:33759 +localhost:33760 +localhost:33761 +localhost:33762 +localhost:33763 +localhost:33764 +localhost:33765 +localhost:33766 +localhost:33767 +localhost:33768 +localhost:33769 +localhost:33770 +localhost:33771 +localhost:33772 +localhost:33773 +localhost:33774 +localhost:33775 +localhost:33776 +localhost:33777 +localhost:33778 +localhost:33779 +localhost:33780 +localhost:33781 +localhost:33782 +localhost:33783 +localhost:33784 +localhost:33785 +localhost:33786 +localhost:33787 +localhost:33788 +localhost:33789 +localhost:33790 +localhost:33791 +localhost:33792 +localhost:33793 +localhost:33794 +localhost:33795 +localhost:33796 +localhost:33797 +localhost:33798 +localhost:33799 +localhost:33800 +localhost:33801 +localhost:33802 +localhost:33803 +localhost:33804 +localhost:33805 +localhost:33806 +localhost:33807 +localhost:33808 +localhost:33809 +localhost:33810 +localhost:33811 +localhost:33812 +localhost:33813 +localhost:33814 +localhost:33815 +localhost:33816 +localhost:33817 +localhost:33818 +localhost:33819 +localhost:33820 +localhost:33821 +localhost:33822 +localhost:33823 +localhost:33824 +localhost:33825 +localhost:33826 +localhost:33827 +localhost:33828 +localhost:33829 +localhost:33830 +localhost:33831 +localhost:33832 +localhost:33833 +localhost:33834 +localhost:33835 +localhost:33836 +localhost:33837 +localhost:33838 +localhost:33839 +localhost:33840 +localhost:33841 +localhost:33842 +localhost:33843 +localhost:33844 +localhost:33845 +localhost:33846 +localhost:33847 +localhost:33848 +localhost:33849 +localhost:33850 +localhost:33851 +localhost:33852 +localhost:33853 +localhost:33854 +localhost:33855 +localhost:33856 +localhost:33857 +localhost:33858 +localhost:33859 +localhost:33860 +localhost:33861 +localhost:33862 +localhost:33863 +localhost:33864 +localhost:33865 +localhost:33866 +localhost:33867 +localhost:33868 +localhost:33869 +localhost:33870 +localhost:33871 +localhost:33872 +localhost:33873 +localhost:33874 +localhost:33875 +localhost:33876 +localhost:33877 +localhost:33878 +localhost:33879 +localhost:33880 +localhost:33881 +localhost:33882 +localhost:33883 +localhost:33884 +localhost:33885 +localhost:33886 +localhost:33887 +localhost:33888 +localhost:33889 +localhost:33890 +localhost:33891 +localhost:33892 +localhost:33893 +localhost:33894 +localhost:33895 +localhost:33896 +localhost:33897 +localhost:33898 +localhost:33899 +localhost:33900 +localhost:33901 +localhost:33902 +localhost:33903 +localhost:33904 +localhost:33905 +localhost:33906 +localhost:33907 +localhost:33908 +localhost:33909 +localhost:33910 +localhost:33911 +localhost:33912 +localhost:33913 +localhost:33914 +localhost:33915 +localhost:33916 +localhost:33917 +localhost:33918 +localhost:33919 +localhost:33920 +localhost:33921 +localhost:33922 +localhost:33923 +localhost:33924 +localhost:33925 +localhost:33926 +localhost:33927 +localhost:33928 +localhost:33929 +localhost:33930 +localhost:33931 +localhost:33932 +localhost:33933 +localhost:33934 +localhost:33935 +localhost:33936 +localhost:33937 +localhost:33938 +localhost:33939 +localhost:33940 +localhost:33941 +localhost:33942 +localhost:33943 +localhost:33944 +localhost:33945 +localhost:33946 +localhost:33947 +localhost:33948 +localhost:33949 +localhost:33950 +localhost:33951 +localhost:33952 +localhost:33953 +localhost:33954 +localhost:33955 +localhost:33956 +localhost:33957 +localhost:33958 +localhost:33959 +localhost:33960 +localhost:33961 +localhost:33962 +localhost:33963 +localhost:33964 +localhost:33965 +localhost:33966 +localhost:33967 +localhost:33968 +localhost:33969 +localhost:33970 +localhost:33971 +localhost:33972 +localhost:33973 +localhost:33974 +localhost:33975 +localhost:33976 +localhost:33977 +localhost:33978 +localhost:33979 +localhost:33980 +localhost:33981 +localhost:33982 +localhost:33983 +localhost:33984 +localhost:33985 +localhost:33986 +localhost:33987 +localhost:33988 +localhost:33989 +localhost:33990 +localhost:33991 +localhost:33992 +localhost:33993 +localhost:33994 +localhost:33995 +localhost:33996 +localhost:33997 +localhost:33998 +localhost:33999 +localhost:34000 +localhost:34001 +localhost:34002 +localhost:34003 +localhost:34004 +localhost:34005 +localhost:34006 +localhost:34007 +localhost:34008 +localhost:34009 +localhost:34010 +localhost:34011 +localhost:34012 +localhost:34013 +localhost:34014 +localhost:34015 +localhost:34016 +localhost:34017 +localhost:34018 +localhost:34019 +localhost:34020 +localhost:34021 +localhost:34022 +localhost:34023 +localhost:34024 +localhost:34025 +localhost:34026 +localhost:34027 +localhost:34028 +localhost:34029 +localhost:34030 +localhost:34031 +localhost:34032 +localhost:34033 +localhost:34034 +localhost:34035 +localhost:34036 +localhost:34037 +localhost:34038 +localhost:34039 +localhost:34040 +localhost:34041 +localhost:34042 +localhost:34043 +localhost:34044 +localhost:34045 +localhost:34046 +localhost:34047 +localhost:34048 +localhost:34049 +localhost:34050 +localhost:34051 +localhost:34052 +localhost:34053 +localhost:34054 +localhost:34055 +localhost:34056 +localhost:34057 +localhost:34058 +localhost:34059 +localhost:34060 +localhost:34061 +localhost:34062 +localhost:34063 +localhost:34064 +localhost:34065 +localhost:34066 +localhost:34067 +localhost:34068 +localhost:34069 +localhost:34070 +localhost:34071 +localhost:34072 +localhost:34073 +localhost:34074 +localhost:34075 +localhost:34076 +localhost:34077 +localhost:34078 +localhost:34079 +localhost:34080 +localhost:34081 +localhost:34082 +localhost:34083 +localhost:34084 +localhost:34085 +localhost:34086 +localhost:34087 +localhost:34088 +localhost:34089 +localhost:34090 +localhost:34091 +localhost:34092 +localhost:34093 +localhost:34094 +localhost:34095 +localhost:34096 +localhost:34097 +localhost:34098 +localhost:34099 +localhost:34100 +localhost:34101 +localhost:34102 +localhost:34103 +localhost:34104 +localhost:34105 +localhost:34106 +localhost:34107 +localhost:34108 +localhost:34109 +localhost:34110 +localhost:34111 +localhost:34112 +localhost:34113 +localhost:34114 +localhost:34115 +localhost:34116 +localhost:34117 +localhost:34118 +localhost:34119 +localhost:34120 +localhost:34121 +localhost:34122 +localhost:34123 +localhost:34124 +localhost:34125 +localhost:34126 +localhost:34127 +localhost:34128 +localhost:34129 +localhost:34130 +localhost:34131 +localhost:34132 +localhost:34133 +localhost:34134 +localhost:34135 +localhost:34136 +localhost:34137 +localhost:34138 +localhost:34139 +localhost:34140 +localhost:34141 +localhost:34142 +localhost:34143 +localhost:34144 +localhost:34145 +localhost:34146 +localhost:34147 +localhost:34148 +localhost:34149 +localhost:34150 +localhost:34151 +localhost:34152 +localhost:34153 +localhost:34154 +localhost:34155 +localhost:34156 +localhost:34157 +localhost:34158 +localhost:34159 +localhost:34160 +localhost:34161 +localhost:34162 +localhost:34163 +localhost:34164 +localhost:34165 +localhost:34166 +localhost:34167 +localhost:34168 +localhost:34169 +localhost:34170 +localhost:34171 +localhost:34172 +localhost:34173 +localhost:34174 +localhost:34175 +localhost:34176 +localhost:34177 +localhost:34178 +localhost:34179 +localhost:34180 +localhost:34181 +localhost:34182 +localhost:34183 +localhost:34184 +localhost:34185 +localhost:34186 +localhost:34187 +localhost:34188 +localhost:34189 +localhost:34190 +localhost:34191 +localhost:34192 +localhost:34193 +localhost:34194 +localhost:34195 +localhost:34196 +localhost:34197 +localhost:34198 +localhost:34199 +localhost:34200 +localhost:34201 +localhost:34202 +localhost:34203 +localhost:34204 +localhost:34205 +localhost:34206 +localhost:34207 +localhost:34208 +localhost:34209 +localhost:34210 +localhost:34211 +localhost:34212 diff --git a/grpc_p2p_client/p2p_client.go b/grpc_p2p_client/p2p_client.go index f5b6ba8..0a6cdd8 100644 --- a/grpc_p2p_client/p2p_client.go +++ b/grpc_p2p_client/p2p_client.go @@ -17,10 +17,7 @@ import ( "time" protobuf "p2p_client/grpc" - optsub "p2p_client/grpc/mump2p_trace" - "github.com/gogo/protobuf/proto" - pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) @@ -197,17 +194,17 @@ func handleResponse(resp *protobuf.Response, counter *int32) { log.Printf("Error unmarshalling message: %v", err) return } - n := atomic.AddInt32(counter, 1) - - currentTime := time.Now().UnixNano() messageSize := len(p2pMessage.Message) + n := atomic.AddInt32(counter, 1) //fmt.Printf("Recv message: [%d] [%d %d] %s\n\n",n, currentTime, messageSize, string(p2pMessage.Message)[0:100]) - fmt.Printf("Recv message: [%d] [%d %d] %s\n\n", n, currentTime, messageSize, string(p2pMessage.Message)) + fmt.Printf("Recv message: [%d] [%d] %s\n\n", n, messageSize, string(p2pMessage.Message)) case protobuf.ResponseType_MessageTraceGossipSub: - handleGossipSubTrace(resp.GetData()) + // Note: These trace handlers are not implemented in this file + log.Printf("GossipSub trace received but handler not implemented") case protobuf.ResponseType_MessageTraceMumP2P: - handleOptimumP2PTrace(resp.GetData()) + // Note: These trace handlers are not implemented in this file + log.Printf("MumP2P trace received but handler not implemented") case protobuf.ResponseType_Unknown: default: log.Println("Unknown response command:", resp.GetCommand()) @@ -220,48 +217,3 @@ func headHex(b []byte, n int) string { } return hex.EncodeToString(b) } - -func handleGossipSubTrace(data []byte) { - evt := &pubsubpb.TraceEvent{} - if err := proto.Unmarshal(data, evt); err != nil { - fmt.Printf("[TRACE] GossipSub decode error: %v raw=%dB head=%s\n", - err, len(data), headHex(data, 64)) - return - } - - ts := time.Unix(0, evt.GetTimestamp()).Format(time.RFC3339Nano) - // print type - fmt.Printf("[TRACE] GossipSub type=%s ts=%s size=%dB\n", evt.GetType().String(), ts, len(data)) - jb, _ := json.Marshal(evt) - fmt.Printf("[TRACE] GossipSub JSON (%dB): %s\n", len(jb), string(jb)) -} - -func handleOptimumP2PTrace(data []byte) { - evt := &optsub.TraceEvent{} - if err := proto.Unmarshal(data, evt); err != nil { - fmt.Printf("[TRACE] OptimumP2P decode error: %v\n", err) - return - } - - // human-readable timestamp - ts := time.Unix(0, evt.GetTimestamp()).Format(time.RFC3339Nano) - - // print type - typeStr := optsub.TraceEvent_Type_name[int32(evt.GetType())] - fmt.Printf("[TRACE] OptimumP2P type=%s ts=%s size=%dB\n", typeStr, ts, len(data)) - - // if shard-related - switch evt.GetType() { - case optsub.TraceEvent_NEW_SHARD: - fmt.Printf(" NEW_SHARD id=%x coeff=%x\n", evt.GetNewShard().GetMessageID(), evt.GetNewShard().GetCoefficients()) - case optsub.TraceEvent_DUPLICATE_SHARD: - fmt.Printf(" DUPLICATE_SHARD id=%x\n", evt.GetDuplicateShard().GetMessageID()) - case optsub.TraceEvent_UNHELPFUL_SHARD: - fmt.Printf(" UNHELPFUL_SHARD id=%x\n", evt.GetUnhelpfulShard().GetMessageID()) - case optsub.TraceEvent_UNNECESSARY_SHARD: - fmt.Printf(" UNNECESSARY_SHARD id=%x\n", evt.GetUnnecessaryShard().GetMessageID()) - } - - jb, _ := json.Marshal(evt) - fmt.Printf("[TRACE] OptimumP2P JSON (%dB): %s\n", len(jb), string(jb)) -} diff --git a/grpc_p2p_client/p2p_client_multi_streams_publish.go b/grpc_p2p_client/p2p_client_multi_streams_publish.go new file mode 100644 index 0000000..c43d713 --- /dev/null +++ b/grpc_p2p_client/p2p_client_multi_streams_publish.go @@ -0,0 +1,280 @@ +package main + +import ( + "bufio" + "context" + "crypto/rand" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "flag" + "fmt" + "log" + "math" + mathrand "math/rand" + "os" + "os/signal" + "strings" + "sync" + "sync/atomic" + "syscall" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + protobuf "p2p_client/grpc" +) + +// P2PMessage represents a message structure used in P2P communication +type P2PMessage struct { + MessageID string // Unique identifier for the message + Topic string // Topic name where the message was published + Message []byte // Actual message data + SourceNodeID string // ID of the node that sent the message (we don't need it in future, it is just for debug purposes) +} + +// Command possible operation that sidecar may perform with p2p node +type Command int32 + +const ( + CommandUnknown Command = iota + CommandPublishData + CommandSubscribeToTopic + CommandUnSubscribeToTopic +) + +var ( + topic = flag.String("topic", "", "topic name") + + // optional: number of messages to publish (for stress testing or batch sending) + count = flag.Int("count", 1, "number of messages to publish") + poisson = flag.Bool("poisson", false, "Enable Poisson arrival") + dataSize = flag.Int("datasize", 100, "size of random of messages to publish") + // optional: sleep duration between publishes + sleep = flag.Duration("sleep", 50*time.Millisecond, "optional delay between publishes (e.g., 1s, 500ms)") + ipfile = flag.String("ipfile", "", "file with a list of IP addresses") + startIdx = flag.Int("start-index", 0, "beginning index is 0: default 0") + endIdx = flag.Int("end-index", 10000, "index-1") + output = flag.String("output", "", "file to write the outgoing data hashes") +) + +func main() { + flag.Parse() + if *topic == "" { + log.Fatalf("−topic is required") + } + + _ips, err := readIPsFromFile(*ipfile) + if err != nil { + fmt.Printf("Error: %v\n", err) + return + } + fmt.Printf("numip %d index %d\n", len(_ips), *endIdx) + *endIdx = min(len(_ips), *endIdx) + ips := _ips[*startIdx:*endIdx] + fmt.Printf("Found %d IPs: %v\n", len(ips), ips) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + fmt.Println("\nShutting down gracefully…") + cancel() + }() + + // Buffered channel to prevent blocking + dataCh := make(chan string, 100) + *dataSize = int(float32(*dataSize) / 2.0) + var done chan bool + var wg sync.WaitGroup + // Start writing the has of the published data + if *output != "" { + done = make(chan bool) + go func() { + header := fmt.Sprintf("sender\tsize\tsha256(msg)") + go writeHashToFile(dataCh, done, *output, header) + }() + } + + // Launch goroutines with synchronization + for _, ip := range ips { + wg.Add(1) + go func(ip string) { + defer wg.Done() + datasize := *dataSize + sendMessages(ctx, ip, datasize, *output != "", dataCh) + }(ip) + } + wg.Wait() + close(dataCh) + if done != nil { + <-done + } + +} + +func sendMessages(ctx context.Context, ip string, datasize int, write bool, dataCh chan<- string) error { + // connect with simple gRPC settings + for i := 0; i < *count; i++ { + select { + case <-ctx.Done(): + log.Printf("[%s] context canceled, stopping", ip) + return ctx.Err() + default: + } + + conn, err := grpc.NewClient(ip, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(math.MaxInt), + grpc.MaxCallSendMsgSize(math.MaxInt), + ), + ) + if err != nil { + log.Fatalf("failed to connect to node %v", err) + } + //defer conn.Close() + println(fmt.Sprintf("Connected to node at: %s…", ip)) + + client := protobuf.NewCommandStreamClient(conn) + + stream, err := client.ListenCommands(ctx) + + if err != nil { + log.Fatalf("ListenCommands: %v", err) + } + + start := time.Now() + var data []byte + //currentTime := time.Now().UnixNano() + randomBytes := make([]byte, datasize) + if _, err := rand.Read(randomBytes); err != nil { + return fmt.Errorf("[%s] failed to generate random bytes: %w", ip, err) + + } + + randomSuffix := hex.EncodeToString(randomBytes) + data = []byte(fmt.Sprintf("%s-%s", ip, randomSuffix)) + pubReq := &protobuf.Request{ + Command: int32(CommandPublishData), + Topic: *topic, + Data: data, + } + + if err := stream.Send(pubReq); err != nil { + return fmt.Errorf("[%s] send publish: %w", ip, err) + } + fmt.Printf("Published data size %d\n", len(data)) + + elapsed := time.Since(start) + + hash := sha256.Sum256(data) + hexHashString := hex.EncodeToString(hash[:]) + var dataToSend string + if write == true { + dataToSend = fmt.Sprintf("%s\t%d\t%s", ip, len(data), hexHashString) + dataCh <- dataToSend + } + fmt.Printf("Published %s to %q (took %v)\n", dataToSend, *topic, elapsed) + + if *poisson { + lambda := 1.0 / (*sleep).Seconds() + interval := mathrand.ExpFloat64() / lambda + waitTime := time.Duration(interval * float64(time.Second)) + time.Sleep(waitTime) + } else { + time.Sleep(*sleep) + + } + + conn.Close() + } + + return nil + +} + +func readIPsFromFile(filename string) ([]string, error) { + file, err := os.Open(filename) + if err != nil { + return nil, fmt.Errorf("failed to open file: %w", err) + } + defer file.Close() + + var ips []string + scanner := bufio.NewScanner(file) + + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + // Skip empty lines and comments + if line == "" || strings.HasPrefix(line, "#") { + continue + } + ips = append(ips, line) + } + + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("error reading file: %w", err) + } + + return ips, nil +} + +func handleResponse(resp *protobuf.Response, counter *int32) { + switch resp.GetCommand() { + case protobuf.ResponseType_Message: + var p2pMessage P2PMessage + if err := json.Unmarshal(resp.GetData(), &p2pMessage); err != nil { + log.Printf("Error unmarshalling message: %v", err) + return + } + n := atomic.AddInt32(counter, 1) + + currentTime := time.Now().UnixNano() + messageSize := len(p2pMessage.Message) + + //fmt.Printf("Recv message: [%d] [%d %d] %s\n\n",n, currentTime, messageSize, string(p2pMessage.Message)[0:100]) + fmt.Printf("Recv message: [%d] [%d %d] %s\n\n", n, currentTime, messageSize, string(p2pMessage.Message)) + default: + log.Println("Unknown response command:", resp.GetCommand()) + } +} + +func headHex(b []byte, n int) string { + if len(b) > n { + b = b[:n] + } + return hex.EncodeToString(b) +} + +func writeHashToFile(dataCh <-chan string, done chan<- bool, filename string, header string) { + file, err := os.Create(filename) + if err != nil { + log.Fatal(err) + } + defer file.Close() + + writer := bufio.NewWriter(file) + defer writer.Flush() + + // write the header + if header != "" { + _, err := writer.WriteString(header + "\n") + if err != nil { + log.Printf("Write error: %v", err) + } + } + + // Process until channel is closed + for data := range dataCh { + _, err := writer.WriteString(data + "\n") + if err != nil { + log.Printf("Write error: %v", err) + } + } + done <- true + fmt.Println("All data flushed to disk") + +} diff --git a/grpc_p2p_client/p2p_client_multi_streams_subscribe.go b/grpc_p2p_client/p2p_client_multi_streams_subscribe.go new file mode 100644 index 0000000..3b954de --- /dev/null +++ b/grpc_p2p_client/p2p_client_multi_streams_subscribe.go @@ -0,0 +1,482 @@ +package main + +import ( + "bufio" + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "flag" + "fmt" + "github.com/mr-tron/base58" + "io" + "log" + "math" + "os" + "os/signal" + "strings" + "sync" + "sync/atomic" + "syscall" + + "github.com/gogo/protobuf/proto" + pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb" + "github.com/libp2p/go-libp2p/core/peer" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + protobuf "p2p_client/grpc" + optsub "p2p_client/grpc/mump2p_trace" +) + +// P2PMessage represents a message structure used in P2P communication +type P2PMessage struct { + MessageID string // Unique identifier for the message + Topic string // Topic name where the message was published + Message []byte // Actual message data + SourceNodeID string // ID of the node that sent the message (we don't need it in future, it is just for debug purposes) +} + +// Command possible operation that sidecar may perform with p2p node +type Command int32 + +const ( + CommandUnknown Command = iota + CommandPublishData + CommandSubscribeToTopic + CommandUnSubscribeToTopic +) + +var ( + topic = flag.String("topic", "", "topic name") + ipfile = flag.String("ipfile", "", "file with a list of IP addresses") + startIdx = flag.Int("start-index", 0, "beginning index is 0: default 0") + endIdx = flag.Int("end-index", 10000, "index-1") + outputTrace = flag.String("output-trace", "", "file to write the outgoing data hashes") + outputData = flag.String("output-data", "", "file to write the outgoing data hashes") +) + +func main() { + flag.Parse() + if *topic == "" { + log.Fatalf("−topic is required") + } + + _ips, err := readIPsFromFile(*ipfile) + if err != nil { + fmt.Printf("Error: %v\n", err) + return + } + fmt.Printf("numip %d index %d\n", len(_ips), *endIdx) + *endIdx = min(len(_ips), *endIdx) + if *startIdx < 0 || *startIdx >= *endIdx || *startIdx >= len(_ips) { + log.Fatalf("invalid index range: start-index=%d end-index=%d (num IPs=%d)", *startIdx, *endIdx, len(_ips)) + } + + ips := _ips[*startIdx:*endIdx] + fmt.Printf("Found %d IPs: %v\n", len(ips), ips) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + fmt.Println("\nShutting down gracefully…") + cancel() + }() + + // Buffered channel to prevent blocking + dataCh := make(chan string, 100) + traceCh := make(chan string, 100) + var dataDone chan bool + var traceDone chan bool + + // Launch goroutines with synchronization + var wg sync.WaitGroup + if *outputData != "" { + dataDone = make(chan bool) + go func() { + defer wg.Done() + header := fmt.Sprintf("receiver\tsender\tsize\tsha256(msg)") + go writeToFile(ctx, dataCh, dataDone, *outputData, header) + }() + } + + if *outputTrace != "" { + traceDone = make(chan bool) + go func() { + defer wg.Done() + header := "" //fmt.Sprintf("sender\tsize\tsha256(msg)") + go writeToFile(ctx, traceCh, traceDone, *outputTrace, header) + }() + } + + for _, ip := range ips { + wg.Add(1) + go func(ip string) { + defer wg.Done() + receiveMessages(ctx, ip, *outputData != "", dataCh, *outputTrace != "", traceCh) + }(ip) + } + + wg.Wait() + close(dataCh) + close(traceCh) + if dataDone != nil { + <-dataDone + } + if traceDone != nil { + <-traceDone + } +} + +func receiveMessages(ctx context.Context, ip string, writeData bool, dataCh chan<- string, + writeTrace bool, traceCh chan<- string) error { + // connect with simple gRPC settings + //fmt.Println("Starting ", ip) + select { + case <-ctx.Done(): + log.Printf("[%s] context canceled, stopping", ip) + return ctx.Err() + default: + } + + conn, err := grpc.NewClient(ip, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(math.MaxInt), + grpc.MaxCallSendMsgSize(math.MaxInt), + ), + ) + if err != nil { + log.Fatalf("failed to connect to node %v", err) + } + defer conn.Close() + + client := protobuf.NewCommandStreamClient(conn) + + stream, err := client.ListenCommands(ctx) + + if err != nil { + log.Fatalf("ListenCommands: %v", err) + } + + println(fmt.Sprintf("Connected to node at: %s…", ip)) + println(fmt.Sprintf("Trying to subscribe to topic %s…", *topic)) + subReq := &protobuf.Request{ + Command: int32(CommandSubscribeToTopic), + Topic: *topic, + } + if err := stream.Send(subReq); err != nil { + log.Fatalf("send subscribe: %v", err) + } + fmt.Printf("Subscribed to topic %q, waiting for messages…\n", *topic) + + var receivedCount int32 + msgChan := make(chan *protobuf.Response, 10000) + + // recv goroutine + go func() { + for { + resp, err := stream.Recv() + if err == io.EOF { + close(msgChan) + return + } + if err != nil { + log.Printf("recv error: %v", err) + close(msgChan) + return + } + msgChan <- resp + } + }() + + // message handler loop + for { + select { + case <-ctx.Done(): + log.Printf("Context canceled. Total messages received: %d", atomic.LoadInt32(&receivedCount)) + return nil + case resp, ok := <-msgChan: + if !ok { + log.Printf("Stream closed. Total messages received: %d", atomic.LoadInt32(&receivedCount)) + return nil + } + go func(resp *protobuf.Response) { + handleResponse(ip, resp, &receivedCount, writeData, dataCh, writeTrace, traceCh) + }(resp) + } + } + +} + +func readIPsFromFile(filename string) ([]string, error) { + file, err := os.Open(filename) + if err != nil { + return nil, fmt.Errorf("failed to open file: %w", err) + } + defer file.Close() + + var ips []string + scanner := bufio.NewScanner(file) + + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + // Skip empty lines and comments + if line == "" || strings.HasPrefix(line, "#") { + continue + } + ips = append(ips, line) + } + + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("error reading file: %w", err) + } + + return ips, nil +} + +func handleResponse(ip string, resp *protobuf.Response, counter *int32, + writedata bool, dataCh chan<- string, writetrace bool, traceCh chan<- string) { + + switch resp.GetCommand() { + case protobuf.ResponseType_Message: + var p2pMessage P2PMessage + if err := json.Unmarshal(resp.GetData(), &p2pMessage); err != nil { + log.Printf("Error unmarshalling message: %v", err) + return + } + _ = atomic.AddInt32(counter, 1) + + hash := sha256.Sum256(p2pMessage.Message) + hexHashString := hex.EncodeToString(hash[:]) + + parts := strings.Split(string(p2pMessage.Message), "-") + if len(parts) > 0 { + publisher := parts[0] + var dataToSend string + if writedata == true { + dataToSend = fmt.Sprintf("%s\t%s\t%d\t%s", ip, publisher, len(p2pMessage.Message), hexHashString) + dataCh <- dataToSend + } + } + + //fmt.Printf("Recv message: %s %d %s\n", ip, messageSize, string(p2pMessage.Message)) + case protobuf.ResponseType_MessageTraceMumP2P: + handleOptimumP2PTrace(resp.GetData(), writetrace, traceCh) + case protobuf.ResponseType_MessageTraceGossipSub: + handleGossipSubTrace(resp.GetData(), writetrace, traceCh) + default: + log.Println("Unknown response command:", resp.GetCommand()) + } +} + +func headHex(b []byte, n int) string { + if len(b) > n { + b = b[:n] + } + return hex.EncodeToString(b) +} + +func handleGossipSubTrace(data []byte, writetrace bool, traceCh chan<- string) { + evt := &pubsubpb.TraceEvent{} + if err := proto.Unmarshal(data, evt); err != nil { + fmt.Printf("[TRACE] GossipSub decode error: %v raw=%dB head=%s\n", + err, len(data), headHex(data, 64)) + return + } + typeStr := optsub.TraceEvent_Type_name[int32(evt.GetType())] + //fmt.Printf("[TRACE] GossipSub type=%s ts=%s size=%dB\n", evt.GetType().String(), ts, len(data)) + //fmt.Printf("[TRACE] GossipSub JSON (%dB): %s\n", len(jb), string(jb)) + + rawBytes := []byte{} + var peerID peer.ID + if evt.PeerID != nil { + rawBytes := []byte(evt.PeerID) + peerID = peer.ID(rawBytes) + // fmt.Printf("peerID: %s\n", peerID) + } + + recvID := "" + if evt.DeliverMessage != nil && evt.DeliverMessage.ReceivedFrom != nil { + rawBytes = []byte(evt.DeliverMessage.ReceivedFrom) + recvID = base58.Encode(rawBytes) + // fmt.Printf("Receiv: %s\n", recvID) + } + + msgID := "" + topic := "" + if evt.DeliverMessage != nil { + rawBytes = []byte(evt.DeliverMessage.MessageID) + msgID = base58.Encode(rawBytes) + // fmt.Printf("MsgID: %s\n", msgID) + topic = string(*evt.DeliverMessage.Topic) + //fmt.Printf("Topic: %q\n", topic) + } + if evt.PublishMessage != nil { + rawBytes = []byte(evt.PublishMessage.MessageID) + msgID = base58.Encode(rawBytes) + //fmt.Printf("MsgID: %s\n", msgID) + topic = string(*evt.PublishMessage.Topic) + //fmt.Printf("Topic: %q\n", topic) + } + + timestamp := int64(0) + if evt.Timestamp != nil { + timestamp = *evt.Timestamp + // fmt.Printf("Timestamp: %d\n", timestamp) + } + + //jb, _ := json.Marshal(evt) + //fmt.Printf("[TRACE] GossipSub JSON message_type=%s, (%dB): %s\n", typeStr, len(jb), string(jb)) + if writetrace { + //dataToSend := fmt.Sprintf("[TRACE] GossipSub JSON message_type=%s, (%dB): %s", typeStr, len(jb), string(jb)) + dataToSend := fmt.Sprintf("%s\t%s\t%s\t%s\t%s\t%d", typeStr, peerID, recvID, msgID, topic, timestamp) + traceCh <- dataToSend + } else { + //fmt.Printf("[TRACE] GossipSub JSON message_type=%s, (%dB): %s\n", typeStr, len(jb), string(jb)) + fmt.Print("%s\t%s\t%s\t%s\t%s\t%d\n", typeStr, peerID, recvID, msgID, topic, timestamp) + } + +} + +func handleOptimumP2PTrace(data []byte, writetrace bool, traceCh chan<- string) { + evt := &optsub.TraceEvent{} + if err := proto.Unmarshal(data, evt); err != nil { + fmt.Printf("[TRACE] OptimumP2P decode error: %v\n", err) + return + } + + // print type + typeStr := optsub.TraceEvent_Type_name[int32(evt.GetType())] + //fmt.Printf("[TRACE] OptimumP2P type=%s ts=%s size=%dB\n", typeStr, ts, len(data)) + //fmt.Printf("[TRACE] OptimumP2P type=%s msg_id=%x time=%d, recvr_id=%s, size=%dB\n", + // typeStr, evt.GetDuplicateShard().GetMessageID(), time.Unix(0, evt.GetTimestamp()), evt.GetPeerID(), len(data)) + + // if shard-related + /* + switch evt.GetType() { + case optsub.TraceEvent_NEW_SHARD: + fmt.Printf(" NEW_SHARD id=%x coeff=%x\n", evt.GetNewShard().GetMessageID(), evt.GetNewShard().GetCoefficients()) + case optsub.TraceEvent_DUPLICATE_SHARD: + fmt.Printf(" DUPLICATE_SHARD id=%x\n", evt.GetDuplicateShard().GetMessageID()) + case optsub.TraceEvent_UNHELPFUL_SHARD: + fmt.Printf(" UNHELPFUL_SHARD id=%x\n", evt.GetUnhelpfulShard().GetMessageID()) + case optsub.TraceEvent_UNNECESSARY_SHARD: + fmt.Printf(" UNNECESSARY_SHARD id=%x\n", evt.GetUnnecessaryShard().GetMessageID()) + } + */ + + /*if evt.PeerID != nil { + fmt.Printf("PeerID: %s\n", string(evt.PeerID)) + } + */ + + rawBytes := []byte{} + var peerID peer.ID + if evt.PeerID != nil { + rawBytes := []byte(evt.PeerID) + peerID = peer.ID(rawBytes) + // fmt.Printf("peerID: %s\n", peerID) + } + + recvID := "" + if evt.DeliverMessage != nil && evt.DeliverMessage.ReceivedFrom != nil { + rawBytes = []byte(evt.DeliverMessage.ReceivedFrom) + recvID = base58.Encode(rawBytes) + // fmt.Printf("Receiv: %s\n", recvID) + } + + if evt.NewShard != nil && evt.NewShard.ReceivedFrom != nil { + rawBytes = []byte(evt.NewShard.ReceivedFrom) + recvID = base58.Encode(rawBytes) + // fmt.Printf("Receiv: %s\n", recvID) + } + + msgID := "" + topic := "" + if evt.DeliverMessage != nil { + rawBytes = []byte(evt.DeliverMessage.MessageID) + msgID = base58.Encode(rawBytes) + // fmt.Printf("MsgID: %s\n", msgID) + topic = string(*evt.DeliverMessage.Topic) + //fmt.Printf("Topic: %q\n", topic) + } + if evt.PublishMessage != nil { + rawBytes = []byte(evt.PublishMessage.MessageID) + msgID = base58.Encode(rawBytes) + //fmt.Printf("MsgID: %s\n", msgID) + topic = string(*evt.PublishMessage.Topic) + //fmt.Printf("Topic: %q\n", topic) + } + if evt.NewShard != nil { + rawBytes = []byte(evt.NewShard.MessageID) + msgID = base58.Encode(rawBytes) + //fmt.Printf("MsgID: %s\n", msgID) + //fmt.Printf("Topic: %q\n", topic) + } + + timestamp := int64(0) + if evt.Timestamp != nil { + timestamp = *evt.Timestamp + // fmt.Printf("Timestamp: %d\n", timestamp) + } + + //jb, _ := json.Marshal(evt) + + if writetrace { + //dataToSend := fmt.Sprintf("[TRACE] OptimumP2P JSON message_type=%s, (%dB): %s", typeStr, len(jb), string(jb)) + // fmt.Printf("[TRACE] OptimumP2P JSON message_type=%s, (%dB): %s\n", typeStr, len(jb), string(jb)) + dataToSend := fmt.Sprintf("%s\t%s\t%s\t%s\t%s\t%d", typeStr, peerID, recvID, msgID, topic, timestamp) + traceCh <- dataToSend + } else { + //fmt.Printf("[TRACE] OptimumP2P JSON message_type=%s, (%dB): %s\n", typeStr, len(jb), string(jb)) + fmt.Print("%s\t%s\t%s\t%s\t%s\t%d\n", typeStr, peerID, recvID, msgID, topic, timestamp) + } + /* + message_type <- systems information + message_id <- application layer + time_stamp <- event occuring the event publish, new shard, duplicate shard + receiver_id + sender_id + + */ + +} + +func writeToFile(ctx context.Context, dataCh <-chan string, done chan<- bool, filename string, header string) { + file, err := os.Create(filename) + if err != nil { + log.Fatal(err) + } + defer file.Close() + + writer := bufio.NewWriter(file) + defer writer.Flush() + + // write the header + if header != "" { + _, err := writer.WriteString(header + "\n") + if err != nil { + log.Printf("Write error: %v", err) + } + } + + // Process until channel is closed + for data := range dataCh { + select { + case <-ctx.Done(): + return + default: + + } + _, err := writer.WriteString(data + "\n") + writer.Flush() + if err != nil { + log.Printf("Write error: %v", err) + } + } + done <- true + fmt.Println("All data flushed to disk") +} diff --git a/test_suite.sh b/test_suite.sh index 6d5be3a..b8e2258 100755 --- a/test_suite.sh +++ b/test_suite.sh @@ -120,7 +120,7 @@ echo # Test 2: Subscribe (empty topic) echo -e "${YELLOW}Test: Subscribe (empty topic)${NC}" resp=$(api_subscribe "$CLIENT_ID" "" 0.7) -test_result "$resp" 'topic is missing' "Subscribe (empty topic)" +test_result "$resp" 'missing topic' "Subscribe (empty topic)" echo # Test 3: Publish (valid) - but first ensure subscription exists @@ -230,7 +230,7 @@ echo echo -e "${YELLOW}Test: Subscribe validation (empty topic)${NC}" resp=$(api_subscribe "$CLIENT_ID" "" 0.7) -test_result "$resp" 'topic is missing' "Subscribe validation (empty topic)" +test_result "$resp" 'missing topic' "Subscribe validation (empty topic)" echo echo -e "${YELLOW}Test: Publish validation (empty message)${NC}" @@ -242,7 +242,7 @@ echo -e "${YELLOW}Test: Publish validation (missing topic)${NC}" resp=$(curl -s -X POST "$PROXY_URL/api/v1/publish" \ -H "Content-Type: application/json" \ -d "{\"client_id\": \"$CLIENT_ID\", \"message\": \"Hello\"}") -test_result "$resp" 'topic is missing' "Publish validation (missing topic)" +test_result "$resp" 'missing topic' "Publish validation (missing topic)" echo echo -e "${YELLOW}Test: Publish validation (missing message)${NC}"