From 6a3349074a6ca112d3e35fac7344acbc87eb5bf2 Mon Sep 17 00:00:00 2001 From: Hung Nguyen Date: Mon, 10 Mar 2025 14:20:51 +0700 Subject: [PATCH] fix(lcm): Decrease default thread count using for lcm bricks --- lib/gooddata/lcm/actions/import_object_collections.rb | 8 ++++++-- lib/gooddata/lcm/actions/synchronize_dataset_mappings.rb | 8 ++++++-- lib/gooddata/lcm/actions/synchronize_user_filters.rb | 6 +++++- lib/gooddata/lcm/lcm2.rb | 4 ++++ lib/gooddata/models/user_filters/user_filter_builder.rb | 2 +- 5 files changed, 22 insertions(+), 6 deletions(-) diff --git a/lib/gooddata/lcm/actions/import_object_collections.rb b/lib/gooddata/lcm/actions/import_object_collections.rb index 5b4b9e178..c99d883b1 100644 --- a/lib/gooddata/lcm/actions/import_object_collections.rb +++ b/lib/gooddata/lcm/actions/import_object_collections.rb @@ -20,6 +20,9 @@ class ImportObjectCollections < BaseAction description 'Synchronization Info' param :synchronize, array_of(instance_of(Type::SynchronizationInfoType)), required: true, generated: true + + description 'Number Of Threads' + param :number_of_threads, instance_of(Type::StringType), required: false, default: '10' end class << self @@ -28,15 +31,16 @@ def call(params) client = params.gdc_gd_client development_client = params.development_client + number_of_threads = Integer(params.number_of_threads || '10') - params.synchronize.peach do |info| + params.synchronize.peach(number_of_threads) do |info| from = info.from to_projects = info.to transfer_uris = info.transfer_uris from_project = development_client.projects(from) || fail("Invalid 'from' project specified - '#{from}'") - to_projects.peach do |entry| + to_projects.peach(number_of_threads) do |entry| pid = entry[:pid] to_project = client.projects(pid) || fail("Invalid 'to' project specified - '#{pid}'") diff --git a/lib/gooddata/lcm/actions/synchronize_dataset_mappings.rb b/lib/gooddata/lcm/actions/synchronize_dataset_mappings.rb index 9ecec109f..f2bf4be5a 100644 --- a/lib/gooddata/lcm/actions/synchronize_dataset_mappings.rb +++ b/lib/gooddata/lcm/actions/synchronize_dataset_mappings.rb @@ -33,6 +33,9 @@ class SynchronizeDataSetMapping < BaseAction description 'Sync failed list' param :sync_failed_list, instance_of(Type::HashType), required: false + + description 'Number Of Threads' + param :number_of_threads, instance_of(Type::StringType), required: false, default: '10' end RESULT_HEADER = %i[from to count status] @@ -45,8 +48,9 @@ def call(params) client = params.gdc_gd_client development_client = params.development_client + number_of_threads = Integer(params.number_of_threads || '10') - params.synchronize.peach do |info| + params.synchronize.peach(number_of_threads) do |info| from_project = info.from to_projects = info.to @@ -60,7 +64,7 @@ def call(params) if dataset_mapping&.dig('datasetMappings', 'items').nil? || dataset_mapping['datasetMappings']['items'].empty? params.gdc_logger.info "Project: '#{from.title}', PID: '#{from.pid}' has no model mapping, skip synchronizing model mapping." else - to_projects.peach do |to| + to_projects.peach(number_of_threads) do |to| pid = to[:pid] next if sync_failed_project(pid, params) diff --git a/lib/gooddata/lcm/actions/synchronize_user_filters.rb b/lib/gooddata/lcm/actions/synchronize_user_filters.rb index 29938314e..ec1153068 100644 --- a/lib/gooddata/lcm/actions/synchronize_user_filters.rb +++ b/lib/gooddata/lcm/actions/synchronize_user_filters.rb @@ -74,6 +74,9 @@ class SynchronizeUserFilters < BaseAction description 'Makes the brick run without altering user filters' param :dry_run, instance_of(Type::StringType), required: false, default: false + + description 'Number Of Threads' + param :number_of_threads, instance_of(Type::StringType), required: false, default: '10' end class << self @@ -104,6 +107,7 @@ def call(params) symbolized_config = GoodData::Helpers.symbolize_keys(symbolized_config) symbolized_config[:labels] = symbolized_config[:labels].map { |l| GoodData::Helpers.symbolize_keys(l) } multiple_projects_column = params.multiple_projects_column + number_of_threads = Integer(params.number_of_threads || '10') mode = params.sync_mode unless MODES.include?(mode) @@ -196,7 +200,7 @@ def call(params) unless run_params[:do_not_touch_filters_that_are_not_mentioned] to_be_deleted_clients = UserBricksHelper.non_working_clients(domain_clients, working_client_ids) - to_be_deleted_clients.peach do |c| + to_be_deleted_clients.peach(number_of_threads) do |c| begin current_project = c.project users = users_by_project[c.client_id] diff --git a/lib/gooddata/lcm/lcm2.rb b/lib/gooddata/lcm/lcm2.rb index e0faeab85..c0db81072 100644 --- a/lib/gooddata/lcm/lcm2.rb +++ b/lib/gooddata/lcm/lcm2.rb @@ -295,6 +295,10 @@ def print_actions_result(actions, results) end def perform(mode, params = {}) + # Default setting for $pmap_default_thread_count = 20 in gooddata.rb file. We are going to decrease default + # number of threads count to 10 for LCM bricks only + $pmap_default_thread_count = 10 # rubocop:disable GlobalVars + params = convert_params(params) GoodData.gd_logger.brick = mode diff --git a/lib/gooddata/models/user_filters/user_filter_builder.rb b/lib/gooddata/models/user_filters/user_filter_builder.rb index e3b364f20..e64988301 100644 --- a/lib/gooddata/models/user_filters/user_filter_builder.rb +++ b/lib/gooddata/models/user_filters/user_filter_builder.rb @@ -514,7 +514,7 @@ def self.execute_mufs(user_filters, options = {}) if to_create.empty? create_results = [] else - create_results = to_create.each_slice(100).flat_map do |batch| + create_results = to_create.each_slice(50).flat_map do |batch| batch.pmapcat do |related_uri, group| group.each(&:save) res = client.get("/gdc/md/#{project.pid}/userfilters?users=#{related_uri}")