Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changes/unreleased/Added-20260109-223536.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
kind: Added
body: Enabled automatic etcd client ↔ server mode reconfiguration
time: 2026-01-09T22:35:36.887346+05:30
77 changes: 77 additions & 0 deletions server/internal/etcd/embedded.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ func (e *EmbeddedEtcd) Start(ctx context.Context) error {
e.mu.Lock()
defer e.mu.Unlock()

if e.etcd != nil {
return nil // already started
}

initialized, err := e.IsInitialized()
if err != nil {
return err
Expand Down Expand Up @@ -292,6 +296,7 @@ func (e *EmbeddedEtcd) Shutdown() error {
}
if e.etcd != nil {
e.etcd.Close()
e.etcd = nil
}
return errors.Join(errs...)
}
Expand Down Expand Up @@ -443,6 +448,78 @@ func (e *EmbeddedEtcd) HealthCheck() common.ComponentStatus {
}
}

func (e *EmbeddedEtcd) ChangeMode(ctx context.Context, mode config.EtcdMode) (Etcd, error) {
if mode != config.EtcdModeClient {
return nil, fmt.Errorf("invalid mode transition from %s to %s", config.EtcdModeServer, mode)
}

if err := e.Start(ctx); err != nil {
return nil, err
}
Comment on lines +451 to +458
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's examine the full embedded.go file to understand the structure and mutex usage
wc -l server/internal/etcd/embedded.go

Repository: pgEdge/control-plane

Length of output: 100


🏁 Script executed:

# Get the EmbeddedEtcd struct definition and relevant methods
sed -n '1,100p' server/internal/etcd/embedded.go

Repository: pgEdge/control-plane

Length of output: 2769


🏁 Script executed:

# Look at the mutex field and how it's used
rg -n "mu.*sync" server/internal/etcd/embedded.go

Repository: pgEdge/control-plane

Length of output: 90


🏁 Script executed:

# Search for the ChangeMode method and surrounding context
sed -n '440,480p' server/internal/etcd/embedded.go

Repository: pgEdge/control-plane

Length of output: 1088


🏁 Script executed:

# Check the Start() and Shutdown() methods to see mutex usage
ast-grep --pattern $'func ($_ *EmbeddedEtcd) Start($_) {
  $$$
}'

Repository: pgEdge/control-plane

Length of output: 46


🏁 Script executed:

# Check the Error() method implementation
sed -n '300,310p' server/internal/etcd/embedded.go

Repository: pgEdge/control-plane

Length of output: 264


🏁 Script executed:

# Search for all ChangeMode calls in the codebase
rg -n "ChangeMode" --type go -A 2 -B 2

Repository: pgEdge/control-plane

Length of output: 1871


🏁 Script executed:

# Get the full ChangeMode method (it continues beyond line 458)
sed -n '451,520p' server/internal/etcd/embedded.go

Repository: pgEdge/control-plane

Length of output: 2180


🏁 Script executed:

# Find the Shutdown method
rg -n "func.*Shutdown" server/internal/etcd/embedded.go -A 10

Repository: pgEdge/control-plane

Length of output: 383


🏁 Script executed:

# Check GetClient implementation
rg -n "func.*GetClient" server/internal/etcd/embedded.go -A 5

Repository: pgEdge/control-plane

Length of output: 222


🏁 Script executed:

# Look at the provide.go file context around ChangeMode call
sed -n '80,110p' server/internal/etcd/provide.go

Repository: pgEdge/control-plane

Length of output: 1077


Address missing mutex protection in ChangeMode and related methods.

ChangeMode creates a race condition: it calls e.Start() (which acquires and releases the mutex), then calls e.GetClient() and e.Shutdown() without holding the mutex. Since Shutdown() sets e.etcd = nil without protection, any concurrent call to Error() (which accesses e.etcd unsafely at line 304-306) would panic.

The broader issue is inconsistent synchronization in EmbeddedEtcd: Start() properly uses the mutex, but GetClient(), Error(), Shutdown(), and other methods access shared fields (e.etcd, e.client) without any lock, creating potential data races.

Either ensure at the caller level (in provide.go) that this instance receives no concurrent access during the transition, or add mutex protection to ChangeMode() and unprotected accessor methods throughout the class.

