2 changes: 1 addition & 1 deletion lib/block_producer.go
@@ -374,7 +374,7 @@ func (desoBlockProducer *DeSoBlockProducer) _getBlockTemplate(publicKey []byte)
return nil, nil, nil, errors.Wrapf(err, "DeSoBlockProducer._getBlockTemplate: Problem computing next difficulty: ")
}

glog.Infof("Produced block with %v txns with approx %v total txns in mempool",
glog.V(1).Infof("Produced block with %v txns with approx %v total txns in mempool",
len(blockRet.Txns), len(desoBlockProducer.mempool.readOnlyUniversalTransactionList))
return blockRet, diffTarget, lastNode, nil
}
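This PR demotes routine log lines in the block producer, blockchain, mempool, and peer code from glog's default Info level to verbosity level 1, so they only print when the node runs with `-v=1` or higher. A minimal sketch of how the gated calls behave (the message text is illustrative, not the PR's exact log line):

```go
package main

import (
	"flag"

	"github.com/golang/glog"
)

func main() {
	flag.Parse() // glog registers -v, -logtostderr, etc. on the default flag set
	defer glog.Flush()

	// Always printed (subject to -logtostderr / log-file settings), regardless of -v.
	glog.Infof("node started")

	// Only printed when the process runs with -v=1 or higher, e.g.
	//   ./node -logtostderr -v=1
	// glog.V(1) returns a Verbose value whose Infof is a no-op below that level.
	glog.V(1).Infof("produced block with %d txns", 42)
}
```

The effect is a quieter default log on hot paths (block production, mempool dumps, INV handling) while keeping the messages available for debugging.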
2 changes: 1 addition & 1 deletion lib/blockchain.go
@@ -2575,7 +2575,7 @@ func (bc *Blockchain) processBlockPoW(desoBlock *MsgDeSoBlock, verifySignatures
if *bc.blockView.TipHash != *currentTip.Hash {
//return false, false, fmt.Errorf("ProcessBlock: Tip hash for utxo view (%v) is "+
// "not the current tip hash (%v)", utxoView.TipHash, currentTip.Hash)
glog.Infof("ProcessBlock: Tip hash for utxo view (%v) is "+
glog.V(1).Infof("ProcessBlock: Tip hash for utxo view (%v) is "+
"not the current tip hash (%v)", bc.blockView.TipHash, currentTip.Hash)
}

14 changes: 7 additions & 7 deletions lib/legacy_mempool.go
@@ -792,7 +792,7 @@ func (mp *DeSoMempool) OpenTempDBAndDumpTxns() error {
allTxns := mp.readOnlyUniversalTransactionList

tempMempoolDBDir := filepath.Join(mp.mempoolDir, "temp_mempool_dump")
glog.Infof("OpenTempDBAndDumpTxns: Opening new temp db %v", tempMempoolDBDir)
glog.V(1).Infof("OpenTempDBAndDumpTxns: Opening new temp db %v", tempMempoolDBDir)
// Make the top-level folder if it doesn't exist.
err := MakeDirIfNonExistent(mp.mempoolDir)
if err != nil {
@@ -817,7 +817,7 @@ func (mp *DeSoMempool) OpenTempDBAndDumpTxns() error {
// If we're at a multiple of 1k or we're at the end of the list
// then dump the txns to disk
if len(txnsToDump)%1000 == 0 || ii == len(allTxns)-1 {
glog.Infof("OpenTempDBAndDumpTxns: Dumping txns %v to %v", ii-len(txnsToDump)+1, ii)
glog.V(1).Infof("OpenTempDBAndDumpTxns: Dumping txns %v to %v", ii-len(txnsToDump)+1, ii)
err := tempMempoolDB.Update(func(txn *badger.Txn) error {
return FlushMempoolToDbWithTxn(txn, nil, blockHeight, txnsToDump, mp.bc.eventManager)
})
@@ -828,7 +828,7 @@ func (mp *DeSoMempool) OpenTempDBAndDumpTxns() error {
}
}
endTime := time.Now()
glog.Infof("OpenTempDBAndDumpTxns: Full txn dump of %v txns completed "+
glog.V(1).Infof("OpenTempDBAndDumpTxns: Full txn dump of %v txns completed "+
"in %v seconds. Safe to reboot node", len(allTxns), endTime.Sub(startTime).Seconds())
return nil
}
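The dump loop above accumulates transactions and flushes every 1,000 (or at the end of the list) so that no single Badger transaction grows too large. The same accumulate-and-flush pattern in isolation, assuming a generic `[][]byte` payload and illustrative keys rather than DeSo's `FlushMempoolToDbWithTxn` encoding:

```go
package main

import (
	"fmt"

	"github.com/dgraph-io/badger/v3"
)

// dumpInBatches writes txns to db in bounded transactions. Keys are padded
// with the txn's position so that iteration order matches dump order.
func dumpInBatches(db *badger.DB, txns [][]byte, batchSize int) error {
	batch := make([][]byte, 0, batchSize)
	for ii, txn := range txns {
		batch = append(batch, txn)
		// Flush when the batch is full or we've reached the last txn.
		if len(batch) == batchSize || ii == len(txns)-1 {
			firstIdx := ii - len(batch) + 1
			err := db.Update(func(dbTxn *badger.Txn) error {
				for jj, payload := range batch {
					key := []byte(fmt.Sprintf("txn-%010d", firstIdx+jj))
					if err := dbTxn.Set(key, payload); err != nil {
						return err
					}
				}
				return nil
			})
			if err != nil {
				return err
			}
			batch = batch[:0]
		}
	}
	return nil
}

func main() {
	// In-memory DB keeps the sketch self-contained.
	db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true))
	if err != nil {
		panic(err)
	}
	defer db.Close()

	txns := [][]byte{[]byte("a"), []byte("b"), []byte("c")}
	if err := dumpInBatches(db, txns, 2); err != nil {
		panic(err)
	}
}
```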
@@ -2606,7 +2606,7 @@ func (mp *DeSoMempool) InefficientRemoveTransaction(tx *MsgDeSoTxn) {
}

func (mp *DeSoMempool) StartReadOnlyUtxoViewRegenerator() {
glog.Info("Calling StartReadOnlyUtxoViewRegenerator...")
glog.V(1).Info("Calling StartReadOnlyUtxoViewRegenerator...")

go func() {
var oldSeqNum int64
@@ -2697,7 +2697,7 @@ func (mp *DeSoMempool) StartMempoolDBDumper() {
for {
select {
case <-time.After(30 * time.Second):
glog.Info("StartMempoolDBDumper: Waking up! Dumping txns now...")
glog.V(1).Info("StartMempoolDBDumper: Waking up! Dumping txns now...")

// Dump the txns and time it.
mp.DumpTxnsToDB()
@@ -2734,7 +2734,7 @@ func (mp *DeSoMempool) LoadTxnsFromDB() {
// If we make it this far, we found a mempool dump to load. Woohoo!
tempMempoolDBOpts := mp.getBadgerOptions(savedTxnsDir)
tempMempoolDBOpts.ValueDir = savedTxnsDir
glog.Infof("LoadTxnsFrom: Opening new temp db %v", savedTxnsDir)
glog.V(1).Infof("LoadTxnsFrom: Opening new temp db %v", savedTxnsDir)
tempMempoolDB, err := badger.Open(tempMempoolDBOpts)
if err != nil {
glog.Infof("LoadTxnsFrom: Could not open temp db to dump mempool: %v", err)
@@ -2759,7 +2759,7 @@ func (mp *DeSoMempool) LoadTxnsFromDB() {
}
}
endTime := time.Now()
glog.Infof("LoadTxnsFromDB: Loaded %v txns in %v seconds", len(dbMempoolTxnsOrderedByTime), endTime.Sub(startTime).Seconds())
glog.V(1).Infof("LoadTxnsFromDB: Loaded %v txns in %v seconds", len(dbMempoolTxnsOrderedByTime), endTime.Sub(startTime).Seconds())
}
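LoadTxnsFromDB reverses the dump: it opens the saved temp DB and reads the transactions back ordered by time. A sketch of the reload side under the same illustrative key scheme as the dump sketch above, using a plain key-ordered Badger iterator:

```go
package mempooldump

import (
	"github.com/dgraph-io/badger/v3"
)

// loadAll reads every value back in key order. With position- or time-ordered
// keys (as in the dump sketch above), this recovers txns oldest-first.
func loadAll(db *badger.DB) ([][]byte, error) {
	var out [][]byte
	err := db.View(func(txn *badger.Txn) error {
		it := txn.NewIterator(badger.DefaultIteratorOptions)
		defer it.Close()
		for it.Rewind(); it.Valid(); it.Next() {
			val, err := it.Item().ValueCopy(nil)
			if err != nil {
				return err
			}
			out = append(out, val)
		}
		return nil
	})
	return out, err
}
```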

func (mp *DeSoMempool) Stop() {
2 changes: 1 addition & 1 deletion lib/peer.go
@@ -385,7 +385,7 @@ func (pp *Peer) HandleInv(msg *MsgDeSoInv) {
// Ignore invs while we're still syncing and before we've requested
// all mempool transactions from one of our peers to bootstrap.
if pp.srv.blockchain.isSyncing() {
glog.Infof("Server._handleInv: Ignoring INV while syncing from Peer %v", pp)
glog.V(1).Infof("Server._handleInv: Ignoring INV while syncing from Peer %v", pp)
return
}

75 changes: 57 additions & 18 deletions lib/server.go
@@ -1442,13 +1442,16 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {
msg.SnapshotChunk[0].Key, msg.SnapshotChunk[len(msg.SnapshotChunk)-1].Key, len(msg.SnapshotChunk),
msg.SnapshotMetadata, msg.SnapshotChunk[0].IsEmpty(), pp)))
// Free up a slot in the operationQueueSemaphore, now that a chunk has been processed.
srv.snapshot.FreeOperationQueueSemaphore()
// srv.snapshot.FreeOperationQueueSemaphore()

// There is a possibility that during hypersync the network entered a new snapshot epoch. We handle this case by
// restarting the node and starting hypersync from scratch.
if msg.SnapshotMetadata.SnapshotBlockHeight > srv.HyperSyncProgress.SnapshotMetadata.SnapshotBlockHeight &&
uint64(srv.blockchain.HeaderTip().Height) >= msg.SnapshotMetadata.SnapshotBlockHeight {

// Free up a slot in the operationQueueSemaphore, since we hit an error and it won't be freed
// by processing the snapshot chunk.
srv.snapshot.FreeOperationQueueSemaphore()
// TODO: Figure out how to handle header not reaching us, yet peer is telling us that the new epoch has started.
if srv.nodeMessageChannel != nil {
srv.nodeMessageChannel <- NodeRestart
@@ -1467,6 +1470,9 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {
if msg.SnapshotMetadata.SnapshotBlockHeight != srv.HyperSyncProgress.SnapshotMetadata.SnapshotBlockHeight ||
!bytes.Equal(msg.SnapshotMetadata.CurrentEpochBlockHash[:], srv.HyperSyncProgress.SnapshotMetadata.CurrentEpochBlockHash[:]) {

// Free up a slot in the operationQueueSemaphore, since we hit an error and it won't be freed
// by processing the snapshot chunk.
srv.snapshot.FreeOperationQueueSemaphore()
glog.Errorf("srv._handleSnapshot: blockheight (%v) and blockhash (%v) in msg do not match the expected "+
"hyper sync height (%v) and hash (%v)",
msg.SnapshotMetadata.SnapshotBlockHeight, msg.SnapshotMetadata.CurrentEpochBlockHash,
@@ -1485,20 +1491,27 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {
}
// If peer sent a message with an incorrect prefix, we should disconnect them.
if syncPrefixProgress == nil {
// Free up a slot in the operationQueueSemaphore, since we hit an error and it won't be freed
// by processing the snapshot chunk.
srv.snapshot.FreeOperationQueueSemaphore()
// We should disconnect the peer because he is misbehaving
glog.Errorf("srv._handleSnapshot: Problem finding appropriate sync prefix progress "+
"disconnecting misbehaving peer (%v)", pp)
pp.Disconnect("handleSnapshot: Problem finding appropriate sync prefix progress")
return
}

// TODO: disable checksum support?
// If we haven't yet set the epoch checksum bytes in the hyper sync progress, we'll do it now.
// If we did set the checksum bytes, we will verify that they match the ones the peer has sent us.
prevChecksumBytes := make([]byte, len(srv.HyperSyncProgress.SnapshotMetadata.CurrentEpochChecksumBytes))
copy(prevChecksumBytes, srv.HyperSyncProgress.SnapshotMetadata.CurrentEpochChecksumBytes[:])
if len(srv.HyperSyncProgress.SnapshotMetadata.CurrentEpochChecksumBytes) == 0 {
srv.HyperSyncProgress.SnapshotMetadata.CurrentEpochChecksumBytes = msg.SnapshotMetadata.CurrentEpochChecksumBytes
} else if !reflect.DeepEqual(srv.HyperSyncProgress.SnapshotMetadata.CurrentEpochChecksumBytes, msg.SnapshotMetadata.CurrentEpochChecksumBytes) {
// Free up a slot in the operationQueueSemaphore, since we hit an error and it won't be freed
// by processing the snapshot chunk.
srv.snapshot.FreeOperationQueueSemaphore()
// We should disconnect the peer because he is misbehaving
glog.Errorf("srv._handleSnapshot: HyperSyncProgress epoch checksum bytes does not match that received from peer, "+
"disconnecting misbehaving peer (%v)", pp)
@@ -1521,6 +1534,9 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {
// If this is the first message that we're receiving for this sync progress, the first entry in the chunk
// is going to be equal to the prefix.
if !bytes.HasPrefix(msg.SnapshotChunk[0].Key, msg.Prefix) {
// Free up a slot in the operationQueueSemaphore, since we hit an error and it won't be freed
// by processing the snapshot chunk.
srv.snapshot.FreeOperationQueueSemaphore()
// We should disconnect the peer because he is misbehaving.
glog.Errorf("srv._handleSnapshot: Snapshot chunk DBEntry key has mismatched prefix "+
"disconnecting misbehaving peer (%v)", pp)
@@ -1534,6 +1550,9 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {
// should be identical to the first key in snapshot chunk. If it is not, then the peer either re-sent
// the same payload twice, a message was dropped by the network, or he is misbehaving.
if !bytes.Equal(syncPrefixProgress.LastReceivedKey, msg.SnapshotChunk[0].Key) {
// Free up a slot in the operationQueueSemaphore, since we hit an error and it won't be freed
// by processing the snapshot chunk.
srv.snapshot.FreeOperationQueueSemaphore()
glog.Errorf("srv._handleSnapshot: Received a snapshot chunk that's not in-line with the sync progress "+
"disconnecting misbehaving peer (%v)", pp)
srv.HyperSyncProgress.SnapshotMetadata.CurrentEpochChecksumBytes = prevChecksumBytes
@@ -1550,6 +1569,9 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {
for ii := 1; ii < len(dbChunk); ii++ {
// Make sure that all dbChunk entries have the same prefix as in the message.
if !bytes.HasPrefix(dbChunk[ii].Key, msg.Prefix) {
// Free up a slot in the operationQueueSemaphore, since we hit an error and it won't be freed
// by processing the snapshot chunk.
srv.snapshot.FreeOperationQueueSemaphore()
// We should disconnect the peer because he is misbehaving
glog.Errorf("srv._handleSnapshot: DBEntry key has mismatched prefix "+
"disconnecting misbehaving peer (%v)", pp)
@@ -1559,6 +1581,9 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {
}
// Make sure that the dbChunk is sorted increasingly.
if bytes.Compare(dbChunk[ii-1].Key, dbChunk[ii].Key) != -1 {
// Free up a slot in the operationQueueSemaphore, since we hit an error and it won't be freed
// by processing the snapshot chunk.
srv.snapshot.FreeOperationQueueSemaphore()
// We should disconnect the peer because he is misbehaving
glog.Errorf("srv._handleSnapshot: dbChunk entries are not sorted: first entry at index (%v) with "+
"value (%v) and second entry with index (%v) and value (%v) disconnecting misbehaving peer (%v)",
@@ -1574,6 +1599,10 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {
srv.snapshot.ProcessSnapshotChunk(srv.blockchain.db, &srv.blockchain.ChainLock, dbChunk,
srv.HyperSyncProgress.SnapshotMetadata.SnapshotBlockHeight)
srv.timer.End("Server._handleSnapshot Process Snapshot")
} else {
// Free up a slot in the operationQueueSemaphore, since we had added one when we
// requested the snapshot chunk, but didn't end up calling ProcessSnapshotChunk.
srv.snapshot.FreeOperationQueueSemaphore()
}

// We will update the hyper sync progress tracker struct to reflect the newly added snapshot chunk.
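The hunks above replace the single unconditional `FreeOperationQueueSemaphore()` call with one release per exit path: every early return frees the slot itself, and the success path leaves it to `ProcessSnapshotChunk`. An alternative that keeps the release in one place is a deferred, flag-guarded release; a channel-based sketch (the semaphore type here is hypothetical, not the snapshot's actual implementation):

```go
package main

import "errors"

// sem is a counting semaphore: acquire takes a slot, release frees one.
type sem chan struct{}

func newSem(n int) sem { return make(sem, n) }
func (s sem) acquire() { s <- struct{}{} }
func (s sem) release() { <-s }

var errEmptyChunk = errors.New("empty chunk")

// handleChunk frees its semaphore slot exactly once on every path. Branches
// that hand the chunk to an async processor mark it handed off and let the
// processor free the slot instead.
func handleChunk(s sem, chunk []byte, process func([]byte)) error {
	handedOff := false
	defer func() {
		if !handedOff {
			s.release()
		}
	}()

	if len(chunk) == 0 {
		// Early-return path: the deferred release frees the slot.
		return errEmptyChunk
	}

	// Hand off: process is now responsible for calling s.release().
	handedOff = true
	process(chunk)
	return nil
}

func main() {
	s := newSem(4) // at most 4 chunks in flight
	s.acquire()
	_ = handleChunk(s, []byte{0x01}, func(chunk []byte) {
		defer s.release() // the processor frees the slot after handling
		_ = chunk
	})
}
```

Either shape preserves the invariant this PR is after: exactly one release per acquired slot, whether the chunk is rejected, triggers a restart, or reaches ProcessSnapshotChunk.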
@@ -1659,6 +1688,9 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {
"attempt to HyperSync from the beginning. Local db checksum %v; peer's snapshot checksum %v",
checksumBytes, srv.HyperSyncProgress.SnapshotMetadata.CurrentEpochChecksumBytes)))
if srv.forceChecksum {
// Free up a slot in the operationQueueSemaphore, since we hit an error and it won't be freed
// by processing the snapshot chunk.
srv.snapshot.FreeOperationQueueSemaphore()
// If forceChecksum is true we signal an erasure of the state and return here,
// which will cut off the sync.
if srv.nodeMessageChannel != nil {
@@ -1680,6 +1712,7 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {
//
// We split the db update into batches of 10,000 block nodes to avoid a single transaction
// being too large and possibly causing an error in badger.
glog.V(0).Infof("Server._handleSnapshot: Updating snapshot block nodes in the database")
var blockNodeBatch []*BlockNode
for ii := uint64(1); ii <= srv.HyperSyncProgress.SnapshotMetadata.SnapshotBlockHeight; ii++ {
currentNode := srv.blockchain.bestHeaderChain[ii]
@@ -2238,39 +2271,39 @@ func (srv *Server) _logAndDisconnectPeer(pp *Peer, blockMsg *MsgDeSoBlock, suffi
// isLastBlock indicates that this is the last block in the list of blocks we received back
// via a MsgDeSoBlockBundle message. When we receive a single block, isLastBlock will automatically
// be true, which will give it its old single-block behavior.
func (srv *Server) _handleBlock(pp *Peer, blk *MsgDeSoBlock, isLastBlock bool) {
func (srv *Server) _handleBlock(pp *Peer, blk *MsgDeSoBlock, isLastBlock bool) error {
srv.timer.Start("Server._handleBlock: General")

// Pull out the header for easy access.
blockHeader := blk.Header
if blockHeader == nil {
// Should never happen but check it nevertheless.
srv._logAndDisconnectPeer(pp, blk, "Header was nil")
return
return fmt.Errorf("Server._handleBlock: Header was nil")
}

// If we've set a maximum sync height and we've reached that height, then we will
// stop accepting new blocks.
blockTip := srv.blockchain.blockTip()
if srv.blockchain.isTipMaxed(blockTip) && blockHeader.Height > uint64(blockTip.Height) {
glog.Infof("Server._handleBlock: Exiting because block tip is maxed out")
return
return nil
}

// Compute the hash of the block. If the hash computation fails, then we log an error and
// disconnect from the peer. The block is obviously bad.
blockHash, err := blk.Header.Hash()
if err != nil {
srv._logAndDisconnectPeer(pp, blk, "Problem computing block hash")
return
return errors.Wrap(err, "Server._handleBlock: Problem computing block hash")
}

// Unless we're running a PoS validator, we should not expect to see a block that we did not request. If
// we see such a block, then we log an error and disconnect from the peer.
_, isRequestedBlock := pp.requestedBlocks[*blockHash]
if srv.fastHotStuffConsensus == nil && !isRequestedBlock {
srv._logAndDisconnectPeer(pp, blk, "Getting a block that we haven't requested before")
return
return fmt.Errorf("Server._handleBlock: Getting a block that we haven't requested before")
}

// Delete the block from the requested blocks map. We do this whether the block was requested or not.
@@ -2287,7 +2320,7 @@ func (srv *Server) _handleBlock(pp *Peer, blk *MsgDeSoBlock, isLastBlock bool) {
blk.BlockProducerInfo.PublicKey)]
if entryExists {
srv._logAndDisconnectPeer(pp, blk, "Got forbidden block signature public key.")
return
return fmt.Errorf("Server._handleBlock: Got forbidden block signature public key.")
}
}
}
@@ -2302,7 +2335,7 @@ func (srv *Server) _handleBlock(pp *Peer, blk *MsgDeSoBlock, isLastBlock bool) {
pp, srv.blockchain.chainState(), blk.Header.Height,
srv.blockchain.GetCheckpointBlockInfo().Hash.String())
pp.Disconnect("Mismatch between received header height and checkpoint block info")
return
return fmt.Errorf("Server._handleHeaderBundle: Mismatch between received header height and checkpoint block info")
}

var isOrphan bool
@@ -2352,11 +2385,11 @@ func (srv *Server) _handleBlock(pp *Peer, blk *MsgDeSoBlock, isLastBlock bool) {
// bad block proposer signature or it has a bad QC. In either case, we should
// disconnect the peer.
srv._logAndDisconnectPeer(pp, blk, errors.Wrapf(err, "Error while processing block at height %v: ", blk.Header.Height).Error())
return
return errors.Wrapf(err, "Server._handleBlock: Error while processing block at height %v: ", blk.Header.Height)
} else {
// For any other error, we log the error and return it.
glog.Errorf("Server._handleBlock: Error while processing block at height %v: %v", blk.Header.Height, err)
return
return errors.Wrapf(err, "Server._handleBlock: Error while processing block at height %v: ", blk.Header.Height)
}
}
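With `_handleBlock` now returning an error, the new return sites use `fmt.Errorf` for locally detected failures and `errors.Wrapf` (github.com/pkg/errors, already used in this codebase) when propagating an underlying error. A short sketch of the difference: wrapping annotates the message at each call site while preserving the original error as the cause:

```go
package main

import (
	"fmt"

	"github.com/pkg/errors"
)

func processBlock(height uint64) error {
	// A fresh error for a locally detected failure.
	return fmt.Errorf("block at height %d: header was nil", height)
}

func handleBlock(height uint64) error {
	if err := processBlock(height); err != nil {
		// Wrapf prefixes this call site's context and keeps err as the cause.
		return errors.Wrapf(err, "handleBlock: error while processing block at height %d", height)
	}
	return nil
}

func main() {
	err := handleBlock(10)
	fmt.Println(err)               // handleBlock: ...: block at height 10: header was nil
	fmt.Println(errors.Cause(err)) // block at height 10: header was nil
}
```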

@@ -2369,7 +2402,7 @@ func (srv *Server) _handleBlock(pp *Peer, blk *MsgDeSoBlock, isLastBlock bool) {
// relevant after we've connected the last block, and it generally involves fetching
// more data from our peer.
if !isLastBlock {
return
return nil
}

if isOrphan {
@@ -2395,9 +2428,11 @@ func (srv *Server) _handleBlock(pp *Peer, blk *MsgDeSoBlock, isLastBlock bool) {
} else {
// If we don't have any blocks to request, then we disconnect from the peer.
srv._logAndDisconnectPeer(pp, blk, "Received orphan block")
return fmt.Errorf("Server._handleBlock: Received unexpected orphan block")
}

return
// We're done processing the orphan block and we don't return an error.
return nil
}

// We shouldn't be receiving blocks while syncing headers, but we can end up here
@@ -2415,13 +2450,13 @@ func (srv *Server) _handleBlock(pp *Peer, blk *MsgDeSoBlock, isLastBlock bool) {
glog.V(1).Infof("Server._handleHeaderBundle: *Syncing* headers for blocks starting at "+
"header tip %v from peer %v",
srv.blockchain.HeaderTip(), pp)
return
return nil
}

if srv.blockchain.chainState() == SyncStateSyncingHistoricalBlocks {
srv.GetBlocksToStore(pp)
if srv.blockchain.downloadingHistoricalBlocks {
return
return nil
}
}

@@ -2435,7 +2470,7 @@ func (srv *Server) _handleBlock(pp *Peer, blk *MsgDeSoBlock, isLastBlock bool) {
// we're syncing.
maxHeight := -1
srv.RequestBlocksUpToHeight(pp, maxHeight)
return
return nil
}

if srv.blockchain.chainState() == SyncStateNeedBlocksss {
@@ -2456,7 +2491,7 @@ func (srv *Server) _handleBlock(pp *Peer, blk *MsgDeSoBlock, isLastBlock bool) {
StopHash: &BlockHash{},
BlockLocator: locator,
}, false)
return
return nil
}

// If we get here, it means we're in SyncStateFullyCurrent, which is great.
@@ -2465,11 +2500,12 @@ func (srv *Server) _handleBlock(pp *Peer, blk *MsgDeSoBlock, isLastBlock bool) {

// Exit early if the chain isn't SyncStateFullyCurrent.
if srv.blockchain.chainState() != SyncStateFullyCurrent {
return
return nil
}

// If the chain is current, then try to transition to the FastHotStuff consensus.
srv.tryTransitionToFastHotStuffConsensus()
return nil
}

func (srv *Server) _handleBlockBundle(pp *Peer, bundle *MsgDeSoBlockBundle) {
@@ -2500,7 +2536,10 @@ func (srv *Server) _handleBlockBundle(pp *Peer, bundle *MsgDeSoBlockBundle) {
// _handleBlock now returns an error. If processing a block fails, we stop connecting the
// remaining blocks in the bundle, since they would fail to connect on top of the bad
// block anyway.
srv._handleBlock(pp, blk, ii == len(bundle.Blocks)-1 /*isLastBlock*/)
if err := srv._handleBlock(pp, blk, ii == len(bundle.Blocks)-1 /*isLastBlock*/); err != nil {
glog.Errorf("Server._handleBlockBundle: Problem processing block %v: %v", blk, err)
return
}
numLogBlocks := 100
if srv.params.IsPoSBlockHeight(blk.Header.Height) ||
srv.params.NetworkType == NetworkType_TESTNET {