Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 0 additions & 29 deletions config/files.go

This file was deleted.

2 changes: 1 addition & 1 deletion example/start_nodes.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ rm -rf /tmp/mokv3
bin/mokv --config-file=./example/config.yaml &
PID1=$!
echo "Started node 1 with PID $PID1"
sleep 5 # Wait 5 seconds for the bootstrap node to initialize
sleep 3 # Wait 3 seconds for the bootstrap node to initialize

# Start node 2 and wait for it to join
bin/mokv --config-file=./example/config2.yaml &
Expand Down
37 changes: 21 additions & 16 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ type ServerProvider interface {
type RequestType uint8

const (
SetRequestType RequestType = 0
DeleteRequestType RequestType = 1
lenWidth = 8
RequestTypeSet RequestType = iota
RequestTypeDelete
lenWidth = 8
)

type BatchOperation struct {
Expand All @@ -45,14 +45,16 @@ type BatchOperation struct {
result chan error
}

type RaftConfig struct {
raft.Config
StreamLayer *StreamLayer
Bootstrap bool
BindAddr string
RPCPort string
}

type KVConfig struct {
Raft struct {
raft.Config
StreamLayer
Bootstrap bool
BindAddr string
RPCPort string
}
Raft RaftConfig
DataDir string
}

Expand All @@ -76,9 +78,10 @@ func New(store store.Storer, cfg *KVConfig) (*KV, error) {

func (kv *KV) Set(key string, value []byte) error {
// Replicate Set
_, err := kv.apply(SetRequestType, &api.SetRequest{Key: key, Value: value})
_, err := kv.apply(RequestTypeSet, &api.SetRequest{Key: key, Value: value})
if err != nil {
return fmt.Errorf("failed to apply replication setting key: %s, val: %s, from kv: %w",
return fmt.Errorf(
"failed to apply replication setting key: %s, val: %s, from kv: %w",
key, string(value), err,
)
}
Expand All @@ -87,7 +90,7 @@ func (kv *KV) Set(key string, value []byte) error {

func (kv *KV) Delete(key string) error {
// Replicate Delete
_, err := kv.apply(DeleteRequestType, &api.DeleteRequest{Key: key})
_, err := kv.apply(RequestTypeDelete, &api.DeleteRequest{Key: key})
if err != nil {
return fmt.Errorf("failed to apply replication deleting key: %s from kv: %w", key, err)
}
Expand Down Expand Up @@ -262,7 +265,7 @@ func (kv *KV) setupRaft(dataDir string) error {
maxPool := 5
timeout := 10 * time.Second
transport := raft.NewNetworkTransport(
&kv.cfg.Raft.StreamLayer,
kv.cfg.Raft.StreamLayer,
maxPool,
timeout,
os.Stderr,
Expand Down Expand Up @@ -333,9 +336,9 @@ var _ raft.FSM = (*fsm)(nil)
func (fsm *fsm) Apply(log *raft.Log) any {
reqType := RequestType(log.Data[0])
switch reqType {
case SetRequestType:
case RequestTypeSet:
return fsm.applySet(log.Data[1:])
case DeleteRequestType:
case RequestTypeDelete:
return fsm.applyDelete(log.Data[1:])
}
return nil
Expand Down Expand Up @@ -453,9 +456,11 @@ func (s *StreamLayer) Accept() (net.Conn, error) {
b := make([]byte, 1)
_, err = conn.Read(b)
if err != nil {
conn.Close()
return nil, err
}
if !bytes.Equal([]byte{byte(RaftRPC)}, b) {
conn.Close()
return nil, errors.New("not a raft rpc")
}
return conn, nil
Expand Down
6 changes: 3 additions & 3 deletions kv/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestDistributedKVReplication(t *testing.T) {
cfg1.Raft.RPCPort = "3000"
cfg1.Raft.LocalID = "node-1"
cfg1.Raft.Bootstrap = true
cfg1.Raft.StreamLayer = *kv.NewStreamLayer(raftLn1)
cfg1.Raft.StreamLayer = kv.NewStreamLayer(raftLn1)

node1, err := kv.New(store1, cfg1)
if err != nil {
Expand Down Expand Up @@ -59,7 +59,7 @@ func TestDistributedKVReplication(t *testing.T) {
cfg2.Raft.RPCPort = "3001"
cfg2.Raft.LocalID = "node-2"
cfg2.Raft.Bootstrap = false
cfg2.Raft.StreamLayer = *kv.NewStreamLayer(raftLn2)
cfg2.Raft.StreamLayer = kv.NewStreamLayer(raftLn2)

node2, err := kv.New(store2, cfg2)
if err != nil {
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestDistributedKVReplication(t *testing.T) {
cfg3.Raft.RPCPort = "3002"
cfg3.Raft.LocalID = "node-3"
cfg3.Raft.Bootstrap = false
cfg3.Raft.StreamLayer = *kv.NewStreamLayer(raftLn3)
cfg3.Raft.StreamLayer = kv.NewStreamLayer(raftLn3)

node3, err := kv.New(store3, cfg3)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion mokv/mokv.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func New(cfg *Config, getEnv GetEnv) (*MOKV, error) {
grpcLn := myCmux.Match(cmux.Any())

// Setup Raft stream layer
kvCFG.Raft.StreamLayer = *kv.NewStreamLayer(raftLn)
kvCFG.Raft.StreamLayer = kv.NewStreamLayer(raftLn)

// Initialize store and KV
store := store.New()
Expand Down