From f73091472079685a9aa3090449dff632cd6e21fc Mon Sep 17 00:00:00 2001 From: baobaoyeye Date: Wed, 11 Jan 2017 20:33:53 +0800 Subject: [PATCH 1/2] delete leveldb before exit function --- src/utils/meta_converter.cc | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/utils/meta_converter.cc b/src/utils/meta_converter.cc index cd028360..553caa81 100644 --- a/src/utils/meta_converter.cc +++ b/src/utils/meta_converter.cc @@ -32,8 +32,10 @@ void CheckChunkserverMeta(const std::vector& store_path_list) { options.create_if_missing = true; leveldb::DB* metadb; leveldb::Status s = leveldb::DB::Open(options, path + "/meta/", &metadb); + meta_dbs[path] = metadb; if (!s.ok()) { LOG(ERROR, "[MetaCheck] Open meta on %s failed: %s", path.c_str(), s.ToString().c_str()); + CloseMetaStore(meta_dbs); exit(EXIT_FAILURE); return; } @@ -56,19 +58,20 @@ void CheckChunkserverMeta(const std::vector& store_path_list) { LOG(INFO, "[MetaCheck] %s Load meta version %d", path.c_str(), cur_version); if (meta_version != EMPTY_META && cur_version != meta_version) { LOG(ERROR, "Cannot handle this situation!!!"); + CloseMetaStore(meta_dbs); exit(EXIT_FAILURE); } meta_version = cur_version; } else if (s.IsNotFound()) { if (meta_version != EMPTY_META && meta_version != 0) { LOG(ERROR, "Cannot handle this situation!!!"); + CloseMetaStore(meta_dbs); exit(EXIT_FAILURE); } meta_version = 0; LOG(INFO, "No meta version %s", path.c_str()); } } - meta_dbs[path] = metadb; } if (meta_version == CHUNKSERVER_META_VERSION) { LOG(INFO, "[MetaCheck] Chunkserver meta check pass"); @@ -78,6 +81,7 @@ void CheckChunkserverMeta(const std::vector& store_path_list) { ChunkserverMetaV02V1(meta_dbs); } else { LOG(ERROR, "[MetaCheck] Cannot handle this situation!!!"); + CloseMetaStore(meta_dbs); exit(EXIT_FAILURE); } SetChunkserverMetaVersion(meta_dbs); @@ -106,6 +110,7 @@ void ChunkserverMetaV02V1(const std::map& meta_dbs) { } if (!src_meta) { LOG(ERROR, "[MetaCheck] Cannot find a valid meta store"); + CloseMetaStore(meta_dbs); exit(EXIT_FAILURE); } @@ -114,6 +119,7 @@ void ChunkserverMetaV02V1(const std::map& meta_dbs) { BlockMeta meta; if (!meta.ParseFromArray(it->value().data(), it->value().size())) { LOG(ERROR, "[MetaCheck] Parse BlockMeta failed: key = %s", it->key().ToString().c_str()); + CloseMetaStore(meta_dbs); exit(EXIT_FAILURE); } const std::string& path = meta.store_path(); @@ -146,6 +152,7 @@ void SetChunkserverMetaVersion(const std::map& meta_d leveldb::Status s = ldb->Put(leveldb::WriteOptions(), meta_key, meta_str); if (!s.ok()) { LOG(ERROR, "[MetaCheck] Put meta failed %s", it->first.c_str()); + CloseMetaStore(meta_dbs); exit(EXIT_FAILURE); } LOG(INFO, "[MetaCheck] Set meta version %s = %d", it->first.c_str(), CHUNKSERVER_META_VERSION); @@ -159,4 +166,4 @@ void CloseMetaStore(const std::map& meta_dbs) { } } // namespace bfs -} // namespace baidu \ No newline at end of file +} // namespace baidu From 07e8de1ddd076a99a2f5cc25f3f74ca9f2e34337 Mon Sep 17 00:00:00 2001 From: baobaoyeye Date: Sun, 15 Jan 2017 23:13:14 +0800 Subject: [PATCH 2/2] gc something at destory MasterSlaveImpl and logs format --- src/nameserver/logdb.cc | 1 + src/nameserver/master_slave.cc | 27 +++++++++++++++++++-------- src/nameserver/master_slave.h | 2 +- src/nameserver/namespace.cc | 1 + 4 files changed, 22 insertions(+), 9 deletions(-) diff --git a/src/nameserver/logdb.cc b/src/nameserver/logdb.cc index 980000b7..16543b1b 100644 --- a/src/nameserver/logdb.cc +++ b/src/nameserver/logdb.cc @@ -22,6 +22,7 @@ LogDB::~LogDB() { if (thread_pool_) { thread_pool_->Stop(true); } + delete thread_pool_; if (write_log_) fclose(write_log_); for (FileCache::iterator it = read_log_.begin(); it != read_log_.end(); ++it) { fclose((it->second).first); diff --git a/src/nameserver/master_slave.cc b/src/nameserver/master_slave.cc index d2b5f9e4..3f15245b 100644 --- a/src/nameserver/master_slave.cc +++ b/src/nameserver/master_slave.cc @@ -61,6 +61,17 @@ MasterSlaveImpl::MasterSlaveImpl() : slave_stub_(NULL), exiting_(false), master_ } } + +MasterSlaveImpl::~MasterSlaveImpl() { + // Wait for threads done task. + if (thread_pool_) { + thread_pool_->Stop(true); + } + delete thread_pool_; + delete rpc_client_; + delete logdb_; +} + void MasterSlaveImpl::Init(std::function callback) { log_callback_ = callback; if (logdb_->GetLargestIdx(¤t_idx_) == kReadError) { @@ -167,7 +178,7 @@ void MasterSlaveImpl::Log(const std::string& entry, std::function c thread_pool_->AddTask(std::bind(&MasterSlaveImpl::PorcessCallbck,this, current_idx_, true)); } else { - LOG(DEBUG, "%s insert callback index = %d", kLogPrefix.c_str(), current_idx_); + LOG(DEBUG, "%s insert callback index = %ld", kLogPrefix.c_str(), current_idx_); thread_pool_->DelayTask(10000, std::bind(&MasterSlaveImpl::PorcessCallbck, this, current_idx_, true)); cond_.Signal(); @@ -253,7 +264,7 @@ void MasterSlaveImpl::ReplicateLog() { mu_.Unlock(); break; } - LOG(DEBUG, "%s ReplicateLog sync_idx_ = %d, current_idx_ = %d", + LOG(DEBUG, "%s ReplicateLog sync_idx_ = %ld, current_idx_ = %ld", kLogPrefix.c_str(), sync_idx_, current_idx_); mu_.Unlock(); std::string entry; @@ -268,21 +279,21 @@ void MasterSlaveImpl::ReplicateLog() { while (!rpc_client_->SendRequest(slave_stub_, &master_slave::MasterSlave_Stub::AppendLog, &request, &response, 15, 1)) { - LOG(WARNING, "%s Replicate log failed index = %d, current_idx_ = %d", + LOG(WARNING, "%s Replicate log failed index = %ld, current_idx_ = %ld", kLogPrefix.c_str(), sync_idx_ + 1, current_idx_); sleep(5); } if (!response.success()) { // log mismatch MutexLock lock(&mu_); sync_idx_ = response.index() - 1; - LOG(INFO, "[Sync] set sync_idx_ to %d", kLogPrefix.c_str(), sync_idx_); + LOG(INFO, "%s set sync_idx_ to %ld", kLogPrefix.c_str(), sync_idx_); continue; } thread_pool_->AddTask(std::bind(&MasterSlaveImpl::PorcessCallbck, this, sync_idx_ + 1, false)); mu_.Lock(); sync_idx_++; - LOG(DEBUG, "%s Replicate log done. sync_idx_ = %d, current_idx_ = %d", + LOG(DEBUG, "%s Replicate log done. sync_idx_ = %ld, current_idx_ = %ld", kLogPrefix.c_str(), sync_idx_ , current_idx_); mu_.Unlock(); } @@ -296,7 +307,7 @@ void MasterSlaveImpl::PorcessCallbck(int64_t index, bool timeout_check) { std::map >::iterator it = callbacks_.find(index); if (it != callbacks_.end()) { callback = it->second; - LOG(DEBUG, "%s calling callback %d", kLogPrefix.c_str(), it->first); + LOG(DEBUG, "%s calling callback %ld", kLogPrefix.c_str(), it->first); callbacks_.erase(it); mu_.Unlock(); callback(true); @@ -306,7 +317,7 @@ void MasterSlaveImpl::PorcessCallbck(int64_t index, bool timeout_check) { } if (timeout_check) { if (!master_only_) { - LOG(WARNING, "%s ReplicateLog sync_idx_ = %d timeout, enter master-only mode", + LOG(WARNING, "%s ReplicateLog sync_idx_ = %ld timeout, enter master-only mode", kLogPrefix.c_str(), index); } master_only_ = true; @@ -320,7 +331,7 @@ void MasterSlaveImpl::PorcessCallbck(int64_t index, bool timeout_check) { } void MasterSlaveImpl::LogStatus() { - LOG(INFO, "%s sync_idx_ = %d, current_idx_ = %d, applied_idx_ = %d, callbacks_ size = %d", + LOG(INFO, "%s sync_idx_ = %ld, current_idx_ = %ld, applied_idx_ = %ld, callbacks_ size = %d", kLogPrefix.c_str(), sync_idx_, current_idx_, applied_idx_, callbacks_.size()); StatusCode ret_a = logdb_->WriteMarker("applied_idx", applied_idx_); StatusCode ret_s = logdb_->WriteMarker("sync_idx", sync_idx_); diff --git a/src/nameserver/master_slave.h b/src/nameserver/master_slave.h index cec4e45e..3dea5eb8 100644 --- a/src/nameserver/master_slave.h +++ b/src/nameserver/master_slave.h @@ -25,7 +25,7 @@ class RpcClient; class MasterSlaveImpl : public Sync, public master_slave::MasterSlave { public: MasterSlaveImpl(); - virtual ~MasterSlaveImpl() {}; + virtual ~MasterSlaveImpl(); virtual void Init(std::function callback); virtual bool IsLeader(std::string* leader_addr = NULL); virtual bool Log(const std::string& entry, int timeout_ms = 10000); diff --git a/src/nameserver/namespace.cc b/src/nameserver/namespace.cc index f9d06912..b22f1adb 100644 --- a/src/nameserver/namespace.cc +++ b/src/nameserver/namespace.cc @@ -38,6 +38,7 @@ NameSpace::NameSpace(bool standalone): version_(0), last_entry_id_(1), options.block_cache = leveldb::NewLRUCache(FLAGS_namedb_cache_size * 1024L * 1024L); leveldb::Status s = leveldb::DB::Open(options, FLAGS_namedb_path, &db_); if (!s.ok()) { + delete db_; db_ = NULL; LOG(ERROR, "Open leveldb fail: %s", s.ToString().c_str()); exit(EXIT_FAILURE);