2 changes: 1 addition & 1 deletion README.md
@@ -4,7 +4,7 @@
[![License][license-badge]][license]
[![Build Status][build-badge]][build]

LeifDb a clustered K-V store application that implements [Raft] for consistency, in Go. It has an [OpenAPIv2.0]-compatible HTTP interface for client interaction, and serves the schema for the client interface at the root HTTP endpoint to allow clients to discover and use endpoints programatically. In the near future, it will also employ erasure codes to improve performance and storage footprint, as described in the [CRaft] paper (check out the [milestones](https://github.com/btmorr/leifdb/milestones) to check on progress).
LeifDb a clustered K-V store application that implements [Raft] for consistency, in Go. It has an [OpenAPIv2.0]-compatible HTTP interface for client interaction, and serves the schema for the client interface at the root HTTP endpoint to allow clients to discover and use endpoints programmatically. In the near future, it will also employ erasure codes to improve performance and storage footprint, as described in the [CRaft] paper (check out the [milestones](https://github.com/btmorr/leifdb/milestones) to check on progress).

The aim of this project is to build a distributed, consistent, fault-tolerant K-V store providing high throughput and minimizing storage footprint at large scale.

3 changes: 3 additions & 0 deletions docker-compose.yml
@@ -6,6 +6,7 @@ services:
- "8080:8080"
- "16990:16990"
environment:
LEIFDB_SNAPSHOT_THRESHOLD: 1024
LEIFDB_MODE: "multi"
LEIFDB_HOST: "leifdb0"
LEIFDB_MEMBER_NODES: "leifdb0:16990,leifdb1:16991,leifdb2:16992"
@@ -15,6 +16,7 @@
- "8081:8081"
- "16991:16991"
environment:
LEIFDB_SNAPSHOT_THRESHOLD: 1024
LEIFDB_MODE: "multi"
LEIFDB_HOST: "leifdb1"
LEIFDB_HTTP_PORT: 8081
@@ -26,6 +28,7 @@
- "8082:8082"
- "16992:16992"
environment:
LEIFDB_SNAPSHOT_THRESHOLD: 1024
LEIFDB_MODE: "multi"
LEIFDB_HOST: "leifdb2"
LEIFDB_HTTP_PORT: 8082
35 changes: 26 additions & 9 deletions internal/database/db.go
@@ -49,28 +49,45 @@ type pair struct {
V string
}

// Metadata includes the index and term of the last log entry included in the
// snapshot
type Metadata struct {
LastIndex int64
LastTerm int64
}

type snapshot struct {
Records []pair
Metadata Metadata
}

// BuildSnapshot serializes the database state into a JSON array of objects
// with keys K and V and the key and value for each entry as respective values
func BuildSnapshot(db *Database) ([]byte, error) {
// and combines this with the provided metadata before returning the result
func BuildSnapshot(db *Database, metadata Metadata) ([]byte, error) {
accumulator := []pair{}
db.underlying.Root().Walk(func(key []byte, value interface{}) bool {
accumulator = append(accumulator, pair{K: string(key), V: value.(string)})
return false
})
return json.Marshal(accumulator)
s := snapshot{
Records: accumulator,
Metadata: metadata,
}
return json.Marshal(s)
}

// InstallSnapshot deserializes a JSON string (following the schema created by
// BuildSnapshot) and returns a populated Database
func InstallSnapshot(data []byte) (*Database, error) {
var pairs []pair
// BuildSnapshot) and returns a populated Database and Metadata
func InstallSnapshot(data []byte) (*Database, *Metadata, error) {
var s snapshot
db := NewDatabase()

if err := json.Unmarshal(data, &pairs); err != nil {
return nil, err
if err := json.Unmarshal(data, &s); err != nil {
return nil, nil, err
}
for _, p := range pairs {
for _, p := range s.Records {
db.Set(p.K, p.V)
}
return db, nil
return db, &s.Metadata, nil
}
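For reference, a minimal usage sketch of the new signatures. The import path `github.com/btmorr/leifdb/internal/database` is assumed from the repository name and is not shown in this diff; since the package lives under `internal/`, this only compiles from within the module. The JSON shown in the comment is the rough shape implied by the `snapshot` struct fields, not captured output.

```go
package main

import (
	"fmt"
	"log"

	db "github.com/btmorr/leifdb/internal/database"
)

func main() {
	d := db.NewDatabase()
	d.Set("1", "one")
	d.Set("2", "two")

	// The snapshot now carries the index and term of the last log entry it covers
	data, err := db.BuildSnapshot(d, db.Metadata{LastIndex: 2, LastTerm: 1})
	if err != nil {
		log.Fatal(err)
	}
	// data is no longer a bare JSON array of pairs; it looks roughly like:
	// {"Records":[{"K":"1","V":"one"},{"K":"2","V":"two"}],
	//  "Metadata":{"LastIndex":2,"LastTerm":1}}

	restored, meta, err := db.InstallSnapshot(data)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(restored.Get("1"), meta.LastIndex, meta.LastTerm) // one 2 1
}
```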
11 changes: 9 additions & 2 deletions internal/database/db_test.go
@@ -63,15 +63,22 @@ func TestSnapshotRoundtrip(t *testing.T) {
d0.Set("2", "two")
d0.Set("3", "three")

snapshot, err := BuildSnapshot(d0)
meta := Metadata{2, 1}
snapshot, err := BuildSnapshot(d0, meta)
if err != nil {
t.Errorf("Error in BuildSnapshot: %v\n", err)
}
d1, err := InstallSnapshot(snapshot)
d1, meta1, err := InstallSnapshot(snapshot)
if err != nil {
t.Errorf("Error in InstallSnapshot: %v\n", err)
}

if meta.LastTerm != meta1.LastTerm {
t.Errorf("Metadata term expected %d, got %d\n", meta.LastTerm, meta1.LastTerm)
}
if meta.LastIndex != meta1.LastIndex {
t.Errorf("Metadata index expected %d, got %d\n", meta.LastIndex, meta1.LastIndex)
}
for _, key := range keys {
v0 := d0.Get(key)
v1 := d1.Get(key)
46 changes: 31 additions & 15 deletions internal/mgmt/shapshotmanager.go
@@ -53,16 +53,19 @@ func findExistingSnapshots(dataDir string) ([]string, int) {
}

// cloneAndSerialize makes a copy of the current commit index and database
// state, then returns a serialized version of the snapshot and the commit
// index, or an error
func cloneAndSerialize(node *node.Node) ([]byte, int64, error) {
node.Lock()
commitIndex := node.CommitIndex
clone := db.Clone(node.Store)
node.Unlock()

snapshot, err := db.BuildSnapshot(clone)
return snapshot, commitIndex, err
// state, then returns a serialized version of the snapshot with metadata, or
// an error
func cloneAndSerialize(n *node.Node) ([]byte, db.Metadata, error) {
n.Lock()
commitIndex := n.CommitIndex
clone := db.Clone(n.Store)
n.Unlock()

lastTerm := n.Log.Entries[commitIndex].Term
metadata := db.Metadata{LastIndex: commitIndex, LastTerm: lastTerm}
snapshot, err := db.BuildSnapshot(clone, metadata)

return snapshot, metadata, err
}

// persist writes a byte array (the serialized snapshot) to disk
@@ -109,14 +112,21 @@ func loadSnapshot(n *node.Node, snapshotPath string) error {
return err
}

newStore, err := db.InstallSnapshot(data)
newStore, metadata, err := db.InstallSnapshot(data)
if err != nil {
return err
}
n.Store = newStore
n.IndexOffset = metadata.LastIndex
n.LastSnapshotTerm = metadata.LastTerm
return nil
}

// StartSnapshotManager checks for existing snapshots and installs the most
// recent, then monitors for when a new snapshot needs to be created. When
// the criteria are met (node log larger than threshold), a new snapshot is
// created and written to disk, the node's logs are compacted, and snapshot
// files in excess of the retain parameter are dropped (oldest first).
func StartSnapshotManager(
dataDir string,
logFile string,
@@ -150,16 +160,17 @@ func StartSnapshotManager(
Msg("snapshot check")

if size > threshold {
snapshot, commitIndex, err := cloneAndSerialize(n)
snapshot, metadata, err := cloneAndSerialize(n)
if err != nil {
log.Error().Err(err).Msg("error building snapshot")
continue
}
log.Debug().
Int64("commit index", commitIndex).
Int64("index", metadata.LastIndex).
Int64("term", metadata.LastTerm).
Msg("doing snapshot")

filename := fmt.Sprintf("%s%06d", prefix, nextIndex)
filename := fmt.Sprintf("%s%09d", prefix, nextIndex)
fullPath := filepath.Join(dataDir, filename)
err = persist(snapshot, fullPath)
if err != nil {
@@ -169,7 +180,12 @@

nextIndex++
snapshotFiles = append(snapshotFiles, fullPath)
// todo: trigger node log compaction
err = n.CompactLogs(metadata.LastIndex, metadata.LastTerm)
if err != nil {
// should this be fatal? (snapshot exists but logs didn't get compacted)
log.Error().Err(err).Msg("error compacting logs")
continue
}
}

snapshotFiles = dropOldSnapshots(snapshotFiles, retain)
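The `dropOldSnapshots` body is not part of this diff; the sketch below only illustrates the retention behavior described in the doc comment (keep the newest `retain` snapshot files, remove older ones). It assumes the slice is ordered oldest-to-newest and that dropped files are deleted from disk; the real signature, filenames, and error handling may differ.

```go
// Illustrative sketch only -- not the repository's implementation.
package main

import (
	"fmt"
	"log"
	"os"
)

// dropOldSnapshotsSketch removes all but the newest `retain` snapshot files
// and returns the paths that were kept, assuming paths is ordered
// oldest-to-newest.
func dropOldSnapshotsSketch(paths []string, retain int) []string {
	if retain < 0 || len(paths) <= retain {
		return paths
	}
	cutoff := len(paths) - retain
	for _, p := range paths[:cutoff] {
		if err := os.Remove(p); err != nil {
			log.Printf("failed to remove old snapshot %s: %v", p, err)
		}
	}
	return append([]string(nil), paths[cutoff:]...)
}

func main() {
	// Hypothetical filenames; the real prefix and zero-padding come from the manager
	files := []string{"snapshot-000000001", "snapshot-000000002", "snapshot-000000003"}
	kept := dropOldSnapshotsSketch(files, 2) // keeps the two newest entries
	fmt.Println(kept)
}
```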
28 changes: 15 additions & 13 deletions internal/mgmt/snapshotmanager_test.go
@@ -75,36 +75,38 @@ func TestFindExisting(t *testing.T) {

func TestCloneAndSerialize(t *testing.T) {
n := setupServer(t)
n.Store.Set("ice", "cream")
n.CommitIndex++
n.Store.Set("straw", "bale")
n.CommitIndex++
n.State = node.Leader
n.SetTerm(1, n.RaftNode)
n.Set("ice", "cream")
n.SetTerm(2, n.RaftNode)
n.Set("straw", "bale")

var snapshot []byte
var index int64
var metadata *db.Metadata
var err error
var wg sync.WaitGroup

wg.Add(1)
go func() {
snapshot, index, err = cloneAndSerialize(n)
snapshot, _, err = cloneAndSerialize(n)
wg.Done()
}()
// simulate raft write starting during clone
time.Sleep(time.Microsecond * 50)
n.Lock()
n.Store.Set("straw", "berry")
n.CommitIndex++
n.Unlock()
n.SetTerm(3, n.RaftNode)
n.Set("straw", "berry")

wg.Wait()
reconstituted, err := db.InstallSnapshot(snapshot)
reconstituted, metadata, err := db.InstallSnapshot(snapshot)
if err != nil {
t.Errorf("Error installing snapshot: %v\n", err)
}

if index != 1 {
t.Errorf("Next index after first snapshot should be 1, got %d\n", index)
if metadata.LastIndex != 1 {
t.Errorf("Last index should be 1, got %d\n", metadata.LastIndex)
}
if metadata.LastTerm != 2 {
t.Errorf("Last term should be 2, got %d\n", metadata.LastTerm)
}

ice := reconstituted.Get("ice")