Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
5c1c181
adds all config files to var/lib/kvctl
bseto Jul 16, 2025
ef5be08
needs consul0 for local dev
bseto Jul 16, 2025
dacb630
adds some more things required for local development
bseto Jul 16, 2025
0d4ee9e
adds a queue so we can find requests that can't be made yet and queue…
bseto Jul 24, 2025
102bb46
changes the config-consul.yaml back to what it was before
bseto Jul 24, 2025
0d58272
changes slot to slot range instead of ranges
bseto Jul 25, 2025
8bf5bb8
doing some debugging
bseto Jul 31, 2025
10c368c
removes old logging
bseto Jul 31, 2025
cd45e8f
adds additional kvrocks nodes to test parallel and sequential migrations
bseto Aug 1, 2025
c6c1e19
fixes up some tests
bseto Aug 1, 2025
7323879
updates log info
bseto Aug 1, 2025
165dece
comments out migrateslot test
bseto Aug 1, 2025
881cde2
fixed test
bseto Aug 1, 2025
d46121f
adds old test back in
bseto Aug 1, 2025
5e91e3e
removes external docker network and removes some logging
bseto Aug 1, 2025
323879d
removes some more logging
bseto Aug 2, 2025
1e7158f
adds a way to delete migration queue
bseto Aug 4, 2025
f9eb3b9
adds a way to cancel migration queue
bseto Aug 5, 2025
0f94db9
removes some old logging
bseto Aug 5, 2025
2a31dc4
adds a CanMigrate function to check if migrations are possible
bseto Aug 6, 2025
45636e8
changes cancel to be clear instead
bseto Aug 12, 2025
6aa2c67
adds a new starting hook for clusterChecker
bseto Aug 12, 2025
9b5e171
adds another hook that initializes the clusterInfo first
bseto Aug 12, 2025
dcd45e0
adds more comment to initializeClusterInfo
bseto Aug 12, 2025
bf69c6f
adds a writer to load the cluster with
bseto Aug 14, 2025
b460269
removes the nolock call
bseto Aug 14, 2025
64197ca
adds writer and reader for testing
bseto Aug 14, 2025
a473d0d
created some reader and writers, but can't reproduce out of order error
bseto Aug 14, 2025
d8f3813
removes the reader and writer utils
bseto Aug 14, 2025
81c5803
adds a delay to the syncClusterToNodes
bseto Nov 19, 2025
6105e71
adds delay to sync cluster nodes
bseto Nov 19, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ COPY --from=build /kvctl/_build/kvctl ./bin/
VOLUME /var/lib/kvctl

