Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/chunkserver/chunkserver_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class ChunkServerImpl : public ChunkServer {
int64_t first_round_report_start_;
volatile bool service_stop_;

Params params_;
ChunkServerParams params_;
};

} // namespace bfs
Expand Down
9 changes: 7 additions & 2 deletions src/nameserver/block_mapping.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ NSBlock::NSBlock(int64_t block_id, int32_t replica,
recover_stat(block_version < 0 ? kBlockWriting : kNotInRecover) {
}

BlockMapping::BlockMapping(ThreadPool* thread_pool) : thread_pool_(thread_pool) {}
BlockMapping::BlockMapping(ThreadPool* thread_pool) : thread_pool_(thread_pool), clean_redundancy_(false) {}

bool BlockMapping::GetBlock(int64_t block_id, NSBlock* block) {
MutexLock lock(&mu_, "BlockMapping::GetBlock", 1000);
Expand Down Expand Up @@ -265,7 +265,7 @@ bool BlockMapping::UpdateNormalBlock(NSBlock* nsblock,
}

TryRecover(nsblock);
if (FLAGS_clean_redundancy && replica.size() > nsblock->expect_replica_num) {
if (clean_redundancy_ && replica.size() > nsblock->expect_replica_num) {
LOG(INFO, "Too much replica #%ld R%lu expect=%d C%d ",
block_id, replica.size(), nsblock->expect_replica_num, cs_id);
replica.erase(cs_id);
Expand Down Expand Up @@ -948,5 +948,10 @@ void BlockMapping::MarkIncomplete(int64_t block_id) {
}
}

void BlockMapping::SetCleanRedundancy(bool clean) {
MutexLock lock(&mu_);
clean_redundancy_ = clean;
}

} // namespace bfs
} // namespace baidu
3 changes: 3 additions & 0 deletions src/nameserver/block_mapping.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class BlockMapping {
void ListRecover(RecoverBlockSet* blocks);
int32_t GetCheckNum();
void MarkIncomplete(int64_t block_id);
void SetCleanRedundancy(bool clean);
private:
void DealWithDeadBlockInternal(int32_t cs_id, int64_t block_id);
typedef std::map<int32_t, std::set<int64_t> > CheckList;
Expand Down Expand Up @@ -112,6 +113,8 @@ class BlockMapping {
std::set<int64_t> lo_pri_recover_;
std::set<int64_t> hi_pri_recover_;
std::set<int64_t> lost_blocks_;

bool clean_redundancy_;
};

} // namespace bfs
Expand Down
6 changes: 6 additions & 0 deletions src/nameserver/block_mapping_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -154,5 +154,11 @@ void BlockMappingManager::MarkIncomplete(int64_t block_id) {
block_mapping_[bucket_offset]->MarkIncomplete(block_id);
}

void BlockMappingManager::SetCleanRedundancy(bool clean) {
for (size_t i = 0; i < block_mapping_.size(); i++) {
block_mapping_[i]->SetCleanRedundancy(clean);
}
}

} //namespace bfs
} //namespace baidu
1 change: 1 addition & 0 deletions src/nameserver/block_mapping_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public :
void GetRecoverNum(int32_t bucket_id, RecoverBlockNum* recover_num);
void ListRecover(RecoverBlockSet* recover_blocks);
void MarkIncomplete(int64_t block_id);
void SetCleanRedundancy(bool clean);
private:
int32_t GetBucketOffset(int64_t block_id);
private:
Expand Down
2 changes: 1 addition & 1 deletion src/nameserver/chunkserver_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ void ChunkServerManager::AddBlock(int32_t id, int64_t block_id, bool is_recover)
cs_block_map->blocks.insert(block_id);
}

