diff --git a/rocketmq-client/src/admin/default_mq_admin_ext_impl.rs b/rocketmq-client/src/admin/default_mq_admin_ext_impl.rs index a8b14a18b..3e9557002 100644 --- a/rocketmq-client/src/admin/default_mq_admin_ext_impl.rs +++ b/rocketmq-client/src/admin/default_mq_admin_ext_impl.rs @@ -1453,9 +1453,16 @@ impl MQAdminExt for DefaultMQAdminExtImpl { async fn get_broker_epoch_cache( &self, - _broker_addr: CheetahString, + broker_addr: CheetahString, ) -> rocketmq_error::RocketMQResult { - unimplemented!("get_broker_epoch_cache not implemented yet") + if let Some(ref mq_client_instance) = self.client_instance { + Ok(mq_client_instance + .get_mq_client_api_impl() + .get_broker_epoch_cache(broker_addr, self.timeout_millis.as_millis() as u64) + .await?) + } else { + Err(rocketmq_error::RocketMQError::ClientNotStarted) + } } async fn elect_master( diff --git a/rocketmq-client/src/implementation/mq_client_api_impl.rs b/rocketmq-client/src/implementation/mq_client_api_impl.rs index 711ef6c04..3dcd6ffde 100644 --- a/rocketmq-client/src/implementation/mq_client_api_impl.rs +++ b/rocketmq-client/src/implementation/mq_client_api_impl.rs @@ -70,6 +70,7 @@ use rocketmq_remoting::protocol::body::acl_info::AclInfo; use rocketmq_remoting::protocol::body::batch_ack_message_request_body::BatchAckMessageRequestBody; use rocketmq_remoting::protocol::body::broker_body::cluster_info::ClusterInfo; use rocketmq_remoting::protocol::body::check_client_request_body::CheckClientRequestBody; +use rocketmq_remoting::protocol::body::epoch_entry_cache::EpochEntryCache; use rocketmq_remoting::protocol::body::get_consumer_list_by_group_response_body::GetConsumerListByGroupResponseBody; use rocketmq_remoting::protocol::body::query_assignment_request_body::QueryAssignmentRequestBody; use rocketmq_remoting::protocol::body::query_assignment_response_body::QueryAssignmentResponseBody; @@ -2416,6 +2417,36 @@ impl MQClientAPIImpl { } } + pub async fn get_broker_epoch_cache( + &self, + broker_addr: CheetahString, + timeout_millis: u64, + ) -> RocketMQResult { + let request = RemotingCommand::create_remoting_command(RequestCode::GetBrokerEpochCache); + let response = self + .remoting_client + .invoke_request(Some(&broker_addr), request, timeout_millis) + .await?; + match ResponseCode::from(response.code()) { + ResponseCode::Success => { + if let Some(body) = response.body() { + match EpochEntryCache::decode(body) { + Ok(value) => Ok(value), + Err(e) => Err(mq_client_err!(format!("decode EpochEntryCache failed: {}", e))), + } + } else { + Err(mq_client_err!( + "get_broker_epoch_cache response body is empty".to_string() + )) + } + } + _ => Err(mq_client_err!( + response.code(), + response.remark().map_or("".to_string(), |s| s.to_string()) + )), + } + } + pub async fn get_controller_config( &self, controller_address: CheetahString, diff --git a/rocketmq-remoting/src/protocol/body/epoch_entry_cache.rs b/rocketmq-remoting/src/protocol/body/epoch_entry_cache.rs index 710f4a994..cd2a26e37 100644 --- a/rocketmq-remoting/src/protocol/body/epoch_entry_cache.rs +++ b/rocketmq-remoting/src/protocol/body/epoch_entry_cache.rs @@ -12,11 +12,78 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt; + use cheetah_string::CheetahString; use serde::Deserialize; use serde::Serialize; -#[derive(Deserialize, Serialize, Default)] -pub struct EpochEntry; + +#[derive(Deserialize, Serialize, Default, Debug, Clone, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct EpochEntry { + #[serde(default)] + epoch: i32, + #[serde(default)] + start_offset: i64, + #[serde(default = "EpochEntry::default_end_offset")] + end_offset: i64, +} + +impl EpochEntry { + fn default_end_offset() -> i64 { + i64::MAX + } + + pub fn new(epoch: i32, start_offset: i64) -> Self { + Self { + epoch, + start_offset, + end_offset: i64::MAX, + } + } + + pub fn with_end_offset(epoch: i32, start_offset: i64, end_offset: i64) -> Self { + Self { + epoch, + start_offset, + end_offset, + } + } + + pub fn get_epoch(&self) -> i32 { + self.epoch + } + + pub fn set_epoch(&mut self, epoch: i32) { + self.epoch = epoch; + } + + pub fn get_start_offset(&self) -> i64 { + self.start_offset + } + + pub fn set_start_offset(&mut self, start_offset: i64) { + self.start_offset = start_offset; + } + + pub fn get_end_offset(&self) -> i64 { + self.end_offset + } + + pub fn set_end_offset(&mut self, end_offset: i64) { + self.end_offset = end_offset; + } +} + +impl fmt::Display for EpochEntry { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "EpochEntry{{epoch={}, startOffset={}, endOffset={}}}", + self.epoch, self.start_offset, self.end_offset + ) + } +} #[derive(Deserialize, Serialize, Default)] #[serde(rename_all = "camelCase")] @@ -59,6 +126,18 @@ impl EpochEntryCache { pub fn get_broker_id(&self) -> u64 { self.broker_id } + pub fn get_epoch_list(&self) -> &Vec { + &self.epoch_list + } + pub fn get_epoch_list_mut(&mut self) -> &mut Vec { + &mut self.epoch_list + } + pub fn get_max_offset(&self) -> u64 { + self.max_offset + } + pub fn set_max_offset(&mut self, max_offset: u64) { + self.max_offset = max_offset; + } } #[cfg(test)] @@ -69,22 +148,36 @@ mod tests { #[test] fn new_creates_instance_of_epoch_entry_cache() { - let epoch_entry_cache = EpochEntryCache::new("cluster1", "broker1", 1, vec![EpochEntry], 1); + let epoch_entry_cache = EpochEntryCache::new("cluster1", "broker1", 1, vec![EpochEntry::new(1, 0)], 1); assert_eq!(epoch_entry_cache.get_cluster_name(), &CheetahString::from("cluster1")); assert_eq!(epoch_entry_cache.get_broker_id(), 1); assert_eq!(epoch_entry_cache.get_broker_name(), &CheetahString::from("broker1")); } #[test] fn set_broker_name_updates_broker_name() { - let mut epoch_entry_cache = EpochEntryCache::new("cluster1", "broker1", 1, vec![EpochEntry], 1); + let mut epoch_entry_cache = EpochEntryCache::new("cluster1", "broker1", 1, vec![EpochEntry::new(1, 0)], 1); epoch_entry_cache.set_broker_name("broker2"); assert_eq!(epoch_entry_cache.get_broker_name(), &CheetahString::from("broker2")); } #[test] fn set_cluster_name_updates_cluster_name() { - let mut epoch_entry_cache = EpochEntryCache::new("cluster1", "broker1", 1, vec![EpochEntry], 1); + let mut epoch_entry_cache = EpochEntryCache::new("cluster1", "broker1", 1, vec![EpochEntry::new(1, 0)], 1); epoch_entry_cache.set_cluster_name("cluster2"); assert_eq!(epoch_entry_cache.get_cluster_name(), &CheetahString::from("cluster2")); } + + #[test] + fn epoch_entry_display() { + let entry = EpochEntry::with_end_offset(1, 100, 200); + assert_eq!(entry.to_string(), "EpochEntry{epoch=1, startOffset=100, endOffset=200}"); + } + + #[test] + fn epoch_entry_cache_getters() { + let entries = vec![EpochEntry::new(1, 0), EpochEntry::new(2, 100)]; + let cache = EpochEntryCache::new("cluster1", "broker1", 1, entries, 500); + assert_eq!(cache.get_epoch_list().len(), 2); + assert_eq!(cache.get_max_offset(), 500); + } } diff --git a/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/admin/default_mq_admin_ext.rs b/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/admin/default_mq_admin_ext.rs index 76a8bffa0..774b234b6 100644 --- a/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/admin/default_mq_admin_ext.rs +++ b/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/admin/default_mq_admin_ext.rs @@ -1263,9 +1263,9 @@ impl MQAdminExt for DefaultMQAdminExt { async fn get_broker_epoch_cache( &self, - _broker_addr: CheetahString, + broker_addr: CheetahString, ) -> rocketmq_error::RocketMQResult { - unimplemented!("get_broker_epoch_cache not implemented yet") + self.default_mqadmin_ext_impl.get_broker_epoch_cache(broker_addr).await } async fn elect_master( diff --git a/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands.rs b/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands.rs index f4920a8b2..0232bdf4a 100644 --- a/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands.rs +++ b/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands.rs @@ -197,6 +197,11 @@ impl CommandExecute for ClassificationTablePrint { command: "getBrokerConfig", remark: "Get broker config by cluster or special broker.", }, + Command { + category: "Broker", + command: "getBrokerEpoch", + remark: "Fetch broker epoch entries.", + }, Command { category: "Broker", command: "getColdDataFlowCtrInfo", diff --git a/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/broker_commands.rs b/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/broker_commands.rs index 878931ab0..c40d83b07 100644 --- a/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/broker_commands.rs +++ b/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/broker_commands.rs @@ -16,6 +16,7 @@ mod clean_expired_cq_sub_command; mod clean_unused_topic_sub_command; mod delete_expired_commit_log_sub_command; mod get_broker_config_sub_command; +mod get_broker_epoch_sub_command; mod get_cold_data_flow_ctr_info_sub_command; mod remove_cold_data_flow_ctr_group_config_sub_command; mod reset_master_flush_offset_sub_command; @@ -34,6 +35,7 @@ use crate::commands::broker_commands::clean_expired_cq_sub_command::CleanExpired use crate::commands::broker_commands::clean_unused_topic_sub_command::CleanUnusedTopicSubCommand; use crate::commands::broker_commands::delete_expired_commit_log_sub_command::DeleteExpiredCommitLogSubCommand; use crate::commands::broker_commands::get_broker_config_sub_command::GetBrokerConfigSubCommand; +use crate::commands::broker_commands::get_broker_epoch_sub_command::GetBrokerEpochSubCommand; use crate::commands::broker_commands::get_cold_data_flow_ctr_info_sub_command::GetColdDataFlowCtrInfoSubCommand; use crate::commands::broker_commands::remove_cold_data_flow_ctr_group_config_sub_command::RemoveColdDataFlowCtrGroupConfigSubCommand; use crate::commands::broker_commands::reset_master_flush_offset_sub_command::ResetMasterFlushOffsetSubCommand; @@ -73,6 +75,13 @@ pub enum BrokerCommands { )] GetBrokerConfig(GetBrokerConfigSubCommand), + #[command( + name = "getBrokerEpoch", + about = "Fetch broker epoch entries.", + long_about = None, + )] + GetBrokerEpoch(GetBrokerEpochSubCommand), + #[command( name = "getColdDataFlowCtrInfo", about = "Get cold data flow ctr info.", @@ -130,6 +139,7 @@ impl CommandExecute for BrokerCommands { BrokerCommands::CleanUnusedTopic(value) => value.execute(rpc_hook).await, BrokerCommands::DeleteExpiredCommitLog(value) => value.execute(rpc_hook).await, BrokerCommands::GetBrokerConfig(cmd) => cmd.execute(rpc_hook).await, + BrokerCommands::GetBrokerEpoch(cmd) => cmd.execute(rpc_hook).await, BrokerCommands::GetColdDataFlowCtrInfo(value) => value.execute(rpc_hook).await, BrokerCommands::RemoveColdDataFlowCtrGroupConfig(value) => value.execute(rpc_hook).await, BrokerCommands::ResetMasterFlushOffset(value) => value.execute(rpc_hook).await, diff --git a/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/broker_commands/get_broker_epoch_sub_command.rs b/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/broker_commands/get_broker_epoch_sub_command.rs new file mode 100644 index 000000000..8c8921f5b --- /dev/null +++ b/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/broker_commands/get_broker_epoch_sub_command.rs @@ -0,0 +1,151 @@ +// Copyright 2023 The RocketMQ Rust Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use cheetah_string::CheetahString; +use clap::ArgGroup; +use clap::Parser; +use rocketmq_client_rust::admin::mq_admin_ext_async::MQAdminExt; +use rocketmq_common::TimeUtils::get_current_millis; +use rocketmq_error::RocketMQError; +use rocketmq_error::RocketMQResult; +use rocketmq_remoting::runtime::RPCHook; + +use crate::admin::default_mq_admin_ext::DefaultMQAdminExt; +use crate::commands::command_util::CommandUtil; +use crate::commands::CommandExecute; + +#[derive(Debug, Clone, Parser)] +#[command(group( + ArgGroup::new("target") + .required(true) + .args(&["broker_name", "cluster_name"]) +))] +pub struct GetBrokerEpochSubCommand { + #[arg(short = 'b', long = "brokerName", help = "which broker to get epoch")] + broker_name: Option, + + #[arg(short = 'c', long = "clusterName", help = "which cluster to get epoch")] + cluster_name: Option, + + #[arg( + short = 'i', + long = "interval", + required = false, + help = "the interval(second) of get info" + )] + interval: Option, +} + +impl CommandExecute for GetBrokerEpochSubCommand { + async fn execute(&self, rpc_hook: Option>) -> RocketMQResult<()> { + let mut default_mqadmin_ext = if let Some(rpc_hook) = rpc_hook { + DefaultMQAdminExt::with_rpc_hook(rpc_hook) + } else { + DefaultMQAdminExt::new() + }; + default_mqadmin_ext + .client_config_mut() + .set_instance_name(get_current_millis().to_string().into()); + + MQAdminExt::start(&mut default_mqadmin_ext).await.map_err(|e| { + RocketMQError::Internal(format!("GetBrokerEpochSubCommand: Failed to start MQAdminExt: {}", e)) + })?; + + let operation_result = if let Some(interval) = self.interval { + let flush_second = if interval > 0 { interval } else { 3 }; + loop { + let result = self.inner_exec(&default_mqadmin_ext).await; + if let Err(ref e) = result { + eprintln!("GetBrokerEpochSubCommand: error: {}", e); + } + tokio::time::sleep(tokio::time::Duration::from_secs(flush_second)).await; + } + } else { + self.inner_exec(&default_mqadmin_ext).await + }; + + MQAdminExt::shutdown(&mut default_mqadmin_ext).await; + operation_result + } +} + +impl GetBrokerEpochSubCommand { + async fn inner_exec(&self, default_mqadmin_ext: &DefaultMQAdminExt) -> RocketMQResult<()> { + if let Some(ref broker_name) = self.broker_name { + let broker_name = broker_name.trim(); + let cluster_info = default_mqadmin_ext.examine_broker_cluster_info().await.map_err(|e| { + RocketMQError::Internal(format!( + "GetBrokerEpochSubCommand: Failed to examine broker cluster info: {}", + e + )) + })?; + let brokers = CommandUtil::fetch_master_and_slave_addr_by_broker_name(&cluster_info, broker_name)?; + self.print_data( + &brokers.iter().map(|s| s.as_str()).collect::>(), + default_mqadmin_ext, + ) + .await?; + } else if let Some(ref cluster_name) = self.cluster_name { + let cluster_name = cluster_name.trim(); + let cluster_info = default_mqadmin_ext.examine_broker_cluster_info().await.map_err(|e| { + RocketMQError::Internal(format!( + "GetBrokerEpochSubCommand: Failed to examine broker cluster info: {}", + e + )) + })?; + let brokers = CommandUtil::fetch_master_and_slave_addr_by_cluster_name(&cluster_info, cluster_name)?; + self.print_data( + &brokers.iter().map(|s| s.as_str()).collect::>(), + default_mqadmin_ext, + ) + .await?; + } + Ok(()) + } + + async fn print_data(&self, brokers: &[&str], default_mqadmin_ext: &DefaultMQAdminExt) -> RocketMQResult<()> { + for broker_addr in brokers { + let mut epoch_cache = default_mqadmin_ext + .get_broker_epoch_cache(CheetahString::from(*broker_addr)) + .await + .map_err(|e| { + RocketMQError::Internal(format!( + "GetBrokerEpochSubCommand: Failed to get broker epoch cache from {}: {}", + broker_addr, e + )) + })?; + + println!( + "\n#clusterName\t{}\n#brokerName\t{}\n#brokerAddr\t{}\n#brokerId\t{}", + epoch_cache.get_cluster_name(), + epoch_cache.get_broker_name(), + broker_addr, + epoch_cache.get_broker_id() + ); + + let max_offset = epoch_cache.get_max_offset(); + let epoch_list = epoch_cache.get_epoch_list_mut(); + let len = epoch_list.len(); + for (i, epoch_entry) in epoch_list.iter_mut().enumerate() { + if i == len - 1 { + epoch_entry.set_end_offset(max_offset as i64); + } + println!("#Epoch: {}", epoch_entry); + } + } + Ok(()) + } +} diff --git a/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/command_util.rs b/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/command_util.rs index dd620229e..0ebc7dc43 100644 --- a/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/command_util.rs +++ b/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/command_util.rs @@ -70,6 +70,25 @@ impl CommandUtil { ))) } + pub fn fetch_master_and_slave_addr_by_broker_name( + cluster_info: &ClusterInfo, + broker_name: &str, + ) -> RocketMQResult> { + let broker_addr_table = cluster_info.broker_addr_table.as_ref().ok_or_else(|| { + RocketMQError::Internal("CommandUtil: No broker address table available from nameserver.".into()) + })?; + let broker_data = broker_addr_table.get(broker_name).ok_or_else(|| { + RocketMQError::Internal(format!( + "CommandUtil: No broker data found for broker name: {}", + broker_name + )) + })?; + let mut addrs: Vec = broker_data.broker_addrs().values().cloned().collect(); + addrs.sort(); + addrs.dedup(); + Ok(addrs) + } + #[allow(unused)] pub fn fetch_broker_name_by_cluster_name( cluster_info: &ClusterInfo,