COPY ./LICENSE ./NOTICE ./licenses ./
COPY ./config/config.yaml /var/lib/kvctl/
COPY ./config/* /var/lib/kvctl/

EXPOSE 9379:9379
ENTRYPOINT ["./bin/kvctl-server", "-c", "/var/lib/kvctl/config.yaml"]
5 changes: 5 additions & 0 deletions cmd/client/command/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,8 @@ const (
ResourceShard = "shard"
ResourceNode = "node"
)

const (
MigrateSlot = "slot"
MigrateClear = "clear"
)
44 changes: 37 additions & 7 deletions cmd/client/command/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,20 @@ var MigrateCommand = &cobra.Command{
Example: `
# Migrate slot between cluster shards
kvctl migrate slot <slot> --target <target_shard_index> -n <namespace> -c <cluster>

# Clear the queue for migration - does not stop the current migration
kvctl migrate clear -n <namespace> -c <cluster>
`,
PreRunE: migrationPreRun,
RunE: func(cmd *cobra.Command, args []string) error {
host, _ := cmd.Flags().GetString("host")
client := newClient(host)
resource := strings.ToLower(args[0])
switch resource {
case "slot":
case MigrateSlot:
return migrateSlot(client, &migrateOptions)
case MigrateClear:
return migrateClear(client, &migrateOptions)
default:
return fmt.Errorf("unsupported resource type: %s", resource)
}
Expand All @@ -63,10 +68,41 @@ kvctl migrate slot <slot> --target <target_shard_index> -n <namespace> -c <clust
SilenceErrors: true,
}

func migrateClear(client *client, options *MigrationOptions) error {
rsp, err := client.restyCli.R().
SetPathParam("namespace", options.namespace).
SetPathParam("cluster", options.cluster).
SetBody(map[string]interface{}{
"slot": options.slot,
"target": options.target,
"slotOnly": strconv.FormatBool(options.slotOnly),
}).
Delete("/namespaces/{namespace}/clusters/{cluster}/migrate")
if err != nil {
return err
}
if rsp.IsError() {
return errors.New(rsp.String())
}
printLine("migrate slot[%s] task is submitted successfully.", options.slot)
return nil
}

func migrationPreRun(_ *cobra.Command, args []string) error {
if len(args) < 1 {
return fmt.Errorf("resource type should be specified")
}
resource := strings.ToLower(args[0])
if migrateOptions.namespace == "" {
return fmt.Errorf("namespace is required, please specify with -n or --namespace")
}
if migrateOptions.cluster == "" {
return fmt.Errorf("cluster is required, please specify with -c or --cluster")
}
// for migrate clear, we only need namespace and cluster
if resource == MigrateClear {
return nil
}
if len(args) < 2 {
return fmt.Errorf("the slot number should be specified")
}
Expand All @@ -76,12 +112,6 @@ func migrationPreRun(_ *cobra.Command, args []string) error {
}
migrateOptions.slot = args[1]

if migrateOptions.namespace == "" {
return fmt.Errorf("namespace is required, please specify with -n or --namespace")
}
if migrateOptions.cluster == "" {
return fmt.Errorf("cluster is required, please specify with -c or --cluster")
}
if migrateOptions.target < 0 {
return fmt.Errorf("target is required, please specify with --target")
}
Expand Down
49 changes: 49 additions & 0 deletions config/config-local-dev-consul.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

addr: "0.0.0.0:9379"

# Which store engine should be used by controller
# options: etcd, zookeeper, raft, consul
# Note: the raft engine is an experimental feature and is not recommended for production use.
#
# default: etcd
storage_type: consul

consul:
addrs:
- "consul0:8500"
elect_path:
tls:
enable: false
cert_file:
key_file:
ca_file:

controller:
failover:
ping_interval_seconds: 3
min_alive_size: 5
# Uncomment this part to save logs to filename instead of stdout
#log:
# level: info
# filename: /data/logs/kvctl.log
# max_backups: 10
# max_age: 7
# max_size: 100
# compress: false
1 change: 1 addition & 0 deletions consts/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,5 @@ var (
ErrShardSlotIsMigrating = errors.New("shard slot is migrating")
ErrShardNoMatchNewMaster = errors.New("no match new master in shard")
ErrSlotStartAndStopEqual = errors.New("start and stop of a range cannot be equal")
ErrNoMigrationsAvailable = errors.New("no more migrations available in queue")
)
102 changes: 97 additions & 5 deletions controller/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package controller
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -58,9 +59,23 @@ type ClusterChecker struct {
ctx context.Context
cancelFn context.CancelFunc

startHooks []Hook

wg sync.WaitGroup
}

// HookParams, any writes/updates to the `cluster` variable will automatically
// be saved to the ClusterChecker after all the hooks are run
type HookParams struct {
options ClusterCheckOptions
clusterStore store.Store
cluster *store.Cluster
namespace string
clusterName string
}

type Hook func(context.Context, *HookParams) error

func NewClusterChecker(s store.Store, ns, cluster string) *ClusterChecker {
ctx, cancel := context.WithCancel(context.Background())
c := &ClusterChecker{
Expand All @@ -78,16 +93,72 @@ func NewClusterChecker(s store.Store, ns, cluster string) *ClusterChecker {
ctx: ctx,
cancelFn: cancel,
}
c.AddStartHook(InitializeClusterInfo()) // needs to be first to initialize cluster info
c.AddStartHook(MigrateAvailableSlots())
return c
}

func (c *ClusterChecker) Start() {
c.triggerStartHooks()

c.wg.Add(1)
go c.probeLoop()
c.wg.Add(1)
go c.migrationLoop()
}

func InitializeClusterInfo() Hook {
return func(ctx context.Context, params *HookParams) error {
clusterInfo, err := params.clusterStore.GetCluster(ctx, params.namespace, params.clusterName)
if err != nil {
return fmt.Errorf("failed to get the clusterName info from the clusterStore: %w", err)
}
params.cluster = clusterInfo
return nil
}
}

func MigrateAvailableSlots() Hook {
return func(ctx context.Context, params *HookParams) error {
if params.cluster.MigrationQueue.Available() {
err := params.cluster.MigrateAvailableSlots(ctx)
if err != nil {
return err
}
if err := params.clusterStore.SetCluster(ctx, params.namespace, params.cluster); err != nil {
return err
}
}
return nil
}
}

func (c *ClusterChecker) AddStartHook(hooks ...Hook) {
c.startHooks = append(c.startHooks, hooks...)
}

func (c *ClusterChecker) triggerStartHooks() {
log := logger.Get().With(
zap.String("namespace", c.namespace),
zap.String("cluster", c.clusterName))
c.clusterMu.Lock()
defer c.clusterMu.Unlock()

params := &HookParams{
options: c.options,
clusterStore: c.clusterStore,
cluster: c.cluster, // we have the clusterMu lock, so it's ok that hooks can access this cluster var directly
namespace: c.namespace,
clusterName: c.clusterName,
}
for _, hook := range c.startHooks {
err := hook(c.ctx, params)
if err != nil {
log.Error("start hook", zap.Error(err))
}
}
}

func (c *ClusterChecker) WithPingInterval(interval time.Duration) *ClusterChecker {
c.options.pingInterval = interval
if c.options.pingInterval < 200*time.Millisecond {
Expand Down Expand Up @@ -183,9 +254,12 @@ func (c *ClusterChecker) syncClusterToNodes(ctx context.Context) error {
return err
}
version := clusterInfo.Version.Load()
operationCount := 0
staggerDelay := 5000 * time.Millisecond
for _, shard := range clusterInfo.Shards {
for _, node := range shard.Nodes {
go func(n store.Node) {
go func(n store.Node, delay time.Duration) {
time.Sleep(delay)
log := logger.Get().With(
zap.String("namespace", c.namespace),
zap.String("cluster", c.clusterName),
Expand All @@ -198,7 +272,8 @@ func (c *ClusterChecker) syncClusterToNodes(ctx context.Context) error {
} else {
log.Info("Succeed to sync the cluster topology to the node")
}
}(node)
}(node, time.Duration(operationCount)*staggerDelay)
operationCount++
}
}
return nil
Expand All @@ -210,10 +285,13 @@ func (c *ClusterChecker) parallelProbeNodes(ctx context.Context, cluster *store.
var latestClusterNodesStr string
var wg sync.WaitGroup

operationCount := 0
staggerDelay := 5000 * time.Millisecond
for i, shard := range cluster.Shards {
for _, node := range shard.Nodes {
wg.Add(1)
go func(shardIdx int, n store.Node) {
go func(shardIdx int, n store.Node, delay time.Duration) {
time.Sleep(delay)
defer wg.Done()
log := logger.Get().With(
zap.String("id", n.ID()),
Expand Down Expand Up @@ -261,7 +339,8 @@ func (c *ClusterChecker) parallelProbeNodes(ctx context.Context, cluster *store.
mu.Unlock()
}
c.resetFailureCount(n.ID())
}(i, node)
}(i, node, time.Duration(operationCount)*staggerDelay)
operationCount++
}
}

Expand Down Expand Up @@ -302,8 +381,8 @@ func (c *ClusterChecker) probeLoop() {
}
c.clusterMu.Lock()
c.cluster = clusterInfo
c.clusterMu.Unlock()
c.parallelProbeNodes(c.ctx, clusterInfo)
c.clusterMu.Unlock()
case <-c.syncCh:
if err := c.syncClusterToNodes(c.ctx); err != nil {
log.Error("Failed to sync the clusterName to the nodes", zap.Error(err))
Expand Down Expand Up @@ -376,6 +455,19 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu
log.Info("Migrate the slot successfully", zap.String("slot", migratedSlot.String()))
}
c.updateCluster(clonedCluster)
if clonedCluster.MigrationQueue.Available() {
log.Info("Migration queue is not empty, migrating queue'd requests")
err = clonedCluster.MigrateAvailableSlots(ctx)
if err != nil {
log.Error("Unable to trigger", zap.Error(err))
continue
}
if err := c.clusterStore.SetCluster(ctx, c.namespace, clonedCluster); err != nil {
log.Error("Failed to update the cluster", zap.Error(err))
return
}
c.updateCluster(clonedCluster)
}
default:
clonedCluster.Shards[i].ClearMigrateState()
if err := c.clusterStore.SetCluster(ctx, c.namespace, clonedCluster); err != nil {
Expand Down
Loading
Loading