void ChunkServerManager::SetParam(const Params& p) {
void ChunkServerManager::SetParam(const ChunkServerParams& p) {
MutexLock lock(&mu_);
if (p.report_interval() != -1) {
params_.set_report_interval(p.report_interval());
Expand Down
4 changes: 2 additions & 2 deletions src/nameserver/chunkserver_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class ChunkServerManager {
bool GetShutdownChunkServerStat();
int64_t AddBlockWithCheck(int32_t id, const std::set<int64_t>& blocks, int64_t start, int64_t end,
std::vector<int64_t>* lost, int64_t report_id);
void SetParam(const Params& p);
void SetParam(const ChunkServerParams& p);
private:
struct ChunkServerBlockMap {
Mutex* mu;
Expand Down Expand Up @@ -96,7 +96,7 @@ class ChunkServerManager {
std::vector<std::string> chunkservers_to_offline_;

// for chunkserver
Params params_;
ChunkServerParams params_;
};


Expand Down
62 changes: 52 additions & 10 deletions src/nameserver/nameserver_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ extern common::Counter g_blocks_num;

NameServerImpl::NameServerImpl(Sync* sync) : readonly_(true),
recover_timeout_(FLAGS_nameserver_start_recover_timeout),
block_report_timeout_(FLAGS_block_report_timeout),
recover_mode_(kStopRecover), sync_(sync) {
block_mapping_manager_ = new BlockMappingManager(FLAGS_blockmapping_bucket_num);
report_thread_pool_ = new common::ThreadPool(FLAGS_nameserver_report_thread_num);
Expand Down Expand Up @@ -1023,21 +1024,28 @@ bool NameServerImpl::WebService(const sofa::pbrpc::HTTPRequest& request,
ListRecover(&response);
return true;
} else if (path == "/dfs/hi_only") {
recover_timeout_ = 0;
NameServerParams para;
para.set_recover_timeout(0);
LOG(INFO, "ChangeRecoverMode hi_only");
recover_mode_ = kHiOnly;
para.set_recover_mode(kHiOnly);
response.content->Append("<body onload=\"history.back()\"></body>");
SetNameServerParams(para);
return true;
} else if (path == "/dfs/recover_all") {
recover_timeout_ = 0;
NameServerParams para;
para.set_recover_timeout(0);
LOG(INFO, "ChangeRecoverMode recover_all");
recover_mode_ = kRecoverAll;
para.set_recover_mode(kRecoverAll);
SetNameServerParams(para);
response.content->Append("<body onload=\"history.back()\"></body>");
return true;
} else if (path == "/dfs/stop_recover") {
recover_timeout_ = 0;
NameServerParams para;
para.set_recover_timeout(0);
LOG(INFO, "ChangeRecoverMode stop_recover");
recover_mode_ = kStopRecover;
para.set_recover_mode(kStopRecover);
SetNameServerParams(para);
response.content->Append("<body onload=\"history.back()\"></body>");
return true;
} else if (path == "/dfs/leave_read_only") {
Expand All @@ -1047,7 +1055,9 @@ bool NameServerImpl::WebService(const sofa::pbrpc::HTTPRequest& request,
return true;
} else if (path == "/dfs/entry_read_only") {
LOG(INFO, "ChangeStatus entry_read_only");
readonly_ = true;
NameServerParams para;
para.set_readonly(true);
SetNameServerParams(para);
response.content->Append("<body onload=\"history.back()\"></body>");
return true;
} else if (path == "/dfs/kick" && FLAGS_bfs_web_kick_enable) {
Expand All @@ -1071,7 +1081,7 @@ bool NameServerImpl::WebService(const sofa::pbrpc::HTTPRequest& request,
display_mode = kOverload;
} else if (path == "/dfs/set") {
std::map<const std::string, std::string>::const_iterator it = request.query_params->begin();
Params p;
ChunkServerParams p;
if (it != request.query_params->end()) {
int32_t v = 0;
if (it->first != "clean_redundancy") {
Expand Down Expand Up @@ -1106,13 +1116,21 @@ bool NameServerImpl::WebService(const sofa::pbrpc::HTTPRequest& request,
response.content->Append("<h1>Bad Parameter : 2 <= block_report_timeout <= 3600 </h1>");
return true;
}
FLAGS_block_report_timeout = v;
NameServerParams para;
para.set_block_report_timeout(v);
SetNameServerParams(para);
} else if (it->first == "clean_redundancy") {
if (it->second != "true" && it->second != "false") {
response.content->Append("<h1>Bad Parameter : clean_redundancy == true || false");
return true;
}
FLAGS_clean_redundancy = it->second == "true" ? true : false;
NameServerParams para;
if (it->second == "true") {
para.set_clean_redundancy(true);
} else {
para.set_clean_redundancy(false);
}
SetNameServerParams(para);
} else {
response.content->Append("<h1>Bad Parameter :");
response.content->Append(it->first);
Expand Down Expand Up @@ -1348,7 +1366,8 @@ static void CallMethodHelper(NameServerImpl* impl,
int64_t recv_time) {
if (method->index() == 16) {
int64_t delay = common::timer::get_micros() - recv_time;
if (delay > FLAGS_block_report_timeout *1000L * 1000L) {
int32_t timeout = impl->GetBlockReportTimeout();
if (delay > timeout * 1000L * 1000L) {
const BlockReportRequest* report =
static_cast<const BlockReportRequest*>(request);
LOG(WARNING, "BlockReport from %s, delay %ld ms",
Expand Down Expand Up @@ -1407,6 +1426,29 @@ void NameServerImpl::CallMethod(const ::google::protobuf::MethodDescriptor* meth
}
}

void NameServerImpl::SetNameServerParams(const NameServerParams& para) {
MutexLock lock(&mu_);
if (para.has_block_report_timeout()) {
block_report_timeout_ = para.block_report_timeout();
}
if (para.has_clean_redundancy()) {
block_mapping_manager_->SetCleanRedundancy(para.clean_redundancy());
}
if (para.has_recover_mode()) {
recover_mode_ = para.recover_mode();
}
if (para.has_recover_timeout()) {
recover_timeout_ = para.recover_timeout();
}
if (para.has_readonly()) {
readonly_ = para.readonly();
}
}

int32_t NameServerImpl::GetBlockReportTimeout() {
return block_report_timeout_;
}

} // namespace bfs
} // namespace baidu

Expand Down
22 changes: 7 additions & 15 deletions src/nameserver/nameserver_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <common/thread_pool.h>

#include "proto/nameserver.pb.h"
#include "proto/status_code.pb.h"

namespace sofa {
namespace pbrpc {
Expand All @@ -26,19 +27,6 @@ class ChunkServerManager;
class BlockMappingManager;
class Sync;

enum RecoverMode {
kStopRecover = 0,
kHiOnly = 1,
kRecoverAll = 2,
};

enum DisplayMode {
kDisplayAll = 0,
kAliveOnly = 1,
kDeadOnly = 2,
kOverload = 3,
};

class NameServerImpl : public NameServer {
public:
NameServerImpl(Sync* sync);
Expand Down Expand Up @@ -125,6 +113,7 @@ class NameServerImpl : public NameServer {
::google::protobuf::Closure* done);

bool WebService(const sofa::pbrpc::HTTPRequest&, sofa::pbrpc::HTTPResponse&);
int32_t GetBlockReportTimeout();

private:
void CheckLeader();
Expand Down Expand Up @@ -152,6 +141,7 @@ class NameServerImpl : public NameServer {
bool CheckFileHasBlock(const FileInfo& file_info,
const std::string& file_name,
int64_t block_id);
void SetNameServerParams(const NameServerParams& para);
private:
/// Global thread pool
ThreadPool* read_thread_pool_;
Expand All @@ -164,14 +154,16 @@ class NameServerImpl : public NameServer {
BlockMappingManager* block_mapping_manager_;

volatile bool readonly_;
volatile int recover_timeout_;
RecoverMode recover_mode_;
volatile int32_t recover_timeout_;
volatile int32_t block_report_timeout_;
volatile RecoverMode recover_mode_;
int64_t start_time_;
/// Namespace
NameSpace* namespace_;
/// ha
Sync* sync_;
bool is_leader_;
Mutex mu_;
};

} // namespace bfs
Expand Down
23 changes: 22 additions & 1 deletion src/proto/status_code.proto
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,30 @@ enum RecoverPri {
kLow = 1;
}

message Params {
enum RecoverMode {
kStopRecover = 0;
kHiOnly = 1;
kRecoverAll = 2;
}

enum DisplayMode {
kDisplayAll = 0;
kAliveOnly = 1;
kDeadOnly = 2;
kOverload = 3;
}

message ChunkServerParams {
optional int32 report_interval = 1 [default = -1];
optional int32 report_size = 2 [default = -1];
optional int32 recover_size = 3 [default = -1];
optional int32 keepalive_timeout = 4 [default = -1];
}

message NameServerParams {
optional int32 block_report_timeout = 1;
optional bool clean_redundancy = 2;
optional RecoverMode recover_mode = 3;
optional int32 recover_timeout = 4;
optional bool readonly = 5;
}