From 1e5065efee7e2b2dc0e0be2a2e85fb062f6d681c Mon Sep 17 00:00:00 2001 From: WaterWhisperer Date: Wed, 18 Feb 2026 19:56:12 +0800 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#6278]=F0=9F=9A=80Implement=20BrokerCo?= =?UTF-8?q?nsumeStats=20command=20in=20rocketmq-admin-core?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 11 +- Cargo.toml | 109 ++++++------ rocketmq-broker/Cargo.toml | 2 +- .../rocketmq-dashboard-common/Cargo.toml | 10 -- rocketmq-example/Cargo.toml | 30 ++-- rocketmq-remoting/Cargo.toml | 8 +- .../rocketmq-admin-cli/Cargo.toml | 25 ++- .../rocketmq-admin-core/Cargo.toml | 1 + .../rocketmq-admin-core/src/commands.rs | 5 + .../src/commands/broker_commands.rs | 12 +- .../broker_consume_stats_sub_command.rs | 159 ++++++++++++++++++ 11 files changed, 265 insertions(+), 107 deletions(-) create mode 100644 rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/broker_commands/broker_consume_stats_sub_command.rs diff --git a/Cargo.lock b/Cargo.lock index 18df4b35d..5377bbe83 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3424,16 +3424,13 @@ dependencies = [ [[package]] name = "rocketmq-admin-cli" version = "0.8.0" -dependencies = [ - "rocketmq-admin-core", - "tokio", -] [[package]] name = "rocketmq-admin-core" version = "0.8.0" dependencies = [ "cheetah-string", + "chrono", "clap", "clap_complete", "colored", @@ -3651,13 +3648,7 @@ version = "0.8.0" dependencies = [ "anyhow", "async-trait", - "rocketmq-client-rust", - "rocketmq-common", - "rocketmq-remoting", "serde", - "serde_json", - "tokio", - "tracing", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 69f9e5cb0..46cc71859 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,25 +15,25 @@ [workspace] members = [ - "rocketmq", - "rocketmq-auth", - "rocketmq-broker", - "rocketmq-client", - "rocketmq-common", - "rocketmq-controller", - "rocketmq-error", - "rocketmq-filter", - "rocketmq-macros", - "rocketmq-namesrv", - "rocketmq-proxy", - "rocketmq-remoting", - "rocketmq-runtime", - "rocketmq-store", - "rocketmq-tools/rocketmq-admin/rocketmq-admin-cli", - "rocketmq-tools/rocketmq-admin/rocketmq-admin-core", - "rocketmq-tools/rocketmq-admin/rocketmq-admin-tui", - "rocketmq-tools/rocketmq-store-inspect", - "rocketmq-dashboard/rocketmq-dashboard-common" + "rocketmq", + "rocketmq-auth", + "rocketmq-broker", + "rocketmq-client", + "rocketmq-common", + "rocketmq-controller", + "rocketmq-error", + "rocketmq-filter", + "rocketmq-macros", + "rocketmq-namesrv", + "rocketmq-proxy", + "rocketmq-remoting", + "rocketmq-runtime", + "rocketmq-store", + "rocketmq-tools/rocketmq-admin/rocketmq-admin-cli", + "rocketmq-tools/rocketmq-admin/rocketmq-admin-core", + "rocketmq-tools/rocketmq-admin/rocketmq-admin-tui", + "rocketmq-tools/rocketmq-store-inspect", + "rocketmq-dashboard/rocketmq-dashboard-common", ] resolver = "2" @@ -45,11 +45,11 @@ homepage = "https://github.com/mxsm/rocketmq-rust" repository = "https://github.com/mxsm/rocketmq-rust" license = "MIT OR Apache-2.0" keywords = [ - "apache-rocketmq", - "rocketmq-rust", - "rocketmq-rs", - "rust", - "rocketmq-client", + "apache-rocketmq", + "rocketmq-rust", + "rocketmq-rs", + "rust", + "rocketmq-client", ] categories = ["asynchronous", "network-programming", "development-tools"] readme = "README.md" @@ -58,53 +58,52 @@ Unofficial Rust implementation of Apache RocketMQ """ rust-version = "1.85.0" [workspace.dependencies] -rocketmq-common = { path = "./rocketmq-common" } -rocketmq-runtime = { path = "./rocketmq-runtime" } -rocketmq-macros = { path = "./rocketmq-macros" } -rocketmq-rust = { path = "./rocketmq" } -rocketmq-filter = { path = "./rocketmq-filter" } -rocketmq-store = { path = "./rocketmq-store", default-features = true } -rocketmq-remoting = { path = "./rocketmq-remoting" } +rocketmq-common = { path = "./rocketmq-common" } +rocketmq-runtime = { path = "./rocketmq-runtime" } +rocketmq-macros = { path = "./rocketmq-macros" } +rocketmq-rust = { path = "./rocketmq" } +rocketmq-filter = { path = "./rocketmq-filter" } +rocketmq-store = { path = "./rocketmq-store", default-features = true } +rocketmq-remoting = { path = "./rocketmq-remoting" } rocketmq-client-rust = { path = "./rocketmq-client" } -rocketmq-tools = { path = "./rocketmq-tools/rocketmq-admin/rocketmq-admin-core", package = "rocketmq-admin-core" } -rocketmq-error = { path = "./rocketmq-error" } -rocketmq-auth = { path = "./rocketmq-auth" } +rocketmq-error = { path = "./rocketmq-error" } +rocketmq-auth = { path = "./rocketmq-auth" } -tokio = { version = "1.49", features = ["full"] } -tokio-util = { version = "0.7.18", features = ["full"] } +tokio = { version = "1.49", features = ["full"] } +tokio-util = { version = "0.7.18", features = ["full"] } tokio-stream = { version = "0.1.18", features = ["full"] } -tracing = "0.1.44" +tracing = "0.1.44" tracing-subscriber = { version = "0.3.22", features = ["time"] } -tracing-appender = "0.2.4" +tracing-appender = "0.2.4" thiserror = "2.0.18" #serde -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" -serde_yaml = "0.9" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +serde_yaml = "0.9" serde_json_any_key = "2.0.0" -anyhow = "1.0.101" -bytes = "1.11.1" -rand = "0.10.0" -num_cpus = "1.17" +anyhow = "1.0.101" +bytes = "1.11.1" +rand = "0.10.0" +num_cpus = "1.17" config = "0.15.19" -parking_lot = "0.12" -dirs = "6.0" +parking_lot = "0.12" +dirs = "6.0" trait-variant = "0.1.2" mockall = "0.14.0" -cfg-if = "1.0.4" +cfg-if = "1.0.4" sysinfo = "0.38.2" uuid = { version = "1.21.0", features = [ - "v4", # Lets you generate random UUIDs - "fast-rng", # Use a faster (but still sufficiently random) RNG - "macro-diagnostics", + "v4", # Lets you generate random UUIDs + "fast-rng", # Use a faster (but still sufficiently random) RNG + "macro-diagnostics", ] } @@ -112,10 +111,10 @@ futures = "0.3" cheetah-string = { version = "1.0.1", features = ["serde", "bytes", "simd"] } -flate2 = "1.1.9" -dashmap = "6.1.0" -strum = { version = "0.27.2", features = ["derive"] } +flate2 = "1.1.9" +dashmap = "6.1.0" +strum = { version = "0.27.2", features = ["derive"] } smallvec = "1.13" -opentelemetry = { version = "0.31", features = ["metrics"] } +opentelemetry = { version = "0.31", features = ["metrics"] } opentelemetry_sdk = { version = "0.31", features = ["metrics", "rt-tokio"] } diff --git a/rocketmq-broker/Cargo.toml b/rocketmq-broker/Cargo.toml index 762817b3b..0cba125bc 100644 --- a/rocketmq-broker/Cargo.toml +++ b/rocketmq-broker/Cargo.toml @@ -41,7 +41,7 @@ rocketmq-filter = { workspace = true } rocketmq-runtime = { workspace = true } rocketmq-client-rust = { workspace = true } rocketmq-error = { workspace = true } -rocketmq-auth = { workspace = true } +rocketmq-auth = { workspace = true } anyhow.workspace = true diff --git a/rocketmq-dashboard/rocketmq-dashboard-common/Cargo.toml b/rocketmq-dashboard/rocketmq-dashboard-common/Cargo.toml index 38453a882..cdaa44d34 100644 --- a/rocketmq-dashboard/rocketmq-dashboard-common/Cargo.toml +++ b/rocketmq-dashboard/rocketmq-dashboard-common/Cargo.toml @@ -28,22 +28,12 @@ description = "Common shared code for RocketMQ Dashboard implementati rust-version.workspace = true [dependencies] -rocketmq-common = { workspace = true } -rocketmq-client-rust = { workspace = true } -rocketmq-remoting = { workspace = true } - -# Async runtime -tokio = { workspace = true } # Async trait support async-trait = "0.1" # Serialization serde = { workspace = true } -serde_json = { workspace = true } - -# Logging -tracing = { workspace = true } # Error handling anyhow = { workspace = true } diff --git a/rocketmq-example/Cargo.toml b/rocketmq-example/Cargo.toml index 8c42a23e4..b5a85961c 100644 --- a/rocketmq-example/Cargo.toml +++ b/rocketmq-example/Cargo.toml @@ -17,17 +17,23 @@ # This is a standalone project, not part of the parent workspace [package] -name = "rocketmq-example" -version = "0.8.0" -authors = ["mxsm "] -edition = "2024" -homepage = "https://github.com/mxsm/rocketmq-rust" -repository = "https://github.com/mxsm/rocketmq-rust" -license = "MIT OR Apache-2.0" -keywords = ["apache-rocketmq", "rocketmq-rust", "rocketmq-rs", "rust", "rocketmq-client"] -readme = "README.md" -description = "Examples for RocketMQ Rust implementation" -rust-version = "1.85.0" +name = "rocketmq-example" +version = "0.8.0" +authors = ["mxsm "] +edition = "2024" +homepage = "https://github.com/mxsm/rocketmq-rust" +repository = "https://github.com/mxsm/rocketmq-rust" +license = "MIT OR Apache-2.0" +keywords = [ + "apache-rocketmq", + "rocketmq-rust", + "rocketmq-rs", + "rust", + "rocketmq-client", +] +readme = "README.md" +description = "Examples for RocketMQ Rust implementation" +rust-version = "1.85.0" resolver = "3" [dependencies] @@ -68,4 +74,4 @@ path = "examples/producer/send_to_queue.rs" [[example]] name = "producer-send-with-selector" -path = "examples/producer/send_with_selector.rs" \ No newline at end of file +path = "examples/producer/send_with_selector.rs" diff --git a/rocketmq-remoting/Cargo.toml b/rocketmq-remoting/Cargo.toml index c42a386b4..0bc38e787 100644 --- a/rocketmq-remoting/Cargo.toml +++ b/rocketmq-remoting/Cargo.toml @@ -29,10 +29,10 @@ rust-version.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -rocketmq-common = { workspace = true } -rocketmq-macros = { workspace = true } -rocketmq-rust = { workspace = true } -rocketmq-error = { workspace = true } +rocketmq-common = { workspace = true } +rocketmq-macros = { workspace = true } +rocketmq-rust = { workspace = true } +rocketmq-error = { workspace = true } anyhow.workspace = true bytes.workspace = true diff --git a/rocketmq-tools/rocketmq-admin/rocketmq-admin-cli/Cargo.toml b/rocketmq-tools/rocketmq-admin/rocketmq-admin-cli/Cargo.toml index 27cf0347a..d051a354b 100644 --- a/rocketmq-tools/rocketmq-admin/rocketmq-admin-cli/Cargo.toml +++ b/rocketmq-tools/rocketmq-admin/rocketmq-admin-cli/Cargo.toml @@ -14,23 +14,20 @@ [package] -name = "rocketmq-admin-cli" -version.workspace = true -authors.workspace = true -edition = "2024" -homepage.workspace = true -repository.workspace = true -license.workspace = true -keywords = ["rocketmq", "cli", "admin", "messaging", "rocketmq-rust"] -categories = ["command-line-utilities", "development-tools"] -readme = "README.md" -description = "Command-line interface for managing RocketMQ-Rust and Apache RocketMQ clusters" +name = "rocketmq-admin-cli" +version.workspace = true +authors.workspace = true +edition = "2024" +homepage.workspace = true +repository.workspace = true +license.workspace = true +keywords = ["rocketmq", "cli", "admin", "messaging", "rocketmq-rust"] +categories = ["command-line-utilities", "development-tools"] +readme = "README.md" +description = "Command-line interface for managing RocketMQ-Rust and Apache RocketMQ clusters" rust-version.workspace = true [dependencies] -rocketmq-tools = { workspace = true } - -tokio = { workspace = true, features = ["full"] } [[bin]] name = "rocketmq-admin-cli" diff --git a/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/Cargo.toml b/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/Cargo.toml index 4dea11755..2cd6414b6 100644 --- a/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/Cargo.toml +++ b/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/Cargo.toml @@ -47,6 +47,7 @@ tokio = { workspace = true } tracing = { workspace = true } cheetah-string = { workspace = true } +chrono = "0.4" clap = { version = "4.5.59", features = ["derive"] } clap_complete = "4.5" tabled = { version = "0.20.0", features = ["derive"] } diff --git a/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands.rs b/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands.rs index f41676c8a..43bd48c90 100644 --- a/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands.rs +++ b/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands.rs @@ -177,6 +177,11 @@ impl CommandExecute for ClassificationTablePrint { command: "updateAcl", remark: "Update ACL.", }, + Command { + category: "Broker", + command: "brokerConsumeStats", + remark: "Fetch broker consume stats data.", + }, Command { category: "Broker", command: "brokerStatus", diff --git a/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/broker_commands.rs b/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/broker_commands.rs index 3a25c3277..1429f110d 100644 --- a/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/broker_commands.rs +++ b/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/broker_commands.rs @@ -1,4 +1,4 @@ -// Copyright 2026 The RocketMQ Rust Authors +// Copyright 2023 The RocketMQ Rust Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod broker_consume_stats_sub_command; mod broker_status_sub_command; mod clean_expired_cq_sub_command; mod clean_unused_topic_sub_command; @@ -32,6 +33,7 @@ use clap::Subcommand; use rocketmq_error::RocketMQResult; use rocketmq_remoting::runtime::RPCHook; +use crate::commands::broker_commands::broker_consume_stats_sub_command::BrokerConsumeStatsSubCommand; use crate::commands::broker_commands::broker_status_sub_command::BrokerStatusSubCommand; use crate::commands::broker_commands::clean_expired_cq_sub_command::CleanExpiredCQSubCommand; use crate::commands::broker_commands::clean_unused_topic_sub_command::CleanUnusedTopicSubCommand; @@ -49,6 +51,13 @@ use crate::commands::CommandExecute; #[derive(Subcommand)] pub enum BrokerCommands { + #[command( + name = "brokerConsumeStats", + about = "Fetch broker consume stats data.", + long_about = None, + )] + BrokerConsumeStats(BrokerConsumeStatsSubCommand), + #[command( name = "brokerStatus", about = "Fetch broker runtime status data.", @@ -144,6 +153,7 @@ pub enum BrokerCommands { impl CommandExecute for BrokerCommands { async fn execute(&self, rpc_hook: Option>) -> RocketMQResult<()> { match self { + BrokerCommands::BrokerConsumeStats(value) => value.execute(rpc_hook).await, BrokerCommands::BrokerStatus(cmd) => cmd.execute(rpc_hook).await, BrokerCommands::CleanExpiredCQ(value) => value.execute(rpc_hook).await, BrokerCommands::CleanUnusedTopic(value) => value.execute(rpc_hook).await, diff --git a/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/broker_commands/broker_consume_stats_sub_command.rs b/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/broker_commands/broker_consume_stats_sub_command.rs new file mode 100644 index 000000000..c33f100c6 --- /dev/null +++ b/rocketmq-tools/rocketmq-admin/rocketmq-admin-core/src/commands/broker_commands/broker_consume_stats_sub_command.rs @@ -0,0 +1,159 @@ +// Copyright 2023 The RocketMQ Rust Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use cheetah_string::CheetahString; +use clap::Parser; +use rocketmq_client_rust::admin::mq_admin_ext_async::MQAdminExt; +use rocketmq_common::TimeUtils::get_current_millis; +use rocketmq_error::RocketMQError; +use rocketmq_error::RocketMQResult; +use rocketmq_remoting::runtime::RPCHook; + +use crate::admin::default_mq_admin_ext::DefaultMQAdminExt; +use crate::commands::CommandExecute; + +#[derive(Debug, Clone, Parser)] +pub struct BrokerConsumeStatsSubCommand { + #[arg(short = 'b', long = "brokerAddr", required = true, help = "Broker address")] + broker_addr: String, + + #[arg( + short = 't', + long = "timeoutMillis", + required = false, + default_value = "50000", + help = "request timeout Millis" + )] + timeout_millis: u64, + + #[arg( + short = 'l', + long = "level", + required = false, + default_value = "0", + help = "threshold of print diff" + )] + diff_level: i64, + + #[arg( + short = 'o', + long = "order", + required = false, + default_value = "false", + help = "order topic" + )] + is_order: String, +} + +fn format_timestamp(timestamp: i64) -> String { + if timestamp <= 0 { + return "-".to_string(); + } + let secs = timestamp / 1000; + let nanos = ((timestamp % 1000) * 1_000_000) as u32; + match chrono::DateTime::from_timestamp(secs, nanos) { + Some(dt) => { + use chrono::Local; + let local_dt = dt.with_timezone(&Local); + local_dt.format("%Y-%m-%d %H:%M:%S").to_string() + } + None => "-".to_string(), + } +} + +impl CommandExecute for BrokerConsumeStatsSubCommand { + async fn execute(&self, rpc_hook: Option>) -> RocketMQResult<()> { + let mut default_mqadmin_ext = if let Some(rpc_hook) = rpc_hook { + DefaultMQAdminExt::with_rpc_hook(rpc_hook) + } else { + DefaultMQAdminExt::new() + }; + + default_mqadmin_ext + .client_config_mut() + .set_instance_name(get_current_millis().to_string().into()); + + let broker_addr = self.broker_addr.trim().to_string(); + let is_order = self.is_order.trim().parse::().unwrap_or(false); + let timeout_millis = self.timeout_millis; + let diff_level = self.diff_level; + + let operation_result = async { + MQAdminExt::start(&mut default_mqadmin_ext).await.map_err(|e| { + RocketMQError::Internal(format!( + "BrokerConsumeStatsSubCommand: Failed to start MQAdminExt: {}", + e + )) + })?; + + let consume_stats_list = default_mqadmin_ext + .fetch_consume_stats_in_broker(CheetahString::from(broker_addr.as_str()), is_order, timeout_millis) + .await + .map_err(|e| { + RocketMQError::Internal(format!( + "BrokerConsumeStatsSubCommand: Failed to fetch consume stats: {}", + e + )) + })?; + + println!( + "{:<64} {:<64} {:<32} {:<4} {:<20} {:<20} {:<20} #LastTime", + "#Topic", "#Group", "#Broker Name", "#QID", "#Broker Offset", "#Consumer Offset", "#Diff" + ); + + for map in &consume_stats_list.consume_stats_list { + for (group, consume_stats_array) in map { + for consume_stats in consume_stats_array { + let mut mq_list: Vec<_> = consume_stats.offset_table.keys().collect(); + mq_list.sort(); + + for mq in &mq_list { + let offset_wrapper = &consume_stats.offset_table[*mq]; + let diff = offset_wrapper.get_broker_offset() - offset_wrapper.get_consumer_offset(); + + if diff < diff_level { + continue; + } + + if offset_wrapper.get_last_timestamp() > 0 { + let last_time = format_timestamp(offset_wrapper.get_last_timestamp()); + println!( + "{:<64} {:<64} {:<32} {:<4} {:<20} {:<20} {:<20} {}", + mq.get_topic(), + group, + mq.get_broker_name(), + mq.get_queue_id(), + offset_wrapper.get_broker_offset(), + offset_wrapper.get_consumer_offset(), + diff, + last_time + ); + } + } + } + } + } + + println!("\nDiff Total: {}", consume_stats_list.total_diff); + + Ok(()) + } + .await; + + MQAdminExt::shutdown(&mut default_mqadmin_ext).await; + operation_result + } +}