From 1ccd074bc4384facba64c78f6b00e81056c03f9c Mon Sep 17 00:00:00 2001 From: lylei Date: Mon, 27 Mar 2017 17:38:33 +0800 Subject: [PATCH] Concurrency rebuild block map when ns started #880 (#880) --- src/nameserver/nameserver_impl.cc | 22 +++++++++++++++------- src/nameserver/nameserver_impl.h | 3 ++- src/nameserver/namespace.cc | 25 +++++++++++++++++++------ src/nameserver/namespace.h | 4 ++-- 4 files changed, 38 insertions(+), 16 deletions(-) diff --git a/src/nameserver/nameserver_impl.cc b/src/nameserver/nameserver_impl.cc index cd9e87f3..0521c0b1 100644 --- a/src/nameserver/nameserver_impl.cc +++ b/src/nameserver/nameserver_impl.cc @@ -94,7 +94,7 @@ void NameServerImpl::CheckLeader() { if (!sync_ || sync_->IsLeader()) { LOG(INFO, "Leader nameserver, rebuild block map."); NameServerLog log; - std::function task = + std::function&)> task = std::bind(&NameServerImpl::RebuildBlockMapCallback, this, std::placeholders::_1); namespace_->Activate(task, &log); if (!LogRemote(log, std::function())) { @@ -1061,12 +1061,20 @@ void NameServerImpl::UnlockDir(::google::protobuf::RpcController* controller, done->Run(); } -void NameServerImpl::RebuildBlockMapCallback(const FileInfo& file_info) { - for (int i = 0; i < file_info.blocks_size(); i++) { - int64_t block_id = file_info.blocks(i); - int64_t version = file_info.version(); - block_mapping_manager_->RebuildBlock(block_id, file_info.replicas(), - version, file_info.size()); +void NameServerImpl::RebuildBlockMapCallback(const std::vector& file_info) { + work_thread_pool_->AddTask(std::bind(&NameServerImpl::RebuildBlockMapCallbackAsync, + this, file_info)); +} + +void NameServerImpl::RebuildBlockMapCallbackAsync(const std::vector& file_info) { + for (auto it = file_info.begin(); it != file_info.end(); ++it) { + const FileInfo& file = *it; + for (int i = 0; i < file.blocks_size(); i++) { + int64_t block_id = file.blocks(i); + int64_t version = file.version(); + block_mapping_manager_->RebuildBlock(block_id, file.replicas(), + version, file.size()); + } } } diff --git a/src/nameserver/nameserver_impl.h b/src/nameserver/nameserver_impl.h index 4b2f3d79..51d37239 100644 --- a/src/nameserver/nameserver_impl.h +++ b/src/nameserver/nameserver_impl.h @@ -146,7 +146,8 @@ class NameServerImpl : public NameServer { private: void CheckLeader(); - void RebuildBlockMapCallback(const FileInfo& file_info); + void RebuildBlockMapCallback(const std::vector& file_info); + void RebuildBlockMapCallbackAsync(const std::vector& file_info); void LogStatus(); void CheckRecoverMode(); void LeaveReadOnly(); diff --git a/src/nameserver/namespace.cc b/src/nameserver/namespace.cc index 76c3db04..dfa4ae9b 100644 --- a/src/nameserver/namespace.cc +++ b/src/nameserver/namespace.cc @@ -16,6 +16,7 @@ #include #include #include +#include #include "nameserver/sync.h" @@ -32,6 +33,8 @@ const int64_t kRootEntryid = 1; namespace baidu { namespace bfs { +extern common::Counter g_blocks_num; + NameSpace::NameSpace(bool standalone): version_(0), last_entry_id_(1), block_id_upbound_(1), next_block_id_(1) { leveldb::Options options; @@ -49,7 +52,7 @@ NameSpace::NameSpace(bool standalone): version_(0), last_entry_id_(1), } } -void NameSpace::Activate(std::function callback, NameServerLog* log) { +void NameSpace::Activate(std::function&)> callback, NameServerLog* log) { std::string version_key(8, 0); version_key.append("version"); std::string version_str; @@ -672,13 +675,14 @@ StatusCode NameSpace::InternalDeleteDirectory(const FileInfo& dir_info, return ret_status; } -bool NameSpace::RebuildBlockMap(std::function callback) { +bool NameSpace::RebuildBlockMap(std::function&)> callback) { int64_t block_num = 0; int64_t file_num = 0; int64_t link_num = 0; std::set entry_id_set; entry_id_set.insert(root_path_.entry_id()); leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions()); + std::vector files; for (it->Seek(std::string(7, '\0') + '\1'); it->Valid(); it->Next()) { FileInfo file_info; bool ret = file_info.ParseFromArray(it->value().data(), it->value().size()); @@ -697,8 +701,10 @@ bool NameSpace::RebuildBlockMap(std::function callback) ++block_num; } ++file_num; - if (callback) { - callback(file_info); + files.push_back(file_info); + if (callback && files.size() >= 1000) { + callback(files); + files.clear(); } } else if (file_type == kSymlink) { ++link_num; @@ -706,9 +712,16 @@ bool NameSpace::RebuildBlockMap(std::function callback) entry_id_set.insert(file_info.entry_id()); } } + if (callback && files.size()) { + callback(files); + } + while (g_blocks_num.Get() != block_num) { + sleep(1); + LOG(INFO, "Rebuild process: %ld / %ld blocks", g_blocks_num.Get(), block_num); + } LOG(INFO, "RebuildBlockMap done. %ld directories, %ld symlinks, %ld files, " - "%lu blocks, last_entry_id= E%ld", - entry_id_set.size(), link_num, file_num, block_num, last_entry_id_); + "%lu blocks, mapping size %lu last_entry_id= E%ld", + entry_id_set.size(), link_num, file_num, block_num, g_blocks_num.Get(), last_entry_id_); if (FLAGS_check_orphan) { std::vector > orphan_entrys; for (it->Seek(std::string(7, '\0') + '\1'); it->Valid(); it->Next()) { diff --git a/src/nameserver/namespace.h b/src/nameserver/namespace.h index 42e46e87..12ff3911 100644 --- a/src/nameserver/namespace.h +++ b/src/nameserver/namespace.h @@ -25,7 +25,7 @@ namespace bfs { class NameSpace { public: NameSpace(bool standalone = true); - void Activate(std::function rebuild_callback, NameServerLog* log); + void Activate(std::function&)> rebuild_callback, NameServerLog* log); ~NameSpace(); /// List a directory StatusCode ListDirectory(const std::string& path, @@ -60,7 +60,7 @@ class NameSpace { /// Namespace version int64_t Version() const; /// Rebuild blockmap - bool RebuildBlockMap(std::function callback); + bool RebuildBlockMap(std::function&)> callback); /// NormalizePath static std::string NormalizePath(const std::string& path); /// ha - tail log from leader/master