Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions e2e_test/background_ddl/sink.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
statement ok
set streaming_parallelism=4;

statement ok
SET RW_IMPLICIT_FLUSH TO true;

Expand All @@ -13,6 +16,9 @@ insert into t select * from generate_series(1, 10000);
statement ok
set backfill_rate_limit=1;

statement ok
set sink_rate_limit=1;

statement ok
set background_ddl=true;

Expand All @@ -35,6 +41,15 @@ show jobs;
statement ok
alter sink sink1 set SINK_RATE_LIMIT to 10000;

statement ok
alter sink sink1 set backfill_rate_limit to 10000;

query I
select rate_limit from rw_rate_limit
----
10000
10000
Comment on lines +48 to +51
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so one for SINK_RATE_LIMIT and one for backfill_rate_limit

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep


statement count 0 retry 5 backoff 2s
show jobs;

Expand Down
8 changes: 8 additions & 0 deletions proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,11 @@ enum DistanceType {
DISTANCE_TYPE_COSINE = 3;
DISTANCE_TYPE_INNER_PRODUCT = 4;
}

enum ThrottleType {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Type -> Object, as you are rate limit database objects

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This refers to the type of rate limit we are applying. Source, backfill, sink, dml these are all different types of rate limits.

THROTTLE_TYPE_UNSPECIFIED = 0;
THROTTLE_TYPE_DML = 1;
THROTTLE_TYPE_BACKFILL = 2;
THROTTLE_TYPE_SOURCE = 3;
THROTTLE_TYPE_SINK = 4;
}
15 changes: 7 additions & 8 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -346,21 +346,20 @@
repeated ObjectDependencies dependencies = 1;
}

enum ThrottleTarget {

Check failure on line 349 in proto/meta.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present enum value "7" on enum "ThrottleTarget" was deleted without reserving the number "7".

Check failure on line 349 in proto/meta.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present enum value "6" on enum "ThrottleTarget" was deleted without reserving the number "6".

Check failure on line 349 in proto/meta.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present enum value "7" on enum "ThrottleTarget" was deleted without reserving the name "FRAGMENT".

Check failure on line 349 in proto/meta.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present enum value "6" on enum "ThrottleTarget" was deleted without reserving the name "SINK".
THROTTLE_TARGET_UNSPECIFIED = 0;
SOURCE = 1;
MV = 2;
TABLE_WITH_SOURCE = 3;
CDC_TABLE = 4;
TABLE_DML = 5;
SINK = 6;
FRAGMENT = 7;
TABLE = 3;

Check failure on line 353 in proto/meta.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Enum value "3" on enum "ThrottleTarget" changed name from "TABLE_WITH_SOURCE" to "TABLE".
SINK = 4;

Check failure on line 354 in proto/meta.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Enum value "4" on enum "ThrottleTarget" changed name from "CDC_TABLE" to "SINK".
FRAGMENT = 5;

Check failure on line 355 in proto/meta.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Enum value "5" on enum "ThrottleTarget" changed name from "TABLE_DML" to "FRAGMENT".
}

message ApplyThrottleRequest {
ThrottleTarget kind = 1;
uint32 id = 2;
optional uint32 rate = 3;
ThrottleTarget throttle_target = 1;

Check failure on line 359 in proto/meta.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "1" on message "ApplyThrottleRequest" changed name from "kind" to "throttle_target".

Check failure on line 359 in proto/meta.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "1" with name "throttle_target" on message "ApplyThrottleRequest" changed option "json_name" from "kind" to "throttleTarget".
common.ThrottleType throttle_type = 2;

Check failure on line 360 in proto/meta.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "2" with name "throttle_type" on message "ApplyThrottleRequest" changed option "json_name" from "id" to "throttleType".
Comment on lines +359 to +360
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont think ThrottleTarget and ThrottleType are orthogonal in concept.
ThrottleType in your design stands for things you want to apply rate limit, but ThrottleTarget also contain similar concepts.

Maybe we can change to ThrottleObject and ThrottleDirection,
eg. ThrottleObject.sink + ThrottleDirection.out = sink_rate_limit and ThrottleObject.sink + ThrottleDirection.in = backfill_rate_limit

Copy link
Contributor Author

@kwannoel kwannoel Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we support streaming rate limit in the future, direction also becomes ambiguous. streaming and source share the same semantics. Further consider altering a cdc table rate limit. Lets say we specify the direction "in". Are we changing the dml rate limit? Or the source rate limit? Or the backfill rate limit? Decoupling the rate limit type from the target makes this unambiguous.

uint32 id = 3;
optional uint32 rate = 4;
}

