From c721de0833041dd0a03c763e29de403227946820 Mon Sep 17 00:00:00 2001 From: Roman Gromov Date: Mon, 19 Jan 2026 11:48:03 +0300 Subject: [PATCH 1/2] router: refactoring of ref_ids' logic in router_map_callrw This patch takes initialization of `rid` out to `router_map_callrw` and passes this variable to ref-functions. It is needed for future features tidiness, for example - `make full map_callrw with split args` in which the logic of `router_map_callrw` becomes more complex. Needed for tarantool/vshard#559 NO_DOC=refactoring NO_TEST=refactoring --- vshard/router/init.lua | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/vshard/router/init.lua b/vshard/router/init.lua index 50e6b85a..ae4677cd 100644 --- a/vshard/router/init.lua +++ b/vshard/router/init.lua @@ -774,7 +774,7 @@ end -- -- Perform Ref stage of the Ref-Map-Reduce process on all the known replicasets. -- -local function router_ref_storage_all(router, timeout) +local function router_ref_storage_all(router, timeout, rid) local replicasets = router.replicasets local deadline = fiber_clock() + timeout local err, err_id, res @@ -782,8 +782,6 @@ local function router_ref_storage_all(router, timeout) local bucket_count = 0 local opts_async = {is_async = true} local rs_count = 0 - local rid = M.ref_id - M.ref_id = rid + 1 -- Nil checks are done explicitly here (== nil instead of 'not'), because -- netbox requests return box.NULL instead of nils. @@ -837,20 +835,20 @@ local function router_ref_storage_all(router, timeout) router.total_bucket_count - bucket_count) goto fail end - do return timeout, nil, nil, rid, replicasets end + do return timeout, nil, nil, replicasets end ::fail:: for _, f in pairs(futures) do f:discard() end - return nil, err, err_id, rid, replicasets + return nil, err, err_id, replicasets end -- -- Perform Ref stage of the Ref-Map-Reduce process on a subset of all the -- replicasets, which contains all the listed bucket IDs. 
-- -local function router_ref_storage_by_buckets(router, bucket_ids, timeout) +local function router_ref_storage_by_buckets(router, bucket_ids, timeout, rid) local grouped_buckets local group_count local err, err_id, res @@ -859,8 +857,6 @@ local function router_ref_storage_by_buckets(router, bucket_ids, timeout) local futures = {} local opts_async = {is_async = true} local deadline = fiber_clock() + timeout - local rid = M.ref_id - M.ref_id = rid + 1 -- Nil checks are done explicitly here (== nil instead of 'not'), because -- netbox requests return box.NULL instead of nils. @@ -950,13 +946,13 @@ local function router_ref_storage_by_buckets(router, bucket_ids, timeout) timeout = deadline - fiber_clock() end end - do return timeout, nil, nil, rid, replicasets_to_map end + do return timeout, nil, nil, replicasets_to_map end ::fail:: for _, f in pairs(futures) do f:discard() end - return nil, err, err_id, rid, replicasets_to_map + return nil, err, err_id, replicasets_to_map end -- @@ -1146,17 +1142,20 @@ local function router_map_callrw(router, func, args, opts) else timeout = consts.CALL_TIMEOUT_MIN end + rid = M.ref_id + M.ref_id = rid + 1 if plain_bucket_ids then - timeout, err, err_id, rid, replicasets_to_map = - router_ref_storage_by_buckets(router, plain_bucket_ids, timeout) + timeout, err, err_id, replicasets_to_map = + router_ref_storage_by_buckets(router, plain_bucket_ids, timeout, + rid) -- Grouped arguments are only possible with partial Map-Reduce. 
if timeout then grouped_args = router_group_map_callrw_args( router, plain_bucket_ids, bucket_ids) end else - timeout, err, err_id, rid, replicasets_to_map = - router_ref_storage_all(router, timeout) + timeout, err, err_id, replicasets_to_map = + router_ref_storage_all(router, timeout, rid) end if timeout then map, err, err_id = replicasets_map_reduce(replicasets_to_map, rid, func, From edfeebc7acf5e723819bf4b31632ab4bfd43f8c8 Mon Sep 17 00:00:00 2001 From: Roman Gromov Date: Mon, 19 Jan 2026 12:11:12 +0300 Subject: [PATCH 2/2] router: make full map_callrw with split args This patch introduces a new way of `map_callrw` execution by which we can pass some arguments to all storages and split buckets' arguments to those storages that have at least one bucket of `bucket_ids`. To achieve this we introduce a new string option - `mode` to `map_callrw` api. Also we change the logic of ref stages in `map_callrw`. Now we primarily ref storages that have at least one bucket of `bucket_ids` by `router_ref_storage_by_buckets` function. It can help us to cover `partial map_callrw` scenarios and a part of `full map_callrw` with split args. After that if mode is `full` we ref remaining storages by `router_ref_remaining_storages` function. Closes tarantool#559 @TarantoolBot document Title: vshard: `mode` option for `router.map_callrw()` This string option regulates on which storages the user function will be executed via `map_callrw`. Possible values: 1) mode = 'partial'. In this mode user function will be executed on storages that have at least one bucket of 'bucket_ids'. The 'bucket_ids' option can be presented in two ways: like a numeric array of buckets' ids or like a map of buckets' arguments. In first one user function will only receive args, in second one it will additionally receive buckets' arguments. 2) mode = 'full'. In this mode user function will be executed with args on all storages in cluster. 
If we pass 'bucket_ids' like a map of bucket's arguments the user function will additionally receive buckets' arguments on those storages that have at least one bucket of 'bucket_ids'. If we didn't specify the 'mode' option, then it is set based on 'bucket_ids' option - if 'bucket_ids' is present, the mode will be 'partial' otherwise 'full'. Also now `map_callrw` ends with an error for the following option combinations: `mode = 'full'` with numeric `bucket_ids` and `mode = 'partial'` with nil `bucket_ids`. --- test/router-luatest/map_callrw_test.lua | 177 ++++++++++++++++++++++++ test/upgrade/upgrade.result | 1 + vshard/consts.lua | 5 + vshard/router/init.lua | 103 ++++++++++---- vshard/storage/init.lua | 9 ++ 5 files changed, 271 insertions(+), 24 deletions(-) diff --git a/test/router-luatest/map_callrw_test.lua b/test/router-luatest/map_callrw_test.lua index 72c6c717..a8d4b3f0 100644 --- a/test/router-luatest/map_callrw_test.lua +++ b/test/router-luatest/map_callrw_test.lua @@ -685,3 +685,180 @@ g.test_map_callrw_with_cdata_bucket_id = function(cg) ilt.assert_not(err) end) end + +g.test_full_map_callrw_with_numeric_bucket_ids = function(cg) + local res = router_do_map(cg.router, {123}, { + mode = 'full', + timeout = vtest.wait_timeout, + bucket_ids = {1, 2, 3} + }) + t.assert(res.err) + t.assert_equals(res.err.message, 'Router can\'t execute map_callrw ' .. + 'with \'full\' mode and numeric bucket_ids') + t.assert_not(res.err_id) + t.assert_not(res.val) +end + +g.test_partial_map_callrw_with_nil_bucket_ids = function(cg) + local res = router_do_map(cg.router, {123}, { + mode = 'partial', + timeout = vtest.wait_timeout + }) + t.assert(res.err) + t.assert_equals(res.err.message, 'Router can\'t execute map_callrw ' ..
+ 'with \'partial\' mode and nil bucket_ids') + t.assert_not(res.err_id) + t.assert_not(res.val) +end + +local function make_do_map_tracking_bucket_ids(cg) + -- We override 'do_map' function on storages in order to check that + -- default arguments and bucket arguments were successfully passed into + -- destination storages according to mode and bucket_ids options. + vtest.cluster_exec_each_master(cg, function() + rawset(_G, 'old_do_map', _G.do_map) + rawset(_G, 'do_map', function(args, bucket_args) + ilt.assert_gt(require('vshard.storage.ref').count, 0) + return {ivutil.replicaset_uuid(), + {args = args, b_args = bucket_args}} + end) + end) +end + +local function reset_do_map_to_old_state(cg) + vtest.cluster_exec_each_master(cg, function() + rawset(_G, 'do_map', _G.old_do_map) + end) +end + +g.test_full_map_callrw_with_split_args = function(cg) + make_do_map_tracking_bucket_ids(cg) + local bid1 = vtest.storage_first_bucket(g.replica_1_a) + local bid2 = vtest.storage_first_bucket(g.replica_2_a) + local bid3 = vtest.storage_first_bucket(g.replica_3_a) + + local res = router_do_map(cg.router, {0}, { + mode = 'full', + timeout = vtest.wait_timeout, + bucket_ids = {[bid1] = {111}, [bid2] = {222}, [bid3] = {333}} + }) + t.assert_not(res.err) + t.assert_not(res.err_id) + t.assert_equals(res.val, { + [cg.rs1_uuid] = {{cg.rs1_uuid, {args = 0, b_args = {[bid1] = {111}}}}}, + [cg.rs2_uuid] = {{cg.rs2_uuid, {args = 0, b_args = {[bid2] = {222}}}}}, + [cg.rs3_uuid] = {{cg.rs3_uuid, {args = 0, b_args = {[bid3] = {333}}}}}, + }) + + res = router_do_map(cg.router, {0}, { + mode = 'full', + timeout = vtest.wait_timeout, + bucket_ids = {[bid2] = {222}} + }) + t.assert_not(res.err) + t.assert_not(res.err_id) + t.assert_equals(res.val, { + [cg.rs1_uuid] = {{cg.rs1_uuid, {args = 0}}}, + [cg.rs2_uuid] = {{cg.rs2_uuid, {args = 0, b_args = {[bid2] = {222}}}}}, + [cg.rs3_uuid] = {{cg.rs3_uuid, {args = 0}}}, + }) + reset_do_map_to_old_state(cg) +end + 
+g.test_full_map_callrw_without_bucket_ids = function(cg) + make_do_map_tracking_bucket_ids(cg) + local res = router_do_map(cg.router, {0}, { + mode = 'full', + timeout = vtest.wait_timeout + }) + t.assert_not(res.err) + t.assert_not(res.err_id) + t.assert_equals(res.val, { + [cg.rs1_uuid] = {{cg.rs1_uuid, {args = 0}}}, + [cg.rs2_uuid] = {{cg.rs2_uuid, {args = 0}}}, + [cg.rs3_uuid] = {{cg.rs3_uuid, {args = 0}}}, + }) + reset_do_map_to_old_state(cg) +end + +g.test_partial_map_callrw_with_numeric_bucket_ids = function(cg) + make_do_map_tracking_bucket_ids(cg) + local bid1 = vtest.storage_first_bucket(g.replica_1_a) + local res = router_do_map(cg.router, {0}, { + mode = 'partial', + timeout = vtest.wait_timeout, + bucket_ids = {bid1} + }) + t.assert_not(res.err) + t.assert_not(res.err_id) + t.assert_equals(res.val, {[cg.rs1_uuid] = {{cg.rs1_uuid, {args = 0}}}}) + reset_do_map_to_old_state(cg) +end + +g.test_partial_map_callrw_with_split_args = function(cg) + make_do_map_tracking_bucket_ids(cg) + local bid1 = vtest.storage_first_bucket(g.replica_1_a) + local bid2 = vtest.storage_first_bucket(g.replica_2_a) + local res = router_do_map(cg.router, {0}, { + mode = 'partial', + timeout = vtest.wait_timeout, + bucket_ids = {[bid1] = {111}, [bid2] = {222}} + }) + t.assert_not(res.err) + t.assert_not(res.err_id) + t.assert_equals(res.val, { + [cg.rs1_uuid] = {{cg.rs1_uuid, {args = 0, b_args = {[bid1] = {111}}}}}, + [cg.rs2_uuid] = {{cg.rs2_uuid, {args = 0, b_args = {[bid2] = {222}}}}}, + }) + reset_do_map_to_old_state(cg) +end + +local function move_bucket(src_storage, dest_storage, bucket_id) + src_storage:exec(function(bucket_id, replicaset_id) + t.helpers.retrying({timeout = 60}, function() + local res, err = ivshard.storage.bucket_send(bucket_id, + replicaset_id) + t.assert_not(err) + t.assert(res) + end) + end, {bucket_id, dest_storage:replicaset_uuid()}) + src_storage:exec(function(bucket_id) + t.helpers.retrying({timeout = 10}, function() + 
t.assert_equals(box.space._bucket:select(bucket_id), {}) + end) + end, {bucket_id}) + dest_storage:exec(function(bucket_id) + t.helpers.retrying({timeout = 10}, function() + t.assert_equals(box.space._bucket:get(bucket_id).status, 'active') + end) + end, {bucket_id}) +end + +g.test_full_map_callrw_with_split_args_and_broken_cache = function(cg) + make_do_map_tracking_bucket_ids(cg) + cg.router:exec(function() + ivshard.router.internal.errinj.ERRINJ_LONG_DISCOVERY = true + ivshard.router.discovery_wakeup() + end) + + local moved_bucket = vtest.storage_first_bucket(cg.replica_1_a) + move_bucket(g.replica_1_a, g.replica_2_a, moved_bucket) + local res = router_do_map(cg.router, {0}, { + mode = 'partial', + timeout = vtest.wait_timeout, + bucket_ids = {[moved_bucket] = {111}} + }) + t.assert_not(res.err) + t.assert_not(res.err_id) + t.assert_equals(res.val, { + [cg.rs2_uuid] = {{cg.rs2_uuid, {args = 0, + b_args = {[moved_bucket] = {111}}}}}, + }) + + cg.router:exec(function() + ivshard.router.internal.errinj.ERRINJ_LONG_DISCOVERY = false + ivshard.router.discovery_wakeup() + end) + move_bucket(g.replica_2_a, g.replica_1_a, moved_bucket) + reset_do_map_to_old_state(cg) +end diff --git a/test/upgrade/upgrade.result b/test/upgrade/upgrade.result index e0c9469b..393af083 100644 --- a/test/upgrade/upgrade.result +++ b/test/upgrade/upgrade.result @@ -179,6 +179,7 @@ vshard.storage._call('test_api', 1, 2, 3) | - recovery_bucket_stat | - storage_map | - storage_ref + | - storage_ref_check | - storage_ref_check_with_buckets | - storage_ref_make_with_buckets | - storage_unref diff --git a/vshard/consts.lua b/vshard/consts.lua index b42755a4..5142f6a8 100644 --- a/vshard/consts.lua +++ b/vshard/consts.lua @@ -28,6 +28,11 @@ return { RED = 3, }, + MAP_CALLRW_MODE = { + FULL = 'full', + PARTIAL = 'partial', + }, + REPLICATION_THRESHOLD_SOFT = 1, REPLICATION_THRESHOLD_HARD = 5, REPLICATION_THRESHOLD_FAIL = 10, diff --git a/vshard/router/init.lua b/vshard/router/init.lua index 
ae4677cd..cfac663c 100644 --- a/vshard/router/init.lua +++ b/vshard/router/init.lua @@ -29,6 +29,8 @@ local map_serializer = { __serialize = 'map' } local future_wait = util.future_wait local msgpack_is_object = lmsgpack.is_object +local MAP_CALLRW_FULL = consts.MAP_CALLRW_MODE.FULL +local MAP_CALLRW_PARTIAL = consts.MAP_CALLRW_MODE.PARTIAL if not util.feature.msgpack_object then local msg = 'Msgpack object feature is not supported by current '.. @@ -774,7 +776,7 @@ end -- -- Perform Ref stage of the Ref-Map-Reduce process on all the known replicasets. -- -local function router_ref_storage_all(router, timeout, rid) +local function router_ref_storage_all(router, timeout, refed_replicasets, rid) local replicasets = router.replicasets local deadline = fiber_clock() + timeout local err, err_id, res @@ -782,6 +784,7 @@ local function router_ref_storage_all(router, timeout, rid) local bucket_count = 0 local opts_async = {is_async = true} local rs_count = 0 + refed_replicasets = refed_replicasets or {} -- Nil checks are done explicitly here (== nil instead of 'not'), because -- netbox requests return box.NULL instead of nils. 
@@ -796,8 +799,13 @@ local function router_ref_storage_all(router, timeout, rid) goto fail end for id, rs in pairs(replicasets) do - res, err = rs:callrw('vshard.storage._call', - {'storage_ref', rid, timeout}, opts_async) + if refed_replicasets[id] then + res, err = rs:callrw('vshard.storage._call', + {'storage_ref_check', rid}, opts_async) + else + res, err = rs:callrw('vshard.storage._call', + {'storage_ref', rid, timeout}, opts_async) + end if res == nil then err_id = id goto fail @@ -975,13 +983,14 @@ local function replicasets_map_reduce(replicasets, rid, func, args, -- local func_args = {'storage_map', rid, func, args} for id, rs in pairs(replicasets) do - if grouped_args ~= nil then + local rs_args = grouped_args and grouped_args[id] + if rs_args then -- It's cheaper to push and then pop, rather then deepcopy -- arguments table for every call. - table.insert(args, grouped_args[id]) + table.insert(args, rs_args) end local res, err = rs:callrw('vshard.storage._call', func_args, opts_map) - if grouped_args ~= nil then + if rs_args then table.remove(args) end if res == nil then @@ -1079,14 +1088,50 @@ local function router_group_map_callrw_args(router, bucket_ids, bucket_args) return grouped_args end +-- +-- Set the appropriate mode according to bucket_ids option for backward +-- compatibility (in case of opts_mode is nil) and check the given opts_mode +-- correctness in other cases. +-- +local function router_check_map_callrw_mode(opts_mode, bucket_ids) + if opts_mode == nil then + return bucket_ids and MAP_CALLRW_PARTIAL or MAP_CALLRW_FULL + end + if opts_mode == MAP_CALLRW_PARTIAL and bucket_ids == nil then + return nil, lerror.make('Router can\'t execute map_callrw with ' .. + '\'partial\' mode and nil bucket_ids') + end + if opts_mode == MAP_CALLRW_FULL and util.table_is_numeric(bucket_ids) then + return nil, lerror.make('Router can\'t execute map_callrw with ' .. 
+ '\'full\' mode and numeric bucket_ids') + end + return opts_mode +end + -- -- Consistent Map-Reduce. The given function is called on masters in the cluster -- with a guarantee that in case of success it was executed with all buckets -- being accessible for reads and writes. -- --- The selection of masters depends on bucket_ids option. When specified, the --- Map-Reduce is performed only on masters having at least one of these buckets. --- Otherwise it is executed on all the masters in the cluster. +-- The selection of masters depends on 'mode' and 'bucket_ids' options. There +-- are 2 general modes how map_callrw can be executed: +-- 1) mode = 'partial'. In this mode user function will be executed on +-- storages that have at least one bucket of 'bucket_ids'. The +-- 'bucket_ids' option can be presented in two ways: like a numeric array +-- of buckets' ids or like a map of buckets' arguments. In first one user +-- function will only receive args, in second one it will additionally +-- receive buckets' arguments. +-- 2) mode = 'full'. In this mode user function will be executed with args on +-- all storages in cluster. If we pass 'bucket_ids' like a map of bucket's +-- arguments the user function will additionally receive buckets' +-- arguments on those storages that have at least one bucket of +-- 'bucket_ids'. +-- +-- If we didn't specify the 'mode' option, then it is set based on 'bucket_ids' +-- option - if 'bucket_ids' is presented, the mode will be 'partial' otherwise +-- 'full'. Also the following option combinations lead to an error: +-- ('full' mode, numeric bucket_ids) and ('partial' mode, nil bucket_ids). -- -- Consistency in scope of map-reduce means all the data was accessible, and -- didn't move during map requests execution. To preserve the consistency there @@ -1109,6 +1154,8 @@ end -- @param func Name of the function to call. -- @param args Function arguments passed in netbox style (as an array). -- @param opts Options.
See below: +-- - mode - a string option ('full' / 'partial') that represents a way of +-- execution of user function on destination storages. -- - timeout - a number of seconds. Note that the refs may end up being kept -- on the storages during this entire timeout if something goes wrong. -- For instance, network issues appear. This means better not use a @@ -1132,8 +1179,13 @@ end -- local function router_map_callrw(router, func, args, opts) local replicasets_to_map, err, err_id, map, rid - local timeout, do_return_raw, bucket_ids, plain_bucket_ids, grouped_args + local mode, timeout, do_return_raw, bucket_ids, plain_bucket_ids, + grouped_args if opts then + mode, err = router_check_map_callrw_mode(opts.mode, opts.bucket_ids) + if err then + return nil, err + end timeout = opts.timeout or consts.CALL_TIMEOUT_MIN do_return_raw = opts.return_raw bucket_ids = opts.bucket_ids @@ -1141,6 +1193,7 @@ local function router_map_callrw(router, func, args, opts) util.table_keys(bucket_ids) else timeout = consts.CALL_TIMEOUT_MIN + mode = MAP_CALLRW_FULL end rid = M.ref_id M.ref_id = rid + 1 @@ -1148,24 +1201,26 @@ local function router_map_callrw(router, func, args, opts) timeout, err, err_id, replicasets_to_map = router_ref_storage_by_buckets(router, plain_bucket_ids, timeout, rid) - -- Grouped arguments are only possible with partial Map-Reduce. 
- if timeout then - grouped_args = router_group_map_callrw_args( - router, plain_bucket_ids, bucket_ids) + if not timeout then + goto fail end - else - timeout, err, err_id, replicasets_to_map = - router_ref_storage_all(router, timeout, rid) + grouped_args = router_group_map_callrw_args( + router, plain_bucket_ids, bucket_ids) end - if timeout then - map, err, err_id = replicasets_map_reduce(replicasets_to_map, rid, func, - args, grouped_args, { - timeout = timeout, return_raw = do_return_raw - }) - if map then - return map + if mode == MAP_CALLRW_FULL then + timeout, err, err_id, replicasets_to_map = + router_ref_storage_all(router, timeout, replicasets_to_map, rid) + if not timeout then + goto fail end end + opts = {timeout = timeout, return_raw = do_return_raw, mode = mode} + map, err, err_id = replicasets_map_reduce(replicasets_to_map, rid, func, + args, grouped_args, opts) + if map then + return map + end + ::fail:: replicasets_map_cancel_refs(replicasets_to_map, rid) err = lerror.make(err) return nil, err, err_id diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua index 97642250..0983c657 100644 --- a/vshard/storage/init.lua +++ b/vshard/storage/init.lua @@ -3163,6 +3163,14 @@ local function storage_ref(rid, timeout) return bucket_count() end +local function storage_ref_check(rid) + local ok, err = lref.check(rid, box.session.id()) + if not ok then + return nil, err + end + return bucket_count() +end + -- -- Lookup buckets which are definitely not going to recover into ACTIVE state -- under any circumstances. @@ -3353,6 +3361,7 @@ service_call_api = setmetatable({ rebalancer_request_state = rebalancer_request_state, recovery_bucket_stat = recovery_bucket_stat, storage_ref = storage_ref, + storage_ref_check = storage_ref_check, storage_ref_make_with_buckets = storage_ref_make_with_buckets, storage_ref_check_with_buckets = storage_ref_check_with_buckets, storage_unref = storage_unref,