11 changes: 9 additions & 2 deletions rocketmq-client/src/admin/default_mq_admin_ext_impl.rs
@@ -1453,9 +1453,16 @@ impl MQAdminExt for DefaultMQAdminExtImpl {

async fn get_broker_epoch_cache(
&self,
_broker_addr: CheetahString,
broker_addr: CheetahString,
) -> rocketmq_error::RocketMQResult<EpochEntryCache> {
unimplemented!("get_broker_epoch_cache not implemented yet")
if let Some(ref mq_client_instance) = self.client_instance {
Ok(mq_client_instance
.get_mq_client_api_impl()
.get_broker_epoch_cache(broker_addr, self.timeout_millis.as_millis() as u64)
.await?)
} else {
Err(rocketmq_error::RocketMQError::ClientNotStarted)
}
}

async fn elect_master(
31 changes: 31 additions & 0 deletions rocketmq-client/src/implementation/mq_client_api_impl.rs
@@ -70,6 +70,7 @@ use rocketmq_remoting::protocol::body::acl_info::AclInfo;
use rocketmq_remoting::protocol::body::batch_ack_message_request_body::BatchAckMessageRequestBody;
use rocketmq_remoting::protocol::body::broker_body::cluster_info::ClusterInfo;
use rocketmq_remoting::protocol::body::check_client_request_body::CheckClientRequestBody;
use rocketmq_remoting::protocol::body::epoch_entry_cache::EpochEntryCache;
use rocketmq_remoting::protocol::body::get_consumer_list_by_group_response_body::GetConsumerListByGroupResponseBody;
use rocketmq_remoting::protocol::body::query_assignment_request_body::QueryAssignmentRequestBody;
use rocketmq_remoting::protocol::body::query_assignment_response_body::QueryAssignmentResponseBody;
@@ -2416,6 +2417,36 @@ impl MQClientAPIImpl {
}
}

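    /// Fetches the `EpochEntryCache` from the broker at `broker_addr`
    /// by issuing a `GetBrokerEpochCache` request with the given timeout.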
pub async fn get_broker_epoch_cache(
&self,
broker_addr: CheetahString,
timeout_millis: u64,
) -> RocketMQResult<EpochEntryCache> {
let request = RemotingCommand::create_remoting_command(RequestCode::GetBrokerEpochCache);
let response = self
.remoting_client
.invoke_request(Some(&broker_addr), request, timeout_millis)
.await?;
match ResponseCode::from(response.code()) {
ResponseCode::Success => {
if let Some(body) = response.body() {
match EpochEntryCache::decode(body) {
Ok(value) => Ok(value),
Err(e) => Err(mq_client_err!(format!("decode EpochEntryCache failed: {}", e))),
}
} else {
Err(mq_client_err!(
"get_broker_epoch_cache response body is empty".to_string()
))
}
}
_ => Err(mq_client_err!(
response.code(),
response.remark().map_or("".to_string(), |s| s.to_string())
)),
}
}

pub async fn get_controller_config(
&self,
controller_address: CheetahString,
103 changes: 98 additions & 5 deletions rocketmq-remoting/src/protocol/body/epoch_entry_cache.rs
@@ -12,11 +12,78 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;

use cheetah_string::CheetahString;
use serde::Deserialize;
use serde::Serialize;
#[derive(Deserialize, Serialize, Default)]
pub struct EpochEntry;

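/// A single epoch entry: an epoch number plus the start/end offsets it covers.
/// `end_offset` defaults to `i64::MAX` until it is explicitly set.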
#[derive(Deserialize, Serialize, Default, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct EpochEntry {
#[serde(default)]
epoch: i32,
#[serde(default)]
start_offset: i64,
#[serde(default = "EpochEntry::default_end_offset")]
end_offset: i64,
}

impl EpochEntry {
fn default_end_offset() -> i64 {
i64::MAX
}

pub fn new(epoch: i32, start_offset: i64) -> Self {
Self {
epoch,
start_offset,
end_offset: i64::MAX,
}
}

pub fn with_end_offset(epoch: i32, start_offset: i64, end_offset: i64) -> Self {
Self {
epoch,
start_offset,
end_offset,
}
}

pub fn get_epoch(&self) -> i32 {
self.epoch
}

pub fn set_epoch(&mut self, epoch: i32) {
self.epoch = epoch;
}

pub fn get_start_offset(&self) -> i64 {
self.start_offset
}

pub fn set_start_offset(&mut self, start_offset: i64) {
self.start_offset = start_offset;
}

pub fn get_end_offset(&self) -> i64 {
self.end_offset
}

pub fn set_end_offset(&mut self, end_offset: i64) {
self.end_offset = end_offset;
}
}

impl fmt::Display for EpochEntry {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"EpochEntry{{epoch={}, startOffset={}, endOffset={}}}",
self.epoch, self.start_offset, self.end_offset
)
}
}

#[derive(Deserialize, Serialize, Default)]
#[serde(rename_all = "camelCase")]
@@ -59,6 +126,18 @@ impl EpochEntryCache {
pub fn get_broker_id(&self) -> u64 {
self.broker_id
}
pub fn get_epoch_list(&self) -> &Vec<EpochEntry> {
&self.epoch_list
}
pub fn get_epoch_list_mut(&mut self) -> &mut Vec<EpochEntry> {
&mut self.epoch_list
}
pub fn get_max_offset(&self) -> u64 {
self.max_offset
}
pub fn set_max_offset(&mut self, max_offset: u64) {
self.max_offset = max_offset;
}
}

#[cfg(test)]
@@ -69,22 +148,36 @@ mod tests {

#[test]
fn new_creates_instance_of_epoch_entry_cache() {
let epoch_entry_cache = EpochEntryCache::new("cluster1", "broker1", 1, vec![EpochEntry], 1);
let epoch_entry_cache = EpochEntryCache::new("cluster1", "broker1", 1, vec![EpochEntry::new(1, 0)], 1);
assert_eq!(epoch_entry_cache.get_cluster_name(), &CheetahString::from("cluster1"));
assert_eq!(epoch_entry_cache.get_broker_id(), 1);
assert_eq!(epoch_entry_cache.get_broker_name(), &CheetahString::from("broker1"));
}
#[test]
fn set_broker_name_updates_broker_name() {
let mut epoch_entry_cache = EpochEntryCache::new("cluster1", "broker1", 1, vec![EpochEntry], 1);
let mut epoch_entry_cache = EpochEntryCache::new("cluster1", "broker1", 1, vec![EpochEntry::new(1, 0)], 1);
epoch_entry_cache.set_broker_name("broker2");
assert_eq!(epoch_entry_cache.get_broker_name(), &CheetahString::from("broker2"));
}

#[test]
fn set_cluster_name_updates_cluster_name() {
let mut epoch_entry_cache = EpochEntryCache::new("cluster1", "broker1", 1, vec![EpochEntry], 1);
let mut epoch_entry_cache = EpochEntryCache::new("cluster1", "broker1", 1, vec![EpochEntry::new(1, 0)], 1);
epoch_entry_cache.set_cluster_name("cluster2");
assert_eq!(epoch_entry_cache.get_cluster_name(), &CheetahString::from("cluster2"));
}

#[test]
fn epoch_entry_display() {
let entry = EpochEntry::with_end_offset(1, 100, 200);
assert_eq!(entry.to_string(), "EpochEntry{epoch=1, startOffset=100, endOffset=200}");
}

#[test]
fn epoch_entry_cache_getters() {
let entries = vec![EpochEntry::new(1, 0), EpochEntry::new(2, 100)];
let cache = EpochEntryCache::new("cluster1", "broker1", 1, entries, 500);
assert_eq!(cache.get_epoch_list().len(), 2);
assert_eq!(cache.get_max_offset(), 500);
}
}
@@ -1263,9 +1263,9 @@ impl MQAdminExt for DefaultMQAdminExt {

async fn get_broker_epoch_cache(
&self,
_broker_addr: CheetahString,
broker_addr: CheetahString,
) -> rocketmq_error::RocketMQResult<EpochEntryCache> {
unimplemented!("get_broker_epoch_cache not implemented yet")
self.default_mqadmin_ext_impl.get_broker_epoch_cache(broker_addr).await
}

async fn elect_master(
@@ -197,6 +197,11 @@ impl CommandExecute for ClassificationTablePrint {
command: "getBrokerConfig",
remark: "Get broker config by cluster or special broker.",
},
Command {
category: "Broker",
command: "getBrokerEpoch",
remark: "Fetch broker epoch entries.",
},
Command {
category: "Broker",
command: "getColdDataFlowCtrInfo",
@@ -16,6 +16,7 @@ mod clean_expired_cq_sub_command;
mod clean_unused_topic_sub_command;
mod delete_expired_commit_log_sub_command;
mod get_broker_config_sub_command;
mod get_broker_epoch_sub_command;
mod get_cold_data_flow_ctr_info_sub_command;
mod remove_cold_data_flow_ctr_group_config_sub_command;
mod reset_master_flush_offset_sub_command;
@@ -34,6 +35,7 @@ use crate::commands::broker_commands::clean_expired_cq_sub_command::CleanExpired
use crate::commands::broker_commands::clean_unused_topic_sub_command::CleanUnusedTopicSubCommand;
use crate::commands::broker_commands::delete_expired_commit_log_sub_command::DeleteExpiredCommitLogSubCommand;
use crate::commands::broker_commands::get_broker_config_sub_command::GetBrokerConfigSubCommand;
use crate::commands::broker_commands::get_broker_epoch_sub_command::GetBrokerEpochSubCommand;
use crate::commands::broker_commands::get_cold_data_flow_ctr_info_sub_command::GetColdDataFlowCtrInfoSubCommand;
use crate::commands::broker_commands::remove_cold_data_flow_ctr_group_config_sub_command::RemoveColdDataFlowCtrGroupConfigSubCommand;
use crate::commands::broker_commands::reset_master_flush_offset_sub_command::ResetMasterFlushOffsetSubCommand;
@@ -73,6 +75,13 @@ pub enum BrokerCommands {
)]
GetBrokerConfig(GetBrokerConfigSubCommand),

#[command(
name = "getBrokerEpoch",
about = "Fetch broker epoch entries.",
long_about = None,
)]
GetBrokerEpoch(GetBrokerEpochSubCommand),

#[command(
name = "getColdDataFlowCtrInfo",
about = "Get cold data flow ctr info.",
@@ -130,6 +139,7 @@ impl CommandExecute for BrokerCommands {
BrokerCommands::CleanUnusedTopic(value) => value.execute(rpc_hook).await,
BrokerCommands::DeleteExpiredCommitLog(value) => value.execute(rpc_hook).await,
BrokerCommands::GetBrokerConfig(cmd) => cmd.execute(rpc_hook).await,
BrokerCommands::GetBrokerEpoch(cmd) => cmd.execute(rpc_hook).await,
BrokerCommands::GetColdDataFlowCtrInfo(value) => value.execute(rpc_hook).await,
BrokerCommands::RemoveColdDataFlowCtrGroupConfig(value) => value.execute(rpc_hook).await,
BrokerCommands::ResetMasterFlushOffset(value) => value.execute(rpc_hook).await,