Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions scheduler/scheduler_dfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,17 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl
chunks = lo.Chunk(resourcesSlice, table.PreResourceChunkResolver.ChunkSize)
}
for i := range chunks {
resourceConcurrencyKey := table.Name + "-" + client.ID() + "-" + "resource"
resourceSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(resourceConcurrencyKey, semaphore.NewWeighted(s.scheduler.singleResourceMaxConcurrency))
resourceConcurrencyKey := table.Name
if table.ConcurrencySettings != nil && lo.FromPtr(table.ConcurrencySettings.ConcurrencyKey) != "" {
resourceConcurrencyKey = lo.FromPtr(table.ConcurrencySettings.ConcurrencyKey)
}
resourceConcurrencyKey = resourceConcurrencyKey + "-" + client.ID() + "-" + "resource"
resourceConcurrency := s.scheduler.singleResourceMaxConcurrency
if table.ConcurrencySettings != nil && lo.FromPtr(table.ConcurrencySettings.MaxResourceConcurrency) > 0 {
resourceConcurrency = int64(lo.FromPtr(table.ConcurrencySettings.MaxResourceConcurrency))
}

resourceSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(resourceConcurrencyKey, semaphore.NewWeighted(resourceConcurrency))
resourceSem := resourceSemVal.(*semaphore.Weighted)
if err := resourceSem.Acquire(ctx, 1); err != nil {
s.logger.Warn().Err(err).Msg("failed to acquire semaphore. context cancelled")
Expand Down Expand Up @@ -227,9 +236,18 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl
resolvedResources <- resource
for _, relation := range resource.Table.Relations {
relation := relation
tableConcurrencyKey := table.Name + "-" + client.ID()
tableConcurrencyKey := table.Name
if table.ConcurrencySettings != nil && lo.FromPtr(table.ConcurrencySettings.ConcurrencyKey) != "" {
tableConcurrencyKey = lo.FromPtr(table.ConcurrencySettings.ConcurrencyKey)
}
tableConcurrencyKey = tableConcurrencyKey + "-" + client.ID()
tableConcurrency := s.scheduler.singleNestedTableMaxConcurrency
if table.ConcurrencySettings != nil && lo.FromPtr(table.ConcurrencySettings.MaxTableConcurrency) > 0 {
tableConcurrency = int64(lo.FromPtr(table.ConcurrencySettings.MaxTableConcurrency))
}

// Acquire the semaphore for the table
tableSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(tableConcurrencyKey, semaphore.NewWeighted(s.scheduler.singleNestedTableMaxConcurrency))
tableSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(tableConcurrencyKey, semaphore.NewWeighted(tableConcurrency))
tableSem := tableSemVal.(*semaphore.Weighted)
if err := tableSem.Acquire(ctx, 1); err != nil {
// This means context was cancelled
Expand Down
8 changes: 8 additions & 0 deletions schema/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,14 @@ type Table struct {

// IgnorePKComponentsMismatchValidation is a flag that indicates if the table should skip validating usage of both primary key components and primary keys
IgnorePKComponentsMismatchValidation bool `json:"ignore_pk_components_mismatch_validation"`

// ConcurrencySettings Enables configuring concurrency settings for specific table. This provides a mechanism for irregular APIs to have unique settings either because it shares a common rate limit pool or has different limits than the default settings.
ConcurrencySettings *ConcurrencySettings `json:"concurrency_settings,omitempty"`
}
type ConcurrencySettings struct {
ConcurrencyKey *string `json:"concurrency_key,omitempty"`
MaxResourceConcurrency *int `json:"max_resource_concurrency,omitempty"`
MaxTableConcurrency *int `json:"max_table_concurrency,omitempty"`
}

var (
Expand Down