Skip to content

Commit 00b5a37

Browse files
committed
Merge branch 'main' of github.com:AntelopeIO/spring into gh_1401
2 parents 6f6dfed + 1aacb84 commit 00b5a37

File tree

5 files changed

+168
-60
lines changed

5 files changed

+168
-60
lines changed

plugins/net_plugin/include/eosio/net_plugin/auto_bp_peering.hpp

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <eosio/chain/controller.hpp>
88
#include <eosio/chain/producer_schedule.hpp>
99

10+
#include <boost/unordered/unordered_flat_set.hpp>
1011
#include <boost/algorithm/string.hpp>
1112
#include <boost/range/adaptor/transformed.hpp>
1213

@@ -27,6 +28,8 @@ class bp_connection_manager {
2728
static constexpr fc::microseconds my_bp_gossip_peer_expiration = fc::minutes(30); // resend my bp_peer info every 30 minutes
2829
static constexpr fc::microseconds bp_gossip_peer_expiration_variance = bp_gossip_peer_expiration + fc::minutes(15);
2930

31+
using address_set_t = boost::unordered_flat_set<std::string>;
32+
3033
gossip_bp_index_t gossip_bps;
3134

3235
struct bp_gossip_endpoint_t {
@@ -471,8 +474,8 @@ class bp_connection_manager {
471474
return false;
472475
}
473476

474-
flat_set<std::string> find_gossip_bp_addresses(const name_set_t& accounts, const char* desc) const {
475-
flat_set<std::string> addresses;
477+
address_set_t find_gossip_bp_addresses(const name_set_t& accounts, const char* desc) const {
478+
address_set_t addresses;
476479
fc::lock_guard g(gossip_bps.mtx);
477480
const auto& prod_idx = gossip_bps.index.get<by_producer>();
478481
for (const auto& account : accounts) {
@@ -489,11 +492,22 @@ class bp_connection_manager {
489492
return addresses;
490493
}
491494

495+
// Gather the server endpoint of every entry currently stored in gossip_bps.
// `desc` is only used as a prefix for the per-entry debug log line.
// gossip_bps.mtx is held for the duration of the iteration.
address_set_t all_gossip_bp_addresses(const char* desc) const {
   address_set_t endpoints;
   fc::lock_guard guard(gossip_bps.mtx);
   for (const auto& peer : gossip_bps.index.get<by_producer>()) {
      // look the endpoint up once and reuse it for both the log line and the insert
      const auto& endpoint = peer.server_endpoint();
      fc_dlog(self()->get_logger(), "${d} gossip bp peer ${p}", ("d", desc)("p", endpoint));
      endpoints.insert(endpoint);
   }
   return endpoints;
}
505+
492506
// thread-safe
493507
void connect_to_active_bp_peers() {
494508
// do not hold mutexes when calling resolve_and_connect which acquires connections mutex since other threads
495509
// can be holding connections mutex when trying to acquire these mutexes
496-
flat_set<std::string> addresses;
510+
address_set_t addresses;
497511
{
498512
fc::lock_guard gm(mtx);
499513
active_bps = active_bp_accounts(active_schedule);
@@ -524,7 +538,7 @@ class bp_connection_manager {
524538

525539
// do not hold mutexes when calling resolve_and_connect which acquires connections mutex since other threads
526540
// can be holding connections mutex when trying to acquire these mutexes
527-
flat_set<std::string> addresses = find_gossip_bp_addresses(pending_connections, "connect");
541+
address_set_t addresses = find_gossip_bp_addresses(pending_connections, "connect");
528542
for (const auto& add : addresses) {
529543
self()->connections.resolve_and_connect(add, self()->get_first_p2p_address());
530544
}
@@ -573,9 +587,16 @@ class bp_connection_manager {
573587
std::inserter(peers_to_drop, peers_to_drop.end()));
574588
fc_dlog(self()->get_logger(), "peers to drop: ${p}", ("p", to_string(peers_to_drop)));
575589

576-
flat_set<std::string> addresses = find_gossip_bp_addresses(peers_to_drop, "disconnect");
590+
// if we dropped out of active schedule then disconnect from all
591+
bool disconnect_from_all = !config.my_bp_gossip_accounts.empty() &&
592+
std::all_of(config.my_bp_gossip_accounts.begin(), config.my_bp_gossip_accounts.end(),
593+
[&](const auto& e) { return peers_to_drop.contains(e.first); });
594+
595+
address_set_t addresses = disconnect_from_all
596+
? all_gossip_bp_addresses("disconnect")
597+
: find_gossip_bp_addresses(peers_to_drop, "disconnect");
577598
for (const auto& add : addresses) {
578-
self()->connections.disconnect(add);
599+
self()->connections.disconnect_gossip_connection(add);
579600
}
580601

581602
active_schedule_version = schedule.version;

plugins/net_plugin/net_plugin.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,7 @@ namespace eosio {
365365
string connect(const string& host, const string& p2p_address);
366366
string resolve_and_connect(const string& host, const string& p2p_address);
367367
string disconnect(const string& host);
368+
void disconnect_gossip_connection(const string& host);
368369
void close_all();
369370

370371
std::optional<connection_status> status(const string& host) const;
@@ -4979,6 +4980,19 @@ namespace eosio {
49794980
return true;
49804981
}
49814982

4983+
void connections_manager::disconnect_gossip_connection(const string& host) {
4984+
std::lock_guard g( connections_mtx );
4985+
// do not disconnect if a p2p-peer-address
4986+
if (supplied_peers.contains(host))
4987+
return;
4988+
auto& index = connections.get<by_host>();
4989+
if( auto i = index.find( host ); i != index.end() ) {
4990+
fc_ilog( logger, "disconnecting: ${cid}", ("cid", i->c->connection_id) );
4991+
i->c->close();
4992+
connections.erase(i);
4993+
}
4994+
}
4995+
49824996
// called by API
49834997
string connections_manager::disconnect( const string& host ) {
49844998
std::lock_guard g( connections_mtx );

plugins/net_plugin/tests/auto_bp_peering_unittest.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ struct mock_connections_manager {
2727
std::vector<std::shared_ptr<mock_connection>> connections;
2828

2929
std::function<void(std::string, std::string)> resolve_and_connect;
30-
std::function<void(std::string)> disconnect;
30+
std::function<void(std::string)> disconnect_gossip_connection;
3131

3232
uint32_t get_max_client_count() const { return max_client_count; }
3333

@@ -186,6 +186,7 @@ BOOST_AUTO_TEST_CASE(test_on_pending_schedule) {
186186
BOOST_TEST(plugin.pending_bps == producers_minus_prodkt);
187187

188188
// all connect to bp peers should be invoked
189+
std::ranges::sort(connected_hosts);
189190
BOOST_CHECK_EQUAL(connected_hosts, peer_addresses);
190191

191192
BOOST_CHECK_EQUAL(plugin.pending_schedule_version, 1u);
@@ -211,7 +212,7 @@ BOOST_AUTO_TEST_CASE(test_on_active_schedule1) {
211212
plugin.connections.resolve_and_connect = [](std::string host, std::string p2p_address) {};
212213

213214
std::vector<std::string> disconnected_hosts;
214-
plugin.connections.disconnect = [&disconnected_hosts](std::string host) { disconnected_hosts.push_back(host); };
215+
plugin.connections.disconnect_gossip_connection = [&disconnected_hosts](std::string host) { disconnected_hosts.push_back(host); };
215216

216217
// make sure nothing happens when it is not in_sync
217218
plugin.lib_catchup = true;
@@ -242,7 +243,7 @@ BOOST_AUTO_TEST_CASE(test_on_active_schedule2) {
242243
plugin.set_active_bps( { "proda"_n, "prodh"_n, "prodn"_n, "prodt"_n } );
243244
plugin.connections.resolve_and_connect = [](std::string host, std::string p2p_address) {};
244245
std::vector<std::string> disconnected_hosts;
245-
plugin.connections.disconnect = [&disconnected_hosts](std::string host) { disconnected_hosts.push_back(host); };
246+
plugin.connections.disconnect_gossip_connection = [&disconnected_hosts](std::string host) { disconnected_hosts.push_back(host); };
246247

247248
// when pending and active schedules are changed simultaneously
248249
plugin.lib_catchup = false;

tests/TestHarness/queries.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -725,6 +725,13 @@ def getBlockProducer(self, timeout=None, waitForBlock=True, exitOnError=True, bl
725725
return None
726726
return NodeosQueries.getBlockAttribute(block, "producer", blockNum, exitOnError=exitOnError)
727727

728+
def getProducerSchedule(self):
    """Return the producer names of the active schedule.

    Queries chain/get_producer_schedule through processUrllibRequest and
    returns the "producer_name" of each entry under payload.active.producers,
    in the order the response lists them.
    """
    response = self.processUrllibRequest("chain", "get_producer_schedule")
    active_producers = response["payload"]["active"]["producers"]
    return [entry["producer_name"] for entry in active_producers]
734+
728735
def getNextCleanProductionCycle(self, trans):
729736
rounds=21*12*2 # max time to ensure that at least 2/3+1 of producers x blocks per producer x at least 2 times
730737
if trans is not None:

tests/auto_bp_gossip_peering_test.py

Lines changed: 116 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,20 @@
33
import copy
44
import signal
55

6-
from TestHarness import Cluster, TestHelper, Utils, WalletMgr, createAccountKeys
6+
from TestHarness import Cluster, TestHelper, Utils, WalletMgr, CORE_SYMBOL, createAccountKeys
77

88
###############################################################
99
# auto_bp_gossip_peering_test
1010
#
11-
# This test sets up a cluster with 21 producers nodeos, each nodeos is configured with only one producer and only
11+
# This test sets up a cluster with 21 producer nodeos instances; each nodeos is configured with only one producer and only
1212
# connects to the bios node. Moreover, each producer nodeos is also configured with a p2p-bp-gossip-endpoint so that
1313
# each one can automatically establish p2p connections to other bps. Test verifies connections are established when
1414
# producer schedule is active.
1515
#
16+
# Test then changes the producer schedule and verifies that connections change appropriately.
17+
# Also verifies manual connections are maintained and that non-producers disconnect from producers when they are no
18+
# longer in the schedule.
19+
#
1620
###############################################################
1721

1822
Print = Utils.Print
@@ -64,6 +68,14 @@ def getHostName(nodeId):
6468
accounts=createAccountKeys(21)
6569
if accounts is None:
6670
Utils.errorExit("FAILURE - create keys")
71+
voteAccounts=createAccountKeys(5)
72+
if voteAccounts is None:
73+
Utils.errorExit("FAILURE - create keys")
74+
voteAccounts[0].name="tester111111"
75+
voteAccounts[1].name="tester222222"
76+
voteAccounts[2].name="tester333333"
77+
voteAccounts[3].name="tester444444"
78+
voteAccounts[4].name="tester555555"
6779

6880
if walletMgr.launch() is False:
6981
errorExit("Failed to stand up keosd.")
@@ -96,26 +108,45 @@ def getHostName(nodeId):
96108
Print("Creating wallet \"%s\"" % (testWalletName))
97109
walletAccounts=copy.deepcopy(cluster.defProducerAccounts)
98110
testWallet = walletMgr.create(testWalletName, walletAccounts.values())
111+
walletMgr.importKeys(voteAccounts, testWallet)
99112
all_acc = accounts + list( cluster.defProducerAccounts.values() )
100113
for account in all_acc:
101114
Print("Importing keys for account %s into wallet %s." % (account.name, testWallet.name))
102115
if not walletMgr.importKey(account, testWallet):
103116
errorExit("Failed to import key for account %s" % (account.name))
104117

118+
for i in range(0, producerNodes):
119+
node=cluster.getNode(i)
120+
node.producers=Cluster.parseProducers(i)
121+
for prod in node.producers:
122+
trans=cluster.biosNode.regproducer(cluster.defProducerAccounts[prod], "http::/mysite.com", 0,
123+
waitForTransBlock=False, silentErrors=False)
124+
Print("Setup vote accounts so they can vote")
125+
# create accounts via eosio as otherwise a bid is needed
126+
for account in voteAccounts:
127+
Print("Create new account %s via %s" % (account.name, cluster.eosioAccount.name))
128+
trans=cluster.biosNode.createInitializeAccount(account, cluster.eosioAccount, stakedDeposit=0, waitForTransBlock=False, stakeNet=1000, stakeCPU=1000, buyRAM=1000, exitOnError=True)
129+
cluster.biosNode.waitForTransactionInBlock(trans['transaction_id'])
130+
transferAmount="100000000.0000 {0}".format(CORE_SYMBOL)
131+
Print("Transfer funds %s from account %s to %s" % (transferAmount, cluster.eosioAccount.name, account.name))
132+
trans=cluster.biosNode.transferFunds(cluster.eosioAccount, account, transferAmount, "test transfer", waitForTransBlock=False)
133+
cluster.biosNode.waitForTransactionInBlock(trans['transaction_id'])
134+
trans=cluster.biosNode.delegatebw(account, 20000000.0000, 20000000.0000, waitForTransBlock=False, exitOnError=False)
135+
136+
Print("regpeerkey for all the producers")
105137
for nodeId in range(0, producerNodes):
106138
producer_name = "defproducer" + chr(ord('a') + nodeId)
107139
a = accounts[nodeId]
108140
node = cluster.getNode(nodeId)
109141

110142
success, trans = cluster.biosNode.pushMessage('eosio', 'regpeerkey', f'{{"proposer_finalizer_name":"{producer_name}","key":"{a.activePublicKey}"}}', f'-p {producer_name}@active')
111143
assert(success)
112-
113144
# wait for regpeerkey to be final
114145
for nodeId in range(0, producerNodes):
115146
Utils.Print("Wait for last regpeerkey to be final on ", nodeId)
116147
cluster.getNode(nodeId).waitForTransFinalization(trans['transaction_id'])
117148

118-
# relaunch with p2p-bp-gossip-endpoint
149+
Print("relaunch with p2p-bp-gossip-endpoint to enable BP gossip")
119150
for nodeId in range(0, producerNodes):
120151
Utils.Print(f"Relaunch node {nodeId} with p2p-bp-gossip-endpoint")
121152
node = cluster.getNode(nodeId)
@@ -125,64 +156,98 @@ def getHostName(nodeId):
125156
if not node.relaunch(chainArg=f" --enable-stale-production --p2p-bp-gossip-endpoint {producer_name},{server_address},127.0.0.1"):
126157
errorExit(f"Failed to relaunch node {nodeId}")
127158

128-
# give time for messages to be gossiped around
159+
Print("Wait for messages to be gossiped")
129160
cluster.getNode(producerNodes-1).waitForHeadToAdvance(blocksToAdvance=60)
130161
blockNum = cluster.getNode(0).getBlockNum()
131162
# Wait until every producer node has reached the block number read from node 0.
for nodeId in range(0, producerNodes):
    # f-strings interpolate with {name}; the former "${blockNum}" printed a stray '$' before the number
    Utils.Print(f"Wait for block {blockNum} on node ", nodeId)
    cluster.getNode(nodeId).waitForBlock(blockNum)
134165

135-
# retrieve the producer stable producer schedule
136-
scheduled_producers = []
137-
schedule = cluster.nodes[0].processUrllibRequest("chain", "get_producer_schedule")
138-
for prod in schedule["payload"]["active"]["producers"]:
139-
scheduled_producers.append(prod["producer_name"])
140-
scheduled_producers.sort()
141-
142-
connection_failure = False
143-
for nodeId in range(0, producerNodes):
144-
# retrieve the connections in each node and check if each connects to the other bps in the schedule
145-
connections = cluster.nodes[nodeId].processUrllibRequest("net", "connections")
146-
if Utils.Debug: Utils.Print(f"v1/net/connections: {connections}")
147-
bp_peers = cluster.nodes[nodeId].processUrllibRequest("net", "bp_gossip_peers")
148-
if Utils.Debug: Utils.Print(f"v1/net/bp_gossip_peers: {bp_peers}")
149-
peers = []
150-
for conn in connections["payload"]:
151-
if conn["is_socket_open"] is False:
152-
continue
153-
peer_addr = conn["peer"]
154-
if len(peer_addr) == 0:
155-
if len(conn["last_handshake"]["p2p_address"]) == 0:
166+
def verifyGossipConnections(scheduled_producers):
    """Check the BP gossip connections of every node whose producer is in the schedule.

    Nodes are visited by index; node i is taken to run producer
    "defproducer" + chr(ord('a') + i). For each scheduled node, the set of open
    connections flagged is_bp_peer (excluding bios and the node itself), plus the
    node's own producer name, must equal the sorted schedule, and every entry
    returned by net/bp_gossip_peers must be one of those peers.

    Nodes whose producer is not scheduled are skipped, and the remaining nodes
    are still checked.

    :param scheduled_producers: non-empty list of producer names; not modified.
    :return: True when no connection mismatch was found, False otherwise.
    :raises AssertionError: if scheduled_producers is empty, or if the number of
        gossip peers found differs from the number of connected peers.
    """
    assert(len(scheduled_producers) > 0)
    # compare against a sorted copy so the caller's list is left untouched
    expected_producers = sorted(scheduled_producers)
    connection_failure = False
    for nodeId in range(0, producerNodes):
        name = "defproducer" + chr(ord('a') + nodeId)
        if name not in expected_producers:
            # Only skip this node. A `break` here would end the whole check as soon as the
            # first unscheduled producer is seen (e.g. defproducera for schedule b,h,m,r),
            # leaving every later scheduled node unverified.
            continue
        # retrieve the connections in each node and check if each connects to the other bps in the schedule
        connections = cluster.nodes[nodeId].processUrllibRequest("net", "connections")
        if Utils.Debug: Utils.Print(f"v1/net/connections: {connections}")
        bp_peers = cluster.nodes[nodeId].processUrllibRequest("net", "bp_gossip_peers")
        if Utils.Debug: Utils.Print(f"v1/net/bp_gossip_peers: {bp_peers}")
        peers = []
        for conn in connections["payload"]:
            if conn["is_socket_open"] is False:
                continue
            peer_addr = conn["peer"]
            if len(peer_addr) == 0:
                # no peer address recorded; fall back to the address from the last handshake
                if len(conn["last_handshake"]["p2p_address"]) == 0:
                    continue
                peer_addr = conn["last_handshake"]["p2p_address"].split()[0]
            if peer_names[peer_addr] != "bios" and peer_addr != getHostName(nodeId):
                if conn["is_bp_peer"]:
                    peers.append(peer_names[peer_addr])

        if not peers:
            Utils.Print(f"ERROR: found no connected peers for node {nodeId}")
            connection_failure = True
            break
        peers.append(name) # add ourselves so matches schedule_producers
        peers = list(set(peers))
        peers.sort()
        if peers != expected_producers:
            Utils.Print(f"ERROR: expect {name} has connections to {expected_producers}, got connections to {peers}")
            connection_failure = True
            break
        num_peers_found = 0
        for p in bp_peers["payload"]:
            if p["producer_name"] not in peers:
                Utils.Print(f"ERROR: expect bp peer {p} in peer list")
                connection_failure = True
                break
            else:
                num_peers_found += 1

        assert(num_peers_found == len(peers))
    return not connection_failure
214+
215+
Print("Verify gossip connections")
216+
scheduled_producers = cluster.nodes[0].getProducerSchedule()
217+
success = verifyGossipConnections(scheduled_producers)
218+
assert(success)
219+
220+
Print("Manual connect node_03 defproducerd to node_04 defproducere")
221+
cluster.nodes[3].processUrllibRequest("net", "connect", payload="localhost:9880", exitOnError=True)
222+
223+
Print("Set new producers b,h,m,r")
224+
for account in voteAccounts:
225+
trans=cluster.biosNode.vote(account, ["defproducerb", "defproducerh", "defproducerm", "defproducerr"], silentErrors=False, exitOnError=True)
226+
cluster.biosNode.getNextCleanProductionCycle(trans)
227+
228+
Print("Verify new gossip connections")
229+
scheduled_producers = cluster.nodes[0].getProducerSchedule()
230+
Print(f"Scheduled producers: {scheduled_producers}")
231+
assert(len(scheduled_producers) == 4)
232+
assert("defproducerb" in scheduled_producers and "defproducerh" in scheduled_producers and "defproducerm" in scheduled_producers and "defproducerr" in scheduled_producers)
233+
success = verifyGossipConnections(scheduled_producers)
234+
assert(success)
235+
236+
Print("Verify manual connection still connected")
237+
connections = cluster.nodes[3].processUrllibRequest("net", "connections")
238+
if Utils.Debug: Utils.Print(f"v1/net/connections: {connections}")
239+
found = []
240+
for conn in connections["payload"]:
241+
if conn["is_socket_open"] is False:
242+
continue
243+
peer_addr = conn["peer"]
244+
found.append(peer_names[peer_addr])
182245

183-
assert(num_peers_found == len(peers))
246+
Print(f"Found connections of Node_03: {found}")
247+
assert(len(found) == 2)
248+
assert("bios" in found and "defproducere" in found)
184249

185-
testSuccessful = not connection_failure
250+
testSuccessful = success
186251

187252
finally:
188253
TestHelper.shutdown(

0 commit comments

Comments
 (0)