diff --git a/Makefile b/Makefile index 09914679..cdf62998 100644 --- a/Makefile +++ b/Makefile @@ -126,9 +126,13 @@ nameserver_test: src/nameserver/test/nameserver_impl_test.o \ src/nameserver/namespace.o src/nameserver/raft_impl.o \ src/nameserver/raft_node.o $(OBJS) -o $@ $(LDFLAGS) -block_mapping_test: src/nameserver/test/block_mapping_test.o src/nameserver/block_mapping.o +block_mapping_test: src/nameserver/test/block_mapping_test.o \ + src/nameserver/block_mapping.o src/nameserver/chunkserver_manager.o \ + src/nameserver/location_provider.o $(CXX) src/nameserver/block_mapping.o src/nameserver/test/block_mapping_test.o \ - src/nameserver/block_mapping_manager.o $(OBJS) -o $@ $(LDFLAGS) + src/nameserver/block_mapping_manager.o src/nameserver/chunkserver_manager.o \ + src/nameserver/location_provider.o \ + $(OBJS) -o $@ $(LDFLAGS) logdb_test: src/nameserver/test/logdb_test.o src/nameserver/logdb.o $(CXX) src/nameserver/logdb.o src/nameserver/test/logdb_test.o $(OBJS) -o $@ $(LDFLAGS) diff --git a/src/chunkserver/chunkserver_impl.cc b/src/chunkserver/chunkserver_impl.cc index c219a696..e8974236 100644 --- a/src/chunkserver/chunkserver_impl.cc +++ b/src/chunkserver/chunkserver_impl.cc @@ -858,6 +858,29 @@ void ChunkServerImpl::GetBlockInfo(::google::protobuf::RpcController* controller } +void ChunkServerImpl::PrepareForWrite(::google::protobuf::RpcController* controller, + const PrepareForWriteRequest* request, + PrepareForWriteResponse* response, + ::google::protobuf::Closure* done) { + response->set_sequence_id(request->sequence_id()); + int64_t block_id = request->block_id(); + int32_t seq = request->sliding_window_start_seq(); + int64_t block_size = request->block_size(); + + LOG(INFO, "Prepare write for block #%ld from seq %d, size %ld", + block_id, seq, block_size); + StatusCode s; + Block* block = block_manager_->CreateBlock(block_id, &s); + if (s != kOK) { + LOG(INFO, "[PrepareForWrite] block #%ld created failed, reason %s", + block_id, StatusCode_Name(s).c_str()); + } else { + block->PrepareForWrite(seq, block_size); + } + response->set_status(s); + done->Run(); +} + bool ChunkServerImpl::WebService(const sofa::pbrpc::HTTPRequest& request, sofa::pbrpc::HTTPResponse& response) { ChunkserverStat c_stat = counter_manager_.GetCounters(); diff --git a/src/chunkserver/chunkserver_impl.h b/src/chunkserver/chunkserver_impl.h index 011a9c4b..57387fee 100644 --- a/src/chunkserver/chunkserver_impl.h +++ b/src/chunkserver/chunkserver_impl.h @@ -49,6 +49,10 @@ class ChunkServerImpl : public ChunkServer { const GetBlockInfoRequest* request, GetBlockInfoResponse* response, ::google::protobuf::Closure* done); + virtual void PrepareForWrite(::google::protobuf::RpcController* controller, + const PrepareForWriteRequest* request, + PrepareForWriteResponse* response, + ::google::protobuf::Closure* done); bool WebService(const sofa::pbrpc::HTTPRequest& request, sofa::pbrpc::HTTPResponse& response); private: diff --git a/src/chunkserver/data_block.cc b/src/chunkserver/data_block.cc index 86d0c8cd..b373a930 100644 --- a/src/chunkserver/data_block.cc +++ b/src/chunkserver/data_block.cc @@ -421,6 +421,10 @@ int64_t Block::GetExpectedSize() const { void Block::SetExpectedSize(int64_t expected_size) { expected_size_ = expected_size; } +void Block::PrepareForWrite(int32_t seq, int64_t size) { + recv_window_->SeekToOffset(seq); + meta_.set_block_size(size); +} /// Append to block buffer StatusCode Block::Append(int32_t seq, const char* buf, int64_t len) { mu_.AssertHeld(); diff --git a/src/chunkserver/data_block.h b/src/chunkserver/data_block.h index 62541140..01c96b53 100644 --- a/src/chunkserver/data_block.h +++ b/src/chunkserver/data_block.h @@ -71,6 +71,7 @@ class Block { void SetExpectedSize(int64_t expected_size); /// Flush block to disk. bool Close(bool sync); + void PrepareForWrite(int32_t seq, int64_t size); void AddRef(); void DecRef(); int GetRef() const; diff --git a/src/chunkserver/test/data_block_test.cc b/src/chunkserver/test/data_block_test.cc index e84b4bd9..e84db00f 100644 --- a/src/chunkserver/test/data_block_test.cc +++ b/src/chunkserver/test/data_block_test.cc @@ -3,6 +3,7 @@ // found in the LICENSE file. // +#define private public #include "chunkserver/data_block.h" #include "chunkserver/file_cache.h" #include "chunkserver/disk.h" @@ -102,6 +103,31 @@ TEST_F(DataBlockTest, WriteAndReadBlock) { system("rm -rf ./block123"); } +TEST_F(DataBlockTest, PrepareForWrite) { + mkdir("./block123", 0755); + std::string file_path("./block123"); + Disk disk(file_path, 1000000); + disk.LoadStorage(std::bind(AddBlock, std::placeholders::_1, + std::placeholders::_2, std::placeholders::_3)); + BlockMeta meta; + FileCache file_cache(10); + int64_t block_id = 123; + meta.set_block_id(block_id); + meta.set_store_path(file_path); + Block* block = new Block(meta, &disk, &file_cache); + block->AddRef(); + ASSERT_TRUE(block != NULL); + ASSERT_EQ(block->recv_window_->GetBaseOffset(), 0); + ASSERT_EQ(block->meta_.block_size(), 0); + block->PrepareForWrite(50, 1000); + ASSERT_EQ(block->recv_window_->GetBaseOffset(), 50); + ASSERT_EQ(block->meta_.block_size(), 1000); + block->PrepareForWrite(300, 4000); + ASSERT_EQ(block->recv_window_->GetBaseOffset(), 300); + ASSERT_EQ(block->meta_.block_size(), 4000); + block->DecRef(); + system("rm -rf ./block123"); +} } } diff --git a/src/nameserver/block_mapping.cc b/src/nameserver/block_mapping.cc index 1cdb9e1b..6f2180e4 100644 --- a/src/nameserver/block_mapping.cc +++ b/src/nameserver/block_mapping.cc @@ -3,6 +3,7 @@ // found in the LICENSE file. #include "block_mapping.h" +#include "chunkserver_manager.h" #include #include @@ -67,8 +68,12 @@ bool BlockMapping::GetLocatedBlock(int64_t id, std::vector* replica, in if (replica->empty()) { LOG(DEBUG, "Block #%ld lost all replica", id); } - *size = block->block_size; - *status = block->recover_stat; + if (size) { + *size = block->block_size; + } + if (status) { + *status = block->recover_stat; + } return true; } @@ -969,5 +974,50 @@ void BlockMapping::MarkIncomplete(int64_t block_id) { } } +void BlockMapping::AddRecoverBlock(int64_t block_id, int32_t cs_id, + int64_t start_offset, int64_t end_offset) { + RecoverInfo* recover_info = new RecoverInfo; + recover_info->set_block_id(block_id); + recover_info->set_cs_id(cs_id); + recover_info->set_start_offset(start_offset); + recover_info->set_end_offset(end_offset); + MutexLock lock(&mu_); + auto it = recover_writing_blocks_.find(block_id); + if (it != recover_writing_blocks_.end()) { + RecoverInfo* info = it->second; + LOG(WARNING, "Block #%ld recover to C%d from offset %ld to %ld ", + "already in recover map", + info->block_id(), info->cs_id(), info->start_offset(), + info->end_offset()); + delete info; + it->second = recover_info; + } else { + recover_writing_blocks_[block_id] = recover_info; + } +} + +void BlockMapping::PickRecoverWritingBlocks(Blocks* cs_block_map, + ::google::protobuf::RepeatedPtrField* recover_blocks) { + int32_t cs_id = cs_block_map->GetChunkServerId(); + MutexLock lock(&mu_); + for (auto it = recover_writing_blocks_.begin(); + it != recover_writing_blocks_.end(); ++it) { + int64_t block_id = it->first; + if (!cs_block_map->BlockExists(block_id)) { + continue; + } + RecoverInfo* info = it->second; + if (info->cs_id() == cs_id) { + continue; + } + RecoverInfo* block = recover_blocks->Add(); + block->CopyFrom(*info); + LOG(INFO, "Pick writing block #%ld from C%ld recover to C%ld", + "start offset %ld end offset %ld", + info->block_id(), cs_id, info->cs_id(), + info->start_offset(), info->end_offset()); + } +} + } // namespace bfs } // namespace baidu diff --git a/src/nameserver/block_mapping.h b/src/nameserver/block_mapping.h index b18a1de8..4549ae04 100644 --- a/src/nameserver/block_mapping.h +++ b/src/nameserver/block_mapping.h @@ -55,6 +55,8 @@ struct RecoverBlockSet { std::map > incomplete; }; +class Blocks; + class BlockMapping { public: BlockMapping(ThreadPool* thread_pool); @@ -81,6 +83,10 @@ class BlockMapping { void ListRecover(RecoverBlockSet* blocks); int32_t GetCheckNum(); void MarkIncomplete(int64_t block_id); + void AddRecoverBlock(int64_t block_id, int32_t cs_id, + int64_t start_offset, int64_t end_offset); + void PickRecoverWritingBlocks(Blocks* cs_block_map, + ::google::protobuf::RepeatedPtrField* recover_blocks); private: void DealWithDeadBlockInternal(int32_t cs_id, int64_t block_id); typedef std::map > CheckList; @@ -113,6 +119,7 @@ class BlockMapping { std::set lo_pri_recover_; std::set hi_pri_recover_; std::set lost_blocks_; + std::map recover_writing_blocks_; }; } // namespace bfs diff --git a/src/nameserver/block_mapping_manager.cc b/src/nameserver/block_mapping_manager.cc index 032685e5..25564d97 100644 --- a/src/nameserver/block_mapping_manager.cc +++ b/src/nameserver/block_mapping_manager.cc @@ -159,5 +159,23 @@ void BlockMappingManager::MarkIncomplete(int64_t block_id) { block_mapping_[bucket_offset]->MarkIncomplete(block_id); } +void BlockMappingManager::AddRecoverBlock(int64_t block_id, int32_t cs_id, + int64_t start_offset, + int64_t end_offset) { + int32_t bucket_offset = GetBucketOffset(block_id); + block_mapping_[bucket_offset]->AddRecoverBlock(block_id, cs_id, + start_offset, end_offset); +} + +void BlockMappingManager::PickRecoverWritingBlocks(Blocks* cs_block_map, + ::google::protobuf::RepeatedPtrField* recover_blocks) { + //TODO use flag + for (int i = 0; i < blockmapping_bucket_num_ && + recover_blocks->size() < 100; i++) { + block_mapping_[i]->PickRecoverWritingBlocks(cs_block_map, + recover_blocks); + } +} + } //namespace bfs } //namespace baidu diff --git a/src/nameserver/block_mapping_manager.h b/src/nameserver/block_mapping_manager.h index ec86bd4e..75c95649 100644 --- a/src/nameserver/block_mapping_manager.h +++ b/src/nameserver/block_mapping_manager.h @@ -13,6 +13,8 @@ namespace baidu { namespace bfs { +class Blocks; + class BlockMappingManager { public : BlockMappingManager(int32_t bucket_num); @@ -40,6 +42,10 @@ public : void GetRecoverNum(int32_t bucket_id, RecoverBlockNum* recover_num); void ListRecover(RecoverBlockSet* recover_blocks); void MarkIncomplete(int64_t block_id); + void AddRecoverBlock(int64_t block_id, int32_t cs_id, int64_t start_offset, + int64_t end_offset); + void PickRecoverWritingBlocks(Blocks* cs_block_map, + ::google::protobuf::RepeatedPtrField* recover_blocks); private: int32_t GetBucketOffset(int64_t block_id); private: diff --git a/src/nameserver/chunkserver_manager.cc b/src/nameserver/chunkserver_manager.cc index c5533fa1..c1be52b2 100644 --- a/src/nameserver/chunkserver_manager.cc +++ b/src/nameserver/chunkserver_manager.cc @@ -103,6 +103,28 @@ int64_t Blocks::CheckLost(int64_t report_id, const std::set& blocks, return report_id; } +bool Blocks::BlockExists(int64_t block_id) { + // blocks in new_blocks_ maybe merged into blocks_, + // so we must look up new_blocks_ first + { + MutexLock lock(&new_blocks_mu_); + if (new_blocks_.find(block_id) != new_blocks_.end()) { + return true; + } + } + { + MutexLock lock(&block_mu_); + if (blocks_.find(block_id) != blocks_.end()) { + return true; + } + } + return false; +} + +int32_t Blocks::GetChunkServerId() const { + return cs_id_; +} + ChunkServerManager::ChunkServerManager(ThreadPool* thread_pool, BlockMappingManager* block_mapping_manager) : thread_pool_(thread_pool), block_mapping_manager_(block_mapping_manager), @@ -428,7 +450,8 @@ bool ChunkServerManager::GetChunkServerChains(int num, } bool ChunkServerManager::GetRecoverChains(const std::set& replica, - std::vector* chains) { + std::vector* chains, + int32_t select_num) { mu_.AssertHeld(); std::map >::iterator it = heartbeat_list_.begin(); std::vector > loads; @@ -490,8 +513,9 @@ bool ChunkServerManager::GetRecoverChains(const std::set& replica, return false; } } - RandomSelect(&loads, FLAGS_recover_dest_limit); - for (int i = 0; i < static_cast(loads.size()) && i < FLAGS_recover_dest_limit; ++i) { + RandomSelect(&loads, select_num); + for (int i = 0; i < static_cast(loads.size()) && + i < select_num; ++i) { ChunkServerInfo* cs = loads[i].second; chains->push_back(cs->address()); } @@ -674,7 +698,9 @@ void ChunkServerManager::PickRecoverBlocks(int cs_id, RecoverVec* recover_blocks it != blocks.end(); ++it) { MutexLock lock(&mu_); recover_blocks->push_back(std::make_pair((*it).first, std::vector())); - if (GetRecoverChains((*it).second, &(recover_blocks->back().second))) { + if (GetRecoverChains((*it).second, + &(recover_blocks->back().second), + FLAGS_recover_dest_limit)) { // } else { block_mapping_manager_->ProcessRecoveredBlock(cs_id, (*it).first, kGetChunkServerError); @@ -695,6 +721,21 @@ void ChunkServerManager::PickRecoverBlocks(int cs_id, RecoverVec* recover_blocks } } +void ChunkServerManager::PickRecoverWritingBlocks(int32_t cs_id, + ::google::protobuf::RepeatedPtrField* recover_block_info) { + Blocks* block_map = GetBlockMap(cs_id); + if (!block_map) { + LOG(FATAL, "Get block map for C%d fail", cs_id); + } + block_mapping_manager_->PickRecoverWritingBlocks(block_map, + recover_block_info); + // covert cs_id to cs_addr + for (int i = 0; i < recover_block_info->size(); i++) { + RecoverInfo* info = recover_block_info->Mutable(i); + info->set_cs_addr(GetChunkServerAddr(info->cs_id())); + } +} + void ChunkServerManager::GetStat(int32_t* w_qps, int64_t* w_speed, int32_t* r_qps, int64_t* r_speed, int64_t* recover_speed) { if (w_qps) *w_qps = stats_.w_qps; diff --git a/src/nameserver/chunkserver_manager.h b/src/nameserver/chunkserver_manager.h index db7e314d..e2dfa2a4 100644 --- a/src/nameserver/chunkserver_manager.h +++ b/src/nameserver/chunkserver_manager.h @@ -29,6 +29,8 @@ class Blocks { void MoveNew(); int64_t CheckLost(int64_t report_id, const std::set& blocks, int64_t start, int64_t end, std::vector* lost); + bool BlockExists(int64_t block_id); + int32_t GetChunkServerId() const; private: Mutex block_mu_; std::set blocks_; @@ -55,7 +57,8 @@ class ChunkServerManager { void ListChunkServers(::google::protobuf::RepeatedPtrField* chunkservers); bool GetChunkServerChains(int num, std::vector >* chains, const std::string& client_address); - bool GetRecoverChains(const std::set& replica, std::vector* chains); + bool GetRecoverChains(const std::set& replica, + std::vector* chains, int num); int32_t AddChunkServer(const std::string& address, const std::string& ip, const std::string& tag, int64_t quota); bool KickChunkServer(int cs_id); @@ -66,7 +69,10 @@ class ChunkServerManager { void AddBlock(int32_t id, int64_t block_id); void RemoveBlock(int32_t id, int64_t block_id); void CleanChunkServer(ChunkServerInfo* cs, const std::string& reason); - void PickRecoverBlocks(int cs_id, RecoverVec* recover_blocks, int* hi_num, bool hi_only); + void PickRecoverBlocks(int32_t cs_id, RecoverVec* recover_blocks, + int32_t* hi_num, bool hi_only); + void PickRecoverWritingBlocks(int32_t cs_id, + ::google::protobuf::RepeatedPtrField* recover_block_info); void GetStat(int32_t* w_qps, int64_t* w_speed, int32_t* r_qps, int64_t* r_speed, int64_t* recover_speed); StatusCode ShutdownChunkServer(const::google::protobuf::RepeatedPtrField& chunkserver_address); diff --git a/src/nameserver/nameserver_impl.cc b/src/nameserver/nameserver_impl.cc index ade00ec9..70e21e8f 100644 --- a/src/nameserver/nameserver_impl.cc +++ b/src/nameserver/nameserver_impl.cc @@ -318,6 +318,11 @@ void NameServerImpl::BlockReport(::google::protobuf::RpcController* controller, } LOG(INFO, "Response to C%d %s new_replicas_size= %d", cs_id, request->chunkserver_addr().c_str(), response->new_replicas_size()); + + ::google::protobuf::RepeatedPtrField* + recover_writing_blocks = response->mutable_recover_writing_blocks(); + chunkserver_manager_->PickRecoverWritingBlocks(cs_id, + recover_writing_blocks); } block_mapping_manager_->GetCloseBlocks(cs_id, response->mutable_close_blocks()); int64_t end_report = common::timer::get_micros(); @@ -1015,6 +1020,62 @@ void NameServerImpl::ShutdownChunkServerStat(::google::protobuf::RpcController* done->Run(); } +void NameServerImpl::GetChunkServer(::google::protobuf::RpcController* controller, + const GetChunkServerRequest* request, + GetChunkServerResponse* response, + ::google::protobuf::Closure* done) { + if (!is_leader_) { + response->set_status(kIsFollower); + done->Run(); + return; + } + sofa::pbrpc::RpcController* ctl = + reinterpret_cast(controller); + LOG(INFO, "Sdk %s want to get %d chunkserver for block #%ld", + ctl->RemoteAddress().c_str(), request->chunkserver_num(), + request->block_id()); + + response->set_sequence_id(request->sequence_id()); + int64_t block_id = request->block_id(); + std::vector cur_replicas; + block_mapping_manager_->GetLocatedBlock(block_id, &cur_replicas, + NULL, NULL); + std::set cur_rep(cur_replicas.begin(), cur_replicas.end()); + int32_t cs_num = request->chunkserver_num(); + std::vector result; + if (!chunkserver_manager_->GetRecoverChains(cur_rep, &result, cs_num)) { + LOG(WARNING, "Get %d chunkserver for block #%ld fail", + cs_num, block_id); + response->set_status(kGetChunkServerError); + done->Run(); + } + for (size_t i = 0; i < result.size(); i++) { + response->add_chunkservers(result[i]); + } + response->set_status(kOK); + done->Run(); +} + +void NameServerImpl::StartRecoverBlock(::google::protobuf::RpcController* controller, + const StartRecoverBlockRequest* request, + StartRecoverBlockResponse* response, + ::google::protobuf::Closure* done) { + if (!is_leader_) { + response->set_status(kIsFollower); + done->Run(); + return; + } + int64_t block_id = request->block_id(); + const std::string& cs_addr = request->chunkserver_addr(); + int64_t start_offset = request->start_offset(); + int64_t end_offset = request->end_offset(); + int32_t cs_id = chunkserver_manager_->GetChunkServerId(cs_addr); + LOG(INFO, "Start recover block #%ld to cs C%d, from offset %ld to %ld", + block_id, cs_id, start_offset, end_offset); + block_mapping_manager_->AddRecoverBlock(block_id, cs_id, + start_offset, end_offset); +} + void NameServerImpl::TransToString(const std::map >& chk_set, std::string* output) { for (std::map >::const_iterator it = @@ -1515,7 +1576,8 @@ void NameServerImpl::CallMethod(const ::google::protobuf::MethodDescriptor* meth std::make_pair("PushBlockReport", work_thread_pool_), std::make_pair("SysStat", read_thread_pool_), std::make_pair("Chmod", work_thread_pool_), - std::make_pair("Symlink", work_thread_pool_) + std::make_pair("Symlink", work_thread_pool_), + std::make_pair("GetChunkServer", work_thread_pool_) }; static int method_num = sizeof(ThreadPoolOfMethod) / diff --git a/src/nameserver/nameserver_impl.h b/src/nameserver/nameserver_impl.h index 181ebd70..c67130f4 100644 --- a/src/nameserver/nameserver_impl.h +++ b/src/nameserver/nameserver_impl.h @@ -132,6 +132,14 @@ class NameServerImpl : public NameServer { const ChmodRequest* request, ChmodResponse* response, ::google::protobuf::Closure* done); + void GetChunkServer(::google::protobuf::RpcController* controller, + const GetChunkServerRequest* request, + GetChunkServerResponse* response, + ::google::protobuf::Closure* done); + void StartRecoverBlock(::google::protobuf::RpcController* controller, + const StartRecoverBlockRequest* request, + StartRecoverBlockResponse* response, + ::google::protobuf::Closure* done); bool WebService(const sofa::pbrpc::HTTPRequest&, sofa::pbrpc::HTTPResponse&); private: diff --git a/src/nameserver/test/block_mapping_test.cc b/src/nameserver/test/block_mapping_test.cc index d2deda8f..6e93c913 100644 --- a/src/nameserver/test/block_mapping_test.cc +++ b/src/nameserver/test/block_mapping_test.cc @@ -5,7 +5,9 @@ #define private public #include "nameserver/block_mapping.h" +#include "nameserver/chunkserver_manager.h" #include "proto/status_code.pb.h" +#include "proto/nameserver.pb.h" #include @@ -58,7 +60,7 @@ TEST_F(BlockMappingTest, Basic) { } } -TEST_F(BlockMappingTest, DoNotRemoteHigherVersionBlock) { +TEST_F(BlockMappingTest, DoNotRemoveHigherVersionBlock) { int64_t block_id = 1; int64_t block_version = 2; int64_t block_size = 3; @@ -126,6 +128,54 @@ TEST_F(BlockMappingTest, NotRecoverEmptyBlock) { ASSERT_TRUE(bm->lost_blocks_.empty()); } +TEST_F(BlockMappingTest, AddRecoverBlock) { + BlockMapping* bm = new BlockMapping(&thread_pool); + int64_t block_id = 1; + int32_t cs_id = 2; + int64_t start_offset = 300; + int64_t end_offset = 400; + bm->AddRecoverBlock(block_id, cs_id, start_offset, end_offset); + auto it = bm->recover_writing_blocks_.find(block_id); + ASSERT_TRUE(it != bm->recover_writing_blocks_.end()); + RecoverInfo* info = it->second; + ASSERT_EQ(info->block_id(), block_id); + ASSERT_EQ(info->cs_id(), cs_id); + ASSERT_EQ(info->start_offset(), start_offset); + ASSERT_EQ(info->end_offset(), end_offset); +} + +TEST_F(BlockMappingTest, PickRecoverWritingBlocks) { + BlockMapping* bm = new BlockMapping(&thread_pool); + int64_t block_id = 1; + int32_t cs_id1 = 1; + int64_t start_offset = 300; + int64_t end_offset = 400; + bm->AddRecoverBlock(block_id, cs_id1, start_offset, end_offset); + Blocks* blocks1 = new Blocks(cs_id1); + blocks1->Insert(block_id); + BlockReportResponse response; + ::google::protobuf::RepeatedPtrField* result = + response.mutable_recover_writing_blocks(); + bm->PickRecoverWritingBlocks(blocks1, result); + ASSERT_EQ(result->size(), 0); + result->Clear(); + int32_t cs_id2 = 2; + Blocks* blocks2 = new Blocks(cs_id2); + blocks2->Insert(block_id); + bm->PickRecoverWritingBlocks(blocks2, result); + ASSERT_EQ(result->size(), 1); + const RecoverInfo& info = result->Get(0); + ASSERT_EQ(info.block_id(), block_id); + ASSERT_EQ(info.cs_id(), cs_id1); + ASSERT_EQ(info.start_offset(), start_offset); + ASSERT_EQ(info.end_offset(), end_offset); + result->Clear(); + int32_t cs_id3 = 3; + Blocks* blocks3 = new Blocks(cs_id3); + bm->PickRecoverWritingBlocks(blocks3, result); + ASSERT_EQ(result->size(), 0); +} + } // namespace bfs } // namespace baidu diff --git a/src/proto/chunkserver.proto b/src/proto/chunkserver.proto index b88b33d4..44d94d2d 100644 --- a/src/proto/chunkserver.proto +++ b/src/proto/chunkserver.proto @@ -18,7 +18,6 @@ message WriteBlockRequest { optional bool sync_on_close = 14; optional int64 total_size = 15 [default = -1]; } - message WriteBlockResponse { optional int64 sequence_id = 1; optional StatusCode status = 2; @@ -53,9 +52,21 @@ message GetBlockInfoResponse { repeated int64 timestamp = 9; } +message PrepareForWriteRequest { + optional int64 sequence_id = 1; + optional int64 block_id = 2; + optional int32 sliding_window_start_seq = 3; + optional int64 block_size = 4; +} +message PrepareForWriteResponse { + optional int64 sequence_id = 1; + optional StatusCode status = 2; +} + service ChunkServer { rpc WriteBlock(WriteBlockRequest) returns(WriteBlockResponse); rpc ReadBlock(ReadBlockRequest) returns(ReadBlockResponse); rpc GetBlockInfo(GetBlockInfoRequest) returns(GetBlockInfoResponse); + rpc PrepareForWrite(PrepareForWriteRequest) returns(PrepareForWriteResponse); } diff --git a/src/proto/nameserver.proto b/src/proto/nameserver.proto index 996cc499..7226ac9d 100644 --- a/src/proto/nameserver.proto +++ b/src/proto/nameserver.proto @@ -238,6 +238,7 @@ message BlockReportResponse { repeated int64 close_blocks = 4; repeated ReplicaInfo new_replicas = 5; optional int64 report_id = 6 [default = -1]; + repeated RecoverInfo recover_writing_blocks = 7; } message BlockReceivedRequest { @@ -338,6 +339,40 @@ message SymlinkResponse { optional StatusCode status = 2; } +message GetChunkServerRequest { + optional int64 sequence_id = 1; + optional int64 block_id = 2; + optional int32 chunkserver_num = 3; +} + +message GetChunkServerResponse { + optional int64 sequence_id = 1; + optional StatusCode status = 2; + repeated string chunkservers = 3; +} + +message StartRecoverBlockRequest { + optional int64 sequence_id = 1; + optional int32 chunkserver_id = 2; + optional string chunkserver_addr = 3; + optional int64 block_id = 4; + optional int64 start_offset = 5; + optional int64 end_offset = 6; +} + +message StartRecoverBlockResponse { + optional int64 sequence_id = 1; + optional StatusCode status = 2; +} + +message RecoverInfo { + optional int64 block_id = 1; + optional int32 cs_id = 2; + optional string cs_addr = 3; + optional int64 start_offset = 4; + optional int64 end_offset = 5; +} + service NameServer { rpc CreateFile(CreateFileRequest) returns(CreateFileResponse); rpc AddBlock(AddBlockRequest) returns(AddBlockResponse); @@ -363,5 +398,7 @@ service NameServer { rpc SysStat(SysStatRequest) returns(SysStatResponse); rpc Chmod(ChmodRequest) returns(ChmodResponse); rpc Symlink(SymlinkRequest) returns(SymlinkResponse); + rpc GetChunkServer(GetChunkServerRequest) returns(GetChunkServerResponse); + rpc StartRecoverBlock(StartRecoverBlockRequest) returns(StartRecoverBlockResponse); }