From 217fb773ab6122b99eef8d2c08fedd9558ab67cc Mon Sep 17 00:00:00 2001 From: yangce Date: Thu, 2 Feb 2017 15:35:36 +0800 Subject: [PATCH 01/15] Add proto (#820) --- src/chunkserver/chunkserver_impl.cc | 9 +++++++++ src/chunkserver/chunkserver_impl.h | 4 ++++ src/nameserver/nameserver_impl.cc | 17 +++++++++++++++++ src/nameserver/nameserver_impl.h | 8 ++++++++ src/proto/chunkserver.proto | 12 +++++++++++- src/proto/nameserver.proto | 27 +++++++++++++++++++++++++++ 6 files changed, 76 insertions(+), 1 deletion(-) diff --git a/src/chunkserver/chunkserver_impl.cc b/src/chunkserver/chunkserver_impl.cc index c219a696..9f62e5f9 100644 --- a/src/chunkserver/chunkserver_impl.cc +++ b/src/chunkserver/chunkserver_impl.cc @@ -858,6 +858,15 @@ void ChunkServerImpl::GetBlockInfo(::google::protobuf::RpcController* controller } +void ChunkServerImpl::PrepareForWrite(::google::protobuf::RpcController* controller, + const PrepareForWriteRequest* request, + PrepareForWriteResponse* response, + ::google::protobuf::Closure* done) + +{ + +} + bool ChunkServerImpl::WebService(const sofa::pbrpc::HTTPRequest& request, sofa::pbrpc::HTTPResponse& response) { ChunkserverStat c_stat = counter_manager_.GetCounters(); diff --git a/src/chunkserver/chunkserver_impl.h b/src/chunkserver/chunkserver_impl.h index 011a9c4b..57387fee 100644 --- a/src/chunkserver/chunkserver_impl.h +++ b/src/chunkserver/chunkserver_impl.h @@ -49,6 +49,10 @@ class ChunkServerImpl : public ChunkServer { const GetBlockInfoRequest* request, GetBlockInfoResponse* response, ::google::protobuf::Closure* done); + virtual void PrepareForWrite(::google::protobuf::RpcController* controller, + const PrepareForWriteRequest* request, + PrepareForWriteResponse* response, + ::google::protobuf::Closure* done); bool WebService(const sofa::pbrpc::HTTPRequest& request, sofa::pbrpc::HTTPResponse& response); private: diff --git a/src/nameserver/nameserver_impl.cc b/src/nameserver/nameserver_impl.cc index ade00ec9..ba114b18 100644 --- a/src/nameserver/nameserver_impl.cc +++ b/src/nameserver/nameserver_impl.cc @@ -1015,6 +1015,23 @@ void NameServerImpl::ShutdownChunkServerStat(::google::protobuf::RpcController* done->Run(); } + +void NameServerImpl::GetChunkServer(::google::protobuf::RpcController* controller, + const GetChunkServerRequest* request, + GetChunkServerResponse* response, + ::google::protobuf::Closure* done) +{ + +} + +void NameServerImpl::StartRecoverBlock(::google::protobuf::RpcController* controller, + const StartRecoverBlockRequest* request, + StartRecoverBlockResponse* response, + ::google::protobuf::Closure* done) +{ + +} + void NameServerImpl::TransToString(const std::map >& chk_set, std::string* output) { for (std::map >::const_iterator it = diff --git a/src/nameserver/nameserver_impl.h b/src/nameserver/nameserver_impl.h index 181ebd70..c67130f4 100644 --- a/src/nameserver/nameserver_impl.h +++ b/src/nameserver/nameserver_impl.h @@ -132,6 +132,14 @@ class NameServerImpl : public NameServer { const ChmodRequest* request, ChmodResponse* response, ::google::protobuf::Closure* done); + void GetChunkServer(::google::protobuf::RpcController* controller, + const GetChunkServerRequest* request, + GetChunkServerResponse* response, + ::google::protobuf::Closure* done); + void StartRecoverBlock(::google::protobuf::RpcController* controller, + const StartRecoverBlockRequest* request, + StartRecoverBlockResponse* response, + ::google::protobuf::Closure* done); bool WebService(const sofa::pbrpc::HTTPRequest&, sofa::pbrpc::HTTPResponse&); private: diff --git a/src/proto/chunkserver.proto b/src/proto/chunkserver.proto index b88b33d4..61c33652 100644 --- a/src/proto/chunkserver.proto +++ b/src/proto/chunkserver.proto @@ -18,7 +18,6 @@ message WriteBlockRequest { optional bool sync_on_close = 14; optional int64 total_size = 15 [default = -1]; } - message WriteBlockResponse { optional int64 sequence_id = 1; optional StatusCode status = 2; @@ -53,9 +52,20 @@ message GetBlockInfoResponse { repeated int64 timestamp = 9; } +message PrepareForWriteRequest { + optional int64 sequence_id = 1; + optional int64 block_id = 2; + optional int64 sliding_window_start_seq = 3; +} +message PrepareForWriteResponse { + optional int64 sequence_id = 1; + optional StatusCode status = 2; +} + service ChunkServer { rpc WriteBlock(WriteBlockRequest) returns(WriteBlockResponse); rpc ReadBlock(ReadBlockRequest) returns(ReadBlockResponse); rpc GetBlockInfo(GetBlockInfoRequest) returns(GetBlockInfoResponse); + rpc PrepareForWrite(PrepareForWriteRequest) returns(PrepareForWriteResponse); } diff --git a/src/proto/nameserver.proto b/src/proto/nameserver.proto index 996cc499..b9454726 100644 --- a/src/proto/nameserver.proto +++ b/src/proto/nameserver.proto @@ -338,6 +338,31 @@ message SymlinkResponse { optional StatusCode status = 2; } +message GetChunkServerRequest { + optional int64 sequence_id = 1; + optional int64 block_id = 2; + optional int32 chunkserver_num = 3; +} + +message GetChunkServerResponse { + optional int64 sequence_id = 1; + optional StatusCode status = 2; + repeated string chunkservers = 3; +} + +message StartRecoverBlockRequest { + optional int64 sequence_id = 1; + optional int32 chunkserver_id = 2; + optional int64 block_id = 3; + optional int64 start_offset = 4; + optional int64 end_offset = 5; +} + +message StartRecoverBlockResponse { + optional int64 sequence_id = 1; + optional StatusCode status = 2; +} + service NameServer { rpc CreateFile(CreateFileRequest) returns(CreateFileResponse); rpc AddBlock(AddBlockRequest) returns(AddBlockResponse); @@ -363,5 +388,7 @@ service NameServer { rpc SysStat(SysStatRequest) returns(SysStatResponse); rpc Chmod(ChmodRequest) returns(ChmodResponse); rpc Symlink(SymlinkRequest) returns(SymlinkResponse); + rpc GetChunkServer(GetChunkServerRequest) returns(GetChunkServerResponse); + rpc StartRecoverBlock(StartRecoverBlockRequest) returns(StartRecoverBlockResponse); } From 8686c854cafa8f41872c79f2d20c4768593b2c39 Mon Sep 17 00:00:00 2001 From: yangce Date: Thu, 2 Feb 2017 18:09:24 +0800 Subject: [PATCH 02/15] Fix code style (#820) --- src/chunkserver/chunkserver_impl.cc | 4 +--- src/nameserver/nameserver_impl.cc | 6 ++---- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/src/chunkserver/chunkserver_impl.cc b/src/chunkserver/chunkserver_impl.cc index 9f62e5f9..584b9556 100644 --- a/src/chunkserver/chunkserver_impl.cc +++ b/src/chunkserver/chunkserver_impl.cc @@ -861,9 +861,7 @@ void ChunkServerImpl::GetBlockInfo(::google::protobuf::RpcController* controller void ChunkServerImpl::PrepareForWrite(::google::protobuf::RpcController* controller, const PrepareForWriteRequest* request, PrepareForWriteResponse* response, - ::google::protobuf::Closure* done) - -{ + ::google::protobuf::Closure* done) { } diff --git a/src/nameserver/nameserver_impl.cc b/src/nameserver/nameserver_impl.cc index ba114b18..76faeba2 100644 --- a/src/nameserver/nameserver_impl.cc +++ b/src/nameserver/nameserver_impl.cc @@ -1019,16 +1019,14 @@ void NameServerImpl::ShutdownChunkServerStat(::google::protobuf::RpcController* void NameServerImpl::GetChunkServer(::google::protobuf::RpcController* controller, const GetChunkServerRequest* request, GetChunkServerResponse* response, - ::google::protobuf::Closure* done) -{ + ::google::protobuf::Closure* done) { } void NameServerImpl::StartRecoverBlock(::google::protobuf::RpcController* controller, const StartRecoverBlockRequest* request, StartRecoverBlockResponse* response, - ::google::protobuf::Closure* done) -{ + ::google::protobuf::Closure* done) { } From 9bdf559045fcc161af74cb993bbf2d1262cde66b Mon Sep 17 00:00:00 2001 From: yangce Date: Thu, 2 Feb 2017 16:01:52 +0800 Subject: [PATCH 03/15] Disable close incomplete blocks (#820) --- src/nameserver/block_mapping.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/nameserver/block_mapping.cc b/src/nameserver/block_mapping.cc index 1cdb9e1b..82fa311f 100644 --- a/src/nameserver/block_mapping.cc +++ b/src/nameserver/block_mapping.cc @@ -748,6 +748,7 @@ void BlockMapping::ProcessRecoveredBlock(int32_t cs_id, int64_t block_id, Status void BlockMapping::GetCloseBlocks(int32_t cs_id, google::protobuf::RepeatedField* close_blocks) { + /* MutexLock lock(&mu_); CheckList::iterator c_it = incomplete_.find(cs_id); if (c_it != incomplete_.end()) { @@ -757,6 +758,7 @@ void BlockMapping::GetCloseBlocks(int32_t cs_id, close_blocks->Add(*it); } } + */ } void BlockMapping::GetStat(int32_t cs_id, RecoverBlockNum* recover_num) { From 499d627abaa5db3c7f27832fa1502287932ea7ee Mon Sep 17 00:00:00 2001 From: yangce Date: Thu, 2 Feb 2017 17:01:30 +0800 Subject: [PATCH 04/15] Add GetChunkServer implementation (#820) --- src/nameserver/block_mapping.cc | 8 +++++-- src/nameserver/chunkserver_manager.cc | 11 ++++++--- src/nameserver/chunkserver_manager.h | 3 ++- src/nameserver/nameserver_impl.cc | 33 +++++++++++++++++++++++++-- 4 files changed, 47 insertions(+), 8 deletions(-) diff --git a/src/nameserver/block_mapping.cc b/src/nameserver/block_mapping.cc index 82fa311f..c7803ca4 100644 --- a/src/nameserver/block_mapping.cc +++ b/src/nameserver/block_mapping.cc @@ -67,8 +67,12 @@ bool BlockMapping::GetLocatedBlock(int64_t id, std::vector* replica, in if (replica->empty()) { LOG(DEBUG, "Block #%ld lost all replica", id); } - *size = block->block_size; - *status = block->recover_stat; + if (size) { + *size = block->block_size; + } + if (status) { + *status = block->recover_stat; + } return true; } diff --git a/src/nameserver/chunkserver_manager.cc b/src/nameserver/chunkserver_manager.cc index c5533fa1..0e810f87 100644 --- a/src/nameserver/chunkserver_manager.cc +++ b/src/nameserver/chunkserver_manager.cc @@ -428,7 +428,8 @@ bool ChunkServerManager::GetChunkServerChains(int num, } bool ChunkServerManager::GetRecoverChains(const std::set& replica, - std::vector* chains) { + std::vector* chains, + int32_t select_num) { mu_.AssertHeld(); std::map >::iterator it = heartbeat_list_.begin(); std::vector > loads; @@ -490,8 +491,12 @@ bool ChunkServerManager::GetRecoverChains(const std::set& replica, return false; } } - RandomSelect(&loads, FLAGS_recover_dest_limit); - for (int i = 0; i < static_cast(loads.size()) && i < FLAGS_recover_dest_limit; ++i) { + if (select_num == -1) { + select_num = FLAGS_recover_dest_limit; + } + RandomSelect(&loads, select_num); + for (int i = 0; i < static_cast(loads.size()) && + i < FLAGS_recover_dest_limit; ++i) { ChunkServerInfo* cs = loads[i].second; chains->push_back(cs->address()); } diff --git a/src/nameserver/chunkserver_manager.h b/src/nameserver/chunkserver_manager.h index db7e314d..fb970ebe 100644 --- a/src/nameserver/chunkserver_manager.h +++ b/src/nameserver/chunkserver_manager.h @@ -55,7 +55,8 @@ class ChunkServerManager { void ListChunkServers(::google::protobuf::RepeatedPtrField* chunkservers); bool GetChunkServerChains(int num, std::vector >* chains, const std::string& client_address); - bool GetRecoverChains(const std::set& replica, std::vector* chains); + bool GetRecoverChains(const std::set& replica, + std::vector* chains, int num = -1); int32_t AddChunkServer(const std::string& address, const std::string& ip, const std::string& tag, int64_t quota); bool KickChunkServer(int cs_id); diff --git a/src/nameserver/nameserver_impl.cc b/src/nameserver/nameserver_impl.cc index 76faeba2..2ac809a1 100644 --- a/src/nameserver/nameserver_impl.cc +++ b/src/nameserver/nameserver_impl.cc @@ -1015,12 +1015,40 @@ void NameServerImpl::ShutdownChunkServerStat(::google::protobuf::RpcController* done->Run(); } - void NameServerImpl::GetChunkServer(::google::protobuf::RpcController* controller, const GetChunkServerRequest* request, GetChunkServerResponse* response, ::google::protobuf::Closure* done) { + if (!is_leader_) { + response->set_status(kIsFollower); + done->Run(); + return; + } + sofa::pbrpc::RpcController* ctl = + reinterpret_cast(controller); + LOG(INFO, "Sdk %s want to get %d chunkserver for block #%ld", + ctl->RemoteAddress().c_str(), request->chunkserver_num(), + request->block_id()); + response->set_sequence_id(request->sequence_id()); + int64_t block_id = request->block_id(); + std::vector cur_replicas; + block_mapping_manager_->GetLocatedBlock(block_id, &cur_replicas, + NULL, NULL); + std::set cur_rep(cur_replicas.begin(), cur_replicas.end()); + int32_t cs_num = request->chunkserver_num(); + std::vector result; + if (!chunkserver_manager_->GetRecoverChains(cur_rep, &result, cs_num)) { + LOG(WARNING, "Get %d chunkserver for block #%ld fail", + cs_num, block_id); + response->set_status(kGetChunkServerError); + done->Run(); + } + for (size_t i = 0; i < result.size(); i++) { + response->add_chunkservers(result[i]); + } + response->set_status(kOK); + done->Run(); } void NameServerImpl::StartRecoverBlock(::google::protobuf::RpcController* controller, @@ -1530,7 +1558,8 @@ void NameServerImpl::CallMethod(const ::google::protobuf::MethodDescriptor* meth std::make_pair("PushBlockReport", work_thread_pool_), std::make_pair("SysStat", read_thread_pool_), std::make_pair("Chmod", work_thread_pool_), - std::make_pair("Symlink", work_thread_pool_) + std::make_pair("Symlink", work_thread_pool_), + std::make_pair("GetChunkServer", work_thread_pool_) }; static int method_num = sizeof(ThreadPoolOfMethod) / From e5a7c500ef3904d592b02d9a5b59ad6bf587091f Mon Sep 17 00:00:00 2001 From: yangce Date: Thu, 2 Feb 2017 20:30:37 +0800 Subject: [PATCH 05/15] Small fix (#820) --- src/nameserver/chunkserver_manager.cc | 9 ++++----- src/nameserver/chunkserver_manager.h | 2 +- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/nameserver/chunkserver_manager.cc b/src/nameserver/chunkserver_manager.cc index 0e810f87..9d758358 100644 --- a/src/nameserver/chunkserver_manager.cc +++ b/src/nameserver/chunkserver_manager.cc @@ -491,12 +491,9 @@ bool ChunkServerManager::GetRecoverChains(const std::set& replica, return false; } } - if (select_num == -1) { - select_num = FLAGS_recover_dest_limit; - } RandomSelect(&loads, select_num); for (int i = 0; i < static_cast(loads.size()) && - i < FLAGS_recover_dest_limit; ++i) { + i < select_num; ++i) { ChunkServerInfo* cs = loads[i].second; chains->push_back(cs->address()); } @@ -679,7 +676,9 @@ void ChunkServerManager::PickRecoverBlocks(int cs_id, RecoverVec* recover_blocks it != blocks.end(); ++it) { MutexLock lock(&mu_); recover_blocks->push_back(std::make_pair((*it).first, std::vector())); - if (GetRecoverChains((*it).second, &(recover_blocks->back().second))) { + if (GetRecoverChains((*it).second, + &(recover_blocks->back().second), + FLAGS_recover_dest_limit)) { // } else { block_mapping_manager_->ProcessRecoveredBlock(cs_id, (*it).first, kGetChunkServerError); diff --git a/src/nameserver/chunkserver_manager.h b/src/nameserver/chunkserver_manager.h index fb970ebe..754d3ec8 100644 --- a/src/nameserver/chunkserver_manager.h +++ b/src/nameserver/chunkserver_manager.h @@ -56,7 +56,7 @@ class ChunkServerManager { bool GetChunkServerChains(int num, std::vector >* chains, const std::string& client_address); bool GetRecoverChains(const std::set& replica, - std::vector* chains, int num = -1); + std::vector* chains, int num); int32_t AddChunkServer(const std::string& address, const std::string& ip, const std::string& tag, int64_t quota); bool KickChunkServer(int cs_id); From 33dc90f46283303caa7359d656f4d397ff76e9c3 Mon Sep 17 00:00:00 2001 From: yangce Date: Fri, 3 Feb 2017 10:12:33 +0800 Subject: [PATCH 06/15] Add PrepareForWrite implementation (#820) --- src/chunkserver/chunkserver_impl.cc | 20 +++++++++++++++++--- src/chunkserver/data_block.cc | 3 +++ src/chunkserver/data_block.h | 1 + src/proto/chunkserver.proto | 2 +- 4 files changed, 22 insertions(+), 4 deletions(-) diff --git a/src/chunkserver/chunkserver_impl.cc b/src/chunkserver/chunkserver_impl.cc index 584b9556..7c63a060 100644 --- a/src/chunkserver/chunkserver_impl.cc +++ b/src/chunkserver/chunkserver_impl.cc @@ -859,10 +859,24 @@ void ChunkServerImpl::GetBlockInfo(::google::protobuf::RpcController* controller } void ChunkServerImpl::PrepareForWrite(::google::protobuf::RpcController* controller, - const PrepareForWriteRequest* request, - PrepareForWriteResponse* response, - ::google::protobuf::Closure* done) { + const PrepareForWriteRequest* request, + PrepareForWriteResponse* response, + ::google::protobuf::Closure* done) { + response->set_sequence_id(request->sequence_id()); + int64_t block_id = request->block_id(); + int32_t seq = request->sliding_window_start_seq(); + LOG(INFO, "Prepare write for block #%ld from seq %d", block_id, seq); + StatusCode s; + Block* block = block_manager_->CreateBlock(block_id, &s); + if (s != kOK) { + LOG(INFO, "[PrepareForWrite] block #%ld created failed, reason %s", + block_id, StatusCode_Name(s).c_str()); + } else { + block->SeekReceiveWindow(seq); + } + response->set_status(s); + done->Run(); } bool ChunkServerImpl::WebService(const sofa::pbrpc::HTTPRequest& request, diff --git a/src/chunkserver/data_block.cc b/src/chunkserver/data_block.cc index 86d0c8cd..f1fd4e5d 100644 --- a/src/chunkserver/data_block.cc +++ b/src/chunkserver/data_block.cc @@ -421,6 +421,9 @@ int64_t Block::GetExpectedSize() const { void Block::SetExpectedSize(int64_t expected_size) { expected_size_ = expected_size; } +void Block::SeekReceiveWindow(int32_t seq) { + recv_window_->SeekToOffset(seq); +} /// Append to block buffer StatusCode Block::Append(int32_t seq, const char* buf, int64_t len) { mu_.AssertHeld(); diff --git a/src/chunkserver/data_block.h b/src/chunkserver/data_block.h index 62541140..a6920fe5 100644 --- a/src/chunkserver/data_block.h +++ b/src/chunkserver/data_block.h @@ -69,6 +69,7 @@ class Block { bool IsRecover() const; int64_t GetExpectedSize() const; void SetExpectedSize(int64_t expected_size); + void SeekReceiveWindow(int32_t seq); /// Flush block to disk. bool Close(bool sync); void AddRef(); diff --git a/src/proto/chunkserver.proto b/src/proto/chunkserver.proto index 61c33652..32970db9 100644 --- a/src/proto/chunkserver.proto +++ b/src/proto/chunkserver.proto @@ -55,7 +55,7 @@ message GetBlockInfoResponse { message PrepareForWriteRequest { optional int64 sequence_id = 1; optional int64 block_id = 2; - optional int64 sliding_window_start_seq = 3; + optional int32 sliding_window_start_seq = 3; } message PrepareForWriteResponse { optional int64 sequence_id = 1; From 0dae3912be63db1cf9135bb328717a5c99c2625e Mon Sep 17 00:00:00 2001 From: yangce Date: Fri, 3 Feb 2017 10:37:15 +0800 Subject: [PATCH 07/15] Add ut for SeekReceiveWindow (#820) --- src/chunkserver/test/data_block_test.cc | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/src/chunkserver/test/data_block_test.cc b/src/chunkserver/test/data_block_test.cc index e84b4bd9..5f30b6b3 100644 --- a/src/chunkserver/test/data_block_test.cc +++ b/src/chunkserver/test/data_block_test.cc @@ -3,6 +3,7 @@ // found in the LICENSE file. // +#define private public #include "chunkserver/data_block.h" #include "chunkserver/file_cache.h" #include "chunkserver/disk.h" @@ -102,6 +103,28 @@ TEST_F(DataBlockTest, WriteAndReadBlock) { system("rm -rf ./block123"); } +TEST_F(DataBlockTest, SeekReceiveWindow) { + mkdir("./block123", 0755); + std::string file_path("./block123"); + Disk disk(file_path, 1000000); + disk.LoadStorage(std::bind(AddBlock, std::placeholders::_1, + std::placeholders::_2, std::placeholders::_3)); + BlockMeta meta; + FileCache file_cache(10); + int64_t block_id = 123; + meta.set_block_id(block_id); + meta.set_store_path(file_path); + Block* block = new Block(meta, &disk, &file_cache); + block->AddRef(); + ASSERT_TRUE(block != NULL); + ASSERT_EQ(block->recv_window_->GetBaseOffset(), 0); + block->SeekReceiveWindow(50); + ASSERT_EQ(block->recv_window_->GetBaseOffset(), 50); + block->SeekReceiveWindow(200); + ASSERT_EQ(block->recv_window_->GetBaseOffset(), 200); + block->DecRef(); + system("rm -rf ./block123"); +} } } From afb614c6273e1e8da2bf2a347fc19bed45506a07 Mon Sep 17 00:00:00 2001 From: yangce Date: Mon, 6 Feb 2017 10:56:36 +0800 Subject: [PATCH 08/15] Set block size in PrepareForWrite (#820) --- src/chunkserver/chunkserver_impl.cc | 2 ++ src/chunkserver/data_block.cc | 3 +++ src/chunkserver/data_block.h | 1 + src/proto/chunkserver.proto | 1 + 4 files changed, 7 insertions(+) diff --git a/src/chunkserver/chunkserver_impl.cc b/src/chunkserver/chunkserver_impl.cc index 7c63a060..65d9f826 100644 --- a/src/chunkserver/chunkserver_impl.cc +++ b/src/chunkserver/chunkserver_impl.cc @@ -865,6 +865,7 @@ void ChunkServerImpl::PrepareForWrite(::google::protobuf::RpcController* control response->set_sequence_id(request->sequence_id()); int64_t block_id = request->block_id(); int32_t seq = request->sliding_window_start_seq(); + int64_t block_size = request->block_size(); LOG(INFO, "Prepare write for block #%ld from seq %d", block_id, seq); StatusCode s; @@ -874,6 +875,7 @@ void ChunkServerImpl::PrepareForWrite(::google::protobuf::RpcController* control block_id, StatusCode_Name(s).c_str()); } else { block->SeekReceiveWindow(seq); + block->SetSize(block_size); } response->set_status(s); done->Run(); diff --git a/src/chunkserver/data_block.cc b/src/chunkserver/data_block.cc index f1fd4e5d..3a2a7231 100644 --- a/src/chunkserver/data_block.cc +++ b/src/chunkserver/data_block.cc @@ -159,6 +159,9 @@ void Block::SetVersion(int64_t version) { int Block::GetVersion() const { return meta_.version(); } +void Block::SetSize(int64_t size) { + meta_.set_block_size(size); +} int32_t Block::GetLastSeq() const { return last_seq_; } diff --git a/src/chunkserver/data_block.h b/src/chunkserver/data_block.h index a6920fe5..9be62583 100644 --- a/src/chunkserver/data_block.h +++ b/src/chunkserver/data_block.h @@ -51,6 +51,7 @@ class Block { StatusCode SetDeleted(); void SetVersion(int64_t version); int GetVersion() const; + void SetSize(int64_t size); int32_t GetLastSeq() const; /// Set expected slice num, for IsComplete. void SetSliceNum(int32_t num); diff --git a/src/proto/chunkserver.proto b/src/proto/chunkserver.proto index 32970db9..44d94d2d 100644 --- a/src/proto/chunkserver.proto +++ b/src/proto/chunkserver.proto @@ -56,6 +56,7 @@ message PrepareForWriteRequest { optional int64 sequence_id = 1; optional int64 block_id = 2; optional int32 sliding_window_start_seq = 3; + optional int64 block_size = 4; } message PrepareForWriteResponse { optional int64 sequence_id = 1; From 676de0f8462719ae05129aed96d94675ba6b0071 Mon Sep 17 00:00:00 2001 From: yangce Date: Mon, 6 Feb 2017 13:59:01 +0800 Subject: [PATCH 09/15] Fix Block::PrepareForWrite (#820) --- src/chunkserver/chunkserver_impl.cc | 6 +++--- src/chunkserver/data_block.cc | 3 ++- src/chunkserver/data_block.h | 2 +- src/chunkserver/test/data_block_test.cc | 11 +++++++---- 4 files changed, 13 insertions(+), 9 deletions(-) diff --git a/src/chunkserver/chunkserver_impl.cc b/src/chunkserver/chunkserver_impl.cc index 65d9f826..e8974236 100644 --- a/src/chunkserver/chunkserver_impl.cc +++ b/src/chunkserver/chunkserver_impl.cc @@ -867,15 +867,15 @@ void ChunkServerImpl::PrepareForWrite(::google::protobuf::RpcController* control int32_t seq = request->sliding_window_start_seq(); int64_t block_size = request->block_size(); - LOG(INFO, "Prepare write for block #%ld from seq %d", block_id, seq); + LOG(INFO, "Prepare write for block #%ld from seq %d, size %ld", + block_id, seq, block_size); StatusCode s; Block* block = block_manager_->CreateBlock(block_id, &s); if (s != kOK) { LOG(INFO, "[PrepareForWrite] block #%ld created failed, reason %s", block_id, StatusCode_Name(s).c_str()); } else { - block->SeekReceiveWindow(seq); - block->SetSize(block_size); + block->PrepareForWrite(seq, block_size); } response->set_status(s); done->Run(); diff --git a/src/chunkserver/data_block.cc b/src/chunkserver/data_block.cc index 3a2a7231..97e02c05 100644 --- a/src/chunkserver/data_block.cc +++ b/src/chunkserver/data_block.cc @@ -424,8 +424,9 @@ int64_t Block::GetExpectedSize() const { void Block::SetExpectedSize(int64_t expected_size) { expected_size_ = expected_size; } -void Block::SeekReceiveWindow(int32_t seq) { +void Block::PrepareForWrite(int32_t seq, int64_t size) { recv_window_->SeekToOffset(seq); + meta_.set_block_size(size); } /// Append to block buffer StatusCode Block::Append(int32_t seq, const char* buf, int64_t len) { diff --git a/src/chunkserver/data_block.h b/src/chunkserver/data_block.h index 9be62583..d9b4c107 100644 --- a/src/chunkserver/data_block.h +++ b/src/chunkserver/data_block.h @@ -70,9 +70,9 @@ class Block { bool IsRecover() const; int64_t GetExpectedSize() const; void SetExpectedSize(int64_t expected_size); - void SeekReceiveWindow(int32_t seq); /// Flush block to disk. bool Close(bool sync); + void PrepareForWrite(int32_t seq, int64_t size); void AddRef(); void DecRef(); int GetRef() const; diff --git a/src/chunkserver/test/data_block_test.cc b/src/chunkserver/test/data_block_test.cc index 5f30b6b3..e84db00f 100644 --- a/src/chunkserver/test/data_block_test.cc +++ b/src/chunkserver/test/data_block_test.cc @@ -103,7 +103,7 @@ TEST_F(DataBlockTest, WriteAndReadBlock) { system("rm -rf ./block123"); } -TEST_F(DataBlockTest, SeekReceiveWindow) { +TEST_F(DataBlockTest, PrepareForWrite) { mkdir("./block123", 0755); std::string file_path("./block123"); Disk disk(file_path, 1000000); @@ -118,10 +118,13 @@ TEST_F(DataBlockTest, SeekReceiveWindow) { block->AddRef(); ASSERT_TRUE(block != NULL); ASSERT_EQ(block->recv_window_->GetBaseOffset(), 0); - block->SeekReceiveWindow(50); + ASSERT_EQ(block->meta_.block_size(), 0); + block->PrepareForWrite(50, 1000); ASSERT_EQ(block->recv_window_->GetBaseOffset(), 50); - block->SeekReceiveWindow(200); - ASSERT_EQ(block->recv_window_->GetBaseOffset(), 200); + ASSERT_EQ(block->meta_.block_size(), 1000); + block->PrepareForWrite(300, 4000); + ASSERT_EQ(block->recv_window_->GetBaseOffset(), 300); + ASSERT_EQ(block->meta_.block_size(), 4000); block->DecRef(); system("rm -rf ./block123"); } From e608233e64ec59e78288ef76da6d28d151a944a8 Mon Sep 17 00:00:00 2001 From: yangce Date: Tue, 7 Feb 2017 10:34:22 +0800 Subject: [PATCH 10/15] Remove Block::SetSize (#820) --- src/chunkserver/data_block.cc | 3 --- src/chunkserver/data_block.h | 1 - 2 files changed, 4 deletions(-) diff --git a/src/chunkserver/data_block.cc b/src/chunkserver/data_block.cc index 97e02c05..b373a930 100644 --- a/src/chunkserver/data_block.cc +++ b/src/chunkserver/data_block.cc @@ -159,9 +159,6 @@ void Block::SetVersion(int64_t version) { int Block::GetVersion() const { return meta_.version(); } -void Block::SetSize(int64_t size) { - meta_.set_block_size(size); -} int32_t Block::GetLastSeq() const { return last_seq_; } diff --git a/src/chunkserver/data_block.h b/src/chunkserver/data_block.h index d9b4c107..01c96b53 100644 --- a/src/chunkserver/data_block.h +++ b/src/chunkserver/data_block.h @@ -51,7 +51,6 @@ class Block { StatusCode SetDeleted(); void SetVersion(int64_t version); int GetVersion() const; - void SetSize(int64_t size); int32_t GetLastSeq() const; /// Set expected slice num, for IsComplete. void SetSliceNum(int32_t num); From 44ddce438433b4ad0e340eac35825b232bf66193 Mon Sep 17 00:00:00 2001 From: yangce Date: Tue, 7 Feb 2017 11:17:32 +0800 Subject: [PATCH 11/15] Add AddRecoverBlock (#820) --- src/nameserver/block_mapping.cc | 17 +++++++++++++++++ src/nameserver/block_mapping.h | 10 ++++++++++ src/nameserver/block_mapping_manager.cc | 8 ++++++++ src/nameserver/block_mapping_manager.h | 2 ++ src/nameserver/nameserver_impl.cc | 14 +++++++++++++- src/nameserver/test/block_mapping_test.cc | 17 ++++++++++++++++- 6 files changed, 66 insertions(+), 2 deletions(-) diff --git a/src/nameserver/block_mapping.cc b/src/nameserver/block_mapping.cc index c7803ca4..7eab3fbe 100644 --- a/src/nameserver/block_mapping.cc +++ b/src/nameserver/block_mapping.cc @@ -975,5 +975,22 @@ void BlockMapping::MarkIncomplete(int64_t block_id) { } } +void BlockMapping::AddRecoverBlock(int64_t block_id, int32_t cs_id, + int64_t start_offset, int64_t end_offset) { + RecoverInfo* recover_info = new RecoverInfo(cs_id, start_offset, end_offset); + MutexLock lock(&mu_); + auto it = recover_writing_blocks_.find(block_id); + if (it != recover_writing_blocks_.end()) { + RecoverInfo* info = it->second; + LOG(WARNING, "Block #%ld recover to C%d from offset %ld to %ld ", + "already in recover map", + it->first, info->cs_id, info->start_offset, info->end_offset); + delete info; + it->second = recover_info; + } else { + recover_writing_blocks_[block_id] = recover_info; + } +} + } // namespace bfs } // namespace baidu diff --git a/src/nameserver/block_mapping.h b/src/nameserver/block_mapping.h index b18a1de8..0880a699 100644 --- a/src/nameserver/block_mapping.h +++ b/src/nameserver/block_mapping.h @@ -81,6 +81,8 @@ class BlockMapping { void ListRecover(RecoverBlockSet* blocks); int32_t GetCheckNum(); void MarkIncomplete(int64_t block_id); + void AddRecoverBlock(int64_t block_id, int32_t cs_id, + int64_t start_offset, int64_t end_offset); private: void DealWithDeadBlockInternal(int32_t cs_id, int64_t block_id); typedef std::map > CheckList; @@ -113,6 +115,14 @@ class BlockMapping { std::set lo_pri_recover_; std::set hi_pri_recover_; std::set lost_blocks_; + struct RecoverInfo { + RecoverInfo(int32_t c_id, int64_t s_offset, int64_t e_offset) : + cs_id(c_id), start_offset(s_offset), end_offset(e_offset) { } + int32_t cs_id; + int64_t start_offset; + int64_t end_offset; + }; + std::map recover_writing_blocks_; }; } // namespace bfs diff --git a/src/nameserver/block_mapping_manager.cc b/src/nameserver/block_mapping_manager.cc index 032685e5..e838b454 100644 --- a/src/nameserver/block_mapping_manager.cc +++ b/src/nameserver/block_mapping_manager.cc @@ -159,5 +159,13 @@ void BlockMappingManager::MarkIncomplete(int64_t block_id) { block_mapping_[bucket_offset]->MarkIncomplete(block_id); } +void BlockMappingManager::AddRecoverBlock(int64_t block_id, int32_t cs_id, + int64_t start_offset, + int64_t end_offset) { + int32_t bucket_offset = GetBucketOffset(block_id); + block_mapping_[bucket_offset]->AddRecoverBlock(block_id, cs_id, + start_offset, end_offset); +} + } //namespace bfs } //namespace baidu diff --git a/src/nameserver/block_mapping_manager.h b/src/nameserver/block_mapping_manager.h index ec86bd4e..4ec7748b 100644 --- a/src/nameserver/block_mapping_manager.h +++ b/src/nameserver/block_mapping_manager.h @@ -40,6 +40,8 @@ public : void GetRecoverNum(int32_t bucket_id, RecoverBlockNum* recover_num); void ListRecover(RecoverBlockSet* recover_blocks); void MarkIncomplete(int64_t block_id); + void AddRecoverBlock(int64_t block_id, int32_t cs_id, int64_t start_offset, + int64_t end_offset); private: int32_t GetBucketOffset(int64_t block_id); private: diff --git a/src/nameserver/nameserver_impl.cc b/src/nameserver/nameserver_impl.cc index 2ac809a1..92677983 100644 --- a/src/nameserver/nameserver_impl.cc +++ b/src/nameserver/nameserver_impl.cc @@ -1055,7 +1055,19 @@ void NameServerImpl::StartRecoverBlock(::google::protobuf::RpcController* contro const StartRecoverBlockRequest* request, StartRecoverBlockResponse* response, ::google::protobuf::Closure* done) { - + if (!is_leader_) { + response->set_status(kIsFollower); + done->Run(); + return; + } + int64_t block_id = request->block_id(); + int32_t cs_id = request->chunkserver_id(); + int64_t start_offset = request->start_offset(); + int64_t end_offset = request->end_offset(); + LOG(INFO, "Start recover block #%ld to cs C%d, from offset %ld to %ld", + block_id, cs_id, start_offset, end_offset); + block_mapping_manager_->AddRecoverBlock(block_id, cs_id, + start_offset, end_offset); } void NameServerImpl::TransToString(const std::map >& chk_set, diff --git a/src/nameserver/test/block_mapping_test.cc b/src/nameserver/test/block_mapping_test.cc index d2deda8f..95d24378 100644 --- a/src/nameserver/test/block_mapping_test.cc +++ b/src/nameserver/test/block_mapping_test.cc @@ -58,7 +58,7 @@ TEST_F(BlockMappingTest, Basic) { } } -TEST_F(BlockMappingTest, DoNotRemoteHigherVersionBlock) { +TEST_F(BlockMappingTest, DoNotRemoveHigherVersionBlock) { int64_t block_id = 1; int64_t block_version = 2; int64_t block_size = 3; @@ -126,6 +126,21 @@ TEST_F(BlockMappingTest, NotRecoverEmptyBlock) { ASSERT_TRUE(bm->lost_blocks_.empty()); } +TEST_F(BlockMappingTest, AddRecoverBlock) { + BlockMapping* bm = new BlockMapping(&thread_pool); + int64_t block_id = 1; + int32_t cs_id = 2; + int64_t start_offset = 300; + int64_t end_offset = 400; + bm->AddRecoverBlock(block_id, cs_id, start_offset, end_offset); + auto it = bm->recover_writing_blocks_.find(block_id); + ASSERT_TRUE(it != bm->recover_writing_blocks_.end()); + BlockMapping::RecoverInfo* info = it->second; + ASSERT_EQ(info->cs_id, cs_id); + ASSERT_EQ(info->start_offset, start_offset); + ASSERT_EQ(info->end_offset, end_offset); +} + } // namespace bfs } // namespace baidu From 6f3a2898093cc6b313e414370994817ee0a22b0b Mon Sep 17 00:00:00 2001 From: yangce Date: Tue, 7 Feb 2017 11:36:21 +0800 Subject: [PATCH 12/15] Fix AddRecoverBlock (#820) --- src/nameserver/block_mapping.cc | 9 +++++++-- src/nameserver/block_mapping.h | 7 ------- src/nameserver/test/block_mapping_test.cc | 9 +++++---- src/proto/nameserver.proto | 7 +++++++ 4 files changed, 19 insertions(+), 13 deletions(-) diff --git a/src/nameserver/block_mapping.cc b/src/nameserver/block_mapping.cc index 7eab3fbe..2fedba1b 100644 --- a/src/nameserver/block_mapping.cc +++ b/src/nameserver/block_mapping.cc @@ -977,14 +977,19 @@ void BlockMapping::MarkIncomplete(int64_t block_id) { void BlockMapping::AddRecoverBlock(int64_t block_id, int32_t cs_id, int64_t start_offset, int64_t end_offset) { - RecoverInfo* recover_info = new RecoverInfo(cs_id, start_offset, end_offset); + RecoverInfo* recover_info = new RecoverInfo; + recover_info->set_block_id(block_id); + recover_info->set_cs_id(cs_id); + recover_info->set_start_offset(start_offset); + recover_info->set_end_offset(end_offset); MutexLock lock(&mu_); auto it = recover_writing_blocks_.find(block_id); if (it != recover_writing_blocks_.end()) { RecoverInfo* info = it->second; LOG(WARNING, "Block #%ld recover to C%d from offset %ld to %ld ", "already in recover map", - it->first, info->cs_id, info->start_offset, info->end_offset); + info->block_id(), info->cs_id(), info->start_offset(), + info->end_offset()); delete info; it->second = recover_info; } else { diff --git a/src/nameserver/block_mapping.h b/src/nameserver/block_mapping.h index 0880a699..d75b27ef 100644 --- a/src/nameserver/block_mapping.h +++ b/src/nameserver/block_mapping.h @@ -115,13 +115,6 @@ class BlockMapping { std::set lo_pri_recover_; std::set hi_pri_recover_; std::set lost_blocks_; - struct RecoverInfo { - RecoverInfo(int32_t c_id, int64_t s_offset, int64_t e_offset) : - cs_id(c_id), start_offset(s_offset), end_offset(e_offset) { } - int32_t cs_id; - int64_t start_offset; - int64_t end_offset; - }; std::map recover_writing_blocks_; }; diff --git a/src/nameserver/test/block_mapping_test.cc b/src/nameserver/test/block_mapping_test.cc index 95d24378..5eddf0bd 100644 --- a/src/nameserver/test/block_mapping_test.cc +++ b/src/nameserver/test/block_mapping_test.cc @@ -135,10 +135,11 @@ TEST_F(BlockMappingTest, AddRecoverBlock) { bm->AddRecoverBlock(block_id, cs_id, start_offset, end_offset); auto it = bm->recover_writing_blocks_.find(block_id); ASSERT_TRUE(it != bm->recover_writing_blocks_.end()); - BlockMapping::RecoverInfo* info = it->second; - ASSERT_EQ(info->cs_id, cs_id); - ASSERT_EQ(info->start_offset, start_offset); - ASSERT_EQ(info->end_offset, end_offset); + RecoverInfo* info = it->second; + ASSERT_EQ(info->block_id(), block_id); + ASSERT_EQ(info->cs_id(), cs_id); + ASSERT_EQ(info->start_offset(), start_offset); + ASSERT_EQ(info->end_offset(), end_offset); } } // namespace bfs diff --git a/src/proto/nameserver.proto b/src/proto/nameserver.proto index b9454726..07559da6 100644 --- a/src/proto/nameserver.proto +++ b/src/proto/nameserver.proto @@ -363,6 +363,13 @@ message StartRecoverBlockResponse { optional StatusCode status = 2; } +message RecoverInfo { + optional int64 block_id = 1; + optional int32 cs_id = 2; + optional int64 start_offset = 3; + optional int64 end_offset = 4; +} + service NameServer { rpc CreateFile(CreateFileRequest) returns(CreateFileResponse); rpc AddBlock(AddBlockRequest) returns(AddBlockResponse); From a985be59cb04489e168b0af3613b01c646b326d0 Mon Sep 17 00:00:00 2001 From: yangce Date: Tue, 7 Feb 2017 15:26:59 +0800 Subject: [PATCH 13/15] Add PickRecoverWritingBlock (#820) --- Makefile | 8 ++++-- src/nameserver/block_mapping.cc | 24 +++++++++++++++++ src/nameserver/block_mapping.h | 4 +++ src/nameserver/block_mapping_manager.cc | 10 +++++++ src/nameserver/block_mapping_manager.h | 4 +++ src/nameserver/chunkserver_manager.cc | 32 +++++++++++++++++++++++ src/nameserver/chunkserver_manager.h | 7 ++++- src/nameserver/nameserver_impl.cc | 5 ++++ src/nameserver/test/block_mapping_test.cc | 29 ++++++++++++++++++++ src/proto/nameserver.proto | 1 + 10 files changed, 121 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index 09914679..cdf62998 100644 --- a/Makefile +++ b/Makefile @@ -126,9 +126,13 @@ nameserver_test: src/nameserver/test/nameserver_impl_test.o \ src/nameserver/namespace.o src/nameserver/raft_impl.o \ src/nameserver/raft_node.o $(OBJS) -o $@ $(LDFLAGS) -block_mapping_test: src/nameserver/test/block_mapping_test.o src/nameserver/block_mapping.o +block_mapping_test: src/nameserver/test/block_mapping_test.o \ + src/nameserver/block_mapping.o src/nameserver/chunkserver_manager.o \ + src/nameserver/location_provider.o $(CXX) src/nameserver/block_mapping.o src/nameserver/test/block_mapping_test.o \ - src/nameserver/block_mapping_manager.o $(OBJS) -o $@ $(LDFLAGS) + src/nameserver/block_mapping_manager.o src/nameserver/chunkserver_manager.o \ + src/nameserver/location_provider.o \ + $(OBJS) -o $@ $(LDFLAGS) logdb_test: src/nameserver/test/logdb_test.o src/nameserver/logdb.o $(CXX) src/nameserver/logdb.o src/nameserver/test/logdb_test.o $(OBJS) -o $@ $(LDFLAGS) diff --git a/src/nameserver/block_mapping.cc b/src/nameserver/block_mapping.cc index 2fedba1b..156d3845 100644 --- a/src/nameserver/block_mapping.cc +++ b/src/nameserver/block_mapping.cc @@ -3,6 +3,7 @@ // found in the LICENSE file. #include "block_mapping.h" +#include "chunkserver_manager.h" #include #include @@ -997,5 +998,28 @@ void BlockMapping::AddRecoverBlock(int64_t block_id, int32_t cs_id, } } +void BlockMapping::PickRecoverWritingBlocks(Blocks* cs_block_map, + ::google::protobuf::RepeatedPtrField* recover_blocks) { + int32_t cs_id = cs_block_map->GetChunkServerId(); + MutexLock lock(&mu_); + for (auto it = recover_writing_blocks_.begin(); + it != recover_writing_blocks_.end(); ++it) { + int64_t block_id = it->first; + if (!cs_block_map->BlockExists(block_id)) { + continue; + } + RecoverInfo* info = it->second; + if (info->cs_id() == cs_id) { + continue; + } + RecoverInfo* block = recover_blocks->Add(); + block->CopyFrom(*info); + LOG(INFO, "Pick writing block #%ld from C%ld recover to C%ld", + "start offset %ld end offset %ld", + info->block_id(), cs_id, info->cs_id(), + info->start_offset(), info->end_offset()); + } +} + } // namespace bfs } // namespace baidu diff --git a/src/nameserver/block_mapping.h b/src/nameserver/block_mapping.h index d75b27ef..4549ae04 100644 --- a/src/nameserver/block_mapping.h +++ b/src/nameserver/block_mapping.h @@ -55,6 +55,8 @@ struct RecoverBlockSet { std::map > incomplete; }; +class Blocks; + class BlockMapping { public: BlockMapping(ThreadPool* thread_pool); @@ -83,6 +85,8 @@ class BlockMapping { void MarkIncomplete(int64_t block_id); void AddRecoverBlock(int64_t block_id, int32_t cs_id, int64_t start_offset, int64_t end_offset); + void PickRecoverWritingBlocks(Blocks* cs_block_map, + ::google::protobuf::RepeatedPtrField* recover_blocks); private: void DealWithDeadBlockInternal(int32_t cs_id, int64_t block_id); typedef std::map > CheckList; diff --git a/src/nameserver/block_mapping_manager.cc b/src/nameserver/block_mapping_manager.cc index e838b454..25564d97 100644 --- a/src/nameserver/block_mapping_manager.cc +++ b/src/nameserver/block_mapping_manager.cc @@ -167,5 +167,15 @@ void BlockMappingManager::AddRecoverBlock(int64_t block_id, int32_t cs_id, start_offset, end_offset); } +void BlockMappingManager::PickRecoverWritingBlocks(Blocks* cs_block_map, + ::google::protobuf::RepeatedPtrField* recover_blocks) { + //TODO use flag + for (int i = 0; i < blockmapping_bucket_num_ && + recover_blocks->size() < 100; i++) { + block_mapping_[i]->PickRecoverWritingBlocks(cs_block_map, + recover_blocks); + } +} + } //namespace bfs } //namespace baidu diff --git a/src/nameserver/block_mapping_manager.h b/src/nameserver/block_mapping_manager.h index 4ec7748b..75c95649 100644 --- a/src/nameserver/block_mapping_manager.h +++ b/src/nameserver/block_mapping_manager.h @@ -13,6 +13,8 @@ namespace baidu { namespace bfs { +class Blocks; + class BlockMappingManager { public : BlockMappingManager(int32_t bucket_num); @@ -42,6 +44,8 @@ public : void MarkIncomplete(int64_t block_id); void AddRecoverBlock(int64_t block_id, int32_t cs_id, int64_t start_offset, int64_t end_offset); + void PickRecoverWritingBlocks(Blocks* cs_block_map, + ::google::protobuf::RepeatedPtrField* recover_blocks); private: int32_t GetBucketOffset(int64_t block_id); private: diff --git a/src/nameserver/chunkserver_manager.cc b/src/nameserver/chunkserver_manager.cc index 9d758358..42eaf588 100644 --- a/src/nameserver/chunkserver_manager.cc +++ b/src/nameserver/chunkserver_manager.cc @@ -103,6 +103,28 @@ int64_t Blocks::CheckLost(int64_t report_id, const std::set& blocks, return report_id; } +bool Blocks::BlockExists(int64_t block_id) { + // blocks in new_blocks_ maybe merged into blocks_, + // so we must look up new_blocks_ first + { + MutexLock lock(&new_blocks_mu_); + if (new_blocks_.find(block_id) != new_blocks_.end()) { + return true; + } + } + { + MutexLock lock(&block_mu_); + if (blocks_.find(block_id) != blocks_.end()) { + return true; + } + } + return false; +} + +int32_t Blocks::GetChunkServerId() const { + return cs_id_; +} + ChunkServerManager::ChunkServerManager(ThreadPool* thread_pool, BlockMappingManager* block_mapping_manager) : thread_pool_(thread_pool), block_mapping_manager_(block_mapping_manager), @@ -699,6 +721,16 @@ void ChunkServerManager::PickRecoverBlocks(int cs_id, RecoverVec* recover_blocks } } +void ChunkServerManager::PickRecoverWritingBlocks(int32_t cs_id, + ::google::protobuf::RepeatedPtrField* recover_block_info) { + Blocks* block_map = GetBlockMap(cs_id); + if (!block_map) { + LOG(FATAL, "Get block map for C%d fail", cs_id); + } + block_mapping_manager_->PickRecoverWritingBlocks(block_map, + recover_block_info); +} + void ChunkServerManager::GetStat(int32_t* w_qps, int64_t* w_speed, int32_t* r_qps, int64_t* r_speed, int64_t* recover_speed) { if (w_qps) *w_qps = stats_.w_qps; diff --git a/src/nameserver/chunkserver_manager.h b/src/nameserver/chunkserver_manager.h index 754d3ec8..e2dfa2a4 100644 --- a/src/nameserver/chunkserver_manager.h +++ b/src/nameserver/chunkserver_manager.h @@ -29,6 +29,8 @@ class Blocks { void MoveNew(); int64_t CheckLost(int64_t report_id, const std::set& blocks, int64_t start, int64_t end, std::vector* lost); + bool BlockExists(int64_t block_id); + int32_t GetChunkServerId() const; private: Mutex block_mu_; std::set blocks_; @@ -67,7 +69,10 @@ class ChunkServerManager { void AddBlock(int32_t id, int64_t block_id); void RemoveBlock(int32_t id, int64_t block_id); void CleanChunkServer(ChunkServerInfo* cs, const std::string& reason); - void PickRecoverBlocks(int cs_id, RecoverVec* recover_blocks, int* hi_num, bool hi_only); + void PickRecoverBlocks(int32_t cs_id, RecoverVec* recover_blocks, + int32_t* hi_num, bool hi_only); + void PickRecoverWritingBlocks(int32_t cs_id, + ::google::protobuf::RepeatedPtrField* recover_block_info); void GetStat(int32_t* w_qps, int64_t* w_speed, int32_t* r_qps, int64_t* r_speed, int64_t* recover_speed); StatusCode ShutdownChunkServer(const::google::protobuf::RepeatedPtrField& chunkserver_address); diff --git a/src/nameserver/nameserver_impl.cc b/src/nameserver/nameserver_impl.cc index 92677983..c12abd84 100644 --- a/src/nameserver/nameserver_impl.cc +++ b/src/nameserver/nameserver_impl.cc @@ -318,6 +318,11 @@ void NameServerImpl::BlockReport(::google::protobuf::RpcController* controller, } LOG(INFO, "Response to C%d %s new_replicas_size= %d", cs_id, request->chunkserver_addr().c_str(), response->new_replicas_size()); + + ::google::protobuf::RepeatedPtrField* + recover_writing_blocks = response->mutable_recover_writing_blocks(); + chunkserver_manager_->PickRecoverWritingBlocks(cs_id, + recover_writing_blocks); } block_mapping_manager_->GetCloseBlocks(cs_id, response->mutable_close_blocks()); int64_t end_report = common::timer::get_micros(); diff --git a/src/nameserver/test/block_mapping_test.cc b/src/nameserver/test/block_mapping_test.cc index 5eddf0bd..ae2a379e 100644 --- a/src/nameserver/test/block_mapping_test.cc +++ b/src/nameserver/test/block_mapping_test.cc @@ -5,7 +5,9 @@ #define private public #include "nameserver/block_mapping.h" +#include "nameserver/chunkserver_manager.h" #include "proto/status_code.pb.h" +#include "proto/nameserver.pb.h" #include @@ -142,6 +144,33 @@ TEST_F(BlockMappingTest, AddRecoverBlock) { ASSERT_EQ(info->end_offset(), end_offset); } +TEST_F(BlockMappingTest, PickRecoverWritingBlocks) { + BlockMapping* bm = new BlockMapping(&thread_pool); + int64_t block_id = 1; + int32_t cs_id1 = 1; + int64_t start_offset = 300; + int64_t end_offset = 400; + bm->AddRecoverBlock(block_id, cs_id1, start_offset, end_offset); + Blocks* blocks1 = new Blocks(cs_id1); + blocks1->Insert(block_id); + BlockReportResponse response; + ::google::protobuf::RepeatedPtrField* result = + response.mutable_recover_writing_blocks(); + bm->PickRecoverWritingBlocks(blocks1, result); + ASSERT_EQ(result->size(), 0); + result->Clear(); + int32_t cs_id2 = 2; + Blocks* blocks2 = new Blocks(cs_id2); + blocks2->Insert(block_id); + bm->PickRecoverWritingBlocks(blocks2, result); + ASSERT_EQ(result->size(), 1); + result->Clear(); + int32_t cs_id3 = 3; + Blocks* blocks3 = new Blocks(cs_id3); + bm->PickRecoverWritingBlocks(blocks3, result); + ASSERT_EQ(result->size(), 0); +} + } // namespace bfs } // namespace baidu diff --git a/src/proto/nameserver.proto b/src/proto/nameserver.proto index 07559da6..2b528a0d 100644 --- a/src/proto/nameserver.proto +++ b/src/proto/nameserver.proto @@ -238,6 +238,7 @@ message BlockReportResponse { repeated int64 close_blocks = 4; repeated ReplicaInfo new_replicas = 5; optional int64 report_id = 6 [default = -1]; + repeated RecoverInfo recover_writing_blocks = 7; } message BlockReceivedRequest { From 78abd0fc842cea3fc67f4396bcad9de3b665c7cc Mon Sep 17 00:00:00 2001 From: yangce Date: Tue, 7 Feb 2017 15:57:51 +0800 Subject: [PATCH 14/15] Return cs_addr in PickRecoverWritingBlocks (#820) --- src/nameserver/chunkserver_manager.cc | 5 +++++ src/nameserver/nameserver_impl.cc | 3 ++- src/nameserver/test/block_mapping_test.cc | 5 +++++ src/proto/nameserver.proto | 12 +++++++----- 4 files changed, 19 insertions(+), 6 deletions(-) diff --git a/src/nameserver/chunkserver_manager.cc b/src/nameserver/chunkserver_manager.cc index 42eaf588..c1be52b2 100644 --- a/src/nameserver/chunkserver_manager.cc +++ b/src/nameserver/chunkserver_manager.cc @@ -729,6 +729,11 @@ void ChunkServerManager::PickRecoverWritingBlocks(int32_t cs_id, } block_mapping_manager_->PickRecoverWritingBlocks(block_map, recover_block_info); + // covert cs_id to cs_addr + for (int i = 0; i < recover_block_info->size(); i++) { + RecoverInfo* info = recover_block_info->Mutable(i); + info->set_cs_addr(GetChunkServerAddr(info->cs_id())); + } } void ChunkServerManager::GetStat(int32_t* w_qps, int64_t* w_speed, diff --git a/src/nameserver/nameserver_impl.cc b/src/nameserver/nameserver_impl.cc index c12abd84..70e21e8f 100644 --- a/src/nameserver/nameserver_impl.cc +++ b/src/nameserver/nameserver_impl.cc @@ -1066,9 +1066,10 @@ void NameServerImpl::StartRecoverBlock(::google::protobuf::RpcController* contro return; } int64_t block_id = request->block_id(); - int32_t cs_id = request->chunkserver_id(); + const std::string& cs_addr = request->chunkserver_addr(); int64_t start_offset = request->start_offset(); int64_t end_offset = request->end_offset(); + int32_t cs_id = chunkserver_manager_->GetChunkServerId(cs_addr); LOG(INFO, "Start recover block #%ld to cs C%d, from offset %ld to %ld", block_id, cs_id, start_offset, end_offset); block_mapping_manager_->AddRecoverBlock(block_id, cs_id, diff --git a/src/nameserver/test/block_mapping_test.cc b/src/nameserver/test/block_mapping_test.cc index ae2a379e..6e93c913 100644 --- a/src/nameserver/test/block_mapping_test.cc +++ b/src/nameserver/test/block_mapping_test.cc @@ -164,6 +164,11 @@ TEST_F(BlockMappingTest, PickRecoverWritingBlocks) { blocks2->Insert(block_id); bm->PickRecoverWritingBlocks(blocks2, result); ASSERT_EQ(result->size(), 1); + const RecoverInfo& info = result->Get(0); + ASSERT_EQ(info.block_id(), block_id); + ASSERT_EQ(info.cs_id(), cs_id1); + ASSERT_EQ(info.start_offset(), start_offset); + ASSERT_EQ(info.end_offset(), end_offset); result->Clear(); int32_t cs_id3 = 3; Blocks* blocks3 = new Blocks(cs_id3); diff --git a/src/proto/nameserver.proto b/src/proto/nameserver.proto index 2b528a0d..7226ac9d 100644 --- a/src/proto/nameserver.proto +++ b/src/proto/nameserver.proto @@ -354,9 +354,10 @@ message GetChunkServerResponse { message StartRecoverBlockRequest { optional int64 sequence_id = 1; optional int32 chunkserver_id = 2; - optional int64 block_id = 3; - optional int64 start_offset = 4; - optional int64 end_offset = 5; + optional string chunkserver_addr = 3; + optional int64 block_id = 4; + optional int64 start_offset = 5; + optional int64 end_offset = 6; } message StartRecoverBlockResponse { @@ -367,8 +368,9 @@ message StartRecoverBlockResponse { message RecoverInfo { optional int64 block_id = 1; optional int32 cs_id = 2; - optional int64 start_offset = 3; - optional int64 end_offset = 4; + optional string cs_addr = 3; + optional int64 start_offset = 4; + optional int64 end_offset = 5; } service NameServer { From 33d986c1c0076df90d5e9af608cadf6d90c45507 Mon Sep 17 00:00:00 2001 From: yangce Date: Tue, 7 Feb 2017 16:02:50 +0800 Subject: [PATCH 15/15] Revert "Disable close incomplete blocks" This reverts commit 9bdf559045fcc161af74cb993bbf2d1262cde66b. --- src/nameserver/block_mapping.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/nameserver/block_mapping.cc b/src/nameserver/block_mapping.cc index 156d3845..6f2180e4 100644 --- a/src/nameserver/block_mapping.cc +++ b/src/nameserver/block_mapping.cc @@ -753,7 +753,6 @@ void BlockMapping::ProcessRecoveredBlock(int32_t cs_id, int64_t block_id, Status void BlockMapping::GetCloseBlocks(int32_t cs_id, google::protobuf::RepeatedField* close_blocks) { - /* MutexLock lock(&mu_); CheckList::iterator c_it = incomplete_.find(cs_id); if (c_it != incomplete_.end()) { @@ -763,7 +762,6 @@ void BlockMapping::GetCloseBlocks(int32_t cs_id, close_blocks->Add(*it); } } - */ } void BlockMapping::GetStat(int32_t cs_id, RecoverBlockNum* recover_num) {