🤖 Prompt for AI Agents
In @server/internal/etcd/embedded.go around lines 451 - 458, ChangeMode on
EmbeddedEtcd has a race: it calls Start() (which locks) then invokes GetClient()
and Shutdown() without holding the mutex while Shutdown and Error access/modify
shared fields e.etcd and e.client unsafely; to fix, add mutex protection around
the entire mode transition in ChangeMode (acquire the same mutex used in
Start/Shutdown), and audit unprotected accessors GetClient, Error, Shutdown (and
any other methods touching e.etcd or e.client) to either take the mutex for
reads/writes or provide copy-on-read semantics; reference methods:
EmbeddedEtcd.ChangeMode, EmbeddedEtcd.Start, EmbeddedEtcd.GetClient,
EmbeddedEtcd.Shutdown, EmbeddedEtcd.Error and shared fields e.etcd/e.client when
making the change.


cfg := e.cfg.Config()

embeddedClient, err := e.GetClient()
if err != nil {
return nil, err
}

// Get the full member list before removing this host
resp, err := embeddedClient.MemberList(ctx)
if err != nil {
return nil, fmt.Errorf("failed to list etcd members for server->client transition: %w", err)
}

var endpoints []string
for _, m := range resp.Members {
// Skip this host's member; we are about to remove it.
if m.Name == cfg.HostID {
continue
}
endpoints = append(endpoints, m.ClientURLs...)
}

if len(endpoints) == 0 {
return nil, fmt.Errorf("cannot demote etcd server on host %s: no remaining cluster members with client URLs", cfg.HostID)
}

generated := e.cfg.GeneratedConfig()
generated.EtcdClient.Endpoints = endpoints
if err := e.cfg.UpdateGeneratedConfig(generated); err != nil {
return nil, fmt.Errorf("failed to update generated config with client endpoints: %w", err)
}

if err := e.Shutdown(); err != nil {
return nil, err
}

remote := NewRemoteEtcd(e.cfg, e.logger)
if err := remote.Start(ctx); err != nil {
return nil, fmt.Errorf("failed to start remote client: %w", err)
}

remoteClient, err := remote.GetClient()
if err != nil {
return nil, fmt.Errorf("failed to get remote client: %w", err)
}

if err := RemoveMember(ctx, remoteClient, cfg.HostID); err != nil {
return nil, fmt.Errorf("failed to remove embedded etcd from cluster: %w", err)
}

if err := os.RemoveAll(e.etcdDir()); err != nil {
return nil, fmt.Errorf("failed to remove embedded etcd data dir: %w", err)
}
Comment on lines +492 to +512
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Potential inconsistent state on partial failure during demotion.

Similar to the client→server transition, if operations fail after e.Shutdown() (e.g., remote.Start(), RemoveMember(), or os.RemoveAll()), the system may be left in an inconsistent state. Consider adding recovery logic or documenting the manual recovery steps.

For example, if RemoveMember fails, the local embedded etcd is shut down and data directory still exists, but the remote client is running. On restart, the generated config will still show server mode (since config update happens later), potentially causing issues.

🤖 Prompt for AI Agents
In @server/internal/etcd/embedded.go around lines 492 - 512, The demotion path
can leave the system inconsistent if steps after e.Shutdown() fail; add
recovery/rollback handling around NewRemoteEtcd/remote.Start(),
remote.GetClient(), RemoveMember(), and os.RemoveAll() so partial failures
restore the previous server state or perform safe cleanup: specifically, if
remote.Start() or remote.GetClient() fail, attempt to restart the embedded node
(call the inverse of e.Shutdown() or e.Start equivalent) or ensure config
reflects server mode; if RemoveMember() fails, try to stop the remote client and
restart embedded etcd (or persist a clear failure state) so the node remains
joinable; always ensure os.RemoveAll(e.etcdDir()) only runs after successful
cluster removal or, on its failure, leave data dir intact and surface an
actionable error; reference e.Shutdown(), NewRemoteEtcd(), remote.Start(),
remote.GetClient(), RemoveMember(), os.RemoveAll(), e.etcdDir(), and cfg.HostID
when implementing these rollback and state-consistency steps.


generated.EtcdMode = config.EtcdModeClient
generated.EtcdServer = config.EtcdServer{}
if err := e.cfg.UpdateGeneratedConfig(generated); err != nil {
return nil, fmt.Errorf("failed to clear out etcd server settings in generated config: %w", err)
}
Comment on lines +492 to +518
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Partial failure leaves system in inconsistent state.

