Skip to content

Commit a7097bd

Browse files
committed
add support for memdb
Signed-off-by: Marcin Fabrykowski <git@fabrykowski.pl>
1 parent a3585ed commit a7097bd

File tree

11 files changed

+340
-207
lines changed

11 files changed

+340
-207
lines changed

dist/examples/config.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ tls = false
1515
[etcd3]
1616
endpoints = [ "http://127.0.0.1:2379" ]
1717

18+
# MemoryDB configuration
19+
[memdb]
20+
enabled = false
21+
1822
# Lock configuration, base reboot group
1923
[lock]
2024
default_group_name = "default"

internal/cli/common.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,11 @@ func verbosityLevel(verbCount int) logrus.Level {
8282

8383
// validateSettings sanity-checks all settings
8484
func validateSettings(cfg config.Settings) error {
85-
if len(cfg.EtcdEndpoints) == 0 {
86-
return errors.New("no etcd3 endpoints configured")
85+
if len(cfg.EtcdEndpoints) == 0 && !cfg.MemDBEnabled {
86+
return errors.New("no etcd3 endpoints configured and MemDB is not enabled")
87+
}
88+
if len(cfg.EtcdEndpoints) > 0 && cfg.MemDBEnabled {
89+
return errors.New("both etcd3 endpoints and MemDB are configured, choose one")
8790
}
8891
if len(cfg.LockGroups) == 0 {
8992
return errors.New("no lock-groups configured")

internal/cli/ex-get-slots.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func runGetSlots(cmd *cobra.Command, cmdArgs []string) error {
3333
ctx, cancel := context.WithTimeout(context.Background(), runSettings.EtcdTxnTimeout)
3434
defer cancel()
3535

36-
manager, err := lock.NewManager(ctx, runSettings.EtcdEndpoints, runSettings.ClientCertPubPath, runSettings.ClientCertKeyPath, runSettings.EtcdTxnTimeout, group, maxSlots)
36+
manager, err := lock.NewManager(ctx, runSettings, group, maxSlots)
3737
if err != nil {
3838
return err
3939
}

internal/config/settings.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ type Settings struct {
2121
ClientCertKeyPath string
2222
EtcdTxnTimeout time.Duration
2323

24+
MemDBEnabled bool
25+
2426
LockGroups map[string]uint64
2527
}
2628

@@ -55,6 +57,8 @@ func defaultSettings() Settings {
5557
EtcdEndpoints: []string{},
5658
EtcdTxnTimeout: time.Duration(3) * time.Second,
5759

60+
MemDBEnabled: false,
61+
5862
LockGroups: make(map[string]uint64),
5963
}
6064
}

internal/config/toml.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ type tomlConfig struct {
1111
Service *serviceSection `toml:"service"`
1212
Status *statusSection `toml:"status"`
1313
Etcd3 *etcd3Section `toml:"etcd3"`
14+
MemDB *MemDBSection `toml:"memdb"`
1415
Lock *lockSection `toml:"lock"`
1516
}
1617

@@ -37,6 +38,11 @@ type etcd3Section struct {
3738
ClientCertKeyPath string `toml:"client_cert_key_path"`
3839
}
3940

41+
// MemDBSection holds the optional `memdb` fragment
42+
type MemDBSection struct {
43+
Enabled *bool `toml:"enabled"`
44+
}
45+
4046
// lockSection holds the optional `lock` fragment
4147
type lockSection struct {
4248
DefaultGroupName *string `toml:"default_group_name"`
@@ -77,6 +83,9 @@ func mergeToml(settings *Settings, cfg tomlConfig) {
7783
if cfg.Etcd3 != nil {
7884
mergeEtcd(settings, *cfg.Etcd3)
7985
}
86+
if cfg.MemDB != nil {
87+
mergeMemDB(settings, *cfg.MemDB)
88+
}
8089
if cfg.Lock != nil {
8190
mergeLock(settings, *cfg.Lock)
8291
}
@@ -136,6 +145,16 @@ func mergeEtcd(settings *Settings, cfg etcd3Section) {
136145
}
137146
}
138147

148+
func mergeMemDB(settings *Settings, cfg MemDBSection) {
149+
if settings == nil {
150+
return
151+
}
152+
153+
if cfg.Enabled != nil {
154+
settings.MemDBEnabled = *cfg.Enabled
155+
}
156+
}
157+
139158
func mergeLock(settings *Settings, cfg lockSection) {
140159
if settings == nil {
141160
return

internal/lock/etcdlock.go

Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
package lock
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"errors"
7+
"fmt"
8+
"net/url"
9+
"time"
10+
11+
transport "go.etcd.io/etcd/client/pkg/v3/transport"
12+
clientv3 "go.etcd.io/etcd/client/v3"
13+
)
14+
15+
const (
16+
keyTemplate = "com.coreos.airlock/groups/%s/v1/semaphore"
17+
)
18+
19+
var (
20+
// ErrNilEtcdManager is returned on nil manager
21+
ErrNilEtcdManager = errors.New("nil EtcdManager")
22+
)
23+
24+
// EtcdManager takes care of locking for clients
25+
type EtcdManager struct {
26+
client *clientv3.Client
27+
keyPath string
28+
}
29+
30+
// NewEtcdManager returns a new lock manager, ensuring the underlying semaphore is initialized.
31+
func NewEtcdManager(ctx context.Context, etcdURLs []string, certPubPath string, certKeyPath string, txnTimeoutMs time.Duration, group string, slots uint64) (*EtcdManager, error) {
32+
tlsInfo := transport.TLSInfo{
33+
CertFile: certPubPath,
34+
KeyFile: certKeyPath,
35+
}
36+
37+
tlsConfig, err := tlsInfo.ClientConfig()
38+
if err != nil {
39+
return nil, err
40+
}
41+
42+
client, err := clientv3.New(clientv3.Config{
43+
Endpoints: etcdURLs,
44+
DialTimeout: time.Duration(txnTimeoutMs) * time.Millisecond,
45+
TLS: tlsConfig,
46+
})
47+
if err != nil {
48+
return nil, err
49+
}
50+
51+
keyPath := fmt.Sprintf(keyTemplate, url.QueryEscape(group))
52+
manager := EtcdManager{client, keyPath}
53+
54+
if err := manager.ensureInit(ctx, slots); err != nil {
55+
return nil, err
56+
}
57+
58+
return &manager, nil
59+
}
60+
61+
// RecursiveLock adds this lock `id` as a holder of the semaphore
62+
//
63+
// It will return an error if there is a problem getting or setting the
64+
// semaphore, or if the maximum number of holders has been reached.
65+
func (m *EtcdManager) RecursiveLock(ctx context.Context, id string) (*Semaphore, error) {
66+
sem, version, err := m.get(ctx)
67+
if err != nil {
68+
return nil, err
69+
}
70+
71+
held, err := sem.RecursiveLock(id)
72+
if err != nil {
73+
return nil, err
74+
}
75+
if held {
76+
return sem, nil
77+
}
78+
79+
if err := m.set(ctx, sem, version); err != nil {
80+
return nil, err
81+
}
82+
83+
return sem, nil
84+
}
85+
86+
// UnlockIfHeld removes this lock `id` as a holder of the semaphore
87+
//
88+
// It returns an error if there is a problem getting or setting the semaphore.
89+
func (m *EtcdManager) UnlockIfHeld(ctx context.Context, id string) (*Semaphore, error) {
90+
sem, version, err := m.get(ctx)
91+
if err != nil {
92+
return nil, err
93+
}
94+
95+
if err := sem.UnlockIfHeld(id); err != nil {
96+
return nil, err
97+
}
98+
99+
if err := m.set(ctx, sem, version); err != nil {
100+
return nil, err
101+
}
102+
103+
return sem, nil
104+
}
105+
106+
// FetchSemaphore fetches current semaphore version
107+
func (m *EtcdManager) FetchSemaphore(ctx context.Context) (*Semaphore, error) {
108+
semaphore, _, err := m.get(ctx)
109+
if err != nil {
110+
return nil, err
111+
}
112+
113+
return semaphore, nil
114+
}
115+
116+
// Close reaps all running goroutines
117+
func (m *EtcdManager) Close() {
118+
if m == nil {
119+
return
120+
}
121+
122+
m.client.Close()
123+
}
124+
125+
// ensureInit initialize the semaphore in etcd, if it does not exist yet
126+
func (m *EtcdManager) ensureInit(ctx context.Context, slots uint64) error {
127+
if m == nil {
128+
return ErrNilEtcdManager
129+
}
130+
131+
sem := NewSemaphore(slots)
132+
semValue, err := sem.String()
133+
if err != nil {
134+
return err
135+
}
136+
137+
_, err = m.client.Txn(ctx).If(
138+
// version=0 means that the key does not exist.
139+
clientv3.Compare(clientv3.Version(m.keyPath), "=", 0),
140+
).Then(
141+
clientv3.OpPut(m.keyPath, semValue),
142+
).Commit()
143+
144+
if err != nil {
145+
return err
146+
}
147+
return nil
148+
}
149+
150+
// get returns the current semaphore value and version, or an error
151+
func (m *EtcdManager) get(ctx context.Context) (*Semaphore, int64, error) {
152+
resp, err := m.client.Get(ctx, m.keyPath)
153+
if err != nil {
154+
return nil, 0, err
155+
}
156+
if resp.Count != 1 {
157+
return nil, 0, fmt.Errorf("unexpected number of results: %d", resp.Count)
158+
}
159+
160+
var data []byte
161+
var version int64
162+
for _, kv := range resp.Kvs {
163+
data = kv.Value
164+
version = kv.Version
165+
break
166+
}
167+
if version == 0 {
168+
return nil, 0, errors.New("key at version 0")
169+
}
170+
if len(data) == 0 {
171+
return nil, 0, errors.New("empty semaphore value")
172+
}
173+
174+
sem := &Semaphore{}
175+
err = json.Unmarshal(data, sem)
176+
if err != nil {
177+
return nil, 0, err
178+
}
179+
180+
return sem, version, nil
181+
}
182+
183+
// set updates the semaphore in etcd, if `version` matches the one previously observed
184+
func (m *EtcdManager) set(ctx context.Context, sem *Semaphore, version int64) error {
185+
if m == nil {
186+
return ErrNilEtcdManager
187+
}
188+
if sem == nil {
189+
return ErrNilSemaphore
190+
}
191+
192+
data, err := json.Marshal(sem)
193+
if err != nil {
194+
return err
195+
}
196+
197+
// Conditionally Put if version in etcd is still the same we observed.
198+
// If the condition is not met, the transaction will return as "not succeeding".
199+
resp, err := m.client.Txn(ctx).If(
200+
clientv3.Compare(clientv3.Version(m.keyPath), "=", version),
201+
).Then(
202+
clientv3.OpPut(m.keyPath, string(data)),
203+
).Commit()
204+
205+
if err != nil {
206+
return err
207+
}
208+
if !resp.Succeeded {
209+
return errors.New("conflict on semaphore detected, aborting")
210+
}
211+
212+
return nil
213+
}

0 commit comments

Comments
 (0)