message ApplyThrottleResponse {
Expand Down
4 changes: 3 additions & 1 deletion proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,10 @@ message ResumeMutation {}
message ThrottleMutation {
message RateLimit {
optional uint32 rate_limit = 1;
common.ThrottleType throttle_type = 2;
}

// Actor -> RateLimit
map<uint32, RateLimit> actor_throttle = 1;
}

Expand Down Expand Up @@ -172,7 +174,7 @@ message BarrierMutation {
PauseMutation pause = 7;
// Resume the dataflow of the whole streaming graph, only used for scaling.
ResumeMutation resume = 8;
// Throttle specific source exec or backfill exec.
// Alter the RateLimit of some specific executors
ThrottleMutation throttle = 10;
// Drop subscription on mv
DropSubscriptionsMutation drop_subscriptions = 12;
Expand Down
12 changes: 10 additions & 2 deletions src/ctl/src/cmd_impl/throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,27 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::common::PbThrottleType;
use risingwave_pb::meta::PbThrottleTarget;

use crate::ThrottleCommandArgs;
use crate::common::CtlContext;
use crate::{ThrottleCommandArgs, ThrottleTypeArg};

pub async fn apply_throttle(
context: &CtlContext,
kind: PbThrottleTarget,
params: ThrottleCommandArgs,
) -> anyhow::Result<()> {
let meta_client = context.meta_client().await?;
let throttle_type = match params.throttle_type {
ThrottleTypeArg::Dml => PbThrottleType::Dml,
ThrottleTypeArg::Backfill => PbThrottleType::Backfill,
ThrottleTypeArg::Source => PbThrottleType::Source,
ThrottleTypeArg::Sink => PbThrottleType::Sink,
};

meta_client
.apply_throttle(kind, params.id, params.rate)
.apply_throttle(kind, throttle_type, params.id, params.rate)
.await?;
Ok(())
}
19 changes: 19 additions & 0 deletions src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,12 +506,28 @@ enum TestCommands {
enum ThrottleCommands {
Source(ThrottleCommandArgs),
Mv(ThrottleCommandArgs),
Sink(ThrottleCommandArgs),
}

#[derive(Clone, Debug, clap::ValueEnum)]
pub enum ThrottleTypeArg {
Dml,
Backfill,
Source,
Sink,
}

#[derive(Clone, Debug, Args)]
pub struct ThrottleCommandArgs {
/// The ID of the object to throttle
#[clap(long, required = true)]
id: u32,
/// The rate limit to apply
#[clap(long)]
rate: Option<u32>,
/// The type of throttle to apply
#[clap(long, value_enum, required = true)]
throttle_type: ThrottleTypeArg,
Comment on lines +528 to +530
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also if using id, there is no need for ThrottleTypeArg. Id is unique in the cluster, using id can find the object and determine the throttle_type.

Copy link
Contributor Author

@kwannoel kwannoel Dec 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Id is not specified at the executor level, it's done at the fragment level in the current implementation. In such a scenario, consider when sink executor and scan executor are collocated in the same fragment. When altering either rate limit (sink or backfill), both rate limits will be changed, since the provided id is fragment id. You can see that the slt test was changed, because this was incorrect behaviour that existed due to the current design.

}

#[derive(Subcommand, Clone, Debug)]
Expand Down Expand Up @@ -921,6 +937,9 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
Commands::Throttle(ThrottleCommands::Mv(args)) => {
apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Mv, args).await?;
}
Commands::Throttle(ThrottleCommands::Sink(args)) => {
apply_throttle(context, risingwave_pb::meta::PbThrottleTarget::Sink, args).await?;
}
Commands::Meta(MetaCommands::SetCdcTableBackfillParallelism {
table_id,
parallelism,
Expand Down
81 changes: 56 additions & 25 deletions src/frontend/src/handler/alter_streaming_rate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::bail;
use risingwave_pb::common::ThrottleType as PbThrottleType;
use risingwave_pb::meta::ThrottleTarget as PbThrottleTarget;
use risingwave_sqlparser::ast::ObjectName;

Expand All @@ -27,7 +28,8 @@ use crate::session::SessionImpl;

pub async fn handle_alter_streaming_rate_limit(
handler_args: HandlerArgs,
kind: PbThrottleTarget,
throttle_target: PbThrottleTarget,
throttle_type: PbThrottleType,
table_name: ObjectName,
rate_limit: i32,
) -> Result<RwPgResponse> {
Expand All @@ -40,8 +42,8 @@ pub async fn handle_alter_streaming_rate_limit(

let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

let (stmt_type, id) = match kind {
PbThrottleTarget::Mv => {
let (stmt_type, id) = match (throttle_target, throttle_type) {
(PbThrottleTarget::Mv, PbThrottleType::Backfill) => {
let reader = session.env().catalog_reader().read_guard();
let (table, schema_name) =
reader.get_any_table_by_name(db_name, schema_path, &real_table_name)?;
Expand All @@ -54,50 +56,58 @@ pub async fn handle_alter_streaming_rate_limit(
session.check_privilege_for_drop_alter(schema_name, &**table)?;
(StatementType::ALTER_MATERIALIZED_VIEW, table.id.as_raw_id())
}
PbThrottleTarget::Source => {
(PbThrottleTarget::Source, PbThrottleType::Source) => {
let reader = session.env().catalog_reader().read_guard();
let (source, schema_name) =
reader.get_source_by_name(db_name, schema_path, &real_table_name)?;
session.check_privilege_for_drop_alter(schema_name, &**source)?;
(StatementType::ALTER_SOURCE, source.id.as_raw_id())
}
PbThrottleTarget::TableWithSource => {
(PbThrottleTarget::Table, PbThrottleType::Dml) => {
let reader = session.env().catalog_reader().read_guard();
let (table, schema_name) =
reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
session.check_privilege_for_drop_alter(schema_name, &**table)?;
// Get the corresponding source catalog.
let source_id = if let Some(id) = table.associated_source_id {
id.as_raw_id()
} else {
bail!("ALTER SOURCE_RATE_LIMIT is not for table without source")
};
(StatementType::ALTER_SOURCE, source_id)
}
PbThrottleTarget::CdcTable => {
let reader = session.env().catalog_reader().read_guard();
let (table, schema_name) =
reader.get_any_table_by_name(db_name, schema_path, &real_table_name)?;
if table.table_type != TableType::Table {
return Err(ErrorCode::InvalidInputSyntax(format!("\"{table_name}\" ",)).into());
return Err(ErrorCode::InvalidInputSyntax(format!(
"\"{table_name}\" is not a table",
))
.into());
}
session.check_privilege_for_drop_alter(schema_name, &**table)?;
(StatementType::ALTER_TABLE, table.id.as_raw_id())
}
PbThrottleTarget::TableDml => {
(PbThrottleTarget::Table, PbThrottleType::Source) => {
let reader = session.env().catalog_reader().read_guard();
let (table, schema_name) =
reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
session.check_privilege_for_drop_alter(schema_name, &**table)?;
if table.table_type != TableType::Table {
return Err(ErrorCode::InvalidInputSyntax(format!(
"\"{table_name}\" is not a table",
))
.into());
}
let source_id = if let Some(id) = table.associated_source_id {
id.as_raw_id()
} else {
bail!("ALTER SOURCE_RATE_LIMIT is not for table without source")
};
(StatementType::ALTER_TABLE, source_id)
}
(PbThrottleTarget::Table, PbThrottleType::Backfill) => {
let reader = session.env().catalog_reader().read_guard();
let (table, schema_name) =
reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
session.check_privilege_for_drop_alter(schema_name, &**table)?;
if table.table_type != TableType::Table || table.cdc_table_type.is_none() {
return Err(ErrorCode::InvalidInputSyntax(format!(
"\"{table_name}\" is not a CDC table",
))
.into());
}
(StatementType::ALTER_TABLE, table.id.as_raw_id())
}
PbThrottleTarget::Sink => {
(PbThrottleTarget::Sink, PbThrottleType::Sink) => {
let reader = session.env().catalog_reader().read_guard();
let (sink, schema_name) =
reader.get_any_sink_by_name(db_name, schema_path, &real_table_name)?;
Expand All @@ -107,10 +117,28 @@ pub async fn handle_alter_streaming_rate_limit(
session.check_privilege_for_drop_alter(schema_name, &**sink)?;
(StatementType::ALTER_SINK, sink.id.as_raw_id())
}
_ => bail!("Unsupported throttle target: {:?}", kind),
(PbThrottleTarget::Sink, PbThrottleType::Backfill) => {
let reader = session.env().catalog_reader().read_guard();
let (sink, schema_name) =
reader.get_any_sink_by_name(db_name, schema_path, &real_table_name)?;
session.check_privilege_for_drop_alter(schema_name, &**sink)?;
(StatementType::ALTER_SINK, sink.id.as_raw_id())
}
_ => bail!(
"Unsupported throttle target: {:?} and throttle type: {:?}",
throttle_target,
throttle_type
),
};
execute_with_long_running_notification(
handle_alter_streaming_rate_limit_by_id(&session, kind, id, rate_limit, stmt_type),
handle_alter_streaming_rate_limit_by_id(
&session,
throttle_target,
throttle_type,
id,
rate_limit,
stmt_type,
),
&session,
"ALTER STREAMING RATE LIMIT",
LongRunningNotificationAction::SuggestRecover,
Expand All @@ -120,7 +148,8 @@ pub async fn handle_alter_streaming_rate_limit(

pub async fn handle_alter_streaming_rate_limit_by_id(
session: &SessionImpl,
kind: PbThrottleTarget,
throttle_target: PbThrottleTarget,
throttle_type: PbThrottleType,
id: u32,
rate_limit: i32,
stmt_type: StatementType,
Expand All @@ -133,7 +162,9 @@ pub async fn handle_alter_streaming_rate_limit_by_id(
Some(rate_limit as u32)
};

meta_client.apply_throttle(kind, id, rate_limit).await?;
meta_client
.apply_throttle(throttle_target, throttle_type, id, rate_limit)
.await?;

Ok(PgResponse::empty_result(stmt_type))
}
23 changes: 20 additions & 3 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -850,7 +850,8 @@ pub async fn handle(
AlterTableOperation::SetSourceRateLimit { rate_limit } => {
alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
handler_args,
PbThrottleTarget::TableWithSource,
PbThrottleTarget::Table,
risingwave_pb::common::PbThrottleType::Source,
name,
rate_limit,
)
Expand All @@ -863,7 +864,8 @@ pub async fn handle(
AlterTableOperation::SetDmlRateLimit { rate_limit } => {
alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
handler_args,
PbThrottleTarget::TableDml,
PbThrottleTarget::Table,
risingwave_pb::common::PbThrottleType::Dml,
name,
rate_limit,
)
Expand All @@ -890,7 +892,8 @@ pub async fn handle(
AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
handler_args,
PbThrottleTarget::CdcTable,
PbThrottleTarget::Table,
risingwave_pb::common::PbThrottleType::Backfill,
name,
rate_limit,
)
Expand Down Expand Up @@ -1037,6 +1040,7 @@ pub async fn handle(
alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
handler_args,
PbThrottleTarget::Mv,
risingwave_pb::common::PbThrottleType::Backfill,
name,
rate_limit,
)
Expand Down Expand Up @@ -1166,6 +1170,17 @@ pub async fn handle(
alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
handler_args,
PbThrottleTarget::Sink,
risingwave_pb::common::PbThrottleType::Sink,
name,
rate_limit,
)
.await
}
AlterSinkOperation::SetBackfillRateLimit { rate_limit } => {
alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
handler_args,
PbThrottleTarget::Sink,
risingwave_pb::common::PbThrottleType::Backfill,
name,
rate_limit,
)
Expand Down Expand Up @@ -1261,6 +1276,7 @@ pub async fn handle(
alter_streaming_rate_limit::handle_alter_streaming_rate_limit(
handler_args,
PbThrottleTarget::Source,
risingwave_pb::common::PbThrottleType::Source,
name,
rate_limit,
)
Expand Down Expand Up @@ -1375,6 +1391,7 @@ pub async fn handle(
alter_streaming_rate_limit::handle_alter_streaming_rate_limit_by_id(
&handler_args.session,
PbThrottleTarget::Fragment,
risingwave_pb::common::PbThrottleType::Backfill,
*fragment_id,
rate_limit,
StatementType::SET_VARIABLE,
Expand Down
Loading
Loading