After Shutdown() succeeds, failures in subsequent steps leave the system in an unrecoverable state:

  • If remote.Start() fails: embedded is shut down, no client available.
  • If RemoveMember() fails: running as client but still a cluster member.
  • If os.RemoveAll() fails: orphaned data, but still demoted.
  • If the final UpdateGeneratedConfig() fails: config doesn't reflect actual state.

Consider either:

  1. Reordering operations to fail fast before irreversible changes (e.g., validate remote connectivity first).
  2. Adding best-effort rollback or recovery logging.
  3. Documenting that manual intervention may be required on partial failure.


return remote, nil
}

const maxLearnerStallTime = 5 * time.Minute

type learnerProgress struct {
Expand Down
2 changes: 2 additions & 0 deletions server/internal/etcd/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/pgEdge/control-plane/server/internal/common"
"github.com/pgEdge/control-plane/server/internal/config"
)

type ClusterMember struct {
Expand Down Expand Up @@ -50,4 +51,5 @@ type Etcd interface {
RemoveHost(ctx context.Context, hostID string) error
JoinToken() (string, error)
VerifyJoinToken(in string) error
ChangeMode(ctx context.Context, mode config.EtcdMode) (Etcd, error)
}
64 changes: 58 additions & 6 deletions server/internal/etcd/provide.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package etcd

import (
"context"
"fmt"
"time"

"github.com/rs/zerolog"
"github.com/samber/do"
Expand All @@ -27,6 +29,18 @@ func provideClient(i *do.Injector) {
})
}

// newEtcdForMode creates an Etcd instance based on the specified mode.
func newEtcdForMode(mode config.EtcdMode, cfg *config.Manager, logger zerolog.Logger) (Etcd, error) {
switch mode {
case config.EtcdModeServer:
return NewEmbeddedEtcd(cfg, logger), nil
case config.EtcdModeClient:
return NewRemoteEtcd(cfg, logger), nil
default:
return nil, fmt.Errorf("invalid etcd mode: %s", mode)
}
}

