From 78221e7b0fa2c7b74da3968ab47019d76b4d3432 Mon Sep 17 00:00:00 2001 From: bbernays Date: Tue, 9 Dec 2025 11:34:12 -0600 Subject: [PATCH 1/2] Update table.go --- schema/table.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/schema/table.go b/schema/table.go index 875d973714..ebcffcb414 100644 --- a/schema/table.go +++ b/schema/table.go @@ -118,6 +118,14 @@ type Table struct { // IgnorePKComponentsMismatchValidation is a flag that indicates if the table should skip validating usage of both primary key components and primary keys IgnorePKComponentsMismatchValidation bool `json:"ignore_pk_components_mismatch_validation"` + + // ConcurrencySettings Enables configuring concurrency settings for specific table. This provides a mechanism for irregular APIs to have unique settings either because it shares a common rate limit pool or has different limits than the default settings. + ConcurrencySettings *ConcurrencySettings `json:"concurrency_settings,omitempty"` +} +type ConcurrencySettings struct { + ConcurrencyKey *string `json:"concurrency_key,omitempty"` + MaxResourceConcurrency *int `json:"max_resource_concurrency,omitempty"` + MaxTableConcurrency *int `json:"max_table_concurrency,omitempty"` } var ( From 972e100fbe0dee6d326d5c93bb14e8538d40b7f3 Mon Sep 17 00:00:00 2001 From: bbernays Date: Tue, 9 Dec 2025 11:34:15 -0600 Subject: [PATCH 2/2] Update scheduler_dfs.go --- scheduler/scheduler_dfs.go | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/scheduler/scheduler_dfs.go b/scheduler/scheduler_dfs.go index 515cd9bc77..fec4c6e149 100644 --- a/scheduler/scheduler_dfs.go +++ b/scheduler/scheduler_dfs.go @@ -163,8 +163,17 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl chunks = lo.Chunk(resourcesSlice, table.PreResourceChunkResolver.ChunkSize) } for i := range chunks { - resourceConcurrencyKey := table.Name + "-" + client.ID() + "-" + "resource" - resourceSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(resourceConcurrencyKey, semaphore.NewWeighted(s.scheduler.singleResourceMaxConcurrency)) + resourceConcurrencyKey := table.Name + if table.ConcurrencySettings != nil && lo.FromPtr(table.ConcurrencySettings.ConcurrencyKey) != "" { + resourceConcurrencyKey = lo.FromPtr(table.ConcurrencySettings.ConcurrencyKey) + } + resourceConcurrencyKey = resourceConcurrencyKey + "-" + client.ID() + "-" + "resource" + resourceConcurrency := s.scheduler.singleResourceMaxConcurrency + if table.ConcurrencySettings != nil && lo.FromPtr(table.ConcurrencySettings.MaxResourceConcurrency) > 0 { + resourceConcurrency = int64(lo.FromPtr(table.ConcurrencySettings.MaxResourceConcurrency)) + } + + resourceSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(resourceConcurrencyKey, semaphore.NewWeighted(resourceConcurrency)) resourceSem := resourceSemVal.(*semaphore.Weighted) if err := resourceSem.Acquire(ctx, 1); err != nil { s.logger.Warn().Err(err).Msg("failed to acquire semaphore. context cancelled") @@ -227,9 +236,18 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl resolvedResources <- resource for _, relation := range resource.Table.Relations { relation := relation - tableConcurrencyKey := table.Name + "-" + client.ID() + tableConcurrencyKey := table.Name + if table.ConcurrencySettings != nil && lo.FromPtr(table.ConcurrencySettings.ConcurrencyKey) != "" { + tableConcurrencyKey = lo.FromPtr(table.ConcurrencySettings.ConcurrencyKey) + } + tableConcurrencyKey = tableConcurrencyKey + "-" + client.ID() + tableConcurrency := s.scheduler.singleNestedTableMaxConcurrency + if table.ConcurrencySettings != nil && lo.FromPtr(table.ConcurrencySettings.MaxTableConcurrency) > 0 { + tableConcurrency = int64(lo.FromPtr(table.ConcurrencySettings.MaxTableConcurrency)) + } + // Acquire the semaphore for the table - tableSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(tableConcurrencyKey, semaphore.NewWeighted(s.scheduler.singleNestedTableMaxConcurrency)) + tableSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(tableConcurrencyKey, semaphore.NewWeighted(tableConcurrency)) tableSem := tableSemVal.(*semaphore.Weighted) if err := tableSem.Acquire(ctx, 1); err != nil { // This means context was cancelled