diff --git a/lib/block_producer.go b/lib/block_producer.go
index c54f6771e..838f34bd5 100644
--- a/lib/block_producer.go
+++ b/lib/block_producer.go
@@ -374,7 +374,7 @@ func (desoBlockProducer *DeSoBlockProducer) _getBlockTemplate(publicKey []byte)
     return nil, nil, nil, errors.Wrapf(err, "DeSoBlockProducer._getBlockTemplate: Problem computing next difficulty: ")
   }
-  glog.Infof("Produced block with %v txns with approx %v total txns in mempool",
+  glog.V(1).Infof("Produced block with %v txns with approx %v total txns in mempool",
     len(blockRet.Txns), len(desoBlockProducer.mempool.readOnlyUniversalTransactionList))
   return blockRet, diffTarget, lastNode, nil
 }
diff --git a/lib/blockchain.go b/lib/blockchain.go
index e4796ec54..4fce6b8b4 100644
--- a/lib/blockchain.go
+++ b/lib/blockchain.go
@@ -2575,7 +2575,7 @@ func (bc *Blockchain) processBlockPoW(desoBlock *MsgDeSoBlock, verifySignatures
   if *bc.blockView.TipHash != *currentTip.Hash {
     //return false, false, fmt.Errorf("ProcessBlock: Tip hash for utxo view (%v) is "+
     //  "not the current tip hash (%v)", utxoView.TipHash, currentTip.Hash)
-    glog.Infof("ProcessBlock: Tip hash for utxo view (%v) is "+
+    glog.V(1).Infof("ProcessBlock: Tip hash for utxo view (%v) is "+
       "not the current tip hash (%v)", bc.blockView.TipHash, currentTip.Hash)
   }
diff --git a/lib/legacy_mempool.go b/lib/legacy_mempool.go
index f195293a9..2c9586601 100644
--- a/lib/legacy_mempool.go
+++ b/lib/legacy_mempool.go
@@ -792,7 +792,7 @@ func (mp *DeSoMempool) OpenTempDBAndDumpTxns() error {
   allTxns := mp.readOnlyUniversalTransactionList
   tempMempoolDBDir := filepath.Join(mp.mempoolDir, "temp_mempool_dump")
-  glog.Infof("OpenTempDBAndDumpTxns: Opening new temp db %v", tempMempoolDBDir)
+  glog.V(1).Infof("OpenTempDBAndDumpTxns: Opening new temp db %v", tempMempoolDBDir)
   // Make the top-level folder if it doesn't exist.
   err := MakeDirIfNonExistent(mp.mempoolDir)
   if err != nil {
@@ -817,7 +817,7 @@ func (mp *DeSoMempool) OpenTempDBAndDumpTxns() error {
     // If we're at a multiple of 1k or we're at the end of the list
     // then dump the txns to disk
     if len(txnsToDump)%1000 == 0 || ii == len(allTxns)-1 {
-      glog.Infof("OpenTempDBAndDumpTxns: Dumping txns %v to %v", ii-len(txnsToDump)+1, ii)
+      glog.V(1).Infof("OpenTempDBAndDumpTxns: Dumping txns %v to %v", ii-len(txnsToDump)+1, ii)
       err := tempMempoolDB.Update(func(txn *badger.Txn) error {
         return FlushMempoolToDbWithTxn(txn, nil, blockHeight, txnsToDump, mp.bc.eventManager)
       })
@@ -828,7 +828,7 @@ func (mp *DeSoMempool) OpenTempDBAndDumpTxns() error {
     }
   }
   endTime := time.Now()
-  glog.Infof("OpenTempDBAndDumpTxns: Full txn dump of %v txns completed "+
+  glog.V(1).Infof("OpenTempDBAndDumpTxns: Full txn dump of %v txns completed "+
     "in %v seconds. Safe to reboot node", len(allTxns), endTime.Sub(startTime).Seconds())
   return nil
 }
@@ -2606,7 +2606,7 @@ func (mp *DeSoMempool) InefficientRemoveTransaction(tx *MsgDeSoTxn) {
 }
 
 func (mp *DeSoMempool) StartReadOnlyUtxoViewRegenerator() {
-  glog.Info("Calling StartReadOnlyUtxoViewRegenerator...")
+  glog.V(1).Info("Calling StartReadOnlyUtxoViewRegenerator...")
 
   go func() {
     var oldSeqNum int64
@@ -2697,7 +2697,7 @@ func (mp *DeSoMempool) StartMempoolDBDumper() {
   for {
     select {
     case <-time.After(30 * time.Second):
-      glog.Info("StartMempoolDBDumper: Waking up! Dumping txns now...")
+      glog.V(1).Info("StartMempoolDBDumper: Waking up! Dumping txns now...")
 
       // Dump the txns and time it.
       mp.DumpTxnsToDB()
@@ -2734,7 +2734,7 @@ func (mp *DeSoMempool) LoadTxnsFromDB() {
   // If we make it this far, we found a mempool dump to load. Woohoo!
   tempMempoolDBOpts := mp.getBadgerOptions(savedTxnsDir)
   tempMempoolDBOpts.ValueDir = savedTxnsDir
-  glog.Infof("LoadTxnsFrom: Opening new temp db %v", savedTxnsDir)
+  glog.V(1).Infof("LoadTxnsFrom: Opening new temp db %v", savedTxnsDir)
   tempMempoolDB, err := badger.Open(tempMempoolDBOpts)
   if err != nil {
     glog.Infof("LoadTxnsFrom: Could not open temp db to dump mempool: %v", err)
@@ -2759,7 +2759,7 @@ func (mp *DeSoMempool) LoadTxnsFromDB() {
     }
   }
   endTime := time.Now()
-  glog.Infof("LoadTxnsFromDB: Loaded %v txns in %v seconds", len(dbMempoolTxnsOrderedByTime), endTime.Sub(startTime).Seconds())
+  glog.V(1).Infof("LoadTxnsFromDB: Loaded %v txns in %v seconds", len(dbMempoolTxnsOrderedByTime), endTime.Sub(startTime).Seconds())
 }
 
 func (mp *DeSoMempool) Stop() {
diff --git a/lib/peer.go b/lib/peer.go
index 9fda1bc7b..40492b96e 100644
--- a/lib/peer.go
+++ b/lib/peer.go
@@ -385,7 +385,7 @@ func (pp *Peer) HandleInv(msg *MsgDeSoInv) {
   // Ignore invs while we're still syncing and before we've requested
   // all mempool transactions from one of our peers to bootstrap.
   if pp.srv.blockchain.isSyncing() {
-    glog.Infof("Server._handleInv: Ignoring INV while syncing from Peer %v", pp)
+    glog.V(1).Infof("Server._handleInv: Ignoring INV while syncing from Peer %v", pp)
     return
   }
diff --git a/lib/server.go b/lib/server.go
index 5b1939b9c..921f2dbcb 100644
--- a/lib/server.go
+++ b/lib/server.go
@@ -1442,13 +1442,16 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {
     msg.SnapshotChunk[0].Key, msg.SnapshotChunk[len(msg.SnapshotChunk)-1].Key, len(msg.SnapshotChunk),
     msg.SnapshotMetadata, msg.SnapshotChunk[0].IsEmpty(), pp)))
 
   // Free up a slot in the operationQueueSemaphore, now that a chunk has been processed.
-  srv.snapshot.FreeOperationQueueSemaphore()
+  // srv.snapshot.FreeOperationQueueSemaphore()
 
   // There is a possibility that during hypersync the network entered a new snapshot epoch. We handle this case by
   // restarting the node and starting hypersync from scratch.
   if msg.SnapshotMetadata.SnapshotBlockHeight > srv.HyperSyncProgress.SnapshotMetadata.SnapshotBlockHeight &&
     uint64(srv.blockchain.HeaderTip().Height) >= msg.SnapshotMetadata.SnapshotBlockHeight {
+    // Free up a slot in the operationQueueSemaphore, since we hit an error and it won't be freed
+    // by processing the snapshot chunk.
+    srv.snapshot.FreeOperationQueueSemaphore()
 
     // TODO: Figure out how to handle header not reaching us, yet peer is telling us that the new epoch has started.
     if srv.nodeMessageChannel != nil {
       srv.nodeMessageChannel <- NodeRestart
@@ -1467,6 +1470,9 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {
   if msg.SnapshotMetadata.SnapshotBlockHeight != srv.HyperSyncProgress.SnapshotMetadata.SnapshotBlockHeight ||
     !bytes.Equal(msg.SnapshotMetadata.CurrentEpochBlockHash[:], srv.HyperSyncProgress.SnapshotMetadata.CurrentEpochBlockHash[:]) {
+    // Free up a slot in the operationQueueSemaphore, since we hit an error and it won't be freed
+    // by processing the snapshot chunk.
+    srv.snapshot.FreeOperationQueueSemaphore()
     glog.Errorf("srv._handleSnapshot: blockheight (%v) and blockhash (%v) in msg do not match the expected "+
       "hyper sync height (%v) and hash (%v)", msg.SnapshotMetadata.SnapshotBlockHeight, msg.SnapshotMetadata.CurrentEpochBlockHash,
@@ -1485,6 +1491,9 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {
   }
   // If peer sent a message with an incorrect prefix, we should disconnect them.
   if syncPrefixProgress == nil {
+    // Free up a slot in the operationQueueSemaphore, since we hit an error and it won't be freed
+    // by processing the snapshot chunk.
+    srv.snapshot.FreeOperationQueueSemaphore()
     // We should disconnect the peer because he is misbehaving
     glog.Errorf("srv._handleSnapshot: Problem finding appropriate sync prefix progress "+
       "disconnecting misbehaving peer (%v)", pp)
@@ -1492,6 +1501,7 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {
     return
   }
 
+  // TODO: disable checksum support?
   // If we haven't yet set the epoch checksum bytes in the hyper sync progress, we'll do it now.
   // If we did set the checksum bytes, we will verify that they match the one that peer has sent us.
   prevChecksumBytes := make([]byte, len(srv.HyperSyncProgress.SnapshotMetadata.CurrentEpochChecksumBytes))
@@ -1499,6 +1509,9 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {
   if len(srv.HyperSyncProgress.SnapshotMetadata.CurrentEpochChecksumBytes) == 0 {
     srv.HyperSyncProgress.SnapshotMetadata.CurrentEpochChecksumBytes = msg.SnapshotMetadata.CurrentEpochChecksumBytes
   } else if !reflect.DeepEqual(srv.HyperSyncProgress.SnapshotMetadata.CurrentEpochChecksumBytes, msg.SnapshotMetadata.CurrentEpochChecksumBytes) {
+    // Free up a slot in the operationQueueSemaphore, since we hit an error and it won't be freed
+    // by processing the snapshot chunk.
+    srv.snapshot.FreeOperationQueueSemaphore()
     // We should disconnect the peer because he is misbehaving
     glog.Errorf("srv._handleSnapshot: HyperSyncProgress epoch checksum bytes does not match that received from peer, "+
       "disconnecting misbehaving peer (%v)", pp)
@@ -1521,6 +1534,9 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {
   // If this is the first message that we're receiving for this sync progress, the first entry in the chunk
   // is going to be equal to the prefix.
   if !bytes.HasPrefix(msg.SnapshotChunk[0].Key, msg.Prefix) {
+    // Free up a slot in the operationQueueSemaphore, since we hit an error and it won't be freed
+    // by processing the snapshot chunk.
+    srv.snapshot.FreeOperationQueueSemaphore()
     // We should disconnect the peer because he is misbehaving.
     glog.Errorf("srv._handleSnapshot: Snapshot chunk DBEntry key has mismatched prefix "+
       "disconnecting misbehaving peer (%v)", pp)
@@ -1534,6 +1550,9 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {
   // should be identical to the first key in snapshot chunk. If it is not, then the peer either re-sent
   // the same payload twice, a message was dropped by the network, or he is misbehaving.
   if !bytes.Equal(syncPrefixProgress.LastReceivedKey, msg.SnapshotChunk[0].Key) {
+    // Free up a slot in the operationQueueSemaphore, since we hit an error and it won't be freed
+    // by processing the snapshot chunk.
+    srv.snapshot.FreeOperationQueueSemaphore()
     glog.Errorf("srv._handleSnapshot: Received a snapshot chunk that's not in-line with the sync progress "+
       "disconnecting misbehaving peer (%v)", pp)
     srv.HyperSyncProgress.SnapshotMetadata.CurrentEpochChecksumBytes = prevChecksumBytes
@@ -1550,6 +1569,9 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {
   for ii := 1; ii < len(dbChunk); ii++ {
     // Make sure that all dbChunk entries have the same prefix as in the message.
     if !bytes.HasPrefix(dbChunk[ii].Key, msg.Prefix) {
+      // Free up a slot in the operationQueueSemaphore, since we hit an error and it won't be freed
+      // by processing the snapshot chunk.
+      srv.snapshot.FreeOperationQueueSemaphore()
       // We should disconnect the peer because he is misbehaving
       glog.Errorf("srv._handleSnapshot: DBEntry key has mismatched prefix "+
         "disconnecting misbehaving peer (%v)", pp)
@@ -1559,6 +1581,9 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {
     }
     // Make sure that the dbChunk is sorted increasingly.
     if bytes.Compare(dbChunk[ii-1].Key, dbChunk[ii].Key) != -1 {
+      // Free up a slot in the operationQueueSemaphore, since we hit an error and it won't be freed
+      // by processing the snapshot chunk.
+      srv.snapshot.FreeOperationQueueSemaphore()
       // We should disconnect the peer because he is misbehaving
       glog.Errorf("srv._handleSnapshot: dbChunk entries are not sorted: first entry at index (%v) with "+
         "value (%v) and second entry with index (%v) and value (%v) disconnecting misbehaving peer (%v)",
@@ -1574,6 +1599,10 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {
     srv.snapshot.ProcessSnapshotChunk(srv.blockchain.db, &srv.blockchain.ChainLock, dbChunk,
       srv.HyperSyncProgress.SnapshotMetadata.SnapshotBlockHeight)
     srv.timer.End("Server._handleSnapshot Process Snapshot")
+  } else {
+    // Free up a slot in the operationQueueSemaphore, since we had added one when we
+    // requested the snapshot chunk, but didn't end up calling ProcessSnapshotChunk.
+    srv.snapshot.FreeOperationQueueSemaphore()
   }
 
   // We will update the hyper sync progress tracker struct to reflect the newly added snapshot chunk.
@@ -1659,6 +1688,9 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {
       "attempt to HyperSync from the beginning. Local db checksum %v; peer's snapshot checksum %v",
       checksumBytes, srv.HyperSyncProgress.SnapshotMetadata.CurrentEpochChecksumBytes)))
     if srv.forceChecksum {
+      // Free up a slot in the operationQueueSemaphore, since we hit an error and it won't be freed
+      // by processing the snapshot chunk.
+      srv.snapshot.FreeOperationQueueSemaphore()
       // If forceChecksum is true we signal an erasure of the state and return here,
       // which will cut off the sync.
       if srv.nodeMessageChannel != nil {
@@ -1680,6 +1712,7 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {
   //
   // We split the db update into batches of 10,000 block nodes to avoid a single transaction
   // being too large and possibly causing an error in badger.
+  glog.V(0).Infof("Server._handleSnapshot: Updating snapshot block nodes in the database")
   var blockNodeBatch []*BlockNode
   for ii := uint64(1); ii <= srv.HyperSyncProgress.SnapshotMetadata.SnapshotBlockHeight; ii++ {
     currentNode := srv.blockchain.bestHeaderChain[ii]
@@ -2238,7 +2271,7 @@ func (srv *Server) _logAndDisconnectPeer(pp *Peer, blockMsg *MsgDeSoBlock, suffi
 // isLastBlock indicates that this is the last block in the list of blocks we received back
 // via a MsgDeSoBlockBundle message. When we receive a single block, isLastBlock will automatically
 // be true, which will give it its old single-block behavior.
-func (srv *Server) _handleBlock(pp *Peer, blk *MsgDeSoBlock, isLastBlock bool) {
+func (srv *Server) _handleBlock(pp *Peer, blk *MsgDeSoBlock, isLastBlock bool) error {
   srv.timer.Start("Server._handleBlock: General")
 
   // Pull out the header for easy access.
@@ -2246,7 +2279,7 @@ func (srv *Server) _handleBlock(pp *Peer, blk *MsgDeSoBlock, isLastBlock bool) {
   if blockHeader == nil {
     // Should never happen but check it nevertheless.
     srv._logAndDisconnectPeer(pp, blk, "Header was nil")
-    return
+    return fmt.Errorf("Server._handleBlock: Header was nil")
   }
 
   // If we've set a maximum sync height and we've reached that height, then we will
@@ -2254,7 +2287,7 @@ func (srv *Server) _handleBlock(pp *Peer, blk *MsgDeSoBlock, isLastBlock bool) {
   blockTip := srv.blockchain.blockTip()
   if srv.blockchain.isTipMaxed(blockTip) && blockHeader.Height > uint64(blockTip.Height) {
     glog.Infof("Server._handleBlock: Exiting because block tip is maxed out")
-    return
+    return nil
   }
 
   // Compute the hash of the block. If the hash computation fails, then we log an error and
@@ -2262,7 +2295,7 @@ func (srv *Server) _handleBlock(pp *Peer, blk *MsgDeSoBlock, isLastBlock bool) {
   blockHash, err := blk.Header.Hash()
   if err != nil {
     srv._logAndDisconnectPeer(pp, blk, "Problem computing block hash")
-    return
+    return errors.Wrap(err, "Server._handleBlock: Problem computing block hash")
   }
 
   // Unless we're running a PoS validator, we should not expect to see a block that we did not request. If
@@ -2270,7 +2303,7 @@ func (srv *Server) _handleBlock(pp *Peer, blk *MsgDeSoBlock, isLastBlock bool) {
   _, isRequestedBlock := pp.requestedBlocks[*blockHash]
   if srv.fastHotStuffConsensus == nil && !isRequestedBlock {
     srv._logAndDisconnectPeer(pp, blk, "Getting a block that we haven't requested before")
-    return
+    return fmt.Errorf("Server._handleBlock: Getting a block that we haven't requested before")
   }
 
   // Delete the block from the requested blocks map. We do this whether the block was requested or not.
@@ -2287,7 +2320,7 @@ func (srv *Server) _handleBlock(pp *Peer, blk *MsgDeSoBlock, isLastBlock bool) {
         blk.BlockProducerInfo.PublicKey)]
       if entryExists {
         srv._logAndDisconnectPeer(pp, blk, "Got forbidden block signature public key.")
-        return
+        return fmt.Errorf("Server._handleBlock: Got forbidden block signature public key.")
       }
     }
   }
@@ -2302,7 +2335,7 @@ func (srv *Server) _handleBlock(pp *Peer, blk *MsgDeSoBlock, isLastBlock bool) {
       pp, srv.blockchain.chainState(), blk.Header.Height, srv.blockchain.GetCheckpointBlockInfo().Hash.String())
     pp.Disconnect("Mismatch between received header height and checkpoint block info")
-    return
+    return fmt.Errorf("Server._handleBlock: Mismatch between received header height and checkpoint block info")
   }
 
   var isOrphan bool
@@ -2352,11 +2385,11 @@ func (srv *Server) _handleBlock(pp *Peer, blk *MsgDeSoBlock, isLastBlock bool) {
       // bad block proposer signature or it has a bad QC. In either case, we should
       // disconnect the peer.
       srv._logAndDisconnectPeer(pp, blk, errors.Wrapf(err, "Error while processing block at height %v: ", blk.Header.Height).Error())
-      return
+      return errors.Wrapf(err, "Server._handleBlock: Error while processing block at height %v: ", blk.Header.Height)
     } else {
       // For any other error, we log the error and continue.
glog.Errorf("Server._handleBlock: Error while processing block at height %v: %v", blk.Header.Height, err) - return + return errors.Wrapf(err, "Server._handleBlock: Error while processing block at height %v: ", blk.Header.Height) } } @@ -2369,7 +2402,7 @@ func (srv *Server) _handleBlock(pp *Peer, blk *MsgDeSoBlock, isLastBlock bool) { // relevant after we've connected the last block, and it generally involves fetching // more data from our peer. if !isLastBlock { - return + return nil } if isOrphan { @@ -2395,9 +2428,11 @@ func (srv *Server) _handleBlock(pp *Peer, blk *MsgDeSoBlock, isLastBlock bool) { } else { // If we don't have any blocks to request, then we disconnect from the peer. srv._logAndDisconnectPeer(pp, blk, "Received orphan block") + return fmt.Errorf("Server._handleBlock: Received unexpected orphan block") } - return + // We're done processing the orphan block and we don't return an error. + return nil } // We shouldn't be receiving blocks while syncing headers, but we can end up here @@ -2415,13 +2450,13 @@ func (srv *Server) _handleBlock(pp *Peer, blk *MsgDeSoBlock, isLastBlock bool) { glog.V(1).Infof("Server._handleHeaderBundle: *Syncing* headers for blocks starting at "+ "header tip %v from peer %v", srv.blockchain.HeaderTip(), pp) - return + return nil } if srv.blockchain.chainState() == SyncStateSyncingHistoricalBlocks { srv.GetBlocksToStore(pp) if srv.blockchain.downloadingHistoricalBlocks { - return + return nil } } @@ -2435,7 +2470,7 @@ func (srv *Server) _handleBlock(pp *Peer, blk *MsgDeSoBlock, isLastBlock bool) { // we're syncing. maxHeight := -1 srv.RequestBlocksUpToHeight(pp, maxHeight) - return + return nil } if srv.blockchain.chainState() == SyncStateNeedBlocksss { @@ -2456,7 +2491,7 @@ func (srv *Server) _handleBlock(pp *Peer, blk *MsgDeSoBlock, isLastBlock bool) { StopHash: &BlockHash{}, BlockLocator: locator, }, false) - return + return nil } // If we get here, it means we're in SyncStateFullyCurrent, which is great. @@ -2465,11 +2500,12 @@ func (srv *Server) _handleBlock(pp *Peer, blk *MsgDeSoBlock, isLastBlock bool) { // Exit early if the chain isn't SyncStateFullyCurrent. if srv.blockchain.chainState() != SyncStateFullyCurrent { - return + return nil } // If the chain is current, then try to transition to the FastHotStuff consensus. srv.tryTransitionToFastHotStuffConsensus() + return nil } func (srv *Server) _handleBlockBundle(pp *Peer, bundle *MsgDeSoBlockBundle) { @@ -2500,7 +2536,10 @@ func (srv *Server) _handleBlockBundle(pp *Peer, bundle *MsgDeSoBlockBundle) { // _handleBlock is a legacy function that doesn't support erroring out. It's not a big deal // though as we'll just connect all the blocks after the failed one and those blocks will also // gracefully fail. - srv._handleBlock(pp, blk, ii == len(bundle.Blocks)-1 /*isLastBlock*/) + if err := srv._handleBlock(pp, blk, ii == len(bundle.Blocks)-1 /*isLastBlock*/); err != nil { + glog.Errorf("Server._handleBlockBundle: Problem processing block %v: %v", blk, err) + return + } numLogBlocks := 100 if srv.params.IsPoSBlockHeight(blk.Header.Height) || srv.params.NetworkType == NetworkType_TESTNET { diff --git a/lib/snapshot.go b/lib/snapshot.go index 341412278..0e0287ebd 100644 --- a/lib/snapshot.go +++ b/lib/snapshot.go @@ -1211,15 +1211,20 @@ func (snap *Snapshot) SetSnapshotChunk(mainDb *badger.DB, mainDbMutex *deadlock. dbFlushId := uuid.New() snap.timer.Start("SetSnapshotChunk.Total") - // If there's a problem retrieving the snapshot checksum, we'll reschedule this snapshot chunk set. 
-  initialChecksumBytes, err := snap.Checksum.ToBytes()
-  if err != nil {
-    glog.Errorf("Snapshot.SetSnapshotChunk: Problem retrieving checksum bytes, error: (%v)", err)
-    snap.ProcessSnapshotChunk(mainDb, mainDbMutex, chunk, blockHeight)
-    return err
+  var initialChecksumBytes []byte
+  if !snap.disableChecksum {
+    // If there's a problem retrieving the snapshot checksum, we'll reschedule this snapshot chunk set.
+    initialChecksumBytes, err = snap.Checksum.ToBytes()
+    if err != nil {
+      glog.Errorf("Snapshot.SetSnapshotChunk: Problem retrieving checksum bytes, error: (%v)", err)
+      snap.ProcessSnapshotChunk(mainDb, mainDbMutex, chunk, blockHeight)
+      return err
+    }
   }
 
-  mainDbMutex.Lock()
+  // TODO: I think we don't need to hold the chain lock. We can have multithreaded writes.
+  //mainDbMutex.Lock()
+
   // We use badgerDb write batches as it's the fastest way to write multiple records to the db.
   wb := mainDb.NewWriteBatch()
   defer wb.Cancel()
@@ -1287,7 +1292,7 @@ func (snap *Snapshot) SetSnapshotChunk(mainDb *badger.DB, mainDbMutex *deadlock.
     //snap.timer.End("SetSnapshotChunk.Checksum")
   }()
   syncGroup.Wait()
-  mainDbMutex.Unlock()
+  //mainDbMutex.Unlock()
 
   // If there's a problem setting the snapshot checksum, we'll reschedule this snapshot chunk set.
   if err != nil {
@@ -1299,11 +1304,13 @@ func (snap *Snapshot) SetSnapshotChunk(mainDb *badger.DB, mainDbMutex *deadlock.
     }
     glog.Infof("Snapshot.SetSnapshotChunk: Problem setting the snapshot chunk, error (%v)", err)
 
-    // We reset the snapshot checksum so its initial value, so we won't overlap with processing the next snapshot chunk.
-    // If we've errored during a writeBatch set we'll redo this chunk in next SetSnapshotChunk so we're fine with overlaps.
-    if err := snap.Checksum.FromBytes(initialChecksumBytes); err != nil {
-      panic(fmt.Errorf("Snapshot.SetSnapshotChunk: Problem resetting checksum. This should never happen, "+
-        "error: (%v)", err))
+    if !snap.disableChecksum {
+      // We reset the snapshot checksum to its initial value, so we won't overlap with processing the next snapshot chunk.
+      // If we've errored during a writeBatch set we'll redo this chunk in next SetSnapshotChunk so we're fine with overlaps.
+      if err = snap.Checksum.FromBytes(initialChecksumBytes); err != nil {
+        panic(fmt.Errorf("Snapshot.SetSnapshotChunk: Problem resetting checksum. This should never happen, "+
+          "error: (%v)", err))
+      }
     }
     snap.ProcessSnapshotChunk(mainDb, mainDbMutex, chunk, blockHeight)
     return err
@@ -1315,6 +1322,9 @@ func (snap *Snapshot) SetSnapshotChunk(mainDb *badger.DB, mainDbMutex *deadlock.
       Succeeded: true,
     })
   }
+  // If we get here, then we've successfully processed the snapshot chunk
+  // and can free one slot in the operation queue semaphore.
+  snap.FreeOperationQueueSemaphore()
 
   snap.timer.End("SetSnapshotChunk.Total")
 
@@ -1914,7 +1924,7 @@ type SnapshotOperationChannel struct {
   // from the MainDBSemaphore and the AncestralDBSemaphore which manage concurrency
   // around flushes only.
   StateSemaphore     int32
-  StateSemaphoreLock sync.Mutex
+  StateSemaphoreLock sync.RWMutex
 
   mainDb          *badger.DB
   snapshotDbMutex *sync.Mutex
@@ -2028,8 +2038,8 @@ func (opChan *SnapshotOperationChannel) FinishOperation() {
 }
 
 func (opChan *SnapshotOperationChannel) GetStatus() int32 {
-  opChan.StateSemaphoreLock.Lock()
-  defer opChan.StateSemaphoreLock.Unlock()
+  opChan.StateSemaphoreLock.RLock()
+  defer opChan.StateSemaphoreLock.RUnlock()
   return opChan.StateSemaphore
 }
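
Note on the logging changes in block_producer.go, blockchain.go, legacy_mempool.go, and peer.go above: demoting a call from glog.Infof to glog.V(1).Infof means it only prints when the process runs with -v=1 or higher. A minimal, self-contained sketch of that gating with github.com/golang/glog; the messages and levels below are illustrative, not taken from the DeSo code:

package main

import (
    "flag"

    "github.com/golang/glog"
)

func main() {
    // glog registers its flags (including -v) on the default flag set.
    flag.Parse()
    defer glog.Flush()

    // Always printed, regardless of the -v level.
    glog.Infof("node started")

    // Only printed when the process runs with -v=1 or higher, which is the effect
    // of demoting the routine mempool and block-producer logs in this diff.
    glog.V(1).Infof("dumped %d txns to the temp mempool db", 1000)

    // Higher levels need a higher -v; the Verbose value can also be used as a guard.
    if glog.V(2) {
        glog.Infof("very chatty diagnostic output")
    }
}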
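
Most of the server.go changes exist to guarantee that FreeOperationQueueSemaphore runs on every early-return path of _handleSnapshot, plus once on the success path inside SetSnapshotChunk. A hedged alternative to repeating the call before each return is to centralize the release in a defer guarded by sync.Once. This is only a sketch of that pattern, not the repo's implementation; operationQueue, handleChunk, and releaseSlot are made-up names:

package main

import (
    "errors"
    "fmt"
    "sync"
)

// operationQueue is a counting semaphore: sending acquires a slot, receiving frees one.
// It mirrors the general idea of the operationQueueSemaphore, not its actual code.
var operationQueue = make(chan struct{}, 8)

func handleChunk(valid bool) error {
    operationQueue <- struct{}{} // acquire a slot when the chunk arrives

    // Guarantee exactly one release no matter which return path runs below.
    var once sync.Once
    releaseSlot := func() { once.Do(func() { <-operationQueue }) }
    defer releaseSlot()

    if !valid {
        // Early error return: the deferred releaseSlot frees the slot for us,
        // instead of repeating the free call before every return statement.
        return errors.New("malformed chunk")
    }

    // Happy path: if slot ownership were handed off to an async worker here,
    // the handoff code would consume the Once first so the defer becomes a no-op.
    return nil
}

func main() {
    fmt.Println(handleChunk(false), handleChunk(true))
}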
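
_handleBlock now returns an error, and _handleBlockBundle stops at the first failure instead of letting every later block fail to connect on its own. A small sketch of that wrap-and-stop pattern, using github.com/pkg/errors for wrapping as the diff appears to do; block, handleBlock, and handleBlockBundle are stand-ins rather than the DeSo types:

package main

import (
    "fmt"

    "github.com/pkg/errors"
)

type block struct{ height uint64 }

var errBadBlock = errors.New("bad block")

// handleBlock mimics the new _handleBlock contract: return a wrapped error
// instead of logging and silently returning.
func handleBlock(b block, isLastBlock bool) error {
    if b.height == 0 {
        return fmt.Errorf("handleBlock: header was nil")
    }
    if b.height%2 == 0 { // stand-in for a processing failure
        return errors.Wrapf(errBadBlock, "handleBlock: error while processing block at height %v", b.height)
    }
    return nil
}

// handleBlockBundle stops at the first failed block, since the blocks that
// follow a rejected block would fail to connect anyway.
func handleBlockBundle(blocks []block) {
    for ii, b := range blocks {
        if err := handleBlock(b, ii == len(blocks)-1); err != nil {
            fmt.Printf("handleBlockBundle: problem processing block %v: %v\n", b, err)
            return
        }
    }
}

func main() {
    handleBlockBundle([]block{{1}, {3}, {4}, {5}})
}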
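
The SnapshotOperationChannel change swaps sync.Mutex for sync.RWMutex so that GetStatus can take a shared read lock while the mutating paths keep the exclusive write lock. A minimal sketch of that read/write split; counter is a stand-in type, not the actual struct:

package main

import (
    "fmt"
    "sync"
)

// counter is a stand-in for a struct like SnapshotOperationChannel with a guarded field.
type counter struct {
    mu    sync.RWMutex
    state int32
}

// Add mutates state, so it needs the exclusive write lock.
func (c *counter) Add(delta int32) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.state += delta
}

// Status only reads state, so a shared read lock is enough; many readers can hold
// RLock concurrently, which is the point of moving away from sync.Mutex here.
func (c *counter) Status() int32 {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.state
}

func main() {
    c := &counter{}
    c.Add(2)
    fmt.Println(c.Status())
}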