func provideEtcd(i *do.Injector) {
do.Provide(i, func(i *do.Injector) (Etcd, error) {
cfg, err := do.Invoke[*config.Manager](i)
Expand All @@ -38,13 +52,51 @@ func provideEtcd(i *do.Injector) {
return nil, err
}

switch storageType := cfg.Config().EtcdMode; storageType {
case config.EtcdModeServer:
return NewEmbeddedEtcd(cfg, logger), nil
case config.EtcdModeClient:
return NewRemoteEtcd(cfg, logger), nil
appCfg := cfg.Config()
generated := cfg.GeneratedConfig()

oldMode := generated.EtcdMode
newMode := appCfg.EtcdMode

logger.Info().
Str("old_mode", string(oldMode)).
Str("new_mode", string(newMode)).
Bool("old_mode_empty", oldMode == "").
Bool("modes_equal", oldMode == newMode).
Msg("checking etcd mode for reconfiguration")

// Mode has changed - perform reconfiguration.
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

switch {
case oldMode == "" || oldMode == newMode:
etcd, err := newEtcdForMode(newMode, cfg, logger)
if err != nil {
return nil, err
}
initialized, err := etcd.IsInitialized()
if err != nil {
return nil, err
}
if initialized {
generated.EtcdMode = appCfg.EtcdMode
generated.EtcdServer = appCfg.EtcdServer
generated.EtcdClient = appCfg.EtcdClient
if err := cfg.UpdateGeneratedConfig(generated); err != nil {
return nil, fmt.Errorf("failed to persist etcd configuration: %w", err)
}
}

return etcd, nil
case oldMode == config.EtcdModeServer && newMode == config.EtcdModeClient:
embedded := NewEmbeddedEtcd(cfg, logger)
return embedded.ChangeMode(ctx, newMode)
case oldMode == config.EtcdModeClient && newMode == config.EtcdModeServer:
remote := NewRemoteEtcd(cfg, logger)
return remote.ChangeMode(ctx, newMode)
default:
return nil, fmt.Errorf("invalid storage type: %s", storageType)
return nil, fmt.Errorf("unsupported etcd mode transition: %s -> %s", oldMode, newMode)
}
})
}
Expand Down
42 changes: 31 additions & 11 deletions server/internal/etcd/rbac.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,9 @@ func CreateHostCredentials(

if opts.EmbeddedEtcdEnabled {
// Create a cert for the peer server
serverPrincipal, err := certSvc.EtcdServer(ctx,
opts.HostID,
opts.Hostname,
[]string{"localhost", opts.Hostname},
[]string{"127.0.0.1", opts.IPv4Address},
)
if err != nil {
return nil, fmt.Errorf("failed to create cert for etcd server: %w", err)
if err := addEtcdServerCredentials(ctx, opts.HostID, opts.Hostname, opts.IPv4Address, certSvc, creds); err != nil {
return nil, err
}

creds.ServerCert = serverPrincipal.CertPEM
creds.ServerKey = serverPrincipal.KeyPEM
}

return creds, nil
Expand Down Expand Up @@ -353,9 +344,38 @@ func writeHostCredentials(creds *HostCredentials, cfg *config.Manager) error {
generatedCfg := cfg.GeneratedConfig()
generatedCfg.EtcdUsername = creds.Username
generatedCfg.EtcdPassword = creds.Password
generatedCfg.EtcdMode = appCfg.EtcdMode
generatedCfg.EtcdClient = appCfg.EtcdClient
generatedCfg.EtcdServer = appCfg.EtcdServer

if err := cfg.UpdateGeneratedConfig(generatedCfg); err != nil {
return fmt.Errorf("failed to update generated config: %w", err)
}

return nil
}

func addEtcdServerCredentials(
ctx context.Context,
hostID string,
hostname string,
ipv4Address string,
certSvc *certificates.Service,
creds *HostCredentials,
) error {
// Create a cert for the peer server
serverPrincipal, err := certSvc.EtcdServer(ctx,
hostID,
hostname,
[]string{"localhost", hostname},
[]string{"127.0.0.1", ipv4Address},
)
if err != nil {
return fmt.Errorf("failed to create cert for etcd server: %w", err)
}

creds.ServerCert = serverPrincipal.CertPEM
creds.ServerKey = serverPrincipal.KeyPEM

return nil
}
62 changes: 62 additions & 0 deletions server/internal/etcd/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,65 @@ func (r *RemoteEtcd) updateEndpointsConfig(ctx context.Context, client *clientv3

return nil
}

func (r *RemoteEtcd) ChangeMode(ctx context.Context, mode config.EtcdMode) (Etcd, error) {
if mode != config.EtcdModeServer {
return nil, fmt.Errorf("invalid mode transition from %s to %s", config.EtcdModeClient, mode)
}

if err := r.Start(ctx); err != nil {
return nil, err
}

cfg := r.cfg.Config()

clientPrincipal, err := r.certSvc.HostEtcdUser(ctx, cfg.HostID)
if err != nil {
return nil, fmt.Errorf("failed to get client principal: %w", err)
}

creds := &HostCredentials{
Username: cfg.EtcdUsername,
Password: cfg.EtcdPassword,
CaCert: r.certSvc.CACert(),
ClientCert: clientPrincipal.CertPEM,
ClientKey: clientPrincipal.KeyPEM,
}

if err := addEtcdServerCredentials(ctx, cfg.HostID, cfg.Hostname, cfg.IPv4Address, r.certSvc, creds); err != nil {
return nil, err
}

client, err := r.GetClient()
if err != nil {
return nil, err
}

leader, err := GetClusterLeader(ctx, client)
if err != nil {
return nil, fmt.Errorf("failed to get cluster leader: %w", err)
}

if err := r.Shutdown(); err != nil {
return nil, err
}

embedded := NewEmbeddedEtcd(r.cfg, r.logger)
err = embedded.Join(ctx, JoinOptions{
Leader: leader,
Credentials: creds,
})
if err != nil {
return nil, fmt.Errorf("failed to join embedded etcd to cluster: %w", err)
}

generated := r.cfg.GeneratedConfig()
generated.EtcdMode = config.EtcdModeServer
generated.EtcdClient = config.EtcdClient{}
generated.EtcdServer = cfg.EtcdServer
if err := r.cfg.UpdateGeneratedConfig(generated); err != nil {
return nil, fmt.Errorf("failed to clear out etcd client settings in generated config: %w", err)
}

return embedded, nil
}