From ab44c31d33010f6a0e0b81cdd659c5ae94346bac Mon Sep 17 00:00:00 2001 From: Mateo Presa Castro Date: Sun, 16 Nov 2025 12:04:13 +0100 Subject: [PATCH] chore: rm config dir, clean up code --- config/files.go | 29 ----------------------------- example/start_nodes.sh | 2 +- kv/kv.go | 37 +++++++++++++++++++++---------------- kv/kv_test.go | 6 +++--- mokv/mokv.go | 2 +- 5 files changed, 26 insertions(+), 50 deletions(-) delete mode 100644 config/files.go diff --git a/config/files.go b/config/files.go deleted file mode 100644 index f423433..0000000 --- a/config/files.go +++ /dev/null @@ -1,29 +0,0 @@ -package config - -import ( - "os" - "path/filepath" -) - -var ( - CAFile = configFile("ca.pem") - ServerCertFile = configFile("server.pem") - ServerKeyFile = configFile("server-key.pem") - RootClientCertFile = configFile("root-client.pem") - RootClientKeyFile = configFile("root-client-key.pem") - NobodyClientCertFile = configFile("nobody-client.pem") - NobodyClientKeyFile = configFile("nobody-client-key.pem") - ACLModelFile = configFile("model.conf") - ACLPolicyFile = configFile("policy.csv") -) - -func configFile(fileName string) string { - if dir := os.Getenv("CONFIG_DIR"); dir != "" { - return filepath.Join(dir, fileName) - } - homeDir, err := os.UserHomeDir() - if err != nil { - panic(err) - } - return filepath.Join(homeDir, ".mokv", fileName) -} diff --git a/example/start_nodes.sh b/example/start_nodes.sh index 33cba4f..7583260 100755 --- a/example/start_nodes.sh +++ b/example/start_nodes.sh @@ -8,7 +8,7 @@ rm -rf /tmp/mokv3 bin/mokv --config-file=./example/config.yaml & PID1=$! echo "Started node 1 with PID $PID1" -sleep 5 # Wait 5 seconds for the bootstrap node to initialize +sleep 3 # Wait 3 seconds for the bootstrap node to initialize # Start node 2 and wait for it to join bin/mokv --config-file=./example/config2.yaml & diff --git a/kv/kv.go b/kv/kv.go index 7d194ee..202fc6f 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -33,9 +33,9 @@ type ServerProvider interface { type RequestType uint8 const ( - SetRequestType RequestType = 0 - DeleteRequestType RequestType = 1 - lenWidth = 8 + RequestTypeSet RequestType = iota + RequestTypeDelete + lenWidth = 8 ) type BatchOperation struct { @@ -45,14 +45,16 @@ type BatchOperation struct { result chan error } +type RaftConfig struct { + raft.Config + StreamLayer *StreamLayer + Bootstrap bool + BindAddr string + RPCPort string +} + type KVConfig struct { - Raft struct { - raft.Config - StreamLayer - Bootstrap bool - BindAddr string - RPCPort string - } + Raft RaftConfig DataDir string } @@ -76,9 +78,10 @@ func New(store store.Storer, cfg *KVConfig) (*KV, error) { func (kv *KV) Set(key string, value []byte) error { // Replicate Set - _, err := kv.apply(SetRequestType, &api.SetRequest{Key: key, Value: value}) + _, err := kv.apply(RequestTypeSet, &api.SetRequest{Key: key, Value: value}) if err != nil { - return fmt.Errorf("failed to apply replication setting key: %s, val: %s, from kv: %w", + return fmt.Errorf( + "failed to apply replication setting key: %s, val: %s, from kv: %w", key, string(value), err, ) } @@ -87,7 +90,7 @@ func (kv *KV) Set(key string, value []byte) error { func (kv *KV) Delete(key string) error { // Replicate Delete - _, err := kv.apply(DeleteRequestType, &api.DeleteRequest{Key: key}) + _, err := kv.apply(RequestTypeDelete, &api.DeleteRequest{Key: key}) if err != nil { return fmt.Errorf("failed to apply replication deleting key: %s from kv: %w", key, err) } @@ -262,7 +265,7 @@ func (kv *KV) setupRaft(dataDir string) error { maxPool := 5 timeout := 10 * time.Second transport := raft.NewNetworkTransport( - &kv.cfg.Raft.StreamLayer, + kv.cfg.Raft.StreamLayer, maxPool, timeout, os.Stderr, @@ -333,9 +336,9 @@ var _ raft.FSM = (*fsm)(nil) func (fsm *fsm) Apply(log *raft.Log) any { reqType := RequestType(log.Data[0]) switch reqType { - case SetRequestType: + case RequestTypeSet: return fsm.applySet(log.Data[1:]) - case DeleteRequestType: + case RequestTypeDelete: return fsm.applyDelete(log.Data[1:]) } return nil @@ -453,9 +456,11 @@ func (s *StreamLayer) Accept() (net.Conn, error) { b := make([]byte, 1) _, err = conn.Read(b) if err != nil { + conn.Close() return nil, err } if !bytes.Equal([]byte{byte(RaftRPC)}, b) { + conn.Close() return nil, errors.New("not a raft rpc") } return conn, nil diff --git a/kv/kv_test.go b/kv/kv_test.go index 5914757..e357503 100644 --- a/kv/kv_test.go +++ b/kv/kv_test.go @@ -30,7 +30,7 @@ func TestDistributedKVReplication(t *testing.T) { cfg1.Raft.RPCPort = "3000" cfg1.Raft.LocalID = "node-1" cfg1.Raft.Bootstrap = true - cfg1.Raft.StreamLayer = *kv.NewStreamLayer(raftLn1) + cfg1.Raft.StreamLayer = kv.NewStreamLayer(raftLn1) node1, err := kv.New(store1, cfg1) if err != nil { @@ -59,7 +59,7 @@ func TestDistributedKVReplication(t *testing.T) { cfg2.Raft.RPCPort = "3001" cfg2.Raft.LocalID = "node-2" cfg2.Raft.Bootstrap = false - cfg2.Raft.StreamLayer = *kv.NewStreamLayer(raftLn2) + cfg2.Raft.StreamLayer = kv.NewStreamLayer(raftLn2) node2, err := kv.New(store2, cfg2) if err != nil { @@ -146,7 +146,7 @@ func TestDistributedKVReplication(t *testing.T) { cfg3.Raft.RPCPort = "3002" cfg3.Raft.LocalID = "node-3" cfg3.Raft.Bootstrap = false - cfg3.Raft.StreamLayer = *kv.NewStreamLayer(raftLn3) + cfg3.Raft.StreamLayer = kv.NewStreamLayer(raftLn3) node3, err := kv.New(store3, cfg3) if err != nil { diff --git a/mokv/mokv.go b/mokv/mokv.go index 16ae2bd..c01846e 100644 --- a/mokv/mokv.go +++ b/mokv/mokv.go @@ -97,7 +97,7 @@ func New(cfg *Config, getEnv GetEnv) (*MOKV, error) { grpcLn := myCmux.Match(cmux.Any()) // Setup Raft stream layer - kvCFG.Raft.StreamLayer = *kv.NewStreamLayer(raftLn) + kvCFG.Raft.StreamLayer = kv.NewStreamLayer(raftLn) // Initialize store and KV store := store.New()