From 5c1c181c4431ee89f553a069260ff811c58dd0a3 Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Wed, 16 Jul 2025 15:40:56 -0600 Subject: [PATCH 01/31] adds all config files to var/lib/kvctl --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index df53ba2f..c88aa91e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -37,7 +37,7 @@ COPY --from=build /kvctl/_build/kvctl ./bin/ VOLUME /var/lib/kvctl COPY ./LICENSE ./NOTICE ./licenses ./ -COPY ./config/config.yaml /var/lib/kvctl/ +COPY ./config/* /var/lib/kvctl/ EXPOSE 9379:9379 ENTRYPOINT ["./bin/kvctl-server", "-c", "/var/lib/kvctl/config.yaml"] From ef5be08de5d2baa0c62584625a4bc897f1bfe19a Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Wed, 16 Jul 2025 15:52:51 -0600 Subject: [PATCH 02/31] needs consul0 for local dev --- config/config-consul.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/config-consul.yaml b/config/config-consul.yaml index 28f5e337..af52f932 100644 --- a/config/config-consul.yaml +++ b/config/config-consul.yaml @@ -27,7 +27,7 @@ storage_type: consul consul: addrs: - - "127.0.0.1:8500" + - "consul0:8500" elect_path: tls: enable: false From dacb630372e7c14e7f6e63891208584a66d79d02 Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Wed, 16 Jul 2025 16:16:27 -0600 Subject: [PATCH 03/31] adds some more things required for local development --- config/config-consul.yaml | 2 +- scripts/docker/docker-compose.yml | 15 ++++++++++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/config/config-consul.yaml b/config/config-consul.yaml index af52f932..658370ff 100644 --- a/config/config-consul.yaml +++ b/config/config-consul.yaml @@ -16,7 +16,7 @@ # under the License. # -addr: "127.0.0.1:9379" +addr: "0.0.0.0:9379" # Which store engine should be used by controller # options: etcd, zookeeper, raft, consul diff --git a/scripts/docker/docker-compose.yml b/scripts/docker/docker-compose.yml index 4e676153..d4e51dd4 100644 --- a/scripts/docker/docker-compose.yml +++ b/scripts/docker/docker-compose.yml @@ -21,14 +21,17 @@ version: "3.7" services: kvrocks0: build: ./kvrocks + networks: + - kvrocks-dev container_name: kvrocks-cluster ports: - "7770:7770" - "7771:7771" - etcd0: image: "quay.io/coreos/etcd:v3.5.17" + networks: + - kvrocks-dev container_name: etcd0 ports: - "2380:2380" @@ -46,12 +49,16 @@ services: zookeeper0: image: "zookeeper:latest" + networks: + - kvrocks-dev container_name: zookeeper0 ports: - "2181:2181" consul0: image: hashicorp/consul:latest + networks: + - kvrocks-dev container_name: consul0 ports: - "8500:8500" @@ -59,6 +66,8 @@ services: postgres0: build: ./pg-dockerfile + networks: + - kvrocks-dev container_name: postgres0 environment: POSTGRES_USER: postgres @@ -68,3 +77,7 @@ services: - "5432:5432" volumes: - ./pg-init-scripts:/docker-entrypoint-initdb.d + +networks: + kvrocks-dev: + external: true From 0d4ee9e42528da078e13407ff1f10cef2e1fafcd Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Thu, 24 Jul 2025 15:57:12 -0600 Subject: [PATCH 04/31] adds a queue so we can find requests that can't be made yet and queue them up --- cmd/client/command/consts.go | 5 +++ cmd/client/command/migrate.go | 12 ++++++- consts/errors.go | 1 + controller/cluster.go | 5 +++ server/api/cluster.go | 14 +++++--- server/api/cluster_test.go | 4 +-- store/cluster.go | 67 ++++++++++++++++++++++++++++++++--- store/cluster_node.go | 3 ++ store/slot.go | 2 ++ 9 files changed, 102 insertions(+), 11 deletions(-) diff --git a/cmd/client/command/consts.go 
b/cmd/client/command/consts.go index 59b6ef5a..1782d1d8 100644 --- a/cmd/client/command/consts.go +++ b/cmd/client/command/consts.go @@ -26,3 +26,8 @@ const ( ResourceShard = "shard" ResourceNode = "node" ) + +const ( + MigrateSlot = "slot" + MigrateCancel = "cancel" +) diff --git a/cmd/client/command/migrate.go b/cmd/client/command/migrate.go index 009b4169..74f4be33 100644 --- a/cmd/client/command/migrate.go +++ b/cmd/client/command/migrate.go @@ -46,6 +46,9 @@ var MigrateCommand = &cobra.Command{ Example: ` # Migrate slot between cluster shards kvctl migrate slot --target -n -c + +# Cancel the queue for migration - does not stop the current migration +kvctl migrate cancel -n -c `, PreRunE: migrationPreRun, RunE: func(cmd *cobra.Command, args []string) error { @@ -53,8 +56,10 @@ kvctl migrate slot --target -n -c --target -n -c 0 +} + +func (m *MigrationQueue) Clear() { + m.Data = nil } func NewCluster(name string, nodes []string, replicas int) (*Cluster, error) { @@ -72,7 +107,7 @@ func NewCluster(name string, nodes []string, replicas int) (*Cluster, error) { shards = append(shards, shard) } - cluster := &Cluster{Name: name, Shards: shards} + cluster := &Cluster{Name: name, Shards: shards, MigrationQueue: MigrationQueue{}} cluster.Version.Store(1) return cluster, nil } @@ -195,6 +230,29 @@ func (cluster *Cluster) findShardIndexBySlot(slot SlotRange) (int, error) { return sourceShardIdx, nil } +func (cluster *Cluster) MigrateNextSlots(ctx context.Context) error { + if !cluster.MigrationQueue.Available() { + return consts.ErrNoMigrationsAvailable + } + + var newQueue []Migration + for cluster.MigrationQueue.Available() { + request, ok := cluster.MigrationQueue.Dequeue() + if !ok { + break + } + err := cluster.MigrateSlot(ctx, request.Slot, request.Target, request.SlotOnly) + if errors.Is(err, consts.ErrShardSlotIsMigrating) { + newQueue = append(newQueue, request) + } + } + + for i := 0; i < len(newQueue); i++ { + cluster.MigrationQueue.Enqueue(newQueue[i]) + } + return nil +} + func (cluster *Cluster) MigrateSlot(ctx context.Context, slot SlotRange, targetShardIdx int, slotOnly bool) error { if targetShardIdx < 0 || targetShardIdx >= len(cluster.Shards) { return consts.ErrIndexOutOfRange @@ -215,6 +273,7 @@ func (cluster *Cluster) MigrateSlot(ctx context.Context, slot SlotRange, targetS } if cluster.Shards[sourceShardIdx].IsMigrating() || cluster.Shards[targetShardIdx].IsMigrating() { + // cluster.MigrationQueue.Add(Migration{Target: targetShardIdx, Slot: slot, SlotOnly: slotOnly}) return consts.ErrShardSlotIsMigrating } // Send the migration command to the source node diff --git a/store/cluster_node.go b/store/cluster_node.go index 8d038d0d..925c1b60 100644 --- a/store/cluster_node.go +++ b/store/cluster_node.go @@ -149,6 +149,9 @@ func (n *ClusterNode) GetClient() *redis.Client { } } + // TODO: bseto consider making these configurable, I'm seeing a lot of errors + // whenever we try to update the cluster. Lots of client connection spam, could + // be related to the timeouts? client := redis.NewClient(&redis.Options{ Addr: n.addr, Password: n.password, diff --git a/store/slot.go b/store/slot.go index 394ced52..0fa870fa 100644 --- a/store/slot.go +++ b/store/slot.go @@ -44,6 +44,8 @@ type SlotRange struct { type SlotRanges []SlotRange +// MigratingSlot is a wrapper around SlotRange but it's meant to be used +// to store the migration state in the cluster information. 
type MigratingSlot struct { SlotRange IsMigrating bool From 102bb467876b338f59486029b624f1d46b037b86 Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Thu, 24 Jul 2025 16:10:34 -0600 Subject: [PATCH 05/31] changes the config-consul.yaml back to what it was before --- config/config-consul.yaml | 4 +-- config/config-local-dev-consul.yaml | 49 +++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 2 deletions(-) create mode 100644 config/config-local-dev-consul.yaml diff --git a/config/config-consul.yaml b/config/config-consul.yaml index 658370ff..28f5e337 100644 --- a/config/config-consul.yaml +++ b/config/config-consul.yaml @@ -16,7 +16,7 @@ # under the License. # -addr: "0.0.0.0:9379" +addr: "127.0.0.1:9379" # Which store engine should be used by controller # options: etcd, zookeeper, raft, consul @@ -27,7 +27,7 @@ storage_type: consul consul: addrs: - - "consul0:8500" + - "127.0.0.1:8500" elect_path: tls: enable: false diff --git a/config/config-local-dev-consul.yaml b/config/config-local-dev-consul.yaml new file mode 100644 index 00000000..658370ff --- /dev/null +++ b/config/config-local-dev-consul.yaml @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +addr: "0.0.0.0:9379" + +# Which store engine should be used by controller +# options: etcd, zookeeper, raft, consul +# Note: the raft engine is an experimental feature and is not recommended for production use. 
+# +# default: etcd +storage_type: consul + +consul: + addrs: + - "consul0:8500" + elect_path: + tls: + enable: false + cert_file: + key_file: + ca_file: + +controller: + failover: + ping_interval_seconds: 3 + min_alive_size: 5 +# Uncomment this part to save logs to filename instead of stdout +#log: +# level: info +# filename: /data/logs/kvctl.log +# max_backups: 10 +# max_age: 7 +# max_size: 100 +# compress: false From 0d582729449ae3fe7d19423a04c2da414c050d6d Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Fri, 25 Jul 2025 16:04:09 -0600 Subject: [PATCH 06/31] changes slot to slot range instead of ranges --- controller/cluster.go | 3 ++- server/api/cluster.go | 19 ++++++++++++++----- server/api/cluster_test.go | 4 ++-- store/cluster.go | 6 ++++-- 4 files changed, 22 insertions(+), 10 deletions(-) diff --git a/controller/cluster.go b/controller/cluster.go index baa862c2..aed39f5d 100755 --- a/controller/cluster.go +++ b/controller/cluster.go @@ -378,7 +378,8 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu c.updateCluster(clonedCluster) if clonedCluster.MigrationQueue.Available() { - clonedCluster.MigrateNextSlots(ctx) + clonedCluster.MigrateAvailableSlots(ctx) + c.clusterStore.SetCluster(ctx, c.namespace, clonedCluster) c.updateCluster(clonedCluster) } default: diff --git a/server/api/cluster.go b/server/api/cluster.go index 2752ee37..79fa7543 100644 --- a/server/api/cluster.go +++ b/server/api/cluster.go @@ -26,7 +26,9 @@ import ( "strings" "sync" + "github.com/apache/kvrocks-controller/logger" "github.com/gin-gonic/gin" + "go.uber.org/zap" "github.com/apache/kvrocks-controller/consts" "github.com/apache/kvrocks-controller/server/helper" @@ -34,9 +36,10 @@ import ( ) type MigrateSlotRequest struct { - Target int `json:"target" validate:"required"` - Slot store.SlotRanges `json:"slot" validate:"required"` - SlotOnly bool `json:"slot_only"` + Target int `json:"target" validate:"required"` + Slot store.SlotRange `json:"slot" validate:"required"` + // Slot store.SlotRanges `json:"slot" validate:"required"` // TODO: we need to make the unmarshal work for ranges still + SlotOnly bool `json:"slot_only"` } type CreateClusterRequest struct { @@ -147,11 +150,17 @@ func (handler *ClusterHandler) MigrateSlot(c *gin.Context) { return } + log := logger.Get().With( + zap.String("namespace", namespace), + zap.String("cluster", cluster.Name)) + + log.Info("migrate slot!") // TODO: Need to modify MigrateSlot to be able to add to queue. 
// and then use a loop and call migrateSlot multiple times depending on req.Slot - err = cluster.MigrateSlot(c, req.Slot[0], req.Target, req.SlotOnly) + err = cluster.MigrateSlot(c, req.Slot, req.Target, req.SlotOnly) if errors.Is(err, consts.ErrShardSlotIsMigrating) { - cluster.MigrationQueue.Enqueue(store.Migration{Target: req.Target, Slot: req.Slot[0], SlotOnly: req.SlotOnly}) + log.Info("slot was migrating but we're going to queue it up") + cluster.MigrationQueue.Enqueue(store.Migration{Target: req.Target, Slot: req.Slot, SlotOnly: req.SlotOnly}) helper.ResponseOK(c, gin.H{"cluster": cluster}) } if err != nil { diff --git a/server/api/cluster_test.go b/server/api/cluster_test.go index 891dee5d..5d3aa8d6 100644 --- a/server/api/cluster_test.go +++ b/server/api/cluster_test.go @@ -131,7 +131,7 @@ func TestClusterBasics(t *testing.T) { slotRange, err := store.NewSlotRange(3, 3) require.NoError(t, err) testMigrateReq := &MigrateSlotRequest{ - Slot: store.SlotRanges{slotRange}, + Slot: slotRange, SlotOnly: true, Target: 1, } @@ -234,7 +234,7 @@ func TestClusterMigrateData(t *testing.T) { reqCtx := GetTestContext(recorder) reqCtx.Set(consts.ContextKeyStore, handler.s) reqCtx.Params = []gin.Param{{Key: "namespace", Value: ns}, {Key: "cluster", Value: cluster}} - body, err := json.Marshal(&MigrateSlotRequest{Target: 1, Slot: store.SlotRanges{slotRange}}) + body, err := json.Marshal(&MigrateSlotRequest{Target: 1, Slot: slotRange}) require.NoError(t, err) reqCtx.Request.Body = io.NopCloser(bytes.NewBuffer(body)) diff --git a/store/cluster.go b/store/cluster.go index f4960602..2f0d058b 100644 --- a/store/cluster.go +++ b/store/cluster.go @@ -37,7 +37,7 @@ type Cluster struct { Name string `json:"name"` Version atomic.Int64 `json:"-"` Shards []*Shard `json:"shards"` - MigrationQueue MigrationQueue `json:"-"` // just testing for now + MigrationQueue MigrationQueue `json:"migration_queue"` } type MigrationQueue struct { @@ -230,7 +230,7 @@ func (cluster *Cluster) findShardIndexBySlot(slot SlotRange) (int, error) { return sourceShardIdx, nil } -func (cluster *Cluster) MigrateNextSlots(ctx context.Context) error { +func (cluster *Cluster) MigrateAvailableSlots(ctx context.Context) error { if !cluster.MigrationQueue.Available() { return consts.ErrNoMigrationsAvailable } @@ -247,6 +247,8 @@ func (cluster *Cluster) MigrateNextSlots(ctx context.Context) error { } } + // any of the requests that got rejected because shard is already migrating will be put + // into a queue again for i := 0; i < len(newQueue); i++ { cluster.MigrationQueue.Enqueue(newQueue[i]) } From 8bf5bb8101b83c37e4fdfabbf3314bbe9a362f62 Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Thu, 31 Jul 2025 15:11:24 -0600 Subject: [PATCH 07/31] doing some debugging --- controller/cluster.go | 17 ++++++++++++++++- server/api/cluster.go | 6 ++++-- store/cluster.go | 4 ++++ store/store.go | 9 ++++++++- 4 files changed, 32 insertions(+), 4 deletions(-) diff --git a/controller/cluster.go b/controller/cluster.go index aed39f5d..85180545 100755 --- a/controller/cluster.go +++ b/controller/cluster.go @@ -316,7 +316,13 @@ func (c *ClusterChecker) probeLoop() { func (c *ClusterChecker) updateCluster(cluster *store.Cluster) { c.clusterMu.Lock() + log := logger.Get().With( + zap.String("cluster", cluster.Name)) + oldQueue := c.cluster.MigrationQueue + log.Info("old queue length", zap.Int("len", len(oldQueue.Data))) c.cluster = cluster + c.cluster.MigrationQueue = oldQueue + log.Info("new cluster queue length", zap.Int("len", 
len(c.cluster.MigrationQueue.Data))) c.clusterMu.Unlock() } @@ -364,6 +370,7 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu c.updateCluster(clonedCluster) log.Warn("Failed to migrate the slot", zap.String("slot", migratingSlot.String())) case "success": + log.Info("successful migration of slot", zap.String("debug", "byron")) clonedCluster.Shards[i].SlotRanges = store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges, shard.MigratingSlot.SlotRange) clonedCluster.Shards[shard.TargetShardIndex].SlotRanges = store.AddSlotToSlotRanges( clonedCluster.Shards[shard.TargetShardIndex].SlotRanges, shard.MigratingSlot.SlotRange, @@ -377,10 +384,18 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu } c.updateCluster(clonedCluster) + log.Info("checking if migration queue is available", zap.String("debug", "byron")) if clonedCluster.MigrationQueue.Available() { - clonedCluster.MigrateAvailableSlots(ctx) + log.Info("should be available, should trigger some more? ", zap.String("debug", "byron")) + err = clonedCluster.MigrateAvailableSlots(ctx) + if err != nil { + log.Error("not available?", zap.Error(err)) + continue + } c.clusterStore.SetCluster(ctx, c.namespace, clonedCluster) c.updateCluster(clonedCluster) + } else { + log.Info("not available", zap.String("debug", "byron")) } default: clonedCluster.Shards[i].ClearMigrateState() diff --git a/server/api/cluster.go b/server/api/cluster.go index 79fa7543..d48333a4 100644 --- a/server/api/cluster.go +++ b/server/api/cluster.go @@ -159,10 +159,12 @@ func (handler *ClusterHandler) MigrateSlot(c *gin.Context) { // and then use a loop and call migrateSlot multiple times depending on req.Slot err = cluster.MigrateSlot(c, req.Slot, req.Target, req.SlotOnly) if errors.Is(err, consts.ErrShardSlotIsMigrating) { + err = nil log.Info("slot was migrating but we're going to queue it up") cluster.MigrationQueue.Enqueue(store.Migration{Target: req.Target, Slot: req.Slot, SlotOnly: req.SlotOnly}) - helper.ResponseOK(c, gin.H{"cluster": cluster}) + // helper.ResponseOK(c, gin.H{"cluster": cluster}) } + if err != nil { helper.ResponseError(c, err) return @@ -171,7 +173,7 @@ func (handler *ClusterHandler) MigrateSlot(c *gin.Context) { if req.SlotOnly { err = handler.s.UpdateCluster(c, namespace, cluster) } else { - // The version should be increased after the slot migration is done + log.Info("setting cluster from MigrateSlot call") err = handler.s.SetCluster(c, namespace, cluster) } if err != nil { diff --git a/store/cluster.go b/store/cluster.go index 2f0d058b..eecb02be 100644 --- a/store/cluster.go +++ b/store/cluster.go @@ -31,6 +31,8 @@ import ( "sync/atomic" "github.com/apache/kvrocks-controller/consts" + "github.com/apache/kvrocks-controller/logger" + "go.uber.org/zap" ) type Cluster struct { @@ -67,6 +69,8 @@ func (m *MigrationQueue) Dequeue() (Migration, bool) { } func (m *MigrationQueue) Available() bool { + log := logger.Get() + log.Info("length", zap.Int("len", len(m.Data))) return len(m.Data) > 0 } diff --git a/store/store.go b/store/store.go index fc319fcc..7a9eae05 100644 --- a/store/store.go +++ b/store/store.go @@ -24,9 +24,10 @@ import ( "context" "encoding/json" "fmt" + "sync" + "github.com/apache/kvrocks-controller/logger" "go.uber.org/zap" - "sync" "github.com/apache/kvrocks-controller/consts" "github.com/apache/kvrocks-controller/store/engine" @@ -186,6 +187,9 @@ func (s *ClusterStore) UpdateCluster(ctx context.Context, ns string, clusterInfo return fmt.Errorf("the cluster has 
been updated by others") } + // We want the most up to date queue + clusterInfo.MigrationQueue = oldCluster.MigrationQueue + clusterInfo.Version.Add(1) clusterBytes, err := json.Marshal(clusterInfo) if err != nil { @@ -207,6 +211,9 @@ func (s *ClusterStore) UpdateCluster(ctx context.Context, ns string, clusterInfo // SetCluster set the cluster to store under the specified namespace but won't increase the version. func (s *ClusterStore) SetCluster(ctx context.Context, ns string, clusterInfo *Cluster) error { + log := logger.Get().With( + zap.String("inside", "hello")) + log.Info("setting cluster") lock := s.getLock(ns, clusterInfo.Name) lock.Lock() defer lock.Unlock() From 10c368cad2247b7e49a12dbe1c4221afde149826 Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Thu, 31 Jul 2025 15:16:57 -0600 Subject: [PATCH 08/31] removes old logging --- controller/cluster.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/controller/cluster.go b/controller/cluster.go index 85180545..8473b7c8 100755 --- a/controller/cluster.go +++ b/controller/cluster.go @@ -316,13 +316,7 @@ func (c *ClusterChecker) probeLoop() { func (c *ClusterChecker) updateCluster(cluster *store.Cluster) { c.clusterMu.Lock() - log := logger.Get().With( - zap.String("cluster", cluster.Name)) - oldQueue := c.cluster.MigrationQueue - log.Info("old queue length", zap.Int("len", len(oldQueue.Data))) c.cluster = cluster - c.cluster.MigrationQueue = oldQueue - log.Info("new cluster queue length", zap.Int("len", len(c.cluster.MigrationQueue.Data))) c.clusterMu.Unlock() } From cd45e8f8d3276ef12e3342b7688a8d8f8d88f93c Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Fri, 1 Aug 2025 14:50:50 -0600 Subject: [PATCH 09/31] adds additional kvrocks nodes to test parallel and sequential migrations --- controller/cluster_test.go | 64 +++++++++++++++++++++++++++++++ scripts/docker/docker-compose.yml | 2 + scripts/docker/kvrocks/Dockerfile | 8 +++- scripts/setup.sh | 2 +- server/api/cluster.go | 9 ----- store/cluster.go | 48 ++++++++++++++--------- store/cluster_test.go | 47 ++++++++++++++++++++++- 7 files changed, 147 insertions(+), 33 deletions(-) diff --git a/controller/cluster_test.go b/controller/cluster_test.go index a1a720a6..5065e79b 100644 --- a/controller/cluster_test.go +++ b/controller/cluster_test.go @@ -235,3 +235,67 @@ func TestCluster_MigrateSlot(t *testing.T) { defer ticker.Stop() <-ticker.C } + +func TestCluster_MigrateQueuedSlot(t *testing.T) { + ctx := context.Background() + ns := "test-ns" + clusterName := "test-clusterProbe" + nodes := []string{"127.0.0.1:7770", "127.0.0.1:7771", "127.0.0.1:7772", "127.0.0.1:7773"} + // nodes := []string{"127.0.0.1:7770", "127.0.0.1:7771"} + cluster, err := store.NewCluster(clusterName, nodes, 1) + require.NoError(t, err) + + require.NoError(t, cluster.Reset(ctx)) + require.NoError(t, cluster.SyncToNodes(ctx)) + defer func() { + require.NoError(t, cluster.Reset(ctx)) + }() + slotRange, err := store.NewSlotRange(1, 1) + require.NoError(t, err) + require.NoError(t, cluster.MigrateSlot(ctx, slotRange, 2, false)) + slotRange, err = store.NewSlotRange(4097, 4097) + require.NoError(t, err) + require.NoError(t, cluster.MigrateSlot(ctx, slotRange, 3, false)) + slotRange, err = store.NewSlotRange(2, 2) + require.NoError(t, err) + require.NoError(t, cluster.MigrateSlot(ctx, slotRange, 2, false)) + slotRange, err = store.NewSlotRange(4098, 4098) + require.NoError(t, err) + require.NoError(t, cluster.MigrateSlot(ctx, slotRange, 3, false)) + + // only 2 should be queue'd because slot(1,1) and slot(4097,4097) 
should have no + // shard conflict and can run in parallel + // slot(2,2) and slot(4098,4098) will be blocked and queue'd up + require.Equal(t, 2, len(cluster.MigrationQueue.Data), "expected migration queue to be len 2") + + s := NewMockClusterStore() + require.NoError(t, s.CreateCluster(ctx, ns, cluster)) + + clusterProbe := NewClusterChecker(s, ns, clusterName) + clusterProbe.WithPingInterval(200 * time.Millisecond) // minimum is 200 milliseconds + clusterProbe.Start() + defer clusterProbe.Close() + + ticker := time.NewTicker(4000 * time.Millisecond) + defer ticker.Stop() + <-ticker.C + cluster, err = s.GetCluster(ctx, ns, clusterName) + require.NoError(t, err) + require.Equal(t, 0, len(cluster.MigrationQueue.Data), "expected migration queue to be len 0") + + // we expect slots 1, and 2 on shard 2, from shard 0 + expectedSlotRange1, err := store.NewSlotRange(1, 2) + require.NoError(t, err) + expectedSlotRange2, err := store.NewSlotRange(8192, 12287) + require.NoError(t, err) + expectedSlotRanges := []store.SlotRange{expectedSlotRange1, expectedSlotRange2} + require.Equal(t, expectedSlotRanges, cluster.Shards[2].SlotRanges) + + // we expect slots 4097, and 4098 on shard 3 from shard 1 + expectedSlotRange1, err = store.NewSlotRange(4097, 4098) + require.NoError(t, err) + expectedSlotRange2, err = store.NewSlotRange(12288, 16383) + require.NoError(t, err) + expectedSlotRanges = []store.SlotRange{expectedSlotRange1, expectedSlotRange2} + require.Equal(t, expectedSlotRanges, cluster.Shards[3].SlotRanges) +} diff --git a/scripts/docker/docker-compose.yml b/scripts/docker/docker-compose.yml index d4e51dd4..d098933d 100644 --- a/scripts/docker/docker-compose.yml +++ b/scripts/docker/docker-compose.yml @@ -27,6 +27,8 @@ services: ports: - "7770:7770" - "7771:7771" + - "7772:7772" + - "7773:7773" etcd0: image: "quay.io/coreos/etcd:v3.5.17" diff --git a/scripts/docker/kvrocks/Dockerfile b/scripts/docker/kvrocks/Dockerfile index b3fef976..01300ddf 100644 --- a/scripts/docker/kvrocks/Dockerfile +++ b/scripts/docker/kvrocks/Dockerfile @@ -1,12 +1,16 @@ FROM apache/kvrocks:latest USER root -RUN mkdir /tmp/kvrocks7770 /tmp/kvrocks7771 +RUN mkdir /tmp/kvrocks7770 /tmp/kvrocks7771 /tmp/kvrocks7772 /tmp/kvrocks7773 RUN echo "kvrocks -c /var/lib/kvrocks/kvrocks.conf --port 7770 --dir /tmp/kvrocks7770 --daemonize yes --cluster-enabled yes --bind 0.0.0.0" >> start.sh -RUN echo "kvrocks -c /var/lib/kvrocks/kvrocks.conf --port 7771 --dir /tmp/kvrocks7771 --cluster-enabled yes --bind 0.0.0.0" >> start.sh +RUN echo "kvrocks -c /var/lib/kvrocks/kvrocks.conf --port 7771 --dir /tmp/kvrocks7771 --daemonize yes --cluster-enabled yes --bind 0.0.0.0" >> start.sh +RUN echo "kvrocks -c /var/lib/kvrocks/kvrocks.conf --port 7772 --dir /tmp/kvrocks7772 --daemonize yes --cluster-enabled yes --bind 0.0.0.0" >> start.sh +RUN echo "kvrocks -c /var/lib/kvrocks/kvrocks.conf --port 7773 --dir /tmp/kvrocks7773 --cluster-enabled yes --bind 0.0.0.0" >> start.sh RUN chmod +x start.sh EXPOSE 7770:7770 EXPOSE 7771:7771 +EXPOSE 7772:7772 +EXPOSE 7773:7773 ENTRYPOINT ["sh","start.sh"] diff --git a/scripts/setup.sh b/scripts/setup.sh index 6a23d336..94278724 100644 --- a/scripts/setup.sh +++ b/scripts/setup.sh @@ -16,4 +16,4 @@ # under the License. # -cd docker && docker compose -p kvrocks-controller up -d --force-recreate && cd .. +cd docker && docker compose -p kvrocks-controller up -d --force-recreate --build kvrocks0 && cd .. 
diff --git a/server/api/cluster.go b/server/api/cluster.go index d48333a4..12eb9ee1 100644 --- a/server/api/cluster.go +++ b/server/api/cluster.go @@ -155,16 +155,7 @@ func (handler *ClusterHandler) MigrateSlot(c *gin.Context) { zap.String("cluster", cluster.Name)) log.Info("migrate slot!") - // TODO: Need to modify MigrateSlot to be able to add to queue. - // and then use a loop and call migrateSlot multiple times depending on req.Slot err = cluster.MigrateSlot(c, req.Slot, req.Target, req.SlotOnly) - if errors.Is(err, consts.ErrShardSlotIsMigrating) { - err = nil - log.Info("slot was migrating but we're going to queue it up") - cluster.MigrationQueue.Enqueue(store.Migration{Target: req.Target, Slot: req.Slot, SlotOnly: req.SlotOnly}) - // helper.ResponseOK(c, gin.H{"cluster": cluster}) - } - if err != nil { helper.ResponseError(c, err) return diff --git a/store/cluster.go b/store/cluster.go index eecb02be..3e9e894a 100644 --- a/store/cluster.go +++ b/store/cluster.go @@ -55,6 +55,14 @@ type Migration struct { SlotOnly bool `json:"slot_only"` } +func (m *MigrationQueue) Clone() MigrationQueue { + q := MigrationQueue{ + Data: make([]Migration, len(m.Data)), + } + copy(q.Data, m.Data) + return q +} + func (m *MigrationQueue) Enqueue(migration Migration) { m.Data = append(m.Data, migration) } @@ -69,8 +77,6 @@ func (m *MigrationQueue) Dequeue() (Migration, bool) { } func (m *MigrationQueue) Available() bool { - log := logger.Get() - log.Info("length", zap.Int("len", len(m.Data))) return len(m.Data) > 0 } @@ -118,8 +124,9 @@ func NewCluster(name string, nodes []string, replicas int) (*Cluster, error) { func (cluster *Cluster) Clone() *Cluster { clone := &Cluster{ - Name: cluster.Name, - Shards: make([]*Shard, 0), + Name: cluster.Name, + Shards: make([]*Shard, 0), + MigrationQueue: cluster.MigrationQueue.Clone(), } clone.Version.Store(cluster.Version.Load()) for _, shard := range cluster.Shards { @@ -235,31 +242,28 @@ func (cluster *Cluster) findShardIndexBySlot(slot SlotRange) (int, error) { } func (cluster *Cluster) MigrateAvailableSlots(ctx context.Context) error { + log := logger.Get() + log.Info("about to migrate available slots") if !cluster.MigrationQueue.Available() { return consts.ErrNoMigrationsAvailable } - var newQueue []Migration - for cluster.MigrationQueue.Available() { - request, ok := cluster.MigrationQueue.Dequeue() - if !ok { - break - } + log.Info("about to go through the loop") + queueCopy := cluster.MigrationQueue.Clone().Data + cluster.MigrationQueue.Clear() + for _, request := range queueCopy { + log.Info("dequeue", zap.String("request", request.Slot.String())) err := cluster.MigrateSlot(ctx, request.Slot, request.Target, request.SlotOnly) - if errors.Is(err, consts.ErrShardSlotIsMigrating) { - newQueue = append(newQueue, request) + if err != nil { + return err } } - // any of the requests that got rejected because shard is already migrating will be put - // into a queue again - for i := 0; i < len(newQueue); i++ { - cluster.MigrationQueue.Enqueue(newQueue[i]) - } return nil } func (cluster *Cluster) MigrateSlot(ctx context.Context, slot SlotRange, targetShardIdx int, slotOnly bool) error { + log := logger.Get() if targetShardIdx < 0 || targetShardIdx >= len(cluster.Shards) { return consts.ErrIndexOutOfRange } @@ -279,8 +283,14 @@ func (cluster *Cluster) MigrateSlot(ctx context.Context, slot SlotRange, targetS } if cluster.Shards[sourceShardIdx].IsMigrating() || cluster.Shards[targetShardIdx].IsMigrating() { - // cluster.MigrationQueue.Add(Migration{Target: 
targetShardIdx, Slot: slot, SlotOnly: slotOnly}) - return consts.ErrShardSlotIsMigrating + log.Info( + "source or target shard is migrating, queueing up", + zap.String("slot", slot.String()), + zap.Int("source", sourceShardIdx), + zap.Int("target", targetShardIdx), + ) + cluster.MigrationQueue.Enqueue(Migration{Target: targetShardIdx, Slot: slot, SlotOnly: slotOnly}) + return nil } // Send the migration command to the source node sourceMasterNode := cluster.Shards[sourceShardIdx].GetMasterNode() diff --git a/store/cluster_test.go b/store/cluster_test.go index 975f03f6..11aa3962 100644 --- a/store/cluster_test.go +++ b/store/cluster_test.go @@ -22,11 +22,11 @@ package store import ( "context" + "reflect" "testing" - "github.com/stretchr/testify/require" - "github.com/apache/kvrocks-controller/consts" + "github.com/stretchr/testify/require" ) func TestCluster_Clone(t *testing.T) { @@ -106,3 +106,46 @@ func TestCluster_PromoteNewMaster(t *testing.T) { require.NoError(t, err) require.Equal(t, node2.ID(), newMasterID) } + +func TestMigrationQueue_Dequeue(t *testing.T) { + type fields struct { + Data []Migration + } + tests := []struct { + name string + fields fields + want Migration + ok bool + }{ + { + name: "single item dequeue", + fields: fields{ + Data: []Migration{{Target: 1}}, + }, + want: Migration{Target: 1}, + ok: true, + }, + { + name: "empty dequeue", + fields: fields{ + Data: []Migration{}, + }, + want: Migration{}, + ok: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := &MigrationQueue{ + Data: tt.fields.Data, + } + got, ok := m.Dequeue() + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("MigrationQueue.Dequeue() got = %v, want %v", got, tt.want) + } + if ok != tt.ok { + t.Errorf("MigrationQueue.Dequeue() got1 = %v, want %v", ok, tt.ok) + } + }) + } +} From c6c1e19a7188d4e4adece8274ca8852cbfbd8260 Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Fri, 1 Aug 2025 15:38:36 -0600 Subject: [PATCH 10/31] fixes up some tests --- controller/cluster_test.go | 17 ++++++++++------- scripts/setup.sh | 2 +- server/api/cluster.go | 7 +++---- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/controller/cluster_test.go b/controller/cluster_test.go index 5065e79b..f9769612 100644 --- a/controller/cluster_test.go +++ b/controller/cluster_test.go @@ -241,7 +241,6 @@ func TestCluster_MigrateQueuedSlot(t *testing.T) { ns := "test-ns" clusterName := "test-clusterProbe" nodes := []string{"127.0.0.1:7770", "127.0.0.1:7771", "127.0.0.1:7772", "127.0.0.1:7773"} - // nodes := []string{"127.0.0.1:7770", "127.0.0.1:7771"} cluster, err := store.NewCluster(clusterName, nodes, 1) require.NoError(t, err) @@ -250,16 +249,20 @@ func TestCluster_MigrateQueuedSlot(t *testing.T) { defer func() { require.NoError(t, cluster.Reset(ctx)) }() + + // migrate slots from shard 0 to shard 2 slotRange, err := store.NewSlotRange(1, 1) require.NoError(t, err) require.NoError(t, cluster.MigrateSlot(ctx, slotRange, 2, false)) - slotRange, err = store.NewSlotRange(4097, 4097) - require.NoError(t, err) - require.NoError(t, cluster.MigrateSlot(ctx, slotRange, 3, false)) slotRange, err = store.NewSlotRange(2, 2) require.NoError(t, err) require.NoError(t, cluster.MigrateSlot(ctx, slotRange, 2, false)) - slotRange, err = store.NewSlotRange(4098, 4098) + + // migrate slots from shard 1 to shard 3 + slotRange, err = store.NewSlotRange(4096, 4096) + require.NoError(t, err) + require.NoError(t, cluster.MigrateSlot(ctx, slotRange, 3, false)) + slotRange, err = store.NewSlotRange(4097, 4097) 
require.NoError(t, err) require.NoError(t, cluster.MigrateSlot(ctx, slotRange, 3, false)) @@ -276,7 +279,7 @@ func TestCluster_MigrateQueuedSlot(t *testing.T) { clusterProbe.Start() defer clusterProbe.Close() - ticker := time.NewTicker(4000 * time.Millisecond) + ticker := time.NewTicker(6000 * time.Millisecond) defer ticker.Stop() <-ticker.C cluster, err = s.GetCluster(ctx, ns, clusterName) @@ -292,7 +295,7 @@ func TestCluster_MigrateQueuedSlot(t *testing.T) { require.Equal(t, expectedSlotRanges, cluster.Shards[2].SlotRanges) // we expect slots 4097, and 4098 on shard 3 from shard 1 - expectedSlotRange1, err = store.NewSlotRange(4097, 4098) + expectedSlotRange1, err = store.NewSlotRange(4096, 4097) require.NoError(t, err) expectedSlotRange2, err = store.NewSlotRange(12288, 16383) require.NoError(t, err) diff --git a/scripts/setup.sh b/scripts/setup.sh index 94278724..6a23d336 100644 --- a/scripts/setup.sh +++ b/scripts/setup.sh @@ -16,4 +16,4 @@ # under the License. # -cd docker && docker compose -p kvrocks-controller up -d --force-recreate --build kvrocks0 && cd .. +cd docker && docker compose -p kvrocks-controller up -d --force-recreate && cd .. diff --git a/server/api/cluster.go b/server/api/cluster.go index 12eb9ee1..5f7fd798 100644 --- a/server/api/cluster.go +++ b/server/api/cluster.go @@ -36,10 +36,9 @@ import ( ) type MigrateSlotRequest struct { - Target int `json:"target" validate:"required"` - Slot store.SlotRange `json:"slot" validate:"required"` - // Slot store.SlotRanges `json:"slot" validate:"required"` // TODO: we need to make the unmarshal work for ranges still - SlotOnly bool `json:"slot_only"` + Target int `json:"target" validate:"required"` + Slot store.SlotRange `json:"slot" validate:"required"` + SlotOnly bool `json:"slot_only"` } type CreateClusterRequest struct { From 7323879769ca8e94a734fe8b388d5685653f3d34 Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Fri, 1 Aug 2025 16:11:23 -0600 Subject: [PATCH 11/31] updates log info --- store/cluster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/cluster.go b/store/cluster.go index 3e9e894a..05895350 100644 --- a/store/cluster.go +++ b/store/cluster.go @@ -252,7 +252,7 @@ func (cluster *Cluster) MigrateAvailableSlots(ctx context.Context) error { queueCopy := cluster.MigrationQueue.Clone().Data cluster.MigrationQueue.Clear() for _, request := range queueCopy { - log.Info("dequeue", zap.String("request", request.Slot.String())) + log.Info("migrating", zap.String("request", request.Slot.String())) err := cluster.MigrateSlot(ctx, request.Slot, request.Target, request.SlotOnly) if err != nil { return err From 165decec2db83e32d65cdf0584395b6620cf2a0f Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Fri, 1 Aug 2025 16:11:32 -0600 Subject: [PATCH 12/31] comments out migrateslot test --- controller/cluster_test.go | 74 +++++++++++++++++++------------------- 1 file changed, 37 insertions(+), 37 deletions(-) diff --git a/controller/cluster_test.go b/controller/cluster_test.go index f9769612..2bae6402 100644 --- a/controller/cluster_test.go +++ b/controller/cluster_test.go @@ -207,34 +207,34 @@ func TestCluster_LoadAndProbe(t *testing.T) { } } -func TestCluster_MigrateSlot(t *testing.T) { - ctx := context.Background() - ns := "test-ns" - clusterName := "test-clusterProbe" - cluster, err := store.NewCluster(clusterName, []string{"127.0.0.1:7770", "127.0.0.1:7771"}, 1) - require.NoError(t, err) - - require.NoError(t, cluster.Reset(ctx)) - require.NoError(t, cluster.SyncToNodes(ctx)) - defer func() { - 
require.NoError(t, cluster.Reset(ctx)) - }() - slotRange, err := store.NewSlotRange(0, 0) - require.NoError(t, err) - require.NoError(t, cluster.MigrateSlot(ctx, slotRange, 1, false)) - - s := NewMockClusterStore() - require.NoError(t, s.CreateCluster(ctx, ns, cluster)) - - clusterProbe := NewClusterChecker(s, ns, clusterName) - clusterProbe.WithPingInterval(100 * time.Millisecond) - clusterProbe.Start() - defer clusterProbe.Close() - - ticker := time.NewTicker(400 * time.Millisecond) - defer ticker.Stop() - <-ticker.C -} +// func TestCluster_MigrateSlot(t *testing.T) { +// ctx := context.Background() +// ns := "test-ns" +// clusterName := "test-clusterProbe" +// cluster, err := store.NewCluster(clusterName, []string{"127.0.0.1:7770", "127.0.0.1:7771"}, 1) +// require.NoError(t, err) +// +// require.NoError(t, cluster.Reset(ctx)) +// require.NoError(t, cluster.SyncToNodes(ctx)) +// defer func() { +// require.NoError(t, cluster.Reset(ctx)) +// }() +// slotRange, err := store.NewSlotRange(0, 0) +// require.NoError(t, err) +// require.NoError(t, cluster.MigrateSlot(ctx, slotRange, 1, false)) +// +// s := NewMockClusterStore() +// require.NoError(t, s.CreateCluster(ctx, ns, cluster)) +// +// clusterProbe := NewClusterChecker(s, ns, clusterName) +// clusterProbe.WithPingInterval(100 * time.Millisecond) +// clusterProbe.Start() +// defer clusterProbe.Close() +// +// ticker := time.NewTicker(2000 * time.Millisecond) +// defer ticker.Stop() +// <-ticker.C +// } func TestCluster_MigrateQueuedSlot(t *testing.T) { ctx := context.Background() @@ -251,20 +251,20 @@ func TestCluster_MigrateQueuedSlot(t *testing.T) { }() // migrate slots from shard 0 to shard 2 - slotRange, err := store.NewSlotRange(1, 1) + slotRange1, err := store.NewSlotRange(1, 1) require.NoError(t, err) - require.NoError(t, cluster.MigrateSlot(ctx, slotRange, 2, false)) - slotRange, err = store.NewSlotRange(2, 2) + require.NoError(t, cluster.MigrateSlot(ctx, slotRange1, 2, false)) + slotRange2, err := store.NewSlotRange(2, 2) require.NoError(t, err) - require.NoError(t, cluster.MigrateSlot(ctx, slotRange, 2, false)) + require.NoError(t, cluster.MigrateSlot(ctx, slotRange2, 2, false)) // migrate slots from shard 1 to shard 3 - slotRange, err = store.NewSlotRange(4096, 4096) + slotRange3, err := store.NewSlotRange(4097, 4097) require.NoError(t, err) - require.NoError(t, cluster.MigrateSlot(ctx, slotRange, 3, false)) - slotRange, err = store.NewSlotRange(4097, 4097) + require.NoError(t, cluster.MigrateSlot(ctx, slotRange3, 3, false)) + slotRange4, err := store.NewSlotRange(4098, 4098) require.NoError(t, err) - require.NoError(t, cluster.MigrateSlot(ctx, slotRange, 3, false)) + require.NoError(t, cluster.MigrateSlot(ctx, slotRange4, 3, false)) // only 2 should be queue'd because slot(1,1) and slot(4097,4097) should have no // shard conflict and can run in parallel @@ -295,7 +295,7 @@ func TestCluster_MigrateQueuedSlot(t *testing.T) { require.Equal(t, expectedSlotRanges, cluster.Shards[2].SlotRanges) // we expect slots 4097, and 4098 on shard 3 from shard 1 - expectedSlotRange1, err = store.NewSlotRange(4096, 4097) + expectedSlotRange1, err = store.NewSlotRange(4097, 4098) require.NoError(t, err) expectedSlotRange2, err = store.NewSlotRange(12288, 16383) require.NoError(t, err) From 881cde2dea2dd1d1471f3666582c255a2d27f955 Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Fri, 1 Aug 2025 16:54:27 -0600 Subject: [PATCH 13/31] fixed test --- controller/cluster_test.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) 
diff --git a/controller/cluster_test.go b/controller/cluster_test.go index 2bae6402..682c8123 100644 --- a/controller/cluster_test.go +++ b/controller/cluster_test.go @@ -250,19 +250,19 @@ func TestCluster_MigrateQueuedSlot(t *testing.T) { require.NoError(t, cluster.Reset(ctx)) }() - // migrate slots from shard 0 to shard 2 + // migrate slots from shard 0 to shard 1 slotRange1, err := store.NewSlotRange(1, 1) require.NoError(t, err) - require.NoError(t, cluster.MigrateSlot(ctx, slotRange1, 2, false)) + require.NoError(t, cluster.MigrateSlot(ctx, slotRange1, 1, false)) slotRange2, err := store.NewSlotRange(2, 2) require.NoError(t, err) - require.NoError(t, cluster.MigrateSlot(ctx, slotRange2, 2, false)) + require.NoError(t, cluster.MigrateSlot(ctx, slotRange2, 1, false)) - // migrate slots from shard 1 to shard 3 - slotRange3, err := store.NewSlotRange(4097, 4097) + // migrate slots from shard 2 to shard 3 + slotRange3, err := store.NewSlotRange(8192, 8192) require.NoError(t, err) require.NoError(t, cluster.MigrateSlot(ctx, slotRange3, 3, false)) - slotRange4, err := store.NewSlotRange(4098, 4098) + slotRange4, err := store.NewSlotRange(8193, 8193) require.NoError(t, err) require.NoError(t, cluster.MigrateSlot(ctx, slotRange4, 3, false)) @@ -289,13 +289,13 @@ func TestCluster_MigrateQueuedSlot(t *testing.T) { // we expect slots 1, and 2 on shard 2, from shard 0 expectedSlotRange1, err := store.NewSlotRange(1, 2) require.NoError(t, err) - expectedSlotRange2, err := store.NewSlotRange(8192, 12287) + expectedSlotRange2, err := store.NewSlotRange(4096, 8191) require.NoError(t, err) expectedSlotRanges := []store.SlotRange{expectedSlotRange1, expectedSlotRange2} - require.Equal(t, expectedSlotRanges, cluster.Shards[2].SlotRanges) + require.Equal(t, expectedSlotRanges, cluster.Shards[1].SlotRanges) // we expect slots 4097, and 4098 on shard 3 from shard 1 - expectedSlotRange1, err = store.NewSlotRange(4097, 4098) + expectedSlotRange1, err = store.NewSlotRange(8192, 8193) require.NoError(t, err) expectedSlotRange2, err = store.NewSlotRange(12288, 16383) require.NoError(t, err) From d46121fc02f849e4de88dca5f03b1a070d0990b4 Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Fri, 1 Aug 2025 16:56:33 -0600 Subject: [PATCH 14/31] adds old test back in --- controller/cluster_test.go | 56 +++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/controller/cluster_test.go b/controller/cluster_test.go index 682c8123..9bec9a49 100644 --- a/controller/cluster_test.go +++ b/controller/cluster_test.go @@ -207,34 +207,34 @@ func TestCluster_LoadAndProbe(t *testing.T) { } } -// func TestCluster_MigrateSlot(t *testing.T) { -// ctx := context.Background() -// ns := "test-ns" -// clusterName := "test-clusterProbe" -// cluster, err := store.NewCluster(clusterName, []string{"127.0.0.1:7770", "127.0.0.1:7771"}, 1) -// require.NoError(t, err) -// -// require.NoError(t, cluster.Reset(ctx)) -// require.NoError(t, cluster.SyncToNodes(ctx)) -// defer func() { -// require.NoError(t, cluster.Reset(ctx)) -// }() -// slotRange, err := store.NewSlotRange(0, 0) -// require.NoError(t, err) -// require.NoError(t, cluster.MigrateSlot(ctx, slotRange, 1, false)) -// -// s := NewMockClusterStore() -// require.NoError(t, s.CreateCluster(ctx, ns, cluster)) -// -// clusterProbe := NewClusterChecker(s, ns, clusterName) -// clusterProbe.WithPingInterval(100 * time.Millisecond) -// clusterProbe.Start() -// defer clusterProbe.Close() -// -// ticker := time.NewTicker(2000 * time.Millisecond) -// defer 
ticker.Stop() -// <-ticker.C -// } +func TestCluster_MigrateSlot(t *testing.T) { + ctx := context.Background() + ns := "test-ns" + clusterName := "test-clusterProbe" + cluster, err := store.NewCluster(clusterName, []string{"127.0.0.1:7770", "127.0.0.1:7771"}, 1) + require.NoError(t, err) + + require.NoError(t, cluster.Reset(ctx)) + require.NoError(t, cluster.SyncToNodes(ctx)) + defer func() { + require.NoError(t, cluster.Reset(ctx)) + }() + slotRange, err := store.NewSlotRange(0, 0) + require.NoError(t, err) + require.NoError(t, cluster.MigrateSlot(ctx, slotRange, 1, false)) + + s := NewMockClusterStore() + require.NoError(t, s.CreateCluster(ctx, ns, cluster)) + + clusterProbe := NewClusterChecker(s, ns, clusterName) + clusterProbe.WithPingInterval(100 * time.Millisecond) + clusterProbe.Start() + defer clusterProbe.Close() + + ticker := time.NewTicker(2000 * time.Millisecond) + defer ticker.Stop() + <-ticker.C +} func TestCluster_MigrateQueuedSlot(t *testing.T) { ctx := context.Background() From 5e91e3e9798243b7f27f6839a74ef146d2ed0b95 Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Fri, 1 Aug 2025 17:14:33 -0600 Subject: [PATCH 15/31] removes external docker network and removes some logging --- scripts/docker/docker-compose.yml | 1 - store/store.go | 6 ++---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/scripts/docker/docker-compose.yml b/scripts/docker/docker-compose.yml index d098933d..40af151f 100644 --- a/scripts/docker/docker-compose.yml +++ b/scripts/docker/docker-compose.yml @@ -82,4 +82,3 @@ services: networks: kvrocks-dev: - external: true diff --git a/store/store.go b/store/store.go index 7a9eae05..84951c37 100644 --- a/store/store.go +++ b/store/store.go @@ -187,7 +187,8 @@ func (s *ClusterStore) UpdateCluster(ctx context.Context, ns string, clusterInfo return fmt.Errorf("the cluster has been updated by others") } - // We want the most up to date queue + // We want the most up to date queue. The oldCluster could have had updates to + // the migration queue clusterInfo.MigrationQueue = oldCluster.MigrationQueue clusterInfo.Version.Add(1) @@ -211,9 +212,6 @@ func (s *ClusterStore) UpdateCluster(ctx context.Context, ns string, clusterInfo // SetCluster set the cluster to store under the specified namespace but won't increase the version. func (s *ClusterStore) SetCluster(ctx context.Context, ns string, clusterInfo *Cluster) error { - log := logger.Get().With( - zap.String("inside", "hello")) - log.Info("setting cluster") lock := s.getLock(ns, clusterInfo.Name) lock.Lock() defer lock.Unlock() From 323879d53506b94b0fa44453bd25480ac3ad52b6 Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Sat, 2 Aug 2025 02:22:53 -0600 Subject: [PATCH 16/31] removes some more logging --- controller/cluster.go | 2 +- store/cluster.go | 6 ------ 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/controller/cluster.go b/controller/cluster.go index 8473b7c8..07a31e4f 100755 --- a/controller/cluster.go +++ b/controller/cluster.go @@ -378,7 +378,7 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu } c.updateCluster(clonedCluster) - log.Info("checking if migration queue is available", zap.String("debug", "byron")) + log.Info("Check if migration queue is available") if clonedCluster.MigrationQueue.Available() { log.Info("should be available, should trigger some more? 
", zap.String("debug", "byron")) err = clonedCluster.MigrateAvailableSlots(ctx) diff --git a/store/cluster.go b/store/cluster.go index 05895350..2887967d 100644 --- a/store/cluster.go +++ b/store/cluster.go @@ -242,23 +242,17 @@ func (cluster *Cluster) findShardIndexBySlot(slot SlotRange) (int, error) { } func (cluster *Cluster) MigrateAvailableSlots(ctx context.Context) error { - log := logger.Get() - log.Info("about to migrate available slots") if !cluster.MigrationQueue.Available() { return consts.ErrNoMigrationsAvailable } - - log.Info("about to go through the loop") queueCopy := cluster.MigrationQueue.Clone().Data cluster.MigrationQueue.Clear() for _, request := range queueCopy { - log.Info("migrating", zap.String("request", request.Slot.String())) err := cluster.MigrateSlot(ctx, request.Slot, request.Target, request.SlotOnly) if err != nil { return err } } - return nil } From 1e7158f58ad1c43c3589209601056610ff5bd2a3 Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Mon, 4 Aug 2025 14:35:45 -0600 Subject: [PATCH 17/31] adds a way to delete migration queue --- cmd/client/command/migrate.go | 21 ++++++++++++++++++--- server/api/cluster.go | 22 ++++++++++++++++++++++ server/route.go | 1 + store/cluster_node.go | 3 --- 4 files changed, 41 insertions(+), 6 deletions(-) diff --git a/cmd/client/command/migrate.go b/cmd/client/command/migrate.go index 74f4be33..65b21aab 100644 --- a/cmd/client/command/migrate.go +++ b/cmd/client/command/migrate.go @@ -59,7 +59,7 @@ kvctl migrate cancel -n -c case MigrateSlot: return migrateSlot(client, &migrateOptions) case MigrateCancel: - return migrateCancel(client) + return migrateCancel(client, &migrateOptions) default: return fmt.Errorf("unsupported resource type: %s", resource) } @@ -68,8 +68,23 @@ kvctl migrate cancel -n -c SilenceErrors: true, } -func migrateCancel(_ *client) error { - // TODO: STUB bseto need to add the cancel logic still +func migrateCancel(client *client, options *MigrationOptions) error { + rsp, err := client.restyCli.R(). + SetPathParam("namespace", options.namespace). + SetPathParam("cluster", options.cluster). + SetBody(map[string]interface{}{ + "slot": options.slot, + "target": options.target, + "slotOnly": strconv.FormatBool(options.slotOnly), + }). 
+ Delete("/namespaces/{namespace}/clusters/{cluster}/migrate") + if err != nil { + return err + } + if rsp.IsError() { + return errors.New(rsp.String()) + } + printLine("migrate slot[%s] task is submitted successfully.", options.slot) return nil } diff --git a/server/api/cluster.go b/server/api/cluster.go index 5f7fd798..b7582a88 100644 --- a/server/api/cluster.go +++ b/server/api/cluster.go @@ -128,6 +128,28 @@ func (handler *ClusterHandler) Remove(c *gin.Context) { helper.ResponseNoContent(c) } +func (handler *ClusterHandler) DeleteMigrateQueue(c *gin.Context) { + namespace := c.Param("namespace") + clusterName := c.Param("cluster") + log := logger.Get().With( + zap.String("namespace", namespace), + zap.String("cluster", clusterName)) + log.Info("deleting migration queue") + + lock := handler.getLock(namespace, clusterName) + lock.Lock() + defer lock.Unlock() + + s, _ := c.MustGet(consts.ContextKeyStore).(*store.ClusterStore) + cluster, err := s.GetCluster(c, namespace, clusterName) + if err != nil { + helper.ResponseError(c, err) + return + } + cluster.MigrationQueue.Clear() + helper.ResponseOK(c, gin.H{"cluster": cluster}) +} + func (handler *ClusterHandler) MigrateSlot(c *gin.Context) { namespace := c.Param("namespace") clusterName := c.Param("cluster") diff --git a/server/route.go b/server/route.go index 109f8dd4..610465d4 100644 --- a/server/route.go +++ b/server/route.go @@ -70,6 +70,7 @@ func (srv *Server) initHandlers() { clusters.GET("/:cluster", middleware.RequiredCluster, handler.Cluster.Get) clusters.DELETE("/:cluster", middleware.RequiredCluster, handler.Cluster.Remove) clusters.POST("/:cluster/migrate", handler.Cluster.MigrateSlot) + clusters.DELETE("/:cluster/migrate", handler.Cluster.DeleteMigrateQueue) } shards := clusters.Group("/:cluster/shards") diff --git a/store/cluster_node.go b/store/cluster_node.go index 925c1b60..8d038d0d 100644 --- a/store/cluster_node.go +++ b/store/cluster_node.go @@ -149,9 +149,6 @@ func (n *ClusterNode) GetClient() *redis.Client { } } - // TODO: bseto consider making these configurable, I'm seeing a lot of errors - // whenever we try to update the cluster. Lots of client connection spam, could - // be related to the timeouts? 
client := redis.NewClient(&redis.Options{ Addr: n.addr, Password: n.password, From f9eb3b9714a485835c3329cf5024a1bfdac9a47f Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Mon, 4 Aug 2025 19:11:24 -0600 Subject: [PATCH 18/31] adds a way to cancel migration queue --- cmd/client/command/migrate.go | 17 +++++++++++------ server/api/cluster.go | 5 +++++ 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/cmd/client/command/migrate.go b/cmd/client/command/migrate.go index 65b21aab..2c91f111 100644 --- a/cmd/client/command/migrate.go +++ b/cmd/client/command/migrate.go @@ -92,6 +92,17 @@ func migrationPreRun(_ *cobra.Command, args []string) error { if len(args) < 1 { return fmt.Errorf("resource type should be specified") } + resource := strings.ToLower(args[0]) + if migrateOptions.namespace == "" { + return fmt.Errorf("namespace is required, please specify with -n or --namespace") + } + if migrateOptions.cluster == "" { + return fmt.Errorf("cluster is required, please specify with -c or --cluster") + } + // for migrate cancel, we only need namespace and cluster + if resource == MigrateCancel { + return nil + } if len(args) < 2 { return fmt.Errorf("the slot number should be specified") } @@ -101,12 +112,6 @@ func migrationPreRun(_ *cobra.Command, args []string) error { } migrateOptions.slot = args[1] - if migrateOptions.namespace == "" { - return fmt.Errorf("namespace is required, please specify with -n or --namespace") - } - if migrateOptions.cluster == "" { - return fmt.Errorf("cluster is required, please specify with -c or --cluster") - } if migrateOptions.target < 0 { return fmt.Errorf("target is required, please specify with --target") } diff --git a/server/api/cluster.go b/server/api/cluster.go index b7582a88..a269b401 100644 --- a/server/api/cluster.go +++ b/server/api/cluster.go @@ -147,6 +147,11 @@ func (handler *ClusterHandler) DeleteMigrateQueue(c *gin.Context) { return } cluster.MigrationQueue.Clear() + err = s.SetCluster(c, namespace, cluster) + if err != nil { + helper.ResponseError(c, err) + return + } helper.ResponseOK(c, gin.H{"cluster": cluster}) } From 0f94db95ba2ae613e500868535138c03247a533c Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Tue, 5 Aug 2025 10:25:50 -0600 Subject: [PATCH 19/31] removes some old logging --- controller/cluster.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/controller/cluster.go b/controller/cluster.go index 07a31e4f..091d11c0 100755 --- a/controller/cluster.go +++ b/controller/cluster.go @@ -364,7 +364,6 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu c.updateCluster(clonedCluster) log.Warn("Failed to migrate the slot", zap.String("slot", migratingSlot.String())) case "success": - log.Info("successful migration of slot", zap.String("debug", "byron")) clonedCluster.Shards[i].SlotRanges = store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges, shard.MigratingSlot.SlotRange) clonedCluster.Shards[shard.TargetShardIndex].SlotRanges = store.AddSlotToSlotRanges( clonedCluster.Shards[shard.TargetShardIndex].SlotRanges, shard.MigratingSlot.SlotRange, @@ -378,18 +377,18 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu } c.updateCluster(clonedCluster) - log.Info("Check if migration queue is available") if clonedCluster.MigrationQueue.Available() { - log.Info("should be available, should trigger some more? 
", zap.String("debug", "byron")) + log.Info("Migration queue is not empty, migrating queue'd requests") err = clonedCluster.MigrateAvailableSlots(ctx) if err != nil { - log.Error("not available?", zap.Error(err)) + log.Error("Unable to trigger", zap.Error(err)) continue } - c.clusterStore.SetCluster(ctx, c.namespace, clonedCluster) + if err := c.clusterStore.SetCluster(ctx, c.namespace, clonedCluster); err != nil { + log.Error("Failed to update the cluster", zap.Error(err)) + return + } c.updateCluster(clonedCluster) - } else { - log.Info("not available", zap.String("debug", "byron")) } default: clonedCluster.Shards[i].ClearMigrateState() From 2a31dc46d4d983f7a0d08ef8071237d6186b9bf0 Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Tue, 5 Aug 2025 18:41:00 -0600 Subject: [PATCH 20/31] adds a CanMigrate function to check if migrations are possible --- store/cluster.go | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/store/cluster.go b/store/cluster.go index 2887967d..b60fe21a 100644 --- a/store/cluster.go +++ b/store/cluster.go @@ -276,9 +276,9 @@ func (cluster *Cluster) MigrateSlot(ctx context.Context, slot SlotRange, targetS return nil } - if cluster.Shards[sourceShardIdx].IsMigrating() || cluster.Shards[targetShardIdx].IsMigrating() { + if !cluster.CanMigrate(sourceShardIdx, targetShardIdx) { log.Info( - "source or target shard is migrating, queueing up", + "source or target shard is already involved in a migration, queueing up a migration", zap.String("slot", slot.String()), zap.Int("source", sourceShardIdx), zap.Int("target", targetShardIdx), @@ -302,6 +302,22 @@ func (cluster *Cluster) MigrateSlot(ctx context.Context, slot SlotRange, targetS return nil } +func (cluster *Cluster) CanMigrate(sourceIdx, targetIdx int) bool { + // need to check if source or target is already migrating + if cluster.Shards[sourceIdx].IsMigrating() || cluster.Shards[targetIdx].IsMigrating() { + return false + } + + // also need to check if anything is migrating to the source, or the target too + for _, shard := range cluster.Shards { + if shard.TargetShardIndex == sourceIdx || shard.TargetShardIndex == targetIdx { + return false + } + } + + return true +} + func (cluster *Cluster) SetSlot(ctx context.Context, slot int, targetNodeID string) error { version := cluster.Version.Add(1) for i := 0; i < len(cluster.Shards); i++ { From 45636e8e288475085d8c054316cfc6254778f3e1 Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Tue, 12 Aug 2025 09:53:07 -0600 Subject: [PATCH 21/31] changes cancel to be clear instead --- cmd/client/command/consts.go | 4 ++-- cmd/client/command/migrate.go | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/cmd/client/command/consts.go b/cmd/client/command/consts.go index 1782d1d8..dd931d19 100644 --- a/cmd/client/command/consts.go +++ b/cmd/client/command/consts.go @@ -28,6 +28,6 @@ const ( ) const ( - MigrateSlot = "slot" - MigrateCancel = "cancel" + MigrateSlot = "slot" + MigrateClear = "clear" ) diff --git a/cmd/client/command/migrate.go b/cmd/client/command/migrate.go index 2c91f111..bb1ca3ca 100644 --- a/cmd/client/command/migrate.go +++ b/cmd/client/command/migrate.go @@ -47,8 +47,8 @@ var MigrateCommand = &cobra.Command{ # Migrate slot between cluster shards kvctl migrate slot --target -n -c -# Cancel the queue for migration - does not stop the current migration -kvctl migrate cancel -n -c +# Clear the queue for migration - does not stop the current migration +kvctl migrate clear -n -c `, PreRunE: migrationPreRun, 
RunE: func(cmd *cobra.Command, args []string) error { @@ -58,8 +58,8 @@ kvctl migrate cancel -n -c switch resource { case MigrateSlot: return migrateSlot(client, &migrateOptions) - case MigrateCancel: - return migrateCancel(client, &migrateOptions) + case MigrateClear: + return migrateClear(client, &migrateOptions) default: return fmt.Errorf("unsupported resource type: %s", resource) } @@ -68,7 +68,7 @@ kvctl migrate cancel -n -c SilenceErrors: true, } -func migrateCancel(client *client, options *MigrationOptions) error { +func migrateClear(client *client, options *MigrationOptions) error { rsp, err := client.restyCli.R(). SetPathParam("namespace", options.namespace). SetPathParam("cluster", options.cluster). @@ -99,8 +99,8 @@ func migrationPreRun(_ *cobra.Command, args []string) error { if migrateOptions.cluster == "" { return fmt.Errorf("cluster is required, please specify with -c or --cluster") } - // for migrate cancel, we only need namespace and cluster - if resource == MigrateCancel { + // for migrate clear, we only need namespace and cluster + if resource == MigrateClear { return nil } if len(args) < 2 { From 6aa2c67e560f838a88b20e398b5cf3ea008289ca Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Tue, 12 Aug 2025 13:42:23 -0600 Subject: [PATCH 22/31] adds a new starting hook for clusterChecker when the controller starts up, it can go through the queue and see if anything can be queue'd up --- controller/cluster.go | 62 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/controller/cluster.go b/controller/cluster.go index 091d11c0..794e3b1c 100755 --- a/controller/cluster.go +++ b/controller/cluster.go @@ -30,6 +30,7 @@ import ( "github.com/apache/kvrocks-controller/logger" "github.com/apache/kvrocks-controller/store" + "github.com/rs/zerolog/log" ) var ( @@ -58,9 +59,23 @@ type ClusterChecker struct { ctx context.Context cancelFn context.CancelFunc + startHooks []Hook + wg sync.WaitGroup } +// HookParams, any writes/updates to the `cluster` variable will automatically +// be saved to the ClusterChecker after all the hooks are run +type HookParams struct { + options ClusterCheckOptions + clusterStore store.Store + cluster *store.Cluster + namespace string + clusterName string +} + +type Hook func(context.Context, HookParams) error + func NewClusterChecker(s store.Store, ns, cluster string) *ClusterChecker { ctx, cancel := context.WithCancel(context.Background()) c := &ClusterChecker{ @@ -78,16 +93,63 @@ func NewClusterChecker(s store.Store, ns, cluster string) *ClusterChecker { ctx: ctx, cancelFn: cancel, } + c.AddStartHook(MigrateAvailableSlots()) return c } func (c *ClusterChecker) Start() { + c.triggerStartHooks() + c.wg.Add(1) go c.probeLoop() c.wg.Add(1) go c.migrationLoop() } +func MigrateAvailableSlots() Hook { + return func(ctx context.Context, params HookParams) error { + if params.cluster.MigrationQueue.Available() { + err := params.cluster.MigrateAvailableSlots(ctx) + if err != nil { + return err + } + if err := params.clusterStore.SetCluster(ctx, params.namespace, params.cluster); err != nil { + log.Error("Failed to update the cluster", zap.Error(err)) + return err + } + } + return nil + } +} + +func (c *ClusterChecker) AddStartHook(hooks ...Hook) { + c.startHooks = append(c.startHooks, hooks...) 
+} + +func (c *ClusterChecker) triggerStartHooks() { + log := logger.Get().With( + zap.String("namespace", c.namespace), + zap.String("cluster", c.clusterName)) + c.clusterMu.Lock() + defer c.clusterMu.Unlock() + params := HookParams{ + options: c.options, + clusterStore: c.clusterStore, + cluster: c.cluster.Clone(), + namespace: c.namespace, + clusterName: c.clusterName, + } + for _, hook := range c.startHooks { + err := hook(c.ctx, params) + if err != nil { + log.Error("start hook", zap.Error(err)) + } + } + + // we're already holding the lock, so we can update c.cluster + c.cluster = params.cluster +} + func (c *ClusterChecker) WithPingInterval(interval time.Duration) *ClusterChecker { c.options.pingInterval = interval if c.options.pingInterval < 200*time.Millisecond { From 9b5e17110bde963ccd25f0b76a0ebce390900a9e Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Tue, 12 Aug 2025 16:33:15 -0600 Subject: [PATCH 23/31] adds another hook that initializes the clusterInfo first --- controller/cluster.go | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/controller/cluster.go b/controller/cluster.go index 794e3b1c..1368c43d 100755 --- a/controller/cluster.go +++ b/controller/cluster.go @@ -22,6 +22,7 @@ package controller import ( "context" "errors" + "fmt" "strings" "sync" "time" @@ -30,7 +31,6 @@ import ( "github.com/apache/kvrocks-controller/logger" "github.com/apache/kvrocks-controller/store" - "github.com/rs/zerolog/log" ) var ( @@ -74,7 +74,7 @@ type HookParams struct { clusterName string } -type Hook func(context.Context, HookParams) error +type Hook func(context.Context, *HookParams) error func NewClusterChecker(s store.Store, ns, cluster string) *ClusterChecker { ctx, cancel := context.WithCancel(context.Background()) @@ -93,6 +93,7 @@ func NewClusterChecker(s store.Store, ns, cluster string) *ClusterChecker { ctx: ctx, cancelFn: cancel, } + c.AddStartHook(InitializeClusterInfo()) // needs to be first c.AddStartHook(MigrateAvailableSlots()) return c } @@ -106,15 +107,25 @@ func (c *ClusterChecker) Start() { go c.migrationLoop() } +func InitializeClusterInfo() Hook { + return func(ctx context.Context, params *HookParams) error { + clusterInfo, err := params.clusterStore.GetCluster(ctx, params.namespace, params.clusterName) + if err != nil { + return fmt.Errorf("failed to get the clusterName info from the clusterStore: %w", err) + } + params.cluster = clusterInfo + return nil + } +} + func MigrateAvailableSlots() Hook { - return func(ctx context.Context, params HookParams) error { + return func(ctx context.Context, params *HookParams) error { if params.cluster.MigrationQueue.Available() { err := params.cluster.MigrateAvailableSlots(ctx) if err != nil { return err } if err := params.clusterStore.SetCluster(ctx, params.namespace, params.cluster); err != nil { - log.Error("Failed to update the cluster", zap.Error(err)) return err } } @@ -132,10 +143,11 @@ func (c *ClusterChecker) triggerStartHooks() { zap.String("cluster", c.clusterName)) c.clusterMu.Lock() defer c.clusterMu.Unlock() - params := HookParams{ + + params := &HookParams{ options: c.options, clusterStore: c.clusterStore, - cluster: c.cluster.Clone(), + cluster: c.cluster, // we have the clusterMu lock, so it's ok that hooks can access this cluster var directly namespace: c.namespace, clusterName: c.clusterName, } @@ -145,9 +157,6 @@ func (c *ClusterChecker) triggerStartHooks() { log.Error("start hook", zap.Error(err)) } } - - // we're already holding the lock, so we can update 
c.cluster - c.cluster = params.cluster } func (c *ClusterChecker) WithPingInterval(interval time.Duration) *ClusterChecker { From dcd45e0145dfb07ef3ea37c6a0e52654a5aa78c7 Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Tue, 12 Aug 2025 17:14:11 -0600 Subject: [PATCH 24/31] adds more comment to initializeClusterInfo --- controller/cluster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/controller/cluster.go b/controller/cluster.go index 1368c43d..2d383035 100755 --- a/controller/cluster.go +++ b/controller/cluster.go @@ -93,7 +93,7 @@ func NewClusterChecker(s store.Store, ns, cluster string) *ClusterChecker { ctx: ctx, cancelFn: cancel, } - c.AddStartHook(InitializeClusterInfo()) // needs to be first + c.AddStartHook(InitializeClusterInfo()) // needs to be first to initialize cluster info c.AddStartHook(MigrateAvailableSlots()) return c } From bf69c6f5561e5abc5f6cfb11e1cfa4ab268ed8cc Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Thu, 14 Aug 2025 10:20:15 -0600 Subject: [PATCH 25/31] adds a writer to load the cluster with --- cmd/writer/main.go | 57 ++ controller/cluster.go | 18 +- go.mod | 1 + go.sum | 8 +- scripts/docker/kvrocks/Dockerfile | 4 + scripts/docker/kvrocks/kvrocks.conf | 1060 +++++++++++++++++++++++++++ store/store.go | 6 +- 7 files changed, 1142 insertions(+), 12 deletions(-) create mode 100644 cmd/writer/main.go create mode 100644 scripts/docker/kvrocks/kvrocks.conf diff --git a/cmd/writer/main.go b/cmd/writer/main.go new file mode 100644 index 00000000..31d738f3 --- /dev/null +++ b/cmd/writer/main.go @@ -0,0 +1,57 @@ +package main + +import ( + "context" + "fmt" + "strconv" + "time" + + "github.com/apache/kvrocks-controller/logger" + "github.com/redis/rueidis" + "go.uber.org/zap" +) + +func main() { + client, err := rueidis.NewClient( + rueidis.ClientOption{ + InitAddress: []string{"127.0.0.1:7000"}, + ShuffleInit: true, + ConnWriteTimeout: time.Millisecond * 100, + DisableCache: true, // client cache is not enabled on kvrocks + PipelineMultiplex: 5, + MaxFlushDelay: 50 * time.Microsecond, + AlwaysPipelining: true, + DisableTCPNoDelay: true, + DisableRetry: true, + }, + ) + if err != nil { + logger.Get().Error("unable to get rueidis client", zap.Error(err)) + return + } + + ctx := context.Background() + // illustrative field/value payload for loading the cluster + data := map[string][]byte{"field0": []byte("value0")} + if err := hSetExpire(ctx, time.Second, client, "hello", []string{"field0"}, data, time.Minute); err != nil { + logger.Get().Error("unable to write hash", zap.Error(err)) + } +} + +func hSetExpire(ctx context.Context, timeout time.Duration, client rueidis.Client, key string, cols []string, data map[string][]byte, ttl time.Duration) error { + timeoutCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + convertedSlice := make([]string, 0, len(cols)*2) + for _, col := range cols { + if _, ok := data[col]; !ok { + return fmt.Errorf("field %s not found in data", col) + } + convertedSlice = append(convertedSlice, col, string(data[col])) + } + + cmd := client.B(). + Arbitrary("HSETEXPIRE"). + Keys(key). + Args(strconv.Itoa(int(ttl.Seconds()))). + Args(convertedSlice...).
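+		// resulting command: HSETEXPIRE <key> <ttl seconds> <field> <value> [<field> <value> ...]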
+ Build() + resp := client.Do(timeoutCtx, cmd) + err := resp.Error() + return err +} diff --git a/controller/cluster.go b/controller/cluster.go index 2d383035..e91cab11 100755 --- a/controller/cluster.go +++ b/controller/cluster.go @@ -373,8 +373,8 @@ func (c *ClusterChecker) probeLoop() { } c.clusterMu.Lock() c.cluster = clusterInfo - c.clusterMu.Unlock() c.parallelProbeNodes(c.ctx, clusterInfo) + c.clusterMu.Unlock() case <-c.syncCh: if err := c.syncClusterToNodes(c.ctx); err != nil { log.Error("Failed to sync the clusterName to the nodes", zap.Error(err)) @@ -391,6 +391,10 @@ func (c *ClusterChecker) updateCluster(cluster *store.Cluster) { c.clusterMu.Unlock() } +func (c *ClusterChecker) updateClusterNoLock(cluster *store.Cluster) { + c.cluster = cluster +} + func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedCluster *store.Cluster) { log := logger.Get().With( zap.String("namespace", c.namespace), @@ -432,7 +436,7 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu log.Error("Failed to update the cluster", zap.Error(err)) return } - c.updateCluster(clonedCluster) + c.updateClusterNoLock(clonedCluster) log.Warn("Failed to migrate the slot", zap.String("slot", migratingSlot.String())) case "success": clonedCluster.Shards[i].SlotRanges = store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges, shard.MigratingSlot.SlotRange) @@ -446,8 +450,7 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu } else { log.Info("Migrate the slot successfully", zap.String("slot", migratedSlot.String())) } - c.updateCluster(clonedCluster) - + c.updateClusterNoLock(clonedCluster) if clonedCluster.MigrationQueue.Available() { log.Info("Migration queue is not empty, migrating queue'd requests") err = clonedCluster.MigrateAvailableSlots(ctx) @@ -459,7 +462,7 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu log.Error("Failed to update the cluster", zap.Error(err)) return } - c.updateCluster(clonedCluster) + c.updateClusterNoLock(clonedCluster) } default: clonedCluster.Shards[i].ClearMigrateState() @@ -467,7 +470,7 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu log.Error("Failed to update the cluster", zap.Error(err)) return } - c.updateCluster(clonedCluster) + c.updateClusterNoLock(clonedCluster) log.Error("Unknown migrating state", zap.String("state", sourceNodeClusterInfo.MigratingState)) } } @@ -489,11 +492,12 @@ func (c *ClusterChecker) migrationLoop() { continue } clonedCluster := c.cluster.Clone() - c.clusterMu.Unlock() if clonedCluster == nil { + c.clusterMu.Unlock() continue } c.tryUpdateMigrationStatus(c.ctx, clonedCluster) + c.clusterMu.Unlock() } } } diff --git a/go.mod b/go.mod index c427b212..867450a3 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/hashicorp/consul/api v1.31.2 github.com/olekukonko/tablewriter v0.0.5 github.com/prometheus/client_golang v1.21.1 + github.com/redis/rueidis v1.0.64 github.com/spf13/cobra v1.9.1 github.com/stretchr/testify v1.10.0 go.etcd.io/etcd v3.3.27+incompatible diff --git a/go.sum b/go.sum index a34447aa..471f71d2 100644 --- a/go.sum +++ b/go.sum @@ -220,8 +220,8 @@ github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo v1.16.5/go.mod 
h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= -github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= -github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= +github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8= +github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M= @@ -258,6 +258,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.16.0 h1:xh6oHhKwnOJKMYiYBDWmkHqQPyiY40sny36Cmx2bbsM= github.com/prometheus/procfs v0.16.0/go.mod h1:8veyXUu3nGP7oaCxhX6yeaM5u4stL2FeMXnCqhDthZg= +github.com/redis/rueidis v1.0.64 h1:XqgbueDuNV3qFdVdQwAHJl1uNt90zUuAJuzqjH4cw6Y= +github.com/redis/rueidis v1.0.64/go.mod h1:Lkhr2QTgcoYBhxARU7kJRO8SyVlgUuEkcJO1Y8MCluA= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= @@ -435,8 +437,6 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= -gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/scripts/docker/kvrocks/Dockerfile b/scripts/docker/kvrocks/Dockerfile index 01300ddf..4db9c912 100644 --- a/scripts/docker/kvrocks/Dockerfile +++ b/scripts/docker/kvrocks/Dockerfile @@ -1,5 +1,9 @@ FROM apache/kvrocks:latest USER root + +# Copy your custom kvrocks.conf file into the image +COPY kvrocks.conf /var/lib/kvrocks/kvrocks.conf + RUN mkdir /tmp/kvrocks7770 /tmp/kvrocks7771 /tmp/kvrocks7772 /tmp/kvrocks7773 RUN echo "kvrocks -c /var/lib/kvrocks/kvrocks.conf --port 7770 --dir /tmp/kvrocks7770 --daemonize yes --cluster-enabled yes --bind 0.0.0.0" >> start.sh diff --git a/scripts/docker/kvrocks/kvrocks.conf b/scripts/docker/kvrocks/kvrocks.conf new file mode 100644 index 00000000..db1aa639 --- /dev/null +++ b/scripts/docker/kvrocks/kvrocks.conf @@ -0,0 +1,1060 @@ +################################ GENERAL ##################################### + +# By default kvrocks listens for connections from localhost interface. +# It is possible to listen to just one or multiple interfaces using +# the "bind" configuration directive, followed by one or more IP addresses. +# +# Examples: +# +# bind 192.168.1.100 10.0.0.1 +# bind 127.0.0.1 ::1 +# bind 0.0.0.0 +bind 127.0.0.1 + +# Unix socket. +# +# Specify the path for the unix socket that will be used to listen for +# incoming connections. 
There is no default, so kvrocks will not listen +# on a unix socket when not specified. +# +# unixsocket /tmp/kvrocks.sock +# unixsocketperm 777 + +# Allows a parent process to open a socket and pass its FD down to kvrocks as a child +# process. Useful to reserve a port and prevent race conditions. +# +# PLEASE NOTE: +# If this is overridden to a value other than -1, the bind and tls* directives will be +# ignored. +# +# Default: -1 (not overridden, defer to creating a connection to the specified port) +socket-fd -1 + +# Accept connections on the specified port, default is 6666. +port 6666 + +# Close the connection after a client is idle for N seconds (0 to disable) +timeout 0 + +# The number of worker's threads, increase or decrease would affect the performance. +workers 8 + +# By default, kvrocks does not run as a daemon. Use 'yes' if you need it. +# It will create a PID file when daemonize is enabled, and its path is specified by pidfile. +daemonize no + +# Kvrocks implements the cluster solution that is similar to the Redis cluster solution. +# You can get cluster information by CLUSTER NODES|SLOTS|INFO command, it also is +# adapted to redis-cli, redis-benchmark, Redis cluster SDK, and Redis cluster proxy. +# But kvrocks doesn't support communicating with each other, so you must set +# cluster topology by CLUSTER SETNODES|SETNODEID commands, more details: #219. +# +# PLEASE NOTE: +# If you enable cluster, kvrocks will encode key with its slot id calculated by +# CRC16 and modulo 16384, encoding key with its slot id makes it efficient to +# migrate keys based on the slot. So if you enabled at first time, cluster mode must +# not be disabled after restarting, and vice versa. That is to say, data is not +# compatible between standalone mode with cluster mode, you must migrate data +# if you want to change mode, otherwise, kvrocks will make data corrupt. +# +# Default: no + +cluster-enabled no + +# By default, namespaces are stored in the configuration file and won't be replicated +# to replicas. This option allows to change this behavior, so that namespaces are also +# propagated to slaves. Note that: +# 1) it won't replicate the 'masterauth' to prevent breaking master/replica replication +# 2) it will overwrite replica's namespace with master's namespace, so be careful of in-using namespaces +# 3) cannot switch off the namespace replication once it's enabled +# +# Default: no +repl-namespace-enabled no + +# By default, the max length of bulk string is limited to 512MB. If you want to +# change this limit to a different value(must >= 1MiB), you can use the following configuration. +# It can be just an integer (e.g. 10000000), or an integer followed by a unit (e.g. 12M, 7G, 2T). +# +# proto-max-bulk-len 536870912 + +# Persist the cluster nodes topology in local file($dir/nodes.conf). This configuration +# takes effect only if the cluster mode was enabled. +# +# If yes, it will try to load the cluster topology from the local file when starting, +# and dump the cluster nodes into the file if it was changed. +# +# Default: yes +persist-cluster-nodes-enabled yes + +# Set the max number of connected clients at the same time. By default +# this limit is set to 10000 clients. However, if the server is not +# able to configure the process file limit to allow for the specified limit +# the max number of allowed clients is set to the current file limit +# +# Once the limit is reached the server will close all the new connections sending +# an error 'max number of clients reached'. 
+# +maxclients 10000 + +# Require clients to issue AUTH before processing any other +# commands. This might be useful in environments in which you do not trust +# others with access to the host running kvrocks. +# +# This should stay commented out for backward compatibility and because most +# people do not need auth (e.g. they run their own servers). +# +# Warning: since kvrocks is pretty fast an outside user can try up to +# 150k passwords per second against a good box. This means that you should +# use a very strong password otherwise it will be very easy to break. +# +# requirepass foobared + +# If the master is password protected (using the "masterauth" configuration +# directive below) it is possible to tell the slave to authenticate before +# starting the replication synchronization process. Otherwise, the master will +# refuse the slave request. +# +# masterauth foobared + +# Master-Salve replication would check db name is matched. if not, the slave should +# refuse to sync the db from master. Don't use the default value, set the db-name to identify +# the cluster. +db-name change.me.db + +# The working directory +# +# The DB will be written inside this directory +# Note that you must specify a directory here, not a file name. +dir /tmp/kvrocks + +# You can configure where to store your server logs by the log-dir. +# If you don't specify one, we will use the above `dir` as our default log directory. +# We also can send logs to stdout/stderr is as simple as: +# +log-dir stdout + +# Log level +# Possible values: info, warning, error, fatal +# Default: info +log-level info + +# You can configure log-retention-days to control whether to enable the log cleaner +# and the maximum retention days that the INFO level logs will be kept. +# +# if set to -1, that means to disable the log cleaner. +# if set to 0, all previous INFO level logs will be immediately removed. +# if set to between 0 to INT_MAX, that means it will retent latest N(log-retention-days) day logs. + +# By default the log-retention-days is -1. +log-retention-days -1 + +# When running in daemonize mode, kvrocks writes a PID file in ${CONFIG_DIR}/kvrocks.pid by +# default. You can specify a custom pid file location here. +# pidfile /var/run/kvrocks.pid + +# You can configure a slave instance to accept writes or not. Writing against +# a slave instance may be useful to store some ephemeral data (because data +# written on a slave will be easily deleted after resync with the master) but +# may also cause problems if clients are writing to it because of a +# misconfiguration. +slave-read-only yes + +# The slave priority is an integer number published by Kvrocks in the INFO output. +# It is used by Redis Sentinel in order to select a slave to promote into a +# master if the master is no longer working correctly. +# +# A slave with a low priority number is considered better for promotion, so +# for instance if there are three slave with priority 10, 100, 25 Sentinel will +# pick the one with priority 10, that is the lowest. +# +# However a special priority of 0 marks the replica as not able to perform the +# role of master, so a slave with priority of 0 will never be selected by +# Redis Sentinel for promotion. +# +# By default the priority is 100. +slave-priority 100 + +# Change the default timeout in milliseconds for socket connect during replication. +# The default value is 3100, and 0 means no timeout. 
+# +# If the master is unreachable before connecting, not having a timeout may block future +# 'clusterx setnodes' commands because the replication thread is blocked on connect. +replication-connect-timeout-ms 3100 + +# Change the default timeout in milliseconds for socket recv during fullsync. +# The default value is 3200, and 0 means no timeout. +# +# If the master is unreachable when fetching SST files, not having a timeout may block +# future 'clusterx setnodes' commands because the replication thread is blocked on recv. +replication-recv-timeout-ms 3200 + +# TCP listen() backlog. +# +# In high requests-per-second environments you need an high backlog in order +# to avoid slow clients connections issues. Note that the Linux kernel +# will silently truncate it to the value of /proc/sys/net/core/somaxconn so +# make sure to raise both the value of somaxconn and tcp_max_syn_backlog +# in order to Get the desired effect. +tcp-backlog 511 + +# If the master is an old version, it may have specified replication threads +# that use 'port + 1' as listening port, but in new versions, we don't use +# extra port to implement replication. In order to allow the new replicas to +# copy old masters, you should indicate that the master uses replication port +# or not. +# If yes, that indicates master uses replication port and replicas will connect +# to 'master's listening port + 1' when synchronization. +# If no, that indicates master doesn't use replication port and replicas will +# connect 'master's listening port' when synchronization. +master-use-repl-port no + +# Currently, master only checks sequence number when replica asks for PSYNC, +# that is not enough since they may have different replication histories even +# the replica asking sequence is in the range of the master current WAL. +# +# We design 'Replication Sequence ID' PSYNC, we add unique replication id for +# every write batch (the operation of each command on the storage engine), so +# the combination of replication id and sequence is unique for write batch. +# The master can identify whether the replica has the same replication history +# by checking replication id and sequence. +# +# By default, it is not enabled since this stricter check may easily lead to +# full synchronization. +use-rsid-psync no + +# Master-Slave replication. Use slaveof to make a kvrocks instance a copy of +# another kvrocks server. A few things to understand ASAP about kvrocks replication. +# +# 1) Kvrocks replication is asynchronous, but you can configure a master to +# stop accepting writes if it appears to be not connected with at least +# a given number of slaves. +# 2) Kvrocks slaves are able to perform a partial resynchronization with the +# master if the replication link is lost for a relatively small amount of +# time. You may want to configure the replication backlog size (see the next +# sections of this file) with a sensible value depending on your needs. +# 3) Replication is automatic and does not need user intervention. After a +# network partition slaves automatically try to reconnect to masters +# and resynchronize with them. +# +# slaveof +# slaveof 127.0.0.1 6379 + +# When a slave loses its connection with the master, or when the replication +# is still in progress, the slave can act in two different ways: +# +# 1) if slave-serve-stale-data is set to 'yes' (the default) the slave will +# still reply to client requests, possibly with out-of-date data, or the +# data set may just be empty if this is the first synchronization. 
+# +# 2) if slave-serve-stale-data is set to 'no' the slave will reply with +# an error "SYNC with master in progress" to all kinds of commands +# but to INFO and SLAVEOF. +# +slave-serve-stale-data yes + +# To guarantee slave's data safe and serve when it is in full synchronization +# state, slave still keep itself data. But this way needs to occupy much disk +# space, so we provide a way to reduce disk occupation, slave will delete itself +# entire database before fetching files from master during full synchronization. +# If you want to enable this way, you can set 'slave-delete-db-before-fullsync' +# to yes, but you must know that database will be lost if master is down during +# full synchronization, unless you have a backup of database. +# +# This option is similar redis replicas RDB diskless load option: +# repl-diskless-load on-empty-db +# +# Default: no +slave-empty-db-before-fullsync no + +# A Kvrocks master is able to list the address and port of the attached +# replicas in different ways. For example the "INFO replication" section +# offers this information, which is used, among other tools, by +# Redis Sentinel in order to discover replica instances. +# Another place where this info is available is in the output of the +# "ROLE" command of a master. +# +# The listed IP address and port normally reported by a replica is +# obtained in the following way: +# +# IP: The address is auto detected by checking the peer address +# of the socket used by the replica to connect with the master. +# +# Port: The port is communicated by the replica during the replication +# handshake, and is normally the port that the replica is using to +# listen for connections. +# +# However when port forwarding or Network Address Translation (NAT) is +# used, the replica may actually be reachable via different IP and port +# pairs. The following two options can be used by a replica in order to +# report to its master a specific set of IP and port, so that both INFO +# and ROLE will report those values. +# +# There is no need to use both the options if you need to override just +# the port or the IP address. +# +# replica-announce-ip 5.5.5.5 +# replica-announce-port 1234 + +# If replicas need full synchronization with master, master need to create +# checkpoint for feeding replicas, and replicas also stage a checkpoint of +# the master. If we also keep the backup, it maybe occupy extra disk space. +# You can enable 'purge-backup-on-fullsync' if disk is not sufficient, but +# that may cause remote backup copy failing. +# +# Default: no +purge-backup-on-fullsync no + +# The maximum allowed rate (in MB/s) that should be used by replication. +# If the rate exceeds max-replication-mb, replication will slow down. +# Default: 0 (i.e. no limit) +max-replication-mb 0 + +# The maximum allowed aggregated write rate of flush and compaction (in MB/s). +# If the rate exceeds max-io-mb, io will slow down. +# 0 is no limit +# Default: 0 +max-io-mb 0 + +# The maximum allowed space (in GB) that should be used by RocksDB. +# If the total size of the SST files exceeds max_allowed_space, writes to RocksDB will fail. +# Please see: https://github.com/facebook/rocksdb/wiki/Managing-Disk-Space-Utilization +# Default: 0 (i.e. no limit) +max-db-size 0 + +# The maximum backup to keep, server cron would run every minutes to check the num of current +# backup, and purge the old backup if exceed the max backup num to keep. If max-backup-to-keep +# is 0, no backup would be kept. But now, we only support 0 or 1. 
+max-backup-to-keep 1 + +# The maximum hours to keep the backup. If max-backup-keep-hours is 0, wouldn't purge any backup. +# default: 1 day +max-backup-keep-hours 24 + +# max-bitmap-to-string-mb use to limit the max size of bitmap to string transformation(MB). +# +# Default: 16 +max-bitmap-to-string-mb 16 + +# Whether to enable SCAN-like cursor compatible with Redis. +# If enabled, the cursor will be unsigned 64-bit integers. +# If disabled, the cursor will be a string. +# Default: yes +redis-cursor-compatible yes + +# Whether to enable the RESP3 protocol. +# +# Default: yes +# resp3-enabled yes + +# Maximum nesting depth allowed when parsing and serializing +# JSON documents while using JSON commands like JSON.SET. +# Default: 1024 +json-max-nesting-depth 1024 + +# The underlying storage format of JSON data type +# NOTE: This option only affects newly written/updated key-values +# The CBOR format may reduce the storage size and speed up JSON commands +# Available values: json, cbor +# Default: json +json-storage-format json + +# Whether to enable transactional mode engine::Context. +# +# If enabled, is_txn_mode in engine::Context will be set properly, +# which is expected to improve the consistency of commands. +# If disabled, is_txn_mode in engine::Context will be set to false, +# making engine::Context equivalent to engine::Storage. +# +# NOTE: This is an experimental feature. If you find errors, performance degradation, +# excessive memory usage, excessive disk I/O, etc. after enabling it, please try disabling it. +# At the same time, we welcome feedback on related issues to help iterative improvements. +# +# Default: no +txn-context-enabled no + +# Define the histogram bucket values. +# +# If enabled, those values will be used to store the command execution latency values +# in buckets defined below. The values should be integers and must be sorted. +# An implicit bucket (+Inf in prometheus jargon) will be added to track the highest values +# that are beyond the bucket limits. + +# NOTE: This is an experimental feature. There might be some performance overhead when using this +# feature, please be aware. +# Default: disabled +# histogram-bucket-boundaries 10,20,40,60,80,100,150,250,350,500,750,1000,1500,2000,4000,8000 + +################################## TLS ################################### + +# By default, TLS/SSL is disabled, i.e. `tls-port` is set to 0. +# To enable it, `tls-port` can be used to define TLS-listening ports. +# tls-port 0 + +# Configure a X.509 certificate and private key to use for authenticating the +# server to connected clients, masters or cluster peers. +# These files should be PEM formatted. +# +# tls-cert-file kvrocks.crt +# tls-key-file kvrocks.key + +# If the key file is encrypted using a passphrase, it can be included here +# as well. +# +# tls-key-file-pass secret + +# Configure a CA certificate(s) bundle or directory to authenticate TLS/SSL +# clients and peers. Kvrocks requires an explicit configuration of at least one +# of these, and will not implicitly use the system wide configuration. +# +# tls-ca-cert-file ca.crt +# tls-ca-cert-dir /etc/ssl/certs + +# By default, clients on a TLS port are required +# to authenticate using valid client side certificates. +# +# If "no" is specified, client certificates are not required and not accepted. +# If "optional" is specified, client certificates are accepted and must be +# valid if provided, but are not required. 
+# +# tls-auth-clients no +# tls-auth-clients optional + +# By default, only TLSv1.2 and TLSv1.3 are enabled and it is highly recommended +# that older formally deprecated versions are kept disabled to reduce the attack surface. +# You can explicitly specify TLS versions to support. +# Allowed values are case insensitive and include "TLSv1", "TLSv1.1", "TLSv1.2", +# "TLSv1.3" (OpenSSL >= 1.1.1) or any combination. +# To enable only TLSv1.2 and TLSv1.3, use: +# +# tls-protocols "TLSv1.2 TLSv1.3" + +# Configure allowed ciphers. See the ciphers(1ssl) manpage for more information +# about the syntax of this string. +# +# Note: this configuration applies only to <= TLSv1.2. +# +# tls-ciphers DEFAULT:!MEDIUM + +# Configure allowed TLSv1.3 ciphersuites. See the ciphers(1ssl) manpage for more +# information about the syntax of this string, and specifically for TLSv1.3 +# ciphersuites. +# +# tls-ciphersuites TLS_CHACHA20_POLY1305_SHA256 + +# When choosing a cipher, use the server's preference instead of the client +# preference. By default, the server follows the client's preference. +# +# tls-prefer-server-ciphers yes + +# By default, TLS session caching is enabled to allow faster and less expensive +# reconnections by clients that support it. Use the following directive to disable +# caching. +# +# tls-session-caching no + +# Change the default number of TLS sessions cached. A zero value sets the cache +# to unlimited size. The default size is 20480. +# +# tls-session-cache-size 5000 + +# Change the default timeout of cached TLS sessions. The default timeout is 300 +# seconds. +# +# tls-session-cache-timeout 60 + +# By default, a replica does not attempt to establish a TLS connection +# with its master. +# +# Use the following directive to enable TLS on replication links. +# +# tls-replication yes + +################################## SLOW LOG ################################### + +# The Kvrocks Slow Log is a mechanism to log queries that exceeded a specified +# execution time. The execution time does not include the I/O operations +# like talking with the client, sending the reply and so forth, +# but just the time needed to actually execute the command (this is the only +# stage of command execution where the thread is blocked and can not serve +# other requests in the meantime). +# +# You can configure the slow log with two parameters: one tells Kvrocks +# what is the execution time, in microseconds, to exceed in order for the +# command to get logged, and the other parameter is the length of the +# slow log. When a new command is logged the oldest one is removed from the +# queue of logged commands. + +# The following time is expressed in microseconds, so 1000000 is equivalent +# to one second. Note that -1 value disables the slow log, while +# a value of zero forces the logging of every command. +slowlog-log-slower-than 100000 + +# There is no limit to this length. Just be aware that it will consume memory. +# You can reclaim memory used by the slow log with SLOWLOG RESET. +slowlog-max-len 128 + +# If you run kvrocks from upstart or systemd, kvrocks can interact with your +# supervision tree. Options: +# supervised no - no supervision interaction +# supervised upstart - signal upstart by putting kvrocks into SIGSTOP mode +# supervised systemd - signal systemd by writing READY=1 to $NOTIFY_SOCKET +# supervised auto - detect upstart or systemd method based on +# UPSTART_JOB or NOTIFY_SOCKET environment variables +# Note: these supervision methods only signal "process is ready." 
+# They do not enable continuous liveness pings back to your supervisor. +supervised no + +################################## PERF LOG ################################### + +# The Kvrocks Perf Log is a mechanism to log queries' performance context that +# exceeded a specified execution time. This mechanism uses rocksdb's +# Perf Context and IO Stats Context, Please see: +# https://github.com/facebook/rocksdb/wiki/Perf-Context-and-IO-Stats-Context +# +# This mechanism is enabled when profiling-sample-commands is not empty and +# profiling-sample-ratio greater than 0. +# It is important to note that this mechanism affects performance, but it is +# useful for troubleshooting performance bottlenecks, so it should only be +# enabled when performance problems occur. + +# The name of the commands you want to record. Must be original name of +# commands supported by Kvrocks. Use ',' to separate multiple commands and +# use '*' to record all commands supported by Kvrocks. +# Example: +# - Single command: profiling-sample-commands get +# - Multiple commands: profiling-sample-commands get,mget,hget +# +# Default: empty +# profiling-sample-commands "" + +# Ratio of the samples would be recorded. It is a number between 0 and 100. +# We simply use the rand to determine whether to record the sample or not. +# +# Default: 0 +profiling-sample-ratio 0 + +# There is no limit to this length. Just be aware that it will consume memory. +# You can reclaim memory used by the perf log with PERFLOG RESET. +# +# Default: 256 +profiling-sample-record-max-len 256 + +# profiling-sample-record-threshold-ms use to tell the kvrocks when to record. +# +# Default: 100 millisecond +profiling-sample-record-threshold-ms 100 + +################################## CRON ################################### + +# Compact Scheduler, auto compact at schedule time +# Time expression format is the same as crontab (supported cron syntax: *, n, */n, `1,3-6,9,11`) +# e.g. compact-cron 0 3,4 * * * +# would compact the db at 3am and 4am everyday +# compact-cron 0 3 * * * + +# The hour range that compaction checker would be active +# e.g. compaction-checker-range 0-7 means compaction checker would be worker between +# 0-7am every day. +# WARNING: this config option is deprecated and will be removed, +# please use compaction-checker-cron instead +# compaction-checker-range 0-7 + +# The time pattern that compaction checker would be active +# Time expression format is the same as crontab (supported cron syntax: *, n, */n, `1,3-6,9,11`) +# e.g. compaction-checker-cron * 0-7 * * * means compaction checker would be worker between +# 0-7am every day. +compaction-checker-cron * 0-7 * * * + +# When the compaction checker is triggered, the db will periodically pick the SST file +# with the highest "deleted percentage" (i.e. the percentage of deleted keys in the SST +# file) to compact, in order to free disk space. +# However, if a specific SST file was created more than "force-compact-file-age" seconds +# ago, and its percentage of deleted keys is higher than +# "force-compact-file-min-deleted-percentage", it will be forcibly compacted as well. + +# Default: 172800 seconds; Range: [60, INT64_MAX]; +# force-compact-file-age 172800 +# Default: 10 %; Range: [1, 100]; +# force-compact-file-min-deleted-percentage 10 + +# Bgsave scheduler, auto bgsave at scheduled time +# Time expression format is the same as crontab (supported cron syntax: *, n, */n, `1,3-6,9,11`) +# e.g. 
bgsave-cron 0 3,4 * * * +# would bgsave the db at 3am and 4am every day + +# Kvrocks doesn't store the key number directly. It needs to scan the DB and +# then retrieve the key number by using the dbsize scan command. +# The Dbsize scan scheduler auto-recalculates the estimated keys at scheduled time. +# Time expression format is the same as crontab (supported cron syntax: *, n, */n, `1,3-6,9,11`) +# e.g. dbsize-scan-cron 0 * * * * +# would recalculate the keyspace infos of the db every hour. + +# Command renaming. +# +# It is possible to change the name of dangerous commands in a shared +# environment. For instance, the KEYS command may be renamed into something +# hard to guess so that it will still be available for internal-use tools +# but not available for general clients. +# +# Example: +# +# rename-command KEYS b840fc02d524045429941cc15f59e41cb7be6c52 +# +# It is also possible to completely kill a command by renaming it into +# an empty string: +# +# rename-command KEYS "" + +################################ MIGRATE ##################################### +# Slot migration supports two ways: +# - redis-command: Migrate data by redis serialization protocol(RESP). +# - raw-key-value: Migrate the raw key value data of the storage engine directly. +# This way eliminates the overhead of converting to the redis +# command, reduces resource consumption, improves migration +# efficiency, and can implement a finer rate limit. +# +# Default: redis-command +migrate-type raw-key-value + +# If the network bandwidth is completely consumed by the migration task, +# it will affect the availability of kvrocks. To avoid this situation, +# migrate-speed is adopted to limit the migrating speed. +# Migrating speed is limited by controlling the duration between sending data, +# the duration is calculated by: 1000000 * migrate-pipeline-size / migrate-speed (us). +# Value: [0,INT_MAX], 0 means no limit +# +# Default: 4096 +# changing migrate-speed to test out slow migrations +migrate-speed 1 + +# In order to reduce data transmission times and improve the efficiency of data migration, +# pipeline is adopted to send multiple data at once. Pipeline size can be set by this option. +# Value: [1, INT_MAX], it can't be 0 +# +# Default: 16 +# migrate-pipeline-size 16 +migrate-pipeline-size 1 + +# In order to reduce the write forbidden time during migrating slot, we will migrate the incremental +# data several times to reduce the amount of incremental data. Until the quantity of incremental +# data is reduced to a certain threshold, slot will be forbidden write. The threshold is set by +# this option. +# Value: [1, INT_MAX], it can't be 0 +# +# Default: 10000 +migrate-sequence-gap 10000 + +# The raw-key-value migration way uses batch for migration. This option sets the batch size +# for each migration. +# +# Default: 16kb +#migrate-batch-size-kb 16 +migrate-batch-size-kb 1 + +# Rate limit for migration based on raw-key-value, representing the maximum number of data +# that can be migrated per second. 0 means no limit. +# +# Default: 16M +#migrate-batch-rate-limit-mb 16 +migrate-batch-rate-limit-mb 1 + + +# If it is set to yes, kvrocks will skip the deallocation of block cache +# while closing the database to speed up the shutdown +# +# Default: no +# skip-block-cache-deallocation-on-close no + +################################ ROCKSDB ##################################### + +# Specify the capacity of column family block cache. A larger block cache +# may make requests faster while more keys would be cached. 
Max Size is 400*1024. +# Default: 4096MB +rocksdb.block_cache_size 4096 + +# Specify the type of cache used in the block cache. +# Accept value: "lru", "hcc" +# "lru" stands for the cache with the LRU(Least Recently Used) replacement policy. +# +# "hcc" stands for the Hyper Clock Cache, a lock-free cache alternative +# that offers much improved CPU efficiency vs. LRU cache under high parallel +# load or high contention. +# +# default lru +rocksdb.block_cache_type lru + +# A global cache for table-level rows in RocksDB. If almost always point +# lookups, enlarging row cache may improve read performance. Otherwise, +# if we enlarge this value, we can lessen metadata/subkey block cache size. +# +# Default: 0 (disabled) +rocksdb.row_cache_size 0 + +# Number of open files that can be used by the DB. You may need to +# increase this if your database has a large working set. Value -1 means +# files opened are always kept open. You can estimate number of files based +# on target_file_size_base and target_file_size_multiplier for level-based +# compaction. For universal-style compaction, you can usually set it to -1. +# Default: 8096 +rocksdb.max_open_files 8096 + +# Amount of data to build up in memory (backed by an unsorted log +# on disk) before converting to a sorted on-disk file. +# +# Larger values increase performance, especially during bulk loads. +# Up to max_write_buffer_number write buffers may be held in memory +# at the same time, +# so you may wish to adjust this parameter to control memory usage. +# Also, a larger write buffer will result in a longer recovery time +# the next time the database is opened. +# +# Note that write_buffer_size is enforced per column family. +# See db_write_buffer_size for sharing memory across column families. + +# default is 64MB +rocksdb.write_buffer_size 64 + +# Target file size for compaction, target file size for Level N can be calculated +# by target_file_size_base * (target_file_size_multiplier ^ (L-1)) +# +# Default: 128MB +rocksdb.target_file_size_base 128 + +# The maximum number of write buffers that are built up in memory. +# The default and the minimum number is 2, so that when 1 write buffer +# is being flushed to storage, new writes can continue to the other +# write buffer. +# If max_write_buffer_number > 3, writing will be slowed down to +# options.delayed_write_rate if we are writing to the last write buffer +# allowed. +rocksdb.max_write_buffer_number 4 + +# Maximum number of concurrent background jobs (compactions and flushes). +# For backwards compatibility we will set `max_background_jobs = +# max_background_compactions + max_background_flushes` in the case where user +# sets at least one of `max_background_compactions` or `max_background_flushes` +# (we replace -1 by 1 in case one option is unset). +rocksdb.max_background_jobs 4 + +# DEPRECATED: it is automatically decided based on the value of rocksdb.max_background_jobs +# Maximum number of concurrent background compaction jobs, submitted to +# the default LOW priority thread pool. +rocksdb.max_background_compactions -1 + +# DEPRECATED: it is automatically decided based on the value of rocksdb.max_background_jobs +# Maximum number of concurrent background memtable flush jobs, submitted by +# default to the HIGH priority thread pool. If the HIGH priority thread pool +# is configured to have zero threads, flush jobs will share the LOW priority +# thread pool with compaction jobs. 
+rocksdb.max_background_flushes -1 + +# This value represents the maximum number of threads that will +# concurrently perform a compaction job by breaking it into multiple, +# smaller ones that are run simultaneously. +# Default: 2 +rocksdb.max_subcompactions 2 + +# If enabled WAL records will be compressed before they are written. Only +# ZSTD (= kZSTD) is supported (until streaming support is adapted for other +# compression types). Compressed WAL records will be read in supported +# versions (>= RocksDB 7.4.0 for ZSTD) regardless of this setting when +# the WAL is read. +# +# Accept value: "no", "zstd" +# Default is no +rocksdb.wal_compression no + +# In order to limit the size of WALs, RocksDB uses DBOptions::max_total_wal_size +# as the trigger of column family flush. Once WALs exceed this size, RocksDB +# will start forcing the flush of column families to allow deletion of some +# oldest WALs. This config can be useful when column families are updated at +# non-uniform frequencies. If there's no size limit, users may need to keep +# really old WALs when the infrequently-updated column families hasn't flushed +# for a while. +# +# In kvrocks, we use multiple column families to store metadata, subkeys, etc. +# If users always use string type, but use list, hash and other complex data types +# infrequently, there will be a lot of old WALs if we don't set size limit +# (0 by default in rocksdb), because rocksdb will dynamically choose the WAL size +# limit to be [sum of all write_buffer_size * max_write_buffer_number] * 4 if set to 0. +# +# Moreover, you should increase this value if you already set rocksdb.write_buffer_size +# to a big value, to avoid influencing the effect of rocksdb.write_buffer_size and +# rocksdb.max_write_buffer_number. +# +# default is 512MB +rocksdb.max_total_wal_size 512 + +# Whether to print malloc stats together with rocksdb.stats when printing to LOG. +# +# Accepted values: "yes", "no" +# Default: yes +rocksdb.dump_malloc_stats yes + +# We implement the replication with rocksdb WAL, it would trigger full sync when the seq was out of range. +# wal_ttl_seconds and wal_size_limit_mb would affect how archived logs will be deleted. +# If WAL_ttl_seconds is not 0, then WAL files will be checked every WAL_ttl_seconds / 2 and those that +# are older than WAL_ttl_seconds will be deleted# +# +# Default: 3 Hours +rocksdb.wal_ttl_seconds 10800 + +# If WAL_ttl_seconds is 0 and WAL_size_limit_MB is not 0, +# WAL files will be checked every 10 min and if total size is greater +# then WAL_size_limit_MB, they will be deleted starting with the +# earliest until size_limit is met. All empty files will be deleted +# Default: 16GB +rocksdb.wal_size_limit_mb 16384 + +# Approximate size of user data packed per block. Note that the +# block size specified here corresponds to uncompressed data. The +# actual size of the unit read from disk may be smaller if +# compression is enabled. +# +# Default: 16KB +rocksdb.block_size 16384 + +# Indicating if we'd put index/filter blocks to the block cache +# +# Default: yes +rocksdb.cache_index_and_filter_blocks yes + +# Specify the compression to use. +# Accept value: "no", "snappy", "lz4", "zstd", "zlib" +# default snappy +rocksdb.compression snappy + +# Specify the compression level to use. It trades compression speed +# and ratio, might be useful when tuning for disk space. 
+# See details: https://github.com/facebook/rocksdb/wiki/Space-Tuning +# For zstd: valid range is from 1 (fastest) to 19 (best ratio), +# For zlib: valid range is from 1 (fastest) to 9 (best ratio), +# For lz4: adjusting the level influences the 'acceleration'. +# RocksDB sets a negative level to indicate acceleration directly, +# with more negative values indicating higher speed and less compression. +# Note: This setting is ignored for compression algorithms like Snappy that +# do not support variable compression levels. +# +# RocksDB Default: +# - zstd: 3 +# - zlib: Z_DEFAULT_COMPRESSION (currently -1) +# - kLZ4: -1 (i.e., `acceleration=1`; see `CompressionOptions::level` doc) +# For all others, RocksDB does not specify a compression level. +# If the compression type doesn't support the setting, it will be a no-op. +# +# Default: 32767 (RocksDB's generic default compression level. Internally +# it'll be translated to the default compression level specific to the +# compression library as mentioned above) +rocksdb.compression_level 32767 + +# If non-zero, we perform bigger reads when doing compaction. If you're +# running RocksDB on spinning disks, you should set this to at least 2MB. +# That way RocksDB's compaction is doing sequential instead of random reads. +# When non-zero, we also force new_table_reader_for_compaction_inputs to +# true. +# +# Default: 2 MB +rocksdb.compaction_readahead_size 2097152 + +# Enable compression from n levels of LSM-tree. +# By default compression is disabled for the first two levels (L0 and L1), +# because it may contain the frequently accessed data, so it'd be better +# to use uncompressed data to save the CPU. +# Value: [0, 7) (upper boundary is kvrocks maximum levels number) +# +# Default: 2 +rocksdb.compression_start_level 2 + +# he limited write rate to DB if soft_pending_compaction_bytes_limit or +# level0_slowdown_writes_trigger is triggered. + +# If the value is 0, we will infer a value from `rater_limiter` value +# if it is not empty, or 16MB if `rater_limiter` is empty. Note that +# if users change the rate in `rate_limiter` after DB is opened, +# `delayed_write_rate` won't be adjusted. +# +rocksdb.delayed_write_rate 0 +# If enable_pipelined_write is true, separate write thread queue is +# maintained for WAL write and memtable write. +# +# Default: no +rocksdb.enable_pipelined_write no + +# Soft limit on number of level-0 files. We start slowing down writes at this +# point. A value <0 means that no writing slow down will be triggered by +# number of files in level-0. +# +# Default: 20 +rocksdb.level0_slowdown_writes_trigger 20 + +# Maximum number of level-0 files. We stop writes at this point. +# +# Default: 40 +rocksdb.level0_stop_writes_trigger 40 + +# Number of files to trigger level-0 compaction. +# +# Default: 4 +rocksdb.level0_file_num_compaction_trigger 4 + +# if not zero, dump rocksdb.stats to LOG every stats_dump_period_sec +# +# Default: 0 +rocksdb.stats_dump_period_sec 0 + +# if yes, the auto compaction would be disabled, but the manual compaction remain works +# +# Default: no +rocksdb.disable_auto_compactions no + +# BlobDB(key-value separation) is essentially RocksDB for large-value use cases. +# Since 6.18.0, The new implementation is integrated into the RocksDB core. +# When set, large values (blobs) are written to separate blob files, and only +# pointers to them are stored in SST files. This can reduce write amplification +# for large-value use cases at the cost of introducing a level of indirection +# for reads. 
Please see: https://github.com/facebook/rocksdb/wiki/BlobDB. +# +# Note that when enable_blob_files is set to yes, BlobDB-related configuration +# items will take effect. +# +# Default: no +rocksdb.enable_blob_files no + +# The size of the smallest value to be stored separately in a blob file. Values +# which have an uncompressed size smaller than this threshold are stored alongside +# the keys in SST files in the usual fashion. +# +# Default: 4096 byte, 0 means that all values are stored in blob files +rocksdb.min_blob_size 4096 + +# The size limit for blob files. When writing blob files, a new file is +# opened once this limit is reached. +# +# Default: 268435456 bytes +rocksdb.blob_file_size 268435456 + +# Enables garbage collection of blobs. Valid blobs residing in blob files +# older than a cutoff get relocated to new files as they are encountered +# during compaction, which makes it possible to clean up blob files once +# they contain nothing but obsolete/garbage blobs. +# See also rocksdb.blob_garbage_collection_age_cutoff below. +# +# Default: yes +rocksdb.enable_blob_garbage_collection yes + +# The percentage cutoff in terms of blob file age for garbage collection. +# Blobs in the oldest N blob files will be relocated when encountered during +# compaction, where N = (garbage_collection_cutoff/100) * number_of_blob_files. +# Note that this value must belong to [0, 100]. +# +# Default: 25 +rocksdb.blob_garbage_collection_age_cutoff 25 + + +# The purpose of the following three options are to dynamically adjust the upper limit of +# the data that each layer can store according to the size of the different +# layers of the LSM. Enabling this option will bring some improvements in +# deletion efficiency and space amplification, but it will lose a certain +# amount of read performance. +# If you want to know more details about Levels' Target Size, you can read RocksDB wiki: +# https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#levels-target-size +# +# Default: yes +rocksdb.level_compaction_dynamic_level_bytes yes + +# The total file size of level-1 sst. +# +# Default: 268435456 bytes +rocksdb.max_bytes_for_level_base 268435456 + +# Multiplication factor for the total file size of L(n+1) layers. +# This option is a double type number in RocksDB, but kvrocks is +# not support the double data type number yet, so we use integer +# number instead of double currently. +# +# Default: 10 +rocksdb.max_bytes_for_level_multiplier 10 + +# This feature only takes effect in Iterators and MultiGet. +# If yes, RocksDB will try to read asynchronously and in parallel as much as possible to hide IO latency. +# In iterators, it will prefetch data asynchronously in the background for each file being iterated on. +# In MultiGet, it will read the necessary data blocks from those files in parallel as much as possible. + +# Default yes +rocksdb.read_options.async_io yes + +# If yes, the write will be flushed from the operating system +# buffer cache before the write is considered complete. +# If this flag is enabled, writes will be slower. +# If this flag is disabled, and the machine crashes, some recent +# writes may be lost. Note that if it is just the process that +# crashes (i.e., the machine does not reboot), no writes will be +# lost even if sync==false. +# +# Default: no +rocksdb.write_options.sync no + +# If yes, writes will not first go to the write ahead log, +# and the write may get lost after a crash. +# You must keep wal enabled if you use replication. 
+# +# Default: no +rocksdb.write_options.disable_wal no + +# If enabled and we need to wait or sleep for the write request, fails +# immediately. +# +# Default: no +rocksdb.write_options.no_slowdown no + +# If enabled, write requests are of lower priority if compaction is +# behind. In this case, no_slowdown = true, the request will be canceled +# immediately. Otherwise, it will be slowed down. +# The slowdown value is determined by RocksDB to guarantee +# it introduces minimum impacts to high priority writes. +# +# Default: no +rocksdb.write_options.low_pri no + +# If enabled, this writebatch will maintain the last insert positions of each +# memtable as hints in concurrent write. It can improve write performance +# in concurrent writes if keys in one writebatch are sequential. +# +# Default: no +rocksdb.write_options.memtable_insert_hint_per_batch no + + +# Support RocksDB auto-tune rate limiter for the background IO +# if enabled, Rate limiter will limit the compaction write if flush write is high +# Please see https://rocksdb.org/blog/2017/12/18/17-auto-tuned-rate-limiter.html +# +# Default: yes +rocksdb.rate_limiter_auto_tuned yes + +# If enabled, rocksdb will use partitioned full filters for each SST file. +# +# Default: yes +rocksdb.partition_filters yes + +# Enable this option will schedule the deletion of obsolete files in a background thread +# on iterator destruction. It can reduce the latency if there are many files to be removed. +# see https://github.com/facebook/rocksdb/wiki/IO#avoid-blocking-io +# +# Default: yes +# rocksdb.avoid_unnecessary_blocking_io yes + +# Specifies the maximum size in bytes for a write batch in RocksDB. +# If set to 0, there is no size limit for write batches. +# This option can help control memory usage and manage large WriteBatch operations more effectively. +# +# Default: 0 +# rocksdb.write_options.write_batch_max_bytes 0 + +# RocksDB will try to limit number of bytes in one compaction to be lower than this threshold. +# If set to 0, it will be sanitized to [25 * target_file_size_base] +# +# Default: 0 +rocksdb.max_compaction_bytes 0 + +################################ NAMESPACE ##################################### +# namespace.test change.me diff --git a/store/store.go b/store/store.go index 84951c37..c9b2c4ee 100644 --- a/store/store.go +++ b/store/store.go @@ -189,7 +189,11 @@ func (s *ClusterStore) UpdateCluster(ctx context.Context, ns string, clusterInfo // We want the most up to date queue. The oldCluster could have had updates to // the migration queue - clusterInfo.MigrationQueue = oldCluster.MigrationQueue + // clusterInfo.MigrationQueue = oldCluster.MigrationQueue + // TODO: bseto, need to remember why we had this. I think sometimes clusterInfo would have + // out of date info cause it comes from clonedData + // but then oldCluster would have the correct newer stuff cause it received a new + // migration recently. 
clusterInfo.Version.Add(1) clusterBytes, err := json.Marshal(clusterInfo) From b46026999eec7caa188c4c095c1b27b217b10dfb Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Thu, 14 Aug 2025 15:12:29 -0600 Subject: [PATCH 26/31] removes the nolock call --- controller/cluster.go | 15 +++++---------- store/store.go | 8 -------- 2 files changed, 5 insertions(+), 18 deletions(-) diff --git a/controller/cluster.go b/controller/cluster.go index e91cab11..cd4290aa 100755 --- a/controller/cluster.go +++ b/controller/cluster.go @@ -391,10 +391,6 @@ func (c *ClusterChecker) updateCluster(cluster *store.Cluster) { c.clusterMu.Unlock() } -func (c *ClusterChecker) updateClusterNoLock(cluster *store.Cluster) { - c.cluster = cluster -} - func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedCluster *store.Cluster) { log := logger.Get().With( zap.String("namespace", c.namespace), @@ -436,7 +432,7 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu log.Error("Failed to update the cluster", zap.Error(err)) return } - c.updateClusterNoLock(clonedCluster) + c.updateCluster(clonedCluster) log.Warn("Failed to migrate the slot", zap.String("slot", migratingSlot.String())) case "success": clonedCluster.Shards[i].SlotRanges = store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges, shard.MigratingSlot.SlotRange) @@ -450,7 +446,7 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu } else { log.Info("Migrate the slot successfully", zap.String("slot", migratedSlot.String())) } - c.updateClusterNoLock(clonedCluster) + c.updateCluster(clonedCluster) if clonedCluster.MigrationQueue.Available() { log.Info("Migration queue is not empty, migrating queue'd requests") err = clonedCluster.MigrateAvailableSlots(ctx) @@ -462,7 +458,7 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu log.Error("Failed to update the cluster", zap.Error(err)) return } - c.updateClusterNoLock(clonedCluster) + c.updateCluster(clonedCluster) } default: clonedCluster.Shards[i].ClearMigrateState() @@ -470,7 +466,7 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu log.Error("Failed to update the cluster", zap.Error(err)) return } - c.updateClusterNoLock(clonedCluster) + c.updateCluster(clonedCluster) log.Error("Unknown migrating state", zap.String("state", sourceNodeClusterInfo.MigratingState)) } } @@ -492,12 +488,11 @@ func (c *ClusterChecker) migrationLoop() { continue } clonedCluster := c.cluster.Clone() + c.clusterMu.Unlock() if clonedCluster == nil { - c.clusterMu.Unlock() continue } c.tryUpdateMigrationStatus(c.ctx, clonedCluster) - c.clusterMu.Unlock() } } } diff --git a/store/store.go b/store/store.go index c9b2c4ee..df649810 100644 --- a/store/store.go +++ b/store/store.go @@ -187,14 +187,6 @@ func (s *ClusterStore) UpdateCluster(ctx context.Context, ns string, clusterInfo return fmt.Errorf("the cluster has been updated by others") } - // We want the most up to date queue. The oldCluster could have had updates to - // the migration queue - // clusterInfo.MigrationQueue = oldCluster.MigrationQueue - // TODO: bseto, need to remember why we had this. I think sometimes clusterInfo would have - // out of date info cause it comes from clonedData - // but then oldCluster would have the correct newer stuff cause it received a new - // migration recently. 
- clusterInfo.Version.Add(1) clusterBytes, err := json.Marshal(clusterInfo) if err != nil { From 64197ca4707398eea72a2f34e4f4c67948ec9781 Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Thu, 14 Aug 2025 15:16:20 -0600 Subject: [PATCH 27/31] adds writer and reader for testing --- cmd/reader/main.go | 71 ++++++++++++++++++++++++++++++++++++++++++++++ cmd/writer/main.go | 30 +++++++++++++++++--- 2 files changed, 97 insertions(+), 4 deletions(-) create mode 100644 cmd/reader/main.go diff --git a/cmd/reader/main.go b/cmd/reader/main.go new file mode 100644 index 00000000..d847c6d6 --- /dev/null +++ b/cmd/reader/main.go @@ -0,0 +1,71 @@ +package main + +import ( + "bufio" + "context" + "fmt" + "os" + "time" + + "github.com/apache/kvrocks-controller/logger" + "github.com/redis/rueidis" + "go.uber.org/zap" +) + +func main() { + // goal is to spam reading and client connections + for i := 0; i < 5; i++ { + go func() { + client, err := rueidis.NewClient( + rueidis.ClientOption{ + InitAddress: []string{"127.0.0.1:7770"}, + ShuffleInit: true, + ConnWriteTimeout: time.Millisecond * 300, + DisableCache: true, // client cache is not enabled on kvrocks + PipelineMultiplex: 5, + MaxFlushDelay: 50 * time.Microsecond, + AlwaysPipelining: true, + DisableTCPNoDelay: true, + DisableRetry: true, + }, + ) + if err != nil { + logger.Get().Error("unable to get rueidis client", zap.Error(err)) + return + } + ctx := context.Background() + for i := 0; i < 1000000; i++ { + _, err := hGetAll(ctx, time.Second, client, fmt.Sprintf("hello:%d", i)) + if err != nil { + logger.Get().Error("err calling hGetAll", zap.Error(err)) + } + } + }() + } + + fmt.Println("Press Enter to exit...") + bufio.NewReader(os.Stdin).ReadBytes('\n') + fmt.Println("Goodbye!") +} + +// hGetAll queries from a hashmap and returns all fields and values of the hashmap +// it returns a map of field names to their values as strings +// returns an empty map if the key does not exist +func hGetAll( + ctx context.Context, + timeout time.Duration, + client rueidis.Client, + key string, +) (map[string]string, error) { + timeoutCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + cmd := client.B().Hgetall().Key(key).Build() + resp := client.Do(timeoutCtx, cmd) + data, err := resp.AsStrMap() + if err != nil { + return nil, err + } + + return data, nil +} diff --git a/cmd/writer/main.go b/cmd/writer/main.go index 31d738f3..e5abef71 100644 --- a/cmd/writer/main.go +++ b/cmd/writer/main.go @@ -14,9 +14,9 @@ import ( func main() { client, err := rueidis.NewClient( rueidis.ClientOption{ - InitAddress: []string{"127.0.0.1:7000"}, + InitAddress: []string{"127.0.0.1:7770"}, ShuffleInit: true, - ConnWriteTimeout: time.Millisecond * 100, + ConnWriteTimeout: time.Millisecond * 300, DisableCache: true, // client cache is not enabled on kvrocks PipelineMultiplex: 5, MaxFlushDelay: 50 * time.Microsecond, @@ -29,11 +29,33 @@ func main() { logger.Get().Error("unable to get rueidis client", zap.Error(err)) return } + ctx := context.Background() + payload := []byte("123123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789456789") + data := make(map[string][]byte) + cols := []string{} - hSetExpire(ctx, time.Second * 1, client, "hello", , data map[string][]byte, ttl time.Duration) + for i := 0; i < 100; i++ { + data[fmt.Sprintf("%d", i)] = payload + cols = append(cols, fmt.Sprintf("%d", i)) + } + + for i := 0; i < 1000000; i++ { + hSetExpire(ctx, time.Second*1, client, fmt.Sprintf("hello:%d", i), cols, data, time.Hour*24) + if 
i%10000 == 0 { + logger.Get().Info("inserted", zap.Int("num", i)) + } + } } -func hSetExpire(ctx context.Context, timeout time.Duration, client rueidis.Client, key string, cols []string, data map[string][]byte, ttl time.Duration) error { +func hSetExpire( + ctx context.Context, + timeout time.Duration, + client rueidis.Client, + key string, + cols []string, + data map[string][]byte, + ttl time.Duration, +) error { timeoutCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() From a473d0d58f01bef28bfe1b0fdfd314615d326031 Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Thu, 14 Aug 2025 15:50:37 -0600 Subject: [PATCH 28/31] created some reader and writers, but can't reproduce out of order error --- cmd/reader/main.go | 1 + cmd/writer/main.go | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/cmd/reader/main.go b/cmd/reader/main.go index d847c6d6..83718c99 100644 --- a/cmd/reader/main.go +++ b/cmd/reader/main.go @@ -40,6 +40,7 @@ func main() { logger.Get().Error("err calling hGetAll", zap.Error(err)) } } + logger.Get().Info("done") }() } diff --git a/cmd/writer/main.go b/cmd/writer/main.go index e5abef71..65c167c3 100644 --- a/cmd/writer/main.go +++ b/cmd/writer/main.go @@ -40,7 +40,10 @@ func main() { } for i := 0; i < 1000000; i++ { - hSetExpire(ctx, time.Second*1, client, fmt.Sprintf("hello:%d", i), cols, data, time.Hour*24) + err := hSetExpire(ctx, time.Second*1, client, fmt.Sprintf("hello:%d", i), cols, data, time.Hour*24) + if err != nil { + logger.Get().Error("unable to hSetExpire", zap.Error(err)) + } if i%10000 == 0 { logger.Get().Info("inserted", zap.Int("num", i)) } From d8f3813e10b42241b8b32548dc3923154b34b9d6 Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Thu, 14 Aug 2025 15:55:05 -0600 Subject: [PATCH 29/31] removes the reader and writer utils --- cmd/reader/main.go | 72 ---------------------------------------- cmd/writer/main.go | 82 ---------------------------------------------- 2 files changed, 154 deletions(-) delete mode 100644 cmd/reader/main.go delete mode 100644 cmd/writer/main.go diff --git a/cmd/reader/main.go b/cmd/reader/main.go deleted file mode 100644 index 83718c99..00000000 --- a/cmd/reader/main.go +++ /dev/null @@ -1,72 +0,0 @@ -package main - -import ( - "bufio" - "context" - "fmt" - "os" - "time" - - "github.com/apache/kvrocks-controller/logger" - "github.com/redis/rueidis" - "go.uber.org/zap" -) - -func main() { - // goal is to spam reading and client connections - for i := 0; i < 5; i++ { - go func() { - client, err := rueidis.NewClient( - rueidis.ClientOption{ - InitAddress: []string{"127.0.0.1:7770"}, - ShuffleInit: true, - ConnWriteTimeout: time.Millisecond * 300, - DisableCache: true, // client cache is not enabled on kvrocks - PipelineMultiplex: 5, - MaxFlushDelay: 50 * time.Microsecond, - AlwaysPipelining: true, - DisableTCPNoDelay: true, - DisableRetry: true, - }, - ) - if err != nil { - logger.Get().Error("unable to get rueidis client", zap.Error(err)) - return - } - ctx := context.Background() - for i := 0; i < 1000000; i++ { - _, err := hGetAll(ctx, time.Second, client, fmt.Sprintf("hello:%d", i)) - if err != nil { - logger.Get().Error("err calling hGetAll", zap.Error(err)) - } - } - logger.Get().Info("done") - }() - } - - fmt.Println("Press Enter to exit...") - bufio.NewReader(os.Stdin).ReadBytes('\n') - fmt.Println("Goodbye!") -} - -// hGetAll queries from a hashmap and returns all fields and values of the hashmap -// it returns a map of field names to their values as strings -// returns an empty map if the key does 
not exist -func hGetAll( - ctx context.Context, - timeout time.Duration, - client rueidis.Client, - key string, -) (map[string]string, error) { - timeoutCtx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - - cmd := client.B().Hgetall().Key(key).Build() - resp := client.Do(timeoutCtx, cmd) - data, err := resp.AsStrMap() - if err != nil { - return nil, err - } - - return data, nil -} diff --git a/cmd/writer/main.go b/cmd/writer/main.go deleted file mode 100644 index 65c167c3..00000000 --- a/cmd/writer/main.go +++ /dev/null @@ -1,82 +0,0 @@ -package main - -import ( - "context" - "fmt" - "strconv" - "time" - - "github.com/apache/kvrocks-controller/logger" - "github.com/redis/rueidis" - "go.uber.org/zap" -) - -func main() { - client, err := rueidis.NewClient( - rueidis.ClientOption{ - InitAddress: []string{"127.0.0.1:7770"}, - ShuffleInit: true, - ConnWriteTimeout: time.Millisecond * 300, - DisableCache: true, // client cache is not enabled on kvrocks - PipelineMultiplex: 5, - MaxFlushDelay: 50 * time.Microsecond, - AlwaysPipelining: true, - DisableTCPNoDelay: true, - DisableRetry: true, - }, - ) - if err != nil { - logger.Get().Error("unable to get rueidis client", zap.Error(err)) - return - } - ctx := context.Background() - payload := []byte("123123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789456789") - data := make(map[string][]byte) - cols := []string{} - - for i := 0; i < 100; i++ { - data[fmt.Sprintf("%d", i)] = payload - cols = append(cols, fmt.Sprintf("%d", i)) - } - - for i := 0; i < 1000000; i++ { - err := hSetExpire(ctx, time.Second*1, client, fmt.Sprintf("hello:%d", i), cols, data, time.Hour*24) - if err != nil { - logger.Get().Error("unable to hSetExpire", zap.Error(err)) - } - if i%10000 == 0 { - logger.Get().Info("inserted", zap.Int("num", i)) - } - } -} - -func hSetExpire( - ctx context.Context, - timeout time.Duration, - client rueidis.Client, - key string, - cols []string, - data map[string][]byte, - ttl time.Duration, -) error { - timeoutCtx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - - convertedSlice := make([]string, 0, len(cols)*2) - for _, col := range cols { - if _, ok := data[col]; !ok { - return fmt.Errorf("field %s not found in data", col) - } - convertedSlice = append(convertedSlice, col, string(data[col])) - } - - cmd := client.B(). - Arbitrary("HSETEXPIRE"). - Keys(key). - Args(strconv.Itoa(int(ttl.Seconds()))). - Args(convertedSlice...). 
- Build() - resp := client.Do(timeoutCtx, cmd) - err := resp.Error() - return err -} From 81c5803f2fab6ff025387456b98ba7233f3e6d4b Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Wed, 19 Nov 2025 10:59:57 -0700 Subject: [PATCH 30/31] adds a delay to the syncClusterToNodes --- controller/cluster.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/controller/cluster.go b/controller/cluster.go index cd4290aa..da0904cf 100755 --- a/controller/cluster.go +++ b/controller/cluster.go @@ -254,9 +254,12 @@ func (c *ClusterChecker) syncClusterToNodes(ctx context.Context) error { return err } version := clusterInfo.Version.Load() + operationCount := 0 + staggerDelay := 1000 * time.Millisecond for _, shard := range clusterInfo.Shards { for _, node := range shard.Nodes { - go func(n store.Node) { + go func(n store.Node, delay time.Duration) { + time.Sleep(delay) log := logger.Get().With( zap.String("namespace", c.namespace), zap.String("cluster", c.clusterName), @@ -269,7 +272,8 @@ func (c *ClusterChecker) syncClusterToNodes(ctx context.Context) error { } else { log.Info("Succeed to sync the cluster topology to the node") } - }(node) + }(node, time.Duration(operationCount)*staggerDelay) + operationCount++ } } return nil From 6105e71b60ec7bdf40dffefce158d1cf80b99223 Mon Sep 17 00:00:00 2001 From: Byron Seto Date: Wed, 19 Nov 2025 16:39:04 -0700 Subject: [PATCH 31/31] adds delay to sync cluster nodes --- controller/cluster.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/controller/cluster.go b/controller/cluster.go index da0904cf..fb76bad7 100755 --- a/controller/cluster.go +++ b/controller/cluster.go @@ -255,7 +255,7 @@ func (c *ClusterChecker) syncClusterToNodes(ctx context.Context) error { } version := clusterInfo.Version.Load() operationCount := 0 - staggerDelay := 1000 * time.Millisecond + staggerDelay := 5000 * time.Millisecond for _, shard := range clusterInfo.Shards { for _, node := range shard.Nodes { go func(n store.Node, delay time.Duration) { @@ -285,10 +285,13 @@ func (c *ClusterChecker) parallelProbeNodes(ctx context.Context, cluster *store. var latestClusterNodesStr string var wg sync.WaitGroup + operationCount := 0 + staggerDelay := 5000 * time.Millisecond for i, shard := range cluster.Shards { for _, node := range shard.Nodes { wg.Add(1) - go func(shardIdx int, n store.Node) { + go func(shardIdx int, n store.Node, delay time.Duration) { + time.Sleep(delay) defer wg.Done() log := logger.Get().With( zap.String("id", n.ID()), @@ -336,7 +339,8 @@ func (c *ClusterChecker) parallelProbeNodes(ctx context.Context, cluster *store. mu.Unlock() } c.resetFailureCount(n.ID()) - }(i, node) + }(i, node, time.Duration(operationCount)*staggerDelay) + operationCount++ } }
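
For readers skimming patches 30 and 31: the change amounts to the staggered goroutine fan-out sketched below. This is a minimal, self-contained Go example of the same pattern, not code from the series; fanOutStaggered, targets, and syncOne are illustrative names standing in for the per-node sync/probe work done in syncClusterToNodes and parallelProbeNodes.

    package main

    import (
    	"fmt"
    	"sync"
    	"time"
    )

    // fanOutStaggered starts one goroutine per target, delaying the i-th
    // goroutine by i*stagger so the targets are not all contacted at once.
    // It mirrors the shape of the loops touched in patches 30 and 31;
    // syncOne stands in for the per-node work (topology sync or probe).
    func fanOutStaggered(targets []string, stagger time.Duration, syncOne func(string)) {
    	var wg sync.WaitGroup
    	for i, t := range targets {
    		wg.Add(1)
    		go func(target string, delay time.Duration) {
    			defer wg.Done()
    			time.Sleep(delay)
    			syncOne(target)
    		}(t, time.Duration(i)*stagger)
    	}
    	wg.Wait()
    }

    func main() {
    	nodes := []string{"node0", "node1", "node2"}
    	fanOutStaggered(nodes, 100*time.Millisecond, func(n string) {
    		fmt.Println("synced", n)
    	})
    }

Because the delay scales with the launch index (operationCount in the patches), the last goroutine starts roughly (N-1) x stagger after the first; with the 5000 ms stagger introduced in patch 31 that wait grows linearly with the number of nodes in the cluster.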