From c229a3e5e28f198167e55cbcd7ba24d05d4b0b5d Mon Sep 17 00:00:00 2001 From: hll1213181368 Date: Wed, 17 Dec 2025 08:54:46 +0800 Subject: [PATCH] Fix MigratingSlot maybe report the error of the cluster has been updated by others when updateCluster --- consts/context_key.go | 4 ++++ consts/errors.go | 2 +- server/api/cluster.go | 42 ++++++++++++++++++++++++++++++++++++++---- store/cluster.go | 4 ++-- store/cluster_test.go | 6 +++--- 5 files changed, 48 insertions(+), 10 deletions(-) diff --git a/consts/context_key.go b/consts/context_key.go index cfb0e3e0..eea54e15 100644 --- a/consts/context_key.go +++ b/consts/context_key.go @@ -30,3 +30,7 @@ const ( HeaderIsRedirect = "X-Is-Redirect" HeaderDontCheckClusterMode = "X-Dont-Check-Cluster-Mode" ) + +const ( + MigrateFailMaxRetry = 3 +) diff --git a/consts/errors.go b/consts/errors.go index bf7e51d2..24168715 100644 --- a/consts/errors.go +++ b/consts/errors.go @@ -38,5 +38,5 @@ var ( ErrShardIsServicing = errors.New("shard is servicing") ErrShardSlotIsMigrating = errors.New("shard slot is migrating") ErrShardNoMatchNewMaster = errors.New("no match new master in shard") - ErrSlotStartAndStopEqual = errors.New("start and stop of a range cannot be equal") + ErrClusterUpdatedByOthers = errors.New("the cluster has been updated by others") ) diff --git a/server/api/cluster.go b/server/api/cluster.go index 52178891..21048098 100644 --- a/server/api/cluster.go +++ b/server/api/cluster.go @@ -153,10 +153,44 @@ func (handler *ClusterHandler) MigrateSlot(c *gin.Context) { return } - err = handler.s.UpdateCluster(c, namespace, cluster) - if err != nil { - helper.ResponseError(c, err) - return + if err = handler.s.UpdateCluster(c, namespace, cluster); err != nil { + if !errors.Is(err, consts.ErrClusterUpdatedByOthers) { + helper.ResponseError(c, err) + return + } + + retryCount := 0 + for ; retryCount < consts.MigrateFailMaxRetry; retryCount++ { + newCluster, getErr := s.GetCluster(c, namespace, clusterName) + if getErr != nil { + err = getErr + continue + } + + sourceShardIdx, findErr := newCluster.FindShardIndexBySlot(req.Slot) + if findErr != nil { + err = findErr + continue + } + + newCluster.Shards[sourceShardIdx].MigratingSlot = &store.MigratingSlot{ + SlotRange: req.Slot, + IsMigrating: true, + } + newCluster.Shards[sourceShardIdx].TargetShardIndex = req.Target + + err = handler.s.UpdateCluster(c, namespace, newCluster) + + if !errors.Is(err, consts.ErrClusterUpdatedByOthers) { + helper.ResponseError(c, err) + return + } + } + + if err != nil { + helper.ResponseError(c, err) + return + } } helper.ResponseOK(c, gin.H{"cluster": cluster}) } diff --git a/store/cluster.go b/store/cluster.go index e33e41a1..2ffb1e54 100644 --- a/store/cluster.go +++ b/store/cluster.go @@ -176,7 +176,7 @@ func (cluster *Cluster) Reset(ctx context.Context) error { return nil } -func (cluster *Cluster) findShardIndexBySlot(slot SlotRange) (int, error) { +func (cluster *Cluster) FindShardIndexBySlot(slot SlotRange) (int, error) { sourceShardIdx := -1 for i := 0; i < len(cluster.Shards); i++ { slotRanges := cluster.Shards[i].SlotRanges @@ -199,7 +199,7 @@ func (cluster *Cluster) MigrateSlot(ctx context.Context, slot SlotRange, targetS if targetShardIdx < 0 || targetShardIdx >= len(cluster.Shards) { return consts.ErrIndexOutOfRange } - sourceShardIdx, err := cluster.findShardIndexBySlot(slot) + sourceShardIdx, err := cluster.FindShardIndexBySlot(slot) if err != nil { return err } diff --git a/store/cluster_test.go b/store/cluster_test.go index 975f03f6..b3d168ae 100644 --- a/store/cluster_test.go +++ b/store/cluster_test.go @@ -44,19 +44,19 @@ func TestCluster_FindIndexShardBySlot(t *testing.T) { slotRange, err := NewSlotRange(0, 0) require.NoError(t, err) - shard, err := cluster.findShardIndexBySlot(slotRange) + shard, err := cluster.FindShardIndexBySlot(slotRange) require.NoError(t, err) require.Equal(t, 0, shard) slotRange, err = NewSlotRange(MaxSlotID/3+1, MaxSlotID/3+1) require.NoError(t, err) - shard, err = cluster.findShardIndexBySlot(slotRange) + shard, err = cluster.FindShardIndexBySlot(slotRange) require.NoError(t, err) require.Equal(t, 1, shard) slotRange, err = NewSlotRange(MaxSlotID, MaxSlotID) require.NoError(t, err) - shard, err = cluster.findShardIndexBySlot(slotRange) + shard, err = cluster.FindShardIndexBySlot(slotRange) require.NoError(t, err) require.Equal(t, 2, shard) }